Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions src/StackExchange.Redis/ConfigurationOptions.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using System;
using System.Buffers;
using System.Collections.Generic;
using System.ComponentModel;
using System.Diagnostics.CodeAnalysis;
Expand Down Expand Up @@ -1438,5 +1439,16 @@ public CycleBufferPool? RequestCycleBufferPool
[Experimental(Experiments.Respite, UrlFormat = Experiments.UrlFormat)]
set;
}

/// <summary>
/// The memory pool to use when buffering responses.
/// </summary>
public MemoryPool<byte>? ResponseMemoryPool
{
[Experimental(Experiments.Respite, UrlFormat = Experiments.UrlFormat)]
get;
[Experimental(Experiments.Respite, UrlFormat = Experiments.UrlFormat)]
set;
}
}
}
64 changes: 51 additions & 13 deletions src/StackExchange.Redis/Lease.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
using System;
using System.Buffers;
using System.Runtime.CompilerServices;
using System.Runtime.InteropServices;
using System.Threading;

namespace StackExchange.Redis
Expand All @@ -16,7 +17,7 @@ public sealed class Lease<T> : IMemoryOwner<T>
/// </summary>
public static Lease<T> Empty { get; } = new Lease<T>(System.Array.Empty<T>(), 0);

private T[]? _arr;
private object? _buffer;

/// <summary>
/// Gets whether this lease is empty.
Expand All @@ -41,9 +42,29 @@ public static Lease<T> Create(int length, bool clear = true)
return new Lease<T>(arr, length);
}

/// <summary>
/// Create a new lease.
/// </summary>
/// <param name="length">The size required.</param>
/// <param name="memoryOwner">Buffer.</param>
public static Lease<T> Create(int length, IMemoryOwner<T> memoryOwner)
{
if (length == 0) return Empty;
if ((uint)length > memoryOwner.Memory.Length)
throw new ArgumentOutOfRangeException(nameof(length));

return new Lease<T>(memoryOwner, length);
}

private Lease(T[] arr, int length)
{
_arr = arr;
_buffer = arr;
Length = length;
}

private Lease(IMemoryOwner<T> memoryOwner, int length)
{
_buffer = memoryOwner;
Length = length;
}

Expand All @@ -54,33 +75,50 @@ public void Dispose()
{
if (Length != 0)
{
var arr = Interlocked.Exchange(ref _arr, null);
if (arr != null) ArrayPool<T>.Shared.Return(arr);
var buffer = Interlocked.Exchange(ref _buffer, null);
if (buffer != null)
{
if (buffer is T[] arr)
ArrayPool<T>.Shared.Return(arr);
else
((IMemoryOwner<T>)buffer).Dispose();
}
}
}

[MethodImpl(MethodImplOptions.NoInlining)]
private static T[] ThrowDisposed() => throw new ObjectDisposedException(nameof(Lease<T>));

private T[] Array
{
[MethodImpl(MethodImplOptions.AggressiveInlining)]
get => _arr ?? ThrowDisposed();
}

/// <summary>
/// The data as a <see cref="Memory{T}"/>.
/// </summary>
public Memory<T> Memory => new Memory<T>(Array, 0, Length);
public Memory<T> Memory => _buffer is IMemoryOwner<T> memoryOwner
? memoryOwner.Memory.Slice(0, Length)
: new Memory<T>((T[]?)_buffer ?? ThrowDisposed(), 0, Length);

/// <summary>
/// The data as a <see cref="Span{T}"/>.
/// </summary>
public Span<T> Span => new Span<T>(Array, 0, Length);
public Span<T> Span => _buffer is IMemoryOwner<T> memoryOwner
? memoryOwner.Memory.Span.Slice(0, Length)
: new Span<T>((T[]?)_buffer ?? ThrowDisposed(), 0, Length);

/// <summary>
/// The data as an <see cref="ArraySegment{T}"/>.
/// </summary>
public ArraySegment<T> ArraySegment => new ArraySegment<T>(Array, 0, Length);
public ArraySegment<T> ArraySegment
{
get
{
if (_buffer is IMemoryOwner<T> memoryOwner)
{
if (!MemoryMarshal.TryGetArray((ReadOnlyMemory<T>)memoryOwner.Memory, out var segment))
throw new NotSupportedException("Only array-backed buffers are supported");

return new ArraySegment<T>(segment.Array!, segment.Offset, Length);
}
return new ArraySegment<T>((T[]?)_buffer ?? ThrowDisposed(), 0, Length);
}
}
}
}
24 changes: 12 additions & 12 deletions src/StackExchange.Redis/PhysicalConnection.Read.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
using System;
using System;
using System.Buffers;
using System.Buffers.Binary;
using System.Diagnostics;
Expand Down Expand Up @@ -205,7 +205,7 @@ private bool ShouldTransitionToAsync()

private bool ForceReconnect => BridgeCouldBeNull?.NeedsReconnect == true;

private static byte[]? SharedNoLease;
private static IMemoryOwner<byte>? SharedNoLease;

private CycleBuffer _readBuffer;
private RespScanState _readState = default;
Expand Down Expand Up @@ -401,15 +401,15 @@ private void OnResponseFrame(RespPrefix prefix, ReadOnlySequence<byte> payload)
else
{
var len = checked((int)payload.Length);
byte[]? oversized = ArrayPool<byte>.Shared.Rent(len);
var memoryPool = BridgeCouldBeNull?.Multiplexer.RawConfig.ResponseMemoryPool ?? MemoryPool<byte>.Shared;
var memoryOwner = memoryPool.Rent(len);
Span<byte> oversized = memoryOwner.Memory.Span.Slice(0, len);

payload.CopyTo(oversized);
OnResponseFrame(prefix, new(oversized, 0, len), ref oversized);

// the lease could have been claimed by the activation code (to prevent another memcpy); otherwise, free
if (oversized is not null)
{
ArrayPool<byte>.Shared.Return(oversized);
}
OnResponseFrame(prefix, oversized, ref memoryOwner);

memoryOwner?.Dispose();
}
}

Expand All @@ -420,7 +420,7 @@ private void UpdateBufferStats(int lastResult, long inBuffer)
bytesLastResult = lastResult;
}

private void OnResponseFrame(RespPrefix prefix, ReadOnlySpan<byte> frame, ref byte[]? lease)
private void OnResponseFrame(RespPrefix prefix, ReadOnlySpan<byte> frame, ref IMemoryOwner<byte>? memoryOwner)
{
DebugValidateSingleFrame(frame);
_readStatus = ReadStatus.MatchResult;
Expand All @@ -431,7 +431,7 @@ private void OnResponseFrame(RespPrefix prefix, ReadOnlySpan<byte> frame, ref by
case RespPrefix.Array when (_protocol is RedisProtocol.Resp2 & connectionType is ConnectionType.Subscription)
&& !IsArrayPong(frame): // could be a RESP2 pub/sub payload
// out-of-band; pub/sub etc
if (OnOutOfBand(frame, ref lease))
if (OnOutOfBand(frame, ref memoryOwner))
{
OnDetailLog($"out-of-band message, not dequeuing: {prefix}");
return;
Expand Down Expand Up @@ -502,7 +502,7 @@ internal static ReadOnlySpan<byte> StackCopyLengthChecked(scoped in RespReader r
return buffer.Slice(0, len);
}

private bool OnOutOfBand(ReadOnlySpan<byte> payload, ref byte[]? lease)
private bool OnOutOfBand(ReadOnlySpan<byte> payload, ref IMemoryOwner<byte>? memoryOwner)
{
var muxer = BridgeCouldBeNull?.Multiplexer;
if (muxer is null) return true; // consume it blindly
Expand Down
1 change: 1 addition & 0 deletions src/StackExchange.Redis/PublicAPI/PublicAPI.Shipped.txt
Original file line number Diff line number Diff line change
Expand Up @@ -1734,6 +1734,7 @@ static StackExchange.Redis.HashEntry.operator !=(StackExchange.Redis.HashEntry x
static StackExchange.Redis.HashEntry.operator ==(StackExchange.Redis.HashEntry x, StackExchange.Redis.HashEntry y) -> bool
static StackExchange.Redis.KeyspaceIsolation.DatabaseExtensions.WithKeyPrefix(this StackExchange.Redis.IDatabase! database, StackExchange.Redis.RedisKey keyPrefix) -> StackExchange.Redis.IDatabase!
static StackExchange.Redis.Lease<T>.Create(int length, bool clear = true) -> StackExchange.Redis.Lease<T>!
static StackExchange.Redis.Lease<T>.Create(int length, System.Buffers.IMemoryOwner<T>! memoryOwner) -> StackExchange.Redis.Lease<T>!
static StackExchange.Redis.Lease<T>.Empty.get -> StackExchange.Redis.Lease<T>!
static StackExchange.Redis.ListPopResult.Null.get -> StackExchange.Redis.ListPopResult
static StackExchange.Redis.LuaScript.GetCachedScriptCount() -> int
Expand Down
2 changes: 2 additions & 0 deletions src/StackExchange.Redis/PublicAPI/PublicAPI.Unshipped.txt
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
#nullable enable
[SER004]StackExchange.Redis.ConfigurationOptions.ResponseMemoryPool.get -> System.Buffers.MemoryPool<byte>?
[SER004]StackExchange.Redis.ConfigurationOptions.ResponseMemoryPool.set -> void
[SER004]StackExchange.Redis.ConfigurationOptions.RequestCycleBufferPool.get -> RESPite.Buffers.CycleBufferPool?
[SER004]StackExchange.Redis.ConfigurationOptions.RequestCycleBufferPool.set -> void
[SER004]StackExchange.Redis.ConfigurationOptions.ResponseCycleBufferPool.get -> RESPite.Buffers.CycleBufferPool?
Expand Down
3 changes: 2 additions & 1 deletion src/StackExchange.Redis/RespReaderExtensions.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using System;
using System;
using System.Buffers;
using System.Diagnostics;
using System.Threading.Tasks;
using RESPite.Messages;
Expand Down
Loading