Bitcoin Core 29.99.0
P2P Digital Currency
type-context.h
Go to the documentation of this file.
1// Copyright (c) The Bitcoin Core developers
2// Distributed under the MIT software license, see the accompanying
3// file COPYING or http://www.opensource.org/licenses/mit-license.php.
4
5#ifndef MP_PROXY_TYPE_CONTEXT_H
6#define MP_PROXY_TYPE_CONTEXT_H
7
8#include <mp/proxy-io.h>
9#include <mp/util.h>
10
11namespace mp {
12template <typename Output>
15 ClientInvokeContext& invoke_context,
16 Output&& output,
17 typename std::enable_if<std::is_same<decltype(output.get()), Context::Builder>::value>::type* enable = nullptr)
18{
19 auto& connection = invoke_context.connection;
20 auto& thread_context = invoke_context.thread_context;
21
22 // Create local Thread::Server object corresponding to the current thread
23 // and pass a Thread::Client reference to it in the Context.callbackThread
24 // field so the function being called can make callbacks to this thread.
25 // Also store the Thread::Client reference in the callback_threads map so
26 // future calls over this connection can reuse it.
27 auto [callback_thread, _]{SetThread(
28 thread_context.callback_threads, thread_context.waiter->m_mutex, &connection,
29 [&] { return connection.m_threads.add(kj::heap<ProxyServer<Thread>>(thread_context, std::thread{})); })};
30
31 // Call remote ThreadMap.makeThread function so server will create a
32 // dedicated worker thread to run function calls from this thread. Store the
33 // Thread::Client reference it returns in the request_threads map.
34 auto make_request_thread{[&]{
35 // This code will only run if an IPC client call is being made for the
36 // first time on this thread. After the first call, subsequent calls
37 // will use the existing request thread. This code will also never run at
38 // all if the current thread is a request thread created for a different
39 // IPC client, because in that case PassField code (below) will have set
40 // request_thread to point to the calling thread.
41 auto request = connection.m_thread_map.makeThreadRequest();
42 request.setName(thread_context.thread_name);
43 return request.send().getResult(); // Nonblocking due to capnp request pipelining.
44 }};
45 auto [request_thread, _1]{SetThread(
46 thread_context.request_threads, thread_context.waiter->m_mutex,
47 &connection, make_request_thread)};
48
49 auto context = output.init();
50 context.setThread(request_thread->second.m_client);
51 context.setCallbackThread(callback_thread->second.m_client);
52}
53
56template <typename Accessor, typename ServerContext, typename Fn, typename... Args>
57auto PassField(Priority<1>, TypeList<>, ServerContext& server_context, const Fn& fn, Args&&... args) ->
58 typename std::enable_if<
59 std::is_same<decltype(Accessor::get(server_context.call_context.getParams())), Context::Reader>::value,
60 kj::Promise<typename ServerContext::CallContext>>::type
61{
62 const auto& params = server_context.call_context.getParams();
63 Context::Reader context_arg = Accessor::get(params);
64 auto future = kj::newPromiseAndFulfiller<typename ServerContext::CallContext>();
65 auto& server = server_context.proxy_server;
66 int req = server_context.req;
67 auto invoke = [fulfiller = kj::mv(future.fulfiller),
68 call_context = kj::mv(server_context.call_context), &server, req, fn, args...]() mutable {
69 const auto& params = call_context.getParams();
70 Context::Reader context_arg = Accessor::get(params);
71 ServerContext server_context{server, call_context, req};
72 {
73 // Before invoking the function, store a reference to the
74 // callbackThread provided by the client in the
75 // thread_local.request_threads map. This way, if this
76 // server thread needs to execute any RPCs that call back to
77 // the client, they will happen on the same client thread
78 // that is waiting for this function, just like what would
79 // happen if this were a normal function call made on the
80 // local stack.
81 //
82 // If the request_threads map already has an entry for this
83 // connection, it will be left unchanged, and it indicates
84 // that the current thread is an RPC client thread which is
85 // in the middle of an RPC call, and the current RPC call is
86 // a nested call from the remote thread handling that RPC
87 // call. In this case, the callbackThread value should point
88 // to the same thread already in the map, so there is no
89 // need to update the map.
90 auto& thread_context = g_thread_context;
91 auto& request_threads = thread_context.request_threads;
92 auto [request_thread, inserted]{SetThread(
93 request_threads, thread_context.waiter->m_mutex,
94 server.m_context.connection,
95 [&] { return context_arg.getCallbackThread(); })};
96
97 // If an entry was inserted into the requests_threads map,
98 // remove it after calling fn.invoke. If an entry was not
99 // inserted, one already existed, meaning this must be a
100 // recursive call (IPC call calling back to the caller which
101 // makes another IPC call), so avoid modifying the map.
102 const bool erase_thread{inserted};
103 KJ_DEFER(if (erase_thread) {
104 std::unique_lock<std::mutex> lock(thread_context.waiter->m_mutex);
105 // Call erase here with a Connection* argument instead
106 // of an iterator argument, because the `request_thread`
107 // iterator may be invalid if the connection is closed
108 // during this function call. More specifically, the
109 // iterator may be invalid because SetThread adds a
110 // cleanup callback to the Connection destructor that
111 // erases the thread from the map, and also because the
112 // ProxyServer<Thread> destructor calls
113 // request_threads.clear().
114 request_threads.erase(server.m_context.connection);
115 });
116 fn.invoke(server_context, args...);
117 }
118 KJ_IF_MAYBE(exception, kj::runCatchingExceptions([&]() {
119 server.m_context.loop->sync([&] {
120 auto fulfiller_dispose = kj::mv(fulfiller);
121 fulfiller_dispose->fulfill(kj::mv(call_context));
122 });
123 }))
124 {
125 server.m_context.loop->sync([&]() {
126 auto fulfiller_dispose = kj::mv(fulfiller);
127 fulfiller_dispose->reject(kj::mv(*exception));
128 });
129 }
130 };
131
132 // Lookup Thread object specified by the client. The specified thread should
133 // be a local Thread::Server object, but it needs to be looked up
134 // asynchronously with getLocalServer().
135 auto thread_client = context_arg.getThread();
136 return server.m_context.connection->m_threads.getLocalServer(thread_client)
137 .then([&server, invoke = kj::mv(invoke), req](const kj::Maybe<Thread::Server&>& perhaps) mutable {
138 // Assuming the thread object is found, pass it a pointer to the
139 // `invoke` lambda above which will invoke the function on that
140 // thread.
141 KJ_IF_MAYBE (thread_server, perhaps) {
142 const auto& thread = static_cast<ProxyServer<Thread>&>(*thread_server);
143 server.m_context.loop->log()
144 << "IPC server post request #" << req << " {" << thread.m_thread_context.thread_name << "}";
145 thread.m_thread_context.waiter->post(std::move(invoke));
146 } else {
147 server.m_context.loop->log()
148 << "IPC server error request #" << req << ", missing thread to execute request";
149 throw std::runtime_error("invalid thread handle");
150 }
151 })
152 // Wait for the invocation to finish before returning to the caller.
153 .then([invoke_wait = kj::mv(future.promise)]() mutable { return kj::mv(invoke_wait); });
154}
155} // namespace mp
156
157#endif // MP_PROXY_TYPE_CONTEXT_H
ArgsManager & args
Definition: bitcoind.cpp:277
Functions to serialize / deserialize common bitcoin types.
Definition: common-types.h:57
auto PassField(Priority< 1 >, TypeList<>, ServerContext &server_context, const Fn &fn, Args &&... args) -> typename std::enable_if< std::is_same< decltype(Accessor::get(server_context.call_context.getParams())), Context::Reader >::value, kj::Promise< typename ServerContext::CallContext > >::type
PassField override for mp.Context arguments.
Definition: type-context.h:57
ServerInvokeContext< ProxyServer< Interface >, ::capnp::CallContext< Params, Results > > ServerContext
Definition: proxy-io.h:59
thread_local ThreadContext g_thread_context
Definition: proxy.cpp:40
std::tuple< ConnThread, bool > SetThread(ConnThreads &threads, std::mutex &mutex, Connection *connection, const std::function< Thread::Client()> &make_thread)
Definition: proxy.cpp:308
void CustomBuildField(TypeList< LocalType >, Priority< 1 >, InvokeContext &invoke_context, Value &&value, Output &&output)
Overload multiprocess library's CustomBuildField hook to allow any serializable object to be stored i...
Definition: common-types.h:63
ThreadContext & thread_context
Definition: proxy-io.h:36
Connection & connection
Definition: proxy-io.h:31
Function parameter type for prioritizing overloaded function calls that would otherwise be ambiguous.
Definition: util.h:108
ThreadContext & m_thread_context
Definition: proxy-io.h:90
ConnThreads request_threads
When client is making a request to a server, this is the thread argument it passes in the request,...
Definition: proxy-io.h:568
std::string thread_name
Identifying string for debug.
Definition: proxy-io.h:548
Generic utility functions used by capnp code.
Definition: util.h:32
consteval auto _(util::TranslatedLiteral str)
Definition: translation.h:79