Bitcoin Core 30.99.0
P2P Digital Currency
proxy-io.h
Go to the documentation of this file.
1// Copyright (c) The Bitcoin Core developers
2// Distributed under the MIT software license, see the accompanying
3// file COPYING or http://www.opensource.org/licenses/mit-license.php.
4
5#ifndef MP_PROXY_IO_H
6#define MP_PROXY_IO_H
7
8#include <mp/proxy.h>
9#include <mp/util.h>
10
11#include <mp/proxy.capnp.h>
12
13#include <capnp/rpc-twoparty.h>
14
15#include <assert.h>
16#include <condition_variable>
17#include <functional>
18#include <kj/function.h>
19#include <map>
20#include <memory>
21#include <optional>
22#include <sstream>
23#include <string>
24#include <thread>
25
26namespace mp {
27struct ThreadContext;
28
30{
32};
33
35{
39 {
40 }
41};
42
43template <typename ProxyServer, typename CallContext_>
45{
46 using CallContext = CallContext_;
47
50 int req;
51
54 {
55 }
56};
57
58template <typename Interface, typename Params, typename Results>
59using ServerContext = ServerInvokeContext<ProxyServer<Interface>, ::capnp::CallContext<Params, Results>>;
60
61template <>
62struct ProxyClient<Thread> : public ProxyClientBase<Thread, ::capnp::Void>
63{
65 // https://stackoverflow.com/questions/22357887/comparing-two-mapiterators-why-does-it-need-the-copy-constructor-of-stdpair
66 ProxyClient(const ProxyClient&) = delete;
68
78 std::optional<CleanupIt> m_disconnect_cb;
79};
80
81template <>
82struct ProxyServer<Thread> final : public Thread::Server
83{
84public:
85 ProxyServer(ThreadContext& thread_context, std::thread&& thread);
87 kj::Promise<void> getName(GetNameContext context) override;
89 std::thread m_thread;
90};
91
93class LoggingErrorHandler : public kj::TaskSet::ErrorHandler
94{
95public:
97 void taskFailed(kj::Exception&& exception) override;
99};
100
101using LogFn = std::function<void(bool raise, std::string message)>;
102
104{
105public:
106 Logger(bool raise, LogFn& fn) : m_raise(raise), m_fn(fn) {}
107 Logger(Logger&& logger) : m_raise(logger.m_raise), m_fn(logger.m_fn), m_buffer(std::move(logger.m_buffer)) {}
108 ~Logger() noexcept(false)
109 {
110 if (m_fn) m_fn(m_raise, m_buffer.str());
111 }
112
113 template <typename T>
114 friend Logger& operator<<(Logger& logger, T&& value)
115 {
116 if (logger.m_fn) logger.m_buffer << std::forward<T>(value);
117 return logger;
118 }
119
120 template <typename T>
121 friend Logger& operator<<(Logger&& logger, T&& value)
122 {
123 return logger << std::forward<T>(value);
124 }
125
128 std::ostringstream m_buffer;
129};
130
132
135
138 size_t max_chars{200};
139};
140
141std::string LongThreadName(const char* exe_name);
142
169{
170public:
172 EventLoop(const char* exe_name, LogFn log_fn, void* context = nullptr);
173 ~EventLoop();
174
178 void loop();
179
182 void post(kj::Function<void()> fn);
183
187 template <typename Callable>
188 void sync(Callable&& callable)
189 {
190 post(std::forward<Callable>(callable));
191 }
192
195 void addAsyncCleanup(std::function<void()> fn);
196
209
211 bool done() const MP_REQUIRES(m_mutex);
212
214 {
215 Logger logger(false, m_log_opts.log_fn);
216 logger << "{" << LongThreadName(m_exe_name) << "} ";
217 return logger;
218 }
219 Logger logPlain() { return {false, m_log_opts.log_fn}; }
220 Logger raise() { return {true, m_log_opts.log_fn}; }
221
224 const char* m_exe_name;
225
227 std::thread::id m_thread_id = std::this_thread::get_id();
228
231 std::thread m_async_thread;
232
234 kj::Function<void()>* m_post_fn MP_GUARDED_BY(m_mutex) = nullptr;
235
237 std::optional<CleanupList> m_async_fns MP_GUARDED_BY(m_mutex);
238
240 int m_wait_fd = -1;
241
243 int m_post_fd = -1;
244
247 int m_num_clients MP_GUARDED_BY(m_mutex) = 0;
248
252 std::condition_variable m_cv;
253
255 kj::AsyncIoContext m_io_context;
256
259
261 std::unique_ptr<kj::TaskSet> m_task_set;
262
264 std::list<Connection> m_incoming_connections;
265
268
271};
272
279struct Waiter
280{
281 Waiter() = default;
282
283 template <typename Fn>
284 void post(Fn&& fn)
285 {
286 const Lock lock(m_mutex);
287 assert(!m_fn);
288 m_fn = std::forward<Fn>(fn);
289 m_cv.notify_all();
290 }
291
292 template <class Predicate>
293 void wait(Lock& lock, Predicate pred)
294 {
295 m_cv.wait(lock.m_lock, [&]() MP_REQUIRES(m_mutex) {
296 // Important for this to be "while (m_fn)", not "if (m_fn)" to avoid
297 // a lost-wakeup bug. A new m_fn and m_cv notification might be sent
298 // after the fn() call and before the lock.lock() call in this loop
299 // in the case where a capnp response is sent and a brand new
300 // request is immediately received.
301 while (m_fn) {
302 auto fn = std::move(*m_fn);
303 m_fn.reset();
304 Unlock(lock, fn);
305 }
306 const bool done = pred();
307 return done;
308 });
309 }
310
319 std::condition_variable m_cv;
320 std::optional<kj::Function<void()>> m_fn MP_GUARDED_BY(m_mutex);
321};
322
329{
330public:
331 Connection(EventLoop& loop, kj::Own<kj::AsyncIoStream>&& stream_)
332 : m_loop(loop), m_stream(kj::mv(stream_)),
333 m_network(*m_stream, ::capnp::rpc::twoparty::Side::CLIENT, ::capnp::ReaderOptions()),
334 m_rpc_system(::capnp::makeRpcClient(m_network)) {}
336 kj::Own<kj::AsyncIoStream>&& stream_,
337 const std::function<::capnp::Capability::Client(Connection&)>& make_client)
338 : m_loop(loop), m_stream(kj::mv(stream_)),
339 m_network(*m_stream, ::capnp::rpc::twoparty::Side::SERVER, ::capnp::ReaderOptions()),
340 m_rpc_system(::capnp::makeRpcServer(m_network, make_client(*this))) {}
341
347 ~Connection();
348
352 CleanupIt addSyncCleanup(std::function<void()> fn);
354
356 template <typename F>
357 void onDisconnect(F&& f)
358 {
359 // Add disconnect handler to local TaskSet to ensure it is cancelled and
360 // will never run after connection object is destroyed. But when disconnect
361 // handler fires, do not call the function f right away, instead add it
362 // to the EventLoop TaskSet to avoid "Promise callback destroyed itself"
363 // error in cases where f deletes this Connection object.
364 m_on_disconnect.add(m_network.onDisconnect().then(
365 [f = std::forward<F>(f), this]() mutable { m_loop->m_task_set->add(kj::evalLater(kj::mv(f))); }));
366 }
367
369 kj::Own<kj::AsyncIoStream> m_stream;
372 ::capnp::TwoPartyVatNetwork m_network;
373 std::optional<::capnp::RpcSystem<::capnp::rpc::twoparty::VatId>> m_rpc_system;
374
375 // ThreadMap interface client, used to create a remote server thread when an
376 // client IPC call is being made for the first time from a new thread.
377 ThreadMap::Client m_thread_map{nullptr};
378
381 ::capnp::CapabilityServerSet<Thread> m_threads;
382
387};
388
398{
399 ::capnp::word scratch[4]{};
400 ::capnp::MallocMessageBuilder message{scratch};
401 ::capnp::rpc::twoparty::VatId::Builder vat_id{message.getRoot<::capnp::rpc::twoparty::VatId>()};
402 ServerVatId() { vat_id.setSide(::capnp::rpc::twoparty::Side::SERVER); }
403};
404
405template <typename Interface, typename Impl>
407 Connection* connection,
408 bool destroy_connection)
409 : m_client(std::move(client)), m_context(connection)
410
411{
412 // Handler for the connection getting destroyed before this client object.
413 auto disconnect_cb = m_context.connection->addSyncCleanup([this]() {
414 // Release client capability by move-assigning to temporary.
415 {
416 typename Interface::Client(std::move(m_client));
417 }
418 Lock lock{m_context.loop->m_mutex};
419 m_context.connection = nullptr;
420 });
421
422 // Two shutdown sequences are supported:
423 //
424 // - A normal sequence where client proxy objects are deleted by external
425 // code that no longer needs them
426 //
427 // - A garbage collection sequence where the connection or event loop shuts
428 // down while external code is still holding client references.
429 //
430 // The first case is handled here when m_context.connection is not null. The
431 // second case is handled by the disconnect_cb function, which sets
432 // m_context.connection to null so nothing happens here.
433 m_context.cleanup_fns.emplace_front([this, destroy_connection, disconnect_cb]{
434 {
435 // If the capnp interface defines a destroy method, call it to destroy
436 // the remote object, waiting for it to be deleted server side. If the
437 // capnp interface does not define a destroy method, this will just call
438 // an empty stub defined in the ProxyClientBase class and do nothing.
439 Sub::destroy(*this);
440
441 // FIXME: Could just invoke removed addCleanup fn here instead of duplicating code
442 m_context.loop->sync([&]() {
443 // Remove disconnect callback on cleanup so it doesn't run and try
444 // to access this object after it's destroyed. This call needs to
445 // run inside loop->sync() on the event loop thread because
446 // otherwise, if there were an ill-timed disconnect, the
447 // onDisconnect handler could fire and delete the Connection object
448 // before the removeSyncCleanup call.
450
451 // Release client capability by move-assigning to temporary.
452 {
453 typename Interface::Client(std::move(m_client));
454 }
455 if (destroy_connection) {
456 delete m_context.connection;
457 m_context.connection = nullptr;
458 }
459 });
460 }
461 });
462 Sub::construct(*this);
463}
464
465template <typename Interface, typename Impl>
467{
468 CleanupRun(m_context.cleanup_fns);
469}
470
471template <typename Interface, typename Impl>
472ProxyServerBase<Interface, Impl>::ProxyServerBase(std::shared_ptr<Impl> impl, Connection& connection)
473 : m_impl(std::move(impl)), m_context(&connection)
474{
475 assert(m_impl);
476}
477
490template <typename Interface, typename Impl>
492{
493 if (m_impl) {
494 // If impl is non-null at this point, it means no client is waiting for
495 // the m_impl server object to be destroyed synchronously. This can
496 // happen either if the interface did not define a "destroy" method (see
497 // invokeDestroy method below), or if a destroy method was defined, but
498 // the connection was broken before it could be called.
499 //
500 // In either case, be conservative and run the cleanup on an
501 // asynchronous thread, to avoid destructors or cleanup functions
502 // blocking or deadlocking the current EventLoop thread, since they
503 // could be making IPC calls.
504 //
505 // Technically this is a little too conservative since if the interface
506 // defines a "destroy" method, but the destroy method does not accept a
507 // Context parameter specifying a worker thread, the cleanup method
508 // would run on the EventLoop thread normally (when connection is
509 // unbroken), but will not run on the EventLoop thread now (when
510 // connection is broken). Probably some refactoring of the destructor
511 // and invokeDestroy function is possible to make this cleaner and more
512 // consistent.
513 m_context.loop->addAsyncCleanup([impl=std::move(m_impl), fns=std::move(m_context.cleanup_fns)]() mutable {
514 impl.reset();
515 CleanupRun(fns);
516 });
517 }
518 assert(m_context.cleanup_fns.empty());
519}
520
538template <typename Interface, typename Impl>
540{
541 m_impl.reset();
542 CleanupRun(m_context.cleanup_fns);
543}
544
551using ConnThreads = std::map<Connection*, std::optional<ProxyClient<Thread>>>;
552using ConnThread = ConnThreads::iterator;
553
554// Retrieve ProxyClient<Thread> object associated with this connection from a
555// map, or create a new one and insert it into the map. Return map iterator and
556// inserted bool.
557std::tuple<ConnThread, bool> SetThread(GuardedRef<ConnThreads> threads, Connection* connection, const std::function<Thread::Client()>& make_thread);
558
573{
575 std::string thread_name;
576
592 std::unique_ptr<Waiter> waiter = nullptr;
593
611 ConnThreads callback_threads MP_GUARDED_BY(waiter->m_mutex);
612
622 ConnThreads request_threads MP_GUARDED_BY(waiter->m_mutex);
623
627 bool loop_thread = false;
628};
629
633template <typename InitInterface>
634std::unique_ptr<ProxyClient<InitInterface>> ConnectStream(EventLoop& loop, int fd)
635{
636 typename InitInterface::Client init_client(nullptr);
637 std::unique_ptr<Connection> connection;
638 loop.sync([&] {
639 auto stream =
640 loop.m_io_context.lowLevelProvider->wrapSocketFd(fd, kj::LowLevelAsyncIoProvider::TAKE_OWNERSHIP);
641 connection = std::make_unique<Connection>(loop, kj::mv(stream));
642 init_client = connection->m_rpc_system->bootstrap(ServerVatId().vat_id).castAs<InitInterface>();
643 Connection* connection_ptr = connection.get();
644 connection->onDisconnect([&loop, connection_ptr] {
645 loop.log() << "IPC client: unexpected network disconnect.";
646 delete connection_ptr;
647 });
648 });
649 return std::make_unique<ProxyClient<InitInterface>>(
650 kj::mv(init_client), connection.release(), /* destroy_connection= */ true);
651}
652
657template <typename InitInterface, typename InitImpl>
658void _Serve(EventLoop& loop, kj::Own<kj::AsyncIoStream>&& stream, InitImpl& init)
659{
660 loop.m_incoming_connections.emplace_front(loop, kj::mv(stream), [&](Connection& connection) {
661 // Disable deleter so proxy server object doesn't attempt to delete the
662 // init implementation when the proxy client is destroyed or
663 // disconnected.
664 return kj::heap<ProxyServer<InitInterface>>(std::shared_ptr<InitImpl>(&init, [](InitImpl*){}), connection);
665 });
666 auto it = loop.m_incoming_connections.begin();
667 it->onDisconnect([&loop, it] {
668 loop.log() << "IPC server: socket disconnected.";
669 loop.m_incoming_connections.erase(it);
670 });
671}
672
676template <typename InitInterface, typename InitImpl>
677void _Listen(EventLoop& loop, kj::Own<kj::ConnectionReceiver>&& listener, InitImpl& init)
678{
679 auto* ptr = listener.get();
680 loop.m_task_set->add(ptr->accept().then(
681 [&loop, &init, listener = kj::mv(listener)](kj::Own<kj::AsyncIoStream>&& stream) mutable {
682 _Serve<InitInterface>(loop, kj::mv(stream), init);
683 _Listen<InitInterface>(loop, kj::mv(listener), init);
684 }));
685}
686
689template <typename InitInterface, typename InitImpl>
690void ServeStream(EventLoop& loop, int fd, InitImpl& init)
691{
692 _Serve<InitInterface>(
693 loop, loop.m_io_context.lowLevelProvider->wrapSocketFd(fd, kj::LowLevelAsyncIoProvider::TAKE_OWNERSHIP), init);
694}
695
698template <typename InitInterface, typename InitImpl>
700{
701 loop.sync([&]() {
702 _Listen<InitInterface>(loop,
703 loop.m_io_context.lowLevelProvider->wrapListenSocketFd(fd, kj::LowLevelAsyncIoProvider::TAKE_OWNERSHIP),
704 init);
705 });
706}
707
708extern thread_local ThreadContext g_thread_context; // NOLINT(bitcoin-nontrivial-threadlocal)
709// Silence nonstandard bitcoin tidy error "Variable with non-trivial destructor
710// cannot be thread_local" which should not be a problem on modern platforms, and
711// could lead to a small memory leak at worst on older ones.
712
713} // namespace mp
714
715#endif // MP_PROXY_IO_H
if(!SetupNetworking())
Object holding network & rpc state associated with either an incoming server connection,...
Definition: proxy-io.h:329
CleanupIt addSyncCleanup(std::function< void()> fn)
Register synchronous cleanup function to run on event loop thread (with access to capnp thread local ...
Definition: proxy.cpp:148
kj::TaskSet m_on_disconnect
Definition: proxy-io.h:371
EventLoopRef m_loop
Definition: proxy-io.h:368
LoggingErrorHandler m_error_handler
Definition: proxy-io.h:370
~Connection()
Run cleanup functions.
Definition: proxy.cpp:82
::capnp::TwoPartyVatNetwork m_network
Definition: proxy-io.h:372
kj::Own< kj::AsyncIoStream > m_stream
Definition: proxy-io.h:369
Connection(EventLoop &loop, kj::Own< kj::AsyncIoStream > &&stream_, const std::function<::capnp::Capability::Client(Connection &)> &make_client)
Definition: proxy-io.h:335
Connection(EventLoop &loop, kj::Own< kj::AsyncIoStream > &&stream_)
Definition: proxy-io.h:331
void onDisconnect(F &&f)
Add disconnect handler.
Definition: proxy-io.h:357
::capnp::CapabilityServerSet< Thread > m_threads
Collection of server-side IPC worker threads (ProxyServer<Thread> objects previously returned by Thre...
Definition: proxy-io.h:381
CleanupList m_sync_cleanup_fns
Cleanup functions to run if connection is broken unexpectedly.
Definition: proxy-io.h:386
std::optional<::capnp::RpcSystem<::capnp::rpc::twoparty::VatId > > m_rpc_system
Definition: proxy-io.h:373
void removeSyncCleanup(CleanupIt it)
Definition: proxy.cpp:162
ThreadMap::Client m_thread_map
Definition: proxy-io.h:377
Event loop implementation.
Definition: proxy-io.h:169
const char * m_exe_name
Process name included in thread names so combined debug output from multiple processes is easier to u...
Definition: proxy-io.h:224
kj::AsyncIoContext m_io_context
Capnp IO context.
Definition: proxy-io.h:255
void startAsyncThread() MP_REQUIRES(m_mutex)
Start asynchronous worker thread if necessary.
Definition: proxy.cpp:284
void sync(Callable &&callable)
Wrapper around EventLoop::post that takes advantage of the fact that callable will not go out of scop...
Definition: proxy-io.h:188
std::condition_variable m_cv
Definition: proxy-io.h:252
EventLoop(const char *exe_name, LogFn log_fn, void *context=nullptr)
Construct event loop object.
Definition: proxy.cpp:194
void addAsyncCleanup(std::function< void()> fn)
Register cleanup function to run on asynchronous worker thread without blocking the event loop thread...
Definition: proxy.cpp:171
Logger log()
Definition: proxy-io.h:213
std::list< Connection > m_incoming_connections
List of connections.
Definition: proxy-io.h:264
void loop()
Run event loop.
Definition: proxy.cpp:222
Mutex m_mutex
Mutex and condition variable used to post tasks to event loop and async thread.
Definition: proxy-io.h:251
bool done() const MP_REQUIRES(m_mutex)
Check if loop should exit.
Definition: proxy.cpp:310
Logger raise()
Definition: proxy-io.h:220
int m_post_fd
Pipe write handle used to wake up the event loop thread.
Definition: proxy-io.h:243
int m_num_clients MP_GUARDED_BY(m_mutex)=0
Number of clients holding references to ProxyServerBase objects that reference this event loop.
LogOptions m_log_opts
Logging options.
Definition: proxy-io.h:267
std::unique_ptr< kj::TaskSet > m_task_set
Capnp list of pending promises.
Definition: proxy-io.h:261
void * m_context
External context pointer.
Definition: proxy-io.h:270
int m_wait_fd
Pipe read handle used to wake up the event loop thread.
Definition: proxy-io.h:240
Logger logPlain()
Definition: proxy-io.h:219
std::optional< CleanupList > m_async_fns MP_GUARDED_BY(m_mutex)
Callback functions to run on async thread.
LoggingErrorHandler m_error_handler
Capnp error handler. Needs to outlive m_task_set.
Definition: proxy-io.h:258
std::thread m_async_thread
Handle of an async worker thread.
Definition: proxy-io.h:231
kj::Function< void()> *m_post_fn MP_GUARDED_BY(m_mutex)
Callback function to run on event loop thread during post() or sync() call.
std::thread::id m_thread_id
ID of the event loop thread.
Definition: proxy-io.h:227
void post(kj::Function< void()> fn)
Run function on event loop thread.
Definition: proxy.cpp:266
Event loop smart pointer automatically managing m_num_clients.
Definition: proxy.h:51
Definition: util.h:170
std::unique_lock< std::mutex > m_lock
Definition: util.h:182
friend Logger & operator<<(Logger &logger, T &&value)
Definition: proxy-io.h:114
Logger(bool raise, LogFn &fn)
Definition: proxy-io.h:106
Logger(Logger &&logger)
Definition: proxy-io.h:107
~Logger() noexcept(false)
Definition: proxy-io.h:108
LogFn & m_fn
Definition: proxy-io.h:127
bool m_raise
Definition: proxy-io.h:126
friend Logger & operator<<(Logger &&logger, T &&value)
Definition: proxy-io.h:121
std::ostringstream m_buffer
Definition: proxy-io.h:128
Handler for kj::TaskSet failed task events.
Definition: proxy-io.h:94
EventLoop & m_loop
Definition: proxy-io.h:98
LoggingErrorHandler(EventLoop &loop)
Definition: proxy-io.h:96
void taskFailed(kj::Exception &&exception) override
Definition: proxy.cpp:42
Base class for generated ProxyClient classes that implement a C++ interface and forward calls to a ca...
Definition: proxy.h:81
Interface::Client m_client
Definition: proxy.h:132
ProxyContext m_context
Definition: proxy.h:133
~ProxyClientBase() noexcept
Definition: proxy-io.h:466
ProxyClientBase(typename Interface::Client client, Connection *connection, bool destroy_connection)
Construct libmultiprocess client object wrapping Cap'n Proto client object with a reference to the as...
Definition: proxy-io.h:406
Context m_context
Definition: protocol.cpp:101
#define MP_REQUIRES(x)
Definition: util.h:155
Functions to serialize / deserialize common bitcoin types.
Definition: common-types.h:57
void Unlock(Lock &lock, Callback &&callback)
Definition: util.h:206
std::list< std::function< void()> > CleanupList
Definition: proxy.h:36
void ServeStream(EventLoop &loop, int fd, InitImpl &init)
Given stream file descriptor and an init object, handle requests on the stream by calling methods on ...
Definition: proxy-io.h:690
std::tuple< ConnThread, bool > SetThread(GuardedRef< ConnThreads > threads, Connection *connection, const std::function< Thread::Client()> &make_thread)
Definition: proxy.cpp:316
thread_local ThreadContext g_thread_context
Definition: proxy.cpp:40
std::function< void(bool raise, std::string message)> LogFn
Definition: proxy-io.h:101
void _Listen(EventLoop &loop, kj::Own< kj::ConnectionReceiver > &&listener, InitImpl &init)
Given connection receiver and an init object, handle incoming connections by calling _Serve,...
Definition: proxy-io.h:677
std::unique_ptr< ProxyClient< InitInterface > > ConnectStream(EventLoop &loop, int fd)
Given stream file descriptor, make a new ProxyClient object to send requests over the stream.
Definition: proxy-io.h:634
void ListenConnections(EventLoop &loop, int fd, InitImpl &init)
Given listening socket file descriptor and an init object, handle incoming connections and requests b...
Definition: proxy-io.h:699
std::string LongThreadName(const char *exe_name)
Definition: proxy.cpp:428
ConnThreads::iterator ConnThread
Definition: proxy-io.h:552
void _Serve(EventLoop &loop, kj::Own< kj::AsyncIoStream > &&stream, InitImpl &init)
Given stream and init objects, construct a new ProxyServer object that handles requests from the stre...
Definition: proxy-io.h:658
typename CleanupList::iterator CleanupIt
Definition: proxy.h:37
void CleanupRun(CleanupList &fns)
Definition: proxy.h:39
std::map< Connection *, std::optional< ProxyClient< Thread > > > ConnThreads
Map from Connection to local or remote thread handle which will be used over that connection.
Definition: proxy-io.h:551
ThreadContext & thread_context
Definition: proxy-io.h:36
ClientInvokeContext(Connection &conn, ThreadContext &thread_context)
Definition: proxy-io.h:37
Connection & connection
Definition: proxy-io.h:31
LogFn log_fn
External logging callback.
Definition: proxy-io.h:134
size_t max_chars
Maximum number of characters to use when representing request and response structs as strings.
Definition: proxy-io.h:138
ProxyClient(const ProxyClient &)=delete
std::optional< CleanupIt > m_disconnect_cb
Reference to callback function that is run if there is a sudden disconnect and the Connection object ...
Definition: proxy-io.h:78
Mapping from capnp interface type to proxy client implementation (specializations are generated by pr...
Definition: proxy.h:25
EventLoopRef loop
Definition: proxy.h:71
Connection * connection
Definition: proxy.h:70
CleanupList cleanup_fns
Definition: proxy.h:72
std::thread m_thread
Definition: proxy-io.h:89
ThreadContext & m_thread_context
Definition: proxy-io.h:88
Base class for generated ProxyServer classes that implement capnp server methods and forward calls to...
Definition: proxy.h:148
virtual ~ProxyServerBase()
ProxyServer destructor, called from the EventLoop thread by Cap'n Proto garbage collection code after...
Definition: proxy-io.h:491
std::shared_ptr< Impl > m_impl
Implementation pointer that may or may not be owned and deleted when this capnp server goes out of sc...
Definition: proxy.h:170
Mapping from capnp interface type to proxy server implementation (specializations are generated by pr...
Definition: proxy.h:28
CallContext_ CallContext
Definition: proxy-io.h:46
ServerInvokeContext(ProxyServer &proxy_server, CallContext &call_context, int req)
Definition: proxy-io.h:52
CallContext & call_context
Definition: proxy-io.h:49
ProxyServer & proxy_server
Definition: proxy-io.h:48
Vat id for server side of connection.
Definition: proxy-io.h:398
::capnp::word scratch[4]
Definition: proxy-io.h:399
::capnp::MallocMessageBuilder message
Definition: proxy-io.h:400
::capnp::rpc::twoparty::VatId::Builder vat_id
Definition: proxy-io.h:401
The thread_local ThreadContext g_thread_context struct provides information about individual threads ...
Definition: proxy-io.h:573
ConnThreads callback_threads MP_GUARDED_BY(waiter->m_mutex)
When client is making a request to a server, this is the callbackThread argument it passes in the req...
ConnThreads request_threads MP_GUARDED_BY(waiter->m_mutex)
When client is making a request to a server, this is the thread argument it passes in the request,...
std::string thread_name
Identifying string for debug.
Definition: proxy-io.h:575
Single element task queue used to handle recursive capnp calls.
Definition: proxy-io.h:280
void wait(Lock &lock, Predicate pred)
Definition: proxy-io.h:293
std::condition_variable m_cv
Definition: proxy-io.h:319
Mutex m_mutex
Mutex mainly used internally by waiter class, but also used externally to guard access to related sta...
Definition: proxy-io.h:318
void post(Fn &&fn)
Definition: proxy-io.h:284
std::optional< kj::Function< void()> > m_fn MP_GUARDED_BY(m_mutex)
Waiter()=default
assert(!tx.IsCoinBase())