diff --git a/src/v/cluster/scheduling/types.cc b/src/v/cluster/scheduling/types.cc
index 5b6bf1a95ca29..55a80dcf4215a 100644
--- a/src/v/cluster/scheduling/types.cc
+++ b/src/v/cluster/scheduling/types.cc
@@ -13,6 +13,7 @@
 
 #include "cluster/logger.h"
 #include "cluster/scheduling/allocation_state.h"
+#include "utils/exceptions.h"
 #include "utils/to_string.h"
 
 #include
@@ -47,6 +48,9 @@ allocation_units::allocation_units(
 
 allocation_units::~allocation_units() {
     oncore_debug_verify(_oncore);
+    if (unlikely(!_state)) {
+        return;
+    }
     for (const auto& replica : _added_replicas) {
         _state->remove_allocation(replica, _domain);
         _state->remove_final_count(replica, _domain);
@@ -80,6 +84,11 @@ allocated_partition::prepare_move(model::node_id prev_node) const {
 
 model::broker_shard allocated_partition::add_replica(
   model::node_id node, const std::optional<model::broker_shard>& prev) {
+    if (unlikely(!_state)) {
+        throw concurrent_modification_error(
+          "allocation_state was concurrently replaced");
+    }
+
     if (!_original_node2shard) {
         _original_node2shard.emplace();
         for (const auto& bs : _replicas) {
@@ -155,7 +164,12 @@ bool allocated_partition::is_original(model::node_id node) const {
 }
 
 errc allocated_partition::try_revert(const reallocation_step& step) {
-    if (!_original_node2shard || !_state) {
+    if (unlikely(!_state)) {
+        throw concurrent_modification_error(
+          "allocation_state was concurrently replaced");
+    }
+
+    if (!_original_node2shard) {
         return errc::no_update_in_progress;
     }
diff --git a/src/v/cluster/tests/partition_balancer_planner_fixture.h b/src/v/cluster/tests/partition_balancer_planner_fixture.h
index 08dc699008869..c216ff51d9eb1 100644
--- a/src/v/cluster/tests/partition_balancer_planner_fixture.h
+++ b/src/v/cluster/tests/partition_balancer_planner_fixture.h
@@ -160,7 +160,8 @@ struct partition_balancer_planner_fixture {
     cluster::partition_balancer_planner make_planner(
       model::partition_autobalancing_mode mode
       = model::partition_autobalancing_mode::continuous,
-      size_t max_concurrent_actions = 2) {
+      size_t max_concurrent_actions = 2,
+      bool request_ondemand_rebalance = false) {
         return cluster::partition_balancer_planner(
           cluster::planner_config{
             .mode = mode,
@@ -168,6 +169,7 @@
             .hard_max_disk_usage_ratio = 0.95,
             .max_concurrent_actions = max_concurrent_actions,
             .node_availability_timeout_sec = std::chrono::minutes(1),
+            .ondemand_rebalance_requested = request_ondemand_rebalance,
             .segment_fallocation_step = 16,
             .node_responsiveness_timeout = std::chrono::seconds(10),
             .topic_aware = true,
diff --git a/src/v/cluster/tests/partition_balancer_planner_test.cc b/src/v/cluster/tests/partition_balancer_planner_test.cc
index 79737ce4f58f9..d8ae7eb02d6d4 100644
--- a/src/v/cluster/tests/partition_balancer_planner_test.cc
+++ b/src/v/cluster/tests/partition_balancer_planner_test.cc
@@ -8,12 +8,15 @@
 // by the Apache License, Version 2.0
 
 #include "base/vlog.h"
+#include "cluster/controller_snapshot.h"
 #include "cluster/health_monitor_types.h"
 #include "cluster/tests/partition_balancer_planner_fixture.h"
+#include "utils/stable_iterator_adaptor.h"
 
 #include
+#include
 
-static ss::logger logger("partition_balancer_planner");
+static ss::logger logger("pb_planner_test");
 
 // a shorthand to avoid spelling out model::node_id
 static model::node_id n(model::node_id::type id) { return model::node_id{id}; };
@@ -924,3 +927,140 @@ FIXTURE_TEST(balancing_modes, partition_balancer_planner_fixture) {
     BOOST_REQUIRE_EQUAL(plan_data.cancellations.size(), 0);
     BOOST_REQUIRE_EQUAL(plan_data.failed_actions_count, 0);
 }
+
+FIXTURE_TEST(
+  concurrent_topic_table_updates, partition_balancer_planner_fixture) {
+    // Apply lots of topic_table update commands, while concurrently invoking
+    // the planner. The main goal of this test is to pass ASan checks.
+
+    allocator_register_nodes(5);
+    config::shard_local_cfg().disable_metrics.set_value(true);
+    config::shard_local_cfg().disable_public_metrics.set_value(true);
+
+    auto make_create_tp_cmd = [this](ss::sstring name, int partitions) {
+        int16_t replication_factor = 3;
+        cluster::topic_configuration cfg(
+          test_ns, model::topic{name}, partitions, replication_factor);
+
+        ss::chunked_fifo<cluster::partition_assignment> assignments;
+        for (model::partition_id::type i = 0; i < partitions; ++i) {
+            std::vector<model::broker_shard> replicas;
+            for (int r = 0; r < replication_factor; ++r) {
+                replicas.push_back(model::broker_shard{
+                  model::node_id{r}, random_generators::get_int(0, 3)});
+            }
+            std::shuffle(
+              replicas.begin(),
+              replicas.end(),
+              random_generators::internal::gen);
+
+            assignments.push_back(cluster::partition_assignment{
+              raft::group_id{1}, model::partition_id{i}, replicas});
+        }
+        return cluster::create_topic_cmd{
+          make_tp_ns(name),
+          cluster::topic_configuration_assignment(cfg, std::move(assignments))};
+    };
+
+    size_t successes = 0;
+    size_t failures = 0;
+    size_t reassignments = 0;
+    bool should_stop = false;
+    ss::future<> planning_fiber = ss::async([&] {
+        while (!should_stop) {
+            vlog(logger.trace, "planning fiber: invoking...");
+            auto hr = create_health_report();
+            auto planner = make_planner(
+              model::partition_autobalancing_mode::node_add, 50, true);
+
+            try {
+                auto plan_data = planner.plan_actions(hr, as).get();
+                successes += 1;
+                reassignments += plan_data.reassignments.size();
+            } catch (concurrent_modification_error&) {
+                failures += 1;
+            }
+            vlog(logger.trace, "planning fiber: iteration done");
+        }
+    });
+    auto deferred = ss::defer([&] {
+        if (!should_stop) {
+            should_stop = true;
+            planning_fiber.get();
+        }
+    });
+
+    cluster::topic_table other_tt;
+    model::offset controller_offset{0};
+    std::set<ss::sstring> cur_topics;
+    bool node_isolated = false;
+
+    for (size_t iter = 0; iter < 1'000; ++iter) {
+        int random_val = random_generators::get_int(0, 10);
+        if (random_val == 10) {
+            // allow the planner to make some progress
+            ss::sleep(50ms).get();
+            continue;
+        }
+
+        // randomly create and delete topics
+        auto topic = ssx::sformat("topic_{}", random_val);
+        if (!cur_topics.contains(topic)) {
+            vlog(
+              logger.trace,
+              "modifying fiber: creating topic {} (isolated: {})",
+              topic,
+              node_isolated);
+            auto cmd = make_create_tp_cmd(
+              topic, random_generators::get_int(1, 20));
+            other_tt.apply(cmd, controller_offset).get();
+            if (!node_isolated) {
+                workers.dispatch_topic_command(cmd);
+            }
+            cur_topics.insert(topic);
+        } else {
+            vlog(
+              logger.trace,
+              "modifying fiber: deleting topic {} (isolated: {})",
+              topic,
+              node_isolated);
+            cluster::delete_topic_cmd cmd{make_tp_ns(topic), make_tp_ns(topic)};
+            other_tt.apply(cmd, controller_offset).get();
+            if (!node_isolated) {
+                workers.dispatch_topic_command(cmd);
+            }
+            cur_topics.erase(topic);
+        }
+
+        if (random_generators::get_int(5) == 0) {
+            // flip node_isolated flag
+
+            if (node_isolated) {
+                // simulate node coming back from isolation and recovering
+                // current controller state from a snapshot.
+                vlog(logger.trace, "modifying fiber: applying snapshot");
+                node_isolated = false;
+                cluster::controller_snapshot snap;
+                other_tt.fill_snapshot(snap).get();
+                workers.members.local().fill_snapshot(snap);
+                workers.dispatcher.apply_snapshot(controller_offset, snap)
+                  .get();
+            } else {
+                node_isolated = true;
+            }
+        }
+
+        controller_offset += 1;
+
+        vlog(logger.trace, "modifying fiber: iteration done");
+    }
+
+    should_stop = true;
+    planning_fiber.get();
+
+    // sanity-check that planning made progress and hit some concurrent updates
+    BOOST_REQUIRE(successes > 0);
+    BOOST_REQUIRE(failures > 0);
+    BOOST_REQUIRE(reassignments > 0);
+}
diff --git a/src/v/cluster/topic_table.cc b/src/v/cluster/topic_table.cc
index 5dd4b90bebade..878eef8852252 100644
--- a/src/v/cluster/topic_table.cc
+++ b/src/v/cluster/topic_table.cc
@@ -347,6 +347,8 @@ topic_table::apply(finish_moving_partition_replicas_cmd cmd, model::offset o) {
 
     _updates_in_progress.erase(it);
 
+    _topics_map_revision++;
+
     on_partition_move_finish(cmd.key, cmd.value);
 
     // notify backend about finished update
@@ -421,6 +423,8 @@ topic_table::apply(cancel_moving_partition_replicas_cmd cmd, model::offset o) {
     current_assignment_it->replicas
       = in_progress_it->second.get_previous_replicas();
 
+    _topics_map_revision++;
+
     _pending_deltas.emplace_back(
       std::move(cmd.key),
       current_assignment_it->group,
@@ -464,6 +468,11 @@ topic_table::apply(revert_cancel_partition_move_cmd cmd, model::offset o) {
         co_return errc::no_update_in_progress;
     }
 
+    auto p_meta_it = tp->second.partitions.find(ntp.tp.partition);
+    if (p_meta_it == tp->second.partitions.end()) {
+        co_return errc::partition_not_exists;
+    }
+
     // revert replica set update
     current_assignment_it->replicas
       = in_progress_it->second.get_target_replicas();
@@ -474,11 +483,7 @@
       current_assignment_it->replicas,
     };
 
-    // update partition_meta object
-    auto p_meta_it = tp->second.partitions.find(ntp.tp.partition);
-    if (p_meta_it == tp->second.partitions.end()) {
-        co_return errc::partition_not_exists;
-    }
+    // update partition_meta object:
     // the cancellation was reverted and update went through, we must
     // update replicas_revisions.
     p_meta_it->second.replicas_revisions = update_replicas_revisions(
@@ -490,6 +495,8 @@
     /// Since the update is already finished we drop in_progress state
     _updates_in_progress.erase(in_progress_it);
 
+    _topics_map_revision++;
+
     // notify backend about finished update
     _pending_deltas.emplace_back(
       ntp,
@@ -675,6 +682,7 @@ topic_table::apply(set_topic_partitions_disabled_cmd cmd, model::offset o) {
         }
     }
 
+    _topics_map_revision++;
     notify_waiters();
 
     co_return errc::success;
@@ -1003,6 +1011,7 @@ class topic_table::snapshot_applier {
    disabled_partitions_t& _disabled_partitions;
    fragmented_vector<delta>& _pending_deltas;
    topic_table_probe& _probe;
+    model::revision_id& _topics_map_revision;
    model::revision_id _snap_revision;
 
 public:
@@ -1011,6 +1020,7 @@
      , _disabled_partitions(parent._disabled_partitions)
      , _pending_deltas(parent._pending_deltas)
      , _probe(parent._probe)
+      , _topics_map_revision(parent._topics_map_revision)
      , _snap_revision(snap_revision) {}
 
    void delete_ntp(
@@ -1018,7 +1028,9 @@
        auto ntp = model::ntp(ns_tp.ns, ns_tp.tp, p_as.id);
        vlog(
          clusterlog.trace, "deleting ntp {} not in controller snapshot", ntp);
-        _updates_in_progress.erase(ntp);
+        if (_updates_in_progress.erase(ntp)) {
+            _topics_map_revision++;
+        };
 
        _pending_deltas.emplace_back(
          std::move(ntp),
@@ -1040,7 +1052,9 @@
            delete_ntp(ns_tp, p_as);
            co_await ss::coroutine::maybe_yield();
        }
-        _disabled_partitions.erase(ns_tp);
+        if (_disabled_partitions.erase(ns_tp)) {
+            _topics_map_revision++;
+        };
        _probe.handle_topic_deletion(ns_tp);
        // topic_metadata_item object is supposed to be removed from _topics by
        // the caller
@@ -1055,6 +1069,9 @@
        vlog(clusterlog.trace, "adding ntp {} from controller snapshot", ntp);
        size_t pending_deltas_start_idx = _pending_deltas.size();
 
+        // we are going to modify md_item, so increment the revision right away.
+        _topics_map_revision++;
+
        const model::partition_id p_id = ntp.tp.partition;
 
        // 1. reconcile the _topics state (the md_item object) and generate
@@ -1196,7 +1213,9 @@
        topic_metadata_item ret{topic_metadata{topic.metadata, {}}};
        if (topic.disabled_set) {
            _disabled_partitions[ns_tp] = *topic.disabled_set;
+            _topics_map_revision++;
        }
+
        for (const auto& [p_id, partition] : topic.partitions) {
            auto ntp = model::ntp(ns_tp.ns, ns_tp.tp, p_id);
            add_ntp(ntp, topic, partition, ret, false);
@@ -1235,6 +1254,7 @@ ss::future<> topic_table::apply_snapshot(
                // The topic was re-created, delete and add it anew.
                co_await applier.delete_topic(ns_tp, md_item);
                md_item = co_await applier.create_topic(ns_tp, topic_snapshot);
+                _topics_map_revision++;
            } else {
                // The topic was present in the previous set, now we need to
                // reconcile individual partitions.
@@ -1252,10 +1272,12 @@
                    old_disabled_set = std::exchange(
                      _disabled_partitions[ns_tp],
                      *topic_snapshot.disabled_set);
+                    _topics_map_revision++;
                } else if (auto it = _disabled_partitions.find(ns_tp);
                           it != _disabled_partitions.end()) {
                    old_disabled_set = std::move(it->second);
                    _disabled_partitions.erase(it);
+                    _topics_map_revision++;
                }
 
                // 2. For each partition in the new set, reconcile assignments
@@ -1293,6 +1315,7 @@ ss::future<> topic_table::apply_snapshot(
                if (!topic_snapshot.partitions.contains(as_it_copy->id)) {
                    applier.delete_ntp(ns_tp, *as_it_copy);
                    md_item.get_assignments().erase(as_it_copy);
+                    _topics_map_revision++;
                }
                co_await ss::coroutine::maybe_yield();
            }
@@ -1704,6 +1727,7 @@ void topic_table::change_partition_replicas(
    auto previous_assignment = current_assignment.replicas;
    // replace partition replica set
    current_assignment.replicas = new_assignment;
+    _topics_map_revision++;
 
    // calculate delta for backend
diff --git a/src/v/cluster/topic_table.h b/src/v/cluster/topic_table.h
index 776fcd3d2dca0..aac5fd5ad4470 100644
--- a/src/v/cluster/topic_table.h
+++ b/src/v/cluster/topic_table.h
@@ -94,22 +94,17 @@ class topic_table {
    // * partition::get_revision_id()
    // * raft::group_configuration::revision_id()
 
-    class concurrent_modification_error final : public std::exception {
+    class concurrent_modification_error final
+      : public ::concurrent_modification_error {
    public:
        concurrent_modification_error(
          model::revision_id initial_revision,
          model::revision_id current_revision)
-          : _msg(ssx::sformat(
+          : ::concurrent_modification_error(ssx::sformat(
              "Topic table was modified by concurrent fiber. "
-              "(initial_revision: "
-              "{}, current_revision: {}) ",
+              "(initial_revision: {}, current_revision: {}) ",
              initial_revision,
              current_revision)) {}
-
-        const char* what() const noexcept final { return _msg.c_str(); }
-
-    private:
-        ss::sstring _msg;
    };
 
    class in_progress_update {
@@ -679,8 +674,13 @@ class topic_table {
 
    updates_t _updates_in_progress;
    model::revision_id _last_applied_revision_id;
-    // Monotonic counter that is bumped for every addition/deletion to topics
-    // map. Unlike other revisions this does not correspond to the command
+
+    // Monotonic counter that is bumped each time _topics, _disabled_partitions,
+    // or _updates_in_progress are modified in a way that makes iteration over
+    // them unsafe (i.e. invalidates iterators or references, including
+    // references to nested collections like partition sets and replica sets).
+    //
+    // Unlike other revisions this does not correspond to the command
    // revision that updated the map.
    model::revision_id _topics_map_revision{0};
 
diff --git a/src/v/cluster/topics_frontend.cc b/src/v/cluster/topics_frontend.cc
index 8d492010fdf82..a540a59346941 100644
--- a/src/v/cluster/topics_frontend.cc
+++ b/src/v/cluster/topics_frontend.cc
@@ -1014,7 +1014,7 @@ topics_frontend::partitions_with_lost_majority(
          co_return errc::concurrent_modification_error;
      }
      co_return result;
-    } catch (const topic_table::concurrent_modification_error& e) {
+    } catch (const concurrent_modification_error& e) {
        // state changed while generating the plan, force caller to retry;
        vlog(
          clusterlog.info,
diff --git a/src/v/utils/exceptions.h b/src/v/utils/exceptions.h
new file mode 100644
index 0000000000000..f8289206d2fa1
--- /dev/null
+++ b/src/v/utils/exceptions.h
@@ -0,0 +1,33 @@
+/*
+ * Copyright 2024 Redpanda Data, Inc.
+ *
+ * Use of this software is governed by the Business Source License
+ * included in the file licenses/BSL.md
+ *
+ * As of the Change Date specified in that file, in accordance with
+ * the Business Source License, use of this software will be governed
+ * by the Apache License, Version 2.0
+ */
+#pragma once
+
+#include "base/seastarx.h"
+
+#include <seastar/core/sstring.hh>
+
+#include <exception>
+
+/// Some objects reference state that changes comparatively rarely (e.g.
+/// topic_table state) across yield points and expect these references to remain
+/// valid. In case these references are invalidated by a concurrent fiber, this
+/// exception is thrown. This is a signal for the caller to restart the
+/// computation with up-to-date state.
+class concurrent_modification_error : public std::exception {
+public:
+    explicit concurrent_modification_error(ss::sstring s)
+      : _msg(std::move(s)) {}
+
+    const char* what() const noexcept override { return _msg.c_str(); }
+
+private:
+    ss::sstring _msg;
+};
diff --git a/src/v/utils/stable_iterator_adaptor.h b/src/v/utils/stable_iterator_adaptor.h
index cfde5ec4d47ff..0e93cbace1227 100644
--- a/src/v/utils/stable_iterator_adaptor.h
+++ b/src/v/utils/stable_iterator_adaptor.h
@@ -11,20 +11,20 @@
 #pragma once
 
 #include "base/seastarx.h"
+#include "utils/exceptions.h"
 
 #include
 
 #include
 
 #include
-#include <stdexcept>
-#include <string>
 #include
 
-class iterator_stability_violation : public std::runtime_error {
+class iterator_stability_violation final
+  : public concurrent_modification_error {
 public:
-    explicit iterator_stability_violation(const std::string& why)
-      : std::runtime_error(why){};
+    explicit iterator_stability_violation(ss::sstring why)
+      : concurrent_modification_error(std::move(why)){};
 };
 
 /*
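
The mechanism this patch builds is a single revision counter (_topics_map_revision) that topic_table bumps on every mutation that can invalidate iterators or references, while long-running readers such as the partition balancer planner compare the counter across yield points and translate a mismatch into concurrent_modification_error so the caller can retry from fresh state. Below is a minimal, self-contained C++ sketch of that pattern; the names in it (revisioned_table, check_revision, etc.) are hypothetical illustrations, not Redpanda APIs:

#include <cstdint>
#include <exception>
#include <iostream>
#include <map>
#include <string>

// Stand-in for the shared exception added in utils/exceptions.h.
class concurrent_modification_error : public std::exception {
public:
    explicit concurrent_modification_error(std::string msg)
      : _msg(std::move(msg)) {}
    const char* what() const noexcept override { return _msg.c_str(); }

private:
    std::string _msg;
};

// Hypothetical table that bumps a revision counter on every mutation,
// playing the role _topics_map_revision plays in topic_table.
class revisioned_table {
public:
    void put(int key, std::string value) {
        _entries[key] = std::move(value);
        ++_revision;
    }

    void erase(int key) {
        if (_entries.erase(key) > 0) {
            ++_revision;
        }
    }

    int64_t revision() const { return _revision; }
    const std::map<int, std::string>& entries() const { return _entries; }

    // Readers capture revision() before a yield point and re-check it after
    // resuming; a mismatch means state observed earlier can no longer be
    // trusted (in topic_table, references may dangle outright).
    void check_revision(int64_t expected) const {
        if (_revision != expected) {
            throw concurrent_modification_error(
              "table was modified by a concurrent fiber");
        }
    }

private:
    std::map<int, std::string> _entries;
    int64_t _revision{0};
};

int main() {
    revisioned_table table;
    table.put(1, "a");

    // Caller-side retry loop, analogous to the catch clause in
    // topics_frontend::partitions_with_lost_majority above: on a detected
    // concurrent modification, restart the computation from fresh state.
    for (int attempt = 0;; ++attempt) {
        try {
            int64_t rev = table.revision();
            std::size_t observed = table.entries().size();
            if (attempt == 0) {
                table.put(2, "b"); // simulated mutation by a concurrent fiber
            }
            table.check_revision(rev); // throws on the first attempt
            std::cout << "read " << observed << " entries on attempt "
                      << attempt + 1 << "\n";
            break;
        } catch (const concurrent_modification_error&) {
            // state changed under us; retry with up-to-date state
        }
    }
    return 0;
}

The same shape appears throughout the patch: the planner test treats the exception as an expected outcome and counts it as a failure to be retried, and the stable iterator adaptor throws its subclass (iterator_stability_violation) when the revision it captured at construction no longer matches.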