diff --git a/src/Squidex.Infrastructure.GetEventStore/CQRS/Events/GetEventStore.cs b/src/Squidex.Infrastructure.GetEventStore/CQRS/Events/GetEventStore.cs index e1044e299..4ed616a77 100644 --- a/src/Squidex.Infrastructure.GetEventStore/CQRS/Events/GetEventStore.cs +++ b/src/Squidex.Infrastructure.GetEventStore/CQRS/Events/GetEventStore.cs @@ -9,6 +9,7 @@ using System; using System.Collections.Generic; using System.Linq; +using System.Threading; using System.Threading.Tasks; using EventStore.ClientAPI; @@ -32,6 +33,10 @@ namespace Squidex.Infrastructure.CQRS.Events this.prefix = prefix?.Trim(' ', '-').WithFallback("squidex"); } + public GetEventStore() + { + } + public void Connect() { try @@ -46,12 +51,14 @@ namespace Squidex.Infrastructure.CQRS.Events public IEventSubscription CreateSubscription(IEventSubscriber subscriber, string streamFilter, string position = null) { - Guard.NotNull(subscriber, nameof(subscriber)); - Guard.NotNullOrEmpty(streamFilter, nameof(streamFilter)); - return new GetEventStoreSubscription(connection, subscriber, projectionHost, prefix, position, streamFilter); } + public Task GetEventsAsync(Func callback, CancellationToken cancellationToken, string streamFilter = null, string position = null) + { + throw new NotSupportedException(); + } + public async Task> GetEventsAsync(string streamName) { var result = new List(); diff --git a/src/Squidex.Infrastructure.GetEventStore/CQRS/Events/GetEventStoreSubscription.cs b/src/Squidex.Infrastructure.GetEventStore/CQRS/Events/GetEventStoreSubscription.cs index d1e461f3d..e2909ec81 100644 --- a/src/Squidex.Infrastructure.GetEventStore/CQRS/Events/GetEventStoreSubscription.cs +++ b/src/Squidex.Infrastructure.GetEventStore/CQRS/Events/GetEventStoreSubscription.cs @@ -8,7 +8,7 @@ using System; using System.Collections.Concurrent; -using System.Collections.Generic; +using System.Globalization; using System.Linq; using System.Net; using System.Net.Sockets; @@ -16,162 +16,78 @@ using System.Threading.Tasks; using EventStore.ClientAPI; using EventStore.ClientAPI.Exceptions; using EventStore.ClientAPI.Projections; -using Squidex.Infrastructure.Actors; using Squidex.Infrastructure.Tasks; namespace Squidex.Infrastructure.CQRS.Events { - internal sealed class GetEventStoreSubscription : Actor, IEventSubscription + internal sealed class GetEventStoreSubscription : IEventSubscription { - private const int ReconnectWindowMax = 5; - private const int ReconnectWaitMs = 1000; - private static readonly TimeSpan TimeBetweenReconnects = TimeSpan.FromMinutes(5); + private const string ProjectionName = "by-{0}-{1}"; private static readonly ConcurrentDictionary SubscriptionsCreated = new ConcurrentDictionary(); - private readonly IEventStoreConnection connection; - private readonly IEventSubscriber subscriber; + private readonly IEventStoreConnection eventStoreConnection; + private readonly IEventSubscriber eventSubscriber; private readonly string prefix; - private readonly string streamName; private readonly string streamFilter; private readonly string projectionHost; - private readonly Queue reconnectTimes = new Queue(); - private EventStoreCatchUpSubscription subscription; + private readonly EventStoreCatchUpSubscription subscription; private long? position; - private sealed class ESConnect - { - } - - private abstract class ESMessage - { - public EventStoreCatchUpSubscription Subscription { get; set; } - } - - private sealed class ESSubscriptionFailed : ESMessage - { - public Exception Exception { get; set; } - } - - private sealed class ESEventReceived : ESMessage - { - public ResolvedEvent Event { get; set; } - } - public GetEventStoreSubscription( - IEventStoreConnection connection, - IEventSubscriber subscriber, + IEventStoreConnection eventStoreConnection, + IEventSubscriber eventSubscriber, string projectionHost, string prefix, string position, string streamFilter) { - this.connection = connection; + Guard.NotNull(eventSubscriber, nameof(eventSubscriber)); + Guard.NotNullOrEmpty(streamFilter, nameof(streamFilter)); + + this.eventStoreConnection = eventStoreConnection; + this.eventSubscriber = eventSubscriber; this.position = ParsePosition(position); this.prefix = prefix; this.projectionHost = projectionHost; this.streamFilter = streamFilter; - this.subscriber = subscriber; - streamName = ParseFilter(prefix, streamFilter); + var streamName = ParseFilter(prefix, streamFilter); - DispatchAsync(new ESConnect()).Forget(); - } + InitializeAsync(streamName).Wait(); - public Task StopAsync() - { - return StopAndWaitAsync(); + subscription = SubscribeToStream(streamName); } - protected override Task OnStop() + public Task StopAsync() { - subscription?.Stop(); + subscription.Stop(); return TaskHelper.Done; } - protected override async Task OnError(Exception exception) - { - await subscriber.OnErrorAsync(this, exception); - - await StopAsync(); - } - - protected override async Task OnMessage(object message) - { - switch (message) - { - case ESConnect connect when subscription == null: - { - await InitializeAsync(); - - subscription = SubscribeToStream(); - - break; - } - - case ESSubscriptionFailed subscriptionFailed when subscriptionFailed.Subscription == subscription: - { - subscription.Stop(); - subscription = null; - - if (CanReconnect(DateTime.UtcNow)) - { - Task.Delay(ReconnectWaitMs).ContinueWith(t => DispatchAsync(new ESConnect())).Forget(); - } - else - { - throw subscriptionFailed.Exception; - } - - break; - } - - case ESEventReceived eventReceived when eventReceived.Subscription == subscription: - { - var storedEvent = Formatter.Read(eventReceived.Event); - - await subscriber.OnEventAsync(this, storedEvent); - - position = eventReceived.Event.OriginalEventNumber; - - break; - } - } - } - - private EventStoreCatchUpSubscription SubscribeToStream() + private EventStoreCatchUpSubscription SubscribeToStream(string streamName) { var settings = CatchUpSubscriptionSettings.Default; - return connection.SubscribeToStreamFrom(streamName, position, settings, + return eventStoreConnection.SubscribeToStreamFrom(streamName, position, settings, (s, e) => { - DispatchAsync(new ESEventReceived { Event = e, Subscription = s }).Forget(); + var storedEvent = Formatter.Read(e); + + eventSubscriber.OnEventAsync(this, storedEvent).Wait(); }, null, (s, reason, ex) => { - if (reason == SubscriptionDropReason.ConnectionClosed || - reason == SubscriptionDropReason.UserInitiated) + if (reason != SubscriptionDropReason.ConnectionClosed && + reason != SubscriptionDropReason.UserInitiated) { ex = ex ?? new ConnectionClosedException($"Subscription closed with reason {reason}."); - DispatchAsync(new ESSubscriptionFailed { Exception = ex, Subscription = s }).Forget(); + eventSubscriber.OnErrorAsync(this, ex); } }); } - private bool CanReconnect(DateTime utcNow) - { - reconnectTimes.Enqueue(utcNow); - - while (reconnectTimes.Count >= ReconnectWindowMax) - { - reconnectTimes.Dequeue(); - } - - return reconnectTimes.Count < ReconnectWindowMax && (reconnectTimes.Count == 0 || (utcNow - reconnectTimes.Peek()) > TimeBetweenReconnects); - } - - private async Task InitializeAsync() + private async Task InitializeAsync(string streamName) { if (SubscriptionsCreated.TryAdd(streamName, true)) { @@ -189,7 +105,7 @@ namespace Squidex.Infrastructure.CQRS.Events try { - var credentials = connection.Settings.DefaultUserCredentials; + var credentials = eventStoreConnection.Settings.DefaultUserCredentials; await projectsManager.CreateContinuousAsync($"${streamName}", projectionConfig, credentials); } @@ -203,16 +119,6 @@ namespace Squidex.Infrastructure.CQRS.Events } } - private static string ParseFilter(string prefix, string filter) - { - return $"by-{prefix.Simplify()}-{filter.Simplify()}"; - } - - private static long? ParsePosition(string position) - { - return long.TryParse(position, out var parsedPosition) ? (long?)parsedPosition : null; - } - private async Task ConnectToProjections() { var addressParts = projectionHost.Split(':'); @@ -227,10 +133,20 @@ namespace Squidex.Infrastructure.CQRS.Events var projectionsManager = new ProjectionsManager( - connection.Settings.Log, endpoint, - connection.Settings.OperationTimeout); + eventStoreConnection.Settings.Log, endpoint, + eventStoreConnection.Settings.OperationTimeout); return projectionsManager; } + + private static string ParseFilter(string prefix, string filter) + { + return string.Format(CultureInfo.InvariantCulture, ProjectionName, prefix.Simplify(), filter.Simplify()); + } + + private static long? ParsePosition(string position) + { + return long.TryParse(position, out var parsedPosition) ? (long?)parsedPosition : null; + } } } diff --git a/src/Squidex.Infrastructure.MongoDb/CQRS/Events/MongoEventConsumerInfo.cs b/src/Squidex.Infrastructure.MongoDb/CQRS/Events/MongoEventConsumerInfo.cs index 9839076c8..07abec5cc 100644 --- a/src/Squidex.Infrastructure.MongoDb/CQRS/Events/MongoEventConsumerInfo.cs +++ b/src/Squidex.Infrastructure.MongoDb/CQRS/Events/MongoEventConsumerInfo.cs @@ -11,6 +11,7 @@ using MongoDB.Bson.Serialization.Attributes; namespace Squidex.Infrastructure.CQRS.Events { + [BsonIgnoreExtraElements] public sealed class MongoEventConsumerInfo : IEventConsumerInfo { [BsonId] @@ -25,10 +26,6 @@ namespace Squidex.Infrastructure.CQRS.Events [BsonIgnoreIfDefault] public bool IsStopped { get; set; } - [BsonElement] - [BsonIgnoreIfDefault] - public bool IsResetting { get; set; } - [BsonElement] [BsonRequired] public string Position { get; set; } diff --git a/src/Squidex.Infrastructure.MongoDb/CQRS/Events/MongoEventConsumerInfoRepository.cs b/src/Squidex.Infrastructure.MongoDb/CQRS/Events/MongoEventConsumerInfoRepository.cs index 306e1b736..1d7f9acef 100644 --- a/src/Squidex.Infrastructure.MongoDb/CQRS/Events/MongoEventConsumerInfoRepository.cs +++ b/src/Squidex.Infrastructure.MongoDb/CQRS/Events/MongoEventConsumerInfoRepository.cs @@ -21,7 +21,6 @@ namespace Squidex.Infrastructure.CQRS.Events private static readonly FieldDefinition ErrorField = Fields.Build(x => x.Error); private static readonly FieldDefinition PositionField = Fields.Build(x => x.Position); private static readonly FieldDefinition IsStoppedField = Fields.Build(x => x.IsStopped); - private static readonly FieldDefinition IsResettingField = Fields.Build(x => x.IsResetting); public MongoEventConsumerInfoRepository(IMongoDatabase database) : base(database) @@ -47,67 +46,29 @@ namespace Squidex.Infrastructure.CQRS.Events return entity; } - public async Task CreateAsync(string consumerName) - { - if (await Collection.CountAsync(Filter.Eq(NameField, consumerName)) == 0) - { - try - { - await Collection.InsertOneAsync(CreateEntity(consumerName, null)); - } - catch (MongoWriteException ex) - { - if (ex.WriteError?.Category != ServerErrorCategory.DuplicateKey) - { - throw; - } - } - } - } - public Task ClearAsync(IEnumerable currentConsumerNames) { return Collection.DeleteManyAsync(Filter.Not(Filter.In(NameField, currentConsumerNames))); } - public Task StartAsync(string consumerName) + public async Task SetAsync(string consumerName, string position, bool isStopped = false, string error = null) { - var filter = Filter.Eq(NameField, consumerName); - - return Collection.UpdateOneAsync(filter, Update.Unset(IsStoppedField).Unset(ErrorField)); - } - - public Task StopAsync(string consumerName, string error = null) - { - var filter = Filter.Eq(NameField, consumerName); - - return Collection.UpdateOneAsync(filter, Update.Set(IsStoppedField, true).Set(ErrorField, error)); - } - - public Task ResetAsync(string consumerName) - { - var filter = Filter.Eq(NameField, consumerName); - - return Collection.UpdateOneAsync(filter, Update.Set(IsResettingField, true).Unset(ErrorField)); - } - - public Task SetPositionAsync(string consumerName, string position, bool reset) - { - var filter = Filter.Eq(NameField, consumerName); - - if (reset) + try { - return Collection.ReplaceOneAsync(filter, CreateEntity(consumerName, position)); + await Collection.UpdateOneAsync(Filter.Eq(NameField, consumerName), + Update + .Set(ErrorField, error) + .Set(PositionField, position) + .Set(IsStoppedField, isStopped), + new UpdateOptions { IsUpsert = true }); } - else + catch (MongoWriteException ex) { - return Collection.UpdateOneAsync(filter, Update.Set(PositionField, position)); + if (ex.WriteError?.Category != ServerErrorCategory.DuplicateKey) + { + throw; + } } } - - private static MongoEventConsumerInfo CreateEntity(string consumerName, string position) - { - return new MongoEventConsumerInfo { Name = consumerName, Position = position }; - } } } diff --git a/src/Squidex.Infrastructure.MongoDb/CQRS/Events/PollingSubscription.cs b/src/Squidex.Infrastructure.MongoDb/CQRS/Events/PollingSubscription.cs deleted file mode 100644 index 196809500..000000000 --- a/src/Squidex.Infrastructure.MongoDb/CQRS/Events/PollingSubscription.cs +++ /dev/null @@ -1,141 +0,0 @@ -// ========================================================================== -// PollingSubscription.cs -// Squidex Headless CMS -// ========================================================================== -// Copyright (c) Squidex Group -// All rights reserved. -// ========================================================================== - -using System; -using System.Text.RegularExpressions; -using System.Threading; -using System.Threading.Tasks; -using Squidex.Infrastructure.Actors; -using Squidex.Infrastructure.Tasks; - -namespace Squidex.Infrastructure.CQRS.Events -{ - public sealed class PollingSubscription : Actor, IEventSubscription - { - private readonly IEventNotifier notifier; - private readonly MongoEventStore store; - private readonly CancellationTokenSource disposeToken = new CancellationTokenSource(); - private readonly Regex streamRegex; - private readonly string streamFilter; - private readonly IEventSubscriber subscriber; - private string position; - private bool isPolling; - private IDisposable notification; - - private sealed class Connect - { - } - - private sealed class StartPoll - { - } - - private sealed class StopPoll - { - } - - public PollingSubscription(MongoEventStore store, IEventNotifier notifier, IEventSubscriber subscriber, string streamFilter, string position) - { - this.notifier = notifier; - this.position = position; - this.store = store; - this.streamFilter = streamFilter; - this.subscriber = subscriber; - - streamRegex = new Regex(streamFilter); - - DispatchAsync(new Connect()).Forget(); - } - - public Task StopAsync() - { - return StopAndWaitAsync(); - } - - protected override Task OnStop() - { - disposeToken?.Cancel(); - - notification?.Dispose(); - - return TaskHelper.Done; - } - - protected override async Task OnError(Exception exception) - { - await subscriber.OnErrorAsync(this, exception); - - await StopAsync(); - } - - protected override async Task OnMessage(object message) - { - switch (message) - { - case Connect connect: - { - notification = notifier.Subscribe(streamName => - { - if (streamRegex.IsMatch(streamName)) - { - DispatchAsync(new StartPoll()).Forget(); - } - }); - - DispatchAsync(new StartPoll()).Forget(); - - break; - } - - case StartPoll poll when !isPolling: - { - isPolling = true; - - PollAsync().Forget(); - - break; - } - - case StopPoll poll when isPolling: - { - isPolling = false; - - Task.Delay(5000).ContinueWith(t => DispatchAsync(new StartPoll())).Forget(); - - break; - } - - case StoredEvent storedEvent: - { - await subscriber.OnEventAsync(this, storedEvent); - - position = storedEvent.EventPosition; - - break; - } - } - } - - private async Task PollAsync() - { - try - { - await store.GetEventsAsync(DispatchAsync, disposeToken.Token, streamFilter, position); - - await DispatchAsync(new StopPoll()); - } - catch (Exception ex) - { - if (!ex.Is()) - { - await FailAsync(ex); - } - } - } - } -} diff --git a/src/Squidex.Infrastructure/CQRS/Events/Actors/EventConsumerActor.cs b/src/Squidex.Infrastructure/CQRS/Events/Actors/EventConsumerActor.cs index 2c2f3c3ed..b28b6d2e6 100644 --- a/src/Squidex.Infrastructure/CQRS/Events/Actors/EventConsumerActor.cs +++ b/src/Squidex.Infrastructure/CQRS/Events/Actors/EventConsumerActor.cs @@ -8,6 +8,7 @@ using System; using System.Threading.Tasks; +using System.Threading.Tasks.Dataflow; using Squidex.Infrastructure.Actors; using Squidex.Infrastructure.CQRS.Events.Actors.Messages; using Squidex.Infrastructure.Log; @@ -15,16 +16,25 @@ using Squidex.Infrastructure.Tasks; namespace Squidex.Infrastructure.CQRS.Events.Actors { - public sealed class EventConsumerActor : Actor, IEventSubscriber, IActor + public sealed class EventConsumerActor : DisposableObjectBase, IEventSubscriber, IActor { private readonly EventDataFormatter formatter; + private readonly RetryWindow retryWindow = new RetryWindow(TimeSpan.FromMinutes(5), 5); private readonly IEventStore eventStore; private readonly IEventConsumerInfoRepository eventConsumerInfoRepository; private readonly ISemanticLog log; + private readonly ActionBlock dispatcher; private IEventSubscription eventSubscription; private IEventConsumer eventConsumer; - private bool isRunning; - private bool isSetup; + private bool isStopped; + private bool statusIsRunning = true; + private string statusPosition; + private string statusError; + private Guid stateId = Guid.NewGuid(); + + private sealed class Teardown + { + } private sealed class Setup { @@ -46,6 +56,13 @@ namespace Squidex.Infrastructure.CQRS.Events.Actors public Exception Exception { get; set; } } + private sealed class Reconnect + { + public Guid StateId { get; set; } + } + + public int ReconnectWaitMs { get; set; } = 5000; + public EventConsumerActor( EventDataFormatter formatter, IEventStore eventStore, @@ -62,148 +79,238 @@ namespace Squidex.Infrastructure.CQRS.Events.Actors this.formatter = formatter; this.eventStore = eventStore; this.eventConsumerInfoRepository = eventConsumerInfoRepository; - } - public Task SubscribeAsync(IEventConsumer eventConsumer) - { - Guard.NotNull(eventConsumer, nameof(eventConsumer)); + var options = new ExecutionDataflowBlockOptions + { + MaxMessagesPerTask = -1, + MaxDegreeOfParallelism = 1, + BoundedCapacity = 10 + }; - return DispatchAsync(new Setup { EventConsumer = eventConsumer }); + dispatcher = new ActionBlock(OnMessage, options); } - protected override async Task OnStop() + protected override void DisposeObject(bool disposing) { - if (eventSubscription != null) + if (disposing) { - await eventSubscription.StopAsync(); + dispatcher.SendAsync(new Teardown()).Wait(); + dispatcher.Complete(); + dispatcher.Completion.Wait(); } } - protected override async Task OnError(Exception exception) + public async Task WaitForCompletionAsync() { - log.LogError(exception, w => w - .WriteProperty("action", "HandleEvent") - .WriteProperty("state", "Failed") - .WriteProperty("eventConsumer", eventConsumer.Name)); + while (dispatcher.InputCount > 0) + { + await Task.Delay(20); + } + } - await StopAsync(exception); + public Task SubscribeAsync(IEventConsumer eventConsumer) + { + Guard.NotNull(eventConsumer, nameof(eventConsumer)); - isRunning = false; + return dispatcher.SendAsync(new Setup { EventConsumer = eventConsumer }); } Task IEventSubscriber.OnEventAsync(IEventSubscription subscription, StoredEvent @event) { - return DispatchAsync(new SubscriptionEventReceived { Subscription = subscription, Event = @event }); + return dispatcher.SendAsync(new SubscriptionEventReceived { Subscription = subscription, Event = @event }); } Task IEventSubscriber.OnErrorAsync(IEventSubscription subscription, Exception exception) { - return DispatchAsync(new SubscriptionFailed { Subscription = subscription, Exception = exception }); + return dispatcher.SendAsync(new SubscriptionFailed { Subscription = subscription, Exception = exception }); } void IActor.Tell(object message) { - DispatchAsync(message).Forget(); + dispatcher.SendAsync(message).Forget(); } - protected override async Task OnMessage(object message) + private async Task OnMessage(object message) { - switch (message) + if (isStopped) + { + return; + } + + try { - case Setup setup when !isSetup: + var oldStateId = stateId; + var newStateId = stateId = Guid.NewGuid(); + + switch (message) + { + case Teardown teardown: + { + isStopped = true; + + return; + } + + case Setup setup: { eventConsumer = setup.EventConsumer; - await SetupAsync(); + var status = await eventConsumerInfoRepository.FindAsync(eventConsumer.Name); + + if (status != null) + { + statusError = status.Error; + statusPosition = status.Position; + statusIsRunning = !status.IsStopped; + } - isSetup = true; + if (statusIsRunning) + { + await SubscribeThisAsync(statusPosition); + } break; } - case StartConsumerMessage startConsumer when isSetup && !isRunning: + case StartConsumerMessage startConsumer: { - await StartAsync(); + if (statusIsRunning) + { + return; + } + + await SubscribeThisAsync(statusPosition); - isRunning = true; + statusError = null; + statusIsRunning = true; break; } - case StopConsumerMessage stopConsumer when isSetup && isRunning: + case StopConsumerMessage stopConsumer: { - await StopAsync(); + if (!statusIsRunning) + { + return; + } - isRunning = false; + await UnsubscribeThisAsync(); + + statusIsRunning = false; break; } - case ResetConsumerMessage resetConsumer when isSetup: + case ResetConsumerMessage resetConsumer: { - await StopAsync(); - await ResetAsync(); - await StartAsync(); + await UnsubscribeThisAsync(); + await ClearAsync(); + await SubscribeThisAsync(null); - isRunning = true; + statusError = null; + statusPosition = null; + statusIsRunning = true; break; } - case SubscriptionFailed subscriptionFailed when isSetup: + case Reconnect reconnect: { - if (subscriptionFailed.Subscription == eventSubscription) + if (!statusIsRunning || reconnect.StateId != oldStateId) { - await FailAsync(subscriptionFailed.Exception); + return; } + await SubscribeThisAsync(statusPosition); + break; } - case SubscriptionEventReceived eventReceived when isSetup: + case SubscriptionFailed subscriptionFailed: { - if (eventReceived.Subscription == eventSubscription) + if (subscriptionFailed.Subscription != eventSubscription) { - var @event = ParseEvent(eventReceived.Event); + return; + } + + await UnsubscribeThisAsync(); - await DispatchConsumerAsync(@event, eventReceived.Event.EventPosition); + if (retryWindow.CanRetryAfterFailure()) + { + Task.Delay(ReconnectWaitMs).ContinueWith(t => dispatcher.SendAsync(new Reconnect { StateId = newStateId })).Forget(); + } + else + { + throw subscriptionFailed.Exception; } break; } - } - } - private async Task SetupAsync() - { - await eventConsumerInfoRepository.CreateAsync(eventConsumer.Name); + case SubscriptionEventReceived eventReceived: + { + if (eventReceived.Subscription != eventSubscription) + { + return; + } + + var @event = ParseEvent(eventReceived.Event); - var status = await eventConsumerInfoRepository.FindAsync(eventConsumer.Name); + await DispatchConsumerAsync(@event); + + statusError = null; + statusPosition = @eventReceived.Event.EventPosition; + + break; + } + } - if (!status.IsStopped) + await eventConsumerInfoRepository.SetAsync(eventConsumer.Name, statusPosition, !statusIsRunning, statusError); + } + catch (Exception ex) { - DispatchAsync(new StartConsumerMessage()).Forget(); + try + { + await UnsubscribeThisAsync(); + } + catch (Exception unsubscribeException) + { + ex = new AggregateException(ex, unsubscribeException); + } + + log.LogFatal(ex, w => w + .WriteProperty("action", "HandleEvent") + .WriteProperty("state", "Failed") + .WriteProperty("eventConsumer", eventConsumer.Name)); + + statusError = ex.ToString(); + statusIsRunning = false; + + await eventConsumerInfoRepository.SetAsync(eventConsumer.Name, statusPosition, !statusIsRunning, statusError); } } - private async Task StartAsync() + private async Task UnsubscribeThisAsync() { - var status = await eventConsumerInfoRepository.FindAsync(eventConsumer.Name); - - eventSubscription = eventStore.CreateSubscription(this, eventConsumer.EventsFilter, status.Position); + if (eventSubscription != null) + { + await eventSubscription.StopAsync(); - await eventConsumerInfoRepository.StartAsync(eventConsumer.Name); + eventSubscription = null; + } } - private async Task StopAsync(Exception exception = null) + private Task SubscribeThisAsync(string position) { - eventSubscription?.StopAsync().Forget(); - eventSubscription = null; + if (eventSubscription == null) + { + eventSubscription = eventStore.CreateSubscription(this, eventConsumer.EventsFilter, position); + } - await eventConsumerInfoRepository.StopAsync(eventConsumer.Name, exception?.ToString()); + return TaskHelper.Done; } - private async Task ResetAsync() + private async Task ClearAsync() { var actionId = Guid.NewGuid().ToString(); @@ -219,13 +326,11 @@ namespace Squidex.Infrastructure.CQRS.Events.Actors .WriteProperty("state", "Completed") .WriteProperty("eventConsumer", eventConsumer.Name))) { - await eventConsumerInfoRepository.ResetAsync(eventConsumer.Name); await eventConsumer.ClearAsync(); - await eventConsumerInfoRepository.SetPositionAsync(eventConsumer.Name, null, true); } } - private async Task DispatchConsumerAsync(Envelope @event, string position) + private async Task DispatchConsumerAsync(Envelope @event) { var eventId = @event.Headers.EventId().ToString(); var eventType = @event.Payload.GetType().Name; @@ -247,7 +352,6 @@ namespace Squidex.Infrastructure.CQRS.Events.Actors .WriteProperty("eventConsumer", eventConsumer.Name))) { await eventConsumer.On(@event); - await eventConsumerInfoRepository.SetPositionAsync(eventConsumer.Name, position, false); } } diff --git a/src/Squidex.Infrastructure/CQRS/Events/IEventConsumerInfo.cs b/src/Squidex.Infrastructure/CQRS/Events/IEventConsumerInfo.cs index e05f9623a..f074ee5ef 100644 --- a/src/Squidex.Infrastructure/CQRS/Events/IEventConsumerInfo.cs +++ b/src/Squidex.Infrastructure/CQRS/Events/IEventConsumerInfo.cs @@ -12,8 +12,6 @@ namespace Squidex.Infrastructure.CQRS.Events { bool IsStopped { get; } - bool IsResetting { get; } - string Name { get; } string Error { get; } diff --git a/src/Squidex.Infrastructure/CQRS/Events/IEventConsumerInfoRepository.cs b/src/Squidex.Infrastructure/CQRS/Events/IEventConsumerInfoRepository.cs index 1cc02b0f9..f44d4de57 100644 --- a/src/Squidex.Infrastructure/CQRS/Events/IEventConsumerInfoRepository.cs +++ b/src/Squidex.Infrastructure/CQRS/Events/IEventConsumerInfoRepository.cs @@ -19,14 +19,6 @@ namespace Squidex.Infrastructure.CQRS.Events Task ClearAsync(IEnumerable currentConsumerNames); - Task CreateAsync(string consumerName); - - Task StartAsync(string consumerName); - - Task StopAsync(string consumerName, string error = null); - - Task ResetAsync(string consumerName); - - Task SetPositionAsync(string consumerName, string position, bool reset); + Task SetAsync(string consumerName, string position, bool isStopped, string error = null); } } diff --git a/src/Squidex.Infrastructure/CQRS/Events/IEventStore.cs b/src/Squidex.Infrastructure/CQRS/Events/IEventStore.cs index 249b9927a..c48e2cb78 100644 --- a/src/Squidex.Infrastructure/CQRS/Events/IEventStore.cs +++ b/src/Squidex.Infrastructure/CQRS/Events/IEventStore.cs @@ -8,6 +8,7 @@ using System; using System.Collections.Generic; +using System.Threading; using System.Threading.Tasks; namespace Squidex.Infrastructure.CQRS.Events @@ -16,6 +17,8 @@ namespace Squidex.Infrastructure.CQRS.Events { Task> GetEventsAsync(string streamName); + Task GetEventsAsync(Func callback, CancellationToken cancellationToken, string streamFilter = null, string position = null); + Task AppendEventsAsync(Guid commitId, string streamName, ICollection events); Task AppendEventsAsync(Guid commitId, string streamName, int expectedVersion, ICollection events); diff --git a/src/Squidex.Infrastructure/CQRS/Events/PollingSubscription.cs b/src/Squidex.Infrastructure/CQRS/Events/PollingSubscription.cs new file mode 100644 index 000000000..e69234d47 --- /dev/null +++ b/src/Squidex.Infrastructure/CQRS/Events/PollingSubscription.cs @@ -0,0 +1,82 @@ +// ========================================================================== +// PollingSubscription.cs +// Squidex Headless CMS +// ========================================================================== +// Copyright (c) Squidex Group +// All rights reserved. +// ========================================================================== + +using System; +using System.Text.RegularExpressions; +using System.Threading.Tasks; +using Squidex.Infrastructure.Timers; + +namespace Squidex.Infrastructure.CQRS.Events +{ + public sealed class PollingSubscription : IEventSubscription + { + private readonly IEventNotifier eventNotifier; + private readonly IEventStore eventStore; + private readonly IEventSubscriber eventSubscriber; + private readonly IDisposable notification; + private readonly CompletionTimer timer; + private readonly Regex streamRegex; + private readonly string streamFilter; + private string position; + + public PollingSubscription( + IEventStore eventStore, + IEventNotifier eventNotifier, + IEventSubscriber eventSubscriber, + string streamFilter, + string position) + { + Guard.NotNull(eventStore, nameof(eventStore)); + Guard.NotNull(eventNotifier, nameof(eventNotifier)); + Guard.NotNull(eventSubscriber, nameof(eventSubscriber)); + + this.position = position; + this.eventNotifier = eventNotifier; + this.eventStore = eventStore; + this.eventSubscriber = eventSubscriber; + this.streamFilter = streamFilter; + + streamRegex = new Regex(streamFilter); + + timer = new CompletionTimer(5000, async ct => + { + try + { + await eventStore.GetEventsAsync(async storedEvent => + { + await eventSubscriber.OnEventAsync(this, storedEvent); + + position = storedEvent.EventPosition; + }, ct, streamFilter, position); + } + catch (Exception ex) + { + if (!ex.Is()) + { + await eventSubscriber.OnErrorAsync(this, ex); + } + } + }); + + notification = eventNotifier.Subscribe(streamName => + { + if (streamRegex.IsMatch(streamName)) + { + timer.SkipCurrentDelay(); + } + }); + } + + public Task StopAsync() + { + notification?.Dispose(); + + return timer.StopAsync(); + } + } +} diff --git a/src/Squidex.Infrastructure/RetryWindow.cs b/src/Squidex.Infrastructure/RetryWindow.cs new file mode 100644 index 000000000..78b5c2d94 --- /dev/null +++ b/src/Squidex.Infrastructure/RetryWindow.cs @@ -0,0 +1,48 @@ +// ========================================================================== +// RetryWindow.cs +// Squidex Headless CMS +// ========================================================================== +// Copyright (c) Squidex Group +// All rights reserved. +// ========================================================================== + +using System; +using System.Collections.Generic; + +namespace Squidex.Infrastructure +{ + public sealed class RetryWindow + { + private readonly TimeSpan windowDuration; + private readonly int windowSize; + private readonly Queue retries = new Queue(); + + public RetryWindow(TimeSpan windowDuration, int windowSize) + { + this.windowDuration = windowDuration; + this.windowSize = windowSize + 1; + } + + public void Reset() + { + retries.Clear(); + } + + public bool CanRetryAfterFailure() + { + return CanRetryAfterFailure(DateTime.UtcNow); + } + + public bool CanRetryAfterFailure(DateTime utcNow) + { + retries.Enqueue(utcNow); + + while (retries.Count > windowSize) + { + retries.Dequeue(); + } + + return retries.Count < windowSize || (retries.Count > 0 && (utcNow - retries.Peek()) > windowDuration); + } + } +} diff --git a/tests/Squidex.Domain.Apps.Core.Tests/ContentEnrichmentTests.cs b/tests/Squidex.Domain.Apps.Core.Tests/ContentEnrichmentTests.cs index c50f0bc42..fd51c4d45 100644 --- a/tests/Squidex.Domain.Apps.Core.Tests/ContentEnrichmentTests.cs +++ b/tests/Squidex.Domain.Apps.Core.Tests/ContentEnrichmentTests.cs @@ -60,7 +60,7 @@ namespace Squidex.Domain.Apps.Core Assert.Equal(Now, InstantPattern.General.Parse((string)data["my-datetime"]["iv"]).Value); - Assert.Equal(true, (bool)data["my-boolean"]["iv"]); + Assert.True((bool)data["my-boolean"]["iv"]); } [Fact] diff --git a/tests/Squidex.Domain.Apps.Core.Tests/ContentValidationTests.cs b/tests/Squidex.Domain.Apps.Core.Tests/ContentValidationTests.cs index 2129dcb0f..d5dca8567 100644 --- a/tests/Squidex.Domain.Apps.Core.Tests/ContentValidationTests.cs +++ b/tests/Squidex.Domain.Apps.Core.Tests/ContentValidationTests.cs @@ -154,7 +154,7 @@ namespace Squidex.Domain.Apps.Core await data.ValidateAsync(context, schema, optionalConfig.ToResolver(), errors); - Assert.Equal(0, errors.Count); + Assert.Empty(errors); } [Fact] @@ -248,7 +248,7 @@ namespace Squidex.Domain.Apps.Core await data.ValidatePartialAsync(context, schema, languagesConfig.ToResolver(), errors); - Assert.Equal(0, errors.Count); + Assert.Empty(errors); } [Fact] @@ -261,7 +261,7 @@ namespace Squidex.Domain.Apps.Core await data.ValidatePartialAsync(context, schema, languagesConfig.ToResolver(), errors); - Assert.Equal(0, errors.Count); + Assert.Empty(errors); } [Fact] diff --git a/tests/Squidex.Domain.Apps.Core.Tests/Contents/ContentDataTests.cs b/tests/Squidex.Domain.Apps.Core.Tests/Contents/ContentDataTests.cs index 36837bff5..700c39757 100644 --- a/tests/Squidex.Domain.Apps.Core.Tests/Contents/ContentDataTests.cs +++ b/tests/Squidex.Domain.Apps.Core.Tests/Contents/ContentDataTests.cs @@ -14,6 +14,8 @@ using Squidex.Domain.Apps.Core.Schemas; using Squidex.Infrastructure; using Xunit; +#pragma warning disable xUnit2013 // Do not use equality check to check for collection size. + namespace Squidex.Domain.Apps.Core.Contents { public class ContentDataTests diff --git a/tests/Squidex.Domain.Apps.Core.Tests/InvariantPartitionTests.cs b/tests/Squidex.Domain.Apps.Core.Tests/InvariantPartitionTests.cs index 86a12765d..34c66ef74 100644 --- a/tests/Squidex.Domain.Apps.Core.Tests/InvariantPartitionTests.cs +++ b/tests/Squidex.Domain.Apps.Core.Tests/InvariantPartitionTests.cs @@ -11,6 +11,8 @@ using System.Collections.Generic; using System.Linq; using Xunit; +#pragma warning disable xUnit2013 // Do not use equality check to check for collection size. + namespace Squidex.Domain.Apps.Core { public sealed class InvariantPartitionTests diff --git a/tests/Squidex.Domain.Apps.Core.Tests/LanguagesConfigTests.cs b/tests/Squidex.Domain.Apps.Core.Tests/LanguagesConfigTests.cs index 926e22536..14e206b1e 100644 --- a/tests/Squidex.Domain.Apps.Core.Tests/LanguagesConfigTests.cs +++ b/tests/Squidex.Domain.Apps.Core.Tests/LanguagesConfigTests.cs @@ -224,7 +224,7 @@ namespace Squidex.Domain.Apps.Core { var config = LanguagesConfig.Create(); - Assert.Equal(0, config.Count); + Assert.Empty(config); Assert.NotNull(((IEnumerable)config).GetEnumerator()); Assert.NotNull(((IEnumerable)config).GetEnumerator()); diff --git a/tests/Squidex.Domain.Apps.Core.Tests/Schemas/DateTimeFieldPropertiesTests.cs b/tests/Squidex.Domain.Apps.Core.Tests/Schemas/DateTimeFieldPropertiesTests.cs index e0c77b58f..d893e94ca 100644 --- a/tests/Squidex.Domain.Apps.Core.Tests/Schemas/DateTimeFieldPropertiesTests.cs +++ b/tests/Squidex.Domain.Apps.Core.Tests/Schemas/DateTimeFieldPropertiesTests.cs @@ -33,7 +33,7 @@ namespace Squidex.Domain.Apps.Core.Schemas sut.Validate(errors); - Assert.Equal(0, errors.Count); + Assert.Empty(errors); } [Fact] diff --git a/tests/Squidex.Domain.Apps.Core.Tests/Schemas/NumberFieldPropertiesTests.cs b/tests/Squidex.Domain.Apps.Core.Tests/Schemas/NumberFieldPropertiesTests.cs index f6a1bb6df..3d4627f4b 100644 --- a/tests/Squidex.Domain.Apps.Core.Tests/Schemas/NumberFieldPropertiesTests.cs +++ b/tests/Squidex.Domain.Apps.Core.Tests/Schemas/NumberFieldPropertiesTests.cs @@ -33,7 +33,7 @@ namespace Squidex.Domain.Apps.Core.Schemas sut.Validate(errors); - Assert.Equal(0, errors.Count); + Assert.Empty(errors); } [Fact] diff --git a/tests/Squidex.Domain.Apps.Core.Tests/Schemas/SchemaTests.cs b/tests/Squidex.Domain.Apps.Core.Tests/Schemas/SchemaTests.cs index 683d0aef3..52a09139e 100644 --- a/tests/Squidex.Domain.Apps.Core.Tests/Schemas/SchemaTests.cs +++ b/tests/Squidex.Domain.Apps.Core.Tests/Schemas/SchemaTests.cs @@ -234,7 +234,7 @@ namespace Squidex.Domain.Apps.Core.Schemas sut = sut.DeleteField(1); - Assert.Equal(0, sut.FieldsById.Count); + Assert.Empty(sut.FieldsById); } [Fact] diff --git a/tests/Squidex.Domain.Apps.Core.Tests/Schemas/Validators/AllowedValuesValidatorTests.cs b/tests/Squidex.Domain.Apps.Core.Tests/Schemas/Validators/AllowedValuesValidatorTests.cs index b2b80fd9d..fa23a2d72 100644 --- a/tests/Squidex.Domain.Apps.Core.Tests/Schemas/Validators/AllowedValuesValidatorTests.cs +++ b/tests/Squidex.Domain.Apps.Core.Tests/Schemas/Validators/AllowedValuesValidatorTests.cs @@ -24,7 +24,7 @@ namespace Squidex.Domain.Apps.Core.Schemas.Validators await sut.ValidateAsync(null, errors); - Assert.Equal(0, errors.Count); + Assert.Empty(errors); } [Fact] @@ -34,7 +34,7 @@ namespace Squidex.Domain.Apps.Core.Schemas.Validators await sut.ValidateAsync(100, errors); - Assert.Equal(0, errors.Count); + Assert.Empty(errors); } [Fact] diff --git a/tests/Squidex.Domain.Apps.Core.Tests/Schemas/Validators/PatternValidatorTests.cs b/tests/Squidex.Domain.Apps.Core.Tests/Schemas/Validators/PatternValidatorTests.cs index 7bc064baa..c4d35fca6 100644 --- a/tests/Squidex.Domain.Apps.Core.Tests/Schemas/Validators/PatternValidatorTests.cs +++ b/tests/Squidex.Domain.Apps.Core.Tests/Schemas/Validators/PatternValidatorTests.cs @@ -24,7 +24,7 @@ namespace Squidex.Domain.Apps.Core.Schemas.Validators await sut.ValidateAsync("abc:12", errors); - Assert.Equal(0, errors.Count); + Assert.Empty(errors); } [Fact] @@ -34,7 +34,7 @@ namespace Squidex.Domain.Apps.Core.Schemas.Validators await sut.ValidateAsync(null, errors); - Assert.Equal(0, errors.Count); + Assert.Empty(errors); } [Fact] @@ -44,7 +44,7 @@ namespace Squidex.Domain.Apps.Core.Schemas.Validators await sut.ValidateAsync(string.Empty, errors); - Assert.Equal(0, errors.Count); + Assert.Empty(errors); } [Fact] diff --git a/tests/Squidex.Domain.Apps.Core.Tests/Schemas/Validators/RangeValidatorTests.cs b/tests/Squidex.Domain.Apps.Core.Tests/Schemas/Validators/RangeValidatorTests.cs index d383fca22..b2b5b39f1 100644 --- a/tests/Squidex.Domain.Apps.Core.Tests/Schemas/Validators/RangeValidatorTests.cs +++ b/tests/Squidex.Domain.Apps.Core.Tests/Schemas/Validators/RangeValidatorTests.cs @@ -25,7 +25,7 @@ namespace Squidex.Domain.Apps.Core.Schemas.Validators await sut.ValidateAsync(null, errors); - Assert.Equal(0, errors.Count); + Assert.Empty(errors); } [Theory] @@ -39,7 +39,7 @@ namespace Squidex.Domain.Apps.Core.Schemas.Validators await sut.ValidateAsync(1500, errors); - Assert.Equal(0, errors.Count); + Assert.Empty(errors); } [Theory] diff --git a/tests/Squidex.Domain.Apps.Core.Tests/Schemas/Validators/RequiredStringValidatorTests.cs b/tests/Squidex.Domain.Apps.Core.Tests/Schemas/Validators/RequiredStringValidatorTests.cs index cb30d7d4f..cdd7fddce 100644 --- a/tests/Squidex.Domain.Apps.Core.Tests/Schemas/Validators/RequiredStringValidatorTests.cs +++ b/tests/Squidex.Domain.Apps.Core.Tests/Schemas/Validators/RequiredStringValidatorTests.cs @@ -28,7 +28,7 @@ namespace Squidex.Domain.Apps.Core.Schemas.Validators await sut.ValidateAsync(value, errors); - Assert.Equal(0, errors.Count); + Assert.Empty(errors); } [Fact] @@ -38,7 +38,7 @@ namespace Squidex.Domain.Apps.Core.Schemas.Validators await sut.ValidateOptionalAsync(string.Empty, errors); - Assert.Equal(0, errors.Count); + Assert.Empty(errors); } [Fact] @@ -48,7 +48,7 @@ namespace Squidex.Domain.Apps.Core.Schemas.Validators await sut.ValidateAsync(true, errors); - Assert.Equal(0, errors.Count); + Assert.Empty(errors); } [Fact] diff --git a/tests/Squidex.Domain.Apps.Core.Tests/Schemas/Validators/RequiredValidatorTests.cs b/tests/Squidex.Domain.Apps.Core.Tests/Schemas/Validators/RequiredValidatorTests.cs index baf07a9ef..119e65d47 100644 --- a/tests/Squidex.Domain.Apps.Core.Tests/Schemas/Validators/RequiredValidatorTests.cs +++ b/tests/Squidex.Domain.Apps.Core.Tests/Schemas/Validators/RequiredValidatorTests.cs @@ -24,7 +24,7 @@ namespace Squidex.Domain.Apps.Core.Schemas.Validators await sut.ValidateAsync(true, errors); - Assert.Equal(0, errors.Count); + Assert.Empty(errors); } [Fact] @@ -34,7 +34,7 @@ namespace Squidex.Domain.Apps.Core.Schemas.Validators await sut.ValidateAsync(string.Empty, errors); - Assert.Equal(0, errors.Count); + Assert.Empty(errors); } [Fact] @@ -44,7 +44,7 @@ namespace Squidex.Domain.Apps.Core.Schemas.Validators await sut.ValidateOptionalAsync(null, errors); - Assert.Equal(0, errors.Count); + Assert.Empty(errors); } [Fact] diff --git a/tests/Squidex.Domain.Apps.Core.Tests/Schemas/Validators/StringLengthValidatorTests.cs b/tests/Squidex.Domain.Apps.Core.Tests/Schemas/Validators/StringLengthValidatorTests.cs index eac43ae62..e4e114222 100644 --- a/tests/Squidex.Domain.Apps.Core.Tests/Schemas/Validators/StringLengthValidatorTests.cs +++ b/tests/Squidex.Domain.Apps.Core.Tests/Schemas/Validators/StringLengthValidatorTests.cs @@ -26,7 +26,7 @@ namespace Squidex.Domain.Apps.Core.Schemas.Validators await sut.ValidateAsync(null, errors); - Assert.Equal(0, errors.Count); + Assert.Empty(errors); } [Fact] @@ -36,7 +36,7 @@ namespace Squidex.Domain.Apps.Core.Schemas.Validators await sut.ValidateAsync(string.Empty, errors); - Assert.Equal(0, errors.Count); + Assert.Empty(errors); } [Theory] @@ -50,7 +50,7 @@ namespace Squidex.Domain.Apps.Core.Schemas.Validators await sut.ValidateAsync(CreateString(1500), errors); - Assert.Equal(0, errors.Count); + Assert.Empty(errors); } [Theory] diff --git a/tests/Squidex.Domain.Apps.Core.Tests/Scripting/JintUserTests.cs b/tests/Squidex.Domain.Apps.Core.Tests/Scripting/JintUserTests.cs index e5be0d7db..af47df5f7 100644 --- a/tests/Squidex.Domain.Apps.Core.Tests/Scripting/JintUserTests.cs +++ b/tests/Squidex.Domain.Apps.Core.Tests/Scripting/JintUserTests.cs @@ -11,6 +11,8 @@ using Jint; using Squidex.Infrastructure.Security; using Xunit; +#pragma warning disable xUnit2004 // Do not use equality check to test for boolean conditions + namespace Squidex.Domain.Apps.Core.Scripting { public class JintUserTests diff --git a/tests/Squidex.Infrastructure.Tests/Assets/AzureBlobAssetStoreTests.cs b/tests/Squidex.Infrastructure.Tests/Assets/AzureBlobAssetStoreTests.cs index eabd6bb77..dcab0c97b 100644 --- a/tests/Squidex.Infrastructure.Tests/Assets/AzureBlobAssetStoreTests.cs +++ b/tests/Squidex.Infrastructure.Tests/Assets/AzureBlobAssetStoreTests.cs @@ -22,7 +22,7 @@ namespace Squidex.Infrastructure.Assets { } - [Fact] + // [Fact] public void Should_calculate_source_url() { Sut.Connect(); diff --git a/tests/Squidex.Infrastructure.Tests/Assets/GoogleCloudAssetStoreTests.cs b/tests/Squidex.Infrastructure.Tests/Assets/GoogleCloudAssetStoreTests.cs index e38da3b4d..2318a2208 100644 --- a/tests/Squidex.Infrastructure.Tests/Assets/GoogleCloudAssetStoreTests.cs +++ b/tests/Squidex.Infrastructure.Tests/Assets/GoogleCloudAssetStoreTests.cs @@ -22,7 +22,7 @@ namespace Squidex.Infrastructure.Assets { } - [Fact] + // [Fact] public void Should_calculate_source_url() { Sut.Connect(); diff --git a/tests/Squidex.Infrastructure.Tests/CQRS/Commands/AggregateHandlerTests.cs b/tests/Squidex.Infrastructure.Tests/CQRS/Commands/AggregateHandlerTests.cs index c2cf61f7b..fd388e909 100644 --- a/tests/Squidex.Infrastructure.Tests/CQRS/Commands/AggregateHandlerTests.cs +++ b/tests/Squidex.Infrastructure.Tests/CQRS/Commands/AggregateHandlerTests.cs @@ -87,7 +87,7 @@ namespace Squidex.Infrastructure.CQRS.Commands await sut.CreateAsync(context, async x => { - await Task.Delay(1); + await Task.Yield(); passedDomainObject = x; }); @@ -139,7 +139,7 @@ namespace Squidex.Infrastructure.CQRS.Commands await sut.UpdateAsync(context, async x => { - await Task.Delay(1); + await Task.Yield(); passedDomainObject = x; }); diff --git a/tests/Squidex.Infrastructure.Tests/CQRS/Events/Actors/EventConsumerActorTests.cs b/tests/Squidex.Infrastructure.Tests/CQRS/Events/Actors/EventConsumerActorTests.cs index b53121677..dd9b71c10 100644 --- a/tests/Squidex.Infrastructure.Tests/CQRS/Events/Actors/EventConsumerActorTests.cs +++ b/tests/Squidex.Infrastructure.Tests/CQRS/Events/Actors/EventConsumerActorTests.cs @@ -26,8 +26,6 @@ namespace Squidex.Infrastructure.CQRS.Events.Actors { public bool IsStopped { get; set; } - public bool IsResetting { get; set; } - public string Name { get; set; } public string Error { get; set; } @@ -61,73 +59,104 @@ namespace Squidex.Infrastructure.CQRS.Events.Actors A.CallTo(() => formatter.Parse(eventData, true)).Returns(envelope); - sut = new EventConsumerActor(formatter, eventStore, eventConsumerInfoRepository, log); + sut = new EventConsumerActor(formatter, eventStore, eventConsumerInfoRepository, log) { ReconnectWaitMs = 0 }; sutActor = sut; sutSubscriber = sut; } [Fact] - public async Task Should_subscribe_to_event_store_when_started() + public async Task Should_not_not_subscribe_to_event_store_when_stopped_in_db() { - await SubscribeAsync(); + consumerInfo.IsStopped = true; + + await OnSubscribeAsync(); sut.Dispose(); - A.CallTo(() => eventConsumerInfoRepository.CreateAsync(consumerName)) - .MustHaveHappened(); + A.CallTo(() => eventStore.CreateSubscription(A.Ignored, A.Ignored, A.Ignored)) + .MustNotHaveHappened(); + } - A.CallTo(() => eventConsumerInfoRepository.StartAsync(consumerName)) - .MustHaveHappened(); + [Fact] + public async Task Should_subscribe_to_event_store_when_not_found_in_db() + { + A.CallTo(() => eventConsumerInfoRepository.FindAsync(consumerName)).Returns(Task.FromResult(null)); + + await OnSubscribeAsync(); + + sut.Dispose(); + + A.CallTo(() => eventConsumerInfoRepository.SetAsync(consumerName, null, false, null)) + .MustHaveHappened(Repeated.Exactly.Once); + + A.CallTo(() => eventStore.CreateSubscription(A.Ignored, A.Ignored, A.Ignored)) + .MustHaveHappened(Repeated.Exactly.Once); + } + + [Fact] + public async Task Should_subscribe_to_event_store_when_not_stopped_in_db() + { + await OnSubscribeAsync(); + + sut.Dispose(); + + A.CallTo(() => eventConsumerInfoRepository.SetAsync(consumerName, consumerInfo.Position, false, null)) + .MustHaveHappened(Repeated.Exactly.Once); + + A.CallTo(() => eventStore.CreateSubscription(A.Ignored, A.Ignored, A.Ignored)) + .MustHaveHappened(Repeated.Exactly.Once); } [Fact] public async Task Should_stop_subscription_when_stopped() { - await SubscribeAsync(); + await OnSubscribeAsync(); + sutActor.Tell(new StopConsumerMessage()); sutActor.Tell(new StopConsumerMessage()); sut.Dispose(); - A.CallTo(() => eventConsumerInfoRepository.CreateAsync(consumerName)) - .MustHaveHappened(); + A.CallTo(() => eventConsumerInfoRepository.SetAsync(consumerName, consumerInfo.Position, false, null)) + .MustHaveHappened(Repeated.Exactly.Once); - A.CallTo(() => eventConsumerInfoRepository.StartAsync(consumerName)) - .MustHaveHappened(); - - A.CallTo(() => eventConsumerInfoRepository.StopAsync(consumerName, null)) - .MustHaveHappened(); + A.CallTo(() => eventConsumerInfoRepository.SetAsync(consumerName, consumerInfo.Position, true, null)) + .MustHaveHappened(Repeated.Exactly.Once); A.CallTo(() => eventSubscription.StopAsync()) - .MustHaveHappened(); + .MustHaveHappened(Repeated.Exactly.Once); } [Fact] public async Task Should_reset_consumer_when_resetting() { - await SubscribeAsync(); + await OnSubscribeAsync(); + sutActor.Tell(new StopConsumerMessage()); sutActor.Tell(new ResetConsumerMessage()); sut.Dispose(); - A.CallTo(() => eventConsumerInfoRepository.CreateAsync(consumerName)) - .MustHaveHappened(); + A.CallTo(() => eventConsumerInfoRepository.SetAsync(consumerName, consumerInfo.Position, false, null)) + .MustHaveHappened(Repeated.Exactly.Once); - A.CallTo(() => eventConsumerInfoRepository.StartAsync(consumerName)) - .MustHaveHappened(Repeated.Exactly.Twice); + A.CallTo(() => eventConsumerInfoRepository.SetAsync(consumerName, consumerInfo.Position, true, null)) + .MustHaveHappened(Repeated.Exactly.Once); - A.CallTo(() => eventConsumerInfoRepository.SetPositionAsync(consumerName, null, true)) - .MustHaveHappened(); - - A.CallTo(() => eventConsumerInfoRepository.StopAsync(consumerName, null)) - .MustHaveHappened(); + A.CallTo(() => eventConsumerInfoRepository.SetAsync(consumerName, null, false, null)) + .MustHaveHappened(Repeated.Exactly.Once); A.CallTo(() => eventConsumer.ClearAsync()) - .MustHaveHappened(); + .MustHaveHappened(Repeated.Exactly.Once); A.CallTo(() => eventSubscription.StopAsync()) - .MustHaveHappened(); + .MustHaveHappened(Repeated.Exactly.Once); + + A.CallTo(() => eventStore.CreateSubscription(A.Ignored, A.Ignored, consumerInfo.Position)) + .MustHaveHappened(Repeated.Exactly.Once); + + A.CallTo(() => eventStore.CreateSubscription(A.Ignored, A.Ignored, null)) + .MustHaveHappened(Repeated.Exactly.Once); } [Fact] @@ -135,17 +164,19 @@ namespace Squidex.Infrastructure.CQRS.Events.Actors { var @event = new StoredEvent(Guid.NewGuid().ToString(), 123, eventData); - await SubscribeAsync(); - - await sutSubscriber.OnEventAsync(eventSubscription, @event); + await OnSubscribeAsync(); + await OnEventAsync(eventSubscription, @event); sut.Dispose(); - A.CallTo(() => eventConsumer.On(envelope)) - .MustHaveHappened(); + A.CallTo(() => eventConsumerInfoRepository.SetAsync(consumerName, consumerInfo.Position, false, null)) + .MustHaveHappened(Repeated.Exactly.Once); - A.CallTo(() => eventConsumerInfoRepository.SetPositionAsync(consumerName, @event.EventPosition, false)) - .MustHaveHappened(); + A.CallTo(() => eventConsumerInfoRepository.SetAsync(consumerName, @event.EventPosition, false, null)) + .MustHaveHappened(Repeated.Exactly.Once); + + A.CallTo(() => eventConsumer.On(envelope)) + .MustHaveHappened(Repeated.Exactly.Once); } [Fact] @@ -153,122 +184,180 @@ namespace Squidex.Infrastructure.CQRS.Events.Actors { var @event = new StoredEvent(Guid.NewGuid().ToString(), 123, eventData); - await SubscribeAsync(); - - await sutSubscriber.OnEventAsync(A.Fake(), @event); + await OnSubscribeAsync(); + await OnEventAsync(A.Fake(), @event); sut.Dispose(); + A.CallTo(() => eventConsumerInfoRepository.SetAsync(consumerName, consumerInfo.Position, false, null)) + .MustHaveHappened(Repeated.Exactly.Once); + + A.CallTo(() => eventConsumerInfoRepository.SetAsync(consumerName, @event.EventPosition, false, null)) + .MustNotHaveHappened(); + A.CallTo(() => eventConsumer.On(envelope)) .MustNotHaveHappened(); + } + + [Fact] + public async Task Should_reopen_subscription_when_exception_is_retrieved() + { + var ex = new InvalidOperationException(); + + await OnSubscribeAsync(); + await OnErrorAsync(eventSubscription, ex); + + await Task.Delay(200); + + await sut.WaitForCompletionAsync(); + + sut.Dispose(); + + A.CallTo(() => eventConsumerInfoRepository.SetAsync(consumerName, consumerInfo.Position, false, null)) + .MustHaveHappened(Repeated.Exactly.Times(3)); + + A.CallTo(() => eventConsumerInfoRepository.SetAsync(consumerName, consumerInfo.Position, false, ex.ToString())) + .MustNotHaveHappened(); + + A.CallTo(() => eventSubscription.StopAsync()) + .MustHaveHappened(Repeated.Exactly.Once); + + A.CallTo(() => eventStore.CreateSubscription(A.Ignored, A.Ignored, A.Ignored)) + .MustHaveHappened(Repeated.Exactly.Twice); + } + + [Fact] + public async Task Should_not_make_error_handling_when_exception_is_from_another_subscription() + { + var ex = new InvalidOperationException(); + + await OnSubscribeAsync(); + await OnErrorAsync(A.Fake(), ex); - A.CallTo(() => eventConsumerInfoRepository.SetPositionAsync(consumerName, @event.EventPosition, false)) + sut.Dispose(); + + A.CallTo(() => eventConsumerInfoRepository.SetAsync(consumerName, consumerInfo.Position, false, null)) + .MustHaveHappened(Repeated.Exactly.Once); + + A.CallTo(() => eventConsumerInfoRepository.SetAsync(consumerName, consumerInfo.Position, false, ex.ToString())) .MustNotHaveHappened(); } [Fact] public async Task Should_stop_if_resetting_failed() { - var exception = new InvalidOperationException("Exception"); + var ex = new InvalidOperationException(); A.CallTo(() => eventConsumer.ClearAsync()) - .Throws(exception); + .Throws(ex); var @event = new StoredEvent(Guid.NewGuid().ToString(), 123, eventData); - await SubscribeAsync(); + await OnSubscribeAsync(); sutActor.Tell(new ResetConsumerMessage()); sut.Dispose(); - A.CallTo(() => eventConsumerInfoRepository.StopAsync(consumerName, exception.ToString())) - .MustHaveHappened(); + A.CallTo(() => eventConsumerInfoRepository.SetAsync(consumerName, consumerInfo.Position, true, ex.ToString())) + .MustHaveHappened(Repeated.Exactly.Once); + + A.CallTo(() => eventSubscription.StopAsync()) + .MustHaveHappened(Repeated.Exactly.Once); } [Fact] public async Task Should_stop_if_handling_failed() { - var exception = new InvalidOperationException("Exception"); + var ex = new InvalidOperationException(); A.CallTo(() => eventConsumer.On(envelope)) - .Throws(exception); + .Throws(ex); var @event = new StoredEvent(Guid.NewGuid().ToString(), 123, eventData); - await SubscribeAsync(); - await sutSubscriber.OnEventAsync(eventSubscription, @event); + await OnSubscribeAsync(); + await OnEventAsync(eventSubscription, @event); sut.Dispose(); A.CallTo(() => eventConsumer.On(envelope)) .MustHaveHappened(); - A.CallTo(() => eventConsumerInfoRepository.SetPositionAsync(consumerName, @event.EventPosition, false)) - .MustNotHaveHappened(); + A.CallTo(() => eventConsumerInfoRepository.SetAsync(consumerName, consumerInfo.Position, true, ex.ToString())) + .MustHaveHappened(Repeated.Exactly.Once); - A.CallTo(() => eventConsumerInfoRepository.StopAsync(consumerName, exception.ToString())) - .MustHaveHappened(); + A.CallTo(() => eventSubscription.StopAsync()) + .MustHaveHappened(Repeated.Exactly.Once); } [Fact] - public async Task Should_start_after_stop_when_handling_failed() + public async Task Should_stop_if_deserialization_failed() { - var exception = new InvalidOperationException("Exception"); + var ex = new InvalidOperationException(); - A.CallTo(() => eventConsumer.On(envelope)) - .Throws(exception); + A.CallTo(() => formatter.Parse(eventData, true)) + .Throws(ex); var @event = new StoredEvent(Guid.NewGuid().ToString(), 123, eventData); - await SubscribeAsync(); - await sutSubscriber.OnEventAsync(eventSubscription, @event); + await OnSubscribeAsync(); + await OnEventAsync(eventSubscription, @event); - sutActor.Tell(new StartConsumerMessage()); sut.Dispose(); A.CallTo(() => eventConsumer.On(envelope)) - .MustHaveHappened(); - - A.CallTo(() => eventConsumerInfoRepository.SetPositionAsync(consumerName, @event.EventPosition, false)) .MustNotHaveHappened(); - A.CallTo(() => eventConsumerInfoRepository.StopAsync(consumerName, exception.ToString())) - .MustHaveHappened(); + A.CallTo(() => eventConsumerInfoRepository.SetAsync(consumerName, consumerInfo.Position, true, ex.ToString())) + .MustHaveHappened(Repeated.Exactly.Once); - A.CallTo(() => eventConsumerInfoRepository.StartAsync(consumerName)) - .MustHaveHappened(Repeated.Exactly.Twice); + A.CallTo(() => eventSubscription.StopAsync()) + .MustHaveHappened(Repeated.Exactly.Once); } [Fact] - public async Task Should_stop_if_deserialization_failed() + public async Task Should_start_after_stop_when_handling_failed() { - var exception = new InvalidOperationException("Exception"); + var exception = new InvalidOperationException(); - A.CallTo(() => formatter.Parse(eventData, true)) + A.CallTo(() => eventConsumer.On(envelope)) .Throws(exception); var @event = new StoredEvent(Guid.NewGuid().ToString(), 123, eventData); - await SubscribeAsync(); - await sutSubscriber.OnEventAsync(eventSubscription, @event); + await OnSubscribeAsync(); + await OnEventAsync(eventSubscription, @event); + sutActor.Tell(new StartConsumerMessage()); + sutActor.Tell(new StartConsumerMessage()); sut.Dispose(); A.CallTo(() => eventConsumer.On(envelope)) - .MustNotHaveHappened(); + .MustHaveHappened(); - A.CallTo(() => eventConsumerInfoRepository.SetPositionAsync(consumerName, @event.EventPosition, false)) - .MustNotHaveHappened(); + A.CallTo(() => eventConsumerInfoRepository.SetAsync(consumerName, consumerInfo.Position, true, exception.ToString())) + .MustHaveHappened(Repeated.Exactly.Once); - A.CallTo(() => eventConsumerInfoRepository.StopAsync(consumerName, exception.ToString())) - .MustHaveHappened(); + A.CallTo(() => eventSubscription.StopAsync()) + .MustHaveHappened(Repeated.Exactly.Once); + + A.CallTo(() => eventStore.CreateSubscription(A.Ignored, A.Ignored, A.Ignored)) + .MustHaveHappened(Repeated.Exactly.Twice); } - private async Task SubscribeAsync() + private Task OnErrorAsync(IEventSubscription subscriber, Exception ex) { - await sut.SubscribeAsync(eventConsumer); + return sutSubscriber.OnErrorAsync(subscriber, ex); + } - await Task.Delay(200); + private Task OnEventAsync(IEventSubscription subscriber, StoredEvent ev) + { + return sutSubscriber.OnEventAsync(subscriber, ev); + } + + private Task OnSubscribeAsync() + { + return sut.SubscribeAsync(eventConsumer); } } } \ No newline at end of file diff --git a/tests/Squidex.Infrastructure.Tests/CQRS/Events/PollingSubscriptionTests.cs b/tests/Squidex.Infrastructure.Tests/CQRS/Events/PollingSubscriptionTests.cs new file mode 100644 index 000000000..5056ccd0f --- /dev/null +++ b/tests/Squidex.Infrastructure.Tests/CQRS/Events/PollingSubscriptionTests.cs @@ -0,0 +1,116 @@ +// ========================================================================== +// PollingSubscriptionTests.cs +// Squidex Headless CMS +// ========================================================================== +// Copyright (c) Squidex Group +// All rights reserved. +// ========================================================================== + +using System; +using System.Threading; +using System.Threading.Tasks; +using FakeItEasy; +using Xunit; + +namespace Squidex.Infrastructure.CQRS.Events +{ + public class PollingSubscriptionTests + { + private readonly IEventStore eventStore = A.Fake(); + private readonly IEventNotifier eventNotifier = new DefaultEventNotifier(new InMemoryPubSub()); + private readonly IEventSubscriber eventSubscriber = A.Fake(); + private readonly string position = Guid.NewGuid().ToString(); + + [Fact] + public async Task Should_subscribe_on_start() + { + var sut = new PollingSubscription(eventStore, eventNotifier, eventSubscriber, "^my-stream", position); + + await WaitAndStopAsync(sut); + + A.CallTo(() => eventStore.GetEventsAsync(A>.Ignored, A.Ignored, "^my-stream", position)) + .MustHaveHappened(Repeated.Exactly.Once); + } + + [Fact] + public async Task Should_propagate_exception_to_subscriber() + { + var ex = new InvalidOperationException(); + + A.CallTo(() => eventStore.GetEventsAsync(A>.Ignored, A.Ignored, "^my-stream", position)) + .Throws(ex); + + var sut = new PollingSubscription(eventStore, eventNotifier, eventSubscriber, "^my-stream", position); + + await WaitAndStopAsync(sut); + + A.CallTo(() => eventSubscriber.OnErrorAsync(sut, ex)) + .MustHaveHappened(); + } + + [Fact] + public async Task Should_propagate_operation_cancelled_exception_to_subscriber() + { + var ex = new OperationCanceledException(); + + A.CallTo(() => eventStore.GetEventsAsync(A>.Ignored, A.Ignored, "^my-stream", position)) + .Throws(ex); + + var sut = new PollingSubscription(eventStore, eventNotifier, eventSubscriber, "^my-stream", position); + + await WaitAndStopAsync(sut); + + A.CallTo(() => eventSubscriber.OnErrorAsync(sut, ex)) + .MustNotHaveHappened(); + } + + [Fact] + public async Task Should_propagate_aggregate_operation_cancelled_exception_to_subscriber() + { + var ex = new AggregateException(new OperationCanceledException()); + + A.CallTo(() => eventStore.GetEventsAsync(A>.Ignored, A.Ignored, "^my-stream", position)) + .Throws(ex); + + var sut = new PollingSubscription(eventStore, eventNotifier, eventSubscriber, "^my-stream", position); + + await WaitAndStopAsync(sut); + + A.CallTo(() => eventSubscriber.OnErrorAsync(sut, ex)) + .MustNotHaveHappened(); + } + + [Fact] + public async Task Should_not_subscribe_on_notify_when_stream_matches() + { + var sut = new PollingSubscription(eventStore, eventNotifier, eventSubscriber, "^my-stream", position); + + eventNotifier.NotifyEventsStored("other-stream-123"); + + await WaitAndStopAsync(sut); + + A.CallTo(() => eventStore.GetEventsAsync(A>.Ignored, A.Ignored, "^my-stream", position)) + .MustHaveHappened(Repeated.Exactly.Once); + } + + [Fact] + public async Task Should_subscribe_on_notify_when_stream_matches() + { + var sut = new PollingSubscription(eventStore, eventNotifier, eventSubscriber, "^my-stream", position); + + eventNotifier.NotifyEventsStored("my-stream-123"); + + await WaitAndStopAsync(sut); + + A.CallTo(() => eventStore.GetEventsAsync(A>.Ignored, A.Ignored, "^my-stream", position)) + .MustHaveHappened(Repeated.Exactly.Twice); + } + + private async Task WaitAndStopAsync(PollingSubscription sut) + { + await Task.Delay(200); + + await sut.StopAsync(); + } + } +} diff --git a/tests/Squidex.Infrastructure.Tests/GravatarHelperTests.cs b/tests/Squidex.Infrastructure.Tests/GravatarHelperTests.cs index 09865302a..d31c115bc 100644 --- a/tests/Squidex.Infrastructure.Tests/GravatarHelperTests.cs +++ b/tests/Squidex.Infrastructure.Tests/GravatarHelperTests.cs @@ -20,7 +20,7 @@ namespace Squidex.Infrastructure { var url = GravatarHelper.CreatePictureUrl(email); - Assert.Equal(url, "https://www.gravatar.com/avatar/0bc83cb571cd1c50ba6f3e8a78ef1346"); + Assert.Equal("https://www.gravatar.com/avatar/0bc83cb571cd1c50ba6f3e8a78ef1346", url); } [Theory] @@ -31,7 +31,7 @@ namespace Squidex.Infrastructure { var url = GravatarHelper.CreateProfileUrl(email); - Assert.Equal(url, "https://www.gravatar.com/0bc83cb571cd1c50ba6f3e8a78ef1346"); + Assert.Equal("https://www.gravatar.com/0bc83cb571cd1c50ba6f3e8a78ef1346", url); } } } \ No newline at end of file diff --git a/tests/Squidex.Infrastructure.Tests/Log/SemanticLogTests.cs b/tests/Squidex.Infrastructure.Tests/Log/SemanticLogTests.cs index 2f7ae0128..b91e7deef 100644 --- a/tests/Squidex.Infrastructure.Tests/Log/SemanticLogTests.cs +++ b/tests/Squidex.Infrastructure.Tests/Log/SemanticLogTests.cs @@ -240,7 +240,7 @@ namespace Squidex.Infrastructure.Log .WriteProperty("message", "My Message") .WriteProperty("elapsedMs", 0)); - Assert.True(output.StartsWith(expected.Substring(0, 55), StringComparison.Ordinal)); + Assert.StartsWith(expected.Substring(0, 55), output, StringComparison.Ordinal); } [Fact] @@ -254,7 +254,7 @@ namespace Squidex.Infrastructure.Log .WriteProperty("message", "My Message") .WriteProperty("elapsedMs", 0)); - Assert.True(output.StartsWith(expected.Substring(0, 55), StringComparison.Ordinal)); + Assert.StartsWith(expected.Substring(0, 55), output, StringComparison.Ordinal); } [Fact] @@ -268,7 +268,7 @@ namespace Squidex.Infrastructure.Log .WriteProperty("message", "My Message") .WriteProperty("elapsedMs", 0)); - Assert.True(output.StartsWith(expected.Substring(0, 55), StringComparison.Ordinal)); + Assert.StartsWith(expected.Substring(0, 55), output, StringComparison.Ordinal); } [Fact] diff --git a/tests/Squidex.Infrastructure.Tests/Reflection/SimpleMapperTests.cs b/tests/Squidex.Infrastructure.Tests/Reflection/SimpleMapperTests.cs index cadc4bd54..47b5e7029 100644 --- a/tests/Squidex.Infrastructure.Tests/Reflection/SimpleMapperTests.cs +++ b/tests/Squidex.Infrastructure.Tests/Reflection/SimpleMapperTests.cs @@ -107,9 +107,11 @@ namespace Squidex.Infrastructure.Reflection Assert.Equal(class1.MappedString, class2.MappedString); Assert.Equal(class1.MappedNumber, class2.MappedNumber); Assert.Equal(class1.MappedGuid.ToString(), class2.MappedGuid); - Assert.Equal(class1.WrongType1, 0L); - Assert.Equal(class1.WrongType2, 0L); + Assert.NotEqual(class1.UnmappedString, class2.UnmappedString); + + Assert.Equal(0L, class1.WrongType1); + Assert.Equal(0L, class1.WrongType2); } } } diff --git a/tests/Squidex.Infrastructure.Tests/RetryWindowTests.cs b/tests/Squidex.Infrastructure.Tests/RetryWindowTests.cs new file mode 100644 index 000000000..c3361f71e --- /dev/null +++ b/tests/Squidex.Infrastructure.Tests/RetryWindowTests.cs @@ -0,0 +1,90 @@ +// ========================================================================== +// RetryWindowTests.cs +// Squidex Headless CMS +// ========================================================================== +// Copyright (c) Squidex Group +// All rights reserved. +// ========================================================================== + +using System; +using Xunit; + +namespace Squidex.Infrastructure +{ + public class RetryWindowTests + { + private const int WindowSize = 5; + + [Fact] + public void Should_allow_to_retry_after_reset() + { + var sut = new RetryWindow(TimeSpan.FromSeconds(1), WindowSize); + + for (var i = 0; i < WindowSize * 2; i++) + { + sut.CanRetryAfterFailure(); + } + + sut.Reset(); + + Assert.True(sut.CanRetryAfterFailure()); + } + + [Theory] + [InlineData(6)] + [InlineData(7)] + public void Should_not_allow_to_retry_after_many_errors(int errors) + { + var sut = new RetryWindow(TimeSpan.FromSeconds(1), WindowSize); + var now = DateTime.UtcNow; + + for (var i = 0; i < WindowSize; i++) + { + Assert.True(sut.CanRetryAfterFailure(now)); + } + + var remaining = errors - WindowSize; + + for (var i = 0; i < remaining; i++) + { + Assert.False(sut.CanRetryAfterFailure(now)); + } + } + + [Theory] + [InlineData(1)] + [InlineData(2)] + [InlineData(3)] + [InlineData(4)] + public void Should_allow_to_retry_after_few_errors(int errors) + { + var sut = new RetryWindow(TimeSpan.FromSeconds(1), WindowSize); + var now = DateTime.UtcNow; + + for (var i = 0; i < errors; i++) + { + Assert.True(sut.CanRetryAfterFailure(now)); + } + } + + [Theory] + [InlineData(1)] + [InlineData(2)] + [InlineData(3)] + [InlineData(4)] + [InlineData(5)] + [InlineData(6)] + [InlineData(7)] + [InlineData(8)] + public void Should_allow_to_retry_after_few_errors_in_window(int errors) + { + var sut = new RetryWindow(TimeSpan.FromSeconds(1), WindowSize); + var now = DateTime.UtcNow; + + for (var i = 0; i < errors; i++) + { + Assert.True(sut.CanRetryAfterFailure(now.AddMilliseconds(i * 300))); + } + } + } +} diff --git a/tests/Squidex.Infrastructure.Tests/Timers/CompletionTimerTests.cs b/tests/Squidex.Infrastructure.Tests/Timers/CompletionTimerTests.cs index 8893b1b44..c98d8feef 100644 --- a/tests/Squidex.Infrastructure.Tests/Timers/CompletionTimerTests.cs +++ b/tests/Squidex.Infrastructure.Tests/Timers/CompletionTimerTests.cs @@ -31,22 +31,5 @@ namespace Squidex.Infrastructure.Timers Assert.True(called); } - - public void Should_invoke_dispose_within_timer() - { - CompletionTimer timer = null; - - timer = new CompletionTimer(10, ct => - { - timer?.StopAsync().Wait(); - - return TaskHelper.Done; - }, 10); - - Thread.Sleep(1000); - - timer.SkipCurrentDelay(); - timer.StopAsync().Wait(); - } } } diff --git a/tests/Squidex.Infrastructure.Tests/UsageTracking/BackgroundUsageTrackerTests.cs b/tests/Squidex.Infrastructure.Tests/UsageTracking/BackgroundUsageTrackerTests.cs index 1e2bef9d5..6259fd841 100644 --- a/tests/Squidex.Infrastructure.Tests/UsageTracking/BackgroundUsageTrackerTests.cs +++ b/tests/Squidex.Infrastructure.Tests/UsageTracking/BackgroundUsageTrackerTests.cs @@ -112,8 +112,7 @@ namespace Squidex.Infrastructure.UsageTracking await sut.TrackAsync("key1", 0, 1000); sut.Next(); - - await Task.Delay(100); + sut.Dispose(); A.CallTo(() => usageStore.TrackUsagesAsync(A.Ignored, A.Ignored, A.Ignored, A.Ignored)).MustNotHaveHappened(); }