-
Notifications
You must be signed in to change notification settings - Fork 14.4k
KAFKA-19234: broker should return UNAUTHORIZATION error for non-existing topic in produce request #19635
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Open
FrankYang0529
wants to merge
19
commits into
apache:trunk
Choose a base branch
from
FrankYang0529:KAFKA-19234
base: trunk
Could not load branches
Branch not found: {{ refName }}
Loading
Could not load tags
Nothing to show
Loading
Are you sure you want to change the base?
Some commits from the old base branch may be removed from the timeline,
and old review comments may become outdated.
+92
−31
Open
KAFKA-19234: broker should return UNAUTHORIZATION error for non-existing topic in produce request #19635
Changes from all commits
Commits
Show all changes
19 commits
Select commit
Hold shift + click to select a range
18d0ddc
KAFKA-19234: broker should return UNAUTHORIZATION error for non-exist…
FrankYang0529 24c300f
Update core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
FrankYang0529 20b5735
Update core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
FrankYang0529 62f136a
Update core/src/main/scala/kafka/server/KafkaApis.scala
FrankYang0529 b1dacbb
Merge branch 'trunk' into KAFKA-19234
FrankYang0529 1df0668
KAFKA-19234: Add test case testAuthorizationWithTopicNotExistingForPr…
FrankYang0529 698f9ac
revert KafkaApisTest
FrankYang0529 8c845f4
address comment
FrankYang0529 150e5c9
Merge branch 'trunk' into KAFKA-19234
FrankYang0529 fbcb617
Merge branch 'trunk' into KAFKA-19234
FrankYang0529 2368c81
address comment
FrankYang0529 d35d552
Merge branch 'trunk' into KAFKA-19234
FrankYang0529 159d3ec
address comment
FrankYang0529 4cd03ea
Merge branch 'trunk' into KAFKA-19234
FrankYang0529 758663d
add comment and test case testEmptyTopicNameAndIDForProduceVersionFro…
FrankYang0529 73e5756
Merge branch 'trunk' into KAFKA-19234
FrankYang0529 70067da
address comment
FrankYang0529 0c60e51
Merge branch 'trunk' into KAFKA-19234
FrankYang0529 d2c0fb3
address comment
FrankYang0529 File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -53,7 +53,7 @@ import org.apache.kafka.security.authorizer.AclEntry | |
import org.apache.kafka.security.authorizer.AclEntry.WILDCARD_HOST | ||
import org.junit.jupiter.api.Assertions._ | ||
import org.junit.jupiter.params.ParameterizedTest | ||
import org.junit.jupiter.params.provider.{CsvSource, MethodSource, ValueSource} | ||
import org.junit.jupiter.params.provider.{MethodSource, ValueSource} | ||
|
||
import java.util.Collections.singletonList | ||
import org.apache.kafka.common.message.MetadataRequestData.MetadataRequestTopic | ||
|
@@ -296,22 +296,22 @@ class AuthorizerIntegrationTest extends AbstractAuthorizerIntegrationTest { | |
new requests.MetadataRequest.Builder(List(topic).asJava, allowAutoTopicCreation).build() | ||
} | ||
|
||
private def createProduceRequestWithId(id: Uuid) = { | ||
private def createProduceRequest(name: String, id: Uuid, version: Short) = { | ||
requests.ProduceRequest.builder(new ProduceRequestData() | ||
.setTopicData(new ProduceRequestData.TopicProduceDataCollection( | ||
Collections.singletonList(new ProduceRequestData.TopicProduceData() | ||
.setTopicId(id).setPartitionData(Collections.singletonList( | ||
new ProduceRequestData.PartitionProduceData() | ||
.setIndex(tp.partition) | ||
.setRecords(MemoryRecords.withRecords(Compression.NONE, new SimpleRecord("test".getBytes)))))) | ||
.iterator)) | ||
.setAcks(1.toShort) | ||
.setTimeoutMs(5000)) | ||
.build() | ||
.setTopicData(new ProduceRequestData.TopicProduceDataCollection( | ||
util.List.of(new ProduceRequestData.TopicProduceData() | ||
.setName(name) | ||
.setTopicId(id) | ||
.setPartitionData(util.List.of( | ||
new ProduceRequestData.PartitionProduceData() | ||
.setIndex(tp.partition) | ||
.setRecords(MemoryRecords.withRecords(Compression.NONE, new SimpleRecord("test".getBytes)))))) | ||
.iterator)) | ||
.setAcks(1.toShort) | ||
.setTimeoutMs(5000)) | ||
.build(version) | ||
} | ||
|
||
private def createProduceRequest = createProduceRequestWithId(getTopicIds().getOrElse(tp.topic, Uuid.ZERO_UUID)) | ||
|
||
private def createFetchRequest = { | ||
val partitionMap = new util.LinkedHashMap[TopicPartition, requests.FetchRequest.PartitionData] | ||
partitionMap.put(tp, new requests.FetchRequest.PartitionData(getTopicIds().getOrElse(tp.topic, Uuid.ZERO_UUID), | ||
|
@@ -837,11 +837,13 @@ class AuthorizerIntegrationTest extends AbstractAuthorizerIntegrationTest { | |
@ValueSource(strings = Array("kip932")) | ||
def testAuthorizationWithTopicExisting(quorum: String): Unit = { | ||
//First create the topic so we have a valid topic ID | ||
sendRequests(mutable.Map(ApiKeys.CREATE_TOPICS -> createTopicsRequest)) | ||
createTopicWithBrokerPrincipal(topic) | ||
val topicId = getTopicIds()(topic) | ||
assertNotNull(topicId) | ||
|
||
val requestKeyToRequest = mutable.LinkedHashMap[ApiKeys, AbstractRequest]( | ||
ApiKeys.METADATA -> createMetadataRequest(allowAutoTopicCreation = true), | ||
ApiKeys.PRODUCE -> createProduceRequest, | ||
ApiKeys.PRODUCE -> createProduceRequest("", topicId, ApiKeys.PRODUCE.latestVersion()), | ||
ApiKeys.FETCH -> createFetchRequest, | ||
ApiKeys.LIST_OFFSETS -> createListOffsetsRequest, | ||
ApiKeys.OFFSET_FETCH -> createOffsetFetchRequest, | ||
|
@@ -889,7 +891,7 @@ class AuthorizerIntegrationTest extends AbstractAuthorizerIntegrationTest { | |
ApiKeys.DELETE_TOPICS -> deleteTopicsRequest | ||
) | ||
|
||
sendRequests(requestKeyToRequest, true) | ||
sendRequests(requestKeyToRequest) | ||
} | ||
|
||
/* | ||
|
@@ -902,7 +904,7 @@ class AuthorizerIntegrationTest extends AbstractAuthorizerIntegrationTest { | |
val topicNames = Map(id -> "topic") | ||
val requestKeyToRequest = mutable.LinkedHashMap[ApiKeys, AbstractRequest]( | ||
ApiKeys.METADATA -> createMetadataRequest(allowAutoTopicCreation = false), | ||
ApiKeys.PRODUCE -> createProduceRequestWithId(id), | ||
ApiKeys.PRODUCE -> createProduceRequest("", id, ApiKeys.PRODUCE.latestVersion()), | ||
ApiKeys.FETCH -> createFetchRequestWithUnknownTopic(id, ApiKeys.FETCH.latestVersion()), | ||
ApiKeys.LIST_OFFSETS -> createListOffsetsRequest, | ||
ApiKeys.OFFSET_COMMIT -> createOffsetCommitRequest, | ||
|
@@ -923,8 +925,56 @@ class AuthorizerIntegrationTest extends AbstractAuthorizerIntegrationTest { | |
sendRequests(requestKeyToRequest, false, topicNames) | ||
} | ||
|
||
/** | ||
* Test that the produce request fails with TOPIC_AUTHORIZATION_FAILED if the client doesn't have permission | ||
* and topic name is used in the request. Even if the topic doesn't exist, we return TOPIC_AUTHORIZATION_FAILED to | ||
* prevent leaking the topic name. | ||
* This case covers produce request version from oldest to 12. | ||
* The newer version is covered by testAuthorizationWithTopicNotExisting and testAuthorizationWithTopicExisting. | ||
*/ | ||
@ParameterizedTest | ||
@ValueSource(booleans = Array(true, false)) | ||
def testAuthorizationProduceVersionFromOldestTo12(withTopicExisting: Boolean): Unit = { | ||
if (withTopicExisting) { | ||
createTopicWithBrokerPrincipal(topic) | ||
} | ||
|
||
for (version <- ApiKeys.PRODUCE.oldestVersion to 12) { | ||
val request = createProduceRequest(topic, Uuid.ZERO_UUID, version.toShort) | ||
val response = connectAndReceive[AbstractResponse](request, listenerName = listenerName) | ||
val errorCode = response.asInstanceOf[ProduceResponse] | ||
.data() | ||
.responses() | ||
.find(topic, Uuid.ZERO_UUID) | ||
.partitionResponses.asScala.find(_.index == part).get | ||
.errorCode | ||
|
||
assertEquals(Errors.TOPIC_AUTHORIZATION_FAILED.code(), errorCode, s"unexpected error for produce request version $version") | ||
} | ||
} | ||
|
||
/** | ||
* Test that the produce request fails with UNKNOWN_TOPIC_ID if both topic name and id are default values when request version >= 13. | ||
* The produce request only supports topic id above version 13. | ||
*/ | ||
@Test | ||
def testEmptyTopicNameAndIDForProduceVersionFrom13ToNewest(): Unit = { | ||
for (version <- 13 to ApiKeys.PRODUCE.latestVersion()) { | ||
val request = createProduceRequest("", Uuid.ZERO_UUID, version.toShort) | ||
val response = connectAndReceive[AbstractResponse](request, listenerName = listenerName) | ||
val errorCode = response.asInstanceOf[ProduceResponse] | ||
.data() | ||
.responses() | ||
.find("", Uuid.ZERO_UUID) | ||
.partitionResponses.asScala.find(_.index == part).get | ||
.errorCode | ||
|
||
assertEquals(Errors.UNKNOWN_TOPIC_ID.code(), errorCode, s"unexpected error for produce request version $version") | ||
} | ||
} | ||
|
||
@ParameterizedTest | ||
@CsvSource(value = Array("false", "true")) | ||
@ValueSource(booleans = Array(true, false)) | ||
def testTopicIdAuthorization(withTopicExisting: Boolean): Unit = { | ||
val topicId = if (withTopicExisting) { | ||
createTopicWithBrokerPrincipal(topic) | ||
|
@@ -973,19 +1023,29 @@ class AuthorizerIntegrationTest extends AbstractAuthorizerIntegrationTest { | |
} | ||
} | ||
|
||
/* | ||
* even if the topic doesn't exist, request APIs should not leak the topic name | ||
/** | ||
* Test that the fetch request fails with TOPIC_AUTHORIZATION_FAILED if the client doesn't have permission | ||
* and topic name is used in the request. Even if the topic doesn't exist, we return TOPIC_AUTHORIZATION_FAILED to | ||
* prevent leaking the topic name. | ||
* This case covers fetch request version from oldest to 12. | ||
* The newer version is covered by testAuthorizationWithTopicNotExisting and testAuthorizationWithTopicExisting. | ||
*/ | ||
@ParameterizedTest | ||
@ValueSource(strings = Array("kraft")) | ||
def testAuthorizationFetchV12WithTopicNotExisting(quorum: String): Unit = { | ||
@ValueSource(booleans = Array(true, false)) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Should we change the ordering in |
||
def testAuthorizationFetchVersionFromOldestTo12(withTopicExisting: Boolean): Unit = { | ||
if (withTopicExisting) { | ||
createTopicWithBrokerPrincipal(topic) | ||
} | ||
|
||
val id = Uuid.ZERO_UUID | ||
val topicNames = Map(id -> "topic") | ||
val requestKeyToRequest = mutable.LinkedHashMap[ApiKeys, AbstractRequest]( | ||
ApiKeys.FETCH -> createFetchRequestWithUnknownTopic(id, 12), | ||
) | ||
val topicNames = Map(id -> topic) | ||
for (version <- ApiKeys.FETCH.oldestVersion to 12) { | ||
val requestKeyToRequest = mutable.LinkedHashMap[ApiKeys, AbstractRequest]( | ||
ApiKeys.FETCH -> createFetchRequestWithUnknownTopic(id, version.toShort), | ||
) | ||
|
||
sendRequests(requestKeyToRequest, false, topicNames) | ||
sendRequests(requestKeyToRequest, withTopicExisting, topicNames) | ||
} | ||
} | ||
|
||
@ParameterizedTest | ||
|
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This test is covered by
testAuthorizationWithTopicNotExisting
, right?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There is a little different. The
testAuthorizationWithTopicNotExisting
uses non-zero UUID, buttestEmptyTopicNameAndIDForProduceVersionFrom13ToNewest
uses zero UUID. The case would like to make sure even if topic id is zero, it still returns UNKNOWN_TOPIC_ID error if topic name is empty and request version >= 13.