diff --git a/src/Squidex.Domain.Apps.Entities.MongoDb/Contents/MongoContentCollection.cs b/src/Squidex.Domain.Apps.Entities.MongoDb/Contents/MongoContentCollection.cs index f77be31ba..f3c95069c 100644 --- a/src/Squidex.Domain.Apps.Entities.MongoDb/Contents/MongoContentCollection.cs +++ b/src/Squidex.Domain.Apps.Entities.MongoDb/Contents/MongoContentCollection.cs @@ -11,50 +11,57 @@ using System.Linq; using System.Threading; using System.Threading.Tasks; using MongoDB.Driver; +using NodaTime; using Squidex.Domain.Apps.Core.Contents; using Squidex.Domain.Apps.Entities.Apps; using Squidex.Domain.Apps.Entities.Contents; +using Squidex.Domain.Apps.Entities.Contents.State; using Squidex.Domain.Apps.Entities.MongoDb.Contents.Visitors; using Squidex.Domain.Apps.Entities.Schemas; using Squidex.Infrastructure; using Squidex.Infrastructure.Json; using Squidex.Infrastructure.MongoDb; using Squidex.Infrastructure.Queries; +using Squidex.Infrastructure.Reflection; +using Squidex.Infrastructure.States; namespace Squidex.Domain.Apps.Entities.MongoDb.Contents { internal class MongoContentCollection : MongoRepositoryBase { - private readonly string collectionName; - protected IJsonSerializer Serializer { get; } - public MongoContentCollection(IMongoDatabase database, IJsonSerializer serializer, string collectionName) + public MongoContentCollection(IMongoDatabase database, IJsonSerializer serializer) : base(database) { - this.collectionName = collectionName; - Serializer = serializer; } - protected override async Task SetupCollectionAsync(IMongoCollection collection, CancellationToken ct = default) + protected override Task SetupCollectionAsync(IMongoCollection collection, CancellationToken ct = default) { - await collection.Indexes.CreateOneAsync( - new CreateIndexModel(Index.Ascending(x => x.ReferencedIds)), cancellationToken: ct); + return collection.Indexes.CreateManyAsync(new[] + { + new CreateIndexModel(Index + .Ascending(x => x.IndexedSchemaId).Ascending(x => x.Id).Ascending(x => x.Status)), + new CreateIndexModel(Index + .Ascending(x => x.ScheduledAt).Ascending(x => x.IsDeleted)), + new CreateIndexModel(Index + .Ascending(x => x.ReferencedIds)) + }, ct); } protected override string CollectionName() { - return collectionName; + return "State_Contents"; } - public async Task> QueryAsync(IAppEntity app, ISchemaEntity schema, Query query, Status[] status = null, bool useDraft = false) + public async Task> QueryAsync(IAppEntity app, ISchemaEntity schema, Query query, List ids, Status[] status = null, bool useDraft = false) { try { query = query.AdjustToModel(schema.SchemaDef, useDraft); - var filter = query.ToFilter(schema.Id, status); + var filter = query.ToFilter(schema.Id, ids, status); var contentCount = Collection.Find(filter).CountDocumentsAsync(); var contentItems = @@ -62,7 +69,6 @@ namespace Squidex.Domain.Apps.Entities.MongoDb.Contents .ContentTake(query) .ContentSkip(query) .ContentSort(query) - .Not(x => x.DataText) .ToListAsync(); await Task.WhenAll(contentItems, contentCount); @@ -89,12 +95,9 @@ namespace Squidex.Domain.Apps.Entities.MongoDb.Contents public async Task> QueryAsync(IAppEntity app, ISchemaEntity schema, HashSet ids, Status[] status = null) { - var find = - status != null && status.Length > 0 ? - Collection.Find(x => x.IndexedSchemaId == schema.Id && ids.Contains(x.Id) && x.IsDeleted != true && status.Contains(x.Status)) : - Collection.Find(x => x.IndexedSchemaId == schema.Id && ids.Contains(x.Id)); + var find = Collection.Find(FilterFactory.Build(schema.Id, ids, status)); - var contentItems = find.Not(x => x.DataText).ToListAsync(); + var contentItems = find.ToListAsync(); var contentCount = find.CountDocumentsAsync(); await Task.WhenAll(contentItems, contentCount); @@ -107,6 +110,66 @@ namespace Squidex.Domain.Apps.Entities.MongoDb.Contents return ResultList.Create(contentCount.Result, contentItems.Result); } + public async Task FindContentAsync(IAppEntity app, ISchemaEntity schema, Guid id, Status[] status = null) + { + var find = Collection.Find(FilterFactory.Build(schema.Id, id, status)); + + var contentEntity = await find.FirstOrDefaultAsync(); + + contentEntity?.ParseData(schema.SchemaDef, Serializer); + + return contentEntity; + } + + public Task QueryScheduledWithoutDataAsync(Instant now, Func callback) + { + return Collection.Find(x => x.ScheduledAt < now && x.IsDeleted != true) + .Not(x => x.DataByIds) + .Not(x => x.DataDraftByIds) + .ForEachAsync(c => + { + callback(c); + }); + } + + public async Task> QueryIdsAsync(Guid appId, ISchemaEntity schema, FilterNode filterNode) + { + var filter = filterNode.AdjustToModel(schema.SchemaDef, true).ToFilter(schema.Id); + + var contentEntities = + await Collection.Find(filter).Only(x => x.Id) + .ToListAsync(); + + return contentEntities.Select(x => Guid.Parse(x["_id"].AsString)).ToList(); + } + + public async Task> QueryIdsAsync(Guid appId) + { + var contentEntities = + await Collection.Find(x => x.IndexedAppId == appId).Only(x => x.Id) + .ToListAsync(); + + return contentEntities.Select(x => Guid.Parse(x["_id"].AsString)).ToList(); + } + + public async Task<(ContentState Value, long Version)> ReadAsync(Guid key, Func> getSchema) + { + var contentEntity = + await Collection.Find(x => x.Id == key) + .FirstOrDefaultAsync(); + + if (contentEntity != null) + { + var schema = await getSchema(contentEntity.IndexedAppId, contentEntity.IndexedSchemaId); + + contentEntity.ParseData(schema.SchemaDef, Serializer); + + return (SimpleMapper.Map(contentEntity, new ContentState()), contentEntity.Version); + } + + return (null, EtagVersion.NotFound); + } + public Task CleanupAsync(Guid id) { return Collection.UpdateManyAsync( @@ -120,5 +183,31 @@ namespace Squidex.Domain.Apps.Entities.MongoDb.Contents { return Collection.DeleteOneAsync(x => x.Id == id); } + + public async Task UpsertAsync(MongoContentEntity content, long oldVersion) + { + try + { + await Collection.ReplaceOneAsync(x => x.Id == content.Id && x.Version == oldVersion, content, Upsert); + } + catch (MongoWriteException ex) + { + if (ex.WriteError.Category == ServerErrorCategory.DuplicateKey) + { + var existingVersion = + await Collection.Find(x => x.Id == content.Id).Only(x => x.Id, x => x.Version) + .FirstOrDefaultAsync(); + + if (existingVersion != null) + { + throw new InconsistentStateException(existingVersion["vs"].AsInt64, oldVersion, ex); + } + } + else + { + throw; + } + } + } } } diff --git a/src/Squidex.Domain.Apps.Entities.MongoDb/Contents/MongoContentDraftCollection.cs b/src/Squidex.Domain.Apps.Entities.MongoDb/Contents/MongoContentDraftCollection.cs deleted file mode 100644 index 30c0edda0..000000000 --- a/src/Squidex.Domain.Apps.Entities.MongoDb/Contents/MongoContentDraftCollection.cs +++ /dev/null @@ -1,147 +0,0 @@ -// ========================================================================== -// Squidex Headless CMS -// ========================================================================== -// Copyright (c) Squidex UG (haftungsbeschraenkt) -// All rights reserved. Licensed under the MIT license. -// ========================================================================== - -using System; -using System.Collections.Generic; -using System.Linq; -using System.Threading; -using System.Threading.Tasks; -using MongoDB.Driver; -using NodaTime; -using Squidex.Domain.Apps.Core.ConvertContent; -using Squidex.Domain.Apps.Entities.Apps; -using Squidex.Domain.Apps.Entities.Contents; -using Squidex.Domain.Apps.Entities.Contents.State; -using Squidex.Domain.Apps.Entities.MongoDb.Contents.Visitors; -using Squidex.Domain.Apps.Entities.Schemas; -using Squidex.Infrastructure; -using Squidex.Infrastructure.Json; -using Squidex.Infrastructure.MongoDb; -using Squidex.Infrastructure.Queries; -using Squidex.Infrastructure.Reflection; -using Squidex.Infrastructure.States; - -namespace Squidex.Domain.Apps.Entities.MongoDb.Contents -{ - internal sealed class MongoContentDraftCollection : MongoContentCollection - { - public MongoContentDraftCollection(IMongoDatabase database, IJsonSerializer serializer) - : base(database, serializer, "State_Content_Draft") - { - } - - protected override async Task SetupCollectionAsync(IMongoCollection collection, CancellationToken ct = default) - { - await collection.Indexes.CreateManyAsync( - new[] - { - new CreateIndexModel( - Index - .Ascending(x => x.IndexedSchemaId) - .Ascending(x => x.Id) - .Ascending(x => x.IsDeleted)), - new CreateIndexModel( - Index - .Text(x => x.DataText) - .Ascending(x => x.IndexedSchemaId) - .Ascending(x => x.IsDeleted) - .Ascending(x => x.Status)) - }, ct); - - await base.SetupCollectionAsync(collection, ct); - } - - public async Task> QueryIdsAsync(Guid appId, ISchemaEntity schema, FilterNode filterNode) - { - var filter = filterNode.AdjustToModel(schema.SchemaDef, true).ToFilter(schema.Id); - - var contentEntities = - await Collection.Find(filter).Only(x => x.Id) - .ToListAsync(); - - return contentEntities.Select(x => Guid.Parse(x["_id"].AsString)).ToList(); - } - - public async Task> QueryIdsAsync(Guid appId) - { - var contentEntities = - await Collection.Find(x => x.IndexedAppId == appId).Only(x => x.Id) - .ToListAsync(); - - return contentEntities.Select(x => Guid.Parse(x["_id"].AsString)).ToList(); - } - - public Task QueryScheduledWithoutDataAsync(Instant now, Func callback) - { - return Collection.Find(x => x.ScheduledAt < now && x.IsDeleted != true) - .Not(x => x.DataByIds) - .Not(x => x.DataDraftByIds) - .Not(x => x.DataText) - .ForEachAsync(c => - { - callback(c); - }); - } - - public async Task FindContentAsync(IAppEntity app, ISchemaEntity schema, Guid id) - { - var contentEntity = - await Collection.Find(x => x.IndexedSchemaId == schema.Id && x.Id == id && x.IsDeleted != true).Not(x => x.DataText) - .FirstOrDefaultAsync(); - - contentEntity?.ParseData(schema.SchemaDef, Serializer); - - return contentEntity; - } - - public async Task<(ContentState Value, long Version)> ReadAsync(Guid key, Func> getSchema) - { - var contentEntity = - await Collection.Find(x => x.Id == key).Not(x => x.DataText) - .FirstOrDefaultAsync(); - - if (contentEntity != null) - { - var schema = await getSchema(contentEntity.IndexedAppId, contentEntity.IndexedSchemaId); - - contentEntity.ParseData(schema.SchemaDef, Serializer); - - return (SimpleMapper.Map(contentEntity, new ContentState()), contentEntity.Version); - } - - return (null, EtagVersion.NotFound); - } - - public async Task UpsertAsync(MongoContentEntity content, long oldVersion) - { - try - { - content.DataText = content.DataDraftByIds.ToFullText(); - - await Collection.ReplaceOneAsync(x => x.Id == content.Id && x.Version == oldVersion, content, Upsert); - } - catch (MongoWriteException ex) - { - if (ex.WriteError.Category == ServerErrorCategory.DuplicateKey) - { - var existingVersion = - await Collection.Find(x => x.Id == content.Id).Only(x => x.Id, x => x.Version) - .FirstOrDefaultAsync(); - - if (existingVersion != null) - { - throw new InconsistentStateException(existingVersion["vs"].AsInt64, oldVersion, ex); - } - } - else - { - throw; - } - } - } - } -} diff --git a/src/Squidex.Domain.Apps.Entities.MongoDb/Contents/MongoContentEntity.cs b/src/Squidex.Domain.Apps.Entities.MongoDb/Contents/MongoContentEntity.cs index a2346db31..9c8f3eba7 100644 --- a/src/Squidex.Domain.Apps.Entities.MongoDb/Contents/MongoContentEntity.cs +++ b/src/Squidex.Domain.Apps.Entities.MongoDb/Contents/MongoContentEntity.cs @@ -69,10 +69,6 @@ namespace Squidex.Domain.Apps.Entities.MongoDb.Contents [BsonJson] public ScheduleJob ScheduleJob { get; set; } - [BsonIgnoreIfDefault] - [BsonElement("dt")] - public string DataText { get; set; } - [BsonRequired] [BsonElement("ai")] public NamedId AppId { get; set; } diff --git a/src/Squidex.Domain.Apps.Entities.MongoDb/Contents/MongoContentPublishedCollection.cs b/src/Squidex.Domain.Apps.Entities.MongoDb/Contents/MongoContentPublishedCollection.cs deleted file mode 100644 index 0184b87e1..000000000 --- a/src/Squidex.Domain.Apps.Entities.MongoDb/Contents/MongoContentPublishedCollection.cs +++ /dev/null @@ -1,61 +0,0 @@ -// ========================================================================== -// Squidex Headless CMS -// ========================================================================== -// Copyright (c) Squidex UG (haftungsbeschraenkt) -// All rights reserved. Licensed under the MIT license. -// ========================================================================== - -using System; -using System.Threading; -using System.Threading.Tasks; -using MongoDB.Driver; -using Squidex.Domain.Apps.Core.ConvertContent; -using Squidex.Domain.Apps.Entities.Apps; -using Squidex.Domain.Apps.Entities.Contents; -using Squidex.Domain.Apps.Entities.Schemas; -using Squidex.Infrastructure.Json; -using Squidex.Infrastructure.MongoDb; - -namespace Squidex.Domain.Apps.Entities.MongoDb.Contents -{ - internal sealed class MongoContentPublishedCollection : MongoContentCollection - { - public MongoContentPublishedCollection(IMongoDatabase database, IJsonSerializer serializer) - : base(database, serializer, "State_Content_Published") - { - } - - protected override async Task SetupCollectionAsync(IMongoCollection collection, CancellationToken ct = default) - { - await collection.Indexes.CreateManyAsync( - new[] - { - new CreateIndexModel(Index.Text(x => x.DataText).Ascending(x => x.IndexedSchemaId)), - new CreateIndexModel(Index.Ascending(x => x.IndexedSchemaId).Ascending(x => x.Id)) - }, ct); - - await base.SetupCollectionAsync(collection, ct); - } - - public async Task FindContentAsync(IAppEntity app, ISchemaEntity schema, Guid id) - { - var contentEntity = - await Collection.Find(x => x.IndexedSchemaId == schema.Id && x.Id == id).Not(x => x.DataText) - .FirstOrDefaultAsync(); - - contentEntity?.ParseData(schema.SchemaDef, Serializer); - - return contentEntity; - } - - public Task UpsertAsync(MongoContentEntity content) - { - content.DataText = content.DataByIds.ToFullText(); - content.DataDraftByIds = null; - content.ScheduleJob = null; - content.ScheduledAt = null; - - return Collection.ReplaceOneAsync(x => x.Id == content.Id, content, new UpdateOptions { IsUpsert = true }); - } - } -} diff --git a/src/Squidex.Domain.Apps.Entities.MongoDb/Contents/MongoContentRepository.cs b/src/Squidex.Domain.Apps.Entities.MongoDb/Contents/MongoContentRepository.cs index 3c663d642..cd9ee470e 100644 --- a/src/Squidex.Domain.Apps.Entities.MongoDb/Contents/MongoContentRepository.cs +++ b/src/Squidex.Domain.Apps.Entities.MongoDb/Contents/MongoContentRepository.cs @@ -15,6 +15,7 @@ using Squidex.Domain.Apps.Core.Contents; using Squidex.Domain.Apps.Entities.Apps; using Squidex.Domain.Apps.Entities.Contents; using Squidex.Domain.Apps.Entities.Contents.Repositories; +using Squidex.Domain.Apps.Entities.Contents.Text; using Squidex.Domain.Apps.Entities.Schemas; using Squidex.Infrastructure; using Squidex.Infrastructure.Json; @@ -28,41 +29,42 @@ namespace Squidex.Domain.Apps.Entities.MongoDb.Contents private readonly IMongoDatabase database; private readonly IAppProvider appProvider; private readonly IJsonSerializer serializer; - private readonly MongoContentDraftCollection contentsDraft; - private readonly MongoContentPublishedCollection contentsPublished; + private readonly ITextIndexer indexer; + private readonly MongoContentCollection contents; - public MongoContentRepository(IMongoDatabase database, IAppProvider appProvider, IJsonSerializer serializer) + public MongoContentRepository(IMongoDatabase database, IAppProvider appProvider, IJsonSerializer serializer, ITextIndexer indexer) { Guard.NotNull(appProvider, nameof(appProvider)); Guard.NotNull(serializer, nameof(serializer)); + Guard.NotNull(indexer, nameof(ITextIndexer)); this.appProvider = appProvider; - + this.database = database; + this.indexer = indexer; this.serializer = serializer; - contentsDraft = new MongoContentDraftCollection(database, serializer); - contentsPublished = new MongoContentPublishedCollection(database, serializer); - - this.database = database; + contents = new MongoContentCollection(database, serializer); } public Task InitializeAsync(CancellationToken ct = default) { - return Task.WhenAll(contentsDraft.InitializeAsync(ct), contentsPublished.InitializeAsync(ct)); + return contents.InitializeAsync(ct); } public async Task> QueryAsync(IAppEntity app, ISchemaEntity schema, Status[] status, Query query) { using (Profiler.TraceMethod("QueryAsyncByQuery")) { - if (RequiresPublished(status)) - { - return await contentsPublished.QueryAsync(app, schema, query); - } - else + var useDraft = RequiresPublished(status); + + var fullTextIds = await indexer.SearchAsync(query.FullText, app, schema.Id, useDraft); + + if (fullTextIds?.Count == 0) { - return await contentsDraft.QueryAsync(app, schema, query, status, true); + return ResultList.Create(0); } + + return await contents.QueryAsync(app, schema, query, fullTextIds, status, true); } } @@ -72,11 +74,11 @@ namespace Squidex.Domain.Apps.Entities.MongoDb.Contents { if (RequiresPublished(status)) { - return await contentsPublished.QueryAsync(app, schema, ids); + return await contents.QueryAsync(app, schema, ids); } else { - return await contentsDraft.QueryAsync(app, schema, ids, status); + return await contents.QueryAsync(app, schema, ids, status); } } } @@ -87,11 +89,11 @@ namespace Squidex.Domain.Apps.Entities.MongoDb.Contents { if (RequiresPublished(status)) { - return await contentsPublished.FindContentAsync(app, schema, id); + return await contents.FindContentAsync(app, schema, id); } else { - return await contentsDraft.FindContentAsync(app, schema, id); + return await contents.FindContentAsync(app, schema, id, status); } } } @@ -100,7 +102,7 @@ namespace Squidex.Domain.Apps.Entities.MongoDb.Contents { using (Profiler.TraceMethod()) { - return await contentsDraft.QueryIdsAsync(appId, await appProvider.GetSchemaAsync(appId, schemaId), filterNode); + return await contents.QueryIdsAsync(appId, await appProvider.GetSchemaAsync(appId, schemaId), filterNode); } } @@ -108,7 +110,7 @@ namespace Squidex.Domain.Apps.Entities.MongoDb.Contents { using (Profiler.TraceMethod()) { - return await contentsDraft.QueryIdsAsync(appId); + return await contents.QueryIdsAsync(appId); } } @@ -116,22 +118,13 @@ namespace Squidex.Domain.Apps.Entities.MongoDb.Contents { using (Profiler.TraceMethod()) { - await contentsDraft.QueryScheduledWithoutDataAsync(now, callback); + await contents.QueryScheduledWithoutDataAsync(now, callback); } } - public Task RemoveAsync(Guid appId) - { - return Task.WhenAll( - contentsDraft.RemoveAsync(appId), - contentsPublished.RemoveAsync(appId)); - } - public Task ClearAsync() { - return Task.WhenAll( - contentsDraft.ClearAsync(), - contentsPublished.ClearAsync()); + return contents.ClearAsync(); } public Task DeleteArchiveAsync() diff --git a/src/Squidex.Domain.Apps.Entities.MongoDb/Contents/MongoContentRepository_EventHandling.cs b/src/Squidex.Domain.Apps.Entities.MongoDb/Contents/MongoContentRepository_EventHandling.cs index 427629d1d..bb072df70 100644 --- a/src/Squidex.Domain.Apps.Entities.MongoDb/Contents/MongoContentRepository_EventHandling.cs +++ b/src/Squidex.Domain.Apps.Entities.MongoDb/Contents/MongoContentRepository_EventHandling.cs @@ -33,16 +33,12 @@ namespace Squidex.Domain.Apps.Entities.MongoDb.Contents protected Task On(AssetDeleted @event) { - return Task.WhenAll( - contentsDraft.CleanupAsync(@event.AssetId), - contentsPublished.CleanupAsync(@event.AssetId)); + return contents.CleanupAsync(@event.AssetId); } protected Task On(ContentDeleted @event) { - return Task.WhenAll( - contentsDraft.CleanupAsync(@event.ContentId), - contentsPublished.CleanupAsync(@event.ContentId)); + return contents.CleanupAsync(@event.ContentId); } Task IEventConsumer.ClearAsync() diff --git a/src/Squidex.Domain.Apps.Entities.MongoDb/Contents/MongoContentRepository_SnapshotStore.cs b/src/Squidex.Domain.Apps.Entities.MongoDb/Contents/MongoContentRepository_SnapshotStore.cs index 32f38e982..31b46bc64 100644 --- a/src/Squidex.Domain.Apps.Entities.MongoDb/Contents/MongoContentRepository_SnapshotStore.cs +++ b/src/Squidex.Domain.Apps.Entities.MongoDb/Contents/MongoContentRepository_SnapshotStore.cs @@ -7,7 +7,6 @@ using System; using System.Threading.Tasks; -using Squidex.Domain.Apps.Core.Contents; using Squidex.Domain.Apps.Entities.Contents.State; using Squidex.Domain.Apps.Entities.Schemas; using Squidex.Infrastructure; @@ -23,7 +22,7 @@ namespace Squidex.Domain.Apps.Entities.MongoDb.Contents { using (Profiler.TraceMethod()) { - return await contentsDraft.ReadAsync(key, GetSchemaAsync); + return await contents.ReadAsync(key, GetSchemaAsync); } } @@ -58,15 +57,15 @@ namespace Squidex.Domain.Apps.Entities.MongoDb.Contents Version = newVersion }); - await contentsDraft.UpsertAsync(content, oldVersion); + await contents.UpsertAsync(content, oldVersion); - if (value.Status == Status.Published && !value.IsDeleted) + if (value.IsDeleted) { - await contentsPublished.UpsertAsync(content); + await indexer.DeleteAsync(value.SchemaId.Id, value.Id); } else { - await contentsPublished.RemoveAsync(content.Id); + await indexer.IndexAsync(value.SchemaId.Id, value.Id, value.Data, value.DataDraft); } } } diff --git a/src/Squidex.Domain.Apps.Entities.MongoDb/Contents/Visitors/FindExtensions.cs b/src/Squidex.Domain.Apps.Entities.MongoDb/Contents/Visitors/FilterFactory.cs similarity index 82% rename from src/Squidex.Domain.Apps.Entities.MongoDb/Contents/Visitors/FindExtensions.cs rename to src/Squidex.Domain.Apps.Entities.MongoDb/Contents/Visitors/FilterFactory.cs index 14195985f..152e98b87 100644 --- a/src/Squidex.Domain.Apps.Entities.MongoDb/Contents/Visitors/FindExtensions.cs +++ b/src/Squidex.Domain.Apps.Entities.MongoDb/Contents/Visitors/FilterFactory.cs @@ -19,7 +19,7 @@ using Squidex.Infrastructure.Queries; namespace Squidex.Domain.Apps.Entities.MongoDb.Contents.Visitors { - public static class FindExtensions + public static class FilterFactory { private static readonly FilterDefinitionBuilder Filter = Builders.Filter; @@ -109,7 +109,22 @@ namespace Squidex.Domain.Apps.Entities.MongoDb.Contents.Visitors return cursor.Skip(query); } - public static FilterDefinition ToFilter(this Query query, Guid schemaId, Status[] status) + public static FilterDefinition Build(Guid schemaId, Guid id, Status[] status) + { + return CreateFilter(schemaId, new List { id }, status, null); + } + + public static FilterDefinition Build(Guid schemaId, ICollection ids, Status[] status) + { + return CreateFilter(schemaId, ids, status, null); + } + + public static FilterDefinition ToFilter(this Query query, Guid schemaId, ICollection ids, Status[] status) + { + return CreateFilter(schemaId, ids, status, query); + } + + private static FilterDefinition CreateFilter(Guid schemaId, ICollection ids, Status[] status, Query query) { var filters = new List> { @@ -117,25 +132,28 @@ namespace Squidex.Domain.Apps.Entities.MongoDb.Contents.Visitors Filter.Ne(x => x.IsDeleted, true) }; - if (status != null) - { - filters.Add(Filter.In(x => x.Status, status)); - } - - var filter = query.BuildFilter(); - - if (filter.Filter != null) + if (ids != null && ids.Count > 0) { - if (filter.Last) + if (ids.Count > 1) { - filters.Add(filter.Filter); + filters.Add(Filter.In(x => x.Id, ids)); } else { - filters.Insert(0, filter.Filter); + filters.Add(Filter.Eq(x => x.Id, ids.First())); } } + if (status != null) + { + filters.Add(Filter.In(x => x.Status, status)); + } + + if (query.Filter != null) + { + filters.Add(query.Filter.BuildFilter()); + } + return Filter.And(filters); } diff --git a/src/Squidex.Domain.Apps.Entities.MongoDb/History/MongoHistoryEventRepository.cs b/src/Squidex.Domain.Apps.Entities.MongoDb/History/MongoHistoryEventRepository.cs index 6b5e6cd77..5a39bb706 100644 --- a/src/Squidex.Domain.Apps.Entities.MongoDb/History/MongoHistoryEventRepository.cs +++ b/src/Squidex.Domain.Apps.Entities.MongoDb/History/MongoHistoryEventRepository.cs @@ -9,6 +9,8 @@ using System; using System.Collections.Generic; using System.Threading; using System.Threading.Tasks; +using Microsoft.Extensions.Options; +using MongoDB.Bson.Serialization; using MongoDB.Driver; using Squidex.Domain.Apps.Entities.History; using Squidex.Domain.Apps.Entities.History.Repositories; @@ -18,9 +20,17 @@ namespace Squidex.Domain.Apps.Entities.MongoDb.History { public class MongoHistoryEventRepository : MongoRepositoryBase, IHistoryEventRepository { - public MongoHistoryEventRepository(IMongoDatabase database) + public MongoHistoryEventRepository(IMongoDatabase database, IOptions options) : base(database) { + if (options.Value.IsCosmosDb) + { + var classMap = BsonClassMap.RegisterClassMap(); + + classMap.MapProperty(x => x.Created) + .SetElementName("_ts"); + classMap.AutoMap(); + } } protected override string CollectionName() diff --git a/src/Squidex.Domain.Apps.Entities/Backup/BackupGrain.cs b/src/Squidex.Domain.Apps.Entities/Backup/BackupGrain.cs index f1bff2c44..fe6a484ec 100644 --- a/src/Squidex.Domain.Apps.Entities/Backup/BackupGrain.cs +++ b/src/Squidex.Domain.Apps.Entities/Backup/BackupGrain.cs @@ -164,7 +164,7 @@ namespace Squidex.Domain.Apps.Entities.Backup currentTask.Token.ThrowIfCancellationRequested(); - await assetStore.UploadAsync(job.Id.ToString(), 0, null, stream, currentTask.Token); + await assetStore.UploadAsync(job.Id.ToString(), 0, null, stream, false, currentTask.Token); } job.Status = JobStatus.Completed; diff --git a/src/Squidex.Domain.Apps.Entities/Contents/Repositories/IContentRepository.cs b/src/Squidex.Domain.Apps.Entities/Contents/Repositories/IContentRepository.cs index e82d39932..7944c1b32 100644 --- a/src/Squidex.Domain.Apps.Entities/Contents/Repositories/IContentRepository.cs +++ b/src/Squidex.Domain.Apps.Entities/Contents/Repositories/IContentRepository.cs @@ -28,7 +28,5 @@ namespace Squidex.Domain.Apps.Entities.Contents.Repositories Task FindContentAsync(IAppEntity app, ISchemaEntity schema, Status[] status, Guid id); Task QueryScheduledWithoutDataAsync(Instant now, Func callback); - - Task RemoveAsync(Guid appId); } } diff --git a/src/Squidex.Domain.Apps.Entities/Contents/Text/GrainTextIndexer.cs b/src/Squidex.Domain.Apps.Entities/Contents/Text/GrainTextIndexer.cs new file mode 100644 index 000000000..4dc53ce67 --- /dev/null +++ b/src/Squidex.Domain.Apps.Entities/Contents/Text/GrainTextIndexer.cs @@ -0,0 +1,106 @@ +// ========================================================================== +// Squidex Headless CMS +// ========================================================================== +// Copyright (c) Squidex UG (haftungsbeschraenkt) +// All rights reserved. Licensed under the MIT license. +// ========================================================================== + +using System; +using System.Collections.Generic; +using System.Linq; +using System.Threading.Tasks; +using Orleans; +using Squidex.Domain.Apps.Core.Contents; +using Squidex.Domain.Apps.Entities.Apps; +using Squidex.Domain.Apps.Entities.Schemas; +using Squidex.Infrastructure; +using Squidex.Infrastructure.Log; + +namespace Squidex.Domain.Apps.Entities.Contents.Text +{ + public sealed class GrainTextIndexer : ITextIndexer + { + private readonly IGrainFactory grainFactory; + private readonly ISemanticLog log; + + public GrainTextIndexer(IGrainFactory grainFactory, ISemanticLog log) + { + Guard.NotNull(grainFactory, nameof(grainFactory)); + Guard.NotNull(log, nameof(log)); + + this.grainFactory = grainFactory; + + this.log = log; + } + + public async Task DeleteAsync(Guid schemaId, Guid id) + { + var index = grainFactory.GetGrain(schemaId); + + using (Profiler.TraceMethod()) + { + try + { + await index.DeleteAsync(id); + } + catch (Exception ex) + { + log.LogError(ex, w => w + .WriteProperty("action", "DeleteTextEntry") + .WriteProperty("status", "Failed")); + } + } + } + + public async Task IndexAsync(Guid schemaId, Guid id, NamedContentData data, NamedContentData dataDraft) + { + var index = grainFactory.GetGrain(schemaId); + + using (Profiler.TraceMethod()) + { + try + { + if (data != null) + { + await index.IndexAsync(id, new IndexData { Data = data }); + } + + if (dataDraft != null) + { + await index.IndexAsync(id, new IndexData { Data = dataDraft, IsDraft = true }); + } + } + catch (Exception ex) + { + log.LogError(ex, w => w + .WriteProperty("action", "UpdateTextEntry") + .WriteProperty("status", "Failed")); + } + } + } + + public async Task> SearchAsync(string queryText, IAppEntity app, Guid schemaId, bool useDraft = false) + { + if (string.IsNullOrWhiteSpace(queryText)) + { + return null; + } + + var index = grainFactory.GetGrain(schemaId); + + using (Profiler.TraceMethod()) + { + var context = CreateContext(app, useDraft); + + return await index.SearchAsync(queryText, context); + } + } + + private static SearchContext CreateContext(IAppEntity app, bool useDraft) + { + var languages = new HashSet(app.LanguagesConfig.Select(x => x.Key)); + + return new SearchContext { Languages = languages, IsDraft = useDraft }; + } + } +} diff --git a/src/Squidex.Domain.Apps.Entities/Contents/Text/ITextIndexer.cs b/src/Squidex.Domain.Apps.Entities/Contents/Text/ITextIndexer.cs new file mode 100644 index 000000000..b350b8ee9 --- /dev/null +++ b/src/Squidex.Domain.Apps.Entities/Contents/Text/ITextIndexer.cs @@ -0,0 +1,24 @@ +// ========================================================================== +// Squidex Headless CMS +// ========================================================================== +// Copyright (c) Squidex UG (haftungsbeschraenkt) +// All rights reserved. Licensed under the MIT license. +// ========================================================================== + +using System; +using System.Collections.Generic; +using System.Threading.Tasks; +using Squidex.Domain.Apps.Core.Contents; +using Squidex.Domain.Apps.Entities.Apps; + +namespace Squidex.Domain.Apps.Entities.Contents.Text +{ + public interface ITextIndexer + { + Task DeleteAsync(Guid schemaId, Guid id); + + Task IndexAsync(Guid schemaId, Guid id, NamedContentData data, NamedContentData dataDraft); + + Task> SearchAsync(string queryText, IAppEntity app, Guid schemaId, bool useDraft = false); + } +} diff --git a/src/Squidex.Domain.Apps.Entities/Contents/Text/ITextIndexerGrain.cs b/src/Squidex.Domain.Apps.Entities/Contents/Text/ITextIndexerGrain.cs new file mode 100644 index 000000000..dd1d4c5c8 --- /dev/null +++ b/src/Squidex.Domain.Apps.Entities/Contents/Text/ITextIndexerGrain.cs @@ -0,0 +1,24 @@ +// ========================================================================== +// Squidex Headless CMS +// ========================================================================== +// Copyright (c) Squidex UG (haftungsbeschraenkt) +// All rights reserved. Licensed under the MIT license. +// ========================================================================== + +using System; +using System.Collections.Generic; +using System.Threading.Tasks; +using Orleans; +using Squidex.Infrastructure.Orleans; + +namespace Squidex.Domain.Apps.Entities.Contents.Text +{ + public interface ITextIndexerGrain : IGrainWithGuidKey + { + Task DeleteAsync(Guid id); + + Task IndexAsync(Guid id, J data); + + Task> SearchAsync(string queryText, SearchContext context); + } +} \ No newline at end of file diff --git a/src/Squidex.Domain.Apps.Entities/Contents/Text/IndexData.cs b/src/Squidex.Domain.Apps.Entities/Contents/Text/IndexData.cs new file mode 100644 index 000000000..7911be4a7 --- /dev/null +++ b/src/Squidex.Domain.Apps.Entities/Contents/Text/IndexData.cs @@ -0,0 +1,18 @@ +// ========================================================================== +// Squidex Headless CMS +// ========================================================================== +// Copyright (c) Squidex UG (haftungsbeschraenkt) +// All rights reserved. Licensed under the MIT license. +// ========================================================================== + +using Squidex.Domain.Apps.Core.Contents; + +namespace Squidex.Domain.Apps.Entities.Contents.Text +{ + public sealed class IndexData + { + public NamedContentData Data { get; set; } + + public bool IsDraft { get; set; } + } +} diff --git a/src/Squidex.Domain.Apps.Entities/Contents/Text/MultiLanguageAnalyzer.cs b/src/Squidex.Domain.Apps.Entities/Contents/Text/MultiLanguageAnalyzer.cs new file mode 100644 index 000000000..8bd41df9c --- /dev/null +++ b/src/Squidex.Domain.Apps.Entities/Contents/Text/MultiLanguageAnalyzer.cs @@ -0,0 +1,65 @@ +// ========================================================================== +// Squidex Headless CMS +// ========================================================================== +// Copyright (c) Squidex UG (haftungsbeschraenkt) +// All rights reserved. Licensed under the MIT license. +// ========================================================================== + +using System; +using System.Collections.Generic; +using System.Linq; +using Lucene.Net.Analysis; +using Lucene.Net.Analysis.Standard; +using Lucene.Net.Util; +using Squidex.Infrastructure; + +namespace Squidex.Domain.Apps.Entities.Contents.Text +{ + public sealed class MultiLanguageAnalyzer : AnalyzerWrapper + { + private readonly StandardAnalyzer fallbackAnalyzer; + private readonly Dictionary analyzers = new Dictionary(StringComparer.OrdinalIgnoreCase); + + public MultiLanguageAnalyzer(LuceneVersion version) + : base(PER_FIELD_REUSE_STRATEGY) + { + fallbackAnalyzer = new StandardAnalyzer(version); + + foreach (var type in typeof(StandardAnalyzer).Assembly.GetTypes()) + { + if (typeof(Analyzer).IsAssignableFrom(type)) + { + var language = type.Namespace.Split('.').Last(); + + if (language.Length == 2) + { + try + { + var analyzer = Activator.CreateInstance(type, version); + + analyzers[language] = (Analyzer)analyzer; + } + catch (MissingMethodException) + { + continue; + } + } + } + } + } + + protected override Analyzer GetWrappedAnalyzer(string fieldName) + { + if (fieldName.Length > 0) + { + var analyzer = analyzers.GetOrDefault(fieldName.Substring(0, 2)) ?? fallbackAnalyzer; + + return analyzer; + } + else + { + return fallbackAnalyzer; + } + } + } +} diff --git a/src/Squidex.Domain.Apps.Entities/Contents/Text/PersistenceHelper.cs b/src/Squidex.Domain.Apps.Entities/Contents/Text/PersistenceHelper.cs new file mode 100644 index 000000000..621e141c9 --- /dev/null +++ b/src/Squidex.Domain.Apps.Entities/Contents/Text/PersistenceHelper.cs @@ -0,0 +1,91 @@ +// ========================================================================== +// Squidex Headless CMS +// ========================================================================== +// Copyright (c) Squidex UG (haftungsbeschraenkt) +// All rights reserved. Licensed under the MIT license. +// ========================================================================== + +using System; +using System.IO; +using System.IO.Compression; +using System.Threading.Tasks; +using Squidex.Infrastructure.Assets; + +namespace Squidex.Domain.Apps.Entities.Contents.Text +{ + public static class PersistenceHelper + { + private const string ArchiveFile = "Archive.zip"; + private const string LockFile = "write.lock"; + + public static async Task UploadDirectoryAsync(this IAssetStore assetStore, DirectoryInfo directory) + { + using (var fileStream = new FileStream( + Path.Combine(directory.FullName, ArchiveFile), + FileMode.Create, + FileAccess.ReadWrite, + FileShare.None, + 4096, + FileOptions.DeleteOnClose)) + { + using (var zipArchive = new ZipArchive(fileStream, ZipArchiveMode.Create, true)) + { + foreach (var file in directory.GetFiles()) + { + try + { + if (!file.Name.Equals(ArchiveFile, StringComparison.OrdinalIgnoreCase) && + !file.Name.Equals(LockFile, StringComparison.OrdinalIgnoreCase)) + { + zipArchive.CreateEntryFromFile(file.FullName, file.Name); + } + } + catch (IOException) + { + continue; + } + } + } + + fileStream.Position = 0; + + await assetStore.UploadAsync(directory.Name, 0, string.Empty, fileStream, true); + } + } + + public static async Task DownloadAsync(this IAssetStore assetStore, DirectoryInfo directory) + { + if (directory.Exists) + { + directory.Delete(true); + } + + directory.Create(); + + using (var fileStream = new FileStream( + Path.Combine(directory.FullName, ArchiveFile), + FileMode.Create, + FileAccess.ReadWrite, + FileShare.None, + 4096, + FileOptions.DeleteOnClose)) + { + try + { + await assetStore.DownloadAsync(directory.Name, 0, string.Empty, fileStream); + + fileStream.Position = 0; + + using (var zipArchive = new ZipArchive(fileStream, ZipArchiveMode.Read, true)) + { + zipArchive.ExtractToDirectory(directory.FullName); + } + } + catch (AssetNotFoundException) + { + return; + } + } + } + } +} diff --git a/src/Squidex.Domain.Apps.Entities/Contents/Text/SearchContext.cs b/src/Squidex.Domain.Apps.Entities/Contents/Text/SearchContext.cs new file mode 100644 index 000000000..01bd8f78a --- /dev/null +++ b/src/Squidex.Domain.Apps.Entities/Contents/Text/SearchContext.cs @@ -0,0 +1,18 @@ +// ========================================================================== +// Squidex Headless CMS +// ========================================================================== +// Copyright (c) Squidex UG (haftungsbeschraenkt) +// All rights reserved. Licensed under the MIT license. +// ========================================================================== + +using System.Collections.Generic; + +namespace Squidex.Domain.Apps.Entities.Contents.Text +{ + public sealed class SearchContext + { + public bool IsDraft { get; set; } + + public HashSet Languages { get; set; } + } +} diff --git a/src/Squidex.Domain.Apps.Entities/Contents/Text/TextIndexerGrain.cs b/src/Squidex.Domain.Apps.Entities/Contents/Text/TextIndexerGrain.cs new file mode 100644 index 000000000..5335954ca --- /dev/null +++ b/src/Squidex.Domain.Apps.Entities/Contents/Text/TextIndexerGrain.cs @@ -0,0 +1,272 @@ +// ========================================================================== +// Squidex Headless CMS +// ========================================================================== +// Copyright (c) Squidex UG (haftungsbeschraenkt) +// All rights reserved. Licensed under the MIT license. +// ========================================================================== + +using System; +using System.Collections.Generic; +using System.IO; +using System.Linq; +using System.Text; +using System.Threading.Tasks; +using Lucene.Net.Analysis; +using Lucene.Net.Documents; +using Lucene.Net.Index; +using Lucene.Net.Queries; +using Lucene.Net.QueryParsers.Classic; +using Lucene.Net.Search; +using Lucene.Net.Store; +using Lucene.Net.Util; +using Squidex.Infrastructure; +using Squidex.Infrastructure.Assets; +using Squidex.Infrastructure.Json.Objects; +using Squidex.Infrastructure.Orleans; + +namespace Squidex.Domain.Apps.Entities.Contents.Text +{ + public sealed class TextIndexerGrain : GrainOfGuid, ITextIndexerGrain + { + private const LuceneVersion Version = LuceneVersion.LUCENE_48; + private const int MaxResults = 2000; + private const int MaxUpdates = 100; + private static readonly TimeSpan CommitDelay = TimeSpan.FromSeconds(30); + private static readonly Analyzer Analyzer = new MultiLanguageAnalyzer(Version); + private readonly IAssetStore assetStore; + private IDisposable timer; + private DirectoryInfo directory; + private IndexWriter indexWriter; + private IndexReader indexReader; + private QueryParser queryParser; + private HashSet currentLanguages; + private long updates; + + public TextIndexerGrain(IAssetStore assetStore) + { + Guard.NotNull(assetStore, nameof(assetStore)); + + this.assetStore = assetStore; + } + + public override async Task OnDeactivateAsync() + { + await DeactivateAsync(true); + } + + protected override async Task OnActivateAsync(Guid key) + { + directory = new DirectoryInfo(Path.Combine(Path.GetTempPath(), $"Index_{key}")); + + await assetStore.DownloadAsync(directory); + + indexWriter = new IndexWriter(FSDirectory.Open(directory), new IndexWriterConfig(Version, Analyzer)); + indexReader = indexWriter.GetReader(true); + } + + public Task DeleteAsync(Guid id) + { + indexWriter.DeleteDocuments(new Term("id", id.ToString())); + + return TryFlushAsync(); + } + + public Task IndexAsync(Guid id, J data) + { + var docId = id.ToString(); + var docDraft = data.Value.IsDraft.ToString(); + var docKey = $"{docId}_{docDraft}"; + + var query = new BooleanQuery(); + + indexWriter.DeleteDocuments(new Term("key", docKey)); + + var languages = new Dictionary(); + + void AppendText(string language, string text) + { + if (!string.IsNullOrWhiteSpace(text)) + { + var sb = languages.GetOrAddNew(language); + + if (sb.Length > 0) + { + sb.Append(" "); + } + + sb.Append(text); + } + } + + foreach (var field in data.Value.Data) + { + foreach (var fieldValue in field.Value) + { + var appendText = new Action(text => AppendText(fieldValue.Key, text)); + + AppendJsonText(fieldValue.Value, appendText); + } + } + + if (languages.Count > 0) + { + var document = new Document(); + + document.AddStringField("id", docId, Field.Store.YES); + document.AddStringField("key", docKey, Field.Store.YES); + document.AddStringField("draft", docDraft, Field.Store.YES); + + foreach (var field in languages) + { + var fieldName = BuildFieldName(field.Key); + + document.AddTextField(fieldName, field.Value.ToString(), Field.Store.NO); + } + + indexWriter.AddDocument(document); + } + + return TryFlushAsync(); + } + + private static void AppendJsonText(IJsonValue value, Action appendText) + { + if (value.Type == JsonValueType.String) + { + appendText(value.ToString()); + } + else if (value is JsonArray array) + { + foreach (var item in array) + { + AppendJsonText(item, appendText); + } + } + else if (value is JsonObject obj) + { + foreach (var item in obj.Values) + { + AppendJsonText(item, appendText); + } + } + } + + public Task> SearchAsync(string queryText, SearchContext context) + { + var result = new HashSet(); + + if (!string.IsNullOrWhiteSpace(queryText)) + { + var query = BuildQuery(queryText, context); + + if (indexReader != null) + { + var filter = new TermsFilter(new Term("draft", context.IsDraft.ToString())); + + var hits = new IndexSearcher(indexReader).Search(query, filter, MaxResults).ScoreDocs; + + foreach (var hit in hits) + { + var document = indexReader.Document(hit.Doc); + + var idField = document.GetField("id")?.GetStringValue(); + + if (idField != null && Guid.TryParse(idField, out var guid)) + { + result.Add(guid); + } + } + } + } + + return Task.FromResult(result.ToList()); + } + + private Query BuildQuery(string query, SearchContext context) + { + if (queryParser == null || !currentLanguages.SetEquals(context.Languages)) + { + var fields = + context.Languages.Select(BuildFieldName) + .Union(Enumerable.Repeat(BuildFieldName("iv"), 1)).ToArray(); + + queryParser = new MultiFieldQueryParser(Version, fields, Analyzer); + + currentLanguages = context.Languages; + } + + try + { + return queryParser.Parse(query); + } + catch (ParseException ex) + { + throw new ValidationException(ex.Message); + } + } + + private async Task TryFlushAsync() + { + updates++; + + if (updates >= MaxUpdates) + { + await FlushAsync(); + } + else + { + timer?.Dispose(); + + try + { + timer = RegisterTimer(_ => FlushAsync(), null, CommitDelay, CommitDelay); + } + catch (InvalidOperationException) + { + return; + } + } + } + + public async Task FlushAsync() + { + if (updates > 0 && indexWriter != null) + { + indexWriter.Flush(true, true); + indexWriter.Commit(); + + indexReader?.Dispose(); + indexReader = indexWriter.GetReader(true); + + await assetStore.UploadDirectoryAsync(directory); + + updates = 0; + } + else + { + timer?.Dispose(); + } + } + + public async Task DeactivateAsync(bool deleteFolder = false) + { + await TryFlushAsync(); + + indexWriter?.Dispose(); + indexWriter = null; + + indexReader?.Dispose(); + indexReader = null; + + if (deleteFolder && directory.Exists) + { + directory.Delete(true); + } + } + + private static string BuildFieldName(string language) + { + return $"{language}_field"; + } + } +} diff --git a/src/Squidex.Domain.Apps.Entities/Squidex.Domain.Apps.Entities.csproj b/src/Squidex.Domain.Apps.Entities/Squidex.Domain.Apps.Entities.csproj index 6adb6a190..7b7db6907 100644 --- a/src/Squidex.Domain.Apps.Entities/Squidex.Domain.Apps.Entities.csproj +++ b/src/Squidex.Domain.Apps.Entities/Squidex.Domain.Apps.Entities.csproj @@ -16,6 +16,10 @@ + + + + all runtime; build; native; contentfiles; analyzers diff --git a/src/Squidex.Infrastructure.Azure/Assets/AzureBlobAssetStore.cs b/src/Squidex.Infrastructure.Azure/Assets/AzureBlobAssetStore.cs index 291fda481..e09a88879 100644 --- a/src/Squidex.Infrastructure.Azure/Assets/AzureBlobAssetStore.cs +++ b/src/Squidex.Infrastructure.Azure/Assets/AzureBlobAssetStore.cs @@ -110,14 +110,14 @@ namespace Squidex.Infrastructure.Assets } } - public Task UploadAsync(string id, long version, string suffix, Stream stream, CancellationToken ct = default) + public Task UploadAsync(string id, long version, string suffix, Stream stream, bool overwrite = false, CancellationToken ct = default) { - return UploadCoreAsync(GetObjectName(id, version, suffix), stream, ct); + return UploadCoreAsync(GetObjectName(id, version, suffix), stream, overwrite, ct); } public Task UploadAsync(string fileName, Stream stream, CancellationToken ct = default) { - return UploadCoreAsync(fileName, stream, ct); + return UploadCoreAsync(fileName, stream, false, ct); } public Task DeleteAsync(string id, long version, string suffix) @@ -137,13 +137,13 @@ namespace Squidex.Infrastructure.Assets return blob.DeleteIfExistsAsync(); } - private async Task UploadCoreAsync(string blobName, Stream stream, CancellationToken ct = default) + private async Task UploadCoreAsync(string blobName, Stream stream, bool overwrite = false, CancellationToken ct = default) { try { var tempBlob = blobContainer.GetBlockBlobReference(blobName); - await tempBlob.UploadFromStreamAsync(stream, AccessCondition.GenerateIfNotExistsCondition(), null, null, ct); + await tempBlob.UploadFromStreamAsync(stream, overwrite ? null : AccessCondition.GenerateIfNotExistsCondition(), null, null, ct); } catch (StorageException ex) when (ex.RequestInformation.HttpStatusCode == 409) { diff --git a/src/Squidex.Infrastructure.Azure/Diagnostics/CosmosDbHealthCheck.cs b/src/Squidex.Infrastructure.Azure/Diagnostics/CosmosDbHealthCheck.cs new file mode 100644 index 000000000..09698e250 --- /dev/null +++ b/src/Squidex.Infrastructure.Azure/Diagnostics/CosmosDbHealthCheck.cs @@ -0,0 +1,32 @@ +// ========================================================================== +// Squidex Headless CMS +// ========================================================================== +// Copyright (c) Squidex UG (haftungsbeschraenkt) +// All rights reserved. Licensed under the MIT license. +// ========================================================================== + +using System; +using System.Threading; +using System.Threading.Tasks; +using Microsoft.Azure.Documents.Client; +using Microsoft.Extensions.Diagnostics.HealthChecks; + +namespace Squidex.Infrastructure.Diagnostics +{ + public sealed class CosmosDbHealthCheck : IHealthCheck + { + private readonly DocumentClient documentClient; + + public CosmosDbHealthCheck(Uri uri, string masterKey) + { + documentClient = new DocumentClient(uri, masterKey); + } + + public async Task CheckHealthAsync(HealthCheckContext context, CancellationToken cancellationToken = default) + { + await documentClient.ReadDatabaseFeedAsync(); + + return HealthCheckResult.Healthy("Application must query data from CosmosDB."); + } + } +} diff --git a/src/Squidex.Infrastructure.Azure/EventSourcing/Constants.cs b/src/Squidex.Infrastructure.Azure/EventSourcing/Constants.cs new file mode 100644 index 000000000..120f38704 --- /dev/null +++ b/src/Squidex.Infrastructure.Azure/EventSourcing/Constants.cs @@ -0,0 +1,16 @@ +// ========================================================================== +// Squidex Headless CMS +// ========================================================================== +// Copyright (c) Squidex UG (haftungsbeschraenkt) +// All rights reserved. Licensed under the MIT license. +// ========================================================================== + +namespace Squidex.Infrastructure.EventSourcing +{ + internal static class Constants + { + public const string Collection = "Events"; + + public const string LeaseCollection = "Leases"; + } +} diff --git a/src/Squidex.Infrastructure.Azure/EventSourcing/CosmosDbEvent.cs b/src/Squidex.Infrastructure.Azure/EventSourcing/CosmosDbEvent.cs new file mode 100644 index 000000000..c12c6548f --- /dev/null +++ b/src/Squidex.Infrastructure.Azure/EventSourcing/CosmosDbEvent.cs @@ -0,0 +1,33 @@ +// ========================================================================== +// Squidex Headless CMS +// ========================================================================== +// Copyright (c) Squidex UG (haftungsbeschränkt) +// All rights reserved. Licensed under the MIT license. +// ========================================================================== + +using Newtonsoft.Json; + +namespace Squidex.Infrastructure.EventSourcing +{ + internal sealed class CosmosDbEvent + { + [JsonProperty("type")] + public string Type { get; set; } + + [JsonProperty("payload")] + public string Payload { get; set; } + + [JsonProperty("header")] + public EnvelopeHeaders Headers { get; set; } + + public static CosmosDbEvent FromEventData(EventData data) + { + return new CosmosDbEvent { Type = data.Type, Headers = data.Headers, Payload = data.Payload }; + } + + public EventData ToEventData() + { + return new EventData(Type, Headers, Payload); + } + } +} \ No newline at end of file diff --git a/src/Squidex.Infrastructure.Azure/EventSourcing/CosmosDbEventCommit.cs b/src/Squidex.Infrastructure.Azure/EventSourcing/CosmosDbEventCommit.cs new file mode 100644 index 000000000..6a5dca9b3 --- /dev/null +++ b/src/Squidex.Infrastructure.Azure/EventSourcing/CosmosDbEventCommit.cs @@ -0,0 +1,33 @@ +// ========================================================================== +// Squidex Headless CMS +// ========================================================================== +// Copyright (c) Squidex UG (haftungsbeschränkt) +// All rights reserved. Licensed under the MIT license. +// ========================================================================== + +using System; +using Newtonsoft.Json; + +namespace Squidex.Infrastructure.EventSourcing +{ + internal sealed class CosmosDbEventCommit + { + [JsonProperty("id")] + public Guid Id { get; set; } + + [JsonProperty("events")] + public CosmosDbEvent[] Events { get; set; } + + [JsonProperty("eventStreamOffset")] + public long EventStreamOffset { get; set; } + + [JsonProperty("eventsCount")] + public long EventsCount { get; set; } + + [JsonProperty("eventStream")] + public string EventStream { get; set; } + + [JsonProperty("timestamp")] + public long Timestamp { get; set; } + } +} diff --git a/src/Squidex.Infrastructure.Azure/EventSourcing/CosmosDbEventStore.cs b/src/Squidex.Infrastructure.Azure/EventSourcing/CosmosDbEventStore.cs new file mode 100644 index 000000000..a07bf13ec --- /dev/null +++ b/src/Squidex.Infrastructure.Azure/EventSourcing/CosmosDbEventStore.cs @@ -0,0 +1,124 @@ +// ========================================================================== +// Squidex Headless CMS +// ========================================================================== +// Copyright (c) Squidex UG (haftungsbeschraenkt) +// All rights reserved. Licensed under the MIT license. +// ========================================================================== + +using System; +using System.Collections.ObjectModel; +using System.Threading; +using System.Threading.Tasks; +using Microsoft.Azure.Documents; +using Microsoft.Azure.Documents.Client; +using Newtonsoft.Json; + +namespace Squidex.Infrastructure.EventSourcing +{ + public sealed partial class CosmosDbEventStore : DisposableObjectBase, IEventStore, IInitializable + { + private readonly DocumentClient documentClient; + private readonly Uri collectionUri; + private readonly Uri databaseUri; + private readonly string masterKey; + private readonly string databaseId; + private readonly JsonSerializerSettings serializerSettings; + + public JsonSerializerSettings SerializerSettings + { + get { return serializerSettings; } + } + + public string DatabaseId + { + get { return databaseId; } + } + + public string MasterKey + { + get { return masterKey; } + } + + public Uri ServiceUri + { + get { return documentClient.ServiceEndpoint; } + } + + public CosmosDbEventStore(DocumentClient documentClient, string masterKey, string database, JsonSerializerSettings serializerSettings) + { + Guard.NotNull(documentClient, nameof(documentClient)); + Guard.NotNull(serializerSettings, nameof(serializerSettings)); + Guard.NotNullOrEmpty(masterKey, nameof(masterKey)); + Guard.NotNullOrEmpty(database, nameof(database)); + + this.documentClient = documentClient; + + databaseUri = UriFactory.CreateDatabaseUri(database); + databaseId = database; + + collectionUri = UriFactory.CreateDocumentCollectionUri(database, Constants.Collection); + + this.masterKey = masterKey; + + this.serializerSettings = serializerSettings; + } + + protected override void DisposeObject(bool disposing) + { + if (disposing) + { + documentClient.Dispose(); + } + } + + public async Task InitializeAsync(CancellationToken ct = default) + { + await documentClient.CreateDatabaseIfNotExistsAsync(new Database { Id = databaseId }); + + await documentClient.CreateDocumentCollectionIfNotExistsAsync(databaseUri, + new DocumentCollection + { + Id = Constants.LeaseCollection, + }); + + await documentClient.CreateDocumentCollectionIfNotExistsAsync(databaseUri, + new DocumentCollection + { + IndexingPolicy = new IndexingPolicy + { + IncludedPaths = new Collection + { + new IncludedPath + { + Path = "/*", + Indexes = new Collection + { + Index.Range(DataType.Number), + Index.Range(DataType.String), + } + } + } + }, + UniqueKeyPolicy = new UniqueKeyPolicy + { + UniqueKeys = new Collection + { + new UniqueKey + { + Paths = new Collection + { + $"/eventStream", + $"/eventStreamOffset" + } + } + } + }, + Id = Constants.Collection, + }, + new RequestOptions + { + PartitionKey = new PartitionKey($"/eventStream") + }); + } + } +} diff --git a/src/Squidex.Infrastructure.Azure/EventSourcing/CosmosDbEventStore_Reader.cs b/src/Squidex.Infrastructure.Azure/EventSourcing/CosmosDbEventStore_Reader.cs new file mode 100644 index 000000000..e0cccf559 --- /dev/null +++ b/src/Squidex.Infrastructure.Azure/EventSourcing/CosmosDbEventStore_Reader.cs @@ -0,0 +1,142 @@ +// ========================================================================== +// Squidex Headless CMS +// ========================================================================== +// Copyright (c) Squidex UG (haftungsbeschränkt) +// All rights reserved. Licensed under the MIT license. +// ========================================================================== + +using System; +using System.Collections.Generic; +using System.Threading; +using System.Threading.Tasks; +using Microsoft.Azure.Documents; +using Squidex.Infrastructure.Log; +using Squidex.Infrastructure.Tasks; + +namespace Squidex.Infrastructure.EventSourcing +{ + public delegate bool EventPredicate(EventData data); + + public partial class CosmosDbEventStore : IEventStore, IInitializable + { + public IEventSubscription CreateSubscription(IEventSubscriber subscriber, string streamFilter = null, string position = null) + { + Guard.NotNull(subscriber, nameof(subscriber)); + + ThrowIfDisposed(); + + return new CosmosDbSubscription(this, subscriber, streamFilter, position); + } + + public Task CreateIndexAsync(string property) + { + Guard.NotNullOrEmpty(property, nameof(property)); + + ThrowIfDisposed(); + + return Task.CompletedTask; + } + + public async Task> QueryAsync(string streamName, long streamPosition = 0) + { + Guard.NotNullOrEmpty(streamName, nameof(streamName)); + + ThrowIfDisposed(); + + using (Profiler.TraceMethod()) + { + var query = FilterBuilder.ByStreamName(streamName, streamPosition - MaxCommitSize); + + var result = new List(); + + await documentClient.QueryAsync(collectionUri, query, commit => + { + var eventStreamOffset = (int)commit.EventStreamOffset; + + var commitTimestamp = commit.Timestamp; + var commitOffset = 0; + + foreach (var @event in commit.Events) + { + eventStreamOffset++; + + if (eventStreamOffset >= streamPosition) + { + var eventData = @event.ToEventData(); + var eventToken = new StreamPosition(commitTimestamp, commitOffset, commit.Events.Length); + + result.Add(new StoredEvent(streamName, eventToken, eventStreamOffset, eventData)); + } + } + + return TaskHelper.Done; + }); + + return result; + } + } + + public Task QueryAsync(Func callback, string property, object value, string position = null, CancellationToken ct = default) + { + Guard.NotNull(callback, nameof(callback)); + Guard.NotNullOrEmpty(property, nameof(property)); + Guard.NotNull(value, nameof(value)); + + ThrowIfDisposed(); + + StreamPosition lastPosition = position; + + var filterDefinition = FilterBuilder.CreateByProperty(property, value, lastPosition); + var filterExpression = FilterBuilder.CreateExpression(property, value); + + return QueryAsync(callback, lastPosition, filterDefinition, filterExpression, ct); + } + + public Task QueryAsync(Func callback, string streamFilter = null, string position = null, CancellationToken ct = default) + { + Guard.NotNull(callback, nameof(callback)); + + ThrowIfDisposed(); + + StreamPosition lastPosition = position; + + var filterDefinition = FilterBuilder.CreateByFilter(streamFilter, lastPosition); + var filterExpression = FilterBuilder.CreateExpression(null, null); + + return QueryAsync(callback, lastPosition, filterDefinition, filterExpression, ct); + } + + private async Task QueryAsync(Func callback, StreamPosition lastPosition, SqlQuerySpec query, EventPredicate filterExpression, CancellationToken ct = default) + { + using (Profiler.TraceMethod()) + { + await documentClient.QueryAsync(collectionUri, query, async commit => + { + var eventStreamOffset = (int)commit.EventStreamOffset; + + var commitTimestamp = commit.Timestamp; + var commitOffset = 0; + + foreach (var @event in commit.Events) + { + eventStreamOffset++; + + if (commitOffset > lastPosition.CommitOffset || commitTimestamp > lastPosition.Timestamp) + { + var eventData = @event.ToEventData(); + + if (filterExpression(eventData)) + { + var eventToken = new StreamPosition(commitTimestamp, commitOffset, commit.Events.Length); + + await callback(new StoredEvent(commit.EventStream, eventToken, eventStreamOffset, eventData)); + } + } + + commitOffset++; + } + }, ct); + } + } + } +} \ No newline at end of file diff --git a/src/Squidex.Infrastructure.Azure/EventSourcing/CosmosDbEventStore_Writer.cs b/src/Squidex.Infrastructure.Azure/EventSourcing/CosmosDbEventStore_Writer.cs new file mode 100644 index 000000000..45144e56e --- /dev/null +++ b/src/Squidex.Infrastructure.Azure/EventSourcing/CosmosDbEventStore_Writer.cs @@ -0,0 +1,149 @@ +// ========================================================================== +// Squidex Headless CMS +// ========================================================================== +// Copyright (c) Squidex UG (haftungsbeschränkt) +// All rights reserved. Licensed under the MIT license. +// ========================================================================== + +using System; +using System.Collections.Generic; +using System.Net; +using System.Threading.Tasks; +using Microsoft.Azure.Documents; +using Microsoft.Azure.Documents.Client; +using NodaTime; +using Squidex.Infrastructure.Log; + +namespace Squidex.Infrastructure.EventSourcing +{ + public partial class CosmosDbEventStore + { + private const int MaxWriteAttempts = 20; + private const int MaxCommitSize = 10; + + public Task DeleteStreamAsync(string streamName) + { + Guard.NotNullOrEmpty(streamName, nameof(streamName)); + + ThrowIfDisposed(); + + var query = FilterBuilder.AllIds(streamName); + + return documentClient.QueryAsync(collectionUri, query, commit => + { + var documentUri = UriFactory.CreateDocumentUri(databaseId, Constants.Collection, commit.Id.ToString()); + + return documentClient.DeleteDocumentAsync(documentUri); + }); + } + + public Task AppendAsync(Guid commitId, string streamName, ICollection events) + { + return AppendAsync(commitId, streamName, EtagVersion.Any, events); + } + + public async Task AppendAsync(Guid commitId, string streamName, long expectedVersion, ICollection events) + { + Guard.NotEmpty(commitId, nameof(commitId)); + Guard.NotNullOrEmpty(streamName, nameof(streamName)); + Guard.NotNull(events, nameof(events)); + Guard.LessThan(events.Count, MaxCommitSize, "events.Count"); + + ThrowIfDisposed(); + + using (Profiler.TraceMethod()) + { + if (events.Count == 0) + { + return; + } + + var currentVersion = await GetEventStreamOffsetAsync(streamName); + + if (expectedVersion != EtagVersion.Any && expectedVersion != currentVersion) + { + throw new WrongEventVersionException(currentVersion, expectedVersion); + } + + var commit = BuildCommit(commitId, streamName, expectedVersion >= -1 ? expectedVersion : currentVersion, events); + + for (var attempt = 0; attempt < MaxWriteAttempts; attempt++) + { + try + { + await documentClient.CreateDocumentAsync(collectionUri, commit); + + return; + } + catch (DocumentClientException ex) + { + if (ex.StatusCode == HttpStatusCode.Conflict) + { + currentVersion = await GetEventStreamOffsetAsync(streamName); + + if (expectedVersion != EtagVersion.Any) + { + throw new WrongEventVersionException(currentVersion, expectedVersion); + } + + if (attempt < MaxWriteAttempts) + { + expectedVersion = currentVersion; + } + else + { + throw new TimeoutException("Could not acquire a free slot for the commit within the provided time."); + } + } + else + { + throw; + } + } + } + } + } + + private async Task GetEventStreamOffsetAsync(string streamName) + { + var query = + documentClient.CreateDocumentQuery(collectionUri, + FilterBuilder.LastPosition(streamName)); + + var document = await query.FirstOrDefaultAsync(); + + if (document != null) + { + return document.EventStreamOffset + document.EventsCount; + } + + return EtagVersion.Empty; + } + + private static CosmosDbEventCommit BuildCommit(Guid commitId, string streamName, long expectedVersion, ICollection events) + { + var commitEvents = new CosmosDbEvent[events.Count]; + + var i = 0; + + foreach (var e in events) + { + var mongoEvent = CosmosDbEvent.FromEventData(e); + + commitEvents[i++] = mongoEvent; + } + + var mongoCommit = new CosmosDbEventCommit + { + Id = commitId, + Events = commitEvents, + EventsCount = events.Count, + EventStream = streamName, + EventStreamOffset = expectedVersion, + Timestamp = SystemClock.Instance.GetCurrentInstant().ToUnixTimeTicks() + }; + + return mongoCommit; + } + } +} \ No newline at end of file diff --git a/src/Squidex.Infrastructure.Azure/EventSourcing/CosmosDbSubscription.cs b/src/Squidex.Infrastructure.Azure/EventSourcing/CosmosDbSubscription.cs new file mode 100644 index 000000000..fa5d8af86 --- /dev/null +++ b/src/Squidex.Infrastructure.Azure/EventSourcing/CosmosDbSubscription.cs @@ -0,0 +1,150 @@ +// ========================================================================== +// Squidex Headless CMS +// ========================================================================== +// Copyright (c) Squidex UG (haftungsbeschraenkt) +// All rights reserved. Licensed under the MIT license. +// ========================================================================== + +using System; +using System.Collections.Generic; +using System.Text.RegularExpressions; +using System.Threading; +using System.Threading.Tasks; +using Microsoft.Azure.Documents; +using Microsoft.Azure.Documents.ChangeFeedProcessor.FeedProcessing; +using Newtonsoft.Json; +using Builder = Microsoft.Azure.Documents.ChangeFeedProcessor.ChangeFeedProcessorBuilder; +using Collection = Microsoft.Azure.Documents.ChangeFeedProcessor.DocumentCollectionInfo; +using Options = Microsoft.Azure.Documents.ChangeFeedProcessor.ChangeFeedProcessorOptions; + +#pragma warning disable IDE0017 // Simplify object initialization + +namespace Squidex.Infrastructure.EventSourcing +{ + internal sealed class CosmosDbSubscription : IEventSubscription, IChangeFeedObserverFactory, IChangeFeedObserver + { + private readonly TaskCompletionSource processorStopRequested = new TaskCompletionSource(); + private readonly Task processorTask; + private readonly CosmosDbEventStore store; + private readonly Regex regex; + private readonly string hostName; + private readonly IEventSubscriber subscriber; + + public CosmosDbSubscription(CosmosDbEventStore store, IEventSubscriber subscriber, string streamFilter, string position = null) + { + this.store = store; + + var fromBeginning = string.IsNullOrWhiteSpace(position); + + if (fromBeginning) + { + hostName = $"squidex.{DateTime.UtcNow.Ticks.ToString()}"; + } + else + { + hostName = position; + } + + if (!StreamFilter.IsAll(streamFilter)) + { + regex = new Regex(streamFilter); + } + + this.subscriber = subscriber; + + processorTask = Task.Run(async () => + { + try + { + Collection CreateCollection(string name) + { + var collection = new Collection(); + + collection.CollectionName = name; + collection.DatabaseName = store.DatabaseId; + collection.MasterKey = store.MasterKey; + collection.Uri = store.ServiceUri; + + return collection; + } + + var processor = + await new Builder() + .WithFeedCollection(CreateCollection(Constants.Collection)) + .WithLeaseCollection(CreateCollection(Constants.LeaseCollection)) + .WithHostName(hostName) + .WithProcessorOptions(new Options { StartFromBeginning = fromBeginning, LeasePrefix = hostName }) + .WithObserverFactory(this) + .BuildAsync(); + + await processor.StartAsync(); + await processorStopRequested.Task; + await processor.StopAsync(); + } + catch (Exception ex) + { + await subscriber.OnErrorAsync(this, ex); + } + }); + } + + public IChangeFeedObserver CreateObserver() + { + return this; + } + + public async Task CloseAsync(IChangeFeedObserverContext context, ChangeFeedObserverCloseReason reason) + { + if (reason == ChangeFeedObserverCloseReason.ObserverError) + { + await subscriber.OnErrorAsync(this, new InvalidOperationException("Change feed observer failed.")); + } + } + + public Task OpenAsync(IChangeFeedObserverContext context) + { + return Task.CompletedTask; + } + + public async Task ProcessChangesAsync(IChangeFeedObserverContext context, IReadOnlyList docs, CancellationToken cancellationToken) + { + if (!processorStopRequested.Task.IsCompleted) + { + foreach (var document in docs) + { + if (!processorStopRequested.Task.IsCompleted) + { + var streamName = document.GetPropertyValue("eventStream"); + + if (regex == null || regex.IsMatch(streamName)) + { + var commit = JsonConvert.DeserializeObject(document.ToString(), store.SerializerSettings); + + var eventStreamOffset = (int)commit.EventStreamOffset; + + foreach (var @event in commit.Events) + { + eventStreamOffset++; + + var eventData = @event.ToEventData(); + + await subscriber.OnEventAsync(this, new StoredEvent(commit.EventStream, hostName, eventStreamOffset, eventData)); + } + } + } + } + } + } + + public void WakeUp() + { + } + + public Task StopAsync() + { + processorStopRequested.SetResult(true); + + return processorTask; + } + } +} diff --git a/src/Squidex.Infrastructure.Azure/EventSourcing/FilterBuilder.cs b/src/Squidex.Infrastructure.Azure/EventSourcing/FilterBuilder.cs new file mode 100644 index 000000000..b6bd7686c --- /dev/null +++ b/src/Squidex.Infrastructure.Azure/EventSourcing/FilterBuilder.cs @@ -0,0 +1,156 @@ +// ========================================================================== +// Squidex Headless CMS +// ========================================================================== +// Copyright (c) Squidex UG (haftungsbeschraenkt) +// All rights reserved. Licensed under the MIT license. +// ========================================================================== + +using System.Collections.Generic; +using Microsoft.Azure.Documents; +using Squidex.Infrastructure.Json.Objects; + +namespace Squidex.Infrastructure.EventSourcing +{ + internal static class FilterBuilder + { + public static SqlQuerySpec AllIds(string streamName) + { + var query = + $"SELECT TOP 1 " + + $" e.id," + + $" e.eventsCount " + + $"FROM {Constants.Collection} e " + + $"WHERE " + + $" e.eventStream = @name " + + $"ORDER BY e.eventStreamOffset DESC"; + + var parameters = new SqlParameterCollection + { + new SqlParameter("@name", streamName) + }; + + return new SqlQuerySpec(query, parameters); + } + + public static SqlQuerySpec LastPosition(string streamName) + { + var query = + $"SELECT TOP 1 " + + $" e.eventStreamOffset," + + $" e.eventsCount " + + $"FROM {Constants.Collection} e " + + $"WHERE " + + $" e.eventStream = @name " + + $"ORDER BY e.eventStreamOffset DESC"; + + var parameters = new SqlParameterCollection + { + new SqlParameter("@name", streamName) + }; + + return new SqlQuerySpec(query, parameters); + } + + public static SqlQuerySpec ByStreamName(string streamName, long streamPosition = 0) + { + var query = + $"SELECT * " + + $"FROM {Constants.Collection} e " + + $"WHERE " + + $" e.eventStream = @name " + + $"AND e.eventStreamOffset >= @position " + + $"ORDER BY e.eventStreamOffset ASC"; + + var parameters = new SqlParameterCollection + { + new SqlParameter("@name", streamName), + new SqlParameter("@position", streamPosition) + }; + + return new SqlQuerySpec(query, parameters); + } + + public static SqlQuerySpec CreateByProperty(string property, object value, StreamPosition streamPosition) + { + var filters = new List(); + + var parameters = new SqlParameterCollection(); + + filters.ForPosition(parameters, streamPosition); + filters.ForProperty(parameters, property, value); + + return BuildQuery(filters, parameters); + } + + public static SqlQuerySpec CreateByFilter(string streamFilter, StreamPosition streamPosition) + { + var filters = new List(); + + var parameters = new SqlParameterCollection(); + + filters.ForPosition(parameters, streamPosition); + filters.ForRegex(parameters, streamFilter); + + return BuildQuery(filters, parameters); + } + + private static SqlQuerySpec BuildQuery(List filters, SqlParameterCollection parameters) + { + var query = $"SELECT * FROM {Constants.Collection} e WHERE {string.Join(" AND ", filters)} ORDER BY e.timestamp"; + + return new SqlQuerySpec(query, parameters); + } + + private static void ForProperty(this List filters, SqlParameterCollection parameters, string property, object value) + { + filters.Add($"ARRAY_CONTAINS(e.events, {{ \"header\": {{ \"{property}\": @value }} }}, true)"); + + parameters.Add(new SqlParameter("@value", value)); + } + + private static void ForRegex(this List filters, SqlParameterCollection parameters, string streamFilter) + { + if (!StreamFilter.IsAll(streamFilter)) + { + if (streamFilter.Contains("^")) + { + filters.Add($"STARTSWITH(e.eventStream, @filter)"); + } + else + { + filters.Add($"e.eventStream = @filter"); + } + + parameters.Add(new SqlParameter("@filter", streamFilter)); + } + } + + private static void ForPosition(this List filters, SqlParameterCollection parameters, StreamPosition streamPosition) + { + if (streamPosition.IsEndOfCommit) + { + filters.Add($"e.timestamp > @time"); + } + else + { + filters.Add($"e.timestamp >= @time"); + } + + parameters.Add(new SqlParameter("@time", streamPosition.Timestamp)); + } + + public static EventPredicate CreateExpression(string property, object value) + { + if (!string.IsNullOrWhiteSpace(property)) + { + var jsonValue = JsonValue.Create(value); + + return x => x.Headers.TryGetValue(property, out var p) && p.Equals(jsonValue); + } + else + { + return x => true; + } + } + } +} diff --git a/src/Squidex.Infrastructure.Azure/EventSourcing/FilterExtensions.cs b/src/Squidex.Infrastructure.Azure/EventSourcing/FilterExtensions.cs new file mode 100644 index 000000000..c24e93ff1 --- /dev/null +++ b/src/Squidex.Infrastructure.Azure/EventSourcing/FilterExtensions.cs @@ -0,0 +1,62 @@ +// ========================================================================== +// Squidex Headless CMS +// ========================================================================== +// Copyright (c) Squidex UG (haftungsbeschraenkt) +// All rights reserved. Licensed under the MIT license. +// ========================================================================== + +using System; +using System.Linq; +using System.Threading; +using System.Threading.Tasks; +using Microsoft.Azure.Documents; +using Microsoft.Azure.Documents.Client; +using Microsoft.Azure.Documents.Linq; + +namespace Squidex.Infrastructure.EventSourcing +{ + internal static class FilterExtensions + { + public static async Task FirstOrDefaultAsync(this IQueryable queryable, CancellationToken ct = default) + { + var documentQuery = queryable.AsDocumentQuery(); + + using (documentQuery) + { + if (documentQuery.HasMoreResults) + { + var results = await documentQuery.ExecuteNextAsync(ct); + + return results.FirstOrDefault(); + } + } + + return default; + } + + public static Task QueryAsync(this DocumentClient documentClient, Uri collectionUri, SqlQuerySpec querySpec, Func handler, CancellationToken ct = default) + { + var query = documentClient.CreateDocumentQuery(collectionUri, querySpec); + + return query.QueryAsync(handler, ct); + } + + public static async Task QueryAsync(this IQueryable queryable, Func handler, CancellationToken ct = default) + { + var documentQuery = queryable.AsDocumentQuery(); + + using (documentQuery) + { + while (documentQuery.HasMoreResults && !ct.IsCancellationRequested) + { + var items = await documentQuery.ExecuteNextAsync(ct); + + foreach (var item in items) + { + await handler(item); + } + } + } + } + } +} diff --git a/src/Squidex.Infrastructure.Azure/EventSourcing/StreamPosition.cs b/src/Squidex.Infrastructure.Azure/EventSourcing/StreamPosition.cs new file mode 100644 index 000000000..f0626ee5d --- /dev/null +++ b/src/Squidex.Infrastructure.Azure/EventSourcing/StreamPosition.cs @@ -0,0 +1,55 @@ +// ========================================================================== +// Squidex Headless CMS +// ========================================================================== +// Copyright (c) Squidex UG (haftungsbeschränkt) +// All rights reserved. Licensed under the MIT license. +// ========================================================================== + +namespace Squidex.Infrastructure.EventSourcing +{ + internal sealed class StreamPosition + { + public long Timestamp { get; } + + public long CommitOffset { get; } + + public long CommitSize { get; } + + public bool IsEndOfCommit + { + get { return CommitOffset == CommitSize - 1; } + } + + public StreamPosition(long timestamp, long commitOffset, long commitSize) + { + Timestamp = timestamp; + + CommitOffset = commitOffset; + CommitSize = commitSize; + } + + public static implicit operator string(StreamPosition position) + { + var parts = new object[] + { + position.Timestamp, + position.CommitOffset, + position.CommitSize + }; + + return string.Join("-", parts); + } + + public static implicit operator StreamPosition(string position) + { + if (!string.IsNullOrWhiteSpace(position)) + { + var parts = position.Split('-'); + + return new StreamPosition(long.Parse(parts[0]), long.Parse(parts[1]), long.Parse(parts[2])); + } + + return new StreamPosition(0, -1, -1); + } + } +} diff --git a/src/Squidex.Infrastructure.Azure/Squidex.Infrastructure.Azure.csproj b/src/Squidex.Infrastructure.Azure/Squidex.Infrastructure.Azure.csproj index 755262980..32ea4d0ad 100644 --- a/src/Squidex.Infrastructure.Azure/Squidex.Infrastructure.Azure.csproj +++ b/src/Squidex.Infrastructure.Azure/Squidex.Infrastructure.Azure.csproj @@ -5,6 +5,8 @@ 7.3 + + diff --git a/src/Squidex.Infrastructure.GetEventStore/EventSourcing/GetEventStore.cs b/src/Squidex.Infrastructure.GetEventStore/EventSourcing/GetEventStore.cs index 1f2dfcecb..d06c1ea15 100644 --- a/src/Squidex.Infrastructure.GetEventStore/EventSourcing/GetEventStore.cs +++ b/src/Squidex.Infrastructure.GetEventStore/EventSourcing/GetEventStore.cs @@ -53,18 +53,26 @@ namespace Squidex.Infrastructure.EventSourcing await projectionClient.ConnectAsync(ct); } - public IEventSubscription CreateSubscription(IEventSubscriber subscriber, string streamFilter, string position = null) + public IEventSubscription CreateSubscription(IEventSubscriber subscriber, string streamFilter = null, string position = null) { + Guard.NotNull(streamFilter, nameof(streamFilter)); + return new GetEventStoreSubscription(connection, subscriber, serializer, projectionClient, position, prefix, streamFilter); } public Task CreateIndexAsync(string property) { + Guard.NotNullOrEmpty(property, nameof(property)); + return projectionClient.CreateProjectionAsync(property, string.Empty); } public async Task QueryAsync(Func callback, string property, object value, string position = null, CancellationToken ct = default) { + Guard.NotNull(callback, nameof(callback)); + Guard.NotNullOrEmpty(property, nameof(property)); + Guard.NotNull(value, nameof(value)); + using (Profiler.TraceMethod()) { var streamName = await projectionClient.CreateProjectionAsync(property, value); @@ -77,6 +85,8 @@ namespace Squidex.Infrastructure.EventSourcing public async Task QueryAsync(Func callback, string streamFilter = null, string position = null, CancellationToken ct = default) { + Guard.NotNull(callback, nameof(callback)); + using (Profiler.TraceMethod()) { var streamName = await projectionClient.CreateProjectionAsync(streamFilter); @@ -111,6 +121,8 @@ namespace Squidex.Infrastructure.EventSourcing public async Task> QueryAsync(string streamName, long streamPosition = 0) { + Guard.NotNullOrEmpty(streamName, nameof(streamName)); + using (Profiler.TraceMethod()) { var result = new List(); @@ -142,6 +154,8 @@ namespace Squidex.Infrastructure.EventSourcing public Task DeleteStreamAsync(string streamName) { + Guard.NotNullOrEmpty(streamName, nameof(streamName)); + return connection.DeleteStreamAsync(GetStreamName(streamName), ExpectedVersion.Any); } @@ -159,11 +173,11 @@ namespace Squidex.Infrastructure.EventSourcing private async Task AppendEventsInternalAsync(string streamName, long expectedVersion, ICollection events) { + Guard.NotNullOrEmpty(streamName, nameof(streamName)); + Guard.NotNull(events, nameof(events)); + using (Profiler.TraceMethod(nameof(AppendAsync))) { - Guard.NotNullOrEmpty(streamName, nameof(streamName)); - Guard.NotNull(events, nameof(events)); - if (events.Count == 0) { return; diff --git a/src/Squidex.Infrastructure.GetEventStore/EventSourcing/GetEventStoreSubscription.cs b/src/Squidex.Infrastructure.GetEventStore/EventSourcing/GetEventStoreSubscription.cs index a458bda5c..0f06e4e77 100644 --- a/src/Squidex.Infrastructure.GetEventStore/EventSourcing/GetEventStoreSubscription.cs +++ b/src/Squidex.Infrastructure.GetEventStore/EventSourcing/GetEventStoreSubscription.cs @@ -31,8 +31,6 @@ namespace Squidex.Infrastructure.EventSourcing string prefix, string streamFilter) { - Guard.NotNull(subscriber, nameof(subscriber)); - this.connection = connection; this.position = projectionClient.ParsePositionOrNull(position); diff --git a/src/Squidex.Infrastructure.GoogleCloud/Assets/GoogleCloudAssetStore.cs b/src/Squidex.Infrastructure.GoogleCloud/Assets/GoogleCloudAssetStore.cs index e76b0d884..f76589938 100644 --- a/src/Squidex.Infrastructure.GoogleCloud/Assets/GoogleCloudAssetStore.cs +++ b/src/Squidex.Infrastructure.GoogleCloud/Assets/GoogleCloudAssetStore.cs @@ -80,14 +80,14 @@ namespace Squidex.Infrastructure.Assets } } - public Task UploadAsync(string id, long version, string suffix, Stream stream, CancellationToken ct = default) + public Task UploadAsync(string id, long version, string suffix, Stream stream, bool overwrite = false, CancellationToken ct = default) { - return UploadCoreAsync(GetObjectName(id, version, suffix), stream, ct); + return UploadCoreAsync(GetObjectName(id, version, suffix), stream, overwrite, ct); } public Task UploadAsync(string fileName, Stream stream, CancellationToken ct = default) { - return UploadCoreAsync(fileName, stream, ct); + return UploadCoreAsync(fileName, stream, false, ct); } public Task DeleteAsync(string id, long version, string suffix) @@ -100,11 +100,11 @@ namespace Squidex.Infrastructure.Assets return DeleteCoreAsync(fileName); } - private async Task UploadCoreAsync(string objectName, Stream stream, CancellationToken ct = default) + private async Task UploadCoreAsync(string objectName, Stream stream, bool overwrite = false, CancellationToken ct = default) { try { - await storageClient.UploadObjectAsync(bucketName, objectName, "application/octet-stream", stream, IfNotExists, ct); + await storageClient.UploadObjectAsync(bucketName, objectName, "application/octet-stream", stream, overwrite ? null : IfNotExists, ct); } catch (GoogleApiException ex) when (ex.HttpStatusCode == HttpStatusCode.PreconditionFailed) { diff --git a/src/Squidex.Infrastructure.MongoDb/Assets/MongoGridFsAssetStore.cs b/src/Squidex.Infrastructure.MongoDb/Assets/MongoGridFsAssetStore.cs index 6956bff4c..997fd9068 100644 --- a/src/Squidex.Infrastructure.MongoDb/Assets/MongoGridFsAssetStore.cs +++ b/src/Squidex.Infrastructure.MongoDb/Assets/MongoGridFsAssetStore.cs @@ -52,7 +52,7 @@ namespace Squidex.Infrastructure.Assets using (var readStream = await bucket.OpenDownloadStreamAsync(sourceFileName, cancellationToken: ct)) { - await UploadFileCoreAsync(target, readStream, ct); + await UploadFileCoreAsync(target, readStream, false, ct); } } catch (GridFSFileNotFoundException ex) @@ -78,14 +78,14 @@ namespace Squidex.Infrastructure.Assets } } - public Task UploadAsync(string id, long version, string suffix, Stream stream, CancellationToken ct = default) + public Task UploadAsync(string id, long version, string suffix, Stream stream, bool overwrite = false, CancellationToken ct = default) { - return UploadFileCoreAsync(GetFileName(id, version, suffix), stream, ct); + return UploadFileCoreAsync(GetFileName(id, version, suffix), stream, overwrite, ct); } public Task UploadAsync(string fileName, Stream stream, CancellationToken ct = default) { - return UploadFileCoreAsync(fileName, stream, ct); + return UploadFileCoreAsync(fileName, stream, false, ct); } public Task DeleteAsync(string id, long version, string suffix) @@ -110,10 +110,15 @@ namespace Squidex.Infrastructure.Assets } } - private async Task UploadFileCoreAsync(string id, Stream stream, CancellationToken ct = default) + private async Task UploadFileCoreAsync(string id, Stream stream, bool overwrite = false, CancellationToken ct = default) { try { + if (overwrite) + { + await bucket.DeleteAsync(id, ct); + } + await bucket.UploadFromStreamAsync(id, id, stream, cancellationToken: ct); } catch (MongoWriteException ex) when (ex.WriteError.Category == ServerErrorCategory.DuplicateKey) diff --git a/src/Squidex.Infrastructure.MongoDb/EventSourcing/MongoEventStore_Reader.cs b/src/Squidex.Infrastructure.MongoDb/EventSourcing/MongoEventStore_Reader.cs index 314304445..4f254ddd4 100644 --- a/src/Squidex.Infrastructure.MongoDb/EventSourcing/MongoEventStore_Reader.cs +++ b/src/Squidex.Infrastructure.MongoDb/EventSourcing/MongoEventStore_Reader.cs @@ -23,20 +23,23 @@ namespace Squidex.Infrastructure.EventSourcing { public Task CreateIndexAsync(string property) { + Guard.NotNullOrEmpty(property, nameof(property)); + return Collection.Indexes.CreateOneAsync( new CreateIndexModel(Index.Ascending(CreateIndexPath(property)))); } - public IEventSubscription CreateSubscription(IEventSubscriber subscriber, string streamFilter, string position = null) + public IEventSubscription CreateSubscription(IEventSubscriber subscriber, string streamFilter = null, string position = null) { Guard.NotNull(subscriber, nameof(subscriber)); - Guard.NotNullOrEmpty(streamFilter, nameof(streamFilter)); return new PollingSubscription(this, subscriber, streamFilter, position); } public async Task> QueryAsync(string streamName, long streamPosition = 0) { + Guard.NotNullOrEmpty(streamName, nameof(streamName)); + using (Profiler.TraceMethod()) { var commits = @@ -55,13 +58,13 @@ namespace Squidex.Infrastructure.EventSourcing var commitTimestamp = commit.Timestamp; var commitOffset = 0; - foreach (var e in commit.Events) + foreach (var @event in commit.Events) { eventStreamOffset++; if (eventStreamOffset >= streamPosition) { - var eventData = e.ToEventData(); + var eventData = @event.ToEventData(); var eventToken = new StreamPosition(commitTimestamp, commitOffset, commit.Events.Length); result.Add(new StoredEvent(streamName, eventToken, eventStreamOffset, eventData)); @@ -76,6 +79,8 @@ namespace Squidex.Infrastructure.EventSourcing public Task QueryAsync(Func callback, string property, object value, string position = null, CancellationToken ct = default) { Guard.NotNull(callback, nameof(callback)); + Guard.NotNullOrEmpty(property, nameof(property)); + Guard.NotNull(value, nameof(value)); StreamPosition lastPosition = position; @@ -108,13 +113,13 @@ namespace Squidex.Infrastructure.EventSourcing var commitTimestamp = commit.Timestamp; var commitOffset = 0; - foreach (var e in commit.Events) + foreach (var @event in commit.Events) { eventStreamOffset++; if (commitOffset > lastPosition.CommitOffset || commitTimestamp > lastPosition.Timestamp) { - var eventData = e.ToEventData(); + var eventData = @event.ToEventData(); if (filterExpression(eventData)) { @@ -157,7 +162,7 @@ namespace Squidex.Infrastructure.EventSourcing private static void AppendByStream(string streamFilter, List filters) { - if (!string.IsNullOrWhiteSpace(streamFilter) && !string.Equals(streamFilter, ".*", StringComparison.OrdinalIgnoreCase)) + if (!StreamFilter.IsAll(streamFilter)) { if (streamFilter.Contains("^")) { diff --git a/src/Squidex.Infrastructure.MongoDb/EventSourcing/MongoEventStore_Writer.cs b/src/Squidex.Infrastructure.MongoDb/EventSourcing/MongoEventStore_Writer.cs index 83f0416e7..7ada79773 100644 --- a/src/Squidex.Infrastructure.MongoDb/EventSourcing/MongoEventStore_Writer.cs +++ b/src/Squidex.Infrastructure.MongoDb/EventSourcing/MongoEventStore_Writer.cs @@ -16,12 +16,15 @@ namespace Squidex.Infrastructure.EventSourcing { public partial class MongoEventStore { + private const int MaxCommitSize = 10; private const int MaxWriteAttempts = 20; private const int MaxCommitSize = 10; private static readonly BsonTimestamp EmptyTimestamp = new BsonTimestamp(0); public Task DeleteStreamAsync(string streamName) { + Guard.NotNullOrEmpty(streamName, nameof(streamName)); + return Collection.DeleteManyAsync(x => x.EventStream == streamName); } @@ -32,6 +35,9 @@ namespace Squidex.Infrastructure.EventSourcing public async Task AppendAsync(Guid commitId, string streamName, long expectedVersion, ICollection events) { + Guard.NotEmpty(commitId, nameof(commitId)); + Guard.NotNullOrEmpty(streamName, nameof(streamName)); + Guard.NotNull(events, nameof(events)); Guard.LessThan(events.Count, MaxCommitSize, "events.Count"); Guard.GreaterEquals(expectedVersion, EtagVersion.Any, nameof(expectedVersion)); Guard.NotNullOrEmpty(streamName, nameof(streamName)); @@ -44,7 +50,7 @@ namespace Squidex.Infrastructure.EventSourcing return; } - var currentVersion = await GetEventStreamOffset(streamName); + var currentVersion = await GetEventStreamOffsetAsync(streamName); if (expectedVersion != EtagVersion.Any && expectedVersion != currentVersion) { @@ -67,7 +73,7 @@ namespace Squidex.Infrastructure.EventSourcing { if (ex.WriteError?.Category == ServerErrorCategory.DuplicateKey) { - currentVersion = await GetEventStreamOffset(streamName); + currentVersion = await GetEventStreamOffsetAsync(streamName); if (expectedVersion != EtagVersion.Any) { @@ -92,7 +98,7 @@ namespace Squidex.Infrastructure.EventSourcing } } - private async Task GetEventStreamOffset(string streamName) + private async Task GetEventStreamOffsetAsync(string streamName) { var document = await Collection.Find(Filter.Eq(EventStreamField, streamName)) diff --git a/src/Squidex.Infrastructure.MongoDb/EventSourcing/StreamPosition.cs b/src/Squidex.Infrastructure.MongoDb/EventSourcing/StreamPosition.cs index 45c0910e9..661c83435 100644 --- a/src/Squidex.Infrastructure.MongoDb/EventSourcing/StreamPosition.cs +++ b/src/Squidex.Infrastructure.MongoDb/EventSourcing/StreamPosition.cs @@ -9,7 +9,7 @@ using MongoDB.Bson; namespace Squidex.Infrastructure.EventSourcing { - public sealed class StreamPosition + internal sealed class StreamPosition { private static readonly BsonTimestamp EmptyTimestamp = new BsonTimestamp(0); diff --git a/src/Squidex.Infrastructure.MongoDb/MongoDb/MongoDbOptions.cs b/src/Squidex.Infrastructure.MongoDb/MongoDb/MongoDbOptions.cs new file mode 100644 index 000000000..65462dc6c --- /dev/null +++ b/src/Squidex.Infrastructure.MongoDb/MongoDb/MongoDbOptions.cs @@ -0,0 +1,14 @@ +// ========================================================================== +// Squidex Headless CMS +// ========================================================================== +// Copyright (c) Squidex UG (haftungsbeschraenkt) +// All rights reserved. Licensed under the MIT license. +// ========================================================================== + +namespace Squidex.Infrastructure.MongoDb +{ + public sealed class MongoDbOptions + { + public bool IsCosmosDb { get; set; } + } +} diff --git a/src/Squidex.Infrastructure.RabbitMq/CQRS/Events/RabbitMqEventConsumer.cs b/src/Squidex.Infrastructure.RabbitMq/CQRS/Events/RabbitMqEventConsumer.cs index 18a544342..21d44ae47 100644 --- a/src/Squidex.Infrastructure.RabbitMq/CQRS/Events/RabbitMqEventConsumer.cs +++ b/src/Squidex.Infrastructure.RabbitMq/CQRS/Events/RabbitMqEventConsumer.cs @@ -49,8 +49,8 @@ namespace Squidex.Infrastructure.CQRS.Events this.exchange = exchange; this.eventsFilter = eventsFilter; - this.jsonSerializer = jsonSerializer; this.eventPublisherName = eventPublisherName; + this.jsonSerializer = jsonSerializer; } protected override void DisposeObject(bool disposing) diff --git a/src/Squidex.Infrastructure/Assets/AssetAlreadyExistsException.cs b/src/Squidex.Infrastructure/Assets/AssetAlreadyExistsException.cs index 954f26c4c..ddf8465e0 100644 --- a/src/Squidex.Infrastructure/Assets/AssetAlreadyExistsException.cs +++ b/src/Squidex.Infrastructure/Assets/AssetAlreadyExistsException.cs @@ -32,7 +32,7 @@ namespace Squidex.Infrastructure.Assets { Guard.NotNullOrEmpty(fileName, nameof(fileName)); - return $"An asset with name '{fileName}' already not exists."; + return $"An asset with name '{fileName}' already exists."; } } } diff --git a/src/Squidex.Infrastructure/Assets/FolderAssetStore.cs b/src/Squidex.Infrastructure/Assets/FolderAssetStore.cs index 3c00b6ea8..a61e24366 100644 --- a/src/Squidex.Infrastructure/Assets/FolderAssetStore.cs +++ b/src/Squidex.Infrastructure/Assets/FolderAssetStore.cs @@ -95,14 +95,14 @@ namespace Squidex.Infrastructure.Assets } } - public Task UploadAsync(string id, long version, string suffix, Stream stream, CancellationToken ct = default) + public Task UploadAsync(string id, long version, string suffix, Stream stream, bool overwrite = false, CancellationToken ct = default) { - return UploadCoreAsync(GetFile(id, version, suffix), stream, ct); + return UploadCoreAsync(GetFile(id, version, suffix), stream, overwrite, ct); } public Task UploadAsync(string fileName, Stream stream, CancellationToken ct = default) { - return UploadCoreAsync(GetFile(fileName), stream, ct); + return UploadCoreAsync(GetFile(fileName), stream, false, ct); } public Task DeleteAsync(string id, long version, string suffix) @@ -122,11 +122,11 @@ namespace Squidex.Infrastructure.Assets return TaskHelper.Done; } - private static async Task UploadCoreAsync(FileInfo file, Stream stream, CancellationToken ct = default) + private static async Task UploadCoreAsync(FileInfo file, Stream stream, bool overwrite = false, CancellationToken ct = default) { try { - using (var fileStream = file.Open(FileMode.CreateNew, FileAccess.Write)) + using (var fileStream = file.Open(overwrite ? FileMode.Create : FileMode.CreateNew, FileAccess.Write)) { await stream.CopyToAsync(fileStream, BufferSize, ct); } diff --git a/src/Squidex.Infrastructure/Assets/IAssetStore.cs b/src/Squidex.Infrastructure/Assets/IAssetStore.cs index b4170bfff..65d3c4f84 100644 --- a/src/Squidex.Infrastructure/Assets/IAssetStore.cs +++ b/src/Squidex.Infrastructure/Assets/IAssetStore.cs @@ -21,7 +21,7 @@ namespace Squidex.Infrastructure.Assets Task UploadAsync(string fileName, Stream stream, CancellationToken ct = default); - Task UploadAsync(string id, long version, string suffix, Stream stream, CancellationToken ct = default); + Task UploadAsync(string id, long version, string suffix, Stream stream, bool overwrite = false, CancellationToken ct = default); Task DeleteAsync(string fileName); diff --git a/src/Squidex.Infrastructure/Assets/MemoryAssetStore.cs b/src/Squidex.Infrastructure/Assets/MemoryAssetStore.cs index 7c00f9a85..bc3b8804e 100644 --- a/src/Squidex.Infrastructure/Assets/MemoryAssetStore.cs +++ b/src/Squidex.Infrastructure/Assets/MemoryAssetStore.cs @@ -36,7 +36,7 @@ namespace Squidex.Infrastructure.Assets using (await readerLock.LockAsync()) { - await UploadAsync(id, version, suffix, sourceStream, ct); + await UploadAsync(id, version, suffix, sourceStream, false, ct); } } @@ -64,18 +64,23 @@ namespace Squidex.Infrastructure.Assets } } - public Task UploadAsync(string id, long version, string suffix, Stream stream, CancellationToken ct = default) + public Task UploadAsync(string id, long version, string suffix, Stream stream, bool overwrite = false, CancellationToken ct = default) { Guard.NotNullOrEmpty(id, nameof(id)); - return UploadAsync(GetFileName(id, version, suffix), stream, ct); + return UploadCoreAsync(GetFileName(id, version, suffix), stream, overwrite, ct); } - public async Task UploadAsync(string fileName, Stream stream, CancellationToken ct = default) + public Task UploadAsync(string fileName, Stream stream, CancellationToken ct = default) + { + return UploadCoreAsync(fileName, stream, false); + } + + private async Task UploadCoreAsync(string fileName, Stream stream, bool overwrite, CancellationToken ct = default) { var memoryStream = new MemoryStream(); - if (streams.TryAdd(fileName, memoryStream)) + async Task CopyAsync() { using (await writerLock.LockAsync()) { @@ -89,6 +94,17 @@ namespace Squidex.Infrastructure.Assets } } } + + if (overwrite) + { + await CopyAsync(); + + streams[fileName] = memoryStream; + } + else if (streams.TryAdd(fileName, memoryStream)) + { + await CopyAsync(); + } else { throw new AssetAlreadyExistsException(fileName); diff --git a/src/Squidex.Infrastructure/Assets/NoopAssetStore.cs b/src/Squidex.Infrastructure/Assets/NoopAssetStore.cs index 5c98cd77a..b48eb7269 100644 --- a/src/Squidex.Infrastructure/Assets/NoopAssetStore.cs +++ b/src/Squidex.Infrastructure/Assets/NoopAssetStore.cs @@ -34,7 +34,7 @@ namespace Squidex.Infrastructure.Assets throw new NotSupportedException(); } - public Task UploadAsync(string id, long version, string suffix, Stream stream, CancellationToken ct = default) + public Task UploadAsync(string id, long version, string suffix, Stream stream, bool overwrite = false, CancellationToken ct = default) { throw new NotSupportedException(); } diff --git a/src/Squidex.Infrastructure/Configuration/Options.cs b/src/Squidex.Infrastructure/Configuration/Alternatives.cs similarity index 85% rename from src/Squidex.Infrastructure/Configuration/Options.cs rename to src/Squidex.Infrastructure/Configuration/Alternatives.cs index 89cb9f596..77d70602f 100644 --- a/src/Squidex.Infrastructure/Configuration/Options.cs +++ b/src/Squidex.Infrastructure/Configuration/Alternatives.cs @@ -10,9 +10,9 @@ using System.Collections.Generic; namespace Microsoft.Extensions.Configuration { - public sealed class Options : Dictionary + public sealed class Alternatives : Dictionary { - public Options() + public Alternatives() : base(StringComparer.OrdinalIgnoreCase) { } diff --git a/src/Squidex.Infrastructure/Configuration/ConfigurationExtensions.cs b/src/Squidex.Infrastructure/Configuration/ConfigurationExtensions.cs index fafb0ee84..4c86b80a0 100644 --- a/src/Squidex.Infrastructure/Configuration/ConfigurationExtensions.cs +++ b/src/Squidex.Infrastructure/Configuration/ConfigurationExtensions.cs @@ -46,7 +46,7 @@ namespace Microsoft.Extensions.Configuration return value; } - public static string ConfigureByOption(this IConfiguration config, string path, Options options) + public static string ConfigureByOption(this IConfiguration config, string path, Alternatives options) { var value = config.GetRequiredValue(path); diff --git a/src/Squidex.Infrastructure/EventSourcing/IEventStore.cs b/src/Squidex.Infrastructure/EventSourcing/IEventStore.cs index 46765b666..6ee608632 100644 --- a/src/Squidex.Infrastructure/EventSourcing/IEventStore.cs +++ b/src/Squidex.Infrastructure/EventSourcing/IEventStore.cs @@ -28,6 +28,6 @@ namespace Squidex.Infrastructure.EventSourcing Task DeleteStreamAsync(string streamName); - IEventSubscription CreateSubscription(IEventSubscriber subscriber, string streamFilter, string position = null); + IEventSubscription CreateSubscription(IEventSubscriber subscriber, string streamFilter = null, string position = null); } } diff --git a/src/Squidex.Infrastructure/EventSourcing/StreamFilter.cs b/src/Squidex.Infrastructure/EventSourcing/StreamFilter.cs new file mode 100644 index 000000000..b3bc063af --- /dev/null +++ b/src/Squidex.Infrastructure/EventSourcing/StreamFilter.cs @@ -0,0 +1,22 @@ +// ========================================================================== +// Squidex Headless CMS +// ========================================================================== +// Copyright (c) Squidex UG (haftungsbeschraenkt) +// All rights reserved. Licensed under the MIT license. +// ========================================================================== + +using System; + +namespace Squidex.Infrastructure.EventSourcing +{ + public static class StreamFilter + { + public static bool IsAll(string filter) + { + return string.IsNullOrWhiteSpace(filter) + || string.Equals(filter, ".*", StringComparison.OrdinalIgnoreCase) + || string.Equals(filter, "(.*)", StringComparison.OrdinalIgnoreCase) + || string.Equals(filter, "(.*?)", StringComparison.OrdinalIgnoreCase); + } + } +} diff --git a/src/Squidex.Infrastructure/Orleans/GrainOfGuid.cs b/src/Squidex.Infrastructure/Orleans/GrainOfGuid.cs index 0a816c381..a179c78d8 100644 --- a/src/Squidex.Infrastructure/Orleans/GrainOfGuid.cs +++ b/src/Squidex.Infrastructure/Orleans/GrainOfGuid.cs @@ -16,7 +16,7 @@ namespace Squidex.Infrastructure.Orleans { public Guid Key { get; private set; } - public sealed override Task OnActivateAsync() + public override Task OnActivateAsync() { return ActivateAsync(this.GetPrimaryKey()); } diff --git a/src/Squidex.Infrastructure/Orleans/GrainOfString.cs b/src/Squidex.Infrastructure/Orleans/GrainOfString.cs index 041f67df5..13d737ae0 100644 --- a/src/Squidex.Infrastructure/Orleans/GrainOfString.cs +++ b/src/Squidex.Infrastructure/Orleans/GrainOfString.cs @@ -15,7 +15,7 @@ namespace Squidex.Infrastructure.Orleans { public string Key { get; private set; } - public sealed override Task OnActivateAsync() + public override Task OnActivateAsync() { return ActivateAsync(this.GetPrimaryKeyString()); } diff --git a/src/Squidex/Config/Domain/AssetServices.cs b/src/Squidex/Config/Domain/AssetServices.cs index 5b8e36706..669cd29b8 100644 --- a/src/Squidex/Config/Domain/AssetServices.cs +++ b/src/Squidex/Config/Domain/AssetServices.cs @@ -20,7 +20,7 @@ namespace Squidex.Config.Domain { public static void AddMyAssetServices(this IServiceCollection services, IConfiguration config) { - config.ConfigureByOption("assetStore:type", new Options + config.ConfigureByOption("assetStore:type", new Alternatives { ["Default"] = () => { diff --git a/src/Squidex/Config/Domain/EntitiesServices.cs b/src/Squidex/Config/Domain/EntitiesServices.cs index c7a215257..26d62ef32 100644 --- a/src/Squidex/Config/Domain/EntitiesServices.cs +++ b/src/Squidex/Config/Domain/EntitiesServices.cs @@ -31,6 +31,7 @@ using Squidex.Domain.Apps.Entities.Contents; using Squidex.Domain.Apps.Entities.Contents.Commands; using Squidex.Domain.Apps.Entities.Contents.Edm; using Squidex.Domain.Apps.Entities.Contents.GraphQL; +using Squidex.Domain.Apps.Entities.Contents.Text; using Squidex.Domain.Apps.Entities.History; using Squidex.Domain.Apps.Entities.Rules; using Squidex.Domain.Apps.Entities.Rules.Commands; @@ -105,6 +106,9 @@ namespace Squidex.Config.Domain services.AddSingletonAs() .As(); + services.AddSingletonAs() + .As(); + services.AddSingletonAs() .As>(); diff --git a/src/Squidex/Config/Domain/EventStoreServices.cs b/src/Squidex/Config/Domain/EventStoreServices.cs index abd77f817..ee90b1d47 100644 --- a/src/Squidex/Config/Domain/EventStoreServices.cs +++ b/src/Squidex/Config/Domain/EventStoreServices.cs @@ -5,11 +5,14 @@ // All rights reserved. Licensed under the MIT license. // ========================================================================== +using System; using System.Linq; using EventStore.ClientAPI; +using Microsoft.Azure.Documents.Client; using Microsoft.Extensions.Configuration; using Microsoft.Extensions.DependencyInjection; using MongoDB.Driver; +using Newtonsoft.Json; using Squidex.Infrastructure; using Squidex.Infrastructure.Diagnostics; using Squidex.Infrastructure.EventSourcing; @@ -23,7 +26,7 @@ namespace Squidex.Config.Domain { public static void AddMyEventStoreServices(this IServiceCollection services, IConfiguration config) { - config.ConfigureByOption("eventStore:type", new Options + config.ConfigureByOption("eventStore:type", new Alternatives { ["MongoDb"] = () => { @@ -39,6 +42,25 @@ namespace Squidex.Config.Domain }) .AsOptional(); }, + ["CosmosDb"] = () => + { + var cosmosDbConfiguration = config.GetRequiredValue("eventStore:cosmosDB:configuration"); + var cosmosDbMasterKey = config.GetRequiredValue("eventStore:cosmosDB:masterKey"); + var cosmosDbDatabase = config.GetRequiredValue("eventStore:cosmosDB:database"); + + services.AddSingletonAs(c => new DocumentClient(new Uri(cosmosDbConfiguration), cosmosDbMasterKey, c.GetRequiredService())) + .AsSelf(); + + services.AddSingletonAs(c => new CosmosDbEventStore( + c.GetRequiredService(), + cosmosDbMasterKey, + cosmosDbDatabase, + c.GetRequiredService())) + .AsOptional(); + + services.AddHealthChecks() + .AddCheck("CosmosDB", tags: new[] { "node" }); + }, ["GetEventStore"] = () => { var eventStoreConfiguration = config.GetRequiredValue("eventStore:getEventStore:configuration"); diff --git a/src/Squidex/Config/Domain/StoreServices.cs b/src/Squidex/Config/Domain/StoreServices.cs index 63559f3d4..5f61cf9d4 100644 --- a/src/Squidex/Config/Domain/StoreServices.cs +++ b/src/Squidex/Config/Domain/StoreServices.cs @@ -10,6 +10,7 @@ using IdentityServer4.Stores; using Microsoft.AspNetCore.Identity; using Microsoft.Extensions.Configuration; using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Options; using Migrate_01.Migrations; using MongoDB.Driver; using Squidex.Domain.Apps.Entities; @@ -17,6 +18,7 @@ using Squidex.Domain.Apps.Entities.Assets.Repositories; using Squidex.Domain.Apps.Entities.Assets.State; using Squidex.Domain.Apps.Entities.Contents.Repositories; using Squidex.Domain.Apps.Entities.Contents.State; +using Squidex.Domain.Apps.Entities.Contents.Text; using Squidex.Domain.Apps.Entities.History.Repositories; using Squidex.Domain.Apps.Entities.MongoDb.Assets; using Squidex.Domain.Apps.Entities.MongoDb.Contents; @@ -31,6 +33,7 @@ using Squidex.Infrastructure.Diagnostics; using Squidex.Infrastructure.EventSourcing; using Squidex.Infrastructure.Json; using Squidex.Infrastructure.Migrations; +using Squidex.Infrastructure.MongoDb; using Squidex.Infrastructure.States; using Squidex.Infrastructure.UsageTracking; @@ -40,7 +43,7 @@ namespace Squidex.Config.Domain { public static void AddMyStoreServices(this IServiceCollection services, IConfiguration config) { - config.ConfigureByOption("store:type", new Options + config.ConfigureByOption("store:type", new Alternatives { ["MongoDB"] = () => { @@ -48,6 +51,10 @@ namespace Squidex.Config.Domain var mongoDatabaseName = config.GetRequiredValue("store:mongoDb:database"); var mongoContentDatabaseName = config.GetOptionalValue("store:mongoDb:contentDatabase", mongoDatabaseName); + var isCosmosDb = config.GetOptionalValue("store:mongoDB:isCosmosDB"); + + services.Configure(config.GetSection("store:mongoDB")); + services.AddSingleton(typeof(ISnapshotStore<,>), typeof(MongoSnapshotStore<,>)); services.AddSingletonAs(_ => Singletons.GetOrAdd(mongoConfiguration, s => new MongoClient(s))) @@ -97,7 +104,8 @@ namespace Squidex.Config.Domain services.AddSingletonAs(c => new MongoContentRepository( c.GetRequiredService().GetDatabase(mongoContentDatabaseName), c.GetRequiredService(), - c.GetRequiredService())) + c.GetRequiredService(), + c.GetRequiredService())) .AsOptional() .AsOptional>() .AsOptional(); diff --git a/src/Squidex/Config/Orleans/OrleansServices.cs b/src/Squidex/Config/Orleans/OrleansServices.cs index aed630ded..36c3cdc71 100644 --- a/src/Squidex/Config/Orleans/OrleansServices.cs +++ b/src/Squidex/Config/Orleans/OrleansServices.cs @@ -58,7 +58,7 @@ namespace Squidex.Config.Orleans var siloPort = config.GetOptionalValue("orleans:siloPort", 11111); - config.ConfigureByOption("orleans:clustering", new Options + config.ConfigureByOption("orleans:clustering", new Alternatives { ["MongoDB"] = () => { @@ -81,7 +81,7 @@ namespace Squidex.Config.Orleans } }); - config.ConfigureByOption("store:type", new Options + config.ConfigureByOption("store:type", new Alternatives { ["MongoDB"] = () => { diff --git a/src/Squidex/appsettings.json b/src/Squidex/appsettings.json index 8a642f33c..08771041a 100644 --- a/src/Squidex/appsettings.json +++ b/src/Squidex/appsettings.json @@ -29,8 +29,8 @@ /* * Set to true, to use strong etags. */ - "strong": false -}, + "strong": false + }, "ui": { /* @@ -199,7 +199,7 @@ /* * Define the type of the event store. * - * Supported: MongoDb, GetEventStore + * Supported: MongoDb, GetEventStore, CosmosDb */ "type": "MongoDb", "mongoDb": { @@ -229,6 +229,20 @@ * Prefix for all streams and projections (for multiple installations). */ "prefix": "squidex" + }, + "cosmosDb": { + /* + * The connection string to your CosmosDB instance. + */ + "configuration": "https://localhost:8081", + /* + * The primary access key. + */ + "masterKey": "C2y6yDjf5/R+ob0N8A7Cgv30VRDJIWEHLM+4QDU5DE2nQ9nDuVTqobD4b8mGGyPMbIZnqyMsEcaGQy67XIw/Jw==", + /* + * The name of the event store database. + */ + "database": "Squidex" } }, @@ -269,7 +283,11 @@ /* * The database for all your other read collections. */ - "database": "Squidex" + "database": "Squidex", + /* + * Indicate wheter the connection string is for cosmos db. + */ + "isCosmosDB": "false" } }, @@ -277,48 +295,48 @@ /* * Enable password auth. Set this to false if you want to disable local login, leaving only 3rd party login options. */ - "allowPasswordAuth": true, - /* + "allowPasswordAuth": true, + /* * Initial admin user. */ - "adminEmail": "", - "adminPassword": "", - /* + "adminEmail": "", + "adminPassword": "", + /* * Client with all admin permissions. */ - "adminClientId": "", - "adminClientSecret": "", - /* + "adminClientId": "", + "adminClientSecret": "", + /* * Settings for Google auth (keep empty to disable). */ - "googleClient": "1006817248705-t3lb3ge808m9am4t7upqth79hulk456l.apps.googleusercontent.com", - "googleSecret": "QsEi-fHqkGw2_PjJmtNHf2wg", - /* + "googleClient": "1006817248705-t3lb3ge808m9am4t7upqth79hulk456l.apps.googleusercontent.com", + "googleSecret": "QsEi-fHqkGw2_PjJmtNHf2wg", + /* * Settings for Github auth (keep empty to disable). */ - "githubClient": "211ea00e726baf754c78", - "githubSecret": "d0a0d0fe2c26469ae20987ac265b3a339fd73132", - /* + "githubClient": "211ea00e726baf754c78", + "githubSecret": "d0a0d0fe2c26469ae20987ac265b3a339fd73132", + /* * Settings for Microsoft auth (keep empty to disable). */ - "microsoftClient": "b55da740-6648-4502-8746-b9003f29d5f1", - "microsoftSecret": "idWbANxNYEF4cB368WXJhjN", - /* + "microsoftClient": "b55da740-6648-4502-8746-b9003f29d5f1", + "microsoftSecret": "idWbANxNYEF4cB368WXJhjN", + /* * Settings for your custom oidc server. */ - "oidcName": "OIDC", - "oidcAuthority": "", - "oidcClient": "", - "oidcSecret": "", - /* + "oidcName": "OIDC", + "oidcAuthority": "", + "oidcClient": "", + "oidcSecret": "", + /* * Lock new users automatically, the administrator must unlock them. */ - "lockAutomatically": false, - /* + "lockAutomatically": false, + /* * The url to you privacy statements, if you host squidex by yourself. */ - "privacyUrl": "https://squidex.io/privacy" - }, + "privacyUrl": "https://squidex.io/privacy" + }, "news": { /* diff --git a/tests/Squidex.Domain.Apps.Entities.Tests/Contents/Text/GrainTextIndexerTests.cs b/tests/Squidex.Domain.Apps.Entities.Tests/Contents/Text/GrainTextIndexerTests.cs new file mode 100644 index 000000000..35ebe6607 --- /dev/null +++ b/tests/Squidex.Domain.Apps.Entities.Tests/Contents/Text/GrainTextIndexerTests.cs @@ -0,0 +1,144 @@ +// ========================================================================== +// Squidex Headless CMS +// ========================================================================== +// Copyright (c) Squidex UG (haftungsbeschraenkt) +// All rights reserved. Licensed under the MIT license. +// ========================================================================== + +using System; +using System.Collections.Generic; +using System.Threading.Tasks; +using FakeItEasy; +using Orleans; +using Squidex.Domain.Apps.Core.Apps; +using Squidex.Domain.Apps.Core.Contents; +using Squidex.Domain.Apps.Entities.Apps; +using Squidex.Infrastructure; +using Squidex.Infrastructure.Log; +using Squidex.Infrastructure.Orleans; +using Xunit; + +namespace Squidex.Domain.Apps.Entities.Contents.Text +{ + public class GrainTextIndexerTests + { + private readonly IGrainFactory grainFactory = A.Fake(); + private readonly ITextIndexerGrain grain = A.Fake(); + private readonly Guid schemaId = Guid.NewGuid(); + private readonly Guid contentId = Guid.NewGuid(); + private readonly GrainTextIndexer sut; + + public GrainTextIndexerTests() + { + A.CallTo(() => grainFactory.GetGrain(schemaId, null)) + .Returns(grain); + + sut = new GrainTextIndexer(grainFactory, A.Fake()); + } + + [Fact] + public async Task Should_call_grain_when_deleting_entry() + { + await sut.DeleteAsync(schemaId, contentId); + + A.CallTo(() => grain.DeleteAsync(contentId)) + .MustHaveHappened(); + } + + [Fact] + public async Task Should_catch_exception_when_deleting_failed() + { + A.CallTo(() => grain.DeleteAsync(contentId)) + .Throws(new InvalidOperationException()); + + await sut.DeleteAsync(schemaId, contentId); + } + + [Fact] + public async Task Should_call_grain_when_indexing_data() + { + var data = new NamedContentData(); + var dataDraft = new NamedContentData(); + + await sut.IndexAsync(schemaId, contentId, data, dataDraft); + + A.CallTo(() => grain.IndexAsync(contentId, A>.That.Matches(x => x.Value.Data == data && !x.Value.IsDraft))) + .MustHaveHappened(); + + A.CallTo(() => grain.IndexAsync(contentId, A>.That.Matches(x => x.Value.Data == dataDraft && x.Value.IsDraft))) + .MustHaveHappened(); + } + + [Fact] + public async Task Should_not_call_grain_when_data_is_null() + { + var dataDraft = new NamedContentData(); + + await sut.IndexAsync(schemaId, contentId, null, dataDraft); + + A.CallTo(() => grain.IndexAsync(contentId, A>.That.Matches(x => !x.Value.IsDraft))) + .MustNotHaveHappened(); + + A.CallTo(() => grain.IndexAsync(contentId, A>.That.Matches(x => x.Value.Data == dataDraft && x.Value.IsDraft))) + .MustHaveHappened(); + } + + [Fact] + public async Task Should_not_call_grain_when_data_draft_is_null() + { + var data = new NamedContentData(); + + await sut.IndexAsync(schemaId, contentId, data, null); + + A.CallTo(() => grain.IndexAsync(contentId, A>.That.Matches(x => x.Value.Data == data && !x.Value.IsDraft))) + .MustHaveHappened(); + + A.CallTo(() => grain.IndexAsync(contentId, A>.That.Matches(x => x.Value.IsDraft))) + .MustNotHaveHappened(); + } + + [Fact] + public async Task Should_catch_exception_when_indexing_failed() + { + var data = new NamedContentData(); + + A.CallTo(() => grain.IndexAsync(contentId, A>.Ignored)) + .Throws(new InvalidOperationException()); + + await sut.IndexAsync(schemaId, contentId, data, null); + } + + [Fact] + public async Task Should_call_grain_when_searching() + { + var foundIds = new List { Guid.NewGuid() }; + + A.CallTo(() => grain.SearchAsync("Search", A.Ignored)) + .Returns(foundIds); + + var ids = await sut.SearchAsync("Search", GetApp(), schemaId, true); + + Assert.Equal(foundIds, ids); + } + + [Fact] + public async Task Should_not_call_grain_when_input_is_empty() + { + var ids = await sut.SearchAsync(string.Empty, GetApp(), schemaId, false); + + Assert.Null(ids); + + A.CallTo(() => grain.SearchAsync(A.Ignored, A.Ignored)) + .MustNotHaveHappened(); + } + + private static IAppEntity GetApp() + { + var app = A.Fake(); + + A.CallTo(() => app.LanguagesConfig).Returns(LanguagesConfig.Build(Language.EN, Language.DE)); + + return app; + } + } +} diff --git a/tests/Squidex.Domain.Apps.Entities.Tests/Contents/Text/TextIndexerGrainTests.cs b/tests/Squidex.Domain.Apps.Entities.Tests/Contents/Text/TextIndexerGrainTests.cs new file mode 100644 index 000000000..1cbcb18c8 --- /dev/null +++ b/tests/Squidex.Domain.Apps.Entities.Tests/Contents/Text/TextIndexerGrainTests.cs @@ -0,0 +1,178 @@ +// ========================================================================== +// Squidex Headless CMS +// ========================================================================== +// Copyright (c) Squidex UG (haftungsbeschraenkt) +// All rights reserved. Licensed under the MIT license. +// ========================================================================== + +using System; +using System.Collections.Generic; +using System.Threading.Tasks; +using Squidex.Domain.Apps.Core.Contents; +using Squidex.Infrastructure; +using Squidex.Infrastructure.Assets; +using Xunit; + +namespace Squidex.Domain.Apps.Entities.Contents.Text +{ + public class TextIndexerGrainTests : IDisposable + { + private readonly Guid schemaId = Guid.NewGuid(); + private readonly List ids1 = new List { Guid.NewGuid() }; + private readonly List ids2 = new List { Guid.NewGuid() }; + private readonly SearchContext context; + private readonly IAssetStore assetStore = new MemoryAssetStore(); + private readonly TextIndexerGrain sut; + + public TextIndexerGrainTests() + { + context = new SearchContext + { + Languages = new HashSet { "de", "en" } + }; + + sut = new TextIndexerGrain(assetStore); + sut.ActivateAsync(schemaId).Wait(); + } + + public void Dispose() + { + sut.OnDeactivateAsync().Wait(); + } + + [Fact] + public async Task Should_read_index_and_retrieve() + { + await AddInvariantContent(); + + await sut.DeactivateAsync(); + + var other = new TextIndexerGrain(assetStore); + try + { + await other.ActivateAsync(schemaId); + + var foundHello = await other.SearchAsync("Hello", context); + var foundWorld = await other.SearchAsync("World", context); + + Assert.Equal(ids1, foundHello); + Assert.Equal(ids2, foundWorld); + } + finally + { + await other.OnDeactivateAsync(); + } + } + + [Fact] + public async Task Should_index_invariant_content_and_retrieve() + { + await AddInvariantContent(); + + var foundHello = await sut.SearchAsync("Hello", context); + var foundWorld = await sut.SearchAsync("World", context); + + Assert.Equal(ids1, foundHello); + Assert.Equal(ids2, foundWorld); + } + + [Fact] + public async Task Should_index_invariant_content_and_retrieve_with_fuzzy() + { + await AddInvariantContent(); + + var foundHello = await sut.SearchAsync("helo~", context); + var foundWorld = await sut.SearchAsync("wold~", context); + + Assert.Equal(ids1, foundHello); + Assert.Equal(ids2, foundWorld); + } + + [Fact] + public async Task Should_delete_documents_from_index() + { + await AddInvariantContent(); + + await sut.DeleteAsync(ids1[0]); + await sut.FlushAsync(); + + var helloIds = await sut.SearchAsync("Hello", context); + + var worldIds = await sut.SearchAsync("World", context); + + Assert.Empty(helloIds); + Assert.Equal(ids2, worldIds); + } + + [Fact] + public async Task Should_index_localized_content_and_retrieve() + { + await AddLocalizedContent(); + + var german1 = await sut.SearchAsync("Stadt", context); + var german2 = await sut.SearchAsync("and", context); + + var germanStopwordsIds = await sut.SearchAsync("und", context); + + Assert.Equal(ids1, german1); + Assert.Equal(ids1, german2); + Assert.Equal(ids2, germanStopwordsIds); + + var english1 = await sut.SearchAsync("City", context); + var english2 = await sut.SearchAsync("und", context); + + var englishStopwordsIds = await sut.SearchAsync("and", context); + + Assert.Equal(ids2, english1); + Assert.Equal(ids2, english2); + Assert.Equal(ids1, englishStopwordsIds); + } + + [Fact] + public async Task Should_throw_exception_for_invalid_query() + { + await AddInvariantContent(); + + await Assert.ThrowsAsync(() => sut.SearchAsync("~hello", context)); + } + + private async Task AddLocalizedContent() + { + var germanData = + new NamedContentData() + .AddField("localized", + new ContentFieldData() + .AddValue("de", "Stadt und Umgebung and whatever")); + + var englishData = + new NamedContentData() + .AddField("localized", + new ContentFieldData() + .AddValue("en", "City and Surroundings und sonstiges")); + + await sut.IndexAsync(ids1[0], new IndexData { Data = germanData }); + await sut.IndexAsync(ids2[0], new IndexData { Data = englishData }); + await sut.FlushAsync(); + } + + private async Task AddInvariantContent() + { + var data1 = + new NamedContentData() + .AddField("test", + new ContentFieldData() + .AddValue("iv", "Hello")); + + var data2 = + new NamedContentData() + .AddField("test", + new ContentFieldData() + .AddValue("iv", "World")); + + await sut.IndexAsync(ids1[0], new IndexData { Data = data1 }); + await sut.IndexAsync(ids2[0], new IndexData { Data = data2 }); + + await sut.FlushAsync(); + } + } +} diff --git a/tests/Squidex.Domain.Users.Tests/AssetUserPictureStoreTests.cs b/tests/Squidex.Domain.Users.Tests/AssetUserPictureStoreTests.cs index 69193a0c2..939b1b6fe 100644 --- a/tests/Squidex.Domain.Users.Tests/AssetUserPictureStoreTests.cs +++ b/tests/Squidex.Domain.Users.Tests/AssetUserPictureStoreTests.cs @@ -35,7 +35,7 @@ namespace Squidex.Domain.Users await sut.UploadAsync(userId, stream); - A.CallTo(() => assetStore.UploadAsync(userId, 0, "picture", stream, CancellationToken.None)).MustHaveHappened(); + A.CallTo(() => assetStore.UploadAsync(userId, 0, "picture", stream, false, CancellationToken.None)).MustHaveHappened(); } [Fact] diff --git a/tests/Squidex.Infrastructure.Tests/Assets/AssetStoreTests.cs b/tests/Squidex.Infrastructure.Tests/Assets/AssetStoreTests.cs index 40202902e..080a68d64 100644 --- a/tests/Squidex.Infrastructure.Tests/Assets/AssetStoreTests.cs +++ b/tests/Squidex.Infrastructure.Tests/Assets/AssetStoreTests.cs @@ -60,6 +60,19 @@ namespace Squidex.Infrastructure.Assets Assert.Equal(assetData.ToArray(), readData.ToArray()); } + [Fact] + public async Task Should_read_and_override_file() + { + await Sut.UploadAsync(assetId, 1, "suffix", new MemoryStream(new byte[] { 0x3, 0x4, 0x5, 0x6 })); + await Sut.UploadAsync(assetId, 1, "suffix", assetData, true); + + var readData = new MemoryStream(); + + await Sut.DownloadAsync(assetId, 1, "suffix", readData); + + Assert.Equal(assetData.ToArray(), readData.ToArray()); + } + [Fact] public async Task Should_throw_exception_when_file_to_write_already_exists() { diff --git a/tests/Squidex.Infrastructure.Tests/EventSourcing/CosmosDbEventStoreFixture.cs b/tests/Squidex.Infrastructure.Tests/EventSourcing/CosmosDbEventStoreFixture.cs new file mode 100644 index 000000000..b8e96a318 --- /dev/null +++ b/tests/Squidex.Infrastructure.Tests/EventSourcing/CosmosDbEventStoreFixture.cs @@ -0,0 +1,35 @@ +// ========================================================================== +// Squidex Headless CMS +// ========================================================================== +// Copyright (c) Squidex UG (haftungsbeschraenkt) +// All rights reserved. Licensed under the MIT license. +// ========================================================================== + +using System; +using Microsoft.Azure.Documents.Client; +using Squidex.Infrastructure.TestHelpers; + +namespace Squidex.Infrastructure.EventSourcing +{ + public sealed class CosmosDbEventStoreFixture : IDisposable + { + private const string EmulatorKey = "C2y6yDjf5/R+ob0N8A7Cgv30VRDJIWEHLM+4QDU5DE2nQ9nDuVTqobD4b8mGGyPMbIZnqyMsEcaGQy67XIw/Jw=="; + private const string EmulatorUri = "https://localhost:8081"; + private readonly DocumentClient client; + + public CosmosDbEventStore EventStore { get; } + + public CosmosDbEventStoreFixture() + { + client = new DocumentClient(new Uri(EmulatorUri), EmulatorKey, JsonHelper.DefaultSettings()); + + EventStore = new CosmosDbEventStore(client, EmulatorKey, "Test", JsonHelper.DefaultSettings()); + EventStore.InitializeAsync().Wait(); + } + + public void Dispose() + { + client.DeleteDatabaseAsync(UriFactory.CreateDatabaseUri("Test")).Wait(); + } + } +} diff --git a/tests/Squidex.Infrastructure.Tests/EventSourcing/CosmosDbEventStoreTests.cs b/tests/Squidex.Infrastructure.Tests/EventSourcing/CosmosDbEventStoreTests.cs new file mode 100644 index 000000000..f49d2f064 --- /dev/null +++ b/tests/Squidex.Infrastructure.Tests/EventSourcing/CosmosDbEventStoreTests.cs @@ -0,0 +1,29 @@ +// ========================================================================== +// Squidex Headless CMS +// ========================================================================== +// Copyright (c) Squidex UG (haftungsbeschraenkt) +// All rights reserved. Licensed under the MIT license. +// ========================================================================== + +using Xunit; + +namespace Squidex.Infrastructure.EventSourcing +{ + [Trait("Category", "Dependencies")] + public class CosmosDbEventStoreTests : EventStoreTests, IClassFixture + { + private readonly CosmosDbEventStoreFixture fixture; + + protected override int SubscriptionDelayInMs { get; } = 1000; + + public CosmosDbEventStoreTests(CosmosDbEventStoreFixture fixture) + { + this.fixture = fixture; + } + + public override CosmosDbEventStore CreateStore() + { + return fixture.EventStore; + } + } +} diff --git a/tests/Squidex.Infrastructure.Tests/EventSourcing/EventStoreTests.cs b/tests/Squidex.Infrastructure.Tests/EventSourcing/EventStoreTests.cs index 3f9f33b08..ac7f06f7b 100644 --- a/tests/Squidex.Infrastructure.Tests/EventSourcing/EventStoreTests.cs +++ b/tests/Squidex.Infrastructure.Tests/EventSourcing/EventStoreTests.cs @@ -19,11 +19,14 @@ namespace Squidex.Infrastructure.EventSourcing public abstract class EventStoreTests where T : IEventStore { private readonly Lazy sut; + private string subscriptionPosition; public sealed class EventSubscriber : IEventSubscriber { public List Events { get; } = new List(); + public string LastPosition { get; set; } + public Task OnErrorAsync(IEventSubscription subscription, Exception exception) { throw new NotSupportedException(); @@ -31,6 +34,8 @@ namespace Squidex.Infrastructure.EventSourcing public Task OnEventAsync(IEventSubscription subscription, StoredEvent storedEvent) { + LastPosition = storedEvent.EventPosition; + Events.Add(storedEvent); return TaskHelper.Done; @@ -132,6 +137,54 @@ namespace Squidex.Infrastructure.EventSourcing ShouldBeEquivalentTo(readEvents, expected); } + [Fact] + public async Task Should_subscribe_to_next_events() + { + var streamName = $"test-{Guid.NewGuid()}"; + + var events1 = new EventData[] + { + new EventData("Type1", new EnvelopeHeaders(), "1"), + new EventData("Type2", new EnvelopeHeaders(), "2"), + }; + + await QueryWithSubscriptionAsync(streamName, async () => + { + await Sut.AppendAsync(Guid.NewGuid(), streamName, events1); + }); + + var events2 = new EventData[] + { + new EventData("Type1", new EnvelopeHeaders(), "1"), + new EventData("Type2", new EnvelopeHeaders(), "2"), + }; + + var readEventsFromPosition = await QueryWithSubscriptionAsync(streamName, async () => + { + await Sut.AppendAsync(Guid.NewGuid(), streamName, events2); + }); + + var expectedFromPosition = new StoredEvent[] + { + new StoredEvent(streamName, "Position", 2, events2[0]), + new StoredEvent(streamName, "Position", 3, events2[1]) + }; + + var readEventsFromBeginning = await QueryWithSubscriptionAsync(streamName, fromBeginning: true); + + var expectedFromBeginning = new StoredEvent[] + { + new StoredEvent(streamName, "Position", 0, events1[0]), + new StoredEvent(streamName, "Position", 1, events1[1]), + new StoredEvent(streamName, "Position", 2, events2[0]), + new StoredEvent(streamName, "Position", 3, events2[1]) + }; + + ShouldBeEquivalentTo(readEventsFromPosition, expectedFromPosition); + + ShouldBeEquivalentTo(readEventsFromBeginning, expectedFromBeginning); + } + [Fact] public async Task Should_read_events_from_offset() { @@ -272,16 +325,19 @@ namespace Squidex.Infrastructure.EventSourcing } } - private async Task> QueryWithSubscriptionAsync(string streamFilter, Func action) + private async Task> QueryWithSubscriptionAsync(string streamFilter, Func action = null, bool fromBeginning = false) { var subscriber = new EventSubscriber(); IEventSubscription subscription = null; try { - subscription = Sut.CreateSubscription(subscriber, streamFilter); + subscription = Sut.CreateSubscription(subscriber, streamFilter, fromBeginning ? null : subscriptionPosition); - await action(); + if (action != null) + { + await action(); + } using (var cts = new CancellationTokenSource(30000)) { @@ -293,6 +349,8 @@ namespace Squidex.Infrastructure.EventSourcing if (subscriber.Events.Count > 0) { + subscriptionPosition = subscriber.LastPosition; + return subscriber.Events; } } diff --git a/tools/Migrate_01/Migrations/ConvertOldSnapshotStores.cs b/tools/Migrate_01/Migrations/ConvertOldSnapshotStores.cs index 4528f8e5b..58c3f8643 100644 --- a/tools/Migrate_01/Migrations/ConvertOldSnapshotStores.cs +++ b/tools/Migrate_01/Migrations/ConvertOldSnapshotStores.cs @@ -7,23 +7,33 @@ using System.Linq; using System.Threading.Tasks; +using Microsoft.Extensions.Options; using MongoDB.Bson; using MongoDB.Driver; using Squidex.Infrastructure.Migrations; +using Squidex.Infrastructure.MongoDb; +using Squidex.Infrastructure.Tasks; namespace Migrate_01.Migrations { public sealed class ConvertOldSnapshotStores : IMigration { private readonly IMongoDatabase database; + private readonly MongoDbOptions options; - public ConvertOldSnapshotStores(IMongoDatabase database) + public ConvertOldSnapshotStores(IMongoDatabase database, IOptions options) { this.database = database; + this.options = options.Value; } public Task UpdateAsync() { + if (options.IsCosmosDb) + { + return TaskHelper.Done; + } + var collections = new[] { "States_Apps",