Browse Source

Started to migrate.

pull/249/head
Sebastian Stehle 8 years ago
parent
commit
c8511ed04a
  1. 3
      src/Squidex.Domain.Apps.Core.Operations/HandleRules/Actions/FastlyActionHandler.cs
  2. 1
      src/Squidex.Domain.Apps.Entities/Apps/AppCommandMiddleware.cs
  3. 4
      src/Squidex.Infrastructure.GetEventStore/EventSourcing/GetEventStoreSubscription.cs
  4. 2
      src/Squidex.Infrastructure.MongoDb/EventSourcing/MongoEventStore.cs
  5. 40
      src/Squidex.Infrastructure/EventSourcing/DefaultEventNotifier.cs
  6. 34
      src/Squidex.Infrastructure/EventSourcing/Grains/EventConsumerBootstrap.cs
  7. 127
      src/Squidex.Infrastructure/EventSourcing/Grains/EventConsumerGrain.cs
  8. 90
      src/Squidex.Infrastructure/EventSourcing/Grains/EventConsumerGrainManager.cs
  9. 107
      src/Squidex.Infrastructure/EventSourcing/Grains/EventConsumerManagerGrain.cs
  10. 33
      src/Squidex.Infrastructure/EventSourcing/Grains/IEventConsumerGrain.cs
  11. 29
      src/Squidex.Infrastructure/EventSourcing/Grains/IEventConsumerManagerGrain.cs
  12. 13
      src/Squidex.Infrastructure/EventSourcing/Grains/Messages/GetStatesRequest.cs
  13. 14
      src/Squidex.Infrastructure/EventSourcing/Grains/Messages/GetStatesResponse.cs
  14. 14
      src/Squidex.Infrastructure/EventSourcing/Grains/Messages/ResetConsumerMessage.cs
  15. 14
      src/Squidex.Infrastructure/EventSourcing/Grains/Messages/StartConsumerMessage.cs
  16. 14
      src/Squidex.Infrastructure/EventSourcing/Grains/Messages/StopConsumerMessage.cs
  17. 34
      src/Squidex.Infrastructure/EventSourcing/Grains/OrleansEventNotifier.cs
  18. 42
      src/Squidex.Infrastructure/EventSourcing/Grains/WrapperSubscription.cs
  19. 2
      src/Squidex.Infrastructure/EventSourcing/IEventSubscription.cs
  20. 14
      src/Squidex.Infrastructure/EventSourcing/PollingSubscription.cs
  21. 5
      src/Squidex.Infrastructure/EventSourcing/RetrySubscription.cs
  22. 3
      src/Squidex.Infrastructure/Squidex.Infrastructure.csproj
  23. 33
      src/Squidex.Infrastructure/TypeNameRegistry.cs
  24. 27
      src/Squidex/Areas/Api/Controllers/EventConsumers/EventConsumersController.cs
  25. 1
      src/Squidex/Config/Authentication/MicrosoftHandler.cs
  26. 6
      src/Squidex/Config/Domain/ReadServices.cs
  27. 41
      src/Squidex/Config/Orleans/ClientServices.cs
  28. 34
      src/Squidex/Config/Orleans/SiloExtensions.cs
  29. 57
      src/Squidex/Config/Orleans/SiloServices.cs
  30. 44
      src/Squidex/Program.cs
  31. 3
      src/Squidex/Squidex.csproj
  32. 2
      src/Squidex/WebStartup.cs
  33. 5
      tests/Squidex.Infrastructure.Tests/EventSourcing/Grains/EventConsumerManagerTests.cs

3
src/Squidex.Domain.Apps.Core.Operations/HandleRules/Actions/FastlyActionHandler.cs

@ -8,14 +8,11 @@
using System;
using System.Collections.Generic;
using System.Net.Http;
using System.Text;
using System.Threading.Tasks;
using Newtonsoft.Json;
using Newtonsoft.Json.Linq;
using Squidex.Domain.Apps.Core.Rules;
using Squidex.Domain.Apps.Core.Rules.Actions;
using Squidex.Domain.Apps.Events;
using Squidex.Infrastructure;
using Squidex.Infrastructure.EventSourcing;
using Squidex.Infrastructure.Http;

1
src/Squidex.Domain.Apps.Entities/Apps/AppCommandMiddleware.cs

@ -6,7 +6,6 @@
// ==========================================================================
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using Squidex.Domain.Apps.Entities.Apps.Commands;
using Squidex.Domain.Apps.Entities.Apps.Guards;

4
src/Squidex.Infrastructure.GetEventStore/EventSourcing/GetEventStoreSubscription.cs

@ -46,6 +46,10 @@ namespace Squidex.Infrastructure.EventSourcing
return TaskHelper.Done;
}
public void WakeUp()
{
}
private EventStoreCatchUpSubscription SubscribeToStream(string streamName)
{
var settings = CatchUpSubscriptionSettings.Default;

2
src/Squidex.Infrastructure.MongoDb/EventSourcing/MongoEventStore.cs

@ -56,7 +56,7 @@ namespace Squidex.Infrastructure.EventSourcing
Guard.NotNull(subscriber, nameof(subscriber));
Guard.NotNullOrEmpty(streamFilter, nameof(streamFilter));
return new PollingSubscription(this, notifier, subscriber, streamFilter, position);
return new PollingSubscription(this, subscriber, streamFilter, position);
}
public async Task<IReadOnlyList<StoredEvent>> GetEventsAsync(string streamName, long streamPosition = 0)

40
src/Squidex.Infrastructure/EventSourcing/DefaultEventNotifier.cs

@ -1,40 +0,0 @@
// ==========================================================================
// Squidex Headless CMS
// ==========================================================================
// Copyright (c) Squidex UG (haftungsbeschränkt)
// All rights reserved. Licensed under the MIT license.
// ==========================================================================
using System;
namespace Squidex.Infrastructure.EventSourcing
{
public sealed class DefaultEventNotifier : IEventNotifier
{
private static readonly string ChannelName = typeof(DefaultEventNotifier).Name;
private readonly IPubSub pubsub;
public sealed class EventNotification
{
public string StreamName { get; set; }
}
public DefaultEventNotifier(IPubSub pubsub)
{
Guard.NotNull(pubsub, nameof(pubsub));
this.pubsub = pubsub;
}
public void NotifyEventsStored(string streamName)
{
pubsub.Publish(new EventNotification { StreamName = streamName }, true);
}
public IDisposable Subscribe(Action<string> handler)
{
return pubsub.Subscribe<EventNotification>(x => handler?.Invoke(x.StreamName));
}
}
}

34
src/Squidex.Infrastructure/EventSourcing/Grains/EventConsumerBootstrap.cs

@ -0,0 +1,34 @@
// ==========================================================================
// Squidex Headless CMS
// ==========================================================================
// Copyright (c) Squidex UG (haftungsbeschränkt)
// All rights reserved. Licensed under the MIT license.
// ==========================================================================
using Orleans;
using Orleans.Runtime;
namespace Squidex.Infrastructure.EventSourcing.Grains
{
public sealed class EventConsumerBootstrap : ILifecycleParticipant<ISiloLifecycle>
{
private readonly IGrainFactory grainFactory;
public EventConsumerBootstrap(IGrainFactory grainFactory)
{
Guard.NotNull(grainFactory, nameof(grainFactory));
this.grainFactory = grainFactory;
}
public void Participate(ISiloLifecycle lifecycle)
{
lifecycle.Subscribe(SiloLifecycleStage.SiloActive, ct =>
{
var grain = grainFactory.GetGrain<IEventConsumerManagerGrain>("Default");
return grain.ActivateAsync();
});
}
}
}

127
src/Squidex.Infrastructure/EventSourcing/Grains/EventConsumerGrain.cs

@ -8,133 +8,134 @@
using System;
using System.Runtime.CompilerServices;
using System.Threading.Tasks;
using Orleans;
using Orleans.Concurrency;
using Orleans.Core;
using Orleans.Runtime;
using Squidex.Infrastructure.Log;
using Squidex.Infrastructure.States;
using Squidex.Infrastructure.Tasks;
namespace Squidex.Infrastructure.EventSourcing.Grains
{
public class EventConsumerGrain : DisposableObjectBase, IStatefulObject<string>, IEventSubscriber
public class EventConsumerGrain : Grain, IEventConsumerGrain
{
private readonly EventConsumerFactory eventConsumerFactory;
private readonly IEventDataFormatter eventDataFormatter;
private readonly IEventStore eventStore;
private readonly ISemanticLog log;
private readonly SingleThreadedDispatcher dispatcher = new SingleThreadedDispatcher(1);
private readonly IPersistence<EventConsumerState> persistence;
private TaskScheduler scheduler;
private IEventSubscription currentSubscription;
private IEventConsumer eventConsumer;
private IPersistence<EventConsumerState> persistence;
private EventConsumerState state = new EventConsumerState();
public EventConsumerGrain(
EventConsumerFactory eventConsumerFactory,
IStore<string> store,
IEventStore eventStore,
IEventDataFormatter eventDataFormatter,
ISemanticLog log)
: this (eventConsumerFactory, store, eventStore, eventDataFormatter, null, null, log)
{
}
protected EventConsumerGrain(
EventConsumerFactory eventConsumerFactory,
IStore<string> store,
IEventStore eventStore,
IEventDataFormatter eventDataFormatter,
IGrainIdentity identity,
IGrainRuntime runtime,
ISemanticLog log)
{
Guard.NotNull(log, nameof(log));
Guard.NotNull(store, nameof(store));
Guard.NotNull(eventStore, nameof(eventStore));
Guard.NotNull(eventDataFormatter, nameof(eventDataFormatter));
Guard.NotNull(eventConsumerFactory, nameof(eventConsumerFactory));
this.log = log;
this.eventStore = eventStore;
this.eventDataFormatter = eventDataFormatter;
}
this.eventConsumerFactory = eventConsumerFactory;
protected override void DisposeObject(bool disposing)
{
if (disposing)
{
dispatcher.StopAndWaitAsync().Wait();
}
persistence = store.WithSnapshots<EventConsumerState, string>(this.GetPrimaryKeyString(), s => state = s);
}
public Task ActivateAsync(string key, IStore<string> store)
public override Task OnActivateAsync()
{
persistence = store.WithSnapshots<EventConsumerState, string>(key, s => state = s);
scheduler = TaskScheduler.Current;
eventConsumer = eventConsumerFactory(this.GetPrimaryKeyString());
return persistence.ReadAsync();
}
protected virtual IEventSubscription CreateSubscription(IEventStore eventStore, string streamFilter, string position)
{
return new RetrySubscription(eventStore, this, streamFilter, position);
}
public virtual EventConsumerInfo GetState()
{
return state.ToInfo(this.eventConsumer.Name);
return new RetrySubscription(eventStore, new WrapperSubscription(this.AsReference<IEventConsumerGrain>(), scheduler), streamFilter, position);
}
public virtual void Stop()
public virtual Task<Immutable<EventConsumerInfo>> GetStateAsync()
{
dispatcher.DispatchAsync(HandleStopAsync).Forget();
return Task.FromResult(state.ToInfo(this.eventConsumer.Name).AsImmutable());
}
public virtual void Start()
public Task OnEventAsync(Immutable<IEventSubscription> subscription, Immutable<StoredEvent> storedEvent)
{
dispatcher.DispatchAsync(HandleStartAsync).Forget();
}
public virtual void Reset()
if (subscription.Value != currentSubscription)
{
dispatcher.DispatchAsync(HandleResetAsync).Forget();
}
public virtual void Activate(IEventConsumer eventConsumer)
{
Guard.NotNull(eventConsumer, nameof(eventConsumer));
dispatcher.DispatchAsync(() => HandleSetupAsync(eventConsumer)).Forget();
return TaskHelper.Done;
}
private Task HandleSetupAsync(IEventConsumer consumer)
return DoAndUpdateStateAsync(async () =>
{
eventConsumer = consumer;
var @event = ParseKnownEvent(storedEvent.Value);
if (!state.IsStopped)
if (@event != null)
{
Subscribe(state.Position);
await DispatchConsumerAsync(@event);
}
return TaskHelper.Done;
state = state.Handled(storedEvent.Value.EventPosition);
});
}
private Task HandleEventAsync(IEventSubscription subscription, StoredEvent storedEvent)
public Task OnErrorAsync(Immutable<IEventSubscription> subscription, Immutable<Exception> exception)
{
if (subscription != currentSubscription)
if (subscription.Value != currentSubscription)
{
return TaskHelper.Done;
}
return DoAndUpdateStateAsync(async () =>
{
var @event = ParseKnownEvent(storedEvent);
if (@event != null)
return DoAndUpdateStateAsync(() =>
{
await DispatchConsumerAsync(@event);
}
Unsubscribe();
state = state.Handled(storedEvent.EventPosition);
state = state.Failed(exception.Value);
});
}
private Task HandleErrorAsync(IEventSubscription subscription, Exception exception)
{
if (subscription != currentSubscription)
public Task WakeUpAsync()
{
currentSubscription?.WakeUp();
return TaskHelper.Done;
}
return DoAndUpdateStateAsync(() =>
public Task ActivateAsync()
{
Unsubscribe();
if (!state.IsStopped)
{
Subscribe(state.Position);
}
state = state.Failed(exception);
});
return TaskHelper.Done;
}
private Task HandleStartAsync()
public Task StartAsync()
{
if (!state.IsStopped)
{
@ -149,7 +150,7 @@ namespace Squidex.Infrastructure.EventSourcing.Grains
});
}
private Task HandleStopAsync()
public Task StopAsync()
{
if (state.IsStopped)
{
@ -164,7 +165,7 @@ namespace Squidex.Infrastructure.EventSourcing.Grains
});
}
private Task HandleResetAsync()
public Task ResetAsync()
{
return DoAndUpdateStateAsync(async () =>
{
@ -178,16 +179,6 @@ namespace Squidex.Infrastructure.EventSourcing.Grains
});
}
Task IEventSubscriber.OnEventAsync(IEventSubscription subscription, StoredEvent storedEvent)
{
return dispatcher.DispatchAsync(() => HandleEventAsync(subscription, storedEvent));
}
Task IEventSubscriber.OnErrorAsync(IEventSubscription subscription, Exception exception)
{
return dispatcher.DispatchAsync(() => HandleErrorAsync(subscription, exception));
}
private Task DoAndUpdateStateAsync(Action action, [CallerMemberName] string caller = null)
{
return DoAndUpdateStateAsync(() => { action(); return TaskHelper.Done; }, caller);

90
src/Squidex.Infrastructure/EventSourcing/Grains/EventConsumerGrainManager.cs

@ -1,90 +0,0 @@
// ==========================================================================
// Squidex Headless CMS
// ==========================================================================
// Copyright (c) Squidex UG (haftungsbeschränkt)
// All rights reserved. Licensed under the MIT license.
// ==========================================================================
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using Squidex.Infrastructure.EventSourcing.Grains.Messages;
using Squidex.Infrastructure.States;
namespace Squidex.Infrastructure.EventSourcing.Grains
{
public sealed class EventConsumerGrainManager : DisposableObjectBase, IRunnable
{
private readonly IStateFactory factory;
private readonly IPubSub pubSub;
private readonly List<IEventConsumer> consumers;
private readonly List<IDisposable> subscriptions = new List<IDisposable>();
public EventConsumerGrainManager(IEnumerable<IEventConsumer> consumers, IPubSub pubSub, IStateFactory factory)
{
Guard.NotNull(pubSub, nameof(pubSub));
Guard.NotNull(factory, nameof(factory));
Guard.NotNull(consumers, nameof(consumers));
this.pubSub = pubSub;
this.factory = factory;
this.consumers = consumers.ToList();
}
public void Run()
{
var actors = new Dictionary<string, EventConsumerGrain>();
foreach (var consumer in consumers)
{
var actor = factory.CreateAsync<EventConsumerGrain>(consumer.Name).Result;
actors[consumer.Name] = actor;
actor.Activate(consumer);
}
subscriptions.Add(pubSub.Subscribe<StartConsumerMessage>(m =>
{
if (actors.TryGetValue(m.ConsumerName, out var actor))
{
actor.Start();
}
}));
subscriptions.Add(pubSub.Subscribe<StopConsumerMessage>(m =>
{
if (actors.TryGetValue(m.ConsumerName, out var actor))
{
actor.Stop();
}
}));
subscriptions.Add(pubSub.Subscribe<ResetConsumerMessage>(m =>
{
if (actors.TryGetValue(m.ConsumerName, out var actor))
{
actor.Reset();
}
}));
subscriptions.Add(pubSub.ReceiveAsync<GetStatesRequest, GetStatesResponse>(request =>
{
var states = actors.Values.Select(x => x.GetState()).ToArray();
return Task.FromResult(new GetStatesResponse { States = states });
}));
}
protected override void DisposeObject(bool disposing)
{
if (disposing)
{
foreach (var subscription in subscriptions)
{
subscription.Dispose();
}
}
}
}
}

107
src/Squidex.Infrastructure/EventSourcing/Grains/EventConsumerManagerGrain.cs

@ -0,0 +1,107 @@
// ==========================================================================
// Squidex Headless CMS
// ==========================================================================
// Copyright (c) Squidex UG (haftungsbeschränkt)
// All rights reserved. Licensed under the MIT license.
// ==========================================================================
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text.RegularExpressions;
using System.Threading.Tasks;
using Orleans;
using Orleans.Concurrency;
using Orleans.Core;
using Orleans.Runtime;
namespace Squidex.Infrastructure.EventSourcing.Grains
{
public class EventConsumerManagerGrain : Grain, IEventConsumerManagerGrain
{
private readonly IEnumerable<IEventConsumer> eventConsumers;
public EventConsumerManagerGrain(IEnumerable<IEventConsumer> eventConsumers)
: this(eventConsumers, null, null)
{
}
protected EventConsumerManagerGrain(
IEnumerable<IEventConsumer> eventConsumers,
IGrainIdentity identity,
IGrainRuntime runtime)
: base(identity, runtime)
{
Guard.NotNull(eventConsumers, nameof(eventConsumers));
this.eventConsumers = eventConsumers;
}
public Task ReceiveReminder(string reminderName, TickStatus status)
{
return ActivateAsync();
}
public override Task OnActivateAsync()
{
DelayDeactivation(TimeSpan.FromDays(1));
RegisterOrUpdateReminder("Default", TimeSpan.Zero, TimeSpan.FromMinutes(10));
RegisterTimer(x => ActivateAsync(), null, TimeSpan.Zero, TimeSpan.FromSeconds(10));
return Task.FromResult(true);
}
public Task ActivateAsync()
{
var tasks =
eventConsumers
.Select(c => GrainFactory.GetGrain<IEventConsumerGrain>(c.Name))
.Select(c => c.ActivateAsync());
return Task.WhenAll(tasks);
}
public Task WakeUpAsync(string streamName)
{
var tasks =
eventConsumers
.Where(c => streamName == null || Regex.IsMatch(streamName, c.EventsFilter))
.Select(c => GrainFactory.GetGrain<IEventConsumerGrain>(c.Name))
.Select(c => c.WakeUpAsync());
return Task.WhenAll(tasks);
}
public Task<Immutable<List<EventConsumerInfo>>> GetConsumersAsync()
{
var tasks =
eventConsumers
.Select(c => GrainFactory.GetGrain<IEventConsumerGrain>(c.Name))
.Select(c => c.GetStateAsync());
return Task.WhenAll(tasks).ContinueWith(x => new Immutable<List<EventConsumerInfo>>(x.Result.Select(r => r.Value).ToList()));
}
public Task ResetAsync(string consumerName)
{
var eventConsumer = GrainFactory.GetGrain<IEventConsumerGrain>(consumerName);
return eventConsumer.ResetAsync();
}
public Task StartAsync(string consumerName)
{
var eventConsumer = GrainFactory.GetGrain<IEventConsumerGrain>(consumerName);
return eventConsumer.StartAsync();
}
public Task StopAsync(string consumerName)
{
var eventConsumer = GrainFactory.GetGrain<IEventConsumerGrain>(consumerName);
return eventConsumer.StopAsync();
}
}
}

33
src/Squidex.Infrastructure/EventSourcing/Grains/IEventConsumerGrain.cs

@ -0,0 +1,33 @@
// ==========================================================================
// Squidex Headless CMS
// ==========================================================================
// Copyright (c) Squidex UG (haftungsbeschränkt)
// All rights reserved. Licensed under the MIT license.
// ==========================================================================
using System;
using System.Threading.Tasks;
using Orleans;
using Orleans.Concurrency;
namespace Squidex.Infrastructure.EventSourcing.Grains
{
public interface IEventConsumerGrain : IGrainWithStringKey
{
Task<Immutable<EventConsumerInfo>> GetStateAsync();
Task ActivateAsync();
Task StopAsync();
Task StartAsync();
Task ResetAsync();
Task WakeUpAsync();
Task OnEventAsync(Immutable<IEventSubscription> subscription, Immutable<StoredEvent> storedEvent);
Task OnErrorAsync(Immutable<IEventSubscription> subscription, Immutable<Exception> exception);
}
}

29
src/Squidex.Infrastructure/EventSourcing/Grains/IEventConsumerManagerGrain.cs

@ -0,0 +1,29 @@
// ==========================================================================
// Squidex Headless CMS
// ==========================================================================
// Copyright (c) Squidex UG (haftungsbeschränkt)
// All rights reserved. Licensed under the MIT license.
// ==========================================================================
using System.Collections.Generic;
using System.Threading.Tasks;
using Orleans;
using Orleans.Concurrency;
namespace Squidex.Infrastructure.EventSourcing.Grains
{
public interface IEventConsumerManagerGrain : IGrainWithStringKey
{
Task ActivateAsync();
Task WakeUpAsync(string streamName);
Task StopAsync(string consumerName);
Task StartAsync(string consumerName);
Task ResetAsync(string consumerName);
Task<Immutable<List<EventConsumerInfo>>> GetConsumersAsync();
}
}

13
src/Squidex.Infrastructure/EventSourcing/Grains/Messages/GetStatesRequest.cs

@ -1,13 +0,0 @@
// ==========================================================================
// Squidex Headless CMS
// ==========================================================================
// Copyright (c) Squidex UG (haftungsbeschränkt)
// All rights reserved. Licensed under the MIT license.
// ==========================================================================
namespace Squidex.Infrastructure.EventSourcing.Grains.Messages
{
public sealed class GetStatesRequest
{
}
}

14
src/Squidex.Infrastructure/EventSourcing/Grains/Messages/GetStatesResponse.cs

@ -1,14 +0,0 @@
// ==========================================================================
// Squidex Headless CMS
// ==========================================================================
// Copyright (c) Squidex UG (haftungsbeschränkt)
// All rights reserved. Licensed under the MIT license.
// ==========================================================================
namespace Squidex.Infrastructure.EventSourcing.Grains.Messages
{
public sealed class GetStatesResponse
{
public EventConsumerInfo[] States { get; set; }
}
}

14
src/Squidex.Infrastructure/EventSourcing/Grains/Messages/ResetConsumerMessage.cs

@ -1,14 +0,0 @@
// ==========================================================================
// Squidex Headless CMS
// ==========================================================================
// Copyright (c) Squidex UG (haftungsbeschränkt)
// All rights reserved. Licensed under the MIT license.
// ==========================================================================
namespace Squidex.Infrastructure.EventSourcing.Grains.Messages
{
public sealed class ResetConsumerMessage
{
public string ConsumerName { get; set; }
}
}

14
src/Squidex.Infrastructure/EventSourcing/Grains/Messages/StartConsumerMessage.cs

@ -1,14 +0,0 @@
// ==========================================================================
// Squidex Headless CMS
// ==========================================================================
// Copyright (c) Squidex UG (haftungsbeschränkt)
// All rights reserved. Licensed under the MIT license.
// ==========================================================================
namespace Squidex.Infrastructure.EventSourcing.Grains.Messages
{
public sealed class StartConsumerMessage
{
public string ConsumerName { get; set; }
}
}

14
src/Squidex.Infrastructure/EventSourcing/Grains/Messages/StopConsumerMessage.cs

@ -1,14 +0,0 @@
// ==========================================================================
// Squidex Headless CMS
// ==========================================================================
// Copyright (c) Squidex UG (haftungsbeschränkt)
// All rights reserved. Licensed under the MIT license.
// ==========================================================================
namespace Squidex.Infrastructure.EventSourcing.Grains.Messages
{
public sealed class StopConsumerMessage
{
public string ConsumerName { get; set; }
}
}

34
src/Squidex.Infrastructure/EventSourcing/Grains/OrleansEventNotifier.cs

@ -0,0 +1,34 @@
// ==========================================================================
// Squidex Headless CMS
// ==========================================================================
// Copyright (c) Squidex UG (haftungsbeschränkt)
// All rights reserved. Licensed under the MIT license.
// ==========================================================================
using System;
using Orleans;
namespace Squidex.Infrastructure.EventSourcing.Grains
{
public sealed class OrleansEventNotifier : IEventNotifier
{
private readonly IEventConsumerManagerGrain eventConsumerManagerGrain;
public OrleansEventNotifier(IGrainFactory factory)
{
Guard.NotNull(factory, nameof(factory));
eventConsumerManagerGrain = factory.GetGrain<IEventConsumerManagerGrain>("Default");
}
public void NotifyEventsStored(string streamName)
{
eventConsumerManagerGrain.WakeUpAsync(streamName);
}
public IDisposable Subscribe(Action<string> handler)
{
return null;
}
}
}

42
src/Squidex.Infrastructure/EventSourcing/Grains/WrapperSubscription.cs

@ -0,0 +1,42 @@
// ==========================================================================
// Squidex Headless CMS
// ==========================================================================
// Copyright (c) Squidex UG (haftungsbeschränkt)
// All rights reserved. Licensed under the MIT license.
// ==========================================================================
using System;
using System.Threading;
using System.Threading.Tasks;
using Orleans.Concurrency;
namespace Squidex.Infrastructure.EventSourcing.Grains
{
internal sealed class WrapperSubscription : IEventSubscriber
{
private readonly IEventConsumerGrain grain;
private readonly TaskScheduler scheduler;
public WrapperSubscription(IEventConsumerGrain grain, TaskScheduler scheduler)
{
this.grain = grain;
this.scheduler = scheduler ?? TaskScheduler.Default;
}
public Task OnEventAsync(IEventSubscription subscription, StoredEvent storedEvent)
{
return Dispatch(() => grain.OnEventAsync(subscription.AsImmutable(), storedEvent.AsImmutable()));
}
public Task OnErrorAsync(IEventSubscription subscription, Exception exception)
{
return Dispatch(() => grain.OnErrorAsync(subscription.AsImmutable(), exception.AsImmutable()));
}
private Task Dispatch(Func<Task> task)
{
return Task<Task>.Factory.StartNew(() => task(), CancellationToken.None, TaskCreationOptions.None, scheduler).Unwrap();
}
}
}

2
src/Squidex.Infrastructure/EventSourcing/IEventSubscription.cs

@ -11,6 +11,8 @@ namespace Squidex.Infrastructure.EventSourcing
{
public interface IEventSubscription
{
void WakeUp();
Task StopAsync();
}
}

14
src/Squidex.Infrastructure/EventSourcing/PollingSubscription.cs

@ -14,10 +14,8 @@ namespace Squidex.Infrastructure.EventSourcing
{
public sealed class PollingSubscription : IEventSubscription
{
private readonly IEventNotifier eventNotifier;
private readonly IEventStore eventStore;
private readonly IEventSubscriber eventSubscriber;
private readonly IDisposable notification;
private readonly CompletionTimer timer;
private readonly Regex streamRegex;
private readonly string streamFilter;
@ -25,17 +23,14 @@ namespace Squidex.Infrastructure.EventSourcing
public PollingSubscription(
IEventStore eventStore,
IEventNotifier eventNotifier,
IEventSubscriber eventSubscriber,
string streamFilter,
string position)
{
Guard.NotNull(eventStore, nameof(eventStore));
Guard.NotNull(eventNotifier, nameof(eventNotifier));
Guard.NotNull(eventSubscriber, nameof(eventSubscriber));
this.position = position;
this.eventNotifier = eventNotifier;
this.eventStore = eventStore;
this.eventSubscriber = eventSubscriber;
this.streamFilter = streamFilter;
@ -61,20 +56,15 @@ namespace Squidex.Infrastructure.EventSourcing
}
}
});
}
notification = eventNotifier.Subscribe(streamName =>
{
if (streamRegex.IsMatch(streamName))
public void WakeUp()
{
timer.SkipCurrentDelay();
}
});
}
public Task StopAsync()
{
notification?.Dispose();
return timer.StopAsync();
}
}

5
src/Squidex.Infrastructure/EventSourcing/RetrySubscription.cs

@ -57,6 +57,11 @@ namespace Squidex.Infrastructure.EventSourcing
currentSubscription = null;
}
public void WakeUp()
{
currentSubscription?.WakeUp();
}
private async Task HandleEventAsync(IEventSubscription subscription, StoredEvent storedEvent)
{
if (subscription == currentSubscription)

3
src/Squidex.Infrastructure/Squidex.Infrastructure.csproj

@ -10,6 +10,9 @@
<ItemGroup>
<PackageReference Include="Microsoft.Extensions.Caching.Abstractions" Version="2.0.0" />
<PackageReference Include="Microsoft.Extensions.Logging" Version="2.0.0" />
<PackageReference Include="Microsoft.Orleans.Core" Version="2.0.0-beta3" />
<PackageReference Include="Microsoft.Orleans.OrleansCodeGenerator.Build" Version="2.0.0-beta3" />
<PackageReference Include="Microsoft.Orleans.OrleansRuntime" Version="2.0.0-beta3" />
<PackageReference Include="Newtonsoft.Json" Version="10.0.3" />
<PackageReference Include="NodaTime" Version="2.2.3" />
<PackageReference Include="RefactoringEssentials" Version="5.6.0" />

33
src/Squidex.Infrastructure/TypeNameRegistry.cs

@ -23,19 +23,14 @@ namespace Squidex.Infrastructure
lock (namesByType)
{
try
{
typesByName.Add(name, type);
}
catch (ArgumentException)
{
if (typesByName[name] != type)
if (typesByName.TryGetValue(name, out var existingType) && existingType != type)
{
var message = $"The name '{name}' is already registered with type '{typesByName[name]}'";
throw new ArgumentException(message, nameof(type));
}
}
typesByName[name] = type;
}
return this;
@ -62,33 +57,23 @@ namespace Squidex.Infrastructure
lock (namesByType)
{
try
{
namesByType.Add(type, name);
}
catch (ArgumentException)
{
if (namesByType[type] != name)
if (namesByType.TryGetValue(type, out var existingName) && existingName != name)
{
var message = $"The type '{type}' is already registered with name '{namesByType[type]}'";
throw new ArgumentException(message, nameof(type));
}
}
try
{
typesByName.Add(name, type);
}
catch (ArgumentException)
{
if (typesByName[name] != type)
namesByType[type] = name;
if (typesByName.TryGetValue(name, out var existingType) && existingType != type)
{
var message = $"The name '{name}' is already registered with type '{typesByName[name]}'";
throw new ArgumentException(message, nameof(type));
}
}
typesByName[name] = type;
}
return this;

27
src/Squidex/Areas/Api/Controllers/EventConsumers/EventConsumersController.cs

@ -5,15 +5,14 @@
// All rights reserved. Licensed under the MIT license.
// ==========================================================================
using System;
using System.Linq;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Mvc;
using NSwag.Annotations;
using Orleans;
using Squidex.Areas.Api.Controllers.EventConsumers.Models;
using Squidex.Infrastructure;
using Squidex.Infrastructure.Commands;
using Squidex.Infrastructure.EventSourcing.Grains.Messages;
using Squidex.Infrastructure.EventSourcing.Grains;
using Squidex.Infrastructure.Reflection;
using Squidex.Pipeline;
@ -25,12 +24,12 @@ namespace Squidex.Areas.Api.Controllers.EventConsumers
[SwaggerIgnore]
public sealed class EventConsumersController : ApiController
{
private readonly IPubSub pubSub;
private readonly IEventConsumerManagerGrain eventConsumerManagerGrain;
public EventConsumersController(ICommandBus commandBus, IPubSub pubSub)
public EventConsumersController(ICommandBus commandBus, IClusterClient orleans)
: base(commandBus)
{
this.pubSub = pubSub;
eventConsumerManagerGrain = orleans.GetGrain<IEventConsumerManagerGrain>("Default");
}
[HttpGet]
@ -38,9 +37,9 @@ namespace Squidex.Areas.Api.Controllers.EventConsumers
[ApiCosts(0)]
public async Task<IActionResult> GetEventConsumers()
{
var entities = await pubSub.RequestAsync<GetStatesRequest, GetStatesResponse>(new GetStatesRequest(), TimeSpan.FromSeconds(2), true);
var entities = await eventConsumerManagerGrain.GetConsumersAsync();
var models = entities.States.Select(x => SimpleMapper.Map(x, new EventConsumerDto())).ToList();
var models = entities.Value.Select(x => SimpleMapper.Map(x, new EventConsumerDto())).ToList();
return Ok(models);
}
@ -48,9 +47,9 @@ namespace Squidex.Areas.Api.Controllers.EventConsumers
[HttpPut]
[Route("event-consumers/{name}/start/")]
[ApiCosts(0)]
public IActionResult Start(string name)
public async Task<IActionResult> Start(string name)
{
pubSub.Publish(new StartConsumerMessage { ConsumerName = name }, true);
await eventConsumerManagerGrain.StartAsync(name);
return NoContent();
}
@ -58,9 +57,9 @@ namespace Squidex.Areas.Api.Controllers.EventConsumers
[HttpPut]
[Route("event-consumers/{name}/stop/")]
[ApiCosts(0)]
public IActionResult Stop(string name)
public async Task<IActionResult> Stop(string name)
{
pubSub.Publish(new StopConsumerMessage { ConsumerName = name }, true);
await eventConsumerManagerGrain.StopAsync(name);
return NoContent();
}
@ -68,9 +67,9 @@ namespace Squidex.Areas.Api.Controllers.EventConsumers
[HttpPut]
[Route("event-consumers/{name}/reset/")]
[ApiCosts(0)]
public IActionResult Reset(string name)
public async Task<IActionResult> Reset(string name)
{
pubSub.Publish(new ResetConsumerMessage { ConsumerName = name }, true);
await eventConsumerManagerGrain.ResetAsync(name);
return NoContent();
}

1
src/Squidex/Config/Authentication/MicrosoftHandler.cs

@ -5,7 +5,6 @@
// All rights reserved. Licensed under the MIT license.
// ==========================================================================
using System.Security.Claims;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Authentication.OAuth;
using Squidex.Shared.Identity;

6
src/Squidex/Config/Domain/ReadServices.cs

@ -40,10 +40,6 @@ namespace Squidex.Config.Domain
if (consumeEvents)
{
services.AddTransient<EventConsumerGrain>();
services.AddSingletonAs<EventConsumerGrainManager>()
.As<IRunnable>();
services.AddSingletonAs<RuleDequeuer>()
.As<IRunnable>();
}
@ -113,7 +109,7 @@ namespace Squidex.Config.Domain
services.AddSingletonAs<WebhookActionHandler>()
.As<IRuleActionHandler>();
services.AddSingletonAs<DefaultEventNotifier>()
services.AddSingletonAs<OrleansEventNotifier>()
.As<IEventNotifier>();
services.AddSingletonAs<RuleEnqueuer>()

41
src/Squidex/Config/Orleans/ClientServices.cs

@ -0,0 +1,41 @@
// ==========================================================================
// Squidex Headless CMS
// ==========================================================================
// Copyright (c) Squidex UG (haftungsbeschränkt)
// All rights reserved. Licensed under the MIT license.
// ==========================================================================
using System;
using Microsoft.Extensions.DependencyInjection;
using Orleans;
using Squidex.Infrastructure.EventSourcing.Grains;
namespace Squidex.Config.Orleans
{
public static class ClientServices
{
public static void AddAppClient(this IServiceCollection services)
{
services.AddSingletonAs(c => c.GetRequiredService<IClusterClient>())
.As<IGrainFactory>();
services.AddSingletonAs(c =>
{
var client = new ClientBuilder()
.ConfigureApplicationParts(builder =>
{
builder.AddApplicationPart(typeof(EventConsumerGrain).Assembly);
})
.UseStaticGatewayListProvider(options =>
{
options.Gateways.Add(new Uri("gwy.tcp://127.0.0.1:40000/0"));
})
.Build();
client.Connect().Wait();
return client;
});
}
}
}

34
src/Squidex/Config/Orleans/SiloExtensions.cs

@ -0,0 +1,34 @@
// ==========================================================================
// Squidex Headless CMS
// ==========================================================================
// Copyright (c) Squidex UG (haftungsbeschränkt)
// All rights reserved. Licensed under the MIT license.
// ==========================================================================
using Microsoft.Extensions.Configuration;
using Orleans;
using Orleans.Hosting;
using Orleans.Runtime.Configuration;
namespace Squidex.Config.Orleans
{
public static class SiloExtensions
{
public static ISiloHostBuilder UseContentRoot(this ISiloHostBuilder builder, string path)
{
builder.ConfigureAppConfiguration(config =>
{
config.SetBasePath(path);
});
return builder;
}
public static ClusterConfiguration WithDashboard(this ClusterConfiguration config)
{
config.RegisterDashboard();
return config;
}
}
}

57
src/Squidex/Config/Orleans/SiloServices.cs

@ -0,0 +1,57 @@
// ==========================================================================
// Squidex Headless CMS
// ==========================================================================
// Copyright (c) Squidex UG (haftungsbeschränkt)
// All rights reserved. Licensed under the MIT license.
// ==========================================================================
using System;
using System.Linq;
using System.Net;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using Orleans;
using Orleans.Runtime;
using Orleans.Runtime.Configuration;
using Squidex.Infrastructure.EventSourcing.Grains;
namespace Squidex.Config.Orleans
{
public static class SiloServices
{
public static void AddAppSiloServices(this IServiceCollection services, IConfiguration config)
{
services.AddSingletonAs<EventConsumerBootstrap>()
.As<ILifecycleParticipant<ISiloLifecycle>>();
/*
var clusterConfiguration =
services.Where(x => x.ServiceType == typeof(ClusterConfiguration))
.Select(x => x.ImplementationInstance)
.Select(x => (ClusterConfiguration)x)
.FirstOrDefault();
if (clusterConfiguration != null)
{
clusterConfiguration.Globals.RegisterBootstrapProvider<EventConsumerBootstrap>("EventConsumers");
var ipConfig = config.GetRequiredValue("orleans:hostNameOrIPAddress");
if (ipConfig.Equals("Host", StringComparison.OrdinalIgnoreCase))
{
ipConfig = Dns.GetHostName();
}
else if (ipConfig.Equals("FirstIPAddressOfHost"))
{
var ips = Dns.GetHostAddressesAsync(Dns.GetHostName()).Result;
ipConfig = ips.FirstOrDefault()?.ToString();
}
clusterConfiguration.Defaults.PropagateActivityId = true;
clusterConfiguration.Defaults.ProxyGatewayEndpoint = new IPEndPoint(IPAddress.Any, 40000);
clusterConfiguration.Defaults.HostNameOrIPAddress = ipConfig;
}*/
}
}
}

44
src/Squidex/Program.cs

@ -5,8 +5,14 @@
// All rights reserved. Licensed under the MIT license.
// ==========================================================================
using System;
using System.IO;
using Microsoft.AspNetCore.Hosting;
using Orleans;
using Orleans.Hosting;
using Orleans.Runtime.Configuration;
using Squidex.Config.Orleans;
using Squidex.Infrastructure.EventSourcing.Grains;
using Squidex.Infrastructure.Log.Adapter;
namespace Squidex
@ -14,6 +20,32 @@ namespace Squidex
public static class Program
{
public static void Main(string[] args)
{
var silo = new SiloHostBuilder()
.UseConfiguration(ClusterConfiguration.LocalhostPrimarySilo(33333))
.UseContentRoot(Directory.GetCurrentDirectory())
.ConfigureServices((context, services) =>
{
services.AddAppSiloServices(context.Configuration);
services.AddAppServices(context.Configuration);
})
.ConfigureApplicationParts(builder =>
{
builder.AddApplicationPart(typeof(EventConsumerManagerGrain).Assembly);
})
.ConfigureLogging(builder =>
{
builder.AddSemanticLog();
})
.ConfigureAppConfiguration((hostContext, builder) =>
{
builder.AddAppConfiguration(GetEnvironment(), args);
})
.Build();
silo.StartAsync().Wait();
try
{
new WebHostBuilder()
.UseKestrel(k => { k.AddServerHeader = false; })
@ -31,5 +63,17 @@ namespace Squidex
.Build()
.Run();
}
finally
{
silo.StopAsync().Wait();
}
}
private static string GetEnvironment()
{
var environment = Environment.GetEnvironmentVariable("ASPNETCORE_ENVIRONMENT");
return environment ?? "Development";
}
}
}

3
src/Squidex/Squidex.csproj

@ -70,6 +70,8 @@
<PackageReference Include="Microsoft.Extensions.Options.ConfigurationExtensions" Version="2.0.0" />
<PackageReference Include="Microsoft.Data.Edm" Version="5.8.3" />
<PackageReference Include="Microsoft.OData.Core" Version="7.4.0" />
<PackageReference Include="Microsoft.Orleans.Client" Version="2.0.0-beta3" />
<PackageReference Include="Microsoft.Orleans.Server " Version="2.0.0-beta3" />
<PackageReference Include="Microsoft.VisualStudio.Web.BrowserLink" Version="2.0.1" />
<PackageReference Include="Microsoft.VisualStudio.Web.CodeGeneration.Design" Version="2.0.2" />
<PackageReference Include="MongoDB.Driver" Version="2.5.0" />
@ -77,6 +79,7 @@
<PackageReference Include="NodaTime.Serialization.JsonNet" Version="2.0.0" />
<PackageReference Include="NSwag.AspNetCore" Version="11.12.16" />
<PackageReference Include="OpenCover" Version="4.6.519" />
<PackageReference Include="OrleansDashboard" Version="2.0.0-beta4" />
<PackageReference Include="RefactoringEssentials" Version="5.6.0" />
<PackageReference Include="ReportGenerator" Version="3.1.1" />
<PackageReference Include="StackExchange.Redis.StrongName" Version="1.2.6" />

2
src/Squidex/WebStartup.cs

@ -15,6 +15,7 @@ using Squidex.Areas.Frontend;
using Squidex.Areas.IdentityServer;
using Squidex.Areas.Portal;
using Squidex.Config.Domain;
using Squidex.Config.Orleans;
using Squidex.Config.Web;
namespace Squidex
@ -30,6 +31,7 @@ namespace Squidex
public IServiceProvider ConfigureServices(IServiceCollection services)
{
services.AddAppClient();
services.AddAppServices(configuration);
return services.BuildServiceProvider();

5
tests/Squidex.Infrastructure.Tests/EventSourcing/Grains/EventConsumerManagerTests.cs

@ -9,7 +9,6 @@ using System;
using System.Threading.Tasks;
using FakeItEasy;
using FluentAssertions;
using Squidex.Infrastructure.EventSourcing.Grains.Messages;
using Squidex.Infrastructure.States;
using Xunit;
@ -25,7 +24,7 @@ namespace Squidex.Infrastructure.EventSourcing.Grains
private readonly IPubSub pubSub = new InMemoryPubSub();
private readonly string consumerName1 = "Consumer1";
private readonly string consumerName2 = "Consumer2";
private readonly EventConsumerGrainManager sut;
private readonly EventConsumerManagerGrain sut;
public EventConsumerManagerTests()
{
@ -35,7 +34,7 @@ namespace Squidex.Infrastructure.EventSourcing.Grains
A.CallTo(() => factory.CreateAsync<EventConsumerGrain>(consumerName1)).Returns(actor1);
A.CallTo(() => factory.CreateAsync<EventConsumerGrain>(consumerName2)).Returns(actor2);
sut = new EventConsumerGrainManager(new IEventConsumer[] { consumer1, consumer2 }, pubSub, factory);
sut = new EventConsumerManagerGrain(new IEventConsumer[] { consumer1, consumer2 }, pubSub, factory);
}
[Fact]

Loading…
Cancel
Save