Browse Source

Refactoring/channels (#745)

* Use channels for easier code.

* Remove unused code.
pull/748/head
Sebastian Stehle 4 years ago
committed by GitHub
parent
commit
ed4d019706
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
  1. 2
      backend/src/Squidex.Domain.Apps.Entities.MongoDb/Assets/MongoAssetFolderRepository_SnapshotStore.cs
  2. 2
      backend/src/Squidex.Domain.Apps.Entities.MongoDb/Assets/MongoAssetRepository_SnapshotStore.cs
  3. 53
      backend/src/Squidex.Infrastructure.MongoDb/MongoDb/MongoExtensions.cs
  4. 5
      backend/src/Squidex.Infrastructure.MongoDb/States/MongoSnapshotStore.cs
  5. 221
      backend/src/Squidex.Infrastructure/EventSourcing/Grains/BatchSubscriber.cs
  6. 28
      backend/src/Squidex.Infrastructure/EventSourcing/Grains/EventConsumerGrain.cs
  7. 68
      backend/src/Squidex.Infrastructure/Tasks/AsyncHelper.cs
  8. 1
      backend/tests/Squidex.Infrastructure.Tests/EventSourcing/Grains/EventConsumerGrainTests.cs
  9. 168
      backend/tests/Squidex.Infrastructure.Tests/MongoDb/MongoExtensionsTests.cs
  10. 2
      backend/tests/Squidex.Infrastructure.Tests/MongoDb/MongoFieldTests.cs
  11. 9
      backend/tests/Squidex.Infrastructure.Tests/Tasks/PartitionedActionBlockTests.cs

2
backend/src/Squidex.Domain.Apps.Entities.MongoDb/Assets/MongoAssetFolderRepository_SnapshotStore.cs

@@ -76,7 +76,7 @@ namespace Squidex.Domain.Apps.Entities.MongoDb.Assets
{
using (Profiler.TraceMethod<MongoAssetFolderRepository>())
{
await Collection.Find(new BsonDocument(), Batching.Options).ForEachPipedAsync(x => callback(Map(x), x.Version), ct);
await Collection.Find(new BsonDocument(), Batching.Options).ForEachAsync(x => callback(Map(x), x.Version), ct);
}
}

2
backend/src/Squidex.Domain.Apps.Entities.MongoDb/Assets/MongoAssetRepository_SnapshotStore.cs

@@ -76,7 +76,7 @@ namespace Squidex.Domain.Apps.Entities.MongoDb.Assets
{
using (Profiler.TraceMethod<MongoAssetRepository>())
{
await Collection.Find(new BsonDocument(), Batching.Options).ForEachPipedAsync(x => callback(Map(x), x.Version), ct);
await Collection.Find(new BsonDocument(), Batching.Options).ForEachAsync(x => callback(Map(x), x.Version), ct);
}
}

53
backend/src/Squidex.Infrastructure.MongoDb/MongoDb/MongoExtensions.cs

@@ -9,7 +9,6 @@ using System;
using System.Linq.Expressions;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;
using MongoDB.Bson;
using MongoDB.Driver;
using Squidex.Infrastructure.States;
@@ -173,58 +172,6 @@ namespace Squidex.Infrastructure.MongoDb
}
}
public static async Task ForEachPipedAsync<T>(this IAsyncCursorSource<T> source, Func<T, Task> processor, CancellationToken cancellationToken = default)
{
using (var cursor = await source.ToCursorAsync(cancellationToken))
{
await cursor.ForEachPipedAsync(processor, cancellationToken);
}
}
public static async Task ForEachPipedAsync<T>(this IAsyncCursor<T> source, Func<T, Task> processor, CancellationToken cancellationToken = default)
{
using (var selfToken = new CancellationTokenSource())
{
using (var combined = CancellationTokenSource.CreateLinkedTokenSource(selfToken.Token, cancellationToken))
{
var actionBlock =
new ActionBlock<T>(async x =>
{
if (!combined.IsCancellationRequested)
{
await processor(x);
}
},
new ExecutionDataflowBlockOptions
{
MaxDegreeOfParallelism = 1,
MaxMessagesPerTask = 1,
BoundedCapacity = Batching.BufferSize
});
try
{
await source.ForEachAsync(async i =>
{
if (!await actionBlock.SendAsync(i, combined.Token))
{
selfToken.Cancel();
}
}, combined.Token);
actionBlock.Complete();
}
catch (Exception ex)
{
((IDataflowBlock)actionBlock).Fault(ex);
}
finally
{
await actionBlock.Completion;
}
}
}
}
public static async Task<Version> GetVersionAsync(this IMongoDatabase database)
{
var command =

5
backend/src/Squidex.Infrastructure.MongoDb/States/MongoSnapshotStore.cs

@@ -93,11 +93,12 @@ namespace Squidex.Infrastructure.States
}
}
public async Task ReadAllAsync(Func<T, long, Task> callback, CancellationToken ct = default)
public async Task ReadAllAsync(Func<T, long, Task> callback,
CancellationToken ct = default)
{
using (Profiler.TraceMethod<MongoSnapshotStore<T>>())
{
await Collection.Find(new BsonDocument(), options: Batching.Options).ForEachPipedAsync(x => callback(x.Doc, x.Version), ct);
await Collection.Find(new BsonDocument(), options: Batching.Options).ForEachAsync(x => callback(x.Doc, x.Version), ct);
}
}

221
backend/src/Squidex.Infrastructure/EventSourcing/Grains/BatchSubscriber.cs

@@ -6,194 +6,169 @@
// ==========================================================================
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;
using Squidex.Infrastructure.Tasks;
#pragma warning disable RECS0082 // Parameter has the same name as a member and hides it
#pragma warning disable SA1313 // Parameter names should begin with lower-case letter
namespace Squidex.Infrastructure.EventSourcing.Grains
{
internal sealed class BatchSubscriber : IEventSubscriber
internal sealed class BatchSubscriber : IEventSubscriber, IEventSubscription
{
private readonly ITargetBlock<Job> pipelineStart;
private readonly IEventSubscription eventSubscription;
private readonly IDataflowBlock pipelineEnd;
private readonly Channel<object> taskQueue;
private readonly Channel<EventSource> parseQueue;
private readonly Task handleTask;
private readonly CancellationTokenSource completed = new CancellationTokenSource();
public object? Sender
{
get => eventSubscription.Sender!;
}
private sealed class Job
{
public StoredEvent? StoredEvent { get; init; }
public Exception? Exception { get; set; }
public Envelope<IEvent>? Event { get; set; }
public bool ShouldHandle { get; set; }
public object Sender { get; init; }
}
private sealed record EventSource(StoredEvent StoredEvent, object Sender);
private sealed record BatchItem(Envelope<IEvent>? Event, string Position, object Sender);
private sealed record BatchJob(BatchItem[] Items);
private sealed record ErrorJob(Exception Exception, object? Sender);
public BatchSubscriber(
EventConsumerGrain grain,
IEventDataFormatter eventDataFormatter,
IEventConsumer eventConsumer,
Func<IEventSubscriber, IEventSubscription> factory,
TaskScheduler scheduler)
Func<IEventSubscriber, IEventSubscription> factory)
{
eventSubscription = factory(this);
var batchSize = Math.Max(1, eventConsumer.BatchSize);
var batchDelay = Math.Max(100, eventConsumer.BatchDelay);
var parse = new TransformBlock<Job, Job>(job =>
parseQueue = Channel.CreateBounded<EventSource>(new BoundedChannelOptions(batchSize)
{
try
{
if (job.StoredEvent != null)
{
job.ShouldHandle = eventConsumer.Handles(job.StoredEvent);
}
if (job.ShouldHandle)
{
try
{
job.Event = eventDataFormatter.ParseIfKnown(job.StoredEvent!);
}
catch (Exception ex)
{
job.Exception = ex;
}
}
AllowSynchronousContinuations = true,
SingleReader = true,
SingleWriter = true
});
return job;
}
catch (OperationCanceledException ex)
{
// Dataflow swallows operation cancelled exception.
throw new AggregateException(ex);
}
}, new ExecutionDataflowBlockOptions
taskQueue = Channel.CreateBounded<object>(new BoundedChannelOptions(2)
{
MaxDegreeOfParallelism = 1,
MaxMessagesPerTask = 10,
BoundedCapacity = batchSize
SingleReader = true,
SingleWriter = true
});
var buffer = AsyncHelper.CreateBatchBlock<Job>(batchSize, batchDelay, new GroupingDataflowBlockOptions
var batchQueue = Channel.CreateBounded<object>(new BoundedChannelOptions(batchSize)
{
BoundedCapacity = batchSize * 2
AllowSynchronousContinuations = true,
SingleReader = true,
SingleWriter = true
});
var handle = new ActionBlock<IList<Job>>(async jobs =>
batchQueue.Batch<BatchItem, object>(taskQueue, x => new BatchJob(x.ToArray()), batchSize, batchDelay);
Task.Run(async () =>
{
try
await foreach (var (storedEvent, sender) in parseQueue.Reader.ReadAllAsync(completed.Token))
{
var sender = eventSubscription?.Sender;
foreach (var jobsBySender in jobs.GroupBy(x => x.Sender))
try
{
if (sender != null && ReferenceEquals(jobsBySender.Key, sender))
{
var exception = jobs.FirstOrDefault(x => x.Exception != null)?.Exception;
var shouldHandle = eventConsumer.Handles(storedEvent);
if (exception != null)
{
await grain.OnErrorAsync(sender, exception);
}
else
{
await grain.OnEventsAsync(sender, GetEvents(jobsBySender), GetPosition(jobsBySender));
}
Envelope<IEvent>? @event = null;
if (eventConsumer.Handles(storedEvent))
{
@event = eventDataFormatter.ParseIfKnown(storedEvent);
}
await batchQueue.Writer.WriteAsync(new BatchItem(@event, storedEvent.EventPosition, sender), completed.Token);
}
catch (Exception ex)
{
await taskQueue.Writer.WriteAsync(new ErrorJob(ex, sender), completed.Token);
}
}
catch (OperationCanceledException ex)
{
// Dataflow swallows operation cancelled exception.
throw new AggregateException(ex);
}
},
new ExecutionDataflowBlockOptions
{
BoundedCapacity = 2,
MaxDegreeOfParallelism = 1,
MaxMessagesPerTask = 1,
TaskScheduler = scheduler
});
}).ContinueWith(x => batchQueue.Writer.TryComplete(x.Exception));
parse.LinkTo(buffer, new DataflowLinkOptions
{
PropagateCompletion = true
});
handleTask = Run(grain);
}
buffer.LinkTo(handle, new DataflowLinkOptions
private async Task Run(EventConsumerGrain grain)
{
await foreach (var task in taskQueue.Reader.ReadAllAsync())
{
PropagateCompletion = true,
});
var sender = eventSubscription?.Sender;
pipelineStart = parse;
pipelineEnd = handle;
if (sender == null)
{
continue;
}
eventSubscription = factory(this);
}
switch (task)
{
case ErrorJob error when error.Exception is not OperationCanceledException:
{
if (ReferenceEquals(error.Sender, sender))
{
await grain.OnErrorAsync(sender, error.Exception);
}
private static List<Envelope<IEvent>> GetEvents(IEnumerable<Job> jobsBySender)
{
return jobsBySender.NotNull(x => x.Event).ToList();
}
break;
}
private static string GetPosition(IEnumerable<Job> jobsBySender)
{
return jobsBySender.Last().StoredEvent!.EventPosition;
case BatchJob batch:
{
foreach (var itemsBySender in batch.Items.GroupBy(x => x.Sender))
{
if (ReferenceEquals(itemsBySender.Key, sender))
{
var position = itemsBySender.Last().Position;
await grain.OnEventsAsync(sender, itemsBySender.Select(x => x.Event).NotNull().ToList(), position);
}
}
break;
}
}
}
}
public Task CompleteAsync()
{
pipelineStart.Complete();
parseQueue.Writer.TryComplete();
return pipelineEnd.Completion;
return handleTask;
}
public void WakeUp()
void IEventSubscription.Unsubscribe()
{
eventSubscription.WakeUp();
}
completed.Cancel();
public void Unsubscribe()
{
eventSubscription.Unsubscribe();
}
public Task OnEventAsync(IEventSubscription subscription, StoredEvent storedEvent)
void IEventSubscription.WakeUp()
{
var job = new Job
{
Sender = subscription,
StoredEvent = storedEvent
};
return pipelineStart.SendAsync(job);
eventSubscription.WakeUp();
}
public Task OnErrorAsync(IEventSubscription subscription, Exception exception)
async Task IEventSubscriber.OnEventAsync(IEventSubscription subscription, StoredEvent storedEvent)
{
if (exception is OperationCanceledException)
if (subscription.Sender != null)
{
return Task.CompletedTask;
await parseQueue.Writer.WriteAsync(new EventSource(storedEvent, subscription.Sender), completed.Token);
}
}
var job = new Job
async Task IEventSubscriber.OnErrorAsync(IEventSubscription subscription, Exception exception)
{
if (subscription.Sender != null && exception is not OperationCanceledException)
{
Sender = subscription,
StoredEvent = null,
Exception = exception
};
return pipelineStart.SendAsync(job);
await taskQueue.Writer.WriteAsync(new ErrorJob(exception, subscription.Sender), completed.Token);
}
}
}
}

28
backend/src/Squidex.Infrastructure/EventSourcing/Grains/EventConsumerGrain.cs

@@ -24,8 +24,7 @@ namespace Squidex.Infrastructure.EventSourcing.Grains
private readonly IEventStore eventStore;
private readonly ISemanticLog log;
private readonly SemaphoreSlim semaphore = new SemaphoreSlim(1);
private TaskScheduler? scheduler;
private BatchSubscriber? currentSubscriber;
private IEventSubscription? currentSubscription;
private IEventConsumer? eventConsumer;
private EventConsumerState State
@@ -51,8 +50,6 @@ namespace Squidex.Infrastructure.EventSourcing.Grains
protected override Task OnActivateAsync(string key)
{
scheduler = TaskScheduler.Current;
eventConsumer = eventConsumerFactory(key);
return Task.CompletedTask;
@@ -67,9 +64,9 @@ namespace Squidex.Infrastructure.EventSourcing.Grains
public async Task CompleteAsync()
{
if (currentSubscriber != null)
if (currentSubscription is BatchSubscriber batchSubscriber)
{
await currentSubscriber.CompleteAsync();
await batchSubscriber.CompleteAsync();
}
}
@@ -85,7 +82,7 @@ namespace Squidex.Infrastructure.EventSourcing.Grains
public Task OnEventsAsync(object sender, IReadOnlyList<Envelope<IEvent>> events, string position)
{
if (!ReferenceEquals(sender, currentSubscriber?.Sender))
if (!ReferenceEquals(sender, currentSubscription?.Sender))
{
return Task.CompletedTask;
}
@@ -100,7 +97,7 @@ namespace Squidex.Infrastructure.EventSourcing.Grains
public Task OnErrorAsync(object sender, Exception exception)
{
if (!ReferenceEquals(sender, currentSubscriber?.Sender))
if (!ReferenceEquals(sender, currentSubscription?.Sender))
{
return Task.CompletedTask;
}
@@ -262,31 +259,26 @@ namespace Squidex.Infrastructure.EventSourcing.Grains
private void Unsubscribe()
{
var subscription = Interlocked.Exchange(ref currentSubscriber, null);
var subscription = Interlocked.Exchange(ref currentSubscription, null);
subscription?.Unsubscribe();
}
private void Subscribe()
{
if (currentSubscriber == null)
if (currentSubscription == null)
{
currentSubscriber = CreateSubscription();
currentSubscription = CreateSubscription();
}
else
{
currentSubscriber.WakeUp();
currentSubscription.WakeUp();
}
}
protected virtual TaskScheduler GetScheduler()
{
return scheduler!;
}
private BatchSubscriber CreateSubscription()
{
return new BatchSubscriber(this, eventDataFormatter, eventConsumer!, CreateRetrySubscription, GetScheduler());
return new BatchSubscriber(this, eventDataFormatter, eventConsumer!, CreateRetrySubscription);
}
protected virtual IEventSubscription CreateRetrySubscription(IEventSubscriber subscriber)

68
backend/src/Squidex.Infrastructure/Tasks/AsyncHelper.cs

@@ -6,9 +6,10 @@
// ==========================================================================
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;
namespace Squidex.Infrastructure.Tasks
{
@@ -37,8 +38,7 @@ namespace Squidex.Infrastructure.Tasks
public static TResult Sync<TResult>(Func<Task<TResult>> func)
{
return TaskFactory
.StartNew(func)
.Unwrap()
.StartNew(func).Unwrap()
.GetAwaiter()
.GetResult();
}
@@ -46,42 +46,52 @@ namespace Squidex.Infrastructure.Tasks
public static void Sync(Func<Task> func)
{
TaskFactory
.StartNew(func)
.Unwrap()
.StartNew(func).Unwrap()
.GetAwaiter()
.GetResult();
}
public static IPropagatorBlock<T, T[]> CreateBatchBlock<T>(int batchSize, int timeout, GroupingDataflowBlockOptions? dataflowBlockOptions = null)
public static void Batch<TIn, TOut>(this Channel<object> source, Channel<TOut> target, Func<IReadOnlyList<TIn>, TOut> converter, int batchSize, int timeout,
CancellationToken ct = default)
{
dataflowBlockOptions ??= new GroupingDataflowBlockOptions();
Task.Run(async () =>
{
var batch = new List<TIn>(batchSize);
var batchBlock = new BatchBlock<T>(batchSize, dataflowBlockOptions);
var force = new object();
var timer = new Timer(_ => batchBlock.TriggerBatch());
using var timer = new Timer(_ => source.Writer.TryWrite(force));
var timerBlock = new TransformBlock<T, T>(value =>
{
timer.Change(timeout, Timeout.Infinite);
async Task TrySendAsync()
{
if (batch.Count > 0)
{
await target.Writer.WriteAsync(converter(batch), ct);
batch.Clear();
}
}
return value;
}, new ExecutionDataflowBlockOptions
{
BoundedCapacity = 1,
CancellationToken = dataflowBlockOptions.CancellationToken,
EnsureOrdered = dataflowBlockOptions.EnsureOrdered,
MaxDegreeOfParallelism = 1,
MaxMessagesPerTask = dataflowBlockOptions.MaxMessagesPerTask,
NameFormat = dataflowBlockOptions.NameFormat,
TaskScheduler = dataflowBlockOptions.TaskScheduler
});
timerBlock.LinkTo(batchBlock, new DataflowLinkOptions
{
PropagateCompletion = true
});
await foreach (var item in source.Reader.ReadAllAsync(ct))
{
if (ReferenceEquals(item, force))
{
await TrySendAsync();
}
else if (item is TIn typed)
{
timer.Change(timeout, Timeout.Infinite);
batch.Add(typed);
if (batch.Count >= batchSize)
{
await TrySendAsync();
}
}
}
return DataflowBlock.Encapsulate(timerBlock, batchBlock);
await TrySendAsync();
}, ct).ContinueWith(x => target.Writer.TryComplete(x.Exception));
}
}
}

1
backend/tests/Squidex.Infrastructure.Tests/EventSourcing/Grains/EventConsumerGrainTests.cs

@@ -17,7 +17,6 @@ using Xunit;
namespace Squidex.Infrastructure.EventSourcing.Grains
{
[Trait("Category", "Dependencies")]
public class EventConsumerGrainTests
{
public sealed class MyEventConsumerGrain : EventConsumerGrain

168
backend/tests/Squidex.Infrastructure.Tests/MongoDb/MongoExtensionsTests.cs

@@ -1,168 +0,0 @@
// ==========================================================================
// Squidex Headless CMS
// ==========================================================================
// Copyright (c) Squidex UG (haftungsbeschraenkt)
// All rights reserved. Licensed under the MIT license.
// ==========================================================================
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using MongoDB.Driver;
using Xunit;
namespace Squidex.Infrastructure.MongoDb
{
public class MongoExtensionsTests
{
public sealed class Cursor<T> : IAsyncCursor<T> where T : notnull
{
private readonly List<object> items = new List<object>();
private int index = -1;
public IEnumerable<T> Current
{
get
{
if (items[index] is Exception ex)
{
throw ex;
}
return Enumerable.Repeat((T)items[index], 1);
}
}
public Cursor<T> Add(params T[] newItems)
{
foreach (var item in newItems)
{
items.Add(item);
}
return this;
}
public Cursor<T> Add(Exception ex)
{
items.Add(ex);
return this;
}
public void Dispose()
{
}
public bool MoveNext(CancellationToken cancellationToken = default)
{
index++;
return index < items.Count;
}
public async Task<bool> MoveNextAsync(CancellationToken cancellationToken = default)
{
await Task.Delay(1, cancellationToken);
return MoveNext(cancellationToken);
}
}
[Fact]
public async Task Should_enumerate_over_items()
{
var result = new List<int>();
var cursor = new Cursor<int>().Add(0, 1, 2, 3, 4, 5);
await cursor.ForEachPipedAsync(x =>
{
result.Add(x);
return Task.CompletedTask;
});
Assert.Equal(new List<int> { 0, 1, 2, 3, 4, 5 }, result);
}
[Fact]
public async Task Should_break_if_cursor_failed()
{
var ex = new InvalidOperationException();
var result = new List<int>();
using (var cursor = new Cursor<int>().Add(0, 1, 2).Add(ex).Add(3, 4, 5))
{
await Assert.ThrowsAsync<InvalidOperationException>(() =>
{
return cursor.ForEachPipedAsync(x =>
{
result.Add(x);
return Task.CompletedTask;
});
});
}
Assert.Equal(new List<int> { 0, 1, 2 }, result);
}
[Fact]
public async Task Should_break_if_handler_failed()
{
var ex = new InvalidOperationException();
var result = new List<int>();
using (var cursor = new Cursor<int>().Add(0, 1, 2, 3, 4, 5))
{
await Assert.ThrowsAsync<InvalidOperationException>(() =>
{
return cursor.ForEachPipedAsync(x =>
{
if (x == 2)
{
throw ex;
}
result.Add(x);
return Task.CompletedTask;
});
});
}
Assert.Equal(new List<int> { 0, 1 }, result);
}
[Fact]
public async Task Should_stop_if_cancelled1()
{
using (var cts = new CancellationTokenSource())
{
var result = new List<int>();
using (var cursor = new Cursor<int>().Add(0, 1, 2, 3, 4, 5))
{
await Assert.ThrowsAnyAsync<OperationCanceledException>(() =>
{
return cursor.ForEachPipedAsync(x =>
{
if (x == 2)
{
cts.Cancel();
}
result.Add(x);
return Task.CompletedTask;
}, cts.Token);
});
}
Assert.Equal(new List<int> { 0, 1, 2 }, result);
}
}
}
}

2
backend/tests/Squidex.Infrastructure.Tests/MongoDb/FieldTests.cs → backend/tests/Squidex.Infrastructure.Tests/MongoDb/MongoFieldTests.cs

@@ -10,7 +10,7 @@ using Xunit;
namespace Squidex.Infrastructure.MongoDb
{
public class FieldTests
public class MongoFieldTests
{
public sealed class Entity
{

9
backend/tests/Squidex.Infrastructure.Tests/Tasks/PartitionedActionBlockTests.cs

@@ -16,15 +16,16 @@ namespace Squidex.Infrastructure.Tasks
{
public class PartitionedActionBlockTests
{
private const int Partitions = 10;
[Fact]
public async Task Should_propagate_in_order()
{
var random = new Random();
var partitions = 10;
var lists = new List<int>[partitions];
var lists = new List<int>[Partitions];
for (var i = 0; i < partitions; i++)
for (var i = 0; i < Partitions; i++)
{
lists[i] = new List<int>();
}
@@ -43,7 +44,7 @@ namespace Squidex.Infrastructure.Tasks
BoundedCapacity = 100
});
for (var i = 0; i < partitions; i++)
for (var i = 0; i < Partitions; i++)
{
for (var j = 0; j < 10; j++)
{

Loading…
Cancel
Save