diff --git a/src/Squidex.Domain.Apps.Core.Operations/HandleRules/Actions/FastlyActionHandler.cs b/src/Squidex.Domain.Apps.Core.Operations/HandleRules/Actions/FastlyActionHandler.cs index bc3e716b3..5a8a252bd 100644 --- a/src/Squidex.Domain.Apps.Core.Operations/HandleRules/Actions/FastlyActionHandler.cs +++ b/src/Squidex.Domain.Apps.Core.Operations/HandleRules/Actions/FastlyActionHandler.cs @@ -8,14 +8,11 @@ using System; using System.Collections.Generic; using System.Net.Http; -using System.Text; using System.Threading.Tasks; -using Newtonsoft.Json; using Newtonsoft.Json.Linq; using Squidex.Domain.Apps.Core.Rules; using Squidex.Domain.Apps.Core.Rules.Actions; using Squidex.Domain.Apps.Events; -using Squidex.Infrastructure; using Squidex.Infrastructure.EventSourcing; using Squidex.Infrastructure.Http; diff --git a/src/Squidex.Domain.Apps.Entities/Apps/AppCommandMiddleware.cs b/src/Squidex.Domain.Apps.Entities/Apps/AppCommandMiddleware.cs index 02bb242bd..9439961c4 100644 --- a/src/Squidex.Domain.Apps.Entities/Apps/AppCommandMiddleware.cs +++ b/src/Squidex.Domain.Apps.Entities/Apps/AppCommandMiddleware.cs @@ -6,7 +6,6 @@ // ========================================================================== using System; -using System.Collections.Generic; using System.Threading.Tasks; using Squidex.Domain.Apps.Entities.Apps.Commands; using Squidex.Domain.Apps.Entities.Apps.Guards; diff --git a/src/Squidex.Infrastructure.GetEventStore/EventSourcing/GetEventStoreSubscription.cs b/src/Squidex.Infrastructure.GetEventStore/EventSourcing/GetEventStoreSubscription.cs index 3fbffbe37..f5102f54a 100644 --- a/src/Squidex.Infrastructure.GetEventStore/EventSourcing/GetEventStoreSubscription.cs +++ b/src/Squidex.Infrastructure.GetEventStore/EventSourcing/GetEventStoreSubscription.cs @@ -46,6 +46,10 @@ namespace Squidex.Infrastructure.EventSourcing return TaskHelper.Done; } + public void WakeUp() + { + } + private EventStoreCatchUpSubscription SubscribeToStream(string streamName) { var settings = CatchUpSubscriptionSettings.Default; diff --git a/src/Squidex.Infrastructure.MongoDb/EventSourcing/MongoEventStore.cs b/src/Squidex.Infrastructure.MongoDb/EventSourcing/MongoEventStore.cs index 2b4c072a3..6bca83110 100644 --- a/src/Squidex.Infrastructure.MongoDb/EventSourcing/MongoEventStore.cs +++ b/src/Squidex.Infrastructure.MongoDb/EventSourcing/MongoEventStore.cs @@ -56,7 +56,7 @@ namespace Squidex.Infrastructure.EventSourcing Guard.NotNull(subscriber, nameof(subscriber)); Guard.NotNullOrEmpty(streamFilter, nameof(streamFilter)); - return new PollingSubscription(this, notifier, subscriber, streamFilter, position); + return new PollingSubscription(this, subscriber, streamFilter, position); } public async Task> GetEventsAsync(string streamName, long streamPosition = 0) diff --git a/src/Squidex.Infrastructure/EventSourcing/DefaultEventNotifier.cs b/src/Squidex.Infrastructure/EventSourcing/DefaultEventNotifier.cs deleted file mode 100644 index 1e94354e3..000000000 --- a/src/Squidex.Infrastructure/EventSourcing/DefaultEventNotifier.cs +++ /dev/null @@ -1,40 +0,0 @@ -// ========================================================================== -// Squidex Headless CMS -// ========================================================================== -// Copyright (c) Squidex UG (haftungsbeschränkt) -// All rights reserved. Licensed under the MIT license. -// ========================================================================== - -using System; - -namespace Squidex.Infrastructure.EventSourcing -{ - public sealed class DefaultEventNotifier : IEventNotifier - { - private static readonly string ChannelName = typeof(DefaultEventNotifier).Name; - - private readonly IPubSub pubsub; - - public sealed class EventNotification - { - public string StreamName { get; set; } - } - - public DefaultEventNotifier(IPubSub pubsub) - { - Guard.NotNull(pubsub, nameof(pubsub)); - - this.pubsub = pubsub; - } - - public void NotifyEventsStored(string streamName) - { - pubsub.Publish(new EventNotification { StreamName = streamName }, true); - } - - public IDisposable Subscribe(Action handler) - { - return pubsub.Subscribe(x => handler?.Invoke(x.StreamName)); - } - } -} diff --git a/src/Squidex.Infrastructure/EventSourcing/Grains/EventConsumerBootstrap.cs b/src/Squidex.Infrastructure/EventSourcing/Grains/EventConsumerBootstrap.cs new file mode 100644 index 000000000..40425f5e7 --- /dev/null +++ b/src/Squidex.Infrastructure/EventSourcing/Grains/EventConsumerBootstrap.cs @@ -0,0 +1,34 @@ +// ========================================================================== +// Squidex Headless CMS +// ========================================================================== +// Copyright (c) Squidex UG (haftungsbeschränkt) +// All rights reserved. Licensed under the MIT license. +// ========================================================================== + +using Orleans; +using Orleans.Runtime; + +namespace Squidex.Infrastructure.EventSourcing.Grains +{ + public sealed class EventConsumerBootstrap : ILifecycleParticipant + { + private readonly IGrainFactory grainFactory; + + public EventConsumerBootstrap(IGrainFactory grainFactory) + { + Guard.NotNull(grainFactory, nameof(grainFactory)); + + this.grainFactory = grainFactory; + } + + public void Participate(ISiloLifecycle lifecycle) + { + lifecycle.Subscribe(SiloLifecycleStage.SiloActive, ct => + { + var grain = grainFactory.GetGrain("Default"); + + return grain.ActivateAsync(); + }); + } + } +} diff --git a/src/Squidex.Infrastructure/EventSourcing/Grains/EventConsumerGrain.cs b/src/Squidex.Infrastructure/EventSourcing/Grains/EventConsumerGrain.cs index 04194ae7f..343462025 100644 --- a/src/Squidex.Infrastructure/EventSourcing/Grains/EventConsumerGrain.cs +++ b/src/Squidex.Infrastructure/EventSourcing/Grains/EventConsumerGrain.cs @@ -8,120 +8,104 @@ using System; using System.Runtime.CompilerServices; using System.Threading.Tasks; +using Orleans; +using Orleans.Concurrency; +using Orleans.Core; +using Orleans.Runtime; using Squidex.Infrastructure.Log; using Squidex.Infrastructure.States; using Squidex.Infrastructure.Tasks; namespace Squidex.Infrastructure.EventSourcing.Grains { - public class EventConsumerGrain : DisposableObjectBase, IStatefulObject, IEventSubscriber + public class EventConsumerGrain : Grain, IEventConsumerGrain { + private readonly EventConsumerFactory eventConsumerFactory; private readonly IEventDataFormatter eventDataFormatter; private readonly IEventStore eventStore; private readonly ISemanticLog log; - private readonly SingleThreadedDispatcher dispatcher = new SingleThreadedDispatcher(1); + private readonly IPersistence persistence; + private TaskScheduler scheduler; private IEventSubscription currentSubscription; private IEventConsumer eventConsumer; - private IPersistence persistence; private EventConsumerState state = new EventConsumerState(); public EventConsumerGrain( + EventConsumerFactory eventConsumerFactory, + IStore store, IEventStore eventStore, IEventDataFormatter eventDataFormatter, ISemanticLog log) + : this (eventConsumerFactory, store, eventStore, eventDataFormatter, null, null, log) + { + } + + protected EventConsumerGrain( + EventConsumerFactory eventConsumerFactory, + IStore store, + IEventStore eventStore, + IEventDataFormatter eventDataFormatter, + IGrainIdentity identity, + IGrainRuntime runtime, + ISemanticLog log) { Guard.NotNull(log, nameof(log)); + Guard.NotNull(store, nameof(store)); Guard.NotNull(eventStore, nameof(eventStore)); Guard.NotNull(eventDataFormatter, nameof(eventDataFormatter)); + Guard.NotNull(eventConsumerFactory, nameof(eventConsumerFactory)); this.log = log; this.eventStore = eventStore; this.eventDataFormatter = eventDataFormatter; - } + this.eventConsumerFactory = eventConsumerFactory; - protected override void DisposeObject(bool disposing) - { - if (disposing) - { - dispatcher.StopAndWaitAsync().Wait(); - } + persistence = store.WithSnapshots(this.GetPrimaryKeyString(), s => state = s); } - public Task ActivateAsync(string key, IStore store) + public override Task OnActivateAsync() { - persistence = store.WithSnapshots(key, s => state = s); + scheduler = TaskScheduler.Current; + + eventConsumer = eventConsumerFactory(this.GetPrimaryKeyString()); return persistence.ReadAsync(); } protected virtual IEventSubscription CreateSubscription(IEventStore eventStore, string streamFilter, string position) { - return new RetrySubscription(eventStore, this, streamFilter, position); - } - - public virtual EventConsumerInfo GetState() - { - return state.ToInfo(this.eventConsumer.Name); - } - - public virtual void Stop() - { - dispatcher.DispatchAsync(HandleStopAsync).Forget(); - } - - public virtual void Start() - { - dispatcher.DispatchAsync(HandleStartAsync).Forget(); - } - - public virtual void Reset() - { - dispatcher.DispatchAsync(HandleResetAsync).Forget(); - } - - public virtual void Activate(IEventConsumer eventConsumer) - { - Guard.NotNull(eventConsumer, nameof(eventConsumer)); - - dispatcher.DispatchAsync(() => HandleSetupAsync(eventConsumer)).Forget(); + return new RetrySubscription(eventStore, new WrapperSubscription(this.AsReference(), scheduler), streamFilter, position); } - private Task HandleSetupAsync(IEventConsumer consumer) + public virtual Task> GetStateAsync() { - eventConsumer = consumer; - - if (!state.IsStopped) - { - Subscribe(state.Position); - } - - return TaskHelper.Done; + return Task.FromResult(state.ToInfo(this.eventConsumer.Name).AsImmutable()); } - private Task HandleEventAsync(IEventSubscription subscription, StoredEvent storedEvent) + public Task OnEventAsync(Immutable subscription, Immutable storedEvent) { - if (subscription != currentSubscription) + if (subscription.Value != currentSubscription) { return TaskHelper.Done; } return DoAndUpdateStateAsync(async () => { - var @event = ParseKnownEvent(storedEvent); + var @event = ParseKnownEvent(storedEvent.Value); if (@event != null) { await DispatchConsumerAsync(@event); } - state = state.Handled(storedEvent.EventPosition); + state = state.Handled(storedEvent.Value.EventPosition); }); } - private Task HandleErrorAsync(IEventSubscription subscription, Exception exception) + public Task OnErrorAsync(Immutable subscription, Immutable exception) { - if (subscription != currentSubscription) + if (subscription.Value != currentSubscription) { return TaskHelper.Done; } @@ -130,11 +114,28 @@ namespace Squidex.Infrastructure.EventSourcing.Grains { Unsubscribe(); - state = state.Failed(exception); + state = state.Failed(exception.Value); }); } - private Task HandleStartAsync() + public Task WakeUpAsync() + { + currentSubscription?.WakeUp(); + + return TaskHelper.Done; + } + + public Task ActivateAsync() + { + if (!state.IsStopped) + { + Subscribe(state.Position); + } + + return TaskHelper.Done; + } + + public Task StartAsync() { if (!state.IsStopped) { @@ -149,7 +150,7 @@ namespace Squidex.Infrastructure.EventSourcing.Grains }); } - private Task HandleStopAsync() + public Task StopAsync() { if (state.IsStopped) { @@ -164,7 +165,7 @@ namespace Squidex.Infrastructure.EventSourcing.Grains }); } - private Task HandleResetAsync() + public Task ResetAsync() { return DoAndUpdateStateAsync(async () => { @@ -178,16 +179,6 @@ namespace Squidex.Infrastructure.EventSourcing.Grains }); } - Task IEventSubscriber.OnEventAsync(IEventSubscription subscription, StoredEvent storedEvent) - { - return dispatcher.DispatchAsync(() => HandleEventAsync(subscription, storedEvent)); - } - - Task IEventSubscriber.OnErrorAsync(IEventSubscription subscription, Exception exception) - { - return dispatcher.DispatchAsync(() => HandleErrorAsync(subscription, exception)); - } - private Task DoAndUpdateStateAsync(Action action, [CallerMemberName] string caller = null) { return DoAndUpdateStateAsync(() => { action(); return TaskHelper.Done; }, caller); diff --git a/src/Squidex.Infrastructure/EventSourcing/Grains/EventConsumerGrainManager.cs b/src/Squidex.Infrastructure/EventSourcing/Grains/EventConsumerGrainManager.cs deleted file mode 100644 index e833aa5b4..000000000 --- a/src/Squidex.Infrastructure/EventSourcing/Grains/EventConsumerGrainManager.cs +++ /dev/null @@ -1,90 +0,0 @@ -// ========================================================================== -// Squidex Headless CMS -// ========================================================================== -// Copyright (c) Squidex UG (haftungsbeschränkt) -// All rights reserved. Licensed under the MIT license. -// ========================================================================== - -using System; -using System.Collections.Generic; -using System.Linq; -using System.Threading.Tasks; -using Squidex.Infrastructure.EventSourcing.Grains.Messages; -using Squidex.Infrastructure.States; - -namespace Squidex.Infrastructure.EventSourcing.Grains -{ - public sealed class EventConsumerGrainManager : DisposableObjectBase, IRunnable - { - private readonly IStateFactory factory; - private readonly IPubSub pubSub; - private readonly List consumers; - private readonly List subscriptions = new List(); - - public EventConsumerGrainManager(IEnumerable consumers, IPubSub pubSub, IStateFactory factory) - { - Guard.NotNull(pubSub, nameof(pubSub)); - Guard.NotNull(factory, nameof(factory)); - Guard.NotNull(consumers, nameof(consumers)); - - this.pubSub = pubSub; - this.factory = factory; - this.consumers = consumers.ToList(); - } - - public void Run() - { - var actors = new Dictionary(); - - foreach (var consumer in consumers) - { - var actor = factory.CreateAsync(consumer.Name).Result; - - actors[consumer.Name] = actor; - actor.Activate(consumer); - } - - subscriptions.Add(pubSub.Subscribe(m => - { - if (actors.TryGetValue(m.ConsumerName, out var actor)) - { - actor.Start(); - } - })); - - subscriptions.Add(pubSub.Subscribe(m => - { - if (actors.TryGetValue(m.ConsumerName, out var actor)) - { - actor.Stop(); - } - })); - - subscriptions.Add(pubSub.Subscribe(m => - { - if (actors.TryGetValue(m.ConsumerName, out var actor)) - { - actor.Reset(); - } - })); - - subscriptions.Add(pubSub.ReceiveAsync(request => - { - var states = actors.Values.Select(x => x.GetState()).ToArray(); - - return Task.FromResult(new GetStatesResponse { States = states }); - })); - } - - protected override void DisposeObject(bool disposing) - { - if (disposing) - { - foreach (var subscription in subscriptions) - { - subscription.Dispose(); - } - } - } - } -} diff --git a/src/Squidex.Infrastructure/EventSourcing/Grains/EventConsumerManagerGrain.cs b/src/Squidex.Infrastructure/EventSourcing/Grains/EventConsumerManagerGrain.cs new file mode 100644 index 000000000..a18f79078 --- /dev/null +++ b/src/Squidex.Infrastructure/EventSourcing/Grains/EventConsumerManagerGrain.cs @@ -0,0 +1,107 @@ +// ========================================================================== +// Squidex Headless CMS +// ========================================================================== +// Copyright (c) Squidex UG (haftungsbeschränkt) +// All rights reserved. Licensed under the MIT license. +// ========================================================================== + +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text.RegularExpressions; +using System.Threading.Tasks; +using Orleans; +using Orleans.Concurrency; +using Orleans.Core; +using Orleans.Runtime; + +namespace Squidex.Infrastructure.EventSourcing.Grains +{ + public class EventConsumerManagerGrain : Grain, IEventConsumerManagerGrain + { + private readonly IEnumerable eventConsumers; + + public EventConsumerManagerGrain(IEnumerable eventConsumers) + : this(eventConsumers, null, null) + { + } + + protected EventConsumerManagerGrain( + IEnumerable eventConsumers, + IGrainIdentity identity, + IGrainRuntime runtime) + : base(identity, runtime) + { + Guard.NotNull(eventConsumers, nameof(eventConsumers)); + + this.eventConsumers = eventConsumers; + } + + public Task ReceiveReminder(string reminderName, TickStatus status) + { + return ActivateAsync(); + } + + public override Task OnActivateAsync() + { + DelayDeactivation(TimeSpan.FromDays(1)); + + RegisterOrUpdateReminder("Default", TimeSpan.Zero, TimeSpan.FromMinutes(10)); + RegisterTimer(x => ActivateAsync(), null, TimeSpan.Zero, TimeSpan.FromSeconds(10)); + + return Task.FromResult(true); + } + + public Task ActivateAsync() + { + var tasks = + eventConsumers + .Select(c => GrainFactory.GetGrain(c.Name)) + .Select(c => c.ActivateAsync()); + + return Task.WhenAll(tasks); + } + + public Task WakeUpAsync(string streamName) + { + var tasks = + eventConsumers + .Where(c => streamName == null || Regex.IsMatch(streamName, c.EventsFilter)) + .Select(c => GrainFactory.GetGrain(c.Name)) + .Select(c => c.WakeUpAsync()); + + return Task.WhenAll(tasks); + } + + public Task>> GetConsumersAsync() + { + var tasks = + eventConsumers + .Select(c => GrainFactory.GetGrain(c.Name)) + .Select(c => c.GetStateAsync()); + + return Task.WhenAll(tasks).ContinueWith(x => new Immutable>(x.Result.Select(r => r.Value).ToList())); + } + + public Task ResetAsync(string consumerName) + { + var eventConsumer = GrainFactory.GetGrain(consumerName); + + return eventConsumer.ResetAsync(); + } + + public Task StartAsync(string consumerName) + { + var eventConsumer = GrainFactory.GetGrain(consumerName); + + return eventConsumer.StartAsync(); + } + + public Task StopAsync(string consumerName) + { + var eventConsumer = GrainFactory.GetGrain(consumerName); + + return eventConsumer.StopAsync(); + } + } +} diff --git a/src/Squidex.Infrastructure/EventSourcing/Grains/IEventConsumerGrain.cs b/src/Squidex.Infrastructure/EventSourcing/Grains/IEventConsumerGrain.cs new file mode 100644 index 000000000..fdb330b61 --- /dev/null +++ b/src/Squidex.Infrastructure/EventSourcing/Grains/IEventConsumerGrain.cs @@ -0,0 +1,33 @@ +// ========================================================================== +// Squidex Headless CMS +// ========================================================================== +// Copyright (c) Squidex UG (haftungsbeschränkt) +// All rights reserved. Licensed under the MIT license. +// ========================================================================== + +using System; +using System.Threading.Tasks; +using Orleans; +using Orleans.Concurrency; + +namespace Squidex.Infrastructure.EventSourcing.Grains +{ + public interface IEventConsumerGrain : IGrainWithStringKey + { + Task> GetStateAsync(); + + Task ActivateAsync(); + + Task StopAsync(); + + Task StartAsync(); + + Task ResetAsync(); + + Task WakeUpAsync(); + + Task OnEventAsync(Immutable subscription, Immutable storedEvent); + + Task OnErrorAsync(Immutable subscription, Immutable exception); + } +} diff --git a/src/Squidex.Infrastructure/EventSourcing/Grains/IEventConsumerManagerGrain.cs b/src/Squidex.Infrastructure/EventSourcing/Grains/IEventConsumerManagerGrain.cs new file mode 100644 index 000000000..549953924 --- /dev/null +++ b/src/Squidex.Infrastructure/EventSourcing/Grains/IEventConsumerManagerGrain.cs @@ -0,0 +1,29 @@ +// ========================================================================== +// Squidex Headless CMS +// ========================================================================== +// Copyright (c) Squidex UG (haftungsbeschränkt) +// All rights reserved. Licensed under the MIT license. +// ========================================================================== + +using System.Collections.Generic; +using System.Threading.Tasks; +using Orleans; +using Orleans.Concurrency; + +namespace Squidex.Infrastructure.EventSourcing.Grains +{ + public interface IEventConsumerManagerGrain : IGrainWithStringKey + { + Task ActivateAsync(); + + Task WakeUpAsync(string streamName); + + Task StopAsync(string consumerName); + + Task StartAsync(string consumerName); + + Task ResetAsync(string consumerName); + + Task>> GetConsumersAsync(); + } +} diff --git a/src/Squidex.Infrastructure/EventSourcing/Grains/Messages/GetStatesRequest.cs b/src/Squidex.Infrastructure/EventSourcing/Grains/Messages/GetStatesRequest.cs deleted file mode 100644 index d193d7ebb..000000000 --- a/src/Squidex.Infrastructure/EventSourcing/Grains/Messages/GetStatesRequest.cs +++ /dev/null @@ -1,13 +0,0 @@ -// ========================================================================== -// Squidex Headless CMS -// ========================================================================== -// Copyright (c) Squidex UG (haftungsbeschränkt) -// All rights reserved. Licensed under the MIT license. -// ========================================================================== - -namespace Squidex.Infrastructure.EventSourcing.Grains.Messages -{ - public sealed class GetStatesRequest - { - } -} diff --git a/src/Squidex.Infrastructure/EventSourcing/Grains/Messages/GetStatesResponse.cs b/src/Squidex.Infrastructure/EventSourcing/Grains/Messages/GetStatesResponse.cs deleted file mode 100644 index 922116d82..000000000 --- a/src/Squidex.Infrastructure/EventSourcing/Grains/Messages/GetStatesResponse.cs +++ /dev/null @@ -1,14 +0,0 @@ -// ========================================================================== -// Squidex Headless CMS -// ========================================================================== -// Copyright (c) Squidex UG (haftungsbeschränkt) -// All rights reserved. Licensed under the MIT license. -// ========================================================================== - -namespace Squidex.Infrastructure.EventSourcing.Grains.Messages -{ - public sealed class GetStatesResponse - { - public EventConsumerInfo[] States { get; set; } - } -} diff --git a/src/Squidex.Infrastructure/EventSourcing/Grains/Messages/ResetConsumerMessage.cs b/src/Squidex.Infrastructure/EventSourcing/Grains/Messages/ResetConsumerMessage.cs deleted file mode 100644 index 012cfd2e1..000000000 --- a/src/Squidex.Infrastructure/EventSourcing/Grains/Messages/ResetConsumerMessage.cs +++ /dev/null @@ -1,14 +0,0 @@ -// ========================================================================== -// Squidex Headless CMS -// ========================================================================== -// Copyright (c) Squidex UG (haftungsbeschränkt) -// All rights reserved. Licensed under the MIT license. -// ========================================================================== - -namespace Squidex.Infrastructure.EventSourcing.Grains.Messages -{ - public sealed class ResetConsumerMessage - { - public string ConsumerName { get; set; } - } -} diff --git a/src/Squidex.Infrastructure/EventSourcing/Grains/Messages/StartConsumerMessage.cs b/src/Squidex.Infrastructure/EventSourcing/Grains/Messages/StartConsumerMessage.cs deleted file mode 100644 index 8d8378653..000000000 --- a/src/Squidex.Infrastructure/EventSourcing/Grains/Messages/StartConsumerMessage.cs +++ /dev/null @@ -1,14 +0,0 @@ -// ========================================================================== -// Squidex Headless CMS -// ========================================================================== -// Copyright (c) Squidex UG (haftungsbeschränkt) -// All rights reserved. Licensed under the MIT license. -// ========================================================================== - -namespace Squidex.Infrastructure.EventSourcing.Grains.Messages -{ - public sealed class StartConsumerMessage - { - public string ConsumerName { get; set; } - } -} diff --git a/src/Squidex.Infrastructure/EventSourcing/Grains/Messages/StopConsumerMessage.cs b/src/Squidex.Infrastructure/EventSourcing/Grains/Messages/StopConsumerMessage.cs deleted file mode 100644 index 5a354a468..000000000 --- a/src/Squidex.Infrastructure/EventSourcing/Grains/Messages/StopConsumerMessage.cs +++ /dev/null @@ -1,14 +0,0 @@ -// ========================================================================== -// Squidex Headless CMS -// ========================================================================== -// Copyright (c) Squidex UG (haftungsbeschränkt) -// All rights reserved. Licensed under the MIT license. -// ========================================================================== - -namespace Squidex.Infrastructure.EventSourcing.Grains.Messages -{ - public sealed class StopConsumerMessage - { - public string ConsumerName { get; set; } - } -} diff --git a/src/Squidex.Infrastructure/EventSourcing/Grains/OrleansEventNotifier.cs b/src/Squidex.Infrastructure/EventSourcing/Grains/OrleansEventNotifier.cs new file mode 100644 index 000000000..4ce33f916 --- /dev/null +++ b/src/Squidex.Infrastructure/EventSourcing/Grains/OrleansEventNotifier.cs @@ -0,0 +1,34 @@ +// ========================================================================== +// Squidex Headless CMS +// ========================================================================== +// Copyright (c) Squidex UG (haftungsbeschränkt) +// All rights reserved. Licensed under the MIT license. +// ========================================================================== + +using System; +using Orleans; + +namespace Squidex.Infrastructure.EventSourcing.Grains +{ + public sealed class OrleansEventNotifier : IEventNotifier + { + private readonly IEventConsumerManagerGrain eventConsumerManagerGrain; + + public OrleansEventNotifier(IGrainFactory factory) + { + Guard.NotNull(factory, nameof(factory)); + + eventConsumerManagerGrain = factory.GetGrain("Default"); + } + + public void NotifyEventsStored(string streamName) + { + eventConsumerManagerGrain.WakeUpAsync(streamName); + } + + public IDisposable Subscribe(Action handler) + { + return null; + } + } +} diff --git a/src/Squidex.Infrastructure/EventSourcing/Grains/WrapperSubscription.cs b/src/Squidex.Infrastructure/EventSourcing/Grains/WrapperSubscription.cs new file mode 100644 index 000000000..012179f1f --- /dev/null +++ b/src/Squidex.Infrastructure/EventSourcing/Grains/WrapperSubscription.cs @@ -0,0 +1,42 @@ +// ========================================================================== +// Squidex Headless CMS +// ========================================================================== +// Copyright (c) Squidex UG (haftungsbeschränkt) +// All rights reserved. Licensed under the MIT license. +// ========================================================================== + +using System; +using System.Threading; +using System.Threading.Tasks; +using Orleans.Concurrency; + +namespace Squidex.Infrastructure.EventSourcing.Grains +{ + internal sealed class WrapperSubscription : IEventSubscriber + { + private readonly IEventConsumerGrain grain; + private readonly TaskScheduler scheduler; + + public WrapperSubscription(IEventConsumerGrain grain, TaskScheduler scheduler) + { + this.grain = grain; + + this.scheduler = scheduler ?? TaskScheduler.Default; + } + + public Task OnEventAsync(IEventSubscription subscription, StoredEvent storedEvent) + { + return Dispatch(() => grain.OnEventAsync(subscription.AsImmutable(), storedEvent.AsImmutable())); + } + + public Task OnErrorAsync(IEventSubscription subscription, Exception exception) + { + return Dispatch(() => grain.OnErrorAsync(subscription.AsImmutable(), exception.AsImmutable())); + } + + private Task Dispatch(Func task) + { + return Task.Factory.StartNew(() => task(), CancellationToken.None, TaskCreationOptions.None, scheduler).Unwrap(); + } + } +} diff --git a/src/Squidex.Infrastructure/EventSourcing/IEventSubscription.cs b/src/Squidex.Infrastructure/EventSourcing/IEventSubscription.cs index a33b1f22b..48ead1da9 100644 --- a/src/Squidex.Infrastructure/EventSourcing/IEventSubscription.cs +++ b/src/Squidex.Infrastructure/EventSourcing/IEventSubscription.cs @@ -11,6 +11,8 @@ namespace Squidex.Infrastructure.EventSourcing { public interface IEventSubscription { + void WakeUp(); + Task StopAsync(); } } \ No newline at end of file diff --git a/src/Squidex.Infrastructure/EventSourcing/PollingSubscription.cs b/src/Squidex.Infrastructure/EventSourcing/PollingSubscription.cs index dd5fc072b..cfdb4486d 100644 --- a/src/Squidex.Infrastructure/EventSourcing/PollingSubscription.cs +++ b/src/Squidex.Infrastructure/EventSourcing/PollingSubscription.cs @@ -14,10 +14,8 @@ namespace Squidex.Infrastructure.EventSourcing { public sealed class PollingSubscription : IEventSubscription { - private readonly IEventNotifier eventNotifier; private readonly IEventStore eventStore; private readonly IEventSubscriber eventSubscriber; - private readonly IDisposable notification; private readonly CompletionTimer timer; private readonly Regex streamRegex; private readonly string streamFilter; @@ -25,17 +23,14 @@ namespace Squidex.Infrastructure.EventSourcing public PollingSubscription( IEventStore eventStore, - IEventNotifier eventNotifier, IEventSubscriber eventSubscriber, string streamFilter, string position) { Guard.NotNull(eventStore, nameof(eventStore)); - Guard.NotNull(eventNotifier, nameof(eventNotifier)); Guard.NotNull(eventSubscriber, nameof(eventSubscriber)); this.position = position; - this.eventNotifier = eventNotifier; this.eventStore = eventStore; this.eventSubscriber = eventSubscriber; this.streamFilter = streamFilter; @@ -61,20 +56,15 @@ namespace Squidex.Infrastructure.EventSourcing } } }); + } - notification = eventNotifier.Subscribe(streamName => - { - if (streamRegex.IsMatch(streamName)) - { - timer.SkipCurrentDelay(); - } - }); + public void WakeUp() + { + timer.SkipCurrentDelay(); } public Task StopAsync() { - notification?.Dispose(); - return timer.StopAsync(); } } diff --git a/src/Squidex.Infrastructure/EventSourcing/RetrySubscription.cs b/src/Squidex.Infrastructure/EventSourcing/RetrySubscription.cs index d023eead5..60a9f5679 100644 --- a/src/Squidex.Infrastructure/EventSourcing/RetrySubscription.cs +++ b/src/Squidex.Infrastructure/EventSourcing/RetrySubscription.cs @@ -57,6 +57,11 @@ namespace Squidex.Infrastructure.EventSourcing currentSubscription = null; } + public void WakeUp() + { + currentSubscription?.WakeUp(); + } + private async Task HandleEventAsync(IEventSubscription subscription, StoredEvent storedEvent) { if (subscription == currentSubscription) diff --git a/src/Squidex.Infrastructure/Squidex.Infrastructure.csproj b/src/Squidex.Infrastructure/Squidex.Infrastructure.csproj index ba723caf1..b7c609898 100644 --- a/src/Squidex.Infrastructure/Squidex.Infrastructure.csproj +++ b/src/Squidex.Infrastructure/Squidex.Infrastructure.csproj @@ -10,6 +10,9 @@ + + + diff --git a/src/Squidex.Infrastructure/TypeNameRegistry.cs b/src/Squidex.Infrastructure/TypeNameRegistry.cs index c000e29eb..233f5ee28 100644 --- a/src/Squidex.Infrastructure/TypeNameRegistry.cs +++ b/src/Squidex.Infrastructure/TypeNameRegistry.cs @@ -23,19 +23,14 @@ namespace Squidex.Infrastructure lock (namesByType) { - try + if (typesByName.TryGetValue(name, out var existingType) && existingType != type) { - typesByName.Add(name, type); - } - catch (ArgumentException) - { - if (typesByName[name] != type) - { - var message = $"The name '{name}' is already registered with type '{typesByName[name]}'"; + var message = $"The name '{name}' is already registered with type '{typesByName[name]}'"; - throw new ArgumentException(message, nameof(type)); - } + throw new ArgumentException(message, nameof(type)); } + + typesByName[name] = type; } return this; @@ -62,33 +57,23 @@ namespace Squidex.Infrastructure lock (namesByType) { - try - { - namesByType.Add(type, name); - } - catch (ArgumentException) + if (namesByType.TryGetValue(type, out var existingName) && existingName != name) { - if (namesByType[type] != name) - { - var message = $"The type '{type}' is already registered with name '{namesByType[type]}'"; + var message = $"The type '{type}' is already registered with name '{namesByType[type]}'"; - throw new ArgumentException(message, nameof(type)); - } + throw new ArgumentException(message, nameof(type)); } - try - { - typesByName.Add(name, type); - } - catch (ArgumentException) + namesByType[type] = name; + + if (typesByName.TryGetValue(name, out var existingType) && existingType != type) { - if (typesByName[name] != type) - { - var message = $"The name '{name}' is already registered with type '{typesByName[name]}'"; + var message = $"The name '{name}' is already registered with type '{typesByName[name]}'"; - throw new ArgumentException(message, nameof(type)); - } + throw new ArgumentException(message, nameof(type)); } + + typesByName[name] = type; } return this; diff --git a/src/Squidex/Areas/Api/Controllers/EventConsumers/EventConsumersController.cs b/src/Squidex/Areas/Api/Controllers/EventConsumers/EventConsumersController.cs index 41e232307..7e961c6ce 100644 --- a/src/Squidex/Areas/Api/Controllers/EventConsumers/EventConsumersController.cs +++ b/src/Squidex/Areas/Api/Controllers/EventConsumers/EventConsumersController.cs @@ -5,15 +5,14 @@ // All rights reserved. Licensed under the MIT license. // ========================================================================== -using System; using System.Linq; using System.Threading.Tasks; using Microsoft.AspNetCore.Mvc; using NSwag.Annotations; +using Orleans; using Squidex.Areas.Api.Controllers.EventConsumers.Models; -using Squidex.Infrastructure; using Squidex.Infrastructure.Commands; -using Squidex.Infrastructure.EventSourcing.Grains.Messages; +using Squidex.Infrastructure.EventSourcing.Grains; using Squidex.Infrastructure.Reflection; using Squidex.Pipeline; @@ -25,12 +24,12 @@ namespace Squidex.Areas.Api.Controllers.EventConsumers [SwaggerIgnore] public sealed class EventConsumersController : ApiController { - private readonly IPubSub pubSub; + private readonly IEventConsumerManagerGrain eventConsumerManagerGrain; - public EventConsumersController(ICommandBus commandBus, IPubSub pubSub) + public EventConsumersController(ICommandBus commandBus, IClusterClient orleans) : base(commandBus) { - this.pubSub = pubSub; + eventConsumerManagerGrain = orleans.GetGrain("Default"); } [HttpGet] @@ -38,9 +37,9 @@ namespace Squidex.Areas.Api.Controllers.EventConsumers [ApiCosts(0)] public async Task GetEventConsumers() { - var entities = await pubSub.RequestAsync(new GetStatesRequest(), TimeSpan.FromSeconds(2), true); + var entities = await eventConsumerManagerGrain.GetConsumersAsync(); - var models = entities.States.Select(x => SimpleMapper.Map(x, new EventConsumerDto())).ToList(); + var models = entities.Value.Select(x => SimpleMapper.Map(x, new EventConsumerDto())).ToList(); return Ok(models); } @@ -48,9 +47,9 @@ namespace Squidex.Areas.Api.Controllers.EventConsumers [HttpPut] [Route("event-consumers/{name}/start/")] [ApiCosts(0)] - public IActionResult Start(string name) + public async Task Start(string name) { - pubSub.Publish(new StartConsumerMessage { ConsumerName = name }, true); + await eventConsumerManagerGrain.StartAsync(name); return NoContent(); } @@ -58,9 +57,9 @@ namespace Squidex.Areas.Api.Controllers.EventConsumers [HttpPut] [Route("event-consumers/{name}/stop/")] [ApiCosts(0)] - public IActionResult Stop(string name) + public async Task Stop(string name) { - pubSub.Publish(new StopConsumerMessage { ConsumerName = name }, true); + await eventConsumerManagerGrain.StopAsync(name); return NoContent(); } @@ -68,9 +67,9 @@ namespace Squidex.Areas.Api.Controllers.EventConsumers [HttpPut] [Route("event-consumers/{name}/reset/")] [ApiCosts(0)] - public IActionResult Reset(string name) + public async Task Reset(string name) { - pubSub.Publish(new ResetConsumerMessage { ConsumerName = name }, true); + await eventConsumerManagerGrain.ResetAsync(name); return NoContent(); } diff --git a/src/Squidex/Config/Authentication/MicrosoftHandler.cs b/src/Squidex/Config/Authentication/MicrosoftHandler.cs index c308814af..168995ad9 100644 --- a/src/Squidex/Config/Authentication/MicrosoftHandler.cs +++ b/src/Squidex/Config/Authentication/MicrosoftHandler.cs @@ -5,7 +5,6 @@ // All rights reserved. Licensed under the MIT license. // ========================================================================== -using System.Security.Claims; using System.Threading.Tasks; using Microsoft.AspNetCore.Authentication.OAuth; using Squidex.Shared.Identity; diff --git a/src/Squidex/Config/Domain/ReadServices.cs b/src/Squidex/Config/Domain/ReadServices.cs index cd3961628..97dfe5c6e 100644 --- a/src/Squidex/Config/Domain/ReadServices.cs +++ b/src/Squidex/Config/Domain/ReadServices.cs @@ -40,10 +40,6 @@ namespace Squidex.Config.Domain if (consumeEvents) { - services.AddTransient(); - - services.AddSingletonAs() - .As(); services.AddSingletonAs() .As(); } @@ -113,7 +109,7 @@ namespace Squidex.Config.Domain services.AddSingletonAs() .As(); - services.AddSingletonAs() + services.AddSingletonAs() .As(); services.AddSingletonAs() diff --git a/src/Squidex/Config/Orleans/ClientServices.cs b/src/Squidex/Config/Orleans/ClientServices.cs new file mode 100644 index 000000000..60dd40a81 --- /dev/null +++ b/src/Squidex/Config/Orleans/ClientServices.cs @@ -0,0 +1,41 @@ +// ========================================================================== +// Squidex Headless CMS +// ========================================================================== +// Copyright (c) Squidex UG (haftungsbeschränkt) +// All rights reserved. Licensed under the MIT license. +// ========================================================================== + +using System; +using Microsoft.Extensions.DependencyInjection; +using Orleans; +using Squidex.Infrastructure.EventSourcing.Grains; + +namespace Squidex.Config.Orleans +{ + public static class ClientServices + { + public static void AddAppClient(this IServiceCollection services) + { + services.AddSingletonAs(c => c.GetRequiredService()) + .As(); + + services.AddSingletonAs(c => + { + var client = new ClientBuilder() + .ConfigureApplicationParts(builder => + { + builder.AddApplicationPart(typeof(EventConsumerGrain).Assembly); + }) + .UseStaticGatewayListProvider(options => + { + options.Gateways.Add(new Uri("gwy.tcp://127.0.0.1:40000/0")); + }) + .Build(); + + client.Connect().Wait(); + + return client; + }); + } + } +} diff --git a/src/Squidex/Config/Orleans/SiloExtensions.cs b/src/Squidex/Config/Orleans/SiloExtensions.cs new file mode 100644 index 000000000..430220d07 --- /dev/null +++ b/src/Squidex/Config/Orleans/SiloExtensions.cs @@ -0,0 +1,34 @@ +// ========================================================================== +// Squidex Headless CMS +// ========================================================================== +// Copyright (c) Squidex UG (haftungsbeschränkt) +// All rights reserved. Licensed under the MIT license. +// ========================================================================== + +using Microsoft.Extensions.Configuration; +using Orleans; +using Orleans.Hosting; +using Orleans.Runtime.Configuration; + +namespace Squidex.Config.Orleans +{ + public static class SiloExtensions + { + public static ISiloHostBuilder UseContentRoot(this ISiloHostBuilder builder, string path) + { + builder.ConfigureAppConfiguration(config => + { + config.SetBasePath(path); + }); + + return builder; + } + + public static ClusterConfiguration WithDashboard(this ClusterConfiguration config) + { + config.RegisterDashboard(); + + return config; + } + } +} diff --git a/src/Squidex/Config/Orleans/SiloServices.cs b/src/Squidex/Config/Orleans/SiloServices.cs new file mode 100644 index 000000000..577246eff --- /dev/null +++ b/src/Squidex/Config/Orleans/SiloServices.cs @@ -0,0 +1,57 @@ +// ========================================================================== +// Squidex Headless CMS +// ========================================================================== +// Copyright (c) Squidex UG (haftungsbeschränkt) +// All rights reserved. Licensed under the MIT license. +// ========================================================================== + +using System; +using System.Linq; +using System.Net; +using Microsoft.Extensions.Configuration; +using Microsoft.Extensions.DependencyInjection; +using Orleans; +using Orleans.Runtime; +using Orleans.Runtime.Configuration; +using Squidex.Infrastructure.EventSourcing.Grains; + +namespace Squidex.Config.Orleans +{ + public static class SiloServices + { + public static void AddAppSiloServices(this IServiceCollection services, IConfiguration config) + { + services.AddSingletonAs() + .As>(); + + /* + var clusterConfiguration = + services.Where(x => x.ServiceType == typeof(ClusterConfiguration)) + .Select(x => x.ImplementationInstance) + .Select(x => (ClusterConfiguration)x) + .FirstOrDefault(); + + if (clusterConfiguration != null) + { + clusterConfiguration.Globals.RegisterBootstrapProvider("EventConsumers"); + + var ipConfig = config.GetRequiredValue("orleans:hostNameOrIPAddress"); + + if (ipConfig.Equals("Host", StringComparison.OrdinalIgnoreCase)) + { + ipConfig = Dns.GetHostName(); + } + else if (ipConfig.Equals("FirstIPAddressOfHost")) + { + var ips = Dns.GetHostAddressesAsync(Dns.GetHostName()).Result; + + ipConfig = ips.FirstOrDefault()?.ToString(); + } + + clusterConfiguration.Defaults.PropagateActivityId = true; + clusterConfiguration.Defaults.ProxyGatewayEndpoint = new IPEndPoint(IPAddress.Any, 40000); + clusterConfiguration.Defaults.HostNameOrIPAddress = ipConfig; + }*/ + } + } +} \ No newline at end of file diff --git a/src/Squidex/Program.cs b/src/Squidex/Program.cs index 0266f9c99..5e7392532 100644 --- a/src/Squidex/Program.cs +++ b/src/Squidex/Program.cs @@ -5,8 +5,14 @@ // All rights reserved. Licensed under the MIT license. // ========================================================================== +using System; using System.IO; using Microsoft.AspNetCore.Hosting; +using Orleans; +using Orleans.Hosting; +using Orleans.Runtime.Configuration; +using Squidex.Config.Orleans; +using Squidex.Infrastructure.EventSourcing.Grains; using Squidex.Infrastructure.Log.Adapter; namespace Squidex @@ -15,21 +21,59 @@ namespace Squidex { public static void Main(string[] args) { - new WebHostBuilder() - .UseKestrel(k => { k.AddServerHeader = false; }) + var silo = new SiloHostBuilder() + .UseConfiguration(ClusterConfiguration.LocalhostPrimarySilo(33333)) .UseContentRoot(Directory.GetCurrentDirectory()) - .UseIISIntegration() - .UseStartup() + .ConfigureServices((context, services) => + { + services.AddAppSiloServices(context.Configuration); + services.AddAppServices(context.Configuration); + }) + .ConfigureApplicationParts(builder => + { + builder.AddApplicationPart(typeof(EventConsumerManagerGrain).Assembly); + }) .ConfigureLogging(builder => { builder.AddSemanticLog(); }) .ConfigureAppConfiguration((hostContext, builder) => { - builder.AddAppConfiguration(hostContext.HostingEnvironment.EnvironmentName, args); + builder.AddAppConfiguration(GetEnvironment(), args); }) - .Build() - .Run(); + .Build(); + + silo.StartAsync().Wait(); + + try + { + new WebHostBuilder() + .UseKestrel(k => { k.AddServerHeader = false; }) + .UseContentRoot(Directory.GetCurrentDirectory()) + .UseIISIntegration() + .UseStartup() + .ConfigureLogging(builder => + { + builder.AddSemanticLog(); + }) + .ConfigureAppConfiguration((hostContext, builder) => + { + builder.AddAppConfiguration(hostContext.HostingEnvironment.EnvironmentName, args); + }) + .Build() + .Run(); + } + finally + { + silo.StopAsync().Wait(); + } + } + + private static string GetEnvironment() + { + var environment = Environment.GetEnvironmentVariable("ASPNETCORE_ENVIRONMENT"); + + return environment ?? "Development"; } } } diff --git a/src/Squidex/Squidex.csproj b/src/Squidex/Squidex.csproj index 6a2adcef5..1f408129b 100644 --- a/src/Squidex/Squidex.csproj +++ b/src/Squidex/Squidex.csproj @@ -70,6 +70,8 @@ + + @@ -77,6 +79,7 @@ + diff --git a/src/Squidex/WebStartup.cs b/src/Squidex/WebStartup.cs index 7632fba3a..08d81bf1b 100644 --- a/src/Squidex/WebStartup.cs +++ b/src/Squidex/WebStartup.cs @@ -15,6 +15,7 @@ using Squidex.Areas.Frontend; using Squidex.Areas.IdentityServer; using Squidex.Areas.Portal; using Squidex.Config.Domain; +using Squidex.Config.Orleans; using Squidex.Config.Web; namespace Squidex @@ -30,6 +31,7 @@ namespace Squidex public IServiceProvider ConfigureServices(IServiceCollection services) { + services.AddAppClient(); services.AddAppServices(configuration); return services.BuildServiceProvider(); diff --git a/tests/Squidex.Infrastructure.Tests/EventSourcing/Grains/EventConsumerManagerTests.cs b/tests/Squidex.Infrastructure.Tests/EventSourcing/Grains/EventConsumerManagerTests.cs index 88571afab..1a35c41a2 100644 --- a/tests/Squidex.Infrastructure.Tests/EventSourcing/Grains/EventConsumerManagerTests.cs +++ b/tests/Squidex.Infrastructure.Tests/EventSourcing/Grains/EventConsumerManagerTests.cs @@ -9,7 +9,6 @@ using System; using System.Threading.Tasks; using FakeItEasy; using FluentAssertions; -using Squidex.Infrastructure.EventSourcing.Grains.Messages; using Squidex.Infrastructure.States; using Xunit; @@ -25,7 +24,7 @@ namespace Squidex.Infrastructure.EventSourcing.Grains private readonly IPubSub pubSub = new InMemoryPubSub(); private readonly string consumerName1 = "Consumer1"; private readonly string consumerName2 = "Consumer2"; - private readonly EventConsumerGrainManager sut; + private readonly EventConsumerManagerGrain sut; public EventConsumerManagerTests() { @@ -35,7 +34,7 @@ namespace Squidex.Infrastructure.EventSourcing.Grains A.CallTo(() => factory.CreateAsync(consumerName1)).Returns(actor1); A.CallTo(() => factory.CreateAsync(consumerName2)).Returns(actor2); - sut = new EventConsumerGrainManager(new IEventConsumer[] { consumer1, consumer2 }, pubSub, factory); + sut = new EventConsumerManagerGrain(new IEventConsumer[] { consumer1, consumer2 }, pubSub, factory); } [Fact]