23void ConnmanTestMsg::Handshake(
CNode&
node,
24 bool successfully_connected,
30 auto& peerman{
static_cast<PeerManager&
>(*m_msgproc)};
34 peerman.SendMessages(&
node);
52 (void)connman.ReceiveMsgFrom(
node, std::move(msg_version));
53 node.fPauseSend =
false;
54 connman.ProcessMessagesOnce(
node);
55 peerman.SendMessages(&
node);
57 if (
node.fDisconnect)
return;
61 assert(peerman.GetNodeStateStats(
node.GetId(), statestats));
64 if (successfully_connected) {
66 (void)connman.ReceiveMsgFrom(
node, std::move(msg_verack));
67 node.fPauseSend =
false;
68 connman.ProcessMessagesOnce(
node);
69 peerman.SendMessages(&
node);
79 nMaxOutboundCycleStartTime = 0
s;
80 nMaxOutboundTotalBytesSentInCycle = 0;
85 assert(
node.ReceiveMsgBytes(msg_bytes, complete));
87 node.MarkReceivedMsgsForProcessing();
94 node.vSendMsg.clear();
95 node.m_send_memusage = 0;
97 const auto& [to_send, _more, _msg_type] =
node.m_transport->GetBytesToSend(
false);
98 if (to_send.empty())
break;
99 node.m_transport->MarkBytesSent(to_send.size());
105 bool queued =
node.m_transport->SetMessageToSend(ser_msg);
107 bool complete{
false};
109 const auto& [to_send, _more, _msg_type] =
node.m_transport->GetBytesToSend(
false);
110 if (to_send.empty())
break;
112 node.m_transport->MarkBytesSent(to_send.size());
120 if (!
node)
return nullptr;
123 node->fSuccessfullyConnected =
true;
130 std::vector<NodeEvictionCandidate> candidates;
131 candidates.reserve(n_candidates);
132 for (
int id = 0;
id < n_candidates; ++id) {
133 candidates.push_back({
135 .m_connected=std::chrono::seconds{random_context.
randrange(100)},
136 .m_min_ping_time=std::chrono::microseconds{random_context.
randrange(100)},
137 .m_last_block_time=std::chrono::seconds{random_context.
randrange(100)},
138 .m_last_tx_time=std::chrono::seconds{random_context.
randrange(100)},
139 .fRelevantServices=random_context.
randbool(),
140 .m_relay_txs=random_context.
randbool(),
141 .fBloomFilter=random_context.
randbool(),
142 .nKeyedNetGroup=random_context.
randrange(100u),
143 .prefer_evict=random_context.
randbool(),
144 .m_is_local=random_context.
randbool(),
167 memset(buf, 0x0, len);
179 if (addr !=
nullptr) {
181 memset(addr, 0x00, *addr_len);
182 const socklen_t write_len =
static_cast<socklen_t
>(
sizeof(sockaddr_in));
183 if (*addr_len >= write_len) {
184 *addr_len = write_len;
185 sockaddr_in* addr_in =
reinterpret_cast<sockaddr_in*
>(addr);
186 addr_in->sin_family = AF_INET;
187 memset(&addr_in->sin_addr, 0x05,
sizeof(addr_in->sin_addr));
188 addr_in->sin_port = htons(6789);
191 return std::make_unique<ZeroSock>();
196 std::memset(opt_val, 0x0, *opt_len);
204 std::memset(
name, 0x0, *name_len);
214 if (occurred !=
nullptr) {
215 *occurred = requested;
222 for (
auto& [sock, events] : events_per_sock) {
224 events.occurred = events.requested;
231 assert(
false &&
"Move of Sock into ZeroSock not allowed.");
236 : m_contents{contents}
244 if ((
flags & MSG_PEEK) == 0) {
247 return consume_bytes;
252 assert(
false &&
"Move of Sock into StaticContentsSock not allowed.");
260 if (m_data.empty()) {
268 const size_t read_bytes{std::min(len, m_data.size())};
270 std::memcpy(buf, m_data.data(), read_bytes);
271 if ((
flags & MSG_PEEK) == 0) {
272 m_data.erase(m_data.begin(), m_data.begin() + read_bytes);
285 WaitForDataOrEof(lock);
286 if (m_eof && m_data.empty()) {
291 std::span<const uint8_t>
s{m_data};
292 if (!transport.ReceivedBytes(
s)) {
295 m_data.erase(m_data.begin(), m_data.begin() + m_data.size() -
s.size());
296 if (transport.ReceivedMessageComplete()) {
299 if (m_data.empty()) {
300 WaitForDataOrEof(lock);
301 if (m_eof && m_data.empty()) {
313 return std::make_optional<CNetMessage>(std::move(
msg));
319 const uint8_t* b =
static_cast<const uint8_t*
>(buf);
320 m_data.insert(m_data.end(), b, b + len);
333 Assert(lock.mutex() == &m_mutex);
337 return !m_data.empty() || m_eof;
358 m_pipes->send.PushBytes(buf, len);
370 Event* occurred)
const
373 ev.emplace(
this,
Events{requested});
375 if (occurred !=
nullptr) {
376 *occurred = ev.begin()->second.occurred;
383 const auto deadline = std::chrono::steady_clock::now() + timeout;
384 bool at_least_one_event_occurred{
false};
388 for (
auto& [sock, events] : events_per_sock) {
392 at_least_one_event_occurred =
true;
396 auto dyn_sock =
reinterpret_cast<const DynSock*
>(sock.get());
398 if (dyn_sock->m_pipes->recv.GetBytes(&b, 1, MSG_PEEK) == 1 || !dyn_sock->m_accept_sockets->Empty()) {
400 at_least_one_event_occurred =
true;
405 if (at_least_one_event_occurred || std::chrono::steady_clock::now() > deadline) {
409 std::this_thread::sleep_for(10ms);
417 assert(
false &&
"Move of Sock into DynSock not allowed.");
#define Assert(val)
Identity function.
A CService with information about it as peer.
CNode * ConnectNode(CAddress addrConnect, const char *pszDest, bool fCountFailure, ConnectionType conn_type, bool use_v2transport) EXCLUSIVE_LOCKS_REQUIRED(!m_unused_i2p_sessions_mutex)
Mutex m_total_bytes_sent_mutex
std::map< uint64_t, CachedAddrResponse > m_addr_response_caches
Addr responses stored in different caches per (network, local socket) prevent cross-network node iden...
static constexpr SerParams V1
Transport protocol agnostic message container.
Information about a peer.
A combination of a network address (CNetAddr) and a (TCP) port.
ssize_t GetBytes(void *buf, size_t len, int flags=0) EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
Get bytes and remove them from the pipe.
void WaitForDataOrEof(UniqueLock< Mutex > &lock) EXCLUSIVE_LOCKS_REQUIRED(m_mutex)
Return when there is some data to read or EOF has been signaled.
void Eof() EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
Signal end-of-file on the receiving end (GetBytes() or GetNetMsg()).
std::optional< CNetMessage > GetNetMsg() EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
Deserialize a CNetMessage and remove it from the pipe.
void PushBytes(const void *buf, size_t len) EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
Push bytes to the pipe.
A mocked Sock alternative that allows providing the data to be returned by Recv() and inspecting the ...
DynSock & operator=(Sock &&) override
Move assignment operator, grab the socket from another object and close ours (if set).
ssize_t Recv(void *buf, size_t len, int flags) const override
recv(2) wrapper.
std::shared_ptr< Pipes > m_pipes
std::shared_ptr< Queue > m_accept_sockets
bool Wait(std::chrono::milliseconds timeout, Event requested, Event *occurred=nullptr) const override
Wait for readiness for input (recv) or output (send).
std::unique_ptr< Sock > Accept(sockaddr *addr, socklen_t *addr_len) const override
accept(2) wrapper.
ssize_t Send(const void *buf, size_t len, int) const override
send(2) wrapper.
DynSock(std::shared_ptr< Pipes > pipes, std::shared_ptr< Queue > accept_sockets)
Create a new mocked sock.
bool WaitMany(std::chrono::milliseconds timeout, EventsPerSock &events_per_sock) const override
Same as Wait(), but wait on many sockets within the same timeout.
virtual void InitializeNode(const CNode &node, ServiceFlags our_services)=0
Initialize a peer (setup state)
I randrange(I range) noexcept
Generate a random integer in the range [0..range), with range > 0.
bool randbool() noexcept
Generate a random boolean.
RAII helper class that manages a socket and closes it automatically when it goes out of scope.
static constexpr Event SEND
If passed to Wait(), then it will wait for readiness to send to the socket.
SOCKET m_socket
Contained socket.
static constexpr Event RECV
If passed to Wait(), then it will wait for readiness to read from the socket.
std::unordered_map< std::shared_ptr< const Sock >, Events, HashSharedPtrSock, EqualSharedPtrSock > EventsPerSock
On which socket to wait for what events in WaitMany().
A mocked Sock alternative that returns a statically contained data upon read and succeeds and ignores...
const std::string m_contents
StaticContentsSock & operator=(Sock &&other) override
Move assignment operator, grab the socket from another object and close ours (if set).
StaticContentsSock(const std::string &contents)
ssize_t Recv(void *buf, size_t len, int flags) const override
Return parts of the contents that was provided at construction until it is exhausted and then return ...
Wrapper around std::unique_lock style lock for MutexType.
A mocked Sock alternative that succeeds on all operations.
int GetSockOpt(int level, int opt_name, void *opt_val, socklen_t *opt_len) const override
getsockopt(2) wrapper.
bool Wait(std::chrono::milliseconds timeout, Event requested, Event *occurred=nullptr) const override
Wait for readiness for input (recv) or output (send).
bool WaitMany(std::chrono::milliseconds timeout, EventsPerSock &events_per_sock) const override
Same as Wait(), but wait on many sockets within the same timeout.
int GetSockName(sockaddr *name, socklen_t *name_len) const override
getsockname(2) wrapper.
int Listen(int) const override
listen(2) wrapper.
int Bind(const sockaddr *, socklen_t) const override
bind(2) wrapper.
int Connect(const sockaddr *, socklen_t) const override
connect(2) wrapper.
ssize_t Send(const void *, size_t len, int) const override
send(2) wrapper.
std::unique_ptr< Sock > Accept(sockaddr *addr, socklen_t *addr_len) const override
accept(2) wrapper.
bool SetNonBlocking() const override
Set the non-blocking option on the socket.
ssize_t Recv(void *buf, size_t len, int flags) const override
recv(2) wrapper.
ZeroSock & operator=(Sock &&other) override
Move assignment operator, grab the socket from another object and close ours (if set).
int SetSockOpt(int, int, const void *, socklen_t) const override
setsockopt(2) wrapper.
bool IsSelectable() const override
Check if the underlying socket can be used for select(2) (or the Wait() method).
ConnectionType
Different types of connections to a peer.
@ INBOUND
Inbound connections are those initiated by a peer.
CSerializedNetMsg Make(std::string msg_type, Args &&... args)
constexpr const char * VERACK
The verack message acknowledges a previously-received version message, informing the connecting node ...
constexpr const char * VERSION
The version message provides information about the transmitting node to the receiving node at the beg...
ServiceFlags
nServices flags
static const int PROTOCOL_VERSION
network protocol versioning
static Wrapper< Formatter, T & > Using(T &&t)
Cause serialization/deserialization of an object to be done using a specified formatter class.
ServiceFlags their_services
void NodeReceiveMsgBytes(CNode &node, std::span< const uint8_t > msg_bytes, bool &complete) const
bool ReceiveMsgFrom(CNode &node, CSerializedNetMsg &&ser_msg) const
CNode * ConnectNodePublic(PeerManager &peerman, const char *pszDest, ConnectionType conn_type) EXCLUSIVE_LOCKS_REQUIRED(!m_unused_i2p_sessions_mutex)
void AddTestNode(CNode &node)
void FlushSendBuffer(CNode &node) const
void ResetMaxOutboundCycle()
Auxiliary requested/occurred events to wait for in WaitMany().
#define WAIT_LOCK(cs, name)
static std::atomic< SOCKET > g_mocked_sock_fd
std::vector< NodeEvictionCandidate > GetRandomNodeEvictionCandidates(int n_candidates, FastRandomContext &random_context)
constexpr auto ALL_NETWORKS
#define EXCLUSIVE_LOCKS_REQUIRED(...)