diff --git a/khipu-eth/src/main/scala/khipu/storage/KesqueCompactor.scala b/khipu-eth/src/main/scala/khipu/storage/KesqueCompactor.scala
index 0ee1e3d..145292e 100644
--- a/khipu-eth/src/main/scala/khipu/storage/KesqueCompactor.scala
+++ b/khipu-eth/src/main/scala/khipu/storage/KesqueCompactor.scala
@@ -34,6 +34,8 @@ object KesqueCompactor {
   import system.dispatcher
   lazy val serviceBoard = ServiceBoard(system)
   lazy val dbConfig = serviceBoard.dbConfig
+
+  private val FETCH_MAX_BYTES_IN_BATCH = 50 * 1024 * 1024 // 52428800, 50M
 
   implicit val logSource: LogSource[AnyRef] = new LogSource[AnyRef] {
     def genString(o: AnyRef): String = o.getClass.getName
@@ -275,8 +277,8 @@ final class KesqueCompactor(
         var offset = storageWriter.maxOffset + 1
         var nRead = 0
         do {
-          val (lastOffset, recs) = accountTable.readBatch(DbConfig.account, offset, 4096)
-          recs foreach accountWriter.write
+          val (lastOffset, recs) = storageTable.readBatch(DbConfig.storage, offset, FETCH_MAX_BYTES_IN_BATCH)
+          recs foreach storageWriter.write
           nRead = recs.length
           offset = lastOffset + 1
         } while (nRead > 0)
@@ -293,7 +295,7 @@ final class KesqueCompactor(
         var offset = accountWriter.maxOffset + 1
         var nRead = 0
         do {
-          val (lastOffset, recs) = accountTable.readBatch(DbConfig.account, offset, 4096)
+          val (lastOffset, recs) = accountTable.readBatch(DbConfig.account, offset, FETCH_MAX_BYTES_IN_BATCH)
           recs foreach accountWriter.write
           nRead = recs.length
           offset = lastOffset + 1
diff --git a/khipu-eth/src/main/scala/khipu/storage/KesqueNodeCompactor.scala b/khipu-eth/src/main/scala/khipu/storage/KesqueNodeCompactor.scala
index 960d47f..730aa1b 100644
--- a/khipu-eth/src/main/scala/khipu/storage/KesqueNodeCompactor.scala
+++ b/khipu-eth/src/main/scala/khipu/storage/KesqueNodeCompactor.scala
@@ -32,6 +32,8 @@ object KesqueNodeCompactor {
   implicit lazy val system = ActorSystem("khipu")
   import system.dispatcher
 
+  private val FETCH_MAX_BYTES_IN_BATCH = 50 * 1024 * 1024 // 52428800, 50M
+
   implicit val logSource: LogSource[AnyRef] = new LogSource[AnyRef] {
     def genString(o: AnyRef): String = o.getClass.getName
     override def getClazz(o: AnyRef): Class[_] = o.getClass
@@ -118,16 +120,18 @@ object KesqueNodeCompactor {
   final class NodeWriter(topic: String, nodeDataSource: KesqueNodeDataSource) {
     private val buf = new mutable.ArrayBuffer[TKeyVal]()
 
-    private var _maxOffset = Long.MinValue
+    private var _maxOffset = -1L // kafka offset starts from 0
     def maxOffset = _maxOffset
 
     /**
      * Should flush() after all kv are written.
      */
     def write(kv: TKeyVal) {
-      buf += kv
-      if (buf.size > 100) { // keep the batched size around 4096 (~ 32*100 bytes)
-        flush()
+      if (kv ne null) {
+        buf += kv
+        if (buf.size > 100) { // keep the batched size around 4096 (~ 32*100 bytes)
+          flush()
+        }
       }
     }
 
@@ -223,9 +227,9 @@ final class KesqueNodeCompactor(
   }
 
   def start() {
-    loadSnaphot()
+    //loadSnaphot()
     stopWorld(() => true)
-    //postAppend()
+    postAppend()
   }
 
   private def loadSnaphot() {
@@ -252,39 +256,53 @@ final class KesqueNodeCompactor(
    * should stop world during postAppend()
    */
   private def postAppend() {
+    val start = System.nanoTime
+
     val storageTask = new Thread {
       override def run() {
-        log.info(s"[comp] post append storage from offset ${storageWriter.maxOffset + 1} ...")
-        // TODO topic from fromFileNo
+        println(s"[comp] storage post append from offset ${storageWriter.maxOffset + 1} ...")
+        val start = System.nanoTime
         var offset = storageWriter.maxOffset + 1
         var nRead = 0
+        var count = 0
         do {
-          val (lastOffset, recs) = accountDataSource.readBatch(DbConfig.account, offset, 4096)
-          recs foreach accountWriter.write
+          val (lastOffset, recs) = storageDataSource.readBatch(DbConfig.storage, offset, FETCH_MAX_BYTES_IN_BATCH)
+          recs foreach storageWriter.write
           nRead = recs.length
           offset = lastOffset + 1
+
+          count += nRead
+          val elapsed = (System.nanoTime - start) / 1000000000
+          val speed = count / math.max(1, elapsed)
+          println(s"[comp] storage nodes $count $speed/s, at #$blockNumber, table size ${storageDataSource.count}")
         } while (nRead > 0)
         storageWriter.flush()
-        log.info(s"[comp] post append storage done.")
+        println(s"[comp] storage post append done.")
       }
     }
 
     val accountTask = new Thread {
       override def run() {
-        log.info(s"[comp] post append account from offset ${accountWriter.maxOffset + 1} ...")
-        // TODO topic from fromFileNo
+        println(s"[comp] account post append from offset ${accountWriter.maxOffset + 1} ...")
+        val start = System.nanoTime
         var offset = accountWriter.maxOffset + 1
        var nRead = 0
+        var count = 0
         do {
-          val (lastOffset, recs) = accountDataSource.readBatch(DbConfig.account, offset, 4096)
+          val (lastOffset, recs) = accountDataSource.readBatch(DbConfig.account, offset, FETCH_MAX_BYTES_IN_BATCH)
           recs foreach accountWriter.write
           nRead = recs.length
           offset = lastOffset + 1
+
+          count += nRead
+          val elapsed = (System.nanoTime - start) / 1000000000
+          val speed = count / math.max(1, elapsed)
+          println(s"[comp] account nodes $count $speed/s, at #$blockNumber, table size ${accountDataSource.count}")
         } while (nRead > 0)
         accountWriter.flush()
-        log.info(s"[comp] post append account done.")
+        println(s"[comp] account post append done.")
      }
     }
 
@@ -293,6 +311,8 @@ final class KesqueNodeCompactor(
     storageTask.join
     accountTask.join
 
-    log.info(s"[comp] post append done.")
+
+    val elapsed = (System.nanoTime - start) / 1000000000
+    println(s"[comp] post append done in ${elapsed}s.")
   }
 }
\ No newline at end of file
diff --git a/khipu-kesque/src/main/resources/kafka.server.properties.properties b/khipu-kesque/src/main/resources/kafka.server.properties
similarity index 100%
rename from khipu-kesque/src/main/resources/kafka.server.properties.properties
rename to khipu-kesque/src/main/resources/kafka.server.properties
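For reference, every hunk above touches the same offset-paging pattern: `readBatch` returns the last offset actually read together with the records, the caller resumes at `lastOffset + 1`, and an empty batch ends the scan. Below is a minimal, self-contained sketch of that loop. `Table`, `TKeyVal`, and the `readBatch` signature are simplified stand-ins assumed for illustration, not khipu's real Kesque types.

```scala
// Sketch of the offset-paging loop used by postAppend(). `Table` and
// `TKeyVal` are simplified stand-ins, not khipu's actual Kesque types.
final case class TKeyVal(key: Array[Byte], value: Array[Byte], offset: Long)

trait Table {
  // Returns (last offset actually read, records in this batch);
  // an empty batch signals the end of the topic.
  def readBatch(topic: String, fromOffset: Long, fetchMaxBytes: Int): (Long, Array[TKeyVal])
}

object PagingSketch {
  private val FETCH_MAX_BYTES_IN_BATCH = 50 * 1024 * 1024 // 50M, as in the patch

  // Copies every record of `topic` starting at `fromOffset`; returns the count.
  def copyAll(src: Table, topic: String, fromOffset: Long)(write: TKeyVal => Unit): Long = {
    var offset = fromOffset
    var nRead = 0
    var count = 0L
    do {
      val (lastOffset, recs) = src.readBatch(topic, offset, FETCH_MAX_BYTES_IN_BATCH)
      recs foreach write
      nRead = recs.length
      offset = lastOffset + 1 // resume right after the last record seen
      count += nRead
    } while (nRead > 0)
    count
  }
}
```

If the third argument of `readBatch` is indeed a byte budget, as the new constant's name suggests, the patch raises it from 4096 bytes to 50 MB per fetch, so each round trip moves far more records during compaction.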
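The `NodeWriter` hunk makes two behavioral fixes: `_maxOffset` starts at `-1L` so that `maxOffset + 1` resumes at Kafka's first offset 0, and `null` records are skipped instead of buffered. A sketch of the resulting buffering behavior, reusing `TKeyVal` from the sketch above; `sink` is a hypothetical stand-in for the `KesqueNodeDataSource` write path, and tracking `_maxOffset` per record is an assumption here, since the hunk does not show how it is updated:

```scala
import scala.collection.mutable

// Sketch of NodeWriter's buffering after the patch; not khipu's real class.
final class BufferedWriter(sink: Vector[TKeyVal] => Unit) {
  private val buf = new mutable.ArrayBuffer[TKeyVal]()

  // -1 means "nothing written yet"; Kafka offsets start from 0,
  // so resuming at maxOffset + 1 correctly begins at offset 0.
  private var _maxOffset = -1L
  def maxOffset: Long = _maxOffset

  def write(kv: TKeyVal): Unit = {
    if (kv ne null) { // ignore missing records instead of buffering nulls
      buf += kv
      _maxOffset = math.max(_maxOffset, kv.offset) // assumption: offset tracked per record
      if (buf.size > 100) flush() // keep each append batch small, as in the patch
    }
  }

  // Must be called once after the last write, as the original scaladoc notes.
  def flush(): Unit = {
    if (buf.nonEmpty) {
      sink(buf.toVector)
      buf.clear()
    }
  }
}
```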