diff --git a/src/v/storage/compaction_reducers.h b/src/v/storage/compaction_reducers.h index 8cfe224abcba8..fbb72f5702fad 100644 --- a/src/v/storage/compaction_reducers.h +++ b/src/v/storage/compaction_reducers.h @@ -133,6 +133,11 @@ class copy_data_segment_reducer : public compaction_reducer { // of a compactible type. size_t non_compactible_batches{0}; + // Returns whether any data was removed by this reducer. + bool has_removed_data() const { + return batches_discarded > 0 || records_discarded > 0; + } + friend std::ostream& operator<<(std::ostream& os, const stats& s) { fmt::print( os, @@ -145,6 +150,10 @@ class copy_data_segment_reducer : public compaction_reducer { return os; } }; + struct idx_and_stats { + index_state new_idx; + stats reducer_stats; + }; copy_data_segment_reducer( filter_t f, @@ -165,7 +174,7 @@ class copy_data_segment_reducer : public compaction_reducer { , _as(as) {} ss::future operator()(model::record_batch); - storage::index_state end_of_stream() { return std::move(_idx); } + idx_and_stats end_of_stream() { return {std::move(_idx), _stats}; } private: ss::future diff --git a/src/v/storage/segment_deduplication_utils.cc b/src/v/storage/segment_deduplication_utils.cc index fd4d058e77adb..501347f688b9a 100644 --- a/src/v/storage/segment_deduplication_utils.cc +++ b/src/v/storage/segment_deduplication_utils.cc @@ -235,17 +235,32 @@ ss::future deduplicate_segment( inject_reader_failure, cfg.asrc); - auto new_idx = co_await std::move(rdr).consume( + auto res = co_await std::move(rdr).consume( std::move(copy_reducer), model::no_timeout); + const auto& stats = res.reducer_stats; + if (stats.has_removed_data()) { + vlog( + gclog.info, + "Windowed compaction filtering removing data from {}: {}", + seg->filename(), + stats); + } else { + vlog( + gclog.debug, + "Windowed compaction filtering not removing any records from {}: {}", + seg->filename(), + stats); + } // restore broker timestamp and clean compact timestamp + auto& new_idx = res.new_idx; new_idx.broker_timestamp = seg->index().broker_timestamp(); new_idx.clean_compact_timestamp = seg->index().clean_compact_timestamp(); // Set may_have_tombstone_records new_idx.may_have_tombstone_records = may_have_tombstone_records; - co_return new_idx; + co_return std::move(new_idx); } } // namespace storage diff --git a/src/v/storage/segment_utils.cc b/src/v/storage/segment_utils.cc index 3d130303518a2..20c424f5b0568 100644 --- a/src/v/storage/segment_utils.cc +++ b/src/v/storage/segment_utils.cc @@ -440,18 +440,33 @@ ss::future do_copy_segment_data( cfg.asrc); // create the segment, get the in-memory index for the new segment - auto new_index = co_await create_segment_full_reader( - seg, cfg, pb, std::move(rw_lock_holder)) - .consume(std::move(copy_reducer), model::no_timeout) - .finally([&] { - return appender->close().handle_exception( - [](std::exception_ptr e) { - vlog( - gclog.error, - "Error copying index to new segment:{}", - e); - }); + auto res = co_await create_segment_full_reader( + seg, cfg, pb, std::move(rw_lock_holder)) + .consume(std::move(copy_reducer), model::no_timeout) + .finally([&] { + return appender->close().handle_exception( + [](std::exception_ptr e) { + vlog( + gclog.error, + "Error copying index to new segment:{}", + e); }); + }); + const auto& stats = res.reducer_stats; + if (stats.has_removed_data()) { + vlog( + gclog.info, + "Self compaction filtering removing data from {}: {}", + seg->filename(), + stats); + } else { + vlog( + gclog.debug, + "Self compaction filtering not removing any records from {}: {}", + seg->filename(), + stats); + } + auto& new_index = res.new_idx; // restore broker timestamp and clean compact timestamp new_index.broker_timestamp = old_broker_timestamp; @@ -460,7 +475,7 @@ ss::future do_copy_segment_data( // Set may_have_tombstone_records new_index.may_have_tombstone_records = may_have_tombstone_records; - co_return new_index; + co_return std::move(new_index); } model::record_batch_reader create_segment_full_reader(