23void ConnmanTestMsg::Handshake(
CNode&
node,
24 bool successfully_connected,
30 auto& peerman{
static_cast<PeerManager&
>(*m_msgproc)};
34 peerman.SendMessages(&
node);
52 (void)connman.ReceiveMsgFrom(
node, std::move(msg_version));
53 node.fPauseSend =
false;
54 connman.ProcessMessagesOnce(
node);
55 peerman.SendMessages(&
node);
57 if (
node.fDisconnect)
return;
61 assert(peerman.GetNodeStateStats(
node.GetId(), statestats));
64 if (successfully_connected) {
66 (void)connman.ReceiveMsgFrom(
node, std::move(msg_verack));
67 node.fPauseSend =
false;
68 connman.ProcessMessagesOnce(
node);
69 peerman.SendMessages(&
node);
76 assert(
node.ReceiveMsgBytes(msg_bytes, complete));
78 node.MarkReceivedMsgsForProcessing();
85 node.vSendMsg.clear();
86 node.m_send_memusage = 0;
88 const auto& [to_send, _more, _msg_type] =
node.m_transport->GetBytesToSend(
false);
89 if (to_send.empty())
break;
90 node.m_transport->MarkBytesSent(to_send.size());
96 bool queued =
node.m_transport->SetMessageToSend(ser_msg);
100 const auto& [to_send, _more, _msg_type] =
node.m_transport->GetBytesToSend(
false);
101 if (to_send.empty())
break;
103 node.m_transport->MarkBytesSent(to_send.size());
111 if (!
node)
return nullptr;
114 node->fSuccessfullyConnected =
true;
121 std::vector<NodeEvictionCandidate> candidates;
122 candidates.reserve(n_candidates);
123 for (
int id = 0;
id < n_candidates; ++id) {
124 candidates.push_back({
126 .m_connected=std::chrono::seconds{random_context.
randrange(100)},
127 .m_min_ping_time=std::chrono::microseconds{random_context.
randrange(100)},
128 .m_last_block_time=std::chrono::seconds{random_context.
randrange(100)},
129 .m_last_tx_time=std::chrono::seconds{random_context.
randrange(100)},
130 .fRelevantServices=random_context.
randbool(),
131 .m_relay_txs=random_context.
randbool(),
132 .fBloomFilter=random_context.
randbool(),
133 .nKeyedNetGroup=random_context.
randrange(100u),
134 .prefer_evict=random_context.
randbool(),
135 .m_is_local=random_context.
randbool(),
158 memset(buf, 0x0, len);
170 if (addr !=
nullptr) {
172 memset(addr, 0x00, *addr_len);
173 const socklen_t write_len =
static_cast<socklen_t
>(
sizeof(sockaddr_in));
174 if (*addr_len >= write_len) {
175 *addr_len = write_len;
176 sockaddr_in* addr_in =
reinterpret_cast<sockaddr_in*
>(addr);
177 addr_in->sin_family = AF_INET;
178 memset(&addr_in->sin_addr, 0x05,
sizeof(addr_in->sin_addr));
179 addr_in->sin_port = htons(6789);
182 return std::make_unique<ZeroSock>();
187 std::memset(opt_val, 0x0, *opt_len);
195 std::memset(
name, 0x0, *name_len);
205 if (occurred !=
nullptr) {
206 *occurred = requested;
213 for (
auto& [sock, events] : events_per_sock) {
215 events.occurred = events.requested;
222 assert(
false &&
"Move of Sock into ZeroSock not allowed.");
227 : m_contents{contents}
235 if ((
flags & MSG_PEEK) == 0) {
238 return consume_bytes;
243 assert(
false &&
"Move of Sock into StaticContentsSock not allowed.");
251 if (m_data.empty()) {
259 const size_t read_bytes{std::min(len, m_data.size())};
261 std::memcpy(buf, m_data.data(), read_bytes);
262 if ((
flags & MSG_PEEK) == 0) {
263 m_data.erase(m_data.begin(), m_data.begin() + read_bytes);
276 WaitForDataOrEof(lock);
277 if (m_eof && m_data.empty()) {
283 if (!transport.ReceivedBytes(
s)) {
286 m_data.erase(m_data.begin(), m_data.begin() + m_data.size() -
s.size());
287 if (transport.ReceivedMessageComplete()) {
290 if (m_data.empty()) {
291 WaitForDataOrEof(lock);
292 if (m_eof && m_data.empty()) {
304 return std::make_optional<CNetMessage>(std::move(
msg));
310 const uint8_t* b =
static_cast<const uint8_t*
>(buf);
311 m_data.insert(m_data.end(), b, b + len);
324 Assert(lock.mutex() == &m_mutex);
328 return !m_data.empty() || m_eof;
349 m_pipes->send.PushBytes(buf, len);
361 Event* occurred)
const
364 ev.emplace(
this,
Events{requested});
366 if (occurred !=
nullptr) {
367 *occurred = ev.begin()->second.occurred;
374 const auto deadline = std::chrono::steady_clock::now() + timeout;
375 bool at_least_one_event_occurred{
false};
379 for (
auto& [sock, events] : events_per_sock) {
383 at_least_one_event_occurred =
true;
387 auto dyn_sock =
reinterpret_cast<const DynSock*
>(sock.get());
389 if (dyn_sock->m_pipes->recv.GetBytes(&b, 1, MSG_PEEK) == 1 || !dyn_sock->m_accept_sockets->Empty()) {
391 at_least_one_event_occurred =
true;
396 if (at_least_one_event_occurred || std::chrono::steady_clock::now() > deadline) {
400 std::this_thread::sleep_for(10ms);
408 assert(
false &&
"Move of Sock into DynSock not allowed.");
#define Assert(val)
Identity function.
A CService with information about it as peer.
CNode * ConnectNode(CAddress addrConnect, const char *pszDest, bool fCountFailure, ConnectionType conn_type, bool use_v2transport) EXCLUSIVE_LOCKS_REQUIRED(!m_unused_i2p_sessions_mutex)
static constexpr SerParams V1
Transport protocol agnostic message container.
Information about a peer.
A combination of a network address (CNetAddr) and a (TCP) port.
ssize_t GetBytes(void *buf, size_t len, int flags=0) EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
Get bytes and remove them from the pipe.
void WaitForDataOrEof(UniqueLock< Mutex > &lock) EXCLUSIVE_LOCKS_REQUIRED(m_mutex)
Return when there is some data to read or EOF has been signaled.
void Eof() EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
Signal end-of-file on the receiving end (GetBytes() or GetNetMsg()).
std::optional< CNetMessage > GetNetMsg() EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
Deserialize a CNetMessage and remove it from the pipe.
void PushBytes(const void *buf, size_t len) EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
Push bytes to the pipe.
A mocked Sock alternative that allows providing the data to be returned by Recv() and inspecting the ...
DynSock & operator=(Sock &&) override
Move assignment operator, grab the socket from another object and close ours (if set).
ssize_t Recv(void *buf, size_t len, int flags) const override
recv(2) wrapper.
std::shared_ptr< Pipes > m_pipes
std::shared_ptr< Queue > m_accept_sockets
bool Wait(std::chrono::milliseconds timeout, Event requested, Event *occurred=nullptr) const override
Wait for readiness for input (recv) or output (send).
std::unique_ptr< Sock > Accept(sockaddr *addr, socklen_t *addr_len) const override
accept(2) wrapper.
ssize_t Send(const void *buf, size_t len, int) const override
send(2) wrapper.
DynSock(std::shared_ptr< Pipes > pipes, std::shared_ptr< Queue > accept_sockets)
Create a new mocked sock.
bool WaitMany(std::chrono::milliseconds timeout, EventsPerSock &events_per_sock) const override
Same as Wait(), but wait on many sockets within the same timeout.
virtual void InitializeNode(const CNode &node, ServiceFlags our_services)=0
Initialize a peer (setup state)
I randrange(I range) noexcept
Generate a random integer in the range [0..range), with range > 0.
bool randbool() noexcept
Generate a random boolean.
RAII helper class that manages a socket and closes it automatically when it goes out of scope.
static constexpr Event SEND
If passed to Wait(), then it will wait for readiness to send to the socket.
SOCKET m_socket
Contained socket.
static constexpr Event RECV
If passed to Wait(), then it will wait for readiness to read from the socket.
std::unordered_map< std::shared_ptr< const Sock >, Events, HashSharedPtrSock, EqualSharedPtrSock > EventsPerSock
On which socket to wait for what events in WaitMany().
A Span is an object that can refer to a contiguous sequence of objects.
A mocked Sock alternative that returns a statically contained data upon read and succeeds and ignores...
const std::string m_contents
StaticContentsSock & operator=(Sock &&other) override
Move assignment operator, grab the socket from another object and close ours (if set).
StaticContentsSock(const std::string &contents)
ssize_t Recv(void *buf, size_t len, int flags) const override
Return parts of the contents that was provided at construction until it is exhausted and then return ...
Wrapper around std::unique_lock style lock for MutexType.
A mocked Sock alternative that succeeds on all operations.
int GetSockOpt(int level, int opt_name, void *opt_val, socklen_t *opt_len) const override
getsockopt(2) wrapper.
bool Wait(std::chrono::milliseconds timeout, Event requested, Event *occurred=nullptr) const override
Wait for readiness for input (recv) or output (send).
bool WaitMany(std::chrono::milliseconds timeout, EventsPerSock &events_per_sock) const override
Same as Wait(), but wait on many sockets within the same timeout.
int GetSockName(sockaddr *name, socklen_t *name_len) const override
getsockname(2) wrapper.
int Listen(int) const override
listen(2) wrapper.
int Bind(const sockaddr *, socklen_t) const override
bind(2) wrapper.
int Connect(const sockaddr *, socklen_t) const override
connect(2) wrapper.
ssize_t Send(const void *, size_t len, int) const override
send(2) wrapper.
std::unique_ptr< Sock > Accept(sockaddr *addr, socklen_t *addr_len) const override
accept(2) wrapper.
bool SetNonBlocking() const override
Set the non-blocking option on the socket.
ssize_t Recv(void *buf, size_t len, int flags) const override
recv(2) wrapper.
ZeroSock & operator=(Sock &&other) override
Move assignment operator, grab the socket from another object and close ours (if set).
int SetSockOpt(int, int, const void *, socklen_t) const override
setsockopt(2) wrapper.
bool IsSelectable() const override
Check if the underlying socket can be used for select(2) (or the Wait() method).
ConnectionType
Different types of connections to a peer.
@ INBOUND
Inbound connections are those initiated by a peer.
CSerializedNetMsg Make(std::string msg_type, Args &&... args)
constexpr const char * VERACK
The verack message acknowledges a previously-received version message, informing the connecting node ...
constexpr const char * VERSION
The version message provides information about the transmitting node to the receiving node at the beg...
ServiceFlags
nServices flags
static const int PROTOCOL_VERSION
network protocol versioning
static Wrapper< Formatter, T & > Using(T &&t)
Cause serialization/deserialization of an object to be done using a specified formatter class.
ServiceFlags their_services
bool ReceiveMsgFrom(CNode &node, CSerializedNetMsg &&ser_msg) const
CNode * ConnectNodePublic(PeerManager &peerman, const char *pszDest, ConnectionType conn_type) EXCLUSIVE_LOCKS_REQUIRED(!m_unused_i2p_sessions_mutex)
void NodeReceiveMsgBytes(CNode &node, Span< const uint8_t > msg_bytes, bool &complete) const
void AddTestNode(CNode &node)
void FlushSendBuffer(CNode &node) const
Auxiliary requested/occurred events to wait for in WaitMany().
#define WAIT_LOCK(cs, name)
static std::atomic< SOCKET > g_mocked_sock_fd
std::vector< NodeEvictionCandidate > GetRandomNodeEvictionCandidates(int n_candidates, FastRandomContext &random_context)
constexpr auto ALL_NETWORKS
#define EXCLUSIVE_LOCKS_REQUIRED(...)