From bbc16bb22647523e9fbefe49175151e7ac29c821 Mon Sep 17 00:00:00 2001 From: Sebastian Stehle Date: Sun, 24 Sep 2017 20:22:32 +0200 Subject: [PATCH] Refactorings and some tests --- .../CQRS/Events/PollingSubscription.cs | 63 +++-- src/Squidex.Infrastructure/Actors/Actor.cs | 42 ++-- .../Actors/DefaultRemoteActorChannel.cs | 10 +- src/Squidex.Infrastructure/Actors/IActor.cs | 10 +- src/Squidex.Infrastructure/Actors/IActors.cs | 10 +- .../Actors/IRemoteActorChannel.cs | 10 +- .../Actors/RemoteActors.cs | 10 +- .../CQRS/Events/Actors/EventConsumerActor.cs | 10 +- .../Actors/Messages/ReceiveEventMessage.cs | 10 +- .../Actors/Messages/ResetConsumerMessage.cs | 17 ++ .../Actors/Messages/ResetReceiverMessage.cs | 9 - .../Actors/Messages/StartConsumerMessage.cs | 17 ++ .../Actors/Messages/StartReceiverMessage.cs | 9 - .../Actors/Messages/StopConsumerMessage.cs | 19 ++ .../Actors/Messages/StopReceiverMessage.cs | 11 - .../Actors/Messages/SubscribeMessage.cs | 10 +- src/Squidex/Config/Domain/EventStoreModule.cs | 3 +- .../Config/Domain/InfrastructureModule.cs | 14 +- src/Squidex/Config/Domain/Usages.cs | 12 +- .../EventConsumersController.cs | 28 ++- .../administration-area.component.html | 2 + .../Benchmarks/Properties/launchSettings.json | 2 +- tests/Benchmarks/Tests/AppendToEventStore.cs | 1 - .../AppendToEventStoreWithManyWriters.cs | 1 - tests/Benchmarks/Tests/HandleEvents.cs | 11 +- .../Tests/HandleEventsWithManyWriters.cs | 10 +- tests/Benchmarks/Utils/Helper.cs | 6 - .../Actors/ActorRemoteTests.cs | 82 +++++++ .../Actors/ActorTests.cs | 130 +++++++++++ .../CQRS/Events/DefaultEventNotifierTests.cs | 17 +- .../CQRS/Events/EventReceiverTests.cs | 216 ------------------ 31 files changed, 460 insertions(+), 342 deletions(-) create mode 100644 src/Squidex.Infrastructure/CQRS/Events/Actors/Messages/ResetConsumerMessage.cs delete mode 100644 src/Squidex.Infrastructure/CQRS/Events/Actors/Messages/ResetReceiverMessage.cs create mode 100644 src/Squidex.Infrastructure/CQRS/Events/Actors/Messages/StartConsumerMessage.cs delete mode 100644 src/Squidex.Infrastructure/CQRS/Events/Actors/Messages/StartReceiverMessage.cs create mode 100644 src/Squidex.Infrastructure/CQRS/Events/Actors/Messages/StopConsumerMessage.cs delete mode 100644 src/Squidex.Infrastructure/CQRS/Events/Actors/Messages/StopReceiverMessage.cs create mode 100644 tests/Squidex.Infrastructure.Tests/Actors/ActorRemoteTests.cs create mode 100644 tests/Squidex.Infrastructure.Tests/Actors/ActorTests.cs delete mode 100644 tests/Squidex.Infrastructure.Tests/CQRS/Events/EventReceiverTests.cs diff --git a/src/Squidex.Infrastructure.MongoDb/CQRS/Events/PollingSubscription.cs b/src/Squidex.Infrastructure.MongoDb/CQRS/Events/PollingSubscription.cs index 9b46a5c72..416bd2691 100644 --- a/src/Squidex.Infrastructure.MongoDb/CQRS/Events/PollingSubscription.cs +++ b/src/Squidex.Infrastructure.MongoDb/CQRS/Events/PollingSubscription.cs @@ -22,24 +22,20 @@ namespace Squidex.Infrastructure.CQRS.Events { private readonly IEventNotifier eventNotifier; private readonly MongoEventStore eventStore; - private CancellationTokenSource cancelPolling; - private Timer pollTimer; + private readonly CancellationTokenSource pollStop = new CancellationTokenSource(); private Regex streamRegex; - private Guid subscription; private string streamFilter; private string position; + private bool isPolling; private IDisposable pollSubscription; private IActor parent; - private sealed class PollMessage : IMessage + private sealed class StartPollMessage : IMessage { } - private sealed class ReceiveMongoEventMessage : IMessage + private sealed class StopPollMessage : IMessage { - public StoredEvent Event; - - public Guid Subscription; } public PollingSubscription(MongoEventStore eventStore, IEventNotifier eventNotifier) @@ -50,9 +46,7 @@ namespace Squidex.Infrastructure.CQRS.Events protected override Task OnStop() { - cancelPolling?.Cancel(); - - pollTimer?.Dispose(); + pollStop?.Cancel(); pollSubscription?.Dispose(); parent = null; @@ -86,57 +80,54 @@ namespace Squidex.Infrastructure.CQRS.Events { if (streamRegex.IsMatch(streamName)) { - SendAsync(new PollMessage()).Forget(); + SendAsync(new StartPollMessage()).Forget(); } }); - pollTimer = new Timer(d => + SendAsync(new StartPollMessage()).Forget(); + + break; + } + + case StartPollMessage poll when parent != null: + { + if (!isPolling) { - SendAsync(new PollMessage()).Forget(); - }); + isPolling = true; - pollTimer.Change(0, 5000); + PollAsync().Forget(); + } break; } - case PollMessage poll when parent != null: + case StopPollMessage poll when parent != null: { - cancelPolling?.Cancel(); - cancelPolling = new CancellationTokenSource(); - - subscription = Guid.NewGuid(); + isPolling = false; - PollAsync(subscription, cancelPolling.Token).Forget(); + Task.Delay(5000).ContinueWith(t => SendAsync(new StartPollMessage())).Forget(); break; } - case ReceiveMongoEventMessage receiveEvent when parent != null: + case ReceiveEventMessage receiveEvent when parent != null: { - if (receiveEvent.Subscription == subscription) - { - await parent.SendAsync(new ReceiveEventMessage { Event = receiveEvent.Event, Source = this }); + await parent.SendAsync(receiveEvent); - position = receiveEvent.Event.EventPosition; - } + position = receiveEvent.Event.EventPosition; break; } } } - private async Task PollAsync(Guid subscriptionId, CancellationToken ct) + private async Task PollAsync() { try { - await eventStore.GetEventsAsync(async e => - { - if (ct.IsCancellationRequested == true) - { - await SendAsync(new ReceiveMongoEventMessage { Event = e, Subscription = subscriptionId }); - } - }, ct, streamFilter, position); + await eventStore.GetEventsAsync(e => SendAsync(new ReceiveEventMessage { Event = e, Source = this }), pollStop.Token, streamFilter, position); + + await SendAsync(new StopPollMessage()); } catch (Exception ex) when (!(ex is OperationCanceledException)) { diff --git a/src/Squidex.Infrastructure/Actors/Actor.cs b/src/Squidex.Infrastructure/Actors/Actor.cs index 67788d75b..7c8065b8d 100644 --- a/src/Squidex.Infrastructure/Actors/Actor.cs +++ b/src/Squidex.Infrastructure/Actors/Actor.cs @@ -18,6 +18,7 @@ namespace Squidex.Infrastructure.Actors public abstract class Actor : IActor, IDisposable { private readonly ActionBlock block; + private bool isStopped; private sealed class StopMessage : IMessage { @@ -30,7 +31,7 @@ namespace Squidex.Infrastructure.Actors protected Actor() { - block = new ActionBlock(Handle, new ExecutionDataflowBlockOptions { BoundedCapacity = 10 }); + block = new ActionBlock(Handle, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 1, BoundedCapacity = 10 }); } public void Dispose() @@ -75,28 +76,43 @@ namespace Squidex.Infrastructure.Actors private async Task Handle(IMessage message) { - try + if (isStopped) { - if (message is StopMessage) + return; + } + + switch (message) + { + case StopMessage stopMessage: { + isStopped = true; + block.Complete(); await OnStop(); + + break; } - else if (message is ErrorMessage errorMessage) + + case ErrorMessage errorMessage: { await OnError(errorMessage.Exception); + + break; } - else - { - await OnMessage(message); - } - } - catch (Exception ex) - { - if (!(message is ErrorMessage)) + + default: { - await block.SendAsync(new ErrorMessage { Exception = ex }); + try + { + await OnMessage(message); + } + catch (Exception ex) + { + await OnError(ex); + } + + break; } } } diff --git a/src/Squidex.Infrastructure/Actors/DefaultRemoteActorChannel.cs b/src/Squidex.Infrastructure/Actors/DefaultRemoteActorChannel.cs index aa7610ef5..82a9f21d5 100644 --- a/src/Squidex.Infrastructure/Actors/DefaultRemoteActorChannel.cs +++ b/src/Squidex.Infrastructure/Actors/DefaultRemoteActorChannel.cs @@ -1,4 +1,12 @@ -using System; +// ========================================================================== +// DefaultRemoteActorChannel.cs +// Squidex Headless CMS +// ========================================================================== +// Copyright (c) Squidex Group +// All rights reserved. +// ========================================================================== + +using System; using System.Threading.Tasks; using Newtonsoft.Json; using Newtonsoft.Json.Linq; diff --git a/src/Squidex.Infrastructure/Actors/IActor.cs b/src/Squidex.Infrastructure/Actors/IActor.cs index 69f8aa7dd..06cf772fe 100644 --- a/src/Squidex.Infrastructure/Actors/IActor.cs +++ b/src/Squidex.Infrastructure/Actors/IActor.cs @@ -1,4 +1,12 @@ -using System; +// ========================================================================== +// IActor.cs +// Squidex Headless CMS +// ========================================================================== +// Copyright (c) Squidex Group +// All rights reserved. +// ========================================================================== + +using System; using System.Threading.Tasks; namespace Squidex.Infrastructure.Actors diff --git a/src/Squidex.Infrastructure/Actors/IActors.cs b/src/Squidex.Infrastructure/Actors/IActors.cs index f2d898ff4..b5f1c84bb 100644 --- a/src/Squidex.Infrastructure/Actors/IActors.cs +++ b/src/Squidex.Infrastructure/Actors/IActors.cs @@ -1,4 +1,12 @@ -namespace Squidex.Infrastructure.Actors +// ========================================================================== +// IActors.cs +// Squidex Headless CMS +// ========================================================================== +// Copyright (c) Squidex Group +// All rights reserved. +// ========================================================================== + +namespace Squidex.Infrastructure.Actors { public interface IActors { diff --git a/src/Squidex.Infrastructure/Actors/IRemoteActorChannel.cs b/src/Squidex.Infrastructure/Actors/IRemoteActorChannel.cs index f3c8ace3e..04f5b22e2 100644 --- a/src/Squidex.Infrastructure/Actors/IRemoteActorChannel.cs +++ b/src/Squidex.Infrastructure/Actors/IRemoteActorChannel.cs @@ -1,4 +1,12 @@ -using System; +// ========================================================================== +// IRemoteActorChannel.cs +// Squidex Headless CMS +// ========================================================================== +// Copyright (c) Squidex Group +// All rights reserved. +// ========================================================================== + +using System; using System.Threading.Tasks; namespace Squidex.Infrastructure.Actors diff --git a/src/Squidex.Infrastructure/Actors/RemoteActors.cs b/src/Squidex.Infrastructure/Actors/RemoteActors.cs index a4073687a..933afc977 100644 --- a/src/Squidex.Infrastructure/Actors/RemoteActors.cs +++ b/src/Squidex.Infrastructure/Actors/RemoteActors.cs @@ -1,4 +1,12 @@ -using System; +// ========================================================================== +// RemoteActors.cs +// Squidex Headless CMS +// ========================================================================== +// Copyright (c) Squidex Group +// All rights reserved. +// ========================================================================== + +using System; using System.Collections.Concurrent; using System.Threading.Tasks; using Squidex.Infrastructure.Tasks; diff --git a/src/Squidex.Infrastructure/CQRS/Events/Actors/EventConsumerActor.cs b/src/Squidex.Infrastructure/CQRS/Events/Actors/EventConsumerActor.cs index f92da6df8..445c84dd0 100644 --- a/src/Squidex.Infrastructure/CQRS/Events/Actors/EventConsumerActor.cs +++ b/src/Squidex.Infrastructure/CQRS/Events/Actors/EventConsumerActor.cs @@ -48,6 +48,8 @@ namespace Squidex.Infrastructure.CQRS.Events.Actors Guard.NotNull(eventConsumer, nameof(eventConsumer)); this.eventConsumer = eventConsumer; + + SendAsync(new StartConsumerMessage()); } protected override async Task OnStop() @@ -67,21 +69,21 @@ namespace Squidex.Infrastructure.CQRS.Events.Actors { switch (message) { - case StopReceiverMessage stopReceiver: + case StopConsumerMessage stopConsumer: { - await StopAsync(stopReceiver.Exception); + await StopAsync(stopConsumer.Exception); break; } - case StartReceiverMessage startReceiver: + case StartConsumerMessage startConsumer: { await StartAsync(); break; } - case ResetReceiverMessage resetReceiver: + case ResetConsumerMessage resetConsumer: { await StopAsync(); await ResetAsync(); diff --git a/src/Squidex.Infrastructure/CQRS/Events/Actors/Messages/ReceiveEventMessage.cs b/src/Squidex.Infrastructure/CQRS/Events/Actors/Messages/ReceiveEventMessage.cs index bbb6d3a26..9cf8208bf 100644 --- a/src/Squidex.Infrastructure/CQRS/Events/Actors/Messages/ReceiveEventMessage.cs +++ b/src/Squidex.Infrastructure/CQRS/Events/Actors/Messages/ReceiveEventMessage.cs @@ -1,4 +1,12 @@ -using Squidex.Infrastructure.Actors; +// ========================================================================== +// ReceiveEventMessage.cs +// Squidex Headless CMS +// ========================================================================== +// Copyright (c) Squidex Group +// All rights reserved. +// ========================================================================== + +using Squidex.Infrastructure.Actors; namespace Squidex.Infrastructure.CQRS.Events.Actors.Messages { diff --git a/src/Squidex.Infrastructure/CQRS/Events/Actors/Messages/ResetConsumerMessage.cs b/src/Squidex.Infrastructure/CQRS/Events/Actors/Messages/ResetConsumerMessage.cs new file mode 100644 index 000000000..b86c4d090 --- /dev/null +++ b/src/Squidex.Infrastructure/CQRS/Events/Actors/Messages/ResetConsumerMessage.cs @@ -0,0 +1,17 @@ +// ========================================================================== +// ResetConsumerMessage.cs +// Squidex Headless CMS +// ========================================================================== +// Copyright (c) Squidex Group +// All rights reserved. +// ========================================================================== + +using Squidex.Infrastructure.Actors; + +namespace Squidex.Infrastructure.CQRS.Events.Actors.Messages +{ + [TypeName(nameof(ResetConsumerMessage))] + public sealed class ResetConsumerMessage : IMessage + { + } +} diff --git a/src/Squidex.Infrastructure/CQRS/Events/Actors/Messages/ResetReceiverMessage.cs b/src/Squidex.Infrastructure/CQRS/Events/Actors/Messages/ResetReceiverMessage.cs deleted file mode 100644 index 292680cd3..000000000 --- a/src/Squidex.Infrastructure/CQRS/Events/Actors/Messages/ResetReceiverMessage.cs +++ /dev/null @@ -1,9 +0,0 @@ -using Squidex.Infrastructure.Actors; - -namespace Squidex.Infrastructure.CQRS.Events.Actors.Messages -{ - [TypeName(nameof(ResetReceiverMessage))] - public sealed class ResetReceiverMessage : IMessage - { - } -} diff --git a/src/Squidex.Infrastructure/CQRS/Events/Actors/Messages/StartConsumerMessage.cs b/src/Squidex.Infrastructure/CQRS/Events/Actors/Messages/StartConsumerMessage.cs new file mode 100644 index 000000000..2f6cc9a08 --- /dev/null +++ b/src/Squidex.Infrastructure/CQRS/Events/Actors/Messages/StartConsumerMessage.cs @@ -0,0 +1,17 @@ +// ========================================================================== +// StartConsumerMessage.cs +// Squidex Headless CMS +// ========================================================================== +// Copyright (c) Squidex Group +// All rights reserved. +// ========================================================================== + +using Squidex.Infrastructure.Actors; + +namespace Squidex.Infrastructure.CQRS.Events.Actors.Messages +{ + [TypeName(nameof(StartConsumerMessage))] + public sealed class StartConsumerMessage : IMessage + { + } +} diff --git a/src/Squidex.Infrastructure/CQRS/Events/Actors/Messages/StartReceiverMessage.cs b/src/Squidex.Infrastructure/CQRS/Events/Actors/Messages/StartReceiverMessage.cs deleted file mode 100644 index b6851589e..000000000 --- a/src/Squidex.Infrastructure/CQRS/Events/Actors/Messages/StartReceiverMessage.cs +++ /dev/null @@ -1,9 +0,0 @@ -using Squidex.Infrastructure.Actors; - -namespace Squidex.Infrastructure.CQRS.Events.Actors.Messages -{ - [TypeName(nameof(StartReceiverMessage))] - public sealed class StartReceiverMessage : IMessage - { - } -} diff --git a/src/Squidex.Infrastructure/CQRS/Events/Actors/Messages/StopConsumerMessage.cs b/src/Squidex.Infrastructure/CQRS/Events/Actors/Messages/StopConsumerMessage.cs new file mode 100644 index 000000000..ac6a196c1 --- /dev/null +++ b/src/Squidex.Infrastructure/CQRS/Events/Actors/Messages/StopConsumerMessage.cs @@ -0,0 +1,19 @@ +// ========================================================================== +// StopConsumerMessage.cs +// Squidex Headless CMS +// ========================================================================== +// Copyright (c) Squidex Group +// All rights reserved. +// ========================================================================== + +using System; +using Squidex.Infrastructure.Actors; + +namespace Squidex.Infrastructure.CQRS.Events.Actors.Messages +{ + [TypeName(nameof(StopConsumerMessage))] + public sealed class StopConsumerMessage : IMessage + { + public Exception Exception { get; set; } + } +} diff --git a/src/Squidex.Infrastructure/CQRS/Events/Actors/Messages/StopReceiverMessage.cs b/src/Squidex.Infrastructure/CQRS/Events/Actors/Messages/StopReceiverMessage.cs deleted file mode 100644 index 52937d2bf..000000000 --- a/src/Squidex.Infrastructure/CQRS/Events/Actors/Messages/StopReceiverMessage.cs +++ /dev/null @@ -1,11 +0,0 @@ -using System; -using Squidex.Infrastructure.Actors; - -namespace Squidex.Infrastructure.CQRS.Events.Actors.Messages -{ - [TypeName(nameof(StopReceiverMessage))] - public sealed class StopReceiverMessage : IMessage - { - public Exception Exception { get; set; } - } -} diff --git a/src/Squidex.Infrastructure/CQRS/Events/Actors/Messages/SubscribeMessage.cs b/src/Squidex.Infrastructure/CQRS/Events/Actors/Messages/SubscribeMessage.cs index 003a255de..5055b857a 100644 --- a/src/Squidex.Infrastructure/CQRS/Events/Actors/Messages/SubscribeMessage.cs +++ b/src/Squidex.Infrastructure/CQRS/Events/Actors/Messages/SubscribeMessage.cs @@ -1,4 +1,12 @@ -using Squidex.Infrastructure.Actors; +// ========================================================================== +// SubscribeMessage.cs +// Squidex Headless CMS +// ========================================================================== +// Copyright (c) Squidex Group +// All rights reserved. +// ========================================================================== + +using Squidex.Infrastructure.Actors; namespace Squidex.Infrastructure.CQRS.Events.Actors.Messages { diff --git a/src/Squidex/Config/Domain/EventStoreModule.cs b/src/Squidex/Config/Domain/EventStoreModule.cs index ab54322a7..ea282e94b 100644 --- a/src/Squidex/Config/Domain/EventStoreModule.cs +++ b/src/Squidex/Config/Domain/EventStoreModule.cs @@ -14,6 +14,7 @@ using Microsoft.Extensions.Configuration; using MongoDB.Driver; using Squidex.Infrastructure; using Squidex.Infrastructure.CQRS.Events; +using Squidex.Infrastructure.CQRS.Events.Actors; namespace Squidex.Config.Domain { @@ -35,7 +36,7 @@ namespace Squidex.Config.Domain if (consumeEvents) { - builder.RegisterType() + builder.RegisterType() .AsSelf() .InstancePerDependency(); } diff --git a/src/Squidex/Config/Domain/InfrastructureModule.cs b/src/Squidex/Config/Domain/InfrastructureModule.cs index 464e202a2..651af7079 100644 --- a/src/Squidex/Config/Domain/InfrastructureModule.cs +++ b/src/Squidex/Config/Domain/InfrastructureModule.cs @@ -18,6 +18,7 @@ using NodaTime; using Squidex.Domain.Apps.Core.Schemas; using Squidex.Domain.Apps.Core.Schemas.Json; using Squidex.Infrastructure; +using Squidex.Infrastructure.Actors; using Squidex.Infrastructure.Assets; using Squidex.Infrastructure.Assets.ImageSharp; using Squidex.Infrastructure.Caching; @@ -131,6 +132,15 @@ namespace Squidex.Config.Domain .As() .SingleInstance(); + builder.Register(c => new InvalidatingMemoryCache(new MemoryCache(c.Resolve>()), c.Resolve())) + .As() + .SingleInstance(); + + builder.RegisterType() + .As() + .AsSelf() + .SingleInstance(); + builder.RegisterType() .AsSelf() .SingleInstance(); @@ -146,10 +156,6 @@ namespace Squidex.Config.Domain builder.RegisterType() .AsSelf() .SingleInstance(); - - builder.Register(c => new InvalidatingMemoryCache(new MemoryCache(c.Resolve>()), c.Resolve())) - .As() - .SingleInstance(); } } } diff --git a/src/Squidex/Config/Domain/Usages.cs b/src/Squidex/Config/Domain/Usages.cs index 2fad3865a..facf93d5a 100644 --- a/src/Squidex/Config/Domain/Usages.cs +++ b/src/Squidex/Config/Domain/Usages.cs @@ -10,7 +10,9 @@ using System.Collections.Generic; using Microsoft.AspNetCore.Builder; using Microsoft.Extensions.DependencyInjection; using Squidex.Infrastructure; +using Squidex.Infrastructure.Actors; using Squidex.Infrastructure.CQRS.Events; +using Squidex.Infrastructure.CQRS.Events.Actors; namespace Squidex.Config.Domain { @@ -20,13 +22,15 @@ namespace Squidex.Config.Domain { app.ApplicationServices.GetService().CleanAsync().Wait(); - var catchConsumers = app.ApplicationServices.GetServices(); + var consumers = app.ApplicationServices.GetServices(); - foreach (var catchConsumer in catchConsumers) + foreach (var consumer in consumers) { - var receiver = app.ApplicationServices.GetService(); + var actor = app.ApplicationServices.GetService(); - receiver?.Subscribe(catchConsumer); + actor?.Subscribe(consumer); + + app.ApplicationServices.GetService().Connect(consumer.Name, actor); } return app; diff --git a/src/Squidex/Controllers/Api/EventConsumers/EventConsumersController.cs b/src/Squidex/Controllers/Api/EventConsumers/EventConsumersController.cs index 6364b2d16..4c792a84d 100644 --- a/src/Squidex/Controllers/Api/EventConsumers/EventConsumersController.cs +++ b/src/Squidex/Controllers/Api/EventConsumers/EventConsumersController.cs @@ -11,7 +11,9 @@ using System.Threading.Tasks; using Microsoft.AspNetCore.Mvc; using NSwag.Annotations; using Squidex.Controllers.Api.EventConsumers.Models; +using Squidex.Infrastructure.Actors; using Squidex.Infrastructure.CQRS.Events; +using Squidex.Infrastructure.CQRS.Events.Actors.Messages; using Squidex.Infrastructure.Reflection; using Squidex.Pipeline; @@ -23,10 +25,13 @@ namespace Squidex.Controllers.Api.EventConsumers public sealed class EventConsumersController : Controller { private readonly IEventConsumerInfoRepository eventConsumerRepository; + private readonly IActors actors; - public EventConsumersController(IEventConsumerInfoRepository eventConsumerRepository) + public EventConsumersController(IEventConsumerInfoRepository eventConsumerRepository, IActors actors) { this.eventConsumerRepository = eventConsumerRepository; + + this.actors = actors; } [HttpGet] @@ -46,7 +51,12 @@ namespace Squidex.Controllers.Api.EventConsumers [ApiCosts(0)] public async Task Start(string name) { - await eventConsumerRepository.StartAsync(name); + var actor = actors.Get(name); + + if (actor != null) + { + await actor.SendAsync(new StartConsumerMessage()); + } return NoContent(); } @@ -56,7 +66,12 @@ namespace Squidex.Controllers.Api.EventConsumers [ApiCosts(0)] public async Task Stop(string name) { - await eventConsumerRepository.StopAsync(name); + var actor = actors.Get(name); + + if (actor != null) + { + await actor.SendAsync(new StopConsumerMessage()); + } return NoContent(); } @@ -66,7 +81,12 @@ namespace Squidex.Controllers.Api.EventConsumers [ApiCosts(0)] public async Task Reset(string name) { - await eventConsumerRepository.ResetAsync(name); + var actor = actors.Get(name); + + if (actor != null) + { + await actor.SendAsync(new ResetConsumerMessage()); + } return NoContent(); } diff --git a/src/Squidex/app/features/administration/administration-area.component.html b/src/Squidex/app/features/administration/administration-area.component.html index 6f9a1a759..e75dab4a4 100644 --- a/src/Squidex/app/features/administration/administration-area.component.html +++ b/src/Squidex/app/features/administration/administration-area.component.html @@ -1,3 +1,5 @@ + +