Skip to content

KAFKA-17747: [3/N] Get rid of TopicMetadata in SubscribedTopicDescriberImpl #19611

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 12 commits into
base: trunk
Choose a base branch
from
Original file line number Diff line number Diff line change
Expand Up @@ -276,16 +276,13 @@ public class GroupMetadataManager {

private static class UpdateSubscriptionMetadataResult {
private final int groupEpoch;
private final Map<String, TopicMetadata> subscriptionMetadata;
private final SubscriptionType subscriptionType;

UpdateSubscriptionMetadataResult(
int groupEpoch,
Map<String, TopicMetadata> subscriptionMetadata,
SubscriptionType subscriptionType
) {
this.groupEpoch = groupEpoch;
this.subscriptionMetadata = Objects.requireNonNull(subscriptionMetadata);
this.subscriptionType = Objects.requireNonNull(subscriptionType);
}
}
Expand Down Expand Up @@ -2231,7 +2228,6 @@ private CoordinatorResult<ConsumerGroupHeartbeatResponseData, CoordinatorRecord>
);

int groupEpoch = group.groupEpoch();
Map<String, TopicMetadata> subscriptionMetadata = group.subscriptionMetadata();
SubscriptionType subscriptionType = group.subscriptionType();

if (bumpGroupEpoch || group.hasMetadataExpired(currentTimeMs)) {
Expand All @@ -2247,7 +2243,6 @@ private CoordinatorResult<ConsumerGroupHeartbeatResponseData, CoordinatorRecord>
);

groupEpoch = result.groupEpoch;
subscriptionMetadata = result.subscriptionMetadata;
subscriptionType = result.subscriptionType;
}

Expand All @@ -2262,7 +2257,6 @@ private CoordinatorResult<ConsumerGroupHeartbeatResponseData, CoordinatorRecord>
groupEpoch,
member,
updatedMember,
subscriptionMetadata,
subscriptionType,
records
);
Expand Down Expand Up @@ -2373,7 +2367,6 @@ private CoordinatorResult<Void, CoordinatorRecord> classicGroupJoinToConsumerGro
}

int groupEpoch = group.groupEpoch();
Map<String, TopicMetadata> subscriptionMetadata = group.subscriptionMetadata();
SubscriptionType subscriptionType = group.subscriptionType();
final ConsumerProtocolSubscription subscription = deserializeSubscription(protocols);

Expand Down Expand Up @@ -2416,7 +2409,6 @@ private CoordinatorResult<Void, CoordinatorRecord> classicGroupJoinToConsumerGro
);

groupEpoch = result.groupEpoch;
subscriptionMetadata = result.subscriptionMetadata;
subscriptionType = result.subscriptionType;
}

Expand All @@ -2431,7 +2423,6 @@ private CoordinatorResult<Void, CoordinatorRecord> classicGroupJoinToConsumerGro
groupEpoch,
member,
updatedMember,
subscriptionMetadata,
subscriptionType,
records
);
Expand Down Expand Up @@ -2605,7 +2596,6 @@ private CoordinatorResult<Map.Entry<ShareGroupHeartbeatResponseData, Optional<In
group,
groupEpoch,
updatedMember,
subscriptionMetadata,
subscriptionType,
records
);
Expand Down Expand Up @@ -3624,29 +3614,26 @@ private UpdateSubscriptionMetadataResult updateSubscriptionMetadata(

return new UpdateSubscriptionMetadataResult(
groupEpoch,
subscriptionMetadata,
subscriptionType
);
}

/**
* Updates the target assignment according to the updated member and subscription metadata.
*
* @param group The ConsumerGroup.
* @param groupEpoch The group epoch.
* @param member The existing member.
* @param updatedMember The updated member.
* @param subscriptionMetadata The subscription metadata.
* @param subscriptionType The group subscription type.
* @param records The list to accumulate any new records.
* @param group The ConsumerGroup.
* @param groupEpoch The group epoch.
* @param member The existing member.
* @param updatedMember The updated member.
* @param subscriptionType The group subscription type.
* @param records The list to accumulate any new records.
* @return The new target assignment.
*/
private Assignment updateTargetAssignment(
ConsumerGroup group,
int groupEpoch,
ConsumerGroupMember member,
ConsumerGroupMember updatedMember,
Map<String, TopicMetadata> subscriptionMetadata,
SubscriptionType subscriptionType,
List<CoordinatorRecord> records
) {
Expand All @@ -3659,11 +3646,10 @@ private Assignment updateTargetAssignment(
new TargetAssignmentBuilder.ConsumerTargetAssignmentBuilder(group.groupId(), groupEpoch, consumerGroupAssignors.get(preferredServerAssignor))
.withMembers(group.members())
.withStaticMembers(group.staticMembers())
.withSubscriptionMetadata(subscriptionMetadata)
.withSubscriptionType(subscriptionType)
.withTargetAssignment(group.targetAssignment())
.withInvertedTargetAssignment(group.invertedTargetAssignment())
.withTopicsImage(metadataImage.topics())
.withMetadataImage(metadataImage)
.withResolvedRegularExpressions(group.resolvedRegularExpressions())
.addOrUpdateMember(updatedMember.memberId(), updatedMember);

Expand Down Expand Up @@ -3706,19 +3692,17 @@ private Assignment updateTargetAssignment(
/**
* Updates the target assignment according to the updated member and subscription metadata.
*
* @param group The ShareGroup.
* @param groupEpoch The group epoch.
* @param updatedMember The updated member.
* @param subscriptionMetadata The subscription metadata.
* @param subscriptionType The group subscription type.
* @param records The list to accumulate any new records.
* @param group The ShareGroup.
* @param groupEpoch The group epoch.
* @param updatedMember The updated member.
* @param subscriptionType The group subscription type.
* @param records The list to accumulate any new records.
* @return The new target assignment.
*/
private Assignment updateTargetAssignment(
ShareGroup group,
int groupEpoch,
ShareGroupMember updatedMember,
Map<String, TopicMetadata> subscriptionMetadata,
SubscriptionType subscriptionType,
List<CoordinatorRecord> records
) {
Expand All @@ -3730,12 +3714,11 @@ private Assignment updateTargetAssignment(
TargetAssignmentBuilder.ShareTargetAssignmentBuilder assignmentResultBuilder =
new TargetAssignmentBuilder.ShareTargetAssignmentBuilder(group.groupId(), groupEpoch, shareGroupAssignor)
.withMembers(group.members())
.withSubscriptionMetadata(subscriptionMetadata)
.withSubscriptionType(subscriptionType)
.withTargetAssignment(group.targetAssignment())
.withTopicAssignablePartitionsMap(initializedTopicPartitions)
.withInvertedTargetAssignment(group.invertedTargetAssignment())
.withTopicsImage(metadataImage.topics())
.withMetadataImage(metadataImage)
.addOrUpdateMember(updatedMember.memberId(), updatedMember);

long startTimeMs = time.milliseconds();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,41 +19,42 @@
import org.apache.kafka.common.Uuid;
import org.apache.kafka.coordinator.group.api.assignor.PartitionAssignor;
import org.apache.kafka.coordinator.group.api.assignor.SubscribedTopicDescriber;
import org.apache.kafka.image.MetadataImage;
import org.apache.kafka.image.TopicImage;
import org.apache.kafka.metadata.PartitionRegistration;

import java.util.HashSet;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

/**
* The subscribed topic metadata class is used by the {@link PartitionAssignor} to obtain
* topic and partition metadata for the topics that the modern group is subscribed to.
*/
public class SubscribedTopicDescriberImpl implements SubscribedTopicDescriber {
/**
* The topic Ids mapped to their corresponding {@link TopicMetadata}
* object, which contains topic and partition metadata.
* The map of topic Ids to the set of allowed partitions for each topic.
* If this is empty, all partitions are allowed.
*/
private final Map<Uuid, TopicMetadata> topicMetadata;
private final Map<Uuid, Set<Integer>> topicPartitionAllowedMap;
private final Optional<Map<Uuid, Set<Integer>>> topicPartitionAllowedMap;
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@AndrewJSchofield @smjn I changed this to Optional, because topicPartitionAllowedMap cannot be null in production code. The TargetAssignmentBuilder set default value as empty map [0] and always input the value to SubscribedTopicDescriberImpl [1]. We still need a value to indicate all partitions are assignable. I think it's better to use Optional.empty, so I change the default value in TargetAssignmentBuilder to Optional.empty. Developers still can use withTopicAssignablePartitionsMap to set a map value, but the value cannot be null, because it will be used in Optional.of. WDYT?

[0]

/**
* Topic partition assignable map.
*/
private Map<Uuid, Set<Integer>> topicAssignablePartitionsMap = new HashMap<>();

[1]

// Compute the assignment.
GroupAssignment newGroupAssignment = assignor.assign(
new GroupSpecImpl(
Collections.unmodifiableMap(memberSpecs),
subscriptionType,
invertedTargetAssignment
),
new SubscribedTopicDescriberImpl(topicMetadataMap, topicAssignablePartitionsMap)
);

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

BTW, with Optional value, it's easier to include topicAssignablePartitionsMap in equals, hashCode, and toString functions.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is a related PR in this area #19739. It's correcting a mistake.


public SubscribedTopicDescriberImpl(Map<Uuid, TopicMetadata> topicMetadata) {
this(topicMetadata, null);
}
/**
* The metadata image that contains the latest metadata information.
*/
private final MetadataImage metadataImage;

public SubscribedTopicDescriberImpl(Map<Uuid, TopicMetadata> topicMetadata, Map<Uuid, Set<Integer>> topicPartitionAllowedMap) {
this.topicMetadata = Objects.requireNonNull(topicMetadata);
this.topicPartitionAllowedMap = topicPartitionAllowedMap;
public SubscribedTopicDescriberImpl(MetadataImage metadataImage) {
this(metadataImage, Optional.empty());
}

/**
* Map of topic Ids to topic metadata.
*
* @return The map of topic Ids to topic metadata.
*/
public Map<Uuid, TopicMetadata> topicMetadata() {
return this.topicMetadata;
public SubscribedTopicDescriberImpl(
MetadataImage metadataImage,
Optional<Map<Uuid, Set<Integer>>> topicPartitionAllowedMap
) {
this.metadataImage = Objects.requireNonNull(metadataImage);
this.topicPartitionAllowedMap = Objects.requireNonNull(topicPartitionAllowedMap);
}

/**
Expand All @@ -65,8 +66,8 @@ public Map<Uuid, TopicMetadata> topicMetadata() {
*/
@Override
public int numPartitions(Uuid topicId) {
TopicMetadata topic = this.topicMetadata.get(topicId);
return topic == null ? -1 : topic.numPartitions();
TopicImage topicImage = this.metadataImage.topics().getTopic(topicId);
return topicImage == null ? -1 : topicImage.partitions().size();
}

/**
Expand All @@ -79,49 +80,65 @@ public int numPartitions(Uuid topicId) {
*/
@Override
public Set<String> racksForPartition(Uuid topicId, int partition) {
TopicImage topic = metadataImage.topics().getTopic(topicId);
if (topic != null) {
PartitionRegistration partitionRegistration = topic.partitions().get(partition);
if (partitionRegistration != null) {
Set<String> racks = new HashSet<>();
for (int replica : partitionRegistration.replicas) {
// Only add the rack if it is available for the broker/replica.
metadataImage.cluster().broker(replica).rack().ifPresent(racks::add);
}
return racks;
}
}
return Set.of();
}

/**
* Returns a set of assignable partitions from the topic metadata.
* If the allowed partition map is null, all the partitions in the corresponding
* topic metadata are returned for the argument topic id. If allowed map is empty,
* Returns a set of assignable partitions from the metadata image.
* If the allowed partition map is Optional.empty(), all the partitions in the corresponding
* topic image are returned for the argument topic id. If allowed map is empty,
* empty set is returned.
*
* @param topicId The uuid of the topic
* @return Set of integers if assignable partitions available, empty otherwise.
*/
@Override
public Set<Integer> assignablePartitions(Uuid topicId) {
TopicMetadata topic = this.topicMetadata.get(topicId);
TopicImage topic = metadataImage.topics().getTopic(topicId);
if (topic == null) {
return Set.of();
}

if (topicPartitionAllowedMap == null) {
return IntStream.range(0, topic.numPartitions()).boxed().collect(Collectors.toUnmodifiableSet());
if (topicPartitionAllowedMap.isEmpty()) {
return topic.partitions().keySet();
}

return topicPartitionAllowedMap.getOrDefault(topicId, Set.of());
return topicPartitionAllowedMap.get().getOrDefault(topicId, Set.of());
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
SubscribedTopicDescriberImpl that = (SubscribedTopicDescriberImpl) o;
return topicMetadata.equals(that.topicMetadata);
if (!topicPartitionAllowedMap.equals(that.topicPartitionAllowedMap)) return false;
return metadataImage.equals(that.metadataImage);
}

@Override
public int hashCode() {
return topicMetadata.hashCode();
int result = metadataImage.hashCode();
result = 31 * result + topicPartitionAllowedMap.hashCode();
return result;
}

@Override
public String toString() {
return "SubscribedTopicMetadata(" +
"topicMetadata=" + topicMetadata +
"metadataImage=" + metadataImage +
", topicPartitionAllowedMap=" + topicPartitionAllowedMap +
')';
}
}
Loading