Bitcoin Core 30.99.0
P2P Digital Currency
type-context.h
Go to the documentation of this file.
1// Copyright (c) The Bitcoin Core developers
2// Distributed under the MIT software license, see the accompanying
3// file COPYING or http://www.opensource.org/licenses/mit-license.php.
4
5#ifndef MP_PROXY_TYPE_CONTEXT_H
6#define MP_PROXY_TYPE_CONTEXT_H
7
8#include <mp/proxy-io.h>
9#include <mp/util.h>
10
11namespace mp {
// Client-side hook that fills in the Context struct of an outgoing request.
//
// NOTE(review): the listing lines that carry the function name and its leading
// parameters (13-14) are not visible here; only the trailing parameters are.
// The name is presumably CustomBuildField -- confirm against the real header.
//
// This overload is only selected when `output.get()` has type
// Context::Builder (see the enable_if parameter below). It populates two
// fields of the Context:
//  - thread:         handle taken from thread_context.request_threads for
//                    this connection (created through make_request_thread
//                    when SetThread needs a new entry).
//  - callbackThread: handle taken from thread_context.callback_threads for
//                    this connection.
// Both lookups go through SetThread with the corresponding map wrapped in a
// GuardedRef together with thread_context.waiter->m_mutex.
12template <typename Output>
15 ClientInvokeContext& invoke_context,
16 Output&& output,
17 typename std::enable_if<std::is_same<decltype(output.get()), Context::Builder>::value>::type* enable = nullptr)
18{
19 auto& connection = invoke_context.connection;
20 auto& thread_context = invoke_context.thread_context;
21
22 // Create local Thread::Server object corresponding to the current thread
23 // and pass a Thread::Client reference to it in the Context.callbackThread
24 // field so the function being called can make callbacks to this thread.
25 // Also store the Thread::Client reference in the callback_threads map so
26 // future calls over this connection can reuse it.
 //
 // The ProxyServer<Thread> is constructed with a default-constructed
 // std::thread, i.e. one that has no associated thread of execution. The
 // second element of SetThread's result is bound to `_` and not used.
27 auto [callback_thread, _]{SetThread(
28 GuardedRef{thread_context.waiter->m_mutex, thread_context.callback_threads}, &connection,
29 [&] { return connection.m_threads.add(kj::heap<ProxyServer<Thread>>(thread_context, std::thread{})); })};
30
31 // Call remote ThreadMap.makeThread function so server will create a
32 // dedicated worker thread to run function calls from this thread. Store the
33 // Thread::Client reference it returns in the request_threads map.
34 auto make_request_thread{[&]{
35 // This code will only run if an IPC client call is being made for the
36 // first time on this thread. After the first call, subsequent calls
37 // will use the existing request thread. This code will also never run at
38 // all if the current thread is a request thread created for a different
39 // IPC client, because in that case PassField code (below) will have set
40 // request_thread to point to the calling thread.
41 auto request = connection.m_thread_map.makeThreadRequest();
 // The remote thread is given this thread's thread_name.
42 request.setName(thread_context.thread_name);
43 return request.send().getResult(); // Nonblocking due to capnp request pipelining.
44 }};
 // As above, the second element of the SetThread result (`_1`) is unused.
45 auto [request_thread, _1]{SetThread(
46 GuardedRef{thread_context.waiter->m_mutex, thread_context.request_threads},
47 &connection, make_request_thread)};
48
 // Initialize the Context field and store both thread handles in it. The
 // iterators returned by SetThread point at map entries whose mapped value
 // exposes the handle as m_client.
49 auto context = output.init();
50 context.setThread(request_thread->second->m_client);
51 context.setCallbackThread(callback_thread->second->m_client);
52}
53
// Server-side PassField overload for the Context parameter of an incoming
// call.
//
// Selected only when Accessor::get() on the call parameters yields a
// Context::Reader (see the enable_if in the trailing return type). Instead of
// calling `fn` directly, it packages the call into an `invoke` closure, looks
// up the local Thread object named by Context.thread, and posts the closure to
// that thread's waiter.
//
// Parameters:
//  - server_context: provides proxy_server, call_context and req. Its
//                    call_context is moved into the `invoke` closure.
//  - fn:             callable whose invoke(server_context, args...) performs
//                    the actual call; copied into the closure.
//  - args:           extra arguments forwarded to fn.invoke; copied into the
//                    closure.
//
// Returns a kj::Promise<CallContext> that resolves with the promise created by
// kj::newPromiseAndFulfiller below, i.e. once `invoke` has fulfilled or
// rejected it.
//
// Throws std::runtime_error("invalid thread handle") from inside the promise
// continuation when the thread given by the client is not a local server
// object.
56template <typename Accessor, typename ServerContext, typename Fn, typename... Args>
57auto PassField(Priority<1>, TypeList<>, ServerContext& server_context, const Fn& fn, Args&&... args) ->
58 typename std::enable_if<
59 std::is_same<decltype(Accessor::get(server_context.call_context.getParams())), Context::Reader>::value,
60 kj::Promise<typename ServerContext::CallContext>>::type
61{
62 const auto& params = server_context.call_context.getParams();
63 Context::Reader context_arg = Accessor::get(params);
64 auto future = kj::newPromiseAndFulfiller<typename ServerContext::CallContext>();
65 auto& server = server_context.proxy_server;
66 int req = server_context.req;
 // Closure that is posted to the thread selected by the client (see
 // waiter->post() below). It takes ownership of the fulfiller and of
 // call_context (both moved in), copies req, fn and args, and refers to
 // `server` by reference.
67 auto invoke = [fulfiller = kj::mv(future.fulfiller),
68 call_context = kj::mv(server_context.call_context), &server, req, fn, args...]() mutable {
 // Re-read the parameters from the call_context owned by this closure and
 // build a fresh ServerContext around it for fn.invoke.
69 const auto& params = call_context.getParams();
70 Context::Reader context_arg = Accessor::get(params);
71 ServerContext server_context{server, call_context, req};
72 {
73 // Before invoking the function, store a reference to the
74 // callbackThread provided by the client in the
75 // thread_local.request_threads map. This way, if this
76 // server thread needs to execute any RPCs that call back to
77 // the client, they will happen on the same client thread
78 // that is waiting for this function, just like what would
79 // happen if this were a normal function call made on the
80 // local stack.
81 //
82 // If the request_threads map already has an entry for this
83 // connection, it will be left unchanged, and it indicates
84 // that the current thread is an RPC client thread which is
85 // in the middle of an RPC call, and the current RPC call is
86 // a nested call from the remote thread handling that RPC
87 // call. In this case, the callbackThread value should point
88 // to the same thread already in the map, so there is no
89 // need to update the map.
90 auto& thread_context = g_thread_context;
91 auto& request_threads = thread_context.request_threads;
92 ConnThread request_thread;
93 bool inserted;
 // SetThread is called inside loop->sync(); request_thread and inserted
 // are assigned from its result through the by-reference capture.
94 server.m_context.loop->sync([&] {
95 std::tie(request_thread, inserted) = SetThread(
96 GuardedRef{thread_context.waiter->m_mutex, request_threads}, server.m_context.connection,
97 [&] { return context_arg.getCallbackThread(); });
98 });
99
100 // If an entry was inserted into the request_threads map,
101 // remove it after calling fn.invoke. If an entry was not
102 // inserted, one already existed, meaning this must be a
103 // recursive call (IPC call calling back to the caller which
104 // makes another IPC call), so avoid modifying the map.
105 const bool erase_thread{inserted};
106 KJ_DEFER(if (erase_thread) {
107 // Erase the request_threads entry on the event loop
108 // thread with loop->sync(), so if the connection is
109 // broken there is not a race between this thread and
110 // the disconnect handler trying to destroy the thread
111 // client object.
112 server.m_context.loop->sync([&] {
113 // Look up the thread again without using existing
114 // iterator since entry may no longer be there after
115 // a disconnect. Destroy node after releasing
116 // Waiter::m_mutex, so the ProxyClient<Thread>
117 // destructor is able to use EventLoop::mutex
118 // without violating lock order.
119 ConnThreads::node_type removed;
120 {
121 Lock lock(thread_context.waiter->m_mutex);
122 removed = request_threads.extract(server.m_context.connection);
123 }
 // `removed` goes out of scope here, after `lock` has been
 // released by the inner scope above.
124 });
125 });
 // NOTE(review): fn.invoke() runs before, and outside of, the
 // runCatchingExceptions call below, so only a failure while
 // fulfilling is converted into reject() here. Confirm how exceptions
 // thrown by fn.invoke itself are expected to be handled.
126 fn.invoke(server_context, args...);
127 }
 // Hand the call_context back through the fulfiller inside loop->sync().
 // The fulfiller is first moved into a local so that it is destroyed
 // within the sync() callback. If fulfilling raises an exception caught
 // by kj::runCatchingExceptions, reject the promise with it instead.
128 KJ_IF_MAYBE(exception, kj::runCatchingExceptions([&]() {
129 server.m_context.loop->sync([&] {
130 auto fulfiller_dispose = kj::mv(fulfiller);
131 fulfiller_dispose->fulfill(kj::mv(call_context));
132 });
133 }))
134 {
135 server.m_context.loop->sync([&]() {
136 auto fulfiller_dispose = kj::mv(fulfiller);
137 fulfiller_dispose->reject(kj::mv(*exception));
138 });
139 }
140 };
141
142 // Look up Thread object specified by the client. The specified thread should
143 // be a local Thread::Server object, but it needs to be looked up
144 // asynchronously with getLocalServer().
145 auto thread_client = context_arg.getThread();
146 return server.m_context.connection->m_threads.getLocalServer(thread_client)
147 .then([&server, invoke = kj::mv(invoke), req](const kj::Maybe<Thread::Server&>& perhaps) mutable {
148 // Assuming the thread object is found, pass it a pointer to the
149 // `invoke` lambda above which will invoke the function on that
150 // thread.
151 KJ_IF_MAYBE (thread_server, perhaps) {
152 const auto& thread = static_cast<ProxyServer<Thread>&>(*thread_server);
153 server.m_context.loop->log()
154 << "IPC server post request #" << req << " {" << thread.m_thread_context.thread_name << "}";
155 thread.m_thread_context.waiter->post(std::move(invoke));
156 } else {
 // No local server object matches the handle sent by the client:
 // log the failure and fail the request by throwing.
157 server.m_context.loop->log()
158 << "IPC server error request #" << req << ", missing thread to execute request";
159 throw std::runtime_error("invalid thread handle");
160 }
161 })
162 // Wait for the invocation to finish before returning to the caller.
163 .then([invoke_wait = kj::mv(future.promise)]() mutable { return kj::mv(invoke_wait); });
164}
165} // namespace mp
166
167#endif // MP_PROXY_TYPE_CONTEXT_H
ArgsManager & args
Definition: bitcoind.cpp:282
Definition: util.h:170
Functions to serialize / deserialize common bitcoin types.
Definition: common-types.h:57
auto PassField(Priority< 1 >, TypeList< LocalType & >, ServerContext &server_context, Fn &&fn, Args &&... args) -> Require< typename decltype(Accessor::get(server_context.call_context.getParams()))::Calls >
PassField override for callable interface reference arguments.
Definition: proxy-types.h:241
ServerInvokeContext< ProxyServer< Interface >, ::capnp::CallContext< Params, Results > > ServerContext
Definition: proxy-io.h:59
std::tuple< ConnThread, bool > SetThread(GuardedRef< ConnThreads > threads, Connection *connection, const std::function< Thread::Client()> &make_thread)
Definition: proxy.cpp:316
thread_local ThreadContext g_thread_context
Definition: proxy.cpp:40
ConnThreads::iterator ConnThread
Definition: proxy-io.h:552
void CustomBuildField(TypeList< LocalType >, Priority< 1 >, InvokeContext &invoke_context, Value &&value, Output &&output)
Overload multiprocess library's CustomBuildField hook to allow any serializable object to be stored i...
Definition: common-types.h:63
ThreadContext & thread_context
Definition: proxy-io.h:36
Connection & connection
Definition: proxy-io.h:31
Function parameter type for prioritizing overloaded function calls that would otherwise be ambiguous.
Definition: util.h:108
ThreadContext & m_thread_context
Definition: proxy-io.h:88
std::string thread_name
Identifying string for debug.
Definition: proxy-io.h:575
Generic utility functions used by capnp code.
Definition: util.h:32
consteval auto _(util::TranslatedLiteral str)
Definition: translation.h:79