Browse Source

GCE

pull/1/head
Sebastian 9 years ago
parent
commit
4debcc7e78
  1. 7
      Squidex.sln
  2. 97
      src/Squidex.Infrastructure.GoogleCloud/GoogleCloudPubSub.cs
  3. 135
      src/Squidex.Infrastructure.GoogleCloud/GoogleCloudSubscription.cs
  4. 19
      src/Squidex.Infrastructure.GoogleCloud/InfrastructureErrors.cs
  5. 19
      src/Squidex.Infrastructure.GoogleCloud/Properties/AssemblyInfo.cs
  6. 21
      src/Squidex.Infrastructure.GoogleCloud/Squidex.Infrastructure.GoogleCloud.xproj
  7. 24
      src/Squidex.Infrastructure.GoogleCloud/project.json
  8. 4
      src/Squidex.Infrastructure.Redis/InfrastructureErrors.cs
  9. 10
      src/Squidex.Infrastructure.Redis/RedisPubSub.cs
  10. 4
      src/Squidex.Infrastructure.Redis/RedisSubscription.cs
  11. 6
      src/Squidex.Infrastructure/Caching/InvalidatingMemoryCache.cs
  12. 31
      src/Squidex/Config/Domain/ClusterModule.cs
  13. 6
      src/Squidex/Config/Domain/InfrastructureModule.cs
  14. 4
      src/Squidex/Properties/launchSettings.json
  15. 1
      src/Squidex/project.json

7
Squidex.sln

@ -38,6 +38,8 @@ Project("{8BB2217D-0F2D-49D1-97BC-3654ED321F3B}") = "Squidex.Read.Tests", "tests
EndProject
Project("{8BB2217D-0F2D-49D1-97BC-3654ED321F3B}") = "Squidex.Infrastructure.Redis", "src\Squidex.Infrastructure.Redis\Squidex.Infrastructure.Redis.xproj", "{D7166C56-178A-4457-B56A-C615C7450DEE}"
EndProject
Project("{8BB2217D-0F2D-49D1-97BC-3654ED321F3B}") = "Squidex.Infrastructure.GoogleCloud", "src\Squidex.Infrastructure.GoogleCloud\Squidex.Infrastructure.GoogleCloud.xproj", "{4A80390E-507A-4477-8A10-BE89A7427232}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
@ -96,6 +98,10 @@ Global
{D7166C56-178A-4457-B56A-C615C7450DEE}.Debug|Any CPU.Build.0 = Debug|Any CPU
{D7166C56-178A-4457-B56A-C615C7450DEE}.Release|Any CPU.ActiveCfg = Release|Any CPU
{D7166C56-178A-4457-B56A-C615C7450DEE}.Release|Any CPU.Build.0 = Release|Any CPU
{4A80390E-507A-4477-8A10-BE89A7427232}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{4A80390E-507A-4477-8A10-BE89A7427232}.Debug|Any CPU.Build.0 = Debug|Any CPU
{4A80390E-507A-4477-8A10-BE89A7427232}.Release|Any CPU.ActiveCfg = Release|Any CPU
{4A80390E-507A-4477-8A10-BE89A7427232}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
@ -113,5 +119,6 @@ Global
{6A811927-3C37-430A-90F4-503E37123956} = {8CF53B92-5EB1-461D-98F8-70DA9B603FBF}
{8B074219-F69A-4E41-83C6-12EE1E647779} = {4C6B06C2-6D77-4E0E-AE32-D7050236433A}
{D7166C56-178A-4457-B56A-C615C7450DEE} = {8CF53B92-5EB1-461D-98F8-70DA9B603FBF}
{4A80390E-507A-4477-8A10-BE89A7427232} = {8CF53B92-5EB1-461D-98F8-70DA9B603FBF}
EndGlobalSection
EndGlobal

97
src/Squidex.Infrastructure.GoogleCloud/GoogleCloudPubSub.cs

@ -0,0 +1,97 @@
// ==========================================================================
// GoogleCloudInvalidator.cs
// Squidex Headless CMS
// ==========================================================================
// Copyright (c) Squidex Group
// All rights reserved.
// ==========================================================================
using System;
using System.Collections.Concurrent;
using Google.Cloud.PubSub.V1;
using Grpc.Core;
using Microsoft.Extensions.Logging;
namespace Squidex.Infrastructure.GoogleCloud
{
public class GoogleCloudPubSub : DisposableObject, IPubSub, IExternalSystem
{
private readonly ProjectName projectName;
private readonly ILogger<GoogleCloudPubSub> logger;
private readonly ConcurrentDictionary<string, GoogleCloudSubscription> subscriptions = new ConcurrentDictionary<string, GoogleCloudSubscription>();
private readonly PublisherClient publisher = PublisherClient.Create();
public GoogleCloudPubSub(ProjectName projectName, ILogger<GoogleCloudPubSub> logger)
{
Guard.NotNull(projectName, nameof(projectName));
Guard.NotNull(logger, nameof(logger));
this.projectName = projectName;
this.logger = logger;
}
protected override void DisposeObject(bool disposing)
{
foreach (var subscription in subscriptions.Values)
{
subscription.Dispose();
}
}
public void Connect()
{
try
{
try
{
publisher.CreateTopic(new TopicName(projectName.ProjectId, "connection-test"));
}
catch (RpcException e)
{
if (e.Status.StatusCode != StatusCode.AlreadyExists)
{
throw;
}
}
}
catch (Exception ex)
{
throw new ConfigurationException($"GoogleCloud connection failed to connect to project {projectName.ProjectId}", ex);
}
}
public void Publish(string channelName, string token, bool notifySelf)
{
Guard.NotNull(channelName, nameof(channelName));
subscriptions.GetOrAdd(channelName, Create).Publish(token, notifySelf);
}
public IDisposable Subscribe(string channelName, Action<string> handler)
{
Guard.NotNull(channelName, nameof(channelName));
return subscriptions.GetOrAdd(channelName, Create).Subscribe(handler);
}
private GoogleCloudSubscription Create(string channelName)
{
var topicName = new TopicName(projectName.ProjectId, channelName);
try
{
publisher.CreateTopic(topicName);
}
catch (RpcException e)
{
if (e.Status.StatusCode != StatusCode.AlreadyExists)
{
throw;
}
}
return new GoogleCloudSubscription(topicName, logger);
}
}
}

135
src/Squidex.Infrastructure.GoogleCloud/GoogleCloudSubscription.cs

@ -0,0 +1,135 @@
// ==========================================================================
// GoogleCloudSubscription.cs
// Squidex Headless CMS
// ==========================================================================
// Copyright (c) Squidex Group
// All rights reserved.
// ==========================================================================
using System;
using System.Linq;
using System.Reactive.Subjects;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Google.Cloud.PubSub.V1;
using Google.Protobuf;
using Grpc.Core;
using Microsoft.Extensions.Logging;
// ReSharper disable InvertIf
namespace Squidex.Infrastructure.GoogleCloud
{
public class GoogleCloudSubscription : DisposableObject
{
private static readonly Guid InstanceId = Guid.NewGuid();
private const string EmptyData = "Empty";
private readonly Subject<string> subject = new Subject<string>();
private readonly PublisherClient publisher = PublisherClient.Create();
private readonly TopicName topicName;
private readonly ILogger<GoogleCloudPubSub> logger;
private readonly Task pullTask;
private readonly CancellationTokenSource completionToken = new CancellationTokenSource();
public GoogleCloudSubscription(TopicName topicName, ILogger<GoogleCloudPubSub> logger)
{
this.topicName = topicName;
this.logger = logger;
pullTask = PullAsync();
}
protected override void DisposeObject(bool disposing)
{
completionToken.Cancel();
pullTask.Wait();
}
public void Publish(string token, bool notifySelf)
{
try
{
if (string.IsNullOrWhiteSpace(token))
{
token = EmptyData;
}
var message = new PubsubMessage
{
Attributes =
{
{ "Sender", (notifySelf ? Guid.Empty : InstanceId).ToString() }
},
Data = ByteString.CopyFromUtf8(token)
};
publisher.Publish(topicName, new [] { message });
}
catch (Exception ex)
{
logger.LogError(InfrastructureErrors.InvalidatingReceivedFailed, ex, "Failed to send invalidation message {0}", token);
}
}
private async Task PullAsync()
{
var subscriber = SubscriberClient.Create();
var subscriptionName = new SubscriptionName(topicName.ProjectId, "squidex-" + Guid.NewGuid());
await subscriber.CreateSubscriptionAsync(subscriptionName, topicName, null, 60);
try
{
while (!completionToken.IsCancellationRequested)
{
try
{
var response = await subscriber.PullAsync(subscriptionName, false, int.MaxValue, completionToken.Token);
foreach (var receivedMessage in response.ReceivedMessages)
{
var token = receivedMessage.Message.Data.ToString(Encoding.UTF8);
Guid sender;
if (!receivedMessage.Message.Attributes.ContainsKey("Sender") || !Guid.TryParse(receivedMessage.Message.Attributes["Sender"], out sender))
{
return;
}
if (sender != InstanceId)
{
subject.OnNext(token);
}
}
await subscriber.AcknowledgeAsync(subscriptionName, response.ReceivedMessages.Select(m => m.AckId));
}
catch (RpcException e)
{
if (e.Status.StatusCode == StatusCode.DeadlineExceeded)
{
continue;
}
}
}
}
catch (TaskCanceledException)
{
logger.LogWarning("Pull process has been cancelled.");
}
finally
{
await subscriber.DeleteSubscriptionAsync(subscriptionName);
}
}
public IDisposable Subscribe(Action<string> handler)
{
return subject.Subscribe(handler);
}
}
}

19
src/Squidex.Infrastructure.GoogleCloud/InfrastructureErrors.cs

@ -0,0 +1,19 @@
// ==========================================================================
// InfrastructureErrors.cs
// Squidex Headless CMS
// ==========================================================================
// Copyright (c) Squidex Group
// All rights reserved.
// ==========================================================================
using Microsoft.Extensions.Logging;
namespace Squidex.Infrastructure.GoogleCloud
{
public class InfrastructureErrors
{
public static readonly EventId InvalidatingReceivedFailed = new EventId(40001, "InvalidingReceivedFailed");
public static readonly EventId InvalidatingPublishedFailed = new EventId(40002, "InvalidatingPublishedFailed");
}
}

19
src/Squidex.Infrastructure.GoogleCloud/Properties/AssemblyInfo.cs

@ -0,0 +1,19 @@
using System.Reflection;
using System.Runtime.CompilerServices;
using System.Runtime.InteropServices;
// General Information about an assembly is controlled through the following
// set of attributes. Change these attribute values to modify the information
// associated with an assembly.
[assembly: AssemblyConfiguration("")]
[assembly: AssemblyCompany("")]
[assembly: AssemblyProduct("Squidex.Infrastructure.GoogleCloud")]
[assembly: AssemblyTrademark("")]
// Setting ComVisible to false makes the types in this assembly not visible
// to COM components. If you need to access a type in this assembly from
// COM, set the ComVisible attribute to true on that type.
[assembly: ComVisible(false)]
// The following GUID is for the ID of the typelib if this project is exposed to COM
[assembly: Guid("4a80390e-507a-4477-8a10-be89a7427232")]

21
src/Squidex.Infrastructure.GoogleCloud/Squidex.Infrastructure.GoogleCloud.xproj

@ -0,0 +1,21 @@
<?xml version="1.0" encoding="utf-8"?>
<Project ToolsVersion="14.0" DefaultTargets="Build" xmlns="http://schemas.microsoft.com/developer/msbuild/2003">
<PropertyGroup>
<VisualStudioVersion Condition="'$(VisualStudioVersion)' == ''">14.0</VisualStudioVersion>
<VSToolsPath Condition="'$(VSToolsPath)' == ''">$(MSBuildExtensionsPath32)\Microsoft\VisualStudio\v$(VisualStudioVersion)</VSToolsPath>
</PropertyGroup>
<Import Project="$(VSToolsPath)\DotNet\Microsoft.DotNet.Props" Condition="'$(VSToolsPath)' != ''" />
<PropertyGroup Label="Globals">
<ProjectGuid>4a80390e-507a-4477-8a10-be89a7427232</ProjectGuid>
<RootNamespace>Squidex.Infrastructure.GoogleCloud</RootNamespace>
<BaseIntermediateOutputPath Condition="'$(BaseIntermediateOutputPath)'=='' ">.\obj</BaseIntermediateOutputPath>
<OutputPath Condition="'$(OutputPath)'=='' ">.\bin\</OutputPath>
<TargetFrameworkVersion>v4.6.1</TargetFrameworkVersion>
</PropertyGroup>
<PropertyGroup>
<SchemaVersion>2.0</SchemaVersion>
</PropertyGroup>
<Import Project="$(VSToolsPath)\DotNet\Microsoft.DotNet.targets" Condition="'$(VSToolsPath)' != ''" />
</Project>

24
src/Squidex.Infrastructure.GoogleCloud/project.json

@ -0,0 +1,24 @@
{
"version": "1.0.0-*",
"dependencies": {
"Google.Cloud.PubSub.V1": "1.0.0-beta06",
"Microsoft.Extensions.Caching.Abstractions": "1.1.0",
"Microsoft.Extensions.Logging": "1.1.0",
"Squidex.Infrastructure": "1.0.0-*",
"System.Linq": "4.3.0",
"System.Reactive": "3.1.1",
"System.Reflection.TypeExtensions": "4.3.0",
"System.Security.Claims": "4.3.0"
},
"frameworks": {
"netstandard1.6": {
"dependencies": {
"NETStandard.Library": "1.6.1"
},
"imports": "dnxcore50"
}
},
"tooling": {
"defaultNamespace": "Squidex.Infrastructure.Redis"
}
}

4
src/Squidex.Infrastructure.Redis/InfrastructureErrors.cs

@ -12,8 +12,8 @@ namespace Squidex.Infrastructure.Redis
{
public class InfrastructureErrors
{
public static readonly EventId InvalidatingReceivedFailed = new EventId(10001, "InvalidingReceivedFailed");
public static readonly EventId InvalidatingReceivedFailed = new EventId(50001, "InvalidingReceivedFailed");
public static readonly EventId InvalidatingPublishedFailed = new EventId(10002, "InvalidatingPublishedFailed");
public static readonly EventId InvalidatingPublishedFailed = new EventId(50002, "InvalidatingPublishedFailed");
}
}

10
src/Squidex.Infrastructure.Redis/RedisPubSub.cs

@ -15,12 +15,12 @@ namespace Squidex.Infrastructure.Redis
{
public class RedisPubSub : IPubSub, IExternalSystem
{
private readonly ConnectionMultiplexer redis;
private readonly ConcurrentDictionary<string, RedisSubscription> subjects = new ConcurrentDictionary<string, RedisSubscription>();
private readonly ConcurrentDictionary<string, RedisSubscription> subscriptions = new ConcurrentDictionary<string, RedisSubscription>();
private readonly IConnectionMultiplexer redis;
private readonly ILogger<RedisPubSub> logger;
private readonly ISubscriber subscriber;
public RedisPubSub(ConnectionMultiplexer redis, ILogger<RedisPubSub> logger)
public RedisPubSub(IConnectionMultiplexer redis, ILogger<RedisPubSub> logger)
{
Guard.NotNull(redis, nameof(redis));
Guard.NotNull(logger, nameof(logger));
@ -48,14 +48,14 @@ namespace Squidex.Infrastructure.Redis
{
Guard.NotNullOrEmpty(channelName, nameof(channelName));
subjects.GetOrAdd(channelName, c => new RedisSubscription(subscriber, c, logger)).Invalidate(token, notifySelf);
subscriptions.GetOrAdd(channelName, c => new RedisSubscription(subscriber, c, logger)).Publish(token, notifySelf);
}
public IDisposable Subscribe(string channelName, Action<string> handler)
{
Guard.NotNullOrEmpty(channelName, nameof(channelName));
return subjects.GetOrAdd(channelName, c => new RedisSubscription(subscriber, c, logger)).Subscribe(handler);
return subscriptions.GetOrAdd(channelName, c => new RedisSubscription(subscriber, c, logger)).Subscribe(handler);
}
}
}

4
src/Squidex.Infrastructure.Redis/RedisSubscription.cs

@ -34,7 +34,7 @@ namespace Squidex.Infrastructure.Redis
this.channelName = channelName;
}
public void Invalidate(string token, bool notifySelf)
public void Publish(string token, bool notifySelf)
{
try
{
@ -59,7 +59,7 @@ namespace Squidex.Infrastructure.Redis
var parts = value.Split('#');
if (parts.Length < 2)
if (parts.Length < 1)
{
return;
}

6
src/Squidex.Infrastructure/Caching/InvalidatingCache.cs → src/Squidex.Infrastructure/Caching/InvalidatingMemoryCache.cs

@ -10,13 +10,13 @@ using Microsoft.Extensions.Caching.Memory;
namespace Squidex.Infrastructure.Caching
{
public class InvalidatingCache : IMemoryCache
public class InvalidatingMemoryCache : IMemoryCache
{
private const string ChannelName = "CacheInvalidations";
private readonly IMemoryCache inner;
private readonly IPubSub invalidator;
public InvalidatingCache(IMemoryCache inner, IPubSub invalidator)
public InvalidatingMemoryCache(IMemoryCache inner, IPubSub invalidator)
{
Guard.NotNull(inner, nameof(inner));
Guard.NotNull(invalidator, nameof(invalidator));
@ -24,7 +24,7 @@ namespace Squidex.Infrastructure.Caching
this.inner = inner;
this.invalidator = invalidator;
invalidator.Subscribe(ChannelName, Remove);
invalidator.Subscribe(ChannelName, inner.Remove);
}
public void Dispose()

31
src/Squidex/Config/Domain/ClusterModule.cs

@ -8,9 +8,11 @@
using System;
using Autofac;
using Google.Cloud.PubSub.V1;
using Microsoft.Extensions.Configuration;
using Squidex.Infrastructure;
using Squidex.Infrastructure.CQRS.Events;
using Squidex.Infrastructure.GoogleCloud;
using Squidex.Infrastructure.Redis;
using StackExchange.Redis;
@ -67,10 +69,35 @@ namespace Squidex.Config.Domain
builder.RegisterType<RedisPubSub>()
.As<IPubSub>()
.As<IEventNotifier>()
.As<IExternalSystem>()
.SingleInstance();
}
else if (!string.Equals(clustererType, "None", StringComparison.OrdinalIgnoreCase))
else if (string.Equals(clustererType, "GCE", StringComparison.OrdinalIgnoreCase))
{
var projectId = Configuration.GetValue<string>("squidex:clusterer:gce:projectId");
if (string.IsNullOrWhiteSpace(projectId))
{
throw new ConfigurationException("You must specify the Google cloud engine project id in the 'squidex:clusterer:gce:projectId' configuration section.");
}
builder.RegisterInstance(new ProjectName(projectId))
.AsSelf()
.SingleInstance();
builder.RegisterType<GoogleCloudPubSub>()
.As<IPubSub>()
.As<IExternalSystem>()
.SingleInstance();
}
else if (string.Equals(clustererType, "None", StringComparison.OrdinalIgnoreCase))
{
builder.RegisterType<InMemoryPubSub>()
.As<IPubSub>()
.SingleInstance();
}
else
{
throw new ConfigurationException($"Unsupported clusterer type '{clustererType}' for key 'squidex:clusterer:type', supported: Redis, None.");
}

6
src/Squidex/Config/Domain/InfrastructureModule.cs

@ -56,10 +56,6 @@ namespace Squidex.Config.Domain
.As<ICommandBus>()
.SingleInstance();
builder.RegisterType<InMemoryPubSub>()
.As<IPubSub>()
.SingleInstance();
builder.RegisterType<DefaultMemoryEventNotifier>()
.As<IEventNotifier>()
.SingleInstance();
@ -80,7 +76,7 @@ namespace Squidex.Config.Domain
.AsSelf()
.SingleInstance();
builder.Register(c => new InvalidatingCache(new MemoryCache(c.Resolve<IOptions<MemoryCacheOptions>>()), c.Resolve<IPubSub>()))
builder.Register(c => new InvalidatingMemoryCache(new MemoryCache(c.Resolve<IOptions<MemoryCacheOptions>>()), c.Resolve<IPubSub>()))
.As<IMemoryCache>()
.SingleInstance();
}

4
src/Squidex/Properties/launchSettings.json

@ -18,7 +18,9 @@
"commandName": "Project",
"launchUrl": "http://localhost:5000",
"environmentVariables": {
"ASPNETCORE_ENVIRONMENT": "Development"
"SQUIDEX__CLUSTERER__GCE__PROJECTID": "squidex-157415",
"ASPNETCORE_ENVIRONMENT": "Development",
"SQUIDEX__CLUSTERER__TYPE": "gce"
}
}
}

1
src/Squidex/project.json

@ -31,6 +31,7 @@
"Squidex.Core": "1.0.0-*",
"Squidex.Events": "1.0.0-*",
"Squidex.Infrastructure": "1.0.0-*",
"Squidex.Infrastructure.GoogleCloud": "1.0.0-*",
"Squidex.Infrastructure.MongoDb": "1.0.0-*",
"Squidex.Infrastructure.Redis": "1.0.0-*",
"Squidex.Read": "1.0.0-*",

Loading…
Cancel
Save