13#include <util/threadinterrupt.h>
64 return connect(
m_socket, addr, addr_len);
67int Sock::Bind(
const sockaddr* addr, socklen_t addr_len)
const
69 return bind(
m_socket, addr, addr_len);
77std::unique_ptr<Sock>
Sock::Accept(sockaddr* addr, socklen_t* addr_len)
const
85 std::unique_ptr<Sock> sock;
87 const auto socket = accept(
m_socket, addr, addr_len);
90 sock = std::make_unique<Sock>(socket);
91 }
catch (
const std::exception&) {
105 return getsockopt(
m_socket, level, opt_name,
static_cast<char*
>(opt_val), opt_len);
110 return setsockopt(
m_socket, level, opt_name,
static_cast<const char*
>(opt_val), opt_len);
139#if defined(USE_POLL) || defined(WIN32)
153 std::shared_ptr<const Sock> shared{std::shared_ptr<const Sock>{},
this};
157 if (!
WaitMany(timeout, events_per_sock)) {
161 if (occurred !=
nullptr) {
162 *occurred = events_per_sock.begin()->second.occurred;
171 std::vector<pollfd> pfds;
172 for (
const auto& [sock, events] : events_per_sock) {
174 auto& pfd = pfds.back();
175 pfd.fd = sock->m_socket;
176 if (events.requested &
RECV) {
177 pfd.events |= POLLIN;
179 if (events.requested &
SEND) {
180 pfd.events |= POLLOUT;
188 assert(pfds.size() == events_per_sock.size());
190 for (
auto& [sock, events] : events_per_sock) {
191 assert(sock->m_socket ==
static_cast<SOCKET>(pfds[i].fd));
193 if (pfds[i].revents & POLLIN) {
194 events.occurred |=
RECV;
196 if (pfds[i].revents & POLLOUT) {
197 events.occurred |=
SEND;
199 if (pfds[i].revents & (POLLERR | POLLHUP)) {
200 events.occurred |=
ERR;
215 for (
const auto& [sock, events] : events_per_sock) {
216 if (!sock->IsSelectable()) {
219 const auto&
s = sock->m_socket;
220 if (events.requested &
RECV) {
223 if (events.requested &
SEND) {
227 socket_max = std::max(socket_max,
s);
236 for (
auto& [sock, events] : events_per_sock) {
237 const auto&
s = sock->m_socket;
239 if (FD_ISSET(
s, &recv)) {
240 events.occurred |=
RECV;
242 if (FD_ISSET(
s, &
send)) {
243 events.occurred |=
SEND;
245 if (FD_ISSET(
s, &err)) {
246 events.occurred |=
ERR;
255 std::chrono::milliseconds timeout,
258 const auto deadline = GetTime<std::chrono::milliseconds>() + timeout;
265 sent +=
static_cast<size_t>(
ret);
266 if (sent ==
data.size()) {
276 const auto now = GetTime<std::chrono::milliseconds>();
278 if (now >= deadline) {
280 "Send timeout (sent only %u of %u bytes before that)", sent,
data.size()));
285 "Send interrupted (sent only %u of %u bytes before that)", sent,
data.size()));
290 const auto wait_time = std::min(deadline - now, std::chrono::milliseconds{
MAX_WAIT_FOR_IO});
296 std::chrono::milliseconds timeout,
303 std::chrono::milliseconds timeout,
305 size_t max_data)
const
307 const auto deadline = GetTime<std::chrono::milliseconds>() + timeout;
309 bool terminator_found{
false};
320 if (
data.size() >= max_data) {
321 throw std::runtime_error(
322 strprintf(
"Received too many bytes without a terminator (%u)",
data.size()));
327 const ssize_t peek_ret{
Recv(buf, std::min(
sizeof(buf), max_data -
data.size()), MSG_PEEK)};
338 throw std::runtime_error(
"Connection unexpectedly closed by peer");
340 auto end = buf + peek_ret;
341 auto terminator_pos = std::find(buf, end, terminator);
342 terminator_found = terminator_pos != end;
344 const size_t try_len{terminator_found ? terminator_pos - buf + 1 :
345 static_cast<size_t>(peek_ret)};
347 const ssize_t read_ret{
Recv(buf, try_len, 0)};
349 if (read_ret < 0 ||
static_cast<size_t>(read_ret) != try_len) {
350 throw std::runtime_error(
351 strprintf(
"recv() returned %u bytes on attempt to read %u bytes but previous "
352 "peek claimed %u bytes are available",
353 read_ret, try_len, peek_ret));
357 const size_t append_len{terminator_found ? try_len - 1 : try_len};
359 data.append(buf, buf + append_len);
361 if (terminator_found) {
366 const auto now = GetTime<std::chrono::milliseconds>();
368 if (now >= deadline) {
370 "Receive timeout (received %u bytes without terminator before that)",
data.size()));
375 "Receive interrupted (received %u bytes without terminator before that)",
380 const auto wait_time = std::min(deadline - now, std::chrono::milliseconds{
MAX_WAIT_FOR_IO});
388 errmsg =
"not connected";
393 switch (
Recv(&c,
sizeof(c), MSG_PEEK)) {
434 return Win32ErrorString(err);
A helper class for interruptible sleeps.
RAII helper class that manages a socket and closes it automatically when it goes out of scope.
virtual std::unique_ptr< Sock > Accept(sockaddr *addr, socklen_t *addr_len) const
accept(2) wrapper.
virtual ssize_t Send(const void *data, size_t len, int flags) const
send(2) wrapper.
static constexpr Event SEND
If passed to Wait(), then it will wait for readiness to send to the socket.
SOCKET m_socket
Contained socket.
Sock & operator=(const Sock &)=delete
Copy assignment operator, disabled because closing the same socket twice is undesirable.
virtual int Bind(const sockaddr *addr, socklen_t addr_len) const
bind(2) wrapper.
virtual bool Wait(std::chrono::milliseconds timeout, Event requested, Event *occurred=nullptr) const
Wait for readiness for input (recv) or output (send).
virtual ~Sock()
Destructor, close the socket or do nothing if empty.
virtual void SendComplete(std::span< const unsigned char > data, std::chrono::milliseconds timeout, CThreadInterrupt &interrupt) const
Send the given data, retrying on transient errors.
virtual int GetSockName(sockaddr *name, socklen_t *name_len) const
getsockname(2) wrapper.
void Close()
Close m_socket if it is not INVALID_SOCKET.
virtual bool WaitMany(std::chrono::milliseconds timeout, EventsPerSock &events_per_sock) const
Same as Wait(), but wait on many sockets within the same timeout.
static constexpr Event ERR
Ignored if passed to Wait(), but could be set in the occurred events if an exceptional condition has occurred on the socket or if it has been disconnected.
virtual bool IsConnected(std::string &errmsg) const
Check if still connected.
virtual int SetSockOpt(int level, int opt_name, const void *opt_val, socklen_t opt_len) const
setsockopt(2) wrapper.
static constexpr Event RECV
If passed to Wait(), then it will wait for readiness to read from the socket.
virtual int GetSockOpt(int level, int opt_name, void *opt_val, socklen_t *opt_len) const
getsockopt(2) wrapper.
virtual int Connect(const sockaddr *addr, socklen_t addr_len) const
connect(2) wrapper.
virtual ssize_t Recv(void *buf, size_t len, int flags) const
recv(2) wrapper.
virtual std::string RecvUntilTerminator(uint8_t terminator, std::chrono::milliseconds timeout, CThreadInterrupt &interrupt, size_t max_data) const
Read from socket until a terminator character is encountered.
virtual int Listen(int backlog) const
listen(2) wrapper.
virtual bool SetNonBlocking() const
Set the non-blocking option on the socket.
std::unordered_map< std::shared_ptr< const Sock >, Events, HashSharedPtrSock, EqualSharedPtrSock > EventsPerSock
On which socket to wait for what events in WaitMany().
virtual bool IsSelectable() const
Check if the underlying socket can be used for select(2) (or the Wait() method).
bool operator==(SOCKET s) const
Check if the internal socket is equal to s.
#define WSAGetLastError()
static bool IOErrorIsPermanent(int err)
std::string NetworkErrorString(int err)
Return readable error string for a network error code.
static constexpr auto MAX_WAIT_FOR_IO
Maximum time to wait for I/O readiness.
constexpr auto MakeUCharSpan(const V &v) -> decltype(UCharSpanCast(std::span{v}))
Like the std::span constructor, but for (const) unsigned char member types only.
Auxiliary requested/occurred events to wait for in WaitMany().
std::string SysErrorString(int err)
Return system error string from errno value.
struct timeval MillisToTimeval(int64_t nTimeout)
Convert milliseconds to a struct timeval for e.g. select.
constexpr int64_t count_milliseconds(std::chrono::milliseconds t)