diff --git a/build.sbt b/build.sbt index f0ad7c50..abc02d26 100644 --- a/build.sbt +++ b/build.sbt @@ -113,8 +113,15 @@ lazy val `kcl-ciris` = project .dependsOn(kcl, `shared-ciris`, `kcl-localstack` % Test) lazy val `kcl-fs2-ciris` = project + .settings(BuildInfoPlugin.buildInfoDefaultSettings) + .settings(BuildInfoPlugin.buildInfoScopedSettings(Test)) .settings( - description := "Ciris tooling for the Kinesis Client Library (KCL) via FS2" + description := "Ciris tooling for the Kinesis Client Library (KCL) via FS2", + Test / envVars ++= KCLFS2CirisSpecVars.env, + Test / javaOptions ++= KCLFS2CirisSpecVars.prop, + Test / buildInfoKeys := KCLFS2CirisSpecVars.buildInfoKeys, + Test / buildInfoPackage := "kinesis4cats.kcl.fs2.ciris", + Test / buildInfoOptions += BuildInfoOption.ConstantValue ) .enableIntegrationTests .dependsOn(`kcl-fs2`, `kcl-ciris`, `shared-ciris`) diff --git a/kcl-fs2-ciris/src/main/scala/kinesis4cats/kcl/fs2/ciris/KCLCirisFS2.scala b/kcl-fs2-ciris/src/main/scala/kinesis4cats/kcl/fs2/ciris/KCLCirisFS2.scala index 6c509b3f..a4e7f780 100644 --- a/kcl-fs2-ciris/src/main/scala/kinesis4cats/kcl/fs2/ciris/KCLCirisFS2.scala +++ b/kcl-fs2-ciris/src/main/scala/kinesis4cats/kcl/fs2/ciris/KCLCirisFS2.scala @@ -21,6 +21,7 @@ import scala.concurrent.duration._ import java.util.concurrent.ExecutorService +import _root_.ciris._ import cats.Parallel import cats.effect.{Async, Resource} import com.amazonaws.services.schemaregistry.deserializers.GlueSchemaRegistryDeserializer @@ -213,37 +214,6 @@ object KCLCirisFS2 { F: Async[F], LE: RecordProcessor.LogEncoders ): Resource[F, KCLConsumerFS2.Config[F]] = for { - queueSize <- CirisReader - .readDefaulted[Int](List("kcl", "fs2", "queue", "size"), 100, prefix) - .resource[F] - commitMaxChunk <- CirisReader - .readDefaulted[Int]( - List("kcl", "fs2", "commit", "max", "chunk"), - 1000, - prefix - ) - .resource[F] - commitMaxWait <- CirisReader - .readDefaulted[FiniteDuration]( - List("kcl", "fs2", "commit", "max", 
"wait"), - 10.seconds, - prefix - ) - .resource[F] - commitMaxRetries <- CirisReader - .readDefaulted[Int]( - List("kcl", "fs2", "commit", "max", "retries"), - 5, - prefix - ) - .resource[F] - commitRetryInterval <- CirisReader - .readDefaulted[FiniteDuration]( - List("kcl", "fs2", "commit", "retry", "interval"), - 0.seconds, - prefix - ) - .resource[F] autoCommit <- CirisReader .readDefaulted[Boolean]( List("kcl", "processor", "auto", "commit"), @@ -251,6 +221,7 @@ object KCLCirisFS2 { prefix ) .resource[F] + fs2Config <- readFS2Config(prefix).resource[F] checkpointConfig <- KCLCiris.Checkpoint.resource[F] coordinatorConfig <- KCLCiris.Coordinator.resource[F]( prefix, @@ -282,15 +253,48 @@ object KCLCirisFS2 { lifecycleConfig, metricsConfig, retrievalConfig, - queueSize, - commitMaxChunk, - commitMaxWait, - commitMaxRetries, - commitRetryInterval, + fs2Config, processConfig.copy(recordProcessorConfig = processConfig.recordProcessorConfig.copy(autoCommit = autoCommit) ) ) } yield config + + def readFS2Config( + prefix: Option[String] = None + ): ConfigValue[Effect, KCLConsumerFS2.FS2Config] = for { + queueSize <- CirisReader + .readDefaulted[Int](List("kcl", "fs2", "queue", "size"), 100, prefix) + commitMaxChunk <- CirisReader + .readDefaulted[Int]( + List("kcl", "fs2", "commit", "max", "chunk"), + 1000, + prefix + ) + commitMaxWait <- CirisReader + .readDefaulted[FiniteDuration]( + List("kcl", "fs2", "commit", "max", "wait"), + 10.seconds, + prefix + ) + commitMaxRetries <- CirisReader + .readDefaulted[Int]( + List("kcl", "fs2", "commit", "max", "retries"), + 5, + prefix + ) + commitRetryInterval <- CirisReader + .readDefaulted[FiniteDuration]( + List("kcl", "fs2", "commit", "retry", "interval"), + 0.seconds, + prefix + ) + } yield KCLConsumerFS2.FS2Config( + queueSize, + commitMaxChunk, + commitMaxWait, + commitMaxRetries, + commitRetryInterval + ) } diff --git a/kcl-fs2-ciris/src/main/scala/kinesis4cats/kcl/fs2/instances/eq.scala 
b/kcl-fs2-ciris/src/main/scala/kinesis4cats/kcl/fs2/instances/eq.scala new file mode 100644 index 00000000..58176926 --- /dev/null +++ b/kcl-fs2-ciris/src/main/scala/kinesis4cats/kcl/fs2/instances/eq.scala @@ -0,0 +1,32 @@ +/* + * Copyright 2023-2023 etspaceman + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kinesis4cats.kcl.fs2.instances + +import cats.Eq +import cats.syntax.all._ + +import kinesis4cats.kcl.fs2.KCLConsumerFS2 + +object eq { + /** Compares every field of FS2Config, including queueSize. */ + implicit val fs2ConfigEq: Eq[KCLConsumerFS2.FS2Config] = (x, y) => + x.queueSize === y.queueSize && + x.commitMaxChunk === y.commitMaxChunk && + x.commitMaxRetries === y.commitMaxRetries && + x.commitMaxWait === y.commitMaxWait && + x.commitRetryInterval === y.commitRetryInterval +} diff --git a/kcl-fs2-ciris/src/main/scala/kinesis4cats/kcl/fs2/instances/show.scala b/kcl-fs2-ciris/src/main/scala/kinesis4cats/kcl/fs2/instances/show.scala new file mode 100644 index 00000000..d31179c4 --- /dev/null +++ b/kcl-fs2-ciris/src/main/scala/kinesis4cats/kcl/fs2/instances/show.scala @@ -0,0 +1,33 @@ +/* + * Copyright 2023-2023 etspaceman + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kinesis4cats.kcl.fs2.instances + +import cats.Show + +import kinesis4cats.ShowBuilder +import kinesis4cats.kcl.fs2.KCLConsumerFS2 + +object show { + implicit val fs2ConfigShow: Show[KCLConsumerFS2.FS2Config] = x => + ShowBuilder("FS2Config") + .add("queueSize", x.queueSize) + .add("commitMaxChunk", x.commitMaxChunk) + .add("commitMaxWait", x.commitMaxWait) + .add("commitMaxRetries", x.commitMaxRetries) + .add("commitRetryInterval", x.commitRetryInterval) + .build +} diff --git a/kcl-fs2-ciris/src/test/resources/logback-test.xml b/kcl-fs2-ciris/src/test/resources/logback-test.xml new file mode 100644 index 00000000..66296c00 --- /dev/null +++ b/kcl-fs2-ciris/src/test/resources/logback-test.xml @@ -0,0 +1,14 @@ + + + + [%thread] %highlight(%-5level) %d{ISO8601} %cyan(%logger{15}) %yellow(%mdc) - %msg %n + + + + + + + + + + diff --git a/kcl-fs2-ciris/src/test/scala/kinesis4cats/kcl/fs2/ciris/KCLCirisFS2Spec.scala b/kcl-fs2-ciris/src/test/scala/kinesis4cats/kcl/fs2/ciris/KCLCirisFS2Spec.scala new file mode 100644 index 00000000..979ab853 --- /dev/null +++ b/kcl-fs2-ciris/src/test/scala/kinesis4cats/kcl/fs2/ciris/KCLCirisFS2Spec.scala @@ -0,0 +1,53 @@ +/* + * Copyright 2023-2023 etspaceman + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kinesis4cats.kcl.fs2.ciris + +import scala.concurrent.duration._ + +import cats.effect.IO +import cats.syntax.all._ + +import kinesis4cats.kcl.fs2.KCLConsumerFS2 +import kinesis4cats.kcl.fs2.instances.eq._ +import kinesis4cats.kcl.fs2.instances.show._ + +class KCLCirisFS2Spec extends munit.CatsEffectSuite { + test( + "It should load the environment variables the same as system properties for FS2Config" + ) { + for { + configEnv <- KCLCirisFS2.readFS2Config(Some("env")).load[IO] + configProp <- KCLCirisFS2.readFS2Config(Some("prop")).load[IO] + expected = KCLConsumerFS2.FS2Config( + 200, + 500, + 5.seconds, + 3, + 1.second + ) + } yield { + assert( + configEnv === configProp, + s"envi: ${configEnv.show}\nprop: ${configProp.show}" + ) + assert( + configEnv === expected, + s"envi: ${configEnv.show}\nexpe: ${expected.show}" + ) + } + } +} diff --git a/kcl-fs2/src/main/scala/kinesis4cats/kcl/fs2/KCLConsumerFS2.scala b/kcl-fs2/src/main/scala/kinesis4cats/kcl/fs2/KCLConsumerFS2.scala index 3e40cd1c..abc82921 100644 --- a/kcl-fs2/src/main/scala/kinesis4cats/kcl/fs2/KCLConsumerFS2.scala +++ b/kcl-fs2/src/main/scala/kinesis4cats/kcl/fs2/KCLConsumerFS2.scala @@ -240,11 +240,7 @@ object KCLConsumerFS2 { lifecycleConfig: LifecycleConfig, metricsConfig: MetricsConfig, retrievalConfig: RetrievalConfig, - queueSize: Int = 100, - commitMaxChunk: Int = 1000, - commitMaxWait: FiniteDuration = 10.seconds, - commitMaxRetries: Int = 5, - commitRetryInterval: FiniteDuration = 0.seconds, + fs2Config: FS2Config = FS2Config.default, processConfig: 
KCLConsumer.ProcessConfig = defaultProcessConfig )(implicit F: Async[F], @@ -258,11 +254,7 @@ object KCLConsumerFS2 { lifecycleConfig, metricsConfig, retrievalConfig, - queueSize, - commitMaxChunk, - commitMaxWait, - commitMaxRetries, - commitRetryInterval, + fs2Config, processConfig ) .map(new KCLConsumerFS2[F](_)) @@ -318,11 +310,7 @@ object KCLConsumerFS2 { cloudWatchClient: CloudWatchAsyncClient, streamName: String, appName: String, - queueSize: Int = 100, - commitMaxChunk: Int = 1000, - commitMaxWait: FiniteDuration = 10.seconds, - commitMaxRetries: Int = 5, - commitRetryInterval: FiniteDuration = 0.seconds, + fs2Config: FS2Config = FS2Config.default, workerId: String = UUID.randomUUID.toString, processConfig: KCLConsumer.ProcessConfig = defaultProcessConfig )( @@ -341,11 +329,7 @@ object KCLConsumerFS2 { cloudWatchClient, streamName, appName, - queueSize, - commitMaxChunk, - commitMaxWait, - commitMaxRetries, - commitRetryInterval, + fs2Config, workerId, processConfig )(tfn) @@ -405,11 +389,7 @@ object KCLConsumerFS2 { cloudWatchClient: CloudWatchAsyncClient, tracker: MultiStreamTracker, appName: String, - queueSize: Int = 100, - commitMaxChunk: Int = 1000, - commitMaxWait: FiniteDuration = 10.seconds, - commitMaxRetries: Int = 5, - commitRetryInterval: FiniteDuration = 0.seconds, + fs2Config: FS2Config = FS2Config.default, workerId: String = UUID.randomUUID.toString, processConfig: KCLConsumer.ProcessConfig = defaultProcessConfig )( @@ -428,11 +408,7 @@ object KCLConsumerFS2 { cloudWatchClient, tracker, appName, - queueSize, - commitMaxChunk, - commitMaxWait, - commitMaxRetries, - commitRetryInterval, + fs2Config, workerId, processConfig )(tfn) @@ -460,10 +436,7 @@ object KCLConsumerFS2 { final case class Config[F[_]]( underlying: kinesis4cats.kcl.KCLConsumer.Config[F], queue: Queue[F, CommittableRecord[F]], - maxCommitChunk: Int, - maxCommitWait: FiniteDuration, - maxCommitRetries: Int, - maxCommitRetryDuration: FiniteDuration + fs2Config: FS2Config ) 
object Config { @@ -514,17 +487,15 @@ object KCLConsumerFS2 { lifecycleConfig: LifecycleConfig, metricsConfig: MetricsConfig, retrievalConfig: RetrievalConfig, - queueSize: Int = 100, - commitMaxChunk: Int = 1000, - commitMaxWait: FiniteDuration = 10.seconds, - commitMaxRetries: Int = 5, - commitRetryInterval: FiniteDuration = 0.seconds, + fs2Config: FS2Config = FS2Config.default, processConfig: KCLConsumer.ProcessConfig = defaultProcessConfig )(implicit F: Async[F], encoders: RecordProcessor.LogEncoders ): Resource[F, Config[F]] = for { - queue <- Queue.bounded[F, CommittableRecord[F]](queueSize).toResource + queue <- Queue + .bounded[F, CommittableRecord[F]](fs2Config.queueSize) + .toResource underlying <- kinesis4cats.kcl.KCLConsumer.Config .create( checkpointConfig, @@ -535,14 +506,7 @@ object KCLConsumerFS2 { retrievalConfig, processConfig )(callback(queue)) - } yield Config( - underlying, - queue, - commitMaxChunk, - commitMaxWait, - commitMaxRetries, - commitRetryInterval - ) + } yield Config(underlying, queue, fs2Config) /** Constructor for the * [[kinesis4cats.kcl.fs2.KCLConsumerFS2.Config KCLConsumerFS2.Config]] @@ -563,18 +527,6 @@ object KCLConsumerFS2 { * @param appName * Name of the application. Usually also the dynamo table name for * checkpoints - * @param queueSize - * Size of the underlying queue for the FS2 stream. If the queue fills - * up, backpressure on the processors will occur. Default 100 - * @param commitMaxChunk - * Max records to be received in the commitRecords [[fs2.Pipe Pipe]] - * before a commit is run. Default is 1000 - * @param commitMaxWait - * Max duration to wait in commitRecords [[fs2.Pipe Pipe]] before a - * commit is run. Default is 10 seconds - * @param commitMaxRetries - * Max number of retries for a commit operation - * @param commitRetryInterval * @param workerId * Unique identifier for a single instance of this consumer. Default is a * random UUID. 
@@ -599,11 +551,7 @@ object KCLConsumerFS2 { cloudWatchClient: CloudWatchAsyncClient, streamName: String, appName: String, - queueSize: Int = 100, - commitMaxChunk: Int = 1000, - commitMaxWait: FiniteDuration = 10.seconds, - commitMaxRetries: Int = 5, - commitRetryInterval: FiniteDuration = 0.seconds, + fs2Config: KCLConsumerFS2.FS2Config = KCLConsumerFS2.FS2Config.default, workerId: String = UUID.randomUUID.toString, processConfig: KCLConsumer.ProcessConfig = defaultProcessConfig )( @@ -615,7 +563,9 @@ object KCLConsumerFS2 { F: Async[F], encoders: RecordProcessor.LogEncoders ): Resource[F, Config[F]] = for { - queue <- Queue.bounded[F, CommittableRecord[F]](queueSize).toResource + queue <- Queue + .bounded[F, CommittableRecord[F]](fs2Config.queueSize) + .toResource underlying <- kinesis4cats.kcl.KCLConsumer.Config .configsBuilder( kinesisClient, @@ -626,14 +576,7 @@ object KCLConsumerFS2 { workerId, processConfig )(callback(queue))(tfn) - } yield Config( - underlying, - queue, - commitMaxChunk, - commitMaxWait, - commitMaxRetries, - commitRetryInterval - ) + } yield Config(underlying, queue, fs2Config) /** Constructor for the * [[kinesis4cats.kcl.fs2.KCLConsumerFS2.Config KCLConsumerFS2.Config]] @@ -693,11 +636,7 @@ object KCLConsumerFS2 { cloudWatchClient: CloudWatchAsyncClient, tracker: MultiStreamTracker, appName: String, - queueSize: Int = 100, - commitMaxChunk: Int = 1000, - commitMaxWait: FiniteDuration = 10.seconds, - commitMaxRetries: Int = 5, - commitRetryInterval: FiniteDuration = 0.seconds, + fs2Config: FS2Config = FS2Config.default, workerId: String = UUID.randomUUID.toString, processConfig: KCLConsumer.ProcessConfig = defaultProcessConfig )( @@ -709,7 +648,9 @@ object KCLConsumerFS2 { F: Async[F], encoders: RecordProcessor.LogEncoders ): Resource[F, Config[F]] = for { - queue <- Queue.bounded[F, CommittableRecord[F]](queueSize).toResource + queue <- Queue + .bounded[F, CommittableRecord[F]](fs2Config.queueSize) + .toResource underlying <- 
kinesis4cats.kcl.KCLConsumer.Config .configsBuilderMultiStream( kinesisClient, @@ -720,13 +661,40 @@ object KCLConsumerFS2 { workerId, processConfig )(callback(queue))(tfn) - } yield Config( - underlying, - queue, - commitMaxChunk, - commitMaxWait, - commitMaxRetries, - commitRetryInterval + } yield Config(underlying, queue, fs2Config) + } + + /** Configuration for the FS2 implementation + * + * @param queueSize + * Size of the underlying queue for the FS2 stream. If the queue fills up, + * backpressure on the processors will occur. Default 100 + * @param commitMaxChunk + * Max records to be received in the commitRecords [[fs2.Pipe Pipe]] before + * a commit is run. Default is 1000 + * @param commitMaxWait + * Max duration to wait in commitRecords [[fs2.Pipe Pipe]] before a commit + * is run. Default is 10 seconds + * @param commitMaxRetries + * Max number of retries for a commit operation. Default is 5 + * @param commitRetryInterval + * Interval to wait between commit retries. Default is 0 seconds + */ + final case class FS2Config( + queueSize: Int, + commitMaxChunk: Int, + commitMaxWait: FiniteDuration, + commitMaxRetries: Int, + commitRetryInterval: FiniteDuration + ) + + object FS2Config { + val default: FS2Config = FS2Config( + queueSize = 100, + commitMaxChunk = 1000, + commitMaxWait = 10.seconds, + commitMaxRetries = 5, + commitRetryInterval = 0.seconds ) } @@ -769,15 +737,18 @@ object KCLConsumerFS2 { F: Async[F], P: Parallel[F] ): Pipe[F, CommittableRecord[F], CommittableRecord[F]] = - _.groupWithin(config.maxCommitChunk, config.maxCommitWait) + _.groupWithin( + config.fs2Config.commitMaxChunk, + config.fs2Config.commitMaxWait + ) .evalTap(chunk => chunk.toList.groupBy(_.shardId).toList.parTraverse_ { case (_, records) => val max = records.max max.canCheckpoint.ifM( retryingOnAllErrors( - limitRetries(config.maxCommitRetries) - .join(constantDelay(config.maxCommitRetryDuration)), + limitRetries(config.fs2Config.commitMaxRetries) + .join(constantDelay(config.fs2Config.commitRetryInterval)), noop[F, Throwable] )(max.checkpoint), F.unit diff --git 
a/kcl-localstack/src/main/scala/kinesis4cats/kcl/fs2/localstack/LocalstackKCLConsumerFS2.scala b/kcl-localstack/src/main/scala/kinesis4cats/kcl/fs2/localstack/LocalstackKCLConsumerFS2.scala index 1ffa66e1..ffacc57e 100644 --- a/kcl-localstack/src/main/scala/kinesis4cats/kcl/fs2/localstack/LocalstackKCLConsumerFS2.scala +++ b/kcl-localstack/src/main/scala/kinesis4cats/kcl/fs2/localstack/LocalstackKCLConsumerFS2.scala @@ -18,15 +18,12 @@ package kinesis4cats.kcl package fs2 package localstack -import scala.concurrent.duration._ - import java.util.UUID import cats.Parallel import cats.effect.std.Queue import cats.effect.syntax.all._ import cats.effect.{Async, Resource} -import cats.syntax.all._ import software.amazon.kinesis.common._ import kinesis4cats.kcl.localstack.LocalstackKCLConsumer @@ -81,7 +78,8 @@ object LocalstackKCLConsumerFS2 { position, processConfig )(KCLConsumerFS2.callback(queue)) - } yield KCLConsumerFS2.Config[F](underlying, queue, 5, 1.second, 5, 0.seconds) + } yield KCLConsumerFS2 + .Config[F](underlying, queue, KCLConsumerFS2.FS2Config.default) /** Creates a * [[kinesis4cats.kcl.fs2.KCLConsumerFS2.Config KCLConsumerFS2.Config]] that @@ -123,7 +121,8 @@ object LocalstackKCLConsumerFS2 { workerId, processConfig )(KCLConsumerFS2.callback(queue)) - } yield KCLConsumerFS2.Config[F](underlying, queue, 5, 1.second, 5, 0.seconds) + } yield KCLConsumerFS2 + .Config[F](underlying, queue, KCLConsumerFS2.FS2Config.default) /** Creates a * [[kinesis4cats.kcl.fs2.KCLConsumerFS2.Config KCLConsumerFS2.Config]] that diff --git a/project/KCLFS2CirisSpecVars.scala b/project/KCLFS2CirisSpecVars.scala new file mode 100644 index 00000000..50c14f9f --- /dev/null +++ b/project/KCLFS2CirisSpecVars.scala @@ -0,0 +1,30 @@ +import sbtbuildinfo.BuildInfoKey + +object KCLFS2CirisSpecVars { + + val propsAndEnvs = + KCLCirisSpecVars.propsAndEnvs ++ PropsAndEnvs( + List( + CirisUtil.propAndEnv(List("kcl", "fs2", "queue", "size"), "200"), + CirisUtil + .propAndEnv(List("kcl", 
"fs2", "commit", "max", "chunk"), "500"), + CirisUtil + .propAndEnv(List("kcl", "fs2", "commit", "max", "wait"), "5 seconds"), + CirisUtil + .propAndEnv(List("kcl", "fs2", "commit", "max", "retries"), "3"), + CirisUtil + .propAndEnv( + List("kcl", "fs2", "commit", "retry", "interval"), + "1 second" + ), + CirisUtil.propAndEnv( + List("kcl", "processor", "auto", "commit"), + "false" + ) + ) + ) + + val env: Map[String, String] = propsAndEnvs.envs + val prop: Seq[String] = propsAndEnvs.props + val buildInfoKeys: Seq[BuildInfoKey] = propsAndEnvs.buildInfoKeys +}