mirror of https://github.com/SixLabors/ImageSharp
2 changed files with 164 additions and 0 deletions
@ -0,0 +1,71 @@ |
|||||
|
// Copyright (c) Six Labors.
|
||||
|
// Licensed under the Apache License, Version 2.0.
|
||||
|
|
||||
|
using System; |
||||
|
using System.IO; |
||||
|
using System.Threading; |
||||
|
using System.Threading.Tasks; |
||||
|
|
||||
|
namespace SixLabors.ImageSharp.Tests.TestUtilities |
||||
|
{ |
||||
|
internal class SemaphoreReadMemoryStream : MemoryStream |
||||
|
{ |
||||
|
private SemaphoreSlim waitSemaphore; |
||||
|
private readonly SemaphoreSlim signalFinishedSemaphore; |
||||
|
private readonly long waitAfterPosition; |
||||
|
|
||||
|
public SemaphoreReadMemoryStream(byte[] buffer, SemaphoreSlim waitSemaphore, SemaphoreSlim signalFinishedSemaphore, long waitAfterPosition) |
||||
|
: base(buffer) |
||||
|
{ |
||||
|
this.waitSemaphore = waitSemaphore; |
||||
|
this.signalFinishedSemaphore = signalFinishedSemaphore; |
||||
|
this.waitAfterPosition = waitAfterPosition; |
||||
|
} |
||||
|
|
||||
|
public override int Read(byte[] buffer, int offset, int count) |
||||
|
{ |
||||
|
int read = base.Read(buffer, offset, count); |
||||
|
if (this.Position + read > this.waitAfterPosition) |
||||
|
{ |
||||
|
this.waitSemaphore.Wait(); |
||||
|
} |
||||
|
|
||||
|
this.SignalIfFinished(); |
||||
|
|
||||
|
return read; |
||||
|
} |
||||
|
|
||||
|
public override async Task<int> ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken) |
||||
|
{ |
||||
|
int read = await base.ReadAsync(buffer, offset, count, cancellationToken); |
||||
|
if (this.Position + read > this.waitAfterPosition) |
||||
|
{ |
||||
|
await this.waitSemaphore.WaitAsync(); |
||||
|
} |
||||
|
|
||||
|
this.SignalIfFinished(); |
||||
|
|
||||
|
return read; |
||||
|
} |
||||
|
|
||||
|
public override int ReadByte() |
||||
|
{ |
||||
|
if (this.Position + 1 > this.waitAfterPosition) |
||||
|
{ |
||||
|
this.waitSemaphore.Wait(); |
||||
|
} |
||||
|
|
||||
|
int result = base.ReadByte(); |
||||
|
this.SignalIfFinished(); |
||||
|
return result; |
||||
|
} |
||||
|
|
||||
|
private void SignalIfFinished() |
||||
|
{ |
||||
|
if (this.Position == this.Length) |
||||
|
{ |
||||
|
this.signalFinishedSemaphore.Release(); |
||||
|
} |
||||
|
} |
||||
|
} |
||||
|
} |
||||
@ -0,0 +1,93 @@ |
|||||
|
// Copyright (c) Six Labors.
|
||||
|
// Licensed under the Apache License, Version 2.0.
|
||||
|
|
||||
|
using System.IO; |
||||
|
using System.Threading; |
||||
|
using System.Threading.Tasks; |
||||
|
using SixLabors.ImageSharp.Tests.TestUtilities; |
||||
|
using Xunit; |
||||
|
|
||||
|
namespace SixLabors.ImageSharp.Tests |
||||
|
{ |
||||
|
public class SemaphoreReadMemoryStreamTests |
||||
|
{ |
||||
|
private readonly SemaphoreSlim WaitSemaphore = new SemaphoreSlim(0); |
||||
|
private readonly SemaphoreSlim FinishedSemaphore = new SemaphoreSlim(0); |
||||
|
private readonly byte[] Buffer = new byte[128]; |
||||
|
|
||||
|
[Fact] |
||||
|
public void Read_BeforeWaitLimit_ShouldFinish() |
||||
|
{ |
||||
|
using Stream stream = this.GetStream(); |
||||
|
int read = stream.Read(this.Buffer); |
||||
|
Assert.Equal(this.Buffer.Length, read); |
||||
|
} |
||||
|
|
||||
|
[Fact] |
||||
|
public async Task ReadAsync_BeforeWaitLimit_ShouldFinish() |
||||
|
{ |
||||
|
using Stream stream = this.GetStream(); |
||||
|
int read = await stream.ReadAsync(this.Buffer); |
||||
|
Assert.Equal(this.Buffer.Length, read); |
||||
|
} |
||||
|
|
||||
|
[Fact] |
||||
|
public async Task Read_AfterWaitLimit_ShouldPause() |
||||
|
{ |
||||
|
using Stream stream = this.GetStream(); |
||||
|
stream.Read(this.Buffer); |
||||
|
|
||||
|
Task readTask = Task.Factory.StartNew(() => stream.Read(new byte[512]), TaskCreationOptions.LongRunning); |
||||
|
await Task.Delay(5); |
||||
|
Assert.False(readTask.IsCompleted); |
||||
|
this.WaitSemaphore.Release(); |
||||
|
await readTask; |
||||
|
} |
||||
|
|
||||
|
[Fact] |
||||
|
public async Task ReadAsync_AfterWaitLimit_ShouldPause() |
||||
|
{ |
||||
|
using Stream stream = this.GetStream(); |
||||
|
await stream.ReadAsync(this.Buffer); |
||||
|
|
||||
|
Task readTask = |
||||
|
Task.Factory.StartNew(() => stream.ReadAsync(new byte[512]).AsTask(), TaskCreationOptions.LongRunning); |
||||
|
await Task.Delay(5); |
||||
|
Assert.False(readTask.IsCompleted); |
||||
|
this.WaitSemaphore.Release(); |
||||
|
await readTask; |
||||
|
} |
||||
|
|
||||
|
[Fact] |
||||
|
public async Task Read_WhenFinished_ShouldNotify() |
||||
|
{ |
||||
|
using Stream stream = this.GetStream(512, int.MaxValue); |
||||
|
stream.Read(this.Buffer); |
||||
|
stream.Read(this.Buffer); |
||||
|
stream.Read(this.Buffer); |
||||
|
Assert.Equal(0, this.FinishedSemaphore.CurrentCount); |
||||
|
stream.Read(this.Buffer); |
||||
|
Assert.Equal(1, this.FinishedSemaphore.CurrentCount); |
||||
|
} |
||||
|
|
||||
|
[Fact] |
||||
|
public async Task ReadAsync_WhenFinished_ShouldNotify() |
||||
|
{ |
||||
|
using Stream stream = this.GetStream(512, int.MaxValue); |
||||
|
await stream.ReadAsync(this.Buffer); |
||||
|
await stream.ReadAsync(this.Buffer); |
||||
|
await stream.ReadAsync(this.Buffer); |
||||
|
Assert.Equal(0, this.FinishedSemaphore.CurrentCount); |
||||
|
|
||||
|
Task lastRead = stream.ReadAsync(this.Buffer).AsTask(); |
||||
|
Task finishedTask = this.FinishedSemaphore.WaitAsync(); |
||||
|
await Task.WhenAll(lastRead, finishedTask); |
||||
|
} |
||||
|
|
||||
|
private Stream GetStream(int size = 1024, int waitAfterPosition = 256) |
||||
|
{ |
||||
|
byte[] buffer = new byte[size]; |
||||
|
return new SemaphoreReadMemoryStream(buffer, this.WaitSemaphore, this.FinishedSemaphore, waitAfterPosition); |
||||
|
} |
||||
|
} |
||||
|
} |
||||
Loading…
Reference in new issue