Browse Source

Refactorings and some tests

pull/131/head
Sebastian Stehle 9 years ago
parent
commit
bbc16bb226
  1. 63
      src/Squidex.Infrastructure.MongoDb/CQRS/Events/PollingSubscription.cs
  2. 42
      src/Squidex.Infrastructure/Actors/Actor.cs
  3. 10
      src/Squidex.Infrastructure/Actors/DefaultRemoteActorChannel.cs
  4. 10
      src/Squidex.Infrastructure/Actors/IActor.cs
  5. 10
      src/Squidex.Infrastructure/Actors/IActors.cs
  6. 10
      src/Squidex.Infrastructure/Actors/IRemoteActorChannel.cs
  7. 10
      src/Squidex.Infrastructure/Actors/RemoteActors.cs
  8. 10
      src/Squidex.Infrastructure/CQRS/Events/Actors/EventConsumerActor.cs
  9. 10
      src/Squidex.Infrastructure/CQRS/Events/Actors/Messages/ReceiveEventMessage.cs
  10. 17
      src/Squidex.Infrastructure/CQRS/Events/Actors/Messages/ResetConsumerMessage.cs
  11. 9
      src/Squidex.Infrastructure/CQRS/Events/Actors/Messages/ResetReceiverMessage.cs
  12. 17
      src/Squidex.Infrastructure/CQRS/Events/Actors/Messages/StartConsumerMessage.cs
  13. 9
      src/Squidex.Infrastructure/CQRS/Events/Actors/Messages/StartReceiverMessage.cs
  14. 19
      src/Squidex.Infrastructure/CQRS/Events/Actors/Messages/StopConsumerMessage.cs
  15. 11
      src/Squidex.Infrastructure/CQRS/Events/Actors/Messages/StopReceiverMessage.cs
  16. 10
      src/Squidex.Infrastructure/CQRS/Events/Actors/Messages/SubscribeMessage.cs
  17. 3
      src/Squidex/Config/Domain/EventStoreModule.cs
  18. 14
      src/Squidex/Config/Domain/InfrastructureModule.cs
  19. 12
      src/Squidex/Config/Domain/Usages.cs
  20. 28
      src/Squidex/Controllers/Api/EventConsumers/EventConsumersController.cs
  21. 2
      src/Squidex/app/features/administration/administration-area.component.html
  22. 2
      tests/Benchmarks/Properties/launchSettings.json
  23. 1
      tests/Benchmarks/Tests/AppendToEventStore.cs
  24. 1
      tests/Benchmarks/Tests/AppendToEventStoreWithManyWriters.cs
  25. 11
      tests/Benchmarks/Tests/HandleEvents.cs
  26. 10
      tests/Benchmarks/Tests/HandleEventsWithManyWriters.cs
  27. 6
      tests/Benchmarks/Utils/Helper.cs
  28. 82
      tests/Squidex.Infrastructure.Tests/Actors/ActorRemoteTests.cs
  29. 130
      tests/Squidex.Infrastructure.Tests/Actors/ActorTests.cs
  30. 17
      tests/Squidex.Infrastructure.Tests/CQRS/Events/DefaultEventNotifierTests.cs
  31. 216
      tests/Squidex.Infrastructure.Tests/CQRS/Events/EventReceiverTests.cs

63
src/Squidex.Infrastructure.MongoDb/CQRS/Events/PollingSubscription.cs

@ -22,24 +22,20 @@ namespace Squidex.Infrastructure.CQRS.Events
{ {
private readonly IEventNotifier eventNotifier; private readonly IEventNotifier eventNotifier;
private readonly MongoEventStore eventStore; private readonly MongoEventStore eventStore;
private CancellationTokenSource cancelPolling; private readonly CancellationTokenSource pollStop = new CancellationTokenSource();
private Timer pollTimer;
private Regex streamRegex; private Regex streamRegex;
private Guid subscription;
private string streamFilter; private string streamFilter;
private string position; private string position;
private bool isPolling;
private IDisposable pollSubscription; private IDisposable pollSubscription;
private IActor parent; private IActor parent;
private sealed class PollMessage : IMessage private sealed class StartPollMessage : IMessage
{ {
} }
private sealed class ReceiveMongoEventMessage : IMessage private sealed class StopPollMessage : IMessage
{ {
public StoredEvent Event;
public Guid Subscription;
} }
public PollingSubscription(MongoEventStore eventStore, IEventNotifier eventNotifier) public PollingSubscription(MongoEventStore eventStore, IEventNotifier eventNotifier)
@ -50,9 +46,7 @@ namespace Squidex.Infrastructure.CQRS.Events
protected override Task OnStop() protected override Task OnStop()
{ {
cancelPolling?.Cancel(); pollStop?.Cancel();
pollTimer?.Dispose();
pollSubscription?.Dispose(); pollSubscription?.Dispose();
parent = null; parent = null;
@ -86,57 +80,54 @@ namespace Squidex.Infrastructure.CQRS.Events
{ {
if (streamRegex.IsMatch(streamName)) if (streamRegex.IsMatch(streamName))
{ {
SendAsync(new PollMessage()).Forget(); SendAsync(new StartPollMessage()).Forget();
} }
}); });
pollTimer = new Timer(d => SendAsync(new StartPollMessage()).Forget();
break;
}
case StartPollMessage poll when parent != null:
{
if (!isPolling)
{ {
SendAsync(new PollMessage()).Forget(); isPolling = true;
});
pollTimer.Change(0, 5000); PollAsync().Forget();
}
break; break;
} }
case PollMessage poll when parent != null: case StopPollMessage poll when parent != null:
{ {
cancelPolling?.Cancel(); isPolling = false;
cancelPolling = new CancellationTokenSource();
subscription = Guid.NewGuid();
PollAsync(subscription, cancelPolling.Token).Forget(); Task.Delay(5000).ContinueWith(t => SendAsync(new StartPollMessage())).Forget();
break; break;
} }
case ReceiveMongoEventMessage receiveEvent when parent != null: case ReceiveEventMessage receiveEvent when parent != null:
{ {
if (receiveEvent.Subscription == subscription) await parent.SendAsync(receiveEvent);
{
await parent.SendAsync(new ReceiveEventMessage { Event = receiveEvent.Event, Source = this });
position = receiveEvent.Event.EventPosition; position = receiveEvent.Event.EventPosition;
}
break; break;
} }
} }
} }
private async Task PollAsync(Guid subscriptionId, CancellationToken ct) private async Task PollAsync()
{ {
try try
{ {
await eventStore.GetEventsAsync(async e => await eventStore.GetEventsAsync(e => SendAsync(new ReceiveEventMessage { Event = e, Source = this }), pollStop.Token, streamFilter, position);
{
if (ct.IsCancellationRequested == true) await SendAsync(new StopPollMessage());
{
await SendAsync(new ReceiveMongoEventMessage { Event = e, Subscription = subscriptionId });
}
}, ct, streamFilter, position);
} }
catch (Exception ex) when (!(ex is OperationCanceledException)) catch (Exception ex) when (!(ex is OperationCanceledException))
{ {

42
src/Squidex.Infrastructure/Actors/Actor.cs

@ -18,6 +18,7 @@ namespace Squidex.Infrastructure.Actors
public abstract class Actor : IActor, IDisposable public abstract class Actor : IActor, IDisposable
{ {
private readonly ActionBlock<IMessage> block; private readonly ActionBlock<IMessage> block;
private bool isStopped;
private sealed class StopMessage : IMessage private sealed class StopMessage : IMessage
{ {
@ -30,7 +31,7 @@ namespace Squidex.Infrastructure.Actors
protected Actor() protected Actor()
{ {
block = new ActionBlock<IMessage>(Handle, new ExecutionDataflowBlockOptions { BoundedCapacity = 10 }); block = new ActionBlock<IMessage>(Handle, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 1, BoundedCapacity = 10 });
} }
public void Dispose() public void Dispose()
@ -75,28 +76,43 @@ namespace Squidex.Infrastructure.Actors
private async Task Handle(IMessage message) private async Task Handle(IMessage message)
{ {
try if (isStopped)
{ {
if (message is StopMessage) return;
}
switch (message)
{
case StopMessage stopMessage:
{ {
isStopped = true;
block.Complete(); block.Complete();
await OnStop(); await OnStop();
break;
} }
else if (message is ErrorMessage errorMessage)
case ErrorMessage errorMessage:
{ {
await OnError(errorMessage.Exception); await OnError(errorMessage.Exception);
break;
} }
else
{ default:
await OnMessage(message);
}
}
catch (Exception ex)
{
if (!(message is ErrorMessage))
{ {
await block.SendAsync(new ErrorMessage { Exception = ex }); try
{
await OnMessage(message);
}
catch (Exception ex)
{
await OnError(ex);
}
break;
} }
} }
} }

10
src/Squidex.Infrastructure/Actors/DefaultRemoteActorChannel.cs

@ -1,4 +1,12 @@
using System; // ==========================================================================
// DefaultRemoteActorChannel.cs
// Squidex Headless CMS
// ==========================================================================
// Copyright (c) Squidex Group
// All rights reserved.
// ==========================================================================
using System;
using System.Threading.Tasks; using System.Threading.Tasks;
using Newtonsoft.Json; using Newtonsoft.Json;
using Newtonsoft.Json.Linq; using Newtonsoft.Json.Linq;

10
src/Squidex.Infrastructure/Actors/IActor.cs

@ -1,4 +1,12 @@
using System; // ==========================================================================
// IActor.cs
// Squidex Headless CMS
// ==========================================================================
// Copyright (c) Squidex Group
// All rights reserved.
// ==========================================================================
using System;
using System.Threading.Tasks; using System.Threading.Tasks;
namespace Squidex.Infrastructure.Actors namespace Squidex.Infrastructure.Actors

10
src/Squidex.Infrastructure/Actors/IActors.cs

@ -1,4 +1,12 @@
namespace Squidex.Infrastructure.Actors // ==========================================================================
// IActors.cs
// Squidex Headless CMS
// ==========================================================================
// Copyright (c) Squidex Group
// All rights reserved.
// ==========================================================================
namespace Squidex.Infrastructure.Actors
{ {
public interface IActors public interface IActors
{ {

10
src/Squidex.Infrastructure/Actors/IRemoteActorChannel.cs

@ -1,4 +1,12 @@
using System; // ==========================================================================
// IRemoteActorChannel.cs
// Squidex Headless CMS
// ==========================================================================
// Copyright (c) Squidex Group
// All rights reserved.
// ==========================================================================
using System;
using System.Threading.Tasks; using System.Threading.Tasks;
namespace Squidex.Infrastructure.Actors namespace Squidex.Infrastructure.Actors

10
src/Squidex.Infrastructure/Actors/RemoteActors.cs

@ -1,4 +1,12 @@
using System; // ==========================================================================
// RemoteActors.cs
// Squidex Headless CMS
// ==========================================================================
// Copyright (c) Squidex Group
// All rights reserved.
// ==========================================================================
using System;
using System.Collections.Concurrent; using System.Collections.Concurrent;
using System.Threading.Tasks; using System.Threading.Tasks;
using Squidex.Infrastructure.Tasks; using Squidex.Infrastructure.Tasks;

10
src/Squidex.Infrastructure/CQRS/Events/Actors/EventConsumerActor.cs

@ -48,6 +48,8 @@ namespace Squidex.Infrastructure.CQRS.Events.Actors
Guard.NotNull(eventConsumer, nameof(eventConsumer)); Guard.NotNull(eventConsumer, nameof(eventConsumer));
this.eventConsumer = eventConsumer; this.eventConsumer = eventConsumer;
SendAsync(new StartConsumerMessage());
} }
protected override async Task OnStop() protected override async Task OnStop()
@ -67,21 +69,21 @@ namespace Squidex.Infrastructure.CQRS.Events.Actors
{ {
switch (message) switch (message)
{ {
case StopReceiverMessage stopReceiver: case StopConsumerMessage stopConsumer:
{ {
await StopAsync(stopReceiver.Exception); await StopAsync(stopConsumer.Exception);
break; break;
} }
case StartReceiverMessage startReceiver: case StartConsumerMessage startConsumer:
{ {
await StartAsync(); await StartAsync();
break; break;
} }
case ResetReceiverMessage resetReceiver: case ResetConsumerMessage resetConsumer:
{ {
await StopAsync(); await StopAsync();
await ResetAsync(); await ResetAsync();

10
src/Squidex.Infrastructure/CQRS/Events/Actors/Messages/ReceiveEventMessage.cs

@ -1,4 +1,12 @@
using Squidex.Infrastructure.Actors; // ==========================================================================
// ReceiveEventMessage.cs
// Squidex Headless CMS
// ==========================================================================
// Copyright (c) Squidex Group
// All rights reserved.
// ==========================================================================
using Squidex.Infrastructure.Actors;
namespace Squidex.Infrastructure.CQRS.Events.Actors.Messages namespace Squidex.Infrastructure.CQRS.Events.Actors.Messages
{ {

17
src/Squidex.Infrastructure/CQRS/Events/Actors/Messages/ResetConsumerMessage.cs

@ -0,0 +1,17 @@
// ==========================================================================
// ResetConsumerMessage.cs
// Squidex Headless CMS
// ==========================================================================
// Copyright (c) Squidex Group
// All rights reserved.
// ==========================================================================
using Squidex.Infrastructure.Actors;
namespace Squidex.Infrastructure.CQRS.Events.Actors.Messages
{
[TypeName(nameof(ResetConsumerMessage))]
public sealed class ResetConsumerMessage : IMessage
{
}
}

9
src/Squidex.Infrastructure/CQRS/Events/Actors/Messages/ResetReceiverMessage.cs

@ -1,9 +0,0 @@
using Squidex.Infrastructure.Actors;
namespace Squidex.Infrastructure.CQRS.Events.Actors.Messages
{
[TypeName(nameof(ResetReceiverMessage))]
public sealed class ResetReceiverMessage : IMessage
{
}
}

17
src/Squidex.Infrastructure/CQRS/Events/Actors/Messages/StartConsumerMessage.cs

@ -0,0 +1,17 @@
// ==========================================================================
// StartConsumerMessage.cs
// Squidex Headless CMS
// ==========================================================================
// Copyright (c) Squidex Group
// All rights reserved.
// ==========================================================================
using Squidex.Infrastructure.Actors;
namespace Squidex.Infrastructure.CQRS.Events.Actors.Messages
{
[TypeName(nameof(StartConsumerMessage))]
public sealed class StartConsumerMessage : IMessage
{
}
}

9
src/Squidex.Infrastructure/CQRS/Events/Actors/Messages/StartReceiverMessage.cs

@ -1,9 +0,0 @@
using Squidex.Infrastructure.Actors;
namespace Squidex.Infrastructure.CQRS.Events.Actors.Messages
{
[TypeName(nameof(StartReceiverMessage))]
public sealed class StartReceiverMessage : IMessage
{
}
}

19
src/Squidex.Infrastructure/CQRS/Events/Actors/Messages/StopConsumerMessage.cs

@ -0,0 +1,19 @@
// ==========================================================================
// StopConsumerMessage.cs
// Squidex Headless CMS
// ==========================================================================
// Copyright (c) Squidex Group
// All rights reserved.
// ==========================================================================
using System;
using Squidex.Infrastructure.Actors;
namespace Squidex.Infrastructure.CQRS.Events.Actors.Messages
{
[TypeName(nameof(StopConsumerMessage))]
public sealed class StopConsumerMessage : IMessage
{
public Exception Exception { get; set; }
}
}

11
src/Squidex.Infrastructure/CQRS/Events/Actors/Messages/StopReceiverMessage.cs

@ -1,11 +0,0 @@
using System;
using Squidex.Infrastructure.Actors;
namespace Squidex.Infrastructure.CQRS.Events.Actors.Messages
{
[TypeName(nameof(StopReceiverMessage))]
public sealed class StopReceiverMessage : IMessage
{
public Exception Exception { get; set; }
}
}

10
src/Squidex.Infrastructure/CQRS/Events/Actors/Messages/SubscribeMessage.cs

@ -1,4 +1,12 @@
using Squidex.Infrastructure.Actors; // ==========================================================================
// SubscribeMessage.cs
// Squidex Headless CMS
// ==========================================================================
// Copyright (c) Squidex Group
// All rights reserved.
// ==========================================================================
using Squidex.Infrastructure.Actors;
namespace Squidex.Infrastructure.CQRS.Events.Actors.Messages namespace Squidex.Infrastructure.CQRS.Events.Actors.Messages
{ {

3
src/Squidex/Config/Domain/EventStoreModule.cs

@ -14,6 +14,7 @@ using Microsoft.Extensions.Configuration;
using MongoDB.Driver; using MongoDB.Driver;
using Squidex.Infrastructure; using Squidex.Infrastructure;
using Squidex.Infrastructure.CQRS.Events; using Squidex.Infrastructure.CQRS.Events;
using Squidex.Infrastructure.CQRS.Events.Actors;
namespace Squidex.Config.Domain namespace Squidex.Config.Domain
{ {
@ -35,7 +36,7 @@ namespace Squidex.Config.Domain
if (consumeEvents) if (consumeEvents)
{ {
builder.RegisterType<EventReceiver>() builder.RegisterType<EventConsumerActor>()
.AsSelf() .AsSelf()
.InstancePerDependency(); .InstancePerDependency();
} }

14
src/Squidex/Config/Domain/InfrastructureModule.cs

@ -18,6 +18,7 @@ using NodaTime;
using Squidex.Domain.Apps.Core.Schemas; using Squidex.Domain.Apps.Core.Schemas;
using Squidex.Domain.Apps.Core.Schemas.Json; using Squidex.Domain.Apps.Core.Schemas.Json;
using Squidex.Infrastructure; using Squidex.Infrastructure;
using Squidex.Infrastructure.Actors;
using Squidex.Infrastructure.Assets; using Squidex.Infrastructure.Assets;
using Squidex.Infrastructure.Assets.ImageSharp; using Squidex.Infrastructure.Assets.ImageSharp;
using Squidex.Infrastructure.Caching; using Squidex.Infrastructure.Caching;
@ -131,6 +132,15 @@ namespace Squidex.Config.Domain
.As<IAssetThumbnailGenerator>() .As<IAssetThumbnailGenerator>()
.SingleInstance(); .SingleInstance();
builder.Register(c => new InvalidatingMemoryCache(new MemoryCache(c.Resolve<IOptions<MemoryCacheOptions>>()), c.Resolve<IPubSub>()))
.As<IMemoryCache>()
.SingleInstance();
builder.RegisterType<RemoteActors>()
.As<IActors>()
.AsSelf()
.SingleInstance();
builder.RegisterType<EventConsumerCleaner>() builder.RegisterType<EventConsumerCleaner>()
.AsSelf() .AsSelf()
.SingleInstance(); .SingleInstance();
@ -146,10 +156,6 @@ namespace Squidex.Config.Domain
builder.RegisterType<FieldRegistry>() builder.RegisterType<FieldRegistry>()
.AsSelf() .AsSelf()
.SingleInstance(); .SingleInstance();
builder.Register(c => new InvalidatingMemoryCache(new MemoryCache(c.Resolve<IOptions<MemoryCacheOptions>>()), c.Resolve<IPubSub>()))
.As<IMemoryCache>()
.SingleInstance();
} }
} }
} }

12
src/Squidex/Config/Domain/Usages.cs

@ -10,7 +10,9 @@ using System.Collections.Generic;
using Microsoft.AspNetCore.Builder; using Microsoft.AspNetCore.Builder;
using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.DependencyInjection;
using Squidex.Infrastructure; using Squidex.Infrastructure;
using Squidex.Infrastructure.Actors;
using Squidex.Infrastructure.CQRS.Events; using Squidex.Infrastructure.CQRS.Events;
using Squidex.Infrastructure.CQRS.Events.Actors;
namespace Squidex.Config.Domain namespace Squidex.Config.Domain
{ {
@ -20,13 +22,15 @@ namespace Squidex.Config.Domain
{ {
app.ApplicationServices.GetService<EventConsumerCleaner>().CleanAsync().Wait(); app.ApplicationServices.GetService<EventConsumerCleaner>().CleanAsync().Wait();
var catchConsumers = app.ApplicationServices.GetServices<IEventConsumer>(); var consumers = app.ApplicationServices.GetServices<IEventConsumer>();
foreach (var catchConsumer in catchConsumers) foreach (var consumer in consumers)
{ {
var receiver = app.ApplicationServices.GetService<EventReceiver>(); var actor = app.ApplicationServices.GetService<EventConsumerActor>();
receiver?.Subscribe(catchConsumer); actor?.Subscribe(consumer);
app.ApplicationServices.GetService<RemoteActors>().Connect(consumer.Name, actor);
} }
return app; return app;

28
src/Squidex/Controllers/Api/EventConsumers/EventConsumersController.cs

@ -11,7 +11,9 @@ using System.Threading.Tasks;
using Microsoft.AspNetCore.Mvc; using Microsoft.AspNetCore.Mvc;
using NSwag.Annotations; using NSwag.Annotations;
using Squidex.Controllers.Api.EventConsumers.Models; using Squidex.Controllers.Api.EventConsumers.Models;
using Squidex.Infrastructure.Actors;
using Squidex.Infrastructure.CQRS.Events; using Squidex.Infrastructure.CQRS.Events;
using Squidex.Infrastructure.CQRS.Events.Actors.Messages;
using Squidex.Infrastructure.Reflection; using Squidex.Infrastructure.Reflection;
using Squidex.Pipeline; using Squidex.Pipeline;
@ -23,10 +25,13 @@ namespace Squidex.Controllers.Api.EventConsumers
public sealed class EventConsumersController : Controller public sealed class EventConsumersController : Controller
{ {
private readonly IEventConsumerInfoRepository eventConsumerRepository; private readonly IEventConsumerInfoRepository eventConsumerRepository;
private readonly IActors actors;
public EventConsumersController(IEventConsumerInfoRepository eventConsumerRepository) public EventConsumersController(IEventConsumerInfoRepository eventConsumerRepository, IActors actors)
{ {
this.eventConsumerRepository = eventConsumerRepository; this.eventConsumerRepository = eventConsumerRepository;
this.actors = actors;
} }
[HttpGet] [HttpGet]
@ -46,7 +51,12 @@ namespace Squidex.Controllers.Api.EventConsumers
[ApiCosts(0)] [ApiCosts(0)]
public async Task<IActionResult> Start(string name) public async Task<IActionResult> Start(string name)
{ {
await eventConsumerRepository.StartAsync(name); var actor = actors.Get(name);
if (actor != null)
{
await actor.SendAsync(new StartConsumerMessage());
}
return NoContent(); return NoContent();
} }
@ -56,7 +66,12 @@ namespace Squidex.Controllers.Api.EventConsumers
[ApiCosts(0)] [ApiCosts(0)]
public async Task<IActionResult> Stop(string name) public async Task<IActionResult> Stop(string name)
{ {
await eventConsumerRepository.StopAsync(name); var actor = actors.Get(name);
if (actor != null)
{
await actor.SendAsync(new StopConsumerMessage());
}
return NoContent(); return NoContent();
} }
@ -66,7 +81,12 @@ namespace Squidex.Controllers.Api.EventConsumers
[ApiCosts(0)] [ApiCosts(0)]
public async Task<IActionResult> Reset(string name) public async Task<IActionResult> Reset(string name)
{ {
await eventConsumerRepository.ResetAsync(name); var actor = actors.Get(name);
if (actor != null)
{
await actor.SendAsync(new ResetConsumerMessage());
}
return NoContent(); return NoContent();
} }

2
src/Squidex/app/features/administration/administration-area.component.html

@ -1,3 +1,5 @@
<sqx-title message="Administration"></sqx-title>
<div class="sidebar"> <div class="sidebar">
<ul class="nav nav-panel flex-column"> <ul class="nav nav-panel flex-column">
<li class="nav-item"> <li class="nav-item">

2
tests/Benchmarks/Properties/launchSettings.json

@ -2,7 +2,7 @@
"profiles": { "profiles": {
"Benchmarks": { "Benchmarks": {
"commandName": "Project", "commandName": "Project",
"commandLineArgs": "appendToEventStoreParallel" "commandLineArgs": "handleEvents"
} }
} }
} }

1
tests/Benchmarks/Tests/AppendToEventStore.cs

@ -40,7 +40,6 @@ namespace Benchmarks.Tests
mongoDatabase = mongoClient.GetDatabase(Guid.NewGuid().ToString()); mongoDatabase = mongoClient.GetDatabase(Guid.NewGuid().ToString());
eventStore = new MongoEventStore(mongoDatabase, new DefaultEventNotifier(new InMemoryPubSub())); eventStore = new MongoEventStore(mongoDatabase, new DefaultEventNotifier(new InMemoryPubSub()));
eventStore.Warmup();
} }
public long Run() public long Run()

1
tests/Benchmarks/Tests/AppendToEventStoreWithManyWriters.cs

@ -41,7 +41,6 @@ namespace Benchmarks.Tests
mongoDatabase = mongoClient.GetDatabase(Guid.NewGuid().ToString()); mongoDatabase = mongoClient.GetDatabase(Guid.NewGuid().ToString());
eventStore = new MongoEventStore(mongoDatabase, new DefaultEventNotifier(new InMemoryPubSub())); eventStore = new MongoEventStore(mongoDatabase, new DefaultEventNotifier(new InMemoryPubSub()));
eventStore.Warmup();
} }
public long Run() public long Run()

11
tests/Benchmarks/Tests/HandleEvents.cs

@ -8,11 +8,11 @@
using System; using System;
using Benchmarks.Tests.TestData; using Benchmarks.Tests.TestData;
using Benchmarks.Utils;
using MongoDB.Driver; using MongoDB.Driver;
using Newtonsoft.Json; using Newtonsoft.Json;
using Squidex.Infrastructure; using Squidex.Infrastructure;
using Squidex.Infrastructure.CQRS.Events; using Squidex.Infrastructure.CQRS.Events;
using Squidex.Infrastructure.CQRS.Events.Actors;
using Squidex.Infrastructure.Json; using Squidex.Infrastructure.Json;
using Squidex.Infrastructure.Log; using Squidex.Infrastructure.Log;
@ -29,7 +29,7 @@ namespace Benchmarks.Tests
private IEventStore eventStore; private IEventStore eventStore;
private IEventNotifier eventNotifier; private IEventNotifier eventNotifier;
private IEventConsumerInfoRepository eventConsumerInfos; private IEventConsumerInfoRepository eventConsumerInfos;
private EventReceiver eventReceiver; private EventConsumerActor eventConsumerActor;
private MyEventConsumer eventConsumer; private MyEventConsumer eventConsumer;
public string Id public string Id
@ -65,10 +65,9 @@ namespace Benchmarks.Tests
eventNotifier = new DefaultEventNotifier(new InMemoryPubSub()); eventNotifier = new DefaultEventNotifier(new InMemoryPubSub());
eventStore = new MongoEventStore(mongoDatabase, eventNotifier); eventStore = new MongoEventStore(mongoDatabase, eventNotifier);
eventStore.Warmup();
eventReceiver = new EventReceiver(formatter, eventStore, eventConsumerInfos, log); eventConsumerActor = new EventConsumerActor(formatter, eventStore, eventConsumerInfos, log);
eventReceiver.Subscribe(eventConsumer); eventConsumerActor.Subscribe(eventConsumer);
} }
public long Run() public long Run()
@ -91,7 +90,7 @@ namespace Benchmarks.Tests
{ {
mongoClient.DropDatabase(mongoDatabase.DatabaseNamespace.DatabaseName); mongoClient.DropDatabase(mongoDatabase.DatabaseNamespace.DatabaseName);
eventReceiver.Dispose(); eventConsumerActor.Dispose();
} }
public void Cleanup() public void Cleanup()

10
tests/Benchmarks/Tests/HandleEventsWithManyWriters.cs

@ -14,6 +14,7 @@ using MongoDB.Driver;
using Newtonsoft.Json; using Newtonsoft.Json;
using Squidex.Infrastructure; using Squidex.Infrastructure;
using Squidex.Infrastructure.CQRS.Events; using Squidex.Infrastructure.CQRS.Events;
using Squidex.Infrastructure.CQRS.Events.Actors;
using Squidex.Infrastructure.Json; using Squidex.Infrastructure.Json;
using Squidex.Infrastructure.Log; using Squidex.Infrastructure.Log;
@ -31,7 +32,7 @@ namespace Benchmarks.Tests
private IEventStore eventStore; private IEventStore eventStore;
private IEventNotifier eventNotifier; private IEventNotifier eventNotifier;
private IEventConsumerInfoRepository eventConsumerInfos; private IEventConsumerInfoRepository eventConsumerInfos;
private EventReceiver eventReceiver; private EventConsumerActor eventConsumerActor;
private MyEventConsumer eventConsumer; private MyEventConsumer eventConsumer;
public string Id public string Id
@ -67,10 +68,9 @@ namespace Benchmarks.Tests
eventNotifier = new DefaultEventNotifier(new InMemoryPubSub()); eventNotifier = new DefaultEventNotifier(new InMemoryPubSub());
eventStore = new MongoEventStore(mongoDatabase, eventNotifier); eventStore = new MongoEventStore(mongoDatabase, eventNotifier);
eventStore.Warmup();
eventReceiver = new EventReceiver(formatter, eventStore, eventConsumerInfos, log); eventConsumerActor = new EventConsumerActor(formatter, eventStore, eventConsumerInfos, log);
eventReceiver.Subscribe(eventConsumer); eventConsumerActor.Subscribe(eventConsumer);
} }
public long Run() public long Run()
@ -98,7 +98,7 @@ namespace Benchmarks.Tests
{ {
mongoClient.DropDatabase(mongoDatabase.DatabaseNamespace.DatabaseName); mongoClient.DropDatabase(mongoDatabase.DatabaseNamespace.DatabaseName);
eventReceiver.Dispose(); eventConsumerActor.Dispose();
} }
public void Cleanup() public void Cleanup()

6
tests/Benchmarks/Utils/Helper.cs

@ -7,7 +7,6 @@
// ========================================================================== // ==========================================================================
using System; using System;
using System.Collections.Generic;
using Squidex.Infrastructure.CQRS.Events; using Squidex.Infrastructure.CQRS.Events;
namespace Benchmarks.Utils namespace Benchmarks.Utils
@ -18,10 +17,5 @@ namespace Benchmarks.Utils
{ {
return new EventData { EventId = Guid.NewGuid(), Metadata = "EventMetdata", Payload = "EventPayload", Type = "MyEvent" }; return new EventData { EventId = Guid.NewGuid(), Metadata = "EventMetdata", Payload = "EventPayload", Type = "MyEvent" };
} }
public static void Warmup(this IEventStore eventStore)
{
eventStore.AppendEventsAsync(Guid.NewGuid(), "my-stream", new List<EventData> { CreateEventData() }).Wait();
}
} }
} }

82
tests/Squidex.Infrastructure.Tests/Actors/ActorRemoteTests.cs

@ -0,0 +1,82 @@
// ==========================================================================
// ActorRemoteTests.cs
// Squidex Headless CMS
// ==========================================================================
// Copyright (c) Squidex Group
// All rights reserved.
// ==========================================================================
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using FluentAssertions;
using Squidex.Infrastructure.Tasks;
using Xunit;
namespace Squidex.Infrastructure.Actors
{
public class ActorRemoteTests
{
[TypeName(nameof(SuccessMessage))]
public class SuccessMessage : IMessage
{
public int Counter { get; set; }
}
private sealed class MyActor : Actor
{
public List<IMessage> Invokes { get; } = new List<IMessage>();
protected override Task OnMessage(IMessage message)
{
Invokes.Add(message);
return TaskHelper.Done;
}
}
private readonly MyActor actor = new MyActor();
private readonly TypeNameRegistry registry = new TypeNameRegistry();
private readonly RemoteActors actors;
private readonly IActor remoteActor;
public ActorRemoteTests()
{
registry.Map(typeof(SuccessMessage));
actors = new RemoteActors(new DefaultRemoteActorChannel(new InMemoryPubSub(), registry));
actors.Connect("my", actor);
remoteActor = actors.Get("my");
}
[Fact]
public void Should_throw_exception_when_stopping_remote_actor()
{
Assert.Throws<NotSupportedException>(() => remoteActor.StopAsync().Forget());
}
[Fact]
public void Should_throw_exception_when_sending_exception_to_remote_actor()
{
Assert.Throws<NotSupportedException>(() => remoteActor.SendAsync(new InvalidOperationException()).Forget());
}
[Fact]
public async Task Should_handle_messages_sequentially()
{
remoteActor.SendAsync(new SuccessMessage { Counter = 1 }).Forget();
remoteActor.SendAsync(new SuccessMessage { Counter = 2 }).Forget();
remoteActor.SendAsync(new SuccessMessage { Counter = 3 }).Forget();
await actor.StopAsync();
actor.Invokes.ShouldBeEquivalentTo(new List<object>
{
new SuccessMessage { Counter = 1 },
new SuccessMessage { Counter = 2 },
new SuccessMessage { Counter = 3 }
});
}
}
}

130
tests/Squidex.Infrastructure.Tests/Actors/ActorTests.cs

@ -0,0 +1,130 @@
// ==========================================================================
// ActorTests.cs
// Squidex Headless CMS
// ==========================================================================
// Copyright (c) Squidex Group
// All rights reserved.
// ==========================================================================
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using FluentAssertions;
using Squidex.Infrastructure.Tasks;
using Xunit;
namespace Squidex.Infrastructure.Actors
{
public class ActorTests
{
public class SuccessMessage : IMessage
{
public int Counter { get; set; }
}
public class FailedMessage : IMessage
{
}
private sealed class MyActor : Actor
{
public List<object> Invokes { get; } = new List<object>();
protected override Task OnStop()
{
Invokes.Add(true);
return TaskHelper.Done;
}
protected override Task OnError(Exception exception)
{
Invokes.Add(exception);
return TaskHelper.Done;
}
protected override Task OnMessage(IMessage message)
{
if (message is FailedMessage)
{
throw new InvalidOperationException();
}
Invokes.Add(message);
return TaskHelper.Done;
}
}
private readonly MyActor sut = new MyActor();
[Fact]
public async Task Should_invoke_with_exception()
{
sut.SendAsync(new InvalidOperationException()).Forget();
await sut.StopAsync();
Assert.True(sut.Invokes[0] is InvalidOperationException);
}
[Fact]
public async Task Should_handle_messages_sequentially()
{
sut.SendAsync(new SuccessMessage { Counter = 1 }).Forget();
sut.SendAsync(new SuccessMessage { Counter = 2 }).Forget();
sut.SendAsync(new SuccessMessage { Counter = 3 }).Forget();
await sut.StopAsync();
sut.Invokes.ShouldBeEquivalentTo(new List<object>
{
new SuccessMessage { Counter = 1 },
new SuccessMessage { Counter = 2 },
new SuccessMessage { Counter = 3 },
true
});
}
[Fact]
public async Task Should_raise_error_event_when_event_handling_failed()
{
sut.SendAsync(new FailedMessage()).Forget();
sut.SendAsync(new SuccessMessage { Counter = 2 }).Forget();
sut.SendAsync(new SuccessMessage { Counter = 3 }).Forget();
await sut.StopAsync();
Assert.True(sut.Invokes[0] is InvalidOperationException);
sut.Invokes.Skip(1).ShouldBeEquivalentTo(new List<object>
{
new SuccessMessage { Counter = 2 },
new SuccessMessage { Counter = 3 },
true
});
}
[Fact]
public async Task Should_not_handle_messages_after_stop()
{
sut.SendAsync(new SuccessMessage { Counter = 1 }).Forget();
sut.StopAsync().Forget();
sut.SendAsync(new SuccessMessage { Counter = 2 }).Forget();
sut.SendAsync(new SuccessMessage { Counter = 3 }).Forget();
sut.SendAsync(new InvalidOperationException()).Forget();
await sut.StopAsync();
sut.Invokes.ShouldBeEquivalentTo(new List<object>
{
new SuccessMessage { Counter = 1 },
true
});
}
}
}

17
tests/Squidex.Infrastructure.Tests/CQRS/Events/DefaultEventNotifierTests.cs

@ -6,6 +6,7 @@
// All rights reserved. // All rights reserved.
// ========================================================================== // ==========================================================================
using System.Collections.Generic;
using Xunit; using Xunit;
namespace Squidex.Infrastructure.CQRS.Events namespace Squidex.Infrastructure.CQRS.Events
@ -20,22 +21,30 @@ namespace Squidex.Infrastructure.CQRS.Events
var handler1Handled = 0; var handler1Handled = 0;
var handler2Handled = 0; var handler2Handled = 0;
sut.Subscribe(() => var streamNames = new List<string>();
sut.Subscribe(x =>
{ {
streamNames.Add(x);
handler1Handled++; handler1Handled++;
}); });
sut.NotifyEventsStored(); sut.NotifyEventsStored("a");
sut.Subscribe(() => sut.Subscribe(x =>
{ {
streamNames.Add(x);
handler2Handled++; handler2Handled++;
}); });
sut.NotifyEventsStored(); sut.NotifyEventsStored("b");
Assert.Equal(2, handler1Handled); Assert.Equal(2, handler1Handled);
Assert.Equal(1, handler2Handled); Assert.Equal(1, handler2Handled);
Assert.Equal(streamNames.ToArray(), new[] { "a", "b", "b" });
} }
} }
} }

216
tests/Squidex.Infrastructure.Tests/CQRS/Events/EventReceiverTests.cs

@ -1,216 +0,0 @@
// ==========================================================================
// EventReceiverTests.cs
// Squidex Headless CMS
// ==========================================================================
// Copyright (c) Squidex Group
// All rights reserved.
// ==========================================================================
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using FakeItEasy;
using Squidex.Infrastructure.Log;
using Squidex.Infrastructure.Tasks;
using Xunit;
namespace Squidex.Infrastructure.CQRS.Events
{
public class EventReceiverTests
{
public sealed class MyEvent : IEvent
{
}
private sealed class MyEventConsumerInfo : IEventConsumerInfo
{
public bool IsStopped { get; set; }
public bool IsResetting { get; set; }
public string Name { get; set; }
public string Error { get; set; }
public string Position { get; set; }
}
private sealed class MyEventSubscription : IEventSubscription
{
private readonly IEnumerable<StoredEvent> storedEvents;
private bool isDisposed;
public MyEventSubscription(IEnumerable<StoredEvent> storedEvents)
{
this.storedEvents = storedEvents;
}
public async Task SubscribeAsync(Func<StoredEvent, Task> onNext, Func<Exception, Task> onError)
{
foreach (var storedEvent in storedEvents)
{
if (isDisposed)
{
break;
}
try
{
await onNext(storedEvent);
}
catch (Exception ex)
{
await onError(ex);
}
}
}
public void Dispose()
{
isDisposed = true;
}
}
private sealed class MyEventStore : IEventStore
{
private readonly IEnumerable<StoredEvent> storedEvents;
public MyEventStore(IEnumerable<StoredEvent> storedEvents)
{
this.storedEvents = storedEvents;
}
public IEventSubscription CreateSubscription(string streamFilter = null, string position = null)
{
return new MyEventSubscription(storedEvents);
}
public Task<IReadOnlyList<StoredEvent>> GetEventsAsync(string streamName)
{
throw new NotSupportedException();
}
public Task AppendEventsAsync(Guid commitId, string streamName, ICollection<EventData> events)
{
throw new NotSupportedException();
}
public Task AppendEventsAsync(Guid commitId, string streamName, int expectedVersion, ICollection<EventData> events)
{
throw new NotSupportedException();
}
}
private readonly IEventConsumerInfoRepository eventConsumerInfoRepository = A.Fake<IEventConsumerInfoRepository>();
private readonly IEventConsumer eventConsumer = A.Fake<IEventConsumer>();
private readonly ISemanticLog log = A.Fake<ISemanticLog>();
private readonly EventDataFormatter formatter = A.Fake<EventDataFormatter>();
private readonly EventData eventData1 = new EventData();
private readonly EventData eventData2 = new EventData();
private readonly EventData eventData3 = new EventData();
private readonly Envelope<IEvent> envelope1 = new Envelope<IEvent>(new MyEvent());
private readonly Envelope<IEvent> envelope2 = new Envelope<IEvent>(new MyEvent());
private readonly Envelope<IEvent> envelope3 = new Envelope<IEvent>(new MyEvent());
private readonly EventReceiver sut;
private readonly MyEventConsumerInfo consumerInfo = new MyEventConsumerInfo();
private readonly string consumerName;
public EventReceiverTests()
{
var events = new[]
{
new StoredEvent("3", 3, eventData1),
new StoredEvent("4", 4, eventData2),
new StoredEvent("5", 5, eventData3)
};
consumerName = eventConsumer.GetType().Name;
var eventStore = new MyEventStore(events);
A.CallTo(() => eventConsumer.Name).Returns(consumerName);
A.CallTo(() => eventConsumerInfoRepository.FindAsync(consumerName)).Returns(consumerInfo);
A.CallTo(() => formatter.Parse(eventData1, true)).Returns(envelope1);
A.CallTo(() => formatter.Parse(eventData2, true)).Returns(envelope2);
A.CallTo(() => formatter.Parse(eventData3, true)).Returns(envelope3);
sut = new EventReceiver(formatter, eventStore, eventConsumerInfoRepository, log);
}
[Fact]
public void Should_only_connect_once()
{
sut.Subscribe(eventConsumer);
sut.Subscribe(eventConsumer);
sut.Refresh();
sut.Dispose();
A.CallTo(() => eventConsumerInfoRepository.CreateAsync(consumerName)).MustHaveHappened();
}
[Fact]
public void Should_subscribe_to_consumer_and_handle_events()
{
consumerInfo.Position = "2";
sut.Subscribe(eventConsumer);
sut.Refresh();
sut.Dispose();
A.CallTo(() => eventConsumer.On(envelope1)).MustHaveHappened();
A.CallTo(() => eventConsumer.On(envelope2)).MustHaveHappened();
A.CallTo(() => eventConsumer.On(envelope3)).MustHaveHappened();
}
[Fact]
public void Should_abort_if_handling_failed()
{
consumerInfo.Position = "2";
A.CallTo(() => eventConsumer.On(envelope1)).Returns(TaskHelper.True);
A.CallTo(() => eventConsumer.On(envelope2)).Throws(new InvalidOperationException());
sut.Subscribe(eventConsumer);
sut.Refresh();
sut.Dispose();
A.CallTo(() => eventConsumer.On(envelope1)).MustHaveHappened();
A.CallTo(() => eventConsumer.On(envelope2)).MustHaveHappened();
A.CallTo(() => eventConsumer.On(envelope3)).MustNotHaveHappened();
A.CallTo(() => eventConsumerInfoRepository.StopAsync(consumerName, A<string>.Ignored)).MustHaveHappened();
}
[Fact]
public void Should_abort_if_serialization_failed()
{
consumerInfo.Position = "2";
A.CallTo(() => formatter.Parse(eventData2, true)).Throws(new InvalidOperationException());
sut.Subscribe(eventConsumer);
sut.Refresh();
sut.Dispose();
A.CallTo(() => eventConsumer.On(envelope1)).MustHaveHappened();
A.CallTo(() => eventConsumer.On(envelope2)).MustNotHaveHappened();
A.CallTo(() => eventConsumer.On(envelope3)).MustNotHaveHappened();
A.CallTo(() => eventConsumerInfoRepository.StopAsync(consumerName, A<string>.Ignored)).MustHaveHappened();
}
[Fact]
public void Should_reset_if_requested()
{
consumerInfo.IsResetting = true;
consumerInfo.Position = "2";
sut.Subscribe(eventConsumer);
sut.Refresh();
sut.Dispose();
A.CallTo(() => eventConsumer.On(envelope1)).MustHaveHappened();
A.CallTo(() => eventConsumer.On(envelope2)).MustHaveHappened();
A.CallTo(() => eventConsumer.On(envelope3)).MustHaveHappened();
A.CallTo(() => eventConsumer.ClearAsync()).MustHaveHappened();
}
}
}
Loading…
Cancel
Save