Browse Source

cover JpegDecoder's own cancellation support

pull/2301/head
Anton Firszov 3 years ago
parent
commit
937e78ce80
  1. 67
      tests/ImageSharp.Tests/Formats/Jpg/JpegDecoderTests.cs
  2. 15
      tests/ImageSharp.Tests/TestUtilities/IPausedStream.cs
  3. 168
      tests/ImageSharp.Tests/TestUtilities/PausedMemoryStream.cs
  4. 6
      tests/ImageSharp.Tests/TestUtilities/PausedStream.cs

67
tests/ImageSharp.Tests/Formats/Jpg/JpegDecoderTests.cs

@ -251,15 +251,27 @@ public partial class JpegDecoderTests
Assert.IsType<InvalidMemoryOperationException>(ex.InnerException);
}
private static readonly TestFile CancellationTestFile = TestFile.Create(TestImages.Jpeg.Baseline.Jpeg420Small);
public static readonly TheoryData<bool, double> CancellationData = new()
{
{ false, 0 },
{ false, 0.5 },
{ false, 0.9 },
{ true, 0 },
{ true, 0.5 },
{ true, 0.9 },
};
[Theory]
[InlineData(0)]
[InlineData(0.5)]
[InlineData(0.9)]
public async Task DecodeAsync_IsCancellable(double percentageOfStreamReadToCancel)
[MemberData(nameof(CancellationData))]
public async Task DecodeAsync_IsCancellable(bool useMemoryStream, double percentageOfStreamReadToCancel)
{
var cts = new CancellationTokenSource();
string file = Path.Combine(TestEnvironment.InputImagesDirectoryFullPath, TestImages.Jpeg.Baseline.Jpeg420Small);
using var pausedStream = new PausedStream(file);
CancellationTokenSource cts = new();
using IPausedStream pausedStream = useMemoryStream ?
new PausedMemoryStream(CancellationTestFile.Bytes) :
new PausedStream(CancellationTestFile.FullPath);
pausedStream.OnWaiting(s =>
{
if (s.Position >= s.Length * percentageOfStreamReadToCancel)
@ -273,40 +285,51 @@ public partial class JpegDecoderTests
}
});
var configuration = Configuration.CreateDefaultInstance();
configuration.FileSystem = new SingleStreamFileSystem(pausedStream);
Configuration configuration = Configuration.CreateDefaultInstance();
configuration.FileSystem = new SingleStreamFileSystem((Stream)pausedStream);
DecoderOptions options = new()
{
Configuration = configuration
};
await Assert.ThrowsAsync<TaskCanceledException>(async () =>
TimeSpan testTimeout = TimeSpan.FromSeconds(10);
await Assert.ThrowsAnyAsync<OperationCanceledException>(async () =>
{
using Image image = await Image.LoadAsync(options, "someFakeFile", cts.Token);
});
}).WaitAsync(testTimeout);
}
[Fact]
public async Task Identify_IsCancellable()
[Theory]
[MemberData(nameof(CancellationData))]
public async Task Identify_IsCancellable(bool useMemoryStream, double percentageOfStreamReadToCancel)
{
var cts = new CancellationTokenSource();
CancellationTokenSource cts = new();
using IPausedStream pausedStream = useMemoryStream ?
new PausedMemoryStream(CancellationTestFile.Bytes) :
new PausedStream(CancellationTestFile.FullPath);
string file = Path.Combine(TestEnvironment.InputImagesDirectoryFullPath, TestImages.Jpeg.Baseline.Jpeg420Small);
using var pausedStream = new PausedStream(file);
pausedStream.OnWaiting(_ =>
pausedStream.OnWaiting(s =>
{
cts.Cancel();
pausedStream.Release();
if (s.Position >= s.Length * percentageOfStreamReadToCancel)
{
cts.Cancel();
pausedStream.Release();
}
else
{
pausedStream.Next();
}
});
var configuration = Configuration.CreateDefaultInstance();
configuration.FileSystem = new SingleStreamFileSystem(pausedStream);
Configuration configuration = Configuration.CreateDefaultInstance();
configuration.FileSystem = new SingleStreamFileSystem((Stream)pausedStream);
DecoderOptions options = new()
{
Configuration = configuration
};
await Assert.ThrowsAsync<TaskCanceledException>(async () => await Image.IdentifyAsync(options, "someFakeFile", cts.Token));
await Assert.ThrowsAnyAsync<OperationCanceledException>(async () => await Image.IdentifyAsync(options, "someFakeFile", cts.Token));
}
[Theory]

15
tests/ImageSharp.Tests/TestUtilities/IPausedStream.cs

@ -0,0 +1,15 @@
// Copyright (c) Six Labors.
// Licensed under the Six Labors Split License.
namespace SixLabors.ImageSharp.Tests.TestUtilities;
public interface IPausedStream : IDisposable
{
public void OnWaiting(Action<Stream> onWaitingCallback);
public void OnWaiting(Action onWaitingCallback);
public void Next();
public void Release();
}

168
tests/ImageSharp.Tests/TestUtilities/PausedMemoryStream.cs

@ -0,0 +1,168 @@
// Copyright (c) Six Labors.
// Licensed under the Six Labors Split License.
using System.Buffers;
namespace SixLabors.ImageSharp.Tests.TestUtilities;
/// <summary>
/// <see cref="PausedMemoryStream"/> is a variant of <see cref="PausedStream"/> that derives from <see cref="MemoryStream"/> instead of encapsulating it.
/// It is used to test decoder cancellation without relying on of our standard prefetching of arbitrary streams to <see cref="ImageSharp.IO.ChunkedMemoryStream"/>
/// on asynchronous path.
/// </summary>
public class PausedMemoryStream : MemoryStream, IPausedStream
{
private readonly SemaphoreSlim semaphore = new SemaphoreSlim(0);
private readonly CancellationTokenSource cancelationTokenSource = new CancellationTokenSource();
private Action<Stream> onWaitingCallback;
public void OnWaiting(Action<Stream> onWaitingCallback) => this.onWaitingCallback = onWaitingCallback;
public void OnWaiting(Action onWaitingCallback) => this.OnWaiting(_ => onWaitingCallback());
public void Release()
{
this.semaphore.Release();
this.cancelationTokenSource.Cancel();
}
public void Next() => this.semaphore.Release();
private void Wait()
{
if (this.cancelationTokenSource.IsCancellationRequested)
{
return;
}
this.onWaitingCallback?.Invoke(this);
try
{
this.semaphore.Wait(this.cancelationTokenSource.Token);
}
catch (OperationCanceledException)
{
// ignore this as its just used to unlock any waits in progress
}
}
private async Task Await(Func<Task> action)
{
await Task.Yield();
this.Wait();
await action();
}
private async Task<T> Await<T>(Func<Task<T>> action)
{
await Task.Yield();
this.Wait();
return await action();
}
private T Await<T>(Func<T> action)
{
this.Wait();
return action();
}
private void Await(Action action)
{
this.Wait();
action();
}
public PausedMemoryStream(byte[] data)
: base(data)
{
}
public override bool CanTimeout => base.CanTimeout;
public override void Close() => this.Await(() => base.Close());
public override async Task CopyToAsync(Stream destination, int bufferSize, CancellationToken cancellationToken)
{
// To make sure the copy operation is buffered and pausable, we should override MemoryStream's strategy
// with the default Stream copy logic based of System.IO.Stream:
// https://github.com/dotnet/runtime/blob/4f53c2f7e62df44f07cf410df8a0d439f42a0a71/src/libraries/System.Private.CoreLib/src/System/IO/Stream.cs#L104-L116
byte[] buffer = ArrayPool<byte>.Shared.Rent(bufferSize);
try
{
int bytesRead;
while ((bytesRead = await this.ReadAsync(new Memory<byte>(buffer), cancellationToken).ConfigureAwait(false)) != 0)
{
await destination.WriteAsync(new ReadOnlyMemory<byte>(buffer, 0, bytesRead), cancellationToken).ConfigureAwait(false);
}
}
finally
{
ArrayPool<byte>.Shared.Return(buffer);
}
}
public override bool CanRead => base.CanRead;
public override bool CanSeek => base.CanSeek;
public override bool CanWrite => base.CanWrite;
//public override long Length => this.Await(() => base.Length);
//public override long Position { get => this.Await(() => base.Position); set => this.Await(() => base.Position = value); }
public override void Flush() => this.Await(() => base.Flush());
public override int Read(byte[] buffer, int offset, int count) => this.Await(() => base.Read(buffer, offset, count));
public override long Seek(long offset, SeekOrigin origin) => this.Await(() => base.Seek(offset, origin));
public override void SetLength(long value) => this.Await(() => base.SetLength(value));
public override void Write(byte[] buffer, int offset, int count) => this.Await(() => base.Write(buffer, offset, count));
public override Task<int> ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken) => this.Await(() => base.ReadAsync(buffer, offset, count, cancellationToken));
public override Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken) => this.Await(() => base.WriteAsync(buffer, offset, count, cancellationToken));
public override void WriteByte(byte value) => this.Await(() => base.WriteByte(value));
public override int ReadByte() => this.Await(() => base.ReadByte());
public override void CopyTo(Stream destination, int bufferSize)
{
// See comments on CopyToAsync.
byte[] buffer = ArrayPool<byte>.Shared.Rent(bufferSize);
try
{
int bytesRead;
while ((bytesRead = this.Read(buffer, 0, buffer.Length)) != 0)
{
destination.Write(buffer, 0, bytesRead);
}
}
finally
{
ArrayPool<byte>.Shared.Return(buffer);
}
}
public override int Read(Span<byte> buffer)
{
this.Wait();
return base.Read(buffer);
}
public override ValueTask<int> ReadAsync(Memory<byte> buffer, CancellationToken cancellationToken = default) => this.Await(() => base.ReadAsync(buffer, cancellationToken));
public override void Write(ReadOnlySpan<byte> buffer)
{
this.Wait();
base.Write(buffer);
}
public override ValueTask WriteAsync(ReadOnlyMemory<byte> buffer, CancellationToken cancellationToken = default) => this.Await(() => base.WriteAsync(buffer, cancellationToken));
}

6
tests/ImageSharp.Tests/TestUtilities/PausedStream.cs

@ -5,7 +5,7 @@ using System.Buffers;
namespace SixLabors.ImageSharp.Tests.TestUtilities;
public class PausedStream : Stream
public class PausedStream : Stream, IPausedStream
{
private readonly SemaphoreSlim semaphore = new SemaphoreSlim(0);
@ -113,9 +113,9 @@ public class PausedStream : Stream
public override bool CanWrite => this.innerStream.CanWrite;
public override long Length => this.Await(() => this.innerStream.Length);
public override long Length => this.innerStream.Length;
public override long Position { get => this.Await(() => this.innerStream.Position); set => this.Await(() => this.innerStream.Position = value); }
public override long Position { get => this.innerStream.Position; set => this.innerStream.Position = value; }
public override void Flush() => this.Await(() => this.innerStream.Flush());

Loading…
Cancel
Save