Browse Source

Merge pull request #135 from Squidex/feature-better-actor

Feature better actor
pull/141/head
Sebastian Stehle 8 years ago
committed by GitHub
parent
commit
94b15dff87
  1. 13
      src/Squidex.Infrastructure.GetEventStore/CQRS/Events/GetEventStore.cs
  2. 164
      src/Squidex.Infrastructure.GetEventStore/CQRS/Events/GetEventStoreSubscription.cs
  3. 5
      src/Squidex.Infrastructure.MongoDb/CQRS/Events/MongoEventConsumerInfo.cs
  4. 61
      src/Squidex.Infrastructure.MongoDb/CQRS/Events/MongoEventConsumerInfoRepository.cs
  5. 141
      src/Squidex.Infrastructure.MongoDb/CQRS/Events/PollingSubscription.cs
  6. 228
      src/Squidex.Infrastructure/CQRS/Events/Actors/EventConsumerActor.cs
  7. 2
      src/Squidex.Infrastructure/CQRS/Events/IEventConsumerInfo.cs
  8. 10
      src/Squidex.Infrastructure/CQRS/Events/IEventConsumerInfoRepository.cs
  9. 3
      src/Squidex.Infrastructure/CQRS/Events/IEventStore.cs
  10. 82
      src/Squidex.Infrastructure/CQRS/Events/PollingSubscription.cs
  11. 48
      src/Squidex.Infrastructure/RetryWindow.cs
  12. 2
      tests/Squidex.Domain.Apps.Core.Tests/ContentEnrichmentTests.cs
  13. 6
      tests/Squidex.Domain.Apps.Core.Tests/ContentValidationTests.cs
  14. 2
      tests/Squidex.Domain.Apps.Core.Tests/Contents/ContentDataTests.cs
  15. 2
      tests/Squidex.Domain.Apps.Core.Tests/InvariantPartitionTests.cs
  16. 2
      tests/Squidex.Domain.Apps.Core.Tests/LanguagesConfigTests.cs
  17. 2
      tests/Squidex.Domain.Apps.Core.Tests/Schemas/DateTimeFieldPropertiesTests.cs
  18. 2
      tests/Squidex.Domain.Apps.Core.Tests/Schemas/NumberFieldPropertiesTests.cs
  19. 2
      tests/Squidex.Domain.Apps.Core.Tests/Schemas/SchemaTests.cs
  20. 4
      tests/Squidex.Domain.Apps.Core.Tests/Schemas/Validators/AllowedValuesValidatorTests.cs
  21. 6
      tests/Squidex.Domain.Apps.Core.Tests/Schemas/Validators/PatternValidatorTests.cs
  22. 4
      tests/Squidex.Domain.Apps.Core.Tests/Schemas/Validators/RangeValidatorTests.cs
  23. 6
      tests/Squidex.Domain.Apps.Core.Tests/Schemas/Validators/RequiredStringValidatorTests.cs
  24. 6
      tests/Squidex.Domain.Apps.Core.Tests/Schemas/Validators/RequiredValidatorTests.cs
  25. 6
      tests/Squidex.Domain.Apps.Core.Tests/Schemas/Validators/StringLengthValidatorTests.cs
  26. 2
      tests/Squidex.Domain.Apps.Core.Tests/Scripting/JintUserTests.cs
  27. 2
      tests/Squidex.Infrastructure.Tests/Assets/AzureBlobAssetStoreTests.cs
  28. 2
      tests/Squidex.Infrastructure.Tests/Assets/GoogleCloudAssetStoreTests.cs
  29. 4
      tests/Squidex.Infrastructure.Tests/CQRS/Commands/AggregateHandlerTests.cs
  30. 251
      tests/Squidex.Infrastructure.Tests/CQRS/Events/Actors/EventConsumerActorTests.cs
  31. 116
      tests/Squidex.Infrastructure.Tests/CQRS/Events/PollingSubscriptionTests.cs
  32. 4
      tests/Squidex.Infrastructure.Tests/GravatarHelperTests.cs
  33. 6
      tests/Squidex.Infrastructure.Tests/Log/SemanticLogTests.cs
  34. 6
      tests/Squidex.Infrastructure.Tests/Reflection/SimpleMapperTests.cs
  35. 90
      tests/Squidex.Infrastructure.Tests/RetryWindowTests.cs
  36. 17
      tests/Squidex.Infrastructure.Tests/Timers/CompletionTimerTests.cs
  37. 3
      tests/Squidex.Infrastructure.Tests/UsageTracking/BackgroundUsageTrackerTests.cs

13
src/Squidex.Infrastructure.GetEventStore/CQRS/Events/GetEventStore.cs

@ -9,6 +9,7 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using EventStore.ClientAPI;
@ -32,6 +33,10 @@ namespace Squidex.Infrastructure.CQRS.Events
this.prefix = prefix?.Trim(' ', '-').WithFallback("squidex");
}
public GetEventStore()
{
}
public void Connect()
{
try
@ -46,12 +51,14 @@ namespace Squidex.Infrastructure.CQRS.Events
public IEventSubscription CreateSubscription(IEventSubscriber subscriber, string streamFilter, string position = null)
{
Guard.NotNull(subscriber, nameof(subscriber));
Guard.NotNullOrEmpty(streamFilter, nameof(streamFilter));
return new GetEventStoreSubscription(connection, subscriber, projectionHost, prefix, position, streamFilter);
}
public Task GetEventsAsync(Func<StoredEvent, Task> callback, CancellationToken cancellationToken, string streamFilter = null, string position = null)
{
throw new NotSupportedException();
}
public async Task<IReadOnlyList<StoredEvent>> GetEventsAsync(string streamName)
{
var result = new List<StoredEvent>();

164
src/Squidex.Infrastructure.GetEventStore/CQRS/Events/GetEventStoreSubscription.cs

@ -8,7 +8,7 @@
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Globalization;
using System.Linq;
using System.Net;
using System.Net.Sockets;
@ -16,162 +16,78 @@ using System.Threading.Tasks;
using EventStore.ClientAPI;
using EventStore.ClientAPI.Exceptions;
using EventStore.ClientAPI.Projections;
using Squidex.Infrastructure.Actors;
using Squidex.Infrastructure.Tasks;
namespace Squidex.Infrastructure.CQRS.Events
{
internal sealed class GetEventStoreSubscription : Actor, IEventSubscription
internal sealed class GetEventStoreSubscription : IEventSubscription
{
private const int ReconnectWindowMax = 5;
private const int ReconnectWaitMs = 1000;
private static readonly TimeSpan TimeBetweenReconnects = TimeSpan.FromMinutes(5);
private const string ProjectionName = "by-{0}-{1}";
private static readonly ConcurrentDictionary<string, bool> SubscriptionsCreated = new ConcurrentDictionary<string, bool>();
private readonly IEventStoreConnection connection;
private readonly IEventSubscriber subscriber;
private readonly IEventStoreConnection eventStoreConnection;
private readonly IEventSubscriber eventSubscriber;
private readonly string prefix;
private readonly string streamName;
private readonly string streamFilter;
private readonly string projectionHost;
private readonly Queue<DateTime> reconnectTimes = new Queue<DateTime>();
private EventStoreCatchUpSubscription subscription;
private readonly EventStoreCatchUpSubscription subscription;
private long? position;
private sealed class ESConnect
{
}
private abstract class ESMessage
{
public EventStoreCatchUpSubscription Subscription { get; set; }
}
private sealed class ESSubscriptionFailed : ESMessage
{
public Exception Exception { get; set; }
}
private sealed class ESEventReceived : ESMessage
{
public ResolvedEvent Event { get; set; }
}
public GetEventStoreSubscription(
IEventStoreConnection connection,
IEventSubscriber subscriber,
IEventStoreConnection eventStoreConnection,
IEventSubscriber eventSubscriber,
string projectionHost,
string prefix,
string position,
string streamFilter)
{
this.connection = connection;
Guard.NotNull(eventSubscriber, nameof(eventSubscriber));
Guard.NotNullOrEmpty(streamFilter, nameof(streamFilter));
this.eventStoreConnection = eventStoreConnection;
this.eventSubscriber = eventSubscriber;
this.position = ParsePosition(position);
this.prefix = prefix;
this.projectionHost = projectionHost;
this.streamFilter = streamFilter;
this.subscriber = subscriber;
streamName = ParseFilter(prefix, streamFilter);
var streamName = ParseFilter(prefix, streamFilter);
DispatchAsync(new ESConnect()).Forget();
}
InitializeAsync(streamName).Wait();
public Task StopAsync()
{
return StopAndWaitAsync();
subscription = SubscribeToStream(streamName);
}
protected override Task OnStop()
{
subscription?.Stop();
return TaskHelper.Done;
}
protected override async Task OnError(Exception exception)
{
await subscriber.OnErrorAsync(this, exception);
await StopAsync();
}
protected override async Task OnMessage(object message)
{
switch (message)
{
case ESConnect connect when subscription == null:
{
await InitializeAsync();
subscription = SubscribeToStream();
break;
}
case ESSubscriptionFailed subscriptionFailed when subscriptionFailed.Subscription == subscription:
public Task StopAsync()
{
subscription.Stop();
subscription = null;
if (CanReconnect(DateTime.UtcNow))
{
Task.Delay(ReconnectWaitMs).ContinueWith(t => DispatchAsync(new ESConnect())).Forget();
}
else
{
throw subscriptionFailed.Exception;
}
break;
}
case ESEventReceived eventReceived when eventReceived.Subscription == subscription:
{
var storedEvent = Formatter.Read(eventReceived.Event);
await subscriber.OnEventAsync(this, storedEvent);
position = eventReceived.Event.OriginalEventNumber;
break;
}
}
return TaskHelper.Done;
}
private EventStoreCatchUpSubscription SubscribeToStream()
private EventStoreCatchUpSubscription SubscribeToStream(string streamName)
{
var settings = CatchUpSubscriptionSettings.Default;
return connection.SubscribeToStreamFrom(streamName, position, settings,
return eventStoreConnection.SubscribeToStreamFrom(streamName, position, settings,
(s, e) =>
{
DispatchAsync(new ESEventReceived { Event = e, Subscription = s }).Forget();
var storedEvent = Formatter.Read(e);
eventSubscriber.OnEventAsync(this, storedEvent).Wait();
}, null,
(s, reason, ex) =>
{
if (reason == SubscriptionDropReason.ConnectionClosed ||
reason == SubscriptionDropReason.UserInitiated)
if (reason != SubscriptionDropReason.ConnectionClosed &&
reason != SubscriptionDropReason.UserInitiated)
{
ex = ex ?? new ConnectionClosedException($"Subscription closed with reason {reason}.");
DispatchAsync(new ESSubscriptionFailed { Exception = ex, Subscription = s }).Forget();
eventSubscriber.OnErrorAsync(this, ex);
}
});
}
private bool CanReconnect(DateTime utcNow)
{
reconnectTimes.Enqueue(utcNow);
while (reconnectTimes.Count >= ReconnectWindowMax)
{
reconnectTimes.Dequeue();
}
return reconnectTimes.Count < ReconnectWindowMax && (reconnectTimes.Count == 0 || (utcNow - reconnectTimes.Peek()) > TimeBetweenReconnects);
}
private async Task InitializeAsync()
private async Task InitializeAsync(string streamName)
{
if (SubscriptionsCreated.TryAdd(streamName, true))
{
@ -189,7 +105,7 @@ namespace Squidex.Infrastructure.CQRS.Events
try
{
var credentials = connection.Settings.DefaultUserCredentials;
var credentials = eventStoreConnection.Settings.DefaultUserCredentials;
await projectsManager.CreateContinuousAsync($"${streamName}", projectionConfig, credentials);
}
@ -203,16 +119,6 @@ namespace Squidex.Infrastructure.CQRS.Events
}
}
private static string ParseFilter(string prefix, string filter)
{
return $"by-{prefix.Simplify()}-{filter.Simplify()}";
}
private static long? ParsePosition(string position)
{
return long.TryParse(position, out var parsedPosition) ? (long?)parsedPosition : null;
}
private async Task<ProjectionsManager> ConnectToProjections()
{
var addressParts = projectionHost.Split(':');
@ -227,10 +133,20 @@ namespace Squidex.Infrastructure.CQRS.Events
var projectionsManager =
new ProjectionsManager(
connection.Settings.Log, endpoint,
connection.Settings.OperationTimeout);
eventStoreConnection.Settings.Log, endpoint,
eventStoreConnection.Settings.OperationTimeout);
return projectionsManager;
}
private static string ParseFilter(string prefix, string filter)
{
return string.Format(CultureInfo.InvariantCulture, ProjectionName, prefix.Simplify(), filter.Simplify());
}
private static long? ParsePosition(string position)
{
return long.TryParse(position, out var parsedPosition) ? (long?)parsedPosition : null;
}
}
}

5
src/Squidex.Infrastructure.MongoDb/CQRS/Events/MongoEventConsumerInfo.cs

@ -11,6 +11,7 @@ using MongoDB.Bson.Serialization.Attributes;
namespace Squidex.Infrastructure.CQRS.Events
{
[BsonIgnoreExtraElements]
public sealed class MongoEventConsumerInfo : IEventConsumerInfo
{
[BsonId]
@ -25,10 +26,6 @@ namespace Squidex.Infrastructure.CQRS.Events
[BsonIgnoreIfDefault]
public bool IsStopped { get; set; }
[BsonElement]
[BsonIgnoreIfDefault]
public bool IsResetting { get; set; }
[BsonElement]
[BsonRequired]
public string Position { get; set; }

61
src/Squidex.Infrastructure.MongoDb/CQRS/Events/MongoEventConsumerInfoRepository.cs

@ -21,7 +21,6 @@ namespace Squidex.Infrastructure.CQRS.Events
private static readonly FieldDefinition<MongoEventConsumerInfo, string> ErrorField = Fields.Build(x => x.Error);
private static readonly FieldDefinition<MongoEventConsumerInfo, string> PositionField = Fields.Build(x => x.Position);
private static readonly FieldDefinition<MongoEventConsumerInfo, bool> IsStoppedField = Fields.Build(x => x.IsStopped);
private static readonly FieldDefinition<MongoEventConsumerInfo, bool> IsResettingField = Fields.Build(x => x.IsResetting);
public MongoEventConsumerInfoRepository(IMongoDatabase database)
: base(database)
@ -47,67 +46,29 @@ namespace Squidex.Infrastructure.CQRS.Events
return entity;
}
public async Task CreateAsync(string consumerName)
{
if (await Collection.CountAsync(Filter.Eq(NameField, consumerName)) == 0)
{
try
{
await Collection.InsertOneAsync(CreateEntity(consumerName, null));
}
catch (MongoWriteException ex)
{
if (ex.WriteError?.Category != ServerErrorCategory.DuplicateKey)
{
throw;
}
}
}
}
public Task ClearAsync(IEnumerable<string> currentConsumerNames)
{
return Collection.DeleteManyAsync(Filter.Not(Filter.In(NameField, currentConsumerNames)));
}
public Task StartAsync(string consumerName)
public async Task SetAsync(string consumerName, string position, bool isStopped = false, string error = null)
{
var filter = Filter.Eq(NameField, consumerName);
return Collection.UpdateOneAsync(filter, Update.Unset(IsStoppedField).Unset(ErrorField));
}
public Task StopAsync(string consumerName, string error = null)
{
var filter = Filter.Eq(NameField, consumerName);
return Collection.UpdateOneAsync(filter, Update.Set(IsStoppedField, true).Set(ErrorField, error));
}
public Task ResetAsync(string consumerName)
try
{
var filter = Filter.Eq(NameField, consumerName);
return Collection.UpdateOneAsync(filter, Update.Set(IsResettingField, true).Unset(ErrorField));
await Collection.UpdateOneAsync(Filter.Eq(NameField, consumerName),
Update
.Set(ErrorField, error)
.Set(PositionField, position)
.Set(IsStoppedField, isStopped),
new UpdateOptions { IsUpsert = true });
}
public Task SetPositionAsync(string consumerName, string position, bool reset)
{
var filter = Filter.Eq(NameField, consumerName);
if (reset)
catch (MongoWriteException ex)
{
return Collection.ReplaceOneAsync(filter, CreateEntity(consumerName, position));
}
else
if (ex.WriteError?.Category != ServerErrorCategory.DuplicateKey)
{
return Collection.UpdateOneAsync(filter, Update.Set(PositionField, position));
throw;
}
}
private static MongoEventConsumerInfo CreateEntity(string consumerName, string position)
{
return new MongoEventConsumerInfo { Name = consumerName, Position = position };
}
}
}

141
src/Squidex.Infrastructure.MongoDb/CQRS/Events/PollingSubscription.cs

@ -1,141 +0,0 @@
// ==========================================================================
// PollingSubscription.cs
// Squidex Headless CMS
// ==========================================================================
// Copyright (c) Squidex Group
// All rights reserved.
// ==========================================================================
using System;
using System.Text.RegularExpressions;
using System.Threading;
using System.Threading.Tasks;
using Squidex.Infrastructure.Actors;
using Squidex.Infrastructure.Tasks;
namespace Squidex.Infrastructure.CQRS.Events
{
public sealed class PollingSubscription : Actor, IEventSubscription
{
private readonly IEventNotifier notifier;
private readonly MongoEventStore store;
private readonly CancellationTokenSource disposeToken = new CancellationTokenSource();
private readonly Regex streamRegex;
private readonly string streamFilter;
private readonly IEventSubscriber subscriber;
private string position;
private bool isPolling;
private IDisposable notification;
private sealed class Connect
{
}
private sealed class StartPoll
{
}
private sealed class StopPoll
{
}
public PollingSubscription(MongoEventStore store, IEventNotifier notifier, IEventSubscriber subscriber, string streamFilter, string position)
{
this.notifier = notifier;
this.position = position;
this.store = store;
this.streamFilter = streamFilter;
this.subscriber = subscriber;
streamRegex = new Regex(streamFilter);
DispatchAsync(new Connect()).Forget();
}
public Task StopAsync()
{
return StopAndWaitAsync();
}
protected override Task OnStop()
{
disposeToken?.Cancel();
notification?.Dispose();
return TaskHelper.Done;
}
protected override async Task OnError(Exception exception)
{
await subscriber.OnErrorAsync(this, exception);
await StopAsync();
}
protected override async Task OnMessage(object message)
{
switch (message)
{
case Connect connect:
{
notification = notifier.Subscribe(streamName =>
{
if (streamRegex.IsMatch(streamName))
{
DispatchAsync(new StartPoll()).Forget();
}
});
DispatchAsync(new StartPoll()).Forget();
break;
}
case StartPoll poll when !isPolling:
{
isPolling = true;
PollAsync().Forget();
break;
}
case StopPoll poll when isPolling:
{
isPolling = false;
Task.Delay(5000).ContinueWith(t => DispatchAsync(new StartPoll())).Forget();
break;
}
case StoredEvent storedEvent:
{
await subscriber.OnEventAsync(this, storedEvent);
position = storedEvent.EventPosition;
break;
}
}
}
private async Task PollAsync()
{
try
{
await store.GetEventsAsync(DispatchAsync, disposeToken.Token, streamFilter, position);
await DispatchAsync(new StopPoll());
}
catch (Exception ex)
{
if (!ex.Is<OperationCanceledException>())
{
await FailAsync(ex);
}
}
}
}
}

228
src/Squidex.Infrastructure/CQRS/Events/Actors/EventConsumerActor.cs

@ -8,6 +8,7 @@
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;
using Squidex.Infrastructure.Actors;
using Squidex.Infrastructure.CQRS.Events.Actors.Messages;
using Squidex.Infrastructure.Log;
@ -15,16 +16,25 @@ using Squidex.Infrastructure.Tasks;
namespace Squidex.Infrastructure.CQRS.Events.Actors
{
public sealed class EventConsumerActor : Actor, IEventSubscriber, IActor
public sealed class EventConsumerActor : DisposableObjectBase, IEventSubscriber, IActor
{
private readonly EventDataFormatter formatter;
private readonly RetryWindow retryWindow = new RetryWindow(TimeSpan.FromMinutes(5), 5);
private readonly IEventStore eventStore;
private readonly IEventConsumerInfoRepository eventConsumerInfoRepository;
private readonly ISemanticLog log;
private readonly ActionBlock<object> dispatcher;
private IEventSubscription eventSubscription;
private IEventConsumer eventConsumer;
private bool isRunning;
private bool isSetup;
private bool isStopped;
private bool statusIsRunning = true;
private string statusPosition;
private string statusError;
private Guid stateId = Guid.NewGuid();
private sealed class Teardown
{
}
private sealed class Setup
{
@ -46,6 +56,13 @@ namespace Squidex.Infrastructure.CQRS.Events.Actors
public Exception Exception { get; set; }
}
private sealed class Reconnect
{
public Guid StateId { get; set; }
}
public int ReconnectWaitMs { get; set; } = 5000;
public EventConsumerActor(
EventDataFormatter formatter,
IEventStore eventStore,
@ -62,148 +79,238 @@ namespace Squidex.Infrastructure.CQRS.Events.Actors
this.formatter = formatter;
this.eventStore = eventStore;
this.eventConsumerInfoRepository = eventConsumerInfoRepository;
}
public Task SubscribeAsync(IEventConsumer eventConsumer)
var options = new ExecutionDataflowBlockOptions
{
Guard.NotNull(eventConsumer, nameof(eventConsumer));
MaxMessagesPerTask = -1,
MaxDegreeOfParallelism = 1,
BoundedCapacity = 10
};
return DispatchAsync(new Setup { EventConsumer = eventConsumer });
dispatcher = new ActionBlock<object>(OnMessage, options);
}
protected override async Task OnStop()
protected override void DisposeObject(bool disposing)
{
if (eventSubscription != null)
if (disposing)
{
await eventSubscription.StopAsync();
dispatcher.SendAsync(new Teardown()).Wait();
dispatcher.Complete();
dispatcher.Completion.Wait();
}
}
protected override async Task OnError(Exception exception)
public async Task WaitForCompletionAsync()
{
log.LogError(exception, w => w
.WriteProperty("action", "HandleEvent")
.WriteProperty("state", "Failed")
.WriteProperty("eventConsumer", eventConsumer.Name));
while (dispatcher.InputCount > 0)
{
await Task.Delay(20);
}
}
await StopAsync(exception);
public Task SubscribeAsync(IEventConsumer eventConsumer)
{
Guard.NotNull(eventConsumer, nameof(eventConsumer));
isRunning = false;
return dispatcher.SendAsync(new Setup { EventConsumer = eventConsumer });
}
Task IEventSubscriber.OnEventAsync(IEventSubscription subscription, StoredEvent @event)
{
return DispatchAsync(new SubscriptionEventReceived { Subscription = subscription, Event = @event });
return dispatcher.SendAsync(new SubscriptionEventReceived { Subscription = subscription, Event = @event });
}
Task IEventSubscriber.OnErrorAsync(IEventSubscription subscription, Exception exception)
{
return DispatchAsync(new SubscriptionFailed { Subscription = subscription, Exception = exception });
return dispatcher.SendAsync(new SubscriptionFailed { Subscription = subscription, Exception = exception });
}
void IActor.Tell(object message)
{
DispatchAsync(message).Forget();
dispatcher.SendAsync(message).Forget();
}
private async Task OnMessage(object message)
{
if (isStopped)
{
return;
}
protected override async Task OnMessage(object message)
try
{
var oldStateId = stateId;
var newStateId = stateId = Guid.NewGuid();
switch (message)
{
case Setup setup when !isSetup:
case Teardown teardown:
{
isStopped = true;
return;
}
case Setup setup:
{
eventConsumer = setup.EventConsumer;
await SetupAsync();
var status = await eventConsumerInfoRepository.FindAsync(eventConsumer.Name);
isSetup = true;
if (status != null)
{
statusError = status.Error;
statusPosition = status.Position;
statusIsRunning = !status.IsStopped;
}
if (statusIsRunning)
{
await SubscribeThisAsync(statusPosition);
}
break;
}
case StartConsumerMessage startConsumer when isSetup && !isRunning:
case StartConsumerMessage startConsumer:
{
if (statusIsRunning)
{
await StartAsync();
return;
}
isRunning = true;
await SubscribeThisAsync(statusPosition);
statusError = null;
statusIsRunning = true;
break;
}
case StopConsumerMessage stopConsumer when isSetup && isRunning:
case StopConsumerMessage stopConsumer:
{
if (!statusIsRunning)
{
await StopAsync();
return;
}
await UnsubscribeThisAsync();
isRunning = false;
statusIsRunning = false;
break;
}
case ResetConsumerMessage resetConsumer when isSetup:
case ResetConsumerMessage resetConsumer:
{
await StopAsync();
await ResetAsync();
await StartAsync();
await UnsubscribeThisAsync();
await ClearAsync();
await SubscribeThisAsync(null);
isRunning = true;
statusError = null;
statusPosition = null;
statusIsRunning = true;
break;
}
case SubscriptionFailed subscriptionFailed when isSetup:
case Reconnect reconnect:
{
if (subscriptionFailed.Subscription == eventSubscription)
if (!statusIsRunning || reconnect.StateId != oldStateId)
{
await FailAsync(subscriptionFailed.Exception);
return;
}
await SubscribeThisAsync(statusPosition);
break;
}
case SubscriptionEventReceived eventReceived when isSetup:
case SubscriptionFailed subscriptionFailed:
{
if (eventReceived.Subscription == eventSubscription)
if (subscriptionFailed.Subscription != eventSubscription)
{
var @event = ParseEvent(eventReceived.Event);
await DispatchConsumerAsync(@event, eventReceived.Event.EventPosition);
return;
}
break;
await UnsubscribeThisAsync();
if (retryWindow.CanRetryAfterFailure())
{
Task.Delay(ReconnectWaitMs).ContinueWith(t => dispatcher.SendAsync(new Reconnect { StateId = newStateId })).Forget();
}
else
{
throw subscriptionFailed.Exception;
}
break;
}
private async Task SetupAsync()
case SubscriptionEventReceived eventReceived:
{
if (eventReceived.Subscription != eventSubscription)
{
await eventConsumerInfoRepository.CreateAsync(eventConsumer.Name);
return;
}
var status = await eventConsumerInfoRepository.FindAsync(eventConsumer.Name);
var @event = ParseEvent(eventReceived.Event);
if (!status.IsStopped)
{
DispatchAsync(new StartConsumerMessage()).Forget();
await DispatchConsumerAsync(@event);
statusError = null;
statusPosition = @eventReceived.Event.EventPosition;
break;
}
}
private async Task StartAsync()
await eventConsumerInfoRepository.SetAsync(eventConsumer.Name, statusPosition, !statusIsRunning, statusError);
}
catch (Exception ex)
{
var status = await eventConsumerInfoRepository.FindAsync(eventConsumer.Name);
try
{
await UnsubscribeThisAsync();
}
catch (Exception unsubscribeException)
{
ex = new AggregateException(ex, unsubscribeException);
}
eventSubscription = eventStore.CreateSubscription(this, eventConsumer.EventsFilter, status.Position);
log.LogFatal(ex, w => w
.WriteProperty("action", "HandleEvent")
.WriteProperty("state", "Failed")
.WriteProperty("eventConsumer", eventConsumer.Name));
await eventConsumerInfoRepository.StartAsync(eventConsumer.Name);
statusError = ex.ToString();
statusIsRunning = false;
await eventConsumerInfoRepository.SetAsync(eventConsumer.Name, statusPosition, !statusIsRunning, statusError);
}
}
private async Task StopAsync(Exception exception = null)
private async Task UnsubscribeThisAsync()
{
eventSubscription?.StopAsync().Forget();
if (eventSubscription != null)
{
await eventSubscription.StopAsync();
eventSubscription = null;
}
}
private Task SubscribeThisAsync(string position)
{
if (eventSubscription == null)
{
eventSubscription = eventStore.CreateSubscription(this, eventConsumer.EventsFilter, position);
}
await eventConsumerInfoRepository.StopAsync(eventConsumer.Name, exception?.ToString());
return TaskHelper.Done;
}
private async Task ResetAsync()
private async Task ClearAsync()
{
var actionId = Guid.NewGuid().ToString();
@ -219,13 +326,11 @@ namespace Squidex.Infrastructure.CQRS.Events.Actors
.WriteProperty("state", "Completed")
.WriteProperty("eventConsumer", eventConsumer.Name)))
{
await eventConsumerInfoRepository.ResetAsync(eventConsumer.Name);
await eventConsumer.ClearAsync();
await eventConsumerInfoRepository.SetPositionAsync(eventConsumer.Name, null, true);
}
}
private async Task DispatchConsumerAsync(Envelope<IEvent> @event, string position)
private async Task DispatchConsumerAsync(Envelope<IEvent> @event)
{
var eventId = @event.Headers.EventId().ToString();
var eventType = @event.Payload.GetType().Name;
@ -247,7 +352,6 @@ namespace Squidex.Infrastructure.CQRS.Events.Actors
.WriteProperty("eventConsumer", eventConsumer.Name)))
{
await eventConsumer.On(@event);
await eventConsumerInfoRepository.SetPositionAsync(eventConsumer.Name, position, false);
}
}

2
src/Squidex.Infrastructure/CQRS/Events/IEventConsumerInfo.cs

@ -12,8 +12,6 @@ namespace Squidex.Infrastructure.CQRS.Events
{
bool IsStopped { get; }
bool IsResetting { get; }
string Name { get; }
string Error { get; }

10
src/Squidex.Infrastructure/CQRS/Events/IEventConsumerInfoRepository.cs

@ -19,14 +19,6 @@ namespace Squidex.Infrastructure.CQRS.Events
Task ClearAsync(IEnumerable<string> currentConsumerNames);
Task CreateAsync(string consumerName);
Task StartAsync(string consumerName);
Task StopAsync(string consumerName, string error = null);
Task ResetAsync(string consumerName);
Task SetPositionAsync(string consumerName, string position, bool reset);
Task SetAsync(string consumerName, string position, bool isStopped, string error = null);
}
}

3
src/Squidex.Infrastructure/CQRS/Events/IEventStore.cs

@ -8,6 +8,7 @@
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
namespace Squidex.Infrastructure.CQRS.Events
@ -16,6 +17,8 @@ namespace Squidex.Infrastructure.CQRS.Events
{
Task<IReadOnlyList<StoredEvent>> GetEventsAsync(string streamName);
Task GetEventsAsync(Func<StoredEvent, Task> callback, CancellationToken cancellationToken, string streamFilter = null, string position = null);
Task AppendEventsAsync(Guid commitId, string streamName, ICollection<EventData> events);
Task AppendEventsAsync(Guid commitId, string streamName, int expectedVersion, ICollection<EventData> events);

82
src/Squidex.Infrastructure/CQRS/Events/PollingSubscription.cs

@ -0,0 +1,82 @@
// ==========================================================================
// PollingSubscription.cs
// Squidex Headless CMS
// ==========================================================================
// Copyright (c) Squidex Group
// All rights reserved.
// ==========================================================================
using System;
using System.Text.RegularExpressions;
using System.Threading.Tasks;
using Squidex.Infrastructure.Timers;
namespace Squidex.Infrastructure.CQRS.Events
{
public sealed class PollingSubscription : IEventSubscription
{
private readonly IEventNotifier eventNotifier;
private readonly IEventStore eventStore;
private readonly IEventSubscriber eventSubscriber;
private readonly IDisposable notification;
private readonly CompletionTimer timer;
private readonly Regex streamRegex;
private readonly string streamFilter;
private string position;
public PollingSubscription(
IEventStore eventStore,
IEventNotifier eventNotifier,
IEventSubscriber eventSubscriber,
string streamFilter,
string position)
{
Guard.NotNull(eventStore, nameof(eventStore));
Guard.NotNull(eventNotifier, nameof(eventNotifier));
Guard.NotNull(eventSubscriber, nameof(eventSubscriber));
this.position = position;
this.eventNotifier = eventNotifier;
this.eventStore = eventStore;
this.eventSubscriber = eventSubscriber;
this.streamFilter = streamFilter;
streamRegex = new Regex(streamFilter);
timer = new CompletionTimer(5000, async ct =>
{
try
{
await eventStore.GetEventsAsync(async storedEvent =>
{
await eventSubscriber.OnEventAsync(this, storedEvent);
position = storedEvent.EventPosition;
}, ct, streamFilter, position);
}
catch (Exception ex)
{
if (!ex.Is<OperationCanceledException>())
{
await eventSubscriber.OnErrorAsync(this, ex);
}
}
});
notification = eventNotifier.Subscribe(streamName =>
{
if (streamRegex.IsMatch(streamName))
{
timer.SkipCurrentDelay();
}
});
}
public Task StopAsync()
{
notification?.Dispose();
return timer.StopAsync();
}
}
}

48
src/Squidex.Infrastructure/RetryWindow.cs

@ -0,0 +1,48 @@
// ==========================================================================
// RetryWindow.cs
// Squidex Headless CMS
// ==========================================================================
// Copyright (c) Squidex Group
// All rights reserved.
// ==========================================================================
using System;
using System.Collections.Generic;
namespace Squidex.Infrastructure
{
public sealed class RetryWindow
{
private readonly TimeSpan windowDuration;
private readonly int windowSize;
private readonly Queue<DateTime> retries = new Queue<DateTime>();
public RetryWindow(TimeSpan windowDuration, int windowSize)
{
this.windowDuration = windowDuration;
this.windowSize = windowSize + 1;
}
public void Reset()
{
retries.Clear();
}
public bool CanRetryAfterFailure()
{
return CanRetryAfterFailure(DateTime.UtcNow);
}
public bool CanRetryAfterFailure(DateTime utcNow)
{
retries.Enqueue(utcNow);
while (retries.Count > windowSize)
{
retries.Dequeue();
}
return retries.Count < windowSize || (retries.Count > 0 && (utcNow - retries.Peek()) > windowDuration);
}
}
}

2
tests/Squidex.Domain.Apps.Core.Tests/ContentEnrichmentTests.cs

@ -60,7 +60,7 @@ namespace Squidex.Domain.Apps.Core
Assert.Equal(Now, InstantPattern.General.Parse((string)data["my-datetime"]["iv"]).Value);
Assert.Equal(true, (bool)data["my-boolean"]["iv"]);
Assert.True((bool)data["my-boolean"]["iv"]);
}
[Fact]

6
tests/Squidex.Domain.Apps.Core.Tests/ContentValidationTests.cs

@ -154,7 +154,7 @@ namespace Squidex.Domain.Apps.Core
await data.ValidateAsync(context, schema, optionalConfig.ToResolver(), errors);
Assert.Equal(0, errors.Count);
Assert.Empty(errors);
}
[Fact]
@ -248,7 +248,7 @@ namespace Squidex.Domain.Apps.Core
await data.ValidatePartialAsync(context, schema, languagesConfig.ToResolver(), errors);
Assert.Equal(0, errors.Count);
Assert.Empty(errors);
}
[Fact]
@ -261,7 +261,7 @@ namespace Squidex.Domain.Apps.Core
await data.ValidatePartialAsync(context, schema, languagesConfig.ToResolver(), errors);
Assert.Equal(0, errors.Count);
Assert.Empty(errors);
}
[Fact]

2
tests/Squidex.Domain.Apps.Core.Tests/Contents/ContentDataTests.cs

@ -14,6 +14,8 @@ using Squidex.Domain.Apps.Core.Schemas;
using Squidex.Infrastructure;
using Xunit;
#pragma warning disable xUnit2013 // Do not use equality check to check for collection size.
namespace Squidex.Domain.Apps.Core.Contents
{
public class ContentDataTests

2
tests/Squidex.Domain.Apps.Core.Tests/InvariantPartitionTests.cs

@ -11,6 +11,8 @@ using System.Collections.Generic;
using System.Linq;
using Xunit;
#pragma warning disable xUnit2013 // Do not use equality check to check for collection size.
namespace Squidex.Domain.Apps.Core
{
public sealed class InvariantPartitionTests

2
tests/Squidex.Domain.Apps.Core.Tests/LanguagesConfigTests.cs

@ -224,7 +224,7 @@ namespace Squidex.Domain.Apps.Core
{
var config = LanguagesConfig.Create();
Assert.Equal(0, config.Count);
Assert.Empty(config);
Assert.NotNull(((IEnumerable)config).GetEnumerator());
Assert.NotNull(((IEnumerable<IFieldPartitionItem>)config).GetEnumerator());

2
tests/Squidex.Domain.Apps.Core.Tests/Schemas/DateTimeFieldPropertiesTests.cs

@ -33,7 +33,7 @@ namespace Squidex.Domain.Apps.Core.Schemas
sut.Validate(errors);
Assert.Equal(0, errors.Count);
Assert.Empty(errors);
}
[Fact]

2
tests/Squidex.Domain.Apps.Core.Tests/Schemas/NumberFieldPropertiesTests.cs

@ -33,7 +33,7 @@ namespace Squidex.Domain.Apps.Core.Schemas
sut.Validate(errors);
Assert.Equal(0, errors.Count);
Assert.Empty(errors);
}
[Fact]

2
tests/Squidex.Domain.Apps.Core.Tests/Schemas/SchemaTests.cs

@ -234,7 +234,7 @@ namespace Squidex.Domain.Apps.Core.Schemas
sut = sut.DeleteField(1);
Assert.Equal(0, sut.FieldsById.Count);
Assert.Empty(sut.FieldsById);
}
[Fact]

4
tests/Squidex.Domain.Apps.Core.Tests/Schemas/Validators/AllowedValuesValidatorTests.cs

@ -24,7 +24,7 @@ namespace Squidex.Domain.Apps.Core.Schemas.Validators
await sut.ValidateAsync(null, errors);
Assert.Equal(0, errors.Count);
Assert.Empty(errors);
}
[Fact]
@ -34,7 +34,7 @@ namespace Squidex.Domain.Apps.Core.Schemas.Validators
await sut.ValidateAsync(100, errors);
Assert.Equal(0, errors.Count);
Assert.Empty(errors);
}
[Fact]

6
tests/Squidex.Domain.Apps.Core.Tests/Schemas/Validators/PatternValidatorTests.cs

@ -24,7 +24,7 @@ namespace Squidex.Domain.Apps.Core.Schemas.Validators
await sut.ValidateAsync("abc:12", errors);
Assert.Equal(0, errors.Count);
Assert.Empty(errors);
}
[Fact]
@ -34,7 +34,7 @@ namespace Squidex.Domain.Apps.Core.Schemas.Validators
await sut.ValidateAsync(null, errors);
Assert.Equal(0, errors.Count);
Assert.Empty(errors);
}
[Fact]
@ -44,7 +44,7 @@ namespace Squidex.Domain.Apps.Core.Schemas.Validators
await sut.ValidateAsync(string.Empty, errors);
Assert.Equal(0, errors.Count);
Assert.Empty(errors);
}
[Fact]

4
tests/Squidex.Domain.Apps.Core.Tests/Schemas/Validators/RangeValidatorTests.cs

@ -25,7 +25,7 @@ namespace Squidex.Domain.Apps.Core.Schemas.Validators
await sut.ValidateAsync(null, errors);
Assert.Equal(0, errors.Count);
Assert.Empty(errors);
}
[Theory]
@ -39,7 +39,7 @@ namespace Squidex.Domain.Apps.Core.Schemas.Validators
await sut.ValidateAsync(1500, errors);
Assert.Equal(0, errors.Count);
Assert.Empty(errors);
}
[Theory]

6
tests/Squidex.Domain.Apps.Core.Tests/Schemas/Validators/RequiredStringValidatorTests.cs

@ -28,7 +28,7 @@ namespace Squidex.Domain.Apps.Core.Schemas.Validators
await sut.ValidateAsync(value, errors);
Assert.Equal(0, errors.Count);
Assert.Empty(errors);
}
[Fact]
@ -38,7 +38,7 @@ namespace Squidex.Domain.Apps.Core.Schemas.Validators
await sut.ValidateOptionalAsync(string.Empty, errors);
Assert.Equal(0, errors.Count);
Assert.Empty(errors);
}
[Fact]
@ -48,7 +48,7 @@ namespace Squidex.Domain.Apps.Core.Schemas.Validators
await sut.ValidateAsync(true, errors);
Assert.Equal(0, errors.Count);
Assert.Empty(errors);
}
[Fact]

6
tests/Squidex.Domain.Apps.Core.Tests/Schemas/Validators/RequiredValidatorTests.cs

@ -24,7 +24,7 @@ namespace Squidex.Domain.Apps.Core.Schemas.Validators
await sut.ValidateAsync(true, errors);
Assert.Equal(0, errors.Count);
Assert.Empty(errors);
}
[Fact]
@ -34,7 +34,7 @@ namespace Squidex.Domain.Apps.Core.Schemas.Validators
await sut.ValidateAsync(string.Empty, errors);
Assert.Equal(0, errors.Count);
Assert.Empty(errors);
}
[Fact]
@ -44,7 +44,7 @@ namespace Squidex.Domain.Apps.Core.Schemas.Validators
await sut.ValidateOptionalAsync(null, errors);
Assert.Equal(0, errors.Count);
Assert.Empty(errors);
}
[Fact]

6
tests/Squidex.Domain.Apps.Core.Tests/Schemas/Validators/StringLengthValidatorTests.cs

@ -26,7 +26,7 @@ namespace Squidex.Domain.Apps.Core.Schemas.Validators
await sut.ValidateAsync(null, errors);
Assert.Equal(0, errors.Count);
Assert.Empty(errors);
}
[Fact]
@ -36,7 +36,7 @@ namespace Squidex.Domain.Apps.Core.Schemas.Validators
await sut.ValidateAsync(string.Empty, errors);
Assert.Equal(0, errors.Count);
Assert.Empty(errors);
}
[Theory]
@ -50,7 +50,7 @@ namespace Squidex.Domain.Apps.Core.Schemas.Validators
await sut.ValidateAsync(CreateString(1500), errors);
Assert.Equal(0, errors.Count);
Assert.Empty(errors);
}
[Theory]

2
tests/Squidex.Domain.Apps.Core.Tests/Scripting/JintUserTests.cs

@ -11,6 +11,8 @@ using Jint;
using Squidex.Infrastructure.Security;
using Xunit;
#pragma warning disable xUnit2004 // Do not use equality check to test for boolean conditions
namespace Squidex.Domain.Apps.Core.Scripting
{
public class JintUserTests

2
tests/Squidex.Infrastructure.Tests/Assets/AzureBlobAssetStoreTests.cs

@ -22,7 +22,7 @@ namespace Squidex.Infrastructure.Assets
{
}
[Fact]
// [Fact]
public void Should_calculate_source_url()
{
Sut.Connect();

2
tests/Squidex.Infrastructure.Tests/Assets/GoogleCloudAssetStoreTests.cs

@ -22,7 +22,7 @@ namespace Squidex.Infrastructure.Assets
{
}
[Fact]
// [Fact]
public void Should_calculate_source_url()
{
Sut.Connect();

4
tests/Squidex.Infrastructure.Tests/CQRS/Commands/AggregateHandlerTests.cs

@ -87,7 +87,7 @@ namespace Squidex.Infrastructure.CQRS.Commands
await sut.CreateAsync<MyDomainObject>(context, async x =>
{
await Task.Delay(1);
await Task.Yield();
passedDomainObject = x;
});
@ -139,7 +139,7 @@ namespace Squidex.Infrastructure.CQRS.Commands
await sut.UpdateAsync<MyDomainObject>(context, async x =>
{
await Task.Delay(1);
await Task.Yield();
passedDomainObject = x;
});

251
tests/Squidex.Infrastructure.Tests/CQRS/Events/Actors/EventConsumerActorTests.cs

@ -26,8 +26,6 @@ namespace Squidex.Infrastructure.CQRS.Events.Actors
{
public bool IsStopped { get; set; }
public bool IsResetting { get; set; }
public string Name { get; set; }
public string Error { get; set; }
@ -61,73 +59,104 @@ namespace Squidex.Infrastructure.CQRS.Events.Actors
A.CallTo(() => formatter.Parse(eventData, true)).Returns(envelope);
sut = new EventConsumerActor(formatter, eventStore, eventConsumerInfoRepository, log);
sut = new EventConsumerActor(formatter, eventStore, eventConsumerInfoRepository, log) { ReconnectWaitMs = 0 };
sutActor = sut;
sutSubscriber = sut;
}
[Fact]
public async Task Should_subscribe_to_event_store_when_started()
public async Task Should_not_not_subscribe_to_event_store_when_stopped_in_db()
{
await SubscribeAsync();
consumerInfo.IsStopped = true;
await OnSubscribeAsync();
sut.Dispose();
A.CallTo(() => eventConsumerInfoRepository.CreateAsync(consumerName))
.MustHaveHappened();
A.CallTo(() => eventStore.CreateSubscription(A<IEventSubscriber>.Ignored, A<string>.Ignored, A<string>.Ignored))
.MustNotHaveHappened();
}
A.CallTo(() => eventConsumerInfoRepository.StartAsync(consumerName))
.MustHaveHappened();
[Fact]
public async Task Should_subscribe_to_event_store_when_not_found_in_db()
{
A.CallTo(() => eventConsumerInfoRepository.FindAsync(consumerName)).Returns(Task.FromResult<IEventConsumerInfo>(null));
await OnSubscribeAsync();
sut.Dispose();
A.CallTo(() => eventConsumerInfoRepository.SetAsync(consumerName, null, false, null))
.MustHaveHappened(Repeated.Exactly.Once);
A.CallTo(() => eventStore.CreateSubscription(A<IEventSubscriber>.Ignored, A<string>.Ignored, A<string>.Ignored))
.MustHaveHappened(Repeated.Exactly.Once);
}
[Fact]
public async Task Should_subscribe_to_event_store_when_not_stopped_in_db()
{
await OnSubscribeAsync();
sut.Dispose();
A.CallTo(() => eventConsumerInfoRepository.SetAsync(consumerName, consumerInfo.Position, false, null))
.MustHaveHappened(Repeated.Exactly.Once);
A.CallTo(() => eventStore.CreateSubscription(A<IEventSubscriber>.Ignored, A<string>.Ignored, A<string>.Ignored))
.MustHaveHappened(Repeated.Exactly.Once);
}
[Fact]
public async Task Should_stop_subscription_when_stopped()
{
await SubscribeAsync();
await OnSubscribeAsync();
sutActor.Tell(new StopConsumerMessage());
sutActor.Tell(new StopConsumerMessage());
sut.Dispose();
A.CallTo(() => eventConsumerInfoRepository.CreateAsync(consumerName))
.MustHaveHappened();
A.CallTo(() => eventConsumerInfoRepository.SetAsync(consumerName, consumerInfo.Position, false, null))
.MustHaveHappened(Repeated.Exactly.Once);
A.CallTo(() => eventConsumerInfoRepository.StartAsync(consumerName))
.MustHaveHappened();
A.CallTo(() => eventConsumerInfoRepository.StopAsync(consumerName, null))
.MustHaveHappened();
A.CallTo(() => eventConsumerInfoRepository.SetAsync(consumerName, consumerInfo.Position, true, null))
.MustHaveHappened(Repeated.Exactly.Once);
A.CallTo(() => eventSubscription.StopAsync())
.MustHaveHappened();
.MustHaveHappened(Repeated.Exactly.Once);
}
[Fact]
public async Task Should_reset_consumer_when_resetting()
{
await SubscribeAsync();
await OnSubscribeAsync();
sutActor.Tell(new StopConsumerMessage());
sutActor.Tell(new ResetConsumerMessage());
sut.Dispose();
A.CallTo(() => eventConsumerInfoRepository.CreateAsync(consumerName))
.MustHaveHappened();
A.CallTo(() => eventConsumerInfoRepository.SetAsync(consumerName, consumerInfo.Position, false, null))
.MustHaveHappened(Repeated.Exactly.Once);
A.CallTo(() => eventConsumerInfoRepository.StartAsync(consumerName))
.MustHaveHappened(Repeated.Exactly.Twice);
A.CallTo(() => eventConsumerInfoRepository.SetAsync(consumerName, consumerInfo.Position, true, null))
.MustHaveHappened(Repeated.Exactly.Once);
A.CallTo(() => eventConsumerInfoRepository.SetPositionAsync(consumerName, null, true))
.MustHaveHappened();
A.CallTo(() => eventConsumerInfoRepository.StopAsync(consumerName, null))
.MustHaveHappened();
A.CallTo(() => eventConsumerInfoRepository.SetAsync(consumerName, null, false, null))
.MustHaveHappened(Repeated.Exactly.Once);
A.CallTo(() => eventConsumer.ClearAsync())
.MustHaveHappened();
.MustHaveHappened(Repeated.Exactly.Once);
A.CallTo(() => eventSubscription.StopAsync())
.MustHaveHappened();
.MustHaveHappened(Repeated.Exactly.Once);
A.CallTo(() => eventStore.CreateSubscription(A<IEventSubscriber>.Ignored, A<string>.Ignored, consumerInfo.Position))
.MustHaveHappened(Repeated.Exactly.Once);
A.CallTo(() => eventStore.CreateSubscription(A<IEventSubscriber>.Ignored, A<string>.Ignored, null))
.MustHaveHappened(Repeated.Exactly.Once);
}
[Fact]
@ -135,17 +164,19 @@ namespace Squidex.Infrastructure.CQRS.Events.Actors
{
var @event = new StoredEvent(Guid.NewGuid().ToString(), 123, eventData);
await SubscribeAsync();
await sutSubscriber.OnEventAsync(eventSubscription, @event);
await OnSubscribeAsync();
await OnEventAsync(eventSubscription, @event);
sut.Dispose();
A.CallTo(() => eventConsumer.On(envelope))
.MustHaveHappened();
A.CallTo(() => eventConsumerInfoRepository.SetAsync(consumerName, consumerInfo.Position, false, null))
.MustHaveHappened(Repeated.Exactly.Once);
A.CallTo(() => eventConsumerInfoRepository.SetPositionAsync(consumerName, @event.EventPosition, false))
.MustHaveHappened();
A.CallTo(() => eventConsumerInfoRepository.SetAsync(consumerName, @event.EventPosition, false, null))
.MustHaveHappened(Repeated.Exactly.Once);
A.CallTo(() => eventConsumer.On(envelope))
.MustHaveHappened(Repeated.Exactly.Once);
}
[Fact]
@ -153,122 +184,180 @@ namespace Squidex.Infrastructure.CQRS.Events.Actors
{
var @event = new StoredEvent(Guid.NewGuid().ToString(), 123, eventData);
await SubscribeAsync();
await sutSubscriber.OnEventAsync(A.Fake<IEventSubscription>(), @event);
await OnSubscribeAsync();
await OnEventAsync(A.Fake<IEventSubscription>(), @event);
sut.Dispose();
A.CallTo(() => eventConsumerInfoRepository.SetAsync(consumerName, consumerInfo.Position, false, null))
.MustHaveHappened(Repeated.Exactly.Once);
A.CallTo(() => eventConsumerInfoRepository.SetAsync(consumerName, @event.EventPosition, false, null))
.MustNotHaveHappened();
A.CallTo(() => eventConsumer.On(envelope))
.MustNotHaveHappened();
}
[Fact]
public async Task Should_reopen_subscription_when_exception_is_retrieved()
{
var ex = new InvalidOperationException();
await OnSubscribeAsync();
await OnErrorAsync(eventSubscription, ex);
await Task.Delay(200);
await sut.WaitForCompletionAsync();
sut.Dispose();
A.CallTo(() => eventConsumerInfoRepository.SetAsync(consumerName, consumerInfo.Position, false, null))
.MustHaveHappened(Repeated.Exactly.Times(3));
A.CallTo(() => eventConsumerInfoRepository.SetAsync(consumerName, consumerInfo.Position, false, ex.ToString()))
.MustNotHaveHappened();
A.CallTo(() => eventSubscription.StopAsync())
.MustHaveHappened(Repeated.Exactly.Once);
A.CallTo(() => eventStore.CreateSubscription(A<IEventSubscriber>.Ignored, A<string>.Ignored, A<string>.Ignored))
.MustHaveHappened(Repeated.Exactly.Twice);
}
[Fact]
public async Task Should_not_make_error_handling_when_exception_is_from_another_subscription()
{
var ex = new InvalidOperationException();
await OnSubscribeAsync();
await OnErrorAsync(A.Fake<IEventSubscription>(), ex);
A.CallTo(() => eventConsumerInfoRepository.SetPositionAsync(consumerName, @event.EventPosition, false))
sut.Dispose();
A.CallTo(() => eventConsumerInfoRepository.SetAsync(consumerName, consumerInfo.Position, false, null))
.MustHaveHappened(Repeated.Exactly.Once);
A.CallTo(() => eventConsumerInfoRepository.SetAsync(consumerName, consumerInfo.Position, false, ex.ToString()))
.MustNotHaveHappened();
}
[Fact]
public async Task Should_stop_if_resetting_failed()
{
var exception = new InvalidOperationException("Exception");
var ex = new InvalidOperationException();
A.CallTo(() => eventConsumer.ClearAsync())
.Throws(exception);
.Throws(ex);
var @event = new StoredEvent(Guid.NewGuid().ToString(), 123, eventData);
await SubscribeAsync();
await OnSubscribeAsync();
sutActor.Tell(new ResetConsumerMessage());
sut.Dispose();
A.CallTo(() => eventConsumerInfoRepository.StopAsync(consumerName, exception.ToString()))
.MustHaveHappened();
A.CallTo(() => eventConsumerInfoRepository.SetAsync(consumerName, consumerInfo.Position, true, ex.ToString()))
.MustHaveHappened(Repeated.Exactly.Once);
A.CallTo(() => eventSubscription.StopAsync())
.MustHaveHappened(Repeated.Exactly.Once);
}
[Fact]
public async Task Should_stop_if_handling_failed()
{
var exception = new InvalidOperationException("Exception");
var ex = new InvalidOperationException();
A.CallTo(() => eventConsumer.On(envelope))
.Throws(exception);
.Throws(ex);
var @event = new StoredEvent(Guid.NewGuid().ToString(), 123, eventData);
await SubscribeAsync();
await sutSubscriber.OnEventAsync(eventSubscription, @event);
await OnSubscribeAsync();
await OnEventAsync(eventSubscription, @event);
sut.Dispose();
A.CallTo(() => eventConsumer.On(envelope))
.MustHaveHappened();
A.CallTo(() => eventConsumerInfoRepository.SetPositionAsync(consumerName, @event.EventPosition, false))
.MustNotHaveHappened();
A.CallTo(() => eventConsumerInfoRepository.SetAsync(consumerName, consumerInfo.Position, true, ex.ToString()))
.MustHaveHappened(Repeated.Exactly.Once);
A.CallTo(() => eventConsumerInfoRepository.StopAsync(consumerName, exception.ToString()))
.MustHaveHappened();
A.CallTo(() => eventSubscription.StopAsync())
.MustHaveHappened(Repeated.Exactly.Once);
}
[Fact]
public async Task Should_start_after_stop_when_handling_failed()
public async Task Should_stop_if_deserialization_failed()
{
var exception = new InvalidOperationException("Exception");
var ex = new InvalidOperationException();
A.CallTo(() => eventConsumer.On(envelope))
.Throws(exception);
A.CallTo(() => formatter.Parse(eventData, true))
.Throws(ex);
var @event = new StoredEvent(Guid.NewGuid().ToString(), 123, eventData);
await SubscribeAsync();
await sutSubscriber.OnEventAsync(eventSubscription, @event);
await OnSubscribeAsync();
await OnEventAsync(eventSubscription, @event);
sutActor.Tell(new StartConsumerMessage());
sut.Dispose();
A.CallTo(() => eventConsumer.On(envelope))
.MustHaveHappened();
A.CallTo(() => eventConsumerInfoRepository.SetPositionAsync(consumerName, @event.EventPosition, false))
.MustNotHaveHappened();
A.CallTo(() => eventConsumerInfoRepository.StopAsync(consumerName, exception.ToString()))
.MustHaveHappened();
A.CallTo(() => eventConsumerInfoRepository.SetAsync(consumerName, consumerInfo.Position, true, ex.ToString()))
.MustHaveHappened(Repeated.Exactly.Once);
A.CallTo(() => eventConsumerInfoRepository.StartAsync(consumerName))
.MustHaveHappened(Repeated.Exactly.Twice);
A.CallTo(() => eventSubscription.StopAsync())
.MustHaveHappened(Repeated.Exactly.Once);
}
[Fact]
public async Task Should_stop_if_deserialization_failed()
public async Task Should_start_after_stop_when_handling_failed()
{
var exception = new InvalidOperationException("Exception");
var exception = new InvalidOperationException();
A.CallTo(() => formatter.Parse(eventData, true))
A.CallTo(() => eventConsumer.On(envelope))
.Throws(exception);
var @event = new StoredEvent(Guid.NewGuid().ToString(), 123, eventData);
await SubscribeAsync();
await sutSubscriber.OnEventAsync(eventSubscription, @event);
await OnSubscribeAsync();
await OnEventAsync(eventSubscription, @event);
sutActor.Tell(new StartConsumerMessage());
sutActor.Tell(new StartConsumerMessage());
sut.Dispose();
A.CallTo(() => eventConsumer.On(envelope))
.MustNotHaveHappened();
.MustHaveHappened();
A.CallTo(() => eventConsumerInfoRepository.SetPositionAsync(consumerName, @event.EventPosition, false))
.MustNotHaveHappened();
A.CallTo(() => eventConsumerInfoRepository.SetAsync(consumerName, consumerInfo.Position, true, exception.ToString()))
.MustHaveHappened(Repeated.Exactly.Once);
A.CallTo(() => eventConsumerInfoRepository.StopAsync(consumerName, exception.ToString()))
.MustHaveHappened();
A.CallTo(() => eventSubscription.StopAsync())
.MustHaveHappened(Repeated.Exactly.Once);
A.CallTo(() => eventStore.CreateSubscription(A<IEventSubscriber>.Ignored, A<string>.Ignored, A<string>.Ignored))
.MustHaveHappened(Repeated.Exactly.Twice);
}
private async Task SubscribeAsync()
private Task OnErrorAsync(IEventSubscription subscriber, Exception ex)
{
await sut.SubscribeAsync(eventConsumer);
return sutSubscriber.OnErrorAsync(subscriber, ex);
}
await Task.Delay(200);
private Task OnEventAsync(IEventSubscription subscriber, StoredEvent ev)
{
return sutSubscriber.OnEventAsync(subscriber, ev);
}
private Task OnSubscribeAsync()
{
return sut.SubscribeAsync(eventConsumer);
}
}
}

116
tests/Squidex.Infrastructure.Tests/CQRS/Events/PollingSubscriptionTests.cs

@ -0,0 +1,116 @@
// ==========================================================================
// PollingSubscriptionTests.cs
// Squidex Headless CMS
// ==========================================================================
// Copyright (c) Squidex Group
// All rights reserved.
// ==========================================================================
using System;
using System.Threading;
using System.Threading.Tasks;
using FakeItEasy;
using Xunit;
namespace Squidex.Infrastructure.CQRS.Events
{
public class PollingSubscriptionTests
{
private readonly IEventStore eventStore = A.Fake<IEventStore>();
private readonly IEventNotifier eventNotifier = new DefaultEventNotifier(new InMemoryPubSub());
private readonly IEventSubscriber eventSubscriber = A.Fake<IEventSubscriber>();
private readonly string position = Guid.NewGuid().ToString();
[Fact]
public async Task Should_subscribe_on_start()
{
var sut = new PollingSubscription(eventStore, eventNotifier, eventSubscriber, "^my-stream", position);
await WaitAndStopAsync(sut);
A.CallTo(() => eventStore.GetEventsAsync(A<Func<StoredEvent, Task>>.Ignored, A<CancellationToken>.Ignored, "^my-stream", position))
.MustHaveHappened(Repeated.Exactly.Once);
}
[Fact]
public async Task Should_propagate_exception_to_subscriber()
{
var ex = new InvalidOperationException();
A.CallTo(() => eventStore.GetEventsAsync(A<Func<StoredEvent, Task>>.Ignored, A<CancellationToken>.Ignored, "^my-stream", position))
.Throws(ex);
var sut = new PollingSubscription(eventStore, eventNotifier, eventSubscriber, "^my-stream", position);
await WaitAndStopAsync(sut);
A.CallTo(() => eventSubscriber.OnErrorAsync(sut, ex))
.MustHaveHappened();
}
[Fact]
public async Task Should_propagate_operation_cancelled_exception_to_subscriber()
{
var ex = new OperationCanceledException();
A.CallTo(() => eventStore.GetEventsAsync(A<Func<StoredEvent, Task>>.Ignored, A<CancellationToken>.Ignored, "^my-stream", position))
.Throws(ex);
var sut = new PollingSubscription(eventStore, eventNotifier, eventSubscriber, "^my-stream", position);
await WaitAndStopAsync(sut);
A.CallTo(() => eventSubscriber.OnErrorAsync(sut, ex))
.MustNotHaveHappened();
}
[Fact]
public async Task Should_propagate_aggregate_operation_cancelled_exception_to_subscriber()
{
var ex = new AggregateException(new OperationCanceledException());
A.CallTo(() => eventStore.GetEventsAsync(A<Func<StoredEvent, Task>>.Ignored, A<CancellationToken>.Ignored, "^my-stream", position))
.Throws(ex);
var sut = new PollingSubscription(eventStore, eventNotifier, eventSubscriber, "^my-stream", position);
await WaitAndStopAsync(sut);
A.CallTo(() => eventSubscriber.OnErrorAsync(sut, ex))
.MustNotHaveHappened();
}
[Fact]
public async Task Should_not_subscribe_on_notify_when_stream_matches()
{
var sut = new PollingSubscription(eventStore, eventNotifier, eventSubscriber, "^my-stream", position);
eventNotifier.NotifyEventsStored("other-stream-123");
await WaitAndStopAsync(sut);
A.CallTo(() => eventStore.GetEventsAsync(A<Func<StoredEvent, Task>>.Ignored, A<CancellationToken>.Ignored, "^my-stream", position))
.MustHaveHappened(Repeated.Exactly.Once);
}
[Fact]
public async Task Should_subscribe_on_notify_when_stream_matches()
{
var sut = new PollingSubscription(eventStore, eventNotifier, eventSubscriber, "^my-stream", position);
eventNotifier.NotifyEventsStored("my-stream-123");
await WaitAndStopAsync(sut);
A.CallTo(() => eventStore.GetEventsAsync(A<Func<StoredEvent, Task>>.Ignored, A<CancellationToken>.Ignored, "^my-stream", position))
.MustHaveHappened(Repeated.Exactly.Twice);
}
private async Task WaitAndStopAsync(PollingSubscription sut)
{
await Task.Delay(200);
await sut.StopAsync();
}
}
}

4
tests/Squidex.Infrastructure.Tests/GravatarHelperTests.cs

@ -20,7 +20,7 @@ namespace Squidex.Infrastructure
{
var url = GravatarHelper.CreatePictureUrl(email);
Assert.Equal(url, "https://www.gravatar.com/avatar/0bc83cb571cd1c50ba6f3e8a78ef1346");
Assert.Equal("https://www.gravatar.com/avatar/0bc83cb571cd1c50ba6f3e8a78ef1346", url);
}
[Theory]
@ -31,7 +31,7 @@ namespace Squidex.Infrastructure
{
var url = GravatarHelper.CreateProfileUrl(email);
Assert.Equal(url, "https://www.gravatar.com/0bc83cb571cd1c50ba6f3e8a78ef1346");
Assert.Equal("https://www.gravatar.com/0bc83cb571cd1c50ba6f3e8a78ef1346", url);
}
}
}

6
tests/Squidex.Infrastructure.Tests/Log/SemanticLogTests.cs

@ -240,7 +240,7 @@ namespace Squidex.Infrastructure.Log
.WriteProperty("message", "My Message")
.WriteProperty("elapsedMs", 0));
Assert.True(output.StartsWith(expected.Substring(0, 55), StringComparison.Ordinal));
Assert.StartsWith(expected.Substring(0, 55), output, StringComparison.Ordinal);
}
[Fact]
@ -254,7 +254,7 @@ namespace Squidex.Infrastructure.Log
.WriteProperty("message", "My Message")
.WriteProperty("elapsedMs", 0));
Assert.True(output.StartsWith(expected.Substring(0, 55), StringComparison.Ordinal));
Assert.StartsWith(expected.Substring(0, 55), output, StringComparison.Ordinal);
}
[Fact]
@ -268,7 +268,7 @@ namespace Squidex.Infrastructure.Log
.WriteProperty("message", "My Message")
.WriteProperty("elapsedMs", 0));
Assert.True(output.StartsWith(expected.Substring(0, 55), StringComparison.Ordinal));
Assert.StartsWith(expected.Substring(0, 55), output, StringComparison.Ordinal);
}
[Fact]

6
tests/Squidex.Infrastructure.Tests/Reflection/SimpleMapperTests.cs

@ -107,9 +107,11 @@ namespace Squidex.Infrastructure.Reflection
Assert.Equal(class1.MappedString, class2.MappedString);
Assert.Equal(class1.MappedNumber, class2.MappedNumber);
Assert.Equal(class1.MappedGuid.ToString(), class2.MappedGuid);
Assert.Equal(class1.WrongType1, 0L);
Assert.Equal(class1.WrongType2, 0L);
Assert.NotEqual(class1.UnmappedString, class2.UnmappedString);
Assert.Equal(0L, class1.WrongType1);
Assert.Equal(0L, class1.WrongType2);
}
}
}

90
tests/Squidex.Infrastructure.Tests/RetryWindowTests.cs

@ -0,0 +1,90 @@
// ==========================================================================
// RetryWindowTests.cs
// Squidex Headless CMS
// ==========================================================================
// Copyright (c) Squidex Group
// All rights reserved.
// ==========================================================================
using System;
using Xunit;
namespace Squidex.Infrastructure
{
public class RetryWindowTests
{
private const int WindowSize = 5;
[Fact]
public void Should_allow_to_retry_after_reset()
{
var sut = new RetryWindow(TimeSpan.FromSeconds(1), WindowSize);
for (var i = 0; i < WindowSize * 2; i++)
{
sut.CanRetryAfterFailure();
}
sut.Reset();
Assert.True(sut.CanRetryAfterFailure());
}
[Theory]
[InlineData(6)]
[InlineData(7)]
public void Should_not_allow_to_retry_after_many_errors(int errors)
{
var sut = new RetryWindow(TimeSpan.FromSeconds(1), WindowSize);
var now = DateTime.UtcNow;
for (var i = 0; i < WindowSize; i++)
{
Assert.True(sut.CanRetryAfterFailure(now));
}
var remaining = errors - WindowSize;
for (var i = 0; i < remaining; i++)
{
Assert.False(sut.CanRetryAfterFailure(now));
}
}
[Theory]
[InlineData(1)]
[InlineData(2)]
[InlineData(3)]
[InlineData(4)]
public void Should_allow_to_retry_after_few_errors(int errors)
{
var sut = new RetryWindow(TimeSpan.FromSeconds(1), WindowSize);
var now = DateTime.UtcNow;
for (var i = 0; i < errors; i++)
{
Assert.True(sut.CanRetryAfterFailure(now));
}
}
[Theory]
[InlineData(1)]
[InlineData(2)]
[InlineData(3)]
[InlineData(4)]
[InlineData(5)]
[InlineData(6)]
[InlineData(7)]
[InlineData(8)]
public void Should_allow_to_retry_after_few_errors_in_window(int errors)
{
var sut = new RetryWindow(TimeSpan.FromSeconds(1), WindowSize);
var now = DateTime.UtcNow;
for (var i = 0; i < errors; i++)
{
Assert.True(sut.CanRetryAfterFailure(now.AddMilliseconds(i * 300)));
}
}
}
}

17
tests/Squidex.Infrastructure.Tests/Timers/CompletionTimerTests.cs

@ -31,22 +31,5 @@ namespace Squidex.Infrastructure.Timers
Assert.True(called);
}
public void Should_invoke_dispose_within_timer()
{
CompletionTimer timer = null;
timer = new CompletionTimer(10, ct =>
{
timer?.StopAsync().Wait();
return TaskHelper.Done;
}, 10);
Thread.Sleep(1000);
timer.SkipCurrentDelay();
timer.StopAsync().Wait();
}
}
}

3
tests/Squidex.Infrastructure.Tests/UsageTracking/BackgroundUsageTrackerTests.cs

@ -112,8 +112,7 @@ namespace Squidex.Infrastructure.UsageTracking
await sut.TrackAsync("key1", 0, 1000);
sut.Next();
await Task.Delay(100);
sut.Dispose();
A.CallTo(() => usageStore.TrackUsagesAsync(A<DateTime>.Ignored, A<string>.Ignored, A<double>.Ignored, A<long>.Ignored)).MustNotHaveHappened();
}

Loading…
Cancel
Save