Bitcoin Core 31.99.0
P2P Digital Currency
type-context.h
Go to the documentation of this file.
1// Copyright (c) The Bitcoin Core developers
2// Distributed under the MIT software license, see the accompanying
3// file COPYING or http://www.opensource.org/licenses/mit-license.php.
4
5#ifndef MP_PROXY_TYPE_CONTEXT_H
6#define MP_PROXY_TYPE_CONTEXT_H
7
8#include <mp/proxy-io.h>
9#include <mp/util.h>
10
11#include <kj/string.h>
12
13namespace mp {
// Client-side builder for a Context field of an outgoing IPC request.
//
// Selected only when output.get() yields a Context::Builder (see the
// enable_if parameter). It takes the connection and thread context from
// invoke_context, obtains a callback-thread entry and a request-thread entry
// via SetThread, then initializes the output and stores both thread clients
// in it with setThread() and setCallbackThread().
//
// NOTE(review): the listing line numbers jump from 14 to 17, so the function
// name and its leading parameters are not visible in this view; presumably
// this is a CustomBuildField overload -- confirm against the real header.
14template <typename Output>
17 ClientInvokeContext& invoke_context,
18 Output&& output,
19 typename std::enable_if<std::is_same<decltype(output.get()), Context::Builder>::value>::type* enable = nullptr)
20{
21 auto& connection = invoke_context.connection;
22 auto& thread_context = invoke_context.thread_context;
23
24 // Create local Thread::Server object corresponding to the current thread
25 // and pass a Thread::Client reference to it in the Context.callbackThread
26 // field so the function being called can make callbacks to this thread.
27 // Also store the Thread::Client reference in the callback_threads map so
28 // future calls over this connection can reuse it.
29 auto [callback_thread, _]{SetThread(
30 GuardedRef{thread_context.waiter->m_mutex, thread_context.callback_threads}, &connection,
31 [&] { return connection.m_threads.add(kj::heap<ProxyServer<Thread>>(connection, thread_context, std::thread{})); })};
32
33 // Call remote ThreadMap.makeThread function so server will create a
34 // dedicated worker thread to run function calls from this thread. Store the
35 // Thread::Client reference it returns in the request_threads map.
36 auto make_request_thread{[&]{
37 // This code will only run if an IPC client call is being made for the
38 // first time on this thread. After the first call, subsequent calls
39 // will use the existing request thread. This code will also never run at
40 // all if the current thread is a request thread created for a different
41 // IPC client, because in that case PassField code (below) will have set
42 // request_thread to point to the calling thread.
43 auto request = connection.m_thread_map.makeThreadRequest();
44 request.setName(thread_context.thread_name);
45 return request.send().getResult(); // Nonblocking due to capnp request pipelining.
46 }};
47 auto [request_thread, _1]{SetThread(
48 GuardedRef{thread_context.waiter->m_mutex, thread_context.request_threads},
49 &connection, make_request_thread)};
50
   // Initialize the output field and record both thread clients in it: the
   // thread the request should run on, and the thread callbacks should be
   // sent to.
51 auto context = output.init();
52 context.setThread(request_thread->second->m_client);
53 context.setCallbackThread(callback_thread->second->m_client);
54}
55
// Server-side PassField overload for Context parameters.
//
// Selected only when Accessor::get(params) yields a Context::Reader. Instead
// of calling fn.invoke() directly, it packages the call into the `invoke`
// lambda, asynchronously looks up the local Thread server named by the
// Context's thread field, and posts the lambda to that thread.
//
// Returns a kj::Promise<ServerContext::CallContext> wrapped by the
// connection's m_canceler. The promise continuation throws
// std::runtime_error("invalid thread handle") if the thread capability does
// not resolve to a local server.
//
// NOTE(review): the listing line numbers skip 56-57 before this function and
// 76-77 inside the `invoke` lambda; whatever is on those lines is not visible
// in this view.
58template <typename Accessor, typename ServerContext, typename Fn, typename... Args>
59auto PassField(Priority<1>, TypeList<>, ServerContext& server_context, const Fn& fn, Args&&... args) ->
60 typename std::enable_if<
61 std::is_same<decltype(Accessor::get(server_context.call_context.getParams())), Context::Reader>::value,
62 kj::Promise<typename ServerContext::CallContext>>::type
63{
64 auto& server = server_context.proxy_server;
65 EventLoop& loop = *server.m_context.loop;
66 int req = server_context.req;
67 // Keep a reference to the ProxyServer instance by assigning it to the self
68 // variable. ProxyServer instances are reference-counted and if the client
69 // drops its reference and the IPC call is canceled, this variable keeps the
70 // instance alive until the method finishes executing. The self variable
71 // needs to be destroyed on the event loop thread so it is freed in a sync()
72 // call below.
73 auto self = server.thisCap();
   // The lambda takes ownership of self and call_context (moved in), copies
   // req, fn and args by value, and holds server and loop by reference.
74 auto invoke = [self = kj::mv(self), call_context = kj::mv(server_context.call_context), &server, &loop, req, fn, args...](CancelMonitor& cancel_monitor) mutable {
75 MP_LOG(loop, Log::Debug) << "IPC server executing request #" << req;
   // This local ServerContext is built from the captured call_context and
   // shadows the outer function's server_context parameter.
78 ServerContext server_context{server, call_context, req};
79 // Before invoking the function, store a reference to the
80 // callbackThread provided by the client in the
81 // thread_local.request_threads map. This way, if this
82 // server thread needs to execute any RPCs that call back to
83 // the client, they will happen on the same client thread
84 // that is waiting for this function, just like what would
85 // happen if this were a normal function call made on the
86 // local stack.
87 //
88 // If the request_threads map already has an entry for this
89 // connection, it will be left unchanged, and it indicates
90 // that the current thread is an RPC client thread which is
91 // in the middle of an RPC call, and the current RPC call is
92 // a nested call from the remote thread handling that RPC
93 // call. In this case, the callbackThread value should point
94 // to the same thread already in the map, so there is no
95 // need to update the map.
96 auto& thread_context = g_thread_context;
97 auto& request_threads = thread_context.request_threads;
98 ConnThread request_thread;
99 bool inserted{false};
   // cancel_mutex is locked here and stays locked until the KJ_DEFER block
   // below unlocks it; the on_cancel callback installed below blocks on the
   // same mutex.
100 Mutex cancel_mutex;
101 Lock cancel_lock{cancel_mutex};
102 server_context.cancel_lock = &cancel_lock;
103 loop.sync([&] {
104 // Detect request being canceled before it executes.
105 if (cancel_monitor.m_canceled) {
106 server_context.request_canceled = true;
107 return;
108 }
109 // Detect request being canceled while it executes.
110 assert(!cancel_monitor.m_on_cancel);
111 cancel_monitor.m_on_cancel = [&loop, &server_context, &cancel_mutex, req]() {
112 MP_LOG(loop, Log::Info) << "IPC server request #" << req << " canceled while executing.";
113 // Lock cancel_mutex here to block the event loop
114 // thread and prevent it from deleting the request's
115 // params and response structs while the execution
116 // thread is accessing them. Because this lock is
117 // released before the event loop thread does delete
118 // the structs, the mutex does not provide any
119 // protection from the event loop deleting the
120 // structs _before_ the execution thread acquires
121 // it. So in addition to locking the mutex, the
122 // execution thread always checks request_canceled
123 // as well before accessing the structs.
124 Lock cancel_lock{cancel_mutex};
125 server_context.request_canceled = true;
126 };
127 // Update request_threads map if not canceled. We know
128 // the request is not canceled currently because
129 // cancel_monitor.m_canceled was checked above and this
130 // code is running on the event loop thread.
131 std::tie(request_thread, inserted) = SetThread(
132 GuardedRef{thread_context.waiter->m_mutex, request_threads}, server.m_context.connection,
133 [&] { return Accessor::get(call_context.getParams()).getCallbackThread(); });
134 });
135
136 // If an entry was inserted into the request_threads map,
137 // remove it after calling fn.invoke. If an entry was not
138 // inserted, one already existed, meaning this must be a
139 // recursive call (IPC call calling back to the caller which
140 // makes another IPC call), so avoid modifying the map.
141 const bool erase_thread{inserted};
142 KJ_DEFER(
143 // Release the cancel lock before calling loop->sync and
144 // waiting for the event loop thread, because if a
145 // cancellation happened, it needs to run the on_cancel
146 // callback above. It's safe to release cancel_lock at
147 // this point because the fn.invoke() call below will be
148 // finished and no longer accessing the params or
149 // results structs.
150 cancel_lock.m_lock.unlock();
151 // Erase the request_threads entry on the event loop
152 // thread with loop->sync(), so if the connection is
153 // broken there is not a race between this thread and
154 // the disconnect handler trying to destroy the thread
155 // client object.
156 loop.sync([&] {
157 // Clear cancellation callback. At this point the
158 // method invocation finished and the result is
159 // either being returned, or discarded if a
160 // cancellation happened. So we do not need to be
161 // notified of cancellations after this point. Also
162 // we do not want to be notified because
163 // cancel_mutex and server_context could be out of
164 // scope when it happens.
165 cancel_monitor.m_on_cancel = nullptr;
   // Move self into a local of this sync() callback so it is destroyed
   // when the callback returns.
166 auto self_dispose{kj::mv(self)};
167 if (erase_thread) {
168 // Look up the thread again without using existing
169 // iterator since entry may no longer be there after
170 // a disconnect. Destroy node after releasing
171 // Waiter::m_mutex, so the ProxyClient<Thread>
172 // destructor is able to use EventLoop::mutex
173 // without violating lock order.
174 ConnThreads::node_type removed;
175 {
176 Lock lock(thread_context.waiter->m_mutex);
177 removed = request_threads.extract(server.m_context.connection);
178 }
179 }
180 });
181 );
   // Skip the invocation if the request was canceled before it started.
   // Otherwise run fn.invoke(): an InterruptException is logged and
   // swallowed, and any other exception caught by
   // kj::runCatchingExceptions is logged and rethrown as recoverable.
182 if (server_context.request_canceled) {
183 MP_LOG(loop, Log::Info) << "IPC server request #" << req << " canceled before it could be executed";
184 } else KJ_IF_MAYBE(exception, kj::runCatchingExceptions([&]{
185 try {
186 fn.invoke(server_context, args...);
187 } catch (const InterruptException& e) {
188 MP_LOG(loop, Log::Info) << "IPC server request #" << req << " interrupted (" << e.what() << ")";
189 }
190 })) {
191 MP_LOG(loop, Log::Error) << "IPC server request #" << req << " uncaught exception (" << kj::str(*exception).cStr() << ")";
192 kj::throwRecoverableException(kj::mv(*exception));
193 }
194 return call_context;
195 // End of scope: if KJ_DEFER was reached, it runs here
196 };
197
198 // Lookup Thread object specified by the client. The specified thread should
199 // be a local Thread::Server object, but it needs to be looked up
200 // asynchronously with getLocalServer().
201 const auto& params = server_context.call_context.getParams();
202 Context::Reader context_arg = Accessor::get(params);
203 auto thread_client = context_arg.getThread();
204 auto result = server.m_context.connection->m_threads.getLocalServer(thread_client)
205 .then([&loop, invoke = kj::mv(invoke), req](const kj::Maybe<Thread::Server&>& perhaps) mutable {
206 // Assuming the thread object is found, pass it a pointer to the
207 // `invoke` lambda above which will invoke the function on that
208 // thread.
209 KJ_IF_MAYBE (thread_server, perhaps) {
210 auto& thread = static_cast<ProxyServer<Thread>&>(*thread_server);
211 MP_LOG(loop, Log::Debug)
212 << "IPC server post request #" << req << " {" << thread.m_thread_context.thread_name << "}";
213 return thread.template post<typename ServerContext::CallContext>(std::move(invoke));
214 } else {
215 MP_LOG(loop, Log::Error)
216 << "IPC server error request #" << req << ", missing thread to execute request";
217 throw std::runtime_error("invalid thread handle");
218 }
219 });
220 // Use connection m_canceler object to cancel the result promise if the
221 // connection is destroyed. (By default Cap'n Proto does not cancel requests
222 // on disconnect, since it's possible clients might want to make requests
223 // and immediately disconnect without waiting for results, but not want the
224 // requests to be canceled.)
225 return server.m_context.connection->m_canceler.wrap(kj::mv(result));
226}
227} // namespace mp
228
229#endif // MP_PROXY_TYPE_CONTEXT_H
if(!SetupNetworking())
ArgsManager & args
Definition: bitcoind.cpp:278
Helper class that detects when a promise is canceled.
Definition: util.h:293
Event loop implementation.
Definition: proxy-io.h:239
void sync(Callable &&callable)
Wrapper around EventLoop::post that takes advantage of the fact that callable will not go out of scop...
Definition: proxy-io.h:269
void * m_context
External context pointer.
Definition: proxy-io.h:342
std::function< void()> testing_hook_async_request_done
Hook called on the worker thread just before returning results.
Definition: proxy-io.h:358
std::function< void()> testing_hook_async_request_start
Hook called on the worker thread when it starts to execute an async request.
Definition: proxy-io.h:355
Definition: util.h:171
Functions to serialize / deserialize common bitcoin types.
Definition: common-types.h:57
auto PassField(Priority< 1 >, TypeList< LocalType & >, ServerContext &server_context, Fn &&fn, Args &&... args) -> Require< typename decltype(Accessor::get(server_context.call_context.getParams()))::Calls >
PassField override for callable interface reference arguments.
Definition: proxy-types.h:291
ServerInvokeContext< ProxyServer< Interface >, ::capnp::CallContext< Params, Results > > ServerContext
Definition: proxy-io.h:72
std::tuple< ConnThread, bool > SetThread(GuardedRef< ConnThreads > threads, Connection *connection, const std::function< Thread::Client()> &make_thread)
Definition: proxy.cpp:321
thread_local ThreadContext g_thread_context
Definition: proxy.cpp:41
ConnThreads::iterator ConnThread
Definition: proxy-io.h:654
void CustomBuildField(TypeList< LocalType >, Priority< 1 >, InvokeContext &invoke_context, Value &&value, Output &&output)
Overload multiprocess library's CustomBuildField hook to allow any serializable object to be stored i...
Definition: common-types.h:63
#define MP_LOG(loop,...)
Definition: proxy-io.h:209
ThreadContext & thread_context
Definition: proxy-io.h:36
Connection & connection
Definition: proxy-io.h:31
Function parameter type for prioritizing overloaded function calls that would otherwise be ambiguous.
Definition: util.h:109
Generic utility functions used by capnp code.
Definition: util.h:33
consteval auto _(util::TranslatedLiteral str)
Definition: translation.h:79
assert(!tx.IsCoinBase())