11#include <mp/proxy.capnp.h>
13#include <capnp/rpc-twoparty.h>
16#include <condition_variable>
18#include <kj/function.h>
43template <
typename ProxyServer,
typename CallContext_>
58template <
typename Interface,
typename Params,
typename Results>
87 kj::Promise<void> getName(GetNameContext context)
override;
101using LogFn = std::function<void(
bool raise, std::string message)>;
113 template <
typename T>
116 if (logger.
m_fn) logger.
m_buffer << std::forward<T>(value);
120 template <
typename T>
123 return logger << std::forward<T>(value);
172 EventLoop(
const char* exe_name,
LogFn log_fn,
void* context =
nullptr);
182 void post(kj::Function<
void()> fn);
187 template <
typename Callable>
190 post(std::forward<Callable>(callable));
283 template <
typename Fn>
288 m_fn = std::forward<Fn>(fn);
292 template <
class Predicate>
302 auto fn = std::move(*m_fn);
306 const bool done = pred();
333 m_network(*
m_stream, ::capnp::rpc::twoparty::Side::CLIENT, ::capnp::ReaderOptions()),
336 kj::Own<kj::AsyncIoStream>&& stream_,
337 const std::function<::capnp::Capability::Client(
Connection&)>& make_client)
339 m_network(*
m_stream, ::capnp::rpc::twoparty::Side::SERVER, ::capnp::ReaderOptions()),
356 template <
typename F>
365 [f = std::forward<F>(f),
this]()
mutable { m_loop->m_task_set->add(kj::evalLater(kj::mv(f))); }));
373 std::optional<::capnp::RpcSystem<::capnp::rpc::twoparty::VatId>>
m_rpc_system;
401 ::capnp::rpc::twoparty::VatId::Builder
vat_id{
message.getRoot<::capnp::rpc::twoparty::VatId>()};
405template <
typename Interface,
typename Impl>
408 bool destroy_connection)
416 typename Interface::Client(std::move(
m_client));
453 typename Interface::Client(std::move(m_client));
455 if (destroy_connection) {
462 Sub::construct(*
this);
465template <
typename Interface,
typename Impl>
471template <
typename Interface,
typename Impl>
490template <
typename Interface,
typename Impl>
513 m_context.loop->addAsyncCleanup([impl=std::move(m_impl), fns=std::move(
m_context.cleanup_fns)]()
mutable {
538template <
typename Interface,
typename Impl>
551using ConnThreads = std::map<Connection*, std::optional<ProxyClient<Thread>>>;
592 std::unique_ptr<Waiter> waiter =
nullptr;
627 bool loop_thread =
false;
633template <
typename InitInterface>
636 typename InitInterface::Client init_client(
nullptr);
637 std::unique_ptr<Connection> connection;
640 loop.
m_io_context.lowLevelProvider->wrapSocketFd(fd, kj::LowLevelAsyncIoProvider::TAKE_OWNERSHIP);
641 connection = std::make_unique<Connection>(loop, kj::mv(stream));
642 init_client = connection->m_rpc_system->bootstrap(
ServerVatId().vat_id).castAs<InitInterface>();
643 Connection* connection_ptr = connection.get();
645 loop.
log() <<
"IPC client: unexpected network disconnect.";
646 delete connection_ptr;
649 return std::make_unique<ProxyClient<InitInterface>>(
650 kj::mv(init_client), connection.release(),
true);
657template <
typename InitInterface,
typename InitImpl>
664 return kj::heap<ProxyServer<InitInterface>>(std::shared_ptr<InitImpl>(&
init, [](
InitImpl*){}), connection);
667 it->onDisconnect([&loop, it] {
668 loop.
log() <<
"IPC server: socket disconnected.";
676template <
typename InitInterface,
typename InitImpl>
679 auto* ptr = listener.get();
681 [&loop, &
init, listener = kj::mv(listener)](kj::Own<kj::AsyncIoStream>&& stream)
mutable {
682 _Serve<InitInterface>(loop, kj::mv(stream), init);
683 _Listen<InitInterface>(loop, kj::mv(listener), init);
689template <
typename InitInterface,
typename InitImpl>
692 _Serve<InitInterface>(
693 loop, loop.
m_io_context.lowLevelProvider->wrapSocketFd(fd, kj::LowLevelAsyncIoProvider::TAKE_OWNERSHIP),
init);
698template <
typename InitInterface,
typename InitImpl>
702 _Listen<InitInterface>(loop,
703 loop.
m_io_context.lowLevelProvider->wrapListenSocketFd(fd, kj::LowLevelAsyncIoProvider::TAKE_OWNERSHIP),
Object holding network & rpc state associated with either an incoming server connection,...
CleanupIt addSyncCleanup(std::function< void()> fn)
Register synchronous cleanup function to run on event loop thread (with access to capnp thread local ...
kj::TaskSet m_on_disconnect
LoggingErrorHandler m_error_handler
~Connection()
Run cleanup functions.
::capnp::TwoPartyVatNetwork m_network
kj::Own< kj::AsyncIoStream > m_stream
Connection(EventLoop &loop, kj::Own< kj::AsyncIoStream > &&stream_, const std::function<::capnp::Capability::Client(Connection &)> &make_client)
Connection(EventLoop &loop, kj::Own< kj::AsyncIoStream > &&stream_)
void onDisconnect(F &&f)
Add disconnect handler.
::capnp::CapabilityServerSet< Thread > m_threads
Collection of server-side IPC worker threads (ProxyServer<Thread> objects previously returned by Thre...
CleanupList m_sync_cleanup_fns
Cleanup functions to run if connection is broken unexpectedly.
std::optional<::capnp::RpcSystem<::capnp::rpc::twoparty::VatId > > m_rpc_system
void removeSyncCleanup(CleanupIt it)
ThreadMap::Client m_thread_map
Event loop implementation.
const char * m_exe_name
Process name included in thread names so combined debug output from multiple processes is easier to u...
kj::AsyncIoContext m_io_context
Capnp IO context.
void startAsyncThread() MP_REQUIRES(m_mutex)
Start asynchronous worker thread if necessary.
void sync(Callable &&callable)
Wrapper around EventLoop::post that takes advantage of the fact that callable will not go out of scop...
std::condition_variable m_cv
EventLoop(const char *exe_name, LogFn log_fn, void *context=nullptr)
Construct event loop object.
void addAsyncCleanup(std::function< void()> fn)
Register cleanup function to run on asynchronous worker thread without blocking the event loop thread...
std::list< Connection > m_incoming_connections
List of connections.
void loop()
Run event loop.
Mutex m_mutex
Mutex and condition variable used to post tasks to event loop and async thread.
bool done() const MP_REQUIRES(m_mutex)
Check if loop should exit.
int m_post_fd
Pipe write handle used to wake up the event loop thread.
int m_num_clients MP_GUARDED_BY(m_mutex)=0
Number of clients holding references to ProxyServerBase objects that reference this event loop.
LogOptions m_log_opts
Logging options.
std::unique_ptr< kj::TaskSet > m_task_set
Capnp list of pending promises.
void * m_context
External context pointer.
int m_wait_fd
Pipe read handle used to wake up the event loop thread.
std::optional< CleanupList > m_async_fns MP_GUARDED_BY(m_mutex)
Callback functions to run on async thread.
LoggingErrorHandler m_error_handler
Capnp error handler. Needs to outlive m_task_set.
std::thread m_async_thread
Handle of an async worker thread.
kj::Function< void()> *m_post_fn MP_GUARDED_BY(m_mutex)
Callback function to run on event loop thread during post() or sync() call.
std::thread::id m_thread_id
ID of the event loop thread.
void post(kj::Function< void()> fn)
Run function on event loop thread.
Event loop smart pointer automatically managing m_num_clients.
std::unique_lock< std::mutex > m_lock
friend Logger & operator<<(Logger &logger, T &&value)
Logger(bool raise, LogFn &fn)
~Logger() noexcept(false)
friend Logger & operator<<(Logger &&logger, T &&value)
std::ostringstream m_buffer
Handler for kj::TaskSet failed task events.
LoggingErrorHandler(EventLoop &loop)
void taskFailed(kj::Exception &&exception) override
Base class for generated ProxyClient classes that implement a C++ interface and forward calls to a ca...
Interface::Client m_client
~ProxyClientBase() noexcept
ProxyClientBase(typename Interface::Client client, Connection *connection, bool destroy_connection)
Construct libmultiprocess client object wrapping Cap'n Proto client object with a reference to the as...
Functions to serialize / deserialize common bitcoin types.
void Unlock(Lock &lock, Callback &&callback)
std::list< std::function< void()> > CleanupList
void ServeStream(EventLoop &loop, int fd, InitImpl &init)
Given stream file descriptor and an init object, handle requests on the stream by calling methods on ...
std::tuple< ConnThread, bool > SetThread(GuardedRef< ConnThreads > threads, Connection *connection, const std::function< Thread::Client()> &make_thread)
thread_local ThreadContext g_thread_context
std::function< void(bool raise, std::string message)> LogFn
void _Listen(EventLoop &loop, kj::Own< kj::ConnectionReceiver > &&listener, InitImpl &init)
Given connection receiver and an init object, handle incoming connections by calling _Serve,...
std::unique_ptr< ProxyClient< InitInterface > > ConnectStream(EventLoop &loop, int fd)
Given stream file descriptor, make a new ProxyClient object to send requests over the stream.
void ListenConnections(EventLoop &loop, int fd, InitImpl &init)
Given listening socket file descriptor and an init object, handle incoming connections and requests b...
std::string LongThreadName(const char *exe_name)
ConnThreads::iterator ConnThread
void _Serve(EventLoop &loop, kj::Own< kj::AsyncIoStream > &&stream, InitImpl &init)
Given stream and init objects, construct a new ProxyServer object that handles requests from the stre...
typename CleanupList::iterator CleanupIt
void CleanupRun(CleanupList &fns)
std::map< Connection *, std::optional< ProxyClient< Thread > > > ConnThreads
Map from Connection to local or remote thread handle which will be used over that connection.
ThreadContext & thread_context
ClientInvokeContext(Connection &conn, ThreadContext &thread_context)
LogFn log_fn
External logging callback.
size_t max_chars
Maximum number of characters to use when representing request and response structs as strings.
ProxyClient(const ProxyClient &)=delete
std::optional< CleanupIt > m_disconnect_cb
Reference to callback function that is run if there is a sudden disconnect and the Connection object ...
Mapping from capnp interface type to proxy client implementation (specializations are generated by pr...
ThreadContext & m_thread_context
Base class for generated ProxyServer classes that implement capnp server methods and forward calls to...
virtual ~ProxyServerBase()
ProxyServer destructor, called from the EventLoop thread by Cap'n Proto garbage collection code after...
std::shared_ptr< Impl > m_impl
Implementation pointer that may or may not be owned and deleted when this capnp server goes out of sc...
Mapping from capnp interface type to proxy server implementation (specializations are generated by pr...
ServerInvokeContext(ProxyServer &proxy_server, CallContext &call_context, int req)
CallContext & call_context
ProxyServer & proxy_server
Vat id for server side of connection.
::capnp::MallocMessageBuilder message
::capnp::rpc::twoparty::VatId::Builder vat_id
The thread_local ThreadContext g_thread_context struct provides information about individual threads ...
ConnThreads callback_threads MP_GUARDED_BY(waiter->m_mutex)
When client is making a request to a server, this is the callbackThread argument it passes in the req...
ConnThreads request_threads MP_GUARDED_BY(waiter->m_mutex)
When client is making a request to a server, this is the thread argument it passes in the request,...
std::string thread_name
Identifying string for debug.
Single element task queue used to handle recursive capnp calls.
void wait(Lock &lock, Predicate pred)
std::condition_variable m_cv
Mutex m_mutex
Mutex mainly used internally by waiter class, but also used externally to guard access to related sta...
std::optional< kj::Function< void()> > m_fn MP_GUARDED_BY(m_mutex)