diff --git a/src/v/kafka/tests/fetch_test.cc b/src/v/kafka/tests/fetch_test.cc index 685ba6de67c80..50c8a52c5ac97 100644 --- a/src/v/kafka/tests/fetch_test.cc +++ b/src/v/kafka/tests/fetch_test.cc @@ -253,14 +253,12 @@ FIXTURE_TEST(fetch_one, redpanda_thread_fixture) { } } -SEASTAR_THREAD_TEST_CASE(fetch_response_iterator_test) { - kafka::fetch_response response; - - auto make_partition = [](ss::sstring topic) { +FIXTURE_TEST(fetch_response_iterator_test, redpanda_thread_fixture) { + static auto make_partition = [](ss::sstring topic) { return kafka::fetch_response::partition(model::topic(std::move(topic))); }; - auto make_partition_response = [](int id) { + static auto make_partition_response = [](int id) { kafka::fetch_response::partition_response resp; resp.error = kafka::error_code::none; resp.id = model::partition_id(id); @@ -268,26 +266,33 @@ SEASTAR_THREAD_TEST_CASE(fetch_response_iterator_test) { return resp; }; - response.partitions.push_back(make_partition("tp-1")); - response.partitions.push_back(make_partition("tp-2")); - response.partitions.push_back(make_partition("tp-3")); - - response.partitions[0].responses.push_back(make_partition_response(0)); - response.partitions[0].responses.push_back(make_partition_response(1)); - response.partitions[0].responses.push_back(make_partition_response(2)); + auto make_test_fetch_response = []() { + kafka::fetch_response response; + response.partitions.push_back(make_partition("tp-1")); + response.partitions.push_back(make_partition("tp-2")); + response.partitions.push_back(make_partition("tp-3")); - response.partitions[1].responses.push_back(make_partition_response(0)); + response.partitions[0].responses.push_back(make_partition_response(0)); + response.partitions[0].responses.push_back(make_partition_response(1)); + response.partitions[0].responses.push_back(make_partition_response(2)); - response.partitions[2].responses.push_back(make_partition_response(0)); - response.partitions[2].responses.push_back(make_partition_response(1)); + response.partitions[1].responses.push_back(make_partition_response(0)); + response.partitions[2].responses.push_back(make_partition_response(0)); + response.partitions[2].responses.push_back(make_partition_response(1)); + return response; + }; + kafka::op_context ctx( + make_request_context(), ss::default_smp_service_group()); + auto response = make_test_fetch_response(); + ctx.response = make_test_fetch_response(); + auto wrapper_iterator = ctx.response_begin(); int i = 0; for (auto it = response.begin(); it != response.end(); ++it) { if (i < 3) { BOOST_REQUIRE_EQUAL(it->partition->name(), "tp-1"); BOOST_REQUIRE_EQUAL(it->partition_response->id(), i); - } else if (i == 3) { BOOST_REQUIRE_EQUAL(it->partition->name(), "tp-2"); BOOST_REQUIRE_EQUAL(it->partition_response->id(), 0); @@ -295,7 +300,13 @@ SEASTAR_THREAD_TEST_CASE(fetch_response_iterator_test) { BOOST_REQUIRE_EQUAL(it->partition->name(), "tp-3"); BOOST_REQUIRE_EQUAL(it->partition_response->id(), i - 4); } + BOOST_REQUIRE_EQUAL( + it->partition->name, wrapper_iterator->partition->name); + BOOST_REQUIRE_EQUAL( + wrapper_iterator->partition_response->id, + wrapper_iterator->partition_response->id); ++i; + ++wrapper_iterator; } }; @@ -465,3 +476,92 @@ FIXTURE_TEST(fetch_one_debounce, redpanda_thread_fixture) { BOOST_REQUIRE(resp.partitions[0].responses[0].record_set); BOOST_REQUIRE(resp.partitions[0].responses[0].record_set->size_bytes() > 0); } + +FIXTURE_TEST(fetch_multi_topics, redpanda_thread_fixture) { + // create a topic partition with some data + model::topic topic_1("foo"); + model::topic topic_2("bar"); + model::offset zero(0); + wait_for_controller_leadership().get0(); + + add_topic(model::topic_namespace(model::ns("kafka"), topic_1), 6).get(); + add_topic(model::topic_namespace(model::ns("kafka"), topic_2), 1).get(); + + std::vector ntps = {}; + // topic 1 + for (int i = 0; i < 6; ++i) { + ntps.push_back(make_default_ntp(topic_1, model::partition_id(i))); + wait_for_partition_offset(ntps.back(), model::offset(0)).get0(); + } + // topic 2 + ntps.push_back(make_default_ntp(topic_2, model::partition_id(0))); + wait_for_partition_offset(ntps.back(), model::offset(0)).get0(); + + // request + kafka::fetch_request req; + req.max_bytes = std::numeric_limits::max(); + req.min_bytes = 1; + req.max_wait_time = std::chrono::milliseconds(3000); + req.session_id = kafka::invalid_fetch_session_id; + req.topics = { + { + .name = topic_1, + .partitions = {}, + }, + { + .name = topic_2, + .partitions = {}, + }}; + + for (auto& ntp : ntps) { + kafka::fetch_request::partition p; + p.id = model::partition_id(ntp.tp.partition); + p.log_start_offset = zero; + p.fetch_offset = zero; + p.partition_max_bytes = std::numeric_limits::max(); + auto idx = ntp.tp.topic == topic_1 ? 0 : 1; + req.topics[idx].partitions.push_back(p); + } + + auto client = make_kafka_client().get0(); + client.connect().get(); + // add date to all partitions + for (auto& ntp : ntps) { + auto shard = app.shard_table.local().shard_for(ntp); + auto r = app.partition_manager + .invoke_on( + *shard, + [ntp](cluster::partition_manager& mgr) { + auto partition = mgr.get(ntp); + auto batches = storage::test::make_random_batches( + model::offset(0), 5); + auto rdr = model::make_memory_record_batch_reader( + std::move(batches)); + return partition->replicate( + std::move(rdr), + raft::replicate_options( + raft::consistency_level::quorum_ack)); + }) + .get0(); + } + + auto resp = client.dispatch(req, kafka::api_version(4)).get0(); + client.stop().then([&client] { client.shutdown(); }).get(); + + BOOST_REQUIRE_EQUAL(resp.partitions.size(), 2); + BOOST_REQUIRE_EQUAL(resp.partitions[0].name, topic_1); + BOOST_REQUIRE_EQUAL(resp.partitions[1].name, topic_2); + BOOST_REQUIRE_EQUAL(resp.partitions[0].responses.size(), 6); + BOOST_REQUIRE_EQUAL(resp.partitions[1].responses.size(), 1); + size_t total_size = 0; + for (int i = 0; i < 6; ++i) { + BOOST_REQUIRE_EQUAL( + resp.partitions[0].responses[i].error, kafka::error_code::none); + BOOST_REQUIRE_EQUAL( + resp.partitions[0].responses[i].id, model::partition_id(i)); + BOOST_REQUIRE(resp.partitions[0].responses[i].record_set); + + total_size += resp.partitions[0].responses[i].record_set->size_bytes(); + } + BOOST_REQUIRE_GT(total_size, 0); +}