Skip to content

Commit

Permalink
address comment
Browse files Browse the repository at this point in the history
Signed-off-by: PoAn Yang <[email protected]>
  • Loading branch information
FrankYang0529 committed Jan 8, 2025
1 parent 479be6d commit dbed266
Show file tree
Hide file tree
Showing 2 changed files with 46 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -253,20 +253,28 @@ public void append(long largestOffset,
// append the messages
long appendedBytes = log.append(records);
LOGGER.trace("Appended {} to {} at end offset {}", appendedBytes, log.file(), largestOffset);
// Update the in memory max timestamp and corresponding offset.
if (largestTimestampMs > maxTimestampSoFar()) {
maxTimestampAndOffsetSoFar = new TimestampOffset(largestTimestampMs, shallowOffsetOfMaxTimestamp);
}
// append an entry to the timestamp index at MemoryRecords level (if needed)
if (bytesSinceLastIndexEntry > indexIntervalBytes) {
timeIndex().maybeAppend(maxTimestampSoFar(), shallowOffsetOfMaxTimestampSoFar());
}

// append an entry to the offset index at batches level (if needed)
long batchMaxTimestamp = RecordBatch.NO_TIMESTAMP;
long batchShallowOffsetOfMaxTimestamp = -1L;
// append an entry to the index at batches level (if needed)
for (RecordBatch batch : records.batches()) {
if (bytesSinceLastIndexEntry > indexIntervalBytes &&
batch.lastOffset() >= offsetIndex().lastOffset()) {
if (batch.maxTimestamp() > batchMaxTimestamp) {
batchMaxTimestamp = batch.maxTimestamp();
batchShallowOffsetOfMaxTimestamp = batch.lastOffset();
}

// Update the in memory max timestamp and corresponding offset.
if (batchMaxTimestamp > maxTimestampSoFar()) {
maxTimestampAndOffsetSoFar = new TimestampOffset(batchMaxTimestamp, batchShallowOffsetOfMaxTimestamp);
}

if (bytesSinceLastIndexEntry > indexIntervalBytes) {
offsetIndex().append(batch.lastOffset(), physicalPosition);

// max timestamp may not be monotonic, so we need to check it to avoid the time index append error
if (batchMaxTimestamp >= timeIndex().lastEntry().timestamp)
timeIndex().maybeAppend(batchMaxTimestamp, shallowOffsetOfMaxTimestampSoFar());

bytesSinceLastIndexEntry = 0;
}
physicalPosition += batch.sizeInBytes();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -813,8 +813,34 @@ record = MemoryRecords.readableRecords(buffer2);
assertEquals(2, segment.offsetIndex().entries());
assertTrue(segment.offsetIndex().lookup(2L).position > segment.offsetIndex().lookup(1L).position);

assertEquals(2, segment.timeIndex().entries());
assertTrue(segment.timeIndex().lookup(2L).offset > segment.timeIndex().lookup(1L).offset);
}

@Test
public void testNonMonotonicIndexForMultipleBatchesInMemoryRecords() throws IOException {
LogSegment segment = createSegment(0, 1, Time.SYSTEM);

ByteBuffer buffer1 = ByteBuffer.allocate(1024);
// append first batch to buffer1
MemoryRecordsBuilder builder = MemoryRecords.builder(buffer1, Compression.NONE, TimestampType.CREATE_TIME, 0);
builder.append(1L, "key1".getBytes(), "value1".getBytes());
builder.close();

// append second batch to buffer1
builder = MemoryRecords.builder(buffer1, Compression.NONE, TimestampType.CREATE_TIME, 1);
builder.append(0L, "key1".getBytes(), "value1".getBytes());
builder.close();

buffer1.flip();
MemoryRecords record = MemoryRecords.readableRecords(buffer1);
segment.append(1L, 1L, 1L, record);

assertEquals(1, segment.offsetIndex().entries());
assertEquals(1, segment.offsetIndex().lookup(1L).offset);

assertEquals(1, segment.timeIndex().entries());
assertEquals(2L, segment.timeIndex().lookup(2L).offset);
assertEquals(0L, segment.timeIndex().lookup(1L).offset);
}

private ProducerStateManager newProducerStateManager() throws IOException {
Expand Down

0 comments on commit dbed266

Please sign in to comment.