diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/PartitionStatsIndexSupport.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/PartitionStatsIndexSupport.scala
index 17e55ef9c9b1..972e8dbd1ccb 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/PartitionStatsIndexSupport.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/PartitionStatsIndexSupport.scala
@@ -27,15 +27,16 @@ import org.apache.hudi.common.model.{FileSlice, HoodieRecord}
 import org.apache.hudi.common.table.HoodieTableMetaClient
 import org.apache.hudi.common.util.ValidationUtils.checkState
 import org.apache.hudi.common.util.hash.ColumnIndexID
+import org.apache.hudi.metadata.HoodieTableMetadataUtil.PARTITION_NAME_COLUMN_STATS
 import org.apache.hudi.metadata.{HoodieMetadataPayload, HoodieTableMetadataUtil}
 import org.apache.hudi.util.JFunction
-
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.catalyst.expressions.{And, Expression}
 import org.apache.spark.sql.hudi.DataSkippingUtils.translateIntoColumnStatsIndexFilterExpr
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.sql.{Column, SparkSession}
 
+import java.util.concurrent.atomic.AtomicBoolean
 import scala.collection.JavaConverters._
 
 class PartitionStatsIndexSupport(spark: SparkSession,
@@ -96,7 +97,9 @@ class PartitionStatsIndexSupport(spark: SparkSession,
       // column in a filter does not have the stats available, by making sure such a
       // filter does not prune any partition.
       val indexSchema = transposedPartitionStatsDF.schema
-      val indexFilter = queryFilters.map(translateIntoColumnStatsIndexFilterExpr(_, indexSchema)).reduce(And)
+      val indexedCols: Seq[String] = metaClient.getIndexMetadata.get().getIndexDefinitions.get(PARTITION_NAME_COLUMN_STATS).getSourceFields.asScala.toSeq
+      // TODO(HUDI-8836): to be fixed.
+      val indexFilter = queryFilters.map(translateIntoColumnStatsIndexFilterExpr(_, indexedCols = indexedCols)).reduce(And)
       Some(transposedPartitionStatsDF.where(new Column(indexFilter))
         .select(HoodieMetadataPayload.COLUMN_STATS_FIELD_FILE_NAME)
         .collect()
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SparkBaseIndexSupport.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SparkBaseIndexSupport.scala
index c698ad37ee82..06c012ac6089 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SparkBaseIndexSupport.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SparkBaseIndexSupport.scala
@@ -25,12 +25,15 @@ import org.apache.hudi.common.model.FileSlice
 import org.apache.hudi.common.table.HoodieTableMetaClient
 import org.apache.hudi.keygen.KeyGenUtils
 import org.apache.hudi.keygen.KeyGenUtils.DEFAULT_RECORD_KEY_PARTS_SEPARATOR
+import org.apache.hudi.metadata.HoodieTableMetadataUtil.PARTITION_NAME_COLUMN_STATS
 import org.apache.hudi.metadata.{HoodieMetadataPayload, HoodieTableMetadata}
 import org.apache.spark.api.java.JavaSparkContext
+import org.apache.spark.sql.catalyst.expressions.Literal.TrueLiteral
 import org.apache.spark.sql.catalyst.expressions.{And, AttributeReference, Expression, In, Literal}
-import org.apache.spark.sql.hudi.DataSkippingUtils.translateIntoColumnStatsIndexFilterExpr
+import org.apache.spark.sql.hudi.DataSkippingUtils.{LITERAL_TRUE_EXPR, translateIntoColumnStatsIndexFilterExpr}
 import org.apache.spark.sql.{Column, DataFrame, SparkSession}
 
+import java.util.concurrent.atomic.AtomicBoolean
 import scala.collection.JavaConverters._
 import scala.util.control.Breaks.{break, breakable}
 import scala.util.control.NonFatal
@@ -99,31 +102,37 @@ abstract class SparkBaseIndexSupport(spark: SparkSession,
     (prunedPartitions, prunedFiles)
   }
 
-  protected def getCandidateFiles(indexDf: DataFrame, queryFilters: Seq[Expression], prunedFileNames: Set[String], isExpressionIndex: Boolean = false): Set[String] = {
-    val indexSchema = indexDf.schema
-    val indexFilter = queryFilters.map(translateIntoColumnStatsIndexFilterExpr(_, indexSchema, isExpressionIndex)).reduce(And)
-    val prunedCandidateFileNames =
-      indexDf.where(new Column(indexFilter))
-        .select(HoodieMetadataPayload.COLUMN_STATS_FIELD_FILE_NAME)
+  protected def getCandidateFiles(indexDf: DataFrame, queryFilters: Seq[Expression], fileNamesFromPrunedPartitions: Set[String], isExpressionIndex: Boolean = false): Set[String] = {
+    val indexedCols: Seq[String] = metaClient.getIndexMetadata.get().getIndexDefinitions.get(PARTITION_NAME_COLUMN_STATS).getSourceFields.asScala.toSeq
+    val indexFilter = queryFilters.map(translateIntoColumnStatsIndexFilterExpr(_, isExpressionIndex, indexedCols)).reduce(And)
+    if (indexFilter.equals(TrueLiteral)) {
+      // If there are any non-indexed cols, or we can't translate the source expr, we have to read all files and may not benefit from the col stats lookup.
+      fileNamesFromPrunedPartitions
+    } else {
+      // Only look up the col stats index in the MDT if all filters are eligible for such a lookup.
+      val prunedCandidateFileNames =
+        indexDf.where(new Column(indexFilter))
+          .select(HoodieMetadataPayload.COLUMN_STATS_FIELD_FILE_NAME)
+          .collect()
+          .map(_.getString(0))
+          .toSet
+
+      // NOTE: Col-Stats Index isn't guaranteed to have complete set of statistics for every
+      //       base-file or log file: since it's bound to clustering, which could occur asynchronously
+      //       at arbitrary point in time, and is not likely to be touching all of the base files.
+      //
+      //       To close that gap, we manually compute the difference b/w all indexed (by col-stats-index)
+      //       files and all outstanding base-files or log files, and make sure that all base files and
+      //       log files not represented w/in the index are included in the output of this method
+      val allIndexedFileNames =
+        indexDf.select(HoodieMetadataPayload.COLUMN_STATS_FIELD_FILE_NAME)
           .collect()
           .map(_.getString(0))
           .toSet
+      val notIndexedFileNames = fileNamesFromPrunedPartitions -- allIndexedFileNames
 
-    // NOTE: Col-Stats Index isn't guaranteed to have complete set of statistics for every
-    //       base-file or log file: since it's bound to clustering, which could occur asynchronously
-    //       at arbitrary point in time, and is not likely to be touching all of the base files.
-    //
-    //       To close that gap, we manually compute the difference b/w all indexed (by col-stats-index)
-    //       files and all outstanding base-files or log files, and make sure that all base files and
-    //       log file not represented w/in the index are included in the output of this method
-    val allIndexedFileNames =
-      indexDf.select(HoodieMetadataPayload.COLUMN_STATS_FIELD_FILE_NAME)
-        .collect()
-        .map(_.getString(0))
-        .toSet
-    val notIndexedFileNames = prunedFileNames -- allIndexedFileNames
-
-    prunedCandidateFileNames ++ notIndexedFileNames
+      prunedCandidateFileNames ++ notIndexedFileNames
+    }
   }
 
   /**
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/DataSkippingUtils.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/DataSkippingUtils.scala
index 6e74c2f1e354..cfd722ceb1e6 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/DataSkippingUtils.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/DataSkippingUtils.scala
@@ -20,6 +20,7 @@ package org.apache.spark.sql.hudi
 import org.apache.hudi.ColumnStatsIndexSupport.{getMaxColumnNameFor, getMinColumnNameFor, getNullCountColumnNameFor, getValueCountColumnNameFor}
 import org.apache.hudi.SparkAdapterSupport
 import org.apache.hudi.common.util.ValidationUtils.checkState
+import org.apache.hudi.common.util.VisibleForTesting
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.{AnalysisException, HoodieCatalystExpressionUtils}
 import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
@@ -30,8 +31,12 @@ import org.apache.spark.sql.hudi.ColumnStatsExpressionUtils._
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.unsafe.types.UTF8String
 
+import java.util.concurrent.atomic.AtomicBoolean
+
 object DataSkippingUtils extends Logging {
 
+  val LITERAL_TRUE_EXPR = TrueLiteral
+
   /**
    * Translates provided {@link filterExpr} into corresponding filter-expression for column-stats index table
    * to filter out candidate files that would hold records matching the original filter.
@@ -42,9 +47,10 @@
    * @param isExpressionIndex whether the index is an expression index
    * @return filter for column-stats index's table
    */
-  def translateIntoColumnStatsIndexFilterExpr(dataTableFilterExpr: Expression, indexSchema: StructType, isExpressionIndex: Boolean = false): Expression = {
+  def translateIntoColumnStatsIndexFilterExpr(dataTableFilterExpr: Expression, isExpressionIndex: Boolean = false, indexedCols: Seq[String] = Seq.empty,
+                                              hasNonIndexedCols: AtomicBoolean = new AtomicBoolean(false)): Expression = {
     try {
-      createColumnStatsIndexFilterExprInternal(dataTableFilterExpr, indexSchema, isExpressionIndex)
+      createColumnStatsIndexFilterExprInternal(dataTableFilterExpr, isExpressionIndex, indexedCols)
     } catch {
       case e: AnalysisException =>
         logDebug(s"Failed to translated provided data table filter expr into column stats one ($dataTableFilterExpr)", e)
@@ -52,18 +58,27 @@
     }
   }
 
-  private def createColumnStatsIndexFilterExprInternal(dataTableFilterExpr: Expression, indexSchema: StructType, isExpressionIndex: Boolean = false): Expression = {
+  private def createColumnStatsIndexFilterExprInternal(dataTableFilterExpr: Expression, isExpressionIndex: Boolean = false,
+                                                       indexedCols: Seq[String]): Expression = {
     // Try to transform original Source Table's filter expression into
     // Column-Stats Index filter expression
-    tryComposeIndexFilterExpr(dataTableFilterExpr, indexSchema, isExpressionIndex) match {
+    tryComposeIndexFilterExpr(dataTableFilterExpr, isExpressionIndex, indexedCols) match {
       case Some(e) => e
       // NOTE: In case we can't transform source filter expression, we fallback
       //       to {@code TrueLiteral}, to essentially avoid pruning any indexed files from scanning
-      case None => TrueLiteral
+      case None => LITERAL_TRUE_EXPR
     }
   }
 
-  private def tryComposeIndexFilterExpr(sourceFilterExpr: Expression, indexSchema: StructType, isExpressionIndex: Boolean = false): Option[Expression] = {
+  /**
+   * Composes the index filter expression to be looked up against the col stats index in the MDT.
+   * For example, a source filter "colA = 'abc'" gets transformed to "colA_minValue <= 'abc' and colA_maxValue >= 'abc'".
+   * @param sourceFilterExpr source filter expression of interest.
+   * @param isExpressionIndex true if this refers to an expression index.
+   * @param indexedCols list of columns indexed with the col stats index in the MDT.
+   * @return optionally transformed Expression. Returns None if the column of interest is not indexed or the expression is not translatable.
+   */
+  private def tryComposeIndexFilterExpr(sourceFilterExpr: Expression, isExpressionIndex: Boolean = false, indexedCols: Seq[String]): Option[Expression] = {
    //
    // For translation of the Filter Expression for the Data Table into Filter Expression for Column Stats Index, we're
    // assuming that
@@ -100,7 +115,7 @@
       // Filter "expr(colA) = B" and "B = expr(colA)"
       // Translates to "(expr(colA_minValue) <= B) AND (B <= expr(colA_maxValue))" condition for index lookup
       case EqualTo(sourceExpr @ AllowedTransformationExpression(attrRef), valueExpr: Expression) if isValueExpression(valueExpr) =>
-        getTargetIndexedColumnName(attrRef, indexSchema)
+        getTargetIndexedColumnName(attrRef, indexedCols)
           .map { colName =>
             // NOTE: Since we're supporting (almost) arbitrary expressions of the form `f(colA) = B`, we have to
             //       appropriately translate such original expression targeted at Data Table, to corresponding
@@ -109,137 +124,171 @@
             //       corresponding column in the Column Stats Index
             val targetExprBuilder: Expression => Expression = swapAttributeRefInExpr(sourceExpr, attrRef, _)
             genColumnValuesEqualToExpression(colName, valueExpr, targetExprBuilder)
-          }
+          }.orElse({
+            Option.empty
+          })
 
       case EqualTo(valueExpr: Expression, sourceExpr @ AllowedTransformationExpression(attrRef)) if isValueExpression(valueExpr) =>
-        getTargetIndexedColumnName(attrRef, indexSchema)
+        getTargetIndexedColumnName(attrRef, indexedCols)
           .map { colName =>
             val targetExprBuilder: Expression => Expression = swapAttributeRefInExpr(sourceExpr, attrRef, _)
             genColumnValuesEqualToExpression(colName, valueExpr, targetExprBuilder)
-          }
+          }.orElse({
+            Option.empty
+          })
 
       // Filter "expr(colA) != B" and "B != expr(colA)"
       // Translates to "NOT(expr(colA_minValue) = B AND expr(colA_maxValue) = B)"
      // NOTE: This is NOT an inversion of `colA = b`, instead this filter ONLY excludes files for which `colA = B`
      //       holds true
       case Not(EqualTo(sourceExpr @ AllowedTransformationExpression(attrRef), value: Expression)) if isValueExpression(value) =>
-        getTargetIndexedColumnName(attrRef, indexSchema)
+        getTargetIndexedColumnName(attrRef, indexedCols)
           .map { colName =>
             val targetExprBuilder: Expression => Expression = swapAttributeRefInExpr(sourceExpr, attrRef, _)
             Not(genColumnOnlyValuesEqualToExpression(colName, value, targetExprBuilder))
-          }
+          }.orElse({
+            Option.empty
+          })
 
       case Not(EqualTo(value: Expression, sourceExpr @ AllowedTransformationExpression(attrRef))) if isValueExpression(value) =>
-        getTargetIndexedColumnName(attrRef, indexSchema)
+        getTargetIndexedColumnName(attrRef, indexedCols)
           .map { colName =>
             val targetExprBuilder: Expression => Expression = swapAttributeRefInExpr(sourceExpr, attrRef, _)
             Not(genColumnOnlyValuesEqualToExpression(colName, value, targetExprBuilder))
-          }
+          }.orElse({
+            Option.empty
+          })
 
       // Filter "colA = null"
      // Translates to "colA_nullCount = null" for index lookup
       case EqualNullSafe(attrRef: AttributeReference, litNull @ Literal(null, _)) =>
-        getTargetIndexedColumnName(attrRef, indexSchema)
+        getTargetIndexedColumnName(attrRef, indexedCols)
           .map(colName => EqualTo(genColNumNullsExpr(colName), litNull))
+          .orElse({
+            Option.empty
+          })
 
       // Filter "expr(colA) < B" and "B > expr(colA)"
       // Translates to "expr(colA_minValue) < B" for index lookup
       case LessThan(sourceExpr @ AllowedTransformationExpression(attrRef), value: Expression) if isValueExpression(value) =>
-        getTargetIndexedColumnName(attrRef, indexSchema)
+        getTargetIndexedColumnName(attrRef, indexedCols)
           .map { colName =>
             val targetExprBuilder: Expression => Expression = swapAttributeRefInExpr(sourceExpr, attrRef, _)
             LessThan(targetExprBuilder.apply(genColMinValueExpr(colName)), value)
-          }
+          }.orElse({
+            Option.empty
+          })
 
       case GreaterThan(value: Expression, sourceExpr @ AllowedTransformationExpression(attrRef)) if isValueExpression(value) =>
-        getTargetIndexedColumnName(attrRef, indexSchema)
+        getTargetIndexedColumnName(attrRef, indexedCols)
           .map { colName =>
-            val targetExprBuilder: Expression => Expression = swapAttributeRefInExpr(sourceExpr, attrRef, _)
-            LessThan(targetExprBuilder.apply(genColMinValueExpr(colName)), value)
-          }
+            val targetExprBuilder: Expression => Expression = swapAttributeRefInExpr(sourceExpr, attrRef, _)
+            LessThan(targetExprBuilder.apply(genColMinValueExpr(colName)), value)
+          }.orElse({
+            Option.empty
+          })
 
       // Filter "B < expr(colA)" and "expr(colA) > B"
       // Translates to "B < colA_maxValue" for index lookup
       case LessThan(value: Expression, sourceExpr @ AllowedTransformationExpression(attrRef)) if isValueExpression(value) =>
-        getTargetIndexedColumnName(attrRef, indexSchema)
+        getTargetIndexedColumnName(attrRef, indexedCols)
           .map { colName =>
             val targetExprBuilder: Expression => Expression = swapAttributeRefInExpr(sourceExpr, attrRef, _)
             GreaterThan(targetExprBuilder.apply(genColMaxValueExpr(colName)), value)
-          }
+          }.orElse({
+            Option.empty
+          })
 
       case GreaterThan(sourceExpr @ AllowedTransformationExpression(attrRef), value: Expression) if isValueExpression(value) =>
-        getTargetIndexedColumnName(attrRef, indexSchema)
+        getTargetIndexedColumnName(attrRef, indexedCols)
           .map { colName =>
             val targetExprBuilder: Expression => Expression = swapAttributeRefInExpr(sourceExpr, attrRef, _)
             GreaterThan(targetExprBuilder.apply(genColMaxValueExpr(colName)), value)
-          }
+          }.orElse({
+            Option.empty
+          })
 
       // Filter "expr(colA) <= B" and "B >= expr(colA)"
       // Translates to "colA_minValue <= B" for index lookup
       case LessThanOrEqual(sourceExpr @ AllowedTransformationExpression(attrRef), value: Expression) if isValueExpression(value) =>
-        getTargetIndexedColumnName(attrRef, indexSchema)
+        getTargetIndexedColumnName(attrRef, indexedCols)
           .map { colName =>
             val targetExprBuilder: Expression => Expression = swapAttributeRefInExpr(sourceExpr, attrRef, _)
             LessThanOrEqual(targetExprBuilder.apply(genColMinValueExpr(colName)), value)
-          }
+          }.orElse({
+            Option.empty
+          })
 
       case GreaterThanOrEqual(value: Expression, sourceExpr @ AllowedTransformationExpression(attrRef)) if isValueExpression(value) =>
-        getTargetIndexedColumnName(attrRef, indexSchema)
+        getTargetIndexedColumnName(attrRef, indexedCols)
           .map { colName =>
             val targetExprBuilder: Expression => Expression = swapAttributeRefInExpr(sourceExpr, attrRef, _)
             LessThanOrEqual(targetExprBuilder.apply(genColMinValueExpr(colName)), value)
-          }
+          }.orElse({
+            Option.empty
+          })
 
       // Filter "B <= expr(colA)" and "expr(colA) >= B"
       // Translates to "B <= colA_maxValue" for index lookup
       case LessThanOrEqual(value: Expression, sourceExpr @ AllowedTransformationExpression(attrRef)) if isValueExpression(value) =>
-        getTargetIndexedColumnName(attrRef, indexSchema)
+        getTargetIndexedColumnName(attrRef, indexedCols)
           .map { colName =>
             val targetExprBuilder: Expression => Expression = swapAttributeRefInExpr(sourceExpr, attrRef, _)
             GreaterThanOrEqual(targetExprBuilder.apply(genColMaxValueExpr(colName)), value)
-          }
+          }.orElse({
+            Option.empty
+          })
 
       case GreaterThanOrEqual(sourceExpr @ AllowedTransformationExpression(attrRef), value: Expression) if isValueExpression(value) =>
-        getTargetIndexedColumnName(attrRef, indexSchema)
+        getTargetIndexedColumnName(attrRef, indexedCols)
           .map { colName =>
             val targetExprBuilder: Expression => Expression = swapAttributeRefInExpr(sourceExpr, attrRef, _)
             GreaterThanOrEqual(targetExprBuilder.apply(genColMaxValueExpr(colName)), value)
-          }
+          }.orElse({
+            Option.empty
+          })
 
       // Filter "colA is null"
       // Translates to "colA_nullCount > 0" for index lookup
       case IsNull(attribute: AttributeReference) =>
-        getTargetIndexedColumnName(attribute, indexSchema)
+        getTargetIndexedColumnName(attribute, indexedCols)
           .map(colName => GreaterThan(genColNumNullsExpr(colName), Literal(0)))
+          .orElse({
+            Option.empty
+          })
 
       // Filter "colA is not null"
       // Translates to "colA_nullCount = null or colA_valueCount = null or colA_nullCount < colA_valueCount" for index lookup
       // "colA_nullCount = null or colA_valueCount = null" means we are not certain whether the column is null or not,
       // hence we return True to ensure this does not affect the query.
       case IsNotNull(attribute: AttributeReference) =>
-        getTargetIndexedColumnName(attribute, indexSchema)
+        getTargetIndexedColumnName(attribute, indexedCols)
           .map {colName =>
             val numNullExpr = genColNumNullsExpr(colName)
             val valueCountExpr = genColValueCountExpr
             Or(Or(IsNull(numNullExpr), IsNull(valueCountExpr)), LessThan(numNullExpr, valueCountExpr))
-          }
+          }.orElse({
+            Option.empty
+          })
 
       // Filter "expr(colA) in (B1, B2, ...)"
       // Translates to "(colA_minValue <= B1 AND colA_maxValue >= B1) OR (colA_minValue <= B2 AND colA_maxValue >= B2) ... "
       // for index lookup
       // NOTE: This is equivalent to "colA = B1 OR colA = B2 OR ..."
       case In(sourceExpr @ AllowedTransformationExpression(attrRef), list: Seq[Expression]) if list.forall(isValueExpression) =>
-        getTargetIndexedColumnName(attrRef, indexSchema)
+        getTargetIndexedColumnName(attrRef, indexedCols)
           .map { colName =>
             val targetExprBuilder: Expression => Expression = swapAttributeRefInExpr(sourceExpr, attrRef, _)
             list.map(lit => genColumnValuesEqualToExpression(colName, lit, targetExprBuilder)).reduce(Or)
-          }
+          }.orElse({
+            Option.empty
+          })
 
       // Filter "expr(colA) in (B1, B2, ...)"
       // NOTE: [[InSet]] is an optimized version of the [[In]] expression, where every sub-expression w/in the
       //       set is a static literal
       case InSet(sourceExpr @ AllowedTransformationExpression(attrRef), hset: Set[Any]) =>
-        getTargetIndexedColumnName(attrRef, indexSchema)
+        getTargetIndexedColumnName(attrRef, indexedCols)
           .map { colName =>
             val targetExprBuilder: Expression => Expression = swapAttributeRefInExpr(sourceExpr, attrRef, _)
             hset.map { value =>
@@ -251,17 +300,21 @@ object DataSkippingUtils extends Logging {
               }
               genColumnValuesEqualToExpression(colName, lit, targetExprBuilder)
             }.reduce(Or)
-          }
+          }.orElse({
+            Option.empty
+          })
 
       // Filter "expr(colA) not in (B1, B2, ...)"
       // Translates to "NOT((colA_minValue = B1 AND colA_maxValue = B1) OR (colA_minValue = B2 AND colA_maxValue = B2))" for index lookup
       // NOTE: This is NOT an inversion of `in (B1, B2, ...)` expr, this is equivalent to "colA != B1 AND colA != B2 AND ..."
       case Not(In(sourceExpr @ AllowedTransformationExpression(attrRef), list: Seq[Expression])) if list.forall(_.foldable) =>
-        getTargetIndexedColumnName(attrRef, indexSchema)
+        getTargetIndexedColumnName(attrRef, indexedCols)
           .map { colName =>
             val targetExprBuilder: Expression => Expression = swapAttributeRefInExpr(sourceExpr, attrRef, _)
             Not(list.map(lit => genColumnOnlyValuesEqualToExpression(colName, lit, targetExprBuilder)).reduce(Or))
-          }
+          }.orElse({
+            Option.empty
+          })
 
       // Filter "colA like 'xxx%'"
       // Translates to "colA_minValue <= xxx AND xxx <= colA_maxValue" for index lookup
@@ -270,35 +323,53 @@
       // Since a) this operation is matching the prefix of the string, and b) strings are ordered
       // lexicographically, we essentially need to check that provided literal falls w/in min/max bounds of the
       // given column
       case StartsWith(sourceExpr @ AllowedTransformationExpression(attrRef), v @ Literal(_: UTF8String, _)) =>
-        getTargetIndexedColumnName(attrRef, indexSchema)
+        getTargetIndexedColumnName(attrRef, indexedCols)
           .map { colName =>
             val targetExprBuilder: Expression => Expression = swapAttributeRefInExpr(sourceExpr, attrRef, _)
             genColumnValuesEqualToExpression(colName, v, targetExprBuilder)
-          }
+          }.orElse({
+            Option.empty
+          })
 
       // Filter "expr(colA) not like 'xxx%'"
       // Translates to "NOT(expr(colA_minValue) like 'xxx%' AND expr(colA_maxValue) like 'xxx%')" for index lookup
       // NOTE: This is NOT an inversion of "colA like xxx"
       case Not(StartsWith(sourceExpr @ AllowedTransformationExpression(attrRef), value @ Literal(_: UTF8String, _))) =>
-        getTargetIndexedColumnName(attrRef, indexSchema)
+        getTargetIndexedColumnName(attrRef, indexedCols)
           .map { colName =>
             val targetExprBuilder: Expression => Expression = swapAttributeRefInExpr(sourceExpr, attrRef, _)
             val minValueExpr = targetExprBuilder.apply(genColMinValueExpr(colName))
             val maxValueExpr = targetExprBuilder.apply(genColMaxValueExpr(colName))
             Not(And(StartsWith(minValueExpr, value), StartsWith(maxValueExpr, value)))
-          }
+          }.orElse({
+            Option.empty
+          })
 
       case or: Or =>
-        val resLeft = createColumnStatsIndexFilterExprInternal(or.left, indexSchema)
-        val resRight = createColumnStatsIndexFilterExprInternal(or.right, indexSchema)
-
-        Option(Or(resLeft, resRight))
+        val resLeft = createColumnStatsIndexFilterExprInternal(or.left, isExpressionIndex = isExpressionIndex, indexedCols = indexedCols)
+        val resRight = createColumnStatsIndexFilterExprInternal(or.right, isExpressionIndex = isExpressionIndex, indexedCols = indexedCols)
+        if (resLeft.equals(LITERAL_TRUE_EXPR) || resRight.equals(LITERAL_TRUE_EXPR)) {
+          None
+        } else {
+          Option(Or(resLeft, resRight))
+        }
 
       case and: And =>
-        val resLeft = createColumnStatsIndexFilterExprInternal(and.left, indexSchema)
-        val resRight = createColumnStatsIndexFilterExprInternal(and.right, indexSchema)
-
-        Option(And(resLeft, resRight))
+        val resLeft = createColumnStatsIndexFilterExprInternal(and.left, isExpressionIndex = isExpressionIndex, indexedCols = indexedCols)
+        val resRight = createColumnStatsIndexFilterExprInternal(and.right, isExpressionIndex = isExpressionIndex, indexedCols = indexedCols)
+        // Only when both the left and the right expressions reference non-indexed cols do we give up pruning entirely.
+        // If not, we can still afford to prune files based on the col stats lookup of the translatable side.
+        val isLeftLiteralTrue = resLeft.equals(LITERAL_TRUE_EXPR)
+        val isRightLiteralTrue = resRight.equals(LITERAL_TRUE_EXPR)
+        if (isLeftLiteralTrue && isRightLiteralTrue) {
+          None
+        } else if (isLeftLiteralTrue) {
+          Option(resRight) // Ignore the non-indexed left expression
+        } else if (isRightLiteralTrue) {
+          Option(resLeft) // Ignore the non-indexed right expression
+        } else {
+          Option(And(resLeft, resRight))
+        }
 
       //
       // Pushing Logical NOT inside the AND/OR expressions
      //
 
@@ -307,29 +378,22 @@
       case Not(And(left: Expression, right: Expression)) =>
-        Option(createColumnStatsIndexFilterExprInternal(Or(Not(left), Not(right)), indexSchema))
+        Option(createColumnStatsIndexFilterExprInternal(Or(Not(left), Not(right)), isExpressionIndex = isExpressionIndex,
+          indexedCols = indexedCols))
 
       case Not(Or(left: Expression, right: Expression)) =>
-        Option(createColumnStatsIndexFilterExprInternal(And(Not(left), Not(right)), indexSchema))
+        Option(createColumnStatsIndexFilterExprInternal(And(Not(left), Not(right)), isExpressionIndex = isExpressionIndex,
+          indexedCols = indexedCols))
 
       case _: Expression => None
     }
   }
 
-  private def checkColIsIndexed(colName: String, indexSchema: StructType): Boolean = {
-    Set.apply(
-      getMinColumnNameFor(colName),
-      getMaxColumnNameFor(colName),
-      getNullCountColumnNameFor(colName)
-    )
-      .forall(stat => indexSchema.exists(_.name == stat))
-  }
-
-  private def getTargetIndexedColumnName(resolvedExpr: AttributeReference, indexSchema: StructType): Option[String] = {
+  private def getTargetIndexedColumnName(resolvedExpr: AttributeReference, indexedCols: Seq[String]): Option[String] = {
     val colName = UnresolvedAttribute(getTargetColNameParts(resolvedExpr)).name
     // Verify that the column is indexed
-    if (checkColIsIndexed(colName, indexSchema)) {
+    if (indexedCols.contains(colName)) {
       Option.apply(colName)
     } else {
       None
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestDataSkippingUtils.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestDataSkippingUtils.scala
index 94b29e290847..9494850c5e57 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestDataSkippingUtils.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestDataSkippingUtils.scala
@@ -125,7 +125,7 @@ class TestDataSkippingUtils extends HoodieSparkClientTestBase with SparkAdapterS
   @MethodSource(Array("testStringsLookupFilterExpressionsSource"))
   def testStringsLookupFilterExpressions(sourceExpr: Expression, input: Seq[IndexRow], output: Seq[String]): Unit = {
     val resolvedExpr = resolveExpr(spark, sourceExpr, sourceTableSchema)
-    val lookupFilter = DataSkippingUtils.translateIntoColumnStatsIndexFilterExpr(resolvedExpr, indexSchema)
+    val lookupFilter = DataSkippingUtils.translateIntoColumnStatsIndexFilterExpr(resolvedExpr, indexedCols = indexedCols)
 
     val sparkB = spark
     import sparkB.implicits._
@@ -155,7 +155,7 @@ class TestDataSkippingUtils extends HoodieSparkClientTestBase with SparkAdapterS
   }
 
   private def applyFilterExpr(resolvedExpr: Expression, input: Seq[IndexRow]): Seq[String] = {
-    val lookupFilter = DataSkippingUtils.translateIntoColumnStatsIndexFilterExpr(resolvedExpr, indexSchema)
+    val lookupFilter = DataSkippingUtils.translateIntoColumnStatsIndexFilterExpr(resolvedExpr, indexedCols = indexedCols)
 
     val indexDf = spark.createDataFrame(input.map(_.toRow).asJava, indexSchema)
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestColumnStatsIndex.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestColumnStatsIndex.scala
index 9ceadf5b420f..144fc67fcdc3 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestColumnStatsIndex.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestColumnStatsIndex.scala
@@ -46,6 +46,7 @@ import org.apache.hudi.storage.hadoop.HadoopStorageConfiguration
 import org.apache.hudi.{ColumnStatsIndexSupport, DataSourceWriteOptions, config}
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
+import org.apache.spark.sql.catalyst.expressions.codegen.TrueLiteral
 import org.apache.spark.sql.catalyst.expressions.{And, AttributeReference, GreaterThan, Literal, Or}
 import org.apache.spark.sql.hudi.DataSkippingUtils.translateIntoColumnStatsIndexFilterExpr
 import org.apache.spark.sql.types._
@@ -995,7 +996,8 @@ class TestColumnStatsIndex extends ColumnStatIndexTestBase {
     )
 
     columnStatsIndex.loadTransposed(requestedColumns, shouldReadInMemory) { partialTransposedColStatsDF =>
-      val andConditionActualFilter = andConditionFilters.map(translateIntoColumnStatsIndexFilterExpr(_, partialTransposedColStatsDF.schema))
+      val andConditionActualFilter = andConditionFilters.map(translateIntoColumnStatsIndexFilterExpr(_,
+        indexedCols = targetColumnsToIndex))
         .reduce(And)
       assertEquals(expectedAndConditionIndexedFilter, andConditionActualFilter)
     }
@@ -1011,15 +1013,11 @@ class TestColumnStatsIndex extends ColumnStatIndexTestBase {
       GreaterThan(AttributeReference("c4", StringType, nullable = true)(), Literal("c4 filed value")))
     )
 
-    val expectedOrConditionIndexedFilter = Or(
-      GreaterThan(UnresolvedAttribute("c1_maxValue"), Literal(1)),
-      Literal(true)
-    )
-
     columnStatsIndex.loadTransposed(requestedColumns, shouldReadInMemory) { partialTransposedColStatsDF =>
-      val orConditionActualFilter = orConditionFilters.map(translateIntoColumnStatsIndexFilterExpr(_, partialTransposedColStatsDF.schema))
+      val orConditionActualFilter = orConditionFilters.map(translateIntoColumnStatsIndexFilterExpr(_,
+        indexedCols = targetColumnsToIndex))
         .reduce(And)
-      assertEquals(expectedOrConditionIndexedFilter, orConditionActualFilter)
+      assertEquals(Literal("true").toString(), orConditionActualFilter.toString())
     }
   }
 }
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestColumnStatsIndexWithSQL.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestColumnStatsIndexWithSQL.scala
index e560c8059fd8..67c4b4bb1b9e 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestColumnStatsIndexWithSQL.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestColumnStatsIndexWithSQL.scala
@@ -33,9 +33,12 @@ import org.apache.hudi.functional.ColumnStatIndexTestBase.{ColumnStatsTestCase,
 import org.apache.hudi.index.HoodieIndex.IndexType.INMEMORY
 import org.apache.hudi.metadata.HoodieMetadataFileSystemView
 import org.apache.hudi.util.JavaConversions
-
 import org.apache.spark.sql._
-import org.apache.spark.sql.catalyst.expressions.{And, AttributeReference, EqualTo, Expression, GreaterThan, Literal}
+import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
+import org.apache.spark.sql.catalyst.expressions.Literal.TrueLiteral
+import org.apache.spark.sql.catalyst.expressions.{And, AttributeReference, BitwiseOr, EqualTo, Expression, GreaterThan, GreaterThanOrEqual, LessThanOrEqual, Literal, Or}
+import org.apache.spark.sql.hudi.ColumnStatsExpressionUtils.{AllowedTransformationExpression, swapAttributeRefInExpr}
+import org.apache.spark.sql.hudi.{ColumnStatsExpressionUtils, DataSkippingUtils}
 import org.apache.spark.sql.types.StringType
 import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertTrue}
 import org.junit.jupiter.api.Test
@@ -43,7 +46,7 @@ import org.junit.jupiter.params.ParameterizedTest
 import org.junit.jupiter.params.provider.MethodSource
 
 import java.io.File
-
+import java.util.concurrent.atomic.AtomicBoolean
 import scala.collection.JavaConverters._
 
 class TestColumnStatsIndexWithSQL extends ColumnStatIndexTestBase {
@@ -94,6 +97,219 @@ class TestColumnStatsIndexWithSQL extends ColumnStatIndexTestBase {
     setupTable(testCase, metadataOpts, commonOpts, shouldValidate = true, useShortSchema = true,
       validationSortColumns = Seq("c1_maxValue", "c1_minValue", "c2_maxValue", "c2_minValue", "c3_maxValue", "c3_minValue"))
+
+    // predicate with c2. should prune based on col stats lookup
+    var dataFilter: Expression = EqualTo(attribute("c2"), literal("619sdc"))
+    verifyPruningFileCount(commonOpts, dataFilter)
+    // predicate w/ c5. should not look up in col stats since the column is not indexed.
+    var dataFilter1: Expression = GreaterThan(attribute("c5"), literal("70"))
+    verifyPruningFileCount(commonOpts, dataFilter1, false)
+
+    // a mix of two cols, where c2 is indexed and c5 is not indexed. but since it's 'AND', pruning should kick in.
+    var dataFilter2 = And(dataFilter1, EqualTo(attribute("c2"), literal("619sdc")))
+    verifyPruningFileCount(commonOpts, dataFilter2, true)
+    // adding an AND clause
+    dataFilter2 = And(dataFilter2, EqualTo(attribute("c5"), literal("100")))
+    verifyPruningFileCount(commonOpts, dataFilter2, true)
+    // adding an OR clause where the col is indexed. expected to prune
+    var dataFilter2_1 = Or(dataFilter2, EqualTo(attribute("c2"), literal("619sda")))
+    verifyPruningFileCount(commonOpts, dataFilter2_1, true)
+    // adding another OR clause, but this time the col is not indexed. So, no pruning expected.
+    dataFilter2_1 = Or(dataFilter2_1, EqualTo(attribute("c5"), literal("120")))
+    verifyPruningFileCount(commonOpts, dataFilter2_1, false)
+
+    // a mix of two cols, where c2 is indexed and c5 is not indexed. but since it's 'OR', pruning should be bypassed.
+    var dataFilter3 = Or(dataFilter1, EqualTo(attribute("c2"), literal("619sdc")))
+    verifyPruningFileCount(commonOpts, dataFilter3, false)
+    // adding an OR clause
+    dataFilter3 = Or(dataFilter3, EqualTo(attribute("c5"), literal("100")))
+    verifyPruningFileCount(commonOpts, dataFilter3, false)
+    // adding an AND clause where the col is indexed. Expected to prune.
+    var dataFilter3_1 = And(dataFilter3, EqualTo(attribute("c2"), literal("619sda")))
+    verifyPruningFileCount(commonOpts, dataFilter3_1, true)
+    // adding another AND clause where the col is not indexed. Still expected to prune since c2 = 619sda could still be pruned.
+    dataFilter3_1 = And(dataFilter3_1, EqualTo(attribute("c5"), literal("200")))
+    verifyPruningFileCount(commonOpts, dataFilter3_1, true)
+    // adding an OR clause where the col is indexed. expected to prune.
+    var dataFilter3_2 = Or(dataFilter3_1, EqualTo(attribute("c2"), literal("619sda")))
+    verifyPruningFileCount(commonOpts, dataFilter3_2, true)
+    // adding an OR clause where the col is not indexed. not expected to prune
+    dataFilter3_2 = Or(dataFilter3_2, EqualTo(attribute("c5"), literal("250")))
+    verifyPruningFileCount(commonOpts, dataFilter3_2, false)
+  }
+
+  @Test
+  def testTranslateIntoColumnStatsIndexFilterExpr(): Unit = {
+    var dataFilter: Expression = EqualTo(attribute("c1"), literal("619sdc"))
+    var expectedExpr: Expression = generateColStatsExprForGreaterthanOrEquals("c1", "619sdc")
+    translateIntoColStatsExprAndValidate(dataFilter, Seq("c1"), expectedExpr, false)
+
+    // if c1 is not indexed, we should get TrueLiteral
+    translateIntoColStatsExprAndValidate(dataFilter, Seq.empty, TrueLiteral, true)
+
+    // c1 = 619sdc and c2 = 100, where both c1 and c2 are indexed.
+    val dataFilter1 = And(dataFilter, EqualTo(attribute("c2"), literal("100")))
+    expectedExpr = generateColStatsExprForGreaterthanOrEquals("c1", "619sdc")
+    expectedExpr = And(expectedExpr, generateColStatsExprForGreaterthanOrEquals("c2", "100"))
+    translateIntoColStatsExprAndValidate(dataFilter1, Seq("c1", "c2"), expectedExpr, false)
+
+    // c1 = 619sdc and c2 = 100, where only c1 is indexed. we expect only c1 to be part of the translated expr
+    expectedExpr = generateColStatsExprForGreaterthanOrEquals("c1", "619sdc")
+    translateIntoColStatsExprAndValidate(dataFilter1, Seq("c1"), expectedExpr, false)
+
+    // c1 = 619sdc and c2 = 100 and c3 = 200, where all 3 (c1, c2 and c3) are indexed.
+    val dataFilter1_1 = And(dataFilter1, EqualTo(attribute("c3"), literal("200")))
+    expectedExpr = And(expectedExpr, generateColStatsExprForGreaterthanOrEquals("c2", "100"))
+    expectedExpr = And(expectedExpr, generateColStatsExprForGreaterthanOrEquals("c3", "200"))
+    translateIntoColStatsExprAndValidate(dataFilter1_1, Seq("c1", "c2", "c3"), expectedExpr, false)
+
+    // c1 = 619sdc and c2 = 100 and c3 = 200, where only c1 and c3 are indexed.
+    expectedExpr = generateColStatsExprForGreaterthanOrEquals("c1", "619sdc")
+    expectedExpr = And(expectedExpr, generateColStatsExprForGreaterthanOrEquals("c3", "200"))
+    translateIntoColStatsExprAndValidate(dataFilter1_1, Seq("c1", "c3"), expectedExpr, false)
+
+    // c1 = 619sdc and c2 = 100 and c3 = 200, where only c1 is indexed. we expect only c1 to be part of the translated expr
+    expectedExpr = generateColStatsExprForGreaterthanOrEquals("c1", "619sdc")
+    translateIntoColStatsExprAndValidate(dataFilter1_1, Seq("c1"), expectedExpr, false)
+
+    // c1 = 619sdc and c2 = 100 and c3 = 200, where none of c1, c2 or c3 is indexed.
+    translateIntoColStatsExprAndValidate(dataFilter1_1, Seq(""), TrueLiteral, true)
+
+    // c1 = 619sdc Or c2 = 100, where both c1 and c2 are indexed.
+    val dataFilter2 = Or(dataFilter, EqualTo(attribute("c2"), literal("100")))
+    expectedExpr = generateColStatsExprForGreaterthanOrEquals("c1", "619sdc")
+    expectedExpr = Or(expectedExpr, generateColStatsExprForGreaterthanOrEquals("c2", "100"))
+    translateIntoColStatsExprAndValidate(dataFilter2, Seq("c1", "c2"), expectedExpr, false)
+
+    // c1 = 619sdc Or c2 = 100, where only c1 is indexed. Since it's an 'or' condition, we can't translate the expr and we expect TrueLiteral.
+    translateIntoColStatsExprAndValidate(dataFilter2, Seq("c1"), TrueLiteral, true)
+
+    // let's mix 'and' and 'or' together.
+    // c1 = 619sdc and c2 = 100 or c3 = 200, where all of them (c1, c2 and c3) are indexed.
+    val dataFilter1_2 = Or(dataFilter1, EqualTo(attribute("c3"), literal("200")))
+    expectedExpr = generateColStatsExprForGreaterthanOrEquals("c1", "619sdc")
+    expectedExpr = And(expectedExpr, generateColStatsExprForGreaterthanOrEquals("c2", "100"))
+    expectedExpr = Or(expectedExpr, generateColStatsExprForGreaterthanOrEquals("c3", "200"))
+    translateIntoColStatsExprAndValidate(dataFilter1_2, Seq("c1", "c2", "c3"), expectedExpr, false)
+
+    // c1 = 619sdc and c2 = 100 or c3 = 200, where c3 is not indexed.
+    translateIntoColStatsExprAndValidate(dataFilter1_2, Seq("c1", "c2"), TrueLiteral, true)
+
+    // c1 = 619sdc and c2 = 100 or c3 = 200, where c2 is not indexed.
+    expectedExpr = generateColStatsExprForGreaterthanOrEquals("c1", "619sdc")
+    expectedExpr = Or(expectedExpr, generateColStatsExprForGreaterthanOrEquals("c3", "200"))
+    translateIntoColStatsExprAndValidate(dataFilter1_2, Seq("c1", "c3"), expectedExpr, false)
+
+    // c1 = 619sdc and c2 = 100 or c3 = 200, where c1 is not indexed.
+    expectedExpr = generateColStatsExprForGreaterthanOrEquals("c2", "100")
+    expectedExpr = Or(expectedExpr, generateColStatsExprForGreaterthanOrEquals("c3", "200"))
+    translateIntoColStatsExprAndValidate(dataFilter1_2, Seq("c2", "c3"), expectedExpr, false)
+
+    // c1 = 619sdc and c2 = 100 or c3 = 200, where c1 and c3 are not indexed.
+    translateIntoColStatsExprAndValidate(dataFilter1_2, Seq("c2"), TrueLiteral, true)
+
+    // c1 = 619sdc and c2 = 100 or c3 = 200, where c1 and c2 are not indexed.
+    translateIntoColStatsExprAndValidate(dataFilter1_2, Seq("c3"), TrueLiteral, true)
+
+    // c1 = 619sdc and c2 = 100 or c3 = 200, where c2 and c3 are not indexed.
+    translateIntoColStatsExprAndValidate(dataFilter1_2, Seq("c1"), TrueLiteral, true)
+
+    val dataFilter2_2 = And(dataFilter2, EqualTo(attribute("c3"), literal("200")))
+    expectedExpr = generateColStatsExprForGreaterthanOrEquals("c1", "619sdc")
+    expectedExpr = Or(expectedExpr, generateColStatsExprForGreaterthanOrEquals("c2", "100"))
+    expectedExpr = And(expectedExpr, generateColStatsExprForGreaterthanOrEquals("c3", "200"))
+    translateIntoColStatsExprAndValidate(dataFilter2_2, Seq("c1", "c2", "c3"), expectedExpr, false)
+
+    // c1 = 619sdc or c2 = 100 and c3 = 200, where c3 is not indexed.
+    expectedExpr = generateColStatsExprForGreaterthanOrEquals("c1", "619sdc")
+    expectedExpr = Or(expectedExpr, generateColStatsExprForGreaterthanOrEquals("c2", "100"))
+    translateIntoColStatsExprAndValidate(dataFilter2_2, Seq("c1", "c2"), expectedExpr, false)
+
+    // c1 = 619sdc or c2 = 100 and c3 = 200, where c2 is not indexed.
+    expectedExpr = generateColStatsExprForGreaterthanOrEquals("c3", "200")
+    translateIntoColStatsExprAndValidate(dataFilter2_2, Seq("c1", "c3"), expectedExpr, false)
+
+    // c1 = 619sdc or c2 = 100 and c3 = 200, where c1 is not indexed.
+    translateIntoColStatsExprAndValidate(dataFilter2_2, Seq("c2", "c3"), expectedExpr, false)
+
+    // c1 = 619sdc or c2 = 100 and c3 = 200, where c1 and c3 are not indexed.
+    translateIntoColStatsExprAndValidate(dataFilter2_2, Seq("c2"), TrueLiteral, true)
+
+    // c1 = 619sdc or c2 = 100 and c3 = 200, where c1 and c2 are not indexed.
+    translateIntoColStatsExprAndValidate(dataFilter2_2, Seq("c3"), expectedExpr, false)
+
+    // c1 = 619sdc or c2 = 100 and c3 = 200, where c2 and c3 are not indexed.
+    translateIntoColStatsExprAndValidate(dataFilter2_2, Seq("c1"), TrueLiteral, true)
+
+    // nested And and Or case
+    val dataFilter3 = And(
+      Or(
+        EqualTo(attribute("c1"), literal("619sdc")),
+        And(EqualTo(attribute("c2"), literal("100")), EqualTo(attribute("c3"), literal("200")))
+      ),
+      EqualTo(attribute("c4"), literal("300"))
+    )
+    expectedExpr = generateColStatsExprForGreaterthanOrEquals("c2", "100")
+    expectedExpr = And(expectedExpr, generateColStatsExprForGreaterthanOrEquals("c3", "200"))
+    expectedExpr = Or(generateColStatsExprForGreaterthanOrEquals("c1", "619sdc"), expectedExpr)
+    expectedExpr = And(expectedExpr, generateColStatsExprForGreaterthanOrEquals("c4", "300"))
+    translateIntoColStatsExprAndValidate(
+      dataFilter3,
+      Seq("c1", "c2", "c3", "c4"),
+      expectedExpr,
+      false
+    )
+
+    // unsupported filter type
+    val dataFilter4 = BitwiseOr(
+      EqualTo(attribute("c1"), literal("619sdc")),
+      EqualTo(attribute("c2"), literal("100"))
+    )
+    translateIntoColStatsExprAndValidate(
+      dataFilter4,
+      Seq("c1", "c2"),
+      TrueLiteral,
+      true
+    )
+
+    // too many filters, of which only half are indexed.
+    val largeFilter = (1 to 100).map(i => EqualTo(attribute(s"c$i"), literal("value"))).reduce(And)
+    val indexedColumns = (1 to 50).map(i => s"c$i")
+    expectedExpr = generateColStatsExprForGreaterthanOrEquals("c1", "value")
+    (2 to 50).foreach(i => {
+      expectedExpr = And(expectedExpr, generateColStatsExprForGreaterthanOrEquals(s"c$i", "value"))
+    })
+    translateIntoColStatsExprAndValidate(largeFilter, indexedColumns, expectedExpr, false)
+
+    // nested field part of data filters.
+    val dataFilter5 = And(
+      EqualTo(attribute("c1"), literal("val1")),
+      EqualTo(attribute("nested.inner_c2"), literal("val2"))
+    )
+    expectedExpr = generateColStatsExprForGreaterthanOrEquals("c1", "val1")
+    translateIntoColStatsExprAndValidate(
+      dataFilter5,
+      Seq("c1"),
+      expectedExpr,
+      false
+    )
+  }
+
+  def translateIntoColStatsExprAndValidate(dataFilter: Expression, indexedCols: Seq[String], expectedExpr: Expression,
+                                           expectTrueLiteral: Boolean): Unit = {
+    val transformedExpr = DataSkippingUtils.translateIntoColumnStatsIndexFilterExpr(dataFilter, false, indexedCols)
+    assertTrue(expectedExpr.equals(transformedExpr))
+    if (expectTrueLiteral) {
+      assertTrue(expectedExpr.equals(TrueLiteral))
+    } else {
+      assertFalse(expectedExpr.equals(TrueLiteral))
+    }
+  }
+
+  def generateColStatsExprForGreaterthanOrEquals(colName: String, colValue: String): Expression = {
+    val expectedExpr: Expression = GreaterThanOrEqual(UnresolvedAttribute(colName + "_maxValue"), literal(colValue))
+    And(LessThanOrEqual(UnresolvedAttribute(colName + "_minValue"), literal(colValue)), expectedExpr)
   }
 
   @ParameterizedTest
@@ -406,7 +622,7 @@ class TestColumnStatsIndexWithSQL extends ColumnStatIndexTestBase {
     verifySQLQueries(numRecordsForFirstQuery, numRecordsForSecondQuery, DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL, commonOpts, isTableDataSameAsAfterSecondInstant)
     commonOpts = commonOpts + (DataSourceReadOptions.INCREMENTAL_FALLBACK_TO_FULL_TABLE_SCAN.key -> "true")
     // TODO: https://issues.apache.org/jira/browse/HUDI-6657 - Investigate why below assertions fail with full table scan enabled.
-    //verifySQLQueries(numRecordsForFirstQuery, DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL, commonOpts, isTableDataSameAsAfterSecondInstant)
+    // verifySQLQueries(numRecordsForFirstQuery, DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL, commonOpts, isTableDataSameAsAfterSecondInstant)
 
     var dataFilter: Expression = GreaterThan(attribute("c5"), literal("70"))
     verifyPruningFileCount(commonOpts, dataFilter)
@@ -418,18 +634,26 @@ class TestColumnStatsIndexWithSQL extends ColumnStatIndexTestBase {
     verifyPruningFileCount(commonOpts, dataFilter)
   }
 
-  private def verifyPruningFileCount(opts: Map[String, String], dataFilter: Expression): Unit = {
+  private def verifyPruningFileCount(opts: Map[String, String], dataFilter: Expression, shouldPrune: Boolean = true): Unit = {
     // with data skipping
     val commonOpts = opts + ("path" -> basePath)
     var fileIndex = HoodieFileIndex(spark, metaClient, None, commonOpts, includeLogFiles = true)
     val filteredPartitionDirectories = fileIndex.listFiles(Seq(), Seq(dataFilter))
     val filteredFilesCount = filteredPartitionDirectories.flatMap(s => s.files).size
-    assertTrue(filteredFilesCount < getLatestDataFilesCount(opts))
+    if (shouldPrune) {
+      assertTrue(filteredFilesCount < getLatestDataFilesCount(opts))
+    } else {
+      assertEquals(getLatestDataFilesCount(opts), filteredFilesCount)
+    }
 
     // with no data skipping
     fileIndex = HoodieFileIndex(spark, metaClient, None, commonOpts + (DataSourceReadOptions.ENABLE_DATA_SKIPPING.key -> "false"), includeLogFiles = true)
     val filesCountWithNoSkipping = fileIndex.listFiles(Seq(), Seq(dataFilter)).flatMap(s => s.files).size
-    assertTrue(filteredFilesCount < filesCountWithNoSkipping)
+    if (shouldPrune) {
+      assertTrue(filteredFilesCount < filesCountWithNoSkipping)
+    } else {
+      assertEquals(filesCountWithNoSkipping, filteredFilesCount)
+    }
   }
 
   private def getLatestDataFilesCount(opts: Map[String, String], includeLogFiles: Boolean = true) = {
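
Note on the And/Or handling in tryComposeIndexFilterExpr above: the following is a minimal, self-contained sketch of those pruning semantics in plain Scala. It has no Hudi or Spark dependencies; the Expr ADT and all names are hypothetical stand-ins for Spark's Expression tree, intended only to show why an untranslatable side poisons an OR but not an AND.

sealed trait Expr
case class Eq(col: String, value: String) extends Expr
case class AndE(left: Expr, right: Expr) extends Expr
case class OrE(left: Expr, right: Expr) extends Expr
// Stands in for "col_minValue <= value AND col_maxValue >= value"
case class ColStatsRange(col: String, value: String) extends Expr

object TranslationSketch extends App {
  // Returns None when the sub-expression cannot be pruned with the index,
  // mirroring tryComposeIndexFilterExpr's Option result.
  def tryCompose(e: Expr, indexedCols: Set[String]): Option[Expr] = e match {
    case Eq(col, v) if indexedCols.contains(col) => Some(ColStatsRange(col, v))
    // OR: both sides must translate; otherwise matching rows could hide in
    // files pruned by the translatable side alone, so give up entirely.
    case OrE(l, r) =>
      for {
        lt <- tryCompose(l, indexedCols)
        rt <- tryCompose(r, indexedCols)
      } yield OrE(lt, rt)
    // AND: one translatable side suffices; dropping the other side only makes
    // the index filter less selective, never incorrect.
    case AndE(l, r) =>
      (tryCompose(l, indexedCols), tryCompose(r, indexedCols)) match {
        case (Some(lt), Some(rt)) => Some(AndE(lt, rt))
        case (Some(lt), None)     => Some(lt)
        case (None, Some(rt))     => Some(rt)
        case (None, None)         => None
      }
    case _ => None // non-indexed column or unsupported node: not translatable
  }

  val indexed = Set("c1", "c2")
  println(tryCompose(AndE(Eq("c1", "a"), Eq("c5", "x")), indexed)) // Some(ColStatsRange(c1,a)) -> still prunes
  println(tryCompose(OrE(Eq("c1", "a"), Eq("c5", "x")), indexed))  // None -> falls back to scanning all files
}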
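
The union at the end of getCandidateFiles is the safety net that preserves correctness when the col stats index lags behind the file system: any file that has no entry in the index must survive pruning. A sketch of that set arithmetic, with hypothetical file names:

object CandidateFilesSketch extends App {
  val fileNamesFromPrunedPartitions = Set("f1", "f2", "f3", "f4")
  val allIndexedFileNames = Set("f1", "f2", "f3")  // f4 has no stats indexed yet
  val prunedCandidateFileNames = Set("f2")         // files surviving the index filter
  // f4 is kept even though the index filter never saw it.
  val notIndexedFileNames = fileNamesFromPrunedPartitions -- allIndexedFileNames
  println(prunedCandidateFileNames ++ notIndexedFileNames) // Set(f2, f4), order may vary
}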
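
Finally, a hedged catalyst-level illustration of the EqualTo translation that the new tests assert via generateColStatsExprForGreaterthanOrEquals: "c1 = 'abc'" becomes "c1_minValue <= 'abc' AND c1_maxValue >= 'abc'". This is a standalone sketch assuming spark-catalyst on the classpath, not Hudi's production code path:

import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
import org.apache.spark.sql.catalyst.expressions.{And, Expression, GreaterThanOrEqual, LessThanOrEqual, Literal}

object EqualToTranslationSketch extends App {
  // Follows the "<col>_minValue"/"<col>_maxValue" naming used by the col stats index.
  def colStatsRangeExpr(colName: String, value: String): Expression =
    And(
      LessThanOrEqual(UnresolvedAttribute(colName + "_minValue"), Literal(value)),
      GreaterThanOrEqual(UnresolvedAttribute(colName + "_maxValue"), Literal(value)))

  // Prints an unresolved catalyst tree along the lines of:
  // (('c1_minValue <= abc) AND ('c1_maxValue >= abc))
  println(colStatsRangeExpr("c1", "abc"))
}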