Bitcoin Core  0.20.99
P2P Digital Currency
zmqnotificationinterface.cpp
Go to the documentation of this file.
1 // Copyright (c) 2015-2019 The Bitcoin Core developers
2 // Distributed under the MIT software license, see the accompanying
3 // file COPYING or http://www.opensource.org/licenses/mit-license.php.
4 
7 #include <zmq/zmqutil.h>
8 
9 #include <zmq.h>
10 
11 #include <validation.h>
12 #include <util/system.h>
13 
15 {
16 }
17 
19 {
20  Shutdown();
21 }
22 
23 std::list<const CZMQAbstractNotifier*> CZMQNotificationInterface::GetActiveNotifiers() const
24 {
25  std::list<const CZMQAbstractNotifier*> result;
26  for (const auto& n : notifiers) {
27  result.push_back(n.get());
28  }
29  return result;
30 }
31 
33 {
34  std::map<std::string, CZMQNotifierFactory> factories;
35  factories["pubhashblock"] = CZMQAbstractNotifier::Create<CZMQPublishHashBlockNotifier>;
36  factories["pubhashtx"] = CZMQAbstractNotifier::Create<CZMQPublishHashTransactionNotifier>;
37  factories["pubrawblock"] = CZMQAbstractNotifier::Create<CZMQPublishRawBlockNotifier>;
38  factories["pubrawtx"] = CZMQAbstractNotifier::Create<CZMQPublishRawTransactionNotifier>;
39  factories["pubsequence"] = CZMQAbstractNotifier::Create<CZMQPublishSequenceNotifier>;
40 
41  std::list<std::unique_ptr<CZMQAbstractNotifier>> notifiers;
42  for (const auto& entry : factories)
43  {
44  std::string arg("-zmq" + entry.first);
45  const auto& factory = entry.second;
46  for (const std::string& address : gArgs.GetArgs(arg)) {
47  std::unique_ptr<CZMQAbstractNotifier> notifier = factory();
48  notifier->SetType(entry.first);
49  notifier->SetAddress(address);
50  notifier->SetOutboundMessageHighWaterMark(static_cast<int>(gArgs.GetArg(arg + "hwm", CZMQAbstractNotifier::DEFAULT_ZMQ_SNDHWM)));
51  notifiers.push_back(std::move(notifier));
52  }
53  }
54 
55  if (!notifiers.empty())
56  {
57  std::unique_ptr<CZMQNotificationInterface> notificationInterface(new CZMQNotificationInterface());
58  notificationInterface->notifiers = std::move(notifiers);
59 
60  if (notificationInterface->Initialize()) {
61  return notificationInterface.release();
62  }
63  }
64 
65  return nullptr;
66 }
67 
68 // Called at startup to conditionally set up ZMQ socket(s)
70 {
71  int major = 0, minor = 0, patch = 0;
72  zmq_version(&major, &minor, &patch);
73  LogPrint(BCLog::ZMQ, "zmq: version %d.%d.%d\n", major, minor, patch);
74 
75  LogPrint(BCLog::ZMQ, "zmq: Initialize notification interface\n");
76  assert(!pcontext);
77 
78  pcontext = zmq_ctx_new();
79 
80  if (!pcontext)
81  {
82  zmqError("Unable to initialize context");
83  return false;
84  }
85 
86  for (auto& notifier : notifiers) {
87  if (notifier->Initialize(pcontext)) {
88  LogPrint(BCLog::ZMQ, "zmq: Notifier %s ready (address = %s)\n", notifier->GetType(), notifier->GetAddress());
89  } else {
90  LogPrint(BCLog::ZMQ, "zmq: Notifier %s failed (address = %s)\n", notifier->GetType(), notifier->GetAddress());
91  return false;
92  }
93  }
94 
95  return true;
96 }
97 
98 // Called during shutdown sequence
100 {
101  LogPrint(BCLog::ZMQ, "zmq: Shutdown notification interface\n");
102  if (pcontext)
103  {
104  for (auto& notifier : notifiers) {
105  LogPrint(BCLog::ZMQ, "zmq: Shutdown notifier %s at %s\n", notifier->GetType(), notifier->GetAddress());
106  notifier->Shutdown();
107  }
108  zmq_ctx_term(pcontext);
109 
110  pcontext = nullptr;
111  }
112 }
113 
114 namespace {
115 
116 template <typename Function>
117 void TryForEachAndRemoveFailed(std::list<std::unique_ptr<CZMQAbstractNotifier>>& notifiers, const Function& func)
118 {
119  for (auto i = notifiers.begin(); i != notifiers.end(); ) {
120  CZMQAbstractNotifier* notifier = i->get();
121  if (func(notifier)) {
122  ++i;
123  } else {
124  notifier->Shutdown();
125  i = notifiers.erase(i);
126  }
127  }
128 }
129 
130 } // anonymous namespace
131 
132 void CZMQNotificationInterface::UpdatedBlockTip(const CBlockIndex *pindexNew, const CBlockIndex *pindexFork, bool fInitialDownload)
133 {
134  if (fInitialDownload || pindexNew == pindexFork) // In IBD or blocks were disconnected without any new ones
135  return;
136 
137  TryForEachAndRemoveFailed(notifiers, [pindexNew](CZMQAbstractNotifier* notifier) {
138  return notifier->NotifyBlock(pindexNew);
139  });
140 }
141 
143 {
144  const CTransaction& tx = *ptx;
145 
146  TryForEachAndRemoveFailed(notifiers, [&tx, mempool_sequence](CZMQAbstractNotifier* notifier) {
147  return notifier->NotifyTransaction(tx) && notifier->NotifyTransactionAcceptance(tx, mempool_sequence);
148  });
149 }
150 
152 {
153  // Called for all non-block inclusion reasons
154  const CTransaction& tx = *ptx;
155 
156  TryForEachAndRemoveFailed(notifiers, [&tx, mempool_sequence](CZMQAbstractNotifier* notifier) {
157  return notifier->NotifyTransactionRemoval(tx, mempool_sequence);
158  });
159 }
160 
161 void CZMQNotificationInterface::BlockConnected(const std::shared_ptr<const CBlock>& pblock, const CBlockIndex* pindexConnected)
162 {
163  for (const CTransactionRef& ptx : pblock->vtx) {
164  const CTransaction& tx = *ptx;
165  TryForEachAndRemoveFailed(notifiers, [&tx](CZMQAbstractNotifier* notifier) {
166  return notifier->NotifyTransaction(tx);
167  });
168  }
169 
170  // Next we notify BlockConnect listeners for *all* blocks
171  TryForEachAndRemoveFailed(notifiers, [pindexConnected](CZMQAbstractNotifier* notifier) {
172  return notifier->NotifyBlockConnect(pindexConnected);
173  });
174 }
175 
176 void CZMQNotificationInterface::BlockDisconnected(const std::shared_ptr<const CBlock>& pblock, const CBlockIndex* pindexDisconnected)
177 {
178  for (const CTransactionRef& ptx : pblock->vtx) {
179  const CTransaction& tx = *ptx;
180  TryForEachAndRemoveFailed(notifiers, [&tx](CZMQAbstractNotifier* notifier) {
181  return notifier->NotifyTransaction(tx);
182  });
183  }
184 
185  // Next we notify BlockDisconnect listeners for *all* blocks
186  TryForEachAndRemoveFailed(notifiers, [pindexDisconnected](CZMQAbstractNotifier* notifier) {
187  return notifier->NotifyBlockDisconnect(pindexDisconnected);
188  });
189 }
190 
std::shared_ptr< const CTransaction > CTransactionRef
Definition: transaction.h:395
void BlockDisconnected(const std::shared_ptr< const CBlock > &pblock, const CBlockIndex *pindexDisconnected) override
Notifies listeners of a block being disconnected.
#define LogPrint(category,...)
Definition: logging.h:182
void zmqError(const char *str)
Definition: zmqutil.cpp:11
void TransactionAddedToMempool(const CTransactionRef &tx, uint64_t mempool_sequence) override
Notifies listeners of a transaction having been added to mempool.
virtual bool NotifyBlockDisconnect(const CBlockIndex *pindex)
virtual bool NotifyBlock(const CBlockIndex *pindex)
std::list< std::unique_ptr< CZMQAbstractNotifier > > notifiers
MemPoolRemovalReason
Reason why a transaction was removed from the mempool, this is passed to the notification signal...
Definition: txmempool.h:392
virtual bool NotifyTransactionRemoval(const CTransaction &transaction, uint64_t mempool_sequence)
void TransactionRemovedFromMempool(const CTransactionRef &tx, MemPoolRemovalReason reason, uint64_t mempool_sequence) override
Notifies listeners of a transaction leaving mempool.
virtual bool NotifyBlockConnect(const CBlockIndex *pindex)
static CZMQNotificationInterface * Create()
virtual bool NotifyTransaction(const CTransaction &transaction)
void BlockConnected(const std::shared_ptr< const CBlock > &pblock, const CBlockIndex *pindexConnected) override
Notifies listeners of a block being connected.
static const int DEFAULT_ZMQ_SNDHWM
virtual void Shutdown()=0
void UpdatedBlockTip(const CBlockIndex *pindexNew, const CBlockIndex *pindexFork, bool fInitialDownload) override
Notifies listeners when the block chain tip advances.
The block chain is a tree shaped structure starting with the genesis block at the root...
Definition: chain.h:137
std::string GetArg(const std::string &strArg, const std::string &strDefault) const
Return string argument or default value.
Definition: system.cpp:466
virtual bool NotifyTransactionAcceptance(const CTransaction &transaction, uint64_t mempool_sequence)
ArgsManager gArgs
Definition: system.cpp:76
std::list< const CZMQAbstractNotifier * > GetActiveNotifiers() const
The basic transaction that is broadcasted on the network and contained in blocks. ...
Definition: transaction.h:259
std::vector< std::string > GetArgs(const std::string &strArg) const
Return a vector of strings of the given argument.
Definition: system.cpp:361
CZMQNotificationInterface * g_zmq_notification_interface