Commit
add ut
xushiyan committed Jan 7, 2025
1 parent 3e45524 commit 6ca1ca6
Showing 1 changed file with 112 additions and 11 deletions.
crates/core/src/file_group/reader.rs: 112 additions & 11 deletions
@@ -84,6 +84,20 @@ impl FileGroupReader {
})
}

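/// Builds a row mask for `records` by AND-ing the comparison result of every configured
/// filter against its matching column; rows start as `true` and are narrowed per filter.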
fn create_boolean_array_mask(&self, records: &RecordBatch) -> Result<BooleanArray> {
let mut mask = BooleanArray::from(vec![true; records.num_rows()]);
for filter in &self.and_filters {
let col_name = filter.field.name().as_str();
let col_values = records
.column_by_name(col_name)
.ok_or_else(|| ReadFileSliceError(format!("Column {col_name} not found")))?;

let comparison = filter.apply_comparsion(col_values)?;
mask = and(&mask, &comparison)?;
}
Ok(mask)
}

pub async fn read_file_slice_by_base_file_path(
&self,
relative_path: &str,
@@ -98,17 +112,7 @@ impl FileGroupReader {
return Ok(records);
}

let mut mask = BooleanArray::from(vec![true; records.num_rows()]);
for filter in &self.and_filters {
let col_name = filter.field.name().as_str();
let col_values = records
.column_by_name(col_name)
.ok_or_else(|| ReadFileSliceError(format!("Column {col_name} not found")))?;

let comparison = filter.apply_comparsion(col_values)?;
mask = and(&mask, &comparison)?;
}

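// Combine all AND filters into a single boolean mask, then keep only the rows that match.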
let mask = self.create_boolean_array_mask(&records)?;
filter_record_batch(&records, &mask)
.map_err(|e| ReadFileSliceError(format!("Failed to filter records: {e:?}")))

}
@@ -122,6 +126,12 @@ impl FileGroupReader {
#[cfg(test)]
mod tests {
use super::*;
use crate::error::CoreError;
use crate::expr::filter::FilterField;
use arrow::array::{ArrayRef, Int64Array, StringArray};
use arrow::record_batch::RecordBatch;
use arrow_schema::{DataType, Field, Schema};
use std::sync::Arc;
use url::Url;

#[test]
@@ -132,6 +142,41 @@ mod tests {
assert!(Arc::ptr_eq(&fg_reader.storage, &storage));
}

fn create_test_schema() -> Schema {
Schema::new(vec![
Field::new("_hoodie_commit_time", DataType::Utf8, false),
Field::new("name", DataType::Utf8, false),
Field::new("age", DataType::Int64, false),
])
}

#[tokio::test]
async fn test_new_with_filters() -> Result<()> {
let base_url = Url::parse("file:///tmp/hudi_data").unwrap();
let storage = Storage::new_with_base_url(base_url)?;
let schema = create_test_schema();

// Test case 1: Empty filters
let reader = FileGroupReader::new_with_filters(storage.clone(), &[], &schema)?;
assert!(reader.and_filters.is_empty());

// Test case 2: Multiple filters
let filters = vec![
FilterField::new("_hoodie_commit_time").gt("0"),
FilterField::new("age").gte("18"),
];
let reader = FileGroupReader::new_with_filters(storage.clone(), &filters, &schema)?;
assert_eq!(reader.and_filters.len(), 2);

// Test case 3: Invalid field name should error
let invalid_filters = vec![FilterField::new("non_existent_field").eq("value")];
assert!(
FileGroupReader::new_with_filters(storage.clone(), &invalid_filters, &schema).is_err()
);

Ok(())
}

#[test]
fn test_new_with_options() -> Result<()> {
let options = vec![("key1", "value1"), ("key2", "value2")];
@@ -155,4 +200,60 @@ mod tests {
.await;
assert!(matches!(result.unwrap_err(), ReadFileSliceError(_)));
}

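// Five-row fixture matching create_test_schema: commit times "1"-"5" and ages 25-45.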
fn create_test_record_batch() -> Result<RecordBatch> {
let schema = Arc::new(create_test_schema());

let commit_times: ArrayRef = Arc::new(StringArray::from(vec!["1", "2", "3", "4", "5"]));
let names: ArrayRef = Arc::new(StringArray::from(vec![
"Alice", "Bob", "Charlie", "David", "Eve",
]));
let ages: ArrayRef = Arc::new(Int64Array::from(vec![25, 30, 35, 40, 45]));

RecordBatch::try_new(schema, vec![commit_times, names, ages]).map_err(CoreError::ArrowError)
}

#[test]
fn test_create_boolean_array_mask() -> Result<()> {
let storage =
Storage::new_with_base_url(Url::parse("file:///non-existent-path/table").unwrap())?;
let schema = create_test_schema();
let records = create_test_record_batch()?;

// Test case 1: No filters
let reader = FileGroupReader::new_with_filters(storage.clone(), &[], &schema)?;
let mask = reader.create_boolean_array_mask(&records)?;
assert_eq!(mask, BooleanArray::from(vec![true; 5]));

// Test case 2: Single filter on commit time
let filters = vec![FilterField::new("_hoodie_commit_time").gt("2")];
let reader = FileGroupReader::new_with_filters(storage.clone(), &filters, &schema)?;
let mask = reader.create_boolean_array_mask(&records)?;
assert_eq!(
mask,
BooleanArray::from(vec![false, false, true, true, true]),
"Expected only records with commit_time > '2'"
);

// Test case 3: Multiple AND filters
let filters = vec![
FilterField::new("_hoodie_commit_time").gt("2"),
FilterField::new("age").lt("40"),
];
let reader = FileGroupReader::new_with_filters(storage.clone(), &filters, &schema)?;
let mask = reader.create_boolean_array_mask(&records)?;
assert_eq!(
mask,
BooleanArray::from(vec![false, false, true, false, false]),
"Expected only record with commit_time > '2' AND age < 40"
);

// Test case 4: Filter resulting in all false
let filters = vec![FilterField::new("age").gt("100")];
let reader = FileGroupReader::new_with_filters(storage.clone(), &filters, &schema)?;
let mask = reader.create_boolean_array_mask(&records)?;
assert_eq!(mask, BooleanArray::from(vec![false; 5]));

Ok(())
}
}
