Bitcoin Core 30.99.0
P2P Digital Currency
zmqnotificationinterface.cpp
Go to the documentation of this file.
1// Copyright (c) 2015-present The Bitcoin Core developers
2// Distributed under the MIT software license, see the accompanying
3// file COPYING or http://www.opensource.org/licenses/mit-license.php.
4
6
7#include <common/args.h>
9#include <kernel/types.h>
10#include <logging.h>
11#include <netbase.h>
12#include <primitives/block.h>
14#include <validationinterface.h>
17#include <zmq/zmqutil.h>
18
19#include <zmq.h>
20
21#include <cassert>
22#include <map>
23#include <string>
24#include <utility>
25#include <vector>
26
28
30
32{
// The only work done here is delegating to Shutdown().
// NOTE(review): the signature line for this body is not visible in this
// listing; presumably this is the class destructor -- confirm against the
// original file.
33 Shutdown();
34}
35
36std::list<const CZMQAbstractNotifier*> CZMQNotificationInterface::GetActiveNotifiers() const
37{
38 std::list<const CZMQAbstractNotifier*> result;
39 for (const auto& n : notifiers) {
40 result.push_back(n.get());
41 }
42 return result;
43}
44
45std::unique_ptr<CZMQNotificationInterface> CZMQNotificationInterface::Create(std::function<bool(std::vector<std::byte>&, const CBlockIndex&)> get_block_by_index)
46{
47 std::map<std::string, CZMQNotifierFactory> factories;
48 factories["pubhashblock"] = CZMQAbstractNotifier::Create<CZMQPublishHashBlockNotifier>;
49 factories["pubhashtx"] = CZMQAbstractNotifier::Create<CZMQPublishHashTransactionNotifier>;
50 factories["pubrawblock"] = [&get_block_by_index]() -> std::unique_ptr<CZMQAbstractNotifier> {
51 return std::make_unique<CZMQPublishRawBlockNotifier>(get_block_by_index);
52 };
53 factories["pubrawtx"] = CZMQAbstractNotifier::Create<CZMQPublishRawTransactionNotifier>;
54 factories["pubsequence"] = CZMQAbstractNotifier::Create<CZMQPublishSequenceNotifier>;
55
56 std::list<std::unique_ptr<CZMQAbstractNotifier>> notifiers;
57 for (const auto& entry : factories)
58 {
59 std::string arg("-zmq" + entry.first);
60 const auto& factory = entry.second;
61 for (std::string& address : gArgs.GetArgs(arg)) {
62 // libzmq uses prefix "ipc://" for UNIX domain sockets
63 if (address.starts_with(ADDR_PREFIX_UNIX)) {
64 address.replace(0, ADDR_PREFIX_UNIX.length(), ADDR_PREFIX_IPC);
65 }
66
67 std::unique_ptr<CZMQAbstractNotifier> notifier = factory();
68 notifier->SetType(entry.first);
69 notifier->SetAddress(address);
70 notifier->SetOutboundMessageHighWaterMark(static_cast<int>(gArgs.GetIntArg(arg + "hwm", CZMQAbstractNotifier::DEFAULT_ZMQ_SNDHWM)));
71 notifiers.push_back(std::move(notifier));
72 }
73 }
74
75 if (!notifiers.empty())
76 {
77 std::unique_ptr<CZMQNotificationInterface> notificationInterface(new CZMQNotificationInterface());
78 notificationInterface->notifiers = std::move(notifiers);
79
80 if (notificationInterface->Initialize()) {
81 return notificationInterface;
82 }
83 }
84
85 return nullptr;
86}
87
88// Called at startup to conditionally set up ZMQ socket(s)
// Logs the libzmq version, creates the ZMQ context and initializes every
// notifier in `notifiers` against it.
// Returns false if the context cannot be created or as soon as one notifier
// fails to initialize; returns true once all notifiers are ready.
// NOTE(review): the signature line for this body is not visible in this
// listing; Create() calls it as `Initialize()` and tests the result as a bool.
90{
// Query the version of the libzmq library and log it under the ZMQ category.
91 int major = 0, minor = 0, patch = 0;
92 zmq_version(&major, &minor, &patch);
93 LogDebug(BCLog::ZMQ, "version %d.%d.%d\n", major, minor, patch);
94
95 LogDebug(BCLog::ZMQ, "Initialize notification interface\n");
97
// One context is created here and handed to every notifier below.
98 pcontext = zmq_ctx_new();
99
100 if (!pcontext)
101 {
102 zmqError("Unable to initialize context");
103 return false;
104 }
105
106 for (auto& notifier : notifiers) {
107 if (notifier->Initialize(pcontext)) {
108 LogDebug(BCLog::ZMQ, "Notifier %s ready (address = %s)\n", notifier->GetType(), notifier->GetAddress());
109 } else {
// Stop at the first failure: later notifiers are left uninitialized and
// `pcontext` stays set; no cleanup is performed in this function.
110 LogDebug(BCLog::ZMQ, "Notifier %s failed (address = %s)\n", notifier->GetType(), notifier->GetAddress());
111 return false;
112 }
113 }
114
115 return true;
116}
117
118// Called during shutdown sequence
// Shuts down every notifier and terminates the ZMQ context, if one exists.
// When `pcontext` is null only the log line below is emitted.
// NOTE(review): the signature line for this body is not visible in this
// listing; confirm name and return type against the original file.
120{
121 LogDebug(BCLog::ZMQ, "Shutdown notification interface\n");
122 if (pcontext)
123 {
// Notifiers are shut down before the context is terminated.
// NOTE(review): libzmq documents zmq_ctx_term as blocking until the sockets
// of the context are closed -- presumably the reason for this ordering.
124 for (auto& notifier : notifiers) {
125 LogDebug(BCLog::ZMQ, "Shutdown notifier %s at %s\n", notifier->GetType(), notifier->GetAddress());
126 notifier->Shutdown();
127 }
128 zmq_ctx_term(pcontext);
129
// Clearing the pointer makes a later call skip the teardown above.
130 pcontext = nullptr;
131 }
132}
133
134namespace {
135
136template <typename Function>
137void TryForEachAndRemoveFailed(std::list<std::unique_ptr<CZMQAbstractNotifier>>& notifiers, const Function& func)
138{
139 for (auto i = notifiers.begin(); i != notifiers.end(); ) {
140 CZMQAbstractNotifier* notifier = i->get();
141 if (func(notifier)) {
142 ++i;
143 } else {
144 notifier->Shutdown();
145 i = notifiers.erase(i);
146 }
147 }
148}
149
150} // anonymous namespace
151
152void CZMQNotificationInterface::UpdatedBlockTip(const CBlockIndex *pindexNew, const CBlockIndex *pindexFork, bool fInitialDownload)
153{
154 if (fInitialDownload || pindexNew == pindexFork) // In IBD or blocks were disconnected without any new ones
155 return;
156
157 TryForEachAndRemoveFailed(notifiers, [pindexNew](CZMQAbstractNotifier* notifier) {
158 return notifier->NotifyBlock(pindexNew);
159 });
160}
161
// Sends the transaction held in `ptx.info.m_tx` to every notifier, first via
// NotifyTransaction() and then via NotifyTransactionAcceptance() together with
// `mempool_sequence`.
// NOTE(review): the signature line for this body is not visible in this
// listing; presumably this is TransactionAddedToMempool -- confirm against the
// original file.
163{
164 const CTransaction& tx = *(ptx.info.m_tx);
165
// `&&` short-circuits: NotifyTransactionAcceptance() is skipped when
// NotifyTransaction() returns false. A notifier for which the combined result
// is false is shut down and removed by TryForEachAndRemoveFailed.
166 TryForEachAndRemoveFailed(notifiers, [&tx, mempool_sequence](CZMQAbstractNotifier* notifier) {
167 return notifier->NotifyTransaction(tx) && notifier->NotifyTransactionAcceptance(tx, mempool_sequence);
168 });
169}
170
// Sends the transaction referenced by `ptx` to every notifier via
// NotifyTransactionRemoval() together with `mempool_sequence`.
// NOTE(review): the signature line for this body is not visible in this
// listing; presumably this is TransactionRemovedFromMempool -- confirm against
// the original file.
172{
173 // Called for all non-block inclusion reasons
174 const CTransaction& tx = *ptx;
175
// A notifier whose NotifyTransactionRemoval() returns false is shut down and
// removed by TryForEachAndRemoveFailed.
176 TryForEachAndRemoveFailed(notifiers, [&tx, mempool_sequence](CZMQAbstractNotifier* notifier) {
177 return notifier->NotifyTransactionRemoval(tx, mempool_sequence);
178 });
179}
180
181void CZMQNotificationInterface::BlockConnected(const ChainstateRole& role, const std::shared_ptr<const CBlock>& pblock, const CBlockIndex* pindexConnected)
182{
183 if (role.historical) {
184 return;
185 }
186 for (const CTransactionRef& ptx : pblock->vtx) {
187 const CTransaction& tx = *ptx;
188 TryForEachAndRemoveFailed(notifiers, [&tx](CZMQAbstractNotifier* notifier) {
189 return notifier->NotifyTransaction(tx);
190 });
191 }
192
193 // Next we notify BlockConnect listeners for *all* blocks
194 TryForEachAndRemoveFailed(notifiers, [pindexConnected](CZMQAbstractNotifier* notifier) {
195 return notifier->NotifyBlockConnect(pindexConnected);
196 });
197}
198
199void CZMQNotificationInterface::BlockDisconnected(const std::shared_ptr<const CBlock>& pblock, const CBlockIndex* pindexDisconnected)
200{
201 for (const CTransactionRef& ptx : pblock->vtx) {
202 const CTransaction& tx = *ptx;
203 TryForEachAndRemoveFailed(notifiers, [&tx](CZMQAbstractNotifier* notifier) {
204 return notifier->NotifyTransaction(tx);
205 });
206 }
207
208 // Next we notify BlockDisconnect listeners for *all* blocks
209 TryForEachAndRemoveFailed(notifiers, [pindexDisconnected](CZMQAbstractNotifier* notifier) {
210 return notifier->NotifyBlockDisconnect(pindexDisconnected);
211 });
212}
213
// Namespace-scope owner of the ZMQ notification interface. It is defined
// without an initializer, so the unique_ptr starts out empty (null).
214std::unique_ptr<CZMQNotificationInterface> g_zmq_notification_interface;
ArgsManager gArgs
Definition: args.cpp:40
std::vector< std::string > GetArgs(const std::string &strArg) const
Return a vector of strings of the given argument.
Definition: args.cpp:366
int64_t GetIntArg(const std::string &strArg, int64_t nDefault) const
Return integer argument or default value.
Definition: args.cpp:486
The block chain is a tree shaped structure starting with the genesis block at the root,...
Definition: chain.h:95
The basic transaction that is broadcast on the network and contained in blocks.
Definition: transaction.h:281
virtual void Shutdown()=0
virtual bool NotifyBlockConnect(const CBlockIndex *pindex)
static const int DEFAULT_ZMQ_SNDHWM
virtual bool NotifyTransactionRemoval(const CTransaction &transaction, uint64_t mempool_sequence)
virtual bool NotifyBlock(const CBlockIndex *pindex)
virtual bool NotifyTransaction(const CTransaction &transaction)
virtual bool NotifyBlockDisconnect(const CBlockIndex *pindex)
virtual bool NotifyTransactionAcceptance(const CTransaction &transaction, uint64_t mempool_sequence)
static std::unique_ptr< CZMQNotificationInterface > Create(std::function< bool(std::vector< std::byte > &, const CBlockIndex &)> get_block_by_index)
void TransactionAddedToMempool(const NewMempoolTransactionInfo &tx, uint64_t mempool_sequence) override
Notifies listeners of a transaction having been added to mempool.
void BlockConnected(const kernel::ChainstateRole &role, const std::shared_ptr< const CBlock > &pblock, const CBlockIndex *pindexConnected) override
Notifies listeners of a block being connected.
void UpdatedBlockTip(const CBlockIndex *pindexNew, const CBlockIndex *pindexFork, bool fInitialDownload) override
Notifies listeners when the block chain tip advances.
std::list< std::unique_ptr< CZMQAbstractNotifier > > notifiers
void BlockDisconnected(const std::shared_ptr< const CBlock > &pblock, const CBlockIndex *pindexDisconnected) override
Notifies listeners of a block being disconnected. Provides the block that was disconnected.
void TransactionRemovedFromMempool(const CTransactionRef &tx, MemPoolRemovalReason reason, uint64_t mempool_sequence) override
Notifies listeners of a transaction leaving mempool.
std::list< const CZMQAbstractNotifier * > GetActiveNotifiers() const
is a home for simple enum and struct type definitions that can be used internally by functions in the...
#define LogDebug(category,...)
Definition: logging.h:411
MemPoolRemovalReason
Reason why a transaction was removed from the mempool, this is passed to the notification signal.
@ ZMQ
Definition: logging.h:92
const std::string ADDR_PREFIX_UNIX
Prefix for unix domain socket addresses (which are local filesystem paths)
Definition: netbase.h:31
std::shared_ptr< const CTransaction > CTransactionRef
Definition: transaction.h:403
const CTransactionRef m_tx
Information about chainstate that notifications are sent from.
Definition: types.h:18
bool historical
Whether this is a historical chainstate downloading old blocks to validate an assumeutxo snapshot,...
Definition: types.h:26
assert(!tx.IsCoinBase())
std::unique_ptr< CZMQNotificationInterface > g_zmq_notification_interface
void zmqError(const std::string &str)
Definition: zmqutil.cpp:13
const std::string ADDR_PREFIX_IPC
Prefix for unix domain socket addresses (which are local filesystem paths)
Definition: zmqutil.h:13