11#include <mp/proxy.capnp.h>
13#include <capnp/rpc-twoparty.h>
16#include <condition_variable>
18#include <kj/function.h>
43template <
typename ProxyServer,
typename CallContext_>
58template <
typename Interface,
typename Params,
typename Results>
69 void setDisconnectCallback(
const std::function<
void()>& fn);
89 kj::Promise<void> getName(GetNameContext context)
override;
99 void taskFailed(kj::Exception&& exception)
override;
103using LogFn = std::function<void(
bool raise, std::string message)>;
115 template <
typename T>
118 if (logger.
m_fn) logger.
m_buffer << std::forward<T>(value);
122 template <
typename T>
125 return logger << std::forward<T>(value);
174 EventLoop(
const char* exe_name,
LogFn log_fn,
void* context =
nullptr);
184 void post(kj::Function<
void()> fn);
189 template <
typename Callable>
192 post(std::forward<Callable>(callable));
285 template <
typename Fn>
288 const std::unique_lock<std::mutex> lock(
m_mutex);
290 m_fn = std::forward<Fn>(fn);
294 template <
class Predicate>
295 void wait(std::unique_lock<std::mutex>& lock, Predicate pred)
297 m_cv.wait(lock, [&] {
304 auto fn = std::move(*
m_fn);
308 const bool done = pred();
322 std::optional<kj::Function<void()>>
m_fn;
335 m_network(*
m_stream, ::capnp::rpc::twoparty::Side::CLIENT, ::capnp::ReaderOptions()),
338 kj::Own<kj::AsyncIoStream>&& stream_,
339 const std::function<::capnp::Capability::Client(
Connection&)>& make_client)
341 m_network(*
m_stream, ::capnp::rpc::twoparty::Side::SERVER, ::capnp::ReaderOptions()),
358 template <
typename F>
367 [f = std::forward<F>(f),
this]()
mutable { m_loop->m_task_set->add(kj::evalLater(kj::mv(f))); }));
375 std::optional<::capnp::RpcSystem<::capnp::rpc::twoparty::VatId>>
m_rpc_system;
403 ::capnp::rpc::twoparty::VatId::Builder
vat_id{
message.getRoot<::capnp::rpc::twoparty::VatId>()};
407template <
typename Interface,
typename Impl>
410 bool destroy_connection)
418 typename Interface::Client(std::move(
m_client));
455 typename Interface::Client(std::move(m_client));
457 if (destroy_connection) {
464 Sub::construct(*
this);
467template <
typename Interface,
typename Impl>
473template <
typename Interface,
typename Impl>
492template <
typename Interface,
typename Impl>
515 m_context.loop->addAsyncCleanup([impl=std::move(m_impl), fns=std::move(
m_context.cleanup_fns)]()
mutable {
540template <
typename Interface,
typename Impl>
553std::tuple<ConnThread, bool>
SetThread(
ConnThreads& threads, std::mutex& mutex,
Connection* connection,
const std::function<Thread::Client()>& make_thread);
563 std::unique_ptr<Waiter> waiter =
nullptr;
583 bool loop_thread =
false;
589template <
typename InitInterface>
592 typename InitInterface::Client init_client(
nullptr);
593 std::unique_ptr<Connection> connection;
596 loop.
m_io_context.lowLevelProvider->wrapSocketFd(fd, kj::LowLevelAsyncIoProvider::TAKE_OWNERSHIP);
597 connection = std::make_unique<Connection>(loop, kj::mv(stream));
598 init_client = connection->m_rpc_system->bootstrap(
ServerVatId().vat_id).castAs<InitInterface>();
599 Connection* connection_ptr = connection.get();
601 loop.
log() <<
"IPC client: unexpected network disconnect.";
602 delete connection_ptr;
605 return std::make_unique<ProxyClient<InitInterface>>(
606 kj::mv(init_client), connection.release(),
true);
613template <
typename InitInterface,
typename InitImpl>
620 return kj::heap<ProxyServer<InitInterface>>(std::shared_ptr<InitImpl>(&
init, [](
InitImpl*){}), connection);
623 it->onDisconnect([&loop, it] {
624 loop.
log() <<
"IPC server: socket disconnected.";
632template <
typename InitInterface,
typename InitImpl>
635 auto* ptr = listener.get();
637 [&loop, &
init, listener = kj::mv(listener)](kj::Own<kj::AsyncIoStream>&& stream)
mutable {
638 _Serve<InitInterface>(loop, kj::mv(stream), init);
639 _Listen<InitInterface>(loop, kj::mv(listener), init);
645template <
typename InitInterface,
typename InitImpl>
648 _Serve<InitInterface>(
649 loop, loop.
m_io_context.lowLevelProvider->wrapSocketFd(fd, kj::LowLevelAsyncIoProvider::TAKE_OWNERSHIP),
init);
654template <
typename InitInterface,
typename InitImpl>
658 _Listen<InitInterface>(loop,
659 loop.
m_io_context.lowLevelProvider->wrapListenSocketFd(fd, kj::LowLevelAsyncIoProvider::TAKE_OWNERSHIP),
Object holding network & rpc state associated with either an incoming server connection,...
CleanupIt addSyncCleanup(std::function< void()> fn)
Register synchronous cleanup function to run on event loop thread (with access to capnp thread local ...
kj::TaskSet m_on_disconnect
LoggingErrorHandler m_error_handler
~Connection()
Run cleanup functions.
::capnp::TwoPartyVatNetwork m_network
kj::Own< kj::AsyncIoStream > m_stream
Connection(EventLoop &loop, kj::Own< kj::AsyncIoStream > &&stream_, const std::function<::capnp::Capability::Client(Connection &)> &make_client)
Connection(EventLoop &loop, kj::Own< kj::AsyncIoStream > &&stream_)
void onDisconnect(F &&f)
Add disconnect handler.
::capnp::CapabilityServerSet< Thread > m_threads
Collection of server-side IPC worker threads (ProxyServer<Thread> objects previously returned by Thre...
CleanupList m_sync_cleanup_fns
Cleanup functions to run if connection is broken unexpectedly.
std::optional<::capnp::RpcSystem<::capnp::rpc::twoparty::VatId > > m_rpc_system
void removeSyncCleanup(CleanupIt it)
ThreadMap::Client m_thread_map
Event loop implementation.
const char * m_exe_name
Process name included in thread names so combined debug output from multiple processes is easier to u...
kj::AsyncIoContext m_io_context
Capnp IO context.
void startAsyncThread() MP_REQUIRES(m_mutex)
Start asynchronous worker thread if necessary.
void sync(Callable &&callable)
Wrapper around EventLoop::post that takes advantage of the fact that callable will not go out of scop...
std::condition_variable m_cv
EventLoop(const char *exe_name, LogFn log_fn, void *context=nullptr)
Construct event loop object.
void addAsyncCleanup(std::function< void()> fn)
Register cleanup function to run on asynchronous worker thread without blocking the event loop thread...
std::list< Connection > m_incoming_connections
List of connections.
void loop()
Run event loop.
Mutex m_mutex
Mutex and condition variable used to post tasks to event loop and async thread.
bool done() const MP_REQUIRES(m_mutex)
Check if loop should exit.
int m_post_fd
Pipe write handle used to wake up the event loop thread.
int m_num_clients MP_GUARDED_BY(m_mutex)=0
Number of clients holding references to ProxyServerBase objects that reference this event loop.
LogOptions m_log_opts
Logging options.
std::unique_ptr< kj::TaskSet > m_task_set
Capnp list of pending promises.
void * m_context
External context pointer.
int m_wait_fd
Pipe read handle used to wake up the event loop thread.
std::optional< CleanupList > m_async_fns MP_GUARDED_BY(m_mutex)
Callback functions to run on async thread.
LoggingErrorHandler m_error_handler
Capnp error handler. Needs to outlive m_task_set.
std::thread m_async_thread
Handle of an async worker thread.
kj::Function< void()> *m_post_fn MP_GUARDED_BY(m_mutex)
Callback function to run on event loop thread during post() or sync() call.
std::thread::id m_thread_id
ID of the event loop thread.
void post(kj::Function< void()> fn)
Run function on event loop thread.
Event loop smart pointer automatically managing m_num_clients.
friend Logger & operator<<(Logger &logger, T &&value)
Logger(bool raise, LogFn &fn)
~Logger() noexcept(false)
friend Logger & operator<<(Logger &&logger, T &&value)
std::ostringstream m_buffer
Handler for kj::TaskSet failed task events.
LoggingErrorHandler(EventLoop &loop)
void taskFailed(kj::Exception &&exception) override
Base class for generated ProxyClient classes that implement a C++ interface and forward calls to a ca...
Interface::Client m_client
~ProxyClientBase() noexcept
ProxyClientBase(typename Interface::Client client, Connection *connection, bool destroy_connection)
Construct libmultiprocess client object wrapping Cap'n Proto client object with a reference to the as...
Functions to serialize / deserialize common bitcoin types.
void Unlock(Lock &lock, Callback &&callback)
std::list< std::function< void()> > CleanupList
void ServeStream(EventLoop &loop, int fd, InitImpl &init)
Given stream file descriptor and an init object, handle requests on the stream by calling methods on ...
std::map< Connection *, ProxyClient< Thread > > ConnThreads
thread_local ThreadContext g_thread_context
std::tuple< ConnThread, bool > SetThread(ConnThreads &threads, std::mutex &mutex, Connection *connection, const std::function< Thread::Client()> &make_thread)
std::function< void(bool raise, std::string message)> LogFn
void _Listen(EventLoop &loop, kj::Own< kj::ConnectionReceiver > &&listener, InitImpl &init)
Given connection receiver and an init object, handle incoming connections by calling _Serve,...
std::unique_ptr< ProxyClient< InitInterface > > ConnectStream(EventLoop &loop, int fd)
Given stream file descriptor, make a new ProxyClient object to send requests over the stream.
void ListenConnections(EventLoop &loop, int fd, InitImpl &init)
Given listening socket file descriptor and an init object, handle incoming connections and requests b...
std::string LongThreadName(const char *exe_name)
ConnThreads::iterator ConnThread
void _Serve(EventLoop &loop, kj::Own< kj::AsyncIoStream > &&stream, InitImpl &init)
Given stream and init objects, construct a new ProxyServer object that handles requests from the stre...
typename CleanupList::iterator CleanupIt
void CleanupRun(CleanupList &fns)
ThreadContext & thread_context
ClientInvokeContext(Connection &conn, ThreadContext &thread_context)
LogFn log_fn
External logging callback.
size_t max_chars
Maximum number of characters to use when representing request and response structs as strings.
ProxyClient(const ProxyClient &)=delete
std::optional< CleanupIt > m_disconnect_cb
Reference to callback function that is run if there is a sudden disconnect and the Connection object ...
Mapping from capnp interface type to proxy client implementation (specializations are generated by pr...
ThreadContext & m_thread_context
Base class for generated ProxyServer classes that implement capnp server methods and forward calls to...
virtual ~ProxyServerBase()
ProxyServer destructor, called from the EventLoop thread by Cap'n Proto garbage collection code after...
std::shared_ptr< Impl > m_impl
Implementation pointer that may or may not be owned and deleted when this capnp server goes out of sc...
Mapping from capnp interface type to proxy server implementation (specializations are generated by pr...
ServerInvokeContext(ProxyServer &proxy_server, CallContext &call_context, int req)
CallContext & call_context
ProxyServer & proxy_server
Vat id for server side of connection.
::capnp::MallocMessageBuilder message
::capnp::rpc::twoparty::VatId::Builder vat_id
ConnThreads request_threads
When client is making a request to a server, this is the thread argument it passes in the request,...
ConnThreads callback_threads
When client is making a request to a server, this is the callbackThread argument it passes in the req...
std::string thread_name
Identifying string for debug.
Single element task queue used to handle recursive capnp calls.
std::optional< kj::Function< void()> > m_fn
std::mutex m_mutex
Mutex mainly used internally by waiter class, but also used externally to guard access to related sta...
std::condition_variable m_cv
void wait(std::unique_lock< std::mutex > &lock, Predicate pred)