11#include <mp/proxy.capnp.h>
13#include <capnp/rpc-twoparty.h>
16#include <condition_variable>
18#include <kj/function.h>
43template <
typename ProxyServer,
typename CallContext_>
71template <
typename Interface,
typename Params,
typename Results>
100 kj::Promise<void> getName(GetNameContext context)
override;
106 template<
typename T,
typename Fn>
107 kj::Promise<T> post(Fn&& fn);
114 kj::Promise<void> m_thread_ready{kj::READY_NOW};
122 void taskFailed(kj::Exception&& exception)
override;
178 template <
typename T>
185 template <
typename T>
188 return logger << std::forward<T>(value);
191 explicit operator bool()
const
207#define MP_LOGPLAIN(loop, ...) if (mp::Logger logger{(loop).m_log_opts, __VA_ARGS__}; logger) logger
209#define MP_LOG(loop, ...) MP_LOGPLAIN(loop, __VA_ARGS__) << "{" << LongThreadName((loop).m_exe_name) << "} "
249 EventLoop(
const char* exe_name, std::function<
void(
bool, std::string)> old_callback,
void* context =
nullptr)
251 LogFn{[old_callback =
std::move(old_callback)](
LogMessage log_data) {old_callback(log_data.level ==
Log::Raise, std::move(log_data.message));}},
263 void post(kj::Function<
void()> fn);
268 template <
typename Callable>
271 post(std::forward<Callable>(callable));
276 void addAsyncCleanup(std::function<
void()> fn);
296 const
char* m_exe_name;
299 std::thread::
id m_thread_id =
std::this_thread::get_id();
303 std::thread m_async_thread;
324 std::condition_variable m_cv;
327 kj::AsyncIoContext m_io_context;
360 template <
typename Fn>
363 const Lock lock(m_mutex);
364 if (m_fn)
return false;
365 m_fn = std::forward<Fn>(fn);
370 template <
class Predicate>
380 auto fn = std::move(*m_fn);
384 const bool done = pred();
410 :
m_loop(loop), m_stream(kj::mv(stream_)),
411 m_network(*m_stream, ::capnp::rpc::twoparty::Side::CLIENT, ::capnp::ReaderOptions()),
412 m_rpc_system(::capnp::makeRpcClient(m_network)) {}
414 kj::Own<kj::AsyncIoStream>&& stream_,
415 const std::function<::capnp::Capability::Client(
Connection&)>& make_client)
416 :
m_loop(loop), m_stream(kj::mv(stream_)),
417 m_network(*m_stream, ::capnp::rpc::twoparty::Side::SERVER, ::capnp::ReaderOptions()),
418 m_rpc_system(::capnp::makeRpcServer(m_network, make_client(*this))) {}
430 CleanupIt addSyncCleanup(std::function<
void()> fn);
434 template <
typename F>
442 m_on_disconnect.add(m_network.onDisconnect().then(
443 [f = std::forward<F>(f),
this]()
mutable { m_loop->m_task_set->add(kj::evalLater(kj::mv(f))); }));
452 kj::TaskSet m_on_disconnect{m_error_handler};
454 std::optional<::capnp::RpcSystem<::capnp::rpc::twoparty::VatId>>
m_rpc_system;
458 ThreadMap::Client m_thread_map{
nullptr};
485 ::capnp::word scratch[4]{};
486 ::capnp::MallocMessageBuilder message{scratch};
487 ::capnp::rpc::twoparty::VatId::Builder vat_id{message.getRoot<::capnp::rpc::twoparty::VatId>()};
488 ServerVatId() { vat_id.setSide(::capnp::rpc::twoparty::Side::SERVER); }
491template <
typename Interface,
typename Impl>
494 bool destroy_connection)
502 typename Interface::Client(std::move(
m_client));
539 typename Interface::Client(std::move(m_client));
541 if (destroy_connection) {
548 Sub::construct(*
this);
551template <
typename Interface,
typename Impl>
557template <
typename Interface,
typename Impl>
576template <
typename Interface,
typename Impl>
599 m_context.loop->addAsyncCleanup([impl=std::move(m_impl), fns=std::move(
m_context.cleanup_fns)]()
mutable {
624template <
typename Interface,
typename Impl>
637using ConnThreads = std::map<Connection*, std::optional<ProxyClient<Thread>>>;
678 std::unique_ptr<Waiter> waiter =
nullptr;
713 bool loop_thread =
false;
716template<
typename T,
typename Fn>
719 auto ready = kj::newPromiseAndFulfiller<void>();
720 auto cancel_monitor_ptr = kj::heap<CancelMonitor>();
727 auto self = thisCap();
728 auto ret = m_thread_ready.then([
this, self = std::move(self), fn = std::forward<Fn>(fn), ready_fulfiller = kj::mv(ready.fulfiller), cancel_monitor_ptr = kj::mv(cancel_monitor_ptr)]()
mutable {
729 auto result = kj::newPromiseAndFulfiller<T>();
730 bool posted = m_thread_context.waiter->post([this, self = std::move(self), fn = std::forward<Fn>(fn), ready_fulfiller = kj::mv(ready_fulfiller), result_fulfiller = kj::mv(result.fulfiller), cancel_monitor_ptr = kj::mv(cancel_monitor_ptr)]() mutable {
739 m_loop->sync([ready_fulfiller = kj::mv(ready_fulfiller)]() mutable {
740 ready_fulfiller->fulfill();
741 ready_fulfiller = nullptr;
743 std::optional<T> result_value;
744 kj::Maybe<kj::Exception> exception{kj::runCatchingExceptions([&]{ result_value.emplace(fn(*cancel_monitor_ptr)); })};
745 m_loop->sync([this, &result_value, &exception, self = kj::mv(self), result_fulfiller = kj::mv(result_fulfiller), cancel_monitor_ptr = kj::mv(cancel_monitor_ptr)]() mutable {
749 cancel_monitor_ptr = nullptr;
753 KJ_IF_MAYBE(e, exception) {
754 assert(!result_value);
755 result_fulfiller->reject(kj::mv(*e));
757 assert(result_value);
758 result_fulfiller->fulfill(kj::mv(*result_value));
759 result_value.reset();
761 result_fulfiller = nullptr;
766 m_loop->m_task_set->add(kj::evalLater([self = kj::mv(self)] {}));
774 return kj::mv(result.promise);
775 }).attach(kj::heap<CancelProbe>(cancel_monitor));
776 m_thread_ready = kj::mv(ready.promise);
783template <
typename InitInterface>
786 typename InitInterface::Client init_client(
nullptr);
787 std::unique_ptr<Connection> connection;
790 loop.
m_io_context.lowLevelProvider->wrapSocketFd(fd, kj::LowLevelAsyncIoProvider::TAKE_OWNERSHIP);
791 connection = std::make_unique<Connection>(loop, kj::mv(stream));
792 init_client = connection->m_rpc_system->bootstrap(
ServerVatId().vat_id).castAs<InitInterface>();
793 Connection* connection_ptr = connection.get();
796 delete connection_ptr;
799 return std::make_unique<ProxyClient<InitInterface>>(
800 kj::mv(init_client), connection.release(),
true);
807template <
typename InitInterface,
typename InitImpl>
814 return kj::heap<ProxyServer<InitInterface>>(std::shared_ptr<InitImpl>(&
init, [](
InitImpl*){}), connection);
817 it->onDisconnect([&loop, it] {
818 MP_LOG(loop, Log::Info) <<
"IPC server: socket disconnected.";
826template <
typename InitInterface,
typename InitImpl>
829 auto* ptr = listener.get();
831 [&loop, &
init, listener = kj::mv(listener)](kj::Own<kj::AsyncIoStream>&& stream)
mutable {
832 _Serve<InitInterface>(loop, kj::mv(stream), init);
833 _Listen<InitInterface>(loop, kj::mv(listener), init);
839template <
typename InitInterface,
typename InitImpl>
842 _Serve<InitInterface>(
843 loop, loop.
m_io_context.lowLevelProvider->wrapSocketFd(fd, kj::LowLevelAsyncIoProvider::TAKE_OWNERSHIP),
init);
848template <
typename InitInterface,
typename InitImpl>
852 _Listen<InitInterface>(loop,
853 loop.
m_io_context.lowLevelProvider->wrapListenSocketFd(fd, kj::LowLevelAsyncIoProvider::TAKE_OWNERSHIP),
Helper class that detects when a promise is canceled.
Object holding network & rpc state associated with either an incoming server connection,...
CleanupIt addSyncCleanup(std::function< void()> fn)
Register synchronous cleanup function to run on event loop thread (with access to capnp thread local ...
::capnp::TwoPartyVatNetwork m_network
kj::Own< kj::AsyncIoStream > m_stream
Connection(EventLoop &loop, kj::Own< kj::AsyncIoStream > &&stream_, const std::function<::capnp::Capability::Client(Connection &)> &make_client)
Connection(EventLoop &loop, kj::Own< kj::AsyncIoStream > &&stream_)
void onDisconnect(F &&f)
Add disconnect handler.
kj::Canceler m_canceler
Canceler for canceling promises that we want to discard when the connection is destroyed.
::capnp::CapabilityServerSet< Thread > m_threads
Collection of server-side IPC worker threads (ProxyServer<Thread> objects previously returned by Thre...
CleanupList m_sync_cleanup_fns
Cleanup functions to run if connection is broken unexpectedly.
std::optional<::capnp::RpcSystem<::capnp::rpc::twoparty::VatId > > m_rpc_system
void removeSyncCleanup(CleanupIt it)
Event loop implementation.
kj::AsyncIoContext m_io_context
Capnp IO context.
void sync(Callable &&callable)
Wrapper around EventLoop::post that takes advantage of the fact that callable will not go out of scop...
EventLoop(const char *exe_name, LogFn log_fn, void *context=nullptr)
Construct event loop object with default logging options.
std::list< Connection > m_incoming_connections
List of connections.
Mutex m_mutex
Mutex and condition variable used to post tasks to event loop and async thread.
LogOptions m_log_opts
Logging options.
std::unique_ptr< kj::TaskSet > m_task_set
Capnp list of pending promises.
void * m_context
External context pointer.
EventLoop(const char *exe_name, std::function< void(bool, std::string)> old_callback, void *context=nullptr)
Backwards-compatible constructor for previous (deprecated) logging callback signature.
Event loop smart pointer automatically managing m_num_clients.
std::unique_lock< std::mutex > m_lock
friend Logger & operator<<(Logger &logger, T &&value)
Logger(const Logger &)=delete
~Logger() noexcept(false)
Logger & operator=(Logger &&)=delete
Logger(const LogOptions &options, Log log_level)
const LogOptions & m_options
friend Logger & operator<<(Logger &&logger, T &&value)
std::ostringstream m_buffer
Logger & operator=(const Logger &)=delete
Handler for kj::TaskSet failed task events.
LoggingErrorHandler(EventLoop &loop)
void taskFailed(kj::Exception &&exception) override
Base class for generated ProxyClient classes that implement a C++ interface and forward calls to a ca...
Interface::Client m_client
~ProxyClientBase() noexcept
ProxyClientBase(typename Interface::Client client, Connection *connection, bool destroy_connection)
Construct libmultiprocess client object wrapping Cap'n Proto client object with a reference to the as...
std::optional< mp::EventLoop > m_loop
EventLoop object which manages I/O events for all connections.
Functions to serialize / deserialize common bitcoin types.
void Unlock(Lock &lock, Callback &&callback)
kj::StringPtr KJ_STRINGIFY(Log flags)
std::list< std::function< void()> > CleanupList
void ServeStream(EventLoop &loop, int fd, InitImpl &init)
Given stream file descriptor and an init object, handle requests on the stream by calling methods on ...
std::function< void(LogMessage)> LogFn
std::tuple< ConnThread, bool > SetThread(GuardedRef< ConnThreads > threads, Connection *connection, const std::function< Thread::Client()> &make_thread)
thread_local ThreadContext g_thread_context
void _Listen(EventLoop &loop, kj::Own< kj::ConnectionReceiver > &&listener, InitImpl &init)
Given connection receiver and an init object, handle incoming connections by calling _Serve,...
std::unique_ptr< ProxyClient< InitInterface > > ConnectStream(EventLoop &loop, int fd)
Given stream file descriptor, make a new ProxyClient object to send requests over the stream.
Log
Log flags. Update stringify function if changed!
void ListenConnections(EventLoop &loop, int fd, InitImpl &init)
Given listening socket file descriptor and an init object, handle incoming connections and requests b...
std::string LongThreadName(const char *exe_name)
ConnThreads::iterator ConnThread
void _Serve(EventLoop &loop, kj::Own< kj::AsyncIoStream > &&stream, InitImpl &init)
Given stream and init objects, construct a new ProxyServer object that handles requests from the stre...
typename CleanupList::iterator CleanupIt
void CleanupRun(CleanupList &fns)
std::map< Connection *, std::optional< ProxyClient< Thread > > > ConnThreads
Map from Connection to local or remote thread handle which will be used over that connection.
ThreadContext & thread_context
ClientInvokeContext(Connection &conn, ThreadContext &thread_context)
Log level
The severity level of this message.
std::string message
Message to be logged.
LogFn log_fn
External logging callback.
size_t max_chars
Maximum number of characters to use when representing request and response structs as strings.
Log log_level
Messages with a severity level less than log_level will not be reported.
ProxyClient(const ProxyClient &)=delete
std::optional< CleanupIt > m_disconnect_cb
Reference to callback function that is run if there is a sudden disconnect and the Connection object ...
Mapping from capnp interface type to proxy client implementation (specializations are generated by pr...
ThreadContext & m_thread_context
Base class for generated ProxyServer classes that implement capnp server methods and forward calls to...
virtual ~ProxyServerBase()
ProxyServer destructor, called from the EventLoop thread by Cap'n Proto garbage collection code after...
std::shared_ptr< Impl > m_impl
Implementation pointer that may or may not be owned and deleted when this capnp server goes out of sc...
Mapping from capnp interface type to proxy server implementation (specializations are generated by pr...
ServerInvokeContext(ProxyServer &proxy_server, CallContext &call_context, int req)
CallContext & call_context
ProxyServer & proxy_server
bool request_canceled
For IPC methods that execute asynchronously, not on the event-loop thread, this is set to true if the...
Lock * cancel_lock
For IPC methods that execute asynchronously, not on the event-loop thread: lock preventing the event-...
Vat id for server side of connection.
The thread_local ThreadContext g_thread_context struct provides information about individual threads ...
ConnThreads callback_threads MP_GUARDED_BY(waiter->m_mutex)
When client is making a request to a server, this is the callbackThread argument it passes in the req...
ConnThreads request_threads MP_GUARDED_BY(waiter->m_mutex)
When client is making a request to a server, this is the thread argument it passes in the request,...
std::string thread_name
Identifying string for debug.
Single element task queue used to handle recursive capnp calls.
void wait(Lock &lock, Predicate pred)
std::condition_variable m_cv
Mutex m_mutex
Mutex mainly used internally by waiter class, but also used externally to guard access to related sta...
std::optional< kj::Function< void()> > m_fn MP_GUARDED_BY(m_mutex)