Bitcoin Core 30.99.0
P2P Digital Currency
proxy-io.h
Go to the documentation of this file.
1// Copyright (c) The Bitcoin Core developers
2// Distributed under the MIT software license, see the accompanying
3// file COPYING or http://www.opensource.org/licenses/mit-license.php.
4
5#ifndef MP_PROXY_IO_H
6#define MP_PROXY_IO_H
7
8#include <mp/proxy.h>
9#include <mp/util.h>
10
11#include <mp/proxy.capnp.h>
12
13#include <capnp/rpc-twoparty.h>
14
15#include <assert.h>
16#include <condition_variable>
17#include <functional>
18#include <kj/function.h>
19#include <map>
20#include <memory>
21#include <optional>
22#include <sstream>
23#include <string>
24#include <thread>
25
26namespace mp {
27struct ThreadContext;
28
30{
32};
33
35{
39 {
40 }
41};
42
43template <typename ProxyServer, typename CallContext_>
45{
46 using CallContext = CallContext_;
47
50 int req;
56 Lock* cancel_lock{nullptr};
63 bool request_canceled{false};
64
67 {
68 }
69};
70
//! Convenience alias pairing the generated ProxyServer for a capnp Interface
//! with the capnp CallContext for a particular Params/Results pair, as passed
//! to generated server method handlers.
template <typename Interface, typename Params, typename Results>
using ServerContext = ServerInvokeContext<ProxyServer<Interface>, ::capnp::CallContext<Params, Results>>;
73
74template <>
75struct ProxyClient<Thread> : public ProxyClientBase<Thread, ::capnp::Void>
76{
78 // https://stackoverflow.com/questions/22357887/comparing-two-mapiterators-why-does-it-need-the-copy-constructor-of-stdpair
79 ProxyClient(const ProxyClient&) = delete;
81
91 std::optional<CleanupIt> m_disconnect_cb;
92};
93
94template <>
95struct ProxyServer<Thread> final : public Thread::Server
96{
97public:
98 ProxyServer(Connection& connection, ThreadContext& thread_context, std::thread&& thread);
100 kj::Promise<void> getName(GetNameContext context) override;
101
106 template<typename T, typename Fn>
107 kj::Promise<T> post(Fn&& fn);
108
111 std::thread m_thread;
114 kj::Promise<void> m_thread_ready{kj::READY_NOW};
115};
116
118class LoggingErrorHandler : public kj::TaskSet::ErrorHandler
119{
120public:
122 void taskFailed(kj::Exception&& exception) override;
124};
125
//! Log severity levels, ordered from least to most severe. Update the
//! KJ_STRINGIFY(Log) function below if entries are changed.
enum class Log {
    Trace = 0,
    Debug,
    Info,
    Warning,
    Error,
    //! Most severe level. The deprecated EventLoop logging-callback adapter
    //! maps this level to its boolean "raise" flag.
    Raise,
};
135
136kj::StringPtr KJ_STRINGIFY(Log flags);
137
139
141 std::string message;
142
145};
146
147using LogFn = std::function<void(LogMessage)>;
148
150
156 size_t max_chars{200};
157
161};
162
164{
165public:
    //! Construct a logger that buffers streamed output at the given severity
    //! level and flushes it to options.log_fn on destruction, if enabled.
    Logger(const LogOptions& options, Log log_level) : m_options(options), m_log_level(log_level) {}

    // Non-copyable and non-movable so the destructor flushes exactly once.
    Logger(Logger&&) = delete;
    Logger& operator=(Logger&&) = delete;
    Logger(const Logger&) = delete;
    Logger& operator=(const Logger&) = delete;

    //! Flush the buffered message to the logging callback. Deliberately
    //! declared noexcept(false) so the callback is permitted to throw.
    ~Logger() noexcept(false)
    {
        if (enabled()) m_options.log_fn({std::move(m_buffer).str(), m_log_level});
    }

    //! Stream a value into the log buffer; a no-op when logging is disabled.
    template <typename T>
    friend Logger& operator<<(Logger& logger, T&& value)
    {
        if (logger.enabled()) logger.m_buffer << std::forward<T>(value);
        return logger;
    }

    //! Rvalue overload so temporary Logger objects (e.g. created inside the
    //! MP_LOG macros) can be streamed into.
    template <typename T>
    friend Logger& operator<<(Logger&& logger, T&& value)
    {
        return logger << std::forward<T>(value);
    }

    //! True when messages at this logger's level will actually be reported.
    explicit operator bool() const
    {
        return enabled();
    }
195
196private:
197 bool enabled() const
198 {
200 }
201
204 std::ostringstream m_buffer;
205};
206
//! Construct a scoped mp::Logger from the event loop's logging options at the
//! given level, and stream into it only when that level is enabled. The
//! if-with-initializer keeps the Logger alive for just this statement, so its
//! destructor flushes the message immediately afterwards.
#define MP_LOGPLAIN(loop, ...) if (mp::Logger logger{(loop).m_log_opts, __VA_ARGS__}; logger) logger

//! Like MP_LOGPLAIN, but prefixes the message with the long thread name
//! (derived from the event loop's executable name) in braces.
#define MP_LOG(loop, ...) MP_LOGPLAIN(loop, __VA_ARGS__) << "{" << LongThreadName((loop).m_exe_name) << "} "
210
211std::string LongThreadName(const char* exe_name);
212
239{
240public:
242 EventLoop(const char* exe_name, LogFn log_fn, void* context = nullptr)
243 : EventLoop(exe_name, LogOptions{std::move(log_fn)}, context){}
244
246 EventLoop(const char* exe_name, LogOptions log_opts, void* context = nullptr);
247
249 EventLoop(const char* exe_name, std::function<void(bool, std::string)> old_callback, void* context = nullptr)
250 : EventLoop(exe_name,
251 LogFn{[old_callback = std::move(old_callback)](LogMessage log_data) {old_callback(log_data.level == Log::Raise, std::move(log_data.message));}},
252 context){}
253
254 ~EventLoop();
255
259 void loop();
260
263 void post(kj::Function<void()> fn);
264
268 template <typename Callable>
269 void sync(Callable&& callable)
270 {
271 post(std::forward<Callable>(callable));
272 }
273
276 void addAsyncCleanup(std::function<void()> fn);
277
289 void startAsyncThread() MP_REQUIRES(m_mutex);
290
292 bool done() const MP_REQUIRES(m_mutex);
293
296 const char* m_exe_name;
297
299 std::thread::id m_thread_id = std::this_thread::get_id();
300
303 std::thread m_async_thread;
304
306 kj::Function<void()>* m_post_fn MP_GUARDED_BY(m_mutex) = nullptr;
307
309 std::optional<CleanupList> m_async_fns MP_GUARDED_BY(m_mutex);
310
312 int m_wait_fd = -1;
313
315 int m_post_fd = -1;
316
319 int m_num_clients MP_GUARDED_BY(m_mutex) = 0;
320
323 Mutex m_mutex;
324 std::condition_variable m_cv;
325
327 kj::AsyncIoContext m_io_context;
328
330 LoggingErrorHandler m_error_handler{*this};
331
333 std::unique_ptr<kj::TaskSet> m_task_set;
334
336 std::list<Connection> m_incoming_connections;
337
340
343};
344
356struct Waiter
357{
358 Waiter() = default;
359
360 template <typename Fn>
361 bool post(Fn&& fn)
362 {
363 const Lock lock(m_mutex);
364 if (m_fn) return false;
365 m_fn = std::forward<Fn>(fn);
366 m_cv.notify_all();
367 return true;
368 }
369
    //! Block the calling thread until pred() returns true, executing any
    //! function handed over via post() while waiting. lock must already hold
    //! m_mutex; it is released while a posted function runs (via Unlock) and
    //! while the condition variable sleeps.
    template <class Predicate>
    void wait(Lock& lock, Predicate pred)
    {
        m_cv.wait(lock.m_lock, [&]() MP_REQUIRES(m_mutex) {
            // Important for this to be "while (m_fn)", not "if (m_fn)" to avoid
            // a lost-wakeup bug. A new m_fn and m_cv notification might be sent
            // after the fn() call and before the lock.lock() call in this loop
            // in the case where a capnp response is sent and a brand new
            // request is immediately received.
            while (m_fn) {
                auto fn = std::move(*m_fn);
                m_fn.reset();
                Unlock(lock, fn);
            }
            const bool done = pred();
            return done;
        });
    }
388
397 std::condition_variable m_cv;
398 std::optional<kj::Function<void()>> m_fn MP_GUARDED_BY(m_mutex);
399};
400
407{
408public:
409 Connection(EventLoop& loop, kj::Own<kj::AsyncIoStream>&& stream_)
410 : m_loop(loop), m_stream(kj::mv(stream_)),
411 m_network(*m_stream, ::capnp::rpc::twoparty::Side::CLIENT, ::capnp::ReaderOptions()),
412 m_rpc_system(::capnp::makeRpcClient(m_network)) {}
414 kj::Own<kj::AsyncIoStream>&& stream_,
415 const std::function<::capnp::Capability::Client(Connection&)>& make_client)
416 : m_loop(loop), m_stream(kj::mv(stream_)),
417 m_network(*m_stream, ::capnp::rpc::twoparty::Side::SERVER, ::capnp::ReaderOptions()),
418 m_rpc_system(::capnp::makeRpcServer(m_network, make_client(*this))) {}
419
425 ~Connection();
426
430 CleanupIt addSyncCleanup(std::function<void()> fn);
431 void removeSyncCleanup(CleanupIt it);
432
    //! Register handler f to run when the capnp transport reports a
    //! disconnect. f typically deletes this Connection object, which is why it
    //! is re-queued on the EventLoop task set instead of being run directly
    //! from the disconnect promise.
    template <typename F>
    void onDisconnect(F&& f)
    {
        // Add disconnect handler to local TaskSet to ensure it is canceled and
        // will never run after connection object is destroyed. But when disconnect
        // handler fires, do not call the function f right away, instead add it
        // to the EventLoop TaskSet to avoid "Promise callback destroyed itself"
        // error in the typical case where f deletes this Connection object.
        m_on_disconnect.add(m_network.onDisconnect().then(
            [f = std::forward<F>(f), this]() mutable { m_loop->m_task_set->add(kj::evalLater(kj::mv(f))); }));
    }
445
447 kj::Own<kj::AsyncIoStream> m_stream;
448 LoggingErrorHandler m_error_handler{*m_loop};
452 kj::TaskSet m_on_disconnect{m_error_handler};
453 ::capnp::TwoPartyVatNetwork m_network;
454 std::optional<::capnp::RpcSystem<::capnp::rpc::twoparty::VatId>> m_rpc_system;
455
456 // ThreadMap interface client, used to create a remote server thread when an
457 // client IPC call is being made for the first time from a new thread.
458 ThreadMap::Client m_thread_map{nullptr};
459
462 ::capnp::CapabilityServerSet<Thread> m_threads;
463
467 kj::Canceler m_canceler;
468
473};
474
484{
485 ::capnp::word scratch[4]{};
486 ::capnp::MallocMessageBuilder message{scratch};
487 ::capnp::rpc::twoparty::VatId::Builder vat_id{message.getRoot<::capnp::rpc::twoparty::VatId>()};
488 ServerVatId() { vat_id.setSide(::capnp::rpc::twoparty::Side::SERVER); }
489};
490
491template <typename Interface, typename Impl>
493 Connection* connection,
494 bool destroy_connection)
495 : m_client(std::move(client)), m_context(connection)
496
497{
498 // Handler for the connection getting destroyed before this client object.
499 auto disconnect_cb = m_context.connection->addSyncCleanup([this]() {
500 // Release client capability by move-assigning to temporary.
501 {
502 typename Interface::Client(std::move(m_client));
503 }
504 Lock lock{m_context.loop->m_mutex};
505 m_context.connection = nullptr;
506 });
507
508 // Two shutdown sequences are supported:
509 //
510 // - A normal sequence where client proxy objects are deleted by external
511 // code that no longer needs them
512 //
513 // - A garbage collection sequence where the connection or event loop shuts
514 // down while external code is still holding client references.
515 //
516 // The first case is handled here when m_context.connection is not null. The
517 // second case is handled by the disconnect_cb function, which sets
518 // m_context.connection to null so nothing happens here.
519 m_context.cleanup_fns.emplace_front([this, destroy_connection, disconnect_cb]{
520 {
521 // If the capnp interface defines a destroy method, call it to destroy
522 // the remote object, waiting for it to be deleted server side. If the
523 // capnp interface does not define a destroy method, this will just call
524 // an empty stub defined in the ProxyClientBase class and do nothing.
525 Sub::destroy(*this);
526
527 // FIXME: Could just invoke removed addCleanup fn here instead of duplicating code
528 m_context.loop->sync([&]() {
529 // Remove disconnect callback on cleanup so it doesn't run and try
530 // to access this object after it's destroyed. This call needs to
531 // run inside loop->sync() on the event loop thread because
532 // otherwise, if there were an ill-timed disconnect, the
533 // onDisconnect handler could fire and delete the Connection object
534 // before the removeSyncCleanup call.
536
537 // Release client capability by move-assigning to temporary.
538 {
539 typename Interface::Client(std::move(m_client));
540 }
541 if (destroy_connection) {
542 delete m_context.connection;
543 m_context.connection = nullptr;
544 }
545 });
546 }
547 });
548 Sub::construct(*this);
549}
550
551template <typename Interface, typename Impl>
553{
554 CleanupRun(m_context.cleanup_fns);
555}
556
//! Construct a server object forwarding calls to the given implementation
//! object over the given connection. impl must be non-null; callers that do
//! not want the server to own the implementation can pass a shared_ptr with a
//! no-op deleter (as _Serve below does).
template <typename Interface, typename Impl>
ProxyServerBase<Interface, Impl>::ProxyServerBase(std::shared_ptr<Impl> impl, Connection& connection)
    : m_impl(std::move(impl)), m_context(&connection)
{
    assert(m_impl);
}
563
576template <typename Interface, typename Impl>
578{
579 if (m_impl) {
580 // If impl is non-null at this point, it means no client is waiting for
581 // the m_impl server object to be destroyed synchronously. This can
582 // happen either if the interface did not define a "destroy" method (see
583 // invokeDestroy method below), or if a destroy method was defined, but
584 // the connection was broken before it could be called.
585 //
586 // In either case, be conservative and run the cleanup on an
587 // asynchronous thread, to avoid destructors or cleanup functions
588 // blocking or deadlocking the current EventLoop thread, since they
589 // could be making IPC calls.
590 //
591 // Technically this is a little too conservative since if the interface
592 // defines a "destroy" method, but the destroy method does not accept a
593 // Context parameter specifying a worker thread, the cleanup method
594 // would run on the EventLoop thread normally (when connection is
595 // unbroken), but will not run on the EventLoop thread now (when
596 // connection is broken). Probably some refactoring of the destructor
597 // and invokeDestroy function is possible to make this cleaner and more
598 // consistent.
599 m_context.loop->addAsyncCleanup([impl=std::move(m_impl), fns=std::move(m_context.cleanup_fns)]() mutable {
600 impl.reset();
601 CleanupRun(fns);
602 });
603 }
604 assert(m_context.cleanup_fns.empty());
605}
606
624template <typename Interface, typename Impl>
626{
627 m_impl.reset();
628 CleanupRun(m_context.cleanup_fns);
629}
630
//! Map from Connection to the local or remote thread handle (if any) that
//! will be used for calls made over that connection.
using ConnThreads = std::map<Connection*, std::optional<ProxyClient<Thread>>>;
using ConnThread = ConnThreads::iterator;

// Retrieve ProxyClient<Thread> object associated with this connection from a
// map, or create a new one and insert it into the map. Return map iterator and
// inserted bool.
std::tuple<ConnThread, bool> SetThread(GuardedRef<ConnThreads> threads, Connection* connection, const std::function<Thread::Client()>& make_thread);
644
659{
661 std::string thread_name;
662
678 std::unique_ptr<Waiter> waiter = nullptr;
679
697 ConnThreads callback_threads MP_GUARDED_BY(waiter->m_mutex);
698
708 ConnThreads request_threads MP_GUARDED_BY(waiter->m_mutex);
709
713 bool loop_thread = false;
714};
715
//! Schedule fn to run on this server's dedicated worker thread, returning a
//! promise for its result. Calls are serialized: m_thread_ready is re-chained
//! so a subsequent post() only hands work to the Waiter after this call has
//! started executing. fn is invoked as fn(CancelMonitor&) so it can observe
//! cancellation of the returned promise (via the attached CancelProbe).
template<typename T, typename Fn>
kj::Promise<T> ProxyServer<Thread>::post(Fn&& fn)
{
    auto ready = kj::newPromiseAndFulfiller<void>(); // Signaled when waiter is ready to post again.
    auto cancel_monitor_ptr = kj::heap<CancelMonitor>();
    CancelMonitor& cancel_monitor = *cancel_monitor_ptr;
    // Keep a reference to the ProxyServer<Thread> instance by assigning it to
    // the self variable. ProxyServer instances are reference-counted and if the
    // client drops its reference, this variable keeps the instance alive until
    // the thread finishes executing. The self variable needs to be destroyed on
    // the event loop thread so it is freed in a sync() call below.
    auto self = thisCap();
    auto ret = m_thread_ready.then([this, self = std::move(self), fn = std::forward<Fn>(fn), ready_fulfiller = kj::mv(ready.fulfiller), cancel_monitor_ptr = kj::mv(cancel_monitor_ptr)]() mutable {
        auto result = kj::newPromiseAndFulfiller<T>(); // Signaled when fn() is called, with its return value.
        bool posted = m_thread_context.waiter->post([this, self = std::move(self), fn = std::forward<Fn>(fn), ready_fulfiller = kj::mv(ready_fulfiller), result_fulfiller = kj::mv(result.fulfiller), cancel_monitor_ptr = kj::mv(cancel_monitor_ptr)]() mutable {
            // Fulfill ready.promise now, as soon as the Waiter starts executing
            // this lambda, so the next ProxyServer<Thread>::post() call can
            // immediately call waiter->post(). It is important to do this
            // before calling fn() because fn() can make an IPC call back to the
            // client, which can make another IPC call to this server thread.
            // (This typically happens when IPC methods take std::function
            // parameters.) When this happens the second call to the server
            // thread should not be blocked waiting for the first call.
            m_loop->sync([ready_fulfiller = kj::mv(ready_fulfiller)]() mutable {
                ready_fulfiller->fulfill();
                ready_fulfiller = nullptr;
            });
            std::optional<T> result_value;
            kj::Maybe<kj::Exception> exception{kj::runCatchingExceptions([&]{ result_value.emplace(fn(*cancel_monitor_ptr)); })};
            m_loop->sync([this, &result_value, &exception, self = kj::mv(self), result_fulfiller = kj::mv(result_fulfiller), cancel_monitor_ptr = kj::mv(cancel_monitor_ptr)]() mutable {
                // Destroy CancelMonitor here before fulfilling or rejecting the
                // promise so it doesn't get triggered when the promise is
                // destroyed.
                cancel_monitor_ptr = nullptr;
                // Send results to the fulfiller. Technically it would be ok to
                // skip this if promise was canceled, but it's simpler to just
                // do it unconditionally.
                KJ_IF_MAYBE(e, exception) {
                    assert(!result_value);
                    result_fulfiller->reject(kj::mv(*e));
                } else {
                    assert(result_value);
                    result_fulfiller->fulfill(kj::mv(*result_value));
                    result_value.reset();
                }
                result_fulfiller = nullptr;
                // Use evalLater to destroy the ProxyServer<Thread> self
                // reference, if it is the last reference, because the
                // ProxyServer<Thread> destructor needs to join the thread,
                // which can't happen until this sync() block has exited.
                m_loop->m_task_set->add(kj::evalLater([self = kj::mv(self)] {}));
            });
        });
        // Assert that calling Waiter::post did not fail. It could only return
        // false if a new function was posted before the previous one finished
        // executing, but new functions are only posted when m_thread_ready is
        // signaled, so this should never happen.
        assert(posted);
        return kj::mv(result.promise);
    }).attach(kj::heap<CancelProbe>(cancel_monitor));
    m_thread_ready = kj::mv(ready.promise);
    return ret;
}
779
//! Given a connected stream file descriptor (whose ownership is taken), build
//! a client Connection on the event loop thread, bootstrap the remote
//! InitInterface capability, and return a ProxyClient that owns the connection
//! (destroy_connection=true). On an unexpected disconnect the connection logs
//! a warning and deletes itself.
template <typename InitInterface>
std::unique_ptr<ProxyClient<InitInterface>> ConnectStream(EventLoop& loop, int fd)
{
    typename InitInterface::Client init_client(nullptr);
    std::unique_ptr<Connection> connection;
    // Connection setup must run on the event loop thread, hence the sync().
    loop.sync([&] {
        auto stream =
            loop.m_io_context.lowLevelProvider->wrapSocketFd(fd, kj::LowLevelAsyncIoProvider::TAKE_OWNERSHIP);
        connection = std::make_unique<Connection>(loop, kj::mv(stream));
        init_client = connection->m_rpc_system->bootstrap(ServerVatId().vat_id).castAs<InitInterface>();
        // Capture a raw pointer: after the sync() block the unique_ptr below
        // releases ownership to the ProxyClient, so the handler deletes the
        // connection itself on disconnect.
        Connection* connection_ptr = connection.get();
        connection->onDisconnect([&loop, connection_ptr] {
            MP_LOG(loop, Log::Warning) << "IPC client: unexpected network disconnect.";
            delete connection_ptr;
        });
    });
    return std::make_unique<ProxyClient<InitInterface>>(
        kj::mv(init_client), connection.release(), /* destroy_connection= */ true);
}
802
//! Serve requests arriving on an established stream by exposing the init
//! object over a new server-side Connection stored in the event loop's
//! m_incoming_connections list. The connection removes itself from that list
//! when the socket disconnects.
template <typename InitInterface, typename InitImpl>
void _Serve(EventLoop& loop, kj::Own<kj::AsyncIoStream>&& stream, InitImpl& init)
{
    loop.m_incoming_connections.emplace_front(loop, kj::mv(stream), [&](Connection& connection) {
        // Disable deleter so proxy server object doesn't attempt to delete the
        // init implementation when the proxy client is destroyed or
        // disconnected.
        return kj::heap<ProxyServer<InitInterface>>(std::shared_ptr<InitImpl>(&init, [](InitImpl*){}), connection);
    });
    // emplace_front means begin() is the connection just added; std::list
    // iterators stay valid across other insertions/removals, so the handler
    // can safely erase by iterator later.
    auto it = loop.m_incoming_connections.begin();
    it->onDisconnect([&loop, it] {
        MP_LOG(loop, Log::Info) << "IPC server: socket disconnected.";
        loop.m_incoming_connections.erase(it);
    });
}
822
//! Accept one connection from the listener, serve it via _Serve, then re-arm
//! by calling itself recursively with the (moved) listener. The listener is
//! kept alive by the capture in the pending accept continuation, which is
//! owned by the event loop's task set.
template <typename InitInterface, typename InitImpl>
void _Listen(EventLoop& loop, kj::Own<kj::ConnectionReceiver>&& listener, InitImpl& init)
{
    // Take a raw pointer before kj::mv(listener) so accept() is called on the
    // still-valid object while ownership moves into the lambda capture.
    auto* ptr = listener.get();
    loop.m_task_set->add(ptr->accept().then(
        [&loop, &init, listener = kj::mv(listener)](kj::Own<kj::AsyncIoStream>&& stream) mutable {
            _Serve<InitInterface>(loop, kj::mv(stream), init);
            _Listen<InitInterface>(loop, kj::mv(listener), init);
        }));
}
836
//! Given a stream file descriptor (whose ownership is taken) and an init
//! object, handle requests on the stream by calling methods on the init
//! object. Thin wrapper that wraps the fd in a capnp stream and forwards to
//! _Serve.
template <typename InitInterface, typename InitImpl>
void ServeStream(EventLoop& loop, int fd, InitImpl& init)
{
    _Serve<InitInterface>(
        loop, loop.m_io_context.lowLevelProvider->wrapSocketFd(fd, kj::LowLevelAsyncIoProvider::TAKE_OWNERSHIP), init);
}
845
848template <typename InitInterface, typename InitImpl>
850{
851 loop.sync([&]() {
852 _Listen<InitInterface>(loop,
853 loop.m_io_context.lowLevelProvider->wrapListenSocketFd(fd, kj::LowLevelAsyncIoProvider::TAKE_OWNERSHIP),
854 init);
855 });
856}
857
858extern thread_local ThreadContext g_thread_context; // NOLINT(bitcoin-nontrivial-threadlocal)
859// Silence nonstandard bitcoin tidy error "Variable with non-trivial destructor
860// cannot be thread_local" which should not be a problem on modern platforms, and
861// could lead to a small memory leak at worst on older ones.
862
863} // namespace mp
864
865#endif // MP_PROXY_IO_H
int ret
if(!SetupNetworking())
int flags
Definition: bitcoin-tx.cpp:529
Helper class that detects when a promise is canceled.
Definition: util.h:293
Object holding network & rpc state associated with either an incoming server connection,...
Definition: proxy-io.h:407
CleanupIt addSyncCleanup(std::function< void()> fn)
Register synchronous cleanup function to run on event loop thread (with access to capnp thread local ...
Definition: proxy.cpp:153
EventLoopRef m_loop
Definition: proxy-io.h:446
::capnp::TwoPartyVatNetwork m_network
Definition: proxy-io.h:453
kj::Own< kj::AsyncIoStream > m_stream
Definition: proxy-io.h:447
Connection(EventLoop &loop, kj::Own< kj::AsyncIoStream > &&stream_, const std::function<::capnp::Capability::Client(Connection &)> &make_client)
Definition: proxy-io.h:413
Connection(EventLoop &loop, kj::Own< kj::AsyncIoStream > &&stream_)
Definition: proxy-io.h:409
void onDisconnect(F &&f)
Add disconnect handler.
Definition: proxy-io.h:435
kj::Canceler m_canceler
Canceler for canceling promises that we want to discard when the connection is destroyed.
Definition: proxy-io.h:467
::capnp::CapabilityServerSet< Thread > m_threads
Collection of server-side IPC worker threads (ProxyServer<Thread> objects previously returned by Thre...
Definition: proxy-io.h:462
CleanupList m_sync_cleanup_fns
Cleanup functions to run if connection is broken unexpectedly.
Definition: proxy-io.h:472
std::optional<::capnp::RpcSystem<::capnp::rpc::twoparty::VatId > > m_rpc_system
Definition: proxy-io.h:454
void removeSyncCleanup(CleanupIt it)
Definition: proxy.cpp:167
Event loop implementation.
Definition: proxy-io.h:239
kj::AsyncIoContext m_io_context
Capnp IO context.
Definition: proxy-io.h:327
void sync(Callable &&callable)
Wrapper around EventLoop::post that takes advantage of the fact that callable will not go out of scop...
Definition: proxy-io.h:269
EventLoop(const char *exe_name, LogFn log_fn, void *context=nullptr)
Construct event loop object with default logging options.
Definition: proxy-io.h:242
std::list< Connection > m_incoming_connections
List of connections.
Definition: proxy-io.h:336
Mutex m_mutex
Mutex and condition variable used to post tasks to event loop and async thread.
Definition: proxy-io.h:323
LogOptions m_log_opts
Logging options.
Definition: proxy-io.h:339
std::unique_ptr< kj::TaskSet > m_task_set
Capnp list of pending promises.
Definition: proxy-io.h:333
void * m_context
External context pointer.
Definition: proxy-io.h:342
EventLoop(const char *exe_name, std::function< void(bool, std::string)> old_callback, void *context=nullptr)
Backwards-compatible constructor for previous (deprecated) logging callback signature.
Definition: proxy-io.h:249
Event loop smart pointer automatically managing m_num_clients.
Definition: proxy.h:51
Definition: util.h:171
std::unique_lock< std::mutex > m_lock
Definition: util.h:183
friend Logger & operator<<(Logger &logger, T &&value)
Definition: proxy-io.h:179
Logger(const Logger &)=delete
bool enabled() const
Definition: proxy-io.h:197
~Logger() noexcept(false)
Definition: proxy-io.h:173
Logger & operator=(Logger &&)=delete
Log m_log_level
Definition: proxy-io.h:203
Logger(Logger &&)=delete
Logger(const LogOptions &options, Log log_level)
Definition: proxy-io.h:166
const LogOptions & m_options
Definition: proxy-io.h:202
friend Logger & operator<<(Logger &&logger, T &&value)
Definition: proxy-io.h:186
std::ostringstream m_buffer
Definition: proxy-io.h:204
Logger & operator=(const Logger &)=delete
Handler for kj::TaskSet failed task events.
Definition: proxy-io.h:119
EventLoop & m_loop
Definition: proxy-io.h:123
LoggingErrorHandler(EventLoop &loop)
Definition: proxy-io.h:121
void taskFailed(kj::Exception &&exception) override
Definition: proxy.cpp:43
Base class for generated ProxyClient classes that implement a C++ interface and forward calls to a ca...
Definition: proxy.h:81
Interface::Client m_client
Definition: proxy.h:132
ProxyContext m_context
Definition: proxy.h:133
~ProxyClientBase() noexcept
Definition: proxy-io.h:552
ProxyClientBase(typename Interface::Client client, Connection *connection, bool destroy_connection)
Construct libmultiprocess client object wrapping Cap'n Proto client object with a reference to the as...
Definition: proxy-io.h:492
std::optional< mp::EventLoop > m_loop
EventLoop object which manages I/O events for all connections.
Definition: protocol.cpp:144
Context m_context
Definition: protocol.cpp:141
#define MP_GUARDED_BY(x)
Definition: util.h:160
#define MP_REQUIRES(x)
Definition: util.h:156
Functions to serialize / deserialize common bitcoin types.
Definition: common-types.h:57
void Unlock(Lock &lock, Callback &&callback)
Definition: util.h:207
kj::StringPtr KJ_STRINGIFY(Log flags)
Definition: proxy.cpp:438
std::list< std::function< void()> > CleanupList
Definition: proxy.h:36
void ServeStream(EventLoop &loop, int fd, InitImpl &init)
Given stream file descriptor and an init object, handle requests on the stream by calling methods on ...
Definition: proxy-io.h:840
std::function< void(LogMessage)> LogFn
Definition: proxy-io.h:147
std::tuple< ConnThread, bool > SetThread(GuardedRef< ConnThreads > threads, Connection *connection, const std::function< Thread::Client()> &make_thread)
Definition: proxy.cpp:321
thread_local ThreadContext g_thread_context
Definition: proxy.cpp:41
void _Listen(EventLoop &loop, kj::Own< kj::ConnectionReceiver > &&listener, InitImpl &init)
Given connection receiver and an init object, handle incoming connections by calling _Serve,...
Definition: proxy-io.h:827
std::unique_ptr< ProxyClient< InitInterface > > ConnectStream(EventLoop &loop, int fd)
Given stream file descriptor, make a new ProxyClient object to send requests over the stream.
Definition: proxy-io.h:784
Log
Log flags. Update stringify function if changed!
Definition: proxy-io.h:127
void ListenConnections(EventLoop &loop, int fd, InitImpl &init)
Given listening socket file descriptor and an init object, handle incoming connections and requests b...
Definition: proxy-io.h:849
std::string LongThreadName(const char *exe_name)
Definition: proxy.cpp:433
ConnThreads::iterator ConnThread
Definition: proxy-io.h:638
void _Serve(EventLoop &loop, kj::Own< kj::AsyncIoStream > &&stream, InitImpl &init)
Given stream and init objects, construct a new ProxyServer object that handles requests from the stre...
Definition: proxy-io.h:808
typename CleanupList::iterator CleanupIt
Definition: proxy.h:37
void CleanupRun(CleanupList &fns)
Definition: proxy.h:39
std::map< Connection *, std::optional< ProxyClient< Thread > > > ConnThreads
Map from Connection to local or remote thread handle which will be used over that connection.
Definition: proxy-io.h:637
Definition: common.h:29
#define MP_LOG(loop,...)
Definition: proxy-io.h:209
ThreadContext & thread_context
Definition: proxy-io.h:36
ClientInvokeContext(Connection &conn, ThreadContext &thread_context)
Definition: proxy-io.h:37
Connection & connection
Definition: proxy-io.h:31
Log level
The severity level of this message.
Definition: proxy-io.h:144
std::string message
Message to be logged.
Definition: proxy-io.h:141
LogFn log_fn
External logging callback.
Definition: proxy-io.h:152
size_t max_chars
Maximum number of characters to use when representing request and response structs as strings.
Definition: proxy-io.h:156
Log log_level
Messages with a severity level less than log_level will not be reported.
Definition: proxy-io.h:160
ProxyClient(const ProxyClient &)=delete
std::optional< CleanupIt > m_disconnect_cb
Reference to callback function that is run if there is a sudden disconnect and the Connection object ...
Definition: proxy-io.h:91
Mapping from capnp interface type to proxy client implementation (specializations are generated by pr...
Definition: proxy.h:25
EventLoopRef loop
Definition: proxy.h:71
Connection * connection
Definition: proxy.h:70
CleanupList cleanup_fns
Definition: proxy.h:72
ThreadContext & m_thread_context
Definition: proxy-io.h:110
Base class for generated ProxyServer classes that implement capnp server methods and forward calls to...
Definition: proxy.h:148
virtual ~ProxyServerBase()
ProxyServer destructor, called from the EventLoop thread by Cap'n Proto garbage collection code after...
Definition: proxy-io.h:577
std::shared_ptr< Impl > m_impl
Implementation pointer that may or may not be owned and deleted when this capnp server goes out of sc...
Definition: proxy.h:171
Mapping from capnp interface type to proxy server implementation (specializations are generated by pr...
Definition: proxy.h:28
CallContext_ CallContext
Definition: proxy-io.h:46
ServerInvokeContext(ProxyServer &proxy_server, CallContext &call_context, int req)
Definition: proxy-io.h:65
CallContext & call_context
Definition: proxy-io.h:49
ProxyServer & proxy_server
Definition: proxy-io.h:48
bool request_canceled
For IPC methods that execute asynchronously, not on the event-loop thread, this is set to true if the...
Definition: proxy-io.h:63
Lock * cancel_lock
For IPC methods that execute asynchronously, not on the event-loop thread: lock preventing the event-...
Definition: proxy-io.h:56
Vat id for server side of connection.
Definition: proxy-io.h:484
The thread_local ThreadContext g_thread_context struct provides information about individual threads ...
Definition: proxy-io.h:659
ConnThreads callback_threads MP_GUARDED_BY(waiter->m_mutex)
When client is making a request to a server, this is the callbackThread argument it passes in the req...
ConnThreads request_threads MP_GUARDED_BY(waiter->m_mutex)
When client is making a request to a server, this is the thread argument it passes in the request,...
std::string thread_name
Identifying string for debug.
Definition: proxy-io.h:661
Single element task queue used to handle recursive capnp calls.
Definition: proxy-io.h:357
void wait(Lock &lock, Predicate pred)
Definition: proxy-io.h:371
std::condition_variable m_cv
Definition: proxy-io.h:397
Mutex m_mutex
Mutex mainly used internally by waiter class, but also used externally to guard access to related sta...
Definition: proxy-io.h:396
std::optional< kj::Function< void()> > m_fn MP_GUARDED_BY(m_mutex)
bool post(Fn &&fn)
Definition: proxy-io.h:361
Waiter()=default
assert(!tx.IsCoinBase())