diff --git a/src/RESPite/Buffers/CycleBuffer.cs b/src/RESPite/Buffers/CycleBuffer.cs index 14774b357..48db5c1e5 100644 --- a/src/RESPite/Buffers/CycleBuffer.cs +++ b/src/RESPite/Buffers/CycleBuffer.cs @@ -40,32 +40,39 @@ public partial struct CycleBuffer public static CycleBuffer Create( MemoryPool? pool = null, int pageSize = DefaultPageSize, + float pageGrow = DefaultPageGrow, ICycleBufferCallback? callback = null) { pool ??= DefaultPool; if (pageSize <= 0) pageSize = DefaultPageSize; + if (pageGrow <= 0) pageGrow = DefaultPageGrow; if (pageSize > pool.MaxBufferSize) pageSize = pool.MaxBufferSize; - return new CycleBuffer(pool, pageSize, callback); + return new CycleBuffer(pool, pageSize, pageGrow, callback); } - private CycleBuffer(MemoryPool pool, int pageSize, ICycleBufferCallback? callback) + private CycleBuffer(MemoryPool pool, int pageSize, float pageGrow, ICycleBufferCallback? callback) { Pool = pool; - PageSize = pageSize; + _pageSizeStart = _pageSize = pageSize; + _pageGrow = pageGrow; _callback = callback; leasedStart = -1; } private const int DefaultPageSize = 8 * 1024; + private const float DefaultPageGrow = 1f; - public int PageSize { get; } + public int PageSize => _pageSize; public MemoryPool Pool { get; } private readonly ICycleBufferCallback? _callback; + private readonly int _pageSizeStart; + private readonly float _pageGrow; private Segment? startSegment, endSegment; private int endSegmentCommitted, endSegmentLength; private int leasedStart; + private int _pageSize; public bool TryGetCommitted(out ReadOnlySpan span) { @@ -386,6 +393,13 @@ public ReadOnlySequence GetAllCommitted() return ros; } + private void NextPageSize() + { + var newSize = (int)Math.Floor(_pageSize * _pageGrow); + var maxSize = Pool.MaxBufferSize; + _pageSize = (uint)newSize > maxSize ? maxSize : newSize; + } + private Segment GetNextSegment() { DebugAssertValid(); @@ -412,7 +426,8 @@ private Segment GetNextSegment() } } - Segment newSegment = Segment.Create(Pool.Rent(PageSize)); + Segment newSegment = Segment.Create(Pool.Rent(_pageSize)); + NextPageSize(); if (endSegment is null) { // tabula rasa @@ -694,6 +709,7 @@ public void Release() node.Recycle(); node = next; } + _pageSize = _pageSizeStart; } /// diff --git a/src/RESPite/PublicAPI/PublicAPI.Shipped.txt b/src/RESPite/PublicAPI/PublicAPI.Shipped.txt index 27160d830..572c914c5 100644 --- a/src/RESPite/PublicAPI/PublicAPI.Shipped.txt +++ b/src/RESPite/PublicAPI/PublicAPI.Shipped.txt @@ -203,7 +203,7 @@ [SER004]RESPite.Messages.RespScanState.TryRead(System.ReadOnlySpan value, out int bytesRead) -> bool [SER004]RESPite.RespException [SER004]RESPite.RespException.RespException(string! message) -> void -[SER004]static RESPite.Buffers.CycleBuffer.Create(System.Buffers.MemoryPool? pool = null, int pageSize = 8192, RESPite.Buffers.ICycleBufferCallback? callback = null) -> RESPite.Buffers.CycleBuffer +[SER004]static RESPite.Buffers.CycleBuffer.Create(System.Buffers.MemoryPool? pool = null, int pageSize = 8192, float pageGrow = 1, RESPite.Buffers.ICycleBufferCallback? callback = null) -> RESPite.Buffers.CycleBuffer [SER004]static RESPite.Messages.RespFrameScanner.Default.get -> RESPite.Messages.RespFrameScanner! [SER004]static RESPite.Messages.RespFrameScanner.Subscription.get -> RESPite.Messages.RespFrameScanner! [SER004]virtual RESPite.Messages.RespAttributeReader.Read(ref RESPite.Messages.RespReader reader, ref T value) -> void diff --git a/src/StackExchange.Redis/BufferedStreamWriter.Pipe.cs b/src/StackExchange.Redis/BufferedStreamWriter.Pipe.cs index 50ad143b0..bcd8be967 100644 --- a/src/StackExchange.Redis/BufferedStreamWriter.Pipe.cs +++ b/src/StackExchange.Redis/BufferedStreamWriter.Pipe.cs @@ -3,6 +3,7 @@ using System.IO.Pipelines; using System.Threading; using System.Threading.Tasks; +using StackExchange.Redis.Configuration; namespace StackExchange.Redis; @@ -10,10 +11,20 @@ internal sealed class PipeStreamWriter : BufferedStreamWriter { private readonly PipeWriter _writer; - public PipeStreamWriter(Stream target, CancellationToken cancellationToken = default) + public PipeStreamWriter(Stream target, CancellationToken cancellationToken = default, BufferOptions? bufferOptions = null) : base(target, cancellationToken) { - var pipe = new Pipe(); + var options = PipeOptions.Default; + + if (bufferOptions != null) + { + var bufferSize = bufferOptions.BufferSize; + if (bufferSize == 0) bufferSize = options.MinimumSegmentSize; + + options = new PipeOptions(bufferOptions.MemoryPool, minimumSegmentSize: bufferSize); + } + + var pipe = new Pipe(options); _writer = pipe.Writer; WriteComplete = CopyToAsync(pipe.Reader, pipe.Writer, Target, cancellationToken); } diff --git a/src/StackExchange.Redis/BufferedStreamWriter.Switchable.cs b/src/StackExchange.Redis/BufferedStreamWriter.Switchable.cs index 153c3962a..9c2ec55d0 100644 --- a/src/StackExchange.Redis/BufferedStreamWriter.Switchable.cs +++ b/src/StackExchange.Redis/BufferedStreamWriter.Switchable.cs @@ -1,9 +1,11 @@ using System; +using System.Buffers; using System.Diagnostics; using System.IO; using System.Threading; using System.Threading.Tasks; using System.Threading.Tasks.Sources; +using StackExchange.Redis.Configuration; namespace StackExchange.Redis; @@ -13,8 +15,8 @@ internal sealed class SwitchableBufferedStreamWriter : CycleBufferStreamWriter, private ManualResetValueTaskSourceCore _readerTask; private bool _syncSignalled; - public SwitchableBufferedStreamWriter(Stream target, CancellationToken cancellationToken, bool initiallySync) - : base(target, cancellationToken, initiallySync ? StateFlags.None : StateFlags.AsyncMode) + public SwitchableBufferedStreamWriter(Stream target, CancellationToken cancellationToken, BufferOptions? bufferOptions, bool initiallySync) + : base(target, cancellationToken, bufferOptions, initiallySync ? StateFlags.None : StateFlags.AsyncMode) { _readerTask.RunContinuationsAsynchronously = true; // we never want the flusher to take over the copying if (initiallySync) diff --git a/src/StackExchange.Redis/BufferedStreamWriter.cs b/src/StackExchange.Redis/BufferedStreamWriter.cs index 85a993a71..d87003586 100644 --- a/src/StackExchange.Redis/BufferedStreamWriter.cs +++ b/src/StackExchange.Redis/BufferedStreamWriter.cs @@ -8,6 +8,7 @@ using System.Threading; using System.Threading.Tasks; using RESPite.Buffers; +using StackExchange.Redis.Configuration; namespace StackExchange.Redis; @@ -57,7 +58,7 @@ public enum WriteMode public virtual bool IsSync => false; - public static BufferedStreamWriter Create(WriteMode mode, ConnectionType connectionType, Stream target, CancellationToken cancellationToken) + public static BufferedStreamWriter Create(WriteMode mode, ConnectionType connectionType, Stream target, BufferOptions? bufferOptions, CancellationToken cancellationToken) { if (connectionType is ConnectionType.Subscription | mode is WriteMode.Default) { @@ -67,9 +68,9 @@ public static BufferedStreamWriter Create(WriteMode mode, ConnectionType connect } return mode switch { - WriteMode.Sync => new SwitchableBufferedStreamWriter(target, cancellationToken, initiallySync: true), - WriteMode.Async => new SwitchableBufferedStreamWriter(target, cancellationToken, initiallySync: false), - WriteMode.Pipe => new PipeStreamWriter(target, cancellationToken), + WriteMode.Sync => new SwitchableBufferedStreamWriter(target, cancellationToken, bufferOptions, initiallySync: true), + WriteMode.Async => new SwitchableBufferedStreamWriter(target, cancellationToken, bufferOptions, initiallySync: false), + WriteMode.Pipe => new PipeStreamWriter(target, cancellationToken, bufferOptions), _ => throw new ArgumentOutOfRangeException(nameof(mode)), }; } @@ -115,10 +116,10 @@ public virtual void DebugSetLog(Action log) { } internal abstract class CycleBufferStreamWriter : BufferedStreamWriter, ICycleBufferCallback { - protected CycleBufferStreamWriter(Stream target, CancellationToken cancellationToken, StateFlags flags = StateFlags.None) + protected CycleBufferStreamWriter(Stream target, CancellationToken cancellationToken, BufferOptions? bufferOptions = null, StateFlags flags = StateFlags.None) : base(target, cancellationToken) { - _buffer = CycleBuffer.Create(callback: this); + _buffer = CycleBuffer.Create(bufferOptions?.MemoryPool, bufferOptions?.BufferSize ?? 0, bufferOptions?.BufferGrowthFactor ?? 0, callback: this); _stateFlags = flags; } diff --git a/src/StackExchange.Redis/Configuration/BufferOptions.cs b/src/StackExchange.Redis/Configuration/BufferOptions.cs new file mode 100644 index 000000000..3baa8f6ef --- /dev/null +++ b/src/StackExchange.Redis/Configuration/BufferOptions.cs @@ -0,0 +1,24 @@ +using System.Buffers; + +namespace StackExchange.Redis.Configuration; + +/// +/// CycleBuffer BufferOptions. +/// +public sealed class BufferOptions +{ + /// + /// Memory Pool. + /// + public MemoryPool? MemoryPool { get; set; } + + /// + /// Buffer Size. + /// + public int BufferSize { get; set; } + + /// + /// Buffer Growth Factor. + /// + public float BufferGrowthFactor { get; set; } +} diff --git a/src/StackExchange.Redis/ConfigurationOptions.cs b/src/StackExchange.Redis/ConfigurationOptions.cs index e8114cdb6..e5c78b253 100644 --- a/src/StackExchange.Redis/ConfigurationOptions.cs +++ b/src/StackExchange.Redis/ConfigurationOptions.cs @@ -1,4 +1,5 @@ using System; +using System.Buffers; using System.Collections.Generic; using System.ComponentModel; using System.Linq; @@ -308,6 +309,21 @@ public DefaultOptionsProvider Defaults /// public Action? BeforeSocketConnect { get; set; } + /// + /// Request BufferOptions. + /// + public BufferOptions? RequestBufferOptions { get; set; } + + /// + /// Response BufferOptions. + /// + public BufferOptions? ResponseBufferOptions { get; set; } + + /// + /// Response ArrayPool. + /// + public ArrayPool? ResponseArrayPool { get; set; } + internal Func, Task> AfterConnectAsync => Defaults.AfterConnectAsync; /// @@ -956,6 +972,9 @@ public static ConfigurationOptions Parse(string configuration, bool ignoreUnknow reconnectRetryPolicy = reconnectRetryPolicy, backlogPolicy = backlogPolicy, sslProtocols = sslProtocols, + RequestBufferOptions = RequestBufferOptions, + ResponseBufferOptions = ResponseBufferOptions, + ResponseArrayPool = ResponseArrayPool, BeforeSocketConnect = BeforeSocketConnect, EndPoints = EndPoints.Clone(), LoggerFactory = LoggerFactory, @@ -1156,6 +1175,9 @@ private void Clear() CertificateSelection = null; CertificateValidation = null; + RequestBufferOptions = null; + ResponseBufferOptions = null; + ResponseArrayPool = null; BeforeSocketConnect = null; ChannelPrefix = default; LibraryName = null; diff --git a/src/StackExchange.Redis/PhysicalConnection.Read.cs b/src/StackExchange.Redis/PhysicalConnection.Read.cs index af5421301..78aad7e4e 100644 --- a/src/StackExchange.Redis/PhysicalConnection.Read.cs +++ b/src/StackExchange.Redis/PhysicalConnection.Read.cs @@ -1,4 +1,4 @@ -using System; +using System; using System.Buffers; using System.Buffers.Binary; using System.Diagnostics; @@ -70,10 +70,12 @@ private async Task ReadAllAsync(CancellationToken cancellationToken) var tail = _ioStream ?? Stream.Null; if (_readStatus is not ReadStatus.TransitioningToAsync) { + var bufferOptions = BridgeCouldBeNull?.Multiplexer.RawConfig.ResponseBufferOptions; + // preserve existing state if transitioning _readStatus = ReadStatus.Init; _readState = default; - _readBuffer = CycleBuffer.Create(); + _readBuffer = CycleBuffer.Create(bufferOptions?.MemoryPool, bufferOptions?.BufferSize ?? 0, bufferOptions?.BufferGrowthFactor ?? 0); } try { @@ -130,7 +132,9 @@ private void ReadAllSync(CancellationToken cancellationToken) var tail = _ioStream ?? Stream.Null; _readStatus = ReadStatus.Init; _readState = default; - _readBuffer = CycleBuffer.Create(); + + var bufferOptions = BridgeCouldBeNull?.Multiplexer.RawConfig.ResponseBufferOptions; + _readBuffer = CycleBuffer.Create(bufferOptions?.MemoryPool, bufferOptions?.BufferSize ?? 0, bufferOptions?.BufferGrowthFactor ?? 0); try { int read; @@ -399,14 +403,16 @@ private void OnResponseFrame(RespPrefix prefix, ReadOnlySequence payload) else { var len = checked((int)payload.Length); - byte[]? oversized = ArrayPool.Shared.Rent(len); + var arrayPool = BridgeCouldBeNull?.Multiplexer.RawConfig.ResponseArrayPool ?? ArrayPool.Shared; + + byte[]? oversized = arrayPool.Rent(len); payload.CopyTo(oversized); OnResponseFrame(prefix, new(oversized, 0, len), ref oversized); // the lease could have been claimed by the activation code (to prevent another memcpy); otherwise, free if (oversized is not null) { - ArrayPool.Shared.Return(oversized); + arrayPool.Return(oversized); } } } diff --git a/src/StackExchange.Redis/PhysicalConnection.Write.cs b/src/StackExchange.Redis/PhysicalConnection.Write.cs index 84d3430e1..d8cdaa173 100644 --- a/src/StackExchange.Redis/PhysicalConnection.Write.cs +++ b/src/StackExchange.Redis/PhysicalConnection.Write.cs @@ -22,7 +22,8 @@ private void InitOutput(Stream? stream) { if (stream is null) return; _ioStream = stream; - _output = BufferedStreamWriter.Create(WriteMode, connectionType, stream, OutputCancel); + _output = BufferedStreamWriter.Create(WriteMode, connectionType, stream, BridgeCouldBeNull?.Multiplexer.RawConfig.RequestBufferOptions, OutputCancel); + #if DEBUG if (BridgeCouldBeNull?.Multiplexer.RawConfig.OutputLog is { } log) { diff --git a/src/StackExchange.Redis/PublicAPI/PublicAPI.Shipped.txt b/src/StackExchange.Redis/PublicAPI/PublicAPI.Shipped.txt index 269cc11d2..621459a2b 100644 --- a/src/StackExchange.Redis/PublicAPI/PublicAPI.Shipped.txt +++ b/src/StackExchange.Redis/PublicAPI/PublicAPI.Shipped.txt @@ -213,6 +213,13 @@ StackExchange.Redis.Configuration.DefaultOptionsProvider.ClientName.get -> strin StackExchange.Redis.Configuration.DefaultOptionsProvider.DefaultOptionsProvider() -> void StackExchange.Redis.Configuration.Tunnel StackExchange.Redis.Configuration.Tunnel.Tunnel() -> void +StackExchange.Redis.Configuration.BufferOptions +StackExchange.Redis.Configuration.BufferOptions.MemoryPool.get -> System.Buffers.MemoryPool? +StackExchange.Redis.Configuration.BufferOptions.MemoryPool.set -> void +StackExchange.Redis.Configuration.BufferOptions.BufferSize.get -> int +StackExchange.Redis.Configuration.BufferOptions.BufferSize.set -> void +StackExchange.Redis.Configuration.BufferOptions.BufferGrowthFactor.get -> float +StackExchange.Redis.Configuration.BufferOptions.BufferGrowthFactor.set -> void static StackExchange.Redis.Configuration.Tunnel.HttpProxy(System.Net.EndPoint! proxy) -> StackExchange.Redis.Configuration.Tunnel! virtual StackExchange.Redis.Configuration.Tunnel.BeforeAuthenticateAsync(System.Net.EndPoint! endpoint, StackExchange.Redis.ConnectionType connectionType, System.Net.Sockets.Socket? socket, System.Threading.CancellationToken cancellationToken) -> System.Threading.Tasks.ValueTask virtual StackExchange.Redis.Configuration.Tunnel.BeforeSocketConnectAsync(System.Net.EndPoint! endPoint, StackExchange.Redis.ConnectionType connectionType, System.Net.Sockets.Socket? socket, System.Threading.CancellationToken cancellationToken) -> System.Threading.Tasks.ValueTask @@ -227,6 +234,12 @@ StackExchange.Redis.ConfigurationOptions.AsyncTimeout.get -> int StackExchange.Redis.ConfigurationOptions.AsyncTimeout.set -> void StackExchange.Redis.ConfigurationOptions.BacklogPolicy.get -> StackExchange.Redis.BacklogPolicy! StackExchange.Redis.ConfigurationOptions.BacklogPolicy.set -> void +StackExchange.Redis.ConfigurationOptions.RequestBufferOptions.get -> StackExchange.Redis.Configuration.BufferOptions? +StackExchange.Redis.ConfigurationOptions.RequestBufferOptions.set -> void +StackExchange.Redis.ConfigurationOptions.ResponseBufferOptions.get -> StackExchange.Redis.Configuration.BufferOptions? +StackExchange.Redis.ConfigurationOptions.ResponseBufferOptions.set -> void +StackExchange.Redis.ConfigurationOptions.ResponseArrayPool.get -> System.Buffers.ArrayPool? +StackExchange.Redis.ConfigurationOptions.ResponseArrayPool.set -> void StackExchange.Redis.ConfigurationOptions.BeforeSocketConnect.get -> System.Action? StackExchange.Redis.ConfigurationOptions.BeforeSocketConnect.set -> void StackExchange.Redis.ConfigurationOptions.CertificateSelection -> System.Net.Security.LocalCertificateSelectionCallback? diff --git a/src/StackExchange.Redis/PublicAPI/PublicAPI.Unshipped.txt b/src/StackExchange.Redis/PublicAPI/PublicAPI.Unshipped.txt index 56c23463e..3275c8c2e 100644 --- a/src/StackExchange.Redis/PublicAPI/PublicAPI.Unshipped.txt +++ b/src/StackExchange.Redis/PublicAPI/PublicAPI.Unshipped.txt @@ -1,4 +1,5 @@ #nullable enable +StackExchange.Redis.Configuration.BufferOptions.BufferOptions() -> void [SER005]StackExchange.Redis.TestHarness [SER005]StackExchange.Redis.TestHarness.BufferValidator [SER005]StackExchange.Redis.TestHarness.ChannelPrefix.get -> StackExchange.Redis.RedisChannel diff --git a/src/StackExchange.Redis/StackExchange.Redis.csproj b/src/StackExchange.Redis/StackExchange.Redis.csproj index d3dd60aae..37b25e7ae 100644 --- a/src/StackExchange.Redis/StackExchange.Redis.csproj +++ b/src/StackExchange.Redis/StackExchange.Redis.csproj @@ -31,7 +31,7 @@ - + diff --git a/tests/StackExchange.Redis.Tests/BufferedStreamWriterTests.cs b/tests/StackExchange.Redis.Tests/BufferedStreamWriterTests.cs index 56e1c86e9..0449ea3c1 100644 --- a/tests/StackExchange.Redis.Tests/BufferedStreamWriterTests.cs +++ b/tests/StackExchange.Redis.Tests/BufferedStreamWriterTests.cs @@ -1,4 +1,4 @@ -using System; +using System; using System.Diagnostics; using System.IO; using System.Threading; @@ -17,7 +17,7 @@ public class BufferedStreamWriterTests public async Task FlushStateDoesNotLeakIntoNextPageActivation(WriteMode mode) { var stream = new ObservedStream(); - var writer = BufferedStreamWriter.Create((BufferedStreamWriter.WriteMode)mode, ConnectionType.Interactive, stream, CancellationToken.None); + var writer = BufferedStreamWriter.Create((BufferedStreamWriter.WriteMode)mode, ConnectionType.Interactive, stream, null, CancellationToken.None); try { Write(writer, 1, 1); @@ -50,7 +50,7 @@ public async Task FlushStateDoesNotLeakIntoNextPageActivation(WriteMode mode) public async Task WriterDoesNotLoseFlushRequestedDuringDrainFlush(WriteMode mode) { var stream = new ObservedStream(); - var writer = BufferedStreamWriter.Create((BufferedStreamWriter.WriteMode)mode, ConnectionType.Interactive, stream, CancellationToken.None); + var writer = BufferedStreamWriter.Create((BufferedStreamWriter.WriteMode)mode, ConnectionType.Interactive, stream, null, CancellationToken.None); try { stream.BlockNextFlush(); @@ -81,7 +81,7 @@ public async Task WriterFaultsWriteCompleteAfterTargetWriteFailure(WriteMode mod { var failure = new IOException("simulated target write failure"); var stream = new ObservedStream { WriteException = failure }; - var writer = BufferedStreamWriter.Create((BufferedStreamWriter.WriteMode)mode, ConnectionType.Interactive, stream, CancellationToken.None); + var writer = BufferedStreamWriter.Create((BufferedStreamWriter.WriteMode)mode, ConnectionType.Interactive, stream, null, CancellationToken.None); Write(writer, 1, 1); writer.Flush(); @@ -101,7 +101,7 @@ public async Task WriterFaultsWriteCompleteAfterTargetWriteFailure(WriteMode mod public async Task SyncWriterTransitionsToAsyncWhileIdleAndPreservesBufferedData() { var stream = new ObservedStream(); - var writer = BufferedStreamWriter.Create(BufferedStreamWriter.WriteMode.Sync, ConnectionType.Interactive, stream, CancellationToken.None); + var writer = BufferedStreamWriter.Create(BufferedStreamWriter.WriteMode.Sync, ConnectionType.Interactive, stream, null, CancellationToken.None); try { Assert.True(writer.IsSync); @@ -135,7 +135,7 @@ public async Task SyncWriterTransitionsToAsyncWhileIdleAndPreservesBufferedData( public async Task SyncWriterTransitionsToAsyncAfterActiveSyncDrain() { var stream = new ObservedStream(); - var writer = BufferedStreamWriter.Create(BufferedStreamWriter.WriteMode.Sync, ConnectionType.Interactive, stream, CancellationToken.None); + var writer = BufferedStreamWriter.Create(BufferedStreamWriter.WriteMode.Sync, ConnectionType.Interactive, stream, null, CancellationToken.None); try { stream.BlockNextWrite(); diff --git a/tests/StackExchange.Redis.Tests/ConfigTests.cs b/tests/StackExchange.Redis.Tests/ConfigTests.cs index 561308d30..98efb238b 100644 --- a/tests/StackExchange.Redis.Tests/ConfigTests.cs +++ b/tests/StackExchange.Redis.Tests/ConfigTests.cs @@ -90,6 +90,9 @@ orderby name "password", "proxy", "reconnectRetryPolicy", + "RequestBufferOptions", + "ResponseArrayPool", + "ResponseBufferOptions", "responseTimeout", "ServiceName", "SocketManager",