Bitcoin Core  21.99.0
P2P Digital Currency
sock.cpp
Go to the documentation of this file.
1 // Copyright (c) 2020-2021 The Bitcoin Core developers
2 // Distributed under the MIT software license, see the accompanying
3 // file COPYING or http://www.opensource.org/licenses/mit-license.php.
4 
5 #include <compat.h>
6 #include <logging.h>
7 #include <threadinterrupt.h>
8 #include <tinyformat.h>
9 #include <util/sock.h>
10 #include <util/system.h>
11 #include <util/time.h>
12 
13 #include <codecvt>
14 #include <cwchar>
15 #include <locale>
16 #include <stdexcept>
17 #include <string>
18 
19 #ifdef USE_POLL
20 #include <poll.h>
21 #endif
22 
23 static inline bool IOErrorIsPermanent(int err)
24 {
25  return err != WSAEAGAIN && err != WSAEINTR && err != WSAEWOULDBLOCK && err != WSAEINPROGRESS;
26 }
27 
28 Sock::Sock() : m_socket(INVALID_SOCKET) {}
29 
30 Sock::Sock(SOCKET s) : m_socket(s) {}
31 
32 Sock::Sock(Sock&& other)
33 {
34  m_socket = other.m_socket;
35  other.m_socket = INVALID_SOCKET;
36 }
37 
39 
41 {
42  Reset();
43  m_socket = other.m_socket;
44  other.m_socket = INVALID_SOCKET;
45  return *this;
46 }
47 
48 SOCKET Sock::Get() const { return m_socket; }
49 
51 {
52  const SOCKET s = m_socket;
54  return s;
55 }
56 
58 
59 ssize_t Sock::Send(const void* data, size_t len, int flags) const
60 {
61  return send(m_socket, static_cast<const char*>(data), len, flags);
62 }
63 
64 ssize_t Sock::Recv(void* buf, size_t len, int flags) const
65 {
66  return recv(m_socket, static_cast<char*>(buf), len, flags);
67 }
68 
69 bool Sock::Wait(std::chrono::milliseconds timeout, Event requested, Event* occurred) const
70 {
71 #ifdef USE_POLL
72  pollfd fd;
73  fd.fd = m_socket;
74  fd.events = 0;
75  if (requested & RECV) {
76  fd.events |= POLLIN;
77  }
78  if (requested & SEND) {
79  fd.events |= POLLOUT;
80  }
81 
82  if (poll(&fd, 1, count_milliseconds(timeout)) == SOCKET_ERROR) {
83  return false;
84  }
85 
86  if (occurred != nullptr) {
87  *occurred = 0;
88  if (fd.revents & POLLIN) {
89  *occurred |= RECV;
90  }
91  if (fd.revents & POLLOUT) {
92  *occurred |= SEND;
93  }
94  }
95 
96  return true;
97 #else
99  return false;
100  }
101 
102  fd_set fdset_recv;
103  fd_set fdset_send;
104  FD_ZERO(&fdset_recv);
105  FD_ZERO(&fdset_send);
106 
107  if (requested & RECV) {
108  FD_SET(m_socket, &fdset_recv);
109  }
110 
111  if (requested & SEND) {
112  FD_SET(m_socket, &fdset_send);
113  }
114 
115  timeval timeout_struct = MillisToTimeval(timeout);
116 
117  if (select(m_socket + 1, &fdset_recv, &fdset_send, nullptr, &timeout_struct) == SOCKET_ERROR) {
118  return false;
119  }
120 
121  if (occurred != nullptr) {
122  *occurred = 0;
123  if (FD_ISSET(m_socket, &fdset_recv)) {
124  *occurred |= RECV;
125  }
126  if (FD_ISSET(m_socket, &fdset_send)) {
127  *occurred |= SEND;
128  }
129  }
130 
131  return true;
132 #endif /* USE_POLL */
133 }
134 
135 void Sock::SendComplete(const std::string& data,
136  std::chrono::milliseconds timeout,
137  CThreadInterrupt& interrupt) const
138 {
139  const auto deadline = GetTime<std::chrono::milliseconds>() + timeout;
140  size_t sent{0};
141 
142  for (;;) {
143  const ssize_t ret{Send(data.data() + sent, data.size() - sent, MSG_NOSIGNAL)};
144 
145  if (ret > 0) {
146  sent += static_cast<size_t>(ret);
147  if (sent == data.size()) {
148  break;
149  }
150  } else {
151  const int err{WSAGetLastError()};
152  if (IOErrorIsPermanent(err)) {
153  throw std::runtime_error(strprintf("send(): %s", NetworkErrorString(err)));
154  }
155  }
156 
157  const auto now = GetTime<std::chrono::milliseconds>();
158 
159  if (now >= deadline) {
160  throw std::runtime_error(strprintf(
161  "Send timeout (sent only %u of %u bytes before that)", sent, data.size()));
162  }
163 
164  if (interrupt) {
165  throw std::runtime_error(strprintf(
166  "Send interrupted (sent only %u of %u bytes before that)", sent, data.size()));
167  }
168 
169  // Wait for a short while (or the socket to become ready for sending) before retrying
170  // if nothing was sent.
171  const auto wait_time = std::min(deadline - now, std::chrono::milliseconds{MAX_WAIT_FOR_IO});
172  Wait(wait_time, SEND);
173  }
174 }
175 
176 std::string Sock::RecvUntilTerminator(uint8_t terminator,
177  std::chrono::milliseconds timeout,
178  CThreadInterrupt& interrupt) const
179 {
180  const auto deadline = GetTime<std::chrono::milliseconds>() + timeout;
181  std::string data;
182  bool terminator_found{false};
183 
184  // We must not consume any bytes past the terminator from the socket.
185  // One option is to read one byte at a time and check if we have read a terminator.
186  // However that is very slow. Instead, we peek at what is in the socket and only read
187  // as many bytes as possible without crossing the terminator.
188  // Reading 64 MiB of random data with 262526 terminator chars takes 37 seconds to read
189  // one byte at a time VS 0.71 seconds with the "peek" solution below. Reading one byte
190  // at a time is about 50 times slower.
191 
192  for (;;) {
193  char buf[512];
194 
195  const ssize_t peek_ret{Recv(buf, sizeof(buf), MSG_PEEK)};
196 
197  switch (peek_ret) {
198  case -1: {
199  const int err{WSAGetLastError()};
200  if (IOErrorIsPermanent(err)) {
201  throw std::runtime_error(strprintf("recv(): %s", NetworkErrorString(err)));
202  }
203  break;
204  }
205  case 0:
206  throw std::runtime_error("Connection unexpectedly closed by peer");
207  default:
208  auto end = buf + peek_ret;
209  auto terminator_pos = std::find(buf, end, terminator);
210  terminator_found = terminator_pos != end;
211 
212  const size_t try_len{terminator_found ? terminator_pos - buf + 1 :
213  static_cast<size_t>(peek_ret)};
214 
215  const ssize_t read_ret{Recv(buf, try_len, 0)};
216 
217  if (read_ret < 0 || static_cast<size_t>(read_ret) != try_len) {
218  throw std::runtime_error(
219  strprintf("recv() returned %u bytes on attempt to read %u bytes but previous "
220  "peek claimed %u bytes are available",
221  read_ret, try_len, peek_ret));
222  }
223 
224  // Don't include the terminator in the output.
225  const size_t append_len{terminator_found ? try_len - 1 : try_len};
226 
227  data.append(buf, buf + append_len);
228 
229  if (terminator_found) {
230  return data;
231  }
232  }
233 
234  const auto now = GetTime<std::chrono::milliseconds>();
235 
236  if (now >= deadline) {
237  throw std::runtime_error(strprintf(
238  "Receive timeout (received %u bytes without terminator before that)", data.size()));
239  }
240 
241  if (interrupt) {
242  throw std::runtime_error(strprintf(
243  "Receive interrupted (received %u bytes without terminator before that)",
244  data.size()));
245  }
246 
247  // Wait for a short while (or the socket to become ready for reading) before retrying.
248  const auto wait_time = std::min(deadline - now, std::chrono::milliseconds{MAX_WAIT_FOR_IO});
249  Wait(wait_time, RECV);
250  }
251 }
252 
253 bool Sock::IsConnected(std::string& errmsg) const
254 {
255  if (m_socket == INVALID_SOCKET) {
256  errmsg = "not connected";
257  return false;
258  }
259 
260  char c;
261  switch (Recv(&c, sizeof(c), MSG_PEEK)) {
262  case -1: {
263  const int err = WSAGetLastError();
264  if (IOErrorIsPermanent(err)) {
265  errmsg = NetworkErrorString(err);
266  return false;
267  }
268  return true;
269  }
270  case 0:
271  errmsg = "closed";
272  return false;
273  default:
274  return true;
275  }
276 }
277 
278 #ifdef WIN32
279 std::string NetworkErrorString(int err)
280 {
281  wchar_t buf[256];
282  buf[0] = 0;
283  if(FormatMessageW(FORMAT_MESSAGE_FROM_SYSTEM | FORMAT_MESSAGE_IGNORE_INSERTS | FORMAT_MESSAGE_MAX_WIDTH_MASK,
284  nullptr, err, MAKELANGID(LANG_NEUTRAL, SUBLANG_DEFAULT),
285  buf, ARRAYSIZE(buf), nullptr))
286  {
287  return strprintf("%s (%d)", std::wstring_convert<std::codecvt_utf8_utf16<wchar_t>,wchar_t>().to_bytes(buf), err);
288  }
289  else
290  {
291  return strprintf("Unknown error (%d)", err);
292  }
293 }
294 #else
295 std::string NetworkErrorString(int err)
296 {
297  char buf[256];
298  buf[0] = 0;
299  /* Too bad there are two incompatible implementations of the
300  * thread-safe strerror. */
301  const char *s;
302 #ifdef STRERROR_R_CHAR_P /* GNU variant can return a pointer outside the passed buffer */
303  s = strerror_r(err, buf, sizeof(buf));
304 #else /* POSIX variant always returns message in buffer */
305  s = buf;
306  if (strerror_r(err, buf, sizeof(buf)))
307  buf[0] = 0;
308 #endif
309  return strprintf("%s (%d)", s, err);
310 }
311 #endif
312 
313 bool CloseSocket(SOCKET& hSocket)
314 {
315  if (hSocket == INVALID_SOCKET)
316  return false;
317 #ifdef WIN32
318  int ret = closesocket(hSocket);
319 #else
320  int ret = close(hSocket);
321 #endif
322  if (ret) {
323  LogPrintf("Socket close failed: %d. Error: %s\n", hSocket, NetworkErrorString(WSAGetLastError()));
324  }
325  hSocket = INVALID_SOCKET;
326  return ret != SOCKET_ERROR;
327 }
threadinterrupt.h
Sock::m_socket
SOCKET m_socket
Contained socket.
Definition: sock.h:157
Sock::operator=
Sock & operator=(const Sock &)=delete
Copy assignment operator, disabled because closing the same socket twice is undesirable.
flags
int flags
Definition: bitcoin-tx.cpp:512
WSAEINPROGRESS
#define WSAEINPROGRESS
Definition: compat.h:50
Sock::SendComplete
virtual void SendComplete(const std::string &data, std::chrono::milliseconds timeout, CThreadInterrupt &interrupt) const
Send the given data, retrying on transient errors.
Definition: sock.cpp:135
WSAGetLastError
#define WSAGetLastError()
Definition: compat.h:43
Sock::Reset
virtual void Reset()
Close if non-empty.
Definition: sock.cpp:57
tinyformat.h
CloseSocket
bool CloseSocket(SOCKET &hSocket)
Close socket and set hSocket to INVALID_SOCKET.
Definition: sock.cpp:313
Sock
RAII helper class that manages a socket.
Definition: sock.h:25
INVALID_SOCKET
#define INVALID_SOCKET
Definition: compat.h:53
IsSelectableSocket
static bool IsSelectableSocket(const SOCKET &s)
Definition: compat.h:100
compat.h
Sock::Recv
virtual ssize_t Recv(void *buf, size_t len, int flags) const
recv(2) wrapper.
Definition: sock.cpp:64
send
static RPCHelpMan send()
Definition: rpcwallet.cpp:4038
Sock::Wait
virtual bool Wait(std::chrono::milliseconds timeout, Event requested, Event *occurred=nullptr) const
Wait for readiness for input (recv) or output (send).
Definition: sock.cpp:69
Sock::SEND
static constexpr Event SEND
If passed to Wait(), then it will wait for readiness to send to the socket.
Definition: sock.h:103
time.h
LogPrintf
#define LogPrintf(...)
Definition: logging.h:183
NetworkErrorString
std::string NetworkErrorString(int err)
Return readable error string for a network error code.
Definition: sock.cpp:295
Sock::Send
virtual ssize_t Send(const void *data, size_t len, int flags) const
send(2) wrapper.
Definition: sock.cpp:59
SOCKET_ERROR
#define SOCKET_ERROR
Definition: compat.h:54
WSAEWOULDBLOCK
#define WSAEWOULDBLOCK
Definition: compat.h:46
Sock::RecvUntilTerminator
virtual std::string RecvUntilTerminator(uint8_t terminator, std::chrono::milliseconds timeout, CThreadInterrupt &interrupt) const
Read from socket until a terminator character is encountered.
Definition: sock.cpp:176
system.h
Sock::Sock
Sock()
Default constructor, creates an empty object that does nothing when destroyed.
Definition: sock.cpp:28
strprintf
#define strprintf
Format arguments and return the string or write to given std::ostream (see tinyformat::format doc for...
Definition: tinyformat.h:1164
Sock::Get
virtual SOCKET Get() const
Get the value of the contained socket.
Definition: sock.cpp:48
Sock::IsConnected
virtual bool IsConnected(std::string &errmsg) const
Check if still connected.
Definition: sock.cpp:253
Sock::Event
uint8_t Event
Definition: sock.h:93
MSG_NOSIGNAL
#define MSG_NOSIGNAL
Definition: compat.h:110
MillisToTimeval
struct timeval MillisToTimeval(int64_t nTimeout)
Convert milliseconds to a struct timeval for e.g.
Definition: time.cpp:172
count_milliseconds
constexpr int64_t count_milliseconds(std::chrono::milliseconds t)
Definition: time.h:30
logging.h
WSAEINTR
#define WSAEINTR
Definition: compat.h:49
MAX_WAIT_FOR_IO
static constexpr auto MAX_WAIT_FOR_IO
Maximum time to wait for I/O readiness.
Definition: sock.h:19
Sock::Release
virtual SOCKET Release()
Get the value of the contained socket and drop ownership.
Definition: sock.cpp:50
Sock::RECV
static constexpr Event RECV
If passed to Wait(), then it will wait for readiness to read from the socket.
Definition: sock.h:98
IOErrorIsPermanent
static bool IOErrorIsPermanent(int err)
Definition: sock.cpp:23
Sock::~Sock
virtual ~Sock()
Destructor, close the socket or do nothing if empty.
Definition: sock.cpp:38
CThreadInterrupt
Definition: threadinterrupt.h:19
sock.h
WSAEAGAIN
#define WSAEAGAIN
Definition: compat.h:47
SOCKET
unsigned int SOCKET
Definition: compat.h:41