9#include <mp/proxy.capnp.h>
14#include <capnp/capability.h>
15#include <capnp/common.h>
17#include <condition_variable>
21#include <kj/async-io.h>
22#include <kj/async-prelude.h>
25#include <kj/function.h>
32#include <sys/socket.h>
44 KJ_LOG(ERROR,
"Uncaught exception in daemonized task.", exception);
45 m_loop.
log() <<
"Uncaught exception in daemonized task.";
52 m_loop->m_num_clients += 1;
62 auto loop_lock{
PtrOrValue{m_lock, loop->m_mutex}};
63 loop_lock->assert_locked(loop->m_mutex);
64 assert(loop->m_num_clients > 0);
65 loop->m_num_clients -= 1;
67 loop->m_cv.notify_all();
68 int post_fd{loop->m_post_fd};
71 KJ_SYSCALL(write(post_fd, &buffer, 1));
75 if (relock) loop_lock->lock();
190 m_async_fns->emplace_back(std::move(fn));
195 : m_exe_name(exe_name),
196 m_io_context(kj::setupAsyncIo()),
197 m_task_set(new kj::TaskSet(m_error_handler)),
202 KJ_SYSCALL(socketpair(AF_UNIX, SOCK_STREAM, 0, fds));
211 KJ_ASSERT(m_post_fn ==
nullptr);
212 KJ_ASSERT(!m_async_fns);
215 KJ_ASSERT(m_num_clients == 0);
231 m_async_fns.emplace();
234 kj::Own<kj::AsyncIoStream> wait_stream{
235 m_io_context.lowLevelProvider->wrapSocketFd(
m_wait_fd, kj::LowLevelAsyncIoProvider::TAKE_OWNERSHIP)};
239 const size_t read_bytes = wait_stream->read(&buffer, 0, 1).wait(
m_io_context.waitScope);
240 if (read_bytes != 1)
throw std::logic_error(
"EventLoop wait_stream closed unexpectedly");
254 log() <<
"EventLoop::loop done, cancelling event listeners.";
256 log() <<
"EventLoop::loop bye.";
257 wait_stream =
nullptr;
258 KJ_SYSCALL(::close(post_fd));
279 KJ_SYSCALL(write(post_fd, &buffer, 1));
290 }
else if (!m_async_fns->empty()) {
293 while (m_async_fns) {
294 if (!m_async_fns->empty()) {
296 const std::function<void()> fn = std::move(m_async_fns->front());
297 m_async_fns->pop_front();
312 assert(m_num_clients >= 0);
313 return m_num_clients == 0 && m_async_fns->empty();
323 std::tie(thread, inserted) = threads.ref.try_emplace(connection);
326 thread->second.emplace(make_thread(), connection,
false);
327 thread->second->m_disconnect_cb = connection->
addSyncCleanup([threads, thread] {
336 thread->second->m_disconnect_cb.reset();
340 threads.ref.erase(thread);
343 return {thread, inserted};
351 if (m_disconnect_cb) {
357 if (m_disconnect_cb) {
358 m_context.connection->removeSyncCleanup(*m_disconnect_cb);
365 : m_thread_context(thread_context), m_thread(
std::move(thread))
367 assert(m_thread_context.waiter.get() !=
nullptr);
372 if (!m_thread.joinable())
return;
378 assert(m_thread_context.waiter.get());
379 std::unique_ptr<Waiter> waiter;
381 const Lock lock(m_thread_context.waiter->m_mutex);
384 waiter = std::move(m_thread_context.waiter);
391 m_thread_context.request_threads.clear();
392 m_thread_context.callback_threads.clear();
394 waiter->m_cv.notify_all();
401 context.getResults().setResult(m_thread_context.thread_name);
402 return kj::READY_NOW;
409 const std::string from = context.getParams().getName();
410 std::promise<ThreadContext*> thread_context;
411 std::thread thread([&thread_context, from,
this]() {
420 auto thread_server = kj::heap<ProxyServer<Thread>>(*thread_context.get_future().get(), std::move(thread));
421 auto thread_client =
m_connection.m_threads.add(kj::mv(thread_server));
422 context.getResults().setResult(kj::mv(thread_client));
423 return kj::READY_NOW;
Object holding network & rpc state associated with either an incoming server connection,...
CleanupIt addSyncCleanup(std::function< void()> fn)
Register synchronous cleanup function to run on event loop thread (with access to capnp thread local ...
~Connection()
Run cleanup functions.
CleanupList m_sync_cleanup_fns
Cleanup functions to run if connection is broken unexpectedly.
std::optional<::capnp::RpcSystem<::capnp::rpc::twoparty::VatId > > m_rpc_system
void removeSyncCleanup(CleanupIt it)
Event loop implementation.
kj::AsyncIoContext m_io_context
Capnp IO context.
void startAsyncThread() MP_REQUIRES(m_mutex)
Start asynchronous worker thread if necessary.
std::condition_variable m_cv
EventLoop(const char *exe_name, LogFn log_fn, void *context=nullptr)
Construct event loop object.
void addAsyncCleanup(std::function< void()> fn)
Register cleanup function to run on asynchronous worker thread without blocking the event loop thread...
void loop()
Run event loop.
Mutex m_mutex
Mutex and condition variable used to post tasks to event loop and async thread.
bool done() const MP_REQUIRES(m_mutex)
Check if loop should exit.
int m_post_fd
Pipe write handle used to wake up the event loop thread.
LogOptions m_log_opts
Logging options.
std::unique_ptr< kj::TaskSet > m_task_set
Capnp list of pending promises.
int m_wait_fd
Pipe read handle used to wake up the event loop thread.
std::thread m_async_thread
Handle of an async worker thread.
std::thread::id m_thread_id
ID of the event loop thread.
void post(kj::Function< void()> fn)
Run function on event loop thread.
Event loop smart pointer automatically managing m_num_clients.
EventLoopRef(EventLoop &loop, Lock *lock=nullptr)
void reset(bool relock=false)
std::unique_lock< std::mutex > m_lock
void taskFailed(kj::Exception &&exception) override
boost::signals2::scoped_connection m_connection
std::optional< mp::EventLoop > m_loop
EventLoop object which manages I/O events for all connections.
Functions to serialize / deserialize common bitcoin types.
void Unlock(Lock &lock, Callback &&callback)
std::list< std::function< void()> > CleanupList
std::string ThreadName(const char *exe_name)
Format current thread name as "{exe_name}-{$pid}/{thread_name}-{$tid}".
std::atomic< int > server_reqs
std::tuple< ConnThread, bool > SetThread(GuardedRef< ConnThreads > threads, Connection *connection, const std::function< Thread::Client()> &make_thread)
thread_local ThreadContext g_thread_context
std::function< void(bool raise, std::string message)> LogFn
std::string LongThreadName(const char *exe_name)
ConnThreads::iterator ConnThread
typename CleanupList::iterator CleanupIt
LogFn log_fn
External logging callback.
Mapping from capnp interface type to proxy client implementation (specializations are generated by pr...
ProxyContext(Connection *connection)
Mapping from capnp interface type to proxy server implementation (specializations are generated by pr...
Convenient wrapper around std::variant<T*, T>
The thread_local ThreadContext g_thread_context struct provides information about individual threads ...
std::unique_ptr< Waiter > waiter
Waiter object used to allow remote clients to execute code on this thread.
bool loop_thread
Whether this thread is a capnp event loop thread.
std::string thread_name
Identifying string for debug.