Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 17 additions & 13 deletions src/CHttpServer/CHttpServer/Http3/Http3FramingStreamWriter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ public override async ValueTask<FlushResult> WriteAsync(ReadOnlyMemory<byte> sou
var frameHeaderLength = PrepareFrameHeader(buffer.AsSpan(), source.Length, _frameType);
source.CopyTo(buffer.AsMemory(frameHeaderLength));
await _responseStream.WriteAsync(buffer.AsMemory(0..(source.Length + frameHeaderLength)), cancellationToken);
ArrayPool<byte>.Shared.Return(buffer);
ArrayPool<byte>.Shared.Return(buffer, true);
}
else
{
Expand Down Expand Up @@ -208,11 +208,6 @@ public override async ValueTask<FlushResult> FlushAsync(CancellationToken cancel

private async Task<FlushResult> FlushAllSegmentsAsync(int startOffset, CancellationToken localToken)
{
if (!_currentSegment.IsEmpty)
_segments.Add(_currentSegment);
else if (_currentSegment.IsAllocated)
_currentSegment = _currentSegment with { Used = Memory<byte>.Empty };

// First segment handled with offset.
var memory = _segments[0];
Debug.Assert(!memory.IsEmpty);
Expand All @@ -227,6 +222,12 @@ private async Task<FlushResult> FlushAllSegmentsAsync(int startOffset, Cancellat
await _responseStream.WriteAsync(memory.Used, localToken);
_memoryPool.Return(memory.Reference, true);
}

// Last segment (the _currentSegment is not returned to the memory pool.
if (!_currentSegment.IsEmpty)
await _responseStream.WriteAsync(_currentSegment.Used, localToken);
_currentSegment = _currentSegment with { Used = Memory<byte>.Empty };

_segments.Clear();
_unflushedBytes = 0;
_responseStream.Flush();
Expand Down Expand Up @@ -263,24 +264,27 @@ public void Flush()

private void FlushAllSegments(int startOffset)
{
if (!_currentSegment.IsEmpty)
_segments.Add(_currentSegment);
else if (_currentSegment.IsAllocated)
_currentSegment = _currentSegment with { Used = Memory<byte>.Empty };

// First segment handled with offset.
var source = CollectionsMarshal.AsSpan(_segments);
ref var initialMemory = ref source[0];
Debug.Assert(!initialMemory.IsEmpty);
_responseStream.Write(initialMemory.Used.Span[startOffset..]);
_memoryPool.Return(initialMemory.Reference, true);

// Remaining segments.
for (int i = 1; i < _segments.Count; i++)
{
ref var memory = ref source[i];
if (memory.Used.Length > 0)
_responseStream.Write(memory.Used.Span);
_memoryPool.Return(memory.Reference, true);
}

// Last segment (the _currentSegment is not returned to the memory pool.
if (!_currentSegment.IsEmpty)
_responseStream.Write(_currentSegment.Used.Span);
_currentSegment = _currentSegment with { Used = Memory<byte>.Empty };

_segments.Clear();
_unflushedBytes = 0;
_responseStream.Flush();
Expand Down Expand Up @@ -335,15 +339,15 @@ private void ClearSegments(Span<Segment> source, bool clearCurrent = true)
for (int i = 0; i < source.Length; i++)
{
ref var memory = ref source[i];
if (memory.Reference.Length != 0)
if (memory.IsAllocated)
_memoryPool.Return(memory.Reference, true);
}
_unflushedBytes = 0;
_segments.Clear();
if (clearCurrent)
{
if (_currentSegment.IsAllocated)
_memoryPool.Return(_currentSegment.Reference);
_memoryPool.Return(_currentSegment.Reference, true);
_currentSegment = new Segment();
}
else
Expand Down
26 changes: 15 additions & 11 deletions src/CHttpServer/CHttpServer/Http3/Http3HeaderFramingStreamWriter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -177,11 +177,6 @@ public override async ValueTask<FlushResult> FlushAsync(CancellationToken cancel

private async ValueTask<FlushResult> FlushAllSegmentsAsync(int startOffset, CancellationToken localToken)
{
if (!_currentSegment.IsEmpty)
_segments.Add(_currentSegment);
else if (_currentSegment.IsAllocated)
_currentSegment = _currentSegment with { Used = Memory<byte>.Empty };

// First segment handled with offset.
var memory = _segments[0];
Debug.Assert(!memory.IsEmpty);
Expand All @@ -196,6 +191,12 @@ private async ValueTask<FlushResult> FlushAllSegmentsAsync(int startOffset, Canc
await _responseStream.WriteAsync(memory.Used, localToken);
_memoryPool.Return(memory.Reference, true);
}

// Last segment (the _currentSegment is not returned to the memory pool.
if (!_currentSegment.IsEmpty)
await _responseStream.WriteAsync(_currentSegment.Used, localToken);
_currentSegment = _currentSegment with { Used = Memory<byte>.Empty };

_segments.Clear();
_unflushedBytes = 0;
_responseStream.Flush();
Expand Down Expand Up @@ -225,24 +226,27 @@ public void Flush()

private void FlushAllSegments(int startOffset)
{
if (!_currentSegment.IsEmpty)
_segments.Add(_currentSegment);
else if (_currentSegment.IsAllocated)
_currentSegment = _currentSegment with { Used = Memory<byte>.Empty };

// First segment handled with offset.
var source = CollectionsMarshal.AsSpan(_segments);
ref var initialMemory = ref source[0];
Debug.Assert(!initialMemory.IsEmpty);
_responseStream.Write(initialMemory.Used.Span[startOffset..]);
_memoryPool.Return(initialMemory.Reference, true);

// Remaining segments.
for (int i = 1; i < _segments.Count; i++)
{
ref var memory = ref source[i];
if (memory.Used.Length > 0)
_responseStream.Write(memory.Used.Span);
_memoryPool.Return(memory.Reference, true);
}

// Last segment (the _currentSegment is not returned to the memory pool.
if (!_currentSegment.IsEmpty)
_responseStream.Write(_currentSegment.Used.Span);
_currentSegment = _currentSegment with { Used = Memory<byte>.Empty };

_segments.Clear();
_unflushedBytes = 0;
_responseStream.Flush();
Expand Down Expand Up @@ -305,7 +309,7 @@ private void ClearSegments(Span<Segment> source, bool clearCurrent = true)
if (clearCurrent)
{
if (_currentSegment.IsAllocated)
_memoryPool.Return(_currentSegment.Reference);
_memoryPool.Return(_currentSegment.Reference, true);
_currentSegment = new Segment();
}
else
Expand Down
3 changes: 0 additions & 3 deletions tests/CHttpServer.Tests/CHttpServerIntegrationTests.cs
Original file line number Diff line number Diff line change
@@ -1,11 +1,9 @@
using System.Net;
using System.Net.Http.Json;
using System.Text;

namespace CHttpServer.Tests;

[Collection(nameof(VanilaCHttpServerIntegrationTests))]
[CollectionDefinition(DisableParallelization = true)]
public class VanilaCHttpServerIntegrationTests : CHttpServerIntegrationTests, IClassFixture<TestServer>
{
private const int Port = 7222;
Expand All @@ -17,7 +15,6 @@ public VanilaCHttpServerIntegrationTests(TestServer testServer) : base(testServe
}

[Collection(nameof(PriorityCHttpServerIntegrationTests))]
[CollectionDefinition(DisableParallelization = true)]
public class PriorityCHttpServerIntegrationTests : CHttpServerIntegrationTests, IClassFixture<TestServer>
{
private const int Port = 7223;
Expand Down
2 changes: 0 additions & 2 deletions tests/CHttpServer.Tests/Http2ConnectionTests.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
using System.Buffers;
using System.Diagnostics.CodeAnalysis;
using System.Net;
using System.Text;
using CHttpServer.System.Net.Http.HPack;
Expand All @@ -9,7 +8,6 @@

namespace CHttpServer.Tests;

[SuppressMessage("Usage", "xUnit1051:Calls to methods which accept CancellationToken should use TestContext.Current.CancellationToken")]
public class Http2ConnectionTests
{
[Fact]
Expand Down
74 changes: 72 additions & 2 deletions tests/CHttpServer.Tests/Http3/Http3FramingStreamWriterTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ public async Task GetSpan_Flush_WritesToStreamWithFrameHeader(int payloadLength,

for (int i = 0; i < payloadLength; i++)
Assert.Equal((byte)i, result[i + headerLength]);

sut.Complete();
Assert.Equal(0, arrayPool.OutstandingBytes);
}
Expand Down Expand Up @@ -549,6 +549,76 @@ public async Task WriteAsync_Writes_FrameType()
Assert.Equal(expected, ms.ToArray());
}

[Fact]
public async Task ArrayPool_DoubleReturn_FlushAsync_Complete()
{
var sut = new Http3FramingStreamWriter(Stream.Null, 1, new TestArrayPool());
sut.GetMemory(8192);
sut.Advance(8100);
sut.GetMemory(8192);
sut.Advance(8100);
// Flushes 2 segments.
await sut.FlushAsync(TestContext.Current.CancellationToken);

sut.Complete(); // Should not double clear segments. If so, TestArrayPool will throw an exception.
}

[Fact]
public void ArrayPool_DoubleReturn_Flush_Complete()
{
var sut = new Http3FramingStreamWriter(Stream.Null, 1, new TestArrayPool());
sut.GetMemory(8192);
sut.Advance(8100);
sut.GetMemory(8192);
sut.Advance(8100);
// Flushes 2 segments.
sut.Flush();

sut.Complete(); // Should not double clear segments. If so, TestArrayPool will throw an exception.
}

[Fact]
public async Task ArrayPool_DoubleReturn_FlushAsync_CompleteAsync()
{
var sut = new Http3FramingStreamWriter(Stream.Null, 1, new TestArrayPool());
sut.GetMemory(8192);
sut.Advance(8100);
sut.GetMemory(8192);
sut.Advance(8100);
// Flushes 2 segments.
await sut.FlushAsync(TestContext.Current.CancellationToken);

await sut.CompleteAsync(); // Should not double clear segments. If so, TestArrayPool will throw an exception.
}

[Fact]
public async Task ArrayPool_DoubleReturn_Flush_CompleteAsync()
{
var sut = new Http3FramingStreamWriter(Stream.Null, 1, new TestArrayPool());
sut.GetMemory(8192);
sut.Advance(8100);
sut.GetMemory(8192);
sut.Advance(8100);
// Flushes 2 segments.
sut.Flush();

await sut.CompleteAsync(); // Should not double clear segments. If so, TestArrayPool will throw an exception.
}

[Fact]
public void ArrayPool_DoubleReturn_Flush_Reset()
{
var sut = new Http3FramingStreamWriter(Stream.Null, 1, new TestArrayPool());
sut.GetMemory(8192);
sut.Advance(8100);
sut.GetMemory(8192);
sut.Advance(8100);
// Flushes 2 segments.
sut.Flush();

sut.Reset(Stream.Null); // Should not double clear segments. If so, TestArrayPool will throw an exception.
}

private class TestArrayPool : ArrayPool<byte>
{
private readonly ArrayPool<byte> _internalPool;
Expand Down Expand Up @@ -576,7 +646,7 @@ public override byte[] Rent(int minimumLength)
public override void Return(byte[] array, bool clearArray = false)
{
_internalPool.Return(array, clearArray);
_rentedArrays.Remove(array);
Assert.True(_rentedArrays.Remove(array)); // Should not return an array twice.
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -474,6 +474,76 @@ public async Task WriteAsync_Writes_FrameType()
Assert.Equal(expected, ms.ToArray());
}

[Fact]
public async Task ArrayPool_DoubleReturn_FlushAsync_Complete()
{
var sut = new Http3HeaderFramingStreamWriter(Stream.Null, new TestArrayPool());
sut.GetMemory(8192);
sut.Advance(8100);
sut.GetMemory(8192);
sut.Advance(8100);
// Flushes 2 segments.
await sut.FlushAsync(TestContext.Current.CancellationToken);

sut.Complete(); // Should not double clear segments. If so, TestArrayPool will throw an exception.
}

[Fact]
public void ArrayPool_DoubleReturn_Flush_Complete()
{
var sut = new Http3HeaderFramingStreamWriter(Stream.Null, new TestArrayPool());
sut.GetMemory(8192);
sut.Advance(8100);
sut.GetMemory(8192);
sut.Advance(8100);
// Flushes 2 segments.
sut.Flush();

sut.Complete(); // Should not double clear segments. If so, TestArrayPool will throw an exception.
}

[Fact]
public async Task ArrayPool_DoubleReturn_FlushAsync_CompleteAsync()
{
var sut = new Http3HeaderFramingStreamWriter(Stream.Null, new TestArrayPool());
sut.GetMemory(8192);
sut.Advance(8100);
sut.GetMemory(8192);
sut.Advance(8100);
// Flushes 2 segments.
await sut.FlushAsync(TestContext.Current.CancellationToken);

await sut.CompleteAsync(); // Should not double clear segments. If so, TestArrayPool will throw an exception.
}

[Fact]
public async Task ArrayPool_DoubleReturn_Flush_CompleteAsync()
{
var sut = new Http3HeaderFramingStreamWriter(Stream.Null, new TestArrayPool());
sut.GetMemory(8192);
sut.Advance(8100);
sut.GetMemory(8192);
sut.Advance(8100);
// Flushes 2 segments.
sut.Flush();

await sut.CompleteAsync(); // Should not double clear segments. If so, TestArrayPool will throw an exception.
}

[Fact]
public void ArrayPool_DoubleReturn_Flush_Reset()
{
var sut = new Http3HeaderFramingStreamWriter(Stream.Null, new TestArrayPool());
sut.GetMemory(8192);
sut.Advance(8100);
sut.GetMemory(8192);
sut.Advance(8100);
// Flushes 2 segments.
sut.Flush();

sut.Reset(Stream.Null); // Should not double clear segments. If so, TestArrayPool will throw an exception.
}

private class TestArrayPool : ArrayPool<byte>
{
private readonly ArrayPool<byte> _internalPool;
Expand Down Expand Up @@ -501,7 +571,7 @@ public override byte[] Rent(int minimumLength)
public override void Return(byte[] array, bool clearArray = false)
{
_internalPool.Return(array, clearArray);
_rentedArrays.Remove(array);
Assert.True(_rentedArrays.Remove(array));
}
}
}
Expand Down
Loading