Skip to content

Commit

Permalink
r/configuration: assigning initial revisions to nodes without revisions
Browse files Browse the repository at this point in the history
Implemented assigning initial revision id when old configuration format
is used by raft group.

Older raft::configuration didn't hold `raft::vnode` tuple. It was using
plain `model::node_id` instead. In current version we keep `raft::vnode`
in raft configuration.

When controller backend creates a topic it assign identity of a current
node recognized by this raft group. The `raft::vnode` is equal to
an offset of command that caused partition creation. The problem that we
experienced after updating from old raft configuration format to the new
one was caused by keeping nodes without revision in configuration but
assigning node revision when creating raft group. This made the nodes
hosting raft group not being part of its configured quorum.

Since redpanda now does not support raft configuration changes we can
safely modify configuration content to assign initial revisions to
nodes being part of raft group configuration.

Fixes: #501

Signed-off-by: Michal Maslanka <[email protected]>
  • Loading branch information
mmaslankaprv committed Jan 26, 2021
1 parent 9e98199 commit 62f9a8f
Show file tree
Hide file tree
Showing 7 changed files with 101 additions and 10 deletions.
49 changes: 47 additions & 2 deletions src/v/raft/configuration.cc
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
#include <iterator>
#include <optional>
#include <utility>
#include <vector>

namespace raft {
bool group_nodes::contains(vnode id) const {
Expand Down Expand Up @@ -326,6 +327,50 @@ void group_configuration::update(model::broker broker) {
*it = std::move(broker);
}

std::vector<vnode> with_revisions_assigned(
const std::vector<vnode>& vnodes, model::revision_id new_revision) {
std::vector<vnode> with_rev;
with_rev.reserve(vnodes.size());

std::transform(
vnodes.cbegin(),
vnodes.cend(),
std::back_inserter(with_rev),
[new_revision](const vnode& n) {
vassert(
n.revision() == no_revision,
"changing revision of nodes with current revision set should never "
"happen, current revision: {}",
n.revision());
return vnode(n.id(), new_revision);
});

return with_rev;
}

bool have_no_revision(const std::vector<vnode>& vnodes) {
return !vnodes.empty() && vnodes.begin()->revision() == no_revision;
}

void group_configuration::maybe_set_initial_revision(
model::revision_id new_rev) {
group_nodes new_current;
// if configuration have no revision assigned, fix it
if (
have_no_revision(_current.learners)
|| have_no_revision(_current.voters)) {
// current configuration
_current.voters = with_revisions_assigned(_current.voters, new_rev);
_current.learners = with_revisions_assigned(_current.learners, new_rev);

// old configuration
if (_old) {
_old->voters = with_revisions_assigned(_old->voters, new_rev);
_old->learners = with_revisions_assigned(_old->learners, new_rev);
}
}
}

std::ostream& operator<<(std::ostream& o, const group_configuration& c) {
fmt::print(
o,
Expand Down Expand Up @@ -375,7 +420,7 @@ std::vector<raft::vnode> make_vnodes(const std::vector<model::node_id> ids) {
ret.reserve(ids.size());
std::transform(
ids.begin(), ids.end(), std::back_inserter(ret), [](model::node_id id) {
return raft::vnode(id, model::revision_id(0));
return raft::vnode(id, raft::no_revision);
});
return ret;
}
Expand Down Expand Up @@ -443,7 +488,7 @@ adl<raft::group_configuration>::from(iobuf_parser& p) {
old = old_v0->to_v2();
}
}
model::revision_id revision{0};
model::revision_id revision = raft::no_revision;
if (version > 0) {
revision = adl<model::revision_id>{}.from(p);
}
Expand Down
9 changes: 9 additions & 0 deletions src/v/raft/configuration.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@

namespace raft {

static constexpr model::revision_id no_revision{};
class vnode {
public:
constexpr vnode() = default;
Expand Down Expand Up @@ -180,6 +181,14 @@ class group_configuration final {
void promote_to_voter(vnode id);
model::revision_id revision_id() const { return _revision; }

/**
* Used to set initial revision for old configuration vnodes to maintain
* backward compatibility.
*
* IMPORTANT: may be removed in future versions
*/
void maybe_set_initial_revision(model::revision_id r);

friend bool
operator==(const group_configuration&, const group_configuration&);

Expand Down
14 changes: 11 additions & 3 deletions src/v/raft/configuration_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -229,7 +229,8 @@ ss::future<> configuration_manager::stop() {
return ss::now();
}

ss::future<> configuration_manager::start(bool reset) {
ss::future<>
configuration_manager::start(bool reset, model::revision_id initial_revision) {
if (reset) {
return _storage.kvs()
.remove(
Expand All @@ -243,7 +244,9 @@ ss::future<> configuration_manager::start(bool reset) {

auto map_buf = _storage.kvs().get(
storage::kvstore::key_space::consensus, configurations_map_key());
return _lock.with([this, map_buf = std::move(map_buf)]() mutable {
return _lock.with([this,
map_buf = std::move(map_buf),
initial_revision]() mutable {
auto f = ss::now();

if (map_buf) {
Expand All @@ -267,7 +270,12 @@ ss::future<> configuration_manager::start(bool reset) {
_highest_known_offset = std::max(_highest_known_offset, offset);
});
}
return f;

return f.then([this, initial_revision] {
for (auto& [o, cfg] : _configurations) {
cfg.maybe_set_initial_revision(initial_revision);
}
});
});
}

Expand Down
3 changes: 2 additions & 1 deletion src/v/raft/configuration_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
#pragma once

#include "model/fundamental.h"
#include "model/metadata.h"
#include "model/record_batch_reader.h"
#include "raft/logger.h"
#include "raft/types.h"
Expand Down Expand Up @@ -51,7 +52,7 @@ class configuration_manager {
configuration_manager(
group_configuration, raft::group_id, storage::api&, ctx_log&);

ss::future<> start(bool reset);
ss::future<> start(bool reset, model::revision_id);

ss::future<> stop();
/**
Expand Down
3 changes: 2 additions & 1 deletion src/v/raft/consensus.cc
Original file line number Diff line number Diff line change
Expand Up @@ -750,7 +750,8 @@ ss::future<> consensus::start() {
return _op_lock.with([this] {
read_voted_for();

return _configuration_manager.start(is_initial_state())
return _configuration_manager
.start(is_initial_state(), _self.revision())
.then([this] { return hydrate_snapshot(); })
.then([this] {
vlog(
Expand Down
31 changes: 29 additions & 2 deletions src/v/raft/tests/configuration_manager_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
#include <boost/test/tools/old/interface.hpp>

#include <chrono>
#include <optional>
#include <vector>

using namespace std::chrono_literals; // NOLINT
Expand Down Expand Up @@ -103,7 +104,7 @@ struct config_manager_fixture {
_storage,
_logger);

recovered.start(false).get0();
recovered.start(false, model::revision_id(0)).get0();

BOOST_REQUIRE_EQUAL(
recovered.get_highest_known_offset(),
Expand Down Expand Up @@ -220,7 +221,7 @@ FIXTURE_TEST(test_start_write_concurrency, config_manager_fixture) {
_storage,
_logger);

auto start = new_cfg_manager.start(false);
auto start = new_cfg_manager.start(false, model::revision_id(0));
auto cfg = random_configuration();
auto add = new_cfg_manager.add(model::offset(3000), cfg);
configurations.push_back(cfg);
Expand All @@ -237,3 +238,29 @@ FIXTURE_TEST(test_start_write_concurrency, config_manager_fixture) {
BOOST_REQUIRE_EQUAL(
new_cfg_manager.get_highest_known_offset(), model::offset(3000));
}

FIXTURE_TEST(test_assigning_initial_revision, config_manager_fixture) {
// store some configurations
auto configurations = test_configurations();
model::revision_id new_revision(10);

raft::configuration_manager mgr(
raft::group_configuration(
{tests::random_broker(0, 0), tests::random_broker(1, 1)},
raft::group_nodes{
.voters = {raft::vnode(model::node_id(0), raft::no_revision)},
.learners = {raft::vnode(model::node_id(1), raft::no_revision)},
},
raft::no_revision,
std::nullopt),
raft::group_id(100),
_storage,
_logger);

mgr.start(false, new_revision).get0();
std::cout << mgr.get_latest() << std::endl;
BOOST_REQUIRE(
mgr.get_latest().contains(raft::vnode(model::node_id(0), new_revision)));
BOOST_REQUIRE(
mgr.get_latest().contains(raft::vnode(model::node_id(1), new_revision)));
}
2 changes: 1 addition & 1 deletion src/v/raft/tests/configuration_serialization_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -327,7 +327,7 @@ SEASTAR_THREAD_TEST_CASE(configuration_backward_compatibility_test) {
cfg_v2.current_config().voters[0].id(),
cfg_v3.current_config().voters[0].id());

BOOST_REQUIRE_EQUAL(cfg_v0.revision_id(), model::revision_id(0));
BOOST_REQUIRE_EQUAL(cfg_v0.revision_id(), raft::no_revision);
BOOST_REQUIRE_EQUAL(cfg_v1.revision_id(), model::revision_id(15));
BOOST_REQUIRE_EQUAL(cfg_v2.revision_id(), model::revision_id(15));
BOOST_REQUIRE_EQUAL(cfg_v3.revision_id(), model::revision_id(15));
Expand Down

0 comments on commit 62f9a8f

Please sign in to comment.