diff --git a/src/Ramstack.FileSystem.Amazon/S3UploadStream.cs b/src/Ramstack.FileSystem.Amazon/S3UploadStream.cs
index 0c5eaa4..e1cdf42 100644
--- a/src/Ramstack.FileSystem.Amazon/S3UploadStream.cs
+++ b/src/Ramstack.FileSystem.Amazon/S3UploadStream.cs
@@ -10,12 +10,11 @@ namespace Ramstack.FileSystem.Amazon;
 
 /// <summary>
 /// Represents a stream for uploading data to Amazon S3 using multipart upload.
-/// This stream accumulates data in a temporary buffer and uploads it to S3 in parts
-/// once the buffer reaches a predefined size.
 /// </summary>
 internal sealed class S3UploadStream : Stream
 {
-    private const long PartSize = 5 * 1024 * 1024;
+    // https://docs.aws.amazon.com/AmazonS3/latest/userguide/qfacts.html
+    private const long MinPartSize = 5L * 1024 * 1024;
 
     private readonly IAmazonS3 _client;
     private readonly string _bucketName;
@@ -81,7 +80,7 @@ public S3UploadStream(IAmazonS3 client, string bucketName, string key, string uploadId)
             FileShare.None,
             bufferSize: 4096,
             FileOptions.DeleteOnClose
-            | FileOptions.Asynchronous);
+                | FileOptions.Asynchronous);
     }
 
     /// <inheritdoc />
@@ -102,7 +101,7 @@ public override void Write(ReadOnlySpan<byte> buffer)
         {
             _stream.Write(buffer);
 
-            if (_stream.Length >= PartSize)
+            if (_stream.Length >= MinPartSize)
                 UploadPart();
         }
         catch (Exception exception)
@@ -122,7 +121,7 @@ public override async ValueTask WriteAsync(ReadOnlyMemory<byte> buffer, CancellationToken cancellationToken)
         try
         {
             await _stream.WriteAsync(buffer, cancellationToken).ConfigureAwait(false);
-            if (_stream.Length >= PartSize)
+            if (_stream.Length >= MinPartSize)
                 await UploadPartAsync(cancellationToken).ConfigureAwait(false);
         }
         catch (Exception exception)
@@ -146,16 +145,11 @@ public override void SetLength(long value) =>
     /// <inheritdoc />
     public override void Flush()
    {
-        _stream.Flush();
-        UploadPart();
    }
 
     /// <inheritdoc />
-    public override async Task FlushAsync(CancellationToken cancellationToken)
-    {
-        await _stream.FlushAsync(cancellationToken).ConfigureAwait(false);
-        await UploadPartAsync(cancellationToken).ConfigureAwait(false);
-    }
+    public override Task FlushAsync(CancellationToken cancellationToken) =>
+        Task.CompletedTask;
 
     /// <inheritdoc />
     protected override void Dispose(bool disposing)
@@ -233,7 +227,13 @@ private async ValueTask UploadPartAsync(CancellationToken cancellationToken)
         _stream.Position = 0;
 
         // https://docs.aws.amazon.com/AmazonS3/latest/userguide/qfacts.html
-        // The maximum allowed part size is 5 gigabytes.
+        // The maximum allowed part size is 5 GiB.
+        // -----------------------------------------------------------------------------------
+        // We don't need to worry about S3's 5 GiB part limit because:
+        // 1. All Write/WriteAsync methods are inherently limited by Array.MaxLength (~2 GiB).
+        // 2. The upload starts as soon as the buffer reaches MinPartSize (5 MiB).
+        //    Even if a single write matches Array.MaxLength, the data is
+        //    uploaded immediately, staying within AWS limits.
 
         var request = new UploadPartRequest
         {
diff --git a/tests/Ramstack.FileSystem.Amazon.Tests/WritableAmazonFileSystemTests.cs b/tests/Ramstack.FileSystem.Amazon.Tests/WritableAmazonFileSystemTests.cs
index fc75b3b..7d2e8c4 100644
--- a/tests/Ramstack.FileSystem.Amazon.Tests/WritableAmazonFileSystemTests.cs
+++ b/tests/Ramstack.FileSystem.Amazon.Tests/WritableAmazonFileSystemTests.cs
@@ -60,10 +60,8 @@ public async Task File_OpenWrite_InternalBufferWriteError_DoesNotCreateFile()
         var underlying = (FileStream)stream.GetType().GetField("_stream", BindingFlags.NonPublic | BindingFlags.Instance)!.GetValue(stream)!;
         Assert.That(underlying, Is.Not.Null);
 
-        await stream.WriteAsync(new ReadOnlyMemory<byte>(new byte[1024]));
-
-        // Forces to upload buffer.
-        await stream.FlushAsync();
+        // Write enough data to trigger automatic part upload (>= 5 MiB).
+        await stream.WriteAsync(new ReadOnlyMemory<byte>(new byte[6 * 1024 * 1024]));
 
         // Simulates an internal buffer write error.
         await underlying.DisposeAsync();
@@ -195,6 +193,93 @@
             await reader.ReadToEndAsync(),
         await destination.DeleteAsync();
     }
 
+    [Test]
+    public async Task File_OpenWrite_FlushDoesNotCauseUndersizedParts()
+    {
+        using var fs = GetFileSystem();
+
+        const string Content = "Hello, World!";
+
+        {
+            await using var stream = await fs.OpenWriteAsync("/flush-test.txt");
+            await using var writer = new StreamWriter(stream);
+
+            // Write small data and flush multiple times.
+            // Flush should be a no-op and not upload undersized parts.
+            foreach (var ch in Content)
+            {
+                await writer.WriteAsync(ch);
+                await writer.FlushAsync();
+            }
+        }
+        {
+            // ReSharper disable once UseAwaitUsing
+            using var stream = await fs.OpenReadAsync("/flush-test.txt");
+            using var reader = new StreamReader(stream);
+
+            Assert.That(await reader.ReadToEndAsync(), Is.EqualTo(Content));
+        }
+
+        await fs.DeleteFileAsync("/flush-test.txt");
+    }
+
+    [Test]
+    public async Task File_OpenWrite_FlushWithMultipartUpload()
+    {
+        using var fs = GetFileSystem();
+
+        const int Count = 5;
+        const string FileName = "/flush-multipart-test.bin";
+
+        var chunk = new byte[3 * 1024 * 1024];
+        Random.Shared.NextBytes(chunk);
+
+        {
+            await using var stream = await fs.OpenWriteAsync(FileName);
+            for (var i = 0; i < Count; i++)
+                await stream.WriteAsync(chunk);
+        }
+
+        {
+            var file = fs.GetFile(FileName);
+
+            Assert.That(await file.ExistsAsync(), Is.True);
+            Assert.That(await file.GetLengthAsync(), Is.EqualTo(chunk.Length * Count));
+
+            // ReSharper disable once UseAwaitUsing
+            using var stream = await file.OpenReadAsync();
+
+            var bytes = new byte[chunk.Length];
+
+            for (var i = 0; i < Count; i++)
+            {
+                var n = await ReadBlockAsync(stream, bytes);
+                Assert.That(n, Is.EqualTo(bytes.Length));
+
+                Assert.That(
+                    bytes.AsSpan().SequenceEqual(chunk),
+                    Is.True);
+            }
+        }
+
+        await fs.DeleteFileAsync(FileName);
+
+        static async Task<int> ReadBlockAsync(Stream stream, Memory<byte> memory)
+        {
+            var count = memory.Length;
+
+            while (!memory.IsEmpty)
+            {
+                var n = await stream.ReadAsync(memory);
+                if (n == 0)
+                    return 0;
+
+                memory = memory[n..];
+            }
+
+            return count;
+        }
+    }
     [Test]
     public async Task Directory_BatchDeleting()