175 std::unique_ptr<PartiallyDownloadedBlock> partialBlock;
209 std::atomic<ServiceFlags> m_their_services{
NODE_NONE};
212 const bool m_is_inbound;
215 Mutex m_misbehavior_mutex;
217 bool m_should_discourage
GUARDED_BY(m_misbehavior_mutex){
false};
220 Mutex m_block_inv_mutex;
224 std::vector<uint256> m_blocks_for_inv_relay
GUARDED_BY(m_block_inv_mutex);
228 std::vector<uint256> m_blocks_for_headers_relay
GUARDED_BY(m_block_inv_mutex);
239 std::atomic<int> m_starting_height{-1};
242 std::atomic<uint64_t> m_ping_nonce_sent{0};
244 std::atomic<std::chrono::microseconds> m_ping_start{0us};
246 std::atomic<bool> m_ping_queued{
false};
249 std::atomic<bool> m_wtxid_relay{
false};
261 bool m_relay_txs
GUARDED_BY(m_bloom_filter_mutex){
false};
263 std::unique_ptr<CBloomFilter> m_bloom_filter
PT_GUARDED_BY(m_bloom_filter_mutex)
GUARDED_BY(m_bloom_filter_mutex){
nullptr};
274 std::set<uint256> m_tx_inventory_to_send
GUARDED_BY(m_tx_inventory_mutex);
278 bool m_send_mempool
GUARDED_BY(m_tx_inventory_mutex){
false};
281 std::chrono::microseconds m_next_inv_send_time
GUARDED_BY(m_tx_inventory_mutex){0};
287 std::atomic<CAmount> m_fee_filter_received{0};
293 LOCK(m_tx_relay_mutex);
295 m_tx_relay = std::make_unique<Peer::TxRelay>();
296 return m_tx_relay.get();
301 return WITH_LOCK(m_tx_relay_mutex,
return m_tx_relay.get());
330 std::atomic_bool m_addr_relay_enabled{
false};
334 mutable Mutex m_addr_send_times_mutex;
336 std::chrono::microseconds m_next_addr_send
GUARDED_BY(m_addr_send_times_mutex){0};
338 std::chrono::microseconds m_next_local_addr_send
GUARDED_BY(m_addr_send_times_mutex){0};
341 std::atomic_bool m_wants_addrv2{
false};
350 std::atomic<uint64_t> m_addr_rate_limited{0};
352 std::atomic<uint64_t> m_addr_processed{0};
358 Mutex m_getdata_requests_mutex;
360 std::deque<CInv> m_getdata_requests
GUARDED_BY(m_getdata_requests_mutex);
366 Mutex m_headers_sync_mutex;
369 std::unique_ptr<HeadersSyncState> m_headers_sync
PT_GUARDED_BY(m_headers_sync_mutex)
GUARDED_BY(m_headers_sync_mutex) {};
372 std::atomic<bool> m_sent_sendheaders{
false};
382 std::atomic<std::chrono::seconds> m_time_offset{0
s};
386 , m_our_services{our_services}
387 , m_is_inbound{is_inbound}
391 mutable Mutex m_tx_relay_mutex;
394 std::unique_ptr<TxRelay> m_tx_relay
GUARDED_BY(m_tx_relay_mutex);
397using PeerRef = std::shared_ptr<Peer>;
409 uint256 hashLastUnknownBlock{};
415 bool fSyncStarted{
false};
417 std::chrono::microseconds m_stalling_since{0us};
418 std::list<QueuedBlock> vBlocksInFlight;
420 std::chrono::microseconds m_downloading_since{0us};
422 bool fPreferredDownload{
false};
424 bool m_requested_hb_cmpctblocks{
false};
426 bool m_provides_cmpctblocks{
false};
452 struct ChainSyncTimeoutState {
454 std::chrono::seconds m_timeout{0
s};
458 bool m_sent_getheaders{
false};
460 bool m_protect{
false};
463 ChainSyncTimeoutState m_chain_sync;
466 int64_t m_last_block_announcement{0};
495 EXCLUSIVE_LOCKS_REQUIRED(!m_peer_mutex, !m_most_recent_block_mutex, !m_headers_presync_mutex, g_msgproc_mutex, !m_tx_download_mutex);
// Records the best block's height and timestamp by storing both arguments
// into the members m_best_height and m_best_block_time.
509 void SetBestBlock(
int height,
std::chrono::seconds time)
override
511 m_best_height = height;
512 m_best_block_time = time;
516 const std::chrono::microseconds time_received,
const std::atomic<bool>& interruptMsgProc)
override
517 EXCLUSIVE_LOCKS_REQUIRED(!m_peer_mutex, !m_most_recent_block_mutex, !m_headers_presync_mutex, g_msgproc_mutex, !m_tx_download_mutex);
541 void Misbehaving(Peer& peer,
const std::string& message);
552 bool via_compact_block,
const std::string& message =
"")
567 bool MaybeDiscourageAndDisconnect(
CNode& pnode, Peer& peer);
581 bool first_time_failure)
606 bool ProcessOrphanTx(Peer& peer)
616 void ProcessHeadersMessage(
CNode& pfrom, Peer& peer,
618 bool via_compact_block)
630 bool CheckHeadersAreContinuous(const
std::vector<
CBlockHeader>& headers) const;
649 bool IsContinuationOfLowWorkHeadersSync(Peer& peer,
CNode& pfrom,
663 bool TryLowWorkHeadersSync(Peer& peer,
CNode& pfrom,
678 void HeadersDirectFetchBlocks(
CNode& pfrom, const Peer& peer, const
CBlockIndex& last_header);
680 void UpdatePeerStateForReceivedHeaders(
CNode& pfrom, Peer& peer, const
CBlockIndex& last_header,
bool received_new_header,
bool may_have_more_headers)
687 template <
typename... Args>
688 void MakeAndPushMessage(
CNode&
node, std::string msg_type, Args&&...
args)
const
694 void PushNodeVersion(
CNode& pnode,
const Peer& peer);
700 void MaybeSendPing(
CNode& node_to, Peer& peer, std::chrono::microseconds now);
743 std::unique_ptr<TxReconciliationTracker> m_txreconciliation;
746 std::atomic<int> m_best_height{-1};
748 std::atomic<std::chrono::seconds> m_best_block_time{0
s};
756 const Options m_opts;
758 bool RejectIncomingTxs(
const CNode& peer)
const;
766 mutable Mutex m_peer_mutex;
773 std::map<NodeId, PeerRef> m_peer_map
GUARDED_BY(m_peer_mutex);
783 uint32_t GetFetchFlags(
const Peer& peer)
const;
785 std::atomic<std::chrono::microseconds> m_next_inv_to_inbounds{0us};
802 std::atomic<int> m_wtxid_relay_peers{0};
819 std::chrono::microseconds NextInvToInbounds(std::chrono::microseconds now,
824 Mutex m_most_recent_block_mutex;
825 std::shared_ptr<const CBlock> m_most_recent_block
GUARDED_BY(m_most_recent_block_mutex);
826 std::shared_ptr<const CBlockHeaderAndShortTxIDs> m_most_recent_compact_block
GUARDED_BY(m_most_recent_block_mutex);
828 std::unique_ptr<const std::map<uint256, CTransactionRef>> m_most_recent_block_txs
GUARDED_BY(m_most_recent_block_mutex);
832 Mutex m_headers_presync_mutex;
840 using HeadersPresyncStats = std::pair<arith_uint256, std::optional<std::pair<int64_t, uint32_t>>>;
842 std::map<NodeId, HeadersPresyncStats> m_headers_presync_stats
GUARDED_BY(m_headers_presync_mutex) {};
846 std::atomic_bool m_headers_presync_should_signal{
false};
916 std::atomic<
std::chrono::seconds> m_last_tip_update{0
s};
922 void ProcessGetData(
CNode& pfrom, Peer& peer,
const std::atomic<bool>& interruptMsgProc)
927 void ProcessBlock(
CNode&
node,
const std::shared_ptr<const CBlock>& block,
bool force_processing,
bool min_pow_checked);
952 std::vector<CTransactionRef> vExtraTxnForCompact
GUARDED_BY(g_msgproc_mutex);
954 size_t vExtraTxnForCompactIt
GUARDED_BY(g_msgproc_mutex) = 0;
966 int64_t ApproximateBestBlockDepth() const;
976 void ProcessGetBlockData(
CNode& pfrom, Peer& peer, const
CInv& inv)
994 bool PrepareBlockFilterRequest(
CNode&
node, Peer& peer,
996 const
uint256& stop_hash, uint32_t max_height_diff,
// Const lookup of the per-node state for `pnode` in m_node_states.
1045const CNodeState* PeerManagerImpl::State(
NodeId pnode)
const
1047 std::map<NodeId, CNodeState>::const_iterator it = m_node_states.find(pnode);
// Not-found case; what is returned for it is not visible in this excerpt.
1048 if (it == m_node_states.end())
// Mutable overload: forwards to the const overload (selected via
// std::as_const) and casts constness away from the returned pointer, so the
// lookup itself is written only once.
1053CNodeState* PeerManagerImpl::State(
NodeId pnode)
1055 return const_cast<CNodeState*
>(std::as_const(*this).State(pnode));
1063static bool IsAddrCompatible(
const Peer& peer,
const CAddress& addr)
// Marks `addr` as known to `peer` by inserting its key into the peer's
// m_addr_known container.
1068void PeerManagerImpl::AddAddressKnown(Peer& peer,
const CAddress& addr)
// m_addr_known is dereferenced below, so it must be set.
1070 assert(peer.m_addr_known);
1071 peer.m_addr_known->insert(addr.
GetKey());
// Queues `addr` in peer.m_addrs_to_send, provided the address is valid, is
// not already in the peer's m_addr_known container, and passes
// IsAddrCompatible() for this peer.
1074void PeerManagerImpl::PushAddress(Peer& peer,
const CAddress& addr)
// m_addr_known is dereferenced below, so it must be set.
1079 assert(peer.m_addr_known);
1080 if (addr.
IsValid() && !peer.m_addr_known->contains(addr.
GetKey()) && IsAddrCompatible(peer, addr)) {
// Two outcomes: overwrite a randomly chosen existing slot, or append.
// NOTE(review): the condition selecting between them is elided from this
// excerpt -- presumably a size cap on m_addrs_to_send; confirm.
1082 peer.m_addrs_to_send[m_rng.randrange(peer.m_addrs_to_send.size())] = addr;
1084 peer.m_addrs_to_send.push_back(addr);
// Records `hash` in the peer's m_tx_inventory_known_filter. Does nothing when
// the peer has no TxRelay object.
1089static void AddKnownTx(Peer& peer,
const uint256& hash)
1091 auto tx_relay = peer.GetTxRelay();
1092 if (!tx_relay)
return;
// The known-filter is modified while holding m_tx_inventory_mutex.
1094 LOCK(tx_relay->m_tx_inventory_mutex);
1095 tx_relay->m_tx_inventory_known_filter.insert(hash);
1099static bool CanServeBlocks(
const Peer& peer)
1106static bool IsLimitedPeer(
const Peer& peer)
1113static bool CanServeWitnesses(
const Peer& peer)
// Returns the stored time m_next_inv_to_inbounds. When that time is already
// earlier than `now`, it is first rescheduled to `now` plus a random delay
// drawn from m_rng.rand_exp_duration(average_interval).
// NOTE(review): per its name the delay is presumably exponentially
// distributed with mean `average_interval` -- confirm against the RNG helper.
1118std::chrono::microseconds PeerManagerImpl::NextInvToInbounds(std::chrono::microseconds now,
1119 std::chrono::seconds average_interval)
1121 if (m_next_inv_to_inbounds.load() < now) {
1125 m_next_inv_to_inbounds = now + m_rng.rand_exp_duration(average_interval);
1127 return m_next_inv_to_inbounds;
// Returns true when mapBlocksInFlight holds at least one entry for `hash`
// (the entry count is implicitly converted to bool).
1130bool PeerManagerImpl::IsBlockRequested(
const uint256& hash)
1132 return mapBlocksInFlight.count(hash);
// Returns true when at least one in-flight request for `hash` belongs to a
// peer that still exists and is not inbound. The fall-through result after
// the loop is not visible in this excerpt.
1135bool PeerManagerImpl::IsBlockRequestedFromOutbound(
const uint256& hash)
// Walk every mapBlocksInFlight entry recorded for this hash.
1137 for (
auto range = mapBlocksInFlight.equal_range(hash); range.first != range.second; range.first++) {
1138 auto [nodeid, block_it] = range.first->second;
// GetPeerRef may yield a null reference, hence the null check below.
1139 PeerRef peer{GetPeerRef(nodeid)};
1140 if (peer && !peer->m_is_inbound)
return true;
// Removes in-flight bookkeeping for block `hash`: erases entries from
// mapBlocksInFlight and the matching element of the owning peer's
// vBlocksInFlight list. `from_peer`, when set, is compared against each
// entry's node id (the body of that branch is elided from this excerpt).
1146void PeerManagerImpl::RemoveBlockRequest(
const uint256& hash, std::optional<NodeId> from_peer)
1148 auto range = mapBlocksInFlight.equal_range(hash);
// Nothing is in flight for this hash (branch body elided).
1149 if (range.first == range.second) {
1157 while (range.first != range.second) {
1158 auto [node_id, list_it] = range.first->second;
// NOTE(review): presumably entries owned by other peers are skipped here
// when from_peer is given -- the branch body is not visible; confirm.
1160 if (from_peer && *from_peer != node_id) {
// A node state is expected to exist for every peer with an in-flight entry.
1165 CNodeState& state = *
Assert(State(node_id));
// Removing the peer's first queued block: bump m_downloading_since to the
// current time (std::max keeps it from ever moving backwards).
1167 if (state.vBlocksInFlight.begin() == list_it) {
1169 state.m_downloading_since = std::max(state.m_downloading_since, GetTime<std::chrono::microseconds>());
1171 state.vBlocksInFlight.erase(list_it);
// The peer has no more blocks in flight, so it no longer counts as one we
// are downloading from.
1173 if (state.vBlocksInFlight.empty()) {
1175 m_peers_downloading_from--;
// Reset the stalling timestamp.
1177 state.m_stalling_since = 0us;
// erase() returns the next iterator, which advances the loop.
1179 range.first = mapBlocksInFlight.erase(range.first);
1183bool PeerManagerImpl::BlockRequested(
NodeId nodeid,
const CBlockIndex& block, std::list<QueuedBlock>::iterator** pit)
1187 CNodeState *state = State(nodeid);
1188 assert(state !=
nullptr);
1193 for (
auto range = mapBlocksInFlight.equal_range(hash); range.first != range.second; range.first++) {
1194 if (range.first->second.first == nodeid) {
1196 *pit = &range.first->second.second;
1203 RemoveBlockRequest(hash, nodeid);
1205 std::list<QueuedBlock>::iterator it = state->vBlocksInFlight.insert(state->vBlocksInFlight.end(),
1206 {&block, std::unique_ptr<PartiallyDownloadedBlock>(pit ? new PartiallyDownloadedBlock(&m_mempool) : nullptr)});
1207 if (state->vBlocksInFlight.size() == 1) {
1209 state->m_downloading_since = GetTime<std::chrono::microseconds>();
1210 m_peers_downloading_from++;
1212 auto itInFlight = mapBlocksInFlight.insert(std::make_pair(hash, std::make_pair(nodeid, it)));
1214 *pit = &itInFlight->second.second;
1219void PeerManagerImpl::MaybeSetPeerAsAnnouncingHeaderAndIDs(
NodeId nodeid)
1226 if (m_opts.ignore_incoming_txs)
return;
1228 CNodeState* nodestate = State(nodeid);
1229 PeerRef peer{GetPeerRef(nodeid)};
1230 if (!nodestate || !nodestate->m_provides_cmpctblocks) {
1235 int num_outbound_hb_peers = 0;
1236 for (std::list<NodeId>::iterator it = lNodesAnnouncingHeaderAndIDs.begin(); it != lNodesAnnouncingHeaderAndIDs.end(); it++) {
1237 if (*it == nodeid) {
1238 lNodesAnnouncingHeaderAndIDs.erase(it);
1239 lNodesAnnouncingHeaderAndIDs.push_back(nodeid);
1242 PeerRef peer_ref{GetPeerRef(*it)};
1243 if (peer_ref && !peer_ref->m_is_inbound) ++num_outbound_hb_peers;
1245 if (peer && peer->m_is_inbound) {
1248 if (lNodesAnnouncingHeaderAndIDs.size() >= 3 && num_outbound_hb_peers == 1) {
1249 PeerRef remove_peer{GetPeerRef(lNodesAnnouncingHeaderAndIDs.front())};
1250 if (remove_peer && !remove_peer->m_is_inbound) {
1253 std::swap(lNodesAnnouncingHeaderAndIDs.front(), *std::next(lNodesAnnouncingHeaderAndIDs.begin()));
1259 if (lNodesAnnouncingHeaderAndIDs.size() >= 3) {
1262 m_connman.
ForNode(lNodesAnnouncingHeaderAndIDs.front(), [
this](
CNode* pnodeStop){
1263 MakeAndPushMessage(*pnodeStop, NetMsgType::SENDCMPCT, false, CMPCTBLOCKS_VERSION);
1265 pnodeStop->m_bip152_highbandwidth_to = false;
1268 lNodesAnnouncingHeaderAndIDs.pop_front();
1273 lNodesAnnouncingHeaderAndIDs.push_back(pfrom->
GetId());
// Returns true when the last recorded tip update is older than three times
// consensusParams.nPowTargetSpacing (in seconds) and no blocks are in flight.
1278bool PeerManagerImpl::TipMayBeStale()
// A zero timestamp means "not set yet": initialise it to the current time.
1282 if (m_last_tip_update.load() == 0
s) {
1283 m_last_tip_update = GetTime<std::chrono::seconds>();
1285 return m_last_tip_update.load() < GetTime<std::chrono::seconds>() - std::chrono::seconds{consensusParams.
nPowTargetSpacing * 3} && mapBlocksInFlight.empty();
1288int64_t PeerManagerImpl::ApproximateBestBlockDepth()
const
1293bool PeerManagerImpl::CanDirectFetch()
1300 if (state->pindexBestKnownBlock && pindex == state->pindexBestKnownBlock->GetAncestor(pindex->nHeight))
1302 if (state->pindexBestHeaderSent && pindex == state->pindexBestHeaderSent->GetAncestor(pindex->nHeight))
1307void PeerManagerImpl::ProcessBlockAvailability(
NodeId nodeid) {
1308 CNodeState *state = State(nodeid);
1309 assert(state !=
nullptr);
1311 if (!state->hashLastUnknownBlock.IsNull()) {
1314 if (state->pindexBestKnownBlock ==
nullptr || pindex->
nChainWork >= state->pindexBestKnownBlock->nChainWork) {
1315 state->pindexBestKnownBlock = pindex;
1317 state->hashLastUnknownBlock.SetNull();
1322void PeerManagerImpl::UpdateBlockAvailability(
NodeId nodeid,
const uint256 &hash) {
1323 CNodeState *state = State(nodeid);
1324 assert(state !=
nullptr);
1326 ProcessBlockAvailability(nodeid);
1331 if (state->pindexBestKnownBlock ==
nullptr || pindex->
nChainWork >= state->pindexBestKnownBlock->nChainWork) {
1332 state->pindexBestKnownBlock = pindex;
1336 state->hashLastUnknownBlock = hash;
1341void PeerManagerImpl::FindNextBlocksToDownload(
const Peer& peer,
unsigned int count, std::vector<const CBlockIndex*>& vBlocks,
NodeId& nodeStaller)
1346 vBlocks.reserve(vBlocks.size() +
count);
1347 CNodeState *state = State(peer.m_id);
1348 assert(state !=
nullptr);
1351 ProcessBlockAvailability(peer.m_id);
1353 if (state->pindexBestKnownBlock ==
nullptr || state->pindexBestKnownBlock->nChainWork < m_chainman.
ActiveChain().
Tip()->
nChainWork || state->pindexBestKnownBlock->nChainWork < m_chainman.
MinimumChainWork()) {
1361 const CBlockIndex* snap_base{m_chainman.GetSnapshotBaseBlock()};
1362 if (snap_base && state->pindexBestKnownBlock->GetAncestor(snap_base->nHeight) != snap_base) {
1363 LogDebug(
BCLog::NET,
"Not downloading blocks from peer=%d, which doesn't have the snapshot block in its best chain.\n", peer.m_id);
1370 if (state->pindexLastCommonBlock ==
nullptr ||
1371 (snap_base && state->pindexLastCommonBlock->nHeight < snap_base->nHeight)) {
1372 state->pindexLastCommonBlock = m_chainman.
ActiveChain()[std::min(state->pindexBestKnownBlock->nHeight, m_chainman.
ActiveChain().
Height())];
1377 state->pindexLastCommonBlock =
LastCommonAncestor(state->pindexLastCommonBlock, state->pindexBestKnownBlock);
1378 if (state->pindexLastCommonBlock == state->pindexBestKnownBlock)
1381 const CBlockIndex *pindexWalk = state->pindexLastCommonBlock;
1387 FindNextBlocks(vBlocks, peer, state, pindexWalk,
count, nWindowEnd, &m_chainman.
ActiveChain(), &nodeStaller);
1390void PeerManagerImpl::TryDownloadingHistoricalBlocks(
const Peer& peer,
unsigned int count, std::vector<const CBlockIndex*>& vBlocks,
const CBlockIndex *from_tip,
const CBlockIndex* target_block)
1395 if (vBlocks.size() >=
count) {
1399 vBlocks.reserve(
count);
1400 CNodeState *state =
Assert(State(peer.m_id));
1402 if (state->pindexBestKnownBlock ==
nullptr || state->pindexBestKnownBlock->GetAncestor(target_block->
nHeight) != target_block) {
1419void PeerManagerImpl::FindNextBlocks(std::vector<const CBlockIndex*>& vBlocks,
const Peer& peer, CNodeState *state,
const CBlockIndex *pindexWalk,
unsigned int count,
int nWindowEnd,
const CChain* activeChain,
NodeId* nodeStaller)
1421 std::vector<const CBlockIndex*> vToFetch;
1422 int nMaxHeight = std::min<int>(state->pindexBestKnownBlock->nHeight, nWindowEnd + 1);
1423 bool is_limited_peer = IsLimitedPeer(peer);
1425 while (pindexWalk->
nHeight < nMaxHeight) {
1429 int nToFetch = std::min(nMaxHeight - pindexWalk->
nHeight, std::max<int>(
count - vBlocks.size(), 128));
1430 vToFetch.resize(nToFetch);
1431 pindexWalk = state->pindexBestKnownBlock->
GetAncestor(pindexWalk->
nHeight + nToFetch);
1432 vToFetch[nToFetch - 1] = pindexWalk;
1433 for (
unsigned int i = nToFetch - 1; i > 0; i--) {
1434 vToFetch[i - 1] = vToFetch[i]->
pprev;
1454 state->pindexLastCommonBlock = pindex;
1461 if (waitingfor == -1) {
1463 waitingfor = mapBlocksInFlight.lower_bound(pindex->
GetBlockHash())->second.first;
1469 if (pindex->
nHeight > nWindowEnd) {
1471 if (vBlocks.size() == 0 && waitingfor != peer.m_id) {
1473 if (nodeStaller) *nodeStaller = waitingfor;
1483 vBlocks.push_back(pindex);
1484 if (vBlocks.size() ==
count) {
1493void PeerManagerImpl::PushNodeVersion(
CNode& pnode,
const Peer& peer)
1495 uint64_t my_services{peer.m_our_services};
1496 const int64_t nTime{
count_seconds(GetTime<std::chrono::seconds>())};
1498 const int nNodeStartingHeight{m_best_height};
1505 const bool tx_relay{!RejectIncomingTxs(pnode)};
1512 LogDebug(
BCLog::NET,
"send version message: version %d, blocks=%d, them=%s, txrelay=%d, peer=%d\n",
PROTOCOL_VERSION, nNodeStartingHeight, addr_you.
ToStringAddrPort(), tx_relay, nodeid);
// Stores `time_in_seconds` as the last block announcement time of peer
// `node`. Silently does nothing when no state exists for that node.
1518void PeerManagerImpl::UpdateLastBlockAnnounceTime(
NodeId node, int64_t time_in_seconds)
1521 CNodeState *state = State(
node);
1522 if (state) state->m_last_block_announcement = time_in_seconds;
1530 m_node_states.try_emplace(m_node_states.end(), nodeid);
1532 WITH_LOCK(m_tx_download_mutex, m_txdownloadman.CheckIsEmpty(nodeid));
1538 PeerRef peer = std::make_shared<Peer>(nodeid, our_services,
node.IsInboundConn());
1541 m_peer_map.emplace_hint(m_peer_map.end(), nodeid, peer);
1545void PeerManagerImpl::ReattemptInitialBroadcast(
CScheduler& scheduler)
1549 for (
const auto& txid : unbroadcast_txids) {
1552 if (tx !=
nullptr) {
1553 RelayTransaction(txid, tx->GetWitnessHash());
1562 scheduler.
scheduleFromNow([&] { ReattemptInitialBroadcast(scheduler); }, delta);
1565void PeerManagerImpl::FinalizeNode(
const CNode&
node)
1576 PeerRef peer = RemovePeer(nodeid);
1578 m_wtxid_relay_peers -= peer->m_wtxid_relay;
1579 assert(m_wtxid_relay_peers >= 0);
1581 CNodeState *state = State(nodeid);
1582 assert(state !=
nullptr);
1584 if (state->fSyncStarted)
1587 for (
const QueuedBlock& entry : state->vBlocksInFlight) {
1588 auto range = mapBlocksInFlight.equal_range(entry.pindex->GetBlockHash());
1589 while (range.first != range.second) {
1590 auto [node_id, list_it] = range.first->second;
1591 if (node_id != nodeid) {
1594 range.first = mapBlocksInFlight.erase(range.first);
1599 LOCK(m_tx_download_mutex);
1600 m_txdownloadman.DisconnectedPeer(nodeid);
1602 if (m_txreconciliation) m_txreconciliation->ForgetPeer(nodeid);
1603 m_num_preferred_download_peers -= state->fPreferredDownload;
1604 m_peers_downloading_from -= (!state->vBlocksInFlight.empty());
1605 assert(m_peers_downloading_from >= 0);
1606 m_outbound_peers_with_protect_from_disconnect -= state->m_chain_sync.m_protect;
1607 assert(m_outbound_peers_with_protect_from_disconnect >= 0);
1609 m_node_states.erase(nodeid);
1611 if (m_node_states.empty()) {
1613 assert(mapBlocksInFlight.empty());
1614 assert(m_num_preferred_download_peers == 0);
1615 assert(m_peers_downloading_from == 0);
1616 assert(m_outbound_peers_with_protect_from_disconnect == 0);
1617 assert(m_wtxid_relay_peers == 0);
1618 WITH_LOCK(m_tx_download_mutex, m_txdownloadman.CheckIsEmpty());
1621 if (
node.fSuccessfullyConnected &&
1622 !
node.IsBlockOnlyConn() && !
node.IsInboundConn()) {
1629 LOCK(m_headers_presync_mutex);
1630 m_headers_presync_stats.erase(nodeid);
// Returns true when every flag in GetDesirableServiceFlags(services) is also
// set in `services`, i.e. no desirable bit is missing.
1635bool PeerManagerImpl::HasAllDesirableServiceFlags(
ServiceFlags services)
const
// desirable & ~services isolates the desirable bits that are NOT offered.
1638 return !(GetDesirableServiceFlags(services) & (~services));
// Looks up `id` in m_peer_map and returns the stored shared reference, or a
// null PeerRef when the id is not present.
1652PeerRef PeerManagerImpl::GetPeerRef(
NodeId id)
const
1655 auto it = m_peer_map.find(
id);
1656 return it != m_peer_map.end() ? it->second :
nullptr;
// Removes the entry for `id` from m_peer_map, moving the stored reference
// into `ret` first. The declaration of `ret` and the return statement are
// elided from this excerpt.
1659PeerRef PeerManagerImpl::RemovePeer(
NodeId id)
1663 auto it = m_peer_map.find(
id);
1664 if (it != m_peer_map.end()) {
// Move the reference out before erasing the map node that owns it.
1665 ret = std::move(it->second);
1666 m_peer_map.erase(it);
1675 const CNodeState* state = State(nodeid);
1676 if (state ==
nullptr)
1678 stats.
nSyncHeight = state->pindexBestKnownBlock ? state->pindexBestKnownBlock->nHeight : -1;
1679 stats.
nCommonHeight = state->pindexLastCommonBlock ? state->pindexLastCommonBlock->nHeight : -1;
1680 for (
const QueuedBlock& queue : state->vBlocksInFlight) {
1686 PeerRef peer = GetPeerRef(nodeid);
1687 if (peer ==
nullptr)
return false;
1696 auto ping_wait{0us};
1697 if ((0 != peer->m_ping_nonce_sent) && (0 != peer->m_ping_start.load().count())) {
1698 ping_wait = GetTime<std::chrono::microseconds>() - peer->m_ping_start.load();
1701 if (
auto tx_relay = peer->GetTxRelay(); tx_relay !=
nullptr) {
1714 LOCK(peer->m_headers_sync_mutex);
1715 if (peer->m_headers_sync) {
1724std::vector<TxOrphanage::OrphanTxBase> PeerManagerImpl::GetOrphanTransactions()
1726 LOCK(m_tx_download_mutex);
1727 return m_txdownloadman.GetOrphanTransactions();
1734 .ignores_incoming_txs = m_opts.ignore_incoming_txs,
// Stores `tx` in vExtraTxnForCompact at position vExtraTxnForCompactIt and
// advances that position modulo m_opts.max_extra_txs, so once the end is
// reached the oldest slots are overwritten.
1738void PeerManagerImpl::AddToCompactExtraTransactions(
const CTransactionRef& tx)
// Non-positive capacity; the body of this branch is elided (presumably an
// early return -- confirm).
1740 if (m_opts.max_extra_txs <= 0)
// Size the buffer to its full capacity on first use.
1742 if (!vExtraTxnForCompact.size())
1743 vExtraTxnForCompact.resize(m_opts.max_extra_txs);
1744 vExtraTxnForCompact[vExtraTxnForCompactIt] = tx;
1745 vExtraTxnForCompactIt = (vExtraTxnForCompactIt + 1) % m_opts.max_extra_txs;
// Flags `peer` for discouragement by setting m_should_discourage while
// holding the peer's m_misbehavior_mutex.
1748void PeerManagerImpl::Misbehaving(Peer& peer,
const std::string& message)
1750 LOCK(peer.m_misbehavior_mutex);
// Build a ": <message>" suffix, or an empty string when no message is given.
// Its use (presumably in a log line) is not visible in this excerpt.
1752 const std::string message_prefixed = message.empty() ?
"" : (
": " + message);
1753 peer.m_should_discourage =
true;
1758 bool via_compact_block,
const std::string& message)
1760 PeerRef peer{GetPeerRef(nodeid)};
1771 if (!via_compact_block) {
1772 if (peer) Misbehaving(*peer, message);
1780 if (peer && !via_compact_block && !peer->m_is_inbound) {
1781 if (peer) Misbehaving(*peer, message);
1789 if (peer) Misbehaving(*peer, message);
1793 if (peer) Misbehaving(*peer, message);
1798 if (message !=
"") {
1805 PeerRef peer{GetPeerRef(nodeid)};
1811 if (peer) Misbehaving(*peer,
"");
1829bool PeerManagerImpl::BlockRequestAllowed(
const CBlockIndex* pindex)
1838std::optional<std::string> PeerManagerImpl::FetchBlock(
NodeId peer_id,
const CBlockIndex& block_index)
1843 PeerRef peer = GetPeerRef(peer_id);
1844 if (peer ==
nullptr)
return "Peer does not exist";
1847 if (!CanServeWitnesses(*peer))
return "Pre-SegWit peer";
1852 RemoveBlockRequest(block_index.
GetBlockHash(), std::nullopt);
1855 if (!BlockRequested(peer_id, block_index))
return "Already requested from this peer";
1867 if (!success)
return "Peer not fully connected";
1871 return std::nullopt;
1878 return std::make_unique<PeerManagerImpl>(connman, addrman, banman, chainman, pool, warnings, opts);
1884 : m_rng{opts.deterministic_rng},
1886 m_chainparams(chainman.GetParams()),
1890 m_chainman(chainman),
1892 m_txdownloadman(
node::TxDownloadOptions{pool, m_rng, opts.max_orphan_txs, opts.deterministic_rng}),
1893 m_warnings{warnings},
1898 if (opts.reconcile_txs) {
1903void PeerManagerImpl::StartScheduledTasks(
CScheduler& scheduler)
1914 scheduler.
scheduleFromNow([&] { ReattemptInitialBroadcast(scheduler); }, delta);
1917void PeerManagerImpl::ActiveTipChange(
const CBlockIndex& new_tip,
bool is_ibd)
1925 LOCK(m_tx_download_mutex);
1929 m_txdownloadman.ActiveTipChange();
1939void PeerManagerImpl::BlockConnected(
1941 const std::shared_ptr<const CBlock>& pblock,
1946 m_last_tip_update = GetTime<std::chrono::seconds>();
1949 auto stalling_timeout = m_block_stalling_timeout.load();
1953 if (m_block_stalling_timeout.compare_exchange_strong(stalling_timeout, new_timeout)) {
1963 LOCK(m_tx_download_mutex);
1964 m_txdownloadman.BlockConnected(pblock);
1967void PeerManagerImpl::BlockDisconnected(
const std::shared_ptr<const CBlock> &block,
const CBlockIndex* pindex)
1969 LOCK(m_tx_download_mutex);
1970 m_txdownloadman.BlockDisconnected();
1977void PeerManagerImpl::NewPoWValidBlock(
const CBlockIndex *pindex,
const std::shared_ptr<const CBlock>& pblock)
1979 auto pcmpctblock = std::make_shared<const CBlockHeaderAndShortTxIDs>(*pblock,
FastRandomContext().rand64());
1983 if (pindex->
nHeight <= m_highest_fast_announce)
1985 m_highest_fast_announce = pindex->
nHeight;
1989 uint256 hashBlock(pblock->GetHash());
1990 const std::shared_future<CSerializedNetMsg> lazy_ser{
1994 auto most_recent_block_txs = std::make_unique<std::map<uint256, CTransactionRef>>();
1995 for (
const auto& tx : pblock->vtx) {
1996 most_recent_block_txs->emplace(tx->GetHash(), tx);
1997 most_recent_block_txs->emplace(tx->GetWitnessHash(), tx);
2000 LOCK(m_most_recent_block_mutex);
2001 m_most_recent_block_hash = hashBlock;
2002 m_most_recent_block = pblock;
2003 m_most_recent_compact_block = pcmpctblock;
2004 m_most_recent_block_txs = std::move(most_recent_block_txs);
2012 ProcessBlockAvailability(pnode->
GetId());
2013 CNodeState &state = *State(pnode->
GetId());
2016 if (state.m_requested_hb_cmpctblocks && !PeerHasHeader(&state, pindex) && PeerHasHeader(&state, pindex->
pprev)) {
2018 LogDebug(
BCLog::NET,
"%s sending header-and-ids %s to peer=%d\n",
"PeerManager::NewPoWValidBlock",
2019 hashBlock.ToString(), pnode->
GetId());
2022 PushMessage(*pnode, ser_cmpctblock.Copy());
2023 state.pindexBestHeaderSent = pindex;
2032void PeerManagerImpl::UpdatedBlockTip(
const CBlockIndex *pindexNew,
const CBlockIndex *pindexFork,
bool fInitialDownload)
2034 SetBestBlock(pindexNew->
nHeight, std::chrono::seconds{pindexNew->GetBlockTime()});
2037 if (fInitialDownload)
return;
2040 std::vector<uint256> vHashes;
2042 while (pindexToAnnounce != pindexFork) {
2044 pindexToAnnounce = pindexToAnnounce->
pprev;
2054 for (
auto& it : m_peer_map) {
2055 Peer& peer = *it.second;
2056 LOCK(peer.m_block_inv_mutex);
2057 for (
const uint256& hash : vHashes | std::views::reverse) {
2058 peer.m_blocks_for_headers_relay.push_back(hash);
2075 std::map<uint256, std::pair<NodeId, bool>>::iterator it = mapBlockSource.find(hash);
2080 it != mapBlockSource.end() &&
2081 State(it->second.first)) {
2082 MaybePunishNodeForBlock( it->second.first, state, !it->second.second);
2092 mapBlocksInFlight.count(hash) == mapBlocksInFlight.size()) {
2093 if (it != mapBlockSource.end()) {
2094 MaybeSetPeerAsAnnouncingHeaderAndIDs(it->second.first);
2097 if (it != mapBlockSource.end())
2098 mapBlockSource.erase(it);
2106bool PeerManagerImpl::AlreadyHaveBlock(
const uint256& block_hash)
// Sets the m_ping_queued flag on every peer in m_peer_map. No ping message
// is sent by the visible code itself.
2111void PeerManagerImpl::SendPings()
2114 for(
auto& it : m_peer_map) it.second->m_ping_queued =
true;
// Queues a transaction for announcement to every peer in m_peer_map that has
// a TxRelay object, by inserting its hash into that peer's
// m_tx_inventory_to_send set.
2117void PeerManagerImpl::RelayTransaction(
const uint256& txid,
const uint256& wtxid)
2120 for(
auto& it : m_peer_map) {
2121 Peer& peer = *it.second;
2122 auto tx_relay = peer.GetTxRelay();
// Peers without a TxRelay object are skipped.
2123 if (!tx_relay)
continue;
// The inventory members below are accessed under m_tx_inventory_mutex.
2125 LOCK(tx_relay->m_tx_inventory_mutex);
// A zero m_next_inv_send_time causes the peer to be skipped.
// NOTE(review): presumably zero means inv sending has not started for this
// peer yet -- the original rationale is elided; confirm.
2131 if (tx_relay->m_next_inv_send_time == 0
s)
continue;
// Announce by wtxid to peers with m_wtxid_relay set, by txid otherwise.
2133 const uint256& hash{peer.m_wtxid_relay ? wtxid : txid};
// Only queue hashes not already in the peer's known-inventory filter.
2134 if (!tx_relay->m_tx_inventory_known_filter.contains(hash)) {
2135 tx_relay->m_tx_inventory_to_send.insert(hash);
2140void PeerManagerImpl::RelayAddress(
NodeId originator,
2156 const auto current_time{GetTime<std::chrono::seconds>()};
2164 unsigned int nRelayNodes = (fReachable || (hasher.Finalize() & 1)) ? 2 : 1;
2166 std::array<std::pair<uint64_t, Peer*>, 2> best{{{0,
nullptr}, {0,
nullptr}}};
2167 assert(nRelayNodes <= best.size());
2171 for (
auto& [
id, peer] : m_peer_map) {
2172 if (peer->m_addr_relay_enabled &&
id != originator && IsAddrCompatible(*peer, addr)) {
2174 for (
unsigned int i = 0; i < nRelayNodes; i++) {
2175 if (hashKey > best[i].first) {
2176 std::copy(best.begin() + i, best.begin() + nRelayNodes - 1, best.begin() + i + 1);
2177 best[i] = std::make_pair(hashKey, peer.get());
2184 for (
unsigned int i = 0; i < nRelayNodes && best[i].first != 0; i++) {
2185 PushAddress(*best[i].second, addr);
2189void PeerManagerImpl::ProcessGetBlockData(
CNode& pfrom, Peer& peer,
const CInv& inv)
2191 std::shared_ptr<const CBlock> a_recent_block;
2192 std::shared_ptr<const CBlockHeaderAndShortTxIDs> a_recent_compact_block;
2194 LOCK(m_most_recent_block_mutex);
2195 a_recent_block = m_most_recent_block;
2196 a_recent_compact_block = m_most_recent_compact_block;
2199 bool need_activate_chain =
false;
2211 need_activate_chain =
true;
2215 if (need_activate_chain) {
2217 if (!m_chainman.
ActiveChainstate().ActivateBestChain(state, a_recent_block)) {
2224 bool can_direct_fetch{
false};
2232 if (!BlockRequestAllowed(pindex)) {
2233 LogDebug(
BCLog::NET,
"%s: ignoring request from peer=%i for old block that isn't in the main chain\n", __func__, pfrom.
GetId());
2260 can_direct_fetch = CanDirectFetch();
2264 std::shared_ptr<const CBlock> pblock;
2265 if (a_recent_block && a_recent_block->GetHash() == pindex->
GetBlockHash()) {
2266 pblock = a_recent_block;
2270 std::vector<uint8_t> block_data;
2284 std::shared_ptr<CBlock> pblockRead = std::make_shared<CBlock>();
2294 pblock = pblockRead;
2302 bool sendMerkleBlock =
false;
2304 if (
auto tx_relay = peer.GetTxRelay(); tx_relay !=
nullptr) {
2305 LOCK(tx_relay->m_bloom_filter_mutex);
2306 if (tx_relay->m_bloom_filter) {
2307 sendMerkleBlock =
true;
2308 merkleBlock =
CMerkleBlock(*pblock, *tx_relay->m_bloom_filter);
2311 if (sendMerkleBlock) {
2319 typedef std::pair<unsigned int, uint256> PairType;
2331 if (a_recent_compact_block && a_recent_compact_block->header.GetHash() == pindex->
GetBlockHash()) {
2344 LOCK(peer.m_block_inv_mutex);
2346 if (inv.
hash == peer.m_continuation_block) {
2350 std::vector<CInv> vInv;
2351 vInv.emplace_back(
MSG_BLOCK, tip->GetBlockHash());
2353 peer.m_continuation_block.SetNull();
2361 auto txinfo = m_mempool.
info_for_relay(gtxid, tx_relay.m_last_inv_sequence);
2363 return std::move(txinfo.tx);
2368 LOCK(m_most_recent_block_mutex);
2369 if (m_most_recent_block_txs !=
nullptr) {
2370 auto it = m_most_recent_block_txs->find(gtxid.
GetHash());
2371 if (it != m_most_recent_block_txs->end())
return it->second;
2378void PeerManagerImpl::ProcessGetData(
CNode& pfrom, Peer& peer,
const std::atomic<bool>& interruptMsgProc)
2382 auto tx_relay = peer.GetTxRelay();
2384 std::deque<CInv>::iterator it = peer.m_getdata_requests.begin();
2385 std::vector<CInv> vNotFound;
2390 while (it != peer.m_getdata_requests.end() && it->IsGenTxMsg()) {
2391 if (interruptMsgProc)
return;
2396 const CInv &inv = *it++;
2398 if (tx_relay ==
nullptr) {
2408 MakeAndPushMessage(pfrom,
NetMsgType::TX, maybe_with_witness(*tx));
2411 vNotFound.push_back(inv);
2417 if (it != peer.m_getdata_requests.end() && !pfrom.
fPauseSend) {
2418 const CInv &inv = *it++;
2420 ProcessGetBlockData(pfrom, peer, inv);
2426 peer.m_getdata_requests.erase(peer.m_getdata_requests.begin(), it);
2428 if (!vNotFound.empty()) {
2447uint32_t PeerManagerImpl::GetFetchFlags(
const Peer& peer)
const
2449 uint32_t nFetchFlags = 0;
2450 if (CanServeWitnesses(peer)) {
2459 for (
size_t i = 0; i < req.
indexes.size(); i++) {
2461 Misbehaving(peer,
"getblocktxn with out-of-bounds tx indices");
2470bool PeerManagerImpl::CheckHeadersPoW(
const std::vector<CBlockHeader>& headers,
const Consensus::Params& consensusParams, Peer& peer)
2474 Misbehaving(peer,
"header with invalid proof of work");
2479 if (!CheckHeadersAreContinuous(headers)) {
2480 Misbehaving(peer,
"non-continuous headers sequence");
2505void PeerManagerImpl::HandleUnconnectingHeaders(
CNode& pfrom, Peer& peer,
2506 const std::vector<CBlockHeader>& headers)
2510 if (MaybeSendGetHeaders(pfrom,
GetLocator(best_header), peer)) {
2511 LogDebug(
BCLog::NET,
"received header %s: missing prev block %s, sending getheaders (%d) to end (peer=%d)\n",
2513 headers[0].hashPrevBlock.ToString(),
2514 best_header->nHeight,
// Checks that the given headers form a chain: a header whose hashPrevBlock
// differs from hashLastBlock (the hash remembered from the preceding header)
// triggers the mismatch branch. The loop over `headers`, the declaration of
// hashLastBlock and the return statements are elided from this excerpt.
2524bool PeerManagerImpl::CheckHeadersAreContinuous(
const std::vector<CBlockHeader>& headers)
const
// A null hashLastBlock disables the comparison (presumably for the first
// header, which has no predecessor in the list -- confirm).
2528 if (!hashLastBlock.
IsNull() && header.hashPrevBlock != hashLastBlock) {
// Remember this header's hash for the comparison with the next header.
2531 hashLastBlock = header.GetHash();
2536bool PeerManagerImpl::IsContinuationOfLowWorkHeadersSync(Peer& peer,
CNode& pfrom, std::vector<CBlockHeader>& headers)
2538 if (peer.m_headers_sync) {
2539 auto result = peer.m_headers_sync->ProcessNextHeaders(headers, headers.size() == m_opts.max_headers_result);
2541 if (result.success) peer.m_last_getheaders_timestamp = {};
2542 if (result.request_more) {
2543 auto locator = peer.m_headers_sync->NextHeadersRequestLocator();
2545 Assume(!locator.vHave.empty());
2548 if (!locator.vHave.empty()) {
2551 bool sent_getheaders = MaybeSendGetHeaders(pfrom, locator, peer);
2554 locator.vHave.front().ToString(), pfrom.
GetId());
2559 peer.m_headers_sync.reset(
nullptr);
2564 LOCK(m_headers_presync_mutex);
2565 m_headers_presync_stats.erase(pfrom.
GetId());
2568 HeadersPresyncStats stats;
2569 stats.first = peer.m_headers_sync->GetPresyncWork();
2571 stats.second = {peer.m_headers_sync->GetPresyncHeight(),
2572 peer.m_headers_sync->GetPresyncTime()};
2576 LOCK(m_headers_presync_mutex);
2577 m_headers_presync_stats[pfrom.
GetId()] = stats;
2578 auto best_it = m_headers_presync_stats.find(m_headers_presync_bestpeer);
2579 bool best_updated =
false;
2580 if (best_it == m_headers_presync_stats.end()) {
2584 const HeadersPresyncStats* stat_best{
nullptr};
2585 for (
const auto& [peer, stat] : m_headers_presync_stats) {
2586 if (!stat_best || stat > *stat_best) {
2591 m_headers_presync_bestpeer = peer_best;
2592 best_updated = (peer_best == pfrom.
GetId());
2593 }
else if (best_it->first == pfrom.
GetId() || stats > best_it->second) {
2595 m_headers_presync_bestpeer = pfrom.
GetId();
2596 best_updated =
true;
2598 if (best_updated && stats.second.has_value()) {
2600 m_headers_presync_should_signal =
true;
2604 if (result.success) {
2607 headers.swap(result.pow_validated_headers);
2610 return result.success;
2618bool PeerManagerImpl::TryLowWorkHeadersSync(Peer& peer,
CNode& pfrom,
const CBlockIndex* chain_start_header, std::vector<CBlockHeader>& headers)
2625 arith_uint256 minimum_chain_work = GetAntiDoSWorkThreshold();
2629 if (total_work < minimum_chain_work) {
2633 if (headers.size() == m_opts.max_headers_result) {
2643 LOCK(peer.m_headers_sync_mutex);
2645 chain_start_header, minimum_chain_work));
2650 (void)IsContinuationOfLowWorkHeadersSync(peer, pfrom, headers);
2664bool PeerManagerImpl::IsAncestorOfBestHeaderOrTip(
const CBlockIndex* header)
2666 if (header ==
nullptr) {
2668 }
else if (m_chainman.m_best_header !=
nullptr && header == m_chainman.m_best_header->GetAncestor(header->
nHeight)) {
2676bool PeerManagerImpl::MaybeSendGetHeaders(
CNode& pfrom,
const CBlockLocator& locator, Peer& peer)
2684 peer.m_last_getheaders_timestamp = current_time;
2695void PeerManagerImpl::HeadersDirectFetchBlocks(
CNode& pfrom,
const Peer& peer,
const CBlockIndex& last_header)
2698 CNodeState *nodestate = State(pfrom.
GetId());
2701 std::vector<const CBlockIndex*> vToFetch;
2709 vToFetch.push_back(pindexWalk);
2711 pindexWalk = pindexWalk->
pprev;
2722 std::vector<CInv> vGetData;
2724 for (
const CBlockIndex* pindex : vToFetch | std::views::reverse) {
2729 uint32_t nFetchFlags = GetFetchFlags(peer);
2731 BlockRequested(pfrom.
GetId(), *pindex);
2735 if (vGetData.size() > 1) {
2740 if (vGetData.size() > 0) {
2741 if (!m_opts.ignore_incoming_txs &&
2742 nodestate->m_provides_cmpctblocks &&
2743 vGetData.size() == 1 &&
2744 mapBlocksInFlight.size() == 1 &&
2760void PeerManagerImpl::UpdatePeerStateForReceivedHeaders(
CNode& pfrom, Peer& peer,
2761 const CBlockIndex& last_header,
bool received_new_header,
bool may_have_more_headers)
2764 CNodeState *nodestate = State(pfrom.
GetId());
2773 nodestate->m_last_block_announcement =
GetTime();
2781 if (nodestate->pindexBestKnownBlock && nodestate->pindexBestKnownBlock->nChainWork < m_chainman.
MinimumChainWork()) {
2803 if (m_outbound_peers_with_protect_from_disconnect < MAX_OUTBOUND_PEERS_TO_PROTECT_FROM_DISCONNECT && nodestate->pindexBestKnownBlock->nChainWork >= m_chainman.
ActiveChain().
Tip()->
nChainWork && !nodestate->m_chain_sync.m_protect) {
2805 nodestate->m_chain_sync.m_protect =
true;
2806 ++m_outbound_peers_with_protect_from_disconnect;
2811void PeerManagerImpl::ProcessHeadersMessage(
CNode& pfrom, Peer& peer,
2812 std::vector<CBlockHeader>&& headers,
2813 bool via_compact_block)
2815 size_t nCount = headers.size();
2822 LOCK(peer.m_headers_sync_mutex);
2823 if (peer.m_headers_sync) {
2824 peer.m_headers_sync.reset(
nullptr);
2825 LOCK(m_headers_presync_mutex);
2826 m_headers_presync_stats.erase(pfrom.
GetId());
2830 peer.m_last_getheaders_timestamp = {};
2838 if (!CheckHeadersPoW(headers, m_chainparams.
GetConsensus(), peer)) {
2853 bool already_validated_work =
false;
2856 bool have_headers_sync =
false;
2858 LOCK(peer.m_headers_sync_mutex);
2860 already_validated_work = IsContinuationOfLowWorkHeadersSync(peer, pfrom, headers);
2872 if (headers.empty()) {
2876 have_headers_sync = !!peer.m_headers_sync;
2881 bool headers_connect_blockindex{chain_start_header !=
nullptr};
2883 if (!headers_connect_blockindex) {
2887 HandleUnconnectingHeaders(pfrom, peer, headers);
2894 peer.m_last_getheaders_timestamp = {};
2904 if (IsAncestorOfBestHeaderOrTip(last_received_header)) {
2905 already_validated_work =
true;
2913 already_validated_work =
true;
2919 if (!already_validated_work && TryLowWorkHeadersSync(peer, pfrom,
2920 chain_start_header, headers)) {
2932 bool received_new_header{last_received_header ==
nullptr};
2938 MaybePunishNodeForBlock(pfrom.
GetId(), state, via_compact_block,
"invalid header received");
2945 if (nCount == m_opts.max_headers_result && !have_headers_sync) {
2947 if (MaybeSendGetHeaders(pfrom,
GetLocator(pindexLast), peer)) {
2949 pindexLast->
nHeight, pfrom.
GetId(), peer.m_starting_height);
2953 UpdatePeerStateForReceivedHeaders(pfrom, peer, *pindexLast, received_new_header, nCount == m_opts.max_headers_result);
2956 HeadersDirectFetchBlocks(pfrom, peer, *pindexLast);
2962 bool first_time_failure)
2968 PeerRef peer{GetPeerRef(nodeid)};
2971 ptx->GetHash().ToString(),
2972 ptx->GetWitnessHash().ToString(),
2976 const auto& [add_extra_compact_tx, unique_parents, package_to_validate] = m_txdownloadman.MempoolRejectedTx(ptx, state, nodeid, first_time_failure);
2979 AddToCompactExtraTransactions(ptx);
2981 for (
const uint256& parent_txid : unique_parents) {
2982 if (peer) AddKnownTx(*peer, parent_txid);
2985 MaybePunishNodeForTx(nodeid, state);
2987 return package_to_validate;
2990void PeerManagerImpl::ProcessValidTx(
NodeId nodeid,
const CTransactionRef& tx,
const std::list<CTransactionRef>& replaced_transactions)
2996 m_txdownloadman.MempoolAcceptedTx(tx);
3000 tx->GetHash().ToString(),
3001 tx->GetWitnessHash().ToString(),
3004 RelayTransaction(tx->GetHash(), tx->GetWitnessHash());
3007 AddToCompactExtraTransactions(removedTx);
3017 const auto&
package = package_to_validate.m_txns;
3018 const auto& senders = package_to_validate.
m_senders;
3021 m_txdownloadman.MempoolRejectedPackage(package);
3024 if (!
Assume(package.size() == 2))
return;
3028 auto package_iter = package.rbegin();
3029 auto senders_iter = senders.rbegin();
3030 while (package_iter != package.rend()) {
3031 const auto& tx = *package_iter;
3032 const NodeId nodeid = *senders_iter;
3033 const auto it_result{package_result.
m_tx_results.find(tx->GetWitnessHash())};
3037 const auto& tx_result = it_result->second;
3038 switch (tx_result.m_result_type) {
3041 ProcessValidTx(nodeid, tx, tx_result.m_replaced_transactions);
3051 ProcessInvalidTx(nodeid, tx, tx_result.m_state,
false);
3067bool PeerManagerImpl::ProcessOrphanTx(Peer& peer)
3074 while (
CTransactionRef porphanTx = m_txdownloadman.GetTxToReconsider(peer.m_id)) {
3077 const Txid& orphanHash = porphanTx->GetHash();
3078 const Wtxid& orphan_wtxid = porphanTx->GetWitnessHash();
3095 ProcessInvalidTx(peer.m_id, porphanTx, state,
false);
3104bool PeerManagerImpl::PrepareBlockFilterRequest(
CNode&
node, Peer& peer,
3106 const uint256& stop_hash, uint32_t max_height_diff,
3110 const bool supported_filter_type =
3113 if (!supported_filter_type) {
3115 static_cast<uint8_t
>(filter_type),
node.DisconnectMsg(
fLogIPs));
3116 node.fDisconnect =
true;
3125 if (!stop_index || !BlockRequestAllowed(stop_index)) {
3128 node.fDisconnect =
true;
3133 uint32_t stop_height = stop_index->
nHeight;
3134 if (start_height > stop_height) {
3136 "start height %d and stop height %d, %s\n",
3137 start_height, stop_height,
node.DisconnectMsg(
fLogIPs));
3138 node.fDisconnect =
true;
3141 if (stop_height - start_height >= max_height_diff) {
3143 stop_height - start_height + 1, max_height_diff,
node.DisconnectMsg(
fLogIPs));
3144 node.fDisconnect =
true;
3149 if (!filter_index) {
3159 uint8_t filter_type_ser;
3160 uint32_t start_height;
3163 vRecv >> filter_type_ser >> start_height >> stop_hash;
3169 if (!PrepareBlockFilterRequest(
node, peer, filter_type, start_height, stop_hash,
3174 std::vector<BlockFilter> filters;
3176 LogDebug(
BCLog::NET,
"Failed to find block filter in index: filter_type=%s, start_height=%d, stop_hash=%s\n",
3181 for (
const auto& filter : filters) {
3188 uint8_t filter_type_ser;
3189 uint32_t start_height;
3192 vRecv >> filter_type_ser >> start_height >> stop_hash;
3198 if (!PrepareBlockFilterRequest(
node, peer, filter_type, start_height, stop_hash,
3204 if (start_height > 0) {
3206 stop_index->
GetAncestor(
static_cast<int>(start_height - 1));
3208 LogDebug(
BCLog::NET,
"Failed to find block filter header in index: filter_type=%s, block_hash=%s\n",
3214 std::vector<uint256> filter_hashes;
3216 LogDebug(
BCLog::NET,
"Failed to find block filter hashes in index: filter_type=%s, start_height=%d, stop_hash=%s\n",
3230 uint8_t filter_type_ser;
3233 vRecv >> filter_type_ser >> stop_hash;
3239 if (!PrepareBlockFilterRequest(
node, peer, filter_type, 0, stop_hash,
3240 std::numeric_limits<uint32_t>::max(),
3241 stop_index, filter_index)) {
3249 for (
int i = headers.size() - 1; i >= 0; i--) {
3254 LogDebug(
BCLog::NET,
"Failed to find block filter header in index: filter_type=%s, block_hash=%s\n",
3266void PeerManagerImpl::ProcessBlock(
CNode&
node,
const std::shared_ptr<const CBlock>& block,
bool force_processing,
bool min_pow_checked)
3268 bool new_block{
false};
3269 m_chainman.
ProcessNewBlock(block, force_processing, min_pow_checked, &new_block);
3271 node.m_last_block_time = GetTime<std::chrono::seconds>();
3276 RemoveBlockRequest(block->GetHash(), std::nullopt);
3279 mapBlockSource.erase(block->GetHash());
3283void PeerManagerImpl::ProcessCompactBlockTxns(
CNode& pfrom, Peer& peer,
const BlockTransactions& block_transactions)
3285 std::shared_ptr<CBlock> pblock = std::make_shared<CBlock>();
3286 bool fBlockRead{
false};
3290 auto range_flight = mapBlocksInFlight.equal_range(block_transactions.
blockhash);
3291 size_t already_in_flight = std::distance(range_flight.first, range_flight.second);
3292 bool requested_block_from_this_peer{
false};
3295 bool first_in_flight = already_in_flight == 0 || (range_flight.first->second.first == pfrom.
GetId());
3297 while (range_flight.first != range_flight.second) {
3298 auto [node_id, block_it] = range_flight.first->second;
3299 if (node_id == pfrom.
GetId() && block_it->partialBlock) {
3300 requested_block_from_this_peer =
true;
3303 range_flight.first++;
3306 if (!requested_block_from_this_peer) {
3315 Misbehaving(peer,
"invalid compact block/non-matching block transactions");
3318 if (first_in_flight) {
3320 std::vector<CInv> invs;
3325 LogDebug(
BCLog::NET,
"Peer %d sent us a compact block but it failed to reconstruct, waiting on first download to complete\n", pfrom.
GetId());
3354 mapBlockSource.emplace(block_transactions.
blockhash, std::make_pair(pfrom.
GetId(),
false));
3364 ProcessBlock(pfrom, pblock,
true,
true);
3369void PeerManagerImpl::ProcessMessage(
CNode& pfrom,
const std::string& msg_type,
DataStream& vRecv,
3370 const std::chrono::microseconds time_received,
3371 const std::atomic<bool>& interruptMsgProc)
3377 PeerRef peer = GetPeerRef(pfrom.
GetId());
3378 if (peer ==
nullptr)
return;
3388 uint64_t nNonce = 1;
3391 std::string cleanSubVer;
3392 int starting_height = -1;
3395 vRecv >> nVersion >> Using<CustomUintFormatter<8>>(nServices) >> nTime;
3410 LogDebug(
BCLog::NET,
"peer does not offer the expected services (%08x offered, %08x expected), %s\n",
3412 GetDesirableServiceFlags(nServices),
3425 if (!vRecv.
empty()) {
3433 if (!vRecv.
empty()) {
3434 std::string strSubVer;
3438 if (!vRecv.
empty()) {
3439 vRecv >> starting_height;
3459 PushNodeVersion(pfrom, *peer);
3472 if (greatest_common_version >= 70016) {
3481 peer->m_their_services = nServices;
3485 pfrom.cleanSubVer = cleanSubVer;
3487 peer->m_starting_height = starting_height;
3497 (fRelay || (peer->m_our_services &
NODE_BLOOM))) {
3498 auto*
const tx_relay = peer->SetTxRelay();
3500 LOCK(tx_relay->m_bloom_filter_mutex);
3501 tx_relay->m_relay_txs = fRelay;
3513 const auto* tx_relay = peer->GetTxRelay();
3514 if (tx_relay &&
WITH_LOCK(tx_relay->m_bloom_filter_mutex,
return tx_relay->m_relay_txs) &&
3516 const uint64_t recon_salt = m_txreconciliation->PreRegisterPeer(pfrom.
GetId());
3527 CNodeState* state = State(pfrom.
GetId());
3529 m_num_preferred_download_peers += state->fPreferredDownload;
3535 bool send_getaddr{
false};
3537 send_getaddr = SetupAddressRelay(pfrom, *peer);
3547 peer->m_getaddr_sent =
true;
3572 LogDebug(
BCLog::NET,
"receive version message: %s: version %d, blocks=%d, us=%s, txrelay=%d, peer=%d%s%s\n",
3577 peer->m_time_offset =
NodeSeconds{std::chrono::seconds{nTime}} - Now<NodeSeconds>();
3581 m_outbound_time_offsets.Add(peer->m_time_offset);
3582 m_outbound_time_offsets.WarnIfOutOfSync();
3586 if (greatest_common_version <= 70012) {
3587 constexpr auto finalAlert{
"60010000000000000000000000ffffff7f00000000ffffff7ffeffff7f01ffffff7f00000000ffffff7f00ffffff7f002f555247454e543a20416c657274206b657920636f6d70726f6d697365642c2075706772616465207265717569726564004630440220653febd6410f470f6bae11cad19c48413becb1ac2c17f908fd0fd53bdc3abd5202206d0e9c96fe88d4a0f01ed9dedae2b6f9e00da94cad0fecaae66ecf689bf71b50"_hex};
3588 MakeAndPushMessage(pfrom,
"alert", finalAlert);
3615 LogPrintf(
"New %s %s peer connected: version: %d, blocks=%d, peer=%d%s%s\n",
3618 pfrom.
nVersion.load(), peer->m_starting_height,
3620 (mapped_as ?
strprintf(
", mapped_as=%d", mapped_as) :
""));
3632 if (m_txreconciliation) {
3633 if (!peer->m_wtxid_relay || !m_txreconciliation->IsPeerRegistered(pfrom.
GetId())) {
3637 m_txreconciliation->ForgetPeer(pfrom.
GetId());
3641 if (
auto tx_relay = peer->GetTxRelay()) {
3650 tx_relay->m_tx_inventory_mutex,
3651 return tx_relay->m_tx_inventory_to_send.empty() &&
3652 tx_relay->m_next_inv_send_time == 0
s));
3657 const CNodeState* state = State(pfrom.
GetId());
3659 .m_preferred = state->fPreferredDownload,
3660 .m_relay_permissions = pfrom.HasPermission(NetPermissionFlags::Relay),
3661 .m_wtxid_relay = peer->m_wtxid_relay,
3670 peer->m_prefers_headers =
true;
3675 bool sendcmpct_hb{
false};
3676 uint64_t sendcmpct_version{0};
3677 vRecv >> sendcmpct_hb >> sendcmpct_version;
3683 CNodeState* nodestate = State(pfrom.
GetId());
3684 nodestate->m_provides_cmpctblocks =
true;
3685 nodestate->m_requested_hb_cmpctblocks = sendcmpct_hb;
3702 if (!peer->m_wtxid_relay) {
3703 peer->m_wtxid_relay =
true;
3704 m_wtxid_relay_peers++;
3723 peer->m_wants_addrv2 =
true;
3731 if (!m_txreconciliation) {
3732 LogDebug(
BCLog::NET,
"sendtxrcncl from peer=%d ignored, as our node does not have txreconciliation enabled\n", pfrom.
GetId());
3743 if (RejectIncomingTxs(pfrom)) {
3752 const auto* tx_relay = peer->GetTxRelay();
3753 if (!tx_relay || !
WITH_LOCK(tx_relay->m_bloom_filter_mutex,
return tx_relay->m_relay_txs)) {
3759 uint32_t peer_txreconcl_version;
3760 uint64_t remote_salt;
3761 vRecv >> peer_txreconcl_version >> remote_salt;
3764 peer_txreconcl_version, remote_salt);
3789 const auto ser_params{
3797 std::vector<CAddress> vAddr;
3799 vRecv >> ser_params(vAddr);
3801 if (!SetupAddressRelay(pfrom, *peer)) {
3808 Misbehaving(*peer,
strprintf(
"%s message size = %u", msg_type, vAddr.size()));
3813 std::vector<CAddress> vAddrOk;
3814 const auto current_a_time{Now<NodeSeconds>()};
3817 const auto current_time{GetTime<std::chrono::microseconds>()};
3820 const auto time_diff = std::max(current_time - peer->m_addr_token_timestamp, 0us);
3824 peer->m_addr_token_timestamp = current_time;
3827 uint64_t num_proc = 0;
3828 uint64_t num_rate_limit = 0;
3829 std::shuffle(vAddr.begin(), vAddr.end(), m_rng);
3832 if (interruptMsgProc)
3836 if (peer->m_addr_token_bucket < 1.0) {
3842 peer->m_addr_token_bucket -= 1.0;
3851 addr.
nTime = current_a_time - 5 * 24h;
3853 AddAddressKnown(*peer, addr);
3860 if (addr.
nTime > current_a_time - 10min && !peer->m_getaddr_sent && vAddr.size() <= 10 && addr.
IsRoutable()) {
3862 RelayAddress(pfrom.
GetId(), addr, reachable);
3866 vAddrOk.push_back(addr);
3869 peer->m_addr_processed += num_proc;
3870 peer->m_addr_rate_limited += num_rate_limit;
3871 LogDebug(
BCLog::NET,
"Received addr: %u addresses (%u processed, %u rate-limited) from peer=%d\n",
3872 vAddr.size(), num_proc, num_rate_limit, pfrom.
GetId());
3874 m_addrman.
Add(vAddrOk, pfrom.
addr, 2h);
3875 if (vAddr.size() < 1000) peer->m_getaddr_sent =
false;
3886 std::vector<CInv> vInv;
3890 Misbehaving(*peer,
strprintf(
"inv message size = %u", vInv.size()));
3894 const bool reject_tx_invs{RejectIncomingTxs(pfrom)};
3898 const auto current_time{GetTime<std::chrono::microseconds>()};
3901 for (
CInv& inv : vInv) {
3902 if (interruptMsgProc)
return;
3907 if (peer->m_wtxid_relay) {
3914 const bool fAlreadyHave = AlreadyHaveBlock(inv.
hash);
3917 UpdateBlockAvailability(pfrom.
GetId(), inv.
hash);
3925 best_block = &inv.
hash;
3928 if (reject_tx_invs) {
3934 AddKnownTx(*peer, inv.
hash);
3937 const bool fAlreadyHave{m_txdownloadman.AddTxAnnouncement(pfrom.
GetId(), gtxid, current_time,
true)};
3945 if (best_block !=
nullptr) {
3957 if (state.fSyncStarted || (!peer->m_inv_triggered_getheaders_before_sync && *best_block != m_last_block_inv_triggering_headers_sync)) {
3958 if (MaybeSendGetHeaders(pfrom,
GetLocator(m_chainman.m_best_header), *peer)) {
3960 m_chainman.m_best_header->nHeight, best_block->ToString(),
3963 if (!state.fSyncStarted) {
3964 peer->m_inv_triggered_getheaders_before_sync =
true;
3968 m_last_block_inv_triggering_headers_sync = *best_block;
3977 std::vector<CInv> vInv;
3981 Misbehaving(*peer,
strprintf(
"getdata message size = %u", vInv.size()));
3987 if (vInv.size() > 0) {
3992 LOCK(peer->m_getdata_requests_mutex);
3993 peer->m_getdata_requests.insert(peer->m_getdata_requests.end(), vInv.begin(), vInv.end());
3994 ProcessGetData(pfrom, *peer, interruptMsgProc);
4003 vRecv >> locator >> hashStop;
4019 std::shared_ptr<const CBlock> a_recent_block;
4021 LOCK(m_most_recent_block_mutex);
4022 a_recent_block = m_most_recent_block;
4025 if (!m_chainman.
ActiveChainstate().ActivateBestChain(state, a_recent_block)) {
4040 for (; pindex; pindex = m_chainman.
ActiveChain().Next(pindex))
4055 if (--nLimit <= 0) {
4059 WITH_LOCK(peer->m_block_inv_mutex, {peer->m_continuation_block = pindex->GetBlockHash();});
4070 std::shared_ptr<const CBlock> recent_block;
4072 LOCK(m_most_recent_block_mutex);
4073 if (m_most_recent_block_hash == req.
blockhash)
4074 recent_block = m_most_recent_block;
4078 SendBlockTransactions(pfrom, *peer, *recent_block, req);
4097 if (!block_pos.IsNull()) {
4104 SendBlockTransactions(pfrom, *peer, block, req);
4117 WITH_LOCK(peer->m_getdata_requests_mutex, peer->m_getdata_requests.push_back(inv));
4125 vRecv >> locator >> hashStop;
4147 if (m_chainman.
ActiveTip() ==
nullptr ||
4149 LogDebug(
BCLog::NET,
"Ignoring getheaders from peer=%d because active chain has too little work; sending empty response\n", pfrom.
GetId());
4156 CNodeState *nodestate = State(pfrom.
GetId());
4166 if (!BlockRequestAllowed(pindex)) {
4167 LogDebug(
BCLog::NET,
"%s: ignoring request from peer=%i for old block header that isn't in the main chain\n", __func__, pfrom.
GetId());
4180 std::vector<CBlock> vHeaders;
4181 int nLimit = m_opts.max_headers_result;
4183 for (; pindex; pindex = m_chainman.
ActiveChain().Next(pindex))
4186 if (--nLimit <= 0 || pindex->GetBlockHash() == hashStop)
4201 nodestate->pindexBestHeaderSent = pindex ? pindex : m_chainman.
ActiveChain().
Tip();
4207 if (RejectIncomingTxs(pfrom)) {
4222 const uint256& txid = ptx->GetHash();
4223 const uint256& wtxid = ptx->GetWitnessHash();
4225 const uint256& hash = peer->m_wtxid_relay ? wtxid : txid;
4226 AddKnownTx(*peer, hash);
4230 const auto& [should_validate, package_to_validate] = m_txdownloadman.ReceivedTx(pfrom.
GetId(), ptx);
4231 if (!should_validate) {
4237 LogPrintf(
"Not relaying non-mempool transaction %s (wtxid=%s) from forcerelay peer=%d\n",
4240 LogPrintf(
"Force relaying tx %s (wtxid=%s) from peer=%d\n",
4246 if (package_to_validate) {
4249 package_result.
m_state.
IsValid() ?
"package accepted" :
"package rejected");
4250 ProcessPackageResult(package_to_validate.value(), package_result);
4256 Assume(!package_to_validate.has_value());
4266 if (
auto package_to_validate{ProcessInvalidTx(pfrom.
GetId(), ptx, state,
true)}) {
4269 package_result.
m_state.
IsValid() ?
"package accepted" :
"package rejected");
4270 ProcessPackageResult(package_to_validate.value(), package_result);
4286 vRecv >> cmpctblock;
4288 bool received_new_header =
false;
4298 MaybeSendGetHeaders(pfrom,
GetLocator(m_chainman.m_best_header), *peer);
4308 received_new_header =
true;
4316 MaybePunishNodeForBlock(pfrom.
GetId(), state,
true,
"invalid header via cmpctblock");
4321 if (received_new_header) {
4322 LogInfo(
"Saw new cmpctblock header hash=%s peer=%d\n",
4323 blockhash.ToString(), pfrom.
GetId());
4326 bool fProcessBLOCKTXN =
false;
4330 bool fRevertToHeaderProcessing =
false;
4334 std::shared_ptr<CBlock> pblock = std::make_shared<CBlock>();
4335 bool fBlockReconstructed =
false;
4343 CNodeState *nodestate = State(pfrom.
GetId());
4348 nodestate->m_last_block_announcement =
GetTime();
4354 auto range_flight = mapBlocksInFlight.equal_range(pindex->
GetBlockHash());
4355 size_t already_in_flight = std::distance(range_flight.first, range_flight.second);
4356 bool requested_block_from_this_peer{
false};
4359 bool first_in_flight = already_in_flight == 0 || (range_flight.first->second.first == pfrom.
GetId());
4361 while (range_flight.first != range_flight.second) {
4362 if (range_flight.first->second.first == pfrom.
GetId()) {
4363 requested_block_from_this_peer =
true;
4366 range_flight.first++;
4371 if (requested_block_from_this_peer) {
4374 std::vector<CInv> vInv(1);
4375 vInv[0] =
CInv(
MSG_BLOCK | GetFetchFlags(*peer), blockhash);
4382 if (!already_in_flight && !CanDirectFetch()) {
4390 requested_block_from_this_peer) {
4391 std::list<QueuedBlock>::iterator* queuedBlockIt =
nullptr;
4392 if (!BlockRequested(pfrom.
GetId(), *pindex, &queuedBlockIt)) {
4393 if (!(*queuedBlockIt)->partialBlock)
4406 Misbehaving(*peer,
"invalid compact block");
4409 if (first_in_flight) {
4411 std::vector<CInv> vInv(1);
4412 vInv[0] =
CInv(
MSG_BLOCK | GetFetchFlags(*peer), blockhash);
4422 for (
size_t i = 0; i < cmpctblock.
BlockTxCount(); i++) {
4427 fProcessBLOCKTXN =
true;
4428 }
else if (first_in_flight) {
4435 IsBlockRequestedFromOutbound(blockhash) ||
4454 ReadStatus status = tempBlock.InitData(cmpctblock, vExtraTxnForCompact);
4459 std::vector<CTransactionRef> dummy;
4460 status = tempBlock.FillBlock(*pblock, dummy);
4462 fBlockReconstructed =
true;
4466 if (requested_block_from_this_peer) {
4469 std::vector<CInv> vInv(1);
4470 vInv[0] =
CInv(
MSG_BLOCK | GetFetchFlags(*peer), blockhash);
4475 fRevertToHeaderProcessing =
true;
4480 if (fProcessBLOCKTXN) {
4483 return ProcessCompactBlockTxns(pfrom, *peer, txn);
4486 if (fRevertToHeaderProcessing) {
4492 return ProcessHeadersMessage(pfrom, *peer, {cmpctblock.
header},
true);
4495 if (fBlockReconstructed) {
4500 mapBlockSource.emplace(pblock->GetHash(), std::make_pair(pfrom.
GetId(),
false));
4511 ProcessBlock(pfrom, pblock,
true,
true);
4518 RemoveBlockRequest(pblock->GetHash(), std::nullopt);
4535 return ProcessCompactBlockTxns(pfrom, *peer, resp);
4546 std::vector<CBlockHeader> headers;
4550 if (nCount > m_opts.max_headers_result) {
4551 Misbehaving(*peer,
strprintf(
"headers message size = %u", nCount));
4554 headers.resize(nCount);
4555 for (
unsigned int n = 0; n < nCount; n++) {
4556 vRecv >> headers[n];
4560 ProcessHeadersMessage(pfrom, *peer, std::move(headers),
false);
4564 if (m_headers_presync_should_signal.exchange(
false)) {
4565 HeadersPresyncStats stats;
4567 LOCK(m_headers_presync_mutex);
4568 auto it = m_headers_presync_stats.find(m_headers_presync_bestpeer);
4569 if (it != m_headers_presync_stats.end()) stats = it->second;
4587 std::shared_ptr<CBlock> pblock = std::make_shared<CBlock>();
4598 Misbehaving(*peer,
"mutated block");
4603 bool forceProcessing =
false;
4604 const uint256 hash(pblock->GetHash());
4605 bool min_pow_checked =
false;
4610 forceProcessing = IsBlockRequested(hash);
4611 RemoveBlockRequest(hash, pfrom.
GetId());
4615 mapBlockSource.emplace(hash, std::make_pair(pfrom.
GetId(),
true));
4619 min_pow_checked =
true;
4622 ProcessBlock(pfrom, pblock, forceProcessing, min_pow_checked);
4639 Assume(SetupAddressRelay(pfrom, *peer));
4643 if (peer->m_getaddr_recvd) {
4647 peer->m_getaddr_recvd =
true;
4649 peer->m_addrs_to_send.clear();
4650 std::vector<CAddress> vAddr;
4656 for (
const CAddress &addr : vAddr) {
4657 PushAddress(*peer, addr);
4685 if (
auto tx_relay = peer->GetTxRelay(); tx_relay !=
nullptr) {
4686 LOCK(tx_relay->m_tx_inventory_mutex);
4687 tx_relay->m_send_mempool =
true;
4713 const auto ping_end = time_received;
4716 bool bPingFinished =
false;
4717 std::string sProblem;
4719 if (nAvail >=
sizeof(
nonce)) {
4723 if (peer->m_ping_nonce_sent != 0) {
4724 if (
nonce == peer->m_ping_nonce_sent) {
4726 bPingFinished =
true;
4727 const auto ping_time = ping_end - peer->m_ping_start.load();
4728 if (ping_time.count() >= 0) {
4733 sProblem =
"Timing mishap";
4737 sProblem =
"Nonce mismatch";
4740 bPingFinished =
true;
4741 sProblem =
"Nonce zero";
4745 sProblem =
"Unsolicited pong without ping";
4749 bPingFinished =
true;
4750 sProblem =
"Short payload";
4753 if (!(sProblem.empty())) {
4757 peer->m_ping_nonce_sent,
4761 if (bPingFinished) {
4762 peer->m_ping_nonce_sent = 0;
4779 Misbehaving(*peer,
"too-large bloom filter");
4780 }
else if (
auto tx_relay = peer->GetTxRelay(); tx_relay !=
nullptr) {
4782 LOCK(tx_relay->m_bloom_filter_mutex);
4783 tx_relay->m_bloom_filter.reset(
new CBloomFilter(filter));
4784 tx_relay->m_relay_txs =
true;
4798 std::vector<unsigned char> vData;
4806 }
else if (
auto tx_relay = peer->GetTxRelay(); tx_relay !=
nullptr) {
4807 LOCK(tx_relay->m_bloom_filter_mutex);
4808 if (tx_relay->m_bloom_filter) {
4809 tx_relay->m_bloom_filter->insert(vData);
4815 Misbehaving(*peer,
"bad filteradd message");
4826 auto tx_relay = peer->GetTxRelay();
4827 if (!tx_relay)
return;
4830 LOCK(tx_relay->m_bloom_filter_mutex);
4831 tx_relay->m_bloom_filter =
nullptr;
4832 tx_relay->m_relay_txs =
true;
4841 vRecv >> newFeeFilter;
4843 if (
auto tx_relay = peer->GetTxRelay(); tx_relay !=
nullptr) {
4844 tx_relay->m_fee_filter_received = newFeeFilter;
4852 ProcessGetCFilters(pfrom, *peer, vRecv);
4857 ProcessGetCFHeaders(pfrom, *peer, vRecv);
4862 ProcessGetCFCheckPt(pfrom, *peer, vRecv);
4867 std::vector<CInv> vInv;
4869 std::vector<uint256> tx_invs;
4871 for (
CInv &inv : vInv) {
4873 tx_invs.emplace_back(inv.
hash);
4877 LOCK(m_tx_download_mutex);
4878 m_txdownloadman.ReceivedNotFound(pfrom.
GetId(), tx_invs);
4887bool PeerManagerImpl::MaybeDiscourageAndDisconnect(
CNode& pnode, Peer& peer)
4890 LOCK(peer.m_misbehavior_mutex);
4893 if (!peer.m_should_discourage)
return false;
4895 peer.m_should_discourage =
false;
4900 LogPrintf(
"Warning: not punishing noban peer %d!\n", peer.m_id);
4906 LogPrintf(
"Warning: not punishing manually connected peer %d!\n", peer.m_id);
// Receive-side work for a single peer. From the visible statements it services
// queued getdata requests, processes orphan transactions, dispatches a polled
// message to ProcessMessage(), and records in fMoreWork whether this peer still
// has work pending.
// Visible early returns:
//   false - no Peer object for pfrom, an outbound connection whose version
//           message has not been sent yet, or interruptMsgProc set after
//           ProcessMessage().
//   true  - an orphan was processed, or getdata requests are still queued.
// NOTE(review): fMoreWork is presumably the final return value -- confirm.
4926bool PeerManagerImpl::ProcessMessages(
CNode* pfrom, std::atomic<bool>& interruptMsgProc)
// Look up our Peer object for this connection; bail out if there is none.
4931 PeerRef peer = GetPeerRef(pfrom->
GetId());
4932 if (peer ==
nullptr)
return false;
// For outbound connections, do nothing until our version message has been sent.
4936 if (!pfrom->
IsInboundConn() && !peer->m_outbound_version_message_sent)
return false;
// m_getdata_requests is GUARDED_BY(m_getdata_requests_mutex).
4939 LOCK(peer->m_getdata_requests_mutex);
// Service getdata requests that are already queued for this peer.
4940 if (!peer->m_getdata_requests.empty()) {
4941 ProcessGetData(*pfrom, *peer, interruptMsgProc);
// Try to make progress on orphan transactions associated with this peer.
4945 const bool processed_orphan = ProcessOrphanTx(*peer);
// An orphan was processed: report success without handling a new message in this call.
4950 if (processed_orphan)
return true;
// Getdata requests remain queued: likewise return before polling a new message.
4955 LOCK(peer->m_getdata_requests_mutex);
4956 if (!peer->m_getdata_requests.empty())
return true;
// The second element of the poll result seeds the "more work" indicator.
4969 bool fMoreWork = poll_result->second;
// Optional handling enabled by the capture_messages option (body not visible here).
4980 if (m_opts.capture_messages) {
// Dispatch the message to the per-message-type handler.
4985 ProcessMessage(*pfrom,
msg.m_type,
msg.m_recv,
msg.m_time, interruptMsgProc);
// Stop immediately if an interrupt was requested while processing.
4986 if (interruptMsgProc)
return false;
// Processing may have queued getdata requests; if so, flag more work.
4988 LOCK(peer->m_getdata_requests_mutex);
4989 if (!peer->m_getdata_requests.empty()) fMoreWork =
true;
// The transaction download manager may also have pending work for this peer.
4996 LOCK(m_tx_download_mutex);
4997 if (m_txdownloadman.HaveMoreWork(peer->m_id)) fMoreWork =
true;
// Handler for std::exception-derived errors raised by the guarded section above.
4998 }
catch (
const std::exception& e) {
// Chain-sync check for a peer, driven by state.m_chain_sync. Three cases are
// visible:
//   1. The peer's best known block has at least as much work as our active tip:
//      any running timeout is cleared.
//   2. No timeout is running, or the peer has reached the benchmark header
//      (m_work_header): the benchmark is reset to our current tip.
//   3. A timeout is running and time_in_seconds has passed it: if a getheaders
//      was already sent the peer is reported as having an old chain; otherwise
//      a getheaders is sent and m_sent_getheaders is set.
// NOTE(review): the statements that arm the timeout and that disconnect the
// peer are not visible here -- confirm against the full body.
5007void PeerManagerImpl::ConsiderEviction(
CNode& pto, Peer& peer, std::chrono::seconds time_in_seconds)
// State() result is dereferenced without a null check; assumes node state exists for pto -- TODO confirm.
5011 CNodeState &state = *State(pto.
GetId());
// Case 1: peer's best known block has >= the chain work of our active tip.
5020 if (state.pindexBestKnownBlock !=
nullptr && state.pindexBestKnownBlock->nChainWork >= m_chainman.
ActiveChain().
Tip()->
nChainWork) {
// A timeout of 0s means "no timeout running"; only reset when one is running.
5022 if (state.m_chain_sync.m_timeout != 0
s) {
5023 state.m_chain_sync.m_timeout = 0
s;
5024 state.m_chain_sync.m_work_header =
nullptr;
5025 state.m_chain_sync.m_sent_getheaders =
false;
// Case 2: no timeout running, or the peer's best known block has at least the
// work of the current benchmark header.
5027 }
else if (state.m_chain_sync.m_timeout == 0
s || (state.m_chain_sync.m_work_header !=
nullptr && state.pindexBestKnownBlock !=
nullptr && state.pindexBestKnownBlock->nChainWork >= state.m_chain_sync.m_work_header->nChainWork)) {
// Use our current tip as the new benchmark and forget any earlier getheaders.
5035 state.m_chain_sync.m_work_header = m_chainman.
ActiveChain().
Tip();
5036 state.m_chain_sync.m_sent_getheaders =
false;
// Case 3: a timeout is running and has expired.
5037 }
else if (state.m_chain_sync.m_timeout > 0
s && time_in_seconds > state.m_chain_sync.m_timeout) {
// A getheaders was already sent and the peer still has not caught up.
5041 if (state.m_chain_sync.m_sent_getheaders) {
5043 LogInfo(
"Outbound peer has old chain, best known block = %s, %s\n", state.pindexBestKnownBlock !=
nullptr ? state.pindexBestKnownBlock->GetBlockHash().ToString() :
"<none>", pto.
DisconnectMsg(
fLogIPs));
// The getheaders path below dereferences m_work_header, so it must be set.
5046 assert(state.m_chain_sync.m_work_header);
// Request headers using a locator built from the parent of the benchmark header.
5051 MaybeSendGetHeaders(pto,
5052 GetLocator(state.m_chain_sync.m_work_header->pprev),
5054 LogDebug(
BCLog::NET,
"sending getheaders to outbound peer=%d to verify chain work (current best known block:%s, benchmark blockhash: %s)\n", pto.
GetId(), state.pindexBestKnownBlock !=
nullptr ? state.pindexBestKnownBlock->GetBlockHash().ToString() :
"<none>", state.m_chain_sync.m_work_header->GetBlockHash().ToString());
// Remember that the request was made, so the next expiry takes the "old chain" branch.
5055 state.m_chain_sync.m_sent_getheaders =
true;
// Selects eviction candidates among outbound peers in two passes, as indicated
// by the log messages below: first a block-relay-only peer, then a full
// outbound peer. In each pass the chosen peer is either disconnected or kept,
// and the outcome is logged.
// NOTE(review): the conditions that decide whether there are "extra" peers, and
// the keep-vs-disconnect criteria, are only partly visible -- confirm.
5067void PeerManagerImpl::EvictExtraOutboundPeers(std::chrono::seconds now)
// (node id, last block time) for the two highest node ids seen; -1 means "none yet".
5076 std::pair<NodeId, std::chrono::seconds> youngest_peer{-1, 0}, next_youngest_peer{-1, 0};
// Keep the peer with the highest id as "youngest", demoting the previous holder.
// presumably a higher node id means a more recent connection -- TODO confirm.
5080 if (pnode->
GetId() > youngest_peer.first) {
5081 next_youngest_peer = youngest_peer;
5082 youngest_peer.first = pnode->GetId();
5083 youngest_peer.second = pnode->m_last_block_time;
// Default to evicting the youngest peer, unless it delivered a block more
// recently than the next-youngest, in which case evict the latter instead.
5086 NodeId to_disconnect = youngest_peer.first;
5087 if (youngest_peer.second > next_youngest_peer.second) {
5090 to_disconnect = next_youngest_peer.first;
// Look up per-node state for the candidate; it may be missing.
5099 CNodeState *node_state = State(pnode->
GetId());
5100 if (node_state ==
nullptr ||
5103 LogDebug(
BCLog::NET,
"disconnecting extra block-relay-only peer=%d (last block received at time %d)\n",
5107 LogDebug(
BCLog::NET,
"keeping block-relay-only peer=%d chosen for eviction (connect time: %d, blocks_in_flight: %d)\n",
// Second pass. Start from the maximum value so any real announcement time compares as older.
5123 int64_t oldest_block_announcement = std::numeric_limits<int64_t>::max();
5126 AssertLockHeld(::cs_main);
// Only full outbound connections that are not already being disconnected are candidates.
5130 if (!pnode->IsFullOutboundConn() || pnode->fDisconnect) return;
5131 CNodeState *state = State(pnode->GetId());
// Skip peers without node state.
5132 if (state == nullptr) return;
// Skip peers marked as protected in their chain-sync state.
5134 if (state->m_chain_sync.m_protect) return;
// Skip the peer unless there are multiple manual/full outbound connections to its network.
5137 if (!m_connman.MultipleManualOrFullOutboundConns(pnode->addr.GetNetwork())) return;
// Pick the peer with the oldest last block announcement; on a tie prefer the higher node id.
5138 if (state->m_last_block_announcement < oldest_block_announcement || (state->m_last_block_announcement == oldest_block_announcement && pnode->GetId() > worst_peer)) {
5139 worst_peer = pnode->GetId();
5140 oldest_block_announcement = state->m_last_block_announcement;
// A candidate was found (worst_peer keeps the value -1 otherwise).
5143 if (worst_peer != -1) {
// State() result is dereferenced without a null check here -- assumes state still exists; TODO confirm.
5152 CNodeState &state = *State(pnode->
GetId());
5154 LogDebug(
BCLog::NET,
"disconnecting extra outbound peer=%d (last block announcement received at time %d)\n", pnode->
GetId(), oldest_block_announcement);
5158 LogDebug(
BCLog::NET,
"keeping outbound peer=%d chosen for eviction (connect time: %d, blocks_in_flight: %d)\n",
// Maintenance routine: evicts extra outbound peers, runs a stale-tip check once
// the current time passes m_stale_tip_check_time, and latches
// m_initial_sync_finished the first time CanDirectFetch() is true.
// NOTE(review): how the stale tip is detected and how m_stale_tip_check_time is
// advanced are not visible here -- confirm.
5175void PeerManagerImpl::CheckForStaleTipAndEvictPeers()
// Current time in whole seconds, shared by the checks below.
5179 auto now{GetTime<std::chrono::seconds>()};
5181 EvictExtraOutboundPeers(now);
// Only run the stale-tip check when its scheduled time has passed.
5183 if (now > m_stale_tip_check_time) {
5187 LogPrintf(
"Potential stale tip detected, will try using extra outbound peer (last tip update: %d seconds ago)\n",
// One-way latch: once set, m_initial_sync_finished is not re-evaluated by this check.
5196 if (!m_initial_sync_finished && CanDirectFetch()) {
5198 m_initial_sync_finished =
true;
// Decides whether to ping the peer. A ping is considered when one has been
// explicitly queued (m_ping_queued) or when no ping is outstanding and more
// than PING_INTERVAL has elapsed since m_ping_start. A nonce of 0 in
// m_ping_nonce_sent means "no ping outstanding".
// NOTE(review): the code that actually sends the message, and the branch that
// stores 0 instead of the nonce, are not visible here -- confirm.
5202void PeerManagerImpl::MaybeSendPing(
CNode& node_to, Peer& peer, std::chrono::microseconds now)
// Part of a condition that requires an outstanding ping (non-zero nonce); the rest is not visible.
5205 peer.m_ping_nonce_sent &&
5215 bool pingSend =
false;
// A ping was explicitly requested.
5217 if (peer.m_ping_queued) {
// No ping outstanding and the regular interval has elapsed since the last one started.
5222 if (peer.m_ping_nonce_sent == 0 && now > peer.m_ping_start.load() +
PING_INTERVAL) {
// Retry until the nonce is non-zero, since 0 is reserved for "no ping outstanding".
5231 }
while (
nonce == 0);
// Clear the request flag and record when this ping started.
5232 peer.m_ping_queued =
false;
5233 peer.m_ping_start = now;
// Remember the nonce of the outstanding ping.
5235 peer.m_ping_nonce_sent =
nonce;
// Alternative path: record that no ping is outstanding.
5239 peer.m_ping_nonce_sent = 0;
// Address relay towards one peer. Does nothing unless address relay is enabled
// for the peer. When the local-address timer has expired it queues our own
// address via PushAddress(); when the regular timer has expired it drops
// addresses the peer already knows from m_addrs_to_send and flushes the rest.
// NOTE(review): the statements that reschedule the timers and that push the
// addr/addrv2 message are not visible here -- confirm.
5245void PeerManagerImpl::MaybeSendAddr(
CNode&
node, Peer& peer, std::chrono::microseconds current_time)
// Address relay is opt-in per peer.
5248 if (!peer.m_addr_relay_enabled)
return;
// m_next_addr_send and m_next_local_addr_send are GUARDED_BY(m_addr_send_times_mutex).
5250 LOCK(peer.m_addr_send_times_mutex);
// Second half of the condition for advertising our own address: its timer has expired.
5253 peer.m_next_local_addr_send < current_time) {
// A non-zero timer means this is not the first self-advertisement; the
// known-address filter is reset in that case.
// presumably so that our own address can be sent again -- TODO confirm.
5260 if (peer.m_next_local_addr_send != 0us) {
5261 peer.m_addr_known->reset();
// Build our own address record, timestamped now, and queue it for this peer.
5264 CAddress local_addr{*local_service, peer.m_our_services, Now<NodeSeconds>()};
5265 PushAddress(peer, local_addr);
// Not yet time for the regular addr send.
5271 if (current_time <= peer.m_next_addr_send)
return;
// Test-and-insert: report whether the address key was already known, and
// record it as known if it was not.
5284 bool ret = peer.m_addr_known->contains(addr.
GetKey());
5285 if (!
ret) peer.m_addr_known->insert(addr.
GetKey());
// Erase-remove: drop every queued address for which addr_already_known is true.
5288 peer.m_addrs_to_send.erase(std::remove_if(peer.m_addrs_to_send.begin(), peer.m_addrs_to_send.end(), addr_already_known),
5289 peer.m_addrs_to_send.end());
// Nothing left to send after filtering.
5292 if (peer.m_addrs_to_send.empty())
return;
// Branch on whether the peer asked for the addrv2 format.
5294 if (peer.m_wants_addrv2) {
// The queue is emptied after this point.
5299 peer.m_addrs_to_send.clear();
// Release memory if the (now empty) vector kept a capacity above 40 entries.
5302 if (peer.m_addrs_to_send.capacity() > 40) {
5303 peer.m_addrs_to_send.shrink_to_fit();
// Marks the peer as having been sent a sendheaders message
// (peer.m_sent_sendheaders) once a condition on the peer's best known block is
// satisfied.
// NOTE(review): the rest of that condition and the message push itself are not
// visible here -- confirm.
5307void PeerManagerImpl::MaybeSendSendHeaders(
CNode&
node, Peer& peer)
// State() result is dereferenced without a null check; assumes node state exists -- TODO confirm.
5315 CNodeState &state = *State(
node.GetId());
// Requires that we know of a best block for this peer (further conditions not visible).
5316 if (state.pindexBestKnownBlock !=
nullptr &&
// Latch the flag so the announcement is recorded as sent.
5323 peer.m_sent_sendheaders =
true;
// Fee filter announcement towards one peer. Skipped entirely when this node
// ignores incoming transactions. When the send timer has expired, the current
// filter is rounded and, if it differs from the value last sent, recorded in
// peer.m_fee_filter_sent.
// NOTE(review): how currentFilter is computed, the message push and the timer
// rescheduling are not visible here -- confirm.
5328void PeerManagerImpl::MaybeSendFeefilter(
CNode& pto, Peer& peer, std::chrono::microseconds current_time)
// No fee filter handling when incoming transactions are ignored.
5330 if (m_opts.ignore_incoming_txs)
return;
// If the value last sent was MAX_FILTER, zero the timer so the check below fires.
// NOTE(review): any extra conditions between these two statements are not visible.
5346 if (peer.m_fee_filter_sent == MAX_FILTER) {
5349 peer.m_next_send_feefilter = 0us;
// Send timer expired.
5352 if (current_time > peer.m_next_send_feefilter) {
// Round the current filter with m_fee_filter_rounder before comparing and sending.
5353 CAmount filterToSend = m_fee_filter_rounder.round(currentFilter);
// Only act when the rounded value actually changed.
5356 if (filterToSend != peer.m_fee_filter_sent) {
5358 peer.m_fee_filter_sent = filterToSend;
// Part of a condition: the current filter is below 3/4 or above 4/3 of the value last sent.
5365 (currentFilter < 3 * peer.m_fee_filter_sent / 4 || currentFilter > 4 * peer.m_fee_filter_sent / 3)) {
// Comparison functor over iterators into a std::set<uint256>. It orders the
// referenced hashes by delegating to the mempool's CompareDepthAndScore().
5371class CompareInvMempoolOrder
// _mempool: mempool used for the comparison. use_wtxid: forwarded to
// CompareDepthAndScore() as its third argument.
5376 explicit CompareInvMempoolOrder(
CTxMemPool *_mempool,
bool use_wtxid)
5379 m_wtxid_relay = use_wtxid;
5382 bool operator()(std::set<uint256>::iterator a, std::set<uint256>::iterator b)
// Arguments are deliberately passed in reverse order (b before a), which
// inverts the ordering defined by CompareDepthAndScore().
5386 return mp->CompareDepthAndScore(*b, *a, m_wtxid_relay);
// Const query taking a connection; only the signature is visible here.
// NOTE(review): presumably reports whether transactions received from `peer`
// should be rejected -- confirm against the body.
5391bool PeerManagerImpl::RejectIncomingTxs(
const CNode& peer)
const
// Enables address relay for a peer. Block-only connections are refused (returns
// false). The first call that enables relay for a peer also allocates its
// known-address filter.
// NOTE(review): the return value on the success path is not visible here --
// presumably true; confirm.
5401bool PeerManagerImpl::SetupAddressRelay(
const CNode&
node, Peer& peer)
// Block-only connections do not take part in address relay.
5406 if (
node.IsBlockOnlyConn())
return false;
// exchange(true) returns the previous value, so this branch runs only for the
// call that flips the flag from false to true.
5408 if (!peer.m_addr_relay_enabled.exchange(
true)) {
// Rolling bloom filter constructed with arguments (5000, 0.001).
// presumably element count and false-positive rate -- TODO confirm.
5412 peer.m_addr_known = std::make_unique<CRollingBloomFilter>(5000, 0.001);
5418bool PeerManagerImpl::SendMessages(
CNode* pto)
5423 PeerRef peer = GetPeerRef(pto->
GetId());
5424 if (!peer)
return false;
5429 if (MaybeDiscourageAndDisconnect(*pto, *peer))
return true;
5432 if (!pto->
IsInboundConn() && !peer->m_outbound_version_message_sent) {
5433 PushNodeVersion(*pto, *peer);
5434 peer->m_outbound_version_message_sent =
true;
5441 const auto current_time{GetTime<std::chrono::microseconds>()};
5449 MaybeSendPing(*pto, *peer, current_time);
5454 MaybeSendAddr(*pto, *peer, current_time);
5456 MaybeSendSendHeaders(*pto, *peer);
5461 CNodeState &state = *State(pto->
GetId());
5464 if (m_chainman.m_best_header ==
nullptr) {
5471 bool sync_blocks_and_headers_from_peer =
false;
5472 if (state.fPreferredDownload) {
5473 sync_blocks_and_headers_from_peer =
true;
5484 if (m_num_preferred_download_peers == 0 || mapBlocksInFlight.empty()) {
5485 sync_blocks_and_headers_from_peer =
true;
5491 if ((nSyncStarted == 0 && sync_blocks_and_headers_from_peer) || m_chainman.m_best_header->Time() >
NodeClock::now() - 24h) {
5492 const CBlockIndex* pindexStart = m_chainman.m_best_header;
5500 if (pindexStart->
pprev)
5501 pindexStart = pindexStart->
pprev;
5502 if (MaybeSendGetHeaders(*pto,
GetLocator(pindexStart), *peer)) {
5505 state.fSyncStarted =
true;
5529 LOCK(peer->m_block_inv_mutex);
5530 std::vector<CBlock> vHeaders;
5531 bool fRevertToInv = ((!peer->m_prefers_headers &&
5532 (!state.m_requested_hb_cmpctblocks || peer->m_blocks_for_headers_relay.size() > 1)) ||
5535 ProcessBlockAvailability(pto->
GetId());
5537 if (!fRevertToInv) {
5538 bool fFoundStartingHeader =
false;
5542 for (
const uint256& hash : peer->m_blocks_for_headers_relay) {
5547 fRevertToInv =
true;
5550 if (pBestIndex !=
nullptr && pindex->
pprev != pBestIndex) {
5562 fRevertToInv =
true;
5565 pBestIndex = pindex;
5566 if (fFoundStartingHeader) {
5569 }
else if (PeerHasHeader(&state, pindex)) {
5571 }
else if (pindex->
pprev ==
nullptr || PeerHasHeader(&state, pindex->
pprev)) {
5574 fFoundStartingHeader =
true;
5579 fRevertToInv =
true;
5584 if (!fRevertToInv && !vHeaders.empty()) {
5585 if (vHeaders.size() == 1 && state.m_requested_hb_cmpctblocks) {
5589 vHeaders.front().GetHash().ToString(), pto->
GetId());
5591 std::optional<CSerializedNetMsg> cached_cmpctblock_msg;
5593 LOCK(m_most_recent_block_mutex);
5594 if (m_most_recent_block_hash == pBestIndex->
GetBlockHash()) {
5598 if (cached_cmpctblock_msg.has_value()) {
5599 PushMessage(*pto, std::move(cached_cmpctblock_msg.value()));
5607 state.pindexBestHeaderSent = pBestIndex;
5608 }
else if (peer->m_prefers_headers) {
5609 if (vHeaders.size() > 1) {
5612 vHeaders.front().GetHash().ToString(),
5613 vHeaders.back().GetHash().ToString(), pto->
GetId());
5616 vHeaders.front().GetHash().ToString(), pto->
GetId());
5619 state.pindexBestHeaderSent = pBestIndex;
5621 fRevertToInv =
true;
5627 if (!peer->m_blocks_for_headers_relay.empty()) {
5628 const uint256& hashToAnnounce = peer->m_blocks_for_headers_relay.back();