Skip to content

Commit

Permalink
Limit max length of knownTransactions. #19
Browse files Browse the repository at this point in the history
  • Loading branch information
dcaoyuan committed Sep 8, 2019
1 parent 621ee01 commit 8573025
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 15 deletions.
22 changes: 14 additions & 8 deletions khipu-eth/src/main/scala/khipu/network/PeerEntity.scala
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,8 @@ class PeerEntity(peer: Peer) extends Actor with Timers with ActorLogging {
/**
* stores information which tx hashes are "known" by which peers Seq[peerId]
*/
private val knownTransactions = mutable.HashSet[Hash]()
private val MAX_KNOWN_TRANSACTIONS = 20000
private var knownTransactions = mutable.LinkedHashMap[Hash, Long]()

val mediator = DistributedPubSub(context.system).mediator
mediator ! Subscribe(sync.TxTopic, self)
Expand Down Expand Up @@ -120,22 +121,23 @@ class PeerEntity(peer: Peer) extends Actor with Timers with ActorLogging {
case BroadcastNewBlocks(newBlocks) => // from khipu.NewBlockTopic
for {
pi <- peerInfo
newBlock <- newBlocks if (shouldSendNewBlock(newBlock, pi))
newBlock <- newBlocks if shouldSendNewBlock(newBlock, pi)
} {
self ! PeerEntity.MessageToPeer(peer.id, newBlock)
}

case BroadcastTransactions(transactions) => // from khipu.TxTopic
val transactionsNonKnown = transactions.filterNot(isTxKnown)
val transactionsNonKnown = transactions.filterNot(x => knownTransactions.contains(x.hash))
if (transactionsNonKnown.nonEmpty) {
// TODO broadcast it only after fast-sync done? otherwise seems will cause
// too busy to request/respond
// During fast-sync, the knownTransactions may always not be processed and thus won't be removed
// TODO broadcast it only after fast-sync done? otherwise seems will cause too busy to request/respond

//self ! PeerEntity.MessageToPeer(peerId, SignedTransactions(transactionsNonKnown))
setTxKnown(transactionsNonKnown)
}

case ProcessedTransactions(transactions) => // from khipu.TxTopic
knownTransactions.filterNot(transactions.map(_.hash).contains)
knownTransactions --= transactions.map(_.hash)

case PeerHandshaked(peer, peerInfo) =>
this.peerInfo = Some(peerInfo)
Expand Down Expand Up @@ -247,9 +249,13 @@ class PeerEntity(peer: Peer) extends Actor with Timers with ActorLogging {
private def isTxKnown(transactions: SignedTransaction): Boolean =
knownTransactions.contains(transactions.hash)

// !!!!! TODO - how to avoid memory leak when some entries in knownTransactions are never removed
private def setTxKnown(transactions: Seq[SignedTransaction]) {
knownTransactions ++= transactions.map(_.hash)
knownTransactions ++= transactions.map(_.hash -> System.currentTimeMillis)
// limit max size to avoid memory leak when some entries in knownTransactions are never removed
val exceedLimit = knownTransactions.size - MAX_KNOWN_TRANSACTIONS
if (exceedLimit > 0) {
knownTransactions = knownTransactions.drop(exceedLimit)
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -97,28 +97,26 @@ final class PendingTransactionsService(txPoolConfig: TxPoolConfig) extends Actor
addTransactions(signedTransactions.toList)
}

def addTransactions(signedTransactions: List[SignedTransaction]) {
private def addTransactions(signedTransactions: List[SignedTransaction]) {
val transactionsToAdd = signedTransactions.filterNot(stx => pendingTransactions.contains(stx.hash))
if (transactionsToAdd.nonEmpty) {
val timestamp = System.currentTimeMillis
pendingTransactions = (transactionsToAdd.map(PendingTransaction(_, timestamp)) ::: pendingTransactions).take(txPoolConfig.txPoolSize)
pendingTransactions = (transactionsToAdd.map(PendingTransaction(_, System.currentTimeMillis)) ::: pendingTransactions).take(txPoolConfig.txPoolSize)

broadcastNewTransactions(transactionsToAdd)
}
}

def addOrOverrideTransaction(newTx: SignedTransaction) {
private def addOrOverrideTransaction(newTx: SignedTransaction) {
val txsWithoutObsoletes = pendingTransactions.filterNot { ptx =>
ptx.stx.sender == newTx.sender && ptx.stx.tx.nonce == newTx.tx.nonce
}

val timestamp = System.currentTimeMillis()
pendingTransactions = (PendingTransaction(newTx, timestamp) :: txsWithoutObsoletes).take(txPoolConfig.txPoolSize)
pendingTransactions = (PendingTransaction(newTx, System.currentTimeMillis) :: txsWithoutObsoletes).take(txPoolConfig.txPoolSize)

broadcastNewTransactions(List(newTx))
}

def broadcastNewTransactions(signedTransactions: Seq[SignedTransaction]) = {
private def broadcastNewTransactions(signedTransactions: Seq[SignedTransaction]) = {
val ptxHashs = pendingTransactions.map(_.stx.hash).toSet
val txsToNotify = signedTransactions.filter(tx => ptxHashs.contains(tx.hash)) // signed transactions that are still pending
if (txsToNotify.nonEmpty) {
Expand Down

0 comments on commit 8573025

Please sign in to comment.