Bitcoin Core 29.99.0
P2P Digital Currency
proxy-io.h
Go to the documentation of this file.
1// Copyright (c) 2019 The Bitcoin Core developers
2// Distributed under the MIT software license, see the accompanying
3// file COPYING or http://www.opensource.org/licenses/mit-license.php.
4
5#ifndef MP_PROXY_IO_H
6#define MP_PROXY_IO_H
7
8#include <mp/proxy.h>
9#include <mp/util.h>
10
11#include <mp/proxy.capnp.h>
12
13#include <capnp/rpc-twoparty.h>
14
15#include <assert.h>
16#include <functional>
17#include <optional>
18#include <map>
19#include <memory>
20#include <sstream>
21#include <string>
22
23namespace mp {
24struct ThreadContext;
25
27{
29};
30
32{
36 {
37 }
38};
39
40template <typename ProxyServer, typename CallContext_>
42{
43 using CallContext = CallContext_;
44
47 int req;
48
51 {
52 }
53};
54
55template <typename Interface, typename Params, typename Results>
56using ServerContext = ServerInvokeContext<ProxyServer<Interface>, ::capnp::CallContext<Params, Results>>;
57
58template <>
59struct ProxyClient<Thread> : public ProxyClientBase<Thread, ::capnp::Void>
60{
62 // https://stackoverflow.com/questions/22357887/comparing-two-mapiterators-why-does-it-need-the-copy-constructor-of-stdpair
63 ProxyClient(const ProxyClient&) = delete;
65
66 void setCleanup(const std::function<void()>& fn);
67
75 std::optional<CleanupIt> m_cleanup_it;
76};
77
78template <>
79struct ProxyServer<Thread> final : public Thread::Server
80{
81public:
82 ProxyServer(ThreadContext& thread_context, std::thread&& thread);
84 kj::Promise<void> getName(GetNameContext context) override;
86 std::thread m_thread;
87};
88
90class LoggingErrorHandler : public kj::TaskSet::ErrorHandler
91{
92public:
94 void taskFailed(kj::Exception&& exception) override;
96};
97
98using LogFn = std::function<void(bool raise, std::string message)>;
99
101{
102public:
103 Logger(bool raise, LogFn& fn) : m_raise(raise), m_fn(fn) {}
104 Logger(Logger&& logger) : m_raise(logger.m_raise), m_fn(logger.m_fn), m_buffer(std::move(logger.m_buffer)) {}
105 ~Logger() noexcept(false)
106 {
107 if (m_fn) m_fn(m_raise, m_buffer.str());
108 }
109
110 template <typename T>
111 friend Logger& operator<<(Logger& logger, T&& value)
112 {
113 if (logger.m_fn) logger.m_buffer << std::forward<T>(value);
114 return logger;
115 }
116
117 template <typename T>
118 friend Logger& operator<<(Logger&& logger, T&& value)
119 {
120 return logger << std::forward<T>(value);
121 }
122
125 std::ostringstream m_buffer;
128std::string LongThreadName(const char* exe_name);
129
134{
135public:
137 EventLoop(const char* exe_name, LogFn log_fn, void* context = nullptr);
138 ~EventLoop();
139
143 void loop();
144
147 void post(const std::function<void()>& fn);
148
152 template <typename Callable>
153 void sync(Callable&& callable)
154 {
155 post(std::ref(callable));
156 }
157
169 void startAsyncThread(std::unique_lock<std::mutex>& lock);
170
172 void addClient(std::unique_lock<std::mutex>& lock);
173 bool removeClient(std::unique_lock<std::mutex>& lock);
175 bool done(std::unique_lock<std::mutex>& lock);
176
178 {
179 Logger logger(false, m_log_fn);
180 logger << "{" << LongThreadName(m_exe_name) << "} ";
181 return logger;
182 }
183 Logger logPlain() { return {false, m_log_fn}; }
184 Logger raise() { return {true, m_log_fn}; }
185
188 const char* m_exe_name;
189
191 std::thread::id m_thread_id = std::this_thread::get_id();
192
195 std::thread m_async_thread;
196
198 const std::function<void()>* m_post_fn = nullptr;
199
202
204 int m_wait_fd = -1;
205
207 int m_post_fd = -1;
208
212
215 std::mutex m_mutex;
216 std::condition_variable m_cv;
217
219 kj::AsyncIoContext m_io_context;
220
223
225 std::unique_ptr<kj::TaskSet> m_task_set;
226
228 std::list<Connection> m_incoming_connections;
229
232
235};
236
//! Single element task queue used to handle recursive capnp calls.
//!
//! post() stores at most one pending callback and notifies the condition
//! variable. wait() blocks on the condition variable, running any stored
//! callback with the caller's lock released, until the supplied predicate
//! returns true.
struct Waiter
{
    Waiter() = default;

    //! Store fn as the pending callback and wake every thread blocked in
    //! wait(). Asserts that no other callback is still pending.
    //!
    //! @param fn  Callable invocable as void(). Rvalues are moved into the
    //!            queue; lvalues are copied and left usable by the caller.
    template <typename Fn>
    void post(Fn&& fn)
    {
        const std::unique_lock<std::mutex> lock(m_mutex);
        assert(!m_fn);
        // Fn&& is a forwarding reference, so forward instead of moving
        // unconditionally: an lvalue argument must not be left moved-from.
        m_fn = std::forward<Fn>(fn);
        m_cv.notify_all();
    }

    //! Block until pred() returns true, executing any posted callback first.
    //!
    //! @param lock  Lock that is released while a posted callback runs and
    //!              re-acquired afterwards. NOTE(review): presumably this must
    //!              be a held lock on m_mutex, since m_fn is read here and
    //!              written under m_mutex in post() -- confirm against callers.
    //! @param pred  Predicate evaluated with the lock held; waiting stops once
    //!              it returns true.
    template <class Predicate>
    void wait(std::unique_lock<std::mutex>& lock, Predicate pred)
    {
        m_cv.wait(lock, [&] {
            // Important for this to be "while (m_fn)", not "if (m_fn)" to avoid
            // a lost-wakeup bug. A new m_fn and m_cv notification might be sent
            // after the fn() call and before the lock.lock() call in this loop
            // in the case where a capnp response is sent and a brand new
            // request is immediately received.
            while (m_fn) {
                auto fn = std::move(m_fn);
                // A moved-from std::function has unspecified state, so clear
                // it explicitly before the lock is dropped.
                m_fn = nullptr;
                lock.unlock();
                fn();
                lock.lock();
            }
            return pred();
        });
    }

    std::mutex m_mutex;
    std::condition_variable m_cv;
    //! Pending callback; empty when nothing has been posted.
    std::function<void()> m_fn;
};
281
288{
289public:
290 Connection(EventLoop& loop, kj::Own<kj::AsyncIoStream>&& stream_)
291 : m_loop(loop), m_stream(kj::mv(stream_)),
292 m_network(*m_stream, ::capnp::rpc::twoparty::Side::CLIENT, ::capnp::ReaderOptions()),
293 m_rpc_system(::capnp::makeRpcClient(m_network))
294 {
295 std::unique_lock<std::mutex> lock(m_loop.m_mutex);
296 m_loop.addClient(lock);
297 }
299 kj::Own<kj::AsyncIoStream>&& stream_,
300 const std::function<::capnp::Capability::Client(Connection&)>& make_client)
301 : m_loop(loop), m_stream(kj::mv(stream_)),
302 m_network(*m_stream, ::capnp::rpc::twoparty::Side::SERVER, ::capnp::ReaderOptions()),
303 m_rpc_system(::capnp::makeRpcServer(m_network, make_client(*this)))
304 {
305 std::unique_lock<std::mutex> lock(m_loop.m_mutex);
306 m_loop.addClient(lock);
307 }
308
314 ~Connection();
315
319 CleanupIt addSyncCleanup(std::function<void()> fn);
321
324 void addAsyncCleanup(std::function<void()> fn);
325
327 template <typename F>
328 void onDisconnect(F&& f)
329 {
330 // Add disconnect handler to local TaskSet to ensure it is cancelled and
331 // will never run after connection object is destroyed. But when disconnect
332 // handler fires, do not call the function f right away, instead add it
333 // to the EventLoop TaskSet to avoid "Promise callback destroyed itself"
334 // error in cases where f deletes this Connection object.
335 m_on_disconnect.add(m_network.onDisconnect().then(
336 [f = std::move(f), this]() mutable { m_loop.m_task_set->add(kj::evalLater(kj::mv(f))); }));
337 }
338
340 kj::Own<kj::AsyncIoStream> m_stream;
343 ::capnp::TwoPartyVatNetwork m_network;
344 std::optional<::capnp::RpcSystem<::capnp::rpc::twoparty::VatId>> m_rpc_system;
345
346 // ThreadMap interface client, used to create a remote server thread when a
347 // client IPC call is being made for the first time from a new thread.
348 ThreadMap::Client m_thread_map{nullptr};
349
352 ::capnp::CapabilityServerSet<Thread> m_threads;
353
359};
360
370{
371 ::capnp::word scratch[4]{};
372 ::capnp::MallocMessageBuilder message{scratch};
373 ::capnp::rpc::twoparty::VatId::Builder vat_id{message.getRoot<::capnp::rpc::twoparty::VatId>()};
374 ServerVatId() { vat_id.setSide(::capnp::rpc::twoparty::Side::SERVER); }
375};
376
377template <typename Interface, typename Impl>
379 Connection* connection,
380 bool destroy_connection)
381 : m_client(std::move(client)), m_context(connection)
382
383{
384 {
385 std::unique_lock<std::mutex> lock(m_context.connection->m_loop.m_mutex);
387 }
388
389 // Handler for the connection getting destroyed before this client object.
390 auto cleanup_it = m_context.connection->addSyncCleanup([this]() {
391 // Release client capability by move-assigning to temporary.
392 {
393 typename Interface::Client(std::move(m_client));
394 }
395 {
396 std::unique_lock<std::mutex> lock(m_context.connection->m_loop.m_mutex);
398 }
399 m_context.connection = nullptr;
400 });
401
402 // Two shutdown sequences are supported:
403 //
404 // - A normal sequence where client proxy objects are deleted by external
405 // code that no longer needs them
406 //
407 // - A garbage collection sequence where the connection or event loop shuts
408 // down while external code is still holding client references.
409 //
410 // The first case is handled here when m_context.connection is not null. The
411 // second case is handled by the cleanup function, which sets m_context.connection to
412 // null so nothing happens here.
413 m_context.cleanup_fns.emplace_front([this, destroy_connection, cleanup_it]{
414 if (m_context.connection) {
415 // Remove cleanup callback so it doesn't run and try to access
416 // this object after it's already destroyed.
418
419 // If the capnp interface defines a destroy method, call it to destroy
420 // the remote object, waiting for it to be deleted server side. If the
421 // capnp interface does not define a destroy method, this will just call
422 // an empty stub defined in the ProxyClientBase class and do nothing.
423 Sub::destroy(*this);
424
425 // FIXME: Could just invoke removed addCleanup fn here instead of duplicating code
427 // Release client capability by move-assigning to temporary.
428 {
429 typename Interface::Client(std::move(m_client));
430 }
431 {
432 std::unique_lock<std::mutex> lock(m_context.connection->m_loop.m_mutex);
434 }
435
436 if (destroy_connection) {
437 delete m_context.connection;
438 m_context.connection = nullptr;
439 }
440 });
441 }
442 });
443 Sub::construct(*this);
444}
445
446template <typename Interface, typename Impl>
448{
449 CleanupRun(m_context.cleanup_fns);
450}
451
452template <typename Interface, typename Impl>
453ProxyServerBase<Interface, Impl>::ProxyServerBase(std::shared_ptr<Impl> impl, Connection& connection)
454 : m_impl(std::move(impl)), m_context(&connection)
455{
456 assert(m_impl);
457 std::unique_lock<std::mutex> lock(m_context.connection->m_loop.m_mutex);
459}
460
463template <typename Interface, typename Impl>
465{
466 if (m_impl) {
467 // If impl is non-null at this point, it means no client is waiting for
468 // the m_impl server object to be destroyed synchronously. This can
469 // happen either if the interface did not define a "destroy" method (see
470 // invokeDestroy method below), or if a destroy method was defined, but
471 // the connection was broken before it could be called.
472 //
473 // In either case, be conservative and run the cleanup on an
474 // asynchronous thread, to avoid destructors or cleanup functions
475 // blocking or deadlocking the current EventLoop thread, since they
476 // could be making IPC calls.
477 //
478 // Technically this is a little too conservative since if the interface
479 // defines a "destroy" method, but the destroy method does not accept a
480 // Context parameter specifying a worker thread, the cleanup method
481 // would run on the EventLoop thread normally (when connection is
482 // unbroken), but will not run on the EventLoop thread now (when
483 // connection is broken). Probably some refactoring of the destructor
484 // and invokeDestroy function is possible to make this cleaner and more
485 // consistent.
486 m_context.connection->addAsyncCleanup([impl=std::move(m_impl), fns=std::move(m_context.cleanup_fns)]() mutable {
487 impl.reset();
488 CleanupRun(fns);
489 });
490 }
491 assert(m_context.cleanup_fns.empty());
492 std::unique_lock<std::mutex> lock(m_context.connection->m_loop.m_mutex);
493 m_context.connection->m_loop.removeClient(lock);
494}
495
513template <typename Interface, typename Impl>
515{
516 m_impl.reset();
517 CleanupRun(m_context.cleanup_fns);
518}
519
520using ConnThreads = std::map<Connection*, ProxyClient<Thread>>;
521using ConnThread = ConnThreads::iterator;
522
523// Retrieve ProxyClient<Thread> object associated with this connection from a
524// map, or create a new one and insert it into the map. Return map iterator and
525// inserted bool.
526std::tuple<ConnThread, bool> SetThread(ConnThreads& threads, std::mutex& mutex, Connection* connection, const std::function<Thread::Client()>& make_thread);
527
529{
531 std::string thread_name;
532
536 std::unique_ptr<Waiter> waiter = nullptr;
537
543
552
556 bool loop_thread = false;
557};
558
562template <typename InitInterface>
563std::unique_ptr<ProxyClient<InitInterface>> ConnectStream(EventLoop& loop, int fd)
564{
565 typename InitInterface::Client init_client(nullptr);
566 std::unique_ptr<Connection> connection;
567 loop.sync([&] {
568 auto stream =
569 loop.m_io_context.lowLevelProvider->wrapSocketFd(fd, kj::LowLevelAsyncIoProvider::TAKE_OWNERSHIP);
570 connection = std::make_unique<Connection>(loop, kj::mv(stream));
571 init_client = connection->m_rpc_system->bootstrap(ServerVatId().vat_id).castAs<InitInterface>();
572 Connection* connection_ptr = connection.get();
573 connection->onDisconnect([&loop, connection_ptr] {
574 loop.log() << "IPC client: unexpected network disconnect.";
575 delete connection_ptr;
576 });
577 });
578 return std::make_unique<ProxyClient<InitInterface>>(
579 kj::mv(init_client), connection.release(), /* destroy_connection= */ true);
580}
581
586template <typename InitInterface, typename InitImpl>
587void _Serve(EventLoop& loop, kj::Own<kj::AsyncIoStream>&& stream, InitImpl& init)
588{
589 loop.m_incoming_connections.emplace_front(loop, kj::mv(stream), [&](Connection& connection) {
590 // Disable deleter so proxy server object doesn't attempt to delete the
591 // init implementation when the proxy client is destroyed or
592 // disconnected.
593 return kj::heap<ProxyServer<InitInterface>>(std::shared_ptr<InitImpl>(&init, [](InitImpl*){}), connection);
594 });
595 auto it = loop.m_incoming_connections.begin();
596 it->onDisconnect([&loop, it] {
597 loop.log() << "IPC server: socket disconnected.";
598 loop.m_incoming_connections.erase(it);
599 });
600}
601
605template <typename InitInterface, typename InitImpl>
606void _Listen(EventLoop& loop, kj::Own<kj::ConnectionReceiver>&& listener, InitImpl& init)
607{
608 auto* ptr = listener.get();
609 loop.m_task_set->add(ptr->accept().then(
610 [&loop, &init, listener = kj::mv(listener)](kj::Own<kj::AsyncIoStream>&& stream) mutable {
611 _Serve<InitInterface>(loop, kj::mv(stream), init);
612 _Listen<InitInterface>(loop, kj::mv(listener), init);
613 }));
614}
615
618template <typename InitInterface, typename InitImpl>
619void ServeStream(EventLoop& loop, int fd, InitImpl& init)
620{
621 _Serve<InitInterface>(
622 loop, loop.m_io_context.lowLevelProvider->wrapSocketFd(fd, kj::LowLevelAsyncIoProvider::TAKE_OWNERSHIP), init);
623}
624
627template <typename InitInterface, typename InitImpl>
629{
630 loop.sync([&]() {
631 _Listen<InitInterface>(loop,
632 loop.m_io_context.lowLevelProvider->wrapListenSocketFd(fd, kj::LowLevelAsyncIoProvider::TAKE_OWNERSHIP),
633 init);
634 });
635}
636
637extern thread_local ThreadContext g_thread_context;
638
639} // namespace mp
640
641#endif // MP_PROXY_IO_H
Object holding network & rpc state associated with either an incoming server connection,...
Definition: proxy-io.h:288
void addAsyncCleanup(std::function< void()> fn)
Register asynchronous cleanup function to run on worker thread when disconnect() is called.
Definition: proxy.cpp:135
CleanupIt addSyncCleanup(std::function< void()> fn)
Register synchronous cleanup function to run on event loop thread (with access to capnp thread local ...
Definition: proxy.cpp:115
CleanupList m_async_cleanup_fns
Definition: proxy-io.h:358
kj::TaskSet m_on_disconnect
Definition: proxy-io.h:342
LoggingErrorHandler m_error_handler
Definition: proxy-io.h:341
~Connection()
Run cleanup functions.
Definition: proxy.cpp:51
::capnp::TwoPartyVatNetwork m_network
Definition: proxy-io.h:343
kj::Own< kj::AsyncIoStream > m_stream
Definition: proxy-io.h:340
Connection(EventLoop &loop, kj::Own< kj::AsyncIoStream > &&stream_, const std::function<::capnp::Capability::Client(Connection &)> &make_client)
Definition: proxy-io.h:298
Connection(EventLoop &loop, kj::Own< kj::AsyncIoStream > &&stream_)
Definition: proxy-io.h:290
void onDisconnect(F &&f)
Add disconnect handler.
Definition: proxy-io.h:328
::capnp::CapabilityServerSet< Thread > m_threads
Collection of server-side IPC worker threads (ProxyServer<Thread> objects previously returned by Thre...
Definition: proxy-io.h:352
CleanupList m_sync_cleanup_fns
Cleanup functions to run if connection is broken unexpectedly.
Definition: proxy-io.h:357
std::optional<::capnp::RpcSystem<::capnp::rpc::twoparty::VatId > > m_rpc_system
Definition: proxy-io.h:344
void removeSyncCleanup(CleanupIt it)
Definition: proxy.cpp:129
ThreadMap::Client m_thread_map
Definition: proxy-io.h:348
EventLoop & m_loop
Definition: proxy-io.h:339
Event loop implementation.
Definition: proxy-io.h:134
const char * m_exe_name
Process name included in thread names so combined debug output from multiple processes is easier to u...
Definition: proxy-io.h:188
kj::AsyncIoContext m_io_context
Capnp IO context.
Definition: proxy-io.h:219
void sync(Callable &&callable)
Wrapper around EventLoop::post that takes advantage of the fact that callable will not go out of scop...
Definition: proxy-io.h:153
bool done(std::unique_lock< std::mutex > &lock)
Check if loop should exit.
Definition: proxy.cpp:280
std::condition_variable m_cv
Definition: proxy-io.h:216
EventLoop(const char *exe_name, LogFn log_fn, void *context=nullptr)
Construct event loop object.
Definition: proxy.cpp:157
bool removeClient(std::unique_lock< std::mutex > &lock)
Definition: proxy.cpp:242
Logger log()
Definition: proxy-io.h:177
std::list< Connection > m_incoming_connections
List of connections.
Definition: proxy-io.h:228
LogFn m_log_fn
External logging callback.
Definition: proxy-io.h:231
void loop()
Run event loop.
Definition: proxy.cpp:185
Logger raise()
Definition: proxy-io.h:184
CleanupList m_async_fns
Callback functions to run on async thread.
Definition: proxy-io.h:201
void post(const std::function< void()> &fn)
Run function on event loop thread.
Definition: proxy.cpp:221
int m_post_fd
Pipe write handle used to wake up the event loop thread.
Definition: proxy-io.h:207
int m_num_clients
Number of clients holding references to ProxyServerBase objects that reference this event loop.
Definition: proxy-io.h:211
std::unique_ptr< kj::TaskSet > m_task_set
Capnp list of pending promises.
Definition: proxy-io.h:225
std::mutex m_mutex
Mutex and condition variable used to post tasks to event loop and async thread.
Definition: proxy-io.h:215
void * m_context
External context pointer.
Definition: proxy-io.h:234
int m_wait_fd
Pipe read handle used to wake up the event loop thread.
Definition: proxy-io.h:204
Logger logPlain()
Definition: proxy-io.h:183
const std::function< void()> * m_post_fn
Callback function to run on event loop thread during post() or sync() call.
Definition: proxy-io.h:198
LoggingErrorHandler m_error_handler
Capnp error handler. Needs to outlive m_task_set.
Definition: proxy-io.h:222
std::thread m_async_thread
Handle of an async worker thread.
Definition: proxy-io.h:195
std::thread::id m_thread_id
ID of the event loop thread.
Definition: proxy-io.h:191
void startAsyncThread(std::unique_lock< std::mutex > &lock)
Start asynchronous worker thread if necessary.
Definition: proxy.cpp:256
void addClient(std::unique_lock< std::mutex > &lock)
Add/remove remote client reference counts.
Definition: proxy.cpp:240
friend Logger & operator<<(Logger &logger, T &&value)
Definition: proxy-io.h:111
Logger(bool raise, LogFn &fn)
Definition: proxy-io.h:103
Logger(Logger &&logger)
Definition: proxy-io.h:104
~Logger() noexcept(false)
Definition: proxy-io.h:105
LogFn & m_fn
Definition: proxy-io.h:124
bool m_raise
Definition: proxy-io.h:123
friend Logger & operator<<(Logger &&logger, T &&value)
Definition: proxy-io.h:118
std::ostringstream m_buffer
Definition: proxy-io.h:125
Handler for kj::TaskSet failed task events.
Definition: proxy-io.h:91
EventLoop & m_loop
Definition: proxy-io.h:95
LoggingErrorHandler(EventLoop &loop)
Definition: proxy-io.h:93
void taskFailed(kj::Exception &&exception) override
Definition: proxy.cpp:45
Base class for generated ProxyClient classes that implement a C++ interface and forward calls to a ca...
Definition: proxy.h:63
Interface::Client m_client
Definition: proxy.h:105
ProxyContext m_context
Definition: proxy.h:106
~ProxyClientBase() noexcept
Definition: proxy-io.h:447
ProxyClientBase(typename Interface::Client client, Connection *connection, bool destroy_connection)
Definition: proxy-io.h:378
Context m_context
Definition: protocol.cpp:96
Functions to serialize / deserialize common bitcoin types.
Definition: common-types.h:57
std::list< std::function< void()> > CleanupList
Definition: proxy.h:39
void ServeStream(EventLoop &loop, int fd, InitImpl &init)
Given stream file descriptor and an init object, handle requests on the stream by calling methods on ...
Definition: proxy-io.h:619
std::map< Connection *, ProxyClient< Thread > > ConnThreads
Definition: proxy-io.h:520
thread_local ThreadContext g_thread_context
Definition: proxy.cpp:43
std::tuple< ConnThread, bool > SetThread(ConnThreads &threads, std::mutex &mutex, Connection *connection, const std::function< Thread::Client()> &make_thread)
Definition: proxy.cpp:288
std::function< void(bool raise, std::string message)> LogFn
Definition: proxy-io.h:98
void _Listen(EventLoop &loop, kj::Own< kj::ConnectionReceiver > &&listener, InitImpl &init)
Given connection receiver and an init object, handle incoming connections by calling _Serve,...
Definition: proxy-io.h:606
std::unique_ptr< ProxyClient< InitInterface > > ConnectStream(EventLoop &loop, int fd)
Given stream file descriptor, make a new ProxyClient object to send requests over the stream.
Definition: proxy-io.h:563
void ListenConnections(EventLoop &loop, int fd, InitImpl &init)
Given listening socket file descriptor and an init object, handle incoming connections and requests b...
Definition: proxy-io.h:628
std::string LongThreadName(const char *exe_name)
Definition: proxy.cpp:394
ConnThreads::iterator ConnThread
Definition: proxy-io.h:521
void _Serve(EventLoop &loop, kj::Own< kj::AsyncIoStream > &&stream, InitImpl &init)
Given stream and init objects, construct a new ProxyServer object that handles requests from the stre...
Definition: proxy-io.h:587
typename CleanupList::iterator CleanupIt
Definition: proxy.h:40
void CleanupRun(CleanupList &fns)
Definition: proxy.h:42
ThreadContext & thread_context
Definition: proxy-io.h:33
ClientInvokeContext(Connection &conn, ThreadContext &thread_context)
Definition: proxy-io.h:34
Connection & connection
Definition: proxy-io.h:28
ProxyClient(const ProxyClient &)=delete
std::optional< CleanupIt > m_cleanup_it
Cleanup function to run when the connection is closed.
Definition: proxy-io.h:75
Mapping from capnp interface type to proxy client implementation (specializations are generated by pr...
Definition: proxy.h:24
Connection * connection
Definition: proxy.h:53
CleanupList cleanup_fns
Definition: proxy.h:54
std::thread m_thread
Definition: proxy-io.h:86
ThreadContext & m_thread_context
Definition: proxy-io.h:85
Base class for generated ProxyServer classes that implement capnp server methods and forward calls to...
Definition: proxy.h:121
ProxyContext m_context
Definition: proxy.h:144
virtual ~ProxyServerBase()
ProxyServer destructor, called from the EventLoop thread by Cap'n Proto garbage collection code after...
Definition: proxy-io.h:464
std::shared_ptr< Impl > m_impl
Implementation pointer that may or may not be owned and deleted when this capnp server goes out of sc...
Definition: proxy.h:143
Mapping from capnp interface type to proxy server implementation (specializations are generated by pr...
Definition: proxy.h:28
CallContext_ CallContext
Definition: proxy-io.h:43
ServerInvokeContext(ProxyServer &proxy_server, CallContext &call_context, int req)
Definition: proxy-io.h:49
CallContext & call_context
Definition: proxy-io.h:46
ProxyServer & proxy_server
Definition: proxy-io.h:45
Vat id for server side of connection.
Definition: proxy-io.h:370
::capnp::word scratch[4]
Definition: proxy-io.h:371
::capnp::MallocMessageBuilder message
Definition: proxy-io.h:372
::capnp::rpc::twoparty::VatId::Builder vat_id
Definition: proxy-io.h:373
ConnThreads request_threads
When client is making a request to a server, this is the thread argument it passes in the request,...
Definition: proxy-io.h:551
ConnThreads callback_threads
When client is making a request to a server, this is the callbackThread argument it passes in the req...
Definition: proxy-io.h:542
std::string thread_name
Identifying string for debug.
Definition: proxy-io.h:531
Single element task queue used to handle recursive capnp calls.
Definition: proxy-io.h:244
std::function< void()> m_fn
Definition: proxy-io.h:279
std::mutex m_mutex
Definition: proxy-io.h:277
std::condition_variable m_cv
Definition: proxy-io.h:278
void wait(std::unique_lock< std::mutex > &lock, Predicate pred)
Definition: proxy-io.h:257
void post(Fn &&fn)
Definition: proxy-io.h:248
Waiter()=default
assert(!tx.IsCoinBase())