Bitcoin Core 29.99.0
P2P Digital Currency
net.cpp
Go to the documentation of this file.
1// Copyright (c) 2020-present The Bitcoin Core developers
2// Distributed under the MIT software license, see the accompanying
3// file COPYING or http://www.opensource.org/licenses/mit-license.php.
4
5#include <test/util/net.h>
6
7#include <net.h>
8#include <net_processing.h>
9#include <netaddress.h>
10#include <netmessagemaker.h>
12#include <node/eviction.h>
13#include <protocol.h>
14#include <random.h>
15#include <serialize.h>
16#include <span.h>
17#include <sync.h>
18
19#include <chrono>
20#include <optional>
21#include <vector>
22
23void ConnmanTestMsg::Handshake(CNode& node,
24 bool successfully_connected,
25 ServiceFlags remote_services,
26 ServiceFlags local_services,
27 int32_t version,
28 bool relay_txs)
29{
30 auto& peerman{static_cast<PeerManager&>(*m_msgproc)};
31 auto& connman{*this};
32
33 peerman.InitializeNode(node, local_services);
34 peerman.SendMessages(&node);
35 FlushSendBuffer(node); // Drop the version message added by SendMessages.
36
37 CSerializedNetMsg msg_version{
39 version, //
40 Using<CustomUintFormatter<8>>(remote_services), //
41 int64_t{}, // dummy time
42 int64_t{}, // ignored service bits
43 CNetAddr::V1(CService{}), // dummy
44 int64_t{}, // ignored service bits
45 CNetAddr::V1(CService{}), // ignored
46 uint64_t{1}, // dummy nonce
47 std::string{}, // dummy subver
48 int32_t{}, // dummy starting_height
49 relay_txs),
50 };
51
52 (void)connman.ReceiveMsgFrom(node, std::move(msg_version));
53 node.fPauseSend = false;
54 connman.ProcessMessagesOnce(node);
55 peerman.SendMessages(&node);
56 FlushSendBuffer(node); // Drop the verack message added by SendMessages.
57 if (node.fDisconnect) return;
58 assert(node.nVersion == version);
59 assert(node.GetCommonVersion() == std::min(version, PROTOCOL_VERSION));
60 CNodeStateStats statestats;
61 assert(peerman.GetNodeStateStats(node.GetId(), statestats));
62 assert(statestats.m_relay_txs == (relay_txs && !node.IsBlockOnlyConn()));
63 assert(statestats.their_services == remote_services);
64 if (successfully_connected) {
66 (void)connman.ReceiveMsgFrom(node, std::move(msg_verack));
67 node.fPauseSend = false;
68 connman.ProcessMessagesOnce(node);
69 peerman.SendMessages(&node);
70 assert(node.fSuccessfullyConnected == true);
71 }
72}
73
75
77{
79 nMaxOutboundCycleStartTime = 0s;
80 nMaxOutboundTotalBytesSentInCycle = 0;
81}
82
83void ConnmanTestMsg::NodeReceiveMsgBytes(CNode& node, std::span<const uint8_t> msg_bytes, bool& complete) const
84{
85 assert(node.ReceiveMsgBytes(msg_bytes, complete));
86 if (complete) {
87 node.MarkReceivedMsgsForProcessing();
88 }
89}
90
92{
93 LOCK(node.cs_vSend);
94 node.vSendMsg.clear();
95 node.m_send_memusage = 0;
96 while (true) {
97 const auto& [to_send, _more, _msg_type] = node.m_transport->GetBytesToSend(false);
98 if (to_send.empty()) break;
99 node.m_transport->MarkBytesSent(to_send.size());
100 }
101}
102
104{
105 bool queued = node.m_transport->SetMessageToSend(ser_msg);
106 assert(queued);
107 bool complete{false};
108 while (true) {
109 const auto& [to_send, _more, _msg_type] = node.m_transport->GetBytesToSend(false);
110 if (to_send.empty()) break;
111 NodeReceiveMsgBytes(node, to_send, complete);
112 node.m_transport->MarkBytesSent(to_send.size());
113 }
114 return complete;
115}
116
117CNode* ConnmanTestMsg::ConnectNodePublic(PeerManager& peerman, const char* pszDest, ConnectionType conn_type)
118{
119 CNode* node = ConnectNode(CAddress{}, pszDest, /*fCountFailure=*/false, conn_type, /*use_v2transport=*/true);
120 if (!node) return nullptr;
121 node->SetCommonVersion(PROTOCOL_VERSION);
123 node->fSuccessfullyConnected = true;
125 return node;
126}
127
128std::vector<NodeEvictionCandidate> GetRandomNodeEvictionCandidates(int n_candidates, FastRandomContext& random_context)
129{
130 std::vector<NodeEvictionCandidate> candidates;
131 candidates.reserve(n_candidates);
132 for (int id = 0; id < n_candidates; ++id) {
133 candidates.push_back({
134 .id=id,
135 .m_connected=std::chrono::seconds{random_context.randrange(100)},
136 .m_min_ping_time=std::chrono::microseconds{random_context.randrange(100)},
137 .m_last_block_time=std::chrono::seconds{random_context.randrange(100)},
138 .m_last_tx_time=std::chrono::seconds{random_context.randrange(100)},
139 .fRelevantServices=random_context.randbool(),
140 .m_relay_txs=random_context.randbool(),
141 .fBloomFilter=random_context.randbool(),
142 .nKeyedNetGroup=random_context.randrange(100u),
143 .prefer_evict=random_context.randbool(),
144 .m_is_local=random_context.randbool(),
145 .m_network=ALL_NETWORKS[random_context.randrange(ALL_NETWORKS.size())],
146 .m_noban=false,
147 .m_conn_type=ConnectionType::INBOUND,
148 });
149 }
150 return candidates;
151}
152
153// Have different ZeroSock (or others that inherit from it) objects have different
154// m_socket because EqualSharedPtrSock compares m_socket and we want to avoid two
155// different objects comparing as equal.
156static std::atomic<SOCKET> g_mocked_sock_fd{0};
157
159
160// Sock::~Sock() would try to close(2) m_socket if it is not INVALID_SOCKET, avoid that.
162
163ssize_t ZeroSock::Send(const void*, size_t len, int) const { return len; }
164
165ssize_t ZeroSock::Recv(void* buf, size_t len, int flags) const
166{
167 memset(buf, 0x0, len);
168 return len;
169}
170
171int ZeroSock::Connect(const sockaddr*, socklen_t) const { return 0; }
172
173int ZeroSock::Bind(const sockaddr*, socklen_t) const { return 0; }
174
175int ZeroSock::Listen(int) const { return 0; }
176
177std::unique_ptr<Sock> ZeroSock::Accept(sockaddr* addr, socklen_t* addr_len) const
178{
179 if (addr != nullptr) {
180 // Pretend all connections come from 5.5.5.5:6789
181 memset(addr, 0x00, *addr_len);
182 const socklen_t write_len = static_cast<socklen_t>(sizeof(sockaddr_in));
183 if (*addr_len >= write_len) {
184 *addr_len = write_len;
185 sockaddr_in* addr_in = reinterpret_cast<sockaddr_in*>(addr);
186 addr_in->sin_family = AF_INET;
187 memset(&addr_in->sin_addr, 0x05, sizeof(addr_in->sin_addr));
188 addr_in->sin_port = htons(6789);
189 }
190 }
191 return std::make_unique<ZeroSock>();
192}
193
194int ZeroSock::GetSockOpt(int level, int opt_name, void* opt_val, socklen_t* opt_len) const
195{
196 std::memset(opt_val, 0x0, *opt_len);
197 return 0;
198}
199
200int ZeroSock::SetSockOpt(int, int, const void*, socklen_t) const { return 0; }
201
202int ZeroSock::GetSockName(sockaddr* name, socklen_t* name_len) const
203{
204 std::memset(name, 0x0, *name_len);
205 return 0;
206}
207
208bool ZeroSock::SetNonBlocking() const { return true; }
209
210bool ZeroSock::IsSelectable() const { return true; }
211
212bool ZeroSock::Wait(std::chrono::milliseconds timeout, Event requested, Event* occurred) const
213{
214 if (occurred != nullptr) {
215 *occurred = requested;
216 }
217 return true;
218}
219
220bool ZeroSock::WaitMany(std::chrono::milliseconds timeout, EventsPerSock& events_per_sock) const
221{
222 for (auto& [sock, events] : events_per_sock) {
223 (void)sock;
224 events.occurred = events.requested;
225 }
226 return true;
227}
228
230{
231 assert(false && "Move of Sock into ZeroSock not allowed.");
232 return *this;
233}
234
235StaticContentsSock::StaticContentsSock(const std::string& contents)
236 : m_contents{contents}
237{
238}
239
240ssize_t StaticContentsSock::Recv(void* buf, size_t len, int flags) const
241{
242 const size_t consume_bytes{std::min(len, m_contents.size() - m_consumed)};
243 std::memcpy(buf, m_contents.data() + m_consumed, consume_bytes);
244 if ((flags & MSG_PEEK) == 0) {
245 m_consumed += consume_bytes;
246 }
247 return consume_bytes;
248}
249
251{
252 assert(false && "Move of Sock into StaticContentsSock not allowed.");
253 return *this;
254}
255
256ssize_t DynSock::Pipe::GetBytes(void* buf, size_t len, int flags)
257{
258 WAIT_LOCK(m_mutex, lock);
259
260 if (m_data.empty()) {
261 if (m_eof) {
262 return 0;
263 }
264 errno = EAGAIN; // Same as recv(2) on a non-blocking socket.
265 return -1;
266 }
267
268 const size_t read_bytes{std::min(len, m_data.size())};
269
270 std::memcpy(buf, m_data.data(), read_bytes);
271 if ((flags & MSG_PEEK) == 0) {
272 m_data.erase(m_data.begin(), m_data.begin() + read_bytes);
273 }
274
275 return read_bytes;
276}
277
278std::optional<CNetMessage> DynSock::Pipe::GetNetMsg()
279{
280 V1Transport transport{NodeId{0}};
281
282 {
283 WAIT_LOCK(m_mutex, lock);
284
285 WaitForDataOrEof(lock);
286 if (m_eof && m_data.empty()) {
287 return std::nullopt;
288 }
289
290 for (;;) {
291 std::span<const uint8_t> s{m_data};
292 if (!transport.ReceivedBytes(s)) { // Consumed bytes are removed from the front of s.
293 return std::nullopt;
294 }
295 m_data.erase(m_data.begin(), m_data.begin() + m_data.size() - s.size());
296 if (transport.ReceivedMessageComplete()) {
297 break;
298 }
299 if (m_data.empty()) {
300 WaitForDataOrEof(lock);
301 if (m_eof && m_data.empty()) {
302 return std::nullopt;
303 }
304 }
305 }
306 }
307
308 bool reject{false};
309 CNetMessage msg{transport.GetReceivedMessage(/*time=*/{}, reject)};
310 if (reject) {
311 return std::nullopt;
312 }
313 return std::make_optional<CNetMessage>(std::move(msg));
314}
315
316void DynSock::Pipe::PushBytes(const void* buf, size_t len)
317{
318 LOCK(m_mutex);
319 const uint8_t* b = static_cast<const uint8_t*>(buf);
320 m_data.insert(m_data.end(), b, b + len);
321 m_cond.notify_all();
322}
323
325{
326 LOCK(m_mutex);
327 m_eof = true;
328 m_cond.notify_all();
329}
330
332{
333 Assert(lock.mutex() == &m_mutex);
334
335 m_cond.wait(lock, [&]() EXCLUSIVE_LOCKS_REQUIRED(m_mutex) {
336 AssertLockHeld(m_mutex);
337 return !m_data.empty() || m_eof;
338 });
339}
340
341DynSock::DynSock(std::shared_ptr<Pipes> pipes, std::shared_ptr<Queue> accept_sockets)
342 : m_pipes{pipes}, m_accept_sockets{accept_sockets}
343{
344}
345
347{
348 m_pipes->send.Eof();
349}
350
351ssize_t DynSock::Recv(void* buf, size_t len, int flags) const
352{
353 return m_pipes->recv.GetBytes(buf, len, flags);
354}
355
356ssize_t DynSock::Send(const void* buf, size_t len, int) const
357{
358 m_pipes->send.PushBytes(buf, len);
359 return len;
360}
361
362std::unique_ptr<Sock> DynSock::Accept(sockaddr* addr, socklen_t* addr_len) const
363{
364 ZeroSock::Accept(addr, addr_len);
365 return m_accept_sockets->Pop().value_or(nullptr);
366}
367
368bool DynSock::Wait(std::chrono::milliseconds timeout,
369 Event requested,
370 Event* occurred) const
371{
372 EventsPerSock ev;
373 ev.emplace(this, Events{requested});
374 const bool ret{WaitMany(timeout, ev)};
375 if (occurred != nullptr) {
376 *occurred = ev.begin()->second.occurred;
377 }
378 return ret;
379}
380
381bool DynSock::WaitMany(std::chrono::milliseconds timeout, EventsPerSock& events_per_sock) const
382{
383 const auto deadline = std::chrono::steady_clock::now() + timeout;
384 bool at_least_one_event_occurred{false};
385
386 for (;;) {
387 // Check all sockets for readiness without waiting.
388 for (auto& [sock, events] : events_per_sock) {
389 if ((events.requested & Sock::SEND) != 0) {
390 // Always ready for Send().
391 events.occurred |= Sock::SEND;
392 at_least_one_event_occurred = true;
393 }
394
395 if ((events.requested & Sock::RECV) != 0) {
396 auto dyn_sock = reinterpret_cast<const DynSock*>(sock.get());
397 uint8_t b;
398 if (dyn_sock->m_pipes->recv.GetBytes(&b, 1, MSG_PEEK) == 1 || !dyn_sock->m_accept_sockets->Empty()) {
399 events.occurred |= Sock::RECV;
400 at_least_one_event_occurred = true;
401 }
402 }
403 }
404
405 if (at_least_one_event_occurred || std::chrono::steady_clock::now() > deadline) {
406 break;
407 }
408
409 std::this_thread::sleep_for(10ms);
410 }
411
412 return true;
413}
414
416{
417 assert(false && "Move of Sock into DynSock not allowed.");
418 return *this;
419}
int ret
int flags
Definition: bitcoin-tx.cpp:529
#define Assert(val)
Identity function.
Definition: check.h:106
A CService with information about it as peer.
Definition: protocol.h:367
CNode * ConnectNode(CAddress addrConnect, const char *pszDest, bool fCountFailure, ConnectionType conn_type, bool use_v2transport) EXCLUSIVE_LOCKS_REQUIRED(!m_unused_i2p_sessions_mutex)
Definition: net.cpp:396
Mutex m_total_bytes_sent_mutex
Definition: net.h:1425
std::map< uint64_t, CachedAddrResponse > m_addr_response_caches
Addr responses stored in different caches per (network, local socket) prevent cross-network node iden...
Definition: net.h:1492
static constexpr SerParams V1
Definition: netaddress.h:231
Transport protocol agnostic message container.
Definition: net.h:233
Information about a peer.
Definition: net.h:675
A combination of a network address (CNetAddr) and a (TCP) port.
Definition: netaddress.h:531
ssize_t GetBytes(void *buf, size_t len, int flags=0) EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
Get bytes and remove them from the pipe.
Definition: net.cpp:256
void WaitForDataOrEof(UniqueLock< Mutex > &lock) EXCLUSIVE_LOCKS_REQUIRED(m_mutex)
Return when there is some data to read or EOF has been signaled.
Definition: net.cpp:331
void Eof() EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
Signal end-of-file on the receiving end (GetBytes() or GetNetMsg()).
Definition: net.cpp:324
std::optional< CNetMessage > GetNetMsg() EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
Deserialize a CNetMessage and remove it from the pipe.
Definition: net.cpp:278
Mutex m_mutex
Definition: net.h:264
void PushBytes(const void *buf, size_t len) EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
Push bytes to the pipe.
Definition: net.cpp:316
A mocked Sock alternative that allows providing the data to be returned by Recv() and inspecting the ...
Definition: net.h:216
DynSock & operator=(Sock &&) override
Move assignment operator, grab the socket from another object and close ours (if set).
Definition: net.cpp:415
ssize_t Recv(void *buf, size_t len, int flags) const override
recv(2) wrapper.
Definition: net.cpp:351
std::shared_ptr< Pipes > m_pipes
Definition: net.h:335
std::shared_ptr< Queue > m_accept_sockets
Definition: net.h:336
~DynSock()
Definition: net.cpp:346
bool Wait(std::chrono::milliseconds timeout, Event requested, Event *occurred=nullptr) const override
Wait for readiness for input (recv) or output (send).
Definition: net.cpp:368
std::unique_ptr< Sock > Accept(sockaddr *addr, socklen_t *addr_len) const override
accept(2) wrapper.
Definition: net.cpp:362
ssize_t Send(const void *buf, size_t len, int) const override
send(2) wrapper.
Definition: net.cpp:356
DynSock(std::shared_ptr< Pipes > pipes, std::shared_ptr< Queue > accept_sockets)
Create a new mocked sock.
Definition: net.cpp:341
bool WaitMany(std::chrono::milliseconds timeout, EventsPerSock &events_per_sock) const override
Same as Wait(), but wait on many sockets within the same timeout.
Definition: net.cpp:381
Fast randomness source.
Definition: random.h:386
virtual void InitializeNode(const CNode &node, ServiceFlags our_services)=0
Initialize a peer (setup state)
I randrange(I range) noexcept
Generate a random integer in the range [0..range), with range > 0.
Definition: random.h:254
bool randbool() noexcept
Generate a random boolean.
Definition: random.h:325
RAII helper class that manages a socket and closes it automatically when it goes out of scope.
Definition: sock.h:27
static constexpr Event SEND
If passed to Wait(), then it will wait for readiness to send to the socket.
Definition: sock.h:148
SOCKET m_socket
Contained socket.
Definition: sock.h:275
uint8_t Event
Definition: sock.h:138
static constexpr Event RECV
If passed to Wait(), then it will wait for readiness to read from the socket.
Definition: sock.h:143
std::unordered_map< std::shared_ptr< const Sock >, Events, HashSharedPtrSock, EqualSharedPtrSock > EventsPerSock
On which socket to wait for what events in WaitMany().
Definition: sock.h:208
A mocked Sock alternative that returns a statically contained data upon read and succeeds and ignores...
Definition: net.h:189
size_t m_consumed
Definition: net.h:208
const std::string m_contents
Definition: net.h:207
StaticContentsSock & operator=(Sock &&other) override
Move assignment operator, grab the socket from another object and close ours (if set).
Definition: net.cpp:250
StaticContentsSock(const std::string &contents)
Definition: net.cpp:235
ssize_t Recv(void *buf, size_t len, int flags) const override
Return parts of the contents that was provided at construction until it is exhausted and then return ...
Definition: net.cpp:240
Wrapper around std::unique_lock style lock for MutexType.
Definition: sync.h:147
A mocked Sock alternative that succeeds on all operations.
Definition: net.h:145
int GetSockOpt(int level, int opt_name, void *opt_val, socklen_t *opt_len) const override
getsockopt(2) wrapper.
Definition: net.cpp:194
bool Wait(std::chrono::milliseconds timeout, Event requested, Event *occurred=nullptr) const override
Wait for readiness for input (recv) or output (send).
Definition: net.cpp:212
ZeroSock()
Definition: net.cpp:158
bool WaitMany(std::chrono::milliseconds timeout, EventsPerSock &events_per_sock) const override
Same as Wait(), but wait on many sockets within the same timeout.
Definition: net.cpp:220
int GetSockName(sockaddr *name, socklen_t *name_len) const override
getsockname(2) wrapper.
Definition: net.cpp:202
int Listen(int) const override
listen(2) wrapper.
Definition: net.cpp:175
~ZeroSock() override
Definition: net.cpp:161
int Bind(const sockaddr *, socklen_t) const override
bind(2) wrapper.
Definition: net.cpp:173
int Connect(const sockaddr *, socklen_t) const override
connect(2) wrapper.
Definition: net.cpp:171
ssize_t Send(const void *, size_t len, int) const override
send(2) wrapper.
Definition: net.cpp:163
std::unique_ptr< Sock > Accept(sockaddr *addr, socklen_t *addr_len) const override
accept(2) wrapper.
Definition: net.cpp:177
bool SetNonBlocking() const override
Set the non-blocking option on the socket.
Definition: net.cpp:208
ssize_t Recv(void *buf, size_t len, int flags) const override
recv(2) wrapper.
Definition: net.cpp:165
ZeroSock & operator=(Sock &&other) override
Move assignment operator, grab the socket from another object and close ours (if set).
Definition: net.cpp:229
int SetSockOpt(int, int, const void *, socklen_t) const override
setsockopt(2) wrapper.
Definition: net.cpp:200
bool IsSelectable() const override
Check if the underlying socket can be used for select(2) (or the Wait() method).
Definition: net.cpp:210
#define INVALID_SOCKET
Definition: compat.h:55
ConnectionType
Different types of connections to a peer.
@ INBOUND
Inbound connections are those initiated by a peer.
CSerializedNetMsg Make(std::string msg_type, Args &&... args)
constexpr const char * VERACK
The verack message acknowledges a previously-received version message, informing the connecting node ...
Definition: protocol.h:70
constexpr const char * VERSION
The version message provides information about the transmitting node to the receiving node at the beg...
Definition: protocol.h:65
Definition: messages.h:20
int64_t NodeId
Definition: net.h:98
ServiceFlags
nServices flags
Definition: protocol.h:309
@ NODE_WITNESS
Definition: protocol.h:320
@ NODE_NETWORK
Definition: protocol.h:315
static const int PROTOCOL_VERSION
network protocol versioning
const char * name
Definition: rest.cpp:50
static Wrapper< Formatter, T & > Using(T &&t)
Cause serialization/deserialization of an object to be done using a specified formatter class.
Definition: serialize.h:488
ServiceFlags their_services
void NodeReceiveMsgBytes(CNode &node, std::span< const uint8_t > msg_bytes, bool &complete) const
Definition: net.cpp:83
bool ReceiveMsgFrom(CNode &node, CSerializedNetMsg &&ser_msg) const
Definition: net.cpp:103
CNode * ConnectNodePublic(PeerManager &peerman, const char *pszDest, ConnectionType conn_type) EXCLUSIVE_LOCKS_REQUIRED(!m_unused_i2p_sessions_mutex)
Definition: net.cpp:117
void ResetAddrCache()
Definition: net.cpp:74
void AddTestNode(CNode &node)
Definition: net.h:57
void FlushSendBuffer(CNode &node) const
Definition: net.cpp:91
void ResetMaxOutboundCycle()
Definition: net.cpp:76
Serialization wrapper class for custom integers and enums.
Definition: serialize.h:521
Auxiliary requested/occurred events to wait for in WaitMany().
Definition: sock.h:173
#define WAIT_LOCK(cs, name)
Definition: sync.h:265
#define LOCK(cs)
Definition: sync.h:259
static std::atomic< SOCKET > g_mocked_sock_fd
Definition: net.cpp:156
std::vector< NodeEvictionCandidate > GetRandomNodeEvictionCandidates(int n_candidates, FastRandomContext &random_context)
Definition: net.cpp:128
constexpr auto ALL_NETWORKS
Definition: net.h:130
#define EXCLUSIVE_LOCKS_REQUIRED(...)
Definition: threadsafety.h:51
AssertLockHeld(pool.cs)
assert(!tx.IsCoinBase())