Skip to content

Commit

Permalink
simplifying detecting non indexed cols
Browse files Browse the repository at this point in the history
  • Loading branch information
nsivabalan committed Jan 8, 2025
1 parent 7968763 commit 14581e6
Show file tree
Hide file tree
Showing 4 changed files with 32 additions and 62 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -99,9 +99,7 @@ class PartitionStatsIndexSupport(spark: SparkSession,
val indexSchema = transposedPartitionStatsDF.schema
val indexedCols : Seq[String] = metaClient.getIndexMetadata.get().getIndexDefinitions.get(PARTITION_NAME_COLUMN_STATS).getSourceFields.asScala.toSeq
// to be fixed. HUDI-8836.
val hasNonIndexedCols = new AtomicBoolean(false)
val indexFilter = queryFilters.map(translateIntoColumnStatsIndexFilterExpr(_, indexedCols = indexedCols,
hasNonIndexedCols = hasNonIndexedCols)).reduce(And)
val indexFilter = queryFilters.map(translateIntoColumnStatsIndexFilterExpr(_, indexedCols = indexedCols)).reduce(And)
Some(transposedPartitionStatsDF.where(new Column(indexFilter))
.select(HoodieMetadataPayload.COLUMN_STATS_FIELD_FILE_NAME)
.collect()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ import org.apache.hudi.metadata.{HoodieMetadataPayload, HoodieTableMetadata}
import org.apache.spark.api.java.JavaSparkContext
import org.apache.spark.sql.catalyst.expressions.Literal.TrueLiteral
import org.apache.spark.sql.catalyst.expressions.{And, AttributeReference, Expression, In, Literal}
import org.apache.spark.sql.hudi.DataSkippingUtils.translateIntoColumnStatsIndexFilterExpr
import org.apache.spark.sql.hudi.DataSkippingUtils.{LITERAL_TRUE_EXPR, translateIntoColumnStatsIndexFilterExpr}
import org.apache.spark.sql.{Column, DataFrame, SparkSession}

import java.util.concurrent.atomic.AtomicBoolean
Expand Down Expand Up @@ -104,9 +104,8 @@ abstract class SparkBaseIndexSupport(spark: SparkSession,

protected def getCandidateFiles(indexDf: DataFrame, queryFilters: Seq[Expression], fileNamesFromPrunedPartitions: Set[String], isExpressionIndex: Boolean = false): Set[String] = {
val indexedCols : Seq[String] = metaClient.getIndexMetadata.get().getIndexDefinitions.get(PARTITION_NAME_COLUMN_STATS).getSourceFields.asScala.toSeq
val hasNonIndexedFilters = new AtomicBoolean(false)
val indexFilter = queryFilters.map(translateIntoColumnStatsIndexFilterExpr(_, isExpressionIndex, indexedCols, hasNonIndexedFilters)).reduce(And)
if (hasNonIndexedFilters.get()) {
val indexFilter = queryFilters.map(translateIntoColumnStatsIndexFilterExpr(_, isExpressionIndex, indexedCols)).reduce(And)
if (indexFilter.equals(TrueLiteral)) {
// if there are any non indexed cols or we can't translate source expr, we have to read all files and may not benefit from col stats lookup.
fileNamesFromPrunedPartitions
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ import java.util.concurrent.atomic.AtomicBoolean

object DataSkippingUtils extends Logging {

val LITERAL_TRUE_EXPR = TrueLiteral

/**
* Translates provided {@link filterExpr} into corresponding filter-expression for column-stats index table
* to filter out candidate files that would hold records matching the original filter.
Expand All @@ -48,8 +50,7 @@ object DataSkippingUtils extends Logging {
def translateIntoColumnStatsIndexFilterExpr(dataTableFilterExpr: Expression, isExpressionIndex: Boolean = false, indexedCols : Seq[String] = Seq.empty,
hasNonIndexedCols : AtomicBoolean = new AtomicBoolean(false)): Expression = {
try {
createColumnStatsIndexFilterExprInternal(dataTableFilterExpr, isExpressionIndex, indexedCols,
hasNonIndexedCols)
createColumnStatsIndexFilterExprInternal(dataTableFilterExpr, isExpressionIndex, indexedCols)
} catch {
case e: AnalysisException =>
logDebug(s"Failed to translated provided data table filter expr into column stats one ($dataTableFilterExpr)", e)
Expand All @@ -58,15 +59,14 @@ object DataSkippingUtils extends Logging {
}

private def createColumnStatsIndexFilterExprInternal(dataTableFilterExpr: Expression, isExpressionIndex: Boolean = false,
indexedCols : Seq[String],
hasNonIndexedCols : AtomicBoolean = new AtomicBoolean(false)): Expression = {
indexedCols : Seq[String]): Expression = {
// Try to transform original Source Table's filter expression into
// Column-Stats Index filter expression
tryComposeIndexFilterExpr(dataTableFilterExpr, isExpressionIndex, indexedCols, hasNonIndexedCols) match {
tryComposeIndexFilterExpr(dataTableFilterExpr, isExpressionIndex, indexedCols) match {
case Some(e) => e
// NOTE: In case we can't transform source filter expression, we fallback
// to {@code TrueLiteral}, to essentially avoid pruning any indexed files from scanning
case None => TrueLiteral
case None => LITERAL_TRUE_EXPR
}
}

Expand All @@ -76,11 +76,9 @@ object DataSkippingUtils extends Logging {
* @param sourceFilterExpr source filter expression of interest.
* @param isExpressionIndex true if this refers to an expression index.
* @param indexedCols list of columns indexed with col stats index in MDT.
* @param hasNonIndexedCols atomic boolean tracking if there are any non indexed columns.
* @return optionally transformed Expression. Returns None if the column of interest is not indexed or the expression is not translatable.
*/
private def tryComposeIndexFilterExpr(sourceFilterExpr: Expression, isExpressionIndex: Boolean = false,
indexedCols : Seq[String], hasNonIndexedCols : AtomicBoolean = new AtomicBoolean(false)): Option[Expression] = {
private def tryComposeIndexFilterExpr(sourceFilterExpr: Expression, isExpressionIndex: Boolean = false, indexedCols : Seq[String]): Option[Expression] = {
//
// For translation of the Filter Expression for the Data Table into Filter Expression for Column Stats Index, we're
// assuming that
Expand Down Expand Up @@ -127,7 +125,6 @@ object DataSkippingUtils extends Logging {
val targetExprBuilder: Expression => Expression = swapAttributeRefInExpr(sourceExpr, attrRef, _)
genColumnValuesEqualToExpression(colName, valueExpr, targetExprBuilder)
}.orElse({
hasNonIndexedCols.set(true)
Option.empty
})

Expand All @@ -137,7 +134,6 @@ object DataSkippingUtils extends Logging {
val targetExprBuilder: Expression => Expression = swapAttributeRefInExpr(sourceExpr, attrRef, _)
genColumnValuesEqualToExpression(colName, valueExpr, targetExprBuilder)
}.orElse({
hasNonIndexedCols.set(true)
Option.empty
})

Expand All @@ -151,7 +147,6 @@ object DataSkippingUtils extends Logging {
val targetExprBuilder: Expression => Expression = swapAttributeRefInExpr(sourceExpr, attrRef, _)
Not(genColumnOnlyValuesEqualToExpression(colName, value, targetExprBuilder))
}.orElse({
hasNonIndexedCols.set(true)
Option.empty
})

Expand All @@ -161,7 +156,6 @@ object DataSkippingUtils extends Logging {
val targetExprBuilder: Expression => Expression = swapAttributeRefInExpr(sourceExpr, attrRef, _)
Not(genColumnOnlyValuesEqualToExpression(colName, value, targetExprBuilder))
}.orElse({
hasNonIndexedCols.set(true)
Option.empty
})

Expand All @@ -171,7 +165,6 @@ object DataSkippingUtils extends Logging {
getTargetIndexedColumnName(attrRef, indexedCols)
.map(colName => EqualTo(genColNumNullsExpr(colName), litNull))
.orElse({
hasNonIndexedCols.set(true)
Option.empty
})

Expand All @@ -183,7 +176,6 @@ object DataSkippingUtils extends Logging {
val targetExprBuilder: Expression => Expression = swapAttributeRefInExpr(sourceExpr, attrRef, _)
LessThan(targetExprBuilder.apply(genColMinValueExpr(colName)), value)
}.orElse({
hasNonIndexedCols.set(true)
Option.empty
})

Expand All @@ -193,7 +185,6 @@ object DataSkippingUtils extends Logging {
val targetExprBuilder: Expression => Expression = swapAttributeRefInExpr(sourceExpr, attrRef, _)
LessThan(targetExprBuilder.apply(genColMinValueExpr(colName)), value)
}.orElse({
hasNonIndexedCols.set(true)
Option.empty
})

Expand All @@ -205,7 +196,6 @@ object DataSkippingUtils extends Logging {
val targetExprBuilder: Expression => Expression = swapAttributeRefInExpr(sourceExpr, attrRef, _)
GreaterThan(targetExprBuilder.apply(genColMaxValueExpr(colName)), value)
}.orElse({
hasNonIndexedCols.set(true)
Option.empty
})

Expand All @@ -215,7 +205,6 @@ object DataSkippingUtils extends Logging {
val targetExprBuilder: Expression => Expression = swapAttributeRefInExpr(sourceExpr, attrRef, _)
GreaterThan(targetExprBuilder.apply(genColMaxValueExpr(colName)), value)
}.orElse({
hasNonIndexedCols.set(true)
Option.empty
})

Expand All @@ -227,7 +216,6 @@ object DataSkippingUtils extends Logging {
val targetExprBuilder: Expression => Expression = swapAttributeRefInExpr(sourceExpr, attrRef, _)
LessThanOrEqual(targetExprBuilder.apply(genColMinValueExpr(colName)), value)
}.orElse({
hasNonIndexedCols.set(true)
Option.empty
})

Expand All @@ -237,7 +225,6 @@ object DataSkippingUtils extends Logging {
val targetExprBuilder: Expression => Expression = swapAttributeRefInExpr(sourceExpr, attrRef, _)
LessThanOrEqual(targetExprBuilder.apply(genColMinValueExpr(colName)), value)
}.orElse({
hasNonIndexedCols.set(true)
Option.empty
})

Expand All @@ -249,7 +236,6 @@ object DataSkippingUtils extends Logging {
val targetExprBuilder: Expression => Expression = swapAttributeRefInExpr(sourceExpr, attrRef, _)
GreaterThanOrEqual(targetExprBuilder.apply(genColMaxValueExpr(colName)), value)
}.orElse({
hasNonIndexedCols.set(true)
Option.empty
})

Expand All @@ -259,7 +245,6 @@ object DataSkippingUtils extends Logging {
val targetExprBuilder: Expression => Expression = swapAttributeRefInExpr(sourceExpr, attrRef, _)
GreaterThanOrEqual(targetExprBuilder.apply(genColMaxValueExpr(colName)), value)
}.orElse({
hasNonIndexedCols.set(true)
Option.empty
})

Expand All @@ -269,7 +254,6 @@ object DataSkippingUtils extends Logging {
getTargetIndexedColumnName(attribute, indexedCols)
.map(colName => GreaterThan(genColNumNullsExpr(colName), Literal(0)))
.orElse({
hasNonIndexedCols.set(true)
Option.empty
})

Expand All @@ -284,7 +268,6 @@ object DataSkippingUtils extends Logging {
val valueCountExpr = genColValueCountExpr
Or(Or(IsNull(numNullExpr), IsNull(valueCountExpr)), LessThan(numNullExpr, valueCountExpr))
}.orElse({
hasNonIndexedCols.set(true)
Option.empty
})

Expand All @@ -298,7 +281,6 @@ object DataSkippingUtils extends Logging {
val targetExprBuilder: Expression => Expression = swapAttributeRefInExpr(sourceExpr, attrRef, _)
list.map(lit => genColumnValuesEqualToExpression(colName, lit, targetExprBuilder)).reduce(Or)
}.orElse({
hasNonIndexedCols.set(true)
Option.empty
})

Expand All @@ -319,7 +301,6 @@ object DataSkippingUtils extends Logging {
genColumnValuesEqualToExpression(colName, lit, targetExprBuilder)
}.reduce(Or)
}.orElse({
hasNonIndexedCols.set(true)
Option.empty
})

Expand All @@ -332,7 +313,6 @@ object DataSkippingUtils extends Logging {
val targetExprBuilder: Expression => Expression = swapAttributeRefInExpr(sourceExpr, attrRef, _)
Not(list.map(lit => genColumnOnlyValuesEqualToExpression(colName, lit, targetExprBuilder)).reduce(Or))
}.orElse({
hasNonIndexedCols.set(true)
Option.empty
})

Expand All @@ -348,7 +328,6 @@ object DataSkippingUtils extends Logging {
val targetExprBuilder: Expression => Expression = swapAttributeRefInExpr(sourceExpr, attrRef, _)
genColumnValuesEqualToExpression(colName, v, targetExprBuilder)
}.orElse({
hasNonIndexedCols.set(true)
Option.empty
})

Expand All @@ -363,39 +342,30 @@ object DataSkippingUtils extends Logging {
val maxValueExpr = targetExprBuilder.apply(genColMaxValueExpr(colName))
Not(And(StartsWith(minValueExpr, value), StartsWith(maxValueExpr, value)))
}.orElse({
hasNonIndexedCols.set(true)
Option.empty
})

case or: Or =>
val leftHasNonIndexedCols = new AtomicBoolean(false)
val resLeft = createColumnStatsIndexFilterExprInternal(or.left, isExpressionIndex = isExpressionIndex,
indexedCols = indexedCols, hasNonIndexedCols = leftHasNonIndexedCols)
val rightHasNonIndexedCols = new AtomicBoolean(false)
val resRight = createColumnStatsIndexFilterExprInternal(or.right, isExpressionIndex = isExpressionIndex,
indexedCols = indexedCols, hasNonIndexedCols = rightHasNonIndexedCols)
if (leftHasNonIndexedCols.get() || rightHasNonIndexedCols.get()) {
hasNonIndexedCols.set(true)
val resLeft = createColumnStatsIndexFilterExprInternal(or.left, isExpressionIndex = isExpressionIndex, indexedCols = indexedCols)
val resRight = createColumnStatsIndexFilterExprInternal(or.right, isExpressionIndex = isExpressionIndex, indexedCols = indexedCols)
if (resLeft.equals(LITERAL_TRUE_EXPR) || resRight.equals(LITERAL_TRUE_EXPR)) {
None
} else {
Option(Or(resLeft, resRight))
}

case and: And =>
val leftHasNonIndexedCols = new AtomicBoolean(false)
val resLeft = createColumnStatsIndexFilterExprInternal(and.left, isExpressionIndex = isExpressionIndex,
indexedCols = indexedCols, hasNonIndexedCols = leftHasNonIndexedCols)
val rightHasNonIndexedCols = new AtomicBoolean(false)
val resRight = createColumnStatsIndexFilterExprInternal(and.right, isExpressionIndex = isExpressionIndex,
indexedCols = indexedCols, hasNonIndexedCols = rightHasNonIndexedCols)
val resLeft = createColumnStatsIndexFilterExprInternal(and.left, isExpressionIndex = isExpressionIndex, indexedCols = indexedCols)
val resRight = createColumnStatsIndexFilterExprInternal(and.right, isExpressionIndex = isExpressionIndex, indexedCols = indexedCols)
// Only if both the left and right sides have non-indexed cols (or are untranslatable) do we give up on translating this expression.
// Otherwise, we can still afford to prune files based on col stats lookup using the side that did translate.
if (leftHasNonIndexedCols.get() && rightHasNonIndexedCols.get()) {
hasNonIndexedCols.set(true)
val isLeftLiteralTrue = resLeft.equals(LITERAL_TRUE_EXPR)
val isRightLiteralTrue = resRight.equals(LITERAL_TRUE_EXPR)
if (isLeftLiteralTrue && isRightLiteralTrue) {
None
} else if (leftHasNonIndexedCols.get()) {
} else if (isLeftLiteralTrue) {
Option(resRight) // Ignore the non-indexed left expression
} else if (rightHasNonIndexedCols.get()) {
} else if (isRightLiteralTrue) {
Option(resLeft) // Ignore the non-indexed right expression
} else {
Option(And(resLeft, resRight))
Expand All @@ -409,11 +379,11 @@ object DataSkippingUtils extends Logging {

case Not(And(left: Expression, right: Expression)) =>
Option(createColumnStatsIndexFilterExprInternal(Or(Not(left), Not(right)), isExpressionIndex = isExpressionIndex,
indexedCols = indexedCols, hasNonIndexedCols = hasNonIndexedCols))
indexedCols = indexedCols))

case Not(Or(left: Expression, right: Expression)) =>
Option(createColumnStatsIndexFilterExprInternal(And(Not(left), Not(right)), isExpressionIndex = isExpressionIndex,
indexedCols = indexedCols, hasNonIndexedCols = hasNonIndexedCols))
indexedCols = indexedCols))

case _: Expression => None
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -269,7 +269,7 @@ class TestColumnStatsIndexWithSQL extends ColumnStatIndexTestBase {
dataFilter4,
Seq("c1", "c2"),
TrueLiteral,
false
true
)

// too many filters, out of which only half are indexed.
Expand Down Expand Up @@ -297,11 +297,14 @@ class TestColumnStatsIndexWithSQL extends ColumnStatIndexTestBase {
}

def translateIntoColStatsExprAndValidate(dataFilter: Expression, indexedCols: Seq[String], expectedExpr: Expression,
expectedHasNonIndexedCols: Boolean): Unit = {
val hasNonIndexedCols = new AtomicBoolean(false)
val transformedExpr = DataSkippingUtils.translateIntoColumnStatsIndexFilterExpr(dataFilter, false, indexedCols, hasNonIndexedCols)
assertEquals(expectedExpr.toString(), transformedExpr.toString())
assertEquals(expectedHasNonIndexedCols, hasNonIndexedCols.get())
expectTrueLiteral: Boolean): Unit = {
val transformedExpr = DataSkippingUtils.translateIntoColumnStatsIndexFilterExpr(dataFilter, false, indexedCols)
assertTrue(expectedExpr.equals(transformedExpr))
if (expectTrueLiteral) {
assertTrue(expectedExpr.equals(TrueLiteral))
} else {
assertTrue(!expectedExpr.equals(TrueLiteral))
}
}

def generateColStatsExprForGreaterthanOrEquals(colName: String, colValue: String): Expression = {
Expand Down

0 comments on commit 14581e6

Please sign in to comment.