Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Pull-based Ingestion] Offset management, support rewind by offset or timestamp #17354

Open
wants to merge 3 commits into
base: main
Choose a base branch
from

Conversation

xuxiong1
Copy link

Description

This PR supports starting the ingestion from a user-specified timestamp or offset when creating the new index.

  • added two new ResetState: REWIND_BY_OFFSET and REWIND_BY_TIMESTAMP
  • added new setting ingestion_source.pointer.init.reset.value to IndexMetadata, controls the rewind offset or timestamp in millis
  • added corresponding integration tests

Related Issues

Resolves #17318

By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.

@github-actions github-actions bot added enhancement Enhancement or improvement to existing feature or request Indexing Indexing, Bulk Indexing and anything related to indexing labels Feb 13, 2025
Copy link
Contributor

❌ Gradle check result for 7c573c3: FAILURE

Please examine the workflow log, locate, and copy-paste the failure(s) below, then iterate to green. Is the failure a flaky test unrelated to your change?

Copy link
Collaborator

@Bukhtawar Bukhtawar left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As I understand the index level settings are controlling the offset resets. I am assuming that we expect this to work in a single shard and single partition setup

return offsetAndTimestamp.offset();
});
if (offset < 0) {
// no message found for the timestamp, return the latest offset
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we can fallback to the policy of auto.offset.reset?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Usually, the message not found could happen when the user specifies a timestamp larger than the latest in topic, in this case, return to the latest offset makes sense.
If the user specifies a timestamp smaller than the earliest, the offset will be automatically set to 0 as the earliest.

0,
0,
Property.IndexScope,
Property.Dynamic
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is this dynamic?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

actually i followed the same as the index.ingestion_source.pointer.init.reset, which is also set to dynamic.
I'll change both of them to final.


@ExperimentalApi
public static class PointerInitReset {
private final String type;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

how many types do we have? shall we consider enum?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

right, it will be mapped to enum StreamPoller.ResetState

    /**
     *  a reset state to indicate how to reset the pointer
     */
    @ExperimentalApi
    enum ResetState {
        EARLIEST,
        LATEST,
        REWIND_BY_OFFSET,
        REWIND_BY_TIMESTAMP,
        NONE,
    }

I'll replace it with ResetState enum here to be more consistent.

* @param offset the offset value
* @return the ingestion shard pointer corresponding to the given offset
*/
IngestionShardPointer pointerFromOffset(long offset);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this challenge is that not all streaming sources use long for offset, for example, kinesis uses 128 bit.
we can use reset from a given pointer

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see, the problem is we're passing a long value from setting.

    public static final Setting<Long> INGESTION_SOURCE_POINTER_INIT_RESET_VALUE_SETTING = Setting.longSetting(
        SETTING_INGESTION_SOURCE_POINTER_INIT_RESET_VALUE,
        0,
        0,
        Property.IndexScope,
        Property.Final
    );

in that case, passing a long value might not be sufficient. I'll try to pass a string value.

@@ -52,6 +52,7 @@ public class DefaultStreamPoller implements StreamPoller {
private IngestionShardPointer batchStartPointer;

private ResetState resetState;
private final long resetValue;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

again reset value cannot always be represented in long

@xuxiong1
Copy link
Author

As I understand the index level settings are controlling the offset resets. I am assuming that we expect this to work in a single shard and single partition setup

should also work for multi-shard setup, shards will be mapped to partitions.

Copy link
Contributor

❌ Gradle check result for 21eefdd: FAILURE

Please examine the workflow log, locate, and copy-paste the failure(s) below, then iterate to green. Is the failure a flaky test unrelated to your change?

Signed-off-by: xuxiong1 <[email protected]>
Signed-off-by: xuxiong1 <[email protected]>
Signed-off-by: xuxiong1 <[email protected]>
Copy link
Contributor

❌ Gradle check result for 701a54f: FAILURE

Please examine the workflow log, locate, and copy-paste the failure(s) below, then iterate to green. Is the failure a flaky test unrelated to your change?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
enhancement Enhancement or improvement to existing feature or request Indexing Indexing, Bulk Indexing and anything related to indexing
Projects
None yet
Development

Successfully merging this pull request may close these issues.

[Feature Request] Offset management for pull-based ingestion
3 participants