Bitcoin Core 30.99.0
P2P Digital Currency
net.cpp
Go to the documentation of this file.
1// Copyright (c) 2020-present The Bitcoin Core developers
2// Distributed under the MIT software license, see the accompanying
3// file COPYING or http://www.opensource.org/licenses/mit-license.php.
4
5#include <test/util/net.h>
6
7#include <net.h>
8#include <net_processing.h>
9#include <netaddress.h>
10#include <netmessagemaker.h>
12#include <node/eviction.h>
13#include <protocol.h>
14#include <random.h>
15#include <serialize.h>
16#include <span.h>
17#include <sync.h>
18
19#include <chrono>
20#include <optional>
21#include <vector>
22
23void ConnmanTestMsg::Handshake(CNode& node,
24 bool successfully_connected,
25 ServiceFlags remote_services,
26 ServiceFlags local_services,
27 int32_t version,
28 bool relay_txs)
29{
30 auto& peerman{static_cast<PeerManager&>(*m_msgproc)};
31 auto& connman{*this};
32
33 peerman.InitializeNode(node, local_services);
34 peerman.SendMessages(node);
35 FlushSendBuffer(node); // Drop the version message added by SendMessages.
36
37 CSerializedNetMsg msg_version{
39 version, //
40 Using<CustomUintFormatter<8>>(remote_services), //
41 int64_t{}, // dummy time
42 int64_t{}, // ignored service bits
43 CNetAddr::V1(CService{}), // dummy
44 int64_t{}, // ignored service bits
45 CNetAddr::V1(CService{}), // ignored
46 uint64_t{1}, // dummy nonce
47 std::string{}, // dummy subver
48 int32_t{}, // dummy starting_height
49 relay_txs),
50 };
51
52 (void)connman.ReceiveMsgFrom(node, std::move(msg_version));
53 node.fPauseSend = false;
54 connman.ProcessMessagesOnce(node);
55 peerman.SendMessages(node);
56 FlushSendBuffer(node); // Drop the verack message added by SendMessages.
57 if (node.fDisconnect) return;
58 assert(node.nVersion == version);
59 assert(node.GetCommonVersion() == std::min(version, PROTOCOL_VERSION));
60 CNodeStateStats statestats;
61 assert(peerman.GetNodeStateStats(node.GetId(), statestats));
62 assert(statestats.m_relay_txs == (relay_txs && !node.IsBlockOnlyConn()));
63 assert(statestats.their_services == remote_services);
64 if (successfully_connected) {
66 (void)connman.ReceiveMsgFrom(node, std::move(msg_verack));
67 node.fPauseSend = false;
68 connman.ProcessMessagesOnce(node);
69 peerman.SendMessages(node);
70 assert(node.fSuccessfullyConnected == true);
71 }
72}
73
75
77{
79 nMaxOutboundCycleStartTime = 0s;
80 nMaxOutboundTotalBytesSentInCycle = 0;
81}
82
84{
89}
90
91void ConnmanTestMsg::NodeReceiveMsgBytes(CNode& node, std::span<const uint8_t> msg_bytes, bool& complete) const
92{
93 assert(node.ReceiveMsgBytes(msg_bytes, complete));
94 if (complete) {
95 node.MarkReceivedMsgsForProcessing();
96 }
97}
98
100{
101 LOCK(node.cs_vSend);
102 node.vSendMsg.clear();
103 node.m_send_memusage = 0;
104 while (true) {
105 const auto& [to_send, _more, _msg_type] = node.m_transport->GetBytesToSend(false);
106 if (to_send.empty()) break;
107 node.m_transport->MarkBytesSent(to_send.size());
108 }
109}
110
112{
113 bool queued = node.m_transport->SetMessageToSend(ser_msg);
114 assert(queued);
115 bool complete{false};
116 while (true) {
117 const auto& [to_send, _more, _msg_type] = node.m_transport->GetBytesToSend(false);
118 if (to_send.empty()) break;
119 NodeReceiveMsgBytes(node, to_send, complete);
120 node.m_transport->MarkBytesSent(to_send.size());
121 }
122 return complete;
123}
124
125CNode* ConnmanTestMsg::ConnectNodePublic(PeerManager& peerman, const char* pszDest, ConnectionType conn_type)
126{
127 CNode* node = ConnectNode(CAddress{}, pszDest, /*fCountFailure=*/false, conn_type, /*use_v2transport=*/true, /*proxy_override=*/std::nullopt);
128 if (!node) return nullptr;
129 node->SetCommonVersion(PROTOCOL_VERSION);
131 node->fSuccessfullyConnected = true;
133 return node;
134}
135
136std::vector<NodeEvictionCandidate> GetRandomNodeEvictionCandidates(int n_candidates, FastRandomContext& random_context)
137{
138 std::vector<NodeEvictionCandidate> candidates;
139 candidates.reserve(n_candidates);
140 for (int id = 0; id < n_candidates; ++id) {
141 candidates.push_back({
142 .id=id,
143 .m_connected=std::chrono::seconds{random_context.randrange(100)},
144 .m_min_ping_time=std::chrono::microseconds{random_context.randrange(100)},
145 .m_last_block_time=std::chrono::seconds{random_context.randrange(100)},
146 .m_last_tx_time=std::chrono::seconds{random_context.randrange(100)},
147 .fRelevantServices=random_context.randbool(),
148 .m_relay_txs=random_context.randbool(),
149 .fBloomFilter=random_context.randbool(),
150 .nKeyedNetGroup=random_context.randrange(100u),
151 .prefer_evict=random_context.randbool(),
152 .m_is_local=random_context.randbool(),
153 .m_network=ALL_NETWORKS[random_context.randrange(ALL_NETWORKS.size())],
154 .m_noban=false,
155 .m_conn_type=ConnectionType::INBOUND,
156 });
157 }
158 return candidates;
159}
160
161// Have different ZeroSock (or others that inherit from it) objects have different
162// m_socket because EqualSharedPtrSock compares m_socket and we want to avoid two
163// different objects comparing as equal.
164static std::atomic<SOCKET> g_mocked_sock_fd{0};
165
167
168// Sock::~Sock() would try to close(2) m_socket if it is not INVALID_SOCKET, avoid that.
170
171ssize_t ZeroSock::Send(const void*, size_t len, int) const { return len; }
172
173ssize_t ZeroSock::Recv(void* buf, size_t len, int flags) const
174{
175 memset(buf, 0x0, len);
176 return len;
177}
178
179int ZeroSock::Connect(const sockaddr*, socklen_t) const { return 0; }
180
181int ZeroSock::Bind(const sockaddr*, socklen_t) const { return 0; }
182
183int ZeroSock::Listen(int) const { return 0; }
184
185std::unique_ptr<Sock> ZeroSock::Accept(sockaddr* addr, socklen_t* addr_len) const
186{
187 if (addr != nullptr) {
188 // Pretend all connections come from 5.5.5.5:6789
189 memset(addr, 0x00, *addr_len);
190 const socklen_t write_len = static_cast<socklen_t>(sizeof(sockaddr_in));
191 if (*addr_len >= write_len) {
192 *addr_len = write_len;
193 sockaddr_in* addr_in = reinterpret_cast<sockaddr_in*>(addr);
194 addr_in->sin_family = AF_INET;
195 memset(&addr_in->sin_addr, 0x05, sizeof(addr_in->sin_addr));
196 addr_in->sin_port = htons(6789);
197 }
198 }
199 return std::make_unique<ZeroSock>();
200}
201
202int ZeroSock::GetSockOpt(int level, int opt_name, void* opt_val, socklen_t* opt_len) const
203{
204 std::memset(opt_val, 0x0, *opt_len);
205 return 0;
206}
207
208int ZeroSock::SetSockOpt(int, int, const void*, socklen_t) const { return 0; }
209
210int ZeroSock::GetSockName(sockaddr* name, socklen_t* name_len) const
211{
212 std::memset(name, 0x0, *name_len);
213 return 0;
214}
215
216bool ZeroSock::SetNonBlocking() const { return true; }
217
218bool ZeroSock::IsSelectable() const { return true; }
219
220bool ZeroSock::Wait(std::chrono::milliseconds timeout, Event requested, Event* occurred) const
221{
222 if (occurred != nullptr) {
223 *occurred = requested;
224 }
225 return true;
226}
227
228bool ZeroSock::WaitMany(std::chrono::milliseconds timeout, EventsPerSock& events_per_sock) const
229{
230 for (auto& [sock, events] : events_per_sock) {
231 (void)sock;
232 events.occurred = events.requested;
233 }
234 return true;
235}
236
238{
239 assert(false && "Move of Sock into ZeroSock not allowed.");
240 return *this;
241}
242
243StaticContentsSock::StaticContentsSock(const std::string& contents)
244 : m_contents{contents}
245{
246}
247
248ssize_t StaticContentsSock::Recv(void* buf, size_t len, int flags) const
249{
250 const size_t consume_bytes{std::min(len, m_contents.size() - m_consumed)};
251 std::memcpy(buf, m_contents.data() + m_consumed, consume_bytes);
252 if ((flags & MSG_PEEK) == 0) {
253 m_consumed += consume_bytes;
254 }
255 return consume_bytes;
256}
257
259{
260 assert(false && "Move of Sock into StaticContentsSock not allowed.");
261 return *this;
262}
263
264ssize_t DynSock::Pipe::GetBytes(void* buf, size_t len, int flags)
265{
266 WAIT_LOCK(m_mutex, lock);
267
268 if (m_data.empty()) {
269 if (m_eof) {
270 return 0;
271 }
272 errno = EAGAIN; // Same as recv(2) on a non-blocking socket.
273 return -1;
274 }
275
276 const size_t read_bytes{std::min(len, m_data.size())};
277
278 std::memcpy(buf, m_data.data(), read_bytes);
279 if ((flags & MSG_PEEK) == 0) {
280 m_data.erase(m_data.begin(), m_data.begin() + read_bytes);
281 }
282
283 return read_bytes;
284}
285
286std::optional<CNetMessage> DynSock::Pipe::GetNetMsg()
287{
288 V1Transport transport{NodeId{0}};
289
290 {
291 WAIT_LOCK(m_mutex, lock);
292
293 WaitForDataOrEof(lock);
294 if (m_eof && m_data.empty()) {
295 return std::nullopt;
296 }
297
298 for (;;) {
299 std::span<const uint8_t> s{m_data};
300 if (!transport.ReceivedBytes(s)) { // Consumed bytes are removed from the front of s.
301 return std::nullopt;
302 }
303 m_data.erase(m_data.begin(), m_data.begin() + m_data.size() - s.size());
304 if (transport.ReceivedMessageComplete()) {
305 break;
306 }
307 if (m_data.empty()) {
308 WaitForDataOrEof(lock);
309 if (m_eof && m_data.empty()) {
310 return std::nullopt;
311 }
312 }
313 }
314 }
315
316 bool reject{false};
317 CNetMessage msg{transport.GetReceivedMessage(/*time=*/{}, reject)};
318 if (reject) {
319 return std::nullopt;
320 }
321 return std::make_optional<CNetMessage>(std::move(msg));
322}
323
324void DynSock::Pipe::PushBytes(const void* buf, size_t len)
325{
326 LOCK(m_mutex);
327 const uint8_t* b = static_cast<const uint8_t*>(buf);
328 m_data.insert(m_data.end(), b, b + len);
329 m_cond.notify_all();
330}
331
333{
334 LOCK(m_mutex);
335 m_eof = true;
336 m_cond.notify_all();
337}
338
340{
341 Assert(lock.mutex() == &m_mutex);
342
343 m_cond.wait(lock, [&]() EXCLUSIVE_LOCKS_REQUIRED(m_mutex) {
344 AssertLockHeld(m_mutex);
345 return !m_data.empty() || m_eof;
346 });
347}
348
349DynSock::DynSock(std::shared_ptr<Pipes> pipes, std::shared_ptr<Queue> accept_sockets)
350 : m_pipes{pipes}, m_accept_sockets{accept_sockets}
351{
352}
353
355{
356 m_pipes->send.Eof();
357}
358
359ssize_t DynSock::Recv(void* buf, size_t len, int flags) const
360{
361 return m_pipes->recv.GetBytes(buf, len, flags);
362}
363
364ssize_t DynSock::Send(const void* buf, size_t len, int) const
365{
366 m_pipes->send.PushBytes(buf, len);
367 return len;
368}
369
370std::unique_ptr<Sock> DynSock::Accept(sockaddr* addr, socklen_t* addr_len) const
371{
372 ZeroSock::Accept(addr, addr_len);
373 return m_accept_sockets->Pop().value_or(nullptr);
374}
375
376bool DynSock::Wait(std::chrono::milliseconds timeout,
377 Event requested,
378 Event* occurred) const
379{
380 EventsPerSock ev;
381 ev.emplace(this, Events{requested});
382 const bool ret{WaitMany(timeout, ev)};
383 if (occurred != nullptr) {
384 *occurred = ev.begin()->second.occurred;
385 }
386 return ret;
387}
388
389bool DynSock::WaitMany(std::chrono::milliseconds timeout, EventsPerSock& events_per_sock) const
390{
391 const auto deadline = std::chrono::steady_clock::now() + timeout;
392 bool at_least_one_event_occurred{false};
393
394 for (;;) {
395 // Check all sockets for readiness without waiting.
396 for (auto& [sock, events] : events_per_sock) {
397 if ((events.requested & Sock::SEND) != 0) {
398 // Always ready for Send().
399 events.occurred |= Sock::SEND;
400 at_least_one_event_occurred = true;
401 }
402
403 if ((events.requested & Sock::RECV) != 0) {
404 auto dyn_sock = reinterpret_cast<const DynSock*>(sock.get());
405 uint8_t b;
406 if (dyn_sock->m_pipes->recv.GetBytes(&b, 1, MSG_PEEK) == 1 || !dyn_sock->m_accept_sockets->Empty()) {
407 events.occurred |= Sock::RECV;
408 at_least_one_event_occurred = true;
409 }
410 }
411 }
412
413 if (at_least_one_event_occurred || std::chrono::steady_clock::now() > deadline) {
414 break;
415 }
416
417 std::this_thread::sleep_for(10ms);
418 }
419
420 return true;
421}
422
424{
425 assert(false && "Move of Sock into DynSock not allowed.");
426 return *this;
427}
int ret
int flags
Definition: bitcoin-tx.cpp:529
#define Assert(val)
Identity function.
Definition: check.h:113
A CService with information about it as peer.
Definition: protocol.h:367
std::atomic_bool m_outbound_tor_ok_at_least_once
Remember if we ever established at least one outbound connection to a Tor peer, including sending and...
Definition: net.h:1204
std::atomic_size_t m_num_to_open
Number of ConnectionType::PRIVATE_BROADCAST connections to open.
Definition: net.h:1253
class CConnman::PrivateBroadcast m_private_broadcast
CNode * ConnectNode(CAddress addrConnect, const char *pszDest, bool fCountFailure, ConnectionType conn_type, bool use_v2transport, const std::optional< Proxy > &proxy_override) EXCLUSIVE_LOCKS_REQUIRED(!m_unused_i2p_sessions_mutex)
Open a new P2P connection.
Definition: net.cpp:372
Mutex m_total_bytes_sent_mutex
Definition: net.h:1573
std::map< uint64_t, CachedAddrResponse > m_addr_response_caches
Addr responses stored in different caches per (network, local socket) prevent cross-network node iden...
Definition: net.h:1640
static constexpr SerParams V1
Definition: netaddress.h:231
Transport protocol agnostic message container.
Definition: net.h:238
Information about a peer.
Definition: net.h:680
A combination of a network address (CNetAddr) and a (TCP) port.
Definition: netaddress.h:530
ssize_t GetBytes(void *buf, size_t len, int flags=0) EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
Get bytes and remove them from the pipe.
Definition: net.cpp:264
void WaitForDataOrEof(UniqueLock< Mutex > &lock) EXCLUSIVE_LOCKS_REQUIRED(m_mutex)
Return when there is some data to read or EOF has been signaled.
Definition: net.cpp:339
void Eof() EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
Signal end-of-file on the receiving end (GetBytes() or GetNetMsg()).
Definition: net.cpp:332
std::optional< CNetMessage > GetNetMsg() EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
Deserialize a CNetMessage and remove it from the pipe.
Definition: net.cpp:286
Mutex m_mutex
Definition: net.h:287
void PushBytes(const void *buf, size_t len) EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
Push bytes to the pipe.
Definition: net.cpp:324
A mocked Sock alternative that allows providing the data to be returned by Recv() and inspecting the ...
Definition: net.h:239
DynSock & operator=(Sock &&) override
Move assignment operator, grab the socket from another object and close ours (if set).
Definition: net.cpp:423
ssize_t Recv(void *buf, size_t len, int flags) const override
recv(2) wrapper.
Definition: net.cpp:359
std::shared_ptr< Pipes > m_pipes
Definition: net.h:358
std::shared_ptr< Queue > m_accept_sockets
Definition: net.h:359
~DynSock()
Definition: net.cpp:354
bool Wait(std::chrono::milliseconds timeout, Event requested, Event *occurred=nullptr) const override
Wait for readiness for input (recv) or output (send).
Definition: net.cpp:376
std::unique_ptr< Sock > Accept(sockaddr *addr, socklen_t *addr_len) const override
accept(2) wrapper.
Definition: net.cpp:370
ssize_t Send(const void *buf, size_t len, int) const override
send(2) wrapper.
Definition: net.cpp:364
DynSock(std::shared_ptr< Pipes > pipes, std::shared_ptr< Queue > accept_sockets)
Create a new mocked sock.
Definition: net.cpp:349
bool WaitMany(std::chrono::milliseconds timeout, EventsPerSock &events_per_sock) const override
Same as Wait(), but wait on many sockets within the same timeout.
Definition: net.cpp:389
Fast randomness source.
Definition: random.h:386
virtual void InitializeNode(const CNode &node, ServiceFlags our_services)=0
Initialize a peer (setup state)
I randrange(I range) noexcept
Generate a random integer in the range [0..range), with range > 0.
Definition: random.h:254
bool randbool() noexcept
Generate a random boolean.
Definition: random.h:325
RAII helper class that manages a socket and closes it automatically when it goes out of scope.
Definition: sock.h:28
static constexpr Event SEND
If passed to Wait(), then it will wait for readiness to send to the socket.
Definition: sock.h:149
SOCKET m_socket
Contained socket.
Definition: sock.h:276
uint8_t Event
Definition: sock.h:139
static constexpr Event RECV
If passed to Wait(), then it will wait for readiness to read from the socket.
Definition: sock.h:144
std::unordered_map< std::shared_ptr< const Sock >, Events, HashSharedPtrSock, EqualSharedPtrSock > EventsPerSock
On which socket to wait for what events in WaitMany().
Definition: sock.h:209
A mocked Sock alternative that returns a statically contained data upon read and succeeds and ignores...
Definition: net.h:212
size_t m_consumed
Definition: net.h:231
const std::string m_contents
Definition: net.h:230
StaticContentsSock & operator=(Sock &&other) override
Move assignment operator, grab the socket from another object and close ours (if set).
Definition: net.cpp:258
StaticContentsSock(const std::string &contents)
Definition: net.cpp:243
ssize_t Recv(void *buf, size_t len, int flags) const override
Return parts of the contents that was provided at construction until it is exhausted and then return ...
Definition: net.cpp:248
Wrapper around std::unique_lock style lock for MutexType.
Definition: sync.h:146
A mocked Sock alternative that succeeds on all operations.
Definition: net.h:168
int GetSockOpt(int level, int opt_name, void *opt_val, socklen_t *opt_len) const override
getsockopt(2) wrapper.
Definition: net.cpp:202
bool Wait(std::chrono::milliseconds timeout, Event requested, Event *occurred=nullptr) const override
Wait for readiness for input (recv) or output (send).
Definition: net.cpp:220
ZeroSock()
Definition: net.cpp:166
bool WaitMany(std::chrono::milliseconds timeout, EventsPerSock &events_per_sock) const override
Same as Wait(), but wait on many sockets within the same timeout.
Definition: net.cpp:228
int GetSockName(sockaddr *name, socklen_t *name_len) const override
getsockname(2) wrapper.
Definition: net.cpp:210
int Listen(int) const override
listen(2) wrapper.
Definition: net.cpp:183
~ZeroSock() override
Definition: net.cpp:169
int Bind(const sockaddr *, socklen_t) const override
bind(2) wrapper.
Definition: net.cpp:181
int Connect(const sockaddr *, socklen_t) const override
connect(2) wrapper.
Definition: net.cpp:179
ssize_t Send(const void *, size_t len, int) const override
send(2) wrapper.
Definition: net.cpp:171
std::unique_ptr< Sock > Accept(sockaddr *addr, socklen_t *addr_len) const override
accept(2) wrapper.
Definition: net.cpp:185
bool SetNonBlocking() const override
Set the non-blocking option on the socket.
Definition: net.cpp:216
ssize_t Recv(void *buf, size_t len, int flags) const override
recv(2) wrapper.
Definition: net.cpp:173
ZeroSock & operator=(Sock &&other) override
Move assignment operator, grab the socket from another object and close ours (if set).
Definition: net.cpp:237
int SetSockOpt(int, int, const void *, socklen_t) const override
setsockopt(2) wrapper.
Definition: net.cpp:208
bool IsSelectable() const override
Check if the underlying socket can be used for select(2) (or the Wait() method).
Definition: net.cpp:218
#define INVALID_SOCKET
Definition: compat.h:67
ConnectionType
Different types of connections to a peer.
@ INBOUND
Inbound connections are those initiated by a peer.
CSerializedNetMsg Make(std::string msg_type, Args &&... args)
constexpr const char * VERACK
The verack message acknowledges a previously-received version message, informing the connecting node ...
Definition: protocol.h:70
constexpr const char * VERSION
The version message provides information about the transmitting node to the receiving node at the beg...
Definition: protocol.h:65
Definition: messages.h:21
int64_t NodeId
Definition: net.h:103
ServiceFlags
nServices flags
Definition: protocol.h:309
@ NODE_WITNESS
Definition: protocol.h:320
@ NODE_NETWORK
Definition: protocol.h:315
static const int PROTOCOL_VERSION
network protocol versioning
const char * name
Definition: rest.cpp:48
static Wrapper< Formatter, T & > Using(T &&t)
Cause serialization/deserialization of an object to be done using a specified formatter class.
Definition: serialize.h:488
ServiceFlags their_services
void NodeReceiveMsgBytes(CNode &node, std::span< const uint8_t > msg_bytes, bool &complete) const
Definition: net.cpp:91
bool ReceiveMsgFrom(CNode &node, CSerializedNetMsg &&ser_msg) const
Definition: net.cpp:111
CNode * ConnectNodePublic(PeerManager &peerman, const char *pszDest, ConnectionType conn_type) EXCLUSIVE_LOCKS_REQUIRED(!m_unused_i2p_sessions_mutex)
Definition: net.cpp:125
void ResetAddrCache()
Definition: net.cpp:74
void AddTestNode(CNode &node)
Definition: net.h:61
void Reset()
Reset the internal state.
Definition: net.cpp:83
void FlushSendBuffer(CNode &node) const
Definition: net.cpp:99
void ResetMaxOutboundCycle()
Definition: net.cpp:76
Serialization wrapper class for custom integers and enums.
Definition: serialize.h:521
Auxiliary requested/occurred events to wait for in WaitMany().
Definition: sock.h:174
#define WAIT_LOCK(cs, name)
Definition: sync.h:264
#define LOCK(cs)
Definition: sync.h:258
static std::atomic< SOCKET > g_mocked_sock_fd
Definition: net.cpp:164
std::vector< NodeEvictionCandidate > GetRandomNodeEvictionCandidates(int n_candidates, FastRandomContext &random_context)
Definition: net.cpp:136
constexpr auto ALL_NETWORKS
Definition: net.h:153
#define EXCLUSIVE_LOCKS_REQUIRED(...)
Definition: threadsafety.h:51
AssertLockHeld(pool.cs)
assert(!tx.IsCoinBase())