Skip to content

Refactored DescribeTopicPartitionsRequestHandler: Improve readability and add code documentation #19636

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 19 commits into
base: trunk
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import kafka.network.RequestChannel;
import kafka.server.AuthHelper;
import kafka.server.KafkaConfig;

import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.errors.InvalidRequestException;
import org.apache.kafka.common.message.DescribeTopicPartitionsRequestData;
Expand All @@ -29,7 +28,6 @@
import org.apache.kafka.common.message.DescribeTopicPartitionsResponseData.DescribeTopicPartitionsResponseTopic;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.DescribeTopicPartitionsRequest;
import org.apache.kafka.common.resource.Resource;
import org.apache.kafka.metadata.MetadataCache;

import java.util.HashSet;
Expand All @@ -40,35 +38,101 @@
import static org.apache.kafka.common.acl.AclOperation.DESCRIBE;
import static org.apache.kafka.common.resource.ResourceType.TOPIC;

/**
* Handles the DescribeTopicPartitionsRequest, which provides metadata about topic partitions in a Kafka cluster.
* This handler is responsible for managing authorization checks, cursor validation, and constructing the response data
* for topics that are authorized for the requestor.
*/
public class DescribeTopicPartitionsRequestHandler {
MetadataCache metadataCache;
AuthHelper authHelper;
KafkaConfig config;

private final MetadataCache metadataCache;
private final AuthHelper authHelper;
private final KafkaConfig config;

/**
* Constructs a new DescribeTopicPartitionsRequestHandler.
*
* @param metadataCache The metadata cache used to retrieve topic information.
* @param authHelper The authentication helper used to check the authorization for topics.
* @param config The Kafka configuration.
*/
public DescribeTopicPartitionsRequestHandler(
MetadataCache metadataCache,
AuthHelper authHelper,
KafkaConfig config
) {
final MetadataCache metadataCache,
final AuthHelper authHelper,
final KafkaConfig config) {
this.metadataCache = metadataCache;
this.authHelper = authHelper;
this.config = config;
}

public DescribeTopicPartitionsResponseData handleDescribeTopicPartitionsRequest(RequestChannel.Request abstractRequest) {
DescribeTopicPartitionsRequestData request = ((DescribeTopicPartitionsRequest) abstractRequest.loggableRequest()).data();
Set<String> topics = new HashSet<>();
boolean fetchAllTopics = request.topics().isEmpty();
DescribeTopicPartitionsRequestData.Cursor cursor = request.cursor();
String cursorTopicName = cursor != null ? cursor.topicName() : "";
/**
* Handles the DescribeTopicPartitionsRequest and constructs a response containing metadata about topic partitions.
*
* @param abstractRequest The request containing the metadata request for topic partitions.
* @return A DescribeTopicPartitionsResponseData containing metadata for the requested topic partitions.
*/
public DescribeTopicPartitionsResponseData handleDescribeTopicPartitionsRequest(
final RequestChannel.Request abstractRequest) {
final DescribeTopicPartitionsRequestData requestData = getRequestData(abstractRequest);

// Get topics to describe based on request data (all topics or specific ones)
final String cursorTopicName = requestData.cursor() != null ? cursor.topicName() : "";
final Set<String> topicsToDescribe = getTopicsToDescribe(requestData, cursorTopicName);

// Validate cursor if provided in the request
validateCursor(requestData.cursor(), topicsToDescribe);

// Handle topics that are unauthorized for the Describe operation
final Set<DescribeTopicPartitionsResponseTopic> unauthorizedForDescribeTopicMetadata = new HashSet<>();
final Stream<String> authorizedTopicsStream = filterAuthorizedTopics(
abstractRequest,
topicsToDescribe,
unauthorizedForDescribeTopicMetadata,
requestData.topics().isEmpty()
);

// Construct the response for authorized topics
final DescribeTopicPartitionsResponseData response =
buildResponse(authorizedTopicsStream, abstractRequest, requestData);

// Add unauthorized topics to the response to avoid disclosing their existence
response.topics().addAll(unauthorizedForDescribeTopicMetadata);
return response;
}

/**
* Extracts the request data from the abstract request.
*
* @param abstractRequest The incoming request.
* @return The request data for the DescribeTopicPartitionsRequest.
*/
private DescribeTopicPartitionsRequestData getRequestData(final RequestChannel.Request abstractRequest) {
return ((DescribeTopicPartitionsRequest) abstractRequest.loggableRequest()).data();
}

/**
* Determines the list of topics to describe based on the provided request data.
* It can either fetch all topics or only the ones specified in the request.
*
* @param requestData The request data containing the list of topics.
* @return A set of topics to describe.
*/
private Set<String> getTopicsToDescribe(
final DescribeTopicPartitionsRequestData requestData,
final String cursorTopicName
) {
final Set<String> topics = new HashSet<>();
final boolean fetchAllTopics = requestData.topics().isEmpty();
final DescribeTopicPartitionsRequestData.Cursor cursor = request.cursor();

// If no topics are specified, fetch all topics that come after the cursor topic
if (fetchAllTopics) {
metadataCache.getAllTopics().forEach(topicName -> {
if (topicName.compareTo(cursorTopicName) >= 0) {
topics.add(topicName);
}
});
} else {
request.topics().forEach(topic -> {
requestData.topics().forEach(topic -> {
String topicName = topic.name();
if (topicName.compareTo(cursorTopicName) >= 0) {
topics.add(topicName);
Expand All @@ -80,55 +144,110 @@ public DescribeTopicPartitionsResponseData handleDescribeTopicPartitionsRequest(
throw new InvalidRequestException("DescribeTopicPartitionsRequest topic list should contain the cursor topic: " + cursor.topicName());
}
}
return topics;
}

if (cursor != null && cursor.partitionIndex() < 0) {
// The partition id in cursor must be valid.
throw new InvalidRequestException("DescribeTopicPartitionsRequest cursor partition must be valid: " + cursor);
}
/**
* Validates the cursor from the request. If the cursor is provided, it checks that the partition index is valid
* and that the topic in the cursor is included in the list of topics.
*
* @param cursor The cursor for pagination, if provided in the request.
* @param topicsToDescribe The list of topics that the requestor is authorized to describe.
*/
private void validateCursor(
final DescribeTopicPartitionsRequestData.Cursor cursor,
final Set<String> topicsToDescribe
) {
if (cursor != null) {
// Validate that the partition index in the cursor is valid
if (cursor.partitionIndex() < 0) {
throw new InvalidRequestException("DescribeTopicPartitionsRequest cursor partition must be valid: " + cursor);
}

// Do not disclose the existence of topics unauthorized for Describe, so we've not even checked if they exist or not
Set<DescribeTopicPartitionsResponseTopic> unauthorizedForDescribeTopicMetadata = new HashSet<>();
// Ensure the cursor topic is included in the list of topics
if (!topicsToDescribe.contains(cursor.topicName())) {
throw new InvalidRequestException("DescribeTopicPartitionsRequest topic list should contain the cursor topic: " + cursor.topicName());
}
}
}

Stream<String> authorizedTopicsStream = topics.stream().sorted().filter(topicName -> {
boolean isAuthorized = authHelper.authorize(
abstractRequest.context(), DESCRIBE, TOPIC, topicName, true, true, 1);
/**
* Filters the topics based on authorization. It ensures that only topics the requestor is authorized to describe are included.
* Unauthorized topics are added to the unauthorized topics list.
*
* @param abstractRequest The incoming request.
* @param topicsToDescribe The list of topics to filter.
* @param unauthorizedForDescribeTopicMetadata A set to store topics that the requestor is unauthorized to describe.
* @param fetchAllTopics A flag indicating whether to fetch all topics or only specified ones.
* @return A stream of authorized topic names.
*/
private Stream<String> filterAuthorizedTopics(
final RequestChannel.Request abstractRequest,
final Set<String> topicsToDescribe,
final Set<DescribeTopicPartitionsResponseTopic> unauthorizedForDescribeTopicMetadata,
final boolean fetchAllTopics
) {
return topicsToDescribe.stream().sorted().filter(topicName -> {
// Check authorization for each topic
final boolean isAuthorized = authHelper.authorize(abstractRequest.context(),
DESCRIBE, TOPIC, topicName, true, true, 1
);
if (!fetchAllTopics && !isAuthorized) {
// We should not return topicId when on unauthorized error, so we return zero uuid.
// If unauthorized, add the topic to the unauthorized list with an empty UUID
unauthorizedForDescribeTopicMetadata.add(describeTopicPartitionsResponseTopic(
Errors.TOPIC_AUTHORIZATION_FAILED, topicName, Uuid.ZERO_UUID, false, List.of())
Errors.TOPIC_AUTHORIZATION_FAILED, topicName, Uuid.ZERO_UUID, false, List.of())
);
}
return isAuthorized;
});
}

DescribeTopicPartitionsResponseData response = metadataCache.describeTopicResponse(
authorizedTopicsStream.iterator(),
abstractRequest.context().listenerName,
(String topicName) -> topicName.equals(cursorTopicName) ? cursor.partitionIndex() : 0,
Math.max(Math.min(config.maxRequestPartitionSizeLimit(), request.responsePartitionLimit()), 1),
fetchAllTopics
/**
* Constructs the response data based on authorized topics.
*
* @param authorizedTopicsStream A stream of authorized topic names.
* @param abstractRequest The incoming request.
* @param requestData The request data containing the cursor and partition limits.
* @return The constructed response data with metadata for the authorized topics.
*/
private DescribeTopicPartitionsResponseData buildResponse(
final Stream<String> authorizedTopicsStream,
final RequestChannel.Request abstractRequest,
final DescribeTopicPartitionsRequestData requestData,
final String cursorTopicName
) {
return metadataCache.describeTopicResponse(
authorizedTopicsStream.iterator(),
abstractRequest.context().listenerName,
(String topicName) -> topicName.equals(cursorTopicName) ? cursor.partitionIndex() : 0,
Math.max(Math.min(config.maxRequestPartitionSizeLimit(), request.responsePartitionLimit()), 1),
requestData.topics().isEmpty()
);

// get topic authorized operations
response.topics().forEach(topicData ->
topicData.setTopicAuthorizedOperations(authHelper.authorizedOperations(abstractRequest, new Resource(TOPIC, topicData.name()))));

response.topics().addAll(unauthorizedForDescribeTopicMetadata);
return response;
}

/**
* Constructs a DescribeTopicPartitionsResponseTopic object, which contains metadata about a single topic,
* including error codes, topic ID, partition data, and whether the topic is internal.
*
* @param error The error that occurred while accessing the topic.
* @param topic The name of the topic.
* @param topicId The unique identifier for the topic.
* @param isInternal Whether the topic is internal or not.
* @param partitionData The partition data associated with the topic.
* @return A DescribeTopicPartitionsResponseTopic object with the specified metadata.
*/
private DescribeTopicPartitionsResponseTopic describeTopicPartitionsResponseTopic(
Errors error,
String topic,
Uuid topicId,
Boolean isInternal,
List<DescribeTopicPartitionsResponsePartition> partitionData
final Errors error,
final String topic,
final Uuid topicId,
final Boolean isInternal,
final List<DescribeTopicPartitionsResponsePartition> partitionData
) {
return new DescribeTopicPartitionsResponseTopic()
.setErrorCode(error.code())
.setName(topic)
.setTopicId(topicId)
.setIsInternal(isInternal)
.setPartitions(partitionData);
.setErrorCode(error.code())
.setName(topic)
.setTopicId(topicId)
.setIsInternal(isInternal)
.setPartitions(partitionData);
}
}