Bitcoin Core 30.99.0
P2P Digital Currency
test.cpp
Go to the documentation of this file.
1// Copyright (c) The Bitcoin Core developers
2// Distributed under the MIT software license, see the accompanying
3// file COPYING or http://www.opensource.org/licenses/mit-license.php.
4
5#include <mp/test/foo.capnp.h>
6#include <mp/test/foo.capnp.proxy.h>
7
8#include <atomic>
9#include <capnp/capability.h>
10#include <capnp/rpc.h>
11#include <condition_variable>
12#include <cstring>
13#include <functional>
14#include <future>
15#include <kj/async.h>
16#include <kj/async-io.h>
17#include <kj/common.h>
18#include <kj/debug.h>
19#include <kj/exception.h>
20#include <kj/memory.h>
21#include <kj/string.h>
22#include <kj/test.h>
23#include <memory>
24#include <mp/proxy.h>
25#include <mp/proxy.capnp.h>
26#include <mp/proxy-io.h>
27#include <mp/util.h>
28#include <optional>
29#include <set>
30#include <stdexcept>
31#include <string>
32#include <string_view>
33#include <system_error>
34#include <thread>
35#include <utility>
36#include <vector>
37
38namespace mp {
39namespace test {
40
56{
57public:
58 std::function<void()> server_disconnect;
59 std::function<void()> client_disconnect;
60 std::promise<std::unique_ptr<ProxyClient<messages::FooInterface>>> client_promise;
61 std::unique_ptr<ProxyClient<messages::FooInterface>> client;
65 std::thread thread;
66
67 TestSetup(bool client_owns_connection = true)
68 : thread{[&] {
69 EventLoop loop("mptest", [](mp::LogMessage log) {
70 // Info logs are not printed by default, but will be shown with `mptest --verbose`
71 KJ_LOG(INFO, log.level, log.message);
72 if (log.level == mp::Log::Raise) throw std::runtime_error(log.message);
73 });
74 auto pipe = loop.m_io_context.provider->newTwoWayPipe();
75
76 auto server_connection =
77 std::make_unique<Connection>(loop, kj::mv(pipe.ends[0]), [&](Connection& connection) {
78 auto server_proxy = kj::heap<ProxyServer<messages::FooInterface>>(
79 std::make_shared<FooImplementation>(), connection);
80 server = server_proxy;
81 return capnp::Capability::Client(kj::mv(server_proxy));
82 });
83 server_disconnect = [&] { loop.sync([&] { server_connection.reset(); }); };
84 // Set handler to destroy the server when the client disconnects. This
85 // is ignored if server_disconnect() is called instead.
86 server_connection->onDisconnect([&] { server_connection.reset(); });
87
88 auto client_connection = std::make_unique<Connection>(loop, kj::mv(pipe.ends[1]));
89 auto client_proxy = std::make_unique<ProxyClient<messages::FooInterface>>(
90 client_connection->m_rpc_system->bootstrap(ServerVatId().vat_id).castAs<messages::FooInterface>(),
91 client_connection.get(), /* destroy_connection= */ client_owns_connection);
92 if (client_owns_connection) {
93 client_connection.release();
94 } else {
95 client_disconnect = [&] { loop.sync([&] { client_connection.reset(); }); };
96 }
97
98 client_promise.set_value(std::move(client_proxy));
99 loop.loop();
100 }}
101 {
102 client = client_promise.get_future().get();
103 }
104
106 {
107 // Test that client cleanup_fns are executed.
108 bool destroyed = false;
109 client->m_context.cleanup_fns.emplace_front([&destroyed] { destroyed = true; });
110 client.reset();
111 KJ_EXPECT(destroyed);
112
113 thread.join();
114 }
115};
116
117KJ_TEST("Call FooInterface methods")
118{
120 ProxyClient<messages::FooInterface>* foo = setup.client.get();
121
122 KJ_EXPECT(foo->add(1, 2) == 3);
123 int ret;
124 foo->addOut(3, 4, ret);
125 KJ_EXPECT(ret == 7);
126 foo->addInOut(3, ret);
127 KJ_EXPECT(ret == 10);
128
129 FooStruct in;
130 in.name = "name";
131 in.setint.insert(2);
132 in.setint.insert(1);
133 in.vbool.push_back(false);
134 in.vbool.push_back(true);
135 in.vbool.push_back(false);
136 FooStruct out = foo->pass(in);
137 KJ_EXPECT(in.name == out.name);
138 KJ_EXPECT(in.setint.size() == out.setint.size());
139 for (auto init{in.setint.begin()}, outit{out.setint.begin()}; init != in.setint.end() && outit != out.setint.end(); ++init, ++outit) {
140 KJ_EXPECT(*init == *outit);
141 }
142 KJ_EXPECT(in.vbool.size() == out.vbool.size());
143 for (size_t i = 0; i < in.vbool.size(); ++i) {
144 KJ_EXPECT(in.vbool[i] == out.vbool[i]);
145 }
146
147 FooStruct err;
148 try {
149 foo->raise(in);
150 } catch (const FooStruct& e) {
151 err = e;
152 }
153 KJ_EXPECT(in.name == err.name);
154
155 class Callback : public ExtendedCallback
156 {
157 public:
158 Callback(int expect, int ret) : m_expect(expect), m_ret(ret) {}
159 int call(int arg) override
160 {
161 KJ_EXPECT(arg == m_expect);
162 return m_ret;
163 }
164 int callExtended(int arg) override
165 {
166 KJ_EXPECT(arg == m_expect + 10);
167 return m_ret + 10;
168 }
169 int m_expect, m_ret;
170 };
171
172 foo->initThreadMap();
173 Callback callback(1, 2);
174 KJ_EXPECT(foo->callback(callback, 1) == 2);
175 KJ_EXPECT(foo->callbackUnique(std::make_unique<Callback>(3, 4), 3) == 4);
176 KJ_EXPECT(foo->callbackShared(std::make_shared<Callback>(5, 6), 5) == 6);
177 auto saved = std::make_shared<Callback>(7, 8);
178 KJ_EXPECT(saved.use_count() == 1);
179 foo->saveCallback(saved);
180 KJ_EXPECT(saved.use_count() == 2);
181 foo->callbackSaved(7);
182 KJ_EXPECT(foo->callbackSaved(7) == 8);
183 foo->saveCallback(nullptr);
184 KJ_EXPECT(saved.use_count() == 1);
185 KJ_EXPECT(foo->callbackExtended(callback, 11) == 12);
186
187 FooCustom custom_in;
188 custom_in.v1 = "v1";
189 custom_in.v2 = 5;
190 FooCustom custom_out = foo->passCustom(custom_in);
191 KJ_EXPECT(custom_in.v1 == custom_out.v1);
192 KJ_EXPECT(custom_in.v2 == custom_out.v2);
193
194 foo->passEmpty(FooEmpty{});
195
196 FooMessage message1;
197 message1.message = "init";
198 FooMessage message2{foo->passMessage(message1)};
199 KJ_EXPECT(message2.message == "init build read call build read");
200
201 FooMutable mut;
202 mut.message = "init";
203 foo->passMutable(mut);
204 KJ_EXPECT(mut.message == "init build pass call return read");
205
206 KJ_EXPECT(foo->passFn([]{ return 10; }) == 10);
207}
208
209KJ_TEST("Call IPC method after client connection is closed")
210{
211 TestSetup setup{/*client_owns_connection=*/false};
212 ProxyClient<messages::FooInterface>* foo = setup.client.get();
213 KJ_EXPECT(foo->add(1, 2) == 3);
214 setup.client_disconnect();
215
216 bool disconnected{false};
217 try {
218 foo->add(1, 2);
219 } catch (const std::runtime_error& e) {
220 KJ_EXPECT(std::string_view{e.what()} == "IPC client method called after disconnect.");
221 disconnected = true;
222 }
223 KJ_EXPECT(disconnected);
224}
225
226KJ_TEST("Calling IPC method after server connection is closed")
227{
229 ProxyClient<messages::FooInterface>* foo = setup.client.get();
230 KJ_EXPECT(foo->add(1, 2) == 3);
231 setup.server_disconnect();
232
233 bool disconnected{false};
234 try {
235 foo->add(1, 2);
236 } catch (const std::runtime_error& e) {
237 KJ_EXPECT(std::string_view{e.what()} == "IPC client method call interrupted by disconnect.");
238 disconnected = true;
239 }
240 KJ_EXPECT(disconnected);
241}
242
243KJ_TEST("Calling IPC method and disconnecting during the call")
244{
245 TestSetup setup{/*client_owns_connection=*/false};
246 ProxyClient<messages::FooInterface>* foo = setup.client.get();
247 KJ_EXPECT(foo->add(1, 2) == 3);
248
249 // Set m_fn to initiate client disconnect when server is in the middle of
250 // handling the callFn call to make sure this case is handled cleanly.
251 setup.server->m_impl->m_fn = setup.client_disconnect;
252
253 bool disconnected{false};
254 try {
255 foo->callFn();
256 } catch (const std::runtime_error& e) {
257 KJ_EXPECT(std::string_view{e.what()} == "IPC client method call interrupted by disconnect.");
258 disconnected = true;
259 }
260 KJ_EXPECT(disconnected);
261}
262
263KJ_TEST("Calling IPC method, disconnecting and blocking during the call")
264{
265 // This test is similar to last test, except that instead of letting the IPC
266 // call return immediately after triggering a disconnect, make it disconnect
267 // & wait so server is forced to deal with having a disconnection and call
268 // in flight at the same time.
269 //
270 // Test uses callFnAsync() instead of callFn() to implement this. Both of
271 // these methods have the same implementation, but the callFnAsync() capnp
272 // method declaration takes an mp.Context argument so the method executes on
273 // an asynchronous thread instead of executing in the event loop thread, so
274 // it is able to block without deadlocking the event lock thread.
275 //
276 // This test adds important coverage because it causes the server Connection
277 // object to be destroyed before ProxyServer object, which is not a
278 // condition that usually happens because the m_rpc_system.reset() call in
279 // the ~Connection destructor usually would immediately free all remaining
280 // ProxyServer objects associated with the connection. Having an in-progress
281 // RPC call requires keeping the ProxyServer longer.
282
283 std::promise<void> signal;
284 TestSetup setup{/*client_owns_connection=*/false};
285 ProxyClient<messages::FooInterface>* foo = setup.client.get();
286 KJ_EXPECT(foo->add(1, 2) == 3);
287
288 foo->initThreadMap();
289 setup.server->m_impl->m_fn = [&] {
290 EventLoopRef loop{*setup.server->m_context.loop};
291 setup.client_disconnect();
292 signal.get_future().get();
293 };
294
295 bool disconnected{false};
296 try {
297 foo->callFnAsync();
298 } catch (const std::runtime_error& e) {
299 KJ_EXPECT(std::string_view{e.what()} == "IPC client method call interrupted by disconnect.");
300 disconnected = true;
301 }
302 KJ_EXPECT(disconnected);
303
304 // Now that the disconnect has been detected, set signal allowing the
305 // callFnAsync() IPC call to return. Since signalling may not wake up the
306 // thread right away, it is important for the signal variable to be declared
307 // *before* the TestSetup variable so is not destroyed while
308 // signal.get_future().get() is called.
309 signal.set_value();
310}
311
312KJ_TEST("Make simultaneous IPC calls to trigger 'thread busy' error")
313{
315 ProxyClient<messages::FooInterface>* foo = setup.client.get();
316 std::promise<void> signal;
317
318 foo->initThreadMap();
319 // Use callFnAsync() to get the client to set up the request_thread
320 // that will be used for the test.
321 setup.server->m_impl->m_fn = [&] {};
322 foo->callFnAsync();
324 Thread::Client *callback_thread, *request_thread;
325 foo->m_context.loop->sync([&] {
326 Lock lock(tc.waiter->m_mutex);
327 callback_thread = &tc.callback_threads.at(foo->m_context.connection)->m_client;
328 request_thread = &tc.request_threads.at(foo->m_context.connection)->m_client;
329 });
330
331 setup.server->m_impl->m_fn = [&] {
332 try
333 {
334 signal.get_future().get();
335 }
336 catch (const std::future_error& e)
337 {
338 KJ_EXPECT(e.code() == std::make_error_code(std::future_errc::future_already_retrieved));
339 }
340 };
341
342 auto client{foo->m_client};
343 bool caught_thread_busy = false;
344 // NOTE: '3' was chosen because it was the lowest number
345 // of simultaneous calls required to reliably catch a "thread busy" error
346 std::atomic<size_t> running{3};
347 foo->m_context.loop->sync([&]
348 {
349 for (size_t i = 0; i < running; i++)
350 {
351 auto request{client.callFnAsyncRequest()};
352 auto context{request.initContext()};
353 context.setCallbackThread(*callback_thread);
354 context.setThread(*request_thread);
355 foo->m_context.loop->m_task_set->add(request.send().then(
356 [&](auto&& results) {
357 running -= 1;
358 tc.waiter->m_cv.notify_all();
359 },
360 [&](kj::Exception&& e) {
361 KJ_EXPECT(std::string_view{e.getDescription().cStr()} ==
362 "remote exception: std::exception: thread busy");
363 caught_thread_busy = true;
364 running -= 1;
365 signal.set_value();
366 tc.waiter->m_cv.notify_all();
367 }
368 ));
369 }
370 });
371 {
372 Lock lock(tc.waiter->m_mutex);
373 tc.waiter->wait(lock, [&running] { return running == 0; });
374 }
375 KJ_EXPECT(caught_thread_busy);
376}
377
378} // namespace test
379} // namespace mp
int ret
Object holding network & rpc state associated with either an incoming server connection,...
Definition: proxy-io.h:377
Event loop implementation.
Definition: proxy-io.h:214
kj::AsyncIoContext m_io_context
Capnp IO context.
Definition: proxy-io.h:302
Event loop smart pointer automatically managing m_num_clients.
Definition: proxy.h:51
Definition: util.h:170
Test setup class creating a two way connection between a ProxyServer<FooInterface> object and a Proxy...
Definition: test.cpp:56
std::promise< std::unique_ptr< ProxyClient< messages::FooInterface > > > client_promise
Definition: test.cpp:60
TestSetup(bool client_owns_connection=true)
Definition: test.cpp:67
std::function< void()> server_disconnect
Definition: test.cpp:58
std::function< void()> client_disconnect
Definition: test.cpp:59
std::thread thread
Thread variable should be after other struct members so the thread does not start until the other mem...
Definition: test.cpp:65
ProxyServer< messages::FooInterface > * server
Definition: test.cpp:62
std::unique_ptr< ProxyClient< messages::FooInterface > > client
Definition: test.cpp:61
KJ_TEST("Make simultaneous IPC calls to trigger 'thread busy' error")
Definition: test.cpp:312
Functions to serialize / deserialize common bitcoin types.
Definition: common-types.h:57
thread_local ThreadContext g_thread_context
Definition: proxy.cpp:41
Log level
The severity level of this message.
Definition: proxy-io.h:119
std::string message
Message to be logged.
Definition: proxy-io.h:116
Mapping from capnp interface type to proxy client implementation (specializations are generated by pr...
Definition: proxy.h:25
Vat id for server side of connection.
Definition: proxy-io.h:446
The thread_local ThreadContext g_thread_context struct provides information about individual threads ...
Definition: proxy-io.h:621
std::string v1
Definition: foo.h:30
std::string message
Definition: foo.h:40
std::string message
Definition: foo.h:45
std::string name
Definition: foo.h:21
std::set< int > setint
Definition: foo.h:22
std::vector< bool > vbool
Definition: foo.h:23
static int setup(void)
Definition: tests.c:7796
#define expect(bit)