Skip to content

Commit

Permalink
k/fetch_test: added multiple topic fetch test
Browse files Browse the repository at this point in the history
Added test reproducing #180 i.e. redpanda failure when multiple topic
partitions were present in fetch request.

Signed-off-by: Michal Maslanka <[email protected]>
  • Loading branch information
mmaslankaprv committed Nov 26, 2020
1 parent 0b12ddf commit ee87535
Showing 1 changed file with 116 additions and 16 deletions.
132 changes: 116 additions & 16 deletions src/v/kafka/tests/fetch_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -253,49 +253,60 @@ FIXTURE_TEST(fetch_one, redpanda_thread_fixture) {
}
}

SEASTAR_THREAD_TEST_CASE(fetch_response_iterator_test) {
kafka::fetch_response response;

auto make_partition = [](ss::sstring topic) {
FIXTURE_TEST(fetch_response_iterator_test, redpanda_thread_fixture) {
static auto make_partition = [](ss::sstring topic) {
return kafka::fetch_response::partition(model::topic(std::move(topic)));
};

auto make_partition_response = [](int id) {
static auto make_partition_response = [](int id) {
kafka::fetch_response::partition_response resp;
resp.error = kafka::error_code::none;
resp.id = model::partition_id(id);
resp.last_stable_offset = model::offset(0);
return resp;
};

response.partitions.push_back(make_partition("tp-1"));
response.partitions.push_back(make_partition("tp-2"));
response.partitions.push_back(make_partition("tp-3"));

response.partitions[0].responses.push_back(make_partition_response(0));
response.partitions[0].responses.push_back(make_partition_response(1));
response.partitions[0].responses.push_back(make_partition_response(2));
auto make_test_fetch_response = []() {
kafka::fetch_response response;
response.partitions.push_back(make_partition("tp-1"));
response.partitions.push_back(make_partition("tp-2"));
response.partitions.push_back(make_partition("tp-3"));

response.partitions[1].responses.push_back(make_partition_response(0));
response.partitions[0].responses.push_back(make_partition_response(0));
response.partitions[0].responses.push_back(make_partition_response(1));
response.partitions[0].responses.push_back(make_partition_response(2));

response.partitions[2].responses.push_back(make_partition_response(0));
response.partitions[2].responses.push_back(make_partition_response(1));
response.partitions[1].responses.push_back(make_partition_response(0));

response.partitions[2].responses.push_back(make_partition_response(0));
response.partitions[2].responses.push_back(make_partition_response(1));
return response;
};
kafka::op_context ctx(
make_request_context(), ss::default_smp_service_group());
auto response = make_test_fetch_response();
ctx.response = make_test_fetch_response();
auto wrapper_iterator = ctx.response_begin();
int i = 0;

for (auto it = response.begin(); it != response.end(); ++it) {
if (i < 3) {
BOOST_REQUIRE_EQUAL(it->partition->name(), "tp-1");
BOOST_REQUIRE_EQUAL(it->partition_response->id(), i);

} else if (i == 3) {
BOOST_REQUIRE_EQUAL(it->partition->name(), "tp-2");
BOOST_REQUIRE_EQUAL(it->partition_response->id(), 0);
} else {
BOOST_REQUIRE_EQUAL(it->partition->name(), "tp-3");
BOOST_REQUIRE_EQUAL(it->partition_response->id(), i - 4);
}
BOOST_REQUIRE_EQUAL(
it->partition->name, wrapper_iterator->partition->name);
BOOST_REQUIRE_EQUAL(
wrapper_iterator->partition_response->id,
wrapper_iterator->partition_response->id);
++i;
++wrapper_iterator;
}
};

Expand Down Expand Up @@ -465,3 +476,92 @@ FIXTURE_TEST(fetch_one_debounce, redpanda_thread_fixture) {
BOOST_REQUIRE(resp.partitions[0].responses[0].record_set);
BOOST_REQUIRE(resp.partitions[0].responses[0].record_set->size_bytes() > 0);
}

FIXTURE_TEST(fetch_multi_topics, redpanda_thread_fixture) {
// create a topic partition with some data
model::topic topic_1("foo");
model::topic topic_2("bar");
model::offset zero(0);
wait_for_controller_leadership().get0();

add_topic(model::topic_namespace(model::ns("kafka"), topic_1), 6).get();
add_topic(model::topic_namespace(model::ns("kafka"), topic_2), 1).get();

std::vector<model::ntp> ntps = {};
// topic 1
for (int i = 0; i < 6; ++i) {
ntps.push_back(make_default_ntp(topic_1, model::partition_id(i)));
wait_for_partition_offset(ntps.back(), model::offset(0)).get0();
}
// topic 2
ntps.push_back(make_default_ntp(topic_2, model::partition_id(0)));
wait_for_partition_offset(ntps.back(), model::offset(0)).get0();

// request
kafka::fetch_request req;
req.max_bytes = std::numeric_limits<int32_t>::max();
req.min_bytes = 1;
req.max_wait_time = std::chrono::milliseconds(3000);
req.session_id = kafka::invalid_fetch_session_id;
req.topics = {
{
.name = topic_1,
.partitions = {},
},
{
.name = topic_2,
.partitions = {},
}};

for (auto& ntp : ntps) {
kafka::fetch_request::partition p;
p.id = model::partition_id(ntp.tp.partition);
p.log_start_offset = zero;
p.fetch_offset = zero;
p.partition_max_bytes = std::numeric_limits<int32_t>::max();
auto idx = ntp.tp.topic == topic_1 ? 0 : 1;
req.topics[idx].partitions.push_back(p);
}

auto client = make_kafka_client().get0();
client.connect().get();
// add date to all partitions
for (auto& ntp : ntps) {
auto shard = app.shard_table.local().shard_for(ntp);
auto r = app.partition_manager
.invoke_on(
*shard,
[ntp](cluster::partition_manager& mgr) {
auto partition = mgr.get(ntp);
auto batches = storage::test::make_random_batches(
model::offset(0), 5);
auto rdr = model::make_memory_record_batch_reader(
std::move(batches));
return partition->replicate(
std::move(rdr),
raft::replicate_options(
raft::consistency_level::quorum_ack));
})
.get0();
}

auto resp = client.dispatch(req, kafka::api_version(4)).get0();
client.stop().then([&client] { client.shutdown(); }).get();

BOOST_REQUIRE_EQUAL(resp.partitions.size(), 2);
BOOST_REQUIRE_EQUAL(resp.partitions[0].name, topic_1);
BOOST_REQUIRE_EQUAL(resp.partitions[1].name, topic_2);
BOOST_REQUIRE_EQUAL(resp.partitions[0].responses.size(), 6);
BOOST_REQUIRE_EQUAL(resp.partitions[1].responses.size(), 1);
size_t total_size = 0;
for (int i = 0; i < 6; ++i) {
BOOST_REQUIRE_EQUAL(
resp.partitions[0].responses[i].error, kafka::error_code::none);
BOOST_REQUIRE_EQUAL(
resp.partitions[0].responses[i].id, model::partition_id(i));
BOOST_REQUIRE(resp.partitions[0].responses[i].record_set);

total_size += resp.partitions[0].responses[i].record_set->size_bytes();
}
BOOST_REQUIRE_GT(total_size, 0);
}

0 comments on commit ee87535

Please sign in to comment.