Bitcoin Core 31.99.0
P2P Digital Currency
proxy-io.h
Go to the documentation of this file.
1// Copyright (c) The Bitcoin Core developers
2// Distributed under the MIT software license, see the accompanying
3// file COPYING or http://www.opensource.org/licenses/mit-license.php.
4
5#ifndef MP_PROXY_IO_H
6#define MP_PROXY_IO_H
7
8#include <mp/proxy.h>
9#include <mp/util.h>
10
11#include <mp/proxy.capnp.h>
12
13#include <capnp/rpc-twoparty.h>
14
15#include <assert.h>
16#include <condition_variable>
17#include <functional>
18#include <kj/function.h>
19#include <map>
20#include <memory>
21#include <optional>
22#include <sstream>
23#include <string>
24#include <thread>
25
26namespace mp {
27struct ThreadContext;
28
30{
32};
33
35{
39 {
40 }
41};
42
43template <typename ProxyServer, typename CallContext_>
45{
46 using CallContext = CallContext_;
47
50 int req;
56 Lock* cancel_lock{nullptr};
63 bool request_canceled{false};
64
67 {
68 }
69};
70
71template <typename Interface, typename Params, typename Results>
72using ServerContext = ServerInvokeContext<ProxyServer<Interface>, ::capnp::CallContext<Params, Results>>;
73
74template <>
75struct ProxyClient<Thread> : public ProxyClientBase<Thread, ::capnp::Void>
76{
78 // https://stackoverflow.com/questions/22357887/comparing-two-mapiterators-why-does-it-need-the-copy-constructor-of-stdpair
79 ProxyClient(const ProxyClient&) = delete;
81
91 std::optional<CleanupIt> m_disconnect_cb;
92};
93
94template <>
95struct ProxyServer<Thread> final : public Thread::Server
96{
97public:
98 ProxyServer(Connection& connection, ThreadContext& thread_context, std::thread&& thread);
100 kj::Promise<void> getName(GetNameContext context) override;
101
106 template<typename T, typename Fn>
107 kj::Promise<T> post(Fn&& fn);
108
111 std::thread m_thread;
114 kj::Promise<void> m_thread_ready{kj::READY_NOW};
115};
116
118class LoggingErrorHandler : public kj::TaskSet::ErrorHandler
119{
120public:
122 void taskFailed(kj::Exception&& exception) override;
124};
125
127enum class Log {
128 Trace = 0,
129 Debug,
130 Info,
131 Warning,
132 Error,
133 Raise,
134};
135
136kj::StringPtr KJ_STRINGIFY(Log flags);
137
139
141 std::string message;
142
145};
146
147using LogFn = std::function<void(LogMessage)>;
148
150
156 size_t max_chars{200};
157
161};
162
164{
165public:
166 Logger(const LogOptions& options, Log log_level) : m_options(options), m_log_level(log_level) {}
167
168 Logger(Logger&&) = delete;
169 Logger& operator=(Logger&&) = delete;
170 Logger(const Logger&) = delete;
171 Logger& operator=(const Logger&) = delete;
172
173 ~Logger() noexcept(false)
174 {
175 if (enabled()) m_options.log_fn({std::move(m_buffer).str(), m_log_level});
176 }
177
178 template <typename T>
179 friend Logger& operator<<(Logger& logger, T&& value)
180 {
181 if (logger.enabled()) logger.m_buffer << std::forward<T>(value);
182 return logger;
183 }
184
185 template <typename T>
186 friend Logger& operator<<(Logger&& logger, T&& value)
187 {
188 return logger << std::forward<T>(value);
189 }
190
191 explicit operator bool() const
192 {
193 return enabled();
194 }
195
196private:
197 bool enabled() const
198 {
200 }
201
204 std::ostringstream m_buffer;
205};
206
207#define MP_LOGPLAIN(loop, ...) if (mp::Logger logger{(loop).m_log_opts, __VA_ARGS__}; logger) logger
208
209#define MP_LOG(loop, ...) MP_LOGPLAIN(loop, __VA_ARGS__) << "{" << LongThreadName((loop).m_exe_name) << "} "
210
211std::string LongThreadName(const char* exe_name);
212
239{
240public:
242 EventLoop(const char* exe_name, LogFn log_fn, void* context = nullptr)
243 : EventLoop(exe_name, LogOptions{std::move(log_fn)}, context){}
244
246 EventLoop(const char* exe_name, LogOptions log_opts, void* context = nullptr);
247
249 EventLoop(const char* exe_name, std::function<void(bool, std::string)> old_callback, void* context = nullptr)
250 : EventLoop(exe_name,
251 LogFn{[old_callback = std::move(old_callback)](LogMessage log_data) {old_callback(log_data.level == Log::Raise, std::move(log_data.message));}},
252 context){}
253
254 ~EventLoop();
255
259 void loop();
260
263 void post(kj::Function<void()> fn);
264
268 template <typename Callable>
269 void sync(Callable&& callable)
270 {
271 post(std::forward<Callable>(callable));
272 }
273
276 void addAsyncCleanup(std::function<void()> fn);
277
289 void startAsyncThread() MP_REQUIRES(m_mutex);
290
292 bool done() const MP_REQUIRES(m_mutex);
293
296 const char* m_exe_name;
297
299 std::thread::id m_thread_id = std::this_thread::get_id();
300
303 std::thread m_async_thread;
304
306 kj::Function<void()>* m_post_fn MP_GUARDED_BY(m_mutex) = nullptr;
307
309 std::optional<CleanupList> m_async_fns MP_GUARDED_BY(m_mutex);
310
312 int m_wait_fd = -1;
313
315 int m_post_fd = -1;
316
319 int m_num_clients MP_GUARDED_BY(m_mutex) = 0;
320
323 Mutex m_mutex;
324 std::condition_variable m_cv;
325
327 kj::AsyncIoContext m_io_context;
328
330 LoggingErrorHandler m_error_handler{*this};
331
333 std::unique_ptr<kj::TaskSet> m_task_set;
334
336 std::list<Connection> m_incoming_connections;
337
340
343
345 std::function<void()> testing_hook_makethread;
346
350 std::function<void()> testing_hook_makethread_created;
351
355 std::function<void()> testing_hook_async_request_start;
356
358 std::function<void()> testing_hook_async_request_done;
359};
360
372struct Waiter
373{
374 Waiter() = default;
375
376 template <typename Fn>
377 bool post(Fn&& fn)
378 {
379 const Lock lock(m_mutex);
380 if (m_fn) return false;
381 m_fn = std::forward<Fn>(fn);
382 m_cv.notify_all();
383 return true;
384 }
385
386 template <class Predicate>
387 void wait(Lock& lock, Predicate pred)
388 {
389 m_cv.wait(lock.m_lock, [&]() MP_REQUIRES(m_mutex) {
390 // Important for this to be "while (m_fn)", not "if (m_fn)" to avoid
391 // a lost-wakeup bug. A new m_fn and m_cv notification might be sent
392 // after the fn() call and before the lock.lock() call in this loop
393 // in the case where a capnp response is sent and a brand new
394 // request is immediately received.
395 while (m_fn) {
396 auto fn = std::move(*m_fn);
397 m_fn.reset();
398 Unlock(lock, fn);
399 }
400 const bool done = pred();
401 return done;
402 });
403 }
404
413 std::condition_variable m_cv;
414 std::optional<kj::Function<void()>> m_fn MP_GUARDED_BY(m_mutex);
415};
416
423{
424public:
425 Connection(EventLoop& loop, kj::Own<kj::AsyncIoStream>&& stream_)
426 : m_loop(loop), m_stream(kj::mv(stream_)),
427 m_network(*m_stream, ::capnp::rpc::twoparty::Side::CLIENT, ::capnp::ReaderOptions()),
428 m_rpc_system(::capnp::makeRpcClient(m_network)) {}
430 kj::Own<kj::AsyncIoStream>&& stream_,
431 const std::function<::capnp::Capability::Client(Connection&)>& make_client)
432 : m_loop(loop), m_stream(kj::mv(stream_)),
433 m_network(*m_stream, ::capnp::rpc::twoparty::Side::SERVER, ::capnp::ReaderOptions()),
434 m_rpc_system(::capnp::makeRpcServer(m_network, make_client(*this))) {}
435
441 ~Connection();
442
446 CleanupIt addSyncCleanup(std::function<void()> fn);
447 void removeSyncCleanup(CleanupIt it);
448
450 template <typename F>
451 void onDisconnect(F&& f)
452 {
453 // Add disconnect handler to local TaskSet to ensure it is canceled and
454 // will never run after connection object is destroyed. But when disconnect
455 // handler fires, do not call the function f right away, instead add it
456 // to the EventLoop TaskSet to avoid "Promise callback destroyed itself"
457 // error in the typical case where f deletes this Connection object.
458 m_on_disconnect.add(m_network.onDisconnect().then(
459 [f = std::forward<F>(f), this]() mutable { m_loop->m_task_set->add(kj::evalLater(kj::mv(f))); }));
460 }
461
463 kj::Own<kj::AsyncIoStream> m_stream;
464 LoggingErrorHandler m_error_handler{*m_loop};
468 kj::TaskSet m_on_disconnect{m_error_handler};
469 ::capnp::TwoPartyVatNetwork m_network;
470 std::optional<::capnp::RpcSystem<::capnp::rpc::twoparty::VatId>> m_rpc_system;
471
472 // ThreadMap interface client, used to create a remote server thread when an
473 // client IPC call is being made for the first time from a new thread.
474 ThreadMap::Client m_thread_map{nullptr};
475
478 ::capnp::CapabilityServerSet<Thread> m_threads;
479
483 kj::Canceler m_canceler;
484
489};
490
500{
501 ::capnp::word scratch[4]{};
502 ::capnp::MallocMessageBuilder message{scratch};
503 ::capnp::rpc::twoparty::VatId::Builder vat_id{message.getRoot<::capnp::rpc::twoparty::VatId>()};
504 ServerVatId() { vat_id.setSide(::capnp::rpc::twoparty::Side::SERVER); }
505};
506
507template <typename Interface, typename Impl>
509 Connection* connection,
510 bool destroy_connection)
511 : m_client(std::move(client)), m_context(connection)
512
513{
514 // Handler for the connection getting destroyed before this client object.
515 auto disconnect_cb = m_context.connection->addSyncCleanup([this]() {
516 // Release client capability by move-assigning to temporary.
517 {
518 typename Interface::Client(std::move(m_client));
519 }
520 Lock lock{m_context.loop->m_mutex};
521 m_context.connection = nullptr;
522 });
523
524 // Two shutdown sequences are supported:
525 //
526 // - A normal sequence where client proxy objects are deleted by external
527 // code that no longer needs them
528 //
529 // - A garbage collection sequence where the connection or event loop shuts
530 // down while external code is still holding client references.
531 //
532 // The first case is handled here when m_context.connection is not null. The
533 // second case is handled by the disconnect_cb function, which sets
534 // m_context.connection to null so nothing happens here.
535 m_context.cleanup_fns.emplace_front([this, destroy_connection, disconnect_cb]{
536 {
537 // If the capnp interface defines a destroy method, call it to destroy
538 // the remote object, waiting for it to be deleted server side. If the
539 // capnp interface does not define a destroy method, this will just call
540 // an empty stub defined in the ProxyClientBase class and do nothing.
541 Sub::destroy(*this);
542
543 // FIXME: Could just invoke removed addCleanup fn here instead of duplicating code
544 m_context.loop->sync([&]() {
545 // Remove disconnect callback on cleanup so it doesn't run and try
546 // to access this object after it's destroyed. This call needs to
547 // run inside loop->sync() on the event loop thread because
548 // otherwise, if there were an ill-timed disconnect, the
549 // onDisconnect handler could fire and delete the Connection object
550 // before the removeSyncCleanup call.
552
553 // Release client capability by move-assigning to temporary.
554 {
555 typename Interface::Client(std::move(m_client));
556 }
557 if (destroy_connection) {
558 delete m_context.connection;
559 m_context.connection = nullptr;
560 }
561 });
562 }
563 });
564 Sub::construct(*this);
565}
566
567template <typename Interface, typename Impl>
569{
570 CleanupRun(m_context.cleanup_fns);
571}
572
573template <typename Interface, typename Impl>
574ProxyServerBase<Interface, Impl>::ProxyServerBase(std::shared_ptr<Impl> impl, Connection& connection)
575 : m_impl(std::move(impl)), m_context(&connection)
576{
577 assert(m_impl);
578}
579
592template <typename Interface, typename Impl>
594{
595 if (m_impl) {
596 // If impl is non-null at this point, it means no client is waiting for
597 // the m_impl server object to be destroyed synchronously. This can
598 // happen either if the interface did not define a "destroy" method (see
599 // invokeDestroy method below), or if a destroy method was defined, but
600 // the connection was broken before it could be called.
601 //
602 // In either case, be conservative and run the cleanup on an
603 // asynchronous thread, to avoid destructors or cleanup functions
604 // blocking or deadlocking the current EventLoop thread, since they
605 // could be making IPC calls.
606 //
607 // Technically this is a little too conservative since if the interface
608 // defines a "destroy" method, but the destroy method does not accept a
609 // Context parameter specifying a worker thread, the cleanup method
610 // would run on the EventLoop thread normally (when connection is
611 // unbroken), but will not run on the EventLoop thread now (when
612 // connection is broken). Probably some refactoring of the destructor
613 // and invokeDestroy function is possible to make this cleaner and more
614 // consistent.
615 m_context.loop->addAsyncCleanup([impl=std::move(m_impl), fns=std::move(m_context.cleanup_fns)]() mutable {
616 impl.reset();
617 CleanupRun(fns);
618 });
619 }
620 assert(m_context.cleanup_fns.empty());
621}
622
640template <typename Interface, typename Impl>
642{
643 m_impl.reset();
644 CleanupRun(m_context.cleanup_fns);
645}
646
653using ConnThreads = std::map<Connection*, std::optional<ProxyClient<Thread>>>;
654using ConnThread = ConnThreads::iterator;
655
656// Retrieve ProxyClient<Thread> object associated with this connection from a
657// map, or create a new one and insert it into the map. Return map iterator and
658// inserted bool.
659std::tuple<ConnThread, bool> SetThread(GuardedRef<ConnThreads> threads, Connection* connection, const std::function<Thread::Client()>& make_thread);
660
675{
677 std::string thread_name;
678
694 std::unique_ptr<Waiter> waiter = nullptr;
695
713 ConnThreads callback_threads MP_GUARDED_BY(waiter->m_mutex);
714
724 ConnThreads request_threads MP_GUARDED_BY(waiter->m_mutex);
725
729 bool loop_thread = false;
730};
731
732template<typename T, typename Fn>
733kj::Promise<T> ProxyServer<Thread>::post(Fn&& fn)
734{
735 auto ready = kj::newPromiseAndFulfiller<void>(); // Signaled when waiter is ready to post again.
736 auto cancel_monitor_ptr = kj::heap<CancelMonitor>();
737 CancelMonitor& cancel_monitor = *cancel_monitor_ptr;
738 // Keep a reference to the ProxyServer<Thread> instance by assigning it to
739 // the self variable. ProxyServer instances are reference-counted and if the
740 // client drops its reference, this variable keeps the instance alive until
741 // the thread finishes executing. The self variable needs to be destroyed on
742 // the event loop thread so it is freed in a sync() call below.
743 auto self = thisCap();
744 auto ret = m_thread_ready.then([this, self = std::move(self), fn = std::forward<Fn>(fn), ready_fulfiller = kj::mv(ready.fulfiller), cancel_monitor_ptr = kj::mv(cancel_monitor_ptr)]() mutable {
745 auto result = kj::newPromiseAndFulfiller<T>(); // Signaled when fn() is called, with its return value.
746 bool posted = m_thread_context.waiter->post([this, self = std::move(self), fn = std::forward<Fn>(fn), ready_fulfiller = kj::mv(ready_fulfiller), result_fulfiller = kj::mv(result.fulfiller), cancel_monitor_ptr = kj::mv(cancel_monitor_ptr)]() mutable {
747 // Fulfill ready.promise now, as soon as the Waiter starts executing
748 // this lambda, so the next ProxyServer<Thread>::post() call can
749 // immediately call waiter->post(). It is important to do this
750 // before calling fn() because fn() can make an IPC call back to the
751 // client, which can make another IPC call to this server thread.
752 // (This typically happens when IPC methods take std::function
753 // parameters.) When this happens the second call to the server
754 // thread should not be blocked waiting for the first call.
755 m_loop->sync([ready_fulfiller = kj::mv(ready_fulfiller)]() mutable {
756 ready_fulfiller->fulfill();
757 ready_fulfiller = nullptr;
758 });
759 std::optional<T> result_value;
760 kj::Maybe<kj::Exception> exception{kj::runCatchingExceptions([&]{ result_value.emplace(fn(*cancel_monitor_ptr)); })};
761 m_loop->sync([this, &result_value, &exception, self = kj::mv(self), result_fulfiller = kj::mv(result_fulfiller), cancel_monitor_ptr = kj::mv(cancel_monitor_ptr)]() mutable {
762 // Destroy CancelMonitor here before fulfilling or rejecting the
763 // promise so it doesn't get triggered when the promise is
764 // destroyed.
765 cancel_monitor_ptr = nullptr;
766 // Send results to the fulfiller. Technically it would be ok to
767 // skip this if promise was canceled, but it's simpler to just
768 // do it unconditionally.
769 KJ_IF_MAYBE(e, exception) {
770 assert(!result_value);
771 result_fulfiller->reject(kj::mv(*e));
772 } else {
773 assert(result_value);
774 result_fulfiller->fulfill(kj::mv(*result_value));
775 result_value.reset();
776 }
777 result_fulfiller = nullptr;
778 // Use evalLater to destroy the ProxyServer<Thread> self
779 // reference, if it is the last reference, because the
780 // ProxyServer<Thread> destructor needs to join the thread,
781 // which can't happen until this sync() block has exited.
782 m_loop->m_task_set->add(kj::evalLater([self = kj::mv(self)] {}));
783 });
784 });
785 // Assert that calling Waiter::post did not fail. It could only return
786 // false if a new function was posted before the previous one finished
787 // executing, but new functions are only posted when m_thread_ready is
788 // signaled, so this should never happen.
789 assert(posted);
790 return kj::mv(result.promise);
791 }).attach(kj::heap<CancelProbe>(cancel_monitor));
792 m_thread_ready = kj::mv(ready.promise);
793 return ret;
794}
795
799template <typename InitInterface>
800std::unique_ptr<ProxyClient<InitInterface>> ConnectStream(EventLoop& loop, int fd)
801{
802 typename InitInterface::Client init_client(nullptr);
803 std::unique_ptr<Connection> connection;
804 loop.sync([&] {
805 auto stream =
806 loop.m_io_context.lowLevelProvider->wrapSocketFd(fd, kj::LowLevelAsyncIoProvider::TAKE_OWNERSHIP);
807 connection = std::make_unique<Connection>(loop, kj::mv(stream));
808 init_client = connection->m_rpc_system->bootstrap(ServerVatId().vat_id).castAs<InitInterface>();
809 Connection* connection_ptr = connection.get();
810 connection->onDisconnect([&loop, connection_ptr] {
811 MP_LOG(loop, Log::Warning) << "IPC client: unexpected network disconnect.";
812 delete connection_ptr;
813 });
814 });
815 return std::make_unique<ProxyClient<InitInterface>>(
816 kj::mv(init_client), connection.release(), /* destroy_connection= */ true);
817}
818
823template <typename InitInterface, typename InitImpl>
824void _Serve(EventLoop& loop, kj::Own<kj::AsyncIoStream>&& stream, InitImpl& init)
825{
826 loop.m_incoming_connections.emplace_front(loop, kj::mv(stream), [&](Connection& connection) {
827 // Disable deleter so proxy server object doesn't attempt to delete the
828 // init implementation when the proxy client is destroyed or
829 // disconnected.
830 return kj::heap<ProxyServer<InitInterface>>(std::shared_ptr<InitImpl>(&init, [](InitImpl*){}), connection);
831 });
832 auto it = loop.m_incoming_connections.begin();
833 MP_LOG(loop, Log::Info) << "IPC server: socket connected.";
834 it->onDisconnect([&loop, it] {
835 MP_LOG(loop, Log::Info) << "IPC server: socket disconnected.";
836 loop.m_incoming_connections.erase(it);
837 });
838}
839
843template <typename InitInterface, typename InitImpl>
844void _Listen(EventLoop& loop, kj::Own<kj::ConnectionReceiver>&& listener, InitImpl& init)
845{
846 auto* ptr = listener.get();
847 loop.m_task_set->add(ptr->accept().then(
848 [&loop, &init, listener = kj::mv(listener)](kj::Own<kj::AsyncIoStream>&& stream) mutable {
849 _Serve<InitInterface>(loop, kj::mv(stream), init);
850 _Listen<InitInterface>(loop, kj::mv(listener), init);
851 }));
852}
853
856template <typename InitInterface, typename InitImpl>
857void ServeStream(EventLoop& loop, int fd, InitImpl& init)
858{
859 _Serve<InitInterface>(
860 loop, loop.m_io_context.lowLevelProvider->wrapSocketFd(fd, kj::LowLevelAsyncIoProvider::TAKE_OWNERSHIP), init);
861}
862
865template <typename InitInterface, typename InitImpl>
867{
868 loop.sync([&]() {
869 _Listen<InitInterface>(loop,
870 loop.m_io_context.lowLevelProvider->wrapListenSocketFd(fd, kj::LowLevelAsyncIoProvider::TAKE_OWNERSHIP),
871 init);
872 });
873}
874
875extern thread_local ThreadContext g_thread_context; // NOLINT(bitcoin-nontrivial-threadlocal)
876// Silence nonstandard bitcoin tidy error "Variable with non-trivial destructor
877// cannot be thread_local" which should not be a problem on modern platforms, and
878// could lead to a small memory leak at worst on older ones.
879
880} // namespace mp
881
882#endif // MP_PROXY_IO_H
int ret
if(!SetupNetworking())
int flags
Definition: bitcoin-tx.cpp:530
Helper class that detects when a promise is canceled.
Definition: util.h:293
Object holding network & rpc state associated with either an incoming server connection,...
Definition: proxy-io.h:423
CleanupIt addSyncCleanup(std::function< void()> fn)
Register synchronous cleanup function to run on event loop thread (with access to capnp thread local ...
Definition: proxy.cpp:153
EventLoopRef m_loop
Definition: proxy-io.h:462
::capnp::TwoPartyVatNetwork m_network
Definition: proxy-io.h:469
kj::Own< kj::AsyncIoStream > m_stream
Definition: proxy-io.h:463
Connection(EventLoop &loop, kj::Own< kj::AsyncIoStream > &&stream_, const std::function<::capnp::Capability::Client(Connection &)> &make_client)
Definition: proxy-io.h:429
Connection(EventLoop &loop, kj::Own< kj::AsyncIoStream > &&stream_)
Definition: proxy-io.h:425
void onDisconnect(F &&f)
Add disconnect handler.
Definition: proxy-io.h:451
kj::Canceler m_canceler
Canceler for canceling promises that we want to discard when the connection is destroyed.
Definition: proxy-io.h:483
::capnp::CapabilityServerSet< Thread > m_threads
Collection of server-side IPC worker threads (ProxyServer<Thread> objects previously returned by Thre...
Definition: proxy-io.h:478
CleanupList m_sync_cleanup_fns
Cleanup functions to run if connection is broken unexpectedly.
Definition: proxy-io.h:488
std::optional<::capnp::RpcSystem<::capnp::rpc::twoparty::VatId > > m_rpc_system
Definition: proxy-io.h:470
void removeSyncCleanup(CleanupIt it)
Definition: proxy.cpp:167
Event loop implementation.
Definition: proxy-io.h:239
kj::AsyncIoContext m_io_context
Capnp IO context.
Definition: proxy-io.h:327
void sync(Callable &&callable)
Wrapper around EventLoop::post that takes advantage of the fact that callable will not go out of scop...
Definition: proxy-io.h:269
EventLoop(const char *exe_name, LogFn log_fn, void *context=nullptr)
Construct event loop object with default logging options.
Definition: proxy-io.h:242
std::list< Connection > m_incoming_connections
List of connections.
Definition: proxy-io.h:336
Mutex m_mutex
Mutex and condition variable used to post tasks to event loop and async thread.
Definition: proxy-io.h:323
LogOptions m_log_opts
Logging options.
Definition: proxy-io.h:339
std::unique_ptr< kj::TaskSet > m_task_set
Capnp list of pending promises.
Definition: proxy-io.h:333
void * m_context
External context pointer.
Definition: proxy-io.h:342
std::function< void()> testing_hook_makethread
Hook called when ProxyServer<ThreadMap>::makeThread() is called.
Definition: proxy-io.h:345
std::function< void()> testing_hook_makethread_created
Hook called on the worker thread inside makeThread(), after the thread context is set up and thread_c...
Definition: proxy-io.h:350
EventLoop(const char *exe_name, std::function< void(bool, std::string)> old_callback, void *context=nullptr)
Backwards-compatible constructor for previous (deprecated) logging callback signature.
Definition: proxy-io.h:249
std::function< void()> testing_hook_async_request_done
Hook called on the worker thread just before returning results.
Definition: proxy-io.h:358
std::function< void()> testing_hook_async_request_start
Hook called on the worker thread when it starts to execute an async request.
Definition: proxy-io.h:355
Event loop smart pointer automatically managing m_num_clients.
Definition: proxy.h:51
Definition: util.h:171
std::unique_lock< std::mutex > m_lock
Definition: util.h:183
friend Logger & operator<<(Logger &logger, T &&value)
Definition: proxy-io.h:179
Logger(const Logger &)=delete
bool enabled() const
Definition: proxy-io.h:197
~Logger() noexcept(false)
Definition: proxy-io.h:173
Logger & operator=(Logger &&)=delete
Log m_log_level
Definition: proxy-io.h:203
Logger(Logger &&)=delete
Logger(const LogOptions &options, Log log_level)
Definition: proxy-io.h:166
const LogOptions & m_options
Definition: proxy-io.h:202
friend Logger & operator<<(Logger &&logger, T &&value)
Definition: proxy-io.h:186
std::ostringstream m_buffer
Definition: proxy-io.h:204
Logger & operator=(const Logger &)=delete
Handler for kj::TaskSet failed task events.
Definition: proxy-io.h:119
EventLoop & m_loop
Definition: proxy-io.h:123
LoggingErrorHandler(EventLoop &loop)
Definition: proxy-io.h:121
void taskFailed(kj::Exception &&exception) override
Definition: proxy.cpp:43
Base class for generated ProxyClient classes that implement a C++ interface and forward calls to a ca...
Definition: proxy.h:81
Interface::Client m_client
Definition: proxy.h:132
ProxyContext m_context
Definition: proxy.h:133
~ProxyClientBase() noexcept
Definition: proxy-io.h:568
ProxyClientBase(typename Interface::Client client, Connection *connection, bool destroy_connection)
Construct libmultiprocess client object wrapping Cap'n Proto client object with a reference to the as...
Definition: proxy-io.h:508
std::optional< mp::EventLoop > m_loop
EventLoop object which manages I/O events for all connections.
Definition: protocol.cpp:144
Context m_context
Definition: protocol.cpp:141
#define MP_GUARDED_BY(x)
Definition: util.h:160
#define MP_REQUIRES(x)
Definition: util.h:156
Definition: basic.cpp:8
Functions to serialize / deserialize common bitcoin types.
Definition: common-types.h:57
void Unlock(Lock &lock, Callback &&callback)
Definition: util.h:207
kj::StringPtr KJ_STRINGIFY(Log flags)
Definition: proxy.cpp:441
std::list< std::function< void()> > CleanupList
Definition: proxy.h:36
void ServeStream(EventLoop &loop, int fd, InitImpl &init)
Given stream file descriptor and an init object, handle requests on the stream by calling methods on ...
Definition: proxy-io.h:857
std::function< void(LogMessage)> LogFn
Definition: proxy-io.h:147
std::tuple< ConnThread, bool > SetThread(GuardedRef< ConnThreads > threads, Connection *connection, const std::function< Thread::Client()> &make_thread)
Definition: proxy.cpp:321
thread_local ThreadContext g_thread_context
Definition: proxy.cpp:41
void _Listen(EventLoop &loop, kj::Own< kj::ConnectionReceiver > &&listener, InitImpl &init)
Given connection receiver and an init object, handle incoming connections by calling _Serve,...
Definition: proxy-io.h:844
std::unique_ptr< ProxyClient< InitInterface > > ConnectStream(EventLoop &loop, int fd)
Given stream file descriptor, make a new ProxyClient object to send requests over the stream.
Definition: proxy-io.h:800
Log
Log flags. Update stringify function if changed!
Definition: proxy-io.h:127
void ListenConnections(EventLoop &loop, int fd, InitImpl &init)
Given listening socket file descriptor and an init object, handle incoming connections and requests b...
Definition: proxy-io.h:866
std::string LongThreadName(const char *exe_name)
Definition: proxy.cpp:436
ConnThreads::iterator ConnThread
Definition: proxy-io.h:654
void _Serve(EventLoop &loop, kj::Own< kj::AsyncIoStream > &&stream, InitImpl &init)
Given stream and init objects, construct a new ProxyServer object that handles requests from the stre...
Definition: proxy-io.h:824
typename CleanupList::iterator CleanupIt
Definition: proxy.h:37
void CleanupRun(CleanupList &fns)
Definition: proxy.h:39
std::map< Connection *, std::optional< ProxyClient< Thread > > > ConnThreads
Map from Connection to local or remote thread handle which will be used over that connection.
Definition: proxy-io.h:653
Definition: common.h:30
#define MP_LOG(loop,...)
Definition: proxy-io.h:209
ThreadContext & thread_context
Definition: proxy-io.h:36
ClientInvokeContext(Connection &conn, ThreadContext &thread_context)
Definition: proxy-io.h:37
Connection & connection
Definition: proxy-io.h:31
Log level
The severity level of this message.
Definition: proxy-io.h:144
std::string message
Message to be logged.
Definition: proxy-io.h:141
LogFn log_fn
External logging callback.
Definition: proxy-io.h:152
size_t max_chars
Maximum number of characters to use when representing request and response structs as strings.
Definition: proxy-io.h:156
Log log_level
Messages with a severity level less than log_level will not be reported.
Definition: proxy-io.h:160
ProxyClient(const ProxyClient &)=delete
std::optional< CleanupIt > m_disconnect_cb
Reference to callback function that is run if there is a sudden disconnect and the Connection object ...
Definition: proxy-io.h:91
Mapping from capnp interface type to proxy client implementation (specializations are generated by pr...
Definition: proxy.h:25
EventLoopRef loop
Definition: proxy.h:71
Connection * connection
Definition: proxy.h:70
CleanupList cleanup_fns
Definition: proxy.h:72
ThreadContext & m_thread_context
Definition: proxy-io.h:110
Base class for generated ProxyServer classes that implement capnp server methods and forward calls to...
Definition: proxy.h:148
virtual ~ProxyServerBase()
ProxyServer destructor, called from the EventLoop thread by Cap'n Proto garbage collection code after...
Definition: proxy-io.h:593
std::shared_ptr< Impl > m_impl
Implementation pointer that may or may not be owned and deleted when this capnp server goes out of sc...
Definition: proxy.h:171
Mapping from capnp interface type to proxy server implementation (specializations are generated by pr...
Definition: proxy.h:28
CallContext_ CallContext
Definition: proxy-io.h:46
ServerInvokeContext(ProxyServer &proxy_server, CallContext &call_context, int req)
Definition: proxy-io.h:65
CallContext & call_context
Definition: proxy-io.h:49
ProxyServer & proxy_server
Definition: proxy-io.h:48
bool request_canceled
For IPC methods that execute asynchronously, not on the event-loop thread, this is set to true if the...
Definition: proxy-io.h:63
Lock * cancel_lock
For IPC methods that execute asynchronously, not on the event-loop thread: lock preventing the event-...
Definition: proxy-io.h:56
Vat id for server side of connection.
Definition: proxy-io.h:500
The thread_local ThreadContext g_thread_context struct provides information about individual threads ...
Definition: proxy-io.h:675
ConnThreads callback_threads MP_GUARDED_BY(waiter->m_mutex)
When client is making a request to a server, this is the callbackThread argument it passes in the req...
ConnThreads request_threads MP_GUARDED_BY(waiter->m_mutex)
When client is making a request to a server, this is the thread argument it passes in the request,...
std::string thread_name
Identifying string for debug.
Definition: proxy-io.h:677
Single element task queue used to handle recursive capnp calls.
Definition: proxy-io.h:373
void wait(Lock &lock, Predicate pred)
Definition: proxy-io.h:387
std::condition_variable m_cv
Definition: proxy-io.h:413
Mutex m_mutex
Mutex mainly used internally by waiter class, but also used externally to guard access to related sta...
Definition: proxy-io.h:412
std::optional< kj::Function< void()> > m_fn MP_GUARDED_BY(m_mutex)
bool post(Fn &&fn)
Definition: proxy-io.h:377
Waiter()=default
assert(!tx.IsCoinBase())