Browse Source

Tests for infrastructure.

pull/249/head
Sebastian Stehle 8 years ago
parent
commit
92a5832ddf
  1. 1
      src/Squidex.Domain.Apps.Entities/AppProvider.cs
  2. 158
      src/Squidex.Domain.Apps.Entities/Contents/ContentCommandMiddleware.cs
  3. 126
      src/Squidex.Domain.Apps.Entities/Contents/ContentDomainObject.cs
  4. 232
      src/Squidex.Domain.Apps.Entities/Contents/ContentGrain.cs
  5. 18
      src/Squidex.Domain.Apps.Entities/Contents/ContentOperationContext.cs
  6. 6
      src/Squidex.Domain.Apps.Entities/Contents/IContentGrain.cs
  7. 26
      src/Squidex.Domain.Apps.Entities/SquidexDomainObjectBase.cs
  8. 149
      src/Squidex.Infrastructure/Commands/AggregateHandler.cs
  9. 21
      src/Squidex.Infrastructure/Commands/CommandExtensions.cs
  10. 104
      src/Squidex.Infrastructure/Commands/DomainObjectBase.cs
  11. 16
      src/Squidex.Infrastructure/Commands/DomainObjectGrain.cs
  12. 23
      src/Squidex.Infrastructure/Commands/IAggregateHandler.cs
  13. 20
      src/Squidex.Infrastructure/Commands/IDomainObject.cs
  14. 34
      src/Squidex.Infrastructure/EventSourcing/Grains/EventConsumerGrain.cs
  15. 24
      src/Squidex.Infrastructure/EventSourcing/Grains/EventConsumerManagerGrain.cs
  16. 3
      src/Squidex.Infrastructure/EventSourcing/Grains/IEventConsumerGrain.cs
  17. 2
      src/Squidex.Infrastructure/EventSourcing/Grains/IEventConsumerManagerGrain.cs
  18. 2
      src/Squidex.Infrastructure/EventSourcing/Grains/OrleansEventNotifier.cs
  19. 4
      src/Squidex.Infrastructure/EventSourcing/IEventNotifier.cs
  20. 6
      src/Squidex.Infrastructure/Orleans/J{T}.cs
  21. 31
      src/Squidex.Infrastructure/States/IStateFactory.cs
  22. 16
      src/Squidex.Infrastructure/States/IStatefulObject.cs
  23. 14
      src/Squidex.Infrastructure/States/InvalidateMessage.cs
  24. 158
      src/Squidex.Infrastructure/States/StateFactory.cs
  25. 5
      src/Squidex/Config/Domain/ReadServices.cs
  26. 7
      src/Squidex/Config/Domain/WriteServices.cs
  27. 27
      src/Squidex/Pipeline/CommandMiddlewares/EnrichWithAppIdCommandMiddleware.cs
  28. 48
      src/Squidex/Pipeline/CommandMiddlewares/EnrichWithSchemaIdCommandMiddleware.cs
  29. 284
      tests/Squidex.Infrastructure.Tests/Commands/AggregateHandlerTests.cs
  30. 88
      tests/Squidex.Infrastructure.Tests/Commands/DomainObjectBaseTests.cs
  31. 284
      tests/Squidex.Infrastructure.Tests/Commands/DomainObjectGrainTests.cs
  32. 49
      tests/Squidex.Infrastructure.Tests/EventSourcing/DefaultEventNotifierTests.cs
  33. 173
      tests/Squidex.Infrastructure.Tests/EventSourcing/Grains/EventConsumerGrainTests.cs
  34. 162
      tests/Squidex.Infrastructure.Tests/EventSourcing/Grains/EventConsumerManagerGrainTests.cs
  35. 120
      tests/Squidex.Infrastructure.Tests/EventSourcing/Grains/EventConsumerManagerTests.cs
  36. 39
      tests/Squidex.Infrastructure.Tests/EventSourcing/Grains/OrleansEventNotifierTests.cs
  37. 28
      tests/Squidex.Infrastructure.Tests/EventSourcing/PollingSubscriptionTests.cs
  38. 39
      tests/Squidex.Infrastructure.Tests/Orleans/BootstrapTests.cs
  39. 80
      tests/Squidex.Infrastructure.Tests/Orleans/JsonExternalSerializerTests.cs
  40. 186
      tests/Squidex.Infrastructure.Tests/States/PersistenceEventSourcingTests.cs
  41. 171
      tests/Squidex.Infrastructure.Tests/States/PersistenceSnapshotTests.cs
  42. 145
      tests/Squidex.Infrastructure.Tests/States/StateFactoryTests.cs
  43. 38
      tests/Squidex.Infrastructure.Tests/Tasks/AsyncLockPoolTests.cs
  44. 38
      tests/Squidex.Infrastructure.Tests/Tasks/AsyncLockTests.cs
  45. 4
      tests/Squidex.Infrastructure.Tests/TestHelpers/MyCommand.cs
  46. 1
      tools/Migrate_01/Migrations/AddPatterns.cs

1
src/Squidex.Domain.Apps.Entities/AppProvider.cs

@ -17,7 +17,6 @@ using Squidex.Domain.Apps.Entities.Rules.Repositories;
using Squidex.Domain.Apps.Entities.Schemas;
using Squidex.Domain.Apps.Entities.Schemas.Repositories;
using Squidex.Infrastructure;
using Squidex.Infrastructure.States;
namespace Squidex.Domain.Apps.Entities
{

158
src/Squidex.Domain.Apps.Entities/Contents/ContentCommandMiddleware.cs

@ -1,158 +0,0 @@
// ==========================================================================
// Squidex Headless CMS
// ==========================================================================
// Copyright (c) Squidex UG (haftungsbeschränkt)
// All rights reserved. Licensed under the MIT license.
// ==========================================================================
using System;
using System.Threading.Tasks;
using Squidex.Domain.Apps.Core.Scripting;
using Squidex.Domain.Apps.Entities.Assets.Repositories;
using Squidex.Domain.Apps.Entities.Contents.Commands;
using Squidex.Domain.Apps.Entities.Contents.Guards;
using Squidex.Domain.Apps.Entities.Contents.Repositories;
using Squidex.Infrastructure;
using Squidex.Infrastructure.Commands;
using Squidex.Infrastructure.Dispatching;
namespace Squidex.Domain.Apps.Entities.Contents
{
public class ContentCommandMiddleware : ICommandMiddleware
{
private readonly IAggregateHandler handler;
private readonly IAppProvider appProvider;
private readonly IAssetRepository assetRepository;
private readonly IContentRepository contentRepository;
private readonly IScriptEngine scriptEngine;
public ContentCommandMiddleware(
IAggregateHandler handler,
IAppProvider appProvider,
IAssetRepository assetRepository,
IScriptEngine scriptEngine,
IContentRepository contentRepository)
{
Guard.NotNull(handler, nameof(handler));
Guard.NotNull(appProvider, nameof(appProvider));
Guard.NotNull(scriptEngine, nameof(scriptEngine));
Guard.NotNull(assetRepository, nameof(assetRepository));
Guard.NotNull(contentRepository, nameof(contentRepository));
this.handler = handler;
this.appProvider = appProvider;
this.scriptEngine = scriptEngine;
this.assetRepository = assetRepository;
this.contentRepository = contentRepository;
}
protected async Task On(CreateContent command, CommandContext context)
{
await handler.CreateAsync<ContentDomainObject>(context, async content =>
{
GuardContent.CanCreate(command);
var operationContext = await CreateContext(command, content, () => "Failed to create content.");
if (command.Publish)
{
await operationContext.ExecuteScriptAsync(x => x.ScriptChange, "Published");
}
await operationContext.ExecuteScriptAndTransformAsync(x => x.ScriptCreate, "Create");
await operationContext.EnrichAsync();
await operationContext.ValidateAsync(false);
content.Create(command);
context.Complete(EntityCreatedResult.Create(command.Data, content.Version));
});
}
protected async Task On(UpdateContent command, CommandContext context)
{
await handler.UpdateAsync<ContentDomainObject>(context, async content =>
{
GuardContent.CanUpdate(command);
var operationContext = await CreateContext(command, content, () => "Failed to update content.");
await operationContext.ValidateAsync(true);
await operationContext.ExecuteScriptAndTransformAsync(x => x.ScriptUpdate, "Update");
content.Update(command);
context.Complete(new ContentDataChangedResult(content.Snapshot.Data, content.Version));
});
}
protected async Task On(PatchContent command, CommandContext context)
{
await handler.UpdateAsync<ContentDomainObject>(context, async content =>
{
GuardContent.CanPatch(command);
var operationContext = await CreateContext(command, content, () => "Failed to patch content.");
await operationContext.ValidateAsync(true);
await operationContext.ExecuteScriptAndTransformAsync(x => x.ScriptUpdate, "Patch");
content.Patch(command);
context.Complete(new ContentDataChangedResult(content.Snapshot.Data, content.Version));
});
}
protected Task On(ChangeContentStatus command, CommandContext context)
{
return handler.UpdateAsync<ContentDomainObject>(context, async content =>
{
GuardContent.CanChangeContentStatus(content.Snapshot.Status, command);
if (!command.DueTime.HasValue)
{
var operationContext = await CreateContext(command, content, () => "Failed to patch content.");
await operationContext.ExecuteScriptAsync(x => x.ScriptChange, command.Status);
}
content.ChangeStatus(command);
});
}
protected Task On(DeleteContent command, CommandContext context)
{
return handler.UpdateAsync<ContentDomainObject>(context, async content =>
{
GuardContent.CanDelete(command);
var operationContext = await CreateContext(command, content, () => "Failed to delete content.");
await operationContext.ExecuteScriptAsync(x => x.ScriptDelete, "Delete");
content.Delete(command);
});
}
public async Task HandleAsync(CommandContext context, Func<Task> next)
{
await this.DispatchActionAsync(context.Command, context);
await next();
}
private async Task<ContentOperationContext> CreateContext(ContentCommand command, ContentDomainObject content, Func<string> message)
{
var operationContext =
await ContentOperationContext.CreateAsync(
contentRepository,
content,
command,
appProvider,
assetRepository,
scriptEngine,
message);
return operationContext;
}
}
}

126
src/Squidex.Domain.Apps.Entities/Contents/ContentDomainObject.cs

@ -1,126 +0,0 @@
// ==========================================================================
// Squidex Headless CMS
// ==========================================================================
// Copyright (c) Squidex UG (haftungsbeschränkt)
// All rights reserved. Licensed under the MIT license.
// ==========================================================================
using Squidex.Domain.Apps.Core.Contents;
using Squidex.Domain.Apps.Entities.Contents.Commands;
using Squidex.Domain.Apps.Entities.Contents.State;
using Squidex.Domain.Apps.Events;
using Squidex.Domain.Apps.Events.Contents;
using Squidex.Infrastructure;
using Squidex.Infrastructure.EventSourcing;
using Squidex.Infrastructure.Reflection;
namespace Squidex.Domain.Apps.Entities.Contents
{
public sealed class ContentDomainObject : SquidexDomainObjectBase<ContentState>
{
public ContentDomainObject Create(CreateContent command)
{
VerifyNotCreated();
RaiseEvent(SimpleMapper.Map(command, new ContentCreated()));
if (command.Publish)
{
RaiseEvent(SimpleMapper.Map(command, new ContentStatusChanged { Status = Status.Published }));
}
return this;
}
public ContentDomainObject Delete(DeleteContent command)
{
VerifyCreatedAndNotDeleted();
RaiseEvent(SimpleMapper.Map(command, new ContentDeleted()));
return this;
}
public ContentDomainObject ChangeStatus(ChangeContentStatus command)
{
VerifyCreatedAndNotDeleted();
if (command.DueTime.HasValue)
{
RaiseEvent(SimpleMapper.Map(command, new ContentStatusScheduled { DueTime = command.DueTime.Value }));
}
else
{
RaiseEvent(SimpleMapper.Map(command, new ContentStatusChanged()));
}
return this;
}
public ContentDomainObject Update(UpdateContent command)
{
VerifyCreatedAndNotDeleted();
if (!command.Data.Equals(Snapshot.Data))
{
RaiseEvent(SimpleMapper.Map(command, new ContentUpdated()));
}
return this;
}
public ContentDomainObject Patch(PatchContent command)
{
VerifyCreatedAndNotDeleted();
var newData = command.Data.MergeInto(Snapshot.Data);
if (!newData.Equals(Snapshot.Data))
{
var @event = SimpleMapper.Map(command, new ContentUpdated());
@event.Data = newData;
RaiseEvent(@event);
}
return this;
}
private void RaiseEvent(SchemaEvent @event)
{
if (@event.AppId == null)
{
@event.AppId = Snapshot.AppId;
}
if (@event.SchemaId == null)
{
@event.SchemaId = Snapshot.SchemaId;
}
RaiseEvent(Envelope.Create(@event));
}
private void VerifyNotCreated()
{
if (Snapshot.Data != null)
{
throw new DomainException("Content has already been created.");
}
}
private void VerifyCreatedAndNotDeleted()
{
if (Snapshot.IsDeleted || Snapshot.Data == null)
{
throw new DomainException("Content has already been deleted or not created yet.");
}
}
public override void ApplyEvent(Envelope<IEvent> @event)
{
ApplySnapshot(Snapshot.Apply(@event));
}
}
}

232
src/Squidex.Domain.Apps.Entities/Contents/ContentGrain.cs

@ -0,0 +1,232 @@
// ==========================================================================
// Squidex Headless CMS
// ==========================================================================
// Copyright (c) Squidex UG (haftungsbeschränkt)
// All rights reserved. Licensed under the MIT license.
// ==========================================================================
using System;
using System.Threading.Tasks;
using Squidex.Domain.Apps.Core.Contents;
using Squidex.Domain.Apps.Core.Scripting;
using Squidex.Domain.Apps.Entities.Assets.Repositories;
using Squidex.Domain.Apps.Entities.Contents.Commands;
using Squidex.Domain.Apps.Entities.Contents.Guards;
using Squidex.Domain.Apps.Entities.Contents.Repositories;
using Squidex.Domain.Apps.Entities.Contents.State;
using Squidex.Domain.Apps.Events;
using Squidex.Domain.Apps.Events.Contents;
using Squidex.Infrastructure;
using Squidex.Infrastructure.Commands;
using Squidex.Infrastructure.EventSourcing;
using Squidex.Infrastructure.Reflection;
using Squidex.Infrastructure.States;
namespace Squidex.Domain.Apps.Entities.Contents
{
public sealed class ContentGrain : DomainObjectGrain<ContentState>, IContentGrain
{
private readonly IAppProvider appProvider;
private readonly IAssetRepository assetRepository;
private readonly IContentRepository contentRepository;
private readonly IScriptEngine scriptEngine;
public ContentGrain(
IStore<Guid> store,
IAppProvider appProvider,
IAssetRepository assetRepository,
IScriptEngine scriptEngine,
IContentRepository contentRepository)
: base(store)
{
Guard.NotNull(appProvider, nameof(appProvider));
Guard.NotNull(scriptEngine, nameof(scriptEngine));
Guard.NotNull(assetRepository, nameof(assetRepository));
Guard.NotNull(contentRepository, nameof(contentRepository));
this.appProvider = appProvider;
this.scriptEngine = scriptEngine;
this.assetRepository = assetRepository;
this.contentRepository = contentRepository;
}
public override Task<object> ExecuteAsync(IAggregateCommand command)
{
VerifyNotDeleted();
switch (command)
{
case CreateContent createContent:
return CreateReturnAsync(createContent, async c =>
{
GuardContent.CanCreate(c);
var operationContext = await CreateContext(c, () => "Failed to create content.");
if (c.Publish)
{
await operationContext.ExecuteScriptAsync(x => x.ScriptChange, "Published");
}
await operationContext.ExecuteScriptAndTransformAsync(x => x.ScriptCreate, "Create");
await operationContext.EnrichAsync();
await operationContext.ValidateAsync(false);
Create(c);
return EntityCreatedResult.Create(c.Data, Version);
});
case UpdateContent updateContent:
return UpdateReturnAsync(updateContent, async c =>
{
GuardContent.CanUpdate(c);
var operationContext = await CreateContext(c, () => "Failed to update content.");
await operationContext.ValidateAsync(true);
await operationContext.ExecuteScriptAndTransformAsync(x => x.ScriptUpdate, "Update");
Update(c);
return new ContentDataChangedResult(Snapshot.Data, Version);
});
case PatchContent patchContent:
return UpdateReturnAsync(patchContent, async c =>
{
GuardContent.CanPatch(c);
var operationContext = await CreateContext(c, () => "Failed to patch content.");
await operationContext.ValidateAsync(true);
await operationContext.ExecuteScriptAndTransformAsync(x => x.ScriptUpdate, "Patch");
Patch(c);
return new ContentDataChangedResult(Snapshot.Data, Version);
});
case ChangeContentStatus patchContent:
return UpdateAsync(patchContent, async c =>
{
GuardContent.CanChangeContentStatus(Snapshot.Status, c);
if (!c.DueTime.HasValue)
{
var operationContext = await CreateContext(c, () => "Failed to patch content.");
await operationContext.ExecuteScriptAsync(x => x.ScriptChange, c.Status);
}
ChangeStatus(c);
});
case DeleteContent deleteContent:
return UpdateAsync(deleteContent, async c =>
{
GuardContent.CanDelete(c);
var operationContext = await CreateContext(c, () => "Failed to delete content.");
await operationContext.ExecuteScriptAsync(x => x.ScriptDelete, "Delete");
Delete(c);
});
default:
throw new NotSupportedException();
}
}
public void Create(CreateContent command)
{
RaiseEvent(SimpleMapper.Map(command, new ContentCreated()));
if (command.Publish)
{
RaiseEvent(SimpleMapper.Map(command, new ContentStatusChanged { Status = Status.Published }));
}
}
public void Update(UpdateContent command)
{
if (!command.Data.Equals(Snapshot.Data))
{
RaiseEvent(SimpleMapper.Map(command, new ContentUpdated()));
}
}
public void ChangeStatus(ChangeContentStatus command)
{
if (command.DueTime.HasValue)
{
RaiseEvent(SimpleMapper.Map(command, new ContentStatusScheduled { DueTime = command.DueTime.Value }));
}
else
{
RaiseEvent(SimpleMapper.Map(command, new ContentStatusChanged()));
}
}
public void Patch(PatchContent command)
{
var newData = command.Data.MergeInto(Snapshot.Data);
if (!newData.Equals(Snapshot.Data))
{
var @event = SimpleMapper.Map(command, new ContentUpdated());
@event.Data = newData;
RaiseEvent(@event);
}
}
public void Delete(DeleteContent command)
{
RaiseEvent(SimpleMapper.Map(command, new ContentDeleted()));
}
private void RaiseEvent(SchemaEvent @event)
{
if (@event.AppId == null)
{
@event.AppId = Snapshot.AppId;
}
if (@event.SchemaId == null)
{
@event.SchemaId = Snapshot.SchemaId;
}
RaiseEvent(Envelope.Create(@event));
}
private void VerifyNotDeleted()
{
if (Snapshot.IsDeleted)
{
throw new DomainException("Content has already been deleted.");
}
}
public override void ApplyEvent(Envelope<IEvent> @event)
{
ApplySnapshot(Snapshot.Apply(@event));
}
private async Task<ContentOperationContext> CreateContext(ContentCommand command, Func<string> message)
{
var operationContext =
await ContentOperationContext.CreateAsync(command, Snapshot,
contentRepository,
appProvider,
assetRepository,
scriptEngine,
message);
return operationContext;
}
}
}

18
src/Squidex.Domain.Apps.Entities/Contents/ContentOperationContext.cs

@ -24,9 +24,9 @@ namespace Squidex.Domain.Apps.Entities.Contents
{
public sealed class ContentOperationContext
{
private ContentDomainObject content;
private ContentCommand command;
private IContentRepository contentRepository;
private IContentEntity content;
private IAssetRepository assetRepository;
private IScriptEngine scriptEngine;
private ISchemaEntity schemaEntity;
@ -34,16 +34,16 @@ namespace Squidex.Domain.Apps.Entities.Contents
private Func<string> message;
public static async Task<ContentOperationContext> CreateAsync(
IContentRepository contentRepository,
ContentDomainObject content,
ContentCommand command,
IContentEntity content,
IContentRepository contentRepository,
IAppProvider appProvider,
IAssetRepository assetRepository,
IScriptEngine scriptEngine,
Func<string> message)
{
var a = content.Snapshot.AppId;
var s = content.Snapshot.SchemaId;
var a = content.AppId;
var s = content.SchemaId;
if (command is CreateContent createContent)
{
@ -88,11 +88,11 @@ namespace Squidex.Domain.Apps.Entities.Contents
new ValidationContext(
(contentIds, schemaId) =>
{
return QueryContentsAsync(content.Snapshot.AppId.Id, schemaId, contentIds);
return QueryContentsAsync(content.AppId.Id, schemaId, contentIds);
},
assetIds =>
{
return QueryAssetsAsync(content.Snapshot.AppId.Id, assetIds);
return QueryAssetsAsync(content.AppId.Id, assetIds);
});
if (partial)
@ -125,7 +125,7 @@ namespace Squidex.Domain.Apps.Entities.Contents
{
if (command is ContentDataCommand dataCommand)
{
var ctx = new ScriptContext { ContentId = content.Snapshot.Id, OldData = content.Snapshot.Data, User = command.User, Operation = operation.ToString(), Data = dataCommand.Data };
var ctx = new ScriptContext { ContentId = content.Id, OldData = content.Data, User = command.User, Operation = operation.ToString(), Data = dataCommand.Data };
dataCommand.Data = scriptEngine.ExecuteAndTransform(ctx, script(schemaEntity));
}
@ -135,7 +135,7 @@ namespace Squidex.Domain.Apps.Entities.Contents
public Task ExecuteScriptAsync(Func<ISchemaEntity, string> script, object operation)
{
var ctx = new ScriptContext { ContentId = content.Snapshot.Id, OldData = content.Snapshot.Data, User = command.User, Operation = operation.ToString() };
var ctx = new ScriptContext { ContentId = content.Id, OldData = content.Data, User = command.User, Operation = operation.ToString() };
scriptEngine.Execute(ctx, script(schemaEntity));

6
tests/Squidex.Infrastructure.Tests/TestHelpers/MyDomainObject.cs → src/Squidex.Domain.Apps.Entities/Contents/IContentGrain.cs

@ -1,15 +1,15 @@
// ==========================================================================
// Squidex Headless CMS
// ==========================================================================
// Copyright (c) Squidex UG (haftungsbeschränkt)
// Copyright (c) Squidex UG (haftungsbeschraenkt)
// All rights reserved. Licensed under the MIT license.
// ==========================================================================
using Squidex.Infrastructure.Commands;
namespace Squidex.Infrastructure.TestHelpers
namespace Squidex.Domain.Apps.Entities.Contents
{
internal sealed class MyDomainObject : DomainObjectBase<MyDomainState>
public interface IContentGrain : IDomainObjectGrain
{
}
}

26
src/Squidex.Domain.Apps.Entities/SquidexDomainObjectBase.cs

@ -1,26 +0,0 @@
// ==========================================================================
// Squidex Headless CMS
// ==========================================================================
// Copyright (c) Squidex UG (haftungsbeschraenkt)
// All rights reserved. Licensed under the MIT license.
// ==========================================================================
using Squidex.Domain.Apps.Events;
using Squidex.Infrastructure.Commands;
using Squidex.Infrastructure.EventSourcing;
namespace Squidex.Domain.Apps.Entities
{
public abstract class SquidexDomainObjectBase<T> : DomainObjectBase<T> where T : IDomainState, new()
{
public override void RaiseEvent(Envelope<IEvent> @event)
{
if (@event.Payload is AppEvent appEvent)
{
@event.SetAppId(appEvent.AppId.Id);
}
base.RaiseEvent(@event);
}
}
}

149
src/Squidex.Infrastructure/Commands/AggregateHandler.cs

@ -1,149 +0,0 @@
// ==========================================================================
// Squidex Headless CMS
// ==========================================================================
// Copyright (c) Squidex UG (haftungsbeschränkt)
// All rights reserved. Licensed under the MIT license.
// ==========================================================================
using System;
using System.Threading.Tasks;
using Squidex.Infrastructure.States;
using Squidex.Infrastructure.Tasks;
namespace Squidex.Infrastructure.Commands
{
public sealed class AggregateHandler : IAggregateHandler
{
private readonly AsyncLockPool lockPool = new AsyncLockPool(10000);
private readonly IStateFactory stateFactory;
private readonly IServiceProvider serviceProvider;
public AggregateHandler(IStateFactory stateFactory, IServiceProvider serviceProvider)
{
Guard.NotNull(stateFactory, nameof(stateFactory));
Guard.NotNull(serviceProvider, nameof(serviceProvider));
this.stateFactory = stateFactory;
this.serviceProvider = serviceProvider;
}
public Task<T> CreateAsync<T>(CommandContext context, Func<T, Task> creator) where T : class, IDomainObject
{
Guard.NotNull(creator, nameof(creator));
return InvokeAsync(context, creator, false);
}
public Task<T> UpdateAsync<T>(CommandContext context, Func<T, Task> updater) where T : class, IDomainObject
{
Guard.NotNull(updater, nameof(updater));
return InvokeAsync(context, updater, true);
}
public Task<T> CreateSyncedAsync<T>(CommandContext context, Func<T, Task> creator) where T : class, IDomainObject
{
Guard.NotNull(creator, nameof(creator));
return InvokeSyncedAsync(context, creator, false);
}
public Task<T> UpdateSyncedAsync<T>(CommandContext context, Func<T, Task> updater) where T : class, IDomainObject
{
Guard.NotNull(updater, nameof(updater));
return InvokeSyncedAsync(context, updater, true);
}
private async Task<T> InvokeAsync<T>(CommandContext context, Func<T, Task> handler, bool isUpdate) where T : class, IDomainObject
{
Guard.NotNull(context, nameof(context));
var domainCommand = GetCommand(context);
var domainObjectId = domainCommand.AggregateId;
var domainObject = await stateFactory.CreateAsync<T>(domainObjectId);
if (domainCommand.ExpectedVersion != EtagVersion.Any && domainCommand.ExpectedVersion != domainObject.Version)
{
throw new DomainObjectVersionException(domainObjectId.ToString(), typeof(T), domainObject.Version, domainCommand.ExpectedVersion);
}
await handler(domainObject);
await domainObject.WriteAsync();
if (!context.IsCompleted)
{
if (isUpdate)
{
context.Complete(new EntitySavedResult(domainObject.Version));
}
else
{
context.Complete(EntityCreatedResult.Create(domainObjectId, domainObject.Version));
}
}
return domainObject;
}
private async Task<T> InvokeSyncedAsync<T>(CommandContext context, Func<T, Task> handler, bool isUpdate) where T : class, IDomainObject
{
Guard.NotNull(context, nameof(context));
var domainCommand = GetCommand(context);
var domainObjectId = domainCommand.AggregateId;
using (await lockPool.LockAsync(Tuple.Create(typeof(T), domainObjectId)))
{
var domainObject = await stateFactory.GetSingleAsync<T>(domainObjectId);
if (domainCommand.ExpectedVersion != EtagVersion.Any && domainCommand.ExpectedVersion != domainObject.Version)
{
throw new DomainObjectVersionException(domainObjectId.ToString(), typeof(T), domainObject.Version, domainCommand.ExpectedVersion);
}
await handler(domainObject);
try
{
await domainObject.WriteAsync();
stateFactory.Synchronize<T, Guid>(domainObjectId);
}
catch
{
stateFactory.Remove<T, Guid>(domainObjectId);
throw;
}
if (!context.IsCompleted)
{
if (isUpdate)
{
context.Complete(new EntitySavedResult(domainObject.Version));
}
else
{
context.Complete(EntityCreatedResult.Create(domainObjectId, domainObject.Version));
}
}
return domainObject;
}
}
private static IAggregateCommand GetCommand(CommandContext context)
{
if (!(context.Command is IAggregateCommand command))
{
throw new ArgumentException("Context must have an aggregate command.", nameof(context));
}
Guard.NotEmpty(command.AggregateId, "context.Command.AggregateId");
return command;
}
}
}

21
src/Squidex.Infrastructure/Commands/CommandExtensions.cs

@ -5,7 +5,6 @@
// All rights reserved. Licensed under the MIT license.
// ==========================================================================
using System;
using System.Threading.Tasks;
using Squidex.Infrastructure.Tasks;
@ -13,26 +12,6 @@ namespace Squidex.Infrastructure.Commands
{
public static class CommandExtensions
{
public static Task<T> CreateAsync<T>(this IAggregateHandler handler, CommandContext context, Action<T> creator) where T : class, IDomainObject
{
return handler.CreateAsync(context, creator.ToAsync());
}
public static Task<T> UpdateAsync<T>(this IAggregateHandler handler, CommandContext context, Action<T> updater) where T : class, IDomainObject
{
return handler.UpdateAsync(context, updater.ToAsync());
}
public static Task<T> CreateSyncedAsync<T>(this IAggregateHandler handler, CommandContext context, Action<T> creator) where T : class, IDomainObject
{
return handler.CreateSyncedAsync(context, creator.ToAsync());
}
public static Task<T> UpdateSyncedAsync<T>(this IAggregateHandler handler, CommandContext context, Action<T> updater) where T : class, IDomainObject
{
return handler.UpdateSyncedAsync(context, updater.ToAsync());
}
public static Task HandleAsync(this ICommandMiddleware commandMiddleware, CommandContext context)
{
return commandMiddleware.HandleAsync(context, () => TaskHelper.Done);

104
src/Squidex.Infrastructure/Commands/DomainObjectBase.cs

@ -1,104 +0,0 @@
// ==========================================================================
// Squidex Headless CMS
// ==========================================================================
// Copyright (c) Squidex UG (haftungsbeschränkt)
// All rights reserved. Licensed under the MIT license.
// ==========================================================================
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using Squidex.Infrastructure.EventSourcing;
using Squidex.Infrastructure.States;
namespace Squidex.Infrastructure.Commands
{
public abstract class DomainObjectBase<T> : IDomainObject where T : IDomainState, new()
{
private readonly List<Envelope<IEvent>> uncomittedEvents = new List<Envelope<IEvent>>();
private Guid id;
private T snapshot = new T { Version = EtagVersion.Empty };
private IPersistence<T> persistence;
public long Version
{
get { return snapshot.Version; }
}
public T Snapshot
{
get { return snapshot; }
}
public Task ActivateAsync(Guid key, IStore<Guid> store)
{
id = key;
persistence = store.WithSnapshotsAndEventSourcing<T, Guid>(GetType(), key, ApplySnapshot, ApplyEvent);
return persistence.ReadAsync();
}
public void RaiseEvent(IEvent @event)
{
RaiseEvent(Envelope.Create(@event));
}
public virtual void RaiseEvent(Envelope<IEvent> @event)
{
Guard.NotNull(@event, nameof(@event));
@event.SetAggregateId(id);
ApplyEvent(@event);
uncomittedEvents.Add(@event);
}
public IReadOnlyList<Envelope<IEvent>> GetUncomittedEvents()
{
return uncomittedEvents;
}
public void ClearUncommittedEvents()
{
uncomittedEvents.Clear();
}
public virtual void ApplySnapshot(T newSnapshot)
{
snapshot = newSnapshot;
}
public virtual void ApplyEvent(Envelope<IEvent> @event)
{
}
public Task WriteSnapshotAsync()
{
snapshot.Version = persistence.Version;
return persistence.WriteSnapshotAsync(snapshot);
}
public async Task WriteAsync()
{
var events = uncomittedEvents.ToArray();
if (events.Length > 0)
{
try
{
snapshot.Version = persistence.Version + events.Length;
await persistence.WriteEventsAsync(events);
await persistence.WriteSnapshotAsync(snapshot);
}
finally
{
uncomittedEvents.Clear();
}
}
}
}
}

16
src/Squidex.Infrastructure/Commands/DomainObjectGrain.cs

@ -9,6 +9,8 @@ using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using Orleans;
using Orleans.Core;
using Orleans.Runtime;
using Squidex.Infrastructure.EventSourcing;
using Squidex.Infrastructure.Orleans;
using Squidex.Infrastructure.States;
@ -32,12 +34,18 @@ namespace Squidex.Infrastructure.Commands
get { return snapshot.Version + uncomittedEvents.Count; }
}
protected T Snapshot
public T Snapshot
{
get { return snapshot; }
}
protected DomainObjectGrain(IStore<Guid> store)
: this(store, null, null)
{
}
protected DomainObjectGrain(IStore<Guid> store, IGrainIdentity identity, IGrainRuntime runtime)
: base(identity, runtime)
{
Guard.NotNull(store, nameof(store));
@ -194,9 +202,11 @@ namespace Squidex.Infrastructure.Commands
}
}
public Task<J<object>> ExecuteAsync(J<IAggregateCommand> command)
public async Task<J<object>> ExecuteAsync(J<IAggregateCommand> command)
{
return ExecuteAsync(command.Value).ContinueWith(x => x.Result.AsJ());
var result = await ExecuteAsync(command.Value);
return result.AsJ();
}
public abstract Task<object> ExecuteAsync(IAggregateCommand command);

23
src/Squidex.Infrastructure/Commands/IAggregateHandler.cs

@ -1,23 +0,0 @@
// ==========================================================================
// Squidex Headless CMS
// ==========================================================================
// Copyright (c) Squidex UG (haftungsbeschränkt)
// All rights reserved. Licensed under the MIT license.
// ==========================================================================
using System;
using System.Threading.Tasks;
namespace Squidex.Infrastructure.Commands
{
public interface IAggregateHandler
{
Task<T> CreateAsync<T>(CommandContext context, Func<T, Task> creator) where T : class, IDomainObject;
Task<T> CreateSyncedAsync<T>(CommandContext context, Func<T, Task> creator) where T : class, IDomainObject;
Task<T> UpdateAsync<T>(CommandContext context, Func<T, Task> updater) where T : class, IDomainObject;
Task<T> UpdateSyncedAsync<T>(CommandContext context, Func<T, Task> updater) where T : class, IDomainObject;
}
}

20
src/Squidex.Infrastructure/Commands/IDomainObject.cs

@ -1,20 +0,0 @@
// ==========================================================================
// Squidex Headless CMS
// ==========================================================================
// Copyright (c) Squidex UG (haftungsbeschränkt)
// All rights reserved. Licensed under the MIT license.
// ==========================================================================
using System;
using System.Threading.Tasks;
using Squidex.Infrastructure.States;
namespace Squidex.Infrastructure.Commands
{
public interface IDomainObject : IStatefulObject<Guid>
{
long Version { get; }
Task WriteAsync();
}
}

34
src/Squidex.Infrastructure/EventSourcing/Grains/EventConsumerGrain.cs

@ -49,6 +49,7 @@ namespace Squidex.Infrastructure.EventSourcing.Grains
IGrainIdentity identity,
IGrainRuntime runtime,
ISemanticLog log)
: base(identity, runtime)
{
Guard.NotNull(log, nameof(log));
Guard.NotNull(store, nameof(store));
@ -71,16 +72,11 @@ namespace Squidex.Infrastructure.EventSourcing.Grains
eventConsumer = eventConsumerFactory(this.GetPrimaryKeyString());
persistence = store.WithSnapshots<EventConsumerState, string>(GetType(), this.GetPrimaryKeyString(), s => state = s);
persistence = store.WithSnapshots<EventConsumerState, string>(GetType(), eventConsumer.Name, s => state = s);
return persistence.ReadAsync();
}
protected virtual IEventSubscription CreateSubscription(IEventStore eventStore, string streamFilter, string position)
{
return new RetrySubscription(eventStore, new WrapperSubscription(this.AsReference<IEventConsumerGrain>(), scheduler), streamFilter, position);
}
public Task<Immutable<EventConsumerInfo>> GetStateAsync()
{
return Task.FromResult(state.ToInfo(this.eventConsumer.Name).AsImmutable());
@ -121,13 +117,6 @@ namespace Squidex.Infrastructure.EventSourcing.Grains
});
}
public Task WakeUpAsync()
{
currentSubscription?.WakeUp();
return TaskHelper.Done;
}
public Task ActivateAsync()
{
if (!state.IsStopped)
@ -276,6 +265,10 @@ namespace Squidex.Infrastructure.EventSourcing.Grains
currentSubscription?.StopAsync().Forget();
currentSubscription = CreateSubscription(eventStore, eventConsumer.EventsFilter, position);
}
else
{
currentSubscription.WakeUp();
}
}
private Envelope<IEvent> ParseKnownEvent(StoredEvent message)
@ -296,5 +289,20 @@ namespace Squidex.Infrastructure.EventSourcing.Grains
return null;
}
}
protected virtual IEventConsumerGrain GetSelf()
{
return this.AsReference<IEventConsumerGrain>();
}
protected virtual IEventSubscription CreateSubscription(IEventStore eventStore, IEventSubscriber subscriber, string streamFilter, string position)
{
return new RetrySubscription(eventStore, subscriber, streamFilter, position);
}
private IEventSubscription CreateSubscription(IEventStore eventStore, string streamFilter, string position)
{
return CreateSubscription(eventStore, new WrapperSubscription(GetSelf(), scheduler), streamFilter, position);
}
}
}

24
src/Squidex.Infrastructure/EventSourcing/Grains/EventConsumerManagerGrain.cs

@ -14,7 +14,6 @@ using Orleans;
using Orleans.Concurrency;
using Orleans.Core;
using Orleans.Runtime;
using Squidex.Infrastructure.Tasks;
namespace Squidex.Infrastructure.EventSourcing.Grains
{
@ -43,28 +42,18 @@ namespace Squidex.Infrastructure.EventSourcing.Grains
DelayDeactivation(TimeSpan.FromDays(1));
RegisterOrUpdateReminder("Default", TimeSpan.Zero, TimeSpan.FromMinutes(10));
RegisterTimer(x => WakeUpAsync(null), null, TimeSpan.Zero, TimeSpan.FromSeconds(10));
RegisterTimer(x => ActivateAsync(null), null, TimeSpan.Zero, TimeSpan.FromSeconds(10));
return Task.FromResult(true);
}
public Task ActivateAsync()
{
var tasks =
eventConsumers
.Select(c => GrainFactory.GetGrain<IEventConsumerGrain>(c.Name))
.Select(c => c.ActivateAsync());
return Task.WhenAll(tasks);
}
public Task WakeUpAsync(string streamName)
public Task ActivateAsync(string streamName)
{
var tasks =
eventConsumers
.Where(c => streamName == null || Regex.IsMatch(streamName, c.EventsFilter))
.Select(c => GrainFactory.GetGrain<IEventConsumerGrain>(c.Name))
.Select(c => c.WakeUpAsync());
.Select(c => c.ActivateAsync());
return Task.WhenAll(tasks);
}
@ -102,9 +91,14 @@ namespace Squidex.Infrastructure.EventSourcing.Grains
return eventConsumer.StopAsync();
}
public Task ActivateAsync()
{
return ActivateAsync(null);
}
public Task ReceiveReminder(string reminderName, TickStatus status)
{
return TaskHelper.Done;
return ActivateAsync(null);
}
}
}

3
src/Squidex.Infrastructure/EventSourcing/Grains/IEventConsumerGrain.cs

@ -7,6 +7,7 @@
using System;
using System.Threading.Tasks;
using Orleans;
using Orleans.Concurrency;
using Squidex.Infrastructure.Orleans;
@ -22,8 +23,6 @@ namespace Squidex.Infrastructure.EventSourcing.Grains
Task ResetAsync();
Task WakeUpAsync();
Task OnEventAsync(Immutable<IEventSubscription> subscription, Immutable<StoredEvent> storedEvent);
Task OnErrorAsync(Immutable<IEventSubscription> subscription, Immutable<Exception> exception);

2
src/Squidex.Infrastructure/EventSourcing/Grains/IEventConsumerManagerGrain.cs

@ -14,7 +14,7 @@ namespace Squidex.Infrastructure.EventSourcing.Grains
{
public interface IEventConsumerManagerGrain : IBackgroundGrain
{
Task WakeUpAsync(string streamName);
Task ActivateAsync(string streamName);
Task StopAsync(string consumerName);

2
src/Squidex.Infrastructure/EventSourcing/Grains/OrleansEventNotifier.cs

@ -29,7 +29,7 @@ namespace Squidex.Infrastructure.EventSourcing.Grains
public void NotifyEventsStored(string streamName)
{
eventConsumerManagerGrain?.WakeUpAsync(streamName);
eventConsumerManagerGrain?.ActivateAsync(streamName);
}
public IDisposable Subscribe(Action<string> handler)

4
src/Squidex.Infrastructure/EventSourcing/IEventNotifier.cs

@ -5,14 +5,10 @@
// All rights reserved. Licensed under the MIT license.
// ==========================================================================
using System;
namespace Squidex.Infrastructure.EventSourcing
{
public interface IEventNotifier
{
void NotifyEventsStored(string streamName);
IDisposable Subscribe(Action<string> handler);
}
}

6
src/Squidex.Infrastructure/Orleans/J{T}.cs

@ -45,13 +45,13 @@ namespace Squidex.Infrastructure.Orleans
}
[CopierMethod]
private static object Copy(object input, ICopyContext context)
public static object Copy(object input, ICopyContext context)
{
return input;
}
[SerializerMethod]
private static void Serialize(object input, ISerializationContext context, Type expected)
public static void Serialize(object input, ISerializationContext context, Type expected)
{
var stream = new MemoryStream();
@ -69,7 +69,7 @@ namespace Squidex.Infrastructure.Orleans
}
[DeserializerMethod]
private static object Deserialize(Type expected, IDeserializationContext context)
public static object Deserialize(Type expected, IDeserializationContext context)
{
var outLength = context.StreamReader.ReadInt();
var outBytes = context.StreamReader.ReadBytes(outLength);

31
src/Squidex.Infrastructure/States/IStateFactory.cs

@ -1,31 +0,0 @@
// ==========================================================================
// Squidex Headless CMS
// ==========================================================================
// Copyright (c) Squidex UG (haftungsbeschränkt)
// All rights reserved. Licensed under the MIT license.
// ==========================================================================
using System;
using System.Threading.Tasks;
namespace Squidex.Infrastructure.States
{
public interface IStateFactory
{
Task<T> GetSingleAsync<T>(string key) where T : IStatefulObject<string>;
Task<T> GetSingleAsync<T>(Guid key) where T : IStatefulObject<Guid>;
Task<T> GetSingleAsync<T, TKey>(TKey key) where T : IStatefulObject<TKey>;
Task<T> CreateAsync<T>(string key) where T : IStatefulObject<string>;
Task<T> CreateAsync<T>(Guid key) where T : IStatefulObject<Guid>;
Task<T> CreateAsync<T, TKey>(TKey key) where T : IStatefulObject<TKey>;
void Remove<T, TKey>(TKey key) where T : IStatefulObject<TKey>;
void Synchronize<T, TKey>(TKey key) where T : IStatefulObject<TKey>;
}
}

16
src/Squidex.Infrastructure/States/IStatefulObject.cs

@ -1,16 +0,0 @@
// ==========================================================================
// Squidex Headless CMS
// ==========================================================================
// Copyright (c) Squidex UG (haftungsbeschränkt)
// All rights reserved. Licensed under the MIT license.
// ==========================================================================
using System.Threading.Tasks;
namespace Squidex.Infrastructure.States
{
public interface IStatefulObject<TKey>
{
Task ActivateAsync(TKey key, IStore<TKey> store);
}
}

14
src/Squidex.Infrastructure/States/InvalidateMessage.cs

@ -1,14 +0,0 @@
// ==========================================================================
// Squidex Headless CMS
// ==========================================================================
// Copyright (c) Squidex UG (haftungsbeschränkt)
// All rights reserved. Licensed under the MIT license.
// ==========================================================================
namespace Squidex.Infrastructure.States
{
public sealed class InvalidateMessage
{
public string Key { get; set; }
}
}

158
src/Squidex.Infrastructure/States/StateFactory.cs

@ -1,158 +0,0 @@
// ==========================================================================
// Squidex Headless CMS
// ==========================================================================
// Copyright (c) Squidex UG (haftungsbeschränkt)
// All rights reserved. Licensed under the MIT license.
// ==========================================================================
using System;
using System.Threading.Tasks;
using Microsoft.Extensions.Caching.Memory;
using Squidex.Infrastructure.EventSourcing;
#pragma warning disable RECS0096 // Type parameter is never used
namespace Squidex.Infrastructure.States
{
public sealed class StateFactory : DisposableObjectBase, IInitializable, IStateFactory
{
private static readonly TimeSpan CacheDuration = TimeSpan.FromMinutes(10);
private readonly IPubSub pubSub;
private readonly IMemoryCache statesCache;
private readonly IServiceProvider services;
private readonly IStreamNameResolver streamNameResolver;
private readonly IEventStore eventStore;
private readonly IEventDataFormatter eventDataFormatter;
private readonly object lockObject = new object();
private IDisposable pubSubSubscription;
public sealed class ObjectHolder<T, TKey> where T : IStatefulObject<TKey>
{
private readonly Task activationTask;
private readonly T obj;
public ObjectHolder(T obj, TKey key, IStore<TKey> store)
{
this.obj = obj;
activationTask = obj.ActivateAsync(key, store);
}
public async Task<T> ActivateAsync()
{
await activationTask;
return obj;
}
}
public StateFactory(
IPubSub pubSub,
IMemoryCache statesCache,
IEventStore eventStore,
IEventDataFormatter eventDataFormatter,
IServiceProvider services,
IStreamNameResolver streamNameResolver)
{
Guard.NotNull(services, nameof(services));
Guard.NotNull(eventStore, nameof(eventStore));
Guard.NotNull(eventDataFormatter, nameof(eventDataFormatter));
Guard.NotNull(pubSub, nameof(pubSub));
Guard.NotNull(statesCache, nameof(statesCache));
Guard.NotNull(streamNameResolver, nameof(streamNameResolver));
this.eventStore = eventStore;
this.eventDataFormatter = eventDataFormatter;
this.pubSub = pubSub;
this.services = services;
this.statesCache = statesCache;
this.streamNameResolver = streamNameResolver;
}
public void Initialize()
{
pubSubSubscription = pubSub.Subscribe<InvalidateMessage>(m =>
{
lock (lockObject)
{
statesCache.Remove(m.Key);
}
});
}
public Task<T> CreateAsync<T>(string key) where T : IStatefulObject<string>
{
return CreateAsync<T, string>(key);
}
public Task<T> CreateAsync<T>(Guid key) where T : IStatefulObject<Guid>
{
return CreateAsync<T, Guid>(key);
}
public async Task<T> CreateAsync<T, TKey>(TKey key) where T : IStatefulObject<TKey>
{
Guard.NotNull(key, nameof(key));
var stateStore = new Store<TKey>(eventStore, eventDataFormatter, services, streamNameResolver);
var state = (T)services.GetService(typeof(T));
await state.ActivateAsync(key, stateStore);
return state;
}
public Task<T> GetSingleAsync<T>(string key) where T : IStatefulObject<string>
{
return GetSingleAsync<T, string>(key);
}
public Task<T> GetSingleAsync<T>(Guid key) where T : IStatefulObject<Guid>
{
return GetSingleAsync<T, Guid>(key);
}
public Task<T> GetSingleAsync<T, TKey>(TKey key) where T : IStatefulObject<TKey>
{
Guard.NotNull(key, nameof(key));
lock (lockObject)
{
if (statesCache.TryGetValue<ObjectHolder<T, TKey>>(key, out var stateObj))
{
return stateObj.ActivateAsync();
}
var state = (T)services.GetService(typeof(T));
var stateStore = new Store<TKey>(eventStore, eventDataFormatter, services, streamNameResolver);
stateObj = new ObjectHolder<T, TKey>(state, key, stateStore);
statesCache.CreateEntry(key)
.SetValue(stateObj)
.SetAbsoluteExpiration(CacheDuration)
.Dispose();
return stateObj.ActivateAsync();
}
}
public void Remove<T, TKey>(TKey key) where T : IStatefulObject<TKey>
{
statesCache.Remove(key);
}
public void Synchronize<T, TKey>(TKey key) where T : IStatefulObject<TKey>
{
pubSub.Publish(new InvalidateMessage { Key = key.ToString() }, false);
}
protected override void DisposeObject(bool disposing)
{
if (disposing && pubSubSubscription != null)
{
pubSubSubscription.Dispose();
}
}
}
}

5
src/Squidex/Config/Domain/ReadServices.cs

@ -28,7 +28,6 @@ using Squidex.Infrastructure.Assets;
using Squidex.Infrastructure.EventSourcing;
using Squidex.Infrastructure.EventSourcing.Grains;
using Squidex.Infrastructure.Orleans;
using Squidex.Infrastructure.States;
using Squidex.Pipeline;
namespace Squidex.Config.Domain
@ -37,10 +36,6 @@ namespace Squidex.Config.Domain
{
public static void AddMyReadServices(this IServiceCollection services, IConfiguration config)
{
services.AddSingletonAs<StateFactory>()
.As<IStateFactory>()
.As<IInitializable>();
services.AddSingletonAs<OrleansEventNotifier>()
.As<IEventNotifier>()
.As<IInitializable>();

7
src/Squidex/Config/Domain/WriteServices.cs

@ -18,6 +18,7 @@ using Squidex.Domain.Apps.Entities.Apps.Commands;
using Squidex.Domain.Apps.Entities.Apps.Templates;
using Squidex.Domain.Apps.Entities.Assets;
using Squidex.Domain.Apps.Entities.Contents;
using Squidex.Domain.Apps.Entities.Contents.Commands;
using Squidex.Domain.Apps.Entities.Rules;
using Squidex.Domain.Apps.Entities.Rules.Commands;
using Squidex.Domain.Apps.Entities.Schemas;
@ -56,10 +57,10 @@ namespace Squidex.Config.Domain
services.AddSingletonAs<AssetCommandMiddleware>()
.As<ICommandMiddleware>();
services.AddSingletonAs<ContentCommandMiddleware>()
services.AddSingletonAs<GrainCommandMiddleware<AppCommand, IAppGrain>>()
.As<ICommandMiddleware>();
services.AddSingletonAs<GrainCommandMiddleware<AppCommand, IAppGrain>>()
services.AddSingletonAs<GrainCommandMiddleware<ContentCommand, IContentGrain>>()
.As<ICommandMiddleware>();
services.AddSingletonAs<GrainCommandMiddleware<SchemaCommand, ISchemaGrain>>()
@ -95,7 +96,7 @@ namespace Squidex.Config.Domain
services.AddTransientAs<AppGrain>()
.AsSelf();
services.AddTransientAs<ContentDomainObject>()
services.AddTransientAs<ContentGrain>()
.AsSelf();
services.AddSingleton(c =>

27
src/Squidex/Pipeline/CommandMiddlewares/EnrichWithAppIdCommandMiddleware.cs

@ -9,6 +9,7 @@ using System;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Http;
using Squidex.Domain.Apps.Entities;
using Squidex.Domain.Apps.Entities.Apps.Commands;
using Squidex.Infrastructure;
using Squidex.Infrastructure.Commands;
@ -32,17 +33,31 @@ namespace Squidex.Pipeline.CommandMiddlewares
if (context.Command is IAppCommand appCommand && appCommand.AppId == null)
{
var appFeature = httpContextAccessor.HttpContext.Features.Get<IAppFeature>();
var appId = GetAppId();
if (appFeature == null)
{
throw new InvalidOperationException("Cannot resolve app.");
}
appCommand.AppId = appId;
}
if (context.Command is AppCommand appSelfCommand && appSelfCommand.AppId == Guid.Empty)
{
var appId = GetAppId();
appCommand.AppId = new NamedId<Guid>(appFeature.App.Id, appFeature.App.Name);
appSelfCommand.AppId = appId.Id;
}
return next();
}
private NamedId<Guid> GetAppId()
{
var appFeature = httpContextAccessor.HttpContext.Features.Get<IAppFeature>();
if (appFeature?.App == null)
{
throw new InvalidOperationException("Cannot resolve app.");
}
return new NamedId<Guid>(appFeature.App.Id, appFeature.App.Name);
}
}
}

48
src/Squidex/Pipeline/CommandMiddlewares/EnrichWithSchemaIdCommandMiddleware.cs

@ -36,28 +36,42 @@ namespace Squidex.Pipeline.CommandMiddlewares
if (context.Command is ISchemaCommand schemaCommand && schemaCommand.SchemaId == null)
{
NamedId<Guid> appId = null;
var schemaId = await GetSchemaIdAsync(context);
if (context.Command is IAppCommand appCommand)
{
appId = appCommand.AppId;
}
schemaCommand.SchemaId = schemaId;
}
if (appId == null)
{
var appFeature = actionContextAccessor.ActionContext.HttpContext.Features.Get<IAppFeature>();
if (context.Command is SchemaCommand schemaSelfCommand && schemaSelfCommand.SchemaId == Guid.Empty)
{
var schemaId = await GetSchemaIdAsync(context);
if (appFeature != null && appFeature.App != null)
{
appId = new NamedId<Guid>(appFeature.App.Id, appFeature.App.Name);
}
}
schemaSelfCommand.SchemaId = schemaId?.Id ?? Guid.Empty;
}
await next();
}
private async Task<NamedId<Guid>> GetSchemaIdAsync(CommandContext context)
{
NamedId<Guid> appId = null;
if (context.Command is IAppCommand appCommand)
{
appId = appCommand.AppId;
}
if (appId == null)
if (appId == null)
{
var appFeature = actionContextAccessor.ActionContext.HttpContext.Features.Get<IAppFeature>();
if (appFeature != null && appFeature.App != null)
{
return;
appId = new NamedId<Guid>(appFeature.App.Id, appFeature.App.Name);
}
}
if (appId != null)
{
var routeValues = actionContextAccessor.ActionContext.RouteData.Values;
if (routeValues.ContainsKey("name"))
@ -80,11 +94,11 @@ namespace Squidex.Pipeline.CommandMiddlewares
throw new DomainObjectNotFoundException(schemaName, typeof(ISchemaEntity));
}
schemaCommand.SchemaId = new NamedId<Guid>(schema.Id, schema.Name);
return new NamedId<Guid>(schema.Id, schema.Name);
}
}
await next();
return null;
}
}
}

284
tests/Squidex.Infrastructure.Tests/Commands/AggregateHandlerTests.cs

@ -1,284 +0,0 @@
// ==========================================================================
// Squidex Headless CMS
// ==========================================================================
// Copyright (c) Squidex UG (haftungsbeschränkt)
// All rights reserved. Licensed under the MIT license.
// ==========================================================================
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using FakeItEasy;
using Squidex.Infrastructure.EventSourcing;
using Squidex.Infrastructure.States;
using Squidex.Infrastructure.Tasks;
using Squidex.Infrastructure.TestHelpers;
using Xunit;
namespace Squidex.Infrastructure.Commands
{
public class AggregateHandlerTests
{
private readonly IServiceProvider serviceProvider = A.Fake<IServiceProvider>();
private readonly IStore<Guid> store = A.Fake<IStore<Guid>>();
private readonly IStateFactory stateFactory = A.Fake<IStateFactory>();
private readonly IPersistence<MyDomainState> persistence = A.Fake<IPersistence<MyDomainState>>();
private readonly Envelope<IEvent> event1 = new Envelope<IEvent>(new MyEvent());
private readonly Envelope<IEvent> event2 = new Envelope<IEvent>(new MyEvent());
private readonly CommandContext context;
private readonly CommandContext invalidContext = new CommandContext(A.Dummy<ICommand>(), A.Dummy<ICommandBus>());
private readonly Guid domainObjectId = Guid.NewGuid();
private readonly MyCommand command;
private readonly MyDomainObject domainObject = new MyDomainObject();
private readonly AggregateHandler sut;
public AggregateHandlerTests()
{
command = new MyCommand { AggregateId = domainObjectId, ExpectedVersion = EtagVersion.Any };
context = new CommandContext(command, A.Dummy<ICommandBus>());
A.CallTo(() => store.WithSnapshotsAndEventSourcing(domainObjectId, A<Func<MyDomainState, Task>>.Ignored, A<Func<Envelope<IEvent>, Task>>.Ignored))
.Returns(persistence);
A.CallTo(() => stateFactory.CreateAsync<MyDomainObject>(domainObjectId))
.Returns(Task.FromResult(domainObject));
A.CallTo(() => stateFactory.GetSingleAsync<MyDomainObject>(domainObjectId))
.Returns(Task.FromResult(domainObject));
sut = new AggregateHandler(stateFactory, serviceProvider);
domainObject.ActivateAsync(domainObjectId, store).Wait();
}
[Fact]
public Task Create_with_task_should_throw_exception_if_not_aggregate_command()
{
return Assert.ThrowsAnyAsync<ArgumentException>(() => sut.CreateAsync<MyDomainObject>(invalidContext, x => TaskHelper.False));
}
[Fact]
public Task Create_synced_with_task_should_throw_exception_if_not_aggregate_command()
{
return Assert.ThrowsAnyAsync<ArgumentException>(() => sut.CreateSyncedAsync<MyDomainObject>(invalidContext, x => TaskHelper.False));
}
[Fact]
public Task Create_with_task_should_should_throw_exception_if_version_is_wrong()
{
command.ExpectedVersion = 2;
return Assert.ThrowsAnyAsync<DomainObjectVersionException>(() => sut.CreateAsync<MyDomainObject>(context, x => TaskHelper.False));
}
[Fact]
public Task Create_synced_with_task_should_should_throw_exception_if_version_is_wrong()
{
command.ExpectedVersion = 2;
return Assert.ThrowsAnyAsync<DomainObjectVersionException>(() => sut.CreateSyncedAsync<MyDomainObject>(context, x => TaskHelper.False));
}
[Fact]
public async Task Create_with_task_should_create_domain_object_and_save()
{
MyDomainObject passedDomainObject = null;
await sut.CreateAsync<MyDomainObject>(context, async x =>
{
x.RaiseEvent(new MyEvent());
await Task.Yield();
passedDomainObject = x;
});
Assert.Equal(domainObject, passedDomainObject);
Assert.NotNull(context.Result<EntityCreatedResult<Guid>>());
A.CallTo(() => persistence.WriteEventsAsync(A<IEnumerable<Envelope<IEvent>>>.Ignored))
.MustHaveHappened();
}
[Fact]
public async Task Create_synced_with_task_should_create_domain_object_and_save()
{
MyDomainObject passedDomainObject = null;
await sut.CreateSyncedAsync<MyDomainObject>(context, async x =>
{
x.RaiseEvent(new MyEvent());
x.RaiseEvent(new MyEvent());
await Task.Yield();
passedDomainObject = x;
});
Assert.Equal(2, domainObject.Snapshot.Version);
Assert.Equal(domainObject, passedDomainObject);
Assert.NotNull(context.Result<EntityCreatedResult<Guid>>());
A.CallTo(() => persistence.WriteEventsAsync(A<IEnumerable<Envelope<IEvent>>>.Ignored))
.MustHaveHappened();
}
[Fact]
public async Task Create_should_create_domain_object_and_save()
{
MyDomainObject passedDomainObject = null;
await sut.CreateAsync<MyDomainObject>(context, x =>
{
x.RaiseEvent(new MyEvent());
x.RaiseEvent(new MyEvent());
passedDomainObject = x;
});
Assert.Equal(2, domainObject.Snapshot.Version);
Assert.Equal(domainObject, passedDomainObject);
Assert.NotNull(context.Result<EntityCreatedResult<Guid>>());
A.CallTo(() => persistence.WriteEventsAsync(A<IEnumerable<Envelope<IEvent>>>.Ignored))
.MustHaveHappened();
}
[Fact]
public async Task Create_synced_should_create_domain_object_and_save()
{
MyDomainObject passedDomainObject = null;
await sut.CreateSyncedAsync<MyDomainObject>(context, x =>
{
x.RaiseEvent(new MyEvent());
x.RaiseEvent(new MyEvent());
passedDomainObject = x;
});
Assert.Equal(2, domainObject.Snapshot.Version);
Assert.Equal(domainObject, passedDomainObject);
Assert.NotNull(context.Result<EntityCreatedResult<Guid>>());
A.CallTo(() => persistence.WriteEventsAsync(A<IEnumerable<Envelope<IEvent>>>.Ignored))
.MustHaveHappened();
}
[Fact]
public Task Update_with_task_should_throw_exception_if_not_aggregate_command()
{
return Assert.ThrowsAnyAsync<ArgumentException>(() => sut.UpdateAsync<MyDomainObject>(invalidContext, x => TaskHelper.False));
}
[Fact]
public Task Update_synced_with_task_should_throw_exception_if_not_aggregate_command()
{
return Assert.ThrowsAnyAsync<ArgumentException>(() => sut.UpdateSyncedAsync<MyDomainObject>(invalidContext, x => TaskHelper.False));
}
[Fact]
public Task Update_with_task_should_should_throw_exception_if_version_is_wrong()
{
command.ExpectedVersion = 2;
return Assert.ThrowsAnyAsync<DomainObjectVersionException>(() => sut.UpdateAsync<MyDomainObject>(context, x => TaskHelper.False));
}
[Fact]
public Task Update_synced_with_task_should_should_throw_exception_if_version_is_wrong()
{
command.ExpectedVersion = 2;
return Assert.ThrowsAnyAsync<DomainObjectVersionException>(() => sut.UpdateSyncedAsync<MyDomainObject>(context, x => TaskHelper.False));
}
[Fact]
public async Task Update_with_task_should_create_domain_object_and_save()
{
MyDomainObject passedDomainObject = null;
await sut.UpdateAsync<MyDomainObject>(context, async x =>
{
x.RaiseEvent(new MyEvent());
x.RaiseEvent(new MyEvent());
await Task.Yield();
passedDomainObject = x;
});
Assert.Equal(2, domainObject.Snapshot.Version);
Assert.Equal(domainObject, passedDomainObject);
Assert.NotNull(context.Result<EntitySavedResult>());
A.CallTo(() => persistence.WriteEventsAsync(A<IEnumerable<Envelope<IEvent>>>.Ignored))
.MustHaveHappened();
}
[Fact]
public async Task Update_synced_with_task_should_create_domain_object_and_save()
{
MyDomainObject passedDomainObject = null;
await sut.UpdateSyncedAsync<MyDomainObject>(context, async x =>
{
x.RaiseEvent(new MyEvent());
x.RaiseEvent(new MyEvent());
await Task.Yield();
passedDomainObject = x;
});
Assert.Equal(2, domainObject.Snapshot.Version);
Assert.Equal(domainObject, passedDomainObject);
Assert.NotNull(context.Result<EntitySavedResult>());
A.CallTo(() => persistence.WriteEventsAsync(A<IEnumerable<Envelope<IEvent>>>.Ignored))
.MustHaveHappened();
}
[Fact]
public async Task Update_should_create_domain_object_and_save()
{
MyDomainObject passedDomainObject = null;
await sut.UpdateAsync<MyDomainObject>(context, x =>
{
x.RaiseEvent(new MyEvent());
x.RaiseEvent(new MyEvent());
passedDomainObject = x;
});
Assert.Equal(2, domainObject.Snapshot.Version);
Assert.Equal(domainObject, passedDomainObject);
Assert.NotNull(context.Result<EntitySavedResult>());
A.CallTo(() => persistence.WriteEventsAsync(A<IEnumerable<Envelope<IEvent>>>.Ignored))
.MustHaveHappened();
}
[Fact]
public async Task Update_synced_should_create_domain_object_and_save()
{
MyDomainObject passedDomainObject = null;
await sut.UpdateSyncedAsync<MyDomainObject>(context, x =>
{
x.RaiseEvent(new MyEvent());
x.RaiseEvent(new MyEvent());
passedDomainObject = x;
});
Assert.Equal(2, domainObject.Snapshot.Version);
Assert.Equal(domainObject, passedDomainObject);
Assert.NotNull(context.Result<EntitySavedResult>());
A.CallTo(() => persistence.WriteEventsAsync(A<IEnumerable<Envelope<IEvent>>>.Ignored))
.MustHaveHappened();
}
}
}

88
tests/Squidex.Infrastructure.Tests/Commands/DomainObjectBaseTests.cs

@ -1,88 +0,0 @@
// ==========================================================================
// Squidex Headless CMS
// ==========================================================================
// Copyright (c) Squidex UG (haftungsbeschränkt)
// All rights reserved. Licensed under the MIT license.
// ==========================================================================
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using FakeItEasy;
using Squidex.Infrastructure.EventSourcing;
using Squidex.Infrastructure.States;
using Squidex.Infrastructure.TestHelpers;
using Xunit;
namespace Squidex.Infrastructure.Commands
{
public class DomainObjectBaseTests
{
private readonly IStore<Guid> store = A.Fake<IStore<Guid>>();
private readonly IPersistence<MyDomainState> persistence = A.Fake<IPersistence<MyDomainState>>();
private readonly Guid id = Guid.NewGuid();
private readonly MyDomainObject sut = new MyDomainObject();
public DomainObjectBaseTests()
{
A.CallTo(() => store.WithSnapshotsAndEventSourcing(id, A<Func<MyDomainState, Task>>.Ignored, A<Func<Envelope<IEvent>, Task>>.Ignored))
.Returns(persistence);
}
[Fact]
public void Should_instantiate()
{
Assert.Equal(EtagVersion.Empty, sut.Version);
}
[Fact]
public async Task Should_write_state_and_events_when_saved()
{
await sut.ActivateAsync(id, store);
var event1 = new MyEvent();
var event2 = new MyEvent();
var newState = new MyDomainState();
sut.RaiseEvent(event1);
sut.RaiseEvent(event2);
sut.ApplySnapshot(newState);
await sut.WriteAsync();
A.CallTo(() => persistence.WriteSnapshotAsync(newState))
.MustHaveHappened();
A.CallTo(() => persistence.WriteEventsAsync(A<IEnumerable<Envelope<IEvent>>>.That.Matches(x => x.Count() == 2)))
.MustHaveHappened();
Assert.Empty(sut.GetUncomittedEvents());
}
[Fact]
public async Task Should_not_ignore_exception_when_saving()
{
A.CallTo(() => persistence.WriteEventsAsync(A<IEnumerable<Envelope<IEvent>>>.Ignored))
.Throws(new InvalidOperationException());
await sut.ActivateAsync(id, store);
var event1 = new MyEvent();
var event2 = new MyEvent();
var newState = new MyDomainState();
sut.RaiseEvent(event1);
sut.RaiseEvent(event2);
sut.ApplySnapshot(newState);
await Assert.ThrowsAsync<InvalidOperationException>(() => sut.WriteAsync());
A.CallTo(() => persistence.WriteSnapshotAsync(newState))
.MustNotHaveHappened();
A.CallTo(() => persistence.WriteEventsAsync(A<IEnumerable<Envelope<IEvent>>>.That.Matches(x => x.Count() == 2)))
.MustHaveHappened();
Assert.Empty(sut.GetUncomittedEvents());
}
}
}

284
tests/Squidex.Infrastructure.Tests/Commands/DomainObjectGrainTests.cs

@ -0,0 +1,284 @@
// ==========================================================================
// Squidex Headless CMS
// ==========================================================================
// Copyright (c) Squidex UG (haftungsbeschränkt)
// All rights reserved. Licensed under the MIT license.
// ==========================================================================
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using FakeItEasy;
using Orleans.Core;
using Orleans.Runtime;
using Squidex.Infrastructure.EventSourcing;
using Squidex.Infrastructure.Orleans;
using Squidex.Infrastructure.States;
using Squidex.Infrastructure.TestHelpers;
using Xunit;
namespace Squidex.Infrastructure.Commands
{
public sealed class DomainObjectGrainTests
{
private readonly IStore<Guid> store = A.Fake<IStore<Guid>>();
private readonly IGrainIdentity identity = A.Fake<IGrainIdentity>();
private readonly IPersistence<MyDomainState> persistence = A.Fake<IPersistence<MyDomainState>>();
private readonly Guid id = Guid.NewGuid();
private readonly MyDomainObject sut;
public sealed class MyDomainState : IDomainState
{
public long Version { get; set; }
public int Value { get; set; }
}
public sealed class ValueChanged : IEvent
{
public int Value { get; set; }
}
public sealed class CreateAuto : MyCommand
{
public int Value { get; set; }
}
public sealed class CreateCustom : MyCommand
{
public int Value { get; set; }
}
public sealed class UpdateAuto : MyCommand
{
public int Value { get; set; }
}
public sealed class UpdateCustom : MyCommand
{
public int Value { get; set; }
}
public sealed class MyDomainObject : DomainObjectGrain<MyDomainState>
{
public MyDomainObject(IStore<Guid> store, IGrainIdentity identity, IGrainRuntime runtime)
: base(store, identity, runtime)
{
}
public override Task<object> ExecuteAsync(IAggregateCommand command)
{
switch (command)
{
case CreateAuto createAuto:
return CreateAsync(createAuto, c =>
{
RaiseEvent(new ValueChanged { Value = c.Value });
});
case CreateCustom createCustom:
return CreateReturnAsync(createCustom, c =>
{
RaiseEvent(new ValueChanged { Value = c.Value });
return "CREATED";
});
case UpdateAuto updateAuto:
return UpdateAsync(updateAuto, c =>
{
RaiseEvent(new ValueChanged { Value = c.Value });
});
case UpdateCustom updateCustom:
return UpdateReturnAsync(updateCustom, c =>
{
RaiseEvent(new ValueChanged { Value = c.Value });
return "UPDATED";
});
}
return Task.FromResult<object>(null);
}
public override void ApplyEvent(Envelope<IEvent> @event)
{
if (@event.Payload is ValueChanged valueChanged)
{
ApplySnapshot(new MyDomainState { Value = valueChanged.Value });
}
}
}
public DomainObjectGrainTests()
{
A.CallTo(() => identity.PrimaryKey)
.Returns(id);
A.CallTo(() => store.WithSnapshotsAndEventSourcing(typeof(MyDomainObject), id, A<Func<MyDomainState, Task>>.Ignored, A<Func<Envelope<IEvent>, Task>>.Ignored))
.Returns(persistence);
sut = new MyDomainObject(store, identity, A.Fake<IGrainRuntime>());
}
[Fact]
public void Should_instantiate()
{
Assert.Equal(EtagVersion.Empty, sut.Version);
}
[Fact]
public async Task Should_write_state_and_events_when_created()
{
await SetupEmptyAsync();
var result = await sut.ExecuteAsync(C(new CreateAuto { Value = 5 }));
A.CallTo(() => persistence.WriteSnapshotAsync(A<MyDomainState>.That.Matches(x => x.Value == 5)))
.MustHaveHappened();
A.CallTo(() => persistence.WriteEventsAsync(A<IEnumerable<Envelope<IEvent>>>.That.Matches(x => x.Count() == 1)))
.MustHaveHappened();
Assert.True(result.Value is EntityCreatedResult<Guid>);
Assert.Empty(sut.GetUncomittedEvents());
Assert.Equal(5, sut.Snapshot.Value);
}
[Fact]
public async Task Should_write_state_and_events_when_updated()
{
await SetupCreatedAsync();
var result = await sut.ExecuteAsync(C(new UpdateAuto { Value = 5 }));
A.CallTo(() => persistence.WriteSnapshotAsync(A<MyDomainState>.That.Matches(x => x.Value == 5)))
.MustHaveHappened();
A.CallTo(() => persistence.WriteEventsAsync(A<IEnumerable<Envelope<IEvent>>>.That.Matches(x => x.Count() == 1)))
.MustHaveHappened();
Assert.True(result.Value is EntitySavedResult);
Assert.Empty(sut.GetUncomittedEvents());
Assert.Equal(5, sut.Snapshot.Value);
}
[Fact]
public async Task Should_throw_exception_when_already_created()
{
await SetupCreatedAsync();
await Assert.ThrowsAsync<DomainException>(() => sut.ExecuteAsync(C(new CreateAuto())));
}
[Fact]
public async Task Should_throw_exception_when_not_created()
{
await SetupEmptyAsync();
await Assert.ThrowsAsync<DomainObjectNotFoundException>(() => sut.ExecuteAsync(C(new UpdateAuto())));
}
[Fact]
public async Task Should_return_custom_result_on_create()
{
await SetupEmptyAsync();
var result = await sut.ExecuteAsync(C(new CreateCustom()));
Assert.Equal("CREATED", result.Value);
}
[Fact]
public async Task Should_return_custom_result_on_update()
{
await SetupCreatedAsync();
var result = await sut.ExecuteAsync(C(new UpdateCustom()));
Assert.Equal("UPDATED", result.Value);
}
[Fact]
public async Task Should_throw_exception_when_other_verison_expected()
{
await SetupCreatedAsync();
await Assert.ThrowsAsync<DomainObjectVersionException>(() => sut.ExecuteAsync(C(new UpdateCustom { ExpectedVersion = 3 })));
}
[Fact]
public async Task Should_reset_state_when_writing_snapshot_for_create_failed()
{
await SetupEmptyAsync();
A.CallTo(() => persistence.WriteSnapshotAsync(A<MyDomainState>.Ignored))
.Throws(new InvalidOperationException());
await Assert.ThrowsAsync<InvalidOperationException>(() => sut.ExecuteAsync(C(new CreateAuto())));
Assert.Empty(sut.GetUncomittedEvents());
Assert.Equal(0, sut.Snapshot.Value);
}
[Fact]
public async Task Should_reset_state_when_writing_snapshot_for_update_failed()
{
await SetupCreatedAsync();
A.CallTo(() => persistence.WriteSnapshotAsync(A<MyDomainState>.Ignored))
.Throws(new InvalidOperationException());
await Assert.ThrowsAsync<InvalidOperationException>(() => sut.ExecuteAsync(C(new UpdateAuto())));
Assert.Empty(sut.GetUncomittedEvents());
Assert.Equal(0, sut.Snapshot.Value);
}
/*
[Fact]
public async Task Should_not_ignore_exception_when_saving()
{
A.CallTo(() => persistence.WriteEventsAsync(A<IEnumerable<Envelope<IEvent>>>.Ignored))
.Throws(new InvalidOperationException());
await sut.ActivateAsync(id, store);
var event1 = new MyEvent();
var event2 = new MyEvent();
var newState = new MyDomainState();
sut.RaiseEvent(event1);
sut.RaiseEvent(event2);
sut.ApplySnapshot(newState);
await Assert.ThrowsAsync<InvalidOperationException>(() => sut.WriteAsync());
A.CallTo(() => persistence.WriteSnapshotAsync(newState))
.MustNotHaveHappened();
A.CallTo(() => persistence.WriteEventsAsync(A<IEnumerable<Envelope<IEvent>>>.That.Matches(x => x.Count() == 2)))
.MustHaveHappened();
Assert.Empty(sut.GetUncomittedEvents());
}*/
private async Task SetupCreatedAsync()
{
await sut.OnActivateAsync();
await sut.ExecuteAsync(C(new CreateAuto()));
}
private static J<IAggregateCommand> C(IAggregateCommand command)
{
return command.AsJ();
}
private async Task SetupEmptyAsync()
{
await sut.OnActivateAsync();
}
}
}

49
tests/Squidex.Infrastructure.Tests/EventSourcing/DefaultEventNotifierTests.cs

@ -1,49 +0,0 @@
// ==========================================================================
// Squidex Headless CMS
// ==========================================================================
// Copyright (c) Squidex UG (haftungsbeschränkt)
// All rights reserved. Licensed under the MIT license.
// ==========================================================================
using System.Collections.Generic;
using Xunit;
namespace Squidex.Infrastructure.EventSourcing
{
public class DefaultEventNotifierTests
{
private readonly DefaultEventNotifier sut = new DefaultEventNotifier(new InMemoryPubSub());
[Fact]
public void Should_invalidate_all_actions()
{
var handler1Handled = 0;
var handler2Handled = 0;
var streamNames = new List<string>();
sut.Subscribe(x =>
{
streamNames.Add(x);
handler1Handled++;
});
sut.NotifyEventsStored("a");
sut.Subscribe(x =>
{
streamNames.Add(x);
handler2Handled++;
});
sut.NotifyEventsStored("b");
Assert.Equal(2, handler1Handled);
Assert.Equal(1, handler2Handled);
Assert.Equal(streamNames.ToArray(), new[] { "a", "b", "b" });
}
}
}

173
tests/Squidex.Infrastructure.Tests/EventSourcing/Grains/EventConsumerGrainTests.cs

@ -9,6 +9,9 @@ using System;
using System.Threading.Tasks;
using FakeItEasy;
using FluentAssertions;
using Orleans.Concurrency;
using Orleans.Core;
using Orleans.Runtime;
using Squidex.Infrastructure.Log;
using Squidex.Infrastructure.States;
using Squidex.Infrastructure.TestHelpers;
@ -20,20 +23,31 @@ namespace Squidex.Infrastructure.EventSourcing.Grains
{
public sealed class MyEventConsumerGrain : EventConsumerGrain
{
public MyEventConsumerGrain(IEventStore eventStore, IEventDataFormatter eventDataFormatter, ISemanticLog log)
: base(eventStore, eventDataFormatter, log)
public MyEventConsumerGrain(
EventConsumerFactory eventConsumerFactory,
IStore<string> store,
IEventStore eventStore,
IEventDataFormatter eventDataFormatter,
IGrainIdentity identity,
IGrainRuntime runtime,
ISemanticLog log)
: base(eventConsumerFactory, store, eventStore, eventDataFormatter, identity, runtime, log)
{
}
protected override IEventSubscription CreateSubscription(IEventStore eventStore, string streamFilter, string position)
protected override IEventConsumerGrain GetSelf()
{
return eventStore.CreateSubscription(this, streamFilter, position);
return this;
}
protected override IEventSubscription CreateSubscription(IEventStore eventStore, IEventSubscriber subscriber, string streamFilter, string position)
{
return eventStore.CreateSubscription(subscriber, streamFilter, position);
}
}
private readonly IEventConsumer eventConsumer = A.Fake<IEventConsumer>();
private readonly IEventStore eventStore = A.Fake<IEventStore>();
private readonly IEventSubscriber sutSubscriber;
private readonly IEventSubscription eventSubscription = A.Fake<IEventSubscription>();
private readonly IPersistence<EventConsumerState> persistence = A.Fake<IPersistence<EventConsumerState>>();
private readonly ISemanticLog log = A.Fake<ISemanticLog>();
@ -53,8 +67,8 @@ namespace Squidex.Infrastructure.EventSourcing.Grains
consumerName = eventConsumer.GetType().Name;
A.CallTo(() => store.WithSnapshots(consumerName, A<Func<EventConsumerState, Task>>.Ignored))
.Invokes(new Action<string, Func<EventConsumerState, Task>>((key, a) => apply = a))
A.CallTo(() => store.WithSnapshots(A<Type>.Ignored, consumerName, A<Func<EventConsumerState, Task>>.Ignored))
.Invokes(new Action<Type, string, Func<EventConsumerState, Task>>((t, key, a) => apply = a))
.Returns(persistence);
A.CallTo(() => eventStore.CreateSubscription(A<IEventSubscriber>.Ignored, A<string>.Ignored, A<string>.Ignored))
@ -71,18 +85,23 @@ namespace Squidex.Infrastructure.EventSourcing.Grains
A.CallTo(() => formatter.Parse(eventData, true)).Returns(envelope);
sut = new MyEventConsumerGrain(eventStore, formatter, log);
sutSubscriber = sut;
sut = new MyEventConsumerGrain(
x => eventConsumer,
store,
eventStore,
formatter,
A.Fake<IGrainIdentity>(),
A.Fake<IGrainRuntime>(),
log);
}
[Fact]
public void Should_not_subscribe_to_event_store_when_stopped_in_db()
public async Task Should_not_subscribe_to_event_store_when_stopped_in_db()
{
state = state.Stopped();
sut.ActivateAsync(consumerName, store).Wait();
sut.Activate(eventConsumer);
sut.Dispose();
await sut.OnActivateAsync();
await sut.ActivateAsync();
state.ShouldBeEquivalentTo(new EventConsumerState { IsStopped = true, Position = initialPosition, Error = null });
@ -91,11 +110,10 @@ namespace Squidex.Infrastructure.EventSourcing.Grains
}
[Fact]
public void Should_subscribe_to_event_store_when_not_found_in_db()
public async Task Should_subscribe_to_event_store_when_not_found_in_db()
{
sut.ActivateAsync(consumerName, store).Wait();
sut.Activate(eventConsumer);
sut.Dispose();
await sut.OnActivateAsync();
await sut.ActivateAsync();
state.ShouldBeEquivalentTo(new EventConsumerState { IsStopped = false, Position = initialPosition, Error = null });
@ -104,11 +122,10 @@ namespace Squidex.Infrastructure.EventSourcing.Grains
}
[Fact]
public void Should_subscribe_to_event_store_when_not_stopped_in_db()
public async Task Should_subscribe_to_event_store_when_not_stopped_in_db()
{
sut.ActivateAsync(consumerName, store).Wait();
sut.Activate(eventConsumer);
sut.Dispose();
await sut.OnActivateAsync();
await sut.ActivateAsync();
state.ShouldBeEquivalentTo(new EventConsumerState { IsStopped = false, Position = initialPosition, Error = null });
@ -117,14 +134,12 @@ namespace Squidex.Infrastructure.EventSourcing.Grains
}
[Fact]
public void Should_stop_subscription_when_stopped()
public async Task Should_stop_subscription_when_stopped()
{
sut.ActivateAsync(consumerName, store).Wait();
sut.Activate(eventConsumer);
sut.Stop();
sut.Stop();
sut.Dispose();
await sut.OnActivateAsync();
await sut.ActivateAsync();
await sut.StopAsync();
await sut.StopAsync();
state.ShouldBeEquivalentTo(new EventConsumerState { IsStopped = true, Position = initialPosition, Error = null });
@ -136,13 +151,12 @@ namespace Squidex.Infrastructure.EventSourcing.Grains
}
[Fact]
public void Should_reset_consumer_when_resetting()
public async Task Should_reset_consumer_when_resetting()
{
sut.ActivateAsync(consumerName, store).Wait();
sut.Activate(eventConsumer);
sut.Stop();
sut.Reset();
sut.Dispose();
await sut.OnActivateAsync();
await sut.ActivateAsync();
await sut.StopAsync();
await sut.ResetAsync();
state.ShouldBeEquivalentTo(new EventConsumerState { IsStopped = false, Position = null, Error = null });
@ -165,14 +179,12 @@ namespace Squidex.Infrastructure.EventSourcing.Grains
[Fact]
public async Task Should_invoke_and_update_position_when_event_received()
{
sut.ActivateAsync(consumerName, store).Wait();
sut.Activate(eventConsumer);
var @event = new StoredEvent(Guid.NewGuid().ToString(), 123, eventData);
await OnEventAsync(eventSubscription, @event);
await sut.OnActivateAsync();
await sut.ActivateAsync();
sut.Dispose();
await OnEventAsync(eventSubscription, @event);
state.ShouldBeEquivalentTo(new EventConsumerState { IsStopped = false, Position = @event.EventPosition, Error = null });
@ -186,17 +198,15 @@ namespace Squidex.Infrastructure.EventSourcing.Grains
[Fact]
public async Task Should_ignore_old_events()
{
sut.ActivateAsync(consumerName, store).Wait();
sut.Activate(eventConsumer);
A.CallTo(() => formatter.Parse(eventData, true))
.Throws(new TypeNameNotFoundException());
var @event = new StoredEvent(Guid.NewGuid().ToString(), 123, eventData);
await OnEventAsync(eventSubscription, @event);
await sut.OnActivateAsync();
await sut.ActivateAsync();
sut.Dispose();
await OnEventAsync(eventSubscription, @event);
state.ShouldBeEquivalentTo(new EventConsumerState { IsStopped = false, Position = @event.EventPosition, Error = null });
@ -210,14 +220,12 @@ namespace Squidex.Infrastructure.EventSourcing.Grains
[Fact]
public async Task Should_not_invoke_and_update_position_when_event_is_from_another_subscription()
{
sut.ActivateAsync(consumerName, store).Wait();
sut.Activate(eventConsumer);
var @event = new StoredEvent(Guid.NewGuid().ToString(), 123, eventData);
await OnEventAsync(A.Fake<IEventSubscription>(), @event);
await sut.OnActivateAsync();
await sut.ActivateAsync();
sut.Dispose();
await OnEventAsync(A.Fake<IEventSubscription>(), @event);
state.ShouldBeEquivalentTo(new EventConsumerState { IsStopped = false, Position = initialPosition, Error = null });
@ -228,15 +236,13 @@ namespace Squidex.Infrastructure.EventSourcing.Grains
[Fact]
public async Task Should_stop_if_consumer_failed()
{
sut.ActivateAsync(consumerName, store).Wait();
sut.Activate(eventConsumer);
await sut.OnActivateAsync();
await sut.ActivateAsync();
var ex = new InvalidOperationException();
await OnErrorAsync(eventSubscription, ex);
sut.Dispose();
state.ShouldBeEquivalentTo(new EventConsumerState { IsStopped = true, Position = initialPosition, Error = ex.ToString() });
A.CallTo(() => persistence.WriteSnapshotAsync(A<EventConsumerState>.Ignored))
@ -249,14 +255,12 @@ namespace Squidex.Infrastructure.EventSourcing.Grains
[Fact]
public async Task Should_not_make_error_handling_when_exception_is_from_another_subscription()
{
sut.ActivateAsync(consumerName, store).Wait();
sut.Activate(eventConsumer);
var ex = new InvalidOperationException();
await OnErrorAsync(A.Fake<IEventSubscription>(), ex);
await sut.OnActivateAsync();
await sut.ActivateAsync();
sut.Dispose();
await OnErrorAsync(A.Fake<IEventSubscription>(), ex);
state.ShouldBeEquivalentTo(new EventConsumerState { IsStopped = false, Position = initialPosition, Error = null });
@ -265,18 +269,27 @@ namespace Squidex.Infrastructure.EventSourcing.Grains
}
[Fact]
public void Should_stop_if_resetting_failed()
public async Task Should_wakeup_when_already_subscribed()
{
sut.ActivateAsync(consumerName, store).Wait();
sut.Activate(eventConsumer);
await sut.OnActivateAsync();
await sut.ActivateAsync();
await sut.ActivateAsync();
A.CallTo(() => eventSubscription.WakeUp())
.MustHaveHappened();
}
[Fact]
public async Task Should_stop_if_resetting_failed()
{
var ex = new InvalidOperationException();
A.CallTo(() => eventConsumer.ClearAsync())
.Throws(ex);
sut.Reset();
sut.Dispose();
await sut.OnActivateAsync();
await sut.ActivateAsync();
await sut.ResetAsync();
state.ShouldBeEquivalentTo(new EventConsumerState { IsStopped = true, Position = initialPosition, Error = ex.ToString() });
@ -290,9 +303,6 @@ namespace Squidex.Infrastructure.EventSourcing.Grains
[Fact]
public async Task Should_stop_if_handling_failed()
{
sut.ActivateAsync(consumerName, store).Wait();
sut.Activate(eventConsumer);
var ex = new InvalidOperationException();
A.CallTo(() => eventConsumer.On(envelope))
@ -300,9 +310,10 @@ namespace Squidex.Infrastructure.EventSourcing.Grains
var @event = new StoredEvent(Guid.NewGuid().ToString(), 123, eventData);
await OnEventAsync(eventSubscription, @event);
await sut.OnActivateAsync();
await sut.ActivateAsync();
sut.Dispose();
await OnEventAsync(eventSubscription, @event);
state.ShouldBeEquivalentTo(new EventConsumerState { IsStopped = true, Position = initialPosition, Error = ex.ToString() });
@ -314,16 +325,11 @@ namespace Squidex.Infrastructure.EventSourcing.Grains
A.CallTo(() => eventSubscription.StopAsync())
.MustHaveHappened(Repeated.Exactly.Once);
sut.GetState().ShouldBeEquivalentTo(new EventConsumerInfo { Name = consumerName, IsStopped = true, Position = initialPosition, Error = ex.ToString() });
}
[Fact]
public async Task Should_stop_if_deserialization_failed()
{
sut.ActivateAsync(consumerName, store).Wait();
sut.Activate(eventConsumer);
var ex = new InvalidOperationException();
A.CallTo(() => formatter.Parse(eventData, true))
@ -331,9 +337,10 @@ namespace Squidex.Infrastructure.EventSourcing.Grains
var @event = new StoredEvent(Guid.NewGuid().ToString(), 123, eventData);
await OnEventAsync(eventSubscription, @event);
await sut.OnActivateAsync();
await sut.ActivateAsync();
sut.Dispose();
await OnEventAsync(eventSubscription, @event);
state.ShouldBeEquivalentTo(new EventConsumerState { IsStopped = true, Position = initialPosition, Error = ex.ToString() });
@ -350,9 +357,6 @@ namespace Squidex.Infrastructure.EventSourcing.Grains
[Fact]
public async Task Should_start_after_stop_when_handling_failed()
{
sut.ActivateAsync(consumerName, store).Wait();
sut.Activate(eventConsumer);
var exception = new InvalidOperationException();
A.CallTo(() => eventConsumer.On(envelope))
@ -360,11 +364,14 @@ namespace Squidex.Infrastructure.EventSourcing.Grains
var @event = new StoredEvent(Guid.NewGuid().ToString(), 123, eventData);
await sut.OnActivateAsync();
await sut.ActivateAsync();
await OnEventAsync(eventSubscription, @event);
sut.Start();
sut.Start();
sut.Dispose();
await sut.StopAsync();
await sut.StartAsync();
await sut.StartAsync();
state.ShouldBeEquivalentTo(new EventConsumerState { IsStopped = false, Position = initialPosition, Error = null });
@ -383,12 +390,12 @@ namespace Squidex.Infrastructure.EventSourcing.Grains
private Task OnErrorAsync(IEventSubscription subscriber, Exception ex)
{
return sutSubscriber.OnErrorAsync(subscriber, ex);
return sut.OnErrorAsync(subscriber.AsImmutable(), ex.AsImmutable());
}
private Task OnEventAsync(IEventSubscription subscriber, StoredEvent ev)
{
return sutSubscriber.OnEventAsync(subscriber, ev);
return sut.OnEventAsync(subscriber.AsImmutable(), ev.AsImmutable());
}
}
}

162
tests/Squidex.Infrastructure.Tests/EventSourcing/Grains/EventConsumerManagerGrainTests.cs

@ -0,0 +1,162 @@
// ==========================================================================
// Squidex Headless CMS
// ==========================================================================
// Copyright (c) Squidex UG (haftungsbeschränkt)
// All rights reserved. Licensed under the MIT license.
// ==========================================================================
using System.Collections.Generic;
using System.Threading.Tasks;
using FakeItEasy;
using FluentAssertions;
using Orleans;
using Orleans.Concurrency;
using Orleans.Core;
using Orleans.Runtime;
using Xunit;
namespace Squidex.Infrastructure.EventSourcing.Grains
{
public class EventConsumerManagerGrainTests
{
public class MyEventConsumerManagerGrain : EventConsumerManagerGrain
{
public MyEventConsumerManagerGrain(
IEnumerable<IEventConsumer> eventConsumers,
IGrainIdentity identity,
IGrainRuntime runtime)
: base(eventConsumers, identity, runtime)
{
}
}
private readonly IEventConsumer consumerA = A.Fake<IEventConsumer>();
private readonly IEventConsumer consumerB = A.Fake<IEventConsumer>();
private readonly IEventConsumerGrain grainA = A.Fake<IEventConsumerGrain>();
private readonly IEventConsumerGrain grainB = A.Fake<IEventConsumerGrain>();
private readonly MyEventConsumerManagerGrain sut;
public EventConsumerManagerGrainTests()
{
var grainRuntime = A.Fake<IGrainRuntime>();
var grainFactory = A.Fake<IGrainFactory>();
A.CallTo(() => grainFactory.GetGrain<IEventConsumerGrain>("a", null)).Returns(grainA);
A.CallTo(() => grainFactory.GetGrain<IEventConsumerGrain>("b", null)).Returns(grainB);
A.CallTo(() => grainRuntime.GrainFactory).Returns(grainFactory);
A.CallTo(() => consumerA.Name).Returns("a");
A.CallTo(() => consumerA.EventsFilter).Returns("^a-");
A.CallTo(() => consumerB.Name).Returns("b");
A.CallTo(() => consumerB.EventsFilter).Returns("^b-");
sut = new MyEventConsumerManagerGrain(new[] { consumerA, consumerB }, A.Fake<IGrainIdentity>(), grainRuntime);
}
[Fact]
public async Task Should_not_activate_all_grains_on_activate()
{
await sut.OnActivateAsync();
A.CallTo(() => grainA.ActivateAsync())
.MustNotHaveHappened();
A.CallTo(() => grainB.ActivateAsync())
.MustNotHaveHappened();
}
[Fact]
public async Task Should_activate_all_grains_on_reminder()
{
await sut.ReceiveReminder(null, default(TickStatus));
A.CallTo(() => grainA.ActivateAsync())
.MustHaveHappened();
A.CallTo(() => grainB.ActivateAsync())
.MustHaveHappened();
}
[Fact]
public async Task Should_activate_all_grains_on_wakeup_with_null()
{
await sut.ActivateAsync(null);
A.CallTo(() => grainA.ActivateAsync())
.MustHaveHappened();
A.CallTo(() => grainB.ActivateAsync())
.MustHaveHappened();
}
[Fact]
public async Task Should_activate_matching_grains_when_stream_name_defined()
{
await sut.ActivateAsync("a-123");
A.CallTo(() => grainA.ActivateAsync())
.MustHaveHappened();
A.CallTo(() => grainB.ActivateAsync())
.MustNotHaveHappened();
}
[Fact]
public async Task Should_start_matching_grain()
{
await sut.StartAsync("a");
A.CallTo(() => grainA.StartAsync())
.MustHaveHappened();
A.CallTo(() => grainB.StartAsync())
.MustNotHaveHappened();
}
[Fact]
public async Task Should_stop_matching_grain()
{
await sut.StopAsync("b");
A.CallTo(() => grainA.StopAsync())
.MustNotHaveHappened();
A.CallTo(() => grainB.StopAsync())
.MustHaveHappened();
}
[Fact]
public async Task Should_reset_matching_grain()
{
await sut.ResetAsync("b");
A.CallTo(() => grainA.ResetAsync())
.MustNotHaveHappened();
A.CallTo(() => grainB.ResetAsync())
.MustHaveHappened();
}
[Fact]
public async Task Should_fetch_infos_from_all_grains()
{
A.CallTo(() => grainA.GetStateAsync())
.Returns(new Immutable<EventConsumerInfo>(
new EventConsumerInfo { Name = "A", Error = "A-Error", IsStopped = false, Position = "123" }));
A.CallTo(() => grainB.GetStateAsync())
.Returns(new Immutable<EventConsumerInfo>(
new EventConsumerInfo { Name = "B", Error = "B-Error", IsStopped = false, Position = "456" }));
var infos = await sut.GetConsumersAsync();
infos.Value.ShouldBeEquivalentTo(
new List<EventConsumerInfo>
{
new EventConsumerInfo { Name = "A", Error = "A-Error", IsStopped = false, Position = "123" },
new EventConsumerInfo { Name = "B", Error = "B-Error", IsStopped = false, Position = "456" }
});
}
}
}

120
tests/Squidex.Infrastructure.Tests/EventSourcing/Grains/EventConsumerManagerTests.cs

@ -1,120 +0,0 @@
// ==========================================================================
// Squidex Headless CMS
// ==========================================================================
// Copyright (c) Squidex UG (haftungsbeschränkt)
// All rights reserved. Licensed under the MIT license.
// ==========================================================================
using System;
using System.Threading.Tasks;
using FakeItEasy;
using FluentAssertions;
using Squidex.Infrastructure.States;
using Xunit;
namespace Squidex.Infrastructure.EventSourcing.Grains
{
public class EventConsumerManagerTests
{
private readonly EventConsumerGrain actor1 = A.Fake<EventConsumerGrain>();
private readonly EventConsumerGrain actor2 = A.Fake<EventConsumerGrain>();
private readonly IStateFactory factory = A.Fake<IStateFactory>();
private readonly IEventConsumer consumer1 = A.Fake<IEventConsumer>();
private readonly IEventConsumer consumer2 = A.Fake<IEventConsumer>();
private readonly IPubSub pubSub = new InMemoryPubSub();
private readonly string consumerName1 = "Consumer1";
private readonly string consumerName2 = "Consumer2";
private readonly EventConsumerManagerGrain sut;
public EventConsumerManagerTests()
{
A.CallTo(() => consumer1.Name).Returns(consumerName1);
A.CallTo(() => consumer2.Name).Returns(consumerName2);
A.CallTo(() => factory.CreateAsync<EventConsumerGrain>(consumerName1)).Returns(actor1);
A.CallTo(() => factory.CreateAsync<EventConsumerGrain>(consumerName2)).Returns(actor2);
sut = new EventConsumerManagerGrain(new IEventConsumer[] { consumer1, consumer2 }, pubSub, factory);
}
[Fact]
public void Should_activate_all_actors()
{
sut.Run();
A.CallTo(() => actor1.Activate(consumer1))
.MustHaveHappened();
A.CallTo(() => actor2.Activate(consumer2))
.MustHaveHappened();
}
[Fact]
public void Should_start_correct_actor()
{
sut.Run();
pubSub.Publish(new StartConsumerMessage { ConsumerName = consumerName1 }, true);
A.CallTo(() => actor1.Start())
.MustHaveHappened();
A.CallTo(() => actor2.Start())
.MustNotHaveHappened();
}
[Fact]
public void Should_stop_correct_actor()
{
sut.Run();
pubSub.Publish(new StopConsumerMessage { ConsumerName = consumerName1 }, true);
A.CallTo(() => actor1.Stop())
.MustHaveHappened();
A.CallTo(() => actor2.Stop())
.MustNotHaveHappened();
}
[Fact]
public void Should_reset_correct_actor()
{
sut.Run();
pubSub.Publish(new ResetConsumerMessage { ConsumerName = consumerName2 }, true);
A.CallTo(() => actor1.Reset())
.MustNotHaveHappened();
A.CallTo(() => actor2.Reset())
.MustHaveHappened();
}
[Fact]
public async Task Should_get_state_from_all_actors()
{
sut.Run();
A.CallTo(() => actor1.GetState())
.Returns(new EventConsumerInfo { Name = consumerName1, Position = "123 " });
A.CallTo(() => actor2.GetState())
.Returns(new EventConsumerInfo { Name = consumerName2, Position = "345 " });
var response = await pubSub.RequestAsync<GetStatesRequest, GetStatesResponse>(new GetStatesRequest(), TimeSpan.FromSeconds(5), true);
response.States.ShouldAllBeEquivalentTo(new EventConsumerInfo[]
{
new EventConsumerInfo { Name = consumerName1, Position = "123 " },
new EventConsumerInfo { Name = consumerName2, Position = "345 " }
});
}
[Fact]
public void Should_not_dispose_actors()
{
sut.Dispose();
}
}
}

39
tests/Squidex.Infrastructure.Tests/EventSourcing/Grains/OrleansEventNotifierTests.cs

@ -0,0 +1,39 @@
// ==========================================================================
// Squidex Headless CMS
// ==========================================================================
// Copyright (c) Squidex UG (haftungsbeschränkt)
// All rights reserved. Licensed under the MIT license.
// ==========================================================================
using FakeItEasy;
using Orleans;
using Xunit;
namespace Squidex.Infrastructure.EventSourcing.Grains
{
public class OrleansEventNotifierTests
{
private readonly IEventConsumerManagerGrain manager = A.Fake<IEventConsumerManagerGrain>();
private readonly OrleansEventNotifier sut;
public OrleansEventNotifierTests()
{
var factory = A.Fake<IGrainFactory>();
A.CallTo(() => factory.GetGrain<IEventConsumerManagerGrain>("Default", null))
.Returns(manager);
sut = new OrleansEventNotifier(factory);
}
[Fact]
public void Should_wakeup_manager_with_stream_name()
{
sut.Initialize();
sut.NotifyEventsStored("my-stream");
A.CallTo(() => manager.ActivateAsync("my-stream"))
.MustHaveHappened();
}
}
}

28
tests/Squidex.Infrastructure.Tests/EventSourcing/PollingSubscriptionTests.cs

@ -16,14 +16,13 @@ namespace Squidex.Infrastructure.EventSourcing
public class PollingSubscriptionTests
{
private readonly IEventStore eventStore = A.Fake<IEventStore>();
private readonly IEventNotifier eventNotifier = new DefaultEventNotifier(new InMemoryPubSub());
private readonly IEventSubscriber eventSubscriber = A.Fake<IEventSubscriber>();
private readonly string position = Guid.NewGuid().ToString();
[Fact]
public async Task Should_subscribe_on_start()
{
var sut = new PollingSubscription(eventStore, eventNotifier, eventSubscriber, "^my-stream", position);
var sut = new PollingSubscription(eventStore, eventSubscriber, "^my-stream", position);
await WaitAndStopAsync(sut);
@ -39,7 +38,7 @@ namespace Squidex.Infrastructure.EventSourcing
A.CallTo(() => eventStore.QueryAsync(A<Func<StoredEvent, Task>>.Ignored, "^my-stream", position, A<CancellationToken>.Ignored))
.Throws(ex);
var sut = new PollingSubscription(eventStore, eventNotifier, eventSubscriber, "^my-stream", position);
var sut = new PollingSubscription(eventStore, eventSubscriber, "^my-stream", position);
await WaitAndStopAsync(sut);
@ -55,7 +54,7 @@ namespace Squidex.Infrastructure.EventSourcing
A.CallTo(() => eventStore.QueryAsync(A<Func<StoredEvent, Task>>.Ignored, "^my-stream", position, A<CancellationToken>.Ignored))
.Throws(ex);
var sut = new PollingSubscription(eventStore, eventNotifier, eventSubscriber, "^my-stream", position);
var sut = new PollingSubscription(eventStore, eventSubscriber, "^my-stream", position);
await WaitAndStopAsync(sut);
@ -71,7 +70,7 @@ namespace Squidex.Infrastructure.EventSourcing
A.CallTo(() => eventStore.QueryAsync(A<Func<StoredEvent, Task>>.Ignored, "^my-stream", position, A<CancellationToken>.Ignored))
.Throws(ex);
var sut = new PollingSubscription(eventStore, eventNotifier, eventSubscriber, "^my-stream", position);
var sut = new PollingSubscription(eventStore, eventSubscriber, "^my-stream", position);
await WaitAndStopAsync(sut);
@ -80,24 +79,11 @@ namespace Squidex.Infrastructure.EventSourcing
}
[Fact]
public async Task Should_not_subscribe_on_notify_when_stream_matches()
public async Task Should_wake_up()
{
var sut = new PollingSubscription(eventStore, eventNotifier, eventSubscriber, "^my-stream", position);
var sut = new PollingSubscription(eventStore, eventSubscriber, "^my-stream", position);
eventNotifier.NotifyEventsStored("other-stream-123");
await WaitAndStopAsync(sut);
A.CallTo(() => eventStore.QueryAsync(A<Func<StoredEvent, Task>>.Ignored, "^my-stream", position, A<CancellationToken>.Ignored))
.MustHaveHappened(Repeated.Exactly.Once);
}
[Fact]
public async Task Should_subscribe_on_notify_when_stream_matches()
{
var sut = new PollingSubscription(eventStore, eventNotifier, eventSubscriber, "^my-stream", position);
eventNotifier.NotifyEventsStored("my-stream-123");
sut.WakeUp();
await WaitAndStopAsync(sut);

39
tests/Squidex.Infrastructure.Tests/Orleans/BootstrapTests.cs

@ -0,0 +1,39 @@
// ==========================================================================
// EventConsumerBootstrapTests.cs
// Squidex Headless CMS
// ==========================================================================
// Copyright (c) Squidex Group
// All rights reserved.
// ==========================================================================
using FakeItEasy;
using Orleans;
using Xunit;
namespace Squidex.Infrastructure.Orleans
{
public sealed class BootstrapTests
{
private readonly IBackgroundGrain grain = A.Fake<IBackgroundGrain>();
private readonly Bootstrap<IBackgroundGrain> sut;
public BootstrapTests()
{
var factory = A.Fake<IGrainFactory>();
sut = new Bootstrap<IBackgroundGrain>(factory);
A.CallTo(() => factory.GetGrain<IBackgroundGrain>("Default", null))
.Returns(grain);
}
[Fact]
public void Should_activate_grain_on_run()
{
sut.Run();
A.CallTo(() => grain.ActivateAsync())
.MustHaveHappened();
}
}
}

80
tests/Squidex.Infrastructure.Tests/Orleans/JsonExternalSerializerTests.cs

@ -0,0 +1,80 @@
// ==========================================================================
// JsonExternalSerializerTests.cs
// Squidex Headless CMS
// ==========================================================================
// Copyright (c) Squidex Group
// All rights reserved.
// ==========================================================================
using System;
using System.Collections.Generic;
using FakeItEasy;
using Orleans.Serialization;
using Xunit;
namespace Squidex.Infrastructure.Orleans
{
public class JsonExternalSerializerTests
{
[Fact]
public void Should_not_copy_null()
{
var v = (string)null;
var c = J<int>.Copy(v, null);
Assert.Null(c);
}
[Fact]
public void Should_copy_null_json()
{
var v = new J<List<int>>(null);
var c = (J<List<int>>)J<object>.Copy(v, null);
Assert.Null(c.Value);
}
[Fact]
public void Should_not_copy_immutable_values()
{
var v = new List<int> { 1, 2, 3 }.AsJ();
var c = (J<List<int>>)J<object>.Copy(v, null);
Assert.Same(v.Value, c.Value);
}
[Fact]
public void Should_serialize_and_deserialize_value()
{
var value = new J<List<int>>(new List<int> { 1, 2, 3 });
var writtenLength = 0;
var writtenBuffer = (byte[])null;
var writer = A.Fake<IBinaryTokenStreamWriter>();
var writerContext = new SerializationContext(null) { StreamWriter = writer };
A.CallTo(() => writer.Write(A<int>.Ignored))
.Invokes(new Action<int>(x => writtenLength = x));
A.CallTo(() => writer.Write(A<byte[]>.Ignored))
.Invokes(new Action<byte[]>(x => writtenBuffer = x));
J<object>.Serialize(value, writerContext, value.GetType());
var reader = A.Fake<IBinaryTokenStreamReader>();
var readerContext = new DeserializationContext(null) { StreamReader = reader };
A.CallTo(() => reader.ReadInt())
.Returns(writtenLength);
A.CallTo(() => reader.ReadBytes(writtenLength))
.Returns(writtenBuffer);
var copy = (J<List<int>>)J<object>.Deserialize(value.GetType(), readerContext);
Assert.Equal(value.Value, copy.Value);
Assert.NotSame(value.Value, copy.Value);
}
}
}

186
tests/Squidex.Infrastructure.Tests/States/PersistenceEventSourcingTests.cs

@ -13,7 +13,6 @@ using FakeItEasy;
using Microsoft.Extensions.Caching.Memory;
using Microsoft.Extensions.Options;
using Squidex.Infrastructure.EventSourcing;
using Squidex.Infrastructure.Tasks;
using Squidex.Infrastructure.TestHelpers;
using Xunit;
@ -21,48 +20,7 @@ namespace Squidex.Infrastructure.States
{
public class PersistenceEventSourcingTests
{
private class MyStatefulObject : IStatefulObject<string>
{
private readonly List<IEvent> appliedEvents = new List<IEvent>();
private IPersistence persistence;
public long ExpectedVersion { get; set; } = EtagVersion.Any;
public List<IEvent> AppliedEvents
{
get { return appliedEvents; }
}
public Task ActivateAsync(string key, IStore<string> store)
{
persistence = store.WithEventSourcing(key, e => appliedEvents.Add(e.Payload));
return persistence.ReadAsync(ExpectedVersion);
}
public Task WriteEventsAsync(params IEvent[] events)
{
return persistence.WriteEventsAsync(events.Select(Envelope.Create).ToArray());
}
}
private class MyStatefulObjectWithSnapshot : IStatefulObject<string>
{
private IPersistence<object> persistence;
public long ExpectedVersion { get; set; } = EtagVersion.Any;
public Task ActivateAsync(string key, IStore<string> store)
{
persistence = store.WithSnapshotsAndEventSourcing<object>(key, s => TaskHelper.Done, s => TaskHelper.Done);
return persistence.ReadAsync(ExpectedVersion);
}
}
private readonly string key = Guid.NewGuid().ToString();
private readonly MyStatefulObject statefulObject = new MyStatefulObject();
private readonly MyStatefulObjectWithSnapshot statefulObjectWithSnapShot = new MyStatefulObjectWithSnapshot();
private readonly IEventDataFormatter eventDataFormatter = A.Fake<IEventDataFormatter>();
private readonly IEventStore eventStore = A.Fake<IEventStore>();
private readonly IMemoryCache cache = new MemoryCache(Options.Create(new MemoryCacheOptions()));
@ -70,42 +28,53 @@ namespace Squidex.Infrastructure.States
private readonly IServiceProvider services = A.Fake<IServiceProvider>();
private readonly ISnapshotStore<object, string> snapshotStore = A.Fake<ISnapshotStore<object, string>>();
private readonly IStreamNameResolver streamNameResolver = A.Fake<IStreamNameResolver>();
private readonly StateFactory sut;
private readonly IStore<string> sut;
public PersistenceEventSourcingTests()
{
A.CallTo(() => services.GetService(typeof(MyStatefulObject)))
.Returns(statefulObject);
A.CallTo(() => services.GetService(typeof(MyStatefulObjectWithSnapshot)))
.Returns(statefulObjectWithSnapShot);
A.CallTo(() => services.GetService(typeof(ISnapshotStore<object, string>)))
.Returns(snapshotStore);
A.CallTo(() => streamNameResolver.GetStreamName(typeof(MyStatefulObject), key))
.Returns(key);
A.CallTo(() => streamNameResolver.GetStreamName(typeof(MyStatefulObjectWithSnapshot), key))
A.CallTo(() => streamNameResolver.GetStreamName(typeof(object), key))
.Returns(key);
sut = new StateFactory(pubSub, cache, eventStore, eventDataFormatter, services, streamNameResolver);
sut.Initialize();
sut = new Store<string>(eventStore, eventDataFormatter, services, streamNameResolver);
}
[Fact]
public async Task Should_read_from_store()
{
statefulObject.ExpectedVersion = 1;
var event1 = new MyEvent();
var event2 = new MyEvent();
SetupEventStore(event1, event2);
var actualObject = await sut.GetSingleAsync<MyStatefulObject>(key);
var persistedEvents = new List<IEvent>();
var persistence = sut.WithEventSourcing<object, string>(key, x => persistedEvents.Add(x.Payload));
await persistence.ReadAsync();
Assert.Equal(persistedEvents.ToArray(), new[] { event1, event2 });
}
[Fact]
public async Task Should_ignore_old_events()
{
var storedEvent = new StoredEvent("1", 0, new EventData());
A.CallTo(() => eventStore.QueryAsync(key, 0))
.Returns(new List<StoredEvent> { storedEvent });
A.CallTo(() => eventDataFormatter.Parse(storedEvent.Data, true))
.Throws(new TypeNameNotFoundException());
var persistedEvents = new List<IEvent>();
var persistence = sut.WithEventSourcing<object, string>(key, x => persistedEvents.Add(x.Payload));
Assert.Same(statefulObject, actualObject);
Assert.NotNull(cache.Get<object>(key));
await persistence.ReadAsync();
Assert.Equal(actualObject.AppliedEvents, new[] { event1, event2 });
Assert.Empty(persistedEvents);
Assert.Equal(0, persistence.Version);
}
[Fact]
@ -116,7 +85,11 @@ namespace Squidex.Infrastructure.States
SetupEventStore(3, 2);
await sut.GetSingleAsync<MyStatefulObjectWithSnapshot>(key);
var persistedState = (object)null;
var persistedEvents = new List<IEvent>();
var persistence = sut.WithSnapshotsAndEventSourcing<object, object, string>(key, x => persistedState = x, x => persistedEvents.Add(x.Payload));
await persistence.ReadAsync();
A.CallTo(() => eventStore.QueryAsync(key, 3))
.MustHaveHappened();
@ -130,7 +103,11 @@ namespace Squidex.Infrastructure.States
SetupEventStore(3, 0, 3);
await Assert.ThrowsAsync<InvalidOperationException>(() => sut.GetSingleAsync<MyStatefulObjectWithSnapshot>(key));
var persistedState = (object)null;
var persistedEvents = new List<IEvent>();
var persistence = sut.WithSnapshotsAndEventSourcing<object, object, string>(key, x => persistedState = x, x => persistedEvents.Add(x.Payload));
await Assert.ThrowsAsync<InvalidOperationException>(() => persistence.ReadAsync());
}
[Fact]
@ -141,50 +118,60 @@ namespace Squidex.Infrastructure.States
SetupEventStore(3, 4, 3);
await Assert.ThrowsAsync<InvalidOperationException>(() => sut.GetSingleAsync<MyStatefulObjectWithSnapshot>(key));
var persistedState = (object)null;
var persistedEvents = new List<IEvent>();
var persistence = sut.WithSnapshotsAndEventSourcing<object, object, string>(key, x => persistedState = x, x => persistedEvents.Add(x.Payload));
await Assert.ThrowsAsync<InvalidOperationException>(() => persistence.ReadAsync());
}
[Fact]
public async Task Should_throw_exception_if_not_found()
{
statefulObject.ExpectedVersion = 0;
SetupEventStore(0);
await Assert.ThrowsAsync<DomainObjectNotFoundException>(() => sut.GetSingleAsync<MyStatefulObject>(key));
var persistedEvents = new List<IEvent>();
var persistence = sut.WithEventSourcing<object, string>(key, x => persistedEvents.Add(x.Payload));
await Assert.ThrowsAsync<DomainObjectNotFoundException>(() => persistence.ReadAsync(1));
}
[Fact]
public async Task Should_throw_exception_if_other_version_found()
{
statefulObject.ExpectedVersion = 1;
SetupEventStore(3);
await Assert.ThrowsAsync<DomainObjectVersionException>(() => sut.GetSingleAsync<MyStatefulObject>(key));
var persistedEvents = new List<IEvent>();
var persistence = sut.WithEventSourcing<object, string>(key, x => persistedEvents.Add(x.Payload));
await Assert.ThrowsAsync<DomainObjectVersionException>(() => persistence.ReadAsync(1));
}
[Fact]
public async Task Should_throw_exception_if_other_version_found_from_snapshot()
{
statefulObjectWithSnapShot.ExpectedVersion = 1;
A.CallTo(() => snapshotStore.ReadAsync(key))
.Returns((2, 2L));
SetupEventStore(0);
await Assert.ThrowsAsync<DomainObjectVersionException>(() => sut.GetSingleAsync<MyStatefulObjectWithSnapshot>(key));
var persistedState = (object)null;
var persistedEvents = new List<IEvent>();
var persistence = sut.WithSnapshotsAndEventSourcing<object, object, string>(key, x => persistedState = x, x => persistedEvents.Add(x.Payload));
await Assert.ThrowsAsync<DomainObjectVersionException>(() => persistence.ReadAsync(1));
}
[Fact]
public async Task Should_not_throw_exception_if_nothing_expected()
{
statefulObject.ExpectedVersion = EtagVersion.Any;
SetupEventStore(0);
await sut.GetSingleAsync<MyStatefulObject>(key);
var persistedState = (object)null;
var persistedEvents = new List<IEvent>();
var persistence = sut.WithSnapshotsAndEventSourcing<object, object, string>(key, x => persistedState = x, x => persistedEvents.Add(x.Payload));
await persistence.ReadAsync();
}
[Fact]
@ -192,12 +179,13 @@ namespace Squidex.Infrastructure.States
{
SetupEventStore(3);
var actualObject = await sut.GetSingleAsync<MyStatefulObject>(key);
var persistedEvents = new List<IEvent>();
var persistence = sut.WithEventSourcing<object, string>(key, x => persistedEvents.Add(x.Payload));
Assert.Same(statefulObject, actualObject);
await persistence.ReadAsync();
await statefulObject.WriteEventsAsync(new MyEvent(), new MyEvent());
await statefulObject.WriteEventsAsync(new MyEvent(), new MyEvent());
await persistence.WriteEventsAsync(new[] { new MyEvent(), new MyEvent() }.Select(Envelope.Create));
await persistence.WriteEventsAsync(new[] { new MyEvent(), new MyEvent() }.Select(Envelope.Create));
A.CallTo(() => eventStore.AppendAsync(A<Guid>.Ignored, key, 2, A<ICollection<EventData>>.That.Matches(x => x.Count == 2)))
.MustHaveHappened();
@ -210,49 +198,15 @@ namespace Squidex.Infrastructure.States
{
SetupEventStore(3);
var actualObject = await sut.GetSingleAsync<MyStatefulObject>(key);
var persistedEvents = new List<IEvent>();
var persistence = sut.WithEventSourcing<object, string>(key, x => persistedEvents.Add(x.Payload));
await persistence.ReadAsync();
A.CallTo(() => eventStore.AppendAsync(A<Guid>.Ignored, key, 2, A<ICollection<EventData>>.That.Matches(x => x.Count == 2)))
.Throws(new WrongEventVersionException(1, 1));
await Assert.ThrowsAsync<DomainObjectVersionException>(() => statefulObject.WriteEventsAsync(new MyEvent(), new MyEvent()));
}
[Fact]
public async Task Should_not_remove_from_cache_when_write_failed()
{
A.CallTo(() => eventStore.AppendAsync(A<Guid>.Ignored, A<string>.Ignored, A<long>.Ignored, A<ICollection<EventData>>.Ignored))
.Throws(new InvalidOperationException());
var actualObject = await sut.GetSingleAsync<MyStatefulObject>(key);
await Assert.ThrowsAsync<InvalidOperationException>(() => statefulObject.WriteEventsAsync(new MyEvent()));
Assert.True(cache.TryGetValue(key, out var t));
}
[Fact]
public async Task Should_return_same_instance_for_parallel_requests()
{
A.CallTo(() => snapshotStore.ReadAsync(key))
.ReturnsLazily(() => Task.Delay(1).ContinueWith(x => ((object)1, 1L)));
var tasks = new List<Task<MyStatefulObject>>();
for (var i = 0; i < 1000; i++)
{
tasks.Add(Task.Run(() => sut.GetSingleAsync<MyStatefulObject>(key)));
}
var retrievedStates = await Task.WhenAll(tasks);
foreach (var retrievedState in retrievedStates)
{
Assert.Same(retrievedStates[0], retrievedState);
}
A.CallTo(() => eventStore.QueryAsync(key, 0))
.MustHaveHappened(Repeated.Exactly.Once);
await Assert.ThrowsAsync<DomainObjectVersionException>(() => persistence.WriteEventsAsync(new[] { new MyEvent(), new MyEvent() }.Select(Envelope.Create)));
}
private void SetupEventStore(int count, int eventOffset = 0, int readPosition = 0)

171
tests/Squidex.Infrastructure.Tests/States/PersistenceSnapshotTests.cs

@ -6,7 +6,6 @@
// ==========================================================================
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using FakeItEasy;
using Microsoft.Extensions.Caching.Memory;
@ -18,45 +17,9 @@ using Xunit;
namespace Squidex.Infrastructure.States
{
public class PersistenceSnapshotTests : IDisposable
public class PersistenceSnapshotTests
{
private class MyStatefulObject : IStatefulObject<string>
{
private IPersistence<int> persistence;
private int state;
public long ExpectedVersion { get; set; } = EtagVersion.Any;
public long Version
{
get { return persistence.Version; }
}
public int State
{
get { return state; }
}
public Task ActivateAsync(string key, IStore<string> store)
{
persistence = store.WithSnapshots<int, string>(key, s => state = s);
return persistence.ReadAsync(ExpectedVersion);
}
public void SetState(int value)
{
state = value;
}
public Task WriteStateAsync()
{
return persistence.WriteSnapshotAsync(state);
}
}
private readonly string key = Guid.NewGuid().ToString();
private readonly MyStatefulObject statefulObject = new MyStatefulObject();
private readonly IEventDataFormatter eventDataFormatter = A.Fake<IEventDataFormatter>();
private readonly IEventStore eventStore = A.Fake<IEventStore>();
private readonly IMemoryCache cache = new MemoryCache(Options.Create(new MemoryCacheOptions()));
@ -64,99 +27,101 @@ namespace Squidex.Infrastructure.States
private readonly IServiceProvider services = A.Fake<IServiceProvider>();
private readonly ISnapshotStore<int, string> snapshotStore = A.Fake<ISnapshotStore<int, string>>();
private readonly IStreamNameResolver streamNameResolver = A.Fake<IStreamNameResolver>();
private readonly StateFactory sut;
private readonly IStore<string> sut;
public PersistenceSnapshotTests()
{
A.CallTo(() => services.GetService(typeof(MyStatefulObject)))
.Returns(statefulObject);
A.CallTo(() => services.GetService(typeof(ISnapshotStore<int, string>)))
.Returns(snapshotStore);
sut = new StateFactory(pubSub, cache, eventStore, eventDataFormatter, services, streamNameResolver);
sut.Initialize();
}
public void Dispose()
{
sut.Dispose();
sut = new Store<string>(eventStore, eventDataFormatter, services, streamNameResolver);
}
[Fact]
public async Task Should_read_from_store()
{
statefulObject.ExpectedVersion = 1;
A.CallTo(() => snapshotStore.ReadAsync(key))
.Returns((123, 1));
.Returns((20, 10));
var actualObject = await sut.GetSingleAsync<MyStatefulObject, string>(key);
var persistedState = 0;
var persistence = sut.WithSnapshots<object, int, string>(key, x => persistedState = x);
Assert.Same(statefulObject, actualObject);
Assert.NotNull(cache.Get<object>(key));
await persistence.ReadAsync();
Assert.Equal(123, statefulObject.State);
Assert.Equal(10, persistence.Version);
Assert.Equal(20, persistedState);
}
[Fact]
public async Task Should_set_to_empty_when_store_returns_not_found()
public async Task Should_return_empty_version_when_version_negative()
{
A.CallTo(() => snapshotStore.ReadAsync(key))
.Returns((123, EtagVersion.NotFound));
.Returns((20, -10));
var persistedState = 0;
var persistence = sut.WithSnapshots<object, int, string>(key, x => persistedState = x);
var actualObject = await sut.GetSingleAsync<MyStatefulObject, string>(key);
await persistence.ReadAsync();
Assert.Equal(-1, statefulObject.Version);
Assert.Equal( 0, statefulObject.State);
Assert.Equal(EtagVersion.Empty, persistence.Version);
}
[Fact]
public async Task Should_throw_exception_if_not_found()
public async Task Should_set_to_empty_when_store_returns_not_found()
{
statefulObject.ExpectedVersion = 0;
A.CallTo(() => snapshotStore.ReadAsync(key))
.Returns((0, EtagVersion.Empty));
.Returns((20, EtagVersion.Empty));
var persistedState = 0;
var persistence = sut.WithSnapshots<object, int, string>(key, x => persistedState = x);
await Assert.ThrowsAsync<DomainObjectNotFoundException>(() => sut.GetSingleAsync<MyStatefulObject, string>(key));
await persistence.ReadAsync();
Assert.Equal(-1, persistence.Version);
Assert.Equal( 0, persistedState);
}
[Fact]
public async Task Should_throw_exception_if_other_version_found()
public async Task Should_throw_exception_if_not_found_and_version_expected()
{
statefulObject.ExpectedVersion = 1;
A.CallTo(() => snapshotStore.ReadAsync(key))
.Returns((2, 2));
.Returns((123, EtagVersion.Empty));
var persistedState = 0;
var persistence = sut.WithSnapshots<object, int, string>(key, x => persistedState = x);
await Assert.ThrowsAsync<DomainObjectVersionException>(() => sut.GetSingleAsync<MyStatefulObject, string>(key));
await Assert.ThrowsAsync<DomainObjectNotFoundException>(() => persistence.ReadAsync(1));
}
[Fact]
public async Task Should_not_throw_exception_if_noting_expected()
public async Task Should_throw_exception_if_other_version_found()
{
A.CallTo(() => snapshotStore.ReadAsync(key))
.Returns((0, EtagVersion.Empty));
.Returns((123, 2));
await sut.GetSingleAsync<MyStatefulObject, string>(key);
var persistedState = 0;
var persistence = sut.WithSnapshots<object, int, string>(key, x => persistedState = x);
await Assert.ThrowsAsync<DomainObjectVersionException>(() => persistence.ReadAsync(1));
}
[Fact]
public async Task Should_write_to_store_with_previous_version()
{
A.CallTo(() => snapshotStore.ReadAsync(key))
.Returns((123, 13));
.Returns((20, 10));
var actualObject = await sut.GetSingleAsync<MyStatefulObject, string>(key);
var persistedState = 0;
var persistence = sut.WithSnapshots<object, int, string>(key, x => persistedState = x);
Assert.Same(statefulObject, actualObject);
Assert.Equal(123, statefulObject.State);
await persistence.ReadAsync();
statefulObject.SetState(456);
Assert.Equal(10, persistence.Version);
Assert.Equal(20, persistedState);
await statefulObject.WriteStateAsync();
await persistence.WriteSnapshotAsync(100);
A.CallTo(() => snapshotStore.WriteAsync(key, 456, 13, 14))
A.CallTo(() => snapshotStore.WriteAsync(key, 100, 10, 11))
.MustHaveHappened();
}
@ -164,51 +129,17 @@ namespace Squidex.Infrastructure.States
public async Task Should_wrap_exception_when_writing_to_store_with_previous_version()
{
A.CallTo(() => snapshotStore.ReadAsync(key))
.Returns((123, 13));
.Returns((20, 10));
A.CallTo(() => snapshotStore.WriteAsync(key, 123, 13, 14))
A.CallTo(() => snapshotStore.WriteAsync(key, 100, 10, 11))
.Throws(new InconsistentStateException(1, 1, new InvalidOperationException()));
var actualObject = await sut.GetSingleAsync<MyStatefulObject, string>(key);
var persistedState = 0;
var persistence = sut.WithSnapshots<object, int, string>(key, x => persistedState = x);
await Assert.ThrowsAsync<DomainObjectVersionException>(() => statefulObject.WriteStateAsync());
}
await persistence.ReadAsync();
[Fact]
public async Task Should_not_remove_from_cache_when_write_failed()
{
A.CallTo(() => snapshotStore.WriteAsync(A<string>.Ignored, A<int>.Ignored, A<long>.Ignored, A<long>.Ignored))
.Throws(new InvalidOperationException());
var actualObject = await sut.GetSingleAsync<MyStatefulObject>(key);
await Assert.ThrowsAsync<InvalidOperationException>(() => statefulObject.WriteStateAsync());
Assert.True(cache.TryGetValue(key, out var t));
}
[Fact]
public async Task Should_return_same_instance_for_parallel_requests()
{
A.CallTo(() => snapshotStore.ReadAsync(key))
.ReturnsLazily(() => Task.Delay(1).ContinueWith(x => (1, 1L)));
var tasks = new List<Task<MyStatefulObject>>();
for (var i = 0; i < 1000; i++)
{
tasks.Add(Task.Run(() => sut.GetSingleAsync<MyStatefulObject, string>(key)));
}
var retrievedStates = await Task.WhenAll(tasks);
foreach (var retrievedState in retrievedStates)
{
Assert.Same(retrievedStates[0], retrievedState);
}
A.CallTo(() => snapshotStore.ReadAsync(key))
.MustHaveHappened(Repeated.Exactly.Once);
await Assert.ThrowsAsync<DomainObjectVersionException>(() => persistence.WriteSnapshotAsync(100));
}
}
}

145
tests/Squidex.Infrastructure.Tests/States/StateFactoryTests.cs

@ -1,145 +0,0 @@
// ==========================================================================
// Squidex Headless CMS
// ==========================================================================
// Copyright (c) Squidex UG (haftungsbeschränkt)
// All rights reserved. Licensed under the MIT license.
// ==========================================================================
using System;
using System.Threading.Tasks;
using FakeItEasy;
using Microsoft.Extensions.Caching.Memory;
using Microsoft.Extensions.Options;
using Squidex.Infrastructure.EventSourcing;
using Squidex.Infrastructure.Tasks;
using Xunit;
#pragma warning disable RECS0002 // Convert anonymous method to method group
namespace Squidex.Infrastructure.States
{
public class StateFactoryTests : IDisposable
{
private class MyStatefulObject : IStatefulObject<string>
{
public Task ActivateAsync(string key, IStore<string> store)
{
return TaskHelper.Done;
}
}
private readonly string key = Guid.NewGuid().ToString();
private readonly MyStatefulObject statefulObject = new MyStatefulObject();
private readonly IEventDataFormatter eventDataFormatter = A.Fake<IEventDataFormatter>();
private readonly IEventStore eventStore = A.Fake<IEventStore>();
private readonly IMemoryCache cache = new MemoryCache(Options.Create(new MemoryCacheOptions()));
private readonly IPubSub pubSub = new InMemoryPubSub(true);
private readonly IServiceProvider services = A.Fake<IServiceProvider>();
private readonly ISnapshotStore<int, string> snapshotStore = A.Fake<ISnapshotStore<int, string>>();
private readonly IStreamNameResolver streamNameResolver = A.Fake<IStreamNameResolver>();
private readonly StateFactory sut;
public StateFactoryTests()
{
A.CallTo(() => services.GetService(typeof(MyStatefulObject)))
.Returns(statefulObject);
A.CallTo(() => services.GetService(typeof(ISnapshotStore<int, string>)))
.Returns(snapshotStore);
sut = new StateFactory(pubSub, cache, eventStore, eventDataFormatter, services, streamNameResolver);
sut.Initialize();
}
public void Dispose()
{
sut.Dispose();
}
[Fact]
public async Task Should_provide_state_from_services_and_add_to_cache()
{
var actualObject = await sut.GetSingleAsync<MyStatefulObject, string>(key);
Assert.Same(statefulObject, actualObject);
Assert.NotNull(cache.Get<object>(key));
}
[Fact]
public async Task Should_serve_next_request_from_cache()
{
var actualObject1 = await sut.GetSingleAsync<MyStatefulObject, string>(key);
Assert.Same(statefulObject, actualObject1);
Assert.NotNull(cache.Get<object>(key));
var actualObject2 = await sut.GetSingleAsync<MyStatefulObject, string>(key);
A.CallTo(() => services.GetService(typeof(MyStatefulObject)))
.MustHaveHappened(Repeated.Exactly.Once);
}
[Fact]
public async Task Should_not_serve_next_request_from_cache_when_detached()
{
var actualObject1 = await sut.CreateAsync<MyStatefulObject, string>(key);
Assert.Same(statefulObject, actualObject1);
Assert.Null(cache.Get<object>(key));
var actualObject2 = await sut.CreateAsync<MyStatefulObject, string>(key);
A.CallTo(() => services.GetService(typeof(MyStatefulObject)))
.MustHaveHappened(Repeated.Exactly.Twice);
}
[Fact]
public async Task Should_remove_from_cache_when_invalidation_message_received()
{
var actualObject = await sut.GetSingleAsync<MyStatefulObject, string>(key);
await InvalidateCacheAsync();
Assert.False(cache.TryGetValue(key, out var t));
}
[Fact]
public async Task Should_remove_from_cache_when_method_called()
{
var actualObject = await sut.GetSingleAsync<MyStatefulObject>(key);
sut.Remove<MyStatefulObject, string>(key);
Assert.False(cache.TryGetValue(key, out var t));
}
[Fact]
public void Should_send_invalidation_message_on_refresh()
{
InvalidateMessage message = null;
pubSub.Subscribe<InvalidateMessage>(m =>
{
message = m;
});
sut.Synchronize<MyStatefulObject, string>(key);
Assert.NotNull(message);
Assert.Equal(key, message.Key);
}
private async Task RemoveFromCacheAsync()
{
cache.Remove(key);
await Task.Delay(400);
}
private async Task InvalidateCacheAsync()
{
pubSub.Publish(new InvalidateMessage { Key = key }, true);
await Task.Delay(400);
}
}
}

38
tests/Squidex.Infrastructure.Tests/Tasks/AsyncLockPoolTests.cs

@ -0,0 +1,38 @@
// ==========================================================================
// Squidex Headless CMS
// ==========================================================================
// Copyright (c) Squidex UG (haftungsbeschraenkt)
// All rights reserved. Licensed under the MIT license.
// ==========================================================================
using System;
using System.Linq;
using System.Threading.Tasks;
using Xunit;
namespace Squidex.Infrastructure.Tasks
{
public sealed class AsyncLockPoolTests
{
[Fact]
public async Task Should_lock()
{
var sut = new AsyncLockPool(1);
var value = 0;
await Task.WhenAll(
Enumerable.Repeat(0, 100).Select(x => new Func<Task>(async () =>
{
using (await sut.LockAsync(1))
{
await Task.Yield();
value++;
}
})()));
Assert.Equal(100, value);
}
}
}

38
tests/Squidex.Infrastructure.Tests/Tasks/AsyncLockTests.cs

@ -0,0 +1,38 @@
// ==========================================================================
// Squidex Headless CMS
// ==========================================================================
// Copyright (c) Squidex UG (haftungsbeschraenkt)
// All rights reserved. Licensed under the MIT license.
// ==========================================================================
using System;
using System.Linq;
using System.Threading.Tasks;
using Xunit;
namespace Squidex.Infrastructure.Tasks
{
public sealed class AsyncLockTests
{
[Fact]
public async Task Should_lock()
{
var sut = new AsyncLock();
var value = 0;
await Task.WhenAll(
Enumerable.Repeat(0, 100).Select(x => new Func<Task>(async () =>
{
using (await sut.LockAsync())
{
await Task.Yield();
value++;
}
})()));
Assert.Equal(100, value);
}
}
}

4
tests/Squidex.Infrastructure.Tests/TestHelpers/MyCommand.cs

@ -11,11 +11,11 @@ using Squidex.Infrastructure.Commands;
namespace Squidex.Infrastructure.TestHelpers
{
internal sealed class MyCommand : IAggregateCommand, ITimestampCommand
public class MyCommand : IAggregateCommand, ITimestampCommand
{
public Guid AggregateId { get; set; }
public long ExpectedVersion { get; set; }
public long ExpectedVersion { get; set; } = EtagVersion.Any;
public Instant Timestamp { get; set; }
}

1
tools/Migrate_01/Migrations/AddPatterns.cs

@ -12,7 +12,6 @@ using Squidex.Domain.Apps.Entities.Apps;
using Squidex.Domain.Apps.Entities.Apps.Commands;
using Squidex.Domain.Apps.Entities.Apps.Repositories;
using Squidex.Infrastructure.Migrations;
using Squidex.Infrastructure.States;
namespace Migrate_01.Migrations
{

Loading…
Cancel
Save