[Pull-based Ingestion] Offset management, support rewind by offset or timestamp #17354
34a9a0b to 7c573c3
❌ Gradle check result for 7c573c3: FAILURE Please examine the workflow log, locate, and copy-paste the failure(s) below, then iterate to green. Is the failure a flaky test unrelated to your change?
As I understand it, the index-level settings control the offset resets. I am assuming that we expect this to work in a single-shard, single-partition setup.
return offsetAndTimestamp.offset();
});
if (offset < 0) {
    // no message found for the timestamp, return the latest offset
Can we fall back to the auto.offset.reset policy?
Usually, no message is found when the user specifies a timestamp larger than the latest one in the topic; in that case, returning the latest offset makes sense.
If the user specifies a timestamp smaller than the earliest, the offset automatically resolves to the earliest one (0 unless older records have been deleted).
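The fallback behavior discussed here can be illustrated without a Kafka dependency. The following is only a minimal sketch: `TimestampRewindSketch` and its members are hypothetical names, not part of this PR, and the in-memory map stands in for a partition with distinct record timestamps. It returns the first offset whose timestamp is at or after the requested one and falls back to the latest offset when no such record exists.

```java
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

// Hypothetical stand-in for a partition: maps record timestamp (ms) -> offset.
// Assumes distinct timestamps; this is an illustration, not the PR's code.
final class TimestampRewindSketch {
    private final NavigableMap<Long, Long> offsetsByTimestamp = new TreeMap<>();
    private final long latestOffset; // offset the next produced record would receive

    TimestampRewindSketch(Map<Long, Long> records, long latestOffset) {
        this.offsetsByTimestamp.putAll(records);
        this.latestOffset = latestOffset;
    }

    // Returns the first offset whose timestamp is >= timestampMillis,
    // or the latest offset when no such record exists.
    long offsetForTimestamp(long timestampMillis) {
        Map.Entry<Long, Long> entry = offsetsByTimestamp.ceilingEntry(timestampMillis);
        return entry == null ? latestOffset : entry.getValue();
    }
}
```

With records at timestamps 1000, 2000, and 3000 (offsets 0 to 2) and a latest offset of 3, a timestamp before 1000 resolves to the earliest offset and a timestamp after 3000 resolves to the latest.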
0,
0,
Property.IndexScope,
Property.Dynamic
is this dynamic?
Actually, I followed the same pattern as index.ingestion_source.pointer.init.reset, which is also set to dynamic.
I'll change both of them to final.
@ExperimentalApi
public static class PointerInitReset {
    private final String type;
how many types do we have? shall we consider enum?
right, it will be mapped to enum StreamPoller.ResetState
/**
* a reset state to indicate how to reset the pointer
*/
@ExperimentalApi
enum ResetState {
EARLIEST,
LATEST,
REWIND_BY_OFFSET,
REWIND_BY_TIMESTAMP,
NONE,
}
I'll replace it with ResetState enum here to be more consistent.
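As a rough illustration of mapping the string-typed setting onto an enum, here is a minimal sketch. `ResetStateSketch` only mirrors the constants of the ResetState enum quoted above; the `fromSetting` and `requiresValue` helpers are hypothetical, and treating a blank value as NONE is an assumption rather than confirmed behavior of this PR.

```java
import java.util.Locale;

// Mirrors the constants of the ResetState enum quoted above.
// The helper methods are hypothetical and only illustrate the mapping.
enum ResetStateSketch {
    EARLIEST,
    LATEST,
    REWIND_BY_OFFSET,
    REWIND_BY_TIMESTAMP,
    NONE;

    // Converts a setting value such as "rewind_by_offset" into a constant.
    // Assumption: a missing or blank value means no reset.
    static ResetStateSketch fromSetting(String value) {
        if (value == null || value.isBlank()) {
            return NONE;
        }
        try {
            return valueOf(value.trim().toUpperCase(Locale.ROOT));
        } catch (IllegalArgumentException e) {
            throw new IllegalArgumentException("unknown pointer reset type [" + value + "]", e);
        }
    }

    // True when the state needs an accompanying reset value (offset or timestamp).
    boolean requiresValue() {
        return this == REWIND_BY_OFFSET || this == REWIND_BY_TIMESTAMP;
    }
}
```

Parsing once at the settings boundary means invalid values fail when the index is created rather than later, when the poller starts.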
 * @param offset the offset value
 * @return the ingestion shard pointer corresponding to the given offset
 */
IngestionShardPointer pointerFromOffset(long offset);
The challenge is that not all streaming sources use a long for the offset; for example, Kinesis uses 128-bit values.
We can reset from a given pointer instead.
I see, the problem is we're passing a long value from setting.
public static final Setting<Long> INGESTION_SOURCE_POINTER_INIT_RESET_VALUE_SETTING = Setting.longSetting(
SETTING_INGESTION_SOURCE_POINTER_INIT_RESET_VALUE,
0,
0,
Property.IndexScope,
Property.Final
);
in that case, passing a long value might not be sufficient. I'll try to pass a string value.
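To make the motivation for a string-typed value concrete, here is a minimal sketch in which each source interprets the raw string itself. `ResetValueSketch` and both parsing methods are hypothetical names and do not reflect the final API of this PR; the wide-sequence case simply assumes a decimal value that may exceed 64 bits.

```java
import java.math.BigInteger;

// Hypothetical helpers showing why the reset value is carried as a string:
// each source decides how to interpret it.
final class ResetValueSketch {
    private ResetValueSketch() {}

    // Source whose offsets fit in a signed 64-bit long.
    static long parseLongOffset(String raw) {
        long offset = Long.parseLong(raw.trim());
        if (offset < 0) {
            throw new IllegalArgumentException("offset must be non-negative: " + raw);
        }
        return offset;
    }

    // Source with wider sequence numbers: BigInteger has no 64-bit limit.
    static BigInteger parseWideSequenceNumber(String raw) {
        BigInteger sequenceNumber = new BigInteger(raw.trim());
        if (sequenceNumber.signum() < 0) {
            throw new IllegalArgumentException("sequence number must be non-negative: " + raw);
        }
        return sequenceNumber;
    }
}
```

A value such as 2^128 - 1 parses with `parseWideSequenceNumber` but makes `parseLongOffset` throw a `NumberFormatException`, which is the limitation raised in the review.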
@@ -52,6 +52,7 @@ public class DefaultStreamPoller implements StreamPoller {
    private IngestionShardPointer batchStartPointer;

    private ResetState resetState;
    private final long resetValue;
Again, the reset value cannot always be represented as a long.
It should also work for a multi-shard setup; shards will be mapped to partitions.
❌ Gradle check result for 21eefdd: FAILURE Please examine the workflow log, locate, and copy-paste the failure(s) below, then iterate to green. Is the failure a flaky test unrelated to your change?
Signed-off-by: xuxiong1 <[email protected]>
21eefdd to 701a54f
❌ Gradle check result for 701a54f: FAILURE Please examine the workflow log, locate, and copy-paste the failure(s) below, then iterate to green. Is the failure a flaky test unrelated to your change?
Description
This PR supports starting ingestion from a user-specified timestamp or offset when creating a new index.
Adds REWIND_BY_OFFSET and REWIND_BY_TIMESTAMP.
Adds ingestion_source.pointer.init.reset.value to IndexMetadata; it controls the rewind offset or the timestamp in milliseconds.
Related Issues
Resolves #17318
By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.