MSQ: Write summary row for GROUP BY (). (#16326)

This allows us to remove "msqIncompatible()" from a couple of tests that involve GROUP BY () summary rows.

To handle the case where the SQL layer drops a dimension like GROUP BY 'constant', this patch also adds a "hasDroppedDimensions" context flag to the groupBy query.
gianm authored Feb 6, 2025
1 parent 3c25ddc commit c9b3585
Showing 9 changed files with 279 additions and 57 deletions.
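For orientation, here is a minimal sketch (not part of the commit; datasource "foo", the interval, and the class name are illustrative) of what the new context flag changes: a dimension-less groupBy at granularity ALL qualifies for a summary row, while the same query shape with a dropped dimension, as planned from GROUP BY 'constant', does not. It relies on summaryRowPreconditions being made public by this patch.

import com.google.common.collect.ImmutableMap;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.query.groupby.GroupByQuery;
import org.apache.druid.query.groupby.GroupingEngine;

public class SummaryRowFlagSketch
{
  public static void main(String[] args)
  {
    // SQL "SELECT COUNT(*) FROM foo" plans to a groupBy with no dimensions:
    // GROUP BY (), which should emit one summary row even over empty input.
    final GroupByQuery groupByAll = GroupByQuery.builder()
        .setDataSource("foo")
        .setInterval("2000/2001")
        .setGranularity(Granularities.ALL)
        .build();
    System.out.println(GroupingEngine.summaryRowPreconditions(groupByAll)); // true

    // SQL "... GROUP BY 'constant'" also plans to no dimensions, but only because
    // the constant dimension was dropped; the new flag records that, so no summary row.
    final GroupByQuery droppedDims = groupByAll.withOverriddenContext(
        ImmutableMap.of(GroupByQuery.CTX_HAS_DROPPED_DIMENSIONS, true)
    );
    System.out.println(GroupingEngine.summaryRowPreconditions(droppedDims)); // false
  }
}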
@@ -25,6 +25,7 @@
import org.apache.druid.frame.channel.FrameWithPartition;
import org.apache.druid.frame.channel.ReadableFrameChannel;
import org.apache.druid.frame.channel.WritableFrameChannel;
import org.apache.druid.frame.key.ClusterBy;
import org.apache.druid.frame.processor.FrameProcessor;
import org.apache.druid.frame.processor.FrameProcessors;
import org.apache.druid.frame.processor.FrameRowTooLargeException;
@@ -44,6 +45,7 @@
import org.apache.druid.query.groupby.having.AlwaysHavingSpec;
import org.apache.druid.query.groupby.having.DimFilterHavingSpec;
import org.apache.druid.query.groupby.having.HavingSpec;
import org.apache.druid.query.timeseries.TimeseriesQueryQueryToolChest;
import org.apache.druid.segment.ColumnSelectorFactory;
import org.apache.druid.segment.ColumnValueSelector;
import org.apache.druid.segment.Cursor;
@@ -83,6 +85,7 @@ public class GroupByPostShuffleFrameProcessor implements FrameProcessor<Object>
private Supplier<ResultRow> rowSupplierFromFrameCursor;
private ResultRow outputRow = null;
private FrameWriter frameWriter = null;
private long outputRows = 0L;

public GroupByPostShuffleFrameProcessor(
final GroupByQuery query,
@@ -139,6 +142,7 @@ public ReturnOrAwait<Object> runIncrementally(final IntSet readableInputs) throw
}

writeCurrentFrameIfNeeded();
writeEmptyAggregationsFrameIfNeeded();
return ReturnOrAwait.returnObject(Unit.instance());
} else {
final Frame frame = inputChannel.read();
@@ -263,6 +267,40 @@ private void writeCurrentFrameIfNeeded() throws IOException
outputChannel.write(new FrameWithPartition(frame, FrameWithPartition.NO_PARTITION));
frameWriter.close();
frameWriter = null;
outputRows += frame.numRows();
}
}

/**
* Generates a frame containing a single row with default values for all aggregations, if needed. This method uses
* {@link GroupingEngine#summaryRowPreconditions(GroupByQuery)} to determine whether such a row is needed.
*
* Note that when {@link GroupingEngine#summaryRowPreconditions(GroupByQuery)} returns true, the preceding
* {@link GroupByPreShuffleFrameProcessorFactory} stage uses an empty {@link ClusterBy}. The prior stage therefore
* has only a single output partition, and hence a single instance of this processor, ensuring that only one
* null-aggregations row is generated for the entire stage.
*/
private void writeEmptyAggregationsFrameIfNeeded() throws IOException
{
if (outputRows == 0 && GroupingEngine.summaryRowPreconditions(query)) {
final int resultRowSize = query.getResultRowSignature().size();
this.outputRow = ResultRow.create(resultRowSize);
final Object[] emptyResultArray = TimeseriesQueryQueryToolChest.getEmptyAggregations(query.getAggregatorSpecs());
if (query.getResultRowHasTimestamp()) {
// Can happen if the query has granularity "ALL" but no intervals. In this case nothing is matched and the
// __time column will be ignored, but it's there in the result row signature anyway, so we need to populate it.
outputRow.set(0, 0L);
}
System.arraycopy(
emptyResultArray,
0,
outputRow.getArray(),
query.getResultRowAggregatorStart(),
emptyResultArray.length
);
setUpFrameWriterIfNeeded();
writeOutputRow();
writeCurrentFrameIfNeeded();
}
}

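To make the row layout concrete, here is a hedged sketch (not from the patch; the query shape and values are illustrative) of the single row the method above would write for "SELECT COUNT(*), SUM(cnt) FROM foo GROUP BY ()" over zero matching rows, in the case where the result row carries a timestamp:

import org.apache.druid.query.groupby.ResultRow;

public class SummaryRowLayoutSketch
{
  public static void main(String[] args)
  {
    // Slot 0 is the placeholder __time written by writeEmptyAggregationsFrameIfNeeded;
    // slots from resultRowAggregatorStart onward hold getEmptyAggregations() values.
    final ResultRow summary = ResultRow.create(3);
    summary.set(0, 0L);   // placeholder __time
    summary.set(1, 0L);   // COUNT(*) over nothing
    summary.set(2, null); // SUM(cnt) over nothing (null in SQL-compatible mode)
    System.out.println(summary);
  }
}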
@@ -148,7 +148,7 @@ public QueryDefinition makeQueryDefinition(

partitionBoost = true;
} else {
shuffleSpecFactoryPreAggregation = doLimitOrOffset
shuffleSpecFactoryPreAggregation = doLimitOrOffset || intermediateClusterBy.isEmpty()
? ShuffleSpecFactories.singlePartition()
: resultShuffleSpecFactory;

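This one-line change is what guarantees the single-processor property that writeEmptyAggregationsFrameIfNeeded() relies on. A hedged restatement as a standalone helper (the MSQ package locations of ShuffleSpecFactory and ShuffleSpecFactories are assumptions):

import org.apache.druid.frame.key.ClusterBy;
import org.apache.druid.msq.kernel.ShuffleSpecFactory;
import org.apache.druid.msq.querykit.ShuffleSpecFactories;

public class PreAggregationShuffleSketch
{
  // With an empty intermediate ClusterBy there is no partitioning key, so all rows
  // go to one partition; a single partition means a single post-shuffle processor,
  // which in turn means at most one summary row for the stage.
  static ShuffleSpecFactory choosePreAggregationShuffle(
      final boolean doLimitOrOffset,
      final ClusterBy intermediateClusterBy,
      final ShuffleSpecFactory resultShuffleSpecFactory
  )
  {
    return doLimitOrOffset || intermediateClusterBy.isEmpty()
           ? ShuffleSpecFactories.singlePartition()
           : resultShuffleSpecFactory;
  }
}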
@@ -181,19 +181,4 @@ public void testJoinMultipleTablesWithWhereCondition()
)
.run();
}

@Override
@Test
public void testFilterParseLongNullable()
{
// this isn't really correct in default value mode, the result should be ImmutableList.of(new Object[]{0L})
// but MSQ is missing default aggregator values in empty group results. this override can be removed when this
// is fixed
testBuilder().queryContext(QUERY_CONTEXT_DEFAULT)
.sql("select count(*) from druid.foo where parse_long(dim1, 10) is null")
.expectedResults(
ImmutableList.of(new Object[]{4L})
)
.run();
}
}
@@ -91,6 +91,13 @@ public class GroupByQuery extends BaseQuery<ResultRow>
public static final String CTX_TIMESTAMP_RESULT_FIELD = "timestampResultField";
public static final String CTX_TIMESTAMP_RESULT_FIELD_GRANULARITY = "timestampResultFieldGranularity";
public static final String CTX_TIMESTAMP_RESULT_FIELD_INDEX = "timestampResultFieldInOriginalDimensions";

/**
* Context key for whether this query has any "dropped" dimensions. This is set to true for queries like
* "GROUP BY 'constant'", and enables {@link GroupingEngine#summaryRowPreconditions(GroupByQuery)} to correctly
* determine whether to include a summary row.
*/
public static final String CTX_HAS_DROPPED_DIMENSIONS = "hasDroppedDimensions";
private static final String CTX_KEY_FUDGE_TIMESTAMP = "fudgeTimestamp";

private static final Comparator<ResultRow> NON_GRANULAR_TIME_COMP =
@@ -465,6 +472,14 @@ public boolean getApplyLimitPushDownFromContext()
return context().getBoolean(GroupByQueryConfig.CTX_KEY_APPLY_LIMIT_PUSH_DOWN, true);
}

/**
* See {@link #CTX_HAS_DROPPED_DIMENSIONS}.
*/
public boolean hasDroppedDimensions()
{
return context().getBoolean(CTX_HAS_DROPPED_DIMENSIONS, false);
}

@Override
public Ordering getResultOrdering()
{
@@ -983,7 +983,10 @@ public static Sequence<ResultRow> wrapSummaryRowIfNeeded(GroupByQuery query, Seq
}));
}

private static boolean summaryRowPreconditions(GroupByQuery query)
/**
* Whether a query should include a summary row. True for queries that correspond to SQL GROUP BY ().
*/
public static boolean summaryRowPreconditions(GroupByQuery query)
{
LimitSpec limit = query.getLimitSpec();
if (limit instanceof DefaultLimitSpec) {
@@ -992,7 +995,7 @@ private static boolean summaryRowPreconditions(GroupByQuery query)
return false;
}
}
if (!query.getDimensions().isEmpty()) {
if (!query.getDimensions().isEmpty() || query.hasDroppedDimensions()) {
return false;
}
if (query.getGranularity().isFinerThan(Granularities.ALL)) {
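On the native (non-MSQ) path, the now-public predicate is consumed by wrapSummaryRowIfNeeded, shown in the hunk above. A hedged usage sketch (datasource, interval, and aggregator name are illustrative):

import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.java.util.common.guava.Sequences;
import org.apache.druid.query.aggregation.CountAggregatorFactory;
import org.apache.druid.query.groupby.GroupByQuery;
import org.apache.druid.query.groupby.GroupingEngine;
import org.apache.druid.query.groupby.ResultRow;

public class SummaryRowWrapSketch
{
  public static void main(String[] args)
  {
    final GroupByQuery query = GroupByQuery.builder()
        .setDataSource("foo")
        .setInterval("2000/2001")
        .setGranularity(Granularities.ALL)
        .setAggregatorSpecs(new CountAggregatorFactory("a0"))
        .build();

    // An empty result sequence gains exactly one summary row with default
    // aggregation values, matching what MSQ now does in its post-shuffle stage.
    final Sequence<ResultRow> wrapped =
        GroupingEngine.wrapSummaryRowIfNeeded(query, Sequences.empty());
    System.out.println(wrapped.toList()); // one row: count of 0
  }
}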
@@ -160,8 +160,9 @@ public Sequence<Result<TimeseriesResultValue>> doRun(
// Usually it is NOT okay to materialize results via toList(), but granularity is ALL, so
// we have only one record.
final List<Result<TimeseriesResultValue>> val = baseResults.toList();
finalSequence = val.isEmpty() ? Sequences.simple(Collections.singletonList(
getNullTimeseriesResultValue(query))) : Sequences.simple(val);
finalSequence = val.isEmpty()
? Sequences.simple(Collections.singletonList(getEmptyTimeseriesResultValue(query)))
: Sequences.simple(val);
} else {
finalSequence = baseResults;
}
@@ -227,32 +228,17 @@ public Comparator<Result<TimeseriesResultValue>> createResultComparator(Query<Re
return ResultGranularTimestampComparator.create(query.getGranularity(), ((TimeseriesQuery) query).isDescending());
}

private Result<TimeseriesResultValue> getNullTimeseriesResultValue(TimeseriesQuery query)
/**
* Returns a {@link TimeseriesResultValue} that corresponds to an empty-set aggregation, which is used in situations
* where we want to return a single result representing "nothing was aggregated".
*/
Result<TimeseriesResultValue> getEmptyTimeseriesResultValue(TimeseriesQuery query)
{
List<AggregatorFactory> aggregatorSpecs = query.getAggregatorSpecs();
Aggregator[] aggregators = new Aggregator[aggregatorSpecs.size()];
String[] aggregatorNames = new String[aggregatorSpecs.size()];
RowSignature aggregatorsSignature =
RowSignature.builder().addAggregators(aggregatorSpecs, RowSignature.Finalization.UNKNOWN).build();
for (int i = 0; i < aggregatorSpecs.size(); i++) {
aggregators[i] =
aggregatorSpecs.get(i)
.factorize(
RowBasedColumnSelectorFactory.create(
RowAdapters.standardRow(),
() -> new MapBasedRow(null, null),
aggregatorsSignature,
false,
false
)
);
aggregatorNames[i] = aggregatorSpecs.get(i).getName();
}
final Object[] resultArray = getEmptyAggregations(query.getAggregatorSpecs());
final DateTime start = query.getIntervals().isEmpty() ? DateTimes.EPOCH : query.getIntervals().get(0).getStart();
TimeseriesResultBuilder bob = new TimeseriesResultBuilder(start);
for (int i = 0; i < aggregatorSpecs.size(); i++) {
bob.addMetric(aggregatorNames[i], aggregators[i].get());
aggregators[i].close();
for (int i = 0; i < query.getAggregatorSpecs().size(); i++) {
bob.addMetric(query.getAggregatorSpecs().get(i).getName(), resultArray[i]);
}
return bob.build();
}
@@ -545,4 +531,36 @@ private Function<Result<TimeseriesResultValue>, Result<TimeseriesResultValue>> m
);
};
}

/**
* Returns a set of values that corresponds to an empty-set aggregation, which is used in situations
* where we want to return a single result representing "nothing was aggregated". The returned array has
* one element per {@link AggregatorFactory}.
*/
public static Object[] getEmptyAggregations(List<AggregatorFactory> aggregatorSpecs)
{
final Aggregator[] aggregators = new Aggregator[aggregatorSpecs.size()];
final RowSignature aggregatorsSignature =
RowSignature.builder().addAggregators(aggregatorSpecs, RowSignature.Finalization.UNKNOWN).build();
for (int i = 0; i < aggregatorSpecs.size(); i++) {
aggregators[i] =
aggregatorSpecs.get(i)
.factorize(
RowBasedColumnSelectorFactory.create(
RowAdapters.standardRow(),
() -> new MapBasedRow(null, null),
aggregatorsSignature,
false,
false
)
);
}

final Object[] retVal = new Object[aggregatorSpecs.size()];
for (int i = 0; i < aggregatorSpecs.size(); i++) {
retVal[i] = aggregators[i].get();
aggregators[i].close();
}
return retVal;
}
}
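A hedged usage sketch of the newly extracted helper (aggregator names and the missing field are illustrative; assumes SQL-compatible null handling is initialized, as the test below arranges via InitializedNullHandlingTest):

import java.util.Arrays;
import java.util.List;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.aggregation.CountAggregatorFactory;
import org.apache.druid.query.aggregation.LongSumAggregatorFactory;
import org.apache.druid.query.timeseries.TimeseriesQueryQueryToolChest;

public class EmptyAggregationsSketch
{
  public static void main(String[] args)
  {
    final List<AggregatorFactory> aggs = Arrays.asList(
        new CountAggregatorFactory("a0"),
        new LongSumAggregatorFactory("a1", "nofield")
    );
    // Count of an empty set is 0; sum of an empty set is null (SQL-compatible mode).
    final Object[] empty = TimeseriesQueryQueryToolChest.getEmptyAggregations(aggs);
    System.out.println(Arrays.toString(empty)); // [0, null]
  }
}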
@@ -44,15 +44,18 @@
import org.apache.druid.segment.VirtualColumns;
import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.segment.column.RowSignature;
import org.apache.druid.testing.InitializedNullHandlingTest;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

@RunWith(Parameterized.class)
public class TimeseriesQueryQueryToolChestTest
public class TimeseriesQueryQueryToolChestTest extends InitializedNullHandlingTest
{
private static final String TIMESTAMP_RESULT_FIELD_NAME = "d0";
private static final TimeseriesQueryQueryToolChest TOOL_CHEST = new TimeseriesQueryQueryToolChest(null);
@@ -449,4 +452,27 @@ public void testResultsAsArrays()
)
);
}

@Test
public void testGetEmptyTimeseriesResultValue()
{
final TimeseriesQuery query =
Druids.newTimeseriesQueryBuilder()
.intervals("2000/P1D")
.dataSource("foo")
.aggregators(new CountAggregatorFactory("a0"), new LongSumAggregatorFactory("a1", "nofield"))
.build();

final Map<String, Object> resultMap = new HashMap<>();
resultMap.put("a0", 0L);
resultMap.put("a1", null);

Assert.assertEquals(
new Result<>(
DateTimes.of("2000"),
new TimeseriesResultValue(resultMap)
),
TOOL_CHEST.getEmptyTimeseriesResultValue(query)
);
}
}
@@ -1428,7 +1428,7 @@ private GroupByQuery toGroupByQuery()
if (queryGranularity != null) {
// group by more than one timestamp_floor
// eg: GROUP BY timestamp_floor(__time TO DAY), timestamp_floor(__time TO HOUR)
queryGranularity = null;
theContext.clear();
break;
}
queryGranularity = granularity;
@@ -1449,7 +1449,13 @@
}
}
}
if (queryGranularity == null) {

if (grouping.getDimensions().isEmpty() && grouping.hasGroupingDimensionsDropped()) {
// GROUP BY ().
theContext.put(GroupByQuery.CTX_HAS_DROPPED_DIMENSIONS, grouping.hasGroupingDimensionsDropped());
}

if (theContext.isEmpty()) {
return query;
}
return query.withOverriddenContext(theContext);