From 83e1130f5c5a021e8ac7d7882f3a4e7ba9e0375d Mon Sep 17 00:00:00 2001 From: Sebastian Stehle Date: Thu, 23 Nov 2017 23:01:32 +0100 Subject: [PATCH] Back to actors. --- .../RedisPubSub.cs | 19 +- .../RedisSubscription.cs | 52 ++- .../EventConsumerActor.cs} | 265 ++++++------ .../Actors/EventConsumerActorManager.cs | 91 ++++ .../CQRS/Events/Actors/EventConsumerState.cs | 52 +++ .../Actors/Messages/GetStatesRequest.cs} | 9 +- .../Actors/Messages/GetStatesResponse.cs} | 11 +- .../Actors/Messages/ResetConsumerMessage.cs | 15 + .../Actors/Messages/StartConsumerMessage.cs | 15 + .../Actors/Messages/StopConsumerMessage.cs | 15 + .../CQRS/Events/EventStoreSubscription.cs | 4 - .../CQRS/Events/IEventSubscriber.cs | 2 - .../Orleans/Grains/EventConsumerBootstrap.cs | 31 -- .../Orleans/Grains/IEventConsumerGrain.cs | 34 -- .../Grains/IEventConsumerRegistryGrain.cs | 28 -- .../Implementation/EventConsumerGrainState.cs | 52 --- .../EventConsumerRegistryGrain.cs | 98 ----- .../Implementation/WrapperSubscription.cs | 48 --- .../Events/Orleans/OrleansEventNotifier.cs | 30 -- .../CQRS/Events/RetrySubscription.cs | 15 - .../Caching/InvalidatingMemoryCache.cs | 23 +- .../Caching/InvalidationMessage.cs | 15 + src/Squidex.Infrastructure/IPubSub.cs | 4 +- src/Squidex.Infrastructure/InMemoryPubSub.cs | 12 +- src/Squidex.Infrastructure/Json/Orleans/J.cs | 57 --- .../Json/Orleans/JsonExternalSerializer.cs | 91 ---- src/Squidex.Infrastructure/Orleans/GrainV2.cs | 83 ---- .../PubSubExtensions.cs | 79 ++++ .../Squidex.Infrastructure.csproj | 3 - .../States/IStateFactory.cs | 17 + .../States/IStateHolder.cs | 21 + .../States/IStateStore.cs | 19 + .../States/InvalidateMessage.cs | 15 + .../States/StateFactory.cs | 108 +++++ .../States/StateHolder.cs | 44 ++ .../States/StatefulObject.cs | 65 +++ .../Events/Actors/EventConsumerActorTests.cs | 358 +++++++++++++++ .../Actors/EventConsumerManagerTests.cs | 125 ++++++ .../CQRS/Events/EventSubscriptionTests.cs | 16 - .../Grains/EventConsumerBootstrapTests.cs | 58 --- .../Events/Grains/EventConsumerGrainTests.cs | 408 ------------------ .../Grains/EventConsumerRegistryGrainTests.cs | 165 ------- .../Grains/OrleansEventNotifierTests.cs | 41 -- .../CQRS/Events/RetrySubscriptionTests.cs | 25 -- .../Caching/InvalidatingMemoryCacheTests.cs | 8 +- .../InMemoryPubSubTests.cs | 66 ++- .../Orleans/JsonExternalSerializerTests.cs | 108 ----- 47 files changed, 1303 insertions(+), 1617 deletions(-) rename src/Squidex.Infrastructure/CQRS/Events/{Orleans/Grains/Implementation/EventConsumerGrain.cs => Actors/EventConsumerActor.cs} (60%) create mode 100644 src/Squidex.Infrastructure/CQRS/Events/Actors/EventConsumerActorManager.cs create mode 100644 src/Squidex.Infrastructure/CQRS/Events/Actors/EventConsumerState.cs rename src/Squidex.Infrastructure/{Json/Orleans/IJsonValue.cs => CQRS/Events/Actors/Messages/GetStatesRequest.cs} (67%) rename src/Squidex.Infrastructure/{Json/Orleans/JExtensions.cs => CQRS/Events/Actors/Messages/GetStatesResponse.cs} (58%) create mode 100644 src/Squidex.Infrastructure/CQRS/Events/Actors/Messages/ResetConsumerMessage.cs create mode 100644 src/Squidex.Infrastructure/CQRS/Events/Actors/Messages/StartConsumerMessage.cs create mode 100644 src/Squidex.Infrastructure/CQRS/Events/Actors/Messages/StopConsumerMessage.cs delete mode 100644 src/Squidex.Infrastructure/CQRS/Events/Orleans/Grains/EventConsumerBootstrap.cs delete mode 100644 src/Squidex.Infrastructure/CQRS/Events/Orleans/Grains/IEventConsumerGrain.cs delete mode 100644 src/Squidex.Infrastructure/CQRS/Events/Orleans/Grains/IEventConsumerRegistryGrain.cs delete mode 100644 src/Squidex.Infrastructure/CQRS/Events/Orleans/Grains/Implementation/EventConsumerGrainState.cs delete mode 100644 src/Squidex.Infrastructure/CQRS/Events/Orleans/Grains/Implementation/EventConsumerRegistryGrain.cs delete mode 100644 src/Squidex.Infrastructure/CQRS/Events/Orleans/Grains/Implementation/WrapperSubscription.cs delete mode 100644 src/Squidex.Infrastructure/CQRS/Events/Orleans/OrleansEventNotifier.cs create mode 100644 src/Squidex.Infrastructure/Caching/InvalidationMessage.cs delete mode 100644 src/Squidex.Infrastructure/Json/Orleans/J.cs delete mode 100644 src/Squidex.Infrastructure/Json/Orleans/JsonExternalSerializer.cs delete mode 100644 src/Squidex.Infrastructure/Orleans/GrainV2.cs create mode 100644 src/Squidex.Infrastructure/PubSubExtensions.cs create mode 100644 src/Squidex.Infrastructure/States/IStateFactory.cs create mode 100644 src/Squidex.Infrastructure/States/IStateHolder.cs create mode 100644 src/Squidex.Infrastructure/States/IStateStore.cs create mode 100644 src/Squidex.Infrastructure/States/InvalidateMessage.cs create mode 100644 src/Squidex.Infrastructure/States/StateFactory.cs create mode 100644 src/Squidex.Infrastructure/States/StateHolder.cs create mode 100644 src/Squidex.Infrastructure/States/StatefulObject.cs create mode 100644 tests/Squidex.Infrastructure.Tests/CQRS/Events/Actors/EventConsumerActorTests.cs create mode 100644 tests/Squidex.Infrastructure.Tests/CQRS/Events/Actors/EventConsumerManagerTests.cs delete mode 100644 tests/Squidex.Infrastructure.Tests/CQRS/Events/Grains/EventConsumerBootstrapTests.cs delete mode 100644 tests/Squidex.Infrastructure.Tests/CQRS/Events/Grains/EventConsumerGrainTests.cs delete mode 100644 tests/Squidex.Infrastructure.Tests/CQRS/Events/Grains/EventConsumerRegistryGrainTests.cs delete mode 100644 tests/Squidex.Infrastructure.Tests/CQRS/Events/Grains/OrleansEventNotifierTests.cs delete mode 100644 tests/Squidex.Infrastructure.Tests/Json/Orleans/JsonExternalSerializerTests.cs diff --git a/src/Squidex.Infrastructure.Redis/RedisPubSub.cs b/src/Squidex.Infrastructure.Redis/RedisPubSub.cs index 83a9929de..238b912d8 100644 --- a/src/Squidex.Infrastructure.Redis/RedisPubSub.cs +++ b/src/Squidex.Infrastructure.Redis/RedisPubSub.cs @@ -13,9 +13,9 @@ using StackExchange.Redis; namespace Squidex.Infrastructure { - public class RedisPubSub : IPubSub, IExternalSystem + public sealed class RedisPubSub : IPubSub, IExternalSystem { - private readonly ConcurrentDictionary subscriptions = new ConcurrentDictionary(); + private readonly ConcurrentDictionary subscriptions = new ConcurrentDictionary(); private readonly Lazy redisClient; private readonly Lazy redisSubscriber; private readonly ISemanticLog log; @@ -43,18 +43,21 @@ namespace Squidex.Infrastructure } } - public void Publish(string channelName, string token, bool notifySelf) + public void Publish(T value, bool notifySelf) { - Guard.NotNullOrEmpty(channelName, nameof(channelName)); + GetSubscriber().Publish(value, notifySelf); + } - subscriptions.GetOrAdd(channelName, c => new RedisSubscription(redisSubscriber.Value, c, log)).Publish(token, notifySelf); + public IDisposable Subscribe(Action handler) + { + return GetSubscriber().Subscribe(handler); } - public IDisposable Subscribe(string channelName, Action handler) + private RedisSubscription GetSubscriber() { - Guard.NotNullOrEmpty(channelName, nameof(channelName)); + var typeName = typeof(T).FullName; - return subscriptions.GetOrAdd(channelName, c => new RedisSubscription(redisSubscriber.Value, c, log)).Subscribe(handler); + return (RedisSubscription)subscriptions.GetOrAdd(typeName, c => new RedisSubscription(redisSubscriber.Value, c, log)); } } } diff --git a/src/Squidex.Infrastructure.Redis/RedisSubscription.cs b/src/Squidex.Infrastructure.Redis/RedisSubscription.cs index 6ec078f70..9fbbacad1 100644 --- a/src/Squidex.Infrastructure.Redis/RedisSubscription.cs +++ b/src/Squidex.Infrastructure.Redis/RedisSubscription.cs @@ -7,49 +7,58 @@ // ========================================================================== using System; -using System.Linq; using System.Reactive.Subjects; +using Newtonsoft.Json; using Squidex.Infrastructure.Log; using StackExchange.Redis; +#pragma warning disable SA1401 // Fields must be private + namespace Squidex.Infrastructure { - internal sealed class RedisSubscription + internal sealed class RedisSubscription { - private static readonly Guid InstanceId = Guid.NewGuid(); - private readonly Subject subject = new Subject(); + private readonly Guid selfId = Guid.NewGuid(); + private readonly Subject subject = new Subject(); private readonly ISubscriber subscriber; - private readonly string channelName; private readonly ISemanticLog log; + private readonly string channelName; + + private sealed class Envelope + { + public T Payload; + + public Guid Sender; + } public RedisSubscription(ISubscriber subscriber, string channelName, ISemanticLog log) { this.log = log; this.subscriber = subscriber; - this.subscriber.Subscribe(channelName, (channel, value) => HandleInvalidation(value)); + this.subscriber.Subscribe(channelName, (channel, value) => HandleMessage(value)); this.channelName = channelName; } - public void Publish(string token, bool notifySelf) + public void Publish(object value, bool notifySelf) { try { - var message = string.Join("#", (notifySelf ? Guid.Empty : InstanceId).ToString(), token); + var envelope = JsonConvert.SerializeObject(new Envelope { Sender = selfId, Payload = (T)value }); - subscriber.Publish(channelName, message); + subscriber.Publish(channelName, envelope); } catch (Exception ex) { log.LogError(ex, w => w .WriteProperty("action", "PublishRedisMessage") .WriteProperty("state", "Failed") - .WriteProperty("token", token)); + .WriteProperty("channel", channelName)); } } - private void HandleInvalidation(string value) + private void HandleMessage(string value) { try { @@ -58,28 +67,15 @@ namespace Squidex.Infrastructure return; } - var parts = value.Split('#'); - - if (parts.Length < 1) - { - return; - } - - if (!Guid.TryParse(parts[0], out var sender)) - { - return; - } + var envelope = JsonConvert.DeserializeObject(value); - if (sender != InstanceId) + if (envelope.Sender != selfId) { - var token = string.Join("#", parts.Skip(1)); - - subject.OnNext(token); + subject.OnNext(envelope.Payload); log.LogDebug(w => w .WriteProperty("action", "ReceiveRedisMessage") .WriteProperty("channel", channelName) - .WriteProperty("token", token) .WriteProperty("state", "Received")); } } @@ -92,7 +88,7 @@ namespace Squidex.Infrastructure } } - public IDisposable Subscribe(Action handler) + public IDisposable Subscribe(Action handler) { return subject.Subscribe(handler); } diff --git a/src/Squidex.Infrastructure/CQRS/Events/Orleans/Grains/Implementation/EventConsumerGrain.cs b/src/Squidex.Infrastructure/CQRS/Events/Actors/EventConsumerActor.cs similarity index 60% rename from src/Squidex.Infrastructure/CQRS/Events/Orleans/Grains/Implementation/EventConsumerGrain.cs rename to src/Squidex.Infrastructure/CQRS/Events/Actors/EventConsumerActor.cs index 0d6570280..d1f58ff68 100644 --- a/src/Squidex.Infrastructure/CQRS/Events/Orleans/Grains/Implementation/EventConsumerGrain.cs +++ b/src/Squidex.Infrastructure/CQRS/Events/Actors/EventConsumerActor.cs @@ -1,5 +1,5 @@ // ========================================================================== -// EventConsumerGrain.cs +// EventConsumerActor.cs // Squidex Headless CMS // ========================================================================== // Copyright (c) Squidex Group @@ -8,83 +8,124 @@ using System; using System.Threading.Tasks; -using Orleans; -using Orleans.Concurrency; -using Orleans.Core; -using Orleans.Runtime; using Squidex.Infrastructure.Log; -using Squidex.Infrastructure.Orleans; +using Squidex.Infrastructure.States; using Squidex.Infrastructure.Tasks; -namespace Squidex.Infrastructure.CQRS.Events.Orleans.Grains.Implementation +namespace Squidex.Infrastructure.CQRS.Events.Actors { - public class EventConsumerGrain : GrainV2, IEventConsumerGrain + public class EventConsumerActor : StatefulObject, IEventSubscriber { - private readonly EventDataFormatter eventFormatter; - private readonly EventConsumerFactory eventConsumerFactory; + private readonly EventDataFormatter formatter; private readonly IEventStore eventStore; private readonly ISemanticLog log; - private TaskScheduler scheduler; + private readonly SingleThreadedDispatcher dispatcher = new SingleThreadedDispatcher(1); + private IEventSubscription currentSubscription; private IEventConsumer eventConsumer; - private IEventSubscription eventSubscription; - protected IEventStore EventStore + public EventConsumerActor( + EventDataFormatter formatter, + IEventStore eventStore, + ISemanticLog log) { - get { return eventStore; } + Guard.NotNull(log, nameof(log)); + Guard.NotNull(formatter, nameof(formatter)); + Guard.NotNull(eventStore, nameof(eventStore)); + + this.log = log; + + this.formatter = formatter; + this.eventStore = eventStore; } - public EventConsumerGrain( - EventDataFormatter eventFormatter, - EventConsumerFactory eventConsumerFactory, - IEventStore eventStore, - ISemanticLog log, - IGrainRuntime runtime) - : this(eventFormatter, eventConsumerFactory, eventStore, log, null, runtime, null) + protected override void DisposeObject(bool disposing) { + if (disposing) + { + dispatcher.StopAndWaitAsync().Wait(); + } } - protected EventConsumerGrain( - EventDataFormatter eventFormatter, - EventConsumerFactory eventConsumerFactory, - IEventStore eventStore, - ISemanticLog log, - IGrainIdentity identity, - IGrainRuntime runtime, - IStorage storage) - : base(identity, runtime, storage) + protected virtual IEventSubscription CreateSubscription(IEventStore eventStore, string streamFilter, string position) { - Guard.NotNull(log, nameof(log)); - Guard.NotNull(eventStore, nameof(eventStore)); - Guard.NotNull(eventFormatter, nameof(eventFormatter)); - Guard.NotNull(eventConsumerFactory, nameof(eventConsumerFactory)); + return new RetrySubscription(eventStore, this, streamFilter, position); + } - this.log = log; + public virtual EventConsumerInfo GetState() + { + return State.ToInfo(this.eventConsumer.Name); + } - this.eventStore = eventStore; - this.eventFormatter = eventFormatter; - this.eventConsumerFactory = eventConsumerFactory; + public virtual void Stop() + { + dispatcher.DispatchAsync(() => HandleStopAsync()).Forget(); } - public override Task OnActivateAsync() + public virtual void Start() { - scheduler = TaskScheduler.Current; + dispatcher.DispatchAsync(() => HandleStartAsync()).Forget(); + } - eventConsumer = eventConsumerFactory(this.GetPrimaryKeyString()); + public virtual void Reset() + { + dispatcher.DispatchAsync(() => HandleResetAsync()).Forget(); + } + + public virtual void Activate(IEventConsumer eventConsumer) + { + Guard.NotNull(eventConsumer, nameof(eventConsumer)); - return TaskHelper.Done; + dispatcher.DispatchAsync(() => HandleSetupAsync(eventConsumer)).Forget(); } - public Task ActivateAsync() + private async Task HandleSetupAsync(IEventConsumer consumer) { + eventConsumer = consumer; + + await ReadStateAsync(); + if (!State.IsStopped) { Subscribe(State.Position); } + } + + private Task HandleEventAsync(IEventSubscription subscription, StoredEvent storedEvent) + { + if (subscription != currentSubscription) + { + return TaskHelper.Done; + } - return TaskHelper.Done; + return DoAndUpdateStateAsync(async () => + { + var @event = ParseKnownEvent(storedEvent); + + if (@event != null) + { + await DispatchConsumerAsync(@event); + } + + State = State.Handled(storedEvent.EventPosition); + }); + } + + private Task HandleErrorAsync(IEventSubscription subscription, Exception exception) + { + if (subscription != currentSubscription) + { + return TaskHelper.Done; + } + + return DoAndUpdateStateAsync(() => + { + Unsubscribe(); + + State = State.Failed(exception); + }); } - public Task StartAsync() + private Task HandleStartAsync() { if (!State.IsStopped) { @@ -99,7 +140,7 @@ namespace Squidex.Infrastructure.CQRS.Events.Orleans.Grains.Implementation }); } - public Task StopAsync() + private Task HandleStopAsync() { if (State.IsStopped) { @@ -114,7 +155,7 @@ namespace Squidex.Infrastructure.CQRS.Events.Orleans.Grains.Implementation }); } - public Task ResetAsync() + private Task HandleResetAsync() { return DoAndUpdateStateAsync(async () => { @@ -124,61 +165,51 @@ namespace Squidex.Infrastructure.CQRS.Events.Orleans.Grains.Implementation Subscribe(null); - State = EventConsumerGrainState.Initial(); + State = State.Reset(); }); } - public Task OnEventAsync(Immutable subscription, Immutable storedEvent) + Task IEventSubscriber.OnEventAsync(IEventSubscription subscription, StoredEvent storedEvent) { - if (subscription.Value != eventSubscription) - { - return TaskHelper.Done; - } - - return DoAndUpdateStateAsync(async () => - { - var @event = ParseKnownEvent(storedEvent.Value); + return dispatcher.DispatchAsync(() => HandleEventAsync(subscription, storedEvent)); + } - if (@event != null) - { - await DispatchConsumerAsync(@event); - } + Task IEventSubscriber.OnErrorAsync(IEventSubscription subscription, Exception exception) + { + return dispatcher.DispatchAsync(() => HandleErrorAsync(subscription, exception)); + } - State = EventConsumerGrainState.Handled(storedEvent.Value.EventPosition); - }); + private Task DoAndUpdateStateAsync(Action action) + { + return DoAndUpdateStateAsync(() => { action(); return TaskHelper.Done; }); } - public Task OnErrorAsync(Immutable subscription, Immutable exception) + private async Task DoAndUpdateStateAsync(Func action) { - if (subscription.Value != eventSubscription) + try { - return TaskHelper.Done; + await action(); } - - return DoAndUpdateStateAsync(() => + catch (Exception ex) { - Unsubscribe(); + try + { + Unsubscribe(); + } + catch (Exception unsubscribeException) + { + ex = new AggregateException(ex, unsubscribeException); + } - State = State.Failed(exception.Value); - }); - } + log.LogFatal(ex, w => w + .WriteProperty("action", "HandleEvent") + .WriteProperty("state", "Failed") + .WriteProperty("eventConsumer", eventConsumer.Name)); - public Task OnClosedAsync(Immutable subscription) - { - if (subscription.Value != eventSubscription) - { - return TaskHelper.Done; + State = State.Failed(ex); } - return DoAndUpdateStateAsync(() => - { - Unsubscribe(); - }); - } - - public Task> GetStateAsync() - { - return Task.FromResult(new Immutable(State.ToInfo(this.GetPrimaryKeyString()))); + await WriteStateAsync(); } private async Task ClearAsync() @@ -228,19 +259,19 @@ namespace Squidex.Infrastructure.CQRS.Events.Orleans.Grains.Implementation private void Unsubscribe() { - if (eventSubscription != null) + if (currentSubscription != null) { - eventSubscription.StopAsync().Forget(); - eventSubscription = null; + currentSubscription.StopAsync().Forget(); + currentSubscription = null; } } private void Subscribe(string position) { - if (eventSubscription == null) + if (currentSubscription == null) { - eventSubscription?.StopAsync().Forget(); - eventSubscription = CreateSubscription(eventConsumer.EventsFilter, position); + currentSubscription?.StopAsync().Forget(); + currentSubscription = CreateSubscription(eventStore, eventConsumer.EventsFilter, position); } } @@ -248,7 +279,7 @@ namespace Squidex.Infrastructure.CQRS.Events.Orleans.Grains.Implementation { try { - var @event = eventFormatter.Parse(message.Data); + var @event = formatter.Parse(message.Data); @event.SetEventPosition(message.EventPosition); @event.SetEventStreamNumber(message.EventStreamNumber); @@ -262,53 +293,5 @@ namespace Squidex.Infrastructure.CQRS.Events.Orleans.Grains.Implementation return null; } } - - protected virtual IEventConsumerGrain GetSelf() - { - return this.AsReference(); - } - - protected virtual IEventSubscription CreateSubscription(IEventSubscriber subscriber, string streamFilter, string position) - { - return new RetrySubscription(EventStore, subscriber, streamFilter, position); - } - - private IEventSubscription CreateSubscription(string streamFilter, string position) - { - return CreateSubscription(new WrapperSubscription(GetSelf(), scheduler), streamFilter, position); - } - - private Task DoAndUpdateStateAsync(Action action) - { - return DoAndUpdateStateAsync(() => { action(); return TaskHelper.Done; }); - } - - private async Task DoAndUpdateStateAsync(Func action) - { - try - { - await action(); - } - catch (Exception ex) - { - try - { - Unsubscribe(); - } - catch (Exception unsubscribeException) - { - ex = new AggregateException(ex, unsubscribeException); - } - - log.LogFatal(ex, w => w - .WriteProperty("action", "HandleEvent") - .WriteProperty("state", "Failed") - .WriteProperty("eventConsumer", eventConsumer.Name)); - - State = State.Failed(ex); - } - - await WriteStateAsync(); - } } } \ No newline at end of file diff --git a/src/Squidex.Infrastructure/CQRS/Events/Actors/EventConsumerActorManager.cs b/src/Squidex.Infrastructure/CQRS/Events/Actors/EventConsumerActorManager.cs new file mode 100644 index 000000000..9045748f0 --- /dev/null +++ b/src/Squidex.Infrastructure/CQRS/Events/Actors/EventConsumerActorManager.cs @@ -0,0 +1,91 @@ +// ========================================================================== +// EventConsumerActorManager.cs +// Squidex Headless CMS +// ========================================================================== +// Copyright (c) Squidex Group +// All rights reserved. +// ========================================================================== + +using System; +using System.Collections.Generic; +using System.Linq; +using System.Threading.Tasks; +using Squidex.Infrastructure.CQRS.Events.Actors.Messages; +using Squidex.Infrastructure.States; + +namespace Squidex.Infrastructure.CQRS.Events.Actors +{ + public sealed class EventConsumerActorManager : DisposableObjectBase, IExternalSystem + { + private readonly IStateFactory factory; + private readonly IPubSub pubSub; + private readonly List consumers; + private readonly List subscriptions = new List(); + + public EventConsumerActorManager(IEnumerable consumers, IPubSub pubSub, IStateFactory factory) + { + Guard.NotNull(pubSub, nameof(pubSub)); + Guard.NotNull(factory, nameof(factory)); + Guard.NotNull(consumers, nameof(consumers)); + + this.pubSub = pubSub; + this.factory = factory; + this.consumers = consumers.ToList(); + } + + public void Connect() + { + var actors = new Dictionary(); + + foreach (var consumer in consumers) + { + var actor = factory.GetAsync(consumer.Name).Result; + + actors[consumer.Name] = actor; + actor.Activate(consumer); + } + + subscriptions.Add(pubSub.Subscribe(m => + { + if (actors.TryGetValue(m.ConsumerName, out var actor)) + { + actor.Start(); + } + })); + + subscriptions.Add(pubSub.Subscribe(m => + { + if (actors.TryGetValue(m.ConsumerName, out var actor)) + { + actor.Stop(); + } + })); + + subscriptions.Add(pubSub.Subscribe(m => + { + if (actors.TryGetValue(m.ConsumerName, out var actor)) + { + actor.Reset(); + } + })); + + subscriptions.Add(pubSub.ReceiveAsync(request => + { + var states = actors.Values.Select(x => x.GetState()).ToArray(); + + return Task.FromResult(new GetStatesResponse { States = states }); + })); + } + + protected override void DisposeObject(bool disposing) + { + if (disposing) + { + foreach (var subscription in subscriptions) + { + subscription.Dispose(); + } + } + } + } +} diff --git a/src/Squidex.Infrastructure/CQRS/Events/Actors/EventConsumerState.cs b/src/Squidex.Infrastructure/CQRS/Events/Actors/EventConsumerState.cs new file mode 100644 index 000000000..9e456e10b --- /dev/null +++ b/src/Squidex.Infrastructure/CQRS/Events/Actors/EventConsumerState.cs @@ -0,0 +1,52 @@ +// ========================================================================== +// EventConsumerGrainState.cs +// Squidex Headless CMS +// ========================================================================== +// Copyright (c) Squidex Group +// All rights reserved. +// ========================================================================== + +using System; +using Squidex.Infrastructure.Reflection; + +namespace Squidex.Infrastructure.CQRS.Events.Actors +{ + public sealed class EventConsumerState + { + public bool IsStopped { get; set; } + + public string Error { get; set; } + + public string Position { get; set; } + + public EventConsumerState Reset() + { + return new EventConsumerState(); + } + + public EventConsumerState Handled(string position) + { + return new EventConsumerState { Position = position }; + } + + public EventConsumerState Failed(Exception ex) + { + return new EventConsumerState { Position = Position, IsStopped = true, Error = ex?.ToString() }; + } + + public EventConsumerState Stopped() + { + return new EventConsumerState { Position = Position, IsStopped = true }; + } + + public EventConsumerState Started() + { + return new EventConsumerState { Position = Position, IsStopped = false }; + } + + public EventConsumerInfo ToInfo(string name) + { + return SimpleMapper.Map(this, new EventConsumerInfo { Name = name }); + } + } +} diff --git a/src/Squidex.Infrastructure/Json/Orleans/IJsonValue.cs b/src/Squidex.Infrastructure/CQRS/Events/Actors/Messages/GetStatesRequest.cs similarity index 67% rename from src/Squidex.Infrastructure/Json/Orleans/IJsonValue.cs rename to src/Squidex.Infrastructure/CQRS/Events/Actors/Messages/GetStatesRequest.cs index f2d8ab41f..9e67dbcb7 100644 --- a/src/Squidex.Infrastructure/Json/Orleans/IJsonValue.cs +++ b/src/Squidex.Infrastructure/CQRS/Events/Actors/Messages/GetStatesRequest.cs @@ -1,17 +1,14 @@ // ========================================================================== -// IJsonValue.cs +// GetStatesRequest.cs // Squidex Headless CMS // ========================================================================== // Copyright (c) Squidex Group // All rights reserved. // ========================================================================== -namespace Squidex.Infrastructure.Json.Orleans +namespace Squidex.Infrastructure.CQRS.Events.Actors.Messages { - public interface IJsonValue + public sealed class GetStatesRequest { - object Value { get; } - - bool IsImmutable { get; } } } diff --git a/src/Squidex.Infrastructure/Json/Orleans/JExtensions.cs b/src/Squidex.Infrastructure/CQRS/Events/Actors/Messages/GetStatesResponse.cs similarity index 58% rename from src/Squidex.Infrastructure/Json/Orleans/JExtensions.cs rename to src/Squidex.Infrastructure/CQRS/Events/Actors/Messages/GetStatesResponse.cs index daf6fcb52..95d23de57 100644 --- a/src/Squidex.Infrastructure/Json/Orleans/JExtensions.cs +++ b/src/Squidex.Infrastructure/CQRS/Events/Actors/Messages/GetStatesResponse.cs @@ -1,18 +1,15 @@ // ========================================================================== -// JExtensions.cs +// GetStatesResponse.cs // Squidex Headless CMS // ========================================================================== // Copyright (c) Squidex Group // All rights reserved. // ========================================================================== -namespace Squidex.Infrastructure.Json.Orleans +namespace Squidex.Infrastructure.CQRS.Events.Actors.Messages { - public static class JExtensions + public sealed class GetStatesResponse { - public static J AsJ(this T value, bool immutable = true) - { - return new J(value, immutable); - } + public EventConsumerInfo[] States { get; set; } } } diff --git a/src/Squidex.Infrastructure/CQRS/Events/Actors/Messages/ResetConsumerMessage.cs b/src/Squidex.Infrastructure/CQRS/Events/Actors/Messages/ResetConsumerMessage.cs new file mode 100644 index 000000000..f2e09aca2 --- /dev/null +++ b/src/Squidex.Infrastructure/CQRS/Events/Actors/Messages/ResetConsumerMessage.cs @@ -0,0 +1,15 @@ +// ========================================================================== +// ResetConsumerMessage.cs +// Squidex Headless CMS +// ========================================================================== +// Copyright (c) Squidex Group +// All rights reserved. +// ========================================================================== + +namespace Squidex.Infrastructure.CQRS.Events.Actors.Messages +{ + public sealed class ResetConsumerMessage + { + public string ConsumerName { get; set; } + } +} diff --git a/src/Squidex.Infrastructure/CQRS/Events/Actors/Messages/StartConsumerMessage.cs b/src/Squidex.Infrastructure/CQRS/Events/Actors/Messages/StartConsumerMessage.cs new file mode 100644 index 000000000..29fa49e1b --- /dev/null +++ b/src/Squidex.Infrastructure/CQRS/Events/Actors/Messages/StartConsumerMessage.cs @@ -0,0 +1,15 @@ +// ========================================================================== +// StartConsumerMessage.cs +// Squidex Headless CMS +// ========================================================================== +// Copyright (c) Squidex Group +// All rights reserved. +// ========================================================================== + +namespace Squidex.Infrastructure.CQRS.Events.Actors.Messages +{ + public sealed class StartConsumerMessage + { + public string ConsumerName { get; set; } + } +} diff --git a/src/Squidex.Infrastructure/CQRS/Events/Actors/Messages/StopConsumerMessage.cs b/src/Squidex.Infrastructure/CQRS/Events/Actors/Messages/StopConsumerMessage.cs new file mode 100644 index 000000000..ea32b4b17 --- /dev/null +++ b/src/Squidex.Infrastructure/CQRS/Events/Actors/Messages/StopConsumerMessage.cs @@ -0,0 +1,15 @@ +// ========================================================================== +// StopConsumerMessage.cs +// Squidex Headless CMS +// ========================================================================== +// Copyright (c) Squidex Group +// All rights reserved. +// ========================================================================== + +namespace Squidex.Infrastructure.CQRS.Events.Actors.Messages +{ + public sealed class StopConsumerMessage + { + public string ConsumerName { get; set; } + } +} diff --git a/src/Squidex.Infrastructure/CQRS/Events/EventStoreSubscription.cs b/src/Squidex.Infrastructure/CQRS/Events/EventStoreSubscription.cs index 68a9960d5..22d11f041 100644 --- a/src/Squidex.Infrastructure/CQRS/Events/EventStoreSubscription.cs +++ b/src/Squidex.Infrastructure/CQRS/Events/EventStoreSubscription.cs @@ -51,10 +51,6 @@ namespace Squidex.Infrastructure.CQRS.Events await eventSubscriber.OnErrorAsync(this, ex); } } - finally - { - await eventSubscriber.OnClosedAsync(this); - } }); } diff --git a/src/Squidex.Infrastructure/CQRS/Events/IEventSubscriber.cs b/src/Squidex.Infrastructure/CQRS/Events/IEventSubscriber.cs index 7f7e68eb8..6957f83c1 100644 --- a/src/Squidex.Infrastructure/CQRS/Events/IEventSubscriber.cs +++ b/src/Squidex.Infrastructure/CQRS/Events/IEventSubscriber.cs @@ -16,7 +16,5 @@ namespace Squidex.Infrastructure.CQRS.Events Task OnEventAsync(IEventSubscription subscription, StoredEvent storedEvent); Task OnErrorAsync(IEventSubscription subscription, Exception exception); - - Task OnClosedAsync(IEventSubscription subscription); } } diff --git a/src/Squidex.Infrastructure/CQRS/Events/Orleans/Grains/EventConsumerBootstrap.cs b/src/Squidex.Infrastructure/CQRS/Events/Orleans/Grains/EventConsumerBootstrap.cs deleted file mode 100644 index 38e5952f9..000000000 --- a/src/Squidex.Infrastructure/CQRS/Events/Orleans/Grains/EventConsumerBootstrap.cs +++ /dev/null @@ -1,31 +0,0 @@ -// ========================================================================== -// EventConsumerBootstrap.cs -// Squidex Headless CMS -// ========================================================================== -// Copyright (c) Squidex Group -// All rights reserved. -// ========================================================================== - -using System.Threading.Tasks; -using Orleans.Providers; -using Squidex.Infrastructure.Tasks; - -namespace Squidex.Infrastructure.CQRS.Events.Orleans.Grains -{ - public sealed class EventConsumerBootstrap : IBootstrapProvider - { - public string Name { get; private set; } - - public Task Close() - { - return TaskHelper.Done; - } - - public Task Init(string name, IProviderRuntime providerRuntime, IProviderConfiguration config) - { - Name = name; - - return providerRuntime.GrainFactory.GetGrain("Default").ActivateAsync(null); - } - } -} diff --git a/src/Squidex.Infrastructure/CQRS/Events/Orleans/Grains/IEventConsumerGrain.cs b/src/Squidex.Infrastructure/CQRS/Events/Orleans/Grains/IEventConsumerGrain.cs deleted file mode 100644 index b554a2fc6..000000000 --- a/src/Squidex.Infrastructure/CQRS/Events/Orleans/Grains/IEventConsumerGrain.cs +++ /dev/null @@ -1,34 +0,0 @@ -// ========================================================================== -// IEventConsumerGrain.cs -// Squidex Headless CMS -// ========================================================================== -// Copyright (c) Squidex Group -// All rights reserved. -// ========================================================================== - -using System; -using System.Threading.Tasks; -using Orleans; -using Orleans.Concurrency; - -namespace Squidex.Infrastructure.CQRS.Events.Orleans.Grains -{ - public interface IEventConsumerGrain : IGrainWithStringKey - { - Task> GetStateAsync(); - - Task ActivateAsync(); - - Task StopAsync(); - - Task StartAsync(); - - Task ResetAsync(); - - Task OnEventAsync(Immutable subscription, Immutable storedEvent); - - Task OnErrorAsync(Immutable subscription, Immutable exception); - - Task OnClosedAsync(Immutable subscription); - } -} diff --git a/src/Squidex.Infrastructure/CQRS/Events/Orleans/Grains/IEventConsumerRegistryGrain.cs b/src/Squidex.Infrastructure/CQRS/Events/Orleans/Grains/IEventConsumerRegistryGrain.cs deleted file mode 100644 index 15cc35e12..000000000 --- a/src/Squidex.Infrastructure/CQRS/Events/Orleans/Grains/IEventConsumerRegistryGrain.cs +++ /dev/null @@ -1,28 +0,0 @@ -// ========================================================================== -// IEventConsumerRegistryGrain.cs -// Squidex Headless CMS -// ========================================================================== -// Copyright (c) Squidex Group -// All rights reserved. -// ========================================================================== - -using System.Collections.Generic; -using System.Threading.Tasks; -using Orleans; -using Orleans.Concurrency; - -namespace Squidex.Infrastructure.CQRS.Events.Orleans.Grains -{ - public interface IEventConsumerRegistryGrain : IGrainWithStringKey - { - Task ActivateAsync(string streamName); - - Task StopAsync(string consumerName); - - Task StartAsync(string consumerName); - - Task ResetAsync(string consumerName); - - Task>> GetConsumersAsync(); - } -} diff --git a/src/Squidex.Infrastructure/CQRS/Events/Orleans/Grains/Implementation/EventConsumerGrainState.cs b/src/Squidex.Infrastructure/CQRS/Events/Orleans/Grains/Implementation/EventConsumerGrainState.cs deleted file mode 100644 index ee12b60b7..000000000 --- a/src/Squidex.Infrastructure/CQRS/Events/Orleans/Grains/Implementation/EventConsumerGrainState.cs +++ /dev/null @@ -1,52 +0,0 @@ -// ========================================================================== -// EventConsumerGrainState.cs -// Squidex Headless CMS -// ========================================================================== -// Copyright (c) Squidex Group -// All rights reserved. -// ========================================================================== - -using System; -using Squidex.Infrastructure.Reflection; - -namespace Squidex.Infrastructure.CQRS.Events.Orleans.Grains.Implementation -{ - public sealed class EventConsumerGrainState - { - public bool IsStopped { get; set; } - - public string Error { get; set; } - - public string Position { get; set; } - - public static EventConsumerGrainState Initial() - { - return new EventConsumerGrainState(); - } - - public static EventConsumerGrainState Handled(string position) - { - return new EventConsumerGrainState { Position = position }; - } - - public EventConsumerGrainState Failed(Exception ex) - { - return new EventConsumerGrainState { Position = Position, IsStopped = true, Error = ex?.ToString() }; - } - - public EventConsumerGrainState Stopped() - { - return new EventConsumerGrainState { Position = Position, IsStopped = true }; - } - - public EventConsumerGrainState Started() - { - return new EventConsumerGrainState { Position = Position, IsStopped = false }; - } - - public EventConsumerInfo ToInfo(string name) - { - return SimpleMapper.Map(this, new EventConsumerInfo { Name = name }); - } - } -} diff --git a/src/Squidex.Infrastructure/CQRS/Events/Orleans/Grains/Implementation/EventConsumerRegistryGrain.cs b/src/Squidex.Infrastructure/CQRS/Events/Orleans/Grains/Implementation/EventConsumerRegistryGrain.cs deleted file mode 100644 index cd9c3bcf1..000000000 --- a/src/Squidex.Infrastructure/CQRS/Events/Orleans/Grains/Implementation/EventConsumerRegistryGrain.cs +++ /dev/null @@ -1,98 +0,0 @@ -// ========================================================================== -// EventConsumerRegistryGrain.cs -// Squidex Headless CMS -// ========================================================================== -// Copyright (c) Squidex Group -// All rights reserved. -// ========================================================================== - -using System; -using System.Collections.Generic; -using System.Linq; -using System.Text.RegularExpressions; -using System.Threading.Tasks; -using Orleans; -using Orleans.Concurrency; -using Orleans.Core; -using Orleans.Runtime; - -namespace Squidex.Infrastructure.CQRS.Events.Orleans.Grains.Implementation -{ - public class EventConsumerRegistryGrain : Grain, IEventConsumerRegistryGrain, IRemindable - { - private readonly IEnumerable eventConsumers; - - public EventConsumerRegistryGrain(IEnumerable eventConsumers) - : this(eventConsumers, null, null) - { - } - - protected EventConsumerRegistryGrain( - IEnumerable eventConsumers, - IGrainIdentity identity, - IGrainRuntime runtime) - : base(identity, runtime) - { - Guard.NotNull(eventConsumers, nameof(eventConsumers)); - - this.eventConsumers = eventConsumers; - } - - public Task ReceiveReminder(string reminderName, TickStatus status) - { - return ActivateAsync(null); - } - - public override Task OnActivateAsync() - { - DelayDeactivation(TimeSpan.FromDays(1)); - - RegisterOrUpdateReminder("Default", TimeSpan.Zero, TimeSpan.FromMinutes(10)); - RegisterTimer(x => ActivateAsync(null), null, TimeSpan.Zero, TimeSpan.FromSeconds(10)); - - return Task.FromResult(true); - } - - public Task ActivateAsync(string streamName) - { - var tasks = - eventConsumers - .Where(c => streamName == null || Regex.IsMatch(streamName, c.EventsFilter)) - .Select(c => GrainFactory.GetGrain(c.Name)) - .Select(c => c.ActivateAsync()); - - return Task.WhenAll(tasks); - } - - public Task>> GetConsumersAsync() - { - var tasks = - eventConsumers - .Select(c => GrainFactory.GetGrain(c.Name)) - .Select(c => c.GetStateAsync()); - - return Task.WhenAll(tasks).ContinueWith(x => new Immutable>(x.Result.Select(r => r.Value).ToList())); - } - - public Task ResetAsync(string consumerName) - { - var eventConsumer = GrainFactory.GetGrain(consumerName); - - return eventConsumer.ResetAsync(); - } - - public Task StartAsync(string consumerName) - { - var eventConsumer = GrainFactory.GetGrain(consumerName); - - return eventConsumer.StartAsync(); - } - - public Task StopAsync(string consumerName) - { - var eventConsumer = GrainFactory.GetGrain(consumerName); - - return eventConsumer.StopAsync(); - } - } -} diff --git a/src/Squidex.Infrastructure/CQRS/Events/Orleans/Grains/Implementation/WrapperSubscription.cs b/src/Squidex.Infrastructure/CQRS/Events/Orleans/Grains/Implementation/WrapperSubscription.cs deleted file mode 100644 index e1ebe67f8..000000000 --- a/src/Squidex.Infrastructure/CQRS/Events/Orleans/Grains/Implementation/WrapperSubscription.cs +++ /dev/null @@ -1,48 +0,0 @@ -// ========================================================================== -// WrapperSubscription.cs -// Squidex Headless CMS -// ========================================================================== -// Copyright (c) Squidex Group -// All rights reserved. -// ========================================================================== - -using System; -using System.Threading; -using System.Threading.Tasks; -using Orleans.Concurrency; - -namespace Squidex.Infrastructure.CQRS.Events.Orleans.Grains.Implementation -{ - internal sealed class WrapperSubscription : IEventSubscriber - { - private readonly IEventConsumerGrain grain; - private readonly TaskScheduler scheduler; - - public WrapperSubscription(IEventConsumerGrain grain, TaskScheduler scheduler) - { - this.grain = grain; - - this.scheduler = scheduler ?? TaskScheduler.Default; - } - - public Task OnEventAsync(IEventSubscription subscription, StoredEvent storedEvent) - { - return Dispatch(() => grain.OnEventAsync(subscription.AsImmutable(), storedEvent.AsImmutable())); - } - - public Task OnErrorAsync(IEventSubscription subscription, Exception exception) - { - return Dispatch(() => grain.OnErrorAsync(subscription.AsImmutable(), exception.AsImmutable())); - } - - public Task OnClosedAsync(IEventSubscription subscription) - { - return Dispatch(() => grain.OnClosedAsync(subscription.AsImmutable())); - } - - private Task Dispatch(Func task) - { - return Task.Factory.StartNew(() => task(), CancellationToken.None, TaskCreationOptions.None, scheduler).Unwrap(); - } - } -} diff --git a/src/Squidex.Infrastructure/CQRS/Events/Orleans/OrleansEventNotifier.cs b/src/Squidex.Infrastructure/CQRS/Events/Orleans/OrleansEventNotifier.cs deleted file mode 100644 index e2b2c1bd6..000000000 --- a/src/Squidex.Infrastructure/CQRS/Events/Orleans/OrleansEventNotifier.cs +++ /dev/null @@ -1,30 +0,0 @@ -// ========================================================================== -// OrleansEventNotifier.cs -// Squidex Headless CMS -// ========================================================================== -// Copyright (c) Squidex Group -// All rights reserved. -// ========================================================================== - -using Orleans; -using Squidex.Infrastructure.CQRS.Events.Orleans.Grains; - -namespace Squidex.Infrastructure.CQRS.Events.Orleans -{ - public sealed class OrleansEventNotifier : IEventNotifier - { - private readonly IEventConsumerRegistryGrain eventConsumerRegistryGrain; - - public OrleansEventNotifier(IGrainFactory factory) - { - Guard.NotNull(factory, nameof(factory)); - - eventConsumerRegistryGrain = factory.GetGrain("Default"); - } - - public void NotifyEventsStored(string streamName) - { - eventConsumerRegistryGrain.ActivateAsync(streamName); - } - } -} diff --git a/src/Squidex.Infrastructure/CQRS/Events/RetrySubscription.cs b/src/Squidex.Infrastructure/CQRS/Events/RetrySubscription.cs index f3aebd20e..e14ea7e43 100644 --- a/src/Squidex.Infrastructure/CQRS/Events/RetrySubscription.cs +++ b/src/Squidex.Infrastructure/CQRS/Events/RetrySubscription.cs @@ -65,16 +65,6 @@ namespace Squidex.Infrastructure.CQRS.Events } } - private async Task HandleClosedAsync(IEventSubscription subscription) - { - if (subscription == currentSubscription) - { - await eventSubscriber.OnClosedAsync(this); - - Unsubscribe(); - } - } - private async Task HandleErrorAsync(IEventSubscription subscription, Exception exception) { if (subscription == currentSubscription) @@ -105,11 +95,6 @@ namespace Squidex.Infrastructure.CQRS.Events return dispatcher.DispatchAsync(() => HandleErrorAsync(subscription, exception)); } - Task IEventSubscriber.OnClosedAsync(IEventSubscription subscription) - { - return dispatcher.DispatchAsync(() => HandleClosedAsync(subscription)); - } - public async Task StopAsync() { await dispatcher.DispatchAsync(() => Unsubscribe()); diff --git a/src/Squidex.Infrastructure/Caching/InvalidatingMemoryCache.cs b/src/Squidex.Infrastructure/Caching/InvalidatingMemoryCache.cs index 4ce50f1e7..adfe5e764 100644 --- a/src/Squidex.Infrastructure/Caching/InvalidatingMemoryCache.cs +++ b/src/Squidex.Infrastructure/Caching/InvalidatingMemoryCache.cs @@ -6,14 +6,15 @@ // All rights reserved. // ========================================================================== +using System; using Microsoft.Extensions.Caching.Memory; namespace Squidex.Infrastructure.Caching { - public class InvalidatingMemoryCache : IMemoryCache, IInvalidatingCache + public class InvalidatingMemoryCache : DisposableObjectBase, IMemoryCache, IInvalidatingCache { - private const string ChannelName = "CacheInvalidations"; private readonly IMemoryCache inner; + private readonly IDisposable subscription; private readonly IPubSub invalidator; public InvalidatingMemoryCache(IMemoryCache inner, IPubSub invalidator) @@ -24,12 +25,20 @@ namespace Squidex.Infrastructure.Caching this.inner = inner; this.invalidator = invalidator; - invalidator.Subscribe(ChannelName, inner.Remove); + subscription = invalidator.Subscribe(m => + { + inner.Remove(m.CacheKey); + }); } - public void Dispose() + protected override void DisposeObject(bool disposing) { - inner.Dispose(); + if (disposing) + { + subscription.Dispose(); + + inner.Dispose(); + } } public ICacheEntry CreateEntry(object key) @@ -49,9 +58,9 @@ namespace Squidex.Infrastructure.Caching public void Invalidate(object key) { - if (key is string) + if (key is string stringKey) { - invalidator.Publish(ChannelName, key.ToString(), true); + invalidator.Publish(new InvalidationMessage { CacheKey = stringKey }, true); } } } diff --git a/src/Squidex.Infrastructure/Caching/InvalidationMessage.cs b/src/Squidex.Infrastructure/Caching/InvalidationMessage.cs new file mode 100644 index 000000000..03bae01b3 --- /dev/null +++ b/src/Squidex.Infrastructure/Caching/InvalidationMessage.cs @@ -0,0 +1,15 @@ +// ========================================================================== +// InvalidationMessage.cs +// Squidex Headless CMS +// ========================================================================== +// Copyright (c) Squidex Group +// All rights reserved. +// ========================================================================== + +namespace Squidex.Infrastructure.Caching +{ + public sealed class InvalidationMessage + { + public string CacheKey { get; set; } + } +} diff --git a/src/Squidex.Infrastructure/IPubSub.cs b/src/Squidex.Infrastructure/IPubSub.cs index cee212fac..b06586aa6 100644 --- a/src/Squidex.Infrastructure/IPubSub.cs +++ b/src/Squidex.Infrastructure/IPubSub.cs @@ -12,8 +12,8 @@ namespace Squidex.Infrastructure { public interface IPubSub { - void Publish(string channelName, string token, bool notifySelf); + void Publish(T value, bool notifySelf); - IDisposable Subscribe(string channelName, Action handler); + IDisposable Subscribe(Action handler); } } diff --git a/src/Squidex.Infrastructure/InMemoryPubSub.cs b/src/Squidex.Infrastructure/InMemoryPubSub.cs index 3d0307371..a4ace58fd 100644 --- a/src/Squidex.Infrastructure/InMemoryPubSub.cs +++ b/src/Squidex.Infrastructure/InMemoryPubSub.cs @@ -7,26 +7,26 @@ // ========================================================================== using System; -using System.Collections.Concurrent; +using System.Reactive.Linq; using System.Reactive.Subjects; namespace Squidex.Infrastructure { public sealed class InMemoryPubSub : IPubSub { - private readonly ConcurrentDictionary> subjects = new ConcurrentDictionary>(); + private readonly Subject subject = new Subject(); - public void Publish(string channelName, string token, bool notifySelf) + public void Publish(T value, bool notifySelf) { if (notifySelf) { - subjects.GetOrAdd(channelName, k => new Subject()).OnNext(token); + subject.OnNext(value); } } - public IDisposable Subscribe(string channelName, Action handler) + public IDisposable Subscribe(Action handler) { - return subjects.GetOrAdd(channelName, k => new Subject()).Subscribe(handler); + return subject.Where(x => x is T).OfType().Subscribe(handler); } } } diff --git a/src/Squidex.Infrastructure/Json/Orleans/J.cs b/src/Squidex.Infrastructure/Json/Orleans/J.cs deleted file mode 100644 index c137b6a8d..000000000 --- a/src/Squidex.Infrastructure/Json/Orleans/J.cs +++ /dev/null @@ -1,57 +0,0 @@ -// ========================================================================== -// J.cs -// Squidex Headless CMS -// ========================================================================== -// Copyright (c) Squidex Group -// All rights reserved. -// ========================================================================== - -using System.Threading.Tasks; -using Newtonsoft.Json; - -namespace Squidex.Infrastructure.Json.Orleans -{ - public struct J : IJsonValue - { - private readonly T value; - private readonly bool isImmutable; - - public T Value - { - get { return value; } - } - - bool IJsonValue.IsImmutable - { - get { return isImmutable; } - } - - object IJsonValue.Value - { - get { return Value; } - } - - [JsonConstructor] - public J(T value, bool isImmutable = false) - { - this.value = value; - - this.isImmutable = isImmutable; - } - - public static implicit operator T(J value) - { - return value.Value; - } - - public static implicit operator J(T d) - { - return new J(d); - } - - public static Task> AsTask(T value) - { - return Task.FromResult>(value); - } - } -} diff --git a/src/Squidex.Infrastructure/Json/Orleans/JsonExternalSerializer.cs b/src/Squidex.Infrastructure/Json/Orleans/JsonExternalSerializer.cs deleted file mode 100644 index 80566b15d..000000000 --- a/src/Squidex.Infrastructure/Json/Orleans/JsonExternalSerializer.cs +++ /dev/null @@ -1,91 +0,0 @@ -// ========================================================================== -// JsonExternalSerializer.cs -// Squidex Headless CMS -// ========================================================================== -// Copyright (c) Squidex Group -// All rights reserved. -// ========================================================================== - -using System; -using System.IO; -using System.Linq; -using Newtonsoft.Json; -using Newtonsoft.Json.Linq; -using Orleans.Runtime; -using Orleans.Serialization; - -namespace Squidex.Infrastructure.Json.Orleans -{ - public class JsonExternalSerializer : IExternalSerializer - { - private readonly JsonSerializer serializer; - - public JsonExternalSerializer(JsonSerializer serializer) - { - Guard.NotNull(serializer, nameof(serializer)); - - this.serializer = serializer; - } - - public void Initialize(Logger logger) - { - } - - public bool IsSupportedType(Type itemType) - { - return itemType.GetInterfaces().Contains(typeof(IJsonValue)); - } - - public object DeepCopy(object source, ICopyContext context) - { - var jsonValue = source as IJsonValue; - - if (jsonValue == null) - { - return null; - } - else if (jsonValue.IsImmutable) - { - return jsonValue; - } - else if (jsonValue.Value == null) - { - return jsonValue; - } - else - { - return JObject.FromObject(source, serializer).ToObject(source.GetType(), serializer); - } - } - - public object Deserialize(Type expectedType, IDeserializationContext context) - { - var outLength = context.StreamReader.ReadInt(); - var outBytes = context.StreamReader.ReadBytes(outLength); - - var stream = new MemoryStream(outBytes); - - using (var reader = new JsonTextReader(new StreamReader(stream))) - { - return serializer.Deserialize(reader, expectedType); - } - } - - public void Serialize(object item, ISerializationContext context, Type expectedType) - { - var stream = new MemoryStream(); - - using (var writer = new JsonTextWriter(new StreamWriter(stream))) - { - serializer.Serialize(writer, item); - - writer.Flush(); - } - - var outBytes = stream.ToArray(); - - context.StreamWriter.Write(outBytes.Length); - context.StreamWriter.Write(outBytes); - } - } -} diff --git a/src/Squidex.Infrastructure/Orleans/GrainV2.cs b/src/Squidex.Infrastructure/Orleans/GrainV2.cs deleted file mode 100644 index 0a2006362..000000000 --- a/src/Squidex.Infrastructure/Orleans/GrainV2.cs +++ /dev/null @@ -1,83 +0,0 @@ -// ========================================================================== -// GrainV2.cs -// Squidex Headless CMS -// ========================================================================== -// Copyright (c) Squidex Group -// All rights reserved. -// ========================================================================== - -using System.Threading; -using System.Threading.Tasks; -using Orleans; -using Orleans.Core; -using Orleans.Runtime; - -namespace Squidex.Infrastructure.Orleans -{ - public class GrainV2 : Grain where TGrainState : new() - { - private readonly IGrainRuntime runtime; - private IStorage storage; - - protected GrainV2(IGrainRuntime runtime) - { - this.runtime = runtime; - } - - protected GrainV2(IGrainIdentity identity, IGrainRuntime runtime, IStorage storage) - : base(identity, runtime) - { - this.runtime = runtime; - this.storage = storage; - } - - protected TGrainState State - { - get - { - return storage.State; - } - set - { - storage.State = value; - } - } - - protected virtual Task ClearStateAsync() - { - return storage.ClearStateAsync(); - } - - protected virtual Task WriteStateAsync() - { - return storage.WriteStateAsync(); - } - - protected virtual Task ReadStateAsync() - { - return storage.ReadStateAsync(); - } - - public override void Participate(IGrainLifecycle lifecycle) - { - base.Participate(lifecycle); - - lifecycle.Subscribe(GrainLifecycleStage.SetupState, OnSetupState); - } - - private async Task OnSetupState(CancellationToken ct) - { - if (!ct.IsCancellationRequested) - { - storage = runtime.GetStorage(this); - - await OnSetupState(); - } - } - - private async Task OnSetupState() - { - await ReadStateAsync(); - } - } -} \ No newline at end of file diff --git a/src/Squidex.Infrastructure/PubSubExtensions.cs b/src/Squidex.Infrastructure/PubSubExtensions.cs new file mode 100644 index 000000000..b1669a341 --- /dev/null +++ b/src/Squidex.Infrastructure/PubSubExtensions.cs @@ -0,0 +1,79 @@ +// ========================================================================== +// PubSubExtensions.cs +// Squidex Headless CMS +// ========================================================================== +// Copyright (c) Squidex Group +// All rights reserved. +// ========================================================================== + +using System; +using System.Threading.Tasks; + +#pragma warning disable 4014 +#pragma warning disable RECS0165 // Asynchronous methods should return a Task instead of void + +namespace Squidex.Infrastructure +{ + public static class PubSubExtensions + { + public class Request + { + public T Body { get; set; } + + public Guid CorrelationId { get; set; } + } + + public class Response + { + public T Body { get; set; } + + public Guid CorrelationId { get; set; } + } + + public static IDisposable ReceiveAsync(this IPubSub pubsub, Func> callback, bool self = true) + { + return pubsub.Subscribe>(async x => + { + var response = await callback(x.Body); + + pubsub.Publish(new Response { CorrelationId = x.CorrelationId, Body = response }, true); + }); + } + + public static async Task RequestAsync(this IPubSub pubsub, TRequest message, TimeSpan timeout, bool self = true) + { + var request = new Request { Body = message, CorrelationId = Guid.NewGuid() }; + + IDisposable subscription = null; + try + { + var receiveTask = new TaskCompletionSource(); + + subscription = pubsub.Subscribe>(response => + { + if (response.CorrelationId == request.CorrelationId) + { + receiveTask.SetResult(response.Body); + } + }); + + Task.Run(() => pubsub.Publish(request, self)); + + var firstTask = await Task.WhenAny(receiveTask.Task, Task.Delay(timeout)); + + if (firstTask.Id != receiveTask.Task.Id) + { + throw new TaskCanceledException(); + } + else + { + return await receiveTask.Task; + } + } + finally + { + subscription?.Dispose(); + } + } + } +} diff --git a/src/Squidex.Infrastructure/Squidex.Infrastructure.csproj b/src/Squidex.Infrastructure/Squidex.Infrastructure.csproj index cdbfcbd4d..0e7de800d 100644 --- a/src/Squidex.Infrastructure/Squidex.Infrastructure.csproj +++ b/src/Squidex.Infrastructure/Squidex.Infrastructure.csproj @@ -10,9 +10,6 @@ - - - diff --git a/src/Squidex.Infrastructure/States/IStateFactory.cs b/src/Squidex.Infrastructure/States/IStateFactory.cs new file mode 100644 index 000000000..1c238f0ed --- /dev/null +++ b/src/Squidex.Infrastructure/States/IStateFactory.cs @@ -0,0 +1,17 @@ +// ========================================================================== +// IStateFactory.cs +// Squidex Headless CMS +// ========================================================================== +// Copyright (c) Squidex Group +// All rights reserved. +// ========================================================================== + +using System.Threading.Tasks; + +namespace Squidex.Infrastructure.States +{ + public interface IStateFactory + { + Task GetAsync(string key) where T : StatefulObject; + } +} diff --git a/src/Squidex.Infrastructure/States/IStateHolder.cs b/src/Squidex.Infrastructure/States/IStateHolder.cs new file mode 100644 index 000000000..b2425ffe5 --- /dev/null +++ b/src/Squidex.Infrastructure/States/IStateHolder.cs @@ -0,0 +1,21 @@ +// ========================================================================== +// IStateHolder.cs +// Squidex Headless CMS +// ========================================================================== +// Copyright (c) Squidex Group +// All rights reserved. +// ========================================================================== + +using System.Threading.Tasks; + +namespace Squidex.Infrastructure.States +{ + public interface IStateHolder + { + T State { get; set; } + + Task ReadAsync(); + + Task WriteAsync(); + } +} diff --git a/src/Squidex.Infrastructure/States/IStateStore.cs b/src/Squidex.Infrastructure/States/IStateStore.cs new file mode 100644 index 000000000..f6f903b95 --- /dev/null +++ b/src/Squidex.Infrastructure/States/IStateStore.cs @@ -0,0 +1,19 @@ +// ========================================================================== +// IStateStore.cs +// Squidex Headless CMS +// ========================================================================== +// Copyright (c) Squidex Group +// All rights reserved. +// ========================================================================== + +using System.Threading.Tasks; + +namespace Squidex.Infrastructure.States +{ + public interface IStateStore + { + Task WriteAsync(string key, T value, string oldEtag, string newEtag); + + Task<(T Value, string Etag)> ReadAsync(string key); + } +} diff --git a/src/Squidex.Infrastructure/States/InvalidateMessage.cs b/src/Squidex.Infrastructure/States/InvalidateMessage.cs new file mode 100644 index 000000000..56aa270a4 --- /dev/null +++ b/src/Squidex.Infrastructure/States/InvalidateMessage.cs @@ -0,0 +1,15 @@ +// ========================================================================== +// InvalidateMessage.cs +// Squidex Headless CMS +// ========================================================================== +// Copyright (c) Squidex Group +// All rights reserved. +// ========================================================================== + +namespace Squidex.Infrastructure.States +{ + public sealed class InvalidateMessage + { + public string Key { get; set; } + } +} \ No newline at end of file diff --git a/src/Squidex.Infrastructure/States/StateFactory.cs b/src/Squidex.Infrastructure/States/StateFactory.cs new file mode 100644 index 000000000..ea6482abe --- /dev/null +++ b/src/Squidex.Infrastructure/States/StateFactory.cs @@ -0,0 +1,108 @@ +// ========================================================================== +// StateFactory.cs +// Squidex Headless CMS +// ========================================================================== +// Copyright (c) Squidex Group +// All rights reserved. +// ========================================================================== + +using System; +using System.Collections.Generic; +using System.Threading.Tasks; +using Microsoft.Extensions.Caching.Memory; +using Squidex.Infrastructure.Tasks; + +namespace Squidex.Infrastructure.States +{ + public sealed class StateFactory : DisposableObjectBase, IExternalSystem, IStateFactory + { + private static readonly TimeSpan CacheDuration = TimeSpan.FromMinutes(10); + private readonly IPubSub pubSub; + private readonly IStateStore store; + private readonly IMemoryCache statesCache; + private readonly IServiceProvider services; + private readonly List states = new List(); + private readonly SingleThreadedDispatcher cleanupDispatcher = new SingleThreadedDispatcher(); + private IDisposable pubSubscription; + + public StateFactory( + IPubSub pubSub, + IServiceProvider services, + IStateStore store, + IMemoryCache statesCache) + { + Guard.NotNull(pubSub, nameof(pubSub)); + Guard.NotNull(store, nameof(store)); + Guard.NotNull(services, nameof(services)); + Guard.NotNull(statesCache, nameof(statesCache)); + + this.pubSub = pubSub; + this.store = store; + this.services = services; + this.statesCache = statesCache; + } + + public void Connect() + { + pubSubscription = pubSub.Subscribe(m => + { + statesCache.Remove(m.Key); + }); + } + + public async Task GetAsync(string key) where T : StatefulObject + { + Guard.NotNull(key, nameof(key)); + + if (statesCache.TryGetValue(key, out var state)) + { + return state; + } + + state = (T)services.GetService(typeof(T)); + + var stateHolder = new StateHolder(key, () => + { + pubSub.Publish(new InvalidateMessage { Key = key }, false); + }, store); + + await state.ActivateAsync(stateHolder); + + var stateEntry = statesCache.CreateEntry(key); + + stateEntry.Value = state; + stateEntry.AbsoluteExpirationRelativeToNow = CacheDuration; + + stateEntry.PostEvictionCallbacks.Add(new PostEvictionCallbackRegistration + { + EvictionCallback = (k, v, r, s) => + { + cleanupDispatcher.DispatchAsync(() => + { + state.Dispose(); + states.Remove(state); + }).Forget(); + } + }); + + states.Add(state); + + return state; + } + + protected override void DisposeObject(bool disposing) + { + if (disposing) + { + cleanupDispatcher.DispatchAsync(() => + { + foreach (var state in states) + { + state.Dispose(); + } + }); + cleanupDispatcher.StopAndWaitAsync().Wait(); + } + } + } +} diff --git a/src/Squidex.Infrastructure/States/StateHolder.cs b/src/Squidex.Infrastructure/States/StateHolder.cs new file mode 100644 index 000000000..7e5fba0f3 --- /dev/null +++ b/src/Squidex.Infrastructure/States/StateHolder.cs @@ -0,0 +1,44 @@ +// ========================================================================== +// StateHolder.cs +// Squidex Headless CMS +// ========================================================================== +// Copyright (c) Squidex Group +// All rights reserved. +// ========================================================================== + +using System; +using System.Threading.Tasks; + +namespace Squidex.Infrastructure.States +{ + public sealed class StateHolder : IStateHolder + { + private readonly Action written; + private readonly IStateStore store; + private readonly string key; + private string etag; + + public T State { get; set; } + + internal StateHolder(string key, Action written, IStateStore store) + { + this.key = key; + this.written = written; + this.store = store; + } + + public async Task ReadAsync() + { + (State, etag) = await store.ReadAsync(key); + } + + public async Task WriteAsync() + { + var newEtag = Guid.NewGuid().ToString(); + + await store.WriteAsync(key, State, etag, newEtag); + + etag = newEtag; + } + } +} diff --git a/src/Squidex.Infrastructure/States/StatefulObject.cs b/src/Squidex.Infrastructure/States/StatefulObject.cs new file mode 100644 index 000000000..70e4c44a7 --- /dev/null +++ b/src/Squidex.Infrastructure/States/StatefulObject.cs @@ -0,0 +1,65 @@ +// ========================================================================== +// StatefulActor.cs +// Squidex Headless CMS +// ========================================================================== +// Copyright (c) Squidex Group +// All rights reserved. +// ========================================================================== + +using System.Threading.Tasks; + +namespace Squidex.Infrastructure.States +{ + public abstract class StatefulObject : DisposableObjectBase + { + private IStateHolder stateHolder; + + public T State + { + get + { + if (stateHolder != null) + { + return stateHolder.State; + } + else + { + return default(T); + } + } + + protected set + { + if (stateHolder != null) + { + stateHolder.State = value; + } + } + } + + public Task ActivateAsync(IStateHolder stateHolder) + { + Guard.NotNull(stateHolder, nameof(stateHolder)); + + this.stateHolder = stateHolder; + + return stateHolder.ReadAsync(); + } + + public async Task ReadStateAsync() + { + if (stateHolder != null) + { + await stateHolder.ReadAsync(); + } + } + + public async Task WriteStateAsync() + { + if (stateHolder != null) + { + await stateHolder.WriteAsync(); + } + } + } +} diff --git a/tests/Squidex.Infrastructure.Tests/CQRS/Events/Actors/EventConsumerActorTests.cs b/tests/Squidex.Infrastructure.Tests/CQRS/Events/Actors/EventConsumerActorTests.cs new file mode 100644 index 000000000..1e86fba58 --- /dev/null +++ b/tests/Squidex.Infrastructure.Tests/CQRS/Events/Actors/EventConsumerActorTests.cs @@ -0,0 +1,358 @@ +// ========================================================================== +// EventConsumerActorTests.cs +// Squidex Headless CMS +// ========================================================================== +// Copyright (c) Squidex Group +// All rights reserved. +// ========================================================================== + +using System; +using System.Threading.Tasks; +using FakeItEasy; +using FluentAssertions; +using Squidex.Infrastructure.Log; +using Squidex.Infrastructure.States; +using Xunit; + +namespace Squidex.Infrastructure.CQRS.Events.Actors +{ + public class EventConsumerActorTests + { + public sealed class MyEvent : IEvent + { + } + + public sealed class MyEventConsumerActor : EventConsumerActor + { + public MyEventConsumerActor(EventDataFormatter formatter, IEventStore eventStore, ISemanticLog log) + : base(formatter, eventStore, log) + { + } + + protected override IEventSubscription CreateSubscription(IEventStore eventStore, string streamFilter, string position) + { + return eventStore.CreateSubscription(this, streamFilter, position); + } + } + + private readonly IEventConsumer eventConsumer = A.Fake(); + private readonly IEventStore eventStore = A.Fake(); + private readonly IEventSubscriber sutSubscriber; + private readonly IEventSubscription eventSubscription = A.Fake(); + private readonly ISemanticLog log = A.Fake(); + private readonly IStateHolder stateHolder = A.Fake>(); + private readonly EventDataFormatter formatter = A.Fake(); + private readonly EventData eventData = new EventData(); + private readonly Envelope envelope = new Envelope(new MyEvent()); + private readonly EventConsumerActor sut; + private readonly string consumerName; + private readonly string initialPosition = Guid.NewGuid().ToString(); + private EventConsumerState state = new EventConsumerState(); + + public EventConsumerActorTests() + { + state.Position = initialPosition; + + consumerName = eventConsumer.GetType().Name; + + A.CallTo(() => eventStore.CreateSubscription(A.Ignored, A.Ignored, A.Ignored)) + .Returns(eventSubscription); + + A.CallTo(() => eventConsumer.Name). + Returns(consumerName); + + A.CallTo(() => stateHolder.State) + .ReturnsLazily(() => state); + + A.CallToSet(() => stateHolder.State) + .Invokes(new Action(s => state = s)); + + A.CallTo(() => formatter.Parse(eventData, true)).Returns(envelope); + + sut = new MyEventConsumerActor(formatter, eventStore, log); + sutSubscriber = sut; + + sut.ActivateAsync(stateHolder).Wait(); + } + + [Fact] + public void Should_not_subscribe_to_event_store_when_stopped_in_db() + { + state = state.Stopped(); + + sut.Activate(eventConsumer); + sut.Dispose(); + + state.ShouldBeEquivalentTo(new EventConsumerState { IsStopped = true, Position = initialPosition, Error = null }); + + A.CallTo(() => eventStore.CreateSubscription(A.Ignored, A.Ignored, A.Ignored)) + .MustNotHaveHappened(); + } + + [Fact] + public void Should_subscribe_to_event_store_when_not_found_in_db() + { + sut.Activate(eventConsumer); + sut.Dispose(); + + state.ShouldBeEquivalentTo(new EventConsumerState { IsStopped = false, Position = initialPosition, Error = null }); + + A.CallTo(() => eventStore.CreateSubscription(A.Ignored, A.Ignored, A.Ignored)) + .MustHaveHappened(Repeated.Exactly.Once); + } + + [Fact] + public void Should_subscribe_to_event_store_when_not_stopped_in_db() + { + sut.Activate(eventConsumer); + sut.Dispose(); + + state.ShouldBeEquivalentTo(new EventConsumerState { IsStopped = false, Position = initialPosition, Error = null }); + + A.CallTo(() => eventStore.CreateSubscription(A.Ignored, A.Ignored, A.Ignored)) + .MustHaveHappened(Repeated.Exactly.Once); + } + + [Fact] + public void Should_stop_subscription_when_stopped() + { + sut.Activate(eventConsumer); + sut.Stop(); + sut.Stop(); + + sut.Dispose(); + + state.ShouldBeEquivalentTo(new EventConsumerState { IsStopped = true, Position = initialPosition, Error = null }); + + A.CallTo(() => stateHolder.WriteAsync()) + .MustHaveHappened(Repeated.Exactly.Once); + + A.CallTo(() => eventSubscription.StopAsync()) + .MustHaveHappened(Repeated.Exactly.Once); + } + + [Fact] + public void Should_reset_consumer_when_resetting() + { + sut.Activate(eventConsumer); + sut.Stop(); + sut.Reset(); + sut.Dispose(); + + state.ShouldBeEquivalentTo(new EventConsumerState { IsStopped = false, Position = null, Error = null }); + + A.CallTo(() => stateHolder.WriteAsync()) + .MustHaveHappened(Repeated.Exactly.Twice); + + A.CallTo(() => eventConsumer.ClearAsync()) + .MustHaveHappened(Repeated.Exactly.Once); + + A.CallTo(() => eventSubscription.StopAsync()) + .MustHaveHappened(Repeated.Exactly.Once); + + A.CallTo(() => eventStore.CreateSubscription(A.Ignored, A.Ignored, state.Position)) + .MustHaveHappened(Repeated.Exactly.Once); + + A.CallTo(() => eventStore.CreateSubscription(A.Ignored, A.Ignored, null)) + .MustHaveHappened(Repeated.Exactly.Once); + } + + [Fact] + public async Task Should_invoke_and_update_position_when_event_received() + { + sut.Activate(eventConsumer); + + var @event = new StoredEvent(Guid.NewGuid().ToString(), 123, eventData); + + await OnEventAsync(eventSubscription, @event); + + sut.Dispose(); + + state.ShouldBeEquivalentTo(new EventConsumerState { IsStopped = false, Position = @event.EventPosition, Error = null }); + + A.CallTo(() => stateHolder.WriteAsync()) + .MustHaveHappened(Repeated.Exactly.Once); + + A.CallTo(() => eventConsumer.On(envelope)) + .MustHaveHappened(Repeated.Exactly.Once); + } + + [Fact] + public async Task Should_ignore_old_events() + { + sut.Activate(eventConsumer); + + A.CallTo(() => formatter.Parse(eventData, true)) + .Throws(new TypeNameNotFoundException()); + + var @event = new StoredEvent(Guid.NewGuid().ToString(), 123, eventData); + + await OnEventAsync(eventSubscription, @event); + + sut.Dispose(); + + state.ShouldBeEquivalentTo(new EventConsumerState { IsStopped = false, Position = @event.EventPosition, Error = null }); + + A.CallTo(() => stateHolder.WriteAsync()) + .MustHaveHappened(Repeated.Exactly.Once); + + A.CallTo(() => eventConsumer.On(envelope)) + .MustNotHaveHappened(); + } + + [Fact] + public async Task Should_not_invoke_and_update_position_when_event_is_from_another_subscription() + { + sut.Activate(eventConsumer); + + var @event = new StoredEvent(Guid.NewGuid().ToString(), 123, eventData); + + await OnEventAsync(A.Fake(), @event); + + sut.Dispose(); + + state.ShouldBeEquivalentTo(new EventConsumerState { IsStopped = false, Position = initialPosition, Error = null }); + + A.CallTo(() => eventConsumer.On(envelope)) + .MustNotHaveHappened(); + } + + [Fact] + public async Task Should_not_make_error_handling_when_exception_is_from_another_subscription() + { + sut.Activate(eventConsumer); + + var ex = new InvalidOperationException(); + + await OnErrorAsync(A.Fake(), ex); + + sut.Dispose(); + + state.ShouldBeEquivalentTo(new EventConsumerState { IsStopped = false, Position = initialPosition, Error = null }); + + A.CallTo(() => stateHolder.WriteAsync()) + .MustNotHaveHappened(); + } + + [Fact] + public void Should_stop_if_resetting_failed() + { + sut.Activate(eventConsumer); + + var ex = new InvalidOperationException(); + + A.CallTo(() => eventConsumer.ClearAsync()) + .Throws(ex); + + sut.Reset(); + sut.Dispose(); + + state.ShouldBeEquivalentTo(new EventConsumerState { IsStopped = true, Position = initialPosition, Error = ex.ToString() }); + + A.CallTo(() => stateHolder.WriteAsync()) + .MustHaveHappened(Repeated.Exactly.Once); + + A.CallTo(() => eventSubscription.StopAsync()) + .MustHaveHappened(Repeated.Exactly.Once); + } + + [Fact] + public async Task Should_stop_if_handling_failed() + { + sut.Activate(eventConsumer); + + var ex = new InvalidOperationException(); + + A.CallTo(() => eventConsumer.On(envelope)) + .Throws(ex); + + var @event = new StoredEvent(Guid.NewGuid().ToString(), 123, eventData); + + await OnEventAsync(eventSubscription, @event); + + sut.Dispose(); + + state.ShouldBeEquivalentTo(new EventConsumerState { IsStopped = true, Position = initialPosition, Error = ex.ToString() }); + + A.CallTo(() => eventConsumer.On(envelope)) + .MustHaveHappened(); + + A.CallTo(() => stateHolder.WriteAsync()) + .MustHaveHappened(Repeated.Exactly.Once); + + A.CallTo(() => eventSubscription.StopAsync()) + .MustHaveHappened(Repeated.Exactly.Once); + } + + [Fact] + public async Task Should_stop_if_deserialization_failed() + { + sut.Activate(eventConsumer); + + var ex = new InvalidOperationException(); + + A.CallTo(() => formatter.Parse(eventData, true)) + .Throws(ex); + + var @event = new StoredEvent(Guid.NewGuid().ToString(), 123, eventData); + + await OnEventAsync(eventSubscription, @event); + + sut.Dispose(); + + state.ShouldBeEquivalentTo(new EventConsumerState { IsStopped = true, Position = initialPosition, Error = ex.ToString() }); + + A.CallTo(() => eventConsumer.On(envelope)) + .MustNotHaveHappened(); + + A.CallTo(() => stateHolder.WriteAsync()) + .MustHaveHappened(Repeated.Exactly.Once); + + A.CallTo(() => eventSubscription.StopAsync()) + .MustHaveHappened(Repeated.Exactly.Once); + } + + [Fact] + public async Task Should_start_after_stop_when_handling_failed() + { + sut.Activate(eventConsumer); + + var exception = new InvalidOperationException(); + + A.CallTo(() => eventConsumer.On(envelope)) + .Throws(exception); + + var @event = new StoredEvent(Guid.NewGuid().ToString(), 123, eventData); + + await OnEventAsync(eventSubscription, @event); + + sut.Start(); + sut.Start(); + sut.Dispose(); + + state.ShouldBeEquivalentTo(new EventConsumerState { IsStopped = false, Position = initialPosition, Error = null }); + + A.CallTo(() => eventConsumer.On(envelope)) + .MustHaveHappened(); + + A.CallTo(() => stateHolder.WriteAsync()) + .MustHaveHappened(Repeated.Exactly.Twice); + + A.CallTo(() => eventSubscription.StopAsync()) + .MustHaveHappened(Repeated.Exactly.Once); + + A.CallTo(() => eventStore.CreateSubscription(A.Ignored, A.Ignored, A.Ignored)) + .MustHaveHappened(Repeated.Exactly.Twice); + } + + private Task OnErrorAsync(IEventSubscription subscriber, Exception ex) + { + return sutSubscriber.OnErrorAsync(subscriber, ex); + } + + private Task OnEventAsync(IEventSubscription subscriber, StoredEvent ev) + { + return sutSubscriber.OnEventAsync(subscriber, ev); + } + } +} \ No newline at end of file diff --git a/tests/Squidex.Infrastructure.Tests/CQRS/Events/Actors/EventConsumerManagerTests.cs b/tests/Squidex.Infrastructure.Tests/CQRS/Events/Actors/EventConsumerManagerTests.cs new file mode 100644 index 000000000..6f45d0c78 --- /dev/null +++ b/tests/Squidex.Infrastructure.Tests/CQRS/Events/Actors/EventConsumerManagerTests.cs @@ -0,0 +1,125 @@ +// ========================================================================== +// EventConsumerManagerTests.cs +// Squidex Headless CMS +// ========================================================================== +// Copyright (c) Squidex Group +// All rights reserved. +// ========================================================================== + +using System; +using System.Threading.Tasks; +using FakeItEasy; +using FluentAssertions; +using Squidex.Infrastructure.CQRS.Events.Actors.Messages; +using Squidex.Infrastructure.States; +using Xunit; + +namespace Squidex.Infrastructure.CQRS.Events.Actors +{ + public class EventConsumerManagerTests + { + private readonly EventConsumerActor actor1 = A.Fake(); + private readonly EventConsumerActor actor2 = A.Fake(); + private readonly IStateFactory factory = A.Fake(); + private readonly IEventConsumer consumer1 = A.Fake(); + private readonly IEventConsumer consumer2 = A.Fake(); + private readonly IPubSub pubSub = new InMemoryPubSub(); + private readonly string consumerName1 = "Consumer1"; + private readonly string consumerName2 = "Consumer2"; + private readonly EventConsumerActorManager sut; + + public EventConsumerManagerTests() + { + A.CallTo(() => consumer1.Name).Returns(consumerName1); + A.CallTo(() => consumer2.Name).Returns(consumerName2); + + A.CallTo(() => factory.GetAsync(consumerName1)).Returns(actor1); + A.CallTo(() => factory.GetAsync(consumerName2)).Returns(actor2); + + sut = new EventConsumerActorManager(new IEventConsumer[] { consumer1, consumer2 }, pubSub, factory); + } + + [Fact] + public void Should_activate_all_actors() + { + sut.Connect(); + + A.CallTo(() => actor1.Activate(consumer1)) + .MustHaveHappened(); + + A.CallTo(() => actor2.Activate(consumer2)) + .MustHaveHappened(); + } + + [Fact] + public void Should_start_correct_actor() + { + sut.Connect(); + + pubSub.Publish(new StartConsumerMessage { ConsumerName = consumerName1 }, true); + + A.CallTo(() => actor1.Start()) + .MustHaveHappened(); + + A.CallTo(() => actor2.Start()) + .MustNotHaveHappened(); + } + + [Fact] + public void Should_stop_correct_actor() + { + sut.Connect(); + + pubSub.Publish(new StopConsumerMessage { ConsumerName = consumerName1 }, true); + + A.CallTo(() => actor1.Stop()) + .MustHaveHappened(); + + A.CallTo(() => actor2.Stop()) + .MustNotHaveHappened(); + } + + [Fact] + public void Should_reset_correct_actor() + { + sut.Connect(); + + pubSub.Publish(new ResetConsumerMessage { ConsumerName = consumerName2 }, true); + + A.CallTo(() => actor1.Reset()) + .MustNotHaveHappened(); + + A.CallTo(() => actor2.Reset()) + .MustHaveHappened(); + } + + [Fact] + public async Task Should_get_state_from_all_actors() + { + sut.Connect(); + + A.CallTo(() => actor1.GetState()) + .Returns(new EventConsumerInfo { Name = consumerName1, Position = "123 " }); + + A.CallTo(() => actor2.GetState()) + .Returns(new EventConsumerInfo { Name = consumerName2, Position = "345 " }); + + var response = await pubSub.RequestAsync(new GetStatesRequest(), TimeSpan.FromSeconds(5), true); + + response.States.ShouldAllBeEquivalentTo(new EventConsumerInfo[] + { + new EventConsumerInfo { Name = consumerName1, Position = "123 " }, + new EventConsumerInfo { Name = consumerName2, Position = "345 " } + }); + } + + [Fact] + public void Should_not_dispose_actors() + { + sut.Dispose(); + + Assert.False(actor1.IsDisposed); + Assert.False(actor2.IsDisposed); + } + } +} diff --git a/tests/Squidex.Infrastructure.Tests/CQRS/Events/EventSubscriptionTests.cs b/tests/Squidex.Infrastructure.Tests/CQRS/Events/EventSubscriptionTests.cs index 15a3a74c3..35588e911 100644 --- a/tests/Squidex.Infrastructure.Tests/CQRS/Events/EventSubscriptionTests.cs +++ b/tests/Squidex.Infrastructure.Tests/CQRS/Events/EventSubscriptionTests.cs @@ -47,22 +47,6 @@ namespace Squidex.Infrastructure.CQRS.Events .MustHaveHappened(); } - [Fact] - public async Task Should_propagate_closed_to_subscriber() - { - var ex = new InvalidOperationException(); - - A.CallTo(() => eventStore.GetEventsAsync(A>.Ignored, A.Ignored, "^my-stream", position)) - .Throws(ex); - - var sut = new EventStoreSubscription(eventStore, eventSubscriber, "^my-stream", position); - - await WaitAndStopAsync(sut); - - A.CallTo(() => eventSubscriber.OnClosedAsync(sut)) - .MustHaveHappened(); - } - [Fact] public async Task Should_propagate_operation_cancelled_exception_to_subscriber() { diff --git a/tests/Squidex.Infrastructure.Tests/CQRS/Events/Grains/EventConsumerBootstrapTests.cs b/tests/Squidex.Infrastructure.Tests/CQRS/Events/Grains/EventConsumerBootstrapTests.cs deleted file mode 100644 index 9e94d3831..000000000 --- a/tests/Squidex.Infrastructure.Tests/CQRS/Events/Grains/EventConsumerBootstrapTests.cs +++ /dev/null @@ -1,58 +0,0 @@ -// ========================================================================== -// EventConsumerBootstrapTests.cs -// Squidex Headless CMS -// ========================================================================== -// Copyright (c) Squidex Group -// All rights reserved. -// ========================================================================== - -using System.Threading.Tasks; -using FakeItEasy; -using Orleans; -using Orleans.Providers; -using Squidex.Infrastructure.CQRS.Events.Orleans.Grains; -using Xunit; - -namespace Squidex.Infrastructure.CQRS.Events.Grains -{ - public sealed class EventConsumerBootstrapTests - { - private readonly IEventConsumerRegistryGrain registry = A.Fake(); - private readonly IProviderRuntime runtime = A.Fake(); - private readonly EventConsumerBootstrap sut = new EventConsumerBootstrap(); - - public EventConsumerBootstrapTests() - { - var factory = A.Fake(); - - A.CallTo(() => factory.GetGrain("Default", null)) - .Returns(registry); - - A.CallTo(() => runtime.GrainFactory) - .Returns(factory); - } - - [Fact] - public async Task Should_do_nothing_on_close() - { - await sut.Close(); - } - - [Fact] - public async Task Should_set_name_on_init() - { - await sut.Init("MyName", runtime, null); - - Assert.Equal("MyName", sut.Name); - } - - [Fact] - public async Task Should_activate_registry_on_init() - { - await sut.Init("MyName", runtime, null); - - A.CallTo(() => registry.ActivateAsync(null)) - .MustHaveHappened(); - } - } -} diff --git a/tests/Squidex.Infrastructure.Tests/CQRS/Events/Grains/EventConsumerGrainTests.cs b/tests/Squidex.Infrastructure.Tests/CQRS/Events/Grains/EventConsumerGrainTests.cs deleted file mode 100644 index 780a7ec02..000000000 --- a/tests/Squidex.Infrastructure.Tests/CQRS/Events/Grains/EventConsumerGrainTests.cs +++ /dev/null @@ -1,408 +0,0 @@ -// ========================================================================== -// EventConsumerGrainTests.cs -// Squidex Headless CMS -// ========================================================================== -// Copyright (c) Squidex Group -// All rights reserved. -// ========================================================================== - -using System; -using System.Threading.Tasks; -using FakeItEasy; -using Orleans.Concurrency; -using Orleans.Core; -using Orleans.Runtime; -using Squidex.Infrastructure.CQRS.Events.Orleans.Grains; -using Squidex.Infrastructure.CQRS.Events.Orleans.Grains.Implementation; -using Squidex.Infrastructure.Log; -using Xunit; - -namespace Squidex.Infrastructure.CQRS.Events.Grains -{ - public class EventConsumerGrainTests - { - public sealed class MyEvent : IEvent - { - } - - public sealed class MyEventConsumerGrain : EventConsumerGrain - { - public MyEventConsumerGrain( - EventDataFormatter formatter, - EventConsumerFactory eventConsumerFactory, - IEventStore eventStore, - ISemanticLog log, - IGrainIdentity identity, - IGrainRuntime runtime, - IStorage storage) - : base(formatter, eventConsumerFactory, eventStore, log, identity, runtime, storage) - { - } - - protected override IEventConsumerGrain GetSelf() - { - return this; - } - - protected override IEventSubscription CreateSubscription(IEventSubscriber subscriber, string streamFilter, string position) - { - return EventStore.CreateSubscription(subscriber, streamFilter, position); - } - } - - private readonly IEventConsumer eventConsumer = A.Fake(); - private readonly IEventStore eventStore = A.Fake(); - private readonly IEventSubscription eventSubscription = A.Fake(); - private readonly ISemanticLog log = A.Fake(); - private readonly IStorage storage = A.Fake>(); - private readonly EventDataFormatter formatter = A.Fake(); - private readonly EventData eventData = new EventData(); - private readonly Envelope envelope = new Envelope(new MyEvent()); - private readonly EventConsumerFactory factory; - private readonly MyEventConsumerGrain sut; - private readonly string consumerName; - private EventConsumerGrainState state = new EventConsumerGrainState(); - - public EventConsumerGrainTests() - { - factory = x => eventConsumer; - - state.Position = Guid.NewGuid().ToString(); - consumerName = eventConsumer.GetType().Name; - - A.CallTo(() => eventStore.CreateSubscription(A.Ignored, A.Ignored, A.Ignored)).Returns(eventSubscription); - A.CallTo(() => eventConsumer.Name).Returns(consumerName); - - A.CallTo(() => formatter.Parse(eventData, true)).Returns(envelope); - - A.CallTo(() => storage.State).ReturnsLazily(() => state); - A.CallToSet(() => storage.State).Invokes(new Action(s => state = s)); - - sut = new MyEventConsumerGrain( - formatter, - factory, - eventStore, - log, - A.Fake(), - A.Fake(), - storage); - } - - [Fact] - public async Task Should_not_subscribe_to_event_store_when_stopped_in_db() - { - state.IsStopped = true; - - await sut.OnActivateAsync(); - await sut.ActivateAsync(); - - A.CallTo(() => eventStore.CreateSubscription(A.Ignored, A.Ignored, A.Ignored)) - .MustNotHaveHappened(); - } - - [Fact] - public async Task Should_subscribe_to_event_store_when_not_stopped_in_db() - { - state.Position = "123"; - - await sut.OnActivateAsync(); - await sut.ActivateAsync(); - - A.CallTo(() => eventStore.CreateSubscription(A.Ignored, A.Ignored, "123")) - .MustHaveHappened(Repeated.Exactly.Once); - } - - [Fact] - public async Task Should_stop_subscription_when_stopped() - { - await sut.OnActivateAsync(); - await sut.ActivateAsync(); - await sut.StopAsync(); - await sut.StopAsync(); - - A.CallTo(() => eventSubscription.StopAsync()) - .MustHaveHappened(Repeated.Exactly.Once); - - Assert.True(state.IsStopped); - } - - [Fact] - public async Task Should_reset_consumer_when_resetting() - { - await sut.OnActivateAsync(); - await sut.ActivateAsync(); - await sut.StopAsync(); - await sut.ResetAsync(); - - A.CallTo(() => eventConsumer.ClearAsync()) - .MustHaveHappened(Repeated.Exactly.Once); - - A.CallTo(() => eventSubscription.StopAsync()) - .MustHaveHappened(Repeated.Exactly.Once); - - A.CallTo(() => eventStore.CreateSubscription(A.Ignored, A.Ignored, state.Position)) - .MustHaveHappened(Repeated.Exactly.Once); - - A.CallTo(() => eventStore.CreateSubscription(A.Ignored, A.Ignored, null)) - .MustHaveHappened(Repeated.Exactly.Once); - - Assert.False(state.IsStopped); - } - - [Fact] - public async Task Should_unsubscribe_from_subscription_when_closed() - { - await sut.OnActivateAsync(); - await sut.ActivateAsync(); - - await OnClosedAsync(eventSubscription); - - A.CallTo(() => eventSubscription.StopAsync()) - .MustHaveHappened(); - - Assert.False(state.IsStopped); - } - - [Fact] - public async Task Should_not_unsubscribe_from_subscription_when_closed_call_is_from_another_subscription() - { - await sut.OnActivateAsync(); - await sut.ActivateAsync(); - - await OnClosedAsync(A.Fake()); - - A.CallTo(() => eventSubscription.StopAsync()) - .MustNotHaveHappened(); - - Assert.False(state.IsStopped); - } - - [Fact] - public async Task Should_not_unsubscribe_from_subscription_when_not_running() - { - state.IsStopped = true; - - await sut.OnActivateAsync(); - await sut.ActivateAsync(); - - await OnClosedAsync(A.Fake()); - - A.CallTo(() => storage.WriteStateAsync()) - .MustNotHaveHappened(); - - Assert.True(state.IsStopped); - } - - [Fact] - public async Task Should_invoke_and_update_position_when_event_received() - { - var @event = new StoredEvent(Guid.NewGuid().ToString(), 123, eventData); - - await sut.OnActivateAsync(); - await sut.ActivateAsync(); - - await OnEventAsync(eventSubscription, @event); - - A.CallTo(() => eventConsumer.On(envelope)) - .MustHaveHappened(Repeated.Exactly.Once); - - Assert.Equal(@event.EventPosition, state.Position); - - var info = await sut.GetStateAsync(); - - Assert.Equal(@event.EventPosition, info.Value.Position); - } - - [Fact] - public async Task Should_ignore_old_events() - { - A.CallTo(() => formatter.Parse(eventData, true)) - .Throws(new TypeNameNotFoundException()); - - var @event = new StoredEvent(Guid.NewGuid().ToString(), 123, eventData); - - await sut.OnActivateAsync(); - await sut.ActivateAsync(); - - await OnEventAsync(eventSubscription, @event); - - A.CallTo(() => eventConsumer.On(envelope)) - .MustNotHaveHappened(); - - Assert.Equal(@event.EventPosition, state.Position); - } - - [Fact] - public async Task Should_not_invoke_and_update_position_when_event_is_from_another_subscription() - { - var @event = new StoredEvent(Guid.NewGuid().ToString(), 123, eventData); - - await sut.OnActivateAsync(); - await sut.ActivateAsync(); - - await OnEventAsync(A.Fake(), @event); - - A.CallTo(() => eventConsumer.On(envelope)) - .MustNotHaveHappened(); - } - - [Fact] - public async Task Should_not_make_error_handling_when_exception_is_from_another_subscription() - { - var ex = new InvalidOperationException(); - - await sut.OnActivateAsync(); - await sut.ActivateAsync(); - - await OnErrorAsync(A.Fake(), ex); - - Assert.False(state.IsStopped); - } - - [Fact] - public async Task Should_stop_if_subscription_failed() - { - var ex = new InvalidOperationException(); - - await sut.OnActivateAsync(); - await sut.ActivateAsync(); - - await OnErrorAsync(eventSubscription, ex); - - A.CallTo(() => eventSubscription.StopAsync()) - .MustHaveHappened(Repeated.Exactly.Once); - - Assert.True(state.IsStopped); - } - - [Fact] - public async Task Should_stop_if_subscription_failed_and_ignore_error_on_unsubscribe() - { - A.CallTo(() => eventSubscription.StopAsync()) - .Throws(new InvalidOperationException()); - - var ex = new InvalidOperationException(); - - await sut.OnActivateAsync(); - await sut.ActivateAsync(); - - await OnErrorAsync(eventSubscription, ex); - - Assert.True(state.IsStopped); - } - - [Fact] - public async Task Should_stop_if_resetting_failed() - { - var ex = new InvalidOperationException(); - - A.CallTo(() => eventConsumer.ClearAsync()) - .Throws(ex); - - await sut.OnActivateAsync(); - await sut.ActivateAsync(); - - await sut.ResetAsync(); - - A.CallTo(() => eventSubscription.StopAsync()) - .MustHaveHappened(Repeated.Exactly.Once); - - Assert.True(state.IsStopped); - } - - [Fact] - public async Task Should_stop_if_handling_failed() - { - var ex = new InvalidOperationException(); - - A.CallTo(() => eventConsumer.On(envelope)) - .Throws(ex); - - var @event = new StoredEvent(Guid.NewGuid().ToString(), 123, eventData); - - await sut.OnActivateAsync(); - await sut.ActivateAsync(); - - await OnEventAsync(eventSubscription, @event); - - A.CallTo(() => eventConsumer.On(envelope)) - .MustHaveHappened(); - - A.CallTo(() => eventSubscription.StopAsync()) - .MustHaveHappened(Repeated.Exactly.Once); - - Assert.True(state.IsStopped); - } - - [Fact] - public async Task Should_stop_if_deserialization_failed() - { - var ex = new InvalidOperationException(); - - A.CallTo(() => formatter.Parse(eventData, true)) - .Throws(ex); - - var @event = new StoredEvent(Guid.NewGuid().ToString(), 123, eventData); - - await sut.OnActivateAsync(); - await sut.ActivateAsync(); - - await OnEventAsync(eventSubscription, @event); - - A.CallTo(() => eventConsumer.On(envelope)) - .MustNotHaveHappened(); - - A.CallTo(() => eventSubscription.StopAsync()) - .MustHaveHappened(Repeated.Exactly.Once); - - Assert.True(state.IsStopped); - } - - [Fact] - public async Task Should_start_after_stop_when_handling_failed() - { - var exception = new InvalidOperationException(); - - A.CallTo(() => eventConsumer.On(envelope)) - .Throws(exception); - - var @event = new StoredEvent(Guid.NewGuid().ToString(), 123, eventData); - - await sut.OnActivateAsync(); - await sut.ActivateAsync(); - - await OnEventAsync(eventSubscription, @event); - - Assert.True(state.IsStopped); - - await sut.StartAsync(); - await sut.StartAsync(); - - A.CallTo(() => eventConsumer.On(envelope)) - .MustHaveHappened(); - - A.CallTo(() => eventSubscription.StopAsync()) - .MustHaveHappened(Repeated.Exactly.Once); - - A.CallTo(() => eventStore.CreateSubscription(A.Ignored, A.Ignored, A.Ignored)) - .MustHaveHappened(Repeated.Exactly.Twice); - - Assert.False(state.IsStopped); - } - - private Task OnErrorAsync(IEventSubscription subscriber, Exception ex) - { - return sut.OnErrorAsync(subscriber.AsImmutable(), ex.AsImmutable()); - } - - private Task OnEventAsync(IEventSubscription subscriber, StoredEvent ev) - { - return sut.OnEventAsync(subscriber.AsImmutable(), ev.AsImmutable()); - } - - private Task OnClosedAsync(IEventSubscription subscriber) - { - return sut.OnClosedAsync(subscriber.AsImmutable()); - } - } -} \ No newline at end of file diff --git a/tests/Squidex.Infrastructure.Tests/CQRS/Events/Grains/EventConsumerRegistryGrainTests.cs b/tests/Squidex.Infrastructure.Tests/CQRS/Events/Grains/EventConsumerRegistryGrainTests.cs deleted file mode 100644 index 1dbba926d..000000000 --- a/tests/Squidex.Infrastructure.Tests/CQRS/Events/Grains/EventConsumerRegistryGrainTests.cs +++ /dev/null @@ -1,165 +0,0 @@ -// ========================================================================== -// EventConsumerRegistryGrainTests.cs -// Squidex Headless CMS -// ========================================================================== -// Copyright (c) Squidex Group -// All rights reserved. -// ========================================================================== - -using System.Collections.Generic; -using System.Threading.Tasks; -using FakeItEasy; -using FluentAssertions; -using Orleans; -using Orleans.Concurrency; -using Orleans.Core; -using Orleans.Runtime; -using Squidex.Infrastructure.CQRS.Events.Orleans.Grains; -using Squidex.Infrastructure.CQRS.Events.Orleans.Grains.Implementation; -using Xunit; - -namespace Squidex.Infrastructure.CQRS.Events.Grains -{ - public class EventConsumerRegistryGrainTests - { - public class MyEventConsumerRegistryGrain : EventConsumerRegistryGrain - { - public MyEventConsumerRegistryGrain( - IEnumerable eventConsumers, - IGrainIdentity identity, - IGrainRuntime runtime) - : base(eventConsumers, identity, runtime) - { - } - } - - private readonly IEventConsumer consumerA = A.Fake(); - private readonly IEventConsumer consumerB = A.Fake(); - private readonly IEventConsumerGrain grainA = A.Fake(); - private readonly IEventConsumerGrain grainB = A.Fake(); - private readonly MyEventConsumerRegistryGrain sut; - - public EventConsumerRegistryGrainTests() - { - var grainRuntime = A.Fake(); - var grainFactory = A.Fake(); - - A.CallTo(() => grainFactory.GetGrain("a", null)).Returns(grainA); - A.CallTo(() => grainFactory.GetGrain("b", null)).Returns(grainB); - A.CallTo(() => grainRuntime.GrainFactory).Returns(grainFactory); - - A.CallTo(() => consumerA.Name).Returns("a"); - A.CallTo(() => consumerA.EventsFilter).Returns("^a-"); - - A.CallTo(() => consumerB.Name).Returns("b"); - A.CallTo(() => consumerB.EventsFilter).Returns("^b-"); - - sut = new MyEventConsumerRegistryGrain(new[] { consumerA, consumerB }, A.Fake(), grainRuntime); - } - - [Fact] - public async Task Should_not_activate_all_grains_on_activate() - { - await sut.OnActivateAsync(); - - A.CallTo(() => grainA.ActivateAsync()) - .MustNotHaveHappened(); - - A.CallTo(() => grainB.ActivateAsync()) - .MustNotHaveHappened(); - } - - [Fact] - public async Task Should_activate_all_grains_on_reminder() - { - await sut.ReceiveReminder(null, default(TickStatus)); - - A.CallTo(() => grainA.ActivateAsync()) - .MustHaveHappened(); - - A.CallTo(() => grainB.ActivateAsync()) - .MustHaveHappened(); - } - - [Fact] - public async Task Should_activate_all_grains_on_activate_with_null() - { - await sut.ActivateAsync(null); - - A.CallTo(() => grainA.ActivateAsync()) - .MustHaveHappened(); - - A.CallTo(() => grainB.ActivateAsync()) - .MustHaveHappened(); - } - - [Fact] - public async Task Should_activate_matching_grains_when_stream_name_defined() - { - await sut.ActivateAsync("a-123"); - - A.CallTo(() => grainA.ActivateAsync()) - .MustHaveHappened(); - - A.CallTo(() => grainB.ActivateAsync()) - .MustNotHaveHappened(); - } - - [Fact] - public async Task Should_start_matching_grain() - { - await sut.StartAsync("a"); - - A.CallTo(() => grainA.StartAsync()) - .MustHaveHappened(); - - A.CallTo(() => grainB.StartAsync()) - .MustNotHaveHappened(); - } - - [Fact] - public async Task Should_stop_matching_grain() - { - await sut.StopAsync("b"); - - A.CallTo(() => grainA.StopAsync()) - .MustNotHaveHappened(); - - A.CallTo(() => grainB.StopAsync()) - .MustHaveHappened(); - } - - [Fact] - public async Task Should_reset_matching_grain() - { - await sut.ResetAsync("b"); - - A.CallTo(() => grainA.ResetAsync()) - .MustNotHaveHappened(); - - A.CallTo(() => grainB.ResetAsync()) - .MustHaveHappened(); - } - - [Fact] - public async Task Should_fetch_infos_from_all_grains() - { - A.CallTo(() => grainA.GetStateAsync()) - .Returns(new Immutable( - new EventConsumerInfo { Name = "A", Error = "A-Error", IsStopped = false, Position = "123" })); - - A.CallTo(() => grainB.GetStateAsync()) - .Returns(new Immutable( - new EventConsumerInfo { Name = "B", Error = "B-Error", IsStopped = false, Position = "456" })); - - var infos = await sut.GetConsumersAsync(); - - infos.Value.ShouldBeEquivalentTo( - new List - { - new EventConsumerInfo { Name = "A", Error = "A-Error", IsStopped = false, Position = "123" }, - new EventConsumerInfo { Name = "B", Error = "B-Error", IsStopped = false, Position = "456" } - }); - } - } -} diff --git a/tests/Squidex.Infrastructure.Tests/CQRS/Events/Grains/OrleansEventNotifierTests.cs b/tests/Squidex.Infrastructure.Tests/CQRS/Events/Grains/OrleansEventNotifierTests.cs deleted file mode 100644 index 325b97b05..000000000 --- a/tests/Squidex.Infrastructure.Tests/CQRS/Events/Grains/OrleansEventNotifierTests.cs +++ /dev/null @@ -1,41 +0,0 @@ -// ========================================================================== -// OrleansEventNotifierTests.cs -// Squidex Headless CMS -// ========================================================================== -// Copyright (c) Squidex Group -// All rights reserved. -// ========================================================================== - -using FakeItEasy; -using Orleans; -using Squidex.Infrastructure.CQRS.Events.Orleans; -using Squidex.Infrastructure.CQRS.Events.Orleans.Grains; -using Xunit; - -namespace Squidex.Infrastructure.CQRS.Events.Grains -{ - public class OrleansEventNotifierTests - { - private readonly IEventConsumerRegistryGrain registry = A.Fake(); - private readonly OrleansEventNotifier sut; - - public OrleansEventNotifierTests() - { - var factory = A.Fake(); - - A.CallTo(() => factory.GetGrain("Default", null)) - .Returns(registry); - - sut = new OrleansEventNotifier(factory); - } - - [Fact] - public void Should_activate_registry_with_stream_name() - { - sut.NotifyEventsStored("my-stream"); - - A.CallTo(() => registry.ActivateAsync("my-stream")) - .MustHaveHappened(); - } - } -} diff --git a/tests/Squidex.Infrastructure.Tests/CQRS/Events/RetrySubscriptionTests.cs b/tests/Squidex.Infrastructure.Tests/CQRS/Events/RetrySubscriptionTests.cs index 84eb646d8..f68d22ae1 100644 --- a/tests/Squidex.Infrastructure.Tests/CQRS/Events/RetrySubscriptionTests.cs +++ b/tests/Squidex.Infrastructure.Tests/CQRS/Events/RetrySubscriptionTests.cs @@ -112,26 +112,6 @@ namespace Squidex.Infrastructure.CQRS.Events .MustNotHaveHappened(); } - [Fact] - public async Task Should_forward_closed_from_inner_subscription() - { - await OnClosedAsync(eventSubscription); - await sut.StopAsync(); - - A.CallTo(() => eventSubscriber.OnClosedAsync(sut)) - .MustHaveHappened(); - } - - [Fact] - public async Task Should_not_forward_closed_when_message_is_from_another_subscription() - { - await OnClosedAsync(A.Fake()); - await sut.StopAsync(); - - A.CallTo(() => eventSubscriber.OnClosedAsync(sut)) - .MustNotHaveHappened(); - } - private Task OnErrorAsync(IEventSubscription subscriber, Exception ex) { return sutSubscriber.OnErrorAsync(subscriber, ex); @@ -141,10 +121,5 @@ namespace Squidex.Infrastructure.CQRS.Events { return sutSubscriber.OnEventAsync(subscriber, ev); } - - private Task OnClosedAsync(IEventSubscription subscriber) - { - return sutSubscriber.OnClosedAsync(subscriber); - } } } \ No newline at end of file diff --git a/tests/Squidex.Infrastructure.Tests/Caching/InvalidatingMemoryCacheTests.cs b/tests/Squidex.Infrastructure.Tests/Caching/InvalidatingMemoryCacheTests.cs index edeac56d4..181dad179 100644 --- a/tests/Squidex.Infrastructure.Tests/Caching/InvalidatingMemoryCacheTests.cs +++ b/tests/Squidex.Infrastructure.Tests/Caching/InvalidatingMemoryCacheTests.cs @@ -61,7 +61,7 @@ namespace Squidex.Infrastructure.Caching { sut.Invalidate(123); - A.CallTo(() => pubsub.Publish("CacheInvalidations", A.Ignored, true)).MustNotHaveHappened(); + A.CallTo(() => pubsub.Publish(A.That.Matches(x => x.CacheKey == "a-key"), true)).MustNotHaveHappened(); } [Fact] @@ -69,7 +69,7 @@ namespace Squidex.Infrastructure.Caching { sut.Invalidate("a-key"); - A.CallTo(() => pubsub.Publish("CacheInvalidations", "a-key", true)).MustHaveHappened(); + A.CallTo(() => pubsub.Publish(A.That.Matches(x => x.CacheKey == "a-key"), true)).MustHaveHappened(); } [Fact] @@ -77,7 +77,7 @@ namespace Squidex.Infrastructure.Caching { ((IMemoryCache)sut).Invalidate("a-key"); - A.CallTo(() => pubsub.Publish("CacheInvalidations", "a-key", true)).MustHaveHappened(); + A.CallTo(() => pubsub.Publish(A.That.Matches(x => x.CacheKey == "a-key"), true)).MustHaveHappened(); } [Fact] @@ -117,7 +117,7 @@ namespace Squidex.Infrastructure.Caching Assert.Equal(123, anotherSut.Get("a-key")); - anotherPubsub.Publish("CacheInvalidations", "a-key", true); + anotherPubsub.Publish(new InvalidationMessage { CacheKey = "a-key" }, true); Assert.Equal(0, anotherSut.Get("a-key")); } diff --git a/tests/Squidex.Infrastructure.Tests/InMemoryPubSubTests.cs b/tests/Squidex.Infrastructure.Tests/InMemoryPubSubTests.cs index 56ec23778..f17cce7bc 100644 --- a/tests/Squidex.Infrastructure.Tests/InMemoryPubSubTests.cs +++ b/tests/Squidex.Infrastructure.Tests/InMemoryPubSubTests.cs @@ -6,7 +6,9 @@ // All rights reserved. // ========================================================================== +using System; using System.Collections.Generic; +using System.Threading.Tasks; using Xunit; namespace Squidex.Infrastructure @@ -15,36 +17,78 @@ namespace Squidex.Infrastructure { private readonly InMemoryPubSub sut = new InMemoryPubSub(); + private sealed class MessageA + { + public string Text { get; set; } + } + + private sealed class MessageB + { + public string Text { get; set; } + } + [Fact] public void Should_publish_to_handlers() { var channel1Events = new List(); var channel2Events = new List(); - sut.Subscribe("channel1", x => + sut.Subscribe(m => { - channel1Events.Add(x); + channel1Events.Add(m.Text); }); - sut.Subscribe("channel1", x => + sut.Subscribe(m => { - channel1Events.Add(x); + channel1Events.Add(m.Text); }); - sut.Subscribe("channel2", x => + sut.Subscribe(m => { - channel2Events.Add(x); + channel2Events.Add(m.Text); }); - sut.Publish("channel1", "1", true); - sut.Publish("channel1", "2", true); - sut.Publish("channel1", "3", false); + sut.Publish(new MessageA { Text = "1" }, true); + sut.Publish(new MessageA { Text = "2" }, true); + sut.Publish(new MessageA { Text = "3" }, false); - sut.Publish("channel2", "a", true); - sut.Publish("channel2", "b", true); + sut.Publish(new MessageB { Text = "a" }, true); + sut.Publish(new MessageB { Text = "b" }, true); Assert.Equal(new[] { "1", "1", "2", "2" }, channel1Events.ToArray()); Assert.Equal(new[] { "a", "b" }, channel2Events.ToArray()); } + + [Fact] + public async Task Should_make_request_reply_requests() + { + sut.ReceiveAsync(x => + { + return Task.FromResult(x + x); + }, true); + + var response = await sut.RequestAsync(2, TimeSpan.FromSeconds(2), true); + + Assert.Equal(4, response); + } + + [Fact] + public async Task Should_timeout_when_response_is_too_slow() + { + sut.ReceiveAsync(async x => + { + await Task.Delay(1000); + + return x + x; + }, true); + + await Assert.ThrowsAsync(() => sut.RequestAsync(1, TimeSpan.FromSeconds(0.5), true)); + } + + [Fact] + public async Task Should_timeout_when_nobody_responds() + { + await Assert.ThrowsAsync(() => sut.RequestAsync(2, TimeSpan.FromSeconds(0.5), true)); + } } } diff --git a/tests/Squidex.Infrastructure.Tests/Json/Orleans/JsonExternalSerializerTests.cs b/tests/Squidex.Infrastructure.Tests/Json/Orleans/JsonExternalSerializerTests.cs deleted file mode 100644 index 13f0ad5cd..000000000 --- a/tests/Squidex.Infrastructure.Tests/Json/Orleans/JsonExternalSerializerTests.cs +++ /dev/null @@ -1,108 +0,0 @@ -// ========================================================================== -// JsonExternalSerializerTests.cs -// Squidex Headless CMS -// ========================================================================== -// Copyright (c) Squidex Group -// All rights reserved. -// ========================================================================== - -using System; -using System.Collections.Generic; -using FakeItEasy; -using Newtonsoft.Json; -using Orleans.Serialization; -using Xunit; - -namespace Squidex.Infrastructure.Json.Orleans -{ - public class JsonExternalSerializerTests - { - private readonly JsonExternalSerializer sut = new JsonExternalSerializer(JsonSerializer.CreateDefault()); - - [Fact] - public void Should_serialize_js_only() - { - Assert.True(sut.IsSupportedType(typeof(J))); - Assert.True(sut.IsSupportedType(typeof(J>))); - - Assert.False(sut.IsSupportedType(typeof(int))); - Assert.False(sut.IsSupportedType(typeof(List))); - } - - [Fact] - public void Should_copy_null() - { - var v = (string)null; - var c = DeepCopy(v); - - Assert.Null(c); - } - - [Fact] - public void Should_copy_null_json() - { - var v = new J>(null); - var c = DeepCopy(v); - - Assert.Null(c.Value); - } - - [Fact] - public void Should_not_copy_immutable_values() - { - var v = new List { 1, 2, 3 }.AsJ(true); - var c = DeepCopy(v); - - Assert.Same(v.Value, c.Value); - } - - [Fact] - public void Should_copy_non_immutable_values() - { - var value = new J>(new List { 1, 2, 3 }); - var copy = (J>)sut.DeepCopy(value, null); - - Assert.Equal(value.Value, copy.Value); - Assert.NotSame(value.Value, copy.Value); - } - - [Fact] - public void Should_serialize_and_deserialize_value() - { - var value = new J>(new List { 1, 2, 3 }); - - var writtenLength = 0; - var writtenBuffer = (byte[])null; - - var writer = A.Fake(); - var writerContext = new SerializationContext(null) { StreamWriter = writer }; - - A.CallTo(() => writer.Write(A.Ignored)) - .Invokes(new Action(x => writtenLength = x)); - - A.CallTo(() => writer.Write(A.Ignored)) - .Invokes(new Action(x => writtenBuffer = x)); - - sut.Serialize(value, writerContext, value.GetType()); - - var reader = A.Fake(); - var readerContext = new DeserializationContext(null) { StreamReader = reader }; - - A.CallTo(() => reader.ReadInt()) - .Returns(writtenLength); - - A.CallTo(() => reader.ReadBytes(writtenLength)) - .Returns(writtenBuffer); - - var copy = (J>)sut.Deserialize(value.GetType(), readerContext); - - Assert.Equal(value.Value, copy.Value); - Assert.NotSame(value.Value, copy.Value); - } - - private T DeepCopy(T value) - { - return (T)sut.DeepCopy(value, null); - } - } -}