Browse Source

tests for general cancellation

pull/1574/head
Anton Firszov 6 years ago
parent
commit
103d676bd4
  1. 77
      src/ImageSharp/Image.FromFile.cs
  2. 82
      tests/ImageSharp.Tests/Image/ImageTests.Decode_Cancellation.cs
  3. 2
      tests/ImageSharp.Tests/Image/ImageTests.DetectFormat.cs
  4. 2
      tests/ImageSharp.Tests/Image/ImageTests.Identify.cs
  5. 21
      tests/ImageSharp.Tests/Image/ImageTests.ImageLoadTestBase.cs
  6. 18
      tests/ImageSharp.Tests/Image/ImageTests.Load_FileSystemPath_UseDefaultConfiguration.cs
  7. 2
      tests/ImageSharp.Tests/Image/ImageTests.Load_FromBytes_PassLocalConfiguration.cs
  8. 2
      tests/ImageSharp.Tests/ImageSharp.Tests.csproj
  9. 10
      tests/ImageSharp.Tests/TestFileSystem.cs
  10. 27
      tests/ImageSharp.Tests/TestFormat.cs
  11. 49
      tests/ImageSharp.Tests/TestUtilities/SemaphoreReadMemoryStream.cs
  12. 77
      tests/ImageSharp.Tests/TestUtilities/Tests/SemaphoreReadMemoryStreamTests.cs

77
src/ImageSharp/Image.FromFile.cs

@ -197,6 +197,23 @@ namespace SixLabors.ImageSharp
public static Task<Image> LoadAsync(Configuration configuration, string path, IImageDecoder decoder)
=> LoadAsync(configuration, path, decoder, default);
/// <summary>
/// Create a new instance of the <see cref="Image"/> class from the given file.
/// </summary>
/// <param name="configuration">The Configuration.</param>
/// <param name="path">The file path to the image.</param>
/// <param name="decoder">The decoder.</param>
/// <exception cref="ArgumentNullException">The configuration is null.</exception>
/// <exception cref="ArgumentNullException">The path is null.</exception>
/// <exception cref="ArgumentNullException">The decoder is null.</exception>
/// <exception cref="UnknownImageFormatException">Image format not recognised.</exception>
/// <exception cref="InvalidImageContentException">Image contains invalid content.</exception>
/// <typeparam name="TPixel">The pixel format.</typeparam>
/// <returns>A <see cref="Task{Image}"/> representing the asynchronous operation.</returns>
public static Task<Image<TPixel>> LoadAsync<TPixel>(Configuration configuration, string path, IImageDecoder decoder)
where TPixel : unmanaged, IPixel<TPixel>
=> LoadAsync<TPixel>(configuration, path, decoder, default);
/// <summary>
/// Create a new instance of the <see cref="Image"/> class from the given file.
/// </summary>
@ -219,6 +236,66 @@ namespace SixLabors.ImageSharp
return LoadAsync(configuration, stream, decoder, cancellationToken);
}
/// <summary>
/// Create a new instance of the <see cref="Image"/> class from the given file.
/// </summary>
/// <param name="configuration">The Configuration.</param>
/// <param name="path">The file path to the image.</param>
/// <param name="decoder">The decoder.</param>
/// <param name="cancellationToken">The token to monitor for cancellation requests.</param>
/// <exception cref="ArgumentNullException">The configuration is null.</exception>
/// <exception cref="ArgumentNullException">The path is null.</exception>
/// <exception cref="ArgumentNullException">The decoder is null.</exception>
/// <exception cref="UnknownImageFormatException">Image format not recognised.</exception>
/// <exception cref="InvalidImageContentException">Image contains invalid content.</exception>
/// <typeparam name="TPixel">The pixel format.</typeparam>
/// <returns>A <see cref="Task{Image}"/> representing the asynchronous operation.</returns>
public static Task<Image<TPixel>> LoadAsync<TPixel>(Configuration configuration, string path, IImageDecoder decoder, CancellationToken cancellationToken)
where TPixel : unmanaged, IPixel<TPixel>
{
Guard.NotNull(configuration, nameof(configuration));
Guard.NotNull(path, nameof(path));
using Stream stream = configuration.FileSystem.OpenRead(path);
return LoadAsync<TPixel>(configuration, stream, decoder, cancellationToken);
}
/// <summary>
/// Create a new instance of the <see cref="Image"/> class from the given file.
/// </summary>
/// <param name="configuration">The configuration for the decoder.</param>
/// <param name="path">The file path to the image.</param>
/// <exception cref="ArgumentNullException">The configuration is null.</exception>
/// <exception cref="ArgumentNullException">The path is null.</exception>
/// <exception cref="UnknownImageFormatException">Image format not recognised.</exception>
/// <exception cref="InvalidImageContentException">Image contains invalid content.</exception>
/// <typeparam name="TPixel">The pixel format.</typeparam>
/// <returns>A <see cref="Task{Image}"/> representing the asynchronous operation.</returns>
public static Task<Image<TPixel>> LoadAsync<TPixel>(Configuration configuration, string path)
where TPixel : unmanaged, IPixel<TPixel>
=> LoadAsync<TPixel>(configuration, path, default(CancellationToken));
/// <summary>
/// Create a new instance of the <see cref="Image"/> class from the given file.
/// </summary>
/// <param name="configuration">The configuration for the decoder.</param>
/// <param name="path">The file path to the image.</param>
/// <param name="cancellationToken">The token to monitor for cancellation requests.</param>
/// <exception cref="ArgumentNullException">The configuration is null.</exception>
/// <exception cref="ArgumentNullException">The path is null.</exception>
/// <exception cref="UnknownImageFormatException">Image format not recognised.</exception>
/// <exception cref="InvalidImageContentException">Image contains invalid content.</exception>
/// <typeparam name="TPixel">The pixel format.</typeparam>
/// <returns>A <see cref="Task{Image}"/> representing the asynchronous operation.</returns>
public static async Task<Image<TPixel>> LoadAsync<TPixel>(Configuration configuration, string path, CancellationToken cancellationToken)
where TPixel : unmanaged, IPixel<TPixel>
{
using Stream stream = configuration.FileSystem.OpenRead(path);
(Image<TPixel> img, _) = await LoadWithFormatAsync<TPixel>(configuration, stream, cancellationToken)
.ConfigureAwait(false);
return img;
}
/// <summary>
/// Create a new instance of the <see cref="Image"/> class from the given file.
/// </summary>

82
tests/ImageSharp.Tests/Image/ImageTests.Decode_Cancellation.cs

@ -0,0 +1,82 @@
// Copyright (c) Six Labors.
// Licensed under the Apache License, Version 2.0.
using System;
using System.IO;
using System.Threading;
using System.Threading.Tasks;
using SixLabors.ImageSharp.PixelFormats;
using Xunit;
namespace SixLabors.ImageSharp.Tests
{
public partial class ImageTests
{
public class Decode_Cancellation : ImageLoadTestBase
{
private bool isTestStreamSeekable;
private readonly SemaphoreSlim notifyWaitPositionReachedSemaphore = new SemaphoreSlim(0);
private readonly SemaphoreSlim continueSemaphore = new SemaphoreSlim(0);
private readonly CancellationTokenSource cts = new CancellationTokenSource();
public Decode_Cancellation()
{
this.TopLevelConfiguration.StreamProcessingBufferSize = 128;
}
[Theory]
[InlineData(false)]
[InlineData(true)]
public async Task LoadAsync_Specific_Stream_WhenCancelledDuringRead(bool isInputStreamSeekable)
{
this.isTestStreamSeekable = isInputStreamSeekable;
_ = Task.Factory.StartNew(this.DoCancel, TaskCreationOptions.LongRunning);
await Assert.ThrowsAsync<TaskCanceledException>(() => Image.LoadAsync<Rgb24>(this.TopLevelConfiguration, this.DataStream, this.cts.Token));
}
[Theory]
[InlineData(false)]
[InlineData(true)]
public async Task LoadAsync_Agnostic_Stream_WhenCancelledDuringRead(bool isInputStreamSeekable)
{
this.isTestStreamSeekable = isInputStreamSeekable;
_ = Task.Factory.StartNew(this.DoCancel, TaskCreationOptions.LongRunning);
await Assert.ThrowsAsync<TaskCanceledException>(() => Image.LoadAsync(this.TopLevelConfiguration, this.DataStream, this.cts.Token));
}
[Fact]
public async Task LoadAsync_Agnostic_Path_WhenCancelledDuringRead()
{
this.isTestStreamSeekable = true;
_ = Task.Factory.StartNew(this.DoCancel, TaskCreationOptions.LongRunning);
await Assert.ThrowsAsync<TaskCanceledException>(() => Image.LoadAsync(this.TopLevelConfiguration, this.MockFilePath, this.cts.Token));
}
[Fact]
public async Task LoadAsync_Specific_Path_WhenCancelledDuringRead()
{
this.isTestStreamSeekable = true;
_ = Task.Factory.StartNew(this.DoCancel, TaskCreationOptions.LongRunning);
await Assert.ThrowsAsync<TaskCanceledException>(() => Image.LoadAsync<Rgb24>(this.TopLevelConfiguration, this.MockFilePath, this.cts.Token));
}
private async Task DoCancel()
{
// wait until we reach the middle of the steam
await this.notifyWaitPositionReachedSemaphore.WaitAsync();
// set the cancellation
this.cts.Cancel();
// continue processing the stream
this.continueSemaphore.Release();
}
protected override Stream CreateStream() => this.TestFormat.CreateAsyncSamaphoreStream(this.notifyWaitPositionReachedSemaphore, this.continueSemaphore, this.isTestStreamSeekable);
}
}
}

2
tests/ImageSharp.Tests/Image/ImageTests.DetectFormat.cs

@ -24,8 +24,6 @@ namespace SixLabors.ImageSharp.Tests
private ReadOnlySpan<byte> ActualImageSpan => this.ActualImageBytes.AsSpan();
private byte[] ByteArray => this.DataStream.ToArray();
private IImageFormat LocalImageFormat => this.localImageFormatMock.Object;
private static readonly IImageFormat ExpectedGlobalFormat =

2
tests/ImageSharp.Tests/Image/ImageTests.Identify.cs

@ -21,8 +21,6 @@ namespace SixLabors.ImageSharp.Tests
private byte[] ActualImageBytes => TestFile.Create(TestImages.Bmp.F).Bytes;
private byte[] ByteArray => this.DataStream.ToArray();
private IImageInfo LocalImageInfo => this.localImageInfoMock.Object;
private IImageFormat LocalImageFormat => this.localImageFormatMock.Object;

21
tests/ImageSharp.Tests/Image/ImageTests.ImageLoadTestBase.cs

@ -16,6 +16,8 @@ namespace SixLabors.ImageSharp.Tests
{
public abstract class ImageLoadTestBase : IDisposable
{
private Lazy<Stream> dataStreamLazy;
protected Image<Rgba32> localStreamReturnImageRgba32;
protected Image<Bgra4444> localStreamReturnImageAgnostic;
@ -46,10 +48,12 @@ namespace SixLabors.ImageSharp.Tests
public byte[] Marker { get; }
public MemoryStream DataStream { get; }
public Stream DataStream => this.dataStreamLazy.Value;
public byte[] DecodedData { get; private set; }
protected byte[] ByteArray => ((MemoryStream)this.DataStream).ToArray();
protected ImageLoadTestBase()
{
this.localStreamReturnImageRgba32 = new Image<Rgba32>(1, 1);
@ -61,9 +65,8 @@ namespace SixLabors.ImageSharp.Tests
var detector = new Mock<IImageInfoDetector>();
detector.Setup(x => x.Identify(It.IsAny<Configuration>(), It.IsAny<Stream>())).Returns(this.localImageInfoMock.Object);
detector.Setup(x => x.IdentifyAsync(It.IsAny<Configuration>(), It.IsAny<Stream>(), It.IsAny<CancellationToken>())).ReturnsAsync(this.localImageInfoMock.Object);
this.localDecoder = detector.As<IImageDecoder>();
this.localMimeTypeDetector = new MockImageFormatDetector(this.localImageFormatMock.Object);
this.localDecoder = detector.As<IImageDecoder>();
this.localDecoder.Setup(x => x.Decode<Rgba32>(It.IsAny<Configuration>(), It.IsAny<Stream>()))
.Callback<Configuration, Stream>((c, s) =>
{
@ -86,6 +89,8 @@ namespace SixLabors.ImageSharp.Tests
})
.Returns(this.localStreamReturnImageAgnostic);
this.localMimeTypeDetector = new MockImageFormatDetector(this.localImageFormatMock.Object);
this.LocalConfiguration = new Configuration();
this.LocalConfiguration.ImageFormatsManager.AddImageFormatDetector(this.localMimeTypeDetector);
this.LocalConfiguration.ImageFormatsManager.SetDecoder(this.localImageFormatMock.Object, this.localDecoder.Object);
@ -93,10 +98,12 @@ namespace SixLabors.ImageSharp.Tests
this.TopLevelConfiguration = new Configuration(this.TestFormat);
this.Marker = Guid.NewGuid().ToByteArray();
this.DataStream = this.TestFormat.CreateStream(this.Marker);
this.LocalFileSystemMock.Setup(x => x.OpenRead(this.MockFilePath)).Returns(this.DataStream);
this.topLevelFileSystem.AddFile(this.MockFilePath, this.DataStream);
this.dataStreamLazy = new Lazy<Stream>(this.CreateStream);
Stream StreamFactory() => this.DataStream;
this.LocalFileSystemMock.Setup(x => x.OpenRead(this.MockFilePath)).Returns(StreamFactory);
this.topLevelFileSystem.AddFile(this.MockFilePath, StreamFactory);
this.LocalConfiguration.FileSystem = this.LocalFileSystemMock.Object;
this.TopLevelConfiguration.FileSystem = this.topLevelFileSystem;
}
@ -107,6 +114,8 @@ namespace SixLabors.ImageSharp.Tests
this.localStreamReturnImageRgba32?.Dispose();
this.localStreamReturnImageAgnostic?.Dispose();
}
protected virtual Stream CreateStream() => this.TestFormat.CreateStream(this.Marker);
}
}
}

18
tests/ImageSharp.Tests/Image/ImageTests.Load_FileSystemPath_UseDefaultConfiguration.cs

@ -49,6 +49,15 @@ namespace SixLabors.ImageSharp.Tests
}
}
[Fact]
public async Task Path_Specific_Async()
{
using (var img = await Image.LoadAsync<Rgb24>(Configuration.Default, this.Path))
{
VerifyDecodedImage(img);
}
}
[Fact]
public async Task Path_Agnostic_Configuration_Async()
{
@ -85,6 +94,15 @@ namespace SixLabors.ImageSharp.Tests
}
}
[Fact]
public async Task Path_Decoder_Specific_Async()
{
using (var img = await Image.LoadAsync<Rgb24>(Configuration.Default, this.Path, new BmpDecoder()))
{
VerifyDecodedImage(img);
}
}
[Fact]
public void Path_OutFormat_Specific()
{

2
tests/ImageSharp.Tests/Image/ImageTests.Load_FromBytes_PassLocalConfiguration.cs

@ -14,8 +14,6 @@ namespace SixLabors.ImageSharp.Tests
{
public class Load_FromBytes_PassLocalConfiguration : ImageLoadTestBase
{
private byte[] ByteArray => this.DataStream.ToArray();
private ReadOnlySpan<byte> ByteSpan => this.ByteArray.AsSpan();
[Theory]

2
tests/ImageSharp.Tests/ImageSharp.Tests.csproj

@ -22,7 +22,7 @@
<PackageReference Include="Magick.NET-Q16-AnyCPU" />
<PackageReference Include="Microsoft.DotNet.RemoteExecutor" />
<PackageReference Include="Moq" />
<PackageReference Include="SharpZipLib" />
<PackageReference Include="SharpZipLib" />
<PackageReference Include="System.Drawing.Common" />
</ItemGroup>

10
tests/ImageSharp.Tests/TestFileSystem.cs

@ -12,9 +12,9 @@ namespace SixLabors.ImageSharp.Tests
/// </summary>
public class TestFileSystem : ImageSharp.IO.IFileSystem
{
private readonly Dictionary<string, Stream> fileSystem = new Dictionary<string, Stream>(StringComparer.OrdinalIgnoreCase);
private readonly Dictionary<string, Func<Stream>> fileSystem = new Dictionary<string, Func<Stream>>(StringComparer.OrdinalIgnoreCase);
public void AddFile(string path, Stream data)
public void AddFile(string path, Func<Stream> data)
{
lock (this.fileSystem)
{
@ -29,7 +29,7 @@ namespace SixLabors.ImageSharp.Tests
{
if (this.fileSystem.ContainsKey(path))
{
Stream stream = this.fileSystem[path];
Stream stream = this.fileSystem[path]();
stream.Position = 0;
return stream;
}
@ -45,7 +45,7 @@ namespace SixLabors.ImageSharp.Tests
{
if (this.fileSystem.ContainsKey(path))
{
Stream stream = this.fileSystem[path];
Stream stream = this.fileSystem[path]();
stream.Position = 0;
return stream;
}
@ -54,4 +54,4 @@ namespace SixLabors.ImageSharp.Tests
return File.OpenRead(path);
}
}
}
}

27
tests/ImageSharp.Tests/TestFormat.cs

@ -10,6 +10,7 @@ using System.Threading;
using System.Threading.Tasks;
using SixLabors.ImageSharp.Formats;
using SixLabors.ImageSharp.PixelFormats;
using SixLabors.ImageSharp.Tests.TestUtilities;
using Xunit;
namespace SixLabors.ImageSharp.Tests
@ -33,9 +34,9 @@ namespace SixLabors.ImageSharp.Tests
public List<DecodeOperation> DecodeCalls { get; } = new List<DecodeOperation>();
public IImageEncoder Encoder { get; }
public TestEncoder Encoder { get; }
public IImageDecoder Decoder { get; }
public TestDecoder Decoder { get; }
private byte[] header = Guid.NewGuid().ToByteArray();
@ -53,6 +54,14 @@ namespace SixLabors.ImageSharp.Tests
return ms;
}
public Stream CreateAsyncSamaphoreStream(SemaphoreSlim notifyWaitPositionReachedSemaphore, SemaphoreSlim continueSemaphore, bool seeakable, int size = 1024, int waitAfterPosition = 512)
{
var buffer = new byte[size];
this.header.CopyTo(buffer, 0);
var semaphoreStream = new SemaphoreReadMemoryStream(buffer, waitAfterPosition, notifyWaitPositionReachedSemaphore, continueSemaphore);
return seeakable ? (Stream)semaphoreStream : new AsyncStreamWrapper(semaphoreStream, () => false);
}
public void VerifySpecificDecodeCall<TPixel>(byte[] marker, Configuration config)
where TPixel : unmanaged, IPixel<TPixel>
{
@ -205,9 +214,17 @@ namespace SixLabors.ImageSharp.Tests
public Image<TPixel> Decode<TPixel>(Configuration config, Stream stream)
where TPixel : unmanaged, IPixel<TPixel>
=> this.DecodeImpl<TPixel>(config, stream, default).GetAwaiter().GetResult();
public Task<Image<TPixel>> DecodeAsync<TPixel>(Configuration config, Stream stream, CancellationToken cancellationToken)
where TPixel : unmanaged, IPixel<TPixel>
=> this.DecodeImpl<TPixel>(config, stream, cancellationToken);
private async Task<Image<TPixel>> DecodeImpl<TPixel>(Configuration config, Stream stream, CancellationToken cancellationToken)
where TPixel : unmanaged, IPixel<TPixel>
{
var ms = new MemoryStream();
stream.CopyTo(ms);
await stream.CopyToAsync(ms, config.StreamProcessingBufferSize, cancellationToken);
var marker = ms.ToArray().Skip(this.testFormat.header.Length).ToArray();
this.testFormat.DecodeCalls.Add(new DecodeOperation
{
@ -220,10 +237,6 @@ namespace SixLabors.ImageSharp.Tests
return this.testFormat.Sample<TPixel>();
}
public Task<Image<TPixel>> DecodeAsync<TPixel>(Configuration config, Stream stream, CancellationToken cancellationToken)
where TPixel : unmanaged, IPixel<TPixel>
=> Task.FromResult(this.Decode<TPixel>(config, stream));
public bool IsSupportedFileFormat(Span<byte> header) => this.testFormat.IsSupportedFileFormat(header);
public Image Decode(Configuration configuration, Stream stream) => this.Decode<TestPixelForAgnosticDecode>(configuration, stream);

49
tests/ImageSharp.Tests/TestUtilities/SemaphoreReadMemoryStream.cs

@ -10,62 +10,59 @@ namespace SixLabors.ImageSharp.Tests.TestUtilities
{
internal class SemaphoreReadMemoryStream : MemoryStream
{
private SemaphoreSlim waitSemaphore;
private readonly SemaphoreSlim signalFinishedSemaphore;
private readonly long waitAfterPosition;
private readonly SemaphoreSlim continueSemaphore;
private readonly SemaphoreSlim notifyWaitPositionReachedSemaphore;
private int pauseDone;
private readonly long waitPosition;
public SemaphoreReadMemoryStream(byte[] buffer, SemaphoreSlim waitSemaphore, SemaphoreSlim signalFinishedSemaphore, long waitAfterPosition)
public SemaphoreReadMemoryStream(
byte[] buffer,
long waitPosition,
SemaphoreSlim notifyWaitPositionReachedSemaphore,
SemaphoreSlim continueSemaphore)
: base(buffer)
{
this.waitSemaphore = waitSemaphore;
this.signalFinishedSemaphore = signalFinishedSemaphore;
this.waitAfterPosition = waitAfterPosition;
this.continueSemaphore = continueSemaphore;
this.notifyWaitPositionReachedSemaphore = notifyWaitPositionReachedSemaphore;
this.waitPosition = waitPosition;
}
public override int Read(byte[] buffer, int offset, int count)
{
int read = base.Read(buffer, offset, count);
if (this.Position + read > this.waitAfterPosition)
if (this.Position > this.waitPosition && this.TryPause())
{
this.waitSemaphore.Wait();
this.notifyWaitPositionReachedSemaphore.Release();
this.continueSemaphore.Wait();
}
this.SignalIfFinished();
return read;
}
private bool TryPause() => Interlocked.CompareExchange(ref this.pauseDone, 1, 0) == 0;
public override async Task<int> ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
{
int read = await base.ReadAsync(buffer, offset, count, cancellationToken);
if (this.Position + read > this.waitAfterPosition)
if (this.Position > this.waitPosition && this.TryPause())
{
await this.waitSemaphore.WaitAsync();
this.notifyWaitPositionReachedSemaphore.Release();
await this.continueSemaphore.WaitAsync();
}
this.SignalIfFinished();
return read;
}
public override int ReadByte()
{
if (this.Position + 1 > this.waitAfterPosition)
if (this.Position + 1 > this.waitPosition && this.TryPause())
{
this.waitSemaphore.Wait();
this.notifyWaitPositionReachedSemaphore.Release();
this.continueSemaphore.Wait();
}
int result = base.ReadByte();
this.SignalIfFinished();
return result;
}
private void SignalIfFinished()
{
if (this.Position == this.Length)
{
this.signalFinishedSemaphore.Release();
}
}
}
}

77
tests/ImageSharp.Tests/TestUtilities/Tests/SemaphoreReadMemoryStreamTests.cs

@ -11,83 +11,72 @@ namespace SixLabors.ImageSharp.Tests
{
public class SemaphoreReadMemoryStreamTests
{
private readonly SemaphoreSlim WaitSemaphore = new SemaphoreSlim(0);
private readonly SemaphoreSlim FinishedSemaphore = new SemaphoreSlim(0);
private readonly byte[] Buffer = new byte[128];
private readonly SemaphoreSlim continueSemaphore = new SemaphoreSlim(0);
private readonly SemaphoreSlim notifyWaitPositionReachedSemaphore = new SemaphoreSlim(0);
private readonly byte[] buffer = new byte[128];
[Fact]
public void Read_BeforeWaitLimit_ShouldFinish()
{
using Stream stream = this.GetStream();
int read = stream.Read(this.Buffer);
Assert.Equal(this.Buffer.Length, read);
using Stream stream = this.CreateTestStream();
int read = stream.Read(this.buffer);
Assert.Equal(this.buffer.Length, read);
}
[Fact]
public async Task ReadAsync_BeforeWaitLimit_ShouldFinish()
{
using Stream stream = this.GetStream();
int read = await stream.ReadAsync(this.Buffer);
Assert.Equal(this.Buffer.Length, read);
using Stream stream = this.CreateTestStream();
int read = await stream.ReadAsync(this.buffer);
Assert.Equal(this.buffer.Length, read);
}
[Fact]
public async Task Read_AfterWaitLimit_ShouldPause()
{
using Stream stream = this.GetStream();
stream.Read(this.Buffer);
using Stream stream = this.CreateTestStream();
stream.Read(this.buffer);
Task readTask = Task.Factory.StartNew(() => stream.Read(new byte[512]), TaskCreationOptions.LongRunning);
Task readTask = Task.Factory.StartNew(
() =>
{
stream.Read(this.buffer);
stream.Read(this.buffer);
stream.Read(this.buffer);
stream.Read(this.buffer);
stream.Read(this.buffer);
}, TaskCreationOptions.LongRunning);
Assert.Equal(0, this.notifyWaitPositionReachedSemaphore.CurrentCount);
await Task.Delay(5);
Assert.False(readTask.IsCompleted);
this.WaitSemaphore.Release();
await this.notifyWaitPositionReachedSemaphore.WaitAsync();
await Task.Delay(5);
Assert.False(readTask.IsCompleted);
this.continueSemaphore.Release();
await readTask;
}
[Fact]
public async Task ReadAsync_AfterWaitLimit_ShouldPause()
{
using Stream stream = this.GetStream();
await stream.ReadAsync(this.Buffer);
using Stream stream = this.CreateTestStream();
await stream.ReadAsync(this.buffer);
Task readTask =
Task.Factory.StartNew(() => stream.ReadAsync(new byte[512]).AsTask(), TaskCreationOptions.LongRunning);
await Task.Delay(5);
Assert.False(readTask.IsCompleted);
this.WaitSemaphore.Release();
await this.notifyWaitPositionReachedSemaphore.WaitAsync();
await Task.Delay(5);
Assert.False(readTask.IsCompleted);
this.continueSemaphore.Release();
await readTask;
}
[Fact]
public async Task Read_WhenFinished_ShouldNotify()
{
using Stream stream = this.GetStream(512, int.MaxValue);
stream.Read(this.Buffer);
stream.Read(this.Buffer);
stream.Read(this.Buffer);
Assert.Equal(0, this.FinishedSemaphore.CurrentCount);
stream.Read(this.Buffer);
Assert.Equal(1, this.FinishedSemaphore.CurrentCount);
}
[Fact]
public async Task ReadAsync_WhenFinished_ShouldNotify()
{
using Stream stream = this.GetStream(512, int.MaxValue);
await stream.ReadAsync(this.Buffer);
await stream.ReadAsync(this.Buffer);
await stream.ReadAsync(this.Buffer);
Assert.Equal(0, this.FinishedSemaphore.CurrentCount);
Task lastRead = stream.ReadAsync(this.Buffer).AsTask();
Task finishedTask = this.FinishedSemaphore.WaitAsync();
await Task.WhenAll(lastRead, finishedTask);
}
private Stream GetStream(int size = 1024, int waitAfterPosition = 256)
private Stream CreateTestStream(int size = 1024, int waitAfterPosition = 256)
{
byte[] buffer = new byte[size];
return new SemaphoreReadMemoryStream(buffer, this.WaitSemaphore, this.FinishedSemaphore, waitAfterPosition);
return new SemaphoreReadMemoryStream(buffer, waitAfterPosition, this.notifyWaitPositionReachedSemaphore, this.continueSemaphore);
}
}
}

Loading…
Cancel
Save