diff --git a/src/node/txdownloadman_impl.cpp b/src/node/txdownloadman_impl.cpp index fe961048d34..8933e88a87f 100644 --- a/src/node/txdownloadman_impl.cpp +++ b/src/node/txdownloadman_impl.cpp @@ -179,23 +179,21 @@ bool TxDownloadManagerImpl::AddTxAnnouncement(NodeId peer, const GenTxid& gtxid, // - exists in orphanage // - peer can be an orphan resolution candidate if (gtxid.IsWtxid()) { - if (auto orphan_tx{m_orphanage.GetTx(Wtxid::FromUint256(gtxid.GetHash()))}) { + const auto wtxid{Wtxid::FromUint256(gtxid.GetHash())}; + if (auto orphan_tx{m_orphanage.GetTx(wtxid)}) { auto unique_parents{GetUniqueParents(*orphan_tx)}; std::erase_if(unique_parents, [&](const auto& txid){ return AlreadyHaveTx(GenTxid::Txid(txid), /*include_reconsiderable=*/false); }); - if (unique_parents.empty()) return true; - - if (auto delay{OrphanResolutionCandidate(peer, Wtxid::FromUint256(gtxid.GetHash()), unique_parents.size())}) { - m_orphanage.AddAnnouncer(Wtxid::FromUint256(gtxid.GetHash()), peer); - - const auto& info = m_peer_info.at(peer).m_connection_info; - for (const auto& parent_txid : unique_parents) { - m_txrequest.ReceivedInv(peer, GenTxid::Txid(parent_txid), info.m_preferred, now + *delay); - } + // The missing parents may have all been rejected or accepted since the orphan was added to the orphanage. + // Do not delete from the orphanage, as it may be queued for processing. + if (unique_parents.empty()) { + return true; + } - LogDebug(BCLog::TXPACKAGES, "added peer=%d as a candidate for resolving orphan %s\n", peer, gtxid.GetHash().ToString()); + if (MaybeAddOrphanResolutionCandidate(unique_parents, wtxid, peer, now)) { + m_orphanage.AddAnnouncer(orphan_tx->GetWitnessHash(), peer); } // Return even if the peer isn't an orphan resolution candidate. This would be caught by AlreadyHaveTx. @@ -231,13 +229,15 @@ bool TxDownloadManagerImpl::AddTxAnnouncement(NodeId peer, const GenTxid& gtxid, return false; } -std::optional TxDownloadManagerImpl::OrphanResolutionCandidate(NodeId nodeid, const Wtxid& orphan_wtxid, size_t num_parents) +bool TxDownloadManagerImpl::MaybeAddOrphanResolutionCandidate(const std::vector& unique_parents, const Wtxid& wtxid, NodeId nodeid, std::chrono::microseconds now) { - if (m_peer_info.count(nodeid) == 0) return std::nullopt; - if (m_orphanage.HaveTxFromPeer(orphan_wtxid, nodeid)) return std::nullopt; + auto it_peer = m_peer_info.find(nodeid); + if (it_peer == m_peer_info.end()) return false; + if (m_orphanage.HaveTxFromPeer(wtxid, nodeid)) return false; const auto& peer_entry = m_peer_info.at(nodeid); const auto& info = peer_entry.m_connection_info; + // TODO: add delays and limits based on the amount of orphan resolution we are already doing // with this peer, how much they are using the orphanage, etc. if (!info.m_relay_permissions) { @@ -245,7 +245,7 @@ std::optional TxDownloadManagerImpl::OrphanResolutionCandi // existing behavior: drop if we are tracking too many invs for this peer already. Each // orphan resolution involves at least 1 transaction request which may or may not be // currently tracked in m_txrequest, so we include that in the count. - if (m_txrequest.Count(nodeid) + num_parents > MAX_PEER_TX_ANNOUNCEMENTS) return std::nullopt; + if (m_txrequest.Count(nodeid) + unique_parents.size() > MAX_PEER_TX_ANNOUNCEMENTS) return false; } std::chrono::seconds delay{0s}; @@ -258,7 +258,13 @@ std::optional TxDownloadManagerImpl::OrphanResolutionCandi const bool overloaded = !info.m_relay_permissions && m_txrequest.CountInFlight(nodeid) >= MAX_PEER_TX_REQUEST_IN_FLIGHT; if (overloaded) delay += OVERLOADED_PEER_TX_DELAY; - return delay; + // Treat finding orphan resolution candidate as equivalent to the peer announcing all missing parents. + // In the future, orphan resolution may include more explicit steps + for (const auto& parent_txid : unique_parents) { + m_txrequest.ReceivedInv(nodeid, GenTxid::Txid(parent_txid), info.m_preferred, now + delay); + } + LogDebug(BCLog::TXPACKAGES, "added peer=%d as a candidate for resolving orphan %s\n", nodeid, wtxid.ToString()); + return true; } std::vector TxDownloadManagerImpl::GetRequestsToSend(NodeId nodeid, std::chrono::microseconds current_time) @@ -327,7 +333,7 @@ void TxDownloadManagerImpl::MempoolAcceptedTx(const CTransactionRef& tx) m_txrequest.ForgetTxHash(tx->GetHash()); m_txrequest.ForgetTxHash(tx->GetWitnessHash()); - m_orphanage.AddChildrenToWorkSet(*tx); + m_orphanage.AddChildrenToWorkSet(*tx, m_opts.m_rng); // If it came from the orphanage, remove it. No-op if the tx is not in txorphanage. m_orphanage.EraseTx(tx->GetWitnessHash()); } @@ -400,27 +406,19 @@ node::RejectedTxTodo TxDownloadManagerImpl::MempoolRejectedTx(const CTransaction // means it was already added to vExtraTxnForCompact. add_extra_compact_tx &= !m_orphanage.HaveTx(wtxid); - auto add_orphan_reso_candidate = [&](const CTransactionRef& orphan_tx, const std::vector& unique_parents, NodeId nodeid, std::chrono::microseconds now) { - const auto& wtxid = orphan_tx->GetWitnessHash(); - if (auto delay{OrphanResolutionCandidate(nodeid, wtxid, unique_parents.size())}) { - const auto& info = m_peer_info.at(nodeid).m_connection_info; - m_orphanage.AddTx(orphan_tx, nodeid); - - // Treat finding orphan resolution candidate as equivalent to the peer announcing all missing parents - // In the future, orphan resolution may include more explicit steps - for (const auto& parent_txid : unique_parents) { - m_txrequest.ReceivedInv(nodeid, GenTxid::Txid(parent_txid), info.m_preferred, now + *delay); - } - LogDebug(BCLog::TXPACKAGES, "added peer=%d as a candidate for resolving orphan %s\n", nodeid, wtxid.ToString()); - } - }; - // If there is no candidate for orphan resolution, AddTx will not be called. This means // that if a peer is overloading us with invs and orphans, they will eventually not be // able to add any more transactions to the orphanage. - add_orphan_reso_candidate(ptx, unique_parents, nodeid, now); - for (const auto& candidate : m_txrequest.GetCandidatePeers(ptx)) { - add_orphan_reso_candidate(ptx, unique_parents, candidate, now); + // + // Search by txid and, if the tx has a witness, wtxid + std::vector orphan_resolution_candidates{nodeid}; + m_txrequest.GetCandidatePeers(ptx->GetHash().ToUint256(), orphan_resolution_candidates); + if (ptx->HasWitness()) m_txrequest.GetCandidatePeers(ptx->GetWitnessHash().ToUint256(), orphan_resolution_candidates); + + for (const auto& nodeid : orphan_resolution_candidates) { + if (MaybeAddOrphanResolutionCandidate(unique_parents, ptx->GetWitnessHash(), nodeid, now)) { + m_orphanage.AddTx(ptx, nodeid); + } } // Once added to the orphan pool, a tx is considered AlreadyHave, and we shouldn't request it anymore. diff --git a/src/node/txdownloadman_impl.h b/src/node/txdownloadman_impl.h index ab563b22415..d9688d0c8d8 100644 --- a/src/node/txdownloadman_impl.h +++ b/src/node/txdownloadman_impl.h @@ -194,11 +194,11 @@ class TxDownloadManagerImpl { /** Helper for getting deduplicated vector of Txids in vin. */ std::vector GetUniqueParents(const CTransaction& tx); - /** Determine candidacy (and delay) for potential orphan resolution candidate. - * @returns delay for orphan resolution if this peer is a good candidate for orphan resolution, - * std::nullopt if this peer cannot be added because it has reached download/orphanage limits. + /** If this peer is an orphan resolution candidate for this transaction, treat the unique_parents as announced by + * this peer; add them as new invs to m_txrequest. + * @returns whether this transaction was a valid orphan resolution candidate. * */ - std::optional OrphanResolutionCandidate(NodeId nodeid, const Wtxid& orphan_wtxid, size_t num_parents); + bool MaybeAddOrphanResolutionCandidate(const std::vector& unique_parents, const Wtxid& wtxid, NodeId nodeid, std::chrono::microseconds now); }; } // namespace node #endif // BITCOIN_NODE_TXDOWNLOADMAN_IMPL_H diff --git a/src/test/fuzz/txorphan.cpp b/src/test/fuzz/txorphan.cpp index 61c2308a296..3a025f62796 100644 --- a/src/test/fuzz/txorphan.cpp +++ b/src/test/fuzz/txorphan.cpp @@ -33,7 +33,7 @@ void initialize_orphanage() FUZZ_TARGET(txorphan, .init = initialize_orphanage) { FuzzedDataProvider fuzzed_data_provider(buffer.data(), buffer.size()); - FastRandomContext limit_orphans_rng{/*fDeterministic=*/true}; + FastRandomContext orphanage_rng{/*fDeterministic=*/true}; SetMockTime(ConsumeTime(fuzzed_data_provider)); TxOrphanage orphanage; @@ -79,7 +79,7 @@ FUZZ_TARGET(txorphan, .init = initialize_orphanage) // previous loop and potentially the parent of this tx. if (ptx_potential_parent) { // Set up future GetTxToReconsider call. - orphanage.AddChildrenToWorkSet(*ptx_potential_parent); + orphanage.AddChildrenToWorkSet(*ptx_potential_parent, orphanage_rng); // Check that all txns returned from GetChildrenFrom* are indeed a direct child of this tx. NodeId peer_id = fuzzed_data_provider.ConsumeIntegral(); @@ -154,7 +154,7 @@ FUZZ_TARGET(txorphan, .init = initialize_orphanage) // test mocktime and expiry SetMockTime(ConsumeTime(fuzzed_data_provider)); auto limit = fuzzed_data_provider.ConsumeIntegral(); - orphanage.LimitOrphans(limit, limit_orphans_rng); + orphanage.LimitOrphans(limit, orphanage_rng); Assert(orphanage.Size() <= limit); }); diff --git a/src/test/fuzz/txrequest.cpp b/src/test/fuzz/txrequest.cpp index 74d20f86e04..931ddf03288 100644 --- a/src/test/fuzz/txrequest.cpp +++ b/src/test/fuzz/txrequest.cpp @@ -295,6 +295,19 @@ class Tester tracked += m_announcements[txhash][peer].m_state != State::NOTHING; inflight += m_announcements[txhash][peer].m_state == State::REQUESTED; candidates += m_announcements[txhash][peer].m_state == State::CANDIDATE; + + std::bitset expected_announcers; + for (int peer = 0; peer < MAX_PEERS; ++peer) { + if (m_announcements[txhash][peer].m_state == State::CANDIDATE || m_announcements[txhash][peer].m_state == State::REQUESTED) { + expected_announcers[peer] = true; + } + } + std::vector candidate_peers; + m_tracker.GetCandidatePeers(TXHASHES[txhash], candidate_peers); + assert(expected_announcers.count() == candidate_peers.size()); + for (const auto& peer : candidate_peers) { + assert(expected_announcers[peer]); + } } assert(m_tracker.Count(peer) == tracked); assert(m_tracker.CountInFlight(peer) == inflight); diff --git a/src/test/orphanage_tests.cpp b/src/test/orphanage_tests.cpp index f30d4b402f8..9beb24f7dff 100644 --- a/src/test/orphanage_tests.cpp +++ b/src/test/orphanage_tests.cpp @@ -532,19 +532,27 @@ BOOST_AUTO_TEST_CASE(peer_worksets) BOOST_CHECK(orphanage.HaveTxFromPeer(orphan_wtxid, node)); } - // Parent accepted: add child to all 3 worksets. - orphanage.AddChildrenToWorkSet(*tx_missing_parent); - BOOST_CHECK_EQUAL(orphanage.GetTxToReconsider(node0), tx_orphan); - BOOST_CHECK_EQUAL(orphanage.GetTxToReconsider(node1), tx_orphan); - // Don't call GetTxToReconsider(node2) yet because it mutates the workset. + // Parent accepted: child is added to 1 of 3 worksets. + orphanage.AddChildrenToWorkSet(*tx_missing_parent, det_rand); + int node0_reconsider = orphanage.HaveTxToReconsider(node0); + int node1_reconsider = orphanage.HaveTxToReconsider(node1); + int node2_reconsider = orphanage.HaveTxToReconsider(node2); + BOOST_CHECK_EQUAL(node0_reconsider + node1_reconsider + node2_reconsider, 1); + + NodeId assigned_peer; + if (node0_reconsider) { + assigned_peer = node0; + } else if (node1_reconsider) { + assigned_peer = node1; + } else { + BOOST_CHECK(node2_reconsider); + assigned_peer = node2; + } // EraseForPeer also removes that tx from the workset. - orphanage.EraseForPeer(node0); + orphanage.EraseForPeer(assigned_peer); BOOST_CHECK_EQUAL(orphanage.GetTxToReconsider(node0), nullptr); - // However, the other peers' worksets are not touched. - BOOST_CHECK_EQUAL(orphanage.GetTxToReconsider(node2), tx_orphan); - // Delete this tx, clearing the orphanage. BOOST_CHECK_EQUAL(orphanage.EraseTx(orphan_wtxid), 1); BOOST_CHECK_EQUAL(orphanage.Size(), 0); diff --git a/src/txorphanage.cpp b/src/txorphanage.cpp index 07f7fa021fd..06b6ab4af20 100644 --- a/src/txorphanage.cpp +++ b/src/txorphanage.cpp @@ -152,7 +152,7 @@ void TxOrphanage::LimitOrphans(unsigned int max_orphans, FastRandomContext& rng) if (nEvicted > 0) LogDebug(BCLog::TXPACKAGES, "orphanage overflow, removed %u tx\n", nEvicted); } -void TxOrphanage::AddChildrenToWorkSet(const CTransaction& tx) +void TxOrphanage::AddChildrenToWorkSet(const CTransaction& tx, FastRandomContext& rng) { for (unsigned int i = 0; i < tx.vout.size(); i++) { const auto it_by_prev = m_outpoint_to_orphan_it.find(COutPoint(tx.GetHash(), i)); @@ -160,15 +160,21 @@ void TxOrphanage::AddChildrenToWorkSet(const CTransaction& tx) for (const auto& elem : it_by_prev->second) { // Belt and suspenders, each orphan should always have at least 1 announcer. if (!Assume(!elem->second.announcers.empty())) continue; - for (const auto announcer: elem->second.announcers) { - // Get this source peer's work set, emplacing an empty set if it didn't exist - // (note: if this peer wasn't still connected, we would have removed the orphan tx already) - std::set& orphan_work_set = m_peer_work_set.try_emplace(announcer).first->second; - // Add this tx to the work set - orphan_work_set.insert(elem->first); - LogDebug(BCLog::TXPACKAGES, "added %s (wtxid=%s) to peer %d workset\n", - tx.GetHash().ToString(), tx.GetWitnessHash().ToString(), announcer); - } + + // Select a random peer to assign orphan processing, reducing wasted work if the orphan is still missing + // inputs. However, we don't want to create an issue in which the assigned peer can purposefully stop us + // from processing the orphan by disconnecting. + auto announcer_iter = std::begin(elem->second.announcers); + std::advance(announcer_iter, rng.randrange(elem->second.announcers.size())); + auto announcer = *(announcer_iter); + + // Get this source peer's work set, emplacing an empty set if it didn't exist + // (note: if this peer wasn't still connected, we would have removed the orphan tx already) + std::set& orphan_work_set = m_peer_work_set.try_emplace(announcer).first->second; + // Add this tx to the work set + orphan_work_set.insert(elem->first); + LogDebug(BCLog::TXPACKAGES, "added %s (wtxid=%s) to peer %d workset\n", + tx.GetHash().ToString(), tx.GetWitnessHash().ToString(), announcer); } } } diff --git a/src/txorphanage.h b/src/txorphanage.h index 868741e7890..4205da01994 100644 --- a/src/txorphanage.h +++ b/src/txorphanage.h @@ -62,7 +62,7 @@ class TxOrphanage { void LimitOrphans(unsigned int max_orphans, FastRandomContext& rng); /** Add any orphans that list a particular tx as a parent into the from peer's work set */ - void AddChildrenToWorkSet(const CTransaction& tx); + void AddChildrenToWorkSet(const CTransaction& tx, FastRandomContext& rng); /** Does this peer have any work to do? */ bool HaveTxToReconsider(NodeId peer); diff --git a/src/txrequest.cpp b/src/txrequest.cpp index ca68a998680..5909146427b 100644 --- a/src/txrequest.cpp +++ b/src/txrequest.cpp @@ -574,21 +574,13 @@ class TxRequestTracker::Impl { } } - std::vector GetCandidatePeers(const CTransactionRef& tx) const + void GetCandidatePeers(const uint256& txhash, std::vector& result_peers) const { - // Search by txid and, if the tx has a witness, wtxid - std::vector hashes{tx->GetHash().ToUint256()}; - if (tx->HasWitness()) hashes.emplace_back(tx->GetWitnessHash().ToUint256()); - - std::vector result_peers; - for (const uint256& txhash : hashes) { - auto it = m_index.get().lower_bound(ByTxHashView{txhash, State::CANDIDATE_DELAYED, 0}); - while (it != m_index.get().end() && it->m_txhash == txhash && it->GetState() != State::COMPLETED) { - result_peers.push_back(it->m_peer); - ++it; - } + auto it = m_index.get().lower_bound(ByTxHashView{txhash, State::CANDIDATE_DELAYED, 0}); + while (it != m_index.get().end() && it->m_txhash == txhash && it->GetState() != State::COMPLETED) { + result_peers.push_back(it->m_peer); + ++it; } - return result_peers; } void ReceivedInv(NodeId peer, const GenTxid& gtxid, bool preferred, @@ -738,7 +730,7 @@ size_t TxRequestTracker::CountInFlight(NodeId peer) const { return m_impl->Count size_t TxRequestTracker::CountCandidates(NodeId peer) const { return m_impl->CountCandidates(peer); } size_t TxRequestTracker::Count(NodeId peer) const { return m_impl->Count(peer); } size_t TxRequestTracker::Size() const { return m_impl->Size(); } -std::vector TxRequestTracker::GetCandidatePeers(const CTransactionRef& tx) const { return m_impl->GetCandidatePeers(tx); } +void TxRequestTracker::GetCandidatePeers(const uint256& txhash, std::vector& result_peers) const { return m_impl->GetCandidatePeers(txhash, result_peers); } void TxRequestTracker::SanityCheck() const { m_impl->SanityCheck(); } void TxRequestTracker::PostGetRequestableSanityCheck(std::chrono::microseconds now) const diff --git a/src/txrequest.h b/src/txrequest.h index 95a1e9e7f6b..fe793210935 100644 --- a/src/txrequest.h +++ b/src/txrequest.h @@ -195,8 +195,9 @@ class TxRequestTracker { /** Count how many announcements are being tracked in total across all peers and transaction hashes. */ size_t Size() const; - /** For some tx return all peers with non-COMPLETED announcements for its txid or wtxid. The resulting vector may contain duplicate NodeIds. */ - std::vector GetCandidatePeers(const CTransactionRef& tx) const; + /** For some txhash (txid or wtxid), finds all peers with non-COMPLETED announcements and appends them to + * result_peers. Does not try to ensure that result_peers contains no duplicates. */ + void GetCandidatePeers(const uint256& txhash, std::vector& result_peers) const; /** Access to the internal priority computation (testing only) */ uint64_t ComputePriority(const uint256& txhash, NodeId peer, bool preferred) const; diff --git a/test/functional/p2p_orphan_handling.py b/test/functional/p2p_orphan_handling.py index dc58a220682..0805764a8ee 100755 --- a/test/functional/p2p_orphan_handling.py +++ b/test/functional/p2p_orphan_handling.py @@ -66,8 +66,8 @@ def wrapper(self): class PeerTxRelayer(P2PTxInvStore): """A P2PTxInvStore that also remembers all of the getdata and tx messages it receives.""" - def __init__(self): - super().__init__() + def __init__(self, wtxidrelay=True): + super().__init__(wtxidrelay=wtxidrelay) self._tx_received = [] self._getdata_received = [] @@ -402,7 +402,7 @@ def test_orphan_inherit_rejection(self): node = self.nodes[0] peer1 = node.add_p2p_connection(PeerTxRelayer()) peer2 = node.add_p2p_connection(PeerTxRelayer()) - peer3 = node.add_p2p_connection(PeerTxRelayer()) + peer3 = node.add_p2p_connection(PeerTxRelayer(wtxidrelay=False)) self.log.info("Test that an orphan with rejected parents, along with any descendants, cannot be retried with an alternate witness") parent_low_fee_nonsegwit = self.wallet_nonsegwit.create_self_transfer(fee_rate=0) @@ -776,16 +776,18 @@ def test_parents_change(self): assert tx_replacer_BC["txid"] in node.getrawmempool() node.sendrawtransaction(tx_replacer_C["hex"]) assert tx_replacer_BC["txid"] not in node.getrawmempool() + assert parent_peekaboo_AB["txid"] not in node.getrawmempool() assert tx_replacer_C["txid"] in node.getrawmempool() - # Second peer is an additional announcer for this orphan + # Second peer is an additional announcer for this orphan, but its missing parents are different from when it was + # previously announced. peer2 = node.add_p2p_connection(PeerTxRelayer()) peer2.send_and_ping(msg_inv([orphan_inv])) assert_equal(len(node.getorphantxs(verbosity=2)[0]["from"]), 2) # Disconnect peer1. peer2 should become the new candidate for orphan resolution. peer1.peer_disconnect() - node.bumpmocktime(NONPREF_PEER_TX_DELAY + TXID_RELAY_DELAY) + node.bumpmocktime(TXREQUEST_TIME_SKIP) self.wait_until(lambda: len(node.getorphantxs(verbosity=2)[0]["from"]) == 1) # Both parents should be requested, now that they are both missing. peer2.wait_for_parent_requests([int(parent_peekaboo_AB["txid"], 16), int(parent_missing["txid"], 16)]) diff --git a/test/functional/test_framework/p2p.py b/test/functional/test_framework/p2p.py index 523e1bd0682..207d19137b1 100755 --- a/test/functional/test_framework/p2p.py +++ b/test/functional/test_framework/p2p.py @@ -928,8 +928,8 @@ def send_txs_and_test(self, txs, node, *, success=True, expect_disconnect=False, class P2PTxInvStore(P2PInterface): """A P2PInterface which stores a count of how many times each txid has been announced.""" - def __init__(self): - super().__init__() + def __init__(self, **kwargs): + super().__init__(**kwargs) self.tx_invs_received = defaultdict(int) def on_inv(self, message):