Commit

Merge pull request #18191 from bharathv/co_retention_233x
[backport][23.3.x] configuration to enable delete retention for consumer offsets #18140
piyushredpanda authored May 1, 2024
2 parents e21cc8b + c21c4ea commit a55ff03
Showing 5 changed files with 190 additions and 2 deletions.
10 changes: 9 additions & 1 deletion src/v/config/configuration.cc
@@ -3007,7 +3007,15 @@ configuration::configuration()
"are allowed.",
{.needs_restart = needs_restart::no, .visibility = visibility::user},
{"BASIC"},
validate_http_authn_mechanisms) {}
validate_http_authn_mechanisms)
, unsafe_enable_consumer_offsets_delete_retention(
*this,
"unsafe_enable_consumer_offsets_delete_retention",
"Enables delete retention of consumer offsets topic. This is an "
"internal-only configuration and should be enabled only after consulting "
"with Redpanda Support or engineers.",
{.needs_restart = needs_restart::yes, .visibility = visibility::user},
false) {}

configuration::error_map_t configuration::load(const YAML::Node& root_node) {
if (!root_node["redpanda"]) {
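Reviewer note: the new property follows the usual cluster-config pattern, so callers read it through config::shard_local_cfg(). Below is a minimal sketch of that call site, assuming the real config/configuration.h header from this repository; the co_delete_retention_enabled helper is hypothetical and only illustrates the pattern the disk_log_impl.cc hunk further down uses directly.

#include "config/configuration.h"

// Sketch only: cluster properties are callable objects, so invoking the
// property returns its current value. Because the property is declared with
// needs_restart::yes, a changed value takes effect after a broker restart.
inline bool co_delete_retention_enabled() {
    return config::shard_local_cfg()
      .unsafe_enable_consumer_offsets_delete_retention();
}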
3 changes: 3 additions & 0 deletions src/v/config/configuration.h
@@ -561,6 +561,9 @@ struct configuration final : public config_store {
// HTTP Authentication
property<std::vector<ss::sstring>> http_authentication;

// temporary - to be deprecated
property<bool> unsafe_enable_consumer_offsets_delete_retention;

configuration();

error_map_t load(const YAML::Node& root_node);
71 changes: 71 additions & 0 deletions src/v/kafka/server/tests/consumer_groups_test.cc
@@ -14,13 +14,16 @@
#include "kafka/protocol/errors.h"
#include "kafka/protocol/find_coordinator.h"
#include "kafka/protocol/join_group.h"
#include "kafka/protocol/offset_commit.h"
#include "kafka/protocol/schemata/join_group_request.h"
#include "kafka/types.h"
#include "model/fundamental.h"
#include "model/namespace.h"
#include "model/timeout_clock.h"
#include "redpanda/tests/fixture.h"
#include "test_utils/async.h"
#include "test_utils/scoped_config.h"
#include "utils/base64.h"

#include <seastar/core/smp.hh>
#include <seastar/core/sstring.hh>
@@ -100,6 +103,74 @@ FIXTURE_TEST(join_empty_group_static_member, consumer_offsets_fixture) {
}).get();
}

FIXTURE_TEST(conditional_retention_test, consumer_offsets_fixture) {
scoped_config cfg;
cfg.get("group_topic_partitions").set_value(1);
// Set to true to begin with so that the log_eviction_stm is attached to
// the partition.
cfg.get("unsafe_enable_consumer_offsets_delete_retention").set_value(true);
add_topic(
model::topic_namespace_view{model::kafka_namespace, model::topic{"foo"}})
.get();
kafka::group_instance_id gr("instance-1");
wait_for_consumer_offsets_topic(gr);
// load some data into the topic via offset_commit requests.
auto client = make_kafka_client().get0();
auto deferred = ss::defer([&client] {
client.stop().then([&client] { client.shutdown(); }).get();
});
client.connect().get();
auto offset = 0;
auto rand_offset_commit = [&] {
auto req_part = offset_commit_request_partition{
.partition_index = model::partition_id{0},
.committed_offset = model::offset{offset++}};
auto topic = offset_commit_request_topic{
.name = model::topic{"foo"}, .partitions = {std::move(req_part)}};

return offset_commit_request{.data{
.group_id = kafka::group_id{fmt::format("foo-{}", offset)},
.topics = {std::move(topic)}}};
};
for (int i = 0; i < 10; i++) {
auto req = rand_offset_commit();
req.data.group_instance_id = gr;
auto resp = client.dispatch(std::move(req)).get();
BOOST_REQUIRE(!resp.data.errored());
}
auto part = app.partition_manager.local().get(model::ntp{
model::kafka_namespace,
model::kafka_consumer_offsets_topic,
model::partition_id{0}});
BOOST_REQUIRE(part);
auto log = part->log();
storage::ntp_config::default_overrides ov;
ov.cleanup_policy_bitflags = model::cleanup_policy_bitflags::deletion
| model::cleanup_policy_bitflags::compaction;
log->update_configuration(ov).get();
log->flush().get();
log->force_roll(ss::default_priority_class()).get();
for (auto retention_enabled : {false, true}) {
// toggle delete retention on the consumer offsets topic.
cfg.get("unsafe_enable_consumer_offsets_delete_retention")
.set_value(retention_enabled);
// attempt a GC on the partition log.
// evict the first segment.
storage::gc_config gc_cfg{model::timestamp::max(), 1};
log->gc(gc_cfg).get();
// Check if retention works
try {
tests::cooperative_spin_wait_with_timeout(5s, [&] {
return log.get()->offsets().start_offset > model::offset{0};
}).get();
} catch (const ss::timed_out_error& e) {
if (retention_enabled) {
std::rethrow_exception(std::make_exception_ptr(e));
}
}
}
}

SEASTAR_THREAD_TEST_CASE(consumer_group_decode) {
{
// snatched from a log message after a franz-go client joined
8 changes: 7 additions & 1 deletion src/v/storage/disk_log_impl.cc
@@ -85,6 +85,12 @@ namespace storage {
* from normal cluster operation (leadership movement) and this cost is not
* driven / constrained by historical reads. Similarly for transactions and
* idempotence. Controller topic space can be managed by snapshots.
*
* Note on unsafe_enable_consumer_offsets_delete_retention: This is a special
* configuration that select users can use to enable retention on the CO
* topic, because the compaction logic is currently ineffective and they
* would like to use retention as a stopgap until that is fixed. This
* configuration will be deprecated once the compaction gaps are fixed.
*/
bool deletion_exempt(const model::ntp& ntp) {
bool is_internal_namespace = ntp.ns() == model::redpanda_ns
@@ -96,7 +102,7 @@ bool deletion_exempt(const model::ntp& ntp) {
&& ntp.tp.topic
== model::kafka_consumer_offsets_nt.tp;
return (!is_tx_manager_ntp && is_internal_namespace)
|| is_consumer_offsets_ntp;
|| (is_consumer_offsets_ntp
    && !config::shard_local_cfg()
          .unsafe_enable_consumer_offsets_delete_retention());
}

disk_log_impl::disk_log_impl(
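Reviewer note: the updated exemption rule reads as a small truth table. The sketch below restates it with plain bool parameters instead of deriving them from the model::ntp, so treat it as an illustration of the policy in this hunk rather than the shipped implementation.

// Sketch: internal-namespace topics stay exempt from delete retention
// (except the tx manager topic), and the consumer offsets topic stays
// exempt unless the new unsafe flag is enabled.
bool deletion_exempt_sketch(
  bool is_internal_namespace,
  bool is_tx_manager_ntp,
  bool is_consumer_offsets_ntp,
  bool unsafe_co_delete_retention_enabled) {
    return (!is_tx_manager_ntp && is_internal_namespace)
           || (is_consumer_offsets_ntp && !unsafe_co_delete_retention_enabled);
}

With the flag enabled, the second clause drops out, so the consumer offsets partitions become eligible for ordinary delete retention, which is exactly what the conditional_retention_test fixture above exercises.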
100 changes: 100 additions & 0 deletions tests/rptest/tests/transactions_test.py
@@ -881,6 +881,106 @@ def check_pids_overflow_test(self):
assert num_consumed == should_be_consumed


class TransactionsStreamsTest(RedpandaTest, TransactionsMixin):
topics = (TopicSpec(partition_count=1, replication_factor=3),
TopicSpec(partition_count=1, replication_factor=3))

def __init__(self, test_context):
extra_rp_conf = {
'unsafe_enable_consumer_offsets_delete_retention': True,
'group_topic_partitions': 1, # to reduce log noise
'log_segment_size_min': 99,
# to be able to make changes to CO
'kafka_nodelete_topics': [],
'kafka_noproduce_topics': [],
}
super(TransactionsStreamsTest,
self).__init__(test_context=test_context,
extra_rp_conf=extra_rp_conf)
self.input_t = self.topics[0]
self.output_t = self.topics[1]

def setup_consumer_offsets(self, rpk: RpkTool):
# initialize consumer groups topic
rpk.consume(topic=self.input_t.name, n=1, group="test-group")
topic = "__consumer_offsets"
# Aggressive roll settings to clear multiple small segments
rpk.alter_topic_config(topic, TopicSpec.PROPERTY_CLEANUP_POLICY,
TopicSpec.CLEANUP_DELETE)
rpk.alter_topic_config(topic, TopicSpec.PROPERTY_SEGMENT_SIZE, 100)

@cluster(num_nodes=3)
def consumer_offsets_retention_test(self):
"""Ensure consumer offsets replays correctly after transactional offset commits"""
input_records = 10000
self.generate_data(self.input_t, input_records)
rpk = RpkTool(self.redpanda)
self.setup_consumer_offsets(rpk)
# Populate consumer offsets with transactional offset commits/aborts
producer_conf = {
'bootstrap.servers': self.redpanda.brokers(),
'transactional.id': 'streams',
'transaction.timeout.ms': 10000,
}
producer = ck.Producer(producer_conf)
consumer_conf = {
'bootstrap.servers': self.redpanda.brokers(),
'group.id': "test",
'auto.offset.reset': 'earliest',
'enable.auto.commit': False,
}
consumer = ck.Consumer(consumer_conf)
consumer.subscribe([self.input_t])

producer.init_transactions()
consumed = 0
while consumed != input_records:
records = self.consume(consumer)
producer.begin_transaction()
for record in records:
producer.produce(self.output_t.name,
record.value(),
record.key(),
on_delivery=self.on_delivery)

producer.send_offsets_to_transaction(
consumer.position(consumer.assignment()),
consumer.consumer_group_metadata())

producer.flush()

if random.randint(0, 9) < 5:
producer.commit_transaction()
else:
producer.abort_transaction()
consumed += len(records)

admin = Admin(self.redpanda)
co_topic = "__consumer_offsets"

def get_offsets():
topic_info = list(rpk.describe_topic(co_topic))[0]
assert topic_info
return (topic_info.start_offset, topic_info.high_watermark)

# trim prefix, change leadership and validate the log is replayed successfully on
# the new leader.
attempts = 30
truncate_offset = 100
while attempts > 0:
(start, end) = get_offsets()
self.redpanda.logger.debug(f"Current offsets: {start} - {end}")
if truncate_offset > end:
break
rpk.trim_prefix(co_topic, truncate_offset, partitions=[0])
admin.partition_transfer_leadership("kafka", co_topic, partition=0)
admin.await_stable_leader(topic=co_topic,
replication=3,
timeout_s=30)
truncate_offset += 200
attempts = attempts - 1


@contextmanager
def expect_kafka_error(err: Optional[ck.KafkaError] = None):
try:
