5#ifndef MP_PROXY_TYPE_CONTEXT_H
6#define MP_PROXY_TYPE_CONTEXT_H
// Fragment of a function template that fills in a Context::Builder output:
// the enable_if below only admits an `output` whose get() returns
// Context::Builder. The visible statements store two thread handles in the
// built context ("thread" and "callbackThread").
// NOTE(review): this listing is incomplete -- the function name, most of the
// parameter list and the braces are not visible, and the leading numbers look
// like line numbers fused in by extraction. Confirm against the real header.
12template <
typename Output>
// SFINAE guard: this overload participates only for Context::Builder outputs.
17 typename std::enable_if<std::is_same<
decltype(output.get()), Context::Builder>::value>::type* enable =
nullptr)
// Trailing arguments of a call whose start is not visible here (presumably
// SetThread, which is called with the same argument shape elsewhere in this
// file -- confirm). The lambda heap-allocates a ProxyServer<Thread> for
// thread_context with a default-constructed std::thread and adds it to
// connection.m_threads.
28 thread_context.callback_threads, thread_context.waiter->m_mutex, &connection,
29 [&] { return connection.m_threads.add(kj::heap<ProxyServer<Thread>>(thread_context, std::thread{})); })};
// Helper lambda: creates a request through connection.m_thread_map, sets its
// name to thread_context.thread_name, sends it and returns getResult().
34 auto make_request_thread{[&]{
41 auto request = connection.m_thread_map.makeThreadRequest();
42 request.setName(thread_context.thread_name);
43 return request.send().getResult();
// Trailing arguments of a second call (start not visible): same mutex and
// connection as above, but operating on request_threads and passing
// make_request_thread as the last argument.
46 thread_context.request_threads, thread_context.waiter->m_mutex,
47 &connection, make_request_thread)};
// Initialize the output and copy the m_client of each entry into it.
// request_thread / callback_thread are declared on lines not shown here.
49 auto context = output.init();
50 context.setThread(request_thread->second.m_client);
51 context.setCallbackThread(callback_thread->second.m_client);
// PassField overload for mp.Context arguments: selected when
// Accessor::get(params) yields a Context::Reader, and returns a
// kj::Promise<ServerContext::CallContext>.
// Visible flow: create a promise/fulfiller pair, package the call into a
// deferred callable (referred to as `invoke` below), look up the local thread
// server named by the Context argument, post `invoke` to that thread's waiter,
// and finally continue with the promise that `invoke` fulfills or rejects.
// NOTE(review): this listing is incomplete -- the function name line, braces
// and several if/else keywords are not visible, and the leading numbers look
// like line numbers fused in by extraction. Confirm against the real header.
56template <
typename Accessor,
typename ServerContext,
typename Fn,
typename... Args>
// SFINAE guard + return type: enabled only for Context::Reader fields.
58 typename std::enable_if<
59 std::is_same<
decltype(Accessor::get(server_context.call_context.getParams())), Context::Reader>::value,
60 kj::Promise<typename ServerContext::CallContext>>::type
// Read the Context argument now; it is used at the end of the function to
// find the thread that should run the call.
62 const auto& params = server_context.call_context.getParams();
63 Context::Reader context_arg = Accessor::get(params);
// Promise/fulfiller pair: the fulfiller is moved into the deferred callable,
// the promise is handed back through the final .then() below.
64 auto future = kj::newPromiseAndFulfiller<typename ServerContext::CallContext>();
65 auto& server = server_context.proxy_server;
66 int req = server_context.req;
// Capture list of the deferred callable (its declaration line is not visible).
// fulfiller and call_context are moved in; server is captured by reference;
// req, fn and args are captured by copy.
68 [fulfiller = kj::mv(future.fulfiller),
69 call_context = kj::mv(server_context.call_context), &server, req, fn,
args...]()
mutable {
// Re-derive params/context_arg from the call_context now owned by the closure
// and rebuild a ServerContext around it (these shadow the outer names).
70 const auto& params = call_context.getParams();
71 Context::Reader context_arg = Accessor::get(params);
72 ServerContext server_context{server, call_context, req};
73 bool disconnected{false};
// Associate this connection with the Context's callback thread in the
// request_threads map of g_thread_context. `inserted` presumably reports
// whether SetThread added a new entry -- confirm against SetThread.
92 auto& thread_context = g_thread_context;
93 auto& request_threads = thread_context.request_threads;
94 auto [request_thread, inserted]{SetThread(
95 request_threads, thread_context.waiter->m_mutex,
96 server.m_context.connection,
97 [&] { return context_arg.getCallbackThread(); })};
104 const bool erase_thread{inserted};
// The waiter mutex is held while request_threads is inspected/modified.
// NOTE(review): the condition choosing between the two assignments below is
// not visible in this listing (presumably erase_thread). Assuming map-like
// erase/count semantics, either one sets `disconnected` to true when the
// connection key is no longer present in request_threads.
106 std::unique_lock<std::mutex> lock(thread_context.waiter->m_mutex);
117 disconnected = !request_threads.erase(server.m_context.connection);
119 disconnected = !request_threads.count(server.m_context.connection);
// Run the wrapped call with the rebuilt server_context and the captured args.
122 fn.invoke(server_context,
args...);
// Report the outcome through m_loop.sync (presumably runs the lambda on the
// event loop thread -- confirm). On success the promise is fulfilled with the
// call_context; if runCatchingExceptions yields an exception the promise is
// rejected with it. In both paths the fulfiller is first moved into a local,
// so it goes out of scope at the end of the sync lambda.
134 KJ_IF_MAYBE(exception, kj::runCatchingExceptions([&]() {
135 server.m_context.connection->m_loop.sync([&] {
136 auto fulfiller_dispose = kj::mv(fulfiller);
137 fulfiller_dispose->fulfill(kj::mv(call_context));
141 server.m_context.connection->m_loop.sync([&]() {
142 auto fulfiller_dispose = kj::mv(fulfiller);
143 fulfiller_dispose->reject(kj::mv(*exception));
// Resolve the Thread handle from the Context argument to a local server
// object; `perhaps` is empty when no local server is found.
151 auto thread_client = context_arg.getThread();
152 return server.m_context.connection->m_threads.getLocalServer(thread_client)
153 .then([&server, invoke, req](
const kj::Maybe<Thread::Server&>& perhaps) {
// Local thread found: log the request and post `invoke` to that thread's waiter.
157 KJ_IF_MAYBE (thread_server, perhaps) {
158 const auto& thread =
static_cast<ProxyServer<Thread>&
>(*thread_server);
159 server.m_context.connection->m_loop.log()
160 <<
"IPC server post request #" << req <<
" {" << thread.m_thread_context.thread_name <<
"}";
161 thread.m_thread_context.waiter->post(std::move(invoke));
// Otherwise (the else keyword is not visible in this listing): log the error
// and fail the call with std::runtime_error.
163 server.m_context.connection->m_loop.log()
164 <<
"IPC server error request #" << req <<
", missing thread to execute request";
165 throw std::runtime_error(
"invalid thread handle");
// After the post step, continue with the promise created above so the result
// becomes available once `invoke` has fulfilled or rejected it.
169 .then([invoke_wait = kj::mv(future.promise)]()
mutable {
return kj::mv(invoke_wait); });
Functions to serialize / deserialize common bitcoin types.
auto PassField(Priority< 1 >, TypeList<>, ServerContext &server_context, const Fn &fn, Args &&... args) -> typename std::enable_if< std::is_same< decltype(Accessor::get(server_context.call_context.getParams())), Context::Reader >::value, kj::Promise< typename ServerContext::CallContext > >::type
PassField override for mp.Context arguments.
ServerInvokeContext< ProxyServer< Interface >, ::capnp::CallContext< Params, Results > > ServerContext
AsyncCallable< std::remove_reference_t< Callable > > MakeAsyncCallable(Callable &&callable)
Construct AsyncCallable object.
std::tuple< ConnThread, bool > SetThread(ConnThreads &threads, std::mutex &mutex, Connection *connection, const std::function< Thread::Client()> &make_thread)
void CustomBuildField(TypeList< LocalType >, Priority< 1 >, InvokeContext &invoke_context, Value &&value, Output &&output)
Overload multiprocess library's CustomBuildField hook to allow any serializable object to be stored i...
ThreadContext & thread_context
Function parameter type for prioritizing overloaded function calls that would otherwise be ambiguous.
Generic utility functions used by capnp code.
consteval auto _(util::TranslatedLiteral str)