From f1d3bf3c88d8d8ca1bfb9d5a5a115f33aeae54e6 Mon Sep 17 00:00:00 2001 From: Adeshina <91054814+integral1@users.noreply.github.com> Date: Sun, 4 May 2025 19:35:13 +0100 Subject: [PATCH 01/19] Refactor DescribeTopicPartitionsRequestHandler: Improve readability and add code documentation --- ...DescribeTopicPartitionsRequestHandler.java | 196 +++++++++++++----- 1 file changed, 142 insertions(+), 54 deletions(-) diff --git a/core/src/main/java/kafka/server/handlers/DescribeTopicPartitionsRequestHandler.java b/core/src/main/java/kafka/server/handlers/DescribeTopicPartitionsRequestHandler.java index d02fd9d3a7dae..9b05fed97eaab 100644 --- a/core/src/main/java/kafka/server/handlers/DescribeTopicPartitionsRequestHandler.java +++ b/core/src/main/java/kafka/server/handlers/DescribeTopicPartitionsRequestHandler.java @@ -20,7 +20,6 @@ import kafka.network.RequestChannel; import kafka.server.AuthHelper; import kafka.server.KafkaConfig; - import org.apache.kafka.common.Uuid; import org.apache.kafka.common.errors.InvalidRequestException; import org.apache.kafka.common.message.DescribeTopicPartitionsRequestData; @@ -40,27 +39,79 @@ import static org.apache.kafka.common.acl.AclOperation.DESCRIBE; import static org.apache.kafka.common.resource.ResourceType.TOPIC; +/** + * Handles the DescribeTopicPartitionsRequest, which provides metadata about topic partitions in a Kafka cluster. + * This handler is responsible for managing authorization checks, cursor validation, and constructing the response data + * for topics that are authorized for the requestor. + */ public class DescribeTopicPartitionsRequestHandler { - MetadataCache metadataCache; - AuthHelper authHelper; - KafkaConfig config; - - public DescribeTopicPartitionsRequestHandler( - MetadataCache metadataCache, - AuthHelper authHelper, - KafkaConfig config - ) { + private final MetadataCache metadataCache; + private final AuthHelper authHelper; + private final KafkaConfig config; + + /** + * Constructs a new DescribeTopicPartitionsRequestHandler. + * + * @param metadataCache The metadata cache used to retrieve topic information. + * @param authHelper The authentication helper used to check the authorization for topics. + * @param config The Kafka configuration. + */ + public DescribeTopicPartitionsRequestHandler(MetadataCache metadataCache, AuthHelper authHelper, KafkaConfig config) { this.metadataCache = metadataCache; this.authHelper = authHelper; this.config = config; } + /** + * Handles the DescribeTopicPartitionsRequest and constructs a response containing metadata about topic partitions. + * + * @param abstractRequest The request containing the metadata request for topic partitions. + * @return A DescribeTopicPartitionsResponseData containing metadata for the requested topic partitions. + */ public DescribeTopicPartitionsResponseData handleDescribeTopicPartitionsRequest(RequestChannel.Request abstractRequest) { - DescribeTopicPartitionsRequestData request = ((DescribeTopicPartitionsRequest) abstractRequest.loggableRequest()).data(); + DescribeTopicPartitionsRequestData requestData = getRequestData(abstractRequest); + + // Get topics to describe based on request data (all topics or specific ones) + Set topicsToDescribe = getTopicsToDescribe(requestData); + + // Validate cursor if provided in the request + validateCursor(requestData.cursor(), topicsToDescribe); + + // Handle topics that are unauthorized for the Describe operation + Set unauthorizedTopics = new HashSet<>(); + Stream authorizedTopicsStream = filterAuthorizedTopics(abstractRequest, topicsToDescribe, unauthorizedTopics, requestData.topics().isEmpty()); + + // Construct the response for authorized topics + DescribeTopicPartitionsResponseData response = buildResponse(authorizedTopicsStream, abstractRequest, requestData); + + // Add unauthorized topics to the response to avoid disclosing their existence + response.topics().addAll(unauthorizedTopics); + return response; + } + + /** + * Extracts the request data from the abstract request. + * + * @param abstractRequest The incoming request. + * @return The request data for the DescribeTopicPartitionsRequest. + */ + private DescribeTopicPartitionsRequestData getRequestData(RequestChannel.Request abstractRequest) { + return ((DescribeTopicPartitionsRequest) abstractRequest.loggableRequest()).data(); + } + + /** + * Determines the list of topics to describe based on the provided request data. + * It can either fetch all topics or only the ones specified in the request. + * + * @param requestData The request data containing the list of topics. + * @return A set of topics to describe. + */ + private Set getTopicsToDescribe(DescribeTopicPartitionsRequestData requestData) { Set topics = new HashSet<>(); - boolean fetchAllTopics = request.topics().isEmpty(); - DescribeTopicPartitionsRequestData.Cursor cursor = request.cursor(); - String cursorTopicName = cursor != null ? cursor.topicName() : ""; + boolean fetchAllTopics = requestData.topics().isEmpty(); + String cursorTopicName = requestData.cursor() != null ? requestData.cursor().topicName() : ""; + + // If no topics are specified, fetch all topics that come after the cursor topic if (fetchAllTopics) { metadataCache.getAllTopics().forEach(topicName -> { if (topicName.compareTo(cursorTopicName) >= 0) { @@ -68,67 +119,104 @@ public DescribeTopicPartitionsResponseData handleDescribeTopicPartitionsRequest( } }); } else { - request.topics().forEach(topic -> { + // Add the requested topics to the set, ensuring they come after the cursor topic + requestData.topics().forEach(topic -> { String topicName = topic.name(); if (topicName.compareTo(cursorTopicName) >= 0) { topics.add(topicName); } }); - if (cursor != null && !topics.contains(cursor.topicName())) { - // The topic in cursor must be included in the topic list if provided. - throw new InvalidRequestException("DescribeTopicPartitionsRequest topic list should contain the cursor topic: " + cursor.topicName()); + // Ensure the cursor topic is included in the list of topics + if (requestData.cursor() != null && !topics.contains(requestData.cursor().topicName())) { + throw new InvalidRequestException("DescribeTopicPartitionsRequest topic list should contain the cursor topic: " + requestData.cursor().topicName()); } } + return topics; + } - if (cursor != null && cursor.partitionIndex() < 0) { - // The partition id in cursor must be valid. - throw new InvalidRequestException("DescribeTopicPartitionsRequest cursor partition must be valid: " + cursor); - } + /** + * Validates the cursor from the request. If the cursor is provided, it checks that the partition index is valid + * and that the topic in the cursor is included in the list of topics. + * + * @param cursor The cursor for pagination, if provided in the request. + * @param topicsToDescribe The list of topics that the requestor is authorized to describe. + */ + private void validateCursor(DescribeTopicPartitionsRequestData.Cursor cursor, Set topicsToDescribe) { + if (cursor != null) { + // Validate that the partition index in the cursor is valid + if (cursor.partitionIndex() < 0) { + throw new InvalidRequestException("DescribeTopicPartitionsRequest cursor partition must be valid: " + cursor); + } - // Do not disclose the existence of topics unauthorized for Describe, so we've not even checked if they exist or not - Set unauthorizedForDescribeTopicMetadata = new HashSet<>(); + // Ensure the cursor topic is included in the list of topics + if (!topicsToDescribe.contains(cursor.topicName())) { + throw new InvalidRequestException("DescribeTopicPartitionsRequest topic list should contain the cursor topic: " + cursor.topicName()); + } + } + } - Stream authorizedTopicsStream = topics.stream().sorted().filter(topicName -> { - boolean isAuthorized = authHelper.authorize( - abstractRequest.context(), DESCRIBE, TOPIC, topicName, true, true, 1); + /** + * Filters the topics based on authorization. It ensures that only topics the requestor is authorized to describe are included. + * Unauthorized topics are added to the unauthorized topics list. + * + * @param abstractRequest The incoming request. + * @param topicsToDescribe The list of topics to filter. + * @param unauthorizedTopics A set to store topics that the requestor is unauthorized to describe. + * @param fetchAllTopics A flag indicating whether to fetch all topics or only specified ones. + * @return A stream of authorized topic names. + */ + private Stream filterAuthorizedTopics(RequestChannel.Request abstractRequest, Set topicsToDescribe, + Set unauthorizedTopics, boolean fetchAllTopics) { + return topicsToDescribe.stream().sorted().filter(topicName -> { + // Check authorization for each topic + boolean isAuthorized = authHelper.authorize(abstractRequest.context(), DESCRIBE, TOPIC, topicName, true, true, 1); if (!fetchAllTopics && !isAuthorized) { - // We should not return topicId when on unauthorized error, so we return zero uuid. - unauthorizedForDescribeTopicMetadata.add(describeTopicPartitionsResponseTopic( - Errors.TOPIC_AUTHORIZATION_FAILED, topicName, Uuid.ZERO_UUID, false, List.of()) + // If unauthorized, add the topic to the unauthorized list with an empty UUID + unauthorizedTopics.add(describeTopicPartitionsResponseTopic( + Errors.TOPIC_AUTHORIZATION_FAILED, topicName, Uuid.ZERO_UUID, false, List.of()) ); } return isAuthorized; }); + } - DescribeTopicPartitionsResponseData response = metadataCache.describeTopicResponse( - authorizedTopicsStream.iterator(), - abstractRequest.context().listenerName, - (String topicName) -> topicName.equals(cursorTopicName) ? cursor.partitionIndex() : 0, - Math.max(Math.min(config.maxRequestPartitionSizeLimit(), request.responsePartitionLimit()), 1), - fetchAllTopics + /** + * Constructs the response data based on authorized topics. + * + * @param authorizedTopicsStream A stream of authorized topic names. + * @param abstractRequest The incoming request. + * @param requestData The request data containing the cursor and partition limits. + * @return The constructed response data with metadata for the authorized topics. + */ + private DescribeTopicPartitionsResponseData buildResponse(Stream authorizedTopicsStream, RequestChannel.Request abstractRequest, DescribeTopicPartitionsRequestData requestData) { + return metadataCache.describeTopicResponse( + authorizedTopicsStream.iterator(), + abstractRequest.context().listenerName, + topicName -> topicName.equals(requestData.cursor() != null ? requestData.cursor().topicName() : "") ? requestData.cursor().partitionIndex() : 0, + Math.max(Math.min(config.maxRequestPartitionSizeLimit(), requestData.responsePartitionLimit()), 1), + requestData.topics().isEmpty() ); - - // get topic authorized operations - response.topics().forEach(topicData -> - topicData.setTopicAuthorizedOperations(authHelper.authorizedOperations(abstractRequest, new Resource(TOPIC, topicData.name())))); - - response.topics().addAll(unauthorizedForDescribeTopicMetadata); - return response; } + /** + * Constructs a DescribeTopicPartitionsResponseTopic object, which contains metadata about a single topic, + * including error codes, topic ID, partition data, and whether the topic is internal. + * + * @param error The error that occurred while accessing the topic. + * @param topic The name of the topic. + * @param topicId The unique identifier for the topic. + * @param isInternal Whether the topic is internal or not. + * @param partitionData The partition data associated with the topic. + * @return A DescribeTopicPartitionsResponseTopic object with the specified metadata. + */ private DescribeTopicPartitionsResponseTopic describeTopicPartitionsResponseTopic( - Errors error, - String topic, - Uuid topicId, - Boolean isInternal, - List partitionData - ) { + Errors error, String topic, Uuid topicId, Boolean isInternal, List partitionData) { return new DescribeTopicPartitionsResponseTopic() - .setErrorCode(error.code()) - .setName(topic) - .setTopicId(topicId) - .setIsInternal(isInternal) - .setPartitions(partitionData); + .setErrorCode(error.code()) + .setName(topic) + .setTopicId(topicId) + .setIsInternal(isInternal) + .setPartitions(partitionData); } } From fef54507d3b29555883c8f5b835c4bc7f8a9d783 Mon Sep 17 00:00:00 2001 From: Adeshina <91054814+integral1@users.noreply.github.com> Date: Sun, 4 May 2025 19:41:58 +0100 Subject: [PATCH 02/19] update the describeTopicPartitionsResponseTopic method --- .../handlers/DescribeTopicPartitionsRequestHandler.java | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/core/src/main/java/kafka/server/handlers/DescribeTopicPartitionsRequestHandler.java b/core/src/main/java/kafka/server/handlers/DescribeTopicPartitionsRequestHandler.java index 9b05fed97eaab..970f65887a07a 100644 --- a/core/src/main/java/kafka/server/handlers/DescribeTopicPartitionsRequestHandler.java +++ b/core/src/main/java/kafka/server/handlers/DescribeTopicPartitionsRequestHandler.java @@ -211,7 +211,12 @@ private DescribeTopicPartitionsResponseData buildResponse(Stream authori * @return A DescribeTopicPartitionsResponseTopic object with the specified metadata. */ private DescribeTopicPartitionsResponseTopic describeTopicPartitionsResponseTopic( - Errors error, String topic, Uuid topicId, Boolean isInternal, List partitionData) { + Errors error, + String topic, + Uuid topicId, + Boolean isInternal, + List partitionData + ) { return new DescribeTopicPartitionsResponseTopic() .setErrorCode(error.code()) .setName(topic) From 7a1fc70a56965793e8ccc37a7c0e8a8b8a290905 Mon Sep 17 00:00:00 2001 From: Adeshina <91054814+integral1@users.noreply.github.com> Date: Sun, 4 May 2025 19:59:49 +0100 Subject: [PATCH 03/19] update on cursor validation --- .../handlers/DescribeTopicPartitionsRequestHandler.java | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/core/src/main/java/kafka/server/handlers/DescribeTopicPartitionsRequestHandler.java b/core/src/main/java/kafka/server/handlers/DescribeTopicPartitionsRequestHandler.java index 970f65887a07a..2244d92a51798 100644 --- a/core/src/main/java/kafka/server/handlers/DescribeTopicPartitionsRequestHandler.java +++ b/core/src/main/java/kafka/server/handlers/DescribeTopicPartitionsRequestHandler.java @@ -127,10 +127,9 @@ private Set getTopicsToDescribe(DescribeTopicPartitionsRequestData reque } }); - // Ensure the cursor topic is included in the list of topics - if (requestData.cursor() != null && !topics.contains(requestData.cursor().topicName())) { - throw new InvalidRequestException("DescribeTopicPartitionsRequest topic list should contain the cursor topic: " + requestData.cursor().topicName()); - } + if (cursor != null && !topics.contains(cursor.topicName())) { + // The topic in cursor must be included in the topic list if provided. + throw new InvalidRequestException("DescribeTopicPartitionsRequest topic list should contain the cursor topic: " + cursor.topicName()); } return topics; } From a9ff59eeb8e36946846dbfe5b1b1e33a912b943b Mon Sep 17 00:00:00 2001 From: Adeshina <91054814+integral1@users.noreply.github.com> Date: Sun, 4 May 2025 20:01:51 +0100 Subject: [PATCH 04/19] update on cursor validation --- .../DescribeTopicPartitionsRequestHandler.java | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/core/src/main/java/kafka/server/handlers/DescribeTopicPartitionsRequestHandler.java b/core/src/main/java/kafka/server/handlers/DescribeTopicPartitionsRequestHandler.java index 2244d92a51798..ba5997e2e4c00 100644 --- a/core/src/main/java/kafka/server/handlers/DescribeTopicPartitionsRequestHandler.java +++ b/core/src/main/java/kafka/server/handlers/DescribeTopicPartitionsRequestHandler.java @@ -78,14 +78,14 @@ public DescribeTopicPartitionsResponseData handleDescribeTopicPartitionsRequest( validateCursor(requestData.cursor(), topicsToDescribe); // Handle topics that are unauthorized for the Describe operation - Set unauthorizedTopics = new HashSet<>(); - Stream authorizedTopicsStream = filterAuthorizedTopics(abstractRequest, topicsToDescribe, unauthorizedTopics, requestData.topics().isEmpty()); + Set unauthorizedForDescribeTopicMetadata = new HashSet<>(); + Stream authorizedTopicsStream = filterAuthorizedTopics(abstractRequest, topicsToDescribe, unauthorizedForDescribeTopicMetadata, requestData.topics().isEmpty()); // Construct the response for authorized topics DescribeTopicPartitionsResponseData response = buildResponse(authorizedTopicsStream, abstractRequest, requestData); // Add unauthorized topics to the response to avoid disclosing their existence - response.topics().addAll(unauthorizedTopics); + response.topics().addAll(unauthorizedForDescribeTopicMetadata); return response; } @@ -161,18 +161,18 @@ private void validateCursor(DescribeTopicPartitionsRequestData.Cursor cursor, Se * * @param abstractRequest The incoming request. * @param topicsToDescribe The list of topics to filter. - * @param unauthorizedTopics A set to store topics that the requestor is unauthorized to describe. + * @param unauthorizedForDescribeTopicMetadata A set to store topics that the requestor is unauthorized to describe. * @param fetchAllTopics A flag indicating whether to fetch all topics or only specified ones. * @return A stream of authorized topic names. */ private Stream filterAuthorizedTopics(RequestChannel.Request abstractRequest, Set topicsToDescribe, - Set unauthorizedTopics, boolean fetchAllTopics) { + Set unauthorizedForDescribeTopicMetadata, boolean fetchAllTopics) { return topicsToDescribe.stream().sorted().filter(topicName -> { // Check authorization for each topic boolean isAuthorized = authHelper.authorize(abstractRequest.context(), DESCRIBE, TOPIC, topicName, true, true, 1); if (!fetchAllTopics && !isAuthorized) { // If unauthorized, add the topic to the unauthorized list with an empty UUID - unauthorizedTopics.add(describeTopicPartitionsResponseTopic( + unauthorizedForDescribeTopicMetadata.add(describeTopicPartitionsResponseTopic( Errors.TOPIC_AUTHORIZATION_FAILED, topicName, Uuid.ZERO_UUID, false, List.of()) ); } From 1f17a3b598485a662f993fbb4be74066a88a7cdb Mon Sep 17 00:00:00 2001 From: Adeshina <91054814+integral1@users.noreply.github.com> Date: Sun, 4 May 2025 20:05:53 +0100 Subject: [PATCH 05/19] update on cursor validation --- .../server/handlers/DescribeTopicPartitionsRequestHandler.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/java/kafka/server/handlers/DescribeTopicPartitionsRequestHandler.java b/core/src/main/java/kafka/server/handlers/DescribeTopicPartitionsRequestHandler.java index ba5997e2e4c00..0d8a1d87807be 100644 --- a/core/src/main/java/kafka/server/handlers/DescribeTopicPartitionsRequestHandler.java +++ b/core/src/main/java/kafka/server/handlers/DescribeTopicPartitionsRequestHandler.java @@ -127,7 +127,7 @@ private Set getTopicsToDescribe(DescribeTopicPartitionsRequestData reque } }); - if (cursor != null && !topics.contains(cursor.topicName())) { + if (requestData.cursor() != null && !topics.contains(requestData.cursor().topicName())) { // The topic in cursor must be included in the topic list if provided. throw new InvalidRequestException("DescribeTopicPartitionsRequest topic list should contain the cursor topic: " + cursor.topicName()); } From d8751356b60d970f4c238126a2cb5b189be9e508 Mon Sep 17 00:00:00 2001 From: Adeshina <91054814+integral1@users.noreply.github.com> Date: Sun, 4 May 2025 20:08:18 +0100 Subject: [PATCH 06/19] added final to varibles that should not change in the method --- .../DescribeTopicPartitionsRequestHandler.java | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/core/src/main/java/kafka/server/handlers/DescribeTopicPartitionsRequestHandler.java b/core/src/main/java/kafka/server/handlers/DescribeTopicPartitionsRequestHandler.java index 0d8a1d87807be..ec66661d28cd6 100644 --- a/core/src/main/java/kafka/server/handlers/DescribeTopicPartitionsRequestHandler.java +++ b/core/src/main/java/kafka/server/handlers/DescribeTopicPartitionsRequestHandler.java @@ -69,20 +69,20 @@ public DescribeTopicPartitionsRequestHandler(MetadataCache metadataCache, AuthHe * @return A DescribeTopicPartitionsResponseData containing metadata for the requested topic partitions. */ public DescribeTopicPartitionsResponseData handleDescribeTopicPartitionsRequest(RequestChannel.Request abstractRequest) { - DescribeTopicPartitionsRequestData requestData = getRequestData(abstractRequest); + final DescribeTopicPartitionsRequestData requestData = getRequestData(abstractRequest); // Get topics to describe based on request data (all topics or specific ones) - Set topicsToDescribe = getTopicsToDescribe(requestData); + final Set topicsToDescribe = getTopicsToDescribe(requestData); // Validate cursor if provided in the request validateCursor(requestData.cursor(), topicsToDescribe); // Handle topics that are unauthorized for the Describe operation - Set unauthorizedForDescribeTopicMetadata = new HashSet<>(); - Stream authorizedTopicsStream = filterAuthorizedTopics(abstractRequest, topicsToDescribe, unauthorizedForDescribeTopicMetadata, requestData.topics().isEmpty()); + final Set unauthorizedForDescribeTopicMetadata = new HashSet<>(); + final Stream authorizedTopicsStream = filterAuthorizedTopics(abstractRequest, topicsToDescribe, unauthorizedForDescribeTopicMetadata, requestData.topics().isEmpty()); // Construct the response for authorized topics - DescribeTopicPartitionsResponseData response = buildResponse(authorizedTopicsStream, abstractRequest, requestData); + final DescribeTopicPartitionsResponseData response = buildResponse(authorizedTopicsStream, abstractRequest, requestData); // Add unauthorized topics to the response to avoid disclosing their existence response.topics().addAll(unauthorizedForDescribeTopicMetadata); From 8dbc97b4f372834288c22bb02fdd827d241cf734 Mon Sep 17 00:00:00 2001 From: Adeshina <91054814+integral1@users.noreply.github.com> Date: Sun, 4 May 2025 20:19:46 +0100 Subject: [PATCH 07/19] cleanup code with stylesheet standard --- ...DescribeTopicPartitionsRequestHandler.java | 204 ++++++++++-------- 1 file changed, 113 insertions(+), 91 deletions(-) diff --git a/core/src/main/java/kafka/server/handlers/DescribeTopicPartitionsRequestHandler.java b/core/src/main/java/kafka/server/handlers/DescribeTopicPartitionsRequestHandler.java index ec66661d28cd6..48e1e4a84f774 100644 --- a/core/src/main/java/kafka/server/handlers/DescribeTopicPartitionsRequestHandler.java +++ b/core/src/main/java/kafka/server/handlers/DescribeTopicPartitionsRequestHandler.java @@ -53,10 +53,13 @@ public class DescribeTopicPartitionsRequestHandler { * Constructs a new DescribeTopicPartitionsRequestHandler. * * @param metadataCache The metadata cache used to retrieve topic information. - * @param authHelper The authentication helper used to check the authorization for topics. - * @param config The Kafka configuration. + * @param authHelper The authentication helper used to check the authorization for topics. + * @param config The Kafka configuration. */ - public DescribeTopicPartitionsRequestHandler(MetadataCache metadataCache, AuthHelper authHelper, KafkaConfig config) { + public DescribeTopicPartitionsRequestHandler( + MetadataCache metadataCache, + AuthHelper authHelper, + KafkaConfig config) { this.metadataCache = metadataCache; this.authHelper = authHelper; this.config = config; @@ -68,7 +71,8 @@ public DescribeTopicPartitionsRequestHandler(MetadataCache metadataCache, AuthHe * @param abstractRequest The request containing the metadata request for topic partitions. * @return A DescribeTopicPartitionsResponseData containing metadata for the requested topic partitions. */ - public DescribeTopicPartitionsResponseData handleDescribeTopicPartitionsRequest(RequestChannel.Request abstractRequest) { + public DescribeTopicPartitionsResponseData handleDescribeTopicPartitionsRequest( + RequestChannel.Request abstractRequest) { final DescribeTopicPartitionsRequestData requestData = getRequestData(abstractRequest); // Get topics to describe based on request data (all topics or specific ones) @@ -79,10 +83,16 @@ public DescribeTopicPartitionsResponseData handleDescribeTopicPartitionsRequest( // Handle topics that are unauthorized for the Describe operation final Set unauthorizedForDescribeTopicMetadata = new HashSet<>(); - final Stream authorizedTopicsStream = filterAuthorizedTopics(abstractRequest, topicsToDescribe, unauthorizedForDescribeTopicMetadata, requestData.topics().isEmpty()); + final Stream authorizedTopicsStream = filterAuthorizedTopics( + abstractRequest, + topicsToDescribe, + unauthorizedForDescribeTopicMetadata, + requestData.topics().isEmpty() + ); // Construct the response for authorized topics - final DescribeTopicPartitionsResponseData response = buildResponse(authorizedTopicsStream, abstractRequest, requestData); + final DescribeTopicPartitionsResponseData response = + buildResponse(authorizedTopicsStream, abstractRequest, requestData); // Add unauthorized topics to the response to avoid disclosing their existence response.topics().addAll(unauthorizedForDescribeTopicMetadata); @@ -129,98 +139,110 @@ private Set getTopicsToDescribe(DescribeTopicPartitionsRequestData reque if (requestData.cursor() != null && !topics.contains(requestData.cursor().topicName())) { // The topic in cursor must be included in the topic list if provided. - throw new InvalidRequestException("DescribeTopicPartitionsRequest topic list should contain the cursor topic: " + cursor.topicName()); + throw new InvalidRequestException("DescribeTopicPartitionsRequest topic list should contain " + + "the cursor topic: " + cursor.topicName()); + } + return topics; } - return topics; - } - /** - * Validates the cursor from the request. If the cursor is provided, it checks that the partition index is valid - * and that the topic in the cursor is included in the list of topics. - * - * @param cursor The cursor for pagination, if provided in the request. - * @param topicsToDescribe The list of topics that the requestor is authorized to describe. - */ - private void validateCursor(DescribeTopicPartitionsRequestData.Cursor cursor, Set topicsToDescribe) { - if (cursor != null) { - // Validate that the partition index in the cursor is valid - if (cursor.partitionIndex() < 0) { - throw new InvalidRequestException("DescribeTopicPartitionsRequest cursor partition must be valid: " + cursor); - } + /** + * Validates the cursor from the request. If the cursor is provided, it checks that the partition index is valid + * and that the topic in the cursor is included in the list of topics. + * + * @param cursor The cursor for pagination, if provided in the request. + * @param topicsToDescribe The list of topics that the requestor is authorized to describe. + */ + private void validateCursor (DescribeTopicPartitionsRequestData.Cursor cursor, Set < String > topicsToDescribe){ + if (cursor != null) { + // Validate that the partition index in the cursor is valid + if (cursor.partitionIndex() < 0) { + throw new InvalidRequestException("DescribeTopicPartitionsRequest cursor partition " + + "must be valid: " + cursor); + } - // Ensure the cursor topic is included in the list of topics - if (!topicsToDescribe.contains(cursor.topicName())) { - throw new InvalidRequestException("DescribeTopicPartitionsRequest topic list should contain the cursor topic: " + cursor.topicName()); + // Ensure the cursor topic is included in the list of topics + if (!topicsToDescribe.contains(cursor.topicName())) { + throw new InvalidRequestException("DescribeTopicPartitionsRequest topic list should contain the " + + "cursor topic: " + cursor.topicName()); + } } } - } - /** - * Filters the topics based on authorization. It ensures that only topics the requestor is authorized to describe are included. - * Unauthorized topics are added to the unauthorized topics list. - * - * @param abstractRequest The incoming request. - * @param topicsToDescribe The list of topics to filter. - * @param unauthorizedForDescribeTopicMetadata A set to store topics that the requestor is unauthorized to describe. - * @param fetchAllTopics A flag indicating whether to fetch all topics or only specified ones. - * @return A stream of authorized topic names. - */ - private Stream filterAuthorizedTopics(RequestChannel.Request abstractRequest, Set topicsToDescribe, - Set unauthorizedForDescribeTopicMetadata, boolean fetchAllTopics) { - return topicsToDescribe.stream().sorted().filter(topicName -> { - // Check authorization for each topic - boolean isAuthorized = authHelper.authorize(abstractRequest.context(), DESCRIBE, TOPIC, topicName, true, true, 1); - if (!fetchAllTopics && !isAuthorized) { - // If unauthorized, add the topic to the unauthorized list with an empty UUID - unauthorizedForDescribeTopicMetadata.add(describeTopicPartitionsResponseTopic( - Errors.TOPIC_AUTHORIZATION_FAILED, topicName, Uuid.ZERO_UUID, false, List.of()) + /** + * Filters the topics based on authorization. It ensures that only topics the requestor is authorized to describe are included. + * Unauthorized topics are added to the unauthorized topics list. + * + * @param abstractRequest The incoming request. + * @param topicsToDescribe The list of topics to filter. + * @param unauthorizedForDescribeTopicMetadata A set to store topics that the requestor is unauthorized to describe. + * @param fetchAllTopics A flag indicating whether to fetch all topics or only specified ones. + * @return A stream of authorized topic names. + */ + private Stream filterAuthorizedTopics ( + RequestChannel.Request abstractRequest, + Set < String > topicsToDescribe, + Set < DescribeTopicPartitionsResponseTopic > unauthorizedForDescribeTopicMetadata, + boolean fetchAllTopics) + { + return topicsToDescribe.stream().sorted().filter(topicName -> { + // Check authorization for each topic + boolean isAuthorized = authHelper.authorize(abstractRequest.context(), + DESCRIBE, TOPIC, topicName, true, true, 1 ); - } - return isAuthorized; - }); - } + if (!fetchAllTopics && !isAuthorized) { + // If unauthorized, add the topic to the unauthorized list with an empty UUID + unauthorizedForDescribeTopicMetadata.add(describeTopicPartitionsResponseTopic( + Errors.TOPIC_AUTHORIZATION_FAILED, topicName, Uuid.ZERO_UUID, false, List.of()) + ); + } + return isAuthorized; + }); + } - /** - * Constructs the response data based on authorized topics. - * - * @param authorizedTopicsStream A stream of authorized topic names. - * @param abstractRequest The incoming request. - * @param requestData The request data containing the cursor and partition limits. - * @return The constructed response data with metadata for the authorized topics. - */ - private DescribeTopicPartitionsResponseData buildResponse(Stream authorizedTopicsStream, RequestChannel.Request abstractRequest, DescribeTopicPartitionsRequestData requestData) { - return metadataCache.describeTopicResponse( - authorizedTopicsStream.iterator(), - abstractRequest.context().listenerName, - topicName -> topicName.equals(requestData.cursor() != null ? requestData.cursor().topicName() : "") ? requestData.cursor().partitionIndex() : 0, - Math.max(Math.min(config.maxRequestPartitionSizeLimit(), requestData.responsePartitionLimit()), 1), - requestData.topics().isEmpty() - ); - } + /** + * Constructs the response data based on authorized topics. + * + * @param authorizedTopicsStream A stream of authorized topic names. + * @param abstractRequest The incoming request. + * @param requestData The request data containing the cursor and partition limits. + * @return The constructed response data with metadata for the authorized topics. + */ + private DescribeTopicPartitionsResponseData buildResponse + (Stream < String > authorizedTopicsStream, RequestChannel.Request + abstractRequest, DescribeTopicPartitionsRequestData requestData){ + return metadataCache.describeTopicResponse( + authorizedTopicsStream.iterator(), + abstractRequest.context().listenerName, + topicName -> topicName.equals(requestData.cursor() != null + ? requestData.cursor().topicName() : "") ? requestData.cursor().partitionIndex() : 0, + Math.max(Math.min(config.maxRequestPartitionSizeLimit(), requestData.responsePartitionLimit()), 1), + requestData.topics().isEmpty() + ); + } - /** - * Constructs a DescribeTopicPartitionsResponseTopic object, which contains metadata about a single topic, - * including error codes, topic ID, partition data, and whether the topic is internal. - * - * @param error The error that occurred while accessing the topic. - * @param topic The name of the topic. - * @param topicId The unique identifier for the topic. - * @param isInternal Whether the topic is internal or not. - * @param partitionData The partition data associated with the topic. - * @return A DescribeTopicPartitionsResponseTopic object with the specified metadata. - */ - private DescribeTopicPartitionsResponseTopic describeTopicPartitionsResponseTopic( - Errors error, - String topic, - Uuid topicId, - Boolean isInternal, - List partitionData - ) { - return new DescribeTopicPartitionsResponseTopic() - .setErrorCode(error.code()) - .setName(topic) - .setTopicId(topicId) - .setIsInternal(isInternal) - .setPartitions(partitionData); + /** + * Constructs a DescribeTopicPartitionsResponseTopic object, which contains metadata about a single topic, + * including error codes, topic ID, partition data, and whether the topic is internal. + * + * @param error The error that occurred while accessing the topic. + * @param topic The name of the topic. + * @param topicId The unique identifier for the topic. + * @param isInternal Whether the topic is internal or not. + * @param partitionData The partition data associated with the topic. + * @return A DescribeTopicPartitionsResponseTopic object with the specified metadata. + */ + private DescribeTopicPartitionsResponseTopic describeTopicPartitionsResponseTopic ( + Errors error, + String topic, + Uuid topicId, + Boolean isInternal, + List < DescribeTopicPartitionsResponsePartition > partitionData + ){ + return new DescribeTopicPartitionsResponseTopic() + .setErrorCode(error.code()) + .setName(topic) + .setTopicId(topicId) + .setIsInternal(isInternal) + .setPartitions(partitionData); + } } -} From e6f5f3ca0a2dacc48c43d7695a289b78457d7750 Mon Sep 17 00:00:00 2001 From: Adeshina <91054814+integral1@users.noreply.github.com> Date: Sun, 4 May 2025 20:25:02 +0100 Subject: [PATCH 08/19] cleanup code with stylesheet standard --- .../handlers/DescribeTopicPartitionsRequestHandler.java | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/core/src/main/java/kafka/server/handlers/DescribeTopicPartitionsRequestHandler.java b/core/src/main/java/kafka/server/handlers/DescribeTopicPartitionsRequestHandler.java index 48e1e4a84f774..8abcf55473b9d 100644 --- a/core/src/main/java/kafka/server/handlers/DescribeTopicPartitionsRequestHandler.java +++ b/core/src/main/java/kafka/server/handlers/DescribeTopicPartitionsRequestHandler.java @@ -119,7 +119,8 @@ private DescribeTopicPartitionsRequestData getRequestData(RequestChannel.Request private Set getTopicsToDescribe(DescribeTopicPartitionsRequestData requestData) { Set topics = new HashSet<>(); boolean fetchAllTopics = requestData.topics().isEmpty(); - String cursorTopicName = requestData.cursor() != null ? requestData.cursor().topicName() : ""; + DescribeTopicPartitionsRequestData.Cursor cursor = request.cursor(); + String cursorTopicName = requestData.cursor() != null ? cursor.topicName() : ""; // If no topics are specified, fetch all topics that come after the cursor topic if (fetchAllTopics) { @@ -137,7 +138,7 @@ private Set getTopicsToDescribe(DescribeTopicPartitionsRequestData reque } }); - if (requestData.cursor() != null && !topics.contains(requestData.cursor().topicName())) { + if (cursor != null && !topics.contains(cursor.topicName())) { // The topic in cursor must be included in the topic list if provided. throw new InvalidRequestException("DescribeTopicPartitionsRequest topic list should contain " + "the cursor topic: " + cursor.topicName()); From 280cb54f171b40bc1baffd2953e3c70adb8dd634 Mon Sep 17 00:00:00 2001 From: Adeshina <91054814+integral1@users.noreply.github.com> Date: Sun, 4 May 2025 20:27:47 +0100 Subject: [PATCH 09/19] cleanup code with stylesheet standard --- .../server/handlers/DescribeTopicPartitionsRequestHandler.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/core/src/main/java/kafka/server/handlers/DescribeTopicPartitionsRequestHandler.java b/core/src/main/java/kafka/server/handlers/DescribeTopicPartitionsRequestHandler.java index 8abcf55473b9d..ca3fc70e5d591 100644 --- a/core/src/main/java/kafka/server/handlers/DescribeTopicPartitionsRequestHandler.java +++ b/core/src/main/java/kafka/server/handlers/DescribeTopicPartitionsRequestHandler.java @@ -163,8 +163,7 @@ private void validateCursor (DescribeTopicPartitionsRequestData.Cursor cursor, S // Ensure the cursor topic is included in the list of topics if (!topicsToDescribe.contains(cursor.topicName())) { - throw new InvalidRequestException("DescribeTopicPartitionsRequest topic list should contain the " + - "cursor topic: " + cursor.topicName()); + throw new InvalidRequestException("DescribeTopicPartitionsRequest topic list should contain the cursor topic: " + cursor.topicName()); } } } From 0606a8f5f552146da821b90528bc3f13fb51292b Mon Sep 17 00:00:00 2001 From: Adeshina <91054814+integral1@users.noreply.github.com> Date: Sun, 4 May 2025 20:29:15 +0100 Subject: [PATCH 10/19] cleanup code with stylesheet standard --- .../server/handlers/DescribeTopicPartitionsRequestHandler.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/core/src/main/java/kafka/server/handlers/DescribeTopicPartitionsRequestHandler.java b/core/src/main/java/kafka/server/handlers/DescribeTopicPartitionsRequestHandler.java index ca3fc70e5d591..53c3d6bb29878 100644 --- a/core/src/main/java/kafka/server/handlers/DescribeTopicPartitionsRequestHandler.java +++ b/core/src/main/java/kafka/server/handlers/DescribeTopicPartitionsRequestHandler.java @@ -140,8 +140,7 @@ private Set getTopicsToDescribe(DescribeTopicPartitionsRequestData reque if (cursor != null && !topics.contains(cursor.topicName())) { // The topic in cursor must be included in the topic list if provided. - throw new InvalidRequestException("DescribeTopicPartitionsRequest topic list should contain " + - "the cursor topic: " + cursor.topicName()); + throw new InvalidRequestException("DescribeTopicPartitionsRequest topic list should contain the cursor topic: " + cursor.topicName()); } return topics; } From e19f9f8e1343cc5e0f92879d5197228f71679ce3 Mon Sep 17 00:00:00 2001 From: Adeshina <91054814+integral1@users.noreply.github.com> Date: Sun, 4 May 2025 20:31:08 +0100 Subject: [PATCH 11/19] cleanup code with stylesheet standard --- .../handlers/DescribeTopicPartitionsRequestHandler.java | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/core/src/main/java/kafka/server/handlers/DescribeTopicPartitionsRequestHandler.java b/core/src/main/java/kafka/server/handlers/DescribeTopicPartitionsRequestHandler.java index 53c3d6bb29878..507648f430680 100644 --- a/core/src/main/java/kafka/server/handlers/DescribeTopicPartitionsRequestHandler.java +++ b/core/src/main/java/kafka/server/handlers/DescribeTopicPartitionsRequestHandler.java @@ -206,9 +206,10 @@ private Stream filterAuthorizedTopics ( * @param requestData The request data containing the cursor and partition limits. * @return The constructed response data with metadata for the authorized topics. */ - private DescribeTopicPartitionsResponseData buildResponse - (Stream < String > authorizedTopicsStream, RequestChannel.Request - abstractRequest, DescribeTopicPartitionsRequestData requestData){ + private DescribeTopicPartitionsResponseData buildResponse ( + Stream < String > authorizedTopicsStream, + RequestChannel.Request abstractRequest, + DescribeTopicPartitionsRequestData requestData){ return metadataCache.describeTopicResponse( authorizedTopicsStream.iterator(), abstractRequest.context().listenerName, From af858ddf18cdcd0fb2d71e1d9b15a6657944387f Mon Sep 17 00:00:00 2001 From: Adeshina <91054814+integral1@users.noreply.github.com> Date: Sun, 4 May 2025 20:32:55 +0100 Subject: [PATCH 12/19] cleanup code with stylesheet standard --- .../server/handlers/DescribeTopicPartitionsRequestHandler.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/core/src/main/java/kafka/server/handlers/DescribeTopicPartitionsRequestHandler.java b/core/src/main/java/kafka/server/handlers/DescribeTopicPartitionsRequestHandler.java index 507648f430680..f413415122c8c 100644 --- a/core/src/main/java/kafka/server/handlers/DescribeTopicPartitionsRequestHandler.java +++ b/core/src/main/java/kafka/server/handlers/DescribeTopicPartitionsRequestHandler.java @@ -156,8 +156,7 @@ private void validateCursor (DescribeTopicPartitionsRequestData.Cursor cursor, S if (cursor != null) { // Validate that the partition index in the cursor is valid if (cursor.partitionIndex() < 0) { - throw new InvalidRequestException("DescribeTopicPartitionsRequest cursor partition " + - "must be valid: " + cursor); + throw new InvalidRequestException("DescribeTopicPartitionsRequest cursor partition must be valid: " + cursor); } // Ensure the cursor topic is included in the list of topics From 668acf4b826b8bb6a1b1ee36c7f9b5682fd3d5e9 Mon Sep 17 00:00:00 2001 From: Adeshina <91054814+integral1@users.noreply.github.com> Date: Sun, 4 May 2025 20:41:18 +0100 Subject: [PATCH 13/19] added final to varibles that should not change in the method --- .../server/handlers/DescribeTopicPartitionsRequestHandler.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/core/src/main/java/kafka/server/handlers/DescribeTopicPartitionsRequestHandler.java b/core/src/main/java/kafka/server/handlers/DescribeTopicPartitionsRequestHandler.java index f413415122c8c..5795b289cdc63 100644 --- a/core/src/main/java/kafka/server/handlers/DescribeTopicPartitionsRequestHandler.java +++ b/core/src/main/java/kafka/server/handlers/DescribeTopicPartitionsRequestHandler.java @@ -180,8 +180,7 @@ private Stream filterAuthorizedTopics ( RequestChannel.Request abstractRequest, Set < String > topicsToDescribe, Set < DescribeTopicPartitionsResponseTopic > unauthorizedForDescribeTopicMetadata, - boolean fetchAllTopics) - { + boolean fetchAllTopics){ return topicsToDescribe.stream().sorted().filter(topicName -> { // Check authorization for each topic boolean isAuthorized = authHelper.authorize(abstractRequest.context(), From 6e9c7b3afffd242c18b69c61ddca4c3914f762de Mon Sep 17 00:00:00 2001 From: Adeshina <91054814+integral1@users.noreply.github.com> Date: Sun, 4 May 2025 21:43:23 +0100 Subject: [PATCH 14/19] cleanup code with stylesheet standard --- ...DescribeTopicPartitionsRequestHandler.java | 28 +++++++++---------- 1 file changed, 13 insertions(+), 15 deletions(-) diff --git a/core/src/main/java/kafka/server/handlers/DescribeTopicPartitionsRequestHandler.java b/core/src/main/java/kafka/server/handlers/DescribeTopicPartitionsRequestHandler.java index 5795b289cdc63..e56326a0ed0ce 100644 --- a/core/src/main/java/kafka/server/handlers/DescribeTopicPartitionsRequestHandler.java +++ b/core/src/main/java/kafka/server/handlers/DescribeTopicPartitionsRequestHandler.java @@ -57,9 +57,9 @@ public class DescribeTopicPartitionsRequestHandler { * @param config The Kafka configuration. */ public DescribeTopicPartitionsRequestHandler( - MetadataCache metadataCache, - AuthHelper authHelper, - KafkaConfig config) { + final MetadataCache metadataCache, + final AuthHelper authHelper, + final KafkaConfig config) { this.metadataCache = metadataCache; this.authHelper = authHelper; this.config = config; @@ -130,7 +130,6 @@ private Set getTopicsToDescribe(DescribeTopicPartitionsRequestData reque } }); } else { - // Add the requested topics to the set, ensuring they come after the cursor topic requestData.topics().forEach(topic -> { String topicName = topic.name(); if (topicName.compareTo(cursorTopicName) >= 0) { @@ -180,10 +179,10 @@ private Stream filterAuthorizedTopics ( RequestChannel.Request abstractRequest, Set < String > topicsToDescribe, Set < DescribeTopicPartitionsResponseTopic > unauthorizedForDescribeTopicMetadata, - boolean fetchAllTopics){ + boolean fetchAllTopics){ return topicsToDescribe.stream().sorted().filter(topicName -> { // Check authorization for each topic - boolean isAuthorized = authHelper.authorize(abstractRequest.context(), + final boolean isAuthorized = authHelper.authorize(abstractRequest.context(), DESCRIBE, TOPIC, topicName, true, true, 1 ); if (!fetchAllTopics && !isAuthorized) { @@ -205,9 +204,9 @@ private Stream filterAuthorizedTopics ( * @return The constructed response data with metadata for the authorized topics. */ private DescribeTopicPartitionsResponseData buildResponse ( - Stream < String > authorizedTopicsStream, - RequestChannel.Request abstractRequest, - DescribeTopicPartitionsRequestData requestData){ + final Stream authorizedTopicsStream, + final RequestChannel.Request abstractRequest, + final DescribeTopicPartitionsRequestData requestData){ return metadataCache.describeTopicResponse( authorizedTopicsStream.iterator(), abstractRequest.context().listenerName, @@ -230,12 +229,11 @@ private DescribeTopicPartitionsResponseData buildResponse ( * @return A DescribeTopicPartitionsResponseTopic object with the specified metadata. */ private DescribeTopicPartitionsResponseTopic describeTopicPartitionsResponseTopic ( - Errors error, - String topic, - Uuid topicId, - Boolean isInternal, - List < DescribeTopicPartitionsResponsePartition > partitionData - ){ + final Errors error, + final String topic, + final Uuid topicId, + final Boolean isInternal, + final List partitionData){ return new DescribeTopicPartitionsResponseTopic() .setErrorCode(error.code()) .setName(topic) From 2b4bbfe5ffcc950334e13981b22586284158534b Mon Sep 17 00:00:00 2001 From: Adeshina <91054814+integral1@users.noreply.github.com> Date: Sun, 4 May 2025 21:46:47 +0100 Subject: [PATCH 15/19] cleanup code with stylesheet standard --- ...DescribeTopicPartitionsRequestHandler.java | 26 ++++++++++++------- 1 file changed, 16 insertions(+), 10 deletions(-) diff --git a/core/src/main/java/kafka/server/handlers/DescribeTopicPartitionsRequestHandler.java b/core/src/main/java/kafka/server/handlers/DescribeTopicPartitionsRequestHandler.java index e56326a0ed0ce..847b8d4aed781 100644 --- a/core/src/main/java/kafka/server/handlers/DescribeTopicPartitionsRequestHandler.java +++ b/core/src/main/java/kafka/server/handlers/DescribeTopicPartitionsRequestHandler.java @@ -72,7 +72,7 @@ public DescribeTopicPartitionsRequestHandler( * @return A DescribeTopicPartitionsResponseData containing metadata for the requested topic partitions. */ public DescribeTopicPartitionsResponseData handleDescribeTopicPartitionsRequest( - RequestChannel.Request abstractRequest) { + final RequestChannel.Request abstractRequest) { final DescribeTopicPartitionsRequestData requestData = getRequestData(abstractRequest); // Get topics to describe based on request data (all topics or specific ones) @@ -105,7 +105,7 @@ public DescribeTopicPartitionsResponseData handleDescribeTopicPartitionsRequest( * @param abstractRequest The incoming request. * @return The request data for the DescribeTopicPartitionsRequest. */ - private DescribeTopicPartitionsRequestData getRequestData(RequestChannel.Request abstractRequest) { + private DescribeTopicPartitionsRequestData getRequestData(final RequestChannel.Request abstractRequest) { return ((DescribeTopicPartitionsRequest) abstractRequest.loggableRequest()).data(); } @@ -116,7 +116,7 @@ private DescribeTopicPartitionsRequestData getRequestData(RequestChannel.Request * @param requestData The request data containing the list of topics. * @return A set of topics to describe. */ - private Set getTopicsToDescribe(DescribeTopicPartitionsRequestData requestData) { + private Set getTopicsToDescribe(final DescribeTopicPartitionsRequestData requestData) { Set topics = new HashSet<>(); boolean fetchAllTopics = requestData.topics().isEmpty(); DescribeTopicPartitionsRequestData.Cursor cursor = request.cursor(); @@ -151,7 +151,10 @@ private Set getTopicsToDescribe(DescribeTopicPartitionsRequestData reque * @param cursor The cursor for pagination, if provided in the request. * @param topicsToDescribe The list of topics that the requestor is authorized to describe. */ - private void validateCursor (DescribeTopicPartitionsRequestData.Cursor cursor, Set < String > topicsToDescribe){ + private void validateCursor ( + final DescribeTopicPartitionsRequestData.Cursor cursor, + final Set topicsToDescribe + ){ if (cursor != null) { // Validate that the partition index in the cursor is valid if (cursor.partitionIndex() < 0) { @@ -176,10 +179,11 @@ private void validateCursor (DescribeTopicPartitionsRequestData.Cursor cursor, S * @return A stream of authorized topic names. */ private Stream filterAuthorizedTopics ( - RequestChannel.Request abstractRequest, - Set < String > topicsToDescribe, - Set < DescribeTopicPartitionsResponseTopic > unauthorizedForDescribeTopicMetadata, - boolean fetchAllTopics){ + final RequestChannel.Request abstractRequest, + final Set topicsToDescribe, + final Set unauthorizedForDescribeTopicMetadata, + final boolean fetchAllTopics + ){ return topicsToDescribe.stream().sorted().filter(topicName -> { // Check authorization for each topic final boolean isAuthorized = authHelper.authorize(abstractRequest.context(), @@ -206,7 +210,8 @@ private Stream filterAuthorizedTopics ( private DescribeTopicPartitionsResponseData buildResponse ( final Stream authorizedTopicsStream, final RequestChannel.Request abstractRequest, - final DescribeTopicPartitionsRequestData requestData){ + final DescribeTopicPartitionsRequestData requestData + ){ return metadataCache.describeTopicResponse( authorizedTopicsStream.iterator(), abstractRequest.context().listenerName, @@ -233,7 +238,8 @@ private DescribeTopicPartitionsResponseTopic describeTopicPartitionsResponseTopi final String topic, final Uuid topicId, final Boolean isInternal, - final List partitionData){ + final List partitionData + ){ return new DescribeTopicPartitionsResponseTopic() .setErrorCode(error.code()) .setName(topic) From 6ac79179412273b4481bad723918cca975f6db04 Mon Sep 17 00:00:00 2001 From: Adeshina <91054814+integral1@users.noreply.github.com> Date: Sun, 4 May 2025 21:54:30 +0100 Subject: [PATCH 16/19] update on vars and method parameters --- ...DescribeTopicPartitionsRequestHandler.java | 204 +++++++++--------- 1 file changed, 102 insertions(+), 102 deletions(-) diff --git a/core/src/main/java/kafka/server/handlers/DescribeTopicPartitionsRequestHandler.java b/core/src/main/java/kafka/server/handlers/DescribeTopicPartitionsRequestHandler.java index 847b8d4aed781..9df181588d79d 100644 --- a/core/src/main/java/kafka/server/handlers/DescribeTopicPartitionsRequestHandler.java +++ b/core/src/main/java/kafka/server/handlers/DescribeTopicPartitionsRequestHandler.java @@ -28,7 +28,6 @@ import org.apache.kafka.common.message.DescribeTopicPartitionsResponseData.DescribeTopicPartitionsResponseTopic; import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.requests.DescribeTopicPartitionsRequest; -import org.apache.kafka.common.resource.Resource; import org.apache.kafka.metadata.MetadataCache; import java.util.HashSet; @@ -117,10 +116,10 @@ private DescribeTopicPartitionsRequestData getRequestData(final RequestChannel.R * @return A set of topics to describe. */ private Set getTopicsToDescribe(final DescribeTopicPartitionsRequestData requestData) { - Set topics = new HashSet<>(); - boolean fetchAllTopics = requestData.topics().isEmpty(); - DescribeTopicPartitionsRequestData.Cursor cursor = request.cursor(); - String cursorTopicName = requestData.cursor() != null ? cursor.topicName() : ""; + final Set topics = new HashSet<>(); + final boolean fetchAllTopics = requestData.topics().isEmpty(); + final DescribeTopicPartitionsRequestData.Cursor cursor = request.cursor(); + final String cursorTopicName = requestData.cursor() != null ? cursor.topicName() : ""; // If no topics are specified, fetch all topics that come after the cursor topic if (fetchAllTopics) { @@ -141,110 +140,111 @@ private Set getTopicsToDescribe(final DescribeTopicPartitionsRequestData // The topic in cursor must be included in the topic list if provided. throw new InvalidRequestException("DescribeTopicPartitionsRequest topic list should contain the cursor topic: " + cursor.topicName()); } - return topics; } + return topics; + } - /** - * Validates the cursor from the request. If the cursor is provided, it checks that the partition index is valid - * and that the topic in the cursor is included in the list of topics. - * - * @param cursor The cursor for pagination, if provided in the request. - * @param topicsToDescribe The list of topics that the requestor is authorized to describe. - */ - private void validateCursor ( - final DescribeTopicPartitionsRequestData.Cursor cursor, - final Set topicsToDescribe - ){ - if (cursor != null) { - // Validate that the partition index in the cursor is valid - if (cursor.partitionIndex() < 0) { - throw new InvalidRequestException("DescribeTopicPartitionsRequest cursor partition must be valid: " + cursor); - } + /** + * Validates the cursor from the request. If the cursor is provided, it checks that the partition index is valid + * and that the topic in the cursor is included in the list of topics. + * + * @param cursor The cursor for pagination, if provided in the request. + * @param topicsToDescribe The list of topics that the requestor is authorized to describe. + */ + private void validateCursor( + final DescribeTopicPartitionsRequestData.Cursor cursor, + final Set topicsToDescribe + ) { + if (cursor != null) { + // Validate that the partition index in the cursor is valid + if (cursor.partitionIndex() < 0) { + throw new InvalidRequestException("DescribeTopicPartitionsRequest cursor partition must be valid: " + cursor); + } - // Ensure the cursor topic is included in the list of topics - if (!topicsToDescribe.contains(cursor.topicName())) { - throw new InvalidRequestException("DescribeTopicPartitionsRequest topic list should contain the cursor topic: " + cursor.topicName()); - } + // Ensure the cursor topic is included in the list of topics + if (!topicsToDescribe.contains(cursor.topicName())) { + throw new InvalidRequestException("DescribeTopicPartitionsRequest topic list should contain the cursor topic: " + cursor.topicName()); } } + } - /** - * Filters the topics based on authorization. It ensures that only topics the requestor is authorized to describe are included. - * Unauthorized topics are added to the unauthorized topics list. - * - * @param abstractRequest The incoming request. - * @param topicsToDescribe The list of topics to filter. - * @param unauthorizedForDescribeTopicMetadata A set to store topics that the requestor is unauthorized to describe. - * @param fetchAllTopics A flag indicating whether to fetch all topics or only specified ones. - * @return A stream of authorized topic names. - */ - private Stream filterAuthorizedTopics ( - final RequestChannel.Request abstractRequest, - final Set topicsToDescribe, - final Set unauthorizedForDescribeTopicMetadata, - final boolean fetchAllTopics - ){ - return topicsToDescribe.stream().sorted().filter(topicName -> { - // Check authorization for each topic - final boolean isAuthorized = authHelper.authorize(abstractRequest.context(), - DESCRIBE, TOPIC, topicName, true, true, 1 + /** + * Filters the topics based on authorization. It ensures that only topics the requestor is authorized to describe are included. + * Unauthorized topics are added to the unauthorized topics list. + * + * @param abstractRequest The incoming request. + * @param topicsToDescribe The list of topics to filter. + * @param unauthorizedForDescribeTopicMetadata A set to store topics that the requestor is unauthorized to describe. + * @param fetchAllTopics A flag indicating whether to fetch all topics or only specified ones. + * @return A stream of authorized topic names. + */ + private Stream filterAuthorizedTopics( + final RequestChannel.Request abstractRequest, + final Set topicsToDescribe, + final Set unauthorizedForDescribeTopicMetadata, + final boolean fetchAllTopics + ) { + return topicsToDescribe.stream().sorted().filter(topicName -> { + // Check authorization for each topic + final boolean isAuthorized = authHelper.authorize(abstractRequest.context(), + DESCRIBE, TOPIC, topicName, true, true, 1 + ); + if (!fetchAllTopics && !isAuthorized) { + // If unauthorized, add the topic to the unauthorized list with an empty UUID + unauthorizedForDescribeTopicMetadata.add(describeTopicPartitionsResponseTopic( + Errors.TOPIC_AUTHORIZATION_FAILED, topicName, Uuid.ZERO_UUID, false, List.of()) ); - if (!fetchAllTopics && !isAuthorized) { - // If unauthorized, add the topic to the unauthorized list with an empty UUID - unauthorizedForDescribeTopicMetadata.add(describeTopicPartitionsResponseTopic( - Errors.TOPIC_AUTHORIZATION_FAILED, topicName, Uuid.ZERO_UUID, false, List.of()) - ); - } - return isAuthorized; - }); - } + } + return isAuthorized; + }); + } - /** - * Constructs the response data based on authorized topics. - * - * @param authorizedTopicsStream A stream of authorized topic names. - * @param abstractRequest The incoming request. - * @param requestData The request data containing the cursor and partition limits. - * @return The constructed response data with metadata for the authorized topics. - */ - private DescribeTopicPartitionsResponseData buildResponse ( - final Stream authorizedTopicsStream, - final RequestChannel.Request abstractRequest, - final DescribeTopicPartitionsRequestData requestData - ){ - return metadataCache.describeTopicResponse( - authorizedTopicsStream.iterator(), - abstractRequest.context().listenerName, - topicName -> topicName.equals(requestData.cursor() != null - ? requestData.cursor().topicName() : "") ? requestData.cursor().partitionIndex() : 0, - Math.max(Math.min(config.maxRequestPartitionSizeLimit(), requestData.responsePartitionLimit()), 1), - requestData.topics().isEmpty() - ); - } + /** + * Constructs the response data based on authorized topics. + * + * @param authorizedTopicsStream A stream of authorized topic names. + * @param abstractRequest The incoming request. + * @param requestData The request data containing the cursor and partition limits. + * @return The constructed response data with metadata for the authorized topics. + */ + private DescribeTopicPartitionsResponseData buildResponse( + final Stream authorizedTopicsStream, + final RequestChannel.Request abstractRequest, + final DescribeTopicPartitionsRequestData requestData + ) { + return metadataCache.describeTopicResponse( + authorizedTopicsStream.iterator(), + abstractRequest.context().listenerName, + topicName -> topicName.equals(requestData.cursor() != null + ? requestData.cursor().topicName() : "") ? requestData.cursor().partitionIndex() : 0, + Math.max(Math.min(config.maxRequestPartitionSizeLimit(), requestData.responsePartitionLimit()), 1), + requestData.topics().isEmpty() + ); + } - /** - * Constructs a DescribeTopicPartitionsResponseTopic object, which contains metadata about a single topic, - * including error codes, topic ID, partition data, and whether the topic is internal. - * - * @param error The error that occurred while accessing the topic. - * @param topic The name of the topic. - * @param topicId The unique identifier for the topic. - * @param isInternal Whether the topic is internal or not. - * @param partitionData The partition data associated with the topic. - * @return A DescribeTopicPartitionsResponseTopic object with the specified metadata. - */ - private DescribeTopicPartitionsResponseTopic describeTopicPartitionsResponseTopic ( - final Errors error, - final String topic, - final Uuid topicId, - final Boolean isInternal, - final List partitionData - ){ - return new DescribeTopicPartitionsResponseTopic() - .setErrorCode(error.code()) - .setName(topic) - .setTopicId(topicId) - .setIsInternal(isInternal) - .setPartitions(partitionData); - } + /** + * Constructs a DescribeTopicPartitionsResponseTopic object, which contains metadata about a single topic, + * including error codes, topic ID, partition data, and whether the topic is internal. + * + * @param error The error that occurred while accessing the topic. + * @param topic The name of the topic. + * @param topicId The unique identifier for the topic. + * @param isInternal Whether the topic is internal or not. + * @param partitionData The partition data associated with the topic. + * @return A DescribeTopicPartitionsResponseTopic object with the specified metadata. + */ + private DescribeTopicPartitionsResponseTopic describeTopicPartitionsResponseTopic( + final Errors error, + final String topic, + final Uuid topicId, + final Boolean isInternal, + final List partitionData + ) { + return new DescribeTopicPartitionsResponseTopic() + .setErrorCode(error.code()) + .setName(topic) + .setTopicId(topicId) + .setIsInternal(isInternal) + .setPartitions(partitionData); } +} From b485feeda2d9bd89efbd4327acc5a7293c05d071 Mon Sep 17 00:00:00 2001 From: Adeshina <91054814+integral1@users.noreply.github.com> Date: Sun, 4 May 2025 21:58:26 +0100 Subject: [PATCH 17/19] update on vars and method parameters --- .../handlers/DescribeTopicPartitionsRequestHandler.java | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/core/src/main/java/kafka/server/handlers/DescribeTopicPartitionsRequestHandler.java b/core/src/main/java/kafka/server/handlers/DescribeTopicPartitionsRequestHandler.java index 9df181588d79d..a431d7afffb54 100644 --- a/core/src/main/java/kafka/server/handlers/DescribeTopicPartitionsRequestHandler.java +++ b/core/src/main/java/kafka/server/handlers/DescribeTopicPartitionsRequestHandler.java @@ -215,9 +215,8 @@ private DescribeTopicPartitionsResponseData buildResponse( return metadataCache.describeTopicResponse( authorizedTopicsStream.iterator(), abstractRequest.context().listenerName, - topicName -> topicName.equals(requestData.cursor() != null - ? requestData.cursor().topicName() : "") ? requestData.cursor().partitionIndex() : 0, - Math.max(Math.min(config.maxRequestPartitionSizeLimit(), requestData.responsePartitionLimit()), 1), + (String topicName) -> topicName.equals(cursorTopicName) ? cursor.partitionIndex() : 0, + Math.max(Math.min(config.maxRequestPartitionSizeLimit(), request.responsePartitionLimit()), 1), requestData.topics().isEmpty() ); } From 9a4435090615f70b77ae8eb8a4300b15e5c43465 Mon Sep 17 00:00:00 2001 From: Adeshina <91054814+integral1@users.noreply.github.com> Date: Sun, 4 May 2025 22:05:05 +0100 Subject: [PATCH 18/19] update on vars and method parameters --- .../handlers/DescribeTopicPartitionsRequestHandler.java | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/core/src/main/java/kafka/server/handlers/DescribeTopicPartitionsRequestHandler.java b/core/src/main/java/kafka/server/handlers/DescribeTopicPartitionsRequestHandler.java index a431d7afffb54..aa5eb7b743232 100644 --- a/core/src/main/java/kafka/server/handlers/DescribeTopicPartitionsRequestHandler.java +++ b/core/src/main/java/kafka/server/handlers/DescribeTopicPartitionsRequestHandler.java @@ -75,7 +75,8 @@ public DescribeTopicPartitionsResponseData handleDescribeTopicPartitionsRequest( final DescribeTopicPartitionsRequestData requestData = getRequestData(abstractRequest); // Get topics to describe based on request data (all topics or specific ones) - final Set topicsToDescribe = getTopicsToDescribe(requestData); + final String cursorTopicName = requestData.cursor() != null ? cursor.topicName() : ""; + final Set topicsToDescribe = getTopicsToDescribe(requestData, cursorTopicName); // Validate cursor if provided in the request validateCursor(requestData.cursor(), topicsToDescribe); @@ -115,11 +116,13 @@ private DescribeTopicPartitionsRequestData getRequestData(final RequestChannel.R * @param requestData The request data containing the list of topics. * @return A set of topics to describe. */ - private Set getTopicsToDescribe(final DescribeTopicPartitionsRequestData requestData) { + private Set getTopicsToDescribe( + final DescribeTopicPartitionsRequestData requestData, + final String cursorTopicName + ) { final Set topics = new HashSet<>(); final boolean fetchAllTopics = requestData.topics().isEmpty(); final DescribeTopicPartitionsRequestData.Cursor cursor = request.cursor(); - final String cursorTopicName = requestData.cursor() != null ? cursor.topicName() : ""; // If no topics are specified, fetch all topics that come after the cursor topic if (fetchAllTopics) { From 768c82d1032d03c80c4a756be549529a5b19e11f Mon Sep 17 00:00:00 2001 From: Adeshina <91054814+integral1@users.noreply.github.com> Date: Sun, 4 May 2025 22:05:42 +0100 Subject: [PATCH 19/19] update on vars and method parameters --- .../server/handlers/DescribeTopicPartitionsRequestHandler.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/core/src/main/java/kafka/server/handlers/DescribeTopicPartitionsRequestHandler.java b/core/src/main/java/kafka/server/handlers/DescribeTopicPartitionsRequestHandler.java index aa5eb7b743232..7d7e543bb3d9e 100644 --- a/core/src/main/java/kafka/server/handlers/DescribeTopicPartitionsRequestHandler.java +++ b/core/src/main/java/kafka/server/handlers/DescribeTopicPartitionsRequestHandler.java @@ -213,7 +213,8 @@ private Stream filterAuthorizedTopics( private DescribeTopicPartitionsResponseData buildResponse( final Stream authorizedTopicsStream, final RequestChannel.Request abstractRequest, - final DescribeTopicPartitionsRequestData requestData + final DescribeTopicPartitionsRequestData requestData, + final String cursorTopicName ) { return metadataCache.describeTopicResponse( authorizedTopicsStream.iterator(),