Skip to content

Commit

Permalink
Add workerMetricList support
Browse files Browse the repository at this point in the history
  • Loading branch information
Eric Meisel committed Jan 27, 2025
1 parent de3d22a commit 513bf25
Show file tree
Hide file tree
Showing 3 changed files with 51 additions and 9 deletions.
41 changes: 34 additions & 7 deletions kcl-ciris/src/main/scala/kinesis4cats/kcl/ciris/KCLCiris.scala
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ import software.amazon.kinesis.processor.SingleStreamTracker
import software.amazon.kinesis.retrieval.fanout.FanOutConfig
import software.amazon.kinesis.retrieval.polling.PollingConfig
import software.amazon.kinesis.retrieval.{AggregatorUtil, RetrievalConfig}
import software.amazon.kinesis.worker.metric.WorkerMetric

import kinesis4cats.Utils
import kinesis4cats.ciris.CirisReader
Expand Down Expand Up @@ -106,6 +107,10 @@ object KCLCiris {
* @param LE
* [[kinesis4cats.kcl.RecordProcessor.LogEncoders RecordProcessor.LogEncoders]]
* for encoding structured logs
* @param workerMetrics
* List of
* [[https://github.com/awslabs/amazon-kinesis-client/blob/master/amazon-kinesis-client/src/main/java/software/amazon/kinesis/worker/metric/WorkerMetric.java WorkerMetrics]]
* for the application
* @return
* [[cats.effect.Resource Resource]] containing the
* [[kinesis4cats.kcl.KCLConsumer KCLConsumer]]
Expand All @@ -130,7 +135,8 @@ object KCLCiris {
glueSchemaRegistryDeserializer: Option[GlueSchemaRegistryDeserializer] =
None,
encoders: RecordProcessor.LogEncoders = RecordProcessor.LogEncoders.show,
managedClients: Boolean = true
managedClients: Boolean = true,
workerMetrics: Option[List[WorkerMetric]] = None
)(cb: List[CommittableRecord[F]] => F[Unit])(implicit
F: Async[F]
): Resource[F, KCLConsumer[F]] = for {
Expand Down Expand Up @@ -159,6 +165,7 @@ object KCLCiris {
tableCreatorCallback,
leaseManagementFactory,
leaseExecutorService,
workerMetrics,
aggregatorUtil,
taskExecutionListener,
metricsFactory,
Expand Down Expand Up @@ -197,6 +204,10 @@ object KCLCiris {
* @param leaseExecutorService
* [[https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/ExecutorService.html ExecutorService]]
* for the lease management
* @param workerMetrics
* List of
* [[https://github.com/awslabs/amazon-kinesis-client/blob/master/amazon-kinesis-client/src/main/java/software/amazon/kinesis/worker/metric/WorkerMetric.java WorkerMetrics]]
* for the application
* @param aggregatorUtil
* [[https://github.com/awslabs/amazon-kinesis-client/blob/master/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/AggregatorUtil.java AggregatorUtil]]
* @param taskExecutionListener
Expand Down Expand Up @@ -230,6 +241,7 @@ object KCLCiris {
tableCreatorCallback: Option[TableCreatorCallback],
leaseManagementFactory: Option[LeaseManagementFactory],
leaseExecutorService: Option[ExecutorService],
workerMetrics: Option[List[WorkerMetric]],
aggregatorUtil: Option[AggregatorUtil],
taskExecutionListener: Option[TaskExecutionListener],
metricsFactory: Option[MetricsFactory],
Expand All @@ -252,7 +264,8 @@ object KCLCiris {
customShardDetectorProvider,
tableCreatorCallback,
leaseManagementFactory,
leaseExecutorService
leaseExecutorService,
workerMetrics
)
lifecycleConfig <- Lifecycle
.resource[F](prefix, aggregatorUtil, taskExecutionListener)
Expand Down Expand Up @@ -498,9 +511,13 @@ object KCLCiris {
* [[https://github.com/awslabs/amazon-kinesis-client/blob/master/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/HierarchicalShardSyncer.java HierarchicalShardSyncer]]
* @param leaseManagementFactory
* [[https://github.com/awslabs/amazon-kinesis-client/blob/master/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseManagementFactory.java LeaseManagementFactory]]
* @param leaseExecutorService
* @param executorService
* [[https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/ExecutorService.html ExecutorService]]
* for the lease management
* @param workerMetrics
* List of
* [[https://github.com/awslabs/amazon-kinesis-client/blob/master/amazon-kinesis-client/src/main/java/software/amazon/kinesis/worker/metric/WorkerMetric.java WorkerMetrics]]
* for the application
* @return
* [[https://cir.is/api/ciris/ConfigDecoder.html ConfigDecoder]] of
* [[https://github.com/awslabs/amazon-kinesis-client/blob/master/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseManagementConfig.java LeaseManagementConfig]]
Expand All @@ -512,7 +529,8 @@ object KCLCiris {
customShardDetectorProvider: Option[StreamConfig => ShardDetector],
tableCreatorCallback: Option[TableCreatorCallback],
leaseManagementFactory: Option[LeaseManagementFactory],
executorService: Option[ExecutorService]
executorService: Option[ExecutorService],
workerMetrics: Option[List[WorkerMetric]]
): ConfigValue[Effect, LeaseManagementConfig] = for {
appName <- Common.readAppName(prefix)
tableName <- CirisReader
Expand Down Expand Up @@ -928,6 +946,9 @@ object KCLCiris {
_.varianceBalancingFrequency(_)
)
.maybeTransform(workerMetricsEMAAlpha)(_.workerMetricsEMAAlpha(_))
.maybeTransform(workerMetrics) { (conf, workerMetricList) =>
conf.workerMetricList(workerMetricList.asJava)
}
.workerMetricsTableConfig(workerMetricsTableConfig)
leasesRecoveryAuditorExecutionFrequencyMillis <- CirisReader
.readOptional[Duration](
Expand Down Expand Up @@ -1028,9 +1049,13 @@ object KCLCiris {
* [[https://github.com/awslabs/amazon-kinesis-client/blob/master/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/HierarchicalShardSyncer.java HierarchicalShardSyncer]]
* @param leaseManagementFactory
* [[https://github.com/awslabs/amazon-kinesis-client/blob/master/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseManagementFactory.java LeaseManagementFactory]]
* @param leaseExecutorService
* @param executorService
* [[https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/ExecutorService.html ExecutorService]]
* for the lease management
* @param workerMetrics
* List of
* [[https://github.com/awslabs/amazon-kinesis-client/blob/master/amazon-kinesis-client/src/main/java/software/amazon/kinesis/worker/metric/WorkerMetric.java WorkerMetrics]]
* for the application
* @param F
* [[cats.effect.Async Async]]
* @return
Expand All @@ -1044,15 +1069,17 @@ object KCLCiris {
customShardDetectorProvider: Option[StreamConfig => ShardDetector],
tableCreatorCallback: Option[TableCreatorCallback],
leaseManagementFactory: Option[LeaseManagementFactory],
executorService: Option[ExecutorService]
executorService: Option[ExecutorService],
workerMetrics: Option[List[WorkerMetric]]
)(implicit F: Async[F]): Resource[F, LeaseManagementConfig] = read(
dynamoClient,
kinesisClient,
prefix,
customShardDetectorProvider,
tableCreatorCallback,
leaseManagementFactory,
executorService
executorService,
workerMetrics
).resource[F]
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ import software.amazon.kinesis.leases.dynamodb.TableCreatorCallback
import software.amazon.kinesis.lifecycle._
import software.amazon.kinesis.metrics._
import software.amazon.kinesis.retrieval.AggregatorUtil
import software.amazon.kinesis.worker.metric.WorkerMetric

import kinesis4cats.ciris.CirisReader
import kinesis4cats.instances.ciris._
Expand Down Expand Up @@ -100,6 +101,10 @@ object KCLCirisFS2 {
* @param encoders
* [[kinesis4cats.kcl.RecordProcessor.LogEncoders RecordProcessor.LogEncoders]]
* for encoding structured logs
* @param workerMetrics
* List of
* [[https://github.com/awslabs/amazon-kinesis-client/blob/master/amazon-kinesis-client/src/main/java/software/amazon/kinesis/worker/metric/WorkerMetric.java WorkerMetrics]]
* for the application
* @return
* [[cats.effect.Resource Resource]] containing the
* [[kinesis4cats.kcl.fs2.KCLConsumerFS2 KCLConsumerFS2]]
Expand All @@ -124,7 +129,8 @@ object KCLCirisFS2 {
glueSchemaRegistryDeserializer: Option[GlueSchemaRegistryDeserializer] =
None,
encoders: RecordProcessor.LogEncoders = RecordProcessor.LogEncoders.show,
managedClients: Boolean = true
managedClients: Boolean = true,
workerMetrics: Option[List[WorkerMetric]] = None
)(implicit
F: Async[F],
P: Parallel[F]
Expand Down Expand Up @@ -154,6 +160,7 @@ object KCLCirisFS2 {
tableCreatorCallback,
leaseManagementFactory,
leaseExecutorService,
workerMetrics,
aggregatorUtil,
taskExecutionListener,
metricsFactory,
Expand Down Expand Up @@ -192,6 +199,10 @@ object KCLCirisFS2 {
* @param leaseExecutorService
* [[https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/ExecutorService.html ExecutorService]]
* for the lease management
* @param workerMetrics
* List of
* [[https://github.com/awslabs/amazon-kinesis-client/blob/master/amazon-kinesis-client/src/main/java/software/amazon/kinesis/worker/metric/WorkerMetric.java WorkerMetrics]]
* for the application
* @param aggregatorUtil
* [[https://github.com/awslabs/amazon-kinesis-client/blob/master/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/AggregatorUtil.java AggregatorUtil]]
* @param taskExecutionListener
Expand Down Expand Up @@ -225,6 +236,7 @@ object KCLCirisFS2 {
tableCreatorCallback: Option[TableCreatorCallback],
leaseManagementFactory: Option[LeaseManagementFactory],
leaseExecutorService: Option[ExecutorService],
workerMetrics: Option[List[WorkerMetric]],
aggregatorUtil: Option[AggregatorUtil],
taskExecutionListener: Option[TaskExecutionListener],
metricsFactory: Option[MetricsFactory],
Expand Down Expand Up @@ -255,7 +267,8 @@ object KCLCirisFS2 {
customShardDetectorProvider,
tableCreatorCallback,
leaseManagementFactory,
leaseExecutorService
leaseExecutorService,
workerMetrics
)
lifecycleConfig <- KCLCiris.Lifecycle
.resource[F](prefix, aggregatorUtil, taskExecutionListener)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ class KCLCirisSpec extends munit.CatsEffectSuite {
None,
None,
None,
None,
None
)
.load[IO]
Expand All @@ -114,6 +115,7 @@ class KCLCirisSpec extends munit.CatsEffectSuite {
None,
None,
None,
None,
None
)
.load[IO]
Expand Down

0 comments on commit 513bf25

Please sign in to comment.