diff --git a/core/src/test/scala/unit/kafka/server/GroupCoordinatorBaseRequestTest.scala b/core/src/test/scala/unit/kafka/server/GroupCoordinatorBaseRequestTest.scala index e7926df3e36cf..f7675651cd6dd 100644 --- a/core/src/test/scala/unit/kafka/server/GroupCoordinatorBaseRequestTest.scala +++ b/core/src/test/scala/unit/kafka/server/GroupCoordinatorBaseRequestTest.scala @@ -913,6 +913,24 @@ class GroupCoordinatorBaseRequestTest(cluster: ClusterInstance) { assertEquals(expectedResponseData.results.asScala.toSet, deleteGroupsResponse.data.results.asScala.toSet) } + protected def connectAny(): Socket = { + val socket: Socket = IntegrationTestUtils.connect( + cluster.anyBrokerSocketServer(), + cluster.clientListener() + ) + openSockets += socket + socket + } + + protected def connect(destination: Int): Socket = { + val socket: Socket = IntegrationTestUtils.connect( + brokerSocketServer(destination), + cluster.clientListener() + ) + openSockets += socket + socket + } + protected def connectAndReceive[T <: AbstractResponse]( request: AbstractRequest )(implicit classTag: ClassTag[T]): T = { @@ -934,23 +952,6 @@ class GroupCoordinatorBaseRequestTest(cluster: ClusterInstance) { ) } - protected def connectAndReceiveWithoutClosingSocket[T <: AbstractResponse]( - request: AbstractRequest, - destination: Int - )(implicit classTag: ClassTag[T]): T = { - val socket = IntegrationTestUtils.connect(brokerSocketServer(destination), cluster.clientListener()) - openSockets += socket - IntegrationTestUtils.sendAndReceive[T](request, socket) - } - - protected def connectAndReceiveWithoutClosingSocket[T <: AbstractResponse]( - request: AbstractRequest - )(implicit classTag: ClassTag[T]): T = { - val socket = IntegrationTestUtils.connect(cluster.anyBrokerSocketServer(), cluster.clientListener()) - openSockets += socket - IntegrationTestUtils.sendAndReceive[T](request, socket) - } - private def brokerSocketServer(brokerId: Int): SocketServer = { getBrokers.find { broker => broker.config.brokerId == brokerId diff --git a/core/src/test/scala/unit/kafka/server/ShareFetchAcknowledgeRequestTest.scala b/core/src/test/scala/unit/kafka/server/ShareFetchAcknowledgeRequestTest.scala index 7aba491536fd0..aef8d49aa099e 100644 --- a/core/src/test/scala/unit/kafka/server/ShareFetchAcknowledgeRequestTest.scala +++ b/core/src/test/scala/unit/kafka/server/ShareFetchAcknowledgeRequestTest.scala @@ -28,6 +28,7 @@ import org.apache.kafka.server.common.Feature import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue} import org.junit.jupiter.api.{AfterEach, Timeout} +import java.net.Socket import java.util import java.util.Collections import scala.jdk.CollectionConverters._ @@ -59,8 +60,10 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("topic1", 1)) ) + val socket: Socket = connectAny() + val shareFetchRequest = createShareFetchRequest(groupId, metadata, send, Seq.empty, Map.empty) - val shareFetchResponse = connectAndReceiveWithoutClosingSocket[ShareFetchResponse](shareFetchRequest) + val shareFetchResponse = IntegrationTestUtils.sendAndReceive[ShareFetchResponse](shareFetchRequest, socket) assertEquals(Errors.UNSUPPORTED_VERSION.code, shareFetchResponse.data.errorCode) assertEquals(0, shareFetchResponse.data.acquisitionLockTimeoutMs) @@ -75,8 +78,10 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo val groupId: String = "group" val metadata: ShareRequestMetadata = new ShareRequestMetadata(Uuid.randomUuid(), ShareRequestMetadata.INITIAL_EPOCH) + val socket: Socket = connectAny() + val shareAcknowledgeRequest = createShareAcknowledgeRequest(groupId, metadata, Map.empty) - val shareAcknowledgeResponse = connectAndReceiveWithoutClosingSocket[ShareAcknowledgeResponse](shareAcknowledgeRequest) + val shareAcknowledgeResponse = IntegrationTestUtils.sendAndReceive[ShareAcknowledgeResponse](shareAcknowledgeRequest, socket) assertEquals(Errors.UNSUPPORTED_VERSION.code, shareAcknowledgeResponse.data.errorCode) } @@ -122,9 +127,11 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo val send: Seq[TopicIdPartition] = Seq(topicIdPartition) + val socket: Socket = connect(nonReplicaId) + // Send the share fetch request to the non-replica and verify the error code val shareFetchRequest = createShareFetchRequest(groupId, metadata, send, Seq.empty, Map.empty) - val shareFetchResponse = connectAndReceiveWithoutClosingSocket[ShareFetchResponse](shareFetchRequest, nonReplicaId) + val shareFetchResponse = IntegrationTestUtils.sendAndReceive[ShareFetchResponse](shareFetchRequest, socket) assertEquals(30000, shareFetchResponse.data.acquisitionLockTimeoutMs) val partitionData = shareFetchResponse.responseData(topicNames).get(topicIdPartition) assertEquals(Errors.NOT_LEADER_OR_FOLLOWER.code, partitionData.errorCode) @@ -163,8 +170,10 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo val send: Seq[TopicIdPartition] = Seq(topicIdPartition) + val socket: Socket = connectAny() + // Send the first share fetch request to initialize the share partition - sendFirstShareFetchRequest(memberId, groupId, send) + sendFirstShareFetchRequest(memberId, groupId, send, socket) initProducer() // Producing 10 records to the topic created above @@ -174,7 +183,7 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo val metadata = new ShareRequestMetadata(memberId, ShareRequestMetadata.nextEpoch(ShareRequestMetadata.INITIAL_EPOCH)) val acknowledgementsMap: Map[TopicIdPartition, util.List[ShareFetchRequestData.AcknowledgementBatch]] = Map.empty val shareFetchRequest = createShareFetchRequest(groupId, metadata, send, Seq.empty, acknowledgementsMap) - val shareFetchResponse = connectAndReceiveWithoutClosingSocket[ShareFetchResponse](shareFetchRequest) + val shareFetchResponse = IntegrationTestUtils.sendAndReceive[ShareFetchResponse](shareFetchRequest, socket) val shareFetchResponseData = shareFetchResponse.data() assertEquals(Errors.NONE.code, shareFetchResponseData.errorCode) @@ -227,8 +236,10 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo val send: Seq[TopicIdPartition] = Seq(topicIdPartition1, topicIdPartition2, topicIdPartition3) + val socket: Socket = connectAny() + // Send the first share fetch request to initialize the share partitions - sendFirstShareFetchRequest(memberId, groupId, send) + sendFirstShareFetchRequest(memberId, groupId, send, socket) initProducer() // Producing 10 records to the topic partitions created above @@ -245,7 +256,7 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo // as the share partitions might not be initialized yet. So, we retry until we get the response. var responses = Seq[ShareFetchResponseData.PartitionData]() TestUtils.waitUntilTrue(() => { - val shareFetchResponse = connectAndReceiveWithoutClosingSocket[ShareFetchResponse](shareFetchRequest) + val shareFetchResponse = IntegrationTestUtils.sendAndReceive[ShareFetchResponse](shareFetchRequest, socket) val shareFetchResponseData = shareFetchResponse.data() assertEquals(Errors.NONE.code, shareFetchResponseData.errorCode) assertEquals(30000, shareFetchResponseData.acquisitionLockTimeoutMs) @@ -334,15 +345,19 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo val metadata: ShareRequestMetadata = new ShareRequestMetadata(memberId, ShareRequestMetadata.INITIAL_EPOCH) val acknowledgementsMap: Map[TopicIdPartition, util.List[ShareFetchRequestData.AcknowledgementBatch]] = Map.empty + val socket1: Socket = connect(leader1) + val socket2: Socket = connect(leader2) + val socket3: Socket = connect(leader3) + // Send the first share fetch request to initialize the share partitions // Create different share fetch requests for different partitions as they may have leaders on separate brokers var shareFetchRequest1 = createShareFetchRequest(groupId, metadata, send1, Seq.empty, acknowledgementsMap) var shareFetchRequest2 = createShareFetchRequest(groupId, metadata, send2, Seq.empty, acknowledgementsMap) var shareFetchRequest3 = createShareFetchRequest(groupId, metadata, send3, Seq.empty, acknowledgementsMap) - var shareFetchResponse1 = connectAndReceiveWithoutClosingSocket[ShareFetchResponse](shareFetchRequest1, destination = leader1) - var shareFetchResponse2 = connectAndReceiveWithoutClosingSocket[ShareFetchResponse](shareFetchRequest2, destination = leader2) - var shareFetchResponse3 = connectAndReceiveWithoutClosingSocket[ShareFetchResponse](shareFetchRequest3, destination = leader3) + var shareFetchResponse1 = IntegrationTestUtils.sendAndReceive[ShareFetchResponse](shareFetchRequest1, socket1) + var shareFetchResponse2 = IntegrationTestUtils.sendAndReceive[ShareFetchResponse](shareFetchRequest2, socket2) + var shareFetchResponse3 = IntegrationTestUtils.sendAndReceive[ShareFetchResponse](shareFetchRequest3, socket3) initProducer() // Producing 10 records to the topic partitions created above @@ -356,9 +371,9 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo shareFetchRequest2 = createShareFetchRequest(groupId, metadata, send2, Seq.empty, acknowledgementsMap) shareFetchRequest3 = createShareFetchRequest(groupId, metadata, send3, Seq.empty, acknowledgementsMap) - shareFetchResponse1 = connectAndReceiveWithoutClosingSocket[ShareFetchResponse](shareFetchRequest1, destination = leader1) - shareFetchResponse2 = connectAndReceiveWithoutClosingSocket[ShareFetchResponse](shareFetchRequest2, destination = leader2) - shareFetchResponse3 = connectAndReceiveWithoutClosingSocket[ShareFetchResponse](shareFetchRequest3, destination = leader3) + shareFetchResponse1 = IntegrationTestUtils.sendAndReceive[ShareFetchResponse](shareFetchRequest1, socket1) + shareFetchResponse2 = IntegrationTestUtils.sendAndReceive[ShareFetchResponse](shareFetchRequest2, socket2) + shareFetchResponse3 = IntegrationTestUtils.sendAndReceive[ShareFetchResponse](shareFetchRequest3, socket3) val shareFetchResponseData1 = shareFetchResponse1.data() assertEquals(Errors.NONE.code, shareFetchResponseData1.errorCode) @@ -439,8 +454,10 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo val send: Seq[TopicIdPartition] = Seq(topicIdPartition) + val socket: Socket = connectAny() + // Send the first share fetch request to initialize share partitions - sendFirstShareFetchRequest(memberId, groupId, send) + sendFirstShareFetchRequest(memberId, groupId, send, socket) initProducer() // Producing 10 records to the topic created above @@ -451,7 +468,7 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo var metadata = new ShareRequestMetadata(memberId, shareSessionEpoch) val acknowledgementsMapForFetch: Map[TopicIdPartition, util.List[ShareFetchRequestData.AcknowledgementBatch]] = Map.empty var shareFetchRequest = createShareFetchRequest(groupId, metadata, send, Seq.empty, acknowledgementsMapForFetch) - var shareFetchResponse = connectAndReceiveWithoutClosingSocket[ShareFetchResponse](shareFetchRequest) + var shareFetchResponse = IntegrationTestUtils.sendAndReceive[ShareFetchResponse](shareFetchRequest, socket) var shareFetchResponseData = shareFetchResponse.data() assertEquals(Errors.NONE.code, shareFetchResponseData.errorCode) @@ -478,7 +495,7 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo .setLastOffset(9) .setAcknowledgeTypes(Collections.singletonList(1.toByte))).asJava) // Accept the records val shareAcknowledgeRequest = createShareAcknowledgeRequest(groupId, metadata, acknowledgementsMapForAcknowledge) - val shareAcknowledgeResponse = connectAndReceiveWithoutClosingSocket[ShareAcknowledgeResponse](shareAcknowledgeRequest) + val shareAcknowledgeResponse = IntegrationTestUtils.sendAndReceive[ShareAcknowledgeResponse](shareAcknowledgeRequest, socket) val shareAcknowledgeResponseData = shareAcknowledgeResponse.data() assertEquals(Errors.NONE.code, shareAcknowledgeResponseData.errorCode) @@ -500,7 +517,7 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo shareSessionEpoch = ShareRequestMetadata.nextEpoch(shareSessionEpoch) metadata = new ShareRequestMetadata(memberId, shareSessionEpoch) shareFetchRequest = createShareFetchRequest(groupId, metadata, send, Seq.empty, Map.empty) - shareFetchResponse = connectAndReceiveWithoutClosingSocket[ShareFetchResponse](shareFetchRequest) + shareFetchResponse = IntegrationTestUtils.sendAndReceive[ShareFetchResponse](shareFetchRequest, socket) shareFetchResponseData = shareFetchResponse.data() assertEquals(Errors.NONE.code, shareFetchResponseData.errorCode) @@ -554,8 +571,10 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo val send: Seq[TopicIdPartition] = Seq(topicIdPartition) + val socket: Socket = connectAny() + // Send the first share fetch request to initialize the share partition - sendFirstShareFetchRequest(memberId, groupId, send, 15000) + sendFirstShareFetchRequest(memberId, groupId, send, socket, 15000) initProducer() // Producing 10 records to the topic created above @@ -566,7 +585,7 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo var metadata = new ShareRequestMetadata(memberId, shareSessionEpoch) var acknowledgementsMapForFetch: Map[TopicIdPartition, util.List[ShareFetchRequestData.AcknowledgementBatch]] = Map.empty var shareFetchRequest = createShareFetchRequest(groupId, metadata, send, Seq.empty, acknowledgementsMapForFetch) - var shareFetchResponse = connectAndReceiveWithoutClosingSocket[ShareFetchResponse](shareFetchRequest) + var shareFetchResponse = IntegrationTestUtils.sendAndReceive[ShareFetchResponse](shareFetchRequest, socket) var shareFetchResponseData = shareFetchResponse.data() assertEquals(Errors.NONE.code, shareFetchResponseData.errorCode) @@ -595,7 +614,7 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo .setLastOffset(9) .setAcknowledgeTypes(Collections.singletonList(1.toByte))).asJava) // Accept the records shareFetchRequest = createShareFetchRequest(groupId, metadata, send, Seq.empty, acknowledgementsMapForFetch) - shareFetchResponse = connectAndReceiveWithoutClosingSocket[ShareFetchResponse](shareFetchRequest) + shareFetchResponse = IntegrationTestUtils.sendAndReceive[ShareFetchResponse](shareFetchRequest, socket) shareFetchResponseData = shareFetchResponse.data() assertEquals(Errors.NONE.code, shareFetchResponseData.errorCode) @@ -620,7 +639,7 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo shareSessionEpoch = ShareRequestMetadata.nextEpoch(shareSessionEpoch) metadata = new ShareRequestMetadata(memberId, shareSessionEpoch) shareFetchRequest = createShareFetchRequest(groupId, metadata, send, Seq.empty, Map.empty) - shareFetchResponse = connectAndReceiveWithoutClosingSocket[ShareFetchResponse](shareFetchRequest) + shareFetchResponse = IntegrationTestUtils.sendAndReceive[ShareFetchResponse](shareFetchRequest, socket) shareFetchResponseData = shareFetchResponse.data() assertEquals(Errors.NONE.code, shareFetchResponseData.errorCode) @@ -672,8 +691,10 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo val send: Seq[TopicIdPartition] = Seq(topicIdPartition) + val socket: Socket = connectAny() + // Send the first share fetch request to initialize the share partiion - sendFirstShareFetchRequest(memberId, groupId, send) + sendFirstShareFetchRequest(memberId, groupId, send, socket) initProducer() // Producing 10 records to the topic created above @@ -684,7 +705,7 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo var metadata = new ShareRequestMetadata(memberId, shareSessionEpoch) val acknowledgementsMapForFetch: Map[TopicIdPartition, util.List[ShareFetchRequestData.AcknowledgementBatch]] = Map.empty var shareFetchRequest = createShareFetchRequest(groupId, metadata, send, Seq.empty, acknowledgementsMapForFetch) - var shareFetchResponse = connectAndReceiveWithoutClosingSocket[ShareFetchResponse](shareFetchRequest) + var shareFetchResponse = IntegrationTestUtils.sendAndReceive[ShareFetchResponse](shareFetchRequest, socket) var shareFetchResponseData = shareFetchResponse.data() assertEquals(Errors.NONE.code, shareFetchResponseData.errorCode) @@ -711,7 +732,7 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo .setLastOffset(9) .setAcknowledgeTypes(Collections.singletonList(2.toByte))).asJava) // Release the records val shareAcknowledgeRequest = createShareAcknowledgeRequest(groupId, metadata, acknowledgementsMapForAcknowledge) - val shareAcknowledgeResponse = connectAndReceiveWithoutClosingSocket[ShareAcknowledgeResponse](shareAcknowledgeRequest) + val shareAcknowledgeResponse = IntegrationTestUtils.sendAndReceive[ShareAcknowledgeResponse](shareAcknowledgeRequest, socket) val shareAcknowledgeResponseData = shareAcknowledgeResponse.data() assertEquals(Errors.NONE.code, shareAcknowledgeResponseData.errorCode) @@ -730,7 +751,7 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo shareSessionEpoch = ShareRequestMetadata.nextEpoch(shareSessionEpoch) metadata = new ShareRequestMetadata(memberId, shareSessionEpoch) shareFetchRequest = createShareFetchRequest(groupId, metadata, send, Seq.empty, Map.empty) - shareFetchResponse = connectAndReceiveWithoutClosingSocket[ShareFetchResponse](shareFetchRequest) + shareFetchResponse = IntegrationTestUtils.sendAndReceive[ShareFetchResponse](shareFetchRequest, socket) shareFetchResponseData = shareFetchResponse.data() assertEquals(Errors.NONE.code, shareFetchResponseData.errorCode) @@ -782,8 +803,10 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo val send: Seq[TopicIdPartition] = Seq(topicIdPartition) + val socket: Socket = connectAny() + // Send the first share fetch request to initialize the share partition - sendFirstShareFetchRequest(memberId, groupId, send) + sendFirstShareFetchRequest(memberId, groupId, send, socket) initProducer() // Producing 10 records to the topic created above @@ -794,7 +817,7 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo var metadata = new ShareRequestMetadata(memberId, shareSessionEpoch) var acknowledgementsMapForFetch: Map[TopicIdPartition, util.List[ShareFetchRequestData.AcknowledgementBatch]] = Map.empty var shareFetchRequest = createShareFetchRequest(groupId, metadata, send, Seq.empty, acknowledgementsMapForFetch) - var shareFetchResponse = connectAndReceiveWithoutClosingSocket[ShareFetchResponse](shareFetchRequest) + var shareFetchResponse = IntegrationTestUtils.sendAndReceive[ShareFetchResponse](shareFetchRequest, socket) var shareFetchResponseData = shareFetchResponse.data() assertEquals(Errors.NONE.code, shareFetchResponseData.errorCode) @@ -839,7 +862,7 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo releaseAcknowledgementSent = true } shareFetchRequest = createShareFetchRequest(groupId, metadata, send, Seq.empty, acknowledgementsMapForFetch) - shareFetchResponse = connectAndReceiveWithoutClosingSocket[ShareFetchResponse](shareFetchRequest) + shareFetchResponse = IntegrationTestUtils.sendAndReceive[ShareFetchResponse](shareFetchRequest, socket) shareFetchResponseData = shareFetchResponse.data() assertEquals(Errors.NONE.code, shareFetchResponseData.errorCode) @@ -896,8 +919,10 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo val send: Seq[TopicIdPartition] = Seq(topicIdPartition) + val socket: Socket = connectAny() + // Send the first share fetch request to initialize the share partition - sendFirstShareFetchRequest(memberId, groupId, send) + sendFirstShareFetchRequest(memberId, groupId, send, socket) initProducer() // Producing 10 records to the topic created above @@ -908,7 +933,7 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo var metadata = new ShareRequestMetadata(memberId, shareSessionEpoch) val acknowledgementsMapForFetch: Map[TopicIdPartition, util.List[ShareFetchRequestData.AcknowledgementBatch]] = Map.empty var shareFetchRequest = createShareFetchRequest(groupId, metadata, send, Seq.empty, acknowledgementsMapForFetch) - var shareFetchResponse = connectAndReceiveWithoutClosingSocket[ShareFetchResponse](shareFetchRequest) + var shareFetchResponse = IntegrationTestUtils.sendAndReceive[ShareFetchResponse](shareFetchRequest, socket) var shareFetchResponseData = shareFetchResponse.data() assertEquals(Errors.NONE.code, shareFetchResponseData.errorCode) @@ -935,7 +960,7 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo .setLastOffset(9) .setAcknowledgeTypes(Collections.singletonList(3.toByte))).asJava) // Reject the records val shareAcknowledgeRequest = createShareAcknowledgeRequest(groupId, metadata, acknowledgementsMapForAcknowledge) - val shareAcknowledgeResponse = connectAndReceiveWithoutClosingSocket[ShareAcknowledgeResponse](shareAcknowledgeRequest) + val shareAcknowledgeResponse = IntegrationTestUtils.sendAndReceive[ShareAcknowledgeResponse](shareAcknowledgeRequest, socket) val shareAcknowledgeResponseData = shareAcknowledgeResponse.data() assertEquals(Errors.NONE.code, shareAcknowledgeResponseData.errorCode) @@ -957,7 +982,7 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo shareSessionEpoch = ShareRequestMetadata.nextEpoch(shareSessionEpoch) metadata = new ShareRequestMetadata(memberId, shareSessionEpoch) shareFetchRequest = createShareFetchRequest(groupId, metadata, send, Seq.empty, Map.empty) - shareFetchResponse = connectAndReceiveWithoutClosingSocket[ShareFetchResponse](shareFetchRequest) + shareFetchResponse = IntegrationTestUtils.sendAndReceive[ShareFetchResponse](shareFetchRequest, socket) shareFetchResponseData = shareFetchResponse.data() assertEquals(Errors.NONE.code, shareFetchResponseData.errorCode) @@ -1009,8 +1034,10 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo val send: Seq[TopicIdPartition] = Seq(topicIdPartition) + val socket: Socket = connectAny() + // Send the first share fetch request to initialize the share partition - sendFirstShareFetchRequest(memberId, groupId, send) + sendFirstShareFetchRequest(memberId, groupId, send, socket) initProducer() // Producing 10 records to the topic created above @@ -1021,7 +1048,7 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo var metadata = new ShareRequestMetadata(memberId, shareSessionEpoch) var acknowledgementsMapForFetch: Map[TopicIdPartition, util.List[ShareFetchRequestData.AcknowledgementBatch]] = Map.empty var shareFetchRequest = createShareFetchRequest(groupId, metadata, send, Seq.empty, acknowledgementsMapForFetch) - var shareFetchResponse = connectAndReceiveWithoutClosingSocket[ShareFetchResponse](shareFetchRequest) + var shareFetchResponse = IntegrationTestUtils.sendAndReceive[ShareFetchResponse](shareFetchRequest, socket) var shareFetchResponseData = shareFetchResponse.data() assertEquals(Errors.NONE.code, shareFetchResponseData.errorCode) @@ -1050,7 +1077,7 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo .setLastOffset(9) .setAcknowledgeTypes(Collections.singletonList(3.toByte))).asJava) // Reject the records shareFetchRequest = createShareFetchRequest(groupId, metadata, send, Seq.empty, acknowledgementsMapForFetch) - shareFetchResponse = connectAndReceiveWithoutClosingSocket[ShareFetchResponse](shareFetchRequest) + shareFetchResponse = IntegrationTestUtils.sendAndReceive[ShareFetchResponse](shareFetchRequest, socket) shareFetchResponseData = shareFetchResponse.data() assertEquals(Errors.NONE.code, shareFetchResponseData.errorCode) @@ -1075,7 +1102,7 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo shareSessionEpoch = ShareRequestMetadata.nextEpoch(shareSessionEpoch) metadata = new ShareRequestMetadata(memberId, shareSessionEpoch) shareFetchRequest = createShareFetchRequest(groupId, metadata, send, Seq.empty, Map.empty) - shareFetchResponse = connectAndReceiveWithoutClosingSocket[ShareFetchResponse](shareFetchRequest) + shareFetchResponse = IntegrationTestUtils.sendAndReceive[ShareFetchResponse](shareFetchRequest, socket) shareFetchResponseData = shareFetchResponse.data() assertEquals(Errors.NONE.code, shareFetchResponseData.errorCode) @@ -1129,8 +1156,10 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo val send: Seq[TopicIdPartition] = Seq(topicIdPartition) + val socket: Socket = connectAny() + // Send the first share fetch request to initialize the shar partition - sendFirstShareFetchRequest(memberId, groupId, send) + sendFirstShareFetchRequest(memberId, groupId, send, socket) initProducer() // Producing 10 records to the topic created above @@ -1141,7 +1170,7 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo var metadata = new ShareRequestMetadata(memberId, shareSessionEpoch) val acknowledgementsMapForFetch: Map[TopicIdPartition, util.List[ShareFetchRequestData.AcknowledgementBatch]] = Map.empty var shareFetchRequest = createShareFetchRequest(groupId, metadata, send, Seq.empty, acknowledgementsMapForFetch) - var shareFetchResponse = connectAndReceiveWithoutClosingSocket[ShareFetchResponse](shareFetchRequest) + var shareFetchResponse = IntegrationTestUtils.sendAndReceive[ShareFetchResponse](shareFetchRequest, socket) var shareFetchResponseData = shareFetchResponse.data() assertEquals(Errors.NONE.code, shareFetchResponseData.errorCode) @@ -1168,7 +1197,7 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo .setLastOffset(9) .setAcknowledgeTypes(Collections.singletonList(2.toByte))).asJava) // Release the records var shareAcknowledgeRequest = createShareAcknowledgeRequest(groupId, metadata, acknowledgementsMapForAcknowledge) - var shareAcknowledgeResponse = connectAndReceiveWithoutClosingSocket[ShareAcknowledgeResponse](shareAcknowledgeRequest) + var shareAcknowledgeResponse = IntegrationTestUtils.sendAndReceive[ShareAcknowledgeResponse](shareAcknowledgeRequest, socket) var shareAcknowledgeResponseData = shareAcknowledgeResponse.data() assertEquals(Errors.NONE.code, shareAcknowledgeResponseData.errorCode) @@ -1187,7 +1216,7 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo shareSessionEpoch = ShareRequestMetadata.nextEpoch(shareSessionEpoch) metadata = new ShareRequestMetadata(memberId, shareSessionEpoch) shareFetchRequest = createShareFetchRequest(groupId, metadata, send, Seq.empty, Map.empty) - shareFetchResponse = connectAndReceiveWithoutClosingSocket[ShareFetchResponse](shareFetchRequest) + shareFetchResponse = IntegrationTestUtils.sendAndReceive[ShareFetchResponse](shareFetchRequest, socket) shareFetchResponseData = shareFetchResponse.data() assertEquals(Errors.NONE.code, shareFetchResponseData.errorCode) @@ -1213,7 +1242,7 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo .setLastOffset(9) .setAcknowledgeTypes(Collections.singletonList(2.toByte))).asJava) // Release the records again shareAcknowledgeRequest = createShareAcknowledgeRequest(groupId, metadata, acknowledgementsMapForAcknowledge) - shareAcknowledgeResponse = connectAndReceiveWithoutClosingSocket[ShareAcknowledgeResponse](shareAcknowledgeRequest) + shareAcknowledgeResponse = IntegrationTestUtils.sendAndReceive[ShareAcknowledgeResponse](shareAcknowledgeRequest, socket) shareAcknowledgeResponseData = shareAcknowledgeResponse.data() assertEquals(Errors.NONE.code, shareAcknowledgeResponseData.errorCode) @@ -1235,7 +1264,7 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo shareSessionEpoch = ShareRequestMetadata.nextEpoch(shareSessionEpoch) metadata = new ShareRequestMetadata(memberId, shareSessionEpoch) shareFetchRequest = createShareFetchRequest(groupId, metadata, send, Seq.empty, Map.empty) - shareFetchResponse = connectAndReceiveWithoutClosingSocket[ShareFetchResponse](shareFetchRequest) + shareFetchResponse = IntegrationTestUtils.sendAndReceive[ShareFetchResponse](shareFetchRequest, socket) shareFetchResponseData = shareFetchResponse.data() assertEquals(Errors.NONE.code, shareFetchResponseData.errorCode) @@ -1276,7 +1305,6 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo def testShareFetchRequestSuccessfulSharingBetweenMultipleConsumers(): Unit = { val groupId: String = "group" - val memberId = Uuid.randomUuid() val memberId1 = Uuid.randomUuid() val memberId2 = Uuid.randomUuid() val memberId3 = Uuid.randomUuid() @@ -1291,8 +1319,12 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo val send: Seq[TopicIdPartition] = Seq(topicIdPartition) + val socket1: Socket = connectAny() + val socket2: Socket = connectAny() + val socket3: Socket = connectAny() + // Sending a dummy share fetch request to initialize the share partition - sendFirstShareFetchRequest(memberId, groupId, send) + sendFirstShareFetchRequest(memberId1, groupId, send, socket1) initProducer() // Producing 10000 records to the topic created above @@ -1312,9 +1344,9 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo val acknowledgementsMap3: Map[TopicIdPartition, util.List[ShareFetchRequestData.AcknowledgementBatch]] = Map.empty val shareFetchRequest3 = createShareFetchRequest(groupId, metadata3, send, Seq.empty, acknowledgementsMap3, minBytes = 100, maxBytes = 1500) - val shareFetchResponse1 = connectAndReceiveWithoutClosingSocket[ShareFetchResponse](shareFetchRequest1) - val shareFetchResponse2 = connectAndReceiveWithoutClosingSocket[ShareFetchResponse](shareFetchRequest2) - val shareFetchResponse3 = connectAndReceiveWithoutClosingSocket[ShareFetchResponse](shareFetchRequest3) + val shareFetchResponse1 = IntegrationTestUtils.sendAndReceive[ShareFetchResponse](shareFetchRequest1, socket1) + val shareFetchResponse2 = IntegrationTestUtils.sendAndReceive[ShareFetchResponse](shareFetchRequest2, socket2) + val shareFetchResponse3 = IntegrationTestUtils.sendAndReceive[ShareFetchResponse](shareFetchRequest3, socket3) val shareFetchResponseData1 = shareFetchResponse1.data() @@ -1384,10 +1416,14 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo val send: Seq[TopicIdPartition] = Seq(topicIdPartition) + val socket1: Socket = connectAny() + val socket2: Socket = connectAny() + val socket3: Socket = connectAny() + // Sending 3 dummy share Fetch Requests with to inititlaize the share partitions for each share group\ - sendFirstShareFetchRequest(memberId1, groupId1, send) - sendFirstShareFetchRequest(memberId2, groupId2, send) - sendFirstShareFetchRequest(memberId3, groupId3, send) + sendFirstShareFetchRequest(memberId1, groupId1, send, socket1) + sendFirstShareFetchRequest(memberId2, groupId2, send, socket2) + sendFirstShareFetchRequest(memberId3, groupId3, send, socket3) initProducer() // Producing 10 records to the topic created above @@ -1407,9 +1443,9 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo val acknowledgementsMap3: Map[TopicIdPartition, util.List[ShareFetchRequestData.AcknowledgementBatch]] = Map.empty val shareFetchRequest3 = createShareFetchRequest(groupId3, metadata3, send, Seq.empty, acknowledgementsMap3) - val shareFetchResponse1 = connectAndReceiveWithoutClosingSocket[ShareFetchResponse](shareFetchRequest1) - val shareFetchResponse2 = connectAndReceiveWithoutClosingSocket[ShareFetchResponse](shareFetchRequest2) - val shareFetchResponse3 = connectAndReceiveWithoutClosingSocket[ShareFetchResponse](shareFetchRequest3) + val shareFetchResponse1 = IntegrationTestUtils.sendAndReceive[ShareFetchResponse](shareFetchRequest1, socket1) + val shareFetchResponse2 = IntegrationTestUtils.sendAndReceive[ShareFetchResponse](shareFetchRequest2, socket2) + val shareFetchResponse3 = IntegrationTestUtils.sendAndReceive[ShareFetchResponse](shareFetchRequest3, socket3) val shareFetchResponseData1 = shareFetchResponse1.data() @@ -1475,8 +1511,10 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo val send: Seq[TopicIdPartition] = Seq(topicIdPartition) + val socket: Socket = connectAny() + // Send the first share fetch request to initialize the share partition - sendFirstShareFetchRequest(memberId, groupId, send) + sendFirstShareFetchRequest(memberId, groupId, send, socket) initProducer() // Producing 10 records to the topic created above @@ -1487,7 +1525,7 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo var metadata = new ShareRequestMetadata(memberId, shareSessionEpoch) var acknowledgementsMapForFetch: Map[TopicIdPartition, util.List[ShareFetchRequestData.AcknowledgementBatch]] = Map.empty var shareFetchRequest = createShareFetchRequest(groupId, metadata, send, Seq.empty, acknowledgementsMapForFetch) - var shareFetchResponse = connectAndReceiveWithoutClosingSocket[ShareFetchResponse](shareFetchRequest) + var shareFetchResponse = IntegrationTestUtils.sendAndReceive[ShareFetchResponse](shareFetchRequest, socket) var shareFetchResponseData = shareFetchResponse.data() assertEquals(Errors.NONE.code, shareFetchResponseData.errorCode) @@ -1516,7 +1554,7 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo .setLastOffset(9) .setAcknowledgeTypes(Collections.singletonList(1.toByte))).asJava) // Accept the records shareFetchRequest = createShareFetchRequest(groupId, metadata, send, Seq.empty, acknowledgementsMapForFetch) - shareFetchResponse = connectAndReceiveWithoutClosingSocket[ShareFetchResponse](shareFetchRequest) + shareFetchResponse = IntegrationTestUtils.sendAndReceive[ShareFetchResponse](shareFetchRequest, socket) shareFetchResponseData = shareFetchResponse.data() assertEquals(Errors.NONE.code, shareFetchResponseData.errorCode) @@ -1542,7 +1580,7 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo .setLastOffset(19) .setAcknowledgeTypes(Collections.singletonList(1.toByte))).asJava) // Accept the records shareFetchRequest = createShareFetchRequest(groupId, metadata, send, Seq.empty, Map.empty) - shareFetchResponse = connectAndReceiveWithoutClosingSocket[ShareFetchResponse](shareFetchRequest) + shareFetchResponse = IntegrationTestUtils.sendAndReceive[ShareFetchResponse](shareFetchRequest, socket) shareFetchResponseData = shareFetchResponse.data() assertEquals(Errors.NONE.code, shareFetchResponseData.errorCode) @@ -1583,8 +1621,10 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo val send: Seq[TopicIdPartition] = Seq(topicIdPartition) + val socket: Socket = connectAny() + // Send the first share fetch request to initialize the share partition - sendFirstShareFetchRequest(memberId, groupId, send) + sendFirstShareFetchRequest(memberId, groupId, send, socket) initProducer() // Producing 10 records to the topic created above @@ -1595,7 +1635,7 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo var metadata = new ShareRequestMetadata(memberId, shareSessionEpoch) var acknowledgementsMapForFetch: Map[TopicIdPartition, util.List[ShareFetchRequestData.AcknowledgementBatch]] = Map.empty var shareFetchRequest = createShareFetchRequest(groupId, metadata, send, Seq.empty, acknowledgementsMapForFetch) - var shareFetchResponse = connectAndReceiveWithoutClosingSocket[ShareFetchResponse](shareFetchRequest) + var shareFetchResponse = IntegrationTestUtils.sendAndReceive[ShareFetchResponse](shareFetchRequest, socket) var shareFetchResponseData = shareFetchResponse.data() assertEquals(Errors.NONE.code, shareFetchResponseData.errorCode) @@ -1624,7 +1664,7 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo .setLastOffset(9) .setAcknowledgeTypes(Collections.singletonList(1.toByte))).asJava) // Accept the records shareFetchRequest = createShareFetchRequest(groupId, metadata, send, Seq.empty, acknowledgementsMapForFetch) - shareFetchResponse = connectAndReceiveWithoutClosingSocket[ShareFetchResponse](shareFetchRequest) + shareFetchResponse = IntegrationTestUtils.sendAndReceive[ShareFetchResponse](shareFetchRequest, socket) shareFetchResponseData = shareFetchResponse.data() assertEquals(Errors.NONE.code, shareFetchResponseData.errorCode) @@ -1651,7 +1691,7 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo .setLastOffset(19) .setAcknowledgeTypes(Collections.singletonList(1.toByte))).asJava) // Accept the records val shareAcknowledgeRequest = createShareAcknowledgeRequest(groupId, metadata, acknowledgementsMapForAcknowledge) - val shareAcknowledgeResponse = connectAndReceiveWithoutClosingSocket[ShareAcknowledgeResponse](shareAcknowledgeRequest) + val shareAcknowledgeResponse = IntegrationTestUtils.sendAndReceive[ShareAcknowledgeResponse](shareAcknowledgeRequest, socket) val shareAcknowledgeResponseData = shareAcknowledgeResponse.data() assertEquals(Errors.NONE.code, shareAcknowledgeResponseData.errorCode) @@ -1704,6 +1744,8 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo val send: Seq[TopicIdPartition] = Seq(topicIdPartition) + val socket: Socket = connectAny() + val metadata: ShareRequestMetadata = new ShareRequestMetadata(memberId, ShareRequestMetadata.INITIAL_EPOCH) val acknowledgementsMap: Map[TopicIdPartition, util.List[ShareFetchRequestData.AcknowledgementBatch]] = Map(topicIdPartition -> List(new ShareFetchRequestData.AcknowledgementBatch() @@ -1711,7 +1753,7 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo .setLastOffset(9) .setAcknowledgeTypes(Collections.singletonList(1.toByte))).asJava) // Acknowledgements in the Initial Fetch Request val shareFetchRequest = createShareFetchRequest(groupId, metadata, send, Seq.empty, acknowledgementsMap) - val shareFetchResponse = connectAndReceiveWithoutClosingSocket[ShareFetchResponse](shareFetchRequest) + val shareFetchResponse = IntegrationTestUtils.sendAndReceive[ShareFetchResponse](shareFetchRequest, socket) val shareFetchResponseData = shareFetchResponse.data() // The response will have a top level error code because this is an Initial Fetch request with acknowledgement data present @@ -1750,6 +1792,8 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo val topicId = topicIds.get(topic) val topicIdPartition = new TopicIdPartition(topicId, new TopicPartition(topic, partition)) + val socket: Socket = connectAny() + // Send the share fetch request to fetch the records produced above val metadata: ShareRequestMetadata = new ShareRequestMetadata(memberId, ShareRequestMetadata.INITIAL_EPOCH) val acknowledgementsMap: Map[TopicIdPartition, util.List[ShareAcknowledgeRequestData.AcknowledgementBatch]] = @@ -1759,7 +1803,7 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo .setLastOffset(9) setAcknowledgeTypes(Collections.singletonList(1.toByte))).asJava) val shareAcknowledgeRequest = createShareAcknowledgeRequest(groupId, metadata, acknowledgementsMap) - val shareAcknowledgeResponse = connectAndReceiveWithoutClosingSocket[ShareAcknowledgeResponse](shareAcknowledgeRequest) + val shareAcknowledgeResponse = IntegrationTestUtils.sendAndReceive[ShareAcknowledgeResponse](shareAcknowledgeRequest, socket) val shareAcknowledgeResponseData = shareAcknowledgeResponse.data() assertEquals(Errors.INVALID_SHARE_SESSION_EPOCH.code, shareAcknowledgeResponseData.errorCode) @@ -1798,8 +1842,10 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo val send: Seq[TopicIdPartition] = Seq(topicIdPartition) + val socket: Socket = connectAny() + // Send the first share fetch request to initialize the share partition - sendFirstShareFetchRequest(memberId, groupId, send) + sendFirstShareFetchRequest(memberId, groupId, send, socket) initProducer() // Producing 10 records to the topic created above @@ -1809,7 +1855,7 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo var shareSessionEpoch = ShareRequestMetadata.nextEpoch(ShareRequestMetadata.INITIAL_EPOCH) var metadata = new ShareRequestMetadata(memberId, shareSessionEpoch) var shareFetchRequest = createShareFetchRequest(groupId, metadata, send, Seq.empty, Map.empty) - var shareFetchResponse = connectAndReceiveWithoutClosingSocket[ShareFetchResponse](shareFetchRequest) + var shareFetchResponse = IntegrationTestUtils.sendAndReceive[ShareFetchResponse](shareFetchRequest, socket) var shareFetchResponseData = shareFetchResponse.data() assertEquals(Errors.NONE.code, shareFetchResponseData.errorCode) @@ -1831,7 +1877,7 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo shareSessionEpoch = ShareRequestMetadata.nextEpoch(ShareRequestMetadata.nextEpoch(shareSessionEpoch)) metadata = new ShareRequestMetadata(memberId, shareSessionEpoch) shareFetchRequest = createShareFetchRequest(groupId, metadata, send, Seq.empty, Map.empty) - shareFetchResponse = connectAndReceiveWithoutClosingSocket[ShareFetchResponse](shareFetchRequest) + shareFetchResponse = IntegrationTestUtils.sendAndReceive[ShareFetchResponse](shareFetchRequest, socket) shareFetchResponseData = shareFetchResponse.data() assertEquals(Errors.INVALID_SHARE_SESSION_EPOCH.code, shareFetchResponseData.errorCode) @@ -1870,8 +1916,10 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo val send: Seq[TopicIdPartition] = Seq(topicIdPartition) + val socket: Socket = connectAny() + // Send the first share fetch request to initialize the share partition - sendFirstShareFetchRequest(memberId, groupId, send) + sendFirstShareFetchRequest(memberId, groupId, send, socket) initProducer() // Producing 10 records to the topic created above @@ -1881,7 +1929,7 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo var shareSessionEpoch = ShareRequestMetadata.nextEpoch(ShareRequestMetadata.INITIAL_EPOCH) var metadata = new ShareRequestMetadata(memberId, shareSessionEpoch) val shareFetchRequest = createShareFetchRequest(groupId, metadata, send, Seq.empty, Map.empty) - val shareFetchResponse = connectAndReceiveWithoutClosingSocket[ShareFetchResponse](shareFetchRequest) + val shareFetchResponse = IntegrationTestUtils.sendAndReceive[ShareFetchResponse](shareFetchRequest, socket) val shareFetchResponseData = shareFetchResponse.data() assertEquals(Errors.NONE.code, shareFetchResponseData.errorCode) @@ -1908,7 +1956,7 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo .setLastOffset(9) .setAcknowledgeTypes(Collections.singletonList(1.toByte))).asJava) val shareAcknowledgeRequest = createShareAcknowledgeRequest(groupId, metadata, acknowledgementsMap) - val shareAcknowledgeResponse = connectAndReceiveWithoutClosingSocket[ShareAcknowledgeResponse](shareAcknowledgeRequest) + val shareAcknowledgeResponse = IntegrationTestUtils.sendAndReceive[ShareAcknowledgeResponse](shareAcknowledgeRequest, socket) val shareAcknowledgeResponseData = shareAcknowledgeResponse.data() assertEquals(Errors.INVALID_SHARE_SESSION_EPOCH.code, shareAcknowledgeResponseData.errorCode) @@ -1948,8 +1996,10 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo val send: Seq[TopicIdPartition] = Seq(topicIdPartition) + val socket: Socket = connectAny() + // Send the first share fetch request to initialize the share partition - sendFirstShareFetchRequest(memberId, groupId, send) + sendFirstShareFetchRequest(memberId, groupId, send, socket) initProducer() // Producing 10 records to the topic created above @@ -1959,7 +2009,7 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo var shareSessionEpoch = ShareRequestMetadata.nextEpoch(ShareRequestMetadata.INITIAL_EPOCH) var metadata = new ShareRequestMetadata(memberId, shareSessionEpoch) var shareFetchRequest = createShareFetchRequest(groupId, metadata, send, Seq.empty, Map.empty) - var shareFetchResponse = connectAndReceiveWithoutClosingSocket[ShareFetchResponse](shareFetchRequest) + var shareFetchResponse = IntegrationTestUtils.sendAndReceive[ShareFetchResponse](shareFetchRequest, socket) var shareFetchResponseData = shareFetchResponse.data() assertEquals(Errors.NONE.code, shareFetchResponseData.errorCode) @@ -1981,7 +2031,7 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo shareSessionEpoch = ShareRequestMetadata.nextEpoch(shareSessionEpoch) metadata = new ShareRequestMetadata(wrongMemberId, shareSessionEpoch) shareFetchRequest = createShareFetchRequest(groupId, metadata, send, Seq.empty, Map.empty) - shareFetchResponse = connectAndReceiveWithoutClosingSocket[ShareFetchResponse](shareFetchRequest) + shareFetchResponse = IntegrationTestUtils.sendAndReceive[ShareFetchResponse](shareFetchRequest, socket) shareFetchResponseData = shareFetchResponse.data() assertEquals(Errors.SHARE_SESSION_NOT_FOUND.code, shareFetchResponseData.errorCode) @@ -2026,11 +2076,15 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo val send: Seq[TopicIdPartition] = Seq(topicIdPartition) + val socket1: Socket = connectAny() + val socket2: Socket = connectAny() + val socket3: Socket = connectAny() + // member1 sends share fetch request to register it's share session. Note it does not close the socket connection after. TestUtils.waitUntilTrue(() => { val metadata = new ShareRequestMetadata(memberId1, ShareRequestMetadata.INITIAL_EPOCH) val shareFetchRequest = createShareFetchRequest(groupId, metadata, send, Seq.empty, Map.empty) - val shareFetchResponse = connectAndReceiveWithoutClosingSocket[ShareFetchResponse](shareFetchRequest) + val shareFetchResponse = IntegrationTestUtils.sendAndReceive[ShareFetchResponse](shareFetchRequest, socket1) val shareFetchResponseData = shareFetchResponse.data() shareFetchResponseData.errorCode == Errors.NONE.code }, "Share fetch request failed", 5000) @@ -2039,7 +2093,7 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo TestUtils.waitUntilTrue(() => { val metadata = new ShareRequestMetadata(memberId2, ShareRequestMetadata.INITIAL_EPOCH) val shareFetchRequest = createShareFetchRequest(groupId, metadata, send, Seq.empty, Map.empty) - val shareFetchResponse = connectAndReceiveWithoutClosingSocket[ShareFetchResponse](shareFetchRequest) + val shareFetchResponse = IntegrationTestUtils.sendAndReceive[ShareFetchResponse](shareFetchRequest, socket2) val shareFetchResponseData = shareFetchResponse.data() shareFetchResponseData.errorCode == Errors.NONE.code }, "Share fetch request failed", 5000) @@ -2050,20 +2104,22 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo TestUtils.waitUntilTrue(() => { val metadata = new ShareRequestMetadata(memberId3, ShareRequestMetadata.INITIAL_EPOCH) val shareFetchRequest = createShareFetchRequest(groupId, metadata, send, Seq.empty, Map.empty) - val shareFetchResponse = connectAndReceiveWithoutClosingSocket[ShareFetchResponse](shareFetchRequest) + val shareFetchResponse = IntegrationTestUtils.sendAndReceive[ShareFetchResponse](shareFetchRequest, socket3) val shareFetchResponseData = shareFetchResponse.data() shareFetchResponseData.errorCode == Errors.SHARE_SESSION_NOT_FOUND.code }, "Share fetch request failed", 5000) - // Now we will close the socket connections for the above three members, mimicking a client disconnection + // Now we will close the socket connections for the members, mimicking a client disconnection closeSockets() - // Since the socket connections were closed before, the corresponding share sessions were dropped from the ShareSessionCache - // on the broker. Now, since the cache is empty, new share sessions can be registered + val socket4: Socket = connectAny() + + // Since one of the socket connections was closed before, the corresponding share session was dropped from the ShareSessionCache + // on the broker. Now, since the cache is not full, new share sessions can be registered TestUtils.waitUntilTrue(() => { val metadata = new ShareRequestMetadata(memberId3, ShareRequestMetadata.INITIAL_EPOCH) val shareFetchRequest = createShareFetchRequest(groupId, metadata, send, Seq.empty, Map.empty) - val shareFetchResponse = connectAndReceiveWithoutClosingSocket[ShareFetchResponse](shareFetchRequest) + val shareFetchResponse = IntegrationTestUtils.sendAndReceive[ShareFetchResponse](shareFetchRequest, socket4) val shareFetchResponseData = shareFetchResponse.data() shareFetchResponseData.errorCode == Errors.NONE.code }, "Share fetch request failed", 5000) @@ -2103,8 +2159,10 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo val send: Seq[TopicIdPartition] = Seq(topicIdPartition) + val socket: Socket = connectAny() + // Send the first share fetch request to initialize the share partition - sendFirstShareFetchRequest(memberId, groupId, send) + sendFirstShareFetchRequest(memberId, groupId, send, socket) initProducer() // Producing 10 records to the topic created above @@ -2114,7 +2172,7 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo var shareSessionEpoch = ShareRequestMetadata.nextEpoch(ShareRequestMetadata.INITIAL_EPOCH) var metadata = new ShareRequestMetadata(memberId, shareSessionEpoch) val shareFetchRequest = createShareFetchRequest(groupId, metadata, send, Seq.empty, Map.empty) - val shareFetchResponse = connectAndReceiveWithoutClosingSocket[ShareFetchResponse](shareFetchRequest) + val shareFetchResponse = IntegrationTestUtils.sendAndReceive[ShareFetchResponse](shareFetchRequest, socket) val shareFetchResponseData = shareFetchResponse.data() assertEquals(Errors.NONE.code, shareFetchResponseData.errorCode) @@ -2141,7 +2199,7 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo .setLastOffset(9) .setAcknowledgeTypes(Collections.singletonList(1.toByte))).asJava) val shareAcknowledgeRequest = createShareAcknowledgeRequest(groupId, metadata, acknowledgementsMap) - val shareAcknowledgeResponse = connectAndReceiveWithoutClosingSocket[ShareAcknowledgeResponse](shareAcknowledgeRequest) + val shareAcknowledgeResponse = IntegrationTestUtils.sendAndReceive[ShareAcknowledgeResponse](shareAcknowledgeRequest, socket) val shareAcknowledgeResponseData = shareAcknowledgeResponse.data() assertEquals(Errors.SHARE_SESSION_NOT_FOUND.code, shareAcknowledgeResponseData.errorCode) @@ -2182,8 +2240,10 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo val send: Seq[TopicIdPartition] = Seq(topicIdPartition1, topicIdPartition2) + val socket: Socket = connectAny() + // Send the first share fetch request to initialize the share partition - sendFirstShareFetchRequest(memberId, groupId, send) + sendFirstShareFetchRequest(memberId, groupId, send, socket) initProducer() // Producing 10 records to the topic partitions created above @@ -2200,7 +2260,7 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo // as the share partitions might not be initialized yet. So, we retry until we get the response. var responses = Seq[ShareFetchResponseData.PartitionData]() TestUtils.waitUntilTrue(() => { - val shareFetchResponse = connectAndReceiveWithoutClosingSocket[ShareFetchResponse](shareFetchRequest) + val shareFetchResponse = IntegrationTestUtils.sendAndReceive[ShareFetchResponse](shareFetchRequest, socket) val shareFetchResponseData = shareFetchResponse.data() assertEquals(Errors.NONE.code, shareFetchResponseData.errorCode) assertEquals(30000, shareFetchResponseData.acquisitionLockTimeoutMs) @@ -2226,7 +2286,7 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo metadata = new ShareRequestMetadata(memberId, shareSessionEpoch) val forget: Seq[TopicIdPartition] = Seq(topicIdPartition1) shareFetchRequest = createShareFetchRequest(groupId, metadata, Seq.empty, forget, acknowledgementsMap) - val shareFetchResponse = connectAndReceiveWithoutClosingSocket[ShareFetchResponse](shareFetchRequest) + val shareFetchResponse = IntegrationTestUtils.sendAndReceive[ShareFetchResponse](shareFetchRequest, socket) val shareFetchResponseData = shareFetchResponse.data() assertEquals(Errors.NONE.code, shareFetchResponseData.errorCode) @@ -2277,8 +2337,10 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo val send: Seq[TopicIdPartition] = Seq(topicIdPartition) + val socket: Socket = connectAny() + // Send the first share fetch request to initialize the share partition - sendFirstShareFetchRequest(memberId, groupId, send) + sendFirstShareFetchRequest(memberId, groupId, send, socket) initProducer() // Producing 10 records to the topic created above @@ -2288,7 +2350,7 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo val metadata = new ShareRequestMetadata(memberId, ShareRequestMetadata.nextEpoch(ShareRequestMetadata.INITIAL_EPOCH)) val acknowledgementsMap: Map[TopicIdPartition, util.List[ShareFetchRequestData.AcknowledgementBatch]] = Map.empty val shareFetchRequest = createShareFetchRequest(groupId, metadata, send, Seq.empty, acknowledgementsMap, maxRecords = 1, batchSize = 1) - val shareFetchResponse = connectAndReceive[ShareFetchResponse](shareFetchRequest) + val shareFetchResponse = IntegrationTestUtils.sendAndReceive[ShareFetchResponse](shareFetchRequest, socket) val shareFetchResponseData = shareFetchResponse.data assertEquals(Errors.NONE.code, shareFetchResponseData.errorCode) @@ -2339,8 +2401,10 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo val send: Seq[TopicIdPartition] = Seq(topicIdPartition) + val socket: Socket = connectAny() + // Send the first share fetch request to initialize the share partition - sendFirstShareFetchRequest(memberId, groupId, send) + sendFirstShareFetchRequest(memberId, groupId, send, socket) initProducer() // Producing 10 records to the topic created above @@ -2350,7 +2414,7 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo val metadata = new ShareRequestMetadata(memberId, ShareRequestMetadata.nextEpoch(ShareRequestMetadata.INITIAL_EPOCH)) val acknowledgementsMap: Map[TopicIdPartition, util.List[ShareFetchRequestData.AcknowledgementBatch]] = Map.empty val shareFetchRequest = createShareFetchRequest(groupId, metadata, send, Seq.empty, acknowledgementsMap, maxRecords = 5, batchSize = 1) - val shareFetchResponse = connectAndReceive[ShareFetchResponse](shareFetchRequest) + val shareFetchResponse = IntegrationTestUtils.sendAndReceive[ShareFetchResponse](shareFetchRequest, socket) val shareFetchResponseData = shareFetchResponse.data assertEquals(Errors.NONE.code, shareFetchResponseData.errorCode) @@ -2371,12 +2435,12 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo // For initial fetch request, the response may not be available in the first attempt when the share // partition is not initialized yet. Hence, wait for response from all partitions before proceeding. - private def sendFirstShareFetchRequest(memberId: Uuid, groupId: String, topicIdPartitions: Seq[TopicIdPartition], lockTimeout: Int = 30000): Unit = { + private def sendFirstShareFetchRequest(memberId: Uuid, groupId: String, topicIdPartitions: Seq[TopicIdPartition], socket: Socket, lockTimeout: Int = 30000): Unit = { val partitions: util.Set[Integer] = new util.HashSet() TestUtils.waitUntilTrue(() => { val metadata = new ShareRequestMetadata(memberId, ShareRequestMetadata.INITIAL_EPOCH) val shareFetchRequest = createShareFetchRequest(groupId, metadata, topicIdPartitions, Seq.empty, Map.empty) - val shareFetchResponse = connectAndReceiveWithoutClosingSocket[ShareFetchResponse](shareFetchRequest) + val shareFetchResponse = IntegrationTestUtils.sendAndReceive[ShareFetchResponse](shareFetchRequest, socket) val shareFetchResponseData = shareFetchResponse.data() assertEquals(Errors.NONE.code, shareFetchResponseData.errorCode)