diff --git a/src/Squidex.Core/Schemas/Json/SchemaJsonSerializer.cs b/src/Squidex.Core/Schemas/Json/SchemaJsonSerializer.cs index c9c7b82ac..fce8e18a6 100644 --- a/src/Squidex.Core/Schemas/Json/SchemaJsonSerializer.cs +++ b/src/Squidex.Core/Schemas/Json/SchemaJsonSerializer.cs @@ -7,6 +7,7 @@ // ========================================================================== using System.Collections.Generic; +using System.Collections.Immutable; using System.Linq; using Newtonsoft.Json; using Newtonsoft.Json.Linq; @@ -76,31 +77,32 @@ namespace Squidex.Core.Schemas.Json { var model = token.ToObject(serializer); - var schema = Schema.Create(model.Name, model.Properties); - - if (model.IsPublished) - { - schema = schema.Publish(); - } + var fields = + model.Fields.Select(kvp => + { + var fieldModel = kvp.Value; - foreach (var kvp in model.Fields) - { - var fieldModel = kvp.Value; + var field = fieldRegistry.CreateField(kvp.Key, fieldModel.Name, fieldModel.Properties); - var field = fieldRegistry.CreateField(kvp.Key, fieldModel.Name, fieldModel.Properties); + if (fieldModel.IsDisabled) + { + field = field.Disable(); + } - if (fieldModel.IsDisabled) - { - field = field.Disable(); - } + if (fieldModel.IsHidden) + { + field = field.Hide(); + } - if (fieldModel.IsHidden) - { - field = field.Hide(); - } + return field; + }).ToDictionary(x => x.Id, x => x).ToImmutableDictionary(); - schema = schema.AddOrUpdateField(field); - } + var schema = + new Schema( + model.Name, + model.Properties, + model.IsPublished, + fields); return schema; } diff --git a/src/Squidex.Events/Schemas/Utils/SchemaEventDispatcher.cs b/src/Squidex.Events/Schemas/Utils/SchemaEventDispatcher.cs new file mode 100644 index 000000000..92c1e8aaf --- /dev/null +++ b/src/Squidex.Events/Schemas/Utils/SchemaEventDispatcher.cs @@ -0,0 +1,70 @@ +// ========================================================================== +// SchemaUpdater.cs +// Squidex Headless CMS +// ========================================================================== +// Copyright (c) Squidex Group +// All rights reserved. +// ========================================================================== + +using Squidex.Core.Schemas; + +namespace Squidex.Events.Schemas.Utils +{ + public class SchemaEventDispatcher + { + public static Schema Dispatch(SchemaCreated @event) + { + return Schema.Create(@event.Name, @event.Properties); + } + + public static Schema Dispatch(FieldAdded @event, Schema schema, FieldRegistry registry) + { + return schema.AddOrUpdateField(registry.CreateField(@event.FieldId, @event.Name, @event.Properties)); + } + + public static Schema Dispatch(FieldUpdated @event, Schema schema) + { + return schema.UpdateField(@event.FieldId, @event.Properties); + } + + public static Schema Dispatch(FieldHidden @event, Schema schema) + { + return schema.HideField(@event.FieldId); + } + + public static Schema Dispatch(FieldShown @event, Schema schema) + { + return schema.ShowField(@event.FieldId); + } + + public static Schema Dispatch(FieldDisabled @event, Schema schema) + { + return schema.DisableField(@event.FieldId); + } + + public static Schema Dispatch(FieldEnabled @event, Schema schema) + { + return schema.EnableField(@event.FieldId); + } + + public static Schema Dispatch(SchemaUpdated @event, Schema schema) + { + return schema.Update(@event.Properties); + } + + public static Schema Dispatch(FieldDeleted @event, Schema schema) + { + return schema.DeleteField(@event.FieldId); + } + + public static Schema Dispatch(SchemaPublished @event, Schema schema) + { + return schema.Publish(); + } + + public static Schema Dispatch(SchemaUnpublished @event, Schema schema) + { + return schema.Unpublish(); + } + } +} diff --git a/src/Squidex.Infrastructure.MongoDb/MongoRepositoryBase.cs b/src/Squidex.Infrastructure.MongoDb/MongoRepositoryBase.cs index 93cf9568c..8b5bb1275 100644 --- a/src/Squidex.Infrastructure.MongoDb/MongoRepositoryBase.cs +++ b/src/Squidex.Infrastructure.MongoDb/MongoRepositoryBase.cs @@ -6,6 +6,7 @@ // All rights reserved. // ========================================================================== +using System; using System.Globalization; using System.Threading.Tasks; using MongoDB.Driver; @@ -15,7 +16,7 @@ namespace Squidex.Infrastructure.MongoDb public abstract class MongoRepositoryBase { private const string CollectionFormat = "{0}Set"; - private readonly IMongoCollection mongoCollection; + private Lazy> mongoCollection; private readonly IMongoDatabase mongoDatabase; private readonly string typeName; @@ -71,7 +72,7 @@ namespace Squidex.Infrastructure.MongoDb { get { - return mongoCollection; + return mongoCollection.Value; } } @@ -103,20 +104,39 @@ namespace Squidex.Infrastructure.MongoDb return string.Format(CultureInfo.InvariantCulture, CollectionFormat, typeof(TEntity).Name); } - private IMongoCollection CreateCollection() + private Lazy> CreateCollection() { - var databaseCollection = mongoDatabase.GetCollection( - CollectionName(), - CollectionSettings() ?? new MongoCollectionSettings()); + return new Lazy>(() => + { + var databaseCollection = mongoDatabase.GetCollection( + CollectionName(), + CollectionSettings() ?? new MongoCollectionSettings()); - SetupCollectionAsync(databaseCollection).Wait(); + SetupCollectionAsync(databaseCollection).Wait(); - return databaseCollection; + return databaseCollection; + }); } protected virtual Task SetupCollectionAsync(IMongoCollection collection) { return Task.FromResult(true); } + + public async Task TryDropCollectionAsync() + { + try + { + await mongoDatabase.DropCollectionAsync(CollectionName()); + + mongoCollection = CreateCollection(); + + return true; + } + catch + { + return false; + } + } } } \ No newline at end of file diff --git a/src/Squidex.Infrastructure.RabbitMq/RabbitMqEventChannel.cs b/src/Squidex.Infrastructure.RabbitMq/RabbitMqEventChannel.cs index 5ee0b39e0..4e39b16a7 100644 --- a/src/Squidex.Infrastructure.RabbitMq/RabbitMqEventChannel.cs +++ b/src/Squidex.Infrastructure.RabbitMq/RabbitMqEventChannel.cs @@ -35,13 +35,13 @@ namespace Squidex.Infrastructure.RabbitMq } } - public void Publish(EventData events) + public void Publish(EventData eventData) { ThrowIfDisposed(); var channel = currentChannel.Value; - channel.BasicPublish(Exchange, string.Empty, null, Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(events))); + channel.BasicPublish(Exchange, string.Empty, null, Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(eventData))); } public void Connect(string queueName, Action received) @@ -64,7 +64,7 @@ namespace Squidex.Infrastructure.RabbitMq received(eventData); }; - channel.BasicConsume(queueName, false, consumer); + channel.BasicConsume(queueName, true, consumer); } private static IModel Connect(IConnectionFactory connectionFactory) diff --git a/src/Squidex.Infrastructure/CQRS/Commands/DefaultDomainObjectRepository.cs b/src/Squidex.Infrastructure/CQRS/Commands/DefaultDomainObjectRepository.cs index 981520ebf..f1b3e370b 100644 --- a/src/Squidex.Infrastructure/CQRS/Commands/DefaultDomainObjectRepository.cs +++ b/src/Squidex.Infrastructure/CQRS/Commands/DefaultDomainObjectRepository.cs @@ -89,7 +89,7 @@ namespace Squidex.Infrastructure.CQRS.Commands { await eventStore.AppendEventsAsync(commitId, streamName, versionExpected, eventsToSave); } - catch (WrongEventVersionException e) + catch (WrongEventVersionException) { throw new DomainObjectVersionException(domainObject.Id.ToString(), domainObject.GetType(), versionCurrent, versionExpected); } diff --git a/src/Squidex/Pipeline/CommandHandlers/LogExceptionHandler.cs b/src/Squidex.Infrastructure/CQRS/Commands/LogExceptionHandler.cs similarity index 78% rename from src/Squidex/Pipeline/CommandHandlers/LogExceptionHandler.cs rename to src/Squidex.Infrastructure/CQRS/Commands/LogExceptionHandler.cs index 8afac1b82..5381b5b3c 100644 --- a/src/Squidex/Pipeline/CommandHandlers/LogExceptionHandler.cs +++ b/src/Squidex.Infrastructure/CQRS/Commands/LogExceptionHandler.cs @@ -8,11 +8,10 @@ using System.Threading.Tasks; using Microsoft.Extensions.Logging; -using Squidex.Infrastructure.CQRS.Commands; // ReSharper disable InvertIf -namespace Squidex.Pipeline.CommandHandlers +namespace Squidex.Infrastructure.CQRS.Commands { public sealed class LogExceptionHandler : ICommandHandler { @@ -29,9 +28,7 @@ namespace Squidex.Pipeline.CommandHandlers if (exception != null) { - var eventId = new EventId(9999, "CommandFailed"); - - logger.LogError(eventId, exception, "Handling {0} command failed", context.Command); + logger.LogError(InfrastructureErrors.CommandFailed, exception, "Handling {0} command failed", context.Command); } return Task.FromResult(false); diff --git a/src/Squidex/Pipeline/CommandHandlers/LogExecutingHandler.cs b/src/Squidex.Infrastructure/CQRS/Commands/LogExecutingHandler.cs similarity index 83% rename from src/Squidex/Pipeline/CommandHandlers/LogExecutingHandler.cs rename to src/Squidex.Infrastructure/CQRS/Commands/LogExecutingHandler.cs index 50391ee66..37a4e6d00 100644 --- a/src/Squidex/Pipeline/CommandHandlers/LogExecutingHandler.cs +++ b/src/Squidex.Infrastructure/CQRS/Commands/LogExecutingHandler.cs @@ -8,9 +8,8 @@ using System.Threading.Tasks; using Microsoft.Extensions.Logging; -using Squidex.Infrastructure.CQRS.Commands; -namespace Squidex.Pipeline.CommandHandlers +namespace Squidex.Infrastructure.CQRS.Commands { public sealed class LogExecutingHandler : ICommandHandler { @@ -23,7 +22,7 @@ namespace Squidex.Pipeline.CommandHandlers public Task HandleAsync(CommandContext context) { - logger.LogError("Handling {0} command", context.Command); + logger.LogInformation("Handling {0} command", context.Command); return Task.FromResult(false); } diff --git a/src/Squidex.Infrastructure/CQRS/Events/EventBus.cs b/src/Squidex.Infrastructure/CQRS/Events/EventReceiver.cs similarity index 90% rename from src/Squidex.Infrastructure/CQRS/Events/EventBus.cs rename to src/Squidex.Infrastructure/CQRS/Events/EventReceiver.cs index b6d409066..b31245511 100644 --- a/src/Squidex.Infrastructure/CQRS/Events/EventBus.cs +++ b/src/Squidex.Infrastructure/CQRS/Events/EventReceiver.cs @@ -1,5 +1,5 @@ // ========================================================================== -// EventBus.cs +// EventReceiver.cs // Squidex Headless CMS // ========================================================================== // Copyright (c) Squidex Group @@ -18,17 +18,17 @@ using NodaTime; namespace Squidex.Infrastructure.CQRS.Events { - public sealed class EventBus + public sealed class EventReceiver { private readonly EventDataFormatter formatter; private readonly IEnumerable liveConsumers; private readonly IEnumerable catchConsumers; private readonly IEventStream eventStream; - private readonly ILogger logger; + private readonly ILogger logger; private bool isSubscribed; - public EventBus( - ILogger logger, + public EventReceiver( + ILogger logger, IEventStream eventStream, IEnumerable liveConsumers, IEnumerable catchConsumers, @@ -69,11 +69,11 @@ namespace Squidex.Infrastructure.CQRS.Events if (isLive) { - DispatchConsumers(liveConsumers.OfType().Union(catchConsumers), @event); + DispatchConsumers(catchConsumers.OfType().Union(liveConsumers), @event); } else { - DispatchConsumers(liveConsumers, @event); + DispatchConsumers(catchConsumers, @event); } }); diff --git a/src/Squidex.Infrastructure/CQRS/Events/IEventPublisher.cs b/src/Squidex.Infrastructure/CQRS/Events/IEventPublisher.cs index e4fe2c4a8..e13f69798 100644 --- a/src/Squidex.Infrastructure/CQRS/Events/IEventPublisher.cs +++ b/src/Squidex.Infrastructure/CQRS/Events/IEventPublisher.cs @@ -10,6 +10,6 @@ namespace Squidex.Infrastructure.CQRS.Events { public interface IEventPublisher { - void Publish(EventData events); + void Publish(EventData eventData); } } diff --git a/src/Squidex.Infrastructure/CQRS/Replay/IReplayableStore.cs b/src/Squidex.Infrastructure/CQRS/Replay/IReplayableStore.cs new file mode 100644 index 000000000..a643c372c --- /dev/null +++ b/src/Squidex.Infrastructure/CQRS/Replay/IReplayableStore.cs @@ -0,0 +1,17 @@ +// ========================================================================== +// IReplayableStore.cs +// Squidex Headless CMS +// ========================================================================== +// Copyright (c) Squidex Group +// All rights reserved. +// ========================================================================== + +using System.Threading.Tasks; + +namespace Squidex.Infrastructure.CQRS.Replay +{ + public interface IReplayableStore + { + Task ClearAsync(); + } +} diff --git a/src/Squidex.Infrastructure/CQRS/Replay/ReplayGenerator.cs b/src/Squidex.Infrastructure/CQRS/Replay/ReplayGenerator.cs new file mode 100644 index 000000000..70a4ff96d --- /dev/null +++ b/src/Squidex.Infrastructure/CQRS/Replay/ReplayGenerator.cs @@ -0,0 +1,100 @@ +// ========================================================================== +// ReplayGenerator.cs +// Squidex Headless CMS +// ========================================================================== +// Copyright (c) Squidex Group +// All rights reserved. +// ========================================================================== + +using System; +using System.Collections.Generic; +using System.Reactive.Linq; +using System.Threading.Tasks; +using Microsoft.Extensions.Logging; +using Squidex.Infrastructure.CQRS.Events; + +namespace Squidex.Infrastructure.CQRS.Replay +{ + public sealed class ReplayGenerator + { + private readonly ILogger logger; + private readonly IEventStore eventStore; + private readonly IEventPublisher eventPublisher; + private readonly IEnumerable stores; + + public ReplayGenerator( + ILogger logger, + IEventStore eventStore, + IEventPublisher eventPublisher, + IEnumerable stores) + { + Guard.NotNull(logger, nameof(logger)); + Guard.NotNull(eventStore, nameof(eventStore)); + Guard.NotNull(eventPublisher, nameof(eventPublisher)); + Guard.NotNull(stores, nameof(stores)); + + this.stores = stores; + this.logger = logger; + this.eventStore = eventStore; + this.eventPublisher = eventPublisher; + } + + public async Task ReplayAllAsync() + { + logger.LogDebug("Starting to replay all events"); + + if (!await ClearAsync()) + { + return; + } + + await ReplayEventsAsync(); + + logger.LogDebug("Finished to replay all events"); + } + + private async Task ReplayEventsAsync() + { + try + { + logger.LogDebug("Replaying all messages"); + + await eventStore.GetEventsAsync().ForEachAsync(eventData => + { + eventPublisher.Publish(eventData); + }); + + logger.LogDebug("Replayed all messages"); + } + catch (Exception e) + { + logger.LogCritical(InfrastructureErrors.ReplayPublishingFailed, e, "Failed to publish events to {0}", eventPublisher); + } + } + + private async Task ClearAsync() + { + logger.LogDebug("Clearing replayable stores"); + + foreach (var store in stores) + { + try + { + await store.ClearAsync(); + + logger.LogDebug("Cleared store {0}", store); + } + catch (Exception e) + { + logger.LogCritical(InfrastructureErrors.ReplayClearingFailed, e, "Failed to clear store {0}", store); + + return false; + } + } + + logger.LogDebug("Cleared replayable stores"); + + return true; + } + } +} diff --git a/src/Squidex.Infrastructure/InfrastructureErrors.cs b/src/Squidex.Infrastructure/InfrastructureErrors.cs index 20350eee1..296be6f06 100644 --- a/src/Squidex.Infrastructure/InfrastructureErrors.cs +++ b/src/Squidex.Infrastructure/InfrastructureErrors.cs @@ -12,8 +12,14 @@ namespace Squidex.Infrastructure { public class InfrastructureErrors { + public static readonly EventId CommandFailed = new EventId(20001, "CommandFailed"); + public static readonly EventId EventHandlingFailed = new EventId(10001, "EventHandlingFailed"); public static readonly EventId EventDeserializationFailed = new EventId(10002, "EventDeserializationFailed"); + + public static readonly EventId ReplayClearingFailed = new EventId(30001, "ReplayClearingFailed"); + + public static readonly EventId ReplayPublishingFailed = new EventId(30003, "ReplayPublishingFailed"); } } diff --git a/src/Squidex.Read/Apps/Services/Implementations/CachingAppProvider.cs b/src/Squidex.Read/Apps/Services/Implementations/CachingAppProvider.cs index 2877e5157..c4468d8c9 100644 --- a/src/Squidex.Read/Apps/Services/Implementations/CachingAppProvider.cs +++ b/src/Squidex.Read/Apps/Services/Implementations/CachingAppProvider.cs @@ -20,7 +20,7 @@ using Squidex.Read.Utils; namespace Squidex.Read.Apps.Services.Implementations { - public class CachingAppProvider : CachingProvider, IAppProvider, ILiveEventConsumer + public class CachingAppProvider : CachingProvider, IAppProvider, ICatchEventConsumer { private static readonly TimeSpan CacheDuration = TimeSpan.FromMinutes(30); private readonly IAppRepository appRepository; diff --git a/src/Squidex.Read/Schemas/SchemaHistoryEventsCreator.cs b/src/Squidex.Read/Schemas/SchemaHistoryEventsCreator.cs index 906fb8e7e..e56ec79fe 100644 --- a/src/Squidex.Read/Schemas/SchemaHistoryEventsCreator.cs +++ b/src/Squidex.Read/Schemas/SchemaHistoryEventsCreator.cs @@ -8,7 +8,6 @@ using System.Collections.Generic; using System.Threading.Tasks; -using Squidex.Events; using Squidex.Events.Schemas; using Squidex.Infrastructure; using Squidex.Infrastructure.CQRS; @@ -103,11 +102,11 @@ namespace Squidex.Read.Schemas return this.DispatchFuncAsync(@event.Payload, @event.Headers, (HistoryEventToStore)null); } - private Task FindSchemaNameAsync(EnvelopeHeaders headers) + private async Task FindSchemaNameAsync(EnvelopeHeaders headers) { - var name = schemaProvider.FindSchemaNameByIdAsync(headers.AppId(), headers.AggregateId()); + var schema = await schemaProvider.ProviderSchemaByIdAsync(headers.AggregateId()); - return name; + return schema.Label ?? schema.Name; } } } \ No newline at end of file diff --git a/src/Squidex.Read/Schemas/Services/ISchemaProvider.cs b/src/Squidex.Read/Schemas/Services/ISchemaProvider.cs index b0896ed60..6d3985141 100644 --- a/src/Squidex.Read/Schemas/Services/ISchemaProvider.cs +++ b/src/Squidex.Read/Schemas/Services/ISchemaProvider.cs @@ -8,13 +8,14 @@ using System; using System.Threading.Tasks; +using Squidex.Read.Schemas.Repositories; namespace Squidex.Read.Schemas.Services { public interface ISchemaProvider { - Task FindSchemaNameByIdAsync(Guid schemaId); + Task ProviderSchemaByIdAsync(Guid schemaId); - Task FindSchemaIdByNameAsync(Guid appId, string name); + Task ProvideSchemaByNameAsync(Guid appId, string name); } } diff --git a/src/Squidex.Read/Schemas/Services/Implementations/CachingSchemaProvider.cs b/src/Squidex.Read/Schemas/Services/Implementations/CachingSchemaProvider.cs index 45a25642a..43d599928 100644 --- a/src/Squidex.Read/Schemas/Services/Implementations/CachingSchemaProvider.cs +++ b/src/Squidex.Read/Schemas/Services/Implementations/CachingSchemaProvider.cs @@ -21,16 +21,14 @@ using Squidex.Events; namespace Squidex.Read.Schemas.Services.Implementations { - public class CachingSchemaProvider : CachingProvider, ISchemaProvider, ILiveEventConsumer + public class CachingSchemaProvider : CachingProvider, ISchemaProvider, ICatchEventConsumer { private static readonly TimeSpan CacheDuration = TimeSpan.FromMinutes(10); private readonly ISchemaRepository repository; private sealed class CacheItem { - public Guid? Id; - - public string Name; + public ISchemaEntityWithSchema Entity; } public CachingSchemaProvider(IMemoryCache cache, ISchemaRepository repository) @@ -41,29 +39,29 @@ namespace Squidex.Read.Schemas.Services.Implementations this.repository = repository; } - public async Task FindSchemaNameByIdAsync(Guid schemaId) + public async Task ProviderSchemaByIdAsync(Guid schemaId) { var cacheKey = BuildIdCacheKey(schemaId); var cacheItem = Cache.Get(cacheKey); if (cacheItem == null) { - var schema = await repository.FindSchemaAsync(schemaId); + var entity = await repository.FindSchemaAsync(schemaId); - cacheItem = new CacheItem { Id = schema?.Id, Name = schema?.Name }; + cacheItem = new CacheItem { Entity = entity }; Cache.Set(cacheKey, cacheItem, CacheDuration); - if (cacheItem.Id != null) + if (cacheItem.Entity != null) { - Cache.Set(BuildIdCacheKey(cacheItem.Id.Value), cacheItem, CacheDuration); + Cache.Set(BuildIdCacheKey(cacheItem.Entity.Id), cacheItem, CacheDuration); } } - return cacheItem.Name; + return cacheItem.Entity; } - public async Task FindSchemaIdByNameAsync(Guid appId, string name) + public async Task ProvideSchemaByNameAsync(Guid appId, string name) { Guard.NotNullOrEmpty(name, nameof(name)); @@ -72,19 +70,19 @@ namespace Squidex.Read.Schemas.Services.Implementations if (cacheItem == null) { - var schema = await repository.FindSchemaAsync(appId, name); + var entity = await repository.FindSchemaAsync(appId, name); - cacheItem = new CacheItem { Id = schema?.Id, Name = schema?.Name }; + cacheItem = new CacheItem { Entity = entity }; Cache.Set(cacheKey, cacheItem, CacheDuration); - if (cacheItem.Id != null) + if (cacheItem.Entity != null) { - Cache.Set(BuildIdCacheKey(cacheItem.Id.Value), cacheItem, CacheDuration); + Cache.Set(BuildIdCacheKey(cacheItem.Entity.Id), cacheItem, CacheDuration); } } - return cacheItem.Id; + return cacheItem.Entity; } public Task On(Envelope @event) @@ -96,9 +94,9 @@ namespace Squidex.Read.Schemas.Services.Implementations var cacheItem = Cache.Get(cacheKey); - if (cacheItem.Name != null) + if (cacheItem?.Entity != null) { - Cache.Remove(BuildNameCacheKey(@event.Headers.AppId(), cacheItem.Name)); + Cache.Remove(BuildNameCacheKey(@event.Headers.AppId(), cacheItem.Entity.Name)); } Cache.Remove(cacheKey); diff --git a/src/Squidex.Store.MongoDb/Apps/MongoAppRepository.cs b/src/Squidex.Store.MongoDb/Apps/MongoAppRepository.cs index fca8200e8..f96501665 100644 --- a/src/Squidex.Store.MongoDb/Apps/MongoAppRepository.cs +++ b/src/Squidex.Store.MongoDb/Apps/MongoAppRepository.cs @@ -18,11 +18,12 @@ using Squidex.Read.Apps; using Squidex.Read.Apps.Repositories; using Squidex.Store.MongoDb.Utils; using Squidex.Infrastructure; +using Squidex.Infrastructure.CQRS.Replay; using Squidex.Infrastructure.MongoDb; namespace Squidex.Store.MongoDb.Apps { - public class MongoAppRepository : MongoRepositoryBase, IAppRepository, ICatchEventConsumer + public class MongoAppRepository : MongoRepositoryBase, IAppRepository, ICatchEventConsumer, IReplayableStore { public MongoAppRepository(IMongoDatabase database) : base(database) @@ -39,6 +40,11 @@ namespace Squidex.Store.MongoDb.Apps return collection.Indexes.CreateOneAsync(IndexKeys.Ascending(x => x.Name)); } + public Task ClearAsync() + { + return TryDropCollectionAsync(); + } + public async Task> QueryAllAsync(string subjectId) { var entities = diff --git a/src/Squidex.Store.MongoDb/History/MongoHistoryEventEntity.cs b/src/Squidex.Store.MongoDb/History/MongoHistoryEventEntity.cs index 394be544d..63fcec513 100644 --- a/src/Squidex.Store.MongoDb/History/MongoHistoryEventEntity.cs +++ b/src/Squidex.Store.MongoDb/History/MongoHistoryEventEntity.cs @@ -22,6 +22,10 @@ namespace Squidex.Store.MongoDb.History [BsonElement] public Guid AppId { get; set; } + [BsonRequired] + [BsonElement] + public int SessionEventIndex { get; set; } + [BsonRequired] [BsonElement] public string Channel { get; set; } diff --git a/src/Squidex.Store.MongoDb/History/MongoHistoryEventRepository.cs b/src/Squidex.Store.MongoDb/History/MongoHistoryEventRepository.cs index 3f7d650a2..a9a54f2d5 100644 --- a/src/Squidex.Store.MongoDb/History/MongoHistoryEventRepository.cs +++ b/src/Squidex.Store.MongoDb/History/MongoHistoryEventRepository.cs @@ -9,10 +9,12 @@ using System; using System.Collections.Generic; using System.Linq; +using System.Threading; using System.Threading.Tasks; using MongoDB.Driver; using Squidex.Infrastructure.CQRS; using Squidex.Infrastructure.CQRS.Events; +using Squidex.Infrastructure.CQRS.Replay; using Squidex.Infrastructure.MongoDb; using Squidex.Read.History; using Squidex.Read.History.Repositories; @@ -20,10 +22,11 @@ using Squidex.Store.MongoDb.Utils; namespace Squidex.Store.MongoDb.History { - public class MongoHistoryEventRepository : MongoRepositoryBase, IHistoryEventRepository, ICatchEventConsumer + public class MongoHistoryEventRepository : MongoRepositoryBase, IHistoryEventRepository, ICatchEventConsumer, IReplayableStore { private readonly List creators; private readonly Dictionary texts = new Dictionary(); + private int sessionEventCount; public MongoHistoryEventRepository(IMongoDatabase database, IEnumerable creators) : base(database) @@ -47,15 +50,25 @@ namespace Squidex.Store.MongoDb.History protected override Task SetupCollectionAsync(IMongoCollection collection) { return Task.WhenAll( - collection.Indexes.CreateOneAsync(IndexKeys.Ascending(x => x.AppId)), - collection.Indexes.CreateOneAsync(IndexKeys.Ascending(x => x.Channel)), + collection.Indexes.CreateOneAsync( + IndexKeys + .Ascending(x => x.AppId) + .Ascending(x => x.Channel) + .Descending(x => x.Created) + .Descending(x => x.SessionEventIndex)), collection.Indexes.CreateOneAsync(IndexKeys.Ascending(x => x.Created), new CreateIndexOptions { ExpireAfter = TimeSpan.FromDays(365) })); } + public Task ClearAsync() + { + return TryDropCollectionAsync(); + } + public async Task> QueryEventsByChannel(Guid appId, string channelPrefix, int count) { var entities = - await Collection.Find(x => x.AppId == appId && x.Channel.StartsWith(channelPrefix)).SortByDescending(x => x.Created).Limit(count).ToListAsync(); + await Collection.Find(x => x.AppId == appId && x.Channel.StartsWith(channelPrefix)) + .SortByDescending(x => x.Created).ThenByDescending(x => x.SessionEventIndex).Limit(count).ToListAsync(); return entities.Select(x => (IHistoryEventEntity)new ParsedHistoryEvent(x, texts)).ToList(); } @@ -70,6 +83,8 @@ namespace Squidex.Store.MongoDb.History { await Collection.CreateAsync(@event.Headers, x => { + x.SessionEventIndex = Interlocked.Increment(ref sessionEventCount); + x.Channel = message.Channel; x.Message = message.Message; diff --git a/src/Squidex.Store.MongoDb/MongoDbModule.cs b/src/Squidex.Store.MongoDb/MongoDbModule.cs index 84e53291e..4cbaf2561 100644 --- a/src/Squidex.Store.MongoDb/MongoDbModule.cs +++ b/src/Squidex.Store.MongoDb/MongoDbModule.cs @@ -13,6 +13,7 @@ using Microsoft.AspNetCore.Identity.MongoDB; using Microsoft.Extensions.Options; using MongoDB.Driver; using Squidex.Infrastructure.CQRS.Events; +using Squidex.Infrastructure.CQRS.Replay; using Squidex.Infrastructure.MongoDb; using Squidex.Read.Apps.Repositories; using Squidex.Read.History.Repositories; @@ -72,16 +73,19 @@ namespace Squidex.Store.MongoDb builder.RegisterType() .As() .As() + .As() .SingleInstance(); builder.RegisterType() .As() .As() + .As() .SingleInstance(); builder.RegisterType() .As() .As() + .As() .SingleInstance(); } } diff --git a/src/Squidex.Store.MongoDb/Schemas/MongoSchemaRepository.cs b/src/Squidex.Store.MongoDb/Schemas/MongoSchemaRepository.cs index 50a2b7dda..e3ee792b0 100644 --- a/src/Squidex.Store.MongoDb/Schemas/MongoSchemaRepository.cs +++ b/src/Squidex.Store.MongoDb/Schemas/MongoSchemaRepository.cs @@ -14,9 +14,11 @@ using MongoDB.Driver; using Squidex.Core.Schemas; using Squidex.Core.Schemas.Json; using Squidex.Events.Schemas; +using Squidex.Events.Schemas.Utils; using Squidex.Infrastructure; using Squidex.Infrastructure.CQRS; using Squidex.Infrastructure.CQRS.Events; +using Squidex.Infrastructure.CQRS.Replay; using Squidex.Infrastructure.Dispatching; using Squidex.Infrastructure.MongoDb; using Squidex.Infrastructure.Reflection; @@ -25,20 +27,20 @@ using Squidex.Store.MongoDb.Utils; namespace Squidex.Store.MongoDb.Schemas { - public class MongoSchemaRepository : MongoRepositoryBase, ISchemaRepository, ICatchEventConsumer + public class MongoSchemaRepository : MongoRepositoryBase, ISchemaRepository, ICatchEventConsumer, IReplayableStore { private readonly SchemaJsonSerializer serializer; - private readonly FieldRegistry fieldRegistry; + private readonly FieldRegistry registry; - public MongoSchemaRepository(IMongoDatabase database, SchemaJsonSerializer serializer, FieldRegistry fieldRegistry) + public MongoSchemaRepository(IMongoDatabase database, SchemaJsonSerializer serializer, FieldRegistry registry) : base(database) { Guard.NotNull(serializer, nameof(serializer)); - Guard.NotNull(fieldRegistry, nameof(fieldRegistry)); + Guard.NotNull(registry, nameof(registry)); this.serializer = serializer; - this.fieldRegistry = fieldRegistry; + this.registry = registry; } protected override string CollectionName() @@ -51,6 +53,11 @@ namespace Squidex.Store.MongoDb.Schemas return collection.Indexes.CreateOneAsync(IndexKeys.Ascending(x => x.Name)); } + public Task ClearAsync() + { + return TryDropCollectionAsync(); + } + public async Task> QueryAllAsync(Guid appId) { var entities = await Collection.Find(s => s.AppId == appId && !s.IsDeleted).ToListAsync(); @@ -96,54 +103,52 @@ namespace Squidex.Store.MongoDb.Schemas protected Task On(FieldDeleted @event, EnvelopeHeaders headers) { - return UpdateSchema(headers, s => s.DeleteField(@event.FieldId)); + return UpdateSchema(headers, s => SchemaEventDispatcher.Dispatch(@event, s)); } protected Task On(FieldDisabled @event, EnvelopeHeaders headers) { - return UpdateSchema(headers, s => s.DisableField(@event.FieldId)); + return UpdateSchema(headers, s => SchemaEventDispatcher.Dispatch(@event, s)); } protected Task On(FieldEnabled @event, EnvelopeHeaders headers) { - return UpdateSchema(headers, s => s.EnableField(@event.FieldId)); + return UpdateSchema(headers, s => SchemaEventDispatcher.Dispatch(@event, s)); } protected Task On(FieldHidden @event, EnvelopeHeaders headers) { - return UpdateSchema(headers, s => s.HideField(@event.FieldId)); + return UpdateSchema(headers, s => SchemaEventDispatcher.Dispatch(@event, s)); } protected Task On(FieldShown @event, EnvelopeHeaders headers) { - return UpdateSchema(headers, s => s.ShowField(@event.FieldId)); + return UpdateSchema(headers, s => SchemaEventDispatcher.Dispatch(@event, s)); } protected Task On(FieldUpdated @event, EnvelopeHeaders headers) { - return UpdateSchema(headers, s => s.UpdateField(@event.FieldId, @event.Properties)); + return UpdateSchema(headers, s => SchemaEventDispatcher.Dispatch(@event, s)); } protected Task On(SchemaUpdated @event, EnvelopeHeaders headers) { - return UpdateSchema(headers, s => s.Update(@event.Properties)); + return UpdateSchema(headers, s => SchemaEventDispatcher.Dispatch(@event, s)); } protected Task On(SchemaPublished @event, EnvelopeHeaders headers) { - return UpdateSchema(headers, s => s.Publish()); + return UpdateSchema(headers, s => SchemaEventDispatcher.Dispatch(@event, s)); } protected Task On(SchemaUnpublished @event, EnvelopeHeaders headers) { - return UpdateSchema(headers, s => s.Unpublish()); + return UpdateSchema(headers, s => SchemaEventDispatcher.Dispatch(@event, s)); } protected Task On(FieldAdded @event, EnvelopeHeaders headers) { - var field = fieldRegistry.CreateField(@event.FieldId, @event.Name, @event.Properties); - - return UpdateSchema(headers, s => s.AddOrUpdateField(field)); + return UpdateSchema(headers, s => SchemaEventDispatcher.Dispatch(@event, s, registry)); } protected Task On(SchemaCreated @event, EnvelopeHeaders headers) diff --git a/src/Squidex.Write/Schemas/SchemaCommandHandler.cs b/src/Squidex.Write/Schemas/SchemaCommandHandler.cs index 7ed4cb9fb..f11efe6c0 100644 --- a/src/Squidex.Write/Schemas/SchemaCommandHandler.cs +++ b/src/Squidex.Write/Schemas/SchemaCommandHandler.cs @@ -32,7 +32,7 @@ namespace Squidex.Write.Schemas protected async Task On(CreateSchema command, CommandContext context) { - if (await schemas.FindSchemaIdByNameAsync(command.AppId, command.Name) != null) + if (await schemas.ProvideSchemaByNameAsync(command.AppId, command.Name) != null) { var error = new ValidationError($"A schema with name '{command.Name}' already exists", "DisplayName", diff --git a/src/Squidex.Write/Schemas/SchemaDomainObject.cs b/src/Squidex.Write/Schemas/SchemaDomainObject.cs index a82cd64d3..9974d6c45 100644 --- a/src/Squidex.Write/Schemas/SchemaDomainObject.cs +++ b/src/Squidex.Write/Schemas/SchemaDomainObject.cs @@ -9,6 +9,7 @@ using System; using Squidex.Core.Schemas; using Squidex.Events.Schemas; +using Squidex.Events.Schemas.Utils; using Squidex.Infrastructure; using Squidex.Infrastructure.CQRS; using Squidex.Infrastructure.CQRS.Events; @@ -47,57 +48,57 @@ namespace Squidex.Write.Schemas { totalFields++; - schema = schema.AddOrUpdateField(registry.CreateField(@event.FieldId, @event.Name, @event.Properties)); + schema = SchemaEventDispatcher.Dispatch(@event, schema, registry); } protected void On(SchemaCreated @event) { - schema = Schema.Create(@event.Name, @event.Properties); + schema = SchemaEventDispatcher.Dispatch(@event); } protected void On(FieldUpdated @event) { - schema = schema.UpdateField(@event.FieldId, @event.Properties); + schema = SchemaEventDispatcher.Dispatch(@event, schema); } protected void On(FieldHidden @event) { - schema = schema.HideField(@event.FieldId); + schema = SchemaEventDispatcher.Dispatch(@event, schema); } protected void On(FieldShown @event) { - schema = schema.ShowField(@event.FieldId); + schema = SchemaEventDispatcher.Dispatch(@event, schema); } protected void On(FieldDisabled @event) { - schema = schema.DisableField(@event.FieldId); + schema = SchemaEventDispatcher.Dispatch(@event, schema); } protected void On(FieldEnabled @event) { - schema = schema.EnableField(@event.FieldId); + schema = SchemaEventDispatcher.Dispatch(@event, schema); } protected void On(SchemaUpdated @event) { - schema = schema.Update(@event.Properties); + schema = SchemaEventDispatcher.Dispatch(@event, schema); } protected void On(FieldDeleted @event) { - schema = schema.DeleteField(@event.FieldId); + schema = SchemaEventDispatcher.Dispatch(@event, schema); } protected void On(SchemaPublished @event) { - schema = schema.Publish(); + schema = SchemaEventDispatcher.Dispatch(@event, schema); } protected void On(SchemaUnpublished @event) { - schema = schema.Unpublish(); + schema = SchemaEventDispatcher.Dispatch(@event, schema); } protected void On(SchemaDeleted @event) diff --git a/src/Squidex/Config/Domain/InfrastructureModule.cs b/src/Squidex/Config/Domain/InfrastructureModule.cs index e0b47fe76..5a06aa583 100644 --- a/src/Squidex/Config/Domain/InfrastructureModule.cs +++ b/src/Squidex/Config/Domain/InfrastructureModule.cs @@ -14,6 +14,7 @@ using Squidex.Core.Schemas.Json; using Squidex.Infrastructure.CQRS.Autofac; using Squidex.Infrastructure.CQRS.Commands; using Squidex.Infrastructure.CQRS.Events; +using Squidex.Infrastructure.CQRS.Replay; namespace Squidex.Config.Domain { @@ -45,7 +46,11 @@ namespace Squidex.Config.Domain .As() .SingleInstance(); - builder.RegisterType() + builder.RegisterType() + .AsSelf() + .SingleInstance(); + + builder.RegisterType() .AsSelf() .SingleInstance(); diff --git a/src/Squidex/Config/Domain/ReadModule.cs b/src/Squidex/Config/Domain/ReadModule.cs index 54845d408..0b205dedb 100644 --- a/src/Squidex/Config/Domain/ReadModule.cs +++ b/src/Squidex/Config/Domain/ReadModule.cs @@ -8,8 +8,11 @@ using Autofac; using Squidex.Infrastructure.CQRS.Events; +using Squidex.Read.Apps; using Squidex.Read.Apps.Services; using Squidex.Read.Apps.Services.Implementations; +using Squidex.Read.History; +using Squidex.Read.Schemas; using Squidex.Read.Schemas.Services; using Squidex.Read.Schemas.Services.Implementations; @@ -21,12 +24,20 @@ namespace Squidex.Config.Domain { builder.RegisterType() .As() - .As() + .As() .SingleInstance(); builder.RegisterType() .As() - .As() + .As() + .SingleInstance(); + + builder.RegisterType() + .As() + .SingleInstance(); + + builder.RegisterType() + .As() .SingleInstance(); } } diff --git a/src/Squidex/Config/EventStore/EventStoreUsage.cs b/src/Squidex/Config/EventStore/EventStoreUsage.cs index 94a8f6aa2..65387a7f3 100644 --- a/src/Squidex/Config/EventStore/EventStoreUsage.cs +++ b/src/Squidex/Config/EventStore/EventStoreUsage.cs @@ -16,7 +16,7 @@ namespace Squidex.Config.EventStore { public static IApplicationBuilder UseMyEventStore(this IApplicationBuilder app) { - app.ApplicationServices.GetService().Subscribe(); + app.ApplicationServices.GetService().Subscribe(); return app; } diff --git a/src/Squidex/Pipeline/CommandHandlers/EnrichWithSchemaAggregateIdHandler.cs b/src/Squidex/Pipeline/CommandHandlers/EnrichWithSchemaAggregateIdHandler.cs index b30c83ed8..706b18a8b 100644 --- a/src/Squidex/Pipeline/CommandHandlers/EnrichWithSchemaAggregateIdHandler.cs +++ b/src/Squidex/Pipeline/CommandHandlers/EnrichWithSchemaAggregateIdHandler.cs @@ -53,14 +53,14 @@ namespace Squidex.Pipeline.CommandHandlers { var schemaName = routeValues["name"].ToString(); - var id = await schemaProvider.FindSchemaIdByNameAsync(appCommand.AppId, schemaName); + var schema = await schemaProvider.ProvideSchemaByNameAsync(appCommand.AppId, schemaName); - if (!id.HasValue) + if (schema == null) { throw new DomainObjectNotFoundException(schemaName, typeof(SchemaDomainObject)); } - aggregateCommand.AggregateId = id.Value; + aggregateCommand.AggregateId = schema.Id; } return false; diff --git a/src/Squidex/Program.cs b/src/Squidex/Program.cs index 9ecc53c0d..989c4d65a 100644 --- a/src/Squidex/Program.cs +++ b/src/Squidex/Program.cs @@ -8,6 +8,8 @@ using System.IO; using Microsoft.AspNetCore.Hosting; +using Microsoft.Extensions.DependencyInjection; +using Squidex.Infrastructure.CQRS.Replay; namespace Squidex { @@ -22,7 +24,14 @@ namespace Squidex .UseStartup() .Build(); - host.Run(); + if (args.Length == 1 && args[0] == "--replay") + { + host.Services.GetService().ReplayAllAsync().Wait(); + } + else + { + host.Run(); + } } } } diff --git a/src/Squidex/app-config/clean-css-loader.js b/src/Squidex/app-config/clean-css-loader.js index 0b6d8af16..c682f05c6 100644 --- a/src/Squidex/app-config/clean-css-loader.js +++ b/src/Squidex/app-config/clean-css-loader.js @@ -9,14 +9,14 @@ function cleanCss(css) { new CleanCSS().minify(css, function (err, minified) { if (err) { - if (Array.isArray(err) && (err[0] != null)) { + if (Array.isArray(err) && (err[0] !== null)) { return callback(err[0]); } else { return callback(err); } } var warnings; - if (((warnings = minified.warnings) != null ? warnings.length : void 0) !== 0) { + if (((warnings = minified.warnings) !== null ? warnings.length : void 0) !== 0) { warnings.forEach(function (msg) { loader.emitWarning(msg.toString()); }); diff --git a/src/Squidex/app/shared/services/users-provider.service.ts b/src/Squidex/app/shared/services/users-provider.service.ts index 259ecb41b..f74b4ab0c 100644 --- a/src/Squidex/app/shared/services/users-provider.service.ts +++ b/src/Squidex/app/shared/services/users-provider.service.ts @@ -31,12 +31,6 @@ export class UsersProviderService { .catch(err => { return Observable.of(new UserDto('NOT FOUND', 'NOT FOUND', 'NOT FOUND', '')); }) - .map(dto => { - if (this.authService.user && dto.id === this.authService.user.id) { - dto = new UserDto(dto.id, dto.email, me, dto.pictureUrl); - } - return dto; - }) .publishLast(); request.connect(); @@ -44,6 +38,12 @@ export class UsersProviderService { result = this.caches[id] = request; } - return result; + return result + .map(dto => { + if (this.authService.user && dto.id === this.authService.user.id) { + dto = new UserDto(dto.id, dto.email, me, dto.pictureUrl); + } + return dto; + }); } } \ No newline at end of file diff --git a/tests/Squidex.Write.Tests/Schemas/SchemaCommandHandlerTests.cs b/tests/Squidex.Write.Tests/Schemas/SchemaCommandHandlerTests.cs index 51ad4c920..c84fc601d 100644 --- a/tests/Squidex.Write.Tests/Schemas/SchemaCommandHandlerTests.cs +++ b/tests/Squidex.Write.Tests/Schemas/SchemaCommandHandlerTests.cs @@ -13,6 +13,7 @@ using Moq; using Squidex.Core.Schemas; using Squidex.Infrastructure; using Squidex.Infrastructure.CQRS.Commands; +using Squidex.Read.Schemas.Repositories; using Squidex.Read.Schemas.Services; using Squidex.Write.Schemas.Commands; using Squidex.Write.Utils; @@ -50,7 +51,7 @@ namespace Squidex.Write.Schemas var command = new CreateSchema { Name = schemaName, AppId = appId, AggregateId = Id }; var context = new CommandContext(command); - schemaProvider.Setup(x => x.FindSchemaIdByNameAsync(appId, schemaName)).Returns(Task.FromResult(Guid.NewGuid())).Verifiable(); + schemaProvider.Setup(x => x.ProvideSchemaByNameAsync(appId, schemaName)).Returns(Task.FromResult(new Mock().Object)).Verifiable(); await TestCreate(schema, async _ => { @@ -66,7 +67,7 @@ namespace Squidex.Write.Schemas var command = new CreateSchema { Name = schemaName, AppId = appId, AggregateId = Id }; var context = new CommandContext(command); - schemaProvider.Setup(x => x.FindSchemaIdByNameAsync(Id, schemaName)).Returns(Task.FromResult(null)).Verifiable(); + schemaProvider.Setup(x => x.ProvideSchemaByNameAsync(Id, schemaName)).Returns(Task.FromResult(null)).Verifiable(); await TestCreate(schema, async _ => {