Browse Source

Merge pull request #218 from Squidex/json-eventstore

Filter events by Metadata
pull/248/head
Sebastian Stehle 8 years ago
committed by GitHub
parent
commit
e350b1eb73
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
  1. 3
      src/Squidex.Domain.Apps.Core.Model/SquidexCoreModel.cs
  2. 4
      src/Squidex.Domain.Apps.Entities.MongoDb/Rules/MongoRuleEventRepository.cs
  3. 3
      src/Squidex.Domain.Apps.Entities/Apps/AppDomainObject.cs
  4. 3
      src/Squidex.Domain.Apps.Entities/Assets/AssetDomainObject.cs
  5. 3
      src/Squidex.Domain.Apps.Entities/Contents/ContentDomainObject.cs
  6. 2
      src/Squidex.Domain.Apps.Entities/Rules/Repositories/IRuleEventRepository.cs
  7. 4
      src/Squidex.Domain.Apps.Entities/Rules/RuleDequeuer.cs
  8. 3
      src/Squidex.Domain.Apps.Entities/Rules/RuleDomainObject.cs
  9. 3
      src/Squidex.Domain.Apps.Entities/Schemas/SchemaDomainObject.cs
  10. 26
      src/Squidex.Domain.Apps.Entities/SquidexDomainObjectBase.cs
  11. 3
      src/Squidex.Domain.Apps.Events/SquidexEvents.cs
  12. 28
      src/Squidex.Domain.Apps.Events/SquidexHeaderExtensions.cs
  13. 14
      src/Squidex.Domain.Apps.Events/SquidexHeaders.cs
  14. 12
      src/Squidex.Infrastructure.GetEventStore/EventSourcing/Formatter.cs
  15. 60
      src/Squidex.Infrastructure.GetEventStore/EventSourcing/GetEventStore.cs
  16. 27
      src/Squidex.Infrastructure.GetEventStore/EventSourcing/GetEventStoreSubscription.cs
  17. 142
      src/Squidex.Infrastructure.GetEventStore/EventSourcing/ProjectionClient.cs
  18. 97
      src/Squidex.Infrastructure.GetEventStore/EventSourcing/ProjectionHelper.cs
  19. 19
      src/Squidex.Infrastructure.MongoDb/EventSourcing/MongoEvent.cs
  20. 228
      src/Squidex.Infrastructure.MongoDb/EventSourcing/MongoEventStore.cs
  21. 173
      src/Squidex.Infrastructure.MongoDb/EventSourcing/MongoEventStore_Reader.cs
  22. 129
      src/Squidex.Infrastructure.MongoDb/EventSourcing/MongoEventStore_Writer.cs
  23. 13
      src/Squidex.Infrastructure.MongoDb/MongoDb/BsonJsonConvention.cs
  24. 32
      src/Squidex.Infrastructure.MongoDb/MongoDb/JTokenSerializer.cs
  25. 8
      src/Squidex.Infrastructure/Commands/DomainObjectBase.cs
  26. 39
      src/Squidex.Infrastructure/EventSourcing/DefaultEventDataFormatter.cs
  27. 8
      src/Squidex.Infrastructure/EventSourcing/EventData.cs
  28. 12
      src/Squidex.Infrastructure/EventSourcing/IEventStore.cs
  29. 2
      src/Squidex.Infrastructure/EventSourcing/PollingSubscription.cs
  30. 26
      src/Squidex.Infrastructure/EventSourcing/StoredEvent.cs
  31. 14
      src/Squidex.Infrastructure/Json/NamedGuidIdConverter.cs
  32. 14
      src/Squidex.Infrastructure/Json/NamedLongIdConverter.cs
  33. 7
      src/Squidex.Infrastructure/Migrations/IMigration.cs
  34. 16
      src/Squidex.Infrastructure/Migrations/IMigrationPath.cs
  35. 60
      src/Squidex.Infrastructure/Migrations/Migrator.cs
  36. 22
      src/Squidex.Infrastructure/NamedId.cs
  37. 3
      src/Squidex.Infrastructure/SquidexInfrastructure.cs
  38. 4
      src/Squidex.Infrastructure/States/Persistence{TOwner,TSnapshot,TKey}.cs
  39. 196
      src/Squidex/Areas/Api/Controllers/Content/ContentsController.cs
  40. 14
      src/Squidex/Areas/Api/Controllers/Content/Generator/SchemaSwaggerGenerator.cs
  41. 2
      src/Squidex/Config/Domain/InfrastructureServices.cs
  42. 8
      src/Squidex/Config/Domain/SerializationServices.cs
  43. 14
      src/Squidex/Config/Domain/WriteServices.cs
  44. 17
      tests/Squidex.Infrastructure.Tests/Commands/DomainObjectBaseTests.cs
  45. 8
      tests/Squidex.Infrastructure.Tests/EventSourcing/DefaultEventDataFormatterTests.cs
  46. 12
      tests/Squidex.Infrastructure.Tests/EventSourcing/PollingSubscriptionTests.cs
  47. 79
      tests/Squidex.Infrastructure.Tests/Migrations/MigratorTests.cs
  48. 14
      tests/Squidex.Infrastructure.Tests/States/PersistenceEventSourcingTests.cs
  49. 1
      tools/Migrate_01/Migrate_01.csproj
  50. 38
      tools/Migrate_01/Migration05_RebuildForNewCommands.cs
  51. 56
      tools/Migrate_01/MigrationPath.cs
  52. 13
      tools/Migrate_01/Migrations/AddPatterns.cs
  53. 59
      tools/Migrate_01/Migrations/ConvertEventStore.cs
  54. 15
      tools/Migrate_01/Migrations/RebuildAssets.cs
  55. 19
      tools/Migrate_01/Migrations/RebuildContents.cs
  56. 13
      tools/Migrate_01/Migrations/RebuildSnapshots.cs
  57. 12
      tools/Migrate_01/Rebuilder.cs
  58. 3
      tools/Migrate_01/SquidexMigrations.cs

3
src/Squidex.Domain.Apps.Core.Model/SquidexCoreModel.cs

@ -5,9 +5,12 @@
// All rights reserved. Licensed under the MIT license.
// ==========================================================================
using System.Reflection;
namespace Squidex.Domain.Apps.Core
{
public static class SquidexCoreModel
{
public static readonly Assembly Assembly = typeof(SquidexCoreModel).Assembly;
}
}

4
src/Squidex.Domain.Apps.Entities.MongoDb/Rules/MongoRuleEventRepository.cs

@ -39,9 +39,9 @@ namespace Squidex.Domain.Apps.Entities.MongoDb.Rules
await collection.Indexes.CreateOneAsync(Index.Ascending(x => x.Expires), new CreateIndexOptions { ExpireAfter = TimeSpan.Zero });
}
public Task QueryPendingAsync(Instant now, Func<IRuleEventEntity, Task> callback, CancellationToken cancellationToken = default(CancellationToken))
public Task QueryPendingAsync(Instant now, Func<IRuleEventEntity, Task> callback, CancellationToken ct = default(CancellationToken))
{
return Collection.Find(x => x.NextAttempt < now).ForEachAsync(callback, cancellationToken);
return Collection.Find(x => x.NextAttempt < now).ForEachAsync(callback, ct);
}
public async Task<IReadOnlyList<IRuleEventEntity>> QueryByAppAsync(Guid appId, int skip = 0, int take = 20)

3
src/Squidex.Domain.Apps.Entities/Apps/AppDomainObject.cs

@ -13,13 +13,12 @@ using Squidex.Domain.Apps.Entities.Apps.State;
using Squidex.Domain.Apps.Events;
using Squidex.Domain.Apps.Events.Apps;
using Squidex.Infrastructure;
using Squidex.Infrastructure.Commands;
using Squidex.Infrastructure.EventSourcing;
using Squidex.Infrastructure.Reflection;
namespace Squidex.Domain.Apps.Entities.Apps
{
public sealed class AppDomainObject : DomainObjectBase<AppState>
public sealed class AppDomainObject : SquidexDomainObjectBase<AppState>
{
private readonly InitialPatterns initialPatterns;

3
src/Squidex.Domain.Apps.Entities/Assets/AssetDomainObject.cs

@ -10,13 +10,12 @@ using Squidex.Domain.Apps.Entities.Assets.State;
using Squidex.Domain.Apps.Events;
using Squidex.Domain.Apps.Events.Assets;
using Squidex.Infrastructure;
using Squidex.Infrastructure.Commands;
using Squidex.Infrastructure.EventSourcing;
using Squidex.Infrastructure.Reflection;
namespace Squidex.Domain.Apps.Entities.Assets
{
public sealed class AssetDomainObject : DomainObjectBase<AssetState>
public sealed class AssetDomainObject : SquidexDomainObjectBase<AssetState>
{
public AssetDomainObject Create(CreateAsset command)
{

3
src/Squidex.Domain.Apps.Entities/Contents/ContentDomainObject.cs

@ -11,13 +11,12 @@ using Squidex.Domain.Apps.Entities.Contents.State;
using Squidex.Domain.Apps.Events;
using Squidex.Domain.Apps.Events.Contents;
using Squidex.Infrastructure;
using Squidex.Infrastructure.Commands;
using Squidex.Infrastructure.EventSourcing;
using Squidex.Infrastructure.Reflection;
namespace Squidex.Domain.Apps.Entities.Contents
{
public sealed class ContentDomainObject : DomainObjectBase<ContentState>
public sealed class ContentDomainObject : SquidexDomainObjectBase<ContentState>
{
public ContentDomainObject Create(CreateContent command)
{

2
src/Squidex.Domain.Apps.Entities/Rules/Repositories/IRuleEventRepository.cs

@ -23,7 +23,7 @@ namespace Squidex.Domain.Apps.Entities.Rules.Repositories
Task MarkSentAsync(Guid jobId, string dump, RuleResult result, RuleJobResult jobResult, TimeSpan elapsed, Instant? nextCall);
Task QueryPendingAsync(Instant now, Func<IRuleEventEntity, Task> callback, CancellationToken cancellationToken = default(CancellationToken));
Task QueryPendingAsync(Instant now, Func<IRuleEventEntity, Task> callback, CancellationToken ct = default(CancellationToken));
Task<int> CountByAppAsync(Guid appId);

4
src/Squidex.Domain.Apps.Entities/Rules/RuleDequeuer.cs

@ -72,13 +72,13 @@ namespace Squidex.Domain.Apps.Entities.Rules
timer.SkipCurrentDelay();
}
private async Task QueryAsync(CancellationToken cancellationToken)
private async Task QueryAsync(CancellationToken ct)
{
try
{
var now = clock.GetCurrentInstant();
await ruleEventRepository.QueryPendingAsync(now, requestBlock.SendAsync, cancellationToken);
await ruleEventRepository.QueryPendingAsync(now, requestBlock.SendAsync, ct);
}
catch (Exception ex)
{

3
src/Squidex.Domain.Apps.Entities/Rules/RuleDomainObject.cs

@ -10,13 +10,12 @@ using Squidex.Domain.Apps.Entities.Rules.State;
using Squidex.Domain.Apps.Events;
using Squidex.Domain.Apps.Events.Rules;
using Squidex.Infrastructure;
using Squidex.Infrastructure.Commands;
using Squidex.Infrastructure.EventSourcing;
using Squidex.Infrastructure.Reflection;
namespace Squidex.Domain.Apps.Entities.Rules
{
public sealed class RuleDomainObject : DomainObjectBase<RuleState>
public sealed class RuleDomainObject : SquidexDomainObjectBase<RuleState>
{
public void Create(CreateRule command)
{

3
src/Squidex.Domain.Apps.Entities/Schemas/SchemaDomainObject.cs

@ -13,13 +13,12 @@ using Squidex.Domain.Apps.Entities.Schemas.State;
using Squidex.Domain.Apps.Events;
using Squidex.Domain.Apps.Events.Schemas;
using Squidex.Infrastructure;
using Squidex.Infrastructure.Commands;
using Squidex.Infrastructure.EventSourcing;
using Squidex.Infrastructure.Reflection;
namespace Squidex.Domain.Apps.Entities.Schemas
{
public sealed class SchemaDomainObject : DomainObjectBase<SchemaState>
public sealed class SchemaDomainObject : SquidexDomainObjectBase<SchemaState>
{
private readonly FieldRegistry registry;

26
src/Squidex.Domain.Apps.Entities/SquidexDomainObjectBase.cs

@ -0,0 +1,26 @@
// ==========================================================================
// Squidex Headless CMS
// ==========================================================================
// Copyright (c) Squidex UG (haftungsbeschraenkt)
// All rights reserved. Licensed under the MIT license.
// ==========================================================================
using Squidex.Domain.Apps.Events;
using Squidex.Infrastructure.Commands;
using Squidex.Infrastructure.EventSourcing;
namespace Squidex.Domain.Apps.Entities
{
public abstract class SquidexDomainObjectBase<T> : DomainObjectBase<T> where T : IDomainState, new()
{
public override void RaiseEvent(Envelope<IEvent> @event)
{
if (@event.Payload is AppEvent appEvent)
{
@event.SetAppId(appEvent.AppId.Id);
}
base.RaiseEvent(@event);
}
}
}

3
src/Squidex.Domain.Apps.Events/SquidexEvents.cs

@ -5,9 +5,12 @@
// All rights reserved. Licensed under the MIT license.
// ==========================================================================
using System.Reflection;
namespace Squidex.Domain.Apps.Events
{
public static class SquidexEvents
{
public static readonly Assembly Assembly = typeof(SquidexEvents).Assembly;
}
}

28
src/Squidex.Domain.Apps.Events/SquidexHeaderExtensions.cs

@ -0,0 +1,28 @@
// ==========================================================================
// Squidex Headless CMS
// ==========================================================================
// Copyright (c) Squidex UG (haftungsbeschraenkt)
// All rights reserved. Licensed under the MIT license.
// ==========================================================================
using System;
using System.Globalization;
using Squidex.Infrastructure.EventSourcing;
namespace Squidex.Domain.Apps.Events
{
public static class SquidexHeaderExtensions
{
public static Guid AppId(this EnvelopeHeaders headers)
{
return headers[SquidexHeaders.AppId].ToGuid(CultureInfo.InvariantCulture);
}
public static Envelope<T> SetAppId<T>(this Envelope<T> envelope, Guid value) where T : class
{
envelope.Headers.Set(SquidexHeaders.AppId, value);
return envelope;
}
}
}

14
src/Squidex.Domain.Apps.Events/SquidexHeaders.cs

@ -0,0 +1,14 @@
// ==========================================================================
// Squidex Headless CMS
// ==========================================================================
// Copyright (c) Squidex UG (haftungsbeschraenkt)
// All rights reserved. Licensed under the MIT license.
// ==========================================================================
namespace Squidex.Domain.Apps.Events
{
public static class SquidexHeaders
{
public static readonly string AppId = "AppId";
}
}

12
src/Squidex.Infrastructure.GetEventStore/EventSourcing/Formatter.cs

@ -5,6 +5,7 @@
// All rights reserved. Licensed under the MIT license.
// ==========================================================================
using System;
using System.Text;
using EventStore.ClientAPI;
using EventStoreData = EventStore.ClientAPI.EventData;
@ -20,7 +21,7 @@ namespace Squidex.Infrastructure.EventSourcing
var body = Encoding.UTF8.GetString(@event.Data);
var meta = Encoding.UTF8.GetString(@event.Metadata);
var eventData = new EventData { Type = @event.EventType, EventId = @event.EventId, Payload = body, Metadata = meta };
var eventData = new EventData { Type = @event.EventType, Payload = body, Metadata = meta };
return new StoredEvent(
resolvedEvent.OriginalEventNumber.ToString(),
@ -30,13 +31,10 @@ namespace Squidex.Infrastructure.EventSourcing
public static EventStoreData Write(EventData eventData)
{
var body = Encoding.UTF8.GetBytes(eventData.Payload);
var meta = Encoding.UTF8.GetBytes(eventData.Metadata);
var body = Encoding.UTF8.GetBytes(eventData.Payload.ToString());
var meta = Encoding.UTF8.GetBytes(eventData.Metadata.ToString());
return new EventStoreData(
eventData.EventId,
eventData.Type,
true, body, meta);
return new EventStoreData(Guid.NewGuid(), eventData.Type, true, body, meta);
}
}
}

60
src/Squidex.Infrastructure.GetEventStore/EventSourcing/GetEventStore.cs

@ -11,7 +11,6 @@ using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using EventStore.ClientAPI;
using EventStore.ClientAPI.Projections;
namespace Squidex.Infrastructure.EventSourcing
{
@ -20,18 +19,18 @@ namespace Squidex.Infrastructure.EventSourcing
private const int WritePageSize = 500;
private const int ReadPageSize = 500;
private readonly IEventStoreConnection connection;
private readonly string projectionHost;
private readonly string prefix;
private ProjectionsManager projectionsManager;
private ProjectionClient projectionClient;
public GetEventStore(IEventStoreConnection connection, string prefix, string projectionHost)
{
Guard.NotNull(connection, nameof(connection));
this.connection = connection;
this.projectionHost = projectionHost;
this.prefix = prefix?.Trim(' ', '-').WithFallback("squidex");
projectionClient = new ProjectionClient(connection, prefix, projectionHost);
}
public void Initialize()
@ -45,50 +44,43 @@ namespace Squidex.Infrastructure.EventSourcing
throw new ConfigurationException("Cannot connect to event store.", ex);
}
try
{
projectionsManager = connection.GetProjectionsManagerAsync(projectionHost).Result;
projectionsManager.ListAllAsync(connection.Settings.DefaultUserCredentials).Wait();
projectionClient.ConnectAsync().Wait();
}
catch (Exception ex)
public IEventSubscription CreateSubscription(IEventSubscriber subscriber, string streamFilter, string position = null)
{
throw new ConfigurationException($"Cannot connect to event store projections: {projectionHost}.", ex);
}
return new GetEventStoreSubscription(connection, subscriber, projectionClient, prefix, position, streamFilter);
}
public IEventSubscription CreateSubscription(IEventSubscriber subscriber, string streamFilter, string position = null)
public Task CreateIndexAsync(string property)
{
return new GetEventStoreSubscription(connection, subscriber, projectionsManager, prefix, position, streamFilter);
return projectionClient.CreateProjectionAsync(property, string.Empty);
}
public async Task GetEventsAsync(Func<StoredEvent, Task> callback, string streamFilter = null, string position = null, CancellationToken cancellationToken = default(CancellationToken))
public async Task QueryAsync(Func<StoredEvent, Task> callback, string property, object value, string position = null, CancellationToken ct = default(CancellationToken))
{
var streamName = await connection.CreateProjectionAsync(projectionsManager, prefix, streamFilter);
var streamName = await projectionClient.CreateProjectionAsync(property, value);
var sliceStart = ProjectionHelper.ParsePosition(position);
var sliceStart = projectionClient.ParsePosition(position);
StreamEventsSlice currentSlice;
do
{
currentSlice = await connection.ReadStreamEventsForwardAsync(streamName, sliceStart, ReadPageSize, true);
await QueryAsync(callback, streamName, sliceStart, ct);
}
if (currentSlice.Status == SliceReadStatus.Success)
public async Task QueryAsync(Func<StoredEvent, Task> callback, string streamFilter = null, string position = null, CancellationToken ct = default(CancellationToken))
{
sliceStart = currentSlice.NextEventNumber;
var streamName = await projectionClient.CreateProjectionAsync(streamFilter);
foreach (var resolved in currentSlice.Events)
{
var storedEvent = Formatter.Read(resolved);
var sliceStart = projectionClient.ParsePosition(position);
await callback(storedEvent);
}
}
await QueryAsync(callback, streamName, sliceStart, ct);
}
while (!currentSlice.IsEndOfStream && !cancellationToken.IsCancellationRequested);
private Task QueryAsync(Func<StoredEvent, Task> callback, string streamName, long sliceStart, CancellationToken ct)
{
return QueryAsync(callback, GetStreamName(streamName), sliceStart, ct);
}
public async Task<IReadOnlyList<StoredEvent>> GetEventsAsync(string streamName, long streamPosition = 0)
public async Task<IReadOnlyList<StoredEvent>> QueryAsync(string streamName, long streamPosition = 0)
{
var result = new List<StoredEvent>();
@ -97,7 +89,7 @@ namespace Squidex.Infrastructure.EventSourcing
StreamEventsSlice currentSlice;
do
{
currentSlice = await connection.ReadStreamEventsForwardAsync(GetStreamName(streamName), sliceStart, ReadPageSize, false);
currentSlice = await connection.ReadStreamEventsForwardAsync(streamName, sliceStart, ReadPageSize, false);
if (currentSlice.Status == SliceReadStatus.Success)
{
@ -116,12 +108,12 @@ namespace Squidex.Infrastructure.EventSourcing
return result;
}
public Task AppendEventsAsync(Guid commitId, string streamName, ICollection<EventData> events)
public Task AppendAsync(Guid commitId, string streamName, ICollection<EventData> events)
{
return AppendEventsInternalAsync(streamName, EtagVersion.Any, events);
}
public Task AppendEventsAsync(Guid commitId, string streamName, long expectedVersion, ICollection<EventData> events)
public Task AppendAsync(Guid commitId, string streamName, long expectedVersion, ICollection<EventData> events)
{
Guard.GreaterEquals(expectedVersion, -1, nameof(expectedVersion));

27
src/Squidex.Infrastructure.GetEventStore/EventSourcing/GetEventStoreSubscription.cs

@ -8,33 +8,32 @@
using System.Threading.Tasks;
using EventStore.ClientAPI;
using EventStore.ClientAPI.Exceptions;
using EventStore.ClientAPI.Projections;
using Squidex.Infrastructure.Tasks;
namespace Squidex.Infrastructure.EventSourcing
{
internal sealed class GetEventStoreSubscription : IEventSubscription
{
private readonly IEventStoreConnection eventStoreConnection;
private readonly IEventSubscriber eventSubscriber;
private readonly IEventStoreConnection connection;
private readonly IEventSubscriber subscriber;
private readonly EventStoreCatchUpSubscription subscription;
private readonly long? position;
public GetEventStoreSubscription(
IEventStoreConnection eventStoreConnection,
IEventSubscriber eventSubscriber,
ProjectionsManager projectionsManager,
IEventStoreConnection connection,
IEventSubscriber subscriber,
ProjectionClient projectionClient,
string prefix,
string position,
string streamFilter)
{
Guard.NotNull(eventSubscriber, nameof(eventSubscriber));
Guard.NotNull(subscriber, nameof(subscriber));
this.eventStoreConnection = eventStoreConnection;
this.eventSubscriber = eventSubscriber;
this.position = ProjectionHelper.ParsePositionOrNull(position);
this.connection = connection;
this.position = projectionClient.ParsePositionOrNull(position);
this.subscriber = subscriber;
var streamName = eventStoreConnection.CreateProjectionAsync(projectionsManager, prefix, streamFilter).Result;
var streamName = projectionClient.CreateProjectionAsync(streamFilter).Result;
subscription = SubscribeToStream(streamName);
}
@ -50,12 +49,12 @@ namespace Squidex.Infrastructure.EventSourcing
{
var settings = CatchUpSubscriptionSettings.Default;
return eventStoreConnection.SubscribeToStreamFrom(streamName, position, settings,
return connection.SubscribeToStreamFrom(streamName, position, settings,
(s, e) =>
{
var storedEvent = Formatter.Read(e);
eventSubscriber.OnEventAsync(this, storedEvent).Wait();
subscriber.OnEventAsync(this, storedEvent).Wait();
}, null,
(s, reason, ex) =>
{
@ -64,7 +63,7 @@ namespace Squidex.Infrastructure.EventSourcing
{
ex = ex ?? new ConnectionClosedException($"Subscription closed with reason {reason}.");
eventSubscriber.OnErrorAsync(this, ex);
subscriber.OnErrorAsync(this, ex);
}
});
}

142
src/Squidex.Infrastructure.GetEventStore/EventSourcing/ProjectionClient.cs

@ -0,0 +1,142 @@
// ==========================================================================
// Squidex Headless CMS
// ==========================================================================
// Copyright (c) Squidex UG (haftungsbeschränkt)
// All rights reserved. Licensed under the MIT license.
// ==========================================================================
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Net;
using System.Net.Sockets;
using System.Threading.Tasks;
using EventStore.ClientAPI;
using EventStore.ClientAPI.Exceptions;
using EventStore.ClientAPI.Projections;
namespace Squidex.Infrastructure.EventSourcing
{
public sealed class ProjectionClient
{
private readonly ConcurrentDictionary<string, bool> projections = new ConcurrentDictionary<string, bool>();
private readonly IEventStoreConnection connection;
private readonly string prefix;
private readonly string projectionHost;
private ProjectionsManager projectionsManager;
public ProjectionClient(IEventStoreConnection connection, string prefix, string projectionHost)
{
this.connection = connection;
this.prefix = prefix;
this.projectionHost = projectionHost;
}
private string CreateFilterProjectionName(string filter)
{
return $"by-{prefix.Slugify()}-{filter.Slugify()}";
}
private string CreatePropertyProjectionName(string property)
{
return $"by-{prefix.Slugify()}-{property.Slugify()}-property";
}
public async Task<string> CreateProjectionAsync(string property, object value)
{
var name = CreatePropertyProjectionName(property);
var query =
$@"fromAll()
.when({{
$any: function (s, e) {{
if (e.streamId.indexOf('{prefix}') === 0 && e.metadata.{property}) {{
linkTo('{name}-' + e.metadata.{property}, e);
}}
}}
}});";
await CreateProjectionAsync(name, query);
return $"{name}-{value}";
}
public async Task<string> CreateProjectionAsync(string streamFilter = null)
{
streamFilter = streamFilter ?? ".*";
var name = CreateFilterProjectionName(streamFilter);
var query =
$@"fromAll()
.when({{
$any: function (s, e) {{
if (e.streamId.indexOf('{prefix}') === 0 && /{streamFilter}/.test(e.streamId.substring({prefix.Length + 1}))) {{
linkTo('{name}', e);
}}
}}
}});";
await CreateProjectionAsync(name, query);
return name;
}
private async Task CreateProjectionAsync(string name, string query)
{
if (projections.TryAdd(name, true))
{
try
{
var credentials = connection.Settings.DefaultUserCredentials;
await projectionsManager.CreateContinuousAsync(name, query, credentials);
}
catch (Exception ex)
{
if (!ex.Is<ProjectionCommandConflictException>())
{
throw;
}
}
}
}
public async Task ConnectAsync()
{
var addressParts = projectionHost.Split(':');
if (addressParts.Length < 2 || !int.TryParse(addressParts[1], out var port))
{
port = 2113;
}
var endpoints = await Dns.GetHostAddressesAsync(addressParts[0]);
var endpoint = new IPEndPoint(endpoints.First(x => x.AddressFamily == AddressFamily.InterNetwork), port);
projectionsManager =
new ProjectionsManager(
connection.Settings.Log, endpoint,
connection.Settings.OperationTimeout);
try
{
await projectionsManager.ListAllAsync(connection.Settings.DefaultUserCredentials);
}
catch (Exception ex)
{
throw new ConfigurationException($"Cannot connect to event store projections: {projectionHost}.", ex);
}
}
public long? ParsePositionOrNull(string position)
{
return long.TryParse(position, out var parsedPosition) ? (long?)parsedPosition : null;
}
public long ParsePosition(string position)
{
return long.TryParse(position, out var parsedPosition) ? parsedPosition : 0;
}
}
}

97
src/Squidex.Infrastructure.GetEventStore/EventSourcing/ProjectionHelper.cs

@ -1,97 +0,0 @@
// ==========================================================================
// Squidex Headless CMS
// ==========================================================================
// Copyright (c) Squidex UG (haftungsbeschränkt)
// All rights reserved. Licensed under the MIT license.
// ==========================================================================
using System;
using System.Collections.Concurrent;
using System.Globalization;
using System.Linq;
using System.Net;
using System.Net.Sockets;
using System.Threading.Tasks;
using EventStore.ClientAPI;
using EventStore.ClientAPI.Exceptions;
using EventStore.ClientAPI.Projections;
namespace Squidex.Infrastructure.EventSourcing
{
public static class ProjectionHelper
{
private const string ProjectionName = "by-{0}-{1}";
private static readonly ConcurrentDictionary<string, bool> SubscriptionsCreated = new ConcurrentDictionary<string, bool>();
private static string ParseFilter(string prefix, string filter)
{
return string.Format(CultureInfo.InvariantCulture, ProjectionName, prefix.Slugify(), filter.Slugify());
}
public static async Task<string> CreateProjectionAsync(this IEventStoreConnection connection, ProjectionsManager projectionsManager, string prefix, string streamFilter = null)
{
streamFilter = streamFilter ?? ".*";
var streamName = ParseFilter(prefix, streamFilter);
if (SubscriptionsCreated.TryAdd(streamName, true))
{
var projectionConfig =
$@"fromAll()
.when({{
$any: function (s, e) {{
if (e.streamId.indexOf('{prefix}') === 0 && /{streamFilter}/.test(e.streamId.substring({prefix.Length + 1}))) {{
linkTo('{streamName}', e);
}}
}}
}});";
try
{
var credentials = connection.Settings.DefaultUserCredentials;
await projectionsManager.CreateContinuousAsync($"${streamName}", projectionConfig, credentials);
}
catch (Exception ex)
{
if (!ex.Is<ProjectionCommandConflictException>())
{
throw;
}
}
}
return streamName;
}
public static async Task<ProjectionsManager> GetProjectionsManagerAsync(this IEventStoreConnection connection, string projectionHost)
{
var addressParts = projectionHost.Split(':');
if (addressParts.Length < 2 || !int.TryParse(addressParts[1], out var port))
{
port = 2113;
}
var endpoints = await Dns.GetHostAddressesAsync(addressParts[0]);
var endpoint = new IPEndPoint(endpoints.First(x => x.AddressFamily == AddressFamily.InterNetwork), port);
var projectionsManager =
new ProjectionsManager(
connection.Settings.Log, endpoint,
connection.Settings.OperationTimeout);
return projectionsManager;
}
public static long? ParsePositionOrNull(string position)
{
return long.TryParse(position, out var parsedPosition) ? (long?)parsedPosition : null;
}
public static long ParsePosition(string position)
{
return long.TryParse(position, out var parsedPosition) ? parsedPosition : 0;
}
}
}

19
src/Squidex.Infrastructure.MongoDb/EventSourcing/MongoEvent.cs

@ -5,42 +5,33 @@
// All rights reserved. Licensed under the MIT license.
// ==========================================================================
using System;
using MongoDB.Bson.Serialization.Attributes;
using Squidex.Infrastructure.Reflection;
using Newtonsoft.Json.Linq;
namespace Squidex.Infrastructure.EventSourcing
{
public class MongoEvent
{
[BsonElement]
[BsonRequired]
public Guid EventId { get; set; }
[BsonElement]
[BsonRequired]
public string Payload { get; set; }
[BsonElement]
[BsonRequired]
public string Metadata { get; set; }
public JToken Metadata { get; set; }
[BsonElement]
[BsonRequired]
public string Type { get; set; }
public MongoEvent()
{
}
public MongoEvent(EventData data)
public static MongoEvent FromEventData(EventData data)
{
SimpleMapper.Map(data, this);
return new MongoEvent { Type = data.Type, Metadata = data.Metadata, Payload = data.ToString() };
}
public EventData ToEventData()
{
return SimpleMapper.Map(this, new EventData());
return new EventData { Type = Type, Metadata = Metadata, Payload = JObject.Parse(Payload) };
}
}
}

228
src/Squidex.Infrastructure.MongoDb/EventSourcing/MongoEventStore.cs

@ -5,10 +5,6 @@
// All rights reserved. Licensed under the MIT license.
// ==========================================================================
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Threading;
using System.Threading.Tasks;
using MongoDB.Bson;
using MongoDB.Driver;
@ -16,16 +12,19 @@ using Squidex.Infrastructure.MongoDb;
namespace Squidex.Infrastructure.EventSourcing
{
public class MongoEventStore : MongoRepositoryBase<MongoEventCommit>, IEventStore
public partial class MongoEventStore : MongoRepositoryBase<MongoEventCommit>, IEventStore
{
private const int MaxAttempts = 20;
private static readonly BsonTimestamp EmptyTimestamp = new BsonTimestamp(0);
private static readonly FieldDefinition<MongoEventCommit, BsonTimestamp> TimestampField = Fields.Build(x => x.Timestamp);
private static readonly FieldDefinition<MongoEventCommit, long> EventsCountField = Fields.Build(x => x.EventsCount);
private static readonly FieldDefinition<MongoEventCommit, long> EventStreamOffsetField = Fields.Build(x => x.EventStreamOffset);
private static readonly FieldDefinition<MongoEventCommit, string> EventStreamField = Fields.Build(x => x.EventStream);
private readonly IEventNotifier notifier;
public IMongoCollection<BsonDocument> RawCollection
{
get { return Database.GetCollection<BsonDocument>(CollectionName()); }
}
public MongoEventStore(IMongoDatabase database, IEventNotifier notifier)
: base(database)
{
@ -50,220 +49,5 @@ namespace Squidex.Infrastructure.EventSourcing
collection.Indexes.CreateOneAsync(Index.Ascending(x => x.Timestamp).Ascending(x => x.EventStream)),
collection.Indexes.CreateOneAsync(Index.Ascending(x => x.EventStream).Descending(x => x.EventStreamOffset), new CreateIndexOptions { Unique = true }));
}
public IEventSubscription CreateSubscription(IEventSubscriber subscriber, string streamFilter, string position = null)
{
Guard.NotNull(subscriber, nameof(subscriber));
Guard.NotNullOrEmpty(streamFilter, nameof(streamFilter));
return new PollingSubscription(this, notifier, subscriber, streamFilter, position);
}
public async Task<IReadOnlyList<StoredEvent>> GetEventsAsync(string streamName, long streamPosition = 0)
{
var commits =
await Collection.Find(
Filter.And(
Filter.Eq(EventStreamField, streamName),
Filter.Gte(EventStreamOffsetField, streamPosition - 1)))
.Sort(Sort.Ascending(TimestampField)).ToListAsync();
var result = new List<StoredEvent>();
foreach (var commit in commits)
{
var eventStreamOffset = (int)commit.EventStreamOffset;
var commitTimestamp = commit.Timestamp;
var commitOffset = 0;
foreach (var e in commit.Events)
{
eventStreamOffset++;
if (eventStreamOffset >= streamPosition)
{
var eventData = e.ToEventData();
var eventToken = new StreamPosition(commitTimestamp, commitOffset, commit.Events.Length);
result.Add(new StoredEvent(eventToken, eventStreamOffset, eventData));
}
}
}
return result;
}
public async Task GetEventsAsync(Func<StoredEvent, Task> callback, string streamFilter = null, string position = null, CancellationToken cancellationToken = default(CancellationToken))
{
Guard.NotNull(callback, nameof(callback));
StreamPosition lastPosition = position;
var filter = CreateFilter(streamFilter, lastPosition);
await Collection.Find(filter).Sort(Sort.Ascending(TimestampField)).ForEachAsync(async commit =>
{
var eventStreamOffset = (int)commit.EventStreamOffset;
var commitTimestamp = commit.Timestamp;
var commitOffset = 0;
foreach (var e in commit.Events)
{
eventStreamOffset++;
if (commitOffset > lastPosition.CommitOffset || commitTimestamp > lastPosition.Timestamp)
{
var eventData = e.ToEventData();
var eventToken = new StreamPosition(commitTimestamp, commitOffset, commit.Events.Length);
await callback(new StoredEvent(eventToken, eventStreamOffset, eventData));
commitOffset++;
}
}
}, cancellationToken);
}
public Task AppendEventsAsync(Guid commitId, string streamName, ICollection<EventData> events)
{
return AppendEventsInternalAsync(commitId, streamName, EtagVersion.Any, events);
}
public Task AppendEventsAsync(Guid commitId, string streamName, long expectedVersion, ICollection<EventData> events)
{
Guard.GreaterEquals(expectedVersion, EtagVersion.Any, nameof(expectedVersion));
return AppendEventsInternalAsync(commitId, streamName, expectedVersion, events);
}
private async Task AppendEventsInternalAsync(Guid commitId, string streamName, long expectedVersion, ICollection<EventData> events)
{
Guard.NotNullOrEmpty(streamName, nameof(streamName));
Guard.NotNull(events, nameof(events));
if (events.Count == 0)
{
return;
}
var currentVersion = await GetEventStreamOffset(streamName);
if (expectedVersion != EtagVersion.Any && expectedVersion != currentVersion)
{
throw new WrongEventVersionException(currentVersion, expectedVersion);
}
var commit = BuildCommit(commitId, streamName, expectedVersion >= -1 ? expectedVersion : currentVersion, events);
for (var attempt = 0; attempt < MaxAttempts; attempt++)
{
try
{
await Collection.InsertOneAsync(commit);
notifier.NotifyEventsStored(streamName);
return;
}
catch (MongoWriteException ex)
{
if (ex.WriteError?.Category == ServerErrorCategory.DuplicateKey)
{
currentVersion = await GetEventStreamOffset(streamName);
if (expectedVersion != EtagVersion.Any)
{
throw new WrongEventVersionException(currentVersion, expectedVersion);
}
else if (attempt < MaxAttempts)
{
expectedVersion = currentVersion;
}
else
{
throw new TimeoutException("Could not acquire a free slot for the commit within the provided time.");
}
}
else
{
throw;
}
}
}
}
private async Task<long> GetEventStreamOffset(string streamName)
{
var document =
await Collection.Find(Filter.Eq(EventStreamField, streamName))
.Project<BsonDocument>(Projection
.Include(EventStreamOffsetField)
.Include(EventsCountField))
.Sort(Sort.Descending(EventStreamOffsetField)).Limit(1)
.FirstOrDefaultAsync();
if (document != null)
{
return document[nameof(MongoEventCommit.EventStreamOffset)].ToInt64() + document[nameof(MongoEventCommit.EventsCount)].ToInt64();
}
return EtagVersion.Empty;
}
private static FilterDefinition<MongoEventCommit> CreateFilter(string streamFilter, StreamPosition streamPosition)
{
var filters = new List<FilterDefinition<MongoEventCommit>>();
if (streamPosition.IsEndOfCommit)
{
filters.Add(Filter.Gt(TimestampField, streamPosition.Timestamp));
}
else
{
filters.Add(Filter.Gte(TimestampField, streamPosition.Timestamp));
}
if (!string.IsNullOrWhiteSpace(streamFilter) && !string.Equals(streamFilter, ".*", StringComparison.OrdinalIgnoreCase))
{
if (streamFilter.Contains("^"))
{
filters.Add(Filter.Regex(EventStreamField, streamFilter));
}
else
{
filters.Add(Filter.Eq(EventStreamField, streamFilter));
}
}
return Filter.And(filters);
}
private static MongoEventCommit BuildCommit(Guid commitId, string streamName, long expectedVersion, ICollection<EventData> events)
{
var commitEvents = new MongoEvent[events.Count];
var i = 0;
foreach (var e in events)
{
var mongoEvent = new MongoEvent(e);
commitEvents[i++] = mongoEvent;
}
var mongoCommit = new MongoEventCommit
{
Id = commitId,
Events = commitEvents,
EventsCount = events.Count,
EventStream = streamName,
EventStreamOffset = expectedVersion,
Timestamp = EmptyTimestamp
};
return mongoCommit;
}
}
}

173
src/Squidex.Infrastructure.MongoDb/EventSourcing/MongoEventStore_Reader.cs

@ -0,0 +1,173 @@
// ==========================================================================
// Squidex Headless CMS
// ==========================================================================
// Copyright (c) Squidex UG (haftungsbeschränkt)
// All rights reserved. Licensed under the MIT license.
// ==========================================================================
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Threading;
using System.Threading.Tasks;
using MongoDB.Driver;
using Squidex.Infrastructure.MongoDb;
namespace Squidex.Infrastructure.EventSourcing
{
public partial class MongoEventStore : MongoRepositoryBase<MongoEventCommit>, IEventStore
{
public Task CreateIndexAsync(string property)
{
return Collection.Indexes.CreateOneAsync(Index.Ascending(CreateIndexPath(property)));
}
public IEventSubscription CreateSubscription(IEventSubscriber subscriber, string streamFilter, string position = null)
{
Guard.NotNull(subscriber, nameof(subscriber));
Guard.NotNullOrEmpty(streamFilter, nameof(streamFilter));
return new PollingSubscription(this, notifier, subscriber, streamFilter, position);
}
public async Task<IReadOnlyList<StoredEvent>> QueryAsync(string streamName, long streamPosition = 0)
{
var commits =
await Collection.Find(
Filter.And(
Filter.Eq(EventStreamField, streamName),
Filter.Gte(EventStreamOffsetField, streamPosition - 1)))
.Sort(Sort.Ascending(TimestampField)).ToListAsync();
var result = new List<StoredEvent>();
foreach (var commit in commits)
{
var eventStreamOffset = (int)commit.EventStreamOffset;
var commitTimestamp = commit.Timestamp;
var commitOffset = 0;
foreach (var e in commit.Events)
{
eventStreamOffset++;
if (eventStreamOffset >= streamPosition)
{
var eventData = e.ToEventData();
var eventToken = new StreamPosition(commitTimestamp, commitOffset, commit.Events.Length);
result.Add(new StoredEvent(eventToken, eventStreamOffset, eventData));
}
}
}
return result;
}
public Task QueryAsync(Func<StoredEvent, Task> callback, string property, object value, string position = null, CancellationToken ct = default(CancellationToken))
{
Guard.NotNull(callback, nameof(callback));
StreamPosition lastPosition = position;
var filter = CreateFilter(property, value, lastPosition);
return QueryAsync(callback, lastPosition, filter, ct);
}
public Task QueryAsync(Func<StoredEvent, Task> callback, string streamFilter = null, string position = null, CancellationToken ct = default(CancellationToken))
{
Guard.NotNull(callback, nameof(callback));
StreamPosition lastPosition = position;
var filter = CreateFilter(streamFilter, lastPosition);
return QueryAsync(callback, lastPosition, filter, ct);
}
private async Task QueryAsync(Func<StoredEvent, Task> callback, StreamPosition lastPosition, FilterDefinition<MongoEventCommit> filter, CancellationToken ct)
{
await Collection.Find(filter).Sort(Sort.Ascending(TimestampField)).ForEachAsync(async commit =>
{
var eventStreamOffset = (int)commit.EventStreamOffset;
var commitTimestamp = commit.Timestamp;
var commitOffset = 0;
foreach (var e in commit.Events)
{
eventStreamOffset++;
if (commitOffset > lastPosition.CommitOffset || commitTimestamp > lastPosition.Timestamp)
{
var eventData = e.ToEventData();
var eventToken = new StreamPosition(commitTimestamp, commitOffset, commit.Events.Length);
await callback(new StoredEvent(eventToken, eventStreamOffset, eventData));
commitOffset++;
}
}
}, ct);
}
private static FilterDefinition<MongoEventCommit> CreateFilter(string property, object value, StreamPosition streamPosition)
{
var filters = new List<FilterDefinition<MongoEventCommit>>();
AddPositionFilter(streamPosition, filters);
AddPropertyFitler(property, value, filters);
return Filter.And(filters);
}
private static FilterDefinition<MongoEventCommit> CreateFilter(string streamFilter, StreamPosition streamPosition)
{
var filters = new List<FilterDefinition<MongoEventCommit>>();
AddPositionFilter(streamPosition, filters);
AddStreamFilter(streamFilter, filters);
return Filter.And(filters);
}
private static void AddPropertyFitler(string property, object value, List<FilterDefinition<MongoEventCommit>> filters)
{
filters.Add(Filter.Eq(CreateIndexPath(property), value));
}
private static void AddStreamFilter(string streamFilter, List<FilterDefinition<MongoEventCommit>> filters)
{
if (!string.IsNullOrWhiteSpace(streamFilter) && !string.Equals(streamFilter, ".*", StringComparison.OrdinalIgnoreCase))
{
if (streamFilter.Contains("^"))
{
filters.Add(Filter.Regex(EventStreamField, streamFilter));
}
else
{
filters.Add(Filter.Eq(EventStreamField, streamFilter));
}
}
}
private static void AddPositionFilter(StreamPosition streamPosition, List<FilterDefinition<MongoEventCommit>> filters)
{
if (streamPosition.IsEndOfCommit)
{
filters.Add(Filter.Gt(TimestampField, streamPosition.Timestamp));
}
else
{
filters.Add(Filter.Gte(TimestampField, streamPosition.Timestamp));
}
}
private static string CreateIndexPath(string property)
{
return $"Events.Metadata.{property}";
}
}
}

129
src/Squidex.Infrastructure.MongoDb/EventSourcing/MongoEventStore_Writer.cs

@ -0,0 +1,129 @@
// ==========================================================================
// Squidex Headless CMS
// ==========================================================================
// Copyright (c) Squidex UG (haftungsbeschränkt)
// All rights reserved. Licensed under the MIT license.
// ==========================================================================
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Threading.Tasks;
using MongoDB.Bson;
using MongoDB.Driver;
namespace Squidex.Infrastructure.EventSourcing
{
public partial class MongoEventStore
{
private const int MaxWriteAttempts = 20;
private static readonly BsonTimestamp EmptyTimestamp = new BsonTimestamp(0);
public Task AppendAsync(Guid commitId, string streamName, ICollection<EventData> events)
{
return AppendAsync(commitId, streamName, EtagVersion.Any, events);
}
public async Task AppendAsync(Guid commitId, string streamName, long expectedVersion, ICollection<EventData> events)
{
Guard.GreaterEquals(expectedVersion, EtagVersion.Any, nameof(expectedVersion));
Guard.NotNullOrEmpty(streamName, nameof(streamName));
Guard.NotNull(events, nameof(events));
if (events.Count == 0)
{
return;
}
var currentVersion = await GetEventStreamOffset(streamName);
if (expectedVersion != EtagVersion.Any && expectedVersion != currentVersion)
{
throw new WrongEventVersionException(currentVersion, expectedVersion);
}
var commit = BuildCommit(commitId, streamName, expectedVersion >= -1 ? expectedVersion : currentVersion, events);
for (var attempt = 0; attempt < MaxWriteAttempts; attempt++)
{
try
{
await Collection.InsertOneAsync(commit);
notifier.NotifyEventsStored(streamName);
return;
}
catch (MongoWriteException ex)
{
if (ex.WriteError?.Category == ServerErrorCategory.DuplicateKey)
{
currentVersion = await GetEventStreamOffset(streamName);
if (expectedVersion != EtagVersion.Any)
{
throw new WrongEventVersionException(currentVersion, expectedVersion);
}
if (attempt < MaxWriteAttempts)
{
expectedVersion = currentVersion;
}
else
{
throw new TimeoutException("Could not acquire a free slot for the commit within the provided time.");
}
}
else
{
throw;
}
}
}
}
private async Task<long> GetEventStreamOffset(string streamName)
{
var document =
await Collection.Find(Filter.Eq(EventStreamField, streamName))
.Project<BsonDocument>(Projection
.Include(EventStreamOffsetField)
.Include(EventsCountField))
.Sort(Sort.Descending(EventStreamOffsetField)).Limit(1)
.FirstOrDefaultAsync();
if (document != null)
{
return document[nameof(MongoEventCommit.EventStreamOffset)].ToInt64() + document[nameof(MongoEventCommit.EventsCount)].ToInt64();
}
return EtagVersion.Empty;
}
private static MongoEventCommit BuildCommit(Guid commitId, string streamName, long expectedVersion, ICollection<EventData> events)
{
var commitEvents = new MongoEvent[events.Count];
var i = 0;
foreach (var e in events)
{
var mongoEvent = MongoEvent.FromEventData(e);
commitEvents[i++] = mongoEvent;
}
var mongoCommit = new MongoEventCommit
{
Id = commitId,
Events = commitEvents,
EventsCount = events.Count,
EventStream = streamName,
EventStreamOffset = expectedVersion,
Timestamp = EmptyTimestamp
};
return mongoCommit;
}
}
}

13
src/Squidex.Infrastructure.MongoDb/MongoDb/BsonJsonConvention.cs

@ -11,6 +11,7 @@ using System.Reflection;
using MongoDB.Bson.Serialization;
using MongoDB.Bson.Serialization.Conventions;
using Newtonsoft.Json;
using Newtonsoft.Json.Linq;
namespace Squidex.Infrastructure.MongoDb
{
@ -31,6 +32,18 @@ namespace Squidex.Infrastructure.MongoDb
memberMap.SetSerializer((IBsonSerializer)bsonSerializer);
}
else if (memberMap.MemberType == typeof(JToken))
{
memberMap.SetSerializer(JTokenSerializer<JToken>.Instance);
}
else if (memberMap.MemberType == typeof(JObject))
{
memberMap.SetSerializer(JTokenSerializer<JObject>.Instance);
}
else if (memberMap.MemberType == typeof(JValue))
{
memberMap.SetSerializer(JTokenSerializer<JValue>.Instance);
}
});
ConventionRegistry.Register("json", pack, t => true);

32
src/Squidex.Infrastructure.MongoDb/MongoDb/JTokenSerializer.cs

@ -0,0 +1,32 @@
// ==========================================================================
// Squidex Headless CMS
// ==========================================================================
// Copyright (c) Squidex UG (haftungsbeschraenkt)
// All rights reserved. Licensed under the MIT license.
// ==========================================================================
using MongoDB.Bson.Serialization;
using MongoDB.Bson.Serialization.Serializers;
using Newtonsoft.Json.Linq;
namespace Squidex.Infrastructure.MongoDb
{
public sealed class JTokenSerializer<T> : ClassSerializerBase<T> where T : JToken
{
public static readonly JTokenSerializer<T> Instance = new JTokenSerializer<T>();
protected override T DeserializeValue(BsonDeserializationContext context, BsonDeserializationArgs args)
{
var jsonReader = new BsonJsonReader(context.Reader);
return (T)JToken.ReadFrom(jsonReader);
}
protected override void SerializeValue(BsonSerializationContext context, BsonSerializationArgs args, T value)
{
var jsonWriter = new BsonJsonWriter(context.Writer);
value.WriteTo(jsonWriter);
}
}
}

8
src/Squidex.Infrastructure/Commands/DomainObjectBase.cs

@ -44,17 +44,15 @@ namespace Squidex.Infrastructure.Commands
RaiseEvent(Envelope.Create(@event));
}
public void RaiseEvent<TEvent>(Envelope<TEvent> @event) where TEvent : class, IEvent
public virtual void RaiseEvent(Envelope<IEvent> @event)
{
Guard.NotNull(@event, nameof(@event));
@event.SetAggregateId(id);
ApplyEvent(@event.To<IEvent>());
ApplyEvent(@event);
snapshot.Version++;
uncomittedEvents.Add(@event.To<IEvent>());
uncomittedEvents.Add(@event);
}
public IReadOnlyList<Envelope<IEvent>> GetUncomittedEvents()

39
src/Squidex.Infrastructure/EventSourcing/JsonEventDataFormatter.cs → src/Squidex.Infrastructure/EventSourcing/DefaultEventDataFormatter.cs

@ -7,38 +7,37 @@
using System;
using Newtonsoft.Json;
using Newtonsoft.Json.Linq;
namespace Squidex.Infrastructure.EventSourcing
{
public class JsonEventDataFormatter : IEventDataFormatter
public class DefaultEventDataFormatter : IEventDataFormatter
{
private readonly JsonSerializerSettings serializerSettings;
private readonly JsonSerializer serializer;
private readonly TypeNameRegistry typeNameRegistry;
public JsonEventDataFormatter(TypeNameRegistry typeNameRegistry, JsonSerializerSettings serializerSettings = null)
public DefaultEventDataFormatter(TypeNameRegistry typeNameRegistry, JsonSerializer serializer = null)
{
Guard.NotNull(typeNameRegistry, nameof(typeNameRegistry));
this.typeNameRegistry = typeNameRegistry;
this.serializerSettings = serializerSettings ?? new JsonSerializerSettings();
this.serializer = serializer ?? JsonSerializer.CreateDefault();
}
public Envelope<IEvent> Parse(EventData eventData, bool migrate = true)
{
var headers = ReadJson<EnvelopeHeaders>(eventData.Metadata);
var eventType = typeNameRegistry.GetType(eventData.Type);
var eventPayload = ReadJson<IEvent>(eventData.Payload, eventType);
if (migrate && eventPayload is IMigratedEvent migratedEvent)
var headers = eventData.Metadata.ToObject<EnvelopeHeaders>();
var content = eventData.Payload.ToObject(eventType, serializer) as IEvent;
if (migrate && content is IMigratedEvent migratedEvent)
{
eventPayload = migratedEvent.Migrate();
content = migratedEvent.Migrate();
}
var envelope = new Envelope<IEvent>(eventPayload, headers);
envelope.SetEventId(eventData.EventId);
var envelope = new Envelope<IEvent>(content, headers);
return envelope;
}
@ -56,20 +55,10 @@ namespace Squidex.Infrastructure.EventSourcing
envelope.SetCommitId(commitId);
var headers = WriteJson(envelope.Headers);
var content = WriteJson(envelope.Payload);
var headers = JToken.FromObject(envelope.Headers, serializer);
var content = JToken.FromObject(envelope.Payload, serializer);
return new EventData { EventId = envelope.Headers.EventId(), Type = eventType, Payload = content, Metadata = headers };
}
private T ReadJson<T>(string data, Type type = null)
{
return (T)JsonConvert.DeserializeObject(data, type ?? typeof(T), serializerSettings);
}
private string WriteJson(object value)
{
return JsonConvert.SerializeObject(value, serializerSettings);
return new EventData { Type = eventType, Payload = content, Metadata = headers };
}
}
}

8
src/Squidex.Infrastructure/EventSourcing/EventData.cs

@ -5,17 +5,15 @@
// All rights reserved. Licensed under the MIT license.
// ==========================================================================
using System;
using Newtonsoft.Json.Linq;
namespace Squidex.Infrastructure.EventSourcing
{
public class EventData
{
public Guid EventId { get; set; }
public JToken Payload { get; set; }
public string Payload { get; set; }
public string Metadata { get; set; }
public JToken Metadata { get; set; }
public string Type { get; set; }
}

12
src/Squidex.Infrastructure/EventSourcing/IEventStore.cs

@ -14,13 +14,17 @@ namespace Squidex.Infrastructure.EventSourcing
{
public interface IEventStore
{
Task<IReadOnlyList<StoredEvent>> GetEventsAsync(string streamName, long streamPosition = 0);
Task CreateIndexAsync(string property);
Task GetEventsAsync(Func<StoredEvent, Task> callback, string streamFilter = null, string position = null, CancellationToken cancellationToken = default(CancellationToken));
Task<IReadOnlyList<StoredEvent>> QueryAsync(string streamName, long streamPosition = 0);
Task AppendEventsAsync(Guid commitId, string streamName, ICollection<EventData> events);
Task QueryAsync(Func<StoredEvent, Task> callback, string streamFilter = null, string position = null, CancellationToken ct = default(CancellationToken));
Task AppendEventsAsync(Guid commitId, string streamName, long expectedVersion, ICollection<EventData> events);
Task QueryAsync(Func<StoredEvent, Task> callback, string property, object value, string position = null, CancellationToken ct = default(CancellationToken));
Task AppendAsync(Guid commitId, string streamName, ICollection<EventData> events);
Task AppendAsync(Guid commitId, string streamName, long expectedVersion, ICollection<EventData> events);
IEventSubscription CreateSubscription(IEventSubscriber subscriber, string streamFilter, string position = null);
}

2
src/Squidex.Infrastructure/EventSourcing/PollingSubscription.cs

@ -46,7 +46,7 @@ namespace Squidex.Infrastructure.EventSourcing
{
try
{
await eventStore.GetEventsAsync(async storedEvent =>
await eventStore.QueryAsync(async storedEvent =>
{
await eventSubscriber.OnEventAsync(this, storedEvent);

26
src/Squidex.Infrastructure/EventSourcing/StoredEvent.cs

@ -9,33 +9,21 @@ namespace Squidex.Infrastructure.EventSourcing
{
public sealed class StoredEvent
{
private readonly string eventPosition;
private readonly long eventStreamNumber;
private readonly EventData data;
public string EventPosition { get; }
public string EventPosition
{
get { return eventPosition; }
}
public long EventStreamNumber
{
get { return eventStreamNumber; }
}
public long EventStreamNumber { get; }
public EventData Data
{
get { return data; }
}
public EventData Data { get; }
public StoredEvent(string eventPosition, long eventStreamNumber, EventData data)
{
Guard.NotNullOrEmpty(eventPosition, nameof(eventPosition));
Guard.NotNull(data, nameof(data));
this.data = data;
this.eventPosition = eventPosition;
this.eventStreamNumber = eventStreamNumber;
Data = data;
EventPosition = eventPosition;
EventStreamNumber = eventStreamNumber;
}
}
}

14
src/Squidex.Infrastructure/Json/NamedGuidIdConverter.cs

@ -6,7 +6,6 @@
// ==========================================================================
using System;
using System.Linq;
using Newtonsoft.Json;
namespace Squidex.Infrastructure.Json
@ -25,19 +24,14 @@ namespace Squidex.Infrastructure.Json
throw new JsonException($"Expected String, but got {reader.TokenType}.");
}
var parts = reader.Value.ToString().Split(new[] { ',' }, StringSplitOptions.RemoveEmptyEntries);
if (parts.Length < 2)
try
{
throw new JsonException("Named id must have more than 2 parts divided by commata.");
return NamedId<Guid>.Parse(reader.Value.ToString(), Guid.TryParse);
}
if (!Guid.TryParse(parts[0], out var id))
catch (ArgumentException ex)
{
throw new JsonException("Named id must be a valid guid.");
throw new JsonException(ex.Message);
}
return new NamedId<Guid>(id, string.Join(",", parts.Skip(1)));
}
}
}

14
src/Squidex.Infrastructure/Json/NamedLongIdConverter.cs

@ -6,7 +6,6 @@
// ==========================================================================
using System;
using System.Linq;
using Newtonsoft.Json;
namespace Squidex.Infrastructure.Json
@ -25,19 +24,14 @@ namespace Squidex.Infrastructure.Json
throw new JsonException($"Expected String, but got {reader.TokenType}.");
}
var parts = reader.Value.ToString().Split(new[] { ',' }, StringSplitOptions.RemoveEmptyEntries);
if (parts.Length < 2)
try
{
throw new JsonException("Named id must have more than 2 parts divided by commata.");
return NamedId<long>.Parse(reader.Value.ToString(), long.TryParse);
}
if (!long.TryParse(parts[0], out var id))
catch (ArgumentException ex)
{
throw new JsonException("Named id must be a valid long.");
throw new JsonException(ex.Message);
}
return new NamedId<long>(id, string.Join(",", parts.Skip(1)));
}
}
}

7
src/Squidex.Infrastructure/Migrations/IMigration.cs

@ -5,17 +5,12 @@
// All rights reserved. Licensed under the MIT license.
// ==========================================================================
using System.Collections.Generic;
using System.Threading.Tasks;
namespace Squidex.Infrastructure.Migrations
{
public interface IMigration
{
int FromVersion { get; }
int ToVersion { get; }
Task UpdateAsync(IEnumerable<IMigration> previousMigrations);
Task UpdateAsync();
}
}

16
src/Squidex.Infrastructure/Migrations/IMigrationPath.cs

@ -0,0 +1,16 @@
// ==========================================================================
// Squidex Headless CMS
// ==========================================================================
// Copyright (c) Squidex UG (haftungsbeschraenkt)
// All rights reserved. Licensed under the MIT license.
// ==========================================================================
using System.Collections.Generic;
namespace Squidex.Infrastructure.Migrations
{
public interface IMigrationPath
{
(int Version, IEnumerable<IMigration> Migrations) GetNext(int version);
}
}

60
src/Squidex.Infrastructure/Migrations/Migrator.cs

@ -5,8 +5,6 @@
// All rights reserved. Licensed under the MIT license.
// ==========================================================================
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using Squidex.Infrastructure.Log;
@ -15,20 +13,20 @@ namespace Squidex.Infrastructure.Migrations
{
public sealed class Migrator
{
private readonly IMigrationStatus migrationStatus;
private readonly IEnumerable<IMigration> migrations;
private readonly ISemanticLog log;
private readonly IMigrationStatus migrationStatus;
private readonly IMigrationPath migrationPath;
public int LockWaitMs { get; set; } = 5000;
public int LockWaitMs { get; set; } = 500;
public Migrator(IMigrationStatus migrationStatus, IEnumerable<IMigration> migrations, ISemanticLog log)
public Migrator(IMigrationStatus migrationStatus, IMigrationPath migrationPath, ISemanticLog log)
{
Guard.NotNull(migrationStatus, nameof(migrationStatus));
Guard.NotNull(migrations, nameof(migrations));
Guard.NotNull(migrationPath, nameof(migrationPath));
Guard.NotNull(log, nameof(log));
this.migrationStatus = migrationStatus;
this.migrations = migrations.OrderByDescending(x => x.ToVersion).ToList();
this.migrationPath = migrationPath;
this.log = log;
}
@ -39,8 +37,6 @@ namespace Squidex.Infrastructure.Migrations
try
{
var lastMigrator = migrations.FirstOrDefault();
while (!await migrationStatus.TryLockAsync())
{
log.LogInformation(w => w
@ -52,13 +48,16 @@ namespace Squidex.Infrastructure.Migrations
version = await migrationStatus.GetVersionAsync();
if (lastMigrator != null && lastMigrator.ToVersion != version)
while (true)
{
var migrationPath = FindMigratorPath(version, lastMigrator.ToVersion).ToList();
var migrationStep = migrationPath.GetNext(version);
var previousMigrations = new List<IMigration>();
if (migrationStep.Migrations == null || !migrationStep.Migrations.Any())
{
break;
}
foreach (var migration in migrationPath)
foreach (var migration in migrationStep.Migrations)
{
var name = migration.GetType().ToString();
@ -72,13 +71,11 @@ namespace Squidex.Infrastructure.Migrations
.WriteProperty("status", "Completed")
.WriteProperty("migrator", name)))
{
await migration.UpdateAsync(previousMigrations.ToList());
version = migration.ToVersion;
await migration.UpdateAsync();
}
previousMigrations.Add(migration);
}
version = migrationStep.Version;
}
}
finally
@ -86,30 +83,5 @@ namespace Squidex.Infrastructure.Migrations
await migrationStatus.UnlockAsync(version);
}
}
private IEnumerable<IMigration> FindMigratorPath(int fromVersion, int toVersion)
{
var addedMigrators = new HashSet<IMigration>();
while (true)
{
var bestMigrator = migrations.Where(x => x.FromVersion < x.ToVersion).FirstOrDefault(x => x.FromVersion == fromVersion);
if (bestMigrator != null && addedMigrators.Add(bestMigrator))
{
fromVersion = bestMigrator.ToVersion;
yield return bestMigrator;
}
else if (fromVersion != toVersion)
{
throw new InvalidOperationException($"There is no migration path from {fromVersion} to {toVersion}.");
}
else
{
break;
}
}
}
}
}

22
src/Squidex.Infrastructure/NamedId.cs

@ -6,9 +6,12 @@
// ==========================================================================
using System;
using System.Linq;
namespace Squidex.Infrastructure
{
public delegate bool Parser<T>(string input, out T result);
public sealed class NamedId<T> : IEquatable<NamedId<T>>
{
public T Id { get; }
@ -44,5 +47,24 @@ namespace Squidex.Infrastructure
{
return (Id.GetHashCode() * 397) ^ Name.GetHashCode();
}
public static NamedId<T> Parse(string value, Parser<T> parser)
{
Guard.NotNull(value, nameof(value));
var parts = value.Split(new[] { ',' }, StringSplitOptions.RemoveEmptyEntries);
if (parts.Length < 2)
{
throw new ArgumentException("Named id must have more than 2 parts divided by commata.");
}
if (!parser(parts[0], out var id))
{
throw new ArgumentException("Named id must be a valid guid.");
}
return new NamedId<T>(id, string.Join(",", parts.Skip(1)));
}
}
}

3
src/Squidex.Infrastructure/SquidexInfrastructure.cs

@ -5,9 +5,12 @@
// All rights reserved. Licensed under the MIT license.
// ==========================================================================
using System.Reflection;
namespace Squidex.Infrastructure
{
public static class SquidexInfrastructure
{
public static readonly Assembly Assembly = typeof(SquidexInfrastructure).Assembly;
}
}

4
src/Squidex.Infrastructure/States/Persistence{TOwner,TSnapshot,TKey}.cs

@ -101,7 +101,7 @@ namespace Squidex.Infrastructure.States
{
if (UseEventSourcing())
{
var events = await eventStore.GetEventsAsync(GetStreamName(), versionEvents + 1);
var events = await eventStore.QueryAsync(GetStreamName(), versionEvents + 1);
foreach (var @event in events)
{
@ -160,7 +160,7 @@ namespace Squidex.Infrastructure.States
try
{
await eventStore.AppendEventsAsync(commitId, GetStreamName(), expectedVersion, eventData);
await eventStore.AppendAsync(commitId, GetStreamName(), expectedVersion, eventData);
}
catch (WrongEventVersionException ex)
{

196
src/Squidex/Areas/Api/Controllers/Content/ContentsController.cs

@ -44,12 +44,24 @@ namespace Squidex.Areas.Api.Controllers.Contents
this.graphQl = graphQl;
}
/// <summary>
/// GraphQL endpoint.
/// </summary>
/// <param name="app">The name of the app.</param>
/// <param name="query">The graphql endpoint.</param>
/// <returns>
/// 200 => Contents retrieved or mutated.
/// 404 => Schema or app not found.
/// </returns>
/// <remarks>
/// You can read the generated documentation for your app at /api/content/{appName}/docs
/// </remarks>
[MustBeAppReader]
[HttpGet]
[HttpPost]
[Route("content/{app}/graphql/")]
[ApiCosts(2)]
public async Task<IActionResult> PostGraphQL([FromBody] GraphQLQuery query)
public async Task<IActionResult> PostGraphQL(string app, [FromBody] GraphQLQuery query)
{
var result = await graphQl.QueryAsync(App, User, query);
@ -63,11 +75,25 @@ namespace Squidex.Areas.Api.Controllers.Contents
}
}
/// <summary>
/// Queries contents.
/// </summary>
/// <param name="app">The name of the app.</param>
/// <param name="name">The name of the schema.</param>
/// <param name="ids">The optional ids of the content to fetch.</param>
/// <param name="archived">Indicates whether to query content items from the archive.</param>
/// <returns>
/// 200 => Contents retrieved.
/// 404 => Schema or app not found.
/// </returns>
/// <remarks>
/// You can read the generated documentation for your app at /api/content/{appName}/docs
/// </remarks>
[MustBeAppReader]
[HttpGet]
[Route("content/{app}/{name}/")]
[ApiCosts(2)]
public async Task<IActionResult> GetContents(string name, [FromQuery] bool archived = false, [FromQuery] string ids = null)
public async Task<IActionResult> GetContents(string app, string name, [FromQuery] bool archived = false, [FromQuery] string ids = null)
{
HashSet<Guid> idsList = null;
@ -112,11 +138,24 @@ namespace Squidex.Areas.Api.Controllers.Contents
return Ok(response);
}
/// <summary>
/// Get a content item.
/// </summary>
/// <param name="app">The name of the app.</param>
/// <param name="name">The name of the schema.</param>
/// <param name="id">The id of the content to fetch.</param>
/// <returns>
/// 200 => Content found.
/// 404 => Content, schema or app not found.
/// </returns>
/// <remarks>
/// You can read the generated documentation for your app at /api/content/{appName}/docs
/// </remarks>
[MustBeAppReader]
[HttpGet]
[Route("content/{app}/{name}/{id}/")]
[ApiCosts(1)]
public async Task<IActionResult> GetContent(string name, Guid id)
public async Task<IActionResult> GetContent(string app, string name, Guid id)
{
var (schema, entity) = await contentQuery.FindContentAsync(App, name, User, id);
@ -135,11 +174,26 @@ namespace Squidex.Areas.Api.Controllers.Contents
return Ok(response);
}
/// <summary>
/// Get a content item with a specific version.
/// </summary>
/// <param name="app">The name of the app.</param>
/// <param name="name">The name of the schema.</param>
/// <param name="id">The id of the content to fetch.</param>
/// <param name="version">The version fo the content to fetch.</param>
/// <returns>
/// 200 => Content found.
/// 404 => Content, schema or app not found.
/// 400 => Content data is not valid.
/// </returns>
/// <remarks>
/// You can read the generated documentation for your app at /api/content/{appName}/docs
/// </remarks>
[MustBeAppReader]
[HttpGet]
[Route("content/{app}/{name}/{id}/{version}/")]
[ApiCosts(1)]
public async Task<IActionResult> GetContentVersion(string name, Guid id, int version)
public async Task<IActionResult> GetContentVersion(string app, string name, Guid id, int version)
{
var content = await contentQuery.FindContentAsync(App, name, User, id, version);
@ -157,11 +211,26 @@ namespace Squidex.Areas.Api.Controllers.Contents
return Ok(response.Data);
}
/// <summary>
/// Create a content item.
/// </summary>
/// <param name="app">The name of the app.</param>
/// <param name="name">The name of the schema.</param>
/// <param name="request">The full data for the content item.</param>
/// <param name="publish">Indicates whether the content should be published immediately.</param>
/// <returns>
/// 201 => Content created.
/// 404 => Content, schema or app not found.
/// 400 => Content data is not valid.
/// </returns>
/// <remarks>
/// You can read the generated documentation for your app at /api/content/{appName}/docs
/// </remarks>
[MustBeAppEditor]
[HttpPost]
[Route("content/{app}/{name}/")]
[ApiCosts(1)]
public async Task<IActionResult> PostContent(string name, [FromBody] NamedContentData request, [FromQuery] bool publish = false)
public async Task<IActionResult> PostContent(string app, string name, [FromBody] NamedContentData request, [FromQuery] bool publish = false)
{
await contentQuery.FindSchemaAsync(App, name);
@ -175,11 +244,26 @@ namespace Squidex.Areas.Api.Controllers.Contents
return CreatedAtAction(nameof(GetContent), new { id = command.ContentId }, response);
}
/// <summary>
/// Update a content item.
/// </summary>
/// <param name="app">The name of the app.</param>
/// <param name="name">The name of the schema.</param>
/// <param name="id">The id of the content item to update.</param>
/// <param name="request">The full data for the content item.</param>
/// <returns>
/// 200 => Content updated.
/// 404 => Content, schema or app not found.
/// 400 => Content data is not valid.
/// </returns>
/// <remarks>
/// You can read the generated documentation for your app at /api/content/{appName}/docs
/// </remarks>
[MustBeAppEditor]
[HttpPut]
[Route("content/{app}/{name}/{id}/")]
[ApiCosts(1)]
public async Task<IActionResult> PutContent(string name, Guid id, [FromBody] NamedContentData request)
public async Task<IActionResult> PutContent(string app, string name, Guid id, [FromBody] NamedContentData request)
{
await contentQuery.FindSchemaAsync(App, name);
@ -193,11 +277,26 @@ namespace Squidex.Areas.Api.Controllers.Contents
return Ok(response);
}
/// <summary>
/// Patchs a content item.
/// </summary>
/// <param name="app">The name of the app.</param>
/// <param name="name">The name of the schema.</param>
/// <param name="id">The id of the content item to patch.</param>
/// <param name="request">The patch for the content item.</param>
/// <returns>
/// 200 => Content patched.
/// 404 => Content, schema or app not found.
/// 400 => Content patch is not valid.
/// </returns>
/// <remarks>
/// You can read the generated documentation for your app at /api/content/{appName}/docs
/// </remarks>
[MustBeAppEditor]
[HttpPatch]
[Route("content/{app}/{name}/{id}/")]
[ApiCosts(1)]
public async Task<IActionResult> PatchContent(string name, Guid id, [FromBody] NamedContentData request)
public async Task<IActionResult> PatchContent(string app, string name, Guid id, [FromBody] NamedContentData request)
{
await contentQuery.FindSchemaAsync(App, name);
@ -211,11 +310,26 @@ namespace Squidex.Areas.Api.Controllers.Contents
return Ok(response);
}
/// <summary>
/// Publish a content item.
/// </summary>
/// <param name="app">The name of the app.</param>
/// <param name="name">The name of the schema.</param>
/// <param name="id">The id of the content item to publish.</param>
/// <param name="dueTime">The date and time when the content should be published.</param>
/// <returns>
/// 204 => Content published.
/// 404 => Content, schema or app not found.
/// 400 => Content was already published.
/// </returns>
/// <remarks>
/// You can read the generated documentation for your app at /api/content/{appName}/docs
/// </remarks>
[MustBeAppEditor]
[HttpPut]
[Route("content/{app}/{name}/{id}/publish/")]
[ApiCosts(1)]
public async Task<IActionResult> PublishContent(string name, Guid id, string dueTime = null)
public async Task<IActionResult> PublishContent(string app, string name, Guid id, string dueTime = null)
{
await contentQuery.FindSchemaAsync(App, name);
@ -226,11 +340,26 @@ namespace Squidex.Areas.Api.Controllers.Contents
return NoContent();
}
/// <summary>
/// Unpublish a content item.
/// </summary>
/// <param name="app">The name of the app.</param>
/// <param name="name">The name of the schema.</param>
/// <param name="id">The id of the content item to unpublish.</param>
/// <param name="dueTime">The date and time when the content should be unpublished.</param>
/// <returns>
/// 204 => Content unpublished.
/// 404 => Content, schema or app not found.
/// 400 => Content was not published.
/// </returns>
/// <remarks>
/// You can read the generated documentation for your app at /api/content/{appName}/docs
/// </remarks>
[MustBeAppEditor]
[HttpPut]
[Route("content/{app}/{name}/{id}/unpublish/")]
[ApiCosts(1)]
public async Task<IActionResult> UnpublishContent(string name, Guid id, string dueTime = null)
public async Task<IActionResult> UnpublishContent(string app, string name, Guid id, string dueTime = null)
{
await contentQuery.FindSchemaAsync(App, name);
@ -241,11 +370,26 @@ namespace Squidex.Areas.Api.Controllers.Contents
return NoContent();
}
/// <summary>
/// Archive a content item.
/// </summary>
/// <param name="app">The name of the app.</param>
/// <param name="name">The name of the schema.</param>
/// <param name="id">The id of the content item to archive.</param>
/// <param name="dueTime">The date and time when the content should be archived.</param>
/// <returns>
/// 204 => Content archived.
/// 404 => Content, schema or app not found.
/// 400 => Content was already archived.
/// </returns>
/// <remarks>
/// You can read the generated documentation for your app at /api/content/{appName}/docs
/// </remarks>
[MustBeAppEditor]
[HttpPut]
[Route("content/{app}/{name}/{id}/archive/")]
[ApiCosts(1)]
public async Task<IActionResult> ArchiveContent(string name, Guid id, string dueTime = null)
public async Task<IActionResult> ArchiveContent(string app, string name, Guid id, string dueTime = null)
{
await contentQuery.FindSchemaAsync(App, name);
@ -256,11 +400,26 @@ namespace Squidex.Areas.Api.Controllers.Contents
return NoContent();
}
/// <summary>
/// Restore a content item.
/// </summary>
/// <param name="app">The name of the app.</param>
/// <param name="name">The name of the schema.</param>
/// <param name="id">The id of the content item to restore.</param>
/// <param name="dueTime">The date and time when the content should be restored.</param>
/// <returns>
/// 204 => Content restored.
/// 404 => Content, schema or app not found.
/// 400 => Content was not archived.
/// </returns>
/// <remarks>
/// You can read the generated documentation for your app at /api/content/{appName}/docs
/// </remarks>
[MustBeAppEditor]
[HttpPut]
[Route("content/{app}/{name}/{id}/restore/")]
[ApiCosts(1)]
public async Task<IActionResult> RestoreContent(string name, Guid id, string dueTime = null)
public async Task<IActionResult> RestoreContent(string app, string name, Guid id, string dueTime = null)
{
await contentQuery.FindSchemaAsync(App, name);
@ -271,11 +430,24 @@ namespace Squidex.Areas.Api.Controllers.Contents
return NoContent();
}
/// <summary>
/// Delete a content item.
/// </summary>
/// <param name="app">The name of the app.</param>
/// <param name="name">The name of the schema.</param>
/// <param name="id">The id of the content item to delete.</param>
/// <returns>
/// 204 => Content has been deleted.
/// 404 => Content, schema or app not found.
/// </returns>
/// <remarks>
/// You can create an generated documentation for your app at /api/content/{appName}/docs
/// </remarks>
[MustBeAppEditor]
[HttpDelete]
[Route("content/{app}/{name}/{id}/")]
[ApiCosts(1)]
public async Task<IActionResult> DeleteContent(string name, Guid id)
public async Task<IActionResult> DeleteContent(string app, string name, Guid id)
{
await contentQuery.FindSchemaAsync(App, name);

14
src/Squidex/Areas/Api/Controllers/Content/Generator/SchemaSwaggerGenerator.cs

@ -148,7 +148,7 @@ namespace Squidex.Areas.Api.Controllers.Contents.Generator
operation.AddBodyParameter("data", dataSchema, SchemaBodyDescription);
operation.AddQueryParameter("publish", JsonObjectType.Boolean, "Set to true to autopublish content.");
operation.AddResponse("201", $"{schemaName} created.", contentSchema);
operation.AddResponse("201", $"{schemaName} content created.", contentSchema);
});
}
@ -162,7 +162,7 @@ namespace Squidex.Areas.Api.Controllers.Contents.Generator
operation.AddBodyParameter("data", dataSchema, SchemaBodyDescription);
operation.AddResponse("201", $"{schemaName} item updated.", dataSchema);
operation.AddResponse("200", $"{schemaName} content updated.", dataSchema);
});
}
@ -176,7 +176,7 @@ namespace Squidex.Areas.Api.Controllers.Contents.Generator
operation.AddBodyParameter("data", dataSchema, SchemaBodyDescription);
operation.AddResponse("201", $"{schemaName} item patched.", dataSchema);
operation.AddResponse("200", $"{schemaName} content patched.", dataSchema);
});
}
@ -188,7 +188,7 @@ namespace Squidex.Areas.Api.Controllers.Contents.Generator
operation.Summary = $"Publish a {schemaName} content.";
operation.Security = EditorSecurity;
operation.AddResponse("204", $"{schemaName} item published.");
operation.AddResponse("204", $"{schemaName} content published.");
});
}
@ -200,7 +200,7 @@ namespace Squidex.Areas.Api.Controllers.Contents.Generator
operation.Summary = $"Unpublish a {schemaName} content.";
operation.Security = EditorSecurity;
operation.AddResponse("204", $"{schemaName} item unpublished.");
operation.AddResponse("204", $"{schemaName} content unpublished.");
});
}
@ -212,7 +212,7 @@ namespace Squidex.Areas.Api.Controllers.Contents.Generator
operation.Summary = $"Archive a {schemaName} content.";
operation.Security = EditorSecurity;
operation.AddResponse("204", $"{schemaName} item restored.");
operation.AddResponse("204", $"{schemaName} content restored.");
});
}
@ -224,7 +224,7 @@ namespace Squidex.Areas.Api.Controllers.Contents.Generator
operation.Summary = $"Restore a {schemaName} content.";
operation.Security = EditorSecurity;
operation.AddResponse("204", $"{schemaName} item restored.");
operation.AddResponse("204", $"{schemaName} content restored.");
});
}

2
src/Squidex/Config/Domain/InfrastructureServices.cs

@ -89,7 +89,7 @@ namespace Squidex.Config.Domain
services.AddSingletonAs<ImageSharpAssetThumbnailGenerator>()
.As<IAssetThumbnailGenerator>();
services.AddSingletonAs<JsonEventDataFormatter>()
services.AddSingletonAs<DefaultEventDataFormatter>()
.As<IEventDataFormatter>();
services.AddSingletonAs<Migrator>()

8
src/Squidex/Config/Domain/SerializationServices.cs

@ -28,10 +28,10 @@ namespace Squidex.Config.Domain
{
private static readonly TypeNameRegistry TypeNameRegistry =
new TypeNameRegistry()
.MapUnmapped(typeof(SquidexCoreModel).Assembly)
.MapUnmapped(typeof(SquidexEvents).Assembly)
.MapUnmapped(typeof(SquidexInfrastructure).Assembly)
.MapUnmapped(typeof(SquidexMigrations).Assembly);
.MapUnmapped(SquidexCoreModel.Assembly)
.MapUnmapped(SquidexEvents.Assembly)
.MapUnmapped(SquidexInfrastructure.Assembly)
.MapUnmapped(SquidexMigrations.Assembly);
private static readonly FieldRegistry FieldRegistry = new FieldRegistry(TypeNameRegistry);

14
src/Squidex/Config/Domain/WriteServices.cs

@ -9,6 +9,7 @@ using System;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Options;
using Migrate_01;
using Migrate_01.Migrations;
using Squidex.Domain.Apps.Core.Apps;
using Squidex.Domain.Apps.Core.Scripting;
using Squidex.Domain.Apps.Entities.Apps;
@ -67,19 +68,22 @@ namespace Squidex.Config.Domain
services.AddSingletonAs<CreateBlogCommandMiddleware>()
.As<ICommandMiddleware>();
services.AddTransientAs<Migration01_FromCqrs>()
services.AddTransientAs<MigrationPath>()
.As<IMigrationPath>();
services.AddTransientAs<ConvertEventStore>()
.As<IMigration>();
services.AddTransientAs<Migration02_AddPatterns>()
services.AddTransientAs<AddPatterns>()
.As<IMigration>();
services.AddTransientAs<Migration03_SplitContentCollections>()
services.AddTransientAs<RebuildContents>()
.As<IMigration>();
services.AddTransientAs<Migration04_FlattenAssetEntity>()
services.AddTransientAs<RebuildSnapshots>()
.As<IMigration>();
services.AddTransientAs<Migration05_RebuildForNewCommands>()
services.AddTransientAs<RebuildAssets>()
.As<IMigration>();
services.AddTransientAs<Rebuilder>()

17
tests/Squidex.Infrastructure.Tests/Commands/DomainObjectBaseTests.cs

@ -36,23 +36,6 @@ namespace Squidex.Infrastructure.Commands
Assert.Equal(EtagVersion.Empty, sut.Version);
}
[Fact]
public void Should_add_event_to_uncommitted_events_and_increase_version_when_raised()
{
var event1 = new MyEvent();
var event2 = new MyEvent();
sut.RaiseEvent(event1);
sut.RaiseEvent(event2);
Assert.Equal(1, sut.Version);
Assert.Equal(new IEvent[] { event1, event2 }, sut.GetUncomittedEvents().Select(x => x.Payload).ToArray());
sut.ClearUncommittedEvents();
Assert.Equal(0, sut.GetUncomittedEvents().Count);
}
[Fact]
public async Task Should_write_state_and_events_when_saved()
{

8
tests/Squidex.Infrastructure.Tests/EventSourcing/EventDataFormatterTests.cs → tests/Squidex.Infrastructure.Tests/EventSourcing/DefaultEventDataFormatterTests.cs

@ -15,7 +15,7 @@ using Xunit;
namespace Squidex.Infrastructure.EventSourcing
{
public class EventDataFormatterTests
public class DefaultEventDataFormatterTests
{
public sealed class MyOldEvent : IEvent, IMigratedEvent
{
@ -29,16 +29,16 @@ namespace Squidex.Infrastructure.EventSourcing
private readonly JsonSerializerSettings serializerSettings = new JsonSerializerSettings();
private readonly TypeNameRegistry typeNameRegistry = new TypeNameRegistry();
private readonly JsonEventDataFormatter sut;
private readonly DefaultEventDataFormatter sut;
public EventDataFormatterTests()
public DefaultEventDataFormatterTests()
{
serializerSettings.Converters.Add(new PropertiesBagConverter<EnvelopeHeaders>());
typeNameRegistry.Map(typeof(MyEvent), "Event");
typeNameRegistry.Map(typeof(MyOldEvent), "OldEvent");
sut = new JsonEventDataFormatter(typeNameRegistry, serializerSettings);
sut = new DefaultEventDataFormatter(typeNameRegistry, JsonSerializer.Create(serializerSettings));
}
[Fact]

12
tests/Squidex.Infrastructure.Tests/EventSourcing/PollingSubscriptionTests.cs

@ -27,7 +27,7 @@ namespace Squidex.Infrastructure.EventSourcing
await WaitAndStopAsync(sut);
A.CallTo(() => eventStore.GetEventsAsync(A<Func<StoredEvent, Task>>.Ignored, "^my-stream", position, A<CancellationToken>.Ignored))
A.CallTo(() => eventStore.QueryAsync(A<Func<StoredEvent, Task>>.Ignored, "^my-stream", position, A<CancellationToken>.Ignored))
.MustHaveHappened(Repeated.Exactly.Once);
}
@ -36,7 +36,7 @@ namespace Squidex.Infrastructure.EventSourcing
{
var ex = new InvalidOperationException();
A.CallTo(() => eventStore.GetEventsAsync(A<Func<StoredEvent, Task>>.Ignored, "^my-stream", position, A<CancellationToken>.Ignored))
A.CallTo(() => eventStore.QueryAsync(A<Func<StoredEvent, Task>>.Ignored, "^my-stream", position, A<CancellationToken>.Ignored))
.Throws(ex);
var sut = new PollingSubscription(eventStore, eventNotifier, eventSubscriber, "^my-stream", position);
@ -52,7 +52,7 @@ namespace Squidex.Infrastructure.EventSourcing
{
var ex = new OperationCanceledException();
A.CallTo(() => eventStore.GetEventsAsync(A<Func<StoredEvent, Task>>.Ignored, "^my-stream", position, A<CancellationToken>.Ignored))
A.CallTo(() => eventStore.QueryAsync(A<Func<StoredEvent, Task>>.Ignored, "^my-stream", position, A<CancellationToken>.Ignored))
.Throws(ex);
var sut = new PollingSubscription(eventStore, eventNotifier, eventSubscriber, "^my-stream", position);
@ -68,7 +68,7 @@ namespace Squidex.Infrastructure.EventSourcing
{
var ex = new AggregateException(new OperationCanceledException());
A.CallTo(() => eventStore.GetEventsAsync(A<Func<StoredEvent, Task>>.Ignored, "^my-stream", position, A<CancellationToken>.Ignored))
A.CallTo(() => eventStore.QueryAsync(A<Func<StoredEvent, Task>>.Ignored, "^my-stream", position, A<CancellationToken>.Ignored))
.Throws(ex);
var sut = new PollingSubscription(eventStore, eventNotifier, eventSubscriber, "^my-stream", position);
@ -88,7 +88,7 @@ namespace Squidex.Infrastructure.EventSourcing
await WaitAndStopAsync(sut);
A.CallTo(() => eventStore.GetEventsAsync(A<Func<StoredEvent, Task>>.Ignored, "^my-stream", position, A<CancellationToken>.Ignored))
A.CallTo(() => eventStore.QueryAsync(A<Func<StoredEvent, Task>>.Ignored, "^my-stream", position, A<CancellationToken>.Ignored))
.MustHaveHappened(Repeated.Exactly.Once);
}
@ -101,7 +101,7 @@ namespace Squidex.Infrastructure.EventSourcing
await WaitAndStopAsync(sut);
A.CallTo(() => eventStore.GetEventsAsync(A<Func<StoredEvent, Task>>.Ignored, "^my-stream", position, A<CancellationToken>.Ignored))
A.CallTo(() => eventStore.QueryAsync(A<Func<StoredEvent, Task>>.Ignored, "^my-stream", position, A<CancellationToken>.Ignored))
.MustHaveHappened(Repeated.Exactly.Twice);
}

79
tests/Squidex.Infrastructure.Tests/Migrations/MigratorTests.cs

@ -19,7 +19,9 @@ namespace Squidex.Infrastructure.Migrations
public class MigratorTests
{
private readonly IMigrationStatus status = A.Fake<IMigrationStatus>();
private readonly IMigrationPath path = A.Fake<IMigrationPath>();
private readonly ISemanticLog log = A.Fake<ISemanticLog>();
private readonly List<(int From, int To, IMigration Migration)> migrations = new List<(int From, int To, IMigration Migration)>();
public sealed class InMemoryStatus : IMigrationStatus
{
@ -64,6 +66,14 @@ namespace Squidex.Infrastructure.Migrations
public MigratorTests()
{
A.CallTo(() => path.GetNext(A<int>.Ignored))
.ReturnsLazily((int v) =>
{
var m = migrations.Where(x => x.From == v).ToList();
return m.Count == 0 ? (0, null) : (migrations.Max(x => x.To), migrations.Select(x => x.Migration));
});
A.CallTo(() => status.GetVersionAsync()).Returns(0);
A.CallTo(() => status.TryLockAsync()).Returns(true);
}
@ -75,13 +85,13 @@ namespace Squidex.Infrastructure.Migrations
var migrator_1_2 = BuildMigration(1, 2);
var migrator_2_3 = BuildMigration(2, 3);
var migrator = new Migrator(status, new[] { migrator_0_1, migrator_1_2, migrator_2_3 }, log);
var sut = new Migrator(status, path, log);
await migrator.MigrateAsync();
await sut.MigrateAsync();
A.CallTo(() => migrator_0_1.UpdateAsync(A<IEnumerable<IMigration>>.That.IsEmpty())).MustHaveHappened();
A.CallTo(() => migrator_1_2.UpdateAsync(A<IEnumerable<IMigration>>.That.IsSameSequenceAs(migrator_0_1))).MustHaveHappened();
A.CallTo(() => migrator_2_3.UpdateAsync(A<IEnumerable<IMigration>>.That.IsSameSequenceAs(migrator_0_1, migrator_1_2))).MustHaveHappened();
A.CallTo(() => migrator_0_1.UpdateAsync()).MustHaveHappened();
A.CallTo(() => migrator_1_2.UpdateAsync()).MustHaveHappened();
A.CallTo(() => migrator_2_3.UpdateAsync()).MustHaveHappened();
A.CallTo(() => status.UnlockAsync(3)).MustHaveHappened();
}
@ -93,51 +103,15 @@ namespace Squidex.Infrastructure.Migrations
var migrator_1_2 = BuildMigration(1, 2);
var migrator_2_3 = BuildMigration(2, 3);
var migrator = new Migrator(status, new[] { migrator_0_1, migrator_1_2, migrator_2_3 }, log);
A.CallTo(() => migrator_1_2.UpdateAsync(A<IEnumerable<IMigration>>.Ignored)).Throws(new ArgumentException());
await Assert.ThrowsAsync<ArgumentException>(migrator.MigrateAsync);
A.CallTo(() => migrator_0_1.UpdateAsync(A<IEnumerable<IMigration>>.That.IsEmpty())).MustHaveHappened();
A.CallTo(() => migrator_1_2.UpdateAsync(A<IEnumerable<IMigration>>.That.IsSameSequenceAs(migrator_0_1))).MustHaveHappened();
A.CallTo(() => migrator_2_3.UpdateAsync(A<IEnumerable<IMigration>>.Ignored)).MustNotHaveHappened();
A.CallTo(() => status.UnlockAsync(1)).MustHaveHappened();
}
[Fact]
public async Task Should_migrate_with_fastest_path()
{
var migrator_0_1 = BuildMigration(0, 1);
var migrator_0_2 = BuildMigration(0, 2);
var migrator_1_2 = BuildMigration(1, 2);
var migrator_2_3 = BuildMigration(2, 3);
var migrator = new Migrator(status, new[] { migrator_0_1, migrator_0_2, migrator_1_2, migrator_2_3 }, log);
await migrator.MigrateAsync();
A.CallTo(() => migrator_0_2.UpdateAsync(A<IEnumerable<IMigration>>.That.IsEmpty())).MustHaveHappened();
A.CallTo(() => migrator_0_1.UpdateAsync(A<IEnumerable<IMigration>>.Ignored)).MustNotHaveHappened();
A.CallTo(() => migrator_1_2.UpdateAsync(A<IEnumerable<IMigration>>.Ignored)).MustNotHaveHappened();
A.CallTo(() => migrator_2_3.UpdateAsync(A<IEnumerable<IMigration>>.That.IsSameSequenceAs(migrator_0_2))).MustHaveHappened();
A.CallTo(() => status.UnlockAsync(3)).MustHaveHappened();
}
[Fact]
public async Task Should_throw_if_no_path_found()
{
var migrator_0_1 = BuildMigration(0, 1);
var migrator_2_3 = BuildMigration(2, 3);
var sut = new Migrator(status, path, log);
var migrator = new Migrator(status, new[] { migrator_0_1, migrator_2_3 }, log);
A.CallTo(() => migrator_1_2.UpdateAsync()).Throws(new ArgumentException());
await Assert.ThrowsAsync<InvalidOperationException>(migrator.MigrateAsync);
await Assert.ThrowsAsync<ArgumentException>(sut.MigrateAsync);
A.CallTo(() => migrator_0_1.UpdateAsync(A<IEnumerable<IMigration>>.Ignored)).MustNotHaveHappened();
A.CallTo(() => migrator_2_3.UpdateAsync(A<IEnumerable<IMigration>>.Ignored)).MustNotHaveHappened();
A.CallTo(() => migrator_0_1.UpdateAsync()).MustHaveHappened();
A.CallTo(() => migrator_1_2.UpdateAsync()).MustHaveHappened();
A.CallTo(() => migrator_2_3.UpdateAsync()).MustNotHaveHappened();
A.CallTo(() => status.UnlockAsync(0)).MustHaveHappened();
}
@ -148,20 +122,19 @@ namespace Squidex.Infrastructure.Migrations
var migrator_0_1 = BuildMigration(0, 1);
var migrator_1_2 = BuildMigration(1, 2);
var migrator = new Migrator(new InMemoryStatus(), new[] { migrator_0_1, migrator_1_2 }, log) { LockWaitMs = 2 };
var sut = new Migrator(new InMemoryStatus(), path, log) { LockWaitMs = 2 };
await Task.WhenAll(Enumerable.Repeat(0, 10).Select(x => Task.Run(migrator.MigrateAsync)));
await Task.WhenAll(Enumerable.Repeat(0, 10).Select(x => Task.Run(sut.MigrateAsync)));
A.CallTo(() => migrator_0_1.UpdateAsync(A<IEnumerable<IMigration>>.Ignored)).MustHaveHappened(Repeated.Exactly.Once);
A.CallTo(() => migrator_1_2.UpdateAsync(A<IEnumerable<IMigration>>.Ignored)).MustHaveHappened(Repeated.Exactly.Once);
A.CallTo(() => migrator_0_1.UpdateAsync()).MustHaveHappened(Repeated.Exactly.Once);
A.CallTo(() => migrator_1_2.UpdateAsync()).MustHaveHappened(Repeated.Exactly.Once);
}
private IMigration BuildMigration(int fromVersion, int toVersion)
{
var migration = A.Fake<IMigration>();
A.CallTo(() => migration.FromVersion).Returns(fromVersion);
A.CallTo(() => migration.ToVersion).Returns(toVersion);
migrations.Add((fromVersion, toVersion, migration));
return migration;
}

14
tests/Squidex.Infrastructure.Tests/States/PersistenceEventSourcingTests.cs

@ -118,7 +118,7 @@ namespace Squidex.Infrastructure.States
await sut.GetSingleAsync<MyStatefulObjectWithSnapshot>(key);
A.CallTo(() => eventStore.GetEventsAsync(key, 3))
A.CallTo(() => eventStore.QueryAsync(key, 3))
.MustHaveHappened();
}
@ -199,9 +199,9 @@ namespace Squidex.Infrastructure.States
await statefulObject.WriteEventsAsync(new MyEvent(), new MyEvent());
await statefulObject.WriteEventsAsync(new MyEvent(), new MyEvent());
A.CallTo(() => eventStore.AppendEventsAsync(A<Guid>.Ignored, key, 2, A<ICollection<EventData>>.That.Matches(x => x.Count == 2)))
A.CallTo(() => eventStore.AppendAsync(A<Guid>.Ignored, key, 2, A<ICollection<EventData>>.That.Matches(x => x.Count == 2)))
.MustHaveHappened();
A.CallTo(() => eventStore.AppendEventsAsync(A<Guid>.Ignored, key, 4, A<ICollection<EventData>>.That.Matches(x => x.Count == 2)))
A.CallTo(() => eventStore.AppendAsync(A<Guid>.Ignored, key, 4, A<ICollection<EventData>>.That.Matches(x => x.Count == 2)))
.MustHaveHappened();
}
@ -212,7 +212,7 @@ namespace Squidex.Infrastructure.States
var actualObject = await sut.GetSingleAsync<MyStatefulObject>(key);
A.CallTo(() => eventStore.AppendEventsAsync(A<Guid>.Ignored, key, 2, A<ICollection<EventData>>.That.Matches(x => x.Count == 2)))
A.CallTo(() => eventStore.AppendAsync(A<Guid>.Ignored, key, 2, A<ICollection<EventData>>.That.Matches(x => x.Count == 2)))
.Throws(new WrongEventVersionException(1, 1));
await Assert.ThrowsAsync<DomainObjectVersionException>(() => statefulObject.WriteEventsAsync(new MyEvent(), new MyEvent()));
@ -221,7 +221,7 @@ namespace Squidex.Infrastructure.States
[Fact]
public async Task Should_not_remove_from_cache_when_write_failed()
{
A.CallTo(() => eventStore.AppendEventsAsync(A<Guid>.Ignored, A<string>.Ignored, A<long>.Ignored, A<ICollection<EventData>>.Ignored))
A.CallTo(() => eventStore.AppendAsync(A<Guid>.Ignored, A<string>.Ignored, A<long>.Ignored, A<ICollection<EventData>>.Ignored))
.Throws(new InvalidOperationException());
var actualObject = await sut.GetSingleAsync<MyStatefulObject>(key);
@ -251,7 +251,7 @@ namespace Squidex.Infrastructure.States
Assert.Same(retrievedStates[0], retrievedState);
}
A.CallTo(() => eventStore.GetEventsAsync(key, 0))
A.CallTo(() => eventStore.QueryAsync(key, 0))
.MustHaveHappened(Repeated.Exactly.Once);
}
@ -284,7 +284,7 @@ namespace Squidex.Infrastructure.States
i++;
}
A.CallTo(() => eventStore.GetEventsAsync(key, readPosition))
A.CallTo(() => eventStore.QueryAsync(key, readPosition))
.Returns(eventsStored);
}
}

1
tools/Migrate_01/Migrate_01.csproj

@ -6,6 +6,7 @@
<ProjectReference Include="..\..\src\Squidex.Domain.Apps.Core.Model\Squidex.Domain.Apps.Core.Model.csproj" />
<ProjectReference Include="..\..\src\Squidex.Domain.Apps.Entities\Squidex.Domain.Apps.Entities.csproj" />
<ProjectReference Include="..\..\src\Squidex.Domain.Apps.Events\Squidex.Domain.Apps.Events.csproj" />
<ProjectReference Include="..\..\src\Squidex.Infrastructure.MongoDb\Squidex.Infrastructure.MongoDb.csproj" />
<ProjectReference Include="..\..\src\Squidex.Infrastructure\Squidex.Infrastructure.csproj" />
</ItemGroup>
<PropertyGroup>

38
tools/Migrate_01/Migration05_RebuildForNewCommands.cs

@ -1,38 +0,0 @@
// ==========================================================================
// Squidex Headless CMS
// ==========================================================================
// Copyright (c) Squidex UG (haftungsbeschränkt)
// All rights reserved. Licensed under the MIT license.
// ==========================================================================
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using Squidex.Infrastructure.Migrations;
namespace Migrate_01
{
public sealed class Migration05_RebuildForNewCommands : IMigration
{
private readonly Rebuilder rebuilder;
public int FromVersion { get; } = 4;
public int ToVersion { get; } = 5;
public Migration05_RebuildForNewCommands(Rebuilder rebuilder)
{
this.rebuilder = rebuilder;
}
public async Task UpdateAsync(IEnumerable<IMigration> previousMigrations)
{
if (!previousMigrations.Any(x => x is Migration01_FromCqrs))
{
await rebuilder.RebuildConfigAsync();
await rebuilder.RebuildContentAsync();
await rebuilder.RebuildAssetsAsync();
}
}
}
}

56
tools/Migrate_01/MigrationPath.cs

@ -0,0 +1,56 @@
// ==========================================================================
// Squidex Headless CMS
// ==========================================================================
// Copyright (c) Squidex UG (haftungsbeschraenkt)
// All rights reserved. Licensed under the MIT license.
// ==========================================================================
using System;
using System.Collections.Generic;
using Microsoft.Extensions.DependencyInjection;
using Migrate_01.Migrations;
using Squidex.Infrastructure.Migrations;
namespace Migrate_01
{
public sealed class MigrationPath : IMigrationPath
{
private const int CurrentVersion = 6;
private readonly IServiceProvider serviceProvider;
public MigrationPath(IServiceProvider serviceProvider)
{
this.serviceProvider = serviceProvider;
}
public (int Version, IEnumerable<IMigration> Migrations) GetNext(int version)
{
if (version == CurrentVersion)
{
return (CurrentVersion, null);
}
var migrations = new List<IMigration>();
// Version 6: Convert Event store. Must always be executed first.
if (version < 6)
{
migrations.Add(serviceProvider.GetRequiredService<ConvertEventStore>());
}
// Version 5: Fixes the broken command architecture and requires a rebuild of all snapshots.
if (version < 5)
{
migrations.Add(serviceProvider.GetRequiredService<RebuildSnapshots>());
}
// Version 1: Introduce App patterns.
if (version <= 1)
{
migrations.Add(serviceProvider.GetRequiredService<AddPatterns>());
}
return (CurrentVersion, migrations);
}
}
}

13
tools/Migrate_01/Migration02_AddPatterns.cs → tools/Migrate_01/Migrations/AddPatterns.cs

@ -6,7 +6,6 @@
// ==========================================================================
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using Squidex.Domain.Apps.Entities.Apps;
using Squidex.Domain.Apps.Entities.Apps.Commands;
@ -14,26 +13,22 @@ using Squidex.Domain.Apps.Entities.Apps.Repositories;
using Squidex.Infrastructure.Migrations;
using Squidex.Infrastructure.States;
namespace Migrate_01
namespace Migrate_01.Migrations
{
public sealed class Migration02_AddPatterns : IMigration
public sealed class AddPatterns : IMigration
{
private readonly InitialPatterns initialPatterns;
private readonly IStateFactory stateFactory;
private readonly IAppRepository appRepository;
public int FromVersion { get; } = 1;
public int ToVersion { get; } = 2;
public Migration02_AddPatterns(InitialPatterns initialPatterns, IAppRepository appRepository, IStateFactory stateFactory)
public AddPatterns(InitialPatterns initialPatterns, IAppRepository appRepository, IStateFactory stateFactory)
{
this.initialPatterns = initialPatterns;
this.appRepository = appRepository;
this.stateFactory = stateFactory;
}
public async Task UpdateAsync(IEnumerable<IMigration> previousMigrations)
public async Task UpdateAsync()
{
var ids = await appRepository.QueryAppIdsAsync();

59
tools/Migrate_01/Migrations/ConvertEventStore.cs

@ -0,0 +1,59 @@
// ==========================================================================
// Squidex Headless CMS
// ==========================================================================
// Copyright (c) Squidex UG (haftungsbeschraenkt)
// All rights reserved. Licensed under the MIT license.
// ==========================================================================
using System;
using System.Threading.Tasks;
using MongoDB.Bson;
using MongoDB.Driver;
using Newtonsoft.Json.Linq;
using Squidex.Domain.Apps.Events;
using Squidex.Infrastructure;
using Squidex.Infrastructure.EventSourcing;
using Squidex.Infrastructure.Migrations;
using Squidex.Infrastructure.MongoDb;
namespace Migrate_01.Migrations
{
public sealed class ConvertEventStore : IMigration
{
private readonly IEventStore eventStore;
public ConvertEventStore(IEventStore eventStore)
{
this.eventStore = eventStore;
}
public async Task UpdateAsync()
{
if (eventStore is MongoEventStore mongoEventStore)
{
var collection = mongoEventStore.RawCollection;
var filter = Builders<BsonDocument>.Filter;
await collection.Find(new BsonDocument()).ForEachAsync(async commit =>
{
foreach (BsonDocument @event in commit["Events"].AsBsonArray)
{
var meta = JObject.Parse(@event["Metadata"].AsString);
var data = JObject.Parse(@event["Payload"].AsString);
if (data.TryGetValue("appId", out var appId))
{
meta[SquidexHeaders.AppId] = NamedId<Guid>.Parse(appId.ToString(), Guid.TryParse).Id;
}
@event.Remove("EventId");
@event["Metadata"] = meta.ToBson();
}
await collection.ReplaceOneAsync(filter.Eq("_id", commit["_id"].AsString), commit);
});
}
}
}
}

15
tools/Migrate_01/Migration04_FlattenAssetEntity.cs → tools/Migrate_01/Migrations/RebuildAssets.cs

@ -12,25 +12,18 @@ using Squidex.Infrastructure.Migrations;
namespace Migrate_01
{
public class Migration04_FlattenAssetEntity : IMigration
public class RebuildAssets : IMigration
{
private readonly Rebuilder rebuilder;
public int FromVersion { get; } = 3;
public int ToVersion { get; } = 4;
public Migration04_FlattenAssetEntity(Rebuilder rebuilder)
public RebuildAssets(Rebuilder rebuilder)
{
this.rebuilder = rebuilder;
}
public async Task UpdateAsync(IEnumerable<IMigration> previousMigrations)
public Task UpdateAsync()
{
if (!previousMigrations.Any(x => x is Migration01_FromCqrs))
{
await rebuilder.RebuildAssetsAsync();
}
return rebuilder.RebuildAssetsAsync();
}
}
}

19
tools/Migrate_01/Migration03_SplitContentCollections.cs → tools/Migrate_01/Migrations/RebuildContents.cs

@ -5,32 +5,23 @@
// All rights reserved. Licensed under the MIT license.
// ==========================================================================
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using Squidex.Infrastructure.Migrations;
namespace Migrate_01
namespace Migrate_01.Migrations
{
public class Migration03_SplitContentCollections : IMigration
public class RebuildContents : IMigration
{
private readonly Rebuilder rebuilder;
public int FromVersion { get; } = 2;
public int ToVersion { get; } = 3;
public Migration03_SplitContentCollections(Rebuilder rebuilder)
public RebuildContents(Rebuilder rebuilder)
{
this.rebuilder = rebuilder;
}
public async Task UpdateAsync(IEnumerable<IMigration> previousMigrations)
public Task UpdateAsync()
{
if (!previousMigrations.Any(x => x is Migration01_FromCqrs))
{
await rebuilder.RebuildContentAsync();
}
return rebuilder.RebuildContentAsync();
}
}
}

13
tools/Migrate_01/Migration01_FromCqrs.cs → tools/Migrate_01/Migrations/RebuildSnapshots.cs

@ -5,26 +5,21 @@
// All rights reserved. Licensed under the MIT license.
// ==========================================================================
using System.Collections.Generic;
using System.Threading.Tasks;
using Squidex.Infrastructure.Migrations;
namespace Migrate_01
namespace Migrate_01.Migrations
{
public sealed class Migration01_FromCqrs : IMigration
public sealed class RebuildSnapshots : IMigration
{
private readonly Rebuilder rebuilder;
public int FromVersion { get; } = 0;
public int ToVersion { get; } = 1;
public Migration01_FromCqrs(Rebuilder rebuilder)
public RebuildSnapshots(Rebuilder rebuilder)
{
this.rebuilder = rebuilder;
}
public async Task UpdateAsync(IEnumerable<IMigration> previousMigrations)
public async Task UpdateAsync()
{
await rebuilder.RebuildConfigAsync();
await rebuilder.RebuildContentAsync();

12
tools/Migrate_01/Rebuilder.cs

@ -71,7 +71,7 @@ namespace Migrate_01
var handledIds = new HashSet<Guid>();
await eventStore.GetEventsAsync(async storedEvent =>
await eventStore.QueryAsync(async storedEvent =>
{
var @event = ParseKnownEvent(storedEvent);
@ -86,7 +86,7 @@ namespace Migrate_01
await asset.WriteSnapshotAsync();
}
}
}, filter, cancellationToken: CancellationToken.None);
}, filter, ct: CancellationToken.None);
}
public async Task RebuildConfigAsync()
@ -99,7 +99,7 @@ namespace Migrate_01
var handledIds = new HashSet<Guid>();
await eventStore.GetEventsAsync(async storedEvent =>
await eventStore.QueryAsync(async storedEvent =>
{
var @event = ParseKnownEvent(storedEvent);
@ -124,7 +124,7 @@ namespace Migrate_01
await app.WriteSnapshotAsync();
}
}
}, filter, cancellationToken: CancellationToken.None);
}, filter, ct: CancellationToken.None);
}
public async Task RebuildContentAsync()
@ -135,7 +135,7 @@ namespace Migrate_01
var handledIds = new HashSet<Guid>();
await eventStore.GetEventsAsync(async storedEvent =>
await eventStore.QueryAsync(async storedEvent =>
{
var @event = ParseKnownEvent(storedEvent);
@ -161,7 +161,7 @@ namespace Migrate_01
// Schema has been deleted.
}
}
}, filter, cancellationToken: CancellationToken.None);
}, filter, ct: CancellationToken.None);
}
private Envelope<IEvent> ParseKnownEvent(StoredEvent storedEvent)

3
tools/Migrate_01/SquidexMigrations.cs

@ -5,9 +5,12 @@
// All rights reserved. Licensed under the MIT license.
// ==========================================================================
using System.Reflection;
namespace Migrate_01
{
public static class SquidexMigrations
{
public static readonly Assembly Assembly = typeof(SquidexMigrations).Assembly;
}
}

Loading…
Cancel
Save