From 937e78ce80b39ef4d425c546995f2e021708db64 Mon Sep 17 00:00:00 2001 From: Anton Firszov Date: Mon, 5 Dec 2022 00:06:03 +0100 Subject: [PATCH] cover JpegDecoder's own cancellation support --- .../Formats/Jpg/JpegDecoderTests.cs | 67 ++++--- .../TestUtilities/IPausedStream.cs | 15 ++ .../TestUtilities/PausedMemoryStream.cs | 168 ++++++++++++++++++ .../TestUtilities/PausedStream.cs | 6 +- 4 files changed, 231 insertions(+), 25 deletions(-) create mode 100644 tests/ImageSharp.Tests/TestUtilities/IPausedStream.cs create mode 100644 tests/ImageSharp.Tests/TestUtilities/PausedMemoryStream.cs diff --git a/tests/ImageSharp.Tests/Formats/Jpg/JpegDecoderTests.cs b/tests/ImageSharp.Tests/Formats/Jpg/JpegDecoderTests.cs index 6ea6396e06..906c3b818a 100644 --- a/tests/ImageSharp.Tests/Formats/Jpg/JpegDecoderTests.cs +++ b/tests/ImageSharp.Tests/Formats/Jpg/JpegDecoderTests.cs @@ -251,15 +251,27 @@ public partial class JpegDecoderTests Assert.IsType(ex.InnerException); } + private static readonly TestFile CancellationTestFile = TestFile.Create(TestImages.Jpeg.Baseline.Jpeg420Small); + + public static readonly TheoryData CancellationData = new() + { + { false, 0 }, + { false, 0.5 }, + { false, 0.9 }, + { true, 0 }, + { true, 0.5 }, + { true, 0.9 }, + }; + [Theory] - [InlineData(0)] - [InlineData(0.5)] - [InlineData(0.9)] - public async Task DecodeAsync_IsCancellable(double percentageOfStreamReadToCancel) + [MemberData(nameof(CancellationData))] + public async Task DecodeAsync_IsCancellable(bool useMemoryStream, double percentageOfStreamReadToCancel) { - var cts = new CancellationTokenSource(); - string file = Path.Combine(TestEnvironment.InputImagesDirectoryFullPath, TestImages.Jpeg.Baseline.Jpeg420Small); - using var pausedStream = new PausedStream(file); + CancellationTokenSource cts = new(); + using IPausedStream pausedStream = useMemoryStream ? + new PausedMemoryStream(CancellationTestFile.Bytes) : + new PausedStream(CancellationTestFile.FullPath); + pausedStream.OnWaiting(s => { if (s.Position >= s.Length * percentageOfStreamReadToCancel) @@ -273,40 +285,51 @@ public partial class JpegDecoderTests } }); - var configuration = Configuration.CreateDefaultInstance(); - configuration.FileSystem = new SingleStreamFileSystem(pausedStream); + Configuration configuration = Configuration.CreateDefaultInstance(); + configuration.FileSystem = new SingleStreamFileSystem((Stream)pausedStream); DecoderOptions options = new() { Configuration = configuration }; - await Assert.ThrowsAsync(async () => + TimeSpan testTimeout = TimeSpan.FromSeconds(10); + + await Assert.ThrowsAnyAsync(async () => { using Image image = await Image.LoadAsync(options, "someFakeFile", cts.Token); - }); + }).WaitAsync(testTimeout); } - [Fact] - public async Task Identify_IsCancellable() + [Theory] + [MemberData(nameof(CancellationData))] + public async Task Identify_IsCancellable(bool useMemoryStream, double percentageOfStreamReadToCancel) { - var cts = new CancellationTokenSource(); + CancellationTokenSource cts = new(); + using IPausedStream pausedStream = useMemoryStream ? + new PausedMemoryStream(CancellationTestFile.Bytes) : + new PausedStream(CancellationTestFile.FullPath); - string file = Path.Combine(TestEnvironment.InputImagesDirectoryFullPath, TestImages.Jpeg.Baseline.Jpeg420Small); - using var pausedStream = new PausedStream(file); - pausedStream.OnWaiting(_ => + pausedStream.OnWaiting(s => { - cts.Cancel(); - pausedStream.Release(); + if (s.Position >= s.Length * percentageOfStreamReadToCancel) + { + cts.Cancel(); + pausedStream.Release(); + } + else + { + pausedStream.Next(); + } }); - var configuration = Configuration.CreateDefaultInstance(); - configuration.FileSystem = new SingleStreamFileSystem(pausedStream); + Configuration configuration = Configuration.CreateDefaultInstance(); + configuration.FileSystem = new SingleStreamFileSystem((Stream)pausedStream); DecoderOptions options = new() { Configuration = configuration }; - await Assert.ThrowsAsync(async () => await Image.IdentifyAsync(options, "someFakeFile", cts.Token)); + await Assert.ThrowsAnyAsync(async () => await Image.IdentifyAsync(options, "someFakeFile", cts.Token)); } [Theory] diff --git a/tests/ImageSharp.Tests/TestUtilities/IPausedStream.cs b/tests/ImageSharp.Tests/TestUtilities/IPausedStream.cs new file mode 100644 index 0000000000..1eedef9d09 --- /dev/null +++ b/tests/ImageSharp.Tests/TestUtilities/IPausedStream.cs @@ -0,0 +1,15 @@ +// Copyright (c) Six Labors. +// Licensed under the Six Labors Split License. + +namespace SixLabors.ImageSharp.Tests.TestUtilities; + +public interface IPausedStream : IDisposable +{ + public void OnWaiting(Action onWaitingCallback); + + public void OnWaiting(Action onWaitingCallback); + + public void Next(); + + public void Release(); +} diff --git a/tests/ImageSharp.Tests/TestUtilities/PausedMemoryStream.cs b/tests/ImageSharp.Tests/TestUtilities/PausedMemoryStream.cs new file mode 100644 index 0000000000..96c3c5b63c --- /dev/null +++ b/tests/ImageSharp.Tests/TestUtilities/PausedMemoryStream.cs @@ -0,0 +1,168 @@ +// Copyright (c) Six Labors. +// Licensed under the Six Labors Split License. + +using System.Buffers; + +namespace SixLabors.ImageSharp.Tests.TestUtilities; + +/// +/// is a variant of that derives from instead of encapsulating it. +/// It is used to test decoder cancellation without relying on of our standard prefetching of arbitrary streams to +/// on asynchronous path. +/// +public class PausedMemoryStream : MemoryStream, IPausedStream +{ + private readonly SemaphoreSlim semaphore = new SemaphoreSlim(0); + + private readonly CancellationTokenSource cancelationTokenSource = new CancellationTokenSource(); + + private Action onWaitingCallback; + + public void OnWaiting(Action onWaitingCallback) => this.onWaitingCallback = onWaitingCallback; + + public void OnWaiting(Action onWaitingCallback) => this.OnWaiting(_ => onWaitingCallback()); + + public void Release() + { + this.semaphore.Release(); + this.cancelationTokenSource.Cancel(); + } + + public void Next() => this.semaphore.Release(); + + private void Wait() + { + if (this.cancelationTokenSource.IsCancellationRequested) + { + return; + } + + this.onWaitingCallback?.Invoke(this); + + try + { + this.semaphore.Wait(this.cancelationTokenSource.Token); + } + catch (OperationCanceledException) + { + // ignore this as its just used to unlock any waits in progress + } + } + + private async Task Await(Func action) + { + await Task.Yield(); + this.Wait(); + await action(); + } + + private async Task Await(Func> action) + { + await Task.Yield(); + this.Wait(); + return await action(); + } + + private T Await(Func action) + { + this.Wait(); + return action(); + } + + private void Await(Action action) + { + this.Wait(); + action(); + } + + public PausedMemoryStream(byte[] data) + : base(data) + { + } + + public override bool CanTimeout => base.CanTimeout; + + public override void Close() => this.Await(() => base.Close()); + + public override async Task CopyToAsync(Stream destination, int bufferSize, CancellationToken cancellationToken) + { + // To make sure the copy operation is buffered and pausable, we should override MemoryStream's strategy + // with the default Stream copy logic based of System.IO.Stream: + // https://github.com/dotnet/runtime/blob/4f53c2f7e62df44f07cf410df8a0d439f42a0a71/src/libraries/System.Private.CoreLib/src/System/IO/Stream.cs#L104-L116 + byte[] buffer = ArrayPool.Shared.Rent(bufferSize); + try + { + int bytesRead; + while ((bytesRead = await this.ReadAsync(new Memory(buffer), cancellationToken).ConfigureAwait(false)) != 0) + { + await destination.WriteAsync(new ReadOnlyMemory(buffer, 0, bytesRead), cancellationToken).ConfigureAwait(false); + } + } + finally + { + ArrayPool.Shared.Return(buffer); + } + } + + public override bool CanRead => base.CanRead; + + public override bool CanSeek => base.CanSeek; + + public override bool CanWrite => base.CanWrite; + + //public override long Length => this.Await(() => base.Length); + + //public override long Position { get => this.Await(() => base.Position); set => this.Await(() => base.Position = value); } + + public override void Flush() => this.Await(() => base.Flush()); + + public override int Read(byte[] buffer, int offset, int count) => this.Await(() => base.Read(buffer, offset, count)); + + public override long Seek(long offset, SeekOrigin origin) => this.Await(() => base.Seek(offset, origin)); + + public override void SetLength(long value) => this.Await(() => base.SetLength(value)); + + public override void Write(byte[] buffer, int offset, int count) => this.Await(() => base.Write(buffer, offset, count)); + + public override Task ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken) => this.Await(() => base.ReadAsync(buffer, offset, count, cancellationToken)); + + public override Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken) => this.Await(() => base.WriteAsync(buffer, offset, count, cancellationToken)); + + public override void WriteByte(byte value) => this.Await(() => base.WriteByte(value)); + + public override int ReadByte() => this.Await(() => base.ReadByte()); + + public override void CopyTo(Stream destination, int bufferSize) + { + // See comments on CopyToAsync. + byte[] buffer = ArrayPool.Shared.Rent(bufferSize); + try + { + int bytesRead; + while ((bytesRead = this.Read(buffer, 0, buffer.Length)) != 0) + { + destination.Write(buffer, 0, bytesRead); + } + } + finally + { + ArrayPool.Shared.Return(buffer); + } + } + + public override int Read(Span buffer) + { + this.Wait(); + return base.Read(buffer); + } + + public override ValueTask ReadAsync(Memory buffer, CancellationToken cancellationToken = default) => this.Await(() => base.ReadAsync(buffer, cancellationToken)); + + public override void Write(ReadOnlySpan buffer) + { + this.Wait(); + base.Write(buffer); + } + + public override ValueTask WriteAsync(ReadOnlyMemory buffer, CancellationToken cancellationToken = default) => this.Await(() => base.WriteAsync(buffer, cancellationToken)); +} diff --git a/tests/ImageSharp.Tests/TestUtilities/PausedStream.cs b/tests/ImageSharp.Tests/TestUtilities/PausedStream.cs index cafe1c28e7..566cc79d5a 100644 --- a/tests/ImageSharp.Tests/TestUtilities/PausedStream.cs +++ b/tests/ImageSharp.Tests/TestUtilities/PausedStream.cs @@ -5,7 +5,7 @@ using System.Buffers; namespace SixLabors.ImageSharp.Tests.TestUtilities; -public class PausedStream : Stream +public class PausedStream : Stream, IPausedStream { private readonly SemaphoreSlim semaphore = new SemaphoreSlim(0); @@ -113,9 +113,9 @@ public class PausedStream : Stream public override bool CanWrite => this.innerStream.CanWrite; - public override long Length => this.Await(() => this.innerStream.Length); + public override long Length => this.innerStream.Length; - public override long Position { get => this.Await(() => this.innerStream.Position); set => this.Await(() => this.innerStream.Position = value); } + public override long Position { get => this.innerStream.Position; set => this.innerStream.Position = value; } public override void Flush() => this.Await(() => this.innerStream.Flush());