Browse Source

SemaphoreReadMemoryStream

pull/1296/head
Anton Firszov 6 years ago
parent
commit
672420cbbd
  1. 71
      tests/ImageSharp.Tests/TestUtilities/SemaphoreReadMemoryStream.cs
  2. 93
      tests/ImageSharp.Tests/TestUtilities/Tests/SemaphoreReadMemoryStreamTests.cs

71
tests/ImageSharp.Tests/TestUtilities/SemaphoreReadMemoryStream.cs

@ -0,0 +1,71 @@
// Copyright (c) Six Labors.
// Licensed under the Apache License, Version 2.0.
using System;
using System.IO;
using System.Threading;
using System.Threading.Tasks;
namespace SixLabors.ImageSharp.Tests.TestUtilities
{
internal class SemaphoreReadMemoryStream : MemoryStream
{
private SemaphoreSlim waitSemaphore;
private readonly SemaphoreSlim signalFinishedSemaphore;
private readonly long waitAfterPosition;
public SemaphoreReadMemoryStream(byte[] buffer, SemaphoreSlim waitSemaphore, SemaphoreSlim signalFinishedSemaphore, long waitAfterPosition)
: base(buffer)
{
this.waitSemaphore = waitSemaphore;
this.signalFinishedSemaphore = signalFinishedSemaphore;
this.waitAfterPosition = waitAfterPosition;
}
public override int Read(byte[] buffer, int offset, int count)
{
int read = base.Read(buffer, offset, count);
if (this.Position + read > this.waitAfterPosition)
{
this.waitSemaphore.Wait();
}
this.SignalIfFinished();
return read;
}
public override async Task<int> ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
{
int read = await base.ReadAsync(buffer, offset, count, cancellationToken);
if (this.Position + read > this.waitAfterPosition)
{
await this.waitSemaphore.WaitAsync();
}
this.SignalIfFinished();
return read;
}
public override int ReadByte()
{
if (this.Position + 1 > this.waitAfterPosition)
{
this.waitSemaphore.Wait();
}
int result = base.ReadByte();
this.SignalIfFinished();
return result;
}
private void SignalIfFinished()
{
if (this.Position == this.Length)
{
this.signalFinishedSemaphore.Release();
}
}
}
}

93
tests/ImageSharp.Tests/TestUtilities/Tests/SemaphoreReadMemoryStreamTests.cs

@ -0,0 +1,93 @@
// Copyright (c) Six Labors.
// Licensed under the Apache License, Version 2.0.
using System.IO;
using System.Threading;
using System.Threading.Tasks;
using SixLabors.ImageSharp.Tests.TestUtilities;
using Xunit;
namespace SixLabors.ImageSharp.Tests
{
public class SemaphoreReadMemoryStreamTests
{
private readonly SemaphoreSlim WaitSemaphore = new SemaphoreSlim(0);
private readonly SemaphoreSlim FinishedSemaphore = new SemaphoreSlim(0);
private readonly byte[] Buffer = new byte[128];
[Fact]
public void Read_BeforeWaitLimit_ShouldFinish()
{
using Stream stream = this.GetStream();
int read = stream.Read(this.Buffer);
Assert.Equal(this.Buffer.Length, read);
}
[Fact]
public async Task ReadAsync_BeforeWaitLimit_ShouldFinish()
{
using Stream stream = this.GetStream();
int read = await stream.ReadAsync(this.Buffer);
Assert.Equal(this.Buffer.Length, read);
}
[Fact]
public async Task Read_AfterWaitLimit_ShouldPause()
{
using Stream stream = this.GetStream();
stream.Read(this.Buffer);
Task readTask = Task.Factory.StartNew(() => stream.Read(new byte[512]), TaskCreationOptions.LongRunning);
await Task.Delay(5);
Assert.False(readTask.IsCompleted);
this.WaitSemaphore.Release();
await readTask;
}
[Fact]
public async Task ReadAsync_AfterWaitLimit_ShouldPause()
{
using Stream stream = this.GetStream();
await stream.ReadAsync(this.Buffer);
Task readTask =
Task.Factory.StartNew(() => stream.ReadAsync(new byte[512]).AsTask(), TaskCreationOptions.LongRunning);
await Task.Delay(5);
Assert.False(readTask.IsCompleted);
this.WaitSemaphore.Release();
await readTask;
}
[Fact]
public async Task Read_WhenFinished_ShouldNotify()
{
using Stream stream = this.GetStream(512, int.MaxValue);
stream.Read(this.Buffer);
stream.Read(this.Buffer);
stream.Read(this.Buffer);
Assert.Equal(0, this.FinishedSemaphore.CurrentCount);
stream.Read(this.Buffer);
Assert.Equal(1, this.FinishedSemaphore.CurrentCount);
}
[Fact]
public async Task ReadAsync_WhenFinished_ShouldNotify()
{
using Stream stream = this.GetStream(512, int.MaxValue);
await stream.ReadAsync(this.Buffer);
await stream.ReadAsync(this.Buffer);
await stream.ReadAsync(this.Buffer);
Assert.Equal(0, this.FinishedSemaphore.CurrentCount);
Task lastRead = stream.ReadAsync(this.Buffer).AsTask();
Task finishedTask = this.FinishedSemaphore.WaitAsync();
await Task.WhenAll(lastRead, finishedTask);
}
private Stream GetStream(int size = 1024, int waitAfterPosition = 256)
{
byte[] buffer = new byte[size];
return new SemaphoreReadMemoryStream(buffer, this.WaitSemaphore, this.FinishedSemaphore, waitAfterPosition);
}
}
}
Loading…
Cancel
Save