5#include <mp/test/foo.capnp.h>
6#include <mp/test/foo.capnp.proxy.h>
9#include <capnp/capability.h>
13#include <condition_variable>
19#include <kj/async-io.h>
26#include <mp/proxy.capnp.h>
// Compile-time checks that the version constants kMP_MAJOR_VERSION and
// kMP_MINOR_VERSION both have integral types.
46static_assert(std::is_integral_v<
decltype(
kMP_MAJOR_VERSION)>,
"MP_MAJOR_VERSION must be an integral constant");
47static_assert(std::is_integral_v<
decltype(
kMP_MINOR_VERSION)>,
"MP_MINOR_VERSION must be an integral constant");
// NOTE(review): this listing omits several lines of the enclosing definition
// (class header, loop/pipe creation, closing braces) — confirm against the
// full file before relying on these notes.
//
// client_promise carries the ProxyClient out of the setup code; `client` is
// later assigned from client_promise.get_future().get() (see the last line of
// this fragment).
69 std::promise<std::unique_ptr<ProxyClient<messages::FooInterface>>>
client_promise;
70 std::unique_ptr<ProxyClient<messages::FooInterface>>
client;
// Server side: a Connection over pipe.ends[0]. The callback builds a
// ProxyServer wrapping a new FooImplementation, stores a pointer to it in
// `server`, and returns it as a capnp::Capability::Client.
85 auto server_connection =
86 std::make_unique<Connection>(loop, kj::mv(pipe.ends[0]), [&](
Connection& connection) {
87 auto server_proxy = kj::heap<ProxyServer<messages::FooInterface>>(
88 std::make_shared<FooImplementation>(), connection);
89 server = server_proxy;
90 return capnp::Capability::Client(kj::mv(server_proxy));
// Asserts the caller is on the thread recorded in loop.m_thread_id, then
// queues a task (via kj::evalLater) that resets server_connection.
94 assert(std::this_thread::get_id() == loop.m_thread_id);
95 loop.m_task_set->add(kj::evalLater([&] { server_connection.reset(); }));
// Reset the server connection object when a disconnect is reported.
99 server_connection->onDisconnect([&] { server_connection.reset(); });
// Client side: a Connection over pipe.ends[1], and a ProxyClient created from
// the bootstrap capability of that connection's RPC system.
101 auto client_connection = std::make_unique<Connection>(loop, kj::mv(pipe.ends[1]));
102 auto client_proxy = std::make_unique<ProxyClient<messages::FooInterface>>(
103 client_connection->m_rpc_system->bootstrap(
ServerVatId().vat_id).castAs<messages::FooInterface>(),
104 client_connection.get(), client_owns_connection);
// When client_owns_connection is set, the unique_ptr gives up ownership
// without deleting the Connection (presumably the ProxyClient is now
// responsible for it — TODO confirm).
105 if (client_owns_connection) {
106 (void)client_connection.release();
// Blocks until the promise is fulfilled and takes the ProxyClient from it.
115 client = client_promise.get_future().get();
// Registers a cleanup function at the front of the client's
// m_context.cleanup_fns list that sets `destroyed`, then expects the flag to
// be set.
// NOTE(review): the statement between registration and the check (original
// line 123) is not in this listing; presumably it destroys the client —
// confirm.
121 bool destroyed =
false;
122 client->m_context.cleanup_fns.emplace_front([&destroyed] { destroyed =
true; });
124 KJ_EXPECT(destroyed);
// Fragment of a test body exercising basic calls on `foo`.
// NOTE(review): the declarations of `foo`, `ret`, `in`, `out`, `init` and
// `outit`, and the call that produces `out`, are not in this listing.
//
// add(1, 2) is expected to return 3. addOut(3, 4, ret) followed by
// addInOut(3, ret) is expected to leave ret == 10.
135 KJ_EXPECT(foo->add(1, 2) == 3);
137 foo->addOut(3, 4,
ret);
139 foo->addInOut(3,
ret);
140 KJ_EXPECT(
ret == 10);
// Fill in.vbool with {false, true, false}.
146 in.
vbool.push_back(
false);
147 in.
vbool.push_back(
true);
148 in.
vbool.push_back(
false);
// Compare `in` against `out`: name, setint size, dereferenced iterators, and
// vbool size and elements.
150 KJ_EXPECT(in.
name ==
out.name);
151 KJ_EXPECT(in.
setint.size() ==
out.setint.size());
153 KJ_EXPECT(*
init == *outit);
155 KJ_EXPECT(in.
vbool.size() ==
out.vbool.size());
156 for (
size_t i = 0; i < in.
vbool.size(); ++i) {
157 KJ_EXPECT(in.
vbool[i] ==
out.vbool[i]);
// Override of call(): expects the argument to equal m_expect.
// NOTE(review): the return statement and braces are not in this listing.
172 int call(
int arg)
override
174 KJ_EXPECT(arg == m_expect);
// Override of callExtended(): expects the argument to equal m_expect + 10.
// NOTE(review): the return statement and braces are not in this listing.
177 int callExtended(
int arg)
override
179 KJ_EXPECT(arg == m_expect + 10);
// Fragment of a test body exercising callback passing and value round trips.
// NOTE(review): the enclosing test header and the declarations of `foo`,
// `custom_in`, `message1` and `mut` are not in this listing.
185 foo->initThreadMap();
// Pass a Callback by reference, by unique_ptr and by shared_ptr. Each call is
// expected to return the second constructor argument when invoked with the
// first.
186 Callback callback(1, 2);
187 KJ_EXPECT(foo->callback(callback, 1) == 2);
188 KJ_EXPECT(foo->callbackUnique(std::make_unique<Callback>(3, 4), 3) == 4);
189 KJ_EXPECT(foo->callbackShared(std::make_shared<Callback>(5, 6), 5) == 6);
// Saved-callback checks: use_count is expected to rise from 1 to 2 after
// saveCallback(saved) and return to 1 after saveCallback(nullptr).
190 auto saved = std::make_shared<Callback>(7, 8);
191 KJ_EXPECT(saved.use_count() == 1);
192 foo->saveCallback(saved);
193 KJ_EXPECT(saved.use_count() == 2);
// callbackSaved(7) is called twice; only the second result is checked.
194 foo->callbackSaved(7);
195 KJ_EXPECT(foo->callbackSaved(7) == 8);
196 foo->saveCallback(
nullptr);
197 KJ_EXPECT(saved.use_count() == 1);
198 KJ_EXPECT(foo->callbackExtended(callback, 11) == 12);
// passCustom() is expected to return a FooCustom whose v1 and v2 match the
// input.
203 FooCustom custom_out = foo->passCustom(custom_in);
204 KJ_EXPECT(custom_in.
v1 == custom_out.
v1);
205 KJ_EXPECT(custom_in.
v2 == custom_out.
v2);
// passMessage() and passMutable() are checked against fixed message strings.
211 FooMessage message2{foo->passMessage(message1)};
212 KJ_EXPECT(message2.message ==
"init build read call build read");
216 foo->passMutable(mut);
217 KJ_EXPECT(mut.
message ==
"init build pass call return read");
// passFn() is given a lambda returning 10 and is expected to return 10.
219 KJ_EXPECT(foo->passFn([]{ return 10; }) == 10);
// passDataPointers() is given one non-null entry ('H','i') and one null
// entry. The result must have two entries: the first non-null and equal to
// the input, the second null.
221 std::vector<FooDataRef> data_in;
222 data_in.push_back(std::make_shared<FooData>(
FooData{
'H',
'i'}));
223 data_in.push_back(
nullptr);
224 std::vector<FooDataRef> data_out{foo->passDataPointers(data_in)};
225 KJ_EXPECT(data_out.size() == 2);
// KJ_REQUIRE rather than KJ_EXPECT here: the next line dereferences
// data_out[0].
226 KJ_REQUIRE(data_out[0] !=
nullptr);
227 KJ_EXPECT(*data_out[0] == *data_in[0]);
228 KJ_EXPECT(!data_out[1]);
// Test: after one successful add(1, 2) call, the client side is disconnected
// with setup.client_disconnect(). The visible catch clause expects a
// std::runtime_error with the message
// "IPC client method called after disconnect.", and the test finally expects
// `disconnected` to be true.
// NOTE(review): the setup construction, the `try` block and the statement
// that sets `disconnected` are not in this listing.
231KJ_TEST(
"Call IPC method after client connection is closed")
235 KJ_EXPECT(foo->add(1, 2) == 3);
236 setup.client_disconnect();
238 bool disconnected{
false};
241 }
catch (
const std::runtime_error& e) {
242 KJ_EXPECT(std::string_view{e.what()} ==
"IPC client method called after disconnect.");
245 KJ_EXPECT(disconnected);
// Test: after one successful add(1, 2) call, the server side is disconnected
// with setup.server_disconnect(). The visible catch clause expects a
// std::runtime_error with the message
// "IPC client method call interrupted by disconnect.", and the test finally
// expects `disconnected` to be true.
// NOTE(review): the setup construction, the `try` block and the statement
// that sets `disconnected` are not in this listing.
248KJ_TEST(
"Calling IPC method after server connection is closed")
252 KJ_EXPECT(foo->add(1, 2) == 3);
253 setup.server_disconnect();
255 bool disconnected{
false};
258 }
catch (
const std::runtime_error& e) {
259 KJ_EXPECT(std::string_view{e.what()} ==
"IPC client method call interrupted by disconnect.");
262 KJ_EXPECT(disconnected);
// Test: after one successful add(1, 2) call, the server implementation's m_fn
// hook is set to setup.client_disconnect. The visible catch clause expects a
// std::runtime_error with the message
// "IPC client method call interrupted by disconnect.", and the test finally
// expects `disconnected` to be true.
// NOTE(review): the call that triggers m_fn, the `try` block and the
// statement that sets `disconnected` are not in this listing.
265KJ_TEST(
"Calling IPC method and disconnecting during the call")
269 KJ_EXPECT(foo->add(1, 2) == 3);
273 setup.server->m_impl->m_fn =
setup.client_disconnect;
275 bool disconnected{
false};
278 }
catch (
const std::runtime_error& e) {
279 KJ_EXPECT(std::string_view{e.what()} ==
"IPC client method call interrupted by disconnect.");
282 KJ_EXPECT(disconnected);
// Test: the server implementation's m_fn hook is set to a lambda that calls
// setup.client_disconnect() and then blocks on the `signal` promise's future.
// The visible catch clause expects a std::runtime_error with the message
// "IPC client method call interrupted by disconnect.", and the test finally
// expects `disconnected` to be true.
// NOTE(review): the call that triggers m_fn, the code that fulfils `signal`,
// the `try` block and the statement that sets `disconnected` are not in this
// listing.
285KJ_TEST(
"Calling IPC method, disconnecting and blocking during the call")
305 std::promise<void> signal;
308 KJ_EXPECT(foo->add(1, 2) == 3);
310 foo->initThreadMap();
311 setup.server->m_impl->m_fn = [&] {
313 setup.client_disconnect();
314 signal.get_future().get();
317 bool disconnected{
false};
320 }
catch (
const std::runtime_error& e) {
321 KJ_EXPECT(std::string_view{e.what()} ==
"IPC client method call interrupted by disconnect.");
324 KJ_EXPECT(disconnected);
// Test: visible steps are initThreadMap(), a no-op m_fn hook, a call to
// setup.server_disconnect_later(), and a 10 ms sleep. The visible catch clause
// expects a std::runtime_error with the message
// "IPC client method call interrupted by disconnect.", and the test finally
// expects `disconnected` to be true.
// NOTE(review): the surrounding hook installation, the `try` block and the
// statement that sets `disconnected` are not in this listing, so where the
// sleep and the disconnect run cannot be determined from here.
334KJ_TEST(
"Worker thread destroyed before it is initialized")
345 foo->initThreadMap();
346 setup.server->m_impl->m_fn = [] {};
353 setup.server_disconnect_later();
358 std::this_thread::sleep_for(std::chrono::milliseconds(10));
361 bool disconnected{
false};
364 }
catch (
const std::runtime_error& e) {
365 KJ_EXPECT(std::string_view{e.what()} ==
"IPC client method call interrupted by disconnect.");
368 KJ_EXPECT(disconnected);
// Test: visible steps are initThreadMap(), a no-op m_fn hook, a call to
// setup.server_disconnect(), and a 10 ms sleep. The visible catch clause
// expects a std::runtime_error with the message
// "IPC client method call interrupted by disconnect.".
// NOTE(review): the surrounding hook installation and the `try` block are
// not in this listing; no `disconnected` check is visible for this test.
371KJ_TEST(
"Calling async IPC method, with server disconnect racing the call")
382 foo->initThreadMap();
383 setup.server->m_impl->m_fn = [] {};
387 setup.server_disconnect();
390 std::this_thread::sleep_for(std::chrono::milliseconds(10));
396 }
catch (
const std::runtime_error& e) {
397 KJ_EXPECT(std::string_view{e.what()} ==
"IPC client method call interrupted by disconnect.");
// Test: visible steps are initThreadMap(), a no-op m_fn hook, and a call to
// setup.server_disconnect(). The visible catch clause expects a
// std::runtime_error with the message
// "IPC client method call interrupted by disconnect.".
// NOTE(review): the surrounding hook installation and the `try` block are
// not in this listing; no `disconnected` check is visible for this test.
401KJ_TEST(
"Calling async IPC method, with server disconnect after cleanup")
414 foo->initThreadMap();
415 setup.server->m_impl->m_fn = [] {};
419 setup.server_disconnect();
425 }
catch (
const std::runtime_error& e) {
426 KJ_EXPECT(std::string_view{e.what()} ==
"IPC client method call interrupted by disconnect.");
// Test: sends three callIntFnAsync requests that all name the same callback
// thread and request thread, then waits for all three to complete.
// NOTE(review): several lines are not in this listing, including the
// declaration of `tc`, the body of the m_int_fn hook, and the decrement of
// `running`.
430KJ_TEST(
"Make simultaneous IPC calls on single remote thread")
434 std::promise<void> signal;
436 foo->initThreadMap();
439 setup.server->m_impl->m_fn = [&] {};
// On the event loop, and holding tc.waiter->m_mutex, look up the thread
// clients registered for this connection in tc.callback_threads and
// tc.request_threads.
442 Thread::Client *callback_thread, *request_thread;
443 foo->m_context.loop->sync([&] {
444 Lock lock(tc.waiter->m_mutex);
445 callback_thread = &tc.callback_threads.at(foo->m_context.connection)->m_client;
446 request_thread = &tc.request_threads.at(foo->m_context.connection)->m_client;
// `expected` starts at 100 and is checked against 400 at the end of the test;
// the hook body that updates it is not in this listing.
450 std::atomic<int> expected = 100;
452 setup.server->m_impl->m_int_fn = [&](
int n) {
// Issue the requests from the event loop. Request i carries the argument
// 100 * (i+1), and the same value is asserted on its result.
458 auto client{foo->m_client};
459 std::atomic<size_t> running{3};
460 foo->m_context.loop->sync([&]
462 for (
size_t i = 0; i < running; i++)
464 auto request{client.callIntFnAsyncRequest()};
465 auto context{request.initContext()};
466 context.setCallbackThread(*callback_thread);
467 context.setThread(*request_thread);
468 request.setArg(100 * (i+1));
469 foo->m_context.loop->m_task_set->add(request.send().then(
470 [&running, &tc, i](
auto&& results) {
471 assert(results.getResult() == static_cast<int32_t>(100 * (i+1)));
473 tc.waiter->m_cv.notify_all();
// Wait on tc.waiter until `running` reaches 0, then check the final value of
// `expected`.
478 Lock lock(tc.waiter->m_mutex);
479 tc.waiter->wait(lock, [&running] {
return running == 0; });
481 KJ_EXPECT(expected == 400);
Object holding network & rpc state associated with either an incoming server connection, or an outgoing client connection.
Event loop implementation.
kj::AsyncIoContext m_io_context
Capnp IO context.
std::function< void()> testing_hook_makethread
Hook called when ProxyServer<ThreadMap>::makeThread() is called.
std::function< void()> testing_hook_makethread_created
Hook called on the worker thread inside makeThread(), after the thread context is set up and thread_c...
std::function< void()> testing_hook_async_request_done
Hook called on the worker thread just before returning results.
std::function< void()> testing_hook_async_request_start
Hook called on the worker thread when it starts to execute an async request.
Event loop smart pointer automatically managing m_num_clients.
Test setup class creating a two way connection between a ProxyServer<FooInterface> object and a ProxyClient<FooInterface> object.
std::promise< std::unique_ptr< ProxyClient< messages::FooInterface > > > client_promise
TestSetup(bool client_owns_connection=true)
std::function< void()> server_disconnect
std::function< void()> client_disconnect
std::thread thread
Thread variable should be after other struct members so the thread does not start until the other members are initialized.
std::function< void()> server_disconnect_later
ProxyServer< messages::FooInterface > * server
std::unique_ptr< ProxyClient< messages::FooInterface > > client
std::vector< char > FooData
constexpr auto kMP_MAJOR_VERSION
Check version.h header values.
constexpr auto kMP_MINOR_VERSION
Functions to serialize / deserialize common bitcoin types.
thread_local ThreadContext g_thread_context
KJ_TEST("SpawnProcess does not run callback in child")
Log level
The severity level of this message.
std::string message
Message to be logged.
Mapping from capnp interface type to proxy client implementation (specializations are generated by proxy-codegen.cpp).
Vat id for server side of connection.
The thread_local ThreadContext g_thread_context struct provides information about individual threads ...
std::vector< bool > vbool
Major and minor version numbers.
#define MP_MAJOR_VERSION
Major version number.
#define MP_MINOR_VERSION
Minor version number.