Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add several new tests #1

Open
wants to merge 9 commits into
base: main
Choose a base branch
from
Next Next commit
several tests added in recent years
  • Loading branch information
gloriallluo committed May 17, 2023
commit c7c5f29d263e8bc589589a9fd7186d033399f478
144 changes: 107 additions & 37 deletions src/kvraft/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use futures::{future, select, FutureExt};
use madsim::{
rand::{self, Rng, SliceRandom},
task,
time::{self, Duration},
time::{self, Duration, Instant},
};
use std::sync::{
atomic::{AtomicBool, Ordering},
Expand Down Expand Up @@ -65,10 +65,12 @@ fn check_concurrent_appends(v: &str, counts: &[usize]) {
async fn generic_test(
part: &str,
nclients: usize,
nservers: usize,
unreliable: bool,
crash: bool,
partitions: bool,
maxraftstate: Option<usize>,
randomkeys: bool,
) {
let mut title = "Test: ".to_owned();
if unreliable {
Expand All @@ -86,15 +88,17 @@ async fn generic_test(
if maxraftstate.is_some() {
title += "snapshots, ";
}
if randomkeys {
title += "random keys, ";
}
if nclients > 1 {
title += "many clients";
} else {
title += "one client";
}
info!("{} ({})", title, part);

const NSERVERS: usize = 5;
let t = Arc::new(Tester::new(NSERVERS, unreliable, maxraftstate).await);
let t = Arc::new(Tester::new(nservers, unreliable, maxraftstate).await);

let ck = t.make_client(&t.all());

Expand All @@ -110,21 +114,34 @@ async fn generic_test(
// TODO: change the closure to a future.
let mut j = 0;
let mut rng = rand::rng();
let mut last = String::new();
let key = format!("{}", cli);
ck.put(&key, &last).await;
let mut last = String::new(); // only used when not randomkeys
let mut key = format!("{}", cli);
if !randomkeys {
ck.put(&key, &last).await;
}
while !done.load(Ordering::Relaxed) {
if randomkeys {
key = format!("{}", rng.gen_range(0..nclients));
}
let nv = format!("x {} {} y", cli, j);
if rng.gen_bool(0.5) {
let nv = format!("x {} {} y", cli, j);
debug!("{}: client new append {:?}", cli, nv);
// predict effect of append(k, val) if old value is prev.
last += &nv;
ck.append(&key, &nv).await;
if !randomkeys {
last += &nv;
}
j += 1;
} else if randomkeys && rng.gen_bool(0.1) {
// we only do this when using random keys, because it would break the
// check done after Get() operations.
ck.put(&key, &nv).await;
} else {
debug!("{}: client new get {:?}", cli, key);
let v = ck.get(&key).await;
assert_eq!(v, last, "get wrong value, key {:?}", key);
if !randomkeys {
assert_eq!(v, last, "get wrong value, key {:?}", key);
}
}
}
j
Expand Down Expand Up @@ -174,15 +191,15 @@ async fn generic_test(

if crash {
debug!("shutdown servers");
for i in 0..NSERVERS {
for i in 0..nservers {
t.shutdown_server(i);
}
// Wait for a while for servers to shutdown, since
// shutdown isn't a real crash and isn't instantaneous
time::sleep(RAFT_ELECTION_TIMEOUT).await;
debug!("restart servers");
// crash and re-start all
for i in 0..NSERVERS {
for i in 0..nservers {
t.start_server(i).await;
}
t.connect_all();
Expand All @@ -201,7 +218,9 @@ async fn generic_test(
let key = format!("{}", i);
debug!("Check {:?} for client {}", j, i);
let v = ck.get(&key).await;
check_clnt_appends(i, &v, j);
if !randomkeys {
check_clnt_appends(i, &v, j);
}
}

if let Some(maxraftstate) = maxraftstate {
Expand All @@ -216,25 +235,71 @@ async fn generic_test(
}
}

// TODO linearizable check

t.end();
}

/// Check that ops are committed fast enough,
/// better than 1 per heartbeat interval
async fn generic_test_speed(part: &str, maxraftstate: Option<usize>) {
const NSERVERS: usize = 3;
const NUM_OPS: u32 = 1000;

info!("Test: ops complete fast enough ({})", part);

let t = Tester::new(NSERVERS, false, maxraftstate).await;
let ck = t.make_client(&t.all());

// wait until first op completes, so we know a leader is elected and
// KV servers are ready to process client requests.
ck.get("x").await;

let start = Instant::now();
for i in 0..NUM_OPS {
let value = format!("x 0 {} y", i);
ck.append("x", &value).await;
}
let dur = start.elapsed();

let v = ck.get("x").await;
check_clnt_appends(0, &v, NUM_OPS as _);

// heartbeat interval should be ~100ms, require at least 3 ops per heartbeat
const HEARTBEAT_INTERVAL: Duration = Duration::from_millis(100);
const OPS_PER_INTERVAL: u32 = 3;
let time_per_op = HEARTBEAT_INTERVAL / OPS_PER_INTERVAL;
assert!(
dur <= NUM_OPS * time_per_op,
"Operations completed too slowly {:?}/op > {:?}/op",
dur / NUM_OPS,
time_per_op
);

t.end();
}

#[madsim::test]
async fn basic_3a() {
// Test: one client (3A) ...
generic_test("3A", 1, false, false, false, None).await;
generic_test("3A", 1, 5, false, false, false, None, false).await;
}

#[madsim::test]
async fn concurrent_3a() {
// Test: many clients (3A) ...
generic_test("3A", 5, false, false, false, None).await;
generic_test("3A", 5, 5, false, false, false, None, false).await;
}

#[madsim::test]
async fn speed_3a() {
generic_test_speed("3A", None).await;
}

#[madsim::test]
async fn unreliable_3a() {
// Test: unreliable net, many clients (3A) ...
generic_test("3A", 5, true, false, false, None).await;
generic_test("3A", 5, 5, true, false, false, None, false).await;
}

#[madsim::test]
Expand Down Expand Up @@ -344,50 +409,50 @@ async fn one_partition_3a() {
#[madsim::test]
async fn many_partitions_one_client_3a() {
// Test: partitions, one client (3A) ...
generic_test("3A", 1, false, false, true, None).await;
generic_test("3A", 1, 5, false, false, true, None, false).await;
}

#[madsim::test]
async fn many_partitions_many_clients_3a() {
// Test: partitions, many clients (3A) ...
generic_test("3A", 5, false, false, true, None).await;
generic_test("3A", 5, 5, false, false, true, None, false).await;
}

#[madsim::test]
async fn persist_one_client_3a() {
// Test: restarts, one client (3A) ...
generic_test("3A", 1, false, true, false, None).await;
generic_test("3A", 1, 5, false, true, false, None, false).await;
}

#[madsim::test]
async fn persist_concurrent_3a() {
// Test: restarts, many clients (3A) ...
generic_test("3A", 5, false, true, false, None).await;
generic_test("3A", 5, 5, false, true, false, None, false).await;
}

#[madsim::test]
async fn persist_concurrent_unreliable_3a() {
// Test: unreliable net, restarts, many clients (3A) ...
generic_test("3A", 5, true, true, false, None).await;
generic_test("3A", 5, 5, true, true, false, None, false).await;
}

#[madsim::test]
async fn persist_partition_3a() {
// Test: restarts, partitions, many clients (3A) ...
generic_test("3A", 5, false, true, true, None).await;
generic_test("3A", 5, 5, false, true, true, None, false).await;
}

#[madsim::test]
async fn persist_partition_unreliable_3a() {
// Test: unreliable net, restarts, partitions, many clients (3A) ...
generic_test("3A", 5, true, true, true, None).await;
generic_test("3A", 5, 5, true, true, true, None, false).await;
}

// #[madsim::test]
// async fn persist_partition_unreliable_linearizable_3a() {
// // Test: unreliable net, restarts, partitions, linearizability checks (3A) ...
// generic_test_linearizability("3A", 15, 7, true, true, true, None)
// }
#[madsim::test]
async fn persist_partition_unreliable_linearizable_3a() {
// Test: unreliable net, restarts, partitions, linearizability checks (3A) ...
generic_test("3A", 15, 7, true, true, true, None, true).await;
}

// if one server falls behind, then rejoins, does it
// recover by using the InstallSnapshot RPC?
Expand Down Expand Up @@ -491,38 +556,43 @@ async fn snapshot_size_3b() {
t.end();
}

#[madsim::test]
async fn speed_3b() {
generic_test_speed("3B", Some(1000)).await;
}

#[madsim::test]
async fn snapshot_recover_3b() {
// Test: restarts, snapshots, one client (3B) ...
generic_test("3B", 1, false, true, false, Some(1000)).await;
generic_test("3B", 1, 5, false, true, false, Some(1000), false).await;
}

#[madsim::test]
async fn snapshot_recover_many_clients_3b() {
// Test: restarts, snapshots, many clients (3B) ...
generic_test("3B", 20, false, true, false, Some(1000)).await;
generic_test("3B", 20, 5, false, true, false, Some(1000), false).await;
}

#[madsim::test]
async fn snapshot_unreliable_3b() {
// Test: unreliable net, snapshots, many clients (3B) ...
generic_test("3B", 5, true, false, false, Some(1000)).await;
generic_test("3B", 5, 5, true, false, false, Some(1000), false).await;
}

#[madsim::test]
async fn snapshot_unreliable_recover_3b() {
// Test: unreliable net, restarts, snapshots, many clients (3B) ...
generic_test("3B", 5, true, true, false, Some(1000)).await;
generic_test("3B", 5, 5, true, true, false, Some(1000), false).await;
}

#[madsim::test]
async fn snapshot_unreliable_recover_concurrent_partition_3b() {
// Test: unreliable net, restarts, partitions, snapshots, many clients (3B) ...
generic_test("3B", 5, true, true, true, Some(1000)).await;
generic_test("3B", 5, 5, true, true, true, Some(1000), false).await;
}

// #[madsim::test]
// async fn snapshot_unreliable_recover_concurrent_partition_linearizable_3b() {
// // Test: unreliable net, restarts, partitions, snapshots, linearizability checks (3B) ...
// generic_test_linearizability("3B", 15, 7, true, true, true, Some(1000)).await;
// }
#[madsim::test]
async fn snapshot_unreliable_recover_concurrent_partition_linearizable_3b() {
// Test: unreliable net, restarts, partitions, snapshots, linearizability checks (3B) ...
generic_test("3B", 15, 7, true, true, true, Some(1000), true).await;
}
19 changes: 12 additions & 7 deletions src/raft/tester.rs
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,10 @@ impl RaftTester {
self.storage.n_committed(index)
}

pub async fn start(&self, i: usize, cmd: Entry) -> Result<Start> {
pub async fn start<C>(&self, i: usize, cmd: C) -> Result<Start>
where
C: 'static + Send + Sync + Serialize,
{
let raft = self.rafts.lock().unwrap()[i].as_ref().unwrap().clone();
self.handle
.local_handle(self.addrs[i])
Expand Down Expand Up @@ -358,9 +361,10 @@ impl RaftTester {
}
}

#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub struct Entry {
pub x: u64,
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub enum Entry {
X(u64),
Str(String),
}

#[derive(Clone)]
Expand Down Expand Up @@ -405,8 +409,9 @@ impl StorageHandle {
fn n_committed(&self, index: u64) -> (usize, Option<Entry>) {
let mut count = 0;
let mut cmd = None;
for log in self.logs.lock().unwrap().iter() {
if let Some(&Some(cmd1)) = log.get(index as usize) {
let logs = self.logs.lock().unwrap();
for log in logs.iter() {
if let Some(Some(cmd1)) = &log.get(index as usize) {
if let Some(cmd) = cmd {
assert_eq!(
cmd, cmd1,
Expand All @@ -418,7 +423,7 @@ impl StorageHandle {
cmd = Some(cmd1);
}
}
(count, cmd)
(count, cmd.cloned())
}

fn max_index(&self) -> usize {
Expand Down
Loading