KAFKA-19241: Updated tests in ShareFetchAcknowledgeRequestTest to reuse the socket for subsequent requests #19640
Conversation
…se the same socket in subsequent requests
Thanks for the fix, some comments.
@@ -913,6 +902,20 @@ class GroupCoordinatorBaseRequestTest(cluster: ClusterInstance) {
    assertEquals(expectedResponseData.results.asScala.toSet, deleteGroupsResponse.data.results.asScala.toSet)
  }
  protected def createSocket(): Socket = {
Shouldn't it be connectAny?
    )
  }
  protected def createSocket(destination: Int): Socket = {
Shouldn't it be connect?
  protected def sendAndReceiveFromExistingSocket[T <: AbstractResponse](
    request: AbstractRequest,
    destination: Int
  )(implicit classTag: ClassTag[T]): T = {
    val socket = IntegrationTestUtils.connect(brokerSocketServer(destination), cluster.clientListener())
    openSockets += socket
    IntegrationTestUtils.sendAndReceive[T](request, socket)
  }
  protected def connectAndReceiveWithoutClosingSocket[T <: AbstractResponse](
    request: AbstractRequest
  )(implicit classTag: ClassTag[T]): T = {
    val socket = IntegrationTestUtils.connect(cluster.anyBrokerSocketServer(), cluster.clientListener())
    openSockets += socket
    IntegrationTestUtils.sendAndReceive[T](request, socket)
  }
Why do you need this method? Can't you use IntegrationTestUtils.sendAndReceive[T](request, socket) directly?
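The reuse pattern under discussion — connect once, then send every subsequent request over the same socket — can be sketched in a self-contained way. This is a hypothetical illustration, not Kafka code: SocketReuseSketch, run, and the local echo server are stand-ins, and the length-prefixed framing only loosely mirrors Kafka's size-delimited wire format.

```scala
import java.io.{DataInputStream, DataOutputStream}
import java.net.{ServerSocket, Socket}
import java.nio.charset.StandardCharsets

// Hypothetical sketch: one socket serves several request/response exchanges,
// against a local echo server so the example runs without a broker.
object SocketReuseSketch {
  // Send one size-prefixed request and read one size-prefixed response.
  def sendAndReceive(socket: Socket, request: String): String = {
    val out   = new DataOutputStream(socket.getOutputStream)
    val bytes = request.getBytes(StandardCharsets.UTF_8)
    out.writeInt(bytes.length)          // size prefix, like Kafka's framing
    out.write(bytes)
    out.flush()
    val in   = new DataInputStream(socket.getInputStream)
    val resp = new Array[Byte](in.readInt())
    in.readFully(resp)
    new String(resp, StandardCharsets.UTF_8)
  }

  // Connect once, then issue two requests over the SAME socket.
  def run(): Seq[String] = {
    val server = new ServerSocket(0)    // ephemeral port
    val echo = new Thread(() => {
      val s   = server.accept()
      val in  = new DataInputStream(s.getInputStream)
      val out = new DataOutputStream(s.getOutputStream)
      (1 to 2).foreach { _ =>           // echo two framed requests back
        val buf = new Array[Byte](in.readInt())
        in.readFully(buf)
        out.writeInt(buf.length); out.write(buf); out.flush()
      }
      s.close()
    })
    echo.start()

    val socket = new Socket("localhost", server.getLocalPort)
    val responses = Seq(
      sendAndReceive(socket, "shareFetch"),        // first request
      sendAndReceive(socket, "shareAcknowledge")   // reuses the same socket
    )
    socket.close(); server.close()
    responses
  }

  def main(args: Array[String]): Unit = run().foreach(println)
}
```

The point of the sketch is that nothing about a second request requires a second connection; a plain send-and-receive helper works fine when handed an already-open socket.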
    assertEquals(Errors.UNSUPPORTED_VERSION.code, shareFetchResponse.data.errorCode)
    assertEquals(0, shareFetchResponse.data.acquisitionLockTimeoutMs)

    socket.close()
Is there a way we can have a check in tearDown that all open sockets from the tests are closed?
Thanks for the review. I have now updated the tests to include a ListBuffer that stores any new sockets that are created. In tearDown, the list is iterated to close all sockets that are still open.
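The tracking approach described above can be sketched as follows. This is a minimal stand-alone illustration, not the PR's code: SocketTracker and TrackedSocket are hypothetical names, and TrackedSocket stands in for java.net.Socket so the example runs without a network.

```scala
import java.io.Closeable
import scala.collection.mutable.ListBuffer

// Every socket created during a test is appended to openSockets;
// tearDown closes whatever the test left open.
class SocketTracker {
  private val openSockets = ListBuffer.empty[TrackedSocket]

  def createSocket(): TrackedSocket = {
    val s = new TrackedSocket
    openSockets += s          // remember it for cleanup
    s
  }

  def tearDown(): Unit = {
    openSockets.filterNot(_.isClosed).foreach(_.close())
    openSockets.clear()
  }
}

// Stand-in for java.net.Socket: only tracks open/closed state.
class TrackedSocket extends Closeable {
  private var closed = false
  def isClosed: Boolean = closed
  override def close(): Unit = closed = true
}
```

With this in place, a test may still close a socket explicitly, and tearDown acts as a safety net for the rest.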
So if we avoid this call here and rely on tearDown to close sockets, will it cause any trouble? Then you wouldn't need an explicit if (!socket.isClosed) check either.
    if (!socket.isClosed) {
      socket.close()
    }
Seems strange that we are fetching from openSockets and checking whether it's closed. If it was closed, why was it not removed from the list?
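One way to follow this suggestion is to remove a socket from openSockets at the moment it is closed, so tearDown never encounters a closed socket and the isClosed guard disappears. A hypothetical sketch (OpenSocketRegistry, closeSocket, and FakeSocket are made-up names; FakeSocket stands in for java.net.Socket so the example is runnable):

```scala
import scala.collection.mutable.ListBuffer

// Invariant: everything in openSockets is still open.
class OpenSocketRegistry {
  private val openSockets = ListBuffer.empty[FakeSocket]

  def open(): FakeSocket = {
    val s = new FakeSocket
    openSockets += s
    s
  }

  // Close AND de-register in one place, so the invariant holds.
  def closeSocket(s: FakeSocket): Unit = {
    s.close()
    openSockets -= s          // closed sockets never linger in the list
  }

  def tearDown(): Unit = {
    openSockets.foreach(_.close())   // no isClosed check needed
    openSockets.clear()
  }

  def trackedCount: Int = openSockets.size
}

// Stand-in for java.net.Socket: only tracks open/closed state.
class FakeSocket {
  private var closed = false
  def isClosed: Boolean = closed
  def close(): Unit = closed = true
}
```

The design choice here is to funnel every explicit close through one helper; the list then only ever contains open sockets, which is exactly what the reviewer is pointing at.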
…owledgeRequestTest
LGTM, thanks for the fixes.
…se the socket for subsequent requests (apache#19640) Currently in the tests in ShareFetchAcknowledgeRequestTest, subsequent share fetch / share acknowledge requests create a new socket every time, even when the requests are sent by the same member. In reality, a single share consumer client will reuse the same socket for all share-related requests in its lifetime. This PR changes the behaviour in the tests to align with reality and reuse the same socket for all requests by the same share group member. Reviewers: Apoorv Mittal <apoorvmittal10@gmail.com>