From 975d182b209608d3b8b9ca3a4210503000939882 Mon Sep 17 00:00:00 2001 From: Roma Frolov <98967567+roma-frolov@users.noreply.github.com> Date: Sun, 20 Oct 2024 19:46:37 +0300 Subject: [PATCH] Feature: Prometheus Middleware (#1791) * base prometheus middleware * rabbit prometheus middleware * fixed always message ack in the absence of errors * small refactoring, redis metrics * kafka prometheus * str statuses -> StrEnums * fix kafka tests * confluent prometheus * nats prometheus * edit message count in process & fix settings provider for Nats KV and Nats OS * importorskip in tests * doc & ruff & mypy * docs: generate API References * clarifications added * pushback tests fixed * lint fixed * lint fixed * number of tests reduced & fix confluent mark * test cases renamed * confluent tests fixed * documentation addition * updated SUMMARY.md * docs: generate API References * trailing-whitespace in index.md * deleted pragma: no-cover * tests for getting started code examples * Revert "tests for getting started code examples" This reverts commit b54950dc771b10b1102c52db68449993942e3024. * MetricsManager abstraction added and used in middleware * fixed an error when there was no exchange in tests * documentation has been corrected due to the renaming of metrics * docs: generate API References * fixed type annotations * added app_name label in index.md * removed extra space in metric doc * fixed status in add_published_message * fixed buckets type * test for implementing metrics on a real prometheus_client * small tests refactoring * changed metrics_prefix default value * **kwargs in get_broker mtehod * readable params in tests * simplified annotation * EMPTY as metrics_prefix default value * fixed test_observe_received_messages_size * apply_types=False as default in get_broker * external links in prometheus/index.md * fixed doc * fixed params naming * fix: revert acknowledgement changes * lint: fix precommit * chore: bump version * app_name EMPTY is default --------- Co-authored-by: roma-frolov Co-authored-by: Nikita Pastukhov Co-authored-by: Pastukhov Nikita --- docs/docs/SUMMARY.md | 60 ++ .../prometheus/KafkaPrometheusMiddleware.md | 11 + .../middleware/KafkaPrometheusMiddleware.md | 11 + .../BaseConfluentMetricsSettingsProvider.md | 11 + .../BatchConfluentMetricsSettingsProvider.md | 11 + .../ConfluentMetricsSettingsProvider.md | 11 + .../provider/settings_provider_factory.md | 11 + .../prometheus/KafkaPrometheusMiddleware.md | 11 + .../middleware/KafkaPrometheusMiddleware.md | 11 + .../BaseKafkaMetricsSettingsProvider.md | 11 + .../BatchKafkaMetricsSettingsProvider.md | 11 + .../provider/KafkaMetricsSettingsProvider.md | 11 + .../provider/settings_provider_factory.md | 11 + .../prometheus/NatsPrometheusMiddleware.md | 11 + .../middleware/NatsPrometheusMiddleware.md | 11 + .../BaseNatsMetricsSettingsProvider.md | 11 + .../BatchNatsMetricsSettingsProvider.md | 11 + .../provider/NatsMetricsSettingsProvider.md | 11 + .../provider/settings_provider_factory.md | 11 + .../prometheus/BasePrometheusMiddleware.md | 11 + .../api/faststream/prometheus/ConsumeAttrs.md | 11 + .../prometheus/MetricsSettingsProvider.md | 11 + .../prometheus/container/MetricsContainer.md | 11 + .../prometheus/manager/MetricsManager.md | 11 + .../middleware/BasePrometheusMiddleware.md | 11 + .../middleware/PrometheusMiddleware.md | 11 + .../provider/MetricsSettingsProvider.md | 11 + .../prometheus/types/ConsumeAttrs.md | 11 + .../prometheus/types/ProcessingStatus.md | 11 + .../prometheus/types/PublishingStatus.md | 11 + .../prometheus/RabbitPrometheusMiddleware.md | 11 + .../middleware/RabbitPrometheusMiddleware.md | 11 + .../provider/RabbitMetricsSettingsProvider.md | 11 + .../prometheus/RedisPrometheusMiddleware.md | 11 + .../middleware/RedisPrometheusMiddleware.md | 11 + .../BaseRedisMetricsSettingsProvider.md | 11 + .../BatchRedisMetricsSettingsProvider.md | 11 + .../provider/RedisMetricsSettingsProvider.md | 11 + .../provider/settings_provider_factory.md | 11 + .../en/getting-started/prometheus/index.md | 82 +++ docs/docs/navigation_template.txt | 1 + .../getting_started/prometheus/__init__.py | 0 .../getting_started/prometheus/confluent.py | 13 + .../prometheus/confluent_asgi.py | 18 + .../getting_started/prometheus/kafka.py | 13 + .../getting_started/prometheus/kafka_asgi.py | 18 + .../getting_started/prometheus/nats.py | 13 + .../getting_started/prometheus/nats_asgi.py | 18 + .../getting_started/prometheus/rabbit.py | 13 + .../getting_started/prometheus/rabbit_asgi.py | 18 + .../getting_started/prometheus/redis.py | 13 + .../getting_started/prometheus/redis_asgi.py | 18 + docs/includes/getting_started/prometheus/1.md | 24 + docs/includes/getting_started/prometheus/2.md | 24 + faststream/__about__.py | 2 +- faststream/broker/message.py | 9 +- faststream/confluent/message.py | 4 +- faststream/confluent/prometheus/__init__.py | 3 + faststream/confluent/prometheus/middleware.py | 26 + faststream/confluent/prometheus/provider.py | 64 ++ faststream/kafka/message.py | 4 +- faststream/kafka/prometheus/__init__.py | 3 + faststream/kafka/prometheus/middleware.py | 26 + faststream/kafka/prometheus/provider.py | 64 ++ faststream/nats/message.py | 6 +- faststream/nats/prometheus/__init__.py | 3 + faststream/nats/prometheus/middleware.py | 26 + faststream/nats/prometheus/provider.py | 66 ++ faststream/prometheus/__init__.py | 9 + faststream/prometheus/consts.py | 17 + faststream/prometheus/container.py | 100 +++ faststream/prometheus/manager.py | 131 ++++ faststream/prometheus/middleware.py | 202 ++++++ faststream/prometheus/provider.py | 22 + faststream/prometheus/types.py | 21 + faststream/rabbit/prometheus/__init__.py | 3 + faststream/rabbit/prometheus/middleware.py | 26 + faststream/rabbit/prometheus/provider.py | 44 ++ faststream/redis/message.py | 2 +- faststream/redis/prometheus/__init__.py | 3 + faststream/redis/prometheus/middleware.py | 26 + faststream/redis/prometheus/provider.py | 63 ++ pyproject.toml | 4 +- tests/brokers/test_pushback.py | 2 +- tests/prometheus/__init__.py | 0 tests/prometheus/basic.py | 204 ++++++ tests/prometheus/confluent/__init__.py | 3 + tests/prometheus/confluent/test_confluent.py | 79 +++ tests/prometheus/kafka/__init__.py | 3 + tests/prometheus/kafka/test_kafka.py | 82 +++ tests/prometheus/nats/__init__.py | 3 + tests/prometheus/nats/test_nats.py | 86 +++ tests/prometheus/rabbit/__init__.py | 3 + tests/prometheus/rabbit/test_rabbit.py | 42 ++ tests/prometheus/redis/__init__.py | 3 + tests/prometheus/redis/test_redis.py | 77 +++ tests/prometheus/test_metrics.py | 644 ++++++++++++++++++ 97 files changed, 2960 insertions(+), 14 deletions(-) create mode 100644 docs/docs/en/api/faststream/confluent/prometheus/KafkaPrometheusMiddleware.md create mode 100644 docs/docs/en/api/faststream/confluent/prometheus/middleware/KafkaPrometheusMiddleware.md create mode 100644 docs/docs/en/api/faststream/confluent/prometheus/provider/BaseConfluentMetricsSettingsProvider.md create mode 100644 docs/docs/en/api/faststream/confluent/prometheus/provider/BatchConfluentMetricsSettingsProvider.md create mode 100644 docs/docs/en/api/faststream/confluent/prometheus/provider/ConfluentMetricsSettingsProvider.md create mode 100644 docs/docs/en/api/faststream/confluent/prometheus/provider/settings_provider_factory.md create mode 100644 docs/docs/en/api/faststream/kafka/prometheus/KafkaPrometheusMiddleware.md create mode 100644 docs/docs/en/api/faststream/kafka/prometheus/middleware/KafkaPrometheusMiddleware.md create mode 100644 docs/docs/en/api/faststream/kafka/prometheus/provider/BaseKafkaMetricsSettingsProvider.md create mode 100644 docs/docs/en/api/faststream/kafka/prometheus/provider/BatchKafkaMetricsSettingsProvider.md create mode 100644 docs/docs/en/api/faststream/kafka/prometheus/provider/KafkaMetricsSettingsProvider.md create mode 100644 docs/docs/en/api/faststream/kafka/prometheus/provider/settings_provider_factory.md create mode 100644 docs/docs/en/api/faststream/nats/prometheus/NatsPrometheusMiddleware.md create mode 100644 docs/docs/en/api/faststream/nats/prometheus/middleware/NatsPrometheusMiddleware.md create mode 100644 docs/docs/en/api/faststream/nats/prometheus/provider/BaseNatsMetricsSettingsProvider.md create mode 100644 docs/docs/en/api/faststream/nats/prometheus/provider/BatchNatsMetricsSettingsProvider.md create mode 100644 docs/docs/en/api/faststream/nats/prometheus/provider/NatsMetricsSettingsProvider.md create mode 100644 docs/docs/en/api/faststream/nats/prometheus/provider/settings_provider_factory.md create mode 100644 docs/docs/en/api/faststream/prometheus/BasePrometheusMiddleware.md create mode 100644 docs/docs/en/api/faststream/prometheus/ConsumeAttrs.md create mode 100644 docs/docs/en/api/faststream/prometheus/MetricsSettingsProvider.md create mode 100644 docs/docs/en/api/faststream/prometheus/container/MetricsContainer.md create mode 100644 docs/docs/en/api/faststream/prometheus/manager/MetricsManager.md create mode 100644 docs/docs/en/api/faststream/prometheus/middleware/BasePrometheusMiddleware.md create mode 100644 docs/docs/en/api/faststream/prometheus/middleware/PrometheusMiddleware.md create mode 100644 docs/docs/en/api/faststream/prometheus/provider/MetricsSettingsProvider.md create mode 100644 docs/docs/en/api/faststream/prometheus/types/ConsumeAttrs.md create mode 100644 docs/docs/en/api/faststream/prometheus/types/ProcessingStatus.md create mode 100644 docs/docs/en/api/faststream/prometheus/types/PublishingStatus.md create mode 100644 docs/docs/en/api/faststream/rabbit/prometheus/RabbitPrometheusMiddleware.md create mode 100644 docs/docs/en/api/faststream/rabbit/prometheus/middleware/RabbitPrometheusMiddleware.md create mode 100644 docs/docs/en/api/faststream/rabbit/prometheus/provider/RabbitMetricsSettingsProvider.md create mode 100644 docs/docs/en/api/faststream/redis/prometheus/RedisPrometheusMiddleware.md create mode 100644 docs/docs/en/api/faststream/redis/prometheus/middleware/RedisPrometheusMiddleware.md create mode 100644 docs/docs/en/api/faststream/redis/prometheus/provider/BaseRedisMetricsSettingsProvider.md create mode 100644 docs/docs/en/api/faststream/redis/prometheus/provider/BatchRedisMetricsSettingsProvider.md create mode 100644 docs/docs/en/api/faststream/redis/prometheus/provider/RedisMetricsSettingsProvider.md create mode 100644 docs/docs/en/api/faststream/redis/prometheus/provider/settings_provider_factory.md create mode 100644 docs/docs/en/getting-started/prometheus/index.md create mode 100644 docs/docs_src/getting_started/prometheus/__init__.py create mode 100644 docs/docs_src/getting_started/prometheus/confluent.py create mode 100644 docs/docs_src/getting_started/prometheus/confluent_asgi.py create mode 100644 docs/docs_src/getting_started/prometheus/kafka.py create mode 100644 docs/docs_src/getting_started/prometheus/kafka_asgi.py create mode 100644 docs/docs_src/getting_started/prometheus/nats.py create mode 100644 docs/docs_src/getting_started/prometheus/nats_asgi.py create mode 100644 docs/docs_src/getting_started/prometheus/rabbit.py create mode 100644 docs/docs_src/getting_started/prometheus/rabbit_asgi.py create mode 100644 docs/docs_src/getting_started/prometheus/redis.py create mode 100644 docs/docs_src/getting_started/prometheus/redis_asgi.py create mode 100644 docs/includes/getting_started/prometheus/1.md create mode 100644 docs/includes/getting_started/prometheus/2.md create mode 100644 faststream/confluent/prometheus/__init__.py create mode 100644 faststream/confluent/prometheus/middleware.py create mode 100644 faststream/confluent/prometheus/provider.py create mode 100644 faststream/kafka/prometheus/__init__.py create mode 100644 faststream/kafka/prometheus/middleware.py create mode 100644 faststream/kafka/prometheus/provider.py create mode 100644 faststream/nats/prometheus/__init__.py create mode 100644 faststream/nats/prometheus/middleware.py create mode 100644 faststream/nats/prometheus/provider.py create mode 100644 faststream/prometheus/__init__.py create mode 100644 faststream/prometheus/consts.py create mode 100644 faststream/prometheus/container.py create mode 100644 faststream/prometheus/manager.py create mode 100644 faststream/prometheus/middleware.py create mode 100644 faststream/prometheus/provider.py create mode 100644 faststream/prometheus/types.py create mode 100644 faststream/rabbit/prometheus/__init__.py create mode 100644 faststream/rabbit/prometheus/middleware.py create mode 100644 faststream/rabbit/prometheus/provider.py create mode 100644 faststream/redis/prometheus/__init__.py create mode 100644 faststream/redis/prometheus/middleware.py create mode 100644 faststream/redis/prometheus/provider.py create mode 100644 tests/prometheus/__init__.py create mode 100644 tests/prometheus/basic.py create mode 100644 tests/prometheus/confluent/__init__.py create mode 100644 tests/prometheus/confluent/test_confluent.py create mode 100644 tests/prometheus/kafka/__init__.py create mode 100644 tests/prometheus/kafka/test_kafka.py create mode 100644 tests/prometheus/nats/__init__.py create mode 100644 tests/prometheus/nats/test_nats.py create mode 100644 tests/prometheus/rabbit/__init__.py create mode 100644 tests/prometheus/rabbit/test_rabbit.py create mode 100644 tests/prometheus/redis/__init__.py create mode 100644 tests/prometheus/redis/test_redis.py create mode 100644 tests/prometheus/test_metrics.py diff --git a/docs/docs/SUMMARY.md b/docs/docs/SUMMARY.md index 6f998be4f8..86ff1025b7 100644 --- a/docs/docs/SUMMARY.md +++ b/docs/docs/SUMMARY.md @@ -44,6 +44,7 @@ search: - [CLI](getting-started/cli/index.md) - [ASGI](getting-started/asgi.md) - [OpenTelemetry](getting-started/opentelemetry/index.md) + - [Prometheus](getting-started/prometheus/index.md) - [Logging](getting-started/logging.md) - [Config Management](getting-started/config/index.md) - [Task Scheduling](scheduling.md) @@ -523,6 +524,15 @@ search: - [telemetry_attributes_provider_factory](api/faststream/confluent/opentelemetry/provider/telemetry_attributes_provider_factory.md) - parser - [AsyncConfluentParser](api/faststream/confluent/parser/AsyncConfluentParser.md) + - prometheus + - [KafkaPrometheusMiddleware](api/faststream/confluent/prometheus/KafkaPrometheusMiddleware.md) + - middleware + - [KafkaPrometheusMiddleware](api/faststream/confluent/prometheus/middleware/KafkaPrometheusMiddleware.md) + - provider + - [BaseConfluentMetricsSettingsProvider](api/faststream/confluent/prometheus/provider/BaseConfluentMetricsSettingsProvider.md) + - [BatchConfluentMetricsSettingsProvider](api/faststream/confluent/prometheus/provider/BatchConfluentMetricsSettingsProvider.md) + - [ConfluentMetricsSettingsProvider](api/faststream/confluent/prometheus/provider/ConfluentMetricsSettingsProvider.md) + - [settings_provider_factory](api/faststream/confluent/prometheus/provider/settings_provider_factory.md) - publisher - asyncapi - [AsyncAPIBatchPublisher](api/faststream/confluent/publisher/asyncapi/AsyncAPIBatchPublisher.md) @@ -619,6 +629,15 @@ search: - parser - [AioKafkaBatchParser](api/faststream/kafka/parser/AioKafkaBatchParser.md) - [AioKafkaParser](api/faststream/kafka/parser/AioKafkaParser.md) + - prometheus + - [KafkaPrometheusMiddleware](api/faststream/kafka/prometheus/KafkaPrometheusMiddleware.md) + - middleware + - [KafkaPrometheusMiddleware](api/faststream/kafka/prometheus/middleware/KafkaPrometheusMiddleware.md) + - provider + - [BaseKafkaMetricsSettingsProvider](api/faststream/kafka/prometheus/provider/BaseKafkaMetricsSettingsProvider.md) + - [BatchKafkaMetricsSettingsProvider](api/faststream/kafka/prometheus/provider/BatchKafkaMetricsSettingsProvider.md) + - [KafkaMetricsSettingsProvider](api/faststream/kafka/prometheus/provider/KafkaMetricsSettingsProvider.md) + - [settings_provider_factory](api/faststream/kafka/prometheus/provider/settings_provider_factory.md) - publisher - asyncapi - [AsyncAPIBatchPublisher](api/faststream/kafka/publisher/asyncapi/AsyncAPIBatchPublisher.md) @@ -732,6 +751,15 @@ search: - [NatsBaseParser](api/faststream/nats/parser/NatsBaseParser.md) - [NatsParser](api/faststream/nats/parser/NatsParser.md) - [ObjParser](api/faststream/nats/parser/ObjParser.md) + - prometheus + - [NatsPrometheusMiddleware](api/faststream/nats/prometheus/NatsPrometheusMiddleware.md) + - middleware + - [NatsPrometheusMiddleware](api/faststream/nats/prometheus/middleware/NatsPrometheusMiddleware.md) + - provider + - [BaseNatsMetricsSettingsProvider](api/faststream/nats/prometheus/provider/BaseNatsMetricsSettingsProvider.md) + - [BatchNatsMetricsSettingsProvider](api/faststream/nats/prometheus/provider/BatchNatsMetricsSettingsProvider.md) + - [NatsMetricsSettingsProvider](api/faststream/nats/prometheus/provider/NatsMetricsSettingsProvider.md) + - [settings_provider_factory](api/faststream/nats/prometheus/provider/settings_provider_factory.md) - publisher - asyncapi - [AsyncAPIPublisher](api/faststream/nats/publisher/asyncapi/AsyncAPIPublisher.md) @@ -810,6 +838,23 @@ search: - [TelemetryMiddleware](api/faststream/opentelemetry/middleware/TelemetryMiddleware.md) - provider - [TelemetrySettingsProvider](api/faststream/opentelemetry/provider/TelemetrySettingsProvider.md) + - prometheus + - [BasePrometheusMiddleware](api/faststream/prometheus/BasePrometheusMiddleware.md) + - [ConsumeAttrs](api/faststream/prometheus/ConsumeAttrs.md) + - [MetricsSettingsProvider](api/faststream/prometheus/MetricsSettingsProvider.md) + - container + - [MetricsContainer](api/faststream/prometheus/container/MetricsContainer.md) + - manager + - [MetricsManager](api/faststream/prometheus/manager/MetricsManager.md) + - middleware + - [BasePrometheusMiddleware](api/faststream/prometheus/middleware/BasePrometheusMiddleware.md) + - [PrometheusMiddleware](api/faststream/prometheus/middleware/PrometheusMiddleware.md) + - provider + - [MetricsSettingsProvider](api/faststream/prometheus/provider/MetricsSettingsProvider.md) + - types + - [ConsumeAttrs](api/faststream/prometheus/types/ConsumeAttrs.md) + - [ProcessingStatus](api/faststream/prometheus/types/ProcessingStatus.md) + - [PublishingStatus](api/faststream/prometheus/types/PublishingStatus.md) - rabbit - [ExchangeType](api/faststream/rabbit/ExchangeType.md) - [RabbitBroker](api/faststream/rabbit/RabbitBroker.md) @@ -848,6 +893,12 @@ search: - [RabbitTelemetrySettingsProvider](api/faststream/rabbit/opentelemetry/provider/RabbitTelemetrySettingsProvider.md) - parser - [AioPikaParser](api/faststream/rabbit/parser/AioPikaParser.md) + - prometheus + - [RabbitPrometheusMiddleware](api/faststream/rabbit/prometheus/RabbitPrometheusMiddleware.md) + - middleware + - [RabbitPrometheusMiddleware](api/faststream/rabbit/prometheus/middleware/RabbitPrometheusMiddleware.md) + - provider + - [RabbitMetricsSettingsProvider](api/faststream/rabbit/prometheus/provider/RabbitMetricsSettingsProvider.md) - publisher - asyncapi - [AsyncAPIPublisher](api/faststream/rabbit/publisher/asyncapi/AsyncAPIPublisher.md) @@ -949,6 +1000,15 @@ search: - [RedisPubSubParser](api/faststream/redis/parser/RedisPubSubParser.md) - [RedisStreamParser](api/faststream/redis/parser/RedisStreamParser.md) - [SimpleParser](api/faststream/redis/parser/SimpleParser.md) + - prometheus + - [RedisPrometheusMiddleware](api/faststream/redis/prometheus/RedisPrometheusMiddleware.md) + - middleware + - [RedisPrometheusMiddleware](api/faststream/redis/prometheus/middleware/RedisPrometheusMiddleware.md) + - provider + - [BaseRedisMetricsSettingsProvider](api/faststream/redis/prometheus/provider/BaseRedisMetricsSettingsProvider.md) + - [BatchRedisMetricsSettingsProvider](api/faststream/redis/prometheus/provider/BatchRedisMetricsSettingsProvider.md) + - [RedisMetricsSettingsProvider](api/faststream/redis/prometheus/provider/RedisMetricsSettingsProvider.md) + - [settings_provider_factory](api/faststream/redis/prometheus/provider/settings_provider_factory.md) - publisher - asyncapi - [AsyncAPIChannelPublisher](api/faststream/redis/publisher/asyncapi/AsyncAPIChannelPublisher.md) diff --git a/docs/docs/en/api/faststream/confluent/prometheus/KafkaPrometheusMiddleware.md b/docs/docs/en/api/faststream/confluent/prometheus/KafkaPrometheusMiddleware.md new file mode 100644 index 0000000000..e84e84acc3 --- /dev/null +++ b/docs/docs/en/api/faststream/confluent/prometheus/KafkaPrometheusMiddleware.md @@ -0,0 +1,11 @@ +--- +# 0.5 - API +# 2 - Release +# 3 - Contributing +# 5 - Template Page +# 10 - Default +search: + boost: 0.5 +--- + +::: faststream.confluent.prometheus.KafkaPrometheusMiddleware diff --git a/docs/docs/en/api/faststream/confluent/prometheus/middleware/KafkaPrometheusMiddleware.md b/docs/docs/en/api/faststream/confluent/prometheus/middleware/KafkaPrometheusMiddleware.md new file mode 100644 index 0000000000..6603893f74 --- /dev/null +++ b/docs/docs/en/api/faststream/confluent/prometheus/middleware/KafkaPrometheusMiddleware.md @@ -0,0 +1,11 @@ +--- +# 0.5 - API +# 2 - Release +# 3 - Contributing +# 5 - Template Page +# 10 - Default +search: + boost: 0.5 +--- + +::: faststream.confluent.prometheus.middleware.KafkaPrometheusMiddleware diff --git a/docs/docs/en/api/faststream/confluent/prometheus/provider/BaseConfluentMetricsSettingsProvider.md b/docs/docs/en/api/faststream/confluent/prometheus/provider/BaseConfluentMetricsSettingsProvider.md new file mode 100644 index 0000000000..27c186c098 --- /dev/null +++ b/docs/docs/en/api/faststream/confluent/prometheus/provider/BaseConfluentMetricsSettingsProvider.md @@ -0,0 +1,11 @@ +--- +# 0.5 - API +# 2 - Release +# 3 - Contributing +# 5 - Template Page +# 10 - Default +search: + boost: 0.5 +--- + +::: faststream.confluent.prometheus.provider.BaseConfluentMetricsSettingsProvider diff --git a/docs/docs/en/api/faststream/confluent/prometheus/provider/BatchConfluentMetricsSettingsProvider.md b/docs/docs/en/api/faststream/confluent/prometheus/provider/BatchConfluentMetricsSettingsProvider.md new file mode 100644 index 0000000000..f784a64e9f --- /dev/null +++ b/docs/docs/en/api/faststream/confluent/prometheus/provider/BatchConfluentMetricsSettingsProvider.md @@ -0,0 +1,11 @@ +--- +# 0.5 - API +# 2 - Release +# 3 - Contributing +# 5 - Template Page +# 10 - Default +search: + boost: 0.5 +--- + +::: faststream.confluent.prometheus.provider.BatchConfluentMetricsSettingsProvider diff --git a/docs/docs/en/api/faststream/confluent/prometheus/provider/ConfluentMetricsSettingsProvider.md b/docs/docs/en/api/faststream/confluent/prometheus/provider/ConfluentMetricsSettingsProvider.md new file mode 100644 index 0000000000..65f0a8348e --- /dev/null +++ b/docs/docs/en/api/faststream/confluent/prometheus/provider/ConfluentMetricsSettingsProvider.md @@ -0,0 +1,11 @@ +--- +# 0.5 - API +# 2 - Release +# 3 - Contributing +# 5 - Template Page +# 10 - Default +search: + boost: 0.5 +--- + +::: faststream.confluent.prometheus.provider.ConfluentMetricsSettingsProvider diff --git a/docs/docs/en/api/faststream/confluent/prometheus/provider/settings_provider_factory.md b/docs/docs/en/api/faststream/confluent/prometheus/provider/settings_provider_factory.md new file mode 100644 index 0000000000..78358f46e3 --- /dev/null +++ b/docs/docs/en/api/faststream/confluent/prometheus/provider/settings_provider_factory.md @@ -0,0 +1,11 @@ +--- +# 0.5 - API +# 2 - Release +# 3 - Contributing +# 5 - Template Page +# 10 - Default +search: + boost: 0.5 +--- + +::: faststream.confluent.prometheus.provider.settings_provider_factory diff --git a/docs/docs/en/api/faststream/kafka/prometheus/KafkaPrometheusMiddleware.md b/docs/docs/en/api/faststream/kafka/prometheus/KafkaPrometheusMiddleware.md new file mode 100644 index 0000000000..c2ffd5356a --- /dev/null +++ b/docs/docs/en/api/faststream/kafka/prometheus/KafkaPrometheusMiddleware.md @@ -0,0 +1,11 @@ +--- +# 0.5 - API +# 2 - Release +# 3 - Contributing +# 5 - Template Page +# 10 - Default +search: + boost: 0.5 +--- + +::: faststream.kafka.prometheus.KafkaPrometheusMiddleware diff --git a/docs/docs/en/api/faststream/kafka/prometheus/middleware/KafkaPrometheusMiddleware.md b/docs/docs/en/api/faststream/kafka/prometheus/middleware/KafkaPrometheusMiddleware.md new file mode 100644 index 0000000000..451b7080c0 --- /dev/null +++ b/docs/docs/en/api/faststream/kafka/prometheus/middleware/KafkaPrometheusMiddleware.md @@ -0,0 +1,11 @@ +--- +# 0.5 - API +# 2 - Release +# 3 - Contributing +# 5 - Template Page +# 10 - Default +search: + boost: 0.5 +--- + +::: faststream.kafka.prometheus.middleware.KafkaPrometheusMiddleware diff --git a/docs/docs/en/api/faststream/kafka/prometheus/provider/BaseKafkaMetricsSettingsProvider.md b/docs/docs/en/api/faststream/kafka/prometheus/provider/BaseKafkaMetricsSettingsProvider.md new file mode 100644 index 0000000000..0fd044f694 --- /dev/null +++ b/docs/docs/en/api/faststream/kafka/prometheus/provider/BaseKafkaMetricsSettingsProvider.md @@ -0,0 +1,11 @@ +--- +# 0.5 - API +# 2 - Release +# 3 - Contributing +# 5 - Template Page +# 10 - Default +search: + boost: 0.5 +--- + +::: faststream.kafka.prometheus.provider.BaseKafkaMetricsSettingsProvider diff --git a/docs/docs/en/api/faststream/kafka/prometheus/provider/BatchKafkaMetricsSettingsProvider.md b/docs/docs/en/api/faststream/kafka/prometheus/provider/BatchKafkaMetricsSettingsProvider.md new file mode 100644 index 0000000000..9bd01d5e71 --- /dev/null +++ b/docs/docs/en/api/faststream/kafka/prometheus/provider/BatchKafkaMetricsSettingsProvider.md @@ -0,0 +1,11 @@ +--- +# 0.5 - API +# 2 - Release +# 3 - Contributing +# 5 - Template Page +# 10 - Default +search: + boost: 0.5 +--- + +::: faststream.kafka.prometheus.provider.BatchKafkaMetricsSettingsProvider diff --git a/docs/docs/en/api/faststream/kafka/prometheus/provider/KafkaMetricsSettingsProvider.md b/docs/docs/en/api/faststream/kafka/prometheus/provider/KafkaMetricsSettingsProvider.md new file mode 100644 index 0000000000..ae7c490da8 --- /dev/null +++ b/docs/docs/en/api/faststream/kafka/prometheus/provider/KafkaMetricsSettingsProvider.md @@ -0,0 +1,11 @@ +--- +# 0.5 - API +# 2 - Release +# 3 - Contributing +# 5 - Template Page +# 10 - Default +search: + boost: 0.5 +--- + +::: faststream.kafka.prometheus.provider.KafkaMetricsSettingsProvider diff --git a/docs/docs/en/api/faststream/kafka/prometheus/provider/settings_provider_factory.md b/docs/docs/en/api/faststream/kafka/prometheus/provider/settings_provider_factory.md new file mode 100644 index 0000000000..1393fd9065 --- /dev/null +++ b/docs/docs/en/api/faststream/kafka/prometheus/provider/settings_provider_factory.md @@ -0,0 +1,11 @@ +--- +# 0.5 - API +# 2 - Release +# 3 - Contributing +# 5 - Template Page +# 10 - Default +search: + boost: 0.5 +--- + +::: faststream.kafka.prometheus.provider.settings_provider_factory diff --git a/docs/docs/en/api/faststream/nats/prometheus/NatsPrometheusMiddleware.md b/docs/docs/en/api/faststream/nats/prometheus/NatsPrometheusMiddleware.md new file mode 100644 index 0000000000..d9b179b0c4 --- /dev/null +++ b/docs/docs/en/api/faststream/nats/prometheus/NatsPrometheusMiddleware.md @@ -0,0 +1,11 @@ +--- +# 0.5 - API +# 2 - Release +# 3 - Contributing +# 5 - Template Page +# 10 - Default +search: + boost: 0.5 +--- + +::: faststream.nats.prometheus.NatsPrometheusMiddleware diff --git a/docs/docs/en/api/faststream/nats/prometheus/middleware/NatsPrometheusMiddleware.md b/docs/docs/en/api/faststream/nats/prometheus/middleware/NatsPrometheusMiddleware.md new file mode 100644 index 0000000000..7202731048 --- /dev/null +++ b/docs/docs/en/api/faststream/nats/prometheus/middleware/NatsPrometheusMiddleware.md @@ -0,0 +1,11 @@ +--- +# 0.5 - API +# 2 - Release +# 3 - Contributing +# 5 - Template Page +# 10 - Default +search: + boost: 0.5 +--- + +::: faststream.nats.prometheus.middleware.NatsPrometheusMiddleware diff --git a/docs/docs/en/api/faststream/nats/prometheus/provider/BaseNatsMetricsSettingsProvider.md b/docs/docs/en/api/faststream/nats/prometheus/provider/BaseNatsMetricsSettingsProvider.md new file mode 100644 index 0000000000..80742833bc --- /dev/null +++ b/docs/docs/en/api/faststream/nats/prometheus/provider/BaseNatsMetricsSettingsProvider.md @@ -0,0 +1,11 @@ +--- +# 0.5 - API +# 2 - Release +# 3 - Contributing +# 5 - Template Page +# 10 - Default +search: + boost: 0.5 +--- + +::: faststream.nats.prometheus.provider.BaseNatsMetricsSettingsProvider diff --git a/docs/docs/en/api/faststream/nats/prometheus/provider/BatchNatsMetricsSettingsProvider.md b/docs/docs/en/api/faststream/nats/prometheus/provider/BatchNatsMetricsSettingsProvider.md new file mode 100644 index 0000000000..163ebb7bc6 --- /dev/null +++ b/docs/docs/en/api/faststream/nats/prometheus/provider/BatchNatsMetricsSettingsProvider.md @@ -0,0 +1,11 @@ +--- +# 0.5 - API +# 2 - Release +# 3 - Contributing +# 5 - Template Page +# 10 - Default +search: + boost: 0.5 +--- + +::: faststream.nats.prometheus.provider.BatchNatsMetricsSettingsProvider diff --git a/docs/docs/en/api/faststream/nats/prometheus/provider/NatsMetricsSettingsProvider.md b/docs/docs/en/api/faststream/nats/prometheus/provider/NatsMetricsSettingsProvider.md new file mode 100644 index 0000000000..e5515a4cc5 --- /dev/null +++ b/docs/docs/en/api/faststream/nats/prometheus/provider/NatsMetricsSettingsProvider.md @@ -0,0 +1,11 @@ +--- +# 0.5 - API +# 2 - Release +# 3 - Contributing +# 5 - Template Page +# 10 - Default +search: + boost: 0.5 +--- + +::: faststream.nats.prometheus.provider.NatsMetricsSettingsProvider diff --git a/docs/docs/en/api/faststream/nats/prometheus/provider/settings_provider_factory.md b/docs/docs/en/api/faststream/nats/prometheus/provider/settings_provider_factory.md new file mode 100644 index 0000000000..aeaa7b26e0 --- /dev/null +++ b/docs/docs/en/api/faststream/nats/prometheus/provider/settings_provider_factory.md @@ -0,0 +1,11 @@ +--- +# 0.5 - API +# 2 - Release +# 3 - Contributing +# 5 - Template Page +# 10 - Default +search: + boost: 0.5 +--- + +::: faststream.nats.prometheus.provider.settings_provider_factory diff --git a/docs/docs/en/api/faststream/prometheus/BasePrometheusMiddleware.md b/docs/docs/en/api/faststream/prometheus/BasePrometheusMiddleware.md new file mode 100644 index 0000000000..1f5cf6a1d4 --- /dev/null +++ b/docs/docs/en/api/faststream/prometheus/BasePrometheusMiddleware.md @@ -0,0 +1,11 @@ +--- +# 0.5 - API +# 2 - Release +# 3 - Contributing +# 5 - Template Page +# 10 - Default +search: + boost: 0.5 +--- + +::: faststream.prometheus.BasePrometheusMiddleware diff --git a/docs/docs/en/api/faststream/prometheus/ConsumeAttrs.md b/docs/docs/en/api/faststream/prometheus/ConsumeAttrs.md new file mode 100644 index 0000000000..ad8e536b7a --- /dev/null +++ b/docs/docs/en/api/faststream/prometheus/ConsumeAttrs.md @@ -0,0 +1,11 @@ +--- +# 0.5 - API +# 2 - Release +# 3 - Contributing +# 5 - Template Page +# 10 - Default +search: + boost: 0.5 +--- + +::: faststream.prometheus.ConsumeAttrs diff --git a/docs/docs/en/api/faststream/prometheus/MetricsSettingsProvider.md b/docs/docs/en/api/faststream/prometheus/MetricsSettingsProvider.md new file mode 100644 index 0000000000..0f7405e44d --- /dev/null +++ b/docs/docs/en/api/faststream/prometheus/MetricsSettingsProvider.md @@ -0,0 +1,11 @@ +--- +# 0.5 - API +# 2 - Release +# 3 - Contributing +# 5 - Template Page +# 10 - Default +search: + boost: 0.5 +--- + +::: faststream.prometheus.MetricsSettingsProvider diff --git a/docs/docs/en/api/faststream/prometheus/container/MetricsContainer.md b/docs/docs/en/api/faststream/prometheus/container/MetricsContainer.md new file mode 100644 index 0000000000..009d88d263 --- /dev/null +++ b/docs/docs/en/api/faststream/prometheus/container/MetricsContainer.md @@ -0,0 +1,11 @@ +--- +# 0.5 - API +# 2 - Release +# 3 - Contributing +# 5 - Template Page +# 10 - Default +search: + boost: 0.5 +--- + +::: faststream.prometheus.container.MetricsContainer diff --git a/docs/docs/en/api/faststream/prometheus/manager/MetricsManager.md b/docs/docs/en/api/faststream/prometheus/manager/MetricsManager.md new file mode 100644 index 0000000000..b1a897c717 --- /dev/null +++ b/docs/docs/en/api/faststream/prometheus/manager/MetricsManager.md @@ -0,0 +1,11 @@ +--- +# 0.5 - API +# 2 - Release +# 3 - Contributing +# 5 - Template Page +# 10 - Default +search: + boost: 0.5 +--- + +::: faststream.prometheus.manager.MetricsManager diff --git a/docs/docs/en/api/faststream/prometheus/middleware/BasePrometheusMiddleware.md b/docs/docs/en/api/faststream/prometheus/middleware/BasePrometheusMiddleware.md new file mode 100644 index 0000000000..62bbd031ac --- /dev/null +++ b/docs/docs/en/api/faststream/prometheus/middleware/BasePrometheusMiddleware.md @@ -0,0 +1,11 @@ +--- +# 0.5 - API +# 2 - Release +# 3 - Contributing +# 5 - Template Page +# 10 - Default +search: + boost: 0.5 +--- + +::: faststream.prometheus.middleware.BasePrometheusMiddleware diff --git a/docs/docs/en/api/faststream/prometheus/middleware/PrometheusMiddleware.md b/docs/docs/en/api/faststream/prometheus/middleware/PrometheusMiddleware.md new file mode 100644 index 0000000000..2902586e38 --- /dev/null +++ b/docs/docs/en/api/faststream/prometheus/middleware/PrometheusMiddleware.md @@ -0,0 +1,11 @@ +--- +# 0.5 - API +# 2 - Release +# 3 - Contributing +# 5 - Template Page +# 10 - Default +search: + boost: 0.5 +--- + +::: faststream.prometheus.middleware.PrometheusMiddleware diff --git a/docs/docs/en/api/faststream/prometheus/provider/MetricsSettingsProvider.md b/docs/docs/en/api/faststream/prometheus/provider/MetricsSettingsProvider.md new file mode 100644 index 0000000000..3511a21a5b --- /dev/null +++ b/docs/docs/en/api/faststream/prometheus/provider/MetricsSettingsProvider.md @@ -0,0 +1,11 @@ +--- +# 0.5 - API +# 2 - Release +# 3 - Contributing +# 5 - Template Page +# 10 - Default +search: + boost: 0.5 +--- + +::: faststream.prometheus.provider.MetricsSettingsProvider diff --git a/docs/docs/en/api/faststream/prometheus/types/ConsumeAttrs.md b/docs/docs/en/api/faststream/prometheus/types/ConsumeAttrs.md new file mode 100644 index 0000000000..d9196cab8d --- /dev/null +++ b/docs/docs/en/api/faststream/prometheus/types/ConsumeAttrs.md @@ -0,0 +1,11 @@ +--- +# 0.5 - API +# 2 - Release +# 3 - Contributing +# 5 - Template Page +# 10 - Default +search: + boost: 0.5 +--- + +::: faststream.prometheus.types.ConsumeAttrs diff --git a/docs/docs/en/api/faststream/prometheus/types/ProcessingStatus.md b/docs/docs/en/api/faststream/prometheus/types/ProcessingStatus.md new file mode 100644 index 0000000000..98b6710bcd --- /dev/null +++ b/docs/docs/en/api/faststream/prometheus/types/ProcessingStatus.md @@ -0,0 +1,11 @@ +--- +# 0.5 - API +# 2 - Release +# 3 - Contributing +# 5 - Template Page +# 10 - Default +search: + boost: 0.5 +--- + +::: faststream.prometheus.types.ProcessingStatus diff --git a/docs/docs/en/api/faststream/prometheus/types/PublishingStatus.md b/docs/docs/en/api/faststream/prometheus/types/PublishingStatus.md new file mode 100644 index 0000000000..4e7435fbea --- /dev/null +++ b/docs/docs/en/api/faststream/prometheus/types/PublishingStatus.md @@ -0,0 +1,11 @@ +--- +# 0.5 - API +# 2 - Release +# 3 - Contributing +# 5 - Template Page +# 10 - Default +search: + boost: 0.5 +--- + +::: faststream.prometheus.types.PublishingStatus diff --git a/docs/docs/en/api/faststream/rabbit/prometheus/RabbitPrometheusMiddleware.md b/docs/docs/en/api/faststream/rabbit/prometheus/RabbitPrometheusMiddleware.md new file mode 100644 index 0000000000..2c4308fabd --- /dev/null +++ b/docs/docs/en/api/faststream/rabbit/prometheus/RabbitPrometheusMiddleware.md @@ -0,0 +1,11 @@ +--- +# 0.5 - API +# 2 - Release +# 3 - Contributing +# 5 - Template Page +# 10 - Default +search: + boost: 0.5 +--- + +::: faststream.rabbit.prometheus.RabbitPrometheusMiddleware diff --git a/docs/docs/en/api/faststream/rabbit/prometheus/middleware/RabbitPrometheusMiddleware.md b/docs/docs/en/api/faststream/rabbit/prometheus/middleware/RabbitPrometheusMiddleware.md new file mode 100644 index 0000000000..45163c998a --- /dev/null +++ b/docs/docs/en/api/faststream/rabbit/prometheus/middleware/RabbitPrometheusMiddleware.md @@ -0,0 +1,11 @@ +--- +# 0.5 - API +# 2 - Release +# 3 - Contributing +# 5 - Template Page +# 10 - Default +search: + boost: 0.5 +--- + +::: faststream.rabbit.prometheus.middleware.RabbitPrometheusMiddleware diff --git a/docs/docs/en/api/faststream/rabbit/prometheus/provider/RabbitMetricsSettingsProvider.md b/docs/docs/en/api/faststream/rabbit/prometheus/provider/RabbitMetricsSettingsProvider.md new file mode 100644 index 0000000000..6d63301b34 --- /dev/null +++ b/docs/docs/en/api/faststream/rabbit/prometheus/provider/RabbitMetricsSettingsProvider.md @@ -0,0 +1,11 @@ +--- +# 0.5 - API +# 2 - Release +# 3 - Contributing +# 5 - Template Page +# 10 - Default +search: + boost: 0.5 +--- + +::: faststream.rabbit.prometheus.provider.RabbitMetricsSettingsProvider diff --git a/docs/docs/en/api/faststream/redis/prometheus/RedisPrometheusMiddleware.md b/docs/docs/en/api/faststream/redis/prometheus/RedisPrometheusMiddleware.md new file mode 100644 index 0000000000..01b23fe4f1 --- /dev/null +++ b/docs/docs/en/api/faststream/redis/prometheus/RedisPrometheusMiddleware.md @@ -0,0 +1,11 @@ +--- +# 0.5 - API +# 2 - Release +# 3 - Contributing +# 5 - Template Page +# 10 - Default +search: + boost: 0.5 +--- + +::: faststream.redis.prometheus.RedisPrometheusMiddleware diff --git a/docs/docs/en/api/faststream/redis/prometheus/middleware/RedisPrometheusMiddleware.md b/docs/docs/en/api/faststream/redis/prometheus/middleware/RedisPrometheusMiddleware.md new file mode 100644 index 0000000000..c29cc91130 --- /dev/null +++ b/docs/docs/en/api/faststream/redis/prometheus/middleware/RedisPrometheusMiddleware.md @@ -0,0 +1,11 @@ +--- +# 0.5 - API +# 2 - Release +# 3 - Contributing +# 5 - Template Page +# 10 - Default +search: + boost: 0.5 +--- + +::: faststream.redis.prometheus.middleware.RedisPrometheusMiddleware diff --git a/docs/docs/en/api/faststream/redis/prometheus/provider/BaseRedisMetricsSettingsProvider.md b/docs/docs/en/api/faststream/redis/prometheus/provider/BaseRedisMetricsSettingsProvider.md new file mode 100644 index 0000000000..243414331b --- /dev/null +++ b/docs/docs/en/api/faststream/redis/prometheus/provider/BaseRedisMetricsSettingsProvider.md @@ -0,0 +1,11 @@ +--- +# 0.5 - API +# 2 - Release +# 3 - Contributing +# 5 - Template Page +# 10 - Default +search: + boost: 0.5 +--- + +::: faststream.redis.prometheus.provider.BaseRedisMetricsSettingsProvider diff --git a/docs/docs/en/api/faststream/redis/prometheus/provider/BatchRedisMetricsSettingsProvider.md b/docs/docs/en/api/faststream/redis/prometheus/provider/BatchRedisMetricsSettingsProvider.md new file mode 100644 index 0000000000..33d1d2d3a1 --- /dev/null +++ b/docs/docs/en/api/faststream/redis/prometheus/provider/BatchRedisMetricsSettingsProvider.md @@ -0,0 +1,11 @@ +--- +# 0.5 - API +# 2 - Release +# 3 - Contributing +# 5 - Template Page +# 10 - Default +search: + boost: 0.5 +--- + +::: faststream.redis.prometheus.provider.BatchRedisMetricsSettingsProvider diff --git a/docs/docs/en/api/faststream/redis/prometheus/provider/RedisMetricsSettingsProvider.md b/docs/docs/en/api/faststream/redis/prometheus/provider/RedisMetricsSettingsProvider.md new file mode 100644 index 0000000000..a7f5f3abe8 --- /dev/null +++ b/docs/docs/en/api/faststream/redis/prometheus/provider/RedisMetricsSettingsProvider.md @@ -0,0 +1,11 @@ +--- +# 0.5 - API +# 2 - Release +# 3 - Contributing +# 5 - Template Page +# 10 - Default +search: + boost: 0.5 +--- + +::: faststream.redis.prometheus.provider.RedisMetricsSettingsProvider diff --git a/docs/docs/en/api/faststream/redis/prometheus/provider/settings_provider_factory.md b/docs/docs/en/api/faststream/redis/prometheus/provider/settings_provider_factory.md new file mode 100644 index 0000000000..aa4812f1e2 --- /dev/null +++ b/docs/docs/en/api/faststream/redis/prometheus/provider/settings_provider_factory.md @@ -0,0 +1,11 @@ +--- +# 0.5 - API +# 2 - Release +# 3 - Contributing +# 5 - Template Page +# 10 - Default +search: + boost: 0.5 +--- + +::: faststream.redis.prometheus.provider.settings_provider_factory diff --git a/docs/docs/en/getting-started/prometheus/index.md b/docs/docs/en/getting-started/prometheus/index.md new file mode 100644 index 0000000000..54203ce7df --- /dev/null +++ b/docs/docs/en/getting-started/prometheus/index.md @@ -0,0 +1,82 @@ +--- +# 0.5 - API +# 2 - Release +# 3 - Contributing +# 5 - Template Page +# 10 - Default +search: + boost: 10 +--- + +# Prometheus + +[**Prometheus**](https://prometheus.io/){.external-link target="_blank"} is an open-source monitoring and alerting toolkit originally built at SoundCloud. +With a focus on reliability, robustness, and easy scalability, **Prometheus** allows users to collect metrics, +scrape data from various sources, store them efficiently, and query them in real-time. Its flexible data model, +powerful query language, and seamless integration with [**Grafana**](https://grafana.com/){.external-link target="_blank"} make it a popular choice for monitoring the health +and performance of systems and applications. + +### FastStream Metrics + +To add a metrics to your broker, you need to: + +1. Install `FastStream` with `prometheus-client` + + ```shell + pip install faststream[prometheus] + ``` + +2. Add `PrometheusMiddleware` to your broker + +{!> includes/getting_started/prometheus/1.md !} + +### Exposing the `/metrics` endpoint +The way Prometheus works requires the service to expose an HTTP endpoint for analysis. +By convention, this is a GET endpoint, and its path is usually `/metrics`. + +FastStream's built-in **ASGI** support allows you to expose endpoints in your application. + +A convenient way to serve this endpoint is to use `make_asgi_app` from `prometheus_client`, +passing in the registry that was passed to `PrometheusMiddleware`. + +{!> includes/getting_started/prometheus/2.md !} + +--- + +### Exported metrics + +{% set received_messages_total_description = 'The metric is incremented each time the application receives a message.

This is necessary to count messages that the application has received but has not yet started processing.' %} +{% set received_messages_size_bytes_description = 'The metric is filled with the sizes of received messages. When a message is received, the size of its body in bytes is calculated and written to the metric.

Useful for analyzing the sizes of incoming messages, also in cases when the application receives messages of unexpected sizes.' %} +{% set received_messages_in_process_description = 'The metric is incremented when the message processing starts and decremented when the processing ends.

It is necessary to count the number of messages that the application processes.

Such a metric will help answer the question: _`is there a need to scale the service?`_' %} +{% set received_processed_messages_total_description = 'The metric is incremented after a message is processed, regardless of whether the processing ended with a success or an error.

This metric allows you to analyze the number of processed messages and their statuses.' %} +{% set received_processed_messages_duration_seconds_description = 'The metric is filled with the message processing time regardless of whether the processing ended with a success or an error.

Time stamps are recorded just before and immediately after the processing.

Then the metric is filled with their difference (in seconds).' %} +{% set received_processed_messages_exceptions_total_description = 'The metric is incremented if any exception occurred while processing a message (except `AckMessage`, `NackMessage`, `RejectMessage` and `SkipMessage`).

It can be used to draw conclusions about how many and what kind of exceptions occurred while processing messages.' %} +{% set published_messages_total_description = 'The metric is incremented when messages are sent, regardless of whether the sending was successful or not.' %} +{% set published_messages_duration_seconds_description = 'The metric is filled with the time the message was sent, regardless of whether the sending was successful or failed.

Timestamps are written immediately before and immediately after sending.

Then the metric is filled with their difference (in seconds).' %} +{% set published_messages_exceptions_total_description = 'The metric increases if any exception occurred while sending a message.

You can draw conclusions about how many and what exceptions occurred while sending messages.' %} + + +| Metric | Type | Description | Labels | +|--------------------------------------------------|---------------|----------------------------------------------------------------|-------------------------------------------------------| +| **received_messages_total** | **Counter** | {{ received_messages_total_description }} | `app_name`, `broker`, `handler` | +| **received_messages_size_bytes** | **Histogram** | {{ received_messages_size_bytes_description }} | `app_name`, `broker`, `handler` | +| **received_messages_in_process** | **Gauge** | {{ received_messages_in_process_description }} | `app_name`, `broker`, `handler` | +| **received_processed_messages_total** | **Counter** | {{ received_processed_messages_total_description }} | `app_name`, `broker`, `handler`, `status` | +| **received_processed_messages_duration_seconds** | **Histogram** | {{ received_processed_messages_duration_seconds_description }} | `app_name`, `broker`, `handler` | +| **received_processed_messages_exceptions_total** | **Counter** | {{ received_processed_messages_exceptions_total_description }} | `app_name`, `broker`, `handler`, `exception_type` | +| **published_messages_total** | **Counter** | {{ published_messages_total_description }} | `app_name`, `broker`, `destination`, `status` | +| **published_messages_duration_seconds** | **Histogram** | {{ published_messages_duration_seconds_description }} | `app_name`, `broker`, `destination` | +| **published_messages_exceptions_total** | **Counter** | {{ published_messages_exceptions_total_description }} | `app_name`, `broker`, `destination`, `exception_type` | + +### Labels + +| Label | Description | Values | +|-----------------------------------|-----------------------------------------------------------------|---------------------------------------------------| +| app_name | The name of the application, which the user can specify himself | `faststream` by default | +| broker | Broker name | `kafka`, `rabbit`, `nats`, `redis` | +| handler | Where the message came from | | +| status (while receiving) | Message processing status | `acked`, `nacked`, `rejected`, `skipped`, `error` | +| exception_type (while receiving) | Exception type when processing message | | +| status (while publishing) | Message publishing status | `success`, `error` | +| destination | Where the message is sent | | +| exception_type (while publishing) | Exception type when publishing message | | diff --git a/docs/docs/navigation_template.txt b/docs/docs/navigation_template.txt index 4cd45d0874..e505d5a783 100644 --- a/docs/docs/navigation_template.txt +++ b/docs/docs/navigation_template.txt @@ -44,6 +44,7 @@ search: - [CLI](getting-started/cli/index.md) - [ASGI](getting-started/asgi.md) - [OpenTelemetry](getting-started/opentelemetry/index.md) + - [Prometheus](getting-started/prometheus/index.md) - [Logging](getting-started/logging.md) - [Config Management](getting-started/config/index.md) - [Task Scheduling](scheduling.md) diff --git a/docs/docs_src/getting_started/prometheus/__init__.py b/docs/docs_src/getting_started/prometheus/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/docs/docs_src/getting_started/prometheus/confluent.py b/docs/docs_src/getting_started/prometheus/confluent.py new file mode 100644 index 0000000000..2d89e8bee6 --- /dev/null +++ b/docs/docs_src/getting_started/prometheus/confluent.py @@ -0,0 +1,13 @@ +from faststream import FastStream +from faststream.confluent import KafkaBroker +from faststream.confluent.prometheus import KafkaPrometheusMiddleware +from prometheus_client import CollectorRegistry + +registry = CollectorRegistry() + +broker = KafkaBroker( + middlewares=( + KafkaPrometheusMiddleware(registry=registry), + ) +) +app = FastStream(broker) diff --git a/docs/docs_src/getting_started/prometheus/confluent_asgi.py b/docs/docs_src/getting_started/prometheus/confluent_asgi.py new file mode 100644 index 0000000000..2574a49cef --- /dev/null +++ b/docs/docs_src/getting_started/prometheus/confluent_asgi.py @@ -0,0 +1,18 @@ +from faststream.asgi import AsgiFastStream +from faststream.confluent import KafkaBroker +from faststream.confluent.prometheus import KafkaPrometheusMiddleware +from prometheus_client import CollectorRegistry, make_asgi_app + +registry = CollectorRegistry() + +broker = KafkaBroker( + middlewares=( + KafkaPrometheusMiddleware(registry=registry), + ) +) +app = AsgiFastStream( + broker, + asgi_routes=[ + ("/metrics", make_asgi_app(registry)), + ] +) diff --git a/docs/docs_src/getting_started/prometheus/kafka.py b/docs/docs_src/getting_started/prometheus/kafka.py new file mode 100644 index 0000000000..f6d1224e66 --- /dev/null +++ b/docs/docs_src/getting_started/prometheus/kafka.py @@ -0,0 +1,13 @@ +from faststream import FastStream +from faststream.kafka import KafkaBroker +from faststream.kafka.prometheus import KafkaPrometheusMiddleware +from prometheus_client import CollectorRegistry + +registry = CollectorRegistry() + +broker = KafkaBroker( + middlewares=( + KafkaPrometheusMiddleware(registry=registry), + ) +) +app = FastStream(broker) diff --git a/docs/docs_src/getting_started/prometheus/kafka_asgi.py b/docs/docs_src/getting_started/prometheus/kafka_asgi.py new file mode 100644 index 0000000000..ddf79040d9 --- /dev/null +++ b/docs/docs_src/getting_started/prometheus/kafka_asgi.py @@ -0,0 +1,18 @@ +from faststream.asgi import AsgiFastStream +from faststream.kafka import KafkaBroker +from faststream.kafka.prometheus import KafkaPrometheusMiddleware +from prometheus_client import CollectorRegistry, make_asgi_app + +registry = CollectorRegistry() + +broker = KafkaBroker( + middlewares=( + KafkaPrometheusMiddleware(registry=registry), + ) +) +app = AsgiFastStream( + broker, + asgi_routes=[ + ("/metrics", make_asgi_app(registry)), + ] +) diff --git a/docs/docs_src/getting_started/prometheus/nats.py b/docs/docs_src/getting_started/prometheus/nats.py new file mode 100644 index 0000000000..0894078881 --- /dev/null +++ b/docs/docs_src/getting_started/prometheus/nats.py @@ -0,0 +1,13 @@ +from faststream import FastStream +from faststream.nats import NatsBroker +from faststream.nats.prometheus import NatsPrometheusMiddleware +from prometheus_client import CollectorRegistry + +registry = CollectorRegistry() + +broker = NatsBroker( + middlewares=( + NatsPrometheusMiddleware(registry=registry), + ) +) +app = FastStream(broker) diff --git a/docs/docs_src/getting_started/prometheus/nats_asgi.py b/docs/docs_src/getting_started/prometheus/nats_asgi.py new file mode 100644 index 0000000000..c273a36f36 --- /dev/null +++ b/docs/docs_src/getting_started/prometheus/nats_asgi.py @@ -0,0 +1,18 @@ +from faststream.asgi import AsgiFastStream +from faststream.nats import NatsBroker +from faststream.nats.prometheus import NatsPrometheusMiddleware +from prometheus_client import CollectorRegistry, make_asgi_app + +registry = CollectorRegistry() + +broker = NatsBroker( + middlewares=( + NatsPrometheusMiddleware(registry=registry), + ) +) +app = AsgiFastStream( + broker, + asgi_routes=[ + ("/metrics", make_asgi_app(registry)), + ] +) diff --git a/docs/docs_src/getting_started/prometheus/rabbit.py b/docs/docs_src/getting_started/prometheus/rabbit.py new file mode 100644 index 0000000000..a0fb683b7f --- /dev/null +++ b/docs/docs_src/getting_started/prometheus/rabbit.py @@ -0,0 +1,13 @@ +from faststream import FastStream +from faststream.rabbit import RabbitBroker +from faststream.rabbit.prometheus import RabbitPrometheusMiddleware +from prometheus_client import CollectorRegistry + +registry = CollectorRegistry() + +broker = RabbitBroker( + middlewares=( + RabbitPrometheusMiddleware(registry=registry), + ) +) +app = FastStream(broker) diff --git a/docs/docs_src/getting_started/prometheus/rabbit_asgi.py b/docs/docs_src/getting_started/prometheus/rabbit_asgi.py new file mode 100644 index 0000000000..40bc990fcc --- /dev/null +++ b/docs/docs_src/getting_started/prometheus/rabbit_asgi.py @@ -0,0 +1,18 @@ +from faststream.asgi import AsgiFastStream +from faststream.rabbit import RabbitBroker +from faststream.rabbit.prometheus import RabbitPrometheusMiddleware +from prometheus_client import CollectorRegistry, make_asgi_app + +registry = CollectorRegistry() + +broker = RabbitBroker( + middlewares=( + RabbitPrometheusMiddleware(registry=registry), + ) +) +app = AsgiFastStream( + broker, + asgi_routes=[ + ("/metrics", make_asgi_app(registry)), + ] +) diff --git a/docs/docs_src/getting_started/prometheus/redis.py b/docs/docs_src/getting_started/prometheus/redis.py new file mode 100644 index 0000000000..98fc2b70c0 --- /dev/null +++ b/docs/docs_src/getting_started/prometheus/redis.py @@ -0,0 +1,13 @@ +from faststream import FastStream +from faststream.redis import RedisBroker +from faststream.redis.prometheus import RedisPrometheusMiddleware +from prometheus_client import CollectorRegistry + +registry = CollectorRegistry() + +broker = RedisBroker( + middlewares=( + RedisPrometheusMiddleware(registry=registry), + ) +) +app = FastStream(broker) diff --git a/docs/docs_src/getting_started/prometheus/redis_asgi.py b/docs/docs_src/getting_started/prometheus/redis_asgi.py new file mode 100644 index 0000000000..2c3b095f1b --- /dev/null +++ b/docs/docs_src/getting_started/prometheus/redis_asgi.py @@ -0,0 +1,18 @@ +from faststream.asgi import AsgiFastStream +from faststream.redis import RedisBroker +from faststream.redis.prometheus import RedisPrometheusMiddleware +from prometheus_client import CollectorRegistry, make_asgi_app + +registry = CollectorRegistry() + +broker = RedisBroker( + middlewares=( + RedisPrometheusMiddleware(registry=registry), + ) +) +app = AsgiFastStream( + broker, + asgi_routes=[ + ("/metrics", make_asgi_app(registry)), + ] +) diff --git a/docs/includes/getting_started/prometheus/1.md b/docs/includes/getting_started/prometheus/1.md new file mode 100644 index 0000000000..16ad860ff8 --- /dev/null +++ b/docs/includes/getting_started/prometheus/1.md @@ -0,0 +1,24 @@ +=== "AIOKafka" + ```python linenums="1" hl_lines="6 10" + {!> docs_src/getting_started/prometheus/kafka.py!} + ``` + +=== "Confluent" + ```python linenums="1" hl_lines="6 10" + {!> docs_src/getting_started/prometheus/confluent.py!} + ``` + +=== "RabbitMQ" + ```python linenums="1" hl_lines="6 10" + {!> docs_src/getting_started/prometheus/rabbit.py!} + ``` + +=== "NATS" + ```python linenums="1" hl_lines="6 10" + {!> docs_src/getting_started/prometheus/nats.py!} + ``` + +=== "Redis" + ```python linenums="1" hl_lines="6 10" + {!> docs_src/getting_started/prometheus/redis.py!} + ``` diff --git a/docs/includes/getting_started/prometheus/2.md b/docs/includes/getting_started/prometheus/2.md new file mode 100644 index 0000000000..483e5437f4 --- /dev/null +++ b/docs/includes/getting_started/prometheus/2.md @@ -0,0 +1,24 @@ +=== "AIOKafka" + ```python linenums="1" hl_lines="6 10 13 16" + {!> docs_src/getting_started/prometheus/kafka_asgi.py!} + ``` + +=== "Confluent" + ```python linenums="1" hl_lines="6 10 13 16" + {!> docs_src/getting_started/prometheus/confluent_asgi.py!} + ``` + +=== "RabbitMQ" + ```python linenums="1" hl_lines="6 10 13 16" + {!> docs_src/getting_started/prometheus/rabbit_asgi.py!} + ``` + +=== "NATS" + ```python linenums="1" hl_lines="6 10 13 16" + {!> docs_src/getting_started/prometheus/nats_asgi.py!} + ``` + +=== "Redis" + ```python linenums="1" hl_lines="6 10 13 16" + {!> docs_src/getting_started/prometheus/redis_asgi.py!} + ``` diff --git a/faststream/__about__.py b/faststream/__about__.py index a829efe0e2..f350fa0af9 100644 --- a/faststream/__about__.py +++ b/faststream/__about__.py @@ -1,5 +1,5 @@ """Simple and fast framework to create message brokers based microservices.""" -__version__ = "0.5.27" +__version__ = "0.5.28" SERVICE_NAME = f"faststream-{__version__}" diff --git a/faststream/broker/message.py b/faststream/broker/message.py index 82593e7cdc..e06e912593 100644 --- a/faststream/broker/message.py +++ b/faststream/broker/message.py @@ -63,13 +63,16 @@ class StreamMessage(Generic[MsgType]): _decoded_body: Optional["DecodedMessage"] = field(default=None, init=False) async def ack(self) -> None: - self.committed = AckStatus.acked + if not self.committed: + self.committed = AckStatus.acked async def nack(self) -> None: - self.committed = AckStatus.nacked + if not self.committed: + self.committed = AckStatus.nacked async def reject(self) -> None: - self.committed = AckStatus.rejected + if not self.committed: + self.committed = AckStatus.rejected async def decode(self) -> Optional["DecodedMessage"]: """Serialize the message by lazy decoder.""" diff --git a/faststream/confluent/message.py b/faststream/confluent/message.py index 14fe05ae7b..83ee0e814b 100644 --- a/faststream/confluent/message.py +++ b/faststream/confluent/message.py @@ -66,7 +66,7 @@ async def ack(self) -> None: """Acknowledge the Kafka message.""" if self.is_manual and not self.committed: await self.consumer.commit() - await super().ack() + await super().ack() async def nack(self) -> None: """Reject the Kafka message.""" @@ -81,4 +81,4 @@ async def nack(self) -> None: partition=raw_message.partition(), offset=raw_message.offset(), ) - await super().nack() + await super().nack() diff --git a/faststream/confluent/prometheus/__init__.py b/faststream/confluent/prometheus/__init__.py new file mode 100644 index 0000000000..7498fa5ddc --- /dev/null +++ b/faststream/confluent/prometheus/__init__.py @@ -0,0 +1,3 @@ +from faststream.confluent.prometheus.middleware import KafkaPrometheusMiddleware + +__all__ = ("KafkaPrometheusMiddleware",) diff --git a/faststream/confluent/prometheus/middleware.py b/faststream/confluent/prometheus/middleware.py new file mode 100644 index 0000000000..2ac27dacea --- /dev/null +++ b/faststream/confluent/prometheus/middleware.py @@ -0,0 +1,26 @@ +from typing import TYPE_CHECKING, Optional, Sequence + +from faststream.confluent.prometheus.provider import settings_provider_factory +from faststream.prometheus.middleware import BasePrometheusMiddleware +from faststream.types import EMPTY + +if TYPE_CHECKING: + from prometheus_client import CollectorRegistry + + +class KafkaPrometheusMiddleware(BasePrometheusMiddleware): + def __init__( + self, + *, + registry: "CollectorRegistry", + app_name: str = EMPTY, + metrics_prefix: str = "faststream", + received_messages_size_buckets: Optional[Sequence[float]] = None, + ) -> None: + super().__init__( + settings_provider_factory=settings_provider_factory, + registry=registry, + app_name=app_name, + metrics_prefix=metrics_prefix, + received_messages_size_buckets=received_messages_size_buckets, + ) diff --git a/faststream/confluent/prometheus/provider.py b/faststream/confluent/prometheus/provider.py new file mode 100644 index 0000000000..bdcb26728a --- /dev/null +++ b/faststream/confluent/prometheus/provider.py @@ -0,0 +1,64 @@ +from typing import TYPE_CHECKING, Sequence, Tuple, Union, cast + +from faststream.broker.message import MsgType, StreamMessage +from faststream.prometheus import ( + ConsumeAttrs, + MetricsSettingsProvider, +) + +if TYPE_CHECKING: + from confluent_kafka import Message + + from faststream.types import AnyDict + + +class BaseConfluentMetricsSettingsProvider(MetricsSettingsProvider[MsgType]): + __slots__ = ("messaging_system",) + + def __init__(self) -> None: + self.messaging_system = "kafka" + + def get_publish_destination_name_from_kwargs( + self, + kwargs: "AnyDict", + ) -> str: + return cast(str, kwargs["topic"]) + + +class ConfluentMetricsSettingsProvider(BaseConfluentMetricsSettingsProvider["Message"]): + def get_consume_attrs_from_message( + self, + msg: "StreamMessage[Message]", + ) -> ConsumeAttrs: + return { + "destination_name": cast(str, msg.raw_message.topic()), + "message_size": len(msg.body), + "messages_count": 1, + } + + +class BatchConfluentMetricsSettingsProvider( + BaseConfluentMetricsSettingsProvider[Tuple["Message", ...]] +): + def get_consume_attrs_from_message( + self, + msg: "StreamMessage[Tuple[Message, ...]]", + ) -> ConsumeAttrs: + raw_message = msg.raw_message[0] + return { + "destination_name": cast(str, raw_message.topic()), + "message_size": len(bytearray().join(cast(Sequence[bytes], msg.body))), + "messages_count": len(msg.raw_message), + } + + +def settings_provider_factory( + msg: Union["Message", Sequence["Message"], None], +) -> Union[ + ConfluentMetricsSettingsProvider, + BatchConfluentMetricsSettingsProvider, +]: + if isinstance(msg, Sequence): + return BatchConfluentMetricsSettingsProvider() + else: + return ConfluentMetricsSettingsProvider() diff --git a/faststream/kafka/message.py b/faststream/kafka/message.py index d83a57bf6a..bde7669787 100644 --- a/faststream/kafka/message.py +++ b/faststream/kafka/message.py @@ -77,7 +77,7 @@ async def nack(self) -> None: partition=topic_partition, offset=raw_message.offset, ) - await super().nack() + await super().nack() class KafkaAckableMessage(KafkaMessage): @@ -85,4 +85,4 @@ async def ack(self) -> None: """Acknowledge the Kafka message.""" if not self.committed: await self.consumer.commit() - await super().ack() + await super().ack() diff --git a/faststream/kafka/prometheus/__init__.py b/faststream/kafka/prometheus/__init__.py new file mode 100644 index 0000000000..e5ae7e2d4f --- /dev/null +++ b/faststream/kafka/prometheus/__init__.py @@ -0,0 +1,3 @@ +from faststream.kafka.prometheus.middleware import KafkaPrometheusMiddleware + +__all__ = ("KafkaPrometheusMiddleware",) diff --git a/faststream/kafka/prometheus/middleware.py b/faststream/kafka/prometheus/middleware.py new file mode 100644 index 0000000000..3fd41edeba --- /dev/null +++ b/faststream/kafka/prometheus/middleware.py @@ -0,0 +1,26 @@ +from typing import TYPE_CHECKING, Optional, Sequence + +from faststream.kafka.prometheus.provider import settings_provider_factory +from faststream.prometheus.middleware import BasePrometheusMiddleware +from faststream.types import EMPTY + +if TYPE_CHECKING: + from prometheus_client import CollectorRegistry + + +class KafkaPrometheusMiddleware(BasePrometheusMiddleware): + def __init__( + self, + *, + registry: "CollectorRegistry", + app_name: str = EMPTY, + metrics_prefix: str = "faststream", + received_messages_size_buckets: Optional[Sequence[float]] = None, + ) -> None: + super().__init__( + settings_provider_factory=settings_provider_factory, + registry=registry, + app_name=app_name, + metrics_prefix=metrics_prefix, + received_messages_size_buckets=received_messages_size_buckets, + ) diff --git a/faststream/kafka/prometheus/provider.py b/faststream/kafka/prometheus/provider.py new file mode 100644 index 0000000000..9caf118e1f --- /dev/null +++ b/faststream/kafka/prometheus/provider.py @@ -0,0 +1,64 @@ +from typing import TYPE_CHECKING, Sequence, Tuple, Union, cast + +from faststream.broker.message import MsgType, StreamMessage +from faststream.prometheus import ( + MetricsSettingsProvider, +) + +if TYPE_CHECKING: + from aiokafka import ConsumerRecord + + from faststream.prometheus import ConsumeAttrs + from faststream.types import AnyDict + + +class BaseKafkaMetricsSettingsProvider(MetricsSettingsProvider[MsgType]): + __slots__ = ("messaging_system",) + + def __init__(self) -> None: + self.messaging_system = "kafka" + + def get_publish_destination_name_from_kwargs( + self, + kwargs: "AnyDict", + ) -> str: + return cast(str, kwargs["topic"]) + + +class KafkaMetricsSettingsProvider(BaseKafkaMetricsSettingsProvider["ConsumerRecord"]): + def get_consume_attrs_from_message( + self, + msg: "StreamMessage[ConsumerRecord]", + ) -> "ConsumeAttrs": + return { + "destination_name": msg.raw_message.topic, + "message_size": len(msg.body), + "messages_count": 1, + } + + +class BatchKafkaMetricsSettingsProvider( + BaseKafkaMetricsSettingsProvider[Tuple["ConsumerRecord", ...]] +): + def get_consume_attrs_from_message( + self, + msg: "StreamMessage[Tuple[ConsumerRecord, ...]]", + ) -> "ConsumeAttrs": + raw_message = msg.raw_message[0] + return { + "destination_name": raw_message.topic, + "message_size": len(bytearray().join(cast(Sequence[bytes], msg.body))), + "messages_count": len(msg.raw_message), + } + + +def settings_provider_factory( + msg: Union["ConsumerRecord", Sequence["ConsumerRecord"], None], +) -> Union[ + KafkaMetricsSettingsProvider, + BatchKafkaMetricsSettingsProvider, +]: + if isinstance(msg, Sequence): + return BatchKafkaMetricsSettingsProvider() + else: + return KafkaMetricsSettingsProvider() diff --git a/faststream/nats/message.py b/faststream/nats/message.py index ee54ef2caa..0f104a3310 100644 --- a/faststream/nats/message.py +++ b/faststream/nats/message.py @@ -15,7 +15,7 @@ async def ack(self) -> None: # to be compatible with `self.raw_message.ack()` if not self.raw_message._ackd: await self.raw_message.ack() - await super().ack() + await super().ack() async def nack( self, @@ -23,12 +23,12 @@ async def nack( ) -> None: if not self.raw_message._ackd: await self.raw_message.nak(delay=delay) - await super().nack() + await super().nack() async def reject(self) -> None: if not self.raw_message._ackd: await self.raw_message.term() - await super().reject() + await super().reject() async def in_progress(self) -> None: if not self.raw_message._ackd: diff --git a/faststream/nats/prometheus/__init__.py b/faststream/nats/prometheus/__init__.py new file mode 100644 index 0000000000..564d3ea4f4 --- /dev/null +++ b/faststream/nats/prometheus/__init__.py @@ -0,0 +1,3 @@ +from faststream.nats.prometheus.middleware import NatsPrometheusMiddleware + +__all__ = ("NatsPrometheusMiddleware",) diff --git a/faststream/nats/prometheus/middleware.py b/faststream/nats/prometheus/middleware.py new file mode 100644 index 0000000000..3aadeb61d1 --- /dev/null +++ b/faststream/nats/prometheus/middleware.py @@ -0,0 +1,26 @@ +from typing import TYPE_CHECKING, Optional, Sequence + +from faststream.nats.prometheus.provider import settings_provider_factory +from faststream.prometheus.middleware import BasePrometheusMiddleware +from faststream.types import EMPTY + +if TYPE_CHECKING: + from prometheus_client import CollectorRegistry + + +class NatsPrometheusMiddleware(BasePrometheusMiddleware): + def __init__( + self, + *, + registry: "CollectorRegistry", + app_name: str = EMPTY, + metrics_prefix: str = "faststream", + received_messages_size_buckets: Optional[Sequence[float]] = None, + ) -> None: + super().__init__( + settings_provider_factory=settings_provider_factory, + registry=registry, + app_name=app_name, + metrics_prefix=metrics_prefix, + received_messages_size_buckets=received_messages_size_buckets, + ) diff --git a/faststream/nats/prometheus/provider.py b/faststream/nats/prometheus/provider.py new file mode 100644 index 0000000000..e6ac0a4684 --- /dev/null +++ b/faststream/nats/prometheus/provider.py @@ -0,0 +1,66 @@ +from typing import TYPE_CHECKING, List, Sequence, Union, cast + +from nats.aio.msg import Msg + +from faststream.broker.message import MsgType, StreamMessage +from faststream.prometheus import ( + ConsumeAttrs, + MetricsSettingsProvider, +) + +if TYPE_CHECKING: + from faststream.types import AnyDict + + +class BaseNatsMetricsSettingsProvider(MetricsSettingsProvider[MsgType]): + __slots__ = ("messaging_system",) + + def __init__(self) -> None: + self.messaging_system = "nats" + + def get_publish_destination_name_from_kwargs( + self, + kwargs: "AnyDict", + ) -> str: + return cast(str, kwargs["subject"]) + + +class NatsMetricsSettingsProvider(BaseNatsMetricsSettingsProvider["Msg"]): + def get_consume_attrs_from_message( + self, + msg: "StreamMessage[Msg]", + ) -> ConsumeAttrs: + return { + "destination_name": msg.raw_message.subject, + "message_size": len(msg.body), + "messages_count": 1, + } + + +class BatchNatsMetricsSettingsProvider(BaseNatsMetricsSettingsProvider[List["Msg"]]): + def get_consume_attrs_from_message( + self, + msg: "StreamMessage[List[Msg]]", + ) -> ConsumeAttrs: + raw_message = msg.raw_message[0] + return { + "destination_name": raw_message.subject, + "message_size": len(msg.body), + "messages_count": len(msg.raw_message), + } + + +def settings_provider_factory( + msg: Union["Msg", Sequence["Msg"], None], +) -> Union[ + NatsMetricsSettingsProvider, + BatchNatsMetricsSettingsProvider, + None, +]: + if isinstance(msg, Sequence): + return BatchNatsMetricsSettingsProvider() + elif isinstance(msg, Msg) or msg is None: + return NatsMetricsSettingsProvider() + else: + # KeyValue and Object Storage watch cases + return None diff --git a/faststream/prometheus/__init__.py b/faststream/prometheus/__init__.py new file mode 100644 index 0000000000..8b21a09eee --- /dev/null +++ b/faststream/prometheus/__init__.py @@ -0,0 +1,9 @@ +from faststream.prometheus.middleware import BasePrometheusMiddleware +from faststream.prometheus.provider import MetricsSettingsProvider +from faststream.prometheus.types import ConsumeAttrs + +__all__ = ( + "BasePrometheusMiddleware", + "MetricsSettingsProvider", + "ConsumeAttrs", +) diff --git a/faststream/prometheus/consts.py b/faststream/prometheus/consts.py new file mode 100644 index 0000000000..3c4648d333 --- /dev/null +++ b/faststream/prometheus/consts.py @@ -0,0 +1,17 @@ +from faststream.broker.message import AckStatus +from faststream.exceptions import AckMessage, NackMessage, RejectMessage, SkipMessage +from faststream.prometheus.types import ProcessingStatus + +PROCESSING_STATUS_BY_HANDLER_EXCEPTION_MAP = { + AckMessage: ProcessingStatus.acked, + NackMessage: ProcessingStatus.nacked, + RejectMessage: ProcessingStatus.rejected, + SkipMessage: ProcessingStatus.skipped, +} + + +PROCESSING_STATUS_BY_ACK_STATUS = { + AckStatus.acked: ProcessingStatus.acked, + AckStatus.nacked: ProcessingStatus.nacked, + AckStatus.rejected: ProcessingStatus.rejected, +} diff --git a/faststream/prometheus/container.py b/faststream/prometheus/container.py new file mode 100644 index 0000000000..6b5f813f63 --- /dev/null +++ b/faststream/prometheus/container.py @@ -0,0 +1,100 @@ +from typing import Optional, Sequence + +from prometheus_client import CollectorRegistry, Counter, Gauge, Histogram + + +class MetricsContainer: + __slots__ = ( + "_registry", + "_metrics_prefix", + "received_messages_total", + "received_messages_size_bytes", + "received_processed_messages_duration_seconds", + "received_messages_in_process", + "received_processed_messages_total", + "received_processed_messages_exceptions_total", + "published_messages_total", + "published_messages_duration_seconds", + "published_messages_exceptions_total", + ) + + DEFAULT_SIZE_BUCKETS = ( + 2.0**4, + 2.0**6, + 2.0**8, + 2.0**10, + 2.0**12, + 2.0**14, + 2.0**16, + 2.0**18, + 2.0**20, + 2.0**22, + 2.0**24, + float("inf"), + ) + + def __init__( + self, + registry: "CollectorRegistry", + *, + metrics_prefix: str = "faststream", + received_messages_size_buckets: Optional[Sequence[float]] = None, + ): + self._registry = registry + self._metrics_prefix = metrics_prefix + + self.received_messages_total = Counter( + name=f"{metrics_prefix}_received_messages_total", + documentation="Count of received messages by broker and handler", + labelnames=["app_name", "broker", "handler"], + registry=registry, + ) + self.received_messages_size_bytes = Histogram( + name=f"{metrics_prefix}_received_messages_size_bytes", + documentation="Histogram of received messages size in bytes by broker and handler", + labelnames=["app_name", "broker", "handler"], + registry=registry, + buckets=received_messages_size_buckets or self.DEFAULT_SIZE_BUCKETS, + ) + self.received_messages_in_process = Gauge( + name=f"{metrics_prefix}_received_messages_in_process", + documentation="Gauge of received messages in process by broker and handler", + labelnames=["app_name", "broker", "handler"], + registry=registry, + ) + self.received_processed_messages_total = Counter( + name=f"{metrics_prefix}_received_processed_messages_total", + documentation="Count of received processed messages by broker, handler and status", + labelnames=["app_name", "broker", "handler", "status"], + registry=registry, + ) + self.received_processed_messages_duration_seconds = Histogram( + name=f"{metrics_prefix}_received_processed_messages_duration_seconds", + documentation="Histogram of received processed messages duration in seconds by broker and handler", + labelnames=["app_name", "broker", "handler"], + registry=registry, + ) + self.received_processed_messages_exceptions_total = Counter( + name=f"{metrics_prefix}_received_processed_messages_exceptions_total", + documentation="Count of received processed messages exceptions by broker, handler and exception_type", + labelnames=["app_name", "broker", "handler", "exception_type"], + registry=registry, + ) + self.published_messages_total = Counter( + name=f"{metrics_prefix}_published_messages_total", + documentation="Count of published messages by destination and status", + labelnames=["app_name", "broker", "destination", "status"], + registry=registry, + ) + self.published_messages_duration_seconds = Histogram( + name=f"{metrics_prefix}_published_messages_duration_seconds", + documentation="Histogram of published messages duration in seconds by broker and destination", + labelnames=["app_name", "broker", "destination"], + registry=registry, + ) + self.published_messages_exceptions_total = Counter( + name=f"{metrics_prefix}_published_messages_exceptions_total", + documentation="Count of published messages exceptions by broker, destination and exception_type", + labelnames=["app_name", "broker", "destination", "exception_type"], + registry=registry, + ) diff --git a/faststream/prometheus/manager.py b/faststream/prometheus/manager.py new file mode 100644 index 0000000000..e2f7704f77 --- /dev/null +++ b/faststream/prometheus/manager.py @@ -0,0 +1,131 @@ +from faststream.prometheus.container import MetricsContainer +from faststream.prometheus.types import ProcessingStatus, PublishingStatus + + +class MetricsManager: + __slots__ = ("_container", "_app_name") + + def __init__(self, container: MetricsContainer, *, app_name: str = "faststream"): + self._container = container + self._app_name = app_name + + def add_received_message(self, broker: str, handler: str, amount: int = 1) -> None: + self._container.received_messages_total.labels( + app_name=self._app_name, + broker=broker, + handler=handler, + ).inc(amount) + + def observe_received_messages_size( + self, + broker: str, + handler: str, + size: int, + ) -> None: + self._container.received_messages_size_bytes.labels( + app_name=self._app_name, + broker=broker, + handler=handler, + ).observe(size) + + def add_received_message_in_process( + self, + broker: str, + handler: str, + amount: int = 1, + ) -> None: + self._container.received_messages_in_process.labels( + app_name=self._app_name, + broker=broker, + handler=handler, + ).inc(amount) + + def remove_received_message_in_process( + self, + broker: str, + handler: str, + amount: int = 1, + ) -> None: + self._container.received_messages_in_process.labels( + app_name=self._app_name, + broker=broker, + handler=handler, + ).dec(amount) + + def add_received_processed_message( + self, + broker: str, + handler: str, + status: ProcessingStatus, + amount: int = 1, + ) -> None: + self._container.received_processed_messages_total.labels( + app_name=self._app_name, + broker=broker, + handler=handler, + status=status.value, + ).inc(amount) + + def observe_received_processed_message_duration( + self, + duration: float, + broker: str, + handler: str, + ) -> None: + self._container.received_processed_messages_duration_seconds.labels( + app_name=self._app_name, + broker=broker, + handler=handler, + ).observe(duration) + + def add_received_processed_message_exception( + self, + broker: str, + handler: str, + exception_type: str, + ) -> None: + self._container.received_processed_messages_exceptions_total.labels( + app_name=self._app_name, + broker=broker, + handler=handler, + exception_type=exception_type, + ).inc() + + def add_published_message( + self, + broker: str, + destination: str, + status: PublishingStatus, + amount: int = 1, + ) -> None: + self._container.published_messages_total.labels( + app_name=self._app_name, + broker=broker, + destination=destination, + status=status.value, + ).inc(amount) + + def observe_published_message_duration( + self, + duration: float, + broker: str, + destination: str, + ) -> None: + self._container.published_messages_duration_seconds.labels( + app_name=self._app_name, + broker=broker, + destination=destination, + ).observe(duration) + + def add_published_message_exception( + self, + broker: str, + destination: str, + exception_type: str, + ) -> None: + self._container.published_messages_exceptions_total.labels( + app_name=self._app_name, + broker=broker, + destination=destination, + exception_type=exception_type, + ).inc() diff --git a/faststream/prometheus/middleware.py b/faststream/prometheus/middleware.py new file mode 100644 index 0000000000..575d846342 --- /dev/null +++ b/faststream/prometheus/middleware.py @@ -0,0 +1,202 @@ +import time +from typing import TYPE_CHECKING, Any, Callable, Optional, Sequence + +from faststream import BaseMiddleware +from faststream.prometheus.consts import ( + PROCESSING_STATUS_BY_ACK_STATUS, + PROCESSING_STATUS_BY_HANDLER_EXCEPTION_MAP, +) +from faststream.prometheus.container import MetricsContainer +from faststream.prometheus.manager import MetricsManager +from faststream.prometheus.provider import MetricsSettingsProvider +from faststream.prometheus.types import ProcessingStatus, PublishingStatus +from faststream.types import EMPTY + +if TYPE_CHECKING: + from prometheus_client import CollectorRegistry + + from faststream.broker.message import StreamMessage + from faststream.types import AsyncFunc, AsyncFuncAny + + +class PrometheusMiddleware(BaseMiddleware): + def __init__( + self, + msg: Optional[Any] = None, + *, + settings_provider_factory: Callable[ + [Any], Optional[MetricsSettingsProvider[Any]] + ], + metrics_manager: MetricsManager, + ) -> None: + self._metrics_manager = metrics_manager + self._settings_provider = settings_provider_factory(msg) + super().__init__(msg) + + async def consume_scope( + self, + call_next: "AsyncFuncAny", + msg: "StreamMessage[Any]", + ) -> Any: + if self._settings_provider is None: + return await call_next(msg) + + messaging_system = self._settings_provider.messaging_system + consume_attrs = self._settings_provider.get_consume_attrs_from_message(msg) + destination_name = consume_attrs["destination_name"] + + self._metrics_manager.add_received_message( + amount=consume_attrs["messages_count"], + broker=messaging_system, + handler=destination_name, + ) + + self._metrics_manager.observe_received_messages_size( + size=consume_attrs["message_size"], + broker=messaging_system, + handler=destination_name, + ) + + self._metrics_manager.add_received_message_in_process( + amount=consume_attrs["messages_count"], + broker=messaging_system, + handler=destination_name, + ) + + err: Optional[Exception] = None + start_time = time.perf_counter() + + try: + result = await call_next(await self.on_consume(msg)) + + except Exception as e: + err = e + self._metrics_manager.add_received_processed_message_exception( + exception_type=type(err).__name__, + broker=messaging_system, + handler=destination_name, + ) + raise + + finally: + duration = time.perf_counter() - start_time + self._metrics_manager.observe_received_processed_message_duration( + duration=duration, + broker=messaging_system, + handler=destination_name, + ) + + self._metrics_manager.remove_received_message_in_process( + amount=consume_attrs["messages_count"], + broker=messaging_system, + handler=destination_name, + ) + + status = ProcessingStatus.acked + + if msg.committed or err: + status = ( + PROCESSING_STATUS_BY_ACK_STATUS.get(msg.committed) # type: ignore[arg-type] + or PROCESSING_STATUS_BY_HANDLER_EXCEPTION_MAP.get(type(err)) + or ProcessingStatus.error + ) + + self._metrics_manager.add_received_processed_message( + amount=consume_attrs["messages_count"], + status=status, + broker=messaging_system, + handler=destination_name, + ) + + return result + + async def publish_scope( + self, + call_next: "AsyncFunc", + msg: Any, + *args: Any, + **kwargs: Any, + ) -> Any: + if self._settings_provider is None: + return await call_next(msg, *args, **kwargs) + + destination_name = ( + self._settings_provider.get_publish_destination_name_from_kwargs(kwargs) + ) + messaging_system = self._settings_provider.messaging_system + + err: Optional[Exception] = None + start_time = time.perf_counter() + + try: + result = await call_next( + await self.on_publish(msg, *args, **kwargs), + *args, + **kwargs, + ) + + except Exception as e: + err = e + self._metrics_manager.add_published_message_exception( + exception_type=type(err).__name__, + broker=messaging_system, + destination=destination_name, + ) + raise + + finally: + duration = time.perf_counter() - start_time + + self._metrics_manager.observe_published_message_duration( + duration=duration, + broker=messaging_system, + destination=destination_name, + ) + + status = PublishingStatus.error if err else PublishingStatus.success + messages_count = len((msg, *args)) + + self._metrics_manager.add_published_message( + amount=messages_count, + status=status, + broker=messaging_system, + destination=destination_name, + ) + + return result + + +class BasePrometheusMiddleware: + __slots__ = ("_metrics_container", "_metrics_manager", "_settings_provider_factory") + + def __init__( + self, + *, + settings_provider_factory: Callable[ + [Any], Optional[MetricsSettingsProvider[Any]] + ], + registry: "CollectorRegistry", + app_name: str = EMPTY, + metrics_prefix: str = "faststream", + received_messages_size_buckets: Optional[Sequence[float]] = None, + ): + if app_name is EMPTY: + app_name = metrics_prefix + + self._settings_provider_factory = settings_provider_factory + self._metrics_container = MetricsContainer( + registry, + metrics_prefix=metrics_prefix, + received_messages_size_buckets=received_messages_size_buckets, + ) + self._metrics_manager = MetricsManager( + self._metrics_container, + app_name=app_name, + ) + + def __call__(self, msg: Optional[Any]) -> BaseMiddleware: + return PrometheusMiddleware( + msg=msg, + metrics_manager=self._metrics_manager, + settings_provider_factory=self._settings_provider_factory, + ) diff --git a/faststream/prometheus/provider.py b/faststream/prometheus/provider.py new file mode 100644 index 0000000000..1a543f5b55 --- /dev/null +++ b/faststream/prometheus/provider.py @@ -0,0 +1,22 @@ +from typing import TYPE_CHECKING, Protocol + +from faststream.broker.message import MsgType + +if TYPE_CHECKING: + from faststream.broker.message import StreamMessage + from faststream.prometheus import ConsumeAttrs + from faststream.types import AnyDict + + +class MetricsSettingsProvider(Protocol[MsgType]): + messaging_system: str + + def get_consume_attrs_from_message( + self, + msg: "StreamMessage[MsgType]", + ) -> "ConsumeAttrs": ... + + def get_publish_destination_name_from_kwargs( + self, + kwargs: "AnyDict", + ) -> str: ... diff --git a/faststream/prometheus/types.py b/faststream/prometheus/types.py new file mode 100644 index 0000000000..ae6ffb7161 --- /dev/null +++ b/faststream/prometheus/types.py @@ -0,0 +1,21 @@ +from enum import Enum +from typing import TypedDict + + +class ProcessingStatus(str, Enum): + acked = "acked" + nacked = "nacked" + rejected = "rejected" + skipped = "skipped" + error = "error" + + +class PublishingStatus(str, Enum): + success = "success" + error = "error" + + +class ConsumeAttrs(TypedDict): + message_size: int + destination_name: str + messages_count: int diff --git a/faststream/rabbit/prometheus/__init__.py b/faststream/rabbit/prometheus/__init__.py new file mode 100644 index 0000000000..bdb07907ee --- /dev/null +++ b/faststream/rabbit/prometheus/__init__.py @@ -0,0 +1,3 @@ +from faststream.rabbit.prometheus.middleware import RabbitPrometheusMiddleware + +__all__ = ("RabbitPrometheusMiddleware",) diff --git a/faststream/rabbit/prometheus/middleware.py b/faststream/rabbit/prometheus/middleware.py new file mode 100644 index 0000000000..b2f96e45ca --- /dev/null +++ b/faststream/rabbit/prometheus/middleware.py @@ -0,0 +1,26 @@ +from typing import TYPE_CHECKING, Optional, Sequence + +from faststream.prometheus.middleware import BasePrometheusMiddleware +from faststream.rabbit.prometheus.provider import RabbitMetricsSettingsProvider +from faststream.types import EMPTY + +if TYPE_CHECKING: + from prometheus_client import CollectorRegistry + + +class RabbitPrometheusMiddleware(BasePrometheusMiddleware): + def __init__( + self, + *, + registry: "CollectorRegistry", + app_name: str = EMPTY, + metrics_prefix: str = "faststream", + received_messages_size_buckets: Optional[Sequence[float]] = None, + ) -> None: + super().__init__( + settings_provider_factory=lambda _: RabbitMetricsSettingsProvider(), + registry=registry, + app_name=app_name, + metrics_prefix=metrics_prefix, + received_messages_size_buckets=received_messages_size_buckets, + ) diff --git a/faststream/rabbit/prometheus/provider.py b/faststream/rabbit/prometheus/provider.py new file mode 100644 index 0000000000..48c1bb2541 --- /dev/null +++ b/faststream/rabbit/prometheus/provider.py @@ -0,0 +1,44 @@ +from typing import TYPE_CHECKING, Union + +from faststream.prometheus import ( + ConsumeAttrs, + MetricsSettingsProvider, +) + +if TYPE_CHECKING: + from aio_pika import IncomingMessage + + from faststream.broker.message import StreamMessage + from faststream.rabbit.schemas.exchange import RabbitExchange + from faststream.types import AnyDict + + +class RabbitMetricsSettingsProvider(MetricsSettingsProvider["IncomingMessage"]): + __slots__ = ("messaging_system",) + + def __init__(self) -> None: + self.messaging_system = "rabbitmq" + + def get_consume_attrs_from_message( + self, + msg: "StreamMessage[IncomingMessage]", + ) -> ConsumeAttrs: + exchange = msg.raw_message.exchange or "default" + routing_key = msg.raw_message.routing_key + + return { + "destination_name": f"{exchange}.{routing_key}", + "message_size": len(msg.body), + "messages_count": 1, + } + + def get_publish_destination_name_from_kwargs( + self, + kwargs: "AnyDict", + ) -> str: + exchange: Union[None, str, RabbitExchange] = kwargs.get("exchange") + exchange_prefix = getattr(exchange, "name", exchange or "default") + + routing_key: str = kwargs["routing_key"] + + return f"{exchange_prefix}.{routing_key}" diff --git a/faststream/redis/message.py b/faststream/redis/message.py index 8bce4005c8..86cc9b3d96 100644 --- a/faststream/redis/message.py +++ b/faststream/redis/message.py @@ -128,7 +128,7 @@ async def ack( ids = self.raw_message["message_ids"] channel = self.raw_message["channel"] await redis.xack(channel, group, *ids) # type: ignore[no-untyped-call] - await super().ack() + await super().ack() @override async def nack( diff --git a/faststream/redis/prometheus/__init__.py b/faststream/redis/prometheus/__init__.py new file mode 100644 index 0000000000..84c831aabb --- /dev/null +++ b/faststream/redis/prometheus/__init__.py @@ -0,0 +1,3 @@ +from faststream.redis.prometheus.middleware import RedisPrometheusMiddleware + +__all__ = ("RedisPrometheusMiddleware",) diff --git a/faststream/redis/prometheus/middleware.py b/faststream/redis/prometheus/middleware.py new file mode 100644 index 0000000000..1b157cb5a9 --- /dev/null +++ b/faststream/redis/prometheus/middleware.py @@ -0,0 +1,26 @@ +from typing import TYPE_CHECKING, Optional, Sequence + +from faststream.prometheus.middleware import BasePrometheusMiddleware +from faststream.redis.prometheus.provider import settings_provider_factory +from faststream.types import EMPTY + +if TYPE_CHECKING: + from prometheus_client import CollectorRegistry + + +class RedisPrometheusMiddleware(BasePrometheusMiddleware): + def __init__( + self, + *, + registry: "CollectorRegistry", + app_name: str = EMPTY, + metrics_prefix: str = "faststream", + received_messages_size_buckets: Optional[Sequence[float]] = None, + ) -> None: + super().__init__( + settings_provider_factory=settings_provider_factory, + registry=registry, + app_name=app_name, + metrics_prefix=metrics_prefix, + received_messages_size_buckets=received_messages_size_buckets, + ) diff --git a/faststream/redis/prometheus/provider.py b/faststream/redis/prometheus/provider.py new file mode 100644 index 0000000000..51eb831669 --- /dev/null +++ b/faststream/redis/prometheus/provider.py @@ -0,0 +1,63 @@ +from typing import TYPE_CHECKING, Optional, Sized, Union, cast + +from faststream.prometheus import ( + ConsumeAttrs, + MetricsSettingsProvider, +) + +if TYPE_CHECKING: + from faststream.broker.message import StreamMessage + from faststream.types import AnyDict + + +class BaseRedisMetricsSettingsProvider(MetricsSettingsProvider["AnyDict"]): + __slots__ = ("messaging_system",) + + def __init__(self) -> None: + self.messaging_system = "redis" + + def get_publish_destination_name_from_kwargs( + self, + kwargs: "AnyDict", + ) -> str: + return self._get_destination(kwargs) + + @staticmethod + def _get_destination(kwargs: "AnyDict") -> str: + return kwargs.get("channel") or kwargs.get("list") or kwargs.get("stream") or "" + + +class RedisMetricsSettingsProvider(BaseRedisMetricsSettingsProvider): + def get_consume_attrs_from_message( + self, + msg: "StreamMessage[AnyDict]", + ) -> ConsumeAttrs: + return { + "destination_name": self._get_destination(msg.raw_message), + "message_size": len(msg.body), + "messages_count": 1, + } + + +class BatchRedisMetricsSettingsProvider(BaseRedisMetricsSettingsProvider): + def get_consume_attrs_from_message( + self, + msg: "StreamMessage[AnyDict]", + ) -> ConsumeAttrs: + return { + "destination_name": self._get_destination(msg.raw_message), + "message_size": len(msg.body), + "messages_count": len(cast(Sized, msg._decoded_body)), + } + + +def settings_provider_factory( + msg: Optional["AnyDict"], +) -> Union[ + RedisMetricsSettingsProvider, + BatchRedisMetricsSettingsProvider, +]: + if msg is not None and msg.get("type", "").startswith("b"): + return BatchRedisMetricsSettingsProvider() + else: + return RedisMetricsSettingsProvider() diff --git a/pyproject.toml b/pyproject.toml index 350d53ed3e..7867cfc450 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -83,8 +83,10 @@ cli = [ "watchfiles>=0.15.0,<0.25.0" ] +prometheus = ["prometheus-client>=0.20.0,<0.30.0"] + # dev dependencies -optionals = ["faststream[rabbit,kafka,confluent,nats,redis,otel,cli]"] +optionals = ["faststream[rabbit,kafka,confluent,nats,redis,otel,cli,prometheus]"] devdocs = [ "mkdocs-material==9.5.40", diff --git a/tests/brokers/test_pushback.py b/tests/brokers/test_pushback.py index 9f18aa038b..ac56078cb0 100644 --- a/tests/brokers/test_pushback.py +++ b/tests/brokers/test_pushback.py @@ -12,7 +12,7 @@ @pytest.fixture def message(): - return AsyncMock(message_id=1) + return AsyncMock(message_id=1, committed=None) @pytest.mark.asyncio diff --git a/tests/prometheus/__init__.py b/tests/prometheus/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/tests/prometheus/basic.py b/tests/prometheus/basic.py new file mode 100644 index 0000000000..f2d9a5d6cf --- /dev/null +++ b/tests/prometheus/basic.py @@ -0,0 +1,204 @@ +import asyncio +from typing import Any, Optional, Type +from unittest.mock import ANY, Mock, call + +import pytest +from prometheus_client import CollectorRegistry + +from faststream import Context +from faststream.broker.message import AckStatus +from faststream.exceptions import RejectMessage +from faststream.prometheus.middleware import ( + PROCESSING_STATUS_BY_ACK_STATUS, + PROCESSING_STATUS_BY_HANDLER_EXCEPTION_MAP, +) +from faststream.prometheus.types import ProcessingStatus +from tests.brokers.base.basic import BaseTestcaseConfig + + +@pytest.mark.asyncio +class LocalPrometheusTestcase(BaseTestcaseConfig): + def get_broker(self, apply_types=False, **kwargs): + raise NotImplementedError + + def get_middleware(self, **kwargs): + raise NotImplementedError + + @staticmethod + def consume_destination_name(queue: str) -> str: + return queue + + @property + def settings_provider_factory(self): + return self.get_middleware( + registry=CollectorRegistry() + )._settings_provider_factory + + @pytest.mark.parametrize( + ( + "status", + "exception_class", + ), + [ + pytest.param( + AckStatus.acked, + RejectMessage, + id="acked status with reject message exception", + ), + pytest.param( + AckStatus.acked, Exception, id="acked status with not handler exception" + ), + pytest.param(AckStatus.acked, None, id="acked status without exception"), + pytest.param(AckStatus.nacked, None, id="nacked status without exception"), + pytest.param( + AckStatus.rejected, None, id="rejected status without exception" + ), + ], + ) + async def test_metrics( + self, + event: asyncio.Event, + queue: str, + status: AckStatus, + exception_class: Optional[Type[Exception]], + ): + middleware = self.get_middleware(registry=CollectorRegistry()) + metrics_manager_mock = Mock() + middleware._metrics_manager = metrics_manager_mock + + broker = self.get_broker(apply_types=True, middlewares=(middleware,)) + + args, kwargs = self.get_subscriber_params(queue) + + message = None + + @broker.subscriber(*args, **kwargs) + async def handler(m=Context("message")): + event.set() + + nonlocal message + message = m + + if exception_class: + raise exception_class + + if status == AckStatus.acked: + await message.ack() + elif status == AckStatus.nacked: + await message.nack() + elif status == AckStatus.rejected: + await message.reject() + + async with broker: + await broker.start() + tasks = ( + asyncio.create_task(broker.publish("hello", queue)), + asyncio.create_task(event.wait()), + ) + await asyncio.wait(tasks, timeout=self.timeout) + + assert event.is_set() + self.assert_consume_metrics( + metrics_manager=metrics_manager_mock, + message=message, + exception_class=exception_class, + ) + self.assert_publish_metrics(metrics_manager=metrics_manager_mock) + + def assert_consume_metrics( + self, + *, + metrics_manager: Any, + message: Any, + exception_class: Optional[Type[Exception]], + ): + settings_provider = self.settings_provider_factory(message.raw_message) + consume_attrs = settings_provider.get_consume_attrs_from_message(message) + assert metrics_manager.add_received_message.mock_calls == [ + call( + amount=consume_attrs["messages_count"], + broker=settings_provider.messaging_system, + handler=consume_attrs["destination_name"], + ), + ] + + assert metrics_manager.observe_received_messages_size.mock_calls == [ + call( + size=consume_attrs["message_size"], + broker=settings_provider.messaging_system, + handler=consume_attrs["destination_name"], + ), + ] + + assert metrics_manager.add_received_message_in_process.mock_calls == [ + call( + amount=consume_attrs["messages_count"], + broker=settings_provider.messaging_system, + handler=consume_attrs["destination_name"], + ), + ] + assert metrics_manager.remove_received_message_in_process.mock_calls == [ + call( + amount=consume_attrs["messages_count"], + broker=settings_provider.messaging_system, + handler=consume_attrs["destination_name"], + ) + ] + + assert ( + metrics_manager.observe_received_processed_message_duration.mock_calls + == [ + call( + duration=ANY, + broker=settings_provider.messaging_system, + handler=consume_attrs["destination_name"], + ), + ] + ) + + status = ProcessingStatus.acked + + if exception_class: + status = ( + PROCESSING_STATUS_BY_HANDLER_EXCEPTION_MAP.get(exception_class) + or ProcessingStatus.error + ) + elif message.committed: + status = PROCESSING_STATUS_BY_ACK_STATUS[message.committed] + + assert metrics_manager.add_received_processed_message.mock_calls == [ + call( + amount=consume_attrs["messages_count"], + broker=settings_provider.messaging_system, + handler=consume_attrs["destination_name"], + status=status.value, + ), + ] + + if status == ProcessingStatus.error: + assert ( + metrics_manager.add_received_processed_message_exception.mock_calls + == [ + call( + broker=settings_provider.messaging_system, + handler=consume_attrs["destination_name"], + exception_type=exception_class.__name__, + ), + ] + ) + + def assert_publish_metrics(self, metrics_manager: Any): + settings_provider = self.settings_provider_factory(None) + assert metrics_manager.observe_published_message_duration.mock_calls == [ + call( + duration=ANY, broker=settings_provider.messaging_system, destination=ANY + ), + ] + assert metrics_manager.add_published_message.mock_calls == [ + call( + amount=ANY, + broker=settings_provider.messaging_system, + destination=ANY, + status="success", + ), + ] diff --git a/tests/prometheus/confluent/__init__.py b/tests/prometheus/confluent/__init__.py new file mode 100644 index 0000000000..c4a1803708 --- /dev/null +++ b/tests/prometheus/confluent/__init__.py @@ -0,0 +1,3 @@ +import pytest + +pytest.importorskip("confluent_kafka") diff --git a/tests/prometheus/confluent/test_confluent.py b/tests/prometheus/confluent/test_confluent.py new file mode 100644 index 0000000000..d1e3034ad6 --- /dev/null +++ b/tests/prometheus/confluent/test_confluent.py @@ -0,0 +1,79 @@ +import asyncio +from unittest.mock import Mock + +import pytest +from prometheus_client import CollectorRegistry + +from faststream import Context +from faststream.confluent import KafkaBroker +from faststream.confluent.prometheus.middleware import KafkaPrometheusMiddleware +from tests.brokers.confluent.basic import ConfluentTestcaseConfig +from tests.brokers.confluent.test_consume import TestConsume +from tests.brokers.confluent.test_publish import TestPublish +from tests.prometheus.basic import LocalPrometheusTestcase + + +@pytest.mark.confluent +class TestPrometheus(ConfluentTestcaseConfig, LocalPrometheusTestcase): + def get_broker(self, apply_types=False, **kwargs): + return KafkaBroker(apply_types=apply_types, **kwargs) + + def get_middleware(self, **kwargs): + return KafkaPrometheusMiddleware(**kwargs) + + async def test_metrics_batch( + self, + event: asyncio.Event, + queue: str, + ): + middleware = self.get_middleware(registry=CollectorRegistry()) + metrics_manager_mock = Mock() + middleware._metrics_manager = metrics_manager_mock + + broker = self.get_broker(apply_types=True, middlewares=(middleware,)) + + args, kwargs = self.get_subscriber_params(queue, batch=True) + message = None + + @broker.subscriber(*args, **kwargs) + async def handler(m=Context("message")): + event.set() + + nonlocal message + message = m + + async with broker: + await broker.start() + tasks = ( + asyncio.create_task( + broker.publish_batch("hello", "world", topic=queue) + ), + asyncio.create_task(event.wait()), + ) + await asyncio.wait(tasks, timeout=self.timeout) + + assert event.is_set() + self.assert_consume_metrics( + metrics_manager=metrics_manager_mock, message=message, exception_class=None + ) + self.assert_publish_metrics(metrics_manager=metrics_manager_mock) + + +@pytest.mark.confluent +class TestPublishWithPrometheus(TestPublish): + def get_broker(self, apply_types: bool = False, **kwargs): + return KafkaBroker( + middlewares=(KafkaPrometheusMiddleware(registry=CollectorRegistry()),), + apply_types=apply_types, + **kwargs, + ) + + +@pytest.mark.confluent +class TestConsumeWithPrometheus(TestConsume): + def get_broker(self, apply_types: bool = False, **kwargs): + return KafkaBroker( + middlewares=(KafkaPrometheusMiddleware(registry=CollectorRegistry()),), + apply_types=apply_types, + **kwargs, + ) diff --git a/tests/prometheus/kafka/__init__.py b/tests/prometheus/kafka/__init__.py new file mode 100644 index 0000000000..bd6bc708fc --- /dev/null +++ b/tests/prometheus/kafka/__init__.py @@ -0,0 +1,3 @@ +import pytest + +pytest.importorskip("aiokafka") diff --git a/tests/prometheus/kafka/test_kafka.py b/tests/prometheus/kafka/test_kafka.py new file mode 100644 index 0000000000..abb5c86b3f --- /dev/null +++ b/tests/prometheus/kafka/test_kafka.py @@ -0,0 +1,82 @@ +import asyncio +from unittest.mock import Mock + +import pytest +from prometheus_client import CollectorRegistry + +from faststream import Context +from faststream.kafka import KafkaBroker +from faststream.kafka.prometheus.middleware import KafkaPrometheusMiddleware +from tests.brokers.kafka.test_consume import TestConsume +from tests.brokers.kafka.test_publish import TestPublish +from tests.prometheus.basic import LocalPrometheusTestcase + + +@pytest.mark.kafka +class TestPrometheus(LocalPrometheusTestcase): + def get_broker(self, apply_types=False, **kwargs): + return KafkaBroker(apply_types=apply_types, **kwargs) + + def get_middleware(self, **kwargs): + return KafkaPrometheusMiddleware(**kwargs) + + async def test_metrics_batch( + self, + event: asyncio.Event, + queue: str, + ): + middleware = self.get_middleware(registry=CollectorRegistry()) + metrics_manager_mock = Mock() + middleware._metrics_manager = metrics_manager_mock + + broker = self.get_broker(apply_types=True, middlewares=(middleware,)) + + args, kwargs = self.get_subscriber_params(queue, batch=True) + message = None + + @broker.subscriber(*args, **kwargs) + async def handler(m=Context("message")): + event.set() + + nonlocal message + message = m + + async with broker: + await broker.start() + tasks = ( + asyncio.create_task( + broker.publish_batch("hello", "world", topic=queue) + ), + asyncio.create_task(event.wait()), + ) + await asyncio.wait(tasks, timeout=self.timeout) + + assert event.is_set() + self.assert_consume_metrics( + metrics_manager=metrics_manager_mock, message=message, exception_class=None + ) + self.assert_publish_metrics(metrics_manager=metrics_manager_mock) + + +@pytest.mark.kafka +class TestPublishWithPrometheus(TestPublish): + def get_broker( + self, + apply_types: bool = False, + **kwargs, + ): + return KafkaBroker( + middlewares=(KafkaPrometheusMiddleware(registry=CollectorRegistry()),), + apply_types=apply_types, + **kwargs, + ) + + +@pytest.mark.kafka +class TestConsumeWithPrometheus(TestConsume): + def get_broker(self, apply_types: bool = False, **kwargs): + return KafkaBroker( + middlewares=(KafkaPrometheusMiddleware(registry=CollectorRegistry()),), + apply_types=apply_types, + **kwargs, + ) diff --git a/tests/prometheus/nats/__init__.py b/tests/prometheus/nats/__init__.py new file mode 100644 index 0000000000..87ead90ee6 --- /dev/null +++ b/tests/prometheus/nats/__init__.py @@ -0,0 +1,3 @@ +import pytest + +pytest.importorskip("nats") diff --git a/tests/prometheus/nats/test_nats.py b/tests/prometheus/nats/test_nats.py new file mode 100644 index 0000000000..f65eb41e85 --- /dev/null +++ b/tests/prometheus/nats/test_nats.py @@ -0,0 +1,86 @@ +import asyncio +from unittest.mock import Mock + +import pytest +from prometheus_client import CollectorRegistry + +from faststream import Context +from faststream.nats import JStream, NatsBroker, PullSub +from faststream.nats.prometheus.middleware import NatsPrometheusMiddleware +from tests.brokers.nats.test_consume import TestConsume +from tests.brokers.nats.test_publish import TestPublish +from tests.prometheus.basic import LocalPrometheusTestcase + + +@pytest.fixture +def stream(queue): + return JStream(queue) + + +@pytest.mark.nats +class TestPrometheus(LocalPrometheusTestcase): + def get_broker(self, apply_types=False, **kwargs): + return NatsBroker(apply_types=apply_types, **kwargs) + + def get_middleware(self, **kwargs): + return NatsPrometheusMiddleware(**kwargs) + + async def test_metrics_batch( + self, + event: asyncio.Event, + queue: str, + stream: JStream, + ): + middleware = self.get_middleware(registry=CollectorRegistry()) + metrics_manager_mock = Mock() + middleware._metrics_manager = metrics_manager_mock + + broker = self.get_broker(apply_types=True, middlewares=(middleware,)) + + args, kwargs = self.get_subscriber_params( + queue, + stream=stream, + pull_sub=PullSub(1, batch=True, timeout=self.timeout), + ) + message = None + + @broker.subscriber(*args, **kwargs) + async def handler(m=Context("message")): + event.set() + + nonlocal message + message = m + + async with broker: + await broker.start() + tasks = ( + asyncio.create_task(broker.publish("hello", queue)), + asyncio.create_task(event.wait()), + ) + await asyncio.wait(tasks, timeout=self.timeout) + + assert event.is_set() + self.assert_consume_metrics( + metrics_manager=metrics_manager_mock, message=message, exception_class=None + ) + self.assert_publish_metrics(metrics_manager=metrics_manager_mock) + + +@pytest.mark.nats +class TestPublishWithPrometheus(TestPublish): + def get_broker(self, apply_types: bool = False, **kwargs): + return NatsBroker( + middlewares=(NatsPrometheusMiddleware(registry=CollectorRegistry()),), + apply_types=apply_types, + **kwargs, + ) + + +@pytest.mark.nats +class TestConsumeWithPrometheus(TestConsume): + def get_broker(self, apply_types: bool = False, **kwargs): + return NatsBroker( + middlewares=(NatsPrometheusMiddleware(registry=CollectorRegistry()),), + apply_types=apply_types, + **kwargs, + ) diff --git a/tests/prometheus/rabbit/__init__.py b/tests/prometheus/rabbit/__init__.py new file mode 100644 index 0000000000..ebec43fcd5 --- /dev/null +++ b/tests/prometheus/rabbit/__init__.py @@ -0,0 +1,3 @@ +import pytest + +pytest.importorskip("aio_pika") diff --git a/tests/prometheus/rabbit/test_rabbit.py b/tests/prometheus/rabbit/test_rabbit.py new file mode 100644 index 0000000000..6eef6d224f --- /dev/null +++ b/tests/prometheus/rabbit/test_rabbit.py @@ -0,0 +1,42 @@ +import pytest +from prometheus_client import CollectorRegistry + +from faststream.rabbit import RabbitBroker, RabbitExchange +from faststream.rabbit.prometheus.middleware import RabbitPrometheusMiddleware +from tests.brokers.rabbit.test_consume import TestConsume +from tests.brokers.rabbit.test_publish import TestPublish +from tests.prometheus.basic import LocalPrometheusTestcase + + +@pytest.fixture +def exchange(queue): + return RabbitExchange(name=queue) + + +@pytest.mark.rabbit +class TestPrometheus(LocalPrometheusTestcase): + def get_broker(self, apply_types=False, **kwargs): + return RabbitBroker(apply_types=apply_types, **kwargs) + + def get_middleware(self, **kwargs): + return RabbitPrometheusMiddleware(**kwargs) + + +@pytest.mark.rabbit +class TestPublishWithPrometheus(TestPublish): + def get_broker(self, apply_types: bool = False, **kwargs): + return RabbitBroker( + middlewares=(RabbitPrometheusMiddleware(registry=CollectorRegistry()),), + apply_types=apply_types, + **kwargs, + ) + + +@pytest.mark.rabbit +class TestConsumeWithPrometheus(TestConsume): + def get_broker(self, apply_types: bool = False, **kwargs): + return RabbitBroker( + middlewares=(RabbitPrometheusMiddleware(registry=CollectorRegistry()),), + apply_types=apply_types, + **kwargs, + ) diff --git a/tests/prometheus/redis/__init__.py b/tests/prometheus/redis/__init__.py new file mode 100644 index 0000000000..4752ef19b1 --- /dev/null +++ b/tests/prometheus/redis/__init__.py @@ -0,0 +1,3 @@ +import pytest + +pytest.importorskip("redis") diff --git a/tests/prometheus/redis/test_redis.py b/tests/prometheus/redis/test_redis.py new file mode 100644 index 0000000000..4059c33d48 --- /dev/null +++ b/tests/prometheus/redis/test_redis.py @@ -0,0 +1,77 @@ +import asyncio +from unittest.mock import Mock + +import pytest +from prometheus_client import CollectorRegistry + +from faststream import Context +from faststream.redis import ListSub, RedisBroker +from faststream.redis.prometheus.middleware import RedisPrometheusMiddleware +from tests.brokers.redis.test_consume import TestConsume +from tests.brokers.redis.test_publish import TestPublish +from tests.prometheus.basic import LocalPrometheusTestcase + + +@pytest.mark.redis +class TestPrometheus(LocalPrometheusTestcase): + def get_broker(self, apply_types=False, **kwargs): + return RedisBroker(apply_types=apply_types, **kwargs) + + def get_middleware(self, **kwargs): + return RedisPrometheusMiddleware(**kwargs) + + async def test_metrics_batch( + self, + event: asyncio.Event, + queue: str, + ): + middleware = self.get_middleware(registry=CollectorRegistry()) + metrics_manager_mock = Mock() + middleware._metrics_manager = metrics_manager_mock + + broker = self.get_broker(apply_types=True, middlewares=(middleware,)) + + args, kwargs = self.get_subscriber_params(list=ListSub(queue, batch=True)) + + message = None + + @broker.subscriber(*args, **kwargs) + async def handler(m=Context("message")): + event.set() + + nonlocal message + message = m + + async with broker: + await broker.start() + tasks = ( + asyncio.create_task(broker.publish_batch("hello", "world", list=queue)), + asyncio.create_task(event.wait()), + ) + await asyncio.wait(tasks, timeout=self.timeout) + + assert event.is_set() + self.assert_consume_metrics( + metrics_manager=metrics_manager_mock, message=message, exception_class=None + ) + self.assert_publish_metrics(metrics_manager=metrics_manager_mock) + + +@pytest.mark.redis +class TestPublishWithPrometheus(TestPublish): + def get_broker(self, apply_types: bool = False, **kwargs): + return RedisBroker( + middlewares=(RedisPrometheusMiddleware(registry=CollectorRegistry()),), + apply_types=apply_types, + **kwargs, + ) + + +@pytest.mark.redis +class TestConsumeWithPrometheus(TestConsume): + def get_broker(self, apply_types: bool = False, **kwargs): + return RedisBroker( + middlewares=(RedisPrometheusMiddleware(registry=CollectorRegistry()),), + apply_types=apply_types, + **kwargs, + ) diff --git a/tests/prometheus/test_metrics.py b/tests/prometheus/test_metrics.py new file mode 100644 index 0000000000..7f9aa85771 --- /dev/null +++ b/tests/prometheus/test_metrics.py @@ -0,0 +1,644 @@ +import random +from typing import List, Optional +from unittest.mock import ANY + +import pytest +from dirty_equals import IsPositiveFloat, IsStr +from prometheus_client import CollectorRegistry, Histogram, Metric +from prometheus_client.samples import Sample + +from faststream.prometheus.container import MetricsContainer +from faststream.prometheus.manager import MetricsManager +from faststream.prometheus.types import ProcessingStatus, PublishingStatus + + +class TestCaseMetrics: + @staticmethod + def create_metrics_manager( + app_name: Optional[str] = None, + metrics_prefix: Optional[str] = None, + received_messages_size_buckets: Optional[List[float]] = None, + ) -> MetricsManager: + registry = CollectorRegistry() + container = MetricsContainer( + registry, + metrics_prefix=metrics_prefix, + received_messages_size_buckets=received_messages_size_buckets, + ) + return MetricsManager(container, app_name=app_name) + + @pytest.fixture + def app_name(self, request) -> str: + return "youtube" + + @pytest.fixture + def metrics_prefix(self, request) -> str: + return "fs" + + @pytest.fixture + def broker(self) -> str: + return "rabbit" + + @pytest.fixture + def queue(self) -> str: + return "default.test" + + @pytest.fixture + def messages_amount(self) -> int: + return random.randint(1, 10) + + @pytest.fixture + def exception_type(self) -> str: + return Exception.__name__ + + def test_add_received_message( + self, + app_name: str, + metrics_prefix: str, + queue: str, + broker: str, + messages_amount: int, + ) -> None: + manager = self.create_metrics_manager( + app_name=app_name, + metrics_prefix=metrics_prefix, + ) + + expected = Metric( + name=f"{metrics_prefix}_received_messages", + documentation="Count of received messages by broker and handler", + unit="", + typ="counter", + ) + expected.samples = [ + Sample( + name=f"{metrics_prefix}_received_messages_total", + labels={"app_name": app_name, "broker": broker, "handler": queue}, + value=float(messages_amount), + timestamp=None, + exemplar=None, + ), + Sample( + name=f"{metrics_prefix}_received_messages_created", + labels={"app_name": app_name, "broker": broker, "handler": queue}, + value=IsPositiveFloat, + timestamp=None, + exemplar=None, + ), + ] + + manager.add_received_message( + amount=messages_amount, broker=broker, handler=queue + ) + + metric_values = manager._container.received_messages_total.collect() + + assert metric_values == [expected] + + @pytest.mark.parametrize( + "is_default_buckets", + [ + pytest.param(True, id="with default buckets"), + pytest.param(False, id="with custom buckets"), + ], + ) + def test_observe_received_messages_size( + self, + app_name: str, + metrics_prefix: str, + queue: str, + broker: str, + is_default_buckets: bool, + ) -> None: + manager_kwargs = { + "app_name": app_name, + "metrics_prefix": metrics_prefix, + } + + custom_buckets = [1.0, 2.0, 3.0, float("inf")] + + if not is_default_buckets: + manager_kwargs["received_messages_size_buckets"] = custom_buckets + + manager = self.create_metrics_manager(**manager_kwargs) + + size = 1 + buckets = ( + MetricsContainer.DEFAULT_SIZE_BUCKETS + if is_default_buckets + else custom_buckets + ) + + expected = Metric( + name=f"{metrics_prefix}_received_messages_size_bytes", + documentation="Histogram of received messages size in bytes by broker and handler", + unit="", + typ="histogram", + ) + expected.samples = [ + *[ + Sample( + name=f"{metrics_prefix}_received_messages_size_bytes_bucket", + labels={ + "app_name": app_name, + "broker": broker, + "handler": queue, + "le": IsStr, + }, + value=1.0, + timestamp=None, + exemplar=None, + ) + for _ in buckets + ], + Sample( + name=f"{metrics_prefix}_received_messages_size_bytes_count", + labels={"app_name": app_name, "broker": broker, "handler": queue}, + value=1.0, + timestamp=None, + exemplar=None, + ), + Sample( + name=f"{metrics_prefix}_received_messages_size_bytes_sum", + labels={"app_name": app_name, "broker": broker, "handler": queue}, + value=size, + timestamp=None, + exemplar=None, + ), + Sample( + name=f"{metrics_prefix}_received_messages_size_bytes_created", + labels={"app_name": app_name, "broker": broker, "handler": queue}, + value=ANY, + timestamp=None, + exemplar=None, + ), + ] + + manager.observe_received_messages_size(size=size, broker=broker, handler=queue) + + metric_values = manager._container.received_messages_size_bytes.collect() + + assert metric_values == [expected] + + def test_add_received_message_in_process( + self, + app_name: str, + metrics_prefix: str, + queue: str, + broker: str, + messages_amount: int, + ) -> None: + manager = self.create_metrics_manager( + app_name=app_name, + metrics_prefix=metrics_prefix, + ) + + expected = Metric( + name=f"{metrics_prefix}_received_messages_in_process", + documentation="Gauge of received messages in process by broker and handler", + unit="", + typ="gauge", + ) + expected.samples = [ + Sample( + name=f"{metrics_prefix}_received_messages_in_process", + labels={"app_name": app_name, "broker": broker, "handler": queue}, + value=float(messages_amount), + timestamp=None, + exemplar=None, + ), + ] + + manager.add_received_message_in_process( + amount=messages_amount, broker=broker, handler=queue + ) + + metric_values = manager._container.received_messages_in_process.collect() + + assert metric_values == [expected] + + def test_remove_received_message_in_process( + self, + app_name: str, + metrics_prefix: str, + queue: str, + broker: str, + messages_amount: int, + ) -> None: + manager = self.create_metrics_manager( + app_name=app_name, + metrics_prefix=metrics_prefix, + ) + + expected = Metric( + name=f"{metrics_prefix}_received_messages_in_process", + documentation="Gauge of received messages in process by broker and handler", + unit="", + typ="gauge", + ) + expected.samples = [ + Sample( + name=f"{metrics_prefix}_received_messages_in_process", + labels={"app_name": app_name, "broker": broker, "handler": queue}, + value=float(messages_amount - 1), + timestamp=None, + exemplar=None, + ), + ] + + manager.add_received_message_in_process( + amount=messages_amount, broker=broker, handler=queue + ) + manager.remove_received_message_in_process( + amount=1, broker=broker, handler=queue + ) + + metric_values = manager._container.received_messages_in_process.collect() + + assert metric_values == [expected] + + @pytest.mark.parametrize( + "status", + [ + pytest.param(ProcessingStatus.acked, id="acked status"), + pytest.param(ProcessingStatus.nacked, id="nacked status"), + pytest.param(ProcessingStatus.rejected, id="rejected status"), + pytest.param(ProcessingStatus.skipped, id="skipped status"), + pytest.param(ProcessingStatus.error, id="error status"), + ], + ) + def test_add_received_processed_message( + self, + app_name: str, + metrics_prefix: str, + queue: str, + broker: str, + messages_amount: int, + status: ProcessingStatus, + ) -> None: + manager = self.create_metrics_manager( + app_name=app_name, + metrics_prefix=metrics_prefix, + ) + + expected = Metric( + name=f"{metrics_prefix}_received_processed_messages", + documentation="Count of received processed messages by broker, handler and status", + unit="", + typ="counter", + ) + expected.samples = [ + Sample( + name=f"{metrics_prefix}_received_processed_messages_total", + labels={ + "app_name": app_name, + "broker": broker, + "handler": queue, + "status": status.value, + }, + value=float(messages_amount), + timestamp=None, + exemplar=None, + ), + Sample( + name=f"{metrics_prefix}_received_processed_messages_created", + labels={ + "app_name": app_name, + "broker": broker, + "handler": queue, + "status": status.value, + }, + value=IsPositiveFloat, + timestamp=None, + exemplar=None, + ), + ] + + manager.add_received_processed_message( + amount=messages_amount, + status=status, + broker=broker, + handler=queue, + ) + + metric_values = manager._container.received_processed_messages_total.collect() + + assert metric_values == [expected] + + def test_observe_received_processed_message_duration( + self, + app_name: str, + metrics_prefix: str, + queue: str, + broker: str, + ) -> None: + manager = self.create_metrics_manager( + app_name=app_name, + metrics_prefix=metrics_prefix, + ) + + duration = 0.001 + + expected = Metric( + name=f"{metrics_prefix}_received_processed_messages_duration_seconds", + documentation="Histogram of received processed messages duration in seconds by broker and handler", + unit="", + typ="histogram", + ) + expected.samples = [ + *[ + Sample( + name=f"{metrics_prefix}_received_processed_messages_duration_seconds_bucket", + labels={ + "app_name": app_name, + "broker": broker, + "handler": queue, + "le": IsStr, + }, + value=1.0, + timestamp=None, + exemplar=None, + ) + for _ in Histogram.DEFAULT_BUCKETS + ], + Sample( + name=f"{metrics_prefix}_received_processed_messages_duration_seconds_count", + labels={"app_name": app_name, "broker": broker, "handler": queue}, + value=1.0, + timestamp=None, + exemplar=None, + ), + Sample( + name=f"{metrics_prefix}_received_processed_messages_duration_seconds_sum", + labels={"app_name": app_name, "broker": broker, "handler": queue}, + value=duration, + timestamp=None, + exemplar=None, + ), + Sample( + name=f"{metrics_prefix}_received_processed_messages_duration_seconds_created", + labels={"app_name": app_name, "broker": broker, "handler": queue}, + value=ANY, + timestamp=None, + exemplar=None, + ), + ] + + manager.observe_received_processed_message_duration( + duration=duration, + broker=broker, + handler=queue, + ) + + metric_values = ( + manager._container.received_processed_messages_duration_seconds.collect() + ) + + assert metric_values == [expected] + + def test_add_received_processed_message_exception( + self, + app_name: str, + metrics_prefix: str, + queue: str, + broker: str, + exception_type: str, + ) -> None: + manager = self.create_metrics_manager( + app_name=app_name, + metrics_prefix=metrics_prefix, + ) + + expected = Metric( + name=f"{metrics_prefix}_received_processed_messages_exceptions", + documentation="Count of received processed messages exceptions by broker, handler and exception_type", + unit="", + typ="counter", + ) + expected.samples = [ + Sample( + name=f"{metrics_prefix}_received_processed_messages_exceptions_total", + labels={ + "app_name": app_name, + "broker": broker, + "handler": queue, + "exception_type": exception_type, + }, + value=1.0, + timestamp=None, + exemplar=None, + ), + Sample( + name=f"{metrics_prefix}_received_processed_messages_exceptions_created", + labels={ + "app_name": app_name, + "broker": broker, + "handler": queue, + "exception_type": exception_type, + }, + value=IsPositiveFloat, + timestamp=None, + exemplar=None, + ), + ] + + manager.add_received_processed_message_exception( + exception_type=exception_type, + broker=broker, + handler=queue, + ) + + metric_values = ( + manager._container.received_processed_messages_exceptions_total.collect() + ) + + assert metric_values == [expected] + + @pytest.mark.parametrize( + "status", + [ + pytest.param(PublishingStatus.success, id="success status"), + pytest.param(PublishingStatus.error, id="error status"), + ], + ) + def test_add_published_message( + self, + app_name: str, + metrics_prefix: str, + queue: str, + broker: str, + messages_amount: int, + status: PublishingStatus, + ) -> None: + manager = self.create_metrics_manager( + app_name=app_name, + metrics_prefix=metrics_prefix, + ) + + expected = Metric( + name=f"{metrics_prefix}_published_messages", + documentation="Count of published messages by destination and status", + unit="", + typ="counter", + ) + expected.samples = [ + Sample( + name=f"{metrics_prefix}_published_messages_total", + labels={ + "app_name": app_name, + "broker": broker, + "destination": queue, + "status": status.value, + }, + value=1.0, + timestamp=None, + exemplar=None, + ), + Sample( + name=f"{metrics_prefix}_published_messages_created", + labels={ + "app_name": app_name, + "broker": broker, + "destination": queue, + "status": status.value, + }, + value=IsPositiveFloat, + timestamp=None, + exemplar=None, + ), + ] + + manager.add_published_message( + status=status, + broker=broker, + destination=queue, + ) + + metric_values = manager._container.published_messages_total.collect() + + assert metric_values == [expected] + + def test_observe_published_message_duration( + self, + app_name: str, + metrics_prefix: str, + queue: str, + broker: str, + ) -> None: + manager = self.create_metrics_manager( + app_name=app_name, + metrics_prefix=metrics_prefix, + ) + + duration = 0.001 + + expected = Metric( + name=f"{metrics_prefix}_published_messages_duration_seconds", + documentation="Histogram of published messages duration in seconds by broker and destination", + unit="", + typ="histogram", + ) + expected.samples = [ + *[ + Sample( + name=f"{metrics_prefix}_published_messages_duration_seconds_bucket", + labels={ + "app_name": app_name, + "broker": broker, + "destination": queue, + "le": IsStr, + }, + value=1.0, + timestamp=None, + exemplar=None, + ) + for _ in Histogram.DEFAULT_BUCKETS + ], + Sample( + name=f"{metrics_prefix}_published_messages_duration_seconds_count", + labels={"app_name": app_name, "broker": broker, "destination": queue}, + value=1.0, + timestamp=None, + exemplar=None, + ), + Sample( + name=f"{metrics_prefix}_published_messages_duration_seconds_sum", + labels={"app_name": app_name, "broker": broker, "destination": queue}, + value=duration, + timestamp=None, + exemplar=None, + ), + Sample( + name=f"{metrics_prefix}_published_messages_duration_seconds_created", + labels={"app_name": app_name, "broker": broker, "destination": queue}, + value=IsPositiveFloat, + timestamp=None, + exemplar=None, + ), + ] + + manager.observe_published_message_duration( + duration=duration, + broker=broker, + destination=queue, + ) + + metric_values = manager._container.published_messages_duration_seconds.collect() + + assert metric_values == [expected] + + def test_add_published_message_exception( + self, + app_name: str, + metrics_prefix: str, + queue: str, + broker: str, + exception_type: str, + ) -> None: + manager = self.create_metrics_manager( + app_name=app_name, + metrics_prefix=metrics_prefix, + ) + + expected = Metric( + name=f"{metrics_prefix}_published_messages_exceptions", + documentation="Count of published messages exceptions by broker, destination and exception_type", + unit="", + typ="counter", + ) + expected.samples = [ + Sample( + name=f"{metrics_prefix}_published_messages_exceptions_total", + labels={ + "app_name": app_name, + "broker": broker, + "destination": queue, + "exception_type": exception_type, + }, + value=1.0, + timestamp=None, + exemplar=None, + ), + Sample( + name=f"{metrics_prefix}_published_messages_exceptions_created", + labels={ + "app_name": app_name, + "broker": broker, + "destination": queue, + "exception_type": exception_type, + }, + value=IsPositiveFloat, + timestamp=None, + exemplar=None, + ), + ] + + manager.add_published_message_exception( + exception_type=exception_type, + broker=broker, + destination=queue, + ) + + metric_values = manager._container.published_messages_exceptions_total.collect() + + assert metric_values == [expected]