11#include <mp/proxy.capnp.h>
13#include <capnp/rpc-twoparty.h>
16#include <condition_variable>
18#include <kj/function.h>
43template <
typename ProxyServer,
typename CallContext_>
71template <
typename Interface,
typename Params,
typename Results>
// Server-side handler for a `getName` call, overriding a virtual from the base
// class; returns a promise that completes when the call has been handled.
// NOTE(review): presumably reports the name of the thread this object wraps --
// the definition is not part of this listing; confirm.
100 kj::Promise<void> getName(GetNameContext context)
override;
// Queue `fn` for execution and return a promise for its result of type T.
// Per the definition further down in this listing:
//  - the work is chained onto m_thread_ready with .then(), and m_thread_ready
//    is then replaced by a fresh promise;
//  - `fn` is handed to m_thread_context.waiter->post() and is invoked with a
//    CancelMonitor reference as its argument;
//  - the value returned by `fn` (or the exception it raised) is delivered to
//    the returned promise from a callback run through m_loop->sync().
106 template<
typename T,
typename Fn>
107 kj::Promise<T> post(Fn&& fn);
// Promise that post() chains new work onto. Starts out already resolved
// (kj::READY_NOW); every post() call replaces it with a new promise whose
// fulfiller is fulfilled from a callback passed to m_loop->sync() inside the
// posted function.
114 kj::Promise<void> m_thread_ready{kj::READY_NOW};
// Handler for kj::TaskSet failed-task events: receives the exception of the
// task that failed. Overrides the base-class hook; the definition is not part
// of this listing.
122 void taskFailed(kj::Exception&& exception)
override;
178 template <
typename T>
185 template <
typename T>
188 return logger << std::forward<T>(value);
191 explicit operator bool()
const
// MP_LOGPLAIN(loop, ...): begin a log statement without any prefix.
// Expands to an if-with-initializer (C++17) that constructs an mp::Logger from
// (loop).m_log_opts plus the remaining arguments; whatever is streamed after
// the macro is evaluated only when the logger converts to true.
// NOTE(review): the expansion is a bare `if`, so an `else` written after a use
// of this macro would attach to it -- brace the statement inside if/else code.
207#define MP_LOGPLAIN(loop, ...) if (mp::Logger logger{(loop).m_log_opts, __VA_ARGS__}; logger) logger
// MP_LOG(loop, ...): like MP_LOGPLAIN, but first streams a prefix of the form
// "{<name>} " where <name> is LongThreadName((loop).m_exe_name).
209#define MP_LOG(loop, ...) MP_LOGPLAIN(loop, __VA_ARGS__) << "{" << LongThreadName((loop).m_exe_name) << "} "
249 EventLoop(
const char* exe_name, std::function<
void(
bool, std::string)> old_callback,
void* context =
nullptr)
251 LogFn{[old_callback =
std::move(old_callback)](
LogMessage log_data) {old_callback(log_data.level ==
Log::Raise, std::move(log_data.message));}},
// Post a function to the event loop. Takes the callable by value as a
// kj::Function<void()>; the templated sync() wrapper in this class forwards
// its callable to this overload.
// NOTE(review): presumably runs `fn` on the event loop thread and returns once
// it has finished -- the definition is not in this listing; confirm.
263 void post(kj::Function<
void()> fn);
268 template <
typename Callable>
271 post(std::forward<Callable>(callable));
// Register a cleanup function with the event loop. Elsewhere in this listing
// it is called with a lambda that takes ownership of an implementation object
// and a list of cleanup functions.
// NOTE(review): presumably the function runs later on m_async_thread rather
// than on the caller's thread -- definition not shown; confirm.
276 void addAsyncCleanup(std::function<
void()> fn);
// Executable name. MP_LOG passes it to LongThreadName() to build the "{...} "
// prefix of each log line.
// NOTE(review): stored as a raw pointer, so the string must outlive this
// object -- confirm with callers.
296 const
char* m_exe_name;
// Thread id captured when this object is constructed (id of the constructing
// thread).
// NOTE(review): presumably compared against the current thread to detect
// calls made from the event loop thread -- confirm.
299 std::thread::
id m_thread_id =
std::this_thread::get_id();
// Handle for a secondary thread owned by the event loop.
// NOTE(review): presumably the thread that executes functions registered with
// addAsyncCleanup() -- confirm against the implementation.
303 std::thread m_async_thread;
// Condition variable used together with m_mutex to post tasks to the event
// loop and the async thread.
324 std::condition_variable m_cv;
// Capnp IO context. Its lowLevelProvider is what the stream helpers in this
// file use to wrap raw socket descriptors (wrapSocketFd / wrapListenSocketFd).
327 kj::AsyncIoContext m_io_context;
376 template <
typename Fn>
379 const Lock lock(m_mutex);
380 if (m_fn)
return false;
381 m_fn = std::forward<Fn>(fn);
386 template <
class Predicate>
396 auto fn = std::move(*m_fn);
400 const bool done = pred();
426 :
m_loop(loop), m_stream(kj::mv(stream_)),
427 m_network(*m_stream, ::capnp::rpc::twoparty::Side::CLIENT, ::capnp::ReaderOptions()),
428 m_rpc_system(::capnp::makeRpcClient(m_network)) {}
430 kj::Own<kj::AsyncIoStream>&& stream_,
431 const std::function<::capnp::Capability::Client(
Connection&)>& make_client)
432 :
m_loop(loop), m_stream(kj::mv(stream_)),
433 m_network(*m_stream, ::capnp::rpc::twoparty::Side::SERVER, ::capnp::ReaderOptions()),
434 m_rpc_system(::capnp::makeRpcServer(m_network, make_client(*this))) {}
// Register a synchronous cleanup function to run on the event loop thread.
// Returns an iterator identifying the registration; removeSyncCleanup() takes
// such an iterator to undo it.
446 CleanupIt addSyncCleanup(std::function<
void()> fn);
450 template <
typename F>
458 m_on_disconnect.add(m_network.onDisconnect().then(
459 [f = std::forward<F>(f),
this]()
mutable { m_loop->m_task_set->add(kj::evalLater(kj::mv(f))); }));
// Task set holding the disconnect-handler promises added by onDisconnect();
// task failures are reported to m_error_handler.
468 kj::TaskSet m_on_disconnect{m_error_handler};
// Cap'n Proto RPC system for this connection. The constructors fill it with
// makeRpcClient(m_network) or makeRpcServer(m_network, ...); ConnectStream()
// calls bootstrap() on it to obtain the initial capability.
// NOTE(review): held in std::optional, presumably so it can be reset before
// the other members are destroyed -- confirm.
470 std::optional<::capnp::RpcSystem<::capnp::rpc::twoparty::VatId>>
m_rpc_system;
// ThreadMap capability client, initialized to a null capability.
// NOTE(review): presumably assigned after the connection is established and
// used to request threads from the peer -- confirm.
474 ThreadMap::Client m_thread_map{
nullptr};
// Vat id for the server side of a connection.
// Members are initialized in declaration order, so `scratch` must stay ahead
// of `message`, and `message` ahead of `vat_id`.

// Four zero-initialized words handed to the message builder below.
501 ::capnp::word scratch[4]{};
// Message builder constructed from the scratch array.
// NOTE(review): presumably uses `scratch` as its first segment so this small
// message needs no heap allocation -- confirm against the capnp documentation.
502 ::capnp::MallocMessageBuilder message{scratch};
// Builder for the VatId struct at the root of `message`.
503 ::capnp::rpc::twoparty::VatId::Builder vat_id{message.getRoot<::capnp::rpc::twoparty::VatId>()};
// Mark the vat id as the SERVER side.
504 ServerVatId() { vat_id.setSide(::capnp::rpc::twoparty::Side::SERVER); }
507template <
typename Interface,
typename Impl>
510 bool destroy_connection)
518 typename Interface::Client(std::move(
m_client));
555 typename Interface::Client(std::move(m_client));
557 if (destroy_connection) {
564 Sub::construct(*
this);
567template <
typename Interface,
typename Impl>
573template <
typename Interface,
typename Impl>
592template <
typename Interface,
typename Impl>
615 m_context.loop->addAsyncCleanup([impl=std::move(m_impl), fns=std::move(
m_context.cleanup_fns)]()
mutable {
640template <
typename Interface,
typename Impl>
// Map from Connection to the local or remote thread handle that will be used
// over that connection. The mapped value is optional, so an entry can exist
// without a thread handle.
653using ConnThreads = std::map<Connection*, std::optional<ProxyClient<Thread>>>;
// Waiter owned by this thread context, null by default. Work is handed to the
// thread through waiter->post(), and waiter->m_mutex is the mutex named in the
// MP_GUARDED_BY annotations on callback_threads and request_threads.
694 std::unique_ptr<Waiter> waiter =
nullptr;
// Defaults to false.
// NOTE(review): presumably set to true on the thread that runs the event
// loop -- nothing in this listing shows where it is written; confirm.
729 bool loop_thread =
false;
732template<
typename T,
typename Fn>
735 auto ready = kj::newPromiseAndFulfiller<void>();
736 auto cancel_monitor_ptr = kj::heap<CancelMonitor>();
743 auto self = thisCap();
744 auto ret = m_thread_ready.then([
this, self = std::move(self), fn = std::forward<Fn>(fn), ready_fulfiller = kj::mv(ready.fulfiller), cancel_monitor_ptr = kj::mv(cancel_monitor_ptr)]()
mutable {
745 auto result = kj::newPromiseAndFulfiller<T>();
746 bool posted = m_thread_context.waiter->post([this, self = std::move(self), fn = std::forward<Fn>(fn), ready_fulfiller = kj::mv(ready_fulfiller), result_fulfiller = kj::mv(result.fulfiller), cancel_monitor_ptr = kj::mv(cancel_monitor_ptr)]() mutable {
755 m_loop->sync([ready_fulfiller = kj::mv(ready_fulfiller)]() mutable {
756 ready_fulfiller->fulfill();
757 ready_fulfiller = nullptr;
759 std::optional<T> result_value;
760 kj::Maybe<kj::Exception> exception{kj::runCatchingExceptions([&]{ result_value.emplace(fn(*cancel_monitor_ptr)); })};
761 m_loop->sync([this, &result_value, &exception, self = kj::mv(self), result_fulfiller = kj::mv(result_fulfiller), cancel_monitor_ptr = kj::mv(cancel_monitor_ptr)]() mutable {
765 cancel_monitor_ptr = nullptr;
769 KJ_IF_MAYBE(e, exception) {
770 assert(!result_value);
771 result_fulfiller->reject(kj::mv(*e));
773 assert(result_value);
774 result_fulfiller->fulfill(kj::mv(*result_value));
775 result_value.reset();
777 result_fulfiller = nullptr;
782 m_loop->m_task_set->add(kj::evalLater([self = kj::mv(self)] {}));
790 return kj::mv(result.promise);
791 }).attach(kj::heap<CancelProbe>(cancel_monitor));
792 m_thread_ready = kj::mv(ready.promise);
799template <
typename InitInterface>
802 typename InitInterface::Client init_client(
nullptr);
803 std::unique_ptr<Connection> connection;
806 loop.
m_io_context.lowLevelProvider->wrapSocketFd(fd, kj::LowLevelAsyncIoProvider::TAKE_OWNERSHIP);
807 connection = std::make_unique<Connection>(loop, kj::mv(stream));
808 init_client = connection->m_rpc_system->bootstrap(
ServerVatId().vat_id).castAs<InitInterface>();
809 Connection* connection_ptr = connection.get();
812 delete connection_ptr;
815 return std::make_unique<ProxyClient<InitInterface>>(
816 kj::mv(init_client), connection.release(),
true);
823template <
typename InitInterface,
typename InitImpl>
830 return kj::heap<ProxyServer<InitInterface>>(std::shared_ptr<InitImpl>(&
init, [](
InitImpl*){}), connection);
833 MP_LOG(loop, Log::Info) <<
"IPC server: socket connected.";
834 it->onDisconnect([&loop, it] {
835 MP_LOG(loop, Log::Info) <<
"IPC server: socket disconnected.";
843template <
typename InitInterface,
typename InitImpl>
846 auto* ptr = listener.get();
848 [&loop, &
init, listener = kj::mv(listener)](kj::Own<kj::AsyncIoStream>&& stream)
mutable {
849 _Serve<InitInterface>(loop, kj::mv(stream), init);
850 _Listen<InitInterface>(loop, kj::mv(listener), init);
856template <
typename InitInterface,
typename InitImpl>
859 _Serve<InitInterface>(
860 loop, loop.
m_io_context.lowLevelProvider->wrapSocketFd(fd, kj::LowLevelAsyncIoProvider::TAKE_OWNERSHIP),
init);
865template <
typename InitInterface,
typename InitImpl>
869 _Listen<InitInterface>(loop,
870 loop.
m_io_context.lowLevelProvider->wrapListenSocketFd(fd, kj::LowLevelAsyncIoProvider::TAKE_OWNERSHIP),
Helper class that detects when a promise is canceled.
Object holding network & rpc state associated with either an incoming server connection,...
CleanupIt addSyncCleanup(std::function< void()> fn)
Register synchronous cleanup function to run on event loop thread (with access to capnp thread local ...
::capnp::TwoPartyVatNetwork m_network
kj::Own< kj::AsyncIoStream > m_stream
Connection(EventLoop &loop, kj::Own< kj::AsyncIoStream > &&stream_, const std::function<::capnp::Capability::Client(Connection &)> &make_client)
Connection(EventLoop &loop, kj::Own< kj::AsyncIoStream > &&stream_)
void onDisconnect(F &&f)
Add disconnect handler.
kj::Canceler m_canceler
Canceler for canceling promises that we want to discard when the connection is destroyed.
::capnp::CapabilityServerSet< Thread > m_threads
Collection of server-side IPC worker threads (ProxyServer<Thread> objects previously returned by Thre...
CleanupList m_sync_cleanup_fns
Cleanup functions to run if connection is broken unexpectedly.
std::optional<::capnp::RpcSystem<::capnp::rpc::twoparty::VatId > > m_rpc_system
void removeSyncCleanup(CleanupIt it)
Event loop implementation.
kj::AsyncIoContext m_io_context
Capnp IO context.
void sync(Callable &&callable)
Wrapper around EventLoop::post that takes advantage of the fact that callable will not go out of scop...
EventLoop(const char *exe_name, LogFn log_fn, void *context=nullptr)
Construct event loop object with default logging options.
std::list< Connection > m_incoming_connections
List of connections.
Mutex m_mutex
Mutex and condition variable used to post tasks to event loop and async thread.
LogOptions m_log_opts
Logging options.
std::unique_ptr< kj::TaskSet > m_task_set
Capnp list of pending promises.
void * m_context
External context pointer.
std::function< void()> testing_hook_makethread
Hook called when ProxyServer<ThreadMap>::makeThread() is called.
std::function< void()> testing_hook_makethread_created
Hook called on the worker thread inside makeThread(), after the thread context is set up and thread_c...
EventLoop(const char *exe_name, std::function< void(bool, std::string)> old_callback, void *context=nullptr)
Backwards-compatible constructor for previous (deprecated) logging callback signature.
std::function< void()> testing_hook_async_request_done
Hook called on the worker thread just before returning results.
std::function< void()> testing_hook_async_request_start
Hook called on the worker thread when it starts to execute an async request.
Event loop smart pointer automatically managing m_num_clients.
std::unique_lock< std::mutex > m_lock
friend Logger & operator<<(Logger &logger, T &&value)
Logger(const Logger &)=delete
~Logger() noexcept(false)
Logger & operator=(Logger &&)=delete
Logger(const LogOptions &options, Log log_level)
const LogOptions & m_options
friend Logger & operator<<(Logger &&logger, T &&value)
std::ostringstream m_buffer
Logger & operator=(const Logger &)=delete
Handler for kj::TaskSet failed task events.
LoggingErrorHandler(EventLoop &loop)
void taskFailed(kj::Exception &&exception) override
Base class for generated ProxyClient classes that implement a C++ interface and forward calls to a ca...
Interface::Client m_client
~ProxyClientBase() noexcept
ProxyClientBase(typename Interface::Client client, Connection *connection, bool destroy_connection)
Construct libmultiprocess client object wrapping Cap'n Proto client object with a reference to the as...
std::optional< mp::EventLoop > m_loop
EventLoop object which manages I/O events for all connections.
Functions to serialize / deserialize common bitcoin types.
void Unlock(Lock &lock, Callback &&callback)
kj::StringPtr KJ_STRINGIFY(Log flags)
std::list< std::function< void()> > CleanupList
void ServeStream(EventLoop &loop, int fd, InitImpl &init)
Given stream file descriptor and an init object, handle requests on the stream by calling methods on ...
std::function< void(LogMessage)> LogFn
std::tuple< ConnThread, bool > SetThread(GuardedRef< ConnThreads > threads, Connection *connection, const std::function< Thread::Client()> &make_thread)
thread_local ThreadContext g_thread_context
void _Listen(EventLoop &loop, kj::Own< kj::ConnectionReceiver > &&listener, InitImpl &init)
Given connection receiver and an init object, handle incoming connections by calling _Serve,...
std::unique_ptr< ProxyClient< InitInterface > > ConnectStream(EventLoop &loop, int fd)
Given stream file descriptor, make a new ProxyClient object to send requests over the stream.
Log
Log flags. Update stringify function if changed!
void ListenConnections(EventLoop &loop, int fd, InitImpl &init)
Given listening socket file descriptor and an init object, handle incoming connections and requests b...
std::string LongThreadName(const char *exe_name)
ConnThreads::iterator ConnThread
void _Serve(EventLoop &loop, kj::Own< kj::AsyncIoStream > &&stream, InitImpl &init)
Given stream and init objects, construct a new ProxyServer object that handles requests from the stre...
typename CleanupList::iterator CleanupIt
void CleanupRun(CleanupList &fns)
std::map< Connection *, std::optional< ProxyClient< Thread > > > ConnThreads
Map from Connection to local or remote thread handle which will be used over that connection.
ThreadContext & thread_context
ClientInvokeContext(Connection &conn, ThreadContext &thread_context)
Log level
The severity level of this message.
std::string message
Message to be logged.
LogFn log_fn
External logging callback.
size_t max_chars
Maximum number of characters to use when representing request and response structs as strings.
Log log_level
Messages with a severity level less than log_level will not be reported.
ProxyClient(const ProxyClient &)=delete
std::optional< CleanupIt > m_disconnect_cb
Reference to callback function that is run if there is a sudden disconnect and the Connection object ...
Mapping from capnp interface type to proxy client implementation (specializations are generated by pr...
ThreadContext & m_thread_context
Base class for generated ProxyServer classes that implement capnp server methods and forward calls to...
virtual ~ProxyServerBase()
ProxyServer destructor, called from the EventLoop thread by Cap'n Proto garbage collection code after...
std::shared_ptr< Impl > m_impl
Implementation pointer that may or may not be owned and deleted when this capnp server goes out of sc...
Mapping from capnp interface type to proxy server implementation (specializations are generated by pr...
ServerInvokeContext(ProxyServer &proxy_server, CallContext &call_context, int req)
CallContext & call_context
ProxyServer & proxy_server
bool request_canceled
For IPC methods that execute asynchronously, not on the event-loop thread, this is set to true if the...
Lock * cancel_lock
For IPC methods that execute asynchronously, not on the event-loop thread: lock preventing the event-...
Vat id for server side of connection.
The thread_local ThreadContext g_thread_context struct provides information about individual threads ...
ConnThreads callback_threads MP_GUARDED_BY(waiter->m_mutex)
When client is making a request to a server, this is the callbackThread argument it passes in the req...
ConnThreads request_threads MP_GUARDED_BY(waiter->m_mutex)
When client is making a request to a server, this is the thread argument it passes in the request,...
std::string thread_name
Identifying string for debug.
Single element task queue used to handle recursive capnp calls.
void wait(Lock &lock, Predicate pred)
std::condition_variable m_cv
Mutex m_mutex
Mutex mainly used internally by waiter class, but also used externally to guard access to related sta...
std::optional< kj::Function< void()> > m_fn MP_GUARDED_BY(m_mutex)