Bitcoin Core  0.20.99
P2P Digital Currency
zmqnotificationinterface.cpp
Go to the documentation of this file.
1 // Copyright (c) 2015-2019 The Bitcoin Core developers
2 // Distributed under the MIT software license, see the accompanying
3 // file COPYING or http://www.opensource.org/licenses/mit-license.php.
4 
7 
8 #include <validation.h>
9 #include <util/system.h>
10 
11 void zmqError(const char *str)
12 {
13  LogPrint(BCLog::ZMQ, "zmq: Error: %s, errno=%s\n", str, zmq_strerror(errno));
14 }
15 
17 {
18 }
19 
21 {
22  Shutdown();
23 
24  for (std::list<CZMQAbstractNotifier*>::iterator i=notifiers.begin(); i!=notifiers.end(); ++i)
25  {
26  delete *i;
27  }
28 }
29 
30 std::list<const CZMQAbstractNotifier*> CZMQNotificationInterface::GetActiveNotifiers() const
31 {
32  std::list<const CZMQAbstractNotifier*> result;
33  for (const auto* n : notifiers) {
34  result.push_back(n);
35  }
36  return result;
37 }
38 
40 {
41  CZMQNotificationInterface* notificationInterface = nullptr;
42  std::map<std::string, CZMQNotifierFactory> factories;
43  std::list<CZMQAbstractNotifier*> notifiers;
44 
45  factories["pubhashblock"] = CZMQAbstractNotifier::Create<CZMQPublishHashBlockNotifier>;
46  factories["pubhashtx"] = CZMQAbstractNotifier::Create<CZMQPublishHashTransactionNotifier>;
47  factories["pubrawblock"] = CZMQAbstractNotifier::Create<CZMQPublishRawBlockNotifier>;
48  factories["pubrawtx"] = CZMQAbstractNotifier::Create<CZMQPublishRawTransactionNotifier>;
49 
50  for (const auto& entry : factories)
51  {
52  std::string arg("-zmq" + entry.first);
53  if (gArgs.IsArgSet(arg))
54  {
55  CZMQNotifierFactory factory = entry.second;
56  std::string address = gArgs.GetArg(arg, "");
57  CZMQAbstractNotifier *notifier = factory();
58  notifier->SetType(entry.first);
59  notifier->SetAddress(address);
61  notifiers.push_back(notifier);
62  }
63  }
64 
65  if (!notifiers.empty())
66  {
67  notificationInterface = new CZMQNotificationInterface();
68  notificationInterface->notifiers = notifiers;
69 
70  if (!notificationInterface->Initialize())
71  {
72  delete notificationInterface;
73  notificationInterface = nullptr;
74  }
75  }
76 
77  return notificationInterface;
78 }
79 
80 // Called at startup to conditionally set up ZMQ socket(s)
82 {
83  int major = 0, minor = 0, patch = 0;
84  zmq_version(&major, &minor, &patch);
85  LogPrint(BCLog::ZMQ, "zmq: version %d.%d.%d\n", major, minor, patch);
86 
87  LogPrint(BCLog::ZMQ, "zmq: Initialize notification interface\n");
88  assert(!pcontext);
89 
90  pcontext = zmq_ctx_new();
91 
92  if (!pcontext)
93  {
94  zmqError("Unable to initialize context");
95  return false;
96  }
97 
98  std::list<CZMQAbstractNotifier*>::iterator i=notifiers.begin();
99  for (; i!=notifiers.end(); ++i)
100  {
101  CZMQAbstractNotifier *notifier = *i;
102  if (notifier->Initialize(pcontext))
103  {
104  LogPrint(BCLog::ZMQ, "zmq: Notifier %s ready (address = %s)\n", notifier->GetType(), notifier->GetAddress());
105  }
106  else
107  {
108  LogPrint(BCLog::ZMQ, "zmq: Notifier %s failed (address = %s)\n", notifier->GetType(), notifier->GetAddress());
109  break;
110  }
111  }
112 
113  if (i!=notifiers.end())
114  {
115  return false;
116  }
117 
118  return true;
119 }
120 
121 // Called during shutdown sequence
123 {
124  LogPrint(BCLog::ZMQ, "zmq: Shutdown notification interface\n");
125  if (pcontext)
126  {
127  for (std::list<CZMQAbstractNotifier*>::iterator i=notifiers.begin(); i!=notifiers.end(); ++i)
128  {
129  CZMQAbstractNotifier *notifier = *i;
130  LogPrint(BCLog::ZMQ, "zmq: Shutdown notifier %s at %s\n", notifier->GetType(), notifier->GetAddress());
131  notifier->Shutdown();
132  }
133  zmq_ctx_term(pcontext);
134 
135  pcontext = nullptr;
136  }
137 }
138 
139 void CZMQNotificationInterface::UpdatedBlockTip(const CBlockIndex *pindexNew, const CBlockIndex *pindexFork, bool fInitialDownload)
140 {
141  if (fInitialDownload || pindexNew == pindexFork) // In IBD or blocks were disconnected without any new ones
142  return;
143 
144  for (std::list<CZMQAbstractNotifier*>::iterator i = notifiers.begin(); i!=notifiers.end(); )
145  {
146  CZMQAbstractNotifier *notifier = *i;
147  if (notifier->NotifyBlock(pindexNew))
148  {
149  i++;
150  }
151  else
152  {
153  notifier->Shutdown();
154  i = notifiers.erase(i);
155  }
156  }
157 }
158 
160 {
161  // Used by BlockConnected and BlockDisconnected as well, because they're
162  // all the same external callback.
163  const CTransaction& tx = *ptx;
164 
165  for (std::list<CZMQAbstractNotifier*>::iterator i = notifiers.begin(); i!=notifiers.end(); )
166  {
167  CZMQAbstractNotifier *notifier = *i;
168  if (notifier->NotifyTransaction(tx))
169  {
170  i++;
171  }
172  else
173  {
174  notifier->Shutdown();
175  i = notifiers.erase(i);
176  }
177  }
178 }
179 
180 void CZMQNotificationInterface::BlockConnected(const std::shared_ptr<const CBlock>& pblock, const CBlockIndex* pindexConnected)
181 {
182  for (const CTransactionRef& ptx : pblock->vtx) {
183  // Do a normal notify for each transaction added in the block
185  }
186 }
187 
188 void CZMQNotificationInterface::BlockDisconnected(const std::shared_ptr<const CBlock>& pblock, const CBlockIndex* pindexDisconnected)
189 {
190  for (const CTransactionRef& ptx : pblock->vtx) {
191  // Do a normal notify for each transaction removed in block disconnection
193  }
194 }
195 
std::shared_ptr< const CTransaction > CTransactionRef
Definition: transaction.h:389
void BlockDisconnected(const std::shared_ptr< const CBlock > &pblock, const CBlockIndex *pindexDisconnected) override
Notifies listeners of a block being disconnected.
bool IsArgSet(const std::string &strArg) const
Return true if the given argument has been manually set.
Definition: system.cpp:375
#define LogPrint(category,...)
Definition: logging.h:182
void TransactionAddedToMempool(const CTransactionRef &tx) override
Notifies listeners of a transaction having been added to mempool.
virtual bool NotifyBlock(const CBlockIndex *pindex)
std::string GetAddress() const
static CZMQNotificationInterface * Create()
virtual bool NotifyTransaction(const CTransaction &transaction)
void SetOutboundMessageHighWaterMark(const int sndhwm)
void BlockConnected(const std::shared_ptr< const CBlock > &pblock, const CBlockIndex *pindexConnected) override
Notifies listeners of a block being connected.
void SetAddress(const std::string &a)
static const int DEFAULT_ZMQ_SNDHWM
virtual void Shutdown()=0
void UpdatedBlockTip(const CBlockIndex *pindexNew, const CBlockIndex *pindexFork, bool fInitialDownload) override
Notifies listeners when the block chain tip advances.
std::list< CZMQAbstractNotifier * > notifiers
void zmqError(const char *str)
The block chain is a tree shaped structure starting with the genesis block at the root...
Definition: chain.h:137
std::string GetArg(const std::string &strArg, const std::string &strDefault) const
Return string argument or default value.
Definition: system.cpp:463
ArgsManager gArgs
Definition: system.cpp:82
CZMQAbstractNotifier *(* CZMQNotifierFactory)()
virtual bool Initialize(void *pcontext)=0
std::list< const CZMQAbstractNotifier * > GetActiveNotifiers() const
The basic transaction that is broadcasted on the network and contained in blocks. ...
Definition: transaction.h:253
void SetType(const std::string &t)
std::string GetType() const
CZMQNotificationInterface * g_zmq_notification_interface