23void ConnmanTestMsg::Handshake(
CNode&
node,
24 bool successfully_connected,
30 auto& peerman{
static_cast<PeerManager&
>(*m_msgproc)};
34 peerman.SendMessages(
node);
52 (void)connman.ReceiveMsgFrom(
node, std::move(msg_version));
53 node.fPauseSend =
false;
54 connman.ProcessMessagesOnce(
node);
55 peerman.SendMessages(
node);
57 if (
node.fDisconnect)
return;
61 assert(peerman.GetNodeStateStats(
node.GetId(), statestats));
64 if (successfully_connected) {
66 (void)connman.ReceiveMsgFrom(
node, std::move(msg_verack));
67 node.fPauseSend =
false;
68 connman.ProcessMessagesOnce(
node);
69 peerman.SendMessages(
node);
79 nMaxOutboundCycleStartTime = 0
s;
80 nMaxOutboundTotalBytesSentInCycle = 0;
93 assert(
node.ReceiveMsgBytes(msg_bytes, complete));
95 node.MarkReceivedMsgsForProcessing();
102 node.vSendMsg.clear();
103 node.m_send_memusage = 0;
105 const auto& [to_send, _more, _msg_type] =
node.m_transport->GetBytesToSend(
false);
106 if (to_send.empty())
break;
107 node.m_transport->MarkBytesSent(to_send.size());
113 bool queued =
node.m_transport->SetMessageToSend(ser_msg);
115 bool complete{
false};
117 const auto& [to_send, _more, _msg_type] =
node.m_transport->GetBytesToSend(
false);
118 if (to_send.empty())
break;
120 node.m_transport->MarkBytesSent(to_send.size());
128 if (!
node)
return nullptr;
131 node->fSuccessfullyConnected =
true;
138 std::vector<NodeEvictionCandidate> candidates;
139 candidates.reserve(n_candidates);
140 for (
int id = 0;
id < n_candidates; ++id) {
141 candidates.push_back({
143 .m_connected=std::chrono::seconds{random_context.
randrange(100)},
144 .m_min_ping_time=std::chrono::microseconds{random_context.
randrange(100)},
145 .m_last_block_time=std::chrono::seconds{random_context.
randrange(100)},
146 .m_last_tx_time=std::chrono::seconds{random_context.
randrange(100)},
147 .fRelevantServices=random_context.
randbool(),
148 .m_relay_txs=random_context.
randbool(),
149 .fBloomFilter=random_context.
randbool(),
150 .nKeyedNetGroup=random_context.
randrange(100u),
151 .prefer_evict=random_context.
randbool(),
152 .m_is_local=random_context.
randbool(),
175 memset(buf, 0x0, len);
187 if (addr !=
nullptr) {
189 memset(addr, 0x00, *addr_len);
190 const socklen_t write_len =
static_cast<socklen_t
>(
sizeof(sockaddr_in));
191 if (*addr_len >= write_len) {
192 *addr_len = write_len;
193 sockaddr_in* addr_in =
reinterpret_cast<sockaddr_in*
>(addr);
194 addr_in->sin_family = AF_INET;
195 memset(&addr_in->sin_addr, 0x05,
sizeof(addr_in->sin_addr));
196 addr_in->sin_port = htons(6789);
199 return std::make_unique<ZeroSock>();
204 std::memset(opt_val, 0x0, *opt_len);
212 std::memset(
name, 0x0, *name_len);
222 if (occurred !=
nullptr) {
223 *occurred = requested;
230 for (
auto& [sock, events] : events_per_sock) {
232 events.occurred = events.requested;
239 assert(
false &&
"Move of Sock into ZeroSock not allowed.");
244 : m_contents{contents}
252 if ((
flags & MSG_PEEK) == 0) {
255 return consume_bytes;
260 assert(
false &&
"Move of Sock into StaticContentsSock not allowed.");
268 if (m_data.empty()) {
276 const size_t read_bytes{std::min(len, m_data.size())};
278 std::memcpy(buf, m_data.data(), read_bytes);
279 if ((
flags & MSG_PEEK) == 0) {
280 m_data.erase(m_data.begin(), m_data.begin() + read_bytes);
293 WaitForDataOrEof(lock);
294 if (m_eof && m_data.empty()) {
299 std::span<const uint8_t>
s{m_data};
300 if (!transport.ReceivedBytes(
s)) {
303 m_data.erase(m_data.begin(), m_data.begin() + m_data.size() -
s.size());
304 if (transport.ReceivedMessageComplete()) {
307 if (m_data.empty()) {
308 WaitForDataOrEof(lock);
309 if (m_eof && m_data.empty()) {
321 return std::make_optional<CNetMessage>(std::move(
msg));
327 const uint8_t* b =
static_cast<const uint8_t*
>(buf);
328 m_data.insert(m_data.end(), b, b + len);
341 Assert(lock.mutex() == &m_mutex);
345 return !m_data.empty() || m_eof;
366 m_pipes->send.PushBytes(buf, len);
378 Event* occurred)
const
381 ev.emplace(
this,
Events{requested});
383 if (occurred !=
nullptr) {
384 *occurred = ev.begin()->second.occurred;
391 const auto deadline = std::chrono::steady_clock::now() + timeout;
392 bool at_least_one_event_occurred{
false};
396 for (
auto& [sock, events] : events_per_sock) {
400 at_least_one_event_occurred =
true;
404 auto dyn_sock =
reinterpret_cast<const DynSock*
>(sock.get());
406 if (dyn_sock->m_pipes->recv.GetBytes(&b, 1, MSG_PEEK) == 1 || !dyn_sock->m_accept_sockets->Empty()) {
408 at_least_one_event_occurred =
true;
413 if (at_least_one_event_occurred || std::chrono::steady_clock::now() > deadline) {
417 std::this_thread::sleep_for(10ms);
425 assert(
false &&
"Move of Sock into DynSock not allowed.");
#define Assert(val)
Identity function.
A CService with information about it as peer.
std::atomic_bool m_outbound_tor_ok_at_least_once
Remember if we ever established at least one outbound connection to a Tor peer, including sending and...
std::atomic_size_t m_num_to_open
Number of ConnectionType::PRIVATE_BROADCAST connections to open.
class CConnman::PrivateBroadcast m_private_broadcast
CNode * ConnectNode(CAddress addrConnect, const char *pszDest, bool fCountFailure, ConnectionType conn_type, bool use_v2transport, const std::optional< Proxy > &proxy_override) EXCLUSIVE_LOCKS_REQUIRED(!m_unused_i2p_sessions_mutex)
Open a new P2P connection.
Mutex m_total_bytes_sent_mutex
std::map< uint64_t, CachedAddrResponse > m_addr_response_caches
Addr responses stored in different caches per (network, local socket) prevent cross-network node iden...
static constexpr SerParams V1
Transport protocol agnostic message container.
Information about a peer.
A combination of a network address (CNetAddr) and a (TCP) port.
ssize_t GetBytes(void *buf, size_t len, int flags=0) EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
Get bytes and remove them from the pipe.
void WaitForDataOrEof(UniqueLock< Mutex > &lock) EXCLUSIVE_LOCKS_REQUIRED(m_mutex)
Return when there is some data to read or EOF has been signaled.
void Eof() EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
Signal end-of-file on the receiving end (GetBytes() or GetNetMsg()).
std::optional< CNetMessage > GetNetMsg() EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
Deserialize a CNetMessage and remove it from the pipe.
void PushBytes(const void *buf, size_t len) EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
Push bytes to the pipe.
A mocked Sock alternative that allows providing the data to be returned by Recv() and inspecting the ...
DynSock & operator=(Sock &&) override
Move assignment operator, grab the socket from another object and close ours (if set).
ssize_t Recv(void *buf, size_t len, int flags) const override
recv(2) wrapper.
std::shared_ptr< Pipes > m_pipes
std::shared_ptr< Queue > m_accept_sockets
bool Wait(std::chrono::milliseconds timeout, Event requested, Event *occurred=nullptr) const override
Wait for readiness for input (recv) or output (send).
std::unique_ptr< Sock > Accept(sockaddr *addr, socklen_t *addr_len) const override
accept(2) wrapper.
ssize_t Send(const void *buf, size_t len, int) const override
send(2) wrapper.
DynSock(std::shared_ptr< Pipes > pipes, std::shared_ptr< Queue > accept_sockets)
Create a new mocked sock.
bool WaitMany(std::chrono::milliseconds timeout, EventsPerSock &events_per_sock) const override
Same as Wait(), but wait on many sockets within the same timeout.
virtual void InitializeNode(const CNode &node, ServiceFlags our_services)=0
Initialize a peer (setup state)
I randrange(I range) noexcept
Generate a random integer in the range [0..range), with range > 0.
bool randbool() noexcept
Generate a random boolean.
RAII helper class that manages a socket and closes it automatically when it goes out of scope.
static constexpr Event SEND
If passed to Wait(), then it will wait for readiness to send to the socket.
SOCKET m_socket
Contained socket.
static constexpr Event RECV
If passed to Wait(), then it will wait for readiness to read from the socket.
std::unordered_map< std::shared_ptr< const Sock >, Events, HashSharedPtrSock, EqualSharedPtrSock > EventsPerSock
On which socket to wait for what events in WaitMany().
A mocked Sock alternative that returns a statically contained data upon read and succeeds and ignores...
const std::string m_contents
StaticContentsSock & operator=(Sock &&other) override
Move assignment operator, grab the socket from another object and close ours (if set).
StaticContentsSock(const std::string &contents)
ssize_t Recv(void *buf, size_t len, int flags) const override
Return parts of the contents that was provided at construction until it is exhausted and then return ...
Wrapper around std::unique_lock style lock for MutexType.
A mocked Sock alternative that succeeds on all operations.
int GetSockOpt(int level, int opt_name, void *opt_val, socklen_t *opt_len) const override
getsockopt(2) wrapper.
bool Wait(std::chrono::milliseconds timeout, Event requested, Event *occurred=nullptr) const override
Wait for readiness for input (recv) or output (send).
bool WaitMany(std::chrono::milliseconds timeout, EventsPerSock &events_per_sock) const override
Same as Wait(), but wait on many sockets within the same timeout.
int GetSockName(sockaddr *name, socklen_t *name_len) const override
getsockname(2) wrapper.
int Listen(int) const override
listen(2) wrapper.
int Bind(const sockaddr *, socklen_t) const override
bind(2) wrapper.
int Connect(const sockaddr *, socklen_t) const override
connect(2) wrapper.
ssize_t Send(const void *, size_t len, int) const override
send(2) wrapper.
std::unique_ptr< Sock > Accept(sockaddr *addr, socklen_t *addr_len) const override
accept(2) wrapper.
bool SetNonBlocking() const override
Set the non-blocking option on the socket.
ssize_t Recv(void *buf, size_t len, int flags) const override
recv(2) wrapper.
ZeroSock & operator=(Sock &&other) override
Move assignment operator, grab the socket from another object and close ours (if set).
int SetSockOpt(int, int, const void *, socklen_t) const override
setsockopt(2) wrapper.
bool IsSelectable() const override
Check if the underlying socket can be used for select(2) (or the Wait() method).
ConnectionType
Different types of connections to a peer.
@ INBOUND
Inbound connections are those initiated by a peer.
CSerializedNetMsg Make(std::string msg_type, Args &&... args)
constexpr const char * VERACK
The verack message acknowledges a previously-received version message, informing the connecting node ...
constexpr const char * VERSION
The version message provides information about the transmitting node to the receiving node at the beg...
ServiceFlags
nServices flags
static const int PROTOCOL_VERSION
network protocol versioning
static Wrapper< Formatter, T & > Using(T &&t)
Cause serialization/deserialization of an object to be done using a specified formatter class.
ServiceFlags their_services
void NodeReceiveMsgBytes(CNode &node, std::span< const uint8_t > msg_bytes, bool &complete) const
bool ReceiveMsgFrom(CNode &node, CSerializedNetMsg &&ser_msg) const
CNode * ConnectNodePublic(PeerManager &peerman, const char *pszDest, ConnectionType conn_type) EXCLUSIVE_LOCKS_REQUIRED(!m_unused_i2p_sessions_mutex)
void AddTestNode(CNode &node)
void Reset()
Reset the internal state.
void FlushSendBuffer(CNode &node) const
void ResetMaxOutboundCycle()
Auxiliary requested/occurred events to wait for in WaitMany().
#define WAIT_LOCK(cs, name)
static std::atomic< SOCKET > g_mocked_sock_fd
std::vector< NodeEvictionCandidate > GetRandomNodeEvictionCandidates(int n_candidates, FastRandomContext &random_context)
constexpr auto ALL_NETWORKS
#define EXCLUSIVE_LOCKS_REQUIRED(...)