diff --git a/src/v/cluster/partition_probe.cc b/src/v/cluster/partition_probe.cc
index bbb069e07f964..2607ccba06757 100644
--- a/src/v/cluster/partition_probe.cc
+++ b/src/v/cluster/partition_probe.cc
@@ -167,6 +167,35 @@ void replicated_partition_probe::setup_internal_metrics(const model::ntp& ntp) {
       {},
       {sm::shard_label, partition_label});
 
+    if (model::is_user_topic(_partition.ntp())) {
+        _metrics.add_group(
+          cluster_metrics_name,
+          {
+            sm::make_gauge(
+              "iceberg_offsets_pending_translation",
+              [this] {
+                  return _partition.log()->config().iceberg_enabled()
+                           ? _iceberg_translation_offset_lag
+                           : metric_feature_disabled_state;
+              },
+              sm::description("Total number of offsets that are pending "
+                              "translation to iceberg."),
+              labels),
+            sm::make_gauge(
+              "iceberg_offsets_pending_commit",
+              [this] {
+                  return _partition.log()->config().iceberg_enabled()
+                           ? _iceberg_commit_offset_lag
+                           : metric_feature_disabled_state;
+              },
+              sm::description("Total number of offsets that are pending "
+                              "commit to iceberg catalog."),
+              labels),
+          },
+          {},
+          {sm::shard_label, partition_label});
+    }
+
     if (
       config::shard_local_cfg().enable_schema_id_validation()
       != pandaproxy::schema_registry::schema_id_validation_mode::none) {
diff --git a/src/v/cluster/partition_probe.h b/src/v/cluster/partition_probe.h
index 93d83fc4c98e6..7d8c10aaaf6af 100644
--- a/src/v/cluster/partition_probe.h
+++ b/src/v/cluster/partition_probe.h
@@ -31,6 +31,8 @@ class partition_probe {
         virtual void add_bytes_fetched(uint64_t) = 0;
         virtual void add_bytes_fetched_from_follower(uint64_t) = 0;
         virtual void add_schema_id_validation_failed() = 0;
+        virtual void update_iceberg_translation_offset_lag(int64_t) = 0;
+        virtual void update_iceberg_commit_offset_lag(int64_t) = 0;
         virtual void setup_metrics(const model::ntp&) = 0;
         virtual void clear_metrics() = 0;
         virtual ~impl() noexcept = default;
@@ -66,6 +68,14 @@ class partition_probe {
         _impl->add_schema_id_validation_failed();
     }
 
+    void update_iceberg_translation_offset_lag(int64_t new_lag) {
+        _impl->update_iceberg_translation_offset_lag(new_lag);
+    }
+
+    void update_iceberg_commit_offset_lag(int64_t new_lag) {
+        _impl->update_iceberg_commit_offset_lag(new_lag);
+    }
+
     void clear_metrics() { _impl->clear_metrics(); }
 
 private:
@@ -88,6 +98,14 @@ class replicated_partition_probe : public partition_probe::impl {
         ++_schema_id_validation_records_failed;
     };
 
+    void update_iceberg_translation_offset_lag(int64_t new_lag) final {
+        _iceberg_translation_offset_lag = new_lag;
+    }
+
+    void update_iceberg_commit_offset_lag(int64_t new_lag) final {
+        _iceberg_commit_offset_lag = new_lag;
+    }
+
     void clear_metrics() final;
 
 private:
@@ -98,6 +116,8 @@ class replicated_partition_probe : public partition_probe::impl {
     void setup_public_scrubber_metric(const model::ntp&);
 
 private:
+    static constexpr int64_t metric_default_initialized_state{-2};
+    static constexpr int64_t metric_feature_disabled_state{-1};
     const partition& _partition;
     uint64_t _records_produced{0};
     uint64_t _records_fetched{0};
@@ -105,6 +125,8 @@ class replicated_partition_probe : public partition_probe::impl {
     uint64_t _bytes_fetched{0};
     uint64_t _bytes_fetched_from_follower{0};
     uint64_t _schema_id_validation_records_failed{0};
+    int64_t _iceberg_translation_offset_lag{metric_default_initialized_state};
+    int64_t _iceberg_commit_offset_lag{metric_default_initialized_state};
     metrics::internal_metric_groups _metrics;
     metrics::public_metric_groups _public_metrics;
 };
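Note on the gauge wiring above: the probe distinguishes three states via sentinel values, reporting -2 until the translator first updates the lag, -1 when iceberg is disabled for the partition, and the real lag otherwise. A minimal, self-contained sketch of that contract; the two constants mirror the patch, while `gauge_value` and the `main` driver are illustrative stand-ins for the `sm::make_gauge` lambdas, not Redpanda code:

```cpp
#include <cstdint>
#include <iostream>

// Sentinel values, as introduced in replicated_partition_probe.
constexpr int64_t metric_default_initialized_state = -2;
constexpr int64_t metric_feature_disabled_state = -1;

// Stand-in for the lambda passed to sm::make_gauge: report the stored lag
// only when iceberg is enabled for the partition.
int64_t gauge_value(bool iceberg_enabled, int64_t stored_lag) {
    return iceberg_enabled ? stored_lag : metric_feature_disabled_state;
}

int main() {
    int64_t lag = metric_default_initialized_state; // probe's initial state
    std::cout << gauge_value(false, lag) << '\n';   // -1: feature disabled
    std::cout << gauge_value(true, lag) << '\n';    // -2: enabled, not yet updated
    lag = 42;                                       // update_..._offset_lag(42)
    std::cout << gauge_value(true, lag) << '\n';    // 42: real lag
}
```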
diff --git a/src/v/datalake/coordinator/coordinator.cc b/src/v/datalake/coordinator/coordinator.cc
index 5e96ad7bae099..3612aa268664b 100644
--- a/src/v/datalake/coordinator/coordinator.cc
+++ b/src/v/datalake/coordinator/coordinator.cc
@@ -387,8 +387,8 @@ coordinator::sync_add_files(
     co_return std::nullopt;
 }
 
-ss::future<checked<std::optional<kafka::offset>, coordinator::errc>>
-coordinator::sync_get_last_added_offset(
+ss::future<checked<coordinator::last_offsets, coordinator::errc>>
+coordinator::sync_get_last_added_offsets(
   model::topic_partition tp, model::revision_id requested_topic_rev) {
     auto gate = maybe_gate();
     if (gate.has_error()) {
@@ -400,7 +400,7 @@ coordinator::sync_get_last_added_offset(
     }
     auto topic_it = stm_->state().topic_to_state.find(tp.topic);
     if (topic_it == stm_->state().topic_to_state.end()) {
-        co_return std::nullopt;
+        co_return last_offsets{std::nullopt, std::nullopt};
    }
     const auto& topic = topic_it->second;
     if (requested_topic_rev < topic.revision) {
@@ -415,7 +415,7 @@ coordinator::sync_get_last_added_offset(
     if (topic.lifecycle_state == topic_state::lifecycle_state_t::purged) {
         // Coordinator is ready to accept files for the new topic revision,
         // but there is no stm record yet. Reply with "no offset".
-        co_return std::nullopt;
+        co_return last_offsets{std::nullopt, std::nullopt};
     }
 
     vlog(
@@ -438,13 +438,16 @@ coordinator::sync_get_last_added_offset(
 
     auto partition_it = topic.pid_to_pending_files.find(tp.partition);
     if (partition_it == topic.pid_to_pending_files.end()) {
-        co_return std::nullopt;
+        co_return last_offsets{std::nullopt, std::nullopt};
     }
     const auto& prt_state = partition_it->second;
     if (prt_state.pending_entries.empty()) {
-        co_return prt_state.last_committed;
+        co_return last_offsets{
+          prt_state.last_committed, prt_state.last_committed};
     }
-    co_return prt_state.pending_entries.back().data.last_offset;
+    co_return last_offsets{
+      prt_state.pending_entries.back().data.last_offset,
+      prt_state.last_committed};
 }
 
 void coordinator::notify_leadership(std::optional<model::node_id> leader_id) {
diff --git a/src/v/datalake/coordinator/coordinator.h b/src/v/datalake/coordinator/coordinator.h
index b43b6fd869b30..049c77e60af88 100644
--- a/src/v/datalake/coordinator/coordinator.h
+++ b/src/v/datalake/coordinator/coordinator.h
@@ -65,8 +65,11 @@ class coordinator {
       model::revision_id topic_revision,
       chunked_vector<translated_offset_range>);
 
-    ss::future<checked<std::optional<kafka::offset>, errc>>
-    sync_get_last_added_offset(
+    struct last_offsets {
+        std::optional<kafka::offset> last_added_offset;
+        std::optional<kafka::offset> last_committed_offset;
+    };
+    ss::future<checked<last_offsets, errc>> sync_get_last_added_offsets(
       model::topic_partition tp, model::revision_id topic_rev);
 
     void notify_leadership(std::optional<model::node_id>);
diff --git a/src/v/datalake/coordinator/frontend.cc b/src/v/datalake/coordinator/frontend.cc
index 45fe4241f336f..1b8fcb235363f 100644
--- a/src/v/datalake/coordinator/frontend.cc
+++ b/src/v/datalake/coordinator/frontend.cc
@@ -80,12 +80,14 @@ ss::future<fetch_latest_translated_offset_reply> fetch_latest_offset(
     if (!crd) {
         co_return fetch_latest_translated_offset_reply{errc::not_leader};
     }
-    auto ret = co_await crd->sync_get_last_added_offset(
+    auto ret = co_await crd->sync_get_last_added_offsets(
      req.tp, req.topic_revision);
     if (ret.has_error()) {
         co_return to_rpc_errc(ret.error());
     }
-    co_return fetch_latest_translated_offset_reply{ret.value()};
+    auto& val = ret.value();
+    co_return fetch_latest_translated_offset_reply{
+      val.last_added_offset, val.last_committed_offset};
 }
 } // namespace
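For context on the new reply shape: `last_added_offset` is the highest offset the coordinator has accepted a data file for (pending or committed), while `last_committed_offset` only advances once files are committed to the Iceberg catalog. A hedged sketch of the derivation using simplified stand-in types; `offset_t` and `partition_state` are illustrative, not the real Redpanda types:

```cpp
#include <cstdint>
#include <deque>
#include <optional>

using offset_t = int64_t; // stand-in for kafka::offset

struct partition_state {
    // data.last_offset of each entry still awaiting a catalog commit.
    std::deque<offset_t> pending_last_offsets;
    // Last offset already committed to the Iceberg catalog.
    std::optional<offset_t> last_committed;
};

struct last_offsets {
    std::optional<offset_t> last_added_offset;
    std::optional<offset_t> last_committed_offset;
};

last_offsets get_last_offsets(const partition_state& prt) {
    if (prt.pending_last_offsets.empty()) {
        // Nothing pending: everything added has also been committed.
        return {prt.last_committed, prt.last_committed};
    }
    // Pending files exist: "added" runs ahead of "committed".
    return {prt.pending_last_offsets.back(), prt.last_committed};
}
```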
diff --git a/src/v/datalake/coordinator/tests/coordinator_test.cc b/src/v/datalake/coordinator/tests/coordinator_test.cc
index 06c0905bd065d..2c31e8af7ffc5 100644
--- a/src/v/datalake/coordinator/tests/coordinator_test.cc
+++ b/src/v/datalake/coordinator/tests/coordinator_test.cc
@@ -116,7 +116,7 @@ ss::future<> file_adder_loop(
     while (!done) {
         co_await random_sleep_ms(30);
         vlog(datalake::datalake_log.debug, "[{}] getting last added", id);
-        auto last_res = co_await n.crd.sync_get_last_added_offset(
+        auto last_res = co_await n.crd.sync_get_last_added_offsets(
           tp, topic_rev);
         if (last_res.has_error()) {
             continue;
@@ -126,7 +126,7 @@ ss::future<> file_adder_loop(
         if (ensure_res.has_error()) {
             continue;
         }
-        auto cur_last_opt = last_res.value();
+        auto cur_last_opt = last_res.value().last_added_offset;
         while (true) {
             co_await random_sleep_ms(30);
             if (cur_last_opt && cur_last_opt.value()() == last_offset) {
@@ -402,14 +402,14 @@ TEST_F(CoordinatorTest, TestLastAddedHappyPath) {
         ASSERT_FALSE(add_res.has_error()) << add_res.error();
     }
 
-    auto last_res = leader.crd.sync_get_last_added_offset(tp00, rev).get();
+    auto last_res = leader.crd.sync_get_last_added_offsets(tp00, rev).get();
     ASSERT_FALSE(last_res.has_error()) << last_res.error();
-    ASSERT_TRUE(last_res.value().has_value());
-    ASSERT_EQ(400, last_res.value().value()());
+    ASSERT_TRUE(last_res.value().last_added_offset.has_value());
+    ASSERT_EQ(400, last_res.value().last_added_offset.value()());
 
-    last_res = leader.crd.sync_get_last_added_offset(tp01, rev).get();
+    last_res = leader.crd.sync_get_last_added_offsets(tp01, rev).get();
     ASSERT_FALSE(last_res.has_error()) << last_res.error();
-    ASSERT_FALSE(last_res.value().has_value());
+    ASSERT_FALSE(last_res.value().last_added_offset.has_value());
 }
 
 TEST_F(CoordinatorTest, TestNotLeader) {
@@ -435,7 +435,7 @@ TEST_F(CoordinatorTest, TestNotLeader) {
     ASSERT_TRUE(add_res.has_error());
     EXPECT_EQ(coordinator::errc::not_leader, add_res.error());
 
-    auto last_res = non_leader.crd.sync_get_last_added_offset(tp00, rev).get();
+    auto last_res = non_leader.crd.sync_get_last_added_offsets(tp00, rev).get();
     ASSERT_TRUE(last_res.has_error()) << last_res.error();
     EXPECT_EQ(coordinator::errc::not_leader, last_res.error());
 }
diff --git a/src/v/datalake/coordinator/tests/state_machine_test.cc b/src/v/datalake/coordinator/tests/state_machine_test.cc
index 0abe1b477b2db..78985f83e12a1 100644
--- a/src/v/datalake/coordinator/tests/state_machine_test.cc
+++ b/src/v/datalake/coordinator/tests/state_machine_test.cc
@@ -154,13 +154,14 @@ TEST_F_CORO(coordinator_stm_fixture, test_snapshots) {
     auto add_files_result = co_await retry_with_leader_coordinator(
       [&, this](coordinator& coordinator) mutable {
          auto tp = random_tp();
-          return coordinator->sync_get_last_added_offset(tp, rev).then(
+          return coordinator->sync_get_last_added_offsets(tp, rev).then(
            [&, tp](auto result) {
                if (!result) {
                    return ss::make_ready_future<bool>(false);
                }
                auto last_committed_offset = kafka::offset_cast(
-                  result.value().value_or(kafka::offset{-1}));
+                  result.value().last_added_offset.value_or(
+                    kafka::offset{-1}));
                std::vector<std::pair<kafka::offset, kafka::offset>> offset_pairs;
                offset_pairs.reserve(5);
                auto next_offset = last_committed_offset() + 1;
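The snapshot test above derives its next batch start from the reply: a missing `last_added_offset` is treated as -1 so the first batch starts at offset 0. A small sketch of that resume-point arithmetic; `offset_t` and `next_offset_pairs` are illustrative stand-ins, not the test's code:

```cpp
#include <cstdint>
#include <optional>
#include <utility>
#include <vector>

using offset_t = int64_t; // stand-in for kafka::offset

// Build `count` consecutive offset ranges starting after the last added
// offset; value_or(-1) mirrors value_or(kafka::offset{-1}) in the test.
std::vector<std::pair<offset_t, offset_t>>
next_offset_pairs(std::optional<offset_t> last_added, int count) {
    std::vector<std::pair<offset_t, offset_t>> pairs;
    pairs.reserve(count);
    offset_t next = last_added.value_or(-1) + 1;
    for (int i = 0; i < count; ++i) {
        pairs.emplace_back(next, next); // single-offset ranges, illustrative only
        ++next;
    }
    return pairs;
}
```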
diff --git a/src/v/datalake/coordinator/types.h b/src/v/datalake/coordinator/types.h
index b802ac9a49aa4..ba37346bfa4d3 100644
--- a/src/v/datalake/coordinator/types.h
+++ b/src/v/datalake/coordinator/types.h
@@ -169,13 +169,17 @@ struct fetch_latest_translated_offset_reply
     explicit fetch_latest_translated_offset_reply(errc err)
       : errc(err) {}
 
     explicit fetch_latest_translated_offset_reply(
-      std::optional<kafka::offset> o)
-      : last_added_offset(o)
+      std::optional<kafka::offset> last_added,
+      std::optional<kafka::offset> last_committed)
+      : last_added_offset(last_added)
+      , last_iceberg_committed_offset(last_committed)
       , errc(errc::ok) {}
 
     // The offset of the latest data file added to the coordinator.
     std::optional<kafka::offset> last_added_offset;
 
+    std::optional<kafka::offset> last_iceberg_committed_offset;
+
     // If not ok, the request processing has a problem.
     errc errc;
diff --git a/src/v/datalake/translation/partition_translator.cc b/src/v/datalake/translation/partition_translator.cc
index ab86c580c7545..036289a04748f 100644
--- a/src/v/datalake/translation/partition_translator.cc
+++ b/src/v/datalake/translation/partition_translator.cc
@@ -300,6 +300,7 @@ partition_translator::do_translate_once(retry_chain_node& parent_rcn) {
           read_begin_offset,
           read_end_offset,
           _partition->last_stable_offset());
+        _partition->probe().update_iceberg_translation_offset_lag(0);
         co_return translation_success::yes;
     }
     // We have some data to translate, make a reader
@@ -338,14 +339,46 @@ partition_translator::do_translate_once(retry_chain_node& parent_rcn) {
       parent_rcn, std::move(kafka_reader), read_begin_offset);
     units.return_all();
     vlog(_logger.debug, "translation result: {}", translation_result);
-    units.return_all();
+    auto result = translation_success::no;
+    auto max_translated_offset = kafka::prev_offset(read_begin_offset);
+    if (translation_result) {
+        auto last_translated_offset = translation_result->last_offset;
+        if (co_await checkpoint_translated_data(
+              parent_rcn,
+              read_begin_offset,
+              std::move(translation_result.value()))) {
+            max_translated_offset = last_translated_offset;
+            result = translation_success::yes;
+        }
+    }
+    update_translation_lag(max_translated_offset);
+    co_return result;
+}
+
+void partition_translator::update_translation_lag(
+  kafka::offset max_translated_offset) const {
+    auto max_translatable_offset = max_offset_for_translation();
     if (
-      translation_result
-      && co_await checkpoint_translated_data(
-        parent_rcn, read_begin_offset, std::move(translation_result.value()))) {
-        co_return translation_success::yes;
+      !max_translatable_offset
+      || max_translatable_offset.value() < kafka::offset{0}) {
+        return;
+    }
+    auto offset_lag = max_translatable_offset.value() - std::max(max_translated_offset, kafka::offset{-1});
+    _partition->probe().update_iceberg_translation_offset_lag(offset_lag);
+}
+
+void partition_translator::update_commit_lag(
+  std::optional<kafka::offset> max_committed_offset) const {
+    auto max_translatable_offset = max_offset_for_translation();
+    if (
+      !max_translatable_offset
+      || max_translatable_offset.value() < kafka::offset{0}) {
+        return;
     }
-    co_return translation_success::no;
+    auto offset_lag = max_translatable_offset.value() - max_committed_offset.value_or(kafka::offset{-1});
+    _partition->probe().update_iceberg_commit_offset_lag(offset_lag);
 }
 
 ss::future
@@ -409,6 +442,7 @@ partition_translator::reconcile_with_coordinator() {
         vlog(_logger.warn, "reconciliation failed, response: {}", resp);
         co_return std::nullopt;
     }
+    update_commit_lag(resp.last_iceberg_committed_offset);
     // No file entry signifies the translation was just enabled on the
     // topic. In such a case we start translation from the local start
     // of the log. The underlying assumption is that there is a reasonable
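Both lag gauges are computed against the same upper bound: translation lag counts offsets not yet translated, commit lag counts offsets not yet committed to the catalog, and both use -1 as the "nothing yet" floor. A self-contained sketch of the two formulas; `offset_t` and these free functions are simplified stand-ins for `kafka::offset` and the member functions in the patch:

```cpp
#include <algorithm>
#include <cstdint>
#include <optional>

using offset_t = int64_t; // stand-in for kafka::offset

// Offsets that exist in the log but have not been translated yet.
std::optional<offset_t> translation_lag(
  std::optional<offset_t> max_translatable, offset_t max_translated) {
    if (!max_translatable || *max_translatable < 0) {
        return std::nullopt; // nothing translatable yet: leave the gauge alone
    }
    return *max_translatable - std::max(max_translated, offset_t{-1});
}

// Offsets not yet committed to the Iceberg catalog.
std::optional<offset_t> commit_lag(
  std::optional<offset_t> max_translatable,
  std::optional<offset_t> max_committed) {
    if (!max_translatable || *max_translatable < 0) {
        return std::nullopt;
    }
    return *max_translatable - max_committed.value_or(offset_t{-1});
}

// Example: 100 offsets [0, 99] produced, nothing translated or committed:
// both lags are 99 - (-1) = 100, i.e. exactly the produced message count,
// which is what the ducktape test below waits for.
```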
diff --git a/src/v/datalake/translation/partition_translator.h b/src/v/datalake/translation/partition_translator.h
index 57aa2a595d9a7..e8902b6a914e4 100644
--- a/src/v/datalake/translation/partition_translator.h
+++ b/src/v/datalake/translation/partition_translator.h
@@ -94,6 +94,11 @@ class partition_translator {
 
     ss::future<> do_translate();
 
+    void
+    update_translation_lag(kafka::offset max_translated_kafka_offset) const;
+    void update_commit_lag(
+      std::optional<kafka::offset> max_committed_kafka_offset) const;
+
     using translation_success = ss::bool_class<struct translation_success_tag>;
     ss::future<translation_success> do_translate_once(retry_chain_node& parent);
     ss::future make_reader();
diff --git a/tests/rptest/tests/datalake/datalake_e2e_test.py b/tests/rptest/tests/datalake/datalake_e2e_test.py
index 860a31939aded..fc6c73386e70e 100644
--- a/tests/rptest/tests/datalake/datalake_e2e_test.py
+++ b/tests/rptest/tests/datalake/datalake_e2e_test.py
@@ -11,6 +11,7 @@
 from rptest.clients.types import TopicSpec
 from rptest.clients.rpk import RpkTool
 from rptest.services.cluster import cluster
+from random import randint
 from rptest.services.redpanda import PandaproxyConfig, SchemaRegistryConfig, SISettings
 from rptest.services.serde_client import SerdeClient
@@ -21,6 +22,12 @@
 from rptest.tests.datalake.utils import supported_storage_types
 from ducktape.mark import matrix
 from ducktape.utils.util import wait_until
+from rptest.services.metrics_check import MetricCheck
+
+NO_SCHEMA_ERRORS = [
+    r'Must have parsed schema when using structured data mode',
+    r'Error translating data to binary record'
+]
 
 
 class DatalakeE2ETests(RedpandaTest):
@@ -184,3 +191,46 @@ def table_deleted():
         dl.create_iceberg_enabled_topic(self.topic_name, partitions=5)
         dl.produce_to_topic(self.topic_name, 1024, count)
         dl.wait_for_translation(self.topic_name, msg_count=count)
+
+    @cluster(num_nodes=3, log_allow_list=NO_SCHEMA_ERRORS)
+    @matrix(cloud_storage_type=supported_storage_types())
+    def test_metrics(self, cloud_storage_type):
+
+        commit_lag = 'vectorized_cluster_partition_iceberg_offsets_pending_commit'
+        translation_lag = 'vectorized_cluster_partition_iceberg_offsets_pending_translation'
+
+        with DatalakeServices(self.test_ctx,
+                              redpanda=self.redpanda,
+                              filesystem_catalog_mode=False,
+                              include_query_engines=[]) as dl:
+
+            dl.create_iceberg_enabled_topic(
+                self.topic_name,
+                partitions=1,
+                replicas=1,
+                iceberg_mode="value_schema_id_prefix")
+            count = randint(12, 21)
+            # Produce schemaless messages to a schema-ed topic; this should
+            # hold up translation and commits
+            dl.produce_to_topic(self.topic_name, 1024, msg_count=count)
+
+            m = MetricCheck(self.redpanda.logger,
+                            self.redpanda,
+                            self.redpanda.nodes[0],
+                            [commit_lag, translation_lag],
+                            labels={
+                                'namespace': 'kafka',
+                                'topic': self.topic_name,
+                                'partition': '0'
+                            },
+                            reduce=sum)
+            expectations = []
+            for metric in [commit_lag, translation_lag]:
+                expectations.append([metric, lambda _, val: val == count])
+
+            # Ensure lag metric builds up as expected.
+            wait_until(
+                lambda: m.evaluate(expectations),
+                timeout_sec=30,
+                backoff_sec=5,
+                err_msg=f"Timed out waiting for metrics to reach: {count}")
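The test leans on MetricCheck to scrape and aggregate the two gauges across labels. A hedged Python sketch of the expectation evaluation it relies on; `evaluate_samples` and the hard-coded sample dict are illustrative stand-ins, not the real MetricCheck API:

```python
# Each expectation pairs a metric name with a predicate over
# (old_value, new_value); MetricCheck.evaluate feeds it scraped samples.
def evaluate_samples(samples: dict, expectations) -> bool:
    # The real MetricCheck compares an initial capture against fresh
    # samples; this stand-in only checks the latest value.
    return all(pred(None, samples[name]) for name, pred in expectations)

count = 17  # stands in for randint(12, 21)
expectations = [
    ('vectorized_cluster_partition_iceberg_offsets_pending_commit',
     lambda _, val: val == count),
    ('vectorized_cluster_partition_iceberg_offsets_pending_translation',
     lambda _, val: val == count),
]

# With translation blocked by schemaless records, both gauges should
# climb to the produced message count:
assert evaluate_samples(
    {
        'vectorized_cluster_partition_iceberg_offsets_pending_commit': 17,
        'vectorized_cluster_partition_iceberg_offsets_pending_translation': 17,
    },
    expectations)
```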