Bitcoin Core 28.99.0
P2P Digital Currency
net.cpp
Go to the documentation of this file.
1// Copyright (c) 2020-2022 The Bitcoin Core developers
2// Distributed under the MIT software license, see the accompanying
3// file COPYING or http://www.opensource.org/licenses/mit-license.php.
4
5#include <test/util/net.h>
6
7#include <net.h>
8#include <net_processing.h>
9#include <netaddress.h>
10#include <netmessagemaker.h>
12#include <node/eviction.h>
13#include <protocol.h>
14#include <random.h>
15#include <serialize.h>
16#include <span.h>
17#include <sync.h>
18
19#include <chrono>
20#include <optional>
21#include <vector>
22
23void ConnmanTestMsg::Handshake(CNode& node,
24 bool successfully_connected,
25 ServiceFlags remote_services,
26 ServiceFlags local_services,
27 int32_t version,
28 bool relay_txs)
29{
30 auto& peerman{static_cast<PeerManager&>(*m_msgproc)};
31 auto& connman{*this};
32
33 peerman.InitializeNode(node, local_services);
34 peerman.SendMessages(&node);
35 FlushSendBuffer(node); // Drop the version message added by SendMessages.
36
37 CSerializedNetMsg msg_version{
39 version, //
40 Using<CustomUintFormatter<8>>(remote_services), //
41 int64_t{}, // dummy time
42 int64_t{}, // ignored service bits
43 CNetAddr::V1(CService{}), // dummy
44 int64_t{}, // ignored service bits
45 CNetAddr::V1(CService{}), // ignored
46 uint64_t{1}, // dummy nonce
47 std::string{}, // dummy subver
48 int32_t{}, // dummy starting_height
49 relay_txs),
50 };
51
52 (void)connman.ReceiveMsgFrom(node, std::move(msg_version));
53 node.fPauseSend = false;
54 connman.ProcessMessagesOnce(node);
55 peerman.SendMessages(&node);
56 FlushSendBuffer(node); // Drop the verack message added by SendMessages.
57 if (node.fDisconnect) return;
58 assert(node.nVersion == version);
59 assert(node.GetCommonVersion() == std::min(version, PROTOCOL_VERSION));
60 CNodeStateStats statestats;
61 assert(peerman.GetNodeStateStats(node.GetId(), statestats));
62 assert(statestats.m_relay_txs == (relay_txs && !node.IsBlockOnlyConn()));
63 assert(statestats.their_services == remote_services);
64 if (successfully_connected) {
66 (void)connman.ReceiveMsgFrom(node, std::move(msg_verack));
67 node.fPauseSend = false;
68 connman.ProcessMessagesOnce(node);
69 peerman.SendMessages(&node);
70 assert(node.fSuccessfullyConnected == true);
71 }
72}
73
75{
76 assert(node.ReceiveMsgBytes(msg_bytes, complete));
77 if (complete) {
78 node.MarkReceivedMsgsForProcessing();
79 }
80}
81
83{
84 LOCK(node.cs_vSend);
85 node.vSendMsg.clear();
86 node.m_send_memusage = 0;
87 while (true) {
88 const auto& [to_send, _more, _msg_type] = node.m_transport->GetBytesToSend(false);
89 if (to_send.empty()) break;
90 node.m_transport->MarkBytesSent(to_send.size());
91 }
92}
93
95{
96 bool queued = node.m_transport->SetMessageToSend(ser_msg);
97 assert(queued);
98 bool complete{false};
99 while (true) {
100 const auto& [to_send, _more, _msg_type] = node.m_transport->GetBytesToSend(false);
101 if (to_send.empty()) break;
102 NodeReceiveMsgBytes(node, to_send, complete);
103 node.m_transport->MarkBytesSent(to_send.size());
104 }
105 return complete;
106}
107
108CNode* ConnmanTestMsg::ConnectNodePublic(PeerManager& peerman, const char* pszDest, ConnectionType conn_type)
109{
110 CNode* node = ConnectNode(CAddress{}, pszDest, /*fCountFailure=*/false, conn_type, /*use_v2transport=*/true);
111 if (!node) return nullptr;
112 node->SetCommonVersion(PROTOCOL_VERSION);
114 node->fSuccessfullyConnected = true;
116 return node;
117}
118
119std::vector<NodeEvictionCandidate> GetRandomNodeEvictionCandidates(int n_candidates, FastRandomContext& random_context)
120{
121 std::vector<NodeEvictionCandidate> candidates;
122 candidates.reserve(n_candidates);
123 for (int id = 0; id < n_candidates; ++id) {
124 candidates.push_back({
125 .id=id,
126 .m_connected=std::chrono::seconds{random_context.randrange(100)},
127 .m_min_ping_time=std::chrono::microseconds{random_context.randrange(100)},
128 .m_last_block_time=std::chrono::seconds{random_context.randrange(100)},
129 .m_last_tx_time=std::chrono::seconds{random_context.randrange(100)},
130 .fRelevantServices=random_context.randbool(),
131 .m_relay_txs=random_context.randbool(),
132 .fBloomFilter=random_context.randbool(),
133 .nKeyedNetGroup=random_context.randrange(100u),
134 .prefer_evict=random_context.randbool(),
135 .m_is_local=random_context.randbool(),
136 .m_network=ALL_NETWORKS[random_context.randrange(ALL_NETWORKS.size())],
137 .m_noban=false,
138 .m_conn_type=ConnectionType::INBOUND,
139 });
140 }
141 return candidates;
142}
143
144// Have different ZeroSock (or others that inherit from it) objects have different
145// m_socket because EqualSharedPtrSock compares m_socket and we want to avoid two
146// different objects comparing as equal.
147static std::atomic<SOCKET> g_mocked_sock_fd{0};
148
150
151// Sock::~Sock() would try to close(2) m_socket if it is not INVALID_SOCKET, avoid that.
153
154ssize_t ZeroSock::Send(const void*, size_t len, int) const { return len; }
155
156ssize_t ZeroSock::Recv(void* buf, size_t len, int flags) const
157{
158 memset(buf, 0x0, len);
159 return len;
160}
161
162int ZeroSock::Connect(const sockaddr*, socklen_t) const { return 0; }
163
164int ZeroSock::Bind(const sockaddr*, socklen_t) const { return 0; }
165
166int ZeroSock::Listen(int) const { return 0; }
167
168std::unique_ptr<Sock> ZeroSock::Accept(sockaddr* addr, socklen_t* addr_len) const
169{
170 if (addr != nullptr) {
171 // Pretend all connections come from 5.5.5.5:6789
172 memset(addr, 0x00, *addr_len);
173 const socklen_t write_len = static_cast<socklen_t>(sizeof(sockaddr_in));
174 if (*addr_len >= write_len) {
175 *addr_len = write_len;
176 sockaddr_in* addr_in = reinterpret_cast<sockaddr_in*>(addr);
177 addr_in->sin_family = AF_INET;
178 memset(&addr_in->sin_addr, 0x05, sizeof(addr_in->sin_addr));
179 addr_in->sin_port = htons(6789);
180 }
181 }
182 return std::make_unique<ZeroSock>();
183}
184
185int ZeroSock::GetSockOpt(int level, int opt_name, void* opt_val, socklen_t* opt_len) const
186{
187 std::memset(opt_val, 0x0, *opt_len);
188 return 0;
189}
190
191int ZeroSock::SetSockOpt(int, int, const void*, socklen_t) const { return 0; }
192
193int ZeroSock::GetSockName(sockaddr* name, socklen_t* name_len) const
194{
195 std::memset(name, 0x0, *name_len);
196 return 0;
197}
198
199bool ZeroSock::SetNonBlocking() const { return true; }
200
201bool ZeroSock::IsSelectable() const { return true; }
202
203bool ZeroSock::Wait(std::chrono::milliseconds timeout, Event requested, Event* occurred) const
204{
205 if (occurred != nullptr) {
206 *occurred = requested;
207 }
208 return true;
209}
210
211bool ZeroSock::WaitMany(std::chrono::milliseconds timeout, EventsPerSock& events_per_sock) const
212{
213 for (auto& [sock, events] : events_per_sock) {
214 (void)sock;
215 events.occurred = events.requested;
216 }
217 return true;
218}
219
221{
222 assert(false && "Move of Sock into ZeroSock not allowed.");
223 return *this;
224}
225
226StaticContentsSock::StaticContentsSock(const std::string& contents)
227 : m_contents{contents}
228{
229}
230
231ssize_t StaticContentsSock::Recv(void* buf, size_t len, int flags) const
232{
233 const size_t consume_bytes{std::min(len, m_contents.size() - m_consumed)};
234 std::memcpy(buf, m_contents.data() + m_consumed, consume_bytes);
235 if ((flags & MSG_PEEK) == 0) {
236 m_consumed += consume_bytes;
237 }
238 return consume_bytes;
239}
240
242{
243 assert(false && "Move of Sock into StaticContentsSock not allowed.");
244 return *this;
245}
246
247ssize_t DynSock::Pipe::GetBytes(void* buf, size_t len, int flags)
248{
249 WAIT_LOCK(m_mutex, lock);
250
251 if (m_data.empty()) {
252 if (m_eof) {
253 return 0;
254 }
255 errno = EAGAIN; // Same as recv(2) on a non-blocking socket.
256 return -1;
257 }
258
259 const size_t read_bytes{std::min(len, m_data.size())};
260
261 std::memcpy(buf, m_data.data(), read_bytes);
262 if ((flags & MSG_PEEK) == 0) {
263 m_data.erase(m_data.begin(), m_data.begin() + read_bytes);
264 }
265
266 return read_bytes;
267}
268
269std::optional<CNetMessage> DynSock::Pipe::GetNetMsg()
270{
271 V1Transport transport{NodeId{0}};
272
273 {
274 WAIT_LOCK(m_mutex, lock);
275
276 WaitForDataOrEof(lock);
277 if (m_eof && m_data.empty()) {
278 return std::nullopt;
279 }
280
281 for (;;) {
282 Span<const uint8_t> s{m_data};
283 if (!transport.ReceivedBytes(s)) { // Consumed bytes are removed from the front of s.
284 return std::nullopt;
285 }
286 m_data.erase(m_data.begin(), m_data.begin() + m_data.size() - s.size());
287 if (transport.ReceivedMessageComplete()) {
288 break;
289 }
290 if (m_data.empty()) {
291 WaitForDataOrEof(lock);
292 if (m_eof && m_data.empty()) {
293 return std::nullopt;
294 }
295 }
296 }
297 }
298
299 bool reject{false};
300 CNetMessage msg{transport.GetReceivedMessage(/*time=*/{}, reject)};
301 if (reject) {
302 return std::nullopt;
303 }
304 return std::make_optional<CNetMessage>(std::move(msg));
305}
306
307void DynSock::Pipe::PushBytes(const void* buf, size_t len)
308{
309 LOCK(m_mutex);
310 const uint8_t* b = static_cast<const uint8_t*>(buf);
311 m_data.insert(m_data.end(), b, b + len);
312 m_cond.notify_all();
313}
314
316{
317 LOCK(m_mutex);
318 m_eof = true;
319 m_cond.notify_all();
320}
321
323{
324 Assert(lock.mutex() == &m_mutex);
325
326 m_cond.wait(lock, [&]() EXCLUSIVE_LOCKS_REQUIRED(m_mutex) {
327 AssertLockHeld(m_mutex);
328 return !m_data.empty() || m_eof;
329 });
330}
331
332DynSock::DynSock(std::shared_ptr<Pipes> pipes, std::shared_ptr<Queue> accept_sockets)
333 : m_pipes{pipes}, m_accept_sockets{accept_sockets}
334{
335}
336
338{
339 m_pipes->send.Eof();
340}
341
342ssize_t DynSock::Recv(void* buf, size_t len, int flags) const
343{
344 return m_pipes->recv.GetBytes(buf, len, flags);
345}
346
347ssize_t DynSock::Send(const void* buf, size_t len, int) const
348{
349 m_pipes->send.PushBytes(buf, len);
350 return len;
351}
352
353std::unique_ptr<Sock> DynSock::Accept(sockaddr* addr, socklen_t* addr_len) const
354{
355 ZeroSock::Accept(addr, addr_len);
356 return m_accept_sockets->Pop().value_or(nullptr);
357}
358
359bool DynSock::Wait(std::chrono::milliseconds timeout,
360 Event requested,
361 Event* occurred) const
362{
363 EventsPerSock ev;
364 ev.emplace(this, Events{requested});
365 const bool ret{WaitMany(timeout, ev)};
366 if (occurred != nullptr) {
367 *occurred = ev.begin()->second.occurred;
368 }
369 return ret;
370}
371
372bool DynSock::WaitMany(std::chrono::milliseconds timeout, EventsPerSock& events_per_sock) const
373{
374 const auto deadline = std::chrono::steady_clock::now() + timeout;
375 bool at_least_one_event_occurred{false};
376
377 for (;;) {
378 // Check all sockets for readiness without waiting.
379 for (auto& [sock, events] : events_per_sock) {
380 if ((events.requested & Sock::SEND) != 0) {
381 // Always ready for Send().
382 events.occurred |= Sock::SEND;
383 at_least_one_event_occurred = true;
384 }
385
386 if ((events.requested & Sock::RECV) != 0) {
387 auto dyn_sock = reinterpret_cast<const DynSock*>(sock.get());
388 uint8_t b;
389 if (dyn_sock->m_pipes->recv.GetBytes(&b, 1, MSG_PEEK) == 1 || !dyn_sock->m_accept_sockets->Empty()) {
390 events.occurred |= Sock::RECV;
391 at_least_one_event_occurred = true;
392 }
393 }
394 }
395
396 if (at_least_one_event_occurred || std::chrono::steady_clock::now() > deadline) {
397 break;
398 }
399
400 std::this_thread::sleep_for(10ms);
401 }
402
403 return true;
404}
405
407{
408 assert(false && "Move of Sock into DynSock not allowed.");
409 return *this;
410}
int ret
int flags
Definition: bitcoin-tx.cpp:536
#define Assert(val)
Identity function.
Definition: check.h:85
A CService with information about it as peer.
Definition: protocol.h:367
CNode * ConnectNode(CAddress addrConnect, const char *pszDest, bool fCountFailure, ConnectionType conn_type, bool use_v2transport) EXCLUSIVE_LOCKS_REQUIRED(!m_unused_i2p_sessions_mutex)
Definition: net.cpp:397
static constexpr SerParams V1
Definition: netaddress.h:231
Transport protocol agnostic message container.
Definition: net.h:231
Information about a peer.
Definition: net.h:673
A combination of a network address (CNetAddr) and a (TCP) port.
Definition: netaddress.h:531
ssize_t GetBytes(void *buf, size_t len, int flags=0) EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
Get bytes and remove them from the pipe.
Definition: net.cpp:247
void WaitForDataOrEof(UniqueLock< Mutex > &lock) EXCLUSIVE_LOCKS_REQUIRED(m_mutex)
Return when there is some data to read or EOF has been signaled.
Definition: net.cpp:322
void Eof() EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
Signal end-of-file on the receiving end (GetBytes() or GetNetMsg()).
Definition: net.cpp:315
std::optional< CNetMessage > GetNetMsg() EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
Deserialize a CNetMessage and remove it from the pipe.
Definition: net.cpp:269
Mutex m_mutex
Definition: net.h:261
void PushBytes(const void *buf, size_t len) EXCLUSIVE_LOCKS_REQUIRED(!m_mutex)
Push bytes to the pipe.
Definition: net.cpp:307
A mocked Sock alternative that allows providing the data to be returned by Recv() and inspecting the ...
Definition: net.h:213
DynSock & operator=(Sock &&) override
Move assignment operator, grab the socket from another object and close ours (if set).
Definition: net.cpp:406
ssize_t Recv(void *buf, size_t len, int flags) const override
recv(2) wrapper.
Definition: net.cpp:342
std::shared_ptr< Pipes > m_pipes
Definition: net.h:332
std::shared_ptr< Queue > m_accept_sockets
Definition: net.h:333
~DynSock()
Definition: net.cpp:337
bool Wait(std::chrono::milliseconds timeout, Event requested, Event *occurred=nullptr) const override
Wait for readiness for input (recv) or output (send).
Definition: net.cpp:359
std::unique_ptr< Sock > Accept(sockaddr *addr, socklen_t *addr_len) const override
accept(2) wrapper.
Definition: net.cpp:353
ssize_t Send(const void *buf, size_t len, int) const override
send(2) wrapper.
Definition: net.cpp:347
DynSock(std::shared_ptr< Pipes > pipes, std::shared_ptr< Queue > accept_sockets)
Create a new mocked sock.
Definition: net.cpp:332
bool WaitMany(std::chrono::milliseconds timeout, EventsPerSock &events_per_sock) const override
Same as Wait(), but wait on many sockets within the same timeout.
Definition: net.cpp:372
Fast randomness source.
Definition: random.h:377
virtual void InitializeNode(const CNode &node, ServiceFlags our_services)=0
Initialize a peer (setup state)
I randrange(I range) noexcept
Generate a random integer in the range [0..range), with range > 0.
Definition: random.h:254
bool randbool() noexcept
Generate a random boolean.
Definition: random.h:316
RAII helper class that manages a socket and closes it automatically when it goes out of scope.
Definition: sock.h:27
static constexpr Event SEND
If passed to Wait(), then it will wait for readiness to send to the socket.
Definition: sock.h:148
SOCKET m_socket
Contained socket.
Definition: sock.h:275
uint8_t Event
Definition: sock.h:138
static constexpr Event RECV
If passed to Wait(), then it will wait for readiness to read from the socket.
Definition: sock.h:143
std::unordered_map< std::shared_ptr< const Sock >, Events, HashSharedPtrSock, EqualSharedPtrSock > EventsPerSock
On which socket to wait for what events in WaitMany().
Definition: sock.h:208
A Span is an object that can refer to a contiguous sequence of objects.
Definition: span.h:98
A mocked Sock alternative that returns a statically contained data upon read and succeeds and ignores...
Definition: net.h:186
size_t m_consumed
Definition: net.h:205
const std::string m_contents
Definition: net.h:204
StaticContentsSock & operator=(Sock &&other) override
Move assignment operator, grab the socket from another object and close ours (if set).
Definition: net.cpp:241
StaticContentsSock(const std::string &contents)
Definition: net.cpp:226
ssize_t Recv(void *buf, size_t len, int flags) const override
Return parts of the contents that was provided at construction until it is exhausted and then return ...
Definition: net.cpp:231
Wrapper around std::unique_lock style lock for MutexType.
Definition: sync.h:152
A mocked Sock alternative that succeeds on all operations.
Definition: net.h:142
int GetSockOpt(int level, int opt_name, void *opt_val, socklen_t *opt_len) const override
getsockopt(2) wrapper.
Definition: net.cpp:185
bool Wait(std::chrono::milliseconds timeout, Event requested, Event *occurred=nullptr) const override
Wait for readiness for input (recv) or output (send).
Definition: net.cpp:203
ZeroSock()
Definition: net.cpp:149
bool WaitMany(std::chrono::milliseconds timeout, EventsPerSock &events_per_sock) const override
Same as Wait(), but wait on many sockets within the same timeout.
Definition: net.cpp:211
int GetSockName(sockaddr *name, socklen_t *name_len) const override
getsockname(2) wrapper.
Definition: net.cpp:193
int Listen(int) const override
listen(2) wrapper.
Definition: net.cpp:166
~ZeroSock() override
Definition: net.cpp:152
int Bind(const sockaddr *, socklen_t) const override
bind(2) wrapper.
Definition: net.cpp:164
int Connect(const sockaddr *, socklen_t) const override
connect(2) wrapper.
Definition: net.cpp:162
ssize_t Send(const void *, size_t len, int) const override
send(2) wrapper.
Definition: net.cpp:154
std::unique_ptr< Sock > Accept(sockaddr *addr, socklen_t *addr_len) const override
accept(2) wrapper.
Definition: net.cpp:168
bool SetNonBlocking() const override
Set the non-blocking option on the socket.
Definition: net.cpp:199
ssize_t Recv(void *buf, size_t len, int flags) const override
recv(2) wrapper.
Definition: net.cpp:156
ZeroSock & operator=(Sock &&other) override
Move assignment operator, grab the socket from another object and close ours (if set).
Definition: net.cpp:220
int SetSockOpt(int, int, const void *, socklen_t) const override
setsockopt(2) wrapper.
Definition: net.cpp:191
bool IsSelectable() const override
Check if the underlying socket can be used for select(2) (or the Wait() method).
Definition: net.cpp:201
#define INVALID_SOCKET
Definition: compat.h:56
ConnectionType
Different types of connections to a peer.
@ INBOUND
Inbound connections are those initiated by a peer.
CSerializedNetMsg Make(std::string msg_type, Args &&... args)
constexpr const char * VERACK
The verack message acknowledges a previously-received version message, informing the connecting node ...
Definition: protocol.h:70
constexpr const char * VERSION
The version message provides information about the transmitting node to the receiving node at the beg...
Definition: protocol.h:65
Definition: messages.h:20
int64_t NodeId
Definition: net.h:97
ServiceFlags
nServices flags
Definition: protocol.h:309
@ NODE_WITNESS
Definition: protocol.h:320
@ NODE_NETWORK
Definition: protocol.h:315
static const int PROTOCOL_VERSION
network protocol versioning
const char * name
Definition: rest.cpp:49
static Wrapper< Formatter, T & > Using(T &&t)
Cause serialization/deserialization of an object to be done using a specified formatter class.
Definition: serialize.h:497
ServiceFlags their_services
bool ReceiveMsgFrom(CNode &node, CSerializedNetMsg &&ser_msg) const
Definition: net.cpp:94
CNode * ConnectNodePublic(PeerManager &peerman, const char *pszDest, ConnectionType conn_type) EXCLUSIVE_LOCKS_REQUIRED(!m_unused_i2p_sessions_mutex)
Definition: net.cpp:108
void NodeReceiveMsgBytes(CNode &node, Span< const uint8_t > msg_bytes, bool &complete) const
Definition: net.cpp:74
void AddTestNode(CNode &node)
Definition: net.h:54
void FlushSendBuffer(CNode &node) const
Definition: net.cpp:82
Serialization wrapper class for custom integers and enums.
Definition: serialize.h:530
Auxiliary requested/occurred events to wait for in WaitMany().
Definition: sock.h:173
#define WAIT_LOCK(cs, name)
Definition: sync.h:262
#define LOCK(cs)
Definition: sync.h:257
static std::atomic< SOCKET > g_mocked_sock_fd
Definition: net.cpp:147
std::vector< NodeEvictionCandidate > GetRandomNodeEvictionCandidates(int n_candidates, FastRandomContext &random_context)
Definition: net.cpp:119
constexpr auto ALL_NETWORKS
Definition: net.h:127
#define EXCLUSIVE_LOCKS_REQUIRED(...)
Definition: threadsafety.h:49
AssertLockHeld(pool.cs)
assert(!tx.IsCoinBase())