Bitcoin Core  0.19.99
P2P Digital Currency
zmqpublishnotifier.cpp
Go to the documentation of this file.
1 // Copyright (c) 2015-2020 The Bitcoin Core developers
2 // Distributed under the MIT software license, see the accompanying
3 // file COPYING or http://www.opensource.org/licenses/mit-license.php.
4 
5 #include <chain.h>
6 #include <chainparams.h>
7 #include <streams.h>
9 #include <validation.h>
10 #include <util/system.h>
11 #include <rpc/server.h>
12 
13 static std::multimap<std::string, CZMQAbstractPublishNotifier*> mapPublishNotifiers;
14 
15 static const char *MSG_HASHBLOCK = "hashblock";
16 static const char *MSG_HASHTX = "hashtx";
17 static const char *MSG_RAWBLOCK = "rawblock";
18 static const char *MSG_RAWTX = "rawtx";
19 
20 // Internal function to send multipart message
21 static int zmq_send_multipart(void *sock, const void* data, size_t size, ...)
22 {
23  va_list args;
24  va_start(args, size);
25 
26  while (1)
27  {
28  zmq_msg_t msg;
29 
30  int rc = zmq_msg_init_size(&msg, size);
31  if (rc != 0)
32  {
33  zmqError("Unable to initialize ZMQ msg");
34  va_end(args);
35  return -1;
36  }
37 
38  void *buf = zmq_msg_data(&msg);
39  memcpy(buf, data, size);
40 
41  data = va_arg(args, const void*);
42 
43  rc = zmq_msg_send(&msg, sock, data ? ZMQ_SNDMORE : 0);
44  if (rc == -1)
45  {
46  zmqError("Unable to send ZMQ msg");
47  zmq_msg_close(&msg);
48  va_end(args);
49  return -1;
50  }
51 
52  zmq_msg_close(&msg);
53 
54  if (!data)
55  break;
56 
57  size = va_arg(args, size_t);
58  }
59  va_end(args);
60  return 0;
61 }
62 
64 {
65  assert(!psocket);
66 
67  // check if address is being used by other publish notifier
68  std::multimap<std::string, CZMQAbstractPublishNotifier*>::iterator i = mapPublishNotifiers.find(address);
69 
70  if (i==mapPublishNotifiers.end())
71  {
72  psocket = zmq_socket(pcontext, ZMQ_PUB);
73  if (!psocket)
74  {
75  zmqError("Failed to create socket");
76  return false;
77  }
78 
79  LogPrint(BCLog::ZMQ, "zmq: Outbound message high water mark for %s at %s is %d\n", type, address, outbound_message_high_water_mark);
80 
81  int rc = zmq_setsockopt(psocket, ZMQ_SNDHWM, &outbound_message_high_water_mark, sizeof(outbound_message_high_water_mark));
82  if (rc != 0)
83  {
84  zmqError("Failed to set outbound message high water mark");
85  zmq_close(psocket);
86  return false;
87  }
88 
89  rc = zmq_bind(psocket, address.c_str());
90  if (rc != 0)
91  {
92  zmqError("Failed to bind address");
93  zmq_close(psocket);
94  return false;
95  }
96 
97  // register this notifier for the address, so it can be reused for other publish notifier
98  mapPublishNotifiers.insert(std::make_pair(address, this));
99  return true;
100  }
101  else
102  {
103  LogPrint(BCLog::ZMQ, "zmq: Reusing socket for address %s\n", address);
104  LogPrint(BCLog::ZMQ, "zmq: Outbound message high water mark for %s at %s is %d\n", type, address, outbound_message_high_water_mark);
105 
106  psocket = i->second->psocket;
107  mapPublishNotifiers.insert(std::make_pair(address, this));
108 
109  return true;
110  }
111 }
112 
114 {
115  // Early return if Initialize was not called
116  if (!psocket) return;
117 
118  int count = mapPublishNotifiers.count(address);
119 
120  // remove this notifier from the list of publishers using this address
121  typedef std::multimap<std::string, CZMQAbstractPublishNotifier*>::iterator iterator;
122  std::pair<iterator, iterator> iterpair = mapPublishNotifiers.equal_range(address);
123 
124  for (iterator it = iterpair.first; it != iterpair.second; ++it)
125  {
126  if (it->second==this)
127  {
128  mapPublishNotifiers.erase(it);
129  break;
130  }
131  }
132 
133  if (count == 1)
134  {
135  LogPrint(BCLog::ZMQ, "zmq: Close socket at address %s\n", address);
136  int linger = 0;
137  zmq_setsockopt(psocket, ZMQ_LINGER, &linger, sizeof(linger));
138  zmq_close(psocket);
139  }
140 
141  psocket = nullptr;
142 }
143 
144 bool CZMQAbstractPublishNotifier::SendMessage(const char *command, const void* data, size_t size)
145 {
146  assert(psocket);
147 
148  /* send three parts, command & data & a LE 4byte sequence number */
149  unsigned char msgseq[sizeof(uint32_t)];
150  WriteLE32(&msgseq[0], nSequence);
151  int rc = zmq_send_multipart(psocket, command, strlen(command), data, size, msgseq, (size_t)sizeof(uint32_t), nullptr);
152  if (rc == -1)
153  return false;
154 
155  /* increment memory only sequence number after sending */
156  nSequence++;
157 
158  return true;
159 }
160 
162 {
163  uint256 hash = pindex->GetBlockHash();
164  LogPrint(BCLog::ZMQ, "zmq: Publish hashblock %s\n", hash.GetHex());
165  char data[32];
166  for (unsigned int i = 0; i < 32; i++)
167  data[31 - i] = hash.begin()[i];
168  return SendMessage(MSG_HASHBLOCK, data, 32);
169 }
170 
172 {
173  uint256 hash = transaction.GetHash();
174  LogPrint(BCLog::ZMQ, "zmq: Publish hashtx %s\n", hash.GetHex());
175  char data[32];
176  for (unsigned int i = 0; i < 32; i++)
177  data[31 - i] = hash.begin()[i];
178  return SendMessage(MSG_HASHTX, data, 32);
179 }
180 
182 {
183  LogPrint(BCLog::ZMQ, "zmq: Publish rawblock %s\n", pindex->GetBlockHash().GetHex());
184 
185  const Consensus::Params& consensusParams = Params().GetConsensus();
187  {
188  LOCK(cs_main);
189  CBlock block;
190  if(!ReadBlockFromDisk(block, pindex, consensusParams))
191  {
192  zmqError("Can't read block from disk");
193  return false;
194  }
195 
196  ss << block;
197  }
198 
199  return SendMessage(MSG_RAWBLOCK, &(*ss.begin()), ss.size());
200 }
201 
203 {
204  uint256 hash = transaction.GetHash();
205  LogPrint(BCLog::ZMQ, "zmq: Publish rawtx %s\n", hash.GetHex());
207  ss << transaction;
208  return SendMessage(MSG_RAWTX, &(*ss.begin()), ss.size());
209 }
static const char * MSG_HASHBLOCK
bool NotifyTransaction(const CTransaction &transaction) override
uint32_t nSequence
upcounting per message sequence number
#define LogPrint(category,...)
Definition: logging.h:179
Definition: block.h:72
bool ReadBlockFromDisk(CBlock &block, const FlatFilePos &pos, const Consensus::Params &consensusParams)
Functions for disk access for blocks.
static std::multimap< std::string, CZMQAbstractPublishNotifier * > mapPublishNotifiers
static void WriteLE32(unsigned char *ptr, uint32_t x)
Definition: common.h:44
Double ended buffer combining vector and stream-like interfaces.
Definition: streams.h:201
static const char * MSG_HASHTX
unsigned char * begin()
Definition: uint256.h:54
bool NotifyTransaction(const CTransaction &transaction) override
bool NotifyBlock(const CBlockIndex *pindex) override
uint256 GetBlockHash() const
Definition: chain.h:233
size_type size() const
Definition: streams.h:292
#define LOCK(cs)
Definition: sync.h:179
bool SendMessage(const char *command, const void *data, size_t size)
const uint256 & GetHash() const
Definition: transaction.h:322
bool NotifyBlock(const CBlockIndex *pindex) override
RecursiveMutex cs_main
Mutex to guard access to validation specific variables, such as reading or changing the chainstate...
Definition: validation.cpp:106
static int zmq_send_multipart(void *sock, const void *data, size_t size,...)
Parameters that influence chain consensus.
Definition: params.h:45
256-bit opaque blob.
Definition: uint256.h:120
static const char * MSG_RAWTX
const_iterator begin() const
Definition: streams.h:288
static const char * MSG_RAWBLOCK
The block chain is a tree shaped structure starting with the genesis block at the root...
Definition: chain.h:137
const CChainParams & Params()
Return the currently selected parameters.
int RPCSerializationFlags()
Definition: server.cpp:491
void * memcpy(void *a, const void *b, size_t c)
static const int PROTOCOL_VERSION
network protocol versioning
Definition: version.h:12
std::string GetHex() const
Definition: uint256.cpp:20
static int count
Definition: tests.c:45
The basic transaction that is broadcasted on the network and contained in blocks. ...
Definition: transaction.h:270
const Consensus::Params & GetConsensus() const
Definition: chainparams.h:60
auto it
Definition: validation.cpp:362
void zmqError(const char *str)
bool Initialize(void *pcontext) override