Browse Source

Subscriptions simplified again.

pull/131/head
Sebastian Stehle 8 years ago
parent
commit
bdfe896ab0
  1. 1
      Squidex.ruleset
  2. 7
      src/Squidex.Infrastructure.GetEventStore/CQRS/Events/GetEventStore.cs
  3. 168
      src/Squidex.Infrastructure.GetEventStore/CQRS/Events/GetEventStoreSubscription.cs
  4. 7
      src/Squidex.Infrastructure.MongoDb/CQRS/Events/MongoEventStore.cs
  5. 93
      src/Squidex.Infrastructure.MongoDb/CQRS/Events/PollingSubscription.cs
  6. 47
      src/Squidex.Infrastructure/Actors/Actor.cs
  7. 16
      src/Squidex.Infrastructure/Actors/DefaultRemoteActorChannel.cs
  8. 9
      src/Squidex.Infrastructure/Actors/IActor.cs
  9. 14
      src/Squidex.Infrastructure/Actors/IMessage.cs
  10. 4
      src/Squidex.Infrastructure/Actors/IRemoteActorChannel.cs
  11. 23
      src/Squidex.Infrastructure/Actors/RemoteActors.cs
  12. 215
      src/Squidex.Infrastructure/CQRS/Events/Actors/EventConsumerActor.cs
  13. 20
      src/Squidex.Infrastructure/CQRS/Events/Actors/Messages/ReceiveEventMessage.cs
  14. 4
      src/Squidex.Infrastructure/CQRS/Events/Actors/Messages/ResetConsumerMessage.cs
  15. 4
      src/Squidex.Infrastructure/CQRS/Events/Actors/Messages/StartConsumerMessage.cs
  16. 6
      src/Squidex.Infrastructure/CQRS/Events/Actors/Messages/StopConsumerMessage.cs
  17. 22
      src/Squidex.Infrastructure/CQRS/Events/Actors/Messages/SubscribeMessage.cs
  18. 2
      src/Squidex.Infrastructure/CQRS/Events/IEventStore.cs
  19. 14
      src/Squidex.Infrastructure/CQRS/Events/IEventSubscriber.cs
  20. 5
      src/Squidex.Infrastructure/CQRS/Events/IEventSubscription.cs
  21. 4
      src/Squidex/Controllers/Api/Assets/AssetContentController.cs
  22. 21
      src/Squidex/Controllers/Api/EventConsumers/EventConsumersController.cs
  23. 9
      src/Squidex/Pipeline/CommandMiddlewares/EnrichWithActorCommandMiddleware.cs
  24. 36
      tests/Squidex.Infrastructure.Tests/Actors/ActorRemoteTests.cs
  25. 52
      tests/Squidex.Infrastructure.Tests/Actors/ActorTests.cs
  26. 55
      tests/Squidex.Infrastructure.Tests/CQRS/Events/Actors/EventConsumerActorTests.cs

1
Squidex.ruleset

@ -75,6 +75,7 @@
<Rule Id="RECS0146" Action="Error" />
<Rule Id="RECS0026" Action="Error" />
<Rule Id="RECS0145" Action="None" />
<Rule Id="RECS0129" Action="None" />
</Rules>
<Rules AnalyzerId="Roslyn.Core" RuleNamespace="Roslyn.Core">
<Rule Id="AD0001" Action="None" />

7
src/Squidex.Infrastructure.GetEventStore/CQRS/Events/GetEventStore.cs

@ -44,9 +44,12 @@ namespace Squidex.Infrastructure.CQRS.Events
}
}
public IEventSubscription CreateSubscription()
public IEventSubscription CreateSubscription(IEventSubscriber subscriber, string streamFilter, string position = null)
{
return new GetEventStoreSubscription(connection, prefix, projectionHost);
Guard.NotNull(subscriber, nameof(subscriber));
Guard.NotNullOrEmpty(streamFilter, nameof(streamFilter));
return new GetEventStoreSubscription(connection, subscriber, projectionHost, prefix, position, streamFilter);
}
public async Task<IReadOnlyList<StoredEvent>> GetEventsAsync(string streamName)

168
src/Squidex.Infrastructure.GetEventStore/CQRS/Events/GetEventStoreSubscription.cs

@ -17,11 +17,8 @@ using EventStore.ClientAPI;
using EventStore.ClientAPI.Exceptions;
using EventStore.ClientAPI.Projections;
using Squidex.Infrastructure.Actors;
using Squidex.Infrastructure.CQRS.Events.Actors.Messages;
using Squidex.Infrastructure.Tasks;
#pragma warning disable SA1401 // Fields must be private
namespace Squidex.Infrastructure.CQRS.Events
{
internal sealed class GetEventStoreSubscription : Actor, IEventSubscription
@ -31,36 +28,57 @@ namespace Squidex.Infrastructure.CQRS.Events
private static readonly TimeSpan TimeBetweenReconnects = TimeSpan.FromMinutes(5);
private static readonly ConcurrentDictionary<string, bool> SubscriptionsCreated = new ConcurrentDictionary<string, bool>();
private readonly IEventStoreConnection connection;
private readonly IEventSubscriber subscriber;
private readonly string prefix;
private readonly string streamName;
private readonly string streamFilter;
private readonly string projectionHost;
private readonly Queue<DateTime> reconnectTimes = new Queue<DateTime>();
private EventStoreCatchUpSubscription subscription;
private string streamFilter;
private string streamName;
private long? position;
private IActor parent;
private sealed class ConnectMessage : IMessage
private sealed class ESConnect
{
}
private sealed class ConnectionFailedMessage : IMessage
private abstract class ESMessage
{
public Exception Exception;
public EventStoreCatchUpSubscription Subscription { get; set; }
}
private sealed class ReceiveESEventMessage : IMessage
private sealed class ESSubscriptionFailed : ESMessage
{
public ResolvedEvent Event;
public Exception Exception { get; set; }
}
public EventStoreCatchUpSubscription Subscription;
private sealed class ESEventReceived : ESMessage
{
public ResolvedEvent Event { get; set; }
}
public GetEventStoreSubscription(IEventStoreConnection connection, string prefix, string projectionHost)
public GetEventStoreSubscription(
IEventStoreConnection connection,
IEventSubscriber subscriber,
string projectionHost,
string prefix,
string position,
string streamFilter)
{
this.prefix = prefix;
this.connection = connection;
this.position = ParsePosition(position);
this.prefix = prefix;
this.projectionHost = projectionHost;
this.streamFilter = streamFilter;
this.subscriber = subscriber;
streamName = ParseFilter(prefix, streamFilter);
DispatchAsync(new ESConnect()).Forget();
}
public Task StopAsync()
{
return StopAndWaitAsync();
}
protected override Task OnStop()
@ -72,95 +90,73 @@ namespace Squidex.Infrastructure.CQRS.Events
protected override async Task OnError(Exception exception)
{
if (parent != null)
{
await parent.SendAsync(exception);
}
await subscriber.OnErrorAsync(this, exception);
await StopAsync();
}
protected override async Task OnMessage(IMessage message)
protected override async Task OnMessage(object message)
{
switch (message)
{
case SubscribeMessage subscribe when parent == null:
{
parent = subscribe.Parent;
position = ParsePosition(subscribe.Position);
streamFilter = subscribe.StreamFilter;
streamName = $"by-{prefix.Simplify()}-{streamFilter.Simplify()}";
case ESConnect connect when subscription == null:
{
await InitializeAsync();
await CreateProjectionAsync();
subscription = SubscribeToStream();
SendAsync(new ConnectMessage()).Forget();
break;
}
break;
}
case ESSubscriptionFailed subscriptionFailed when subscriptionFailed.Subscription == subscription:
{
subscription.Stop();
subscription = null;
case ConnectionFailedMessage connectionFailed when parent != null && subscription == null:
if (CanReconnect(DateTime.UtcNow))
{
subscription.Stop();
subscription = null;
if (CanReconnect(DateTime.UtcNow))
{
Task.Delay(ReconnectWaitMs).ContinueWith(t => SendAsync(new ConnectMessage())).Forget();
}
else
{
await SendAsync(connectionFailed.Exception);
}
break;
Task.Delay(ReconnectWaitMs).ContinueWith(t => DispatchAsync(new ESConnect())).Forget();
}
case ConnectMessage connect when parent != null && subscription == null:
else
{
subscription = connection.SubscribeToStreamFrom(streamName, position, CatchUpSubscriptionSettings.Default, HandleEvent, null, HandleError);
break;
throw subscriptionFailed.Exception;
}
case ReceiveESEventMessage receiveEvent when parent != null:
{
if (receiveEvent.Subscription == subscription)
{
var storedEvent = Formatter.Read(receiveEvent.Event);
break;
}
await parent.SendAsync(new ReceiveEventMessage { Event = storedEvent, Source = this });
case ESEventReceived eventReceived when eventReceived.Subscription == subscription:
{
var storedEvent = Formatter.Read(eventReceived.Event);
position = receiveEvent.Event.OriginalEventNumber;
}
await subscriber.OnEventAsync(this, storedEvent);
break;
}
}
}
position = eventReceived.Event.OriginalEventNumber;
private void HandleEvent(EventStoreCatchUpSubscription s, ResolvedEvent resolved)
{
SendAsync(new ReceiveESEventMessage { Event = resolved, Subscription = s }).Forget();
break;
}
}
}
private void HandleError(EventStoreCatchUpSubscription s, SubscriptionDropReason reason, Exception ex)
private EventStoreCatchUpSubscription SubscribeToStream()
{
if (reason == SubscriptionDropReason.ConnectionClosed && subscription == s)
{
SendAsync(new ConnectionFailedMessage { Exception = ex });
}
else if (reason != SubscriptionDropReason.UserInitiated && subscription == s)
{
var exception = ex ?? new ConnectionClosedException($"Subscription closed with reason {reason}.");
var settings = CatchUpSubscriptionSettings.Default;
SendAsync(ex).Forget();
}
}
return connection.SubscribeToStreamFrom(streamName, position, settings,
(s, e) =>
{
DispatchAsync(new ESEventReceived { Event = e, Subscription = s }).Forget();
}, null,
(s, reason, ex) =>
{
if (reason == SubscriptionDropReason.ConnectionClosed ||
reason == SubscriptionDropReason.UserInitiated)
{
ex = ex ?? new ConnectionClosedException($"Subscription closed with reason {reason}.");
private static long? ParsePosition(string position)
{
return long.TryParse(position, out var parsedPosition) ? (long?)parsedPosition : null;
DispatchAsync(new ESSubscriptionFailed { Exception = ex, Subscription = s }).Forget();
}
});
}
private bool CanReconnect(DateTime utcNow)
@ -175,7 +171,7 @@ namespace Squidex.Infrastructure.CQRS.Events
return reconnectTimes.Count < ReconnectWindowMax && (reconnectTimes.Count == 0 || (utcNow - reconnectTimes.Peek()) > TimeBetweenReconnects);
}
private async Task CreateProjectionAsync()
private async Task InitializeAsync()
{
if (SubscriptionsCreated.TryAdd(streamName, true))
{
@ -193,7 +189,9 @@ namespace Squidex.Infrastructure.CQRS.Events
try
{
await projectsManager.CreateContinuousAsync($"${streamName}", projectionConfig, connection.Settings.DefaultUserCredentials);
var credentials = connection.Settings.DefaultUserCredentials;
await projectsManager.CreateContinuousAsync($"${streamName}", projectionConfig, credentials);
}
catch (Exception ex)
{
@ -205,6 +203,16 @@ namespace Squidex.Infrastructure.CQRS.Events
}
}
private static string ParseFilter(string prefix, string filter)
{
return $"by-{prefix.Simplify()}-{filter.Simplify()}";
}
private static long? ParsePosition(string position)
{
return long.TryParse(position, out var parsedPosition) ? (long?)parsedPosition : null;
}
private async Task<ProjectionsManager> ConnectToProjections()
{
var addressParts = projectionHost.Split(':');

7
src/Squidex.Infrastructure.MongoDb/CQRS/Events/MongoEventStore.cs

@ -55,9 +55,12 @@ namespace Squidex.Infrastructure.CQRS.Events
collection.Indexes.CreateOneAsync(Index.Ascending(x => x.EventStream).Descending(x => x.EventStreamOffset), new CreateIndexOptions { Unique = true }));
}
public IEventSubscription CreateSubscription()
public IEventSubscription CreateSubscription(IEventSubscriber subscriber, string streamFilter, string position = null)
{
return new PollingSubscription(this, notifier);
Guard.NotNull(subscriber, nameof(subscriber));
Guard.NotNullOrEmpty(streamFilter, nameof(streamFilter));
return new PollingSubscription(this, notifier, subscriber, streamFilter, position);
}
public async Task<IReadOnlyList<StoredEvent>> GetEventsAsync(string streamName)

93
src/Squidex.Infrastructure.MongoDb/CQRS/Events/PollingSubscription.cs

@ -11,110 +11,107 @@ using System.Text.RegularExpressions;
using System.Threading;
using System.Threading.Tasks;
using Squidex.Infrastructure.Actors;
using Squidex.Infrastructure.CQRS.Events.Actors.Messages;
using Squidex.Infrastructure.Tasks;
#pragma warning disable SA1401 // Fields must be private
namespace Squidex.Infrastructure.CQRS.Events
{
public sealed class PollingSubscription : Actor, IEventSubscription
{
private readonly IEventNotifier eventNotifier;
private readonly MongoEventStore eventStore;
private readonly CancellationTokenSource pollStop = new CancellationTokenSource();
private Regex streamRegex;
private string streamFilter;
private readonly IEventNotifier notifier;
private readonly MongoEventStore store;
private readonly CancellationTokenSource disposeToken = new CancellationTokenSource();
private readonly Regex streamRegex;
private readonly string streamFilter;
private readonly IEventSubscriber subscriber;
private string position;
private bool isPolling;
private IDisposable pollSubscription;
private IActor parent;
private IDisposable notification;
private sealed class Connect
{
}
private sealed class StartPollMessage : IMessage
private sealed class StartPoll
{
}
private sealed class StopPollMessage : IMessage
private sealed class StopPoll
{
}
public PollingSubscription(MongoEventStore eventStore, IEventNotifier eventNotifier)
public PollingSubscription(MongoEventStore store, IEventNotifier notifier, IEventSubscriber subscriber, string streamFilter, string position)
{
this.eventStore = eventStore;
this.eventNotifier = eventNotifier;
this.notifier = notifier;
this.store = store;
this.streamFilter = streamFilter;
this.subscriber = subscriber;
streamRegex = new Regex(streamFilter);
DispatchAsync(new Connect()).Forget();
}
public Task StopAsync()
{
return StopAndWaitAsync();
}
protected override Task OnStop()
{
pollStop?.Cancel();
pollSubscription?.Dispose();
disposeToken?.Cancel();
parent = null;
notification?.Dispose();
return TaskHelper.Done;
}
protected override async Task OnError(Exception exception)
{
if (parent != null)
{
await parent.SendAsync(exception);
}
await subscriber.OnErrorAsync(this, exception);
await StopAsync();
}
protected override async Task OnMessage(IMessage message)
protected override async Task OnMessage(object message)
{
switch (message)
{
case SubscribeMessage subscribe when parent == null:
case Connect connect:
{
parent = subscribe.Parent;
position = subscribe.Position;
streamFilter = subscribe.StreamFilter;
streamRegex = new Regex(streamFilter);
pollSubscription = eventNotifier.Subscribe(streamName =>
notification = notifier.Subscribe(streamName =>
{
if (streamRegex.IsMatch(streamName))
{
SendAsync(new StartPollMessage()).Forget();
DispatchAsync(new StartPoll()).Forget();
}
});
SendAsync(new StartPollMessage()).Forget();
break;
}
case StartPollMessage poll when parent != null:
case StartPoll poll when !isPolling:
{
if (!isPolling)
{
isPolling = true;
isPolling = true;
PollAsync().Forget();
}
PollAsync().Forget();
break;
}
case StopPollMessage poll when parent != null:
case StopPoll poll when isPolling:
{
isPolling = false;
Task.Delay(5000).ContinueWith(t => SendAsync(new StartPollMessage())).Forget();
Task.Delay(5000).ContinueWith(t => DispatchAsync(new StartPoll())).Forget();
break;
}
case ReceiveEventMessage receiveEvent when parent != null:
case StoredEvent storedEvent:
{
await parent.SendAsync(receiveEvent);
await subscriber.OnEventAsync(this, storedEvent);
position = receiveEvent.Event.EventPosition;
position = storedEvent.EventPosition;
break;
}
@ -125,15 +122,15 @@ namespace Squidex.Infrastructure.CQRS.Events
{
try
{
await eventStore.GetEventsAsync(e => SendAsync(new ReceiveEventMessage { Event = e, Source = this }), pollStop.Token, streamFilter, position);
await store.GetEventsAsync(e => DispatchAsync(e), disposeToken.Token, streamFilter, position);
await SendAsync(new StopPollMessage());
await DispatchAsync(new StopPoll());
}
catch (Exception ex)
{
if (!ex.Is<OperationCanceledException>())
{
await SendAsync(ex);
await FailAsync(ex);
}
}
}

47
src/Squidex.Infrastructure/Actors/Actor.cs

@ -11,52 +11,57 @@ using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;
using Squidex.Infrastructure.Tasks;
#pragma warning disable SA1401 // Fields must be private
namespace Squidex.Infrastructure.Actors
{
public abstract class Actor : IActor, IDisposable
public abstract class Actor : IDisposable
{
private readonly ActionBlock<IMessage> block;
private readonly ActionBlock<object> block;
private bool isStopped;
private sealed class StopMessage : IMessage
private sealed class StopMessage
{
}
private sealed class ErrorMessage : IMessage
private sealed class ErrorMessage
{
public Exception Exception;
public Exception Exception { get; set; }
}
protected Actor()
{
block = new ActionBlock<IMessage>(Handle, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 1, BoundedCapacity = 10 });
var options = new ExecutionDataflowBlockOptions
{
MaxMessagesPerTask = -1,
MaxDegreeOfParallelism = 1,
BoundedCapacity = 10
};
block = new ActionBlock<object>(Handle, options);
}
public void Dispose()
{
StopAsync().Wait();
StopAndWaitAsync().Wait();
}
public async Task StopAsync()
{
await block.SendAsync(new StopMessage());
await block.Completion;
}
public Task SendAsync(IMessage message)
protected async Task DispatchAsync(object message)
{
Guard.NotNull(message, nameof(message));
return block.SendAsync(message);
await block.SendAsync(message);
}
public Task SendAsync(Exception exception)
protected async Task FailAsync(Exception exception)
{
Guard.NotNull(exception, nameof(exception));
return block.SendAsync(new ErrorMessage { Exception = exception });
await block.SendAsync(new ErrorMessage { Exception = exception });
}
protected async Task StopAndWaitAsync()
{
await block.SendAsync(new StopMessage());
await block.Completion;
}
protected virtual Task OnStop()
@ -69,12 +74,12 @@ namespace Squidex.Infrastructure.Actors
return TaskHelper.Done;
}
protected virtual Task OnMessage(IMessage message)
protected virtual Task OnMessage(object message)
{
return TaskHelper.Done;
}
private async Task Handle(IMessage message)
private async Task Handle(object message)
{
if (isStopped)
{

16
src/Squidex.Infrastructure/Actors/DefaultRemoteActorChannel.cs

@ -42,22 +42,22 @@ namespace Squidex.Infrastructure.Actors
serializer = JsonSerializer.Create(serializerSettings ?? new JsonSerializerSettings());
}
public Task SendAsync(string recipient, IMessage message)
public Task SendAsync(string recipient, object message)
{
Guard.NotNullOrEmpty(recipient, nameof(recipient));
Guard.NotNull(message, nameof(message));
var messageType = typeNameRegistry.GetName(message.GetType());
var messagePayload = WriteJson(message);
var messageBody = WriteJson(message);
var envelope = new Envelope { Recipient = recipient, Payload = messagePayload, PayloadType = messageType };
var envelope = new Envelope { Recipient = recipient, Payload = messageBody, PayloadType = messageType };
pubSub.Publish(ChannelName, JsonConvert.SerializeObject(envelope), true);
return TaskHelper.Done;
}
public void Subscribe(string recipient, Action<IMessage> handler)
public void Subscribe(string recipient, Action<object> handler)
{
Guard.NotNullOrEmpty(recipient, nameof(recipient));
@ -68,16 +68,16 @@ namespace Squidex.Infrastructure.Actors
if (string.Equals(envelope.Recipient, recipient, StringComparison.OrdinalIgnoreCase))
{
var messageType = typeNameRegistry.GetType(envelope.PayloadType);
var messagePayload = ReadJson<IMessage>(envelope.Payload, messageType);
var messageBody = ReadJson(envelope.Payload, messageType);
handler?.Invoke(messagePayload);
handler?.Invoke(messageBody);
}
});
}
private T ReadJson<T>(JToken token, Type type = null)
private object ReadJson(JToken token, Type type)
{
return (T)token.ToObject(type ?? typeof(T), serializer);
return token.ToObject(type, serializer);
}
private JToken WriteJson(object value)

9
src/Squidex.Infrastructure/Actors/IActor.cs

@ -6,17 +6,10 @@
// All rights reserved.
// ==========================================================================
using System;
using System.Threading.Tasks;
namespace Squidex.Infrastructure.Actors
{
public interface IActor
{
Task SendAsync(IMessage message);
Task SendAsync(Exception exception);
Task StopAsync();
void Tell(object message);
}
}

14
src/Squidex.Infrastructure/Actors/IMessage.cs

@ -1,14 +0,0 @@
// ==========================================================================
// IMessage.cs
// Squidex Headless CMS
// ==========================================================================
// Copyright (c) Squidex Group
// All rights reserved.
// ==========================================================================
namespace Squidex.Infrastructure.Actors
{
public interface IMessage
{
}
}

4
src/Squidex.Infrastructure/Actors/IRemoteActorChannel.cs

@ -13,8 +13,8 @@ namespace Squidex.Infrastructure.Actors
{
public interface IRemoteActorChannel
{
Task SendAsync(string recipient, IMessage message);
Task SendAsync(string recipient, object message);
void Subscribe(string recipient, Action<IMessage> handler);
void Subscribe(string recipient, Action<object> handler);
}
}

23
src/Squidex.Infrastructure/Actors/RemoteActors.cs

@ -6,9 +6,7 @@
// All rights reserved.
// ==========================================================================
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;
using Squidex.Infrastructure.Tasks;
namespace Squidex.Infrastructure.Actors
@ -26,24 +24,14 @@ namespace Squidex.Infrastructure.Actors
public Sender(IRemoteActorChannel channel, string recipient)
{
this.channel = channel;
this.recipient = recipient;
}
public Task SendAsync(IMessage message)
{
return channel.SendAsync(recipient, message);
}
public Task SendAsync(Exception exception)
{
throw new NotSupportedException();
this.channel = channel;
}
public Task StopAsync()
public void Tell(object message)
{
throw new NotSupportedException();
channel.SendAsync(recipient, message).Forget();
}
}
@ -66,10 +54,7 @@ namespace Squidex.Infrastructure.Actors
Guard.NotNullOrEmpty(id, nameof(id));
Guard.NotNull(actor, nameof(actor));
channel.Subscribe(id, message =>
{
actor.SendAsync(message).Forget();
});
channel.Subscribe(id, actor.Tell);
}
}
}

215
src/Squidex.Infrastructure/CQRS/Events/Actors/EventConsumerActor.cs

@ -15,7 +15,7 @@ using Squidex.Infrastructure.Tasks;
namespace Squidex.Infrastructure.CQRS.Events.Actors
{
public sealed class EventConsumerActor : Actor
public sealed class EventConsumerActor : Actor, IEventSubscriber, IActor
{
private readonly EventDataFormatter formatter;
private readonly IEventStore eventStore;
@ -23,9 +23,29 @@ namespace Squidex.Infrastructure.CQRS.Events.Actors
private readonly ISemanticLog log;
private IEventSubscription eventSubscription;
private IEventConsumer eventConsumer;
private bool isStarted;
private bool isRunning;
private bool isSetup;
private sealed class Setup
{
public IEventConsumer EventConsumer { get; set; }
}
private abstract class SubscriptionMessage
{
public IEventSubscription Subscription { get; set; }
}
private sealed class SubscriptionEventReceived : SubscriptionMessage
{
public StoredEvent Event { get; set; }
}
private sealed class SubscriptionFailed : SubscriptionMessage
{
public Exception Exception { get; set; }
}
public EventConsumerActor(
EventDataFormatter formatter,
IEventStore eventStore,
@ -48,7 +68,7 @@ namespace Squidex.Infrastructure.CQRS.Events.Actors
{
Guard.NotNull(eventConsumer, nameof(eventConsumer));
return SendAsync(new SetupConsumerMessage { EventConsumer = eventConsumer });
return DispatchAsync(new Setup { EventConsumer = eventConsumer });
}
protected override async Task OnStop()
@ -61,30 +81,58 @@ namespace Squidex.Infrastructure.CQRS.Events.Actors
protected override Task OnError(Exception exception)
{
log.LogError(exception, w => w
.WriteProperty("action", "HandleEvent")
.WriteProperty("state", "Failed")
.WriteProperty("eventConsumer", eventConsumer.Name));
return StopAsync(exception);
}
protected override async Task OnMessage(IMessage message)
Task IEventSubscriber.OnEventAsync(IEventSubscription subscription, StoredEvent @event)
{
return DispatchAsync(new SubscriptionEventReceived { Subscription = subscription, Event = @event });
}
Task IEventSubscriber.OnErrorAsync(IEventSubscription subscription, Exception exception)
{
return DispatchAsync(new SubscriptionFailed { Subscription = subscription, Exception = exception });
}
void IActor.Tell(object message)
{
DispatchAsync(message).Forget();
}
protected override async Task OnMessage(object message)
{
switch (message)
{
case SetupConsumerMessage setupConsumer when !isSetup:
case Setup setup when !isSetup:
{
await SetupAsync(setupConsumer.EventConsumer);
eventConsumer = setup.EventConsumer;
await SetupAsync();
isSetup = true;
break;
}
case StartConsumerMessage startConsumer when isSetup && !isStarted:
case StartConsumerMessage startConsumer when isSetup && !isRunning:
{
await StartAsync();
isRunning = true;
break;
}
case StopConsumerMessage stopConsumer when isSetup && isStarted:
case StopConsumerMessage stopConsumer when isSetup && isRunning:
{
await StopAsync(stopConsumer.Exception);
await StopAsync();
isRunning = false;
break;
}
@ -95,15 +143,28 @@ namespace Squidex.Infrastructure.CQRS.Events.Actors
await ResetAsync();
await StartAsync();
isRunning = true;
break;
}
case SubscriptionFailed subscriptionFailed when isSetup:
{
if (subscriptionFailed.Subscription == eventSubscription)
{
await FailAsync(subscriptionFailed.Exception);
}
break;
}
case ReceiveEventMessage receiveEvent when isSetup:
case SubscriptionEventReceived eventReceived when isSetup:
{
if (receiveEvent.Source == eventSubscription)
if (eventReceived.Subscription == eventSubscription)
{
await DispatchConsumerAsync(ParseEvent(receiveEvent.Event));
await eventConsumerInfoRepository.SetPositionAsync(eventConsumer.Name, receiveEvent.Event.EventPosition, false);
var @event = ParseEvent(eventReceived.Event);
await DispatchConsumerAsync(@event, eventReceived.Event.EventPosition);
}
break;
@ -111,137 +172,89 @@ namespace Squidex.Infrastructure.CQRS.Events.Actors
}
}
private async Task SetupAsync(IEventConsumer consumer)
private async Task SetupAsync()
{
eventConsumer = consumer;
await eventConsumerInfoRepository.CreateAsync(eventConsumer.Name);
var status = await eventConsumerInfoRepository.FindAsync(eventConsumer.Name);
if (!status.IsStopped)
{
SendAsync(new StartConsumerMessage()).Forget();
DispatchAsync(new StartConsumerMessage()).Forget();
}
isSetup = true;
}
private async Task StartAsync()
{
await eventConsumerInfoRepository.StartAsync(eventConsumer.Name);
var status = await eventConsumerInfoRepository.FindAsync(eventConsumer.Name);
var position = status.Position;
eventSubscription = eventStore.CreateSubscription();
eventSubscription.SendAsync(new SubscribeMessage { Parent = this, StreamFilter = eventConsumer.EventsFilter, Position = position }).Forget();
eventSubscription = eventStore.CreateSubscription(this, streamFilter: eventConsumer.EventsFilter, position: status.Position);
isStarted = true;
await eventConsumerInfoRepository.StartAsync(eventConsumer.Name);
}
private async Task StopAsync(Exception exception = null)
{
await eventConsumerInfoRepository.StopAsync(eventConsumer.Name, exception?.ToString());
await eventSubscription.StopAsync();
isStarted = false;
await eventConsumerInfoRepository.StopAsync(eventConsumer.Name, exception?.ToString());
}
private async Task ResetAsync()
{
await eventConsumerInfoRepository.ResetAsync(eventConsumer.Name);
var actionId = Guid.NewGuid().ToString();
try
{
log.LogInformation(w => w
.WriteProperty("action", "EventConsumerReset")
.WriteProperty("actionId", actionId)
.WriteProperty("state", "Started")
.WriteProperty("eventConsumer", eventConsumer.Name));
log.LogInformation(w => w
.WriteProperty("action", "EventConsumerReset")
.WriteProperty("actionId", actionId)
.WriteProperty("state", "Started")
.WriteProperty("eventConsumer", eventConsumer.Name));
using (log.MeasureTrace(w => w
.WriteProperty("action", "EventConsumerReset")
.WriteProperty("actionId", actionId)
.WriteProperty("state", "Completed")
.WriteProperty("eventConsumer", eventConsumer.Name)))
{
await eventConsumerInfoRepository.ResetAsync(eventConsumer.Name);
await eventConsumer.ClearAsync();
await eventConsumerInfoRepository.SetPositionAsync(eventConsumer.Name, null, true);
log.LogInformation(w => w
.WriteProperty("action", "EventConsumerReset")
.WriteProperty("actionId", actionId)
.WriteProperty("state", "Completed")
.WriteProperty("eventConsumer", eventConsumer.Name));
}
catch (Exception ex)
{
log.LogFatal(ex, w => w
.WriteProperty("action", "EventConsumerReset")
.WriteProperty("actionId", actionId)
.WriteProperty("state", "Completed")
.WriteProperty("eventConsumer", eventConsumer.GetType().Name));
throw;
}
}
private async Task DispatchConsumerAsync(Envelope<IEvent> @event)
private async Task DispatchConsumerAsync(Envelope<IEvent> @event, string position)
{
var eventId = @event.Headers.EventId().ToString();
var eventType = @event.Payload.GetType().Name;
try
{
log.LogInformation(w => w
.WriteProperty("action", "HandleEvent")
.WriteProperty("actionId", eventId)
.WriteProperty("state", "Started")
.WriteProperty("eventId", eventId)
.WriteProperty("eventType", eventType)
.WriteProperty("eventConsumer", eventConsumer.Name));
await eventConsumer.On(@event);
log.LogInformation(w => w
.WriteProperty("action", "HandleEvent")
.WriteProperty("actionId", eventId)
.WriteProperty("state", "Completed")
.WriteProperty("eventId", eventId)
.WriteProperty("eventType", eventType)
.WriteProperty("eventConsumer", eventConsumer.Name));
}
catch (Exception ex)
log.LogInformation(w => w
.WriteProperty("action", "HandleEvent")
.WriteProperty("actionId", eventId)
.WriteProperty("state", "Started")
.WriteProperty("eventId", eventId)
.WriteProperty("eventType", eventType)
.WriteProperty("eventConsumer", eventConsumer.Name));
using (log.MeasureTrace(w => w
.WriteProperty("action", "HandleEvent")
.WriteProperty("actionId", eventId)
.WriteProperty("state", "Completed")
.WriteProperty("eventId", eventId)
.WriteProperty("eventType", eventType)
.WriteProperty("eventConsumer", eventConsumer.Name)))
{
log.LogError(ex, w => w
.WriteProperty("action", "HandleEvent")
.WriteProperty("actionId", eventId)
.WriteProperty("state", "Started")
.WriteProperty("eventId", eventId)
.WriteProperty("eventType", eventType)
.WriteProperty("eventConsumer", eventConsumer.Name));
throw;
await eventConsumer.On(@event);
await eventConsumerInfoRepository.SetPositionAsync(eventConsumer.Name, position, false);
}
}
private Envelope<IEvent> ParseEvent(StoredEvent message)
{
try
{
var @event = formatter.Parse(message.Data);
var @event = formatter.Parse(message.Data);
@event.SetEventPosition(message.EventPosition);
@event.SetEventStreamNumber(message.EventStreamNumber);
@event.SetEventPosition(message.EventPosition);
@event.SetEventStreamNumber(message.EventStreamNumber);
return @event;
}
catch (Exception ex)
{
log.LogFatal(ex, w => w
.WriteProperty("action", "ParseEvent")
.WriteProperty("state", "Failed")
.WriteProperty("eventId", message.Data.EventId.ToString())
.WriteProperty("eventPosition", message.EventPosition));
throw;
}
return @event;
}
}
}

20
src/Squidex.Infrastructure/CQRS/Events/Actors/Messages/ReceiveEventMessage.cs

@ -1,20 +0,0 @@
// ==========================================================================
// ReceiveEventMessage.cs
// Squidex Headless CMS
// ==========================================================================
// Copyright (c) Squidex Group
// All rights reserved.
// ==========================================================================
using Squidex.Infrastructure.Actors;
namespace Squidex.Infrastructure.CQRS.Events.Actors.Messages
{
[TypeName(nameof(ReceiveEventMessage))]
public sealed class ReceiveEventMessage : IMessage
{
public StoredEvent Event { get; set; }
public IEventSubscription Source { get; set; }
}
}

4
src/Squidex.Infrastructure/CQRS/Events/Actors/Messages/ResetConsumerMessage.cs

@ -6,12 +6,10 @@
// All rights reserved.
// ==========================================================================
using Squidex.Infrastructure.Actors;
namespace Squidex.Infrastructure.CQRS.Events.Actors.Messages
{
[TypeName(nameof(ResetConsumerMessage))]
public sealed class ResetConsumerMessage : IMessage
public sealed class ResetConsumerMessage
{
}
}

4
src/Squidex.Infrastructure/CQRS/Events/Actors/Messages/StartConsumerMessage.cs

@ -6,12 +6,10 @@
// All rights reserved.
// ==========================================================================
using Squidex.Infrastructure.Actors;
namespace Squidex.Infrastructure.CQRS.Events.Actors.Messages
{
[TypeName(nameof(StartConsumerMessage))]
public sealed class StartConsumerMessage : IMessage
public sealed class StartConsumerMessage
{
}
}

6
src/Squidex.Infrastructure/CQRS/Events/Actors/Messages/StopConsumerMessage.cs

@ -6,14 +6,10 @@
// All rights reserved.
// ==========================================================================
using System;
using Squidex.Infrastructure.Actors;
namespace Squidex.Infrastructure.CQRS.Events.Actors.Messages
{
[TypeName(nameof(StopConsumerMessage))]
public sealed class StopConsumerMessage : IMessage
public sealed class StopConsumerMessage
{
public Exception Exception { get; set; }
}
}

22
src/Squidex.Infrastructure/CQRS/Events/Actors/Messages/SubscribeMessage.cs

@ -1,22 +0,0 @@
// ==========================================================================
// SubscribeMessage.cs
// Squidex Headless CMS
// ==========================================================================
// Copyright (c) Squidex Group
// All rights reserved.
// ==========================================================================
using Squidex.Infrastructure.Actors;
namespace Squidex.Infrastructure.CQRS.Events.Actors.Messages
{
[TypeName(nameof(SubscribeMessage))]
public sealed class SubscribeMessage : IMessage
{
public string StreamFilter { get; set; }
public string Position { get; set; }
public IActor Parent { get; set; }
}
}

2
src/Squidex.Infrastructure/CQRS/Events/IEventStore.cs

@ -20,6 +20,6 @@ namespace Squidex.Infrastructure.CQRS.Events
Task AppendEventsAsync(Guid commitId, string streamName, int expectedVersion, ICollection<EventData> events);
IEventSubscription CreateSubscription();
IEventSubscription CreateSubscription(IEventSubscriber subscriber, string streamFilter, string position = null);
}
}

14
src/Squidex.Infrastructure/CQRS/Events/Actors/Messages/SetupConsumerMessage.cs → src/Squidex.Infrastructure/CQRS/Events/IEventSubscriber.cs

@ -1,18 +1,20 @@
// ==========================================================================
// SetupConsumerMessage.cs
// IEventSubscriber.cs
// Squidex Headless CMS
// ==========================================================================
// Copyright (c) Squidex Group
// All rights reserved.
// ==========================================================================
using Squidex.Infrastructure.Actors;
using System;
using System.Threading.Tasks;
namespace Squidex.Infrastructure.CQRS.Events.Actors.Messages
namespace Squidex.Infrastructure.CQRS.Events
{
[TypeName(nameof(SetupConsumerMessage))]
public sealed class SetupConsumerMessage : IMessage
public interface IEventSubscriber
{
public IEventConsumer EventConsumer { get; set; }
Task OnEventAsync(IEventSubscription subscription, StoredEvent @event);
Task OnErrorAsync(IEventSubscription subscription, Exception exception);
}
}

5
src/Squidex.Infrastructure/CQRS/Events/IEventSubscription.cs

@ -6,11 +6,12 @@
// All rights reserved.
// ==========================================================================
using Squidex.Infrastructure.Actors;
using System.Threading.Tasks;
namespace Squidex.Infrastructure.CQRS.Events
{
public interface IEventSubscription : IActor
public interface IEventSubscription
{
Task StopAsync();
}
}

4
src/Squidex/Controllers/Api/Assets/AssetContentController.cs

@ -119,6 +119,4 @@ namespace Squidex.Controllers.Api.Assets
FileOptions.SequentialScan);
}
}
}
#pragma warning restore 1573
}

21
src/Squidex/Controllers/Api/EventConsumers/EventConsumersController.cs

@ -49,14 +49,11 @@ namespace Squidex.Controllers.Api.EventConsumers
[HttpPut]
[Route("event-consumers/{name}/start")]
[ApiCosts(0)]
public async Task<IActionResult> Start(string name)
public IActionResult Start(string name)
{
var actor = actors.Get(name);
if (actor != null)
{
await actor.SendAsync(new StartConsumerMessage());
}
actor?.Tell(new StartConsumerMessage());
return NoContent();
}
@ -64,14 +61,11 @@ namespace Squidex.Controllers.Api.EventConsumers
[HttpPut]
[Route("event-consumers/{name}/stop")]
[ApiCosts(0)]
public async Task<IActionResult> Stop(string name)
public IActionResult Stop(string name)
{
var actor = actors.Get(name);
if (actor != null)
{
await actor.SendAsync(new StopConsumerMessage());
}
actor?.Tell(new StopConsumerMessage());
return NoContent();
}
@ -79,14 +73,11 @@ namespace Squidex.Controllers.Api.EventConsumers
[HttpPut]
[Route("event-consumers/{name}/reset")]
[ApiCosts(0)]
public async Task<IActionResult> Reset(string name)
public IActionResult Reset(string name)
{
var actor = actors.Get(name);
if (actor != null)
{
await actor.SendAsync(new ResetConsumerMessage());
}
actor?.Tell(new ResetConsumerMessage());
return NoContent();
}

9
src/Squidex/Pipeline/CommandMiddlewares/EnrichWithActorCommandMiddleware.cs

@ -34,14 +34,7 @@ namespace Squidex.Pipeline.CommandMiddlewares
FindActorFromSubject() ??
FindActorFromClient();
#pragma warning disable
if (actorToken == null)
{
throw new SecurityException("No actor with subject or client id available");
}
#pragma warning enable
squidexCommand.Actor = actorToken;
squidexCommand.Actor = actorToken ?? throw new SecurityException("No actor with subject or client id available");
}
return next();

36
tests/Squidex.Infrastructure.Tests/Actors/ActorRemoteTests.cs

@ -6,7 +6,6 @@
// All rights reserved.
// ==========================================================================
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using FluentAssertions;
@ -18,16 +17,21 @@ namespace Squidex.Infrastructure.Actors
public class ActorRemoteTests
{
[TypeName(nameof(SuccessMessage))]
public class SuccessMessage : IMessage
public class SuccessMessage : object
{
public int Counter { get; set; }
}
private sealed class MyActor : Actor
private sealed class MyActor : Actor, IActor
{
public List<IMessage> Invokes { get; } = new List<IMessage>();
public List<object> Invokes { get; } = new List<object>();
protected override Task OnMessage(IMessage message)
public void Tell(object message)
{
DispatchAsync(message).Forget();
}
protected override Task OnMessage(object message)
{
Invokes.Add(message);
@ -51,25 +55,13 @@ namespace Squidex.Infrastructure.Actors
}
[Fact]
public void Should_throw_exception_when_stopping_remote_actor()
{
Assert.Throws<NotSupportedException>(() => remoteActor.StopAsync().Forget());
}
[Fact]
public void Should_throw_exception_when_sending_exception_to_remote_actor()
{
Assert.Throws<NotSupportedException>(() => remoteActor.SendAsync(new InvalidOperationException()).Forget());
}
[Fact]
public async Task Should_handle_messages_sequentially()
public void Should_handle_messages_sequentially()
{
remoteActor.SendAsync(new SuccessMessage { Counter = 1 }).Forget();
remoteActor.SendAsync(new SuccessMessage { Counter = 2 }).Forget();
remoteActor.SendAsync(new SuccessMessage { Counter = 3 }).Forget();
remoteActor.Tell(new SuccessMessage { Counter = 1 });
remoteActor.Tell(new SuccessMessage { Counter = 2 });
remoteActor.Tell(new SuccessMessage { Counter = 3 });
await actor.StopAsync();
actor.Dispose();
actor.Invokes.ShouldBeEquivalentTo(new List<object>
{

52
tests/Squidex.Infrastructure.Tests/Actors/ActorTests.cs

@ -18,19 +18,34 @@ namespace Squidex.Infrastructure.Actors
{
public class ActorTests
{
public class SuccessMessage : IMessage
public class SuccessMessage
{
public int Counter { get; set; }
}
public class FailedMessage : IMessage
public class FailedMessage
{
}
private sealed class MyActor : Actor
private sealed class MyActor : Actor, IActor
{
public List<object> Invokes { get; } = new List<object>();
public void Tell(Exception exception)
{
FailAsync(exception).Forget();
}
public void Tell(object message)
{
DispatchAsync(message).Forget();
}
public Task StopAsync()
{
return StopAndWaitAsync();
}
protected override Task OnStop()
{
Invokes.Add(true);
@ -45,7 +60,7 @@ namespace Squidex.Infrastructure.Actors
return TaskHelper.Done;
}
protected override Task OnMessage(IMessage message)
protected override Task OnMessage(object message)
{
if (message is FailedMessage)
{
@ -63,7 +78,7 @@ namespace Squidex.Infrastructure.Actors
[Fact]
public async Task Should_invoke_with_exception()
{
sut.SendAsync(new InvalidOperationException()).Forget();
sut.Tell(new InvalidOperationException());
await sut.StopAsync();
@ -73,9 +88,9 @@ namespace Squidex.Infrastructure.Actors
[Fact]
public async Task Should_handle_messages_sequentially()
{
sut.SendAsync(new SuccessMessage { Counter = 1 }).Forget();
sut.SendAsync(new SuccessMessage { Counter = 2 }).Forget();
sut.SendAsync(new SuccessMessage { Counter = 3 }).Forget();
sut.Tell(new SuccessMessage { Counter = 1 });
sut.Tell(new SuccessMessage { Counter = 2 });
sut.Tell(new SuccessMessage { Counter = 3 });
await sut.StopAsync();
@ -91,9 +106,9 @@ namespace Squidex.Infrastructure.Actors
[Fact]
public async Task Should_raise_error_event_when_event_handling_failed()
{
sut.SendAsync(new FailedMessage()).Forget();
sut.SendAsync(new SuccessMessage { Counter = 2 }).Forget();
sut.SendAsync(new SuccessMessage { Counter = 3 }).Forget();
sut.Tell(new FailedMessage());
sut.Tell(new SuccessMessage { Counter = 2 });
sut.Tell(new SuccessMessage { Counter = 3 });
await sut.StopAsync();
@ -110,13 +125,14 @@ namespace Squidex.Infrastructure.Actors
[Fact]
public async Task Should_not_handle_messages_after_stop()
{
sut.SendAsync(new SuccessMessage { Counter = 1 }).Forget();
sut.Tell(new SuccessMessage { Counter = 1 });
await sut.StopAsync();
sut.SendAsync(new SuccessMessage { Counter = 2 }).Forget();
sut.SendAsync(new SuccessMessage { Counter = 3 }).Forget();
sut.SendAsync(new InvalidOperationException()).Forget();
sut.Tell(new SuccessMessage { Counter = 2 });
sut.Tell(new SuccessMessage { Counter = 3 });
sut.Tell(new InvalidOperationException());
sut.Invokes.ShouldBeEquivalentTo(new List<object>
{
@ -126,14 +142,12 @@ namespace Squidex.Infrastructure.Actors
}
[Fact]
public async Task Should_call_stop_on_dispose()
public void Should_call_stop_on_dispose()
{
sut.SendAsync(new SuccessMessage { Counter = 1 }).Forget();
sut.Tell(new SuccessMessage { Counter = 1 });
sut.Dispose();
await sut.StopAsync();
sut.Invokes.ShouldBeEquivalentTo(new List<object>
{
new SuccessMessage { Counter = 1 },

55
tests/Squidex.Infrastructure.Tests/CQRS/Events/Actors/EventConsumerActorTests.cs

@ -9,6 +9,7 @@
using System;
using System.Threading.Tasks;
using FakeItEasy;
using Squidex.Infrastructure.Actors;
using Squidex.Infrastructure.CQRS.Events.Actors.Messages;
using Squidex.Infrastructure.Log;
using Xunit;
@ -24,10 +25,13 @@ namespace Squidex.Infrastructure.CQRS.Events.Actors
private sealed class MyEventConsumerInfo : IEventConsumerInfo
{
public bool IsStopped { get; set; }
public bool IsResetting { get; set; }
public string Name { get; set; }
public string Error { get; set; }
public string Position { get; set; }
}
@ -36,6 +40,8 @@ namespace Squidex.Infrastructure.CQRS.Events.Actors
private readonly IEventStore eventStore = A.Fake<IEventStore>();
private readonly IEventSubscription eventSubscription = A.Fake<IEventSubscription>();
private readonly ISemanticLog log = A.Fake<ISemanticLog>();
private readonly IActor sutActor;
private readonly IEventSubscriber sutSubscriber;
private readonly EventDataFormatter formatter = A.Fake<EventDataFormatter>();
private readonly EventData eventData = new EventData();
private readonly Envelope<IEvent> envelope = new Envelope<IEvent>(new MyEvent());
@ -48,7 +54,7 @@ namespace Squidex.Infrastructure.CQRS.Events.Actors
consumerInfo.Position = Guid.NewGuid().ToString();
consumerName = eventConsumer.GetType().Name;
A.CallTo(() => eventStore.CreateSubscription()).Returns(eventSubscription);
A.CallTo(() => eventStore.CreateSubscription(A<IEventSubscriber>.Ignored, A<string>.Ignored, A<string>.Ignored)).Returns(eventSubscription);
A.CallTo(() => eventConsumer.Name).Returns(consumerName);
A.CallTo(() => eventConsumerInfoRepository.FindAsync(consumerName)).Returns(consumerInfo);
@ -56,6 +62,9 @@ namespace Squidex.Infrastructure.CQRS.Events.Actors
A.CallTo(() => formatter.Parse(eventData, true)).Returns(envelope);
sut = new EventConsumerActor(formatter, eventStore, eventConsumerInfoRepository, log);
sutActor = sut;
sutSubscriber = sut;
}
[Fact]
@ -63,16 +72,13 @@ namespace Squidex.Infrastructure.CQRS.Events.Actors
{
await SubscribeAsync();
await sut.StopAsync();
sut.Dispose();
A.CallTo(() => eventConsumerInfoRepository.CreateAsync(consumerName))
.MustHaveHappened();
A.CallTo(() => eventConsumerInfoRepository.StartAsync(consumerName))
.MustHaveHappened();
A.CallTo(() => eventSubscription.SendAsync(A<SubscribeMessage>.That.Matches(s => s.Parent == sut && s.Position == consumerInfo.Position)))
.MustHaveHappened();
}
[Fact]
@ -80,8 +86,9 @@ namespace Squidex.Infrastructure.CQRS.Events.Actors
{
await SubscribeAsync();
await sut.SendAsync(new StopConsumerMessage());
await sut.StopAsync();
sutActor.Tell(new StopConsumerMessage());
sut.Dispose();
A.CallTo(() => eventConsumerInfoRepository.CreateAsync(consumerName))
.MustHaveHappened();
@ -94,9 +101,6 @@ namespace Squidex.Infrastructure.CQRS.Events.Actors
A.CallTo(() => eventSubscription.StopAsync())
.MustHaveHappened();
A.CallTo(() => eventSubscription.SendAsync(A<SubscribeMessage>.That.Matches(s => s.Parent == sut && s.Position == consumerInfo.Position)))
.MustHaveHappened();
}
[Fact]
@ -104,8 +108,8 @@ namespace Squidex.Infrastructure.CQRS.Events.Actors
{
await SubscribeAsync();
await sut.SendAsync(new ResetConsumerMessage());
await sut.StopAsync();
sutActor.Tell(new ResetConsumerMessage());
sut.Dispose();
A.CallTo(() => eventConsumerInfoRepository.CreateAsync(consumerName))
.MustHaveHappened();
@ -122,9 +126,6 @@ namespace Squidex.Infrastructure.CQRS.Events.Actors
A.CallTo(() => eventConsumer.ClearAsync())
.MustHaveHappened();
A.CallTo(() => eventSubscription.SendAsync(A<SubscribeMessage>.That.Matches(s => s.Parent == sut && s.Position == consumerInfo.Position)))
.MustHaveHappened(Repeated.Exactly.Twice);
A.CallTo(() => eventSubscription.StopAsync())
.MustHaveHappened();
}
@ -136,8 +137,9 @@ namespace Squidex.Infrastructure.CQRS.Events.Actors
await SubscribeAsync();
await sut.SendAsync(new ReceiveEventMessage { Event = @event, Source = eventSubscription });
await sut.StopAsync();
await sutSubscriber.OnEventAsync(eventSubscription, @event);
sut.Dispose();
A.CallTo(() => eventConsumer.On(envelope))
.MustHaveHappened();
@ -153,8 +155,9 @@ namespace Squidex.Infrastructure.CQRS.Events.Actors
await SubscribeAsync();
await sut.SendAsync(new ReceiveEventMessage { Event = @event });
await sut.StopAsync();
await sutSubscriber.OnEventAsync(A.Fake<IEventSubscription>(), @event);
sut.Dispose();
A.CallTo(() => eventConsumer.On(envelope))
.MustNotHaveHappened();
@ -175,8 +178,8 @@ namespace Squidex.Infrastructure.CQRS.Events.Actors
await SubscribeAsync();
await sut.SendAsync(new ResetConsumerMessage());
await sut.StopAsync();
sutActor.Tell(new ResetConsumerMessage());
sut.Dispose();
A.CallTo(() => eventConsumerInfoRepository.StopAsync(consumerName, exception.ToString()))
.MustHaveHappened();
@ -194,8 +197,9 @@ namespace Squidex.Infrastructure.CQRS.Events.Actors
await SubscribeAsync();
await sut.SendAsync(new ReceiveEventMessage { Event = @event, Source = eventSubscription });
await sut.StopAsync();
await sutSubscriber.OnEventAsync(eventSubscription, @event);
sut.Dispose();
A.CallTo(() => eventConsumer.On(envelope))
.MustHaveHappened();
@ -219,8 +223,9 @@ namespace Squidex.Infrastructure.CQRS.Events.Actors
await SubscribeAsync();
await sut.SendAsync(new ReceiveEventMessage { Event = @event, Source = eventSubscription });
await sut.StopAsync();
await sutSubscriber.OnEventAsync(eventSubscription, @event);
sut.Dispose();
A.CallTo(() => eventConsumer.On(envelope))
.MustNotHaveHappened();

Loading…
Cancel
Save