13 #include <boost/multi_index/indexed_by.hpp>
14 #include <boost/multi_index/ordered_index.hpp>
15 #include <boost/multi_index/sequenced_index.hpp>
16 #include <boost/multi_index/tag.hpp>
17 #include <boost/multi_index_container.hpp>
18 #include <boost/tuple/tuple.hpp>
21 #include <unordered_map>
42 enum class State : uint8_t {
58 using SequenceNumber = uint64_t;
65 std::chrono::microseconds m_time;
69 const SequenceNumber m_sequence : 59;
71 const bool m_preferred : 1;
73 const bool m_is_wtxid : 1;
76 State m_state : 3 {State::CANDIDATE_DELAYED};
77 State GetState()
const {
return m_state; }
78 void SetState(State state) { m_state = state; }
81 bool IsSelected()
const
83 return GetState() == State::CANDIDATE_BEST || GetState() == State::REQUESTED;
87 bool IsWaiting()
const
89 return GetState() == State::REQUESTED || GetState() == State::CANDIDATE_DELAYED;
93 bool IsSelectable()
const
95 return GetState() == State::CANDIDATE_READY || GetState() == State::CANDIDATE_BEST;
99 Announcement(
const GenTxid& gtxid,
NodeId peer,
bool preferred, std::chrono::microseconds reqtime,
101 : m_txhash(gtxid.GetHash()), m_time(reqtime), m_peer(peer), m_sequence(
sequence), m_preferred(preferred),
102 m_is_wtxid{gtxid.IsWtxid()} {}
106 using Priority = uint64_t;
112 class PriorityComputer {
113 const uint64_t m_k0, m_k1;
115 explicit PriorityComputer(
bool deterministic) :
116 m_k0{deterministic ? 0 :
GetRand(0xFFFFFFFFFFFFFFFF)},
117 m_k1{deterministic ? 0 :
GetRand(0xFFFFFFFFFFFFFFFF)} {}
119 Priority operator()(
const uint256& txhash,
NodeId peer,
bool preferred)
const
122 return low_bits | uint64_t{preferred} << 63;
125 Priority operator()(
const Announcement& ann)
const
127 return operator()(ann.m_txhash, ann.m_peer, ann.m_preferred);
145 using ByPeerView = std::tuple<NodeId, bool, const uint256&>;
146 struct ByPeerViewExtractor
148 using result_type = ByPeerView;
149 result_type operator()(
const Announcement& ann)
const
151 return ByPeerView{ann.m_peer, ann.GetState() == State::CANDIDATE_BEST, ann.m_txhash};
166 using ByTxHashView = std::tuple<const uint256&, State, Priority>;
167 class ByTxHashViewExtractor {
168 const PriorityComputer& m_computer;
170 explicit ByTxHashViewExtractor(
const PriorityComputer& computer) : m_computer(computer) {}
171 using result_type = ByTxHashView;
172 result_type operator()(
const Announcement& ann)
const
174 const Priority prio = (ann.GetState() == State::CANDIDATE_READY) ? m_computer(ann) : 0;
175 return ByTxHashView{ann.m_txhash, ann.GetState(), prio};
179 enum class WaitState {
188 WaitState GetWaitState(
const Announcement& ann)
190 if (ann.IsWaiting())
return WaitState::FUTURE_EVENT;
191 if (ann.IsSelectable())
return WaitState::PAST_EVENT;
192 return WaitState::NO_EVENT;
205 using ByTimeView = std::pair<WaitState, std::chrono::microseconds>;
206 struct ByTimeViewExtractor
208 using result_type = ByTimeView;
209 result_type operator()(
const Announcement& ann)
const
211 return ByTimeView{GetWaitState(ann), ann.m_time};
216 using Index = boost::multi_index_container<
218 boost::multi_index::indexed_by<
219 boost::multi_index::ordered_unique<boost::multi_index::tag<ByPeer>, ByPeerViewExtractor>,
220 boost::multi_index::ordered_non_unique<boost::multi_index::tag<ByTxHash>, ByTxHashViewExtractor>,
221 boost::multi_index::ordered_non_unique<boost::multi_index::tag<ByTime>, ByTimeViewExtractor>
226 template<
typename Tag>
227 using Iter =
typename Index::index<Tag>::type::iterator;
232 size_t m_completed = 0;
233 size_t m_requested = 0;
240 size_t m_candidate_delayed = 0;
242 size_t m_candidate_ready = 0;
244 size_t m_candidate_best = 0;
246 size_t m_requested = 0;
248 Priority m_priority_candidate_best = std::numeric_limits<Priority>::max();
250 Priority m_priority_best_candidate_ready = std::numeric_limits<Priority>::min();
252 std::vector<NodeId> m_peers;
256 bool operator==(
const PeerInfo& a,
const PeerInfo& b)
258 return std::tie(a.m_total, a.m_completed, a.m_requested) ==
259 std::tie(b.m_total, b.m_completed, b.m_requested);
263 std::unordered_map<NodeId, PeerInfo> RecomputePeerInfo(
const Index& index)
265 std::unordered_map<NodeId, PeerInfo>
ret;
266 for (
const Announcement& ann : index) {
267 PeerInfo& info =
ret[ann.m_peer];
269 info.m_requested += (ann.GetState() == State::REQUESTED);
270 info.m_completed += (ann.GetState() == State::COMPLETED);
276 std::map<uint256, TxHashInfo> ComputeTxHashInfo(
const Index& index,
const PriorityComputer& computer)
278 std::map<uint256, TxHashInfo>
ret;
279 for (
const Announcement& ann : index) {
280 TxHashInfo& info =
ret[ann.m_txhash];
282 info.m_candidate_delayed += (ann.GetState() == State::CANDIDATE_DELAYED);
283 info.m_candidate_ready += (ann.GetState() == State::CANDIDATE_READY);
284 info.m_candidate_best += (ann.GetState() == State::CANDIDATE_BEST);
285 info.m_requested += (ann.GetState() == State::REQUESTED);
287 if (ann.GetState() == State::CANDIDATE_BEST) {
288 info.m_priority_candidate_best = computer(ann);
290 if (ann.GetState() == State::CANDIDATE_READY) {
291 info.m_priority_best_candidate_ready = std::max(info.m_priority_best_candidate_ready, computer(ann));
294 info.m_peers.push_back(ann.m_peer);
330 TxHashInfo& info = item.second;
333 assert(info.m_candidate_delayed + info.m_candidate_ready + info.m_candidate_best + info.m_requested > 0);
336 assert(info.m_candidate_best + info.m_requested <= 1);
340 if (info.m_candidate_ready > 0) {
341 assert(info.m_candidate_best + info.m_requested == 1);
346 if (info.m_candidate_ready && info.m_candidate_best) {
347 assert(info.m_priority_candidate_best >= info.m_priority_best_candidate_ready);
351 std::sort(info.m_peers.begin(), info.m_peers.end());
352 assert(std::adjacent_find(info.m_peers.begin(), info.m_peers.end()) == info.m_peers.end());
358 for (
const Announcement& ann :
m_index) {
359 if (ann.IsWaiting()) {
363 }
else if (ann.IsSelectable()) {
366 assert(ann.m_time <= now);
373 template<
typename Tag>
377 peerit->second.m_completed -= it->GetState() == State::COMPLETED;
378 peerit->second.m_requested -= it->GetState() == State::REQUESTED;
379 if (--peerit->second.m_total == 0)
m_peerinfo.erase(peerit);
380 return m_index.get<Tag>().erase(it);
384 template<
typename Tag,
typename Modifier>
385 void Modify(Iter<Tag> it, Modifier modifier)
388 peerit->second.m_completed -= it->GetState() == State::COMPLETED;
389 peerit->second.m_requested -= it->GetState() == State::REQUESTED;
390 m_index.get<Tag>().modify(it, std::move(modifier));
391 peerit->second.m_completed += it->GetState() == State::COMPLETED;
392 peerit->second.m_requested += it->GetState() == State::REQUESTED;
401 assert(it->GetState() == State::CANDIDATE_DELAYED);
403 Modify<ByTxHash>(it, [](Announcement& ann){ ann.SetState(State::CANDIDATE_READY); });
408 auto it_next = std::next(it);
409 if (it_next ==
m_index.get<ByTxHash>().end() || it_next->m_txhash != it->m_txhash ||
410 it_next->GetState() == State::COMPLETED) {
413 Modify<ByTxHash>(it, [](Announcement& ann){ ann.SetState(State::CANDIDATE_BEST); });
414 }
else if (it_next->GetState() == State::CANDIDATE_BEST) {
417 if (priority_new > priority_old) {
419 Modify<ByTxHash>(it_next, [](Announcement& ann){ ann.SetState(State::CANDIDATE_READY); });
420 Modify<ByTxHash>(it, [](Announcement& ann){ ann.SetState(State::CANDIDATE_BEST); });
429 assert(new_state == State::COMPLETED || new_state == State::CANDIDATE_DELAYED);
431 if (it->IsSelected() && it !=
m_index.get<ByTxHash>().begin()) {
432 auto it_prev = std::prev(it);
435 if (it_prev->m_txhash == it->m_txhash && it_prev->GetState() == State::CANDIDATE_READY) {
437 Modify<ByTxHash>(it_prev, [](Announcement& ann){ ann.SetState(State::CANDIDATE_BEST); });
440 Modify<ByTxHash>(it, [new_state](Announcement& ann){ ann.SetState(new_state); });
447 assert(it->GetState() != State::COMPLETED);
451 if (it !=
m_index.get<ByTxHash>().begin() && std::prev(it)->m_txhash == it->m_txhash)
return false;
454 if (std::next(it) !=
m_index.get<ByTxHash>().end() && std::next(it)->m_txhash == it->m_txhash &&
455 std::next(it)->GetState() != State::COMPLETED)
return false;
468 if (it->GetState() == State::COMPLETED)
return true;
474 it = Erase<ByTxHash>(it);
475 }
while (it !=
m_index.get<ByTxHash>().end() && it->m_txhash == txhash);
490 void SetTimePoint(std::chrono::microseconds now, std::vector<std::pair<NodeId, GenTxid>>* expired)
492 if (expired) expired->clear();
497 auto it =
m_index.get<ByTime>().begin();
498 if (it->GetState() == State::CANDIDATE_DELAYED && it->m_time <= now) {
500 }
else if (it->GetState() == State::REQUESTED && it->m_time <= now) {
501 if (expired) expired->emplace_back(it->m_peer,
ToGenTxid(*it));
512 auto it = std::prev(
m_index.get<ByTime>().end());
513 if (it->IsSelectable() && it->m_time > now) {
522 explicit Impl(
bool deterministic) :
526 boost::make_tuple(ByPeerViewExtractor(),
std::less<ByPeerView>()),
527 boost::make_tuple(ByTxHashViewExtractor(
m_computer),
std::less<ByTxHashView>()),
528 boost::make_tuple(ByTimeViewExtractor(),
std::less<ByTimeView>())
537 auto& index =
m_index.get<ByPeer>();
538 auto it = index.lower_bound(ByPeerView{peer,
false,
uint256::ZERO});
539 while (it != index.end() && it->m_peer == peer) {
553 auto it_next = (std::next(it) == index.end() || std::next(it)->m_peer != peer) ? index.end() :
568 auto it =
m_index.get<ByTxHash>().lower_bound(ByTxHashView{txhash, State::CANDIDATE_DELAYED, 0});
569 while (it !=
m_index.get<ByTxHash>().end() && it->m_txhash == txhash) {
570 it = Erase<ByTxHash>(it);
575 std::chrono::microseconds reqtime)
580 if (
m_index.get<ByPeer>().count(ByPeerView{peer, true, gtxid.GetHash()}))
return;
586 if (!
ret.second)
return;
595 std::vector<std::pair<NodeId, GenTxid>>* expired)
601 std::vector<const Announcement*> selected;
603 while (it_peer !=
m_index.get<ByPeer>().end() && it_peer->m_peer == peer &&
604 it_peer->GetState() == State::CANDIDATE_BEST) {
605 selected.emplace_back(&*it_peer);
610 std::sort(selected.begin(), selected.end(), [](
const Announcement* a,
const Announcement* b) {
611 return a->m_sequence < b->m_sequence;
615 std::vector<GenTxid>
ret;
616 ret.reserve(selected.size());
617 std::transform(selected.begin(), selected.end(), std::back_inserter(
ret), [](
const Announcement* ann) {
618 return ToGenTxid(*ann);
625 auto it =
m_index.get<ByPeer>().find(ByPeerView{peer,
true, txhash});
626 if (it ==
m_index.get<ByPeer>().end()) {
632 it =
m_index.get<ByPeer>().find(ByPeerView{peer,
false, txhash});
633 if (it ==
m_index.get<ByPeer>().end() || (it->GetState() != State::CANDIDATE_DELAYED &&
634 it->GetState() != State::CANDIDATE_READY)) {
644 auto it_old =
m_index.get<ByTxHash>().lower_bound(ByTxHashView{txhash, State::CANDIDATE_BEST, 0});
645 if (it_old !=
m_index.get<ByTxHash>().end() && it_old->m_txhash == txhash) {
646 if (it_old->GetState() == State::CANDIDATE_BEST) {
653 Modify<ByTxHash>(it_old, [](Announcement& ann) { ann.SetState(State::CANDIDATE_READY); });
654 }
else if (it_old->GetState() == State::REQUESTED) {
657 Modify<ByTxHash>(it_old, [](Announcement& ann) { ann.SetState(State::COMPLETED); });
662 Modify<ByPeer>(it, [expiry](Announcement& ann) {
663 ann.SetState(State::REQUESTED);
671 auto it =
m_index.get<ByPeer>().find(ByPeerView{peer,
false, txhash});
672 if (it ==
m_index.get<ByPeer>().end()) {
673 it =
m_index.get<ByPeer>().find(ByPeerView{peer,
true, txhash});
681 if (it !=
m_peerinfo.end())
return it->second.m_requested;
688 if (it !=
m_peerinfo.end())
return it->second.m_total - it->second.m_requested - it->second.m_completed;
695 if (it !=
m_peerinfo.end())
return it->second.m_total;
705 return uint64_t{
m_computer(txhash, peer, preferred)};
725 m_impl->PostGetRequestableSanityCheck(now);
729 std::chrono::microseconds reqtime)
731 m_impl->ReceivedInv(peer, gtxid, preferred, reqtime);
736 m_impl->RequestedTx(peer, txhash, expiry);
741 m_impl->ReceivedResponse(peer, txhash);
745 std::vector<std::pair<NodeId, GenTxid>>* expired)
747 return m_impl->GetRequestable(peer, now, expired);
752 return m_impl->ComputePriority(txhash, peer, preferred);
uint64_t Finalize() const
Compute the 64-bit SipHash-2-4 of the data written so far.
CSipHasher & Write(uint64_t data)
Hash a 64-bit integer worth of data It is treated as if this was the little-endian interpretation of ...
A generic txid reference (txid or wtxid).
static GenTxid Wtxid(const uint256 &hash)
Actual implementation for TxRequestTracker's data structure.
const PriorityComputer m_computer
This tracker's priority computer.
Iter< Tag > Erase(Iter< Tag > it)
Wrapper around Index::...::erase that keeps m_peerinfo up to date.
void PromoteCandidateReady(Iter< ByTxHash > it)
Convert a CANDIDATE_DELAYED announcement into a CANDIDATE_READY.
SequenceNumber m_current_sequence
The current sequence number.
void ReceivedResponse(NodeId peer, const uint256 &txhash)
size_t CountInFlight(NodeId peer) const
std::vector< GenTxid > GetRequestable(NodeId peer, std::chrono::microseconds now, std::vector< std::pair< NodeId, GenTxid >> *expired)
Find the GenTxids to request now from peer.
size_t Size() const
Count how many announcements are being tracked in total across all peers and transactions.
void ReceivedInv(NodeId peer, const GenTxid >xid, bool preferred, std::chrono::microseconds reqtime)
Impl(const Impl &)=delete
bool MakeCompleted(Iter< ByTxHash > it)
Convert any announcement to a COMPLETED one.
void PostGetRequestableSanityCheck(std::chrono::microseconds now) const
Index m_index
This tracker's main data structure. See SanityCheck() for the invariants that apply to it.
bool IsOnlyNonCompleted(Iter< ByTxHash > it)
Check if 'it' is the only announcement for a given txhash that isn't COMPLETED.
Impl & operator=(const Impl &)=delete
void ForgetTxHash(const uint256 &txhash)
uint64_t ComputePriority(const uint256 &txhash, NodeId peer, bool preferred) const
std::unordered_map< NodeId, PeerInfo > m_peerinfo
Map with this tracker's per-peer statistics.
void RequestedTx(NodeId peer, const uint256 &txhash, std::chrono::microseconds expiry)
void ChangeAndReselect(Iter< ByTxHash > it, State new_state)
Change the state of an announcement to something non-IsSelected().
size_t CountCandidates(NodeId peer) const
void SetTimePoint(std::chrono::microseconds now, std::vector< std::pair< NodeId, GenTxid >> *expired)
Make the data structure consistent with a given point in time:
size_t Count(NodeId peer) const
void Modify(Iter< Tag > it, Modifier modifier)
Wrapper around Index::...::modify that keeps m_peerinfo up to date.
void DisconnectedPeer(NodeId peer)
Data structure to keep track of, and schedule, transaction downloads from peers.
void ReceivedInv(NodeId peer, const GenTxid >xid, bool preferred, std::chrono::microseconds reqtime)
Adds a new CANDIDATE announcement.
void SanityCheck() const
Run internal consistency check (testing only).
size_t CountInFlight(NodeId peer) const
Count how many REQUESTED announcements a peer has.
size_t CountCandidates(NodeId peer) const
Count how many CANDIDATE announcements a peer has.
TxRequestTracker(bool deterministic=false)
Construct a TxRequestTracker.
const std::unique_ptr< Impl > m_impl
void DisconnectedPeer(NodeId peer)
Deletes all announcements for a given peer.
void ReceivedResponse(NodeId peer, const uint256 &txhash)
Converts a CANDIDATE or REQUESTED announcement to a COMPLETED one.
uint64_t ComputePriority(const uint256 &txhash, NodeId peer, bool preferred) const
Access to the internal priority computation (testing only)
void PostGetRequestableSanityCheck(std::chrono::microseconds now) const
Run a time-dependent internal consistency check (testing only).
void RequestedTx(NodeId peer, const uint256 &txhash, std::chrono::microseconds expiry)
Marks a transaction as requested, with a specified expiry.
size_t Count(NodeId peer) const
Count how many announcements a peer has (REQUESTED, CANDIDATE, and COMPLETED combined).
size_t Size() const
Count how many announcements are being tracked in total across all peers and transaction hashes.
std::vector< GenTxid > GetRequestable(NodeId peer, std::chrono::microseconds now, std::vector< std::pair< NodeId, GenTxid >> *expired=nullptr)
Find the txids to request now from peer.
void ForgetTxHash(const uint256 &txhash)
Deletes all announcements for a given txhash (both txid and wtxid ones).
static const uint256 ZERO
bool operator==(const CNetAddr &a, const CNetAddr &b)
GenTxid ToGenTxid(const CInv &inv)
Convert a TX/WITNESS_TX/WTX CInv to a GenTxid.
T GetRand(T nMax=std::numeric_limits< T >::max()) noexcept
Generate a uniform random integer of type T in the range [0..nMax) nMax defaults to std::numeric_limi...