Skip to content

KAFKA-19234: broker should return UNAUTHORIZATION error for non-existing topic in produce request #19635

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 19 commits into
base: trunk
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions core/src/main/scala/kafka/server/KafkaApis.scala
Original file line number Diff line number Diff line change
Expand Up @@ -411,10 +411,9 @@ class KafkaApis(val requestChannel: RequestChannel,
}

val topicPartition = new TopicPartition(topicName, partition.index())
if (topicName.isEmpty)
// To be compatible with the old version, only return UNKNOWN_TOPIC_ID if request version uses topicId, but the corresponding topic name can't be found.
if (topicName.isEmpty && request.header.apiVersion > 12)
nonExistingTopicResponses += new TopicIdPartition(topicId, topicPartition) -> new PartitionResponse(Errors.UNKNOWN_TOPIC_ID)
else if (!metadataCache.contains(topicPartition))
nonExistingTopicResponses += new TopicIdPartition(topicId, topicPartition) -> new PartitionResponse(Errors.UNKNOWN_TOPIC_OR_PARTITION)
else
topicIdToPartitionData += new TopicIdPartition(topicId, topicPartition) -> partition
}
Expand All @@ -429,6 +428,8 @@ class KafkaApis(val requestChannel: RequestChannel,
val memoryRecords = partition.records.asInstanceOf[MemoryRecords]
if (!authorizedTopics.contains(topicIdPartition.topic))
unauthorizedTopicResponses += topicIdPartition -> new PartitionResponse(Errors.TOPIC_AUTHORIZATION_FAILED)
else if (!metadataCache.contains(topicIdPartition.topicPartition))
nonExistingTopicResponses += topicIdPartition -> new PartitionResponse(Errors.UNKNOWN_TOPIC_OR_PARTITION)
else
try {
ProduceRequest.validateRecords(request.header.apiVersion, memoryRecords)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ import org.apache.kafka.security.authorizer.AclEntry
import org.apache.kafka.security.authorizer.AclEntry.WILDCARD_HOST
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.{CsvSource, MethodSource, ValueSource}
import org.junit.jupiter.params.provider.{MethodSource, ValueSource}

import java.util.Collections.singletonList
import org.apache.kafka.common.message.MetadataRequestData.MetadataRequestTopic
Expand Down Expand Up @@ -296,22 +296,22 @@ class AuthorizerIntegrationTest extends AbstractAuthorizerIntegrationTest {
new requests.MetadataRequest.Builder(List(topic).asJava, allowAutoTopicCreation).build()
}

private def createProduceRequestWithId(id: Uuid) = {
private def createProduceRequest(name: String, id: Uuid, version: Short) = {
requests.ProduceRequest.builder(new ProduceRequestData()
.setTopicData(new ProduceRequestData.TopicProduceDataCollection(
Collections.singletonList(new ProduceRequestData.TopicProduceData()
.setTopicId(id).setPartitionData(Collections.singletonList(
new ProduceRequestData.PartitionProduceData()
.setIndex(tp.partition)
.setRecords(MemoryRecords.withRecords(Compression.NONE, new SimpleRecord("test".getBytes))))))
.iterator))
.setAcks(1.toShort)
.setTimeoutMs(5000))
.build()
.setTopicData(new ProduceRequestData.TopicProduceDataCollection(
util.List.of(new ProduceRequestData.TopicProduceData()
.setName(name)
.setTopicId(id)
.setPartitionData(util.List.of(
new ProduceRequestData.PartitionProduceData()
.setIndex(tp.partition)
.setRecords(MemoryRecords.withRecords(Compression.NONE, new SimpleRecord("test".getBytes))))))
.iterator))
.setAcks(1.toShort)
.setTimeoutMs(5000))
.build(version)
}

private def createProduceRequest = createProduceRequestWithId(getTopicIds().getOrElse(tp.topic, Uuid.ZERO_UUID))

private def createFetchRequest = {
val partitionMap = new util.LinkedHashMap[TopicPartition, requests.FetchRequest.PartitionData]
partitionMap.put(tp, new requests.FetchRequest.PartitionData(getTopicIds().getOrElse(tp.topic, Uuid.ZERO_UUID),
Expand Down Expand Up @@ -837,11 +837,13 @@ class AuthorizerIntegrationTest extends AbstractAuthorizerIntegrationTest {
@ValueSource(strings = Array("kip932"))
def testAuthorizationWithTopicExisting(quorum: String): Unit = {
//First create the topic so we have a valid topic ID
sendRequests(mutable.Map(ApiKeys.CREATE_TOPICS -> createTopicsRequest))
createTopicWithBrokerPrincipal(topic)
val topicId = getTopicIds()(topic)
assertNotNull(topicId)

val requestKeyToRequest = mutable.LinkedHashMap[ApiKeys, AbstractRequest](
ApiKeys.METADATA -> createMetadataRequest(allowAutoTopicCreation = true),
ApiKeys.PRODUCE -> createProduceRequest,
ApiKeys.PRODUCE -> createProduceRequest("", topicId, ApiKeys.PRODUCE.latestVersion()),
ApiKeys.FETCH -> createFetchRequest,
ApiKeys.LIST_OFFSETS -> createListOffsetsRequest,
ApiKeys.OFFSET_FETCH -> createOffsetFetchRequest,
Expand Down Expand Up @@ -889,7 +891,7 @@ class AuthorizerIntegrationTest extends AbstractAuthorizerIntegrationTest {
ApiKeys.DELETE_TOPICS -> deleteTopicsRequest
)

sendRequests(requestKeyToRequest, true)
sendRequests(requestKeyToRequest)
}

/*
Expand All @@ -902,7 +904,7 @@ class AuthorizerIntegrationTest extends AbstractAuthorizerIntegrationTest {
val topicNames = Map(id -> "topic")
val requestKeyToRequest = mutable.LinkedHashMap[ApiKeys, AbstractRequest](
ApiKeys.METADATA -> createMetadataRequest(allowAutoTopicCreation = false),
ApiKeys.PRODUCE -> createProduceRequestWithId(id),
ApiKeys.PRODUCE -> createProduceRequest("", id, ApiKeys.PRODUCE.latestVersion()),
ApiKeys.FETCH -> createFetchRequestWithUnknownTopic(id, ApiKeys.FETCH.latestVersion()),
ApiKeys.LIST_OFFSETS -> createListOffsetsRequest,
ApiKeys.OFFSET_COMMIT -> createOffsetCommitRequest,
Expand All @@ -923,8 +925,56 @@ class AuthorizerIntegrationTest extends AbstractAuthorizerIntegrationTest {
sendRequests(requestKeyToRequest, false, topicNames)
}

/**
* Test that the produce request fails with TOPIC_AUTHORIZATION_FAILED if the client doesn't have permission
* and topic name is used in the request. Even if the topic doesn't exist, we return TOPIC_AUTHORIZATION_FAILED to
* prevent leaking the topic name.
* This case covers produce request version from oldest to 12.
* The newer version is covered by testAuthorizationWithTopicNotExisting and testAuthorizationWithTopicExisting.
*/
@ParameterizedTest
@ValueSource(booleans = Array(true, false))
def testAuthorizationProduceVersionFromOldestTo12(withTopicExisting: Boolean): Unit = {
if (withTopicExisting) {
createTopicWithBrokerPrincipal(topic)
}

for (version <- ApiKeys.PRODUCE.oldestVersion to 12) {
val request = createProduceRequest(topic, Uuid.ZERO_UUID, version.toShort)
val response = connectAndReceive[AbstractResponse](request, listenerName = listenerName)
val errorCode = response.asInstanceOf[ProduceResponse]
.data()
.responses()
.find(topic, Uuid.ZERO_UUID)
.partitionResponses.asScala.find(_.index == part).get
.errorCode

assertEquals(Errors.TOPIC_AUTHORIZATION_FAILED.code(), errorCode, s"unexpected error for produce request version $version")
}
}

/**
* Test that the produce request fails with UNKNOWN_TOPIC_ID if both topic name and id are default values when request version >= 13.
* The produce request only supports topic id above version 13.
*/
@Test
def testEmptyTopicNameAndIDForProduceVersionFrom13ToNewest(): Unit = {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This test is covered by testAuthorizationWithTopicNotExisting, right?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is a little different. The testAuthorizationWithTopicNotExisting uses non-zero UUID, but testEmptyTopicNameAndIDForProduceVersionFrom13ToNewest uses zero UUID. The case would like to make sure even if topic id is zero, it still returns UNKNOWN_TOPIC_ID error if topic name is empty and request version >= 13.

for (version <- 13 to ApiKeys.PRODUCE.latestVersion()) {
val request = createProduceRequest("", Uuid.ZERO_UUID, version.toShort)
val response = connectAndReceive[AbstractResponse](request, listenerName = listenerName)
val errorCode = response.asInstanceOf[ProduceResponse]
.data()
.responses()
.find("", Uuid.ZERO_UUID)
.partitionResponses.asScala.find(_.index == part).get
.errorCode

assertEquals(Errors.UNKNOWN_TOPIC_ID.code(), errorCode, s"unexpected error for produce request version $version")
}
}

@ParameterizedTest
@CsvSource(value = Array("false", "true"))
@ValueSource(booleans = Array(true, false))
def testTopicIdAuthorization(withTopicExisting: Boolean): Unit = {
val topicId = if (withTopicExisting) {
createTopicWithBrokerPrincipal(topic)
Expand Down Expand Up @@ -973,19 +1023,29 @@ class AuthorizerIntegrationTest extends AbstractAuthorizerIntegrationTest {
}
}

/*
* even if the topic doesn't exist, request APIs should not leak the topic name
/**
* Test that the fetch request fails with TOPIC_AUTHORIZATION_FAILED if the client doesn't have permission
* and topic name is used in the request. Even if the topic doesn't exist, we return TOPIC_AUTHORIZATION_FAILED to
* prevent leaking the topic name.
* This case covers fetch request version from oldest to 12.
* The newer version is covered by testAuthorizationWithTopicNotExisting and testAuthorizationWithTopicExisting.
*/
@ParameterizedTest
@ValueSource(strings = Array("kraft"))
def testAuthorizationFetchV12WithTopicNotExisting(quorum: String): Unit = {
@ValueSource(booleans = Array(true, false))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we change the ordering in testTopicIdAuthorization too to be consistent??

def testAuthorizationFetchVersionFromOldestTo12(withTopicExisting: Boolean): Unit = {
if (withTopicExisting) {
createTopicWithBrokerPrincipal(topic)
}

val id = Uuid.ZERO_UUID
val topicNames = Map(id -> "topic")
val requestKeyToRequest = mutable.LinkedHashMap[ApiKeys, AbstractRequest](
ApiKeys.FETCH -> createFetchRequestWithUnknownTopic(id, 12),
)
val topicNames = Map(id -> topic)
for (version <- ApiKeys.FETCH.oldestVersion to 12) {
val requestKeyToRequest = mutable.LinkedHashMap[ApiKeys, AbstractRequest](
ApiKeys.FETCH -> createFetchRequestWithUnknownTopic(id, version.toShort),
)

sendRequests(requestKeyToRequest, false, topicNames)
sendRequests(requestKeyToRequest, withTopicExisting, topicNames)
}
}

@ParameterizedTest
Expand Down
Loading