Skip to content

Commit

Permalink
Merge pull request #504 from mmaslankaprv/fix-recovering-revisionless…
Browse files Browse the repository at this point in the history
…-configurations

r/configuration: assigning initial revisions to nodes without revisions
  • Loading branch information
mmaslankaprv authored Jan 26, 2021
2 parents a243db7 + 62f9a8f commit d698a3b
Show file tree
Hide file tree
Showing 7 changed files with 101 additions and 10 deletions.
49 changes: 47 additions & 2 deletions src/v/raft/configuration.cc
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
#include <iterator>
#include <optional>
#include <utility>
#include <vector>

namespace raft {
bool group_nodes::contains(vnode id) const {
Expand Down Expand Up @@ -326,6 +327,50 @@ void group_configuration::update(model::broker broker) {
*it = std::move(broker);
}

std::vector<vnode> with_revisions_assigned(
const std::vector<vnode>& vnodes, model::revision_id new_revision) {
std::vector<vnode> with_rev;
with_rev.reserve(vnodes.size());

std::transform(
vnodes.cbegin(),
vnodes.cend(),
std::back_inserter(with_rev),
[new_revision](const vnode& n) {
vassert(
n.revision() == no_revision,
"changing revision of nodes with current revision set should never "
"happen, current revision: {}",
n.revision());
return vnode(n.id(), new_revision);
});

return with_rev;
}

bool have_no_revision(const std::vector<vnode>& vnodes) {
return !vnodes.empty() && vnodes.begin()->revision() == no_revision;
}

void group_configuration::maybe_set_initial_revision(
model::revision_id new_rev) {
group_nodes new_current;
// if configuration have no revision assigned, fix it
if (
have_no_revision(_current.learners)
|| have_no_revision(_current.voters)) {
// current configuration
_current.voters = with_revisions_assigned(_current.voters, new_rev);
_current.learners = with_revisions_assigned(_current.learners, new_rev);

// old configuration
if (_old) {
_old->voters = with_revisions_assigned(_old->voters, new_rev);
_old->learners = with_revisions_assigned(_old->learners, new_rev);
}
}
}

std::ostream& operator<<(std::ostream& o, const group_configuration& c) {
fmt::print(
o,
Expand Down Expand Up @@ -375,7 +420,7 @@ std::vector<raft::vnode> make_vnodes(const std::vector<model::node_id> ids) {
ret.reserve(ids.size());
std::transform(
ids.begin(), ids.end(), std::back_inserter(ret), [](model::node_id id) {
return raft::vnode(id, model::revision_id(0));
return raft::vnode(id, raft::no_revision);
});
return ret;
}
Expand Down Expand Up @@ -443,7 +488,7 @@ adl<raft::group_configuration>::from(iobuf_parser& p) {
old = old_v0->to_v2();
}
}
model::revision_id revision{0};
model::revision_id revision = raft::no_revision;
if (version > 0) {
revision = adl<model::revision_id>{}.from(p);
}
Expand Down
9 changes: 9 additions & 0 deletions src/v/raft/configuration.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@

namespace raft {

static constexpr model::revision_id no_revision{};
class vnode {
public:
constexpr vnode() = default;
Expand Down Expand Up @@ -180,6 +181,14 @@ class group_configuration final {
void promote_to_voter(vnode id);
model::revision_id revision_id() const { return _revision; }

/**
* Used to set initial revision for old configuration vnodes to maintain
* backward compatibility.
*
* IMPORTANT: may be removed in future versions
*/
void maybe_set_initial_revision(model::revision_id r);

friend bool
operator==(const group_configuration&, const group_configuration&);

Expand Down
14 changes: 11 additions & 3 deletions src/v/raft/configuration_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -229,7 +229,8 @@ ss::future<> configuration_manager::stop() {
return ss::now();
}

ss::future<> configuration_manager::start(bool reset) {
ss::future<>
configuration_manager::start(bool reset, model::revision_id initial_revision) {
if (reset) {
return _storage.kvs()
.remove(
Expand All @@ -243,7 +244,9 @@ ss::future<> configuration_manager::start(bool reset) {

auto map_buf = _storage.kvs().get(
storage::kvstore::key_space::consensus, configurations_map_key());
return _lock.with([this, map_buf = std::move(map_buf)]() mutable {
return _lock.with([this,
map_buf = std::move(map_buf),
initial_revision]() mutable {
auto f = ss::now();

if (map_buf) {
Expand All @@ -267,7 +270,12 @@ ss::future<> configuration_manager::start(bool reset) {
_highest_known_offset = std::max(_highest_known_offset, offset);
});
}
return f;

return f.then([this, initial_revision] {
for (auto& [o, cfg] : _configurations) {
cfg.maybe_set_initial_revision(initial_revision);
}
});
});
}

Expand Down
3 changes: 2 additions & 1 deletion src/v/raft/configuration_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
#pragma once

#include "model/fundamental.h"
#include "model/metadata.h"
#include "model/record_batch_reader.h"
#include "raft/logger.h"
#include "raft/types.h"
Expand Down Expand Up @@ -51,7 +52,7 @@ class configuration_manager {
configuration_manager(
group_configuration, raft::group_id, storage::api&, ctx_log&);

ss::future<> start(bool reset);
ss::future<> start(bool reset, model::revision_id);

ss::future<> stop();
/**
Expand Down
3 changes: 2 additions & 1 deletion src/v/raft/consensus.cc
Original file line number Diff line number Diff line change
Expand Up @@ -753,7 +753,8 @@ ss::future<> consensus::start() {
return _op_lock.with([this] {
read_voted_for();

return _configuration_manager.start(is_initial_state())
return _configuration_manager
.start(is_initial_state(), _self.revision())
.then([this] { return hydrate_snapshot(); })
.then([this] {
vlog(
Expand Down
31 changes: 29 additions & 2 deletions src/v/raft/tests/configuration_manager_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
#include <boost/test/tools/old/interface.hpp>

#include <chrono>
#include <optional>
#include <vector>

using namespace std::chrono_literals; // NOLINT
Expand Down Expand Up @@ -103,7 +104,7 @@ struct config_manager_fixture {
_storage,
_logger);

recovered.start(false).get0();
recovered.start(false, model::revision_id(0)).get0();

BOOST_REQUIRE_EQUAL(
recovered.get_highest_known_offset(),
Expand Down Expand Up @@ -220,7 +221,7 @@ FIXTURE_TEST(test_start_write_concurrency, config_manager_fixture) {
_storage,
_logger);

auto start = new_cfg_manager.start(false);
auto start = new_cfg_manager.start(false, model::revision_id(0));
auto cfg = random_configuration();
auto add = new_cfg_manager.add(model::offset(3000), cfg);
configurations.push_back(cfg);
Expand All @@ -237,3 +238,29 @@ FIXTURE_TEST(test_start_write_concurrency, config_manager_fixture) {
BOOST_REQUIRE_EQUAL(
new_cfg_manager.get_highest_known_offset(), model::offset(3000));
}

FIXTURE_TEST(test_assigning_initial_revision, config_manager_fixture) {
// store some configurations
auto configurations = test_configurations();
model::revision_id new_revision(10);

raft::configuration_manager mgr(
raft::group_configuration(
{tests::random_broker(0, 0), tests::random_broker(1, 1)},
raft::group_nodes{
.voters = {raft::vnode(model::node_id(0), raft::no_revision)},
.learners = {raft::vnode(model::node_id(1), raft::no_revision)},
},
raft::no_revision,
std::nullopt),
raft::group_id(100),
_storage,
_logger);

mgr.start(false, new_revision).get0();
std::cout << mgr.get_latest() << std::endl;
BOOST_REQUIRE(
mgr.get_latest().contains(raft::vnode(model::node_id(0), new_revision)));
BOOST_REQUIRE(
mgr.get_latest().contains(raft::vnode(model::node_id(1), new_revision)));
}
2 changes: 1 addition & 1 deletion src/v/raft/tests/configuration_serialization_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -327,7 +327,7 @@ SEASTAR_THREAD_TEST_CASE(configuration_backward_compatibility_test) {
cfg_v2.current_config().voters[0].id(),
cfg_v3.current_config().voters[0].id());

BOOST_REQUIRE_EQUAL(cfg_v0.revision_id(), model::revision_id(0));
BOOST_REQUIRE_EQUAL(cfg_v0.revision_id(), raft::no_revision);
BOOST_REQUIRE_EQUAL(cfg_v1.revision_id(), model::revision_id(15));
BOOST_REQUIRE_EQUAL(cfg_v2.revision_id(), model::revision_id(15));
BOOST_REQUIRE_EQUAL(cfg_v3.revision_id(), model::revision_id(15));
Expand Down

0 comments on commit d698a3b

Please sign in to comment.