Added offset to TVal/TKeyVal. #8
dcaoyuan committed Mar 24, 2019
1 parent deed43d commit 17608e0
Showing 10 changed files with 63 additions and 59 deletions.
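At a glance, the data-model change driving the whole diff: `TKeyVal` and `TVal` each gain an `offset: Int` field, so a record's (mixed) log offset travels with the value itself rather than in a separate tuple slot or callback parameter. A before/after sketch of the two case classes, taken from the `Kesque.scala` hunk further down (`-1` marks an unset offset or timestamp):

```scala
// Before this commit
final case class TKeyVal(key: Array[Byte], value: Array[Byte], timestamp: Long = -1L)
final case class TVal(value: Array[Byte], timestamp: Long)

// After this commit (-1 value of offset/timestamp means unset)
final case class TKeyVal(key: Array[Byte], value: Array[Byte], offset: Int, timestamp: Long)
final case class TVal(value: Array[Byte], offset: Int, timestamp: Long)
```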
75 changes: 39 additions & 36 deletions kesque/src/main/scala/kesque/HashKeyValueTable.scala
@@ -57,7 +57,7 @@ final class HashKeyValueTable private[kesque] (
/* time to key table, should be the first topic to initially create it */
private var timeIndex = Array.ofDim[Array[Byte]](200)

- private val caches = Array.ofDim[FIFOCache[Hash, (TVal, Int)]](topics.length)
+ private val caches = Array.ofDim[FIFOCache[Hash, TVal]](topics.length)
private val (topicToCol, _) = topics.foldLeft(Map[String, Int](), 0) {
case ((map, i), topic) => (map + (topic -> i), i + 1)
}
@@ -94,24 +94,15 @@ final class HashKeyValueTable private[kesque] (
var tasks = List[Thread]()
var col = 0
while (col < topics.length) {
- val topic = topics(col)
- caches(col) = new FIFOCache[Hash, (TVal, Int)](cacheSize)
+ caches(col) = new FIFOCache[Hash, TVal](cacheSize)

- tasks = (new Thread() {
-   override def run() {
-     loadOffsetsOf(col)
-   }
- }) :: tasks
+ tasks = LoadIndexTask(col) :: tasks

col += 1
}

val timeIndexTask = if (withTimeToKey) {
- List(new Thread() {
-   override def run() {
-     loadTimeIndex()
-   }
- })
+ List(LoadTimeIndexTask)
} else {
Nil
}
@@ -120,6 +111,11 @@
timeIndexTask ::: tasks foreach { _.join() }
}

+ final case class LoadIndexTask(col: Int) extends Thread {
+   override def run() {
+     loadOffsetsOf(col)
+   }
+ }
private def loadOffsetsOf(col: Int) {
info(s"Loading index of ${topics(col)}")
val start = System.nanoTime
@@ -128,9 +124,9 @@
val (_, counts) = indexTopicsOfFileno.foldLeft(0, initCounts) {
case ((fileno, counts), idxTps) =>
db.iterateOver(idxTps(col), 0, fetchMaxBytesInLoadOffsets) {
- case (offset, TKeyVal(hash, recordOffset, timestamp)) =>
-   if (hash != null && recordOffset != null) {
-     hashOffsets.put(bytesToInt(hash), toMixedOffset(fileno, bytesToInt(recordOffset)), col)
+ case TKeyVal(hash, recordOffsetValue, offset, timestamp) =>
+   if (hash != null && recordOffsetValue != null) {
+     hashOffsets.put(bytesToInt(hash), toMixedOffset(fileno, bytesToInt(recordOffsetValue)), col)
counts(fileno) += 1
}
}
@@ -140,14 +136,19 @@
info(s"Loaded index of ${topics(col)} in ${(System.nanoTime - start) / 1000000} ms, count ${counts.mkString("(", ",", ")")}, size ${hashOffsets.size}")
}

+ object LoadTimeIndexTask extends Thread {
+   override def run() {
+     loadTimeIndex()
+   }
+ }
private def loadTimeIndex() {
info(s"Loading time index from ${topics(0)}")
val start = System.nanoTime

var count = 0
topicsOfFileno foreach { tps =>
db.iterateOver(tps(0), 0, fetchMaxBytesInLoadOffsets) {
- case (offset, TKeyVal(key, value, timestamp)) =>
+ case TKeyVal(key, value, offset, timestamp) =>
if (key != null && value != null) {
putTimeToKey(timestamp, key)
count += 1
@@ -191,6 +192,9 @@ final class HashKeyValueTable private[kesque] (
}
}

+ /**
+  * @return TVal with mixedOffset
+  */
def read(keyBytes: Array[Byte], topic: String, bypassCache: Boolean = false): Option[TVal] = {
try {
readLock.lock
@@ -220,21 +224,19 @@ final class HashKeyValueTable private[kesque] (
//debug(s"${rec.offset}")
if (rec.offset == offset && Arrays.equals(kesque.getBytes(rec.key), keyBytes)) {
foundOffset = offset
- foundValue = if (rec.hasValue) Some(TVal(kesque.getBytes(rec.value), rec.timestamp)) else None
+ foundValue = if (rec.hasValue) Some(TVal(kesque.getBytes(rec.value), mixedOffset, rec.timestamp)) else None
}
}
i -= 1
}

if (!bypassCache) {
- foundValue foreach { tv =>
-   caches(col).put(key, (tv, foundOffset))
- }
+ foundValue foreach { tv => caches(col).put(key, tv) }
}

foundValue
}
- case Some((value, offset)) => Some(value)
+ case Some(value) => Some(value)
}
} finally {
readLock.unlock()
@@ -257,15 +259,15 @@
var keyToPrevOffsets = Map[Hash, Int]()
val itr = kvs.iterator
while (itr.hasNext) {
- val tkv @ TKeyVal(keyBytes, value, timestamp) = itr.next()
+ val tkv @ TKeyVal(keyBytes, value, offset, timestamp) = itr.next()
val key = Hash(keyBytes)
caches(col).get(key) match {
- case Some((TVal(prevValue, _), prevOffset)) =>
+ case Some(TVal(prevValue, prevMixedOffset, _)) =>
if (isValueChanged(value, prevValue)) {
val rec = if (timestamp < 0) new SimpleRecord(keyBytes, value) else new SimpleRecord(timestamp, keyBytes, value)
tkvs ::= tkv
records ::= rec
- keyToPrevOffsets += key -> prevOffset
+ keyToPrevOffsets += key -> prevMixedOffset
// TODO should only happen when value is set to empty, i.e. removed?
// remove records of prevOffset from memory?
} else {
@@ -285,10 +287,10 @@
debug(s"${recordBatches.map(x => x._1.size).mkString(",")}")

// write to log file
- recordBatches map { case (tkvs, records, keyToPrevOffsets) => writeRecords(tkvs, records, keyToPrevOffsets, col, fileno) }
+ recordBatches map { case (tkvs, records, keyToPrevMixedOffsets) => writeRecords(tkvs, records, keyToPrevMixedOffsets, col, fileno) }
}

- private def writeRecords(tkvs: List[TKeyVal], records: List[SimpleRecord], keyToPrevOffsets: Map[Hash, Int], col: Int, fileno: Int): Iterable[Int] = {
+ private def writeRecords(tkvs: List[TKeyVal], records: List[SimpleRecord], keyToPrevMixedOffsets: Map[Hash, Int], col: Int, fileno: Int): Iterable[Int] = {
try {
writeLock.lock()

@@ -303,19 +305,20 @@
if (appendInfo.numMessages > 0) {
val firstOffert = appendInfo.firstOffset.get
val (lastOffset, idxRecords) = tkvs.foldLeft(firstOffert, Vector[SimpleRecord]()) {
- case ((offset, idxRecords), TKeyVal(keyBytes, value, timestamp)) =>
+ case ((_offset, idxRecords), TKeyVal(keyBytes, value, _, timestamp)) =>
+   val offset = _offset.toInt
val key = Hash(keyBytes)
val hash = key.hashCode
- val indexRecord = new SimpleRecord(intToBytes(hash), intToBytes(offset.toInt))
+ val indexRecord = new SimpleRecord(intToBytes(hash), intToBytes(offset))

- val mixedOffset = toMixedOffset(fileno, offset.toInt)
- keyToPrevOffsets.get(key) match {
-   case Some(prevOffset) => // there is prevOffset, will also remove it (replace it with current one)
-     hashOffsets.replace(hash, prevOffset, mixedOffset, col)
+ val mixedOffset = toMixedOffset(fileno, offset)
+ keyToPrevMixedOffsets.get(key) match {
+   case Some(prevMixedOffset) => // there is prevOffset, will also remove it (replace it with current one)
+     hashOffsets.replace(hash, prevMixedOffset, mixedOffset, col)
case None => // there is none prevOffset
hashOffsets.put(hash, mixedOffset, col)
}
- caches(col).put(key, (TVal(value, timestamp), mixedOffset))
+ caches(col).put(key, TVal(value, mixedOffset, timestamp))
(offset + 1, idxRecords :+ indexRecord)
}

@@ -407,7 +410,7 @@ final class HashKeyValueTable private[kesque] (
}
}

- def iterateOver(fetchOffset: Long, topic: String)(op: (Long, TKeyVal) => Unit) = {
+ def iterateOver(fetchOffset: Long, topic: String)(op: TKeyVal => Unit) = {
try {
readLock.lock()

@@ -417,7 +420,7 @@
}
}

- def readOnce(fetchOffset: Long, topic: String)(op: (Long, TKeyVal) => Unit) = {
+ def readOnce(fetchOffset: Long, topic: String)(op: TKeyVal => Unit) = {
try {
readLock.lock()

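A knock-on simplification visible throughout this file: because `TVal` now carries its mixed offset, the per-topic cache shrinks from `FIFOCache[Hash, (TVal, Int)]` to `FIFOCache[Hash, TVal]`, and `read` can return the cached `TVal` directly. `toMixedOffset` itself is not part of this diff; the sketch below is only a hypothetical illustration (not the project's actual encoding) of how a file number and a record offset could be packed into the single `Int` that `hashOffsets` and the caches store:

```scala
object MixedOffsetSketch {
  // Hypothetical layout: high 8 bits = file number, low 24 bits = record offset.
  def toMixedOffset(fileno: Int, offset: Int): Int =
    (fileno << 24) | (offset & 0xFFFFFF)

  def fileno(mixed: Int): Int = mixed >>> 24
  def recordOffset(mixed: Int): Int = mixed & 0xFFFFFF
}
```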
15 changes: 8 additions & 7 deletions kesque/src/main/scala/kesque/Kesque.scala
@@ -46,7 +46,7 @@ object Kesque {

private def testWrite(table: HashKeyValueTable, topic: String, seq: Int) = {
val kvs = 1 to 100000 map { i =>
- TKeyVal(i.toString.getBytes, (s"value_$i").getBytes)
+ TKeyVal(i.toString.getBytes, (s"value_$i").getBytes, -1, -1)
}
table.write(kvs, topic)
}
@@ -119,7 +119,7 @@ final class Kesque(props: Properties) {
* @param fetchOffset
* @param op: action applied on (offset, key, value)
*/
- private[kesque] def iterateOver(topic: String, fetchOffset: Long = 0L, fetchMaxBytes: Int)(op: (Long, TKeyVal) => Unit) = {
+ private[kesque] def iterateOver(topic: String, fetchOffset: Long = 0L, fetchMaxBytes: Int)(op: TKeyVal => Unit) = {
var offset = fetchOffset
var nRead = 0
do {
@@ -136,7 +136,7 @@
* @param fetchOffset
* @param op: action applied on (offset, key, value)
*/
- private[kesque] def readOnce(topic: String, fetchOffset: Long, fetchMaxBytes: Int)(op: (Long, TKeyVal) => Unit) = {
+ private[kesque] def readOnce(topic: String, fetchOffset: Long, fetchMaxBytes: Int)(op: TKeyVal => Unit) = {
val (topicPartition, result) = read(topic, fetchOffset, fetchMaxBytes).head
val recs = result.info.records.records.iterator
var i = 0
@@ -146,8 +146,8 @@
val key = if (rec.hasKey) kesque.getBytes(rec.key) else null
val value = if (rec.hasValue) kesque.getBytes(rec.value) else null
val timestamp = rec.timestamp
- val offset = rec.offset
- op(offset, TKeyVal(key, value, timestamp))
+ val offset = rec.offset.toInt
+ op(TKeyVal(key, value, offset, timestamp))

lastOffset = offset
i += 1
@@ -161,5 +161,6 @@
}
}

- final case class TKeyVal(key: Array[Byte], value: Array[Byte], timestamp: Long = -1L)
- final case class TVal(value: Array[Byte], timestamp: Long)
+ // -1 value of offset/timestamp means unset
+ final case class TKeyVal(key: Array[Byte], value: Array[Byte], offset: Int, timestamp: Long)
+ final case class TVal(value: Array[Byte], offset: Int, timestamp: Long)
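Call sites that have no real offset or timestamp yet pass `-1` for both, as the updated `testWrite` above does. A minimal usage sketch against these definitions:

```scala
// Write side: offset and timestamp are unset (-1) until the log assigns them.
val kv = TKeyVal("42".getBytes, "value_42".getBytes, -1, -1L)

// Read side: the returned TVal now carries the record's mixed offset,
// e.g. table.read(keyBytes, topic): Option[TVal] with tval.offset populated.
```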
@@ -27,7 +27,7 @@ final class BlockBodyStorage(val source: KesqueDataSource) extends SimpleMap[Has
override def update(toRemove: Set[Hash], toUpsert: Map[Hash, BlockBody]): BlockBodyStorage = {
//toRemove foreach CachedNodeStorage.remove // TODO remove from repositoty when necessary (pruning)
//toUpsert foreach { case (key, value) => nodeTable.put(key, () => Future(value)) }
- toUpsert foreach { case (key, value) => source.put(key, TVal(value.toBytes, -1L)) }
+ toUpsert foreach { case (key, value) => source.put(key, TVal(value.toBytes, -1, -1L)) }
toRemove foreach { key => source.remove(key) }
this
}
@@ -26,7 +26,7 @@ final class BlockHeaderStorage(val source: KesqueDataSource) extends SimpleMap[H
override def update(toRemove: Set[Hash], toUpsert: Map[Hash, BlockHeader]): BlockHeaderStorage = {
//toRemove foreach CachedNodeStorage.remove // TODO remove from repositoty when necessary (pruning)
//toUpsert foreach { case (key, value) => nodeTable.put(key, () => Future(value)) }
- toUpsert foreach { case (key, value) => source.put(key, TVal(value.toBytes, -1L)) }
+ toUpsert foreach { case (key, value) => source.put(key, TVal(value.toBytes, -1, -1L)) }
toRemove foreach { key => source.remove(key) }
this
}
2 changes: 1 addition & 1 deletion khipu-eth/src/main/scala/khipu/store/EvmCodeStorage.scala
@@ -25,7 +25,7 @@ final class EvmCodeStorage(val source: KesqueDataSource) extends SimpleMap[Hash,
override def update(toRemove: Set[Hash], toUpsert: Map[Hash, ByteString]): EvmCodeStorage = {
//toRemove foreach CachedNodeStorage.remove // TODO remove from repositoty when necessary (pruning)
//toUpsert foreach { case (key, value) => nodeTable.put(key, () => Future(value)) }
- toUpsert foreach { case (key, value) => source.put(key, TVal(value.toArray, -1L)) }
+ toUpsert foreach { case (key, value) => source.put(key, TVal(value.toArray, -1, -1L)) }
toRemove foreach { key => source.remove(key) }
this
}
4 changes: 2 additions & 2 deletions khipu-eth/src/main/scala/khipu/store/KesqueCompactor.scala
@@ -91,15 +91,15 @@ object KesqueCompactor {
Some(key, blockNumber)
} else {
nodeTable.read(key, topic, bypassCache = true) match {
- case Some(TVal(bytes, blockNumber)) =>
+ case Some(TVal(bytes, offset, blockNumber)) =>
nodeCount += 1
if (nodeCount % 1000 == 0) {
val elapsed = (System.nanoTime - start) / 1000000000
val speed = nodeCount / math.max(1, elapsed)
log.info(s"[comp] $topic $nodeCount nodes $speed/s, at $blockNumber")
}

- nodeGot(TKeyVal(key, bytes, blockNumber))
+ nodeGot(TKeyVal(key, bytes, offset, blockNumber))
Some(bytes, blockNumber)

case None =>
2 changes: 1 addition & 1 deletion khipu-eth/src/main/scala/khipu/store/ReceiptsStorage.scala
@@ -72,7 +72,7 @@ final class ReceiptsStorage(val source: KesqueDataSource) extends SimpleMap[Hash
override def update(toRemove: Set[Hash], toUpsert: Map[Hash, Seq[Receipt]]): ReceiptsStorage = {
//toRemove foreach CachedNodeStorage.remove // TODO remove from repositoty when necessary (pruning)
//toUpsert foreach { case (key, value) => nodeTable.put(key, () => Future(value)) }
- toUpsert foreach { case (key, value) => source.put(key, TVal(toBytes(value), -1L)) }
+ toUpsert foreach { case (key, value) => source.put(key, TVal(toBytes(value), -1, -1L)) }
toRemove foreach { key => source.remove(key) }
this
}
@@ -25,7 +25,7 @@ final class TotalDifficultyStorage(val source: KesqueDataSource) extends SimpleM
override def update(toRemove: Set[Hash], toUpsert: Map[Hash, UInt256]): TotalDifficultyStorage = {
//toRemove foreach CachedNodeStorage.remove // TODO remove from repositoty when necessary (pruning)
//toUpsert foreach { case (key, value) => nodeTable.put(key, () => Future(value)) }
- toUpsert foreach { case (key, value) => source.put(key, TVal(value.bigEndianMag, -1L)) }
+ toUpsert foreach { case (key, value) => source.put(key, TVal(value.bigEndianMag, -1, -1L)) }
toRemove foreach { key => source.remove(key) }
this
}
@@ -133,19 +133,19 @@ object KesqueDataSource {
var offset = fetchOffset
var nRead = 0
do {
- val records = mutable.ArrayBuffer[(Long, Array[Byte], Array[Byte])]()
+ val records = mutable.ArrayBuffer[(Array[Byte], Array[Byte], Int, Long)]()
tableSrc.readOnce(offset, src) {
- case (offset, TKeyVal(k, v, t)) =>
+ case TKeyVal(k, v, offset, t) =>
val NodeRecord(flag, key, value) = NodeRecord.fromBytes(v)
val hash = Hash(key)
- hashOffsets.put(hash.hashCode, offset.toInt, 0) // TODO
+ hashOffsets.put(hash.hashCode, offset, 0) // TODO

- records += ((offset, key, value))
+ records += ((key, value, offset, t))
} match {
case (n, o) =>
println("nRecs read: " + n + ", lastOffset: " + o)

- val sorted = records.sortBy(x => x._1).map(x => TKeyVal(x._2, x._3))
+ val sorted = records.sortBy(x => x._3).map(x => TKeyVal(x._1, x._2, x._3, x._4))
val writeResults = tableTgt.write(sorted, tgt)
println(writeResults) // TODO check exceptions

@@ -166,7 +166,7 @@

val records = new mutable.HashMap[Hash, ByteString]()
table.iterateOver(0, topic) {
- case (offset, TKeyVal(key, value, timestamp)) =>
+ case TKeyVal(key, value, offset, timestamp) =>
//print(s"$offset - ${Hash(key).hexString},")
val hash = Hash(key)
if (records.contains(hash)) {
@@ -203,7 +203,7 @@ object KesqueDataSource {
var prevValue: Option[Array[Byte]] = None
val table = db.getTable(Array(topic))
table.iterateOver(0, topic) {
- case (offset, TKeyVal(key, value, timestamp)) =>
+ case TKeyVal(key, value, offset, timestamp) =>
if (Arrays.equals(key, lostKey)) {
val keyHash = Hash(lostKey).hashCode
println(s"Found $keyInHex at offset $offset, key hash is $keyHash")
@@ -244,7 +244,7 @@ final class KesqueDataSource(val table: HashKeyValueTable, val topic: String)(im
def update(toRemove: Set[Hash], toUpsert: Map[Hash, TVal]): KesqueDataSource = {
// TODO what's the meaning of remove a node? sometimes causes node not found
//table.remove(toRemove.map(_.bytes).toList)
- table.write(toUpsert.map { case (key, value) => TKeyVal(key.bytes, value.value, _currWritingBlockNumber) }, topic)
+ table.write(toUpsert.map { case (key, tval) => TKeyVal(key.bytes, tval.value, tval.offset, _currWritingBlockNumber) }, topic)
this
}

@@ -15,7 +15,7 @@ final class NodeTableStorage(source: KesqueDataSource)(implicit system: ActorSys
override def update(toRemove: Set[Hash], toUpsert: Map[Hash, Array[Byte]]): NodeTableStorage = {
//toRemove foreach CachedNodeStorage.remove // TODO remove from repositoty when necessary (pruning)
//toUpsert foreach { case (key, value) => nodeTable.put(key, () => Future(value)) }
- source.update(toRemove, toUpsert map { case (key, value) => key -> TVal(value, -1L) })
+ source.update(toRemove, toUpsert map { case (key, value) => key -> TVal(value, -1, -1L) })
this
}

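The shape change ripples into every iteration callback: the offset is no longer a separate argument but a field of `TKeyVal`. A before/after sketch of a typical caller, using only signatures that appear in this commit:

```scala
// Before: offset arrived as a separate callback parameter
table.iterateOver(0, topic) {
  case (offset, TKeyVal(key, value, timestamp)) => // ...
}

// After: offset is destructured out of TKeyVal itself
table.iterateOver(0, topic) {
  case TKeyVal(key, value, offset, timestamp) => // ...
}
```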

