Browse Source

PubSub

pull/1/head
Sebastian 9 years ago
parent
commit
d58c0e7d30
  1. 2
      src/Squidex.Infrastructure.MongoDb/MongoRepositoryBase.cs
  2. 19
      src/Squidex.Infrastructure.RabbitMq/InfrastructureErrors.cs
  3. 135
      src/Squidex.Infrastructure.RabbitMq/RabbitMqEventBus.cs
  4. 21
      src/Squidex.Infrastructure.RabbitMq/Squidex.Infrastructure.RabbitMq.xproj
  5. 31
      src/Squidex.Infrastructure.RabbitMq/project.json
  6. 63
      src/Squidex.Infrastructure.Redis/RedisEventNotifier.cs
  7. 37
      src/Squidex.Infrastructure.Redis/RedisExternalSystem.cs
  8. 61
      src/Squidex.Infrastructure.Redis/RedisPubSub.cs
  9. 66
      src/Squidex.Infrastructure.Redis/RedisSubscription.cs
  10. 3
      src/Squidex.Infrastructure.Redis/project.json
  11. 18
      src/Squidex.Infrastructure/CQRS/Events/DefaultMemoryEventNotifier.cs
  12. 36
      src/Squidex.Infrastructure/Caching/InvalidatingCache.cs
  13. 8
      src/Squidex.Infrastructure/Caching/WrapperCacheEntry.cs
  14. 2
      src/Squidex.Infrastructure/IExternalSystem.cs
  15. 19
      src/Squidex.Infrastructure/IPubSub.cs
  16. 32
      src/Squidex.Infrastructure/InMemoryPubSub.cs
  17. 4
      src/Squidex.Infrastructure/InfrastructureErrors.cs
  18. 1
      src/Squidex.Infrastructure/project.json
  19. 21
      src/Squidex/Config/Domain/ClusterModule.cs
  20. 14
      src/Squidex/Config/Domain/InfrastructureModule.cs
  21. 2
      src/Squidex/Config/Domain/Usages.cs

2
src/Squidex.Infrastructure.MongoDb/MongoRepositoryBase.cs

@ -144,7 +144,7 @@ namespace Squidex.Infrastructure.MongoDb
} }
} }
public void CheckConnection() public void Connect()
{ {
try try
{ {

19
src/Squidex.Infrastructure.RabbitMq/InfrastructureErrors.cs

@ -1,19 +0,0 @@
// ==========================================================================
// InfrastructureErrors.cs
// Squidex Headless CMS
// ==========================================================================
// Copyright (c) Squidex Group
// All rights reserved.
// ==========================================================================
using Microsoft.Extensions.Logging;
namespace Squidex.Infrastructure.RabbitMq
{
public class InfrastructureErrors
{
public static readonly EventId EventHandlingFailed = new EventId(10001, "EventHandlingFailed");
public static readonly EventId EventDeserializationFailed = new EventId(10002, "EventDeserializationFailed");
}
}

135
src/Squidex.Infrastructure.RabbitMq/RabbitMqEventBus.cs

@ -1,135 +0,0 @@
// ==========================================================================
// EventChannel.cs
// Squidex Headless CMS
// ==========================================================================
// Copyright (c) Squidex Group
// All rights reserved.
// ==========================================================================
using System;
using System.Text;
using Newtonsoft.Json;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using Squidex.Infrastructure.CQRS.Events;
// ReSharper disable InvertIf
namespace Squidex.Infrastructure.RabbitMq
{
public sealed class RabbitMqEventBus : DisposableObject, IExternalSystem
{
private readonly bool isPersistent;
private readonly string queueName;
private const string Exchange = "Squidex";
private readonly ConnectionFactory connectionFactory;
private readonly Lazy<IConnection> connection;
private readonly Lazy<IModel> channel;
private EventingBasicConsumer consumer;
public RabbitMqEventBus(ConnectionFactory connectionFactory, bool isPersistent, string queueName)
{
Guard.NotNull(connectionFactory, nameof(connectionFactory));
this.queueName = queueName;
this.connectionFactory = connectionFactory;
connection = new Lazy<IConnection>(connectionFactory.CreateConnection);
channel = new Lazy<IModel>(() => CreateChannel(connection.Value));
this.isPersistent = isPersistent;
}
protected override void DisposeObject(bool disposing)
{
if (connection.IsValueCreated)
{
connection.Value.Close();
connection.Value.Dispose();
}
}
public void Publish(EventData eventData)
{
ThrowIfDisposed();
channel.Value.BasicPublish(Exchange, string.Empty, null, Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(eventData)));
}
public void CheckConnection()
{
try
{
var currentConnection = connection.Value;
if (!currentConnection.IsOpen)
{
throw new ConfigurationException($"RabbitMq event bus failed to connect to {connectionFactory.Endpoint}");
}
}
catch (Exception e)
{
throw new ConfigurationException($"RabbitMq event bus failed to connect to {connectionFactory.Endpoint}", e);
}
}
public void Connect(string queuePrefix, Action<EventData> received)
{
ThrowIfDisposed();
ThrowIfConnected();
lock (connection)
{
var currentChannel = channel.Value;
ThrowIfConnected();
var fullQueueName = $"{queuePrefix}_";
if (!string.IsNullOrWhiteSpace(queueName))
{
fullQueueName += queueName;
}
else
{
fullQueueName += Environment.MachineName;
}
currentChannel.QueueDeclare(fullQueueName, isPersistent, false, !isPersistent);
currentChannel.QueueBind(fullQueueName, Exchange, string.Empty);
consumer = new EventingBasicConsumer(currentChannel);
consumer.Received += (model, e) =>
{
var eventData = JsonConvert.DeserializeObject<EventData>(Encoding.UTF8.GetString(e.Body));
received(eventData);
};
currentChannel.BasicConsume(fullQueueName, true, consumer);
}
}
private static IModel CreateChannel(IConnection connection, bool declareExchange = true)
{
var channel = connection.CreateModel();
if (declareExchange)
{
channel.ExchangeDeclare(Exchange, ExchangeType.Fanout, true);
}
return channel;
}
private void ThrowIfConnected()
{
if (consumer != null)
{
throw new InvalidOperationException("Already connected to channel.");
}
}
}
}

21
src/Squidex.Infrastructure.RabbitMq/Squidex.Infrastructure.RabbitMq.xproj

@ -1,21 +0,0 @@
<?xml version="1.0" encoding="utf-8"?>
<Project ToolsVersion="14.0" DefaultTargets="Build" xmlns="http://schemas.microsoft.com/developer/msbuild/2003">
<PropertyGroup>
<VisualStudioVersion Condition="'$(VisualStudioVersion)' == ''">14.0</VisualStudioVersion>
<VSToolsPath Condition="'$(VSToolsPath)' == ''">$(MSBuildExtensionsPath32)\Microsoft\VisualStudio\v$(VisualStudioVersion)</VSToolsPath>
</PropertyGroup>
<Import Project="$(VSToolsPath)\DotNet\Microsoft.DotNet.Props" Condition="'$(VSToolsPath)' != ''" />
<PropertyGroup Label="Globals">
<ProjectGuid>3c9ba12d-f5f2-4355-8d30-8289e4d0752d</ProjectGuid>
<RootNamespace>Squidex.Infrastructure.RabbitMq</RootNamespace>
<BaseIntermediateOutputPath Condition="'$(BaseIntermediateOutputPath)'=='' ">.\obj</BaseIntermediateOutputPath>
<OutputPath Condition="'$(OutputPath)'=='' ">.\bin\</OutputPath>
<TargetFrameworkVersion>v4.6.1</TargetFrameworkVersion>
</PropertyGroup>
<PropertyGroup>
<SchemaVersion>2.0</SchemaVersion>
</PropertyGroup>
<Import Project="$(VSToolsPath)\DotNet\Microsoft.DotNet.targets" Condition="'$(VSToolsPath)' != ''" />
</Project>

31
src/Squidex.Infrastructure.RabbitMq/project.json

@ -1,31 +0,0 @@
{
"version": "1.0.0-*",
"dependencies": {
"Autofac": "4.3.0",
"Microsoft.Extensions.Logging": "1.1.0",
"Newtonsoft.Json": "9.0.2-beta2",
"NodaTime": "2.0.0-beta20170123",
"RabbitMQ.Client": "5.0.0-pre2",
"Squidex.Infrastructure": "1.0.0-*",
"System.Linq": "4.3.0",
"System.Reactive": "3.1.1",
"System.Reflection.TypeExtensions": "4.3.0",
"System.Security.Claims": "4.3.0"
},
"frameworks": {
"netstandard1.6": {
"dependencies": {
"NETStandard.Library": "1.6.1"
},
"imports": "dnxcore50"
}
},
"buildOptions": {
"embed": [
"*.csv"
]
},
"tooling": {
"defaultNamespace": "Squidex.Infrastructure.RabbitMq"
}
}

63
src/Squidex.Infrastructure.Redis/RedisEventNotifier.cs

@ -1,63 +0,0 @@
// ==========================================================================
// RedisEventNotifier.cs
// Squidex Headless CMS
// ==========================================================================
// Copyright (c) Squidex Group
// All rights reserved.
// ==========================================================================
using System;
using Microsoft.Extensions.Logging;
using Squidex.Infrastructure.CQRS.Events;
using StackExchange.Redis;
namespace Squidex.Infrastructure.Redis
{
public sealed class RedisEventNotifier : IEventNotifier
{
private const string Channel = "SquidexEventNotifications";
private readonly InMemoryEventNotifier inMemoryNotifier = new InMemoryEventNotifier();
private readonly ISubscriber subscriber;
private readonly ILogger<RedisEventNotifier> logger;
public RedisEventNotifier(IConnectionMultiplexer redis, ILogger<RedisEventNotifier> logger)
{
Guard.NotNull(redis, nameof(redis));
Guard.NotNull(logger, nameof(logger));
subscriber = redis.GetSubscriber();
subscriber.Subscribe(Channel, (channel, value) => HandleInvalidation());
this.logger = logger;
}
private void HandleInvalidation()
{
try
{
inMemoryNotifier.NotifyEventsStored();
}
catch (Exception ex)
{
logger.LogError(InfrastructureErrors.InvalidatingReceivedFailed, ex, "Failed to receive invalidation message.");
}
}
public void NotifyEventsStored()
{
try
{
subscriber.Publish(Channel, RedisValue.Null);
}
catch (Exception ex)
{
logger.LogError(InfrastructureErrors.InvalidatingReceivedFailed, ex, "Failed to send invalidation message");
}
}
public void Subscribe(Action handler)
{
inMemoryNotifier.Subscribe(handler);
}
}
}

37
src/Squidex.Infrastructure.Redis/RedisExternalSystem.cs

@ -1,37 +0,0 @@
// ==========================================================================
// RedisExternalSystem.cs
// Squidex Headless CMS
// ==========================================================================
// Copyright (c) Squidex Group
// All rights reserved.
// ==========================================================================
using System;
using StackExchange.Redis;
namespace Squidex.Infrastructure.Redis
{
public sealed class RedisExternalSystem : IExternalSystem
{
private readonly IConnectionMultiplexer redis;
public RedisExternalSystem(IConnectionMultiplexer redis)
{
Guard.NotNull(redis, nameof(redis));
this.redis = redis;
}
public void CheckConnection()
{
try
{
redis.GetStatus();
}
catch (Exception ex)
{
throw new ConfigurationException($"Redis connection failed to connect to database {redis.Configuration}", ex);
}
}
}
}

61
src/Squidex.Infrastructure.Redis/RedisPubSub.cs

@ -0,0 +1,61 @@
// ==========================================================================
// RedisInvalidator2.cs
// Squidex Headless CMS
// ==========================================================================
// Copyright (c) Squidex Group
// All rights reserved.
// ==========================================================================
using System;
using System.Collections.Concurrent;
using Microsoft.Extensions.Logging;
using StackExchange.Redis;
namespace Squidex.Infrastructure.Redis
{
public class RedisPubSub : IPubSub, IExternalSystem
{
private readonly ConnectionMultiplexer redis;
private readonly ConcurrentDictionary<string, RedisSubscription> subjects = new ConcurrentDictionary<string, RedisSubscription>();
private readonly ILogger<RedisPubSub> logger;
private readonly ISubscriber subscriber;
public RedisPubSub(ConnectionMultiplexer redis, ILogger<RedisPubSub> logger)
{
Guard.NotNull(redis, nameof(redis));
Guard.NotNull(logger, nameof(logger));
this.redis = redis;
this.logger = logger;
subscriber = redis.GetSubscriber();
}
public void Connect()
{
try
{
redis.GetStatus();
}
catch (Exception ex)
{
throw new ConfigurationException($"Redis connection failed to connect to database {redis.Configuration}", ex);
}
}
public void Publish(string channelName, string token, bool notifySelf)
{
Guard.NotNullOrEmpty(channelName, nameof(channelName));
subjects.GetOrAdd(channelName, c => new RedisSubscription(subscriber, c, logger)).Invalidate(token, notifySelf);
}
public IDisposable Subscribe(string channelName, Action<string> handler)
{
Guard.NotNullOrEmpty(channelName, nameof(channelName));
return subjects.GetOrAdd(channelName, c => new RedisSubscription(subscriber, c, logger)).Subscribe(handler);
}
}
}

66
src/Squidex.Infrastructure.Redis/RedisInvalidator.cs → src/Squidex.Infrastructure.Redis/RedisSubscription.cs

@ -1,5 +1,5 @@
// ========================================================================== // ==========================================================================
// RedisInvalidator.cs // RedisSubscription.cs
// Squidex Headless CMS // Squidex Headless CMS
// ========================================================================== // ==========================================================================
// Copyright (c) Squidex Group // Copyright (c) Squidex Group
@ -7,40 +7,45 @@
// ========================================================================== // ==========================================================================
using System; using System;
using Microsoft.Extensions.Caching.Memory; using System.Linq;
using System.Reactive.Subjects;
using Microsoft.Extensions.Logging; using Microsoft.Extensions.Logging;
using StackExchange.Redis; using StackExchange.Redis;
// ReSharper disable InvertIf // ReSharper disable InvertIf
// ReSharper disable ArrangeThisQualifier
namespace Squidex.Infrastructure.Redis namespace Squidex.Infrastructure.Redis
{ {
internal sealed class RedisInvalidator internal sealed class RedisSubscription
{ {
private const string Channel = "SquidexChannelInvalidations"; private static readonly Guid InstanceId = Guid.NewGuid();
private readonly Guid instanceId = Guid.NewGuid(); private readonly Subject<string> subject = new Subject<string>();
private readonly ISubscriber subscriber; private readonly ISubscriber subscriber;
private readonly IMemoryCache cache; private readonly string channelName;
private readonly ILogger<RedisInvalidatingCache> logger; private readonly ILogger<RedisPubSub> logger;
private int invalidationsReceived;
public int InvalidationsReceived public RedisSubscription(ISubscriber subscriber, string channelName, ILogger<RedisPubSub> logger)
{ {
get this.logger = logger;
{
return invalidationsReceived; this.subscriber = subscriber;
} this.subscriber.Subscribe(channelName, (channel, value) => HandleInvalidation(value));
this.channelName = channelName;
} }
public RedisInvalidator(IConnectionMultiplexer redis, IMemoryCache cache, ILogger<RedisInvalidatingCache> logger) public void Invalidate(string token, bool notifySelf)
{ {
this.cache = cache; try
{
subscriber = redis.GetSubscriber(); var message = string.Join("#", (notifySelf ? Guid.Empty : InstanceId).ToString());
subscriber.Subscribe(Channel, (channel, value) => HandleInvalidation(value));
this.logger = logger; subscriber.Publish(channelName, message);
}
catch (Exception ex)
{
logger.LogError(InfrastructureErrors.InvalidatingReceivedFailed, ex, "Failed to send invalidation message {0}", token);
}
} }
private void HandleInvalidation(string value) private void HandleInvalidation(string value)
@ -54,7 +59,7 @@ namespace Squidex.Infrastructure.Redis
var parts = value.Split('#'); var parts = value.Split('#');
if (parts.Length != 2) if (parts.Length < 2)
{ {
return; return;
} }
@ -66,11 +71,11 @@ namespace Squidex.Infrastructure.Redis
return; return;
} }
if (sender != instanceId) if (sender != InstanceId)
{ {
invalidationsReceived++; var token = string.Join("#", parts.Skip(1));
cache.Remove(parts[1]); subject.OnNext(token);
} }
} }
catch (Exception ex) catch (Exception ex)
@ -79,18 +84,9 @@ namespace Squidex.Infrastructure.Redis
} }
} }
public void Invalidate(string key) public IDisposable Subscribe(Action<string> handler)
{ {
try return subject.Subscribe(handler);
{
var message = string.Join("#", instanceId.ToString());
subscriber.Publish(Channel, message);
}
catch (Exception ex)
{
logger.LogError(InfrastructureErrors.InvalidatingReceivedFailed, ex, "Failed to send invalidation message {0}", key);
}
} }
} }
} }

3
src/Squidex.Infrastructure.Redis/project.json

@ -1,11 +1,8 @@
{ {
"version": "1.0.0-*", "version": "1.0.0-*",
"dependencies": { "dependencies": {
"Autofac": "4.3.0",
"Microsoft.Extensions.Caching.Abstractions": "1.1.0", "Microsoft.Extensions.Caching.Abstractions": "1.1.0",
"Microsoft.Extensions.Logging": "1.1.0", "Microsoft.Extensions.Logging": "1.1.0",
"Newtonsoft.Json": "9.0.2-beta2",
"NodaTime": "2.0.0-beta20170123",
"Squidex.Infrastructure": "1.0.0-*", "Squidex.Infrastructure": "1.0.0-*",
"StackExchange.Redis.StrongName": "1.2.0", "StackExchange.Redis.StrongName": "1.2.0",
"System.Linq": "4.3.0", "System.Linq": "4.3.0",

18
src/Squidex.Infrastructure/CQRS/Events/InMemoryEventNotifier.cs → src/Squidex.Infrastructure/CQRS/Events/DefaultMemoryEventNotifier.cs

@ -7,22 +7,30 @@
// ========================================================================== // ==========================================================================
using System; using System;
using System.Reactive.Subjects;
namespace Squidex.Infrastructure.CQRS.Events namespace Squidex.Infrastructure.CQRS.Events
{ {
public sealed class InMemoryEventNotifier : IEventNotifier public sealed class DefaultMemoryEventNotifier : IEventNotifier
{ {
private readonly Subject<object> subject = new Subject<object>(); private readonly string ChannelName = typeof(DefaultMemoryEventNotifier).Name;
private readonly IPubSub invalidator;
public DefaultMemoryEventNotifier(IPubSub invalidator)
{
Guard.NotNull(invalidator, nameof(invalidator));
this.invalidator = invalidator;
}
public void NotifyEventsStored() public void NotifyEventsStored()
{ {
subject.OnNext(null); invalidator.Publish(ChannelName, string.Empty, true);
} }
public void Subscribe(Action handler) public void Subscribe(Action handler)
{ {
subject.Subscribe(_ => handler()); invalidator.Subscribe(ChannelName, x => handler());
} }
} }
} }

36
src/Squidex.Infrastructure.Redis/RedisInvalidatingCache.cs → src/Squidex.Infrastructure/Caching/InvalidatingCache.cs

@ -1,5 +1,5 @@
// ========================================================================== // ==========================================================================
// RedisInvalidatingCache.cs // InvalidatingCache.cs
// Squidex Headless CMS // Squidex Headless CMS
// ========================================================================== // ==========================================================================
// Copyright (c) Squidex Group // Copyright (c) Squidex Group
@ -7,25 +7,24 @@
// ========================================================================== // ==========================================================================
using Microsoft.Extensions.Caching.Memory; using Microsoft.Extensions.Caching.Memory;
using Microsoft.Extensions.Logging;
using StackExchange.Redis;
namespace Squidex.Infrastructure.Redis namespace Squidex.Infrastructure.Caching
{ {
public class RedisInvalidatingCache : IMemoryCache public class InvalidatingCache : IMemoryCache
{ {
private const string ChannelName = "CacheInvalidations";
private readonly IMemoryCache inner; private readonly IMemoryCache inner;
private readonly RedisInvalidator invalidator; private readonly IPubSub invalidator;
public RedisInvalidatingCache(IMemoryCache inner, IConnectionMultiplexer redis, ILogger<RedisInvalidatingCache> logger) public InvalidatingCache(IMemoryCache inner, IPubSub invalidator)
{ {
Guard.NotNull(redis, nameof(redis));
Guard.NotNull(inner, nameof(inner)); Guard.NotNull(inner, nameof(inner));
Guard.NotNull(logger, nameof(logger)); Guard.NotNull(invalidator, nameof(invalidator));
this.inner = inner; this.inner = inner;
this.invalidator = invalidator;
invalidator = new RedisInvalidator(redis, inner, logger); invalidator.Subscribe(ChannelName, Remove);
} }
public void Dispose() public void Dispose()
@ -33,6 +32,11 @@ namespace Squidex.Infrastructure.Redis
inner.Dispose(); inner.Dispose();
} }
public ICacheEntry CreateEntry(object key)
{
return new WrapperCacheEntry(inner.CreateEntry(key), Invalidate);
}
public bool TryGetValue(object key, out object value) public bool TryGetValue(object key, out object value)
{ {
return inner.TryGetValue(key, out value); return inner.TryGetValue(key, out value);
@ -42,15 +46,15 @@ namespace Squidex.Infrastructure.Redis
{ {
inner.Remove(key); inner.Remove(key);
if (key is string) Invalidate(key);
{
invalidator.Invalidate(key.ToString());
}
} }
public ICacheEntry CreateEntry(object key) private void Invalidate(object key)
{ {
return new WrapperCacheEntry(inner.CreateEntry(key), invalidator); if (key is string)
{
invalidator.Publish(ChannelName, key.ToString(), false);
}
} }
} }
} }

8
src/Squidex.Infrastructure.Redis/WrapperCacheEntry.cs → src/Squidex.Infrastructure/Caching/WrapperCacheEntry.cs

@ -11,12 +11,12 @@ using System.Collections.Generic;
using Microsoft.Extensions.Caching.Memory; using Microsoft.Extensions.Caching.Memory;
using Microsoft.Extensions.Primitives; using Microsoft.Extensions.Primitives;
namespace Squidex.Infrastructure.Redis namespace Squidex.Infrastructure.Caching
{ {
internal sealed class WrapperCacheEntry : ICacheEntry internal sealed class WrapperCacheEntry : ICacheEntry
{ {
private readonly ICacheEntry inner; private readonly ICacheEntry inner;
private readonly RedisInvalidator invalidator; private readonly Action<object> invalidator;
public object Key public object Key
{ {
@ -63,7 +63,7 @@ namespace Squidex.Infrastructure.Redis
set { inner.Value = value; } set { inner.Value = value; }
} }
public WrapperCacheEntry(ICacheEntry inner, RedisInvalidator invalidator) public WrapperCacheEntry(ICacheEntry inner, Action<object> invalidator)
{ {
this.inner = inner; this.inner = inner;
@ -74,7 +74,7 @@ namespace Squidex.Infrastructure.Redis
{ {
if (Key is string) if (Key is string)
{ {
invalidator.Invalidate(Key?.ToString()); invalidator(Key);
} }
inner.Dispose(); inner.Dispose();

2
src/Squidex.Infrastructure/IExternalSystem.cs

@ -10,6 +10,6 @@ namespace Squidex.Infrastructure
{ {
public interface IExternalSystem public interface IExternalSystem
{ {
void CheckConnection(); void Connect();
} }
} }

19
src/Squidex.Infrastructure/IPubSub.cs

@ -0,0 +1,19 @@
// ==========================================================================
// IInvalidator.cs
// Squidex Headless CMS
// ==========================================================================
// Copyright (c) Squidex Group
// All rights reserved.
// ==========================================================================
using System;
namespace Squidex.Infrastructure
{
public interface IPubSub
{
void Publish(string channelName, string token, bool notifySelf);
IDisposable Subscribe(string channelName, Action<string> handler);
}
}

32
src/Squidex.Infrastructure/InMemoryPubSub.cs

@ -0,0 +1,32 @@
// ==========================================================================
// InMemoryInvalidator.cs
// Squidex Headless CMS
// ==========================================================================
// Copyright (c) Squidex Group
// All rights reserved.
// ==========================================================================
using System;
using System.Collections.Concurrent;
using System.Reactive.Subjects;
namespace Squidex.Infrastructure
{
public sealed class InMemoryPubSub : IPubSub
{
private readonly ConcurrentDictionary<string, Subject<string>> subjects = new ConcurrentDictionary<string, Subject<string>>();
public void Publish(string channelName, string token, bool notifySelf)
{
if (notifySelf)
{
subjects.GetOrAdd(channelName, k => new Subject<string>()).OnNext(token);
}
}
public IDisposable Subscribe(string channelName, Action<string> handler)
{
return subjects.GetOrAdd(channelName, k => new Subject<string>()).Subscribe(handler);
}
}
}

4
src/Squidex.Infrastructure/InfrastructureErrors.cs

@ -19,9 +19,5 @@ namespace Squidex.Infrastructure
public static readonly EventId EventHandlingFailed = new EventId(10001, "EventHandlingFailed"); public static readonly EventId EventHandlingFailed = new EventId(10001, "EventHandlingFailed");
public static readonly EventId EventDeserializationFailed = new EventId(10002, "EventDeserializationFailed"); public static readonly EventId EventDeserializationFailed = new EventId(10002, "EventDeserializationFailed");
public static readonly EventId ReplayClearingFailed = new EventId(30001, "ReplayClearingFailed");
public static readonly EventId ReplayPublishingFailed = new EventId(30003, "ReplayPublishingFailed");
} }
} }

1
src/Squidex.Infrastructure/project.json

@ -1,6 +1,7 @@
{ {
"version": "1.0.0-*", "version": "1.0.0-*",
"dependencies": { "dependencies": {
"Microsoft.Extensions.Caching.Abstractions": "1.1.0",
"Microsoft.Extensions.Logging": "1.1.0", "Microsoft.Extensions.Logging": "1.1.0",
"Newtonsoft.Json": "9.0.2-beta2", "Newtonsoft.Json": "9.0.2-beta2",
"NodaTime": "2.0.0-beta20170123", "NodaTime": "2.0.0-beta20170123",

21
src/Squidex/Config/Domain/ClusterModule.cs

@ -8,10 +8,7 @@
using System; using System;
using Autofac; using Autofac;
using Microsoft.Extensions.Caching.Memory;
using Microsoft.Extensions.Configuration; using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using Squidex.Infrastructure; using Squidex.Infrastructure;
using Squidex.Infrastructure.CQRS.Events; using Squidex.Infrastructure.CQRS.Events;
using Squidex.Infrastructure.Redis; using Squidex.Infrastructure.Redis;
@ -68,24 +65,10 @@ namespace Squidex.Config.Domain
throw new ConfigurationException($"Redis connection failed to connect to database {connectionString}", ex); throw new ConfigurationException($"Redis connection failed to connect to database {connectionString}", ex);
} }
builder.RegisterType<RedisEventNotifier>() builder.RegisterType<RedisPubSub>()
.As<IPubSub>()
.As<IEventNotifier>() .As<IEventNotifier>()
.SingleInstance(); .SingleInstance();
builder.RegisterType<RedisExternalSystem>()
.As<IExternalSystem>()
.SingleInstance();
builder.Register(c =>
{
var inner = new MemoryCache(c.Resolve<IOptions<MemoryCacheOptions>>());
return new RedisInvalidatingCache(inner,
c.Resolve<IConnectionMultiplexer>(),
c.Resolve<ILogger<RedisInvalidatingCache>>());
})
.As<IMemoryCache>()
.SingleInstance();
} }
else if (!string.Equals(clustererType, "None", StringComparison.OrdinalIgnoreCase)) else if (!string.Equals(clustererType, "None", StringComparison.OrdinalIgnoreCase))
{ {

14
src/Squidex/Config/Domain/InfrastructureModule.cs

@ -9,9 +9,13 @@
using Autofac; using Autofac;
using Microsoft.AspNetCore.Http; using Microsoft.AspNetCore.Http;
using Microsoft.AspNetCore.Mvc.Infrastructure; using Microsoft.AspNetCore.Mvc.Infrastructure;
using Microsoft.Extensions.Caching.Memory;
using Microsoft.Extensions.Configuration; using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.Options;
using Squidex.Core.Schemas; using Squidex.Core.Schemas;
using Squidex.Core.Schemas.Json; using Squidex.Core.Schemas.Json;
using Squidex.Infrastructure;
using Squidex.Infrastructure.Caching;
using Squidex.Infrastructure.CQRS.Commands; using Squidex.Infrastructure.CQRS.Commands;
using Squidex.Infrastructure.CQRS.Events; using Squidex.Infrastructure.CQRS.Events;
@ -52,7 +56,11 @@ namespace Squidex.Config.Domain
.As<ICommandBus>() .As<ICommandBus>()
.SingleInstance(); .SingleInstance();
builder.RegisterType<InMemoryEventNotifier>() builder.RegisterType<InMemoryPubSub>()
.As<IPubSub>()
.SingleInstance();
builder.RegisterType<DefaultMemoryEventNotifier>()
.As<IEventNotifier>() .As<IEventNotifier>()
.SingleInstance(); .SingleInstance();
@ -71,6 +79,10 @@ namespace Squidex.Config.Domain
builder.RegisterType<FieldRegistry>() builder.RegisterType<FieldRegistry>()
.AsSelf() .AsSelf()
.SingleInstance(); .SingleInstance();
builder.Register(c => new InvalidatingCache(new MemoryCache(c.Resolve<IOptions<MemoryCacheOptions>>()), c.Resolve<IPubSub>()))
.As<IMemoryCache>()
.SingleInstance();
} }
} }
} }

2
src/Squidex/Config/Domain/Usages.cs

@ -54,7 +54,7 @@ namespace Squidex.Config.Domain
foreach (var system in systems) foreach (var system in systems)
{ {
system.CheckConnection(); system.Connect();
} }
return app; return app;

Loading…
Cancel
Save