Skip to content

KAFKA-19234: broker should return UNAUTHORIZATION error for non-existing topic in produce request #19635

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 17 commits into
base: trunk
Choose a base branch
from

Conversation

FrankYang0529
Copy link
Member

@FrankYang0529 FrankYang0529 commented May 4, 2025

Since topic name is sensitive information, it should return a
TOPIC_AUTHORIZATION_FAILED error for non-existing topic. The Fetch
request also follows this pattern.

Co-authored-by: John Doe zh2725284321@gmail.com

…ing topic in produce request

Signed-off-by: PoAn Yang <payang@apache.org>
Copy link
Collaborator

@m1a2st m1a2st left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank @FrankYang0529 for this patch, some little comments

FrankYang0529 and others added 2 commits May 4, 2025 21:26
Co-authored-by: Ken Huang <s7133700@gmail.com>
Co-authored-by: Ken Huang <s7133700@gmail.com>
Co-authored-by: Jhen-Yung Hsu <yungyung7654321@gmail.com>
…oduceReqeustVersionLessThan13

Signed-off-by: PoAn Yang <payang@apache.org>
Signed-off-by: PoAn Yang <payang@apache.org>
@FrankYang0529 FrankYang0529 requested a review from chia7712 May 5, 2025 12:53
Copy link
Collaborator

@TaiJuWu TaiJuWu left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM.

@@ -924,6 +924,33 @@ class AuthorizerIntegrationTest extends AbstractAuthorizerIntegrationTest {
sendRequests(requestKeyToRequest, false, topicNames)
}

@Test
def testAuthorizationWithTopicNotExistingForProduceReqeustVersionLessThan13(): Unit = {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  @Test
  def testAuthorizationWithTopicNotExistingForProduceRequestVersionLessThan13(): Unit = {
    sendRequests(mutable.Map(ApiKeys.CREATE_TOPICS -> createTopicsRequest))
    def createTpd(version: Int) =
      if (version <= 12) new ProduceRequestData.TopicProduceData()
        .setName(tp.topic())
      else new ProduceRequestData.TopicProduceData()
        .setTopicId(getTopicIds()(tp.topic()))
    for (version <- ApiKeys.PRODUCE.oldestVersion to ApiKeys.PRODUCE.latestVersion) {
      val request = requests.ProduceRequest.builder(new ProduceRequestData()
          .setTopicData(new ProduceRequestData.TopicProduceDataCollection(
            util.List.of(createTpd(version)
                .setPartitionData(util.List.of(
                  new ProduceRequestData.PartitionProduceData()
                    .setIndex(tp.partition)
                    .setRecords(MemoryRecords.withRecords(Compression.NONE, new SimpleRecord("test".getBytes))))))
              .iterator))
          .setAcks(1.toShort)
          .setTimeoutMs(5000))
        .build(version.toShort)
      val data = connectAndReceive[AbstractResponse](request, listenerName = listenerName).asInstanceOf[ProduceResponse].data().responses()
      assertEquals(1, data.size())
      val response = if (version <= 12) data.find(tp.topic(), Uuid.ZERO_UUID)
      else data.find("", getTopicIds()(tp.topic()))
      assertEquals(Errors.TOPIC_AUTHORIZATION_FAILED.code(),
        response.partitionResponses().asScala.find(_.index == part).get.errorCode, s"unexpected error for produce request version $version")
    }
  }

we need to test version 13, but it should use topic id instead of topic name.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi @chia7712, the version 13 is covered by testAuthorizationWithTopicNotExisting. This PR focuses on old versions behavior.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you please add comments for the version=13. Additionally, there is a typo: Reqeust

Copy link
Contributor

@junrao junrao left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@FrankYang0529 : Thanks for the PR. Left a comment.

val request = requests.ProduceRequest.builder(new ProduceRequestData()
.setTopicData(new ProduceRequestData.TopicProduceDataCollection(
util.List.of(new ProduceRequestData.TopicProduceData()
.setName(tp.topic())
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This topic exists on the broker, right?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We expect to return TOPIC_AUTHORIZATION_FAILED (if it failed to authorize) regardless of topic existence if the request is using topic name. Maybe we should test both cases - 1) topic exists and 2) topic does not exist (similar to testAuthorizationFetchV12WithTopicNotExisting)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We could. I am just pointing out this doesn't match the test name, which says WithTopicNotExisting.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi @junrao, This case follows testAuthorizationWithTopicNotExisting. In AuthorizerIntegrationTest, if a case needs a existing topic like case testAuthorizationWithTopicExisting, it needs to call sendRequests(mutable.Map(ApiKeys.CREATE_TOPICS -> createTopicsRequest)).

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@FrankYang0529 could you please add the test case for the existent topic? Additionally, it would be useful to add case of existent topic for fetch RPC too.

@@ -413,8 +413,6 @@ class KafkaApis(val requestChannel: RequestChannel,
val topicPartition = new TopicPartition(topicName, partition.index())
if (topicName.isEmpty)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is another subtle existing issue. If a produce request before v13 has an empty topic name, we used to return UNKNOWN_TOPIC_OR_PARTITION. Now, we return UNKNOWN_TOPIC_ID. It's probably better to change the condition to if (topicName.isEmpty && topic.topicId().equals(Uuid.ZERO_UUID)).

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's probably better to change the condition to if (topicName.isEmpty && topic.topicId().equals(Uuid.ZERO_UUID)).

The default value of topic.topicId() is Uuid.ZERO_UUID, and hence if (topicName.isEmpty && topic.topicId().equals(Uuid.ZERO_UUID))is always true if request is before v13. Maybe we can try another condition:if (topicName.isEmpty && version > 12`?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry, I meant if (topicName.isEmpty && !topic.topicId().equals(Uuid.ZERO_UUID)).

val request = requests.ProduceRequest.builder(new ProduceRequestData()
.setTopicData(new ProduceRequestData.TopicProduceDataCollection(
util.List.of(new ProduceRequestData.TopicProduceData()
.setName(tp.topic())
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We could. I am just pointing out this doesn't match the test name, which says WithTopicNotExisting.

Copy link
Contributor

@frankvicky frankvicky left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Overall, LGTM assuming the remaining comments are addressed.

Signed-off-by: PoAn Yang <payang@apache.org>
@FrankYang0529
Copy link
Member Author

@junrao @chia7712 Thanks for review. I addressed all comments. Could you take a look when you have time? Thanks.

Copy link
Contributor

@junrao junrao left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@FrankYang0529 : Thanks for the updated PR. A couple of more comments.

Signed-off-by: PoAn Yang <payang@apache.org>
Copy link
Contributor

@junrao junrao left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@FrankYang0529 : Thanks for the updated PR. A couple of more comments.

Copy link
Member

@chia7712 chia7712 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@FrankYang0529 thanks for updates.

*/
@ParameterizedTest
@ValueSource(strings = Array("kraft"))
def testAuthorizationFetchV12WithTopicNotExisting(quorum: String): Unit = {
@CsvSource(value = Array("false", "true"))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

could you please use @ValueSource(booleans = Array(true, false)) instead?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated it. Thanks.

…m13ToNewest

Signed-off-by: PoAn Yang <payang@apache.org>
@github-actions github-actions bot removed the small Small PRs label May 8, 2025
Copy link
Contributor

@junrao junrao left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@FrankYang0529 : Thanks for the updated PR. A few more comments.

@@ -411,10 +411,9 @@ class KafkaApis(val requestChannel: RequestChannel,
}

val topicPartition = new TopicPartition(topicName, partition.index())
if (topicName.isEmpty)
// To compatible with the old version, only return UNKNOWN_TOPIC_ID if topicId is not default value
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To compatible => To be compatible


val requestKeyToRequest = mutable.LinkedHashMap[ApiKeys, AbstractRequest](
ApiKeys.METADATA -> createMetadataRequest(allowAutoTopicCreation = true),
ApiKeys.PRODUCE -> createProduceRequest,
ApiKeys.PRODUCE -> createProduceRequest("", getTopicIds().getOrElse(tp.topic, Uuid.ZERO_UUID), ApiKeys.PRODUCE.latestVersion()),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since we expect the topicId to be present, should we throw an exception if the topicId can't be found?

@@ -411,10 +411,9 @@ class KafkaApis(val requestChannel: RequestChannel,
}

val topicPartition = new TopicPartition(topicName, partition.index())
if (topicName.isEmpty)
// To compatible with the old version, only return UNKNOWN_TOPIC_ID if topicId is not default value
if (topicName.isEmpty && !topic.topicId().equals(Uuid.ZERO_UUID))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If the request version is 13 and the topicId is set to 0, we should return UNKNOWN_TOPIC_ID to match what's in the fetch request. Only if the request version is below 13 and topicName is empty, should we return UNKNOWN_TOPIC_OR_PARTITION. So, it seems that we should do fetchRequest.version() >= 13 instead of !topic.topicId().equals(Uuid.ZERO_UUID).

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That makes sense. I left the similar comment before - #19635 (comment)

* and topic name is used in the request. Even if the topic doesn't exist, we return TOPIC_AUTHORIZATION_FAILED to
* prevent leaking the topic name.
* This case covers produce request version from oldest to 12.
* The newer version is covered by testAuthorizationWithTopicNotExisting.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

testAuthorizationWithTopicNotExisting => testAuthorizationWithTopicNotExisting and testAuthorizationWithTopicExisting

* and topic name is used in the request. Even if the topic doesn't exist, we return TOPIC_AUTHORIZATION_FAILED to
* prevent leaking the topic name.
* This case covers fetch request version from oldest to 12.
* The newer version is covered by testAuthorizationWithTopicNotExisting.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

testAuthorizationWithTopicNotExisting => testAuthorizationWithTopicNotExisting and testAuthorizationWithTopicExisting

@@ -1021,7 +1030,7 @@ class AuthorizerIntegrationTest extends AbstractAuthorizerIntegrationTest {
* The newer version is covered by testAuthorizationWithTopicNotExisting.
*/
@ParameterizedTest
@CsvSource(value = Array("false", "true"))
@ValueSource(booleans = Array(true, false))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we change the ordering in testTopicIdAuthorization too to be consistent??

Signed-off-by: PoAn Yang <payang@apache.org>
Copy link
Contributor

@junrao junrao left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@FrankYang0529 : Thanks for the updated PR. A couple of more comments.

@@ -411,10 +411,9 @@ class KafkaApis(val requestChannel: RequestChannel,
}

val topicPartition = new TopicPartition(topicName, partition.index())
if (topicName.isEmpty)
// To be compatible with the old version, only return UNKNOWN_TOPIC_ID if topic name is not set and request version is bigger than 12
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if topic name is not set and request version is bigger than 12 => if request version uses topicId, but the corresponding topic name can't be found

* The produce request only supports topic id above version 13.
*/
@Test
def testEmptyTopicNameAndIDForProduceVersionFrom13ToNewest(): Unit = {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This test is covered by testAuthorizationWithTopicNotExisting, right?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
Development

Successfully merging this pull request may close these issues.

7 participants