Browse Source

EventReceiver Improved

pull/1/head
Sebastian Stehle 9 years ago
parent
commit
bbf2eaffae
  1. 96
      src/Squidex.Infrastructure.MongoDb/EventStore/MongoEventStore.cs
  2. 182
      src/Squidex.Infrastructure/CQRS/Events/EventReceiver.cs
  3. 5
      src/Squidex.Infrastructure/CQRS/Events/IEventStore.cs
  4. 70
      src/Squidex.Infrastructure/CQRS/Events/Internal/DispatchEventBlock.cs
  5. 118
      src/Squidex.Infrastructure/CQRS/Events/Internal/EventReceiverBlock.cs
  6. 52
      src/Squidex.Infrastructure/CQRS/Events/Internal/ParseEventBlock.cs
  7. 102
      src/Squidex.Infrastructure/CQRS/Events/Internal/QueryEventsBlock.cs
  8. 33
      src/Squidex.Infrastructure/CQRS/Events/Internal/UpdateStateBlock.cs
  9. 2
      src/Squidex.Infrastructure/Squidex.Infrastructure.csproj
  10. 6
      tests/Squidex.Infrastructure.Tests/CQRS/Commands/DefaultDomainObjectRepositoryTests.cs
  11. 23
      tests/Squidex.Infrastructure.Tests/CQRS/Events/EventReceiverTests.cs

96
src/Squidex.Infrastructure.MongoDb/EventStore/MongoEventStore.cs

@ -10,6 +10,7 @@ using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Linq;
using System.Threading;
using System.Threading.Tasks;
using MongoDB.Bson;
using MongoDB.Driver;
@ -55,61 +56,75 @@ namespace Squidex.Infrastructure.MongoDb.EventStore
{
var indexNames =
await Task.WhenAll(
collection.Indexes.CreateOneAsync(IndexKeys.Ascending(x => x.EventsOffset), new CreateIndexOptions { Unique = true }),
collection.Indexes.CreateOneAsync(IndexKeys.Ascending(x => x.EventStreamOffset).Ascending(x => x.EventStream), new CreateIndexOptions { Unique = true }),
collection.Indexes.CreateOneAsync(IndexKeys.Descending(x => x.EventsOffset), new CreateIndexOptions { Unique = true }),
collection.Indexes.CreateOneAsync(IndexKeys.Descending(x => x.EventStreamOffset).Ascending(x => x.EventStream), new CreateIndexOptions { Unique = true }));
eventsOffsetIndex = indexNames[0];
}
public IObservable<StoredEvent> GetEventsAsync(string streamName)
public IObservable<StoredEvent> GetEventsAsync(string streamName, long lastReceivedEventNumber = -1)
{
Guard.NotNullOrEmpty(streamName, nameof(streamName));
return Observable.Create<StoredEvent>(async (observer, ct) =>
return Observable.Create<StoredEvent>((observer, ct) =>
{
await Collection.Find(x => x.EventStream == streamName).ForEachAsync(commit =>
return GetEventsAsync(storedEvent =>
{
var eventNumber = commit.EventsOffset;
var eventStreamNumber = commit.EventStreamOffset;
foreach (var @event in commit.Events)
{
eventNumber++;
eventStreamNumber++;
observer.OnNext(storedEvent);
var eventData = SimpleMapper.Map(@event, new EventData());
observer.OnNext(new StoredEvent(eventNumber, eventStreamNumber, eventData));
}
}, ct);
return Tasks.TaskHelper.Done;
}, ct, streamName, lastReceivedEventNumber);
});
}
public IObservable<StoredEvent> GetEventsAsync(long lastReceivedEventNumber = -1)
public async Task GetEventsAsync(Func<StoredEvent, Task> callback, CancellationToken cancellationToken, string streamName = null, long lastReceivedEventNumber = -1)
{
return Observable.Create<StoredEvent>(async (observer, ct) =>
Guard.NotNull(callback, nameof(callback));
var filters = new List<FilterDefinition<MongoEventCommit>>();
if (lastReceivedEventNumber >= 0)
{
var commitOffset = await GetPreviousOffsetAsync(lastReceivedEventNumber);
filters.Add(Filter.Gte(x => x.EventsOffset, commitOffset));
}
if (!string.IsNullOrWhiteSpace(streamName))
{
filters.Add(Filter.Eq(x => x.EventStream, streamName));
}
FilterDefinition<MongoEventCommit> filter = new BsonDocument();
if (filters.Count > 1)
{
filter = Filter.And(filters);
}
else if (filters.Count == 1)
{
var commitOffset = await GetPreviousOffset(lastReceivedEventNumber);
filter = filters[0];
}
await Collection.Find(x => x.EventsOffset >= commitOffset).SortBy(x => x.EventsOffset).ForEachAsync(commit =>
await Collection.Find(filter).SortBy(x => x.EventsOffset).ForEachAsync(async commit =>
{
var eventNumber = commit.EventsOffset;
var eventStreamNumber = commit.EventStreamOffset;
foreach (var mongoEvent in commit.Events)
{
var eventNumber = commit.EventsOffset;
var eventStreamNumber = commit.EventStreamOffset;
eventNumber++;
eventStreamNumber++;
foreach (var @event in commit.Events)
if (eventNumber > lastReceivedEventNumber)
{
eventNumber++;
eventStreamNumber++;
if (eventNumber > lastReceivedEventNumber)
{
var eventData = SimpleMapper.Map(@event, new EventData());
var eventData = SimpleMapper.Map(mongoEvent, new EventData());
observer.OnNext(new StoredEvent(eventNumber, eventStreamNumber, eventData));
}
await callback(new StoredEvent(eventNumber, eventStreamNumber, eventData));
}
}, ct);
});
}
}, cancellationToken);
}
public async Task AppendEventsAsync(Guid commitId, string streamName, int expectedVersion, IEnumerable<EventData> events)
@ -130,7 +145,7 @@ namespace Squidex.Infrastructure.MongoDb.EventStore
if (commitEvents.Any())
{
var offset = await GetEventOffset();
var offset = await GetEventOffsetAsync();
var commit = new MongoEventCommit
{
@ -157,7 +172,7 @@ namespace Squidex.Infrastructure.MongoDb.EventStore
{
if (ex.Message.IndexOf(eventsOffsetIndex, StringComparison.OrdinalIgnoreCase) >= 0)
{
commit.EventsOffset = await GetEventOffset();
commit.EventsOffset = await GetEventOffsetAsync();
}
else if (ex.WriteError?.Category == ServerErrorCategory.DuplicateKey)
{
@ -174,25 +189,24 @@ namespace Squidex.Infrastructure.MongoDb.EventStore
}
}
private async Task<long> GetPreviousOffset(long startEventNumber)
private async Task<long> GetPreviousOffsetAsync(long startEventNumber)
{
var document =
await Collection.Find(x => x.EventsOffset <= startEventNumber)
.Project<BsonDocument>(Projection
.Include(x => x.EventStreamOffset)
.Include(x => x.EventsCount))
.Include(x => x.EventsOffset))
.SortByDescending(x => x.EventsOffset).Limit(1)
.FirstOrDefaultAsync();
if (document != null)
{
return document["EventStreamOffset"].ToInt64();
return document["EventsOffset"].ToInt64();
}
return -1;
}
private async Task<long> GetEventOffset()
private async Task<long> GetEventOffsetAsync()
{
var document =
await Collection.Find(new BsonDocument())

182
src/Squidex.Infrastructure/CQRS/Events/EventReceiver.cs

@ -7,14 +7,12 @@
// ==========================================================================
using System;
using System.Reactive.Linq;
using System.Threading.Tasks;
using System.Threading;
using Squidex.Infrastructure.CQRS.Events.Internal;
using Squidex.Infrastructure.Log;
using Squidex.Infrastructure.Timers;
// ReSharper disable MethodSupportsCancellation
// ReSharper disable ConvertIfStatementToConditionalTernaryExpression
// ReSharper disable InvertIf
// ReSharper disable UseObjectOrCollectionInitializer
namespace Squidex.Infrastructure.CQRS.Events
{
@ -25,7 +23,11 @@ namespace Squidex.Infrastructure.CQRS.Events
private readonly IEventNotifier eventNotifier;
private readonly IEventConsumerInfoRepository eventConsumerInfoRepository;
private readonly ISemanticLog log;
private CompletionTimer timer;
private QueryEventsBlock queryEventsBlock;
private DispatchEventBlock dispatchEventBlock;
private UpdateStateBlock updateStateBlock;
private ParseEventBlock parseEventBlock;
private Timer timer;
public EventReceiver(
EventDataFormatter formatter,
@ -53,7 +55,11 @@ namespace Squidex.Infrastructure.CQRS.Events
{
try
{
queryEventsBlock?.Complete();
timer?.Dispose();
updateStateBlock?.Completion.Wait();
}
catch (Exception ex)
{
@ -64,11 +70,6 @@ namespace Squidex.Infrastructure.CQRS.Events
}
}
public void Trigger()
{
timer?.Trigger();
}
public void Subscribe(IEventConsumer eventConsumer, int delay = 5000)
{
Guard.NotNull(eventConsumer, nameof(eventConsumer));
@ -78,153 +79,32 @@ namespace Squidex.Infrastructure.CQRS.Events
return;
}
var consumerName = eventConsumer.Name;
var consumerStarted = false;
timer = new CompletionTimer(delay, async ct =>
{
if (!consumerStarted)
{
await eventConsumerInfoRepository.CreateAsync(consumerName);
consumerStarted = true;
}
try
{
var status = await eventConsumerInfoRepository.FindAsync(consumerName);
var lastHandledEventNumber = status.LastHandledEventNumber;
if (status.IsResetting)
{
await ResetAsync(eventConsumer, consumerName);
lastHandledEventNumber = -1;
}
else if (status.IsStopped)
{
return;
}
await eventStore.GetEventsAsync(lastHandledEventNumber)
.Select(storedEvent =>
{
HandleEventAsync(eventConsumer, storedEvent, consumerName).Wait();
return storedEvent;
}).DefaultIfEmpty();
}
catch (Exception ex)
{
log.LogFatal(ex, w => w.WriteProperty("action", "EventHandlingFailed"));
await eventConsumerInfoRepository.StopAsync(consumerName, ex.ToString());
}
});
eventNotifier.Subscribe(timer.Trigger);
}
private async Task HandleEventAsync(IEventConsumer eventConsumer, StoredEvent storedEvent, string consumerName)
{
var @event = ParseEvent(storedEvent);
updateStateBlock = new UpdateStateBlock(eventConsumer, eventConsumerInfoRepository, log);
await DispatchConsumer(@event, eventConsumer);
await eventConsumerInfoRepository.SetLastHandledEventNumberAsync(consumerName, storedEvent.EventNumber);
}
dispatchEventBlock = new DispatchEventBlock(eventConsumer, eventConsumerInfoRepository, log);
dispatchEventBlock.LinkTo(updateStateBlock.Target);
dispatchEventBlock.Completion.ContinueWith(t => updateStateBlock.Complete());
private async Task ResetAsync(IEventConsumer eventConsumer, string consumerName)
{
var actionId = Guid.NewGuid().ToString();
try
{
log.LogInformation(w => w
.WriteProperty("action", "EventConsumerReset")
.WriteProperty("actionId", actionId)
.WriteProperty("state", "Started")
.WriteProperty("eventConsumer", eventConsumer.GetType().Name));
await eventConsumer.ClearAsync();
await eventConsumerInfoRepository.SetLastHandledEventNumberAsync(consumerName, -1);
log.LogInformation(w => w
.WriteProperty("action", "EventConsumerReset")
.WriteProperty("actionId", actionId)
.WriteProperty("state", "Completed")
.WriteProperty("eventConsumer", eventConsumer.GetType().Name));
}
catch (Exception ex)
{
log.LogFatal(ex, w => w
.WriteProperty("action", "EventConsumerReset")
.WriteProperty("actionId", actionId)
.WriteProperty("state", "Completed")
.WriteProperty("eventConsumer", eventConsumer.GetType().Name));
parseEventBlock = new ParseEventBlock(eventConsumer, eventConsumerInfoRepository, log, formatter);
parseEventBlock.LinkTo(dispatchEventBlock.Target);
parseEventBlock.Completion.ContinueWith(t => dispatchEventBlock.Complete());
throw;
}
}
queryEventsBlock = new QueryEventsBlock(eventConsumer, eventConsumerInfoRepository, log, eventStore);
queryEventsBlock.OnEvent = parseEventBlock.NextAsync;
queryEventsBlock.OnReset = Reset;
queryEventsBlock.Completion.ContinueWith(t => parseEventBlock.Complete());
timer = new Timer(x => queryEventsBlock.NextOrThrowAway(null), null, 0, delay);
private async Task DispatchConsumer(Envelope<IEvent> @event, IEventConsumer eventConsumer)
{
var eventId = @event.Headers.EventId().ToString();
var eventType = @event.Payload.GetType().Name;
try
{
log.LogInformation(w => w
.WriteProperty("action", "HandleEvent")
.WriteProperty("actionId", eventId)
.WriteProperty("state", "Started")
.WriteProperty("eventId", eventId)
.WriteProperty("eventType", eventType)
.WriteProperty("eventConsumer", eventConsumer.GetType().Name));
await eventConsumer.On(@event);
log.LogInformation(w => w
.WriteProperty("action", "HandleEvent")
.WriteProperty("actionId", eventId)
.WriteProperty("state", "Completed")
.WriteProperty("eventId", eventId)
.WriteProperty("eventType", eventType)
.WriteProperty("eventConsumer", eventConsumer.GetType().Name));
}
catch (Exception ex)
{
log.LogError(ex, w => w
.WriteProperty("action", "HandleEvent")
.WriteProperty("actionId", eventId)
.WriteProperty("state", "Started")
.WriteProperty("eventId", eventId)
.WriteProperty("eventType", eventType)
.WriteProperty("eventConsumer", eventConsumer.GetType().Name));
throw;
}
eventNotifier.Subscribe(() => queryEventsBlock.NextOrThrowAway(null));
}
private Envelope<IEvent> ParseEvent(StoredEvent storedEvent)
private void Reset()
{
try
{
var @event = formatter.Parse(storedEvent.Data);
@event.SetEventNumber(storedEvent.EventNumber);
@event.SetEventStreamNumber(storedEvent.EventStreamNumber);
return @event;
}
catch (Exception ex)
{
log.LogFatal(ex, w => w
.WriteProperty("action", "ParseEvent")
.WriteProperty("state", "Failed")
.WriteProperty("eventId", storedEvent.Data.EventId.ToString())
.WriteProperty("eventNumber", storedEvent.EventNumber));
throw;
}
dispatchEventBlock.Reset();
parseEventBlock.Reset();
queryEventsBlock.Reset();
updateStateBlock.Reset();
}
}
}

5
src/Squidex.Infrastructure/CQRS/Events/IEventStore.cs

@ -8,15 +8,16 @@
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
namespace Squidex.Infrastructure.CQRS.Events
{
public interface IEventStore
{
IObservable<StoredEvent> GetEventsAsync(long lastReceivedEventNumber = -1);
IObservable<StoredEvent> GetEventsAsync(string streamName = null, long lastReceivedEventNumber = -1);
IObservable<StoredEvent> GetEventsAsync(string streamName);
Task GetEventsAsync(Func<StoredEvent, Task> callback, CancellationToken cancellationToken, string streamName = null, long lastReceivedEventNumber = -1);
Task AppendEventsAsync(Guid commitId, string streamName, int expectedVersion, IEnumerable<EventData> events);
}

70
src/Squidex.Infrastructure/CQRS/Events/Internal/DispatchEventBlock.cs

@ -0,0 +1,70 @@
// ==========================================================================
// DispatchEventBlock.cs
// Squidex Headless CMS
// ==========================================================================
// Copyright (c) Squidex Group
// All rights reserved.
// ==========================================================================
using System;
using System.Threading.Tasks;
using Squidex.Infrastructure.Log;
namespace Squidex.Infrastructure.CQRS.Events.Internal
{
internal sealed class DispatchEventBlock : EventReceiverBlock<Envelope<IEvent>, Envelope<IEvent>>
{
public DispatchEventBlock(IEventConsumer eventConsumer, IEventConsumerInfoRepository eventConsumerInfoRepository, ISemanticLog log)
: base(true, eventConsumer, eventConsumerInfoRepository, log)
{
}
protected override async Task<Envelope<IEvent>> On(Envelope<IEvent> input)
{
var consumerName = EventConsumer.Name;
var eventId = input.Headers.EventId().ToString();
var eventType = input.Payload.GetType().Name;
try
{
Log.LogInformation(w => w
.WriteProperty("action", "HandleEvent")
.WriteProperty("actionId", eventId)
.WriteProperty("state", "Started")
.WriteProperty("eventId", eventId)
.WriteProperty("eventType", eventType)
.WriteProperty("eventConsumer", consumerName));
await EventConsumer.On(input);
Log.LogInformation(w => w
.WriteProperty("action", "HandleEvent")
.WriteProperty("actionId", eventId)
.WriteProperty("state", "Completed")
.WriteProperty("eventId", eventId)
.WriteProperty("eventType", eventType)
.WriteProperty("eventConsumer", consumerName));
return input;
}
catch (Exception ex)
{
Log.LogError(ex, w => w
.WriteProperty("action", "HandleEvent")
.WriteProperty("actionId", eventId)
.WriteProperty("state", "Started")
.WriteProperty("eventId", eventId)
.WriteProperty("eventType", eventType)
.WriteProperty("eventConsumer", consumerName));
}
return null;
}
protected override long GetEventNumber(Envelope<IEvent> input)
{
return input.Headers.EventNumber();
}
}
}

118
src/Squidex.Infrastructure/CQRS/Events/Internal/EventReceiverBlock.cs

@ -0,0 +1,118 @@
// ==========================================================================
// EventReceiverBlock.cs
// Squidex Headless CMS
// ==========================================================================
// Copyright (c) Squidex Group
// All rights reserved.
// ==========================================================================
using System;
using System.Threading.Tasks;
using Squidex.Infrastructure.Log;
using System.Threading.Tasks.Dataflow;
namespace Squidex.Infrastructure.CQRS.Events.Internal
{
public abstract class EventReceiverBlock<TInput, TOutput>
{
private long lastEventNumber = -1;
protected ISemanticLog Log { get; }
protected IEventConsumer EventConsumer { get; }
protected IEventConsumerInfoRepository EventConsumerInfoRepository { get; }
public ITargetBlock<TInput> Target { get; }
public Task Completion
{
get { return Target.Completion; }
}
protected EventReceiverBlock(bool transform, IEventConsumer eventConsumer, IEventConsumerInfoRepository eventConsumerInfoRepository, ISemanticLog log)
{
EventConsumer = eventConsumer;
EventConsumerInfoRepository = eventConsumerInfoRepository;
Log = log;
if (transform)
{
Target =
new TransformBlock<TInput, TOutput>(new Func<TInput, Task<TOutput>>(HandleAsync),
new ExecutionDataflowBlockOptions { BoundedCapacity = 1 });
}
else
{
Target =
new ActionBlock<TInput>(HandleAsync,
new ExecutionDataflowBlockOptions { BoundedCapacity = 1 });
}
}
public Task NextAsync(TInput input)
{
return Target.SendAsync(input);
}
public void NextOrThrowAway(TInput input)
{
Target.Post(input);
}
public void Complete()
{
Target.Complete();
}
public void Reset()
{
lastEventNumber = -1;
}
public void LinkTo(ITargetBlock<TOutput> other)
{
if (Target is TransformBlock<TInput, TOutput> transformBlock)
{
transformBlock.LinkTo(other, e => e != null);
}
}
protected abstract Task<TOutput> On(TInput input);
protected abstract long GetEventNumber(TInput input);
private async Task<TOutput> HandleAsync(TInput input)
{
try
{
var eventNumber = GetEventNumber(input);
if (eventNumber > lastEventNumber)
{
var envelope = await On(input);
lastEventNumber = eventNumber;
return envelope;
}
}
catch (Exception ex)
{
Log.LogFatal(ex, w => w.WriteProperty("action", "EventHandlingFailed"));
try
{
await EventConsumerInfoRepository.StopAsync(EventConsumer.Name, ex.ToString());
}
catch (Exception ex2)
{
Log.LogFatal(ex2, w => w.WriteProperty("action", "EventHandlingFailed"));
}
}
return default(TOutput);
}
}
}

52
src/Squidex.Infrastructure/CQRS/Events/Internal/ParseEventBlock.cs

@ -0,0 +1,52 @@
// ==========================================================================
// ParseEventBlock.cs
// Squidex Headless CMS
// ==========================================================================
// Copyright (c) Squidex Group
// All rights reserved.
// ==========================================================================
using System;
using System.Threading.Tasks;
using Squidex.Infrastructure.Log;
namespace Squidex.Infrastructure.CQRS.Events.Internal
{
internal sealed class ParseEventBlock : EventReceiverBlock<StoredEvent, Envelope<IEvent>>
{
private readonly EventDataFormatter formatter;
public ParseEventBlock(IEventConsumer eventConsumer, IEventConsumerInfoRepository eventConsumerInfoRepository, ISemanticLog log, EventDataFormatter formatter)
: base(true, eventConsumer, eventConsumerInfoRepository, log)
{
this.formatter = formatter;
}
protected override Task<Envelope<IEvent>> On(StoredEvent input)
{
Envelope<IEvent> result = null;
try
{
result = formatter.Parse(input.Data);
result.SetEventNumber(input.EventNumber);
result.SetEventStreamNumber(input.EventStreamNumber);
}
catch (Exception ex)
{
Log.LogFatal(ex, w => w
.WriteProperty("action", "ParseEvent")
.WriteProperty("state", "Failed")
.WriteProperty("eventId", input.Data.EventId.ToString())
.WriteProperty("eventNumber", input.EventNumber));
}
return Task.FromResult(result);
}
protected override long GetEventNumber(StoredEvent input)
{
return input.EventNumber;
}
}
}

102
src/Squidex.Infrastructure/CQRS/Events/Internal/QueryEventsBlock.cs

@ -0,0 +1,102 @@
// ==========================================================================
// QueryEventsBlock.cs
// Squidex Headless CMS
// ==========================================================================
// Copyright (c) Squidex Group
// All rights reserved.
// ==========================================================================
using System;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;
using Squidex.Infrastructure.Log;
// ReSharper disable InvertIf
namespace Squidex.Infrastructure.CQRS.Events.Internal
{
internal sealed class QueryEventsBlock : EventReceiverBlock<object, object>
{
private readonly IEventStore eventStore;
private bool isStarted;
private long handled;
public Func<StoredEvent, Task> OnEvent { get; set; }
public Action OnReset { get; set; }
public QueryEventsBlock(IEventConsumer eventConsumer, IEventConsumerInfoRepository eventConsumerInfoRepository, ISemanticLog log, IEventStore eventStore)
: base(false, eventConsumer, eventConsumerInfoRepository, log)
{
this.eventStore = eventStore;
}
protected override async Task<object> On(object input)
{
if (!isStarted)
{
await EventConsumerInfoRepository.CreateAsync(EventConsumer.Name);
isStarted = true;
}
var status = await EventConsumerInfoRepository.FindAsync(EventConsumer.Name);
var lastReceivedEventNumber = status.LastHandledEventNumber;
if (status.IsResetting)
{
await ResetAsync();
}
if (!status.IsStopped)
{
var ct = CancellationToken.None;
await eventStore.GetEventsAsync(storedEvent => OnEvent?.Invoke(storedEvent), ct, null, lastReceivedEventNumber);
}
return null;
}
private async Task ResetAsync()
{
var consumerName = EventConsumer.Name;
var actionId = Guid.NewGuid().ToString();
try
{
Log.LogInformation(w => w
.WriteProperty("action", "EventConsumerReset")
.WriteProperty("actionId", actionId)
.WriteProperty("state", "Started")
.WriteProperty("eventConsumer", consumerName));
await EventConsumer.ClearAsync();
await EventConsumerInfoRepository.SetLastHandledEventNumberAsync(consumerName, -1);
Log.LogInformation(w => w
.WriteProperty("action", "EventConsumerReset")
.WriteProperty("actionId", actionId)
.WriteProperty("state", "Completed")
.WriteProperty("eventConsumer", consumerName));
OnReset?.Invoke();
}
catch (Exception ex)
{
Log.LogFatal(ex, w => w
.WriteProperty("action", "EventConsumerReset")
.WriteProperty("actionId", actionId)
.WriteProperty("state", "Completed")
.WriteProperty("eventConsumer", consumerName));
}
}
protected override long GetEventNumber(object input)
{
return handled++;
}
}
}

33
src/Squidex.Infrastructure/CQRS/Events/Internal/UpdateStateBlock.cs

@ -0,0 +1,33 @@
// ==========================================================================
// UpdateStateBlock.cs
// Squidex Headless CMS
// ==========================================================================
// Copyright (c) Squidex Group
// All rights reserved.
// ==========================================================================
using System.Threading.Tasks;
using Squidex.Infrastructure.Log;
namespace Squidex.Infrastructure.CQRS.Events.Internal
{
public sealed class UpdateStateBlock : EventReceiverBlock<Envelope<IEvent>, Envelope<IEvent>>
{
public UpdateStateBlock(IEventConsumer eventConsumer, IEventConsumerInfoRepository eventConsumerInfoRepository, ISemanticLog log)
: base(false, eventConsumer, eventConsumerInfoRepository, log)
{
}
protected override async Task<Envelope<IEvent>> On(Envelope<IEvent> input)
{
await EventConsumerInfoRepository.SetLastHandledEventNumberAsync(EventConsumer.Name, input.Headers.EventNumber());
return input;
}
protected override long GetEventNumber(Envelope<IEvent> input)
{
return input.Headers.EventNumber();
}
}
}

2
src/Squidex.Infrastructure/Squidex.Infrastructure.csproj

@ -1,6 +1,7 @@
<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
<TargetFramework>netstandard1.6</TargetFramework>
<NoWarn>$(NoWarn);IDE0017</NoWarn>
</PropertyGroup>
<PropertyGroup Condition="'$(Configuration)|$(Platform)'=='Debug|AnyCPU'">
<DebugType>full</DebugType>
@ -15,5 +16,6 @@
<PackageReference Include="System.Reactive" Version="3.1.1" />
<PackageReference Include="System.Reflection.TypeExtensions" Version="4.3.0" />
<PackageReference Include="System.Security.Claims" Version="4.3.0" />
<PackageReference Include="System.Threading.Tasks.Dataflow" Version="4.7.0" />
</ItemGroup>
</Project>

6
tests/Squidex.Infrastructure.Tests/CQRS/Commands/DefaultDomainObjectRepositoryTests.cs

@ -74,7 +74,7 @@ namespace Squidex.Infrastructure.CQRS.Commands
[Fact]
public async Task Should_throw_exception_when_event_store_returns_no_events()
{
eventStore.Setup(x => x.GetEventsAsync(streamName)).Returns(Observable.Empty<StoredEvent>());
eventStore.Setup(x => x.GetEventsAsync(streamName, -1)).Returns(Observable.Empty<StoredEvent>());
await Assert.ThrowsAsync<DomainObjectNotFoundException>(() => sut.GetByIdAsync<MyDomainObject>(aggregateId));
}
@ -94,7 +94,7 @@ namespace Squidex.Infrastructure.CQRS.Commands
new StoredEvent(1, 1, eventData2)
};
eventStore.Setup(x => x.GetEventsAsync(streamName)).Returns(events.ToObservable());
eventStore.Setup(x => x.GetEventsAsync(streamName, -1)).Returns(events.ToObservable());
eventDataFormatter.Setup(x => x.Parse(eventData1)).Returns(new Envelope<IEvent>(event1));
eventDataFormatter.Setup(x => x.Parse(eventData2)).Returns(new Envelope<IEvent>(event2));
@ -119,7 +119,7 @@ namespace Squidex.Infrastructure.CQRS.Commands
new StoredEvent(1, 1, eventData2)
};
eventStore.Setup(x => x.GetEventsAsync(streamName)).Returns(events.ToObservable());
eventStore.Setup(x => x.GetEventsAsync(streamName, -1)).Returns(events.ToObservable());
eventDataFormatter.Setup(x => x.Parse(eventData1)).Returns(new Envelope<IEvent>(event1));
eventDataFormatter.Setup(x => x.Parse(eventData2)).Returns(new Envelope<IEvent>(event2));

23
tests/Squidex.Infrastructure.Tests/CQRS/Events/EventReceiverTests.cs

@ -8,7 +8,7 @@
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Threading;
using System.Threading.Tasks;
using Moq;
using Squidex.Infrastructure.Log;
@ -81,7 +81,7 @@ namespace Squidex.Infrastructure.CQRS.Events
consumerName = eventConsumer.Object.GetType().Name;
eventStore.Setup(x => x.GetEventsAsync(2)).Returns(events.ToObservable());
ExceptEvents(2);
eventConsumer.Setup(x => x.Name).Returns(consumerName);
eventConsumerInfoRepository.Setup(x => x.FindAsync(consumerName)).Returns(Task.FromResult<IEventConsumerInfo>(consumerInfo));
@ -175,8 +175,8 @@ namespace Squidex.Infrastructure.CQRS.Events
{
consumerInfo.IsResetting = true;
consumerInfo.LastHandledEventNumber = 2L;
eventStore.Setup(x => x.GetEventsAsync(-1)).Returns(events.ToObservable());
ExceptEvents(-1);
sut.Subscribe(eventConsumer.Object);
@ -191,5 +191,20 @@ namespace Squidex.Infrastructure.CQRS.Events
eventConsumer.Verify(x => x.ClearAsync(), Times.Once());
}
private void ExceptEvents(int eventNumber)
{
eventStore.Setup(x => x.GetEventsAsync(It.IsAny<Func<StoredEvent, Task>>(), CancellationToken.None, null, eventNumber))
.Callback<Func<StoredEvent, Task>, CancellationToken, string, long>(ReturnEvents)
.Returns(TaskHelper.Done);
}
private void ReturnEvents(Func<StoredEvent, Task> callback, CancellationToken cancellationToken, string streamName, long lastReceivedEventNumber)
{
foreach (var storedEvent in events)
{
callback(storedEvent).Wait(cancellationToken);
}
}
}
}

Loading…
Cancel
Save