diff --git a/src/Squidex.Domain.Apps.Read/Rules/Orleans/Grains/IRuleDequeuerGrain.cs b/src/Squidex.Domain.Apps.Read/Rules/Orleans/Grains/IRuleDequeuerGrain.cs deleted file mode 100644 index eed563b25..000000000 --- a/src/Squidex.Domain.Apps.Read/Rules/Orleans/Grains/IRuleDequeuerGrain.cs +++ /dev/null @@ -1,21 +0,0 @@ -// ========================================================================== -// IRuleDequeuerGrain.cs -// Squidex Headless CMS -// ========================================================================== -// Copyright (c) Squidex Group -// All rights reserved. -// ========================================================================== - -using System.Threading.Tasks; -using Orleans; -using Orleans.Concurrency; - -namespace Squidex.Domain.Apps.Read.Rules.Orleans.Grains -{ - public interface IRuleDequeuerGrain : IGrainWithStringKey - { - Task ActivateAsync(); - - Task HandleAsync(Immutable @event); - } -} diff --git a/src/Squidex.Domain.Apps.Read/Rules/Orleans/RuleDequeuerBootstrap.cs b/src/Squidex.Domain.Apps.Read/Rules/Orleans/RuleDequeuerBootstrap.cs deleted file mode 100644 index 636e8acf9..000000000 --- a/src/Squidex.Domain.Apps.Read/Rules/Orleans/RuleDequeuerBootstrap.cs +++ /dev/null @@ -1,32 +0,0 @@ -// ========================================================================== -// RuleDequeuerBootstrap.cs -// Squidex Headless CMS -// ========================================================================== -// Copyright (c) Squidex Group -// All rights reserved. -// ========================================================================== - -using System.Threading.Tasks; -using Orleans.Providers; -using Squidex.Domain.Apps.Read.Rules.Orleans.Grains; -using Squidex.Infrastructure.Tasks; - -namespace Squidex.Domain.Apps.Read.Rules.Orleans -{ - public sealed class RuleDequeuerBootstrap : IBootstrapProvider - { - public string Name { get; private set; } - - public Task Init(string name, IProviderRuntime providerRuntime, IProviderConfiguration config) - { - var grain = providerRuntime.GrainFactory.GetGrain("Default"); - - return grain.ActivateAsync(); - } - - public Task Close() - { - return TaskHelper.Done; - } - } -} diff --git a/src/Squidex.Domain.Apps.Read/Rules/Orleans/Grains/Implementation/RuleDequeuerGrain.cs b/src/Squidex.Domain.Apps.Read/Rules/RuleDequeuer.cs similarity index 57% rename from src/Squidex.Domain.Apps.Read/Rules/Orleans/Grains/Implementation/RuleDequeuerGrain.cs rename to src/Squidex.Domain.Apps.Read/Rules/RuleDequeuer.cs index 2880b1c42..b738dfeef 100644 --- a/src/Squidex.Domain.Apps.Read/Rules/Orleans/Grains/Implementation/RuleDequeuerGrain.cs +++ b/src/Squidex.Domain.Apps.Read/Rules/RuleDequeuer.cs @@ -1,5 +1,5 @@ // ========================================================================== -// RuleDequeuerGrain.cs +// RuleDequeuer.cs // Squidex Headless CMS // ========================================================================== // Copyright (c) Squidex Group @@ -7,41 +7,31 @@ // ========================================================================== using System; -using System.Collections.Generic; +using System.Collections.Concurrent; +using System.Threading; using System.Threading.Tasks; +using System.Threading.Tasks.Dataflow; using NodaTime; -using Orleans; -using Orleans.Concurrency; -using Orleans.Core; -using Orleans.Runtime; using Squidex.Domain.Apps.Core.HandleRules; using Squidex.Domain.Apps.Core.Rules; using Squidex.Domain.Apps.Read.Rules.Repositories; using Squidex.Infrastructure; using Squidex.Infrastructure.Log; -using Squidex.Infrastructure.Tasks; +using Squidex.Infrastructure.Timers; -namespace Squidex.Domain.Apps.Read.Rules.Orleans.Grains.Implementation +namespace Squidex.Domain.Apps.Read.Rules { - [Reentrant] - public class RuleDequeuerGrain : Grain, IRuleDequeuerGrain, IRemindable + public sealed class RuleDequeuer : DisposableObjectBase, IExternalSystem { + private readonly ActionBlock requestBlock; private readonly IRuleEventRepository ruleEventRepository; private readonly RuleService ruleService; + private readonly CompletionTimer timer; + private readonly ConcurrentDictionary executing = new ConcurrentDictionary(); private readonly IClock clock; private readonly ISemanticLog log; - private readonly HashSet executing = new HashSet(); - private TaskFactory scheduler; - public RuleDequeuerGrain(RuleService ruleService, IRuleEventRepository ruleEventRepository, ISemanticLog log, IClock clock) - : this(ruleService, ruleEventRepository, log, clock, null, null) - { - } - - protected RuleDequeuerGrain(RuleService ruleService, IRuleEventRepository ruleEventRepository, ISemanticLog log, IClock clock, - IGrainIdentity identity, - IGrainRuntime runtime) - : base(identity, runtime) + public RuleDequeuer(RuleService ruleService, IRuleEventRepository ruleEventRepository, ISemanticLog log, IClock clock) { Guard.NotNull(ruleEventRepository, nameof(ruleEventRepository)); Guard.NotNull(ruleService, nameof(ruleService)); @@ -54,42 +44,41 @@ namespace Squidex.Domain.Apps.Read.Rules.Orleans.Grains.Implementation this.clock = clock; this.log = log; - } - public override Task OnActivateAsync() - { - scheduler = new TaskFactory(TaskScheduler.Current ?? TaskScheduler.Default); + requestBlock = + new ActionBlock(HandleAsync, + new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 32, BoundedCapacity = 32 }); - DelayDeactivation(TimeSpan.FromDays(1)); + timer = new CompletionTimer(5000, QueryAsync); + } - RegisterOrUpdateReminder("Default", TimeSpan.Zero, TimeSpan.FromMinutes(10)); - RegisterTimer(x => QueryAsync(), null, TimeSpan.Zero, TimeSpan.FromSeconds(2)); + protected override void DisposeObject(bool disposing) + { + if (disposing) + { + timer.StopAsync().Wait(); - return base.OnActivateAsync(); + requestBlock.Complete(); + requestBlock.Completion.Wait(); + } } - public Task ReceiveReminder(string reminderName, TickStatus status) + public void Connect() { - return TaskHelper.Done; } - public Task ActivateAsync() + public void Next() { - return TaskHelper.Done; + timer.SkipCurrentDelay(); } - public async Task QueryAsync() + private async Task QueryAsync(CancellationToken cancellationToken) { try { - var self = GetSelf(); - - await ruleEventRepository.QueryPendingAsync(clock.GetCurrentInstant(), x => - { - scheduler.StartNew(() => self.HandleAsync(x.AsImmutable()).Forget()).Forget(); + var now = clock.GetCurrentInstant(); - return TaskHelper.Done; - }); + await ruleEventRepository.QueryPendingAsync(now, requestBlock.SendAsync, cancellationToken); } catch (Exception ex) { @@ -99,23 +88,23 @@ namespace Squidex.Domain.Apps.Read.Rules.Orleans.Grains.Implementation } } - public async Task HandleAsync(Immutable @event) + public async Task HandleAsync(IRuleEventEntity @event) { - if (!executing.Add(@event.Value.Id)) + if (!executing.TryAdd(@event.Id, false)) { return; } try { - var job = @event.Value.Job; + var job = @event.Job; var response = await ruleService.InvokeAsync(job.ActionName, job.ActionData); var jobInvoke = ComputeJobInvoke(response.Result, @event, job); var jobResult = ComputeJobResult(response.Result, jobInvoke); - await ruleEventRepository.MarkSentAsync(@event.Value.Id, response.Dump, response.Result, jobResult, response.Elapsed, jobInvoke); + await ruleEventRepository.MarkSentAsync(@event.Id, response.Dump, response.Result, jobResult, response.Elapsed, jobInvoke); } catch (Exception ex) { @@ -125,7 +114,7 @@ namespace Squidex.Domain.Apps.Read.Rules.Orleans.Grains.Implementation } finally { - executing.Remove(@event.Value.Id); + executing.TryRemove(@event.Id, out var value); } } @@ -145,11 +134,11 @@ namespace Squidex.Domain.Apps.Read.Rules.Orleans.Grains.Implementation } } - private static Instant? ComputeJobInvoke(RuleResult result, Immutable @event, RuleJob job) + private static Instant? ComputeJobInvoke(RuleResult result, IRuleEventEntity @event, RuleJob job) { if (result != RuleResult.Success) { - switch (@event.Value.NumCalls) + switch (@event.NumCalls) { case 0: return job.Created.Plus(Duration.FromMinutes(5)); @@ -164,10 +153,5 @@ namespace Squidex.Domain.Apps.Read.Rules.Orleans.Grains.Implementation return null; } - - protected virtual IRuleDequeuerGrain GetSelf() - { - return this.AsReference(); - } } } diff --git a/src/Squidex.Domain.Apps.Read/Squidex.Domain.Apps.Read.csproj b/src/Squidex.Domain.Apps.Read/Squidex.Domain.Apps.Read.csproj index 520176ee4..58922eba3 100644 --- a/src/Squidex.Domain.Apps.Read/Squidex.Domain.Apps.Read.csproj +++ b/src/Squidex.Domain.Apps.Read/Squidex.Domain.Apps.Read.csproj @@ -15,9 +15,6 @@ - - - diff --git a/src/Squidex.Domain.Apps.Read/State/AppProvider.cs b/src/Squidex.Domain.Apps.Read/State/AppProvider.cs new file mode 100644 index 000000000..935d0708e --- /dev/null +++ b/src/Squidex.Domain.Apps.Read/State/AppProvider.cs @@ -0,0 +1,89 @@ +// ========================================================================== +// AppProvider.cs +// Squidex Headless CMS +// ========================================================================== +// Copyright (c) Squidex Group +// All rights reserved. +// ========================================================================== + +using System; +using System.Collections.Generic; +using System.Linq; +using System.Threading.Tasks; +using Squidex.Domain.Apps.Read.Apps; +using Squidex.Domain.Apps.Read.Rules; +using Squidex.Domain.Apps.Read.Schemas; +using Squidex.Domain.Apps.Read.State.Orleans.Grains; +using Squidex.Infrastructure; +using Squidex.Infrastructure.States; + +namespace Squidex.Domain.Apps.Read.State.Orleans +{ + public sealed class AppProvider : IAppProvider + { + private readonly IStateFactory factory; + + public AppProvider(IStateFactory factory) + { + Guard.NotNull(factory, nameof(factory)); + + this.factory = factory; + } + + public async Task GetAppAsync(string appName) + { + var app = await factory.GetAsync(appName); + + return await app.GetAppAsync(); + } + + public async Task<(IAppEntity, ISchemaEntity)> GetAppWithSchemaAsync(string appName, Guid id) + { + var app = await factory.GetAsync(appName); + + return await app.GetAppWithSchemaAsync(id); + } + + public async Task> GetRulesAsync(string appName) + { + var app = await factory.GetAsync(appName); + + return await app.GetRulesAsync(); + } + + public async Task GetSchemaAsync(string appName, Guid id, bool provideDeleted = false) + { + var app = await factory.GetAsync(appName); + + return await app.GetSchemaAsync(id, provideDeleted); + } + + public async Task GetSchemaAsync(string appName, string name, bool provideDeleted = false) + { + var app = await factory.GetAsync(appName); + + return await app.GetSchemaAsync(name, provideDeleted); + } + + public async Task> GetSchemasAsync(string appName) + { + var app = await factory.GetAsync(appName); + + return await app.GetSchemasAsync(); + } + + public async Task> GetUserApps(string userId) + { + var appUser = await factory.GetAsync(userId); + var appNames = await appUser.GetAppNamesAsync(); + + var tasks = + appNames + .Select(x => GetAppAsync(x)); + + var apps = await Task.WhenAll(tasks); + + return apps.Where(a => a != null).ToList(); + } + } +} diff --git a/src/Squidex.Domain.Apps.Read/State/Orleans/AppStateEventConsumer.cs b/src/Squidex.Domain.Apps.Read/State/AppStateEventConsumer.cs similarity index 75% rename from src/Squidex.Domain.Apps.Read/State/Orleans/AppStateEventConsumer.cs rename to src/Squidex.Domain.Apps.Read/State/AppStateEventConsumer.cs index 84b485b1c..48e4c1678 100644 --- a/src/Squidex.Domain.Apps.Read/State/Orleans/AppStateEventConsumer.cs +++ b/src/Squidex.Domain.Apps.Read/State/AppStateEventConsumer.cs @@ -7,20 +7,19 @@ // ========================================================================== using System.Threading.Tasks; -using Orleans; using Squidex.Domain.Apps.Events; using Squidex.Domain.Apps.Events.Apps; using Squidex.Domain.Apps.Read.State.Orleans.Grains; using Squidex.Infrastructure; using Squidex.Infrastructure.CQRS.Events; -using Squidex.Infrastructure.Json.Orleans; +using Squidex.Infrastructure.States; using Squidex.Infrastructure.Tasks; namespace Squidex.Domain.Apps.Read.State.Orleans { public sealed class AppStateEventConsumer : IEventConsumer { - private readonly IGrainFactory factory; + private readonly IStateFactory factory; public string Name { @@ -32,7 +31,7 @@ namespace Squidex.Domain.Apps.Read.State.Orleans get { return @"(^app-)|(^schema-)|(^rule\-)"; } } - public AppStateEventConsumer(IGrainFactory factory) + public AppStateEventConsumer(IStateFactory factory) { Guard.NotNull(factory, nameof(factory)); @@ -48,21 +47,21 @@ namespace Squidex.Domain.Apps.Read.State.Orleans { if (@event.Payload is AppEvent appEvent) { - var appGrain = factory.GetGrain(appEvent.AppId.Name); + var appGrain = await factory.GetAsync(appEvent.AppId.Name); - await appGrain.HandleAsync(@event.AsJ()); + await appGrain.HandleAsync(@event); } if (@event.Payload is AppContributorAssigned contributorAssigned) { - var userGrain = factory.GetGrain(contributorAssigned.ContributorId); + var userGrain = await factory.GetAsync(contributorAssigned.ContributorId); await userGrain.AddAppAsync(contributorAssigned.AppId.Name); } if (@event.Payload is AppContributorRemoved contributorRemoved) { - var userGrain = factory.GetGrain(contributorRemoved.ContributorId); + var userGrain = await factory.GetAsync(contributorRemoved.ContributorId); await userGrain.RemoveAppAsync(contributorRemoved.AppId.Name); } diff --git a/src/Squidex.Domain.Apps.Read/State/Grains/AppStateGrain.cs b/src/Squidex.Domain.Apps.Read/State/Grains/AppStateGrain.cs new file mode 100644 index 000000000..64bd1b54f --- /dev/null +++ b/src/Squidex.Domain.Apps.Read/State/Grains/AppStateGrain.cs @@ -0,0 +1,135 @@ +// ========================================================================== +// AppStateGrain.cs +// Squidex Headless CMS +// ========================================================================== +// Copyright (c) Squidex Group +// All rights reserved. +// ========================================================================== + +using System; +using System.Collections.Generic; +using System.Threading.Tasks; +using Squidex.Domain.Apps.Core.Schemas; +using Squidex.Domain.Apps.Events.Apps; +using Squidex.Domain.Apps.Read.Apps; +using Squidex.Domain.Apps.Read.Rules; +using Squidex.Domain.Apps.Read.Schemas; +using Squidex.Infrastructure; +using Squidex.Infrastructure.CQRS.Events; +using Squidex.Infrastructure.States; +using Squidex.Infrastructure.Tasks; + +namespace Squidex.Domain.Apps.Read.State.Orleans.Grains +{ + public class AppStateGrain : StatefulObject + { + private readonly FieldRegistry fieldRegistry; + private readonly SingleThreadedDispatcher dispatcher = new SingleThreadedDispatcher(); + private Exception exception; + + public AppStateGrain(FieldRegistry fieldRegistry) + { + Guard.NotNull(fieldRegistry, nameof(fieldRegistry)); + + this.fieldRegistry = fieldRegistry; + } + + public override async Task ReadStateAsync() + { + try + { + await base.ReadStateAsync(); + } + catch (Exception ex) + { + exception = ex; + + State = new AppStateGrainState(); + } + + State.SetRegistry(fieldRegistry); + } + + public virtual Task<(IAppEntity, ISchemaEntity)> GetAppWithSchemaAsync(Guid id) + { + return dispatcher.DispatchAndUnwrapAsync(() => + { + var schema = State.FindSchema(x => x.Id == id && !x.IsDeleted); + + return Task.FromResult((State.GetApp(), schema)); + }); + } + + public virtual Task GetAppAsync() + { + return dispatcher.DispatchAndUnwrapAsync(() => + { + var value = State.GetApp(); + + return Task.FromResult(value); + }); + } + + public virtual Task> GetRulesAsync() + { + return dispatcher.DispatchAndUnwrapAsync(() => + { + var value = State.FindRules(); + + return Task.FromResult(value); + }); + } + + public virtual Task> GetSchemasAsync() + { + return dispatcher.DispatchAndUnwrapAsync(() => + { + var value = State.FindSchemas(x => !x.IsDeleted); + + return Task.FromResult(value); + }); + } + + public virtual Task GetSchemaAsync(Guid id, bool provideDeleted = false) + { + return dispatcher.DispatchAndUnwrapAsync(() => + { + var value = State.FindSchema(x => x.Id == id && (!x.IsDeleted || provideDeleted)); + + return Task.FromResult(value); + }); + } + + public virtual Task GetSchemaAsync(string name, bool provideDeleted = false) + { + return dispatcher.DispatchAndUnwrapAsync(() => + { + var value = State.FindSchema(x => string.Equals(x.Name, name, StringComparison.OrdinalIgnoreCase) && (!x.IsDeleted || provideDeleted)); + + return Task.FromResult(value); + }); + } + + public virtual Task HandleAsync(Envelope message) + { + return dispatcher.DispatchAndUnwrapAsync(() => + { + if (exception != null) + { + if (message.Payload is AppCreated) + { + exception = null; + } + else + { + throw exception; + } + } + + State.Apply(message); + + return WriteStateAsync(); + }); + } + } +} diff --git a/src/Squidex.Domain.Apps.Read/State/Orleans/Grains/Implementations/AppStateGrainState.cs b/src/Squidex.Domain.Apps.Read/State/Grains/AppStateGrainState.cs similarity index 88% rename from src/Squidex.Domain.Apps.Read/State/Orleans/Grains/Implementations/AppStateGrainState.cs rename to src/Squidex.Domain.Apps.Read/State/Grains/AppStateGrainState.cs index cc2ddf84d..3d777f420 100644 --- a/src/Squidex.Domain.Apps.Read/State/Orleans/Grains/Implementations/AppStateGrainState.cs +++ b/src/Squidex.Domain.Apps.Read/State/Grains/AppStateGrainState.cs @@ -17,7 +17,7 @@ using Squidex.Domain.Apps.Read.Schemas; using Squidex.Infrastructure.CQRS.Events; using Squidex.Infrastructure.Dispatching; -namespace Squidex.Domain.Apps.Read.State.Orleans.Grains.Implementations +namespace Squidex.Domain.Apps.Read.State.Orleans.Grains { public sealed partial class AppStateGrainState { @@ -44,17 +44,17 @@ namespace Squidex.Domain.Apps.Read.State.Orleans.Grains.Implementations public ISchemaEntity FindSchema(Func filter) { - return Schemas.Values?.FirstOrDefault(filter); + return Schemas?.Values.FirstOrDefault(filter); } - public List FindRules() + public List FindSchemas(Func filter) { - return Rules.Values?.OfType().ToList() ?? new List(); + return Schemas?.Values.Where(filter).OfType().ToList() ?? new List(); } - public List FindSchemas(Func filter) + public List FindRules() { - return Schemas.Values?.Where(filter).OfType().ToList() ?? new List(); + return Rules?.Values.OfType().ToList() ?? new List(); } public void Reset() diff --git a/src/Squidex.Domain.Apps.Read/State/Orleans/Grains/Implementations/AppStateGrainState_Apps.cs b/src/Squidex.Domain.Apps.Read/State/Grains/AppStateGrainState_Apps.cs similarity index 97% rename from src/Squidex.Domain.Apps.Read/State/Orleans/Grains/Implementations/AppStateGrainState_Apps.cs rename to src/Squidex.Domain.Apps.Read/State/Grains/AppStateGrainState_Apps.cs index 7576c904d..701f85cb6 100644 --- a/src/Squidex.Domain.Apps.Read/State/Orleans/Grains/Implementations/AppStateGrainState_Apps.cs +++ b/src/Squidex.Domain.Apps.Read/State/Grains/AppStateGrainState_Apps.cs @@ -15,7 +15,7 @@ using Squidex.Infrastructure; using Squidex.Infrastructure.CQRS.Events; using Squidex.Infrastructure.Reflection; -namespace Squidex.Domain.Apps.Read.State.Orleans.Grains.Implementations +namespace Squidex.Domain.Apps.Read.State.Orleans.Grains { public sealed partial class AppStateGrainState { diff --git a/src/Squidex.Domain.Apps.Read/State/Orleans/Grains/Implementations/AppStateGrainState_Rules.cs b/src/Squidex.Domain.Apps.Read/State/Grains/AppStateGrainState_Rules.cs similarity index 96% rename from src/Squidex.Domain.Apps.Read/State/Orleans/Grains/Implementations/AppStateGrainState_Rules.cs rename to src/Squidex.Domain.Apps.Read/State/Grains/AppStateGrainState_Rules.cs index 828676281..53f0e9a82 100644 --- a/src/Squidex.Domain.Apps.Read/State/Orleans/Grains/Implementations/AppStateGrainState_Rules.cs +++ b/src/Squidex.Domain.Apps.Read/State/Grains/AppStateGrainState_Rules.cs @@ -11,7 +11,7 @@ using Squidex.Domain.Apps.Events.Rules; using Squidex.Domain.Apps.Events.Rules.Utils; using Squidex.Infrastructure.CQRS.Events; -namespace Squidex.Domain.Apps.Read.State.Orleans.Grains.Implementations +namespace Squidex.Domain.Apps.Read.State.Orleans.Grains { public sealed partial class AppStateGrainState { diff --git a/src/Squidex.Domain.Apps.Read/State/Orleans/Grains/Implementations/AppStateGrainState_Schemas.cs b/src/Squidex.Domain.Apps.Read/State/Grains/AppStateGrainState_Schemas.cs similarity index 98% rename from src/Squidex.Domain.Apps.Read/State/Orleans/Grains/Implementations/AppStateGrainState_Schemas.cs rename to src/Squidex.Domain.Apps.Read/State/Grains/AppStateGrainState_Schemas.cs index 1cc9028c8..dfb772b9d 100644 --- a/src/Squidex.Domain.Apps.Read/State/Orleans/Grains/Implementations/AppStateGrainState_Schemas.cs +++ b/src/Squidex.Domain.Apps.Read/State/Grains/AppStateGrainState_Schemas.cs @@ -16,7 +16,7 @@ using Squidex.Infrastructure.Reflection; #pragma warning disable CS0612 // Type or member is obsolete -namespace Squidex.Domain.Apps.Read.State.Orleans.Grains.Implementations +namespace Squidex.Domain.Apps.Read.State.Orleans.Grains { public sealed partial class AppStateGrainState { diff --git a/src/Squidex.Domain.Apps.Read/State/Grains/AppUserGrain.cs b/src/Squidex.Domain.Apps.Read/State/Grains/AppUserGrain.cs new file mode 100644 index 000000000..9748c65e9 --- /dev/null +++ b/src/Squidex.Domain.Apps.Read/State/Grains/AppUserGrain.cs @@ -0,0 +1,49 @@ +// ========================================================================== +// AppUserGrain.cs +// Squidex Headless CMS +// ========================================================================== +// Copyright (c) Squidex Group +// All rights reserved. +// ========================================================================== + +using System.Collections.Generic; +using System.Linq; +using System.Threading.Tasks; +using Squidex.Infrastructure.States; +using Squidex.Infrastructure.Tasks; + +namespace Squidex.Domain.Apps.Read.State.Orleans.Grains +{ + public sealed class AppUserGrain : StatefulObject + { + private readonly SingleThreadedDispatcher dispatcher = new SingleThreadedDispatcher(); + + public Task AddAppAsync(string appName) + { + return dispatcher.DispatchAndUnwrapAsync(() => + { + State.AppNames.Add(appName); + + return WriteStateAsync(); + }); + } + + public Task RemoveAppAsync(string appName) + { + return dispatcher.DispatchAndUnwrapAsync(() => + { + State.AppNames.Remove(appName); + + return WriteStateAsync(); + }); + } + + public Task> GetAppNamesAsync() + { + return dispatcher.DispatchAndUnwrapAsync(() => + { + return Task.FromResult(State.AppNames.ToList()); + }); + } + } +} diff --git a/src/Squidex.Domain.Apps.Read/State/Orleans/Grains/Implementations/AppUserGrainState.cs b/src/Squidex.Domain.Apps.Read/State/Grains/AppUserGrainState.cs similarity index 88% rename from src/Squidex.Domain.Apps.Read/State/Orleans/Grains/Implementations/AppUserGrainState.cs rename to src/Squidex.Domain.Apps.Read/State/Grains/AppUserGrainState.cs index 53984019b..216b6c51d 100644 --- a/src/Squidex.Domain.Apps.Read/State/Orleans/Grains/Implementations/AppUserGrainState.cs +++ b/src/Squidex.Domain.Apps.Read/State/Grains/AppUserGrainState.cs @@ -9,7 +9,7 @@ using System.Collections.Generic; using Newtonsoft.Json; -namespace Squidex.Domain.Apps.Read.State.Orleans.Grains.Implementations +namespace Squidex.Domain.Apps.Read.State.Orleans.Grains { public sealed class AppUserGrainState { diff --git a/src/Squidex.Domain.Apps.Read/State/Orleans/Grains/Implementations/JsonAppEntity.cs b/src/Squidex.Domain.Apps.Read/State/Grains/JsonAppEntity.cs similarity index 90% rename from src/Squidex.Domain.Apps.Read/State/Orleans/Grains/Implementations/JsonAppEntity.cs rename to src/Squidex.Domain.Apps.Read/State/Grains/JsonAppEntity.cs index b8c3bf33e..306a9a05e 100644 --- a/src/Squidex.Domain.Apps.Read/State/Orleans/Grains/Implementations/JsonAppEntity.cs +++ b/src/Squidex.Domain.Apps.Read/State/Grains/JsonAppEntity.cs @@ -7,13 +7,11 @@ // ========================================================================== using Newtonsoft.Json; -using Orleans.Concurrency; using Squidex.Domain.Apps.Core.Apps; using Squidex.Domain.Apps.Read.Apps; -namespace Squidex.Domain.Apps.Read.State.Orleans.Grains.Implementations +namespace Squidex.Domain.Apps.Read.State.Orleans.Grains { - [Immutable] public sealed class JsonAppEntity : JsonEntity, IAppEntity { [JsonProperty] diff --git a/src/Squidex.Domain.Apps.Read/State/Orleans/Grains/Implementations/JsonEntity.cs b/src/Squidex.Domain.Apps.Read/State/Grains/JsonEntity.cs similarity index 88% rename from src/Squidex.Domain.Apps.Read/State/Orleans/Grains/Implementations/JsonEntity.cs rename to src/Squidex.Domain.Apps.Read/State/Grains/JsonEntity.cs index 6da734b18..0cf3ba63b 100644 --- a/src/Squidex.Domain.Apps.Read/State/Orleans/Grains/Implementations/JsonEntity.cs +++ b/src/Squidex.Domain.Apps.Read/State/Grains/JsonEntity.cs @@ -9,12 +9,10 @@ using System; using Newtonsoft.Json; using NodaTime; -using Orleans.Concurrency; using Squidex.Infrastructure; -namespace Squidex.Domain.Apps.Read.State.Orleans.Grains.Implementations +namespace Squidex.Domain.Apps.Read.State.Orleans.Grains { - [Immutable] public abstract class JsonEntity : Cloneable, IUpdateableEntityWithVersion where T : Cloneable { [JsonProperty] diff --git a/src/Squidex.Domain.Apps.Read/State/Orleans/Grains/Implementations/JsonRuleEntity.cs b/src/Squidex.Domain.Apps.Read/State/Grains/JsonRuleEntity.cs similarity index 89% rename from src/Squidex.Domain.Apps.Read/State/Orleans/Grains/Implementations/JsonRuleEntity.cs rename to src/Squidex.Domain.Apps.Read/State/Grains/JsonRuleEntity.cs index 6916db43e..3ddf988c4 100644 --- a/src/Squidex.Domain.Apps.Read/State/Orleans/Grains/Implementations/JsonRuleEntity.cs +++ b/src/Squidex.Domain.Apps.Read/State/Grains/JsonRuleEntity.cs @@ -8,14 +8,12 @@ using System; using Newtonsoft.Json; -using Orleans.Concurrency; using Squidex.Domain.Apps.Core.Rules; using Squidex.Domain.Apps.Read.Rules; using Squidex.Infrastructure; -namespace Squidex.Domain.Apps.Read.State.Orleans.Grains.Implementations +namespace Squidex.Domain.Apps.Read.State.Orleans.Grains { - [Immutable] public sealed class JsonRuleEntity : JsonEntity, IRuleEntity, diff --git a/src/Squidex.Domain.Apps.Read/State/Orleans/Grains/Implementations/JsonSchemaEntity.cs b/src/Squidex.Domain.Apps.Read/State/Grains/JsonSchemaEntity.cs similarity index 93% rename from src/Squidex.Domain.Apps.Read/State/Orleans/Grains/Implementations/JsonSchemaEntity.cs rename to src/Squidex.Domain.Apps.Read/State/Grains/JsonSchemaEntity.cs index 026ed1bd7..346a59054 100644 --- a/src/Squidex.Domain.Apps.Read/State/Orleans/Grains/Implementations/JsonSchemaEntity.cs +++ b/src/Squidex.Domain.Apps.Read/State/Grains/JsonSchemaEntity.cs @@ -8,14 +8,12 @@ using System; using Newtonsoft.Json; -using Orleans.Concurrency; using Squidex.Domain.Apps.Core.Schemas; using Squidex.Domain.Apps.Read.Schemas; using Squidex.Infrastructure; -namespace Squidex.Domain.Apps.Read.State.Orleans.Grains.Implementations +namespace Squidex.Domain.Apps.Read.State.Orleans.Grains { - [Immutable] public sealed class JsonSchemaEntity : JsonEntity, ISchemaEntity, diff --git a/src/Squidex.Domain.Apps.Read/State/Orleans/Grains/IAppStateGrain.cs b/src/Squidex.Domain.Apps.Read/State/Orleans/Grains/IAppStateGrain.cs deleted file mode 100644 index e3bfe27ee..000000000 --- a/src/Squidex.Domain.Apps.Read/State/Orleans/Grains/IAppStateGrain.cs +++ /dev/null @@ -1,37 +0,0 @@ -// ========================================================================== -// IAppStateGrain.cs -// Squidex Headless CMS -// ========================================================================== -// Copyright (c) Squidex Group -// All rights reserved. -// ========================================================================== - -using System; -using System.Collections.Generic; -using System.Threading.Tasks; -using Orleans; -using Squidex.Domain.Apps.Read.Apps; -using Squidex.Domain.Apps.Read.Rules; -using Squidex.Domain.Apps.Read.Schemas; -using Squidex.Infrastructure.CQRS.Events; -using Squidex.Infrastructure.Json.Orleans; - -namespace Squidex.Domain.Apps.Read.State.Orleans.Grains -{ - public interface IAppStateGrain : IGrainWithStringKey - { - Task> GetAppWithSchemaAsync(Guid id); - - Task> GetAppAsync(); - - Task> GetSchemaAsync(Guid id, bool provideDeleted = false); - - Task> GetSchemaAsync(string name, bool provideDeleted = false); - - Task>> GetSchemasAsync(); - - Task>> GetRulesAsync(); - - Task HandleAsync(J> message); - } -} diff --git a/src/Squidex.Domain.Apps.Read/State/Orleans/Grains/IAppUserGrain.cs b/src/Squidex.Domain.Apps.Read/State/Orleans/Grains/IAppUserGrain.cs deleted file mode 100644 index 68a1db147..000000000 --- a/src/Squidex.Domain.Apps.Read/State/Orleans/Grains/IAppUserGrain.cs +++ /dev/null @@ -1,23 +0,0 @@ -// ========================================================================== -// IAppUserGrain.cs -// Squidex Headless CMS -// ========================================================================== -// Copyright (c) Squidex Group -// All rights reserved. -// ========================================================================== - -using System.Collections.Generic; -using System.Threading.Tasks; -using Orleans; - -namespace Squidex.Domain.Apps.Read.State.Orleans.Grains -{ - public interface IAppUserGrain : IGrainWithStringKey - { - Task> GetSchemaNamesAsync(); - - Task AddAppAsync(string appName); - - Task RemoveAppAsync(string appName); - } -} diff --git a/src/Squidex.Domain.Apps.Read/State/Orleans/Grains/Implementations/AppStateGrain.cs b/src/Squidex.Domain.Apps.Read/State/Orleans/Grains/Implementations/AppStateGrain.cs deleted file mode 100644 index b964d8009..000000000 --- a/src/Squidex.Domain.Apps.Read/State/Orleans/Grains/Implementations/AppStateGrain.cs +++ /dev/null @@ -1,121 +0,0 @@ -// ========================================================================== -// AppStateGrain.cs -// Squidex Headless CMS -// ========================================================================== -// Copyright (c) Squidex Group -// All rights reserved. -// ========================================================================== - -using System; -using System.Collections.Generic; -using System.Threading.Tasks; -using Orleans.Runtime; -using Squidex.Domain.Apps.Core.Schemas; -using Squidex.Domain.Apps.Events.Apps; -using Squidex.Domain.Apps.Read.Apps; -using Squidex.Domain.Apps.Read.Rules; -using Squidex.Domain.Apps.Read.Schemas; -using Squidex.Infrastructure; -using Squidex.Infrastructure.CQRS.Events; -using Squidex.Infrastructure.Json.Orleans; -using Squidex.Infrastructure.Orleans; - -namespace Squidex.Domain.Apps.Read.State.Orleans.Grains.Implementations -{ - public sealed class AppStateGrain : GrainV2, IAppStateGrain - { - private readonly FieldRegistry fieldRegistry; - private Exception exception; - - public AppStateGrain(FieldRegistry fieldRegistry, IGrainRuntime runtime) - : base(runtime) - { - Guard.NotNull(fieldRegistry, nameof(fieldRegistry)); - Guard.NotNull(runtime, nameof(runtime)); - - this.fieldRegistry = fieldRegistry; - } - - protected override async Task ReadStateAsync() - { - try - { - await base.ReadStateAsync(); - } - catch (Exception ex) - { - exception = ex; - - State = new AppStateGrainState(); - } - } - - public override Task OnActivateAsync() - { - State.SetRegistry(fieldRegistry); - - return base.OnActivateAsync(); - } - - public Task> GetAppWithSchemaAsync(Guid id) - { - var schema = State.FindSchema(x => x.Id == id && !x.IsDeleted); - - return Task.FromResult((State.GetApp(), schema).AsJ()); - } - - public Task> GetAppAsync() - { - var value = State.GetApp(); - - return Task.FromResult(value.AsJ()); - } - - public Task>> GetRulesAsync() - { - var value = State.FindRules(); - - return Task.FromResult(value.AsJ()); - } - - public Task>> GetSchemasAsync() - { - var value = State.FindSchemas(x => !x.IsDeleted); - - return Task.FromResult(value.AsJ()); - } - - public Task> GetSchemaAsync(Guid id, bool provideDeleted = false) - { - var value = State.FindSchema(x => x.Id == id && (!x.IsDeleted || provideDeleted)); - - return Task.FromResult(value.AsJ()); - } - - public Task> GetSchemaAsync(string name, bool provideDeleted = false) - { - var value = State.FindSchema(x => string.Equals(x.Name, name, StringComparison.OrdinalIgnoreCase) && (!x.IsDeleted || provideDeleted)); - - return Task.FromResult(value.AsJ()); - } - - public Task HandleAsync(J> message) - { - if (exception != null) - { - if (message.Value.Payload is AppCreated) - { - exception = null; - } - else - { - throw exception; - } - } - - State.Apply(message.Value); - - return WriteStateAsync(); - } - } -} diff --git a/src/Squidex.Domain.Apps.Read/State/Orleans/Grains/Implementations/AppUserGrain.cs b/src/Squidex.Domain.Apps.Read/State/Orleans/Grains/Implementations/AppUserGrain.cs deleted file mode 100644 index c4cdd146a..000000000 --- a/src/Squidex.Domain.Apps.Read/State/Orleans/Grains/Implementations/AppUserGrain.cs +++ /dev/null @@ -1,43 +0,0 @@ -// ========================================================================== -// AppUserGrain.cs -// Squidex Headless CMS -// ========================================================================== -// Copyright (c) Squidex Group -// All rights reserved. -// ========================================================================== - -using System.Collections.Generic; -using System.Linq; -using System.Threading.Tasks; -using Orleans.Runtime; -using Squidex.Infrastructure.Orleans; - -namespace Squidex.Domain.Apps.Read.State.Orleans.Grains.Implementations -{ - public sealed class AppUserGrain : GrainV2, IAppUserGrain - { - public AppUserGrain(IGrainRuntime runtime) - : base(runtime) - { - } - - public Task AddAppAsync(string appName) - { - State.AppNames.Add(appName); - - return WriteStateAsync(); - } - - public Task RemoveAppAsync(string appName) - { - State.AppNames.Remove(appName); - - return WriteStateAsync(); - } - - public Task> GetSchemaNamesAsync() - { - return Task.FromResult(State.AppNames.ToList()); - } - } -} diff --git a/src/Squidex.Domain.Apps.Read/State/Orleans/OrleansAppProvider.cs b/src/Squidex.Domain.Apps.Read/State/Orleans/OrleansAppProvider.cs deleted file mode 100644 index 44ed20870..000000000 --- a/src/Squidex.Domain.Apps.Read/State/Orleans/OrleansAppProvider.cs +++ /dev/null @@ -1,129 +0,0 @@ -// ========================================================================== -// OrleansAppProvider.cs -// Squidex Headless CMS -// ========================================================================== -// Copyright (c) Squidex Group -// All rights reserved. -// ========================================================================== - -using System; -using System.Collections.Generic; -using System.Linq; -using System.Threading.Tasks; -using Orleans; -using Squidex.Domain.Apps.Read.Apps; -using Squidex.Domain.Apps.Read.Rules; -using Squidex.Domain.Apps.Read.Schemas; -using Squidex.Domain.Apps.Read.State.Orleans.Grains; -using Squidex.Infrastructure; -using Squidex.Infrastructure.Log; - -namespace Squidex.Domain.Apps.Read.State.Orleans -{ - public sealed class OrleansAppProvider : IAppProvider - { - private readonly IGrainFactory factory; - private readonly ISemanticLog log; - - public OrleansAppProvider(IGrainFactory factory, ISemanticLog log) - { - Guard.NotNull(factory, nameof(factory)); - Guard.NotNull(log, nameof(log)); - - this.factory = factory; - - this.log = log; - } - - public async Task GetAppAsync(string appName) - { - using (log.MeasureTrace(w => w - .WriteProperty("module", nameof(OrleansAppProvider)) - .WriteProperty("method", nameof(GetAppAsync)))) - { - var result = await factory.GetGrain(appName).GetAppAsync(); - - return result.Value; - } - } - - public async Task<(IAppEntity, ISchemaEntity)> GetAppWithSchemaAsync(string appName, Guid id) - { - using (log.MeasureTrace(w => w - .WriteProperty("module", nameof(OrleansAppProvider)) - .WriteProperty("method", nameof(GetAppWithSchemaAsync)))) - { - var result = await factory.GetGrain(appName).GetAppWithSchemaAsync(id); - - return result.Value; - } - } - - public async Task> GetRulesAsync(string appName) - { - using (log.MeasureTrace(w => w - .WriteProperty("module", nameof(OrleansAppProvider)) - .WriteProperty("method", nameof(GetRulesAsync)))) - { - var result = await factory.GetGrain(appName).GetRulesAsync(); - - return result.Value; - } - } - - public async Task GetSchemaAsync(string appName, Guid id, bool provideDeleted = false) - { - using (log.MeasureTrace(w => w - .WriteProperty("module", nameof(OrleansAppProvider)) - .WriteProperty("method", nameof(GetSchemaAsync)))) - { - var result = await factory.GetGrain(appName).GetSchemaAsync(id, provideDeleted); - - return result.Value; - } - } - - public async Task GetSchemaAsync(string appName, string name, bool provideDeleted = false) - { - using (log.MeasureTrace(w => w - .WriteProperty("module", nameof(OrleansAppProvider)) - .WriteProperty("method", nameof(GetSchemaAsync)))) - { - var result = await factory.GetGrain(appName).GetSchemaAsync(name, provideDeleted); - - return result.Value; - } - } - - public async Task> GetSchemasAsync(string appName) - { - using (log.MeasureTrace(w => w - .WriteProperty("module", nameof(OrleansAppProvider)) - .WriteProperty("method", nameof(GetSchemasAsync)))) - { - var result = await factory.GetGrain(appName).GetSchemasAsync(); - - return result.Value; - } - } - - public async Task> GetUserApps(string userId) - { - using (log.MeasureTrace(w => w - .WriteProperty("module", nameof(OrleansAppProvider)) - .WriteProperty("method", nameof(GetUserApps)))) - { - var schemaIds = await factory.GetGrain(userId).GetSchemaNamesAsync(); - - var tasks = - schemaIds - .Select(x => factory.GetGrain(x)) - .Select(x => x.GetAppAsync()); - - var apps = await Task.WhenAll(tasks); - - return apps.Select(a => a.Value).Where(a => a != null).ToList(); - } - } - } -} diff --git a/src/Squidex.Domain.Users.MongoDb/Squidex.Domain.Users.MongoDb.csproj b/src/Squidex.Domain.Users.MongoDb/Squidex.Domain.Users.MongoDb.csproj index 14a02680e..d9b6e64e1 100644 --- a/src/Squidex.Domain.Users.MongoDb/Squidex.Domain.Users.MongoDb.csproj +++ b/src/Squidex.Domain.Users.MongoDb/Squidex.Domain.Users.MongoDb.csproj @@ -13,7 +13,7 @@ - + diff --git a/src/Squidex.Infrastructure.MongoDb/CQRS/Events/MongoEventStore.cs b/src/Squidex.Infrastructure.MongoDb/CQRS/Events/MongoEventStore.cs index 10059cf56..2abb9c2cb 100644 --- a/src/Squidex.Infrastructure.MongoDb/CQRS/Events/MongoEventStore.cs +++ b/src/Squidex.Infrastructure.MongoDb/CQRS/Events/MongoEventStore.cs @@ -60,7 +60,7 @@ namespace Squidex.Infrastructure.CQRS.Events Guard.NotNull(subscriber, nameof(subscriber)); Guard.NotNullOrEmpty(streamFilter, nameof(streamFilter)); - return new EventStoreSubscription(this, subscriber, streamFilter, position); + return new PollingSubscription(this, notifier, subscriber, streamFilter, position); } public async Task> GetEventsAsync(string streamName) diff --git a/src/Squidex.Infrastructure.MongoDb/States/MongoStateStore.cs b/src/Squidex.Infrastructure.MongoDb/States/MongoStateStore.cs new file mode 100644 index 000000000..a40637bfa --- /dev/null +++ b/src/Squidex.Infrastructure.MongoDb/States/MongoStateStore.cs @@ -0,0 +1,113 @@ +// ========================================================================== +// MongoStateStore.cs +// Squidex Headless CMS +// ========================================================================== +// Copyright (c) Squidex Group +// All rights reserved. +// ========================================================================== + +using System; +using System.Threading.Tasks; +using MongoDB.Bson; +using MongoDB.Driver; +using Newtonsoft.Json; +using Newtonsoft.Json.Linq; +using Squidex.Infrastructure.MongoDb; + +namespace Squidex.Infrastructure.States +{ + public sealed class MongoStateStore : IStateStore, IExternalSystem + { + private const string FieldId = "_id"; + private const string FieldDoc = "_doc"; + private const string FieldEtag = "_etag"; + private static readonly UpdateOptions Upsert = new UpdateOptions { IsUpsert = true }; + private static readonly FilterDefinitionBuilder Filter = Builders.Filter; + private static readonly UpdateDefinitionBuilder Update = Builders.Update; + private static readonly ProjectionDefinitionBuilder Projection = Builders.Projection; + private readonly IMongoDatabase database; + private readonly JsonSerializer serializer; + + public MongoStateStore(IMongoDatabase database, JsonSerializer serializer) + { + Guard.NotNull(database, nameof(database)); + Guard.NotNull(serializer, nameof(serializer)); + + this.database = database; + this.serializer = serializer; + } + + public void Connect() + { + try + { + database.ListCollections(); + } + catch (Exception ex) + { + throw new ConfigurationException($"MongoDb connection failed to connect to database {database.DatabaseNamespace.DatabaseName}", ex); + } + } + + public async Task<(T Value, string Etag)> ReadAsync(string key) + { + var collection = GetCollection(); + + var existing = + await collection.Find(Filter.Eq(FieldId, key)) + .FirstOrDefaultAsync(); + + if (existing != null) + { + var value = existing[FieldDoc].AsBsonDocument.ToJson().ToObject(serializer); + + return (value, existing[FieldEtag].AsString); + } + + return (default(T), null); + } + + public async Task WriteAsync(string key, T value, string oldEtag, string newEtag) + { + var collection = GetCollection(); + + var newData = JToken.FromObject(value, serializer).ToBson(); + + try + { + await collection.UpdateOneAsync( + Filter.And( + Filter.Eq(FieldId, key), + Filter.Eq(FieldEtag, oldEtag) + ), + Update + .Set(FieldEtag, newEtag) + .Set(FieldDoc, newData), + Upsert); + } + catch (MongoWriteException ex) + { + if (ex.WriteError.Category == ServerErrorCategory.DuplicateKey) + { + var existingEtag = + await collection.Find(Filter.Eq(FieldId, key)) + .Project(Projection.Exclude(FieldDoc)).FirstOrDefaultAsync(); + + if (existingEtag != null && existingEtag.Contains(FieldEtag)) + { + throw new InconsistentStateException(existingEtag[FieldEtag].AsString, oldEtag, ex); + } + } + else + { + throw; + } + } + } + + private IMongoCollection GetCollection() + { + return database.GetCollection($"States_{typeof(T).Name}"); + } + } +} diff --git a/src/Squidex.Infrastructure/CQRS/Events/Actors/EventConsumerActor.cs b/src/Squidex.Infrastructure/CQRS/Events/Actors/EventConsumerActor.cs index d1f58ff68..c8f722b6f 100644 --- a/src/Squidex.Infrastructure/CQRS/Events/Actors/EventConsumerActor.cs +++ b/src/Squidex.Infrastructure/CQRS/Events/Actors/EventConsumerActor.cs @@ -78,16 +78,16 @@ namespace Squidex.Infrastructure.CQRS.Events.Actors dispatcher.DispatchAsync(() => HandleSetupAsync(eventConsumer)).Forget(); } - private async Task HandleSetupAsync(IEventConsumer consumer) + private Task HandleSetupAsync(IEventConsumer consumer) { eventConsumer = consumer; - await ReadStateAsync(); - if (!State.IsStopped) { Subscribe(State.Position); } + + return TaskHelper.Done; } private Task HandleEventAsync(IEventSubscription subscription, StoredEvent storedEvent) diff --git a/src/Squidex.Infrastructure/CQRS/Events/DefaultEventNotifier.cs b/src/Squidex.Infrastructure/CQRS/Events/DefaultEventNotifier.cs new file mode 100644 index 000000000..d716c4b52 --- /dev/null +++ b/src/Squidex.Infrastructure/CQRS/Events/DefaultEventNotifier.cs @@ -0,0 +1,41 @@ +// ========================================================================== +// DefaultEventNotifier.cs +// Squidex Headless CMS +// ========================================================================== +// Copyright (c) Squidex Group +// All rights reserved. +// ========================================================================== + +using System; + +namespace Squidex.Infrastructure.CQRS.Events +{ + public sealed class DefaultEventNotifier : IEventNotifier + { + private static readonly string ChannelName = typeof(DefaultEventNotifier).Name; + + private readonly IPubSub pubsub; + + public sealed class EventNotification + { + public string StreamName { get; set; } + } + + public DefaultEventNotifier(IPubSub pubsub) + { + Guard.NotNull(pubsub, nameof(pubsub)); + + this.pubsub = pubsub; + } + + public void NotifyEventsStored(string streamName) + { + pubsub.Publish(new EventNotification { StreamName = streamName }, true); + } + + public IDisposable Subscribe(Action handler) + { + return pubsub.Subscribe(x => handler?.Invoke(x.StreamName)); + } + } +} diff --git a/src/Squidex.Infrastructure/CQRS/Events/IEventNotifier.cs b/src/Squidex.Infrastructure/CQRS/Events/IEventNotifier.cs index b34766972..72e61b3d9 100644 --- a/src/Squidex.Infrastructure/CQRS/Events/IEventNotifier.cs +++ b/src/Squidex.Infrastructure/CQRS/Events/IEventNotifier.cs @@ -6,10 +6,14 @@ // All rights reserved. // ========================================================================== +using System; + namespace Squidex.Infrastructure.CQRS.Events { public interface IEventNotifier { void NotifyEventsStored(string streamName); + + IDisposable Subscribe(Action handler); } } diff --git a/src/Squidex.Infrastructure/CQRS/Events/EventStoreSubscription.cs b/src/Squidex.Infrastructure/CQRS/Events/PollingSubscription.cs similarity index 59% rename from src/Squidex.Infrastructure/CQRS/Events/EventStoreSubscription.cs rename to src/Squidex.Infrastructure/CQRS/Events/PollingSubscription.cs index 22d11f041..e69234d47 100644 --- a/src/Squidex.Infrastructure/CQRS/Events/EventStoreSubscription.cs +++ b/src/Squidex.Infrastructure/CQRS/Events/PollingSubscription.cs @@ -1,5 +1,5 @@ // ========================================================================== -// EventStoreSubscription.cs +// PollingSubscription.cs // Squidex Headless CMS // ========================================================================== // Copyright (c) Squidex Group @@ -7,33 +7,43 @@ // ========================================================================== using System; -using System.Threading; +using System.Text.RegularExpressions; using System.Threading.Tasks; +using Squidex.Infrastructure.Timers; namespace Squidex.Infrastructure.CQRS.Events { - public sealed class EventStoreSubscription : IEventSubscription + public sealed class PollingSubscription : IEventSubscription { + private readonly IEventNotifier eventNotifier; private readonly IEventStore eventStore; private readonly IEventSubscriber eventSubscriber; - private readonly CancellationTokenSource cts = new CancellationTokenSource(); - private readonly Task task; + private readonly IDisposable notification; + private readonly CompletionTimer timer; + private readonly Regex streamRegex; private readonly string streamFilter; + private string position; - public EventStoreSubscription( + public PollingSubscription( IEventStore eventStore, + IEventNotifier eventNotifier, IEventSubscriber eventSubscriber, string streamFilter, string position) { Guard.NotNull(eventStore, nameof(eventStore)); + Guard.NotNull(eventNotifier, nameof(eventNotifier)); Guard.NotNull(eventSubscriber, nameof(eventSubscriber)); + this.position = position; + this.eventNotifier = eventNotifier; this.eventStore = eventStore; this.eventSubscriber = eventSubscriber; this.streamFilter = streamFilter; - task = Task.Run(async () => + streamRegex = new Regex(streamFilter); + + timer = new CompletionTimer(5000, async ct => { try { @@ -42,7 +52,7 @@ namespace Squidex.Infrastructure.CQRS.Events await eventSubscriber.OnEventAsync(this, storedEvent); position = storedEvent.EventPosition; - }, cts.Token, streamFilter, position); + }, ct, streamFilter, position); } catch (Exception ex) { @@ -52,13 +62,21 @@ namespace Squidex.Infrastructure.CQRS.Events } } }); + + notification = eventNotifier.Subscribe(streamName => + { + if (streamRegex.IsMatch(streamName)) + { + timer.SkipCurrentDelay(); + } + }); } public Task StopAsync() { - cts.Cancel(); + notification?.Dispose(); - return task; + return timer.StopAsync(); } } } diff --git a/src/Squidex.Infrastructure/CQRS/Events/WrongEventVersionException.cs b/src/Squidex.Infrastructure/CQRS/Events/WrongEventVersionException.cs index 010e32dfe..afa2c92ff 100644 --- a/src/Squidex.Infrastructure/CQRS/Events/WrongEventVersionException.cs +++ b/src/Squidex.Infrastructure/CQRS/Events/WrongEventVersionException.cs @@ -11,6 +11,7 @@ using System.Runtime.Serialization; namespace Squidex.Infrastructure.CQRS.Events { + [Serializable] public class WrongEventVersionException : Exception { private readonly long currentVersion; diff --git a/src/Squidex.Infrastructure/Caching/InvalidationMessage.cs b/src/Squidex.Infrastructure/Caching/InvalidateMessage.cs similarity index 85% rename from src/Squidex.Infrastructure/Caching/InvalidationMessage.cs rename to src/Squidex.Infrastructure/Caching/InvalidateMessage.cs index 03bae01b3..db8c81c32 100644 --- a/src/Squidex.Infrastructure/Caching/InvalidationMessage.cs +++ b/src/Squidex.Infrastructure/Caching/InvalidateMessage.cs @@ -1,5 +1,5 @@ // ========================================================================== -// InvalidationMessage.cs +// InvalidateMessage.cs // Squidex Headless CMS // ========================================================================== // Copyright (c) Squidex Group @@ -8,7 +8,7 @@ namespace Squidex.Infrastructure.Caching { - public sealed class InvalidationMessage + public sealed class InvalidateMessage { public string CacheKey { get; set; } } diff --git a/src/Squidex.Infrastructure/Caching/InvalidatingMemoryCache.cs b/src/Squidex.Infrastructure/Caching/InvalidatingMemoryCache.cs index adfe5e764..2e9261ed9 100644 --- a/src/Squidex.Infrastructure/Caching/InvalidatingMemoryCache.cs +++ b/src/Squidex.Infrastructure/Caching/InvalidatingMemoryCache.cs @@ -25,7 +25,7 @@ namespace Squidex.Infrastructure.Caching this.inner = inner; this.invalidator = invalidator; - subscription = invalidator.Subscribe(m => + subscription = invalidator.Subscribe(m => { inner.Remove(m.CacheKey); }); @@ -60,7 +60,7 @@ namespace Squidex.Infrastructure.Caching { if (key is string stringKey) { - invalidator.Publish(new InvalidationMessage { CacheKey = stringKey }, true); + invalidator.Publish(new InvalidateMessage { CacheKey = stringKey }, true); } } } diff --git a/src/Squidex.Infrastructure/States/InconsistentStateException.cs b/src/Squidex.Infrastructure/States/InconsistentStateException.cs new file mode 100644 index 000000000..236919152 --- /dev/null +++ b/src/Squidex.Infrastructure/States/InconsistentStateException.cs @@ -0,0 +1,48 @@ +// ========================================================================== +// InconsistentStateException.cs +// Squidex Headless CMS +// ========================================================================== +// Copyright (c) Squidex Group +// All rights reserved. +// ========================================================================== + +using System; +using System.Runtime.Serialization; + +namespace Squidex.Infrastructure.States +{ + [Serializable] + public class InconsistentStateException : Exception + { + private readonly string currentEtag; + private readonly string expectedEtag; + + public string CurrentEtag + { + get { return currentEtag; } + } + + public string ExpectedEtag + { + get { return expectedEtag; } + } + + public InconsistentStateException(string currentEtag, string expectedEtag, Exception ex) + : base(FormatMessage(currentEtag, expectedEtag), ex) + { + this.currentEtag = currentEtag; + + this.expectedEtag = expectedEtag; + } + + protected InconsistentStateException(SerializationInfo info, StreamingContext context) + : base(info, context) + { + } + + private static string FormatMessage(string currentEtag, string expectedEtag) + { + return $"Requested etag {expectedEtag}, but found {currentEtag}."; + } + } +} diff --git a/src/Squidex.Infrastructure/States/StateFactory.cs b/src/Squidex.Infrastructure/States/StateFactory.cs index ea6482abe..12b257bc9 100644 --- a/src/Squidex.Infrastructure/States/StateFactory.cs +++ b/src/Squidex.Infrastructure/States/StateFactory.cs @@ -22,7 +22,7 @@ namespace Squidex.Infrastructure.States private readonly IMemoryCache statesCache; private readonly IServiceProvider services; private readonly List states = new List(); - private readonly SingleThreadedDispatcher cleanupDispatcher = new SingleThreadedDispatcher(); + private readonly SingleThreadedDispatcher dispatcher = new SingleThreadedDispatcher(); private IDisposable pubSubscription; public StateFactory( @@ -50,58 +50,74 @@ namespace Squidex.Infrastructure.States }); } - public async Task GetAsync(string key) where T : StatefulObject + public Task GetAsync(string key) where T : StatefulObject { Guard.NotNull(key, nameof(key)); - if (statesCache.TryGetValue(key, out var state)) + var tcs = new TaskCompletionSource(); + + dispatcher.DispatchAsync(async () => { - return state; - } + try + { + if (statesCache.TryGetValue(key, out var state)) + { + tcs.SetResult(state); + } + else + { + state = (T)services.GetService(typeof(T)); - state = (T)services.GetService(typeof(T)); + var stateHolder = new StateHolder(key, () => + { + pubSub.Publish(new InvalidateMessage { Key = key }, false); + }, store); - var stateHolder = new StateHolder(key, () => - { - pubSub.Publish(new InvalidateMessage { Key = key }, false); - }, store); + await state.ActivateAsync(stateHolder); - await state.ActivateAsync(stateHolder); + var stateEntry = statesCache.CreateEntry(key); - var stateEntry = statesCache.CreateEntry(key); + stateEntry.Value = state; + stateEntry.AbsoluteExpirationRelativeToNow = CacheDuration; - stateEntry.Value = state; - stateEntry.AbsoluteExpirationRelativeToNow = CacheDuration; + stateEntry.PostEvictionCallbacks.Add(new PostEvictionCallbackRegistration + { + EvictionCallback = (k, v, r, s) => + { + dispatcher.DispatchAsync(() => + { + state.Dispose(); + states.Remove(state); + }).Forget(); + } + }); - stateEntry.PostEvictionCallbacks.Add(new PostEvictionCallbackRegistration - { - EvictionCallback = (k, v, r, s) => + states.Add(state); + + tcs.SetResult(state); + } + } + catch (Exception ex) { - cleanupDispatcher.DispatchAsync(() => - { - state.Dispose(); - states.Remove(state); - }).Forget(); + tcs.SetException(ex); } }); - states.Add(state); - - return state; + return tcs.Task; } protected override void DisposeObject(bool disposing) { if (disposing) { - cleanupDispatcher.DispatchAsync(() => + dispatcher.DispatchAsync(() => { foreach (var state in states) { state.Dispose(); } }); - cleanupDispatcher.StopAndWaitAsync().Wait(); + dispatcher.StopAndWaitAsync().Wait(); } } } diff --git a/src/Squidex.Infrastructure/States/StateHolder.cs b/src/Squidex.Infrastructure/States/StateHolder.cs index 7e5fba0f3..d65a166d0 100644 --- a/src/Squidex.Infrastructure/States/StateHolder.cs +++ b/src/Squidex.Infrastructure/States/StateHolder.cs @@ -30,6 +30,11 @@ namespace Squidex.Infrastructure.States public async Task ReadAsync() { (State, etag) = await store.ReadAsync(key); + + if (Equals(State, default(T))) + { + State = Activator.CreateInstance(); + } } public async Task WriteAsync() diff --git a/src/Squidex.Infrastructure/States/StatefulObject.cs b/src/Squidex.Infrastructure/States/StatefulObject.cs index 70e4c44a7..b384c09e4 100644 --- a/src/Squidex.Infrastructure/States/StatefulObject.cs +++ b/src/Squidex.Infrastructure/States/StatefulObject.cs @@ -46,7 +46,7 @@ namespace Squidex.Infrastructure.States return stateHolder.ReadAsync(); } - public async Task ReadStateAsync() + public virtual async Task ReadStateAsync() { if (stateHolder != null) { @@ -54,12 +54,16 @@ namespace Squidex.Infrastructure.States } } - public async Task WriteStateAsync() + public virtual async Task WriteStateAsync() { if (stateHolder != null) { await stateHolder.WriteAsync(); } } + + protected override void DisposeObject(bool disposing) + { + } } } diff --git a/src/Squidex.Infrastructure/Tasks/SingleThreadedDispatcher.cs b/src/Squidex.Infrastructure/Tasks/SingleThreadedDispatcher.cs index c382270aa..378bfc19c 100644 --- a/src/Squidex.Infrastructure/Tasks/SingleThreadedDispatcher.cs +++ b/src/Squidex.Infrastructure/Tasks/SingleThreadedDispatcher.cs @@ -29,6 +29,52 @@ namespace Squidex.Infrastructure.Tasks block = new ActionBlock>(Handle, options); } + public Task DispatchAndUnwrapAsync(Func action) + { + Guard.NotNull(action, nameof(action)); + + var tcs = new TaskCompletionSource(); + + block.SendAsync(async () => + { + try + { + await action(); + + tcs.SetResult(true); + } + catch (Exception ex) + { + tcs.SetException(ex); + } + }); + + return tcs.Task; + } + + public Task DispatchAndUnwrapAsync(Func> action) + { + Guard.NotNull(action, nameof(action)); + + var tcs = new TaskCompletionSource(); + + block.SendAsync(async () => + { + try + { + var result = await action(); + + tcs.SetResult(result); + } + catch (Exception ex) + { + tcs.SetException(ex); + } + }); + + return tcs.Task; + } + public Task DispatchAsync(Func action) { Guard.NotNull(action, nameof(action)); diff --git a/src/Squidex/Areas/Api/Controllers/Assets/AssetsController.cs b/src/Squidex/Areas/Api/Controllers/Assets/AssetsController.cs index c005a66ae..fe83ccc82 100644 --- a/src/Squidex/Areas/Api/Controllers/Assets/AssetsController.cs +++ b/src/Squidex/Areas/Api/Controllers/Assets/AssetsController.cs @@ -164,7 +164,7 @@ namespace Squidex.Areas.Api.Controllers.Assets [Route("apps/{app}/assets/")] [ProducesResponseType(typeof(AssetCreatedDto), 201)] [ProducesResponseType(typeof(ErrorDto), 400)] - public async Task PostAsset(string app, List file) + public async Task PostAsset(string app, [SwaggerIgnore] List file) { var assetFile = await CheckAssetFileAsync(file); @@ -188,13 +188,16 @@ namespace Squidex.Areas.Api.Controllers.Assets /// 404 => Asset or app not found. /// 400 => Asset exceeds the maximum size. /// + /// + /// Use multipart request to upload an asset. + /// [MustBeAppEditor] [HttpPut] [Route("apps/{app}/assets/{id}/content/")] [ProducesResponseType(typeof(AssetReplacedDto), 201)] [ProducesResponseType(typeof(ErrorDto), 400)] [ApiCosts(1)] - public async Task PutAssetContent(string app, Guid id, List file) + public async Task PutAssetContent(string app, Guid id, [SwaggerIgnore] List file) { var assetFile = await CheckAssetFileAsync(file); diff --git a/tests/Squidex.Domain.Apps.Read.Tests/Rules/RuleDequeuerGrainTests.cs b/tests/Squidex.Domain.Apps.Read.Tests/Rules/RuleDequeuerTests.cs similarity index 74% rename from tests/Squidex.Domain.Apps.Read.Tests/Rules/RuleDequeuerGrainTests.cs rename to tests/Squidex.Domain.Apps.Read.Tests/Rules/RuleDequeuerTests.cs index a98b27a2b..8a43c6345 100644 --- a/tests/Squidex.Domain.Apps.Read.Tests/Rules/RuleDequeuerGrainTests.cs +++ b/tests/Squidex.Domain.Apps.Read.Tests/Rules/RuleDequeuerTests.cs @@ -10,13 +10,8 @@ using System; using System.Threading.Tasks; using FakeItEasy; using NodaTime; -using Orleans.Concurrency; -using Orleans.Core; -using Orleans.Runtime; using Squidex.Domain.Apps.Core.HandleRules; using Squidex.Domain.Apps.Core.Rules; -using Squidex.Domain.Apps.Read.Rules.Orleans.Grains; -using Squidex.Domain.Apps.Read.Rules.Orleans.Grains.Implementation; using Squidex.Domain.Apps.Read.Rules.Repositories; using Squidex.Infrastructure.Log; using Xunit; @@ -25,42 +20,25 @@ using Xunit; namespace Squidex.Domain.Apps.Read.Rules { - public class RuleDequeuerGrainTests + public class RuleDequeuerTests { private readonly IClock clock = A.Fake(); private readonly ISemanticLog log = A.Fake(); private readonly IAppProvider appProvider = A.Fake(); private readonly IRuleEventRepository ruleEventRepository = A.Fake(); private readonly RuleService ruleService = A.Fake(); - private readonly MyRuleDequeuerGrain sut; + private readonly RuleDequeuer sut; private readonly Instant now = SystemClock.Instance.GetCurrentInstant(); - public sealed class MyRuleDequeuerGrain : RuleDequeuerGrain - { - public MyRuleDequeuerGrain(RuleService ruleService, IRuleEventRepository ruleEventRepository, ISemanticLog log, IClock clock, - IGrainIdentity identity, - IGrainRuntime runtime) - : base(ruleService, ruleEventRepository, log, clock, identity, runtime) - { - } - - protected override IRuleDequeuerGrain GetSelf() - { - return this; - } - } - - public RuleDequeuerGrainTests() + public RuleDequeuerTests() { A.CallTo(() => clock.GetCurrentInstant()).Returns(now); - sut = new MyRuleDequeuerGrain( + sut = new RuleDequeuer( ruleService, ruleEventRepository, log, - clock, - A.Fake(), - A.Fake()); + clock); } [Theory] @@ -90,9 +68,9 @@ namespace Squidex.Domain.Apps.Read.Rules nextCall = now.Plus(Duration.FromMinutes(minutes)); } - await sut.OnActivateAsync(); - await sut.HandleAsync(@event.AsImmutable()); - await sut.OnDeactivateAsync(); + await sut.HandleAsync(@event); + + sut.Dispose(); A.CallTo(() => ruleEventRepository.MarkSentAsync(@event.Id, requestDump, result, jobResult, requestElapsed, nextCall)) .MustHaveHappened(); diff --git a/tests/Squidex.Infrastructure.Tests/CQRS/Events/DefaultEventNotifierTests.cs b/tests/Squidex.Infrastructure.Tests/CQRS/Events/DefaultEventNotifierTests.cs new file mode 100644 index 000000000..8810c42a1 --- /dev/null +++ b/tests/Squidex.Infrastructure.Tests/CQRS/Events/DefaultEventNotifierTests.cs @@ -0,0 +1,50 @@ +// ========================================================================== +// DefaultEventNotifierTests.cs +// Squidex Headless CMS +// ========================================================================== +// Copyright (c) Squidex Group +// All rights reserved. +// ========================================================================== + +using System.Collections.Generic; +using Xunit; + +namespace Squidex.Infrastructure.CQRS.Events +{ + public sealed class DefaultEventNotifierTests + { + private readonly DefaultEventNotifier sut = new DefaultEventNotifier(new InMemoryPubSub()); + + [Fact] + public void Should_invalidate_all_actions() + { + var handler1Handled = 0; + var handler2Handled = 0; + + var streamNames = new List(); + + sut.Subscribe(x => + { + streamNames.Add(x); + + handler1Handled++; + }); + + sut.NotifyEventsStored("a"); + + sut.Subscribe(x => + { + streamNames.Add(x); + + handler2Handled++; + }); + + sut.NotifyEventsStored("b"); + + Assert.Equal(2, handler1Handled); + Assert.Equal(1, handler2Handled); + + Assert.Equal(streamNames.ToArray(), new[] { "a", "b", "b" }); + } + } +} diff --git a/tests/Squidex.Infrastructure.Tests/CQRS/Events/EventSubscriptionTests.cs b/tests/Squidex.Infrastructure.Tests/CQRS/Events/PollingSubscriptionTests.cs similarity index 62% rename from tests/Squidex.Infrastructure.Tests/CQRS/Events/EventSubscriptionTests.cs rename to tests/Squidex.Infrastructure.Tests/CQRS/Events/PollingSubscriptionTests.cs index 35588e911..e5e2c063b 100644 --- a/tests/Squidex.Infrastructure.Tests/CQRS/Events/EventSubscriptionTests.cs +++ b/tests/Squidex.Infrastructure.Tests/CQRS/Events/PollingSubscriptionTests.cs @@ -14,16 +14,17 @@ using Xunit; namespace Squidex.Infrastructure.CQRS.Events { - public class EventSubscriptionTests + public class PollingSubscriptionTests { private readonly IEventStore eventStore = A.Fake(); + private readonly IEventNotifier eventNotifier = new DefaultEventNotifier(new InMemoryPubSub()); private readonly IEventSubscriber eventSubscriber = A.Fake(); private readonly string position = Guid.NewGuid().ToString(); [Fact] public async Task Should_subscribe_on_start() { - var sut = new EventStoreSubscription(eventStore, eventSubscriber, "^my-stream", position); + var sut = new PollingSubscription(eventStore, eventNotifier, eventSubscriber, "^my-stream", position); await WaitAndStopAsync(sut); @@ -39,7 +40,7 @@ namespace Squidex.Infrastructure.CQRS.Events A.CallTo(() => eventStore.GetEventsAsync(A>.Ignored, A.Ignored, "^my-stream", position)) .Throws(ex); - var sut = new EventStoreSubscription(eventStore, eventSubscriber, "^my-stream", position); + var sut = new PollingSubscription(eventStore, eventNotifier, eventSubscriber, "^my-stream", position); await WaitAndStopAsync(sut); @@ -55,7 +56,7 @@ namespace Squidex.Infrastructure.CQRS.Events A.CallTo(() => eventStore.GetEventsAsync(A>.Ignored, A.Ignored, "^my-stream", position)) .Throws(ex); - var sut = new EventStoreSubscription(eventStore, eventSubscriber, "^my-stream", position); + var sut = new PollingSubscription(eventStore, eventNotifier, eventSubscriber, "^my-stream", position); await WaitAndStopAsync(sut); @@ -71,7 +72,7 @@ namespace Squidex.Infrastructure.CQRS.Events A.CallTo(() => eventStore.GetEventsAsync(A>.Ignored, A.Ignored, "^my-stream", position)) .Throws(ex); - var sut = new EventStoreSubscription(eventStore, eventSubscriber, "^my-stream", position); + var sut = new PollingSubscription(eventStore, eventNotifier, eventSubscriber, "^my-stream", position); await WaitAndStopAsync(sut); @@ -79,6 +80,32 @@ namespace Squidex.Infrastructure.CQRS.Events .MustNotHaveHappened(); } + [Fact] + public async Task Should_not_subscribe_on_notify_when_stream_matches() + { + var sut = new PollingSubscription(eventStore, eventNotifier, eventSubscriber, "^my-stream", position); + + eventNotifier.NotifyEventsStored("other-stream-123"); + + await WaitAndStopAsync(sut); + + A.CallTo(() => eventStore.GetEventsAsync(A>.Ignored, A.Ignored, "^my-stream", position)) + .MustHaveHappened(Repeated.Exactly.Once); + } + + [Fact] + public async Task Should_subscribe_on_notify_when_stream_matches() + { + var sut = new PollingSubscription(eventStore, eventNotifier, eventSubscriber, "^my-stream", position); + + eventNotifier.NotifyEventsStored("my-stream-123"); + + await WaitAndStopAsync(sut); + + A.CallTo(() => eventStore.GetEventsAsync(A>.Ignored, A.Ignored, "^my-stream", position)) + .MustHaveHappened(Repeated.Exactly.Twice); + } + private static async Task WaitAndStopAsync(IEventSubscription sut) { await Task.Delay(200); diff --git a/tests/Squidex.Infrastructure.Tests/Caching/InvalidatingMemoryCacheTests.cs b/tests/Squidex.Infrastructure.Tests/Caching/InvalidatingMemoryCacheTests.cs index 181dad179..b5617c284 100644 --- a/tests/Squidex.Infrastructure.Tests/Caching/InvalidatingMemoryCacheTests.cs +++ b/tests/Squidex.Infrastructure.Tests/Caching/InvalidatingMemoryCacheTests.cs @@ -61,7 +61,7 @@ namespace Squidex.Infrastructure.Caching { sut.Invalidate(123); - A.CallTo(() => pubsub.Publish(A.That.Matches(x => x.CacheKey == "a-key"), true)).MustNotHaveHappened(); + A.CallTo(() => pubsub.Publish(A.That.Matches(x => x.CacheKey == "a-key"), true)).MustNotHaveHappened(); } [Fact] @@ -69,7 +69,7 @@ namespace Squidex.Infrastructure.Caching { sut.Invalidate("a-key"); - A.CallTo(() => pubsub.Publish(A.That.Matches(x => x.CacheKey == "a-key"), true)).MustHaveHappened(); + A.CallTo(() => pubsub.Publish(A.That.Matches(x => x.CacheKey == "a-key"), true)).MustHaveHappened(); } [Fact] @@ -77,7 +77,7 @@ namespace Squidex.Infrastructure.Caching { ((IMemoryCache)sut).Invalidate("a-key"); - A.CallTo(() => pubsub.Publish(A.That.Matches(x => x.CacheKey == "a-key"), true)).MustHaveHappened(); + A.CallTo(() => pubsub.Publish(A.That.Matches(x => x.CacheKey == "a-key"), true)).MustHaveHappened(); } [Fact] @@ -117,7 +117,7 @@ namespace Squidex.Infrastructure.Caching Assert.Equal(123, anotherSut.Get("a-key")); - anotherPubsub.Publish(new InvalidationMessage { CacheKey = "a-key" }, true); + anotherPubsub.Publish(new InvalidateMessage { CacheKey = "a-key" }, true); Assert.Equal(0, anotherSut.Get("a-key")); }