Skip to content

Commit 1c977ff

Browse files
committed
small refactor. Organize code
1 parent 6d782d2 commit 1c977ff

10 files changed

Lines changed: 360 additions & 98 deletions

File tree

httpclient5-websocket/src/main/java/org/apache/hc/client5/http/websocket/api/WebSocket.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -126,6 +126,17 @@ public interface WebSocket {
126126
*/
127127
boolean sendBinaryBatch(List<ByteBuffer> fragments, boolean finalFragment);
128128

129+
/**
130+
* Returns the number of bytes currently queued for outbound transmission.
131+
*
132+
* <p>This value is intended for backpressure and observability. It reflects
133+
* bytes accepted by the implementation but not fully written to the
134+
* transport yet.</p>
135+
*
136+
* @return queued outbound bytes, or {@code -1} if unavailable
137+
* @since 5.7
138+
*/
139+
long queueSize();
129140
/**
130141
* Initiates the WebSocket close handshake.
131142
*

httpclient5-websocket/src/main/java/org/apache/hc/client5/http/websocket/client/impl/protocol/Http2ExtendedConnectProtocol.java

Lines changed: 24 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@
4949
import org.apache.hc.client5.http.websocket.api.WebSocket;
5050
import org.apache.hc.client5.http.websocket.api.WebSocketClientConfig;
5151
import org.apache.hc.client5.http.websocket.api.WebSocketListener;
52+
import org.apache.hc.client5.http.websocket.transport.OutboundFlowSupport;
5253
import org.apache.hc.client5.http.websocket.transport.WebSocketFrameDecoder;
5354
import org.apache.hc.core5.annotation.Internal;
5455
import org.apache.hc.core5.concurrent.DefaultThreadFactory;
@@ -608,6 +609,7 @@ private boolean isTooLarge(final long size) {
608609
private final class H2WebSocket implements WebSocket {
609610

610611
private final ArrayDeque<ByteBuffer> queue = new ArrayDeque<>();
612+
// Queue accounting invariant: bytes accepted for send but not fully written yet.
611613
private int queuedBytes;
612614
private final ReentrantLock queueLock = new ReentrantLock();
613615
private final ReentrantLock sendLock = new ReentrantLock();
@@ -651,6 +653,16 @@ public boolean sendBinary(final ByteBuffer data, final boolean finalFragment) {
651653
return sendData(FrameOpcode.BINARY, data, finalFragment);
652654
}
653655

656+
@Override
657+
public long queueSize() {
658+
queueLock.lock();
659+
try {
660+
return queuedBytes;
661+
} finally {
662+
queueLock.unlock();
663+
}
664+
}
665+
654666
@Override
655667
public CompletableFuture<Void> close(final int statusCode, final String reason) {
656668
if (!CloseCodec.isValidToSend(statusCode)) {
@@ -705,36 +717,17 @@ private boolean sendData(final int opcode, final ByteBuffer data, final boolean
705717
if (!open.get()) {
706718
return false;
707719
}
708-
int currentOpcode = outOpcode == -1 ? opcode : FrameOpcode.CONT;
709-
if (outOpcode == -1) {
710-
outOpcode = opcode;
711-
}
712-
final ByteBuffer ro = data.asReadOnlyBuffer();
713-
boolean ok = true;
714-
boolean firstFragment = currentOpcode != FrameOpcode.CONT;
715-
716-
while (ro.hasRemaining()) {
717-
final int n = Math.min(ro.remaining(), outChunk);
718-
final int oldLimit = ro.limit();
719-
final int newLimit = ro.position() + n;
720-
ro.limit(newLimit);
721-
final ByteBuffer slice = ro.slice();
722-
ro.limit(oldLimit);
723-
ro.position(newLimit);
724-
725-
final boolean lastSlice = !ro.hasRemaining() && fin;
726-
if (!enqueueDataFrame(currentOpcode, slice, lastSlice, firstFragment)) {
727-
ok = false;
728-
break;
729-
}
730-
currentOpcode = FrameOpcode.CONT;
731-
firstFragment = false;
732-
}
733-
734-
if (fin || !ok) {
735-
outOpcode = -1;
736-
}
737-
return ok;
720+
final OutboundFlowSupport.SendResult sendResult = OutboundFlowSupport.sendFragmented(
721+
opcode,
722+
outOpcode,
723+
data,
724+
fin,
725+
outChunk,
726+
false,
727+
open::get,
728+
this::enqueueDataFrame);
729+
outOpcode = sendResult.nextOpcode();
730+
return sendResult.accepted();
738731
} finally {
739732
sendLock.unlock();
740733
}
@@ -767,10 +760,7 @@ private boolean enqueue(final ByteBuffer frame, final boolean closeAfter) {
767760
queue.clear();
768761
queuedBytes = 0;
769762
} else {
770-
final long maxBytes = cfg.getMaxOutboundDataBytes() > 0
771-
? cfg.getMaxOutboundDataBytes()
772-
: (long) cfg.getMaxOutboundControlQueue() * (long) cfg.getMaxFrameSize();
773-
if (maxBytes > 0 && (long) queuedBytes + (long) frame.remaining() > maxBytes) {
763+
if (OutboundFlowSupport.exceedsOutboundByteLimit(cfg.getMaxOutboundDataBytes(), queuedBytes, frame.remaining())) {
774764
return false;
775765
}
776766
}
Lines changed: 124 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,124 @@
1+
/*
2+
* ====================================================================
3+
* Licensed to the Apache Software Foundation (ASF) under one
4+
* or more contributor license agreements. See the NOTICE file
5+
* distributed with this work for additional information
6+
* regarding copyright ownership. The ASF licenses this file
7+
* to you under the Apache License, Version 2.0 (the
8+
* "License"); you may not use this file except in compliance
9+
* with the License. You may obtain a copy of the License at
10+
*
11+
* http://www.apache.org/licenses/LICENSE-2.0
12+
*
13+
* Unless required by applicable law or agreed to in writing,
14+
* software distributed under the License is distributed on an
15+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16+
* KIND, either express or implied. See the License for the
17+
* specific language governing permissions and limitations
18+
* under the License.
19+
* ====================================================================
20+
*
21+
* This software consists of voluntary contributions made by many
22+
* individuals on behalf of the Apache Software Foundation. For more
23+
* information on the Apache Software Foundation, please see
24+
* <http://www.apache.org/>.
25+
*
26+
*/
27+
package org.apache.hc.client5.http.websocket.transport;
28+
29+
import java.nio.ByteBuffer;
30+
31+
import org.apache.hc.core5.annotation.Internal;
32+
import org.apache.hc.core5.websocket.frame.FrameOpcode;
33+
34+
/**
35+
* Shared outbound queue and fragmentation helpers used by HTTP/1.1 and HTTP/2 WebSocket transports.
36+
*/
37+
@Internal
38+
public final class OutboundFlowSupport {
39+
40+
@FunctionalInterface
41+
public interface SendGate {
42+
boolean canSend();
43+
}
44+
45+
@FunctionalInterface
46+
public interface FrameEnqueuer {
47+
boolean enqueue(int opcode, ByteBuffer payload, boolean fin, boolean firstFragment);
48+
}
49+
50+
public static final class SendResult {
51+
private final boolean accepted;
52+
private final int nextOpcode;
53+
54+
SendResult(final boolean accepted, final int nextOpcode) {
55+
this.accepted = accepted;
56+
this.nextOpcode = nextOpcode;
57+
}
58+
59+
public boolean accepted() {
60+
return accepted;
61+
}
62+
63+
public int nextOpcode() {
64+
return nextOpcode;
65+
}
66+
}
67+
68+
private OutboundFlowSupport() {
69+
}
70+
71+
public static boolean exceedsOutboundByteLimit(final long maxOutboundDataBytes, final long queuedBytes, final long frameBytes) {
72+
return maxOutboundDataBytes > 0 && queuedBytes + frameBytes > maxOutboundDataBytes;
73+
}
74+
75+
public static SendResult sendFragmented(
76+
final int messageOpcode,
77+
final int currentOutOpcode,
78+
final ByteBuffer data,
79+
final boolean finalFragment,
80+
final int outChunk,
81+
final boolean emitEmptyFrame,
82+
final SendGate gate,
83+
final FrameEnqueuer enqueuer) {
84+
85+
int nextOpcode = currentOutOpcode;
86+
int currentOpcode = nextOpcode == -1 ? messageOpcode : FrameOpcode.CONT;
87+
if (nextOpcode == -1) {
88+
nextOpcode = messageOpcode;
89+
}
90+
boolean firstFragment = currentOpcode != FrameOpcode.CONT;
91+
boolean ok = true;
92+
93+
final ByteBuffer ro = data.asReadOnlyBuffer();
94+
boolean shouldEmit = emitEmptyFrame || ro.hasRemaining();
95+
while (shouldEmit) {
96+
if (!gate.canSend()) {
97+
ok = false;
98+
break;
99+
}
100+
101+
final int n = Math.min(ro.remaining(), outChunk);
102+
final int oldLimit = ro.limit();
103+
final int newLimit = ro.position() + n;
104+
ro.limit(newLimit);
105+
final ByteBuffer slice = ro.slice();
106+
ro.limit(oldLimit);
107+
ro.position(newLimit);
108+
109+
final boolean lastSlice = !ro.hasRemaining() && finalFragment;
110+
if (!enqueuer.enqueue(currentOpcode, slice, lastSlice, firstFragment)) {
111+
ok = false;
112+
break;
113+
}
114+
currentOpcode = FrameOpcode.CONT;
115+
firstFragment = false;
116+
shouldEmit = ro.hasRemaining();
117+
}
118+
119+
if (finalFragment || !ok) {
120+
nextOpcode = -1;
121+
}
122+
return new SendResult(ok, nextOpcode);
123+
}
124+
}

httpclient5-websocket/src/main/java/org/apache/hc/client5/http/websocket/transport/WebSocketOutbound.java

Lines changed: 35 additions & 64 deletions
Original file line numberDiff line numberDiff line change
@@ -177,7 +177,7 @@ boolean enqueueData(final OutFrame frame) {
177177
}
178178
final long limit = s.cfg.getMaxOutboundDataBytes();
179179
final long newSize = s.dataQueuedBytes.addAndGet(frame.size);
180-
if (limit > 0 && newSize > limit) {
180+
if (OutboundFlowSupport.exceedsOutboundByteLimit(limit, newSize - frame.size, frame.size)) {
181181
s.dataQueuedBytes.addAndGet(-frame.size);
182182
release(frame);
183183
return false;
@@ -285,6 +285,9 @@ OutFrame pooledCloseEcho(final ByteBuffer payload) {
285285
// package-private: used by WebSocketInbound.onDisconnected()
286286
void drainAndRelease() {
287287
if (s.activeWrite != null) {
288+
if (s.activeWrite.dataFrame) {
289+
s.dataQueuedBytes.addAndGet(-s.activeWrite.size);
290+
}
288291
release(s.activeWrite);
289292
s.activeWrite = null;
290293
}
@@ -358,71 +361,34 @@ private boolean sendData(final int opcode, final ByteBuffer data, final boolean
358361
final byte[] plain = toBytes(data);
359362
final WebSocketExtensionChain.Encoded enc =
360363
s.encChain.encode(plain, true, true);
361-
ByteBuffer ro = ByteBuffer.wrap(enc.payload);
362-
int currentOpcode = opcode;
363-
boolean firstFragment = true;
364-
if (!ro.hasRemaining()) {
365-
ro = ByteBuffer.allocate(0);
366-
}
367-
do {
368-
if (!s.open.get() || s.closeSent.get()) {
369-
return false;
370-
}
371-
final int n = Math.min(ro.remaining(), s.outChunk);
372-
final int oldLimit = ro.limit();
373-
final int newLimit = ro.position() + n;
374-
ro.limit(newLimit);
375-
final ByteBuffer slice = ro.slice();
376-
ro.limit(oldLimit);
377-
ro.position(newLimit);
378-
final boolean lastSlice = !ro.hasRemaining();
379-
final int rsv = enc.setRsvOnFirst && firstFragment ? s.rsvMask : 0;
380-
if (!enqueueData(pooledFrameWithRsv(currentOpcode, slice, lastSlice, rsv, true))) {
381-
return false;
382-
}
383-
currentOpcode = FrameOpcode.CONT;
384-
firstFragment = false;
385-
} while (ro.hasRemaining());
386-
return true;
387-
}
388-
389-
int currentOpcode = s.outOpcode == -1 ? opcode : FrameOpcode.CONT;
390-
if (s.outOpcode == -1) {
391-
s.outOpcode = opcode;
364+
final OutboundFlowSupport.SendResult sendResult = OutboundFlowSupport.sendFragmented(
365+
opcode,
366+
s.outOpcode,
367+
ByteBuffer.wrap(enc.payload),
368+
true,
369+
s.outChunk,
370+
true,
371+
() -> s.open.get() && !s.closeSent.get(),
372+
(frameOpcode, payload, frameFin, firstFragment) -> {
373+
final int rsv = enc.setRsvOnFirst && firstFragment ? s.rsvMask : 0;
374+
return enqueueData(pooledFrameWithRsv(frameOpcode, payload, frameFin, rsv, true));
375+
});
376+
s.outOpcode = sendResult.nextOpcode();
377+
return sendResult.accepted();
392378
}
393379

394-
final ByteBuffer ro = data.asReadOnlyBuffer();
395-
boolean ok = true;
396-
boolean firstFragment = currentOpcode != FrameOpcode.CONT;
397-
398-
while (ro.hasRemaining()) {
399-
if (!s.open.get() || s.closeSent.get()) {
400-
ok = false;
401-
break;
402-
}
403-
404-
final int n = Math.min(ro.remaining(), s.outChunk);
405-
406-
final int oldLimit = ro.limit();
407-
final int newLimit = ro.position() + n;
408-
ro.limit(newLimit);
409-
final ByteBuffer slice = ro.slice();
410-
ro.limit(oldLimit);
411-
ro.position(newLimit);
412-
413-
final boolean lastSlice = !ro.hasRemaining() && fin;
414-
if (!enqueueData(buildDataFrame(currentOpcode, slice, lastSlice, firstFragment))) {
415-
ok = false;
416-
break;
417-
}
418-
currentOpcode = FrameOpcode.CONT;
419-
firstFragment = false;
420-
}
421-
422-
if (fin || !ok) {
423-
s.outOpcode = -1;
424-
}
425-
return ok;
380+
final OutboundFlowSupport.SendResult sendResult = OutboundFlowSupport.sendFragmented(
381+
opcode,
382+
s.outOpcode,
383+
data,
384+
fin,
385+
s.outChunk,
386+
false,
387+
() -> s.open.get() && !s.closeSent.get(),
388+
(frameOpcode, payload, frameFin, firstFragment) ->
389+
enqueueData(buildDataFrame(frameOpcode, payload, frameFin, firstFragment)));
390+
s.outOpcode = sendResult.nextOpcode();
391+
return sendResult.accepted();
426392
} finally {
427393
s.writeLock.unlock();
428394
}
@@ -533,6 +499,11 @@ public boolean sendBinaryBatch(final List<ByteBuffer> fragments, final boolean f
533499
}
534500
}
535501

502+
@Override
503+
public long queueSize() {
504+
return s.dataQueuedBytes.get();
505+
}
506+
536507
@Override
537508
public CompletableFuture<Void> close(final int statusCode, final String reason) {
538509
final CompletableFuture<Void> future = new CompletableFuture<>();

httpclient5-websocket/src/main/java/org/apache/hc/client5/http/websocket/transport/WebSocketSessionState.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,7 @@ final class WebSocketSessionState {
6868
final ConcurrentLinkedQueue<WebSocketOutbound.OutFrame> ctrlOutbound = new ConcurrentLinkedQueue<>();
6969
final ConcurrentLinkedQueue<WebSocketOutbound.OutFrame> dataOutbound = new ConcurrentLinkedQueue<>();
7070
WebSocketOutbound.OutFrame activeWrite = null;
71+
// Queue accounting invariant: bytes accepted for data frames but not fully written yet.
7172
final AtomicLong dataQueuedBytes = new AtomicLong();
7273

7374
// Flags / locks

httpclient5-websocket/src/test/java/org/apache/hc/client5/http/websocket/client/WebSocketClientTest.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -217,6 +217,11 @@ public boolean sendBinaryBatch(final List<ByteBuffer> fragments, final boolean f
217217
}
218218
return true;
219219
}
220+
221+
@Override
222+
public long queueSize() {
223+
return 0;
224+
}
220225
}
221226

222227
private static CloseableWebSocketClient newClient() {

httpclient5-websocket/src/test/java/org/apache/hc/client5/http/websocket/client/impl/WebSocketClientImplTest.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -290,6 +290,11 @@ public boolean sendBinaryBatch(final List<ByteBuffer> fragments, final boolean f
290290
return true;
291291
}
292292

293+
@Override
294+
public long queueSize() {
295+
return 0;
296+
}
297+
293298
@Override
294299
public CompletableFuture<Void> close(final int statusCode, final String reason) {
295300
return CompletableFuture.completedFuture(null);

0 commit comments

Comments
 (0)