From f0e9b41eb0f2bd343bdd3e347d2a5670ef7ad776 Mon Sep 17 00:00:00 2001 From: imabdulbasit Date: Fri, 7 Feb 2025 16:52:07 +0500 Subject: [PATCH] fix migration completed check --- .../src/availability/query_data.rs | 4 +- .../src/data_source/storage/sql.rs | 25 ++-- sequencer/src/persistence/sql.rs | 130 +++++++++--------- 3 files changed, 79 insertions(+), 80 deletions(-) diff --git a/hotshot-query-service/src/availability/query_data.rs b/hotshot-query-service/src/availability/query_data.rs index 848d72f8ea..5c94e2f83f 100644 --- a/hotshot-query-service/src/availability/query_data.rs +++ b/hotshot-query-service/src/availability/query_data.rs @@ -325,7 +325,9 @@ impl BlockQueryData { where Payload: QueryablePayload, { - let leaf = Leaf::::genesis(validated_state, instance_state).await; + let leaf: Leaf2<_> = Leaf::::genesis(validated_state, instance_state) + .await + .into(); Self::new(leaf.block_header().clone(), leaf.block_payload().unwrap()) } diff --git a/hotshot-query-service/src/data_source/storage/sql.rs b/hotshot-query-service/src/data_source/storage/sql.rs index 96ad43a728..60c8ad2383 100644 --- a/hotshot-query-service/src/data_source/storage/sql.rs +++ b/hotshot-query-service/src/data_source/storage/sql.rs @@ -813,22 +813,24 @@ impl SqlStorage { pub async fn migrate_types(&self) -> anyhow::Result<()> { let mut offset = 0; let limit = 10000; + let mut tx = self.read().await.map_err(|err| QueryError::Error { + message: err.to_string(), + })?; + + let (is_migration_completed,) = + query_as::<(bool,)>("SELECT completed from leaf_migration LIMIT 1 ") + .fetch_one(tx.as_mut()) + .await?; + if is_migration_completed { + tracing::info!("leaf1 to leaf2 migration already completed"); + return Ok(()); + } loop { let mut tx = self.read().await.map_err(|err| QueryError::Error { message: err.to_string(), })?; - let (is_migration_completed,) = - query_as::<(bool,)>("SELECT completed from leaf_migration LIMIT 1 ") - .fetch_one(tx.as_mut()) - .await?; - - if is_migration_completed { - tracing::info!("leaf1 to leaf2 migration already completed"); - return Ok(()); - } - let rows = QueryBuilder::default() .query(&format!( "SELECT leaf, qc FROM leaf ORDER BY height LIMIT {} OFFSET {}", @@ -840,8 +842,7 @@ impl SqlStorage { drop(tx); if rows.is_empty() { - tracing::info!("no leaf1 rows found"); - return Ok(()); + break; } let mut leaf_rows = Vec::new(); diff --git a/sequencer/src/persistence/sql.rs b/sequencer/src/persistence/sql.rs index 8cf67b05da..ed8dad55ec 100644 --- a/sequencer/src/persistence/sql.rs +++ b/sequencer/src/persistence/sql.rs @@ -1293,22 +1293,21 @@ impl SequencerPersistence for Persistence { async fn migrate_anchor_leaf(&self) -> anyhow::Result<()> { let batch_size: i64 = 1000; let mut offset: i64 = 0; + let mut tx = self.db.read().await?; - loop { - let mut tx = self.db.read().await?; - - let (is_completed,) = query_as::<(bool,)>( - "SELECT completed from epoch_migration WHERE table_name = 'anchor_leaf'", - ) - .fetch_one(tx.as_mut()) - .await?; - - if is_completed { - tracing::info!("anchor leaf migration already done"); + let (is_completed,) = query_as::<(bool,)>( + "SELECT completed from epoch_migration WHERE table_name = 'anchor_leaf'", + ) + .fetch_one(tx.as_mut()) + .await?; - return Ok(()); - } + if is_completed { + tracing::info!("anchor leaf migration already done"); + return Ok(()); + } + loop { + let mut tx = self.db.read().await?; let rows = query("SELECT view, leaf, qc FROM anchor_leaf ORDER BY view LIMIT $1 OFFSET $2") .bind(batch_size) @@ -1374,22 +1373,22 @@ impl SequencerPersistence for Persistence { async fn migrate_da_proposals(&self) -> anyhow::Result<()> { let batch_size: i64 = 1000; let mut offset: i64 = 0; + let mut tx = self.db.read().await?; - loop { - let mut tx = self.db.read().await?; - - let (is_completed,) = query_as::<(bool,)>( - "SELECT completed from epoch_migration WHERE table_name = 'da_proposal'", - ) - .fetch_one(tx.as_mut()) - .await?; + let (is_completed,) = query_as::<(bool,)>( + "SELECT completed from epoch_migration WHERE table_name = 'da_proposal'", + ) + .fetch_one(tx.as_mut()) + .await?; - if is_completed { - tracing::info!("da proposals migration already done"); + if is_completed { + tracing::info!("da proposals migration already done"); - return Ok(()); - } + return Ok(()); + } + loop { + let mut tx = self.db.read().await?; let rows = query( "SELECT payload_hash, data FROM da_proposal ORDER BY view LIMIT $1 OFFSET $2", ) @@ -1454,22 +1453,21 @@ impl SequencerPersistence for Persistence { async fn migrate_vid_shares(&self) -> anyhow::Result<()> { let batch_size: i64 = 1000; let mut offset: i64 = 0; + let mut tx = self.db.read().await?; - loop { - let mut tx = self.db.read().await?; - - let (is_completed,) = query_as::<(bool,)>( - "SELECT completed from epoch_migration WHERE table_name = 'vid_share'", - ) - .fetch_one(tx.as_mut()) - .await?; - - if is_completed { - tracing::info!("vid_share migration already done"); + let (is_completed,) = query_as::<(bool,)>( + "SELECT completed from epoch_migration WHERE table_name = 'vid_share'", + ) + .fetch_one(tx.as_mut()) + .await?; - return Ok(()); - } + if is_completed { + tracing::info!("vid_share migration already done"); + return Ok(()); + } + loop { + let mut tx = self.db.read().await?; let rows = query("SELECT payload_hash, data FROM vid_share ORDER BY view LIMIT $1 OFFSET $2") .bind(batch_size) @@ -1581,22 +1579,21 @@ impl SequencerPersistence for Persistence { async fn migrate_quorum_proposals(&self) -> anyhow::Result<()> { let batch_size: i64 = 1000; let mut offset: i64 = 0; + let mut tx = self.db.read().await?; - loop { - let mut tx = self.db.read().await?; - - let (is_completed,) = query_as::<(bool,)>( - "SELECT completed from epoch_migration WHERE table_name = 'quorum_proposals'", - ) - .fetch_one(tx.as_mut()) - .await?; - - if is_completed { - tracing::info!("quorum proposals migration already done"); + let (is_completed,) = query_as::<(bool,)>( + "SELECT completed from epoch_migration WHERE table_name = 'quorum_proposals'", + ) + .fetch_one(tx.as_mut()) + .await?; - return Ok(()); - } + if is_completed { + tracing::info!("quorum proposals migration already done"); + return Ok(()); + } + loop { + let mut tx = self.db.read().await?; let rows = query("SELECT view, leaf_hash, data FROM quorum_proposals ORDER BY view LIMIT $1 OFFSET $2") .bind(batch_size) @@ -1662,22 +1659,21 @@ impl SequencerPersistence for Persistence { async fn migrate_quorum_certificates(&self) -> anyhow::Result<()> { let batch_size: i64 = 1000; let mut offset: i64 = 0; + let mut tx = self.db.read().await?; - loop { - let mut tx = self.db.read().await?; - - let (is_completed,) = query_as::<(bool,)>( - "SELECT completed from epoch_migration WHERE table_name = 'quorum_certificate'", - ) - .fetch_one(tx.as_mut()) - .await?; - - if is_completed { - tracing::info!(" quorum certificates migration already done"); + let (is_completed,) = query_as::<(bool,)>( + "SELECT completed from epoch_migration WHERE table_name = 'quorum_certificate'", + ) + .fetch_one(tx.as_mut()) + .await?; - return Ok(()); - } + if is_completed { + tracing::info!(" quorum certificates migration already done"); + return Ok(()); + } + loop { + let mut tx = self.db.read().await?; let rows = query("SELECT view, leaf_hash, data FROM quorum_certificate ORDER BY view LIMIT $1 OFFSET $2") .bind(batch_size) @@ -1855,7 +1851,7 @@ impl Provider for Persistence { Ok(Some((bytes,))) => bytes, Ok(None) => return None, Err(err) => { - tracing::warn!("error loading VID share: {err:#}"); + tracing::error!("error loading VID share: {err:#}"); return None; } }; @@ -1864,7 +1860,7 @@ impl Provider for Persistence { match bincode::deserialize(&bytes) { Ok(share) => share, Err(err) => { - tracing::warn!("error decoding VID share: {err:#}"); + tracing::error!("error decoding VID share: {err:#}"); return None; } }; @@ -1904,7 +1900,7 @@ impl Provider for Persistence { { Ok(proposal) => proposal, Err(err) => { - tracing::warn!("error decoding DA proposal: {err:#}"); + tracing::error!("error decoding DA proposal: {err:#}"); return None; } };