9#include <mp/proxy.capnp.h>
15#include <capnp/blob.h>
16#include <capnp/capability.h>
17#include <condition_variable>
20#include <kj/async-io.h>
24#include <kj/exception.h>
32#include <sys/socket.h>
40template <
typename Interface>
47 KJ_LOG(ERROR,
"Uncaught exception in daemonized task.", exception);
48 m_loop.
log() <<
"Uncaught exception in daemonized task.";
158 : m_exe_name(exe_name),
159 m_io_context(kj::setupAsyncIo()),
160 m_task_set(new kj::TaskSet(m_error_handler)),
161 m_log_fn(
std::move(log_fn)),
165 KJ_SYSCALL(socketpair(AF_UNIX, SOCK_STREAM, 0, fds));
173 const std::lock_guard<std::mutex> lock(
m_mutex);
191 kj::Own<kj::AsyncIoStream> wait_stream{
192 m_io_context.lowLevelProvider->wrapSocketFd(
m_wait_fd, kj::LowLevelAsyncIoProvider::TAKE_OWNERSHIP)};
196 const size_t read_bytes = wait_stream->read(&buffer, 0, 1).wait(
m_io_context.waitScope);
197 if (read_bytes != 1)
throw std::logic_error(
"EventLoop wait_stream closed unexpectedly");
198 std::unique_lock<std::mutex> lock(
m_mutex);
203 }
else if (
done(lock)) {
211 log() <<
"EventLoop::loop done, cancelling event listeners.";
213 log() <<
"EventLoop::loop bye.";
214 wait_stream =
nullptr;
215 KJ_SYSCALL(::close(post_fd));
216 const std::unique_lock<std::mutex> lock(
m_mutex);
227 std::unique_lock<std::mutex> lock(
m_mutex);
234 KJ_SYSCALL(write(post_fd, &buffer, 1));
250 KJ_SYSCALL(write(post_fd, &buffer, 1));
262 std::unique_lock<std::mutex> lock(
m_mutex);
266 const std::function<void()> fn = std::move(
m_async_fns.front());
290 const std::unique_lock<std::mutex> lock(mutex);
291 auto thread = threads.find(connection);
292 if (thread != threads.end())
return {thread,
false};
293 thread = threads.emplace(
294 std::piecewise_construct, std::forward_as_tuple(connection),
295 std::forward_as_tuple(make_thread(), connection,
false)).first;
296 thread->second.setCleanup([&threads, &mutex, thread] {
305 thread->second.m_cleanup_it.reset();
307 const std::unique_lock<std::mutex> lock(mutex);
308 threads.erase(thread);
310 return {thread,
true};
319 m_context.connection->removeSyncCleanup(*m_cleanup_it);
327 m_cleanup_it =
m_context.connection->addSyncCleanup(fn);
331 : m_thread_context(thread_context), m_thread(
std::move(thread))
333 assert(m_thread_context.waiter.get() !=
nullptr);
338 if (!m_thread.joinable())
return;
344 assert(m_thread_context.waiter.get());
345 std::unique_ptr<Waiter> waiter;
347 const std::unique_lock<std::mutex> lock(m_thread_context.waiter->m_mutex);
350 waiter = std::move(m_thread_context.waiter);
357 m_thread_context.request_threads.clear();
358 m_thread_context.callback_threads.clear();
360 waiter->m_cv.notify_all();
367 context.getResults().setResult(m_thread_context.thread_name);
368 return kj::READY_NOW;
375 const std::string from = context.getParams().getName();
376 std::promise<ThreadContext*> thread_context;
377 std::thread thread([&thread_context, from,
this]() {
386 auto thread_server = kj::heap<ProxyServer<Thread>>(*thread_context.get_future().get(), std::move(thread));
387 auto thread_client =
m_connection.m_threads.add(kj::mv(thread_server));
388 context.getResults().setResult(kj::mv(thread_client));
389 return kj::READY_NOW;
Object holding network & rpc state associated with either an incoming server connection,...
void addAsyncCleanup(std::function< void()> fn)
Register asynchronous cleanup function to run on worker thread when disconnect() is called.
CleanupIt addSyncCleanup(std::function< void()> fn)
Register synchronous cleanup function to run on event loop thread (with access to capnp thread local ...
CleanupList m_async_cleanup_fns
~Connection()
Run cleanup functions.
CleanupList m_sync_cleanup_fns
Cleanup functions to run if connection is broken unexpectedly.
std::optional<::capnp::RpcSystem<::capnp::rpc::twoparty::VatId > > m_rpc_system
void removeSyncCleanup(CleanupIt it)
kj::AsyncIoContext m_io_context
Capnp IO context.
bool done(std::unique_lock< std::mutex > &lock)
Check if loop should exit.
std::condition_variable m_cv
EventLoop(const char *exe_name, LogFn log_fn, void *context=nullptr)
Construct event loop object.
bool removeClient(std::unique_lock< std::mutex > &lock)
void loop()
Run event loop.
CleanupList m_async_fns
Callback functions to run on async thread.
void post(const std::function< void()> &fn)
Run function on event loop thread.
int m_post_fd
Pipe write handle used to wake up the event loop thread.
int m_num_clients
Number of clients holding references to ProxyServerBase objects that reference this event loop.
std::unique_ptr< kj::TaskSet > m_task_set
Capnp list of pending promises.
std::mutex m_mutex
Mutex and condition variable used to post tasks to event loop and async thread.
int m_wait_fd
Pipe read handle used to wake up the event loop thread.
const std::function< void()> * m_post_fn
Callback function to run on event loop thread during post() or sync() call.
std::thread m_async_thread
Handle of an async worker thread.
std::thread::id m_thread_id
ID of the event loop thread.
void startAsyncThread(std::unique_lock< std::mutex > &lock)
Start asynchronous worker thread if necessary.
void addClient(std::unique_lock< std::mutex > &lock)
Add/remove remote client reference counts.
void taskFailed(kj::Exception &&exception) override
boost::signals2::scoped_connection m_connection
Functions to serialize / deserialize common bitcoin types.
void Unlock(Lock &lock, Callback &&callback)
std::map< Connection *, ProxyClient< Thread > > ConnThreads
std::string ThreadName(const char *exe_name)
Format current thread name as "{exe_name}-{$pid}/{thread_name}-{$tid}".
std::atomic< int > server_reqs
thread_local ThreadContext g_thread_context
std::tuple< ConnThread, bool > SetThread(ConnThreads &threads, std::mutex &mutex, Connection *connection, const std::function< Thread::Client()> &make_thread)
std::function< void(bool raise, std::string message)> LogFn
std::string LongThreadName(const char *exe_name)
typename CleanupList::iterator CleanupIt
Mapping from capnp interface type to proxy client implementation (specializations are generated by pr...
Mapping from capnp interface type to proxy server implementation (specializations are generated by pr...
std::unique_ptr< Waiter > waiter
Waiter object used to allow client threads blocked waiting for a server response to execute callbacks...
bool loop_thread
Whether this thread is a capnp event loop thread.
std::string thread_name
Identifying string for debug.