From 61cd0806bfadc015e272002779d6814925795d6f Mon Sep 17 00:00:00 2001 From: Sebastian Stehle Date: Sun, 24 Sep 2017 14:23:14 +0200 Subject: [PATCH] Polling fixes --- .../CQRS/Events/GetEventStoreSubscription.cs | 2 +- .../CQRS/Events/PollingSubscription.cs | 2 +- .../Actors/DefaultRemoteActorChannel.cs | 80 +++++++++++++++++++ src/Squidex.Infrastructure/Actors/IActors.cs | 7 ++ .../Actors/IRemoteActorChannel.cs | 12 +++ .../Actors/RemoteActors.cs | 67 ++++++++++++++++ .../CQRS/Events/Actors/EventConsumerActor.cs | 6 +- .../Actors/Messages/ReceiveEventMessage.cs | 2 + 8 files changed, 175 insertions(+), 3 deletions(-) create mode 100644 src/Squidex.Infrastructure/Actors/DefaultRemoteActorChannel.cs create mode 100644 src/Squidex.Infrastructure/Actors/IActors.cs create mode 100644 src/Squidex.Infrastructure/Actors/IRemoteActorChannel.cs create mode 100644 src/Squidex.Infrastructure/Actors/RemoteActors.cs diff --git a/src/Squidex.Infrastructure.GetEventStore/CQRS/Events/GetEventStoreSubscription.cs b/src/Squidex.Infrastructure.GetEventStore/CQRS/Events/GetEventStoreSubscription.cs index 3651109a6..77b8782f6 100644 --- a/src/Squidex.Infrastructure.GetEventStore/CQRS/Events/GetEventStoreSubscription.cs +++ b/src/Squidex.Infrastructure.GetEventStore/CQRS/Events/GetEventStoreSubscription.cs @@ -127,7 +127,7 @@ namespace Squidex.Infrastructure.CQRS.Events { var storedEvent = Formatter.Read(receiveEvent.Event); - await parent.SendAsync(new ReceiveEventMessage { Event = storedEvent }); + await parent.SendAsync(new ReceiveEventMessage { Event = storedEvent, Source = this }); position = receiveEvent.Event.OriginalEventNumber; } diff --git a/src/Squidex.Infrastructure.MongoDb/CQRS/Events/PollingSubscription.cs b/src/Squidex.Infrastructure.MongoDb/CQRS/Events/PollingSubscription.cs index 4d23b8a1b..9b46a5c72 100644 --- a/src/Squidex.Infrastructure.MongoDb/CQRS/Events/PollingSubscription.cs +++ b/src/Squidex.Infrastructure.MongoDb/CQRS/Events/PollingSubscription.cs @@ -116,7 +116,7 @@ namespace Squidex.Infrastructure.CQRS.Events { if (receiveEvent.Subscription == subscription) { - await parent.SendAsync(new ReceiveEventMessage { Event = receiveEvent.Event }); + await parent.SendAsync(new ReceiveEventMessage { Event = receiveEvent.Event, Source = this }); position = receiveEvent.Event.EventPosition; } diff --git a/src/Squidex.Infrastructure/Actors/DefaultRemoteActorChannel.cs b/src/Squidex.Infrastructure/Actors/DefaultRemoteActorChannel.cs new file mode 100644 index 000000000..aa7610ef5 --- /dev/null +++ b/src/Squidex.Infrastructure/Actors/DefaultRemoteActorChannel.cs @@ -0,0 +1,80 @@ +using System; +using System.Threading.Tasks; +using Newtonsoft.Json; +using Newtonsoft.Json.Linq; +using Squidex.Infrastructure.Tasks; + +namespace Squidex.Infrastructure.Actors +{ + public sealed class DefaultRemoteActorChannel : IRemoteActorChannel + { + private static readonly string ChannelName = typeof(DefaultRemoteActorChannel).Name; + private readonly IPubSub pubSub; + private readonly JsonSerializer serializer; + private readonly TypeNameRegistry typeNameRegistry; + + private sealed class Envelope + { + public string Recipient { get; set; } + + public string PayloadType { get; set; } + + public JToken Payload { get; set; } + } + + public DefaultRemoteActorChannel(IPubSub pubSub, TypeNameRegistry typeNameRegistry, JsonSerializerSettings serializerSettings = null) + { + Guard.NotNull(pubSub, nameof(pubSub)); + Guard.NotNull(typeNameRegistry, nameof(typeNameRegistry)); + + this.pubSub = pubSub; + + this.typeNameRegistry = typeNameRegistry; + + serializer = JsonSerializer.Create(serializerSettings ?? new JsonSerializerSettings()); + } + + public Task SendAsync(string recipient, IMessage message) + { + Guard.NotNullOrEmpty(recipient, nameof(recipient)); + Guard.NotNull(message, nameof(message)); + + var messageType = typeNameRegistry.GetName(message.GetType()); + var messagePayload = WriteJson(message); + + var envelope = new Envelope { Recipient = recipient, Payload = messagePayload, PayloadType = messageType }; + + pubSub.Publish(ChannelName, JsonConvert.SerializeObject(envelope), true); + + return TaskHelper.Done; + } + + public void Subscribe(string recipient, Action handler) + { + Guard.NotNullOrEmpty(recipient, nameof(recipient)); + + pubSub.Subscribe(ChannelName, json => + { + var envelope = JsonConvert.DeserializeObject(json); + + if (string.Equals(envelope.Recipient, recipient, StringComparison.OrdinalIgnoreCase)) + { + var messageType = typeNameRegistry.GetType(envelope.PayloadType); + var messagePayload = ReadJson(envelope.Payload, messageType); + + handler?.Invoke(messagePayload); + } + }); + } + + private T ReadJson(JToken token, Type type = null) + { + return (T)token.ToObject(type ?? typeof(T), serializer); + } + + private JToken WriteJson(object value) + { + return JToken.FromObject(value, serializer); + } + } +} diff --git a/src/Squidex.Infrastructure/Actors/IActors.cs b/src/Squidex.Infrastructure/Actors/IActors.cs new file mode 100644 index 000000000..f2d898ff4 --- /dev/null +++ b/src/Squidex.Infrastructure/Actors/IActors.cs @@ -0,0 +1,7 @@ +namespace Squidex.Infrastructure.Actors +{ + public interface IActors + { + IActor Get(string id); + } +} \ No newline at end of file diff --git a/src/Squidex.Infrastructure/Actors/IRemoteActorChannel.cs b/src/Squidex.Infrastructure/Actors/IRemoteActorChannel.cs new file mode 100644 index 000000000..f3c8ace3e --- /dev/null +++ b/src/Squidex.Infrastructure/Actors/IRemoteActorChannel.cs @@ -0,0 +1,12 @@ +using System; +using System.Threading.Tasks; + +namespace Squidex.Infrastructure.Actors +{ + public interface IRemoteActorChannel + { + Task SendAsync(string recipient, IMessage message); + + void Subscribe(string recipient, Action handler); + } +} \ No newline at end of file diff --git a/src/Squidex.Infrastructure/Actors/RemoteActors.cs b/src/Squidex.Infrastructure/Actors/RemoteActors.cs new file mode 100644 index 000000000..a4073687a --- /dev/null +++ b/src/Squidex.Infrastructure/Actors/RemoteActors.cs @@ -0,0 +1,67 @@ +using System; +using System.Collections.Concurrent; +using System.Threading.Tasks; +using Squidex.Infrastructure.Tasks; + +namespace Squidex.Infrastructure.Actors +{ + public sealed class RemoteActors : IActors + { + private readonly ConcurrentDictionary senders = new ConcurrentDictionary(); + private readonly ConcurrentDictionary receivers = new ConcurrentDictionary(); + private readonly IRemoteActorChannel channel; + + private sealed class Sender : IActor + { + private readonly IRemoteActorChannel channel; + private readonly string recipient; + + public Sender(IRemoteActorChannel channel, string recipient) + { + this.channel = channel; + + this.recipient = recipient; + } + + public Task SendAsync(IMessage message) + { + return channel.SendAsync(recipient, message); + } + + public Task SendAsync(Exception exception) + { + throw new NotSupportedException(); + } + + public Task StopAsync() + { + throw new NotSupportedException(); + } + } + + public RemoteActors(IRemoteActorChannel channel) + { + Guard.NotNull(channel, nameof(channel)); + + this.channel = channel; + } + + public IActor Get(string id) + { + Guard.NotNullOrEmpty(id, nameof(id)); + + return senders.GetOrAdd(id, k => new Sender(channel, id)); + } + + public void Connect(string id, IActor actor) + { + Guard.NotNullOrEmpty(id, nameof(id)); + Guard.NotNull(actor, nameof(actor)); + + channel.Subscribe(id, message => + { + actor.SendAsync(message).Forget(); + }); + } + } +} diff --git a/src/Squidex.Infrastructure/CQRS/Events/Actors/EventConsumerActor.cs b/src/Squidex.Infrastructure/CQRS/Events/Actors/EventConsumerActor.cs index 62f7ed754..f92da6df8 100644 --- a/src/Squidex.Infrastructure/CQRS/Events/Actors/EventConsumerActor.cs +++ b/src/Squidex.Infrastructure/CQRS/Events/Actors/EventConsumerActor.cs @@ -92,7 +92,11 @@ namespace Squidex.Infrastructure.CQRS.Events.Actors case ReceiveEventMessage receiveEvent: { - await DispatchConsumerAsync(ParseEvent(receiveEvent.Event)); + if (receiveEvent.Source == eventSubscription) + { + await DispatchConsumerAsync(ParseEvent(receiveEvent.Event)); + await eventConsumerInfoRepository.SetPositionAsync(eventConsumer.Name, receiveEvent.Event.EventPosition, false); + } break; } diff --git a/src/Squidex.Infrastructure/CQRS/Events/Actors/Messages/ReceiveEventMessage.cs b/src/Squidex.Infrastructure/CQRS/Events/Actors/Messages/ReceiveEventMessage.cs index 53289ddc2..bbb6d3a26 100644 --- a/src/Squidex.Infrastructure/CQRS/Events/Actors/Messages/ReceiveEventMessage.cs +++ b/src/Squidex.Infrastructure/CQRS/Events/Actors/Messages/ReceiveEventMessage.cs @@ -6,5 +6,7 @@ namespace Squidex.Infrastructure.CQRS.Events.Actors.Messages public sealed class ReceiveEventMessage : IMessage { public StoredEvent Event { get; set; } + + public IEventSubscription Source { get; set; } } }