diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractMembershipManager.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractMembershipManager.java index 41ad48bbc2513..3c0d53bc9da3f 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractMembershipManager.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractMembershipManager.java @@ -66,12 +66,6 @@ public abstract class AbstractMembershipManager impl */ static final Utils.TopicPartitionComparator TOPIC_PARTITION_COMPARATOR = new Utils.TopicPartitionComparator(); - /** - * TopicIdPartition comparator based on topic name and partition (ignoring topic ID while sorting, - * as this is sorted mainly for logging purposes). - */ - static final Utils.TopicIdPartitionComparator TOPIC_ID_PARTITION_COMPARATOR = new Utils.TopicIdPartitionComparator(); - /** * Group ID of the consumer group the member will be part of, provided when creating the current * membership manager. @@ -376,8 +370,8 @@ protected void processAssignmentReceived(Map> assignmen */ private void replaceTargetAssignmentWithNewAssignment(Map> assignment) { currentTargetAssignment.updateWith(assignment).ifPresent(updatedAssignment -> { - log.debug("Target assignment updated from {} to {}. Member will reconcile it on the next poll.", - currentTargetAssignment, updatedAssignment); + log.debug("Member {} updated its target assignment from {} to {}. Member will reconcile it on the next poll.", + memberId, currentTargetAssignment, updatedAssignment); currentTargetAssignment = updatedAssignment; }); } @@ -517,11 +511,10 @@ private void clearAssignment() { * @param assignedPartitions Full assignment, to update in the subscription state * @param addedPartitions Newly added partitions */ - private void updateSubscriptionAwaitingCallback(SortedSet assignedPartitions, + private void updateSubscriptionAwaitingCallback(TopicIdPartitionSet assignedPartitions, SortedSet addedPartitions) { - Set assignedTopicPartitions = toTopicPartitionSet(assignedPartitions); - subscriptions.assignFromSubscribedAwaitingCallback(assignedTopicPartitions, addedPartitions); - notifyAssignmentChange(assignedTopicPartitions); + subscriptions.assignFromSubscribedAwaitingCallback(assignedPartitions.topicPartitions(), addedPartitions); + notifyAssignmentChange(assignedPartitions.topicPartitions()); } /** @@ -541,6 +534,7 @@ public void transitionToJoining() { } resetEpoch(); transitionTo(MemberState.JOINING); + log.debug("Member {} will join the group on the next call to poll.", memberId); clearPendingAssignmentsAndLocalNamesCache(); } @@ -618,6 +612,8 @@ protected CompletableFuture leaveGroup(boolean runCallbacks) { clearAssignmentAndLeaveGroup(); }); } else { + log.debug("Member {} attempting to leave has no rebalance callbacks, " + + "so it will clear assignments and transition to send heartbeat to leave group.", memberId); clearAssignmentAndLeaveGroup(); } @@ -708,8 +704,10 @@ public void onHeartbeatRequestGenerated() { transitionTo(MemberState.STABLE); } else { log.debug("Member {} with epoch {} transitioned to {} after a heartbeat was sent " + - "to ack a previous reconciliation. New assignments are ready to " + - "be reconciled.", memberId, memberEpoch, MemberState.RECONCILING); + "to ack a previous reconciliation. \n" + + "\t\tCurrent assignment: {} \n" + + "\t\tTarget assignment: {}\n", + memberId, memberEpoch, MemberState.RECONCILING, currentAssignment, currentTargetAssignment); transitionTo(MemberState.RECONCILING); } } else if (state == MemberState.LEAVING) { @@ -839,7 +837,7 @@ public void maybeReconcile(boolean canCommit) { // Find the subset of the target assignment that can be resolved to topic names, and trigger a metadata update // if some topic IDs are not resolvable. - SortedSet assignedTopicIdPartitions = findResolvableAssignmentAndTriggerMetadataUpdate(); + TopicIdPartitionSet assignedTopicIdPartitions = findResolvableAssignmentAndTriggerMetadataUpdate(); final LocalAssignment resolvedAssignment = new LocalAssignment(currentTargetAssignment.localEpoch, assignedTopicIdPartitions); if (!currentAssignment.isNone() && resolvedAssignment.partitions.equals(currentAssignment.partitions)) { @@ -857,7 +855,7 @@ public void maybeReconcile(boolean canCommit) { // Keep copy of assigned TopicPartitions created from the TopicIdPartitions that are // being reconciled. Needed for interactions with the centralized subscription state that // does not support topic IDs yet, and for the callbacks. - SortedSet assignedTopicPartitions = toTopicPartitionSet(assignedTopicIdPartitions); + SortedSet assignedTopicPartitions = assignedTopicIdPartitions.toTopicNamePartitionSet(); SortedSet ownedPartitions = new TreeSet<>(TOPIC_PARTITION_COMPARATOR); ownedPartitions.addAll(subscriptions.assignedPartitions()); @@ -934,7 +932,7 @@ long getDeadlineMsForTimeout(final long timeoutMs) { * transition. Note that if any of the 2 callbacks fails, the reconciliation should fail. */ private void revokeAndAssign(LocalAssignment resolvedAssignment, - SortedSet assignedTopicIdPartitions, + TopicIdPartitionSet assignedTopicIdPartitions, SortedSet revokedPartitions, SortedSet addedPartitions) { CompletableFuture revocationResult; @@ -1031,15 +1029,6 @@ protected CompletableFuture signalPartitionsLost(Set parti return CompletableFuture.completedFuture(null); } - /** - * Build set of {@link TopicPartition} from the given set of {@link TopicIdPartition}. - */ - protected SortedSet toTopicPartitionSet(SortedSet topicIdPartitions) { - SortedSet result = new TreeSet<>(TOPIC_PARTITION_COMPARATOR); - topicIdPartitions.forEach(topicIdPartition -> result.add(topicIdPartition.topicPartition())); - return result; - } - /** * Visible for testing. */ @@ -1073,8 +1062,8 @@ void markReconciliationCompleted() { * * */ - private SortedSet findResolvableAssignmentAndTriggerMetadataUpdate() { - final SortedSet assignmentReadyToReconcile = new TreeSet<>(TOPIC_ID_PARTITION_COMPARATOR); + private TopicIdPartitionSet findResolvableAssignmentAndTriggerMetadataUpdate() { + final TopicIdPartitionSet assignmentReadyToReconcile = new TopicIdPartitionSet(); final HashMap> unresolved = new HashMap<>(currentTargetAssignment.partitions); // Try to resolve topic names from metadata cache or subscription cache, and move @@ -1200,7 +1189,7 @@ CompletableFuture revokePartitions(Set partitionsToRevoke) * @return Future that will complete when the callback execution completes. */ private CompletableFuture assignPartitions( - SortedSet assignedPartitions, + TopicIdPartitionSet assignedPartitions, SortedSet addedPartitions) { // Update assignment in the subscription state, and ensure that no fetching or positions @@ -1218,7 +1207,7 @@ private CompletableFuture assignPartitions( // returning no records, as no topic partitions are marked as fetchable. In contrast, with the classic consumer, // if the first callback fails but the next one succeeds, polling can still retrieve data. To align with // this behavior, we rely on assignedPartitions to avoid such scenarios. - subscriptions.enablePartitionsAwaitingCallback(toTopicPartitionSet(assignedPartitions)); + subscriptions.enablePartitionsAwaitingCallback(assignedPartitions.topicPartitions()); } else { // Keeping newly added partitions as non-fetchable after the callback failure. // They will be retried on the next reconciliation loop, until it succeeds or the @@ -1232,7 +1221,7 @@ private CompletableFuture assignPartitions( }); // Clear topic names cache, removing topics that are not assigned to the member anymore. - Set assignedTopics = assignedPartitions.stream().map(TopicIdPartition::topic).collect(Collectors.toSet()); + Set assignedTopics = assignedPartitions.topicNames(); assignedTopicNamesCache.values().retainAll(assignedTopics); return result; @@ -1450,16 +1439,13 @@ public LocalAssignment(long localEpoch, Map> partitions } } - public LocalAssignment(long localEpoch, SortedSet topicIdPartitions) { + public LocalAssignment(long localEpoch, TopicIdPartitionSet topicIdPartitions) { + Objects.requireNonNull(topicIdPartitions); this.localEpoch = localEpoch; - this.partitions = new HashMap<>(); if (localEpoch == NONE_EPOCH && !topicIdPartitions.isEmpty()) { throw new IllegalArgumentException("Local epoch must be set if there are partitions"); } - topicIdPartitions.forEach(topicIdPartition -> { - Uuid topicId = topicIdPartition.topicId(); - partitions.computeIfAbsent(topicId, k -> new TreeSet<>()).add(topicIdPartition.partition()); - }); + this.partitions = topicIdPartitions.toTopicIdPartitionMap(); } public String toString() { diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/TopicIdPartitionSet.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/TopicIdPartitionSet.java new file mode 100644 index 0000000000000..920fb63515df3 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/TopicIdPartitionSet.java @@ -0,0 +1,114 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.clients.consumer.internals; + +import org.apache.kafka.common.TopicIdPartition; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.Uuid; + +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import java.util.SortedSet; +import java.util.TreeSet; + +/** + * Represents a set of topic partitions, where each entry contains topic ID, topic name and partition number. + * Keeps in-memory references to provide easy access to this data in different forms. + * (ex. retrieve topic IDs only, topic names, partitions with topic names, partitions with topic IDs) + * Data is kept sorted by topic name and partition number, for improved logging. + */ +public class TopicIdPartitionSet { + + /** + * TopicPartition comparator based on topic name and partition. + */ + static final Utils.TopicPartitionComparator TOPIC_PARTITION_COMPARATOR = new Utils.TopicPartitionComparator(); + + /** + * TopicIdPartition comparator based on topic name and partition. + * (Ignoring topic ID while sorting, as this is sorted mainly for logging purposes). + */ + static final Utils.TopicIdPartitionComparator TOPIC_ID_PARTITION_COMPARATOR = new Utils.TopicIdPartitionComparator(); + + private final SortedSet topicIdPartitions; + private final SortedSet topicPartitions; + private final Set topicIds; + private final SortedSet topicNames; + + public TopicIdPartitionSet() { + this.topicIdPartitions = new TreeSet<>(TOPIC_ID_PARTITION_COMPARATOR); + this.topicPartitions = new TreeSet<>(TOPIC_PARTITION_COMPARATOR); + this.topicIds = new HashSet<>(); + this.topicNames = new TreeSet<>(); + } + + /** + * Add a new topic (id+name) and partition. This will keep it, and also save references to the topic ID, topic name and partition. + */ + public void add(TopicIdPartition topicIdPartition) { + topicIdPartitions.add(topicIdPartition); + topicPartitions.add(topicIdPartition.topicPartition()); + topicIds.add(topicIdPartition.topicId()); + topicNames.add(topicIdPartition.topicPartition().topic()); + } + + public boolean isEmpty() { + return this.topicIdPartitions.isEmpty(); + } + + public SortedSet topicPartitions() { + return Collections.unmodifiableSortedSet(topicPartitions); + } + + public Set topicIds() { + return Collections.unmodifiableSet(topicIds); + } + + public SortedSet topicNames() { + return Collections.unmodifiableSortedSet(topicNames); + } + + /** + * @return Map of partition numbers per topic ID, sorted by topic names (for improved logging). + */ + public Map> toTopicIdPartitionMap() { + Map> partitions = new HashMap<>(); + topicIdPartitions.forEach(topicIdPartition -> { + Uuid topicId = topicIdPartition.topicId(); + partitions.computeIfAbsent(topicId, k -> new TreeSet<>()).add(topicIdPartition.partition()); + }); + return partitions; + } + + /** + * @return Set of topic partitions (with topic name and partition number) + */ + protected SortedSet toTopicNamePartitionSet() { + SortedSet result = new TreeSet<>(TOPIC_PARTITION_COMPARATOR); + topicIdPartitions.forEach(topicIdPartition -> result.add(topicIdPartition.topicPartition())); + return result; + } + + @Override + public String toString() { + return this.topicIdPartitions.toString(); + } +} diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/TopicIdPartitionSetTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/TopicIdPartitionSetTest.java new file mode 100644 index 0000000000000..f14e911578acc --- /dev/null +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/TopicIdPartitionSetTest.java @@ -0,0 +1,154 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.clients.consumer.internals; + +import org.apache.kafka.common.TopicIdPartition; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.Uuid; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.LinkedHashSet; +import java.util.List; +import java.util.Set; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; + +public class TopicIdPartitionSetTest { + + private TopicIdPartitionSet topicIdPartitionSet; + + + @BeforeEach + public void setUp() { + topicIdPartitionSet = new TopicIdPartitionSet(); + } + + @Test + public void testIsEmpty() { + assertTrue(topicIdPartitionSet.isEmpty()); + + TopicIdPartition topicIdPartition = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 0)); + topicIdPartitionSet.add(topicIdPartition); + + assertFalse(topicIdPartitionSet.isEmpty()); + } + + @Test + public void testRetrieveTopicPartitions() { + TopicPartition tp1 = new TopicPartition("foo", 0); + TopicPartition tp2 = new TopicPartition("foo", 1); + TopicPartition tp3 = new TopicPartition("bar", 0); + Uuid topicId1 = Uuid.randomUuid(); + Uuid topicId2 = Uuid.randomUuid(); + topicIdPartitionSet.add(new TopicIdPartition(topicId1, tp1)); + topicIdPartitionSet.add(new TopicIdPartition(topicId1, tp2)); + topicIdPartitionSet.add(new TopicIdPartition(topicId2, tp3)); + + Set topicPartitionSet = topicIdPartitionSet.topicPartitions(); + assertEquals(3, topicPartitionSet.size()); + assertTrue(topicPartitionSet.contains(tp1)); + assertTrue(topicPartitionSet.contains(tp2)); + assertTrue(topicPartitionSet.contains(tp3)); + } + + @Test + public void testRetrieveTopicIds() { + Uuid topicId1 = Uuid.randomUuid(); + Uuid topicId2 = Uuid.randomUuid(); + topicIdPartitionSet.add(new TopicIdPartition(topicId1, new TopicPartition("foo", 0))); + topicIdPartitionSet.add(new TopicIdPartition(topicId1, new TopicPartition("foo", 1))); + topicIdPartitionSet.add(new TopicIdPartition(topicId2, new TopicPartition("bar", 0))); + + Set topicIds = topicIdPartitionSet.topicIds(); + assertEquals(2, topicIds.size()); + assertTrue(topicIds.contains(topicId1)); + assertTrue(topicIds.contains(topicId2)); + } + + @Test + public void testRetrieveTopicNames() { + String topic1 = "foo"; + String topic2 = "bar"; + Uuid topicId1 = Uuid.randomUuid(); + Uuid topicId2 = Uuid.randomUuid(); + topicIdPartitionSet.add(new TopicIdPartition(topicId1, new TopicPartition(topic1, 0))); + topicIdPartitionSet.add(new TopicIdPartition(topicId1, new TopicPartition(topic1, 1))); + topicIdPartitionSet.add(new TopicIdPartition(topicId2, new TopicPartition(topic2, 0))); + + Set topicNames = topicIdPartitionSet.topicNames(); + assertEquals(2, topicNames.size()); + assertTrue(topicNames.contains(topic1)); + assertTrue(topicNames.contains(topic2)); + } + + @Test + public void testRetrievedTopicNamesAreSorted() { + LinkedHashSet expectedOrderedTopicPartitions = new LinkedHashSet<>(); + expectedOrderedTopicPartitions.add(new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("topic-z", 1))); + expectedOrderedTopicPartitions.add(new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("topic-z", 0))); + expectedOrderedTopicPartitions.add(new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("topic-a", 0))); + expectedOrderedTopicPartitions.add(new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("topic-a", 1))); + + List reversed = new ArrayList<>(expectedOrderedTopicPartitions); + Collections.reverse(reversed); + reversed.forEach(tp -> topicIdPartitionSet.add(tp)); + + List topicPartitions = new ArrayList<>(topicIdPartitionSet.toTopicNamePartitionSet()); + + assertEquals(4, topicPartitions.size()); + assertEquals(new TopicPartition("topic-a", 0), topicPartitions.get(0)); + assertEquals(new TopicPartition("topic-a", 1), topicPartitions.get(1)); + assertEquals(new TopicPartition("topic-z", 0), topicPartitions.get(2)); + assertEquals(new TopicPartition("topic-z", 1), topicPartitions.get(3)); + } + + @Test + public void testToString() { + Uuid topicId1 = Uuid.randomUuid(); + TopicIdPartition tp1 = new TopicIdPartition(topicId1, new TopicPartition("topic-a", 0)); + TopicIdPartition tp2 = new TopicIdPartition(topicId1, new TopicPartition("topic-a", 1)); + TopicIdPartition tp3 = new TopicIdPartition(topicId1, new TopicPartition("topic-b", 0)); + topicIdPartitionSet.add(tp1); + topicIdPartitionSet.add(tp2); + topicIdPartitionSet.add(tp3); + + String toString = topicIdPartitionSet.toString(); + assertEquals(List.of(tp1, tp2, tp3).toString(), toString); + } + + @Test + public void testToStringSorted() { + Uuid topicId1 = Uuid.randomUuid(); + TopicIdPartition tp1 = new TopicIdPartition(topicId1, new TopicPartition("topic-a", 0)); + TopicIdPartition tpz1 = new TopicIdPartition(topicId1, new TopicPartition("topic-z", 0)); + TopicIdPartition tpz2 = new TopicIdPartition(topicId1, new TopicPartition("topic-z", 1)); + topicIdPartitionSet.add(tpz2); + topicIdPartitionSet.add(tpz1); + topicIdPartitionSet.add(tp1); + + String toString = topicIdPartitionSet.toString(); + assertEquals(List.of(tp1, tpz1, tpz2).toString(), toString); + } + +}