diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/groupby/GroupByPostShuffleFrameProcessor.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/groupby/GroupByPostShuffleFrameProcessor.java
index e9783b8366c5..f2c476047d11 100644
--- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/groupby/GroupByPostShuffleFrameProcessor.java
+++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/groupby/GroupByPostShuffleFrameProcessor.java
@@ -25,6 +25,7 @@
 import org.apache.druid.frame.channel.FrameWithPartition;
 import org.apache.druid.frame.channel.ReadableFrameChannel;
 import org.apache.druid.frame.channel.WritableFrameChannel;
+import org.apache.druid.frame.key.ClusterBy;
 import org.apache.druid.frame.processor.FrameProcessor;
 import org.apache.druid.frame.processor.FrameProcessors;
 import org.apache.druid.frame.processor.FrameRowTooLargeException;
@@ -44,6 +45,7 @@
 import org.apache.druid.query.groupby.having.AlwaysHavingSpec;
 import org.apache.druid.query.groupby.having.DimFilterHavingSpec;
 import org.apache.druid.query.groupby.having.HavingSpec;
+import org.apache.druid.query.timeseries.TimeseriesQueryQueryToolChest;
 import org.apache.druid.segment.ColumnSelectorFactory;
 import org.apache.druid.segment.ColumnValueSelector;
 import org.apache.druid.segment.Cursor;
@@ -83,6 +85,7 @@ public class GroupByPostShuffleFrameProcessor implements FrameProcessor<Unit>
   private Supplier<ResultRow> rowSupplierFromFrameCursor;
   private ResultRow outputRow = null;
   private FrameWriter frameWriter = null;
+  private long outputRows = 0L;
 
   public GroupByPostShuffleFrameProcessor(
       final GroupByQuery query,
@@ -139,6 +142,7 @@ public ReturnOrAwait<Unit> runIncrementally(final IntSet readableInputs) throws
       }
 
       writeCurrentFrameIfNeeded();
+      writeEmptyAggregationsFrameIfNeeded();
       return ReturnOrAwait.returnObject(Unit.instance());
     } else {
       final Frame frame = inputChannel.read();
@@ -263,6 +267,40 @@ private void writeCurrentFrameIfNeeded() throws IOException
       outputChannel.write(new FrameWithPartition(frame, FrameWithPartition.NO_PARTITION));
       frameWriter.close();
       frameWriter = null;
+      outputRows += frame.numRows();
     }
   }
 
+  /**
+   * Generate a frame containing a single row with default values of all aggregations if needed. This method uses
+   * {@link GroupingEngine#summaryRowPreconditions(GroupByQuery)} to determine if such an operation is needed.
+   *
+   * Note that in cases where {@link GroupingEngine#summaryRowPreconditions(GroupByQuery)} returns true, the
+   * preceding {@link GroupByPreShuffleFrameProcessorFactory} stage would use an empty {@link ClusterBy}. Therefore,
+   * there would only be a single output partition of the prior stage, and therefore a single instance of
+   * this processor. This ensures that only a single null-aggregations row is generated for the entire stage.
+   */
+  private void writeEmptyAggregationsFrameIfNeeded() throws IOException
+  {
+    if (outputRows == 0 && GroupingEngine.summaryRowPreconditions(query)) {
+      final int resultRowSize = query.getResultRowSignature().size();
+      this.outputRow = ResultRow.create(resultRowSize);
+      final Object[] emptyResultArray = TimeseriesQueryQueryToolChest.getEmptyAggregations(query.getAggregatorSpecs());
+      if (query.getResultRowHasTimestamp()) {
+        // Can happen if the query has granularity "ALL" but no intervals. In this case nothing is matched and the
+        // __time column will be ignored, but it's there in the result row signature anyway, so we need to populate it.
+        outputRow.set(0, 0L);
+      }
+      System.arraycopy(
+          emptyResultArray,
+          0,
+          outputRow.getArray(),
+          query.getResultRowAggregatorStart(),
+          emptyResultArray.length
+      );
+      setUpFrameWriterIfNeeded();
+      writeOutputRow();
+      writeCurrentFrameIfNeeded();
+    }
+  }
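Aside, for context on the new method above: the default values it writes come from TimeseriesQueryQueryToolChest.getEmptyAggregations(), added later in this patch. A minimal sketch of what those defaults look like, assuming null handling is initialized the way the unit tests do it; the class name EmptyAggregationsDemo and the aggregator names are illustrative only:

```java
import com.google.common.collect.ImmutableList;
import java.util.List;
import org.apache.druid.common.config.NullHandling;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.aggregation.CountAggregatorFactory;
import org.apache.druid.query.aggregation.LongSumAggregatorFactory;
import org.apache.druid.query.timeseries.TimeseriesQueryQueryToolChest;

public class EmptyAggregationsDemo
{
  public static void main(String[] args)
  {
    NullHandling.initializeForTests(); // defaults depend on the null-handling mode

    final List<AggregatorFactory> aggs = ImmutableList.of(
        new CountAggregatorFactory("a0"),
        new LongSumAggregatorFactory("a1", "nofield")
    );

    // One entry per factory: the value each aggregator reports over zero rows.
    final Object[] defaults = TimeseriesQueryQueryToolChest.getEmptyAggregations(aggs);
    System.out.println(defaults[0]); // 0    -- COUNT over nothing
    System.out.println(defaults[1]); // null -- SUM over nothing (SQL-compatible mode)
  }
}
```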
diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/groupby/GroupByQueryKit.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/groupby/GroupByQueryKit.java
index 42f39fb78e56..d8be6ca423e6 100644
--- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/groupby/GroupByQueryKit.java
+++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/groupby/GroupByQueryKit.java
@@ -148,7 +148,7 @@ public QueryDefinition makeQueryDefinition(
         partitionBoost = true;
       } else {
-        shuffleSpecFactoryPreAggregation = doLimitOrOffset
+        shuffleSpecFactoryPreAggregation = doLimitOrOffset || intermediateClusterBy.isEmpty()
                                            ? ShuffleSpecFactories.singlePartition()
                                            : resultShuffleSpecFactory;
diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/CalciteSelectQueryMSQTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/CalciteSelectQueryMSQTest.java
index 566778ee7c99..33aa39e21a53 100644
--- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/CalciteSelectQueryMSQTest.java
+++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/CalciteSelectQueryMSQTest.java
@@ -181,19 +181,4 @@ public void testJoinMultipleTablesWithWhereCondition()
         )
         .run();
   }
-
-  @Override
-  @Test
-  public void testFilterParseLongNullable()
-  {
-    // this isn't really correct in default value mode, the result should be ImmutableList.of(new Object[]{0L})
-    // but MSQ is missing default aggregator values in empty group results. this override can be removed when this
-    // is fixed
-    testBuilder().queryContext(QUERY_CONTEXT_DEFAULT)
-                 .sql("select count(*) from druid.foo where parse_long(dim1, 10) is null")
-                 .expectedResults(
-                     ImmutableList.of(new Object[]{4L})
-                 )
-                 .run();
-  }
 }
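The override removed above existed only because MSQ used to return zero rows for empty group-by results; with the summary row in place, the base test's expectation now holds. The single-partition guarantee that makes the summary row safe comes from the GroupByQueryKit change above; a rough sketch of the invariant, using the real ClusterBy API (the assertion framing is mine, not from the patch):

```java
import org.apache.druid.frame.key.ClusterBy;

public class SinglePartitionInvariant
{
  public static void main(String[] args)
  {
    // GROUP BY () has no grouping key, so the pre-aggregation ClusterBy is empty...
    final ClusterBy intermediateClusterBy = ClusterBy.none();

    // ...and the QueryKit then forces a single partition, so exactly one
    // GroupByPostShuffleFrameProcessor instance exists to emit (or not emit)
    // the summary row for the whole stage.
    if (intermediateClusterBy.isEmpty()) {
      System.out.println("single partition -> at most one summary row per stage");
    }
  }
}
```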
+   */
+  public static final String CTX_HAS_DROPPED_DIMENSIONS = "hasDroppedDimensions";
   private static final String CTX_KEY_FUDGE_TIMESTAMP = "fudgeTimestamp";
 
   private static final Comparator<ResultRow> NON_GRANULAR_TIME_COMP =
@@ -465,6 +472,14 @@ public boolean getApplyLimitPushDownFromContext()
     return context().getBoolean(GroupByQueryConfig.CTX_KEY_APPLY_LIMIT_PUSH_DOWN, true);
   }
 
+  /**
+   * See {@link #CTX_HAS_DROPPED_DIMENSIONS}.
+   */
+  public boolean hasDroppedDimensions()
+  {
+    return context().getBoolean(CTX_HAS_DROPPED_DIMENSIONS, false);
+  }
+
   @Override
   public Ordering<ResultRow> getResultOrdering()
   {
diff --git a/processing/src/main/java/org/apache/druid/query/groupby/GroupingEngine.java b/processing/src/main/java/org/apache/druid/query/groupby/GroupingEngine.java
index acb07c2298fd..91629706788b 100644
--- a/processing/src/main/java/org/apache/druid/query/groupby/GroupingEngine.java
+++ b/processing/src/main/java/org/apache/druid/query/groupby/GroupingEngine.java
@@ -983,7 +983,10 @@ public static Sequence<ResultRow> wrapSummaryRowIfNeeded(GroupByQuery query, Sequence<ResultRow> results)
     }));
   }
 
-  private static boolean summaryRowPreconditions(GroupByQuery query)
+  /**
+   * Whether a query should include a summary row. True for queries that correspond to SQL GROUP BY ().
+   */
+  public static boolean summaryRowPreconditions(GroupByQuery query)
   {
     LimitSpec limit = query.getLimitSpec();
     if (limit instanceof DefaultLimitSpec) {
@@ -992,7 +995,7 @@ private static boolean summaryRowPreconditions(GroupByQuery query)
         return false;
       }
     }
-    if (!query.getDimensions().isEmpty()) {
+    if (!query.getDimensions().isEmpty() || query.hasDroppedDimensions()) {
       return false;
     }
     if (query.getGranularity().isFinerThan(Granularities.ALL)) {
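Summarizing the gate that now controls both the native and MSQ paths: a hedged paraphrase of summaryRowPreconditions() as a standalone helper. The limit/offset branch is partly elided in the hunk above, so that check is my reading rather than the literal source; SummaryRowRules and wouldEmitSummaryRow are hypothetical names used only for this sketch:

```java
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.query.groupby.GroupByQuery;
import org.apache.druid.query.groupby.orderby.DefaultLimitSpec;
import org.apache.druid.query.groupby.orderby.LimitSpec;

public final class SummaryRowRules
{
  // Paraphrase of GroupingEngine.summaryRowPreconditions(); not the literal source.
  public static boolean wouldEmitSummaryRow(GroupByQuery query)
  {
    final LimitSpec limit = query.getLimitSpec();
    if (limit instanceof DefaultLimitSpec) {
      final DefaultLimitSpec defaultLimit = (DefaultLimitSpec) limit;
      // A LIMIT 0 or a positive OFFSET would exclude the single summary row anyway.
      if ((defaultLimit.isLimited() && defaultLimit.getLimit() < 1) || defaultLimit.getOffset() > 0) {
        return false;
      }
    }
    // Real grouping columns, or a dropped constant dimension (GROUP BY 'x'): no summary row.
    if (!query.getDimensions().isEmpty() || query.hasDroppedDimensions()) {
      return false;
    }
    // Granularity finer than ALL yields one row per time bucket, not a single summary.
    if (query.getGranularity().isFinerThan(Granularities.ALL)) {
      return false;
    }
    return true;
  }
}
```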
diff --git a/processing/src/main/java/org/apache/druid/query/timeseries/TimeseriesQueryQueryToolChest.java b/processing/src/main/java/org/apache/druid/query/timeseries/TimeseriesQueryQueryToolChest.java
index 91fddf49eefa..41ecbf1ddd24 100644
--- a/processing/src/main/java/org/apache/druid/query/timeseries/TimeseriesQueryQueryToolChest.java
+++ b/processing/src/main/java/org/apache/druid/query/timeseries/TimeseriesQueryQueryToolChest.java
@@ -160,8 +160,9 @@ public Sequence<Result<TimeseriesResultValue>> doRun(
           // Usally it is NOT Okay to materialize results via toList(), but Granularity is ALL thus
           // we have only one record.
           final List<Result<TimeseriesResultValue>> val = baseResults.toList();
-          finalSequence = val.isEmpty() ? Sequences.simple(Collections.singletonList(
-              getNullTimeseriesResultValue(query))) : Sequences.simple(val);
+          finalSequence = val.isEmpty()
+                          ? Sequences.simple(Collections.singletonList(getEmptyTimeseriesResultValue(query)))
+                          : Sequences.simple(val);
         } else {
           finalSequence = baseResults;
         }
@@ -227,32 +228,17 @@ public Comparator<Result<TimeseriesResultValue>> createResultComparator(Query<Result<TimeseriesResultValue>> query)
-  Result<TimeseriesResultValue> getNullTimeseriesResultValue(TimeseriesQuery query)
+  /**
+   * Returns a {@link TimeseriesResultValue} that corresponds to an empty-set aggregation, which is used in situations
+   * where we want to return a single result representing "nothing was aggregated".
+   */
+  Result<TimeseriesResultValue> getEmptyTimeseriesResultValue(TimeseriesQuery query)
   {
-    List<AggregatorFactory> aggregatorSpecs = query.getAggregatorSpecs();
-    Aggregator[] aggregators = new Aggregator[aggregatorSpecs.size()];
-    String[] aggregatorNames = new String[aggregatorSpecs.size()];
-    RowSignature aggregatorsSignature =
-        RowSignature.builder().addAggregators(aggregatorSpecs, RowSignature.Finalization.UNKNOWN).build();
-    for (int i = 0; i < aggregatorSpecs.size(); i++) {
-      aggregators[i] =
-          aggregatorSpecs.get(i)
-                         .factorize(
-                             RowBasedColumnSelectorFactory.create(
-                                 RowAdapters.standardRow(),
-                                 () -> new MapBasedRow(null, null),
-                                 aggregatorsSignature,
-                                 false,
-                                 false
-                             )
-                         );
-      aggregatorNames[i] = aggregatorSpecs.get(i).getName();
-    }
+    final Object[] resultArray = getEmptyAggregations(query.getAggregatorSpecs());
     final DateTime start = query.getIntervals().isEmpty() ? DateTimes.EPOCH : query.getIntervals().get(0).getStart();
     TimeseriesResultBuilder bob = new TimeseriesResultBuilder(start);
-    for (int i = 0; i < aggregatorSpecs.size(); i++) {
-      bob.addMetric(aggregatorNames[i], aggregators[i].get());
-      aggregators[i].close();
+    for (int i = 0; i < query.getAggregatorSpecs().size(); i++) {
+      bob.addMetric(query.getAggregatorSpecs().get(i).getName(), resultArray[i]);
     }
     return bob.build();
   }
@@ -545,4 +531,36 @@ private Function<Result<TimeseriesResultValue>, Result<TimeseriesResultValue>> m
       );
     };
   }
+
+  /**
+   * Returns a set of values that corresponds to an empty-set aggregation, which is used in situations
+   * where we want to return a single result representing "nothing was aggregated". The returned array has
+   * one element per {@link AggregatorFactory}.
+   */
+  public static Object[] getEmptyAggregations(List<AggregatorFactory> aggregatorSpecs)
+  {
+    final Aggregator[] aggregators = new Aggregator[aggregatorSpecs.size()];
+    final RowSignature aggregatorsSignature =
+        RowSignature.builder().addAggregators(aggregatorSpecs, RowSignature.Finalization.UNKNOWN).build();
+    for (int i = 0; i < aggregatorSpecs.size(); i++) {
+      aggregators[i] =
+          aggregatorSpecs.get(i)
+                         .factorize(
+                             RowBasedColumnSelectorFactory.create(
+                                 RowAdapters.standardRow(),
+                                 () -> new MapBasedRow(null, null),
+                                 aggregatorsSignature,
+                                 false,
+                                 false
+                             )
+                         );
+    }
+
+    final Object[] retVal = new Object[aggregatorSpecs.size()];
+    for (int i = 0; i < aggregatorSpecs.size(); i++) {
+      retVal[i] = aggregators[i].get();
+      aggregators[i].close();
+    }
+    return retVal;
+  }
 }
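The essence of getEmptyAggregations(), condensed to a single aggregator: factorize against a selector over one empty row, then call get() without ever calling aggregate(). The result is the aggregator's identity value. A self-contained sketch using the same calls as the patch; the class and variable names are mine, and it assumes SQL-compatible null handling:

```java
import com.google.common.collect.ImmutableList;
import org.apache.druid.common.config.NullHandling;
import org.apache.druid.data.input.MapBasedRow;
import org.apache.druid.query.aggregation.Aggregator;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.aggregation.LongSumAggregatorFactory;
import org.apache.druid.segment.RowAdapters;
import org.apache.druid.segment.RowBasedColumnSelectorFactory;
import org.apache.druid.segment.column.RowSignature;

public class EmptyAggregationSketch
{
  public static void main(String[] args)
  {
    NullHandling.initializeForTests();

    final AggregatorFactory factory = new LongSumAggregatorFactory("a1", "nofield");
    final RowSignature signature =
        RowSignature.builder()
                    .addAggregators(ImmutableList.of(factory), RowSignature.Finalization.UNKNOWN)
                    .build();

    // A selector factory over a single "row" with no timestamp and no columns.
    final Aggregator aggregator = factory.factorize(
        RowBasedColumnSelectorFactory.create(
            RowAdapters.standardRow(),
            () -> new MapBasedRow(null, null),
            signature,
            false,
            false
        )
    );

    // get() without aggregate(): the value of SUM over the empty set.
    System.out.println(aggregator.get()); // null in SQL-compatible mode
    aggregator.close();
  }
}
```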
diff --git a/processing/src/test/java/org/apache/druid/query/timeseries/TimeseriesQueryQueryToolChestTest.java b/processing/src/test/java/org/apache/druid/query/timeseries/TimeseriesQueryQueryToolChestTest.java
index f5e46f04c008..1ae5964d17ef 100644
--- a/processing/src/test/java/org/apache/druid/query/timeseries/TimeseriesQueryQueryToolChestTest.java
+++ b/processing/src/test/java/org/apache/druid/query/timeseries/TimeseriesQueryQueryToolChestTest.java
@@ -44,15 +44,18 @@
 import org.apache.druid.segment.VirtualColumns;
 import org.apache.druid.segment.column.ColumnType;
 import org.apache.druid.segment.column.RowSignature;
+import org.apache.druid.testing.InitializedNullHandlingTest;
 import org.junit.Assert;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 
 import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
 
 @RunWith(Parameterized.class)
-public class TimeseriesQueryQueryToolChestTest
+public class TimeseriesQueryQueryToolChestTest extends InitializedNullHandlingTest
 {
   private static final String TIMESTAMP_RESULT_FIELD_NAME = "d0";
   private static final TimeseriesQueryQueryToolChest TOOL_CHEST = new TimeseriesQueryQueryToolChest(null);
@@ -449,4 +452,27 @@ public void testResultsAsArrays()
       )
     );
   }
+
+  @Test
+  public void testGetEmptyTimeseriesResultValue()
+  {
+    final TimeseriesQuery query =
+        Druids.newTimeseriesQueryBuilder()
+              .intervals("2000/P1D")
+              .dataSource("foo")
+              .aggregators(new CountAggregatorFactory("a0"), new LongSumAggregatorFactory("a1", "nofield"))
+              .build();
+
+    final Map<String, Object> resultMap = new HashMap<>();
+    resultMap.put("a0", 0L);
+    resultMap.put("a1", null);
+
+    Assert.assertEquals(
+        new Result<>(
+            DateTimes.of("2000"),
+            new TimeseriesResultValue(resultMap)
+        ),
+        TOOL_CHEST.getEmptyTimeseriesResultValue(query)
+    );
+  }
 }
diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/rel/DruidQuery.java b/sql/src/main/java/org/apache/druid/sql/calcite/rel/DruidQuery.java
index 72f72cd6d932..0330b9db909a 100644
--- a/sql/src/main/java/org/apache/druid/sql/calcite/rel/DruidQuery.java
+++ b/sql/src/main/java/org/apache/druid/sql/calcite/rel/DruidQuery.java
@@ -1428,7 +1428,7 @@ private GroupByQuery toGroupByQuery()
         if (queryGranularity != null) {
           // group by more than one timestamp_floor
          // eg: group by timestamp_floor(__time to DAY),timestamp_floor(__time, to HOUR)
-          queryGranularity = null;
+          theContext.clear();
           break;
         }
         queryGranularity = granularity;
@@ -1449,7 +1449,13 @@ private GroupByQuery toGroupByQuery()
         }
       }
     }
-    if (queryGranularity == null) {
+
+    if (grouping.getDimensions().isEmpty() && grouping.hasGroupingDimensionsDropped()) {
+      // GROUP BY ().
+      theContext.put(GroupByQuery.CTX_HAS_DROPPED_DIMENSIONS, grouping.hasGroupingDimensionsDropped());
+    }
+
+    if (theContext.isEmpty()) {
       return query;
     }
     return query.withOverriddenContext(theContext);
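Why the planner needs the flag: `SELECT COUNT(*) FROM foo WHERE 1 = 0` is a true GROUP BY () and must return a single summary row {0}, while `SELECT COUNT(*) FROM foo WHERE 1 = 0 GROUP BY 'xyz'` groups on a constant and must return zero rows when nothing matches. Calcite drops the constant dimension during planning, so without the context flag the two queries would reach GroupingEngine.summaryRowPreconditions() looking identical. A tiny sketch of what toGroupByQuery() records (the demo class is hypothetical):

```java
import java.util.HashMap;
import java.util.Map;
import org.apache.druid.query.groupby.GroupByQuery;

public class DroppedDimensionsFlagDemo
{
  public static void main(String[] args)
  {
    // What toGroupByQuery() now records when all grouping dimensions were constants
    // that Calcite dropped: a marker that this is not a true GROUP BY ().
    final Map<String, Object> theContext = new HashMap<>();
    theContext.put(GroupByQuery.CTX_HAS_DROPPED_DIMENSIONS, true);
    System.out.println(theContext); // {hasDroppedDimensions=true}
  }
}
```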
diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java
index 125acc578f3e..90a64d5fc703 100644
--- a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java
+++ b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java
@@ -3857,7 +3857,6 @@ public void testGroupingWithNullInFilter()
   @Test
   public void testGroupingWithNullPlusNonNullInFilter()
   {
-    msqIncompatible();
     testQuery(
         "SELECT COUNT(*) FROM foo WHERE dim1 IN (NULL, 'abc')",
         ImmutableList.of(
@@ -3877,7 +3876,6 @@ public void testGroupingWithNullPlusNonNullInFilter()
   @Test
   public void testGroupingWithNotNullPlusNonNullInFilter()
   {
-    msqIncompatible();
     testQuery(
         "SELECT COUNT(*) FROM foo WHERE dim1 NOT IN (NULL, 'abc')",
         ImmutableList.of(
@@ -3902,7 +3900,7 @@ public void testGroupingWithNotNullPlusNonNullInFilter()
   @Test
   public void testGroupByNothingWithLiterallyFalseFilter()
   {
-    msqIncompatible();
+    // Result of MAX(cnt) when nothing matches the filter.
     testQuery(
         "SELECT COUNT(*), MAX(cnt) FROM druid.foo WHERE 1 = 0",
         ImmutableList.of(
@@ -3928,7 +3926,6 @@ public void testGroupByNothingWithLiterallyFalseFilter()
   @Test
   public void testGroupByNothingWithImpossibleTimeFilter()
   {
-    msqIncompatible();
 
     // Regression test for https://github.com/apache/druid/issues/7671
     testQuery(
@@ -4331,7 +4328,6 @@ public void testCountStarWithLongColumnFiltersForceRange()
   @Test
   public void testCountStarWithLongColumnFiltersOnFloatLiterals()
   {
-    msqIncompatible();
     testQuery(
         "SELECT COUNT(*) FROM druid.foo WHERE cnt > 1.1 and cnt < 100000001.0",
         ImmutableList.of(
@@ -9762,8 +9758,7 @@ public void testTimeseriesDescending()
   @Test
   public void testTimeseriesEmptyResultsAggregatorDefaultValues()
   {
-    msqIncompatible();
-    // timeseries with all granularity have a single group, so should return default results for given aggregators
+    // timeseries with all granularity has a single group, so should return default results for given aggregators.
     testQuery(
         "SELECT\n"
         + "  count(*),\n"
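In the hunk that follows, having(equality("a0", 0L, ColumnType.LONG)) is test-builder shorthand. The spec it stands for is roughly the following; the exact constructor arguments are my reading of DimFilterHavingSpec and EqualityFilter, so treat this as a sketch rather than the planner's literal output:

```java
import org.apache.druid.query.filter.EqualityFilter;
import org.apache.druid.query.groupby.having.DimFilterHavingSpec;
import org.apache.druid.segment.column.ColumnType;

public class HavingSpecSketch
{
  public static void main(String[] args)
  {
    // HAVING COUNT(*) = 0 becomes a filter on the aggregator output column "a0":
    // (column, match value type, match value, filter tuning).
    final EqualityFilter countIsZero = new EqualityFilter("a0", ColumnType.LONG, 0L, null);

    // The having spec wraps the filter; the trailing Boolean toggles finalization.
    final DimFilterHavingSpec having = new DimFilterHavingSpec(countIsZero, null);
    System.out.println(having);
  }
}
```

Because the summary row is generated before the HAVING is applied, a HAVING that matches the default aggregator values keeps the single row, and one that does not match removes it, which is exactly what the two new tests below assert.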
@@ -9848,13 +9843,149 @@ public void testTimeseriesEmptyResultsAggregatorDefaultValues()
   }
 
   @Test
-  public void testTimeseriesEmptyResultsAggregatorDefaultValuesNonVectorized()
+  public void testEmptyResultsAggregatorWithHavingTrue()
   {
-    // Empty-dataset aggregation queries in MSQ return an empty row, rather than a single row as SQL requires.
-    msqIncompatible();
+    // GROUP BY () that matches nothing should return an empty result row with default aggregator values.
+    // Adding a HAVING retains the row, if the HAVING matches the default aggregators.
+    testQuery(
+        "SELECT\n"
+        + "  COUNT(*)\n"
+        + "FROM druid.numfoo\n"
+        + "WHERE __time >= TIMESTAMP '4000-01-01 00:00:00' AND __time < TIMESTAMP '4001-01-01 00:00:00'\n"
+        + "HAVING COUNT(*) = 0",
+        ImmutableList.of(
+            GroupByQuery.builder()
+                        .setDataSource(CalciteTests.DATASOURCE3)
+                        .setInterval(querySegmentSpec(Intervals.of("4000/P1Y")))
+                        .setGranularity(Granularities.ALL)
+                        .setAggregatorSpecs(new CountAggregatorFactory("a0"))
+                        .setHavingSpec(having(equality("a0", 0L, ColumnType.LONG)))
+                        .setContext(QUERY_CONTEXT_DEFAULT)
+                        .build()
+        ),
+        ImmutableList.of(
+            new Object[]{0L}
+        )
+    );
+  }
 
+  @Test
+  public void testEmptyResultsAggregatorWithHavingFalse()
+  {
+    // GROUP BY () that matches nothing should return an empty result row with default aggregator values.
+    // Adding a HAVING omits the row, if the HAVING does not match the default aggregators.
+    testQuery(
+        "SELECT\n"
+        + "  COUNT(*)\n"
+        + "FROM druid.numfoo\n"
+        + "WHERE __time >= TIMESTAMP '4000-01-01 00:00:00' AND __time < TIMESTAMP '4001-01-01 00:00:00'\n"
+        + "HAVING COUNT(*) = 1",
+        ImmutableList.of(
+            GroupByQuery.builder()
+                        .setDataSource(CalciteTests.DATASOURCE3)
+                        .setInterval(querySegmentSpec(Intervals.of("4000/P1Y")))
+                        .setGranularity(Granularities.ALL)
+                        .setAggregatorSpecs(new CountAggregatorFactory("a0"))
+                        .setHavingSpec(having(equality("a0", 1L, ColumnType.LONG)))
+                        .setContext(QUERY_CONTEXT_DEFAULT)
+                        .build()
+        ),
+        ImmutableList.of()
+    );
+  }
+
+  @Test
+  public void testTimeseriesEmptyResultsAggregatorDefaultValuesTimeFilterMatchesNothing()
+  {
+    // timeseries with all granularity has a single group, so should return default results for given aggregators.
+    testQuery(
+        "SELECT\n"
+        + "  count(*),\n"
+        + "  COUNT(DISTINCT dim1),\n"
+        + "  APPROX_COUNT_DISTINCT(distinct dim1),\n"
+        + "  sum(dbl1),\n"
+        + "  max(dbl1),\n"
+        + "  min(dbl1),\n"
+        + "  sum(l1),\n"
+        + "  max(l1),\n"
+        + "  min(l1),\n"
+        + "  avg(l1),\n"
+        + "  avg(dbl1)\n"
+        + "FROM druid.numfoo\n"
+        + "WHERE __time >= TIMESTAMP '4000-01-01 00:00:00' AND __time < TIMESTAMP '4001-01-01 00:00:00'",
+        ImmutableList.of(
+            Druids.newTimeseriesQueryBuilder()
+                  .dataSource(CalciteTests.DATASOURCE3)
+                  .intervals(querySegmentSpec(Intervals.of("4000/P1Y")))
+                  .granularity(Granularities.ALL)
+                  .aggregators(
+                      aggregators(
+                          new CountAggregatorFactory("a0"),
+                          new CardinalityAggregatorFactory(
+                              "a1",
+                              null,
+                              ImmutableList.of(DefaultDimensionSpec.of("dim1")),
+                              false,
+                              true
+                          ),
+                          new CardinalityAggregatorFactory(
+                              "a2",
+                              null,
+                              ImmutableList.of(DefaultDimensionSpec.of("dim1")),
+                              false,
+                              true
+                          ),
+                          new DoubleSumAggregatorFactory("a3", "dbl1"),
+                          new DoubleMaxAggregatorFactory("a4", "dbl1"),
+                          new DoubleMinAggregatorFactory("a5", "dbl1"),
+                          new LongSumAggregatorFactory("a6", "l1"),
+                          new LongMaxAggregatorFactory("a7", "l1"),
+                          new LongMinAggregatorFactory("a8", "l1"),
+                          new DoubleSumAggregatorFactory("a9:sum", "l1"),
+                          new FilteredAggregatorFactory(
+                              new CountAggregatorFactory("a9:count"),
+                              notNull("l1")
+                          ),
+                          new DoubleSumAggregatorFactory("a10:sum", "dbl1"),
+                          new FilteredAggregatorFactory(
+                              new CountAggregatorFactory("a10:count"),
+                              notNull("dbl1")
+                          )
+                      )
+                  )
+                  .postAggregators(
+                      new ArithmeticPostAggregator(
+                          "a9",
+                          "quotient",
+                          ImmutableList.of(
+                              new FieldAccessPostAggregator(null, "a9:sum"),
+                              new FieldAccessPostAggregator(null, "a9:count")
+                          )
+                      ),
+                      new ArithmeticPostAggregator(
+                          "a10",
+                          "quotient",
+                          ImmutableList.of(
+                              new FieldAccessPostAggregator(null, "a10:sum"),
+                              new FieldAccessPostAggregator(null, "a10:count")
+                          )
+                      )
+                  )
+                  .context(QUERY_CONTEXT_DEFAULT)
+                  .build()
+        ),
+        ImmutableList.of(new Object[]{0L, 0L, 0L, null, null, null, null, null, null, null, null})
+    );
+  }
+
+  @Test
+  public void testTimeseriesEmptyResultsAggregatorDefaultValuesNonVectorized()
+  {
+    // This test is like testTimeseriesEmptyResultsAggregatorDefaultValues, but includes some non-vectorizable
+    // aggregators.
     cannotVectorize();
-    // timeseries with all granularity have a single group, so should return default results for given aggregators
+
+    // timeseries with all granularity has a single group, so should return default results for given aggregators.
     testQuery(
         "SELECT\n"
         + "  ANY_VALUE(dim1, 1024),\n"