Skip to content

Commit

Permalink
Try to get KesqueCompactor working properly. #8
Browse files Browse the repository at this point in the history
  • Loading branch information
dcaoyuan committed Mar 25, 2019
1 parent aa2d558 commit 1d772d8
Show file tree
Hide file tree
Showing 4 changed files with 35 additions and 30 deletions.
26 changes: 12 additions & 14 deletions kesque/src/main/scala/kesque/HashKeyValueTable.scala
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,12 @@ object HashKeyValueTable {
// 0x80000000 - 1000 0000 0000 0000 0000 0000 0000 0000
// 0xC0000000 - 1100 0000 0000 0000 0000 0000 0000 0000
private val filenoToBitsHeader = Array(0x00000000, 0x80000000)
private def toMixedOffset(fileno: Int, offset: Int) = {
def toMixedOffset(fileno: Int, offset: Int) = {
val bitsHeader = filenoToBitsHeader(fileno)
bitsHeader | offset
}

private def toFileNoAndOffset(mixedOffset: Int) = {
def toFileNoAndOffset(mixedOffset: Int) = {
val fileno = mixedOffset >>> 31
val offset = mixedOffset & 0x7FFFFFFF // 0111 1111 1111 1111 1111 1111 1111 1111
(fileno, offset)
Expand All @@ -37,9 +37,9 @@ object HashKeyValueTable {

/**
* We use this table to read/write combination of snapshot (in file 0) and post-
* events. The offsets in caches and hashOffsets(index) are with 1st bit as the
* fileNo, where, the 0 1st bit indicates that the offset is of file 0, and the
* 1 1st bit indicates that the offset is of file 1.
* events. The offsets in caches and in hashOffsets(index) both carry the fileNo
* in their 1st (highest) bit: a 1st bit of 0 indicates that the offset is of
* file 0, and a 1st bit of 1 indicates that the offset is of file 1.
* @see toMixedOffset and toFileNoAndOffset
*/
final class HashKeyValueTable private[kesque] (
Expand Down Expand Up @@ -208,7 +208,6 @@ final class HashKeyValueTable private[kesque] (
case IntIntsMap.NO_VALUE => None
case mixedOffsets =>
var foundValue: Option[TVal] = None
var foundOffset = Int.MinValue
var i = mixedOffsets.length - 1 // loop backward to find the newest one
while (foundValue.isEmpty && i >= 0) {
val mixedOffset = mixedOffsets(i)
Expand All @@ -223,7 +222,6 @@ final class HashKeyValueTable private[kesque] (
val rec = recs.next
//debug(s"${rec.offset}")
if (rec.offset == offset && Arrays.equals(kesque.getBytes(rec.key), keyBytes)) {
foundOffset = offset
foundValue = if (rec.hasValue) Some(TVal(kesque.getBytes(rec.value), mixedOffset, rec.timestamp)) else None
}
}
Expand Down Expand Up @@ -256,7 +254,7 @@ final class HashKeyValueTable private[kesque] (
var recordBatches = Vector[(List[TKeyVal], List[SimpleRecord], Map[Hash, Int])]()
var tkvs = List[TKeyVal]()
var records = List[SimpleRecord]()
var keyToPrevOffsets = Map[Hash, Int]()
var keyToPrevMixedOffset = Map[Hash, Int]()
val itr = kvs.iterator
while (itr.hasNext) {
val tkv @ TKeyVal(keyBytes, value, offset, timestamp) = itr.next()
Expand All @@ -267,7 +265,7 @@ final class HashKeyValueTable private[kesque] (
val rec = if (timestamp < 0) new SimpleRecord(keyBytes, value) else new SimpleRecord(timestamp, keyBytes, value)
tkvs ::= tkv
records ::= rec
keyToPrevOffsets += key -> prevMixedOffset
keyToPrevMixedOffset += key -> prevMixedOffset
// TODO should only happen when value is set to empty, i.e. removed?
// remove records of prevOffset from memory?
} else {
Expand All @@ -281,16 +279,16 @@ final class HashKeyValueTable private[kesque] (
}

if (records.nonEmpty) {
recordBatches :+= (tkvs, records, keyToPrevOffsets)
recordBatches :+= (tkvs, records, keyToPrevMixedOffset)
}

debug(s"${recordBatches.map(x => x._1.size).mkString(",")}")

// write to log file
recordBatches map { case (tkvs, records, keyToPrevMixedOffsets) => writeRecords(tkvs, records, keyToPrevMixedOffsets, col, fileno) }
recordBatches map { case (tkvs, records, keyToPrevMixedOffset) => writeRecords(tkvs, records, keyToPrevMixedOffset, col, fileno) }
}

private def writeRecords(tkvs: List[TKeyVal], records: List[SimpleRecord], keyToPrevMixedOffsets: Map[Hash, Int], col: Int, fileno: Int): Iterable[Int] = {
private def writeRecords(tkvs: List[TKeyVal], records: List[SimpleRecord], keyToPrevMixedOffset: Map[Hash, Int], col: Int, fileno: Int): Iterable[Int] = {
try {
writeLock.lock()

Expand All @@ -312,7 +310,7 @@ final class HashKeyValueTable private[kesque] (
val indexRecord = new SimpleRecord(intToBytes(hash), intToBytes(offset))

val mixedOffset = toMixedOffset(fileno, offset)
keyToPrevMixedOffsets.get(key) match {
keyToPrevMixedOffset.get(key) match {
case Some(prevMixedOffset) => // there is prevOffset, will also remove it (replace it with current one)
hashOffsets.replace(hash, prevMixedOffset, mixedOffset, col)
case None => // there is none prevOffset
Expand Down Expand Up @@ -415,7 +413,7 @@ final class HashKeyValueTable private[kesque] (
val key = Hash(keyBytes)
val hash = key.hashCode
hashOffsets.get(hash, col) match {
case IntIntsMap.NO_VALUE => None
case IntIntsMap.NO_VALUE =>
case mixedOffsets =>
var found = false
var i = 0
Expand Down
7 changes: 2 additions & 5 deletions kesque/src/main/scala/kesque/IntIntMap.scala
Original file line number Diff line number Diff line change
Expand Up @@ -78,16 +78,13 @@ object IntIntMap {
i = -max
while (i <= max) {
map.remove(i, n)
i += 1
}

i = -max
while (i <= max) {
if (map.get(i, n) != NO_VALUE) {
println(s"value index $n err at $i - after remove")
System.exit(-1)
}
i += 1
}

n += 1
}
}
Expand Down
4 changes: 0 additions & 4 deletions kesque/src/main/scala/kesque/IntIntsMap.scala
Original file line number Diff line number Diff line change
Expand Up @@ -70,10 +70,6 @@ object IntIntsMap {
i = -max
while (i <= max) {
map.remove(i, n)
i += 1
}
i = -max
while (i <= max) {
if (map.get(i, n) != NO_VALUE) {
println(s"Remove all value: err at $i - ${map.get(i, n).mkString("[", ",", "]")}")
System.exit(-1)
Expand Down
28 changes: 21 additions & 7 deletions khipu-eth/src/main/scala/khipu/store/KesqueCompactor.scala
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ import khipu.trie.BranchNode
import khipu.trie.ByteArraySerializable
import khipu.trie.ExtensionNode
import khipu.trie.LeafNode
import khipu.trie.MerklePatriciaTrie.MPTException
import khipu.trie.Node
import khipu.util
import org.apache.kafka.common.record.CompressionType
Expand Down Expand Up @@ -57,7 +56,7 @@ object KesqueCompactor {
private def processEntity(entity: V, blockNumber: Long) = {
entityCount += 1
if (entityCount % 1000 == 0) {
log.info(s"[comp] got $topic $entityCount, at $blockNumber")
log.info(s"[comp] got $topic entities $entityCount, at #$blockNumber")
}

entityGot(entity, blockNumber)
Expand Down Expand Up @@ -96,14 +95,15 @@ object KesqueCompactor {
if (nodeCount % 1000 == 0) {
val elapsed = (System.nanoTime - start) / 1000000000
val speed = nodeCount / math.max(1, elapsed)
log.info(s"[comp] $topic $nodeCount nodes $speed/s, at $blockNumber")
log.info(s"[comp] $topic nodes $nodeCount $speed/s, at #$blockNumber")
}

nodeGot(TKeyVal(key, bytes, mixedOffset, blockNumber))
Some(bytes, blockNumber)

case None =>
throw MPTException(s"$topic Node not found ${khipu.toHexString(key)}, trie is inconsistent")
log.warning(s"$topic Node not found ${khipu.toHexString(key)}, trie is inconsistent")
None
}
}

Expand All @@ -120,21 +120,35 @@ object KesqueCompactor {
def write(kv: TKeyVal) {
buf += kv
if (buf.size > 100) { // keep the batched size around 4096 (~ 32*100 bytes)
nodeTable.writeShift(buf, topic)
val kvs = buf map {
case TKeyVal(key, value, mixedOffset, timestamp) =>
val (_, offset) = HashKeyValueTable.toFileNoAndOffset(mixedOffset)
TKeyVal(key, value, offset, timestamp)
}
nodeTable.writeShift(kvs, topic)

buf foreach {
case TKeyVal(key, _, mixedOffset, _) =>
nodeTable.removeIndexEntry(key, mixedOffset, topic)
}

buf.clear()
}
}

def flush() {
nodeTable.writeShift(buf, topic)
val kvs = buf map {
case TKeyVal(key, value, mixedOffset, timestamp) =>
val (_, offset) = HashKeyValueTable.toFileNoAndOffset(mixedOffset)
TKeyVal(key, value, offset, timestamp)
}
nodeTable.writeShift(kvs, topic)

buf foreach {
case TKeyVal(key, _, mixedOffset, _) =>
nodeTable.removeIndexEntry(key, mixedOffset, topic)
}

buf.clear()
}
}
Expand Down Expand Up @@ -183,7 +197,7 @@ object KesqueCompactor {
val storageTable = storages.storageNodeDataSource.table
val blockHeaderStorage = storages.blockHeaderStorage

val compactor = new KesqueCompactor(kesque, accountTable, storageTable, blockHeaderStorage, 7225550)
val compactor = new KesqueCompactor(kesque, accountTable, storageTable, blockHeaderStorage, 7225555)
compactor.load()
}
}
Expand Down

0 comments on commit 1d772d8

Please sign in to comment.