Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 21 additions & 5 deletions src/RESPite/Buffers/CycleBuffer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -40,32 +40,39 @@ public partial struct CycleBuffer
public static CycleBuffer Create(
MemoryPool<byte>? pool = null,
int pageSize = DefaultPageSize,
float pageGrow = DefaultPageGrow,
ICycleBufferCallback? callback = null)
{
pool ??= DefaultPool;
if (pageSize <= 0) pageSize = DefaultPageSize;
if (pageGrow <= 0) pageGrow = DefaultPageGrow;
if (pageSize > pool.MaxBufferSize) pageSize = pool.MaxBufferSize;
return new CycleBuffer(pool, pageSize, callback);
return new CycleBuffer(pool, pageSize, pageGrow, callback);
}

private CycleBuffer(MemoryPool<byte> pool, int pageSize, ICycleBufferCallback? callback)
private CycleBuffer(MemoryPool<byte> pool, int pageSize, float pageGrow, ICycleBufferCallback? callback)
{
Pool = pool;
PageSize = pageSize;
_pageSizeStart = _pageSize = pageSize;
_pageGrow = pageGrow;
_callback = callback;
leasedStart = -1;
}

private const int DefaultPageSize = 8 * 1024;
private const float DefaultPageGrow = 1f;

public int PageSize { get; }
public int PageSize => _pageSize;
public MemoryPool<byte> Pool { get; }
private readonly ICycleBufferCallback? _callback;
private readonly int _pageSizeStart;
private readonly float _pageGrow;

private Segment? startSegment, endSegment;

private int endSegmentCommitted, endSegmentLength;
private int leasedStart;
private int _pageSize;

public bool TryGetCommitted(out ReadOnlySpan<byte> span)
{
Expand Down Expand Up @@ -386,6 +393,13 @@ public ReadOnlySequence<byte> GetAllCommitted()
return ros;
}

private void NextPageSize()
{
var newSize = (int)Math.Floor(_pageSize * _pageGrow);
var maxSize = Pool.MaxBufferSize;
_pageSize = (uint)newSize > maxSize ? maxSize : newSize;
}

private Segment GetNextSegment()
{
DebugAssertValid();
Expand All @@ -412,7 +426,8 @@ private Segment GetNextSegment()
}
}

Segment newSegment = Segment.Create(Pool.Rent(PageSize));
Segment newSegment = Segment.Create(Pool.Rent(_pageSize));
NextPageSize();
if (endSegment is null)
{
// tabula rasa
Expand Down Expand Up @@ -694,6 +709,7 @@ public void Release()
node.Recycle();
node = next;
}
_pageSize = _pageSizeStart;
}

/// <summary>
Expand Down
2 changes: 1 addition & 1 deletion src/RESPite/PublicAPI/PublicAPI.Shipped.txt
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,7 @@
[SER004]RESPite.Messages.RespScanState.TryRead(System.ReadOnlySpan<byte> value, out int bytesRead) -> bool
[SER004]RESPite.RespException
[SER004]RESPite.RespException.RespException(string! message) -> void
[SER004]static RESPite.Buffers.CycleBuffer.Create(System.Buffers.MemoryPool<byte>? pool = null, int pageSize = 8192, RESPite.Buffers.ICycleBufferCallback? callback = null) -> RESPite.Buffers.CycleBuffer
[SER004]static RESPite.Buffers.CycleBuffer.Create(System.Buffers.MemoryPool<byte>? pool = null, int pageSize = 8192, float pageGrow = 1, RESPite.Buffers.ICycleBufferCallback? callback = null) -> RESPite.Buffers.CycleBuffer
[SER004]static RESPite.Messages.RespFrameScanner.Default.get -> RESPite.Messages.RespFrameScanner!
[SER004]static RESPite.Messages.RespFrameScanner.Subscription.get -> RESPite.Messages.RespFrameScanner!
[SER004]virtual RESPite.Messages.RespAttributeReader<T>.Read(ref RESPite.Messages.RespReader reader, ref T value) -> void
Expand Down
15 changes: 13 additions & 2 deletions src/StackExchange.Redis/BufferedStreamWriter.Pipe.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,28 @@
using System.IO.Pipelines;
using System.Threading;
using System.Threading.Tasks;
using StackExchange.Redis.Configuration;

namespace StackExchange.Redis;

internal sealed class PipeStreamWriter : BufferedStreamWriter
{
private readonly PipeWriter _writer;

public PipeStreamWriter(Stream target, CancellationToken cancellationToken = default)
public PipeStreamWriter(Stream target, CancellationToken cancellationToken = default, BufferOptions? bufferOptions = null)
: base(target, cancellationToken)
{
var pipe = new Pipe();
var options = PipeOptions.Default;

if (bufferOptions != null)
{
var bufferSize = bufferOptions.BufferSize;
if (bufferSize == 0) bufferSize = options.MinimumSegmentSize;

options = new PipeOptions(bufferOptions.MemoryPool, minimumSegmentSize: bufferSize);
}

var pipe = new Pipe(options);
_writer = pipe.Writer;
WriteComplete = CopyToAsync(pipe.Reader, pipe.Writer, Target, cancellationToken);
}
Expand Down
6 changes: 4 additions & 2 deletions src/StackExchange.Redis/BufferedStreamWriter.Switchable.cs
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
using System;
using System.Buffers;
using System.Diagnostics;
using System.IO;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Sources;
using StackExchange.Redis.Configuration;

namespace StackExchange.Redis;

Expand All @@ -13,8 +15,8 @@ internal sealed class SwitchableBufferedStreamWriter : CycleBufferStreamWriter,
private ManualResetValueTaskSourceCore<bool> _readerTask;
private bool _syncSignalled;

public SwitchableBufferedStreamWriter(Stream target, CancellationToken cancellationToken, bool initiallySync)
: base(target, cancellationToken, initiallySync ? StateFlags.None : StateFlags.AsyncMode)
public SwitchableBufferedStreamWriter(Stream target, CancellationToken cancellationToken, BufferOptions? bufferOptions, bool initiallySync)
: base(target, cancellationToken, bufferOptions, initiallySync ? StateFlags.None : StateFlags.AsyncMode)
{
_readerTask.RunContinuationsAsynchronously = true; // we never want the flusher to take over the copying
if (initiallySync)
Expand Down
13 changes: 7 additions & 6 deletions src/StackExchange.Redis/BufferedStreamWriter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
using System.Threading;
using System.Threading.Tasks;
using RESPite.Buffers;
using StackExchange.Redis.Configuration;

namespace StackExchange.Redis;

Expand Down Expand Up @@ -57,7 +58,7 @@ public enum WriteMode

public virtual bool IsSync => false;

public static BufferedStreamWriter Create(WriteMode mode, ConnectionType connectionType, Stream target, CancellationToken cancellationToken)
public static BufferedStreamWriter Create(WriteMode mode, ConnectionType connectionType, Stream target, BufferOptions? bufferOptions, CancellationToken cancellationToken)
{
if (connectionType is ConnectionType.Subscription | mode is WriteMode.Default)
{
Expand All @@ -67,9 +68,9 @@ public static BufferedStreamWriter Create(WriteMode mode, ConnectionType connect
}
return mode switch
{
WriteMode.Sync => new SwitchableBufferedStreamWriter(target, cancellationToken, initiallySync: true),
WriteMode.Async => new SwitchableBufferedStreamWriter(target, cancellationToken, initiallySync: false),
WriteMode.Pipe => new PipeStreamWriter(target, cancellationToken),
WriteMode.Sync => new SwitchableBufferedStreamWriter(target, cancellationToken, bufferOptions, initiallySync: true),
WriteMode.Async => new SwitchableBufferedStreamWriter(target, cancellationToken, bufferOptions, initiallySync: false),
WriteMode.Pipe => new PipeStreamWriter(target, cancellationToken, bufferOptions),
_ => throw new ArgumentOutOfRangeException(nameof(mode)),
};
}
Expand Down Expand Up @@ -115,10 +116,10 @@ public virtual void DebugSetLog(Action<string> log) { }

internal abstract class CycleBufferStreamWriter : BufferedStreamWriter, ICycleBufferCallback
{
protected CycleBufferStreamWriter(Stream target, CancellationToken cancellationToken, StateFlags flags = StateFlags.None)
protected CycleBufferStreamWriter(Stream target, CancellationToken cancellationToken, BufferOptions? bufferOptions = null, StateFlags flags = StateFlags.None)
: base(target, cancellationToken)
{
_buffer = CycleBuffer.Create(callback: this);
_buffer = CycleBuffer.Create(bufferOptions?.MemoryPool, bufferOptions?.BufferSize ?? 0, bufferOptions?.BufferGrowthFactor ?? 0, callback: this);
_stateFlags = flags;
}

Expand Down
24 changes: 24 additions & 0 deletions src/StackExchange.Redis/Configuration/BufferOptions.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
using System.Buffers;

namespace StackExchange.Redis.Configuration;

/// <summary>
/// CycleBuffer BufferOptions.
/// </summary>
public sealed class BufferOptions
{
/// <summary>
/// Memory Pool.
/// </summary>
public MemoryPool<byte>? MemoryPool { get; set; }

/// <summary>
/// Buffer Size.
/// </summary>
public int BufferSize { get; set; }

/// <summary>
/// Buffer Growth Factor.
/// </summary>
public float BufferGrowthFactor { get; set; }
}
22 changes: 22 additions & 0 deletions src/StackExchange.Redis/ConfigurationOptions.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using System;
using System.Buffers;
using System.Collections.Generic;
using System.ComponentModel;
using System.Linq;
Expand Down Expand Up @@ -308,6 +309,21 @@ public DefaultOptionsProvider Defaults
/// </summary>
public Action<EndPoint, ConnectionType, Socket>? BeforeSocketConnect { get; set; }

/// <summary>
/// Request BufferOptions.
/// </summary>
public BufferOptions? RequestBufferOptions { get; set; }

/// <summary>
/// Response BufferOptions.
/// </summary>
public BufferOptions? ResponseBufferOptions { get; set; }

/// <summary>
/// Response ArrayPool.
/// </summary>
public ArrayPool<byte>? ResponseArrayPool { get; set; }

internal Func<ConnectionMultiplexer, Action<string>, Task> AfterConnectAsync => Defaults.AfterConnectAsync;

/// <summary>
Expand Down Expand Up @@ -956,6 +972,9 @@ public static ConfigurationOptions Parse(string configuration, bool ignoreUnknow
reconnectRetryPolicy = reconnectRetryPolicy,
backlogPolicy = backlogPolicy,
sslProtocols = sslProtocols,
RequestBufferOptions = RequestBufferOptions,
ResponseBufferOptions = ResponseBufferOptions,
ResponseArrayPool = ResponseArrayPool,
BeforeSocketConnect = BeforeSocketConnect,
EndPoints = EndPoints.Clone(),
LoggerFactory = LoggerFactory,
Expand Down Expand Up @@ -1156,6 +1175,9 @@ private void Clear()

CertificateSelection = null;
CertificateValidation = null;
RequestBufferOptions = null;
ResponseBufferOptions = null;
ResponseArrayPool = null;
BeforeSocketConnect = null;
ChannelPrefix = default;
LibraryName = null;
Expand Down
16 changes: 11 additions & 5 deletions src/StackExchange.Redis/PhysicalConnection.Read.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
using System;
using System;
using System.Buffers;
using System.Buffers.Binary;
using System.Diagnostics;
Expand Down Expand Up @@ -70,10 +70,12 @@ private async Task ReadAllAsync(CancellationToken cancellationToken)
var tail = _ioStream ?? Stream.Null;
if (_readStatus is not ReadStatus.TransitioningToAsync)
{
var bufferOptions = BridgeCouldBeNull?.Multiplexer.RawConfig.ResponseBufferOptions;

// preserve existing state if transitioning
_readStatus = ReadStatus.Init;
_readState = default;
_readBuffer = CycleBuffer.Create();
_readBuffer = CycleBuffer.Create(bufferOptions?.MemoryPool, bufferOptions?.BufferSize ?? 0, bufferOptions?.BufferGrowthFactor ?? 0);
}
try
{
Expand Down Expand Up @@ -130,7 +132,9 @@ private void ReadAllSync(CancellationToken cancellationToken)
var tail = _ioStream ?? Stream.Null;
_readStatus = ReadStatus.Init;
_readState = default;
_readBuffer = CycleBuffer.Create();

var bufferOptions = BridgeCouldBeNull?.Multiplexer.RawConfig.ResponseBufferOptions;
_readBuffer = CycleBuffer.Create(bufferOptions?.MemoryPool, bufferOptions?.BufferSize ?? 0, bufferOptions?.BufferGrowthFactor ?? 0);
try
{
int read;
Expand Down Expand Up @@ -399,14 +403,16 @@ private void OnResponseFrame(RespPrefix prefix, ReadOnlySequence<byte> payload)
else
{
var len = checked((int)payload.Length);
byte[]? oversized = ArrayPool<byte>.Shared.Rent(len);
var arrayPool = BridgeCouldBeNull?.Multiplexer.RawConfig.ResponseArrayPool ?? ArrayPool<byte>.Shared;

byte[]? oversized = arrayPool.Rent(len);
payload.CopyTo(oversized);
OnResponseFrame(prefix, new(oversized, 0, len), ref oversized);

// the lease could have been claimed by the activation code (to prevent another memcpy); otherwise, free
if (oversized is not null)
{
ArrayPool<byte>.Shared.Return(oversized);
arrayPool.Return(oversized);
}
}
}
Expand Down
3 changes: 2 additions & 1 deletion src/StackExchange.Redis/PhysicalConnection.Write.cs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@ private void InitOutput(Stream? stream)
{
if (stream is null) return;
_ioStream = stream;
_output = BufferedStreamWriter.Create(WriteMode, connectionType, stream, OutputCancel);
_output = BufferedStreamWriter.Create(WriteMode, connectionType, stream, BridgeCouldBeNull?.Multiplexer.RawConfig.RequestBufferOptions, OutputCancel);

#if DEBUG
if (BridgeCouldBeNull?.Multiplexer.RawConfig.OutputLog is { } log)
{
Expand Down
13 changes: 13 additions & 0 deletions src/StackExchange.Redis/PublicAPI/PublicAPI.Shipped.txt
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,13 @@ StackExchange.Redis.Configuration.DefaultOptionsProvider.ClientName.get -> strin
StackExchange.Redis.Configuration.DefaultOptionsProvider.DefaultOptionsProvider() -> void
StackExchange.Redis.Configuration.Tunnel
StackExchange.Redis.Configuration.Tunnel.Tunnel() -> void
StackExchange.Redis.Configuration.BufferOptions
StackExchange.Redis.Configuration.BufferOptions.MemoryPool.get -> System.Buffers.MemoryPool<byte>?
StackExchange.Redis.Configuration.BufferOptions.MemoryPool.set -> void
StackExchange.Redis.Configuration.BufferOptions.BufferSize.get -> int
StackExchange.Redis.Configuration.BufferOptions.BufferSize.set -> void
StackExchange.Redis.Configuration.BufferOptions.BufferGrowthFactor.get -> float
StackExchange.Redis.Configuration.BufferOptions.BufferGrowthFactor.set -> void
static StackExchange.Redis.Configuration.Tunnel.HttpProxy(System.Net.EndPoint! proxy) -> StackExchange.Redis.Configuration.Tunnel!
virtual StackExchange.Redis.Configuration.Tunnel.BeforeAuthenticateAsync(System.Net.EndPoint! endpoint, StackExchange.Redis.ConnectionType connectionType, System.Net.Sockets.Socket? socket, System.Threading.CancellationToken cancellationToken) -> System.Threading.Tasks.ValueTask<System.IO.Stream?>
virtual StackExchange.Redis.Configuration.Tunnel.BeforeSocketConnectAsync(System.Net.EndPoint! endPoint, StackExchange.Redis.ConnectionType connectionType, System.Net.Sockets.Socket? socket, System.Threading.CancellationToken cancellationToken) -> System.Threading.Tasks.ValueTask
Expand All @@ -227,6 +234,12 @@ StackExchange.Redis.ConfigurationOptions.AsyncTimeout.get -> int
StackExchange.Redis.ConfigurationOptions.AsyncTimeout.set -> void
StackExchange.Redis.ConfigurationOptions.BacklogPolicy.get -> StackExchange.Redis.BacklogPolicy!
StackExchange.Redis.ConfigurationOptions.BacklogPolicy.set -> void
StackExchange.Redis.ConfigurationOptions.RequestBufferOptions.get -> StackExchange.Redis.Configuration.BufferOptions?
StackExchange.Redis.ConfigurationOptions.RequestBufferOptions.set -> void
StackExchange.Redis.ConfigurationOptions.ResponseBufferOptions.get -> StackExchange.Redis.Configuration.BufferOptions?
StackExchange.Redis.ConfigurationOptions.ResponseBufferOptions.set -> void
StackExchange.Redis.ConfigurationOptions.ResponseArrayPool.get -> System.Buffers.ArrayPool<byte>?
StackExchange.Redis.ConfigurationOptions.ResponseArrayPool.set -> void
StackExchange.Redis.ConfigurationOptions.BeforeSocketConnect.get -> System.Action<System.Net.EndPoint!, StackExchange.Redis.ConnectionType, System.Net.Sockets.Socket!>?
StackExchange.Redis.ConfigurationOptions.BeforeSocketConnect.set -> void
StackExchange.Redis.ConfigurationOptions.CertificateSelection -> System.Net.Security.LocalCertificateSelectionCallback?
Expand Down
1 change: 1 addition & 0 deletions src/StackExchange.Redis/PublicAPI/PublicAPI.Unshipped.txt
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
#nullable enable
StackExchange.Redis.Configuration.BufferOptions.BufferOptions() -> void
[SER005]StackExchange.Redis.TestHarness
[SER005]StackExchange.Redis.TestHarness.BufferValidator
[SER005]StackExchange.Redis.TestHarness.ChannelPrefix.get -> StackExchange.Redis.RedisChannel
Expand Down
2 changes: 1 addition & 1 deletion src/StackExchange.Redis/StackExchange.Redis.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@

<!-- net461 needs this for OSPlatform et al -->
<PackageReference Include="System.Runtime.InteropServices.RuntimeInformation" Condition="'$(TargetFramework)' == 'net461' " />

<!-- netfx needs this for ZipArchive -->
<PackageReference Include="System.IO.Compression" Condition="'$(TargetFramework)' == 'net472' or '$(TargetFramework)' == 'net461' " />

Expand Down
Loading
Loading