11#include <mp/proxy.capnp.h>
13#include <capnp/rpc-twoparty.h>
16#include <condition_variable>
18#include <kj/function.h>
43template <
typename ProxyServer,
typename CallContext_>
58template <
typename Interface,
typename Params,
typename Results>
87 kj::Promise<void> getName(GetNameContext context)
override;
153 template <
typename T>
160 template <
typename T>
163 return logger << std::forward<T>(value);
166 explicit operator bool()
const
182#define MP_LOGPLAIN(loop, ...) if (mp::Logger logger{(loop).m_log_opts, __VA_ARGS__}; logger) logger
184#define MP_LOG(loop, ...) MP_LOGPLAIN(loop, __VA_ARGS__) << "{" << LongThreadName((loop).m_exe_name) << "} "
224 EventLoop(
const char* exe_name, std::function<
void(
bool, std::string)> old_callback,
void* context =
nullptr)
226 LogFn{[old_callback =
std::move(old_callback)](
LogMessage log_data) {old_callback(log_data.level ==
Log::Raise, std::move(log_data.message));}},
238 void post(kj::Function<
void()> fn);
243 template <
typename Callable>
246 post(std::forward<Callable>(callable));
251 void addAsyncCleanup(std::function<
void()> fn);
271 const
char* m_exe_name;
274 std::thread::
id m_thread_id =
std::this_thread::get_id();
278 std::thread m_async_thread;
299 std::condition_variable m_cv;
302 kj::AsyncIoContext m_io_context;
330 template <
typename Fn>
333 const Lock lock(m_mutex);
334 if (m_fn)
return false;
335 m_fn = std::forward<Fn>(fn);
340 template <
class Predicate>
350 auto fn = std::move(*m_fn);
354 const bool done = pred();
380 :
m_loop(loop), m_stream(kj::mv(stream_)),
381 m_network(*m_stream, ::capnp::rpc::twoparty::Side::CLIENT, ::capnp::ReaderOptions()),
382 m_rpc_system(::capnp::makeRpcClient(m_network)) {}
384 kj::Own<kj::AsyncIoStream>&& stream_,
385 const std::function<::capnp::Capability::Client(
Connection&)>& make_client)
386 :
m_loop(loop), m_stream(kj::mv(stream_)),
387 m_network(*m_stream, ::capnp::rpc::twoparty::Side::SERVER, ::capnp::ReaderOptions()),
388 m_rpc_system(::capnp::makeRpcServer(m_network, make_client(*this))) {}
400 CleanupIt addSyncCleanup(std::function<
void()> fn);
404 template <
typename F>
412 m_on_disconnect.add(m_network.onDisconnect().then(
413 [f = std::forward<F>(f),
this]()
mutable { m_loop->m_task_set->add(kj::evalLater(kj::mv(f))); }));
419 kj::TaskSet m_on_disconnect{m_error_handler};
421 std::optional<::capnp::RpcSystem<::capnp::rpc::twoparty::VatId>>
m_rpc_system;
425 ThreadMap::Client m_thread_map{
nullptr};
447 ::capnp::word scratch[4]{};
448 ::capnp::MallocMessageBuilder message{scratch};
449 ::capnp::rpc::twoparty::VatId::Builder vat_id{message.getRoot<::capnp::rpc::twoparty::VatId>()};
450 ServerVatId() { vat_id.setSide(::capnp::rpc::twoparty::Side::SERVER); }
453template <
typename Interface,
typename Impl>
456 bool destroy_connection)
464 typename Interface::Client(std::move(
m_client));
501 typename Interface::Client(std::move(m_client));
503 if (destroy_connection) {
510 Sub::construct(*
this);
513template <
typename Interface,
typename Impl>
519template <
typename Interface,
typename Impl>
538template <
typename Interface,
typename Impl>
561 m_context.loop->addAsyncCleanup([impl=std::move(m_impl), fns=std::move(
m_context.cleanup_fns)]()
mutable {
586template <
typename Interface,
typename Impl>
599using ConnThreads = std::map<Connection*, std::optional<ProxyClient<Thread>>>;
640 std::unique_ptr<Waiter> waiter =
nullptr;
675 bool loop_thread =
false;
681template <
typename InitInterface>
684 typename InitInterface::Client init_client(
nullptr);
685 std::unique_ptr<Connection> connection;
688 loop.
m_io_context.lowLevelProvider->wrapSocketFd(fd, kj::LowLevelAsyncIoProvider::TAKE_OWNERSHIP);
689 connection = std::make_unique<Connection>(loop, kj::mv(stream));
690 init_client = connection->m_rpc_system->bootstrap(
ServerVatId().vat_id).castAs<InitInterface>();
691 Connection* connection_ptr = connection.get();
694 delete connection_ptr;
697 return std::make_unique<ProxyClient<InitInterface>>(
698 kj::mv(init_client), connection.release(),
true);
705template <
typename InitInterface,
typename InitImpl>
712 return kj::heap<ProxyServer<InitInterface>>(std::shared_ptr<InitImpl>(&
init, [](
InitImpl*){}), connection);
715 it->onDisconnect([&loop, it] {
716 MP_LOG(loop, Log::Info) <<
"IPC server: socket disconnected.";
724template <
typename InitInterface,
typename InitImpl>
727 auto* ptr = listener.get();
729 [&loop, &
init, listener = kj::mv(listener)](kj::Own<kj::AsyncIoStream>&& stream)
mutable {
730 _Serve<InitInterface>(loop, kj::mv(stream), init);
731 _Listen<InitInterface>(loop, kj::mv(listener), init);
737template <
typename InitInterface,
typename InitImpl>
740 _Serve<InitInterface>(
741 loop, loop.
m_io_context.lowLevelProvider->wrapSocketFd(fd, kj::LowLevelAsyncIoProvider::TAKE_OWNERSHIP),
init);
746template <
typename InitInterface,
typename InitImpl>
750 _Listen<InitInterface>(loop,
751 loop.
m_io_context.lowLevelProvider->wrapListenSocketFd(fd, kj::LowLevelAsyncIoProvider::TAKE_OWNERSHIP),
Object holding network & rpc state associated with either an incoming server connection,...
CleanupIt addSyncCleanup(std::function< void()> fn)
Register synchronous cleanup function to run on event loop thread (with access to capnp thread local ...
::capnp::TwoPartyVatNetwork m_network
kj::Own< kj::AsyncIoStream > m_stream
Connection(EventLoop &loop, kj::Own< kj::AsyncIoStream > &&stream_, const std::function<::capnp::Capability::Client(Connection &)> &make_client)
Connection(EventLoop &loop, kj::Own< kj::AsyncIoStream > &&stream_)
void onDisconnect(F &&f)
Add disconnect handler.
::capnp::CapabilityServerSet< Thread > m_threads
Collection of server-side IPC worker threads (ProxyServer<Thread> objects previously returned by Thre...
CleanupList m_sync_cleanup_fns
Cleanup functions to run if connection is broken unexpectedly.
std::optional<::capnp::RpcSystem<::capnp::rpc::twoparty::VatId > > m_rpc_system
void removeSyncCleanup(CleanupIt it)
Event loop implementation.
kj::AsyncIoContext m_io_context
Capnp IO context.
void sync(Callable &&callable)
Wrapper around EventLoop::post that takes advantage of the fact that callable will not go out of scop...
EventLoop(const char *exe_name, LogFn log_fn, void *context=nullptr)
Construct event loop object with default logging options.
std::list< Connection > m_incoming_connections
List of connections.
Mutex m_mutex
Mutex and condition variable used to post tasks to event loop and async thread.
LogOptions m_log_opts
Logging options.
std::unique_ptr< kj::TaskSet > m_task_set
Capnp list of pending promises.
void * m_context
External context pointer.
EventLoop(const char *exe_name, std::function< void(bool, std::string)> old_callback, void *context=nullptr)
Backwards-compatible constructor for previous (deprecated) logging callback signature.
Event loop smart pointer automatically managing m_num_clients.
std::unique_lock< std::mutex > m_lock
friend Logger & operator<<(Logger &logger, T &&value)
Logger(const Logger &)=delete
~Logger() noexcept(false)
Logger & operator=(Logger &&)=delete
Logger(const LogOptions &options, Log log_level)
const LogOptions & m_options
friend Logger & operator<<(Logger &&logger, T &&value)
std::ostringstream m_buffer
Logger & operator=(const Logger &)=delete
Handler for kj::TaskSet failed task events.
LoggingErrorHandler(EventLoop &loop)
void taskFailed(kj::Exception &&exception) override
Base class for generated ProxyClient classes that implement a C++ interface and forward calls to a ca...
Interface::Client m_client
~ProxyClientBase() noexcept
ProxyClientBase(typename Interface::Client client, Connection *connection, bool destroy_connection)
Construct libmultiprocess client object wrapping Cap'n Proto client object with a reference to the as...
std::optional< mp::EventLoop > m_loop
EventLoop object which manages I/O events for all connections.
Functions to serialize / deserialize common bitcoin types.
void Unlock(Lock &lock, Callback &&callback)
kj::StringPtr KJ_STRINGIFY(Log flags)
std::list< std::function< void()> > CleanupList
void ServeStream(EventLoop &loop, int fd, InitImpl &init)
Given stream file descriptor and an init object, handle requests on the stream by calling methods on ...
std::function< void(LogMessage)> LogFn
std::tuple< ConnThread, bool > SetThread(GuardedRef< ConnThreads > threads, Connection *connection, const std::function< Thread::Client()> &make_thread)
thread_local ThreadContext g_thread_context
void _Listen(EventLoop &loop, kj::Own< kj::ConnectionReceiver > &&listener, InitImpl &init)
Given connection receiver and an init object, handle incoming connections by calling _Serve,...
std::unique_ptr< ProxyClient< InitInterface > > ConnectStream(EventLoop &loop, int fd)
Given stream file descriptor, make a new ProxyClient object to send requests over the stream.
Log
Log flags. Update stringify function if changed!
void ListenConnections(EventLoop &loop, int fd, InitImpl &init)
Given listening socket file descriptor and an init object, handle incoming connections and requests b...
std::string LongThreadName(const char *exe_name)
ConnThreads::iterator ConnThread
void _Serve(EventLoop &loop, kj::Own< kj::AsyncIoStream > &&stream, InitImpl &init)
Given stream and init objects, construct a new ProxyServer object that handles requests from the stre...
typename CleanupList::iterator CleanupIt
void CleanupRun(CleanupList &fns)
std::map< Connection *, std::optional< ProxyClient< Thread > > > ConnThreads
Map from Connection to local or remote thread handle which will be used over that connection.
ThreadContext & thread_context
ClientInvokeContext(Connection &conn, ThreadContext &thread_context)
Log level
The severity level of this message.
std::string message
Message to be logged.
LogFn log_fn
External logging callback.
size_t max_chars
Maximum number of characters to use when representing request and response structs as strings.
Log log_level
Messages with a severity level less than log_level will not be reported.
ProxyClient(const ProxyClient &)=delete
std::optional< CleanupIt > m_disconnect_cb
Reference to callback function that is run if there is a sudden disconnect and the Connection object ...
Mapping from capnp interface type to proxy client implementation (specializations are generated by pr...
ThreadContext & m_thread_context
Base class for generated ProxyServer classes that implement capnp server methods and forward calls to...
virtual ~ProxyServerBase()
ProxyServer destructor, called from the EventLoop thread by Cap'n Proto garbage collection code after...
std::shared_ptr< Impl > m_impl
Implementation pointer that may or may not be owned and deleted when this capnp server goes out of sc...
Mapping from capnp interface type to proxy server implementation (specializations are generated by pr...
ServerInvokeContext(ProxyServer &proxy_server, CallContext &call_context, int req)
CallContext & call_context
ProxyServer & proxy_server
Vat id for server side of connection.
The thread_local ThreadContext g_thread_context struct provides information about individual threads ...
ConnThreads callback_threads MP_GUARDED_BY(waiter->m_mutex)
When client is making a request to a server, this is the callbackThread argument it passes in the req...
ConnThreads request_threads MP_GUARDED_BY(waiter->m_mutex)
When client is making a request to a server, this is the thread argument it passes in the request,...
std::string thread_name
Identifying string for debug.
Single element task queue used to handle recursive capnp calls.
void wait(Lock &lock, Predicate pred)
std::condition_variable m_cv
Mutex m_mutex
Mutex mainly used internally by waiter class, but also used externally to guard access to related sta...
std::optional< kj::Function< void()> > m_fn MP_GUARDED_BY(m_mutex)