Skip to content

Commit

Permalink
fix migration completed check
Browse files Browse the repository at this point in the history
  • Loading branch information
imabdulbasit committed Feb 7, 2025
1 parent bc74fd8 commit f0e9b41
Show file tree
Hide file tree
Showing 3 changed files with 79 additions and 80 deletions.
4 changes: 3 additions & 1 deletion hotshot-query-service/src/availability/query_data.rs
Original file line number Diff line number Diff line change
Expand Up @@ -325,7 +325,9 @@ impl<Types: NodeType> BlockQueryData<Types> {
where
Payload<Types>: QueryablePayload<Types>,
{
let leaf = Leaf::<Types>::genesis(validated_state, instance_state).await;
let leaf: Leaf2<_> = Leaf::<Types>::genesis(validated_state, instance_state)
.await
.into();
Self::new(leaf.block_header().clone(), leaf.block_payload().unwrap())
}

Expand Down
25 changes: 13 additions & 12 deletions hotshot-query-service/src/data_source/storage/sql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -813,22 +813,24 @@ impl SqlStorage {
pub async fn migrate_types<Types: NodeType>(&self) -> anyhow::Result<()> {
let mut offset = 0;
let limit = 10000;
let mut tx = self.read().await.map_err(|err| QueryError::Error {
message: err.to_string(),
})?;

let (is_migration_completed,) =
query_as::<(bool,)>("SELECT completed from leaf_migration LIMIT 1 ")
.fetch_one(tx.as_mut())
.await?;

if is_migration_completed {
tracing::info!("leaf1 to leaf2 migration already completed");
return Ok(());
}
loop {
let mut tx = self.read().await.map_err(|err| QueryError::Error {
message: err.to_string(),
})?;

let (is_migration_completed,) =
query_as::<(bool,)>("SELECT completed from leaf_migration LIMIT 1 ")
.fetch_one(tx.as_mut())
.await?;

if is_migration_completed {
tracing::info!("leaf1 to leaf2 migration already completed");
return Ok(());
}

let rows = QueryBuilder::default()
.query(&format!(
"SELECT leaf, qc FROM leaf ORDER BY height LIMIT {} OFFSET {}",
Expand All @@ -840,8 +842,7 @@ impl SqlStorage {
drop(tx);

if rows.is_empty() {
tracing::info!("no leaf1 rows found");
return Ok(());
break;
}

let mut leaf_rows = Vec::new();
Expand Down
130 changes: 63 additions & 67 deletions sequencer/src/persistence/sql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1293,22 +1293,21 @@ impl SequencerPersistence for Persistence {
async fn migrate_anchor_leaf(&self) -> anyhow::Result<()> {
let batch_size: i64 = 1000;
let mut offset: i64 = 0;
let mut tx = self.db.read().await?;

loop {
let mut tx = self.db.read().await?;

let (is_completed,) = query_as::<(bool,)>(
"SELECT completed from epoch_migration WHERE table_name = 'anchor_leaf'",
)
.fetch_one(tx.as_mut())
.await?;

if is_completed {
tracing::info!("anchor leaf migration already done");
let (is_completed,) = query_as::<(bool,)>(
"SELECT completed from epoch_migration WHERE table_name = 'anchor_leaf'",
)
.fetch_one(tx.as_mut())
.await?;

return Ok(());
}
if is_completed {
tracing::info!("anchor leaf migration already done");

return Ok(());
}
loop {
let mut tx = self.db.read().await?;
let rows =
query("SELECT view, leaf, qc FROM anchor_leaf ORDER BY view LIMIT $1 OFFSET $2")
.bind(batch_size)
Expand Down Expand Up @@ -1374,22 +1373,22 @@ impl SequencerPersistence for Persistence {
async fn migrate_da_proposals(&self) -> anyhow::Result<()> {
let batch_size: i64 = 1000;
let mut offset: i64 = 0;
let mut tx = self.db.read().await?;

loop {
let mut tx = self.db.read().await?;

let (is_completed,) = query_as::<(bool,)>(
"SELECT completed from epoch_migration WHERE table_name = 'da_proposal'",
)
.fetch_one(tx.as_mut())
.await?;
let (is_completed,) = query_as::<(bool,)>(
"SELECT completed from epoch_migration WHERE table_name = 'da_proposal'",
)
.fetch_one(tx.as_mut())
.await?;

if is_completed {
tracing::info!("da proposals migration already done");
if is_completed {
tracing::info!("da proposals migration already done");

return Ok(());
}
return Ok(());
}

loop {
let mut tx = self.db.read().await?;
let rows = query(
"SELECT payload_hash, data FROM da_proposal ORDER BY view LIMIT $1 OFFSET $2",
)
Expand Down Expand Up @@ -1454,22 +1453,21 @@ impl SequencerPersistence for Persistence {
async fn migrate_vid_shares(&self) -> anyhow::Result<()> {
let batch_size: i64 = 1000;
let mut offset: i64 = 0;
let mut tx = self.db.read().await?;

loop {
let mut tx = self.db.read().await?;

let (is_completed,) = query_as::<(bool,)>(
"SELECT completed from epoch_migration WHERE table_name = 'vid_share'",
)
.fetch_one(tx.as_mut())
.await?;

if is_completed {
tracing::info!("vid_share migration already done");
let (is_completed,) = query_as::<(bool,)>(
"SELECT completed from epoch_migration WHERE table_name = 'vid_share'",
)
.fetch_one(tx.as_mut())
.await?;

return Ok(());
}
if is_completed {
tracing::info!("vid_share migration already done");

return Ok(());
}
loop {
let mut tx = self.db.read().await?;
let rows =
query("SELECT payload_hash, data FROM vid_share ORDER BY view LIMIT $1 OFFSET $2")
.bind(batch_size)
Expand Down Expand Up @@ -1581,22 +1579,21 @@ impl SequencerPersistence for Persistence {
async fn migrate_quorum_proposals(&self) -> anyhow::Result<()> {
let batch_size: i64 = 1000;
let mut offset: i64 = 0;
let mut tx = self.db.read().await?;

loop {
let mut tx = self.db.read().await?;

let (is_completed,) = query_as::<(bool,)>(
"SELECT completed from epoch_migration WHERE table_name = 'quorum_proposals'",
)
.fetch_one(tx.as_mut())
.await?;

if is_completed {
tracing::info!("quorum proposals migration already done");
let (is_completed,) = query_as::<(bool,)>(
"SELECT completed from epoch_migration WHERE table_name = 'quorum_proposals'",
)
.fetch_one(tx.as_mut())
.await?;

return Ok(());
}
if is_completed {
tracing::info!("quorum proposals migration already done");

return Ok(());
}
loop {
let mut tx = self.db.read().await?;
let rows =
query("SELECT view, leaf_hash, data FROM quorum_proposals ORDER BY view LIMIT $1 OFFSET $2")
.bind(batch_size)
Expand Down Expand Up @@ -1662,22 +1659,21 @@ impl SequencerPersistence for Persistence {
async fn migrate_quorum_certificates(&self) -> anyhow::Result<()> {
let batch_size: i64 = 1000;
let mut offset: i64 = 0;
let mut tx = self.db.read().await?;

loop {
let mut tx = self.db.read().await?;

let (is_completed,) = query_as::<(bool,)>(
"SELECT completed from epoch_migration WHERE table_name = 'quorum_certificate'",
)
.fetch_one(tx.as_mut())
.await?;

if is_completed {
tracing::info!(" quorum certificates migration already done");
let (is_completed,) = query_as::<(bool,)>(
"SELECT completed from epoch_migration WHERE table_name = 'quorum_certificate'",
)
.fetch_one(tx.as_mut())
.await?;

return Ok(());
}
if is_completed {
tracing::info!(" quorum certificates migration already done");

return Ok(());
}
loop {
let mut tx = self.db.read().await?;
let rows =
query("SELECT view, leaf_hash, data FROM quorum_certificate ORDER BY view LIMIT $1 OFFSET $2")
.bind(batch_size)
Expand Down Expand Up @@ -1855,7 +1851,7 @@ impl Provider<SeqTypes, VidCommonRequest> for Persistence {
Ok(Some((bytes,))) => bytes,
Ok(None) => return None,
Err(err) => {
tracing::warn!("error loading VID share: {err:#}");
tracing::error!("error loading VID share: {err:#}");
return None;
}
};
Expand All @@ -1864,7 +1860,7 @@ impl Provider<SeqTypes, VidCommonRequest> for Persistence {
match bincode::deserialize(&bytes) {
Ok(share) => share,
Err(err) => {
tracing::warn!("error decoding VID share: {err:#}");
tracing::error!("error decoding VID share: {err:#}");
return None;
}
};
Expand Down Expand Up @@ -1904,7 +1900,7 @@ impl Provider<SeqTypes, PayloadRequest> for Persistence {
{
Ok(proposal) => proposal,
Err(err) => {
tracing::warn!("error decoding DA proposal: {err:#}");
tracing::error!("error decoding DA proposal: {err:#}");
return None;
}
};
Expand Down

0 comments on commit f0e9b41

Please sign in to comment.