Skip to content

Commit

Permalink
KCL FS2 Ciris Tests (#34)
Browse files Browse the repository at this point in the history
  • Loading branch information
etspaceman authored Feb 4, 2023
1 parent c62da8b commit 63dc687
Show file tree
Hide file tree
Showing 9 changed files with 273 additions and 132 deletions.
9 changes: 8 additions & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -113,8 +113,15 @@ lazy val `kcl-ciris` = project
.dependsOn(kcl, `shared-ciris`, `kcl-localstack` % Test)

lazy val `kcl-fs2-ciris` = project
.settings(BuildInfoPlugin.buildInfoDefaultSettings)
.settings(BuildInfoPlugin.buildInfoScopedSettings(Test))
.settings(
description := "Ciris tooling for the Kinesis Client Library (KCL) via FS2"
description := "Ciris tooling for the Kinesis Client Library (KCL) via FS2",
Test / envVars ++= KCLFS2CirisSpecVars.env,
Test / javaOptions ++= KCLFS2CirisSpecVars.prop,
Test / buildInfoKeys := KCLFS2CirisSpecVars.buildInfoKeys,
Test / buildInfoPackage := "kinesis4cats.kcl.fs2.ciris",
Test / buildInfoOptions += BuildInfoOption.ConstantValue
)
.enableIntegrationTests
.dependsOn(`kcl-fs2`, `kcl-ciris`, `shared-ciris`)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import scala.concurrent.duration._

import java.util.concurrent.ExecutorService

import _root_.ciris._
import cats.Parallel
import cats.effect.{Async, Resource}
import com.amazonaws.services.schemaregistry.deserializers.GlueSchemaRegistryDeserializer
Expand Down Expand Up @@ -213,44 +214,14 @@ object KCLCirisFS2 {
F: Async[F],
LE: RecordProcessor.LogEncoders
): Resource[F, KCLConsumerFS2.Config[F]] = for {
queueSize <- CirisReader
.readDefaulted[Int](List("kcl", "fs2", "queue", "size"), 100, prefix)
.resource[F]
commitMaxChunk <- CirisReader
.readDefaulted[Int](
List("kcl", "fs2", "commit", "max", "chunk"),
1000,
prefix
)
.resource[F]
commitMaxWait <- CirisReader
.readDefaulted[FiniteDuration](
List("kcl", "fs2", "commit", "max", "wait"),
10.seconds,
prefix
)
.resource[F]
commitMaxRetries <- CirisReader
.readDefaulted[Int](
List("kcl", "fs2", "commit", "max", "retries"),
5,
prefix
)
.resource[F]
commitRetryInterval <- CirisReader
.readDefaulted[FiniteDuration](
List("kcl", "fs2", "commit", "retry", "interval"),
0.seconds,
prefix
)
.resource[F]
autoCommit <- CirisReader
.readDefaulted[Boolean](
List("kcl", "processor", "auto", "commit"),
false,
prefix
)
.resource[F]
fs2Config <- readFS2Config(prefix).resource[F]
checkpointConfig <- KCLCiris.Checkpoint.resource[F]
coordinatorConfig <- KCLCiris.Coordinator.resource[F](
prefix,
Expand Down Expand Up @@ -282,15 +253,48 @@ object KCLCirisFS2 {
lifecycleConfig,
metricsConfig,
retrievalConfig,
queueSize,
commitMaxChunk,
commitMaxWait,
commitMaxRetries,
commitRetryInterval,
fs2Config,
processConfig.copy(recordProcessorConfig =
processConfig.recordProcessorConfig.copy(autoCommit = autoCommit)
)
)

} yield config

def readFS2Config(
prefix: Option[String] = None
): ConfigValue[Effect, KCLConsumerFS2.FS2Config] = for {
queueSize <- CirisReader
.readDefaulted[Int](List("kcl", "fs2", "queue", "size"), 100, prefix)
commitMaxChunk <- CirisReader
.readDefaulted[Int](
List("kcl", "fs2", "commit", "max", "chunk"),
1000,
prefix
)
commitMaxWait <- CirisReader
.readDefaulted[FiniteDuration](
List("kcl", "fs2", "commit", "max", "wait"),
10.seconds,
prefix
)
commitMaxRetries <- CirisReader
.readDefaulted[Int](
List("kcl", "fs2", "commit", "max", "retries"),
5,
prefix
)
commitRetryInterval <- CirisReader
.readDefaulted[FiniteDuration](
List("kcl", "fs2", "commit", "retry", "interval"),
0.seconds,
prefix
)
} yield KCLConsumerFS2.FS2Config(
queueSize,
commitMaxChunk,
commitMaxWait,
commitMaxRetries,
commitRetryInterval
)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/*
* Copyright 2023-2023 etspaceman
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package kinesis4cats.kcl.fs2.instances

import cats.Eq
import cats.syntax.all._

import kinesis4cats.kcl.fs2.KCLConsumerFS2

object eq {
implicit val fs2ConfigEq: Eq[KCLConsumerFS2.FS2Config] = (x, y) =>
x.commitMaxChunk === y.commitMaxChunk &&
x.commitMaxRetries === y.commitMaxRetries &&
x.commitMaxWait === y.commitMaxWait &&
x.commitRetryInterval === y.commitRetryInterval
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/*
* Copyright 2023-2023 etspaceman
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package kinesis4cats.kcl.fs2.instances

import cats.Show

import kinesis4cats.ShowBuilder
import kinesis4cats.kcl.fs2.KCLConsumerFS2

object show {
implicit val fs2ConfigShow: Show[KCLConsumerFS2.FS2Config] = x =>
ShowBuilder("FS2Config")
.add("queueSize", x.queueSize)
.add("commitMaxChunk", x.commitMaxChunk)
.add("commitMaxWait", x.commitMaxWait)
.add("commitMaxRetries", x.commitMaxRetries)
.add("commitRetryInterval", x.commitRetryInterval)
.build
}
14 changes: 14 additions & 0 deletions kcl-fs2-ciris/src/test/resources/logback-test.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
<configuration>
<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
<encoder>
<pattern>[%thread] %highlight(%-5level) %d{ISO8601} %cyan(%logger{15}) %yellow(%mdc) - %msg %n</pattern>
</encoder>
</appender>
<logger name="kinesis4cats" level="ERROR"/>
<appender name="ASYNC" class="ch.qos.logback.classic.AsyncAppender">
<appender-ref ref="STDOUT" />
</appender>
<root level="ERROR">
<appender-ref ref="ASYNC" />
</root>
</configuration>
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
/*
* Copyright 2023-2023 etspaceman
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package kinesis4cats.kcl.fs2.ciris

import scala.concurrent.duration._

import cats.effect.IO
import cats.syntax.all._

import kinesis4cats.kcl.fs2.KCLConsumerFS2
import kinesis4cats.kcl.fs2.instances.eq._
import kinesis4cats.kcl.fs2.instances.show._

class KCLCirisFS2Spec extends munit.CatsEffectSuite {
test(
"It should load the environment variables the same as system properties for CoordinatorConfig"
) {
for {
configEnv <- KCLCirisFS2.readFS2Config(Some("env")).load[IO]
configProp <- KCLCirisFS2.readFS2Config(Some("prop")).load[IO]
expected = KCLConsumerFS2.FS2Config(
200,
500,
5.seconds,
3,
1.second
)
} yield {
assert(
configEnv === configProp,
s"envi: ${configEnv.show}\nprop: ${configProp.show}"
)
assert(
configEnv === expected,
s"envi: ${configEnv.show}\nexpe: ${expected.show}"
)
}
}
}
Loading

0 comments on commit 63dc687

Please sign in to comment.