Remove cpuTimeAcc from DataSource#createSegmentMapFunction's signature (#17623)

This patch removes cpuTimeAcc from the DataSource#createSegmentMapFunction signature.

The method being executed doesn't need to know internally that it is being measured; threading the accumulator through also complicates the code flow a bit, as implementations must take care to avoid double counting.
kgyrtkirk authored Feb 12, 2025
1 parent af1ad90 commit 5b43c91
Showing 24 changed files with 114 additions and 188 deletions.
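In effect, the patch moves CPU-time accounting out of the datasource tree and leaves it to callers. A minimal sketch of the calling convention before and after; the wrapping caller shown here is illustrative and not part of this diff, though JvmUtils.safeAccumulateThreadCpuTime and the AtomicLong accumulator are the same helpers the removed code used:

import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
import org.apache.druid.segment.SegmentReference;
import org.apache.druid.utils.JvmUtils;

// Before: the accumulator was threaded through every createSegmentMapFunction call.
// final Function<SegmentReference, SegmentReference> segmentMapFn =
//     query.getDataSource().createSegmentMapFunction(query, cpuTimeAccumulator);

// After: the datasource only builds the mapping function; a caller that still
// wants per-query CPU accounting wraps the call itself, in one place.
final AtomicLong cpuTimeAccumulator = new AtomicLong();
final Function<SegmentReference, SegmentReference> segmentMapFn =
    JvmUtils.safeAccumulateThreadCpuTime(
        cpuTimeAccumulator,
        () -> query.getDataSource().createSegmentMapFunction(query)
    );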
@@ -58,7 +58,6 @@
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
-import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import java.util.function.Function;

@@ -163,7 +162,7 @@ public ProcessorsAndChannels<Object, Long> makeProcessors(

if (segmentMapFnProcessor == null) {
final Function<SegmentReference, SegmentReference> segmentMapFn =
-query.getDataSource().createSegmentMapFunction(query, new AtomicLong());
+query.getDataSource().createSegmentMapFunction(query);
processorManager = processorManagerFn.apply(ImmutableList.of(segmentMapFn));
} else {
processorManager = new ChainedProcessorManager<>(ProcessorManagers.of(() -> segmentMapFnProcessor), processorManagerFn);
@@ -193,7 +193,7 @@ private void addFrame(final int channelNumber, final Frame frame)

private Function<SegmentReference, SegmentReference> createSegmentMapFunction()
{
-return inlineChannelData(query.getDataSource()).createSegmentMapFunction(query, new AtomicLong());
+return inlineChannelData(query.getDataSource()).createSegmentMapFunction(query);
}

DataSource inlineChannelData(final DataSource originalDataSource)
@@ -32,7 +32,6 @@
import java.util.List;
import java.util.Objects;
import java.util.Set;
-import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;

/**
@@ -97,7 +96,7 @@ public boolean isConcrete()
}

@Override
-public Function<SegmentReference, SegmentReference> createSegmentMapFunction(Query query, AtomicLong cpuTimeAcc)
+public Function<SegmentReference, SegmentReference> createSegmentMapFunction(Query query)
{
return Function.identity();
}
@@ -64,7 +64,7 @@ public List<WritableFrameChannel> outputChannels()
@Override
public ReturnOrAwait<Function<SegmentReference, SegmentReference>> runIncrementally(final IntSet readableInputs)
{
-return ReturnOrAwait.returnObject(query.getDataSource().createSegmentMapFunction(query, new AtomicLong()));
+return ReturnOrAwait.returnObject(query.getDataSource().createSegmentMapFunction(query));
}

@Override
@@ -30,7 +30,6 @@
import java.util.Map;
import java.util.Optional;
import java.util.Set;
-import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
import java.util.stream.Collectors;

@@ -113,7 +112,7 @@ public interface DataSource
* @param cpuTimeAcc the cpu time accumulator
* @return the segment function
*/
-Function<SegmentReference, SegmentReference> createSegmentMapFunction(Query query, AtomicLong cpuTimeAcc);
+Function<SegmentReference, SegmentReference> createSegmentMapFunction(Query query);

/**
* Returns an updated datasource based on the specified new source.
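The interface method is now a pure factory for a segment-decorating function, with no measurement concern. An illustrative use of the contract (dataSource, query, and baseSegment are assumed to be in scope; they are not part of the diff):

// The returned function wraps, or passes through, each segment at scan time.
final Function<SegmentReference, SegmentReference> mapFn =
    dataSource.createSegmentMapFunction(query);
final SegmentReference mapped = mapFn.apply(baseSegment);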
@@ -27,13 +27,12 @@
import org.apache.druid.query.planning.DataSourceAnalysis;
import org.apache.druid.segment.FilteredSegment;
import org.apache.druid.segment.SegmentReference;
-import org.apache.druid.utils.JvmUtils;

import javax.annotation.Nullable;

import java.util.List;
import java.util.Objects;
import java.util.Set;
-import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;

/**
@@ -122,19 +121,10 @@ public boolean isConcrete()
}

@Override
-public Function<SegmentReference, SegmentReference> createSegmentMapFunction(
-Query query,
-AtomicLong cpuTimeAccumulator
-)
+public Function<SegmentReference, SegmentReference> createSegmentMapFunction(Query query)
{
-final Function<SegmentReference, SegmentReference> segmentMapFn = base.createSegmentMapFunction(
-query,
-cpuTimeAccumulator
-);
-return JvmUtils.safeAccumulateThreadCpuTime(
-cpuTimeAccumulator,
-() -> baseSegment -> new FilteredSegment(segmentMapFn.apply(baseSegment), filter)
-);
+final Function<SegmentReference, SegmentReference> segmentMapFn = base.createSegmentMapFunction(query);
+return baseSegment -> new FilteredSegment(segmentMapFn.apply(baseSegment), filter);
}

@Override
@@ -37,7 +37,6 @@
import java.util.Iterator;
import java.util.List;
import java.util.Set;
-import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
import java.util.stream.Collectors;

@@ -171,7 +170,7 @@ public boolean isConcrete()
}

@Override
-public Function<SegmentReference, SegmentReference> createSegmentMapFunction(Query query, AtomicLong cpuTimeAcc)
+public Function<SegmentReference, SegmentReference> createSegmentMapFunction(Query query)
{
return Function.identity();
}
@@ -39,7 +39,6 @@
import java.util.List;
import java.util.Objects;
import java.util.Set;
-import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
@@ -243,10 +242,7 @@ public boolean isConcrete()
}

@Override
-public Function<SegmentReference, SegmentReference> createSegmentMapFunction(
-Query query,
-AtomicLong cpuTimeAcc
-)
+public Function<SegmentReference, SegmentReference> createSegmentMapFunction(Query query)
{
return Function.identity();
}
150 changes: 70 additions & 80 deletions processing/src/main/java/org/apache/druid/query/JoinDataSource.java
@@ -54,9 +54,9 @@
import org.apache.druid.segment.join.filter.JoinFilterPreAnalysisKey;
import org.apache.druid.segment.join.filter.JoinableClauses;
import org.apache.druid.segment.join.filter.rewrite.JoinFilterRewriteConfig;
-import org.apache.druid.utils.JvmUtils;

import javax.annotation.Nullable;
+
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
@@ -65,7 +65,6 @@
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
-import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
import java.util.stream.Collectors;

@@ -301,14 +300,12 @@ public Set<String> getVirtualColumnCandidates()

@Override
public Function<SegmentReference, SegmentReference> createSegmentMapFunction(
-Query query,
-AtomicLong cpuTimeAccumulator
+Query query
)
{
return createSegmentMapFunctionInternal(
analysis.getJoinBaseTableFilter().map(Filters::toFilter).orElse(null),
analysis.getPreJoinableClauses(),
-cpuTimeAccumulator,
analysis.getBaseQuery().orElse(query)
);
}
@@ -444,86 +441,79 @@ private DataSourceAnalysis getAnalysisForDataSource()
private Function<SegmentReference, SegmentReference> createSegmentMapFunctionInternal(
@Nullable final Filter baseFilter,
final List<PreJoinableClause> clauses,
-final AtomicLong cpuTimeAccumulator,
final Query<?> query
)
{
// compute column correlations here and RHS correlated values
-return JvmUtils.safeAccumulateThreadCpuTime(
-cpuTimeAccumulator,
-() -> {
-if (clauses.isEmpty()) {
-return Function.identity();
-} else {
-final JoinableClauses joinableClauses = JoinableClauses.createClauses(
-clauses,
-joinableFactoryWrapper.getJoinableFactory()
-);
-final JoinFilterRewriteConfig filterRewriteConfig = JoinFilterRewriteConfig.forQuery(query);
-
-// Pick off any join clauses that can be converted into filters.
-final Set<String> requiredColumns = query.getRequiredColumns();
-final Filter baseFilterToUse;
-final List<JoinableClause> clausesToUse;
-
-if (requiredColumns != null && filterRewriteConfig.isEnableRewriteJoinToFilter()) {
-final Pair<List<Filter>, List<JoinableClause>> conversionResult = JoinableFactoryWrapper.convertJoinsToFilters(
-joinableClauses.getJoinableClauses(),
-requiredColumns,
-Ints.checkedCast(Math.min(filterRewriteConfig.getFilterRewriteMaxSize(), Integer.MAX_VALUE))
-);
-
-baseFilterToUse =
-Filters.maybeAnd(
-Lists.newArrayList(
-Iterables.concat(
-Collections.singleton(baseFilter),
-conversionResult.lhs
-)
-)
-).orElse(null);
-clausesToUse = conversionResult.rhs;
-} else {
-baseFilterToUse = baseFilter;
-clausesToUse = joinableClauses.getJoinableClauses();
-}
-
-// Analyze remaining join clauses to see if filters on them can be pushed down.
-final JoinFilterPreAnalysis joinFilterPreAnalysis = JoinFilterAnalyzer.computeJoinFilterPreAnalysis(
-new JoinFilterPreAnalysisKey(
-filterRewriteConfig,
-clausesToUse,
-query.getVirtualColumns(),
-Filters.maybeAnd(Arrays.asList(baseFilterToUse, Filters.toFilter(query.getFilter())))
-.orElse(null)
+if (clauses.isEmpty()) {
+return Function.identity();
+} else {
+final JoinableClauses joinableClauses = JoinableClauses.createClauses(
+clauses,
+joinableFactoryWrapper.getJoinableFactory()
+);
+final JoinFilterRewriteConfig filterRewriteConfig = JoinFilterRewriteConfig.forQuery(query);
+
+// Pick off any join clauses that can be converted into filters.
+final Set<String> requiredColumns = query.getRequiredColumns();
+final Filter baseFilterToUse;
+final List<JoinableClause> clausesToUse;
+
+if (requiredColumns != null && filterRewriteConfig.isEnableRewriteJoinToFilter()) {
+final Pair<List<Filter>, List<JoinableClause>> conversionResult = JoinableFactoryWrapper.convertJoinsToFilters(
+joinableClauses.getJoinableClauses(),
+requiredColumns,
+Ints.checkedCast(Math.min(filterRewriteConfig.getFilterRewriteMaxSize(), Integer.MAX_VALUE))
+);
+
+baseFilterToUse =
+Filters.maybeAnd(
+Lists.newArrayList(
+Iterables.concat(
+Collections.singleton(baseFilter),
+conversionResult.lhs
+)
+)
-)
-);
-final Function<SegmentReference, SegmentReference> baseMapFn;
-// A join data source is not concrete
-// And isConcrete() of an unnest datasource delegates to its base
-// Hence, in the case of a Join -> Unnest -> Join
-// if we just use isConcrete on the left
-// the segment map function for the unnest would never get called
-// This calls us to delegate to the segmentMapFunction of the left
-// only when it is not a JoinDataSource
-if (left instanceof JoinDataSource) {
-baseMapFn = Function.identity();
-} else {
-baseMapFn = left.createSegmentMapFunction(
-query,
-cpuTimeAccumulator
-);
-}
-return baseSegment ->
-new HashJoinSegment(
-baseMapFn.apply(baseSegment),
-baseFilterToUse,
-GuavaUtils.firstNonNull(clausesToUse, ImmutableList.of()),
-joinFilterPreAnalysis
-);
-}
-}
-);
+).orElse(null);
+clausesToUse = conversionResult.rhs;
+} else {
+baseFilterToUse = baseFilter;
+clausesToUse = joinableClauses.getJoinableClauses();
+}
+
+// Analyze remaining join clauses to see if filters on them can be pushed down.
+final JoinFilterPreAnalysis joinFilterPreAnalysis = JoinFilterAnalyzer.computeJoinFilterPreAnalysis(
+new JoinFilterPreAnalysisKey(
+filterRewriteConfig,
+clausesToUse,
+query.getVirtualColumns(),
+Filters.maybeAnd(Arrays.asList(baseFilterToUse, Filters.toFilter(query.getFilter())))
+.orElse(null)
+)
+);
+final Function<SegmentReference, SegmentReference> baseMapFn;
+// A join data source is not concrete
+// And isConcrete() of an unnest datasource delegates to its base
+// Hence, in the case of a Join -> Unnest -> Join
+// if we just use isConcrete on the left
+// the segment map function for the unnest would never get called
+// This calls us to delegate to the segmentMapFunction of the left
+// only when it is not a JoinDataSource
+if (left instanceof JoinDataSource) {
+baseMapFn = Function.identity();
+} else {
+baseMapFn = left.createSegmentMapFunction(
+query
+);
+}
+return baseSegment ->
+new HashJoinSegment(
+baseMapFn.apply(baseSegment),
+baseFilterToUse,
+GuavaUtils.firstNonNull(clausesToUse, ImmutableList.of()),
+joinFilterPreAnalysis
+);
+}
}

/**
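The retained comment above is why the left-hand side is special-cased: segment map functions compose, and each non-join datasource in the chain contributes a wrapper that must not be skipped. A condensed, hypothetical sketch of that composition for a Join -> Unnest -> Join chain (wrapUnnest stands in for the real unnest machinery and is not a Druid API):

// The unnest datasource wraps the function produced by its base join:
final Function<SegmentReference, SegmentReference> innerJoinFn =
    innerJoin.createSegmentMapFunction(query);
final Function<SegmentReference, SegmentReference> unnestFn =
    segment -> wrapUnnest(innerJoinFn.apply(segment)); // hypothetical wrapper
// The outer join's left side is the unnest datasource, not a JoinDataSource,
// so it delegates to left.createSegmentMapFunction(query) and unnestFn stays
// in the chain; short-circuiting to identity would silently drop it.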
@@ -30,7 +30,6 @@
import java.util.List;
import java.util.Objects;
import java.util.Set;
-import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;

/**
@@ -101,10 +100,7 @@ public boolean isConcrete()
}

@Override
-public Function<SegmentReference, SegmentReference> createSegmentMapFunction(
-Query query,
-AtomicLong cpuTime
-)
+public Function<SegmentReference, SegmentReference> createSegmentMapFunction(Query query)
{
return Function.identity();
}
@@ -32,7 +32,6 @@
import java.util.HashSet;
import java.util.List;
import java.util.Set;
-import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;

@JsonTypeName("query")
@@ -110,12 +109,11 @@ public boolean isConcrete()

@Override
public Function<SegmentReference, SegmentReference> createSegmentMapFunction(
-Query query,
-AtomicLong cpuTime
+Query query
)
{
final Query<?> subQuery = this.getQuery();
-return subQuery.getDataSource().createSegmentMapFunction(subQuery, cpuTime);
+return subQuery.getDataSource().createSegmentMapFunction(subQuery);
}

@Override
