From 4debcc7e783838e46c7f0a8ba5f8f6e6a80ac468 Mon Sep 17 00:00:00 2001 From: Sebastian Date: Sun, 12 Feb 2017 17:49:42 +0100 Subject: [PATCH] GCE --- Squidex.sln | 7 + .../GoogleCloudPubSub.cs | 97 +++++++++++++ .../GoogleCloudSubscription.cs | 135 ++++++++++++++++++ .../InfrastructureErrors.cs | 19 +++ .../Properties/AssemblyInfo.cs | 19 +++ .../Squidex.Infrastructure.GoogleCloud.xproj | 21 +++ .../project.json | 24 ++++ .../InfrastructureErrors.cs | 4 +- .../RedisPubSub.cs | 10 +- .../RedisSubscription.cs | 4 +- ...ingCache.cs => InvalidatingMemoryCache.cs} | 6 +- src/Squidex/Config/Domain/ClusterModule.cs | 31 +++- .../Config/Domain/InfrastructureModule.cs | 6 +- src/Squidex/Properties/launchSettings.json | 4 +- src/Squidex/project.json | 1 + 15 files changed, 368 insertions(+), 20 deletions(-) create mode 100644 src/Squidex.Infrastructure.GoogleCloud/GoogleCloudPubSub.cs create mode 100644 src/Squidex.Infrastructure.GoogleCloud/GoogleCloudSubscription.cs create mode 100644 src/Squidex.Infrastructure.GoogleCloud/InfrastructureErrors.cs create mode 100644 src/Squidex.Infrastructure.GoogleCloud/Properties/AssemblyInfo.cs create mode 100644 src/Squidex.Infrastructure.GoogleCloud/Squidex.Infrastructure.GoogleCloud.xproj create mode 100644 src/Squidex.Infrastructure.GoogleCloud/project.json rename src/Squidex.Infrastructure/Caching/{InvalidatingCache.cs => InvalidatingMemoryCache.cs} (88%) diff --git a/Squidex.sln b/Squidex.sln index 133daf848..da5531380 100644 --- a/Squidex.sln +++ b/Squidex.sln @@ -38,6 +38,8 @@ Project("{8BB2217D-0F2D-49D1-97BC-3654ED321F3B}") = "Squidex.Read.Tests", "tests EndProject Project("{8BB2217D-0F2D-49D1-97BC-3654ED321F3B}") = "Squidex.Infrastructure.Redis", "src\Squidex.Infrastructure.Redis\Squidex.Infrastructure.Redis.xproj", "{D7166C56-178A-4457-B56A-C615C7450DEE}" EndProject +Project("{8BB2217D-0F2D-49D1-97BC-3654ED321F3B}") = "Squidex.Infrastructure.GoogleCloud", "src\Squidex.Infrastructure.GoogleCloud\Squidex.Infrastructure.GoogleCloud.xproj", "{4A80390E-507A-4477-8A10-BE89A7427232}" +EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution Debug|Any CPU = Debug|Any CPU @@ -96,6 +98,10 @@ Global {D7166C56-178A-4457-B56A-C615C7450DEE}.Debug|Any CPU.Build.0 = Debug|Any CPU {D7166C56-178A-4457-B56A-C615C7450DEE}.Release|Any CPU.ActiveCfg = Release|Any CPU {D7166C56-178A-4457-B56A-C615C7450DEE}.Release|Any CPU.Build.0 = Release|Any CPU + {4A80390E-507A-4477-8A10-BE89A7427232}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {4A80390E-507A-4477-8A10-BE89A7427232}.Debug|Any CPU.Build.0 = Debug|Any CPU + {4A80390E-507A-4477-8A10-BE89A7427232}.Release|Any CPU.ActiveCfg = Release|Any CPU + {4A80390E-507A-4477-8A10-BE89A7427232}.Release|Any CPU.Build.0 = Release|Any CPU EndGlobalSection GlobalSection(SolutionProperties) = preSolution HideSolutionNode = FALSE @@ -113,5 +119,6 @@ Global {6A811927-3C37-430A-90F4-503E37123956} = {8CF53B92-5EB1-461D-98F8-70DA9B603FBF} {8B074219-F69A-4E41-83C6-12EE1E647779} = {4C6B06C2-6D77-4E0E-AE32-D7050236433A} {D7166C56-178A-4457-B56A-C615C7450DEE} = {8CF53B92-5EB1-461D-98F8-70DA9B603FBF} + {4A80390E-507A-4477-8A10-BE89A7427232} = {8CF53B92-5EB1-461D-98F8-70DA9B603FBF} EndGlobalSection EndGlobal diff --git a/src/Squidex.Infrastructure.GoogleCloud/GoogleCloudPubSub.cs b/src/Squidex.Infrastructure.GoogleCloud/GoogleCloudPubSub.cs new file mode 100644 index 000000000..ec9eb4545 --- /dev/null +++ b/src/Squidex.Infrastructure.GoogleCloud/GoogleCloudPubSub.cs @@ -0,0 +1,97 @@ +// ========================================================================== +// GoogleCloudInvalidator.cs +// Squidex Headless CMS +// ========================================================================== +// Copyright (c) Squidex Group +// All rights reserved. +// ========================================================================== + +using System; +using System.Collections.Concurrent; +using Google.Cloud.PubSub.V1; +using Grpc.Core; +using Microsoft.Extensions.Logging; + +namespace Squidex.Infrastructure.GoogleCloud +{ + public class GoogleCloudPubSub : DisposableObject, IPubSub, IExternalSystem + { + private readonly ProjectName projectName; + private readonly ILogger logger; + private readonly ConcurrentDictionary subscriptions = new ConcurrentDictionary(); + private readonly PublisherClient publisher = PublisherClient.Create(); + + public GoogleCloudPubSub(ProjectName projectName, ILogger logger) + { + Guard.NotNull(projectName, nameof(projectName)); + Guard.NotNull(logger, nameof(logger)); + + this.projectName = projectName; + + this.logger = logger; + } + + protected override void DisposeObject(bool disposing) + { + foreach (var subscription in subscriptions.Values) + { + subscription.Dispose(); + } + } + + public void Connect() + { + try + { + try + { + publisher.CreateTopic(new TopicName(projectName.ProjectId, "connection-test")); + } + catch (RpcException e) + { + if (e.Status.StatusCode != StatusCode.AlreadyExists) + { + throw; + } + } + } + catch (Exception ex) + { + throw new ConfigurationException($"GoogleCloud connection failed to connect to project {projectName.ProjectId}", ex); + } + } + + public void Publish(string channelName, string token, bool notifySelf) + { + Guard.NotNull(channelName, nameof(channelName)); + + subscriptions.GetOrAdd(channelName, Create).Publish(token, notifySelf); + } + + public IDisposable Subscribe(string channelName, Action handler) + { + Guard.NotNull(channelName, nameof(channelName)); + + return subscriptions.GetOrAdd(channelName, Create).Subscribe(handler); + } + + private GoogleCloudSubscription Create(string channelName) + { + var topicName = new TopicName(projectName.ProjectId, channelName); + + try + { + publisher.CreateTopic(topicName); + } + catch (RpcException e) + { + if (e.Status.StatusCode != StatusCode.AlreadyExists) + { + throw; + } + } + + return new GoogleCloudSubscription(topicName, logger); + } + } +} diff --git a/src/Squidex.Infrastructure.GoogleCloud/GoogleCloudSubscription.cs b/src/Squidex.Infrastructure.GoogleCloud/GoogleCloudSubscription.cs new file mode 100644 index 000000000..d2782ee91 --- /dev/null +++ b/src/Squidex.Infrastructure.GoogleCloud/GoogleCloudSubscription.cs @@ -0,0 +1,135 @@ +// ========================================================================== +// GoogleCloudSubscription.cs +// Squidex Headless CMS +// ========================================================================== +// Copyright (c) Squidex Group +// All rights reserved. +// ========================================================================== + +using System; +using System.Linq; +using System.Reactive.Subjects; +using System.Text; +using System.Threading; +using System.Threading.Tasks; +using Google.Cloud.PubSub.V1; +using Google.Protobuf; +using Grpc.Core; +using Microsoft.Extensions.Logging; + +// ReSharper disable InvertIf + +namespace Squidex.Infrastructure.GoogleCloud +{ + public class GoogleCloudSubscription : DisposableObject + { + private static readonly Guid InstanceId = Guid.NewGuid(); + private const string EmptyData = "Empty"; + private readonly Subject subject = new Subject(); + private readonly PublisherClient publisher = PublisherClient.Create(); + private readonly TopicName topicName; + private readonly ILogger logger; + private readonly Task pullTask; + private readonly CancellationTokenSource completionToken = new CancellationTokenSource(); + + public GoogleCloudSubscription(TopicName topicName, ILogger logger) + { + this.topicName = topicName; + + this.logger = logger; + + pullTask = PullAsync(); + } + + protected override void DisposeObject(bool disposing) + { + completionToken.Cancel(); + + pullTask.Wait(); + } + + public void Publish(string token, bool notifySelf) + { + try + { + if (string.IsNullOrWhiteSpace(token)) + { + token = EmptyData; + } + + var message = new PubsubMessage + { + Attributes = + { + { "Sender", (notifySelf ? Guid.Empty : InstanceId).ToString() } + }, + Data = ByteString.CopyFromUtf8(token) + }; + + publisher.Publish(topicName, new [] { message }); + } + catch (Exception ex) + { + logger.LogError(InfrastructureErrors.InvalidatingReceivedFailed, ex, "Failed to send invalidation message {0}", token); + } + } + + private async Task PullAsync() + { + var subscriber = SubscriberClient.Create(); + var subscriptionName = new SubscriptionName(topicName.ProjectId, "squidex-" + Guid.NewGuid()); + + await subscriber.CreateSubscriptionAsync(subscriptionName, topicName, null, 60); + + try + { + while (!completionToken.IsCancellationRequested) + { + try + { + var response = await subscriber.PullAsync(subscriptionName, false, int.MaxValue, completionToken.Token); + + foreach (var receivedMessage in response.ReceivedMessages) + { + var token = receivedMessage.Message.Data.ToString(Encoding.UTF8); + + Guid sender; + + if (!receivedMessage.Message.Attributes.ContainsKey("Sender") || !Guid.TryParse(receivedMessage.Message.Attributes["Sender"], out sender)) + { + return; + } + + if (sender != InstanceId) + { + subject.OnNext(token); + } + } + + await subscriber.AcknowledgeAsync(subscriptionName, response.ReceivedMessages.Select(m => m.AckId)); + } + catch (RpcException e) + { + if (e.Status.StatusCode == StatusCode.DeadlineExceeded) + { + continue; + } + } + } + } + catch (TaskCanceledException) + { + logger.LogWarning("Pull process has been cancelled."); + } + finally + { + await subscriber.DeleteSubscriptionAsync(subscriptionName); + } + } + + public IDisposable Subscribe(Action handler) + { + return subject.Subscribe(handler); + } + } +} diff --git a/src/Squidex.Infrastructure.GoogleCloud/InfrastructureErrors.cs b/src/Squidex.Infrastructure.GoogleCloud/InfrastructureErrors.cs new file mode 100644 index 000000000..5660f8e8c --- /dev/null +++ b/src/Squidex.Infrastructure.GoogleCloud/InfrastructureErrors.cs @@ -0,0 +1,19 @@ +// ========================================================================== +// InfrastructureErrors.cs +// Squidex Headless CMS +// ========================================================================== +// Copyright (c) Squidex Group +// All rights reserved. +// ========================================================================== + +using Microsoft.Extensions.Logging; + +namespace Squidex.Infrastructure.GoogleCloud +{ + public class InfrastructureErrors + { + public static readonly EventId InvalidatingReceivedFailed = new EventId(40001, "InvalidingReceivedFailed"); + + public static readonly EventId InvalidatingPublishedFailed = new EventId(40002, "InvalidatingPublishedFailed"); + } +} diff --git a/src/Squidex.Infrastructure.GoogleCloud/Properties/AssemblyInfo.cs b/src/Squidex.Infrastructure.GoogleCloud/Properties/AssemblyInfo.cs new file mode 100644 index 000000000..7871e023d --- /dev/null +++ b/src/Squidex.Infrastructure.GoogleCloud/Properties/AssemblyInfo.cs @@ -0,0 +1,19 @@ +using System.Reflection; +using System.Runtime.CompilerServices; +using System.Runtime.InteropServices; + +// General Information about an assembly is controlled through the following +// set of attributes. Change these attribute values to modify the information +// associated with an assembly. +[assembly: AssemblyConfiguration("")] +[assembly: AssemblyCompany("")] +[assembly: AssemblyProduct("Squidex.Infrastructure.GoogleCloud")] +[assembly: AssemblyTrademark("")] + +// Setting ComVisible to false makes the types in this assembly not visible +// to COM components. If you need to access a type in this assembly from +// COM, set the ComVisible attribute to true on that type. +[assembly: ComVisible(false)] + +// The following GUID is for the ID of the typelib if this project is exposed to COM +[assembly: Guid("4a80390e-507a-4477-8a10-be89a7427232")] diff --git a/src/Squidex.Infrastructure.GoogleCloud/Squidex.Infrastructure.GoogleCloud.xproj b/src/Squidex.Infrastructure.GoogleCloud/Squidex.Infrastructure.GoogleCloud.xproj new file mode 100644 index 000000000..57b68de12 --- /dev/null +++ b/src/Squidex.Infrastructure.GoogleCloud/Squidex.Infrastructure.GoogleCloud.xproj @@ -0,0 +1,21 @@ + + + + 14.0 + $(MSBuildExtensionsPath32)\Microsoft\VisualStudio\v$(VisualStudioVersion) + + + + + 4a80390e-507a-4477-8a10-be89a7427232 + Squidex.Infrastructure.GoogleCloud + .\obj + .\bin\ + v4.6.1 + + + + 2.0 + + + diff --git a/src/Squidex.Infrastructure.GoogleCloud/project.json b/src/Squidex.Infrastructure.GoogleCloud/project.json new file mode 100644 index 000000000..92615713c --- /dev/null +++ b/src/Squidex.Infrastructure.GoogleCloud/project.json @@ -0,0 +1,24 @@ +{ + "version": "1.0.0-*", + "dependencies": { + "Google.Cloud.PubSub.V1": "1.0.0-beta06", + "Microsoft.Extensions.Caching.Abstractions": "1.1.0", + "Microsoft.Extensions.Logging": "1.1.0", + "Squidex.Infrastructure": "1.0.0-*", + "System.Linq": "4.3.0", + "System.Reactive": "3.1.1", + "System.Reflection.TypeExtensions": "4.3.0", + "System.Security.Claims": "4.3.0" + }, + "frameworks": { + "netstandard1.6": { + "dependencies": { + "NETStandard.Library": "1.6.1" + }, + "imports": "dnxcore50" + } + }, + "tooling": { + "defaultNamespace": "Squidex.Infrastructure.Redis" + } +} \ No newline at end of file diff --git a/src/Squidex.Infrastructure.Redis/InfrastructureErrors.cs b/src/Squidex.Infrastructure.Redis/InfrastructureErrors.cs index 28bec8176..ee0ccdd6d 100644 --- a/src/Squidex.Infrastructure.Redis/InfrastructureErrors.cs +++ b/src/Squidex.Infrastructure.Redis/InfrastructureErrors.cs @@ -12,8 +12,8 @@ namespace Squidex.Infrastructure.Redis { public class InfrastructureErrors { - public static readonly EventId InvalidatingReceivedFailed = new EventId(10001, "InvalidingReceivedFailed"); + public static readonly EventId InvalidatingReceivedFailed = new EventId(50001, "InvalidingReceivedFailed"); - public static readonly EventId InvalidatingPublishedFailed = new EventId(10002, "InvalidatingPublishedFailed"); + public static readonly EventId InvalidatingPublishedFailed = new EventId(50002, "InvalidatingPublishedFailed"); } } diff --git a/src/Squidex.Infrastructure.Redis/RedisPubSub.cs b/src/Squidex.Infrastructure.Redis/RedisPubSub.cs index 4bb749bd2..dcf893bb8 100644 --- a/src/Squidex.Infrastructure.Redis/RedisPubSub.cs +++ b/src/Squidex.Infrastructure.Redis/RedisPubSub.cs @@ -15,12 +15,12 @@ namespace Squidex.Infrastructure.Redis { public class RedisPubSub : IPubSub, IExternalSystem { - private readonly ConnectionMultiplexer redis; - private readonly ConcurrentDictionary subjects = new ConcurrentDictionary(); + private readonly ConcurrentDictionary subscriptions = new ConcurrentDictionary(); + private readonly IConnectionMultiplexer redis; private readonly ILogger logger; private readonly ISubscriber subscriber; - public RedisPubSub(ConnectionMultiplexer redis, ILogger logger) + public RedisPubSub(IConnectionMultiplexer redis, ILogger logger) { Guard.NotNull(redis, nameof(redis)); Guard.NotNull(logger, nameof(logger)); @@ -48,14 +48,14 @@ namespace Squidex.Infrastructure.Redis { Guard.NotNullOrEmpty(channelName, nameof(channelName)); - subjects.GetOrAdd(channelName, c => new RedisSubscription(subscriber, c, logger)).Invalidate(token, notifySelf); + subscriptions.GetOrAdd(channelName, c => new RedisSubscription(subscriber, c, logger)).Publish(token, notifySelf); } public IDisposable Subscribe(string channelName, Action handler) { Guard.NotNullOrEmpty(channelName, nameof(channelName)); - return subjects.GetOrAdd(channelName, c => new RedisSubscription(subscriber, c, logger)).Subscribe(handler); + return subscriptions.GetOrAdd(channelName, c => new RedisSubscription(subscriber, c, logger)).Subscribe(handler); } } } diff --git a/src/Squidex.Infrastructure.Redis/RedisSubscription.cs b/src/Squidex.Infrastructure.Redis/RedisSubscription.cs index f7cdc4279..9d37539d1 100644 --- a/src/Squidex.Infrastructure.Redis/RedisSubscription.cs +++ b/src/Squidex.Infrastructure.Redis/RedisSubscription.cs @@ -34,7 +34,7 @@ namespace Squidex.Infrastructure.Redis this.channelName = channelName; } - public void Invalidate(string token, bool notifySelf) + public void Publish(string token, bool notifySelf) { try { @@ -59,7 +59,7 @@ namespace Squidex.Infrastructure.Redis var parts = value.Split('#'); - if (parts.Length < 2) + if (parts.Length < 1) { return; } diff --git a/src/Squidex.Infrastructure/Caching/InvalidatingCache.cs b/src/Squidex.Infrastructure/Caching/InvalidatingMemoryCache.cs similarity index 88% rename from src/Squidex.Infrastructure/Caching/InvalidatingCache.cs rename to src/Squidex.Infrastructure/Caching/InvalidatingMemoryCache.cs index e7b67b3ca..0a2ea7566 100644 --- a/src/Squidex.Infrastructure/Caching/InvalidatingCache.cs +++ b/src/Squidex.Infrastructure/Caching/InvalidatingMemoryCache.cs @@ -10,13 +10,13 @@ using Microsoft.Extensions.Caching.Memory; namespace Squidex.Infrastructure.Caching { - public class InvalidatingCache : IMemoryCache + public class InvalidatingMemoryCache : IMemoryCache { private const string ChannelName = "CacheInvalidations"; private readonly IMemoryCache inner; private readonly IPubSub invalidator; - public InvalidatingCache(IMemoryCache inner, IPubSub invalidator) + public InvalidatingMemoryCache(IMemoryCache inner, IPubSub invalidator) { Guard.NotNull(inner, nameof(inner)); Guard.NotNull(invalidator, nameof(invalidator)); @@ -24,7 +24,7 @@ namespace Squidex.Infrastructure.Caching this.inner = inner; this.invalidator = invalidator; - invalidator.Subscribe(ChannelName, Remove); + invalidator.Subscribe(ChannelName, inner.Remove); } public void Dispose() diff --git a/src/Squidex/Config/Domain/ClusterModule.cs b/src/Squidex/Config/Domain/ClusterModule.cs index 93158979c..c957df007 100644 --- a/src/Squidex/Config/Domain/ClusterModule.cs +++ b/src/Squidex/Config/Domain/ClusterModule.cs @@ -8,9 +8,11 @@ using System; using Autofac; +using Google.Cloud.PubSub.V1; using Microsoft.Extensions.Configuration; using Squidex.Infrastructure; using Squidex.Infrastructure.CQRS.Events; +using Squidex.Infrastructure.GoogleCloud; using Squidex.Infrastructure.Redis; using StackExchange.Redis; @@ -67,10 +69,35 @@ namespace Squidex.Config.Domain builder.RegisterType() .As() - .As() + .As() .SingleInstance(); } - else if (!string.Equals(clustererType, "None", StringComparison.OrdinalIgnoreCase)) + else if (string.Equals(clustererType, "GCE", StringComparison.OrdinalIgnoreCase)) + { + var projectId = Configuration.GetValue("squidex:clusterer:gce:projectId"); + + if (string.IsNullOrWhiteSpace(projectId)) + { + throw new ConfigurationException("You must specify the Google cloud engine project id in the 'squidex:clusterer:gce:projectId' configuration section."); + } + + builder.RegisterInstance(new ProjectName(projectId)) + .AsSelf() + .SingleInstance(); + + builder.RegisterType() + .As() + .As() + .SingleInstance(); + + } + else if (string.Equals(clustererType, "None", StringComparison.OrdinalIgnoreCase)) + { + builder.RegisterType() + .As() + .SingleInstance(); + } + else { throw new ConfigurationException($"Unsupported clusterer type '{clustererType}' for key 'squidex:clusterer:type', supported: Redis, None."); } diff --git a/src/Squidex/Config/Domain/InfrastructureModule.cs b/src/Squidex/Config/Domain/InfrastructureModule.cs index 3808d708a..5a38f13bf 100644 --- a/src/Squidex/Config/Domain/InfrastructureModule.cs +++ b/src/Squidex/Config/Domain/InfrastructureModule.cs @@ -56,10 +56,6 @@ namespace Squidex.Config.Domain .As() .SingleInstance(); - builder.RegisterType() - .As() - .SingleInstance(); - builder.RegisterType() .As() .SingleInstance(); @@ -80,7 +76,7 @@ namespace Squidex.Config.Domain .AsSelf() .SingleInstance(); - builder.Register(c => new InvalidatingCache(new MemoryCache(c.Resolve>()), c.Resolve())) + builder.Register(c => new InvalidatingMemoryCache(new MemoryCache(c.Resolve>()), c.Resolve())) .As() .SingleInstance(); } diff --git a/src/Squidex/Properties/launchSettings.json b/src/Squidex/Properties/launchSettings.json index 1aa5504bd..0bb6a0c3a 100644 --- a/src/Squidex/Properties/launchSettings.json +++ b/src/Squidex/Properties/launchSettings.json @@ -18,7 +18,9 @@ "commandName": "Project", "launchUrl": "http://localhost:5000", "environmentVariables": { - "ASPNETCORE_ENVIRONMENT": "Development" + "SQUIDEX__CLUSTERER__GCE__PROJECTID": "squidex-157415", + "ASPNETCORE_ENVIRONMENT": "Development", + "SQUIDEX__CLUSTERER__TYPE": "gce" } } } diff --git a/src/Squidex/project.json b/src/Squidex/project.json index 7f3be446a..547bcba4a 100644 --- a/src/Squidex/project.json +++ b/src/Squidex/project.json @@ -31,6 +31,7 @@ "Squidex.Core": "1.0.0-*", "Squidex.Events": "1.0.0-*", "Squidex.Infrastructure": "1.0.0-*", + "Squidex.Infrastructure.GoogleCloud": "1.0.0-*", "Squidex.Infrastructure.MongoDb": "1.0.0-*", "Squidex.Infrastructure.Redis": "1.0.0-*", "Squidex.Read": "1.0.0-*",