diff --git a/docs/docs/SUMMARY.md b/docs/docs/SUMMARY.md
index 6f998be4f8..86ff1025b7 100644
--- a/docs/docs/SUMMARY.md
+++ b/docs/docs/SUMMARY.md
@@ -44,6 +44,7 @@ search:
- [CLI](getting-started/cli/index.md)
- [ASGI](getting-started/asgi.md)
- [OpenTelemetry](getting-started/opentelemetry/index.md)
+ - [Prometheus](getting-started/prometheus/index.md)
- [Logging](getting-started/logging.md)
- [Config Management](getting-started/config/index.md)
- [Task Scheduling](scheduling.md)
@@ -523,6 +524,15 @@ search:
- [telemetry_attributes_provider_factory](api/faststream/confluent/opentelemetry/provider/telemetry_attributes_provider_factory.md)
- parser
- [AsyncConfluentParser](api/faststream/confluent/parser/AsyncConfluentParser.md)
+ - prometheus
+ - [KafkaPrometheusMiddleware](api/faststream/confluent/prometheus/KafkaPrometheusMiddleware.md)
+ - middleware
+ - [KafkaPrometheusMiddleware](api/faststream/confluent/prometheus/middleware/KafkaPrometheusMiddleware.md)
+ - provider
+ - [BaseConfluentMetricsSettingsProvider](api/faststream/confluent/prometheus/provider/BaseConfluentMetricsSettingsProvider.md)
+ - [BatchConfluentMetricsSettingsProvider](api/faststream/confluent/prometheus/provider/BatchConfluentMetricsSettingsProvider.md)
+ - [ConfluentMetricsSettingsProvider](api/faststream/confluent/prometheus/provider/ConfluentMetricsSettingsProvider.md)
+ - [settings_provider_factory](api/faststream/confluent/prometheus/provider/settings_provider_factory.md)
- publisher
- asyncapi
- [AsyncAPIBatchPublisher](api/faststream/confluent/publisher/asyncapi/AsyncAPIBatchPublisher.md)
@@ -619,6 +629,15 @@ search:
- parser
- [AioKafkaBatchParser](api/faststream/kafka/parser/AioKafkaBatchParser.md)
- [AioKafkaParser](api/faststream/kafka/parser/AioKafkaParser.md)
+ - prometheus
+ - [KafkaPrometheusMiddleware](api/faststream/kafka/prometheus/KafkaPrometheusMiddleware.md)
+ - middleware
+ - [KafkaPrometheusMiddleware](api/faststream/kafka/prometheus/middleware/KafkaPrometheusMiddleware.md)
+ - provider
+ - [BaseKafkaMetricsSettingsProvider](api/faststream/kafka/prometheus/provider/BaseKafkaMetricsSettingsProvider.md)
+ - [BatchKafkaMetricsSettingsProvider](api/faststream/kafka/prometheus/provider/BatchKafkaMetricsSettingsProvider.md)
+ - [KafkaMetricsSettingsProvider](api/faststream/kafka/prometheus/provider/KafkaMetricsSettingsProvider.md)
+ - [settings_provider_factory](api/faststream/kafka/prometheus/provider/settings_provider_factory.md)
- publisher
- asyncapi
- [AsyncAPIBatchPublisher](api/faststream/kafka/publisher/asyncapi/AsyncAPIBatchPublisher.md)
@@ -732,6 +751,15 @@ search:
- [NatsBaseParser](api/faststream/nats/parser/NatsBaseParser.md)
- [NatsParser](api/faststream/nats/parser/NatsParser.md)
- [ObjParser](api/faststream/nats/parser/ObjParser.md)
+ - prometheus
+ - [NatsPrometheusMiddleware](api/faststream/nats/prometheus/NatsPrometheusMiddleware.md)
+ - middleware
+ - [NatsPrometheusMiddleware](api/faststream/nats/prometheus/middleware/NatsPrometheusMiddleware.md)
+ - provider
+ - [BaseNatsMetricsSettingsProvider](api/faststream/nats/prometheus/provider/BaseNatsMetricsSettingsProvider.md)
+ - [BatchNatsMetricsSettingsProvider](api/faststream/nats/prometheus/provider/BatchNatsMetricsSettingsProvider.md)
+ - [NatsMetricsSettingsProvider](api/faststream/nats/prometheus/provider/NatsMetricsSettingsProvider.md)
+ - [settings_provider_factory](api/faststream/nats/prometheus/provider/settings_provider_factory.md)
- publisher
- asyncapi
- [AsyncAPIPublisher](api/faststream/nats/publisher/asyncapi/AsyncAPIPublisher.md)
@@ -810,6 +838,23 @@ search:
- [TelemetryMiddleware](api/faststream/opentelemetry/middleware/TelemetryMiddleware.md)
- provider
- [TelemetrySettingsProvider](api/faststream/opentelemetry/provider/TelemetrySettingsProvider.md)
+ - prometheus
+ - [BasePrometheusMiddleware](api/faststream/prometheus/BasePrometheusMiddleware.md)
+ - [ConsumeAttrs](api/faststream/prometheus/ConsumeAttrs.md)
+ - [MetricsSettingsProvider](api/faststream/prometheus/MetricsSettingsProvider.md)
+ - container
+ - [MetricsContainer](api/faststream/prometheus/container/MetricsContainer.md)
+ - manager
+ - [MetricsManager](api/faststream/prometheus/manager/MetricsManager.md)
+ - middleware
+ - [BasePrometheusMiddleware](api/faststream/prometheus/middleware/BasePrometheusMiddleware.md)
+ - [PrometheusMiddleware](api/faststream/prometheus/middleware/PrometheusMiddleware.md)
+ - provider
+ - [MetricsSettingsProvider](api/faststream/prometheus/provider/MetricsSettingsProvider.md)
+ - types
+ - [ConsumeAttrs](api/faststream/prometheus/types/ConsumeAttrs.md)
+ - [ProcessingStatus](api/faststream/prometheus/types/ProcessingStatus.md)
+ - [PublishingStatus](api/faststream/prometheus/types/PublishingStatus.md)
- rabbit
- [ExchangeType](api/faststream/rabbit/ExchangeType.md)
- [RabbitBroker](api/faststream/rabbit/RabbitBroker.md)
@@ -848,6 +893,12 @@ search:
- [RabbitTelemetrySettingsProvider](api/faststream/rabbit/opentelemetry/provider/RabbitTelemetrySettingsProvider.md)
- parser
- [AioPikaParser](api/faststream/rabbit/parser/AioPikaParser.md)
+ - prometheus
+ - [RabbitPrometheusMiddleware](api/faststream/rabbit/prometheus/RabbitPrometheusMiddleware.md)
+ - middleware
+ - [RabbitPrometheusMiddleware](api/faststream/rabbit/prometheus/middleware/RabbitPrometheusMiddleware.md)
+ - provider
+ - [RabbitMetricsSettingsProvider](api/faststream/rabbit/prometheus/provider/RabbitMetricsSettingsProvider.md)
- publisher
- asyncapi
- [AsyncAPIPublisher](api/faststream/rabbit/publisher/asyncapi/AsyncAPIPublisher.md)
@@ -949,6 +1000,15 @@ search:
- [RedisPubSubParser](api/faststream/redis/parser/RedisPubSubParser.md)
- [RedisStreamParser](api/faststream/redis/parser/RedisStreamParser.md)
- [SimpleParser](api/faststream/redis/parser/SimpleParser.md)
+ - prometheus
+ - [RedisPrometheusMiddleware](api/faststream/redis/prometheus/RedisPrometheusMiddleware.md)
+ - middleware
+ - [RedisPrometheusMiddleware](api/faststream/redis/prometheus/middleware/RedisPrometheusMiddleware.md)
+ - provider
+ - [BaseRedisMetricsSettingsProvider](api/faststream/redis/prometheus/provider/BaseRedisMetricsSettingsProvider.md)
+ - [BatchRedisMetricsSettingsProvider](api/faststream/redis/prometheus/provider/BatchRedisMetricsSettingsProvider.md)
+ - [RedisMetricsSettingsProvider](api/faststream/redis/prometheus/provider/RedisMetricsSettingsProvider.md)
+ - [settings_provider_factory](api/faststream/redis/prometheus/provider/settings_provider_factory.md)
- publisher
- asyncapi
- [AsyncAPIChannelPublisher](api/faststream/redis/publisher/asyncapi/AsyncAPIChannelPublisher.md)
diff --git a/docs/docs/en/api/faststream/confluent/prometheus/KafkaPrometheusMiddleware.md b/docs/docs/en/api/faststream/confluent/prometheus/KafkaPrometheusMiddleware.md
new file mode 100644
index 0000000000..e84e84acc3
--- /dev/null
+++ b/docs/docs/en/api/faststream/confluent/prometheus/KafkaPrometheusMiddleware.md
@@ -0,0 +1,11 @@
+---
+# 0.5 - API
+# 2 - Release
+# 3 - Contributing
+# 5 - Template Page
+# 10 - Default
+search:
+ boost: 0.5
+---
+
+::: faststream.confluent.prometheus.KafkaPrometheusMiddleware
diff --git a/docs/docs/en/api/faststream/confluent/prometheus/middleware/KafkaPrometheusMiddleware.md b/docs/docs/en/api/faststream/confluent/prometheus/middleware/KafkaPrometheusMiddleware.md
new file mode 100644
index 0000000000..6603893f74
--- /dev/null
+++ b/docs/docs/en/api/faststream/confluent/prometheus/middleware/KafkaPrometheusMiddleware.md
@@ -0,0 +1,11 @@
+---
+# 0.5 - API
+# 2 - Release
+# 3 - Contributing
+# 5 - Template Page
+# 10 - Default
+search:
+ boost: 0.5
+---
+
+::: faststream.confluent.prometheus.middleware.KafkaPrometheusMiddleware
diff --git a/docs/docs/en/api/faststream/confluent/prometheus/provider/BaseConfluentMetricsSettingsProvider.md b/docs/docs/en/api/faststream/confluent/prometheus/provider/BaseConfluentMetricsSettingsProvider.md
new file mode 100644
index 0000000000..27c186c098
--- /dev/null
+++ b/docs/docs/en/api/faststream/confluent/prometheus/provider/BaseConfluentMetricsSettingsProvider.md
@@ -0,0 +1,11 @@
+---
+# 0.5 - API
+# 2 - Release
+# 3 - Contributing
+# 5 - Template Page
+# 10 - Default
+search:
+ boost: 0.5
+---
+
+::: faststream.confluent.prometheus.provider.BaseConfluentMetricsSettingsProvider
diff --git a/docs/docs/en/api/faststream/confluent/prometheus/provider/BatchConfluentMetricsSettingsProvider.md b/docs/docs/en/api/faststream/confluent/prometheus/provider/BatchConfluentMetricsSettingsProvider.md
new file mode 100644
index 0000000000..f784a64e9f
--- /dev/null
+++ b/docs/docs/en/api/faststream/confluent/prometheus/provider/BatchConfluentMetricsSettingsProvider.md
@@ -0,0 +1,11 @@
+---
+# 0.5 - API
+# 2 - Release
+# 3 - Contributing
+# 5 - Template Page
+# 10 - Default
+search:
+ boost: 0.5
+---
+
+::: faststream.confluent.prometheus.provider.BatchConfluentMetricsSettingsProvider
diff --git a/docs/docs/en/api/faststream/confluent/prometheus/provider/ConfluentMetricsSettingsProvider.md b/docs/docs/en/api/faststream/confluent/prometheus/provider/ConfluentMetricsSettingsProvider.md
new file mode 100644
index 0000000000..65f0a8348e
--- /dev/null
+++ b/docs/docs/en/api/faststream/confluent/prometheus/provider/ConfluentMetricsSettingsProvider.md
@@ -0,0 +1,11 @@
+---
+# 0.5 - API
+# 2 - Release
+# 3 - Contributing
+# 5 - Template Page
+# 10 - Default
+search:
+ boost: 0.5
+---
+
+::: faststream.confluent.prometheus.provider.ConfluentMetricsSettingsProvider
diff --git a/docs/docs/en/api/faststream/confluent/prometheus/provider/settings_provider_factory.md b/docs/docs/en/api/faststream/confluent/prometheus/provider/settings_provider_factory.md
new file mode 100644
index 0000000000..78358f46e3
--- /dev/null
+++ b/docs/docs/en/api/faststream/confluent/prometheus/provider/settings_provider_factory.md
@@ -0,0 +1,11 @@
+---
+# 0.5 - API
+# 2 - Release
+# 3 - Contributing
+# 5 - Template Page
+# 10 - Default
+search:
+ boost: 0.5
+---
+
+::: faststream.confluent.prometheus.provider.settings_provider_factory
diff --git a/docs/docs/en/api/faststream/kafka/prometheus/KafkaPrometheusMiddleware.md b/docs/docs/en/api/faststream/kafka/prometheus/KafkaPrometheusMiddleware.md
new file mode 100644
index 0000000000..c2ffd5356a
--- /dev/null
+++ b/docs/docs/en/api/faststream/kafka/prometheus/KafkaPrometheusMiddleware.md
@@ -0,0 +1,11 @@
+---
+# 0.5 - API
+# 2 - Release
+# 3 - Contributing
+# 5 - Template Page
+# 10 - Default
+search:
+ boost: 0.5
+---
+
+::: faststream.kafka.prometheus.KafkaPrometheusMiddleware
diff --git a/docs/docs/en/api/faststream/kafka/prometheus/middleware/KafkaPrometheusMiddleware.md b/docs/docs/en/api/faststream/kafka/prometheus/middleware/KafkaPrometheusMiddleware.md
new file mode 100644
index 0000000000..451b7080c0
--- /dev/null
+++ b/docs/docs/en/api/faststream/kafka/prometheus/middleware/KafkaPrometheusMiddleware.md
@@ -0,0 +1,11 @@
+---
+# 0.5 - API
+# 2 - Release
+# 3 - Contributing
+# 5 - Template Page
+# 10 - Default
+search:
+ boost: 0.5
+---
+
+::: faststream.kafka.prometheus.middleware.KafkaPrometheusMiddleware
diff --git a/docs/docs/en/api/faststream/kafka/prometheus/provider/BaseKafkaMetricsSettingsProvider.md b/docs/docs/en/api/faststream/kafka/prometheus/provider/BaseKafkaMetricsSettingsProvider.md
new file mode 100644
index 0000000000..0fd044f694
--- /dev/null
+++ b/docs/docs/en/api/faststream/kafka/prometheus/provider/BaseKafkaMetricsSettingsProvider.md
@@ -0,0 +1,11 @@
+---
+# 0.5 - API
+# 2 - Release
+# 3 - Contributing
+# 5 - Template Page
+# 10 - Default
+search:
+ boost: 0.5
+---
+
+::: faststream.kafka.prometheus.provider.BaseKafkaMetricsSettingsProvider
diff --git a/docs/docs/en/api/faststream/kafka/prometheus/provider/BatchKafkaMetricsSettingsProvider.md b/docs/docs/en/api/faststream/kafka/prometheus/provider/BatchKafkaMetricsSettingsProvider.md
new file mode 100644
index 0000000000..9bd01d5e71
--- /dev/null
+++ b/docs/docs/en/api/faststream/kafka/prometheus/provider/BatchKafkaMetricsSettingsProvider.md
@@ -0,0 +1,11 @@
+---
+# 0.5 - API
+# 2 - Release
+# 3 - Contributing
+# 5 - Template Page
+# 10 - Default
+search:
+ boost: 0.5
+---
+
+::: faststream.kafka.prometheus.provider.BatchKafkaMetricsSettingsProvider
diff --git a/docs/docs/en/api/faststream/kafka/prometheus/provider/KafkaMetricsSettingsProvider.md b/docs/docs/en/api/faststream/kafka/prometheus/provider/KafkaMetricsSettingsProvider.md
new file mode 100644
index 0000000000..ae7c490da8
--- /dev/null
+++ b/docs/docs/en/api/faststream/kafka/prometheus/provider/KafkaMetricsSettingsProvider.md
@@ -0,0 +1,11 @@
+---
+# 0.5 - API
+# 2 - Release
+# 3 - Contributing
+# 5 - Template Page
+# 10 - Default
+search:
+ boost: 0.5
+---
+
+::: faststream.kafka.prometheus.provider.KafkaMetricsSettingsProvider
diff --git a/docs/docs/en/api/faststream/kafka/prometheus/provider/settings_provider_factory.md b/docs/docs/en/api/faststream/kafka/prometheus/provider/settings_provider_factory.md
new file mode 100644
index 0000000000..1393fd9065
--- /dev/null
+++ b/docs/docs/en/api/faststream/kafka/prometheus/provider/settings_provider_factory.md
@@ -0,0 +1,11 @@
+---
+# 0.5 - API
+# 2 - Release
+# 3 - Contributing
+# 5 - Template Page
+# 10 - Default
+search:
+ boost: 0.5
+---
+
+::: faststream.kafka.prometheus.provider.settings_provider_factory
diff --git a/docs/docs/en/api/faststream/nats/prometheus/NatsPrometheusMiddleware.md b/docs/docs/en/api/faststream/nats/prometheus/NatsPrometheusMiddleware.md
new file mode 100644
index 0000000000..d9b179b0c4
--- /dev/null
+++ b/docs/docs/en/api/faststream/nats/prometheus/NatsPrometheusMiddleware.md
@@ -0,0 +1,11 @@
+---
+# 0.5 - API
+# 2 - Release
+# 3 - Contributing
+# 5 - Template Page
+# 10 - Default
+search:
+ boost: 0.5
+---
+
+::: faststream.nats.prometheus.NatsPrometheusMiddleware
diff --git a/docs/docs/en/api/faststream/nats/prometheus/middleware/NatsPrometheusMiddleware.md b/docs/docs/en/api/faststream/nats/prometheus/middleware/NatsPrometheusMiddleware.md
new file mode 100644
index 0000000000..7202731048
--- /dev/null
+++ b/docs/docs/en/api/faststream/nats/prometheus/middleware/NatsPrometheusMiddleware.md
@@ -0,0 +1,11 @@
+---
+# 0.5 - API
+# 2 - Release
+# 3 - Contributing
+# 5 - Template Page
+# 10 - Default
+search:
+ boost: 0.5
+---
+
+::: faststream.nats.prometheus.middleware.NatsPrometheusMiddleware
diff --git a/docs/docs/en/api/faststream/nats/prometheus/provider/BaseNatsMetricsSettingsProvider.md b/docs/docs/en/api/faststream/nats/prometheus/provider/BaseNatsMetricsSettingsProvider.md
new file mode 100644
index 0000000000..80742833bc
--- /dev/null
+++ b/docs/docs/en/api/faststream/nats/prometheus/provider/BaseNatsMetricsSettingsProvider.md
@@ -0,0 +1,11 @@
+---
+# 0.5 - API
+# 2 - Release
+# 3 - Contributing
+# 5 - Template Page
+# 10 - Default
+search:
+ boost: 0.5
+---
+
+::: faststream.nats.prometheus.provider.BaseNatsMetricsSettingsProvider
diff --git a/docs/docs/en/api/faststream/nats/prometheus/provider/BatchNatsMetricsSettingsProvider.md b/docs/docs/en/api/faststream/nats/prometheus/provider/BatchNatsMetricsSettingsProvider.md
new file mode 100644
index 0000000000..163ebb7bc6
--- /dev/null
+++ b/docs/docs/en/api/faststream/nats/prometheus/provider/BatchNatsMetricsSettingsProvider.md
@@ -0,0 +1,11 @@
+---
+# 0.5 - API
+# 2 - Release
+# 3 - Contributing
+# 5 - Template Page
+# 10 - Default
+search:
+ boost: 0.5
+---
+
+::: faststream.nats.prometheus.provider.BatchNatsMetricsSettingsProvider
diff --git a/docs/docs/en/api/faststream/nats/prometheus/provider/NatsMetricsSettingsProvider.md b/docs/docs/en/api/faststream/nats/prometheus/provider/NatsMetricsSettingsProvider.md
new file mode 100644
index 0000000000..e5515a4cc5
--- /dev/null
+++ b/docs/docs/en/api/faststream/nats/prometheus/provider/NatsMetricsSettingsProvider.md
@@ -0,0 +1,11 @@
+---
+# 0.5 - API
+# 2 - Release
+# 3 - Contributing
+# 5 - Template Page
+# 10 - Default
+search:
+ boost: 0.5
+---
+
+::: faststream.nats.prometheus.provider.NatsMetricsSettingsProvider
diff --git a/docs/docs/en/api/faststream/nats/prometheus/provider/settings_provider_factory.md b/docs/docs/en/api/faststream/nats/prometheus/provider/settings_provider_factory.md
new file mode 100644
index 0000000000..aeaa7b26e0
--- /dev/null
+++ b/docs/docs/en/api/faststream/nats/prometheus/provider/settings_provider_factory.md
@@ -0,0 +1,11 @@
+---
+# 0.5 - API
+# 2 - Release
+# 3 - Contributing
+# 5 - Template Page
+# 10 - Default
+search:
+ boost: 0.5
+---
+
+::: faststream.nats.prometheus.provider.settings_provider_factory
diff --git a/docs/docs/en/api/faststream/prometheus/BasePrometheusMiddleware.md b/docs/docs/en/api/faststream/prometheus/BasePrometheusMiddleware.md
new file mode 100644
index 0000000000..1f5cf6a1d4
--- /dev/null
+++ b/docs/docs/en/api/faststream/prometheus/BasePrometheusMiddleware.md
@@ -0,0 +1,11 @@
+---
+# 0.5 - API
+# 2 - Release
+# 3 - Contributing
+# 5 - Template Page
+# 10 - Default
+search:
+ boost: 0.5
+---
+
+::: faststream.prometheus.BasePrometheusMiddleware
diff --git a/docs/docs/en/api/faststream/prometheus/ConsumeAttrs.md b/docs/docs/en/api/faststream/prometheus/ConsumeAttrs.md
new file mode 100644
index 0000000000..ad8e536b7a
--- /dev/null
+++ b/docs/docs/en/api/faststream/prometheus/ConsumeAttrs.md
@@ -0,0 +1,11 @@
+---
+# 0.5 - API
+# 2 - Release
+# 3 - Contributing
+# 5 - Template Page
+# 10 - Default
+search:
+ boost: 0.5
+---
+
+::: faststream.prometheus.ConsumeAttrs
diff --git a/docs/docs/en/api/faststream/prometheus/MetricsSettingsProvider.md b/docs/docs/en/api/faststream/prometheus/MetricsSettingsProvider.md
new file mode 100644
index 0000000000..0f7405e44d
--- /dev/null
+++ b/docs/docs/en/api/faststream/prometheus/MetricsSettingsProvider.md
@@ -0,0 +1,11 @@
+---
+# 0.5 - API
+# 2 - Release
+# 3 - Contributing
+# 5 - Template Page
+# 10 - Default
+search:
+ boost: 0.5
+---
+
+::: faststream.prometheus.MetricsSettingsProvider
diff --git a/docs/docs/en/api/faststream/prometheus/container/MetricsContainer.md b/docs/docs/en/api/faststream/prometheus/container/MetricsContainer.md
new file mode 100644
index 0000000000..009d88d263
--- /dev/null
+++ b/docs/docs/en/api/faststream/prometheus/container/MetricsContainer.md
@@ -0,0 +1,11 @@
+---
+# 0.5 - API
+# 2 - Release
+# 3 - Contributing
+# 5 - Template Page
+# 10 - Default
+search:
+ boost: 0.5
+---
+
+::: faststream.prometheus.container.MetricsContainer
diff --git a/docs/docs/en/api/faststream/prometheus/manager/MetricsManager.md b/docs/docs/en/api/faststream/prometheus/manager/MetricsManager.md
new file mode 100644
index 0000000000..b1a897c717
--- /dev/null
+++ b/docs/docs/en/api/faststream/prometheus/manager/MetricsManager.md
@@ -0,0 +1,11 @@
+---
+# 0.5 - API
+# 2 - Release
+# 3 - Contributing
+# 5 - Template Page
+# 10 - Default
+search:
+ boost: 0.5
+---
+
+::: faststream.prometheus.manager.MetricsManager
diff --git a/docs/docs/en/api/faststream/prometheus/middleware/BasePrometheusMiddleware.md b/docs/docs/en/api/faststream/prometheus/middleware/BasePrometheusMiddleware.md
new file mode 100644
index 0000000000..62bbd031ac
--- /dev/null
+++ b/docs/docs/en/api/faststream/prometheus/middleware/BasePrometheusMiddleware.md
@@ -0,0 +1,11 @@
+---
+# 0.5 - API
+# 2 - Release
+# 3 - Contributing
+# 5 - Template Page
+# 10 - Default
+search:
+ boost: 0.5
+---
+
+::: faststream.prometheus.middleware.BasePrometheusMiddleware
diff --git a/docs/docs/en/api/faststream/prometheus/middleware/PrometheusMiddleware.md b/docs/docs/en/api/faststream/prometheus/middleware/PrometheusMiddleware.md
new file mode 100644
index 0000000000..2902586e38
--- /dev/null
+++ b/docs/docs/en/api/faststream/prometheus/middleware/PrometheusMiddleware.md
@@ -0,0 +1,11 @@
+---
+# 0.5 - API
+# 2 - Release
+# 3 - Contributing
+# 5 - Template Page
+# 10 - Default
+search:
+ boost: 0.5
+---
+
+::: faststream.prometheus.middleware.PrometheusMiddleware
diff --git a/docs/docs/en/api/faststream/prometheus/provider/MetricsSettingsProvider.md b/docs/docs/en/api/faststream/prometheus/provider/MetricsSettingsProvider.md
new file mode 100644
index 0000000000..3511a21a5b
--- /dev/null
+++ b/docs/docs/en/api/faststream/prometheus/provider/MetricsSettingsProvider.md
@@ -0,0 +1,11 @@
+---
+# 0.5 - API
+# 2 - Release
+# 3 - Contributing
+# 5 - Template Page
+# 10 - Default
+search:
+ boost: 0.5
+---
+
+::: faststream.prometheus.provider.MetricsSettingsProvider
diff --git a/docs/docs/en/api/faststream/prometheus/types/ConsumeAttrs.md b/docs/docs/en/api/faststream/prometheus/types/ConsumeAttrs.md
new file mode 100644
index 0000000000..d9196cab8d
--- /dev/null
+++ b/docs/docs/en/api/faststream/prometheus/types/ConsumeAttrs.md
@@ -0,0 +1,11 @@
+---
+# 0.5 - API
+# 2 - Release
+# 3 - Contributing
+# 5 - Template Page
+# 10 - Default
+search:
+ boost: 0.5
+---
+
+::: faststream.prometheus.types.ConsumeAttrs
diff --git a/docs/docs/en/api/faststream/prometheus/types/ProcessingStatus.md b/docs/docs/en/api/faststream/prometheus/types/ProcessingStatus.md
new file mode 100644
index 0000000000..98b6710bcd
--- /dev/null
+++ b/docs/docs/en/api/faststream/prometheus/types/ProcessingStatus.md
@@ -0,0 +1,11 @@
+---
+# 0.5 - API
+# 2 - Release
+# 3 - Contributing
+# 5 - Template Page
+# 10 - Default
+search:
+ boost: 0.5
+---
+
+::: faststream.prometheus.types.ProcessingStatus
diff --git a/docs/docs/en/api/faststream/prometheus/types/PublishingStatus.md b/docs/docs/en/api/faststream/prometheus/types/PublishingStatus.md
new file mode 100644
index 0000000000..4e7435fbea
--- /dev/null
+++ b/docs/docs/en/api/faststream/prometheus/types/PublishingStatus.md
@@ -0,0 +1,11 @@
+---
+# 0.5 - API
+# 2 - Release
+# 3 - Contributing
+# 5 - Template Page
+# 10 - Default
+search:
+ boost: 0.5
+---
+
+::: faststream.prometheus.types.PublishingStatus
diff --git a/docs/docs/en/api/faststream/rabbit/prometheus/RabbitPrometheusMiddleware.md b/docs/docs/en/api/faststream/rabbit/prometheus/RabbitPrometheusMiddleware.md
new file mode 100644
index 0000000000..2c4308fabd
--- /dev/null
+++ b/docs/docs/en/api/faststream/rabbit/prometheus/RabbitPrometheusMiddleware.md
@@ -0,0 +1,11 @@
+---
+# 0.5 - API
+# 2 - Release
+# 3 - Contributing
+# 5 - Template Page
+# 10 - Default
+search:
+ boost: 0.5
+---
+
+::: faststream.rabbit.prometheus.RabbitPrometheusMiddleware
diff --git a/docs/docs/en/api/faststream/rabbit/prometheus/middleware/RabbitPrometheusMiddleware.md b/docs/docs/en/api/faststream/rabbit/prometheus/middleware/RabbitPrometheusMiddleware.md
new file mode 100644
index 0000000000..45163c998a
--- /dev/null
+++ b/docs/docs/en/api/faststream/rabbit/prometheus/middleware/RabbitPrometheusMiddleware.md
@@ -0,0 +1,11 @@
+---
+# 0.5 - API
+# 2 - Release
+# 3 - Contributing
+# 5 - Template Page
+# 10 - Default
+search:
+ boost: 0.5
+---
+
+::: faststream.rabbit.prometheus.middleware.RabbitPrometheusMiddleware
diff --git a/docs/docs/en/api/faststream/rabbit/prometheus/provider/RabbitMetricsSettingsProvider.md b/docs/docs/en/api/faststream/rabbit/prometheus/provider/RabbitMetricsSettingsProvider.md
new file mode 100644
index 0000000000..6d63301b34
--- /dev/null
+++ b/docs/docs/en/api/faststream/rabbit/prometheus/provider/RabbitMetricsSettingsProvider.md
@@ -0,0 +1,11 @@
+---
+# 0.5 - API
+# 2 - Release
+# 3 - Contributing
+# 5 - Template Page
+# 10 - Default
+search:
+ boost: 0.5
+---
+
+::: faststream.rabbit.prometheus.provider.RabbitMetricsSettingsProvider
diff --git a/docs/docs/en/api/faststream/redis/prometheus/RedisPrometheusMiddleware.md b/docs/docs/en/api/faststream/redis/prometheus/RedisPrometheusMiddleware.md
new file mode 100644
index 0000000000..01b23fe4f1
--- /dev/null
+++ b/docs/docs/en/api/faststream/redis/prometheus/RedisPrometheusMiddleware.md
@@ -0,0 +1,11 @@
+---
+# 0.5 - API
+# 2 - Release
+# 3 - Contributing
+# 5 - Template Page
+# 10 - Default
+search:
+ boost: 0.5
+---
+
+::: faststream.redis.prometheus.RedisPrometheusMiddleware
diff --git a/docs/docs/en/api/faststream/redis/prometheus/middleware/RedisPrometheusMiddleware.md b/docs/docs/en/api/faststream/redis/prometheus/middleware/RedisPrometheusMiddleware.md
new file mode 100644
index 0000000000..c29cc91130
--- /dev/null
+++ b/docs/docs/en/api/faststream/redis/prometheus/middleware/RedisPrometheusMiddleware.md
@@ -0,0 +1,11 @@
+---
+# 0.5 - API
+# 2 - Release
+# 3 - Contributing
+# 5 - Template Page
+# 10 - Default
+search:
+ boost: 0.5
+---
+
+::: faststream.redis.prometheus.middleware.RedisPrometheusMiddleware
diff --git a/docs/docs/en/api/faststream/redis/prometheus/provider/BaseRedisMetricsSettingsProvider.md b/docs/docs/en/api/faststream/redis/prometheus/provider/BaseRedisMetricsSettingsProvider.md
new file mode 100644
index 0000000000..243414331b
--- /dev/null
+++ b/docs/docs/en/api/faststream/redis/prometheus/provider/BaseRedisMetricsSettingsProvider.md
@@ -0,0 +1,11 @@
+---
+# 0.5 - API
+# 2 - Release
+# 3 - Contributing
+# 5 - Template Page
+# 10 - Default
+search:
+ boost: 0.5
+---
+
+::: faststream.redis.prometheus.provider.BaseRedisMetricsSettingsProvider
diff --git a/docs/docs/en/api/faststream/redis/prometheus/provider/BatchRedisMetricsSettingsProvider.md b/docs/docs/en/api/faststream/redis/prometheus/provider/BatchRedisMetricsSettingsProvider.md
new file mode 100644
index 0000000000..33d1d2d3a1
--- /dev/null
+++ b/docs/docs/en/api/faststream/redis/prometheus/provider/BatchRedisMetricsSettingsProvider.md
@@ -0,0 +1,11 @@
+---
+# 0.5 - API
+# 2 - Release
+# 3 - Contributing
+# 5 - Template Page
+# 10 - Default
+search:
+ boost: 0.5
+---
+
+::: faststream.redis.prometheus.provider.BatchRedisMetricsSettingsProvider
diff --git a/docs/docs/en/api/faststream/redis/prometheus/provider/RedisMetricsSettingsProvider.md b/docs/docs/en/api/faststream/redis/prometheus/provider/RedisMetricsSettingsProvider.md
new file mode 100644
index 0000000000..a7f5f3abe8
--- /dev/null
+++ b/docs/docs/en/api/faststream/redis/prometheus/provider/RedisMetricsSettingsProvider.md
@@ -0,0 +1,11 @@
+---
+# 0.5 - API
+# 2 - Release
+# 3 - Contributing
+# 5 - Template Page
+# 10 - Default
+search:
+ boost: 0.5
+---
+
+::: faststream.redis.prometheus.provider.RedisMetricsSettingsProvider
diff --git a/docs/docs/en/api/faststream/redis/prometheus/provider/settings_provider_factory.md b/docs/docs/en/api/faststream/redis/prometheus/provider/settings_provider_factory.md
new file mode 100644
index 0000000000..aa4812f1e2
--- /dev/null
+++ b/docs/docs/en/api/faststream/redis/prometheus/provider/settings_provider_factory.md
@@ -0,0 +1,11 @@
+---
+# 0.5 - API
+# 2 - Release
+# 3 - Contributing
+# 5 - Template Page
+# 10 - Default
+search:
+ boost: 0.5
+---
+
+::: faststream.redis.prometheus.provider.settings_provider_factory
diff --git a/docs/docs/en/getting-started/prometheus/index.md b/docs/docs/en/getting-started/prometheus/index.md
new file mode 100644
index 0000000000..54203ce7df
--- /dev/null
+++ b/docs/docs/en/getting-started/prometheus/index.md
@@ -0,0 +1,82 @@
+---
+# 0.5 - API
+# 2 - Release
+# 3 - Contributing
+# 5 - Template Page
+# 10 - Default
+search:
+ boost: 10
+---
+
+# Prometheus
+
+[**Prometheus**](https://prometheus.io/){.external-link target="_blank"} is an open-source monitoring and alerting toolkit originally built at SoundCloud.
+With a focus on reliability, robustness, and easy scalability, **Prometheus** allows users to collect metrics,
+scrape data from various sources, store them efficiently, and query them in real-time. Its flexible data model,
+powerful query language, and seamless integration with [**Grafana**](https://grafana.com/){.external-link target="_blank"} make it a popular choice for monitoring the health
+and performance of systems and applications.
+
+### FastStream Metrics
+
+To add metrics to your broker, you need to:
+
+1. Install `FastStream` with `prometheus-client`
+
+ ```shell
+ pip install faststream[prometheus]
+ ```
+
+2. Add `PrometheusMiddleware` to your broker
+
+{!> includes/getting_started/prometheus/1.md !}
+
+### Exposing the `/metrics` endpoint
+The way Prometheus works requires the service to expose an HTTP endpoint for metrics scraping.
+By convention, this is a GET endpoint, and its path is usually `/metrics`.
+
+FastStream's built-in **ASGI** support allows you to expose endpoints in your application.
+
+A convenient way to serve this endpoint is to use `make_asgi_app` from `prometheus_client`,
+passing in the registry that was passed to `PrometheusMiddleware`.
+
+{!> includes/getting_started/prometheus/2.md !}
+
+---
+
+### Exported metrics
+
+{% set received_messages_total_description = 'The metric is incremented each time the application receives a message.
This is necessary to count messages that the application has received but has not yet started processing.' %}
+{% set received_messages_size_bytes_description = 'The metric is filled with the sizes of received messages. When a message is received, the size of its body in bytes is calculated and written to the metric.
Useful for analyzing the sizes of incoming messages, also in cases when the application receives messages of unexpected sizes.' %}
+{% set received_messages_in_process_description = 'The metric is incremented when the message processing starts and decremented when the processing ends.
It is necessary to count the number of messages that the application processes.
Such a metric will help answer the question: _`is there a need to scale the service?`_' %}
+{% set received_processed_messages_total_description = 'The metric is incremented after a message is processed, regardless of whether the processing ended with a success or an error.
This metric allows you to analyze the number of processed messages and their statuses.' %}
+{% set received_processed_messages_duration_seconds_description = 'The metric is filled with the message processing time regardless of whether the processing ended with a success or an error.
Time stamps are recorded just before and immediately after the processing.
Then the metric is filled with their difference (in seconds).' %}
+{% set received_processed_messages_exceptions_total_description = 'The metric is incremented if any exception occurred while processing a message (except `AckMessage`, `NackMessage`, `RejectMessage` and `SkipMessage`).
It can be used to draw conclusions about how many and what kind of exceptions occurred while processing messages.' %}
+{% set published_messages_total_description = 'The metric is incremented when messages are sent, regardless of whether the sending was successful or not.' %}
+{% set published_messages_duration_seconds_description = 'The metric is filled with the time taken to send the message, regardless of whether the sending was successful or failed.
Timestamps are written immediately before and immediately after sending.
Then the metric is filled with their difference (in seconds).' %}
+{% set published_messages_exceptions_total_description = 'The metric is incremented if any exception occurred while sending a message.
You can draw conclusions about how many and what exceptions occurred while sending messages.' %}
+
+
+| Metric | Type | Description | Labels |
+|--------------------------------------------------|---------------|----------------------------------------------------------------|-------------------------------------------------------|
+| **received_messages_total** | **Counter** | {{ received_messages_total_description }} | `app_name`, `broker`, `handler` |
+| **received_messages_size_bytes** | **Histogram** | {{ received_messages_size_bytes_description }} | `app_name`, `broker`, `handler` |
+| **received_messages_in_process** | **Gauge** | {{ received_messages_in_process_description }} | `app_name`, `broker`, `handler` |
+| **received_processed_messages_total** | **Counter** | {{ received_processed_messages_total_description }} | `app_name`, `broker`, `handler`, `status` |
+| **received_processed_messages_duration_seconds** | **Histogram** | {{ received_processed_messages_duration_seconds_description }} | `app_name`, `broker`, `handler` |
+| **received_processed_messages_exceptions_total** | **Counter** | {{ received_processed_messages_exceptions_total_description }} | `app_name`, `broker`, `handler`, `exception_type` |
+| **published_messages_total** | **Counter** | {{ published_messages_total_description }} | `app_name`, `broker`, `destination`, `status` |
+| **published_messages_duration_seconds** | **Histogram** | {{ published_messages_duration_seconds_description }} | `app_name`, `broker`, `destination` |
+| **published_messages_exceptions_total** | **Counter** | {{ published_messages_exceptions_total_description }} | `app_name`, `broker`, `destination`, `exception_type` |
+
+### Labels
+
+| Label | Description | Values |
+|-----------------------------------|-----------------------------------------------------------------|---------------------------------------------------|
+| app_name                          | The name of the application, which the user can specify themselves | `faststream` by default                            |
+| broker | Broker name | `kafka`, `rabbit`, `nats`, `redis` |
+| handler | Where the message came from | |
+| status (while receiving) | Message processing status | `acked`, `nacked`, `rejected`, `skipped`, `error` |
+| exception_type (while receiving) | Exception type when processing message | |
+| status (while publishing) | Message publishing status | `success`, `error` |
+| destination | Where the message is sent | |
+| exception_type (while publishing) | Exception type when publishing message | |
diff --git a/docs/docs/navigation_template.txt b/docs/docs/navigation_template.txt
index 4cd45d0874..e505d5a783 100644
--- a/docs/docs/navigation_template.txt
+++ b/docs/docs/navigation_template.txt
@@ -44,6 +44,7 @@ search:
- [CLI](getting-started/cli/index.md)
- [ASGI](getting-started/asgi.md)
- [OpenTelemetry](getting-started/opentelemetry/index.md)
+ - [Prometheus](getting-started/prometheus/index.md)
- [Logging](getting-started/logging.md)
- [Config Management](getting-started/config/index.md)
- [Task Scheduling](scheduling.md)
diff --git a/docs/docs_src/getting_started/prometheus/__init__.py b/docs/docs_src/getting_started/prometheus/__init__.py
new file mode 100644
index 0000000000..e69de29bb2
diff --git a/docs/docs_src/getting_started/prometheus/confluent.py b/docs/docs_src/getting_started/prometheus/confluent.py
new file mode 100644
index 0000000000..2d89e8bee6
--- /dev/null
+++ b/docs/docs_src/getting_started/prometheus/confluent.py
@@ -0,0 +1,13 @@
+from faststream import FastStream
+from faststream.confluent import KafkaBroker
+from faststream.confluent.prometheus import KafkaPrometheusMiddleware
+from prometheus_client import CollectorRegistry
+
+registry = CollectorRegistry()
+
+broker = KafkaBroker(
+ middlewares=(
+ KafkaPrometheusMiddleware(registry=registry),
+ )
+)
+app = FastStream(broker)
diff --git a/docs/docs_src/getting_started/prometheus/confluent_asgi.py b/docs/docs_src/getting_started/prometheus/confluent_asgi.py
new file mode 100644
index 0000000000..2574a49cef
--- /dev/null
+++ b/docs/docs_src/getting_started/prometheus/confluent_asgi.py
@@ -0,0 +1,18 @@
+from faststream.asgi import AsgiFastStream
+from faststream.confluent import KafkaBroker
+from faststream.confluent.prometheus import KafkaPrometheusMiddleware
+from prometheus_client import CollectorRegistry, make_asgi_app
+
+registry = CollectorRegistry()
+
+broker = KafkaBroker(
+ middlewares=(
+ KafkaPrometheusMiddleware(registry=registry),
+ )
+)
+app = AsgiFastStream(
+ broker,
+ asgi_routes=[
+ ("/metrics", make_asgi_app(registry)),
+ ]
+)
diff --git a/docs/docs_src/getting_started/prometheus/kafka.py b/docs/docs_src/getting_started/prometheus/kafka.py
new file mode 100644
index 0000000000..f6d1224e66
--- /dev/null
+++ b/docs/docs_src/getting_started/prometheus/kafka.py
@@ -0,0 +1,13 @@
+from faststream import FastStream
+from faststream.kafka import KafkaBroker
+from faststream.kafka.prometheus import KafkaPrometheusMiddleware
+from prometheus_client import CollectorRegistry
+
+registry = CollectorRegistry()
+
+broker = KafkaBroker(
+ middlewares=(
+ KafkaPrometheusMiddleware(registry=registry),
+ )
+)
+app = FastStream(broker)
diff --git a/docs/docs_src/getting_started/prometheus/kafka_asgi.py b/docs/docs_src/getting_started/prometheus/kafka_asgi.py
new file mode 100644
index 0000000000..ddf79040d9
--- /dev/null
+++ b/docs/docs_src/getting_started/prometheus/kafka_asgi.py
@@ -0,0 +1,18 @@
+from faststream.asgi import AsgiFastStream
+from faststream.kafka import KafkaBroker
+from faststream.kafka.prometheus import KafkaPrometheusMiddleware
+from prometheus_client import CollectorRegistry, make_asgi_app
+
+registry = CollectorRegistry()
+
+broker = KafkaBroker(
+ middlewares=(
+ KafkaPrometheusMiddleware(registry=registry),
+ )
+)
+app = AsgiFastStream(
+ broker,
+ asgi_routes=[
+ ("/metrics", make_asgi_app(registry)),
+ ]
+)
diff --git a/docs/docs_src/getting_started/prometheus/nats.py b/docs/docs_src/getting_started/prometheus/nats.py
new file mode 100644
index 0000000000..0894078881
--- /dev/null
+++ b/docs/docs_src/getting_started/prometheus/nats.py
@@ -0,0 +1,13 @@
+from faststream import FastStream
+from faststream.nats import NatsBroker
+from faststream.nats.prometheus import NatsPrometheusMiddleware
+from prometheus_client import CollectorRegistry
+
+registry = CollectorRegistry()
+
+broker = NatsBroker(
+ middlewares=(
+ NatsPrometheusMiddleware(registry=registry),
+ )
+)
+app = FastStream(broker)
diff --git a/docs/docs_src/getting_started/prometheus/nats_asgi.py b/docs/docs_src/getting_started/prometheus/nats_asgi.py
new file mode 100644
index 0000000000..c273a36f36
--- /dev/null
+++ b/docs/docs_src/getting_started/prometheus/nats_asgi.py
@@ -0,0 +1,18 @@
+from faststream.asgi import AsgiFastStream
+from faststream.nats import NatsBroker
+from faststream.nats.prometheus import NatsPrometheusMiddleware
+from prometheus_client import CollectorRegistry, make_asgi_app
+
+registry = CollectorRegistry()
+
+broker = NatsBroker(
+ middlewares=(
+ NatsPrometheusMiddleware(registry=registry),
+ )
+)
+app = AsgiFastStream(
+ broker,
+ asgi_routes=[
+ ("/metrics", make_asgi_app(registry)),
+ ]
+)
diff --git a/docs/docs_src/getting_started/prometheus/rabbit.py b/docs/docs_src/getting_started/prometheus/rabbit.py
new file mode 100644
index 0000000000..a0fb683b7f
--- /dev/null
+++ b/docs/docs_src/getting_started/prometheus/rabbit.py
@@ -0,0 +1,13 @@
+from faststream import FastStream
+from faststream.rabbit import RabbitBroker
+from faststream.rabbit.prometheus import RabbitPrometheusMiddleware
+from prometheus_client import CollectorRegistry
+
+registry = CollectorRegistry()
+
+broker = RabbitBroker(
+ middlewares=(
+ RabbitPrometheusMiddleware(registry=registry),
+ )
+)
+app = FastStream(broker)
diff --git a/docs/docs_src/getting_started/prometheus/rabbit_asgi.py b/docs/docs_src/getting_started/prometheus/rabbit_asgi.py
new file mode 100644
index 0000000000..40bc990fcc
--- /dev/null
+++ b/docs/docs_src/getting_started/prometheus/rabbit_asgi.py
@@ -0,0 +1,18 @@
+from faststream.asgi import AsgiFastStream
+from faststream.rabbit import RabbitBroker
+from faststream.rabbit.prometheus import RabbitPrometheusMiddleware
+from prometheus_client import CollectorRegistry, make_asgi_app
+
+registry = CollectorRegistry()
+
+broker = RabbitBroker(
+ middlewares=(
+ RabbitPrometheusMiddleware(registry=registry),
+ )
+)
+app = AsgiFastStream(
+ broker,
+ asgi_routes=[
+ ("/metrics", make_asgi_app(registry)),
+ ]
+)
diff --git a/docs/docs_src/getting_started/prometheus/redis.py b/docs/docs_src/getting_started/prometheus/redis.py
new file mode 100644
index 0000000000..98fc2b70c0
--- /dev/null
+++ b/docs/docs_src/getting_started/prometheus/redis.py
@@ -0,0 +1,13 @@
+from faststream import FastStream
+from faststream.redis import RedisBroker
+from faststream.redis.prometheus import RedisPrometheusMiddleware
+from prometheus_client import CollectorRegistry
+
+registry = CollectorRegistry()
+
+broker = RedisBroker(
+ middlewares=(
+ RedisPrometheusMiddleware(registry=registry),
+ )
+)
+app = FastStream(broker)
diff --git a/docs/docs_src/getting_started/prometheus/redis_asgi.py b/docs/docs_src/getting_started/prometheus/redis_asgi.py
new file mode 100644
index 0000000000..2c3b095f1b
--- /dev/null
+++ b/docs/docs_src/getting_started/prometheus/redis_asgi.py
@@ -0,0 +1,18 @@
+from faststream.asgi import AsgiFastStream
+from faststream.redis import RedisBroker
+from faststream.redis.prometheus import RedisPrometheusMiddleware
+from prometheus_client import CollectorRegistry, make_asgi_app
+
+registry = CollectorRegistry()
+
+broker = RedisBroker(
+ middlewares=(
+ RedisPrometheusMiddleware(registry=registry),
+ )
+)
+app = AsgiFastStream(
+ broker,
+ asgi_routes=[
+ ("/metrics", make_asgi_app(registry)),
+ ]
+)
diff --git a/docs/includes/getting_started/prometheus/1.md b/docs/includes/getting_started/prometheus/1.md
new file mode 100644
index 0000000000..16ad860ff8
--- /dev/null
+++ b/docs/includes/getting_started/prometheus/1.md
@@ -0,0 +1,24 @@
+=== "AIOKafka"
+ ```python linenums="1" hl_lines="6 10"
+ {!> docs_src/getting_started/prometheus/kafka.py!}
+ ```
+
+=== "Confluent"
+ ```python linenums="1" hl_lines="6 10"
+ {!> docs_src/getting_started/prometheus/confluent.py!}
+ ```
+
+=== "RabbitMQ"
+ ```python linenums="1" hl_lines="6 10"
+ {!> docs_src/getting_started/prometheus/rabbit.py!}
+ ```
+
+=== "NATS"
+ ```python linenums="1" hl_lines="6 10"
+ {!> docs_src/getting_started/prometheus/nats.py!}
+ ```
+
+=== "Redis"
+ ```python linenums="1" hl_lines="6 10"
+ {!> docs_src/getting_started/prometheus/redis.py!}
+ ```
diff --git a/docs/includes/getting_started/prometheus/2.md b/docs/includes/getting_started/prometheus/2.md
new file mode 100644
index 0000000000..483e5437f4
--- /dev/null
+++ b/docs/includes/getting_started/prometheus/2.md
@@ -0,0 +1,24 @@
+=== "AIOKafka"
+ ```python linenums="1" hl_lines="6 10 13 16"
+ {!> docs_src/getting_started/prometheus/kafka_asgi.py!}
+ ```
+
+=== "Confluent"
+ ```python linenums="1" hl_lines="6 10 13 16"
+ {!> docs_src/getting_started/prometheus/confluent_asgi.py!}
+ ```
+
+=== "RabbitMQ"
+ ```python linenums="1" hl_lines="6 10 13 16"
+ {!> docs_src/getting_started/prometheus/rabbit_asgi.py!}
+ ```
+
+=== "NATS"
+ ```python linenums="1" hl_lines="6 10 13 16"
+ {!> docs_src/getting_started/prometheus/nats_asgi.py!}
+ ```
+
+=== "Redis"
+ ```python linenums="1" hl_lines="6 10 13 16"
+ {!> docs_src/getting_started/prometheus/redis_asgi.py!}
+ ```
diff --git a/faststream/__about__.py b/faststream/__about__.py
index a829efe0e2..f350fa0af9 100644
--- a/faststream/__about__.py
+++ b/faststream/__about__.py
@@ -1,5 +1,5 @@
"""Simple and fast framework to create message brokers based microservices."""
-__version__ = "0.5.27"
+__version__ = "0.5.28"
SERVICE_NAME = f"faststream-{__version__}"
diff --git a/faststream/broker/message.py b/faststream/broker/message.py
index 82593e7cdc..e06e912593 100644
--- a/faststream/broker/message.py
+++ b/faststream/broker/message.py
@@ -63,13 +63,16 @@ class StreamMessage(Generic[MsgType]):
_decoded_body: Optional["DecodedMessage"] = field(default=None, init=False)
async def ack(self) -> None:
- self.committed = AckStatus.acked
+ if not self.committed:
+ self.committed = AckStatus.acked
async def nack(self) -> None:
- self.committed = AckStatus.nacked
+ if not self.committed:
+ self.committed = AckStatus.nacked
async def reject(self) -> None:
- self.committed = AckStatus.rejected
+ if not self.committed:
+ self.committed = AckStatus.rejected
async def decode(self) -> Optional["DecodedMessage"]:
"""Serialize the message by lazy decoder."""
diff --git a/faststream/confluent/message.py b/faststream/confluent/message.py
index 14fe05ae7b..83ee0e814b 100644
--- a/faststream/confluent/message.py
+++ b/faststream/confluent/message.py
@@ -66,7 +66,7 @@ async def ack(self) -> None:
"""Acknowledge the Kafka message."""
if self.is_manual and not self.committed:
await self.consumer.commit()
- await super().ack()
+ await super().ack()
async def nack(self) -> None:
"""Reject the Kafka message."""
@@ -81,4 +81,4 @@ async def nack(self) -> None:
partition=raw_message.partition(),
offset=raw_message.offset(),
)
- await super().nack()
+ await super().nack()
diff --git a/faststream/confluent/prometheus/__init__.py b/faststream/confluent/prometheus/__init__.py
new file mode 100644
index 0000000000..7498fa5ddc
--- /dev/null
+++ b/faststream/confluent/prometheus/__init__.py
@@ -0,0 +1,3 @@
+from faststream.confluent.prometheus.middleware import KafkaPrometheusMiddleware
+
+__all__ = ("KafkaPrometheusMiddleware",)
diff --git a/faststream/confluent/prometheus/middleware.py b/faststream/confluent/prometheus/middleware.py
new file mode 100644
index 0000000000..2ac27dacea
--- /dev/null
+++ b/faststream/confluent/prometheus/middleware.py
@@ -0,0 +1,26 @@
+from typing import TYPE_CHECKING, Optional, Sequence
+
+from faststream.confluent.prometheus.provider import settings_provider_factory
+from faststream.prometheus.middleware import BasePrometheusMiddleware
+from faststream.types import EMPTY
+
+if TYPE_CHECKING:
+ from prometheus_client import CollectorRegistry
+
+
+class KafkaPrometheusMiddleware(BasePrometheusMiddleware):
+ def __init__(
+ self,
+ *,
+ registry: "CollectorRegistry",
+ app_name: str = EMPTY,
+ metrics_prefix: str = "faststream",
+ received_messages_size_buckets: Optional[Sequence[float]] = None,
+ ) -> None:
+ super().__init__(
+ settings_provider_factory=settings_provider_factory,
+ registry=registry,
+ app_name=app_name,
+ metrics_prefix=metrics_prefix,
+ received_messages_size_buckets=received_messages_size_buckets,
+ )
diff --git a/faststream/confluent/prometheus/provider.py b/faststream/confluent/prometheus/provider.py
new file mode 100644
index 0000000000..bdcb26728a
--- /dev/null
+++ b/faststream/confluent/prometheus/provider.py
@@ -0,0 +1,64 @@
+from typing import TYPE_CHECKING, Sequence, Tuple, Union, cast
+
+from faststream.broker.message import MsgType, StreamMessage
+from faststream.prometheus import (
+ ConsumeAttrs,
+ MetricsSettingsProvider,
+)
+
+if TYPE_CHECKING:
+ from confluent_kafka import Message
+
+ from faststream.types import AnyDict
+
+
+class BaseConfluentMetricsSettingsProvider(MetricsSettingsProvider[MsgType]):
+ __slots__ = ("messaging_system",)
+
+ def __init__(self) -> None:
+ self.messaging_system = "kafka"
+
+ def get_publish_destination_name_from_kwargs(
+ self,
+ kwargs: "AnyDict",
+ ) -> str:
+ return cast(str, kwargs["topic"])
+
+
+class ConfluentMetricsSettingsProvider(BaseConfluentMetricsSettingsProvider["Message"]):
+ def get_consume_attrs_from_message(
+ self,
+ msg: "StreamMessage[Message]",
+ ) -> ConsumeAttrs:
+ return {
+ "destination_name": cast(str, msg.raw_message.topic()),
+ "message_size": len(msg.body),
+ "messages_count": 1,
+ }
+
+
+class BatchConfluentMetricsSettingsProvider(
+ BaseConfluentMetricsSettingsProvider[Tuple["Message", ...]]
+):
+ def get_consume_attrs_from_message(
+ self,
+ msg: "StreamMessage[Tuple[Message, ...]]",
+ ) -> ConsumeAttrs:
+ raw_message = msg.raw_message[0]
+ return {
+ "destination_name": cast(str, raw_message.topic()),
+ "message_size": len(bytearray().join(cast(Sequence[bytes], msg.body))),
+ "messages_count": len(msg.raw_message),
+ }
+
+
+def settings_provider_factory(
+ msg: Union["Message", Sequence["Message"], None],
+) -> Union[
+ ConfluentMetricsSettingsProvider,
+ BatchConfluentMetricsSettingsProvider,
+]:
+ if isinstance(msg, Sequence):
+ return BatchConfluentMetricsSettingsProvider()
+ else:
+ return ConfluentMetricsSettingsProvider()
diff --git a/faststream/kafka/message.py b/faststream/kafka/message.py
index d83a57bf6a..bde7669787 100644
--- a/faststream/kafka/message.py
+++ b/faststream/kafka/message.py
@@ -77,7 +77,7 @@ async def nack(self) -> None:
partition=topic_partition,
offset=raw_message.offset,
)
- await super().nack()
+ await super().nack()
class KafkaAckableMessage(KafkaMessage):
@@ -85,4 +85,4 @@ async def ack(self) -> None:
"""Acknowledge the Kafka message."""
if not self.committed:
await self.consumer.commit()
- await super().ack()
+ await super().ack()
diff --git a/faststream/kafka/prometheus/__init__.py b/faststream/kafka/prometheus/__init__.py
new file mode 100644
index 0000000000..e5ae7e2d4f
--- /dev/null
+++ b/faststream/kafka/prometheus/__init__.py
@@ -0,0 +1,3 @@
+from faststream.kafka.prometheus.middleware import KafkaPrometheusMiddleware
+
+__all__ = ("KafkaPrometheusMiddleware",)
diff --git a/faststream/kafka/prometheus/middleware.py b/faststream/kafka/prometheus/middleware.py
new file mode 100644
index 0000000000..3fd41edeba
--- /dev/null
+++ b/faststream/kafka/prometheus/middleware.py
@@ -0,0 +1,26 @@
+from typing import TYPE_CHECKING, Optional, Sequence
+
+from faststream.kafka.prometheus.provider import settings_provider_factory
+from faststream.prometheus.middleware import BasePrometheusMiddleware
+from faststream.types import EMPTY
+
+if TYPE_CHECKING:
+ from prometheus_client import CollectorRegistry
+
+
+class KafkaPrometheusMiddleware(BasePrometheusMiddleware):
+ def __init__(
+ self,
+ *,
+ registry: "CollectorRegistry",
+ app_name: str = EMPTY,
+ metrics_prefix: str = "faststream",
+ received_messages_size_buckets: Optional[Sequence[float]] = None,
+ ) -> None:
+ super().__init__(
+ settings_provider_factory=settings_provider_factory,
+ registry=registry,
+ app_name=app_name,
+ metrics_prefix=metrics_prefix,
+ received_messages_size_buckets=received_messages_size_buckets,
+ )
diff --git a/faststream/kafka/prometheus/provider.py b/faststream/kafka/prometheus/provider.py
new file mode 100644
index 0000000000..9caf118e1f
--- /dev/null
+++ b/faststream/kafka/prometheus/provider.py
@@ -0,0 +1,64 @@
+from typing import TYPE_CHECKING, Sequence, Tuple, Union, cast
+
+from faststream.broker.message import MsgType, StreamMessage
+from faststream.prometheus import (
+ MetricsSettingsProvider,
+)
+
+if TYPE_CHECKING:
+ from aiokafka import ConsumerRecord
+
+ from faststream.prometheus import ConsumeAttrs
+ from faststream.types import AnyDict
+
+
+class BaseKafkaMetricsSettingsProvider(MetricsSettingsProvider[MsgType]):
+ __slots__ = ("messaging_system",)
+
+ def __init__(self) -> None:
+ self.messaging_system = "kafka"
+
+ def get_publish_destination_name_from_kwargs(
+ self,
+ kwargs: "AnyDict",
+ ) -> str:
+ return cast(str, kwargs["topic"])
+
+
+class KafkaMetricsSettingsProvider(BaseKafkaMetricsSettingsProvider["ConsumerRecord"]):
+ def get_consume_attrs_from_message(
+ self,
+ msg: "StreamMessage[ConsumerRecord]",
+ ) -> "ConsumeAttrs":
+ return {
+ "destination_name": msg.raw_message.topic,
+ "message_size": len(msg.body),
+ "messages_count": 1,
+ }
+
+
+class BatchKafkaMetricsSettingsProvider(
+ BaseKafkaMetricsSettingsProvider[Tuple["ConsumerRecord", ...]]
+):
+ def get_consume_attrs_from_message(
+ self,
+ msg: "StreamMessage[Tuple[ConsumerRecord, ...]]",
+ ) -> "ConsumeAttrs":
+ raw_message = msg.raw_message[0]
+ return {
+ "destination_name": raw_message.topic,
+ "message_size": len(bytearray().join(cast(Sequence[bytes], msg.body))),
+ "messages_count": len(msg.raw_message),
+ }
+
+
+def settings_provider_factory(
+ msg: Union["ConsumerRecord", Sequence["ConsumerRecord"], None],
+) -> Union[
+ KafkaMetricsSettingsProvider,
+ BatchKafkaMetricsSettingsProvider,
+]:
+ if isinstance(msg, Sequence):
+ return BatchKafkaMetricsSettingsProvider()
+ else:
+ return KafkaMetricsSettingsProvider()
diff --git a/faststream/nats/message.py b/faststream/nats/message.py
index ee54ef2caa..0f104a3310 100644
--- a/faststream/nats/message.py
+++ b/faststream/nats/message.py
@@ -15,7 +15,7 @@ async def ack(self) -> None:
# to be compatible with `self.raw_message.ack()`
if not self.raw_message._ackd:
await self.raw_message.ack()
- await super().ack()
+ await super().ack()
async def nack(
self,
@@ -23,12 +23,12 @@ async def nack(
) -> None:
if not self.raw_message._ackd:
await self.raw_message.nak(delay=delay)
- await super().nack()
+ await super().nack()
async def reject(self) -> None:
if not self.raw_message._ackd:
await self.raw_message.term()
- await super().reject()
+ await super().reject()
async def in_progress(self) -> None:
if not self.raw_message._ackd:
diff --git a/faststream/nats/prometheus/__init__.py b/faststream/nats/prometheus/__init__.py
new file mode 100644
index 0000000000..564d3ea4f4
--- /dev/null
+++ b/faststream/nats/prometheus/__init__.py
@@ -0,0 +1,3 @@
+from faststream.nats.prometheus.middleware import NatsPrometheusMiddleware
+
+__all__ = ("NatsPrometheusMiddleware",)
diff --git a/faststream/nats/prometheus/middleware.py b/faststream/nats/prometheus/middleware.py
new file mode 100644
index 0000000000..3aadeb61d1
--- /dev/null
+++ b/faststream/nats/prometheus/middleware.py
@@ -0,0 +1,26 @@
+from typing import TYPE_CHECKING, Optional, Sequence
+
+from faststream.nats.prometheus.provider import settings_provider_factory
+from faststream.prometheus.middleware import BasePrometheusMiddleware
+from faststream.types import EMPTY
+
+if TYPE_CHECKING:
+ from prometheus_client import CollectorRegistry
+
+
+class NatsPrometheusMiddleware(BasePrometheusMiddleware):
+ def __init__(
+ self,
+ *,
+ registry: "CollectorRegistry",
+ app_name: str = EMPTY,
+ metrics_prefix: str = "faststream",
+ received_messages_size_buckets: Optional[Sequence[float]] = None,
+ ) -> None:
+ super().__init__(
+ settings_provider_factory=settings_provider_factory,
+ registry=registry,
+ app_name=app_name,
+ metrics_prefix=metrics_prefix,
+ received_messages_size_buckets=received_messages_size_buckets,
+ )
diff --git a/faststream/nats/prometheus/provider.py b/faststream/nats/prometheus/provider.py
new file mode 100644
index 0000000000..e6ac0a4684
--- /dev/null
+++ b/faststream/nats/prometheus/provider.py
@@ -0,0 +1,66 @@
+from typing import TYPE_CHECKING, List, Sequence, Union, cast
+
+from nats.aio.msg import Msg
+
+from faststream.broker.message import MsgType, StreamMessage
+from faststream.prometheus import (
+ ConsumeAttrs,
+ MetricsSettingsProvider,
+)
+
+if TYPE_CHECKING:
+ from faststream.types import AnyDict
+
+
+class BaseNatsMetricsSettingsProvider(MetricsSettingsProvider[MsgType]):
+ __slots__ = ("messaging_system",)
+
+ def __init__(self) -> None:
+ self.messaging_system = "nats"
+
+ def get_publish_destination_name_from_kwargs(
+ self,
+ kwargs: "AnyDict",
+ ) -> str:
+ return cast(str, kwargs["subject"])
+
+
+class NatsMetricsSettingsProvider(BaseNatsMetricsSettingsProvider["Msg"]):
+ def get_consume_attrs_from_message(
+ self,
+ msg: "StreamMessage[Msg]",
+ ) -> ConsumeAttrs:
+ return {
+ "destination_name": msg.raw_message.subject,
+ "message_size": len(msg.body),
+ "messages_count": 1,
+ }
+
+
+class BatchNatsMetricsSettingsProvider(BaseNatsMetricsSettingsProvider[List["Msg"]]):
+ def get_consume_attrs_from_message(
+ self,
+ msg: "StreamMessage[List[Msg]]",
+ ) -> ConsumeAttrs:
+ raw_message = msg.raw_message[0]
+ return {
+ "destination_name": raw_message.subject,
+ "message_size": len(msg.body),
+ "messages_count": len(msg.raw_message),
+ }
+
+
+def settings_provider_factory(
+ msg: Union["Msg", Sequence["Msg"], None],
+) -> Union[
+ NatsMetricsSettingsProvider,
+ BatchNatsMetricsSettingsProvider,
+ None,
+]:
+ if isinstance(msg, Sequence):
+ return BatchNatsMetricsSettingsProvider()
+ elif isinstance(msg, Msg) or msg is None:
+ return NatsMetricsSettingsProvider()
+ else:
+ # KeyValue and Object Storage watch cases
+ return None
diff --git a/faststream/prometheus/__init__.py b/faststream/prometheus/__init__.py
new file mode 100644
index 0000000000..8b21a09eee
--- /dev/null
+++ b/faststream/prometheus/__init__.py
@@ -0,0 +1,9 @@
+from faststream.prometheus.middleware import BasePrometheusMiddleware
+from faststream.prometheus.provider import MetricsSettingsProvider
+from faststream.prometheus.types import ConsumeAttrs
+
+__all__ = (
+ "BasePrometheusMiddleware",
+ "MetricsSettingsProvider",
+ "ConsumeAttrs",
+)
diff --git a/faststream/prometheus/consts.py b/faststream/prometheus/consts.py
new file mode 100644
index 0000000000..3c4648d333
--- /dev/null
+++ b/faststream/prometheus/consts.py
@@ -0,0 +1,17 @@
+from faststream.broker.message import AckStatus
+from faststream.exceptions import AckMessage, NackMessage, RejectMessage, SkipMessage
+from faststream.prometheus.types import ProcessingStatus
+
+PROCESSING_STATUS_BY_HANDLER_EXCEPTION_MAP = {
+ AckMessage: ProcessingStatus.acked,
+ NackMessage: ProcessingStatus.nacked,
+ RejectMessage: ProcessingStatus.rejected,
+ SkipMessage: ProcessingStatus.skipped,
+}
+
+
+PROCESSING_STATUS_BY_ACK_STATUS = {
+ AckStatus.acked: ProcessingStatus.acked,
+ AckStatus.nacked: ProcessingStatus.nacked,
+ AckStatus.rejected: ProcessingStatus.rejected,
+}
diff --git a/faststream/prometheus/container.py b/faststream/prometheus/container.py
new file mode 100644
index 0000000000..6b5f813f63
--- /dev/null
+++ b/faststream/prometheus/container.py
@@ -0,0 +1,100 @@
+from typing import Optional, Sequence
+
+from prometheus_client import CollectorRegistry, Counter, Gauge, Histogram
+
+
+class MetricsContainer:
+ __slots__ = (
+ "_registry",
+ "_metrics_prefix",
+ "received_messages_total",
+ "received_messages_size_bytes",
+ "received_processed_messages_duration_seconds",
+ "received_messages_in_process",
+ "received_processed_messages_total",
+ "received_processed_messages_exceptions_total",
+ "published_messages_total",
+ "published_messages_duration_seconds",
+ "published_messages_exceptions_total",
+ )
+
+ DEFAULT_SIZE_BUCKETS = (
+ 2.0**4,
+ 2.0**6,
+ 2.0**8,
+ 2.0**10,
+ 2.0**12,
+ 2.0**14,
+ 2.0**16,
+ 2.0**18,
+ 2.0**20,
+ 2.0**22,
+ 2.0**24,
+ float("inf"),
+ )
+
+ def __init__(
+ self,
+ registry: "CollectorRegistry",
+ *,
+ metrics_prefix: str = "faststream",
+ received_messages_size_buckets: Optional[Sequence[float]] = None,
+ ):
+ self._registry = registry
+ self._metrics_prefix = metrics_prefix
+
+ self.received_messages_total = Counter(
+ name=f"{metrics_prefix}_received_messages_total",
+ documentation="Count of received messages by broker and handler",
+ labelnames=["app_name", "broker", "handler"],
+ registry=registry,
+ )
+ self.received_messages_size_bytes = Histogram(
+ name=f"{metrics_prefix}_received_messages_size_bytes",
+ documentation="Histogram of received messages size in bytes by broker and handler",
+ labelnames=["app_name", "broker", "handler"],
+ registry=registry,
+ buckets=received_messages_size_buckets or self.DEFAULT_SIZE_BUCKETS,
+ )
+ self.received_messages_in_process = Gauge(
+ name=f"{metrics_prefix}_received_messages_in_process",
+ documentation="Gauge of received messages in process by broker and handler",
+ labelnames=["app_name", "broker", "handler"],
+ registry=registry,
+ )
+ self.received_processed_messages_total = Counter(
+ name=f"{metrics_prefix}_received_processed_messages_total",
+ documentation="Count of received processed messages by broker, handler and status",
+ labelnames=["app_name", "broker", "handler", "status"],
+ registry=registry,
+ )
+ self.received_processed_messages_duration_seconds = Histogram(
+ name=f"{metrics_prefix}_received_processed_messages_duration_seconds",
+ documentation="Histogram of received processed messages duration in seconds by broker and handler",
+ labelnames=["app_name", "broker", "handler"],
+ registry=registry,
+ )
+ self.received_processed_messages_exceptions_total = Counter(
+ name=f"{metrics_prefix}_received_processed_messages_exceptions_total",
+ documentation="Count of received processed messages exceptions by broker, handler and exception_type",
+ labelnames=["app_name", "broker", "handler", "exception_type"],
+ registry=registry,
+ )
+ self.published_messages_total = Counter(
+ name=f"{metrics_prefix}_published_messages_total",
+ documentation="Count of published messages by destination and status",
+ labelnames=["app_name", "broker", "destination", "status"],
+ registry=registry,
+ )
+ self.published_messages_duration_seconds = Histogram(
+ name=f"{metrics_prefix}_published_messages_duration_seconds",
+ documentation="Histogram of published messages duration in seconds by broker and destination",
+ labelnames=["app_name", "broker", "destination"],
+ registry=registry,
+ )
+ self.published_messages_exceptions_total = Counter(
+ name=f"{metrics_prefix}_published_messages_exceptions_total",
+ documentation="Count of published messages exceptions by broker, destination and exception_type",
+ labelnames=["app_name", "broker", "destination", "exception_type"],
+ registry=registry,
+ )
diff --git a/faststream/prometheus/manager.py b/faststream/prometheus/manager.py
new file mode 100644
index 0000000000..e2f7704f77
--- /dev/null
+++ b/faststream/prometheus/manager.py
@@ -0,0 +1,131 @@
+from faststream.prometheus.container import MetricsContainer
+from faststream.prometheus.types import ProcessingStatus, PublishingStatus
+
+
+class MetricsManager:
+ __slots__ = ("_container", "_app_name")
+
+ def __init__(self, container: MetricsContainer, *, app_name: str = "faststream"):
+ self._container = container
+ self._app_name = app_name
+
+ def add_received_message(self, broker: str, handler: str, amount: int = 1) -> None:
+ self._container.received_messages_total.labels(
+ app_name=self._app_name,
+ broker=broker,
+ handler=handler,
+ ).inc(amount)
+
+ def observe_received_messages_size(
+ self,
+ broker: str,
+ handler: str,
+ size: int,
+ ) -> None:
+ self._container.received_messages_size_bytes.labels(
+ app_name=self._app_name,
+ broker=broker,
+ handler=handler,
+ ).observe(size)
+
+ def add_received_message_in_process(
+ self,
+ broker: str,
+ handler: str,
+ amount: int = 1,
+ ) -> None:
+ self._container.received_messages_in_process.labels(
+ app_name=self._app_name,
+ broker=broker,
+ handler=handler,
+ ).inc(amount)
+
+ def remove_received_message_in_process(
+ self,
+ broker: str,
+ handler: str,
+ amount: int = 1,
+ ) -> None:
+ self._container.received_messages_in_process.labels(
+ app_name=self._app_name,
+ broker=broker,
+ handler=handler,
+ ).dec(amount)
+
+ def add_received_processed_message(
+ self,
+ broker: str,
+ handler: str,
+ status: ProcessingStatus,
+ amount: int = 1,
+ ) -> None:
+ self._container.received_processed_messages_total.labels(
+ app_name=self._app_name,
+ broker=broker,
+ handler=handler,
+ status=status.value,
+ ).inc(amount)
+
+ def observe_received_processed_message_duration(
+ self,
+ duration: float,
+ broker: str,
+ handler: str,
+ ) -> None:
+ self._container.received_processed_messages_duration_seconds.labels(
+ app_name=self._app_name,
+ broker=broker,
+ handler=handler,
+ ).observe(duration)
+
+ def add_received_processed_message_exception(
+ self,
+ broker: str,
+ handler: str,
+ exception_type: str,
+ ) -> None:
+ self._container.received_processed_messages_exceptions_total.labels(
+ app_name=self._app_name,
+ broker=broker,
+ handler=handler,
+ exception_type=exception_type,
+ ).inc()
+
+ def add_published_message(
+ self,
+ broker: str,
+ destination: str,
+ status: PublishingStatus,
+ amount: int = 1,
+ ) -> None:
+ self._container.published_messages_total.labels(
+ app_name=self._app_name,
+ broker=broker,
+ destination=destination,
+ status=status.value,
+ ).inc(amount)
+
+ def observe_published_message_duration(
+ self,
+ duration: float,
+ broker: str,
+ destination: str,
+ ) -> None:
+ self._container.published_messages_duration_seconds.labels(
+ app_name=self._app_name,
+ broker=broker,
+ destination=destination,
+ ).observe(duration)
+
+ def add_published_message_exception(
+ self,
+ broker: str,
+ destination: str,
+ exception_type: str,
+ ) -> None:
+ self._container.published_messages_exceptions_total.labels(
+ app_name=self._app_name,
+ broker=broker,
+ destination=destination,
+ exception_type=exception_type,
+ ).inc()
diff --git a/faststream/prometheus/middleware.py b/faststream/prometheus/middleware.py
new file mode 100644
index 0000000000..575d846342
--- /dev/null
+++ b/faststream/prometheus/middleware.py
@@ -0,0 +1,202 @@
+import time
+from typing import TYPE_CHECKING, Any, Callable, Optional, Sequence
+
+from faststream import BaseMiddleware
+from faststream.prometheus.consts import (
+ PROCESSING_STATUS_BY_ACK_STATUS,
+ PROCESSING_STATUS_BY_HANDLER_EXCEPTION_MAP,
+)
+from faststream.prometheus.container import MetricsContainer
+from faststream.prometheus.manager import MetricsManager
+from faststream.prometheus.provider import MetricsSettingsProvider
+from faststream.prometheus.types import ProcessingStatus, PublishingStatus
+from faststream.types import EMPTY
+
+if TYPE_CHECKING:
+ from prometheus_client import CollectorRegistry
+
+ from faststream.broker.message import StreamMessage
+ from faststream.types import AsyncFunc, AsyncFuncAny
+
+
class PrometheusMiddleware(BaseMiddleware):
    """Per-message middleware instance that records Prometheus metrics around
    message consumption and publishing.

    Instances are created by ``BasePrometheusMiddleware.__call__`` for every
    incoming message.  If ``settings_provider_factory`` returns ``None`` for a
    message, the middleware becomes a transparent pass-through.
    """

    def __init__(
        self,
        msg: Optional[Any] = None,
        *,
        settings_provider_factory: Callable[
            [Any], Optional[MetricsSettingsProvider[Any]]
        ],
        metrics_manager: MetricsManager,
    ) -> None:
        # The manager is shared across all middleware instances; the settings
        # provider is resolved per message (brokers may select a different
        # provider for batch vs. single-message payloads).
        self._metrics_manager = metrics_manager
        self._settings_provider = settings_provider_factory(msg)
        super().__init__(msg)

    async def consume_scope(
        self,
        call_next: "AsyncFuncAny",
        msg: "StreamMessage[Any]",
    ) -> Any:
        """Wrap a subscriber call, recording receive/process metrics."""
        # No provider means metrics are disabled for this message.
        if self._settings_provider is None:
            return await call_next(msg)

        messaging_system = self._settings_provider.messaging_system
        consume_attrs = self._settings_provider.get_consume_attrs_from_message(msg)
        destination_name = consume_attrs["destination_name"]

        self._metrics_manager.add_received_message(
            amount=consume_attrs["messages_count"],
            broker=messaging_system,
            handler=destination_name,
        )

        self._metrics_manager.observe_received_messages_size(
            size=consume_attrs["message_size"],
            broker=messaging_system,
            handler=destination_name,
        )

        # In-process gauge: incremented here, decremented in `finally` below.
        self._metrics_manager.add_received_message_in_process(
            amount=consume_attrs["messages_count"],
            broker=messaging_system,
            handler=destination_name,
        )

        err: Optional[Exception] = None
        start_time = time.perf_counter()

        try:
            result = await call_next(await self.on_consume(msg))

        except Exception as e:
            err = e
            self._metrics_manager.add_received_processed_message_exception(
                exception_type=type(err).__name__,
                broker=messaging_system,
                handler=destination_name,
            )
            raise

        finally:
            # Duration, gauge decrement and final status are recorded whether
            # the handler succeeded or raised.
            duration = time.perf_counter() - start_time
            self._metrics_manager.observe_received_processed_message_duration(
                duration=duration,
                broker=messaging_system,
                handler=destination_name,
            )

            self._metrics_manager.remove_received_message_in_process(
                amount=consume_attrs["messages_count"],
                broker=messaging_system,
                handler=destination_name,
            )

            status = ProcessingStatus.acked

            # An explicit ack/nack/reject on the message wins; otherwise a
            # known handler exception maps to a status, and anything else
            # counts as an error.
            if msg.committed or err:
                status = (
                    PROCESSING_STATUS_BY_ACK_STATUS.get(msg.committed)  # type: ignore[arg-type]
                    or PROCESSING_STATUS_BY_HANDLER_EXCEPTION_MAP.get(type(err))
                    or ProcessingStatus.error
                )

            self._metrics_manager.add_received_processed_message(
                amount=consume_attrs["messages_count"],
                status=status,
                broker=messaging_system,
                handler=destination_name,
            )

        return result

    async def publish_scope(
        self,
        call_next: "AsyncFunc",
        msg: Any,
        *args: Any,
        **kwargs: Any,
    ) -> Any:
        """Wrap a publish call, recording publish metrics."""
        if self._settings_provider is None:
            return await call_next(msg, *args, **kwargs)

        destination_name = (
            self._settings_provider.get_publish_destination_name_from_kwargs(kwargs)
        )
        messaging_system = self._settings_provider.messaging_system

        err: Optional[Exception] = None
        start_time = time.perf_counter()

        try:
            result = await call_next(
                await self.on_publish(msg, *args, **kwargs),
                *args,
                **kwargs,
            )

        except Exception as e:
            err = e
            self._metrics_manager.add_published_message_exception(
                exception_type=type(err).__name__,
                broker=messaging_system,
                destination=destination_name,
            )
            raise

        finally:
            duration = time.perf_counter() - start_time

            self._metrics_manager.observe_published_message_duration(
                duration=duration,
                broker=messaging_system,
                destination=destination_name,
            )

            status = PublishingStatus.error if err else PublishingStatus.success
            # Extra positional args are treated as extra messages (batch publish).
            messages_count = len((msg, *args))

            self._metrics_manager.add_published_message(
                amount=messages_count,
                status=status,
                broker=messaging_system,
                destination=destination_name,
            )

        return result
+
+
class BasePrometheusMiddleware:
    """Broker-level middleware factory holding the shared metrics state.

    Passed to a broker's ``middlewares=``; each incoming message produces a
    fresh ``PrometheusMiddleware`` bound to the shared ``MetricsManager``.
    """

    __slots__ = ("_metrics_container", "_metrics_manager", "_settings_provider_factory")

    def __init__(
        self,
        *,
        settings_provider_factory: Callable[
            [Any], Optional[MetricsSettingsProvider[Any]]
        ],
        registry: "CollectorRegistry",
        app_name: str = EMPTY,
        metrics_prefix: str = "faststream",
        received_messages_size_buckets: Optional[Sequence[float]] = None,
    ):
        # The `app_name` label defaults to the metrics prefix when not given.
        resolved_app_name = metrics_prefix if app_name is EMPTY else app_name

        self._settings_provider_factory = settings_provider_factory
        self._metrics_container = MetricsContainer(
            registry,
            metrics_prefix=metrics_prefix,
            received_messages_size_buckets=received_messages_size_buckets,
        )
        self._metrics_manager = MetricsManager(
            self._metrics_container,
            app_name=resolved_app_name,
        )

    def __call__(self, msg: Optional[Any]) -> BaseMiddleware:
        """Build the per-message middleware instance for `msg`."""
        return PrometheusMiddleware(
            msg=msg,
            metrics_manager=self._metrics_manager,
            settings_provider_factory=self._settings_provider_factory,
        )
diff --git a/faststream/prometheus/provider.py b/faststream/prometheus/provider.py
new file mode 100644
index 0000000000..1a543f5b55
--- /dev/null
+++ b/faststream/prometheus/provider.py
@@ -0,0 +1,22 @@
+from typing import TYPE_CHECKING, Protocol
+
+from faststream.broker.message import MsgType
+
+if TYPE_CHECKING:
+ from faststream.broker.message import StreamMessage
+ from faststream.prometheus import ConsumeAttrs
+ from faststream.types import AnyDict
+
+
class MetricsSettingsProvider(Protocol[MsgType]):
    """Broker-specific strategy for extracting metric label values.

    Each broker integration supplies an implementation that knows how to read
    destinations and sizes from its native message/publish-kwargs shapes.
    """

    # Broker name used as the `broker` metric label (e.g. "rabbitmq", "redis").
    messaging_system: str

    def get_consume_attrs_from_message(
        self,
        msg: "StreamMessage[MsgType]",
    ) -> "ConsumeAttrs":
        """Extract destination name, payload size and message count from an incoming message."""
        ...

    def get_publish_destination_name_from_kwargs(
        self,
        kwargs: "AnyDict",
    ) -> str:
        """Build the destination label from the keyword arguments of a publish call."""
        ...
diff --git a/faststream/prometheus/types.py b/faststream/prometheus/types.py
new file mode 100644
index 0000000000..ae6ffb7161
--- /dev/null
+++ b/faststream/prometheus/types.py
@@ -0,0 +1,21 @@
+from enum import Enum
+from typing import TypedDict
+
+
class ProcessingStatus(str, Enum):
    # Final outcome of handling one received message; the `str` mixin makes a
    # member compare equal to its value, which is what ends up in the
    # `status` metric label.
    acked = "acked"
    nacked = "nacked"
    rejected = "rejected"
    skipped = "skipped"
    error = "error"
+
+
class PublishingStatus(str, Enum):
    # Outcome of a publish call: `error` when the call raised, else `success`.
    success = "success"
    error = "error"
+
+
class ConsumeAttrs(TypedDict):
    # Metric attributes extracted from one incoming message (or batch).
    message_size: int  # payload size (len of the raw message body)
    destination_name: str  # queue/topic/channel the message arrived from
    messages_count: int  # 1 for a single message; batch length for batches
diff --git a/faststream/rabbit/prometheus/__init__.py b/faststream/rabbit/prometheus/__init__.py
new file mode 100644
index 0000000000..bdb07907ee
--- /dev/null
+++ b/faststream/rabbit/prometheus/__init__.py
@@ -0,0 +1,3 @@
+from faststream.rabbit.prometheus.middleware import RabbitPrometheusMiddleware
+
+__all__ = ("RabbitPrometheusMiddleware",)
diff --git a/faststream/rabbit/prometheus/middleware.py b/faststream/rabbit/prometheus/middleware.py
new file mode 100644
index 0000000000..b2f96e45ca
--- /dev/null
+++ b/faststream/rabbit/prometheus/middleware.py
@@ -0,0 +1,26 @@
+from typing import TYPE_CHECKING, Optional, Sequence
+
+from faststream.prometheus.middleware import BasePrometheusMiddleware
+from faststream.rabbit.prometheus.provider import RabbitMetricsSettingsProvider
+from faststream.types import EMPTY
+
+if TYPE_CHECKING:
+ from prometheus_client import CollectorRegistry
+
+
class RabbitPrometheusMiddleware(BasePrometheusMiddleware):
    """Prometheus middleware preconfigured for RabbitMQ brokers."""

    def __init__(
        self,
        *,
        registry: "CollectorRegistry",
        app_name: str = EMPTY,
        metrics_prefix: str = "faststream",
        received_messages_size_buckets: Optional[Sequence[float]] = None,
    ) -> None:
        # Rabbit uses a single provider type, so the factory ignores the
        # message and always returns a fresh provider instance.
        def _build_provider(_: Optional[object]) -> RabbitMetricsSettingsProvider:
            return RabbitMetricsSettingsProvider()

        super().__init__(
            settings_provider_factory=_build_provider,
            registry=registry,
            app_name=app_name,
            metrics_prefix=metrics_prefix,
            received_messages_size_buckets=received_messages_size_buckets,
        )
diff --git a/faststream/rabbit/prometheus/provider.py b/faststream/rabbit/prometheus/provider.py
new file mode 100644
index 0000000000..48c1bb2541
--- /dev/null
+++ b/faststream/rabbit/prometheus/provider.py
@@ -0,0 +1,44 @@
+from typing import TYPE_CHECKING, Union
+
+from faststream.prometheus import (
+ ConsumeAttrs,
+ MetricsSettingsProvider,
+)
+
+if TYPE_CHECKING:
+ from aio_pika import IncomingMessage
+
+ from faststream.broker.message import StreamMessage
+ from faststream.rabbit.schemas.exchange import RabbitExchange
+ from faststream.types import AnyDict
+
+
class RabbitMetricsSettingsProvider(MetricsSettingsProvider["IncomingMessage"]):
    """Extracts metric labels from aio-pika messages and publish kwargs."""

    __slots__ = ("messaging_system",)

    def __init__(self) -> None:
        self.messaging_system = "rabbitmq"

    def get_consume_attrs_from_message(
        self,
        msg: "StreamMessage[IncomingMessage]",
    ) -> ConsumeAttrs:
        """Label an incoming message as `<exchange>.<routing_key>`."""
        raw = msg.raw_message
        exchange_name = raw.exchange or "default"
        return {
            "destination_name": f"{exchange_name}.{raw.routing_key}",
            "message_size": len(msg.body),
            "messages_count": 1,
        }

    def get_publish_destination_name_from_kwargs(
        self,
        kwargs: "AnyDict",
    ) -> str:
        """Build `<exchange>.<routing_key>` from publish kwargs."""
        exchange: Union[None, str, RabbitExchange] = kwargs.get("exchange")
        # A RabbitExchange contributes its `.name`; a plain string is used
        # as-is; a missing/empty exchange maps to "default".
        fallback = exchange or "default"
        exchange_prefix = getattr(exchange, "name", fallback)
        return f"{exchange_prefix}.{kwargs['routing_key']}"
diff --git a/faststream/redis/message.py b/faststream/redis/message.py
index 8bce4005c8..86cc9b3d96 100644
--- a/faststream/redis/message.py
+++ b/faststream/redis/message.py
@@ -128,7 +128,7 @@ async def ack(
ids = self.raw_message["message_ids"]
channel = self.raw_message["channel"]
await redis.xack(channel, group, *ids) # type: ignore[no-untyped-call]
- await super().ack()
+ await super().ack()
@override
async def nack(
diff --git a/faststream/redis/prometheus/__init__.py b/faststream/redis/prometheus/__init__.py
new file mode 100644
index 0000000000..84c831aabb
--- /dev/null
+++ b/faststream/redis/prometheus/__init__.py
@@ -0,0 +1,3 @@
+from faststream.redis.prometheus.middleware import RedisPrometheusMiddleware
+
+__all__ = ("RedisPrometheusMiddleware",)
diff --git a/faststream/redis/prometheus/middleware.py b/faststream/redis/prometheus/middleware.py
new file mode 100644
index 0000000000..1b157cb5a9
--- /dev/null
+++ b/faststream/redis/prometheus/middleware.py
@@ -0,0 +1,26 @@
+from typing import TYPE_CHECKING, Optional, Sequence
+
+from faststream.prometheus.middleware import BasePrometheusMiddleware
+from faststream.redis.prometheus.provider import settings_provider_factory
+from faststream.types import EMPTY
+
+if TYPE_CHECKING:
+ from prometheus_client import CollectorRegistry
+
+
class RedisPrometheusMiddleware(BasePrometheusMiddleware):
    """Prometheus middleware preconfigured for Redis brokers.

    Delegates provider selection to ``settings_provider_factory``, which
    chooses a single-message or batch provider per incoming message.
    """

    def __init__(
        self,
        *,
        registry: "CollectorRegistry",
        app_name: str = EMPTY,
        metrics_prefix: str = "faststream",
        received_messages_size_buckets: Optional[Sequence[float]] = None,
    ) -> None:
        super().__init__(
            settings_provider_factory=settings_provider_factory,
            registry=registry,
            app_name=app_name,
            metrics_prefix=metrics_prefix,
            received_messages_size_buckets=received_messages_size_buckets,
        )
diff --git a/faststream/redis/prometheus/provider.py b/faststream/redis/prometheus/provider.py
new file mode 100644
index 0000000000..51eb831669
--- /dev/null
+++ b/faststream/redis/prometheus/provider.py
@@ -0,0 +1,63 @@
+from typing import TYPE_CHECKING, Optional, Sized, Union, cast
+
+from faststream.prometheus import (
+ ConsumeAttrs,
+ MetricsSettingsProvider,
+)
+
+if TYPE_CHECKING:
+ from faststream.broker.message import StreamMessage
+ from faststream.types import AnyDict
+
+
class BaseRedisMetricsSettingsProvider(MetricsSettingsProvider["AnyDict"]):
    """Shared label-extraction logic for the Redis providers."""

    __slots__ = ("messaging_system",)

    def __init__(self) -> None:
        self.messaging_system = "redis"

    def get_publish_destination_name_from_kwargs(
        self,
        kwargs: "AnyDict",
    ) -> str:
        """Destination label for a publish call: channel, list or stream."""
        return self._get_destination(kwargs)

    @staticmethod
    def _get_destination(kwargs: "AnyDict") -> str:
        # A Redis operation targets exactly one of channel/list/stream;
        # return the first truthy value, or "" when none is present.
        for key in ("channel", "list", "stream"):
            destination = kwargs.get(key)
            if destination:
                return destination
        return ""
+
+
class RedisMetricsSettingsProvider(BaseRedisMetricsSettingsProvider):
    """Labels for subscribers receiving one message at a time."""

    def get_consume_attrs_from_message(
        self,
        msg: "StreamMessage[AnyDict]",
    ) -> ConsumeAttrs:
        attrs: ConsumeAttrs = {
            "destination_name": self._get_destination(msg.raw_message),
            "message_size": len(msg.body),
            "messages_count": 1,
        }
        return attrs
+
+
class BatchRedisMetricsSettingsProvider(BaseRedisMetricsSettingsProvider):
    """Labels for batch subscribers: counts the decoded batch length."""

    def get_consume_attrs_from_message(
        self,
        msg: "StreamMessage[AnyDict]",
    ) -> ConsumeAttrs:
        # The raw body is one blob; the decoded body holds the batch items,
        # so its length gives the real message count.
        batch = cast(Sized, msg._decoded_body)
        return {
            "destination_name": self._get_destination(msg.raw_message),
            "message_size": len(msg.body),
            "messages_count": len(batch),
        }
+
+
def settings_provider_factory(
    msg: Optional["AnyDict"],
) -> Union[
    RedisMetricsSettingsProvider,
    BatchRedisMetricsSettingsProvider,
]:
    """Choose a batch or single-message provider for the given raw message."""
    # NOTE(review): relies on all batch message types starting with "b"
    # (e.g. "blist") — confirm against faststream.redis message type names.
    is_batch = msg is not None and msg.get("type", "").startswith("b")
    if is_batch:
        return BatchRedisMetricsSettingsProvider()
    return RedisMetricsSettingsProvider()
diff --git a/pyproject.toml b/pyproject.toml
index 350d53ed3e..7867cfc450 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -83,8 +83,10 @@ cli = [
"watchfiles>=0.15.0,<0.25.0"
]
+prometheus = ["prometheus-client>=0.20.0,<0.30.0"]
+
# dev dependencies
-optionals = ["faststream[rabbit,kafka,confluent,nats,redis,otel,cli]"]
+optionals = ["faststream[rabbit,kafka,confluent,nats,redis,otel,cli,prometheus]"]
devdocs = [
"mkdocs-material==9.5.40",
diff --git a/tests/brokers/test_pushback.py b/tests/brokers/test_pushback.py
index 9f18aa038b..ac56078cb0 100644
--- a/tests/brokers/test_pushback.py
+++ b/tests/brokers/test_pushback.py
@@ -12,7 +12,7 @@
@pytest.fixture
def message():
- return AsyncMock(message_id=1)
+ return AsyncMock(message_id=1, committed=None)
@pytest.mark.asyncio
diff --git a/tests/prometheus/__init__.py b/tests/prometheus/__init__.py
new file mode 100644
index 0000000000..e69de29bb2
diff --git a/tests/prometheus/basic.py b/tests/prometheus/basic.py
new file mode 100644
index 0000000000..f2d9a5d6cf
--- /dev/null
+++ b/tests/prometheus/basic.py
@@ -0,0 +1,204 @@
+import asyncio
+from typing import Any, Optional, Type
+from unittest.mock import ANY, Mock, call
+
+import pytest
+from prometheus_client import CollectorRegistry
+
+from faststream import Context
+from faststream.broker.message import AckStatus
+from faststream.exceptions import RejectMessage
+from faststream.prometheus.middleware import (
+ PROCESSING_STATUS_BY_ACK_STATUS,
+ PROCESSING_STATUS_BY_HANDLER_EXCEPTION_MAP,
+)
+from faststream.prometheus.types import ProcessingStatus
+from tests.brokers.base.basic import BaseTestcaseConfig
+
+
@pytest.mark.asyncio
class LocalPrometheusTestcase(BaseTestcaseConfig):
    """Reusable Prometheus-middleware test suite.

    Broker-specific subclasses implement `get_broker`/`get_middleware`; the
    suite then publishes a message, drives the subscriber to each ack status
    (optionally via an exception) and asserts the mocked metrics manager
    received the expected calls.
    """

    def get_broker(self, apply_types=False, **kwargs):
        raise NotImplementedError

    def get_middleware(self, **kwargs):
        raise NotImplementedError

    @staticmethod
    def consume_destination_name(queue: str) -> str:
        return queue

    @property
    def settings_provider_factory(self):
        # Build a throwaway middleware just to reach the broker-specific factory.
        return self.get_middleware(
            registry=CollectorRegistry()
        )._settings_provider_factory

    @pytest.mark.parametrize(
        (
            "status",
            "exception_class",
        ),
        [
            pytest.param(
                AckStatus.acked,
                RejectMessage,
                id="acked status with reject message exception",
            ),
            pytest.param(
                AckStatus.acked, Exception, id="acked status with not handler exception"
            ),
            pytest.param(AckStatus.acked, None, id="acked status without exception"),
            pytest.param(AckStatus.nacked, None, id="nacked status without exception"),
            pytest.param(
                AckStatus.rejected, None, id="rejected status without exception"
            ),
        ],
    )
    async def test_metrics(
        self,
        event: asyncio.Event,
        queue: str,
        status: AckStatus,
        exception_class: Optional[Type[Exception]],
    ):
        middleware = self.get_middleware(registry=CollectorRegistry())
        # Replace the real manager with a mock so calls can be asserted.
        metrics_manager_mock = Mock()
        middleware._metrics_manager = metrics_manager_mock

        broker = self.get_broker(apply_types=True, middlewares=(middleware,))

        args, kwargs = self.get_subscriber_params(queue)

        message = None

        @broker.subscriber(*args, **kwargs)
        async def handler(m=Context("message")):
            event.set()

            nonlocal message
            message = m

            # Either raise (exception path) or explicitly ack/nack/reject.
            if exception_class:
                raise exception_class

            if status == AckStatus.acked:
                await message.ack()
            elif status == AckStatus.nacked:
                await message.nack()
            elif status == AckStatus.rejected:
                await message.reject()

        async with broker:
            await broker.start()
            tasks = (
                asyncio.create_task(broker.publish("hello", queue)),
                asyncio.create_task(event.wait()),
            )
            await asyncio.wait(tasks, timeout=self.timeout)

        assert event.is_set()
        self.assert_consume_metrics(
            metrics_manager=metrics_manager_mock,
            message=message,
            exception_class=exception_class,
        )
        self.assert_publish_metrics(metrics_manager=metrics_manager_mock)

    def assert_consume_metrics(
        self,
        *,
        metrics_manager: Any,
        message: Any,
        exception_class: Optional[Type[Exception]],
    ):
        """Assert every consume-side metrics call for the processed message."""
        settings_provider = self.settings_provider_factory(message.raw_message)
        consume_attrs = settings_provider.get_consume_attrs_from_message(message)
        assert metrics_manager.add_received_message.mock_calls == [
            call(
                amount=consume_attrs["messages_count"],
                broker=settings_provider.messaging_system,
                handler=consume_attrs["destination_name"],
            ),
        ]

        assert metrics_manager.observe_received_messages_size.mock_calls == [
            call(
                size=consume_attrs["message_size"],
                broker=settings_provider.messaging_system,
                handler=consume_attrs["destination_name"],
            ),
        ]

        assert metrics_manager.add_received_message_in_process.mock_calls == [
            call(
                amount=consume_attrs["messages_count"],
                broker=settings_provider.messaging_system,
                handler=consume_attrs["destination_name"],
            ),
        ]
        assert metrics_manager.remove_received_message_in_process.mock_calls == [
            call(
                amount=consume_attrs["messages_count"],
                broker=settings_provider.messaging_system,
                handler=consume_attrs["destination_name"],
            )
        ]

        assert (
            metrics_manager.observe_received_processed_message_duration.mock_calls
            == [
                call(
                    duration=ANY,
                    broker=settings_provider.messaging_system,
                    handler=consume_attrs["destination_name"],
                ),
            ]
        )

        # Mirror the middleware's status resolution: exception mapping first,
        # then the explicit ack status, defaulting to `acked`.
        status = ProcessingStatus.acked

        if exception_class:
            status = (
                PROCESSING_STATUS_BY_HANDLER_EXCEPTION_MAP.get(exception_class)
                or ProcessingStatus.error
            )
        elif message.committed:
            status = PROCESSING_STATUS_BY_ACK_STATUS[message.committed]

        assert metrics_manager.add_received_processed_message.mock_calls == [
            call(
                amount=consume_attrs["messages_count"],
                broker=settings_provider.messaging_system,
                handler=consume_attrs["destination_name"],
                status=status.value,
            ),
        ]

        if status == ProcessingStatus.error:
            assert (
                metrics_manager.add_received_processed_message_exception.mock_calls
                == [
                    call(
                        broker=settings_provider.messaging_system,
                        handler=consume_attrs["destination_name"],
                        exception_type=exception_class.__name__,
                    ),
                ]
            )

    def assert_publish_metrics(self, metrics_manager: Any):
        """Assert the publish-side metrics calls for the single test publish."""
        settings_provider = self.settings_provider_factory(None)
        assert metrics_manager.observe_published_message_duration.mock_calls == [
            call(
                duration=ANY, broker=settings_provider.messaging_system, destination=ANY
            ),
        ]
        assert metrics_manager.add_published_message.mock_calls == [
            call(
                amount=ANY,
                broker=settings_provider.messaging_system,
                destination=ANY,
                status="success",
            ),
        ]
diff --git a/tests/prometheus/confluent/__init__.py b/tests/prometheus/confluent/__init__.py
new file mode 100644
index 0000000000..c4a1803708
--- /dev/null
+++ b/tests/prometheus/confluent/__init__.py
@@ -0,0 +1,3 @@
+import pytest
+
+pytest.importorskip("confluent_kafka")
diff --git a/tests/prometheus/confluent/test_confluent.py b/tests/prometheus/confluent/test_confluent.py
new file mode 100644
index 0000000000..d1e3034ad6
--- /dev/null
+++ b/tests/prometheus/confluent/test_confluent.py
@@ -0,0 +1,79 @@
+import asyncio
+from unittest.mock import Mock
+
+import pytest
+from prometheus_client import CollectorRegistry
+
+from faststream import Context
+from faststream.confluent import KafkaBroker
+from faststream.confluent.prometheus.middleware import KafkaPrometheusMiddleware
+from tests.brokers.confluent.basic import ConfluentTestcaseConfig
+from tests.brokers.confluent.test_consume import TestConsume
+from tests.brokers.confluent.test_publish import TestPublish
+from tests.prometheus.basic import LocalPrometheusTestcase
+
+
@pytest.mark.confluent
class TestPrometheus(ConfluentTestcaseConfig, LocalPrometheusTestcase):
    """Prometheus middleware suite against the Confluent Kafka broker."""

    def get_broker(self, apply_types=False, **kwargs):
        return KafkaBroker(apply_types=apply_types, **kwargs)

    def get_middleware(self, **kwargs):
        return KafkaPrometheusMiddleware(**kwargs)

    async def test_metrics_batch(
        self,
        event: asyncio.Event,
        queue: str,
    ):
        """Batch consume path: metrics must be recorded for the whole batch."""
        middleware = self.get_middleware(registry=CollectorRegistry())
        metrics_manager_mock = Mock()
        middleware._metrics_manager = metrics_manager_mock

        broker = self.get_broker(apply_types=True, middlewares=(middleware,))

        args, kwargs = self.get_subscriber_params(queue, batch=True)
        message = None

        @broker.subscriber(*args, **kwargs)
        async def handler(m=Context("message")):
            event.set()

            nonlocal message
            message = m

        async with broker:
            await broker.start()
            tasks = (
                asyncio.create_task(
                    broker.publish_batch("hello", "world", topic=queue)
                ),
                asyncio.create_task(event.wait()),
            )
            await asyncio.wait(tasks, timeout=self.timeout)

        assert event.is_set()
        self.assert_consume_metrics(
            metrics_manager=metrics_manager_mock, message=message, exception_class=None
        )
        self.assert_publish_metrics(metrics_manager=metrics_manager_mock)
+
+
@pytest.mark.confluent
class TestPublishWithPrometheus(TestPublish):
    """Run the standard publish suite with the Prometheus middleware attached."""

    def get_broker(self, apply_types: bool = False, **kwargs):
        return KafkaBroker(
            middlewares=(KafkaPrometheusMiddleware(registry=CollectorRegistry()),),
            apply_types=apply_types,
            **kwargs,
        )
+
+
@pytest.mark.confluent
class TestConsumeWithPrometheus(TestConsume):
    """Run the standard consume suite with the Prometheus middleware attached."""

    def get_broker(self, apply_types: bool = False, **kwargs):
        return KafkaBroker(
            middlewares=(KafkaPrometheusMiddleware(registry=CollectorRegistry()),),
            apply_types=apply_types,
            **kwargs,
        )
diff --git a/tests/prometheus/kafka/__init__.py b/tests/prometheus/kafka/__init__.py
new file mode 100644
index 0000000000..bd6bc708fc
--- /dev/null
+++ b/tests/prometheus/kafka/__init__.py
@@ -0,0 +1,3 @@
+import pytest
+
+pytest.importorskip("aiokafka")
diff --git a/tests/prometheus/kafka/test_kafka.py b/tests/prometheus/kafka/test_kafka.py
new file mode 100644
index 0000000000..abb5c86b3f
--- /dev/null
+++ b/tests/prometheus/kafka/test_kafka.py
@@ -0,0 +1,82 @@
+import asyncio
+from unittest.mock import Mock
+
+import pytest
+from prometheus_client import CollectorRegistry
+
+from faststream import Context
+from faststream.kafka import KafkaBroker
+from faststream.kafka.prometheus.middleware import KafkaPrometheusMiddleware
+from tests.brokers.kafka.test_consume import TestConsume
+from tests.brokers.kafka.test_publish import TestPublish
+from tests.prometheus.basic import LocalPrometheusTestcase
+
+
@pytest.mark.kafka
class TestPrometheus(LocalPrometheusTestcase):
    """Prometheus middleware suite against the aiokafka broker."""

    def get_broker(self, apply_types=False, **kwargs):
        return KafkaBroker(apply_types=apply_types, **kwargs)

    def get_middleware(self, **kwargs):
        return KafkaPrometheusMiddleware(**kwargs)

    async def test_metrics_batch(
        self,
        event: asyncio.Event,
        queue: str,
    ):
        """Batch consume path: metrics must be recorded for the whole batch."""
        middleware = self.get_middleware(registry=CollectorRegistry())
        metrics_manager_mock = Mock()
        middleware._metrics_manager = metrics_manager_mock

        broker = self.get_broker(apply_types=True, middlewares=(middleware,))

        args, kwargs = self.get_subscriber_params(queue, batch=True)
        message = None

        @broker.subscriber(*args, **kwargs)
        async def handler(m=Context("message")):
            event.set()

            nonlocal message
            message = m

        async with broker:
            await broker.start()
            tasks = (
                asyncio.create_task(
                    broker.publish_batch("hello", "world", topic=queue)
                ),
                asyncio.create_task(event.wait()),
            )
            await asyncio.wait(tasks, timeout=self.timeout)

        assert event.is_set()
        self.assert_consume_metrics(
            metrics_manager=metrics_manager_mock, message=message, exception_class=None
        )
        self.assert_publish_metrics(metrics_manager=metrics_manager_mock)
+
+
@pytest.mark.kafka
class TestPublishWithPrometheus(TestPublish):
    """Run the standard publish suite with the Prometheus middleware attached."""

    def get_broker(
        self,
        apply_types: bool = False,
        **kwargs,
    ):
        return KafkaBroker(
            middlewares=(KafkaPrometheusMiddleware(registry=CollectorRegistry()),),
            apply_types=apply_types,
            **kwargs,
        )
+
+
@pytest.mark.kafka
class TestConsumeWithPrometheus(TestConsume):
    """Run the standard consume suite with the Prometheus middleware attached."""

    def get_broker(self, apply_types: bool = False, **kwargs):
        return KafkaBroker(
            middlewares=(KafkaPrometheusMiddleware(registry=CollectorRegistry()),),
            apply_types=apply_types,
            **kwargs,
        )
diff --git a/tests/prometheus/nats/__init__.py b/tests/prometheus/nats/__init__.py
new file mode 100644
index 0000000000..87ead90ee6
--- /dev/null
+++ b/tests/prometheus/nats/__init__.py
@@ -0,0 +1,3 @@
+import pytest
+
+pytest.importorskip("nats")
diff --git a/tests/prometheus/nats/test_nats.py b/tests/prometheus/nats/test_nats.py
new file mode 100644
index 0000000000..f65eb41e85
--- /dev/null
+++ b/tests/prometheus/nats/test_nats.py
@@ -0,0 +1,86 @@
+import asyncio
+from unittest.mock import Mock
+
+import pytest
+from prometheus_client import CollectorRegistry
+
+from faststream import Context
+from faststream.nats import JStream, NatsBroker, PullSub
+from faststream.nats.prometheus.middleware import NatsPrometheusMiddleware
+from tests.brokers.nats.test_consume import TestConsume
+from tests.brokers.nats.test_publish import TestPublish
+from tests.prometheus.basic import LocalPrometheusTestcase
+
+
@pytest.fixture
def stream(queue):
    # One JetStream stream per test, named after the test queue.
    return JStream(queue)
+
+
@pytest.mark.nats
class TestPrometheus(LocalPrometheusTestcase):
    """Prometheus middleware suite against the NATS broker."""

    def get_broker(self, apply_types=False, **kwargs):
        return NatsBroker(apply_types=apply_types, **kwargs)

    def get_middleware(self, **kwargs):
        return NatsPrometheusMiddleware(**kwargs)

    async def test_metrics_batch(
        self,
        event: asyncio.Event,
        queue: str,
        stream: JStream,
    ):
        """Batch (JetStream pull) consume path must record metrics."""
        middleware = self.get_middleware(registry=CollectorRegistry())
        metrics_manager_mock = Mock()
        middleware._metrics_manager = metrics_manager_mock

        broker = self.get_broker(apply_types=True, middlewares=(middleware,))

        args, kwargs = self.get_subscriber_params(
            queue,
            stream=stream,
            pull_sub=PullSub(1, batch=True, timeout=self.timeout),
        )
        message = None

        @broker.subscriber(*args, **kwargs)
        async def handler(m=Context("message")):
            event.set()

            nonlocal message
            message = m

        async with broker:
            await broker.start()
            tasks = (
                asyncio.create_task(broker.publish("hello", queue)),
                asyncio.create_task(event.wait()),
            )
            await asyncio.wait(tasks, timeout=self.timeout)

        assert event.is_set()
        self.assert_consume_metrics(
            metrics_manager=metrics_manager_mock, message=message, exception_class=None
        )
        self.assert_publish_metrics(metrics_manager=metrics_manager_mock)
+
+
@pytest.mark.nats
class TestPublishWithPrometheus(TestPublish):
    """Run the standard publish suite with the Prometheus middleware attached."""

    def get_broker(self, apply_types: bool = False, **kwargs):
        return NatsBroker(
            middlewares=(NatsPrometheusMiddleware(registry=CollectorRegistry()),),
            apply_types=apply_types,
            **kwargs,
        )
+
+
@pytest.mark.nats
class TestConsumeWithPrometheus(TestConsume):
    """Run the standard consume suite with the Prometheus middleware attached."""

    def get_broker(self, apply_types: bool = False, **kwargs):
        return NatsBroker(
            middlewares=(NatsPrometheusMiddleware(registry=CollectorRegistry()),),
            apply_types=apply_types,
            **kwargs,
        )
diff --git a/tests/prometheus/rabbit/__init__.py b/tests/prometheus/rabbit/__init__.py
new file mode 100644
index 0000000000..ebec43fcd5
--- /dev/null
+++ b/tests/prometheus/rabbit/__init__.py
@@ -0,0 +1,3 @@
+import pytest
+
+pytest.importorskip("aio_pika")
diff --git a/tests/prometheus/rabbit/test_rabbit.py b/tests/prometheus/rabbit/test_rabbit.py
new file mode 100644
index 0000000000..6eef6d224f
--- /dev/null
+++ b/tests/prometheus/rabbit/test_rabbit.py
@@ -0,0 +1,42 @@
+import pytest
+from prometheus_client import CollectorRegistry
+
+from faststream.rabbit import RabbitBroker, RabbitExchange
+from faststream.rabbit.prometheus.middleware import RabbitPrometheusMiddleware
+from tests.brokers.rabbit.test_consume import TestConsume
+from tests.brokers.rabbit.test_publish import TestPublish
+from tests.prometheus.basic import LocalPrometheusTestcase
+
+
@pytest.fixture
def exchange(queue):
    # One exchange per test, named after the test queue.
    return RabbitExchange(name=queue)
+
+
@pytest.mark.rabbit
class TestPrometheus(LocalPrometheusTestcase):
    """Prometheus middleware suite against the RabbitMQ broker."""

    def get_broker(self, apply_types=False, **kwargs):
        return RabbitBroker(apply_types=apply_types, **kwargs)

    def get_middleware(self, **kwargs):
        return RabbitPrometheusMiddleware(**kwargs)
+
+
@pytest.mark.rabbit
class TestPublishWithPrometheus(TestPublish):
    """Run the standard publish suite with the Prometheus middleware attached."""

    def get_broker(self, apply_types: bool = False, **kwargs):
        return RabbitBroker(
            middlewares=(RabbitPrometheusMiddleware(registry=CollectorRegistry()),),
            apply_types=apply_types,
            **kwargs,
        )
+
+
@pytest.mark.rabbit
class TestConsumeWithPrometheus(TestConsume):
    """Run the standard consume suite with the Prometheus middleware attached."""

    def get_broker(self, apply_types: bool = False, **kwargs):
        return RabbitBroker(
            middlewares=(RabbitPrometheusMiddleware(registry=CollectorRegistry()),),
            apply_types=apply_types,
            **kwargs,
        )
diff --git a/tests/prometheus/redis/__init__.py b/tests/prometheus/redis/__init__.py
new file mode 100644
index 0000000000..4752ef19b1
--- /dev/null
+++ b/tests/prometheus/redis/__init__.py
@@ -0,0 +1,3 @@
+import pytest
+
+pytest.importorskip("redis")
diff --git a/tests/prometheus/redis/test_redis.py b/tests/prometheus/redis/test_redis.py
new file mode 100644
index 0000000000..4059c33d48
--- /dev/null
+++ b/tests/prometheus/redis/test_redis.py
@@ -0,0 +1,77 @@
+import asyncio
+from unittest.mock import Mock
+
+import pytest
+from prometheus_client import CollectorRegistry
+
+from faststream import Context
+from faststream.redis import ListSub, RedisBroker
+from faststream.redis.prometheus.middleware import RedisPrometheusMiddleware
+from tests.brokers.redis.test_consume import TestConsume
+from tests.brokers.redis.test_publish import TestPublish
+from tests.prometheus.basic import LocalPrometheusTestcase
+
+
@pytest.mark.redis
class TestPrometheus(LocalPrometheusTestcase):
    """Prometheus middleware suite against the Redis broker."""

    def get_broker(self, apply_types=False, **kwargs):
        return RedisBroker(apply_types=apply_types, **kwargs)

    def get_middleware(self, **kwargs):
        return RedisPrometheusMiddleware(**kwargs)

    async def test_metrics_batch(
        self,
        event: asyncio.Event,
        queue: str,
    ):
        """Batch (list) consume path must record metrics for the whole batch."""
        middleware = self.get_middleware(registry=CollectorRegistry())
        metrics_manager_mock = Mock()
        middleware._metrics_manager = metrics_manager_mock

        broker = self.get_broker(apply_types=True, middlewares=(middleware,))

        args, kwargs = self.get_subscriber_params(list=ListSub(queue, batch=True))

        message = None

        @broker.subscriber(*args, **kwargs)
        async def handler(m=Context("message")):
            event.set()

            nonlocal message
            message = m

        async with broker:
            await broker.start()
            tasks = (
                asyncio.create_task(broker.publish_batch("hello", "world", list=queue)),
                asyncio.create_task(event.wait()),
            )
            await asyncio.wait(tasks, timeout=self.timeout)

        assert event.is_set()
        self.assert_consume_metrics(
            metrics_manager=metrics_manager_mock, message=message, exception_class=None
        )
        self.assert_publish_metrics(metrics_manager=metrics_manager_mock)
+
+
@pytest.mark.redis
class TestPublishWithPrometheus(TestPublish):
    """Run the standard publish suite with the Prometheus middleware attached."""

    def get_broker(self, apply_types: bool = False, **kwargs):
        return RedisBroker(
            middlewares=(RedisPrometheusMiddleware(registry=CollectorRegistry()),),
            apply_types=apply_types,
            **kwargs,
        )
+
+
@pytest.mark.redis
class TestConsumeWithPrometheus(TestConsume):
    """Run the standard consume suite with the Prometheus middleware attached."""

    def get_broker(self, apply_types: bool = False, **kwargs):
        return RedisBroker(
            middlewares=(RedisPrometheusMiddleware(registry=CollectorRegistry()),),
            apply_types=apply_types,
            **kwargs,
        )
diff --git a/tests/prometheus/test_metrics.py b/tests/prometheus/test_metrics.py
new file mode 100644
index 0000000000..7f9aa85771
--- /dev/null
+++ b/tests/prometheus/test_metrics.py
@@ -0,0 +1,644 @@
+import random
+from typing import List, Optional
+from unittest.mock import ANY
+
+import pytest
+from dirty_equals import IsPositiveFloat, IsStr
+from prometheus_client import CollectorRegistry, Histogram, Metric
+from prometheus_client.samples import Sample
+
+from faststream.prometheus.container import MetricsContainer
+from faststream.prometheus.manager import MetricsManager
+from faststream.prometheus.types import ProcessingStatus, PublishingStatus
+
+
+class TestCaseMetrics:  # unit tests for MetricsManager: each method checks one recording call against collect()
+ @staticmethod
+ def create_metrics_manager(
+ app_name: Optional[str] = None,
+ metrics_prefix: Optional[str] = None,
+ received_messages_size_buckets: Optional[List[float]] = None,
+ ) -> MetricsManager:  # builds a manager backed by a fresh, isolated CollectorRegistry
+ registry = CollectorRegistry()
+ container = MetricsContainer(
+ registry,
+ metrics_prefix=metrics_prefix,
+ received_messages_size_buckets=received_messages_size_buckets,
+ )
+ return MetricsManager(container, app_name=app_name)
+
+ @pytest.fixture
+ def app_name(self) -> str:  # label value for every expected sample (unused ``request`` param dropped)
+ return "youtube"
+
+ @pytest.fixture
+ def metrics_prefix(self) -> str:  # metric-name prefix (unused ``request`` param dropped)
+ return "fs"
+
+ @pytest.fixture
+ def broker(self) -> str:  # ``broker`` label value used in expected samples
+ return "rabbit"
+
+ @pytest.fixture
+ def queue(self) -> str:  # ``handler``/``destination`` label value used in expected samples
+ return "default.test"
+
+ @pytest.fixture
+ def messages_amount(self) -> int:  # randomized so counter values are not accidentally hard-coded
+ return random.randint(1, 10)
+
+ @pytest.fixture
+ def exception_type(self) -> str:  # ``exception_type`` label value ("Exception")
+ return Exception.__name__
+
+ def test_add_received_message(  # counter rises by ``amount`` with app/broker/handler labels
+ self,
+ app_name: str,
+ metrics_prefix: str,
+ queue: str,
+ broker: str,
+ messages_amount: int,
+ ) -> None:
+ manager = self.create_metrics_manager(
+ app_name=app_name,
+ metrics_prefix=metrics_prefix,
+ )
+
+ expected = Metric(
+ name=f"{metrics_prefix}_received_messages",
+ documentation="Count of received messages by broker and handler",
+ unit="",
+ typ="counter",
+ )
+ expected.samples = [
+ Sample(
+ name=f"{metrics_prefix}_received_messages_total",
+ labels={"app_name": app_name, "broker": broker, "handler": queue},
+ value=float(messages_amount),
+ timestamp=None,
+ exemplar=None,
+ ),
+ Sample(
+ name=f"{metrics_prefix}_received_messages_created",
+ labels={"app_name": app_name, "broker": broker, "handler": queue},
+ value=IsPositiveFloat,  # creation timestamp; exact value is nondeterministic
+ timestamp=None,
+ exemplar=None,
+ ),
+ ]
+
+ manager.add_received_message(
+ amount=messages_amount, broker=broker, handler=queue
+ )
+
+ metric_values = manager._container.received_messages_total.collect()
+
+ assert metric_values == [expected]
+
+ @pytest.mark.parametrize(
+ "is_default_buckets",
+ [
+ pytest.param(True, id="with default buckets"),
+ pytest.param(False, id="with custom buckets"),
+ ],
+ )
+ def test_observe_received_messages_size(  # one observation lands in every cumulative bucket
+ self,
+ app_name: str,
+ metrics_prefix: str,
+ queue: str,
+ broker: str,
+ is_default_buckets: bool,
+ ) -> None:
+ manager_kwargs = {
+ "app_name": app_name,
+ "metrics_prefix": metrics_prefix,
+ }
+
+ custom_buckets = [1.0, 2.0, 3.0, float("inf")]
+
+ if not is_default_buckets:
+ manager_kwargs["received_messages_size_buckets"] = custom_buckets
+
+ manager = self.create_metrics_manager(**manager_kwargs)
+
+ size = 1
+ buckets = (
+ MetricsContainer.DEFAULT_SIZE_BUCKETS
+ if is_default_buckets
+ else custom_buckets
+ )
+
+ expected = Metric(
+ name=f"{metrics_prefix}_received_messages_size_bytes",
+ documentation="Histogram of received messages size in bytes by broker and handler",
+ unit="",
+ typ="histogram",
+ )
+ expected.samples = [
+ *[
+ Sample(
+ name=f"{metrics_prefix}_received_messages_size_bytes_bucket",
+ labels={
+ "app_name": app_name,
+ "broker": broker,
+ "handler": queue,
+ "le": IsStr,
+ },
+ value=1.0,
+ timestamp=None,
+ exemplar=None,
+ )
+ for _ in buckets
+ ],
+ Sample(
+ name=f"{metrics_prefix}_received_messages_size_bytes_count",
+ labels={"app_name": app_name, "broker": broker, "handler": queue},
+ value=1.0,
+ timestamp=None,
+ exemplar=None,
+ ),
+ Sample(
+ name=f"{metrics_prefix}_received_messages_size_bytes_sum",
+ labels={"app_name": app_name, "broker": broker, "handler": queue},
+ value=size,
+ timestamp=None,
+ exemplar=None,
+ ),
+ Sample(
+ name=f"{metrics_prefix}_received_messages_size_bytes_created",
+ labels={"app_name": app_name, "broker": broker, "handler": queue},
+ value=IsPositiveFloat,  # was ANY; aligned with the other *_created assertions
+ timestamp=None,
+ exemplar=None,
+ ),
+ ]
+
+ manager.observe_received_messages_size(size=size, broker=broker, handler=queue)
+
+ metric_values = manager._container.received_messages_size_bytes.collect()
+
+ assert metric_values == [expected]
+
+ def test_add_received_message_in_process(  # gauge rises by ``amount``
+ self,
+ app_name: str,
+ metrics_prefix: str,
+ queue: str,
+ broker: str,
+ messages_amount: int,
+ ) -> None:
+ manager = self.create_metrics_manager(
+ app_name=app_name,
+ metrics_prefix=metrics_prefix,
+ )
+
+ expected = Metric(
+ name=f"{metrics_prefix}_received_messages_in_process",
+ documentation="Gauge of received messages in process by broker and handler",
+ unit="",
+ typ="gauge",
+ )
+ expected.samples = [
+ Sample(
+ name=f"{metrics_prefix}_received_messages_in_process",
+ labels={"app_name": app_name, "broker": broker, "handler": queue},
+ value=float(messages_amount),
+ timestamp=None,
+ exemplar=None,
+ ),
+ ]
+
+ manager.add_received_message_in_process(
+ amount=messages_amount, broker=broker, handler=queue
+ )
+
+ metric_values = manager._container.received_messages_in_process.collect()
+
+ assert metric_values == [expected]
+
+ def test_remove_received_message_in_process(  # gauge settles at amount - 1 after one removal
+ self,
+ app_name: str,
+ metrics_prefix: str,
+ queue: str,
+ broker: str,
+ messages_amount: int,
+ ) -> None:
+ manager = self.create_metrics_manager(
+ app_name=app_name,
+ metrics_prefix=metrics_prefix,
+ )
+
+ expected = Metric(
+ name=f"{metrics_prefix}_received_messages_in_process",
+ documentation="Gauge of received messages in process by broker and handler",
+ unit="",
+ typ="gauge",
+ )
+ expected.samples = [
+ Sample(
+ name=f"{metrics_prefix}_received_messages_in_process",
+ labels={"app_name": app_name, "broker": broker, "handler": queue},
+ value=float(messages_amount - 1),  # messages_amount >= 1, so this never goes negative
+ timestamp=None,
+ exemplar=None,
+ ),
+ ]
+
+ manager.add_received_message_in_process(
+ amount=messages_amount, broker=broker, handler=queue
+ )
+ manager.remove_received_message_in_process(
+ amount=1, broker=broker, handler=queue
+ )
+
+ metric_values = manager._container.received_messages_in_process.collect()
+
+ assert metric_values == [expected]
+
+ @pytest.mark.parametrize(
+ "status",
+ [
+ pytest.param(ProcessingStatus.acked, id="acked status"),
+ pytest.param(ProcessingStatus.nacked, id="nacked status"),
+ pytest.param(ProcessingStatus.rejected, id="rejected status"),
+ pytest.param(ProcessingStatus.skipped, id="skipped status"),
+ pytest.param(ProcessingStatus.error, id="error status"),
+ ],
+ )
+ def test_add_received_processed_message(  # counter carries a ``status`` label for every ProcessingStatus
+ self,
+ app_name: str,
+ metrics_prefix: str,
+ queue: str,
+ broker: str,
+ messages_amount: int,
+ status: ProcessingStatus,
+ ) -> None:
+ manager = self.create_metrics_manager(
+ app_name=app_name,
+ metrics_prefix=metrics_prefix,
+ )
+
+ expected = Metric(
+ name=f"{metrics_prefix}_received_processed_messages",
+ documentation="Count of received processed messages by broker, handler and status",
+ unit="",
+ typ="counter",
+ )
+ expected.samples = [
+ Sample(
+ name=f"{metrics_prefix}_received_processed_messages_total",
+ labels={
+ "app_name": app_name,
+ "broker": broker,
+ "handler": queue,
+ "status": status.value,
+ },
+ value=float(messages_amount),
+ timestamp=None,
+ exemplar=None,
+ ),
+ Sample(
+ name=f"{metrics_prefix}_received_processed_messages_created",
+ labels={
+ "app_name": app_name,
+ "broker": broker,
+ "handler": queue,
+ "status": status.value,
+ },
+ value=IsPositiveFloat,  # creation timestamp; exact value is nondeterministic
+ timestamp=None,
+ exemplar=None,
+ ),
+ ]
+
+ manager.add_received_processed_message(
+ amount=messages_amount,
+ status=status,
+ broker=broker,
+ handler=queue,
+ )
+
+ metric_values = manager._container.received_processed_messages_total.collect()
+
+ assert metric_values == [expected]
+
+ def test_observe_received_processed_message_duration(  # one observation against the default histogram buckets
+ self,
+ app_name: str,
+ metrics_prefix: str,
+ queue: str,
+ broker: str,
+ ) -> None:
+ manager = self.create_metrics_manager(
+ app_name=app_name,
+ metrics_prefix=metrics_prefix,
+ )
+
+ duration = 0.001
+
+ expected = Metric(
+ name=f"{metrics_prefix}_received_processed_messages_duration_seconds",
+ documentation="Histogram of received processed messages duration in seconds by broker and handler",
+ unit="",
+ typ="histogram",
+ )
+ expected.samples = [
+ *[
+ Sample(
+ name=f"{metrics_prefix}_received_processed_messages_duration_seconds_bucket",
+ labels={
+ "app_name": app_name,
+ "broker": broker,
+ "handler": queue,
+ "le": IsStr,
+ },
+ value=1.0,
+ timestamp=None,
+ exemplar=None,
+ )
+ for _ in Histogram.DEFAULT_BUCKETS
+ ],
+ Sample(
+ name=f"{metrics_prefix}_received_processed_messages_duration_seconds_count",
+ labels={"app_name": app_name, "broker": broker, "handler": queue},
+ value=1.0,
+ timestamp=None,
+ exemplar=None,
+ ),
+ Sample(
+ name=f"{metrics_prefix}_received_processed_messages_duration_seconds_sum",
+ labels={"app_name": app_name, "broker": broker, "handler": queue},
+ value=duration,
+ timestamp=None,
+ exemplar=None,
+ ),
+ Sample(
+ name=f"{metrics_prefix}_received_processed_messages_duration_seconds_created",
+ labels={"app_name": app_name, "broker": broker, "handler": queue},
+ value=IsPositiveFloat,  # was ANY; aligned with the other *_created assertions
+ timestamp=None,
+ exemplar=None,
+ ),
+ ]
+
+ manager.observe_received_processed_message_duration(
+ duration=duration,
+ broker=broker,
+ handler=queue,
+ )
+
+ metric_values = (
+ manager._container.received_processed_messages_duration_seconds.collect()
+ )
+
+ assert metric_values == [expected]
+
+ def test_add_received_processed_message_exception(  # counter carries the ``exception_type`` label
+ self,
+ app_name: str,
+ metrics_prefix: str,
+ queue: str,
+ broker: str,
+ exception_type: str,
+ ) -> None:
+ manager = self.create_metrics_manager(
+ app_name=app_name,
+ metrics_prefix=metrics_prefix,
+ )
+
+ expected = Metric(
+ name=f"{metrics_prefix}_received_processed_messages_exceptions",
+ documentation="Count of received processed messages exceptions by broker, handler and exception_type",
+ unit="",
+ typ="counter",
+ )
+ expected.samples = [
+ Sample(
+ name=f"{metrics_prefix}_received_processed_messages_exceptions_total",
+ labels={
+ "app_name": app_name,
+ "broker": broker,
+ "handler": queue,
+ "exception_type": exception_type,
+ },
+ value=1.0,
+ timestamp=None,
+ exemplar=None,
+ ),
+ Sample(
+ name=f"{metrics_prefix}_received_processed_messages_exceptions_created",
+ labels={
+ "app_name": app_name,
+ "broker": broker,
+ "handler": queue,
+ "exception_type": exception_type,
+ },
+ value=IsPositiveFloat,  # creation timestamp; exact value is nondeterministic
+ timestamp=None,
+ exemplar=None,
+ ),
+ ]
+
+ manager.add_received_processed_message_exception(
+ exception_type=exception_type,
+ broker=broker,
+ handler=queue,
+ )
+
+ metric_values = (
+ manager._container.received_processed_messages_exceptions_total.collect()
+ )
+
+ assert metric_values == [expected]
+
+ @pytest.mark.parametrize(
+ "status",
+ [
+ pytest.param(PublishingStatus.success, id="success status"),
+ pytest.param(PublishingStatus.error, id="error status"),
+ ],
+ )
+ def test_add_published_message(  # counter uses ``destination`` (not ``handler``) plus ``status`` labels
+ self,
+ app_name: str,
+ metrics_prefix: str,
+ queue: str,
+ broker: str,
+ messages_amount: int,  # NOTE(review): unused here — the call below always records a single message
+ status: PublishingStatus,
+ ) -> None:
+ manager = self.create_metrics_manager(
+ app_name=app_name,
+ metrics_prefix=metrics_prefix,
+ )
+
+ expected = Metric(
+ name=f"{metrics_prefix}_published_messages",
+ documentation="Count of published messages by destination and status",
+ unit="",
+ typ="counter",
+ )
+ expected.samples = [
+ Sample(
+ name=f"{metrics_prefix}_published_messages_total",
+ labels={
+ "app_name": app_name,
+ "broker": broker,
+ "destination": queue,
+ "status": status.value,
+ },
+ value=1.0,
+ timestamp=None,
+ exemplar=None,
+ ),
+ Sample(
+ name=f"{metrics_prefix}_published_messages_created",
+ labels={
+ "app_name": app_name,
+ "broker": broker,
+ "destination": queue,
+ "status": status.value,
+ },
+ value=IsPositiveFloat,  # creation timestamp; exact value is nondeterministic
+ timestamp=None,
+ exemplar=None,
+ ),
+ ]
+
+ manager.add_published_message(
+ status=status,
+ broker=broker,
+ destination=queue,
+ )
+
+ metric_values = manager._container.published_messages_total.collect()
+
+ assert metric_values == [expected]
+
+ def test_observe_published_message_duration(  # one observation against the default histogram buckets
+ self,
+ app_name: str,
+ metrics_prefix: str,
+ queue: str,
+ broker: str,
+ ) -> None:
+ manager = self.create_metrics_manager(
+ app_name=app_name,
+ metrics_prefix=metrics_prefix,
+ )
+
+ duration = 0.001
+
+ expected = Metric(
+ name=f"{metrics_prefix}_published_messages_duration_seconds",
+ documentation="Histogram of published messages duration in seconds by broker and destination",
+ unit="",
+ typ="histogram",
+ )
+ expected.samples = [
+ *[
+ Sample(
+ name=f"{metrics_prefix}_published_messages_duration_seconds_bucket",
+ labels={
+ "app_name": app_name,
+ "broker": broker,
+ "destination": queue,
+ "le": IsStr,
+ },
+ value=1.0,
+ timestamp=None,
+ exemplar=None,
+ )
+ for _ in Histogram.DEFAULT_BUCKETS
+ ],
+ Sample(
+ name=f"{metrics_prefix}_published_messages_duration_seconds_count",
+ labels={"app_name": app_name, "broker": broker, "destination": queue},
+ value=1.0,
+ timestamp=None,
+ exemplar=None,
+ ),
+ Sample(
+ name=f"{metrics_prefix}_published_messages_duration_seconds_sum",
+ labels={"app_name": app_name, "broker": broker, "destination": queue},
+ value=duration,
+ timestamp=None,
+ exemplar=None,
+ ),
+ Sample(
+ name=f"{metrics_prefix}_published_messages_duration_seconds_created",
+ labels={"app_name": app_name, "broker": broker, "destination": queue},
+ value=IsPositiveFloat,  # creation timestamp; exact value is nondeterministic
+ timestamp=None,
+ exemplar=None,
+ ),
+ ]
+
+ manager.observe_published_message_duration(
+ duration=duration,
+ broker=broker,
+ destination=queue,
+ )
+
+ metric_values = manager._container.published_messages_duration_seconds.collect()
+
+ assert metric_values == [expected]
+
+ def test_add_published_message_exception(  # counter carries ``destination`` and ``exception_type`` labels
+ self,
+ app_name: str,
+ metrics_prefix: str,
+ queue: str,
+ broker: str,
+ exception_type: str,
+ ) -> None:
+ manager = self.create_metrics_manager(
+ app_name=app_name,
+ metrics_prefix=metrics_prefix,
+ )
+
+ expected = Metric(
+ name=f"{metrics_prefix}_published_messages_exceptions",
+ documentation="Count of published messages exceptions by broker, destination and exception_type",
+ unit="",
+ typ="counter",
+ )
+ expected.samples = [
+ Sample(
+ name=f"{metrics_prefix}_published_messages_exceptions_total",
+ labels={
+ "app_name": app_name,
+ "broker": broker,
+ "destination": queue,
+ "exception_type": exception_type,
+ },
+ value=1.0,
+ timestamp=None,
+ exemplar=None,
+ ),
+ Sample(
+ name=f"{metrics_prefix}_published_messages_exceptions_created",
+ labels={
+ "app_name": app_name,
+ "broker": broker,
+ "destination": queue,
+ "exception_type": exception_type,
+ },
+ value=IsPositiveFloat,  # creation timestamp; exact value is nondeterministic
+ timestamp=None,
+ exemplar=None,
+ ),
+ ]
+
+ manager.add_published_message_exception(
+ exception_type=exception_type,
+ broker=broker,
+ destination=queue,
+ )
+
+ metric_values = manager._container.published_messages_exceptions_total.collect()
+
+ assert metric_values == [expected]