Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Scheduled batch supervisor #17353

Merged
merged 73 commits into from
Feb 11, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
73 commits
Select commit Hold shift + click to select a range
2afb4a9
Initial version of scheduled batch supervisor implementation.
abhishekrb19 Oct 15, 2024
3a20d0c
Minor logging adjustments.
abhishekrb19 Oct 15, 2024
7d4b93a
Address first round of static check issues.
abhishekrb19 Oct 15, 2024
80df85d
Merge branch 'master' into scheduled_batch_supervisor
abhishekrb19 Oct 16, 2024
5dd0e63
Forbidden API check and checkstyle.
abhishekrb19 Oct 16, 2024
a15b0f0
Fix checkstyle and tests with new brokerClient.
abhishekrb19 Oct 16, 2024
e18b183
Class renames and fixup tests.
abhishekrb19 Oct 16, 2024
c6b79d9
Move SqlTaskStatusTest to sql package as well alongside its implement…
abhishekrb19 Oct 16, 2024
179f8fa
Rename tests to align with impl.
abhishekrb19 Oct 16, 2024
9d25c9a
Adjust test
abhishekrb19 Oct 16, 2024
11b6135
Intellij inspections.
abhishekrb19 Oct 16, 2024
8a0d660
Fix
abhishekrb19 Oct 16, 2024
23d73de
BrokerServiceModule.
abhishekrb19 Oct 16, 2024
5a33da6
Try licenses.yaml
abhishekrb19 Oct 16, 2024
a4aa243
Remove unused dependency and add missing undeclared dep.
abhishekrb19 Oct 16, 2024
cdcdf59
Simplify explain plan stuff.
abhishekrb19 Oct 16, 2024
00d133c
Merge branch 'master' into scheduled_batch_supervisor
abhishekrb19 Oct 16, 2024
21bfc75
Better unix and quartz scheduler mapping and tests.
abhishekrb19 Oct 16, 2024
45fbd58
Only unix cron supports macros.
abhishekrb19 Oct 16, 2024
dfed561
Remove extraneous changes.
abhishekrb19 Oct 16, 2024
38f2447
Merge branch 'master' into scheduled_batch_supervisor
abhishekrb19 Oct 16, 2024
0ef4a6d
Lines.
abhishekrb19 Oct 16, 2024
f88e0c7
Cleanup and renames for clarity.
abhishekrb19 Oct 17, 2024
55358f2
Rename java package name to scheduledbatch from batch.
abhishekrb19 Oct 17, 2024
fd1d33d
Checkpoint some changes.
abhishekrb19 Oct 17, 2024
345801c
Merge branch 'master' into scheduled_batch_supervisor
abhishekrb19 Oct 22, 2024
066c4d6
Merge branch 'master' into scheduled_batch_supervisor
abhishekrb19 Oct 23, 2024
21eaac5
Merge branch 'master' into scheduled_batch_supervisor
abhishekrb19 Oct 23, 2024
7a44a11
minor inject status tracker.
abhishekrb19 Oct 23, 2024
92667ec
Merge branch 'master' into scheduled_batch_supervisor
abhishekrb19 Oct 27, 2024
c34fcc8
Some status tracker cleanup.
abhishekrb19 Oct 27, 2024
afa0701
refactor ScheduledBatchSchedulerTest.
abhishekrb19 Oct 27, 2024
dfa8170
Constants
abhishekrb19 Oct 27, 2024
80fb1fd
local variables.
abhishekrb19 Oct 27, 2024
f78ac28
Cleanup scheduler config interface methods.
abhishekrb19 Oct 27, 2024
4b0fafe
Misc cleanup.
abhishekrb19 Oct 28, 2024
e3f49da
Merge branch 'master' into scheduled_batch_supervisor
abhishekrb19 Nov 5, 2024
90c5559
Merge branch 'master' into scheduled_batch_supervisor
abhishekrb19 Nov 27, 2024
13642d1
Checkpoint review changes.
abhishekrb19 Nov 29, 2024
9bfeafc
Move ExplainPlan and ExplainPlanAttributes classes to processing modu…
abhishekrb19 Dec 6, 2024
5429f4f
Move SqlTaskStatus to processing for better reuse.
abhishekrb19 Dec 6, 2024
9ac43d4
Add ClientSqlQuery and ClientSqlQueryTest placeholders in processing.
abhishekrb19 Dec 6, 2024
a9f6725
Move Broker, BrokerClient and BrokerClientImpl to server package alon…
abhishekrb19 Dec 6, 2024
eaaaf7b
Add simple test for ClientSqlQuery and clean up methods.
abhishekrb19 Dec 6, 2024
9cb3edf
Checkstyle fixes.
abhishekrb19 Dec 6, 2024
bd90cbc
Add back getters as we need them.
abhishekrb19 Dec 6, 2024
bea8b3f
Guice provider for BrokerClient in the ServiceClientModule. Remove Br…
abhishekrb19 Dec 6, 2024
89d79ff
Merge branch 'refactor_broker_classes' into scheduled_batch_superviso…
abhishekrb19 Dec 6, 2024
d295323
Adjust imports per the refactors.
abhishekrb19 Dec 6, 2024
0a4747d
Checkstyle fix.
abhishekrb19 Dec 6, 2024
0616081
Review comments and add ClientSqlParameter for the equivalent SqlPara…
abhishekrb19 Dec 6, 2024
b77d143
One more checkstyle import fix.
abhishekrb19 Dec 6, 2024
460941c
More checkstyle import order fixes...Checstyle --fail-at-end doesn't …
abhishekrb19 Dec 6, 2024
92fbde7
Merge branch 'refactor_broker_classes' into scheduled_batch_superviso…
abhishekrb19 Dec 6, 2024
8af963c
Checkpoint review changes.
abhishekrb19 Dec 6, 2024
6b57055
Merge branch 'master' into scheduled_batch_supervisor
abhishekrb19 Dec 6, 2024
b5b459e
Merge branch 'master' into scheduled_batch_supervisor
abhishekrb19 Dec 6, 2024
740b5be
Merge branch 'master' into scheduled_batch_supervisor_with_broker
abhishekrb19 Dec 6, 2024
8ed2854
Merge branch 'scheduled_batch_supervisor_with_broker' into scheduled_…
abhishekrb19 Dec 6, 2024
fb4fd92
Merge branch 'master' into scheduled_batch_supervisor
abhishekrb19 Dec 6, 2024
4894011
Merge branch 'master' into scheduled_batch_supervisor
abhishekrb19 Dec 20, 2024
07a0094
A shared single-threaded jobs executor.
abhishekrb19 Dec 20, 2024
be922f6
Remove cronExecutor to keep things simple.
abhishekrb19 Dec 20, 2024
8c17ea8
Suspended supervisor stops further scheduling.
abhishekrb19 Dec 20, 2024
76e94b3
Checkstyle.
abhishekrb19 Dec 20, 2024
f10296b
Remove unused supervisor state enum values.
abhishekrb19 Dec 20, 2024
5403a0f
Remove avatica dependency.
abhishekrb19 Dec 20, 2024
5addd13
Verify state again inside the runnable.
abhishekrb19 Dec 20, 2024
debfadb
Merge branch 'master' into scheduled_batch_supervisor
abhishekrb19 Feb 10, 2025
edfa8da
Review
abhishekrb19 Feb 10, 2025
14ff95b
Remove transitive dep licenses
abhishekrb19 Feb 10, 2025
389ef26
Add back jakarta license.
abhishekrb19 Feb 10, 2025
619fd0d
Review
abhishekrb19 Feb 11, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions indexing-service/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,11 @@
<artifactId>commons-collections4</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.cronutils</groupId>
<artifactId>cron-utils</artifactId>
<version>${cronutils.version}</version>
</dependency>
<!-- Tests -->
<dependency>
<groupId>junit</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import com.google.inject.Binder;
import org.apache.druid.indexing.compact.CompactionSupervisorSpec;
import org.apache.druid.indexing.overlord.supervisor.SupervisorStateManagerConfig;
import org.apache.druid.indexing.scheduledbatch.ScheduledBatchSupervisorSpec;
import org.apache.druid.initialization.DruidModule;
import org.apache.druid.server.coordinator.CompactionSupervisorConfig;

Expand All @@ -47,7 +48,8 @@ public List<? extends Module> getJacksonModules()
return ImmutableList.of(
new SimpleModule(getClass().getSimpleName())
.registerSubtypes(
new NamedType(CompactionSupervisorSpec.class, CompactionSupervisorSpec.TYPE)
new NamedType(CompactionSupervisorSpec.class, CompactionSupervisorSpec.TYPE),
new NamedType(ScheduledBatchSupervisorSpec.class, ScheduledBatchSupervisorSpec.TYPE)
)
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.apache.druid.indexing.overlord.config.TaskQueueConfig;
import org.apache.druid.indexing.overlord.duty.OverlordDutyExecutor;
import org.apache.druid.indexing.overlord.supervisor.SupervisorManager;
import org.apache.druid.indexing.scheduledbatch.ScheduledBatchTaskManager;
import org.apache.druid.java.util.common.lifecycle.Lifecycle;
import org.apache.druid.java.util.common.lifecycle.LifecycleStart;
import org.apache.druid.java.util.common.lifecycle.LifecycleStop;
Expand Down Expand Up @@ -89,6 +90,7 @@ public DruidOverlord(
@IndexingService final DruidLeaderSelector overlordLeaderSelector,
final SegmentAllocationQueue segmentAllocationQueue,
final CompactionScheduler compactionScheduler,
final ScheduledBatchTaskManager scheduledBatchTaskManager,
final ObjectMapper mapper,
final TaskContextEnricher taskContextEnricher
)
Expand Down Expand Up @@ -161,6 +163,7 @@ public void start()
{
taskMaster.becomeFullLeader();
compactionScheduler.start();
scheduledBatchTaskManager.start();

// Announce the node only after all the services have been initialized
initialized = true;
Expand All @@ -171,6 +174,7 @@ public void start()
public void stop()
{
serviceAnnouncer.unannounce(node);
scheduledBatchTaskManager.stop();
compactionScheduler.stop();
taskMaster.downgradeToHalfLeader();
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.indexing.scheduledbatch;

import com.fasterxml.jackson.annotation.JsonProperty;

import java.util.List;

/**
* Represents a summary report of task execution for a batch supervisor, including total counts
* of submitted, successful, and failed tasks, along with a recent set of tasks.
*/
public class BatchSupervisorTaskReport
{
private final int totalSubmittedTasks;
private final int totalSuccessfulTasks;
private final int totalFailedTasks;
private final List<BatchSupervisorTaskStatus> recentTasks;

public BatchSupervisorTaskReport(
@JsonProperty("totalSubmittedTasks") final int totalSubmittedTasks,
@JsonProperty("totalSuccessfulTasks") final int totalSuccessfulTasks,
@JsonProperty("totalFailedTasks") final int totalFailedTasks,
@JsonProperty("recentTasks") final List<BatchSupervisorTaskStatus> recentTasks
)
{
this.totalSubmittedTasks = totalSubmittedTasks;
this.totalSuccessfulTasks = totalSuccessfulTasks;
this.totalFailedTasks = totalFailedTasks;
this.recentTasks = recentTasks;
}

@JsonProperty
public int getTotalSubmittedTasks()
{
return totalSubmittedTasks;
}

@JsonProperty
public int getTotalSuccessfulTasks()
{
return totalSuccessfulTasks;
}

@JsonProperty
public int getTotalFailedTasks()
{
return totalFailedTasks;
}

@JsonProperty
public List<BatchSupervisorTaskStatus> getRecentTasks()
{
return recentTasks;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.indexing.scheduledbatch;

import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.druid.indexer.TaskStatus;
import org.joda.time.DateTime;

/**
* Represents the status of a scheduled batch supervisor task and the timestamp of its last update.
*/
public class BatchSupervisorTaskStatus
{
private final String supervisorId;
private final TaskStatus taskStatus;
private final DateTime updatedTime;

public BatchSupervisorTaskStatus(
String supervisorId, // This field is used only for internal tracking, so not Jackson serializable
@JsonProperty("taskStatus") TaskStatus taskStatus,
@JsonProperty("updatedTime") DateTime updatedTime
Comment on lines +38 to +39
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: Since this constructor is not used for deserializing right now, we don't need the @JsonProperty annotation.

)
{
this.supervisorId = supervisorId;
this.taskStatus = taskStatus;
this.updatedTime = updatedTime;
}

/**
* Used for internal tracking. So this field is *not* Jackson serialized to avoid
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: I am not sure if markdown is honored in javadocs. Use <i>, <b> or caps instead for emphasis.

* redundant information in the user-facing objects.
*/
@JsonIgnore
public String getSupervisorId()
{
return supervisorId;
}

@JsonProperty
public TaskStatus getTaskStatus()
{
return taskStatus;
}

@JsonProperty
public DateTime getUpdatedTime()
{
return updatedTime;
}

@Override
public String toString()
{
return "BatchSupervisorTaskStatus{" +
"supervisorId='" + supervisorId + '\'' +
", taskStatus=" + taskStatus +
", updatedTime=" + updatedTime +
'}';
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.indexing.scheduledbatch;

import com.fasterxml.jackson.annotation.JsonSubTypes;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
import org.joda.time.DateTime;
import org.joda.time.Duration;

import javax.annotation.Nullable;

/**
* Interface representing a configuration for scheduling tasks based on cron expressions.
*/
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type")
@JsonSubTypes(value = {
@JsonSubTypes.Type(name = UnixCronSchedulerConfig.TYPE, value = UnixCronSchedulerConfig.class),
@JsonSubTypes.Type(name = QuartzCronSchedulerConfig.TYPE, value = QuartzCronSchedulerConfig.class),
})
public interface CronSchedulerConfig
{
/**
* Gets the next task submission time after the specified {@code referenceTime}, or {@code null} if no future
* execution time exists based on the configured schedule (e.g., if the cron expression only specifies dates in the past).
*/
@Nullable
DateTime getNextTaskStartTimeAfter(DateTime referenceTime);

/**
* Gets the duration until the next task submission after the specified {@code referenceTime}, if no future
* execution time exists based on the configured schedule (e.g., if the cron expression only specifies dates in the past).
*/
@Nullable
Duration getDurationUntilNextTaskStartTimeAfter(DateTime referenceTime);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.indexing.scheduledbatch;

import com.cronutils.model.time.ExecutionTime;
import org.apache.druid.java.util.common.DateTimes;
import org.joda.time.DateTime;
import org.joda.time.Duration;

import javax.annotation.Nullable;
import java.time.Instant;
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.util.Optional;

public class CronSchedulerUtils
{
/**
* Computes the next task submission time after the specified {@code referenceTime}.
* <p>
* Returns {@code null} if no future execution time exists based on the given {@code executionTime},
* which can occur if the schedule does not define any future occurrences.
* </p>
*/
@Nullable
static DateTime getNextTaskStartTimeAfter(final ExecutionTime executionTime, final DateTime referenceTime)
abhishekrb19 marked this conversation as resolved.
Show resolved Hide resolved
{
final Optional<ZonedDateTime> zonedDateTime = executionTime.nextExecution(convertToZonedDateTime(referenceTime));
if (zonedDateTime.isPresent()) {
final ZonedDateTime zdt = zonedDateTime.get();
final Instant instant = zdt.toInstant();
return new DateTime(instant.toEpochMilli(), DateTimes.inferTzFromString(zdt.getZone().getId()));
} else {
return null;
}
}

/**
* Computes the duration until the next task submission time after the specified {@code referenceTime}.
* <p>
* Returns {@code null} if no future execution time exists based on the given {@code executionTime},
* which can occur if the schedule does not define any future occurrences.
* </p>
*/
@Nullable
static Duration getDurationUntilNextTaskStartTimeAfter(final ExecutionTime executionTime, final DateTime referenceTime)
{
final Optional<java.time.Duration> duration = executionTime.timeToNextExecution(convertToZonedDateTime(referenceTime));
return duration.map(value -> Duration.millis(value.toMillis())).orElse(null);
}

private static ZonedDateTime convertToZonedDateTime(final DateTime jodaDateTime)
{
return ZonedDateTime.ofInstant(
Instant.ofEpochMilli(jodaDateTime.getMillis()),
ZoneId.of(jodaDateTime.getZone().getID())
);
}
}
Loading
Loading