diff --git a/src/StackExchange.Redis/ConfigurationOptions.cs b/src/StackExchange.Redis/ConfigurationOptions.cs index 91de712f4..8ad1662be 100644 --- a/src/StackExchange.Redis/ConfigurationOptions.cs +++ b/src/StackExchange.Redis/ConfigurationOptions.cs @@ -1,4 +1,5 @@ using System; +using System.Buffers; using System.Collections.Generic; using System.ComponentModel; using System.Diagnostics.CodeAnalysis; @@ -1438,5 +1439,16 @@ public CycleBufferPool? RequestCycleBufferPool [Experimental(Experiments.Respite, UrlFormat = Experiments.UrlFormat)] set; } + + /// + /// The memory pool to use when buffering responses. + /// + public MemoryPool? ResponseMemoryPool + { + [Experimental(Experiments.Respite, UrlFormat = Experiments.UrlFormat)] + get; + [Experimental(Experiments.Respite, UrlFormat = Experiments.UrlFormat)] + set; + } } } diff --git a/src/StackExchange.Redis/Lease.cs b/src/StackExchange.Redis/Lease.cs index a5a88e4eb..c77f6d6e9 100644 --- a/src/StackExchange.Redis/Lease.cs +++ b/src/StackExchange.Redis/Lease.cs @@ -1,6 +1,7 @@ using System; using System.Buffers; using System.Runtime.CompilerServices; +using System.Runtime.InteropServices; using System.Threading; namespace StackExchange.Redis @@ -16,7 +17,7 @@ public sealed class Lease : IMemoryOwner /// public static Lease Empty { get; } = new Lease(System.Array.Empty(), 0); - private T[]? _arr; + private object? _buffer; /// /// Gets whether this lease is empty. @@ -41,9 +42,29 @@ public static Lease Create(int length, bool clear = true) return new Lease(arr, length); } + /// + /// Create a new lease. + /// + /// The size required. + /// Buffer. + public static Lease Create(int length, IMemoryOwner memoryOwner) + { + if (length == 0) return Empty; + if ((uint)length > memoryOwner.Memory.Length) + throw new ArgumentOutOfRangeException(nameof(length)); + + return new Lease(memoryOwner, length); + } + private Lease(T[] arr, int length) { - _arr = arr; + _buffer = arr; + Length = length; + } + + private Lease(IMemoryOwner memoryOwner, int length) + { + _buffer = memoryOwner; Length = length; } @@ -54,33 +75,50 @@ public void Dispose() { if (Length != 0) { - var arr = Interlocked.Exchange(ref _arr, null); - if (arr != null) ArrayPool.Shared.Return(arr); + var buffer = Interlocked.Exchange(ref _buffer, null); + if (buffer != null) + { + if (buffer is T[] arr) + ArrayPool.Shared.Return(arr); + else + ((IMemoryOwner)buffer).Dispose(); + } } } [MethodImpl(MethodImplOptions.NoInlining)] private static T[] ThrowDisposed() => throw new ObjectDisposedException(nameof(Lease)); - private T[] Array - { - [MethodImpl(MethodImplOptions.AggressiveInlining)] - get => _arr ?? ThrowDisposed(); - } - /// /// The data as a . /// - public Memory Memory => new Memory(Array, 0, Length); + public Memory Memory => _buffer is IMemoryOwner memoryOwner + ? memoryOwner.Memory.Slice(0, Length) + : new Memory((T[]?)_buffer ?? ThrowDisposed(), 0, Length); /// /// The data as a . /// - public Span Span => new Span(Array, 0, Length); + public Span Span => _buffer is IMemoryOwner memoryOwner + ? memoryOwner.Memory.Span.Slice(0, Length) + : new Span((T[]?)_buffer ?? ThrowDisposed(), 0, Length); /// /// The data as an . /// - public ArraySegment ArraySegment => new ArraySegment(Array, 0, Length); + public ArraySegment ArraySegment + { + get + { + if (_buffer is IMemoryOwner memoryOwner) + { + if (!MemoryMarshal.TryGetArray((ReadOnlyMemory)memoryOwner.Memory, out var segment)) + throw new NotSupportedException("Only array-backed buffers are supported"); + + return new ArraySegment(segment.Array!, segment.Offset, Length); + } + return new ArraySegment((T[]?)_buffer ?? ThrowDisposed(), 0, Length); + } + } } } diff --git a/src/StackExchange.Redis/PhysicalConnection.Read.cs b/src/StackExchange.Redis/PhysicalConnection.Read.cs index 774db2c6b..7ee1897c4 100644 --- a/src/StackExchange.Redis/PhysicalConnection.Read.cs +++ b/src/StackExchange.Redis/PhysicalConnection.Read.cs @@ -1,4 +1,4 @@ -using System; +using System; using System.Buffers; using System.Buffers.Binary; using System.Diagnostics; @@ -205,7 +205,7 @@ private bool ShouldTransitionToAsync() private bool ForceReconnect => BridgeCouldBeNull?.NeedsReconnect == true; - private static byte[]? SharedNoLease; + private static IMemoryOwner? SharedNoLease; private CycleBuffer _readBuffer; private RespScanState _readState = default; @@ -401,15 +401,15 @@ private void OnResponseFrame(RespPrefix prefix, ReadOnlySequence payload) else { var len = checked((int)payload.Length); - byte[]? oversized = ArrayPool.Shared.Rent(len); + var memoryPool = BridgeCouldBeNull?.Multiplexer.RawConfig.ResponseMemoryPool ?? MemoryPool.Shared; + var memoryOwner = memoryPool.Rent(len); + Span oversized = memoryOwner.Memory.Span.Slice(0, len); + payload.CopyTo(oversized); - OnResponseFrame(prefix, new(oversized, 0, len), ref oversized); - // the lease could have been claimed by the activation code (to prevent another memcpy); otherwise, free - if (oversized is not null) - { - ArrayPool.Shared.Return(oversized); - } + OnResponseFrame(prefix, oversized, ref memoryOwner); + + memoryOwner?.Dispose(); } } @@ -420,7 +420,7 @@ private void UpdateBufferStats(int lastResult, long inBuffer) bytesLastResult = lastResult; } - private void OnResponseFrame(RespPrefix prefix, ReadOnlySpan frame, ref byte[]? lease) + private void OnResponseFrame(RespPrefix prefix, ReadOnlySpan frame, ref IMemoryOwner? memoryOwner) { DebugValidateSingleFrame(frame); _readStatus = ReadStatus.MatchResult; @@ -431,7 +431,7 @@ private void OnResponseFrame(RespPrefix prefix, ReadOnlySpan frame, ref by case RespPrefix.Array when (_protocol is RedisProtocol.Resp2 & connectionType is ConnectionType.Subscription) && !IsArrayPong(frame): // could be a RESP2 pub/sub payload // out-of-band; pub/sub etc - if (OnOutOfBand(frame, ref lease)) + if (OnOutOfBand(frame, ref memoryOwner)) { OnDetailLog($"out-of-band message, not dequeuing: {prefix}"); return; @@ -502,7 +502,7 @@ internal static ReadOnlySpan StackCopyLengthChecked(scoped in RespReader r return buffer.Slice(0, len); } - private bool OnOutOfBand(ReadOnlySpan payload, ref byte[]? lease) + private bool OnOutOfBand(ReadOnlySpan payload, ref IMemoryOwner? memoryOwner) { var muxer = BridgeCouldBeNull?.Multiplexer; if (muxer is null) return true; // consume it blindly diff --git a/src/StackExchange.Redis/PublicAPI/PublicAPI.Shipped.txt b/src/StackExchange.Redis/PublicAPI/PublicAPI.Shipped.txt index 269cc11d2..47422be39 100644 --- a/src/StackExchange.Redis/PublicAPI/PublicAPI.Shipped.txt +++ b/src/StackExchange.Redis/PublicAPI/PublicAPI.Shipped.txt @@ -1734,6 +1734,7 @@ static StackExchange.Redis.HashEntry.operator !=(StackExchange.Redis.HashEntry x static StackExchange.Redis.HashEntry.operator ==(StackExchange.Redis.HashEntry x, StackExchange.Redis.HashEntry y) -> bool static StackExchange.Redis.KeyspaceIsolation.DatabaseExtensions.WithKeyPrefix(this StackExchange.Redis.IDatabase! database, StackExchange.Redis.RedisKey keyPrefix) -> StackExchange.Redis.IDatabase! static StackExchange.Redis.Lease.Create(int length, bool clear = true) -> StackExchange.Redis.Lease! +static StackExchange.Redis.Lease.Create(int length, System.Buffers.IMemoryOwner! memoryOwner) -> StackExchange.Redis.Lease! static StackExchange.Redis.Lease.Empty.get -> StackExchange.Redis.Lease! static StackExchange.Redis.ListPopResult.Null.get -> StackExchange.Redis.ListPopResult static StackExchange.Redis.LuaScript.GetCachedScriptCount() -> int diff --git a/src/StackExchange.Redis/PublicAPI/PublicAPI.Unshipped.txt b/src/StackExchange.Redis/PublicAPI/PublicAPI.Unshipped.txt index 4a8bf8b68..648acbb60 100644 --- a/src/StackExchange.Redis/PublicAPI/PublicAPI.Unshipped.txt +++ b/src/StackExchange.Redis/PublicAPI/PublicAPI.Unshipped.txt @@ -1,4 +1,6 @@ #nullable enable +[SER004]StackExchange.Redis.ConfigurationOptions.ResponseMemoryPool.get -> System.Buffers.MemoryPool? +[SER004]StackExchange.Redis.ConfigurationOptions.ResponseMemoryPool.set -> void [SER004]StackExchange.Redis.ConfigurationOptions.RequestCycleBufferPool.get -> RESPite.Buffers.CycleBufferPool? [SER004]StackExchange.Redis.ConfigurationOptions.RequestCycleBufferPool.set -> void [SER004]StackExchange.Redis.ConfigurationOptions.ResponseCycleBufferPool.get -> RESPite.Buffers.CycleBufferPool? diff --git a/src/StackExchange.Redis/RespReaderExtensions.cs b/src/StackExchange.Redis/RespReaderExtensions.cs index a5bb77bdd..4884690fe 100644 --- a/src/StackExchange.Redis/RespReaderExtensions.cs +++ b/src/StackExchange.Redis/RespReaderExtensions.cs @@ -1,4 +1,5 @@ -using System; +using System; +using System.Buffers; using System.Diagnostics; using System.Threading.Tasks; using RESPite.Messages;