diff --git a/tests/ImageSharp.Tests/Formats/Jpg/JpegDecoderTests.cs b/tests/ImageSharp.Tests/Formats/Jpg/JpegDecoderTests.cs index deca7f69fa..a545f8542f 100644 --- a/tests/ImageSharp.Tests/Formats/Jpg/JpegDecoderTests.cs +++ b/tests/ImageSharp.Tests/Formats/Jpg/JpegDecoderTests.cs @@ -251,54 +251,6 @@ public partial class JpegDecoderTests Assert.IsType(ex.InnerException); } - [Fact] - public async Task DecodeAsync_IsCancellable() - { - var cts = new CancellationTokenSource(); - string file = Path.Combine(TestEnvironment.InputImagesDirectoryFullPath, TestImages.Jpeg.Baseline.Jpeg420Small); - using var pausedStream = new PausedStream(file); - pausedStream.OnWaiting(_ => - { - cts.Cancel(); - pausedStream.Release(); - }); - - var configuration = Configuration.CreateDefaultInstance(); - configuration.FileSystem = new SingleStreamFileSystem(pausedStream); - DecoderOptions options = new() - { - Configuration = configuration - }; - - await Assert.ThrowsAsync(async () => - { - using Image image = await Image.LoadAsync(options, "someFakeFile", cts.Token); - }); - } - - [Fact] - public async Task Identify_IsCancellable() - { - var cts = new CancellationTokenSource(); - - string file = Path.Combine(TestEnvironment.InputImagesDirectoryFullPath, TestImages.Jpeg.Baseline.Jpeg420Small); - using var pausedStream = new PausedStream(file); - pausedStream.OnWaiting(_ => - { - cts.Cancel(); - pausedStream.Release(); - }); - - var configuration = Configuration.CreateDefaultInstance(); - configuration.FileSystem = new SingleStreamFileSystem(pausedStream); - DecoderOptions options = new() - { - Configuration = configuration - }; - - await Assert.ThrowsAsync(async () => await Image.IdentifyAsync(options, "someFakeFile", cts.Token)); - } - [Theory] [WithFileCollection(nameof(UnsupportedTestJpegs), PixelTypes.Rgba32)] public void ThrowsNotSupported_WithUnsupportedJpegs(TestImageProvider provider) diff --git a/tests/ImageSharp.Tests/Image/ImageTests.Decode_Cancellation.cs b/tests/ImageSharp.Tests/Image/ImageTests.Decode_Cancellation.cs index 7bd794c2c3..c93831491f 100644 --- a/tests/ImageSharp.Tests/Image/ImageTests.Decode_Cancellation.cs +++ b/tests/ImageSharp.Tests/Image/ImageTests.Decode_Cancellation.cs @@ -16,65 +16,105 @@ public partial class ImageTests public Decode_Cancellation() => this.TopLevelConfiguration.StreamProcessingBufferSize = 128; - public static readonly TheoryData TestFiles = new() + private static TheoryData GetTestData() { - TestImages.Png.BikeSmall, - TestImages.Jpeg.Baseline.Jpeg420Small, - TestImages.Bmp.Car, - TestImages.Tiff.RgbUncompressed, - TestImages.Gif.Kumin, - TestImages.Tga.Bit32PalRleBottomLeft, - TestImages.Webp.TestPatternOpaqueSmall, - TestImages.Pbm.RgbPlainMagick - }; + string[] testFileForEachCodec = new[] + { + TestImages.Png.BikeSmall, + TestImages.Jpeg.Baseline.Jpeg420Small, + TestImages.Bmp.Car, + TestImages.Tiff.RgbUncompressed, + TestImages.Gif.Kumin, + TestImages.Tga.Bit32PalRleBottomLeft, + TestImages.Webp.TestPatternOpaqueSmall, + TestImages.Pbm.GrayscaleBinaryWide + }; + + double[] percentages = new[] { 0, 0.5, 0.9 }; + + TheoryData data = new(); + + foreach (string file in testFileForEachCodec) + { + foreach (double p in percentages) + { + data.Add(false, file, p); + data.Add(true, file, p); + } + } + + return data; + } + + public static TheoryData TestData { get; } = GetTestData(); [Theory] - [MemberData(nameof(TestFiles))] - public async Task IdentifyAsync_IsCancellable(string file) + [MemberData(nameof(TestData))] + public async Task IdentifyAsync_IsCancellable(bool useMemoryStream, string file, double percentageOfStreamReadToCancel) { CancellationTokenSource cts = new(); - string path = Path.Combine(TestEnvironment.InputImagesDirectoryFullPath, file); - using PausedStream pausedStream = new(path); - pausedStream.OnWaiting(_ => + using IPausedStream pausedStream = useMemoryStream ? + new PausedMemoryStream(TestFile.Create(file).Bytes) : + new PausedStream(TestFile.GetInputFileFullPath(file)); + + pausedStream.OnWaiting(s => { - cts.Cancel(); - pausedStream.Release(); + if (s.Position >= s.Length * percentageOfStreamReadToCancel) + { + cts.Cancel(); + pausedStream.Release(); + } + else + { + pausedStream.Next(); + } }); Configuration configuration = Configuration.CreateDefaultInstance(); - configuration.FileSystem = new SingleStreamFileSystem(pausedStream); + configuration.FileSystem = new SingleStreamFileSystem((Stream)pausedStream); DecoderOptions options = new() { Configuration = configuration }; - await Assert.ThrowsAsync(async () => await Image.IdentifyAsync(options, "someFakeFile", cts.Token)); + await Assert.ThrowsAnyAsync( + async () => await Image.IdentifyAsync(options, "someFakeFile", cts.Token)) + .WaitAsync(TimeSpan.FromSeconds(10)); } [Theory] - [MemberData(nameof(TestFiles))] - public async Task DecodeAsync_IsCancellable(string file) + [MemberData(nameof(TestData))] + public async Task LoadAsync_IsCancellable(bool useMemoryStream, string file, double percentageOfStreamReadToCancel) { CancellationTokenSource cts = new(); - string path = Path.Combine(TestEnvironment.InputImagesDirectoryFullPath, file); - using PausedStream pausedStream = new(path); - pausedStream.OnWaiting(_ => + using IPausedStream pausedStream = useMemoryStream ? + new PausedMemoryStream(TestFile.Create(file).Bytes) : + new PausedStream(TestFile.GetInputFileFullPath(file)); + + pausedStream.OnWaiting(s => { - cts.Cancel(); - pausedStream.Release(); + if (s.Position >= s.Length * percentageOfStreamReadToCancel) + { + cts.Cancel(); + pausedStream.Release(); + } + else + { + pausedStream.Next(); + } }); Configuration configuration = Configuration.CreateDefaultInstance(); - configuration.FileSystem = new SingleStreamFileSystem(pausedStream); + configuration.FileSystem = new SingleStreamFileSystem((Stream)pausedStream); DecoderOptions options = new() { Configuration = configuration }; - await Assert.ThrowsAsync(async () => + await Assert.ThrowsAnyAsync(async () => { using Image image = await Image.LoadAsync(options, "someFakeFile", cts.Token); - }); + }).WaitAsync(TimeSpan.FromSeconds(10)); } protected override Stream CreateStream() => this.TestFormat.CreateAsyncSemaphoreStream(this.notifyWaitPositionReachedSemaphore, this.continueSemaphore, this.isTestStreamSeekable); diff --git a/tests/ImageSharp.Tests/TestUtilities/IPausedStream.cs b/tests/ImageSharp.Tests/TestUtilities/IPausedStream.cs new file mode 100644 index 0000000000..1eedef9d09 --- /dev/null +++ b/tests/ImageSharp.Tests/TestUtilities/IPausedStream.cs @@ -0,0 +1,15 @@ +// Copyright (c) Six Labors. +// Licensed under the Six Labors Split License. + +namespace SixLabors.ImageSharp.Tests.TestUtilities; + +public interface IPausedStream : IDisposable +{ + public void OnWaiting(Action onWaitingCallback); + + public void OnWaiting(Action onWaitingCallback); + + public void Next(); + + public void Release(); +} diff --git a/tests/ImageSharp.Tests/TestUtilities/PausedMemoryStream.cs b/tests/ImageSharp.Tests/TestUtilities/PausedMemoryStream.cs new file mode 100644 index 0000000000..17d742e30f --- /dev/null +++ b/tests/ImageSharp.Tests/TestUtilities/PausedMemoryStream.cs @@ -0,0 +1,164 @@ +// Copyright (c) Six Labors. +// Licensed under the Six Labors Split License. + +using System.Buffers; + +namespace SixLabors.ImageSharp.Tests.TestUtilities; + +/// +/// is a variant of that derives from instead of encapsulating it. +/// It is used to test decoder cancellation without relying on of our standard prefetching of arbitrary streams to +/// on asynchronous path. +/// +public class PausedMemoryStream : MemoryStream, IPausedStream +{ + private readonly SemaphoreSlim semaphore = new SemaphoreSlim(0); + + private readonly CancellationTokenSource cancelationTokenSource = new CancellationTokenSource(); + + private Action onWaitingCallback; + + public void OnWaiting(Action onWaitingCallback) => this.onWaitingCallback = onWaitingCallback; + + public void OnWaiting(Action onWaitingCallback) => this.OnWaiting(_ => onWaitingCallback()); + + public void Release() + { + this.semaphore.Release(); + this.cancelationTokenSource.Cancel(); + } + + public void Next() => this.semaphore.Release(); + + private void Wait() + { + if (this.cancelationTokenSource.IsCancellationRequested) + { + return; + } + + this.onWaitingCallback?.Invoke(this); + + try + { + this.semaphore.Wait(this.cancelationTokenSource.Token); + } + catch (OperationCanceledException) + { + // ignore this as its just used to unlock any waits in progress + } + } + + private async Task Await(Func action) + { + await Task.Yield(); + this.Wait(); + await action(); + } + + private async Task Await(Func> action) + { + await Task.Yield(); + this.Wait(); + return await action(); + } + + private T Await(Func action) + { + this.Wait(); + return action(); + } + + private void Await(Action action) + { + this.Wait(); + action(); + } + + public PausedMemoryStream(byte[] data) + : base(data) + { + } + + public override bool CanTimeout => base.CanTimeout; + + public override void Close() => this.Await(() => base.Close()); + + public override async Task CopyToAsync(Stream destination, int bufferSize, CancellationToken cancellationToken) + { + // To make sure the copy operation is buffered and pausable, we should override MemoryStream's strategy + // with the default Stream copy logic of System.IO.Stream: + // https://github.com/dotnet/runtime/blob/4f53c2f7e62df44f07cf410df8a0d439f42a0a71/src/libraries/System.Private.CoreLib/src/System/IO/Stream.cs#L104-L116 + byte[] buffer = ArrayPool.Shared.Rent(bufferSize); + try + { + int bytesRead; + while ((bytesRead = await this.ReadAsync(new Memory(buffer), cancellationToken).ConfigureAwait(false)) != 0) + { + await destination.WriteAsync(new ReadOnlyMemory(buffer, 0, bytesRead), cancellationToken).ConfigureAwait(false); + } + } + finally + { + ArrayPool.Shared.Return(buffer); + } + } + + public override bool CanRead => base.CanRead; + + public override bool CanSeek => base.CanSeek; + + public override bool CanWrite => base.CanWrite; + + public override void Flush() => this.Await(() => base.Flush()); + + public override int Read(byte[] buffer, int offset, int count) => this.Await(() => base.Read(buffer, offset, count)); + + public override long Seek(long offset, SeekOrigin origin) => this.Await(() => base.Seek(offset, origin)); + + public override void SetLength(long value) => this.Await(() => base.SetLength(value)); + + public override void Write(byte[] buffer, int offset, int count) => this.Await(() => base.Write(buffer, offset, count)); + + public override Task ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken) => this.Await(() => base.ReadAsync(buffer, offset, count, cancellationToken)); + + public override Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken) => this.Await(() => base.WriteAsync(buffer, offset, count, cancellationToken)); + + public override void WriteByte(byte value) => this.Await(() => base.WriteByte(value)); + + public override int ReadByte() => this.Await(() => base.ReadByte()); + + public override void CopyTo(Stream destination, int bufferSize) + { + // See comments on CopyToAsync. + byte[] buffer = ArrayPool.Shared.Rent(bufferSize); + try + { + int bytesRead; + while ((bytesRead = this.Read(buffer, 0, buffer.Length)) != 0) + { + destination.Write(buffer, 0, bytesRead); + } + } + finally + { + ArrayPool.Shared.Return(buffer); + } + } + + public override int Read(Span buffer) + { + this.Wait(); + return base.Read(buffer); + } + + public override ValueTask ReadAsync(Memory buffer, CancellationToken cancellationToken = default) => this.Await(() => base.ReadAsync(buffer, cancellationToken)); + + public override void Write(ReadOnlySpan buffer) + { + this.Wait(); + base.Write(buffer); + } + + public override ValueTask WriteAsync(ReadOnlyMemory buffer, CancellationToken cancellationToken = default) => this.Await(() => base.WriteAsync(buffer, cancellationToken)); +} diff --git a/tests/ImageSharp.Tests/TestUtilities/PausedStream.cs b/tests/ImageSharp.Tests/TestUtilities/PausedStream.cs index 2d13de0745..05f5f7a060 100644 --- a/tests/ImageSharp.Tests/TestUtilities/PausedStream.cs +++ b/tests/ImageSharp.Tests/TestUtilities/PausedStream.cs @@ -1,9 +1,11 @@ // Copyright (c) Six Labors. // Licensed under the Six Labors Split License. +using System.Buffers; + namespace SixLabors.ImageSharp.Tests.TestUtilities; -public class PausedStream : Stream +public class PausedStream : Stream, IPausedStream { private readonly SemaphoreSlim semaphore = new SemaphoreSlim(0); @@ -85,7 +87,25 @@ public class PausedStream : Stream public override void Close() => this.Await(() => this.innerStream.Close()); - public override Task CopyToAsync(Stream destination, int bufferSize, CancellationToken cancellationToken) => this.Await(() => this.innerStream.CopyToAsync(destination, bufferSize, cancellationToken)); + public override async Task CopyToAsync(Stream destination, int bufferSize, CancellationToken cancellationToken) + { + // To make sure the copy operation is buffered and pausable, we should override MemoryStream's strategy + // with the default Stream copy logic of System.IO.Stream: + // https://github.com/dotnet/runtime/blob/4f53c2f7e62df44f07cf410df8a0d439f42a0a71/src/libraries/System.Private.CoreLib/src/System/IO/Stream.cs#L104-L116 + byte[] buffer = ArrayPool.Shared.Rent(bufferSize); + try + { + int bytesRead; + while ((bytesRead = await this.ReadAsync(new Memory(buffer), cancellationToken).ConfigureAwait(false)) != 0) + { + await destination.WriteAsync(new ReadOnlyMemory(buffer, 0, bytesRead), cancellationToken).ConfigureAwait(false); + } + } + finally + { + ArrayPool.Shared.Return(buffer); + } + } public override bool CanRead => this.innerStream.CanRead; @@ -93,9 +113,9 @@ public class PausedStream : Stream public override bool CanWrite => this.innerStream.CanWrite; - public override long Length => this.Await(() => this.innerStream.Length); + public override long Length => this.innerStream.Length; - public override long Position { get => this.Await(() => this.innerStream.Position); set => this.Await(() => this.innerStream.Position = value); } + public override long Position { get => this.innerStream.Position; set => this.innerStream.Position = value; } public override void Flush() => this.Await(() => this.innerStream.Flush()); @@ -115,9 +135,33 @@ public class PausedStream : Stream public override int ReadByte() => this.Await(() => this.innerStream.ReadByte()); - protected override void Dispose(bool disposing) => this.innerStream.Dispose(); + protected override void Dispose(bool disposing) + { + base.Dispose(disposing); + + if (disposing) + { + this.innerStream.Dispose(); + } + } - public override void CopyTo(Stream destination, int bufferSize) => this.Await(() => this.innerStream.CopyTo(destination, bufferSize)); + public override void CopyTo(Stream destination, int bufferSize) + { + // See comments on CopyToAsync. + byte[] buffer = ArrayPool.Shared.Rent(bufferSize); + try + { + int bytesRead; + while ((bytesRead = this.Read(buffer, 0, buffer.Length)) != 0) + { + destination.Write(buffer, 0, bytesRead); + } + } + finally + { + ArrayPool.Shared.Return(buffer); + } + } public override int Read(Span buffer) {