Bitcoin Core  21.99.0
P2P Digital Currency
zmqpublishnotifier.cpp
Go to the documentation of this file.
1 // Copyright (c) 2015-2020 The Bitcoin Core developers
2 // Distributed under the MIT software license, see the accompanying
3 // file COPYING or http://www.opensource.org/licenses/mit-license.php.
4 
6 
7 #include <chain.h>
8 #include <chainparams.h>
9 #include <rpc/server.h>
10 #include <streams.h>
11 #include <util/system.h>
12 #include <validation.h>
13 #include <zmq/zmqutil.h>
14 
15 #include <zmq.h>
16 
17 #include <cstdarg>
18 #include <cstddef>
19 #include <map>
20 #include <optional>
21 #include <string>
22 #include <utility>
23 
24 static std::multimap<std::string, CZMQAbstractPublishNotifier*> mapPublishNotifiers;
25 
26 static const char *MSG_HASHBLOCK = "hashblock";
27 static const char *MSG_HASHTX = "hashtx";
28 static const char *MSG_RAWBLOCK = "rawblock";
29 static const char *MSG_RAWTX = "rawtx";
30 static const char *MSG_SEQUENCE = "sequence";
31 
32 // Internal function to send multipart message
33 static int zmq_send_multipart(void *sock, const void* data, size_t size, ...)
34 {
35  va_list args;
36  va_start(args, size);
37 
38  while (1)
39  {
40  zmq_msg_t msg;
41 
42  int rc = zmq_msg_init_size(&msg, size);
43  if (rc != 0)
44  {
45  zmqError("Unable to initialize ZMQ msg");
46  va_end(args);
47  return -1;
48  }
49 
50  void *buf = zmq_msg_data(&msg);
51  memcpy(buf, data, size);
52 
53  data = va_arg(args, const void*);
54 
55  rc = zmq_msg_send(&msg, sock, data ? ZMQ_SNDMORE : 0);
56  if (rc == -1)
57  {
58  zmqError("Unable to send ZMQ msg");
59  zmq_msg_close(&msg);
60  va_end(args);
61  return -1;
62  }
63 
64  zmq_msg_close(&msg);
65 
66  if (!data)
67  break;
68 
69  size = va_arg(args, size_t);
70  }
71  va_end(args);
72  return 0;
73 }
74 
76 {
77  assert(!psocket);
78 
79  // check if address is being used by other publish notifier
80  std::multimap<std::string, CZMQAbstractPublishNotifier*>::iterator i = mapPublishNotifiers.find(address);
81 
82  if (i==mapPublishNotifiers.end())
83  {
84  psocket = zmq_socket(pcontext, ZMQ_PUB);
85  if (!psocket)
86  {
87  zmqError("Failed to create socket");
88  return false;
89  }
90 
91  LogPrint(BCLog::ZMQ, "zmq: Outbound message high water mark for %s at %s is %d\n", type, address, outbound_message_high_water_mark);
92 
93  int rc = zmq_setsockopt(psocket, ZMQ_SNDHWM, &outbound_message_high_water_mark, sizeof(outbound_message_high_water_mark));
94  if (rc != 0)
95  {
96  zmqError("Failed to set outbound message high water mark");
97  zmq_close(psocket);
98  return false;
99  }
100 
101  const int so_keepalive_option {1};
102  rc = zmq_setsockopt(psocket, ZMQ_TCP_KEEPALIVE, &so_keepalive_option, sizeof(so_keepalive_option));
103  if (rc != 0) {
104  zmqError("Failed to set SO_KEEPALIVE");
105  zmq_close(psocket);
106  return false;
107  }
108 
109  rc = zmq_bind(psocket, address.c_str());
110  if (rc != 0)
111  {
112  zmqError("Failed to bind address");
113  zmq_close(psocket);
114  return false;
115  }
116 
117  // register this notifier for the address, so it can be reused for other publish notifier
118  mapPublishNotifiers.insert(std::make_pair(address, this));
119  return true;
120  }
121  else
122  {
123  LogPrint(BCLog::ZMQ, "zmq: Reusing socket for address %s\n", address);
124  LogPrint(BCLog::ZMQ, "zmq: Outbound message high water mark for %s at %s is %d\n", type, address, outbound_message_high_water_mark);
125 
126  psocket = i->second->psocket;
127  mapPublishNotifiers.insert(std::make_pair(address, this));
128 
129  return true;
130  }
131 }
132 
134 {
135  // Early return if Initialize was not called
136  if (!psocket) return;
137 
138  int count = mapPublishNotifiers.count(address);
139 
140  // remove this notifier from the list of publishers using this address
141  typedef std::multimap<std::string, CZMQAbstractPublishNotifier*>::iterator iterator;
142  std::pair<iterator, iterator> iterpair = mapPublishNotifiers.equal_range(address);
143 
144  for (iterator it = iterpair.first; it != iterpair.second; ++it)
145  {
146  if (it->second==this)
147  {
148  mapPublishNotifiers.erase(it);
149  break;
150  }
151  }
152 
153  if (count == 1)
154  {
155  LogPrint(BCLog::ZMQ, "zmq: Close socket at address %s\n", address);
156  int linger = 0;
157  zmq_setsockopt(psocket, ZMQ_LINGER, &linger, sizeof(linger));
158  zmq_close(psocket);
159  }
160 
161  psocket = nullptr;
162 }
163 
164 bool CZMQAbstractPublishNotifier::SendZmqMessage(const char *command, const void* data, size_t size)
165 {
166  assert(psocket);
167 
168  /* send three parts, command & data & a LE 4byte sequence number */
169  unsigned char msgseq[sizeof(uint32_t)];
170  WriteLE32(&msgseq[0], nSequence);
171  int rc = zmq_send_multipart(psocket, command, strlen(command), data, size, msgseq, (size_t)sizeof(uint32_t), nullptr);
172  if (rc == -1)
173  return false;
174 
175  /* increment memory only sequence number after sending */
176  nSequence++;
177 
178  return true;
179 }
180 
182 {
183  uint256 hash = pindex->GetBlockHash();
184  LogPrint(BCLog::ZMQ, "zmq: Publish hashblock %s to %s\n", hash.GetHex(), this->address);
185  char data[32];
186  for (unsigned int i = 0; i < 32; i++)
187  data[31 - i] = hash.begin()[i];
188  return SendZmqMessage(MSG_HASHBLOCK, data, 32);
189 }
190 
192 {
193  uint256 hash = transaction.GetHash();
194  LogPrint(BCLog::ZMQ, "zmq: Publish hashtx %s to %s\n", hash.GetHex(), this->address);
195  char data[32];
196  for (unsigned int i = 0; i < 32; i++)
197  data[31 - i] = hash.begin()[i];
198  return SendZmqMessage(MSG_HASHTX, data, 32);
199 }
200 
202 {
203  LogPrint(BCLog::ZMQ, "zmq: Publish rawblock %s to %s\n", pindex->GetBlockHash().GetHex(), this->address);
204 
205  const Consensus::Params& consensusParams = Params().GetConsensus();
207  {
208  LOCK(cs_main);
209  CBlock block;
210  if(!ReadBlockFromDisk(block, pindex, consensusParams))
211  {
212  zmqError("Can't read block from disk");
213  return false;
214  }
215 
216  ss << block;
217  }
218 
219  return SendZmqMessage(MSG_RAWBLOCK, &(*ss.begin()), ss.size());
220 }
221 
223 {
224  uint256 hash = transaction.GetHash();
225  LogPrint(BCLog::ZMQ, "zmq: Publish rawtx %s to %s\n", hash.GetHex(), this->address);
227  ss << transaction;
228  return SendZmqMessage(MSG_RAWTX, &(*ss.begin()), ss.size());
229 }
230 
231 // Helper function to send a 'sequence' topic message with the following structure:
232 // <32-byte hash> | <1-byte label> | <8-byte LE sequence> (optional)
233 static bool SendSequenceMsg(CZMQAbstractPublishNotifier& notifier, uint256 hash, char label, std::optional<uint64_t> sequence = {})
234 {
235  unsigned char data[sizeof(hash) + sizeof(label) + sizeof(uint64_t)];
236  for (unsigned int i = 0; i < sizeof(hash); ++i) {
237  data[sizeof(hash) - 1 - i] = hash.begin()[i];
238  }
239  data[sizeof(hash)] = label;
240  if (sequence) WriteLE64(data + sizeof(hash) + sizeof(label), *sequence);
241  return notifier.SendZmqMessage(MSG_SEQUENCE, data, sequence ? sizeof(data) : sizeof(hash) + sizeof(label));
242 }
243 
245 {
246  uint256 hash = pindex->GetBlockHash();
247  LogPrint(BCLog::ZMQ, "zmq: Publish sequence block connect %s to %s\n", hash.GetHex(), this->address);
248  return SendSequenceMsg(*this, hash, /* Block (C)onnect */ 'C');
249 }
250 
252 {
253  uint256 hash = pindex->GetBlockHash();
254  LogPrint(BCLog::ZMQ, "zmq: Publish sequence block disconnect %s to %s\n", hash.GetHex(), this->address);
255  return SendSequenceMsg(*this, hash, /* Block (D)isconnect */ 'D');
256 }
257 
258 bool CZMQPublishSequenceNotifier::NotifyTransactionAcceptance(const CTransaction &transaction, uint64_t mempool_sequence)
259 {
260  uint256 hash = transaction.GetHash();
261  LogPrint(BCLog::ZMQ, "zmq: Publish hashtx mempool acceptance %s to %s\n", hash.GetHex(), this->address);
262  return SendSequenceMsg(*this, hash, /* Mempool (A)cceptance */ 'A', mempool_sequence);
263 }
264 
265 bool CZMQPublishSequenceNotifier::NotifyTransactionRemoval(const CTransaction &transaction, uint64_t mempool_sequence)
266 {
267  uint256 hash = transaction.GetHash();
268  LogPrint(BCLog::ZMQ, "zmq: Publish hashtx mempool removal %s to %s\n", hash.GetHex(), this->address);
269  return SendSequenceMsg(*this, hash, /* Mempool (R)emoval */ 'R', mempool_sequence);
270 }
static const char * MSG_HASHBLOCK
bool NotifyTransaction(const CTransaction &transaction) override
std::deque< CInv >::iterator it
uint32_t nSequence
upcounting per message sequence number
#define LogPrint(category,...)
Definition: logging.h:182
assert(!tx.IsCoinBase())
void zmqError(const char *str)
Definition: zmqutil.cpp:11
bool NotifyBlockDisconnect(const CBlockIndex *pindex) override
bool SendZmqMessage(const char *command, const void *data, size_t size)
Definition: block.h:62
static void WriteLE64(unsigned char *ptr, uint64_t x)
Definition: common.h:50
bool ReadBlockFromDisk(CBlock &block, const FlatFilePos &pos, const Consensus::Params &consensusParams)
Functions for disk access for blocks.
static std::multimap< std::string, CZMQAbstractPublishNotifier * > mapPublishNotifiers
static bool SendSequenceMsg(CZMQAbstractPublishNotifier &notifier, uint256 hash, char label, std::optional< uint64_t > sequence={})
static void WriteLE32(unsigned char *ptr, uint32_t x)
Definition: common.h:44
Double ended buffer combining vector and stream-like interfaces.
Definition: streams.h:202
static const char * MSG_HASHTX
unsigned char * begin()
Definition: uint256.h:58
bool NotifyTransaction(const CTransaction &transaction) override
bool NotifyBlock(const CBlockIndex *pindex) override
uint256 GetBlockHash() const
Definition: chain.h:233
size_type size() const
Definition: streams.h:293
#define LOCK(cs)
Definition: sync.h:232
const uint256 & GetHash() const
Definition: transaction.h:302
bool NotifyBlock(const CBlockIndex *pindex) override
RecursiveMutex cs_main
Mutex to guard access to validation specific variables, such as reading or changing the chainstate...
Definition: validation.cpp:128
static int zmq_send_multipart(void *sock, const void *data, size_t size,...)
Parameters that influence chain consensus.
Definition: params.h:46
bool NotifyTransactionAcceptance(const CTransaction &transaction, uint64_t mempool_sequence) override
256-bit opaque blob.
Definition: uint256.h:124
static const char * MSG_RAWTX
const_iterator begin() const
Definition: streams.h:289
static const char * MSG_RAWBLOCK
The block chain is a tree shaped structure starting with the genesis block at the root...
Definition: chain.h:137
const CChainParams & Params()
Return the currently selected parameters.
int RPCSerializationFlags()
Definition: server.cpp:509
void * memcpy(void *a, const void *b, size_t c)
static const int PROTOCOL_VERSION
network protocol versioning
Definition: version.h:12
bool NotifyTransactionRemoval(const CTransaction &transaction, uint64_t mempool_sequence) override
std::string GetHex() const
Definition: uint256.cpp:20
static int count
Definition: tests.c:35
bool NotifyBlockConnect(const CBlockIndex *pindex) override
The basic transaction that is broadcasted on the network and contained in blocks. ...
Definition: transaction.h:259
const Consensus::Params & GetConsensus() const
Definition: chainparams.h:65
static const char * MSG_SEQUENCE
bool Initialize(void *pcontext) override