Browse Source

Back to actors.

pull/194/head
Sebastian Stehle 9 years ago
parent
commit
83e1130f5c
  1. 19
      src/Squidex.Infrastructure.Redis/RedisPubSub.cs
  2. 52
      src/Squidex.Infrastructure.Redis/RedisSubscription.cs
  3. 265
      src/Squidex.Infrastructure/CQRS/Events/Actors/EventConsumerActor.cs
  4. 91
      src/Squidex.Infrastructure/CQRS/Events/Actors/EventConsumerActorManager.cs
  5. 52
      src/Squidex.Infrastructure/CQRS/Events/Actors/EventConsumerState.cs
  6. 9
      src/Squidex.Infrastructure/CQRS/Events/Actors/Messages/GetStatesRequest.cs
  7. 11
      src/Squidex.Infrastructure/CQRS/Events/Actors/Messages/GetStatesResponse.cs
  8. 15
      src/Squidex.Infrastructure/CQRS/Events/Actors/Messages/ResetConsumerMessage.cs
  9. 15
      src/Squidex.Infrastructure/CQRS/Events/Actors/Messages/StartConsumerMessage.cs
  10. 15
      src/Squidex.Infrastructure/CQRS/Events/Actors/Messages/StopConsumerMessage.cs
  11. 4
      src/Squidex.Infrastructure/CQRS/Events/EventStoreSubscription.cs
  12. 2
      src/Squidex.Infrastructure/CQRS/Events/IEventSubscriber.cs
  13. 31
      src/Squidex.Infrastructure/CQRS/Events/Orleans/Grains/EventConsumerBootstrap.cs
  14. 34
      src/Squidex.Infrastructure/CQRS/Events/Orleans/Grains/IEventConsumerGrain.cs
  15. 28
      src/Squidex.Infrastructure/CQRS/Events/Orleans/Grains/IEventConsumerRegistryGrain.cs
  16. 52
      src/Squidex.Infrastructure/CQRS/Events/Orleans/Grains/Implementation/EventConsumerGrainState.cs
  17. 98
      src/Squidex.Infrastructure/CQRS/Events/Orleans/Grains/Implementation/EventConsumerRegistryGrain.cs
  18. 48
      src/Squidex.Infrastructure/CQRS/Events/Orleans/Grains/Implementation/WrapperSubscription.cs
  19. 30
      src/Squidex.Infrastructure/CQRS/Events/Orleans/OrleansEventNotifier.cs
  20. 15
      src/Squidex.Infrastructure/CQRS/Events/RetrySubscription.cs
  21. 23
      src/Squidex.Infrastructure/Caching/InvalidatingMemoryCache.cs
  22. 15
      src/Squidex.Infrastructure/Caching/InvalidationMessage.cs
  23. 4
      src/Squidex.Infrastructure/IPubSub.cs
  24. 12
      src/Squidex.Infrastructure/InMemoryPubSub.cs
  25. 57
      src/Squidex.Infrastructure/Json/Orleans/J.cs
  26. 91
      src/Squidex.Infrastructure/Json/Orleans/JsonExternalSerializer.cs
  27. 83
      src/Squidex.Infrastructure/Orleans/GrainV2.cs
  28. 79
      src/Squidex.Infrastructure/PubSubExtensions.cs
  29. 3
      src/Squidex.Infrastructure/Squidex.Infrastructure.csproj
  30. 17
      src/Squidex.Infrastructure/States/IStateFactory.cs
  31. 21
      src/Squidex.Infrastructure/States/IStateHolder.cs
  32. 19
      src/Squidex.Infrastructure/States/IStateStore.cs
  33. 15
      src/Squidex.Infrastructure/States/InvalidateMessage.cs
  34. 108
      src/Squidex.Infrastructure/States/StateFactory.cs
  35. 44
      src/Squidex.Infrastructure/States/StateHolder.cs
  36. 65
      src/Squidex.Infrastructure/States/StatefulObject.cs
  37. 358
      tests/Squidex.Infrastructure.Tests/CQRS/Events/Actors/EventConsumerActorTests.cs
  38. 125
      tests/Squidex.Infrastructure.Tests/CQRS/Events/Actors/EventConsumerManagerTests.cs
  39. 16
      tests/Squidex.Infrastructure.Tests/CQRS/Events/EventSubscriptionTests.cs
  40. 58
      tests/Squidex.Infrastructure.Tests/CQRS/Events/Grains/EventConsumerBootstrapTests.cs
  41. 408
      tests/Squidex.Infrastructure.Tests/CQRS/Events/Grains/EventConsumerGrainTests.cs
  42. 165
      tests/Squidex.Infrastructure.Tests/CQRS/Events/Grains/EventConsumerRegistryGrainTests.cs
  43. 41
      tests/Squidex.Infrastructure.Tests/CQRS/Events/Grains/OrleansEventNotifierTests.cs
  44. 25
      tests/Squidex.Infrastructure.Tests/CQRS/Events/RetrySubscriptionTests.cs
  45. 8
      tests/Squidex.Infrastructure.Tests/Caching/InvalidatingMemoryCacheTests.cs
  46. 66
      tests/Squidex.Infrastructure.Tests/InMemoryPubSubTests.cs
  47. 108
      tests/Squidex.Infrastructure.Tests/Json/Orleans/JsonExternalSerializerTests.cs

19
src/Squidex.Infrastructure.Redis/RedisPubSub.cs

@ -13,9 +13,9 @@ using StackExchange.Redis;
namespace Squidex.Infrastructure
{
public class RedisPubSub : IPubSub, IExternalSystem
public sealed class RedisPubSub : IPubSub, IExternalSystem
{
private readonly ConcurrentDictionary<string, RedisSubscription> subscriptions = new ConcurrentDictionary<string, RedisSubscription>();
private readonly ConcurrentDictionary<string, object> subscriptions = new ConcurrentDictionary<string, object>();
private readonly Lazy<IConnectionMultiplexer> redisClient;
private readonly Lazy<ISubscriber> redisSubscriber;
private readonly ISemanticLog log;
@ -43,18 +43,21 @@ namespace Squidex.Infrastructure
}
}
public void Publish(string channelName, string token, bool notifySelf)
public void Publish<T>(T value, bool notifySelf)
{
Guard.NotNullOrEmpty(channelName, nameof(channelName));
GetSubscriber<T>().Publish(value, notifySelf);
}
subscriptions.GetOrAdd(channelName, c => new RedisSubscription(redisSubscriber.Value, c, log)).Publish(token, notifySelf);
public IDisposable Subscribe<T>(Action<T> handler)
{
return GetSubscriber<T>().Subscribe(handler);
}
public IDisposable Subscribe(string channelName, Action<string> handler)
private RedisSubscription<T> GetSubscriber<T>()
{
Guard.NotNullOrEmpty(channelName, nameof(channelName));
var typeName = typeof(T).FullName;
return subscriptions.GetOrAdd(channelName, c => new RedisSubscription(redisSubscriber.Value, c, log)).Subscribe(handler);
return (RedisSubscription<T>)subscriptions.GetOrAdd(typeName, c => new RedisSubscription<T>(redisSubscriber.Value, c, log));
}
}
}

52
src/Squidex.Infrastructure.Redis/RedisSubscription.cs

@ -7,49 +7,58 @@
// ==========================================================================
using System;
using System.Linq;
using System.Reactive.Subjects;
using Newtonsoft.Json;
using Squidex.Infrastructure.Log;
using StackExchange.Redis;
#pragma warning disable SA1401 // Fields must be private
namespace Squidex.Infrastructure
{
internal sealed class RedisSubscription
internal sealed class RedisSubscription<T>
{
private static readonly Guid InstanceId = Guid.NewGuid();
private readonly Subject<string> subject = new Subject<string>();
private readonly Guid selfId = Guid.NewGuid();
private readonly Subject<T> subject = new Subject<T>();
private readonly ISubscriber subscriber;
private readonly string channelName;
private readonly ISemanticLog log;
private readonly string channelName;
private sealed class Envelope
{
public T Payload;
public Guid Sender;
}
public RedisSubscription(ISubscriber subscriber, string channelName, ISemanticLog log)
{
this.log = log;
this.subscriber = subscriber;
this.subscriber.Subscribe(channelName, (channel, value) => HandleInvalidation(value));
this.subscriber.Subscribe(channelName, (channel, value) => HandleMessage(value));
this.channelName = channelName;
}
public void Publish(string token, bool notifySelf)
public void Publish(object value, bool notifySelf)
{
try
{
var message = string.Join("#", (notifySelf ? Guid.Empty : InstanceId).ToString(), token);
var envelope = JsonConvert.SerializeObject(new Envelope { Sender = selfId, Payload = (T)value });
subscriber.Publish(channelName, message);
subscriber.Publish(channelName, envelope);
}
catch (Exception ex)
{
log.LogError(ex, w => w
.WriteProperty("action", "PublishRedisMessage")
.WriteProperty("state", "Failed")
.WriteProperty("token", token));
.WriteProperty("channel", channelName));
}
}
private void HandleInvalidation(string value)
private void HandleMessage(string value)
{
try
{
@ -58,28 +67,15 @@ namespace Squidex.Infrastructure
return;
}
var parts = value.Split('#');
if (parts.Length < 1)
{
return;
}
if (!Guid.TryParse(parts[0], out var sender))
{
return;
}
var envelope = JsonConvert.DeserializeObject<Envelope>(value);
if (sender != InstanceId)
if (envelope.Sender != selfId)
{
var token = string.Join("#", parts.Skip(1));
subject.OnNext(token);
subject.OnNext(envelope.Payload);
log.LogDebug(w => w
.WriteProperty("action", "ReceiveRedisMessage")
.WriteProperty("channel", channelName)
.WriteProperty("token", token)
.WriteProperty("state", "Received"));
}
}
@ -92,7 +88,7 @@ namespace Squidex.Infrastructure
}
}
public IDisposable Subscribe(Action<string> handler)
public IDisposable Subscribe(Action<T> handler)
{
return subject.Subscribe(handler);
}

265
src/Squidex.Infrastructure/CQRS/Events/Orleans/Grains/Implementation/EventConsumerGrain.cs → src/Squidex.Infrastructure/CQRS/Events/Actors/EventConsumerActor.cs

@ -1,5 +1,5 @@
// ==========================================================================
// EventConsumerGrain.cs
// EventConsumerActor.cs
// Squidex Headless CMS
// ==========================================================================
// Copyright (c) Squidex Group
@ -8,83 +8,124 @@
using System;
using System.Threading.Tasks;
using Orleans;
using Orleans.Concurrency;
using Orleans.Core;
using Orleans.Runtime;
using Squidex.Infrastructure.Log;
using Squidex.Infrastructure.Orleans;
using Squidex.Infrastructure.States;
using Squidex.Infrastructure.Tasks;
namespace Squidex.Infrastructure.CQRS.Events.Orleans.Grains.Implementation
namespace Squidex.Infrastructure.CQRS.Events.Actors
{
public class EventConsumerGrain : GrainV2<EventConsumerGrainState>, IEventConsumerGrain
public class EventConsumerActor : StatefulObject<EventConsumerState>, IEventSubscriber
{
private readonly EventDataFormatter eventFormatter;
private readonly EventConsumerFactory eventConsumerFactory;
private readonly EventDataFormatter formatter;
private readonly IEventStore eventStore;
private readonly ISemanticLog log;
private TaskScheduler scheduler;
private readonly SingleThreadedDispatcher dispatcher = new SingleThreadedDispatcher(1);
private IEventSubscription currentSubscription;
private IEventConsumer eventConsumer;
private IEventSubscription eventSubscription;
protected IEventStore EventStore
public EventConsumerActor(
EventDataFormatter formatter,
IEventStore eventStore,
ISemanticLog log)
{
get { return eventStore; }
Guard.NotNull(log, nameof(log));
Guard.NotNull(formatter, nameof(formatter));
Guard.NotNull(eventStore, nameof(eventStore));
this.log = log;
this.formatter = formatter;
this.eventStore = eventStore;
}
public EventConsumerGrain(
EventDataFormatter eventFormatter,
EventConsumerFactory eventConsumerFactory,
IEventStore eventStore,
ISemanticLog log,
IGrainRuntime runtime)
: this(eventFormatter, eventConsumerFactory, eventStore, log, null, runtime, null)
protected override void DisposeObject(bool disposing)
{
if (disposing)
{
dispatcher.StopAndWaitAsync().Wait();
}
}
protected EventConsumerGrain(
EventDataFormatter eventFormatter,
EventConsumerFactory eventConsumerFactory,
IEventStore eventStore,
ISemanticLog log,
IGrainIdentity identity,
IGrainRuntime runtime,
IStorage<EventConsumerGrainState> storage)
: base(identity, runtime, storage)
protected virtual IEventSubscription CreateSubscription(IEventStore eventStore, string streamFilter, string position)
{
Guard.NotNull(log, nameof(log));
Guard.NotNull(eventStore, nameof(eventStore));
Guard.NotNull(eventFormatter, nameof(eventFormatter));
Guard.NotNull(eventConsumerFactory, nameof(eventConsumerFactory));
return new RetrySubscription(eventStore, this, streamFilter, position);
}
this.log = log;
public virtual EventConsumerInfo GetState()
{
return State.ToInfo(this.eventConsumer.Name);
}
this.eventStore = eventStore;
this.eventFormatter = eventFormatter;
this.eventConsumerFactory = eventConsumerFactory;
public virtual void Stop()
{
dispatcher.DispatchAsync(() => HandleStopAsync()).Forget();
}
public override Task OnActivateAsync()
public virtual void Start()
{
scheduler = TaskScheduler.Current;
dispatcher.DispatchAsync(() => HandleStartAsync()).Forget();
}
eventConsumer = eventConsumerFactory(this.GetPrimaryKeyString());
public virtual void Reset()
{
dispatcher.DispatchAsync(() => HandleResetAsync()).Forget();
}
public virtual void Activate(IEventConsumer eventConsumer)
{
Guard.NotNull(eventConsumer, nameof(eventConsumer));
return TaskHelper.Done;
dispatcher.DispatchAsync(() => HandleSetupAsync(eventConsumer)).Forget();
}
public Task ActivateAsync()
private async Task HandleSetupAsync(IEventConsumer consumer)
{
eventConsumer = consumer;
await ReadStateAsync();
if (!State.IsStopped)
{
Subscribe(State.Position);
}
}
private Task HandleEventAsync(IEventSubscription subscription, StoredEvent storedEvent)
{
if (subscription != currentSubscription)
{
return TaskHelper.Done;
}
return TaskHelper.Done;
return DoAndUpdateStateAsync(async () =>
{
var @event = ParseKnownEvent(storedEvent);
if (@event != null)
{
await DispatchConsumerAsync(@event);
}
State = State.Handled(storedEvent.EventPosition);
});
}
private Task HandleErrorAsync(IEventSubscription subscription, Exception exception)
{
if (subscription != currentSubscription)
{
return TaskHelper.Done;
}
return DoAndUpdateStateAsync(() =>
{
Unsubscribe();
State = State.Failed(exception);
});
}
public Task StartAsync()
private Task HandleStartAsync()
{
if (!State.IsStopped)
{
@ -99,7 +140,7 @@ namespace Squidex.Infrastructure.CQRS.Events.Orleans.Grains.Implementation
});
}
public Task StopAsync()
private Task HandleStopAsync()
{
if (State.IsStopped)
{
@ -114,7 +155,7 @@ namespace Squidex.Infrastructure.CQRS.Events.Orleans.Grains.Implementation
});
}
public Task ResetAsync()
private Task HandleResetAsync()
{
return DoAndUpdateStateAsync(async () =>
{
@ -124,61 +165,51 @@ namespace Squidex.Infrastructure.CQRS.Events.Orleans.Grains.Implementation
Subscribe(null);
State = EventConsumerGrainState.Initial();
State = State.Reset();
});
}
public Task OnEventAsync(Immutable<IEventSubscription> subscription, Immutable<StoredEvent> storedEvent)
Task IEventSubscriber.OnEventAsync(IEventSubscription subscription, StoredEvent storedEvent)
{
if (subscription.Value != eventSubscription)
{
return TaskHelper.Done;
}
return DoAndUpdateStateAsync(async () =>
{
var @event = ParseKnownEvent(storedEvent.Value);
return dispatcher.DispatchAsync(() => HandleEventAsync(subscription, storedEvent));
}
if (@event != null)
{
await DispatchConsumerAsync(@event);
}
Task IEventSubscriber.OnErrorAsync(IEventSubscription subscription, Exception exception)
{
return dispatcher.DispatchAsync(() => HandleErrorAsync(subscription, exception));
}
State = EventConsumerGrainState.Handled(storedEvent.Value.EventPosition);
});
private Task DoAndUpdateStateAsync(Action action)
{
return DoAndUpdateStateAsync(() => { action(); return TaskHelper.Done; });
}
public Task OnErrorAsync(Immutable<IEventSubscription> subscription, Immutable<Exception> exception)
private async Task DoAndUpdateStateAsync(Func<Task> action)
{
if (subscription.Value != eventSubscription)
try
{
return TaskHelper.Done;
await action();
}
return DoAndUpdateStateAsync(() =>
catch (Exception ex)
{
Unsubscribe();
try
{
Unsubscribe();
}
catch (Exception unsubscribeException)
{
ex = new AggregateException(ex, unsubscribeException);
}
State = State.Failed(exception.Value);
});
}
log.LogFatal(ex, w => w
.WriteProperty("action", "HandleEvent")
.WriteProperty("state", "Failed")
.WriteProperty("eventConsumer", eventConsumer.Name));
public Task OnClosedAsync(Immutable<IEventSubscription> subscription)
{
if (subscription.Value != eventSubscription)
{
return TaskHelper.Done;
State = State.Failed(ex);
}
return DoAndUpdateStateAsync(() =>
{
Unsubscribe();
});
}
public Task<Immutable<EventConsumerInfo>> GetStateAsync()
{
return Task.FromResult(new Immutable<EventConsumerInfo>(State.ToInfo(this.GetPrimaryKeyString())));
await WriteStateAsync();
}
private async Task ClearAsync()
@ -228,19 +259,19 @@ namespace Squidex.Infrastructure.CQRS.Events.Orleans.Grains.Implementation
private void Unsubscribe()
{
if (eventSubscription != null)
if (currentSubscription != null)
{
eventSubscription.StopAsync().Forget();
eventSubscription = null;
currentSubscription.StopAsync().Forget();
currentSubscription = null;
}
}
private void Subscribe(string position)
{
if (eventSubscription == null)
if (currentSubscription == null)
{
eventSubscription?.StopAsync().Forget();
eventSubscription = CreateSubscription(eventConsumer.EventsFilter, position);
currentSubscription?.StopAsync().Forget();
currentSubscription = CreateSubscription(eventStore, eventConsumer.EventsFilter, position);
}
}
@ -248,7 +279,7 @@ namespace Squidex.Infrastructure.CQRS.Events.Orleans.Grains.Implementation
{
try
{
var @event = eventFormatter.Parse(message.Data);
var @event = formatter.Parse(message.Data);
@event.SetEventPosition(message.EventPosition);
@event.SetEventStreamNumber(message.EventStreamNumber);
@ -262,53 +293,5 @@ namespace Squidex.Infrastructure.CQRS.Events.Orleans.Grains.Implementation
return null;
}
}
protected virtual IEventConsumerGrain GetSelf()
{
return this.AsReference<IEventConsumerGrain>();
}
protected virtual IEventSubscription CreateSubscription(IEventSubscriber subscriber, string streamFilter, string position)
{
return new RetrySubscription(EventStore, subscriber, streamFilter, position);
}
private IEventSubscription CreateSubscription(string streamFilter, string position)
{
return CreateSubscription(new WrapperSubscription(GetSelf(), scheduler), streamFilter, position);
}
private Task DoAndUpdateStateAsync(Action action)
{
return DoAndUpdateStateAsync(() => { action(); return TaskHelper.Done; });
}
private async Task DoAndUpdateStateAsync(Func<Task> action)
{
try
{
await action();
}
catch (Exception ex)
{
try
{
Unsubscribe();
}
catch (Exception unsubscribeException)
{
ex = new AggregateException(ex, unsubscribeException);
}
log.LogFatal(ex, w => w
.WriteProperty("action", "HandleEvent")
.WriteProperty("state", "Failed")
.WriteProperty("eventConsumer", eventConsumer.Name));
State = State.Failed(ex);
}
await WriteStateAsync();
}
}
}

91
src/Squidex.Infrastructure/CQRS/Events/Actors/EventConsumerActorManager.cs

@ -0,0 +1,91 @@
// ==========================================================================
// EventConsumerActorManager.cs
// Squidex Headless CMS
// ==========================================================================
// Copyright (c) Squidex Group
// All rights reserved.
// ==========================================================================
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using Squidex.Infrastructure.CQRS.Events.Actors.Messages;
using Squidex.Infrastructure.States;
namespace Squidex.Infrastructure.CQRS.Events.Actors
{
public sealed class EventConsumerActorManager : DisposableObjectBase, IExternalSystem
{
private readonly IStateFactory factory;
private readonly IPubSub pubSub;
private readonly List<IEventConsumer> consumers;
private readonly List<IDisposable> subscriptions = new List<IDisposable>();
public EventConsumerActorManager(IEnumerable<IEventConsumer> consumers, IPubSub pubSub, IStateFactory factory)
{
Guard.NotNull(pubSub, nameof(pubSub));
Guard.NotNull(factory, nameof(factory));
Guard.NotNull(consumers, nameof(consumers));
this.pubSub = pubSub;
this.factory = factory;
this.consumers = consumers.ToList();
}
public void Connect()
{
var actors = new Dictionary<string, EventConsumerActor>();
foreach (var consumer in consumers)
{
var actor = factory.GetAsync<EventConsumerActor, EventConsumerState>(consumer.Name).Result;
actors[consumer.Name] = actor;
actor.Activate(consumer);
}
subscriptions.Add(pubSub.Subscribe<StartConsumerMessage>(m =>
{
if (actors.TryGetValue(m.ConsumerName, out var actor))
{
actor.Start();
}
}));
subscriptions.Add(pubSub.Subscribe<StopConsumerMessage>(m =>
{
if (actors.TryGetValue(m.ConsumerName, out var actor))
{
actor.Stop();
}
}));
subscriptions.Add(pubSub.Subscribe<ResetConsumerMessage>(m =>
{
if (actors.TryGetValue(m.ConsumerName, out var actor))
{
actor.Reset();
}
}));
subscriptions.Add(pubSub.ReceiveAsync<GetStatesRequest, GetStatesResponse>(request =>
{
var states = actors.Values.Select(x => x.GetState()).ToArray();
return Task.FromResult(new GetStatesResponse { States = states });
}));
}
protected override void DisposeObject(bool disposing)
{
if (disposing)
{
foreach (var subscription in subscriptions)
{
subscription.Dispose();
}
}
}
}
}

52
src/Squidex.Infrastructure/CQRS/Events/Actors/EventConsumerState.cs

@ -0,0 +1,52 @@
// ==========================================================================
// EventConsumerGrainState.cs
// Squidex Headless CMS
// ==========================================================================
// Copyright (c) Squidex Group
// All rights reserved.
// ==========================================================================
using System;
using Squidex.Infrastructure.Reflection;
namespace Squidex.Infrastructure.CQRS.Events.Actors
{
public sealed class EventConsumerState
{
public bool IsStopped { get; set; }
public string Error { get; set; }
public string Position { get; set; }
public EventConsumerState Reset()
{
return new EventConsumerState();
}
public EventConsumerState Handled(string position)
{
return new EventConsumerState { Position = position };
}
public EventConsumerState Failed(Exception ex)
{
return new EventConsumerState { Position = Position, IsStopped = true, Error = ex?.ToString() };
}
public EventConsumerState Stopped()
{
return new EventConsumerState { Position = Position, IsStopped = true };
}
public EventConsumerState Started()
{
return new EventConsumerState { Position = Position, IsStopped = false };
}
public EventConsumerInfo ToInfo(string name)
{
return SimpleMapper.Map(this, new EventConsumerInfo { Name = name });
}
}
}

9
src/Squidex.Infrastructure/Json/Orleans/IJsonValue.cs → src/Squidex.Infrastructure/CQRS/Events/Actors/Messages/GetStatesRequest.cs

@ -1,17 +1,14 @@
// ==========================================================================
// IJsonValue.cs
// GetStatesRequest.cs
// Squidex Headless CMS
// ==========================================================================
// Copyright (c) Squidex Group
// All rights reserved.
// ==========================================================================
namespace Squidex.Infrastructure.Json.Orleans
namespace Squidex.Infrastructure.CQRS.Events.Actors.Messages
{
public interface IJsonValue
public sealed class GetStatesRequest
{
object Value { get; }
bool IsImmutable { get; }
}
}

11
src/Squidex.Infrastructure/Json/Orleans/JExtensions.cs → src/Squidex.Infrastructure/CQRS/Events/Actors/Messages/GetStatesResponse.cs

@ -1,18 +1,15 @@
// ==========================================================================
// JExtensions.cs
// GetStatesResponse.cs
// Squidex Headless CMS
// ==========================================================================
// Copyright (c) Squidex Group
// All rights reserved.
// ==========================================================================
namespace Squidex.Infrastructure.Json.Orleans
namespace Squidex.Infrastructure.CQRS.Events.Actors.Messages
{
public static class JExtensions
public sealed class GetStatesResponse
{
public static J<T> AsJ<T>(this T value, bool immutable = true)
{
return new J<T>(value, immutable);
}
public EventConsumerInfo[] States { get; set; }
}
}

15
src/Squidex.Infrastructure/CQRS/Events/Actors/Messages/ResetConsumerMessage.cs

@ -0,0 +1,15 @@
// ==========================================================================
// ResetConsumerMessage.cs
// Squidex Headless CMS
// ==========================================================================
// Copyright (c) Squidex Group
// All rights reserved.
// ==========================================================================
namespace Squidex.Infrastructure.CQRS.Events.Actors.Messages
{
public sealed class ResetConsumerMessage
{
public string ConsumerName { get; set; }
}
}

15
src/Squidex.Infrastructure/CQRS/Events/Actors/Messages/StartConsumerMessage.cs

@ -0,0 +1,15 @@
// ==========================================================================
// StartConsumerMessage.cs
// Squidex Headless CMS
// ==========================================================================
// Copyright (c) Squidex Group
// All rights reserved.
// ==========================================================================
namespace Squidex.Infrastructure.CQRS.Events.Actors.Messages
{
public sealed class StartConsumerMessage
{
public string ConsumerName { get; set; }
}
}

15
src/Squidex.Infrastructure/CQRS/Events/Actors/Messages/StopConsumerMessage.cs

@ -0,0 +1,15 @@
// ==========================================================================
// StopConsumerMessage.cs
// Squidex Headless CMS
// ==========================================================================
// Copyright (c) Squidex Group
// All rights reserved.
// ==========================================================================
namespace Squidex.Infrastructure.CQRS.Events.Actors.Messages
{
public sealed class StopConsumerMessage
{
public string ConsumerName { get; set; }
}
}

4
src/Squidex.Infrastructure/CQRS/Events/EventStoreSubscription.cs

@ -51,10 +51,6 @@ namespace Squidex.Infrastructure.CQRS.Events
await eventSubscriber.OnErrorAsync(this, ex);
}
}
finally
{
await eventSubscriber.OnClosedAsync(this);
}
});
}

2
src/Squidex.Infrastructure/CQRS/Events/IEventSubscriber.cs

@ -16,7 +16,5 @@ namespace Squidex.Infrastructure.CQRS.Events
Task OnEventAsync(IEventSubscription subscription, StoredEvent storedEvent);
Task OnErrorAsync(IEventSubscription subscription, Exception exception);
Task OnClosedAsync(IEventSubscription subscription);
}
}

31
src/Squidex.Infrastructure/CQRS/Events/Orleans/Grains/EventConsumerBootstrap.cs

@ -1,31 +0,0 @@
// ==========================================================================
// EventConsumerBootstrap.cs
// Squidex Headless CMS
// ==========================================================================
// Copyright (c) Squidex Group
// All rights reserved.
// ==========================================================================
using System.Threading.Tasks;
using Orleans.Providers;
using Squidex.Infrastructure.Tasks;
namespace Squidex.Infrastructure.CQRS.Events.Orleans.Grains
{
public sealed class EventConsumerBootstrap : IBootstrapProvider
{
public string Name { get; private set; }
public Task Close()
{
return TaskHelper.Done;
}
public Task Init(string name, IProviderRuntime providerRuntime, IProviderConfiguration config)
{
Name = name;
return providerRuntime.GrainFactory.GetGrain<IEventConsumerRegistryGrain>("Default").ActivateAsync(null);
}
}
}

34
src/Squidex.Infrastructure/CQRS/Events/Orleans/Grains/IEventConsumerGrain.cs

@ -1,34 +0,0 @@
// ==========================================================================
// IEventConsumerGrain.cs
// Squidex Headless CMS
// ==========================================================================
// Copyright (c) Squidex Group
// All rights reserved.
// ==========================================================================
using System;
using System.Threading.Tasks;
using Orleans;
using Orleans.Concurrency;
namespace Squidex.Infrastructure.CQRS.Events.Orleans.Grains
{
public interface IEventConsumerGrain : IGrainWithStringKey
{
Task<Immutable<EventConsumerInfo>> GetStateAsync();
Task ActivateAsync();
Task StopAsync();
Task StartAsync();
Task ResetAsync();
Task OnEventAsync(Immutable<IEventSubscription> subscription, Immutable<StoredEvent> storedEvent);
Task OnErrorAsync(Immutable<IEventSubscription> subscription, Immutable<Exception> exception);
Task OnClosedAsync(Immutable<IEventSubscription> subscription);
}
}

28
src/Squidex.Infrastructure/CQRS/Events/Orleans/Grains/IEventConsumerRegistryGrain.cs

@ -1,28 +0,0 @@
// ==========================================================================
// IEventConsumerRegistryGrain.cs
// Squidex Headless CMS
// ==========================================================================
// Copyright (c) Squidex Group
// All rights reserved.
// ==========================================================================
using System.Collections.Generic;
using System.Threading.Tasks;
using Orleans;
using Orleans.Concurrency;
namespace Squidex.Infrastructure.CQRS.Events.Orleans.Grains
{
public interface IEventConsumerRegistryGrain : IGrainWithStringKey
{
Task ActivateAsync(string streamName);
Task StopAsync(string consumerName);
Task StartAsync(string consumerName);
Task ResetAsync(string consumerName);
Task<Immutable<List<EventConsumerInfo>>> GetConsumersAsync();
}
}

52
src/Squidex.Infrastructure/CQRS/Events/Orleans/Grains/Implementation/EventConsumerGrainState.cs

@ -1,52 +0,0 @@
// ==========================================================================
// EventConsumerGrainState.cs
// Squidex Headless CMS
// ==========================================================================
// Copyright (c) Squidex Group
// All rights reserved.
// ==========================================================================
using System;
using Squidex.Infrastructure.Reflection;
namespace Squidex.Infrastructure.CQRS.Events.Orleans.Grains.Implementation
{
public sealed class EventConsumerGrainState
{
public bool IsStopped { get; set; }
public string Error { get; set; }
public string Position { get; set; }
public static EventConsumerGrainState Initial()
{
return new EventConsumerGrainState();
}
public static EventConsumerGrainState Handled(string position)
{
return new EventConsumerGrainState { Position = position };
}
public EventConsumerGrainState Failed(Exception ex)
{
return new EventConsumerGrainState { Position = Position, IsStopped = true, Error = ex?.ToString() };
}
public EventConsumerGrainState Stopped()
{
return new EventConsumerGrainState { Position = Position, IsStopped = true };
}
public EventConsumerGrainState Started()
{
return new EventConsumerGrainState { Position = Position, IsStopped = false };
}
public EventConsumerInfo ToInfo(string name)
{
return SimpleMapper.Map(this, new EventConsumerInfo { Name = name });
}
}
}

98
src/Squidex.Infrastructure/CQRS/Events/Orleans/Grains/Implementation/EventConsumerRegistryGrain.cs

@ -1,98 +0,0 @@
// ==========================================================================
// EventConsumerRegistryGrain.cs
// Squidex Headless CMS
// ==========================================================================
// Copyright (c) Squidex Group
// All rights reserved.
// ==========================================================================
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text.RegularExpressions;
using System.Threading.Tasks;
using Orleans;
using Orleans.Concurrency;
using Orleans.Core;
using Orleans.Runtime;
namespace Squidex.Infrastructure.CQRS.Events.Orleans.Grains.Implementation
{
public class EventConsumerRegistryGrain : Grain, IEventConsumerRegistryGrain, IRemindable
{
private readonly IEnumerable<IEventConsumer> eventConsumers;
public EventConsumerRegistryGrain(IEnumerable<IEventConsumer> eventConsumers)
: this(eventConsumers, null, null)
{
}
protected EventConsumerRegistryGrain(
IEnumerable<IEventConsumer> eventConsumers,
IGrainIdentity identity,
IGrainRuntime runtime)
: base(identity, runtime)
{
Guard.NotNull(eventConsumers, nameof(eventConsumers));
this.eventConsumers = eventConsumers;
}
public Task ReceiveReminder(string reminderName, TickStatus status)
{
return ActivateAsync(null);
}
public override Task OnActivateAsync()
{
DelayDeactivation(TimeSpan.FromDays(1));
RegisterOrUpdateReminder("Default", TimeSpan.Zero, TimeSpan.FromMinutes(10));
RegisterTimer(x => ActivateAsync(null), null, TimeSpan.Zero, TimeSpan.FromSeconds(10));
return Task.FromResult(true);
}
public Task ActivateAsync(string streamName)
{
var tasks =
eventConsumers
.Where(c => streamName == null || Regex.IsMatch(streamName, c.EventsFilter))
.Select(c => GrainFactory.GetGrain<IEventConsumerGrain>(c.Name))
.Select(c => c.ActivateAsync());
return Task.WhenAll(tasks);
}
public Task<Immutable<List<EventConsumerInfo>>> GetConsumersAsync()
{
var tasks =
eventConsumers
.Select(c => GrainFactory.GetGrain<IEventConsumerGrain>(c.Name))
.Select(c => c.GetStateAsync());
return Task.WhenAll(tasks).ContinueWith(x => new Immutable<List<EventConsumerInfo>>(x.Result.Select(r => r.Value).ToList()));
}
public Task ResetAsync(string consumerName)
{
var eventConsumer = GrainFactory.GetGrain<IEventConsumerGrain>(consumerName);
return eventConsumer.ResetAsync();
}
public Task StartAsync(string consumerName)
{
var eventConsumer = GrainFactory.GetGrain<IEventConsumerGrain>(consumerName);
return eventConsumer.StartAsync();
}
public Task StopAsync(string consumerName)
{
var eventConsumer = GrainFactory.GetGrain<IEventConsumerGrain>(consumerName);
return eventConsumer.StopAsync();
}
}
}

48
src/Squidex.Infrastructure/CQRS/Events/Orleans/Grains/Implementation/WrapperSubscription.cs

@ -1,48 +0,0 @@
// ==========================================================================
// WrapperSubscription.cs
// Squidex Headless CMS
// ==========================================================================
// Copyright (c) Squidex Group
// All rights reserved.
// ==========================================================================
using System;
using System.Threading;
using System.Threading.Tasks;
using Orleans.Concurrency;
namespace Squidex.Infrastructure.CQRS.Events.Orleans.Grains.Implementation
{
internal sealed class WrapperSubscription : IEventSubscriber
{
private readonly IEventConsumerGrain grain;
private readonly TaskScheduler scheduler;
public WrapperSubscription(IEventConsumerGrain grain, TaskScheduler scheduler)
{
this.grain = grain;
this.scheduler = scheduler ?? TaskScheduler.Default;
}
public Task OnEventAsync(IEventSubscription subscription, StoredEvent storedEvent)
{
return Dispatch(() => grain.OnEventAsync(subscription.AsImmutable(), storedEvent.AsImmutable()));
}
public Task OnErrorAsync(IEventSubscription subscription, Exception exception)
{
return Dispatch(() => grain.OnErrorAsync(subscription.AsImmutable(), exception.AsImmutable()));
}
public Task OnClosedAsync(IEventSubscription subscription)
{
return Dispatch(() => grain.OnClosedAsync(subscription.AsImmutable()));
}
private Task Dispatch(Func<Task> task)
{
return Task<Task>.Factory.StartNew(() => task(), CancellationToken.None, TaskCreationOptions.None, scheduler).Unwrap();
}
}
}

30
src/Squidex.Infrastructure/CQRS/Events/Orleans/OrleansEventNotifier.cs

@ -1,30 +0,0 @@
// ==========================================================================
// OrleansEventNotifier.cs
// Squidex Headless CMS
// ==========================================================================
// Copyright (c) Squidex Group
// All rights reserved.
// ==========================================================================
using Orleans;
using Squidex.Infrastructure.CQRS.Events.Orleans.Grains;
namespace Squidex.Infrastructure.CQRS.Events.Orleans
{
public sealed class OrleansEventNotifier : IEventNotifier
{
private readonly IEventConsumerRegistryGrain eventConsumerRegistryGrain;
public OrleansEventNotifier(IGrainFactory factory)
{
Guard.NotNull(factory, nameof(factory));
eventConsumerRegistryGrain = factory.GetGrain<IEventConsumerRegistryGrain>("Default");
}
public void NotifyEventsStored(string streamName)
{
eventConsumerRegistryGrain.ActivateAsync(streamName);
}
}
}

15
src/Squidex.Infrastructure/CQRS/Events/RetrySubscription.cs

@ -65,16 +65,6 @@ namespace Squidex.Infrastructure.CQRS.Events
}
}
private async Task HandleClosedAsync(IEventSubscription subscription)
{
if (subscription == currentSubscription)
{
await eventSubscriber.OnClosedAsync(this);
Unsubscribe();
}
}
private async Task HandleErrorAsync(IEventSubscription subscription, Exception exception)
{
if (subscription == currentSubscription)
@ -105,11 +95,6 @@ namespace Squidex.Infrastructure.CQRS.Events
return dispatcher.DispatchAsync(() => HandleErrorAsync(subscription, exception));
}
Task IEventSubscriber.OnClosedAsync(IEventSubscription subscription)
{
return dispatcher.DispatchAsync(() => HandleClosedAsync(subscription));
}
public async Task StopAsync()
{
await dispatcher.DispatchAsync(() => Unsubscribe());

23
src/Squidex.Infrastructure/Caching/InvalidatingMemoryCache.cs

@ -6,14 +6,15 @@
// All rights reserved.
// ==========================================================================
using System;
using Microsoft.Extensions.Caching.Memory;
namespace Squidex.Infrastructure.Caching
{
public class InvalidatingMemoryCache : IMemoryCache, IInvalidatingCache
public class InvalidatingMemoryCache : DisposableObjectBase, IMemoryCache, IInvalidatingCache
{
private const string ChannelName = "CacheInvalidations";
private readonly IMemoryCache inner;
private readonly IDisposable subscription;
private readonly IPubSub invalidator;
public InvalidatingMemoryCache(IMemoryCache inner, IPubSub invalidator)
@ -24,12 +25,20 @@ namespace Squidex.Infrastructure.Caching
this.inner = inner;
this.invalidator = invalidator;
invalidator.Subscribe(ChannelName, inner.Remove);
subscription = invalidator.Subscribe<InvalidationMessage>(m =>
{
inner.Remove(m.CacheKey);
});
}
public void Dispose()
protected override void DisposeObject(bool disposing)
{
inner.Dispose();
if (disposing)
{
subscription.Dispose();
inner.Dispose();
}
}
public ICacheEntry CreateEntry(object key)
@ -49,9 +58,9 @@ namespace Squidex.Infrastructure.Caching
public void Invalidate(object key)
{
if (key is string)
if (key is string stringKey)
{
invalidator.Publish(ChannelName, key.ToString(), true);
invalidator.Publish(new InvalidationMessage { CacheKey = stringKey }, true);
}
}
}

15
src/Squidex.Infrastructure/Caching/InvalidationMessage.cs

@ -0,0 +1,15 @@
// ==========================================================================
// InvalidationMessage.cs
// Squidex Headless CMS
// ==========================================================================
// Copyright (c) Squidex Group
// All rights reserved.
// ==========================================================================
namespace Squidex.Infrastructure.Caching
{
public sealed class InvalidationMessage
{
public string CacheKey { get; set; }
}
}

4
src/Squidex.Infrastructure/IPubSub.cs

@ -12,8 +12,8 @@ namespace Squidex.Infrastructure
{
public interface IPubSub
{
void Publish(string channelName, string token, bool notifySelf);
void Publish<T>(T value, bool notifySelf);
IDisposable Subscribe(string channelName, Action<string> handler);
IDisposable Subscribe<T>(Action<T> handler);
}
}

12
src/Squidex.Infrastructure/InMemoryPubSub.cs

@ -7,26 +7,26 @@
// ==========================================================================
using System;
using System.Collections.Concurrent;
using System.Reactive.Linq;
using System.Reactive.Subjects;
namespace Squidex.Infrastructure
{
public sealed class InMemoryPubSub : IPubSub
{
private readonly ConcurrentDictionary<string, Subject<string>> subjects = new ConcurrentDictionary<string, Subject<string>>();
private readonly Subject<object> subject = new Subject<object>();
public void Publish(string channelName, string token, bool notifySelf)
public void Publish<T>(T value, bool notifySelf)
{
if (notifySelf)
{
subjects.GetOrAdd(channelName, k => new Subject<string>()).OnNext(token);
subject.OnNext(value);
}
}
public IDisposable Subscribe(string channelName, Action<string> handler)
public IDisposable Subscribe<T>(Action<T> handler)
{
return subjects.GetOrAdd(channelName, k => new Subject<string>()).Subscribe(handler);
return subject.Where(x => x is T).OfType<T>().Subscribe(handler);
}
}
}

57
src/Squidex.Infrastructure/Json/Orleans/J.cs

@ -1,57 +0,0 @@
// ==========================================================================
// J.cs
// Squidex Headless CMS
// ==========================================================================
// Copyright (c) Squidex Group
// All rights reserved.
// ==========================================================================
using System.Threading.Tasks;
using Newtonsoft.Json;
namespace Squidex.Infrastructure.Json.Orleans
{
public struct J<T> : IJsonValue
{
private readonly T value;
private readonly bool isImmutable;
public T Value
{
get { return value; }
}
bool IJsonValue.IsImmutable
{
get { return isImmutable; }
}
object IJsonValue.Value
{
get { return Value; }
}
[JsonConstructor]
public J(T value, bool isImmutable = false)
{
this.value = value;
this.isImmutable = isImmutable;
}
public static implicit operator T(J<T> value)
{
return value.Value;
}
public static implicit operator J<T>(T d)
{
return new J<T>(d);
}
public static Task<J<T>> AsTask(T value)
{
return Task.FromResult<J<T>>(value);
}
}
}

91
src/Squidex.Infrastructure/Json/Orleans/JsonExternalSerializer.cs

@ -1,91 +0,0 @@
// ==========================================================================
// JsonExternalSerializer.cs
// Squidex Headless CMS
// ==========================================================================
// Copyright (c) Squidex Group
// All rights reserved.
// ==========================================================================
using System;
using System.IO;
using System.Linq;
using Newtonsoft.Json;
using Newtonsoft.Json.Linq;
using Orleans.Runtime;
using Orleans.Serialization;
namespace Squidex.Infrastructure.Json.Orleans
{
public class JsonExternalSerializer : IExternalSerializer
{
private readonly JsonSerializer serializer;
public JsonExternalSerializer(JsonSerializer serializer)
{
Guard.NotNull(serializer, nameof(serializer));
this.serializer = serializer;
}
public void Initialize(Logger logger)
{
}
public bool IsSupportedType(Type itemType)
{
return itemType.GetInterfaces().Contains(typeof(IJsonValue));
}
public object DeepCopy(object source, ICopyContext context)
{
var jsonValue = source as IJsonValue;
if (jsonValue == null)
{
return null;
}
else if (jsonValue.IsImmutable)
{
return jsonValue;
}
else if (jsonValue.Value == null)
{
return jsonValue;
}
else
{
return JObject.FromObject(source, serializer).ToObject(source.GetType(), serializer);
}
}
public object Deserialize(Type expectedType, IDeserializationContext context)
{
var outLength = context.StreamReader.ReadInt();
var outBytes = context.StreamReader.ReadBytes(outLength);
var stream = new MemoryStream(outBytes);
using (var reader = new JsonTextReader(new StreamReader(stream)))
{
return serializer.Deserialize(reader, expectedType);
}
}
public void Serialize(object item, ISerializationContext context, Type expectedType)
{
var stream = new MemoryStream();
using (var writer = new JsonTextWriter(new StreamWriter(stream)))
{
serializer.Serialize(writer, item);
writer.Flush();
}
var outBytes = stream.ToArray();
context.StreamWriter.Write(outBytes.Length);
context.StreamWriter.Write(outBytes);
}
}
}

83
src/Squidex.Infrastructure/Orleans/GrainV2.cs

@ -1,83 +0,0 @@
// ==========================================================================
// GrainV2.cs
// Squidex Headless CMS
// ==========================================================================
// Copyright (c) Squidex Group
// All rights reserved.
// ==========================================================================
using System.Threading;
using System.Threading.Tasks;
using Orleans;
using Orleans.Core;
using Orleans.Runtime;
namespace Squidex.Infrastructure.Orleans
{
public class GrainV2<TGrainState> : Grain where TGrainState : new()
{
private readonly IGrainRuntime runtime;
private IStorage<TGrainState> storage;
protected GrainV2(IGrainRuntime runtime)
{
this.runtime = runtime;
}
protected GrainV2(IGrainIdentity identity, IGrainRuntime runtime, IStorage<TGrainState> storage)
: base(identity, runtime)
{
this.runtime = runtime;
this.storage = storage;
}
protected TGrainState State
{
get
{
return storage.State;
}
set
{
storage.State = value;
}
}
protected virtual Task ClearStateAsync()
{
return storage.ClearStateAsync();
}
protected virtual Task WriteStateAsync()
{
return storage.WriteStateAsync();
}
protected virtual Task ReadStateAsync()
{
return storage.ReadStateAsync();
}
public override void Participate(IGrainLifecycle lifecycle)
{
base.Participate(lifecycle);
lifecycle.Subscribe(GrainLifecycleStage.SetupState, OnSetupState);
}
private async Task OnSetupState(CancellationToken ct)
{
if (!ct.IsCancellationRequested)
{
storage = runtime.GetStorage<TGrainState>(this);
await OnSetupState();
}
}
private async Task OnSetupState()
{
await ReadStateAsync();
}
}
}

79
src/Squidex.Infrastructure/PubSubExtensions.cs

@ -0,0 +1,79 @@
// ==========================================================================
// PubSubExtensions.cs
// Squidex Headless CMS
// ==========================================================================
// Copyright (c) Squidex Group
// All rights reserved.
// ==========================================================================
using System;
using System.Threading.Tasks;
#pragma warning disable 4014
#pragma warning disable RECS0165 // Asynchronous methods should return a Task instead of void
namespace Squidex.Infrastructure
{
public static class PubSubExtensions
{
public class Request<T>
{
public T Body { get; set; }
public Guid CorrelationId { get; set; }
}
public class Response<T>
{
public T Body { get; set; }
public Guid CorrelationId { get; set; }
}
public static IDisposable ReceiveAsync<TRequest, TResponse>(this IPubSub pubsub, Func<TRequest, Task<TResponse>> callback, bool self = true)
{
return pubsub.Subscribe<Request<TRequest>>(async x =>
{
var response = await callback(x.Body);
pubsub.Publish(new Response<TResponse> { CorrelationId = x.CorrelationId, Body = response }, true);
});
}
public static async Task<TResponse> RequestAsync<TRequest, TResponse>(this IPubSub pubsub, TRequest message, TimeSpan timeout, bool self = true)
{
var request = new Request<TRequest> { Body = message, CorrelationId = Guid.NewGuid() };
IDisposable subscription = null;
try
{
var receiveTask = new TaskCompletionSource<TResponse>();
subscription = pubsub.Subscribe<Response<TResponse>>(response =>
{
if (response.CorrelationId == request.CorrelationId)
{
receiveTask.SetResult(response.Body);
}
});
Task.Run(() => pubsub.Publish(request, self));
var firstTask = await Task.WhenAny(receiveTask.Task, Task.Delay(timeout));
if (firstTask.Id != receiveTask.Task.Id)
{
throw new TaskCanceledException();
}
else
{
return await receiveTask.Task;
}
}
finally
{
subscription?.Dispose();
}
}
}
}

3
src/Squidex.Infrastructure/Squidex.Infrastructure.csproj

@ -10,9 +10,6 @@
<ItemGroup>
<PackageReference Include="Microsoft.Extensions.Caching.Abstractions" Version="2.0.0" />
<PackageReference Include="Microsoft.Extensions.Logging" Version="2.0.0" />
<PackageReference Include="Microsoft.Extensions.Logging.Console" Version="2.0.0" />
<PackageReference Include="Microsoft.Orleans.Core" Version="2.0.0-beta1" />
<PackageReference Include="Microsoft.Orleans.OrleansCodeGenerator.Build" Version="2.0.0-beta1-fix" />
<PackageReference Include="Newtonsoft.Json" Version="10.0.3" />
<PackageReference Include="NodaTime" Version="2.2.3" />
<PackageReference Include="RefactoringEssentials" Version="5.4.0" />

17
src/Squidex.Infrastructure/States/IStateFactory.cs

@ -0,0 +1,17 @@
// ==========================================================================
// IStateFactory.cs
// Squidex Headless CMS
// ==========================================================================
// Copyright (c) Squidex Group
// All rights reserved.
// ==========================================================================
using System.Threading.Tasks;
namespace Squidex.Infrastructure.States
{
public interface IStateFactory
{
Task<T> GetAsync<T, TState>(string key) where T : StatefulObject<TState>;
}
}

21
src/Squidex.Infrastructure/States/IStateHolder.cs

@ -0,0 +1,21 @@
// ==========================================================================
// IStateHolder.cs
// Squidex Headless CMS
// ==========================================================================
// Copyright (c) Squidex Group
// All rights reserved.
// ==========================================================================
using System.Threading.Tasks;
namespace Squidex.Infrastructure.States
{
public interface IStateHolder<T>
{
T State { get; set; }
Task ReadAsync();
Task WriteAsync();
}
}

19
src/Squidex.Infrastructure/States/IStateStore.cs

@ -0,0 +1,19 @@
// ==========================================================================
// IStateStore.cs
// Squidex Headless CMS
// ==========================================================================
// Copyright (c) Squidex Group
// All rights reserved.
// ==========================================================================
using System.Threading.Tasks;
namespace Squidex.Infrastructure.States
{
public interface IStateStore
{
Task WriteAsync<T>(string key, T value, string oldEtag, string newEtag);
Task<(T Value, string Etag)> ReadAsync<T>(string key);
}
}

15
src/Squidex.Infrastructure/States/InvalidateMessage.cs

@ -0,0 +1,15 @@
// ==========================================================================
// InvalidateMessage.cs
// Squidex Headless CMS
// ==========================================================================
// Copyright (c) Squidex Group
// All rights reserved.
// ==========================================================================
namespace Squidex.Infrastructure.States
{
public sealed class InvalidateMessage
{
public string Key { get; set; }
}
}

108
src/Squidex.Infrastructure/States/StateFactory.cs

@ -0,0 +1,108 @@
// ==========================================================================
// StateFactory.cs
// Squidex Headless CMS
// ==========================================================================
// Copyright (c) Squidex Group
// All rights reserved.
// ==========================================================================
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using Microsoft.Extensions.Caching.Memory;
using Squidex.Infrastructure.Tasks;
namespace Squidex.Infrastructure.States
{
public sealed class StateFactory : DisposableObjectBase, IExternalSystem, IStateFactory
{
private static readonly TimeSpan CacheDuration = TimeSpan.FromMinutes(10);
private readonly IPubSub pubSub;
private readonly IStateStore store;
private readonly IMemoryCache statesCache;
private readonly IServiceProvider services;
private readonly List<IDisposable> states = new List<IDisposable>();
private readonly SingleThreadedDispatcher cleanupDispatcher = new SingleThreadedDispatcher();
private IDisposable pubSubscription;
public StateFactory(
IPubSub pubSub,
IServiceProvider services,
IStateStore store,
IMemoryCache statesCache)
{
Guard.NotNull(pubSub, nameof(pubSub));
Guard.NotNull(store, nameof(store));
Guard.NotNull(services, nameof(services));
Guard.NotNull(statesCache, nameof(statesCache));
this.pubSub = pubSub;
this.store = store;
this.services = services;
this.statesCache = statesCache;
}
public void Connect()
{
pubSubscription = pubSub.Subscribe<InvalidateMessage>(m =>
{
statesCache.Remove(m.Key);
});
}
public async Task<T> GetAsync<T, TState>(string key) where T : StatefulObject<TState>
{
Guard.NotNull(key, nameof(key));
if (statesCache.TryGetValue<T>(key, out var state))
{
return state;
}
state = (T)services.GetService(typeof(T));
var stateHolder = new StateHolder<TState>(key, () =>
{
pubSub.Publish(new InvalidateMessage { Key = key }, false);
}, store);
await state.ActivateAsync(stateHolder);
var stateEntry = statesCache.CreateEntry(key);
stateEntry.Value = state;
stateEntry.AbsoluteExpirationRelativeToNow = CacheDuration;
stateEntry.PostEvictionCallbacks.Add(new PostEvictionCallbackRegistration
{
EvictionCallback = (k, v, r, s) =>
{
cleanupDispatcher.DispatchAsync(() =>
{
state.Dispose();
states.Remove(state);
}).Forget();
}
});
states.Add(state);
return state;
}
protected override void DisposeObject(bool disposing)
{
if (disposing)
{
cleanupDispatcher.DispatchAsync(() =>
{
foreach (var state in states)
{
state.Dispose();
}
});
cleanupDispatcher.StopAndWaitAsync().Wait();
}
}
}
}

44
src/Squidex.Infrastructure/States/StateHolder.cs

@ -0,0 +1,44 @@
// ==========================================================================
// StateHolder.cs
// Squidex Headless CMS
// ==========================================================================
// Copyright (c) Squidex Group
// All rights reserved.
// ==========================================================================
using System;
using System.Threading.Tasks;
namespace Squidex.Infrastructure.States
{
public sealed class StateHolder<T> : IStateHolder<T>
{
private readonly Action written;
private readonly IStateStore store;
private readonly string key;
private string etag;
public T State { get; set; }
internal StateHolder(string key, Action written, IStateStore store)
{
this.key = key;
this.written = written;
this.store = store;
}
public async Task ReadAsync()
{
(State, etag) = await store.ReadAsync<T>(key);
}
public async Task WriteAsync()
{
var newEtag = Guid.NewGuid().ToString();
await store.WriteAsync(key, State, etag, newEtag);
etag = newEtag;
}
}
}

65
src/Squidex.Infrastructure/States/StatefulObject.cs

@ -0,0 +1,65 @@
// ==========================================================================
// StatefulActor.cs
// Squidex Headless CMS
// ==========================================================================
// Copyright (c) Squidex Group
// All rights reserved.
// ==========================================================================
using System.Threading.Tasks;
namespace Squidex.Infrastructure.States
{
public abstract class StatefulObject<T> : DisposableObjectBase
{
private IStateHolder<T> stateHolder;
public T State
{
get
{
if (stateHolder != null)
{
return stateHolder.State;
}
else
{
return default(T);
}
}
protected set
{
if (stateHolder != null)
{
stateHolder.State = value;
}
}
}
public Task ActivateAsync(IStateHolder<T> stateHolder)
{
Guard.NotNull(stateHolder, nameof(stateHolder));
this.stateHolder = stateHolder;
return stateHolder.ReadAsync();
}
public async Task ReadStateAsync()
{
if (stateHolder != null)
{
await stateHolder.ReadAsync();
}
}
public async Task WriteStateAsync()
{
if (stateHolder != null)
{
await stateHolder.WriteAsync();
}
}
}
}

358
tests/Squidex.Infrastructure.Tests/CQRS/Events/Actors/EventConsumerActorTests.cs

@ -0,0 +1,358 @@
// ==========================================================================
// EventConsumerActorTests.cs
// Squidex Headless CMS
// ==========================================================================
// Copyright (c) Squidex Group
// All rights reserved.
// ==========================================================================
using System;
using System.Threading.Tasks;
using FakeItEasy;
using FluentAssertions;
using Squidex.Infrastructure.Log;
using Squidex.Infrastructure.States;
using Xunit;
namespace Squidex.Infrastructure.CQRS.Events.Actors
{
public class EventConsumerActorTests
{
public sealed class MyEvent : IEvent
{
}
public sealed class MyEventConsumerActor : EventConsumerActor
{
public MyEventConsumerActor(EventDataFormatter formatter, IEventStore eventStore, ISemanticLog log)
: base(formatter, eventStore, log)
{
}
protected override IEventSubscription CreateSubscription(IEventStore eventStore, string streamFilter, string position)
{
return eventStore.CreateSubscription(this, streamFilter, position);
}
}
private readonly IEventConsumer eventConsumer = A.Fake<IEventConsumer>();
private readonly IEventStore eventStore = A.Fake<IEventStore>();
private readonly IEventSubscriber sutSubscriber;
private readonly IEventSubscription eventSubscription = A.Fake<IEventSubscription>();
private readonly ISemanticLog log = A.Fake<ISemanticLog>();
private readonly IStateHolder<EventConsumerState> stateHolder = A.Fake<IStateHolder<EventConsumerState>>();
private readonly EventDataFormatter formatter = A.Fake<EventDataFormatter>();
private readonly EventData eventData = new EventData();
private readonly Envelope<IEvent> envelope = new Envelope<IEvent>(new MyEvent());
private readonly EventConsumerActor sut;
private readonly string consumerName;
private readonly string initialPosition = Guid.NewGuid().ToString();
private EventConsumerState state = new EventConsumerState();
public EventConsumerActorTests()
{
state.Position = initialPosition;
consumerName = eventConsumer.GetType().Name;
A.CallTo(() => eventStore.CreateSubscription(A<IEventSubscriber>.Ignored, A<string>.Ignored, A<string>.Ignored))
.Returns(eventSubscription);
A.CallTo(() => eventConsumer.Name).
Returns(consumerName);
A.CallTo(() => stateHolder.State)
.ReturnsLazily(() => state);
A.CallToSet(() => stateHolder.State)
.Invokes(new Action<EventConsumerState>(s => state = s));
A.CallTo(() => formatter.Parse(eventData, true)).Returns(envelope);
sut = new MyEventConsumerActor(formatter, eventStore, log);
sutSubscriber = sut;
sut.ActivateAsync(stateHolder).Wait();
}
[Fact]
public void Should_not_subscribe_to_event_store_when_stopped_in_db()
{
state = state.Stopped();
sut.Activate(eventConsumer);
sut.Dispose();
state.ShouldBeEquivalentTo(new EventConsumerState { IsStopped = true, Position = initialPosition, Error = null });
A.CallTo(() => eventStore.CreateSubscription(A<IEventSubscriber>.Ignored, A<string>.Ignored, A<string>.Ignored))
.MustNotHaveHappened();
}
[Fact]
public void Should_subscribe_to_event_store_when_not_found_in_db()
{
sut.Activate(eventConsumer);
sut.Dispose();
state.ShouldBeEquivalentTo(new EventConsumerState { IsStopped = false, Position = initialPosition, Error = null });
A.CallTo(() => eventStore.CreateSubscription(A<IEventSubscriber>.Ignored, A<string>.Ignored, A<string>.Ignored))
.MustHaveHappened(Repeated.Exactly.Once);
}
[Fact]
public void Should_subscribe_to_event_store_when_not_stopped_in_db()
{
sut.Activate(eventConsumer);
sut.Dispose();
state.ShouldBeEquivalentTo(new EventConsumerState { IsStopped = false, Position = initialPosition, Error = null });
A.CallTo(() => eventStore.CreateSubscription(A<IEventSubscriber>.Ignored, A<string>.Ignored, A<string>.Ignored))
.MustHaveHappened(Repeated.Exactly.Once);
}
[Fact]
public void Should_stop_subscription_when_stopped()
{
sut.Activate(eventConsumer);
sut.Stop();
sut.Stop();
sut.Dispose();
state.ShouldBeEquivalentTo(new EventConsumerState { IsStopped = true, Position = initialPosition, Error = null });
A.CallTo(() => stateHolder.WriteAsync())
.MustHaveHappened(Repeated.Exactly.Once);
A.CallTo(() => eventSubscription.StopAsync())
.MustHaveHappened(Repeated.Exactly.Once);
}
[Fact]
public void Should_reset_consumer_when_resetting()
{
sut.Activate(eventConsumer);
sut.Stop();
sut.Reset();
sut.Dispose();
state.ShouldBeEquivalentTo(new EventConsumerState { IsStopped = false, Position = null, Error = null });
A.CallTo(() => stateHolder.WriteAsync())
.MustHaveHappened(Repeated.Exactly.Twice);
A.CallTo(() => eventConsumer.ClearAsync())
.MustHaveHappened(Repeated.Exactly.Once);
A.CallTo(() => eventSubscription.StopAsync())
.MustHaveHappened(Repeated.Exactly.Once);
A.CallTo(() => eventStore.CreateSubscription(A<IEventSubscriber>.Ignored, A<string>.Ignored, state.Position))
.MustHaveHappened(Repeated.Exactly.Once);
A.CallTo(() => eventStore.CreateSubscription(A<IEventSubscriber>.Ignored, A<string>.Ignored, null))
.MustHaveHappened(Repeated.Exactly.Once);
}
[Fact]
public async Task Should_invoke_and_update_position_when_event_received()
{
sut.Activate(eventConsumer);
var @event = new StoredEvent(Guid.NewGuid().ToString(), 123, eventData);
await OnEventAsync(eventSubscription, @event);
sut.Dispose();
state.ShouldBeEquivalentTo(new EventConsumerState { IsStopped = false, Position = @event.EventPosition, Error = null });
A.CallTo(() => stateHolder.WriteAsync())
.MustHaveHappened(Repeated.Exactly.Once);
A.CallTo(() => eventConsumer.On(envelope))
.MustHaveHappened(Repeated.Exactly.Once);
}
[Fact]
public async Task Should_ignore_old_events()
{
sut.Activate(eventConsumer);
A.CallTo(() => formatter.Parse(eventData, true))
.Throws(new TypeNameNotFoundException());
var @event = new StoredEvent(Guid.NewGuid().ToString(), 123, eventData);
await OnEventAsync(eventSubscription, @event);
sut.Dispose();
state.ShouldBeEquivalentTo(new EventConsumerState { IsStopped = false, Position = @event.EventPosition, Error = null });
A.CallTo(() => stateHolder.WriteAsync())
.MustHaveHappened(Repeated.Exactly.Once);
A.CallTo(() => eventConsumer.On(envelope))
.MustNotHaveHappened();
}
[Fact]
public async Task Should_not_invoke_and_update_position_when_event_is_from_another_subscription()
{
sut.Activate(eventConsumer);
var @event = new StoredEvent(Guid.NewGuid().ToString(), 123, eventData);
await OnEventAsync(A.Fake<IEventSubscription>(), @event);
sut.Dispose();
state.ShouldBeEquivalentTo(new EventConsumerState { IsStopped = false, Position = initialPosition, Error = null });
A.CallTo(() => eventConsumer.On(envelope))
.MustNotHaveHappened();
}
[Fact]
public async Task Should_not_make_error_handling_when_exception_is_from_another_subscription()
{
sut.Activate(eventConsumer);
var ex = new InvalidOperationException();
await OnErrorAsync(A.Fake<IEventSubscription>(), ex);
sut.Dispose();
state.ShouldBeEquivalentTo(new EventConsumerState { IsStopped = false, Position = initialPosition, Error = null });
A.CallTo(() => stateHolder.WriteAsync())
.MustNotHaveHappened();
}
[Fact]
public void Should_stop_if_resetting_failed()
{
sut.Activate(eventConsumer);
var ex = new InvalidOperationException();
A.CallTo(() => eventConsumer.ClearAsync())
.Throws(ex);
sut.Reset();
sut.Dispose();
state.ShouldBeEquivalentTo(new EventConsumerState { IsStopped = true, Position = initialPosition, Error = ex.ToString() });
A.CallTo(() => stateHolder.WriteAsync())
.MustHaveHappened(Repeated.Exactly.Once);
A.CallTo(() => eventSubscription.StopAsync())
.MustHaveHappened(Repeated.Exactly.Once);
}
[Fact]
public async Task Should_stop_if_handling_failed()
{
sut.Activate(eventConsumer);
var ex = new InvalidOperationException();
A.CallTo(() => eventConsumer.On(envelope))
.Throws(ex);
var @event = new StoredEvent(Guid.NewGuid().ToString(), 123, eventData);
await OnEventAsync(eventSubscription, @event);
sut.Dispose();
state.ShouldBeEquivalentTo(new EventConsumerState { IsStopped = true, Position = initialPosition, Error = ex.ToString() });
A.CallTo(() => eventConsumer.On(envelope))
.MustHaveHappened();
A.CallTo(() => stateHolder.WriteAsync())
.MustHaveHappened(Repeated.Exactly.Once);
A.CallTo(() => eventSubscription.StopAsync())
.MustHaveHappened(Repeated.Exactly.Once);
}
[Fact]
public async Task Should_stop_if_deserialization_failed()
{
sut.Activate(eventConsumer);
var ex = new InvalidOperationException();
A.CallTo(() => formatter.Parse(eventData, true))
.Throws(ex);
var @event = new StoredEvent(Guid.NewGuid().ToString(), 123, eventData);
await OnEventAsync(eventSubscription, @event);
sut.Dispose();
state.ShouldBeEquivalentTo(new EventConsumerState { IsStopped = true, Position = initialPosition, Error = ex.ToString() });
A.CallTo(() => eventConsumer.On(envelope))
.MustNotHaveHappened();
A.CallTo(() => stateHolder.WriteAsync())
.MustHaveHappened(Repeated.Exactly.Once);
A.CallTo(() => eventSubscription.StopAsync())
.MustHaveHappened(Repeated.Exactly.Once);
}
[Fact]
public async Task Should_start_after_stop_when_handling_failed()
{
sut.Activate(eventConsumer);
var exception = new InvalidOperationException();
A.CallTo(() => eventConsumer.On(envelope))
.Throws(exception);
var @event = new StoredEvent(Guid.NewGuid().ToString(), 123, eventData);
await OnEventAsync(eventSubscription, @event);
sut.Start();
sut.Start();
sut.Dispose();
state.ShouldBeEquivalentTo(new EventConsumerState { IsStopped = false, Position = initialPosition, Error = null });
A.CallTo(() => eventConsumer.On(envelope))
.MustHaveHappened();
A.CallTo(() => stateHolder.WriteAsync())
.MustHaveHappened(Repeated.Exactly.Twice);
A.CallTo(() => eventSubscription.StopAsync())
.MustHaveHappened(Repeated.Exactly.Once);
A.CallTo(() => eventStore.CreateSubscription(A<IEventSubscriber>.Ignored, A<string>.Ignored, A<string>.Ignored))
.MustHaveHappened(Repeated.Exactly.Twice);
}
private Task OnErrorAsync(IEventSubscription subscriber, Exception ex)
{
return sutSubscriber.OnErrorAsync(subscriber, ex);
}
private Task OnEventAsync(IEventSubscription subscriber, StoredEvent ev)
{
return sutSubscriber.OnEventAsync(subscriber, ev);
}
}
}

125
tests/Squidex.Infrastructure.Tests/CQRS/Events/Actors/EventConsumerManagerTests.cs

@ -0,0 +1,125 @@
// ==========================================================================
// EventConsumerManagerTests.cs
// Squidex Headless CMS
// ==========================================================================
// Copyright (c) Squidex Group
// All rights reserved.
// ==========================================================================
using System;
using System.Threading.Tasks;
using FakeItEasy;
using FluentAssertions;
using Squidex.Infrastructure.CQRS.Events.Actors.Messages;
using Squidex.Infrastructure.States;
using Xunit;
namespace Squidex.Infrastructure.CQRS.Events.Actors
{
public class EventConsumerManagerTests
{
private readonly EventConsumerActor actor1 = A.Fake<EventConsumerActor>();
private readonly EventConsumerActor actor2 = A.Fake<EventConsumerActor>();
private readonly IStateFactory factory = A.Fake<IStateFactory>();
private readonly IEventConsumer consumer1 = A.Fake<IEventConsumer>();
private readonly IEventConsumer consumer2 = A.Fake<IEventConsumer>();
private readonly IPubSub pubSub = new InMemoryPubSub();
private readonly string consumerName1 = "Consumer1";
private readonly string consumerName2 = "Consumer2";
private readonly EventConsumerActorManager sut;
public EventConsumerManagerTests()
{
A.CallTo(() => consumer1.Name).Returns(consumerName1);
A.CallTo(() => consumer2.Name).Returns(consumerName2);
A.CallTo(() => factory.GetAsync<EventConsumerActor, EventConsumerState>(consumerName1)).Returns(actor1);
A.CallTo(() => factory.GetAsync<EventConsumerActor, EventConsumerState>(consumerName2)).Returns(actor2);
sut = new EventConsumerActorManager(new IEventConsumer[] { consumer1, consumer2 }, pubSub, factory);
}
[Fact]
public void Should_activate_all_actors()
{
sut.Connect();
A.CallTo(() => actor1.Activate(consumer1))
.MustHaveHappened();
A.CallTo(() => actor2.Activate(consumer2))
.MustHaveHappened();
}
[Fact]
public void Should_start_correct_actor()
{
sut.Connect();
pubSub.Publish(new StartConsumerMessage { ConsumerName = consumerName1 }, true);
A.CallTo(() => actor1.Start())
.MustHaveHappened();
A.CallTo(() => actor2.Start())
.MustNotHaveHappened();
}
[Fact]
public void Should_stop_correct_actor()
{
sut.Connect();
pubSub.Publish(new StopConsumerMessage { ConsumerName = consumerName1 }, true);
A.CallTo(() => actor1.Stop())
.MustHaveHappened();
A.CallTo(() => actor2.Stop())
.MustNotHaveHappened();
}
[Fact]
public void Should_reset_correct_actor()
{
sut.Connect();
pubSub.Publish(new ResetConsumerMessage { ConsumerName = consumerName2 }, true);
A.CallTo(() => actor1.Reset())
.MustNotHaveHappened();
A.CallTo(() => actor2.Reset())
.MustHaveHappened();
}
[Fact]
public async Task Should_get_state_from_all_actors()
{
sut.Connect();
A.CallTo(() => actor1.GetState())
.Returns(new EventConsumerInfo { Name = consumerName1, Position = "123 " });
A.CallTo(() => actor2.GetState())
.Returns(new EventConsumerInfo { Name = consumerName2, Position = "345 " });
var response = await pubSub.RequestAsync<GetStatesRequest, GetStatesResponse>(new GetStatesRequest(), TimeSpan.FromSeconds(5), true);
response.States.ShouldAllBeEquivalentTo(new EventConsumerInfo[]
{
new EventConsumerInfo { Name = consumerName1, Position = "123 " },
new EventConsumerInfo { Name = consumerName2, Position = "345 " }
});
}
[Fact]
public void Should_not_dispose_actors()
{
sut.Dispose();
Assert.False(actor1.IsDisposed);
Assert.False(actor2.IsDisposed);
}
}
}

16
tests/Squidex.Infrastructure.Tests/CQRS/Events/EventSubscriptionTests.cs

@ -47,22 +47,6 @@ namespace Squidex.Infrastructure.CQRS.Events
.MustHaveHappened();
}
[Fact]
public async Task Should_propagate_closed_to_subscriber()
{
var ex = new InvalidOperationException();
A.CallTo(() => eventStore.GetEventsAsync(A<Func<StoredEvent, Task>>.Ignored, A<CancellationToken>.Ignored, "^my-stream", position))
.Throws(ex);
var sut = new EventStoreSubscription(eventStore, eventSubscriber, "^my-stream", position);
await WaitAndStopAsync(sut);
A.CallTo(() => eventSubscriber.OnClosedAsync(sut))
.MustHaveHappened();
}
[Fact]
public async Task Should_propagate_operation_cancelled_exception_to_subscriber()
{

58
tests/Squidex.Infrastructure.Tests/CQRS/Events/Grains/EventConsumerBootstrapTests.cs

@ -1,58 +0,0 @@
// ==========================================================================
// EventConsumerBootstrapTests.cs
// Squidex Headless CMS
// ==========================================================================
// Copyright (c) Squidex Group
// All rights reserved.
// ==========================================================================
using System.Threading.Tasks;
using FakeItEasy;
using Orleans;
using Orleans.Providers;
using Squidex.Infrastructure.CQRS.Events.Orleans.Grains;
using Xunit;
namespace Squidex.Infrastructure.CQRS.Events.Grains
{
public sealed class EventConsumerBootstrapTests
{
private readonly IEventConsumerRegistryGrain registry = A.Fake<IEventConsumerRegistryGrain>();
private readonly IProviderRuntime runtime = A.Fake<IProviderRuntime>();
private readonly EventConsumerBootstrap sut = new EventConsumerBootstrap();
public EventConsumerBootstrapTests()
{
var factory = A.Fake<IGrainFactory>();
A.CallTo(() => factory.GetGrain<IEventConsumerRegistryGrain>("Default", null))
.Returns(registry);
A.CallTo(() => runtime.GrainFactory)
.Returns(factory);
}
[Fact]
public async Task Should_do_nothing_on_close()
{
await sut.Close();
}
[Fact]
public async Task Should_set_name_on_init()
{
await sut.Init("MyName", runtime, null);
Assert.Equal("MyName", sut.Name);
}
[Fact]
public async Task Should_activate_registry_on_init()
{
await sut.Init("MyName", runtime, null);
A.CallTo(() => registry.ActivateAsync(null))
.MustHaveHappened();
}
}
}

408
tests/Squidex.Infrastructure.Tests/CQRS/Events/Grains/EventConsumerGrainTests.cs

@ -1,408 +0,0 @@
// ==========================================================================
// EventConsumerGrainTests.cs
// Squidex Headless CMS
// ==========================================================================
// Copyright (c) Squidex Group
// All rights reserved.
// ==========================================================================
using System;
using System.Threading.Tasks;
using FakeItEasy;
using Orleans.Concurrency;
using Orleans.Core;
using Orleans.Runtime;
using Squidex.Infrastructure.CQRS.Events.Orleans.Grains;
using Squidex.Infrastructure.CQRS.Events.Orleans.Grains.Implementation;
using Squidex.Infrastructure.Log;
using Xunit;
namespace Squidex.Infrastructure.CQRS.Events.Grains
{
public class EventConsumerGrainTests
{
public sealed class MyEvent : IEvent
{
}
public sealed class MyEventConsumerGrain : EventConsumerGrain
{
public MyEventConsumerGrain(
EventDataFormatter formatter,
EventConsumerFactory eventConsumerFactory,
IEventStore eventStore,
ISemanticLog log,
IGrainIdentity identity,
IGrainRuntime runtime,
IStorage<EventConsumerGrainState> storage)
: base(formatter, eventConsumerFactory, eventStore, log, identity, runtime, storage)
{
}
protected override IEventConsumerGrain GetSelf()
{
return this;
}
protected override IEventSubscription CreateSubscription(IEventSubscriber subscriber, string streamFilter, string position)
{
return EventStore.CreateSubscription(subscriber, streamFilter, position);
}
}
private readonly IEventConsumer eventConsumer = A.Fake<IEventConsumer>();
private readonly IEventStore eventStore = A.Fake<IEventStore>();
private readonly IEventSubscription eventSubscription = A.Fake<IEventSubscription>();
private readonly ISemanticLog log = A.Fake<ISemanticLog>();
private readonly IStorage<EventConsumerGrainState> storage = A.Fake<IStorage<EventConsumerGrainState>>();
private readonly EventDataFormatter formatter = A.Fake<EventDataFormatter>();
private readonly EventData eventData = new EventData();
private readonly Envelope<IEvent> envelope = new Envelope<IEvent>(new MyEvent());
private readonly EventConsumerFactory factory;
private readonly MyEventConsumerGrain sut;
private readonly string consumerName;
private EventConsumerGrainState state = new EventConsumerGrainState();
public EventConsumerGrainTests()
{
factory = x => eventConsumer;
state.Position = Guid.NewGuid().ToString();
consumerName = eventConsumer.GetType().Name;
A.CallTo(() => eventStore.CreateSubscription(A<IEventSubscriber>.Ignored, A<string>.Ignored, A<string>.Ignored)).Returns(eventSubscription);
A.CallTo(() => eventConsumer.Name).Returns(consumerName);
A.CallTo(() => formatter.Parse(eventData, true)).Returns(envelope);
A.CallTo(() => storage.State).ReturnsLazily(() => state);
A.CallToSet(() => storage.State).Invokes(new Action<EventConsumerGrainState>(s => state = s));
sut = new MyEventConsumerGrain(
formatter,
factory,
eventStore,
log,
A.Fake<IGrainIdentity>(),
A.Fake<IGrainRuntime>(),
storage);
}
[Fact]
public async Task Should_not_subscribe_to_event_store_when_stopped_in_db()
{
state.IsStopped = true;
await sut.OnActivateAsync();
await sut.ActivateAsync();
A.CallTo(() => eventStore.CreateSubscription(A<IEventSubscriber>.Ignored, A<string>.Ignored, A<string>.Ignored))
.MustNotHaveHappened();
}
[Fact]
public async Task Should_subscribe_to_event_store_when_not_stopped_in_db()
{
state.Position = "123";
await sut.OnActivateAsync();
await sut.ActivateAsync();
A.CallTo(() => eventStore.CreateSubscription(A<IEventSubscriber>.Ignored, A<string>.Ignored, "123"))
.MustHaveHappened(Repeated.Exactly.Once);
}
[Fact]
public async Task Should_stop_subscription_when_stopped()
{
await sut.OnActivateAsync();
await sut.ActivateAsync();
await sut.StopAsync();
await sut.StopAsync();
A.CallTo(() => eventSubscription.StopAsync())
.MustHaveHappened(Repeated.Exactly.Once);
Assert.True(state.IsStopped);
}
[Fact]
public async Task Should_reset_consumer_when_resetting()
{
await sut.OnActivateAsync();
await sut.ActivateAsync();
await sut.StopAsync();
await sut.ResetAsync();
A.CallTo(() => eventConsumer.ClearAsync())
.MustHaveHappened(Repeated.Exactly.Once);
A.CallTo(() => eventSubscription.StopAsync())
.MustHaveHappened(Repeated.Exactly.Once);
A.CallTo(() => eventStore.CreateSubscription(A<IEventSubscriber>.Ignored, A<string>.Ignored, state.Position))
.MustHaveHappened(Repeated.Exactly.Once);
A.CallTo(() => eventStore.CreateSubscription(A<IEventSubscriber>.Ignored, A<string>.Ignored, null))
.MustHaveHappened(Repeated.Exactly.Once);
Assert.False(state.IsStopped);
}
[Fact]
public async Task Should_unsubscribe_from_subscription_when_closed()
{
await sut.OnActivateAsync();
await sut.ActivateAsync();
await OnClosedAsync(eventSubscription);
A.CallTo(() => eventSubscription.StopAsync())
.MustHaveHappened();
Assert.False(state.IsStopped);
}
[Fact]
public async Task Should_not_unsubscribe_from_subscription_when_closed_call_is_from_another_subscription()
{
await sut.OnActivateAsync();
await sut.ActivateAsync();
await OnClosedAsync(A.Fake<IEventSubscription>());
A.CallTo(() => eventSubscription.StopAsync())
.MustNotHaveHappened();
Assert.False(state.IsStopped);
}
[Fact]
public async Task Should_not_unsubscribe_from_subscription_when_not_running()
{
state.IsStopped = true;
await sut.OnActivateAsync();
await sut.ActivateAsync();
await OnClosedAsync(A.Fake<IEventSubscription>());
A.CallTo(() => storage.WriteStateAsync())
.MustNotHaveHappened();
Assert.True(state.IsStopped);
}
[Fact]
public async Task Should_invoke_and_update_position_when_event_received()
{
var @event = new StoredEvent(Guid.NewGuid().ToString(), 123, eventData);
await sut.OnActivateAsync();
await sut.ActivateAsync();
await OnEventAsync(eventSubscription, @event);
A.CallTo(() => eventConsumer.On(envelope))
.MustHaveHappened(Repeated.Exactly.Once);
Assert.Equal(@event.EventPosition, state.Position);
var info = await sut.GetStateAsync();
Assert.Equal(@event.EventPosition, info.Value.Position);
}
[Fact]
public async Task Should_ignore_old_events()
{
A.CallTo(() => formatter.Parse(eventData, true))
.Throws(new TypeNameNotFoundException());
var @event = new StoredEvent(Guid.NewGuid().ToString(), 123, eventData);
await sut.OnActivateAsync();
await sut.ActivateAsync();
await OnEventAsync(eventSubscription, @event);
A.CallTo(() => eventConsumer.On(envelope))
.MustNotHaveHappened();
Assert.Equal(@event.EventPosition, state.Position);
}
[Fact]
public async Task Should_not_invoke_and_update_position_when_event_is_from_another_subscription()
{
var @event = new StoredEvent(Guid.NewGuid().ToString(), 123, eventData);
await sut.OnActivateAsync();
await sut.ActivateAsync();
await OnEventAsync(A.Fake<IEventSubscription>(), @event);
A.CallTo(() => eventConsumer.On(envelope))
.MustNotHaveHappened();
}
[Fact]
public async Task Should_not_make_error_handling_when_exception_is_from_another_subscription()
{
var ex = new InvalidOperationException();
await sut.OnActivateAsync();
await sut.ActivateAsync();
await OnErrorAsync(A.Fake<IEventSubscription>(), ex);
Assert.False(state.IsStopped);
}
[Fact]
public async Task Should_stop_if_subscription_failed()
{
var ex = new InvalidOperationException();
await sut.OnActivateAsync();
await sut.ActivateAsync();
await OnErrorAsync(eventSubscription, ex);
A.CallTo(() => eventSubscription.StopAsync())
.MustHaveHappened(Repeated.Exactly.Once);
Assert.True(state.IsStopped);
}
[Fact]
public async Task Should_stop_if_subscription_failed_and_ignore_error_on_unsubscribe()
{
A.CallTo(() => eventSubscription.StopAsync())
.Throws(new InvalidOperationException());
var ex = new InvalidOperationException();
await sut.OnActivateAsync();
await sut.ActivateAsync();
await OnErrorAsync(eventSubscription, ex);
Assert.True(state.IsStopped);
}
[Fact]
public async Task Should_stop_if_resetting_failed()
{
var ex = new InvalidOperationException();
A.CallTo(() => eventConsumer.ClearAsync())
.Throws(ex);
await sut.OnActivateAsync();
await sut.ActivateAsync();
await sut.ResetAsync();
A.CallTo(() => eventSubscription.StopAsync())
.MustHaveHappened(Repeated.Exactly.Once);
Assert.True(state.IsStopped);
}
[Fact]
public async Task Should_stop_if_handling_failed()
{
var ex = new InvalidOperationException();
A.CallTo(() => eventConsumer.On(envelope))
.Throws(ex);
var @event = new StoredEvent(Guid.NewGuid().ToString(), 123, eventData);
await sut.OnActivateAsync();
await sut.ActivateAsync();
await OnEventAsync(eventSubscription, @event);
A.CallTo(() => eventConsumer.On(envelope))
.MustHaveHappened();
A.CallTo(() => eventSubscription.StopAsync())
.MustHaveHappened(Repeated.Exactly.Once);
Assert.True(state.IsStopped);
}
[Fact]
public async Task Should_stop_if_deserialization_failed()
{
var ex = new InvalidOperationException();
A.CallTo(() => formatter.Parse(eventData, true))
.Throws(ex);
var @event = new StoredEvent(Guid.NewGuid().ToString(), 123, eventData);
await sut.OnActivateAsync();
await sut.ActivateAsync();
await OnEventAsync(eventSubscription, @event);
A.CallTo(() => eventConsumer.On(envelope))
.MustNotHaveHappened();
A.CallTo(() => eventSubscription.StopAsync())
.MustHaveHappened(Repeated.Exactly.Once);
Assert.True(state.IsStopped);
}
[Fact]
public async Task Should_start_after_stop_when_handling_failed()
{
var exception = new InvalidOperationException();
A.CallTo(() => eventConsumer.On(envelope))
.Throws(exception);
var @event = new StoredEvent(Guid.NewGuid().ToString(), 123, eventData);
await sut.OnActivateAsync();
await sut.ActivateAsync();
await OnEventAsync(eventSubscription, @event);
Assert.True(state.IsStopped);
await sut.StartAsync();
await sut.StartAsync();
A.CallTo(() => eventConsumer.On(envelope))
.MustHaveHappened();
A.CallTo(() => eventSubscription.StopAsync())
.MustHaveHappened(Repeated.Exactly.Once);
A.CallTo(() => eventStore.CreateSubscription(A<IEventSubscriber>.Ignored, A<string>.Ignored, A<string>.Ignored))
.MustHaveHappened(Repeated.Exactly.Twice);
Assert.False(state.IsStopped);
}
private Task OnErrorAsync(IEventSubscription subscriber, Exception ex)
{
return sut.OnErrorAsync(subscriber.AsImmutable(), ex.AsImmutable());
}
private Task OnEventAsync(IEventSubscription subscriber, StoredEvent ev)
{
return sut.OnEventAsync(subscriber.AsImmutable(), ev.AsImmutable());
}
private Task OnClosedAsync(IEventSubscription subscriber)
{
return sut.OnClosedAsync(subscriber.AsImmutable());
}
}
}

165
tests/Squidex.Infrastructure.Tests/CQRS/Events/Grains/EventConsumerRegistryGrainTests.cs

@ -1,165 +0,0 @@
// ==========================================================================
// EventConsumerRegistryGrainTests.cs
// Squidex Headless CMS
// ==========================================================================
// Copyright (c) Squidex Group
// All rights reserved.
// ==========================================================================
using System.Collections.Generic;
using System.Threading.Tasks;
using FakeItEasy;
using FluentAssertions;
using Orleans;
using Orleans.Concurrency;
using Orleans.Core;
using Orleans.Runtime;
using Squidex.Infrastructure.CQRS.Events.Orleans.Grains;
using Squidex.Infrastructure.CQRS.Events.Orleans.Grains.Implementation;
using Xunit;
namespace Squidex.Infrastructure.CQRS.Events.Grains
{
public class EventConsumerRegistryGrainTests
{
public class MyEventConsumerRegistryGrain : EventConsumerRegistryGrain
{
public MyEventConsumerRegistryGrain(
IEnumerable<IEventConsumer> eventConsumers,
IGrainIdentity identity,
IGrainRuntime runtime)
: base(eventConsumers, identity, runtime)
{
}
}
private readonly IEventConsumer consumerA = A.Fake<IEventConsumer>();
private readonly IEventConsumer consumerB = A.Fake<IEventConsumer>();
private readonly IEventConsumerGrain grainA = A.Fake<IEventConsumerGrain>();
private readonly IEventConsumerGrain grainB = A.Fake<IEventConsumerGrain>();
private readonly MyEventConsumerRegistryGrain sut;
public EventConsumerRegistryGrainTests()
{
var grainRuntime = A.Fake<IGrainRuntime>();
var grainFactory = A.Fake<IGrainFactory>();
A.CallTo(() => grainFactory.GetGrain<IEventConsumerGrain>("a", null)).Returns(grainA);
A.CallTo(() => grainFactory.GetGrain<IEventConsumerGrain>("b", null)).Returns(grainB);
A.CallTo(() => grainRuntime.GrainFactory).Returns(grainFactory);
A.CallTo(() => consumerA.Name).Returns("a");
A.CallTo(() => consumerA.EventsFilter).Returns("^a-");
A.CallTo(() => consumerB.Name).Returns("b");
A.CallTo(() => consumerB.EventsFilter).Returns("^b-");
sut = new MyEventConsumerRegistryGrain(new[] { consumerA, consumerB }, A.Fake<IGrainIdentity>(), grainRuntime);
}
[Fact]
public async Task Should_not_activate_all_grains_on_activate()
{
await sut.OnActivateAsync();
A.CallTo(() => grainA.ActivateAsync())
.MustNotHaveHappened();
A.CallTo(() => grainB.ActivateAsync())
.MustNotHaveHappened();
}
[Fact]
public async Task Should_activate_all_grains_on_reminder()
{
await sut.ReceiveReminder(null, default(TickStatus));
A.CallTo(() => grainA.ActivateAsync())
.MustHaveHappened();
A.CallTo(() => grainB.ActivateAsync())
.MustHaveHappened();
}
[Fact]
public async Task Should_activate_all_grains_on_activate_with_null()
{
await sut.ActivateAsync(null);
A.CallTo(() => grainA.ActivateAsync())
.MustHaveHappened();
A.CallTo(() => grainB.ActivateAsync())
.MustHaveHappened();
}
[Fact]
public async Task Should_activate_matching_grains_when_stream_name_defined()
{
await sut.ActivateAsync("a-123");
A.CallTo(() => grainA.ActivateAsync())
.MustHaveHappened();
A.CallTo(() => grainB.ActivateAsync())
.MustNotHaveHappened();
}
[Fact]
public async Task Should_start_matching_grain()
{
await sut.StartAsync("a");
A.CallTo(() => grainA.StartAsync())
.MustHaveHappened();
A.CallTo(() => grainB.StartAsync())
.MustNotHaveHappened();
}
[Fact]
public async Task Should_stop_matching_grain()
{
await sut.StopAsync("b");
A.CallTo(() => grainA.StopAsync())
.MustNotHaveHappened();
A.CallTo(() => grainB.StopAsync())
.MustHaveHappened();
}
[Fact]
public async Task Should_reset_matching_grain()
{
await sut.ResetAsync("b");
A.CallTo(() => grainA.ResetAsync())
.MustNotHaveHappened();
A.CallTo(() => grainB.ResetAsync())
.MustHaveHappened();
}
[Fact]
public async Task Should_fetch_infos_from_all_grains()
{
A.CallTo(() => grainA.GetStateAsync())
.Returns(new Immutable<EventConsumerInfo>(
new EventConsumerInfo { Name = "A", Error = "A-Error", IsStopped = false, Position = "123" }));
A.CallTo(() => grainB.GetStateAsync())
.Returns(new Immutable<EventConsumerInfo>(
new EventConsumerInfo { Name = "B", Error = "B-Error", IsStopped = false, Position = "456" }));
var infos = await sut.GetConsumersAsync();
infos.Value.ShouldBeEquivalentTo(
new List<EventConsumerInfo>
{
new EventConsumerInfo { Name = "A", Error = "A-Error", IsStopped = false, Position = "123" },
new EventConsumerInfo { Name = "B", Error = "B-Error", IsStopped = false, Position = "456" }
});
}
}
}

41
tests/Squidex.Infrastructure.Tests/CQRS/Events/Grains/OrleansEventNotifierTests.cs

@ -1,41 +0,0 @@
// ==========================================================================
// OrleansEventNotifierTests.cs
// Squidex Headless CMS
// ==========================================================================
// Copyright (c) Squidex Group
// All rights reserved.
// ==========================================================================
using FakeItEasy;
using Orleans;
using Squidex.Infrastructure.CQRS.Events.Orleans;
using Squidex.Infrastructure.CQRS.Events.Orleans.Grains;
using Xunit;
namespace Squidex.Infrastructure.CQRS.Events.Grains
{
public class OrleansEventNotifierTests
{
private readonly IEventConsumerRegistryGrain registry = A.Fake<IEventConsumerRegistryGrain>();
private readonly OrleansEventNotifier sut;
public OrleansEventNotifierTests()
{
var factory = A.Fake<IGrainFactory>();
A.CallTo(() => factory.GetGrain<IEventConsumerRegistryGrain>("Default", null))
.Returns(registry);
sut = new OrleansEventNotifier(factory);
}
[Fact]
public void Should_activate_registry_with_stream_name()
{
sut.NotifyEventsStored("my-stream");
A.CallTo(() => registry.ActivateAsync("my-stream"))
.MustHaveHappened();
}
}
}

25
tests/Squidex.Infrastructure.Tests/CQRS/Events/RetrySubscriptionTests.cs

@ -112,26 +112,6 @@ namespace Squidex.Infrastructure.CQRS.Events
.MustNotHaveHappened();
}
[Fact]
public async Task Should_forward_closed_from_inner_subscription()
{
await OnClosedAsync(eventSubscription);
await sut.StopAsync();
A.CallTo(() => eventSubscriber.OnClosedAsync(sut))
.MustHaveHappened();
}
[Fact]
public async Task Should_not_forward_closed_when_message_is_from_another_subscription()
{
await OnClosedAsync(A.Fake<IEventSubscription>());
await sut.StopAsync();
A.CallTo(() => eventSubscriber.OnClosedAsync(sut))
.MustNotHaveHappened();
}
private Task OnErrorAsync(IEventSubscription subscriber, Exception ex)
{
return sutSubscriber.OnErrorAsync(subscriber, ex);
@ -141,10 +121,5 @@ namespace Squidex.Infrastructure.CQRS.Events
{
return sutSubscriber.OnEventAsync(subscriber, ev);
}
private Task OnClosedAsync(IEventSubscription subscriber)
{
return sutSubscriber.OnClosedAsync(subscriber);
}
}
}

8
tests/Squidex.Infrastructure.Tests/Caching/InvalidatingMemoryCacheTests.cs

@ -61,7 +61,7 @@ namespace Squidex.Infrastructure.Caching
{
sut.Invalidate(123);
A.CallTo(() => pubsub.Publish("CacheInvalidations", A<string>.Ignored, true)).MustNotHaveHappened();
A.CallTo(() => pubsub.Publish(A<InvalidationMessage>.That.Matches(x => x.CacheKey == "a-key"), true)).MustNotHaveHappened();
}
[Fact]
@ -69,7 +69,7 @@ namespace Squidex.Infrastructure.Caching
{
sut.Invalidate("a-key");
A.CallTo(() => pubsub.Publish("CacheInvalidations", "a-key", true)).MustHaveHappened();
A.CallTo(() => pubsub.Publish(A<InvalidationMessage>.That.Matches(x => x.CacheKey == "a-key"), true)).MustHaveHappened();
}
[Fact]
@ -77,7 +77,7 @@ namespace Squidex.Infrastructure.Caching
{
((IMemoryCache)sut).Invalidate("a-key");
A.CallTo(() => pubsub.Publish("CacheInvalidations", "a-key", true)).MustHaveHappened();
A.CallTo(() => pubsub.Publish(A<InvalidationMessage>.That.Matches(x => x.CacheKey == "a-key"), true)).MustHaveHappened();
}
[Fact]
@ -117,7 +117,7 @@ namespace Squidex.Infrastructure.Caching
Assert.Equal(123, anotherSut.Get<int>("a-key"));
anotherPubsub.Publish("CacheInvalidations", "a-key", true);
anotherPubsub.Publish(new InvalidationMessage { CacheKey = "a-key" }, true);
Assert.Equal(0, anotherSut.Get<int>("a-key"));
}

66
tests/Squidex.Infrastructure.Tests/InMemoryPubSubTests.cs

@ -6,7 +6,9 @@
// All rights reserved.
// ==========================================================================
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using Xunit;
namespace Squidex.Infrastructure
@ -15,36 +17,78 @@ namespace Squidex.Infrastructure
{
private readonly InMemoryPubSub sut = new InMemoryPubSub();
private sealed class MessageA
{
public string Text { get; set; }
}
private sealed class MessageB
{
public string Text { get; set; }
}
[Fact]
public void Should_publish_to_handlers()
{
var channel1Events = new List<string>();
var channel2Events = new List<string>();
sut.Subscribe("channel1", x =>
sut.Subscribe<MessageA>(m =>
{
channel1Events.Add(x);
channel1Events.Add(m.Text);
});
sut.Subscribe("channel1", x =>
sut.Subscribe<MessageA>(m =>
{
channel1Events.Add(x);
channel1Events.Add(m.Text);
});
sut.Subscribe("channel2", x =>
sut.Subscribe<MessageB>(m =>
{
channel2Events.Add(x);
channel2Events.Add(m.Text);
});
sut.Publish("channel1", "1", true);
sut.Publish("channel1", "2", true);
sut.Publish("channel1", "3", false);
sut.Publish(new MessageA { Text = "1" }, true);
sut.Publish(new MessageA { Text = "2" }, true);
sut.Publish(new MessageA { Text = "3" }, false);
sut.Publish("channel2", "a", true);
sut.Publish("channel2", "b", true);
sut.Publish(new MessageB { Text = "a" }, true);
sut.Publish(new MessageB { Text = "b" }, true);
Assert.Equal(new[] { "1", "1", "2", "2" }, channel1Events.ToArray());
Assert.Equal(new[] { "a", "b" }, channel2Events.ToArray());
}
[Fact]
public async Task Should_make_request_reply_requests()
{
sut.ReceiveAsync<int, int>(x =>
{
return Task.FromResult(x + x);
}, true);
var response = await sut.RequestAsync<int, int>(2, TimeSpan.FromSeconds(2), true);
Assert.Equal(4, response);
}
[Fact]
public async Task Should_timeout_when_response_is_too_slow()
{
sut.ReceiveAsync<int, int>(async x =>
{
await Task.Delay(1000);
return x + x;
}, true);
await Assert.ThrowsAsync<TaskCanceledException>(() => sut.RequestAsync<int, int>(1, TimeSpan.FromSeconds(0.5), true));
}
[Fact]
public async Task Should_timeout_when_nobody_responds()
{
await Assert.ThrowsAsync<TaskCanceledException>(() => sut.RequestAsync<int, int>(2, TimeSpan.FromSeconds(0.5), true));
}
}
}

108
tests/Squidex.Infrastructure.Tests/Json/Orleans/JsonExternalSerializerTests.cs

@ -1,108 +0,0 @@
// ==========================================================================
// JsonExternalSerializerTests.cs
// Squidex Headless CMS
// ==========================================================================
// Copyright (c) Squidex Group
// All rights reserved.
// ==========================================================================
using System;
using System.Collections.Generic;
using FakeItEasy;
using Newtonsoft.Json;
using Orleans.Serialization;
using Xunit;
namespace Squidex.Infrastructure.Json.Orleans
{
public class JsonExternalSerializerTests
{
private readonly JsonExternalSerializer sut = new JsonExternalSerializer(JsonSerializer.CreateDefault());
[Fact]
public void Should_serialize_js_only()
{
Assert.True(sut.IsSupportedType(typeof(J<int>)));
Assert.True(sut.IsSupportedType(typeof(J<List<int>>)));
Assert.False(sut.IsSupportedType(typeof(int)));
Assert.False(sut.IsSupportedType(typeof(List<int>)));
}
[Fact]
public void Should_copy_null()
{
var v = (string)null;
var c = DeepCopy(v);
Assert.Null(c);
}
[Fact]
public void Should_copy_null_json()
{
var v = new J<List<int>>(null);
var c = DeepCopy(v);
Assert.Null(c.Value);
}
[Fact]
public void Should_not_copy_immutable_values()
{
var v = new List<int> { 1, 2, 3 }.AsJ(true);
var c = DeepCopy(v);
Assert.Same(v.Value, c.Value);
}
[Fact]
public void Should_copy_non_immutable_values()
{
var value = new J<List<int>>(new List<int> { 1, 2, 3 });
var copy = (J<List<int>>)sut.DeepCopy(value, null);
Assert.Equal(value.Value, copy.Value);
Assert.NotSame(value.Value, copy.Value);
}
[Fact]
public void Should_serialize_and_deserialize_value()
{
var value = new J<List<int>>(new List<int> { 1, 2, 3 });
var writtenLength = 0;
var writtenBuffer = (byte[])null;
var writer = A.Fake<IBinaryTokenStreamWriter>();
var writerContext = new SerializationContext(null) { StreamWriter = writer };
A.CallTo(() => writer.Write(A<int>.Ignored))
.Invokes(new Action<int>(x => writtenLength = x));
A.CallTo(() => writer.Write(A<byte[]>.Ignored))
.Invokes(new Action<byte[]>(x => writtenBuffer = x));
sut.Serialize(value, writerContext, value.GetType());
var reader = A.Fake<IBinaryTokenStreamReader>();
var readerContext = new DeserializationContext(null) { StreamReader = reader };
A.CallTo(() => reader.ReadInt())
.Returns(writtenLength);
A.CallTo(() => reader.ReadBytes(writtenLength))
.Returns(writtenBuffer);
var copy = (J<List<int>>)sut.Deserialize(value.GetType(), readerContext);
Assert.Equal(value.Value, copy.Value);
Assert.NotSame(value.Value, copy.Value);
}
private T DeepCopy<T>(T value)
{
return (T)sut.DeepCopy(value, null);
}
}
}
Loading…
Cancel
Save