diff --git a/src/v/storage/compaction_reducers.cc b/src/v/storage/compaction_reducers.cc
index 9422b5d0051ce..6e6b1002b8627 100644
--- a/src/v/storage/compaction_reducers.cc
+++ b/src/v/storage/compaction_reducers.cc
@@ -276,12 +276,22 @@ copy_data_segment_reducer::filter(model::record_batch batch) {
 ss::future<ss::stop_iteration>
 copy_data_segment_reducer::filter_and_append(
   model::compression original, model::record_batch b) {
+    ++_stats.batches_processed;
     using stop_t = ss::stop_iteration;
+    const auto record_count_before = b.record_count();
     auto to_copy = co_await filter(std::move(b));
     if (to_copy == std::nullopt) {
+        ++_stats.batches_discarded;
+        _stats.records_discarded += record_count_before;
         co_return stop_t::no;
     }
+    const auto records_to_remove = record_count_before
+                                   - to_copy->record_count();
+    _stats.records_discarded += records_to_remove;
     bool compactible_batch = is_compactible(to_copy.value());
+    if (!compactible_batch) {
+        ++_stats.non_compactible_batches;
+    }
     if (_compacted_idx && compactible_batch) {
         co_await model::for_each_record(
           to_copy.value(),
diff --git a/src/v/storage/compaction_reducers.h b/src/v/storage/compaction_reducers.h
index a4a935954dda4..fbb72f5702fad 100644
--- a/src/v/storage/compaction_reducers.h
+++ b/src/v/storage/compaction_reducers.h
@@ -121,6 +121,40 @@ class copy_data_segment_reducer : public compaction_reducer {
 public:
     using filter_t = ss::noncopyable_function<ss::future<bool>(
       const model::record_batch&, const model::record&, bool)>;
+    struct stats {
+        // Total number of batches passed to this reducer.
+        size_t batches_processed{0};
+        // Number of batches that were completely removed.
+        size_t batches_discarded{0};
+        // Number of records removed by this reducer, including batches that
+        // were entirely removed.
+        size_t records_discarded{0};
+        // Number of batches that were ignored because they are not
+        // of a compactible type.
+        size_t non_compactible_batches{0};
+
+        // Returns whether any data was removed by this reducer.
+        bool has_removed_data() const {
+            return batches_discarded > 0 || records_discarded > 0;
+        }
+
+        friend std::ostream& operator<<(std::ostream& os, const stats& s) {
+            fmt::print(
+              os,
+              "{{ batches_processed: {}, batches_discarded: {}, "
+              "records_discarded: {}, non_compactible_batches: {} }}",
+              s.batches_processed,
+              s.batches_discarded,
+              s.records_discarded,
+              s.non_compactible_batches);
+            return os;
+        }
+    };
+    struct idx_and_stats {
+        index_state new_idx;
+        stats reducer_stats;
+    };
+
     copy_data_segment_reducer(
       filter_t f,
       segment_appender* a,
@@ -140,7 +174,7 @@
       , _as(as) {}
 
     ss::future<ss::stop_iteration> operator()(model::record_batch);
-    storage::index_state end_of_stream() { return std::move(_idx); }
+    idx_and_stats end_of_stream() { return {std::move(_idx), _stats}; }
 
 private:
     ss::future<ss::stop_iteration>
@@ -180,6 +214,8 @@
     /// Allows the reducer to stop early, e.g. in case the partition is being
     /// shut down.
     ss::abort_source* _as;
+
+    stats _stats;
 };
 
 class index_rebuilder_reducer : public compaction_reducer {
diff --git a/src/v/storage/segment_deduplication_utils.cc b/src/v/storage/segment_deduplication_utils.cc
index fd4d058e77adb..501347f688b9a 100644
--- a/src/v/storage/segment_deduplication_utils.cc
+++ b/src/v/storage/segment_deduplication_utils.cc
@@ -235,17 +235,32 @@ ss::future<index_state> deduplicate_segment(
       inject_reader_failure,
       cfg.asrc);
 
-    auto new_idx = co_await std::move(rdr).consume(
+    auto res = co_await std::move(rdr).consume(
       std::move(copy_reducer), model::no_timeout);
+    const auto& stats = res.reducer_stats;
+    if (stats.has_removed_data()) {
+        vlog(
+          gclog.info,
+          "Windowed compaction filtering removing data from {}: {}",
+          seg->filename(),
+          stats);
+    } else {
+        vlog(
+          gclog.debug,
+          "Windowed compaction filtering not removing any records from {}: {}",
+          seg->filename(),
+          stats);
+    }
 
     // restore broker timestamp and clean compact timestamp
+    auto& new_idx = res.new_idx;
     new_idx.broker_timestamp = seg->index().broker_timestamp();
     new_idx.clean_compact_timestamp = seg->index().clean_compact_timestamp();
 
     // Set may_have_tombstone_records
     new_idx.may_have_tombstone_records = may_have_tombstone_records;
 
-    co_return new_idx;
+    co_return std::move(new_idx);
 }
 
 } // namespace storage
diff --git a/src/v/storage/segment_utils.cc b/src/v/storage/segment_utils.cc
index 3d130303518a2..20c424f5b0568 100644
--- a/src/v/storage/segment_utils.cc
+++ b/src/v/storage/segment_utils.cc
@@ -440,18 +440,33 @@ ss::future<storage::index_state> do_copy_segment_data(
       cfg.asrc);
 
     // create the segment, get the in-memory index for the new segment
-    auto new_index = co_await create_segment_full_reader(
-                       seg, cfg, pb, std::move(rw_lock_holder))
-                       .consume(std::move(copy_reducer), model::no_timeout)
-                       .finally([&] {
-                           return appender->close().handle_exception(
-                             [](std::exception_ptr e) {
-                                 vlog(
-                                   gclog.error,
-                                   "Error copying index to new segment:{}",
-                                   e);
-                             });
-                       });
+    auto res = co_await create_segment_full_reader(
+                 seg, cfg, pb, std::move(rw_lock_holder))
+                 .consume(std::move(copy_reducer), model::no_timeout)
+                 .finally([&] {
+                     return appender->close().handle_exception(
+                       [](std::exception_ptr e) {
+                           vlog(
+                             gclog.error,
+                             "Error copying index to new segment:{}",
+                             e);
+                       });
+                 });
+    const auto& stats = res.reducer_stats;
+    if (stats.has_removed_data()) {
+        vlog(
+          gclog.info,
+          "Self compaction filtering removing data from {}: {}",
+          seg->filename(),
+          stats);
+    } else {
+        vlog(
+          gclog.debug,
+          "Self compaction filtering not removing any records from {}: {}",
+          seg->filename(),
+          stats);
+    }
+    auto& new_index = res.new_idx;
 
     // restore broker timestamp and clean compact timestamp
     new_index.broker_timestamp = old_broker_timestamp;
@@ -460,7 +475,7 @@
     // Set may_have_tombstone_records
     new_index.may_have_tombstone_records = may_have_tombstone_records;
 
-    co_return new_index;
+    co_return std::move(new_index);
 }
 
 model::record_batch_reader create_segment_full_reader(
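For readers skimming the patch, here is a minimal standalone sketch of the pattern it introduces: the reducer accumulates per-run counters, and callers use `has_removed_data()` to choose between info-level and debug-level logging. The `reducer_stats` struct below is a simplified stand-in for `copy_data_segment_reducer::stats`, not the Redpanda type itself, and plain iostreams replace `vlog`/`fmt` for illustration.

```cpp
// Standalone illustration (not Redpanda source): a simplified stand-in for
// copy_data_segment_reducer::stats demonstrating the "log loudly only when
// compaction actually removed something" pattern added by this patch.
#include <cstddef>
#include <iostream>

struct reducer_stats {
    std::size_t batches_processed{0};
    std::size_t batches_discarded{0};
    std::size_t records_discarded{0};
    std::size_t non_compactible_batches{0};

    // True when the reducer dropped at least one batch or record.
    bool has_removed_data() const {
        return batches_discarded > 0 || records_discarded > 0;
    }

    friend std::ostream& operator<<(std::ostream& os, const reducer_stats& s) {
        return os << "{ batches_processed: " << s.batches_processed
                  << ", batches_discarded: " << s.batches_discarded
                  << ", records_discarded: " << s.records_discarded
                  << ", non_compactible_batches: " << s.non_compactible_batches
                  << " }";
    }
};

int main() {
    reducer_stats s;
    s.batches_processed = 10;
    s.batches_discarded = 2;
    s.records_discarded = 37;
    s.non_compactible_batches = 1;

    // Mirrors the callers in the patch: noisy output only when data was
    // removed, quiet output otherwise, so uneventful compactions stay quiet.
    if (s.has_removed_data()) {
        std::cout << "compaction removed data: " << s << '\n';
    } else {
        std::cout << "compaction removed nothing: " << s << '\n';
    }
    return 0;
}
```

The design choice mirrored here is that the struct itself stays a plain aggregate of counters; the decision about log severity lives with the callers (`deduplicate_segment` and `do_copy_segment_data` in the diff), which already know whether they are doing windowed or self compaction.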