Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Change dataset table to only hold Crucible datasets #7386

Merged
merged 12 commits into from
Jan 27, 2025
96 changes: 42 additions & 54 deletions dev-tools/omdb/src/bin/omdb/db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ use ipnetwork::IpNetwork;
use itertools::Itertools;
use nexus_config::PostgresConfigWithUrl;
use nexus_db_model::to_db_typed_uuid;
use nexus_db_model::Dataset;
use nexus_db_model::CrucibleDataset;
use nexus_db_model::Disk;
use nexus_db_model::DnsGroup;
use nexus_db_model::DnsName;
Expand Down Expand Up @@ -1689,16 +1689,16 @@ async fn cmd_db_disk_physical(
// zpool has the sled id, record that so we can find the serial number.
sled_ids.insert(zp.sled_id);

// Next, we find all the datasets that are on our zpool.
use db::schema::dataset::dsl as dataset_dsl;
let mut query = dataset_dsl::dataset.into_boxed();
// Next, we find all the Crucible datasets that are on our zpool.
use db::schema::crucible_dataset::dsl as dataset_dsl;
let mut query = dataset_dsl::crucible_dataset.into_boxed();
if !fetch_opts.include_deleted {
query = query.filter(dataset_dsl::time_deleted.is_null());
}

let datasets = query
.filter(dataset_dsl::pool_id.eq(zp.id()))
.select(Dataset::as_select())
.select(CrucibleDataset::as_select())
.load_async(&*conn)
.await
.context("loading dataset")?;
Expand All @@ -1724,7 +1724,7 @@ async fn cmd_db_disk_physical(
my_sled.serial_number()
);
}
println!("DATASETS: {:?}", dataset_ids);
println!("CRUCIBLE DATASETS: {:?}", dataset_ids);

let mut volume_ids = HashSet::new();
// Now, take the list of datasets we found and search all the regions
Expand Down Expand Up @@ -4592,22 +4592,22 @@ async fn cmd_db_validate_regions(
// the destroyed state) before hard-deleting the records in the database.

// First, get all region records (with their corresponding dataset)
let datasets_and_regions: Vec<(Dataset, Region)> = datastore
let datasets_and_regions: Vec<(CrucibleDataset, Region)> = datastore
.pool_connection_for_tests()
.await?
.transaction_async(|conn| async move {
// Selecting all datasets and regions requires a full table scan
conn.batch_execute_async(ALLOW_FULL_TABLE_SCAN_SQL).await?;

use db::schema::dataset::dsl as dataset_dsl;
use db::schema::crucible_dataset::dsl as dataset_dsl;
use db::schema::region::dsl;

dsl::region
.inner_join(
dataset_dsl::dataset
dataset_dsl::crucible_dataset
.on(dsl::dataset_id.eq(dataset_dsl::id)),
)
.select((Dataset::as_select(), Region::as_select()))
.select((CrucibleDataset::as_select(), Region::as_select()))
.get_results_async(&conn)
.await
})
Expand All @@ -4628,8 +4628,9 @@ async fn cmd_db_validate_regions(
for (dataset, region) in &datasets_and_regions {
// If the dataset was expunged, do not attempt to contact the Crucible
// agent!
let in_service =
datastore.dataset_physical_disk_in_service(dataset.id()).await?;
let in_service = datastore
.crucible_dataset_physical_disk_in_service(dataset.id())
.await?;

if !in_service {
eprintln!(
Expand All @@ -4644,11 +4645,7 @@ async fn cmd_db_validate_regions(
use crucible_agent_client::types::State;
use crucible_agent_client::Client as CrucibleAgentClient;

let Some(dataset_addr) = dataset.address() else {
eprintln!("Dataset {} missing an IP address", dataset.id());
continue;
};

let dataset_addr = dataset.address();
let url = format!("http://{}", dataset_addr);
let client = CrucibleAgentClient::new(&url);

Expand Down Expand Up @@ -4730,18 +4727,17 @@ async fn cmd_db_validate_regions(
datasets_and_regions.iter().map(|(_, r)| r.id()).collect();

// Find all the Crucible datasets
let datasets: Vec<Dataset> = datastore
let datasets: Vec<CrucibleDataset> = datastore
.pool_connection_for_tests()
.await?
.transaction_async(|conn| async move {
// Selecting all datasets and regions requires a full table scan
conn.batch_execute_async(ALLOW_FULL_TABLE_SCAN_SQL).await?;

use db::schema::dataset::dsl;
use db::schema::crucible_dataset::dsl;

dsl::dataset
.filter(dsl::kind.eq(nexus_db_model::DatasetKind::Crucible))
.select(Dataset::as_select())
dsl::crucible_dataset
.select(CrucibleDataset::as_select())
.get_results_async(&conn)
.await
})
Expand All @@ -4750,8 +4746,9 @@ async fn cmd_db_validate_regions(
for dataset in &datasets {
// If the dataset was expunged, do not attempt to contact the Crucible
// agent!
let in_service =
datastore.dataset_physical_disk_in_service(dataset.id()).await?;
let in_service = datastore
.crucible_dataset_physical_disk_in_service(dataset.id())
.await?;

if !in_service {
eprintln!(
Expand All @@ -4765,11 +4762,7 @@ async fn cmd_db_validate_regions(
use crucible_agent_client::types::State;
use crucible_agent_client::Client as CrucibleAgentClient;

let Some(dataset_addr) = dataset.address() else {
eprintln!("Dataset {} missing an IP address", dataset.id());
continue;
};

let dataset_addr = dataset.address();
let url = format!("http://{}", dataset_addr);
let client = CrucibleAgentClient::new(&url);

Expand Down Expand Up @@ -4856,25 +4849,26 @@ async fn cmd_db_validate_region_snapshots(
BTreeMap::default();

// First, get all region snapshot records (with their corresponding dataset)
let datasets_and_region_snapshots: Vec<(Dataset, RegionSnapshot)> = {
let datasets_region_snapshots: Vec<(Dataset, RegionSnapshot)> =
let datasets_and_region_snapshots: Vec<(CrucibleDataset, RegionSnapshot)> = {
let datasets_region_snapshots: Vec<(CrucibleDataset, RegionSnapshot)> =
datastore
.pool_connection_for_tests()
.await?
.transaction_async(|conn| async move {
// Selecting all datasets and region snapshots requires a full table scan
// Selecting all datasets and region snapshots requires a
// full table scan
conn.batch_execute_async(ALLOW_FULL_TABLE_SCAN_SQL).await?;

use db::schema::dataset::dsl as dataset_dsl;
use db::schema::crucible_dataset::dsl as dataset_dsl;
use db::schema::region_snapshot::dsl;

dsl::region_snapshot
.inner_join(
dataset_dsl::dataset
dataset_dsl::crucible_dataset
.on(dsl::dataset_id.eq(dataset_dsl::id)),
)
.select((
Dataset::as_select(),
CrucibleDataset::as_select(),
RegionSnapshot::as_select(),
))
.get_results_async(&conn)
Expand Down Expand Up @@ -4906,8 +4900,9 @@ async fn cmd_db_validate_region_snapshots(

// If the dataset was expunged, do not attempt to contact the Crucible
// agent!
let in_service =
datastore.dataset_physical_disk_in_service(dataset.id()).await?;
let in_service = datastore
.crucible_dataset_physical_disk_in_service(dataset.id())
.await?;

if !in_service {
continue;
Expand All @@ -4917,11 +4912,7 @@ async fn cmd_db_validate_region_snapshots(
use crucible_agent_client::types::State;
use crucible_agent_client::Client as CrucibleAgentClient;

let Some(dataset_addr) = dataset.address() else {
eprintln!("Dataset {} missing an IP address", dataset.id());
continue;
};

let dataset_addr = dataset.address();
let url = format!("http://{}", dataset_addr);
let client = CrucibleAgentClient::new(&url);

Expand Down Expand Up @@ -5055,23 +5046,23 @@ async fn cmd_db_validate_region_snapshots(
}

// Second, get all regions
let datasets_and_regions: Vec<(Dataset, Region)> = {
let datasets_and_regions: Vec<(Dataset, Region)> = datastore
let datasets_and_regions: Vec<(CrucibleDataset, Region)> = {
let datasets_and_regions: Vec<(CrucibleDataset, Region)> = datastore
.pool_connection_for_tests()
.await?
.transaction_async(|conn| async move {
// Selecting all datasets and regions requires a full table scan
conn.batch_execute_async(ALLOW_FULL_TABLE_SCAN_SQL).await?;

use db::schema::dataset::dsl as dataset_dsl;
use db::schema::crucible_dataset::dsl as dataset_dsl;
use db::schema::region::dsl;

dsl::region
.inner_join(
dataset_dsl::dataset
dataset_dsl::crucible_dataset
.on(dsl::dataset_id.eq(dataset_dsl::id)),
)
.select((Dataset::as_select(), Region::as_select()))
.select((CrucibleDataset::as_select(), Region::as_select()))
.get_results_async(&conn)
.await
})
Expand All @@ -5085,8 +5076,9 @@ async fn cmd_db_validate_region_snapshots(
for (dataset, region) in datasets_and_regions {
// If the dataset was expunged, do not attempt to contact the Crucible
// agent!
let in_service =
datastore.dataset_physical_disk_in_service(dataset.id()).await?;
let in_service = datastore
.crucible_dataset_physical_disk_in_service(dataset.id())
.await?;

if !in_service {
continue;
Expand All @@ -5096,11 +5088,7 @@ async fn cmd_db_validate_region_snapshots(
use crucible_agent_client::types::State;
use crucible_agent_client::Client as CrucibleAgentClient;

let Some(dataset_addr) = dataset.address() else {
eprintln!("Dataset {} missing an IP address", dataset.id());
continue;
};

let dataset_addr = dataset.address();
let url = format!("http://{}", dataset_addr);
let client = CrucibleAgentClient::new(&url);

Expand Down
78 changes: 78 additions & 0 deletions nexus/db-model/src/crucible_dataset.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at https://mozilla.org/MPL/2.0/.

use super::{Generation, Region, SqlU16};
use crate::collection::DatastoreCollectionConfig;
use crate::ipv6;
use crate::schema::{crucible_dataset, region};
use chrono::{DateTime, Utc};
use db_macros::Asset;
use serde::{Deserialize, Serialize};
use std::net::{Ipv6Addr, SocketAddrV6};
use uuid::Uuid;

/// Database representation of a Crucible dataset's live information.
///
/// This includes the socket address of the Crucible agent that owns this
/// dataset and the amount of space used.
#[derive(
Queryable,
Insertable,
Debug,
Clone,
Selectable,
Asset,
Deserialize,
Serialize,
PartialEq,
)]
#[diesel(table_name = crucible_dataset)]
#[asset(uuid_kind = DatasetKind)]
pub struct CrucibleDataset {
#[diesel(embed)]
identity: CrucibleDatasetIdentity,
time_deleted: Option<DateTime<Utc>>,
rcgen: Generation,

pub pool_id: Uuid,

ip: ipv6::Ipv6Addr,
port: SqlU16,

pub size_used: i64,
}

impl CrucibleDataset {
pub fn new(
id: omicron_uuid_kinds::DatasetUuid,
pool_id: Uuid,
addr: SocketAddrV6,
) -> Self {
Self {
identity: CrucibleDatasetIdentity::new(id),
time_deleted: None,
rcgen: Generation::new(),
pool_id,
ip: addr.ip().into(),
port: addr.port().into(),
size_used: 0,
}
}

pub fn address(&self) -> SocketAddrV6 {
self.address_with_port(self.port.into())
}

pub fn address_with_port(&self, port: u16) -> SocketAddrV6 {
SocketAddrV6::new(Ipv6Addr::from(self.ip), port, 0, 0)
}
}

// Datasets contain regions
impl DatastoreCollectionConfig<Region> for CrucibleDataset {
type CollectionId = Uuid;
type GenerationNumberColumn = crucible_dataset::dsl::rcgen;
type CollectionTimeDeletedColumn = crucible_dataset::dsl::time_deleted;
type CollectionIdColumn = region::dsl::dataset_id;
}
Loading
Loading