diff --git a/Squidex.sln.DotSettings b/Squidex.sln.DotSettings
index 7563aa71c..dd880d5ac 100644
--- a/Squidex.sln.DotSettings
+++ b/Squidex.sln.DotSettings
@@ -1,97 +1,7 @@
- True
- False
- True
- False
- True
- False
- True
- True
- True
- True
- True
- True
- True
- True
- True
- False
- Truev
- False
- True
- True
- True
- True
- True
- True
- DO_NOT_SHOW
-
- DO_NOT_SHOW
- WARNING
- DO_NOT_SHOW
- DO_NOT_SHOW
- DO_NOT_SHOW
- DO_NOT_SHOW
- DO_NOT_SHOW
- DO_NOT_SHOW
- DO_NOT_SHOW
- DO_NOT_SHOW
- WARNING
- DO_NOT_SHOW
- DO_NOT_SHOW
- DO_NOT_SHOW
- DO_NOT_SHOW
- WARNING
-
- ExplicitlyExcluded
- TypeScript16
<?xml version="1.0" encoding="utf-16"?><Profile name="Header"><CSUpdateFileHeader>True</CSUpdateFileHeader></Profile>
<?xml version="1.0" encoding="utf-16"?><Profile name="Namespaces"><CSOptimizeUsings><OptimizeUsings>True</OptimizeUsings><EmbraceInRegion>False</EmbraceInRegion><RegionName></RegionName></CSOptimizeUsings><CSUpdateFileHeader>True</CSUpdateFileHeader></Profile>
<?xml version="1.0" encoding="utf-16"?><Profile name="Typescript"><JsInsertSemicolon>True</JsInsertSemicolon><FormatAttributeQuoteDescriptor>True</FormatAttributeQuoteDescriptor><CorrectVariableKindsDescriptor>True</CorrectVariableKindsDescriptor><VariablesToInnerScopesDescriptor>True</VariablesToInnerScopesDescriptor><StringToTemplatesDescriptor>True</StringToTemplatesDescriptor><RemoveRedundantQualifiersTs>True</RemoveRedundantQualifiersTs><OptimizeImportsTs>True</OptimizeImportsTs></Profile>
-
- SingleQuoted
- <Policy Inspect="True" Prefix="" Suffix="" Style="aaBb" />
- <Policy Inspect="True" Prefix="" Suffix="" Style="aaBb" />
- <Policy Inspect="True" Prefix="" Suffix="" Style="aaBb" />
- <Policy Inspect="True" Prefix="" Suffix="" Style="aaBb" />
- <Policy Inspect="True" Prefix="" Suffix="" Style="aaBb" />
- <Policy Inspect="True" Prefix="" Suffix="" Style="aaBb" />
- <Policy Inspect="True" Prefix="" Suffix="" Style="AaBb" />
- <Policy Inspect="True" Prefix="" Suffix="" Style="aaBb" />
- <Policy Inspect="True" Prefix="" Suffix="" Style="AaBb" />
- <Policy Inspect="True" Prefix="" Suffix="" Style="aaBb" />
- <Policy Inspect="True" Prefix="" Suffix="" Style="AaBb" />
- <Policy Inspect="True" Prefix="" Suffix="" Style="aaBb" />
- <Policy Inspect="True" Prefix="" Suffix="" Style="aaBb" />
- <Policy Inspect="True" Prefix="" Suffix="" Style="aaBb" />
- <Policy Inspect="True" Prefix="" Suffix="" Style="AaBb" />
- <Policy Inspect="True" Prefix="" Suffix="" Style="AaBb" />
- <Policy Inspect="True" Prefix="" Suffix="" Style="AaBb" />
- <Policy Inspect="True" Prefix="I" Suffix="" Style="AaBb" />
- <Policy Inspect="True" Prefix="" Suffix="" Style="AaBb" />
- <Policy Inspect="True" Prefix="" Suffix="" Style="aaBb" />
- <Policy Inspect="True" Prefix="" Suffix="" Style="aaBb" />
- <Policy Inspect="True" Prefix="" Suffix="" Style="aaBb" />
- <Policy Inspect="True" Prefix="" Suffix="" Style="aaBb" />
- <Policy Inspect="True" Prefix="" Suffix="" Style="aaBb" />
- <Policy Inspect="True" Prefix="" Suffix="" Style="aaBb" />
- <Policy Inspect="True" Prefix="" Suffix="" Style="aaBb" />
- <Policy Inspect="True" Prefix="" Suffix="" Style="aaBb" />
- <Policy Inspect="True" Prefix="" Suffix="" Style="aaBb" />
- <Policy Inspect="True" Prefix="" Suffix="" Style="aaBb" />
- <Policy Inspect="True" Prefix="" Suffix="" Style="aaBb" />
- <Policy Inspect="True" Prefix="" Suffix="" Style="aaBb" />
- <Policy Inspect="True" Prefix="" Suffix="" Style="aaBb" />
- <Policy Inspect="True" Prefix="" Suffix="" Style="aaBb" />
- <Policy Inspect="True" Prefix="" Suffix="" Style="AaBb" />
- <Policy Inspect="True" Prefix="T" Suffix="" Style="AaBb" />
- <Policy Inspect="True" Prefix="" Suffix="" Style="AaBb" />
- <Policy Inspect="True" Prefix="" Suffix="" Style="AaBb" />
- <Policy Inspect="True" Prefix="" Suffix="" Style="AaBb" />
- <Policy Inspect="True" Prefix="" Suffix="" Style="AaBb" />
- <Policy Inspect="True" Prefix="" Suffix="" Style="aaBb" />
- <Policy Inspect="True" Prefix="" Suffix="" Style="AaBb" />
- <Policy Inspect="True" Prefix="" Suffix="" Style="AaBb" />
- C:\Users\mail2\AppData\Local\JetBrains\Transient\ReSharperPlatformVs15\v08_85ffde88\SolutionCaches
==========================================================================
$FILENAME$
Squidex Headless CMS
@@ -99,5 +9,5 @@
Copyright (c) Squidex Group
All rights reserved.
==========================================================================
-
+
\ No newline at end of file
diff --git a/src/Squidex.Infrastructure/CQRS/Events/Actors/EventConsumerActor.cs b/src/Squidex.Infrastructure/CQRS/Events/Actors/EventConsumerActor.cs
index f650103fb..5c05fb531 100644
--- a/src/Squidex.Infrastructure/CQRS/Events/Actors/EventConsumerActor.cs
+++ b/src/Squidex.Infrastructure/CQRS/Events/Actors/EventConsumerActor.cs
@@ -44,11 +44,11 @@ namespace Squidex.Infrastructure.CQRS.Events.Actors
this.eventConsumerInfoRepository = eventConsumerInfoRepository;
}
- public void Subscribe(IEventConsumer eventConsumer)
+ public Task SubscribeAsync(IEventConsumer eventConsumer)
{
Guard.NotNull(eventConsumer, nameof(eventConsumer));
- SendAsync(new SetupConsumerMessage { EventConsumer = eventConsumer });
+ return SendAsync(new SetupConsumerMessage { EventConsumer = eventConsumer });
}
protected override async Task OnStop()
diff --git a/src/Squidex/Config/Domain/Usages.cs b/src/Squidex/Config/Domain/Usages.cs
index 564f63419..4150b363a 100644
--- a/src/Squidex/Config/Domain/Usages.cs
+++ b/src/Squidex/Config/Domain/Usages.cs
@@ -30,7 +30,7 @@ namespace Squidex.Config.Domain
if (actor != null)
{
- actor.Subscribe(consumer);
+ actor.SubscribeAsync(consumer);
app.ApplicationServices.GetService().Connect(consumer.Name, actor);
}
diff --git a/tests/Benchmarks/Tests/HandleEvents.cs b/tests/Benchmarks/Tests/HandleEvents.cs
index bf4ec52c0..75c552af2 100644
--- a/tests/Benchmarks/Tests/HandleEvents.cs
+++ b/tests/Benchmarks/Tests/HandleEvents.cs
@@ -67,7 +67,7 @@ namespace Benchmarks.Tests
eventStore = new MongoEventStore(mongoDatabase, eventNotifier);
eventConsumerActor = new EventConsumerActor(formatter, eventStore, eventConsumerInfos, log);
- eventConsumerActor.Subscribe(eventConsumer);
+ eventConsumerActor.SubscribeAsync(eventConsumer);
}
public long Run()
diff --git a/tests/Benchmarks/Tests/HandleEventsWithManyWriters.cs b/tests/Benchmarks/Tests/HandleEventsWithManyWriters.cs
index 32f73c8e8..372130baf 100644
--- a/tests/Benchmarks/Tests/HandleEventsWithManyWriters.cs
+++ b/tests/Benchmarks/Tests/HandleEventsWithManyWriters.cs
@@ -70,7 +70,7 @@ namespace Benchmarks.Tests
eventStore = new MongoEventStore(mongoDatabase, eventNotifier);
eventConsumerActor = new EventConsumerActor(formatter, eventStore, eventConsumerInfos, log);
- eventConsumerActor.Subscribe(eventConsumer);
+ eventConsumerActor.SubscribeAsync(eventConsumer);
}
public long Run()
diff --git a/tests/Squidex.Infrastructure.Tests/CQRS/Events/Actors/EventReceiverActorTests.cs b/tests/Squidex.Infrastructure.Tests/CQRS/Events/Actors/EventReceiverActorTests.cs
new file mode 100644
index 000000000..e08468429
--- /dev/null
+++ b/tests/Squidex.Infrastructure.Tests/CQRS/Events/Actors/EventReceiverActorTests.cs
@@ -0,0 +1,242 @@
+// ==========================================================================
+// EventReceiverActorTests.cs
+// Squidex Headless CMS
+// ==========================================================================
+// Copyright (c) Squidex Group
+// All rights reserved.
+// ==========================================================================
+
+using System;
+using System.Threading.Tasks;
+using FakeItEasy;
+using Squidex.Infrastructure.CQRS.Events.Actors.Messages;
+using Squidex.Infrastructure.Log;
+using Xunit;
+
+namespace Squidex.Infrastructure.CQRS.Events.Actors
+{
+ public class EventReceiverActorTests
+ {
+ public sealed class MyEvent : IEvent
+ {
+ }
+
+ private sealed class MyEventConsumerInfo : IEventConsumerInfo
+ {
+ public bool IsStopped { get; set; }
+ public bool IsResetting { get; set; }
+
+ public string Name { get; set; }
+ public string Error { get; set; }
+ public string Position { get; set; }
+ }
+
+ private readonly IEventConsumerInfoRepository eventConsumerInfoRepository = A.Fake();
+ private readonly IEventConsumer eventConsumer = A.Fake();
+ private readonly IEventStore eventStore = A.Fake();
+ private readonly IEventSubscription eventSubscription = A.Fake();
+ private readonly ISemanticLog log = A.Fake();
+ private readonly EventDataFormatter formatter = A.Fake();
+ private readonly EventData eventData = new EventData();
+ private readonly Envelope envelope = new Envelope(new MyEvent());
+ private readonly EventConsumerActor sut;
+ private readonly MyEventConsumerInfo consumerInfo = new MyEventConsumerInfo();
+ private readonly string consumerName;
+
+ public EventReceiverActorTests()
+ {
+ consumerInfo.Position = Guid.NewGuid().ToString();
+ consumerName = eventConsumer.GetType().Name;
+
+ A.CallTo(() => eventStore.CreateSubscription()).Returns(eventSubscription);
+
+ A.CallTo(() => eventConsumer.Name).Returns(consumerName);
+ A.CallTo(() => eventConsumerInfoRepository.FindAsync(consumerName)).Returns(consumerInfo);
+
+ A.CallTo(() => formatter.Parse(eventData, true)).Returns(envelope);
+
+ sut = new EventConsumerActor(formatter, eventStore, eventConsumerInfoRepository, log);
+ }
+
+ [Fact]
+ public async Task Should_subscribe_to_event_store_when_started()
+ {
+ await SubscribeAsync();
+
+ await sut.StopAsync();
+
+ A.CallTo(() => eventConsumerInfoRepository.CreateAsync(consumerName))
+ .MustHaveHappened();
+
+ A.CallTo(() => eventConsumerInfoRepository.StartAsync(consumerName))
+ .MustHaveHappened();
+
+ A.CallTo(() => eventSubscription.SendAsync(A.That.Matches(s => s.Parent == sut && s.Position == consumerInfo.Position)))
+ .MustHaveHappened();
+ }
+
+ [Fact]
+ public async Task Should_stop_subscription_when_stopped()
+ {
+ await SubscribeAsync();
+
+ await sut.SendAsync(new StopConsumerMessage());
+ await sut.StopAsync();
+
+ A.CallTo(() => eventConsumerInfoRepository.CreateAsync(consumerName))
+ .MustHaveHappened();
+
+ A.CallTo(() => eventConsumerInfoRepository.StartAsync(consumerName))
+ .MustHaveHappened();
+
+ A.CallTo(() => eventConsumerInfoRepository.StopAsync(consumerName, null))
+ .MustHaveHappened();
+
+ A.CallTo(() => eventSubscription.StopAsync())
+ .MustHaveHappened();
+
+ A.CallTo(() => eventSubscription.SendAsync(A.That.Matches(s => s.Parent == sut && s.Position == consumerInfo.Position)))
+ .MustHaveHappened();
+ }
+
+ [Fact]
+ public async Task Should_reset_consumer_when_resetting()
+ {
+ await SubscribeAsync();
+
+ await sut.SendAsync(new ResetConsumerMessage());
+ await sut.StopAsync();
+
+ A.CallTo(() => eventConsumerInfoRepository.CreateAsync(consumerName))
+ .MustHaveHappened();
+
+ A.CallTo(() => eventConsumerInfoRepository.StartAsync(consumerName))
+ .MustHaveHappened(Repeated.Exactly.Twice);
+
+ A.CallTo(() => eventConsumerInfoRepository.SetPositionAsync(consumerName, null, true))
+ .MustHaveHappened();
+
+ A.CallTo(() => eventConsumerInfoRepository.StopAsync(consumerName, null))
+ .MustHaveHappened();
+
+ A.CallTo(() => eventConsumer.ClearAsync())
+ .MustHaveHappened();
+
+ A.CallTo(() => eventSubscription.SendAsync(A.That.Matches(s => s.Parent == sut && s.Position == consumerInfo.Position)))
+ .MustHaveHappened(Repeated.Exactly.Twice);
+
+ A.CallTo(() => eventSubscription.StopAsync())
+ .MustHaveHappened();
+ }
+
+ [Fact]
+ public async Task Should_invoke_and_update_position_when_event_received()
+ {
+ var @event = new StoredEvent(Guid.NewGuid().ToString(), 123, eventData);
+
+ await SubscribeAsync();
+
+ await sut.SendAsync(new ReceiveEventMessage { Event = @event, Source = eventSubscription });
+ await sut.StopAsync();
+
+ A.CallTo(() => eventConsumer.On(envelope))
+ .MustHaveHappened();
+
+ A.CallTo(() => eventConsumerInfoRepository.SetPositionAsync(consumerName, @event.EventPosition, false))
+ .MustHaveHappened();
+ }
+
+ [Fact]
+ public async Task Should_not_invoke_and_update_position_when_event_is_from_another_subscription()
+ {
+ var @event = new StoredEvent(Guid.NewGuid().ToString(), 123, eventData);
+
+ await SubscribeAsync();
+
+ await sut.SendAsync(new ReceiveEventMessage { Event = @event });
+ await sut.StopAsync();
+
+ A.CallTo(() => eventConsumer.On(envelope))
+ .MustNotHaveHappened();
+
+ A.CallTo(() => eventConsumerInfoRepository.SetPositionAsync(consumerName, @event.EventPosition, false))
+ .MustNotHaveHappened();
+ }
+
+ [Fact]
+ public async Task Should_stop_if_resetting_failed()
+ {
+ var exception = new InvalidOperationException("Exception");
+
+ A.CallTo(() => eventConsumer.ClearAsync())
+ .Throws(exception);
+
+ var @event = new StoredEvent(Guid.NewGuid().ToString(), 123, eventData);
+
+ await SubscribeAsync();
+
+ await sut.SendAsync(new ResetConsumerMessage());
+ await sut.StopAsync();
+
+ A.CallTo(() => eventConsumerInfoRepository.StopAsync(consumerName, exception.Message))
+ .MustHaveHappened();
+ }
+
+ [Fact]
+ public async Task Should_stop_if_handling_failed()
+ {
+ var exception = new InvalidOperationException("Exception");
+
+ A.CallTo(() => eventConsumer.On(envelope))
+ .Throws(exception);
+
+ var @event = new StoredEvent(Guid.NewGuid().ToString(), 123, eventData);
+
+ await SubscribeAsync();
+
+ await sut.SendAsync(new ReceiveEventMessage { Event = @event, Source = eventSubscription });
+ await sut.StopAsync();
+
+ A.CallTo(() => eventConsumer.On(envelope))
+ .MustHaveHappened();
+
+ A.CallTo(() => eventConsumerInfoRepository.SetPositionAsync(consumerName, @event.EventPosition, false))
+ .MustNotHaveHappened();
+
+ A.CallTo(() => eventConsumerInfoRepository.StopAsync(consumerName, exception.Message))
+ .MustHaveHappened();
+ }
+
+ [Fact]
+ public async Task Should_stop_if_deserialization_failed()
+ {
+ var exception = new InvalidOperationException("Exception");
+
+ A.CallTo(() => formatter.Parse(eventData, true))
+ .Throws(exception);
+
+ var @event = new StoredEvent(Guid.NewGuid().ToString(), 123, eventData);
+
+ await SubscribeAsync();
+
+ await sut.SendAsync(new ReceiveEventMessage { Event = @event, Source = eventSubscription });
+ await sut.StopAsync();
+
+ A.CallTo(() => eventConsumer.On(envelope))
+ .MustNotHaveHappened();
+
+ A.CallTo(() => eventConsumerInfoRepository.SetPositionAsync(consumerName, @event.EventPosition, false))
+ .MustNotHaveHappened();
+
+ A.CallTo(() => eventConsumerInfoRepository.StopAsync(consumerName, exception.Message))
+ .MustHaveHappened();
+ }
+
+ private async Task SubscribeAsync()
+ {
+ await sut.SubscribeAsync(eventConsumer);
+
+ await Task.Delay(200);
+ }
+ }
+}
\ No newline at end of file