From d124b1d42857caef7dd24d932f1b8a0936a5c7b4 Mon Sep 17 00:00:00 2001 From: Sebastian Stehle Date: Mon, 29 Mar 2021 14:09:02 +0200 Subject: [PATCH] Feature/backup performance (#679) * Code cleanup * Batch store. * Fixes. * More performance improvements. * Temp * Fix parallelization. * Ignore restored events for some consumers. * Tests fixed. --- .../src/Migrations/Migrations/ClearRules.cs | 7 +- .../src/Migrations/Migrations/ClearSchemas.cs | 6 +- .../Migrations/Migrations/CreateAssetSlugs.cs | 6 +- .../MongoDb/AddAppIdToEventStream.cs | 2 +- .../Migrations/MongoDb/ConvertDocumentIds.cs | 2 +- ...ongoAssetFolderRepository_SnapshotStore.cs | 49 +++- .../MongoAssetRepository_SnapshotStore.cs | 49 +++- .../Contents/MongoContentCollection.cs | 10 + .../MongoContentRepository_SnapshotStore.cs | 96 +++++--- .../Schemas/MongoSchemasHash.cs | 5 + .../Apps/BackupApps.cs | 1 - .../Apps/DomainObject/AppDomainObject.cs | 4 +- .../Assets/AssetPermanentDeleter.cs | 5 + .../Assets/DomainObject/AssetDomainObject.cs | 4 +- .../DomainObject/AssetFolderDomainObject.cs | 4 +- .../Assets/RecursiveDeleter.cs | 5 + .../Backup/BackupReader.cs | 6 +- .../Backup/CompatibilityExtensions.cs | 5 + .../Backup/RestoreContext.cs | 31 --- .../Backup/RestoreGrain.cs | 97 ++++---- .../Backup/StreamMapper.cs | 76 ++++++ .../Backup/TempFolderBackupArchiveLocation.cs | 16 +- .../DomainObject/ContentDomainObject.cs | 4 +- .../Contents/GraphQL/Types/AllTypes.cs | 1 - .../GraphQL/Types/Contents/ContentActions.cs | 2 - .../History/NotifoService.cs | 4 +- .../Rules/DomainObject/RuleDomainObject.cs | 4 +- .../Rules/RuleEnqueuer.cs | 9 +- .../DomainObject/SchemaDomainObject.cs | 4 +- .../Squidex.Domain.Users/DefaultKeyStore.cs | 7 +- .../DefaultXmlRepository.cs | 6 +- .../EventSourcing/MongoEventStore_Reader.cs | 29 ++- .../EventSourcing/MongoEventStore_Writer.cs | 2 +- .../Log/MongoRequestLogRepository.cs | 10 +- .../MongoDb/MongoExtensions.cs | 2 +- .../MongoDb/MongoRepositoryBase.cs | 11 +- .../States/MongoSnapshotStore.cs | 37 ++- .../States/MongoState.cs | 4 +- .../CQRS/Events/RabbitMqEventConsumer.cs | 5 + .../Commands/DomainObject.cs | 14 +- .../Commands/Rebuilder.cs | 82 +++++-- .../src/Squidex.Infrastructure/DomainId.cs | 5 +- .../EventSourcing/CommonHeaders.cs | 14 +- .../EventSourcing/EnvelopeExtensions.cs | 36 ++- .../EventSourcing/Grains/BatchSubscriber.cs | 4 +- .../EventSourcing/IEventStore.cs | 12 + .../Json/Newtonsoft/JsonValueConverter.cs | 4 +- .../Json/Objects/JsonObject.cs | 5 + .../Orleans/GrainState.cs | 14 +- .../States/BatchContext.cs | 112 +++++++++ .../States/BatchPersistence.cs | 80 +++++++ ...ersistence{TState}.cs => IBatchContext.cs} | 16 +- .../States/IPersistence.cs | 17 +- .../States/IPersistenceFactory.cs | 25 ++ .../States/ISnapshotStore.cs | 11 +- .../Squidex.Infrastructure/States/IStore.cs | 16 +- .../States/Persistence.cs | 205 +++++++++++++++- .../States/Persistence{TSnapshot,TKey}.cs | 221 ------------------ .../Squidex.Infrastructure/States/Store.cs | 59 ++--- .../States/StoreExtensions.cs | 17 -- .../Tasks/PartitionedActionBlock.cs | 4 +- .../Squidex/Config/Domain/StoreServices.cs | 10 +- .../Apps/DomainObject/AppDomainObjectTests.cs | 2 +- .../Assets/AssetPermanentDeleterTests.cs | 11 + .../DomainObject/AssetDomainObjectTests.cs | 2 +- .../AssetFolderDomainObjectTests.cs | 2 +- .../Assets/RecursiveDeleterTests.cs | 11 + .../Backup/BackupCompatibilityTests.cs | 4 +- .../Backup/StreamMapperTests.cs | 70 ++++++ .../DomainObject/ContentDomainObjectTests.cs | 3 +- .../DomainObject/RuleDomainObjectTests.cs | 2 +- .../Rules/RuleEnqueuerTests.cs | 34 ++- .../DomainObject/SchemaDomainObjectTests.cs | 2 +- .../TestHelpers/HandlerTestBase.cs | 21 +- .../DefaultKeyStoreTests.cs | 16 +- .../DefaultXmlRepositoryTests.cs | 5 +- .../Commands/DomainObjectTests.cs | 20 +- .../EventSourcing/EventStoreTests.cs | 88 +++++-- .../States/PersistenceBatchTests.cs | 164 +++++++++++++ .../States/PersistenceEventSourcingTests.cs | 51 ++-- .../States/PersistenceSnapshotTests.cs | 39 +--- .../Tasks/PartitionedActionBlockTests.cs | 2 +- .../TestHelpers/MyDomainObject.cs | 4 +- 83 files changed, 1466 insertions(+), 697 deletions(-) create mode 100644 backend/src/Squidex.Domain.Apps.Entities/Backup/StreamMapper.cs create mode 100644 backend/src/Squidex.Infrastructure/States/BatchContext.cs create mode 100644 backend/src/Squidex.Infrastructure/States/BatchPersistence.cs rename backend/src/Squidex.Infrastructure/States/{IPersistence{TState}.cs => IBatchContext.cs} (59%) create mode 100644 backend/src/Squidex.Infrastructure/States/IPersistenceFactory.cs delete mode 100644 backend/src/Squidex.Infrastructure/States/Persistence{TSnapshot,TKey}.cs create mode 100644 backend/tests/Squidex.Domain.Apps.Entities.Tests/Backup/StreamMapperTests.cs create mode 100644 backend/tests/Squidex.Infrastructure.Tests/States/PersistenceBatchTests.cs diff --git a/backend/src/Migrations/Migrations/ClearRules.cs b/backend/src/Migrations/Migrations/ClearRules.cs index d281028b2..7c8ba563a 100644 --- a/backend/src/Migrations/Migrations/ClearRules.cs +++ b/backend/src/Migrations/Migrations/ClearRules.cs @@ -5,7 +5,6 @@ // All rights reserved. Licensed under the MIT license. // ========================================================================== -using System; using System.Threading.Tasks; using Squidex.Domain.Apps.Entities.Rules.DomainObject; using Squidex.Infrastructure.Migrations; @@ -15,16 +14,16 @@ namespace Migrations.Migrations { public sealed class ClearRules : IMigration { - private readonly IStore store; + private readonly IStore store; - public ClearRules(IStore store) + public ClearRules(IStore store) { this.store = store; } public Task UpdateAsync() { - return store.ClearSnapshotsAsync(); + return store.ClearSnapshotsAsync(); } } } diff --git a/backend/src/Migrations/Migrations/ClearSchemas.cs b/backend/src/Migrations/Migrations/ClearSchemas.cs index a9e14a935..0088c4189 100644 --- a/backend/src/Migrations/Migrations/ClearSchemas.cs +++ b/backend/src/Migrations/Migrations/ClearSchemas.cs @@ -15,16 +15,16 @@ namespace Migrations.Migrations { public sealed class ClearSchemas : IMigration { - private readonly IStore store; + private readonly IStore store; - public ClearSchemas(IStore store) + public ClearSchemas(IStore store) { this.store = store; } public Task UpdateAsync() { - return store.ClearSnapshotsAsync(); + return store.ClearSnapshotsAsync(); } } } diff --git a/backend/src/Migrations/Migrations/CreateAssetSlugs.cs b/backend/src/Migrations/Migrations/CreateAssetSlugs.cs index 9f2ec3948..ba81eefc7 100644 --- a/backend/src/Migrations/Migrations/CreateAssetSlugs.cs +++ b/backend/src/Migrations/Migrations/CreateAssetSlugs.cs @@ -16,9 +16,9 @@ namespace Migrations.Migrations { public sealed class CreateAssetSlugs : IMigration { - private readonly ISnapshotStore stateForAssets; + private readonly ISnapshotStore stateForAssets; - public CreateAssetSlugs(ISnapshotStore stateForAssets) + public CreateAssetSlugs(ISnapshotStore stateForAssets) { this.stateForAssets = stateForAssets; } @@ -29,7 +29,7 @@ namespace Migrations.Migrations { state.Slug = state.FileName.ToAssetSlug(); - var key = DomainId.Combine(state.AppId.Id, state.Id).ToString(); + var key = DomainId.Combine(state.AppId.Id, state.Id); await stateForAssets.WriteAsync(key, state, version, version); }); diff --git a/backend/src/Migrations/Migrations/MongoDb/AddAppIdToEventStream.cs b/backend/src/Migrations/Migrations/MongoDb/AddAppIdToEventStream.cs index 1d87af1e0..9f4632503 100644 --- a/backend/src/Migrations/Migrations/MongoDb/AddAppIdToEventStream.cs +++ b/backend/src/Migrations/Migrations/MongoDb/AddAppIdToEventStream.cs @@ -105,7 +105,7 @@ namespace Migrations.Migrations.MongoDb }, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = Environment.ProcessorCount * 2, - MaxMessagesPerTask = 1, + MaxMessagesPerTask = DataflowBlockOptions.Unbounded, BoundedCapacity = SizeOfQueue }); diff --git a/backend/src/Migrations/Migrations/MongoDb/ConvertDocumentIds.cs b/backend/src/Migrations/Migrations/MongoDb/ConvertDocumentIds.cs index d40b43ad3..95e99081a 100644 --- a/backend/src/Migrations/Migrations/MongoDb/ConvertDocumentIds.cs +++ b/backend/src/Migrations/Migrations/MongoDb/ConvertDocumentIds.cs @@ -139,7 +139,7 @@ namespace Migrations.Migrations.MongoDb }, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = Environment.ProcessorCount * 2, - MaxMessagesPerTask = 1, + MaxMessagesPerTask = DataflowBlockOptions.Unbounded, BoundedCapacity = SizeOfQueue }); diff --git a/backend/src/Squidex.Domain.Apps.Entities.MongoDb/Assets/MongoAssetFolderRepository_SnapshotStore.cs b/backend/src/Squidex.Domain.Apps.Entities.MongoDb/Assets/MongoAssetFolderRepository_SnapshotStore.cs index 1b96e76eb..31125a7df 100644 --- a/backend/src/Squidex.Domain.Apps.Entities.MongoDb/Assets/MongoAssetFolderRepository_SnapshotStore.cs +++ b/backend/src/Squidex.Domain.Apps.Entities.MongoDb/Assets/MongoAssetFolderRepository_SnapshotStore.cs @@ -6,6 +6,8 @@ // ========================================================================== using System; +using System.Collections.Generic; +using System.Linq; using System.Threading; using System.Threading.Tasks; using MongoDB.Bson; @@ -19,9 +21,9 @@ using Squidex.Log; namespace Squidex.Domain.Apps.Entities.MongoDb.Assets { - public sealed partial class MongoAssetFolderRepository : ISnapshotStore + public sealed partial class MongoAssetFolderRepository : ISnapshotStore { - async Task<(AssetFolderDomainObject.State Value, long Version)> ISnapshotStore.ReadAsync(DomainId key) + async Task<(AssetFolderDomainObject.State Value, long Version)> ISnapshotStore.ReadAsync(DomainId key) { using (Profiler.TraceMethod()) { @@ -38,19 +40,32 @@ namespace Squidex.Domain.Apps.Entities.MongoDb.Assets } } - async Task ISnapshotStore.WriteAsync(DomainId key, AssetFolderDomainObject.State value, long oldVersion, long newVersion) + async Task ISnapshotStore.WriteAsync(DomainId key, AssetFolderDomainObject.State value, long oldVersion, long newVersion) { using (Profiler.TraceMethod()) { - var entity = SimpleMapper.Map(value, new MongoAssetFolderEntity()); - - entity.IndexedAppId = value.AppId.Id; + var entity = Map(value); await Collection.UpsertVersionedAsync(key, oldVersion, newVersion, entity); } } - async Task ISnapshotStore.ReadAllAsync(Func callback, CancellationToken ct) + async Task ISnapshotStore.WriteManyAsync(IEnumerable<(DomainId Key, AssetFolderDomainObject.State Value, long Version)> snapshots) + { + using (Profiler.TraceMethod()) + { + var entities = snapshots.Select(Map).ToList(); + + if (entities.Count == 0) + { + return; + } + + await Collection.InsertManyAsync(entities, InsertUnordered); + } + } + + async Task ISnapshotStore.ReadAllAsync(Func callback, CancellationToken ct) { using (Profiler.TraceMethod()) { @@ -58,7 +73,7 @@ namespace Squidex.Domain.Apps.Entities.MongoDb.Assets } } - async Task ISnapshotStore.RemoveAsync(DomainId key) + async Task ISnapshotStore.RemoveAsync(DomainId key) { using (Profiler.TraceMethod()) { @@ -66,6 +81,24 @@ namespace Squidex.Domain.Apps.Entities.MongoDb.Assets } } + private static MongoAssetFolderEntity Map(AssetFolderDomainObject.State value) + { + var entity = SimpleMapper.Map(value, new MongoAssetFolderEntity()); + + entity.IndexedAppId = value.AppId.Id; + + return entity; + } + + private static MongoAssetFolderEntity Map((DomainId Key, AssetFolderDomainObject.State Value, long Version) snapshot) + { + var entity = Map(snapshot.Value); + + entity.DocumentId = snapshot.Key; + + return entity; + } + private static AssetFolderDomainObject.State Map(MongoAssetFolderEntity existing) { return SimpleMapper.Map(existing, new AssetFolderDomainObject.State()); diff --git a/backend/src/Squidex.Domain.Apps.Entities.MongoDb/Assets/MongoAssetRepository_SnapshotStore.cs b/backend/src/Squidex.Domain.Apps.Entities.MongoDb/Assets/MongoAssetRepository_SnapshotStore.cs index 285f38de9..231aa8e71 100644 --- a/backend/src/Squidex.Domain.Apps.Entities.MongoDb/Assets/MongoAssetRepository_SnapshotStore.cs +++ b/backend/src/Squidex.Domain.Apps.Entities.MongoDb/Assets/MongoAssetRepository_SnapshotStore.cs @@ -6,6 +6,8 @@ // ========================================================================== using System; +using System.Collections.Generic; +using System.Linq; using System.Threading; using System.Threading.Tasks; using MongoDB.Bson; @@ -19,9 +21,9 @@ using Squidex.Log; namespace Squidex.Domain.Apps.Entities.MongoDb.Assets { - public sealed partial class MongoAssetRepository : ISnapshotStore + public sealed partial class MongoAssetRepository : ISnapshotStore { - async Task<(AssetDomainObject.State Value, long Version)> ISnapshotStore.ReadAsync(DomainId key) + async Task<(AssetDomainObject.State Value, long Version)> ISnapshotStore.ReadAsync(DomainId key) { using (Profiler.TraceMethod()) { @@ -38,19 +40,32 @@ namespace Squidex.Domain.Apps.Entities.MongoDb.Assets } } - async Task ISnapshotStore.WriteAsync(DomainId key, AssetDomainObject.State value, long oldVersion, long newVersion) + async Task ISnapshotStore.WriteAsync(DomainId key, AssetDomainObject.State value, long oldVersion, long newVersion) { using (Profiler.TraceMethod()) { - var entity = SimpleMapper.Map(value, new MongoAssetEntity()); - - entity.IndexedAppId = value.AppId.Id; + var entity = Map(value); await Collection.UpsertVersionedAsync(key, oldVersion, newVersion, entity); } } - async Task ISnapshotStore.ReadAllAsync(Func callback, CancellationToken ct) + async Task ISnapshotStore.WriteManyAsync(IEnumerable<(DomainId Key, AssetDomainObject.State Value, long Version)> snapshots) + { + using (Profiler.TraceMethod()) + { + var entities = snapshots.Select(Map).ToList(); + + if (entities.Count == 0) + { + return; + } + + await Collection.InsertManyAsync(entities, InsertUnordered); + } + } + + async Task ISnapshotStore.ReadAllAsync(Func callback, CancellationToken ct) { using (Profiler.TraceMethod()) { @@ -58,7 +73,7 @@ namespace Squidex.Domain.Apps.Entities.MongoDb.Assets } } - async Task ISnapshotStore.RemoveAsync(DomainId key) + async Task ISnapshotStore.RemoveAsync(DomainId key) { using (Profiler.TraceMethod()) { @@ -66,6 +81,24 @@ namespace Squidex.Domain.Apps.Entities.MongoDb.Assets } } + private static MongoAssetEntity Map(AssetDomainObject.State value) + { + var entity = SimpleMapper.Map(value, new MongoAssetEntity()); + + entity.IndexedAppId = value.AppId.Id; + + return entity; + } + + private static MongoAssetEntity Map((DomainId Key, AssetDomainObject.State Value, long Version) snapshot) + { + var entity = Map(snapshot.Value); + + entity.DocumentId = snapshot.Key; + + return entity; + } + private static AssetDomainObject.State Map(MongoAssetEntity existing) { return SimpleMapper.Map(existing, new AssetDomainObject.State()); diff --git a/backend/src/Squidex.Domain.Apps.Entities.MongoDb/Contents/MongoContentCollection.cs b/backend/src/Squidex.Domain.Apps.Entities.MongoDb/Contents/MongoContentCollection.cs index a57a3593d..47a899ae2 100644 --- a/backend/src/Squidex.Domain.Apps.Entities.MongoDb/Contents/MongoContentCollection.cs +++ b/backend/src/Squidex.Domain.Apps.Entities.MongoDb/Contents/MongoContentCollection.cs @@ -187,5 +187,15 @@ namespace Squidex.Domain.Apps.Entities.MongoDb.Contents { return Collection.DeleteOneAsync(x => x.DocumentId == documentId); } + + public Task InsertManyAsync(IReadOnlyList entities) + { + if (entities.Count == 0) + { + return Task.CompletedTask; + } + + return Collection.InsertManyAsync(entities, InsertUnordered); + } } } diff --git a/backend/src/Squidex.Domain.Apps.Entities.MongoDb/Contents/MongoContentRepository_SnapshotStore.cs b/backend/src/Squidex.Domain.Apps.Entities.MongoDb/Contents/MongoContentRepository_SnapshotStore.cs index 9c0372b19..02fc7f602 100644 --- a/backend/src/Squidex.Domain.Apps.Entities.MongoDb/Contents/MongoContentRepository_SnapshotStore.cs +++ b/backend/src/Squidex.Domain.Apps.Entities.MongoDb/Contents/MongoContentRepository_SnapshotStore.cs @@ -19,19 +19,19 @@ using Squidex.Log; namespace Squidex.Domain.Apps.Entities.MongoDb.Contents { - public partial class MongoContentRepository : ISnapshotStore + public partial class MongoContentRepository : ISnapshotStore { - Task ISnapshotStore.ReadAllAsync(Func callback, CancellationToken ct) + Task ISnapshotStore.ReadAllAsync(Func callback, CancellationToken ct) { throw new NotSupportedException(); } - Task<(ContentDomainObject.State Value, long Version)> ISnapshotStore.ReadAsync(DomainId key) + Task<(ContentDomainObject.State Value, long Version)> ISnapshotStore.ReadAsync(DomainId key) { return Task.FromResult<(ContentDomainObject.State, long Version)>((null!, EtagVersion.Empty)); } - async Task ISnapshotStore.ClearAsync() + async Task ISnapshotStore.ClearAsync() { using (Profiler.TraceMethod()) { @@ -40,7 +40,7 @@ namespace Squidex.Domain.Apps.Entities.MongoDb.Contents } } - async Task ISnapshotStore.RemoveAsync(DomainId key) + async Task ISnapshotStore.RemoveAsync(DomainId key) { using (Profiler.TraceMethod()) { @@ -49,7 +49,7 @@ namespace Squidex.Domain.Apps.Entities.MongoDb.Contents } } - async Task ISnapshotStore.WriteAsync(DomainId key, ContentDomainObject.State value, long oldVersion, long newVersion) + async Task ISnapshotStore.WriteAsync(DomainId key, ContentDomainObject.State value, long oldVersion, long newVersion) { using (Profiler.TraceMethod()) { @@ -64,9 +64,32 @@ namespace Squidex.Domain.Apps.Entities.MongoDb.Contents } } + async Task ISnapshotStore.WriteManyAsync(IEnumerable<(DomainId Key, ContentDomainObject.State Value, long Version)> snapshots) + { + using (Profiler.TraceMethod()) + { + var entitiesPublished = new List(); + var entitiesAll = new List(); + + foreach (var (_, value, version) in snapshots) + { + if (ShouldWritePublished(value)) + { + entitiesPublished.Add(await CreatePublishedContentAsync(value, version)); + } + + entitiesAll.Add(await CreateDraftContentAsync(value, version)); + } + + await Task.WhenAll( + collectionPublished.InsertManyAsync(entitiesPublished), + collectionAll.InsertManyAsync(entitiesAll)); + } + } + private async Task UpsertOrDeletePublishedAsync(ContentDomainObject.State value, long oldVersion, long newVersion) { - if (value.Status == Status.Published && !value.IsDeleted) + if (ShouldWritePublished(value)) { await UpsertPublishedContentAsync(value, oldVersion, newVersion); } @@ -85,48 +108,67 @@ namespace Squidex.Domain.Apps.Entities.MongoDb.Contents private async Task UpsertDraftContentAsync(ContentDomainObject.State value, long oldVersion, long newVersion) { - var content = await CreateContentAsync(value, value.Data, newVersion); - - content.ScheduledAt = value.ScheduleJob?.DueTime; - content.ScheduleJob = value.ScheduleJob; - content.NewStatus = value.NewStatus; + var entity = await CreateDraftContentAsync(value, newVersion); - await collectionAll.UpsertVersionedAsync(content.DocumentId, oldVersion, content); + await collectionAll.UpsertVersionedAsync(entity.DocumentId, oldVersion, entity); } private async Task UpsertPublishedContentAsync(ContentDomainObject.State value, long oldVersion, long newVersion) { - var content = await CreateContentAsync(value, value.CurrentVersion.Data, newVersion); + var entity = await CreatePublishedContentAsync(value, newVersion); - content.ScheduledAt = null; - content.ScheduleJob = null; - content.NewStatus = null; + await collectionPublished.UpsertVersionedAsync(entity.DocumentId, oldVersion, entity); + } + + private async Task CreatePublishedContentAsync(ContentDomainObject.State value, long newVersion) + { + var entity = await CreateContentAsync(value, value.CurrentVersion.Data, newVersion); + + entity.ScheduledAt = null; + entity.ScheduleJob = null; + entity.NewStatus = null; + + return entity; + } - await collectionPublished.UpsertVersionedAsync(content.DocumentId, oldVersion, content); + private async Task CreateDraftContentAsync(ContentDomainObject.State value, long newVersion) + { + var entity = await CreateContentAsync(value, value.Data, newVersion); + + entity.ScheduledAt = value.ScheduleJob?.DueTime; + entity.ScheduleJob = value.ScheduleJob; + entity.NewStatus = value.NewStatus; + + return entity; } private async Task CreateContentAsync(ContentDomainObject.State value, ContentData data, long newVersion) { - var content = SimpleMapper.Map(value, new MongoContentEntity()); + var entity = SimpleMapper.Map(value, new MongoContentEntity()); - content.Data = data; - content.DocumentId = value.UniqueId; - content.IndexedAppId = value.AppId.Id; - content.IndexedSchemaId = value.SchemaId.Id; - content.Version = newVersion; + entity.Data = data; + entity.DocumentId = value.UniqueId; + entity.IndexedAppId = value.AppId.Id; + entity.IndexedSchemaId = value.SchemaId.Id; + entity.Version = newVersion; var schema = await appProvider.GetSchemaAsync(value.AppId.Id, value.SchemaId.Id, true); if (schema != null) { - content.ReferencedIds = content.Data.GetReferencedIds(schema.SchemaDef); + entity.ReferencedIds = entity.Data.GetReferencedIds(schema.SchemaDef); } else { - content.ReferencedIds = new HashSet(); + entity.ReferencedIds = new HashSet(); } - return content; + return entity; + } + + private static bool ShouldWritePublished(ContentDomainObject.State value) + { + return value.Status == Status.Published && !value.IsDeleted; } } } diff --git a/backend/src/Squidex.Domain.Apps.Entities.MongoDb/Schemas/MongoSchemasHash.cs b/backend/src/Squidex.Domain.Apps.Entities.MongoDb/Schemas/MongoSchemasHash.cs index 9357a2328..9b674e0b5 100644 --- a/backend/src/Squidex.Domain.Apps.Entities.MongoDb/Schemas/MongoSchemasHash.cs +++ b/backend/src/Squidex.Domain.Apps.Entities.MongoDb/Schemas/MongoSchemasHash.cs @@ -58,6 +58,11 @@ namespace Squidex.Domain.Apps.Entities.MongoDb.Schemas foreach (var @event in events) { + if (@event.Headers.Restored()) + { + continue; + } + switch (@event.Payload) { case SchemaEvent schemaEvent: diff --git a/backend/src/Squidex.Domain.Apps.Entities/Apps/BackupApps.cs b/backend/src/Squidex.Domain.Apps.Entities/Apps/BackupApps.cs index 899b94a6f..7d6a1d6d1 100644 --- a/backend/src/Squidex.Domain.Apps.Entities/Apps/BackupApps.cs +++ b/backend/src/Squidex.Domain.Apps.Entities/Apps/BackupApps.cs @@ -135,7 +135,6 @@ namespace Squidex.Domain.Apps.Entities.Apps public async Task CompleteRestoreAsync(RestoreContext context) { await appsIndex.AddAsync(appReservation); - await appsIndex.RebuildByContributorsAsync(context.AppId, contributors); } diff --git a/backend/src/Squidex.Domain.Apps.Entities/Apps/DomainObject/AppDomainObject.cs b/backend/src/Squidex.Domain.Apps.Entities/Apps/DomainObject/AppDomainObject.cs index 5f442b6e6..0986f980a 100644 --- a/backend/src/Squidex.Domain.Apps.Entities/Apps/DomainObject/AppDomainObject.cs +++ b/backend/src/Squidex.Domain.Apps.Entities/Apps/DomainObject/AppDomainObject.cs @@ -31,12 +31,12 @@ namespace Squidex.Domain.Apps.Entities.Apps.DomainObject private readonly IAppPlanBillingManager appPlansBillingManager; private readonly IUserResolver userResolver; - public AppDomainObject(IStore store, ISemanticLog log, + public AppDomainObject(IPersistenceFactory persistence, ISemanticLog log, InitialPatterns initialPatterns, IAppPlansProvider appPlansProvider, IAppPlanBillingManager appPlansBillingManager, IUserResolver userResolver) - : base(store, log) + : base(persistence, log) { Guard.NotNull(initialPatterns, nameof(initialPatterns)); Guard.NotNull(userResolver, nameof(userResolver)); diff --git a/backend/src/Squidex.Domain.Apps.Entities/Assets/AssetPermanentDeleter.cs b/backend/src/Squidex.Domain.Apps.Entities/Assets/AssetPermanentDeleter.cs index c9197180e..cb2b4b28f 100644 --- a/backend/src/Squidex.Domain.Apps.Entities/Assets/AssetPermanentDeleter.cs +++ b/backend/src/Squidex.Domain.Apps.Entities/Assets/AssetPermanentDeleter.cs @@ -45,6 +45,11 @@ namespace Squidex.Domain.Apps.Entities.Assets public async Task On(Envelope @event) { + if (@event.Headers.Restored()) + { + return; + } + if (@event.Payload is AssetDeleted assetDeleted) { for (var version = 0; version < @event.Headers.EventStreamNumber(); version++) diff --git a/backend/src/Squidex.Domain.Apps.Entities/Assets/DomainObject/AssetDomainObject.cs b/backend/src/Squidex.Domain.Apps.Entities/Assets/DomainObject/AssetDomainObject.cs index 082834e78..7f74216c7 100644 --- a/backend/src/Squidex.Domain.Apps.Entities/Assets/DomainObject/AssetDomainObject.cs +++ b/backend/src/Squidex.Domain.Apps.Entities/Assets/DomainObject/AssetDomainObject.cs @@ -30,11 +30,11 @@ namespace Squidex.Domain.Apps.Entities.Assets.DomainObject private readonly IAssetTagService assetTags; private readonly IAssetQueryService assetQuery; - public AssetDomainObject(IStore store, ISemanticLog log, + public AssetDomainObject(IPersistenceFactory factory, ISemanticLog log, IAssetTagService assetTags, IAssetQueryService assetQuery, IContentRepository contentRepository) - : base(store, log) + : base(factory, log) { Guard.NotNull(assetTags, nameof(assetTags)); Guard.NotNull(assetQuery, nameof(assetQuery)); diff --git a/backend/src/Squidex.Domain.Apps.Entities/Assets/DomainObject/AssetFolderDomainObject.cs b/backend/src/Squidex.Domain.Apps.Entities/Assets/DomainObject/AssetFolderDomainObject.cs index 7085174a4..5d132001d 100644 --- a/backend/src/Squidex.Domain.Apps.Entities/Assets/DomainObject/AssetFolderDomainObject.cs +++ b/backend/src/Squidex.Domain.Apps.Entities/Assets/DomainObject/AssetFolderDomainObject.cs @@ -24,9 +24,9 @@ namespace Squidex.Domain.Apps.Entities.Assets.DomainObject { private readonly IAssetQueryService assetQuery; - public AssetFolderDomainObject(IStore store, ISemanticLog log, + public AssetFolderDomainObject(IPersistenceFactory factory, ISemanticLog log, IAssetQueryService assetQuery) - : base(store, log) + : base(factory, log) { Guard.NotNull(assetQuery, nameof(assetQuery)); diff --git a/backend/src/Squidex.Domain.Apps.Entities/Assets/RecursiveDeleter.cs b/backend/src/Squidex.Domain.Apps.Entities/Assets/RecursiveDeleter.cs index c65b4c334..cfb5baaa6 100644 --- a/backend/src/Squidex.Domain.Apps.Entities/Assets/RecursiveDeleter.cs +++ b/backend/src/Squidex.Domain.Apps.Entities/Assets/RecursiveDeleter.cs @@ -63,6 +63,11 @@ namespace Squidex.Domain.Apps.Entities.Assets public async Task On(Envelope @event) { + if (@event.Headers.Restored()) + { + return; + } + if (@event.Payload is AssetFolderDeleted folderDeleted) { async Task PublishAsync(SquidexCommand command) diff --git a/backend/src/Squidex.Domain.Apps.Entities/Backup/BackupReader.cs b/backend/src/Squidex.Domain.Apps.Entities/Backup/BackupReader.cs index 5290c985b..05ac223b3 100644 --- a/backend/src/Squidex.Domain.Apps.Entities/Backup/BackupReader.cs +++ b/backend/src/Squidex.Domain.Apps.Entities/Backup/BackupReader.cs @@ -99,14 +99,14 @@ namespace Squidex.Domain.Apps.Entities.Backup while (true) { - var eventEntry = archive.GetEntry(ArchiveHelper.GetEventPath(readEvents)); + var entry = archive.GetEntry(ArchiveHelper.GetEventPath(readEvents)); - if (eventEntry == null) + if (entry == null) { break; } - using (var stream = eventEntry.Open()) + using (var stream = entry.Open()) { var storedEvent = serializer.Deserialize(stream).ToStoredEvent(); diff --git a/backend/src/Squidex.Domain.Apps.Entities/Backup/CompatibilityExtensions.cs b/backend/src/Squidex.Domain.Apps.Entities/Backup/CompatibilityExtensions.cs index d3ad293fe..34feff9f9 100644 --- a/backend/src/Squidex.Domain.Apps.Entities/Backup/CompatibilityExtensions.cs +++ b/backend/src/Squidex.Domain.Apps.Entities/Backup/CompatibilityExtensions.cs @@ -34,6 +34,11 @@ namespace Squidex.Domain.Apps.Entities.Backup { var current = await reader.ReadVersionAsync(); + if (None.Equals(current)) + { + return; + } + if (!Expected.Equals(current)) { throw new BackupRestoreException("Backup file is not compatible with this version."); diff --git a/backend/src/Squidex.Domain.Apps.Entities/Backup/RestoreContext.cs b/backend/src/Squidex.Domain.Apps.Entities/Backup/RestoreContext.cs index 787af4593..b0746c6de 100644 --- a/backend/src/Squidex.Domain.Apps.Entities/Backup/RestoreContext.cs +++ b/backend/src/Squidex.Domain.Apps.Entities/Backup/RestoreContext.cs @@ -5,17 +5,12 @@ // All rights reserved. Licensed under the MIT license. // ========================================================================== -using System; -using System.Collections.Generic; using Squidex.Infrastructure; namespace Squidex.Domain.Apps.Entities.Backup { public sealed class RestoreContext : BackupContextBase { - private readonly Dictionary streams = new Dictionary(1000); - private string? appStream; - public IBackupReader Reader { get; } public DomainId PreviousAppId { get; set; } @@ -29,31 +24,5 @@ namespace Squidex.Domain.Apps.Entities.Backup PreviousAppId = previousAppId; } - - public string GetStreamName(string streamName) - { - Guard.NotNullOrEmpty(streamName, nameof(streamName)); - - if (streamName.StartsWith("app-", StringComparison.OrdinalIgnoreCase)) - { - return appStream ??= $"app-{AppId}"; - } - - return streamName.Replace(PreviousAppId.ToString(), AppId.ToString()); - } - - public long GetStreamOffset(string streamName) - { - Guard.NotNullOrEmpty(streamName, nameof(streamName)); - - if (!streams.TryGetValue(streamName, out var offset)) - { - offset = EtagVersion.Empty; - } - - streams[streamName] = offset + 1; - - return offset; - } } } diff --git a/backend/src/Squidex.Domain.Apps.Entities/Backup/RestoreGrain.cs b/backend/src/Squidex.Domain.Apps.Entities/Backup/RestoreGrain.cs index 5a54f96f0..69ad45eee 100644 --- a/backend/src/Squidex.Domain.Apps.Entities/Backup/RestoreGrain.cs +++ b/backend/src/Squidex.Domain.Apps.Entities/Backup/RestoreGrain.cs @@ -8,7 +8,9 @@ using System; using System.Collections.Generic; using System.IO; +using System.Linq; using System.Threading.Tasks; +using System.Threading.Tasks.Dataflow; using Microsoft.Extensions.DependencyInjection; using NodaTime; using Squidex.Domain.Apps.Core.Apps; @@ -19,7 +21,6 @@ using Squidex.Domain.Apps.Events.Apps; using Squidex.Infrastructure; using Squidex.Infrastructure.Commands; using Squidex.Infrastructure.EventSourcing; -using Squidex.Infrastructure.Json.Objects; using Squidex.Infrastructure.Orleans; using Squidex.Infrastructure.States; using Squidex.Infrastructure.Tasks; @@ -31,7 +32,6 @@ namespace Squidex.Domain.Apps.Entities.Backup { public sealed class RestoreGrain : GrainOfString, IRestoreGrain { - private const int BatchSize = 500; private readonly IBackupArchiveLocation backupArchiveLocation; private readonly IClock clock; private readonly ICommandBus commandBus; @@ -42,7 +42,8 @@ namespace Squidex.Domain.Apps.Entities.Backup private readonly IStreamNameResolver streamNameResolver; private readonly IUserResolver userResolver; private readonly IGrainState state; - private RestoreContext restoreContext; + private RestoreContext runningContext; + private StreamMapper runningStreamMapper; private RestoreJob CurrentJob { @@ -176,7 +177,7 @@ namespace Squidex.Domain.Apps.Entities.Backup { using (Profiler.TraceMethod(handler.GetType(), nameof(IBackupHandler.RestoreAsync))) { - await handler.RestoreAsync(restoreContext); + await handler.RestoreAsync(runningContext); } Log($"Restored {handler.Name}"); @@ -186,7 +187,7 @@ namespace Squidex.Domain.Apps.Entities.Backup { using (Profiler.TraceMethod(handler.GetType(), nameof(IBackupHandler.CompleteRestoreAsync))) { - await handler.CompleteRestoreAsync(restoreContext); + await handler.CompleteRestoreAsync(runningContext); } Log($"Completed {handler.Name}"); @@ -243,6 +244,9 @@ namespace Squidex.Domain.Apps.Entities.Backup CurrentJob.Stopped = clock.GetCurrentInstant(); await state.WriteAsync(); + + runningStreamMapper = null!; + runningContext = null!; } } } @@ -316,51 +320,59 @@ namespace Squidex.Domain.Apps.Entities.Backup private async Task ReadEventsAsync(IBackupReader reader, IEnumerable handlers) { - var batch = new List<(string, Envelope)>(BatchSize); + const int BatchSize = 500; + + var handled = 0; - await reader.ReadEventsAsync(streamNameResolver, eventDataFormatter, async storedEvent => + var writeBlock = new ActionBlock<(string, Envelope)[]>(async batch => { - batch.Add(storedEvent); + var commits = new List(batch.Length); - if (batch.Count == BatchSize) + foreach (var (stream, @event) in batch) { - await CommitBatchAsync(reader, handlers, batch); + var offset = runningStreamMapper.GetStreamOffset(stream); - batch.Clear(); + commits.Add(EventCommit.Create(stream, offset, @event, eventDataFormatter)); } - }); - if (batch.Count > 0) + await eventStore.AppendUnsafeAsync(commits); + + handled += commits.Count; + + Log($"Reading {reader.ReadEvents}/{handled} events and {reader.ReadAttachments} attachments completed.", true); + }, new ExecutionDataflowBlockOptions { - await CommitBatchAsync(reader, handlers, batch); - } + MaxDegreeOfParallelism = 1, + MaxMessagesPerTask = DataflowBlockOptions.Unbounded, + BoundedCapacity = 2 + }); - Log($"Reading {reader.ReadEvents} events and {reader.ReadAttachments} attachments completed.", true); - } + var batchBlock = new BatchBlock<(string, Envelope)>(500, new GroupingDataflowBlockOptions + { + BoundedCapacity = BatchSize + }); - private async Task CommitBatchAsync(IBackupReader reader, IEnumerable handlers, List<(string, Envelope)> batch) - { - var commits = new List(batch.Count); + batchBlock.LinkTo(writeBlock, new DataflowLinkOptions + { + PropagateCompletion = true + }); - foreach (var (stream, @event) in batch) + await reader.ReadEventsAsync(streamNameResolver, eventDataFormatter, async job => { - var handled = await HandleEventAsync(reader, handlers, stream, @event); + var newStream = await HandleEventAsync(reader, handlers, job.Stream, job.Event); - if (handled) + if (newStream != null) { - var streamName = restoreContext.GetStreamName(stream); - var streamOffset = restoreContext.GetStreamOffset(streamName); - - commits.Add(EventCommit.Create(streamName, streamOffset, @event, eventDataFormatter)); + await batchBlock.SendAsync((newStream, job.Event)); } - } + }); - await eventStore.AppendUnsafeAsync(commits); + batchBlock.Complete(); - Log($"Read {reader.ReadEvents} events and {reader.ReadAttachments} attachments.", true); + await writeBlock.Completion; } - private async Task HandleEventAsync(IBackupReader reader, IEnumerable handlers, string stream, Envelope @event) + private async Task HandleEventAsync(IBackupReader reader, IEnumerable handlers, string stream, Envelope @event) { if (@event.Payload is AppCreated appCreated) { @@ -382,7 +394,7 @@ namespace Squidex.Domain.Apps.Entities.Backup if (@event.Payload is SquidexEvent squidexEvent && squidexEvent.Actor != null) { - if (restoreContext.UserMapping.TryMap(squidexEvent.Actor, out var newUser)) + if (runningContext.UserMapping.TryMap(squidexEvent.Actor, out var newUser)) { squidexEvent.Actor = newUser; } @@ -393,26 +405,20 @@ namespace Squidex.Domain.Apps.Entities.Backup appEvent.AppId = CurrentJob.AppId; } - if (@event.Headers.TryGet(CommonHeaders.AggregateId, out var value) && value is JsonString idString) - { - var id = idString.Value.Replace( - restoreContext.PreviousAppId.ToString(), - restoreContext.AppId.ToString()); - - var domainId = DomainId.Create(id); + var (newStream, id) = runningStreamMapper.Map(stream); - @event.SetAggregateId(domainId); - } + @event.SetAggregateId(id); + @event.SetRestored(); foreach (var handler in handlers) { - if (!await handler.RestoreEventAsync(@event, restoreContext)) + if (!await handler.RestoreEventAsync(@event, runningContext)) { - return false; + return null; } } - return true; + return newStream; } private async Task CreateContextAsync(IBackupReader reader, DomainId previousAppId) @@ -428,7 +434,8 @@ namespace Squidex.Domain.Apps.Entities.Backup Log("Created Users"); } - restoreContext = new RestoreContext(CurrentJob.AppId.Id, userMapping, reader, previousAppId); + runningContext = new RestoreContext(CurrentJob.AppId.Id, userMapping, reader, previousAppId); + runningStreamMapper = new StreamMapper(runningContext); } private void Log(string message, bool replace = false) diff --git a/backend/src/Squidex.Domain.Apps.Entities/Backup/StreamMapper.cs b/backend/src/Squidex.Domain.Apps.Entities/Backup/StreamMapper.cs new file mode 100644 index 000000000..d74df82de --- /dev/null +++ b/backend/src/Squidex.Domain.Apps.Entities/Backup/StreamMapper.cs @@ -0,0 +1,76 @@ +// ========================================================================== +// Squidex Headless CMS +// ========================================================================== +// Copyright (c) Squidex UG (haftungsbeschraenkt) +// All rights reserved. Licensed under the MIT license. +// ========================================================================== + +using System; +using System.Collections.Generic; +using Squidex.Infrastructure; + +namespace Squidex.Domain.Apps.Entities.Backup +{ + public sealed class StreamMapper + { + private readonly Dictionary streams = new Dictionary(1000); + private readonly RestoreContext context; + private readonly DomainId brokenAppId; + + public StreamMapper(RestoreContext context) + { + Guard.NotNull(context, nameof(context)); + + this.context = context; + + brokenAppId = DomainId.Combine(context.PreviousAppId, context.PreviousAppId); + } + + public (string Stream, DomainId) Map(string stream) + { + Guard.NotNullOrEmpty(stream, nameof(stream)); + + var typeIndex = stream.IndexOf("-", StringComparison.Ordinal); + var typeName = stream.Substring(0, typeIndex); + + var id = DomainId.Create(stream[(typeIndex + 1)..]); + + if (id.Equals(context.PreviousAppId) || id.Equals(brokenAppId)) + { + id = context.AppId; + } + else + { + var separator = DomainId.IdSeparator; + + var secondId = id.ToString().AsSpan(); + + var indexOfSecondPart = secondId.IndexOf(separator, StringComparison.Ordinal); + if (indexOfSecondPart > 0 && indexOfSecondPart < secondId.Length - separator.Length - 1) + { + secondId = secondId[(indexOfSecondPart + separator.Length)..]; + } + + id = DomainId.Combine(context.AppId, DomainId.Create(secondId.ToString())); + } + + stream = $"{typeName}-{id}"; + + return (stream, id); + } + + public long GetStreamOffset(string streamName) + { + Guard.NotNullOrEmpty(streamName, nameof(streamName)); + + if (!streams.TryGetValue(streamName, out var offset)) + { + offset = EtagVersion.Empty; + } + + streams[streamName] = offset + 1; + + return offset; + } + } +} diff --git a/backend/src/Squidex.Domain.Apps.Entities/Backup/TempFolderBackupArchiveLocation.cs b/backend/src/Squidex.Domain.Apps.Entities/Backup/TempFolderBackupArchiveLocation.cs index b263f821d..10b55341c 100644 --- a/backend/src/Squidex.Domain.Apps.Entities/Backup/TempFolderBackupArchiveLocation.cs +++ b/backend/src/Squidex.Domain.Apps.Entities/Backup/TempFolderBackupArchiveLocation.cs @@ -29,24 +29,16 @@ namespace Squidex.Domain.Apps.Entities.Backup public async Task OpenReaderAsync(Uri url, DomainId id) { - var stream = OpenStream(id); + Stream stream; if (string.Equals(url.Scheme, "file")) { - try - { - using (var sourceStream = new FileStream(url.LocalPath, FileMode.Open, FileAccess.Read)) - { - await sourceStream.CopyToAsync(stream); - } - } - catch (IOException ex) - { - throw new BackupRestoreException($"Cannot download the archive: {ex.Message}.", ex); - } + stream = new FileStream(url.LocalPath, FileMode.Open, FileAccess.Read); } else { + stream = OpenStream(id); + HttpResponseMessage? response = null; try { diff --git a/backend/src/Squidex.Domain.Apps.Entities/Contents/DomainObject/ContentDomainObject.cs b/backend/src/Squidex.Domain.Apps.Entities/Contents/DomainObject/ContentDomainObject.cs index 9e20ab568..8e590ad31 100644 --- a/backend/src/Squidex.Domain.Apps.Entities/Contents/DomainObject/ContentDomainObject.cs +++ b/backend/src/Squidex.Domain.Apps.Entities/Contents/DomainObject/ContentDomainObject.cs @@ -28,9 +28,9 @@ namespace Squidex.Domain.Apps.Entities.Contents.DomainObject { private readonly IServiceProvider serviceProvider; - public ContentDomainObject(IStore store, ISemanticLog log, + public ContentDomainObject(IPersistenceFactory persistence, ISemanticLog log, IServiceProvider serviceProvider) - : base(store, log) + : base(persistence, log) { Guard.NotNull(serviceProvider, nameof(serviceProvider)); diff --git a/backend/src/Squidex.Domain.Apps.Entities/Contents/GraphQL/Types/AllTypes.cs b/backend/src/Squidex.Domain.Apps.Entities/Contents/GraphQL/Types/AllTypes.cs index 0588d82f8..8b8267e69 100644 --- a/backend/src/Squidex.Domain.Apps.Entities/Contents/GraphQL/Types/AllTypes.cs +++ b/backend/src/Squidex.Domain.Apps.Entities/Contents/GraphQL/Types/AllTypes.cs @@ -5,7 +5,6 @@ // All rights reserved. Licensed under the MIT license. // ========================================================================== -using System; using GraphQL.Types; using Squidex.Domain.Apps.Core.Assets; using Squidex.Domain.Apps.Entities.Contents.GraphQL.Types.Primitives; diff --git a/backend/src/Squidex.Domain.Apps.Entities/Contents/GraphQL/Types/Contents/ContentActions.cs b/backend/src/Squidex.Domain.Apps.Entities/Contents/GraphQL/Types/Contents/ContentActions.cs index 6c11293f9..ffda57388 100644 --- a/backend/src/Squidex.Domain.Apps.Entities/Contents/GraphQL/Types/Contents/ContentActions.cs +++ b/backend/src/Squidex.Domain.Apps.Entities/Contents/GraphQL/Types/Contents/ContentActions.cs @@ -6,14 +6,12 @@ // ========================================================================== using System; -using System.Collections.Generic; using GraphQL; using GraphQL.Resolvers; using GraphQL.Types; using NodaTime; using Squidex.Domain.Apps.Core.Contents; using Squidex.Domain.Apps.Entities.Contents.Commands; -using Squidex.Domain.Apps.Entities.Contents.GraphQL.Types.Primitives; using Squidex.Infrastructure; using Squidex.Infrastructure.Json.Objects; using Squidex.Infrastructure.Translations; diff --git a/backend/src/Squidex.Domain.Apps.Entities/History/NotifoService.cs b/backend/src/Squidex.Domain.Apps.Entities/History/NotifoService.cs index a796b373c..d66decb0b 100644 --- a/backend/src/Squidex.Domain.Apps.Entities/History/NotifoService.cs +++ b/backend/src/Squidex.Domain.Apps.Entities/History/NotifoService.cs @@ -127,9 +127,9 @@ namespace Squidex.Domain.Apps.Entities.History var now = clock.GetCurrentInstant(); var publishedEvents = events + .Where(x => x.AppEvent.Headers.Restored() == false) .Where(x => IsTooOld(x.AppEvent.Headers, now) == false) - .Where(x => IsComment(x.AppEvent.Payload) || x.HistoryEvent != null) - .ToList(); + .Where(x => IsComment(x.AppEvent.Payload) || x.HistoryEvent != null); foreach (var batch in publishedEvents.Batch(50)) { diff --git a/backend/src/Squidex.Domain.Apps.Entities/Rules/DomainObject/RuleDomainObject.cs b/backend/src/Squidex.Domain.Apps.Entities/Rules/DomainObject/RuleDomainObject.cs index 7147ce4ad..6d12c5445 100644 --- a/backend/src/Squidex.Domain.Apps.Entities/Rules/DomainObject/RuleDomainObject.cs +++ b/backend/src/Squidex.Domain.Apps.Entities/Rules/DomainObject/RuleDomainObject.cs @@ -25,9 +25,9 @@ namespace Squidex.Domain.Apps.Entities.Rules.DomainObject private readonly IAppProvider appProvider; private readonly IRuleEnqueuer ruleEnqueuer; - public RuleDomainObject(IStore store, ISemanticLog log, + public RuleDomainObject(IPersistenceFactory factory, ISemanticLog log, IAppProvider appProvider, IRuleEnqueuer ruleEnqueuer) - : base(store, log) + : base(factory, log) { Guard.NotNull(appProvider, nameof(appProvider)); Guard.NotNull(ruleEnqueuer, nameof(ruleEnqueuer)); diff --git a/backend/src/Squidex.Domain.Apps.Entities/Rules/RuleEnqueuer.cs b/backend/src/Squidex.Domain.Apps.Entities/Rules/RuleEnqueuer.cs index 2509e1156..c33f040bb 100644 --- a/backend/src/Squidex.Domain.Apps.Entities/Rules/RuleEnqueuer.cs +++ b/backend/src/Squidex.Domain.Apps.Entities/Rules/RuleEnqueuer.cs @@ -66,9 +66,14 @@ namespace Squidex.Domain.Apps.Entities.Rules public async Task On(Envelope @event) { - using (localCache.StartContext()) + if (@event.Headers.Restored()) { - if (@event.Payload is AppEvent appEvent) + return; + } + + if (@event.Payload is AppEvent appEvent) + { + using (localCache.StartContext()) { var rules = await GetRulesAsync(appEvent.AppId.Id); diff --git a/backend/src/Squidex.Domain.Apps.Entities/Schemas/DomainObject/SchemaDomainObject.cs b/backend/src/Squidex.Domain.Apps.Entities/Schemas/DomainObject/SchemaDomainObject.cs index 75ba519ff..88432b272 100644 --- a/backend/src/Squidex.Domain.Apps.Entities/Schemas/DomainObject/SchemaDomainObject.cs +++ b/backend/src/Squidex.Domain.Apps.Entities/Schemas/DomainObject/SchemaDomainObject.cs @@ -25,8 +25,8 @@ namespace Squidex.Domain.Apps.Entities.Schemas.DomainObject { public sealed partial class SchemaDomainObject : DomainObject { - public SchemaDomainObject(IStore store, ISemanticLog log) - : base(store, log) + public SchemaDomainObject(IPersistenceFactory persistence, ISemanticLog log) + : base(persistence, log) { } diff --git a/backend/src/Squidex.Domain.Users/DefaultKeyStore.cs b/backend/src/Squidex.Domain.Users/DefaultKeyStore.cs index 528fe0053..49567d436 100644 --- a/backend/src/Squidex.Domain.Users/DefaultKeyStore.cs +++ b/backend/src/Squidex.Domain.Users/DefaultKeyStore.cs @@ -13,13 +13,14 @@ using IdentityModel; using IdentityServer4.Models; using IdentityServer4.Stores; using Microsoft.IdentityModel.Tokens; +using Squidex.Infrastructure; using Squidex.Infrastructure.States; namespace Squidex.Domain.Users { public sealed class DefaultKeyStore : ISigningCredentialStore, IValidationKeysStore { - private readonly ISnapshotStore store; + private readonly ISnapshotStore store; private SigningCredentials? cachedKey; private SecurityKeyInfo[]? cachedKeyInfo; @@ -31,8 +32,10 @@ namespace Squidex.Domain.Users public RSAParameters Parameters { get; set; } } - public DefaultKeyStore(ISnapshotStore store) + public DefaultKeyStore(ISnapshotStore store) { + Guard.NotNull(store, nameof(store)); + this.store = store; } diff --git a/backend/src/Squidex.Domain.Users/DefaultXmlRepository.cs b/backend/src/Squidex.Domain.Users/DefaultXmlRepository.cs index ad15d0598..dbb0ccaff 100644 --- a/backend/src/Squidex.Domain.Users/DefaultXmlRepository.cs +++ b/backend/src/Squidex.Domain.Users/DefaultXmlRepository.cs @@ -16,7 +16,7 @@ namespace Squidex.Domain.Users { public sealed class DefaultXmlRepository : IXmlRepository { - private readonly ISnapshotStore store; + private readonly ISnapshotStore store; [CollectionName("Identity_Xml")] public sealed class State @@ -38,7 +38,7 @@ namespace Squidex.Domain.Users } } - public DefaultXmlRepository(ISnapshotStore store) + public DefaultXmlRepository(ISnapshotStore store) { Guard.NotNull(store, nameof(store)); @@ -63,7 +63,7 @@ namespace Squidex.Domain.Users { var state = new State(element); - store.WriteAsync(friendlyName, state, EtagVersion.Any, 0); + store.WriteAsync(DomainId.Create(friendlyName), state, EtagVersion.Any, 0); } } } diff --git a/backend/src/Squidex.Infrastructure.MongoDb/EventSourcing/MongoEventStore_Reader.cs b/backend/src/Squidex.Infrastructure.MongoDb/EventSourcing/MongoEventStore_Reader.cs index 7d43935f2..179415283 100644 --- a/backend/src/Squidex.Infrastructure.MongoDb/EventSourcing/MongoEventStore_Reader.cs +++ b/backend/src/Squidex.Infrastructure.MongoDb/EventSourcing/MongoEventStore_Reader.cs @@ -95,12 +95,31 @@ namespace Squidex.Infrastructure.EventSourcing Filter.Gte(EventStreamOffsetField, streamPosition - MaxCommitSize))) .Sort(Sort.Ascending(TimestampField)).ToListAsync(); - var result = new List(); + var result = commits.SelectMany(x => x.Filtered(streamPosition)).ToList(); - foreach (var commit in commits) - { - result.AddRange(commit.Filtered(streamPosition)); - } + return result; + } + } + + public async Task>> QueryManyAsync(IEnumerable streamNames) + { + Guard.NotNull(streamNames, nameof(streamNames)); + + using (Profiler.TraceMethod()) + { + var position = EtagVersion.Empty; + + var commits = + await Collection.Find( + Filter.And( + Filter.In(EventStreamField, streamNames), + Filter.Gte(EventStreamOffsetField, position))) + .Sort(Sort.Ascending(TimestampField)).ToListAsync(); + + var result = commits.GroupBy(x => x.EventStream) + .ToDictionary( + x => x.Key, + x => (IReadOnlyList)x.SelectMany(y => y.Filtered(position)).ToList()); return result; } diff --git a/backend/src/Squidex.Infrastructure.MongoDb/EventSourcing/MongoEventStore_Writer.cs b/backend/src/Squidex.Infrastructure.MongoDb/EventSourcing/MongoEventStore_Writer.cs index 600d700d8..ef8f3716b 100644 --- a/backend/src/Squidex.Infrastructure.MongoDb/EventSourcing/MongoEventStore_Writer.cs +++ b/backend/src/Squidex.Infrastructure.MongoDb/EventSourcing/MongoEventStore_Writer.cs @@ -115,7 +115,7 @@ namespace Squidex.Infrastructure.EventSourcing if (writes.Count > 0) { - await Collection.BulkWriteAsync(writes, Unordered); + await Collection.BulkWriteAsync(writes, BulkUnordered); } } } diff --git a/backend/src/Squidex.Infrastructure.MongoDb/Log/MongoRequestLogRepository.cs b/backend/src/Squidex.Infrastructure.MongoDb/Log/MongoRequestLogRepository.cs index f8b59ed19..3f58ed89c 100644 --- a/backend/src/Squidex.Infrastructure.MongoDb/Log/MongoRequestLogRepository.cs +++ b/backend/src/Squidex.Infrastructure.MongoDb/Log/MongoRequestLogRepository.cs @@ -19,7 +19,6 @@ namespace Squidex.Infrastructure.Log { public sealed class MongoRequestLogRepository : MongoRepositoryBase, IRequestLogRepository { - private static readonly InsertManyOptions Unordered = new InsertManyOptions { IsOrdered = false }; private readonly RequestLogStoreOptions options; public MongoRequestLogRepository(IMongoDatabase database, IOptions options) @@ -57,9 +56,14 @@ namespace Squidex.Infrastructure.Log { Guard.NotNull(items, nameof(items)); - var documents = items.Select(x => new MongoRequest { Key = x.Key, Timestamp = x.Timestamp, Properties = x.Properties }); + var entities = items.Select(x => new MongoRequest { Key = x.Key, Timestamp = x.Timestamp, Properties = x.Properties }).ToList(); - return Collection.InsertManyAsync(documents, Unordered); + if (entities.Count == 0) + { + return Task.CompletedTask; + } + + return Collection.InsertManyAsync(entities, InsertUnordered); } public Task QueryAllAsync(Func callback, string key, DateTime fromDate, DateTime toDate, CancellationToken ct = default) diff --git a/backend/src/Squidex.Infrastructure.MongoDb/MongoDb/MongoExtensions.cs b/backend/src/Squidex.Infrastructure.MongoDb/MongoDb/MongoExtensions.cs index 802fff4ed..430452097 100644 --- a/backend/src/Squidex.Infrastructure.MongoDb/MongoDb/MongoExtensions.cs +++ b/backend/src/Squidex.Infrastructure.MongoDb/MongoDb/MongoExtensions.cs @@ -198,7 +198,7 @@ namespace Squidex.Infrastructure.MongoDb new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 1, - MaxMessagesPerTask = 1, + MaxMessagesPerTask = DataflowBlockOptions.Unbounded, BoundedCapacity = Batching.BufferSize }); try diff --git a/backend/src/Squidex.Infrastructure.MongoDb/MongoDb/MongoRepositoryBase.cs b/backend/src/Squidex.Infrastructure.MongoDb/MongoDb/MongoRepositoryBase.cs index df679ea7d..96397ed6e 100644 --- a/backend/src/Squidex.Infrastructure.MongoDb/MongoDb/MongoRepositoryBase.cs +++ b/backend/src/Squidex.Infrastructure.MongoDb/MongoDb/MongoRepositoryBase.cs @@ -21,15 +21,16 @@ namespace Squidex.Infrastructure.MongoDb { private const string CollectionFormat = "{0}Set"; - protected static readonly BulkWriteOptions Unordered = new BulkWriteOptions { IsOrdered = true }; - protected static readonly UpdateOptions Upsert = new UpdateOptions { IsUpsert = true }; - protected static readonly ReplaceOptions UpsertReplace = new ReplaceOptions { IsUpsert = true }; - protected static readonly SortDefinitionBuilder Sort = Builders.Sort; - protected static readonly UpdateDefinitionBuilder Update = Builders.Update; + protected static readonly BulkWriteOptions BulkUnordered = new BulkWriteOptions { IsOrdered = true }; protected static readonly FieldDefinitionBuilder FieldBuilder = FieldDefinitionBuilder.Instance; protected static readonly FilterDefinitionBuilder Filter = Builders.Filter; protected static readonly IndexKeysDefinitionBuilder Index = Builders.IndexKeys; + protected static readonly InsertManyOptions InsertUnordered = new InsertManyOptions { IsOrdered = true }; protected static readonly ProjectionDefinitionBuilder Projection = Builders.Projection; + protected static readonly ReplaceOptions UpsertReplace = new ReplaceOptions { IsUpsert = true }; + protected static readonly SortDefinitionBuilder Sort = Builders.Sort; + protected static readonly UpdateDefinitionBuilder Update = Builders.Update; + protected static readonly UpdateOptions Upsert = new UpdateOptions { IsUpsert = true }; private readonly IMongoDatabase mongoDatabase; private IMongoCollection mongoCollection; diff --git a/backend/src/Squidex.Infrastructure.MongoDb/States/MongoSnapshotStore.cs b/backend/src/Squidex.Infrastructure.MongoDb/States/MongoSnapshotStore.cs index 138bc304e..a8dd81155 100644 --- a/backend/src/Squidex.Infrastructure.MongoDb/States/MongoSnapshotStore.cs +++ b/backend/src/Squidex.Infrastructure.MongoDb/States/MongoSnapshotStore.cs @@ -6,6 +6,7 @@ // ========================================================================== using System; +using System.Collections.Generic; using System.Linq; using System.Threading; using System.Threading.Tasks; @@ -17,7 +18,7 @@ using Squidex.Log; namespace Squidex.Infrastructure.States { - public class MongoSnapshotStore : MongoRepositoryBase>, ISnapshotStore where TKey : notnull + public class MongoSnapshotStore : MongoRepositoryBase>, ISnapshotStore { public MongoSnapshotStore(IMongoDatabase database, JsonSerializer jsonSerializer) : base(database, Register(jsonSerializer)) @@ -42,9 +43,9 @@ namespace Squidex.Infrastructure.States return $"States_{name}"; } - public async Task<(T Value, long Version)> ReadAsync(TKey key) + public async Task<(T Value, long Version)> ReadAsync(DomainId key) { - using (Profiler.TraceMethod>()) + using (Profiler.TraceMethod>()) { var existing = await Collection.Find(x => x.DocumentId.Equals(key)) @@ -59,25 +60,45 @@ namespace Squidex.Infrastructure.States } } - public async Task WriteAsync(TKey key, T value, long oldVersion, long newVersion) + public async Task WriteAsync(DomainId key, T value, long oldVersion, long newVersion) { - using (Profiler.TraceMethod>()) + using (Profiler.TraceMethod>()) { await Collection.UpsertVersionedAsync(key, oldVersion, newVersion, u => u.Set(x => x.Doc, value)); } } + public Task WriteManyAsync(IEnumerable<(DomainId Key, T Value, long Version)> snapshots) + { + using (Profiler.TraceMethod>()) + { + var writes = snapshots.Select(x => new InsertOneModel>(new MongoState + { + Doc = x.Value, + DocumentId = x.Key, + Version = x.Version + })).ToList(); + + if (writes.Count == 0) + { + return Task.CompletedTask; + } + + return Collection.BulkWriteAsync(writes, BulkUnordered); + } + } + public async Task ReadAllAsync(Func callback, CancellationToken ct = default) { - using (Profiler.TraceMethod>()) + using (Profiler.TraceMethod>()) { await Collection.Find(new BsonDocument(), options: Batching.Options).ForEachPipedAsync(x => callback(x.Doc, x.Version), ct); } } - public async Task RemoveAsync(TKey key) + public async Task RemoveAsync(DomainId key) { - using (Profiler.TraceMethod>()) + using (Profiler.TraceMethod>()) { await Collection.DeleteOneAsync(x => x.DocumentId.Equals(key)); } diff --git a/backend/src/Squidex.Infrastructure.MongoDb/States/MongoState.cs b/backend/src/Squidex.Infrastructure.MongoDb/States/MongoState.cs index dfc1687ac..77c7ddddc 100644 --- a/backend/src/Squidex.Infrastructure.MongoDb/States/MongoState.cs +++ b/backend/src/Squidex.Infrastructure.MongoDb/States/MongoState.cs @@ -12,12 +12,12 @@ using Squidex.Infrastructure.MongoDb; namespace Squidex.Infrastructure.States { [BsonIgnoreExtraElements] - public sealed class MongoState : IVersionedEntity + public sealed class MongoState : IVersionedEntity { [BsonId] [BsonElement] [BsonRepresentation(BsonType.String)] - public TKey DocumentId { get; set; } + public DomainId DocumentId { get; set; } [BsonRequired] [BsonElement] diff --git a/backend/src/Squidex.Infrastructure.RabbitMq/CQRS/Events/RabbitMqEventConsumer.cs b/backend/src/Squidex.Infrastructure.RabbitMq/CQRS/Events/RabbitMqEventConsumer.cs index 1a91d8a4e..58d95c5f1 100644 --- a/backend/src/Squidex.Infrastructure.RabbitMq/CQRS/Events/RabbitMqEventConsumer.cs +++ b/backend/src/Squidex.Infrastructure.RabbitMq/CQRS/Events/RabbitMqEventConsumer.cs @@ -88,6 +88,11 @@ namespace Squidex.Infrastructure.CQRS.Events public Task On(Envelope @event) { + if (@event.Headers.Restored()) + { + return Task.CompletedTask; + } + var jsonString = jsonSerializer.Serialize(@event); var jsonBytes = Encoding.UTF8.GetBytes(jsonString); diff --git a/backend/src/Squidex.Infrastructure/Commands/DomainObject.cs b/backend/src/Squidex.Infrastructure/Commands/DomainObject.cs index ea23cbb0c..0d5a9c0df 100644 --- a/backend/src/Squidex.Infrastructure/Commands/DomainObject.cs +++ b/backend/src/Squidex.Infrastructure/Commands/DomainObject.cs @@ -18,7 +18,7 @@ namespace Squidex.Infrastructure.Commands { private readonly List> uncomittedEvents = new List>(); private readonly SnapshotList snapshots = new SnapshotList(); - private readonly IStore store; + private readonly IPersistenceFactory factory; private readonly ISemanticLog log; private IPersistence? persistence; private bool isLoaded; @@ -45,12 +45,12 @@ namespace Squidex.Infrastructure.Commands set => snapshots.Capacity = value; } - protected DomainObject(IStore store, ISemanticLog log) + protected DomainObject(IPersistenceFactory factory, ISemanticLog log) { - Guard.NotNull(store, nameof(store)); + Guard.NotNull(factory, nameof(factory)); Guard.NotNull(log, nameof(log)); - this.store = store; + this.factory = factory; this.log = log; } @@ -68,7 +68,7 @@ namespace Squidex.Infrastructure.Commands snapshots.Add(snapshot, snapshot.Version, false); - var allEvents = store.WithEventSourcing(GetType(), UniqueId, @event => + var allEvents = factory.WithEventSourcing(GetType(), UniqueId, @event => { var newVersion = snapshot.Version + 1; @@ -97,7 +97,7 @@ namespace Squidex.Infrastructure.Commands { this.uniqueId = uniqueId; - persistence = store.WithSnapshotsAndEventSourcing(GetType(), UniqueId, + persistence = factory.WithSnapshotsAndEventSourcing(GetType(), UniqueId, new HandleSnapshot(snapshot => { snapshot.Version = Version + 1; @@ -175,7 +175,7 @@ namespace Squidex.Infrastructure.Commands if (events != null) { var deletedId = DomainId.Combine(UniqueId, DomainId.Create("deleted")); - var deletedStream = store.WithEventSourcing(GetType(), deletedId, null); + var deletedStream = factory.WithEventSourcing(GetType(), deletedId, null); await deletedStream.WriteEventsAsync(events); diff --git a/backend/src/Squidex.Infrastructure/Commands/Rebuilder.cs b/backend/src/Squidex.Infrastructure/Commands/Rebuilder.cs index a68491e1d..2889eb73b 100644 --- a/backend/src/Squidex.Infrastructure/Commands/Rebuilder.cs +++ b/backend/src/Squidex.Infrastructure/Commands/Rebuilder.cs @@ -10,10 +10,13 @@ using System.Collections.Generic; using System.Threading; using System.Threading.Tasks; using System.Threading.Tasks.Dataflow; +using Microsoft.Extensions.DependencyInjection; using Squidex.Caching; using Squidex.Infrastructure.EventSourcing; using Squidex.Infrastructure.States; +#pragma warning disable RECS0108 // Warns about static fields in generic types + namespace Squidex.Infrastructure.Commands { public delegate Task IdSource(Func add); @@ -21,31 +24,40 @@ namespace Squidex.Infrastructure.Commands public class Rebuilder { private readonly ILocalCache localCache; - private readonly IStore store; private readonly IEventStore eventStore; private readonly IServiceProvider serviceProvider; + private static class Factory where T : DomainObject where TState : class, IDomainState, new() + { + private static readonly ObjectFactory ObjectFactory = ActivatorUtilities.CreateFactory(typeof(T), new[] { typeof(IPersistenceFactory) }); + + public static T Create(IServiceProvider serviceProvider, IPersistenceFactory persistenceFactory) + { + return (T)ObjectFactory(serviceProvider, new object[] { persistenceFactory }); + } + } + public Rebuilder( ILocalCache localCache, - IStore store, IEventStore eventStore, IServiceProvider serviceProvider) { Guard.NotNull(localCache, nameof(localCache)); - Guard.NotNull(store, nameof(store)); + Guard.NotNull(serviceProvider, nameof(serviceProvider)); Guard.NotNull(eventStore, nameof(eventStore)); this.eventStore = eventStore; this.serviceProvider = serviceProvider; this.localCache = localCache; - this.store = store; } public virtual async Task RebuildAsync(string filter, CancellationToken ct) where T : DomainObject where TState : class, IDomainState, new() { - await store.GetSnapshotStore().ClearAsync(); + var store = serviceProvider.GetRequiredService>(); + + await store.ClearSnapshotsAsync(); - await InsertManyAsync(async target => + await InsertManyAsync(store, async target => { await eventStore.QueryAsync(async storedEvent => { @@ -60,7 +72,9 @@ namespace Squidex.Infrastructure.Commands { Guard.NotNull(source, nameof(source)); - await InsertManyAsync(async target => + var store = serviceProvider.GetRequiredService>(); + + await InsertManyAsync(store, async target => { foreach (var id in source) { @@ -69,26 +83,50 @@ namespace Squidex.Infrastructure.Commands }, ct); } - private async Task InsertManyAsync(IdSource source, CancellationToken ct = default) where T : DomainObject where TState : class, IDomainState, new() + private async Task InsertManyAsync(IStore store, IdSource source, CancellationToken ct = default) where T : DomainObject where TState : class, IDomainState, new() { - var worker = new ActionBlock(async id => - { - try - { - var domainObject = (T)serviceProvider.GetService(typeof(T))!; + var parallelism = Environment.ProcessorCount * 2; - domainObject.Setup(id); + const int BatchSize = 100; - await domainObject.RebuildStateAsync(); - } - catch (DomainObjectNotFoundException) + var workerBlock = new ActionBlock(async ids => + { + await using (var context = store.WithBatchContext(typeof(T))) { - return; + await context.LoadAsync(ids); + + foreach (var id in ids) + { + try + { + var domainObject = Factory.Create(serviceProvider, context); + + domainObject.Setup(id); + + await domainObject.RebuildStateAsync(); + } + catch (DomainObjectNotFoundException) + { + return; + } + } } }, new ExecutionDataflowBlockOptions { - MaxDegreeOfParallelism = Environment.ProcessorCount * 4 + MaxDegreeOfParallelism = parallelism, + MaxMessagesPerTask = DataflowBlockOptions.Unbounded, + BoundedCapacity = parallelism * 2 + }); + + var batchBlock = new BatchBlock(BatchSize, new GroupingDataflowBlockOptions + { + BoundedCapacity = BatchSize + }); + + batchBlock.LinkTo(workerBlock, new DataflowLinkOptions + { + PropagateCompletion = true }); var handledIds = new HashSet(); @@ -99,13 +137,13 @@ namespace Squidex.Infrastructure.Commands { if (handledIds.Add(id)) { - await worker.SendAsync(id, ct); + await batchBlock.SendAsync(id, ct); } }); - worker.Complete(); + batchBlock.Complete(); - await worker.Completion; + await workerBlock.Completion; } } } diff --git a/backend/src/Squidex.Infrastructure/DomainId.cs b/backend/src/Squidex.Infrastructure/DomainId.cs index d06a5974f..ca571a2cf 100644 --- a/backend/src/Squidex.Infrastructure/DomainId.cs +++ b/backend/src/Squidex.Infrastructure/DomainId.cs @@ -15,6 +15,7 @@ namespace Squidex.Infrastructure { private static readonly string EmptyString = Guid.Empty.ToString(); public static readonly DomainId Empty = default; + public static readonly string IdSeparator = "--"; private readonly string? id; @@ -95,12 +96,12 @@ namespace Squidex.Infrastructure public static DomainId Combine(NamedId id1, DomainId id2) { - return new DomainId($"{id1.Id}--{id2}"); + return new DomainId($"{id1.Id}{IdSeparator}{id2}"); } public static DomainId Combine(DomainId id1, DomainId id2) { - return new DomainId($"{id1}--{id2}"); + return new DomainId($"{id1}{IdSeparator}{id2}"); } } } diff --git a/backend/src/Squidex.Infrastructure/EventSourcing/CommonHeaders.cs b/backend/src/Squidex.Infrastructure/EventSourcing/CommonHeaders.cs index b7d62c1d0..cf8747db7 100644 --- a/backend/src/Squidex.Infrastructure/EventSourcing/CommonHeaders.cs +++ b/backend/src/Squidex.Infrastructure/EventSourcing/CommonHeaders.cs @@ -9,16 +9,18 @@ namespace Squidex.Infrastructure.EventSourcing { public static class CommonHeaders { - public static readonly string AggregateId = "AggregateId"; + public static readonly string AggregateId = nameof(AggregateId); - public static readonly string CommitId = "CommitId"; + public static readonly string CommitId = nameof(CommitId); - public static readonly string EventId = "EventId"; + public static readonly string EventId = nameof(EventId); - public static readonly string EventNumber = "EventNumber"; + public static readonly string EventNumber = nameof(EventNumber); - public static readonly string EventStreamNumber = "EventStreamNumber"; + public static readonly string EventStreamNumber = nameof(EventStreamNumber); - public static readonly string Timestamp = "Timestamp"; + public static readonly string Restored = nameof(Restored); + + public static readonly string Timestamp = nameof(Timestamp); } } diff --git a/backend/src/Squidex.Infrastructure/EventSourcing/EnvelopeExtensions.cs b/backend/src/Squidex.Infrastructure/EventSourcing/EnvelopeExtensions.cs index 2944dafa6..76afda61c 100644 --- a/backend/src/Squidex.Infrastructure/EventSourcing/EnvelopeExtensions.cs +++ b/backend/src/Squidex.Infrastructure/EventSourcing/EnvelopeExtensions.cs @@ -87,6 +87,18 @@ namespace Squidex.Infrastructure.EventSourcing return envelope; } + public static bool Restored(this EnvelopeHeaders headers) + { + return headers.GetBoolean(CommonHeaders.Restored); + } + + public static Envelope SetRestored(this Envelope envelope, bool value = true) where T : class, IEvent + { + envelope.Headers.Add(CommonHeaders.Restored, value); + + return envelope; + } + public static long GetLong(this JsonObject obj, string key) { if (obj.TryGetValue(key, out var v)) @@ -106,12 +118,9 @@ namespace Squidex.Infrastructure.EventSourcing public static Guid GetGuid(this JsonObject obj, string key) { - if (obj.TryGetValue(key, out var v)) + if (obj.TryGetValue(key, out var v) && Guid.TryParse(v.ToString(), out var guid)) { - if (v.Type == JsonValueType.String && Guid.TryParse(v.ToString(), out var guid)) - { - return guid; - } + return guid; } return default; @@ -119,12 +128,9 @@ namespace Squidex.Infrastructure.EventSourcing public static Instant GetInstant(this JsonObject obj, string key) { - if (obj.TryGetValue(key, out var v)) + if (obj.TryGetValue(key, out var v) && InstantPattern.ExtendedIso.Parse(v.ToString()).TryGetValue(default, out var instant)) { - if (v.Type == JsonValueType.String && InstantPattern.ExtendedIso.Parse(v.ToString()).TryGetValue(default, out var instant)) - { - return instant; - } + return instant; } return default; @@ -139,5 +145,15 @@ namespace Squidex.Infrastructure.EventSourcing return string.Empty; } + + public static bool GetBoolean(this JsonObject obj, string key) + { + if (obj.TryGetValue(key, out var v)) + { + return v.Value; + } + + return false; + } } } diff --git a/backend/src/Squidex.Infrastructure/EventSourcing/Grains/BatchSubscriber.cs b/backend/src/Squidex.Infrastructure/EventSourcing/Grains/BatchSubscriber.cs index 174d5d237..69cd9d1b1 100644 --- a/backend/src/Squidex.Infrastructure/EventSourcing/Grains/BatchSubscriber.cs +++ b/backend/src/Squidex.Infrastructure/EventSourcing/Grains/BatchSubscriber.cs @@ -72,7 +72,7 @@ namespace Squidex.Infrastructure.EventSourcing.Grains { BoundedCapacity = batchSize, MaxDegreeOfParallelism = 1, - MaxMessagesPerTask = 1 + MaxMessagesPerTask = DataflowBlockOptions.Unbounded }); var buffer = AsyncHelper.CreateBatchBlock(batchSize, batchDelay, new GroupingDataflowBlockOptions @@ -105,7 +105,7 @@ namespace Squidex.Infrastructure.EventSourcing.Grains { BoundedCapacity = 2, MaxDegreeOfParallelism = 1, - MaxMessagesPerTask = 1, + MaxMessagesPerTask = DataflowBlockOptions.Unbounded, TaskScheduler = scheduler }); diff --git a/backend/src/Squidex.Infrastructure/EventSourcing/IEventStore.cs b/backend/src/Squidex.Infrastructure/EventSourcing/IEventStore.cs index bd9961315..b8af5b5e7 100644 --- a/backend/src/Squidex.Infrastructure/EventSourcing/IEventStore.cs +++ b/backend/src/Squidex.Infrastructure/EventSourcing/IEventStore.cs @@ -35,5 +35,17 @@ namespace Squidex.Infrastructure.EventSourcing await AppendAsync(commit.Id, commit.StreamName, commit.Offset, commit.Events); } } + + async Task>> QueryManyAsync(IEnumerable streamNames) + { + var result = new Dictionary>(); + + foreach (var streamName in streamNames) + { + result[streamName] = await QueryAsync(streamName); + } + + return result; + } } } diff --git a/backend/src/Squidex.Infrastructure/Json/Newtonsoft/JsonValueConverter.cs b/backend/src/Squidex.Infrastructure/Json/Newtonsoft/JsonValueConverter.cs index d65f2805e..83f8f1e9c 100644 --- a/backend/src/Squidex.Infrastructure/Json/Newtonsoft/JsonValueConverter.cs +++ b/backend/src/Squidex.Infrastructure/Json/Newtonsoft/JsonValueConverter.cs @@ -46,7 +46,7 @@ namespace Squidex.Infrastructure.Json.Newtonsoft break; case JsonToken.StartObject: { - var result = JsonValue.Object(); + var result = new JsonObject(1); while (reader.Read()) { @@ -74,7 +74,7 @@ namespace Squidex.Infrastructure.Json.Newtonsoft case JsonToken.StartArray: { - var result = JsonValue.Array(); + var result = new JsonArray(1); while (reader.Read()) { diff --git a/backend/src/Squidex.Infrastructure/Json/Objects/JsonObject.cs b/backend/src/Squidex.Infrastructure/Json/Objects/JsonObject.cs index e476293e7..aa7922c3f 100644 --- a/backend/src/Squidex.Infrastructure/Json/Objects/JsonObject.cs +++ b/backend/src/Squidex.Infrastructure/Json/Objects/JsonObject.cs @@ -56,6 +56,11 @@ namespace Squidex.Infrastructure.Json.Objects inner = new Dictionary(); } + public JsonObject(int capacity) + { + inner = new Dictionary(capacity); + } + public JsonObject(JsonObject obj) { inner = new Dictionary(obj.inner); diff --git a/backend/src/Squidex.Infrastructure/Orleans/GrainState.cs b/backend/src/Squidex.Infrastructure/Orleans/GrainState.cs index 54f2d9a95..6c55c36b4 100644 --- a/backend/src/Squidex.Infrastructure/Orleans/GrainState.cs +++ b/backend/src/Squidex.Infrastructure/Orleans/GrainState.cs @@ -44,19 +44,21 @@ namespace Squidex.Infrastructure.Orleans return Task.CompletedTask; } + DomainId key; + if (context.GrainIdentity.PrimaryKeyString != null) { - var store = context.ActivationServices.GetRequiredService>(); - - persistence = store.WithSnapshots(GetType(), context.GrainIdentity.PrimaryKeyString, ApplyState); + key = DomainId.Create(context.GrainIdentity.PrimaryKeyString); } else { - var store = context.ActivationServices.GetRequiredService>(); - - persistence = store.WithSnapshots(GetType(), context.GrainIdentity.PrimaryKey, ApplyState); + key = DomainId.Create(context.GrainIdentity.PrimaryKey); } + var factory = context.ActivationServices.GetRequiredService>(); + + persistence = factory.WithSnapshots(GetType(), key, ApplyState); + return persistence.ReadAsync(); } diff --git a/backend/src/Squidex.Infrastructure/States/BatchContext.cs b/backend/src/Squidex.Infrastructure/States/BatchContext.cs new file mode 100644 index 000000000..3dd9acd84 --- /dev/null +++ b/backend/src/Squidex.Infrastructure/States/BatchContext.cs @@ -0,0 +1,112 @@ +// ========================================================================== +// Squidex Headless CMS +// ========================================================================== +// Copyright (c) Squidex UG (haftungsbeschraenkt) +// All rights reserved. Licensed under the MIT license. +// ========================================================================== + +using System; +using System.Collections.Generic; +using System.Linq; +using System.Threading; +using System.Threading.Tasks; +using Squidex.Infrastructure.EventSourcing; + +#pragma warning disable RECS0108 // Warns about static fields in generic types + +namespace Squidex.Infrastructure.States +{ + public sealed class BatchContext : IBatchContext + { + private static readonly List> EmptyStream = new List>(); + private readonly Type owner; + private readonly ISnapshotStore snapshotStore; + private readonly IEventStore eventStore; + private readonly IEventDataFormatter eventDataFormatter; + private readonly IStreamNameResolver streamNameResolver; + private readonly Dictionary>)> @events = new Dictionary>)>(); + private List<(DomainId Key, T Snapshot, long Version)>? snapshots; + + internal BatchContext( + Type owner, + ISnapshotStore snapshotStore, + IEventStore eventStore, + IEventDataFormatter eventDataFormatter, + IStreamNameResolver streamNameResolver) + { + this.owner = owner; + this.snapshotStore = snapshotStore; + this.eventStore = eventStore; + this.eventDataFormatter = eventDataFormatter; + this.streamNameResolver = streamNameResolver; + } + + internal void Add(DomainId key, T snapshot, long version) + { + snapshots ??= new List<(DomainId Key, T Snapshot, long Version)>(); + snapshots.Add((key, snapshot, version)); + } + + public async Task LoadAsync(IEnumerable ids) + { + var streamNames = ids.ToDictionary(x => x, x => streamNameResolver.GetStreamName(owner, x.ToString())); + + if (streamNames.Count == 0) + { + return; + } + + var streams = await eventStore.QueryManyAsync(streamNames.Values); + + foreach (var (id, streamName) in streamNames) + { + if (streams.TryGetValue(streamName, out var data)) + { + var stream = data.Select(eventDataFormatter.ParseIfKnown).NotNull().ToList(); + + events[id] = (data.Count - 1, stream); + } + else + { + events[id] = (EtagVersion.Empty, EmptyStream); + } + } + } + + public async ValueTask DisposeAsync() + { + await CommitAsync(); + } + + public Task CommitAsync() + { + var current = Interlocked.Exchange(ref snapshots, null!); + + if (current == null || current.Count == 0) + { + return Task.CompletedTask; + } + + return snapshotStore.WriteManyAsync(current); + } + + public IPersistence WithEventSourcing(Type owner, DomainId key, HandleEvent? applyEvent) + { + var (version, streamEvents) = events[key]; + + return new BatchPersistence(key, this, version, streamEvents, applyEvent); + } + + public IPersistence WithSnapshotsAndEventSourcing(Type owner, DomainId key, HandleSnapshot? applySnapshot, HandleEvent? applyEvent) + { + var (version, streamEvents) = events[key]; + + return new BatchPersistence(key, this, version, streamEvents, applyEvent); + } + + public IPersistence WithSnapshots(Type owner, DomainId key, HandleSnapshot? applySnapshot) + { + throw new NotSupportedException(); + } + } +} diff --git a/backend/src/Squidex.Infrastructure/States/BatchPersistence.cs b/backend/src/Squidex.Infrastructure/States/BatchPersistence.cs new file mode 100644 index 000000000..cc2c1e0e8 --- /dev/null +++ b/backend/src/Squidex.Infrastructure/States/BatchPersistence.cs @@ -0,0 +1,80 @@ +// ========================================================================== +// Squidex Headless CMS +// ========================================================================== +// Copyright (c) Squidex UG (haftungsbeschraenkt) +// All rights reserved. Licensed under the MIT license. +// ========================================================================== + +using System; +using System.Collections.Generic; +using System.Threading.Tasks; +using Squidex.Infrastructure.EventSourcing; + +namespace Squidex.Infrastructure.States +{ + internal class BatchPersistence : IPersistence + { + private readonly DomainId ownerKey; + private readonly BatchContext context; + private readonly IReadOnlyList> events; + private readonly HandleEvent? applyEvent; + + public long Version { get; } + + internal BatchPersistence(DomainId ownerKey, BatchContext context, long version, IReadOnlyList> @events, + HandleEvent? applyEvent) + { + this.ownerKey = ownerKey; + this.context = context; + this.events = events; + this.applyEvent = applyEvent; + + Version = version; + } + + public Task DeleteAsync() + { + throw new NotSupportedException(); + } + + public Task WriteEventsAsync(IReadOnlyList> events) + { + throw new NotSupportedException(); + } + + public Task ReadAsync(long expectedVersion = -2) + { + if (applyEvent != null) + { + foreach (var @event in events) + { + if (!applyEvent(@event)) + { + break; + } + } + } + + if (expectedVersion > EtagVersion.Any && expectedVersion != Version) + { + if (Version == EtagVersion.Empty) + { + throw new DomainObjectNotFoundException(ownerKey.ToString()!); + } + else + { + throw new InconsistentStateException(Version, expectedVersion); + } + } + + return Task.CompletedTask; + } + + public Task WriteSnapshotAsync(T state) + { + context.Add(ownerKey, state, Version); + + return Task.CompletedTask; + } + } +} diff --git a/backend/src/Squidex.Infrastructure/States/IPersistence{TState}.cs b/backend/src/Squidex.Infrastructure/States/IBatchContext.cs similarity index 59% rename from backend/src/Squidex.Infrastructure/States/IPersistence{TState}.cs rename to backend/src/Squidex.Infrastructure/States/IBatchContext.cs index c21e7c2f7..0ab7e4bf0 100644 --- a/backend/src/Squidex.Infrastructure/States/IPersistence{TState}.cs +++ b/backend/src/Squidex.Infrastructure/States/IBatchContext.cs @@ -5,22 +5,16 @@ // All rights reserved. Licensed under the MIT license. // ========================================================================== +using System; using System.Collections.Generic; using System.Threading.Tasks; -using Squidex.Infrastructure.EventSourcing; namespace Squidex.Infrastructure.States { - public interface IPersistence + public interface IBatchContext : IAsyncDisposable, IPersistenceFactory { - long Version { get; } + Task CommitAsync(); - Task DeleteAsync(); - - Task WriteEventsAsync(IReadOnlyList> events); - - Task WriteSnapshotAsync(TState state); - - Task ReadAsync(long expectedVersion = EtagVersion.Any); + Task LoadAsync(IEnumerable ids); } -} +} \ No newline at end of file diff --git a/backend/src/Squidex.Infrastructure/States/IPersistence.cs b/backend/src/Squidex.Infrastructure/States/IPersistence.cs index e07b75b38..c21e7c2f7 100644 --- a/backend/src/Squidex.Infrastructure/States/IPersistence.cs +++ b/backend/src/Squidex.Infrastructure/States/IPersistence.cs @@ -1,13 +1,26 @@ // ========================================================================== // Squidex Headless CMS // ========================================================================== -// Copyright (c) Squidex UG (haftungsbeschränkt) +// Copyright (c) Squidex UG (haftungsbeschraenkt) // All rights reserved. Licensed under the MIT license. // ========================================================================== +using System.Collections.Generic; +using System.Threading.Tasks; +using Squidex.Infrastructure.EventSourcing; + namespace Squidex.Infrastructure.States { - public interface IPersistence : IPersistence + public interface IPersistence { + long Version { get; } + + Task DeleteAsync(); + + Task WriteEventsAsync(IReadOnlyList> events); + + Task WriteSnapshotAsync(TState state); + + Task ReadAsync(long expectedVersion = EtagVersion.Any); } } diff --git a/backend/src/Squidex.Infrastructure/States/IPersistenceFactory.cs b/backend/src/Squidex.Infrastructure/States/IPersistenceFactory.cs new file mode 100644 index 000000000..8bd595603 --- /dev/null +++ b/backend/src/Squidex.Infrastructure/States/IPersistenceFactory.cs @@ -0,0 +1,25 @@ +// ========================================================================== +// Squidex Headless CMS +// ========================================================================== +// Copyright (c) Squidex UG (haftungsbeschränkt) +// All rights reserved. Licensed under the MIT license. +// ========================================================================== + +using System; +using Squidex.Infrastructure.EventSourcing; + +namespace Squidex.Infrastructure.States +{ + public delegate bool HandleEvent(Envelope @event); + + public delegate void HandleSnapshot(T state); + + public interface IPersistenceFactory + { + IPersistence WithEventSourcing(Type owner, DomainId id, HandleEvent? applyEvent); + + IPersistence WithSnapshots(Type owner, DomainId id, HandleSnapshot? applySnapshot); + + IPersistence WithSnapshotsAndEventSourcing(Type owner, DomainId id, HandleSnapshot? applySnapshot, HandleEvent? applyEvent); + } +} \ No newline at end of file diff --git a/backend/src/Squidex.Infrastructure/States/ISnapshotStore.cs b/backend/src/Squidex.Infrastructure/States/ISnapshotStore.cs index 68243db74..ffc4f1b26 100644 --- a/backend/src/Squidex.Infrastructure/States/ISnapshotStore.cs +++ b/backend/src/Squidex.Infrastructure/States/ISnapshotStore.cs @@ -6,20 +6,23 @@ // ========================================================================== using System; +using System.Collections.Generic; using System.Threading; using System.Threading.Tasks; namespace Squidex.Infrastructure.States { - public interface ISnapshotStore + public interface ISnapshotStore { - Task WriteAsync(TKey key, T value, long oldVersion, long newVersion); + Task WriteAsync(DomainId key, T value, long oldVersion, long newVersion); - Task<(T Value, long Version)> ReadAsync(TKey key); + Task WriteManyAsync(IEnumerable<(DomainId Key, T Value, long Version)> snapshots); + + Task<(T Value, long Version)> ReadAsync(DomainId key); Task ClearAsync(); - Task RemoveAsync(TKey key); + Task RemoveAsync(DomainId key); Task ReadAllAsync(Func callback, CancellationToken ct = default); } diff --git a/backend/src/Squidex.Infrastructure/States/IStore.cs b/backend/src/Squidex.Infrastructure/States/IStore.cs index f659c392e..09d575d02 100644 --- a/backend/src/Squidex.Infrastructure/States/IStore.cs +++ b/backend/src/Squidex.Infrastructure/States/IStore.cs @@ -6,22 +6,14 @@ // ========================================================================== using System; -using Squidex.Infrastructure.EventSourcing; +using System.Threading.Tasks; namespace Squidex.Infrastructure.States { - public delegate bool HandleEvent(Envelope @event); - - public delegate void HandleSnapshot(T state); - - public interface IStore + public interface IStore : IPersistenceFactory { - IPersistence WithEventSourcing(Type owner, TKey key, HandleEvent? applyEvent); - - IPersistence WithSnapshots(Type owner, TKey key, HandleSnapshot? applySnapshot); - - IPersistence WithSnapshotsAndEventSourcing(Type owner, TKey key, HandleSnapshot? applySnapshot, HandleEvent? applyEvent); + IBatchContext WithBatchContext(Type owner); - ISnapshotStore GetSnapshotStore(); + Task ClearSnapshotsAsync(); } } diff --git a/backend/src/Squidex.Infrastructure/States/Persistence.cs b/backend/src/Squidex.Infrastructure/States/Persistence.cs index 3b974de49..4212864a8 100644 --- a/backend/src/Squidex.Infrastructure/States/Persistence.cs +++ b/backend/src/Squidex.Infrastructure/States/Persistence.cs @@ -6,21 +6,216 @@ // ========================================================================== using System; +using System.Collections.Generic; +using System.Linq; +using System.Threading.Tasks; using Squidex.Infrastructure.EventSourcing; +#pragma warning disable RECS0012 // 'if' statement can be re-written as 'switch' statement + namespace Squidex.Infrastructure.States { - internal sealed class Persistence : Persistence, IPersistence where TKey : notnull + internal class Persistence : IPersistence { - public Persistence(TKey ownerKey, Type ownerType, + private readonly DomainId ownerKey; + private readonly ISnapshotStore snapshotStore; + private readonly IEventStore eventStore; + private readonly IEventDataFormatter eventDataFormatter; + private readonly PersistenceMode persistenceMode; + private readonly HandleSnapshot? applyState; + private readonly HandleEvent? applyEvent; + private readonly Lazy streamName; + private long versionSnapshot = EtagVersion.Empty; + private long versionEvents = EtagVersion.Empty; + private long version = EtagVersion.Empty; + + public long Version + { + get => version; + } + + private bool UseSnapshots + { + get => (persistenceMode & PersistenceMode.Snapshots) == PersistenceMode.Snapshots; + } + + private bool UseEventSourcing + { + get => (persistenceMode & PersistenceMode.EventSourcing) == PersistenceMode.EventSourcing; + } + + public Persistence(DomainId ownerKey, Type ownerType, + ISnapshotStore snapshotStore, IEventStore eventStore, IEventDataFormatter eventDataFormatter, - ISnapshotStore snapshotStore, IStreamNameResolver streamNameResolver, + PersistenceMode persistenceMode, + HandleSnapshot? applyState, HandleEvent? applyEvent) - : base(ownerKey, ownerType, eventStore, eventDataFormatter, snapshotStore, streamNameResolver, - PersistenceMode.EventSourcing, null, applyEvent) { + this.ownerKey = ownerKey; + this.applyState = applyState; + this.applyEvent = applyEvent; + this.eventStore = eventStore; + this.eventDataFormatter = eventDataFormatter; + this.persistenceMode = persistenceMode; + this.snapshotStore = snapshotStore; + + streamName = new Lazy(() => streamNameResolver.GetStreamName(ownerType, ownerKey.ToString()!)); + } + + public async Task DeleteAsync() + { + if (UseSnapshots) + { + await snapshotStore.RemoveAsync(ownerKey); + } + + if (UseEventSourcing) + { + await eventStore.DeleteStreamAsync(streamName.Value); + } + } + + public async Task ReadAsync(long expectedVersion = EtagVersion.Any) + { + versionSnapshot = EtagVersion.Empty; + versionEvents = EtagVersion.Empty; + + if (UseSnapshots) + { + await ReadSnapshotAsync(); + } + + if (UseEventSourcing) + { + await ReadEventsAsync(); + } + + UpdateVersion(); + + if (expectedVersion > EtagVersion.Any && expectedVersion != version) + { + if (version == EtagVersion.Empty) + { + throw new DomainObjectNotFoundException(ownerKey.ToString()!); + } + else + { + throw new InconsistentStateException(version, expectedVersion); + } + } + } + + private async Task ReadSnapshotAsync() + { + var (state, position) = await snapshotStore.ReadAsync(ownerKey); + + // Treat all negative values as not-found (empty). + position = Math.Max(position, EtagVersion.Empty); + + versionSnapshot = position; + versionEvents = position; + + if (applyState != null && position >= 0) + { + applyState(state); + } + } + + private async Task ReadEventsAsync() + { + var events = await eventStore.QueryAsync(streamName.Value, versionEvents + 1); + + var isStopped = false; + + foreach (var @event in events) + { + var newVersion = versionEvents + 1; + + if (@event.EventStreamNumber != newVersion) + { + throw new InvalidOperationException("Events must follow the snapshot version in consecutive order with no gaps."); + } + + // Skip the parsing for performance reasons if we are not interested, but continue reading to get the version + if (!isStopped) + { + var parsedEvent = eventDataFormatter.ParseIfKnown(@event); + + if (applyEvent != null && parsedEvent != null) + { + isStopped = !applyEvent(parsedEvent); + } + } + + versionEvents++; + } + } + + public async Task WriteSnapshotAsync(T state) + { + var oldVersion = versionSnapshot; + + if (oldVersion == EtagVersion.Empty && UseEventSourcing) + { + oldVersion = (versionEvents - 1); + } + + var newVersion = UseEventSourcing ? versionEvents : oldVersion + 1; + + if (newVersion == versionSnapshot) + { + return; + } + + await snapshotStore.WriteAsync(ownerKey, state, oldVersion, newVersion); + + versionSnapshot = newVersion; + + UpdateVersion(); + } + + public async Task WriteEventsAsync(IReadOnlyList> events) + { + Guard.NotEmpty(events, nameof(events)); + + var oldVersion = EtagVersion.Any; + + if (UseEventSourcing) + { + oldVersion = versionEvents; + } + + var eventCommitId = Guid.NewGuid(); + var eventData = events.Select(x => eventDataFormatter.ToEventData(x, eventCommitId, true)).ToArray(); + + try + { + await eventStore.AppendAsync(eventCommitId, streamName.Value, oldVersion, eventData); + } + catch (WrongEventVersionException ex) + { + throw new InconsistentStateException(ex.CurrentVersion, ex.ExpectedVersion, ex); + } + + versionEvents += eventData.Length; + } + + private void UpdateVersion() + { + if (persistenceMode == PersistenceMode.Snapshots) + { + version = versionSnapshot; + } + else if (persistenceMode == PersistenceMode.EventSourcing) + { + version = versionEvents; + } + else if (persistenceMode == PersistenceMode.SnapshotsAndEventSourcing) + { + version = Math.Max(versionEvents, versionSnapshot); + } } } } diff --git a/backend/src/Squidex.Infrastructure/States/Persistence{TSnapshot,TKey}.cs b/backend/src/Squidex.Infrastructure/States/Persistence{TSnapshot,TKey}.cs deleted file mode 100644 index 7ef7e2452..000000000 --- a/backend/src/Squidex.Infrastructure/States/Persistence{TSnapshot,TKey}.cs +++ /dev/null @@ -1,221 +0,0 @@ -// ========================================================================== -// Squidex Headless CMS -// ========================================================================== -// Copyright (c) Squidex UG (haftungsbeschränkt) -// All rights reserved. Licensed under the MIT license. -// ========================================================================== - -using System; -using System.Collections.Generic; -using System.Linq; -using System.Threading.Tasks; -using Squidex.Infrastructure.EventSourcing; - -#pragma warning disable RECS0012 // 'if' statement can be re-written as 'switch' statement - -namespace Squidex.Infrastructure.States -{ - internal class Persistence : IPersistence where TKey : notnull - { - private readonly TKey ownerKey; - private readonly ISnapshotStore snapshotStore; - private readonly IEventStore eventStore; - private readonly IEventDataFormatter eventDataFormatter; - private readonly PersistenceMode persistenceMode; - private readonly HandleSnapshot? applyState; - private readonly HandleEvent? applyEvent; - private readonly Lazy streamName; - private long versionSnapshot = EtagVersion.Empty; - private long versionEvents = EtagVersion.Empty; - private long version = EtagVersion.Empty; - - public long Version - { - get => version; - } - - private bool UseSnapshots - { - get => (persistenceMode & PersistenceMode.Snapshots) == PersistenceMode.Snapshots; - } - - private bool UseEventSourcing - { - get => (persistenceMode & PersistenceMode.EventSourcing) == PersistenceMode.EventSourcing; - } - - public Persistence(TKey ownerKey, Type ownerType, - IEventStore eventStore, - IEventDataFormatter eventDataFormatter, - ISnapshotStore snapshotStore, - IStreamNameResolver streamNameResolver, - PersistenceMode persistenceMode, - HandleSnapshot? applyState, - HandleEvent? applyEvent) - { - this.ownerKey = ownerKey; - this.applyState = applyState; - this.applyEvent = applyEvent; - this.eventStore = eventStore; - this.eventDataFormatter = eventDataFormatter; - this.persistenceMode = persistenceMode; - this.snapshotStore = snapshotStore; - - streamName = new Lazy(() => streamNameResolver.GetStreamName(ownerType, ownerKey.ToString()!)); - } - - public async Task DeleteAsync() - { - if (UseSnapshots) - { - await snapshotStore.RemoveAsync(ownerKey); - } - - if (UseEventSourcing) - { - await eventStore.DeleteStreamAsync(streamName.Value); - } - } - - public async Task ReadAsync(long expectedVersion = EtagVersion.Any) - { - versionSnapshot = EtagVersion.Empty; - versionEvents = EtagVersion.Empty; - - if (UseSnapshots) - { - await ReadSnapshotAsync(); - } - - if (UseEventSourcing) - { - await ReadEventsAsync(); - } - - UpdateVersion(); - - if (expectedVersion > EtagVersion.Any && expectedVersion != version) - { - if (version == EtagVersion.Empty) - { - throw new DomainObjectNotFoundException(ownerKey.ToString()!); - } - else - { - throw new InconsistentStateException(version, expectedVersion); - } - } - } - - private async Task ReadSnapshotAsync() - { - var (state, position) = await snapshotStore.ReadAsync(ownerKey); - - // Treat all negative values as not-found (empty). - position = Math.Max(position, EtagVersion.Empty); - - versionSnapshot = position; - versionEvents = position; - - if (applyState != null && position >= 0) - { - applyState(state); - } - } - - private async Task ReadEventsAsync() - { - var events = await eventStore.QueryAsync(streamName.Value, versionEvents + 1); - - var isStopped = false; - - foreach (var @event in events) - { - var newVersion = versionEvents + 1; - - if (@event.EventStreamNumber != newVersion) - { - throw new InvalidOperationException("Events must follow the snapshot version in consecutive order with no gaps."); - } - - // Skip the parsing for performance reasons if we are not interested, but continue reading to get the version. - if (!isStopped) - { - var parsedEvent = eventDataFormatter.ParseIfKnown(@event); - - if (applyEvent != null && parsedEvent != null) - { - isStopped = !applyEvent(parsedEvent); - } - } - - versionEvents++; - } - } - - public async Task WriteSnapshotAsync(TSnapshot state) - { - var oldVersion = versionSnapshot; - - if (oldVersion == EtagVersion.Empty && UseEventSourcing) - { - oldVersion = (versionEvents - 1); - } - - var newVersion = UseEventSourcing ? versionEvents : oldVersion + 1; - - if (newVersion == versionSnapshot) - { - return; - } - - await snapshotStore.WriteAsync(ownerKey, state, oldVersion, newVersion); - - versionSnapshot = newVersion; - - UpdateVersion(); - } - - public async Task WriteEventsAsync(IReadOnlyList> events) - { - Guard.NotEmpty(events, nameof(events)); - - var oldVersion = EtagVersion.Any; - - if (UseEventSourcing) - { - oldVersion = versionEvents; - } - - var eventCommitId = Guid.NewGuid(); - var eventData = events.Select(x => eventDataFormatter.ToEventData(x, eventCommitId, true)).ToArray(); - - try - { - await eventStore.AppendAsync(eventCommitId, streamName.Value, oldVersion, eventData); - } - catch (WrongEventVersionException ex) - { - throw new InconsistentStateException(ex.CurrentVersion, ex.ExpectedVersion, ex); - } - - versionEvents += eventData.Length; - } - - private void UpdateVersion() - { - if (persistenceMode == PersistenceMode.Snapshots) - { - version = versionSnapshot; - } - else if (persistenceMode == PersistenceMode.EventSourcing) - { - version = versionEvents; - } - else if (persistenceMode == PersistenceMode.SnapshotsAndEventSourcing) - { - version = Math.Max(versionEvents, versionSnapshot); - } - } - } -} diff --git a/backend/src/Squidex.Infrastructure/States/Store.cs b/backend/src/Squidex.Infrastructure/States/Store.cs index 2663bbe77..f19a111fd 100644 --- a/backend/src/Squidex.Infrastructure/States/Store.cs +++ b/backend/src/Squidex.Infrastructure/States/Store.cs @@ -6,73 +6,76 @@ // ========================================================================== using System; +using System.Threading.Tasks; using Squidex.Infrastructure.EventSourcing; namespace Squidex.Infrastructure.States { - public sealed class Store : IStore where TKey : notnull + public sealed class Store : IStore { - private readonly IServiceProvider services; private readonly IStreamNameResolver streamNameResolver; + private readonly ISnapshotStore snapshotStore; private readonly IEventStore eventStore; private readonly IEventDataFormatter eventDataFormatter; public Store( + ISnapshotStore snapshotStore, IEventStore eventStore, IEventDataFormatter eventDataFormatter, - IServiceProvider services, IStreamNameResolver streamNameResolver) { + Guard.NotNull(snapshotStore, nameof(snapshotStore)); Guard.NotNull(eventStore, nameof(eventStore)); Guard.NotNull(eventDataFormatter, nameof(eventDataFormatter)); - Guard.NotNull(services, nameof(services)); Guard.NotNull(streamNameResolver, nameof(streamNameResolver)); + this.snapshotStore = snapshotStore; this.eventStore = eventStore; this.eventDataFormatter = eventDataFormatter; - this.services = services; this.streamNameResolver = streamNameResolver; } - public IPersistence WithEventSourcing(Type owner, TKey key, HandleEvent? applyEvent) + public Task ClearSnapshotsAsync() { - return CreatePersistence(owner, key, applyEvent); + return snapshotStore.ClearAsync(); } - public IPersistence WithSnapshots(Type owner, TKey key, HandleSnapshot? applySnapshot) + public IBatchContext WithBatchContext(Type owner) { - return CreatePersistence(owner, key, PersistenceMode.Snapshots, applySnapshot, null); + return new BatchContext(owner, + snapshotStore, + eventStore, + eventDataFormatter, + streamNameResolver); } - public IPersistence WithSnapshotsAndEventSourcing(Type owner, TKey key, HandleSnapshot? applySnapshot, HandleEvent? applyEvent) + public IPersistence WithEventSourcing(Type owner, DomainId key, HandleEvent? applyEvent) { - return CreatePersistence(owner, key, PersistenceMode.SnapshotsAndEventSourcing, - applySnapshot, applyEvent); + return CreatePersistence(owner, key, PersistenceMode.EventSourcing, null, applyEvent); } - private IPersistence CreatePersistence(Type owner, TKey key, HandleEvent? applyEvent) + public IPersistence WithSnapshots(Type owner, DomainId key, HandleSnapshot? applySnapshot) { - Guard.NotNull(key, nameof(key)); - - var snapshotStore = GetSnapshotStore(); - - return new Persistence(key, owner, eventStore, eventDataFormatter, - snapshotStore, streamNameResolver, applyEvent); + return CreatePersistence(owner, key, PersistenceMode.Snapshots, applySnapshot, null); } - private IPersistence CreatePersistence(Type owner, TKey key, PersistenceMode mode, HandleSnapshot? applySnapshot, HandleEvent? applyEvent) + public IPersistence WithSnapshotsAndEventSourcing(Type owner, DomainId key, HandleSnapshot? applySnapshot, HandleEvent? applyEvent) { - Guard.NotNull(key, nameof(key)); - - var snapshotStore = GetSnapshotStore(); - - return new Persistence(key, owner, eventStore, eventDataFormatter, - snapshotStore, streamNameResolver, mode, applySnapshot, applyEvent); + return CreatePersistence(owner, key, PersistenceMode.SnapshotsAndEventSourcing, applySnapshot, applyEvent); } - public ISnapshotStore GetSnapshotStore() + private IPersistence CreatePersistence(Type owner, DomainId key, PersistenceMode mode, HandleSnapshot? applySnapshot, HandleEvent? applyEvent) { - return (ISnapshotStore)services.GetService(typeof(ISnapshotStore))!; + Guard.NotNull(key, nameof(key)); + + return new Persistence(key, owner, + snapshotStore, + eventStore, + eventDataFormatter, + streamNameResolver, + mode, + applySnapshot, + applyEvent); } } } diff --git a/backend/src/Squidex.Infrastructure/States/StoreExtensions.cs b/backend/src/Squidex.Infrastructure/States/StoreExtensions.cs index 8fd4d1dbd..667098995 100644 --- a/backend/src/Squidex.Infrastructure/States/StoreExtensions.cs +++ b/backend/src/Squidex.Infrastructure/States/StoreExtensions.cs @@ -16,22 +16,5 @@ namespace Squidex.Infrastructure.States { return persistence.WriteEventsAsync(new[] { @event }); } - - public static Task ClearSnapshotsAsync(this IStore store) - { - return store.GetSnapshotStore().ClearAsync(); - } - - public static Task RemoveSnapshotAsync(this IStore store, TKey key) - { - return store.GetSnapshotStore().RemoveAsync(key); - } - - public static async Task GetSnapshotAsync(this IStore store, TKey key) - { - var result = await store.GetSnapshotStore().ReadAsync(key); - - return result.Value; - } } } diff --git a/backend/src/Squidex.Infrastructure/Tasks/PartitionedActionBlock.cs b/backend/src/Squidex.Infrastructure/Tasks/PartitionedActionBlock.cs index da818d2b6..ee2c921b3 100644 --- a/backend/src/Squidex.Infrastructure/Tasks/PartitionedActionBlock.cs +++ b/backend/src/Squidex.Infrastructure/Tasks/PartitionedActionBlock.cs @@ -42,7 +42,7 @@ namespace Squidex.Infrastructure.Tasks var workerOption = SimpleMapper.Map(dataflowBlockOptions, new ExecutionDataflowBlockOptions()); workerOption.MaxDegreeOfParallelism = 1; - workerOption.MaxMessagesPerTask = 1; + workerOption.MaxMessagesPerTask = DataflowBlockOptions.Unbounded; workers[i] = new ActionBlock(action, workerOption); } @@ -50,7 +50,7 @@ namespace Squidex.Infrastructure.Tasks var distributorOption = new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 1, - MaxMessagesPerTask = 1, + MaxMessagesPerTask = DataflowBlockOptions.Unbounded, BoundedCapacity = 1 }; diff --git a/backend/src/Squidex/Config/Domain/StoreServices.cs b/backend/src/Squidex/Config/Domain/StoreServices.cs index 169ce3e81..502eb73e6 100644 --- a/backend/src/Squidex/Config/Domain/StoreServices.cs +++ b/backend/src/Squidex/Config/Domain/StoreServices.cs @@ -53,7 +53,7 @@ namespace Squidex.Config.Domain var mongoDatabaseName = config.GetRequiredValue("store:mongoDb:database"); var mongoContentDatabaseName = config.GetOptionalValue("store:mongoDb:contentDatabase", mongoDatabaseName); - services.AddSingleton(typeof(ISnapshotStore<,>), typeof(MongoSnapshotStore<,>)); + services.AddSingleton(typeof(ISnapshotStore<>), typeof(MongoSnapshotStore<>)); services.AddSingletonAs(c => GetClient(mongoConfiguration)) .As(); @@ -110,13 +110,13 @@ namespace Squidex.Config.Domain .As>().As(); services.AddSingletonAs() - .As().As>(); + .As().As>(); services.AddSingletonAs() - .As().As>(); + .As().As>(); services.AddSingletonAs(c => ActivatorUtilities.CreateInstance(c, GetDatabase(c, mongoContentDatabaseName), false)) - .As().As>(); + .As().As>(); services.AddSingletonAs() .AsOptional().As(); @@ -138,6 +138,8 @@ namespace Squidex.Config.Domain }); services.AddSingleton(typeof(IStore<>), typeof(Store<>)); + + services.AddSingleton(typeof(IPersistenceFactory<>), typeof(Store<>)); } private static IMongoClient GetClient(string configuration) diff --git a/backend/tests/Squidex.Domain.Apps.Entities.Tests/Apps/DomainObject/AppDomainObjectTests.cs b/backend/tests/Squidex.Domain.Apps.Entities.Tests/Apps/DomainObject/AppDomainObjectTests.cs index 2f31e2916..dd4d67c5d 100644 --- a/backend/tests/Squidex.Domain.Apps.Entities.Tests/Apps/DomainObject/AppDomainObjectTests.cs +++ b/backend/tests/Squidex.Domain.Apps.Entities.Tests/Apps/DomainObject/AppDomainObjectTests.cs @@ -69,7 +69,7 @@ namespace Squidex.Domain.Apps.Entities.Apps.DomainObject { patternId2, new AppPattern("Numbers", "[0-9]*") } }; - sut = new AppDomainObject(Store, A.Dummy(), initialPatterns, appPlansProvider, appPlansBillingManager, userResolver); + sut = new AppDomainObject(PersistenceFactory, A.Dummy(), initialPatterns, appPlansProvider, appPlansBillingManager, userResolver); sut.Setup(Id); } diff --git a/backend/tests/Squidex.Domain.Apps.Entities.Tests/Assets/AssetPermanentDeleterTests.cs b/backend/tests/Squidex.Domain.Apps.Entities.Tests/Assets/AssetPermanentDeleterTests.cs index f8cfedfcb..08e7f50e3 100644 --- a/backend/tests/Squidex.Domain.Apps.Entities.Tests/Assets/AssetPermanentDeleterTests.cs +++ b/backend/tests/Squidex.Domain.Apps.Entities.Tests/Assets/AssetPermanentDeleterTests.cs @@ -54,6 +54,17 @@ namespace Squidex.Domain.Apps.Entities.Assets Assert.Equal(nameof(AssetPermanentDeleter), consumer.Name); } + [Fact] + public async Task Should_not_delete_assets_when_event_restored() + { + var @event = new AssetDeleted { AppId = appId, AssetId = DomainId.NewGuid() }; + + await sut.On(Envelope.Create(@event).SetRestored()); + + A.CallTo(() => assetFiletore.DeleteAsync(appId.Id, @event.AssetId, A._)) + .MustNotHaveHappened(); + } + [Fact] public async Task Should_delete_assets_for_all_versions() { diff --git a/backend/tests/Squidex.Domain.Apps.Entities.Tests/Assets/DomainObject/AssetDomainObjectTests.cs b/backend/tests/Squidex.Domain.Apps.Entities.Tests/Assets/DomainObject/AssetDomainObjectTests.cs index 220db0305..22cc1a1f5 100644 --- a/backend/tests/Squidex.Domain.Apps.Entities.Tests/Assets/DomainObject/AssetDomainObjectTests.cs +++ b/backend/tests/Squidex.Domain.Apps.Entities.Tests/Assets/DomainObject/AssetDomainObjectTests.cs @@ -46,7 +46,7 @@ namespace Squidex.Domain.Apps.Entities.Assets.DomainObject A.CallTo(() => tagService.NormalizeTagsAsync(AppId, TagGroups.Assets, A>._, A>._)) .ReturnsLazily(x => Task.FromResult(x.GetArgument>(2)?.ToDictionary(x => x) ?? new Dictionary())); - sut = new AssetDomainObject(Store, A.Dummy(), tagService, assetQuery, contentRepository); + sut = new AssetDomainObject(PersistenceFactory, A.Dummy(), tagService, assetQuery, contentRepository); sut.Setup(Id); } diff --git a/backend/tests/Squidex.Domain.Apps.Entities.Tests/Assets/DomainObject/AssetFolderDomainObjectTests.cs b/backend/tests/Squidex.Domain.Apps.Entities.Tests/Assets/DomainObject/AssetFolderDomainObjectTests.cs index 894bf56d1..6c10a252a 100644 --- a/backend/tests/Squidex.Domain.Apps.Entities.Tests/Assets/DomainObject/AssetFolderDomainObjectTests.cs +++ b/backend/tests/Squidex.Domain.Apps.Entities.Tests/Assets/DomainObject/AssetFolderDomainObjectTests.cs @@ -34,7 +34,7 @@ namespace Squidex.Domain.Apps.Entities.Assets.DomainObject A.CallTo(() => assetQuery.FindAssetFolderAsync(AppId, parentId)) .Returns(new List { A.Fake() }); - sut = new AssetFolderDomainObject(Store, A.Dummy(), assetQuery); + sut = new AssetFolderDomainObject(PersistenceFactory, A.Dummy(), assetQuery); sut.Setup(Id); } diff --git a/backend/tests/Squidex.Domain.Apps.Entities.Tests/Assets/RecursiveDeleterTests.cs b/backend/tests/Squidex.Domain.Apps.Entities.Tests/Assets/RecursiveDeleterTests.cs index a59af9d6d..8d003c967 100644 --- a/backend/tests/Squidex.Domain.Apps.Entities.Tests/Assets/RecursiveDeleterTests.cs +++ b/backend/tests/Squidex.Domain.Apps.Entities.Tests/Assets/RecursiveDeleterTests.cs @@ -61,6 +61,17 @@ namespace Squidex.Domain.Apps.Entities.Assets Assert.Equal(nameof(RecursiveDeleter), consumer.Name); } + [Fact] + public async Task Should_Not_invoke_delete_commands_when_event_restored() + { + var @event = new AssetFolderDeleted { AppId = appId, AssetFolderId = DomainId.NewGuid() }; + + await sut.On(Envelope.Create(@event).SetRestored()); + + A.CallTo(() => commandBus.PublishAsync(A._)) + .MustNotHaveHappened(); + } + [Fact] public async Task Should_invoke_delete_commands_for_all_subfolders() { diff --git a/backend/tests/Squidex.Domain.Apps.Entities.Tests/Backup/BackupCompatibilityTests.cs b/backend/tests/Squidex.Domain.Apps.Entities.Tests/Backup/BackupCompatibilityTests.cs index fbb728a75..3b22d4c14 100644 --- a/backend/tests/Squidex.Domain.Apps.Entities.Tests/Backup/BackupCompatibilityTests.cs +++ b/backend/tests/Squidex.Domain.Apps.Entities.Tests/Backup/BackupCompatibilityTests.cs @@ -49,14 +49,14 @@ namespace Squidex.Domain.Apps.Entities.Backup } [Fact] - public async Task Should_throw_exception_if_backup_has_no_version() + public async Task Should_not_throw_exception_if_backup_has_no_version() { var reader = A.Fake(); A.CallTo(() => reader.ReadJsonAsync(A._)) .Throws(new FileNotFoundException()); - await Assert.ThrowsAsync(() => reader.CheckCompatibilityAsync()); + await reader.CheckCompatibilityAsync(); } } } diff --git a/backend/tests/Squidex.Domain.Apps.Entities.Tests/Backup/StreamMapperTests.cs b/backend/tests/Squidex.Domain.Apps.Entities.Tests/Backup/StreamMapperTests.cs new file mode 100644 index 000000000..cccc84aa2 --- /dev/null +++ b/backend/tests/Squidex.Domain.Apps.Entities.Tests/Backup/StreamMapperTests.cs @@ -0,0 +1,70 @@ +// ========================================================================== +// Squidex Headless CMS +// ========================================================================== +// Copyright (c) Squidex UG (haftungsbeschraenkt) +// All rights reserved. Licensed under the MIT license. +// ========================================================================== + +using FakeItEasy; +using Squidex.Infrastructure; +using Xunit; + +namespace Squidex.Domain.Apps.Entities.Backup +{ + public class StreamMapperTests + { + private readonly DomainId appIdOld = DomainId.NewGuid(); + private readonly DomainId appId = DomainId.NewGuid(); + private readonly StreamMapper sut; + + public StreamMapperTests() + { + sut = new StreamMapper(new RestoreContext(appId, + A.Fake(), + A.Fake(), + appIdOld)); + } + + [Fact] + public void Should_map_old_app_id() + { + var result = sut.Map($"app-{appIdOld}"); + + Assert.Equal(($"app-{appId}", appId), result); + } + + [Fact] + public void Should_map_old_app_broken_id() + { + var result = sut.Map($"app-{appIdOld}--{appIdOld}"); + + Assert.Equal(($"app-{appId}", appId), result); + } + + [Fact] + public void Should_map_non_app_id() + { + var result = sut.Map($"content-{appIdOld}--123"); + + Assert.Equal(($"content-{appId}--123", DomainId.Create($"{appId}--123")), result); + } + + [Fact] + public void Should_map_non_app_id_with_double_slash() + { + var result = sut.Map($"content-{appIdOld}--other--id"); + + Assert.Equal(($"content-{appId}--other--id", DomainId.Create($"{appId}--other--id")), result); + } + + [Fact] + public void Should_map_non_combined_id() + { + var id = DomainId.NewGuid(); + + var result = sut.Map($"content-{id}"); + + Assert.Equal(($"content-{appId}--{id}", DomainId.Create($"{appId}--{id}")), result); + } + } +} diff --git a/backend/tests/Squidex.Domain.Apps.Entities.Tests/Contents/DomainObject/ContentDomainObjectTests.cs b/backend/tests/Squidex.Domain.Apps.Entities.Tests/Contents/DomainObject/ContentDomainObjectTests.cs index afdf60744..d58ff451d 100644 --- a/backend/tests/Squidex.Domain.Apps.Entities.Tests/Contents/DomainObject/ContentDomainObjectTests.cs +++ b/backend/tests/Squidex.Domain.Apps.Entities.Tests/Contents/DomainObject/ContentDomainObjectTests.cs @@ -5,7 +5,6 @@ // All rights reserved. Licensed under the MIT license. // ========================================================================== -using System.Linq; using System.Threading.Tasks; using FakeItEasy; using Microsoft.Extensions.DependencyInjection; @@ -119,7 +118,7 @@ namespace Squidex.Domain.Apps.Entities.Contents.DomainObject .AddSingleton(new DefaultValidatorsFactory()) .BuildServiceProvider(); - sut = new ContentDomainObject(Store, log, serviceProvider); + sut = new ContentDomainObject(PersistenceFactory, log, serviceProvider); sut.Setup(Id); } diff --git a/backend/tests/Squidex.Domain.Apps.Entities.Tests/Rules/DomainObject/RuleDomainObjectTests.cs b/backend/tests/Squidex.Domain.Apps.Entities.Tests/Rules/DomainObject/RuleDomainObjectTests.cs index d8f7f82ef..0fcb9690f 100644 --- a/backend/tests/Squidex.Domain.Apps.Entities.Tests/Rules/DomainObject/RuleDomainObjectTests.cs +++ b/backend/tests/Squidex.Domain.Apps.Entities.Tests/Rules/DomainObject/RuleDomainObjectTests.cs @@ -38,7 +38,7 @@ namespace Squidex.Domain.Apps.Entities.Rules.DomainObject public RuleDomainObjectTests() { - sut = new RuleDomainObject(Store, A.Dummy(), appProvider, ruleEnqueuer); + sut = new RuleDomainObject(PersistenceFactory, A.Dummy(), appProvider, ruleEnqueuer); sut.Setup(Id); } diff --git a/backend/tests/Squidex.Domain.Apps.Entities.Tests/Rules/RuleEnqueuerTests.cs b/backend/tests/Squidex.Domain.Apps.Entities.Tests/Rules/RuleEnqueuerTests.cs index 70d127b37..2b94341bd 100644 --- a/backend/tests/Squidex.Domain.Apps.Entities.Tests/Rules/RuleEnqueuerTests.cs +++ b/backend/tests/Squidex.Domain.Apps.Entities.Tests/Rules/RuleEnqueuerTests.cs @@ -97,11 +97,36 @@ namespace Squidex.Domain.Apps.Entities.Rules { var @event = Envelope.Create(new ContentCreated { AppId = appId }); - var rule1 = CreateRule(); - var rule2 = CreateRule(); + var job1 = new RuleJob { Created = now }; + + SetupRules(@event, job1); + + await sut.On(@event); + + A.CallTo(() => ruleEventRepository.EnqueueAsync(job1, (Exception?)null)) + .MustHaveHappened(); + } + + [Fact] + public async Task Should_not_eqneue_when_event_restored() + { + var @event = Envelope.Create(new ContentCreated { AppId = appId }); var job1 = new RuleJob { Created = now }; + SetupRules(@event, job1); + + await sut.On(@event.SetRestored(true)); + + A.CallTo(() => ruleEventRepository.EnqueueAsync(A._, A._)) + .MustNotHaveHappened(); + } + + private void SetupRules(Envelope @event, RuleJob job1) + { + var rule1 = CreateRule(); + var rule2 = CreateRule(); + A.CallTo(() => appProvider.GetRulesAsync(appId.Id)) .Returns(new List { rule1, rule2 }); @@ -110,11 +135,6 @@ namespace Squidex.Domain.Apps.Entities.Rules A.CallTo(() => ruleService.CreateJobsAsync(rule2.RuleDef, rule2.Id, @event, true)) .Returns(new List<(RuleJob, Exception?)>()); - - await sut.On(@event); - - A.CallTo(() => ruleEventRepository.EnqueueAsync(job1, (Exception?)null)) - .MustHaveHappened(); } private static RuleEntity CreateRule() diff --git a/backend/tests/Squidex.Domain.Apps.Entities.Tests/Schemas/DomainObject/SchemaDomainObjectTests.cs b/backend/tests/Squidex.Domain.Apps.Entities.Tests/Schemas/DomainObject/SchemaDomainObjectTests.cs index 6cf79ca23..eb37cf5c7 100644 --- a/backend/tests/Squidex.Domain.Apps.Entities.Tests/Schemas/DomainObject/SchemaDomainObjectTests.cs +++ b/backend/tests/Squidex.Domain.Apps.Entities.Tests/Schemas/DomainObject/SchemaDomainObjectTests.cs @@ -37,7 +37,7 @@ namespace Squidex.Domain.Apps.Entities.Schemas.DomainObject public SchemaDomainObjectTests() { - sut = new SchemaDomainObject(Store, A.Dummy()); + sut = new SchemaDomainObject(PersistenceFactory, A.Dummy()); sut.Setup(Id); } diff --git a/backend/tests/Squidex.Domain.Apps.Entities.Tests/TestHelpers/HandlerTestBase.cs b/backend/tests/Squidex.Domain.Apps.Entities.Tests/TestHelpers/HandlerTestBase.cs index ed609dbfa..8d53a04fa 100644 --- a/backend/tests/Squidex.Domain.Apps.Entities.Tests/TestHelpers/HandlerTestBase.cs +++ b/backend/tests/Squidex.Domain.Apps.Entities.Tests/TestHelpers/HandlerTestBase.cs @@ -23,9 +23,8 @@ namespace Squidex.Domain.Apps.Entities.TestHelpers { public abstract class HandlerTestBase { - private readonly IStore store = A.Fake>(); - private readonly IPersistence persistenceWithState = A.Fake>(); - private readonly IPersistence persistence = A.Fake(); + private readonly IPersistenceFactory persistenceFactory = A.Fake>(); + private readonly IPersistence persistence = A.Fake>(); protected RefToken Actor { get; } = RefToken.User("me"); @@ -53,30 +52,24 @@ namespace Squidex.Domain.Apps.Entities.TestHelpers protected abstract DomainId Id { get; } - public IStore Store + public IPersistenceFactory PersistenceFactory { - get => store; + get => persistenceFactory; } public IEnumerable> LastEvents { get; private set; } = Enumerable.Empty>(); protected HandlerTestBase() { - A.CallTo(() => store.WithSnapshotsAndEventSourcing(A._, Id, A>._, A._)) - .Returns(persistenceWithState); - - A.CallTo(() => store.WithEventSourcing(A._, Id, A._)) + A.CallTo(() => persistenceFactory.WithSnapshotsAndEventSourcing(A._, Id, A>._, A._)) .Returns(persistence); - A.CallTo(() => persistenceWithState.WriteEventsAsync(A>>._)) - .Invokes((IReadOnlyList> events) => LastEvents = events); + A.CallTo(() => persistenceFactory.WithEventSourcing(A._, Id, A._)) + .Returns(persistence); A.CallTo(() => persistence.WriteEventsAsync(A>>._)) .Invokes((IReadOnlyList> events) => LastEvents = events); - A.CallTo(() => persistenceWithState.DeleteAsync()) - .Invokes(() => LastEvents = Enumerable.Empty>()); - A.CallTo(() => persistence.DeleteAsync()) .Invokes(() => LastEvents = Enumerable.Empty>()); } diff --git a/backend/tests/Squidex.Domain.Users.Tests/DefaultKeyStoreTests.cs b/backend/tests/Squidex.Domain.Users.Tests/DefaultKeyStoreTests.cs index ea481a613..eef7fa8cf 100644 --- a/backend/tests/Squidex.Domain.Users.Tests/DefaultKeyStoreTests.cs +++ b/backend/tests/Squidex.Domain.Users.Tests/DefaultKeyStoreTests.cs @@ -5,9 +5,9 @@ // All rights reserved. Licensed under the MIT license. // ========================================================================== -using System; using System.Threading.Tasks; using FakeItEasy; +using Squidex.Infrastructure; using Squidex.Infrastructure.States; using Xunit; @@ -15,7 +15,7 @@ namespace Squidex.Domain.Users { public class DefaultKeyStoreTests { - private readonly ISnapshotStore store = A.Fake>(); + private readonly ISnapshotStore store = A.Fake>(); private readonly DefaultKeyStore sut; public DefaultKeyStoreTests() @@ -26,7 +26,7 @@ namespace Squidex.Domain.Users [Fact] public async Task Should_generate_signing_credentials_once() { - A.CallTo(() => store.ReadAsync(A._)) + A.CallTo(() => store.ReadAsync(A._)) .Returns((null!, 0)); var credentials1 = await sut.GetSigningCredentialsAsync(); @@ -34,17 +34,17 @@ namespace Squidex.Domain.Users Assert.Same(credentials1, credentials2); - A.CallTo(() => store.ReadAsync(A._)) + A.CallTo(() => store.ReadAsync(A._)) .MustHaveHappenedOnceExactly(); - A.CallTo(() => store.WriteAsync(A._, A._, 0, 0)) + A.CallTo(() => store.WriteAsync(A._, A._, 0, 0)) .MustHaveHappenedOnceExactly(); } [Fact] public async Task Should_generate_validation_keys_once() { - A.CallTo(() => store.ReadAsync(A._)) + A.CallTo(() => store.ReadAsync(A._)) .Returns((null!, 0)); var credentials1 = await sut.GetValidationKeysAsync(); @@ -52,10 +52,10 @@ namespace Squidex.Domain.Users Assert.Same(credentials1, credentials2); - A.CallTo(() => store.ReadAsync(A._)) + A.CallTo(() => store.ReadAsync(A._)) .MustHaveHappenedOnceExactly(); - A.CallTo(() => store.WriteAsync(A._, A._, 0, 0)) + A.CallTo(() => store.WriteAsync(A._, A._, 0, 0)) .MustHaveHappenedOnceExactly(); } } diff --git a/backend/tests/Squidex.Domain.Users.Tests/DefaultXmlRepositoryTests.cs b/backend/tests/Squidex.Domain.Users.Tests/DefaultXmlRepositoryTests.cs index 1fc1fa3fe..816c9e737 100644 --- a/backend/tests/Squidex.Domain.Users.Tests/DefaultXmlRepositoryTests.cs +++ b/backend/tests/Squidex.Domain.Users.Tests/DefaultXmlRepositoryTests.cs @@ -10,6 +10,7 @@ using System.Threading; using System.Threading.Tasks; using System.Xml.Linq; using FakeItEasy; +using Squidex.Infrastructure; using Squidex.Infrastructure.States; using Xunit; @@ -17,7 +18,7 @@ namespace Squidex.Domain.Users { public sealed class DefaultXmlRepositoryTests { - private readonly ISnapshotStore store = A.Fake>(); + private readonly ISnapshotStore store = A.Fake>(); private readonly DefaultXmlRepository sut; public DefaultXmlRepositoryTests() @@ -54,7 +55,7 @@ namespace Squidex.Domain.Users sut.StoreElement(xml, "name"); - A.CallTo(() => store.WriteAsync("name", A._, A._, 0)) + A.CallTo(() => store.WriteAsync(DomainId.Create("name"), A._, A._, 0)) .MustHaveHappened(); } } diff --git a/backend/tests/Squidex.Infrastructure.Tests/Commands/DomainObjectTests.cs b/backend/tests/Squidex.Infrastructure.Tests/Commands/DomainObjectTests.cs index 072bb4c54..b9b2efc2f 100644 --- a/backend/tests/Squidex.Infrastructure.Tests/Commands/DomainObjectTests.cs +++ b/backend/tests/Squidex.Infrastructure.Tests/Commands/DomainObjectTests.cs @@ -18,14 +18,14 @@ namespace Squidex.Infrastructure.Commands { public class DomainObjectTests { - private readonly IStore store = A.Fake>(); + private readonly IPersistenceFactory persistenceFactory = A.Fake>(); private readonly IPersistence persistence = A.Fake>(); private readonly DomainId id = DomainId.NewGuid(); private readonly MyDomainObject sut; public DomainObjectTests() { - sut = new MyDomainObject(store); + sut = new MyDomainObject(persistenceFactory); } [Fact] @@ -338,9 +338,9 @@ namespace Squidex.Infrastructure.Commands SetupCreated(4); SetupDeleted(); - var deleteStream = A.Fake(); + var deleteStream = A.Fake>(); - A.CallTo(() => store.WithEventSourcing(typeof(MyDomainObject), DomainId.Combine(id, DomainId.Create("deleted")), null)) + A.CallTo(() => persistenceFactory.WithEventSourcing(typeof(MyDomainObject), DomainId.Combine(id, DomainId.Create("deleted")), null)) .Returns(deleteStream); await sut.ExecuteAsync(new DeletePermanent()); @@ -376,7 +376,7 @@ namespace Squidex.Infrastructure.Commands AssertSnapshot(version_0, 3, 0); AssertSnapshot(version_1, 4, 1); - A.CallTo(() => store.WithEventSourcing(typeof(MyDomainObject), id, A._)) + A.CallTo(() => persistenceFactory.WithEventSourcing(typeof(MyDomainObject), id, A._)) .MustNotHaveHappened(); } @@ -400,7 +400,7 @@ namespace Squidex.Infrastructure.Commands AssertSnapshot(version_0, 3, 0); AssertSnapshot(version_1, 4, 1); - A.CallTo(() => store.WithEventSourcing(typeof(MyDomainObject), id, A._)) + A.CallTo(() => persistenceFactory.WithEventSourcing(typeof(MyDomainObject), id, A._)) .MustHaveHappened(); } @@ -428,7 +428,7 @@ namespace Squidex.Infrastructure.Commands handleEvent(Envelope.Create(new ValueChanged { Value = value })); }); - A.CallTo(() => store.WithSnapshotsAndEventSourcing(typeof(MyDomainObject), id, A>._, A._)) + A.CallTo(() => persistenceFactory.WithSnapshotsAndEventSourcing(typeof(MyDomainObject), id, A>._, A._)) .Invokes(args => { handleEvent = args.GetArgument(3)!; @@ -450,9 +450,9 @@ namespace Squidex.Infrastructure.Commands A.CallTo(() => persistence.WriteEventsAsync(A>>._)) .Invokes(c => @events.AddRange(c.GetArgument>>(0)!)); - var eventsPersistence = A.Fake(); + var eventsPersistence = A.Fake>(); - A.CallTo(() => store.WithEventSourcing(typeof(MyDomainObject), id, A._)) + A.CallTo(() => persistenceFactory.WithEventSourcing(typeof(MyDomainObject), id, A._)) .Invokes(args => { handleEvent = args.GetArgument(2)!; @@ -471,7 +471,7 @@ namespace Squidex.Infrastructure.Commands private void SetupEmpty() { - A.CallTo(() => store.WithSnapshotsAndEventSourcing(typeof(MyDomainObject), id, A>._, A._)) + A.CallTo(() => persistenceFactory.WithSnapshotsAndEventSourcing(typeof(MyDomainObject), id, A>._, A._)) .Returns(persistence); A.CallTo(() => persistence.Version) diff --git a/backend/tests/Squidex.Infrastructure.Tests/EventSourcing/EventStoreTests.cs b/backend/tests/Squidex.Infrastructure.Tests/EventSourcing/EventStoreTests.cs index 647b8d90e..b8f51171c 100644 --- a/backend/tests/Squidex.Infrastructure.Tests/EventSourcing/EventStoreTests.cs +++ b/backend/tests/Squidex.Infrastructure.Tests/EventSourcing/EventStoreTests.cs @@ -62,8 +62,8 @@ namespace Squidex.Infrastructure.EventSourcing var events = new[] { - new EventData("Type1", new EnvelopeHeaders(), "1"), - new EventData("Type2", new EnvelopeHeaders(), "2") + CreateEventData(1), + CreateEventData(2) }; await Assert.ThrowsAsync(() => Sut.AppendAsync(Guid.NewGuid(), streamName, 0, events)); @@ -76,8 +76,8 @@ namespace Squidex.Infrastructure.EventSourcing var events = new[] { - new EventData("Type1", new EnvelopeHeaders(), "1"), - new EventData("Type2", new EnvelopeHeaders(), "2") + CreateEventData(1), + CreateEventData(2) }; await Sut.AppendAsync(Guid.NewGuid(), streamName, events); @@ -92,8 +92,8 @@ namespace Squidex.Infrastructure.EventSourcing var events = new[] { - new EventData("Type1", new EnvelopeHeaders(), "1"), - new EventData("Type2", new EnvelopeHeaders(), "2") + CreateEventData(1), + CreateEventData(2) }; await Sut.AppendAsync(Guid.NewGuid(), streamName, events); @@ -112,14 +112,14 @@ namespace Squidex.Infrastructure.EventSourcing } [Fact] - public async Task Should_append_event_unsafe() + public async Task Should_append_events_unsafe() { var streamName = $"test-{Guid.NewGuid()}"; var events = new[] { - new EventData("Type1", new EnvelopeHeaders(), "1"), - new EventData("Type2", new EnvelopeHeaders(), "2") + CreateEventData(1), + CreateEventData(2) }; await Sut.AppendUnsafeAsync(new List @@ -147,8 +147,8 @@ namespace Squidex.Infrastructure.EventSourcing var events = new[] { - new EventData("Type1", new EnvelopeHeaders(), "1"), - new EventData("Type2", new EnvelopeHeaders(), "2") + CreateEventData(1), + CreateEventData(2) }; var readEvents = await QueryWithSubscriptionAsync(streamName, async () => @@ -172,8 +172,8 @@ namespace Squidex.Infrastructure.EventSourcing var events1 = new[] { - new EventData("Type1", new EnvelopeHeaders(), "1"), - new EventData("Type2", new EnvelopeHeaders(), "2") + CreateEventData(1), + CreateEventData(2) }; await QueryWithSubscriptionAsync(streamName, async () => @@ -183,8 +183,8 @@ namespace Squidex.Infrastructure.EventSourcing var events2 = new[] { - new EventData("Type1", new EnvelopeHeaders(), "1"), - new EventData("Type2", new EnvelopeHeaders(), "2") + CreateEventData(1), + CreateEventData(2) }; var readEventsFromPosition = await QueryWithSubscriptionAsync(streamName, async () => @@ -219,8 +219,8 @@ namespace Squidex.Infrastructure.EventSourcing var events = new[] { - new EventData("Type1", new EnvelopeHeaders(), "1"), - new EventData("Type2", new EnvelopeHeaders(), "2") + CreateEventData(1), + CreateEventData(2) }; await Sut.AppendAsync(Guid.NewGuid(), streamName, events); @@ -239,6 +239,45 @@ namespace Squidex.Infrastructure.EventSourcing ShouldBeEquivalentTo(readEvents2, expected); } + [Fact] + public async Task Should_read_multiple_streams() + { + var streamName1 = $"test-{Guid.NewGuid()}"; + var streamName2 = $"test-{Guid.NewGuid()}"; + + var events1 = new[] + { + CreateEventData(1), + CreateEventData(2) + }; + + var events2 = new[] + { + CreateEventData(3), + CreateEventData(4) + }; + + await Sut.AppendAsync(Guid.NewGuid(), streamName1, events1); + await Sut.AppendAsync(Guid.NewGuid(), streamName2, events2); + + var readEvents = await Sut.QueryManyAsync(new[] { streamName1, streamName2 }); + + var expected1 = new[] + { + new StoredEvent(streamName1, "Position", 0, events1[0]), + new StoredEvent(streamName1, "Position", 1, events1[1]) + }; + + var expected2 = new[] + { + new StoredEvent(streamName2, "Position", 0, events2[0]), + new StoredEvent(streamName2, "Position", 1, events2[1]) + }; + + ShouldBeEquivalentTo(readEvents[streamName1], expected1); + ShouldBeEquivalentTo(readEvents[streamName2], expected2); + } + [Theory] [InlineData(30)] [InlineData(1000)] @@ -250,7 +289,7 @@ namespace Squidex.Infrastructure.EventSourcing for (var i = 0; i < count; i++) { - events.Add(new EventData($"Type{i}", new EnvelopeHeaders(), i.ToString())); + events.Add(CreateEventData(i)); } for (var i = 0; i < events.Count / 2; i++) @@ -285,8 +324,8 @@ namespace Squidex.Infrastructure.EventSourcing var events = new[] { - new EventData("Type1", new EnvelopeHeaders(), "1"), - new EventData("Type2", new EnvelopeHeaders(), "2") + CreateEventData(1), + CreateEventData(2) }; await Sut.AppendAsync(Guid.NewGuid(), streamName, events); @@ -303,6 +342,11 @@ namespace Squidex.Infrastructure.EventSourcing return Sut.QueryAsync(streamName, position); } + private static EventData CreateEventData(int i) + { + return new EventData($"Type{i}", new EnvelopeHeaders(), i.ToString()); + } + private async Task?> QueryWithCallbackAsync(string? streamFilter = null, string? position = null) { using (var cts = new CancellationTokenSource(30000)) @@ -370,9 +414,7 @@ namespace Squidex.Infrastructure.EventSourcing private static void ShouldBeEquivalentTo(IEnumerable? actual, params StoredEvent[] expected) { - var actualArray = actual?.Select(x => new StoredEvent(x.StreamName, "Position", x.EventStreamNumber, x.Data)).ToArray(); - - actualArray.Should().BeEquivalentTo(expected); + actual.Should().BeEquivalentTo(expected, opts => opts.Excluding(x => x.EventPosition)); } } } diff --git a/backend/tests/Squidex.Infrastructure.Tests/States/PersistenceBatchTests.cs b/backend/tests/Squidex.Infrastructure.Tests/States/PersistenceBatchTests.cs new file mode 100644 index 000000000..89bc941ed --- /dev/null +++ b/backend/tests/Squidex.Infrastructure.Tests/States/PersistenceBatchTests.cs @@ -0,0 +1,164 @@ +// ========================================================================== +// Squidex Headless CMS +// ========================================================================== +// Copyright (c) Squidex UG (haftungsbeschränkt) +// All rights reserved. Licensed under the MIT license. +// ========================================================================== + +using System.Collections.Generic; +using System.Linq; +using System.Threading.Tasks; +using FakeItEasy; +using Squidex.Infrastructure.EventSourcing; +using Squidex.Infrastructure.TestHelpers; +using Xunit; + +namespace Squidex.Infrastructure.States +{ + public class PersistenceBatchTests + { + private readonly ISnapshotStore snapshotStore = A.Fake>(); + private readonly IEventDataFormatter eventDataFormatter = A.Fake(); + private readonly IEventStore eventStore = A.Fake(); + private readonly IStreamNameResolver streamNameResolver = A.Fake(); + private readonly IStore sut; + + public PersistenceBatchTests() + { + A.CallTo(() => streamNameResolver.GetStreamName(None.Type, A._)) + .ReturnsLazily(x => x.GetArgument(1)!); + + sut = new Store(snapshotStore, eventStore, eventDataFormatter, streamNameResolver); + } + + [Fact] + public async Task Should_read_from_preloaded_events() + { + var event1_1 = new MyEvent { MyProperty = "event1_1" }; + var event1_2 = new MyEvent { MyProperty = "event1_2" }; + var event2_1 = new MyEvent { MyProperty = "event2_1" }; + var event2_2 = new MyEvent { MyProperty = "event2_2" }; + + var key1 = DomainId.NewGuid(); + var key2 = DomainId.NewGuid(); + + var bulk = sut.WithBatchContext(None.Type); + + SetupEventStore(new Dictionary> + { + [key1] = new List { event1_1, event1_2 }, + [key2] = new List { event2_1, event2_2 } + }); + + await bulk.LoadAsync(new[] { key1, key2 }); + + var persistedEvents1 = Save.Events(); + var persistence1 = bulk.WithEventSourcing(None.Type, key1, persistedEvents1.Write); + + await persistence1.ReadAsync(); + + var persistedEvents2 = Save.Events(); + var persistence2 = bulk.WithEventSourcing(None.Type, key2, persistedEvents2.Write); + + await persistence2.ReadAsync(); + + Assert.Equal(persistedEvents1.ToArray(), new[] { event1_1, event1_2 }); + Assert.Equal(persistedEvents2.ToArray(), new[] { event2_1, event2_2 }); + } + + [Fact] + public async Task Should_provide_empty_events_if_nothing_loaded() + { + var key = DomainId.NewGuid(); + + var bulk = sut.WithBatchContext(None.Type); + + await bulk.LoadAsync(new[] { key }); + + var persistedEvents = Save.Events(); + var persistence = bulk.WithEventSourcing(None.Type, key, persistedEvents.Write); + + await persistence.ReadAsync(); + + Assert.Empty(persistedEvents.ToArray()); + Assert.Empty(persistedEvents.ToArray()); + } + + [Fact] + public void Should_throw_exception_if_not_preloaded() + { + var key = DomainId.NewGuid(); + + var bulk = sut.WithBatchContext(None.Type); + + Assert.Throws(() => bulk.WithEventSourcing(None.Type, key, null)); + } + + [Fact] + public async Task Should_write_batched() + { + var key1 = DomainId.NewGuid(); + var key2 = DomainId.NewGuid(); + + var bulk = sut.WithBatchContext(None.Type); + + await bulk.LoadAsync(new[] { key1, key2 }); + + var persistedEvents1 = Save.Events(); + var persistence1 = bulk.WithEventSourcing(None.Type, key1, persistedEvents1.Write); + + var persistedEvents2 = Save.Events(); + var persistence2 = bulk.WithEventSourcing(None.Type, key2, persistedEvents2.Write); + + await persistence1.WriteSnapshotAsync(12); + await persistence2.WriteSnapshotAsync(12); + + A.CallTo(() => snapshotStore.WriteAsync(A._, A._, A._, A._)) + .MustNotHaveHappened(); + + A.CallTo(() => snapshotStore.WriteManyAsync(A>._)) + .MustNotHaveHappened(); + + await bulk.CommitAsync(); + await bulk.DisposeAsync(); + + A.CallTo(() => snapshotStore.WriteManyAsync(A>.That.Matches(x => x.Count() == 2))) + .MustHaveHappenedOnceExactly(); + } + + private void SetupEventStore(Dictionary> streams) + { + var storedStreams = new Dictionary>(); + + foreach (var (id, stream) in streams) + { + var storedStream = new List(); + + var i = 0; + + foreach (var @event in stream) + { + var eventData = new EventData("Type", new EnvelopeHeaders(), "Payload"); + var eventStored = new StoredEvent(id.ToString(), i.ToString(), i, eventData); + + storedStream.Add(eventStored); + + A.CallTo(() => eventDataFormatter.Parse(eventStored)) + .Returns(new Envelope(@event)); + + A.CallTo(() => eventDataFormatter.ParseIfKnown(eventStored)) + .Returns(new Envelope(@event)); + + i++; + } + + storedStreams[id.ToString()] = storedStream; + } + + var streamNames = streams.Keys.Select(x => x.ToString()).ToArray(); + + A.CallTo(() => eventStore.QueryManyAsync(A>.That.IsSameSequenceAs(streamNames))) + .Returns(storedStreams); + } + } +} \ No newline at end of file diff --git a/backend/tests/Squidex.Infrastructure.Tests/States/PersistenceEventSourcingTests.cs b/backend/tests/Squidex.Infrastructure.Tests/States/PersistenceEventSourcingTests.cs index 6be16f4e6..7476230fd 100644 --- a/backend/tests/Squidex.Infrastructure.Tests/States/PersistenceEventSourcingTests.cs +++ b/backend/tests/Squidex.Infrastructure.Tests/States/PersistenceEventSourcingTests.cs @@ -18,33 +18,26 @@ namespace Squidex.Infrastructure.States { public class PersistenceEventSourcingTests { - private readonly string key = Guid.NewGuid().ToString(); + private readonly DomainId key = DomainId.NewGuid(); + private readonly ISnapshotStore snapshotStore = A.Fake>(); private readonly IEventDataFormatter eventDataFormatter = A.Fake(); private readonly IEventStore eventStore = A.Fake(); - private readonly IServiceProvider services = A.Fake(); - private readonly ISnapshotStore snapshotStore = A.Fake>(); - private readonly ISnapshotStore snapshotStoreNone = A.Fake>(); private readonly IStreamNameResolver streamNameResolver = A.Fake(); - private readonly IStore sut; + private readonly IStore sut; public PersistenceEventSourcingTests() { - A.CallTo(() => services.GetService(typeof(ISnapshotStore))) - .Returns(snapshotStore); - A.CallTo(() => services.GetService(typeof(ISnapshotStore))) - .Returns(snapshotStoreNone); + A.CallTo(() => streamNameResolver.GetStreamName(None.Type, A._)) + .ReturnsLazily(x => x.GetArgument(1)!); - A.CallTo(() => streamNameResolver.GetStreamName(None.Type, key)) - .Returns(key); - - sut = new Store(eventStore, eventDataFormatter, services, streamNameResolver); + sut = new Store(snapshotStore, eventStore, eventDataFormatter, streamNameResolver); } [Fact] public async Task Should_read_from_store() { - var event1 = new MyEvent(); - var event2 = new MyEvent(); + var event1 = new MyEvent { MyProperty = "event1" }; + var event2 = new MyEvent { MyProperty = "event2" }; SetupEventStore(event1, event2); @@ -59,8 +52,8 @@ namespace Squidex.Infrastructure.States [Fact] public async Task Should_read_until_stopped() { - var event1 = new MyEvent(); - var event2 = new MyEvent(); + var event1 = new MyEvent { MyProperty = "event1" }; + var event2 = new MyEvent { MyProperty = "event2" }; SetupEventStore(event1, event2); @@ -77,7 +70,7 @@ namespace Squidex.Infrastructure.States { var storedEvent = new StoredEvent("1", "1", 0, new EventData("Type", new EnvelopeHeaders(), "Payload")); - A.CallTo(() => eventStore.QueryAsync(key, 0)) + A.CallTo(() => eventStore.QueryAsync(key.ToString(), 0)) .Returns(new List { storedEvent }); A.CallTo(() => eventDataFormatter.ParseIfKnown(storedEvent)) @@ -106,7 +99,7 @@ namespace Squidex.Infrastructure.States await persistence.ReadAsync(); - A.CallTo(() => eventStore.QueryAsync(key, 3)) + A.CallTo(() => eventStore.QueryAsync(key.ToString(), 3)) .MustHaveHappened(); } @@ -202,12 +195,12 @@ namespace Squidex.Infrastructure.States await persistence.WriteEventAsync(Envelope.Create(new MyEvent())); await persistence.WriteEventAsync(Envelope.Create(new MyEvent())); - A.CallTo(() => eventStore.AppendAsync(A._, key, 2, A>.That.Matches(x => x.Count == 1))) + A.CallTo(() => eventStore.AppendAsync(A._, key.ToString(), 2, A>.That.Matches(x => x.Count == 1))) .MustHaveHappened(); - A.CallTo(() => eventStore.AppendAsync(A._, key, 3, A>.That.Matches(x => x.Count == 1))) + A.CallTo(() => eventStore.AppendAsync(A._, key.ToString(), 3, A>.That.Matches(x => x.Count == 1))) .MustHaveHappened(); - A.CallTo(() => snapshotStore.WriteAsync(A._, A._, A._, A._)) + A.CallTo(() => snapshotStore.WriteAsync(A._, A._, A._, A._)) .MustNotHaveHappened(); } @@ -218,7 +211,7 @@ namespace Squidex.Infrastructure.States await persistence.WriteEventAsync(Envelope.Create(new MyEvent())); - A.CallTo(() => eventStore.AppendAsync(A._, key, EtagVersion.Empty, A>.That.Matches(x => x.Count == 1))) + A.CallTo(() => eventStore.AppendAsync(A._, key.ToString(), EtagVersion.Empty, A>.That.Matches(x => x.Count == 1))) .MustHaveHappened(); } @@ -304,7 +297,7 @@ namespace Squidex.Infrastructure.States await persistence.ReadAsync(); - A.CallTo(() => eventStore.AppendAsync(A._, key, 2, A>.That.Matches(x => x.Count == 1))) + A.CallTo(() => eventStore.AppendAsync(A._, key.ToString(), 2, A>.That.Matches(x => x.Count == 1))) .Throws(new WrongEventVersionException(1, 1)); await Assert.ThrowsAsync(() => persistence.WriteEventAsync(Envelope.Create(new MyEvent()))); @@ -317,7 +310,7 @@ namespace Squidex.Infrastructure.States await persistence.DeleteAsync(); - A.CallTo(() => eventStore.DeleteStreamAsync(key)) + A.CallTo(() => eventStore.DeleteStreamAsync(key.ToString())) .MustHaveHappened(); A.CallTo(() => snapshotStore.RemoveAsync(key)) @@ -327,11 +320,11 @@ namespace Squidex.Infrastructure.States [Fact] public async Task Should_delete_events_and_snapshot_when_deleted() { - var persistence = sut.WithSnapshotsAndEventSourcing(None.Type, key, null, null); + var persistence = sut.WithSnapshotsAndEventSourcing(None.Type, key, null, null); await persistence.DeleteAsync(); - A.CallTo(() => eventStore.DeleteStreamAsync(key)) + A.CallTo(() => eventStore.DeleteStreamAsync(key.ToString())) .MustHaveHappened(); A.CallTo(() => snapshotStore.RemoveAsync(key)) @@ -357,7 +350,7 @@ namespace Squidex.Infrastructure.States foreach (var @event in events) { var eventData = new EventData("Type", new EnvelopeHeaders(), "Payload"); - var eventStored = new StoredEvent(i.ToString(), i.ToString(), i, eventData); + var eventStored = new StoredEvent(key.ToString(), i.ToString(), i, eventData); eventsStored.Add(eventStored); @@ -370,7 +363,7 @@ namespace Squidex.Infrastructure.States i++; } - A.CallTo(() => eventStore.QueryAsync(key, readPosition)) + A.CallTo(() => eventStore.QueryAsync(key.ToString(), readPosition)) .Returns(eventsStored); } } diff --git a/backend/tests/Squidex.Infrastructure.Tests/States/PersistenceSnapshotTests.cs b/backend/tests/Squidex.Infrastructure.Tests/States/PersistenceSnapshotTests.cs index 4e8c85586..68cdbabdd 100644 --- a/backend/tests/Squidex.Infrastructure.Tests/States/PersistenceSnapshotTests.cs +++ b/backend/tests/Squidex.Infrastructure.Tests/States/PersistenceSnapshotTests.cs @@ -15,20 +15,16 @@ namespace Squidex.Infrastructure.States { public class PersistenceSnapshotTests { - private readonly string key = Guid.NewGuid().ToString(); + private readonly DomainId key = DomainId.NewGuid(); + private readonly ISnapshotStore snapshotStore = A.Fake>(); private readonly IEventDataFormatter eventDataFormatter = A.Fake(); private readonly IEventStore eventStore = A.Fake(); - private readonly IServiceProvider services = A.Fake(); - private readonly ISnapshotStore snapshotStore = A.Fake>(); private readonly IStreamNameResolver streamNameResolver = A.Fake(); - private readonly IStore sut; + private readonly IStore sut; public PersistenceSnapshotTests() { - A.CallTo(() => services.GetService(typeof(ISnapshotStore))) - .Returns(snapshotStore); - - sut = new Store(eventStore, eventDataFormatter, services, streamNameResolver); + sut = new Store(snapshotStore, eventStore, eventDataFormatter, streamNameResolver); } [Fact] @@ -122,7 +118,7 @@ namespace Squidex.Infrastructure.States [Fact] public async Task Should_write_snapshot_to_store_with_empty_version() { - var persistence = sut.WithSnapshots(None.Type, key, null); + var persistence = sut.WithSnapshots(None.Type, key, null); await persistence.WriteSnapshotAsync(100); @@ -165,33 +161,10 @@ namespace Squidex.Infrastructure.States [Fact] public async Task Should_call_snapshot_store_on_clear() { - await sut.ClearSnapshotsAsync(); + await sut.ClearSnapshotsAsync(); A.CallTo(() => snapshotStore.ClearAsync()) .MustHaveHappened(); } - - [Fact] - public async Task Should_delete_snapshot_but_not_events_when_deleted_from_store() - { - await sut.RemoveSnapshotAsync(key); - - A.CallTo(() => eventStore.DeleteStreamAsync(A._)) - .MustNotHaveHappened(); - - A.CallTo(() => snapshotStore.RemoveAsync(key)) - .MustHaveHappened(); - } - - [Fact] - public async Task Should_get_snapshot() - { - A.CallTo(() => snapshotStore.ReadAsync(key)) - .Returns((123, -1)); - - var result = await sut.GetSnapshotAsync(key); - - Assert.Equal(123, result); - } } } \ No newline at end of file diff --git a/backend/tests/Squidex.Infrastructure.Tests/Tasks/PartitionedActionBlockTests.cs b/backend/tests/Squidex.Infrastructure.Tests/Tasks/PartitionedActionBlockTests.cs index 55012fa4e..067b9a186 100644 --- a/backend/tests/Squidex.Infrastructure.Tests/Tasks/PartitionedActionBlockTests.cs +++ b/backend/tests/Squidex.Infrastructure.Tests/Tasks/PartitionedActionBlockTests.cs @@ -39,7 +39,7 @@ namespace Squidex.Infrastructure.Tasks }, x => x.P, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 100, - MaxMessagesPerTask = 1, + MaxMessagesPerTask = DataflowBlockOptions.Unbounded, BoundedCapacity = 100 }); diff --git a/backend/tests/Squidex.Infrastructure.Tests/TestHelpers/MyDomainObject.cs b/backend/tests/Squidex.Infrastructure.Tests/TestHelpers/MyDomainObject.cs index 3e1857505..e30bf64b8 100644 --- a/backend/tests/Squidex.Infrastructure.Tests/TestHelpers/MyDomainObject.cs +++ b/backend/tests/Squidex.Infrastructure.Tests/TestHelpers/MyDomainObject.cs @@ -24,8 +24,8 @@ namespace Squidex.Infrastructure.TestHelpers set => Capacity = value; } - public MyDomainObject(IStore store) - : base(store, A.Dummy()) + public MyDomainObject(IPersistenceFactory factory) + : base(factory, A.Dummy()) { }