
KAFKA-19241: Updated tests in ShareFetchAcknowledgeRequestTest to reuse the socket for subsequent requests #19640


Merged · 3 commits merged into apache:trunk on May 7, 2025

Conversation

chirag-wadhwa5 (Contributor)

Currently, in the tests in ShareFetchAcknowledgeRequestTest, subsequent share fetch / share acknowledge requests create a new socket every time, even when the requests are sent by the same member. In reality, a single share consumer client will reuse the same socket for all share-related requests in its lifetime. This PR changes the behaviour in the tests to align with reality and reuses the same socket for all requests by the same share group member.
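For reference, the pattern the updated tests follow looks roughly like the sketch below: one socket is opened per share group member, and every subsequent ShareFetch / ShareAcknowledge request for that member is sent over it. The helper calls (IntegrationTestUtils.connect, IntegrationTestUtils.sendAndReceive) and the cluster field are taken from the snippets quoted later in this review; shareFetchRequest and shareAcknowledgeRequest are illustrative placeholders, not the test's actual code.

// Sketch of the socket-reuse pattern, assuming the test-class context
// (`cluster` in scope) and the IntegrationTestUtils helpers quoted below.
val socket = IntegrationTestUtils.connect(cluster.anyBrokerSocketServer(), cluster.clientListener())
try {
  // First request for this member: a share fetch over the freshly opened socket.
  val fetchResponse =
    IntegrationTestUtils.sendAndReceive[ShareFetchResponse](shareFetchRequest, socket)

  // The follow-up acknowledge from the same member reuses the same socket
  // instead of opening a new connection, mirroring a real share consumer.
  val ackResponse =
    IntegrationTestUtils.sendAndReceive[ShareAcknowledgeResponse](shareAcknowledgeRequest, socket)
} finally {
  socket.close()
}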

@github-actions bot added the triage (PRs from the community), core (Kafka Broker), and tests (Test fixes, including flaky tests) labels on May 5, 2025
@AndrewJSchofield added the KIP-932 (Queues for Kafka) label and removed the triage (PRs from the community) label on May 5, 2025
@apoorvmittal10 (Contributor) left a comment:

Thanks for the fix, some comments.

@@ -913,6 +902,20 @@ class GroupCoordinatorBaseRequestTest(cluster: ClusterInstance) {
assertEquals(expectedResponseData.results.asScala.toSet, deleteGroupsResponse.data.results.asScala.toSet)
}

protected def createSocket(): Socket = {

Shouldn't it be connectAny?

)
}

protected def createSocket(destination: Int): Socket = {

Shouldn't it be connect?

Comment on lines 940 to 945
protected def sendAndReceiveFromExistingSocket[T <: AbstractResponse](
  request: AbstractRequest,
  destination: Int
)(implicit classTag: ClassTag[T]): T = {
  val socket = IntegrationTestUtils.connect(brokerSocketServer(destination), cluster.clientListener())
  openSockets += socket
  IntegrationTestUtils.sendAndReceive[T](request, socket)
}

protected def connectAndReceiveWithoutClosingSocket[T <: AbstractResponse](
  request: AbstractRequest
  socket: Socket
)(implicit classTag: ClassTag[T]): T = {
  val socket = IntegrationTestUtils.connect(cluster.anyBrokerSocketServer(), cluster.clientListener())
  openSockets += socket
  IntegrationTestUtils.sendAndReceive[T](request, socket)
}

Why do you need this method and can't use IntegrationTestUtils.sendAndReceive[T](request, socket) directly?
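In other words (a hypothetical rewrite of the call site, assuming the createSocket helper discussed above and a placeholder shareFetchRequest), the test could own the socket and call the utility directly:

// Hypothetical call site following the reviewer's suggestion: the test keeps
// the socket it created and uses IntegrationTestUtils.sendAndReceive directly,
// so no extra wrapper method is needed.
val socket = createSocket()
val shareFetchResponse =
  IntegrationTestUtils.sendAndReceive[ShareFetchResponse](shareFetchRequest, socket)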


assertEquals(Errors.UNSUPPORTED_VERSION.code, shareFetchResponse.data.errorCode)
assertEquals(0, shareFetchResponse.data.acquisitionLockTimeoutMs)

socket.close()

Is there a way we can have a check in tearDown that all sockets opened by the tests are closed?

chirag-wadhwa5 (Contributor Author) replied:

Thanks for the review. I have now updated the tests to include a ListBuffer that stores any new sockets that are created. In tearDown, the list is iterated to close all the sockets that are still open.
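A minimal sketch of that bookkeeping, assuming a JUnit-style @AfterEach tearDown and reusing the openSockets / createSocket names quoted in this review (the exact hook and field placement in the real test class may differ):

import java.net.Socket
import scala.collection.mutable.ListBuffer
import org.junit.jupiter.api.AfterEach

// Every socket opened by a test is remembered here.
private val openSockets = ListBuffer.empty[Socket]

protected def createSocket(): Socket = {
  val socket = IntegrationTestUtils.connect(cluster.anyBrokerSocketServer(), cluster.clientListener())
  openSockets += socket
  socket
}

// tearDown closes whatever is still open after each test.
@AfterEach
def tearDown(): Unit = {
  openSockets.filterNot(_.isClosed).foreach(_.close())
  openSockets.clear()
}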


So if we avoid this call here and rely on tearDown to close the sockets, will it cause any trouble? Then you wouldn't need the explicit if (!socket.isClosed) check either?

Comment on lines 150 to 152
if (!socket.isClosed) {
  socket.close()
}

Seems strange that we are fetching from openSockets and checking whether it's closed. If it was already closed, why was it not removed from the list?
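One way to address this (a hypothetical helper, not code from this PR) would be to drop the socket from the buffer at the moment it is closed, so the isClosed check in tearDown becomes unnecessary:

// Hypothetical helper: closing a socket also removes it from the bookkeeping
// list, so tearDown never encounters an already-closed entry.
protected def closeSocket(socket: Socket): Unit = {
  socket.close()
  openSockets -= socket
}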


assertEquals(Errors.UNSUPPORTED_VERSION.code, shareFetchResponse.data.errorCode)
assertEquals(0, shareFetchResponse.data.acquisitionLockTimeoutMs)

socket.close()

So if we avoid this call here and rely on tearDown to close the sockets, will it cause any trouble? Then you wouldn't need the explicit if (!socket.isClosed) check either?

@apoorvmittal10 (Contributor) left a comment:

LGTM, thanks for the fixes.

@apoorvmittal10 apoorvmittal10 merged commit f3a4a1b into apache:trunk May 7, 2025
20 checks passed
shmily7829 pushed a commit to shmily7829/kafka that referenced this pull request May 7, 2025
…se the socket for subsequent requests (apache#19640)

Reviewers: Apoorv Mittal <apoorvmittal10@gmail.com>
Labels: ci-approved · core (Kafka Broker) · KIP-932 (Queues for Kafka) · tests (Test fixes, including flaky tests)
4 participants