Browse Source

Polling fixes

pull/131/head
Sebastian Stehle 9 years ago
parent
commit
61cd0806bf
  1. 2
      src/Squidex.Infrastructure.GetEventStore/CQRS/Events/GetEventStoreSubscription.cs
  2. 2
      src/Squidex.Infrastructure.MongoDb/CQRS/Events/PollingSubscription.cs
  3. 80
      src/Squidex.Infrastructure/Actors/DefaultRemoteActorChannel.cs
  4. 7
      src/Squidex.Infrastructure/Actors/IActors.cs
  5. 12
      src/Squidex.Infrastructure/Actors/IRemoteActorChannel.cs
  6. 67
      src/Squidex.Infrastructure/Actors/RemoteActors.cs
  7. 6
      src/Squidex.Infrastructure/CQRS/Events/Actors/EventConsumerActor.cs
  8. 2
      src/Squidex.Infrastructure/CQRS/Events/Actors/Messages/ReceiveEventMessage.cs

2
src/Squidex.Infrastructure.GetEventStore/CQRS/Events/GetEventStoreSubscription.cs

@ -127,7 +127,7 @@ namespace Squidex.Infrastructure.CQRS.Events
{
var storedEvent = Formatter.Read(receiveEvent.Event);
await parent.SendAsync(new ReceiveEventMessage { Event = storedEvent });
await parent.SendAsync(new ReceiveEventMessage { Event = storedEvent, Source = this });
position = receiveEvent.Event.OriginalEventNumber;
}

2
src/Squidex.Infrastructure.MongoDb/CQRS/Events/PollingSubscription.cs

@ -116,7 +116,7 @@ namespace Squidex.Infrastructure.CQRS.Events
{
if (receiveEvent.Subscription == subscription)
{
await parent.SendAsync(new ReceiveEventMessage { Event = receiveEvent.Event });
await parent.SendAsync(new ReceiveEventMessage { Event = receiveEvent.Event, Source = this });
position = receiveEvent.Event.EventPosition;
}

80
src/Squidex.Infrastructure/Actors/DefaultRemoteActorChannel.cs

@ -0,0 +1,80 @@
using System;
using System.Threading.Tasks;
using Newtonsoft.Json;
using Newtonsoft.Json.Linq;
using Squidex.Infrastructure.Tasks;
namespace Squidex.Infrastructure.Actors
{
public sealed class DefaultRemoteActorChannel : IRemoteActorChannel
{
private static readonly string ChannelName = typeof(DefaultRemoteActorChannel).Name;
private readonly IPubSub pubSub;
private readonly JsonSerializer serializer;
private readonly TypeNameRegistry typeNameRegistry;
private sealed class Envelope
{
public string Recipient { get; set; }
public string PayloadType { get; set; }
public JToken Payload { get; set; }
}
public DefaultRemoteActorChannel(IPubSub pubSub, TypeNameRegistry typeNameRegistry, JsonSerializerSettings serializerSettings = null)
{
Guard.NotNull(pubSub, nameof(pubSub));
Guard.NotNull(typeNameRegistry, nameof(typeNameRegistry));
this.pubSub = pubSub;
this.typeNameRegistry = typeNameRegistry;
serializer = JsonSerializer.Create(serializerSettings ?? new JsonSerializerSettings());
}
public Task SendAsync(string recipient, IMessage message)
{
Guard.NotNullOrEmpty(recipient, nameof(recipient));
Guard.NotNull(message, nameof(message));
var messageType = typeNameRegistry.GetName(message.GetType());
var messagePayload = WriteJson(message);
var envelope = new Envelope { Recipient = recipient, Payload = messagePayload, PayloadType = messageType };
pubSub.Publish(ChannelName, JsonConvert.SerializeObject(envelope), true);
return TaskHelper.Done;
}
public void Subscribe(string recipient, Action<IMessage> handler)
{
Guard.NotNullOrEmpty(recipient, nameof(recipient));
pubSub.Subscribe(ChannelName, json =>
{
var envelope = JsonConvert.DeserializeObject<Envelope>(json);
if (string.Equals(envelope.Recipient, recipient, StringComparison.OrdinalIgnoreCase))
{
var messageType = typeNameRegistry.GetType(envelope.PayloadType);
var messagePayload = ReadJson<IMessage>(envelope.Payload, messageType);
handler?.Invoke(messagePayload);
}
});
}
private T ReadJson<T>(JToken token, Type type = null)
{
return (T)token.ToObject(type ?? typeof(T), serializer);
}
private JToken WriteJson(object value)
{
return JToken.FromObject(value, serializer);
}
}
}

7
src/Squidex.Infrastructure/Actors/IActors.cs

@ -0,0 +1,7 @@
namespace Squidex.Infrastructure.Actors
{
public interface IActors
{
IActor Get(string id);
}
}

12
src/Squidex.Infrastructure/Actors/IRemoteActorChannel.cs

@ -0,0 +1,12 @@
using System;
using System.Threading.Tasks;
namespace Squidex.Infrastructure.Actors
{
public interface IRemoteActorChannel
{
Task SendAsync(string recipient, IMessage message);
void Subscribe(string recipient, Action<IMessage> handler);
}
}

67
src/Squidex.Infrastructure/Actors/RemoteActors.cs

@ -0,0 +1,67 @@
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;
using Squidex.Infrastructure.Tasks;
namespace Squidex.Infrastructure.Actors
{
public sealed class RemoteActors : IActors
{
private readonly ConcurrentDictionary<string, IActor> senders = new ConcurrentDictionary<string, IActor>();
private readonly ConcurrentDictionary<string, bool> receivers = new ConcurrentDictionary<string, bool>();
private readonly IRemoteActorChannel channel;
private sealed class Sender : IActor
{
private readonly IRemoteActorChannel channel;
private readonly string recipient;
public Sender(IRemoteActorChannel channel, string recipient)
{
this.channel = channel;
this.recipient = recipient;
}
public Task SendAsync(IMessage message)
{
return channel.SendAsync(recipient, message);
}
public Task SendAsync(Exception exception)
{
throw new NotSupportedException();
}
public Task StopAsync()
{
throw new NotSupportedException();
}
}
public RemoteActors(IRemoteActorChannel channel)
{
Guard.NotNull(channel, nameof(channel));
this.channel = channel;
}
public IActor Get(string id)
{
Guard.NotNullOrEmpty(id, nameof(id));
return senders.GetOrAdd(id, k => new Sender(channel, id));
}
public void Connect(string id, IActor actor)
{
Guard.NotNullOrEmpty(id, nameof(id));
Guard.NotNull(actor, nameof(actor));
channel.Subscribe(id, message =>
{
actor.SendAsync(message).Forget();
});
}
}
}

6
src/Squidex.Infrastructure/CQRS/Events/Actors/EventConsumerActor.cs

@ -92,7 +92,11 @@ namespace Squidex.Infrastructure.CQRS.Events.Actors
case ReceiveEventMessage receiveEvent:
{
await DispatchConsumerAsync(ParseEvent(receiveEvent.Event));
if (receiveEvent.Source == eventSubscription)
{
await DispatchConsumerAsync(ParseEvent(receiveEvent.Event));
await eventConsumerInfoRepository.SetPositionAsync(eventConsumer.Name, receiveEvent.Event.EventPosition, false);
}
break;
}

2
src/Squidex.Infrastructure/CQRS/Events/Actors/Messages/ReceiveEventMessage.cs

@ -6,5 +6,7 @@ namespace Squidex.Infrastructure.CQRS.Events.Actors.Messages
public sealed class ReceiveEventMessage : IMessage
{
public StoredEvent Event { get; set; }
public IEventSubscription Source { get; set; }
}
}

Loading…
Cancel
Save