proxy.cpp
// Copyright (c) The Bitcoin Core developers
// Distributed under the MIT software license, see the accompanying
// file COPYING or http://www.opensource.org/licenses/mit-license.php.

#include <mp/proxy.h>

#include <mp/proxy-io.h>
#include <mp/proxy-types.h>
#include <mp/proxy.capnp.h>
#include <mp/type-threadmap.h>
#include <mp/util.h>

#include <atomic>
#include <capnp/capability.h>
#include <capnp/common.h> // IWYU pragma: keep
#include <capnp/rpc.h>
#include <condition_variable>
#include <functional>
#include <future>
#include <kj/async.h>
#include <kj/async-io.h>
#include <kj/async-prelude.h>
#include <kj/common.h>
#include <kj/debug.h>
#include <kj/function.h>
#include <kj/memory.h>
#include <map>
#include <memory>
#include <optional>
#include <stdexcept>
#include <string>
#include <sys/socket.h>
#include <thread>
#include <tuple>
#include <unistd.h>
#include <utility>

namespace mp {

thread_local ThreadContext g_thread_context;

void LoggingErrorHandler::taskFailed(kj::Exception&& exception)
{
    KJ_LOG(ERROR, "Uncaught exception in daemonized task.", exception);
    m_loop.log() << "Uncaught exception in daemonized task.";
}

EventLoopRef::EventLoopRef(EventLoop& loop, Lock* lock) : m_loop(&loop), m_lock(lock)
{
    auto loop_lock{PtrOrValue{m_lock, m_loop->m_mutex}};
    loop_lock->assert_locked(m_loop->m_mutex);
    m_loop->m_num_clients += 1;
}

// Due to the conditionals in this function, MP_NO_TSA is required to avoid
// the error "error: mutex 'loop_lock' is not held on every path through here
// [-Wthread-safety-analysis]"
void EventLoopRef::reset(bool relock) MP_NO_TSA
{
    if (auto* loop{m_loop}) {
        m_loop = nullptr;
        auto loop_lock{PtrOrValue{m_lock, loop->m_mutex}};
        loop_lock->assert_locked(loop->m_mutex);
        assert(loop->m_num_clients > 0);
        loop->m_num_clients -= 1;
        if (loop->done()) {
            loop->m_cv.notify_all();
            int post_fd{loop->m_post_fd};
            loop_lock->unlock();
            char buffer = 0;
            KJ_SYSCALL(write(post_fd, &buffer, 1)); // NOLINT(bugprone-suspicious-semicolon)
            // By default, do not try to relock `loop_lock` after writing,
            // because the event loop could wake up and destroy itself and the
            // mutex might no longer exist.
            if (relock) loop_lock->lock();
        }
    }
}
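
// Illustrative sketch (a minimal editor-added example, not part of the original
// file; the surrounding code is assumed): EventLoopRef behaves like a scoped
// guard that keeps the event loop alive while held:
//
//     {
//         EventLoopRef ref{loop};  // increments loop.m_num_clients
//         // ... safe to use the loop here ...
//     } // decrements the count; if this was the last client and no async
//       // cleanups remain, the write to post_fd above wakes loop() so it can exit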

ProxyContext::ProxyContext(Connection* connection) : connection(connection), loop{*connection->m_loop} {}

Connection::~Connection()
{
    // The Connection destructor is always called on the event loop thread. If
    // this is a local disconnect, it will trigger I/O, so it needs to run on
    // the event loop thread, and if there was a remote disconnect, this is
    // called by an onDisconnect callback directly from the event loop thread.
    assert(std::this_thread::get_id() == m_loop->m_thread_id);
    // Shut down the RPC system first, since this will garbage collect any
    // ProxyServer objects that were not freed before the connection was closed.
    // Typically all ProxyServer objects associated with this connection will be
    // freed before this call returns. However, that will not be the case if
    // there are asynchronous IPC calls over this connection still currently
    // executing. In that case, Cap'n Proto will destroy the ProxyServer objects
    // after the calls finish.
    m_rpc_system.reset();

    // ProxyClient cleanup handlers are in the sync list, and ProxyServer
    // cleanup handlers are in the async list.
    //
    // The ProxyClient cleanup handlers are synchronous because they are fast
    // and don't do anything besides release capnp resources and reset state so
    // future calls to client methods immediately throw exceptions instead of
    // trying to communicate across the socket. The synchronous callbacks set
    // ProxyClient capability pointers to null, so new method calls on client
    // objects fail without triggering I/O or relying on the event loop, which
    // may go out of scope or trigger obscure capnp I/O errors.
    //
    // The ProxyServer cleanup handlers call user-defined destructors on server
    // objects, which can run arbitrary blocking bitcoin code, so they have to
    // run asynchronously in a different thread. The asynchronous cleanup
    // functions intentionally aren't started until after the synchronous
    // cleanup functions run, so client objects are fully disconnected before
    // bitcoin code in the destructors is run. This way, if the bitcoin code
    // tries to make client requests, the requests will just fail immediately
    // instead of sending I/O or accessing the event loop.
    //
    // The context where Connection objects are destroyed and this destructor is
    // invoked is different depending on whether this is an outgoing connection
    // being used to make an Init.makeX() call (e.g. Init.makeNode or
    // Init.makeWalletClient) or an incoming connection implementing the Init
    // interface and handling the Init.makeX() calls.
    //
    // Either way, when a connection is closed, capnp behavior is to call all
    // ProxyServer object destructors first, and then trigger an onDisconnect
    // callback.
    //
    // On the incoming side of the connection, the onDisconnect callback is
    // written to delete the Connection object from m_incoming_connections and
    // call this destructor, which calls Connection::disconnect.
    //
    // On the outgoing side, the Connection object is owned by the top-level
    // client object, which the onDisconnect handler doesn't have ready access
    // to, so the onDisconnect handler just calls Connection::disconnect
    // directly instead.
    //
    // Either way, the disconnect code runs on the event loop thread and is
    // called on both clean and unclean shutdowns. In the unclean shutdown case,
    // when the connection is broken, the sync and async cleanup lists will be
    // filled with callbacks. In the clean shutdown case both lists will be
    // empty.
    Lock lock{m_loop->m_mutex};
    while (!m_sync_cleanup_fns.empty()) {
        CleanupList fn;
        fn.splice(fn.begin(), m_sync_cleanup_fns, m_sync_cleanup_fns.begin());
        Unlock(lock, fn.front());
    }
}

CleanupIt Connection::addSyncCleanup(std::function<void()> fn)
{
    const Lock lock(m_loop->m_mutex);
    // Add cleanup callbacks to the front of the list, so sync cleanup functions
    // run in LIFO order. This is a good approach because sync cleanup functions
    // are added as client objects are created, and it is natural to clean up
    // objects in the reverse order they were created. In practice, however,
    // order should not be significant because the cleanup callbacks run
    // synchronously in a single batch when the connection is broken, and they
    // only reset the connection pointers in the client objects without actually
    // deleting the client objects.
    return m_sync_cleanup_fns.emplace(m_sync_cleanup_fns.begin(), std::move(fn));
}
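
// Illustrative sketch (editor-added example; member names such as m_cleanup_it
// and m_client are hypothetical): a ProxyClient typically registers a sync
// cleanup so a broken connection simply nulls its capability and later calls
// fail fast:
//
//     m_cleanup_it = m_context.connection->addSyncCleanup([this] {
//         m_client = nullptr;             // drop the capnp capability
//         m_context.connection = nullptr; // no further I/O attempted
//     });
//
// and unregisters it again with removeSyncCleanup() from its own destructor.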

void Connection::removeSyncCleanup(CleanupIt it)
{
    // Require cleanup functions to be removed on the event loop thread to avoid
    // needing to deal with them being removed in the middle of a disconnect.
    assert(std::this_thread::get_id() == m_loop->m_thread_id);
    const Lock lock(m_loop->m_mutex);
    m_sync_cleanup_fns.erase(it);
}

void EventLoop::addAsyncCleanup(std::function<void()> fn)
{
    const Lock lock(m_mutex);
    // Add async cleanup callbacks to the back of the list. Unlike the sync
    // cleanup list, this list order is more significant because it determines
    // the order server objects are destroyed when there is a sudden disconnect,
    // and it is possible objects may need to be destroyed in a certain order.
    // This function is called in ProxyServerBase destructors, and since capnp
    // destroys ProxyServer objects in LIFO order, we should preserve this
    // order, and add cleanup callbacks to the end of the list so they can be
    // run starting from the beginning of the list.
    //
    // In bitcoin core, running these callbacks in the right order is
    // particularly important for the wallet process, because it uses blocking
    // shared_ptrs and requires Chain::Notification pointers owned by the node
    // process to be destroyed before the WalletLoader objects owned by the node
    // process, otherwise shared pointer counts of the CWallet objects (which
    // inherit from Chain::Notification) will not be 1 when the WalletLoader
    // destructor runs and it will wait forever for them to be released.
    m_async_fns->emplace_back(std::move(fn));
    startAsyncThread();
}
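
// Illustrative sketch (editor-added example; `loop` and `m_impl` are
// hypothetical names): a ProxyServer destructor running on the event loop
// thread can hand the potentially blocking destruction of its wrapped object
// to the async worker:
//
//     loop.addAsyncCleanup([impl = std::move(m_impl)]() mutable {
//         impl.reset(); // may block; runs on the async thread, not the event loop
//     });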

EventLoop::EventLoop(const char* exe_name, LogFn log_fn, void* context)
    : m_exe_name(exe_name),
      m_io_context(kj::setupAsyncIo()),
      m_task_set(new kj::TaskSet(m_error_handler)),
      m_context(context)
{
    m_log_opts.log_fn = log_fn;
    int fds[2];
    KJ_SYSCALL(socketpair(AF_UNIX, SOCK_STREAM, 0, fds));
    m_wait_fd = fds[0];
    m_post_fd = fds[1];
}
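
// Illustrative usage sketch (editor-added example; the executable name and
// logging body are hypothetical): an IPC process typically constructs one
// EventLoop, runs loop() on a dedicated thread, and keeps it alive through
// EventLoopRef guards held by its connections:
//
//     mp::EventLoop loop{"bitcoin-node", [](bool raise, std::string msg) {
//         // forward msg to the application logger
//     }};
//     std::thread loop_thread([&] { loop.loop(); });
//     // ... create Connection objects holding EventLoopRef instances ...
//     loop_thread.join(); // loop() returns once the last client releases the loop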

EventLoop::~EventLoop()
{
    if (m_async_thread.joinable()) m_async_thread.join();
    const Lock lock(m_mutex);
    KJ_ASSERT(m_post_fn == nullptr);
    KJ_ASSERT(!m_async_fns);
    KJ_ASSERT(m_wait_fd == -1);
    KJ_ASSERT(m_post_fd == -1);
    KJ_ASSERT(m_num_clients == 0);

    // Spin event loop. Wait for any promises triggered by RPC shutdown.
    // auto cleanup = kj::evalLater([]{});
    // cleanup.wait(m_io_context.waitScope);
}
void EventLoop::loop()
{
    assert(!g_thread_context.loop_thread);
    g_thread_context.loop_thread = true;
    KJ_DEFER(g_thread_context.loop_thread = false);

    {
        const Lock lock(m_mutex);
        assert(!m_async_fns);
        m_async_fns.emplace();
    }

    kj::Own<kj::AsyncIoStream> wait_stream{
        m_io_context.lowLevelProvider->wrapSocketFd(m_wait_fd, kj::LowLevelAsyncIoProvider::TAKE_OWNERSHIP)};
    int post_fd{m_post_fd};
    char buffer = 0;
    for (;;) {
        const size_t read_bytes = wait_stream->read(&buffer, 0, 1).wait(m_io_context.waitScope);
        if (read_bytes != 1) throw std::logic_error("EventLoop wait_stream closed unexpectedly");
        Lock lock(m_mutex);
        if (m_post_fn) {
            Unlock(lock, *m_post_fn);
            m_post_fn = nullptr;
            m_cv.notify_all();
        } else if (done()) {
            // Intentionally do not break if m_post_fn was set, even if done()
            // would return true, to ensure that the EventLoopRef write(post_fd)
            // call always succeeds and the loop does not exit between the time
            // that the done condition is set and the write call is made.
            break;
        }
    }
    log() << "EventLoop::loop done, cancelling event listeners.";
    m_task_set.reset();
    log() << "EventLoop::loop bye.";
    wait_stream = nullptr;
    KJ_SYSCALL(::close(post_fd));
    const Lock lock(m_mutex);
    m_wait_fd = -1;
    m_post_fd = -1;
    m_async_fns.reset();
    m_cv.notify_all();
}

void EventLoop::post(kj::Function<void()> fn)
{
    if (std::this_thread::get_id() == m_thread_id) {
        fn();
        return;
    }
    Lock lock(m_mutex);
    EventLoopRef ref(*this, &lock);
    m_cv.wait(lock.m_lock, [this]() MP_REQUIRES(m_mutex) { return m_post_fn == nullptr; });
    m_post_fn = &fn;
    int post_fd{m_post_fd};
    Unlock(lock, [&] {
        char buffer = 0;
        KJ_SYSCALL(write(post_fd, &buffer, 1));
    });
    m_cv.wait(lock.m_lock, [this, &fn]() MP_REQUIRES(m_mutex) { return m_post_fn != &fn; });
}
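
// Illustrative sketch (editor-added example): any thread can marshal work onto
// the event loop thread and block until it has run:
//
//     loop.post([&] {
//         // runs on the event loop thread, where capnp objects may be touched
//     });
//
// EventLoop::sync() (used by the ProxyClient<Thread> destructor below) provides
// the same run-on-the-event-loop-thread behavior.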

void EventLoop::startAsyncThread()
{
    assert(std::this_thread::get_id() == m_thread_id);
    if (m_async_thread.joinable()) {
        // Notify to wake up the async thread if it is already running.
        m_cv.notify_all();
    } else if (!m_async_fns->empty()) {
        m_async_thread = std::thread([this] {
            Lock lock(m_mutex);
            while (m_async_fns) {
                if (!m_async_fns->empty()) {
                    EventLoopRef ref{*this, &lock};
                    const std::function<void()> fn = std::move(m_async_fns->front());
                    m_async_fns->pop_front();
                    Unlock(lock, fn);
                    // Important to relock because of the wait() call below.
                    ref.reset(/*relock=*/true);
                    // Continue without waiting in case there are more async_fns.
                    continue;
                }
                m_cv.wait(lock.m_lock);
            }
        });
    }
}

bool EventLoop::done() const
{
    assert(m_num_clients >= 0);
    return m_num_clients == 0 && m_async_fns->empty();
}

std::tuple<ConnThread, bool> SetThread(GuardedRef<ConnThreads> threads, Connection* connection, const std::function<Thread::Client()>& make_thread)
{
    assert(std::this_thread::get_id() == connection->m_loop->m_thread_id);
    ConnThread thread;
    bool inserted;
    {
        const Lock lock(threads.mutex);
        std::tie(thread, inserted) = threads.ref.try_emplace(connection);
    }
    if (inserted) {
        thread->second.emplace(make_thread(), connection, /* destroy_connection= */ false);
        thread->second->m_disconnect_cb = connection->addSyncCleanup([threads, thread] {
            // Note: it is safe to use the `thread` iterator in this cleanup
            // function, because the iterator would only be invalid if the map
            // entry was removed, and if the map entry is removed the
            // ProxyClient<Thread> destructor unregisters the cleanup.

            // The connection is being destroyed before the thread client is, so
            // reset the thread client's m_disconnect_cb member so the thread
            // client destructor does not try to unregister this callback after
            // the connection is destroyed.
            thread->second->m_disconnect_cb.reset();

            // Remove the connection pointer about to be destroyed from the map.
            const Lock lock(threads.mutex);
            threads.ref.erase(thread);
        });
    }
    return {thread, inserted};
}

ProxyClient<Thread>::~ProxyClient()
{
    // If the thread is being destroyed before the connection is destroyed,
    // remove the cleanup callback that was registered to handle the connection
    // being destroyed before the thread.
    if (m_disconnect_cb) {
        // Remove the disconnect callback on the event loop thread with
        // loop->sync(), so if the connection is broken there is not a race
        // between this thread trying to remove the callback and the disconnect
        // handler attempting to call it.
        m_context.loop->sync([&]() {
            if (m_disconnect_cb) {
                m_context.connection->removeSyncCleanup(*m_disconnect_cb);
            }
        });
    }
}

ProxyServer<Thread>::ProxyServer(ThreadContext& thread_context, std::thread&& thread)
    : m_thread_context(thread_context), m_thread(std::move(thread))
{
    assert(m_thread_context.waiter.get() != nullptr);
}

ProxyServer<Thread>::~ProxyServer()
{
    if (!m_thread.joinable()) return;
    // Stop the async thread and wait for it to exit. Need to wait because the
    // m_thread handle needs to outlive the thread to avoid a "terminate called
    // without an active exception" error. An alternative to waiting would be to
    // detach the thread, but this would introduce nondeterminism which could
    // make code harder to debug or extend.
    assert(m_thread_context.waiter.get());
    std::unique_ptr<Waiter> waiter;
    {
        const Lock lock(m_thread_context.waiter->m_mutex);
        waiter = std::move(m_thread_context.waiter);
        assert(!waiter->m_fn);
        // Clear client maps now to avoid deadlock in the m_thread.join() call
        // below. The maps contain Thread::Client objects that need to be
        // destroyed from the event loop thread (this thread), which can't
        // happen if this thread is busy calling join.
        m_thread_context.request_threads.clear();
        m_thread_context.callback_threads.clear();
        waiter->m_cv.notify_all();
    }
    m_thread.join();
}

kj::Promise<void> ProxyServer<Thread>::getName(GetNameContext context)
{
    context.getResults().setResult(m_thread_context.thread_name);
    return kj::READY_NOW;
}

ProxyServer<ThreadMap>::ProxyServer(Connection& connection) : m_connection(connection) {}

kj::Promise<void> ProxyServer<ThreadMap>::makeThread(MakeThreadContext context)
{
    const std::string from = context.getParams().getName();
    std::promise<ThreadContext*> thread_context;
    std::thread thread([&thread_context, from, this]() {
        g_thread_context.thread_name = ThreadName(m_connection.m_loop->m_exe_name) + " (from " + from + ")";
        g_thread_context.waiter = std::make_unique<Waiter>();
        thread_context.set_value(&g_thread_context);
        Lock lock(g_thread_context.waiter->m_mutex);
        // Wait for the shutdown signal from the ProxyServer<Thread> destructor
        // (the signal is just the waiter getting set to null).
        g_thread_context.waiter->wait(lock, [] { return !g_thread_context.waiter; });
    });
    auto thread_server = kj::heap<ProxyServer<Thread>>(*thread_context.get_future().get(), std::move(thread));
    auto thread_client = m_connection.m_threads.add(kj::mv(thread_server));
    context.getResults().setResult(kj::mv(thread_client));
    return kj::READY_NOW;
}
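
// Illustrative sketch (editor-added example; `thread_map` is a hypothetical
// ThreadMap::Client handle): the remote peer obtains a Thread capability for
// its calls roughly like this, using the Cap'n Proto generated client API for
// the ThreadMap interface in proxy.capnp:
//
//     auto request = thread_map.makeThreadRequest();
//     request.setName(ThreadName(exe_name));
//     Thread::Client thread = request.send().getResult(); // pipelined capability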

std::atomic<int> server_reqs{0};

std::string LongThreadName(const char* exe_name)
{
    return g_thread_context.thread_name.empty() ? ThreadName(exe_name) : g_thread_context.thread_name;
}

} // namespace mp