Bitcoin Core 28.99.0
P2P Digital Currency
zmqnotificationinterface.cpp
Go to the documentation of this file.
1// Copyright (c) 2015-2022 The Bitcoin Core developers
2// Distributed under the MIT software license, see the accompanying
3// file COPYING or http://www.opensource.org/licenses/mit-license.php.
4
6
7#include <common/args.h>
8#include <kernel/chain.h>
10#include <logging.h>
11#include <netbase.h>
12#include <primitives/block.h>
14#include <validationinterface.h>
17#include <zmq/zmqutil.h>
18
19#include <zmq.h>
20
21#include <cassert>
22#include <map>
23#include <string>
24#include <utility>
25#include <vector>
26
28
// NOTE(review): the signature line for this body is absent from this listing;
// presumably it is the CZMQNotificationInterface destructor — confirm against
// the original file. The body does nothing except call Shutdown().
30{
31 Shutdown();
32}
33
34std::list<const CZMQAbstractNotifier*> CZMQNotificationInterface::GetActiveNotifiers() const
35{
36 std::list<const CZMQAbstractNotifier*> result;
37 for (const auto& n : notifiers) {
38 result.push_back(n.get());
39 }
40 return result;
41}
42
43std::unique_ptr<CZMQNotificationInterface> CZMQNotificationInterface::Create(std::function<bool(std::vector<uint8_t>&, const CBlockIndex&)> get_block_by_index)
44{
45 std::map<std::string, CZMQNotifierFactory> factories;
46 factories["pubhashblock"] = CZMQAbstractNotifier::Create<CZMQPublishHashBlockNotifier>;
47 factories["pubhashtx"] = CZMQAbstractNotifier::Create<CZMQPublishHashTransactionNotifier>;
48 factories["pubrawblock"] = [&get_block_by_index]() -> std::unique_ptr<CZMQAbstractNotifier> {
49 return std::make_unique<CZMQPublishRawBlockNotifier>(get_block_by_index);
50 };
51 factories["pubrawtx"] = CZMQAbstractNotifier::Create<CZMQPublishRawTransactionNotifier>;
52 factories["pubsequence"] = CZMQAbstractNotifier::Create<CZMQPublishSequenceNotifier>;
53
54 std::list<std::unique_ptr<CZMQAbstractNotifier>> notifiers;
55 for (const auto& entry : factories)
56 {
57 std::string arg("-zmq" + entry.first);
58 const auto& factory = entry.second;
59 for (std::string& address : gArgs.GetArgs(arg)) {
60 // libzmq uses prefix "ipc://" for UNIX domain sockets
61 if (address.substr(0, ADDR_PREFIX_UNIX.length()) == ADDR_PREFIX_UNIX) {
62 address.replace(0, ADDR_PREFIX_UNIX.length(), ADDR_PREFIX_IPC);
63 }
64
65 std::unique_ptr<CZMQAbstractNotifier> notifier = factory();
66 notifier->SetType(entry.first);
67 notifier->SetAddress(address);
68 notifier->SetOutboundMessageHighWaterMark(static_cast<int>(gArgs.GetIntArg(arg + "hwm", CZMQAbstractNotifier::DEFAULT_ZMQ_SNDHWM)));
69 notifiers.push_back(std::move(notifier));
70 }
71 }
72
73 if (!notifiers.empty())
74 {
75 std::unique_ptr<CZMQNotificationInterface> notificationInterface(new CZMQNotificationInterface());
76 notificationInterface->notifiers = std::move(notifiers);
77
78 if (notificationInterface->Initialize()) {
79 return notificationInterface;
80 }
81 }
82
83 return nullptr;
84}
85
86// Called at startup to conditionally set up ZMQ socket(s)
// NOTE(review): the signature line for this body is absent from this listing;
// presumably it is CZMQNotificationInterface::Initialize(), which Create()
// calls — confirm against the original file.
// Creates the ZMQ context and initializes every entry of `notifiers` with it.
// Returns true only if the context was created and every notifier's
// Initialize(pcontext) succeeded; returns false otherwise.
88{
// Log the libzmq version reported by zmq_version().
89 int major = 0, minor = 0, patch = 0;
90 zmq_version(&major, &minor, &patch);
91 LogDebug(BCLog::ZMQ, "version %d.%d.%d\n", major, minor, patch);
92
93 LogDebug(BCLog::ZMQ, "Initialize notification interface\n");
95
96 pcontext = zmq_ctx_new();
97
// A null context is reported through zmqError() and aborts initialization.
98 if (!pcontext)
99 {
100 zmqError("Unable to initialize context");
101 return false;
102 }
103
// Stop at the first notifier that fails; notifiers after it are not attempted
// and pcontext is left set on that path.
104 for (auto& notifier : notifiers) {
105 if (notifier->Initialize(pcontext)) {
106 LogDebug(BCLog::ZMQ, "Notifier %s ready (address = %s)\n", notifier->GetType(), notifier->GetAddress());
107 } else {
108 LogDebug(BCLog::ZMQ, "Notifier %s failed (address = %s)\n", notifier->GetType(), notifier->GetAddress());
109 return false;
110 }
111 }
112
113 return true;
114}
115
116// Called during shutdown sequence
// NOTE(review): the signature line for this body is absent from this listing;
// presumably it is CZMQNotificationInterface::Shutdown() — confirm against the
// original file.
// Shuts down every notifier and terminates the ZMQ context, if one exists.
118{
119 LogDebug(BCLog::ZMQ, "Shutdown notification interface\n");
// Without a context there is nothing to tear down.
120 if (pcontext)
121 {
// Notifiers are shut down before the context is terminated.
122 for (auto& notifier : notifiers) {
123 LogDebug(BCLog::ZMQ, "Shutdown notifier %s at %s\n", notifier->GetType(), notifier->GetAddress());
124 notifier->Shutdown();
125 }
126 zmq_ctx_term(pcontext);
127
// Reset the handle so that a later call skips the teardown branch.
128 pcontext = nullptr;
129 }
130}
131
132namespace {
133
134template <typename Function>
135void TryForEachAndRemoveFailed(std::list<std::unique_ptr<CZMQAbstractNotifier>>& notifiers, const Function& func)
136{
137 for (auto i = notifiers.begin(); i != notifiers.end(); ) {
138 CZMQAbstractNotifier* notifier = i->get();
139 if (func(notifier)) {
140 ++i;
141 } else {
142 notifier->Shutdown();
143 i = notifiers.erase(i);
144 }
145 }
146}
147
148} // anonymous namespace
149
150void CZMQNotificationInterface::UpdatedBlockTip(const CBlockIndex *pindexNew, const CBlockIndex *pindexFork, bool fInitialDownload)
151{
152 if (fInitialDownload || pindexNew == pindexFork) // In IBD or blocks were disconnected without any new ones
153 return;
154
155 TryForEachAndRemoveFailed(notifiers, [pindexNew](CZMQAbstractNotifier* notifier) {
156 return notifier->NotifyBlock(pindexNew);
157 });
158}
159
// NOTE(review): the signature line for this body is absent from this listing;
// presumably it is CZMQNotificationInterface::TransactionAddedToMempool(), whose
// declaration takes a NewMempoolTransactionInfo and a uint64_t mempool_sequence.
// The body names the first parameter `ptx` — confirm against the original file.
161{
162 const CTransaction& tx = *(ptx.info.m_tx);
163
// Each notifier gets NotifyTransaction() first; NotifyTransactionAcceptance() is
// only called if that succeeded (short-circuit &&). A false result makes
// TryForEachAndRemoveFailed() shut the notifier down and remove it.
164 TryForEachAndRemoveFailed(notifiers, [&tx, mempool_sequence](CZMQAbstractNotifier* notifier) {
165 return notifier->NotifyTransaction(tx) && notifier->NotifyTransactionAcceptance(tx, mempool_sequence);
166 });
167}
168
// NOTE(review): the signature line for this body is absent from this listing;
// presumably it is CZMQNotificationInterface::TransactionRemovedFromMempool(),
// whose declaration takes a CTransactionRef, a MemPoolRemovalReason and a
// uint64_t mempool_sequence. The body names the first parameter `ptx` — confirm
// against the original file.
170{
171 // Called for all non-block inclusion reasons
172 const CTransaction& tx = *ptx;
173
// Each notifier gets NotifyTransactionRemoval(); a false result makes
// TryForEachAndRemoveFailed() shut the notifier down and remove it.
174 TryForEachAndRemoveFailed(notifiers, [&tx, mempool_sequence](CZMQAbstractNotifier* notifier) {
175 return notifier->NotifyTransactionRemoval(tx, mempool_sequence);
176 });
177}
178
179void CZMQNotificationInterface::BlockConnected(ChainstateRole role, const std::shared_ptr<const CBlock>& pblock, const CBlockIndex* pindexConnected)
180{
181 if (role == ChainstateRole::BACKGROUND) {
182 return;
183 }
184 for (const CTransactionRef& ptx : pblock->vtx) {
185 const CTransaction& tx = *ptx;
186 TryForEachAndRemoveFailed(notifiers, [&tx](CZMQAbstractNotifier* notifier) {
187 return notifier->NotifyTransaction(tx);
188 });
189 }
190
191 // Next we notify BlockConnect listeners for *all* blocks
192 TryForEachAndRemoveFailed(notifiers, [pindexConnected](CZMQAbstractNotifier* notifier) {
193 return notifier->NotifyBlockConnect(pindexConnected);
194 });
195}
196
197void CZMQNotificationInterface::BlockDisconnected(const std::shared_ptr<const CBlock>& pblock, const CBlockIndex* pindexDisconnected)
198{
199 for (const CTransactionRef& ptx : pblock->vtx) {
200 const CTransaction& tx = *ptx;
201 TryForEachAndRemoveFailed(notifiers, [&tx](CZMQAbstractNotifier* notifier) {
202 return notifier->NotifyTransaction(tx);
203 });
204 }
205
206 // Next we notify BlockDisconnect listeners for *all* blocks
207 TryForEachAndRemoveFailed(notifiers, [pindexDisconnected](CZMQAbstractNotifier* notifier) {
208 return notifier->NotifyBlockDisconnect(pindexDisconnected);
209 });
210}
211
// Global owning pointer to the ZMQ notification interface. It is
// default-constructed here, so it holds no object until something assigns one.
212std::unique_ptr<CZMQNotificationInterface> g_zmq_notification_interface;
ArgsManager gArgs
Definition: args.cpp:42
std::vector< std::string > GetArgs(const std::string &strArg) const
Return a vector of strings of the given argument.
Definition: args.cpp:362
int64_t GetIntArg(const std::string &strArg, int64_t nDefault) const
Return integer argument or default value.
Definition: args.cpp:482
The block chain is a tree shaped structure starting with the genesis block at the root,...
Definition: chain.h:141
The basic transaction that is broadcasted on the network and contained in blocks.
Definition: transaction.h:296
virtual void Shutdown()=0
virtual bool NotifyBlockConnect(const CBlockIndex *pindex)
static const int DEFAULT_ZMQ_SNDHWM
virtual bool NotifyTransactionRemoval(const CTransaction &transaction, uint64_t mempool_sequence)
virtual bool NotifyBlock(const CBlockIndex *pindex)
virtual bool NotifyTransaction(const CTransaction &transaction)
virtual bool NotifyBlockDisconnect(const CBlockIndex *pindex)
virtual bool NotifyTransactionAcceptance(const CTransaction &transaction, uint64_t mempool_sequence)
static std::unique_ptr< CZMQNotificationInterface > Create(std::function< bool(std::vector< uint8_t > &, const CBlockIndex &)> get_block_by_index)
void TransactionAddedToMempool(const NewMempoolTransactionInfo &tx, uint64_t mempool_sequence) override
Notifies listeners of a transaction having been added to mempool.
void UpdatedBlockTip(const CBlockIndex *pindexNew, const CBlockIndex *pindexFork, bool fInitialDownload) override
Notifies listeners when the block chain tip advances.
std::list< std::unique_ptr< CZMQAbstractNotifier > > notifiers
void BlockDisconnected(const std::shared_ptr< const CBlock > &pblock, const CBlockIndex *pindexDisconnected) override
Notifies listeners of a block being disconnected Provides the block that was disconnected.
void BlockConnected(ChainstateRole role, const std::shared_ptr< const CBlock > &pblock, const CBlockIndex *pindexConnected) override
Notifies listeners of a block being connected.
void TransactionRemovedFromMempool(const CTransactionRef &tx, MemPoolRemovalReason reason, uint64_t mempool_sequence) override
Notifies listeners of a transaction leaving mempool.
std::list< const CZMQAbstractNotifier * > GetActiveNotifiers() const
ChainstateRole
This enum describes the various roles a specific Chainstate instance can take.
Definition: chain.h:25
#define LogDebug(category,...)
Definition: logging.h:280
MemPoolRemovalReason
Reason why a transaction was removed from the mempool, this is passed to the notification signal.
@ ZMQ
Definition: logging.h:48
const std::string ADDR_PREFIX_UNIX
Prefix for unix domain socket addresses (which are local filesystem paths)
Definition: netbase.h:31
std::shared_ptr< const CTransaction > CTransactionRef
Definition: transaction.h:423
const CTransactionRef m_tx
assert(!tx.IsCoinBase())
std::unique_ptr< CZMQNotificationInterface > g_zmq_notification_interface
void zmqError(const std::string &str)
Definition: zmqutil.cpp:13
const std::string ADDR_PREFIX_IPC
Prefix for unix domain socket addresses (which are local filesystem paths)
Definition: zmqutil.h:13