Skip to content

Commit

Permalink
Merge branch 'trunk' into KAFKA-18034
Browse files Browse the repository at this point in the history
# Conflicts:
#	clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java
  • Loading branch information
m1a2st committed Jan 8, 2025
2 parents 7b2cada + 3f9d2c2 commit 01b39c5
Show file tree
Hide file tree
Showing 31 changed files with 1,012 additions and 273 deletions.
20 changes: 20 additions & 0 deletions bin/kafka-share-consumer-perf-test.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
#!/bin/bash
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
export KAFKA_HEAP_OPTS="-Xmx512M"
fi
exec $(dirname $0)/kafka-run-class.sh org.apache.kafka.tools.ShareConsumerPerformance "$@"
20 changes: 20 additions & 0 deletions bin/windows/kafka-share-consumer-perf-test.bat
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
@echo off
rem Licensed to the Apache Software Foundation (ASF) under one or more
rem contributor license agreements. See the NOTICE file distributed with
rem this work for additional information regarding copyright ownership.
rem The ASF licenses this file to You under the Apache License, Version 2.0
rem (the "License"); you may not use this file except in compliance with
rem the License. You may obtain a copy of the License at
rem
rem http://www.apache.org/licenses/LICENSE-2.0
rem
rem Unless required by applicable law or agreed to in writing, software
rem distributed under the License is distributed on an "AS IS" BASIS,
rem WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
rem See the License for the specific language governing permissions and
rem limitations under the License.

SetLocal
set KAFKA_HEAP_OPTS=-Xmx512M -Xms512M
"%~dp0kafka-run-class.bat" org.apache.kafka.tools.ShareConsumerPerformance %*
EndLocal
Original file line number Diff line number Diff line change
Expand Up @@ -1169,14 +1169,16 @@ class ResultHandler {
* signal the completion when all results are known.
*/
public void complete(TopicIdPartition partition, Acknowledgements acknowledgements, boolean isCommitAsync) {
if (acknowledgements != null) {
if (!isCommitAsync && acknowledgements != null) {
result.put(partition, acknowledgements);
}
// For commitAsync, we do not wait for other results to complete, we prepare a background event
// for every ShareAcknowledgeResponse.
// For commitAsync, we send out a background event for every TopicIdPartition, so we use a singletonMap each time.
if (isCommitAsync) {
maybeSendShareAcknowledgeCommitCallbackEvent(Collections.singletonMap(partition, acknowledgements));
if (acknowledgements != null) {
maybeSendShareAcknowledgeCommitCallbackEvent(Collections.singletonMap(partition, acknowledgements));
}
} else if (remainingResults != null && remainingResults.decrementAndGet() == 0) {
maybeSendShareAcknowledgeCommitCallbackEvent(result);
future.ifPresent(future -> future.complete(result));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ public ShareFetchRequest.Builder newShareFetchBuilder(String groupId, FetchConfi

return ShareFetchRequest.Builder.forConsumer(
groupId, nextMetadata, fetchConfig.maxWaitMs,
fetchConfig.minBytes, fetchConfig.maxBytes, fetchConfig.fetchSize,
fetchConfig.minBytes, fetchConfig.maxBytes, fetchConfig.fetchSize, fetchConfig.maxPollRecords,
added, removed, acknowledgementBatches);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,8 +81,10 @@ public ApplicationEventHandler(final LogContext logContext,
public void add(final ApplicationEvent event) {
Objects.requireNonNull(event, "ApplicationEvent provided to add must be non-null");
event.setEnqueuedMs(time.milliseconds());
// Record the updated queue size before actually adding the event to the queue
// to avoid race conditions (the background thread is continuously removing from this queue)
asyncConsumerMetrics.recordApplicationEventQueueSize(applicationEventQueue.size() + 1);
applicationEventQueue.add(event);
asyncConsumerMetrics.recordApplicationEventQueueSize(applicationEventQueue.size());
wakeupNetworkThread();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,8 @@ public BackgroundEventHandler(final BlockingQueue<BackgroundEvent> backgroundEve
public void add(BackgroundEvent event) {
Objects.requireNonNull(event, "BackgroundEvent provided to add must be non-null");
event.setEnqueuedMs(time.milliseconds());
asyncConsumerMetrics.recordBackgroundEventQueueSize(backgroundEventQueue.size() + 1);
backgroundEventQueue.add(event);
asyncConsumerMetrics.recordBackgroundEventQueueSize(backgroundEventQueue.size());
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ public Builder(ShareFetchRequestData data, boolean enableUnstableLastVersion) {
}

public static Builder forConsumer(String groupId, ShareRequestMetadata metadata,
int maxWait, int minBytes, int maxBytes, int fetchSize,
int maxWait, int minBytes, int maxBytes, int fetchSize, int batchSize,
List<TopicIdPartition> send, List<TopicIdPartition> forget,
Map<TopicIdPartition, List<ShareFetchRequestData.AcknowledgementBatch>> acknowledgementsMap) {
ShareFetchRequestData data = new ShareFetchRequestData();
Expand All @@ -67,6 +67,7 @@ public static Builder forConsumer(String groupId, ShareRequestMetadata metadata,
data.setMaxWaitMs(maxWait);
data.setMinBytes(minBytes);
data.setMaxBytes(maxBytes);
data.setBatchSize(batchSize);

// Build a map of topics to fetch keyed by topic ID, and within each a map of partitions keyed by index
Map<Uuid, Map<Integer, ShareFetchRequestData.FetchPartition>> fetchMap = new HashMap<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@
"about": "The minimum bytes to accumulate in the response." },
{ "name": "MaxBytes", "type": "int32", "versions": "0+", "default": "0x7fffffff",
"about": "The maximum bytes to fetch. See KIP-74 for cases where this limit may not be honored." },
{ "name": "BatchSize", "type": "int32", "versions": "0+",
"about": "The optimal number of records for batches of acquired records and acknowledgements." },
{ "name": "Topics", "type": "[]FetchTopic", "versions": "0+",
"about": "The topics to fetch.", "fields": [
{ "name": "TopicId", "type": "uuid", "versions": "0+", "about": "The unique topic ID."},
Expand All @@ -45,7 +47,7 @@
{ "name": "PartitionIndex", "type": "int32", "versions": "0+",
"about": "The partition index." },
{ "name": "PartitionMaxBytes", "type": "int32", "versions": "0+",
"about": "The maximum bytes to fetch from this partition. 0 when only acknowledgement with no fetching is required. See KIP-74 for cases where this limit may not be honored." },
"about": "TO BE REMOVED. The maximum bytes to fetch from this partition. 0 when only acknowledgement with no fetching is required. See KIP-74 for cases where this limit may not be honored." },
{ "name": "AcknowledgementBatches", "type": "[]AcknowledgementBatch", "versions": "0+",
"about": "Record batches to acknowledge.", "fields": [
{ "name": "FirstOffset", "type": "int64", "versions": "0+",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ public void testTopLevelErrorConstructor() throws InterruptedException {
partitionFutures.completeExceptionally(Errors.GROUP_AUTHORIZATION_FAILED.exception());
DeleteConsumerGroupOffsetsResult topLevelErrorResult =
new DeleteConsumerGroupOffsetsResult(partitionFutures, partitions);
TestUtils.assertFutureError(topLevelErrorResult.all(), GroupAuthorizationException.class);
TestUtils.assertFutureThrows(topLevelErrorResult.all(), GroupAuthorizationException.class);
}

@Test
Expand All @@ -79,9 +79,9 @@ public void testPartitionMissingInResponseErrorConstructor() throws InterruptedE
DeleteConsumerGroupOffsetsResult missingPartitionResult =
new DeleteConsumerGroupOffsetsResult(partitionFutures, partitions);

TestUtils.assertFutureError(missingPartitionResult.all(), IllegalArgumentException.class);
TestUtils.assertFutureThrows(missingPartitionResult.all(), IllegalArgumentException.class);
assertNull(missingPartitionResult.partitionResult(tpZero).get());
TestUtils.assertFutureError(missingPartitionResult.partitionResult(tpOne), IllegalArgumentException.class);
TestUtils.assertFutureThrows(missingPartitionResult.partitionResult(tpOne), IllegalArgumentException.class);
}

@Test
Expand Down Expand Up @@ -110,9 +110,9 @@ private DeleteConsumerGroupOffsetsResult createAndVerifyPartitionLevelError() th
DeleteConsumerGroupOffsetsResult partitionLevelErrorResult =
new DeleteConsumerGroupOffsetsResult(partitionFutures, partitions);

TestUtils.assertFutureError(partitionLevelErrorResult.all(), UnknownTopicOrPartitionException.class);
TestUtils.assertFutureThrows(partitionLevelErrorResult.all(), UnknownTopicOrPartitionException.class);
assertNull(partitionLevelErrorResult.partitionResult(tpZero).get());
TestUtils.assertFutureError(partitionLevelErrorResult.partitionResult(tpOne), UnknownTopicOrPartitionException.class);
TestUtils.assertFutureThrows(partitionLevelErrorResult.partitionResult(tpOne), UnknownTopicOrPartitionException.class);
return partitionLevelErrorResult;
}
}
Loading

0 comments on commit 01b39c5

Please sign in to comment.