diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index 72620d8964c6f..e8c16c4e88163 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -411,10 +411,9 @@ class KafkaApis(val requestChannel: RequestChannel, } val topicPartition = new TopicPartition(topicName, partition.index()) - if (topicName.isEmpty) + // To be compatible with the old version, only return UNKNOWN_TOPIC_ID if request version uses topicId, but the corresponding topic name can't be found. + if (topicName.isEmpty && request.header.apiVersion > 12) nonExistingTopicResponses += new TopicIdPartition(topicId, topicPartition) -> new PartitionResponse(Errors.UNKNOWN_TOPIC_ID) - else if (!metadataCache.contains(topicPartition)) - nonExistingTopicResponses += new TopicIdPartition(topicId, topicPartition) -> new PartitionResponse(Errors.UNKNOWN_TOPIC_OR_PARTITION) else topicIdToPartitionData += new TopicIdPartition(topicId, topicPartition) -> partition } @@ -429,6 +428,8 @@ class KafkaApis(val requestChannel: RequestChannel, val memoryRecords = partition.records.asInstanceOf[MemoryRecords] if (!authorizedTopics.contains(topicIdPartition.topic)) unauthorizedTopicResponses += topicIdPartition -> new PartitionResponse(Errors.TOPIC_AUTHORIZATION_FAILED) + else if (!metadataCache.contains(topicIdPartition.topicPartition)) + nonExistingTopicResponses += topicIdPartition -> new PartitionResponse(Errors.UNKNOWN_TOPIC_OR_PARTITION) else try { ProduceRequest.validateRecords(request.header.apiVersion, memoryRecords) diff --git a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala index 47f3de97cf193..0df8470fe0241 100644 --- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala +++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala @@ -53,7 +53,7 @@ import org.apache.kafka.security.authorizer.AclEntry import org.apache.kafka.security.authorizer.AclEntry.WILDCARD_HOST import org.junit.jupiter.api.Assertions._ import org.junit.jupiter.params.ParameterizedTest -import org.junit.jupiter.params.provider.{CsvSource, MethodSource} +import org.junit.jupiter.params.provider.{MethodSource, ValueSource} import java.util.Collections.singletonList import org.apache.kafka.common.message.MetadataRequestData.MetadataRequestTopic @@ -296,22 +296,22 @@ class AuthorizerIntegrationTest extends AbstractAuthorizerIntegrationTest { new requests.MetadataRequest.Builder(List(topic).asJava, allowAutoTopicCreation).build() } - private def createProduceRequestWithId(id: Uuid) = { + private def createProduceRequest(name: String, id: Uuid, version: Short) = { requests.ProduceRequest.builder(new ProduceRequestData() - .setTopicData(new ProduceRequestData.TopicProduceDataCollection( - Collections.singletonList(new ProduceRequestData.TopicProduceData() - .setTopicId(id).setPartitionData(Collections.singletonList( - new ProduceRequestData.PartitionProduceData() - .setIndex(tp.partition) - .setRecords(MemoryRecords.withRecords(Compression.NONE, new SimpleRecord("test".getBytes)))))) - .iterator)) - .setAcks(1.toShort) - .setTimeoutMs(5000)) - .build() + .setTopicData(new ProduceRequestData.TopicProduceDataCollection( + util.List.of(new ProduceRequestData.TopicProduceData() + .setName(name) + .setTopicId(id) + .setPartitionData(util.List.of( + new ProduceRequestData.PartitionProduceData() + .setIndex(tp.partition) + .setRecords(MemoryRecords.withRecords(Compression.NONE, new SimpleRecord("test".getBytes)))))) + .iterator)) + .setAcks(1.toShort) + .setTimeoutMs(5000)) + .build(version) } - private def createProduceRequest = createProduceRequestWithId(getTopicIds().getOrElse(tp.topic, Uuid.ZERO_UUID)) - private def createFetchRequest = { val partitionMap = new util.LinkedHashMap[TopicPartition, requests.FetchRequest.PartitionData] partitionMap.put(tp, new requests.FetchRequest.PartitionData(getTopicIds().getOrElse(tp.topic, Uuid.ZERO_UUID), @@ -326,6 +326,13 @@ class AuthorizerIntegrationTest extends AbstractAuthorizerIntegrationTest { requests.FetchRequest.Builder.forConsumer(version, 100, Int.MaxValue, partitionMap).build() } + private def createFetchRequestWithEmptyTopicNameAndZeroTopicId(version: Short) = { + val partitionMap = new util.LinkedHashMap[TopicPartition, requests.FetchRequest.PartitionData] + partitionMap.put(new TopicPartition("", part), + new requests.FetchRequest.PartitionData(Uuid.ZERO_UUID, 0, 0, 100, Optional.of(27))) + requests.FetchRequest.Builder.forConsumer(version, 100, Int.MaxValue, partitionMap).build() + } + private def createFetchFollowerRequest = { val partitionMap = new util.LinkedHashMap[TopicPartition, requests.FetchRequest.PartitionData] partitionMap.put(tp, new requests.FetchRequest.PartitionData(getTopicIds().getOrElse(tp.topic, Uuid.ZERO_UUID), @@ -836,11 +843,13 @@ class AuthorizerIntegrationTest extends AbstractAuthorizerIntegrationTest { @Test def testAuthorizationWithTopicExisting(): Unit = { //First create the topic so we have a valid topic ID - sendRequests(mutable.Map(ApiKeys.CREATE_TOPICS -> createTopicsRequest)) + createTopicWithBrokerPrincipal(topic) + val topicId = getTopicIds()(topic) + assertNotNull(topicId) val requestKeyToRequest = mutable.LinkedHashMap[ApiKeys, AbstractRequest]( ApiKeys.METADATA -> createMetadataRequest(allowAutoTopicCreation = true), - ApiKeys.PRODUCE -> createProduceRequest, + ApiKeys.PRODUCE -> createProduceRequest("", topicId, ApiKeys.PRODUCE.latestVersion()), ApiKeys.FETCH -> createFetchRequest, ApiKeys.LIST_OFFSETS -> createListOffsetsRequest, ApiKeys.OFFSET_FETCH -> createOffsetFetchRequest, @@ -888,7 +897,7 @@ class AuthorizerIntegrationTest extends AbstractAuthorizerIntegrationTest { ApiKeys.DELETE_TOPICS -> deleteTopicsRequest ) - sendRequests(requestKeyToRequest, true) + sendRequests(requestKeyToRequest) } /* @@ -900,7 +909,7 @@ class AuthorizerIntegrationTest extends AbstractAuthorizerIntegrationTest { val topicNames = Map(id -> "topic") val requestKeyToRequest = mutable.LinkedHashMap[ApiKeys, AbstractRequest]( ApiKeys.METADATA -> createMetadataRequest(allowAutoTopicCreation = false), - ApiKeys.PRODUCE -> createProduceRequestWithId(id), + ApiKeys.PRODUCE -> createProduceRequest("", id, ApiKeys.PRODUCE.latestVersion()), ApiKeys.FETCH -> createFetchRequestWithUnknownTopic(id, ApiKeys.FETCH.latestVersion()), ApiKeys.LIST_OFFSETS -> createListOffsetsRequest, ApiKeys.OFFSET_COMMIT -> createOffsetCommitRequest, @@ -921,8 +930,76 @@ class AuthorizerIntegrationTest extends AbstractAuthorizerIntegrationTest { sendRequests(requestKeyToRequest, false, topicNames) } + /** + * Test that the produce request fails with TOPIC_AUTHORIZATION_FAILED if the client doesn't have permission + * and topic name is used in the request. Even if the topic doesn't exist, we return TOPIC_AUTHORIZATION_FAILED to + * prevent leaking the topic name. + * This case covers produce request version from oldest to 12. + * The newer version is covered by testAuthorizationWithTopicNotExisting and testAuthorizationWithTopicExisting. + */ + @ParameterizedTest + @ValueSource(booleans = Array(true, false)) + def testAuthorizationProduceVersionFromOldestTo12(withTopicExisting: Boolean): Unit = { + if (withTopicExisting) { + createTopicWithBrokerPrincipal(topic) + } + + for (version <- ApiKeys.PRODUCE.oldestVersion to 12) { + val request = createProduceRequest(topic, Uuid.ZERO_UUID, version.toShort) + val response = connectAndReceive[AbstractResponse](request, listenerName = listenerName) + val errorCode = response.asInstanceOf[ProduceResponse] + .data() + .responses() + .find(topic, Uuid.ZERO_UUID) + .partitionResponses.asScala.find(_.index == part).get + .errorCode + + assertEquals(Errors.TOPIC_AUTHORIZATION_FAILED.code(), errorCode, s"unexpected error for produce request version $version") + } + } + + /** + * Test that the produce request fails with UNKNOWN_TOPIC_ID if topic id is zero when request version >= 13. + * The produce request only supports topic id above version 13. + */ + @Test + def testZeroTopicIdForProduceVersionFrom13ToNewest(): Unit = { + for (version <- 13 to ApiKeys.PRODUCE.latestVersion()) { + val request = createProduceRequest("", Uuid.ZERO_UUID, version.toShort) + val response = connectAndReceive[AbstractResponse](request, listenerName = listenerName) + val errorCode = response.asInstanceOf[ProduceResponse] + .data() + .responses() + .find("", Uuid.ZERO_UUID) + .partitionResponses.asScala.find(_.index == part).get + .errorCode + + assertEquals(Errors.UNKNOWN_TOPIC_ID.code(), errorCode, s"unexpected error for produce request version $version") + } + } + + /** + * Test that the produce request fails with TOPIC_AUTHORIZATION_FAILED if topic name is empty when request version <= 12. + * The produce request only supports topic name below version 12. + */ + @Test + def testEmptyTopicNameForProduceVersionFromOldestTo12(): Unit = { + for (version <- ApiKeys.PRODUCE.oldestVersion() to 12) { + val request = createProduceRequest("", Uuid.ZERO_UUID, version.toShort) + val response = connectAndReceive[AbstractResponse](request, listenerName = listenerName) + val errorCode = response.asInstanceOf[ProduceResponse] + .data() + .responses() + .find("", Uuid.ZERO_UUID) + .partitionResponses.asScala.find(_.index == part).get + .errorCode + + assertEquals(Errors.TOPIC_AUTHORIZATION_FAILED.code(), errorCode, s"unexpected error for produce request version $version") + } + } + @ParameterizedTest - @CsvSource(value = Array("false", "true")) + @ValueSource(booleans = Array(true, false)) def testTopicIdAuthorization(withTopicExisting: Boolean): Unit = { val topicId = if (withTopicExisting) { createTopicWithBrokerPrincipal(topic) @@ -971,18 +1048,73 @@ class AuthorizerIntegrationTest extends AbstractAuthorizerIntegrationTest { } } - /* - * even if the topic doesn't exist, request APIs should not leak the topic name + /** + * Test that the fetch request fails with TOPIC_AUTHORIZATION_FAILED if the client doesn't have permission + * and topic name is used in the request. Even if the topic doesn't exist, we return TOPIC_AUTHORIZATION_FAILED to + * prevent leaking the topic name. + * This case covers fetch request version from oldest to 12. + * The newer version is covered by testAuthorizationWithTopicNotExisting and testAuthorizationWithTopicExisting. */ - @Test - def testAuthorizationFetchV12WithTopicNotExisting(): Unit = { + @ParameterizedTest + @ValueSource(booleans = Array(true, false)) + def testAuthorizationFetchVersionFromOldestTo12(withTopicExisting: Boolean): Unit = { + if (withTopicExisting) { + createTopicWithBrokerPrincipal(topic) + } + val id = Uuid.ZERO_UUID - val topicNames = Map(id -> "topic") - val requestKeyToRequest = mutable.LinkedHashMap[ApiKeys, AbstractRequest]( - ApiKeys.FETCH -> createFetchRequestWithUnknownTopic(id, 12), - ) + val topicNames = Map(id -> topic) + for (version <- ApiKeys.FETCH.oldestVersion to 12) { + val requestKeyToRequest = mutable.LinkedHashMap[ApiKeys, AbstractRequest]( + ApiKeys.FETCH -> createFetchRequestWithUnknownTopic(id, version.toShort), + ) - sendRequests(requestKeyToRequest, false, topicNames) + sendRequests(requestKeyToRequest, withTopicExisting, topicNames) + } + } + + /** + * Test that the fetch request fails with UNKNOWN_TOPIC_ID if topic id is zero when request version >= 13. + * The fetch request only supports topic id above version 13. + */ + @Test + def testZeroTopicIdForFetchVersionFrom13ToNewest(): Unit = { + for (version <- 13 to ApiKeys.FETCH.latestVersion) { + val request = createFetchRequestWithEmptyTopicNameAndZeroTopicId(version.toShort) + val response = connectAndReceive[AbstractResponse](request, listenerName = listenerName) + + val errorCode = response.asInstanceOf[FetchResponse] + .data() + .responses() + .get(0) + .partitions() + .get(0) + .errorCode + + assertEquals(Errors.UNKNOWN_TOPIC_ID.code(), errorCode, s"unexpected error for fetch request version $version") + } + } + + /** + * Test that the fetch request fails with TOPIC_AUTHORIZATION_FAILED if topic name is empty when request version <= 12. + * The fetch request only supports topic name below version 12. + */ + @Test + def testEmptyTopicNameForFetchVersionFromOldestTo12(): Unit = { + for (version <- ApiKeys.FETCH.oldestVersion to 12) { + val request = createFetchRequestWithEmptyTopicNameAndZeroTopicId(version.toShort) + val response = connectAndReceive[AbstractResponse](request, listenerName = listenerName) + + val errorCode = response.asInstanceOf[FetchResponse] + .data() + .responses() + .get(0) + .partitions() + .get(0) + .errorCode + + assertEquals(Errors.TOPIC_AUTHORIZATION_FAILED.code(), errorCode, s"unexpected error for fetch request version $version") + } } @Test