Scheduled batch supervisor #17353

Merged on Feb 11, 2025 (73 commits)


abhishekrb19

@abhishekrb19 abhishekrb19 commented Oct 15, 2024

This change introduces a scheduled batch supervisor in Druid. The supervisor periodically wakes up to submit an MSQ ingest query, allowing users to automate batch ingestion directly within Druid. Think of it as simple batch task workflows natively integrated into Druid, though it doesn't replace more sophisticated workflow management systems like Apache Airflow. This is an experimental feature.

Summary of changes:

The scheduled_batch supervisor can be configured as follows:

{
    "type": "scheduled_batch",
    "schedulerConfig": {
        "type": "unix",
        "schedule": "*/5 * * * *"
    },
    "spec": {
        "query": "REPLACE INTO foo OVERWRITE ALL SELECT * FROM bar PARTITIONED BY DAY"
    },
    "suspended": false
}

With this configuration, the supervisor submits the REPLACE SQL query every 5 minutes. The supervisor supports two types of cron scheduler configurations:

  1. Unix cron syntax:
    • Type must be set to unix.
    • Follows the standard Unix cron format, e.g., */5 * * * * to schedule the SQL task every 5 minutes.
    • Supports macro expressions such as @daily, @hourly, @monthly, etc.
  2. Quartz cron syntax:
    • Type must be set to quartz.
    • Offers more flexibility and control for scheduling tasks.
    • Example: 0 0 0 ? 3,6,9,12 MON-FRI to schedule tasks at midnight on weekdays during March, June, September, and December.
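
A client-side sketch of the two scheduler types described above. This is illustrative only and is not Druid's own validation: the helper name `build_scheduler_config` and the field-count checks (5 fields or a macro for Unix cron, 6 or 7 fields for Quartz) are assumptions made for this example. Only the `type` and `schedule` keys come from the payload in this PR.

```python
def build_scheduler_config(cron_type: str, schedule: str) -> dict:
    """Return a schedulerConfig dict for a scheduled_batch supervisor spec.

    Hypothetical helper: the checks below are rough client-side sanity
    checks (an assumption); the Overlord does the authoritative validation.
    """
    fields = schedule.split()
    if cron_type == "unix":
        # Macros such as @daily or @hourly are a single token starting with "@".
        is_macro = len(fields) == 1 and schedule.startswith("@")
        if not is_macro and len(fields) != 5:
            raise ValueError(f"expected 5 Unix cron fields or a macro: {schedule!r}")
    elif cron_type == "quartz":
        # Quartz expressions have 6 fields plus an optional year field.
        if len(fields) not in (6, 7):
            raise ValueError(f"expected 6 or 7 Quartz cron fields: {schedule!r}")
    else:
        raise ValueError(f"unknown scheduler type: {cron_type!r}")
    return {"type": cron_type, "schedule": schedule}


if __name__ == "__main__":
    print(build_scheduler_config("unix", "*/5 * * * *"))
    print(build_scheduler_config("quartz", "0 0 0 ? 3,6,9,12 MON-FRI"))
```

Passing the Quartz example with `cron_type="unix"` raises `ValueError` in this sketch, because it has six fields rather than five.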

Key points:

  • Users can specify the query along with any context in the spec. This structure is identical to what the MSQ task engine accepts.
  • Currently, the batch supervisor will repeatedly submit the DML SQL present in the spec as-is on its schedule.
  • This can be useful for recurring batch ingestion or reindexing tasks.
  • There is no parameterization or "change detection" support for input sources yet, but some simple mechanisms are planned for the future.
  • Users can configure multiple scheduled batch supervisors for the same datasource.
  • Only DML ingestion queries are currently supported by the scheduled batch supervisor.
  • Users can suspend, resume and terminate the scheduled batch supervisor. Suspending the supervisor stops new tasks from being scheduled.
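
The key points above can be sketched as a small client that builds the supervisor payload (including an optional query context) and the lifecycle requests. This is a hedged sketch rather than part of the PR: the function names are hypothetical, the base URL assumes the `localhost:8888` address used in the status example of this PR, and the paths are the general supervisor API paths (`POST .../supervisor` and `POST .../supervisor/{id}/suspend|resume|terminate`). The requests are only constructed, not sent.

```python
import json
import urllib.request

# Assumption: same host and port as the status example in this PR.
BASE_URL = "http://localhost:8888/druid/indexer/v1/supervisor"


def build_supervisor_payload(query, schedule, context=None, suspended=False):
    """Build a scheduled_batch payload; `context` is passed through as-is."""
    spec = {"query": query}
    if context:
        spec["context"] = dict(context)
    return {
        "type": "scheduled_batch",
        "schedulerConfig": {"type": "unix", "schedule": schedule},
        "spec": spec,
        "suspended": suspended,
    }


def build_create_request(payload):
    """Build (but do not send) the POST that creates the supervisor."""
    return urllib.request.Request(
        BASE_URL,
        data=json.dumps(payload).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def build_action_request(supervisor_id, action):
    """Build (but do not send) a suspend, resume or terminate request."""
    if action not in ("suspend", "resume", "terminate"):
        raise ValueError(f"unsupported action: {action!r}")
    return urllib.request.Request(f"{BASE_URL}/{supervisor_id}/{action}", method="POST")
```

For example, `build_supervisor_payload(query, "*/5 * * * *", context={"maxNumTasks": 3})` adds an MSQ context parameter alongside the query; the value 3 is only an illustration. To actually send a request, pass it to `urllib.request.urlopen`.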

Some implementation details:

  • The scheduled batch supervisor runs on the Overlord and communicates with the Broker to:
    a. Validate and parse the user-specified query.
    b. Submit MSQ queries to the /druid/v2/sql/task/ endpoint.
  • A ScheduledBatchScheduler is injected in the Overlord, which is responsible for scheduling and unscheduling all scheduled batch instances.
  • A BrokerClient implementation has been added, leveraging the ServiceClient functionality.
  • The SqlTaskStatus and its unit test SqlTaskStatusTest have been moved from the msq module to the sql module so it can be reused by the BrokerClient implementation in the sql module.
  • Added ExplainPlanInformation class, which is used to deserialize the explain plan response from the Broker.
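
The validate-then-submit flow in steps a and b can be sketched as follows. This is not the PR's Java code, and several details are assumptions: that validation is done by wrapping the DML in `EXPLAIN PLAN FOR`, that both calls go to the `/druid/v2/sql/task/` endpoint, and that the submit response carries a `taskId` field. The `post` callable is a hypothetical stand-in for an HTTP client, so the sketch runs without a cluster.

```python
from typing import Callable, Optional

SQL_TASK_PATH = "/druid/v2/sql/task/"  # endpoint named in this PR


def validate_then_submit(
    query: str,
    post: Callable[[str, dict], dict],
    context: Optional[dict] = None,
) -> str:
    """Validate a DML query with the Broker, then submit it as an MSQ task.

    `post(path, body)` is a hypothetical transport function that returns the
    decoded JSON response and raises on HTTP errors.
    """
    ctx = dict(context or {})

    # Step a (assumption): plan the query without running it. A failure here
    # means the query is rejected before any task is created.
    post(SQL_TASK_PATH, {"query": "EXPLAIN PLAN FOR " + query, "context": ctx})

    # Step b: submit the query itself to the SQL task endpoint.
    response = post(SQL_TASK_PATH, {"query": query, "context": ctx})
    return response["taskId"]  # assumed response field
```

Injecting `post` keeps the ordering of the two calls easy to test with a fake transport.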

The status API response for the supervisor contains the scheduler state along with active and completed tasks:

curl -X GET http://localhost:8888/druid/indexer/v1/supervisor/scheduled_batch__foo__3a916bcf-060d-4a42-aab6-904ff61d25cd/status
{
  "supervisorId": "scheduled_batch__foo__3764d545-62b8-4d52-a20c-568b009268e1",
  "state": "RUNNING",
  "lastTaskSubmittedTime": "2025-02-11T01:39:00.119Z",
  "nextTaskSubmissionTime": "2025-02-11T01:40:00.000Z",
  "taskReport": {
    "totalSubmittedTasks": 5,
    "totalSuccessfulTasks": 3,
    "totalFailedTasks": 2,
    "recentTasks": [
      {
        "taskStatus": {
          "id": "query-bb0fc147-2cde-4935-95ed-bf40a26568de",
          "status": "SUCCESS",
          "duration": 26628,
          "errorMsg": null,
          "location": {
            "host": null,
            "port": -1,
            "tlsPort": -1
          }
        },
        "updatedTime": "2025-02-11T01:37:26.780Z"
      },
      {
        "taskStatus": {
          "id": "query-1d51d1fc-ece5-4376-b64d-716ccecbd708",
          "status": "SUCCESS",
          "duration": 26367,
          "errorMsg": null,
          "location": {
            "host": null,
            "port": -1,
            "tlsPort": -1
          }
        },
        "updatedTime": "2025-02-11T01:38:26.493Z"
      },
      {
        "taskStatus": {
          "id": "query-baa8d243-ff57-4c18-b513-865687dd3e93",
          "status": "FAILED",
          "duration": 26197,
          "errorMsg": "Task execution process exited unsuccessfully with code[2]. See middleManager logs for more details.",
          "location": {
            "host": null,
            "port": -1,
            "tlsPort": -1
          }
        },
        "updatedTime": "2025-02-11T01:39:26.355Z"
      }
    ]
  }
}
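
A short sketch of how a client might summarize the status response above. The field names are taken from the sample response in this PR; the helper name `summarize_status` and the shape of its return value are assumptions made for this example.

```python
def summarize_status(status: dict) -> dict:
    """Condense a scheduled_batch supervisor status response.

    Returns the scheduler state, the task counters and the
    (task id, error message) pairs of recently failed tasks.
    """
    report = status.get("taskReport", {})
    recent = report.get("recentTasks", [])
    failures = [
        (task["taskStatus"]["id"], task["taskStatus"]["errorMsg"])
        for task in recent
        if task["taskStatus"]["status"] == "FAILED"
    ]
    return {
        "state": status.get("state"),
        "submitted": report.get("totalSubmittedTasks", 0),
        "successful": report.get("totalSuccessfulTasks", 0),
        "failed": report.get("totalFailedTasks", 0),
        "recent_failures": failures,
    }
```

Applied to the response above, `recent_failures` would hold a single entry for `query-baa8d243-ff57-4c18-b513-865687dd3e93`, since that is the only task in `recentTasks` with status `FAILED`.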

Future Improvements:

  • Scheduler State Persistence: This feature doesn't yet persist the scheduler state. Adding persistence will improve robustness (e.g., preventing missed jobs during Overlord restarts).
  • Task Limits: There is currently no limit on the maximum number of tasks allowed by the scheduled batch supervisor. We may want to consider this in the future.
  • Web console: The Supervisors page in the Druid web console should be updated to accommodate scheduled batch supervisors and, more broadly, non-streaming supervisors.

Release Note

This change introduces an experimental feature called scheduled batch supervisor in Druid. The supervisor periodically wakes up to submit an MSQ ingest query, allowing users to automate batch ingestion directly within Druid. Think of it as simple batch task workflows natively integrated into Druid, though it doesn't replace more sophisticated workflow management systems like Apache Airflow. Currently, the supervisor will repeatedly submit the same MSQ query as-is. The payload for the supervisor is shown below; please check the PR summary for additional details:

{
    "type": "scheduled_batch",
    "schedulerConfig": {
        "type": "unix",
        "schedule": "*/5 * * * *"
    },
    "spec": {
        "query": "REPLACE INTO foo OVERWRITE ALL SELECT * FROM bar PARTITIONED BY DAY"
    },
    "suspended": false
}

This PR has:

  • been self-reviewed.
  • added documentation for new or modified features or behaviors.
  • a release note entry in the PR description.
  • added Javadocs for most classes and all non-trivial methods. Linked related entities via Javadoc links.
  • added or updated version, license, or notice information in licenses.yaml
  • added comments explaining the "why" and the intent of the code wherever it would not be obvious to an unfamiliar reader.
  • added unit tests or modified existing tests to cover new code paths, ensuring the threshold for code coverage is met.
  • added integration tests.
  • been tested in a test Druid cluster.

@github-actions github-actions bot added Area - Batch Ingestion Area - Querying Area - Dependencies Area - Ingestion Area - MSQ For multi stage queries - https://github.com/apache/druid/issues/12262 labels Oct 15, 2024
@abhishekrb19 abhishekrb19 marked this pull request as draft October 15, 2024 21:49
abhishekrb19 added a commit that referenced this pull request Dec 6, 2024
… reusability (#17542)

This PR contains non-functional / refactoring changes of the following classes in the sql module:

1. Move ExplainPlan and ExplainAttributes from sql/src/main/java/org/apache/druid/sql/http to processing/src/main/java/org/apache/druid/query/explain
2. Move sql/src/main/java/org/apache/druid/sql/SqlTaskStatus.java -> processing/src/main/java/org/apache/druid/query/http/SqlTaskStatus.java
3. Add a new class processing/src/main/java/org/apache/druid/query/http/ClientSqlQuery.java that is effectively a thin POJO version of SqlQuery in the sql module but without any of the Calcite functionality and business logic.
4. Move BrokerClient, BrokerClientImpl and Broker classes from sql/src/main/java/org/apache/druid/sql/client to server/src/main/java/org/apache/druid/client/broker.
5. Remove BrokerServiceModule that provided the BrokerClient. The functionality is now contained in ServiceClientModule in the server package itself which provides all the clients as well.

This is done so that we can reuse these classes in #17353 without bringing Calcite and other dependencies into the Overlord.

The jobsExecutor is responsible for submitting jobs to the Broker for all
scheduled batch supervisors. The cronExecutor merely decoupled the
submission of jobs from the actual scheduled running of jobs. The service client
library already has async threads, so we remove the cronExecutor to keep things
simple, since the service client already handles submission asynchronously. If we
ever see evidence of bottlenecks around task submission, etc., we should be
able to make jobsExecutor multi-threaded instead of single-threaded.
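
The single-threaded design described in this commit message can be illustrated with a rough Python analogue. This is not the PR's Java implementation: the class `JobsRunner` and the `submit_job` callable are hypothetical names, and a one-worker `ThreadPoolExecutor` stands in for the jobsExecutor.

```python
from concurrent.futures import Future, ThreadPoolExecutor
from typing import Callable


class JobsRunner:
    """Run job submissions for all supervisors on one shared worker thread."""

    def __init__(self, submit_job: Callable[[str, str], str]):
        # One worker: submissions run one at a time, in the order triggered.
        # Raising max_workers is the change to make if this becomes a bottleneck.
        self._executor = ThreadPoolExecutor(max_workers=1)
        self._submit_job = submit_job

    def trigger(self, supervisor_id: str, query: str) -> Future:
        """Queue one submission and return a Future for its result."""
        return self._executor.submit(self._submit_job, supervisor_id, query)

    def shutdown(self) -> None:
        """Wait for queued submissions to finish, then stop the worker."""
        self._executor.shutdown(wait=True)
```

With a single worker, queued submissions are processed first in, first out, which keeps the behavior easy to reason about at the cost of throughput.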

@kfaraz kfaraz left a comment

Sorry for the delay on this, @abhishekrb19 !

I have tried to do a complete review. Once we have addressed these comments, we should be good to merge this PR.

Thanks a lot for your patience!! 🙂

@abhishekrb19

@kfaraz, thank you for the reviews! I believe I have addressed and/or responded to all of your comments.

@kfaraz kfaraz left a comment

Minor non-blocking comments. 🚀

Comment on lines +38 to +39
@JsonProperty("taskStatus") TaskStatus taskStatus,
@JsonProperty("updatedTime") DateTime updatedTime

Nit: Since this constructor is not used for deserializing right now, we don't need the @JsonProperty annotation.

}

/**
* Used for internal tracking. So this field is *not* Jackson serialized to avoid

Nit: I am not sure if markdown is honored in javadocs. Use <i>, <b> or caps instead for emphasis.

@kfaraz

kfaraz commented Feb 11, 2025

@abhishekrb19 , the changes look good to me. You may proceed with the merge.

When you get the time though, please take another look at this comment
#17353 (comment)
If you feel that it's valid, it can be addressed in a follow up PR.

@abhishekrb19

Sounds good, just responded in the comment inline

@abhishekrb19 abhishekrb19 merged commit f776f33 into apache:master Feb 11, 2025
75 checks passed