Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[store] Introduce ChainStoreUpdateAdapter and EpochStoreUpdateAdapter and use in epoch_sync #12866

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 0 additions & 23 deletions chain/chain/src/store/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1483,22 +1483,6 @@ impl<'a> ChainStoreUpdate<'a> {
}
}

/// Save header head in Epoch Sync
/// Checking validity of header head is delegated to Epoch Sync methods
pub fn force_save_header_head(&mut self, t: &Tip) -> Result<(), Error> {
self.try_save_latest_known(t.height)?;

// TODO #3488
// Bowen: It seems that height_to_hashes is used to update DBCol::BlockHeight, which stores blocks,
// not block headers, by height. Therefore I wonder whether this line here breaks some invariant
// since now we potentially don't have the corresponding block in storage.

//self.chain_store_cache_update.height_to_hashes.insert(t.height, Some(t.last_block_hash));
//self.chain_store_cache_update.next_block_hashes.insert(t.prev_block_hash, t.last_block_hash);
self.header_head = Some(t.clone());
Ok(())
}

/// Update header head and height to hash index for this branch.
pub fn save_header_head_if_not_challenged(&mut self, t: &Tip) -> Result<(), Error> {
if t.height > self.chain_store.get_genesis_height() {
Expand Down Expand Up @@ -1622,13 +1606,6 @@ impl<'a> ChainStoreUpdate<'a> {
Ok(())
}

/// Used only in Epoch Sync finalization
/// Validity of Header is checked by Epoch Sync methods
pub fn save_block_header_no_update_tree(&mut self, header: BlockHeader) -> Result<(), Error> {
self.chain_store_cache_update.headers.insert(*header.hash(), header);
Ok(())
}

pub fn save_block_header(&mut self, header: BlockHeader) -> Result<(), Error> {
self.update_and_save_block_merkle_tree(&header)?;
self.chain_store_cache_update.headers.insert(*header.hash(), header);
Expand Down
87 changes: 39 additions & 48 deletions chain/client/src/sync/epoch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,10 @@ use near_primitives::types::validator_stake::ValidatorStake;
use near_primitives::types::{
AccountId, ApprovalStake, Balance, BlockHeight, BlockHeightDelta, EpochId,
};
use near_primitives::utils::{compression::CompressedData, index_to_bytes};
use near_primitives::utils::compression::CompressedData;
use near_primitives::version::ProtocolFeature;
use near_store::adapter::StoreAdapter;
use near_store::{DBCol, Store};
use near_store::adapter::{StoreAdapter, StoreUpdateAdapter};
use near_store::Store;
use rand::seq::SliceRandom;
use rayon::iter::{IntoParallelIterator, ParallelIterator};
use std::collections::{HashMap, HashSet};
Expand Down Expand Up @@ -572,34 +572,37 @@ impl EpochSync {

self.verify_proof(&proof, epoch_manager)?;

let mut store_update = chain.chain_store.store().store_update();
let store = chain.chain_store.store();
let mut store_update = store.store_update();

// Store the EpochSyncProof, so that this node can derive a more recent EpochSyncProof
// to facilitate epoch sync of other nodes.
let proof = EpochSyncProof::V1(proof); // convert to avoid cloning
store_update.set_ser(DBCol::EpochSyncProof, &[], &proof)?;
store_update.epoch_store_update().set_epoch_sync_proof(&proof);
let proof = proof.into_v1();

let last_header = proof.current_epoch.first_block_header_in_epoch;
let mut update = chain.mut_chain_store().store_update();
update.save_block_header_no_update_tree(last_header.clone())?;
update.save_block_header_no_update_tree(
proof.current_epoch.last_block_header_in_prev_epoch,
)?;
update.save_block_header_no_update_tree(
proof.current_epoch.second_last_block_header_in_prev_epoch.clone(),
)?;
update.save_block_header_no_update_tree(
proof
.all_epochs
.get(proof.all_epochs.len() - 2)
.unwrap()
.last_final_block_header
.clone(),
)?;
update.force_save_header_head(&Tip::from_header(&last_header))?;
update.save_final_head(&Tip::from_header(&self.genesis))?;

// Save blocks and headers to the store.
// Set the header head and final head.
let mut chain_store_update = store.chain_store().store_update();

for block_header in [
&last_header,
&proof.current_epoch.last_block_header_in_prev_epoch,
&proof.current_epoch.second_last_block_header_in_prev_epoch,
&proof.all_epochs.get(proof.all_epochs.len() - 2).unwrap().last_final_block_header,
] {
chain_store_update.set_block_header_only(block_header);
chain_store_update.update_block_header_hashes_by_height(block_header);
}

chain_store_update.set_header_head(&Tip::from_header(&last_header));
chain_store_update.set_final_head(&Tip::from_header(&self.genesis));

chain_store_update.commit()?;

// Initialize the epoch manager with the last epoch.
epoch_manager.init_after_epoch_sync(
&mut store_update,
proof.last_epoch.first_block_in_epoch,
Expand All @@ -613,44 +616,32 @@ impl EpochSync {
proof.last_epoch.next_next_epoch_info,
)?;

// At this point `update` contains headers of 3 last blocks of last past epoch
// At this point store contains headers of 3 last blocks of last past epoch
// and header of the first block of current epoch.
// At least the third last block of last past epoch is final.
// It means that `update` contains header of last final block of the first block of current epoch.
// It means that store contains header of last final block of the first block of current epoch.
let last_header_last_finalized_height =
update.get_block_header(last_header.last_final_block())?.height();
store.chain_store().get_block_header(last_header.last_final_block())?.height();
let mut first_block_info_in_epoch =
BlockInfo::from_header(&last_header, last_header_last_finalized_height);
// We need to populate fields below manually, as they are set to defaults by `BlockInfo::from_header`.
*first_block_info_in_epoch.epoch_first_block_mut() = *last_header.hash();
*first_block_info_in_epoch.epoch_id_mut() = *last_header.epoch_id();

store_update.insert_ser(
DBCol::BlockInfo,
&borsh::to_vec(first_block_info_in_epoch.hash()).unwrap(),
&first_block_info_in_epoch,
)?;

store_update.set_ser(
DBCol::BlockOrdinal,
&index_to_bytes(proof.current_epoch.partial_merkle_tree_for_first_block.size()),
store_update.epoch_store_update().set_block_info(&first_block_info_in_epoch);
store_update.chain_store_update().set_block_ordinal(
proof.current_epoch.partial_merkle_tree_for_first_block.size(),
last_header.hash(),
)?;

store_update.set_ser(
DBCol::BlockHeight,
&borsh::to_vec(&last_header.height()).unwrap(),
);
store_update
.chain_store_update()
.set_block_height(last_header.hash(), last_header.height());
store_update.chain_store_update().set_block_merkle_tree(
last_header.hash(),
)?;

store_update.set_ser(
DBCol::BlockMerkleTree,
last_header.hash().as_bytes(),
&proof.current_epoch.partial_merkle_tree_for_first_block,
)?;
);

update.merge(store_update);
update.commit()?;
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

While the change above looks like a simple rewrite, it is definitely NOT trivial!! There are a couple of things happening here that are different.

We are no longer using chain_store_update but instead using the standard store_update.

Originally we relied on functions like save_block_header_no_update_tree, force_save_header_head and save_final_head in chain store update along with update.commit()? which internally calls update.finalize().

(!!) With the new code, we are directly going and making changes to the store layer instead of relying on the weird crazy behavior of update.finalize()

We also need to save the new blocks into the store, i.e. call chain_store_update.commit()?; before actually reading from the store right below that, as we are no longer relying on the non-committed, temporary (yet-to-be-stored) values in the update.get_block_header(...) part of the code.

I had to print out all the DB ops from the old code and the new code and compare them to check if we are doing things properly. This is tested well manually.

One tiny thing to be noted is that as part of the finalize behavior in the old chain store update code, we were calling crate::state_sync::update_sync_hashes(...) on all the blocks we were updating, but this isn't required as part of epoch sync. cc @marcelo-gonzalez.

One of those crazy things that's a byproduct of using the heavy hammer of chain store update... :(

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

update_sync_hashes

Could you explain why? We likely want to sync state just after we finished epoch sync, so we need to make sure update_sync_hashes is called for some blocks, I just don't understand which ones.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I ran the original code in debug mode and we are calling this function below for last_header

fn on_new_epoch(store_update: &mut StoreUpdate, header: &BlockHeader) -> Result<(), Error> {
    let num_new_chunks = vec![0u8; header.chunk_mask().len()];
    store_update.set_ser(DBCol::StateSyncNewChunks, header.hash().as_ref(), &num_new_chunks)?;
    Ok(())
}

Here, update_sync_hashes only triggers at the epoch boundary, which happens in epoch sync at last_header, i.e. proof.current_epoch.first_block_header_in_epoch.

Here, the key we are adding is DBCol::StateSyncNewChunks and the description says something like this

    /// Stores a mapping from a block's hash to a list (indexed by ShardIndex like a header's chunk_mask())
    /// of the number of new chunks up to that block after the first block in the epoch. Used in calculating
    /// the right "sync_hash" for state sync after the StateSyncHashUpdate protocol feature is enabled.
    /// - *Rows*: `CryptoHash`
    /// - *Column type*: `Vec<u8>`
    StateSyncNewChunks,

Makes me believe it's not super important and would eventually be updated with the next block processing as well with a different code path.

However, even after all this, in case further convincing is required: after epoch sync, we do block sync for the current epoch anyway, so it doesn't make sense to think about state_sync related things for that epoch. state_sync related things would get reset with the next epoch's processing.

@marcelo-gonzalez could you please check if this argument makes sense?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I had to print out all the DB ops from the old code and the new code and compare them to check if we are doing things properly. This is tested well manually.

Just to reiterate, even though this code path is different, I manually tested this by looking into the actual store_update db ops and the only difference was one entry, that of DBCol::StateSyncNewChunks that I described above.

Copy link
Contributor

@marcelo-gonzalez marcelo-gonzalez Feb 4, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah so in the previous code where we call save_block_header_no_update_tree on those headers in epoch sync, we first call it on proof.current_epoch.first_block_header_in_epoch, and like the name says, it's the first block in an epoch. So update_sync_hashes() will just set all zeros in the new chunks column for that block hash since it sees it's the first block in an epoch.

Then every other block header we're calling save_block_header_no_update_tree() on is in the previous epoch, and none of the other headers in that epoch have been saved before all this. This has us return early from update_sync_hashes() without doing anything bc of this check here. Note the comment about epoch sync there

So tldr, only proof.current_epoch.first_block_header_in_epoch actually does something when we call update_sync_hashes() and the others can safely be skipped. Actually there are several other comments in that file about epoch sync possibly meaning that some stuff has not been processed before. It would def simplify things if we could make this assumption: if update_sync_hashes() is called on some header, then if its prev header is in the same epoch, it has already been called previously on that prev header. Right now if that isn't the case we just silently return and assume it's bc of epoch sync, but it would be good to be able to WARN in that case that something is wrong

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So if I understand correctly, this PR does make a behavior change where update_sync_hashes() will no longer be called on proof.current_epoch.first_block_header_in_epoch, but it's ok because that block is not going to be in the current epoch, so we header sync until we get to the current epoch and then update_sync_hashes() works as normal when we get to the new epoch

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

^ Yes, that was my reasoning and wanted your confirmation!

store_update.commit()?;

*status = SyncStatus::EpochSyncDone;
tracing::info!(epoch_id=?last_header.epoch_id(), "Bootstrapped from epoch sync");
Expand Down
99 changes: 97 additions & 2 deletions core/store/src/adapter/chain_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,11 @@ use near_primitives::utils::{get_block_shard_id, get_outcome_id_block_hash, inde
use near_primitives::views::LightClientBlockView;

use crate::{
get_genesis_height, DBCol, Store, CHUNK_TAIL_KEY, FINAL_HEAD_KEY, FORK_TAIL_KEY,
get_genesis_height, DBCol, Store, StoreUpdate, CHUNK_TAIL_KEY, FINAL_HEAD_KEY, FORK_TAIL_KEY,
HEADER_HEAD_KEY, HEAD_KEY, LARGEST_TARGET_HEIGHT_KEY, TAIL_KEY,
};

use super::StoreAdapter;
use super::{StoreAdapter, StoreUpdateAdapter, StoreUpdateHolder};

#[derive(Clone)]
pub struct ChainStoreAdapter {
Expand All @@ -46,6 +46,12 @@ impl ChainStoreAdapter {
Self { store, genesis_height }
}

pub fn store_update(&self) -> ChainStoreUpdateAdapter<'static> {
ChainStoreUpdateAdapter {
store_update: StoreUpdateHolder::Owned(self.store.store_update()),
}
}

/// The chain head.
pub fn head(&self) -> Result<Tip, Error> {
option_to_not_found(self.store.get_ser(DBCol::BlockMisc, HEAD_KEY), "HEAD")
Expand Down Expand Up @@ -390,6 +396,95 @@ impl ChainStoreAdapter {
}
}

pub struct ChainStoreUpdateAdapter<'a> {
store_update: StoreUpdateHolder<'a>,
}

impl Into<StoreUpdate> for ChainStoreUpdateAdapter<'static> {
fn into(self) -> StoreUpdate {
self.store_update.into()
}
}

impl ChainStoreUpdateAdapter<'static> {
pub fn commit(self) -> io::Result<()> {
let store_update: StoreUpdate = self.into();
store_update.commit()
}
}

impl<'a> StoreUpdateAdapter for ChainStoreUpdateAdapter<'a> {
fn store_update(&mut self) -> &mut StoreUpdate {
&mut self.store_update
}
}

impl<'a> ChainStoreUpdateAdapter<'a> {
pub fn new(store_update: &'a mut StoreUpdate) -> Self {
Self { store_update: StoreUpdateHolder::Reference(store_update) }
}

/// USE THIS FUNCTION WITH CARE; proceed only if you know what you're doing.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: down the road nobody is going to "know what they're doing". It will not be clear what "use with care" means. I suggest to just warn specifically what things can go wrong if you use it directly. (You already mentioned one, but maybe there are more and maybe what you mentioned can be explained a bit more.)

/// Typically while saving the block header we would also like to update
/// block_header_hashes_by_height and update block_merkle_tree
pub fn set_block_header_only(&mut self, header: &BlockHeader) {
self.store_update.insert_ser(DBCol::BlockHeader, header.hash().as_ref(), header).unwrap();
}

/// USE THIS FUNCTION WITH CARE; proceed only if you know what you're doing.
/// Typically block_header_hashes_by_height is saved while saving the block header
pub fn set_block_header_hashes_by_height(
&mut self,
height: BlockHeight,
hash_set: &HashSet<CryptoHash>,
) {
self.store_update
.set_ser(DBCol::HeaderHashesByHeight, &index_to_bytes(height), hash_set)
.unwrap();
}

/// USE THIS FUNCTION WITH CARE; proceed only if you know what you're doing.
/// Typically block_merkle_tree is saved while saving the block header
pub fn set_block_merkle_tree(
&mut self,
block_hash: &CryptoHash,
block_merkle_tree: &PartialMerkleTree,
) {
self.store_update
.set_ser(DBCol::BlockMerkleTree, block_hash.as_ref(), block_merkle_tree)
.unwrap();
}

pub fn set_block_ordinal(&mut self, block_ordinal: NumBlocks, block_hash: &CryptoHash) {
self.store_update
.set_ser(DBCol::BlockOrdinal, &index_to_bytes(block_ordinal), block_hash)
.unwrap();
}

pub fn set_block_height(&mut self, hash: &CryptoHash, height: BlockHeight) {
self.store_update
.set_ser(DBCol::BlockHeight, &borsh::to_vec(&height).unwrap(), hash)
.unwrap();
}

pub fn set_header_head(&mut self, header_head: &Tip) {
self.store_update.set_ser(DBCol::BlockMisc, HEADER_HEAD_KEY, header_head).unwrap();
}

pub fn set_final_head(&mut self, final_head: &Tip) {
self.store_update.set_ser(DBCol::BlockMisc, FINAL_HEAD_KEY, final_head).unwrap();
}

/// This function is normally clubbed with set_block_header_only
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But then we can just unite both methods.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I was thinking of doing that but thought of leaving the primitives. I had plans in the future to integrate functions like these into a helper module like, update_block or similar, but thought of leaving them as is for now.

I can follow up in a different PR

pub fn update_block_header_hashes_by_height(&mut self, header: &BlockHeader) {
let height = header.height();
let mut hash_set =
self.store_update.store.chain_store().get_all_header_hashes_by_height(height).unwrap();
hash_set.insert(*header.hash());
self.set_block_header_hashes_by_height(height, &hash_set);
}
}

fn option_to_not_found<T, F>(res: io::Result<Option<T>>, field_name: F) -> Result<T, Error>
where
F: std::string::ToString,
Expand Down
59 changes: 57 additions & 2 deletions core/store/src/adapter/epoch_store.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use std::io;

use borsh::BorshDeserialize;
use near_chain_primitives::Error;
use near_primitives::epoch_block_info::BlockInfo;
Expand All @@ -8,9 +10,9 @@ use near_primitives::errors::EpochError;
use near_primitives::hash::CryptoHash;
use near_primitives::types::{BlockHeight, EpochId};

use crate::{DBCol, Store};
use crate::{DBCol, Store, StoreUpdate};

use super::StoreAdapter;
use super::{StoreAdapter, StoreUpdateAdapter, StoreUpdateHolder};

#[derive(Clone)]
pub struct EpochStoreAdapter {
Expand All @@ -28,6 +30,12 @@ impl EpochStoreAdapter {
Self { store }
}

pub fn store_update(&self) -> EpochStoreUpdateAdapter<'static> {
EpochStoreUpdateAdapter {
store_update: StoreUpdateHolder::Owned(self.store.store_update()),
}
}

pub fn get_epoch_start(&self, epoch_id: &EpochId) -> Result<BlockHeight, EpochError> {
self.store
.get_ser::<BlockHeight>(DBCol::EpochStart, epoch_id.as_ref())?
Expand Down Expand Up @@ -71,3 +79,50 @@ impl EpochStoreAdapter {
.map(|proof| proof.into_v1()))
}
}

pub struct EpochStoreUpdateAdapter<'a> {
store_update: StoreUpdateHolder<'a>,
}

impl Into<StoreUpdate> for EpochStoreUpdateAdapter<'static> {
fn into(self) -> StoreUpdate {
self.store_update.into()
}
}

impl EpochStoreUpdateAdapter<'static> {
pub fn commit(self) -> io::Result<()> {
let store_update: StoreUpdate = self.into();
store_update.commit()
}
}

impl<'a> StoreUpdateAdapter for EpochStoreUpdateAdapter<'a> {
fn store_update(&mut self) -> &mut StoreUpdate {
&mut self.store_update
}
}

impl<'a> EpochStoreUpdateAdapter<'a> {
pub fn new(store_update: &'a mut StoreUpdate) -> Self {
Self { store_update: StoreUpdateHolder::Reference(store_update) }
}

pub fn set_epoch_start(&mut self, epoch_id: &EpochId, start: BlockHeight) {
self.store_update.set_ser(DBCol::EpochStart, epoch_id.as_ref(), &start).unwrap();
}

pub fn set_block_info(&mut self, block_info: &BlockInfo) {
self.store_update
.insert_ser(DBCol::BlockInfo, block_info.hash().as_ref(), block_info)
.unwrap();
}

pub fn set_epoch_info(&mut self, epoch_id: &EpochId, epoch_info: &EpochInfo) {
self.store_update.set_ser(DBCol::EpochInfo, epoch_id.as_ref(), epoch_info).unwrap();
}

pub fn set_epoch_sync_proof(&mut self, proof: &EpochSyncProof) {
self.store_update.set_ser(DBCol::EpochSyncProof, &[], &proof).unwrap();
}
}
8 changes: 8 additions & 0 deletions core/store/src/adapter/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,14 @@ pub trait StoreAdapter {
pub trait StoreUpdateAdapter: Sized {
fn store_update(&mut self) -> &mut StoreUpdate;

fn chain_store_update(&mut self) -> chain_store::ChainStoreUpdateAdapter {
chain_store::ChainStoreUpdateAdapter::new(self.store_update())
}

fn epoch_store_update(&mut self) -> epoch_store::EpochStoreUpdateAdapter {
epoch_store::EpochStoreUpdateAdapter::new(self.store_update())
}

fn flat_store_update(&mut self) -> flat_store::FlatStoreUpdateAdapter {
flat_store::FlatStoreUpdateAdapter::new(self.store_update())
}
Expand Down
Loading