Browse Source

Merge branch 'master' into orleans3

# Conflicts:
#	src/Squidex/Squidex.csproj
pull/249/head
Sebastian Stehle 8 years ago
parent
commit
ff6000ce1a
  1. 5
      src/Squidex.Infrastructure.MongoDb/EventSourcing/MongoEventStore_Reader.cs
  2. 48
      src/Squidex.Infrastructure.MongoDb/MongoDb/MongoExtensions.cs
  3. 1
      src/Squidex.Infrastructure.MongoDb/Squidex.Infrastructure.MongoDb.csproj
  4. 161
      tests/Squidex.Infrastructure.Tests/MongoDb/MongoExtensionsTests.cs

5
src/Squidex.Infrastructure.MongoDb/EventSourcing/MongoEventStore_Reader.cs

@ -7,7 +7,6 @@
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Threading;
using System.Threading.Tasks;
using MongoDB.Driver;
@ -87,9 +86,9 @@ namespace Squidex.Infrastructure.EventSourcing
return QueryAsync(callback, lastPosition, filter, ct);
}
private async Task QueryAsync(Func<StoredEvent, Task> callback, StreamPosition lastPosition, FilterDefinition<MongoEventCommit> filter, CancellationToken ct)
private Task QueryAsync(Func<StoredEvent, Task> callback, StreamPosition lastPosition, FilterDefinition<MongoEventCommit> filter, CancellationToken ct)
{
await Collection.Find(filter).Sort(Sort.Ascending(TimestampField)).ForEachAsync(async commit =>
return Collection.Find(filter).Sort(Sort.Ascending(TimestampField)).ForEachPipelineAsync(async commit =>
{
var eventStreamOffset = (int)commit.EventStreamOffset;

48
src/Squidex.Infrastructure.MongoDb/MongoDb/MongoExtensions.cs

@ -7,7 +7,9 @@
using System;
using System.Linq.Expressions;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;
using MongoDB.Bson;
using MongoDB.Driver;
using Squidex.Infrastructure.States;
@ -128,5 +130,51 @@ namespace Squidex.Infrastructure.MongoDb
}
}
}
public static async Task ForEachPipelineAsync<TDocument>(this IAsyncCursorSource<TDocument> source, Func<TDocument, Task> processor, CancellationToken cancellationToken = default(CancellationToken))
{
var cursor = await source.ToCursorAsync(cancellationToken);
await cursor.ForEachPipelineAsync(processor, cancellationToken);
}
public static async Task ForEachPipelineAsync<TDocument>(this IAsyncCursor<TDocument> source, Func<TDocument, Task> processor, CancellationToken cancellationToken = default(CancellationToken))
{
var actionBlock =
new ActionBlock<TDocument>(processor,
new ExecutionDataflowBlockOptions
{
MaxDegreeOfParallelism = 1,
MaxMessagesPerTask = 1,
BoundedCapacity = 100
});
using (var selfToken = new CancellationTokenSource())
{
using (var combined = CancellationTokenSource.CreateLinkedTokenSource(selfToken.Token, cancellationToken))
{
try
{
await source.ForEachAsync(async i =>
{
if (!await actionBlock.SendAsync(i, combined.Token))
{
selfToken.Cancel();
}
}, combined.Token);
actionBlock.Complete();
}
catch (Exception ex)
{
((IDataflowBlock)actionBlock).Fault(ex);
}
finally
{
await actionBlock.Completion;
}
}
}
}
}
}

1
src/Squidex.Infrastructure.MongoDb/Squidex.Infrastructure.MongoDb.csproj

@ -15,6 +15,7 @@
<PackageReference Include="MongoDB.Driver" Version="2.5.0" />
<PackageReference Include="RefactoringEssentials" Version="5.6.0" />
<PackageReference Include="StyleCop.Analyzers" Version="1.0.2" />
<PackageReference Include="System.Threading.Tasks.Dataflow" Version="4.8.0" />
<PackageReference Include="System.ValueTuple" Version="4.4.0" />
</ItemGroup>
<PropertyGroup>

161
tests/Squidex.Infrastructure.Tests/MongoDb/MongoExtensionsTests.cs

@ -0,0 +1,161 @@
// ==========================================================================
// Squidex Headless CMS
// ==========================================================================
// Copyright (c) Squidex UG (haftungsbeschraenkt)
// All rights reserved. Licensed under the MIT license.
// ==========================================================================
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using MongoDB.Driver;
using Squidex.Infrastructure.Tasks;
using Xunit;
namespace Squidex.Infrastructure.MongoDb
{
public class MongoExtensionsTests
{
public sealed class Cursor<T> : IAsyncCursor<T>
{
private readonly List<object> items = new List<object>();
private int index = -1;
public IEnumerable<T> Current
{
get
{
if (items[index] is Exception ex)
{
throw ex;
}
return Enumerable.Repeat((T)items[index], 1);
}
}
public Cursor<T> Add(T item)
{
items.Add(item);
return this;
}
public Cursor<T> Add(Exception ex)
{
items.Add(ex);
return this;
}
public void Dispose()
{
}
public bool MoveNext(CancellationToken cancellationToken = default(CancellationToken))
{
index++;
return index < items.Count;
}
public async Task<bool> MoveNextAsync(CancellationToken cancellationToken = default(CancellationToken))
{
await Task.Delay(1, cancellationToken);
return MoveNext(cancellationToken);
}
}
[Fact]
public async Task Should_enumerate_over_items()
{
var result = new List<int>();
var cursor = new Cursor<int>().Add(0).Add(1).Add(1).Add(2).Add(3).Add(5);
await cursor.ForEachPipelineAsync(x =>
{
result.Add(x);
return TaskHelper.Done;
});
Assert.Equal(new List<int> { 0, 1, 1, 2, 3, 5 }, result);
}
[Fact]
public async Task Should_break_when_cursor_failed()
{
var ex = new InvalidOperationException();
var result = new List<int>();
var cursor = new Cursor<int>().Add(0).Add(1).Add(1).Add(ex).Add(2).Add(3).Add(5);
await Assert.ThrowsAsync<InvalidOperationException>(() =>
{
return cursor.ForEachPipelineAsync(x =>
{
result.Add(x);
return TaskHelper.Done;
});
});
Assert.Equal(new List<int> { 0, 1, 1 }, result);
}
[Fact]
public async Task Should_break_when_handler_failed()
{
var ex = new InvalidOperationException();
var result = new List<int>();
var cursor = new Cursor<int>().Add(0).Add(1).Add(1).Add(2).Add(3).Add(5);
await Assert.ThrowsAsync<InvalidOperationException>(() =>
{
return cursor.ForEachPipelineAsync(x =>
{
if (x == 2)
{
throw ex;
}
result.Add(x);
return TaskHelper.Done;
});
});
Assert.Equal(new List<int> { 0, 1, 1 }, result);
}
[Fact]
public async Task Should_stop_when_cancelled1()
{
var cts = new CancellationTokenSource();
var result = new List<int>();
var cursor = new Cursor<int>().Add(0).Add(1).Add(1).Add(2).Add(3).Add(5);
await Assert.ThrowsAsync<TaskCanceledException>(() =>
{
return cursor.ForEachPipelineAsync(x =>
{
if (x == 2)
{
cts.Cancel();
}
result.Add(x);
return TaskHelper.Done;
}, cts.Token);
});
Assert.Equal(new List<int> { 0, 1, 1, 2 }, result);
}
}
}
Loading…
Cancel
Save