9#include <mp/proxy.capnp.h>
14#include <capnp/capability.h>
15#include <capnp/common.h>
17#include <condition_variable>
21#include <kj/async-io.h>
22#include <kj/async-prelude.h>
25#include <kj/function.h>
33#include <sys/socket.h>
45 KJ_LOG(ERROR,
"Uncaught exception in daemonized task.", exception);
53 m_loop->m_num_clients += 1;
63 auto loop_lock{
PtrOrValue{m_lock, loop->m_mutex}};
64 loop_lock->assert_locked(loop->m_mutex);
65 assert(loop->m_num_clients > 0);
66 loop->m_num_clients -= 1;
68 loop->m_cv.notify_all();
69 int post_fd{loop->m_post_fd};
72 KJ_SYSCALL(write(post_fd, &buffer, 1));
76 if (relock) loop_lock->lock();
191 m_async_fns->emplace_back(std::move(fn));
196 : m_exe_name(exe_name),
197 m_io_context(kj::setupAsyncIo()),
198 m_task_set(new kj::TaskSet(m_error_handler)),
199 m_log_opts(
std::move(log_opts)),
203 KJ_SYSCALL(socketpair(AF_UNIX, SOCK_STREAM, 0, fds));
212 KJ_ASSERT(m_post_fn ==
nullptr);
213 KJ_ASSERT(!m_async_fns);
216 KJ_ASSERT(m_num_clients == 0);
232 m_async_fns.emplace();
235 kj::Own<kj::AsyncIoStream> wait_stream{
236 m_io_context.lowLevelProvider->wrapSocketFd(
m_wait_fd, kj::LowLevelAsyncIoProvider::TAKE_OWNERSHIP)};
240 const size_t read_bytes = wait_stream->read(&buffer, 0, 1).wait(
m_io_context.waitScope);
241 if (read_bytes != 1)
throw std::logic_error(
"EventLoop wait_stream closed unexpectedly");
255 MP_LOG(*
this,
Log::Info) <<
"EventLoop::loop done, cancelling event listeners.";
258 wait_stream =
nullptr;
259 KJ_SYSCALL(::close(post_fd));
280 KJ_SYSCALL(write(post_fd, &buffer, 1));
291 }
else if (!m_async_fns->empty()) {
294 while (m_async_fns) {
295 if (!m_async_fns->empty()) {
297 const std::function<void()> fn = std::move(m_async_fns->front());
298 m_async_fns->pop_front();
313 assert(m_num_clients >= 0);
314 return m_num_clients == 0 && m_async_fns->empty();
324 std::tie(thread, inserted) = threads.ref.try_emplace(connection);
327 thread->second.emplace(make_thread(), connection,
false);
328 thread->second->m_disconnect_cb = connection->
addSyncCleanup([threads, thread] {
337 thread->second->m_disconnect_cb.reset();
341 threads.ref.erase(thread);
344 return {thread, inserted};
352 if (m_disconnect_cb) {
358 if (m_disconnect_cb) {
359 m_context.connection->removeSyncCleanup(*m_disconnect_cb);
366 : m_thread_context(thread_context), m_thread(
std::move(thread))
368 assert(m_thread_context.waiter.get() !=
nullptr);
373 if (!m_thread.joinable())
return;
379 assert(m_thread_context.waiter.get());
380 std::unique_ptr<Waiter> waiter;
382 const Lock lock(m_thread_context.waiter->m_mutex);
385 waiter = std::move(m_thread_context.waiter);
392 m_thread_context.request_threads.clear();
393 m_thread_context.callback_threads.clear();
395 waiter->m_cv.notify_all();
402 context.getResults().setResult(m_thread_context.thread_name);
403 return kj::READY_NOW;
410 const std::string from = context.getParams().getName();
411 std::promise<ThreadContext*> thread_context;
412 std::thread thread([&thread_context, from,
this]() {
421 auto thread_server = kj::heap<ProxyServer<Thread>>(*thread_context.get_future().get(), std::move(thread));
422 auto thread_client =
m_connection.m_threads.add(kj::mv(thread_server));
423 context.getResults().setResult(kj::mv(thread_client));
424 return kj::READY_NOW;
Object holding network & rpc state associated with either an incoming server connection,...
CleanupIt addSyncCleanup(std::function< void()> fn)
Register synchronous cleanup function to run on event loop thread (with access to capnp thread local ...
~Connection()
Run cleanup functions.
CleanupList m_sync_cleanup_fns
Cleanup functions to run if connection is broken unexpectedly.
std::optional<::capnp::RpcSystem<::capnp::rpc::twoparty::VatId > > m_rpc_system
void removeSyncCleanup(CleanupIt it)
Event loop implementation.
kj::AsyncIoContext m_io_context
Capnp IO context.
void startAsyncThread() MP_REQUIRES(m_mutex)
Start asynchronous worker thread if necessary.
std::condition_variable m_cv
EventLoop(const char *exe_name, LogFn log_fn, void *context=nullptr)
Construct event loop object with default logging options.
void addAsyncCleanup(std::function< void()> fn)
Register cleanup function to run on asynchronous worker thread without blocking the event loop thread...
void loop()
Run event loop.
Mutex m_mutex
Mutex and condition variable used to post tasks to event loop and async thread.
bool done() const MP_REQUIRES(m_mutex)
Check if loop should exit.
int m_post_fd
Pipe write handle used to wake up the event loop thread.
std::unique_ptr< kj::TaskSet > m_task_set
Capnp list of pending promises.
int m_wait_fd
Pipe read handle used to wake up the event loop thread.
std::thread m_async_thread
Handle of an async worker thread.
std::thread::id m_thread_id
ID of the event loop thread.
void post(kj::Function< void()> fn)
Run function on event loop thread.
Event loop smart pointer automatically managing m_num_clients.
EventLoopRef(EventLoop &loop, Lock *lock=nullptr)
void reset(bool relock=false)
std::unique_lock< std::mutex > m_lock
void taskFailed(kj::Exception &&exception) override
boost::signals2::scoped_connection m_connection
std::optional< mp::EventLoop > m_loop
EventLoop object which manages I/O events for all connections.
Functions to serialize / deserialize common bitcoin types.
void Unlock(Lock &lock, Callback &&callback)
kj::StringPtr KJ_STRINGIFY(Log flags)
std::list< std::function< void()> > CleanupList
std::string ThreadName(const char *exe_name)
Format current thread name as "{exe_name}-{$pid}/{thread_name}-{$tid}".
std::atomic< int > server_reqs
std::tuple< ConnThread, bool > SetThread(GuardedRef< ConnThreads > threads, Connection *connection, const std::function< Thread::Client()> &make_thread)
thread_local ThreadContext g_thread_context
Log
Log flags. Update stringify function if changed!
std::string LongThreadName(const char *exe_name)
ConnThreads::iterator ConnThread
typename CleanupList::iterator CleanupIt
Mapping from capnp interface type to proxy client implementation (specializations are generated by pr...
ProxyContext(Connection *connection)
Mapping from capnp interface type to proxy server implementation (specializations are generated by pr...
Convenient wrapper around std::variant<T*, T>
The thread_local ThreadContext g_thread_context struct provides information about individual threads ...
std::unique_ptr< Waiter > waiter
Waiter object used to allow remote clients to execute code on this thread.
bool loop_thread
Whether this thread is a capnp event loop thread.
std::string thread_name
Identifying string for debug.