From 672420cbbd9ad1dc417f7cdad2f1121247a62837 Mon Sep 17 00:00:00 2001 From: Anton Firszov Date: Wed, 22 Jul 2020 04:28:19 +0200 Subject: [PATCH] SemaphoreReadMemoryStream --- .../SemaphoreReadMemoryStream.cs | 71 ++++++++++++++ .../Tests/SemaphoreReadMemoryStreamTests.cs | 93 +++++++++++++++++++ 2 files changed, 164 insertions(+) create mode 100644 tests/ImageSharp.Tests/TestUtilities/SemaphoreReadMemoryStream.cs create mode 100644 tests/ImageSharp.Tests/TestUtilities/Tests/SemaphoreReadMemoryStreamTests.cs diff --git a/tests/ImageSharp.Tests/TestUtilities/SemaphoreReadMemoryStream.cs b/tests/ImageSharp.Tests/TestUtilities/SemaphoreReadMemoryStream.cs new file mode 100644 index 000000000..296611bb5 --- /dev/null +++ b/tests/ImageSharp.Tests/TestUtilities/SemaphoreReadMemoryStream.cs @@ -0,0 +1,71 @@ +// Copyright (c) Six Labors. +// Licensed under the Apache License, Version 2.0. + +using System; +using System.IO; +using System.Threading; +using System.Threading.Tasks; + +namespace SixLabors.ImageSharp.Tests.TestUtilities +{ + internal class SemaphoreReadMemoryStream : MemoryStream + { + private SemaphoreSlim waitSemaphore; + private readonly SemaphoreSlim signalFinishedSemaphore; + private readonly long waitAfterPosition; + + public SemaphoreReadMemoryStream(byte[] buffer, SemaphoreSlim waitSemaphore, SemaphoreSlim signalFinishedSemaphore, long waitAfterPosition) + : base(buffer) + { + this.waitSemaphore = waitSemaphore; + this.signalFinishedSemaphore = signalFinishedSemaphore; + this.waitAfterPosition = waitAfterPosition; + } + + public override int Read(byte[] buffer, int offset, int count) + { + int read = base.Read(buffer, offset, count); + if (this.Position + read > this.waitAfterPosition) + { + this.waitSemaphore.Wait(); + } + + this.SignalIfFinished(); + + return read; + } + + public override async Task ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken) + { + int read = await base.ReadAsync(buffer, offset, count, cancellationToken); + if (this.Position + read > this.waitAfterPosition) + { + await this.waitSemaphore.WaitAsync(); + } + + this.SignalIfFinished(); + + return read; + } + + public override int ReadByte() + { + if (this.Position + 1 > this.waitAfterPosition) + { + this.waitSemaphore.Wait(); + } + + int result = base.ReadByte(); + this.SignalIfFinished(); + return result; + } + + private void SignalIfFinished() + { + if (this.Position == this.Length) + { + this.signalFinishedSemaphore.Release(); + } + } + } +} diff --git a/tests/ImageSharp.Tests/TestUtilities/Tests/SemaphoreReadMemoryStreamTests.cs b/tests/ImageSharp.Tests/TestUtilities/Tests/SemaphoreReadMemoryStreamTests.cs new file mode 100644 index 000000000..ac90d5fb2 --- /dev/null +++ b/tests/ImageSharp.Tests/TestUtilities/Tests/SemaphoreReadMemoryStreamTests.cs @@ -0,0 +1,93 @@ +// Copyright (c) Six Labors. +// Licensed under the Apache License, Version 2.0. + +using System.IO; +using System.Threading; +using System.Threading.Tasks; +using SixLabors.ImageSharp.Tests.TestUtilities; +using Xunit; + +namespace SixLabors.ImageSharp.Tests +{ + public class SemaphoreReadMemoryStreamTests + { + private readonly SemaphoreSlim WaitSemaphore = new SemaphoreSlim(0); + private readonly SemaphoreSlim FinishedSemaphore = new SemaphoreSlim(0); + private readonly byte[] Buffer = new byte[128]; + + [Fact] + public void Read_BeforeWaitLimit_ShouldFinish() + { + using Stream stream = this.GetStream(); + int read = stream.Read(this.Buffer); + Assert.Equal(this.Buffer.Length, read); + } + + [Fact] + public async Task ReadAsync_BeforeWaitLimit_ShouldFinish() + { + using Stream stream = this.GetStream(); + int read = await stream.ReadAsync(this.Buffer); + Assert.Equal(this.Buffer.Length, read); + } + + [Fact] + public async Task Read_AfterWaitLimit_ShouldPause() + { + using Stream stream = this.GetStream(); + stream.Read(this.Buffer); + + Task readTask = Task.Factory.StartNew(() => stream.Read(new byte[512]), TaskCreationOptions.LongRunning); + await Task.Delay(5); + Assert.False(readTask.IsCompleted); + this.WaitSemaphore.Release(); + await readTask; + } + + [Fact] + public async Task ReadAsync_AfterWaitLimit_ShouldPause() + { + using Stream stream = this.GetStream(); + await stream.ReadAsync(this.Buffer); + + Task readTask = + Task.Factory.StartNew(() => stream.ReadAsync(new byte[512]).AsTask(), TaskCreationOptions.LongRunning); + await Task.Delay(5); + Assert.False(readTask.IsCompleted); + this.WaitSemaphore.Release(); + await readTask; + } + + [Fact] + public async Task Read_WhenFinished_ShouldNotify() + { + using Stream stream = this.GetStream(512, int.MaxValue); + stream.Read(this.Buffer); + stream.Read(this.Buffer); + stream.Read(this.Buffer); + Assert.Equal(0, this.FinishedSemaphore.CurrentCount); + stream.Read(this.Buffer); + Assert.Equal(1, this.FinishedSemaphore.CurrentCount); + } + + [Fact] + public async Task ReadAsync_WhenFinished_ShouldNotify() + { + using Stream stream = this.GetStream(512, int.MaxValue); + await stream.ReadAsync(this.Buffer); + await stream.ReadAsync(this.Buffer); + await stream.ReadAsync(this.Buffer); + Assert.Equal(0, this.FinishedSemaphore.CurrentCount); + + Task lastRead = stream.ReadAsync(this.Buffer).AsTask(); + Task finishedTask = this.FinishedSemaphore.WaitAsync(); + await Task.WhenAll(lastRead, finishedTask); + } + + private Stream GetStream(int size = 1024, int waitAfterPosition = 256) + { + byte[] buffer = new byte[size]; + return new SemaphoreReadMemoryStream(buffer, this.WaitSemaphore, this.FinishedSemaphore, waitAfterPosition); + } + } +}