Bitcoin Core 30.99.0
P2P Digital Currency
proxy-io.h
Go to the documentation of this file.
1// Copyright (c) The Bitcoin Core developers
2// Distributed under the MIT software license, see the accompanying
3// file COPYING or http://www.opensource.org/licenses/mit-license.php.
4
5#ifndef MP_PROXY_IO_H
6#define MP_PROXY_IO_H
7
8#include <mp/proxy.h>
9#include <mp/util.h>
10
11#include <mp/proxy.capnp.h>
12
13#include <capnp/rpc-twoparty.h>
14
15#include <assert.h>
16#include <condition_variable>
17#include <functional>
18#include <kj/function.h>
19#include <map>
20#include <memory>
21#include <optional>
22#include <sstream>
23#include <string>
24#include <thread>
25
26namespace mp {
27struct ThreadContext;
28
30{
32};
33
35{
39 {
40 }
41};
42
43template <typename ProxyServer, typename CallContext_>
45{
46 using CallContext = CallContext_;
47
50 int req;
51
54 {
55 }
56};
57
// Convenience alias for the server-side invoke context of one capnp method:
// a ServerInvokeContext whose proxy server type is ProxyServer<Interface> and
// whose call context type is ::capnp::CallContext<Params, Results>.
58template <typename Interface, typename Params, typename Results>
59using ServerContext = ServerInvokeContext<ProxyServer<Interface>, ::capnp::CallContext<Params, Results>>;
60
// Explicit specialization of ProxyClient for the Thread interface, built on
// ProxyClientBase<Thread, ::capnp::Void>.
// NOTE(review): several lines of this struct are not visible in this listing;
// the comments below cover only the members that are shown.
61template <>
62struct ProxyClient<Thread> : public ProxyClientBase<Thread, ::capnp::Void>
63{
// Copying is disabled. NOTE(review): the link below presumably explains why
// the copy constructor is spelled out explicitly (std::map / std::pair
// related) — confirm against the full header.
65 // https://stackoverflow.com/questions/22357887/comparing-two-mapiterators-why-does-it-need-the-copy-constructor-of-stdpair
66 ProxyClient(const ProxyClient&) = delete;
68
// Takes a callback by const reference; defined out of line.
// NOTE(review): presumably registers fn to run on disconnect and records the
// registration in m_disconnect_cb — confirm in the implementation file.
69 void setDisconnectCallback(const std::function<void()>& fn);
70
// Optional iterator (CleanupIt) referring to the callback function that is
// run if there is a sudden disconnect; empty when no callback is referenced.
80 std::optional<CleanupIt> m_disconnect_cb;
81};
82
// Explicit specialization of ProxyServer for the Thread interface. Derives
// directly from the capnp-generated Thread::Server and is marked final.
// NOTE(review): some member lines are not visible in this listing.
83template <>
84struct ProxyServer<Thread> final : public Thread::Server
85{
86public:
// Constructed from a ThreadContext reference and a std::thread taken by
// rvalue reference. NOTE(review): presumably the thread is moved into
// m_thread — confirm in the implementation file.
87 ProxyServer(ThreadContext& thread_context, std::thread&& thread);
// Override of the Thread::Server getName method; returns a kj promise.
89 kj::Promise<void> getName(GetNameContext context) override;
// Thread handle owned by this server object.
91 std::thread m_thread;
92};
93
// Handler for kj::TaskSet failed task events.
// NOTE(review): the constructor and data member of this class are not visible
// in this listing; only the taskFailed override is shown.
95class LoggingErrorHandler : public kj::TaskSet::ErrorHandler
96{
97public:
// Override of kj::TaskSet::ErrorHandler::taskFailed; receives the exception
// by rvalue reference. Defined out of line.
99 void taskFailed(kj::Exception&& exception) override;
101};
102
// Type of the external logging callback. Logger invokes it from its
// destructor with the logger's raise flag and the accumulated message text.
103using LogFn = std::function<void(bool raise, std::string message)>;
104
106{
107public:
108 Logger(bool raise, LogFn& fn) : m_raise(raise), m_fn(fn) {}
109 Logger(Logger&& logger) : m_raise(logger.m_raise), m_fn(logger.m_fn), m_buffer(std::move(logger.m_buffer)) {}
110 ~Logger() noexcept(false)
111 {
112 if (m_fn) m_fn(m_raise, m_buffer.str());
113 }
114
115 template <typename T>
116 friend Logger& operator<<(Logger& logger, T&& value)
117 {
118 if (logger.m_fn) logger.m_buffer << std::forward<T>(value);
119 return logger;
120 }
121
122 template <typename T>
123 friend Logger& operator<<(Logger&& logger, T&& value)
124 {
125 return logger << std::forward<T>(value);
126 }
127
130 std::ostringstream m_buffer;
131};
132
134
137
140 size_t max_chars{200};
141};
142
143std::string LongThreadName(const char* exe_name);
144
171{
172public:
174 EventLoop(const char* exe_name, LogFn log_fn, void* context = nullptr);
175 ~EventLoop();
176
180 void loop();
181
184 void post(kj::Function<void()> fn);
185
189 template <typename Callable>
190 void sync(Callable&& callable)
191 {
192 post(std::forward<Callable>(callable));
193 }
194
197 void addAsyncCleanup(std::function<void()> fn);
198
211
213 bool done() const MP_REQUIRES(m_mutex);
214
216 {
217 Logger logger(false, m_log_opts.log_fn);
218 logger << "{" << LongThreadName(m_exe_name) << "} ";
219 return logger;
220 }
221 Logger logPlain() { return {false, m_log_opts.log_fn}; }
222 Logger raise() { return {true, m_log_opts.log_fn}; }
223
226 const char* m_exe_name;
227
229 std::thread::id m_thread_id = std::this_thread::get_id();
230
233 std::thread m_async_thread;
234
236 kj::Function<void()>* m_post_fn MP_GUARDED_BY(m_mutex) = nullptr;
237
239 std::optional<CleanupList> m_async_fns MP_GUARDED_BY(m_mutex);
240
242 int m_wait_fd = -1;
243
245 int m_post_fd = -1;
246
249 int m_num_clients MP_GUARDED_BY(m_mutex) = 0;
250
254 std::condition_variable m_cv;
255
257 kj::AsyncIoContext m_io_context;
258
261
263 std::unique_ptr<kj::TaskSet> m_task_set;
264
266 std::list<Connection> m_incoming_connections;
267
270
273};
274
281struct Waiter
282{
283 Waiter() = default;
284
285 template <typename Fn>
286 void post(Fn&& fn)
287 {
288 const std::unique_lock<std::mutex> lock(m_mutex);
289 assert(!m_fn);
290 m_fn = std::forward<Fn>(fn);
291 m_cv.notify_all();
292 }
293
294 template <class Predicate>
295 void wait(std::unique_lock<std::mutex>& lock, Predicate pred)
296 {
297 m_cv.wait(lock, [&] {
298 // Important for this to be "while (m_fn)", not "if (m_fn)" to avoid
299 // a lost-wakeup bug. A new m_fn and m_cv notification might be sent
300 // after the fn() call and before the lock.lock() call in this loop
301 // in the case where a capnp response is sent and a brand new
302 // request is immediately received.
303 while (m_fn) {
304 auto fn = std::move(*m_fn);
305 m_fn.reset();
306 Unlock(lock, fn);
307 }
308 const bool done = pred();
309 return done;
310 });
311 }
312
320 std::mutex m_mutex;
321 std::condition_variable m_cv;
322 std::optional<kj::Function<void()>> m_fn;
323};
324
331{
332public:
333 Connection(EventLoop& loop, kj::Own<kj::AsyncIoStream>&& stream_)
334 : m_loop(loop), m_stream(kj::mv(stream_)),
335 m_network(*m_stream, ::capnp::rpc::twoparty::Side::CLIENT, ::capnp::ReaderOptions()),
336 m_rpc_system(::capnp::makeRpcClient(m_network)) {}
338 kj::Own<kj::AsyncIoStream>&& stream_,
339 const std::function<::capnp::Capability::Client(Connection&)>& make_client)
340 : m_loop(loop), m_stream(kj::mv(stream_)),
341 m_network(*m_stream, ::capnp::rpc::twoparty::Side::SERVER, ::capnp::ReaderOptions()),
342 m_rpc_system(::capnp::makeRpcServer(m_network, make_client(*this))) {}
343
349 ~Connection();
350
354 CleanupIt addSyncCleanup(std::function<void()> fn);
356
358 template <typename F>
359 void onDisconnect(F&& f)
360 {
361 // Add disconnect handler to local TaskSet to ensure it is cancelled and
362 // will never run after connection object is destroyed. But when disconnect
363 // handler fires, do not call the function f right away, instead add it
364 // to the EventLoop TaskSet to avoid "Promise callback destroyed itself"
365 // error in cases where f deletes this Connection object.
366 m_on_disconnect.add(m_network.onDisconnect().then(
367 [f = std::forward<F>(f), this]() mutable { m_loop->m_task_set->add(kj::evalLater(kj::mv(f))); }));
368 }
369
371 kj::Own<kj::AsyncIoStream> m_stream;
374 ::capnp::TwoPartyVatNetwork m_network;
375 std::optional<::capnp::RpcSystem<::capnp::rpc::twoparty::VatId>> m_rpc_system;
376
377 // ThreadMap interface client, used to create a remote server thread when a
378 // client IPC call is being made for the first time from a new thread.
379 ThreadMap::Client m_thread_map{nullptr};
380
383 ::capnp::CapabilityServerSet<Thread> m_threads;
384
389};
390
400{
401 ::capnp::word scratch[4]{};
402 ::capnp::MallocMessageBuilder message{scratch};
403 ::capnp::rpc::twoparty::VatId::Builder vat_id{message.getRoot<::capnp::rpc::twoparty::VatId>()};
404 ServerVatId() { vat_id.setSide(::capnp::rpc::twoparty::Side::SERVER); }
405};
406
407template <typename Interface, typename Impl>
409 Connection* connection,
410 bool destroy_connection)
411 : m_client(std::move(client)), m_context(connection)
412
413{
414 // Handler for the connection getting destroyed before this client object.
415 auto disconnect_cb = m_context.connection->addSyncCleanup([this]() {
416 // Release client capability by move-assigning to temporary.
417 {
418 typename Interface::Client(std::move(m_client));
419 }
420 Lock lock{m_context.loop->m_mutex};
421 m_context.connection = nullptr;
422 });
423
424 // Two shutdown sequences are supported:
425 //
426 // - A normal sequence where client proxy objects are deleted by external
427 // code that no longer needs them
428 //
429 // - A garbage collection sequence where the connection or event loop shuts
430 // down while external code is still holding client references.
431 //
432 // The first case is handled here when m_context.connection is not null. The
433 // second case is handled by the disconnect_cb function, which sets
434 // m_context.connection to null so nothing happens here.
435 m_context.cleanup_fns.emplace_front([this, destroy_connection, disconnect_cb]{
436 {
437 // If the capnp interface defines a destroy method, call it to destroy
438 // the remote object, waiting for it to be deleted server side. If the
439 // capnp interface does not define a destroy method, this will just call
440 // an empty stub defined in the ProxyClientBase class and do nothing.
441 Sub::destroy(*this);
442
443 // FIXME: Could just invoke removed addCleanup fn here instead of duplicating code
444 m_context.loop->sync([&]() {
445 // Remove disconnect callback on cleanup so it doesn't run and try
446 // to access this object after it's destroyed. This call needs to
447 // run inside loop->sync() on the event loop thread because
448 // otherwise, if there were an ill-timed disconnect, the
449 // onDisconnect handler could fire and delete the Connection object
450 // before the removeSyncCleanup call.
452
453 // Release client capability by move-assigning to temporary.
454 {
455 typename Interface::Client(std::move(m_client));
456 }
457 if (destroy_connection) {
458 delete m_context.connection;
459 m_context.connection = nullptr;
460 }
461 });
462 }
463 });
464 Sub::construct(*this);
465}
466
467template <typename Interface, typename Impl>
469{
470 CleanupRun(m_context.cleanup_fns);
471}
472
473template <typename Interface, typename Impl>
474ProxyServerBase<Interface, Impl>::ProxyServerBase(std::shared_ptr<Impl> impl, Connection& connection)
475 : m_impl(std::move(impl)), m_context(&connection)
476{
477 assert(m_impl);
478}
479
492template <typename Interface, typename Impl>
494{
495 if (m_impl) {
496 // If impl is non-null at this point, it means no client is waiting for
497 // the m_impl server object to be destroyed synchronously. This can
498 // happen either if the interface did not define a "destroy" method (see
499 // invokeDestroy method below), or if a destroy method was defined, but
500 // the connection was broken before it could be called.
501 //
502 // In either case, be conservative and run the cleanup on an
503 // asynchronous thread, to avoid destructors or cleanup functions
504 // blocking or deadlocking the current EventLoop thread, since they
505 // could be making IPC calls.
506 //
507 // Technically this is a little too conservative since if the interface
508 // defines a "destroy" method, but the destroy method does not accept a
509 // Context parameter specifying a worker thread, the cleanup method
510 // would run on the EventLoop thread normally (when connection is
511 // unbroken), but will not run on the EventLoop thread now (when
512 // connection is broken). Probably some refactoring of the destructor
513 // and invokeDestroy function is possible to make this cleaner and more
514 // consistent.
515 m_context.loop->addAsyncCleanup([impl=std::move(m_impl), fns=std::move(m_context.cleanup_fns)]() mutable {
516 impl.reset();
517 CleanupRun(fns);
518 });
519 }
520 assert(m_context.cleanup_fns.empty());
521}
522
540template <typename Interface, typename Impl>
542{
543 m_impl.reset();
544 CleanupRun(m_context.cleanup_fns);
545}
546
// Map keyed by Connection pointer, holding one ProxyClient<Thread> per
// connection.
547using ConnThreads = std::map<Connection*, ProxyClient<Thread>>;
// Iterator into a ConnThreads map.
548using ConnThread = ConnThreads::iterator;
549
550// Retrieve ProxyClient<Thread> object associated with this connection from a
551// map, or create a new one and insert it into the map. Return map iterator and
552// inserted bool.
// make_thread supplies the Thread::Client used when a new entry has to be
// created. NOTE(review): mutex is presumably the lock guarding `threads` —
// confirm in the implementation file.
553std::tuple<ConnThread, bool> SetThread(ConnThreads& threads, std::mutex& mutex, Connection* connection, const std::function<Thread::Client()>& make_thread);
554
556{
558 std::string thread_name;
559
563 std::unique_ptr<Waiter> waiter = nullptr;
564
570
579
583 bool loop_thread = false;
584};
585
589template <typename InitInterface>
590std::unique_ptr<ProxyClient<InitInterface>> ConnectStream(EventLoop& loop, int fd)
591{
592 typename InitInterface::Client init_client(nullptr);
593 std::unique_ptr<Connection> connection;
594 loop.sync([&] {
595 auto stream =
596 loop.m_io_context.lowLevelProvider->wrapSocketFd(fd, kj::LowLevelAsyncIoProvider::TAKE_OWNERSHIP);
597 connection = std::make_unique<Connection>(loop, kj::mv(stream));
598 init_client = connection->m_rpc_system->bootstrap(ServerVatId().vat_id).castAs<InitInterface>();
599 Connection* connection_ptr = connection.get();
600 connection->onDisconnect([&loop, connection_ptr] {
601 loop.log() << "IPC client: unexpected network disconnect.";
602 delete connection_ptr;
603 });
604 });
605 return std::make_unique<ProxyClient<InitInterface>>(
606 kj::mv(init_client), connection.release(), /* destroy_connection= */ true);
607}
608
613template <typename InitInterface, typename InitImpl>
614void _Serve(EventLoop& loop, kj::Own<kj::AsyncIoStream>&& stream, InitImpl& init)
615{
616 loop.m_incoming_connections.emplace_front(loop, kj::mv(stream), [&](Connection& connection) {
617 // Disable deleter so proxy server object doesn't attempt to delete the
618 // init implementation when the proxy client is destroyed or
619 // disconnected.
620 return kj::heap<ProxyServer<InitInterface>>(std::shared_ptr<InitImpl>(&init, [](InitImpl*){}), connection);
621 });
622 auto it = loop.m_incoming_connections.begin();
623 it->onDisconnect([&loop, it] {
624 loop.log() << "IPC server: socket disconnected.";
625 loop.m_incoming_connections.erase(it);
626 });
627}
628
632template <typename InitInterface, typename InitImpl>
633void _Listen(EventLoop& loop, kj::Own<kj::ConnectionReceiver>&& listener, InitImpl& init)
634{
635 auto* ptr = listener.get();
636 loop.m_task_set->add(ptr->accept().then(
637 [&loop, &init, listener = kj::mv(listener)](kj::Own<kj::AsyncIoStream>&& stream) mutable {
638 _Serve<InitInterface>(loop, kj::mv(stream), init);
639 _Listen<InitInterface>(loop, kj::mv(listener), init);
640 }));
641}
642
645template <typename InitInterface, typename InitImpl>
646void ServeStream(EventLoop& loop, int fd, InitImpl& init)
647{
648 _Serve<InitInterface>(
649 loop, loop.m_io_context.lowLevelProvider->wrapSocketFd(fd, kj::LowLevelAsyncIoProvider::TAKE_OWNERSHIP), init);
650}
651
654template <typename InitInterface, typename InitImpl>
656{
657 loop.sync([&]() {
658 _Listen<InitInterface>(loop,
659 loop.m_io_context.lowLevelProvider->wrapListenSocketFd(fd, kj::LowLevelAsyncIoProvider::TAKE_OWNERSHIP),
660 init);
661 });
662}
663
664extern thread_local ThreadContext g_thread_context; // NOLINT(bitcoin-nontrivial-threadlocal)
665// Silence nonstandard bitcoin tidy error "Variable with non-trivial destructor
666// cannot be thread_local" which should not be a problem on modern platforms, and
667// could lead to a small memory leak at worst on older ones.
668
669} // namespace mp
670
671#endif // MP_PROXY_IO_H
if(!SetupNetworking())
Object holding network & rpc state associated with either an incoming server connection,...
Definition: proxy-io.h:331
CleanupIt addSyncCleanup(std::function< void()> fn)
Register synchronous cleanup function to run on event loop thread (with access to capnp thread local ...
Definition: proxy.cpp:143
kj::TaskSet m_on_disconnect
Definition: proxy-io.h:373
EventLoopRef m_loop
Definition: proxy-io.h:370
LoggingErrorHandler m_error_handler
Definition: proxy-io.h:372
~Connection()
Run cleanup functions.
Definition: proxy.cpp:82
::capnp::TwoPartyVatNetwork m_network
Definition: proxy-io.h:374
kj::Own< kj::AsyncIoStream > m_stream
Definition: proxy-io.h:371
Connection(EventLoop &loop, kj::Own< kj::AsyncIoStream > &&stream_, const std::function<::capnp::Capability::Client(Connection &)> &make_client)
Definition: proxy-io.h:337
Connection(EventLoop &loop, kj::Own< kj::AsyncIoStream > &&stream_)
Definition: proxy-io.h:333
void onDisconnect(F &&f)
Add disconnect handler.
Definition: proxy-io.h:359
::capnp::CapabilityServerSet< Thread > m_threads
Collection of server-side IPC worker threads (ProxyServer<Thread> objects previously returned by Thre...
Definition: proxy-io.h:383
CleanupList m_sync_cleanup_fns
Cleanup functions to run if connection is broken unexpectedly.
Definition: proxy-io.h:388
std::optional<::capnp::RpcSystem<::capnp::rpc::twoparty::VatId > > m_rpc_system
Definition: proxy-io.h:375
void removeSyncCleanup(CleanupIt it)
Definition: proxy.cpp:157
ThreadMap::Client m_thread_map
Definition: proxy-io.h:379
Event loop implementation.
Definition: proxy-io.h:171
const char * m_exe_name
Process name included in thread names so combined debug output from multiple processes is easier to u...
Definition: proxy-io.h:226
kj::AsyncIoContext m_io_context
Capnp IO context.
Definition: proxy-io.h:257
void startAsyncThread() MP_REQUIRES(m_mutex)
Start asynchronous worker thread if necessary.
Definition: proxy.cpp:276
void sync(Callable &&callable)
Wrapper around EventLoop::post that takes advantage of the fact that callable will not go out of scop...
Definition: proxy-io.h:190
std::condition_variable m_cv
Definition: proxy-io.h:254
EventLoop(const char *exe_name, LogFn log_fn, void *context=nullptr)
Construct event loop object.
Definition: proxy.cpp:186
void addAsyncCleanup(std::function< void()> fn)
Register cleanup function to run on asynchronous worker thread without blocking the event loop thread...
Definition: proxy.cpp:163
Logger log()
Definition: proxy-io.h:215
std::list< Connection > m_incoming_connections
List of connections.
Definition: proxy-io.h:266
void loop()
Run event loop.
Definition: proxy.cpp:214
Mutex m_mutex
Mutex and condition variable used to post tasks to event loop and async thread.
Definition: proxy-io.h:253
bool done() const MP_REQUIRES(m_mutex)
Check if loop should exit.
Definition: proxy.cpp:302
Logger raise()
Definition: proxy-io.h:222
int m_post_fd
Pipe write handle used to wake up the event loop thread.
Definition: proxy-io.h:245
int m_num_clients MP_GUARDED_BY(m_mutex)=0
Number of clients holding references to ProxyServerBase objects that reference this event loop.
LogOptions m_log_opts
Logging options.
Definition: proxy-io.h:269
std::unique_ptr< kj::TaskSet > m_task_set
Capnp list of pending promises.
Definition: proxy-io.h:263
void * m_context
External context pointer.
Definition: proxy-io.h:272
int m_wait_fd
Pipe read handle used to wake up the event loop thread.
Definition: proxy-io.h:242
Logger logPlain()
Definition: proxy-io.h:221
std::optional< CleanupList > m_async_fns MP_GUARDED_BY(m_mutex)
Callback functions to run on async thread.
LoggingErrorHandler m_error_handler
Capnp error handler. Needs to outlive m_task_set.
Definition: proxy-io.h:260
std::thread m_async_thread
Handle of an async worker thread.
Definition: proxy-io.h:233
kj::Function< void()> *m_post_fn MP_GUARDED_BY(m_mutex)
Callback function to run on event loop thread during post() or sync() call.
std::thread::id m_thread_id
ID of the event loop thread.
Definition: proxy-io.h:229
void post(kj::Function< void()> fn)
Run function on event loop thread.
Definition: proxy.cpp:258
Event loop smart pointer automatically managing m_num_clients.
Definition: proxy.h:51
Definition: util.h:170
friend Logger & operator<<(Logger &logger, T &&value)
Definition: proxy-io.h:116
Logger(bool raise, LogFn &fn)
Definition: proxy-io.h:108
Logger(Logger &&logger)
Definition: proxy-io.h:109
~Logger() noexcept(false)
Definition: proxy-io.h:110
LogFn & m_fn
Definition: proxy-io.h:129
bool m_raise
Definition: proxy-io.h:128
friend Logger & operator<<(Logger &&logger, T &&value)
Definition: proxy-io.h:123
std::ostringstream m_buffer
Definition: proxy-io.h:130
Handler for kj::TaskSet failed task events.
Definition: proxy-io.h:96
EventLoop & m_loop
Definition: proxy-io.h:100
LoggingErrorHandler(EventLoop &loop)
Definition: proxy-io.h:98
void taskFailed(kj::Exception &&exception) override
Definition: proxy.cpp:42
Base class for generated ProxyClient classes that implement a C++ interface and forward calls to a ca...
Definition: proxy.h:81
Interface::Client m_client
Definition: proxy.h:132
ProxyContext m_context
Definition: proxy.h:133
~ProxyClientBase() noexcept
Definition: proxy-io.h:468
ProxyClientBase(typename Interface::Client client, Connection *connection, bool destroy_connection)
Construct libmultiprocess client object wrapping Cap'n Proto client object with a reference to the as...
Definition: proxy-io.h:408
Context m_context
Definition: protocol.cpp:101
#define MP_REQUIRES(x)
Definition: util.h:155
Functions to serialize / deserialize common bitcoin types.
Definition: common-types.h:57
void Unlock(Lock &lock, Callback &&callback)
Definition: util.h:195
std::list< std::function< void()> > CleanupList
Definition: proxy.h:36
void ServeStream(EventLoop &loop, int fd, InitImpl &init)
Given stream file descriptor and an init object, handle requests on the stream by calling methods on ...
Definition: proxy-io.h:646
std::map< Connection *, ProxyClient< Thread > > ConnThreads
Definition: proxy-io.h:547
thread_local ThreadContext g_thread_context
Definition: proxy.cpp:40
std::tuple< ConnThread, bool > SetThread(ConnThreads &threads, std::mutex &mutex, Connection *connection, const std::function< Thread::Client()> &make_thread)
Definition: proxy.cpp:308
std::function< void(bool raise, std::string message)> LogFn
Definition: proxy-io.h:103
void _Listen(EventLoop &loop, kj::Own< kj::ConnectionReceiver > &&listener, InitImpl &init)
Given connection receiver and an init object, handle incoming connections by calling _Serve,...
Definition: proxy-io.h:633
std::unique_ptr< ProxyClient< InitInterface > > ConnectStream(EventLoop &loop, int fd)
Given stream file descriptor, make a new ProxyClient object to send requests over the stream.
Definition: proxy-io.h:590
void ListenConnections(EventLoop &loop, int fd, InitImpl &init)
Given listening socket file descriptor and an init object, handle incoming connections and requests b...
Definition: proxy-io.h:655
std::string LongThreadName(const char *exe_name)
Definition: proxy.cpp:414
ConnThreads::iterator ConnThread
Definition: proxy-io.h:548
void _Serve(EventLoop &loop, kj::Own< kj::AsyncIoStream > &&stream, InitImpl &init)
Given stream and init objects, construct a new ProxyServer object that handles requests from the stre...
Definition: proxy-io.h:614
typename CleanupList::iterator CleanupIt
Definition: proxy.h:37
void CleanupRun(CleanupList &fns)
Definition: proxy.h:39
ThreadContext & thread_context
Definition: proxy-io.h:36
ClientInvokeContext(Connection &conn, ThreadContext &thread_context)
Definition: proxy-io.h:37
Connection & connection
Definition: proxy-io.h:31
LogFn log_fn
External logging callback.
Definition: proxy-io.h:136
size_t max_chars
Maximum number of characters to use when representing request and response structs as strings.
Definition: proxy-io.h:140
ProxyClient(const ProxyClient &)=delete
std::optional< CleanupIt > m_disconnect_cb
Reference to callback function that is run if there is a sudden disconnect and the Connection object ...
Definition: proxy-io.h:80
Mapping from capnp interface type to proxy client implementation (specializations are generated by pr...
Definition: proxy.h:25
EventLoopRef loop
Definition: proxy.h:71
Connection * connection
Definition: proxy.h:70
CleanupList cleanup_fns
Definition: proxy.h:72
std::thread m_thread
Definition: proxy-io.h:91
ThreadContext & m_thread_context
Definition: proxy-io.h:90
Base class for generated ProxyServer classes that implement capnp server methods and forward calls to...
Definition: proxy.h:148
virtual ~ProxyServerBase()
ProxyServer destructor, called from the EventLoop thread by Cap'n Proto garbage collection code after...
Definition: proxy-io.h:493
std::shared_ptr< Impl > m_impl
Implementation pointer that may or may not be owned and deleted when this capnp server goes out of sc...
Definition: proxy.h:170
Mapping from capnp interface type to proxy server implementation (specializations are generated by pr...
Definition: proxy.h:28
CallContext_ CallContext
Definition: proxy-io.h:46
ServerInvokeContext(ProxyServer &proxy_server, CallContext &call_context, int req)
Definition: proxy-io.h:52
CallContext & call_context
Definition: proxy-io.h:49
ProxyServer & proxy_server
Definition: proxy-io.h:48
Vat id for server side of connection.
Definition: proxy-io.h:400
::capnp::word scratch[4]
Definition: proxy-io.h:401
::capnp::MallocMessageBuilder message
Definition: proxy-io.h:402
::capnp::rpc::twoparty::VatId::Builder vat_id
Definition: proxy-io.h:403
ConnThreads request_threads
When client is making a request to a server, this is the thread argument it passes in the request,...
Definition: proxy-io.h:578
ConnThreads callback_threads
When client is making a request to a server, this is the callbackThread argument it passes in the req...
Definition: proxy-io.h:569
std::string thread_name
Identifying string for debug.
Definition: proxy-io.h:558
Single element task queue used to handle recursive capnp calls.
Definition: proxy-io.h:282
std::optional< kj::Function< void()> > m_fn
Definition: proxy-io.h:322
std::mutex m_mutex
Mutex mainly used internally by waiter class, but also used externally to guard access to related sta...
Definition: proxy-io.h:320
std::condition_variable m_cv
Definition: proxy-io.h:321
void wait(std::unique_lock< std::mutex > &lock, Predicate pred)
Definition: proxy-io.h:295
void post(Fn &&fn)
Definition: proxy-io.h:286
Waiter()=default
assert(!tx.IsCoinBase())