-
Notifications
You must be signed in to change notification settings - Fork 3.7k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Scheduled batch supervisor #17353
Merged
abhishekrb19
merged 73 commits into
apache:master
from
abhishekrb19:scheduled_batch_supervisor
Feb 11, 2025
Merged
Scheduled batch supervisor #17353
Changes from all commits
Commits
Show all changes
73 commits
Select commit
Hold shift + click to select a range
2afb4a9
Initial version of scheduled batch supervisor implementation.
abhishekrb19 3a20d0c
Minor logging adjustments.
abhishekrb19 7d4b93a
Address first round of static check issues.
abhishekrb19 80df85d
Merge branch 'master' into scheduled_batch_supervisor
abhishekrb19 5dd0e63
Forbidden API check and checkstyle.
abhishekrb19 a15b0f0
Fix checkstyle and tests with new brokerClient.
abhishekrb19 e18b183
Class renames and fixup tests.
abhishekrb19 c6b79d9
Move SqlTaskStatusTest to sql package as well alongside its implement…
abhishekrb19 179f8fa
Rename tests to align with impl.
abhishekrb19 9d25c9a
Adjust test
abhishekrb19 11b6135
Intellij inspections.
abhishekrb19 8a0d660
Fix
abhishekrb19 23d73de
BrokerServiceModule.
abhishekrb19 5a33da6
Try licenses.yaml
abhishekrb19 a4aa243
Remove unused dependency and add missing undeclared dep.
abhishekrb19 cdcdf59
Simplify explain plan stuff.
abhishekrb19 00d133c
Merge branch 'master' into scheduled_batch_supervisor
abhishekrb19 21bfc75
Better unix and quartz scheduler mapping and tests.
abhishekrb19 45fbd58
Only unix cron supports macros.
abhishekrb19 dfed561
Remove extraneous changes.
abhishekrb19 38f2447
Merge branch 'master' into scheduled_batch_supervisor
abhishekrb19 0ef4a6d
Lines.
abhishekrb19 f88e0c7
Cleanup and renames for clarity.
abhishekrb19 55358f2
Rename java package name to scheduledbatch from batch.
abhishekrb19 fd1d33d
Checkpoint some changes.
abhishekrb19 345801c
Merge branch 'master' into scheduled_batch_supervisor
abhishekrb19 066c4d6
Merge branch 'master' into scheduled_batch_supervisor
abhishekrb19 21eaac5
Merge branch 'master' into scheduled_batch_supervisor
abhishekrb19 7a44a11
minor inject status tracker.
abhishekrb19 92667ec
Merge branch 'master' into scheduled_batch_supervisor
abhishekrb19 c34fcc8
Some status tracker cleanup.
abhishekrb19 afa0701
refactor ScheduledBatchSchedulerTest.
abhishekrb19 dfa8170
Constants
abhishekrb19 80fb1fd
local variables.
abhishekrb19 f78ac28
Cleanup scheduler config interface methods.
abhishekrb19 4b0fafe
Misc cleanup.
abhishekrb19 e3f49da
Merge branch 'master' into scheduled_batch_supervisor
abhishekrb19 90c5559
Merge branch 'master' into scheduled_batch_supervisor
abhishekrb19 13642d1
Checkpoint review changes.
abhishekrb19 9bfeafc
Move ExplainPlan and ExplainPlanAttributes classes to processing modu…
abhishekrb19 5429f4f
Move SqlTaskStatus to processing for better reuse.
abhishekrb19 9ac43d4
Add ClientSqlQuery and ClientSqlQueryTest placeholders in processing.
abhishekrb19 a9f6725
Move Broker, BrokerClient and BrokerClientImpl to server package alon…
abhishekrb19 eaaaf7b
Add simple test for ClientSqlQuery and clean up methods.
abhishekrb19 9cb3edf
Checkstyle fixes.
abhishekrb19 bd90cbc
Add back getters as we need them.
abhishekrb19 bea8b3f
Guice provider for BrokerClient in the ServiceClientModule. Remove Br…
abhishekrb19 89d79ff
Merge branch 'refactor_broker_classes' into scheduled_batch_superviso…
abhishekrb19 d295323
Adjust imports per the refactors.
abhishekrb19 0a4747d
Checkstyle fix.
abhishekrb19 0616081
Review comments and add ClientSqlParameter for the equivalent SqlPara…
abhishekrb19 b77d143
One more checkstyle import fix.
abhishekrb19 460941c
More checkstyle import order fixes...Checstyle --fail-at-end doesn't …
abhishekrb19 92fbde7
Merge branch 'refactor_broker_classes' into scheduled_batch_superviso…
abhishekrb19 8af963c
Checkpoint review changes.
abhishekrb19 6b57055
Merge branch 'master' into scheduled_batch_supervisor
abhishekrb19 b5b459e
Merge branch 'master' into scheduled_batch_supervisor
abhishekrb19 740b5be
Merge branch 'master' into scheduled_batch_supervisor_with_broker
abhishekrb19 8ed2854
Merge branch 'scheduled_batch_supervisor_with_broker' into scheduled_…
abhishekrb19 fb4fd92
Merge branch 'master' into scheduled_batch_supervisor
abhishekrb19 4894011
Merge branch 'master' into scheduled_batch_supervisor
abhishekrb19 07a0094
A shared single-threaded jobs executor.
abhishekrb19 be922f6
Remove cronExecutor to keep things simple.
abhishekrb19 8c17ea8
Suspended supervisor stops further scheduling.
abhishekrb19 76e94b3
Checkstyle.
abhishekrb19 f10296b
Remove unused supervisor state enum values.
abhishekrb19 5403a0f
Remove avatica dependency.
abhishekrb19 5addd13
Verify state again inside the runnable.
abhishekrb19 debfadb
Merge branch 'master' into scheduled_batch_supervisor
abhishekrb19 edfa8da
Review
abhishekrb19 14ff95b
Remove transitive dep licenses
abhishekrb19 389ef26
Add back jakarta license.
abhishekrb19 619fd0d
Review
abhishekrb19 File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
73 changes: 73 additions & 0 deletions
73
...ice/src/main/java/org/apache/druid/indexing/scheduledbatch/BatchSupervisorTaskReport.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,73 @@ | ||
/* | ||
* Licensed to the Apache Software Foundation (ASF) under one | ||
* or more contributor license agreements. See the NOTICE file | ||
* distributed with this work for additional information | ||
* regarding copyright ownership. The ASF licenses this file | ||
* to you under the Apache License, Version 2.0 (the | ||
* "License"); you may not use this file except in compliance | ||
* with the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, | ||
* software distributed under the License is distributed on an | ||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
* KIND, either express or implied. See the License for the | ||
* specific language governing permissions and limitations | ||
* under the License. | ||
*/ | ||
|
||
package org.apache.druid.indexing.scheduledbatch; | ||
|
||
import com.fasterxml.jackson.annotation.JsonProperty; | ||
|
||
import java.util.List; | ||
|
||
/** | ||
* Represents a summary report of task execution for a batch supervisor, including total counts | ||
* of submitted, successful, and failed tasks, along with a recent set of tasks. | ||
*/ | ||
public class BatchSupervisorTaskReport | ||
{ | ||
private final int totalSubmittedTasks; | ||
private final int totalSuccessfulTasks; | ||
private final int totalFailedTasks; | ||
private final List<BatchSupervisorTaskStatus> recentTasks; | ||
|
||
public BatchSupervisorTaskReport( | ||
@JsonProperty("totalSubmittedTasks") final int totalSubmittedTasks, | ||
@JsonProperty("totalSuccessfulTasks") final int totalSuccessfulTasks, | ||
@JsonProperty("totalFailedTasks") final int totalFailedTasks, | ||
@JsonProperty("recentTasks") final List<BatchSupervisorTaskStatus> recentTasks | ||
) | ||
{ | ||
this.totalSubmittedTasks = totalSubmittedTasks; | ||
this.totalSuccessfulTasks = totalSuccessfulTasks; | ||
this.totalFailedTasks = totalFailedTasks; | ||
this.recentTasks = recentTasks; | ||
} | ||
|
||
@JsonProperty | ||
public int getTotalSubmittedTasks() | ||
{ | ||
return totalSubmittedTasks; | ||
} | ||
|
||
@JsonProperty | ||
public int getTotalSuccessfulTasks() | ||
{ | ||
return totalSuccessfulTasks; | ||
} | ||
|
||
@JsonProperty | ||
public int getTotalFailedTasks() | ||
{ | ||
return totalFailedTasks; | ||
} | ||
|
||
@JsonProperty | ||
public List<BatchSupervisorTaskStatus> getRecentTasks() | ||
{ | ||
return recentTasks; | ||
} | ||
} |
78 changes: 78 additions & 0 deletions
78
...ice/src/main/java/org/apache/druid/indexing/scheduledbatch/BatchSupervisorTaskStatus.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,78 @@ | ||
/* | ||
* Licensed to the Apache Software Foundation (ASF) under one | ||
* or more contributor license agreements. See the NOTICE file | ||
* distributed with this work for additional information | ||
* regarding copyright ownership. The ASF licenses this file | ||
* to you under the Apache License, Version 2.0 (the | ||
* "License"); you may not use this file except in compliance | ||
* with the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, | ||
* software distributed under the License is distributed on an | ||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
* KIND, either express or implied. See the License for the | ||
* specific language governing permissions and limitations | ||
* under the License. | ||
*/ | ||
|
||
package org.apache.druid.indexing.scheduledbatch; | ||
|
||
import com.fasterxml.jackson.annotation.JsonIgnore; | ||
import com.fasterxml.jackson.annotation.JsonProperty; | ||
import org.apache.druid.indexer.TaskStatus; | ||
import org.joda.time.DateTime; | ||
|
||
/** | ||
* Represents the status of a scheduled batch supervisor task and the timestamp of its last update. | ||
*/ | ||
public class BatchSupervisorTaskStatus | ||
{ | ||
private final String supervisorId; | ||
private final TaskStatus taskStatus; | ||
private final DateTime updatedTime; | ||
|
||
public BatchSupervisorTaskStatus( | ||
String supervisorId, // This field is used only for internal tracking, so not Jackson serializable | ||
@JsonProperty("taskStatus") TaskStatus taskStatus, | ||
@JsonProperty("updatedTime") DateTime updatedTime | ||
) | ||
{ | ||
this.supervisorId = supervisorId; | ||
this.taskStatus = taskStatus; | ||
this.updatedTime = updatedTime; | ||
} | ||
|
||
/** | ||
* Used for internal tracking. So this field is *not* Jackson serialized to avoid | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Nit: I am not sure if markdown is honored in javadocs. Use |
||
* redundant information in the user-facing objects. | ||
*/ | ||
@JsonIgnore | ||
public String getSupervisorId() | ||
{ | ||
return supervisorId; | ||
} | ||
|
||
@JsonProperty | ||
public TaskStatus getTaskStatus() | ||
{ | ||
return taskStatus; | ||
} | ||
|
||
@JsonProperty | ||
public DateTime getUpdatedTime() | ||
{ | ||
return updatedTime; | ||
} | ||
|
||
@Override | ||
public String toString() | ||
{ | ||
return "BatchSupervisorTaskStatus{" + | ||
"supervisorId='" + supervisorId + '\'' + | ||
", taskStatus=" + taskStatus + | ||
", updatedTime=" + updatedTime + | ||
'}'; | ||
} | ||
} |
52 changes: 52 additions & 0 deletions
52
...g-service/src/main/java/org/apache/druid/indexing/scheduledbatch/CronSchedulerConfig.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,52 @@ | ||
/* | ||
* Licensed to the Apache Software Foundation (ASF) under one | ||
* or more contributor license agreements. See the NOTICE file | ||
* distributed with this work for additional information | ||
* regarding copyright ownership. The ASF licenses this file | ||
* to you under the Apache License, Version 2.0 (the | ||
* "License"); you may not use this file except in compliance | ||
* with the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, | ||
* software distributed under the License is distributed on an | ||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
* KIND, either express or implied. See the License for the | ||
* specific language governing permissions and limitations | ||
* under the License. | ||
*/ | ||
|
||
package org.apache.druid.indexing.scheduledbatch; | ||
|
||
import com.fasterxml.jackson.annotation.JsonSubTypes; | ||
import com.fasterxml.jackson.annotation.JsonTypeInfo; | ||
import org.joda.time.DateTime; | ||
import org.joda.time.Duration; | ||
|
||
import javax.annotation.Nullable; | ||
|
||
/** | ||
* Interface representing a configuration for scheduling tasks based on cron expressions. | ||
*/ | ||
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type") | ||
@JsonSubTypes(value = { | ||
@JsonSubTypes.Type(name = UnixCronSchedulerConfig.TYPE, value = UnixCronSchedulerConfig.class), | ||
@JsonSubTypes.Type(name = QuartzCronSchedulerConfig.TYPE, value = QuartzCronSchedulerConfig.class), | ||
}) | ||
public interface CronSchedulerConfig | ||
{ | ||
/** | ||
* Gets the next task submission time after the specified {@code referenceTime}, or {@code null} if no future | ||
* execution time exists based on the configured schedule (e.g., if the cron expression only specifies dates in the past). | ||
*/ | ||
@Nullable | ||
DateTime getNextTaskStartTimeAfter(DateTime referenceTime); | ||
|
||
/** | ||
* Gets the duration until the next task submission after the specified {@code referenceTime}, if no future | ||
* execution time exists based on the configured schedule (e.g., if the cron expression only specifies dates in the past). | ||
*/ | ||
@Nullable | ||
Duration getDurationUntilNextTaskStartTimeAfter(DateTime referenceTime); | ||
} |
76 changes: 76 additions & 0 deletions
76
...ng-service/src/main/java/org/apache/druid/indexing/scheduledbatch/CronSchedulerUtils.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,76 @@ | ||
/* | ||
* Licensed to the Apache Software Foundation (ASF) under one | ||
* or more contributor license agreements. See the NOTICE file | ||
* distributed with this work for additional information | ||
* regarding copyright ownership. The ASF licenses this file | ||
* to you under the Apache License, Version 2.0 (the | ||
* "License"); you may not use this file except in compliance | ||
* with the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, | ||
* software distributed under the License is distributed on an | ||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
* KIND, either express or implied. See the License for the | ||
* specific language governing permissions and limitations | ||
* under the License. | ||
*/ | ||
|
||
package org.apache.druid.indexing.scheduledbatch; | ||
|
||
import com.cronutils.model.time.ExecutionTime; | ||
import org.apache.druid.java.util.common.DateTimes; | ||
import org.joda.time.DateTime; | ||
import org.joda.time.Duration; | ||
|
||
import javax.annotation.Nullable; | ||
import java.time.Instant; | ||
import java.time.ZoneId; | ||
import java.time.ZonedDateTime; | ||
import java.util.Optional; | ||
|
||
public class CronSchedulerUtils | ||
{ | ||
/** | ||
* Computes the next task submission time after the specified {@code referenceTime}. | ||
* <p> | ||
* Returns {@code null} if no future execution time exists based on the given {@code executionTime}, | ||
* which can occur if the schedule does not define any future occurrences. | ||
* </p> | ||
*/ | ||
@Nullable | ||
static DateTime getNextTaskStartTimeAfter(final ExecutionTime executionTime, final DateTime referenceTime) | ||
abhishekrb19 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
{ | ||
final Optional<ZonedDateTime> zonedDateTime = executionTime.nextExecution(convertToZonedDateTime(referenceTime)); | ||
if (zonedDateTime.isPresent()) { | ||
final ZonedDateTime zdt = zonedDateTime.get(); | ||
final Instant instant = zdt.toInstant(); | ||
return new DateTime(instant.toEpochMilli(), DateTimes.inferTzFromString(zdt.getZone().getId())); | ||
} else { | ||
return null; | ||
} | ||
} | ||
|
||
/** | ||
* Computes the duration until the next task submission time after the specified {@code referenceTime}. | ||
* <p> | ||
* Returns {@code null} if no future execution time exists based on the given {@code executionTime}, | ||
* which can occur if the schedule does not define any future occurrences. | ||
* </p> | ||
*/ | ||
@Nullable | ||
static Duration getDurationUntilNextTaskStartTimeAfter(final ExecutionTime executionTime, final DateTime referenceTime) | ||
{ | ||
final Optional<java.time.Duration> duration = executionTime.timeToNextExecution(convertToZonedDateTime(referenceTime)); | ||
return duration.map(value -> Duration.millis(value.toMillis())).orElse(null); | ||
} | ||
|
||
private static ZonedDateTime convertToZonedDateTime(final DateTime jodaDateTime) | ||
{ | ||
return ZonedDateTime.ofInstant( | ||
Instant.ofEpochMilli(jodaDateTime.getMillis()), | ||
ZoneId.of(jodaDateTime.getZone().getID()) | ||
); | ||
} | ||
} |
Oops, something went wrong.
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit: Since this constructor is not used for deserializing right now, we don't need the
@JsonProperty
annotation.