Bitcoin Core 29.99.0
P2P Digital Currency
type-context.h
Go to the documentation of this file.
1// Copyright (c) 2025 The Bitcoin Core developers
2// Distributed under the MIT software license, see the accompanying
3// file COPYING or http://www.opensource.org/licenses/mit-license.php.
4
5#ifndef MP_PROXY_TYPE_CONTEXT_H
6#define MP_PROXY_TYPE_CONTEXT_H
7
8#include <mp/proxy-io.h>
9#include <mp/util.h>
10
11namespace mp {
12template <typename Output>
15 ClientInvokeContext& invoke_context,
16 Output&& output,
17 typename std::enable_if<std::is_same<decltype(output.get()), Context::Builder>::value>::type* enable = nullptr)
18{
19 auto& connection = invoke_context.connection;
20 auto& thread_context = invoke_context.thread_context;
21
22 // Create local Thread::Server object corresponding to the current thread
23 // and pass a Thread::Client reference to it in the Context.callbackThread
24 // field so the function being called can make callbacks to this thread.
25 // Also store the Thread::Client reference in the callback_threads map so
26 // future calls over this connection can reuse it.
27 auto [callback_thread, _]{SetThread(
28 thread_context.callback_threads, thread_context.waiter->m_mutex, &connection,
29 [&] { return connection.m_threads.add(kj::heap<ProxyServer<Thread>>(thread_context, std::thread{})); })};
30
31 // Call remote ThreadMap.makeThread function so server will create a
32 // dedicated worker thread to run function calls from this thread. Store the
33 // Thread::Client reference it returns in the request_threads map.
34 auto make_request_thread{[&]{
35 // This code will only run if an IPC client call is being made for the
36 // first time on this thread. After the first call, subsequent calls
37 // will use the existing request thread. This code will also never run at
38 // all if the current thread is a request thread created for a different
39 // IPC client, because in that case PassField code (below) will have set
40 // request_thread to point to the calling thread.
41 auto request = connection.m_thread_map.makeThreadRequest();
42 request.setName(thread_context.thread_name);
43 return request.send().getResult(); // Nonblocking due to capnp request pipelining.
44 }};
45 auto [request_thread, _1]{SetThread(
46 thread_context.request_threads, thread_context.waiter->m_mutex,
47 &connection, make_request_thread)};
48
49 auto context = output.init();
50 context.setThread(request_thread->second.m_client);
51 context.setCallbackThread(callback_thread->second.m_client);
52}
53
56template <typename Accessor, typename ServerContext, typename Fn, typename... Args>
57auto PassField(Priority<1>, TypeList<>, ServerContext& server_context, const Fn& fn, Args&&... args) ->
58 typename std::enable_if<
59 std::is_same<decltype(Accessor::get(server_context.call_context.getParams())), Context::Reader>::value,
60 kj::Promise<typename ServerContext::CallContext>>::type
61{
62 const auto& params = server_context.call_context.getParams();
63 Context::Reader context_arg = Accessor::get(params);
64 auto future = kj::newPromiseAndFulfiller<typename ServerContext::CallContext>();
65 auto& server = server_context.proxy_server;
66 int req = server_context.req;
67 auto invoke = MakeAsyncCallable(
68 [fulfiller = kj::mv(future.fulfiller),
69 call_context = kj::mv(server_context.call_context), &server, req, fn, args...]() mutable {
70 const auto& params = call_context.getParams();
71 Context::Reader context_arg = Accessor::get(params);
72 ServerContext server_context{server, call_context, req};
73 bool disconnected{false};
74 {
75 // Before invoking the function, store a reference to the
76 // callbackThread provided by the client in the
77 // thread_local.request_threads map. This way, if this
78 // server thread needs to execute any RPCs that call back to
79 // the client, they will happen on the same client thread
80 // that is waiting for this function, just like what would
81 // happen if this were a normal function call made on the
82 // local stack.
83 //
84 // If the request_threads map already has an entry for this
85 // connection, it will be left unchanged, and it indicates
86 // that the current thread is an RPC client thread which is
87 // in the middle of an RPC call, and the current RPC call is
88 // a nested call from the remote thread handling that RPC
89 // call. In this case, the callbackThread value should point
90 // to the same thread already in the map, so there is no
91 // need to update the map.
92 auto& thread_context = g_thread_context;
93 auto& request_threads = thread_context.request_threads;
94 auto [request_thread, inserted]{SetThread(
95 request_threads, thread_context.waiter->m_mutex,
96 server.m_context.connection,
97 [&] { return context_arg.getCallbackThread(); })};
98
99 // If an entry was inserted into the requests_threads map,
100 // remove it after calling fn.invoke. If an entry was not
101 // inserted, one already existed, meaning this must be a
102 // recursive call (IPC call calling back to the caller which
103 // makes another IPC call), so avoid modifying the map.
104 const bool erase_thread{inserted};
105 KJ_DEFER({
106 std::unique_lock<std::mutex> lock(thread_context.waiter->m_mutex);
107 // Call erase here with a Connection* argument instead
108 // of an iterator argument, because the `request_thread`
109 // iterator may be invalid if the connection is closed
110 // during this function call. More specifically, the
111 // iterator may be invalid because SetThread adds a
112 // cleanup callback to the Connection destructor that
113 // erases the thread from the map, and also because the
114 // ProxyServer<Thread> destructor calls
115 // request_threads.clear().
116 if (erase_thread) {
117 disconnected = !request_threads.erase(server.m_context.connection);
118 } else {
119 disconnected = !request_threads.count(server.m_context.connection);
120 }
121 });
122 fn.invoke(server_context, args...);
123 }
124 if (disconnected) {
125 // If disconnected is true, the Connection object was
126 // destroyed during the method call. Deal with this by
127 // returning without ever fulfilling the promise, which will
128 // cause the ProxyServer object to leak. This is not ideal,
129 // but fixing the leak will require nontrivial code changes
130 // because there is a lot of code assuming ProxyServer
131 // objects are destroyed before Connection objects.
132 return;
133 }
134 KJ_IF_MAYBE(exception, kj::runCatchingExceptions([&]() {
135 server.m_context.connection->m_loop.sync([&] {
136 auto fulfiller_dispose = kj::mv(fulfiller);
137 fulfiller_dispose->fulfill(kj::mv(call_context));
138 });
139 }))
140 {
141 server.m_context.connection->m_loop.sync([&]() {
142 auto fulfiller_dispose = kj::mv(fulfiller);
143 fulfiller_dispose->reject(kj::mv(*exception));
144 });
145 }
146 });
147
148 // Lookup Thread object specified by the client. The specified thread should
149 // be a local Thread::Server object, but it needs to be looked up
150 // asynchronously with getLocalServer().
151 auto thread_client = context_arg.getThread();
152 return server.m_context.connection->m_threads.getLocalServer(thread_client)
153 .then([&server, invoke, req](const kj::Maybe<Thread::Server&>& perhaps) {
154 // Assuming the thread object is found, pass it a pointer to the
155 // `invoke` lambda above which will invoke the function on that
156 // thread.
157 KJ_IF_MAYBE (thread_server, perhaps) {
158 const auto& thread = static_cast<ProxyServer<Thread>&>(*thread_server);
159 server.m_context.connection->m_loop.log()
160 << "IPC server post request #" << req << " {" << thread.m_thread_context.thread_name << "}";
161 thread.m_thread_context.waiter->post(std::move(invoke));
162 } else {
163 server.m_context.connection->m_loop.log()
164 << "IPC server error request #" << req << ", missing thread to execute request";
165 throw std::runtime_error("invalid thread handle");
166 }
167 })
168 // Wait for the invocation to finish before returning to the caller.
169 .then([invoke_wait = kj::mv(future.promise)]() mutable { return kj::mv(invoke_wait); });
170}
171} // namespace mp
172
173#endif // MP_PROXY_TYPE_CONTEXT_H
ArgsManager & args
Definition: bitcoind.cpp:277
Functions to serialize / deserialize common bitcoin types.
Definition: common-types.h:57
auto PassField(Priority< 1 >, TypeList<>, ServerContext &server_context, const Fn &fn, Args &&... args) -> typename std::enable_if< std::is_same< decltype(Accessor::get(server_context.call_context.getParams())), Context::Reader >::value, kj::Promise< typename ServerContext::CallContext > >::type
PassField override for mp.Context arguments.
Definition: type-context.h:57
ServerInvokeContext< ProxyServer< Interface >, ::capnp::CallContext< Params, Results > > ServerContext
Definition: proxy-io.h:56
AsyncCallable< std::remove_reference_t< Callable > > MakeAsyncCallable(Callable &&callable)
Construct AsyncCallable object.
Definition: util.h:184
std::tuple< ConnThread, bool > SetThread(ConnThreads &threads, std::mutex &mutex, Connection *connection, const std::function< Thread::Client()> &make_thread)
Definition: proxy.cpp:288
void CustomBuildField(TypeList< LocalType >, Priority< 1 >, InvokeContext &invoke_context, Value &&value, Output &&output)
Overload multiprocess library's CustomBuildField hook to allow any serializable object to be stored i...
Definition: common-types.h:63
ThreadContext & thread_context
Definition: proxy-io.h:33
Connection & connection
Definition: proxy-io.h:28
Function parameter type for prioritizing overloaded function calls that would otherwise be ambiguous.
Definition: util.h:109
Generic utility functions used by capnp code.
Definition: util.h:33
consteval auto _(util::TranslatedLiteral str)
Definition: translation.h:79