Browse Source

Subscriptions refactored.

pull/130/head
Sebastian Stehle 9 years ago
parent
commit
f9b531db73
  1. 39
      src/Squidex.Infrastructure.MongoDb/EventStore/MongoEventStore.cs
  2. 59
      src/Squidex.Infrastructure.MongoDb/EventStore/PollingSubscription.cs
  3. 2
      src/Squidex.Infrastructure/CQRS/Commands/DefaultDomainObjectRepository.cs
  4. 86
      src/Squidex.Infrastructure/CQRS/Events/EventReceiver.cs
  5. 9
      src/Squidex.Infrastructure/CQRS/Events/IEventStore.cs
  6. 18
      src/Squidex.Infrastructure/CQRS/Events/IEventSubscription.cs
  7. 2
      src/Squidex.Infrastructure/Timers/CompletionTimer.cs
  8. 2
      src/Squidex.Infrastructure/UsageTracking/BackgroundUsageTracker.cs
  9. 3
      tests/Benchmarks/Tests/AppendToEventStore.cs
  10. 3
      tests/Benchmarks/Tests/AppendToEventStoreWithManyWriters.cs
  11. 6
      tests/Benchmarks/Tests/HandleEvents.cs
  12. 6
      tests/Benchmarks/Tests/HandleEventsWithManyWriters.cs
  13. 5
      tests/Benchmarks/Utils/Helper.cs
  14. 10
      tests/Squidex.Infrastructure.Tests/CQRS/Commands/DefaultDomainObjectRepositoryTests.cs
  15. 57
      tests/Squidex.Infrastructure.Tests/CQRS/Events/EventReceiverTests.cs

39
src/Squidex.Infrastructure.MongoDb/EventStore/MongoEventStore.cs

@ -19,13 +19,8 @@ using System.Threading;
using System.Threading.Tasks;
using Squidex.Infrastructure.Tasks;
// ReSharper disable PossibleInvalidOperationException
// ReSharper disable EmptyGeneralCatchClause
// ReSharper disable AccessToModifiedClosure
// ReSharper disable RedundantAssignment
// ReSharper disable InvertIf
// ReSharper disable ConvertIfStatementToConditionalTernaryExpression
// ReSharper disable TooWideLocalVariableScope
namespace Squidex.Infrastructure.MongoDb.EventStore
{
@ -72,17 +67,24 @@ namespace Squidex.Infrastructure.MongoDb.EventStore
return collection.Indexes.CreateOneAsync(Index.Ascending(x => x.EventStreamOffset).Ascending(x => x.EventStream), new CreateIndexOptions { Unique = true });
}
public IObservable<StoredEvent> GetEventsAsync(string streamFilter = null, string position = null)
public IEventSubscription CreateSubscription(string streamFilter = null, string position = null)
{
return Observable.Create<StoredEvent>((observer, ct) =>
return new PollingSubscription(this, notifier, streamFilter, position);
}
public async Task<IReadOnlyList<StoredEvent>> GetEventsAsync(string streamName, string position)
{
var result = await Observable.Create<StoredEvent>((observer, ct) =>
{
return GetEventsAsync(storedEvent =>
{
observer.OnNext(storedEvent);
return TaskHelper.Done;
}, ct, streamFilter, position);
});
}, ct, streamName, position);
}).ToList();
return result.ToList();
}
public async Task GetEventsAsync(Func<StoredEvent, Task> callback, CancellationToken cancellationToken, string streamFilter = null, string position = null)
@ -91,8 +93,6 @@ namespace Squidex.Infrastructure.MongoDb.EventStore
StreamPosition lastPosition = position;
var wasEndOfCommit = lastPosition.IsEndOfCommit;
var filter = CreateFilter(streamFilter, lastPosition);
await Collection.Find(filter).Sort(Sort.Ascending(EventStreamField)).ForEachAsync(async commit =>
@ -106,7 +106,7 @@ namespace Squidex.Infrastructure.MongoDb.EventStore
{
eventStreamOffset++;
if (commitOffset > lastPosition.CommitOffset || wasEndOfCommit)
if (commitOffset > lastPosition.CommitOffset || commitTimestamp > lastPosition.Timestamp)
{
var eventData = new EventData { EventId = e.EventId, Metadata = e.Metadata, Payload = e.Payload, Type = e.Type };
var eventToken = new StreamPosition(commitTimestamp, commitOffset, commit.Events.Length);
@ -214,7 +214,7 @@ namespace Squidex.Infrastructure.MongoDb.EventStore
pendingCommits.Enqueue((document, cts));
timer.Trigger();
timer.Wakeup();
await cts.Task;
}
@ -263,18 +263,7 @@ namespace Squidex.Infrastructure.MongoDb.EventStore
}
}
FilterDefinition<MongoEventCommit> filter = new BsonDocument();
if (filters.Count > 1)
{
filter = Filter.And(filters);
}
else if (filters.Count == 1)
{
filter = filters[0];
}
return filter;
return Filter.And(filters);
}
}
}

59
src/Squidex.Infrastructure.MongoDb/EventStore/PollingSubscription.cs

@ -0,0 +1,59 @@
// ==========================================================================
// PollingSubscription.cs
// Squidex Headless CMS
// ==========================================================================
// Copyright (c) Squidex Group
// All rights reserved.
// ==========================================================================
using System;
using System.Threading.Tasks;
using Squidex.Infrastructure.CQRS.Events;
using Squidex.Infrastructure.Timers;
namespace Squidex.Infrastructure.MongoDb.EventStore
{
public sealed class PollingSubscription : DisposableObjectBase, IEventSubscription
{
private readonly IEventNotifier eventNotifier;
private readonly MongoEventStore eventStore;
private readonly string streamFilter;
private readonly string position;
private CompletionTimer timer;
public PollingSubscription(MongoEventStore eventStore, IEventNotifier eventNotifier, string streamFilter, string position)
{
this.position = position;
this.eventStore = eventStore;
this.eventNotifier = eventNotifier;
this.streamFilter = streamFilter;
}
protected override void DisposeObject(bool disposing)
{
if (disposing)
{
timer.Dispose();
}
}
public IEventSubscription Subscribe(Func<StoredEvent, Task> handler)
{
Guard.NotNull(handler, nameof(handler));
if (timer == null)
{
throw new InvalidOperationException("An handler has already been registered.");
}
timer = new CompletionTimer(5000, async ct =>
{
await eventStore.GetEventsAsync(handler, ct, streamFilter, position);
});
eventNotifier.Subscribe(timer.Wakeup);
return this;
}
}
}

2
src/Squidex.Infrastructure/CQRS/Commands/DefaultDomainObjectRepository.cs

@ -43,7 +43,7 @@ namespace Squidex.Infrastructure.CQRS.Commands
{
var streamName = nameResolver.GetStreamName(typeof(TDomainObject), id);
var events = await eventStore.GetEventsAsync(streamName).ToList();
var events = await eventStore.GetEventsAsync(streamName);
if (events.Count == 0)
{

86
src/Squidex.Infrastructure/CQRS/Events/EventReceiver.cs

@ -11,10 +11,6 @@ using System.Threading.Tasks;
using Squidex.Infrastructure.Log;
using Squidex.Infrastructure.Timers;
// ReSharper disable ExpressionIsAlwaysNull
// ReSharper disable ConvertToLambdaExpression
// ReSharper disable MethodSupportsCancellation
// ReSharper disable ConvertIfStatementToConditionalTernaryExpression
// ReSharper disable InvertIf
namespace Squidex.Infrastructure.CQRS.Events
@ -23,28 +19,25 @@ namespace Squidex.Infrastructure.CQRS.Events
{
private readonly EventDataFormatter formatter;
private readonly IEventStore eventStore;
private readonly IEventNotifier eventNotifier;
private readonly IEventConsumerInfoRepository eventConsumerInfoRepository;
private readonly ISemanticLog log;
private IEventSubscription currentSubscription;
private CompletionTimer timer;
public EventReceiver(
EventDataFormatter formatter,
IEventStore eventStore,
IEventNotifier eventNotifier,
IEventConsumerInfoRepository eventConsumerInfoRepository,
ISemanticLog log)
{
Guard.NotNull(log, nameof(log));
Guard.NotNull(formatter, nameof(formatter));
Guard.NotNull(eventStore, nameof(eventStore));
Guard.NotNull(eventNotifier, nameof(eventNotifier));
Guard.NotNull(eventConsumerInfoRepository, nameof(eventConsumerInfoRepository));
this.log = log;
this.formatter = formatter;
this.eventStore = eventStore;
this.eventNotifier = eventNotifier;
this.eventConsumerInfoRepository = eventConsumerInfoRepository;
}
@ -52,6 +45,17 @@ namespace Squidex.Infrastructure.CQRS.Events
{
if (disposing)
{
try
{
currentSubscription?.Dispose();
}
catch (Exception ex)
{
log.LogWarning(ex, w => w
.WriteProperty("action", "DisposeEventReceiver")
.WriteProperty("state", "Failed"));
}
try
{
timer?.Dispose();
@ -65,14 +69,14 @@ namespace Squidex.Infrastructure.CQRS.Events
}
}
public void Next()
public void Refresh()
{
ThrowIfDisposed();
timer?.Trigger();
timer?.Wakeup();
}
public void Subscribe(IEventConsumer eventConsumer, int delay = 5000)
public void Subscribe(IEventConsumer eventConsumer)
{
Guard.NotNull(eventConsumer, nameof(eventConsumer));
@ -86,7 +90,7 @@ namespace Squidex.Infrastructure.CQRS.Events
var consumerName = eventConsumer.Name;
var consumerStarted = false;
timer = new CompletionTimer(delay, async ct =>
timer = new CompletionTimer(5000, async ct =>
{
if (!consumerStarted)
{
@ -101,48 +105,62 @@ namespace Squidex.Infrastructure.CQRS.Events
var position = status.Position;
if (status.IsResetting)
if (status.IsResetting || status.IsStopped)
{
currentSubscription?.Dispose();
currentSubscription = null;
position = null;
await ResetAsync(eventConsumer, consumerName, position);
await ResetAsync(eventConsumer);
}
else if (status.IsStopped)
{
currentSubscription?.Dispose();
currentSubscription = null;
return;
}
await eventStore.GetEventsAsync(se =>
if (currentSubscription == null)
{
return HandleEventAsync(eventConsumer, se, consumerName);
}, ct, eventConsumer.EventsFilter, position);
Subscribe(eventConsumer, position);
}
}
catch (Exception ex)
{
log.LogFatal(ex, w => w.WriteProperty("action", "EventHandlingFailed"));
await eventConsumerInfoRepository.StopAsync(consumerName, ex.ToString());
}
});
eventNotifier.Subscribe(timer.Trigger);
}
private async Task HandleEventAsync(IEventConsumer eventConsumer, StoredEvent storedEvent, string consumerName)
private void Subscribe(IEventConsumer eventConsumer, string position)
{
var @event = ParseEvent(storedEvent);
var consumerName = eventConsumer.Name;
if (@event == null)
var subscription = eventStore.CreateSubscription(eventConsumer.EventsFilter, position);
subscription.Subscribe(async storedEvent =>
{
return;
}
try
{
await DispatchConsumer(ParseEvent(storedEvent), eventConsumer, eventConsumer.Name);
await eventConsumerInfoRepository.SetPositionAsync(eventConsumer.Name, storedEvent.EventPosition, false);
}
catch (Exception ex)
{
await eventConsumerInfoRepository.StopAsync(consumerName, ex.ToString());
await DispatchConsumer(@event, eventConsumer, consumerName);
subscription.Dispose();
}
});
await eventConsumerInfoRepository.SetPositionAsync(consumerName, storedEvent.EventPosition, false);
currentSubscription = subscription;
}
private async Task ResetAsync(IEventConsumer eventConsumer, string consumerName, string position)
private async Task ResetAsync(IEventConsumer eventConsumer)
{
var actionId = Guid.NewGuid().ToString();
try
@ -151,16 +169,16 @@ namespace Squidex.Infrastructure.CQRS.Events
.WriteProperty("action", "EventConsumerReset")
.WriteProperty("actionId", actionId)
.WriteProperty("state", "Started")
.WriteProperty("eventConsumer", eventConsumer.GetType().Name));
.WriteProperty("eventConsumer", eventConsumer.Name));
await eventConsumer.ClearAsync();
await eventConsumerInfoRepository.SetPositionAsync(consumerName, position, true);
await eventConsumerInfoRepository.SetPositionAsync(eventConsumer.Name, null, true);
log.LogInformation(w => w
.WriteProperty("action", "EventConsumerReset")
.WriteProperty("actionId", actionId)
.WriteProperty("state", "Completed")
.WriteProperty("eventConsumer", eventConsumer.GetType().Name));
.WriteProperty("eventConsumer", eventConsumer.Name));
}
catch (Exception ex)
{
@ -223,10 +241,6 @@ namespace Squidex.Infrastructure.CQRS.Events
return @event;
}
catch (TypeNameNotFoundException)
{
return null;
}
catch (Exception ex)
{
log.LogFatal(ex, w => w

9
src/Squidex.Infrastructure/CQRS/Events/IEventStore.cs

@ -8,17 +8,16 @@
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
namespace Squidex.Infrastructure.CQRS.Events
{
public interface IEventStore
{
IObservable<StoredEvent> GetEventsAsync(string streamFilter = null, string position = null);
Task GetEventsAsync(Func<StoredEvent, Task> callback, CancellationToken cancellationToken = default(CancellationToken), string streamFilter = null, string position = null);
Task<IReadOnlyList<StoredEvent>> GetEventsAsync(string streamName, string position = null);
Task AppendEventsAsync(Guid commitId, string streamName, int expectedVersion, ICollection<EventData> events);
IEventSubscription CreateSubscription(string streamFilter = null, string position = null);
}
}

18
src/Squidex.Infrastructure/CQRS/Events/IEventSubscription.cs

@ -0,0 +1,18 @@
// ==========================================================================
// IEventSubscription.cs
// Squidex Headless CMS
// ==========================================================================
// Copyright (c) Squidex Group
// All rights reserved.
// ==========================================================================
using System;
using System.Threading.Tasks;
namespace Squidex.Infrastructure.CQRS.Events
{
public interface IEventSubscription : IDisposable
{
IEventSubscription Subscribe(Func<StoredEvent, Task> handler);
}
}

2
src/Squidex.Infrastructure/Timers/CompletionTimer.cs

@ -61,7 +61,7 @@ namespace Squidex.Infrastructure.Timers
}
}
public void Trigger()
public void Wakeup()
{
ThrowIfDisposed();

2
src/Squidex.Infrastructure/UsageTracking/BackgroundUsageTracker.cs

@ -67,7 +67,7 @@ namespace Squidex.Infrastructure.UsageTracking
{
ThrowIfDisposed();
timer.Trigger();
timer.Wakeup();
}
private async Task TrackAsync()

3
tests/Benchmarks/Tests/AppendToEventStore.cs

@ -12,7 +12,6 @@ using MongoDB.Driver;
using Squidex.Infrastructure;
using Squidex.Infrastructure.CQRS.Events;
using Squidex.Infrastructure.MongoDb.EventStore;
using Squidex.Infrastructure.Tasks;
namespace Benchmarks.Tests
{
@ -42,7 +41,7 @@ namespace Benchmarks.Tests
mongoDatabase = mongoClient.GetDatabase(Guid.NewGuid().ToString());
eventStore = new MongoEventStore(mongoDatabase, new DefaultEventNotifier(new InMemoryPubSub()));
eventStore.GetEventsAsync(x => TaskHelper.Done).Wait();
eventStore.Warmup();
}
public long Run()

3
tests/Benchmarks/Tests/AppendToEventStoreWithManyWriters.cs

@ -13,7 +13,6 @@ using MongoDB.Driver;
using Squidex.Infrastructure;
using Squidex.Infrastructure.CQRS.Events;
using Squidex.Infrastructure.MongoDb.EventStore;
using Squidex.Infrastructure.Tasks;
namespace Benchmarks.Tests
{
@ -43,7 +42,7 @@ namespace Benchmarks.Tests
mongoDatabase = mongoClient.GetDatabase(Guid.NewGuid().ToString());
eventStore = new MongoEventStore(mongoDatabase, new DefaultEventNotifier(new InMemoryPubSub()));
eventStore.GetEventsAsync(x => TaskHelper.Done).Wait();
eventStore.Warmup();
}
public long Run()

6
tests/Benchmarks/Tests/HandleEvents.cs

@ -8,6 +8,7 @@
using System;
using Benchmarks.Tests.TestData;
using Benchmarks.Utils;
using MongoDB.Driver;
using Newtonsoft.Json;
using Squidex.Infrastructure;
@ -15,7 +16,6 @@ using Squidex.Infrastructure.CQRS.Events;
using Squidex.Infrastructure.Json;
using Squidex.Infrastructure.Log;
using Squidex.Infrastructure.MongoDb.EventStore;
using Squidex.Infrastructure.Tasks;
// ReSharper disable InvertIf
@ -68,9 +68,9 @@ namespace Benchmarks.Tests
eventNotifier = new DefaultEventNotifier(new InMemoryPubSub());
eventStore = new MongoEventStore(mongoDatabase, eventNotifier);
eventStore.GetEventsAsync(x => TaskHelper.Done).Wait();
eventStore.Warmup();
eventReceiver = new EventReceiver(formatter, eventStore, eventNotifier, eventConsumerInfos, log);
eventReceiver = new EventReceiver(formatter, eventStore, eventConsumerInfos, log);
eventReceiver.Subscribe(eventConsumer);
}

6
tests/Benchmarks/Tests/HandleEventsWithManyWriters.cs

@ -9,6 +9,7 @@
using System;
using System.Threading.Tasks;
using Benchmarks.Tests.TestData;
using Benchmarks.Utils;
using MongoDB.Driver;
using Newtonsoft.Json;
using Squidex.Infrastructure;
@ -16,7 +17,6 @@ using Squidex.Infrastructure.CQRS.Events;
using Squidex.Infrastructure.Json;
using Squidex.Infrastructure.Log;
using Squidex.Infrastructure.MongoDb.EventStore;
using Squidex.Infrastructure.Tasks;
// ReSharper disable InvertIf
@ -70,9 +70,9 @@ namespace Benchmarks.Tests
eventNotifier = new DefaultEventNotifier(new InMemoryPubSub());
eventStore = new MongoEventStore(mongoDatabase, eventNotifier);
eventStore.GetEventsAsync(x => TaskHelper.Done).Wait();
eventStore.Warmup();
eventReceiver = new EventReceiver(formatter, eventStore, eventNotifier, eventConsumerInfos, log);
eventReceiver = new EventReceiver(formatter, eventStore, eventConsumerInfos, log);
eventReceiver.Subscribe(eventConsumer);
}

5
tests/Benchmarks/Utils/Helper.cs

@ -7,6 +7,7 @@
// ==========================================================================
using System;
using System.Collections.Generic;
using Squidex.Infrastructure.CQRS.Events;
namespace Benchmarks.Utils
@ -16,7 +17,11 @@ namespace Benchmarks.Utils
public static EventData CreateEventData()
{
return new EventData { EventId = Guid.NewGuid(), Metadata = "EventMetdata", Payload = "EventPayload", Type = "MyEvent" };
}
public static void Warmup(this IEventStore eventStore)
{
eventStore.AppendEventsAsync(Guid.NewGuid(), "my-stream", -1, new List<EventData> { CreateEventData() }).Wait();
}
}
}

10
tests/Squidex.Infrastructure.Tests/CQRS/Commands/DefaultDomainObjectRepositoryTests.cs

@ -73,7 +73,7 @@ namespace Squidex.Infrastructure.CQRS.Commands
[Fact]
public async Task Should_throw_exception_when_event_store_returns_no_events()
{
eventStore.Setup(x => x.GetEventsAsync(streamName, null)).Returns(Observable.Empty<StoredEvent>());
eventStore.Setup(x => x.GetEventsAsync(streamName, null)).Returns(Task.FromResult<IReadOnlyList<StoredEvent>>(new List<StoredEvent>()));
await Assert.ThrowsAsync<DomainObjectNotFoundException>(() => sut.GetByIdAsync<MyDomainObject>(aggregateId));
}
@ -87,13 +87,13 @@ namespace Squidex.Infrastructure.CQRS.Commands
var event1 = new MyEvent();
var event2 = new MyEvent();
var events = new[]
var events = new List<StoredEvent>
{
new StoredEvent("0", 0, eventData1),
new StoredEvent("1", 1, eventData2)
};
eventStore.Setup(x => x.GetEventsAsync(streamName, null)).Returns(events.ToObservable());
eventStore.Setup(x => x.GetEventsAsync(streamName, null)).Returns(Task.FromResult<IReadOnlyList<StoredEvent>>(events));
eventDataFormatter.Setup(x => x.Parse(eventData1)).Returns(new Envelope<IEvent>(event1));
eventDataFormatter.Setup(x => x.Parse(eventData2)).Returns(new Envelope<IEvent>(event2));
@ -112,13 +112,13 @@ namespace Squidex.Infrastructure.CQRS.Commands
var event1 = new MyEvent();
var event2 = new MyEvent();
var events = new[]
var events = new List<StoredEvent>
{
new StoredEvent("0", 0, eventData1),
new StoredEvent("1", 1, eventData2)
};
eventStore.Setup(x => x.GetEventsAsync(streamName, null)).Returns(events.ToObservable());
eventStore.Setup(x => x.GetEventsAsync(streamName, null)).Returns(Task.FromResult<IReadOnlyList<StoredEvent>>(events));
eventDataFormatter.Setup(x => x.Parse(eventData1)).Returns(new Envelope<IEvent>(event1));
eventDataFormatter.Setup(x => x.Parse(eventData2)).Returns(new Envelope<IEvent>(event2));

57
tests/Squidex.Infrastructure.Tests/CQRS/Events/EventReceiverTests.cs

@ -8,7 +8,6 @@
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using Moq;
using Squidex.Infrastructure.Log;
@ -28,34 +27,58 @@ namespace Squidex.Infrastructure.CQRS.Events
private sealed class MyEventConsumerInfo : IEventConsumerInfo
{
public bool IsStopped { get; set; }
public bool IsResetting { get; set; }
public string Name { get; set; }
public string Error { get; set; }
public string Position { get; set; }
}
private sealed class MyEventStore : IEventStore
private sealed class MyEventSubscription : IEventSubscription
{
private readonly IEnumerable<StoredEvent> storedEvents;
private bool isDisposed;
public MyEventStore(IEnumerable<StoredEvent> storedEvents)
public MyEventSubscription(IEnumerable<StoredEvent> storedEvents)
{
this.storedEvents = storedEvents;
}
public async Task GetEventsAsync(Func<StoredEvent, Task> callback, CancellationToken cancellationToken, string streamFilter = null, string position = null)
public IEventSubscription Subscribe(Func<StoredEvent, Task> handler)
{
foreach (var @event in storedEvents)
foreach (var storedEvent in storedEvents)
{
await callback(@event);
if (isDisposed)
{
break;
}
handler(storedEvent).Wait();
}
return this;
}
public void Dispose()
{
isDisposed = true;
}
}
private sealed class MyEventStore : IEventStore
{
private readonly IEnumerable<StoredEvent> storedEvents;
public MyEventStore(IEnumerable<StoredEvent> storedEvents)
{
this.storedEvents = storedEvents;
}
public IEventSubscription CreateSubscription(string streamFilter = null, string position = null)
{
return new MyEventSubscription(storedEvents);
}
public IObservable<StoredEvent> GetEventsAsync(string streamFilter = null, string position = null)
public Task<IReadOnlyList<StoredEvent>> GetEventsAsync(string streamName, string position = null)
{
throw new NotSupportedException();
}
@ -101,7 +124,7 @@ namespace Squidex.Infrastructure.CQRS.Events
formatter.Setup(x => x.Parse(eventData2)).Returns(envelope2);
formatter.Setup(x => x.Parse(eventData3)).Returns(envelope3);
sut = new EventReceiver(formatter.Object, eventStore, eventNotifier.Object, eventConsumerInfoRepository.Object, log.Object);
sut = new EventReceiver(formatter.Object, eventStore, eventConsumerInfoRepository.Object, log.Object);
}
[Fact]
@ -109,7 +132,7 @@ namespace Squidex.Infrastructure.CQRS.Events
{
sut.Subscribe(eventConsumer.Object);
sut.Subscribe(eventConsumer.Object);
sut.Next();
sut.Refresh();
sut.Dispose();
eventConsumerInfoRepository.Verify(x => x.CreateAsync(consumerName), Times.Once());
@ -121,7 +144,7 @@ namespace Squidex.Infrastructure.CQRS.Events
consumerInfo.Position = "2";
sut.Subscribe(eventConsumer.Object);
sut.Next();
sut.Refresh();
sut.Dispose();
eventConsumer.Verify(x => x.On(envelope1), Times.Once());
@ -138,7 +161,7 @@ namespace Squidex.Infrastructure.CQRS.Events
eventConsumer.Setup(x => x.On(envelope2)).Throws(new InvalidOperationException());
sut.Subscribe(eventConsumer.Object);
sut.Next();
sut.Refresh();
sut.Dispose();
eventConsumer.Verify(x => x.On(envelope1), Times.Once());
@ -156,7 +179,7 @@ namespace Squidex.Infrastructure.CQRS.Events
formatter.Setup(x => x.Parse(eventData2)).Throws(new InvalidOperationException());
sut.Subscribe(eventConsumer.Object);
sut.Next();
sut.Refresh();
sut.Dispose();
eventConsumer.Verify(x => x.On(envelope1), Times.Once());
@ -173,7 +196,7 @@ namespace Squidex.Infrastructure.CQRS.Events
consumerInfo.Position = "2";
sut.Subscribe(eventConsumer.Object);
sut.Next();
sut.Refresh();
sut.Dispose();
eventConsumer.Verify(x => x.On(envelope1), Times.Once());

Loading…
Cancel
Save