diff --git a/src/v/raft/configuration.cc b/src/v/raft/configuration.cc index ac1bf5133f0ea..70c800b0fd15b 100644 --- a/src/v/raft/configuration.cc +++ b/src/v/raft/configuration.cc @@ -21,6 +21,7 @@ #include #include #include +#include namespace raft { bool group_nodes::contains(vnode id) const { @@ -326,6 +327,50 @@ void group_configuration::update(model::broker broker) { *it = std::move(broker); } +std::vector with_revisions_assigned( + const std::vector& vnodes, model::revision_id new_revision) { + std::vector with_rev; + with_rev.reserve(vnodes.size()); + + std::transform( + vnodes.cbegin(), + vnodes.cend(), + std::back_inserter(with_rev), + [new_revision](const vnode& n) { + vassert( + n.revision() == no_revision, + "changing revision of nodes with current revision set should never " + "happen, current revision: {}", + n.revision()); + return vnode(n.id(), new_revision); + }); + + return with_rev; +} + +bool have_no_revision(const std::vector& vnodes) { + return !vnodes.empty() && vnodes.begin()->revision() == no_revision; +} + +void group_configuration::maybe_set_initial_revision( + model::revision_id new_rev) { + group_nodes new_current; + // if configuration have no revision assigned, fix it + if ( + have_no_revision(_current.learners) + || have_no_revision(_current.voters)) { + // current configuration + _current.voters = with_revisions_assigned(_current.voters, new_rev); + _current.learners = with_revisions_assigned(_current.learners, new_rev); + + // old configuration + if (_old) { + _old->voters = with_revisions_assigned(_old->voters, new_rev); + _old->learners = with_revisions_assigned(_old->learners, new_rev); + } + } +} + std::ostream& operator<<(std::ostream& o, const group_configuration& c) { fmt::print( o, @@ -375,7 +420,7 @@ std::vector make_vnodes(const std::vector ids) { ret.reserve(ids.size()); std::transform( ids.begin(), ids.end(), std::back_inserter(ret), [](model::node_id id) { - return raft::vnode(id, model::revision_id(0)); + return raft::vnode(id, raft::no_revision); }); return ret; } @@ -443,7 +488,7 @@ adl::from(iobuf_parser& p) { old = old_v0->to_v2(); } } - model::revision_id revision{0}; + model::revision_id revision = raft::no_revision; if (version > 0) { revision = adl{}.from(p); } diff --git a/src/v/raft/configuration.h b/src/v/raft/configuration.h index bf34c44ad1a0e..18e8096d43e34 100644 --- a/src/v/raft/configuration.h +++ b/src/v/raft/configuration.h @@ -23,6 +23,7 @@ namespace raft { +static constexpr model::revision_id no_revision{}; class vnode { public: constexpr vnode() = default; @@ -180,6 +181,14 @@ class group_configuration final { void promote_to_voter(vnode id); model::revision_id revision_id() const { return _revision; } + /** + * Used to set initial revision for old configuration vnodes to maintain + * backward compatibility. + * + * IMPORTANT: may be removed in future versions + */ + void maybe_set_initial_revision(model::revision_id r); + friend bool operator==(const group_configuration&, const group_configuration&); diff --git a/src/v/raft/configuration_manager.cc b/src/v/raft/configuration_manager.cc index b8b2b1513bd6f..463775231fb4a 100644 --- a/src/v/raft/configuration_manager.cc +++ b/src/v/raft/configuration_manager.cc @@ -229,7 +229,8 @@ ss::future<> configuration_manager::stop() { return ss::now(); } -ss::future<> configuration_manager::start(bool reset) { +ss::future<> +configuration_manager::start(bool reset, model::revision_id initial_revision) { if (reset) { return _storage.kvs() .remove( @@ -243,7 +244,9 @@ ss::future<> configuration_manager::start(bool reset) { auto map_buf = _storage.kvs().get( storage::kvstore::key_space::consensus, configurations_map_key()); - return _lock.with([this, map_buf = std::move(map_buf)]() mutable { + return _lock.with([this, + map_buf = std::move(map_buf), + initial_revision]() mutable { auto f = ss::now(); if (map_buf) { @@ -267,7 +270,12 @@ ss::future<> configuration_manager::start(bool reset) { _highest_known_offset = std::max(_highest_known_offset, offset); }); } - return f; + + return f.then([this, initial_revision] { + for (auto& [o, cfg] : _configurations) { + cfg.maybe_set_initial_revision(initial_revision); + } + }); }); } diff --git a/src/v/raft/configuration_manager.h b/src/v/raft/configuration_manager.h index ebd3834d52aa7..cf3fa1ce94707 100644 --- a/src/v/raft/configuration_manager.h +++ b/src/v/raft/configuration_manager.h @@ -12,6 +12,7 @@ #pragma once #include "model/fundamental.h" +#include "model/metadata.h" #include "model/record_batch_reader.h" #include "raft/logger.h" #include "raft/types.h" @@ -51,7 +52,7 @@ class configuration_manager { configuration_manager( group_configuration, raft::group_id, storage::api&, ctx_log&); - ss::future<> start(bool reset); + ss::future<> start(bool reset, model::revision_id); ss::future<> stop(); /** diff --git a/src/v/raft/consensus.cc b/src/v/raft/consensus.cc index 260de3b8f0ac7..5715f8d60d121 100644 --- a/src/v/raft/consensus.cc +++ b/src/v/raft/consensus.cc @@ -750,7 +750,8 @@ ss::future<> consensus::start() { return _op_lock.with([this] { read_voted_for(); - return _configuration_manager.start(is_initial_state()) + return _configuration_manager + .start(is_initial_state(), _self.revision()) .then([this] { return hydrate_snapshot(); }) .then([this] { vlog( diff --git a/src/v/raft/tests/configuration_manager_test.cc b/src/v/raft/tests/configuration_manager_test.cc index 971abede05a58..c98f4fbbad657 100644 --- a/src/v/raft/tests/configuration_manager_test.cc +++ b/src/v/raft/tests/configuration_manager_test.cc @@ -27,6 +27,7 @@ #include #include +#include #include using namespace std::chrono_literals; // NOLINT @@ -103,7 +104,7 @@ struct config_manager_fixture { _storage, _logger); - recovered.start(false).get0(); + recovered.start(false, model::revision_id(0)).get0(); BOOST_REQUIRE_EQUAL( recovered.get_highest_known_offset(), @@ -220,7 +221,7 @@ FIXTURE_TEST(test_start_write_concurrency, config_manager_fixture) { _storage, _logger); - auto start = new_cfg_manager.start(false); + auto start = new_cfg_manager.start(false, model::revision_id(0)); auto cfg = random_configuration(); auto add = new_cfg_manager.add(model::offset(3000), cfg); configurations.push_back(cfg); @@ -237,3 +238,29 @@ FIXTURE_TEST(test_start_write_concurrency, config_manager_fixture) { BOOST_REQUIRE_EQUAL( new_cfg_manager.get_highest_known_offset(), model::offset(3000)); } + +FIXTURE_TEST(test_assigning_initial_revision, config_manager_fixture) { + // store some configurations + auto configurations = test_configurations(); + model::revision_id new_revision(10); + + raft::configuration_manager mgr( + raft::group_configuration( + {tests::random_broker(0, 0), tests::random_broker(1, 1)}, + raft::group_nodes{ + .voters = {raft::vnode(model::node_id(0), raft::no_revision)}, + .learners = {raft::vnode(model::node_id(1), raft::no_revision)}, + }, + raft::no_revision, + std::nullopt), + raft::group_id(100), + _storage, + _logger); + + mgr.start(false, new_revision).get0(); + std::cout << mgr.get_latest() << std::endl; + BOOST_REQUIRE( + mgr.get_latest().contains(raft::vnode(model::node_id(0), new_revision))); + BOOST_REQUIRE( + mgr.get_latest().contains(raft::vnode(model::node_id(1), new_revision))); +} diff --git a/src/v/raft/tests/configuration_serialization_test.cc b/src/v/raft/tests/configuration_serialization_test.cc index 6e5343708857f..bffecf2125061 100644 --- a/src/v/raft/tests/configuration_serialization_test.cc +++ b/src/v/raft/tests/configuration_serialization_test.cc @@ -327,7 +327,7 @@ SEASTAR_THREAD_TEST_CASE(configuration_backward_compatibility_test) { cfg_v2.current_config().voters[0].id(), cfg_v3.current_config().voters[0].id()); - BOOST_REQUIRE_EQUAL(cfg_v0.revision_id(), model::revision_id(0)); + BOOST_REQUIRE_EQUAL(cfg_v0.revision_id(), raft::no_revision); BOOST_REQUIRE_EQUAL(cfg_v1.revision_id(), model::revision_id(15)); BOOST_REQUIRE_EQUAL(cfg_v2.revision_id(), model::revision_id(15)); BOOST_REQUIRE_EQUAL(cfg_v3.revision_id(), model::revision_id(15));