diff --git a/.config/nextest.toml b/.config/nextest.toml new file mode 100644 index 0000000000..d9bf9eaa5e --- /dev/null +++ b/.config/nextest.toml @@ -0,0 +1,4 @@ +# The restart tests run an entire sequencing network, and so are quite resource intensive. +[[profile.default.overrides]] +filter = 'test(slow_test_restart)' +threads-required = 'num-cpus' diff --git a/.github/workflows/slowtest.yaml b/.github/workflows/slowtest.yaml index 1653faa9da..c8aa2948ea 100644 --- a/.github/workflows/slowtest.yaml +++ b/.github/workflows/slowtest.yaml @@ -15,7 +15,7 @@ on: concurrency: group: ${{ github.workflow }}-${{ github.ref }} - cancel-in-progress: ${{ !contains(github.ref, 'main')}} + cancel-in-progress: false env: RUSTFLAGS: '--cfg async_executor_impl="async-std" --cfg async_channel_impl="async-std"' @@ -53,12 +53,12 @@ jobs: cargo nextest run --locked --release --workspace --all-features --no-run timeout-minutes: 90 - - name: SlowEst + - name: Slow tests env: CARGO_TERM_COLOR: always # Build test binary with `testing` feature, which requires `hotshot_example` config run: | export RUSTFLAGS="$RUSTFLAGS --cfg hotshot_example" export PATH="$PWD/target/release:$PATH" - cargo nextest run --locked --release --workspace --all-features --verbose -E 'test(slow_)' - timeout-minutes: 25 + cargo nextest run --locked --release --workspace --all-features --verbose -E 'test(slow_)' --nocapture + timeout-minutes: 120 diff --git a/Cargo.lock b/Cargo.lock index 30cb704ba6..b592b43914 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1487,34 +1487,10 @@ dependencies = [ [[package]] name = "cdn-broker" version = "0.4.0" -source = "git+https://github.com/EspressoSystems/Push-CDN?tag=0.4.3#26a7b21372c4b38dca2f4aecb4267b271d1ac883" +source = "git+https://github.com/EspressoSystems/Push-CDN?branch=ss/fix-shutdown#93eb15bc5e1a77ca0d89aa40b67c66b81bb8bdef" dependencies = [ "async-std", - "cdn-proto 0.4.0 (git+https://github.com/EspressoSystems/Push-CDN?tag=0.4.3)", - "clap", - "console-subscriber 0.3.0", - "dashmap", - "derivative", - "jf-signature", - "lazy_static", - "local-ip-address", - "parking_lot", - "portpicker", - "prometheus", - "rand 0.8.5", - "rkyv", - "tokio", - "tracing", - "tracing-subscriber 0.3.18", -] - -[[package]] -name = "cdn-broker" -version = "0.4.0" -source = "git+https://github.com/EspressoSystems/Push-CDN?tag=0.4.4#398b77fb2f461218a611294e1484b7e0bd571d64" -dependencies = [ - "async-std", - "cdn-proto 0.4.0 (git+https://github.com/EspressoSystems/Push-CDN?tag=0.4.4)", + "cdn-proto 0.4.0 (git+https://github.com/EspressoSystems/Push-CDN?branch=ss/fix-shutdown)", "clap", "console-subscriber 0.3.0", "dashmap", @@ -1535,10 +1511,10 @@ dependencies = [ [[package]] name = "cdn-client" version = "0.4.0" -source = "git+https://github.com/EspressoSystems/Push-CDN?tag=0.4.4#398b77fb2f461218a611294e1484b7e0bd571d64" +source = "git+https://github.com/EspressoSystems/Push-CDN?branch=ss/fix-shutdown#93eb15bc5e1a77ca0d89aa40b67c66b81bb8bdef" dependencies = [ "async-std", - "cdn-proto 0.4.0 (git+https://github.com/EspressoSystems/Push-CDN?tag=0.4.4)", + "cdn-proto 0.4.0 (git+https://github.com/EspressoSystems/Push-CDN?branch=ss/fix-shutdown)", "clap", "jf-signature", "rand 0.8.5", @@ -1550,24 +1526,10 @@ dependencies = [ [[package]] name = "cdn-marshal" version = "0.4.0" -source = "git+https://github.com/EspressoSystems/Push-CDN?tag=0.4.3#26a7b21372c4b38dca2f4aecb4267b271d1ac883" +source = "git+https://github.com/EspressoSystems/Push-CDN?branch=ss/fix-shutdown#93eb15bc5e1a77ca0d89aa40b67c66b81bb8bdef" dependencies 
= [ "async-std", - "cdn-proto 0.4.0 (git+https://github.com/EspressoSystems/Push-CDN?tag=0.4.3)", - "clap", - "jf-signature", - "tokio", - "tracing", - "tracing-subscriber 0.3.18", -] - -[[package]] -name = "cdn-marshal" -version = "0.4.0" -source = "git+https://github.com/EspressoSystems/Push-CDN?tag=0.4.4#398b77fb2f461218a611294e1484b7e0bd571d64" -dependencies = [ - "async-std", - "cdn-proto 0.4.0 (git+https://github.com/EspressoSystems/Push-CDN?tag=0.4.4)", + "cdn-proto 0.4.0 (git+https://github.com/EspressoSystems/Push-CDN?branch=ss/fix-shutdown)", "clap", "jf-signature", "tokio", @@ -1578,7 +1540,7 @@ dependencies = [ [[package]] name = "cdn-proto" version = "0.4.0" -source = "git+https://github.com/EspressoSystems/Push-CDN?tag=0.4.3#26a7b21372c4b38dca2f4aecb4267b271d1ac883" +source = "git+https://github.com/EspressoSystems/Push-CDN?tag=0.4.4#398b77fb2f461218a611294e1484b7e0bd571d64" dependencies = [ "anyhow", "ark-serialize", @@ -1592,7 +1554,6 @@ dependencies = [ "mnemonic", "num_enum", "pem 3.0.4", - "prometheus", "quinn", "rand 0.8.5", "rcgen 0.13.1", @@ -1612,7 +1573,7 @@ dependencies = [ [[package]] name = "cdn-proto" version = "0.4.0" -source = "git+https://github.com/EspressoSystems/Push-CDN?tag=0.4.4#398b77fb2f461218a611294e1484b7e0bd571d64" +source = "git+https://github.com/EspressoSystems/Push-CDN?branch=ss/fix-shutdown#93eb15bc5e1a77ca0d89aa40b67c66b81bb8bdef" dependencies = [ "anyhow", "ark-serialize", @@ -2935,6 +2896,7 @@ dependencies = [ "committable", "contract-bindings", "derive_more 0.99.18", + "dyn-clone 1.0.17 (registry+https://github.com/rust-lang/crates.io-index)", "ethers", "fluent-asserter", "futures", @@ -4064,7 +4026,7 @@ dependencies = [ [[package]] name = "hotshot" version = "0.5.69" -source = "git+https://github.com/EspressoSystems/hotshot?tag=0.5.70#3e38cdf852816d73a3b4ffc58c5ccd3a0c6e8028" +source = "git+https://github.com/EspressoSystems//hotshot?branch=ss/fix-shutdown#e898422cc9a3fe855799a9b9e990d9ad84d769c6" dependencies = [ "anyhow", "async-broadcast", @@ -4075,9 +4037,9 @@ dependencies = [ "bimap", "bincode", "blake3", - "cdn-broker 0.4.0 (git+https://github.com/EspressoSystems/Push-CDN?tag=0.4.4)", + "cdn-broker", "cdn-client", - "cdn-marshal 0.4.0 (git+https://github.com/EspressoSystems/Push-CDN?tag=0.4.4)", + "cdn-marshal", "chrono", "committable", "custom_debug 0.5.1", @@ -4088,7 +4050,7 @@ dependencies = [ "futures", "hotshot-orchestrator", "hotshot-task", - "hotshot-task-impls", + "hotshot-task-impls 0.5.69 (git+https://github.com/EspressoSystems//hotshot?branch=ss/fix-shutdown)", "hotshot-types", "jf-signature", "libp2p-identity", @@ -4111,7 +4073,7 @@ dependencies = [ [[package]] name = "hotshot-builder-api" version = "0.1.7" -source = "git+https://github.com/EspressoSystems/hotshot?tag=0.5.70#3e38cdf852816d73a3b4ffc58c5ccd3a0c6e8028" +source = "git+https://github.com/EspressoSystems//hotshot?branch=ss/fix-shutdown#e898422cc9a3fe855799a9b9e990d9ad84d769c6" dependencies = [ "async-trait", "clap", @@ -4208,7 +4170,7 @@ dependencies = [ [[package]] name = "hotshot-example-types" version = "0.5.69" -source = "git+https://github.com/EspressoSystems/hotshot?tag=0.5.70#3e38cdf852816d73a3b4ffc58c5ccd3a0c6e8028" +source = "git+https://github.com/EspressoSystems//hotshot?branch=ss/fix-shutdown#e898422cc9a3fe855799a9b9e990d9ad84d769c6" dependencies = [ "anyhow", "async-broadcast", @@ -4223,7 +4185,7 @@ dependencies = [ "futures", "hotshot", "hotshot-task", - "hotshot-task-impls", + "hotshot-task-impls 0.5.69 
(git+https://github.com/EspressoSystems//hotshot?branch=ss/fix-shutdown)", "hotshot-types", "rand 0.8.5", "reqwest 0.12.5", @@ -4241,7 +4203,7 @@ dependencies = [ [[package]] name = "hotshot-fakeapi" version = "0.5.69" -source = "git+https://github.com/EspressoSystems/hotshot?tag=0.5.70#3e38cdf852816d73a3b4ffc58c5ccd3a0c6e8028" +source = "git+https://github.com/EspressoSystems//hotshot?branch=ss/fix-shutdown#e898422cc9a3fe855799a9b9e990d9ad84d769c6" dependencies = [ "anyhow", "async-lock 2.8.0", @@ -4260,7 +4222,7 @@ dependencies = [ [[package]] name = "hotshot-macros" version = "0.5.69" -source = "git+https://github.com/EspressoSystems/hotshot?tag=0.5.70#3e38cdf852816d73a3b4ffc58c5ccd3a0c6e8028" +source = "git+https://github.com/EspressoSystems//hotshot?branch=ss/fix-shutdown#e898422cc9a3fe855799a9b9e990d9ad84d769c6" dependencies = [ "derive_builder", "proc-macro2", @@ -4271,7 +4233,7 @@ dependencies = [ [[package]] name = "hotshot-orchestrator" version = "0.5.69" -source = "git+https://github.com/EspressoSystems/hotshot?tag=0.5.70#3e38cdf852816d73a3b4ffc58c5ccd3a0c6e8028" +source = "git+https://github.com/EspressoSystems//hotshot?branch=ss/fix-shutdown#e898422cc9a3fe855799a9b9e990d9ad84d769c6" dependencies = [ "anyhow", "async-compatibility-layer", @@ -4357,7 +4319,7 @@ dependencies = [ [[package]] name = "hotshot-stake-table" version = "0.5.69" -source = "git+https://github.com/EspressoSystems/hotshot?tag=0.5.70#3e38cdf852816d73a3b4ffc58c5ccd3a0c6e8028" +source = "git+https://github.com/EspressoSystems//hotshot?branch=ss/fix-shutdown#e898422cc9a3fe855799a9b9e990d9ad84d769c6" dependencies = [ "ark-bn254", "ark-ed-on-bn254", @@ -4422,7 +4384,7 @@ dependencies = [ [[package]] name = "hotshot-task" version = "0.5.69" -source = "git+https://github.com/EspressoSystems/hotshot?tag=0.5.70#3e38cdf852816d73a3b4ffc58c5ccd3a0c6e8028" +source = "git+https://github.com/EspressoSystems//hotshot?branch=ss/fix-shutdown#e898422cc9a3fe855799a9b9e990d9ad84d769c6" dependencies = [ "anyhow", "async-broadcast", @@ -4471,10 +4433,47 @@ dependencies = [ "vec1", ] +[[package]] +name = "hotshot-task-impls" +version = "0.5.69" +source = "git+https://github.com/EspressoSystems//hotshot?branch=ss/fix-shutdown#e898422cc9a3fe855799a9b9e990d9ad84d769c6" +dependencies = [ + "anyhow", + "async-broadcast", + "async-compatibility-layer", + "async-lock 2.8.0", + "async-std", + "async-trait", + "bincode", + "bitvec", + "cdn-proto 0.4.0 (git+https://github.com/EspressoSystems/Push-CDN?branch=ss/fix-shutdown)", + "chrono", + "committable", + "either", + "futures", + "hotshot-builder-api", + "hotshot-task", + "hotshot-types", + "jf-signature", + "jf-vid", + "rand 0.8.5", + "serde", + "sha2 0.10.8", + "snafu 0.8.4", + "surf-disco", + "tagged-base64", + "time 0.3.36", + "tokio", + "tracing", + "url", + "vbs", + "vec1", +] + [[package]] name = "hotshot-testing" version = "0.5.69" -source = "git+https://github.com/EspressoSystems/hotshot?tag=0.5.70#3e38cdf852816d73a3b4ffc58c5ccd3a0c6e8028" +source = "git+https://github.com/EspressoSystems//hotshot?branch=ss/fix-shutdown#e898422cc9a3fe855799a9b9e990d9ad84d769c6" dependencies = [ "anyhow", "async-broadcast", @@ -4495,7 +4494,7 @@ dependencies = [ "hotshot-macros", "hotshot-orchestrator", "hotshot-task", - "hotshot-task-impls", + "hotshot-task-impls 0.5.69 (git+https://github.com/EspressoSystems//hotshot?branch=ss/fix-shutdown)", "hotshot-types", "itertools 0.13.0", "jf-signature", @@ -4520,7 +4519,7 @@ dependencies = [ [[package]] name = "hotshot-types" version = "0.1.11" -source = 
"git+https://github.com/EspressoSystems/hotshot?tag=0.5.70#3e38cdf852816d73a3b4ffc58c5ccd3a0c6e8028" +source = "git+https://github.com/EspressoSystems//hotshot?branch=ss/fix-shutdown#e898422cc9a3fe855799a9b9e990d9ad84d769c6" dependencies = [ "anyhow", "ark-bn254", @@ -4536,7 +4535,7 @@ dependencies = [ "bincode", "bitvec", "blake3", - "cdn-proto 0.4.0 (git+https://github.com/EspressoSystems/Push-CDN?tag=0.4.4)", + "cdn-proto 0.4.0 (git+https://github.com/EspressoSystems/Push-CDN?branch=ss/fix-shutdown)", "committable", "custom_debug 0.5.1", "derivative", @@ -5886,7 +5885,7 @@ dependencies = [ [[package]] name = "libp2p-networking" version = "0.5.69" -source = "git+https://github.com/EspressoSystems/hotshot?tag=0.5.70#3e38cdf852816d73a3b4ffc58c5ccd3a0c6e8028" +source = "git+https://github.com/EspressoSystems//hotshot?branch=ss/fix-shutdown#e898422cc9a3fe855799a9b9e990d9ad84d769c6" dependencies = [ "anyhow", "async-compatibility-layer", @@ -6320,7 +6319,7 @@ dependencies = [ "hotshot", "hotshot-builder-api", "hotshot-events-service", - "hotshot-task-impls", + "hotshot-task-impls 0.5.69 (git+https://github.com/EspressoSystems/hotshot?tag=0.5.70)", "hotshot-types", "lru 0.12.4", "multimap", @@ -8718,8 +8717,8 @@ dependencies = [ "async-trait", "bincode", "bytesize", - "cdn-broker 0.4.0 (git+https://github.com/EspressoSystems/Push-CDN?tag=0.4.3)", - "cdn-marshal 0.4.0 (git+https://github.com/EspressoSystems/Push-CDN?tag=0.4.3)", + "cdn-broker", + "cdn-marshal", "clap", "cld", "committable", @@ -8728,6 +8727,7 @@ dependencies = [ "derivative", "derive_more 0.99.18", "dotenvy", + "dyn-clone 1.0.17 (registry+https://github.com/rust-lang/crates.io-index)", "escargot", "espresso-macros", "espresso-types", diff --git a/Cargo.toml b/Cargo.toml index bbf16820a4..40cd2fcb43 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -49,6 +49,7 @@ clap = { version = "4.4", features = ["derive", "env", "string"] } cld = "0.5" derive_more = "0.99.17" dotenvy = "0.15" +dyn-clone = "1.0" ethers = { version = "2.0", features = ["solc"] } futures = "0.3" @@ -73,11 +74,15 @@ hotshot-example-types = { git = "https://github.com/EspressoSystems/hotshot", ta cdn-broker = { git = "https://github.com/EspressoSystems/Push-CDN", features = [ "runtime-async-std", "global-permits", -], tag = "0.4.3", package = "cdn-broker" } +], branch = "ss/fix-shutdown", package = "cdn-broker" } cdn-marshal = { git = "https://github.com/EspressoSystems/Push-CDN", features = [ "runtime-async-std", "global-permits", -], tag = "0.4.3", package = "cdn-marshal" } +], branch="ss/fix-shutdown", package = "cdn-marshal" } +cdn-client = { git = "https://github.com/EspressoSystems/Push-CDN", features = [ + "runtime-async-std", + "global-permits", +], branch="ss/fix-shutdown", package = "cdn-client" } jf-plonk = { git = "https://github.com/EspressoSystems/jellyfish", tag = "0.4.5", features = [ "test-apis", @@ -120,6 +125,7 @@ rand_distr = "0.4" reqwest = "0.12" serde = { version = "1.0.195", features = ["derive"] } serde_json = "^1.0.113" +tempfile = "3.9" toml = "0.8" url = "2.3" vbs = "0.1" @@ -136,3 +142,13 @@ paste = "1.0" rand = "0.8.5" time = "0.3" trait-set = "0.3.0" + +[patch.'https://github.com/EspressoSystems/hotshot'] +hotshot = { git = "https://github.com/EspressoSystems//hotshot", branch = "ss/fix-shutdown" } +hotshot-orchestrator = { git = "https://github.com/EspressoSystems//hotshot", branch = "ss/fix-shutdown" } +hotshot-builder-api = { git = "https://github.com/EspressoSystems//hotshot", branch = "ss/fix-shutdown" } +hotshot-stake-table = { git 
= "https://github.com/EspressoSystems//hotshot", branch = "ss/fix-shutdown" } +hotshot-task = { git = "https://github.com/EspressoSystems//hotshot", branch = "ss/fix-shutdown" } +hotshot-testing = { git = "https://github.com/EspressoSystems//hotshot", branch = "ss/fix-shutdown" } +hotshot-types = { git = "https://github.com/EspressoSystems//hotshot", branch = "ss/fix-shutdown" } +hotshot-example-types = { git = "https://github.com/EspressoSystems//hotshot", branch = "ss/fix-shutdown" } diff --git a/sequencer/Cargo.toml b/sequencer/Cargo.toml index e50d4c37db..6514019537 100644 --- a/sequencer/Cargo.toml +++ b/sequencer/Cargo.toml @@ -7,7 +7,11 @@ edition = "2021" [features] default = ["libp2p"] -testing = ["hotshot-testing"] +testing = [ + "hotshot-query-service/testing", + "hotshot-testing", + "tempfile", +] libp2p = [] benchmarking = [] @@ -23,7 +27,7 @@ hotshot-testing = { workspace = true } pretty_assertions = { workspace = true } rand = "0.8.5" reqwest = { workspace = true } -tempfile = "3.9.0" +tempfile = { workspace = true } [build-dependencies] anyhow = { workspace = true } @@ -52,6 +56,7 @@ csv = "1" derivative = "2.2" derive_more = { workspace = true } dotenvy = { workspace = true } +dyn-clone = { workspace = true } espresso-types = { path = "../types", features = ["testing"] } ethers = { workspace = true } futures = { workspace = true } @@ -73,6 +78,7 @@ itertools = { workspace = true } jf-crhf = { workspace = true } jf-merkle-tree = { workspace = true } jf-rescue = { workspace = true } +tempfile = { workspace = true, optional = true } jf-signature = { workspace = true, features = ["bls", "schnorr"] } jf-vid = { workspace = true } diff --git a/sequencer/api/migrations/V37__anchor_leaf_chain.sql b/sequencer/api/migrations/V37__anchor_leaf_chain.sql new file mode 100644 index 0000000000..fc90ce32ad --- /dev/null +++ b/sequencer/api/migrations/V37__anchor_leaf_chain.sql @@ -0,0 +1,9 @@ +ALTER TABLE anchor_leaf + DROP COLUMN id, + DROP COLUMN height, + ADD CONSTRAINT anchor_leaf_pk PRIMARY KEY (view); + +CREATE TABLE event_stream ( + id SERIAL PRIMARY KEY, + last_processed_view BIGINT +); diff --git a/sequencer/src/api.rs b/sequencer/src/api.rs index 20d6b46d90..d80d64085f 100644 --- a/sequencer/src/api.rs +++ b/sequencer/src/api.rs @@ -14,9 +14,9 @@ use espresso_types::{ use ethers::prelude::Address; use futures::{ future::{BoxFuture, Future, FutureExt}, - stream::{BoxStream, Stream}, + stream::BoxStream, }; -use hotshot::types::{Event, SystemContextHandle}; +use hotshot::types::SystemContextHandle; use hotshot_events_service::events_source::{ EventFilterSet, EventsSource, EventsStreamer, StartupInfo, }; @@ -25,6 +25,7 @@ use hotshot_query_service::data_source::ExtensibleDataSource; use hotshot_state_prover::service::light_client_genesis_from_stake_table; use hotshot_types::{ data::ViewNumber, + event::Event, light_client::StateSignatureRequestBody, traits::{network::ConnectedNetwork, node_implementation::Versions}, }; @@ -99,13 +100,6 @@ impl, P: SequencerPersistence, Ver: StaticVersionTyp } } - fn event_stream(&self) -> impl Stream> + Unpin { - let state = self.clone(); - async move { state.consensus().await.read().await.event_stream() } - .boxed() - .flatten_stream() - } - async fn state_signer(&self) -> &StateSigner { &self.consensus.as_ref().get().await.get_ref().state_signer } @@ -393,7 +387,7 @@ pub mod test_helpers { use espresso_types::{ mock::MockStateCatchup, - v0::traits::{PersistenceOptions, StateCatchup}, + v0::traits::{NullEventConsumer, PersistenceOptions, 
StateCatchup}, NamespaceId, SequencerVersions, ValidatedState, }; use ethers::{prelude::Address, utils::Anvil}; @@ -566,20 +560,22 @@ pub mod test_helpers { async move { if i == 0 { opt.serve( - |metrics| { + |metrics, consumer| { let cfg = cfg.clone(); async move { - cfg.init_node( - 0, - state, - persistence, - catchup, - &*metrics, - STAKE_TABLE_CAPACITY_FOR_TEST, - Ver::instance(), - upgrades_map, - ) - .await + Ok(cfg + .init_node( + 0, + state, + persistence, + catchup, + &*metrics, + STAKE_TABLE_CAPACITY_FOR_TEST, + consumer, + Ver::instance(), + upgrades_map, + ) + .await) } .boxed() }, @@ -595,6 +591,7 @@ pub mod test_helpers { catchup, &NoMetrics, STAKE_TABLE_CAPACITY_FOR_TEST, + NullEventConsumer, Ver::instance(), upgrades_map, ) @@ -877,7 +874,7 @@ mod api_tests { use espresso_types::{Header, NamespaceId, SequencerVersions}; use ethers::utils::Anvil; use futures::stream::StreamExt; - use hotshot_query_service::availability::{LeafQueryData, VidCommonQueryData}; + use hotshot_query_service::availability::{BlockQueryData, VidCommonQueryData}; use portpicker::pick_unused_port; use sequencer_utils::test_utils::setup_test; @@ -888,7 +885,6 @@ mod api_tests { }; use tide_disco::error::ServerError; - use self::options::HotshotEvents; use super::*; use crate::testing::{wait_for_decide_on_handle, TestConfigBuilder}; @@ -937,17 +933,6 @@ mod api_tests { Client::new(format!("http://localhost:{port}").parse().unwrap()); client.connect(None).await; - // Wait for at least one empty block to be sequenced (after consensus starts VID). - client - .socket("availability/stream/leaves/0") - .subscribe::>() - .await - .unwrap() - .next() - .await - .unwrap() - .unwrap(); - let hash = client .post("submit/submit") .body_json(&txn) @@ -960,6 +945,18 @@ mod api_tests { // Wait for a Decide event containing transaction matching the one we sent let block_height = wait_for_decide_on_handle(&mut events, &txn).await as usize; tracing::info!(block_height, "transaction sequenced"); + + // Wait for the query service to update to this block height. 
+ client + .socket(&format!("availability/stream/blocks/{block_height}")) + .subscribe::>() + .await + .unwrap() + .next() + .await + .unwrap() + .unwrap(); + let mut found_txn = false; let mut found_empty_block = false; for block_num in 0..=block_height { @@ -1013,62 +1010,6 @@ mod api_tests { let storage = D::create_storage().await; catchup_test_helper(|opt| D::options(&storage, opt)).await } - - #[async_std::test] - pub(crate) async fn test_hotshot_event_streaming() { - use HotshotEvents; - - setup_test(); - - let hotshot_event_streaming_port = - pick_unused_port().expect("No ports free for hotshot event streaming"); - let query_service_port = pick_unused_port().expect("No ports free for query service"); - - let url = format!("http://localhost:{hotshot_event_streaming_port}") - .parse() - .unwrap(); - - let hotshot_events = HotshotEvents { - events_service_port: hotshot_event_streaming_port, - }; - - let client: Client::Base> = Client::new(url); - - let options = Options::with_port(query_service_port).hotshot_events(hotshot_events); - - let anvil = Anvil::new().spawn(); - let l1 = anvil.endpoint().parse().unwrap(); - let network_config = TestConfigBuilder::default().l1_url(l1).build(); - let config = TestNetworkConfigBuilder::default() - .api_config(options) - .network_config(network_config) - .build(); - let _network = - TestNetwork::new(config, ::Base::instance()).await; - - let mut subscribed_events = client - .socket("hotshot-events/events") - .subscribe::>() - .await - .unwrap(); - - let total_count = 5; - // wait for these events to receive on client 1 - let mut receive_count = 0; - loop { - let event = subscribed_events.next().await.unwrap(); - tracing::info!( - "Received event in hotshot event streaming Client 1: {:?}", - event - ); - receive_count += 1; - if receive_count > total_count { - tracing::info!("Client Received atleast desired events, exiting loop"); - break; - } - } - assert_eq!(receive_count, total_count + 1); - } } #[cfg(test)] @@ -1080,6 +1021,7 @@ mod test { use espresso_types::{ mock::MockStateCatchup, + traits::NullEventConsumer, v0_1::{UpgradeMode, ViewBasedUpgrade}, FeeAccount, FeeAmount, Header, SequencerVersions, TimeBasedUpgrade, Timestamp, Upgrade, UpgradeType, ValidatedState, @@ -1112,6 +1054,7 @@ mod test { use self::{ data_source::{testing::TestableSequencerDataSource, PublicHotShotConfig}, + options::HotshotEvents, sql::DataSource as SqlDataSource, }; use super::*; @@ -1302,6 +1245,7 @@ mod test { ), &NoMetrics, test_helpers::STAKE_TABLE_CAPACITY_FOR_TEST, + NullEventConsumer, ::Base::instance(), Default::default(), ) @@ -1792,4 +1736,58 @@ mod test { .unwrap() ); } + + #[async_std::test] + async fn test_hotshot_event_streaming() { + setup_test(); + + let hotshot_event_streaming_port = + pick_unused_port().expect("No ports free for hotshot event streaming"); + let query_service_port = pick_unused_port().expect("No ports free for query service"); + + let url = format!("http://localhost:{hotshot_event_streaming_port}") + .parse() + .unwrap(); + + let hotshot_events = HotshotEvents { + events_service_port: hotshot_event_streaming_port, + }; + + let client: Client::Base> = Client::new(url); + + let options = Options::with_port(query_service_port).hotshot_events(hotshot_events); + + let anvil = Anvil::new().spawn(); + let l1 = anvil.endpoint().parse().unwrap(); + let network_config = TestConfigBuilder::default().l1_url(l1).build(); + let config = TestNetworkConfigBuilder::default() + .api_config(options) + .network_config(network_config) + .build(); + let 
_network = + TestNetwork::new(config, ::Base::instance()).await; + + let mut subscribed_events = client + .socket("hotshot-events/events") + .subscribe::>() + .await + .unwrap(); + + let total_count = 5; + // wait for the client to receive at least this many events + let mut receive_count = 0; + loop { + let event = subscribed_events.next().await.unwrap(); + tracing::info!( + "Received event in hotshot event streaming Client 1: {:?}", + event + ); + receive_count += 1; + if receive_count > total_count { + tracing::info!("Client received at least the desired number of events, exiting loop"); + break; + } + } + assert_eq!(receive_count, total_count + 1); + } } diff --git a/sequencer/src/api/data_source.rs b/sequencer/src/api/data_source.rs index 0518795211..7c89adc2e4 100644 --- a/sequencer/src/api/data_source.rs +++ b/sequencer/src/api/data_source.rs @@ -435,12 +435,12 @@ impl PublicNetworkConfig { } } -#[cfg(test)] -pub(crate) mod testing { +#[cfg(any(test, feature = "testing"))] +pub mod testing { use super::{super::Options, *}; #[async_trait] - pub(crate) trait TestableSequencerDataSource: SequencerDataSource { + pub trait TestableSequencerDataSource: SequencerDataSource { type Storage: Sync; async fn create_storage() -> Self::Storage; diff --git a/sequencer/src/api/fs.rs b/sequencer/src/api/fs.rs index 542eeb7fe6..4795acce9b 100644 --- a/sequencer/src/api/fs.rs +++ b/sequencer/src/api/fs.rs @@ -28,7 +28,7 @@ impl SequencerDataSource for DataSource { impl CatchupDataSource for DataSource {} -#[cfg(test)] +#[cfg(any(test, feature = "testing"))] mod impl_testable_data_source { use tempfile::TempDir; diff --git a/sequencer/src/api/options.rs b/sequencer/src/api/options.rs index 7ccaf05be4..1f55c0d415 100644 --- a/sequencer/src/api/options.rs +++ b/sequencer/src/api/options.rs @@ -1,12 +1,15 @@ //! Sequencer-specific API options and initialization. -use anyhow::bail; +use anyhow::{bail, Context}; use async_std::sync::{Arc, RwLock}; use clap::Parser; -use espresso_types::{v0::traits::SequencerPersistence, BlockMerkleTree, FeeMerkleTree, PubKey}; +use espresso_types::{ + v0::traits::{EventConsumer, NullEventConsumer, SequencerPersistence}, + BlockMerkleTree, FeeMerkleTree, PubKey, +}; use futures::{ channel::oneshot, - future::{BoxFuture, Future, FutureExt}, + future::{BoxFuture, Future}, }; use hotshot_events_service::events::Error as EventStreamingError; use hotshot_query_service::{ @@ -31,7 +34,7 @@ use super::{ StateSignatureDataSource, SubmitDataSource, }, endpoints, fs, sql, - update::update_loop, + update::ApiEventConsumer, ApiState, StorageState, }; use crate::{ @@ -153,7 +156,10 @@ impl Options { where N: ConnectedNetwork, P: SequencerPersistence, - F: FnOnce(Box) -> BoxFuture<'static, SequencerContext>, + F: FnOnce( + Box, + Box, + ) -> BoxFuture<'static, anyhow::Result>>, { // Create a channel to send the context to the web server after it is initialized. This // allows the web server to start before initialization can complete, since initialization @@ -164,89 +170,84 @@ impl Options { .await .expect("context initialized and sent over channel") }); - let init_context = move |metrics| { - let fut = init_context(metrics); - async move { - let ctx = fut.await; - if send_ctx.send(super::ConsensusState::from(&ctx)).is_err() { - tracing::warn!("API server exited without receiving context"); - } - ctx - } - .boxed() - }; let mut tasks = TaskList::default(); // The server state type depends on whether we are running a query or status API or not, so // we handle the two cases differently.
- let metrics = if let Some(query_opt) = self.query.take() { - if let Some(opt) = self.storage_sql.take() { - self.init_with_query_module_sql(query_opt, opt, state, &mut tasks, bind_version) - .await? - } else if let Some(opt) = self.storage_fs.take() { - self.init_with_query_module_fs(query_opt, opt, state, &mut tasks, bind_version) - .await? - } else { - bail!("query module requested but not storage provided"); - } - } else if self.status.is_some() { - // If a status API is requested but no availability API, we use the `MetricsDataSource`, - // which allows us to run the status API with no persistent storage. - let ds = MetricsDataSource::default(); - let metrics = ds.populate_metrics(); - let mut app = App::<_, Error>::with_state(Arc::new(RwLock::new( - ExtensibleDataSource::new(ds, state.clone()), - ))); - - // Initialize status API. - let status_api = status::define_api(&Default::default(), bind_version)?; - app.register_module("status", status_api)?; - - self.init_hotshot_modules::<_, _, _, Ver>(&mut app)?; + let (metrics, consumer): (Box, Box) = + if let Some(query_opt) = self.query.take() { + if let Some(opt) = self.storage_sql.take() { + self.init_with_query_module_sql(query_opt, opt, state, &mut tasks, bind_version) + .await? + } else if let Some(opt) = self.storage_fs.take() { + self.init_with_query_module_fs(query_opt, opt, state, &mut tasks, bind_version) + .await? + } else { + bail!("query module requested but not storage provided"); + } + } else if self.status.is_some() { + // If a status API is requested but no availability API, we use the + // `MetricsDataSource`, which allows us to run the status API with no persistent + // storage. + let ds = MetricsDataSource::default(); + let metrics = ds.populate_metrics(); + let mut app = App::<_, Error>::with_state(Arc::new(RwLock::new( + ExtensibleDataSource::new(ds, state.clone()), + ))); + + // Initialize status API. + let status_api = status::define_api(&Default::default(), bind_version)?; + app.register_module("status", status_api)?; + + self.init_hotshot_modules::<_, _, _, Ver>(&mut app)?; + + if self.hotshot_events.is_some() { + self.init_and_spawn_hotshot_event_streaming_module( + state, + &mut tasks, + bind_version, + )?; + } - if self.hotshot_events.is_some() { - self.init_and_spawn_hotshot_event_streaming_module( - state, - &mut tasks, - bind_version, - )?; - } + tasks.spawn("API server", self.listen(self.http.port, app, bind_version)); - tasks.spawn("API server", self.listen(self.http.port, app, bind_version)); - - metrics - } else { - // If no status or availability API is requested, we don't need metrics or a query - // service data source. The only app state is the HotShot handle, which we use to submit - // transactions. - // - // If we have no availability API, we cannot load a saved leaf from local storage, so we - // better have been provided the leaf ahead of time if we want it at all. - let mut app = App::<_, Error>::with_state(RwLock::new(state.clone())); - - self.init_hotshot_modules::<_, _, _, Ver>(&mut app)?; - - if self.hotshot_events.is_some() { - self.init_and_spawn_hotshot_event_streaming_module( - state, - &mut tasks, - bind_version, - )?; - } + (metrics, Box::new(NullEventConsumer)) + } else { + // If no status or availability API is requested, we don't need metrics or a query + // service data source. The only app state is the HotShot handle, which we use to + // submit transactions. 
+ // + // If we have no availability API, we cannot load a saved leaf from local storage, + // so we better have been provided the leaf ahead of time if we want it at all. + let mut app = App::<_, Error>::with_state(RwLock::new(state.clone())); + + self.init_hotshot_modules::<_, _, _, Ver>(&mut app)?; + + if self.hotshot_events.is_some() { + self.init_and_spawn_hotshot_event_streaming_module( + state, + &mut tasks, + bind_version, + )?; + } - tasks.spawn("API server", self.listen(self.http.port, app, bind_version)); + tasks.spawn("API server", self.listen(self.http.port, app, bind_version)); - Box::new(NoMetrics) - }; + (Box::new(NoMetrics), Box::new(NullEventConsumer)) + }; - Ok(init_context(metrics).await.with_task_list(tasks)) + let ctx = init_context(metrics, consumer).await?; + send_ctx + .send(super::ConsensusState::from(&ctx)) + .ok() + .context("API server exited without receiving context")?; + Ok(ctx.with_task_list(tasks)) } async fn init_app_modules( &self, ds: D, state: ApiState, - tasks: &mut TaskList, bind_version: Ver, ) -> anyhow::Result<( Box, @@ -277,12 +278,6 @@ impl Options { app.register_module("node", endpoints::node(bind_version)?)?; self.init_hotshot_modules::<_, _, _, Ver>(&mut app)?; - - tasks.spawn( - "query storage updater", - update_loop(ds.clone(), state.event_stream()), - ); - Ok((metrics, ds, app)) } @@ -293,7 +288,7 @@ impl Options { state: ApiState, tasks: &mut TaskList, bind_version: Ver, - ) -> anyhow::Result> + ) -> anyhow::Result<(Box, Box)> where N: ConnectedNetwork, P: SequencerPersistence, @@ -305,8 +300,8 @@ impl Options { ) .await?; - let (metrics, _, app) = self - .init_app_modules(ds, state.clone(), tasks, bind_version) + let (metrics, ds, app) = self + .init_app_modules(ds, state.clone(), bind_version) .await?; if self.hotshot_events.is_some() { @@ -317,7 +312,7 @@ impl Options { "API server", self.listen(self.http.port, app, Ver::instance()), ); - Ok(metrics) + Ok((metrics, Box::new(ApiEventConsumer::from(ds)))) } async fn init_with_query_module_sql( @@ -327,7 +322,7 @@ impl Options { state: ApiState, tasks: &mut TaskList, bind_version: Ver, - ) -> anyhow::Result> + ) -> anyhow::Result<(Box, Box)> where N: ConnectedNetwork, P: SequencerPersistence, @@ -339,7 +334,7 @@ impl Options { ) .await?; let (metrics, ds, mut app) = self - .init_app_modules(ds, state.clone(), tasks, bind_version) + .init_app_modules(ds, state.clone(), bind_version) .await?; if self.explorer.is_some() { @@ -362,7 +357,7 @@ impl Options { let get_node_state = async move { state.node_state().await.clone() }; tasks.spawn( "merklized state storage update loop", - update_state_storage_loop(ds, get_node_state), + update_state_storage_loop(ds.clone(), get_node_state), ); } @@ -374,7 +369,7 @@ impl Options { "API server", self.listen(self.http.port, app, Ver::instance()), ); - Ok(metrics) + Ok((metrics, Box::new(ApiEventConsumer::from(ds)))) } /// Initialize the modules for interacting with HotShot. diff --git a/sequencer/src/api/sql.rs b/sequencer/src/api/sql.rs index 409890352a..663d8f5ada 100644 --- a/sequencer/src/api/sql.rs +++ b/sequencer/src/api/sql.rs @@ -181,7 +181,7 @@ impl ChainConfigPersistence for DataSource { } } -#[cfg(test)] +#[cfg(any(test, feature = "testing"))] mod impl_testable_data_source { use hotshot_query_service::data_source::storage::sql::testing::TmpDb; diff --git a/sequencer/src/api/update.rs b/sequencer/src/api/update.rs index 4efb4eba90..b0d7916b4b 100644 --- a/sequencer/src/api/update.rs +++ b/sequencer/src/api/update.rs @@ -1,41 +1,59 @@ //! 
Update loop for query API state. use async_std::sync::{Arc, RwLock}; -use espresso_types::{v0::traits::SequencerPersistence, PubKey}; -use futures::stream::{Stream, StreamExt}; +use async_trait::async_trait; +use derivative::Derivative; +use derive_more::From; +use espresso_types::{ + v0::traits::{EventConsumer, SequencerPersistence}, + PubKey, +}; use hotshot::types::Event; use hotshot_query_service::data_source::{UpdateDataSource, VersionedDataSource}; use hotshot_types::traits::network::ConnectedNetwork; +use std::fmt::Debug; use vbs::version::StaticVersionType; use super::{data_source::SequencerDataSource, StorageState}; use crate::SeqTypes; -pub(super) async fn update_loop( - state: Arc>>, - mut events: impl Stream> + Unpin, -) where +#[derive(Derivative, From)] +#[derivative(Clone(bound = ""), Debug(bound = "D: Debug"))] +pub(crate) struct ApiEventConsumer +where N: ConnectedNetwork, P: SequencerPersistence, - D: SequencerDataSource + Send + Sync, + Ver: StaticVersionType, +{ + inner: Arc>>, +} + +#[async_trait] +impl EventConsumer for ApiEventConsumer +where + N: ConnectedNetwork, + P: SequencerPersistence, + D: SequencerDataSource + Debug + Send + Sync, + Ver: StaticVersionType, { - tracing::debug!("waiting for event"); - while let Some(event) = events.next().await { - let mut state = state.write().await; + async fn handle_event(&self, event: &Event) -> anyhow::Result<()> { + let mut state = self.inner.write().await; // If update results in an error, revert to undo partial state changes. We will continue // streaming events, as we can update our state based on future events and then filling in // the missing part of the state later, by fetching from a peer. - if let Err(err) = update_state(&mut *state, &event).await { + if let Err(err) = update_state(&mut *state, event).await { tracing::error!( ?event, %err, "failed to update API state", ); state.revert().await; + Err(err) + } else { + Ok(()) } } - tracing::warn!("end of HotShot event stream, updater task will exit"); } async fn update_state( diff --git a/sequencer/src/context.rs b/sequencer/src/context.rs index 7d353ff487..68caf5f71d 100644 --- a/sequencer/src/context.rs +++ b/sequencer/src/context.rs @@ -7,8 +7,8 @@ use async_std::{ }; use derivative::Derivative; use espresso_types::{ - v0::traits::SequencerPersistence, NodeState, PubKey, SequencerVersions, Transaction, - ValidatedState, + v0::traits::{EventConsumer as PersistenceEventConsumer, SequencerPersistence}, + NodeState, PubKey, SequencerVersions, Transaction, ValidatedState, }; use futures::{ future::{join_all, Future}, @@ -40,6 +40,7 @@ use crate::{ state_signature::StateSigner, static_stake_table_commitment, Node, SeqTypes, }; + /// The consensus handle pub type Consensus = SystemContextHandle, SequencerVersions>; @@ -89,12 +90,13 @@ impl, P: SequencerPersistence, Ver: StaticVersionTyp metrics: &dyn Metrics, stake_table_capacity: u64, public_api_url: Option, + event_consumer: impl PersistenceEventConsumer + 'static, _: Ver, marketplace_config: MarketplaceConfig>, ) -> anyhow::Result { let config = &network_config.config; let pub_key = config.my_own_validator_config.public_key; - tracing::info!(%pub_key, "initializing consensus"); + tracing::info!(%pub_key, is_da = config.my_own_validator_config.is_da, "initializing consensus"); // Stick our node ID in `metrics` so it is easily accessible via the status API. metrics @@ -102,7 +104,7 @@ impl, P: SequencerPersistence, Ver: StaticVersionTyp .set(instance_state.node_id as usize); // Load saved consensus state from storage. 
- let initializer = persistence + let (initializer, anchor_view) = persistence .load_consensus_state(instance_state.clone()) .await?; @@ -166,9 +168,11 @@ impl, P: SequencerPersistence, Ver: StaticVersionTyp let roll_call_info = external_event_handler::RollCallInfo { public_api_url }; // Create the external event handler - let external_event_handler = ExternalEventHandler::new(network, roll_call_info, pub_key) - .await - .with_context(|| "Failed to create external event handler")?; + let mut tasks = TaskList::default(); + let external_event_handler = + ExternalEventHandler::new(&mut tasks, network, roll_call_info, pub_key) + .await + .with_context(|| "Failed to create external event handler")?; Ok(Self::new( handle, @@ -178,10 +182,14 @@ impl, P: SequencerPersistence, Ver: StaticVersionTyp event_streamer, instance_state, network_config, - )) + event_consumer, + anchor_view, + ) + .with_task_list(tasks)) } /// Constructor + #[allow(clippy::too_many_arguments)] fn new( handle: Consensus, persistence: Arc>, @@ -190,9 +198,12 @@ impl, P: SequencerPersistence, Ver: StaticVersionTyp event_streamer: Arc>>, node_state: NodeState, config: NetworkConfig, + event_consumer: impl PersistenceEventConsumer + 'static, + anchor_view: Option, ) -> Self { let events = handle.event_stream(); + let node_id = node_state.node_id; let mut ctx = Self { handle: Arc::new(RwLock::new(handle)), state_signer: Arc::new(state_signer), @@ -206,11 +217,14 @@ impl, P: SequencerPersistence, Ver: StaticVersionTyp ctx.spawn( "main event handler", handle_events( + node_id, events, persistence, ctx.state_signer.clone(), external_event_handler, Some(event_streamer.clone()), + event_consumer, + anchor_view, ), ); @@ -270,6 +284,10 @@ impl, P: SequencerPersistence, Ver: StaticVersionTyp self.handle.read().await.decided_state().await } + pub fn node_id(&self) -> u64 { + self.node_state.node_id + } + pub fn node_state(&self) -> NodeState { self.node_state.clone() } @@ -332,20 +350,35 @@ impl, P: SequencerPersistence, Ver: StaticVersionTyp } } +#[allow(clippy::too_many_arguments)] async fn handle_events( + node_id: u64, mut events: impl Stream> + Unpin, persistence: Arc>, state_signer: Arc>, external_event_handler: ExternalEventHandler, events_streamer: Option>>>, + event_consumer: impl PersistenceEventConsumer + 'static, + anchor_view: Option, ) { + if let Some(view) = anchor_view { + // Process and clean up any leaves that we may have persisted last time we were running but + // failed to handle due to a shutdown. + let mut p = persistence.write().await; + if let Err(err) = p.append_decided_leaves(view, vec![], &event_consumer).await { + tracing::warn!( + "failed to process decided leaves, chain may not be up to date: {err:#}" + ); + } + } + while let Some(event) = events.next().await { - tracing::debug!(?event, "consensus event"); + tracing::debug!(node_id, ?event, "consensus event"); { let mut p = persistence.write().await; // Store latest consensus state. - p.handle_event(&event).await; + p.handle_event(&event, &event_consumer).await; } // Generate state signature. 
state_signer.handle_event(&event).await; } diff --git a/sequencer/src/external_event_handler.rs b/sequencer/src/external_event_handler.rs index 05f7073100..1f930a16ca 100644 --- a/sequencer/src/external_event_handler.rs +++ b/sequencer/src/external_event_handler.rs @@ -40,9 +40,6 @@ pub struct ExternalEventHandler { // The public key of the node pub public_key: BLSPubKey, - // The tasks that are running - pub _tasks: TaskList, - // The outbound message queue pub outbound_message_sender: Sender, } @@ -57,6 +54,7 @@ pub enum OutboundMessage { impl ExternalEventHandler { /// Creates a new `ExternalEventHandler` with the given network and roll call info pub async fn new>( + tasks: &mut TaskList, network: Arc, roll_call_info: RollCallInfo, public_key: BLSPubKey, @@ -65,8 +63,6 @@ impl ExternalEventHandler { let (outbound_message_sender, outbound_message_receiver) = async_compatibility_layer::channel::bounded(10); - let mut tasks: TaskList = Default::default(); - // Spawn the outbound message handling loop tasks.spawn( "ExternalEventHandler (RollCall)", @@ -88,7 +84,6 @@ impl ExternalEventHandler { Ok(Self { roll_call_info, public_key, - _tasks: tasks, outbound_message_sender, }) } diff --git a/sequencer/src/lib.rs b/sequencer/src/lib.rs index b62e635ec7..6b8a367406 100644 --- a/sequencer/src/lib.rs +++ b/sequencer/src/lib.rs @@ -15,7 +15,7 @@ use async_std::sync::RwLock; use catchup::StatePeers; use context::SequencerContext; use espresso_types::{ - BackoffParams, L1Client, NodeState, PubKey, SeqTypes, SequencerVersions, + traits::EventConsumer, BackoffParams, L1Client, NodeState, PubKey, SeqTypes, SequencerVersions, SolverAuctionResultsProvider, ValidatedState, }; use ethers::types::U256; @@ -134,6 +134,7 @@ pub async fn init_node( persistence_opt: P, l1_params: L1Params, bind_version: Ver, + event_consumer: impl EventConsumer + 'static, is_da: bool, identity: Identity, marketplace_config: MarketplaceConfig>, @@ -319,7 +320,7 @@ pub async fn init_node( }, CdnMetricsValue::new(metrics), ) - .with_context(|| "Failed to create CDN network")?; + .with_context(|| format!("Failed to create CDN network on node {node_index}"))?; // Initialize the Libp2p network (if enabled) #[cfg(feature = "libp2p")] @@ -334,7 +335,12 @@ pub async fn init_node( hotshot::traits::implementations::Libp2pMetricsValue::new(metrics), ) .await - .with_context(|| "Failed to create libp2p network")?; + .with_context(|| { + format!( + "Failed to create libp2p network on node {node_index}; binding to {:?}", + network_params.libp2p_bind_address + ) + })?; tracing::warn!("Waiting for at least one connection to be initialized"); futures::select!
{ @@ -408,6 +414,7 @@ pub async fn init_node( metrics, genesis.stake_table.capacity, network_params.public_api_url, + event_consumer, bind_version, marketplace_config, ) @@ -430,7 +437,7 @@ pub mod testing { use espresso_types::{ eth_signature_key::EthKeyPair, mock::MockStateCatchup, - v0::traits::{PersistenceOptions, StateCatchup}, + v0::traits::{EventConsumer, NullEventConsumer, PersistenceOptions, StateCatchup}, Event, FeeAccount, PubKey, SeqTypes, Transaction, Upgrade, }; use futures::{ @@ -662,6 +669,7 @@ pub mod testing { MockStateCatchup::default(), &NoMetrics, STAKE_TABLE_CAPACITY_FOR_TEST, + NullEventConsumer, bind_version, Default::default(), ) @@ -699,6 +707,7 @@ pub mod testing { catchup: impl StateCatchup + 'static, metrics: &dyn Metrics, stake_table_capacity: u64, + event_consumer: impl EventConsumer + 'static, bind_version: Ver, upgrades: BTreeMap, ) -> SequencerContext { @@ -760,6 +769,7 @@ pub mod testing { metrics, stake_table_capacity, None, // The public API URL + event_consumer, bind_version, MarketplaceConfig::> { auction_results_provider: Arc::new(SolverAuctionResultsProvider( diff --git a/sequencer/src/main.rs b/sequencer/src/main.rs index 61317c6881..20cf2dca03 100644 --- a/sequencer/src/main.rs +++ b/sequencer/src/main.rs @@ -1,13 +1,14 @@ use std::{net::ToSocketAddrs, sync::Arc}; use clap::Parser; -use espresso_types::{SequencerVersions, SolverAuctionResultsProvider}; +use espresso_types::{traits::NullEventConsumer, SequencerVersions, SolverAuctionResultsProvider}; use futures::future::FutureExt; use hotshot::MarketplaceConfig; use hotshot_types::traits::{metrics::NoMetrics, node_implementation::Versions}; use sequencer::{ api::{self, data_source::DataSourceOptions}, - init_node, + context::SequencerContext, + init_node, network, options::{Modules, Options}, persistence, Genesis, L1Params, NetworkParams, }; @@ -22,7 +23,7 @@ async fn main() -> anyhow::Result<()> { tracing::warn!(?modules, "sequencer starting up"); if let Some(storage) = modules.storage_fs.take() { - init_with_storage( + run_with_storage( modules, opt, storage, @@ -30,7 +31,7 @@ async fn main() -> anyhow::Result<()> { ) .await } else if let Some(storage) = modules.storage_sql.take() { - init_with_storage( + run_with_storage( modules, opt, storage, @@ -39,7 +40,7 @@ async fn main() -> anyhow::Result<()> { .await } else { // Persistence is required. If none is provided, just use the local file system. - init_with_storage( + run_with_storage( modules, opt, persistence::fs::Options::default(), @@ -49,12 +50,30 @@ async fn main() -> anyhow::Result<()> { } } -async fn init_with_storage( +async fn run_with_storage( modules: Modules, opt: Options, storage_opt: S, bind_version: Ver, ) -> anyhow::Result<()> +where + S: DataSourceOptions, +{ + let ctx = init_with_storage(modules, opt, storage_opt, bind_version).await?; + + // Start doing consensus. 
+ ctx.start_consensus().await; + ctx.join().await; + + Ok(()) +} + +async fn init_with_storage( + modules: Modules, + opt: Options, + storage_opt: S, + bind_version: Ver, +) -> anyhow::Result> where S: DataSourceOptions, { @@ -140,7 +159,7 @@ where http_opt .serve( - move |metrics| { + move |metrics, consumer| { async move { init_node( genesis, @@ -149,12 +168,12 @@ where storage_opt, l1_params, bind_version, + consumer, opt.is_da, opt.identity, marketplace_config, ) .await - .unwrap() } .boxed() }, @@ -170,6 +189,7 @@ where storage_opt, l1_params, bind_version, + NullEventConsumer, opt.is_da, opt.identity, marketplace_config, @@ -178,13 +198,11 @@ where } }; - // Start doing consensus. - ctx.start_consensus().await; - ctx.join().await; - - Ok(()) + Ok(ctx) } +mod restart_tests; + #[cfg(test)] mod test { use std::time::Duration; diff --git a/sequencer/src/persistence.rs b/sequencer/src/persistence.rs index 6611e2e75c..7a7b031929 100644 --- a/sequencer/src/persistence.rs +++ b/sequencer/src/persistence.rs @@ -46,12 +46,16 @@ mod testing { mod persistence_tests { use std::collections::BTreeMap; + use anyhow::bail; + use async_std::sync::{Arc, RwLock}; use committable::Committable; - use espresso_types::{Leaf, NodeState, PubKey, SeqTypes, ValidatedState}; + use espresso_types::{ + traits::EventConsumer, Event, Leaf, NodeState, PubKey, SeqTypes, ValidatedState, + }; use hotshot::types::{BLSPubKey, SignatureKey}; use hotshot_types::{ data::{DaProposal, QuorumProposal, VidDisperseShare, ViewNumber}, - event::HotShotAction, + event::{EventType, HotShotAction, LeafInfo}, message::Proposal, simple_certificate::QuorumCertificate, traits::{node_implementation::ConsensusTime, EncodeBytes}, @@ -63,43 +67,17 @@ mod persistence_tests { use super::*; - #[async_std::test] - pub async fn test_anchor_leaf() { - setup_test(); - - let tmp = P::tmp_storage().await; - let mut storage = P::connect(&tmp).await; - - // Initially, there is no saved leaf. - assert_eq!(storage.load_anchor_leaf().await.unwrap(), None); - - // Store a leaf. - let leaf1 = Leaf::genesis(&ValidatedState::default(), &NodeState::mock()).await; - let qc1 = QuorumCertificate::genesis(&ValidatedState::default(), &NodeState::mock()).await; - storage.save_anchor_leaf(&leaf1, &qc1).await.unwrap(); - assert_eq!( - storage.load_anchor_leaf().await.unwrap().unwrap(), - (leaf1.clone(), qc1.clone()) - ); - - // Store a newer leaf, make sure storage gets updated. - let mut leaf2 = leaf1.clone(); - *leaf2.block_header_mut().height_mut() += 1; - let mut qc2 = qc1.clone(); - qc2.data.leaf_commit = leaf2.commit(); - qc2.vote_commitment = qc2.data.commit(); - storage.save_anchor_leaf(&leaf2, &qc2).await.unwrap(); - assert_eq!( - storage.load_anchor_leaf().await.unwrap().unwrap(), - (leaf2.clone(), qc2.clone()) - ); + #[derive(Clone, Debug, Default)] + struct EventCollector { + events: Arc>>, + } - // Store an old leaf, make sure storage is unchanged. 
- storage.save_anchor_leaf(&leaf1, &qc1).await.unwrap(); - assert_eq!( - storage.load_anchor_leaf().await.unwrap().unwrap(), - (leaf2, qc2) - ); + #[async_trait] + impl EventConsumer for EventCollector { + async fn handle_event(&self, event: &Event) -> anyhow::Result<()> { + self.events.write().await.push(event.clone()); + Ok(()) + } } #[async_std::test] @@ -145,8 +123,17 @@ mod persistence_tests { ); } + fn leaf_info(leaf: Leaf) -> LeafInfo { + LeafInfo { + leaf, + vid_share: None, + state: Default::default(), + delta: None, + } + } + #[async_std::test] - pub async fn test_append_and_collect_garbage() { + pub async fn test_append_and_decide() { setup_test(); let tmp = P::tmp_storage().await; @@ -167,7 +154,7 @@ mod persistence_tests { let (pubkey, privkey) = BLSPubKey::generated_from_seed_indexed([0; 32], 1); let signature = PubKey::sign(&privkey, &[]).unwrap(); let mut vid = VidDisperseShare:: { - view_number: ViewNumber::new(1), + view_number: ViewNumber::new(0), payload_commitment: Default::default(), share: disperse.shares[0].clone(), common: disperse.common, @@ -189,13 +176,23 @@ mod persistence_tests { _pd: Default::default(), }; - let vid_share1 = vid.clone().to_proposal(&privkey).unwrap().clone(); + let vid_share0 = vid.clone().to_proposal(&privkey).unwrap().clone(); + + storage.append_vid(&vid_share0).await.unwrap(); + + assert_eq!( + storage.load_vid_share(ViewNumber::new(0)).await.unwrap(), + Some(vid_share0.clone()) + ); + vid.view_number = ViewNumber::new(1); + + let vid_share1 = vid.clone().to_proposal(&privkey).unwrap().clone(); storage.append_vid(&vid_share1).await.unwrap(); assert_eq!( - storage.load_vid_share(ViewNumber::new(1)).await.unwrap(), - Some(vid_share1) + storage.load_vid_share(vid.view_number).await.unwrap(), + Some(vid_share1.clone()) ); vid.view_number = ViewNumber::new(2); @@ -205,7 +202,7 @@ mod persistence_tests { assert_eq!( storage.load_vid_share(vid.view_number).await.unwrap(), - Some(vid_share2) + Some(vid_share2.clone()) ); vid.view_number = ViewNumber::new(3); @@ -224,7 +221,7 @@ mod persistence_tests { let da_proposal_inner = DaProposal:: { encoded_transactions: leaf_payload_bytes_arc, metadata: leaf_payload.ns_table().clone(), - view_number: ViewNumber::new(1), + view_number: ViewNumber::new(0), }; let da_proposal = Proposal { @@ -236,11 +233,23 @@ mod persistence_tests { storage.append_da(&da_proposal).await.unwrap(); assert_eq!( - storage.load_da_proposal(ViewNumber::new(1)).await.unwrap(), + storage.load_da_proposal(ViewNumber::new(0)).await.unwrap(), Some(da_proposal.clone()) ); - let mut da_proposal2 = da_proposal.clone(); + let mut da_proposal1 = da_proposal.clone(); + da_proposal1.data.view_number = ViewNumber::new(1); + storage.append_da(&da_proposal1.clone()).await.unwrap(); + + assert_eq!( + storage + .load_da_proposal(da_proposal1.data.view_number) + .await + .unwrap(), + Some(da_proposal1.clone()) + ); + + let mut da_proposal2 = da_proposal1.clone(); da_proposal2.data.view_number = ViewNumber::new(2); storage.append_da(&da_proposal2.clone()).await.unwrap(); @@ -272,10 +281,7 @@ mod persistence_tests { assert_eq!( storage.load_quorum_proposals().await.unwrap(), - Some(BTreeMap::from_iter([( - ViewNumber::genesis(), - quorum_proposal1.clone() - )])) + BTreeMap::from_iter([(ViewNumber::genesis(), quorum_proposal1.clone())]) ); quorum_proposal.data.view_number = ViewNumber::new(1); @@ -287,13 +293,14 @@ mod persistence_tests { assert_eq!( storage.load_quorum_proposals().await.unwrap(), - Some(BTreeMap::from_iter([ + BTreeMap::from_iter([ 
(ViewNumber::genesis(), quorum_proposal1.clone()), (ViewNumber::new(1), quorum_proposal2.clone()) - ])) + ]) ); quorum_proposal.data.view_number = ViewNumber::new(2); + quorum_proposal.data.justify_qc.view_number = ViewNumber::new(1); let quorum_proposal3 = quorum_proposal.clone(); storage .append_quorum_proposal(&quorum_proposal3) @@ -302,14 +309,15 @@ mod persistence_tests { assert_eq!( storage.load_quorum_proposals().await.unwrap(), - Some(BTreeMap::from_iter([ + BTreeMap::from_iter([ (ViewNumber::genesis(), quorum_proposal1.clone()), (ViewNumber::new(1), quorum_proposal2.clone()), (ViewNumber::new(2), quorum_proposal3.clone()) - ])) + ]) ); - quorum_proposal.data.view_number = ViewNumber::new(10); + quorum_proposal.data.view_number = ViewNumber::new(3); + quorum_proposal.data.justify_qc.view_number = ViewNumber::new(2); // This one should stick around after GC runs. let quorum_proposal4 = quorum_proposal.clone(); @@ -320,17 +328,48 @@ mod persistence_tests { assert_eq!( storage.load_quorum_proposals().await.unwrap(), - Some(BTreeMap::from_iter([ - (ViewNumber::genesis(), quorum_proposal1), - (ViewNumber::new(1), quorum_proposal2), - (ViewNumber::new(2), quorum_proposal3), - (ViewNumber::new(10), quorum_proposal4.clone()) - ])) + BTreeMap::from_iter([ + (ViewNumber::genesis(), quorum_proposal1.clone()), + (ViewNumber::new(1), quorum_proposal2.clone()), + (ViewNumber::new(2), quorum_proposal3.clone()), + (ViewNumber::new(3), quorum_proposal4.clone()) + ]) ); - // Test garbage collection - // Deleting da proposals and vid shares with view number <=2 - storage.collect_garbage(ViewNumber::new(2)).await.unwrap(); + // Test decide and garbage collection. Pass in a leaf chain with no VID shares or payloads, + // so we have to fetch the missing data from storage. + let leaves = [ + Leaf::from_quorum_proposal(&quorum_proposal1.data), + Leaf::from_quorum_proposal(&quorum_proposal2.data), + Leaf::from_quorum_proposal(&quorum_proposal3.data), + Leaf::from_quorum_proposal(&quorum_proposal4.data), + ]; + let mut final_qc = leaves[3].justify_qc(); + final_qc.view_number += 1; + final_qc.data.leaf_commit = leaf.commit(); + let qcs = [ + leaves[1].justify_qc(), + leaves[2].justify_qc(), + leaves[3].justify_qc(), + final_qc, + ]; + + let consumer = EventCollector::default(); + let leaf_chain = leaves + .iter() + .take(3) + .map(|leaf| leaf_info(leaf.clone())) + .zip(&qcs) + .collect::>(); + tracing::info!(?leaf_chain, "decide view 2"); + storage + .append_decided_leaves( + ViewNumber::new(2), + leaf_chain.iter().map(|(leaf, qc)| (leaf, (*qc).clone())), + &consumer, + ) + .await + .unwrap(); for i in 0..=2 { assert_eq!( @@ -351,16 +390,244 @@ mod persistence_tests { assert_eq!( storage.load_vid_share(ViewNumber::new(3)).await.unwrap(), - Some(vid_share3) + Some(vid_share3.clone()) ); let proposals = storage.load_quorum_proposals().await.unwrap(); assert_eq!( proposals, - Some(BTreeMap::from_iter([( - ViewNumber::new(10), - quorum_proposal4 - )])) - ) + BTreeMap::from_iter([(ViewNumber::new(3), quorum_proposal4)]) + ); + + // A decide event should have been processed. + let events = consumer.events.read().await; + assert_eq!(events.len(), 1); + assert_eq!(events[0].view_number, ViewNumber::new(2)); + let EventType::Decide { qc, leaf_chain, .. 
} = &events[0].event else { + panic!("expected decide event, got {:?}", events[0]); + }; + assert_eq!(**qc, qcs[2]); + assert_eq!(leaf_chain.len(), 3, "{leaf_chain:#?}"); + for (leaf, info) in leaves.iter().zip(leaf_chain.iter().rev()) { + assert_eq!(info.leaf, *leaf); + let decided_vid_share = info.vid_share.as_ref().unwrap(); + assert_eq!(decided_vid_share.view_number, leaf.view_number()); + } + + // The decided leaf should not have been garbage collected. + assert_eq!( + storage.load_anchor_leaf().await.unwrap(), + Some((leaves[2].clone(), qcs[2].clone())) + ); + + // Process a second decide event. + let consumer = EventCollector::default(); + tracing::info!(leaf = ?leaves[3], qc = ?qcs[3], "decide view 3"); + storage + .append_decided_leaves( + ViewNumber::new(3), + vec![(&leaf_info(leaves[3].clone()), qcs[3].clone())], + &consumer, + ) + .await + .unwrap(); + + // A decide event should have been processed. + let events = consumer.events.read().await; + assert_eq!(events.len(), 1); + assert_eq!(events[0].view_number, ViewNumber::new(3)); + let EventType::Decide { qc, leaf_chain, .. } = &events[0].event else { + panic!("expected decide event, got {:?}", events[0]); + }; + assert_eq!(**qc, qcs[3]); + assert_eq!(leaf_chain.len(), 1); + let info = &leaf_chain[0]; + assert_eq!(info.leaf, leaves[3]); + + // The remaining data should have been GCed. + assert_eq!( + storage.load_da_proposal(ViewNumber::new(3)).await.unwrap(), + None + ); + + assert_eq!( + storage.load_vid_share(ViewNumber::new(3)).await.unwrap(), + None + ); + assert_eq!( + storage.load_quorum_proposals().await.unwrap(), + BTreeMap::new() + ); + } + + #[async_std::test] + pub async fn test_decide_with_failing_event_consumer() { + #[derive(Clone, Copy, Debug)] + struct FailConsumer; + + #[async_trait] + impl EventConsumer for FailConsumer { + async fn handle_event(&self, _: &Event) -> anyhow::Result<()> { + bail!("mock error injection"); + } + } + + setup_test(); + + let tmp = P::tmp_storage().await; + let mut storage = P::connect(&tmp).await; + + // Create a short blockchain. 
+ let mut chain = vec![]; + + let leaf = Leaf::genesis(&ValidatedState::default(), &NodeState::mock()).await; + let leaf_payload = leaf.block_payload().unwrap(); + let leaf_payload_bytes_arc = leaf_payload.encode(); + let disperse = vid_scheme(2) + .disperse(leaf_payload_bytes_arc.clone()) + .unwrap(); + let (pubkey, privkey) = BLSPubKey::generated_from_seed_indexed([0; 32], 1); + let mut vid = VidDisperseShare:: { + view_number: ViewNumber::new(0), + payload_commitment: Default::default(), + share: disperse.shares[0].clone(), + common: disperse.common, + recipient_key: pubkey, + } + .to_proposal(&privkey) + .unwrap() + .clone(); + let mut quorum_proposal = QuorumProposal:: { + block_header: leaf.block_header().clone(), + view_number: ViewNumber::genesis(), + justify_qc: QuorumCertificate::genesis(&ValidatedState::default(), &NodeState::mock()) + .await, + upgrade_certificate: None, + proposal_certificate: None, + }; + let mut qc = + QuorumCertificate::genesis(&ValidatedState::default(), &NodeState::mock()).await; + + let block_payload_signature = BLSPubKey::sign(&privkey, &leaf_payload_bytes_arc) + .expect("Failed to sign block payload"); + let mut da_proposal = Proposal { + data: DaProposal:: { + encoded_transactions: leaf_payload_bytes_arc, + metadata: leaf_payload.ns_table().clone(), + view_number: ViewNumber::new(0), + }, + signature: block_payload_signature, + _pd: Default::default(), + }; + + for i in 0..4 { + quorum_proposal.view_number = ViewNumber::new(i); + let leaf = Leaf::from_quorum_proposal(&quorum_proposal); + qc.view_number = leaf.view_number(); + qc.data.leaf_commit = leaf.commit(); + vid.data.view_number = leaf.view_number(); + da_proposal.data.view_number = leaf.view_number(); + chain.push((leaf.clone(), qc.clone(), vid.clone(), da_proposal.clone())); + } + + // Add proposals. + for (_, _, vid, da) in &chain { + storage.append_da(da).await.unwrap(); + storage.append_vid(vid).await.unwrap(); + } + + // Decide 2 leaves, but fail in event processing. + let leaf_chain = chain + .iter() + .take(2) + .map(|(leaf, qc, _, _)| (leaf_info(leaf.clone()), qc.clone())) + .collect::>(); + storage + .append_decided_leaves( + ViewNumber::new(1), + leaf_chain.iter().map(|(leaf, qc)| (leaf, qc.clone())), + &FailConsumer, + ) + .await + .unwrap(); + // No garbage collection should have run. + for i in 0..4 { + assert!(storage + .load_vid_share(ViewNumber::new(i)) + .await + .unwrap() + .is_some()); + assert!(storage + .load_da_proposal(ViewNumber::new(i)) + .await + .unwrap() + .is_some()); + } + assert_eq!( + storage + .load_anchor_leaf() + .await + .unwrap() + .unwrap() + .0 + .view_number(), + ViewNumber::new(1) + ); + + // Now decide remaining leaves successfully. We should now garbage collect and process a + // decide event for all the leaves. + let consumer = EventCollector::default(); + let leaf_chain = chain + .iter() + .skip(2) + .map(|(leaf, qc, _, _)| (leaf_info(leaf.clone()), qc.clone())) + .collect::>(); + storage + .append_decided_leaves( + ViewNumber::new(3), + leaf_chain.iter().map(|(leaf, qc)| (leaf, qc.clone())), + &consumer, + ) + .await + .unwrap(); + // Garbage collection should have run. + for i in 0..4 { + assert!(storage + .load_vid_share(ViewNumber::new(i)) + .await + .unwrap() + .is_none()); + assert!(storage + .load_da_proposal(ViewNumber::new(i)) + .await + .unwrap() + .is_none()); + } + assert_eq!( + storage + .load_anchor_leaf() + .await + .unwrap() + .unwrap() + .0 + .view_number(), + ViewNumber::new(3) + ); + + // Check decide event. 
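// Expected shape, for reference: a single Decide event at view 3 whose QC is the last QC in
// `chain`, and whose leaf chain covers all four leaves in reverse chronological order, each
// carrying the VID share and full block payload recovered from storage.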
+ let events = consumer.events.read().await; + assert_eq!(events.len(), 1); + assert_eq!(events[0].view_number, ViewNumber::new(3)); + let EventType::Decide { qc, leaf_chain, .. } = &events[0].event else { + panic!("expected decide event, got {:?}", events[0]); + }; + assert_eq!(**qc, chain[3].1); + assert_eq!(leaf_chain.len(), 4, "{leaf_chain:#?}"); + for ((leaf, _, _, _), info) in chain.iter().zip(leaf_chain.iter().rev()) { + assert_eq!(info.leaf, *leaf); + let decided_vid_share = info.vid_share.as_ref().unwrap(); + assert_eq!(decided_vid_share.view_number, leaf.view_number()); + assert!(info.leaf.block_payload().is_some()); + } } } diff --git a/sequencer/src/persistence/fs.rs b/sequencer/src/persistence/fs.rs index 16084f56be..d4270ea15f 100644 --- a/sequencer/src/persistence/fs.rs +++ b/sequencer/src/persistence/fs.rs @@ -1,27 +1,27 @@ -use std::{ - collections::BTreeMap, - fs::{self, File, OpenOptions}, - io::{Read, Seek, SeekFrom, Write}, - path::{Path, PathBuf}, -}; - use anyhow::{anyhow, Context}; +use async_std::sync::Arc; use async_trait::async_trait; use clap::Parser; use espresso_types::{ - v0::traits::{PersistenceOptions, SequencerPersistence}, - Leaf, NetworkConfig, SeqTypes, + v0::traits::{EventConsumer, PersistenceOptions, SequencerPersistence}, + Leaf, NetworkConfig, Payload, SeqTypes, }; use hotshot_types::{ consensus::CommitmentMap, data::{DaProposal, QuorumProposal, VidDisperseShare}, - event::HotShotAction, + event::{Event, EventType, HotShotAction, LeafInfo}, message::Proposal, simple_certificate::QuorumCertificate, - traits::node_implementation::ConsensusTime, + traits::{block_contents::BlockPayload, node_implementation::ConsensusTime}, utils::View, vote::HasViewNumber, }; +use std::{ + collections::BTreeMap, + fs::{self, File, OpenOptions}, + io::{Read, Seek, SeekFrom, Write}, + path::{Path, PathBuf}, +}; use crate::ViewNumber; @@ -87,7 +87,13 @@ impl Persistence { self.path.join("highest_voted_view") } - fn anchor_leaf_path(&self) -> PathBuf { + /// Path to a directory containing decided leaves. + fn decided_leaf_path(&self) -> PathBuf { + self.path.join("decided_leaves") + } + + /// The path from previous versions where there was only a single file for anchor leaves. + fn legacy_anchor_leaf_path(&self) -> PathBuf { self.path.join("anchor_leaf") } @@ -176,35 +182,6 @@ impl SequencerPersistence for Persistence { Ok(cfg.to_file(path.display().to_string())?) } - async fn collect_garbage(&mut self, view: ViewNumber) -> anyhow::Result<()> { - let view_number = view.u64(); - - let delete_files = |dir_path: PathBuf| -> anyhow::Result<()> { - if !dir_path.is_dir() { - return Ok(()); - } - - for entry in fs::read_dir(dir_path)? 
{ - let entry = entry?; - let path = entry.path(); - - if let Some(file) = path.file_stem().and_then(|n| n.to_str()) { - if let Ok(v) = file.parse::() { - if v <= view_number { - fs::remove_file(&path)?; - } - } - } - } - - Ok(()) - }; - - delete_files(self.da_dir_path())?; - delete_files(self.vid_dir_path())?; - delete_files(self.quorum_proposals_dir_path()) - } - async fn load_latest_acted_view(&self) -> anyhow::Result> { let path = self.voted_view_path(); if !path.is_file() { @@ -216,66 +193,126 @@ impl SequencerPersistence for Persistence { Ok(Some(ViewNumber::new(u64::from_le_bytes(bytes)))) } - async fn save_anchor_leaf( + async fn append_decided_leaves( &mut self, - leaf: &Leaf, - qc: &QuorumCertificate, + view: ViewNumber, + leaf_chain: impl IntoIterator, QuorumCertificate)> + Send, + consumer: &impl EventConsumer, ) -> anyhow::Result<()> { - self.replace( - &self.anchor_leaf_path(), - |mut file| { - // Check if we already have a later leaf before writing the new one. The height of - // the latest saved leaf is in the first 8 bytes of the file. - if file.metadata()?.len() < 8 { - // This shouldn't happen, but if there is an existing file smaller than 8 bytes, - // it is not encoding a valid height, and we want to proceed with the swap. - tracing::warn!("anchor leaf file smaller than 8 bytes will be replaced"); - return Ok(true); - } - let mut height_bytes = [0; 8]; - file.read_exact(&mut height_bytes).context("read height")?; - let height = u64::from_le_bytes(height_bytes); - if height >= leaf.height() { - tracing::warn!( - saved_height = height, - new_height = leaf.height(), - "not writing anchor leaf because saved leaf has newer height", - ); - return Ok(false); + let path = self.decided_leaf_path(); + + // Ensure the anchor leaf directory exists. + fs::create_dir_all(&path).context("creating anchor leaf directory")?; + + // Earlier versions stored only a single decided leaf in a regular file. If our storage is + // still on this version, migrate to a directory structure storing (possibly) many leaves. + let legacy_path = self.legacy_anchor_leaf_path(); + if !path.is_dir() && legacy_path.is_file() { + tracing::info!("migrating to multi-leaf storage"); + + // Move the existing data into the new directory. + let (leaf, qc) = self + .load_anchor_leaf() + .await? + .context("anchor leaf file exists but unable to load contents")?; + let view = leaf.view_number().u64(); + let bytes = bincode::serialize(&(leaf, qc))?; + let new_file = path.join(view.to_string()).with_extension("txt"); + fs::write(new_file, bytes).context(format!("writing anchor leaf file {view}"))?; + + // Now we can remove the old file. + fs::remove_file(&legacy_path).context("removing legacy anchor leaf file")?; + } + + for (info, qc) in leaf_chain { + let view = info.leaf.view_number().u64(); + let file_path = path.join(view.to_string()).with_extension("txt"); + self.replace( + &file_path, + |_| { + // Don't overwrite an existing leaf, but warn about it as this is likely not + // intended behavior from HotShot. + tracing::warn!(view, "duplicate decided leaf"); + Ok(false) + }, + |mut file| { + let bytes = bincode::serialize(&(&info.leaf, qc))?; + file.write_all(&bytes)?; + Ok(()) + }, + )?; + } + + // Event processing failure is not an error, since by this point we have at least managed to + // persist the decided leaves successfully, and the event processing will just run again at + // the next decide. 
If there is an error here, we just log it and return early with success + // to prevent GC from running before the decided leaves are processed. + match self.decide_event(view).await { + Ok(event) => { + if let Err(err) = consumer.handle_event(&event).await { + tracing::warn!(?view, "event processing failed: {err:#}"); + return Ok(()); } + } + Err(err) => { + tracing::warn!(?view, "event creation: {err:#}"); + return Ok(()); + } + } - // The existing leaf is older than the new leaf (this is the common case). Proceed - // with the swap. - Ok(true) - }, - |mut file| { - // Save the new leaf. First we write the height. - file.write_all(&leaf.height().to_le_bytes()) - .context("write height")?; - // Now serialize and write out the actual leaf and its corresponding QC. - let bytes = bincode::serialize(&(leaf, qc)).context("serialize leaf")?; - file.write_all(&bytes).context("write leaf")?; - Ok(()) - }, - ) + if let Err(err) = self.collect_garbage(view).await { + // Similarly, garbage collection is not an error. We have done everything we strictly + // needed to do, and GC will run again at the next decide. Log the error but do not + // return it. + tracing::warn!(?view, "GC failed: {err:#}"); + } + + Ok(()) } async fn load_anchor_leaf( &self, ) -> anyhow::Result)>> { - let path = self.anchor_leaf_path(); - if !path.is_file() { - return Ok(None); + if self.decided_leaf_path().is_dir() { + let mut anchor: Option<(Leaf, QuorumCertificate)> = None; + + // Return the latest decided leaf. + for entry in + fs::read_dir(self.decided_leaf_path()).context("opening decided leaf directory")? + { + let file = entry.context("reading decided leaf directory")?.path(); + let bytes = + fs::read(&file).context(format!("reading decided leaf {}", file.display()))?; + let (leaf, qc) = + bincode::deserialize::<(Leaf, QuorumCertificate)>(&bytes) + .context(format!("parsing decided leaf {}", file.display()))?; + if let Some((anchor_leaf, _)) = &anchor { + if leaf.view_number() > anchor_leaf.view_number() { + anchor = Some((leaf, qc)); + } + } else { + anchor = Some((leaf, qc)); + } + } + + return Ok(anchor); } - let mut file = File::open(path)?; - - // The first 8 bytes just contain the height of the leaf. We can skip this. - file.seek(SeekFrom::Start(8)).context("seek")?; - let bytes = file - .bytes() - .collect::, _>>() - .context("read")?; - Ok(Some(bincode::deserialize(&bytes).context("deserialize")?)) + + if self.legacy_anchor_leaf_path().is_file() { + // We may have an old version of storage, where there is just a single file for the + // anchor leaf. Read it and return the contents. + let mut file = File::open(self.legacy_anchor_leaf_path())?; + + // The first 8 bytes just contain the height of the leaf. We can skip this. + file.seek(SeekFrom::Start(8)).context("seek")?; + let bytes = file + .bytes() + .collect::, _>>() + .context("read")?; + return Ok(Some(bincode::deserialize(&bytes).context("deserialize")?)); + } + + Ok(None) } async fn load_undecided_state( @@ -449,10 +486,12 @@ impl SequencerPersistence for Persistence { } async fn load_quorum_proposals( &self, - ) -> anyhow::Result>>>> - { + ) -> anyhow::Result>>> { // First, get the proposal directory. let dir_path = self.quorum_proposals_dir_path(); + if !dir_path.is_dir() { + return Ok(Default::default()); + } // Then, we want to get the entries in this directory since they'll be the // key/value pairs for our map. @@ -464,12 +503,6 @@ impl SequencerPersistence for Persistence { }) .collect(); - // Do we have any entries? 
- if files.is_empty() { - // Don't bother continuing if we don't have any data. - return Ok(None); - } - // Read all of the files let proposal_files = files .into_iter() @@ -500,7 +533,133 @@ impl SequencerPersistence for Persistence { } } - Ok(Some(map)) + Ok(map) + } +} + +impl Persistence { + async fn collect_garbage(&mut self, view: ViewNumber) -> anyhow::Result<()> { + let view_number = view.u64(); + + let delete_files = |view_number: u64, dir_path: PathBuf| -> anyhow::Result<()> { + if !dir_path.is_dir() { + return Ok(()); + } + + for entry in fs::read_dir(dir_path)? { + let entry = entry?; + let path = entry.path(); + + if let Some(file) = path.file_stem().and_then(|n| n.to_str()) { + if let Ok(v) = file.parse::() { + if v <= view_number { + fs::remove_file(&path)?; + } + } + } + } + + Ok(()) + }; + + delete_files(view_number, self.da_dir_path())?; + delete_files(view_number, self.vid_dir_path())?; + delete_files(view_number, self.quorum_proposals_dir_path())?; + + // Save the most recent leaf as it will be our anchor point if the node restarts. + if view_number > 0 { + delete_files(view_number - 1, self.decided_leaf_path())?; + } + + Ok(()) + } + + async fn decide_event(&self, view: ViewNumber) -> anyhow::Result> { + // Construct a chain of all decided leaves up to `view` which have not yet been garbage + // collected. + let mut leaves = BTreeMap::new(); + let mut high_qc: Option> = None; + + for entry in fs::read_dir(self.decided_leaf_path())? { + let entry = entry?; + let path = entry.path(); + + let Some(file) = path.file_stem().and_then(|n| n.to_str()) else { + continue; + }; + let Ok(v) = file.parse::() else { + continue; + }; + if v > view.u64() { + continue; + } + + let bytes = + fs::read(&path).context(format!("reading decided leaf {}", path.display()))?; + let (mut leaf, qc) = + bincode::deserialize::<(Leaf, QuorumCertificate)>(&bytes) + .context(format!("parsing decided leaf {}", path.display()))?; + + // Include the VID share if available. + let vid_share = self + .load_vid_share(ViewNumber::new(v)) + .await? + .map(|proposal| proposal.data); + if vid_share.is_none() { + tracing::debug!(view = v, "VID share not available at decide"); + } + + // Fill in the full block payload using the DA proposals we had persisted. + if let Some(proposal) = self.load_da_proposal(ViewNumber::new(v)).await? { + let payload = Payload::from_bytes( + &proposal.data.encoded_transactions, + &proposal.data.metadata, + ); + leaf.fill_block_payload_unchecked(payload); + } else { + tracing::debug!(view = v, "DA proposal not available at decide"); + } + + let info = LeafInfo { + leaf, + vid_share, + + // Note: the following fields are not used in Decide event processing, and should be + // removed. For now, we just default them. + state: Default::default(), + delta: Default::default(), + }; + + leaves.insert(v, info); + if let Some(high_qc) = &mut high_qc { + if v > high_qc.view_number.u64() { + *high_qc = qc; + } + } else { + high_qc = Some(qc); + } + } + + // The invariant is that the oldest existing leaf in the `anchor_leaf` table -- if there is + // one -- was always included in the _previous_ decide event...but not removed from the + // database, because we always persist the most recent anchor leaf. + if let Some((oldest_view, _)) = leaves.first_key_value() { + // The only exception is when the oldest leaf is the genesis leaf; then there was no + // previous decide event. 
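// A concrete example of this bookkeeping: if views 0..=2 were decided earlier, view 2's leaf
// is still on disk as the anchor, so a later decide at view 4 reads back views {2, 3, 4} here
// and drops view 2, which was already emitted with the previous event. A chain starting at the
// genesis view 0 is kept whole.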
+ if *oldest_view > 0 { + leaves.pop_first(); + } + } + + let high_qc = high_qc.context("no new leaves at decide event")?; + Ok(Event { + view_number: view, + event: EventType::Decide { + qc: Arc::new(high_qc), + block_size: None, + leaf_chain: Arc::new(leaves.into_values().rev().collect()), + }, + }) } } diff --git a/sequencer/src/persistence/no_storage.rs b/sequencer/src/persistence/no_storage.rs index 6576930e15..e0a01900d2 100644 --- a/sequencer/src/persistence/no_storage.rs +++ b/sequencer/src/persistence/no_storage.rs @@ -1,21 +1,21 @@ //! Mock implementation of persistence, for testing. #![cfg(any(test, feature = "testing"))] -use std::collections::BTreeMap; - +use async_std::sync::Arc; use async_trait::async_trait; use espresso_types::{ - v0::traits::{PersistenceOptions, SequencerPersistence}, + v0::traits::{EventConsumer, PersistenceOptions, SequencerPersistence}, Leaf, NetworkConfig, }; use hotshot_types::{ consensus::CommitmentMap, data::{DaProposal, QuorumProposal, VidDisperseShare}, - event::HotShotAction, + event::{Event, EventType, HotShotAction, LeafInfo}, message::Proposal, simple_certificate::QuorumCertificate, utils::View, }; +use std::collections::BTreeMap; use crate::{SeqTypes, ViewNumber}; @@ -48,16 +48,34 @@ impl SequencerPersistence for NoStorage { Ok(()) } - async fn collect_garbage(&mut self, _view: ViewNumber) -> anyhow::Result<()> { - Ok(()) - } - - async fn save_anchor_leaf( + async fn append_decided_leaves( &mut self, - _: &Leaf, - _: &QuorumCertificate, + view_number: ViewNumber, + leaves: impl IntoIterator, QuorumCertificate)> + Send, + consumer: &impl EventConsumer, ) -> anyhow::Result<()> { - Ok(()) + let (mut leaf_chain, mut qcs): (Vec<_>, Vec<_>) = leaves + .into_iter() + .map(|(info, qc)| (info.clone(), qc)) + .unzip(); + + // Put in reverse chronological order, as expected from Decide events. + leaf_chain.reverse(); + qcs.reverse(); + + // Generate decide event for the consumer. 
+ let final_qc = qcs.pop().unwrap(); + + consumer + .handle_event(&Event { + view_number, + event: EventType::Decide { + leaf_chain: Arc::new(leaf_chain), + qc: Arc::new(final_qc), + block_size: None, + }, + }) + .await } async fn load_latest_acted_view(&self) -> anyhow::Result> { @@ -92,9 +110,8 @@ impl SequencerPersistence for NoStorage { async fn load_quorum_proposals( &self, - ) -> anyhow::Result>>>> - { - Ok(None) + ) -> anyhow::Result>>> { + Ok(Default::default()) } async fn append_vid( diff --git a/sequencer/src/persistence/sql.rs b/sequencer/src/persistence/sql.rs index 08ed947cdc..d7c615581c 100644 --- a/sequencer/src/persistence/sql.rs +++ b/sequencer/src/persistence/sql.rs @@ -1,5 +1,3 @@ -use std::{collections::BTreeMap, time::Duration}; - use anyhow::Context; use async_std::{ stream::StreamExt, @@ -10,8 +8,8 @@ use clap::Parser; use derivative::Derivative; use espresso_types::{ parse_duration, - v0::traits::{PersistenceOptions, SequencerPersistence, StateCatchup}, - BackoffParams, Leaf, NetworkConfig, + v0::traits::{EventConsumer, PersistenceOptions, SequencerPersistence, StateCatchup}, + BackoffParams, Leaf, NetworkConfig, Payload, }; use futures::future::{BoxFuture, FutureExt}; use hotshot_query_service::data_source::{ @@ -24,13 +22,14 @@ use hotshot_query_service::data_source::{ use hotshot_types::{ consensus::CommitmentMap, data::{DaProposal, QuorumProposal, VidDisperseShare}, - event::HotShotAction, + event::{Event, EventType, HotShotAction, LeafInfo}, message::Proposal, simple_certificate::QuorumCertificate, - traits::node_implementation::ConsensusTime, + traits::{node_implementation::ConsensusTime, BlockPayload}, utils::View, vote::HasViewNumber, }; +use std::{collections::BTreeMap, time::Duration}; use crate::{catchup::SqlStateCatchup, SeqTypes, ViewNumber}; @@ -325,70 +324,53 @@ impl SequencerPersistence for Persistence { .await } - async fn collect_garbage(&mut self, view: ViewNumber) -> anyhow::Result<()> { - transaction(&mut self.db, |mut tx| { - async move { - let stmt1 = "DELETE FROM vid_share where view <= $1"; - tx.execute(stmt1, [&(view.u64() as i64)]).await?; - - let stmt2 = "DELETE FROM da_proposal where view <= $1"; - tx.execute(stmt2, [&(view.u64() as i64)]).await?; - - let stmt3 = "DELETE FROM quorum_proposals where view <= $1"; - tx.execute(stmt3, [&(view.u64() as i64)]).await?; - Ok(()) - } - .boxed() - }) - .await - } - - async fn save_anchor_leaf( + async fn append_decided_leaves( &mut self, - leaf: &Leaf, - qc: &QuorumCertificate, + view: ViewNumber, + leaf_chain: impl IntoIterator, QuorumCertificate)> + Send, + consumer: &(impl EventConsumer + 'static), ) -> anyhow::Result<()> { - let stmt = " - INSERT INTO anchor_leaf (id, height, view, leaf, qc) VALUES (0, $1, $2, $3, $4) - ON CONFLICT (id) DO UPDATE SET (height, view, leaf, qc) = ROW ( - GREATEST(anchor_leaf.height, excluded.height), - CASE - WHEN excluded.height > anchor_leaf.height THEN excluded.view - ELSE anchor_leaf.view - END, - CASE - WHEN excluded.height > anchor_leaf.height THEN excluded.leaf - ELSE anchor_leaf.leaf - END, - CASE - WHEN excluded.height > anchor_leaf.height THEN excluded.qc - ELSE anchor_leaf.qc - END - ) - "; - - let height = leaf.height() as i64; - let view = qc.view_number.u64() as i64; - let leaf_bytes = bincode::serialize(leaf)?; - let qc_bytes = bincode::serialize(qc)?; + let values = leaf_chain + .into_iter() + .map(|(info, qc)| { + let view = qc.view_number.u64() as i64; + let leaf_bytes = bincode::serialize(&info.leaf)?; + let qc_bytes = bincode::serialize(&qc)?; 
+ Ok((view, leaf_bytes, qc_bytes)) + }) + .collect::>>()?; + // First, append the new leaves. We do this in its own transaction because even if GC or the + // event consumer later fails, there is no need to abort the storage of the leaves. transaction(&mut self.db, |mut tx| { async move { - tx.execute_one_with_retries( - stmt, - [ - sql_param(&height), - sql_param(&view), - sql_param(&leaf_bytes), - sql_param(&qc_bytes), - ], - ) - .await?; + let rows = values + .iter() + .map(|(view, leaf, qc)| [sql_param(view), sql_param(leaf), sql_param(qc)]); + + tx.upsert("anchor_leaf", ["view", "leaf", "qc"], ["view"], rows) + .await?; Ok(()) } .boxed() }) + .await?; + + // Generate an event for the new leaves and, only if it succeeds, clean up data we no longer + // need. + let consumer = dyn_clone::clone(consumer); + if let Err(err) = transaction(&mut self.db, move |tx| { + collect_garbage(tx, view, consumer).boxed() + }) .await + { + // GC/event processing failure is not an error, since by this point we have at least + // managed to persist the decided leaves successfully, and GC will just run again at the + // next decide. Log an error but do not return it. + tracing::warn!(?view, "GC/event processing failed: {err:#}"); + } + + Ok(()) } async fn load_latest_acted_view(&self) -> anyhow::Result> { @@ -407,7 +389,7 @@ impl SequencerPersistence for Persistence { ) -> anyhow::Result)>> { let Some(row) = self .db - .query_opt_static("SELECT leaf, qc FROM anchor_leaf WHERE id = 0") + .query_opt_static("SELECT leaf, qc FROM anchor_leaf ORDER BY view DESC LIMIT 1") .await? else { return Ok(None); @@ -484,14 +466,13 @@ impl SequencerPersistence for Persistence { async fn load_quorum_proposals( &self, - ) -> anyhow::Result>>>> - { + ) -> anyhow::Result>>> { let rows = self .db .query_static("SELECT * FROM quorum_proposals") .await?; - Ok(Some(BTreeMap::from_iter( + Ok(BTreeMap::from_iter( rows.map(|row| { let row = row?; let view: i64 = row.get("view"); @@ -503,7 +484,7 @@ impl SequencerPersistence for Persistence { }) .collect::>>() .await?, - ))) + )) } async fn append_vid( @@ -625,6 +606,186 @@ impl SequencerPersistence for Persistence { } } +async fn collect_garbage( + mut tx: Transaction<'_>, + view: ViewNumber, + consumer: impl EventConsumer, +) -> anyhow::Result<()> { + // Clean up and collect VID shares. + let mut vid_shares = tx + .query( + "DELETE FROM vid_share where view <= $1 RETURNING view, data", + [&(view.u64() as i64)], + ) + .await? + .map(|row| { + let row = row?; + let view: i64 = row.get("view"); + let data: Vec = row.get("data"); + let vid_proposal = + bincode::deserialize::>>(&data)?; + Ok((view as u64, vid_proposal.data)) + }) + .collect::>>() + .await?; + + // Clean up and collect DA proposals. + let mut da_proposals = tx + .query( + "DELETE FROM da_proposal where view <= $1 RETURNING view, data", + [&(view.u64() as i64)], + ) + .await? + .map(|row| { + let row = row?; + let view: i64 = row.get("view"); + let data: Vec = row.get("data"); + let da_proposal = + bincode::deserialize::>>(&data)?; + Ok((view as u64, da_proposal.data)) + }) + .collect::>>() + .await?; + + // Clean up and collect leaves, except do not delete the most recent leaf: we need to remember + // this so that in case we restart, we can pick up from the last decided leaf. We still do + // include this leaf in the query results (the `UNION` clause) so we can include it in the + // decide event we send to the consumer. 
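// Concretely: every row with view <= $1 is read back for the event below, while the DELETE
// that follows only removes rows with view < $1, so the newest decided leaf always survives as
// the restart anchor.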
+ let mut leaves = tx + .query( + "SELECT view, leaf, qc FROM anchor_leaf WHERE view <= $1", + [&(view.u64() as i64)], + ) + .await? + .map(|row| { + let row = row?; + let view: i64 = row.get("view"); + let leaf_data: Vec = row.get("leaf"); + let leaf = bincode::deserialize::(&leaf_data)?; + let qc_data: Vec = row.get("qc"); + let qc = bincode::deserialize::>(&qc_data)?; + Ok((view as u64, (leaf, qc))) + }) + .collect::>>() + .await?; + tx.execute( + "DELETE FROM anchor_leaf WHERE view < $1", + [&(view.u64() as i64)], + ) + .await?; + + // Clean up old proposals. These are not part of the decide event we generate for the consumer, + // so we don't need to return them. + tx.execute( + "DELETE FROM quorum_proposals where view <= $1", + [&(view.u64() as i64)], + ) + .await?; + + // Exclude from the decide event any leaves which have definitely already been processed. We may + // have selected an already-processed leaf because the oldest leaf -- the last leaf processed in + // the previous decide event -- remained in the database to serve as the anchor leaf, so our + // query would have returned it. In fact, this will almost always be the case, but there are two + // cases where it might not be, and we must process this leaf after all: + // + // 1. The oldest leaf is the genesis leaf, and there _is_ no previous decide event + // 2. We previously stored some leaves in the database and then failed while processing the + // decide event, or shut down before generating the decide event, and so we are only now + // generating the decide event for those previous leaves. + // + // Since these cases (particularly case 2) are hard to account for explicitly, we just use a + // persistent value in the database to remember how far we have successfully processed the event + // stream. + let last_processed_view: Option = tx + .query_opt_static("SELECT last_processed_view FROM event_stream WHERE id = 1 LIMIT 1") + .await? + .map(|row| row.get("last_processed_view")); + let leaves = if let Some(v) = last_processed_view { + let new_leaves = leaves.split_off(&((v as u64) + 1)); + if !leaves.is_empty() { + tracing::debug!( + v, + remaining_leaves = new_leaves.len(), + ?leaves, + "excluding already-processed leaves from decide event" + ); + } + new_leaves + } else { + leaves + }; + + // Collate all the information by view number and construct a chain of leaves and a chain of + // corresponding QCs. + let (leaf_chain, qcs): (Vec<_>, Vec<_>) = leaves + .into_iter() + // Go in reverse chronological order, as expected by Decide events. + .rev() + .map(|(view, (mut leaf, qc))| { + // Include the VID share if available. + let vid_share = vid_shares.remove(&view); + if vid_share.is_none() { + tracing::debug!(view, "VID share not available at decide"); + } + + // Fill in the full block payload using the DA proposals we had persisted. + if let Some(proposal) = da_proposals.remove(&view) { + let payload = + Payload::from_bytes(&proposal.encoded_transactions, &proposal.metadata); + leaf.fill_block_payload_unchecked(payload); + } else { + tracing::debug!(view, "DA proposal not available at decide"); + } + + ( + LeafInfo { + leaf, + vid_share, + + // Note: the following fields are not used in Decide event processing, and + // should be removed. For now, we just default them. + state: Default::default(), + delta: Default::default(), + }, + qc, + ) + }) + .unzip(); + + // Generate decide event for the consumer. 
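// Because `leaves` is traversed newest-first just above, the first QC here certifies the most
// recently decided leaf and becomes the event's `qc`; if every selected leaf was already
// processed, the chain is empty and no event is emitted.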
+ let Some(final_qc) = qcs.into_iter().next() else { + tracing::info!(?view, "no new leaves at decide"); + return Ok(()); + }; + tracing::debug!(?view, ?final_qc, ?leaf_chain, "generating decide event"); + + consumer + .handle_event(&Event { + view_number: view, + event: EventType::Decide { + leaf_chain: Arc::new(leaf_chain), + qc: Arc::new(final_qc), + block_size: None, + }, + }) + .await?; + + // Now that we have definitely processed leaves up to `view`, we can update + // `last_processed_view` so we don't process these leaves again. We may still fail at this + // point, or shut down, and fail to complete this update. At worst this will lead to us sending + // a duplicate decide event the next time we are called; this is fine as the event consumer is + // required to be idempotent. + tx.upsert( + "event_stream", + ["id", "last_processed_view"], + ["id"], + [[sql_param(&1i32), sql_param(&(view.u64() as i64))]], + ) + .await?; + + Ok(()) +} + pub(crate) fn sql_param(param: &T) -> &(dyn ToSql + Sync) { param } diff --git a/sequencer/src/restart_tests.rs b/sequencer/src/restart_tests.rs new file mode 100644 index 0000000000..0402c968e5 --- /dev/null +++ b/sequencer/src/restart_tests.rs @@ -0,0 +1,753 @@ +#![cfg(test)] + +use super::*; +use anyhow::bail; +use async_compatibility_layer::art::async_timeout; +use async_std::task::{sleep, spawn, JoinHandle}; +use cdn_broker::{reexports::crypto::signature::KeyPair, Broker, Config as BrokerConfig}; +use cdn_marshal::{Config as MarshalConfig, Marshal}; +use derivative::Derivative; +use espresso_types::{traits::PersistenceOptions, PrivKey, PubKey, SeqTypes, SequencerVersions}; +use ethers::utils::{Anvil, AnvilInstance}; +use futures::{ + future::{join_all, try_join_all, BoxFuture, FutureExt}, + stream::{BoxStream, StreamExt}, +}; +use hotshot::traits::implementations::derive_libp2p_peer_id; +use hotshot_orchestrator::{ + config::{Libp2pConfig, NetworkConfig}, + run_orchestrator, +}; +use hotshot_types::{ + event::{Event, EventType}, + light_client::StateKeyPair, + traits::{ + node_implementation::{ConsensusTime, Versions}, + signature_key::SignatureKey, + }, +}; +use itertools::Itertools; +use portpicker::pick_unused_port; +use sequencer::{ + api::{self, data_source::testing::TestableSequencerDataSource, options::Http}, + genesis::StakeTableConfig, + network::cdn::{TestingDef, WrappedSignatureKey}, +}; +use sequencer_utils::test_utils::setup_test; +use std::{collections::HashSet, path::Path, time::Duration}; +use tempfile::TempDir; + +async fn test_restart_helper(network: (usize, usize), restart: (usize, usize), cdn: bool) { + setup_test(); + + let mut network = TestNetwork::new(network.0, network.1, cdn).await; + + // Let the network get going. + network.check_progress().await; + // Restart some combination of nodes and ensure progress resumes. 
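// The tuples mirror the helper's arguments: for example, slow_test_restart_f_with_cdn below
// runs a network of 4 DA plus 6 regular nodes and restarts 1 DA node and 2 regular nodes with
// the CDN enabled.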
+ network.restart(restart.0, restart.1).await; + + network.shut_down().await; +} + +#[async_std::test] +async fn slow_test_restart_1_da_with_cdn() { + test_restart_helper((2, 3), (1, 0), true).await; +} + +#[async_std::test] +async fn slow_test_restart_1_regular_with_cdn() { + test_restart_helper((2, 3), (0, 1), true).await; +} + +#[async_std::test] +async fn slow_test_restart_f_with_cdn() { + test_restart_helper((4, 6), (1, 2), true).await; +} + +#[async_std::test] +async fn slow_test_restart_f_minus_1_with_cdn() { + test_restart_helper((4, 6), (1, 1), true).await; +} + +#[async_std::test] +async fn slow_test_restart_f_plus_1_with_cdn() { + test_restart_helper((4, 6), (1, 3), true).await; +} + +#[async_std::test] +async fn slow_test_restart_2f_with_cdn() { + test_restart_helper((4, 6), (1, 5), true).await; +} + +#[async_std::test] +async fn slow_test_restart_2f_minus_1_with_cdn() { + test_restart_helper((4, 6), (1, 4), true).await; +} + +#[async_std::test] +async fn slow_test_restart_2f_plus_1_with_cdn() { + test_restart_helper((4, 6), (2, 5), true).await; +} + +#[ignore] +#[async_std::test] +async fn slow_test_restart_all_with_cdn() { + test_restart_helper((2, 8), (2, 8), true).await; +} + +#[async_std::test] +async fn slow_test_restart_all_da_with_cdn() { + test_restart_helper((2, 8), (2, 0), true).await; +} + +#[async_std::test] +async fn slow_test_restart_1_da_without_cdn() { + test_restart_helper((2, 3), (1, 0), false).await; +} + +#[async_std::test] +async fn slow_test_restart_1_regular_without_cdn() { + test_restart_helper((2, 3), (0, 1), false).await; +} + +#[async_std::test] +async fn slow_test_restart_f_without_cdn() { + test_restart_helper((4, 6), (1, 2), false).await; +} + +#[async_std::test] +async fn slow_test_restart_f_minus_1_without_cdn() { + test_restart_helper((4, 6), (1, 1), false).await; +} + +#[async_std::test] +async fn slow_test_restart_f_plus_1_without_cdn() { + test_restart_helper((4, 6), (1, 3), false).await; +} + +#[async_std::test] +async fn slow_test_restart_2f_without_cdn() { + test_restart_helper((4, 6), (1, 5), false).await; +} + +#[async_std::test] +async fn slow_test_restart_2f_minus_1_without_cdn() { + test_restart_helper((4, 6), (1, 4), false).await; +} + +#[async_std::test] +async fn slow_test_restart_2f_plus_1_without_cdn() { + test_restart_helper((4, 6), (2, 5), false).await; +} + +#[ignore] +#[async_std::test] +async fn slow_test_restart_all_without_cdn() { + test_restart_helper((2, 8), (2, 8), false).await; +} + +#[async_std::test] +async fn slow_test_restart_all_da_without_cdn() { + test_restart_helper((2, 8), (2, 0), false).await; +} + +#[derive(Clone, Copy, Debug)] +struct NetworkParams<'a> { + genesis_file: &'a Path, + orchestrator_port: u16, + cdn_port: u16, + l1_provider: &'a str, + peer_ports: &'a [u16], +} + +#[derive(Clone, Debug)] +struct NodeParams { + api_port: u16, + libp2p_port: u16, + staking_key: PrivKey, + state_key: StateKeyPair, + is_da: bool, +} + +impl NodeParams { + fn new(ports: &mut PortPicker, i: u64, is_da: bool) -> Self { + Self { + api_port: ports.pick(), + libp2p_port: ports.pick(), + staking_key: PubKey::generated_from_seed_indexed([0; 32], i).1, + state_key: StateKeyPair::generate_from_seed_indexed([0; 32], i), + is_da, + } + } +} + +#[derive(Debug)] +struct TestNode { + storage: S::Storage, + context: Option< + SequencerContext< + network::Production, + ::Persistence, + ::Base, + >, + >, + modules: Modules, + opt: Options, + num_nodes: usize, +} + +impl TestNode { + #[tracing::instrument] + async fn new(network: 
NetworkParams<'_>, node: &NodeParams) -> Self { + tracing::info!(?network, ?node, "creating node",); + + let storage = S::create_storage().await; + let mut modules = Modules { + http: Some(Http::with_port(node.api_port)), + status: Some(Default::default()), + catchup: Some(Default::default()), + ..Default::default() + }; + if node.is_da { + modules.query = Some(Default::default()); + modules.state = Some(Default::default()); + } + + let mut opt = Options::parse_from([ + "sequencer", + "--private-staking-key", + &node.staking_key.to_string(), + "--private-state-key", + &node.state_key.sign_key_ref().to_string(), + "--genesis-file", + &network.genesis_file.display().to_string(), + "--orchestrator-url", + &format!("http://localhost:{}", network.orchestrator_port), + "--libp2p-bind-address", + &format!("0.0.0.0:{}", node.libp2p_port), + "--libp2p-advertise-address", + &format!("127.0.0.1:{}", node.libp2p_port), + "--cdn-endpoint", + &format!("127.0.0.1:{}", network.cdn_port), + "--state-peers", + &network + .peer_ports + .iter() + .map(|port| format!("http://127.0.0.1:{port}")) + .join(","), + "--l1-provider-url", + network.l1_provider, + ]); + opt.is_da = node.is_da; + Self { + storage, + modules, + opt, + num_nodes: network.peer_ports.len(), + context: None, + } + } + + fn stop(&mut self) -> BoxFuture<()> { + async { + if let Some(mut context) = self.context.take() { + tracing::info!(node_id = context.node_id(), "stopping node"); + context.shut_down().await; + } + } + .boxed() + } + + fn start(&mut self) -> BoxFuture<()> + where + S::Storage: Send, + { + async { + tracing::info!("starting node"); + + // If we are starting a node which had already been started and stopped, we may need to + // delay a bit for the OS to reclaim the node's P2P port. Otherwise initialization of + // libp2p may fail with "address already in use". Thus, retry the node initialization + // with a backoff. + let mut retries = 5; + let mut delay = Duration::from_secs(1); + let ctx = loop { + match init_with_storage( + self.modules.clone(), + self.opt.clone(), + S::persistence_options(&self.storage), + ::Base::instance(), + ) + .await + { + Ok(ctx) => break ctx, + Err(err) => { + tracing::error!(retries, ?delay, "initialization failed: {err:#}"); + if retries == 0 { + panic!("initialization failed too many times"); + } + + sleep(delay).await; + delay *= 2; + retries -= 1; + } + } + }; + + tracing::info!(node_id = ctx.node_id(), "starting consensus"); + ctx.start_consensus().await; + self.context = Some(ctx); + } + .boxed() + } + + async fn event_stream(&self) -> Option>> { + if let Some(ctx) = &self.context { + Some(ctx.event_stream().await.boxed()) + } else { + None + } + } + + fn check_progress_with_timeout(&self) -> BoxFuture> { + async { + let Some(context) = &self.context else { + tracing::info!("skipping progress check on stopped node"); + return Ok(()); + }; + let node_id = context.node_id(); + let next_view_timeout = { + context + .consensus() + .read() + .await + .hotshot + .config + .next_view_timeout + }; + // Give enough time for every node to propose, with every view timing out. This is + // conservative: of course if we actually make progress, not every view will time out, + // and we will take less than this amount of time. 
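// As a rough worked example: with the 2 s view timeout configured by start_orchestrator and a
// 10-node network, this bound comes to 2 * 2 s * 10 = 40 s.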
+ let timeout = 2 * Duration::from_millis(next_view_timeout) * (self.num_nodes as u32); + match async_timeout(timeout, self.check_progress()).await { + Ok(res) => res, + Err(_) => bail!("timed out waiting for progress on node {node_id}"), + } + } + .boxed() + } + + async fn check_progress(&self) -> anyhow::Result<()> { + let Some(context) = &self.context else { + tracing::info!("skipping progress check on stopped node"); + return Ok(()); + }; + + let num_nodes = { + context + .consensus() + .read() + .await + .hotshot + .config + .num_nodes_with_stake + }; + let node_id = context.node_id(); + tracing::info!(node_id, num_nodes, "waiting for progress from node"); + + // Wait for a block proposed by this node. This proves that the node is tracking consensus + // (getting Decide events) and participating (able to propose). + let mut events = context.event_stream().await; + while let Some(event) = events.next().await { + let EventType::Decide { leaf_chain, .. } = event.event else { + continue; + }; + for leaf in leaf_chain.iter() { + if leaf.leaf.view_number().u64() % (num_nodes.get() as u64) == node_id { + tracing::info!( + node_id, + height = leaf.leaf.height(), + "got leaf proposed by this node" + ); + return Ok(()); + } + tracing::info!( + node_id, + height = leaf.leaf.height(), + view = leaf.leaf.view_number().u64(), + "leaf not proposed by this node" + ); + } + } + + bail!("node {node_id} event stream ended unexpectedly"); + } +} + +#[derive(Derivative)] +#[derivative(Debug)] +struct TestNetwork { + da_nodes: Vec>, + regular_nodes: Vec>, + tmp: TempDir, + orchestrator_task: Option>, + broker_task: Option>, + marshal_task: Option>, + #[derivative(Debug = "ignore")] + _anvil: AnvilInstance, +} + +impl Drop for TestNetwork { + fn drop(&mut self) { + if let Some(task) = self.orchestrator_task.take() { + async_std::task::block_on(task.cancel()); + } + if let Some(task) = self.broker_task.take() { + async_std::task::block_on(task.cancel()); + } + if let Some(task) = self.marshal_task.take() { + async_std::task::block_on(task.cancel()); + } + } +} + +impl TestNetwork { + async fn new(da_nodes: usize, regular_nodes: usize, cdn: bool) -> Self { + let mut ports = PortPicker::default(); + + let tmp = TempDir::new().unwrap(); + let genesis_file = tmp.path().join("genesis.toml"); + let genesis = Genesis { + chain_config: Default::default(), + stake_table: StakeTableConfig { capacity: 10 }, + accounts: Default::default(), + l1_finalized: Default::default(), + header: Default::default(), + upgrades: Default::default(), + }; + genesis.to_file(&genesis_file).unwrap(); + + let node_params = (0..da_nodes + regular_nodes) + .map(|i| NodeParams::new(&mut ports, i as u64, i < da_nodes)) + .collect::>(); + + let orchestrator_port = ports.pick(); + let orchestrator_task = Some(start_orchestrator(orchestrator_port, &node_params)); + + let cdn_dir = tmp.path().join("cdn"); + let cdn_port = ports.pick(); + let broker_task = if cdn { + Some(start_broker(&mut ports, &cdn_dir).await) + } else { + None + }; + let marshal_task = if cdn { + Some(start_marshal(&cdn_dir, cdn_port).await) + } else { + None + }; + + let anvil_port = ports.pick(); + let anvil = Anvil::new().port(anvil_port).spawn(); + let anvil_endpoint = anvil.endpoint(); + + let peer_ports = node_params + .iter() + .map(|node| node.api_port) + .collect::>(); + let network_params = NetworkParams { + genesis_file: &genesis_file, + orchestrator_port, + cdn_port, + l1_provider: &anvil_endpoint, + peer_ports: &peer_ports, + }; + + let mut network = Self { + 
da_nodes: join_all( + (0..da_nodes).map(|i| TestNode::new(network_params, &node_params[i])), + ) + .await, + regular_nodes: join_all( + (0..regular_nodes) + .map(|i| TestNode::new(network_params, &node_params[i + da_nodes])), + ) + .await, + tmp, + orchestrator_task, + broker_task, + marshal_task, + _anvil: anvil, + }; + + join_all( + network + .da_nodes + .iter_mut() + .map(TestNode::start) + .chain(network.regular_nodes.iter_mut().map(TestNode::start)), + ) + .await; + + network + } + + async fn check_progress(&self) { + try_join_all( + self.da_nodes + .iter() + .map(TestNode::check_progress_with_timeout) + .chain( + self.regular_nodes + .iter() + .map(TestNode::check_progress_with_timeout), + ), + ) + .await + .unwrap(); + } + + /// Restart indicated number of DA and non-DA nodes. + /// + /// If possible (less than a quorum of nodes have been stopped), check that remaining nodes can + /// still make progress without the restarted nodes. In any case, check that the network as a + /// whole makes progress once the restarted nodes are back online. + async fn restart(&mut self, da_nodes: usize, regular_nodes: usize) { + tracing::info!(da_nodes, regular_nodes, "shutting down some nodes"); + join_all( + self.da_nodes[..da_nodes] + .iter_mut() + .map(TestNode::stop) + .chain( + self.regular_nodes[..regular_nodes] + .iter_mut() + .map(TestNode::stop), + ), + ) + .await; + + // We use 3n/4 + 1 as the quorum threshold (fault tolerance f = n/4), even though the + // theoretical fault tolerance of HotStuff consensus is n/3, because our implementation does + // not currently re-randomize the order of leaders, and requires 4 consecutive honest + // leaders to commit. Thus, with 1/4 or more of the nodes dishonest, you could get unlucky + // and have one dishonest leader every 4, thus preventing consensus from progressing. + let quorum_threshold = 3 * self.num_nodes() / 4 + 1; + let da_threshold = 2 * self.da_nodes.len() / 3 + 1; + if self.num_nodes() - da_nodes - regular_nodes > quorum_threshold + && self.da_nodes.len() - da_nodes >= da_threshold + { + // If we are shutting down less than f nodes, the remaining nodes should be able to make + // progress, and we will check that that is the case. + // + // Note that not every node will be able to commit leaves, because a node requires the + // cooperation of the node after it to commit its proposal. But, as long as we have shut + // down fewer than the fault tolerance, at least *some* node will have a correct node + // after it and will be able to commit. Thus, we just grab an event stream and look for + // any decide. + tracing::info!("waiting for remaining nodes to progress"); + let mut events = if da_nodes < self.da_nodes.len() { + self.da_nodes[da_nodes].event_stream().await.unwrap() + } else { + self.regular_nodes[regular_nodes] + .event_stream() + .await + .unwrap() + }; + // Wait for a few decides, the first couple may be from before the restart. + for _ in 0..self.num_nodes() { + let timeout = Duration::from_secs((2 * self.num_nodes()) as u64); + async_timeout(timeout, async { + loop { + let event = events + .next() + .await + .expect("event stream terminated unexpectedly"); + let EventType::Decide { leaf_chain, .. 
} = event.event else { + continue; + }; + tracing::info!(?leaf_chain, "got decide, chain is progressing"); + break; + } + }) + .await + .expect("timed out waiting for progress with nodes down"); + } + } else { + // Make sure there is a brief delay before restarting the nodes; we need the OS to + // have time to clean up the ports they were using. + tracing::info!( + "shut down too many nodes to make progress; will continue after a brief delay" + ); + sleep(Duration::from_secs(2)).await; + } + + join_all( + self.da_nodes[..da_nodes] + .iter_mut() + .map(TestNode::start) + .chain( + self.regular_nodes[..regular_nodes] + .iter_mut() + .map(TestNode::start), + ), + ) + .await; + self.check_progress().await; + } + + async fn shut_down(mut self) { + tracing::info!("shutting down test network"); + join_all( + self.da_nodes + .iter_mut() + .map(TestNode::stop) + .chain(self.regular_nodes.iter_mut().map(TestNode::stop)), + ) + .await; + } + + fn num_nodes(&self) -> usize { + self.da_nodes.len() + self.regular_nodes.len() + } +} + +fn start_orchestrator(port: u16, nodes: &[NodeParams]) -> JoinHandle<()> { + // We don't run a builder in these tests, so use a very short timeout before nodes decide to + // build an empty block on their own. + let builder_timeout = Duration::from_millis(10); + // These tests frequently have nodes down and views failing, so we use a fairly short view + // timeout. + let view_timeout = Duration::from_secs(2); + + let num_nodes = nodes.len(); + let bootstrap_nodes = nodes + .iter() + .map(|node| { + let port = node.libp2p_port; + let peer_id = derive_libp2p_peer_id::(&node.staking_key).unwrap(); + let addr = format!("/ip4/127.0.0.1/udp/{port}/quic-v1") + .parse() + .unwrap(); + (peer_id, addr) + }) + .collect(); + + let mut config = NetworkConfig:: { + indexed_da: false, + libp2p_config: Some(Libp2pConfig { + bootstrap_nodes, + node_index: 0, + bootstrap_mesh_n_high: 4, + bootstrap_mesh_n_low: 4, + bootstrap_mesh_outbound_min: 4 / 2, + bootstrap_mesh_n: 4, + mesh_n_high: 4, + mesh_n_low: 4, + mesh_outbound_min: 4 / 2, + mesh_n: 4, + next_view_timeout: view_timeout.as_millis() as u64, + online_time: 10, + num_txn_per_round: 0, + server_mode: true, + builder_timeout, + }), + ..Default::default() + }; + config.config.num_nodes_with_stake = num_nodes.try_into().unwrap(); + config.config.da_staked_committee_size = num_nodes; + config.config.known_nodes_with_stake = vec![]; + config.config.known_da_nodes = vec![]; + config.config.known_nodes_without_stake = vec![]; + config.config.next_view_timeout = view_timeout.as_millis() as u64; + config.config.builder_timeout = builder_timeout; + + let bind = format!("http://0.0.0.0:{port}").parse().unwrap(); + spawn(async move { + match run_orchestrator(config, bind).await { + Ok(()) => tracing::warn!("orchestrator exited"), + Err(err) => tracing::error!(%err, "orchestrator failed"), + } + }) +} + +async fn start_broker(ports: &mut PortPicker, dir: &Path) -> JoinHandle<()> { + let (public_key, private_key) = PubKey::generated_from_seed_indexed([0; 32], 1337); + let public_port = ports.pick(); + let private_port = ports.pick(); + let broker_config: BrokerConfig> = BrokerConfig { + public_advertise_endpoint: format!("127.0.0.1:{}", public_port), + public_bind_endpoint: format!("127.0.0.1:{}", public_port), + private_advertise_endpoint: format!("127.0.0.1:{}", private_port), + private_bind_endpoint: format!("127.0.0.1:{}", private_port), + + metrics_bind_endpoint: None, + discovery_endpoint: dir.display().to_string(), + keypair: KeyPair { + 
public_key: WrappedSignatureKey(public_key), + private_key, + }, + + ca_cert_path: None, + ca_key_path: None, + global_memory_pool_size: Some(1024 * 1024 * 1024), + }; + + spawn(async move { + match Broker::new(broker_config).await.unwrap().start().await { + Ok(()) => tracing::warn!("broker exited"), + Err(err) => tracing::error!("broker failed: {err:#}"), + } + }) +} + +async fn start_marshal(dir: &Path, port: u16) -> JoinHandle<()> { + let marshal_config = MarshalConfig { + bind_endpoint: format!("0.0.0.0:{port}"), + metrics_bind_endpoint: None, + discovery_endpoint: dir.display().to_string(), + ca_cert_path: None, + ca_key_path: None, + global_memory_pool_size: Some(1024 * 1024 * 1024), + }; + + spawn(async move { + match Marshal::>::new(marshal_config) + .await + .unwrap() + .start() + .await + { + Ok(()) => tracing::warn!("marshal exited"), + Err(err) => tracing::error!("marshal failed: {err:#}"), + } + }) +} + +/// Allocator for unused ports. +/// +/// While portpicker is able to pick ports that are currently unused by the OS, its allocation is +/// random, and it may return the same port twice if that port is still unused by the OS the second +/// time. This test suite allocates many ports, and it is often convenient to allocate many in a +/// batch, before starting the services that listen on them, so that the first port selected is not +/// "in use" when we select later ports in the same batch. +/// +/// This object keeps track not only of ports in use by the OS, but also ports it has already given +/// out, for which there may not yet be any listener. Thus, it is safe to use this to allocate many +/// ports at once, without a collision. +#[derive(Debug, Default)] +struct PortPicker { + allocated: HashSet, +} + +impl PortPicker { + fn pick(&mut self) -> u16 { + loop { + let port = pick_unused_port().unwrap(); + if self.allocated.insert(port) { + break port; + } + tracing::warn!(port, "picked port which is already allocated, will try again. 
If this error persists, try reducing the number of ports being used."); + } + } +} diff --git a/sequencer/src/state.rs b/sequencer/src/state.rs index 6044dce85e..690c554615 100644 --- a/sequencer/src/state.rs +++ b/sequencer/src/state.rs @@ -12,6 +12,7 @@ use hotshot_query_service::{ availability::{AvailabilityDataSource, LeafQueryData}, data_source::VersionedDataSource, merklized_state::{MerklizedStateHeightPersistence, UpdateStateData}, + status::StatusDataSource, types::HeightIndexed, }; use jf_merkle_tree::{LookupResult, MerkleTreeScheme, ToTraversalPath, UniversalMerkleTreeScheme}; @@ -210,7 +211,8 @@ pub(crate) async fn update_state_storage_loop( let state = storage.upgradable_read().await; let last_height = state.get_last_state_height().await?; - tracing::info!(last_height, "updating state storage"); + let current_height = state.block_height().await?; + tracing::info!(last_height, current_height, "updating state storage"); let parent_leaf = state.get_leaf(last_height).await; let leaves = state.subscribe_leaves(last_height + 1).await; @@ -265,6 +267,7 @@ pub(crate) trait SequencerStateDataSource: 'static + Debug + AvailabilityDataSource + + StatusDataSource + VersionedDataSource + CatchupDataSource + UpdateStateData @@ -278,6 +281,7 @@ impl SequencerStateDataSource for T where T: 'static + Debug + AvailabilityDataSource + + StatusDataSource + VersionedDataSource + CatchupDataSource + UpdateStateData diff --git a/types/Cargo.toml b/types/Cargo.toml index ee88afdbed..8d44698967 100644 --- a/types/Cargo.toml +++ b/types/Cargo.toml @@ -23,6 +23,7 @@ cld = { workspace = true } committable = { workspace = true } contract-bindings = { path = "../contract-bindings" } derive_more = { workspace = true } +dyn-clone = { workspace = true } ethers = { workspace = true } fluent-asserter = "0.1.9" futures = { workspace = true } diff --git a/types/src/v0/traits.rs b/types/src/v0/traits.rs index de07e77c01..5761bcd3e3 100644 --- a/types/src/v0/traits.rs +++ b/types/src/v0/traits.rs @@ -1,11 +1,12 @@ //! This module contains all the traits used for building the sequencer types. //! It also includes some trait implementations that cannot be implemented in an external crate. -use std::{cmp::max, collections::BTreeMap, ops::Range, sync::Arc}; +use std::{cmp::max, collections::BTreeMap, fmt::Debug, ops::Range, sync::Arc}; use anyhow::{bail, ensure, Context}; use async_std::sync::RwLock; use async_trait::async_trait; -use committable::{Commitment, Committable}; +use committable::Commitment; +use dyn_clone::DynClone; use futures::{FutureExt, TryFutureExt}; use hotshot::{types::EventType, HotShotInitializer}; use hotshot_types::{ @@ -338,25 +339,9 @@ pub trait SequencerPersistence: Sized + Send + Sync + 'static { /// Save the orchestrator config to storage. async fn save_config(&mut self, cfg: &NetworkConfig) -> anyhow::Result<()>; - async fn collect_garbage(&mut self, view: ViewNumber) -> anyhow::Result<()>; - - /// Saves the latest decided leaf. - /// - /// If the height of the new leaf is not greater than the height of the previous decided leaf, - /// storage is not updated. - async fn save_anchor_leaf( - &mut self, - leaf: &Leaf, - qc: &QuorumCertificate, - ) -> anyhow::Result<()>; - /// Load the highest view saved with [`save_voted_view`](Self::save_voted_view). async fn load_latest_acted_view(&self) -> anyhow::Result>; - /// Load the latest leaf saved with [`save_anchor_leaf`](Self::save_anchor_leaf). 
- async fn load_anchor_leaf(&self) - -> anyhow::Result)>>; - /// Load undecided state saved by consensus before we shut down. async fn load_undecided_state( &self, @@ -365,7 +350,7 @@ pub trait SequencerPersistence: Sized + Send + Sync + 'static { /// Load the proposals saved by consensus async fn load_quorum_proposals( &self, - ) -> anyhow::Result>>>>; + ) -> anyhow::Result>>>; async fn load_vid_share( &self, @@ -379,11 +364,13 @@ pub trait SequencerPersistence: Sized + Send + Sync + 'static { /// Load the latest known consensus state. /// /// Returns an initializer to resume HotShot from the latest saved state (or start from genesis, - /// if there is no saved state). + /// if there is no saved state). Also returns the anchor view number, which can be used as a + /// reference point to process any events which were not processed before a previous shutdown, + /// if applicable. async fn load_consensus_state( &self, state: NodeState, - ) -> anyhow::Result> { + ) -> anyhow::Result<(HotShotInitializer, Option)> { let genesis_validated_state = ValidatedState::genesis(&state).0; let highest_voted_view = match self .load_latest_acted_view() @@ -399,7 +386,7 @@ pub trait SequencerPersistence: Sized + Send + Sync + 'static { ViewNumber::genesis() } }; - let (leaf, high_qc) = match self + let (leaf, high_qc, anchor_view) = match self .load_anchor_leaf() .await .context("loading anchor leaf")? @@ -414,13 +401,16 @@ pub trait SequencerPersistence: Sized + Send + Sync + 'static { high_qc.view_number ) ); - (leaf, high_qc) + + let anchor_view = leaf.view_number(); + (leaf, high_qc, Some(anchor_view)) } None => { tracing::info!("no saved leaf, starting from genesis leaf"); ( Leaf::genesis(&genesis_validated_state, &state).await, QuorumCertificate::genesis(&genesis_validated_state, &state).await, + None, ) } }; @@ -451,9 +441,7 @@ pub trait SequencerPersistence: Sized + Send + Sync + 'static { let saved_proposals = self .load_quorum_proposals() .await - .context("loading saved proposals") - .unwrap_or_default() - .unwrap_or_default(); + .context("loading saved proposals")?; tracing::info!( ?leaf, @@ -465,45 +453,85 @@ pub trait SequencerPersistence: Sized + Send + Sync + 'static { ?saved_proposals, "loaded consensus state" ); - Ok(HotShotInitializer::from_reload( - leaf, - state, - validated_state, - view, - saved_proposals, - high_qc, - undecided_leaves.into_values().collect(), - undecided_state, + + Ok(( + HotShotInitializer::from_reload( + leaf, + state, + validated_state, + view, + saved_proposals, + high_qc, + undecided_leaves.into_values().collect(), + undecided_state, + ), + anchor_view, )) } /// Update storage based on an event from consensus. - async fn handle_event(&mut self, event: &Event) { + async fn handle_event(&mut self, event: &Event, consumer: &(impl EventConsumer + 'static)) { if let EventType::Decide { leaf_chain, qc, .. } = &event.event { - if let Some(LeafInfo { leaf, .. }) = leaf_chain.first() { - if qc.view_number != leaf.view_number() { - tracing::error!( - leaf_view = ?leaf.view_number(), - qc_view = ?qc.view_number, - "latest leaf and QC are from different views!", - ); - return; - } - if let Err(err) = self.save_anchor_leaf(leaf, qc).await { - tracing::error!( - ?leaf, - hash = %leaf.commit(), - "Failed to save anchor leaf. When restarting make sure anchor leaf is at least as recent as this leaf. {err:#}", - ); - } - - if let Err(err) = self.collect_garbage(leaf.view_number()).await { - tracing::error!("Failed to garbage collect. 
{err:#}",); - } + let Some(LeafInfo { leaf, .. }) = leaf_chain.first() else { + // No new leaves. + return; + }; + + // Associate each decided leaf with a QC. + let chain = leaf_chain.iter().zip( + // The first (most recent) leaf corresponds to the QC triggering the decide event. + std::iter::once((**qc).clone()) + // Moving backwards in the chain, each leaf corresponds with the subsequent + // leaf's justify QC. + .chain(leaf_chain.iter().map(|leaf| leaf.leaf.justify_qc())), + ); + + if let Err(err) = self + .append_decided_leaves(leaf.view_number(), chain, consumer) + .await + { + tracing::error!( + "failed to save decided leaves, chain may not be up to date: {err:#}" + ); + return; + } } } + /// Append decided leaves to persistent storage and emit a corresponding event. + /// + /// `consumer` will be sent a `Decide` event containing all decided leaves in persistent storage + /// up to and including `view`. If available in persistent storage, full block payloads and VID + /// info will also be included for each leaf. + /// + /// Once the new decided leaves have been processed, old data up to `view` will be garbage + /// collected. The consumer's handling of this event is a prerequisite for the completion of + /// garbage collection: if the consumer fails to process the event, no data is deleted. This + /// ensures that, if called repeatedly, all decided leaves ever recorded in consensus storage + /// will eventually be passed to the consumer. + /// + /// Note that the converse is not true: if garbage collection fails, it is not guaranteed that + /// the consumer hasn't processed the decide event. Thus, in rare cases, some events may be + /// processed twice, or the consumer may get two events which share a subset of their data. + /// Thus, it is the consumer's responsibility to make sure its handling of each leaf is + /// idempotent. + /// + /// If the consumer fails to handle the new decide event, it may be retried, or simply postponed + /// until the next decide, at which point all persisted leaves from the failed GC run will be + /// included in the event along with subsequently decided leaves. + /// + /// This functionality is useful for keeping a separate view of the blockchain in sync with the + /// consensus storage. For example, the `consumer` could be used for moving data from consensus + /// storage to long-term archival storage. + async fn append_decided_leaves( + &mut self, + decided_view: ViewNumber, + leaf_chain: impl IntoIterator, QuorumCertificate)> + Send, + consumer: &(impl EventConsumer + 'static), + ) -> anyhow::Result<()>; + + async fn load_anchor_leaf(&self) + -> anyhow::Result)>>; async fn append_vid( &mut self, proposal: &Proposal>, @@ -528,6 +556,34 @@ pub trait SequencerPersistence: Sized + Send + Sync + 'static { ) -> anyhow::Result<()>; } +#[async_trait] +pub trait EventConsumer: Debug + DynClone + Send + Sync { + async fn handle_event(&self, event: &Event) -> anyhow::Result<()>; +} + +dyn_clone::clone_trait_object!(EventConsumer); + +#[async_trait] +impl EventConsumer for Box +where + Self: Clone, + T: EventConsumer + ?Sized, +{ + async fn handle_event(&self, event: &Event) -> anyhow::Result<()> { + (**self).handle_event(event).await + } +} + +#[derive(Clone, Copy, Debug)] +pub struct NullEventConsumer; + +#[async_trait] +impl EventConsumer for NullEventConsumer { + async fn handle_event(&self, _event: &Event) -> anyhow::Result<()> { + Ok(()) + } +} + #[async_trait] impl Storage for Arc> { async fn append_vid(