proxy.cpp
// Copyright (c) 2019 The Bitcoin Core developers
// Distributed under the MIT software license, see the accompanying
// file COPYING or http://www.opensource.org/licenses/mit-license.php.

#include <mp/proxy.h>

#include <mp/proxy-io.h>
#include <mp/proxy-types.h>
#include <mp/proxy.capnp.h>
#include <mp/type-threadmap.h>
#include <mp/util.h>

#include <assert.h>
#include <atomic>
#include <capnp/blob.h>
#include <capnp/capability.h>
#include <condition_variable>
#include <functional>
#include <future>
#include <kj/async-io.h>
#include <kj/async.h>
#include <kj/common.h>
#include <kj/debug.h>
#include <kj/exception.h>
#include <kj/memory.h>
#include <map>
#include <memory>
#include <mutex>
#include <stddef.h>
#include <stdexcept>
#include <string>
#include <sys/socket.h>
#include <thread>
#include <tuple>
#include <unistd.h>
#include <utility>

namespace mp {

template <typename Interface>
struct ProxyServer;

thread_local ThreadContext g_thread_context;

void LoggingErrorHandler::taskFailed(kj::Exception&& exception)
{
    KJ_LOG(ERROR, "Uncaught exception in daemonized task.", exception);
    m_loop.log() << "Uncaught exception in daemonized task.";
}

Connection::~Connection()
{
    // Shut down the RPC system first, since this will garbage collect Server
    // objects that were not freed before the connection was closed, some of
    // which may call addAsyncCleanup and add more cleanup callbacks which can
    // run below.
    m_rpc_system.reset();

    // ProxyClient cleanup handlers are in the sync list, and ProxyServer
    // cleanup handlers are in the async list.
    //
    // The ProxyClient cleanup handlers are synchronous because they are fast
    // and don't do anything besides release capnp resources and reset state so
    // future calls to client methods immediately throw exceptions instead of
    // trying to communicate across the socket. The synchronous callbacks set
    // ProxyClient capability pointers to null, so new method calls on client
    // objects fail without triggering i/o or relying on the event loop, which
    // may go out of scope or trigger obscure capnp i/o errors.
    //
    // The ProxyServer cleanup handlers call user-defined destructors on server
    // objects, which can run arbitrary blocking bitcoin code, so they have to
    // run asynchronously in a different thread. The asynchronous cleanup
    // functions intentionally aren't started until after the synchronous
    // cleanup functions run, so client objects are fully disconnected before
    // bitcoin code in the destructors is run. This way, if the bitcoin code
    // tries to make client requests, the requests will just fail immediately
    // instead of sending i/o or accessing the event loop.
    //
    // The context where Connection objects are destroyed and this destructor
    // is invoked is different depending on whether this is an outgoing
    // connection being used to make an Init.makeX() call (e.g. Init.makeNode
    // or Init.makeWalletClient) or an incoming connection implementing the
    // Init interface and handling the Init.makeX() calls.
    //
    // Either way, when a connection is closed, capnp behavior is to call all
    // ProxyServer object destructors first, and then trigger an onDisconnect
    // callback.
    //
    // On the incoming side of the connection, the onDisconnect callback is
    // written to delete the Connection object from m_incoming_connections and
    // call this destructor, which calls Connection::disconnect.
    //
    // On the outgoing side, the Connection object is owned by the top-level
    // client object, which the onDisconnect handler doesn't have ready access
    // to, so the onDisconnect handler just calls Connection::disconnect
    // directly instead.
    //
    // Either way, the disconnect code runs in the event loop thread and is
    // called on both clean and unclean shutdowns. In the unclean shutdown
    // case, when the connection is broken, the sync and async cleanup lists
    // will be filled with callbacks. In the clean shutdown case both lists
    // will be empty.
    while (!m_sync_cleanup_fns.empty()) {
        m_sync_cleanup_fns.front()();
        m_sync_cleanup_fns.pop_front();
    }
    while (!m_async_cleanup_fns.empty()) {
        const std::unique_lock<std::mutex> lock(m_loop.m_mutex);
        m_loop.m_async_fns.emplace_back(std::move(m_async_cleanup_fns.front()));
        m_async_cleanup_fns.pop_front();
    }
    std::unique_lock<std::mutex> lock(m_loop.m_mutex);
    m_loop.startAsyncThread(lock);
    m_loop.removeClient(lock);
}

CleanupIt Connection::addSyncCleanup(std::function<void()> fn)
{
    const std::unique_lock<std::mutex> lock(m_loop.m_mutex);
    // Add cleanup callbacks to the front of the list, so sync cleanup
    // functions run in LIFO order. This is a good approach because sync
    // cleanup functions are added as client objects are created, and it is
    // natural to clean up objects in the reverse of the order they were
    // created. In practice, however, the order should not be significant
    // because the cleanup callbacks run synchronously in a single batch when
    // the connection is broken, and they only reset the connection pointers
    // in the client objects without actually deleting the client objects.
    return m_sync_cleanup_fns.emplace(m_sync_cleanup_fns.begin(), std::move(fn));
}

void Connection::removeSyncCleanup(CleanupIt it)
{
    const std::unique_lock<std::mutex> lock(m_loop.m_mutex);
    m_sync_cleanup_fns.erase(it);
}

void Connection::addAsyncCleanup(std::function<void()> fn)
{
    const std::unique_lock<std::mutex> lock(m_loop.m_mutex);
    // Add async cleanup callbacks to the back of the list. Unlike the sync
    // cleanup list, the order of this list is more significant because it
    // determines the order in which server objects are destroyed when there is
    // a sudden disconnect, and it is possible objects may need to be destroyed
    // in a certain order. This function is called in ProxyServerBase
    // destructors, and since capnp destroys ProxyServer objects in LIFO order,
    // we should preserve this order, and add cleanup callbacks to the end of
    // the list so they can be run starting from the beginning of the list.
    //
    // In Bitcoin Core, running these callbacks in the right order is
    // particularly important for the wallet process, because it uses blocking
    // shared_ptrs and requires Chain::Notification pointers owned by the node
    // process to be destroyed before the WalletLoader objects owned by the
    // node process, otherwise shared pointer counts of the CWallet objects
    // (which inherit from Chain::Notification) will not be 1 when the
    // WalletLoader destructor runs and it will wait forever for them to be
    // released.
    m_async_cleanup_fns.emplace(m_async_cleanup_fns.end(), std::move(fn));
}

EventLoop::EventLoop(const char* exe_name, LogFn log_fn, void* context)
    : m_exe_name(exe_name),
      m_io_context(kj::setupAsyncIo()),
      m_task_set(new kj::TaskSet(m_error_handler)),
      m_log_fn(std::move(log_fn)),
      m_context(context)
{
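    // Create a socketpair whose read end (m_wait_fd) is watched by the event
    // loop thread in loop(), and whose write end (m_post_fd) other threads
    // write a byte to in post() and removeClient() to wake the loop up.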
    int fds[2];
    KJ_SYSCALL(socketpair(AF_UNIX, SOCK_STREAM, 0, fds));
    m_wait_fd = fds[0];
    m_post_fd = fds[1];
}

EventLoop::~EventLoop()
{
    if (m_async_thread.joinable()) m_async_thread.join();
    const std::lock_guard<std::mutex> lock(m_mutex);
    KJ_ASSERT(m_post_fn == nullptr);
    KJ_ASSERT(m_async_fns.empty());
    KJ_ASSERT(m_wait_fd == -1);
    KJ_ASSERT(m_post_fd == -1);
    KJ_ASSERT(m_num_clients == 0);

    // Spin event loop, wait for any promises triggered by RPC shutdown.
    // auto cleanup = kj::evalLater([]{});
    // cleanup.wait(m_io_context.waitScope);
}

void EventLoop::loop()
{
    g_thread_context.loop_thread = true;
    KJ_DEFER(g_thread_context.loop_thread = false);

    kj::Own<kj::AsyncIoStream> wait_stream{
        m_io_context.lowLevelProvider->wrapSocketFd(m_wait_fd, kj::LowLevelAsyncIoProvider::TAKE_OWNERSHIP)};
    int post_fd{m_post_fd};
    char buffer = 0;
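    // The loop below blocks until another thread writes a byte to m_post_fd
    // via post() or removeClient(). Each wakeup runs the posted m_post_fn
    // callback if one is set, and the loop exits once there are no more
    // clients or queued async functions.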
    for (;;) {
        const size_t read_bytes = wait_stream->read(&buffer, 0, 1).wait(m_io_context.waitScope);
        if (read_bytes != 1) throw std::logic_error("EventLoop wait_stream closed unexpectedly");
        std::unique_lock<std::mutex> lock(m_mutex);
        if (m_post_fn) {
            Unlock(lock, *m_post_fn);
            m_post_fn = nullptr;
            m_cv.notify_all();
        } else if (done(lock)) {
            // Intentionally do not break if m_post_fn was set, even if done()
            // would return true, to ensure that the removeClient write(post_fd)
            // call always succeeds and the loop does not exit between the time
            // that the done condition is set and the write call is made.
            break;
        }
    }
    log() << "EventLoop::loop done, cancelling event listeners.";
    m_task_set.reset();
    log() << "EventLoop::loop bye.";
    wait_stream = nullptr;
    KJ_SYSCALL(::close(post_fd));
    const std::unique_lock<std::mutex> lock(m_mutex);
    m_wait_fd = -1;
    m_post_fd = -1;
}

void EventLoop::post(const std::function<void()>& fn)
{
    if (std::this_thread::get_id() == m_thread_id) {
        fn();
        return;
    }
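    // Called from a non-event-loop thread: hand fn off to the event loop
    // thread. Wait until any previously posted m_post_fn has been consumed,
    // publish fn, wake the loop by writing a byte to the post fd, then wait
    // until the loop thread has run fn and cleared m_post_fn.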
    std::unique_lock<std::mutex> lock(m_mutex);
    addClient(lock);
    m_cv.wait(lock, [this] { return m_post_fn == nullptr; });
    m_post_fn = &fn;
    int post_fd{m_post_fd};
    Unlock(lock, [&] {
        char buffer = 0;
        KJ_SYSCALL(write(post_fd, &buffer, 1));
    });
    m_cv.wait(lock, [this, &fn] { return m_post_fn != &fn; });
    removeClient(lock);
}

void EventLoop::addClient(std::unique_lock<std::mutex>& lock) { m_num_clients += 1; }

bool EventLoop::removeClient(std::unique_lock<std::mutex>& lock)
{
    m_num_clients -= 1;
    if (done(lock)) {
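        // The last client reference is gone and no async work remains: wake
        // the event loop thread (and the async worker thread) so they can
        // observe done() and exit.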
        m_cv.notify_all();
        int post_fd{m_post_fd};
        lock.unlock();
        char buffer = 0;
        KJ_SYSCALL(write(post_fd, &buffer, 1)); // NOLINT(bugprone-suspicious-semicolon)
        return true;
    }
    return false;
}

void EventLoop::startAsyncThread(std::unique_lock<std::mutex>& lock)
{
    if (m_async_thread.joinable()) {
        m_cv.notify_all();
    } else if (!m_async_fns.empty()) {
        m_async_thread = std::thread([this] {
            std::unique_lock<std::mutex> lock(m_mutex);
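            // Async worker: run queued async functions one at a time,
            // releasing m_mutex while each function executes, and exit once
            // the last client has been removed and the queue is empty.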
            while (true) {
                if (!m_async_fns.empty()) {
                    addClient(lock);
                    const std::function<void()> fn = std::move(m_async_fns.front());
                    m_async_fns.pop_front();
                    Unlock(lock, fn);
                    if (removeClient(lock)) break;
                    continue;
                } else if (m_num_clients == 0) {
                    break;
                }
                m_cv.wait(lock);
            }
        });
    }
}

bool EventLoop::done(std::unique_lock<std::mutex>& lock)
{
    assert(m_num_clients >= 0);
    assert(lock.owns_lock());
    assert(lock.mutex() == &m_mutex);
    return m_num_clients == 0 && m_async_fns.empty();
}

std::tuple<ConnThread, bool> SetThread(ConnThreads& threads, std::mutex& mutex, Connection* connection, const std::function<Thread::Client()>& make_thread)
{
    const std::unique_lock<std::mutex> lock(mutex);
    auto thread = threads.find(connection);
    if (thread != threads.end()) return {thread, false};
    thread = threads.emplace(
        std::piecewise_construct, std::forward_as_tuple(connection),
        std::forward_as_tuple(make_thread(), connection, /* destroy_connection= */ false)).first;
    thread->second.setCleanup([&threads, &mutex, thread] {
        // Note: it is safe to use the `thread` iterator in this cleanup
        // function, because the iterator would only be invalid if the map
        // entry was removed, and if the map entry is removed the
        // ProxyClient<Thread> destructor unregisters the cleanup.

        // The connection is being destroyed before the thread client is, so
        // reset the thread client's m_cleanup_it member so the thread client
        // destructor does not try to unregister this callback after the
        // connection is destroyed.
        thread->second.m_cleanup_it.reset();
        // Remove the connection pointer about to be destroyed from the map.
        const std::unique_lock<std::mutex> lock(mutex);
        threads.erase(thread);
    });
    return {thread, true};
}
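
// Illustrative call pattern for SetThread() (a sketch, not code from this
// file; `threads`, `mutex`, and `connection` are placeholder names): callers
// get at most one ProxyClient<Thread> per (ConnThreads map, Connection*)
// pair, so repeated calls over the same connection reuse the same remote
// Thread capability:
//
//     auto [it, inserted] = SetThread(threads, mutex, &connection,
//         /*make_thread=*/[&] { return /* Thread::Client obtained from a
//             ThreadMap.makeThread() call */; });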

ProxyClient<Thread>::~ProxyClient()
{
    // If the thread is being destroyed before the connection is destroyed,
    // remove the cleanup callback that was registered to handle the
    // connection being destroyed before the thread being destroyed.
    if (m_cleanup_it) {
        m_context.connection->removeSyncCleanup(*m_cleanup_it);
    }
}

void ProxyClient<Thread>::setCleanup(const std::function<void()>& fn)
{
    assert(fn);
    assert(!m_cleanup_it);
    m_cleanup_it = m_context.connection->addSyncCleanup(fn);
}

ProxyServer<Thread>::ProxyServer(ThreadContext& thread_context, std::thread&& thread)
    : m_thread_context(thread_context), m_thread(std::move(thread))
{
    assert(m_thread_context.waiter.get() != nullptr);
}

ProxyServer<Thread>::~ProxyServer()
{
    if (!m_thread.joinable()) return;
    // Stop the async thread and wait for it to exit. We need to wait because
    // the m_thread handle needs to outlive the thread to avoid a "terminate
    // called without an active exception" error. An alternative to waiting
    // would be to detach the thread, but this would introduce nondeterminism
    // which could make the code harder to debug or extend.
    assert(m_thread_context.waiter.get());
    std::unique_ptr<Waiter> waiter;
    {
        const std::unique_lock<std::mutex> lock(m_thread_context.waiter->m_mutex);
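        // Moving the waiter out of the thread context sets
        // m_thread_context.waiter to null, which is the shutdown signal the
        // makeThread() lambda below waits for.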
        waiter = std::move(m_thread_context.waiter);
        assert(!waiter->m_fn);
        // Clear client maps now to avoid deadlock in m_thread.join() call
        // below. The maps contain Thread::Client objects that need to be
        // destroyed from the event loop thread (this thread), which can't
        // happen if this thread is busy calling join.
        m_thread_context.request_threads.clear();
        m_thread_context.callback_threads.clear();
        waiter->m_cv.notify_all();
    }
    m_thread.join();
}

kj::Promise<void> ProxyServer<Thread>::getName(GetNameContext context)
{
    context.getResults().setResult(m_thread_context.thread_name);
    return kj::READY_NOW;
}


kj::Promise<void> ProxyServer<ThreadMap>::makeThread(MakeThreadContext context)
{
    const std::string from = context.getParams().getName();
    std::promise<ThreadContext*> thread_context;
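    // Spawn the worker thread, then block below on the promise until the
    // thread has published a pointer to its thread_local context, so the
    // ProxyServer<Thread> created afterwards can safely reference it.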
    std::thread thread([&thread_context, from, this]() {
        g_thread_context.thread_name = ThreadName(m_connection.m_loop.m_exe_name) + " (from " + from + ")";
        g_thread_context.waiter = std::make_unique<Waiter>();
        thread_context.set_value(&g_thread_context);
        std::unique_lock<std::mutex> lock(g_thread_context.waiter->m_mutex);
        // Wait for the shutdown signal from the ProxyServer<Thread> destructor
        // (the signal is just the waiter getting set to null).
        g_thread_context.waiter->wait(lock, [] { return !g_thread_context.waiter; });
    });
    auto thread_server = kj::heap<ProxyServer<Thread>>(*thread_context.get_future().get(), std::move(thread));
    auto thread_client = m_connection.m_threads.add(kj::mv(thread_server));
    context.getResults().setResult(kj::mv(thread_client));
    return kj::READY_NOW;
}

std::atomic<int> server_reqs{0};

std::string LongThreadName(const char* exe_name)
{
    return g_thread_context.thread_name.empty() ? ThreadName(exe_name) : g_thread_context.thread_name;
}

} // namespace mp