Browse Source

Use real cancellation handling.

pull/2301/head
James Jackson-South 4 years ago
parent
commit
4891dc309a
  1. 7
      src/ImageSharp/Formats/ImageDecoder.cs
  2. 4
      src/ImageSharp/Formats/ImageDecoderUtilities.cs
  3. 9
      src/ImageSharp/IO/BufferedReadStream.cs
  4. 158
      tests/ImageSharp.Tests/Image/ImageTests.Decode_Cancellation.cs

7
src/ImageSharp/Formats/ImageDecoder.cs

@ -208,13 +208,6 @@ public abstract class ImageDecoder : IImageDecoder
stream.Position = position + s.Position; stream.Position = position + s.Position;
} }
// TODO: This is a hack. Our decoders do not check for cancellation requests.
// We need to fix this properly by implemented each decoder.
if (ct.IsCancellationRequested)
{
throw new TaskCanceledException();
}
return result; return result;
} }

4
src/ImageSharp/Formats/ImageDecoderUtilities.cs

@ -18,7 +18,7 @@ internal static class ImageDecoderUtilities
Stream stream, Stream stream,
CancellationToken cancellationToken) CancellationToken cancellationToken)
{ {
using BufferedReadStream bufferedReadStream = new(configuration, stream); using BufferedReadStream bufferedReadStream = new(configuration, stream, cancellationToken);
try try
{ {
@ -50,7 +50,7 @@ internal static class ImageDecoderUtilities
CancellationToken cancellationToken) CancellationToken cancellationToken)
where TPixel : unmanaged, IPixel<TPixel> where TPixel : unmanaged, IPixel<TPixel>
{ {
using BufferedReadStream bufferedReadStream = new(configuration, stream); using BufferedReadStream bufferedReadStream = new(configuration, stream, cancellationToken);
try try
{ {

9
src/ImageSharp/IO/BufferedReadStream.cs

@ -12,6 +12,8 @@ namespace SixLabors.ImageSharp.IO;
/// </summary> /// </summary>
internal sealed class BufferedReadStream : Stream internal sealed class BufferedReadStream : Stream
{ {
private readonly CancellationToken cancellationToken;
private readonly int maxBufferIndex; private readonly int maxBufferIndex;
private readonly byte[] readBuffer; private readonly byte[] readBuffer;
@ -33,12 +35,15 @@ internal sealed class BufferedReadStream : Stream
/// </summary> /// </summary>
/// <param name="configuration">The configuration which allows altering default behaviour or extending the library.</param> /// <param name="configuration">The configuration which allows altering default behaviour or extending the library.</param>
/// <param name="stream">The input stream.</param> /// <param name="stream">The input stream.</param>
public BufferedReadStream(Configuration configuration, Stream stream) /// <param name="cancellationToken">The optional cancellation token.</param>
public BufferedReadStream(Configuration configuration, Stream stream, CancellationToken cancellationToken = default)
{ {
Guard.NotNull(configuration, nameof(configuration)); Guard.NotNull(configuration, nameof(configuration));
Guard.IsTrue(stream.CanRead, nameof(stream), "Stream must be readable."); Guard.IsTrue(stream.CanRead, nameof(stream), "Stream must be readable.");
Guard.IsTrue(stream.CanSeek, nameof(stream), "Stream must be seekable."); Guard.IsTrue(stream.CanSeek, nameof(stream), "Stream must be seekable.");
this.cancellationToken = cancellationToken;
// Ensure all underlying buffers have been flushed before we attempt to read the stream. // Ensure all underlying buffers have been flushed before we attempt to read the stream.
// User streams may have opted to throw from Flush if CanWrite is false // User streams may have opted to throw from Flush if CanWrite is false
// (although the abstract Stream does not do so). // (although the abstract Stream does not do so).
@ -163,6 +168,8 @@ internal sealed class BufferedReadStream : Stream
[MethodImpl(MethodImplOptions.AggressiveInlining)] [MethodImpl(MethodImplOptions.AggressiveInlining)]
public override int Read(Span<byte> buffer) public override int Read(Span<byte> buffer)
{ {
this.cancellationToken.ThrowIfCancellationRequested();
// Too big for our buffer. Read directly from the stream. // Too big for our buffer. Read directly from the stream.
int count = buffer.Length; int count = buffer.Length;
if (count > this.BufferSize) if (count > this.BufferSize)

158
tests/ImageSharp.Tests/Image/ImageTests.Decode_Cancellation.cs

@ -2,7 +2,7 @@
// Licensed under the Six Labors Split License. // Licensed under the Six Labors Split License.
using SixLabors.ImageSharp.Formats; using SixLabors.ImageSharp.Formats;
using SixLabors.ImageSharp.PixelFormats; using SixLabors.ImageSharp.Tests.TestUtilities;
namespace SixLabors.ImageSharp.Tests; namespace SixLabors.ImageSharp.Tests;
@ -13,148 +13,68 @@ public partial class ImageTests
private bool isTestStreamSeekable; private bool isTestStreamSeekable;
private readonly SemaphoreSlim notifyWaitPositionReachedSemaphore = new(0); private readonly SemaphoreSlim notifyWaitPositionReachedSemaphore = new(0);
private readonly SemaphoreSlim continueSemaphore = new(0); private readonly SemaphoreSlim continueSemaphore = new(0);
private readonly CancellationTokenSource cts = new();
public Decode_Cancellation() => this.TopLevelConfiguration.StreamProcessingBufferSize = 128; public Decode_Cancellation() => this.TopLevelConfiguration.StreamProcessingBufferSize = 128;
[Theory] public static readonly TheoryData<string> TestFiles = new()
[InlineData(false)]
[InlineData(true)]
public Task LoadAsync_Specific_Stream(bool isInputStreamSeekable)
{ {
this.isTestStreamSeekable = isInputStreamSeekable; TestImages.Png.BikeSmall,
_ = Task.Factory.StartNew(this.DoCancel, TaskCreationOptions.LongRunning); TestImages.Jpeg.Baseline.Jpeg420Small,
TestImages.Bmp.Car,
DecoderOptions options = new() TestImages.Tiff.RgbUncompressed,
{ TestImages.Gif.Kumin,
Configuration = this.TopLevelConfiguration TestImages.Tga.Bit32PalRleBottomLeft,
}; TestImages.Webp.TestPatternOpaqueSmall,
TestImages.Pbm.RgbPlainMagick
return Assert.ThrowsAsync<TaskCanceledException>(() => Image.LoadAsync<Rgb24>(options, this.DataStream, this.cts.Token)); };
}
[Theory] [Theory]
[InlineData(false)] [MemberData(nameof(TestFiles))]
[InlineData(true)] public async Task IdentifyAsync_IsCancellable(string file)
public Task LoadAsync_Agnostic_Stream(bool isInputStreamSeekable)
{ {
this.isTestStreamSeekable = isInputStreamSeekable; CancellationTokenSource cts = new();
_ = Task.Factory.StartNew(this.DoCancel, TaskCreationOptions.LongRunning); string path = Path.Combine(TestEnvironment.InputImagesDirectoryFullPath, file);
using PausedStream pausedStream = new(path);
DecoderOptions options = new() pausedStream.OnWaiting(_ =>
{ {
Configuration = this.TopLevelConfiguration cts.Cancel();
}; pausedStream.Release();
});
return Assert.ThrowsAsync<TaskCanceledException>(() => Image.LoadAsync(options, this.DataStream, this.cts.Token));
}
[Fact]
public Task LoadAsync_Agnostic_Path()
{
this.isTestStreamSeekable = true;
_ = Task.Factory.StartNew(this.DoCancel, TaskCreationOptions.LongRunning);
Configuration configuration = Configuration.CreateDefaultInstance();
configuration.FileSystem = new SingleStreamFileSystem(pausedStream);
DecoderOptions options = new() DecoderOptions options = new()
{ {
Configuration = this.TopLevelConfiguration Configuration = configuration
}; };
return Assert.ThrowsAsync<TaskCanceledException>(() => Image.LoadAsync(options, this.MockFilePath, this.cts.Token)); await Assert.ThrowsAsync<TaskCanceledException>(async () => await Image.IdentifyAsync(options, "someFakeFile", cts.Token));
}
[Fact]
public Task LoadAsync_Specific_Path()
{
this.isTestStreamSeekable = true;
_ = Task.Factory.StartNew(this.DoCancel, TaskCreationOptions.LongRunning);
DecoderOptions options = new()
{
Configuration = this.TopLevelConfiguration
};
return Assert.ThrowsAsync<TaskCanceledException>(() => Image.LoadAsync<Rgb24>(options, this.MockFilePath, this.cts.Token));
} }
[Theory] [Theory]
[InlineData(false)] [MemberData(nameof(TestFiles))]
[InlineData(true)] public async Task DecodeAsync_IsCancellable(string file)
public Task IdentifyAsync_Stream(bool isInputStreamSeekable)
{
this.isTestStreamSeekable = isInputStreamSeekable;
_ = Task.Factory.StartNew(this.DoCancel, TaskCreationOptions.LongRunning);
DecoderOptions options = new()
{
Configuration = this.TopLevelConfiguration
};
return Assert.ThrowsAsync<TaskCanceledException>(() => Image.IdentifyAsync(options, this.DataStream, this.cts.Token));
}
[Fact]
public Task IdentifyAsync_CustomConfiguration_Path()
{ {
this.isTestStreamSeekable = true; CancellationTokenSource cts = new();
_ = Task.Factory.StartNew(this.DoCancel, TaskCreationOptions.LongRunning); string path = Path.Combine(TestEnvironment.InputImagesDirectoryFullPath, file);
using PausedStream pausedStream = new(path);
DecoderOptions options = new() pausedStream.OnWaiting(_ =>
{ {
Configuration = this.TopLevelConfiguration cts.Cancel();
}; pausedStream.Release();
});
return Assert.ThrowsAsync<TaskCanceledException>(() => Image.IdentifyAsync(options, this.MockFilePath, this.cts.Token));
}
[Theory]
[InlineData(false)]
[InlineData(true)]
public Task IdentifyWithFormatAsync_CustomConfiguration_Stream(bool isInputStreamSeekable)
{
this.isTestStreamSeekable = isInputStreamSeekable;
_ = Task.Factory.StartNew(this.DoCancel, TaskCreationOptions.LongRunning);
Configuration configuration = Configuration.CreateDefaultInstance();
configuration.FileSystem = new SingleStreamFileSystem(pausedStream);
DecoderOptions options = new() DecoderOptions options = new()
{ {
Configuration = this.TopLevelConfiguration Configuration = configuration
}; };
return Assert.ThrowsAsync<TaskCanceledException>(() => Image.IdentifyWithFormatAsync(options, this.DataStream, this.cts.Token)); await Assert.ThrowsAsync<TaskCanceledException>(async () =>
}
[Fact]
public Task IdentifyWithFormatAsync_CustomConfiguration_Path()
{
this.isTestStreamSeekable = true;
_ = Task.Factory.StartNew(this.DoCancel, TaskCreationOptions.LongRunning);
DecoderOptions options = new()
{ {
Configuration = this.TopLevelConfiguration using Image image = await Image.LoadAsync(options, "someFakeFile", cts.Token);
}; });
return Assert.ThrowsAsync<TaskCanceledException>(() => Image.IdentifyWithFormatAsync(options, this.MockFilePath, this.cts.Token));
}
[Fact]
public Task IdentifyWithFormatAsync_DefaultConfiguration_Stream()
{
_ = Task.Factory.StartNew(this.DoCancel, TaskCreationOptions.LongRunning);
return Assert.ThrowsAsync<TaskCanceledException>(() => Image.IdentifyWithFormatAsync(this.DataStream, this.cts.Token));
}
private async Task DoCancel()
{
// wait until we reach the middle of the steam
await this.notifyWaitPositionReachedSemaphore.WaitAsync();
// set the cancellation
this.cts.Cancel();
// continue processing the stream
this.continueSemaphore.Release();
} }
protected override Stream CreateStream() => this.TestFormat.CreateAsyncSemaphoreStream(this.notifyWaitPositionReachedSemaphore, this.continueSemaphore, this.isTestStreamSeekable); protected override Stream CreateStream() => this.TestFormat.CreateAsyncSemaphoreStream(this.notifyWaitPositionReachedSemaphore, this.continueSemaphore, this.isTestStreamSeekable);

Loading…
Cancel
Save