diff --git a/NuGet.Config b/NuGet.Config index 3f0e00340..c982f5720 100644 --- a/NuGet.Config +++ b/NuGet.Config @@ -1,6 +1,7 @@  + \ No newline at end of file diff --git a/libs/microsoft.orleans.orleanscodegenerator.build/2.0.0-beta1-fix/microsoft.orleans.orleanscodegenerator.build.2.0.0-beta1-fix.nupkg b/libs/microsoft.orleans.orleanscodegenerator.build/2.0.0-beta1-fix/microsoft.orleans.orleanscodegenerator.build.2.0.0-beta1-fix.nupkg new file mode 100644 index 000000000..c719a733a Binary files /dev/null and b/libs/microsoft.orleans.orleanscodegenerator.build/2.0.0-beta1-fix/microsoft.orleans.orleanscodegenerator.build.2.0.0-beta1-fix.nupkg differ diff --git a/libs/microsoft.orleans.orleanscodegenerator.build/2.0.0-beta1-fix/microsoft.orleans.orleanscodegenerator.build.2.0.0-beta1-fix.nupkg.sha512 b/libs/microsoft.orleans.orleanscodegenerator.build/2.0.0-beta1-fix/microsoft.orleans.orleanscodegenerator.build.2.0.0-beta1-fix.nupkg.sha512 new file mode 100644 index 000000000..11c0b5549 --- /dev/null +++ b/libs/microsoft.orleans.orleanscodegenerator.build/2.0.0-beta1-fix/microsoft.orleans.orleanscodegenerator.build.2.0.0-beta1-fix.nupkg.sha512 @@ -0,0 +1 @@ +5W20j9jiNog4dHUEt+cCnePb8z6jFEMnkwO4XilajM7FCnen3KTnN/G8PAUGuQieSlTI9MRe0sRYcafLJl900w== \ No newline at end of file diff --git a/libs/microsoft.orleans.orleanscodegenerator.build/2.0.0-beta1-fix/microsoft.orleans.orleanscodegenerator.build.nuspec b/libs/microsoft.orleans.orleanscodegenerator.build/2.0.0-beta1-fix/microsoft.orleans.orleanscodegenerator.build.nuspec new file mode 100644 index 000000000..d685caf66 --- /dev/null +++ b/libs/microsoft.orleans.orleanscodegenerator.build/2.0.0-beta1-fix/microsoft.orleans.orleanscodegenerator.build.nuspec @@ -0,0 +1,23 @@ + + + + Microsoft.Orleans.OrleansCodeGenerator.Build + 2.0.0-beta1-fix + Microsoft Orleans Build-time Code Generator + Microsoft + Microsoft + false + true + https://github.com/dotnet/Orleans#license + https://github.com/dotnet/Orleans + https://raw.githubusercontent.com/dotnet/orleans/gh-pages/assets/logo_128.png + Microsoft Orleans build-time code generator to install in all grain interface & implementation projects. + © Microsoft Corporation. All rights reserved. + Orleans Cloud-Computing Actor-Model Actors Distributed-Systems C# .NET + + + + + + + \ No newline at end of file diff --git a/nuget.exe b/nuget.exe new file mode 100644 index 000000000..ec1309c7a Binary files /dev/null and b/nuget.exe differ diff --git a/src/Squidex.Infrastructure/CQRS/Events/Grains/EventConsumerInfo.cs b/src/Squidex.Infrastructure/CQRS/Events/Grains/EventConsumerInfo.cs new file mode 100644 index 000000000..f994d9851 --- /dev/null +++ b/src/Squidex.Infrastructure/CQRS/Events/Grains/EventConsumerInfo.cs @@ -0,0 +1,21 @@ +// ========================================================================== +// EventConsumerInfo.cs +// Squidex Headless CMS +// ========================================================================== +// Copyright (c) Squidex Group +// All rights reserved. +// ========================================================================== + +namespace Squidex.Infrastructure.CQRS.Events.Grains +{ + public sealed class EventConsumerInfo + { + public bool IsStopped { get; set; } + + public string Name { get; set; } + + public string Error { get; set; } + + public string Position { get; set; } + } +} diff --git a/src/Squidex.Infrastructure/CQRS/Events/Grains/IEventConsumerGrain.cs b/src/Squidex.Infrastructure/CQRS/Events/Grains/IEventConsumerGrain.cs new file mode 100644 index 000000000..82d0e5560 --- /dev/null +++ b/src/Squidex.Infrastructure/CQRS/Events/Grains/IEventConsumerGrain.cs @@ -0,0 +1,24 @@ +// ========================================================================== +// IEventConsumerGrain.cs +// Squidex Headless CMS +// ========================================================================== +// Copyright (c) Squidex Group +// All rights reserved. +// ========================================================================== + +using System.Threading.Tasks; +using Orleans; + +namespace Squidex.Infrastructure.CQRS.Events.Grains +{ + public interface IEventConsumerGrain : IGrainWithStringKey, IEventSubscriber + { + Task GetStateAsync(); + + Task StopAsync(); + + Task StartAsync(); + + Task ResetAsync(); + } +} diff --git a/src/Squidex.Infrastructure/CQRS/Events/Grains/IEventConsumerRegistryGrain.cs b/src/Squidex.Infrastructure/CQRS/Events/Grains/IEventConsumerRegistryGrain.cs new file mode 100644 index 000000000..11ee25581 --- /dev/null +++ b/src/Squidex.Infrastructure/CQRS/Events/Grains/IEventConsumerRegistryGrain.cs @@ -0,0 +1,27 @@ +// ========================================================================== +// IEventConsumerRegistryGrain.cs +// Squidex Headless CMS +// ========================================================================== +// Copyright (c) Squidex Group +// All rights reserved. +// ========================================================================== + +using System.Collections.Generic; +using System.Threading.Tasks; +using Orleans; + +namespace Squidex.Infrastructure.CQRS.Events.Grains +{ + public interface IEventConsumerRegistryGrain : IGrainWithStringKey + { + Task RegisterAsync(string consumerName); + + Task StopAsync(string consumerName); + + Task StartAsync(string consumerName); + + Task ResetAsync(string consumerName); + + Task> GetConsumersAsync(); + } +} diff --git a/src/Squidex.Infrastructure/CQRS/Events/Grains/Implementation/EventConsumerGrain.cs b/src/Squidex.Infrastructure/CQRS/Events/Grains/Implementation/EventConsumerGrain.cs new file mode 100644 index 000000000..1bd4829f9 --- /dev/null +++ b/src/Squidex.Infrastructure/CQRS/Events/Grains/Implementation/EventConsumerGrain.cs @@ -0,0 +1,280 @@ +// ========================================================================== +// EventConsumerGrain.cs +// Squidex Headless CMS +// ========================================================================== +// Copyright (c) Squidex Group +// All rights reserved. +// ========================================================================== + +using System; +using System.Threading.Tasks; +using Orleans; +using Squidex.Infrastructure.Log; +using Squidex.Infrastructure.Tasks; + +namespace Squidex.Infrastructure.CQRS.Events.Grains +{ + public class EventConsumerGrain : Grain, IEventSubscriber, IEventConsumerGrain + { + private readonly EventDataFormatter eventFormatter; + private readonly EventConsumerFactory eventConsumerFactory; + private readonly IEventStore eventStore; + private readonly ISemanticLog log; + private TaskFactory dispatcher; + private IEventSubscription currentSubscription; + private IEventConsumer eventConsumer; + + public EventConsumerGrain( + EventDataFormatter eventFormatter, + EventConsumerFactory eventConsumerFactory, + IEventStore eventStore, + ISemanticLog log) + { + Guard.NotNull(log, nameof(log)); + Guard.NotNull(eventStore, nameof(eventStore)); + Guard.NotNull(eventFormatter, nameof(eventFormatter)); + Guard.NotNull(eventConsumerFactory, nameof(eventConsumerFactory)); + + this.log = log; + + this.eventStore = eventStore; + this.eventFormatter = eventFormatter; + this.eventConsumerFactory = eventConsumerFactory; + } + + public override async Task OnActivateAsync() + { + dispatcher = new TaskFactory(TaskScheduler.Current); + + await GrainFactory.GetGrain(string.Empty).RegisterAsync(this.IdentityString); + + eventConsumer = eventConsumerFactory(this.IdentityString); + + if (!State.IsStopped) + { + Subscribe(State.Position); + } + } + + protected virtual IEventSubscription CreateSubscription(IEventStore eventStore, string streamFilter, string position) + { + return new RetrySubscription(eventStore, this, streamFilter, position); + } + + private Task HandleEventAsync(IEventSubscription subscription, StoredEvent storedEvent) + { + if (subscription != currentSubscription) + { + return TaskHelper.Done; + } + + return DoAndUpdateStateAsync(async () => + { + var @event = ParseKnownEvent(storedEvent); + + if (@event != null) + { + await DispatchConsumerAsync(@event); + } + + State.Error = null; + State.Position = storedEvent.EventPosition; + }); + } + + private Task HandleErrorAsync(IEventSubscription subscription, Exception exception) + { + if (subscription != currentSubscription) + { + return TaskHelper.Done; + } + + return DoAndUpdateStateAsync(() => + { + Unsubscribe(); + + State.Error = exception.ToString(); + State.IsStopped = true; + }); + } + + public Task GetStateAsync() + { + return Task.FromResult(State); + } + + public Task StartAsync() + { + if (!State.IsStopped) + { + return TaskHelper.Done; + } + + return DoAndUpdateStateAsync(() => + { + Subscribe(State.Position); + + State.Error = null; + State.IsStopped = false; + }); + } + + public Task StopAsync() + { + if (State.IsStopped) + { + return TaskHelper.Done; + } + + return DoAndUpdateStateAsync(() => + { + Unsubscribe(); + + State.Error = null; + State.IsStopped = true; + }); + } + + public Task ResetAsync() + { + return DoAndUpdateStateAsync(async () => + { + Unsubscribe(); + + await ClearAsync(); + + Subscribe(null); + + State.Error = null; + State.Position = null; + State.IsStopped = false; + }); + } + + Task IEventSubscriber.OnEventAsync(IEventSubscription subscription, StoredEvent storedEvent) + { + return dispatcher.StartNew(() => this.HandleEventAsync(subscription, storedEvent)).Unwrap(); + } + + Task IEventSubscriber.OnErrorAsync(IEventSubscription subscription, Exception exception) + { + return dispatcher.StartNew(() => this.HandleErrorAsync(subscription, exception)).Unwrap(); + } + + private Task DoAndUpdateStateAsync(Action action) + { + return DoAndUpdateStateAsync(() => { action(); return TaskHelper.Done; }); + } + + private async Task DoAndUpdateStateAsync(Func action) + { + try + { + await action(); + } + catch (Exception ex) + { + try + { + Unsubscribe(); + } + catch (Exception unsubscribeException) + { + ex = new AggregateException(ex, unsubscribeException); + } + + log.LogFatal(ex, w => w + .WriteProperty("action", "HandleEvent") + .WriteProperty("state", "Failed") + .WriteProperty("eventConsumer", eventConsumer.Name)); + + State.Error = ex.ToString(); + State.IsStopped = true; + } + + await WriteStateAsync(); + } + + private async Task ClearAsync() + { + var actionId = Guid.NewGuid().ToString(); + + log.LogInformation(w => w + .WriteProperty("action", "EventConsumerReset") + .WriteProperty("actionId", actionId) + .WriteProperty("state", "Started") + .WriteProperty("eventConsumer", eventConsumer.Name)); + + using (log.MeasureTrace(w => w + .WriteProperty("action", "EventConsumerReset") + .WriteProperty("actionId", actionId) + .WriteProperty("state", "Completed") + .WriteProperty("eventConsumer", eventConsumer.Name))) + { + await eventConsumer.ClearAsync(); + } + } + + private async Task DispatchConsumerAsync(Envelope @event) + { + var eventId = @event.Headers.EventId().ToString(); + var eventType = @event.Payload.GetType().Name; + + log.LogInformation(w => w + .WriteProperty("action", "HandleEvent") + .WriteProperty("actionId", eventId) + .WriteProperty("state", "Started") + .WriteProperty("eventId", eventId) + .WriteProperty("eventType", eventType) + .WriteProperty("eventConsumer", eventConsumer.Name)); + + using (log.MeasureTrace(w => w + .WriteProperty("action", "HandleEvent") + .WriteProperty("actionId", eventId) + .WriteProperty("state", "Completed") + .WriteProperty("eventId", eventId) + .WriteProperty("eventType", eventType) + .WriteProperty("eventConsumer", eventConsumer.Name))) + { + await eventConsumer.On(@event); + } + } + + private void Unsubscribe() + { + if (currentSubscription != null) + { + currentSubscription.StopAsync().Forget(); + currentSubscription = null; + } + } + + private void Subscribe(string position) + { + if (currentSubscription == null) + { + currentSubscription?.StopAsync().Forget(); + currentSubscription = CreateSubscription(eventStore, eventConsumer.EventsFilter, position); + } + } + + private Envelope ParseKnownEvent(StoredEvent message) + { + try + { + var @event = eventFormatter.Parse(message.Data); + + @event.SetEventPosition(message.EventPosition); + @event.SetEventStreamNumber(message.EventStreamNumber); + + return @event; + } + catch (TypeNameNotFoundException) + { + log.LogDebug(w => w.WriteProperty("oldEventFound", message.Data.Type)); + + return null; + } + } + } +} \ No newline at end of file diff --git a/src/Squidex.Infrastructure/CQRS/Events/Grains/Implementation/EventConsumerRegistryGrain.cs b/src/Squidex.Infrastructure/CQRS/Events/Grains/Implementation/EventConsumerRegistryGrain.cs new file mode 100644 index 000000000..7ce41b49d --- /dev/null +++ b/src/Squidex.Infrastructure/CQRS/Events/Grains/Implementation/EventConsumerRegistryGrain.cs @@ -0,0 +1,57 @@ +// ========================================================================== +// EventConsumerRegistryGrain.cs +// Squidex Headless CMS +// ========================================================================== +// Copyright (c) Squidex Group +// All rights reserved. +// ========================================================================== + +using System.Collections.Generic; +using System.Linq; +using System.Threading.Tasks; +using Orleans; +using Squidex.Infrastructure.Tasks; + +namespace Squidex.Infrastructure.CQRS.Events.Grains.Implementation +{ + public sealed class EventConsumerRegistryGrain : Grain, IEventConsumerRegistryGrain + { + public Task> GetConsumersAsync() + { + var tasks = + State.EventConsumerNames + .Select(n => GrainFactory.GetGrain(n)) + .Select(c => c.GetStateAsync()); + + return Task.WhenAll(tasks).ContinueWith(x => x.Result.ToList()); + } + + public Task RegisterAsync(string consumerName) + { + State.EventConsumerNames.Add(consumerName); + + return TaskHelper.Done; + } + + public Task ResetAsync(string consumerName) + { + var eventConsumer = GrainFactory.GetGrain(consumerName); + + return eventConsumer.ResetAsync(); + } + + public Task StartAsync(string consumerName) + { + var eventConsumer = GrainFactory.GetGrain(consumerName); + + return eventConsumer.StartAsync(); + } + + public Task StopAsync(string consumerName) + { + var eventConsumer = GrainFactory.GetGrain(consumerName); + + return eventConsumer.StopAsync(); + } + } +} diff --git a/src/Squidex.Infrastructure/CQRS/Events/Grains/Implementation/EventConsumerRegistryGrainState.cs b/src/Squidex.Infrastructure/CQRS/Events/Grains/Implementation/EventConsumerRegistryGrainState.cs new file mode 100644 index 000000000..af54c01c5 --- /dev/null +++ b/src/Squidex.Infrastructure/CQRS/Events/Grains/Implementation/EventConsumerRegistryGrainState.cs @@ -0,0 +1,22 @@ +// ========================================================================== +// EventConsumerRegistryGrainState.cs +// Squidex Headless CMS +// ========================================================================== +// Copyright (c) Squidex Group +// All rights reserved. +// ========================================================================== + +using System.Collections.Generic; + +namespace Squidex.Infrastructure.CQRS.Events.Grains.Implementation +{ + public sealed class EventConsumerRegistryGrainState + { + public HashSet EventConsumerNames { get; set; } + + public EventConsumerRegistryGrainState() + { + EventConsumerNames = new HashSet(); + } + } +} diff --git a/src/Squidex.Infrastructure/CQRS/Events/IEventConsumer.cs b/src/Squidex.Infrastructure/CQRS/Events/IEventConsumer.cs index 1cb4fba4d..565b35d50 100644 --- a/src/Squidex.Infrastructure/CQRS/Events/IEventConsumer.cs +++ b/src/Squidex.Infrastructure/CQRS/Events/IEventConsumer.cs @@ -10,6 +10,8 @@ using System.Threading.Tasks; namespace Squidex.Infrastructure.CQRS.Events { + public delegate IEventConsumer EventConsumerFactory(string name); + public interface IEventConsumer { string Name { get; } diff --git a/src/Squidex.Infrastructure/Squidex.Infrastructure.csproj b/src/Squidex.Infrastructure/Squidex.Infrastructure.csproj index c338c9865..0265a75ca 100644 --- a/src/Squidex.Infrastructure/Squidex.Infrastructure.csproj +++ b/src/Squidex.Infrastructure/Squidex.Infrastructure.csproj @@ -10,6 +10,8 @@ + + diff --git a/src/Squidex/Config/Domain/ReadModule.cs b/src/Squidex/Config/Domain/ReadModule.cs index fca1204f7..5135fc0f1 100644 --- a/src/Squidex/Config/Domain/ReadModule.cs +++ b/src/Squidex/Config/Domain/ReadModule.cs @@ -134,6 +134,15 @@ namespace Squidex.Config.Domain builder.RegisterType() .AsSelf() .SingleInstance(); + + builder.Register(c => + { + var eventConsumers = c.Resolve>(); + + return new EventConsumerFactory(x => eventConsumers.First(e => e.Name == x)); + }) + .AsSelf() + .SingleInstance(); } } } diff --git a/src/Squidex/Config/Orleans/IOrleansRunner.cs b/src/Squidex/Config/Orleans/IOrleansRunner.cs new file mode 100644 index 000000000..e79a9c00e --- /dev/null +++ b/src/Squidex/Config/Orleans/IOrleansRunner.cs @@ -0,0 +1,11 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Threading.Tasks; + +namespace Squidex.Config.Orleans +{ + public class IOrleansRunner + { + } +} diff --git a/src/Squidex/Config/Orleans/OrleansModule.cs b/src/Squidex/Config/Orleans/OrleansModule.cs new file mode 100644 index 000000000..748aebc9f --- /dev/null +++ b/src/Squidex/Config/Orleans/OrleansModule.cs @@ -0,0 +1,45 @@ +// ========================================================================== +// OrleansModule.cs +// Squidex Headless CMS +// ========================================================================== +// Copyright (c) Squidex Group +// All rights reserved. +// ========================================================================== + +using System; +using Autofac; +using Microsoft.Extensions.Configuration; +using Squidex.Config.Domain; +using Squidex.Infrastructure; + +namespace Squidex.Config.Orleans +{ + public sealed class OrleansModule : Module + { + private IConfiguration Configuration { get; } + + public OrleansModule(IConfiguration configuration) + { + Configuration = configuration; + } + + protected override void Load(ContainerBuilder builder) + { + var storeType = Configuration.GetValue("orleans:type"); + + if (string.IsNullOrWhiteSpace(storeType)) + { + throw new ConfigurationException("Configure Orleans type with 'orleans:type'."); + } + + if (string.Equals(storeType, "MongoDB", StringComparison.OrdinalIgnoreCase)) + { + builder.RegisterModule(new StoreMongoDbModule(Configuration)); + } + else + { + throw new ConfigurationException($"Unsupported value '{storeType}' for 'stores:type', supported: MongoDb."); + } + } + } +} diff --git a/src/Squidex/Config/Orleans/OrleansMongoDbModule.cs b/src/Squidex/Config/Orleans/OrleansMongoDbModule.cs new file mode 100644 index 000000000..6dfb13acf --- /dev/null +++ b/src/Squidex/Config/Orleans/OrleansMongoDbModule.cs @@ -0,0 +1,42 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Threading.Tasks; +using Autofac; +using Microsoft.Extensions.Configuration; +using Microsoft.Extensions.Options; +using Orleans.Providers.MongoDB; + +namespace Squidex.Config.Orleans +{ + public sealed class OrleansMongoDbModule : Module + { + private IConfiguration Configuration { get; } + + public OrleansMongoDbModule(IConfiguration configuration) + { + Configuration = configuration; + } + + protected override void Load(ContainerBuilder builder) + { + var mongoConfig = Configuration.GetSection("orleans:mongoDb"); + + builder.RegisterInstance(Options.Create(mongoConfig.Get())) + .As>() + .SingleInstance(); + + builder.RegisterInstance(Options.Create(mongoConfig.Get())) + .As>() + .SingleInstance(); + + builder.RegisterInstance(Options.Create(mongoConfig.Get())) + .As>() + .SingleInstance(); + + builder.RegisterInstance(Options.Create(mongoConfig.Get())) + .As>() + .SingleInstance(); + } + } +} diff --git a/src/Squidex/Config/Orleans/OrleansSilo.cs b/src/Squidex/Config/Orleans/OrleansSilo.cs new file mode 100644 index 000000000..80720c77b --- /dev/null +++ b/src/Squidex/Config/Orleans/OrleansSilo.cs @@ -0,0 +1,27 @@ +// ========================================================================== +// OrleansSilo.cs +// Squidex Headless CMS +// ========================================================================== +// Copyright (c) Squidex Group +// All rights reserved. +// ========================================================================== + +using Orleans.Hosting; +using System; +using System.Threading.Tasks; + +namespace Squidex.Config.Orleans +{ + public sealed class OrleansSilo + { + private readonly IServiceProvider serviceProvider; + + public Task RunAsync() + { + new SiloHostBuilder() + .ConfigureLocalHostPrimarySilo(33333) + .ConfigureSiloName("Squidex") + .UseServiceProviderFactory() + } + } +}