Skip to content

Commit

Permalink
Merge pull request #24736 from redpanda-data/manual-backport-24659-v2…
Browse files Browse the repository at this point in the history
…4.3.x-337

[v24.3.x] storage: add log lines about removed data
  • Loading branch information
andrwng authored Jan 9, 2025
2 parents e17aa40 + f125faf commit b731170
Show file tree
Hide file tree
Showing 4 changed files with 91 additions and 15 deletions.
10 changes: 10 additions & 0 deletions src/v/storage/compaction_reducers.cc
Original file line number Diff line number Diff line change
Expand Up @@ -276,12 +276,22 @@ copy_data_segment_reducer::filter(model::record_batch batch) {

ss::future<ss::stop_iteration> copy_data_segment_reducer::filter_and_append(
model::compression original, model::record_batch b) {
++_stats.batches_processed;
using stop_t = ss::stop_iteration;
const auto record_count_before = b.record_count();
auto to_copy = co_await filter(std::move(b));
if (to_copy == std::nullopt) {
++_stats.batches_discarded;
_stats.records_discarded += record_count_before;
co_return stop_t::no;
}
const auto records_to_remove = record_count_before
- to_copy->record_count();
_stats.records_discarded += records_to_remove;
bool compactible_batch = is_compactible(to_copy.value());
if (!compactible_batch) {
++_stats.non_compactible_batches;
}
if (_compacted_idx && compactible_batch) {
co_await model::for_each_record(
to_copy.value(),
Expand Down
38 changes: 37 additions & 1 deletion src/v/storage/compaction_reducers.h
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,40 @@ class copy_data_segment_reducer : public compaction_reducer {
public:
using filter_t = ss::noncopyable_function<ss::future<bool>(
const model::record_batch&, const model::record&, bool)>;
struct stats {
// Total number of batches passed to this reducer.
size_t batches_processed{0};
// Number of batches that were completely removed.
size_t batches_discarded{0};
// Number of records removed by this reducer, including batches that
// were entirely removed.
size_t records_discarded{0};
// Number of batches that were ignored because they are not
// of a compactible type.
size_t non_compactible_batches{0};

// Returns whether any data was removed by this reducer.
bool has_removed_data() const {
return batches_discarded > 0 || records_discarded > 0;
}

friend std::ostream& operator<<(std::ostream& os, const stats& s) {
fmt::print(
os,
"{{ batches_processed: {}, batches_discarded: {}, "
"records_discarded: {}, non_compactible_batches: {} }}",
s.batches_processed,
s.batches_discarded,
s.records_discarded,
s.non_compactible_batches);
return os;
}
};
struct idx_and_stats {
index_state new_idx;
stats reducer_stats;
};

copy_data_segment_reducer(
filter_t f,
segment_appender* a,
Expand All @@ -140,7 +174,7 @@ class copy_data_segment_reducer : public compaction_reducer {
, _as(as) {}

ss::future<ss::stop_iteration> operator()(model::record_batch);
storage::index_state end_of_stream() { return std::move(_idx); }
idx_and_stats end_of_stream() { return {std::move(_idx), _stats}; }

private:
ss::future<ss::stop_iteration>
Expand Down Expand Up @@ -180,6 +214,8 @@ class copy_data_segment_reducer : public compaction_reducer {
/// Allows the reducer to stop early, e.g. in case the partition is being
/// shut down.
ss::abort_source* _as;

stats _stats;
};

class index_rebuilder_reducer : public compaction_reducer {
Expand Down
19 changes: 17 additions & 2 deletions src/v/storage/segment_deduplication_utils.cc
Original file line number Diff line number Diff line change
Expand Up @@ -235,17 +235,32 @@ ss::future<index_state> deduplicate_segment(
inject_reader_failure,
cfg.asrc);

auto new_idx = co_await std::move(rdr).consume(
auto res = co_await std::move(rdr).consume(
std::move(copy_reducer), model::no_timeout);
const auto& stats = res.reducer_stats;
if (stats.has_removed_data()) {
vlog(
gclog.info,
"Windowed compaction filtering removing data from {}: {}",
seg->filename(),
stats);
} else {
vlog(
gclog.debug,
"Windowed compaction filtering not removing any records from {}: {}",
seg->filename(),
stats);
}

// restore broker timestamp and clean compact timestamp
auto& new_idx = res.new_idx;
new_idx.broker_timestamp = seg->index().broker_timestamp();
new_idx.clean_compact_timestamp = seg->index().clean_compact_timestamp();

// Set may_have_tombstone_records
new_idx.may_have_tombstone_records = may_have_tombstone_records;

co_return new_idx;
co_return std::move(new_idx);
}

} // namespace storage
39 changes: 27 additions & 12 deletions src/v/storage/segment_utils.cc
Original file line number Diff line number Diff line change
Expand Up @@ -440,18 +440,33 @@ ss::future<storage::index_state> do_copy_segment_data(
cfg.asrc);

// create the segment, get the in-memory index for the new segment
auto new_index = co_await create_segment_full_reader(
seg, cfg, pb, std::move(rw_lock_holder))
.consume(std::move(copy_reducer), model::no_timeout)
.finally([&] {
return appender->close().handle_exception(
[](std::exception_ptr e) {
vlog(
gclog.error,
"Error copying index to new segment:{}",
e);
});
auto res = co_await create_segment_full_reader(
seg, cfg, pb, std::move(rw_lock_holder))
.consume(std::move(copy_reducer), model::no_timeout)
.finally([&] {
return appender->close().handle_exception(
[](std::exception_ptr e) {
vlog(
gclog.error,
"Error copying index to new segment:{}",
e);
});
});
const auto& stats = res.reducer_stats;
if (stats.has_removed_data()) {
vlog(
gclog.info,
"Self compaction filtering removing data from {}: {}",
seg->filename(),
stats);
} else {
vlog(
gclog.debug,
"Self compaction filtering not removing any records from {}: {}",
seg->filename(),
stats);
}
auto& new_index = res.new_idx;

// restore broker timestamp and clean compact timestamp
new_index.broker_timestamp = old_broker_timestamp;
Expand All @@ -460,7 +475,7 @@ ss::future<storage::index_state> do_copy_segment_data(
// Set may_have_tombstone_records
new_index.may_have_tombstone_records = may_have_tombstone_records;

co_return new_index;
co_return std::move(new_index);
}

model::record_batch_reader create_segment_full_reader(
Expand Down

0 comments on commit b731170

Please sign in to comment.