Browse Source

Repair asset files. (#593)

* Repair asset files.

* Tests fixed.

* Mini fix to prevent errors.

* Reverted the event consumer approach.
pull/596/head
Sebastian Stehle 5 years ago
committed by GitHub
parent
commit
c1a2cda11a
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
  1. 81
      backend/src/Migrations/Migrations/PopulateGrainIndexes.cs
  2. 2
      backend/src/Migrations/RebuildOptions.cs
  3. 15
      backend/src/Migrations/RebuildRunner.cs
  4. 9
      backend/src/Squidex.Domain.Apps.Entities/Apps/Templates/AlwaysCreateClientCommandMiddleware.cs
  5. 2
      backend/src/Squidex.Domain.Apps.Entities/Assets/AssetCommandMiddleware.cs
  6. 75
      backend/src/Squidex.Domain.Apps.Entities/Assets/RepairFiles.cs
  7. 2
      backend/src/Squidex.Domain.Apps.Entities/Backup/BackupGrain.cs
  8. 6
      backend/src/Squidex.Domain.Apps.Entities/Backup/BackupReader.cs
  9. 14
      backend/src/Squidex.Domain.Apps.Entities/Backup/Model/CompatibleStoredEvent.cs
  10. 2
      backend/src/Squidex.Domain.Apps.Entities/Comments/CommentsGrain.cs
  11. 20
      backend/src/Squidex.Domain.Apps.Entities/Rules/Runner/RuleRunnerGrain.cs
  12. 21
      backend/src/Squidex.Infrastructure/EventSourcing/DefaultEventDataFormatter.cs
  13. 20
      backend/src/Squidex.Infrastructure/EventSourcing/Grains/BatchSubscriber.cs
  14. 4
      backend/src/Squidex.Infrastructure/EventSourcing/IEventDataFormatter.cs
  15. 1
      backend/src/Squidex.Infrastructure/Guard.cs
  16. 15
      backend/src/Squidex.Infrastructure/States/Persistence{TSnapshot,TKey}.cs
  17. 3
      backend/src/Squidex/Config/Domain/AssetServices.cs
  18. 5
      backend/src/Squidex/appsettings.json
  19. 132
      backend/tests/Squidex.Domain.Apps.Entities.Tests/Assets/RepairFilesTests.cs
  20. 9
      backend/tests/Squidex.Infrastructure.Tests/EventSourcing/DefaultEventDataFormatterTests.cs
  21. 9
      backend/tests/Squidex.Infrastructure.Tests/EventSourcing/Grains/EventConsumerGrainTests.cs
  22. 10
      backend/tests/Squidex.Infrastructure.Tests/States/PersistenceEventSourcingTests.cs

81
backend/src/Migrations/Migrations/PopulateGrainIndexes.cs

@ -77,34 +77,37 @@ namespace Migrations.Migrations
await eventStore.QueryAsync(storedEvent =>
{
var @event = eventDataFormatter.Parse(storedEvent.Data);
var @event = eventDataFormatter.ParseIfKnown(storedEvent);
switch (@event.Payload)
if (@event != null)
{
case AppCreated created:
{
RemoveApp(created.AppId, false);
switch (@event.Payload)
{
case AppCreated created:
{
RemoveApp(created.AppId, false);
appsByName[created.Name] = created.AppId.Id;
break;
}
appsByName[created.Name] = created.AppId.Id;
break;
}
case AppContributorAssigned contributorAssigned:
{
if (HasApp(contributorAssigned.AppId, true, out _))
case AppContributorAssigned contributorAssigned:
{
Index(contributorAssigned.ContributorId).Add(contributorAssigned.AppId.Id);
if (HasApp(contributorAssigned.AppId, true, out _))
{
Index(contributorAssigned.ContributorId).Add(contributorAssigned.AppId.Id);
}
break;
}
case AppContributorRemoved contributorRemoved:
Index(contributorRemoved.ContributorId).Remove(contributorRemoved.AppId.Id);
break;
case AppArchived archived:
RemoveApp(archived.AppId, true);
break;
}
case AppContributorRemoved contributorRemoved:
Index(contributorRemoved.ContributorId).Remove(contributorRemoved.AppId.Id);
break;
case AppArchived archived:
RemoveApp(archived.AppId, true);
break;
}
}
return Task.CompletedTask;
@ -129,16 +132,19 @@ namespace Migrations.Migrations
await eventStore.QueryAsync(storedEvent =>
{
var @event = eventDataFormatter.Parse(storedEvent.Data);
var @event = eventDataFormatter.ParseIfKnown(storedEvent);
switch (@event.Payload)
if (@event != null)
{
case RuleCreated created:
Index(created).Add(created.RuleId);
break;
case RuleDeleted deleted:
Index(deleted).Remove(deleted.RuleId);
break;
switch (@event.Payload)
{
case RuleCreated created:
Index(created).Add(created.RuleId);
break;
case RuleDeleted deleted:
Index(deleted).Remove(deleted.RuleId);
break;
}
}
return Task.CompletedTask;
@ -161,16 +167,19 @@ namespace Migrations.Migrations
await eventStore.QueryAsync(storedEvent =>
{
var @event = eventDataFormatter.Parse(storedEvent.Data);
var @event = eventDataFormatter.ParseIfKnown(storedEvent);
switch (@event.Payload)
if (@event != null)
{
case SchemaCreated created:
Index(created)[created.SchemaId.Name] = created.SchemaId.Id;
break;
case SchemaDeleted deleted:
Index(deleted).Remove(deleted.SchemaId.Name);
break;
switch (@event.Payload)
{
case SchemaCreated created:
Index(created)[created.SchemaId.Name] = created.SchemaId.Id;
break;
case SchemaDeleted deleted:
Index(deleted).Remove(deleted.SchemaId.Name);
break;
}
}
return Task.CompletedTask;

2
backend/src/Migrations/RebuildOptions.cs

@ -13,6 +13,8 @@ namespace Migrations
public bool Assets { get; set; }
public bool AssetFiles { get; set; }
public bool Contents { get; set; }
public bool Indexes { get; set; }

15
backend/src/Migrations/RebuildRunner.cs

@ -9,6 +9,7 @@ using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.Options;
using Migrations.Migrations;
using Squidex.Domain.Apps.Entities.Assets;
using Squidex.Infrastructure;
using Squidex.Infrastructure.Commands;
@ -16,16 +17,23 @@ namespace Migrations
{
public sealed class RebuildRunner
{
private readonly RepairFiles repairFiles;
private readonly Rebuilder rebuilder;
private readonly PopulateGrainIndexes populateGrainIndexes;
private readonly RebuildOptions rebuildOptions;
public RebuildRunner(Rebuilder rebuilder, IOptions<RebuildOptions> rebuildOptions, PopulateGrainIndexes populateGrainIndexes)
public RebuildRunner(
RepairFiles repairFiles,
Rebuilder rebuilder,
IOptions<RebuildOptions> rebuildOptions,
PopulateGrainIndexes populateGrainIndexes)
{
Guard.NotNull(repairFiles, nameof(repairFiles));
Guard.NotNull(rebuilder, nameof(rebuilder));
Guard.NotNull(rebuildOptions, nameof(rebuildOptions));
Guard.NotNull(populateGrainIndexes, nameof(populateGrainIndexes));
this.repairFiles = repairFiles;
this.rebuilder = rebuilder;
this.rebuildOptions = rebuildOptions.Value;
this.populateGrainIndexes = populateGrainIndexes;
@ -54,6 +62,11 @@ namespace Migrations
await rebuilder.RebuildAssetFoldersAsync(ct);
}
if (rebuildOptions.AssetFiles)
{
await repairFiles.RepairAsync(ct);
}
if (rebuildOptions.Contents)
{
await rebuilder.RebuildContentAsync(ct);

9
backend/src/Squidex.Domain.Apps.Entities/Apps/Templates/AlwaysCreateClientCommandMiddleware.cs

@ -9,24 +9,23 @@ using System.Threading.Tasks;
using Squidex.Domain.Apps.Entities.Apps.Commands;
using Squidex.Infrastructure;
using Squidex.Infrastructure.Commands;
using Squidex.Infrastructure.Tasks;
namespace Squidex.Domain.Apps.Entities.Apps.Templates
{
public sealed class AlwaysCreateClientCommandMiddleware : ICommandMiddleware
{
public Task HandleAsync(CommandContext context, NextDelegate next)
public async Task HandleAsync(CommandContext context, NextDelegate next)
{
await next(context);
if (context.IsCompleted && context.Command is CreateApp createApp)
{
var appId = NamedId.Of(createApp.AppId, createApp.Name);
var command = new AttachClient { Id = "default", AppId = appId };
context.CommandBus.PublishAsync(command).Forget();
await context.CommandBus.PublishAsync(command);
}
return next(context);
}
}
}

2
backend/src/Squidex.Domain.Apps.Entities/Assets/AssetCommandMiddleware.cs

@ -42,8 +42,8 @@ namespace Squidex.Domain.Apps.Entities.Assets
this.assetFileStore = assetFileStore;
this.assetEnricher = assetEnricher;
this.assetQuery = assetQuery;
this.contextProvider = contextProvider;
this.assetMetadataSources = assetMetadataSources;
this.contextProvider = contextProvider;
}
public override async Task HandleAsync(CommandContext context, NextDelegate next)

75
backend/src/Squidex.Domain.Apps.Entities/Assets/RepairFiles.cs

@ -0,0 +1,75 @@
// ==========================================================================
// Squidex Headless CMS
// ==========================================================================
// Copyright (c) Squidex UG (haftungsbeschraenkt)
// All rights reserved. Licensed under the MIT license.
// ==========================================================================
using System.IO;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Squidex.Domain.Apps.Events.Assets;
using Squidex.Infrastructure;
using Squidex.Infrastructure.Assets;
using Squidex.Infrastructure.EventSourcing;
namespace Squidex.Domain.Apps.Entities.Assets
{
public sealed class RepairFiles
{
private static readonly MemoryStream DummyStream = new MemoryStream(Encoding.UTF8.GetBytes("dummy"));
private readonly IAssetFileStore assetFileStore;
private readonly IEventStore eventStore;
private readonly IEventDataFormatter eventDataFormatter;
public RepairFiles(
IAssetFileStore assetFileStore,
IEventStore eventStore,
IEventDataFormatter eventDataFormatter)
{
Guard.NotNull(assetFileStore, nameof(assetFileStore));
Guard.NotNull(eventStore, nameof(eventStore));
Guard.NotNull(eventDataFormatter, nameof(eventDataFormatter));
this.assetFileStore = assetFileStore;
this.eventStore = eventStore;
this.eventDataFormatter = eventDataFormatter;
}
public async Task RepairAsync(CancellationToken ct = default)
{
await eventStore.QueryAsync(async storedEvent =>
{
var @event = eventDataFormatter.ParseIfKnown(storedEvent);
if (@event != null)
{
switch (@event.Payload)
{
case AssetCreated assetCreated:
await TryRepairAsync(assetCreated.AppId, assetCreated.AssetId, assetCreated.FileVersion, ct);
break;
case AssetUpdated assetUpdated:
await TryRepairAsync(assetUpdated.AppId, assetUpdated.AssetId, assetUpdated.FileVersion, ct);
break;
}
}
}, "^asset\\-", ct: ct);
}
private async Task TryRepairAsync(NamedId<DomainId> appId, DomainId id, long fileVersion, CancellationToken ct)
{
try
{
await assetFileStore.GetFileSizeAsync(appId.Id, id, fileVersion, ct);
}
catch (AssetNotFoundException)
{
DummyStream.Position = 0;
await assetFileStore.UploadAsync(appId.Id, id, fileVersion, DummyStream, ct);
}
}
}
}

2
backend/src/Squidex.Domain.Apps.Entities/Backup/BackupGrain.cs

@ -146,7 +146,7 @@ namespace Squidex.Domain.Apps.Entities.Backup
await eventStore.QueryAsync(async storedEvent =>
{
var @event = eventDataFormatter.Parse(storedEvent.Data);
var @event = eventDataFormatter.Parse(storedEvent);
if (@event.Payload is SquidexEvent squidexEvent && squidexEvent.Actor != null)
{

6
backend/src/Squidex.Domain.Apps.Entities/Backup/BackupReader.cs

@ -110,10 +110,10 @@ namespace Squidex.Domain.Apps.Entities.Backup
using (var stream = eventEntry.Open())
{
var (streamName, data) = serializer.Deserialize<CompatibleStoredEvent>(stream).ToEvent();
var storedEvent = serializer.Deserialize<CompatibleStoredEvent>(stream).ToStoredEvent();
var eventStream = streamName;
var eventEnvelope = formatter.Parse(data);
var eventStream = storedEvent.StreamName;
var eventEnvelope = formatter.Parse(storedEvent);
await handler((eventStream, eventEnvelope));
}

14
backend/src/Squidex.Domain.Apps.Entities/Backup/Model/CompatibleStoredEvent.cs

@ -47,15 +47,17 @@ namespace Squidex.Domain.Apps.Entities.Backup.Model
return new CompatibleStoredEvent { NewEvent = NewEvent.V2(stored) };
}
public (string Stream, EventData Data) ToEvent()
public StoredEvent ToStoredEvent()
{
if (NewEvent != null)
{
return NewEvent.ToEvent();
return NewEvent.ToStoredEvent();
}
else
{
return (StreamName, Data.ToData());
var data = Data.ToData();
return new StoredEvent(StreamName, EventPosition, EventStreamNumber, data);
}
}
}
@ -109,9 +111,11 @@ namespace Squidex.Domain.Apps.Entities.Backup.Model
};
}
public (string Stream, EventData Data) ToEvent()
public StoredEvent ToStoredEvent()
{
return (StreamName, new EventData(EventType, EventHeaders, EventPayload));
var data = new EventData(EventType, EventHeaders, EventPayload);
return new StoredEvent(StreamName, "0", -1, data);
}
}
}

2
backend/src/Squidex.Domain.Apps.Entities/Comments/CommentsGrain.cs

@ -51,7 +51,7 @@ namespace Squidex.Domain.Apps.Entities.Comments
foreach (var @event in storedEvents)
{
var parsedEvent = eventDataFormatter.Parse(@event.Data);
var parsedEvent = eventDataFormatter.Parse(@event);
version = @event.EventStreamNumber;

20
backend/src/Squidex.Domain.Apps.Entities/Rules/Runner/RuleRunnerGrain.cs

@ -16,7 +16,6 @@ using Squidex.Infrastructure;
using Squidex.Infrastructure.EventSourcing;
using Squidex.Infrastructure.Log;
using Squidex.Infrastructure.Orleans;
using Squidex.Infrastructure.Reflection;
using Squidex.Infrastructure.States;
using Squidex.Infrastructure.Tasks;
using Squidex.Infrastructure.Translations;
@ -156,7 +155,7 @@ namespace Squidex.Domain.Apps.Entities.Rules.Runner
{
try
{
var @event = ParseKnownEvent(storedEvent);
var @event = eventDataFormatter.ParseIfKnown(storedEvent);
if (@event != null)
{
@ -215,23 +214,6 @@ namespace Squidex.Domain.Apps.Entities.Rules.Runner
}
}
private Envelope<IEvent>? ParseKnownEvent(StoredEvent storedEvent)
{
try
{
var @event = eventDataFormatter.Parse(storedEvent.Data);
@event.SetEventPosition(storedEvent.EventPosition);
@event.SetEventStreamNumber(storedEvent.EventStreamNumber);
return @event;
}
catch (TypeNameNotFoundException)
{
return null;
}
}
public Task ReceiveReminder(string reminderName, TickStatus status)
{
EnsureIsRunning();

21
backend/src/Squidex.Infrastructure/EventSourcing/DefaultEventDataFormatter.cs

@ -28,8 +28,24 @@ namespace Squidex.Infrastructure.EventSourcing
this.serializer = serializer;
}
public Envelope<IEvent> Parse(EventData eventData)
public Envelope<IEvent>? ParseIfKnown(StoredEvent storedEvent)
{
try
{
return Parse(storedEvent);
}
catch (TypeNameNotFoundException)
{
return null;
}
}
public Envelope<IEvent> Parse(StoredEvent storedEvent)
{
Guard.NotNull(storedEvent, nameof(storedEvent));
var eventData = storedEvent.Data;
var payloadType = typeNameRegistry.GetType(eventData.Type);
var payloadObj = serializer.Deserialize<IEvent>(eventData.Payload, payloadType);
@ -45,6 +61,9 @@ namespace Squidex.Infrastructure.EventSourcing
var envelope = new Envelope<IEvent>(payloadObj, eventData.Headers);
envelope.SetEventPosition(storedEvent.EventPosition);
envelope.SetEventStreamNumber(storedEvent.EventStreamNumber);
return envelope;
}

20
backend/src/Squidex.Infrastructure/EventSourcing/Grains/BatchSubscriber.cs

@ -10,7 +10,6 @@ using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;
using Squidex.Infrastructure.Reflection;
using Squidex.Infrastructure.Tasks;
namespace Squidex.Infrastructure.EventSourcing.Grains
@ -63,7 +62,7 @@ namespace Squidex.Infrastructure.EventSourcing.Grains
{
try
{
job.Event = ParseKnownEvent(job.StoredEvent!);
job.Event = eventDataFormatter.ParseIfKnown(job.StoredEvent!);
}
catch (Exception ex)
{
@ -156,23 +155,6 @@ namespace Squidex.Infrastructure.EventSourcing.Grains
eventSubscription.Unsubscribe();
}
private Envelope<IEvent>? ParseKnownEvent(StoredEvent storedEvent)
{
try
{
var @event = eventDataFormatter.Parse(storedEvent.Data);
@event.SetEventPosition(storedEvent.EventPosition);
@event.SetEventStreamNumber(storedEvent.EventStreamNumber);
return @event;
}
catch (TypeNameNotFoundException)
{
return null;
}
}
public Task OnEventAsync(IEventSubscription subscription, StoredEvent storedEvent)
{
var job = new Job

4
backend/src/Squidex.Infrastructure/EventSourcing/IEventDataFormatter.cs

@ -11,7 +11,9 @@ namespace Squidex.Infrastructure.EventSourcing
{
public interface IEventDataFormatter
{
Envelope<IEvent> Parse(EventData eventData);
Envelope<IEvent> Parse(StoredEvent storedEvent);
Envelope<IEvent>? ParseIfKnown(StoredEvent storedEvent);
EventData ToEventData(Envelope<IEvent> envelope, Guid commitId, bool migrate = true);
}

1
backend/src/Squidex.Infrastructure/Guard.cs

@ -175,6 +175,7 @@ namespace Squidex.Infrastructure
}
[DebuggerStepThrough]
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public static void NotNull(object? target, string parameterName)
{
if (target == null)

15
backend/src/Squidex.Infrastructure/States/Persistence{TSnapshot,TKey}.cs

@ -10,7 +10,6 @@ using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using Squidex.Infrastructure.EventSourcing;
using Squidex.Infrastructure.Reflection;
#pragma warning disable RECS0012 // 'if' statement can be re-written as 'switch' statement
@ -118,7 +117,7 @@ namespace Squidex.Infrastructure.States
throw new InvalidOperationException("Events must follow the snapshot version in consecutive order with no gaps.");
}
var parsedEvent = ParseKnownEvent(@event);
var parsedEvent = eventDataFormatter.ParseIfKnown(@event);
if (applyEvent != null && parsedEvent != null)
{
@ -210,18 +209,6 @@ namespace Squidex.Infrastructure.States
return persistenceMode == PersistenceMode.EventSourcing || persistenceMode == PersistenceMode.SnapshotsAndEventSourcing;
}
private Envelope<IEvent>? ParseKnownEvent(StoredEvent storedEvent)
{
try
{
return eventDataFormatter.Parse(storedEvent.Data);
}
catch (TypeNameNotFoundException)
{
return null;
}
}
private void UpdateVersion()
{
if (persistenceMode == PersistenceMode.Snapshots)

3
backend/src/Squidex/Config/Domain/AssetServices.cs

@ -45,6 +45,9 @@ namespace Squidex.Config.Domain
services.AddSingletonAs<AssetQueryParser>()
.AsSelf();
services.AddSingletonAs<RepairFiles>()
.AsSelf();
services.AddTransientAs<AssetHistoryEventsCreator>()
.As<IHistoryEventsCreator>();

5
backend/src/Squidex/appsettings.json

@ -691,6 +691,11 @@
*/
"assets": false,
/*
* Set to true to create dummy asset files if they do not exist. Useful when a backup fail.
*/
"assetFiles": false,
/*
* Set to true to rebuild contents.
*/

132
backend/tests/Squidex.Domain.Apps.Entities.Tests/Assets/RepairFilesTests.cs

@ -0,0 +1,132 @@
// ==========================================================================
// Squidex Headless CMS
// ==========================================================================
// Copyright (c) Squidex UG (haftungsbeschraenkt)
// All rights reserved. Licensed under the MIT license.
// ==========================================================================
using System;
using System.IO;
using System.Threading.Tasks;
using FakeItEasy;
using Squidex.Domain.Apps.Events.Assets;
using Squidex.Infrastructure;
using Squidex.Infrastructure.Assets;
using Squidex.Infrastructure.EventSourcing;
using Xunit;
namespace Squidex.Domain.Apps.Entities.Assets
{
public class RepairFilesTests
{
private readonly IEventStore eventStore = A.Fake<IEventStore>();
private readonly IEventDataFormatter eventDataFormatter = A.Fake<IEventDataFormatter>();
private readonly IAssetFileStore assetFileStore = A.Fake<IAssetFileStore>();
private readonly NamedId<DomainId> appId = NamedId.Of(DomainId.NewGuid(), "my-app");
private readonly RepairFiles sut;
public RepairFilesTests()
{
sut = new RepairFiles(assetFileStore, eventStore, eventDataFormatter);
}
[Fact]
public async Task Should_repair_created_asset_if_not_found()
{
var @event = new AssetCreated { AppId = appId, AssetId = DomainId.NewGuid() };
SetupEvent(@event);
A.CallTo(() => assetFileStore.GetFileSizeAsync(appId.Id, @event.AssetId, 0, default))
.Throws(new AssetNotFoundException("file"));
await sut.RepairAsync();
A.CallTo(() => assetFileStore.UploadAsync(appId.Id, @event.AssetId, 0, A<Stream>._, default))
.MustHaveHappened();
}
[Fact]
public async Task Should_not_repair_created_asset_if_found()
{
var @event = new AssetCreated { AppId = appId, AssetId = DomainId.NewGuid() };
SetupEvent(@event);
A.CallTo(() => assetFileStore.GetFileSizeAsync(appId.Id, @event.AssetId, 0, default))
.Returns(100);
await sut.RepairAsync();
A.CallTo(() => assetFileStore.UploadAsync(appId.Id, @event.AssetId, 0, A<Stream>._, default))
.MustNotHaveHappened();
}
[Fact]
public async Task Should_repair_updated_asset_if_not_found()
{
var @event = new AssetUpdated { AppId = appId, AssetId = DomainId.NewGuid(), FileVersion = 3 };
SetupEvent(@event);
A.CallTo(() => assetFileStore.GetFileSizeAsync(appId.Id, @event.AssetId, 3, default))
.Throws(new AssetNotFoundException("file"));
await sut.RepairAsync();
A.CallTo(() => assetFileStore.UploadAsync(appId.Id, @event.AssetId, 3, A<Stream>._, default))
.MustHaveHappened();
}
[Fact]
public async Task Should_not_repair_updated_asset_if_found()
{
var @event = new AssetUpdated { AppId = appId, AssetId = DomainId.NewGuid(), FileVersion = 3 };
SetupEvent(@event);
A.CallTo(() => assetFileStore.GetFileSizeAsync(appId.Id, @event.AssetId, 3, default))
.Returns(100);
await sut.RepairAsync();
A.CallTo(() => assetFileStore.UploadAsync(appId.Id, @event.AssetId, 3, A<Stream>._, default))
.MustNotHaveHappened();
}
[Fact]
public async Task Should_ignore_old_events()
{
SetupEvent(null);
await sut.RepairAsync();
A.CallTo(() => assetFileStore.GetFileSizeAsync(A<DomainId>._, A<DomainId>._, A<long>._, default))
.MustNotHaveHappened();
}
private void SetupEvent(IEvent? @event)
{
var storedEvent = new StoredEvent("stream", "0", -1, new EventData("type", new EnvelopeHeaders(), "payload"));
if (@event != null)
{
A.CallTo(() => eventDataFormatter.ParseIfKnown(storedEvent))
.Returns(Envelope.Create(@event));
}
else
{
A.CallTo(() => eventDataFormatter.ParseIfKnown(storedEvent))
.Returns(null);
}
A.CallTo(() => eventStore.QueryAsync(A<Func<StoredEvent, Task>>._, "^asset\\-", null, default))
.Invokes(x =>
{
var callback = x.GetArgument<Func<StoredEvent, Task>>(0)!;
callback(storedEvent).Wait();
});
}
}
}

9
backend/tests/Squidex.Infrastructure.Tests/EventSourcing/DefaultEventDataFormatterTests.cs

@ -54,8 +54,9 @@ namespace Squidex.Infrastructure.EventSourcing
inputEvent.SetTimestamp(SystemClock.Instance.GetCurrentInstant());
var eventData = sut.ToEventData(inputEvent, commitId);
var eventStored = new StoredEvent("stream", "0", -1, eventData);
var outputEvent = sut.Parse(eventData).To<MyEvent>();
var outputEvent = sut.Parse(eventStored).To<MyEvent>();
AssertHeaders(inputEvent.Headers, outputEvent.Headers);
AssertPayload(inputEvent, outputEvent);
@ -67,8 +68,9 @@ namespace Squidex.Infrastructure.EventSourcing
var inputEvent = new Envelope<MyOldEvent>(new MyOldEvent { MyProperty = "My-Property" });
var eventData = sut.ToEventData(inputEvent, Guid.NewGuid());
var eventStored = new StoredEvent("stream", "0", -1, eventData);
var outputEvent = sut.Parse(eventData).To<MyEvent>();
var outputEvent = sut.Parse(eventStored).To<MyEvent>();
Assert.Equal(inputEvent.Payload.MyProperty, outputEvent.Payload.MyProperty);
}
@ -79,8 +81,9 @@ namespace Squidex.Infrastructure.EventSourcing
var inputEvent = new Envelope<MyOldEvent>(new MyOldEvent { MyProperty = "My-Property" });
var eventData = sut.ToEventData(inputEvent, Guid.NewGuid(), false);
var eventStored = new StoredEvent("stream", "0", -1, eventData);
var outputEvent = sut.Parse(eventData).To<MyEvent>();
var outputEvent = sut.Parse(eventStored).To<MyEvent>();
Assert.Equal(inputEvent.Payload.MyProperty, outputEvent.Payload.MyProperty);
}

9
backend/tests/Squidex.Infrastructure.Tests/EventSourcing/Grains/EventConsumerGrainTests.cs

@ -12,7 +12,6 @@ using FakeItEasy;
using FluentAssertions;
using Squidex.Infrastructure.Log;
using Squidex.Infrastructure.Orleans;
using Squidex.Infrastructure.Reflection;
using Squidex.Infrastructure.TestHelpers;
using Xunit;
@ -96,7 +95,7 @@ namespace Squidex.Infrastructure.EventSourcing.Grains
A.CallTo(() => eventSubscription.Sender)
.Returns(eventSubscription);
A.CallTo(() => formatter.Parse(eventData))
A.CallTo(() => formatter.ParseIfKnown(A<StoredEvent>.That.Matches(x => x.Data == eventData)))
.Returns(envelope);
sut = new MyEventConsumerGrain(
@ -318,8 +317,8 @@ namespace Squidex.Infrastructure.EventSourcing.Grains
[Fact]
public async Task Should_ignore_old_events()
{
A.CallTo(() => formatter.Parse(eventData))
.Throws(new TypeNameNotFoundException());
A.CallTo(() => formatter.ParseIfKnown(A<StoredEvent>.That.Matches(x => x.Data == eventData)))
.Returns(null);
var @event = new StoredEvent("Stream", Guid.NewGuid().ToString(), 123, eventData);
@ -466,7 +465,7 @@ namespace Squidex.Infrastructure.EventSourcing.Grains
{
var ex = new InvalidOperationException();
A.CallTo(() => formatter.Parse(eventData))
A.CallTo(() => formatter.ParseIfKnown(A<StoredEvent>.That.Matches(x => x.Data == eventData)))
.Throws(ex);
var @event = new StoredEvent("Stream", Guid.NewGuid().ToString(), 123, eventData);

10
backend/tests/Squidex.Infrastructure.Tests/States/PersistenceEventSourcingTests.cs

@ -11,7 +11,6 @@ using System.Linq;
using System.Threading.Tasks;
using FakeItEasy;
using Squidex.Infrastructure.EventSourcing;
using Squidex.Infrastructure.Reflection;
using Squidex.Infrastructure.TestHelpers;
using Xunit;
@ -66,8 +65,8 @@ namespace Squidex.Infrastructure.States
A.CallTo(() => eventStore.QueryAsync(key, 0))
.Returns(new List<StoredEvent> { storedEvent });
A.CallTo(() => eventDataFormatter.Parse(storedEvent.Data))
.Throws(new TypeNameNotFoundException());
A.CallTo(() => eventDataFormatter.ParseIfKnown(storedEvent))
.Returns(null);
var persistedEvents = new List<IEvent>();
var persistence = sut.WithEventSourcing(None.Type, key, x => persistedEvents.Add(x.Payload));
@ -274,7 +273,10 @@ namespace Squidex.Infrastructure.States
eventsStored.Add(eventStored);
A.CallTo(() => eventDataFormatter.Parse(eventData))
A.CallTo(() => eventDataFormatter.Parse(eventStored))
.Returns(new Envelope<IEvent>(@event));
A.CallTo(() => eventDataFormatter.ParseIfKnown(eventStored))
.Returns(new Envelope<IEvent>(@event));
i++;

Loading…
Cancel
Save