#include <mp/proxy.capnp.h>
#include <capnp/capability.h>
#include <condition_variable>
#include <kj/async-io.h>
#include <kj/async-prelude.h>
#include <kj/function.h>
#include <sys/socket.h>
// LoggingErrorHandler::taskFailed(): report exceptions escaping daemonized tasks.
KJ_LOG(ERROR, "Uncaught exception in daemonized task.", exception);
m_loop.log() << "Uncaught exception in daemonized task.";
// EventLoopRef: acquiring a reference bumps the loop's client count.
m_loop->m_num_clients += 1;
// EventLoopRef::reset(): release this reference and wake the event loop so it
// can re-check its exit condition.
auto loop_lock{PtrOrValue{m_lock, loop->m_mutex}};
loop_lock->assert_locked(loop->m_mutex);
assert(loop->m_num_clients > 0);
loop->m_num_clients -= 1;
loop->m_cv.notify_all();
int post_fd{loop->m_post_fd};
char buffer = 0; // single wake-up byte written to the post descriptor
KJ_SYSCALL(write(post_fd, &buffer, 1));
if (relock) loop_lock->lock();
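The client count decremented above is what keeps the loop alive: as long as any EventLoopRef exists, done() below stays false. As a rough illustration of the idea (not the real mp::EventLoopRef, which also writes to the loop's post descriptor and can reuse an already-held lock), an RAII counter in plain std C++ might look like this; Loop and LoopRef are invented names:

    #include <cassert>
    #include <condition_variable>
    #include <mutex>

    // Hypothetical stand-in for the real event loop: just the pieces the guard touches.
    struct Loop {
        std::mutex m_mutex;
        std::condition_variable m_cv;
        int m_num_clients{0};
    };

    // RAII guard in the spirit of EventLoopRef: live guards keep the loop running,
    // and dropping the last one wakes whoever is waiting on m_cv.
    class LoopRef {
    public:
        explicit LoopRef(Loop& loop) : m_loop(&loop)
        {
            std::lock_guard<std::mutex> lock(m_loop->m_mutex);
            m_loop->m_num_clients += 1;
        }
        ~LoopRef()
        {
            std::lock_guard<std::mutex> lock(m_loop->m_mutex);
            assert(m_loop->m_num_clients > 0);
            m_loop->m_num_clients -= 1;
            m_loop->m_cv.notify_all(); // let the loop re-check its exit condition
        }
    private:
        Loop* m_loop;
    };

    int main() { Loop loop; { LoopRef ref{loop}; } return 0; }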
// EventLoop::addAsyncCleanup(): queue a function for the asynchronous worker thread.
m_async_fns->emplace_back(std::move(fn));
// EventLoop::EventLoop(): set up capnp async I/O and the socketpair whose read
// end (m_wait_fd) the loop blocks on and whose write end (m_post_fd) wakes it.
: m_exe_name(exe_name),
  m_io_context(kj::setupAsyncIo()),
  m_task_set(new kj::TaskSet(m_error_handler)),

KJ_SYSCALL(socketpair(AF_UNIX, SOCK_STREAM, 0, fds));

// EventLoop::~EventLoop(): by destruction time nothing may still be posted,
// queued for the async thread, or holding a client reference.
KJ_ASSERT(m_post_fn == nullptr);
KJ_ASSERT(!m_async_fns);
KJ_ASSERT(m_num_clients == 0);
// EventLoop::loop(): wrap the read end of the socketpair in a kj stream and block
// on it; each byte written to m_post_fd wakes the loop once.
m_async_fns.emplace();
kj::Own<kj::AsyncIoStream> wait_stream{
    m_io_context.lowLevelProvider->wrapSocketFd(m_wait_fd, kj::LowLevelAsyncIoProvider::TAKE_OWNERSHIP)};
char buffer = 0;
const size_t read_bytes = wait_stream->read(&buffer, 0, 1).wait(m_io_context.waitScope);
if (read_bytes != 1) throw std::logic_error("EventLoop wait_stream closed unexpectedly");

log() << "EventLoop::loop done, cancelling event listeners.";
log() << "EventLoop::loop bye.";
wait_stream = nullptr;
KJ_SYSCALL(::close(post_fd));
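The wake-up mechanism itself is ordinary POSIX plumbing: socketpair() in the constructor yields the write end (m_post_fd) and the read end (m_wait_fd), and a blocking one-byte read is how the loop thread sleeps until another thread pokes it. The same idea stripped of kj, with names invented for the example:

    #include <sys/socket.h>
    #include <unistd.h>

    #include <cassert>
    #include <thread>

    int main()
    {
        int fds[2];
        // Same call the EventLoop constructor uses to create its wake-up channel.
        const int rc = socketpair(AF_UNIX, SOCK_STREAM, 0, fds);
        assert(rc == 0);
        const int wait_fd = fds[0]; // the "loop" blocks reading this end
        const int post_fd = fds[1]; // other threads write one byte here to wake it

        std::thread waker([&] {
            char buffer = 0;
            const ssize_t written = write(post_fd, &buffer, 1); // wake the loop
            assert(written == 1);
        });

        char buffer;
        const ssize_t read_bytes = read(wait_fd, &buffer, 1); // sleeps until poked
        assert(read_bytes == 1);

        waker.join();
        close(post_fd);
        close(wait_fd);
        return 0;
    }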
// EventLoop::post(): after handing over the function, write one byte so the
// blocked read in loop() returns and runs it.
KJ_SYSCALL(write(post_fd, &buffer, 1));
// EventLoop::startAsyncThread(): if there is queued work, spawn a worker that
// drains m_async_fns until the queue is torn down at shutdown.
} else if (!m_async_fns->empty()) {

while (m_async_fns) {
    if (!m_async_fns->empty()) {
        const std::function<void()> fn = std::move(m_async_fns->front());
        m_async_fns->pop_front();
// EventLoop::done(): the loop may exit once no clients remain and the async
// queue is empty.
assert(m_num_clients >= 0);
return m_num_clients == 0 && m_async_fns->empty();
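Stripped of the EventLoop specifics, the worker above is a conventional condition-variable drain loop: pop work under the lock, run it with the lock released, sleep when the queue is empty, and exit when the queue itself goes away. A minimal self-contained version of that pattern (not the real startAsyncThread):

    #include <condition_variable>
    #include <deque>
    #include <functional>
    #include <mutex>
    #include <optional>
    #include <thread>

    int main()
    {
        std::mutex mutex;
        std::condition_variable cv;
        std::optional<std::deque<std::function<void()>>> fns{std::in_place};

        std::thread worker([&] {
            std::unique_lock<std::mutex> lock(mutex);
            while (fns) {                  // queue destroyed => time to exit
                if (!fns->empty()) {
                    std::function<void()> fn = std::move(fns->front());
                    fns->pop_front();
                    lock.unlock();         // run the task without holding the lock
                    fn();
                    lock.lock();
                } else {
                    cv.wait(lock);         // sleep until work arrives or shutdown
                }
            }
        });

        {
            std::lock_guard<std::mutex> lock(mutex);
            fns->push_back([] { /* queued work; may or may not run before the teardown below */ });
            cv.notify_all();
        }

        {
            std::lock_guard<std::mutex> lock(mutex);
            fns.reset();                   // tear down the queue
            cv.notify_all();
        }
        worker.join();
        return 0;
    }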
// SetThread(): look up the per-connection Thread proxy, or create one in place
// and register a disconnect callback that erases it again.
const std::unique_lock<std::mutex> lock(mutex);
auto thread = threads.find(connection);
if (thread != threads.end()) return {thread, false};
thread = threads.emplace(
    std::piecewise_construct, std::forward_as_tuple(connection),
    std::forward_as_tuple(make_thread(), connection, false)).first;
thread->second.setDisconnectCallback([&threads, &mutex, thread] {
    const std::unique_lock<std::mutex> lock(mutex);
    thread->second.m_disconnect_cb.reset();
    threads.erase(thread);

return {thread, true};
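SetThread's return value follows the usual find-or-insert idiom: an iterator plus a flag saying whether a new entry was created. The std::map mechanics it relies on, piecewise_construct with forwarded argument tuples so the mapped value is built in place, look like this in isolation (Entry and find_or_make are invented for the example):

    #include <cassert>
    #include <map>
    #include <string>
    #include <tuple>
    #include <utility>

    struct Entry {
        Entry(std::string name, int conn, bool loop_thread)
            : m_name(std::move(name)), m_conn(conn), m_loop_thread(loop_thread) {}
        std::string m_name;
        int m_conn;
        bool m_loop_thread;
    };

    // Find an existing entry for `conn`, or construct one in place.
    std::pair<std::map<int, Entry>::iterator, bool> find_or_make(std::map<int, Entry>& entries, int conn)
    {
        auto it = entries.find(conn);
        if (it != entries.end()) return {it, false};
        it = entries.emplace(std::piecewise_construct,
                             std::forward_as_tuple(conn),
                             std::forward_as_tuple("worker", conn, false)).first;
        return {it, true};
    }

    int main()
    {
        std::map<int, Entry> entries;
        const auto [it1, created1] = find_or_make(entries, 7);
        const auto [it2, created2] = find_or_make(entries, 7);
        assert(created1 && !created2 && it1 == it2);
        return 0;
    }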
// ProxyClient<Thread>: an already-registered disconnect cleanup is removed from
// the connection when it is no longer needed...
if (m_disconnect_cb) {
    m_context.connection->removeSyncCleanup(*m_disconnect_cb);

// ...and setDisconnectCallback() registers one.
m_disconnect_cb = m_context.connection->addSyncCleanup(fn);
// ProxyServer<Thread>: take ownership of the std::thread and its context.
: m_thread_context(thread_context), m_thread(std::move(thread))

assert(m_thread_context.waiter.get() != nullptr);

// ~ProxyServer<Thread>(): nothing to tear down if the thread was never started;
// otherwise steal the waiter, clear the per-connection thread maps, and wake the
// thread so it can exit.
if (!m_thread.joinable()) return;

assert(m_thread_context.waiter.get());
std::unique_ptr<Waiter> waiter;
const std::unique_lock<std::mutex> lock(m_thread_context.waiter->m_mutex);
waiter = std::move(m_thread_context.waiter);
m_thread_context.request_threads.clear();
m_thread_context.callback_threads.clear();
waiter->m_cv.notify_all();

// ProxyServer<Thread>::getName(): reply with the thread's debug name.
context.getResults().setResult(m_thread_context.thread_name);
return kj::READY_NOW;
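The destructor's job is to shake the owned thread out of its wait and then join it. The shape of that shutdown, reduced to plain std primitives (WorkerHandle is an invented name, not the real ProxyServer<Thread>):

    #include <condition_variable>
    #include <mutex>
    #include <thread>

    // Owns a worker thread that sleeps on a condition variable until asked to stop.
    class WorkerHandle {
    public:
        WorkerHandle()
        {
            m_thread = std::thread([this] {
                std::unique_lock<std::mutex> lock(m_mutex);
                m_cv.wait(lock, [this] { return m_exit; }); // park until shutdown
            });
        }
        ~WorkerHandle()
        {
            if (!m_thread.joinable()) return; // nothing started, nothing to stop
            {
                std::lock_guard<std::mutex> lock(m_mutex);
                m_exit = true;                // analogous to stealing the waiter
            }
            m_cv.notify_all();                // wake the parked thread...
            m_thread.join();                  // ...and wait for it to finish
        }
    private:
        std::mutex m_mutex;
        std::condition_variable m_cv;
        bool m_exit{false};
        std::thread m_thread;
    };

    int main() { WorkerHandle handle; return 0; }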
// ProxyServer<ThreadMap>::makeThread(): spawn a named thread for the client and
// return a Thread capability pointing at it.
const std::string from = context.getParams().getName();
std::promise<ThreadContext*> thread_context;
std::thread thread([&thread_context, from, this]() {

auto thread_server = kj::heap<ProxyServer<Thread>>(*thread_context.get_future().get(), std::move(thread));
auto thread_client = m_connection.m_threads.add(kj::mv(thread_server));
context.getResults().setResult(kj::mv(thread_client));
return kj::READY_NOW;
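The std::promise here is a one-shot handshake: the new thread publishes a pointer to its own thread context (the thread_local g_thread_context listed below), and the spawning thread blocks on the matching future before wrapping that context in a ProxyServer<Thread>. The handshake in isolation, with a made-up Context type standing in for ThreadContext:

    #include <cassert>
    #include <future>
    #include <string>
    #include <thread>

    struct Context {
        std::string thread_name;
    };

    int main()
    {
        std::promise<Context*> context_promise;
        std::promise<void> done; // keeps the worker (and its context) alive while main uses it

        std::thread worker([&] {
            Context context{"worker-thread"};    // owned by the worker thread
            context_promise.set_value(&context); // publish the pointer once
            done.get_future().wait();            // stay alive until main is finished
        });

        // Block until the worker has announced its context, then use it.
        Context* context = context_promise.get_future().get();
        assert(context->thread_name == "worker-thread");

        done.set_value(); // let the worker exit
        worker.join();
        return 0;
    }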
mp::Connection - Object holding network & rpc state associated with either an incoming server connection,...
CleanupIt addSyncCleanup(std::function<void()> fn) - Register synchronous cleanup function to run on event loop thread (with access to capnp thread local ...
~Connection() - Run cleanup functions.
CleanupList m_sync_cleanup_fns - Cleanup functions to run if connection is broken unexpectedly.
std::optional<::capnp::RpcSystem<::capnp::rpc::twoparty::VatId>> m_rpc_system
void removeSyncCleanup(CleanupIt it)
mp::EventLoop - Event loop implementation.
kj::AsyncIoContext m_io_context - Capnp IO context.
void startAsyncThread() MP_REQUIRES(m_mutex) - Start asynchronous worker thread if necessary.
std::condition_variable m_cv
EventLoop(const char* exe_name, LogFn log_fn, void* context = nullptr) - Construct event loop object.
void addAsyncCleanup(std::function<void()> fn) - Register cleanup function to run on asynchronous worker thread without blocking the event loop thread...
void loop() - Run event loop.
Mutex m_mutex - Mutex and condition variable used to post tasks to event loop and async thread.
bool done() const MP_REQUIRES(m_mutex) - Check if loop should exit.
int m_post_fd - Pipe write handle used to wake up the event loop thread.
LogOptions m_log_opts - Logging options.
std::unique_ptr<kj::TaskSet> m_task_set - Capnp list of pending promises.
int m_wait_fd - Pipe read handle used to wake up the event loop thread.
std::thread m_async_thread - Handle of an async worker thread.
std::thread::id m_thread_id - ID of the event loop thread.
void post(kj::Function<void()> fn) - Run function on event loop thread.
mp::EventLoopRef - Event loop smart pointer automatically managing m_num_clients.
EventLoopRef(EventLoop& loop, Lock* lock = nullptr)
void reset(bool relock = false)
std::unique_lock<std::mutex> m_lock
void taskFailed(kj::Exception&& exception) override
boost::signals2::scoped_connection m_connection
std::optional<mp::EventLoop> m_loop - EventLoop object which manages I/O events for all connections.
Functions to serialize / deserialize common bitcoin types.
void Unlock(Lock& lock, Callback&& callback)
std::list<std::function<void()>> CleanupList
std::map<Connection*, ProxyClient<Thread>> ConnThreads
std::string ThreadName(const char* exe_name) - Format current thread name as "{exe_name}-{$pid}/{thread_name}-{$tid}".
std::atomic<int> server_reqs
thread_local ThreadContext g_thread_context
std::tuple<ConnThread, bool> SetThread(ConnThreads& threads, std::mutex& mutex, Connection* connection, const std::function<Thread::Client()>& make_thread)
std::function<void(bool raise, std::string message)> LogFn
std::string LongThreadName(const char* exe_name)
typename CleanupList::iterator CleanupIt
LogFn log_fn - External logging callback.
mp::ProxyClient - Mapping from capnp interface type to proxy client implementation (specializations are generated by pr...
ProxyContext(Connection* connection)
mp::ProxyServer - Mapping from capnp interface type to proxy server implementation (specializations are generated by pr...
mp::PtrOrValue - Convenient wrapper around std::variant<T*, T>.
std::unique_ptr<Waiter> waiter - Waiter object used to allow client threads blocked waiting for a server response to execute callbacks...
bool loop_thread - Whether this thread is a capnp event loop thread.
std::string thread_name - Identifying string for debug.
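Taken together, the declarations above suggest the basic usage pattern: construct an EventLoop with an executable name and a logging callback, run loop() on one thread, and hand work to that thread with post(). The sketch below is built only on those declared signatures; the header path, the exact exit behaviour of loop(), and whether a program would ever drive EventLoop this directly (rather than through the higher-level mp connection classes) are all assumptions:

    #include <mp/proxy-io.h> // assumed header for the mp::EventLoop declaration

    #include <iostream>
    #include <stdexcept>
    #include <string>
    #include <thread>

    int main()
    {
        // LogFn per the declaration above: (bool raise, std::string message).
        mp::EventLoop loop("example-exe", [](bool raise, std::string message) {
            std::cerr << message << "\n";
            if (raise) throw std::runtime_error(message); // assumed meaning of `raise`
        });

        // Another thread asks the event loop thread to run a function.
        std::thread other([&loop] {
            loop.post([] { std::cout << "running on the event loop thread\n"; });
        });

        loop.loop(); // runs until done(): no clients and no queued async work
        other.join();
        return 0;
    }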