Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add linera sync-validator CLI command #3156

Open
wants to merge 7 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions linera-client/src/client_options.rs
Original file line number Diff line number Diff line change
Expand Up @@ -455,6 +455,16 @@ pub enum ClientCommand {
chain_id: Option<ChainId>,
},

/// Synchronizes a validator with the local state of chains.
SyncValidator {
/// The public key of the validator to synchronize.
name: ValidatorName,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would it make sense to make this optional? If not specified, just try to update all validators?


/// The chains to synchronize, or the default chain if empty.
#[arg(long, num_args = 0..)]
chains: Vec<ChainId>,
},

/// Add or modify a validator (admin only)
SetValidator {
/// The public key of the validator.
Expand Down
49 changes: 49 additions & 0 deletions linera-core/src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3366,6 +3366,55 @@ where
},
)
}

/// Attempts to update a validator with the local information.
#[instrument(level = "trace")]
pub async fn sync_validator(
&self,
remote_node: RemoteNode<P::Node>,
) -> Result<(), ChainClientError> {
let validator_chain_state = remote_node
.handle_chain_info_query(ChainInfoQuery::new(self.chain_id))
.await?;
let local_chain_state = self.client.local_node.chain_info(self.chain_id).await?;

let Some(missing_certificate_count) = local_chain_state
.next_block_height
.0
.checked_sub(validator_chain_state.next_block_height.0)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we can also return if it's zero?

Suggested change
.checked_sub(validator_chain_state.next_block_height.0)
.checked_sub(validator_chain_state.next_block_height.0)
.filter(|count| count > 0)

else {
debug!("Validator is up-to-date with local state");
return Ok(());
};

let missing_certificates_end = usize::try_from(local_chain_state.next_block_height.0)
.expect("`usize` should be at least `u64`");
let missing_certificates_start = missing_certificates_end
- usize::try_from(missing_certificate_count).expect("`usize` should be at least `u64`");

let missing_certificate_hashes = self
.client
.local_node
.chain_state_view(self.chain_id)
.await?
.confirmed_log
.read(missing_certificates_start..missing_certificates_end)
.await?;

for certificate_hash in missing_certificate_hashes {
let certificate = self
.client
.storage
.read_certificate(certificate_hash)
.await?;
Comment on lines +3405 to +3409
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The certificates can probably be downloaded in one single operation.
Though I agree that the handle_confirmed certificate is likely more time limiting.


remote_node
.handle_confirmed_certificate(certificate, CrossChainMessageDelivery::NonBlocking)
.await?;
}

Ok(())
}
}

/// The outcome of trying to commit a list of incoming messages and operations to the chain.
Expand Down
3 changes: 2 additions & 1 deletion linera-rpc/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,9 @@ pub mod simple;

pub mod grpc;

pub use client::Client;
pub use message::RpcMessage;
pub use node_provider::NodeOptions;
pub use node_provider::{NodeOptions, NodeProvider};

#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
#[cfg_attr(with_testing, derive(Eq, PartialEq))]
Expand Down
29 changes: 29 additions & 0 deletions linera-service/src/cli_wrappers/local_net.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use linera_base::{
data_types::Amount,
};
use linera_client::storage::{StorageConfig, StorageConfigNamespace};
use linera_core::node::ValidatorNodeProvider;
use linera_execution::ResourceControlPolicy;
#[cfg(all(feature = "storage-service", with_testing))]
use linera_storage_service::common::storage_service_test_endpoint;
Expand Down Expand Up @@ -627,6 +628,34 @@ impl LocalNet {
self.running_validators.insert(validator, validator_proxy);
Ok(())
}

/// Terminates all the processes of a given `validator`.
pub async fn stop_validator(&mut self, validator: usize) -> Result<()> {
if let Some(mut validator) = self.running_validators.remove(&validator) {
validator.terminate().await?;
}
Comment on lines +634 to +636
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would prefer an error to be reported here.

Ok(())
}

/// Returns a [`linera_rpc::Client`] to interact directly with a `validator`.
pub async fn validator_client(&mut self, validator: usize) -> Result<linera_rpc::Client> {
let node_provider = linera_rpc::NodeProvider::new(linera_rpc::NodeOptions {
send_timeout: Duration::from_secs(1),
recv_timeout: Duration::from_secs(1),
retry_delay: Duration::ZERO,
max_retries: 0,
});

let port = Self::proxy_port(validator);
let schema = match self.network.internal {
Network::Grpc | Network::Grpcs => "grpc",
Network::Tcp => "tcp",
Network::Udp => "udp",
};
Comment on lines +650 to +654
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That should not be here, but instead in the cli_wrapper/mod.rs implementation of Network.
We have plenty of such functions, but I think those functions should be in the same place.

let address = format!("{schema}:localhost:{port}");

Ok(node_provider.make_node(&address)?)
}
}

#[cfg(with_testing)]
Expand Down
20 changes: 20 additions & 0 deletions linera-service/src/cli_wrappers/wallet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -516,6 +516,26 @@ impl ClientWrapper {
Ok(())
}

/// Runs `linera sync-validator`.
pub async fn sync_validator(
&self,
chain_ids: impl IntoIterator<Item = &ChainId>,
validator_name: ValidatorName,
) -> Result<()> {
let mut command = self.command().await?;
command
.arg("sync-validator")
.arg(validator_name.to_string());
let mut chain_ids = chain_ids.into_iter().peekable();
if chain_ids.peek().is_some() {
command
.arg("--chains")
.args(chain_ids.map(ChainId::to_string));
}
command.spawn_and_wait_for_stdout().await?;
Ok(())
}

/// Runs `linera faucet`.
pub async fn run_faucet(
&self,
Expand Down
28 changes: 28 additions & 0 deletions linera-service/src/linera/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -519,6 +519,33 @@ impl Runnable for Job {
println!("{}/{} OK.", num_ok_validators, committee.validators().len());
}

SyncValidator { name, mut chains } => {
if chains.is_empty() {
chains.push(context.default_chain());
}

let first_chain_id = chains[0];
let first_chain = context.make_chain_client(first_chain_id)?;
let committee = first_chain.local_committee().await?;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(Alternatively, we could try the admin chain's committee. Or specify an address rather than a name on the command line? Not sure what's best.)


let validator_address = committee.network_address(&name).ok_or_else(|| {
anyhow!("Validator {name} is not known by the chain {first_chain_id}")
})?;
let validator_node = context.make_node_provider().make_node(validator_address)?;
let validator = RemoteNode {
name,
node: validator_node,
};

first_chain.sync_validator(validator.clone()).await?;

for chain_id in chains.into_iter().skip(1) {
let chain = context.make_chain_client(chain_id)?;

chain.sync_validator(validator.clone()).await?;
}
}

command @ (SetValidator { .. }
| RemoveValidator { .. }
| ResourceControlPolicy { .. }) => {
Expand Down Expand Up @@ -1422,6 +1449,7 @@ fn log_file_name_for(command: &ClientCommand) -> Cow<'static, str> {
| ClientCommand::ProcessInbox { .. }
| ClientCommand::QueryValidator { .. }
| ClientCommand::QueryValidators { .. }
| ClientCommand::SyncValidator { .. }
| ClientCommand::SetValidator { .. }
| ClientCommand::RemoveValidator { .. }
| ClientCommand::ResourceControlPolicy { .. }
Expand Down
72 changes: 71 additions & 1 deletion linera-service/tests/local_net_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,10 @@ use std::{env, path::PathBuf, process::Command, time::Duration};
use anyhow::Result;
use common::INTEGRATION_TEST_GUARD;
use linera_base::{
data_types::Amount,
data_types::{Amount, BlockHeight},
identifiers::{Account, AccountOwner, ChainId},
};
use linera_core::{data_types::ChainInfoQuery, node::ValidatorNode};
use linera_service::{
cli_wrappers::{
local_net::{
Expand Down Expand Up @@ -845,3 +846,72 @@ async fn test_end_to_end_benchmark(mut config: LocalNetConfig) -> Result<()> {

Ok(())
}

/// Tests if the `sync-validator` command uploads missing certificates to a validator.
// TODO: Fix test for simple-net
// #[cfg_attr(feature = "scylladb", test_case(LocalNetConfig::new_test(Database::ScyllaDb, Network::Udp) ; "scylladb_udp"))]
#[cfg_attr(feature = "scylladb", test_case(LocalNetConfig::new_test(Database::ScyllaDb, Network::Grpc) ; "scylladb_grpc"))]
#[cfg_attr(feature = "storage-service", test_case(LocalNetConfig::new_test(Database::Service, Network::Grpc) ; "storage_service_grpc"))]
// #[cfg_attr(feature = "storage-service", test_case(LocalNetConfig::new_test(Database::Service, Network::Tcp) ; "storage_service_tcp"))]
#[cfg_attr(feature = "dynamodb", test_case(LocalNetConfig::new_test(Database::DynamoDb, Network::Grpc) ; "aws_grpc"))]
// #[cfg_attr(feature = "scylladb", test_case(LocalNetConfig::new_test(Database::ScyllaDb, Network::Tcp) ; "scylladb_tcp"))]
// #[cfg_attr(feature = "dynamodb", test_case(LocalNetConfig::new_test(Database::DynamoDb, Network::Tcp) ; "aws_tcp"))]
// #[cfg_attr(feature = "dynamodb", test_case(LocalNetConfig::new_test(Database::DynamoDb, Network::Udp) ; "aws_udp"))]
Comment on lines +852 to +859
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Remove the outcommented stuff that seems to come from copy paste.

#[test_log::test(tokio::test)]
async fn test_sync_validator(config: LocalNetConfig) -> Result<()> {
let _guard = INTEGRATION_TEST_GUARD.lock().await;
tracing::info!("Starting test {}", test_name!());

const BLOCKS_TO_CREATE: usize = 5;

let (mut net, client) = config.instantiate().await?;

// Stop a validator to force it to lag behind the others
net.stop_validator(0).await?;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe introduce a let LAGGING_VALIDATOR = 0 to make the code a little clearer.


// Create some blocks
let sender_chain = client.default_chain().expect("Client has no default chain");
let (_, receiver_chain) = client
.open_chain(sender_chain, None, Amount::from_tokens(1_000))
.await?;

for amount in 1..=BLOCKS_TO_CREATE {
client
.transfer(
Amount::from_tokens(amount as u128),
sender_chain,
receiver_chain,
)
.await?;
}

// Restart the stopped validator
net.start_validator(0).await?;

let lagging_validator = net.validator_client(0).await?;

let state_before_sync = lagging_validator
.handle_chain_info_query(ChainInfoQuery::new(sender_chain))
.await?;
assert_eq!(state_before_sync.info.next_block_height, BlockHeight::ZERO);

// Synchronize the validator
let validator_name = net
.validator_name(0)
.expect("Missing name for the first validator")
.parse()?;
client
.sync_validator([&sender_chain], validator_name)
.await
.expect("Missing lagging validator name");

let state_after_sync = lagging_validator
.handle_chain_info_query(ChainInfoQuery::new(sender_chain))
.await?;
assert_eq!(
state_after_sync.info.next_block_height,
BlockHeight(BLOCKS_TO_CREATE as u64 + 1)
);

Ok(())
}
Loading