5#ifndef MP_PROXY_TYPE_CONTEXT_H
6#define MP_PROXY_TYPE_CONTEXT_H
// NOTE(review): this listing appears to be an extract in which original
// source line numbers are fused onto the start of some lines and several
// intermediate lines (including the function name and the heads of two
// calls) are missing. The comments below describe only what is visible.
//
// Function template that fills in a Context struct on the outgoing side of a
// call: it obtains a callback-thread entry and a request-thread entry for the
// connection and stores their clients into the output.
14template <
typename Output>
// Trailing defaulted pointer parameter used purely for overload selection:
// the type only exists when decltype(output.get()) is Context::Builder.
19 typename std::enable_if<std::is_same<
decltype(output.get()), Context::Builder>::value>::type* enable =
nullptr)
// Arguments of a call whose head is not visible here (presumably SetThread,
// given the matching argument shape used elsewhere in this file -- confirm):
// the callback_threads map paired with waiter->m_mutex, the connection, and a
// factory that adds a new ProxyServer<Thread> (constructed with a
// default-constructed std::thread) to connection.m_threads.
30 GuardedRef{thread_context.waiter->m_mutex, thread_context.callback_threads}, &connection,
31 [&] {
return connection.m_threads.add(kj::heap<
ProxyServer<Thread>>(connection, thread_context, std::thread{})); })};
// Factory used for the request-thread entry: builds a makeThreadRequest() on
// connection.m_thread_map, names it after thread_context.thread_name, sends
// it and returns getResult() of the sent request.
36 auto make_request_thread{[&]{
43 auto request = connection.m_thread_map.makeThreadRequest();
44 request.setName(thread_context.thread_name);
45 return request.send().getResult();
// Same argument shape as above, but for the request_threads map and using
// make_request_thread as the factory.
48 GuardedRef{thread_context.waiter->m_mutex, thread_context.request_threads},
49 &connection, make_request_thread)};
// Initialize the output struct and store both thread clients in it.
51 auto context = output.init();
52 context.setThread(request_thread->second->m_client);
53 context.setCallbackThread(callback_thread->second->m_client);
// NOTE(review): this listing appears to be an extract in which original
// source line numbers are fused onto the start of some lines and many
// intermediate lines (including the function name, parameter list and several
// closing braces) are missing. The comments below describe only what is
// visible.
//
// Server-side handling of a call parameter that is a Context::Reader: builds
// an `invoke` callable that runs `fn` for the request, looks up the local
// thread server named by the Context argument, posts `invoke` to it, and
// returns the resulting promise wrapped by the connection's canceler.
58template <
typename Accessor,
typename ServerContext,
typename Fn,
typename... Args>
// Return type doubles as the overload constraint: it is only well-formed when
// Accessor::get(params) yields a Context::Reader, in which case the function
// returns kj::Promise<typename ServerContext::CallContext>.
60 typename std::enable_if<
61 std::is_same<
decltype(Accessor::get(server_context.call_context.getParams())), Context::Reader>::value,
62 kj::Promise<typename ServerContext::CallContext>>::type
// Read the Context argument from the call parameters and take local handles
// to the proxy server and the request number.
64 const auto& params = server_context.call_context.getParams();
65 Context::Reader context_arg = Accessor::get(params);
66 auto& server = server_context.proxy_server;
67 int req = server_context.req;
// Capability for this server; it is moved into `invoke` below and later moved
// into a local inside a loop->sync() callback (see self_dispose).
74 auto self = server.thisCap();
// `invoke` is a mutable lambda taking a CancelMonitor&. It captures `self`
// and the call context by move, `server` by reference, and `req`, `fn` and
// `args...` by copy.
75 auto invoke = [self = kj::mv(self), call_context = kj::mv(server_context.call_context), &server, req, fn,
args...](
CancelMonitor& cancel_monitor)
mutable {
76 MP_LOG(*server.m_context.loop,
Log::Debug) <<
"IPC server executing request #" << req;
// Re-derive the Context argument from the captured call_context.
77 const auto& params = call_context.getParams();
78 Context::Reader context_arg = Accessor::get(params);
// Alias for the request_threads map of thread_context.
99 auto& request_threads = thread_context.request_threads;
// Receives the second element of SetThread's result further down.
101 bool inserted{
false};
// Take cancel_mutex and publish a pointer to the lock object through
// server_context.cancel_lock.
103 Lock cancel_lock{cancel_mutex};
104 server_context.cancel_lock = &cancel_lock;
// First loop->sync() callback: record an already-signalled cancellation, and
// install an on-cancel callback on the monitor.
105 server.m_context.loop->sync([&] {
107 if (cancel_monitor.m_canceled) {
108 server_context.request_canceled = true;
// No on-cancel callback may already be installed. The installed callback logs
// the cancellation, then takes cancel_mutex before setting request_canceled.
112 assert(!cancel_monitor.m_on_cancel);
113 cancel_monitor.m_on_cancel = [&server, &server_context, &cancel_mutex, req]() {
114 MP_LOG(*server.m_context.loop, Log::Info) <<
"IPC server request #" << req <<
" canceled while executing.";
126 Lock cancel_lock{cancel_mutex};
127 server_context.request_canceled =
true;
// Look up or create the request_threads entry for this connection, using the
// callback thread from the Context argument as the factory result. The result
// is unpacked into request_thread and `inserted`.
130 std::tie(request_thread, inserted) =
SetThread(
131 GuardedRef{thread_context.waiter->m_mutex, request_threads}, server.m_context.connection,
132 [&] {
return context_arg.getCallbackThread(); });
// Snapshot of `inserted`; presumably decides whether the entry is removed
// again afterwards (the code using it is not visible here) -- confirm.
140 const bool erase_thread{inserted};
// Explicitly release the underlying lock held by cancel_lock.
149 cancel_lock.m_lock.unlock();
// Second loop->sync() callback: move `self` into a local of the callback.
155 server.m_context.loop->sync([&] {
156 auto self_dispose{kj::mv(self)};
// While holding waiter->m_mutex, extract this connection's entry from
// request_threads into the node handle `removed`.
164 ConnThreads::node_type removed;
166 Lock lock(thread_context.waiter->m_mutex);
167 removed = request_threads.extract(server.m_context.connection);
// If the request was flagged as canceled, only log. Otherwise run fn.invoke()
// inside kj::runCatchingExceptions and handle any captured exception.
172 if (server_context.request_canceled) {
173 MP_LOG(*server.m_context.loop,
Log::Info) <<
"IPC server request #" << req <<
" canceled before it could be executed";
174 }
else KJ_IF_MAYBE(exception, kj::runCatchingExceptions([&]{
176 fn.invoke(server_context,
args...);
// Logs e.what() for an interrupted request; the handler that introduces `e`
// is not visible in this extract.
178 MP_LOG(*server.m_context.loop,
Log::Info) <<
"IPC server request #" << req <<
" interrupted (" << e.
what() <<
")";
// An exception was captured by runCatchingExceptions: log it and rethrow.
181 MP_LOG(*server.m_context.loop,
Log::Error) <<
"IPC server request #" << req <<
" uncaught exception (" << kj::str(*exception).cStr() <<
")";
182 throw kj::mv(*exception);
// Resolve the thread client from the Context argument to a local server
// object, then continue in the .then() callback, which owns `invoke`.
192 auto thread_client = context_arg.getThread();
193 auto result = server.m_context.connection->m_threads.getLocalServer(thread_client)
194 .then([&server, invoke = kj::mv(invoke), req](
const kj::Maybe<Thread::Server&>& perhaps)
mutable {
// Local thread server found: cast it to ProxyServer<Thread>, log, and post
// `invoke` to it, returning the result of post().
198 KJ_IF_MAYBE (thread_server, perhaps) {
199 auto& thread =
static_cast<ProxyServer<Thread>&
>(*thread_server);
200 MP_LOG(*server.m_context.loop, Log::Debug)
201 <<
"IPC server post request #" << req <<
" {" << thread.m_thread_context.thread_name <<
"}";
202 return thread.template post<typename ServerContext::CallContext>(std::move(invoke));
// Error path (presumably the branch where no local thread server was found --
// the branch head is not visible): log and throw std::runtime_error.
204 MP_LOG(*server.m_context.loop, Log::Error)
205 <<
"IPC server error request #" << req <<
", missing thread to execute request";
206 throw std::runtime_error(
"invalid thread handle");
// Return the promise wrapped by the connection's m_canceler.
214 return server.m_context.connection->m_canceler.wrap(kj::mv(result));
Helper class that detects when a promise is canceled.
Functions to serialize / deserialize common bitcoin types.
auto PassField(Priority< 1 >, TypeList< LocalType & >, ServerContext &server_context, Fn &&fn, Args &&... args) -> Require< typename decltype(Accessor::get(server_context.call_context.getParams()))::Calls >
PassField override for callable interface reference arguments.
ServerInvokeContext< ProxyServer< Interface >, ::capnp::CallContext< Params, Results > > ServerContext
std::tuple< ConnThread, bool > SetThread(GuardedRef< ConnThreads > threads, Connection *connection, const std::function< Thread::Client()> &make_thread)
thread_local ThreadContext g_thread_context
ConnThreads::iterator ConnThread
void CustomBuildField(TypeList< LocalType >, Priority< 1 >, InvokeContext &invoke_context, Value &&value, Output &&output)
Overload multiprocess library's CustomBuildField hook to allow any serializable object to be stored i...
ThreadContext & thread_context
Exception thrown from code executing an IPC call that is interrupted.
const char * what() const noexcept override
Function parameter type for prioritizing overloaded function calls that would otherwise be ambiguous.
Generic utility functions used by capnp code.
consteval auto _(util::TranslatedLiteral str)