Skip to content

Commit

Permalink
gc/retention: test for consumer offsets retention.
Browse files Browse the repository at this point in the history
(cherry picked from commit 0e70805)
  • Loading branch information
bharathv committed May 1, 2024
1 parent 8d57c12 commit 6320ab5
Showing 1 changed file with 71 additions and 0 deletions.
71 changes: 71 additions & 0 deletions src/v/kafka/server/tests/consumer_groups_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,16 @@
#include "kafka/protocol/errors.h"
#include "kafka/protocol/find_coordinator.h"
#include "kafka/protocol/join_group.h"
#include "kafka/protocol/offset_commit.h"
#include "kafka/protocol/schemata/join_group_request.h"
#include "kafka/types.h"
#include "model/fundamental.h"
#include "model/namespace.h"
#include "model/timeout_clock.h"
#include "redpanda/tests/fixture.h"
#include "test_utils/async.h"
#include "test_utils/scoped_config.h"
#include "utils/base64.h"

#include <seastar/core/smp.hh>
#include <seastar/core/sstring.hh>
Expand Down Expand Up @@ -100,6 +103,74 @@ FIXTURE_TEST(join_empty_group_static_member, consumer_offsets_fixture) {
}).get();
}

FIXTURE_TEST(conditional_retention_test, consumer_offsets_fixture) {
scoped_config cfg;
cfg.get("group_topic_partitions").set_value(1);
// setting to true to begin with, so log_eviction_stm is attached to
// the partition.
cfg.get("unsafe_enable_consumer_offsets_delete_retention").set_value(true);
add_topic(
model::topic_namespace_view{model::kafka_namespace, model::topic{"foo"}})
.get();
kafka::group_instance_id gr("instance-1");
wait_for_consumer_offsets_topic(gr);
// load some data into the topic via offset_commit requests.
auto client = make_kafka_client().get0();
auto deferred = ss::defer([&client] {
client.stop().then([&client] { client.shutdown(); }).get();
});
client.connect().get();
auto offset = 0;
auto rand_offset_commit = [&] {
auto req_part = offset_commit_request_partition{
.partition_index = model::partition_id{0},
.committed_offset = model::offset{offset++}};
auto topic = offset_commit_request_topic{
.name = model::topic{"foo"}, .partitions = {std::move(req_part)}};

return offset_commit_request{.data{
.group_id = kafka::group_id{fmt::format("foo-{}", offset)},
.topics = {std::move(topic)}}};
};
for (int i = 0; i < 10; i++) {
auto req = rand_offset_commit();
req.data.group_instance_id = gr;
auto resp = client.dispatch(std::move(req)).get();
BOOST_REQUIRE(!resp.data.errored());
}
auto part = app.partition_manager.local().get(model::ntp{
model::kafka_namespace,
model::kafka_consumer_offsets_topic,
model::partition_id{0}});
BOOST_REQUIRE(part);
auto log = part->log();
storage::ntp_config::default_overrides ov;
ov.cleanup_policy_bitflags = model::cleanup_policy_bitflags::deletion
| model::cleanup_policy_bitflags::compaction;
log->update_configuration(ov).get();
log->flush().get();
log->force_roll(ss::default_priority_class()).get();
for (auto retention_enabled : {false, true}) {
// number of partitions of CO topic.
cfg.get("unsafe_enable_consumer_offsets_delete_retention")
.set_value(retention_enabled);
// attempt a GC on the partition log.
// evict the first segment.
storage::gc_config gc_cfg{model::timestamp::max(), 1};
log->gc(gc_cfg).get();
// Check if retention works
try {
tests::cooperative_spin_wait_with_timeout(5s, [&] {
return log.get()->offsets().start_offset > model::offset{0};
}).get();
} catch (const ss::timed_out_error& e) {
if (retention_enabled) {
std::rethrow_exception(std::make_exception_ptr(e));
}
}
}
}

SEASTAR_THREAD_TEST_CASE(consumer_group_decode) {
{
// snatched from a log message after a franz-go client joined
Expand Down

0 comments on commit 6320ab5

Please sign in to comment.