5#include <mp/test/foo.capnp.h>
6#include <mp/test/foo.capnp.proxy.h>
9#include <capnp/capability.h>
11#include <condition_variable>
16#include <kj/async-io.h>
19#include <kj/exception.h>
25#include <mp/proxy.capnp.h>
33#include <system_error>
60 std::promise<std::unique_ptr<ProxyClient<messages::FooInterface>>>
client_promise;
61 std::unique_ptr<ProxyClient<messages::FooInterface>>
client;
76 auto server_connection =
77 std::make_unique<Connection>(loop, kj::mv(pipe.ends[0]), [&](
Connection& connection) {
78 auto server_proxy = kj::heap<ProxyServer<messages::FooInterface>>(
79 std::make_shared<FooImplementation>(), connection);
80 server = server_proxy;
81 return capnp::Capability::Client(kj::mv(server_proxy));
86 server_connection->onDisconnect([&] { server_connection.reset(); });
88 auto client_connection = std::make_unique<Connection>(loop, kj::mv(pipe.ends[1]));
89 auto client_proxy = std::make_unique<ProxyClient<messages::FooInterface>>(
90 client_connection->m_rpc_system->bootstrap(
ServerVatId().vat_id).castAs<messages::FooInterface>(),
91 client_connection.get(), client_owns_connection);
92 if (client_owns_connection) {
93 client_connection.release();
102 client = client_promise.get_future().get();
108 bool destroyed =
false;
109 client->m_context.cleanup_fns.emplace_front([&destroyed] { destroyed =
true; });
111 KJ_EXPECT(destroyed);
122 KJ_EXPECT(foo->add(1, 2) == 3);
124 foo->addOut(3, 4,
ret);
126 foo->addInOut(3,
ret);
127 KJ_EXPECT(
ret == 10);
133 in.
vbool.push_back(
false);
134 in.
vbool.push_back(
true);
135 in.
vbool.push_back(
false);
137 KJ_EXPECT(in.
name ==
out.name);
138 KJ_EXPECT(in.
setint.size() ==
out.setint.size());
140 KJ_EXPECT(*
init == *outit);
142 KJ_EXPECT(in.
vbool.size() ==
out.vbool.size());
143 for (
size_t i = 0; i < in.
vbool.size(); ++i) {
144 KJ_EXPECT(in.
vbool[i] ==
out.vbool[i]);
159 int call(
int arg)
override
161 KJ_EXPECT(arg == m_expect);
164 int callExtended(
int arg)
override
166 KJ_EXPECT(arg == m_expect + 10);
172 foo->initThreadMap();
173 Callback callback(1, 2);
174 KJ_EXPECT(foo->callback(callback, 1) == 2);
175 KJ_EXPECT(foo->callbackUnique(std::make_unique<Callback>(3, 4), 3) == 4);
176 KJ_EXPECT(foo->callbackShared(std::make_shared<Callback>(5, 6), 5) == 6);
177 auto saved = std::make_shared<Callback>(7, 8);
178 KJ_EXPECT(saved.use_count() == 1);
179 foo->saveCallback(saved);
180 KJ_EXPECT(saved.use_count() == 2);
181 foo->callbackSaved(7);
182 KJ_EXPECT(foo->callbackSaved(7) == 8);
183 foo->saveCallback(
nullptr);
184 KJ_EXPECT(saved.use_count() == 1);
185 KJ_EXPECT(foo->callbackExtended(callback, 11) == 12);
190 FooCustom custom_out = foo->passCustom(custom_in);
191 KJ_EXPECT(custom_in.
v1 == custom_out.
v1);
192 KJ_EXPECT(custom_in.
v2 == custom_out.
v2);
198 FooMessage message2{foo->passMessage(message1)};
199 KJ_EXPECT(message2.message ==
"init build read call build read");
203 foo->passMutable(mut);
204 KJ_EXPECT(mut.
message ==
"init build pass call return read");
206 KJ_EXPECT(foo->passFn([]{ return 10; }) == 10);
209KJ_TEST(
"Call IPC method after client connection is closed")
213 KJ_EXPECT(foo->add(1, 2) == 3);
214 setup.client_disconnect();
216 bool disconnected{
false};
219 }
catch (
const std::runtime_error& e) {
220 KJ_EXPECT(std::string_view{e.what()} ==
"IPC client method called after disconnect.");
223 KJ_EXPECT(disconnected);
226KJ_TEST(
"Calling IPC method after server connection is closed")
230 KJ_EXPECT(foo->add(1, 2) == 3);
231 setup.server_disconnect();
233 bool disconnected{
false};
236 }
catch (
const std::runtime_error& e) {
237 KJ_EXPECT(std::string_view{e.what()} ==
"IPC client method call interrupted by disconnect.");
240 KJ_EXPECT(disconnected);
243KJ_TEST(
"Calling IPC method and disconnecting during the call")
247 KJ_EXPECT(foo->add(1, 2) == 3);
251 setup.server->m_impl->m_fn =
setup.client_disconnect;
253 bool disconnected{
false};
256 }
catch (
const std::runtime_error& e) {
257 KJ_EXPECT(std::string_view{e.what()} ==
"IPC client method call interrupted by disconnect.");
260 KJ_EXPECT(disconnected);
263KJ_TEST(
"Calling IPC method, disconnecting and blocking during the call")
283 std::promise<void> signal;
286 KJ_EXPECT(foo->add(1, 2) == 3);
288 foo->initThreadMap();
289 setup.server->m_impl->m_fn = [&] {
291 setup.client_disconnect();
292 signal.get_future().get();
295 bool disconnected{
false};
298 }
catch (
const std::runtime_error& e) {
299 KJ_EXPECT(std::string_view{e.what()} ==
"IPC client method call interrupted by disconnect.");
302 KJ_EXPECT(disconnected);
312KJ_TEST(
"Make simultaneous IPC calls to trigger 'thread busy' error")
316 std::promise<void> signal;
318 foo->initThreadMap();
321 setup.server->m_impl->m_fn = [&] {};
324 Thread::Client *callback_thread, *request_thread;
325 foo->m_context.loop->sync([&] {
326 Lock lock(tc.waiter->m_mutex);
327 callback_thread = &tc.callback_threads.at(foo->m_context.connection)->m_client;
328 request_thread = &tc.request_threads.at(foo->m_context.connection)->m_client;
331 setup.server->m_impl->m_fn = [&] {
334 signal.get_future().get();
336 catch (
const std::future_error& e)
338 KJ_EXPECT(e.code() == std::make_error_code(std::future_errc::future_already_retrieved));
342 auto client{foo->m_client};
343 bool caught_thread_busy =
false;
346 std::atomic<size_t> running{3};
347 foo->m_context.loop->sync([&]
349 for (
size_t i = 0; i < running; i++)
351 auto request{client.callFnAsyncRequest()};
352 auto context{request.initContext()};
353 context.setCallbackThread(*callback_thread);
354 context.setThread(*request_thread);
355 foo->m_context.loop->m_task_set->add(request.send().then(
356 [&](
auto&& results) {
358 tc.waiter->m_cv.notify_all();
360 [&](kj::Exception&& e) {
361 KJ_EXPECT(std::string_view{e.getDescription().cStr()} ==
362 "remote exception: std::exception: thread busy");
363 caught_thread_busy =
true;
366 tc.waiter->m_cv.notify_all();
372 Lock lock(tc.waiter->m_mutex);
373 tc.waiter->wait(lock, [&running] {
return running == 0; });
375 KJ_EXPECT(caught_thread_busy);
Object holding network & rpc state associated with either an incoming server connection, or an outgoing client connection.
Event loop implementation.
kj::AsyncIoContext m_io_context
Capnp IO context.
Event loop smart pointer automatically managing m_num_clients.
Test setup class creating a two way connection between a ProxyServer<FooInterface> object and a ProxyClient<FooInterface> object.
std::promise< std::unique_ptr< ProxyClient< messages::FooInterface > > > client_promise
TestSetup(bool client_owns_connection=true)
std::function< void()> server_disconnect
std::function< void()> client_disconnect
std::thread thread
Thread variable should be after other struct members so the thread does not start until the other members are initialized.
ProxyServer< messages::FooInterface > * server
std::unique_ptr< ProxyClient< messages::FooInterface > > client
KJ_TEST("Make simultaneous IPC calls to trigger 'thread busy' error")
Functions to serialize / deserialize common bitcoin types.
thread_local ThreadContext g_thread_context
Log level
The severity level of this message.
std::string message
Message to be logged.
Mapping from capnp interface type to proxy client implementation (specializations are generated by pr...
Vat id for server side of connection.
The thread_local ThreadContext g_thread_context struct provides information about individual threads ...
std::vector< bool > vbool