diff --git a/core/src/main/java/kafka/server/handlers/DescribeTopicPartitionsRequestHandler.java b/core/src/main/java/kafka/server/handlers/DescribeTopicPartitionsRequestHandler.java index d02fd9d3a7dae..7d7e543bb3d9e 100644 --- a/core/src/main/java/kafka/server/handlers/DescribeTopicPartitionsRequestHandler.java +++ b/core/src/main/java/kafka/server/handlers/DescribeTopicPartitionsRequestHandler.java @@ -20,7 +20,6 @@ import kafka.network.RequestChannel; import kafka.server.AuthHelper; import kafka.server.KafkaConfig; - import org.apache.kafka.common.Uuid; import org.apache.kafka.common.errors.InvalidRequestException; import org.apache.kafka.common.message.DescribeTopicPartitionsRequestData; @@ -29,7 +28,6 @@ import org.apache.kafka.common.message.DescribeTopicPartitionsResponseData.DescribeTopicPartitionsResponseTopic; import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.requests.DescribeTopicPartitionsRequest; -import org.apache.kafka.common.resource.Resource; import org.apache.kafka.metadata.MetadataCache; import java.util.HashSet; @@ -40,27 +38,93 @@ import static org.apache.kafka.common.acl.AclOperation.DESCRIBE; import static org.apache.kafka.common.resource.ResourceType.TOPIC; +/** + * Handles the DescribeTopicPartitionsRequest, which provides metadata about topic partitions in a Kafka cluster. + * This handler is responsible for managing authorization checks, cursor validation, and constructing the response data + * for topics that are authorized for the requestor. + */ public class DescribeTopicPartitionsRequestHandler { - MetadataCache metadataCache; - AuthHelper authHelper; - KafkaConfig config; - + private final MetadataCache metadataCache; + private final AuthHelper authHelper; + private final KafkaConfig config; + + /** + * Constructs a new DescribeTopicPartitionsRequestHandler. + * + * @param metadataCache The metadata cache used to retrieve topic information. + * @param authHelper The authentication helper used to check the authorization for topics. + * @param config The Kafka configuration. + */ public DescribeTopicPartitionsRequestHandler( - MetadataCache metadataCache, - AuthHelper authHelper, - KafkaConfig config - ) { + final MetadataCache metadataCache, + final AuthHelper authHelper, + final KafkaConfig config) { this.metadataCache = metadataCache; this.authHelper = authHelper; this.config = config; } - public DescribeTopicPartitionsResponseData handleDescribeTopicPartitionsRequest(RequestChannel.Request abstractRequest) { - DescribeTopicPartitionsRequestData request = ((DescribeTopicPartitionsRequest) abstractRequest.loggableRequest()).data(); - Set topics = new HashSet<>(); - boolean fetchAllTopics = request.topics().isEmpty(); - DescribeTopicPartitionsRequestData.Cursor cursor = request.cursor(); - String cursorTopicName = cursor != null ? cursor.topicName() : ""; + /** + * Handles the DescribeTopicPartitionsRequest and constructs a response containing metadata about topic partitions. + * + * @param abstractRequest The request containing the metadata request for topic partitions. + * @return A DescribeTopicPartitionsResponseData containing metadata for the requested topic partitions. + */ + public DescribeTopicPartitionsResponseData handleDescribeTopicPartitionsRequest( + final RequestChannel.Request abstractRequest) { + final DescribeTopicPartitionsRequestData requestData = getRequestData(abstractRequest); + + // Get topics to describe based on request data (all topics or specific ones) + final String cursorTopicName = requestData.cursor() != null ? cursor.topicName() : ""; + final Set topicsToDescribe = getTopicsToDescribe(requestData, cursorTopicName); + + // Validate cursor if provided in the request + validateCursor(requestData.cursor(), topicsToDescribe); + + // Handle topics that are unauthorized for the Describe operation + final Set unauthorizedForDescribeTopicMetadata = new HashSet<>(); + final Stream authorizedTopicsStream = filterAuthorizedTopics( + abstractRequest, + topicsToDescribe, + unauthorizedForDescribeTopicMetadata, + requestData.topics().isEmpty() + ); + + // Construct the response for authorized topics + final DescribeTopicPartitionsResponseData response = + buildResponse(authorizedTopicsStream, abstractRequest, requestData); + + // Add unauthorized topics to the response to avoid disclosing their existence + response.topics().addAll(unauthorizedForDescribeTopicMetadata); + return response; + } + + /** + * Extracts the request data from the abstract request. + * + * @param abstractRequest The incoming request. + * @return The request data for the DescribeTopicPartitionsRequest. + */ + private DescribeTopicPartitionsRequestData getRequestData(final RequestChannel.Request abstractRequest) { + return ((DescribeTopicPartitionsRequest) abstractRequest.loggableRequest()).data(); + } + + /** + * Determines the list of topics to describe based on the provided request data. + * It can either fetch all topics or only the ones specified in the request. + * + * @param requestData The request data containing the list of topics. + * @return A set of topics to describe. + */ + private Set getTopicsToDescribe( + final DescribeTopicPartitionsRequestData requestData, + final String cursorTopicName + ) { + final Set topics = new HashSet<>(); + final boolean fetchAllTopics = requestData.topics().isEmpty(); + final DescribeTopicPartitionsRequestData.Cursor cursor = request.cursor(); + + // If no topics are specified, fetch all topics that come after the cursor topic if (fetchAllTopics) { metadataCache.getAllTopics().forEach(topicName -> { if (topicName.compareTo(cursorTopicName) >= 0) { @@ -68,7 +132,7 @@ public DescribeTopicPartitionsResponseData handleDescribeTopicPartitionsRequest( } }); } else { - request.topics().forEach(topic -> { + requestData.topics().forEach(topic -> { String topicName = topic.name(); if (topicName.compareTo(cursorTopicName) >= 0) { topics.add(topicName); @@ -80,55 +144,110 @@ public DescribeTopicPartitionsResponseData handleDescribeTopicPartitionsRequest( throw new InvalidRequestException("DescribeTopicPartitionsRequest topic list should contain the cursor topic: " + cursor.topicName()); } } + return topics; + } - if (cursor != null && cursor.partitionIndex() < 0) { - // The partition id in cursor must be valid. - throw new InvalidRequestException("DescribeTopicPartitionsRequest cursor partition must be valid: " + cursor); - } + /** + * Validates the cursor from the request. If the cursor is provided, it checks that the partition index is valid + * and that the topic in the cursor is included in the list of topics. + * + * @param cursor The cursor for pagination, if provided in the request. + * @param topicsToDescribe The list of topics that the requestor is authorized to describe. + */ + private void validateCursor( + final DescribeTopicPartitionsRequestData.Cursor cursor, + final Set topicsToDescribe + ) { + if (cursor != null) { + // Validate that the partition index in the cursor is valid + if (cursor.partitionIndex() < 0) { + throw new InvalidRequestException("DescribeTopicPartitionsRequest cursor partition must be valid: " + cursor); + } - // Do not disclose the existence of topics unauthorized for Describe, so we've not even checked if they exist or not - Set unauthorizedForDescribeTopicMetadata = new HashSet<>(); + // Ensure the cursor topic is included in the list of topics + if (!topicsToDescribe.contains(cursor.topicName())) { + throw new InvalidRequestException("DescribeTopicPartitionsRequest topic list should contain the cursor topic: " + cursor.topicName()); + } + } + } - Stream authorizedTopicsStream = topics.stream().sorted().filter(topicName -> { - boolean isAuthorized = authHelper.authorize( - abstractRequest.context(), DESCRIBE, TOPIC, topicName, true, true, 1); + /** + * Filters the topics based on authorization. It ensures that only topics the requestor is authorized to describe are included. + * Unauthorized topics are added to the unauthorized topics list. + * + * @param abstractRequest The incoming request. + * @param topicsToDescribe The list of topics to filter. + * @param unauthorizedForDescribeTopicMetadata A set to store topics that the requestor is unauthorized to describe. + * @param fetchAllTopics A flag indicating whether to fetch all topics or only specified ones. + * @return A stream of authorized topic names. + */ + private Stream filterAuthorizedTopics( + final RequestChannel.Request abstractRequest, + final Set topicsToDescribe, + final Set unauthorizedForDescribeTopicMetadata, + final boolean fetchAllTopics + ) { + return topicsToDescribe.stream().sorted().filter(topicName -> { + // Check authorization for each topic + final boolean isAuthorized = authHelper.authorize(abstractRequest.context(), + DESCRIBE, TOPIC, topicName, true, true, 1 + ); if (!fetchAllTopics && !isAuthorized) { - // We should not return topicId when on unauthorized error, so we return zero uuid. + // If unauthorized, add the topic to the unauthorized list with an empty UUID unauthorizedForDescribeTopicMetadata.add(describeTopicPartitionsResponseTopic( - Errors.TOPIC_AUTHORIZATION_FAILED, topicName, Uuid.ZERO_UUID, false, List.of()) + Errors.TOPIC_AUTHORIZATION_FAILED, topicName, Uuid.ZERO_UUID, false, List.of()) ); } return isAuthorized; }); + } - DescribeTopicPartitionsResponseData response = metadataCache.describeTopicResponse( - authorizedTopicsStream.iterator(), - abstractRequest.context().listenerName, - (String topicName) -> topicName.equals(cursorTopicName) ? cursor.partitionIndex() : 0, - Math.max(Math.min(config.maxRequestPartitionSizeLimit(), request.responsePartitionLimit()), 1), - fetchAllTopics + /** + * Constructs the response data based on authorized topics. + * + * @param authorizedTopicsStream A stream of authorized topic names. + * @param abstractRequest The incoming request. + * @param requestData The request data containing the cursor and partition limits. + * @return The constructed response data with metadata for the authorized topics. + */ + private DescribeTopicPartitionsResponseData buildResponse( + final Stream authorizedTopicsStream, + final RequestChannel.Request abstractRequest, + final DescribeTopicPartitionsRequestData requestData, + final String cursorTopicName + ) { + return metadataCache.describeTopicResponse( + authorizedTopicsStream.iterator(), + abstractRequest.context().listenerName, + (String topicName) -> topicName.equals(cursorTopicName) ? cursor.partitionIndex() : 0, + Math.max(Math.min(config.maxRequestPartitionSizeLimit(), request.responsePartitionLimit()), 1), + requestData.topics().isEmpty() ); - - // get topic authorized operations - response.topics().forEach(topicData -> - topicData.setTopicAuthorizedOperations(authHelper.authorizedOperations(abstractRequest, new Resource(TOPIC, topicData.name())))); - - response.topics().addAll(unauthorizedForDescribeTopicMetadata); - return response; } + /** + * Constructs a DescribeTopicPartitionsResponseTopic object, which contains metadata about a single topic, + * including error codes, topic ID, partition data, and whether the topic is internal. + * + * @param error The error that occurred while accessing the topic. + * @param topic The name of the topic. + * @param topicId The unique identifier for the topic. + * @param isInternal Whether the topic is internal or not. + * @param partitionData The partition data associated with the topic. + * @return A DescribeTopicPartitionsResponseTopic object with the specified metadata. + */ private DescribeTopicPartitionsResponseTopic describeTopicPartitionsResponseTopic( - Errors error, - String topic, - Uuid topicId, - Boolean isInternal, - List partitionData + final Errors error, + final String topic, + final Uuid topicId, + final Boolean isInternal, + final List partitionData ) { return new DescribeTopicPartitionsResponseTopic() - .setErrorCode(error.code()) - .setName(topic) - .setTopicId(topicId) - .setIsInternal(isInternal) - .setPartitions(partitionData); + .setErrorCode(error.code()) + .setName(topic) + .setTopicId(topicId) + .setIsInternal(isInternal) + .setPartitions(partitionData); } }