Bitcoin Core 30.99.0
P2P Digital Currency
type-context.h
Go to the documentation of this file.
1// Copyright (c) The Bitcoin Core developers
2// Distributed under the MIT software license, see the accompanying
3// file COPYING or http://www.opensource.org/licenses/mit-license.php.
4
5#ifndef MP_PROXY_TYPE_CONTEXT_H
6#define MP_PROXY_TYPE_CONTEXT_H
7
8#include <mp/proxy-io.h>
9#include <mp/util.h>
10
11#include <kj/string.h>
12
13namespace mp {
14template <typename Output>
17 ClientInvokeContext& invoke_context,
18 Output&& output,
19 typename std::enable_if<std::is_same<decltype(output.get()), Context::Builder>::value>::type* enable = nullptr)
20{
21 auto& connection = invoke_context.connection;
22 auto& thread_context = invoke_context.thread_context;
23
24 // Create local Thread::Server object corresponding to the current thread
25 // and pass a Thread::Client reference to it in the Context.callbackThread
26 // field so the function being called can make callbacks to this thread.
27 // Also store the Thread::Client reference in the callback_threads map so
28 // future calls over this connection can reuse it.
29 auto [callback_thread, _]{SetThread(
30 GuardedRef{thread_context.waiter->m_mutex, thread_context.callback_threads}, &connection,
31 [&] { return connection.m_threads.add(kj::heap<ProxyServer<Thread>>(connection, thread_context, std::thread{})); })};
32
33 // Call remote ThreadMap.makeThread function so server will create a
34 // dedicated worker thread to run function calls from this thread. Store the
35 // Thread::Client reference it returns in the request_threads map.
36 auto make_request_thread{[&]{
37 // This code will only run if an IPC client call is being made for the
38 // first time on this thread. After the first call, subsequent calls
39 // will use the existing request thread. This code will also never run at
40 // all if the current thread is a request thread created for a different
41 // IPC client, because in that case PassField code (below) will have set
42 // request_thread to point to the calling thread.
43 auto request = connection.m_thread_map.makeThreadRequest();
44 request.setName(thread_context.thread_name);
45 return request.send().getResult(); // Nonblocking due to capnp request pipelining.
46 }};
47 auto [request_thread, _1]{SetThread(
48 GuardedRef{thread_context.waiter->m_mutex, thread_context.request_threads},
49 &connection, make_request_thread)};
50
51 auto context = output.init();
52 context.setThread(request_thread->second->m_client);
53 context.setCallbackThread(callback_thread->second->m_client);
54}
55
58template <typename Accessor, typename ServerContext, typename Fn, typename... Args>
59auto PassField(Priority<1>, TypeList<>, ServerContext& server_context, const Fn& fn, Args&&... args) ->
60 typename std::enable_if<
61 std::is_same<decltype(Accessor::get(server_context.call_context.getParams())), Context::Reader>::value,
62 kj::Promise<typename ServerContext::CallContext>>::type
63{
64 const auto& params = server_context.call_context.getParams();
65 Context::Reader context_arg = Accessor::get(params);
66 auto& server = server_context.proxy_server;
67 int req = server_context.req;
68 // Keep a reference to the ProxyServer instance by assigning it to the self
69 // variable. ProxyServer instances are reference-counted and if the client
70 // drops its reference and the IPC call is canceled, this variable keeps the
71 // instance alive until the method finishes executing. The self variable
72 // needs to be destroyed on the event loop thread so it is freed in a sync()
73 // call below.
74 auto self = server.thisCap();
75 auto invoke = [self = kj::mv(self), call_context = kj::mv(server_context.call_context), &server, req, fn, args...](CancelMonitor& cancel_monitor) mutable {
76 MP_LOG(*server.m_context.loop, Log::Debug) << "IPC server executing request #" << req;
77 const auto& params = call_context.getParams();
78 Context::Reader context_arg = Accessor::get(params);
79 ServerContext server_context{server, call_context, req};
80 {
81 // Before invoking the function, store a reference to the
82 // callbackThread provided by the client in the
83 // thread_local.request_threads map. This way, if this
84 // server thread needs to execute any RPCs that call back to
85 // the client, they will happen on the same client thread
86 // that is waiting for this function, just like what would
87 // happen if this were a normal function call made on the
88 // local stack.
89 //
90 // If the request_threads map already has an entry for this
91 // connection, it will be left unchanged, and it indicates
92 // that the current thread is an RPC client thread which is
93 // in the middle of an RPC call, and the current RPC call is
94 // a nested call from the remote thread handling that RPC
95 // call. In this case, the callbackThread value should point
96 // to the same thread already in the map, so there is no
97 // need to update the map.
98 auto& thread_context = g_thread_context;
99 auto& request_threads = thread_context.request_threads;
100 ConnThread request_thread;
101 bool inserted{false};
102 Mutex cancel_mutex;
103 Lock cancel_lock{cancel_mutex};
104 server_context.cancel_lock = &cancel_lock;
105 server.m_context.loop->sync([&] {
106 // Detect request being canceled before it executes.
107 if (cancel_monitor.m_canceled) {
108 server_context.request_canceled = true;
109 return;
110 }
111 // Detect request being canceled while it executes.
112 assert(!cancel_monitor.m_on_cancel);
113 cancel_monitor.m_on_cancel = [&server, &server_context, &cancel_mutex, req]() {
114 MP_LOG(*server.m_context.loop, Log::Info) << "IPC server request #" << req << " canceled while executing.";
115 // Lock cancel_mutex here to block the event loop
116 // thread and prevent it from deleting the request's
117 // params and response structs while the execution
118 // thread is accessing them. Because this lock is
119 // released before the event loop thread does delete
120 // the structs, the mutex does not provide any
121 // protection from the event loop deleting the
122 // structs _before_ the execution thread acquires
123 // it. So in addition to locking the mutex, the
124 // execution thread always checks request_canceled
125 // as well before accessing the structs.
126 Lock cancel_lock{cancel_mutex};
127 server_context.request_canceled = true;
128 };
129 // Update requests_threads map if not canceled.
130 std::tie(request_thread, inserted) = SetThread(
131 GuardedRef{thread_context.waiter->m_mutex, request_threads}, server.m_context.connection,
132 [&] { return context_arg.getCallbackThread(); });
133 });
134
135 // If an entry was inserted into the request_threads map,
136 // remove it after calling fn.invoke. If an entry was not
137 // inserted, one already existed, meaning this must be a
138 // recursive call (IPC call calling back to the caller which
139 // makes another IPC call), so avoid modifying the map.
140 const bool erase_thread{inserted};
141 KJ_DEFER(
142 // Release the cancel lock before calling loop->sync and
143 // waiting for the event loop thread, because if a
144 // cancellation happened, it needs to run the on_cancel
145 // callback above. It's safe to release cancel_lock at
146 // this point because the fn.invoke() call below will be
147 // finished and no longer accessing the params or
148 // results structs.
149 cancel_lock.m_lock.unlock();
150 // Erase the request_threads entry on the event loop
151 // thread with loop->sync(), so if the connection is
152 // broken there is not a race between this thread and
153 // the disconnect handler trying to destroy the thread
154 // client object.
155 server.m_context.loop->sync([&] {
156 auto self_dispose{kj::mv(self)};
157 if (erase_thread) {
158 // Look up the thread again without using existing
159 // iterator since entry may no longer be there after
160 // a disconnect. Destroy node after releasing
161 // Waiter::m_mutex, so the ProxyClient<Thread>
162 // destructor is able to use EventLoop::mutex
163 // without violating lock order.
164 ConnThreads::node_type removed;
165 {
166 Lock lock(thread_context.waiter->m_mutex);
167 removed = request_threads.extract(server.m_context.connection);
168 }
169 }
170 });
171 );
172 if (server_context.request_canceled) {
173 MP_LOG(*server.m_context.loop, Log::Info) << "IPC server request #" << req << " canceled before it could be executed";
174 } else KJ_IF_MAYBE(exception, kj::runCatchingExceptions([&]{
175 try {
176 fn.invoke(server_context, args...);
177 } catch (const InterruptException& e) {
178 MP_LOG(*server.m_context.loop, Log::Info) << "IPC server request #" << req << " interrupted (" << e.what() << ")";
179 }
180 })) {
181 MP_LOG(*server.m_context.loop, Log::Error) << "IPC server request #" << req << " uncaught exception (" << kj::str(*exception).cStr() << ")";
182 throw kj::mv(*exception);
183 }
184 // End of scope: if KJ_DEFER was reached, it runs here
185 }
186 return call_context;
187 };
188
189 // Lookup Thread object specified by the client. The specified thread should
190 // be a local Thread::Server object, but it needs to be looked up
191 // asynchronously with getLocalServer().
192 auto thread_client = context_arg.getThread();
193 auto result = server.m_context.connection->m_threads.getLocalServer(thread_client)
194 .then([&server, invoke = kj::mv(invoke), req](const kj::Maybe<Thread::Server&>& perhaps) mutable {
195 // Assuming the thread object is found, pass it a pointer to the
196 // `invoke` lambda above which will invoke the function on that
197 // thread.
198 KJ_IF_MAYBE (thread_server, perhaps) {
199 auto& thread = static_cast<ProxyServer<Thread>&>(*thread_server);
200 MP_LOG(*server.m_context.loop, Log::Debug)
201 << "IPC server post request #" << req << " {" << thread.m_thread_context.thread_name << "}";
202 return thread.template post<typename ServerContext::CallContext>(std::move(invoke));
203 } else {
204 MP_LOG(*server.m_context.loop, Log::Error)
205 << "IPC server error request #" << req << ", missing thread to execute request";
206 throw std::runtime_error("invalid thread handle");
207 }
208 });
209 // Use connection m_canceler object to cancel the result promise if the
210 // connection is destroyed. (By default Cap'n Proto does not cancel requests
211 // on disconnect, since it's possible clients might want to make requests
212 // and immediately disconnect without waiting for results, but not want the
213 // requests to be canceled.)
214 return server.m_context.connection->m_canceler.wrap(kj::mv(result));
215}
216} // namespace mp
217
218#endif // MP_PROXY_TYPE_CONTEXT_H
if(!SetupNetworking())
ArgsManager & args
Definition: bitcoind.cpp:277
Helper class that detects when a promise is canceled.
Definition: util.h:293
Definition: util.h:171
Functions to serialize / deserialize common bitcoin types.
Definition: common-types.h:57
auto PassField(Priority< 1 >, TypeList< LocalType & >, ServerContext &server_context, Fn &&fn, Args &&... args) -> Require< typename decltype(Accessor::get(server_context.call_context.getParams()))::Calls >
PassField override for callable interface reference arguments.
Definition: proxy-types.h:241
ServerInvokeContext< ProxyServer< Interface >, ::capnp::CallContext< Params, Results > > ServerContext
Definition: proxy-io.h:72
std::tuple< ConnThread, bool > SetThread(GuardedRef< ConnThreads > threads, Connection *connection, const std::function< Thread::Client()> &make_thread)
Definition: proxy.cpp:321
thread_local ThreadContext g_thread_context
Definition: proxy.cpp:41
ConnThreads::iterator ConnThread
Definition: proxy-io.h:638
void CustomBuildField(TypeList< LocalType >, Priority< 1 >, InvokeContext &invoke_context, Value &&value, Output &&output)
Overload multiprocess library's CustomBuildField hook to allow any serializable object to be stored i...
Definition: common-types.h:63
#define MP_LOG(loop,...)
Definition: proxy-io.h:209
ThreadContext & thread_context
Definition: proxy-io.h:36
Exception thrown from code executing an IPC call that is interrupted.
Definition: util.h:278
const char * what() const noexcept override
Definition: util.h:280
Connection & connection
Definition: proxy-io.h:31
Function parameter type for prioritizing overloaded function calls that would otherwise be ambiguous.
Definition: util.h:109
Generic utility functions used by capnp code.
Definition: util.h:33
consteval auto _(util::TranslatedLiteral str)
Definition: translation.h:79
assert(!tx.IsCoinBase())