14 #include <boost/multi_index_container.hpp> 15 #include <boost/multi_index/ordered_index.hpp> 18 #include <unordered_map> 55 using SequenceNumber = uint64_t;
62 std::chrono::microseconds m_time;
66 const SequenceNumber m_sequence : 59;
68 const bool m_preferred : 1;
70 const bool m_is_wtxid : 1;
78 State GetState()
const {
return static_cast<State>(m_state); }
81 void SetState(
State state) { m_state =
static_cast<uint8_t
>(state); }
84 bool IsSelected()
const 86 return GetState() == State::CANDIDATE_BEST || GetState() == State::REQUESTED;
90 bool IsWaiting()
const 92 return GetState() == State::REQUESTED || GetState() == State::CANDIDATE_DELAYED;
96 bool IsSelectable()
const 98 return GetState() == State::CANDIDATE_READY || GetState() == State::CANDIDATE_BEST;
102 Announcement(
const GenTxid& gtxid,
NodeId peer,
bool preferred, std::chrono::microseconds reqtime,
103 SequenceNumber sequence) :
104 m_txhash(gtxid.
GetHash()), m_time(reqtime), m_peer(peer), m_sequence(sequence), m_preferred(preferred),
105 m_is_wtxid(gtxid.
IsWtxid()), m_state(static_cast<uint8_t>(State::CANDIDATE_DELAYED)) {}
109 using Priority = uint64_t;
// Functor that turns a (txhash, peer, preferred) triple, or an Announcement, into a Priority.
// NOTE(review): this listing skips the original lines between the first operator()'s
// signature and its return statement, so the computation of `low_bits` is not visible here.
115 class PriorityComputer {
116 const uint64_t m_k0, m_k1;
// Both keys are 0 when `deterministic` is true; otherwise each is drawn from GetRand().
118 explicit PriorityComputer(
bool deterministic) :
119 m_k0{deterministic ? 0 :
GetRand(0xFFFFFFFFFFFFFFFF)},
120 m_k1{deterministic ? 0 :
GetRand(0xFFFFFFFFFFFFFFFF)} {}
// `<<` binds tighter than `|`, so the result is low_bits with `preferred` OR-ed into bit 63.
// Presumably low_bits never has bit 63 set, making preferred announcements rank higher — TODO confirm.
122 Priority operator()(
const uint256& txhash,
NodeId peer,
bool preferred)
const 125 return low_bits | uint64_t{preferred} << 63;
// Convenience overload: forwards the announcement's own txhash, peer and preferred flag.
128 Priority operator()(
const Announcement& ann)
const 130 return operator()(ann.m_txhash, ann.m_peer, ann.m_preferred);
148 using ByPeerView = std::tuple<NodeId, bool, const uint256&>;
149 struct ByPeerViewExtractor
151 using result_type = ByPeerView;
152 result_type operator()(
const Announcement& ann)
const 154 return ByPeerView{ann.m_peer, ann.GetState() == State::CANDIDATE_BEST, ann.m_txhash};
169 using ByTxHashView = std::tuple<const uint256&, State, Priority>;
170 class ByTxHashViewExtractor {
171 const PriorityComputer& m_computer;
173 explicit ByTxHashViewExtractor(
const PriorityComputer& computer) : m_computer(computer) {}
174 using result_type = ByTxHashView;
175 result_type operator()(
const Announcement& ann)
const 177 const Priority prio = (ann.GetState() == State::CANDIDATE_READY) ? m_computer(ann) : 0;
178 return ByTxHashView{ann.m_txhash, ann.GetState(), prio};
191 WaitState GetWaitState(
const Announcement& ann)
193 if (ann.IsWaiting())
return WaitState::FUTURE_EVENT;
194 if (ann.IsSelectable())
return WaitState::PAST_EVENT;
195 return WaitState::NO_EVENT;
208 using ByTimeView = std::pair<WaitState, std::chrono::microseconds>;
209 struct ByTimeViewExtractor
211 using result_type = ByTimeView;
212 result_type operator()(
const Announcement& ann)
const 214 return ByTimeView{GetWaitState(ann), ann.m_time};
219 using Index = boost::multi_index_container<
221 boost::multi_index::indexed_by<
222 boost::multi_index::ordered_unique<boost::multi_index::tag<ByPeer>, ByPeerViewExtractor>,
223 boost::multi_index::ordered_non_unique<boost::multi_index::tag<ByTxHash>, ByTxHashViewExtractor>,
224 boost::multi_index::ordered_non_unique<boost::multi_index::tag<ByTime>, ByTimeViewExtractor>
229 template<
typename Tag>
230 using Iter =
typename Index::index<Tag>::type::iterator;
235 size_t m_completed = 0;
236 size_t m_requested = 0;
243 size_t m_candidate_delayed = 0;
245 size_t m_candidate_ready = 0;
247 size_t m_candidate_best = 0;
249 size_t m_requested = 0;
251 Priority m_priority_candidate_best = std::numeric_limits<Priority>::max();
253 Priority m_priority_best_candidate_ready = std::numeric_limits<Priority>::min();
255 std::vector<NodeId> m_peers;
259 bool operator==(
const PeerInfo& a,
const PeerInfo& b)
261 return std::tie(a.m_total, a.m_completed, a.m_requested) ==
262 std::tie(b.m_total, b.m_completed, b.m_requested);
// Builds a per-peer statistics map by walking every announcement in `index`.
// NOTE(review): this listing omits some original lines of the function (at least one
// line between the map lookup and the counters, and the loop end / return statement) —
// check the full file before relying on this excerpt.
266 std::unordered_map<NodeId, PeerInfo> RecomputePeerInfo(
const Index& index)
268 std::unordered_map<NodeId, PeerInfo> ret;
269 for (
const Announcement& ann : index) {
// operator[] creates a default-constructed PeerInfo the first time a peer is seen.
270 PeerInfo& info = ret[ann.m_peer];
// Each comparison yields a bool, so these add 0 or 1 per announcement.
272 info.m_requested += (ann.GetState() == State::REQUESTED);
273 info.m_completed += (ann.GetState() == State::COMPLETED);
// Builds a per-txhash statistics map by walking every announcement in `index`,
// using `computer` to obtain priorities.
// NOTE(review): closing braces and the return statement are not part of this listing.
279 std::map<uint256, TxHashInfo> ComputeTxHashInfo(
const Index& index,
const PriorityComputer& computer)
281 std::map<uint256, TxHashInfo> ret;
282 for (
const Announcement& ann : index) {
// operator[] creates a default-constructed TxHashInfo the first time a txhash is seen.
283 TxHashInfo& info = ret[ann.m_txhash];
// Each comparison yields a bool, so these count announcements per state (0 or 1 added each).
285 info.m_candidate_delayed += (ann.GetState() == State::CANDIDATE_DELAYED);
286 info.m_candidate_ready += (ann.GetState() == State::CANDIDATE_READY);
287 info.m_candidate_best += (ann.GetState() == State::CANDIDATE_BEST);
288 info.m_requested += (ann.GetState() == State::REQUESTED);
// Record the priority of the CANDIDATE_BEST announcement (overwritten if more than one is seen).
290 if (ann.GetState() == State::CANDIDATE_BEST) {
291 info.m_priority_candidate_best = computer(ann);
// Track the highest priority among CANDIDATE_READY announcements.
293 if (ann.GetState() == State::CANDIDATE_READY) {
294 info.m_priority_best_candidate_ready = std::max(info.m_priority_best_candidate_ready, computer(ann));
// Collect the announcing peer for this txhash.
297 info.m_peers.push_back(ann.m_peer);
304 return {ann.m_is_wtxid, ann.m_txhash};
313 SequenceNumber m_current_sequence{0};
329 assert(m_peerinfo == RecomputePeerInfo(m_index));
332 for (
auto& item : ComputeTxHashInfo(m_index, m_computer)) {
333 TxHashInfo& info = item.second;
336 assert(info.m_candidate_delayed + info.m_candidate_ready + info.m_candidate_best + info.m_requested > 0);
339 assert(info.m_candidate_best + info.m_requested <= 1);
343 if (info.m_candidate_ready > 0) {
344 assert(info.m_candidate_best + info.m_requested == 1);
349 if (info.m_candidate_ready && info.m_candidate_best) {
350 assert(info.m_priority_candidate_best >= info.m_priority_best_candidate_ready);
354 std::sort(info.m_peers.begin(), info.m_peers.end());
355 assert(std::adjacent_find(info.m_peers.begin(), info.m_peers.end()) == info.m_peers.end());
361 for (
const Announcement& ann : m_index) {
362 if (ann.IsWaiting()) {
366 }
else if (ann.IsSelectable()) {
369 assert(ann.m_time <= now);
376 template<
typename Tag>
379 auto peerit = m_peerinfo.find(it->m_peer);
380 peerit->second.m_completed -= it->GetState() == State::COMPLETED;
381 peerit->second.m_requested -= it->GetState() == State::REQUESTED;
382 if (--peerit->second.m_total == 0) m_peerinfo.erase(peerit);
383 return m_index.get<Tag>().
erase(it);
387 template<
typename Tag,
typename Modifier>
390 auto peerit = m_peerinfo.find(it->m_peer);
391 peerit->second.m_completed -= it->GetState() == State::COMPLETED;
392 peerit->second.m_requested -= it->GetState() == State::REQUESTED;
393 m_index.get<Tag>().modify(it, std::move(modifier));
394 peerit->second.m_completed += it->GetState() == State::COMPLETED;
395 peerit->second.m_requested += it->GetState() == State::REQUESTED;
403 assert(it != m_index.get<ByTxHash>().end());
404 assert(it->GetState() == State::CANDIDATE_DELAYED);
406 Modify<ByTxHash>(
it, [](Announcement& ann){ ann.SetState(State::CANDIDATE_READY); });
411 auto it_next = std::next(it);
412 if (it_next == m_index.get<ByTxHash>().end() || it_next->m_txhash != it->m_txhash ||
413 it_next->GetState() == State::COMPLETED) {
416 Modify<ByTxHash>(
it, [](Announcement& ann){ ann.SetState(State::CANDIDATE_BEST); });
417 }
else if (it_next->GetState() == State::CANDIDATE_BEST) {
418 Priority priority_old = m_computer(*it_next);
419 Priority priority_new = m_computer(*it);
420 if (priority_new > priority_old) {
422 Modify<ByTxHash>(it_next, [](Announcement& ann){ ann.SetState(State::CANDIDATE_READY); });
423 Modify<ByTxHash>(
it, [](Announcement& ann){ ann.SetState(State::CANDIDATE_BEST); });
432 assert(new_state == State::COMPLETED || new_state == State::CANDIDATE_DELAYED);
433 assert(it != m_index.get<ByTxHash>().end());
434 if (it->IsSelected() && it != m_index.get<ByTxHash>().begin()) {
435 auto it_prev = std::prev(it);
438 if (it_prev->m_txhash == it->m_txhash && it_prev->GetState() == State::CANDIDATE_READY) {
440 Modify<ByTxHash>(it_prev, [](Announcement& ann){ ann.SetState(State::CANDIDATE_BEST); });
443 Modify<ByTxHash>(
it, [new_state](Announcement& ann){ ann.SetState(new_state); });
449 assert(it != m_index.get<ByTxHash>().end());
450 assert(it->GetState() != State::COMPLETED);
454 if (it != m_index.get<ByTxHash>().begin() && std::prev(it)->m_txhash == it->m_txhash)
return false;
457 if (std::next(it) != m_index.get<ByTxHash>().end() && std::next(it)->m_txhash == it->m_txhash &&
458 std::next(it)->GetState() != State::COMPLETED)
return false;
468 assert(it != m_index.get<ByTxHash>().end());
471 if (it->GetState() == State::COMPLETED)
return true;
473 if (IsOnlyNonCompleted(it)) {
477 it = Erase<ByTxHash>(
it);
478 }
while (it != m_index.get<ByTxHash>().end() && it->m_txhash == txhash);
484 ChangeAndReselect(it, State::COMPLETED);
// Brings announcement states in line with the time point `now`.
// `expired` may be null; when non-null it is cleared first and then receives a
// (peer, GenTxid) pair for every REQUESTED announcement whose m_time is <= now.
// NOTE(review): the loop-exit branches and closing braces are not part of this listing.
493 void SetTimePoint(std::chrono::microseconds
now, std::vector<std::pair<NodeId, GenTxid>>* expired)
495 if (expired) expired->clear();
// First pass: repeatedly inspect the first entry in ByTime order.
499 while (!m_index.empty()) {
500 auto it = m_index.get<ByTime>().begin();
// A CANDIDATE_DELAYED entry whose time has been reached is promoted.
501 if (
it->GetState() == State::CANDIDATE_DELAYED &&
it->m_time <=
now) {
502 PromoteCandidateReady(m_index.project<ByTxHash>(
it));
503 }
// A REQUESTED entry whose time has been reached is reported (if requested) and completed.
else if (
it->GetState() == State::REQUESTED &&
it->m_time <=
now) {
504 if (expired) expired->emplace_back(
it->m_peer,
ToGenTxid(*
it));
505 MakeCompleted(m_index.project<ByTxHash>(
it));
// Second pass: repeatedly inspect the last entry in ByTime order.
511 while (!m_index.empty()) {
515 auto it = std::prev(m_index.get<ByTime>().end());
// A selectable entry whose time lies after `now` is moved back to CANDIDATE_DELAYED.
516 if (
it->IsSelectable() &&
it->m_time >
now) {
517 ChangeAndReselect(m_index.project<ByTxHash>(
it), State::CANDIDATE_DELAYED);
525 explicit Impl(
bool deterministic) :
526 m_computer(deterministic),
528 m_index(
boost::make_tuple(
529 boost::make_tuple(ByPeerViewExtractor(),
std::less<ByPeerView>()),
530 boost::make_tuple(ByTxHashViewExtractor(m_computer),
std::less<ByTxHashView>()),
531 boost::make_tuple(ByTimeViewExtractor(),
std::less<ByTimeView>())
536 Impl& operator=(
const Impl&) =
delete;
540 auto& index = m_index.get<ByPeer>();
542 while (it != index.end() && it->m_peer == peer) {
556 auto it_next = (std::next(it) == index.end() || std::next(it)->m_peer != peer) ? index.end() :
561 if (MakeCompleted(m_index.project<ByTxHash>(it))) {
571 auto it = m_index.get<ByTxHash>().lower_bound(ByTxHashView{txhash, State::CANDIDATE_DELAYED, 0});
572 while (
it != m_index.get<ByTxHash>().end() &&
it->m_txhash == txhash) {
573 it = Erase<ByTxHash>(
it);
578 std::chrono::microseconds reqtime)
583 if (m_index.get<ByPeer>().count(ByPeerView{peer, true, gtxid.GetHash()}))
return;
588 auto ret = m_index.get<ByPeer>().emplace(gtxid, peer, preferred, reqtime, m_current_sequence);
589 if (!ret.second)
return;
592 ++m_peerinfo[peer].m_total;
593 ++m_current_sequence;
598 std::vector<std::pair<NodeId, GenTxid>>* expired)
601 SetTimePoint(now, expired);
604 std::vector<const Announcement*> selected;
605 auto it_peer = m_index.get<ByPeer>().lower_bound(ByPeerView{peer,
true,
uint256::ZERO});
606 while (it_peer != m_index.get<ByPeer>().end() && it_peer->m_peer == peer &&
607 it_peer->GetState() == State::CANDIDATE_BEST) {
608 selected.emplace_back(&*it_peer);
613 std::sort(selected.begin(), selected.end(), [](
const Announcement* a,
const Announcement* b) {
614 return a->m_sequence < b->m_sequence;
618 std::vector<GenTxid> ret;
619 ret.reserve(selected.size());
620 std::transform(selected.begin(), selected.end(), std::back_inserter(ret), [](
const Announcement* ann) {
628 auto it = m_index.get<ByPeer>().find(ByPeerView{peer,
true, txhash});
629 if (
it == m_index.get<ByPeer>().end()) {
635 it = m_index.get<ByPeer>().find(ByPeerView{peer,
false, txhash});
636 if (
it == m_index.get<ByPeer>().end() || (
it->GetState() != State::CANDIDATE_DELAYED &&
637 it->GetState() != State::CANDIDATE_READY)) {
647 auto it_old = m_index.get<ByTxHash>().lower_bound(ByTxHashView{txhash, State::CANDIDATE_BEST, 0});
648 if (it_old != m_index.get<ByTxHash>().end() && it_old->m_txhash == txhash) {
649 if (it_old->GetState() == State::CANDIDATE_BEST) {
656 Modify<ByTxHash>(it_old, [](Announcement& ann) { ann.SetState(State::CANDIDATE_READY); });
657 }
else if (it_old->GetState() == State::REQUESTED) {
660 Modify<ByTxHash>(it_old, [](Announcement& ann) { ann.SetState(State::COMPLETED); });
665 Modify<ByPeer>(
it, [expiry](Announcement& ann) {
666 ann.SetState(State::REQUESTED);
674 auto it = m_index.get<ByPeer>().find(ByPeerView{peer,
false, txhash});
675 if (
it == m_index.get<ByPeer>().end()) {
676 it = m_index.get<ByPeer>().find(ByPeerView{peer,
true, txhash});
678 if (
it != m_index.get<ByPeer>().end()) MakeCompleted(m_index.project<ByTxHash>(
it));
683 auto it = m_peerinfo.find(peer);
684 if (
it != m_peerinfo.end())
return it->second.m_requested;
690 auto it = m_peerinfo.find(peer);
691 if (
it != m_peerinfo.end())
return it->second.m_total -
it->second.m_requested -
it->second.m_completed;
697 auto it = m_peerinfo.find(peer);
698 if (
it != m_peerinfo.end())
return it->second.m_total;
703 size_t Size()
const {
return m_index.size(); }
708 return uint64_t{m_computer(txhash, peer, preferred)};
714 m_impl{MakeUnique<TxRequestTracker::Impl>(deterministic)} {}
728 m_impl->PostGetRequestableSanityCheck(now);
732 std::chrono::microseconds reqtime)
734 m_impl->ReceivedInv(peer, gtxid, preferred, reqtime);
739 m_impl->RequestedTx(peer, txhash, expiry);
744 m_impl->ReceivedResponse(peer, txhash);
748 std::vector<std::pair<NodeId, GenTxid>>* expired)
750 return m_impl->GetRequestable(peer, now, expired);
755 return m_impl->ComputePriority(txhash, peer, preferred);
void ReceivedResponse(NodeId peer, const uint256 &txhash)
Converts a CANDIDATE or REQUESTED announcement to a COMPLETED one.
peer.m_getdata_requests.erase(peer.m_getdata_requests.begin(), it)
uint64_t GetRand(uint64_t nMax) noexcept
Generate a uniform random integer in the range [0..range).
std::deque< CInv >::iterator it
void ReceivedResponse(NodeId peer, const uint256 &txhash)
const std::chrono::seconds now
size_t Count(NodeId peer) const
Count how many announcements a peer has (REQUESTED, CANDIDATE, and COMPLETED combined).
CSipHasher & Write(uint64_t data)
Hash a 64-bit integer worth of data. It is treated as if this was the little-endian interpretation of ...
bool operator==(const CNetAddr &a, const CNetAddr &b)
std::vector< GenTxid > GetRequestable(NodeId peer, std::chrono::microseconds now, std::vector< std::pair< NodeId, GenTxid >> *expired=nullptr)
Find the txids to request now from peer.
void ReceivedInv(NodeId peer, const GenTxid &gtxid, bool preferred, std::chrono::microseconds reqtime)
Adds a new CANDIDATE announcement.
size_t CountInFlight(NodeId peer) const
Count how many REQUESTED announcements a peer has.
bool MakeCompleted(Iter< ByTxHash > it)
Convert any announcement to a COMPLETED one.
void SetTimePoint(std::chrono::microseconds now, std::vector< std::pair< NodeId, GenTxid >> *expired)
Make the data structure consistent with a given point in time:
std::vector< GenTxid > GetRequestable(NodeId peer, std::chrono::microseconds now, std::vector< std::pair< NodeId, GenTxid >> *expired)
Find the GenTxids to request now from peer.
bool IsOnlyNonCompleted(Iter< ByTxHash > it)
Check if 'it' is the only announcement for a given txhash that isn't COMPLETED.
void ReceivedInv(NodeId peer, const GenTxid &gtxid, bool preferred, std::chrono::microseconds reqtime)
const uint256 & GetHash() const
const PriorityComputer m_computer
This tracker's priority computer.
size_t Count(NodeId peer) const
State
The various states a (txhash,peer) pair can be in.
void Modify(Iter< Tag > it, Modifier modifier)
Wrapper around Index::...::modify that keeps m_peerinfo up to date.
size_t Size() const
Count how many announcements are being tracked in total across all peers and transaction hashes...
void DisconnectedPeer(NodeId peer)
size_t CountInFlight(NodeId peer) const
void DisconnectedPeer(NodeId peer)
Deletes all announcements for a given peer.
GenTxid ToGenTxid(const CInv &inv)
Convert a TX/WITNESS_TX/WTX CInv to a GenTxid.
void SanityCheck() const
Run internal consistency check (testing only).
size_t CountCandidates(NodeId peer) const
TxRequestTracker(bool deterministic=false)
Construct a TxRequestTracker.
static const uint256 ZERO
size_t Size() const
Count how many announcements are being tracked in total across all peers and transactions.
uint64_t Finalize() const
Compute the 64-bit SipHash-2-4 of the data written so far.
void RequestedTx(NodeId peer, const uint256 &txhash, std::chrono::microseconds expiry)
Marks a transaction as requested, with a specified expiry.
unsigned int size() const
Index m_index
This tracker's main data structure. See SanityCheck() for the invariants that apply to it...
void PostGetRequestableSanityCheck(std::chrono::microseconds now) const
void PromoteCandidateReady(Iter< ByTxHash > it)
Convert a CANDIDATE_DELAYED announcement into a CANDIDATE_READY.
std::unordered_map< NodeId, PeerInfo > m_peerinfo
Map with this tracker's per-peer statistics.
void PostGetRequestableSanityCheck(std::chrono::microseconds now) const
Run a time-dependent internal consistency check (testing only).
void ChangeAndReselect(Iter< ByTxHash > it, State new_state)
Change the state of an announcement to something non-IsSelected().
size_t CountCandidates(NodeId peer) const
Count how many CANDIDATE announcements a peer has.
uint64_t ComputePriority(const uint256 &txhash, NodeId peer, bool preferred) const
Access to the internal priority computation (testing only)
void RequestedTx(NodeId peer, const uint256 &txhash, std::chrono::microseconds expiry)
void ForgetTxHash(const uint256 &txhash)
uint64_t ComputePriority(const uint256 &txhash, NodeId peer, bool preferred) const
const std::unique_ptr< Impl > m_impl
A generic txid reference (txid or wtxid).
Actual implementation for TxRequestTracker's data structure.
void ForgetTxHash(const uint256 &txhash)
Deletes all announcements for a given txhash (both txid and wtxid ones).
Iter< Tag > Erase(Iter< Tag > it)
Wrapper around Index::...erase that keeps m_peerinfo up to date.