diff --git a/sdk/webpubsub/azure-messaging-webpubsub-client/CHANGELOG.md b/sdk/webpubsub/azure-messaging-webpubsub-client/CHANGELOG.md index 0e9980263a6f..bb9062b640e6 100644 --- a/sdk/webpubsub/azure-messaging-webpubsub-client/CHANGELOG.md +++ b/sdk/webpubsub/azure-messaging-webpubsub-client/CHANGELOG.md @@ -8,6 +8,8 @@ ### Bugs Fixed +- Fixed a race condition where Web PubSub client send operations could miss fast ack responses. + ### Other Changes ## 1.1.7 (2026-01-29) diff --git a/sdk/webpubsub/azure-messaging-webpubsub-client/src/main/java/com/azure/messaging/webpubsub/client/WebPubSubAsyncClient.java b/sdk/webpubsub/azure-messaging-webpubsub-client/src/main/java/com/azure/messaging/webpubsub/client/WebPubSubAsyncClient.java index ec5cc290bf88..cef63321654a 100644 --- a/sdk/webpubsub/azure-messaging-webpubsub-client/src/main/java/com/azure/messaging/webpubsub/client/WebPubSubAsyncClient.java +++ b/sdk/webpubsub/azure-messaging-webpubsub-client/src/main/java/com/azure/messaging/webpubsub/client/WebPubSubAsyncClient.java @@ -315,7 +315,7 @@ public Mono joinGroup(String group, Long ackId) { if (ackId == null) { ackId = nextAckId(); } - return sendMessage(new JoinGroupMessage().setGroup(group).setAckId(ackId)).then(waitForAckMessage(ackId)) + return sendMessageAndWaitForAck(new JoinGroupMessage().setGroup(group).setAckId(ackId), ackId) .retryWhen(sendMessageRetrySpec) .map(result -> { groups.compute(group, @@ -346,7 +346,7 @@ public Mono leaveGroup(String group, Long ackId) { if (ackId == null) { ackId = nextAckId(); } - return sendMessage(new LeaveGroupMessage().setGroup(group).setAckId(ackId)).then(waitForAckMessage(ackId)) + return sendMessageAndWaitForAck(new LeaveGroupMessage().setGroup(group).setAckId(ackId), ackId) .retryWhen(sendMessageRetrySpec) .map(result -> { groups.compute(group, @@ -414,8 +414,7 @@ public Mono sendToGroup(String group, BinaryData content, WebPu .setAckId(ackId) .setNoEcho(options.isEchoDisabled()); - Mono sendMessageMono = sendMessage(message); - Mono responseMono = sendMessageMono.then(waitForAckMessage(ackId)); + Mono responseMono = sendMessageAndWaitForAck(message, ackId); return responseMono.retryWhen(sendMessageRetrySpec); } @@ -454,8 +453,7 @@ public Mono sendEvent(String eventName, BinaryData content, Web .setDataType(dataFormat.toString()) .setAckId(ackId); - Mono sendMessageMono = sendMessage(message); - Mono responseMono = sendMessageMono.then(waitForAckMessage(ackId)); + Mono responseMono = sendMessageAndWaitForAck(message, ackId); return responseMono.retryWhen(sendMessageRetrySpec); } @@ -514,13 +512,7 @@ public Flux receiveRejoinGroupFailedEvents() { } private long nextAckId() { - return ackId.getAndUpdate(value -> { - // keep positive - if (++value < 0) { - value = 0; - } - return value; - }); + return ackId.updateAndGet(value -> value == Long.MAX_VALUE ? 1 : Math.max(0, value) + 1); } private Flux receiveAckMessages() { @@ -540,6 +532,11 @@ private Mono sendMessage(WebPubSubMessage message) { })); } + private Mono sendMessageAndWaitForAck(WebPubSubMessage message, Long ackId) { + return Mono.defer(() -> Mono.zip(waitForAckMessage(ackId), sendMessage(message).thenReturn(true)) + .map(tuple -> tuple.getT1())); + } + private Mono checkStateBeforeSend() { return Mono.defer(() -> { WebPubSubClientState state = clientState.get(); diff --git a/sdk/webpubsub/azure-messaging-webpubsub-client/src/main/java/com/azure/messaging/webpubsub/client/implementation/websocket/WebSocketClientHandler.java b/sdk/webpubsub/azure-messaging-webpubsub-client/src/main/java/com/azure/messaging/webpubsub/client/implementation/websocket/WebSocketClientHandler.java index eb167c731ec2..bdbc706d1b6d 100644 --- a/sdk/webpubsub/azure-messaging-webpubsub-client/src/main/java/com/azure/messaging/webpubsub/client/implementation/websocket/WebSocketClientHandler.java +++ b/sdk/webpubsub/azure-messaging-webpubsub-client/src/main/java/com/azure/messaging/webpubsub/client/implementation/websocket/WebSocketClientHandler.java @@ -163,13 +163,25 @@ private boolean processMessage(ChannelHandlerContext context, WebSocketFrame web } private void publishBuffer() { - final ByteBuffer[] nioBuffers = compositeByteBuf.nioBuffers(); - - if (nioBuffers.length == 0) { + if (compositeByteBuf == null) { return; } + if (compositeByteBuf.refCnt() == 0) { + compositeByteBuf = null; + return; + } try { + if (compositeByteBuf.readableBytes() == 0) { + return; + } + + final ByteBuffer[] nioBuffers = compositeByteBuf.nioBuffers(); + + if (nioBuffers.length == 0) { + return; + } + final BinaryData data = BinaryData.fromListByteBuffer(Arrays.asList(nioBuffers)); final String collected = data.toString(); final WebPubSubMessage deserialized = messageDecoder.decode(collected); @@ -177,6 +189,7 @@ private void publishBuffer() { messageHandler.accept(deserialized); } finally { release(compositeByteBuf); + compositeByteBuf = null; } } @@ -198,9 +211,8 @@ CloseWebSocketFrame getServerCloseWebSocketFrame() { } private static void release(CompositeByteBuf buffer) { - if (buffer.refCnt() > 0) { + if (buffer != null && buffer.refCnt() > 0) { buffer.release(); - buffer.clear(); } } } diff --git a/sdk/webpubsub/azure-messaging-webpubsub-client/src/test/java/com/azure/messaging/webpubsub/client/ClientTests.java b/sdk/webpubsub/azure-messaging-webpubsub-client/src/test/java/com/azure/messaging/webpubsub/client/ClientTests.java index d4b5e38fcc07..b78f4c09a911 100644 --- a/sdk/webpubsub/azure-messaging-webpubsub-client/src/test/java/com/azure/messaging/webpubsub/client/ClientTests.java +++ b/sdk/webpubsub/azure-messaging-webpubsub-client/src/test/java/com/azure/messaging/webpubsub/client/ClientTests.java @@ -93,9 +93,10 @@ public void testTwoClients() throws InterruptedException { @Test @LiveOnly - public void testClientCloseable() { + public void testClientCloseable() throws InterruptedException { CountDownLatch connectedLatch = new CountDownLatch(1); CountDownLatch stoppedLatch = new CountDownLatch(1); + CountDownLatch disconnectedLatch = new CountDownLatch(1); AtomicBoolean stoppedEventReceived = new AtomicBoolean(false); AtomicBoolean disconnectedEventReceived = new AtomicBoolean(false); @@ -105,16 +106,20 @@ public void testClientCloseable() { stoppedLatch.countDown(); }); client.addOnConnectedEventHandler(connectedEvent -> connectedLatch.countDown()); - client.addOnDisconnectedEventHandler(disconnectedEvent -> disconnectedEventReceived.set(true)); + client.addOnDisconnectedEventHandler(disconnectedEvent -> { + disconnectedEventReceived.set(true); + disconnectedLatch.countDown(); + }); client.start(); - connectedLatch.countDown(); + Assertions.assertTrue(connectedLatch.await(10, TimeUnit.SECONDS)); // stop not called explicitly } - stoppedLatch.countDown(); + Assertions.assertTrue(stoppedLatch.await(10, TimeUnit.SECONDS)); + Assertions.assertTrue(disconnectedLatch.await(10, TimeUnit.SECONDS)); // verify client stopped via Closeable Assertions.assertTrue(stoppedEventReceived.get()); diff --git a/sdk/webpubsub/azure-messaging-webpubsub-client/src/test/java/com/azure/messaging/webpubsub/client/MockClientTests.java b/sdk/webpubsub/azure-messaging-webpubsub-client/src/test/java/com/azure/messaging/webpubsub/client/MockClientTests.java index eac5d5c101ac..c3b3dfd75cc4 100644 --- a/sdk/webpubsub/azure-messaging-webpubsub-client/src/test/java/com/azure/messaging/webpubsub/client/MockClientTests.java +++ b/sdk/webpubsub/azure-messaging-webpubsub-client/src/test/java/com/azure/messaging/webpubsub/client/MockClientTests.java @@ -4,13 +4,16 @@ package com.azure.messaging.webpubsub.client; import com.azure.messaging.webpubsub.client.implementation.WebPubSubClientState; +import com.azure.messaging.webpubsub.client.implementation.models.AckMessage; import com.azure.messaging.webpubsub.client.implementation.models.ConnectedMessage; import com.azure.messaging.webpubsub.client.implementation.models.WebPubSubMessage; +import com.azure.messaging.webpubsub.client.implementation.models.WebPubSubMessageAck; import com.azure.messaging.webpubsub.client.implementation.websocket.SendResult; import com.azure.messaging.webpubsub.client.implementation.websocket.WebSocketClient; import com.azure.messaging.webpubsub.client.implementation.websocket.WebSocketSession; import com.azure.messaging.webpubsub.client.models.ConnectFailedException; import com.azure.messaging.webpubsub.client.models.ConnectedEvent; +import com.azure.messaging.webpubsub.client.models.WebPubSubResult; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; import reactor.core.publisher.Mono; @@ -18,9 +21,11 @@ import java.time.Duration; import java.util.ArrayList; +import java.util.Arrays; import java.util.List; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; import java.util.function.Consumer; public class MockClientTests { @@ -71,6 +76,32 @@ public void testConnect() throws InterruptedException { Assertions.assertEquals(1, events.size()); } + @Test + public void testGeneratedAckIdStartsAtOne() { + List ackIds = new ArrayList<>(); + AtomicReference> messageHandlerReference = new AtomicReference<>(); + + WebSocketClient mockWsClient = (cec, path, loggerReference, messageHandler, openHandler, closeHandler) -> { + messageHandlerReference.set(messageHandler); + WebSocketSession mockWsSession = new MockWebSocketSession(true, messageHandlerReference, ackIds); + openHandler.accept(mockWsSession); + messageHandler.accept(new ConnectedMessage("mock_connection_id")); + return mockWsSession; + }; + + WebPubSubClientBuilder builder = new WebPubSubClientBuilder(); + builder.webSocketClient = mockWsClient; + WebPubSubClient client = builder.clientAccessUrl("mock").buildClient(); + + client.start(); + WebPubSubResult joinResult = client.joinGroup("group"); + WebPubSubResult sendResult = client.sendToGroup("group", "message"); + + Assertions.assertEquals(1L, joinResult.getAckId()); + Assertions.assertEquals(2L, sendResult.getAckId()); + Assertions.assertIterableEquals(Arrays.asList(1L, 2L), ackIds); + } + private static void sendConnectedEvent(Consumer messageHandler) { Mono.delay(SMALL_DELAY) .then(Mono.fromRunnable(() -> messageHandler.accept(new ConnectedMessage("mock_connection_id"))) @@ -79,14 +110,34 @@ private static void sendConnectedEvent(Consumer messageHandler } private static final class MockWebSocketSession implements WebSocketSession { + private final boolean open; + private final AtomicReference> messageHandlerReference; + private final List ackIds; + + private MockWebSocketSession() { + this(false, null, null); + } + + private MockWebSocketSession(boolean open, AtomicReference> messageHandlerReference, + List ackIds) { + this.open = open; + this.messageHandlerReference = messageHandlerReference; + this.ackIds = ackIds; + } + @Override public boolean isOpen() { - return false; + return open; } @Override public void sendObjectAsync(Object data, Consumer handler) { - + if (data instanceof WebPubSubMessageAck) { + long ackId = ((WebPubSubMessageAck) data).getAckId(); + ackIds.add(ackId); + messageHandlerReference.get().accept(new AckMessage().setAckId(ackId).setSuccess(true)); + } + handler.accept(new SendResult()); } @Override diff --git a/sdk/webpubsub/azure-messaging-webpubsub-client/src/test/java/com/azure/messaging/webpubsub/client/TestBase.java b/sdk/webpubsub/azure-messaging-webpubsub-client/src/test/java/com/azure/messaging/webpubsub/client/TestBase.java index ac86a5b9aa5c..79296d70d651 100644 --- a/sdk/webpubsub/azure-messaging-webpubsub-client/src/test/java/com/azure/messaging/webpubsub/client/TestBase.java +++ b/sdk/webpubsub/azure-messaging-webpubsub-client/src/test/java/com/azure/messaging/webpubsub/client/TestBase.java @@ -21,16 +21,31 @@ */ public class TestBase extends TestProxyTestBase { + private static volatile WebPubSubServiceClient serviceClient; + + private static WebPubSubServiceClient getServiceClient() { + if (serviceClient == null) { + synchronized (TestBase.class) { + if (serviceClient == null) { + Configuration configuration = Configuration.getGlobalConfiguration(); + + serviceClient + = new WebPubSubServiceClientBuilder().endpoint(configuration.get("WEB_PUB_SUB_ENDPOINT")) + .credential(TestUtils.getIdentityTestCredential(TestMode.LIVE)) + .hub("hub1") + .buildClient(); + } + } + } + return serviceClient; + } + protected static WebPubSubClientBuilder getClientBuilder() { return getClientBuilder("user1"); } protected static WebPubSubClientBuilder getClientBuilder(String userId) { - WebPubSubServiceClient client = new WebPubSubServiceClientBuilder() - .endpoint(Configuration.getGlobalConfiguration().get("WEB_PUB_SUB_ENDPOINT")) - .credential(TestUtils.getIdentityTestCredential(TestMode.LIVE)) - .hub("hub1") - .buildClient(); + WebPubSubServiceClient client = getServiceClient(); // client builder return new WebPubSubClientBuilder().credential(new WebPubSubClientCredential( diff --git a/sdk/webpubsub/azure-messaging-webpubsub-client/src/test/java/com/azure/messaging/webpubsub/client/TestUtils.java b/sdk/webpubsub/azure-messaging-webpubsub-client/src/test/java/com/azure/messaging/webpubsub/client/TestUtils.java index 2ab8e4c663f7..dd97cd992018 100644 --- a/sdk/webpubsub/azure-messaging-webpubsub-client/src/test/java/com/azure/messaging/webpubsub/client/TestUtils.java +++ b/sdk/webpubsub/azure-messaging-webpubsub-client/src/test/java/com/azure/messaging/webpubsub/client/TestUtils.java @@ -35,9 +35,7 @@ private static TokenCredential getIdentityTestCredentialHelper() { Configuration config = Configuration.getGlobalConfiguration(); ChainedTokenCredentialBuilder builder - = new ChainedTokenCredentialBuilder().addLast(new EnvironmentCredentialBuilder().build()) - .addLast(new AzureCliCredentialBuilder().build()) - .addLast(new AzureDeveloperCliCredentialBuilder().build()); + = new ChainedTokenCredentialBuilder().addLast(new EnvironmentCredentialBuilder().build()); String serviceConnectionId = config.get("AZURESUBSCRIPTION_SERVICE_CONNECTION_ID"); String clientId = config.get("AZURESUBSCRIPTION_CLIENT_ID"); @@ -59,7 +57,9 @@ private static TokenCredential getIdentityTestCredentialHelper() { builder.addLast(trc -> azurePipelinesCredential.getToken(trc).subscribeOn(Schedulers.boundedElastic())); } - builder.addLast(new AzurePowerShellCredentialBuilder().build()); + builder.addLast(new AzurePowerShellCredentialBuilder().build()) + .addLast(new AzureCliCredentialBuilder().build()) + .addLast(new AzureDeveloperCliCredentialBuilder().build()); return builder.build(); } diff --git a/sdk/webpubsub/azure-messaging-webpubsub/src/test/java/com/azure/messaging/webpubsub/TestUtils.java b/sdk/webpubsub/azure-messaging-webpubsub/src/test/java/com/azure/messaging/webpubsub/TestUtils.java index 07c9dc896c4d..debc6bd78ec2 100644 --- a/sdk/webpubsub/azure-messaging-webpubsub/src/test/java/com/azure/messaging/webpubsub/TestUtils.java +++ b/sdk/webpubsub/azure-messaging-webpubsub/src/test/java/com/azure/messaging/webpubsub/TestUtils.java @@ -31,12 +31,12 @@ final class TestUtils { static String getSocketIOEndpoint() { return Configuration.getGlobalConfiguration() - .get("WEB_PUB_SUB_SOCKETIO_ENDPOINT", "http://testsocketioendpoint.webpubsubdev.azure.com"); + .get("WEB_PUB_SUB_SOCKETIO_ENDPOINT", "https://testsocketioendpoint.webpubsubdev.azure.com"); } static String getEndpoint() { return Configuration.getGlobalConfiguration() - .get("WEB_PUB_SUB_ENDPOINT", "http://testendpoint.webpubsubdev.azure.com"); + .get("WEB_PUB_SUB_ENDPOINT", "https://testendpoint.webpubsubdev.azure.com"); } static String getConnectionString() { diff --git a/sdk/webpubsub/test-resources.bicep b/sdk/webpubsub/test-resources.bicep index b511b6ca477d..2521c05395e0 100644 --- a/sdk/webpubsub/test-resources.bicep +++ b/sdk/webpubsub/test-resources.bicep @@ -8,10 +8,34 @@ param testApplicationOid string param location string = resourceGroup().location var webPubSubName = '${baseName}-e2e' +var webPubSubSocketIOName = '${baseName}-socketio-e2e' // Find role id by heading to the Web Pub Sub resource, selecting Access Control (IAM), Roles, choose the Role, // then click on View under Details and check out the JSON. -var webPubSubContributorRoleId = subscriptionResourceId('Microsoft.Authorization/roleDefinitions', '12cf5a90-567b-43ae-8102-96cf46c7d9b4') +var webPubSubOwnerRoleId = subscriptionResourceId('Microsoft.Authorization/roleDefinitions', '12cf5a90-567b-43ae-8102-96cf46c7d9b4') +var webPubSubOperatorRoleId = subscriptionResourceId('Microsoft.Authorization/roleDefinitions', 'c7393b34-138c-406f-901b-d8cf2b17e6ae') + +resource webPubSubSocketIO 'Microsoft.SignalRService/webPubSub@2024-10-01-preview' = { + name: webPubSubSocketIOName + location: location + kind: 'SocketIO' + sku: { + name: 'Standard_S1' + tier: 'Standard' + capacity: 1 + } + identity: { + type: 'None' + } + properties: { + tls: { + clientCertEnabled: false + } + publicNetworkAccess: 'Enabled' + disableLocalAuth: false + disableAadAuth: false + } +} resource webPubSub 'Microsoft.SignalRService/webPubSub@2021-10-01' = { name: webPubSubName @@ -41,12 +65,43 @@ resource webPubSub 'Microsoft.SignalRService/webPubSub@2021-10-01' = { } } -resource webPubSubContributor 'Microsoft.Authorization/roleAssignments@2020-04-01-preview' = { - name: guid('contributor', webPubSubName) +resource webPubSubOwnerRoleAssignment 'Microsoft.Authorization/roleAssignments@2022-04-01' = { + name: guid('owner', webPubSub.id, testApplicationOid) scope: webPubSub properties: { - roleDefinitionId: webPubSubContributorRoleId + roleDefinitionId: webPubSubOwnerRoleId + principalId: testApplicationOid + principalType: 'ServicePrincipal' + } +} + +resource webPubSubOperatorRoleAssignment 'Microsoft.Authorization/roleAssignments@2022-04-01' = { + name: guid('operator', webPubSub.id, testApplicationOid) + scope: webPubSub + properties: { + roleDefinitionId: webPubSubOperatorRoleId + principalId: testApplicationOid + principalType: 'ServicePrincipal' + } +} + +resource webPubSubSocketIOOwnerRoleAssignment 'Microsoft.Authorization/roleAssignments@2022-04-01' = { + name: guid('owner', webPubSubSocketIO.id, testApplicationOid) + scope: webPubSubSocketIO + properties: { + roleDefinitionId: webPubSubOwnerRoleId + principalId: testApplicationOid + principalType: 'ServicePrincipal' + } +} + +resource webPubSubSocketIOOperatorRoleAssignment 'Microsoft.Authorization/roleAssignments@2022-04-01' = { + name: guid('operator', webPubSubSocketIO.id, testApplicationOid) + scope: webPubSubSocketIO + properties: { + roleDefinitionId: webPubSubOperatorRoleId principalId: testApplicationOid + principalType: 'ServicePrincipal' } } @@ -54,3 +109,4 @@ output AZURE_SUBSCRIPTION_ID string = subscription().subscriptionId output AZURE_RESOURCE_GROUP_NAME string = resourceGroup().name output WEB_PUB_SUB_CONNECTION_STRING string = webPubSub.listKeys().primaryConnectionString output WEB_PUB_SUB_ENDPOINT string = 'https://${webPubSub.properties.hostName}' +output WEB_PUB_SUB_SOCKETIO_ENDPOINT string = 'https://${webPubSubSocketIO.properties.hostName}'