-
Notifications
You must be signed in to change notification settings - Fork 14.4k
KAFKA-19234: broker should return UNAUTHORIZATION error for non-existing topic in produce request #19635
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: trunk
Are you sure you want to change the base?
Conversation
…ing topic in produce request Signed-off-by: PoAn Yang <payang@apache.org>
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thank @FrankYang0529 for this patch, some little comments
Co-authored-by: Ken Huang <s7133700@gmail.com>
Co-authored-by: Ken Huang <s7133700@gmail.com>
Co-authored-by: Jhen-Yung Hsu <yungyung7654321@gmail.com>
…oduceReqeustVersionLessThan13 Signed-off-by: PoAn Yang <payang@apache.org>
Signed-off-by: PoAn Yang <payang@apache.org>
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM.
@@ -924,6 +924,33 @@ class AuthorizerIntegrationTest extends AbstractAuthorizerIntegrationTest { | |||
sendRequests(requestKeyToRequest, false, topicNames) | |||
} | |||
|
|||
@Test | |||
def testAuthorizationWithTopicNotExistingForProduceReqeustVersionLessThan13(): Unit = { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@Test
def testAuthorizationWithTopicNotExistingForProduceRequestVersionLessThan13(): Unit = {
sendRequests(mutable.Map(ApiKeys.CREATE_TOPICS -> createTopicsRequest))
def createTpd(version: Int) =
if (version <= 12) new ProduceRequestData.TopicProduceData()
.setName(tp.topic())
else new ProduceRequestData.TopicProduceData()
.setTopicId(getTopicIds()(tp.topic()))
for (version <- ApiKeys.PRODUCE.oldestVersion to ApiKeys.PRODUCE.latestVersion) {
val request = requests.ProduceRequest.builder(new ProduceRequestData()
.setTopicData(new ProduceRequestData.TopicProduceDataCollection(
util.List.of(createTpd(version)
.setPartitionData(util.List.of(
new ProduceRequestData.PartitionProduceData()
.setIndex(tp.partition)
.setRecords(MemoryRecords.withRecords(Compression.NONE, new SimpleRecord("test".getBytes))))))
.iterator))
.setAcks(1.toShort)
.setTimeoutMs(5000))
.build(version.toShort)
val data = connectAndReceive[AbstractResponse](request, listenerName = listenerName).asInstanceOf[ProduceResponse].data().responses()
assertEquals(1, data.size())
val response = if (version <= 12) data.find(tp.topic(), Uuid.ZERO_UUID)
else data.find("", getTopicIds()(tp.topic()))
assertEquals(Errors.TOPIC_AUTHORIZATION_FAILED.code(),
response.partitionResponses().asScala.find(_.index == part).get.errorCode, s"unexpected error for produce request version $version")
}
}
we need to test version 13, but it should use topic id instead of topic name.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hi @chia7712, the version 13 is covered by testAuthorizationWithTopicNotExisting
. This PR focuses on old versions behavior.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could you please add comments for the version=13. Additionally, there is a typo: Reqeust
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@FrankYang0529 : Thanks for the PR. Left a comment.
val request = requests.ProduceRequest.builder(new ProduceRequestData() | ||
.setTopicData(new ProduceRequestData.TopicProduceDataCollection( | ||
util.List.of(new ProduceRequestData.TopicProduceData() | ||
.setName(tp.topic()) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This topic exists on the broker, right?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We expect to return TOPIC_AUTHORIZATION_FAILED
(if it failed to authorize) regardless of topic existence if the request is using topic name. Maybe we should test both cases - 1) topic exists and 2) topic does not exist (similar to testAuthorizationFetchV12WithTopicNotExisting
)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We could. I am just pointing out this doesn't match the test name, which says WithTopicNotExisting
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hi @junrao, This case follows testAuthorizationWithTopicNotExisting
. In AuthorizerIntegrationTest
, if a case needs a existing topic like case testAuthorizationWithTopicExisting
, it needs to call sendRequests(mutable.Map(ApiKeys.CREATE_TOPICS -> createTopicsRequest))
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@FrankYang0529 could you please add the test case for the existent topic? Additionally, it would be useful to add case of existent topic for fetch RPC too.
@@ -413,8 +413,6 @@ class KafkaApis(val requestChannel: RequestChannel, | |||
val topicPartition = new TopicPartition(topicName, partition.index()) | |||
if (topicName.isEmpty) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is another subtle existing issue. If a produce request before v13 has an empty topic name, we used to return UNKNOWN_TOPIC_OR_PARTITION. Now, we return UNKNOWN_TOPIC_ID. It's probably better to change the condition to if (topicName.isEmpty && topic.topicId().equals(Uuid.ZERO_UUID))
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's probably better to change the condition to if (topicName.isEmpty && topic.topicId().equals(Uuid.ZERO_UUID)).
The default value of topic.topicId()
is Uuid.ZERO_UUID, and hence
if (topicName.isEmpty && topic.topicId().equals(Uuid.ZERO_UUID))is always true if request is before v13. Maybe we can try another condition:
if (topicName.isEmpty && version > 12`?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sorry, I meant if (topicName.isEmpty && !topic.topicId().equals(Uuid.ZERO_UUID))
.
val request = requests.ProduceRequest.builder(new ProduceRequestData() | ||
.setTopicData(new ProduceRequestData.TopicProduceDataCollection( | ||
util.List.of(new ProduceRequestData.TopicProduceData() | ||
.setName(tp.topic()) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We could. I am just pointing out this doesn't match the test name, which says WithTopicNotExisting
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Overall, LGTM assuming the remaining comments are addressed.
Signed-off-by: PoAn Yang <payang@apache.org>
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@FrankYang0529 : Thanks for the updated PR. A couple of more comments.
core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
Outdated
Show resolved
Hide resolved
core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
Outdated
Show resolved
Hide resolved
Signed-off-by: PoAn Yang <payang@apache.org>
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@FrankYang0529 : Thanks for the updated PR. A couple of more comments.
core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
Outdated
Show resolved
Hide resolved
core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
Outdated
Show resolved
Hide resolved
Signed-off-by: PoAn Yang <payang@apache.org>
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@FrankYang0529 thanks for updates.
*/ | ||
@ParameterizedTest | ||
@ValueSource(strings = Array("kraft")) | ||
def testAuthorizationFetchV12WithTopicNotExisting(quorum: String): Unit = { | ||
@CsvSource(value = Array("false", "true")) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
could you please use @ValueSource(booleans = Array(true, false))
instead?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Updated it. Thanks.
core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
Outdated
Show resolved
Hide resolved
…m13ToNewest Signed-off-by: PoAn Yang <payang@apache.org>
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@FrankYang0529 : Thanks for the updated PR. A few more comments.
@@ -411,10 +411,9 @@ class KafkaApis(val requestChannel: RequestChannel, | |||
} | |||
|
|||
val topicPartition = new TopicPartition(topicName, partition.index()) | |||
if (topicName.isEmpty) | |||
// To compatible with the old version, only return UNKNOWN_TOPIC_ID if topicId is not default value |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
To compatible => To be compatible
|
||
val requestKeyToRequest = mutable.LinkedHashMap[ApiKeys, AbstractRequest]( | ||
ApiKeys.METADATA -> createMetadataRequest(allowAutoTopicCreation = true), | ||
ApiKeys.PRODUCE -> createProduceRequest, | ||
ApiKeys.PRODUCE -> createProduceRequest("", getTopicIds().getOrElse(tp.topic, Uuid.ZERO_UUID), ApiKeys.PRODUCE.latestVersion()), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Since we expect the topicId to be present, should we throw an exception if the topicId can't be found?
@@ -411,10 +411,9 @@ class KafkaApis(val requestChannel: RequestChannel, | |||
} | |||
|
|||
val topicPartition = new TopicPartition(topicName, partition.index()) | |||
if (topicName.isEmpty) | |||
// To compatible with the old version, only return UNKNOWN_TOPIC_ID if topicId is not default value | |||
if (topicName.isEmpty && !topic.topicId().equals(Uuid.ZERO_UUID)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If the request version is 13 and the topicId is set to 0, we should return UNKNOWN_TOPIC_ID to match what's in the fetch request. Only if the request version is below 13 and topicName is empty, should we return UNKNOWN_TOPIC_OR_PARTITION. So, it seems that we should do fetchRequest.version() >= 13
instead of !topic.topicId().equals(Uuid.ZERO_UUID)
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That makes sense. I left the similar comment before - #19635 (comment)
* and topic name is used in the request. Even if the topic doesn't exist, we return TOPIC_AUTHORIZATION_FAILED to | ||
* prevent leaking the topic name. | ||
* This case covers produce request version from oldest to 12. | ||
* The newer version is covered by testAuthorizationWithTopicNotExisting. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
testAuthorizationWithTopicNotExisting => testAuthorizationWithTopicNotExisting and testAuthorizationWithTopicExisting
* and topic name is used in the request. Even if the topic doesn't exist, we return TOPIC_AUTHORIZATION_FAILED to | ||
* prevent leaking the topic name. | ||
* This case covers fetch request version from oldest to 12. | ||
* The newer version is covered by testAuthorizationWithTopicNotExisting. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
testAuthorizationWithTopicNotExisting => testAuthorizationWithTopicNotExisting and testAuthorizationWithTopicExisting
@@ -1021,7 +1030,7 @@ class AuthorizerIntegrationTest extends AbstractAuthorizerIntegrationTest { | |||
* The newer version is covered by testAuthorizationWithTopicNotExisting. | |||
*/ | |||
@ParameterizedTest | |||
@CsvSource(value = Array("false", "true")) | |||
@ValueSource(booleans = Array(true, false)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should we change the ordering in testTopicIdAuthorization
too to be consistent??
Signed-off-by: PoAn Yang <payang@apache.org>
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@FrankYang0529 : Thanks for the updated PR. A couple of more comments.
@@ -411,10 +411,9 @@ class KafkaApis(val requestChannel: RequestChannel, | |||
} | |||
|
|||
val topicPartition = new TopicPartition(topicName, partition.index()) | |||
if (topicName.isEmpty) | |||
// To be compatible with the old version, only return UNKNOWN_TOPIC_ID if topic name is not set and request version is bigger than 12 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
if topic name is not set and request version is bigger than 12 => if request version uses topicId, but the corresponding topic name can't be found
* The produce request only supports topic id above version 13. | ||
*/ | ||
@Test | ||
def testEmptyTopicNameAndIDForProduceVersionFrom13ToNewest(): Unit = { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This test is covered by testAuthorizationWithTopicNotExisting
, right?
Since topic name is sensitive information, it should return a
TOPIC_AUTHORIZATION_FAILED error for non-existing topic. The Fetch
request also follows this pattern.
Co-authored-by: John Doe zh2725284321@gmail.com