Skip to content

MINOR: introduce structure to keep member assignment with topic Ids #19645

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
May 7, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -66,12 +66,6 @@ public abstract class AbstractMembershipManager<R extends AbstractResponse> impl
*/
static final Utils.TopicPartitionComparator TOPIC_PARTITION_COMPARATOR = new Utils.TopicPartitionComparator();

/**
* TopicIdPartition comparator based on topic name and partition (ignoring topic ID while sorting,
* as this is sorted mainly for logging purposes).
*/
static final Utils.TopicIdPartitionComparator TOPIC_ID_PARTITION_COMPARATOR = new Utils.TopicIdPartitionComparator();

/**
* Group ID of the consumer group the member will be part of, provided when creating the current
* membership manager.
Expand Down Expand Up @@ -376,8 +370,8 @@ protected void processAssignmentReceived(Map<Uuid, SortedSet<Integer>> assignmen
*/
private void replaceTargetAssignmentWithNewAssignment(Map<Uuid, SortedSet<Integer>> assignment) {
currentTargetAssignment.updateWith(assignment).ifPresent(updatedAssignment -> {
log.debug("Target assignment updated from {} to {}. Member will reconcile it on the next poll.",
currentTargetAssignment, updatedAssignment);
log.debug("Member {} updated its target assignment from {} to {}. Member will reconcile it on the next poll.",
memberId, currentTargetAssignment, updatedAssignment);
currentTargetAssignment = updatedAssignment;
});
}
Expand Down Expand Up @@ -517,11 +511,10 @@ private void clearAssignment() {
* @param assignedPartitions Full assignment, to update in the subscription state
* @param addedPartitions Newly added partitions
*/
private void updateSubscriptionAwaitingCallback(SortedSet<TopicIdPartition> assignedPartitions,
private void updateSubscriptionAwaitingCallback(TopicIdPartitionSet assignedPartitions,
SortedSet<TopicPartition> addedPartitions) {
Set<TopicPartition> assignedTopicPartitions = toTopicPartitionSet(assignedPartitions);
subscriptions.assignFromSubscribedAwaitingCallback(assignedTopicPartitions, addedPartitions);
notifyAssignmentChange(assignedTopicPartitions);
subscriptions.assignFromSubscribedAwaitingCallback(assignedPartitions.topicPartitions(), addedPartitions);
notifyAssignmentChange(assignedPartitions.topicPartitions());
}

/**
Expand All @@ -541,6 +534,7 @@ public void transitionToJoining() {
}
resetEpoch();
transitionTo(MemberState.JOINING);
log.debug("Member {} will join the group on the next call to poll.", memberId);
clearPendingAssignmentsAndLocalNamesCache();
}

Expand Down Expand Up @@ -618,6 +612,8 @@ protected CompletableFuture<Void> leaveGroup(boolean runCallbacks) {
clearAssignmentAndLeaveGroup();
});
} else {
log.debug("Member {} attempting to leave has no rebalance callbacks, " +
"so it will clear assignments and transition to send heartbeat to leave group.", memberId);
clearAssignmentAndLeaveGroup();
}

Expand Down Expand Up @@ -708,8 +704,10 @@ public void onHeartbeatRequestGenerated() {
transitionTo(MemberState.STABLE);
} else {
log.debug("Member {} with epoch {} transitioned to {} after a heartbeat was sent " +
"to ack a previous reconciliation. New assignments are ready to " +
"be reconciled.", memberId, memberEpoch, MemberState.RECONCILING);
"to ack a previous reconciliation. \n" +
"\t\tCurrent assignment: {} \n" +
"\t\tTarget assignment: {}\n",
memberId, memberEpoch, MemberState.RECONCILING, currentAssignment, currentTargetAssignment);
transitionTo(MemberState.RECONCILING);
}
} else if (state == MemberState.LEAVING) {
Expand Down Expand Up @@ -839,7 +837,7 @@ public void maybeReconcile(boolean canCommit) {

// Find the subset of the target assignment that can be resolved to topic names, and trigger a metadata update
// if some topic IDs are not resolvable.
SortedSet<TopicIdPartition> assignedTopicIdPartitions = findResolvableAssignmentAndTriggerMetadataUpdate();
TopicIdPartitionSet assignedTopicIdPartitions = findResolvableAssignmentAndTriggerMetadataUpdate();
final LocalAssignment resolvedAssignment = new LocalAssignment(currentTargetAssignment.localEpoch, assignedTopicIdPartitions);

if (!currentAssignment.isNone() && resolvedAssignment.partitions.equals(currentAssignment.partitions)) {
Expand All @@ -857,7 +855,7 @@ public void maybeReconcile(boolean canCommit) {
// Keep copy of assigned TopicPartitions created from the TopicIdPartitions that are
// being reconciled. Needed for interactions with the centralized subscription state that
// does not support topic IDs yet, and for the callbacks.
SortedSet<TopicPartition> assignedTopicPartitions = toTopicPartitionSet(assignedTopicIdPartitions);
SortedSet<TopicPartition> assignedTopicPartitions = assignedTopicIdPartitions.toTopicNamePartitionSet();
SortedSet<TopicPartition> ownedPartitions = new TreeSet<>(TOPIC_PARTITION_COMPARATOR);
ownedPartitions.addAll(subscriptions.assignedPartitions());

Expand Down Expand Up @@ -934,7 +932,7 @@ long getDeadlineMsForTimeout(final long timeoutMs) {
* transition. Note that if any of the 2 callbacks fails, the reconciliation should fail.
*/
private void revokeAndAssign(LocalAssignment resolvedAssignment,
SortedSet<TopicIdPartition> assignedTopicIdPartitions,
TopicIdPartitionSet assignedTopicIdPartitions,
SortedSet<TopicPartition> revokedPartitions,
SortedSet<TopicPartition> addedPartitions) {
CompletableFuture<Void> revocationResult;
Expand Down Expand Up @@ -1031,15 +1029,6 @@ protected CompletableFuture<Void> signalPartitionsLost(Set<TopicPartition> parti
return CompletableFuture.completedFuture(null);
}

/**
* Build set of {@link TopicPartition} from the given set of {@link TopicIdPartition}.
*/
protected SortedSet<TopicPartition> toTopicPartitionSet(SortedSet<TopicIdPartition> topicIdPartitions) {
SortedSet<TopicPartition> result = new TreeSet<>(TOPIC_PARTITION_COMPARATOR);
topicIdPartitions.forEach(topicIdPartition -> result.add(topicIdPartition.topicPartition()));
return result;
}

/**
* Visible for testing.
*/
Expand Down Expand Up @@ -1073,8 +1062,8 @@ void markReconciliationCompleted() {
* </li>
* </ol>
*/
private SortedSet<TopicIdPartition> findResolvableAssignmentAndTriggerMetadataUpdate() {
final SortedSet<TopicIdPartition> assignmentReadyToReconcile = new TreeSet<>(TOPIC_ID_PARTITION_COMPARATOR);
private TopicIdPartitionSet findResolvableAssignmentAndTriggerMetadataUpdate() {
final TopicIdPartitionSet assignmentReadyToReconcile = new TopicIdPartitionSet();
final HashMap<Uuid, SortedSet<Integer>> unresolved = new HashMap<>(currentTargetAssignment.partitions);

// Try to resolve topic names from metadata cache or subscription cache, and move
Expand Down Expand Up @@ -1200,7 +1189,7 @@ CompletableFuture<Void> revokePartitions(Set<TopicPartition> partitionsToRevoke)
* @return Future that will complete when the callback execution completes.
*/
private CompletableFuture<Void> assignPartitions(
SortedSet<TopicIdPartition> assignedPartitions,
TopicIdPartitionSet assignedPartitions,
SortedSet<TopicPartition> addedPartitions) {

// Update assignment in the subscription state, and ensure that no fetching or positions
Expand All @@ -1218,7 +1207,7 @@ private CompletableFuture<Void> assignPartitions(
// returning no records, as no topic partitions are marked as fetchable. In contrast, with the classic consumer,
// if the first callback fails but the next one succeeds, polling can still retrieve data. To align with
// this behavior, we rely on assignedPartitions to avoid such scenarios.
subscriptions.enablePartitionsAwaitingCallback(toTopicPartitionSet(assignedPartitions));
subscriptions.enablePartitionsAwaitingCallback(assignedPartitions.topicPartitions());
} else {
// Keeping newly added partitions as non-fetchable after the callback failure.
// They will be retried on the next reconciliation loop, until it succeeds or the
Expand All @@ -1232,7 +1221,7 @@ private CompletableFuture<Void> assignPartitions(
});

// Clear topic names cache, removing topics that are not assigned to the member anymore.
Set<String> assignedTopics = assignedPartitions.stream().map(TopicIdPartition::topic).collect(Collectors.toSet());
Set<String> assignedTopics = assignedPartitions.topicNames();
assignedTopicNamesCache.values().retainAll(assignedTopics);

return result;
Expand Down Expand Up @@ -1450,16 +1439,13 @@ public LocalAssignment(long localEpoch, Map<Uuid, SortedSet<Integer>> partitions
}
}

public LocalAssignment(long localEpoch, SortedSet<TopicIdPartition> topicIdPartitions) {
public LocalAssignment(long localEpoch, TopicIdPartitionSet topicIdPartitions) {
Objects.requireNonNull(topicIdPartitions);
this.localEpoch = localEpoch;
this.partitions = new HashMap<>();
if (localEpoch == NONE_EPOCH && !topicIdPartitions.isEmpty()) {
throw new IllegalArgumentException("Local epoch must be set if there are partitions");
}
Comment on lines 1444 to 1447
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we consider checking the value before assigning it?

topicIdPartitions.forEach(topicIdPartition -> {
Uuid topicId = topicIdPartition.topicId();
partitions.computeIfAbsent(topicId, k -> new TreeSet<>()).add(topicIdPartition.partition());
});
this.partitions = topicIdPartitions.toTopicIdPartitionMap();
}

public String toString() {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.kafka.clients.consumer.internals;

import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;

import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;

/**
* Represents a set of topic partitions, where each entry contains topic ID, topic name and partition number.
* Keeps in-memory references to provide easy access to this data in different forms.
* (ex. retrieve topic IDs only, topic names, partitions with topic names, partitions with topic IDs)
* Data is kept sorted by topic name and partition number, for improved logging.
*/
public class TopicIdPartitionSet {

/**
* TopicPartition comparator based on topic name and partition.
*/
static final Utils.TopicPartitionComparator TOPIC_PARTITION_COMPARATOR = new Utils.TopicPartitionComparator();

/**
* TopicIdPartition comparator based on topic name and partition.
* (Ignoring topic ID while sorting, as this is sorted mainly for logging purposes).
*/
static final Utils.TopicIdPartitionComparator TOPIC_ID_PARTITION_COMPARATOR = new Utils.TopicIdPartitionComparator();

private final SortedSet<TopicIdPartition> topicIdPartitions;
private final SortedSet<TopicPartition> topicPartitions;
private final Set<Uuid> topicIds;
private final SortedSet<String> topicNames;

public TopicIdPartitionSet() {
this.topicIdPartitions = new TreeSet<>(TOPIC_ID_PARTITION_COMPARATOR);
this.topicPartitions = new TreeSet<>(TOPIC_PARTITION_COMPARATOR);
this.topicIds = new HashSet<>();
this.topicNames = new TreeSet<>();
}

/**
* Add a new topic (id+name) and partition. This will keep it, and also save references to the topic ID, topic name and partition.
*/
public void add(TopicIdPartition topicIdPartition) {
topicIdPartitions.add(topicIdPartition);
topicPartitions.add(topicIdPartition.topicPartition());
topicIds.add(topicIdPartition.topicId());
topicNames.add(topicIdPartition.topicPartition().topic());
}

public boolean isEmpty() {
return this.topicIdPartitions.isEmpty();
}

public SortedSet<TopicPartition> topicPartitions() {
return Collections.unmodifiableSortedSet(topicPartitions);
}

public Set<Uuid> topicIds() {
return Collections.unmodifiableSet(topicIds);
}

public SortedSet<String> topicNames() {
return Collections.unmodifiableSortedSet(topicNames);
}

/**
* @return Map of partition numbers per topic ID, sorted by topic names (for improved logging).
*/
public Map<Uuid, SortedSet<Integer>> toTopicIdPartitionMap() {
Map<Uuid, SortedSet<Integer>> partitions = new HashMap<>();
topicIdPartitions.forEach(topicIdPartition -> {
Uuid topicId = topicIdPartition.topicId();
partitions.computeIfAbsent(topicId, k -> new TreeSet<>()).add(topicIdPartition.partition());
});
return partitions;
}

/**
* @return Set of topic partitions (with topic name and partition number)
*/
protected SortedSet<TopicPartition> toTopicNamePartitionSet() {
SortedSet<TopicPartition> result = new TreeSet<>(TOPIC_PARTITION_COMPARATOR);
topicIdPartitions.forEach(topicIdPartition -> result.add(topicIdPartition.topicPartition()));
return result;
}

@Override
public String toString() {
return this.topicIdPartitions.toString();
}
}
Loading