11#include <mp/proxy.capnp.h>
13#include <capnp/rpc-twoparty.h>
40template <
typename ProxyServer,
typename CallContext_>
55template <
typename Interface,
typename Params,
typename Results>
66 void setCleanup(
const std::function<
void()>& fn);
84 kj::Promise<void> getName(GetNameContext context)
override;
94 void taskFailed(kj::Exception&& exception)
override;
98using LogFn = std::function<void(
bool raise, std::string message)>;
110 template <
typename T>
113 if (logger.
m_fn) logger.
m_buffer << std::forward<T>(value);
117 template <
typename T>
120 return logger << std::forward<T>(value);
137 EventLoop(
const char* exe_name,
LogFn log_fn,
void* context =
nullptr);
147 void post(
const std::function<
void()>& fn);
152 template <
typename Callable>
155 post(std::ref(callable));
172 void addClient(std::unique_lock<std::mutex>& lock);
175 bool done(std::unique_lock<std::mutex>& lock);
247 template <
typename Fn>
250 const std::unique_lock<std::mutex> lock(
m_mutex);
252 m_fn = std::move(fn);
256 template <
class Predicate>
257 void wait(std::unique_lock<std::mutex>& lock, Predicate pred)
259 m_cv.wait(lock, [&] {
266 auto fn = std::move(
m_fn);
272 const bool done = pred();
292 m_network(*
m_stream, ::capnp::rpc::twoparty::Side::CLIENT, ::capnp::ReaderOptions()),
299 kj::Own<kj::AsyncIoStream>&& stream_,
300 const std::function<::capnp::Capability::Client(
Connection&)>& make_client)
302 m_network(*
m_stream, ::capnp::rpc::twoparty::Side::SERVER, ::capnp::ReaderOptions()),
327 template <
typename F>
336 [f = std::move(f),
this]()
mutable { m_loop.m_task_set->add(kj::evalLater(kj::mv(f))); }));
344 std::optional<::capnp::RpcSystem<::capnp::rpc::twoparty::VatId>>
m_rpc_system;
373 ::capnp::rpc::twoparty::VatId::Builder
vat_id{
message.getRoot<::capnp::rpc::twoparty::VatId>()};
377template <
typename Interface,
typename Impl>
380 bool destroy_connection)
393 typename Interface::Client(std::move(
m_client));
429 typename Interface::Client(std::move(
m_client));
436 if (destroy_connection) {
443 Sub::construct(*
this);
446template <
typename Interface,
typename Impl>
452template <
typename Interface,
typename Impl>
463template <
typename Interface,
typename Impl>
486 m_context.connection->addAsyncCleanup([impl=std::move(m_impl), fns=std::move(
m_context.cleanup_fns)]()
mutable {
492 std::unique_lock<std::mutex> lock(
m_context.connection->m_loop.m_mutex);
493 m_context.connection->m_loop.removeClient(lock);
513template <
typename Interface,
typename Impl>
526std::tuple<ConnThread, bool>
SetThread(
ConnThreads& threads, std::mutex& mutex,
Connection* connection,
const std::function<Thread::Client()>& make_thread);
536 std::unique_ptr<Waiter> waiter =
nullptr;
556 bool loop_thread =
false;
562template <
typename InitInterface>
565 typename InitInterface::Client init_client(
nullptr);
566 std::unique_ptr<Connection> connection;
569 loop.
m_io_context.lowLevelProvider->wrapSocketFd(fd, kj::LowLevelAsyncIoProvider::TAKE_OWNERSHIP);
570 connection = std::make_unique<Connection>(loop, kj::mv(stream));
571 init_client = connection->m_rpc_system->bootstrap(
ServerVatId().vat_id).castAs<InitInterface>();
572 Connection* connection_ptr = connection.get();
574 loop.
log() <<
"IPC client: unexpected network disconnect.";
575 delete connection_ptr;
578 return std::make_unique<ProxyClient<InitInterface>>(
579 kj::mv(init_client), connection.release(),
true);
586template <
typename InitInterface,
typename InitImpl>
593 return kj::heap<ProxyServer<InitInterface>>(std::shared_ptr<InitImpl>(&
init, [](
InitImpl*){}), connection);
596 it->onDisconnect([&loop, it] {
597 loop.
log() <<
"IPC server: socket disconnected.";
605template <
typename InitInterface,
typename InitImpl>
608 auto* ptr = listener.get();
610 [&loop, &
init, listener = kj::mv(listener)](kj::Own<kj::AsyncIoStream>&& stream)
mutable {
611 _Serve<InitInterface>(loop, kj::mv(stream), init);
612 _Listen<InitInterface>(loop, kj::mv(listener), init);
618template <
typename InitInterface,
typename InitImpl>
621 _Serve<InitInterface>(
622 loop, loop.
m_io_context.lowLevelProvider->wrapSocketFd(fd, kj::LowLevelAsyncIoProvider::TAKE_OWNERSHIP),
init);
627template <
typename InitInterface,
typename InitImpl>
631 _Listen<InitInterface>(loop,
632 loop.
m_io_context.lowLevelProvider->wrapListenSocketFd(fd, kj::LowLevelAsyncIoProvider::TAKE_OWNERSHIP),
Object holding network & rpc state associated with either an incoming server connection,...
void addAsyncCleanup(std::function< void()> fn)
Register asynchronous cleanup function to run on worker thread when disconnect() is called.
CleanupIt addSyncCleanup(std::function< void()> fn)
Register synchronous cleanup function to run on event loop thread (with access to capnp thread local ...
CleanupList m_async_cleanup_fns
kj::TaskSet m_on_disconnect
LoggingErrorHandler m_error_handler
~Connection()
Run cleanup functions.
::capnp::TwoPartyVatNetwork m_network
kj::Own< kj::AsyncIoStream > m_stream
Connection(EventLoop &loop, kj::Own< kj::AsyncIoStream > &&stream_, const std::function<::capnp::Capability::Client(Connection &)> &make_client)
Connection(EventLoop &loop, kj::Own< kj::AsyncIoStream > &&stream_)
void onDisconnect(F &&f)
Add disconnect handler.
::capnp::CapabilityServerSet< Thread > m_threads
Collection of server-side IPC worker threads (ProxyServer<Thread> objects previously returned by Thre...
CleanupList m_sync_cleanup_fns
Cleanup functions to run if connection is broken unexpectedly.
std::optional<::capnp::RpcSystem<::capnp::rpc::twoparty::VatId > > m_rpc_system
void removeSyncCleanup(CleanupIt it)
ThreadMap::Client m_thread_map
Event loop implementation.
const char * m_exe_name
Process name included in thread names so combined debug output from multiple processes is easier to u...
kj::AsyncIoContext m_io_context
Capnp IO context.
void sync(Callable &&callable)
Wrapper around EventLoop::post that takes advantage of the fact that callable will not go out of scop...
bool done(std::unique_lock< std::mutex > &lock)
Check if loop should exit.
std::condition_variable m_cv
EventLoop(const char *exe_name, LogFn log_fn, void *context=nullptr)
Construct event loop object.
bool removeClient(std::unique_lock< std::mutex > &lock)
std::list< Connection > m_incoming_connections
List of connections.
LogFn m_log_fn
External logging callback.
void loop()
Run event loop.
CleanupList m_async_fns
Callback functions to run on async thread.
void post(const std::function< void()> &fn)
Run function on event loop thread.
int m_post_fd
Pipe write handle used to wake up the event loop thread.
int m_num_clients
Number of clients holding references to ProxyServerBase objects that reference this event loop.
std::unique_ptr< kj::TaskSet > m_task_set
Capnp list of pending promises.
std::mutex m_mutex
Mutex and condition variable used to post tasks to event loop and async thread.
void * m_context
External context pointer.
int m_wait_fd
Pipe read handle used to wake up the event loop thread.
const std::function< void()> * m_post_fn
Callback function to run on event loop thread during post() or sync() call.
LoggingErrorHandler m_error_handler
Capnp error handler. Needs to outlive m_task_set.
std::thread m_async_thread
Handle of an async worker thread.
std::thread::id m_thread_id
ID of the event loop thread.
void startAsyncThread(std::unique_lock< std::mutex > &lock)
Start asynchronous worker thread if necessary.
void addClient(std::unique_lock< std::mutex > &lock)
Add/remove remote client reference counts.
friend Logger & operator<<(Logger &logger, T &&value)
Logger(bool raise, LogFn &fn)
~Logger() noexcept(false)
friend Logger & operator<<(Logger &&logger, T &&value)
std::ostringstream m_buffer
Handler for kj::TaskSet failed task events.
LoggingErrorHandler(EventLoop &loop)
void taskFailed(kj::Exception &&exception) override
Base class for generated ProxyClient classes that implement a C++ interface and forward calls to a ca...
Interface::Client m_client
~ProxyClientBase() noexcept
ProxyClientBase(typename Interface::Client client, Connection *connection, bool destroy_connection)
Functions to serialize / deserialize common bitcoin types.
std::list< std::function< void()> > CleanupList
void ServeStream(EventLoop &loop, int fd, InitImpl &init)
Given stream file descriptor and an init object, handle requests on the stream by calling methods on ...
std::map< Connection *, ProxyClient< Thread > > ConnThreads
thread_local ThreadContext g_thread_context
std::tuple< ConnThread, bool > SetThread(ConnThreads &threads, std::mutex &mutex, Connection *connection, const std::function< Thread::Client()> &make_thread)
std::function< void(bool raise, std::string message)> LogFn
void _Listen(EventLoop &loop, kj::Own< kj::ConnectionReceiver > &&listener, InitImpl &init)
Given connection receiver and an init object, handle incoming connections by calling _Serve,...
std::unique_ptr< ProxyClient< InitInterface > > ConnectStream(EventLoop &loop, int fd)
Given stream file descriptor, make a new ProxyClient object to send requests over the stream.
void ListenConnections(EventLoop &loop, int fd, InitImpl &init)
Given listening socket file descriptor and an init object, handle incoming connections and requests b...
std::string LongThreadName(const char *exe_name)
ConnThreads::iterator ConnThread
void _Serve(EventLoop &loop, kj::Own< kj::AsyncIoStream > &&stream, InitImpl &init)
Given stream and init objects, construct a new ProxyServer object that handles requests from the stre...
typename CleanupList::iterator CleanupIt
void CleanupRun(CleanupList &fns)
ThreadContext & thread_context
ClientInvokeContext(Connection &conn, ThreadContext &thread_context)
ProxyClient(const ProxyClient &)=delete
std::optional< CleanupIt > m_cleanup_it
Cleanup function to run when the connection is closed.
Mapping from capnp interface type to proxy client implementation (specializations are generated by pr...
ThreadContext & m_thread_context
Base class for generated ProxyServer classes that implement capnp server methods and forward calls to...
virtual ~ProxyServerBase()
ProxyServer destructor, called from the EventLoop thread by Cap'n Proto garbage collection code after...
std::shared_ptr< Impl > m_impl
Implementation pointer that may or may not be owned and deleted when this capnp server goes out of sc...
Mapping from capnp interface type to proxy server implementation (specializations are generated by pr...
ServerInvokeContext(ProxyServer &proxy_server, CallContext &call_context, int req)
CallContext & call_context
ProxyServer & proxy_server
Vat id for server side of connection.
::capnp::MallocMessageBuilder message
::capnp::rpc::twoparty::VatId::Builder vat_id
ConnThreads request_threads
When client is making a request to a server, this is the thread argument it passes in the request,...
ConnThreads callback_threads
When client is making a request to a server, this is the callbackThread argument it passes in the req...
std::string thread_name
Identifying string for debug.
Single element task queue used to handle recursive capnp calls.
std::function< void()> m_fn
std::condition_variable m_cv
void wait(std::unique_lock< std::mutex > &lock, Predicate pred)