Browse Source

Orleans test

pull/169/head
Sebastian Stehle 8 years ago
parent
commit
955061b2de
  1. 1
      NuGet.Config
  2. BIN
      libs/microsoft.orleans.orleanscodegenerator.build/2.0.0-beta1-fix/microsoft.orleans.orleanscodegenerator.build.2.0.0-beta1-fix.nupkg
  3. 1
      libs/microsoft.orleans.orleanscodegenerator.build/2.0.0-beta1-fix/microsoft.orleans.orleanscodegenerator.build.2.0.0-beta1-fix.nupkg.sha512
  4. 23
      libs/microsoft.orleans.orleanscodegenerator.build/2.0.0-beta1-fix/microsoft.orleans.orleanscodegenerator.build.nuspec
  5. BIN
      nuget.exe
  6. 21
      src/Squidex.Infrastructure/CQRS/Events/Grains/EventConsumerInfo.cs
  7. 24
      src/Squidex.Infrastructure/CQRS/Events/Grains/IEventConsumerGrain.cs
  8. 27
      src/Squidex.Infrastructure/CQRS/Events/Grains/IEventConsumerRegistryGrain.cs
  9. 280
      src/Squidex.Infrastructure/CQRS/Events/Grains/Implementation/EventConsumerGrain.cs
  10. 57
      src/Squidex.Infrastructure/CQRS/Events/Grains/Implementation/EventConsumerRegistryGrain.cs
  11. 22
      src/Squidex.Infrastructure/CQRS/Events/Grains/Implementation/EventConsumerRegistryGrainState.cs
  12. 2
      src/Squidex.Infrastructure/CQRS/Events/IEventConsumer.cs
  13. 2
      src/Squidex.Infrastructure/Squidex.Infrastructure.csproj
  14. 9
      src/Squidex/Config/Domain/ReadModule.cs
  15. 11
      src/Squidex/Config/Orleans/IOrleansRunner.cs
  16. 45
      src/Squidex/Config/Orleans/OrleansModule.cs
  17. 42
      src/Squidex/Config/Orleans/OrleansMongoDbModule.cs
  18. 27
      src/Squidex/Config/Orleans/OrleansSilo.cs

1
NuGet.Config

@ -1,6 +1,7 @@
<?xml version="1.0" encoding="utf-8"?> <?xml version="1.0" encoding="utf-8"?>
<configuration> <configuration>
<packageSources> <packageSources>
<add key="localfeed" value="libs" />
<add key="nuget.org" value="https://api.nuget.org/v3/index.json" protocolVersion="3" /> <add key="nuget.org" value="https://api.nuget.org/v3/index.json" protocolVersion="3" />
</packageSources> </packageSources>
</configuration> </configuration>

BIN
libs/microsoft.orleans.orleanscodegenerator.build/2.0.0-beta1-fix/microsoft.orleans.orleanscodegenerator.build.2.0.0-beta1-fix.nupkg

Binary file not shown.

1
libs/microsoft.orleans.orleanscodegenerator.build/2.0.0-beta1-fix/microsoft.orleans.orleanscodegenerator.build.2.0.0-beta1-fix.nupkg.sha512

@ -0,0 +1 @@
5W20j9jiNog4dHUEt+cCnePb8z6jFEMnkwO4XilajM7FCnen3KTnN/G8PAUGuQieSlTI9MRe0sRYcafLJl900w==

23
libs/microsoft.orleans.orleanscodegenerator.build/2.0.0-beta1-fix/microsoft.orleans.orleanscodegenerator.build.nuspec

@ -0,0 +1,23 @@
<?xml version="1.0" encoding="utf-8"?>
<package xmlns="http://schemas.microsoft.com/packaging/2012/06/nuspec.xsd">
<metadata>
<id>Microsoft.Orleans.OrleansCodeGenerator.Build</id>
<version>2.0.0-beta1-fix</version>
<title>Microsoft Orleans Build-time Code Generator</title>
<authors>Microsoft</authors>
<owners>Microsoft</owners>
<requireLicenseAcceptance>false</requireLicenseAcceptance>
<developmentDependency>true</developmentDependency>
<licenseUrl>https://github.com/dotnet/Orleans#license</licenseUrl>
<projectUrl>https://github.com/dotnet/Orleans</projectUrl>
<iconUrl>https://raw.githubusercontent.com/dotnet/orleans/gh-pages/assets/logo_128.png</iconUrl>
<description>Microsoft Orleans build-time code generator to install in all grain interface &amp; implementation projects.</description>
<copyright>© Microsoft Corporation. All rights reserved.</copyright>
<tags>Orleans Cloud-Computing Actor-Model Actors Distributed-Systems C# .NET</tags>
<repository type="git" url="https://github.com/dotnet/Orleans" />
<dependencies>
<group targetFramework=".NETFramework4.6.1" />
<group targetFramework=".NETCoreApp2.0" />
</dependencies>
</metadata>
</package>

BIN
nuget.exe

Binary file not shown.

21
src/Squidex.Infrastructure/CQRS/Events/Grains/EventConsumerInfo.cs

@ -0,0 +1,21 @@
// ==========================================================================
// EventConsumerInfo.cs
// Squidex Headless CMS
// ==========================================================================
// Copyright (c) Squidex Group
// All rights reserved.
// ==========================================================================
namespace Squidex.Infrastructure.CQRS.Events.Grains
{
public sealed class EventConsumerInfo
{
public bool IsStopped { get; set; }
public string Name { get; set; }
public string Error { get; set; }
public string Position { get; set; }
}
}

24
src/Squidex.Infrastructure/CQRS/Events/Grains/IEventConsumerGrain.cs

@ -0,0 +1,24 @@
// ==========================================================================
// IEventConsumerGrain.cs
// Squidex Headless CMS
// ==========================================================================
// Copyright (c) Squidex Group
// All rights reserved.
// ==========================================================================
using System.Threading.Tasks;
using Orleans;
namespace Squidex.Infrastructure.CQRS.Events.Grains
{
public interface IEventConsumerGrain : IGrainWithStringKey, IEventSubscriber
{
Task<EventConsumerInfo> GetStateAsync();
Task StopAsync();
Task StartAsync();
Task ResetAsync();
}
}

27
src/Squidex.Infrastructure/CQRS/Events/Grains/IEventConsumerRegistryGrain.cs

@ -0,0 +1,27 @@
// ==========================================================================
// IEventConsumerRegistryGrain.cs
// Squidex Headless CMS
// ==========================================================================
// Copyright (c) Squidex Group
// All rights reserved.
// ==========================================================================
using System.Collections.Generic;
using System.Threading.Tasks;
using Orleans;
namespace Squidex.Infrastructure.CQRS.Events.Grains
{
public interface IEventConsumerRegistryGrain : IGrainWithStringKey
{
Task RegisterAsync(string consumerName);
Task StopAsync(string consumerName);
Task StartAsync(string consumerName);
Task ResetAsync(string consumerName);
Task<List<EventConsumerInfo>> GetConsumersAsync();
}
}

280
src/Squidex.Infrastructure/CQRS/Events/Grains/Implementation/EventConsumerGrain.cs

@ -0,0 +1,280 @@
// ==========================================================================
// EventConsumerGrain.cs
// Squidex Headless CMS
// ==========================================================================
// Copyright (c) Squidex Group
// All rights reserved.
// ==========================================================================
using System;
using System.Threading.Tasks;
using Orleans;
using Squidex.Infrastructure.Log;
using Squidex.Infrastructure.Tasks;
namespace Squidex.Infrastructure.CQRS.Events.Grains
{
public class EventConsumerGrain : Grain<EventConsumerInfo>, IEventSubscriber, IEventConsumerGrain
{
private readonly EventDataFormatter eventFormatter;
private readonly EventConsumerFactory eventConsumerFactory;
private readonly IEventStore eventStore;
private readonly ISemanticLog log;
private TaskFactory dispatcher;
private IEventSubscription currentSubscription;
private IEventConsumer eventConsumer;
public EventConsumerGrain(
EventDataFormatter eventFormatter,
EventConsumerFactory eventConsumerFactory,
IEventStore eventStore,
ISemanticLog log)
{
Guard.NotNull(log, nameof(log));
Guard.NotNull(eventStore, nameof(eventStore));
Guard.NotNull(eventFormatter, nameof(eventFormatter));
Guard.NotNull(eventConsumerFactory, nameof(eventConsumerFactory));
this.log = log;
this.eventStore = eventStore;
this.eventFormatter = eventFormatter;
this.eventConsumerFactory = eventConsumerFactory;
}
public override async Task OnActivateAsync()
{
dispatcher = new TaskFactory(TaskScheduler.Current);
await GrainFactory.GetGrain<IEventConsumerRegistryGrain>(string.Empty).RegisterAsync(this.IdentityString);
eventConsumer = eventConsumerFactory(this.IdentityString);
if (!State.IsStopped)
{
Subscribe(State.Position);
}
}
protected virtual IEventSubscription CreateSubscription(IEventStore eventStore, string streamFilter, string position)
{
return new RetrySubscription(eventStore, this, streamFilter, position);
}
private Task HandleEventAsync(IEventSubscription subscription, StoredEvent storedEvent)
{
if (subscription != currentSubscription)
{
return TaskHelper.Done;
}
return DoAndUpdateStateAsync(async () =>
{
var @event = ParseKnownEvent(storedEvent);
if (@event != null)
{
await DispatchConsumerAsync(@event);
}
State.Error = null;
State.Position = storedEvent.EventPosition;
});
}
private Task HandleErrorAsync(IEventSubscription subscription, Exception exception)
{
if (subscription != currentSubscription)
{
return TaskHelper.Done;
}
return DoAndUpdateStateAsync(() =>
{
Unsubscribe();
State.Error = exception.ToString();
State.IsStopped = true;
});
}
public Task<EventConsumerInfo> GetStateAsync()
{
return Task.FromResult(State);
}
public Task StartAsync()
{
if (!State.IsStopped)
{
return TaskHelper.Done;
}
return DoAndUpdateStateAsync(() =>
{
Subscribe(State.Position);
State.Error = null;
State.IsStopped = false;
});
}
public Task StopAsync()
{
if (State.IsStopped)
{
return TaskHelper.Done;
}
return DoAndUpdateStateAsync(() =>
{
Unsubscribe();
State.Error = null;
State.IsStopped = true;
});
}
public Task ResetAsync()
{
return DoAndUpdateStateAsync(async () =>
{
Unsubscribe();
await ClearAsync();
Subscribe(null);
State.Error = null;
State.Position = null;
State.IsStopped = false;
});
}
Task IEventSubscriber.OnEventAsync(IEventSubscription subscription, StoredEvent storedEvent)
{
return dispatcher.StartNew(() => this.HandleEventAsync(subscription, storedEvent)).Unwrap();
}
Task IEventSubscriber.OnErrorAsync(IEventSubscription subscription, Exception exception)
{
return dispatcher.StartNew(() => this.HandleErrorAsync(subscription, exception)).Unwrap();
}
private Task DoAndUpdateStateAsync(Action action)
{
return DoAndUpdateStateAsync(() => { action(); return TaskHelper.Done; });
}
private async Task DoAndUpdateStateAsync(Func<Task> action)
{
try
{
await action();
}
catch (Exception ex)
{
try
{
Unsubscribe();
}
catch (Exception unsubscribeException)
{
ex = new AggregateException(ex, unsubscribeException);
}
log.LogFatal(ex, w => w
.WriteProperty("action", "HandleEvent")
.WriteProperty("state", "Failed")
.WriteProperty("eventConsumer", eventConsumer.Name));
State.Error = ex.ToString();
State.IsStopped = true;
}
await WriteStateAsync();
}
private async Task ClearAsync()
{
var actionId = Guid.NewGuid().ToString();
log.LogInformation(w => w
.WriteProperty("action", "EventConsumerReset")
.WriteProperty("actionId", actionId)
.WriteProperty("state", "Started")
.WriteProperty("eventConsumer", eventConsumer.Name));
using (log.MeasureTrace(w => w
.WriteProperty("action", "EventConsumerReset")
.WriteProperty("actionId", actionId)
.WriteProperty("state", "Completed")
.WriteProperty("eventConsumer", eventConsumer.Name)))
{
await eventConsumer.ClearAsync();
}
}
private async Task DispatchConsumerAsync(Envelope<IEvent> @event)
{
var eventId = @event.Headers.EventId().ToString();
var eventType = @event.Payload.GetType().Name;
log.LogInformation(w => w
.WriteProperty("action", "HandleEvent")
.WriteProperty("actionId", eventId)
.WriteProperty("state", "Started")
.WriteProperty("eventId", eventId)
.WriteProperty("eventType", eventType)
.WriteProperty("eventConsumer", eventConsumer.Name));
using (log.MeasureTrace(w => w
.WriteProperty("action", "HandleEvent")
.WriteProperty("actionId", eventId)
.WriteProperty("state", "Completed")
.WriteProperty("eventId", eventId)
.WriteProperty("eventType", eventType)
.WriteProperty("eventConsumer", eventConsumer.Name)))
{
await eventConsumer.On(@event);
}
}
private void Unsubscribe()
{
if (currentSubscription != null)
{
currentSubscription.StopAsync().Forget();
currentSubscription = null;
}
}
private void Subscribe(string position)
{
if (currentSubscription == null)
{
currentSubscription?.StopAsync().Forget();
currentSubscription = CreateSubscription(eventStore, eventConsumer.EventsFilter, position);
}
}
private Envelope<IEvent> ParseKnownEvent(StoredEvent message)
{
try
{
var @event = eventFormatter.Parse(message.Data);
@event.SetEventPosition(message.EventPosition);
@event.SetEventStreamNumber(message.EventStreamNumber);
return @event;
}
catch (TypeNameNotFoundException)
{
log.LogDebug(w => w.WriteProperty("oldEventFound", message.Data.Type));
return null;
}
}
}
}

57
src/Squidex.Infrastructure/CQRS/Events/Grains/Implementation/EventConsumerRegistryGrain.cs

@ -0,0 +1,57 @@
// ==========================================================================
// EventConsumerRegistryGrain.cs
// Squidex Headless CMS
// ==========================================================================
// Copyright (c) Squidex Group
// All rights reserved.
// ==========================================================================
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using Orleans;
using Squidex.Infrastructure.Tasks;
namespace Squidex.Infrastructure.CQRS.Events.Grains.Implementation
{
public sealed class EventConsumerRegistryGrain : Grain<EventConsumerRegistryGrainState>, IEventConsumerRegistryGrain
{
public Task<List<EventConsumerInfo>> GetConsumersAsync()
{
var tasks =
State.EventConsumerNames
.Select(n => GrainFactory.GetGrain<IEventConsumerGrain>(n))
.Select(c => c.GetStateAsync());
return Task.WhenAll(tasks).ContinueWith(x => x.Result.ToList());
}
public Task RegisterAsync(string consumerName)
{
State.EventConsumerNames.Add(consumerName);
return TaskHelper.Done;
}
public Task ResetAsync(string consumerName)
{
var eventConsumer = GrainFactory.GetGrain<IEventConsumerGrain>(consumerName);
return eventConsumer.ResetAsync();
}
public Task StartAsync(string consumerName)
{
var eventConsumer = GrainFactory.GetGrain<IEventConsumerGrain>(consumerName);
return eventConsumer.StartAsync();
}
public Task StopAsync(string consumerName)
{
var eventConsumer = GrainFactory.GetGrain<IEventConsumerGrain>(consumerName);
return eventConsumer.StopAsync();
}
}
}

22
src/Squidex.Infrastructure/CQRS/Events/Grains/Implementation/EventConsumerRegistryGrainState.cs

@ -0,0 +1,22 @@
// ==========================================================================
// EventConsumerRegistryGrainState.cs
// Squidex Headless CMS
// ==========================================================================
// Copyright (c) Squidex Group
// All rights reserved.
// ==========================================================================
using System.Collections.Generic;
namespace Squidex.Infrastructure.CQRS.Events.Grains.Implementation
{
public sealed class EventConsumerRegistryGrainState
{
public HashSet<string> EventConsumerNames { get; set; }
public EventConsumerRegistryGrainState()
{
EventConsumerNames = new HashSet<string>();
}
}
}

2
src/Squidex.Infrastructure/CQRS/Events/IEventConsumer.cs

@ -10,6 +10,8 @@ using System.Threading.Tasks;
namespace Squidex.Infrastructure.CQRS.Events namespace Squidex.Infrastructure.CQRS.Events
{ {
public delegate IEventConsumer EventConsumerFactory(string name);
public interface IEventConsumer public interface IEventConsumer
{ {
string Name { get; } string Name { get; }

2
src/Squidex.Infrastructure/Squidex.Infrastructure.csproj

@ -10,6 +10,8 @@
<ItemGroup> <ItemGroup>
<PackageReference Include="Microsoft.Extensions.Caching.Abstractions" Version="2.0.0" /> <PackageReference Include="Microsoft.Extensions.Caching.Abstractions" Version="2.0.0" />
<PackageReference Include="Microsoft.Extensions.Logging" Version="2.0.0" /> <PackageReference Include="Microsoft.Extensions.Logging" Version="2.0.0" />
<PackageReference Include="Microsoft.Orleans.Core" Version="2.0.0-beta1" />
<PackageReference Include="Microsoft.Orleans.OrleansCodeGenerator.Build" Version="2.0.0-beta1-fix" />
<PackageReference Include="Newtonsoft.Json" Version="10.0.3" /> <PackageReference Include="Newtonsoft.Json" Version="10.0.3" />
<PackageReference Include="NodaTime" Version="2.2.1" /> <PackageReference Include="NodaTime" Version="2.2.1" />
<PackageReference Include="RefactoringEssentials" Version="5.2.0" /> <PackageReference Include="RefactoringEssentials" Version="5.2.0" />

9
src/Squidex/Config/Domain/ReadModule.cs

@ -134,6 +134,15 @@ namespace Squidex.Config.Domain
builder.RegisterType<EdmModelBuilder>() builder.RegisterType<EdmModelBuilder>()
.AsSelf() .AsSelf()
.SingleInstance(); .SingleInstance();
builder.Register(c =>
{
var eventConsumers = c.Resolve<IEnumerable<IEventConsumer>>();
return new EventConsumerFactory(x => eventConsumers.First(e => e.Name == x));
})
.AsSelf()
.SingleInstance();
} }
} }
} }

11
src/Squidex/Config/Orleans/IOrleansRunner.cs

@ -0,0 +1,11 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
namespace Squidex.Config.Orleans
{
public class IOrleansRunner
{
}
}

45
src/Squidex/Config/Orleans/OrleansModule.cs

@ -0,0 +1,45 @@
// ==========================================================================
// OrleansModule.cs
// Squidex Headless CMS
// ==========================================================================
// Copyright (c) Squidex Group
// All rights reserved.
// ==========================================================================
using System;
using Autofac;
using Microsoft.Extensions.Configuration;
using Squidex.Config.Domain;
using Squidex.Infrastructure;
namespace Squidex.Config.Orleans
{
public sealed class OrleansModule : Module
{
private IConfiguration Configuration { get; }
public OrleansModule(IConfiguration configuration)
{
Configuration = configuration;
}
protected override void Load(ContainerBuilder builder)
{
var storeType = Configuration.GetValue<string>("orleans:type");
if (string.IsNullOrWhiteSpace(storeType))
{
throw new ConfigurationException("Configure Orleans type with 'orleans:type'.");
}
if (string.Equals(storeType, "MongoDB", StringComparison.OrdinalIgnoreCase))
{
builder.RegisterModule(new StoreMongoDbModule(Configuration));
}
else
{
throw new ConfigurationException($"Unsupported value '{storeType}' for 'stores:type', supported: MongoDb.");
}
}
}
}

42
src/Squidex/Config/Orleans/OrleansMongoDbModule.cs

@ -0,0 +1,42 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using Autofac;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.Options;
using Orleans.Providers.MongoDB;
namespace Squidex.Config.Orleans
{
public sealed class OrleansMongoDbModule : Module
{
private IConfiguration Configuration { get; }
public OrleansMongoDbModule(IConfiguration configuration)
{
Configuration = configuration;
}
protected override void Load(ContainerBuilder builder)
{
var mongoConfig = Configuration.GetSection("orleans:mongoDb");
builder.RegisterInstance(Options.Create(mongoConfig.Get<MongoDBGatewayListProviderOptions>()))
.As<IOptions<MongoDBGatewayListProviderOptions>>()
.SingleInstance();
builder.RegisterInstance(Options.Create(mongoConfig.Get<MongoDBMembershipTableOptions>()))
.As<IOptions<MongoDBMembershipTableOptions>>()
.SingleInstance();
builder.RegisterInstance(Options.Create(mongoConfig.Get<MongoDBRemindersOptions>()))
.As<IOptions<MongoDBRemindersOptions>>()
.SingleInstance();
builder.RegisterInstance(Options.Create(mongoConfig.Get<MongoDBStatisticsOptions>()))
.As<IOptions<MongoDBStatisticsOptions>>()
.SingleInstance();
}
}
}

27
src/Squidex/Config/Orleans/OrleansSilo.cs

@ -0,0 +1,27 @@
// ==========================================================================
// OrleansSilo.cs
// Squidex Headless CMS
// ==========================================================================
// Copyright (c) Squidex Group
// All rights reserved.
// ==========================================================================
using Orleans.Hosting;
using System;
using System.Threading.Tasks;
namespace Squidex.Config.Orleans
{
public sealed class OrleansSilo
{
private readonly IServiceProvider serviceProvider;
public Task RunAsync()
{
new SiloHostBuilder()
.ConfigureLocalHostPrimarySilo(33333)
.ConfigureSiloName("Squidex")
.UseServiceProviderFactory()
}
}
}
Loading…
Cancel
Save