Browse Source

Replay

pull/1/head
Sebastian 10 years ago
parent
commit
3d233e72bf
  1. 42
      src/Squidex.Core/Schemas/Json/SchemaJsonSerializer.cs
  2. 70
      src/Squidex.Events/Schemas/Utils/SchemaEventDispatcher.cs
  3. 36
      src/Squidex.Infrastructure.MongoDb/MongoRepositoryBase.cs
  4. 6
      src/Squidex.Infrastructure.RabbitMq/RabbitMqEventChannel.cs
  5. 2
      src/Squidex.Infrastructure/CQRS/Commands/DefaultDomainObjectRepository.cs
  6. 7
      src/Squidex.Infrastructure/CQRS/Commands/LogExceptionHandler.cs
  7. 5
      src/Squidex.Infrastructure/CQRS/Commands/LogExecutingHandler.cs
  8. 14
      src/Squidex.Infrastructure/CQRS/Events/EventReceiver.cs
  9. 2
      src/Squidex.Infrastructure/CQRS/Events/IEventPublisher.cs
  10. 17
      src/Squidex.Infrastructure/CQRS/Replay/IReplayableStore.cs
  11. 100
      src/Squidex.Infrastructure/CQRS/Replay/ReplayGenerator.cs
  12. 6
      src/Squidex.Infrastructure/InfrastructureErrors.cs
  13. 2
      src/Squidex.Read/Apps/Services/Implementations/CachingAppProvider.cs
  14. 7
      src/Squidex.Read/Schemas/SchemaHistoryEventsCreator.cs
  15. 5
      src/Squidex.Read/Schemas/Services/ISchemaProvider.cs
  16. 34
      src/Squidex.Read/Schemas/Services/Implementations/CachingSchemaProvider.cs
  17. 8
      src/Squidex.Store.MongoDb/Apps/MongoAppRepository.cs
  18. 4
      src/Squidex.Store.MongoDb/History/MongoHistoryEventEntity.cs
  19. 23
      src/Squidex.Store.MongoDb/History/MongoHistoryEventRepository.cs
  20. 4
      src/Squidex.Store.MongoDb/MongoDbModule.cs
  21. 39
      src/Squidex.Store.MongoDb/Schemas/MongoSchemaRepository.cs
  22. 2
      src/Squidex.Write/Schemas/SchemaCommandHandler.cs
  23. 23
      src/Squidex.Write/Schemas/SchemaDomainObject.cs
  24. 7
      src/Squidex/Config/Domain/InfrastructureModule.cs
  25. 15
      src/Squidex/Config/Domain/ReadModule.cs
  26. 2
      src/Squidex/Config/EventStore/EventStoreUsage.cs
  27. 6
      src/Squidex/Pipeline/CommandHandlers/EnrichWithSchemaAggregateIdHandler.cs
  28. 11
      src/Squidex/Program.cs
  29. 4
      src/Squidex/app-config/clean-css-loader.js
  30. 14
      src/Squidex/app/shared/services/users-provider.service.ts
  31. 5
      tests/Squidex.Write.Tests/Schemas/SchemaCommandHandlerTests.cs

42
src/Squidex.Core/Schemas/Json/SchemaJsonSerializer.cs

@ -7,6 +7,7 @@
// ==========================================================================
using System.Collections.Generic;
using System.Collections.Immutable;
using System.Linq;
using Newtonsoft.Json;
using Newtonsoft.Json.Linq;
@ -76,31 +77,32 @@ namespace Squidex.Core.Schemas.Json
{
var model = token.ToObject<SchemaModel>(serializer);
var schema = Schema.Create(model.Name, model.Properties);
if (model.IsPublished)
{
schema = schema.Publish();
}
var fields =
model.Fields.Select(kvp =>
{
var fieldModel = kvp.Value;
foreach (var kvp in model.Fields)
{
var fieldModel = kvp.Value;
var field = fieldRegistry.CreateField(kvp.Key, fieldModel.Name, fieldModel.Properties);
var field = fieldRegistry.CreateField(kvp.Key, fieldModel.Name, fieldModel.Properties);
if (fieldModel.IsDisabled)
{
field = field.Disable();
}
if (fieldModel.IsDisabled)
{
field = field.Disable();
}
if (fieldModel.IsHidden)
{
field = field.Hide();
}
if (fieldModel.IsHidden)
{
field = field.Hide();
}
return field;
}).ToDictionary(x => x.Id, x => x).ToImmutableDictionary();
schema = schema.AddOrUpdateField(field);
}
var schema =
new Schema(
model.Name,
model.Properties,
model.IsPublished,
fields);
return schema;
}

70
src/Squidex.Events/Schemas/Utils/SchemaEventDispatcher.cs

@ -0,0 +1,70 @@
// ==========================================================================
// SchemaUpdater.cs
// Squidex Headless CMS
// ==========================================================================
// Copyright (c) Squidex Group
// All rights reserved.
// ==========================================================================
using Squidex.Core.Schemas;
namespace Squidex.Events.Schemas.Utils
{
public class SchemaEventDispatcher
{
public static Schema Dispatch(SchemaCreated @event)
{
return Schema.Create(@event.Name, @event.Properties);
}
public static Schema Dispatch(FieldAdded @event, Schema schema, FieldRegistry registry)
{
return schema.AddOrUpdateField(registry.CreateField(@event.FieldId, @event.Name, @event.Properties));
}
public static Schema Dispatch(FieldUpdated @event, Schema schema)
{
return schema.UpdateField(@event.FieldId, @event.Properties);
}
public static Schema Dispatch(FieldHidden @event, Schema schema)
{
return schema.HideField(@event.FieldId);
}
public static Schema Dispatch(FieldShown @event, Schema schema)
{
return schema.ShowField(@event.FieldId);
}
public static Schema Dispatch(FieldDisabled @event, Schema schema)
{
return schema.DisableField(@event.FieldId);
}
public static Schema Dispatch(FieldEnabled @event, Schema schema)
{
return schema.EnableField(@event.FieldId);
}
public static Schema Dispatch(SchemaUpdated @event, Schema schema)
{
return schema.Update(@event.Properties);
}
public static Schema Dispatch(FieldDeleted @event, Schema schema)
{
return schema.DeleteField(@event.FieldId);
}
public static Schema Dispatch(SchemaPublished @event, Schema schema)
{
return schema.Publish();
}
public static Schema Dispatch(SchemaUnpublished @event, Schema schema)
{
return schema.Unpublish();
}
}
}

36
src/Squidex.Infrastructure.MongoDb/MongoRepositoryBase.cs

@ -6,6 +6,7 @@
// All rights reserved.
// ==========================================================================
using System;
using System.Globalization;
using System.Threading.Tasks;
using MongoDB.Driver;
@ -15,7 +16,7 @@ namespace Squidex.Infrastructure.MongoDb
public abstract class MongoRepositoryBase<TEntity>
{
private const string CollectionFormat = "{0}Set";
private readonly IMongoCollection<TEntity> mongoCollection;
private Lazy<IMongoCollection<TEntity>> mongoCollection;
private readonly IMongoDatabase mongoDatabase;
private readonly string typeName;
@ -71,7 +72,7 @@ namespace Squidex.Infrastructure.MongoDb
{
get
{
return mongoCollection;
return mongoCollection.Value;
}
}
@ -103,20 +104,39 @@ namespace Squidex.Infrastructure.MongoDb
return string.Format(CultureInfo.InvariantCulture, CollectionFormat, typeof(TEntity).Name);
}
private IMongoCollection<TEntity> CreateCollection()
private Lazy<IMongoCollection<TEntity>> CreateCollection()
{
var databaseCollection = mongoDatabase.GetCollection<TEntity>(
CollectionName(),
CollectionSettings() ?? new MongoCollectionSettings());
return new Lazy<IMongoCollection<TEntity>>(() =>
{
var databaseCollection = mongoDatabase.GetCollection<TEntity>(
CollectionName(),
CollectionSettings() ?? new MongoCollectionSettings());
SetupCollectionAsync(databaseCollection).Wait();
SetupCollectionAsync(databaseCollection).Wait();
return databaseCollection;
return databaseCollection;
});
}
protected virtual Task SetupCollectionAsync(IMongoCollection<TEntity> collection)
{
return Task.FromResult(true);
}
public async Task<bool> TryDropCollectionAsync()
{
try
{
await mongoDatabase.DropCollectionAsync(CollectionName());
mongoCollection = CreateCollection();
return true;
}
catch
{
return false;
}
}
}
}

6
src/Squidex.Infrastructure.RabbitMq/RabbitMqEventChannel.cs

@ -35,13 +35,13 @@ namespace Squidex.Infrastructure.RabbitMq
}
}
public void Publish(EventData events)
public void Publish(EventData eventData)
{
ThrowIfDisposed();
var channel = currentChannel.Value;
channel.BasicPublish(Exchange, string.Empty, null, Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(events)));
channel.BasicPublish(Exchange, string.Empty, null, Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(eventData)));
}
public void Connect(string queueName, Action<EventData> received)
@ -64,7 +64,7 @@ namespace Squidex.Infrastructure.RabbitMq
received(eventData);
};
channel.BasicConsume(queueName, false, consumer);
channel.BasicConsume(queueName, true, consumer);
}
private static IModel Connect(IConnectionFactory connectionFactory)

2
src/Squidex.Infrastructure/CQRS/Commands/DefaultDomainObjectRepository.cs

@ -89,7 +89,7 @@ namespace Squidex.Infrastructure.CQRS.Commands
{
await eventStore.AppendEventsAsync(commitId, streamName, versionExpected, eventsToSave);
}
catch (WrongEventVersionException e)
catch (WrongEventVersionException)
{
throw new DomainObjectVersionException(domainObject.Id.ToString(), domainObject.GetType(), versionCurrent, versionExpected);
}

7
src/Squidex/Pipeline/CommandHandlers/LogExceptionHandler.cs → src/Squidex.Infrastructure/CQRS/Commands/LogExceptionHandler.cs

@ -8,11 +8,10 @@
using System.Threading.Tasks;
using Microsoft.Extensions.Logging;
using Squidex.Infrastructure.CQRS.Commands;
// ReSharper disable InvertIf
namespace Squidex.Pipeline.CommandHandlers
namespace Squidex.Infrastructure.CQRS.Commands
{
public sealed class LogExceptionHandler : ICommandHandler
{
@ -29,9 +28,7 @@ namespace Squidex.Pipeline.CommandHandlers
if (exception != null)
{
var eventId = new EventId(9999, "CommandFailed");
logger.LogError(eventId, exception, "Handling {0} command failed", context.Command);
logger.LogError(InfrastructureErrors.CommandFailed, exception, "Handling {0} command failed", context.Command);
}
return Task.FromResult(false);

5
src/Squidex/Pipeline/CommandHandlers/LogExecutingHandler.cs → src/Squidex.Infrastructure/CQRS/Commands/LogExecutingHandler.cs

@ -8,9 +8,8 @@
using System.Threading.Tasks;
using Microsoft.Extensions.Logging;
using Squidex.Infrastructure.CQRS.Commands;
namespace Squidex.Pipeline.CommandHandlers
namespace Squidex.Infrastructure.CQRS.Commands
{
public sealed class LogExecutingHandler : ICommandHandler
{
@ -23,7 +22,7 @@ namespace Squidex.Pipeline.CommandHandlers
public Task<bool> HandleAsync(CommandContext context)
{
logger.LogError("Handling {0} command", context.Command);
logger.LogInformation("Handling {0} command", context.Command);
return Task.FromResult(false);
}

14
src/Squidex.Infrastructure/CQRS/Events/EventBus.cs → src/Squidex.Infrastructure/CQRS/Events/EventReceiver.cs

@ -1,5 +1,5 @@
// ==========================================================================
// EventBus.cs
// EventReceiver.cs
// Squidex Headless CMS
// ==========================================================================
// Copyright (c) Squidex Group
@ -18,17 +18,17 @@ using NodaTime;
namespace Squidex.Infrastructure.CQRS.Events
{
public sealed class EventBus
public sealed class EventReceiver
{
private readonly EventDataFormatter formatter;
private readonly IEnumerable<ILiveEventConsumer> liveConsumers;
private readonly IEnumerable<ICatchEventConsumer> catchConsumers;
private readonly IEventStream eventStream;
private readonly ILogger<EventBus> logger;
private readonly ILogger<EventReceiver> logger;
private bool isSubscribed;
public EventBus(
ILogger<EventBus> logger,
public EventReceiver(
ILogger<EventReceiver> logger,
IEventStream eventStream,
IEnumerable<ILiveEventConsumer> liveConsumers,
IEnumerable<ICatchEventConsumer> catchConsumers,
@ -69,11 +69,11 @@ namespace Squidex.Infrastructure.CQRS.Events
if (isLive)
{
DispatchConsumers(liveConsumers.OfType<IEventConsumer>().Union(catchConsumers), @event);
DispatchConsumers(catchConsumers.OfType<IEventConsumer>().Union(liveConsumers), @event);
}
else
{
DispatchConsumers(liveConsumers, @event);
DispatchConsumers(catchConsumers, @event);
}
});

2
src/Squidex.Infrastructure/CQRS/Events/IEventPublisher.cs

@ -10,6 +10,6 @@ namespace Squidex.Infrastructure.CQRS.Events
{
public interface IEventPublisher
{
void Publish(EventData events);
void Publish(EventData eventData);
}
}

17
src/Squidex.Infrastructure/CQRS/Replay/IReplayableStore.cs

@ -0,0 +1,17 @@
// ==========================================================================
// IReplayableStore.cs
// Squidex Headless CMS
// ==========================================================================
// Copyright (c) Squidex Group
// All rights reserved.
// ==========================================================================
using System.Threading.Tasks;
namespace Squidex.Infrastructure.CQRS.Replay
{
public interface IReplayableStore
{
Task ClearAsync();
}
}

100
src/Squidex.Infrastructure/CQRS/Replay/ReplayGenerator.cs

@ -0,0 +1,100 @@
// ==========================================================================
// ReplayGenerator.cs
// Squidex Headless CMS
// ==========================================================================
// Copyright (c) Squidex Group
// All rights reserved.
// ==========================================================================
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Threading.Tasks;
using Microsoft.Extensions.Logging;
using Squidex.Infrastructure.CQRS.Events;
namespace Squidex.Infrastructure.CQRS.Replay
{
public sealed class ReplayGenerator
{
private readonly ILogger<ReplayGenerator> logger;
private readonly IEventStore eventStore;
private readonly IEventPublisher eventPublisher;
private readonly IEnumerable<IReplayableStore> stores;
public ReplayGenerator(
ILogger<ReplayGenerator> logger,
IEventStore eventStore,
IEventPublisher eventPublisher,
IEnumerable<IReplayableStore> stores)
{
Guard.NotNull(logger, nameof(logger));
Guard.NotNull(eventStore, nameof(eventStore));
Guard.NotNull(eventPublisher, nameof(eventPublisher));
Guard.NotNull(stores, nameof(stores));
this.stores = stores;
this.logger = logger;
this.eventStore = eventStore;
this.eventPublisher = eventPublisher;
}
public async Task ReplayAllAsync()
{
logger.LogDebug("Starting to replay all events");
if (!await ClearAsync())
{
return;
}
await ReplayEventsAsync();
logger.LogDebug("Finished to replay all events");
}
private async Task ReplayEventsAsync()
{
try
{
logger.LogDebug("Replaying all messages");
await eventStore.GetEventsAsync().ForEachAsync(eventData =>
{
eventPublisher.Publish(eventData);
});
logger.LogDebug("Replayed all messages");
}
catch (Exception e)
{
logger.LogCritical(InfrastructureErrors.ReplayPublishingFailed, e, "Failed to publish events to {0}", eventPublisher);
}
}
private async Task<bool> ClearAsync()
{
logger.LogDebug("Clearing replayable stores");
foreach (var store in stores)
{
try
{
await store.ClearAsync();
logger.LogDebug("Cleared store {0}", store);
}
catch (Exception e)
{
logger.LogCritical(InfrastructureErrors.ReplayClearingFailed, e, "Failed to clear store {0}", store);
return false;
}
}
logger.LogDebug("Cleared replayable stores");
return true;
}
}
}

6
src/Squidex.Infrastructure/InfrastructureErrors.cs

@ -12,8 +12,14 @@ namespace Squidex.Infrastructure
{
public class InfrastructureErrors
{
public static readonly EventId CommandFailed = new EventId(20001, "CommandFailed");
public static readonly EventId EventHandlingFailed = new EventId(10001, "EventHandlingFailed");
public static readonly EventId EventDeserializationFailed = new EventId(10002, "EventDeserializationFailed");
public static readonly EventId ReplayClearingFailed = new EventId(30001, "ReplayClearingFailed");
public static readonly EventId ReplayPublishingFailed = new EventId(30003, "ReplayPublishingFailed");
}
}

2
src/Squidex.Read/Apps/Services/Implementations/CachingAppProvider.cs

@ -20,7 +20,7 @@ using Squidex.Read.Utils;
namespace Squidex.Read.Apps.Services.Implementations
{
public class CachingAppProvider : CachingProvider, IAppProvider, ILiveEventConsumer
public class CachingAppProvider : CachingProvider, IAppProvider, ICatchEventConsumer
{
private static readonly TimeSpan CacheDuration = TimeSpan.FromMinutes(30);
private readonly IAppRepository appRepository;

7
src/Squidex.Read/Schemas/SchemaHistoryEventsCreator.cs

@ -8,7 +8,6 @@
using System.Collections.Generic;
using System.Threading.Tasks;
using Squidex.Events;
using Squidex.Events.Schemas;
using Squidex.Infrastructure;
using Squidex.Infrastructure.CQRS;
@ -103,11 +102,11 @@ namespace Squidex.Read.Schemas
return this.DispatchFuncAsync(@event.Payload, @event.Headers, (HistoryEventToStore)null);
}
private Task<string> FindSchemaNameAsync(EnvelopeHeaders headers)
private async Task<string> FindSchemaNameAsync(EnvelopeHeaders headers)
{
var name = schemaProvider.FindSchemaNameByIdAsync(headers.AppId(), headers.AggregateId());
var schema = await schemaProvider.ProviderSchemaByIdAsync(headers.AggregateId());
return name;
return schema.Label ?? schema.Name;
}
}
}

5
src/Squidex.Read/Schemas/Services/ISchemaProvider.cs

@ -8,13 +8,14 @@
using System;
using System.Threading.Tasks;
using Squidex.Read.Schemas.Repositories;
namespace Squidex.Read.Schemas.Services
{
public interface ISchemaProvider
{
Task<string> FindSchemaNameByIdAsync(Guid schemaId);
Task<ISchemaEntityWithSchema> ProviderSchemaByIdAsync(Guid schemaId);
Task<Guid?> FindSchemaIdByNameAsync(Guid appId, string name);
Task<ISchemaEntityWithSchema> ProvideSchemaByNameAsync(Guid appId, string name);
}
}

34
src/Squidex.Read/Schemas/Services/Implementations/CachingSchemaProvider.cs

@ -21,16 +21,14 @@ using Squidex.Events;
namespace Squidex.Read.Schemas.Services.Implementations
{
public class CachingSchemaProvider : CachingProvider, ISchemaProvider, ILiveEventConsumer
public class CachingSchemaProvider : CachingProvider, ISchemaProvider, ICatchEventConsumer
{
private static readonly TimeSpan CacheDuration = TimeSpan.FromMinutes(10);
private readonly ISchemaRepository repository;
private sealed class CacheItem
{
public Guid? Id;
public string Name;
public ISchemaEntityWithSchema Entity;
}
public CachingSchemaProvider(IMemoryCache cache, ISchemaRepository repository)
@ -41,29 +39,29 @@ namespace Squidex.Read.Schemas.Services.Implementations
this.repository = repository;
}
public async Task<string> FindSchemaNameByIdAsync(Guid schemaId)
public async Task<ISchemaEntityWithSchema> ProviderSchemaByIdAsync(Guid schemaId)
{
var cacheKey = BuildIdCacheKey(schemaId);
var cacheItem = Cache.Get<CacheItem>(cacheKey);
if (cacheItem == null)
{
var schema = await repository.FindSchemaAsync(schemaId);
var entity = await repository.FindSchemaAsync(schemaId);
cacheItem = new CacheItem { Id = schema?.Id, Name = schema?.Name };
cacheItem = new CacheItem { Entity = entity };
Cache.Set(cacheKey, cacheItem, CacheDuration);
if (cacheItem.Id != null)
if (cacheItem.Entity != null)
{
Cache.Set(BuildIdCacheKey(cacheItem.Id.Value), cacheItem, CacheDuration);
Cache.Set(BuildIdCacheKey(cacheItem.Entity.Id), cacheItem, CacheDuration);
}
}
return cacheItem.Name;
return cacheItem.Entity;
}
public async Task<Guid?> FindSchemaIdByNameAsync(Guid appId, string name)
public async Task<ISchemaEntityWithSchema> ProvideSchemaByNameAsync(Guid appId, string name)
{
Guard.NotNullOrEmpty(name, nameof(name));
@ -72,19 +70,19 @@ namespace Squidex.Read.Schemas.Services.Implementations
if (cacheItem == null)
{
var schema = await repository.FindSchemaAsync(appId, name);
var entity = await repository.FindSchemaAsync(appId, name);
cacheItem = new CacheItem { Id = schema?.Id, Name = schema?.Name };
cacheItem = new CacheItem { Entity = entity };
Cache.Set(cacheKey, cacheItem, CacheDuration);
if (cacheItem.Id != null)
if (cacheItem.Entity != null)
{
Cache.Set(BuildIdCacheKey(cacheItem.Id.Value), cacheItem, CacheDuration);
Cache.Set(BuildIdCacheKey(cacheItem.Entity.Id), cacheItem, CacheDuration);
}
}
return cacheItem.Id;
return cacheItem.Entity;
}
public Task On(Envelope<IEvent> @event)
@ -96,9 +94,9 @@ namespace Squidex.Read.Schemas.Services.Implementations
var cacheItem = Cache.Get<CacheItem>(cacheKey);
if (cacheItem.Name != null)
if (cacheItem?.Entity != null)
{
Cache.Remove(BuildNameCacheKey(@event.Headers.AppId(), cacheItem.Name));
Cache.Remove(BuildNameCacheKey(@event.Headers.AppId(), cacheItem.Entity.Name));
}
Cache.Remove(cacheKey);

8
src/Squidex.Store.MongoDb/Apps/MongoAppRepository.cs

@ -18,11 +18,12 @@ using Squidex.Read.Apps;
using Squidex.Read.Apps.Repositories;
using Squidex.Store.MongoDb.Utils;
using Squidex.Infrastructure;
using Squidex.Infrastructure.CQRS.Replay;
using Squidex.Infrastructure.MongoDb;
namespace Squidex.Store.MongoDb.Apps
{
public class MongoAppRepository : MongoRepositoryBase<MongoAppEntity>, IAppRepository, ICatchEventConsumer
public class MongoAppRepository : MongoRepositoryBase<MongoAppEntity>, IAppRepository, ICatchEventConsumer, IReplayableStore
{
public MongoAppRepository(IMongoDatabase database)
: base(database)
@ -39,6 +40,11 @@ namespace Squidex.Store.MongoDb.Apps
return collection.Indexes.CreateOneAsync(IndexKeys.Ascending(x => x.Name));
}
public Task ClearAsync()
{
return TryDropCollectionAsync();
}
public async Task<IReadOnlyList<IAppEntity>> QueryAllAsync(string subjectId)
{
var entities =

4
src/Squidex.Store.MongoDb/History/MongoHistoryEventEntity.cs

@ -22,6 +22,10 @@ namespace Squidex.Store.MongoDb.History
[BsonElement]
public Guid AppId { get; set; }
[BsonRequired]
[BsonElement]
public int SessionEventIndex { get; set; }
[BsonRequired]
[BsonElement]
public string Channel { get; set; }

23
src/Squidex.Store.MongoDb/History/MongoHistoryEventRepository.cs

@ -9,10 +9,12 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using MongoDB.Driver;
using Squidex.Infrastructure.CQRS;
using Squidex.Infrastructure.CQRS.Events;
using Squidex.Infrastructure.CQRS.Replay;
using Squidex.Infrastructure.MongoDb;
using Squidex.Read.History;
using Squidex.Read.History.Repositories;
@ -20,10 +22,11 @@ using Squidex.Store.MongoDb.Utils;
namespace Squidex.Store.MongoDb.History
{
public class MongoHistoryEventRepository : MongoRepositoryBase<MongoHistoryEventEntity>, IHistoryEventRepository, ICatchEventConsumer
public class MongoHistoryEventRepository : MongoRepositoryBase<MongoHistoryEventEntity>, IHistoryEventRepository, ICatchEventConsumer, IReplayableStore
{
private readonly List<IHistoryEventsCreator> creators;
private readonly Dictionary<string, string> texts = new Dictionary<string, string>();
private int sessionEventCount;
public MongoHistoryEventRepository(IMongoDatabase database, IEnumerable<IHistoryEventsCreator> creators)
: base(database)
@ -47,15 +50,25 @@ namespace Squidex.Store.MongoDb.History
protected override Task SetupCollectionAsync(IMongoCollection<MongoHistoryEventEntity> collection)
{
return Task.WhenAll(
collection.Indexes.CreateOneAsync(IndexKeys.Ascending(x => x.AppId)),
collection.Indexes.CreateOneAsync(IndexKeys.Ascending(x => x.Channel)),
collection.Indexes.CreateOneAsync(
IndexKeys
.Ascending(x => x.AppId)
.Ascending(x => x.Channel)
.Descending(x => x.Created)
.Descending(x => x.SessionEventIndex)),
collection.Indexes.CreateOneAsync(IndexKeys.Ascending(x => x.Created), new CreateIndexOptions { ExpireAfter = TimeSpan.FromDays(365) }));
}
public Task ClearAsync()
{
return TryDropCollectionAsync();
}
public async Task<List<IHistoryEventEntity>> QueryEventsByChannel(Guid appId, string channelPrefix, int count)
{
var entities =
await Collection.Find(x => x.AppId == appId && x.Channel.StartsWith(channelPrefix)).SortByDescending(x => x.Created).Limit(count).ToListAsync();
await Collection.Find(x => x.AppId == appId && x.Channel.StartsWith(channelPrefix))
.SortByDescending(x => x.Created).ThenByDescending(x => x.SessionEventIndex).Limit(count).ToListAsync();
return entities.Select(x => (IHistoryEventEntity)new ParsedHistoryEvent(x, texts)).ToList();
}
@ -70,6 +83,8 @@ namespace Squidex.Store.MongoDb.History
{
await Collection.CreateAsync(@event.Headers, x =>
{
x.SessionEventIndex = Interlocked.Increment(ref sessionEventCount);
x.Channel = message.Channel;
x.Message = message.Message;

4
src/Squidex.Store.MongoDb/MongoDbModule.cs

@ -13,6 +13,7 @@ using Microsoft.AspNetCore.Identity.MongoDB;
using Microsoft.Extensions.Options;
using MongoDB.Driver;
using Squidex.Infrastructure.CQRS.Events;
using Squidex.Infrastructure.CQRS.Replay;
using Squidex.Infrastructure.MongoDb;
using Squidex.Read.Apps.Repositories;
using Squidex.Read.History.Repositories;
@ -72,16 +73,19 @@ namespace Squidex.Store.MongoDb
builder.RegisterType<MongoHistoryEventRepository>()
.As<IHistoryEventRepository>()
.As<ICatchEventConsumer>()
.As<IReplayableStore>()
.SingleInstance();
builder.RegisterType<MongoSchemaRepository>()
.As<ISchemaRepository>()
.As<ICatchEventConsumer>()
.As<IReplayableStore>()
.SingleInstance();
builder.RegisterType<MongoAppRepository>()
.As<IAppRepository>()
.As<ICatchEventConsumer>()
.As<IReplayableStore>()
.SingleInstance();
}
}

39
src/Squidex.Store.MongoDb/Schemas/MongoSchemaRepository.cs

@ -14,9 +14,11 @@ using MongoDB.Driver;
using Squidex.Core.Schemas;
using Squidex.Core.Schemas.Json;
using Squidex.Events.Schemas;
using Squidex.Events.Schemas.Utils;
using Squidex.Infrastructure;
using Squidex.Infrastructure.CQRS;
using Squidex.Infrastructure.CQRS.Events;
using Squidex.Infrastructure.CQRS.Replay;
using Squidex.Infrastructure.Dispatching;
using Squidex.Infrastructure.MongoDb;
using Squidex.Infrastructure.Reflection;
@ -25,20 +27,20 @@ using Squidex.Store.MongoDb.Utils;
namespace Squidex.Store.MongoDb.Schemas
{
public class MongoSchemaRepository : MongoRepositoryBase<MongoSchemaEntity>, ISchemaRepository, ICatchEventConsumer
public class MongoSchemaRepository : MongoRepositoryBase<MongoSchemaEntity>, ISchemaRepository, ICatchEventConsumer, IReplayableStore
{
private readonly SchemaJsonSerializer serializer;
private readonly FieldRegistry fieldRegistry;
private readonly FieldRegistry registry;
public MongoSchemaRepository(IMongoDatabase database, SchemaJsonSerializer serializer, FieldRegistry fieldRegistry)
public MongoSchemaRepository(IMongoDatabase database, SchemaJsonSerializer serializer, FieldRegistry registry)
: base(database)
{
Guard.NotNull(serializer, nameof(serializer));
Guard.NotNull(fieldRegistry, nameof(fieldRegistry));
Guard.NotNull(registry, nameof(registry));
this.serializer = serializer;
this.fieldRegistry = fieldRegistry;
this.registry = registry;
}
protected override string CollectionName()
@ -51,6 +53,11 @@ namespace Squidex.Store.MongoDb.Schemas
return collection.Indexes.CreateOneAsync(IndexKeys.Ascending(x => x.Name));
}
public Task ClearAsync()
{
return TryDropCollectionAsync();
}
public async Task<IReadOnlyList<ISchemaEntity>> QueryAllAsync(Guid appId)
{
var entities = await Collection.Find(s => s.AppId == appId && !s.IsDeleted).ToListAsync();
@ -96,54 +103,52 @@ namespace Squidex.Store.MongoDb.Schemas
protected Task On(FieldDeleted @event, EnvelopeHeaders headers)
{
return UpdateSchema(headers, s => s.DeleteField(@event.FieldId));
return UpdateSchema(headers, s => SchemaEventDispatcher.Dispatch(@event, s));
}
protected Task On(FieldDisabled @event, EnvelopeHeaders headers)
{
return UpdateSchema(headers, s => s.DisableField(@event.FieldId));
return UpdateSchema(headers, s => SchemaEventDispatcher.Dispatch(@event, s));
}
protected Task On(FieldEnabled @event, EnvelopeHeaders headers)
{
return UpdateSchema(headers, s => s.EnableField(@event.FieldId));
return UpdateSchema(headers, s => SchemaEventDispatcher.Dispatch(@event, s));
}
protected Task On(FieldHidden @event, EnvelopeHeaders headers)
{
return UpdateSchema(headers, s => s.HideField(@event.FieldId));
return UpdateSchema(headers, s => SchemaEventDispatcher.Dispatch(@event, s));
}
protected Task On(FieldShown @event, EnvelopeHeaders headers)
{
return UpdateSchema(headers, s => s.ShowField(@event.FieldId));
return UpdateSchema(headers, s => SchemaEventDispatcher.Dispatch(@event, s));
}
protected Task On(FieldUpdated @event, EnvelopeHeaders headers)
{
return UpdateSchema(headers, s => s.UpdateField(@event.FieldId, @event.Properties));
return UpdateSchema(headers, s => SchemaEventDispatcher.Dispatch(@event, s));
}
protected Task On(SchemaUpdated @event, EnvelopeHeaders headers)
{
return UpdateSchema(headers, s => s.Update(@event.Properties));
return UpdateSchema(headers, s => SchemaEventDispatcher.Dispatch(@event, s));
}
protected Task On(SchemaPublished @event, EnvelopeHeaders headers)
{
return UpdateSchema(headers, s => s.Publish());
return UpdateSchema(headers, s => SchemaEventDispatcher.Dispatch(@event, s));
}
protected Task On(SchemaUnpublished @event, EnvelopeHeaders headers)
{
return UpdateSchema(headers, s => s.Unpublish());
return UpdateSchema(headers, s => SchemaEventDispatcher.Dispatch(@event, s));
}
protected Task On(FieldAdded @event, EnvelopeHeaders headers)
{
var field = fieldRegistry.CreateField(@event.FieldId, @event.Name, @event.Properties);
return UpdateSchema(headers, s => s.AddOrUpdateField(field));
return UpdateSchema(headers, s => SchemaEventDispatcher.Dispatch(@event, s, registry));
}
protected Task On(SchemaCreated @event, EnvelopeHeaders headers)

2
src/Squidex.Write/Schemas/SchemaCommandHandler.cs

@ -32,7 +32,7 @@ namespace Squidex.Write.Schemas
protected async Task On(CreateSchema command, CommandContext context)
{
if (await schemas.FindSchemaIdByNameAsync(command.AppId, command.Name) != null)
if (await schemas.ProvideSchemaByNameAsync(command.AppId, command.Name) != null)
{
var error =
new ValidationError($"A schema with name '{command.Name}' already exists", "DisplayName",

23
src/Squidex.Write/Schemas/SchemaDomainObject.cs

@ -9,6 +9,7 @@
using System;
using Squidex.Core.Schemas;
using Squidex.Events.Schemas;
using Squidex.Events.Schemas.Utils;
using Squidex.Infrastructure;
using Squidex.Infrastructure.CQRS;
using Squidex.Infrastructure.CQRS.Events;
@ -47,57 +48,57 @@ namespace Squidex.Write.Schemas
{
totalFields++;
schema = schema.AddOrUpdateField(registry.CreateField(@event.FieldId, @event.Name, @event.Properties));
schema = SchemaEventDispatcher.Dispatch(@event, schema, registry);
}
protected void On(SchemaCreated @event)
{
schema = Schema.Create(@event.Name, @event.Properties);
schema = SchemaEventDispatcher.Dispatch(@event);
}
protected void On(FieldUpdated @event)
{
schema = schema.UpdateField(@event.FieldId, @event.Properties);
schema = SchemaEventDispatcher.Dispatch(@event, schema);
}
protected void On(FieldHidden @event)
{
schema = schema.HideField(@event.FieldId);
schema = SchemaEventDispatcher.Dispatch(@event, schema);
}
protected void On(FieldShown @event)
{
schema = schema.ShowField(@event.FieldId);
schema = SchemaEventDispatcher.Dispatch(@event, schema);
}
protected void On(FieldDisabled @event)
{
schema = schema.DisableField(@event.FieldId);
schema = SchemaEventDispatcher.Dispatch(@event, schema);
}
protected void On(FieldEnabled @event)
{
schema = schema.EnableField(@event.FieldId);
schema = SchemaEventDispatcher.Dispatch(@event, schema);
}
protected void On(SchemaUpdated @event)
{
schema = schema.Update(@event.Properties);
schema = SchemaEventDispatcher.Dispatch(@event, schema);
}
protected void On(FieldDeleted @event)
{
schema = schema.DeleteField(@event.FieldId);
schema = SchemaEventDispatcher.Dispatch(@event, schema);
}
protected void On(SchemaPublished @event)
{
schema = schema.Publish();
schema = SchemaEventDispatcher.Dispatch(@event, schema);
}
protected void On(SchemaUnpublished @event)
{
schema = schema.Unpublish();
schema = SchemaEventDispatcher.Dispatch(@event, schema);
}
protected void On(SchemaDeleted @event)

7
src/Squidex/Config/Domain/InfrastructureModule.cs

@ -14,6 +14,7 @@ using Squidex.Core.Schemas.Json;
using Squidex.Infrastructure.CQRS.Autofac;
using Squidex.Infrastructure.CQRS.Commands;
using Squidex.Infrastructure.CQRS.Events;
using Squidex.Infrastructure.CQRS.Replay;
namespace Squidex.Config.Domain
{
@ -45,7 +46,11 @@ namespace Squidex.Config.Domain
.As<ICommandBus>()
.SingleInstance();
builder.RegisterType<EventBus>()
builder.RegisterType<ReplayGenerator>()
.AsSelf()
.SingleInstance();
builder.RegisterType<EventReceiver>()
.AsSelf()
.SingleInstance();

15
src/Squidex/Config/Domain/ReadModule.cs

@ -8,8 +8,11 @@
using Autofac;
using Squidex.Infrastructure.CQRS.Events;
using Squidex.Read.Apps;
using Squidex.Read.Apps.Services;
using Squidex.Read.Apps.Services.Implementations;
using Squidex.Read.History;
using Squidex.Read.Schemas;
using Squidex.Read.Schemas.Services;
using Squidex.Read.Schemas.Services.Implementations;
@ -21,12 +24,20 @@ namespace Squidex.Config.Domain
{
builder.RegisterType<CachingAppProvider>()
.As<IAppProvider>()
.As<ILiveEventConsumer>()
.As<ICatchEventConsumer>()
.SingleInstance();
builder.RegisterType<CachingSchemaProvider>()
.As<ISchemaProvider>()
.As<ILiveEventConsumer>()
.As<ICatchEventConsumer>()
.SingleInstance();
builder.RegisterType<AppHistoryEventsCreator>()
.As<IHistoryEventsCreator>()
.SingleInstance();
builder.RegisterType<SchemaHistoryEventsCreator>()
.As<IHistoryEventsCreator>()
.SingleInstance();
}
}

2
src/Squidex/Config/EventStore/EventStoreUsage.cs

@ -16,7 +16,7 @@ namespace Squidex.Config.EventStore
{
public static IApplicationBuilder UseMyEventStore(this IApplicationBuilder app)
{
app.ApplicationServices.GetService<EventBus>().Subscribe();
app.ApplicationServices.GetService<EventReceiver>().Subscribe();
return app;
}

6
src/Squidex/Pipeline/CommandHandlers/EnrichWithSchemaAggregateIdHandler.cs

@ -53,14 +53,14 @@ namespace Squidex.Pipeline.CommandHandlers
{
var schemaName = routeValues["name"].ToString();
var id = await schemaProvider.FindSchemaIdByNameAsync(appCommand.AppId, schemaName);
var schema = await schemaProvider.ProvideSchemaByNameAsync(appCommand.AppId, schemaName);
if (!id.HasValue)
if (schema == null)
{
throw new DomainObjectNotFoundException(schemaName, typeof(SchemaDomainObject));
}
aggregateCommand.AggregateId = id.Value;
aggregateCommand.AggregateId = schema.Id;
}
return false;

11
src/Squidex/Program.cs

@ -8,6 +8,8 @@
using System.IO;
using Microsoft.AspNetCore.Hosting;
using Microsoft.Extensions.DependencyInjection;
using Squidex.Infrastructure.CQRS.Replay;
namespace Squidex
{
@ -22,7 +24,14 @@ namespace Squidex
.UseStartup<Startup>()
.Build();
host.Run();
if (args.Length == 1 && args[0] == "--replay")
{
host.Services.GetService<ReplayGenerator>().ReplayAllAsync().Wait();
}
else
{
host.Run();
}
}
}
}

4
src/Squidex/app-config/clean-css-loader.js

@ -9,14 +9,14 @@ function cleanCss(css) {
new CleanCSS().minify(css, function (err, minified) {
if (err) {
if (Array.isArray(err) && (err[0] != null)) {
if (Array.isArray(err) && (err[0] !== null)) {
return callback(err[0]);
} else {
return callback(err);
}
}
var warnings;
if (((warnings = minified.warnings) != null ? warnings.length : void 0) !== 0) {
if (((warnings = minified.warnings) !== null ? warnings.length : void 0) !== 0) {
warnings.forEach(function (msg) {
loader.emitWarning(msg.toString());
});

14
src/Squidex/app/shared/services/users-provider.service.ts

@ -31,12 +31,6 @@ export class UsersProviderService {
.catch(err => {
return Observable.of(new UserDto('NOT FOUND', 'NOT FOUND', 'NOT FOUND', ''));
})
.map(dto => {
if (this.authService.user && dto.id === this.authService.user.id) {
dto = new UserDto(dto.id, dto.email, me, dto.pictureUrl);
}
return dto;
})
.publishLast();
request.connect();
@ -44,6 +38,12 @@ export class UsersProviderService {
result = this.caches[id] = request;
}
return result;
return result
.map(dto => {
if (this.authService.user && dto.id === this.authService.user.id) {
dto = new UserDto(dto.id, dto.email, me, dto.pictureUrl);
}
return dto;
});
}
}

5
tests/Squidex.Write.Tests/Schemas/SchemaCommandHandlerTests.cs

@ -13,6 +13,7 @@ using Moq;
using Squidex.Core.Schemas;
using Squidex.Infrastructure;
using Squidex.Infrastructure.CQRS.Commands;
using Squidex.Read.Schemas.Repositories;
using Squidex.Read.Schemas.Services;
using Squidex.Write.Schemas.Commands;
using Squidex.Write.Utils;
@ -50,7 +51,7 @@ namespace Squidex.Write.Schemas
var command = new CreateSchema { Name = schemaName, AppId = appId, AggregateId = Id };
var context = new CommandContext(command);
schemaProvider.Setup(x => x.FindSchemaIdByNameAsync(appId, schemaName)).Returns(Task.FromResult<Guid?>(Guid.NewGuid())).Verifiable();
schemaProvider.Setup(x => x.ProvideSchemaByNameAsync(appId, schemaName)).Returns(Task.FromResult(new Mock<ISchemaEntityWithSchema>().Object)).Verifiable();
await TestCreate(schema, async _ =>
{
@ -66,7 +67,7 @@ namespace Squidex.Write.Schemas
var command = new CreateSchema { Name = schemaName, AppId = appId, AggregateId = Id };
var context = new CommandContext(command);
schemaProvider.Setup(x => x.FindSchemaIdByNameAsync(Id, schemaName)).Returns(Task.FromResult<Guid?>(null)).Verifiable();
schemaProvider.Setup(x => x.ProvideSchemaByNameAsync(Id, schemaName)).Returns(Task.FromResult<ISchemaEntityWithSchema>(null)).Verifiable();
await TestCreate(schema, async _ =>
{

Loading…
Cancel
Save