// Copyright (c) Six Labors. // Licensed under the Apache License, Version 2.0. using System; using System.IO; using System.Threading; using System.Threading.Tasks; namespace SixLabors.ImageSharp.Tests.TestUtilities { public class PausedStream : Stream { private readonly SemaphoreSlim semaphore = new SemaphoreSlim(0); private readonly CancellationTokenSource cancelationTokenSource = new CancellationTokenSource(); private readonly Stream innerStream; private Action onWaitingCallback; public void OnWaiting(Action onWaitingCallback) => this.onWaitingCallback = onWaitingCallback; public void OnWaiting(Action onWaitingCallback) => this.OnWaiting(_ => onWaitingCallback()); public void Release() { this.semaphore.Release(); this.cancelationTokenSource.Cancel(); } public void Next() => this.semaphore.Release(); private void Wait() { if (this.cancelationTokenSource.IsCancellationRequested) { return; } this.onWaitingCallback?.Invoke(this.innerStream); try { this.semaphore.Wait(this.cancelationTokenSource.Token); } catch (OperationCanceledException) { // ignore this as its just used to unlock any waits in progress } } private async Task Await(Func action) { await Task.Yield(); this.Wait(); await action(); } private async Task Await(Func> action) { await Task.Yield(); this.Wait(); return await action(); } private T Await(Func action) { this.Wait(); return action(); } private void Await(Action action) { this.Wait(); action(); } public PausedStream(byte[] data) : this(new MemoryStream(data)) { } public PausedStream(string filePath) : this(File.OpenRead(filePath)) { } public PausedStream(Stream innerStream) => this.innerStream = innerStream; public override bool CanTimeout => this.innerStream.CanTimeout; public override void Close() => this.Await(() => this.innerStream.Close()); public override Task CopyToAsync(Stream destination, int bufferSize, CancellationToken cancellationToken) => this.Await(() => this.innerStream.CopyToAsync(destination, bufferSize, cancellationToken)); public override bool CanRead => this.innerStream.CanRead; public override bool CanSeek => this.innerStream.CanSeek; public override bool CanWrite => this.innerStream.CanWrite; public override long Length => this.Await(() => this.innerStream.Length); public override long Position { get => this.Await(() => this.innerStream.Position); set => this.Await(() => this.innerStream.Position = value); } public override void Flush() => this.Await(() => this.innerStream.Flush()); public override int Read(byte[] buffer, int offset, int count) => this.Await(() => this.innerStream.Read(buffer, offset, count)); public override long Seek(long offset, SeekOrigin origin) => this.Await(() => this.innerStream.Seek(offset, origin)); public override void SetLength(long value) => this.Await(() => this.innerStream.SetLength(value)); public override void Write(byte[] buffer, int offset, int count) => this.Await(() => this.innerStream.Write(buffer, offset, count)); public override Task ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken) => this.Await(() => this.innerStream.ReadAsync(buffer, offset, count, cancellationToken)); public override Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken) => this.Await(() => this.innerStream.WriteAsync(buffer, offset, count, cancellationToken)); public override void WriteByte(byte value) => this.Await(() => this.innerStream.WriteByte(value)); public override int ReadByte() => this.Await(() => this.innerStream.ReadByte()); protected override void Dispose(bool disposing) => this.innerStream.Dispose(); #if NETCOREAPP public override void CopyTo(Stream destination, int bufferSize) => this.Await(() => this.innerStream.CopyTo(destination, bufferSize)); public override int Read(Span buffer) { this.Wait(); return this.innerStream.Read(buffer); } public override ValueTask ReadAsync(Memory buffer, CancellationToken cancellationToken = default) => this.Await(() => this.innerStream.ReadAsync(buffer, cancellationToken)); public override void Write(ReadOnlySpan buffer) { this.Wait(); this.innerStream.Write(buffer); } public override ValueTask WriteAsync(ReadOnlyMemory buffer, CancellationToken cancellationToken = default) => this.Await(() => this.innerStream.WriteAsync(buffer, cancellationToken)); #endif } }