mirror of https://github.com/Squidex/squidex.git
68 changed files with 1141 additions and 802 deletions
@ -1,29 +1,32 @@ |
|||||
// ==========================================================================
|
// ==========================================================================
|
||||
// MongoStreamPositionEntity.cs
|
// MongoEvent.cs
|
||||
// Squidex Headless CMS
|
// Squidex Headless CMS
|
||||
// ==========================================================================
|
// ==========================================================================
|
||||
// Copyright (c) Squidex Group
|
// Copyright (c) Squidex Group
|
||||
// All rights reserved.
|
// All rights reserved.
|
||||
// ==========================================================================
|
// ==========================================================================
|
||||
|
|
||||
using System.Runtime.Serialization; |
using System; |
||||
using MongoDB.Bson; |
|
||||
using MongoDB.Bson.Serialization.Attributes; |
using MongoDB.Bson.Serialization.Attributes; |
||||
|
|
||||
namespace Squidex.Store.MongoDb.Infrastructure |
namespace Squidex.Infrastructure.MongoDb.EventStore |
||||
{ |
{ |
||||
[DataContract] |
public class MongoEvent |
||||
public class MongoStreamPositionEntity |
|
||||
{ |
{ |
||||
[BsonId] |
[BsonElement] |
||||
public ObjectId Id { get; set; } |
|
||||
|
|
||||
[BsonRequired] |
[BsonRequired] |
||||
|
public Guid EventId { get; set; } |
||||
|
|
||||
[BsonElement] |
[BsonElement] |
||||
public string SubscriptionName { get; set; } |
[BsonRequired] |
||||
|
public string Payload { get; set; } |
||||
|
|
||||
|
[BsonElement] |
||||
[BsonRequired] |
[BsonRequired] |
||||
|
public string Metadata { get; set; } |
||||
|
|
||||
[BsonElement] |
[BsonElement] |
||||
public int? Position { get; set; } |
[BsonRequired] |
||||
|
public string Type { get; set; } |
||||
} |
} |
||||
} |
} |
||||
@ -0,0 +1,43 @@ |
|||||
|
// ==========================================================================
|
||||
|
// MongoEventCommit.cs
|
||||
|
// Squidex Headless CMS
|
||||
|
// ==========================================================================
|
||||
|
// Copyright (c) Squidex Group
|
||||
|
// All rights reserved.
|
||||
|
// ==========================================================================
|
||||
|
|
||||
|
using System; |
||||
|
using System.Collections.Generic; |
||||
|
using MongoDB.Bson; |
||||
|
using MongoDB.Bson.Serialization.Attributes; |
||||
|
|
||||
|
namespace Squidex.Infrastructure.MongoDb.EventStore |
||||
|
{ |
||||
|
public sealed class MongoEventCommit |
||||
|
{ |
||||
|
[BsonId] |
||||
|
[BsonElement] |
||||
|
[BsonRepresentation(BsonType.String)] |
||||
|
public Guid Id { get; set; } |
||||
|
|
||||
|
[BsonRequired] |
||||
|
[BsonElement] |
||||
|
public DateTime Timestamp { get; set; } |
||||
|
|
||||
|
[BsonElement] |
||||
|
[BsonRequired] |
||||
|
public List<MongoEvent> Events { get; set; } |
||||
|
|
||||
|
[BsonElement] |
||||
|
[BsonRequired] |
||||
|
public string EventStream { get; set; } |
||||
|
|
||||
|
[BsonElement] |
||||
|
[BsonRequired] |
||||
|
public int EventsVersion { get; set; } |
||||
|
|
||||
|
[BsonElement] |
||||
|
[BsonRequired] |
||||
|
public int EventCount { get; set; } |
||||
|
} |
||||
|
} |
||||
@ -0,0 +1,137 @@ |
|||||
|
// ==========================================================================
|
||||
|
// MongoEventStore.cs
|
||||
|
// Squidex Headless CMS
|
||||
|
// ==========================================================================
|
||||
|
// Copyright (c) Squidex Group
|
||||
|
// All rights reserved.
|
||||
|
// ==========================================================================
|
||||
|
|
||||
|
using System; |
||||
|
using System.Collections.Generic; |
||||
|
using System.Linq; |
||||
|
using System.Reactive.Linq; |
||||
|
using System.Threading.Tasks; |
||||
|
using MongoDB.Bson; |
||||
|
using MongoDB.Bson.Serialization.Attributes; |
||||
|
using MongoDB.Driver; |
||||
|
using Squidex.Infrastructure.CQRS.Events; |
||||
|
using Squidex.Infrastructure.Reflection; |
||||
|
// ReSharper disable ClassNeverInstantiated.Local
|
||||
|
// ReSharper disable UnusedMember.Local
|
||||
|
|
||||
|
namespace Squidex.Infrastructure.MongoDb.EventStore |
||||
|
{ |
||||
|
public class MongoEventStore : MongoRepositoryBase<MongoEventCommit>, IEventStore |
||||
|
{ |
||||
|
private sealed class EventCountEntity |
||||
|
{ |
||||
|
[BsonId] |
||||
|
[BsonElement] |
||||
|
[BsonRepresentation(BsonType.String)] |
||||
|
public Guid Id { get; set; } |
||||
|
|
||||
|
[BsonElement] |
||||
|
[BsonRequired] |
||||
|
public int EventCount { get; set; } |
||||
|
} |
||||
|
|
||||
|
public MongoEventStore(IMongoDatabase database) |
||||
|
: base(database) |
||||
|
{ |
||||
|
} |
||||
|
|
||||
|
protected override string CollectionName() |
||||
|
{ |
||||
|
return "Events"; |
||||
|
} |
||||
|
|
||||
|
protected override Task SetupCollectionAsync(IMongoCollection<MongoEventCommit> collection) |
||||
|
{ |
||||
|
return collection.Indexes.CreateOneAsync(IndexKeys.Ascending(x => x.EventStream).Ascending(x => x.EventsVersion), new CreateIndexOptions { Unique = true }); |
||||
|
} |
||||
|
|
||||
|
public IObservable<EventData> GetEventsAsync(string streamName) |
||||
|
{ |
||||
|
Guard.NotNullOrEmpty(streamName, nameof(streamName)); |
||||
|
|
||||
|
return Observable.Create<EventData>(async (observer, ct) => |
||||
|
{ |
||||
|
try |
||||
|
{ |
||||
|
await Collection.Find(x => x.EventStream == streamName).ForEachAsync(commit => |
||||
|
{ |
||||
|
foreach (var @event in commit.Events) |
||||
|
{ |
||||
|
var eventData = SimpleMapper.Map(@event, new EventData()); |
||||
|
|
||||
|
observer.OnNext(eventData); |
||||
|
} |
||||
|
}, ct); |
||||
|
|
||||
|
observer.OnCompleted(); |
||||
|
} |
||||
|
catch (Exception e) |
||||
|
{ |
||||
|
observer.OnError(e); |
||||
|
} |
||||
|
}); |
||||
|
} |
||||
|
|
||||
|
public IObservable<EventData> GetEventsAsync() |
||||
|
{ |
||||
|
return Observable.Create<EventData>(async (observer, ct) => |
||||
|
{ |
||||
|
try |
||||
|
{ |
||||
|
await Collection.Find(new BsonDocument()).ForEachAsync(commit => |
||||
|
{ |
||||
|
foreach (var @event in commit.Events) |
||||
|
{ |
||||
|
var eventData = SimpleMapper.Map(@event, new EventData()); |
||||
|
|
||||
|
observer.OnNext(eventData); |
||||
|
} |
||||
|
}, ct); |
||||
|
|
||||
|
observer.OnCompleted(); |
||||
|
} |
||||
|
catch (Exception e) |
||||
|
{ |
||||
|
observer.OnError(e); |
||||
|
} |
||||
|
}); |
||||
|
} |
||||
|
|
||||
|
public async Task AppendEventsAsync(Guid commitId, string streamName, int expectedVersion, IEnumerable<EventData> events) |
||||
|
{ |
||||
|
var allCommits = |
||||
|
await Collection.Find(c => c.EventStream == streamName) |
||||
|
.Project<BsonDocument>(Projection.Include(x => x.EventCount)) |
||||
|
.ToListAsync(); |
||||
|
|
||||
|
var currentVersion = allCommits.Sum(x => x["EventCount"].ToInt32()) - 1; |
||||
|
if (currentVersion != expectedVersion) |
||||
|
{ |
||||
|
throw new InvalidOperationException($"Current version: {currentVersion}, expected version: {expectedVersion}"); |
||||
|
} |
||||
|
|
||||
|
var now = DateTime.UtcNow; |
||||
|
|
||||
|
var commit = new MongoEventCommit |
||||
|
{ |
||||
|
Id = commitId, |
||||
|
Events = events.Select(x => SimpleMapper.Map(x, new MongoEvent())).ToList(), |
||||
|
EventStream = streamName, |
||||
|
EventsVersion = expectedVersion, |
||||
|
Timestamp = now |
||||
|
}; |
||||
|
|
||||
|
if (commit.Events.Any()) |
||||
|
{ |
||||
|
commit.EventCount = commit.Events.Count; |
||||
|
|
||||
|
await Collection.InsertOneAsync(commit); |
||||
|
} |
||||
|
} |
||||
|
} |
||||
|
} |
||||
@ -0,0 +1,26 @@ |
|||||
|
// ==========================================================================
|
||||
|
// AssemblyInfo.cs
|
||||
|
// Squidex Headless CMS
|
||||
|
// ==========================================================================
|
||||
|
// Copyright (c) Squidex Group
|
||||
|
// All rights reserved.
|
||||
|
// ==========================================================================
|
||||
|
|
||||
|
using System.Reflection; |
||||
|
using System.Runtime.InteropServices; |
||||
|
|
||||
|
// General Information about an assembly is controlled through the following
|
||||
|
// set of attributes. Change these attribute values to modify the information
|
||||
|
// associated with an assembly.
|
||||
|
[assembly: AssemblyConfiguration("")] |
||||
|
[assembly: AssemblyCompany("")] |
||||
|
[assembly: AssemblyProduct("Squidex.Infrastructure.MongoDb")] |
||||
|
[assembly: AssemblyTrademark("")] |
||||
|
|
||||
|
// Setting ComVisible to false makes the types in this assembly not visible
|
||||
|
// to COM components. If you need to access a type in this assembly from
|
||||
|
// COM, set the ComVisible attribute to true on that type.
|
||||
|
[assembly: ComVisible(false)] |
||||
|
|
||||
|
// The following GUID is for the ID of the typelib if this project is exposed to COM
|
||||
|
[assembly: Guid("6a811927-3c37-430a-90f4-503e37123956")] |
||||
@ -0,0 +1,24 @@ |
|||||
|
<?xml version="1.0" encoding="utf-8"?> |
||||
|
<Project ToolsVersion="14.0" DefaultTargets="Build" xmlns="http://schemas.microsoft.com/developer/msbuild/2003"> |
||||
|
<PropertyGroup> |
||||
|
<VisualStudioVersion Condition="'$(VisualStudioVersion)' == ''">14.0</VisualStudioVersion> |
||||
|
<VSToolsPath Condition="'$(VSToolsPath)' == ''">$(MSBuildExtensionsPath32)\Microsoft\VisualStudio\v$(VisualStudioVersion)</VSToolsPath> |
||||
|
</PropertyGroup> |
||||
|
<Import Project="$(VSToolsPath)\DotNet\Microsoft.DotNet.Props" Condition="'$(VSToolsPath)' != ''" /> |
||||
|
<PropertyGroup Label="Globals"> |
||||
|
<ProjectGuid>6a811927-3c37-430a-90f4-503e37123956</ProjectGuid> |
||||
|
<RootNamespace>Squidex.Infrastructure.MongoDb</RootNamespace> |
||||
|
<BaseIntermediateOutputPath Condition="'$(BaseIntermediateOutputPath)'=='' ">.\obj</BaseIntermediateOutputPath> |
||||
|
<OutputPath Condition="'$(OutputPath)'=='' ">.\bin\</OutputPath> |
||||
|
<TargetFrameworkVersion>v4.6.1</TargetFrameworkVersion> |
||||
|
</PropertyGroup> |
||||
|
<PropertyGroup> |
||||
|
<SchemaVersion>2.0</SchemaVersion> |
||||
|
</PropertyGroup> |
||||
|
<Import Project="$(VSToolsPath)\DotNet\Microsoft.DotNet.targets" Condition="'$(VSToolsPath)' != ''" /> |
||||
|
<ProjectExtensions> |
||||
|
<VisualStudio> |
||||
|
<UserProperties project_1json__JSONSchema="http://json.schemastore.org/project-1.0.0-beta6" /> |
||||
|
</VisualStudio> |
||||
|
</ProjectExtensions> |
||||
|
</Project> |
||||
@ -0,0 +1,31 @@ |
|||||
|
{ |
||||
|
"version": "1.0.0-*", |
||||
|
"dependencies": { |
||||
|
"Autofac": "4.2.1", |
||||
|
"Microsoft.Extensions.Logging": "1.1.0", |
||||
|
"Microsoft.NETCore.App": "1.1.0", |
||||
|
"MongoDB.Driver": "2.4.1", |
||||
|
"NETStandard.Library": "1.6.1", |
||||
|
"Newtonsoft.Json": "9.0.2-beta1", |
||||
|
"NodaTime": "2.0.0-alpha20160729", |
||||
|
"Squidex.Infrastructure": "1.0.0-*", |
||||
|
"System.Linq": "4.3.0", |
||||
|
"System.Reactive": "3.1.1", |
||||
|
"System.Reflection.TypeExtensions": "4.3.0", |
||||
|
"System.Security.Claims": "4.3.0" |
||||
|
}, |
||||
|
"frameworks": { |
||||
|
"netcoreapp1.0": { |
||||
|
"dependencies": { |
||||
|
} |
||||
|
} |
||||
|
}, |
||||
|
"buildOptions": { |
||||
|
"embed": [ |
||||
|
"*.csv" |
||||
|
] |
||||
|
}, |
||||
|
"tooling": { |
||||
|
"defaultNamespace": "Squidex.Infrastructure.MongoDb" |
||||
|
} |
||||
|
} |
||||
@ -0,0 +1,19 @@ |
|||||
|
// ==========================================================================
|
||||
|
// InfrastructureErrors.cs
|
||||
|
// Squidex Headless CMS
|
||||
|
// ==========================================================================
|
||||
|
// Copyright (c) Squidex Group
|
||||
|
// All rights reserved.
|
||||
|
// ==========================================================================
|
||||
|
|
||||
|
using Microsoft.Extensions.Logging; |
||||
|
|
||||
|
namespace Squidex.Infrastructure.RabbitMq |
||||
|
{ |
||||
|
public class InfrastructureErrors |
||||
|
{ |
||||
|
public static readonly EventId EventHandlingFailed = new EventId(10001, "EventHandlingFailed"); |
||||
|
|
||||
|
public static readonly EventId EventDeserializationFailed = new EventId(10002, "EventDeserializationFailed"); |
||||
|
} |
||||
|
} |
||||
@ -0,0 +1,26 @@ |
|||||
|
// ==========================================================================
|
||||
|
// AssemblyInfo.cs
|
||||
|
// Squidex Headless CMS
|
||||
|
// ==========================================================================
|
||||
|
// Copyright (c) Squidex Group
|
||||
|
// All rights reserved.
|
||||
|
// ==========================================================================
|
||||
|
|
||||
|
using System.Reflection; |
||||
|
using System.Runtime.InteropServices; |
||||
|
|
||||
|
// General Information about an assembly is controlled through the following
|
||||
|
// set of attributes. Change these attribute values to modify the information
|
||||
|
// associated with an assembly.
|
||||
|
[assembly: AssemblyConfiguration("")] |
||||
|
[assembly: AssemblyCompany("")] |
||||
|
[assembly: AssemblyProduct("Squidex.Infrastructure.RabbitMq")] |
||||
|
[assembly: AssemblyTrademark("")] |
||||
|
|
||||
|
// Setting ComVisible to false makes the types in this assembly not visible
|
||||
|
// to COM components. If you need to access a type in this assembly from
|
||||
|
// COM, set the ComVisible attribute to true on that type.
|
||||
|
[assembly: ComVisible(false)] |
||||
|
|
||||
|
// The following GUID is for the ID of the typelib if this project is exposed to COM
|
||||
|
[assembly: Guid("3c9ba12d-f5f2-4355-8d30-8289e4d0752d")] |
||||
@ -0,0 +1,80 @@ |
|||||
|
// ==========================================================================
|
||||
|
// EventChannel.cs
|
||||
|
// Squidex Headless CMS
|
||||
|
// ==========================================================================
|
||||
|
// Copyright (c) Squidex Group
|
||||
|
// All rights reserved.
|
||||
|
// ==========================================================================
|
||||
|
|
||||
|
using System; |
||||
|
using System.Text; |
||||
|
using Newtonsoft.Json; |
||||
|
using RabbitMQ.Client; |
||||
|
using RabbitMQ.Client.Events; |
||||
|
using Squidex.Infrastructure.CQRS.Events; |
||||
|
|
||||
|
namespace Squidex.Infrastructure.RabbitMq |
||||
|
{ |
||||
|
public sealed class RabbitMqEventChannel : DisposableObject, IEventPublisher, IEventStream |
||||
|
{ |
||||
|
private const string Exchange = "Squidex"; |
||||
|
private readonly Lazy<IModel> currentChannel; |
||||
|
|
||||
|
public RabbitMqEventChannel(IConnectionFactory connectionFactory) |
||||
|
{ |
||||
|
Guard.NotNull(connectionFactory, nameof(connectionFactory)); |
||||
|
|
||||
|
currentChannel = new Lazy<IModel>(() => Connect(connectionFactory)); |
||||
|
} |
||||
|
|
||||
|
protected override void DisposeObject(bool disposing) |
||||
|
{ |
||||
|
if (currentChannel.IsValueCreated) |
||||
|
{ |
||||
|
currentChannel.Value.Dispose(); |
||||
|
} |
||||
|
} |
||||
|
|
||||
|
public void Publish(EventData events) |
||||
|
{ |
||||
|
ThrowIfDisposed(); |
||||
|
|
||||
|
var channel = currentChannel.Value; |
||||
|
|
||||
|
channel.BasicPublish(Exchange, string.Empty, null, Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(events))); |
||||
|
} |
||||
|
|
||||
|
public void Connect(string queueName, Action<EventData> received) |
||||
|
{ |
||||
|
ThrowIfDisposed(); |
||||
|
|
||||
|
var channel = currentChannel.Value; |
||||
|
|
||||
|
queueName = $"{queueName}_{Environment.MachineName}"; |
||||
|
|
||||
|
channel.QueueDeclare(queueName, true, false, false); |
||||
|
channel.QueueBind(queueName, Exchange, string.Empty); |
||||
|
|
||||
|
var consumer = new EventingBasicConsumer(channel); |
||||
|
|
||||
|
consumer.Received += (model, e) => |
||||
|
{ |
||||
|
var eventData = JsonConvert.DeserializeObject<EventData>(Encoding.UTF8.GetString(e.Body)); |
||||
|
|
||||
|
received(eventData); |
||||
|
}; |
||||
|
|
||||
|
channel.BasicConsume(queueName, false, consumer); |
||||
|
} |
||||
|
|
||||
|
private static IModel Connect(IConnectionFactory connectionFactory) |
||||
|
{ |
||||
|
var connection = connectionFactory.CreateConnection(); |
||||
|
var channel = connection.CreateModel(); |
||||
|
|
||||
|
channel.ExchangeDeclare(Exchange, ExchangeType.Fanout, true); |
||||
|
|
||||
|
return channel; |
||||
|
} |
||||
|
} |
||||
|
} |
||||
@ -0,0 +1,21 @@ |
|||||
|
<?xml version="1.0" encoding="utf-8"?> |
||||
|
<Project ToolsVersion="14.0" DefaultTargets="Build" xmlns="http://schemas.microsoft.com/developer/msbuild/2003"> |
||||
|
<PropertyGroup> |
||||
|
<VisualStudioVersion Condition="'$(VisualStudioVersion)' == ''">14.0</VisualStudioVersion> |
||||
|
<VSToolsPath Condition="'$(VSToolsPath)' == ''">$(MSBuildExtensionsPath32)\Microsoft\VisualStudio\v$(VisualStudioVersion)</VSToolsPath> |
||||
|
</PropertyGroup> |
||||
|
|
||||
|
<Import Project="$(VSToolsPath)\DotNet\Microsoft.DotNet.Props" Condition="'$(VSToolsPath)' != ''" /> |
||||
|
<PropertyGroup Label="Globals"> |
||||
|
<ProjectGuid>3c9ba12d-f5f2-4355-8d30-8289e4d0752d</ProjectGuid> |
||||
|
<RootNamespace>Squidex.Infrastructure.RabbitMq</RootNamespace> |
||||
|
<BaseIntermediateOutputPath Condition="'$(BaseIntermediateOutputPath)'=='' ">.\obj</BaseIntermediateOutputPath> |
||||
|
<OutputPath Condition="'$(OutputPath)'=='' ">.\bin\</OutputPath> |
||||
|
<TargetFrameworkVersion>v4.6.1</TargetFrameworkVersion> |
||||
|
</PropertyGroup> |
||||
|
|
||||
|
<PropertyGroup> |
||||
|
<SchemaVersion>2.0</SchemaVersion> |
||||
|
</PropertyGroup> |
||||
|
<Import Project="$(VSToolsPath)\DotNet\Microsoft.DotNet.targets" Condition="'$(VSToolsPath)' != ''" /> |
||||
|
</Project> |
||||
@ -0,0 +1,31 @@ |
|||||
|
{ |
||||
|
"version": "1.0.0-*", |
||||
|
"dependencies": { |
||||
|
"Autofac": "4.2.1", |
||||
|
"Microsoft.Extensions.Logging": "1.1.0", |
||||
|
"Microsoft.NETCore.App": "1.1.0", |
||||
|
"NETStandard.Library": "1.6.1", |
||||
|
"Newtonsoft.Json": "9.0.2-beta1", |
||||
|
"NodaTime": "2.0.0-alpha20160729", |
||||
|
"RabbitMQ.Client": "5.0.0-pre2", |
||||
|
"Squidex.Infrastructure": "1.0.0-*", |
||||
|
"System.Linq": "4.3.0", |
||||
|
"System.Reactive": "3.1.1", |
||||
|
"System.Reflection.TypeExtensions": "4.3.0", |
||||
|
"System.Security.Claims": "4.3.0" |
||||
|
}, |
||||
|
"frameworks": { |
||||
|
"netcoreapp1.0": { |
||||
|
"dependencies": { |
||||
|
} |
||||
|
} |
||||
|
}, |
||||
|
"buildOptions": { |
||||
|
"embed": [ |
||||
|
"*.csv" |
||||
|
] |
||||
|
}, |
||||
|
"tooling": { |
||||
|
"defaultNamespace": "Squidex.Infrastructure.RabbitMq" |
||||
|
} |
||||
|
} |
||||
@ -0,0 +1,96 @@ |
|||||
|
// ==========================================================================
|
||||
|
// DefaultDomainObjectRepository.cs
|
||||
|
// Squidex Headless CMS
|
||||
|
// ==========================================================================
|
||||
|
// Copyright (c) Squidex Group
|
||||
|
// All rights reserved.
|
||||
|
// ==========================================================================
|
||||
|
|
||||
|
using System; |
||||
|
using System.Collections.Generic; |
||||
|
using System.Linq; |
||||
|
using System.Reactive.Linq; |
||||
|
using System.Threading.Tasks; |
||||
|
using Squidex.Infrastructure.CQRS.Events; |
||||
|
|
||||
|
namespace Squidex.Infrastructure.CQRS.Commands |
||||
|
{ |
||||
|
public sealed class DefaultDomainObjectRepository : IDomainObjectRepository |
||||
|
{ |
||||
|
private readonly IStreamNameResolver nameResolver; |
||||
|
private readonly IDomainObjectFactory factory; |
||||
|
private readonly IEventStore eventStore; |
||||
|
private readonly IEventPublisher eventPublisher; |
||||
|
private readonly EventDataFormatter formatter; |
||||
|
|
||||
|
public DefaultDomainObjectRepository( |
||||
|
IDomainObjectFactory factory, |
||||
|
IEventStore eventStore, |
||||
|
IEventPublisher eventPublisher, |
||||
|
IStreamNameResolver nameResolver, |
||||
|
EventDataFormatter formatter) |
||||
|
{ |
||||
|
Guard.NotNull(factory, nameof(factory)); |
||||
|
Guard.NotNull(formatter, nameof(formatter)); |
||||
|
Guard.NotNull(eventStore, nameof(eventStore)); |
||||
|
Guard.NotNull(eventPublisher, nameof(eventPublisher)); |
||||
|
Guard.NotNull(nameResolver, nameof(nameResolver)); |
||||
|
|
||||
|
this.factory = factory; |
||||
|
this.eventStore = eventStore; |
||||
|
this.formatter = formatter; |
||||
|
this.eventPublisher = eventPublisher; |
||||
|
this.nameResolver = nameResolver; |
||||
|
} |
||||
|
|
||||
|
public async Task<TDomainObject> GetByIdAsync<TDomainObject>(Guid id, int version = int.MaxValue) where TDomainObject : class, IAggregate |
||||
|
{ |
||||
|
Guard.GreaterThan(version, 0, nameof(version)); |
||||
|
|
||||
|
var streamName = nameResolver.GetStreamName(typeof(TDomainObject), id); |
||||
|
|
||||
|
var domainObject = (TDomainObject)factory.CreateNew(typeof(TDomainObject), id); |
||||
|
|
||||
|
var events = await eventStore.GetEventsAsync(streamName).ToList(); |
||||
|
|
||||
|
if (events.Count == 0) |
||||
|
{ |
||||
|
throw new DomainObjectNotFoundException(id.ToString(), typeof(TDomainObject)); |
||||
|
} |
||||
|
|
||||
|
foreach (var eventData in events) |
||||
|
{ |
||||
|
var envelope = formatter.Parse(eventData); |
||||
|
|
||||
|
domainObject.ApplyEvent(envelope); |
||||
|
} |
||||
|
|
||||
|
if (domainObject.Version != version && version < int.MaxValue) |
||||
|
{ |
||||
|
throw new DomainObjectVersionException(id.ToString(), typeof(TDomainObject), domainObject.Version, version); |
||||
|
} |
||||
|
|
||||
|
return domainObject; |
||||
|
} |
||||
|
|
||||
|
public async Task SaveAsync(IAggregate domainObject, ICollection<Envelope<IEvent>> events, Guid commitId) |
||||
|
{ |
||||
|
Guard.NotNull(domainObject, nameof(domainObject)); |
||||
|
|
||||
|
var streamName = nameResolver.GetStreamName(domainObject.GetType(), domainObject.Id); |
||||
|
|
||||
|
var versionCurrent = domainObject.Version; |
||||
|
var versionBefore = versionCurrent - events.Count; |
||||
|
var versionExpected = versionBefore == 0 ? -1 : versionBefore - 1; |
||||
|
|
||||
|
var eventsToSave = events.Select(x => formatter.ToEventData(x, commitId)).ToList(); |
||||
|
|
||||
|
await eventStore.AppendEventsAsync(commitId, streamName, versionExpected, eventsToSave); |
||||
|
|
||||
|
foreach (var eventData in eventsToSave) |
||||
|
{ |
||||
|
eventPublisher.Publish(eventData); |
||||
|
} |
||||
|
} |
||||
|
} |
||||
|
} |
||||
@ -1,250 +0,0 @@ |
|||||
// ==========================================================================
|
|
||||
// EventStoreBus.cs
|
|
||||
// Squidex Headless CMS
|
|
||||
// ==========================================================================
|
|
||||
// Copyright (c) Squidex Group
|
|
||||
// All rights reserved.
|
|
||||
// ==========================================================================
|
|
||||
|
|
||||
using System; |
|
||||
using System.Collections.Generic; |
|
||||
using System.Linq; |
|
||||
using System.Reflection; |
|
||||
using System.Threading.Tasks; |
|
||||
using EventStore.ClientAPI; |
|
||||
using EventStore.ClientAPI.SystemData; |
|
||||
using Microsoft.Extensions.Logging; |
|
||||
using Squidex.Infrastructure.CQRS.Events; |
|
||||
// ReSharper disable InvertIf
|
|
||||
|
|
||||
namespace Squidex.Infrastructure.CQRS.EventStore |
|
||||
{ |
|
||||
public sealed class EventStoreBus : IDisposable |
|
||||
{ |
|
||||
private readonly IEventStoreConnection connection; |
|
||||
private readonly UserCredentials credentials; |
|
||||
private readonly EventStoreFormatter formatter; |
|
||||
private readonly IEnumerable<ILiveEventConsumer> liveConsumers; |
|
||||
private readonly IEnumerable<ICatchEventConsumer> catchConsumers; |
|
||||
private readonly ILogger<EventStoreBus> logger; |
|
||||
private readonly IStreamPositionStorage positions; |
|
||||
private readonly List<EventStoreCatchUpSubscription> catchSubscriptions = new List<EventStoreCatchUpSubscription>(); |
|
||||
private EventStoreSubscription liveSubscription; |
|
||||
private string streamName; |
|
||||
private bool isSubscribed; |
|
||||
|
|
||||
public EventStoreBus( |
|
||||
ILogger<EventStoreBus> logger, |
|
||||
IEnumerable<ILiveEventConsumer> liveConsumers, |
|
||||
IEnumerable<ICatchEventConsumer> catchConsumers, |
|
||||
IStreamPositionStorage positions, |
|
||||
IEventStoreConnection connection, |
|
||||
UserCredentials credentials, |
|
||||
EventStoreFormatter formatter) |
|
||||
{ |
|
||||
Guard.NotNull(logger, nameof(logger)); |
|
||||
Guard.NotNull(formatter, nameof(formatter)); |
|
||||
Guard.NotNull(positions, nameof(positions)); |
|
||||
Guard.NotNull(connection, nameof(connection)); |
|
||||
Guard.NotNull(credentials, nameof(credentials)); |
|
||||
Guard.NotNull(liveConsumers, nameof(liveConsumers)); |
|
||||
Guard.NotNull(catchConsumers, nameof(catchConsumers)); |
|
||||
|
|
||||
this.logger = logger; |
|
||||
this.formatter = formatter; |
|
||||
this.positions = positions; |
|
||||
this.connection = connection; |
|
||||
this.credentials = credentials; |
|
||||
this.liveConsumers = liveConsumers; |
|
||||
this.catchConsumers = catchConsumers; |
|
||||
} |
|
||||
|
|
||||
public void Dispose() |
|
||||
{ |
|
||||
lock (catchSubscriptions) |
|
||||
{ |
|
||||
foreach (var catchSubscription in catchSubscriptions) |
|
||||
{ |
|
||||
catchSubscription.Stop(TimeSpan.FromMinutes(1)); |
|
||||
} |
|
||||
|
|
||||
liveSubscription.Unsubscribe(); |
|
||||
} |
|
||||
} |
|
||||
|
|
||||
public void Subscribe(string streamToConnect = "$all") |
|
||||
{ |
|
||||
Guard.NotNullOrEmpty(streamToConnect, nameof(streamToConnect)); |
|
||||
|
|
||||
if (isSubscribed) |
|
||||
{ |
|
||||
return; |
|
||||
} |
|
||||
|
|
||||
streamName = streamToConnect; |
|
||||
|
|
||||
SubscribeLive(); |
|
||||
SubscribeCatch(); |
|
||||
|
|
||||
isSubscribed = true; |
|
||||
} |
|
||||
|
|
||||
private void SubscribeLive() |
|
||||
{ |
|
||||
Task.Run(async () => |
|
||||
{ |
|
||||
liveSubscription = |
|
||||
await connection.SubscribeToStreamAsync(streamName, true, |
|
||||
(subscription, resolvedEvent) => |
|
||||
{ |
|
||||
OnLiveEvent(resolvedEvent); |
|
||||
}, (subscription, dropped, ex) => |
|
||||
{ |
|
||||
OnConnectionDropped(); |
|
||||
}, credentials); |
|
||||
}).Wait(); |
|
||||
} |
|
||||
|
|
||||
private void OnConnectionDropped() |
|
||||
{ |
|
||||
try |
|
||||
{ |
|
||||
liveSubscription.Close(); |
|
||||
|
|
||||
logger.LogError("Subscription closed"); |
|
||||
} |
|
||||
finally |
|
||||
{ |
|
||||
SubscribeLive(); |
|
||||
} |
|
||||
} |
|
||||
|
|
||||
private void SubscribeCatch() |
|
||||
{ |
|
||||
foreach (var catchConsumer in catchConsumers) |
|
||||
{ |
|
||||
SubscribeCatchFor(catchConsumer); |
|
||||
} |
|
||||
} |
|
||||
|
|
||||
private void SubscribeCatchFor(IEventConsumer consumer) |
|
||||
{ |
|
||||
var subscriptionName = consumer.GetType().GetTypeInfo().Name; |
|
||||
|
|
||||
var position = positions.ReadPosition(subscriptionName); |
|
||||
|
|
||||
logger.LogInformation("[{0}]: Subscribing from {0}", consumer, position ?? 0); |
|
||||
|
|
||||
var settings = |
|
||||
new CatchUpSubscriptionSettings( |
|
||||
int.MaxValue, 4096, |
|
||||
true, |
|
||||
true); |
|
||||
|
|
||||
var catchSubscription = |
|
||||
connection.SubscribeToStreamFrom(streamName, position, settings, |
|
||||
(subscription, resolvedEvent) => |
|
||||
{ |
|
||||
OnCatchEvent(consumer, resolvedEvent, subscriptionName, subscription); |
|
||||
}, userCredentials: credentials); |
|
||||
|
|
||||
lock (catchSubscriptions) |
|
||||
{ |
|
||||
catchSubscriptions.Add(catchSubscription); |
|
||||
} |
|
||||
} |
|
||||
|
|
||||
private void OnLiveEvent(ResolvedEvent resolvedEvent) |
|
||||
{ |
|
||||
Envelope<IEvent> @event = null; |
|
||||
|
|
||||
try |
|
||||
{ |
|
||||
@event = formatter.Parse(new EventWrapper(resolvedEvent)); |
|
||||
} |
|
||||
catch (Exception ex) |
|
||||
{ |
|
||||
logger.LogError(InfrastructureErrors.EventDeserializationFailed, ex, |
|
||||
"[LiveConsumers]: Failed to deserialize event {0}#{1}", streamName, |
|
||||
resolvedEvent.OriginalEventNumber); |
|
||||
} |
|
||||
|
|
||||
if (@event != null) |
|
||||
{ |
|
||||
DispatchConsumers(liveConsumers, @event).Wait(); |
|
||||
} |
|
||||
} |
|
||||
|
|
||||
private void OnCatchEvent(IEventConsumer consumer, ResolvedEvent resolvedEvent, string subscriptionName, EventStoreCatchUpSubscription subscription) |
|
||||
{ |
|
||||
if (resolvedEvent.OriginalEvent.EventStreamId.StartsWith("$", StringComparison.OrdinalIgnoreCase)) |
|
||||
{ |
|
||||
return; |
|
||||
} |
|
||||
|
|
||||
var isFailed = false; |
|
||||
|
|
||||
Envelope<IEvent> @event = null; |
|
||||
|
|
||||
try |
|
||||
{ |
|
||||
@event = formatter.Parse(new EventWrapper(resolvedEvent)); |
|
||||
} |
|
||||
catch (Exception ex) |
|
||||
{ |
|
||||
logger.LogError(InfrastructureErrors.EventDeserializationFailed, ex, |
|
||||
"[{consumer}]: Failed to deserialize event {1}#{2}", consumer, streamName, |
|
||||
resolvedEvent.OriginalEventNumber); |
|
||||
|
|
||||
isFailed = true; |
|
||||
} |
|
||||
|
|
||||
if (@event != null) |
|
||||
{ |
|
||||
try |
|
||||
{ |
|
||||
logger.LogInformation("Received event {0} ({1})", @event.Payload.GetType().Name, @event.Headers.AggregateId()); |
|
||||
|
|
||||
consumer.On(@event).Wait(); |
|
||||
|
|
||||
positions.WritePosition(subscriptionName, resolvedEvent.OriginalEventNumber); |
|
||||
} |
|
||||
catch (Exception ex) |
|
||||
{ |
|
||||
logger.LogError(InfrastructureErrors.EventHandlingFailed, ex, |
|
||||
"[{0}]: Failed to handle event {1} ({2})", consumer, |
|
||||
@event.Payload, |
|
||||
@event.Headers.EventId()); |
|
||||
} |
|
||||
} |
|
||||
|
|
||||
if (isFailed) |
|
||||
{ |
|
||||
lock (catchSubscriptions) |
|
||||
{ |
|
||||
subscription.Stop(); |
|
||||
|
|
||||
catchSubscriptions.Remove(subscription); |
|
||||
} |
|
||||
} |
|
||||
} |
|
||||
|
|
||||
private Task DispatchConsumers(IEnumerable<IEventConsumer> consumers, Envelope<IEvent> @event) |
|
||||
{ |
|
||||
return Task.WhenAll(consumers.Select(c => DispatchConsumer(@event, c)).ToList()); |
|
||||
} |
|
||||
|
|
||||
private async Task DispatchConsumer(Envelope<IEvent> @event, IEventConsumer consumer) |
|
||||
{ |
|
||||
try |
|
||||
{ |
|
||||
await consumer.On(@event); |
|
||||
} |
|
||||
catch (Exception ex) |
|
||||
{ |
|
||||
logger.LogError(InfrastructureErrors.EventHandlingFailed, ex, |
|
||||
"[{0}]: Failed to handle event {1} ({2})", consumer, @event.Payload, @event.Headers.EventId()); |
|
||||
} |
|
||||
} |
|
||||
} |
|
||||
} |
|
||||
@ -1,152 +0,0 @@ |
|||||
// ==========================================================================
|
|
||||
// EventStoreDomainObjectRepository.cs
|
|
||||
// Squidex Headless CMS
|
|
||||
// ==========================================================================
|
|
||||
// Copyright (c) Squidex Group
|
|
||||
// All rights reserved.
|
|
||||
// ==========================================================================
|
|
||||
|
|
||||
using System; |
|
||||
using System.Collections.Generic; |
|
||||
using System.Diagnostics; |
|
||||
using System.Linq; |
|
||||
using System.Threading.Tasks; |
|
||||
using EventStore.ClientAPI; |
|
||||
using EventStore.ClientAPI.SystemData; |
|
||||
using Squidex.Infrastructure.CQRS.Commands; |
|
||||
using Squidex.Infrastructure.CQRS.Events; |
|
||||
|
|
||||
// ReSharper disable RedundantAssignment
|
|
||||
// ReSharper disable ConvertIfStatementToSwitchStatement
|
|
||||
// ReSharper disable TooWideLocalVariableScope
|
|
||||
|
|
||||
namespace Squidex.Infrastructure.CQRS.EventStore |
|
||||
{ |
|
||||
public sealed class EventStoreDomainObjectRepository : IDomainObjectRepository |
|
||||
{ |
|
||||
private const int WritePageSize = 500; |
|
||||
private const int ReadPageSize = 500; |
|
||||
private readonly IEventStoreConnection connection; |
|
||||
private readonly IStreamNameResolver nameResolver; |
|
||||
private readonly IDomainObjectFactory factory; |
|
||||
private readonly UserCredentials credentials; |
|
||||
private readonly EventStoreFormatter formatter; |
|
||||
|
|
||||
public EventStoreDomainObjectRepository( |
|
||||
IDomainObjectFactory factory, |
|
||||
IStreamNameResolver nameResolver, |
|
||||
IEventStoreConnection connection, |
|
||||
UserCredentials credentials, |
|
||||
EventStoreFormatter formatter) |
|
||||
{ |
|
||||
Guard.NotNull(factory, nameof(factory)); |
|
||||
Guard.NotNull(formatter, nameof(formatter)); |
|
||||
Guard.NotNull(connection, nameof(connection)); |
|
||||
Guard.NotNull(credentials, nameof(credentials)); |
|
||||
Guard.NotNull(nameResolver, nameof(nameResolver)); |
|
||||
|
|
||||
this.factory = factory; |
|
||||
this.formatter = formatter; |
|
||||
this.connection = connection; |
|
||||
this.credentials = credentials; |
|
||||
this.nameResolver = nameResolver; |
|
||||
} |
|
||||
|
|
||||
public async Task<TDomainObject> GetByIdAsync<TDomainObject>(Guid id, int version = int.MaxValue) where TDomainObject : class, IAggregate |
|
||||
{ |
|
||||
Guard.GreaterThan(version, 0, nameof(version)); |
|
||||
|
|
||||
var streamName = nameResolver.GetStreamName(typeof(TDomainObject), id); |
|
||||
|
|
||||
var domainObject = (TDomainObject)factory.CreateNew(typeof(TDomainObject), id); |
|
||||
|
|
||||
var sliceStart = 0; |
|
||||
var sliceCount = 0; |
|
||||
|
|
||||
StreamEventsSlice currentSlice; |
|
||||
do |
|
||||
{ |
|
||||
sliceCount = sliceStart + ReadPageSize <= version ? ReadPageSize : version - sliceStart + 1; |
|
||||
|
|
||||
currentSlice = await connection.ReadStreamEventsForwardAsync(streamName, sliceStart, sliceCount, false, credentials); |
|
||||
|
|
||||
if (currentSlice.Status == SliceReadStatus.StreamNotFound) |
|
||||
{ |
|
||||
throw new DomainObjectNotFoundException(id.ToString(), typeof(TDomainObject)); |
|
||||
} |
|
||||
|
|
||||
if (currentSlice.Status == SliceReadStatus.StreamDeleted) |
|
||||
{ |
|
||||
throw new DomainObjectDeletedException(id.ToString(), typeof(TDomainObject)); |
|
||||
} |
|
||||
|
|
||||
sliceStart = currentSlice.NextEventNumber; |
|
||||
|
|
||||
foreach (var resolved in currentSlice.Events) |
|
||||
{ |
|
||||
var envelope = formatter.Parse(new EventWrapper(resolved)); |
|
||||
|
|
||||
domainObject.ApplyEvent(envelope); |
|
||||
} |
|
||||
} |
|
||||
while (version >= currentSlice.NextEventNumber && !currentSlice.IsEndOfStream); |
|
||||
|
|
||||
if (domainObject.Version != version && version < int.MaxValue) |
|
||||
{ |
|
||||
throw new DomainObjectVersionException(id.ToString(), typeof(TDomainObject), domainObject.Version, version); |
|
||||
} |
|
||||
|
|
||||
return domainObject; |
|
||||
} |
|
||||
|
|
||||
public async Task SaveAsync(IAggregate domainObject, ICollection<Envelope<IEvent>> events, Guid commitId) |
|
||||
{ |
|
||||
Guard.NotNull(domainObject, nameof(domainObject)); |
|
||||
|
|
||||
var streamName = nameResolver.GetStreamName(domainObject.GetType(), domainObject.Id); |
|
||||
|
|
||||
var versionCurrent = domainObject.Version; |
|
||||
var versionBefore = versionCurrent - events.Count; |
|
||||
var versionExpected = versionBefore == 0 ? ExpectedVersion.NoStream : versionBefore - 1; |
|
||||
|
|
||||
var eventsToSave = events.Select(x => formatter.ToEventData(x, commitId)).ToList(); |
|
||||
|
|
||||
await InsertEventsAsync(streamName, versionExpected, eventsToSave); |
|
||||
|
|
||||
domainObject.ClearUncommittedEvents(); |
|
||||
} |
|
||||
|
|
||||
private async Task InsertEventsAsync(string streamName, int expectedVersion, IReadOnlyCollection<EventData> eventsToSave) |
|
||||
{ |
|
||||
if (eventsToSave.Count > 0) |
|
||||
{ |
|
||||
if (eventsToSave.Count < WritePageSize) |
|
||||
{ |
|
||||
await connection.AppendToStreamAsync(streamName, expectedVersion, eventsToSave, credentials); |
|
||||
} |
|
||||
else |
|
||||
{ |
|
||||
var transaction = await connection.StartTransactionAsync(streamName, expectedVersion, credentials); |
|
||||
|
|
||||
try |
|
||||
{ |
|
||||
for (var p = 0; p < eventsToSave.Count; p += WritePageSize) |
|
||||
{ |
|
||||
await transaction.WriteAsync(eventsToSave.Skip(p).Take(WritePageSize)); |
|
||||
} |
|
||||
|
|
||||
await transaction.CommitAsync(); |
|
||||
} |
|
||||
finally |
|
||||
{ |
|
||||
transaction.Dispose(); |
|
||||
} |
|
||||
} |
|
||||
} |
|
||||
else |
|
||||
{ |
|
||||
Debug.WriteLine($"No events to insert for: {streamName}", "GetEventStoreRepository"); |
|
||||
} |
|
||||
} |
|
||||
} |
|
||||
} |
|
||||
@ -1,66 +0,0 @@ |
|||||
// ==========================================================================
|
|
||||
// EventStoreFormatter.cs
|
|
||||
// Squidex Headless CMS
|
|
||||
// ==========================================================================
|
|
||||
// Copyright (c) Squidex Group
|
|
||||
// All rights reserved.
|
|
||||
// ==========================================================================
|
|
||||
|
|
||||
using System; |
|
||||
using System.Text; |
|
||||
using EventStore.ClientAPI; |
|
||||
using Newtonsoft.Json; |
|
||||
using NodaTime; |
|
||||
using Squidex.Infrastructure.CQRS.Events; |
|
||||
|
|
||||
// ReSharper disable InconsistentNaming
|
|
||||
|
|
||||
namespace Squidex.Infrastructure.CQRS.EventStore |
|
||||
{ |
|
||||
public class EventStoreFormatter |
|
||||
{ |
|
||||
private readonly JsonSerializerSettings serializerSettings; |
|
||||
|
|
||||
public EventStoreFormatter(JsonSerializerSettings serializerSettings = null) |
|
||||
{ |
|
||||
this.serializerSettings = serializerSettings ?? new JsonSerializerSettings(); |
|
||||
} |
|
||||
|
|
||||
public Envelope<IEvent> Parse(IReceivedEvent @event) |
|
||||
{ |
|
||||
var headers = ReadJson<PropertiesBag>(@event.Metadata); |
|
||||
|
|
||||
var eventType = TypeNameRegistry.GetType(@event.EventType); |
|
||||
var eventData = ReadJson<IEvent>(@event.Payload, eventType); |
|
||||
|
|
||||
var envelope = new Envelope<IEvent>(eventData, headers); |
|
||||
|
|
||||
envelope.Headers.Set(CommonHeaders.Timestamp, Instant.FromDateTimeUtc(DateTime.SpecifyKind(@event.Created, DateTimeKind.Utc))); |
|
||||
envelope.Headers.Set(CommonHeaders.EventNumber, @event.EventNumber); |
|
||||
|
|
||||
return envelope; |
|
||||
} |
|
||||
|
|
||||
public EventData ToEventData(Envelope<IEvent> envelope, Guid commitId) |
|
||||
{ |
|
||||
var eventType = TypeNameRegistry.GetName(envelope.Payload.GetType()); |
|
||||
|
|
||||
envelope.Headers.Set(CommonHeaders.CommitId, commitId); |
|
||||
|
|
||||
var headers = WriteJson(envelope.Headers); |
|
||||
var content = WriteJson(envelope.Payload); |
|
||||
|
|
||||
return new EventData(envelope.Headers.EventId(), eventType, true, content, headers); |
|
||||
} |
|
||||
|
|
||||
private T ReadJson<T>(byte[] data, Type type = null) |
|
||||
{ |
|
||||
return (T)JsonConvert.DeserializeObject(Encoding.UTF8.GetString(data), type ?? typeof(T), serializerSettings); |
|
||||
} |
|
||||
|
|
||||
private byte[] WriteJson(object value) |
|
||||
{ |
|
||||
return Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(value, serializerSettings)); |
|
||||
} |
|
||||
} |
|
||||
} |
|
||||
@ -1,48 +0,0 @@ |
|||||
// ==========================================================================
|
|
||||
// EventWrapper.cs
|
|
||||
// Squidex Headless CMS
|
|
||||
// ==========================================================================
|
|
||||
// Copyright (c) Squidex Group
|
|
||||
// All rights reserved.
|
|
||||
// ==========================================================================
|
|
||||
|
|
||||
using System; |
|
||||
using EventStore.ClientAPI; |
|
||||
|
|
||||
namespace Squidex.Infrastructure.CQRS.EventStore |
|
||||
{ |
|
||||
internal sealed class EventWrapper : IReceivedEvent |
|
||||
{ |
|
||||
private readonly ResolvedEvent @event; |
|
||||
|
|
||||
public int EventNumber |
|
||||
{ |
|
||||
get { return @event.OriginalEventNumber; } |
|
||||
} |
|
||||
|
|
||||
public string EventType |
|
||||
{ |
|
||||
get { return @event.Event.EventType; } |
|
||||
} |
|
||||
|
|
||||
public byte[] Metadata |
|
||||
{ |
|
||||
get { return @event.Event.Metadata; } |
|
||||
} |
|
||||
|
|
||||
public byte[] Payload |
|
||||
{ |
|
||||
get { return @event.Event.Data; } |
|
||||
} |
|
||||
|
|
||||
public DateTime Created |
|
||||
{ |
|
||||
get { return @event.Event.Created; } |
|
||||
} |
|
||||
|
|
||||
public EventWrapper(ResolvedEvent @event) |
|
||||
{ |
|
||||
this.@event = @event; |
|
||||
} |
|
||||
} |
|
||||
} |
|
||||
@ -1,16 +0,0 @@ |
|||||
// ==========================================================================
|
|
||||
// IStreamPositionStorage.cs
|
|
||||
// Squidex Headless CMS
|
|
||||
// ==========================================================================
|
|
||||
// Copyright (c) Squidex Group
|
|
||||
// All rights reserved.
|
|
||||
// ==========================================================================
|
|
||||
namespace Squidex.Infrastructure.CQRS.EventStore |
|
||||
{ |
|
||||
public interface IStreamPositionStorage |
|
||||
{ |
|
||||
int? ReadPosition(string subscriptionName); |
|
||||
|
|
||||
void WritePosition(string subscriptionName, int position); |
|
||||
} |
|
||||
} |
|
||||
@ -0,0 +1,116 @@ |
|||||
|
// ==========================================================================
|
||||
|
// EventBus.cs
|
||||
|
// Squidex Headless CMS
|
||||
|
// ==========================================================================
|
||||
|
// Copyright (c) Squidex Group
|
||||
|
// All rights reserved.
|
||||
|
// ==========================================================================
|
||||
|
|
||||
|
using System; |
||||
|
using System.Collections.Generic; |
||||
|
using System.Linq; |
||||
|
using System.Threading.Tasks; |
||||
|
using Microsoft.Extensions.Logging; |
||||
|
using NodaTime; |
||||
|
|
||||
|
// ReSharper disable ConvertIfStatementToConditionalTernaryExpression
|
||||
|
// ReSharper disable InvertIf
|
||||
|
|
||||
|
namespace Squidex.Infrastructure.CQRS.Events |
||||
|
{ |
||||
|
public sealed class EventBus |
||||
|
{ |
||||
|
private readonly EventDataFormatter formatter; |
||||
|
private readonly IEnumerable<ILiveEventConsumer> liveConsumers; |
||||
|
private readonly IEnumerable<ICatchEventConsumer> catchConsumers; |
||||
|
private readonly IEventStream eventStream; |
||||
|
private readonly ILogger<EventBus> logger; |
||||
|
private bool isSubscribed; |
||||
|
|
||||
|
public EventBus( |
||||
|
ILogger<EventBus> logger, |
||||
|
IEventStream eventStream, |
||||
|
IEnumerable<ILiveEventConsumer> liveConsumers, |
||||
|
IEnumerable<ICatchEventConsumer> catchConsumers, |
||||
|
EventDataFormatter formatter) |
||||
|
{ |
||||
|
Guard.NotNull(logger, nameof(logger)); |
||||
|
Guard.NotNull(formatter, nameof(formatter)); |
||||
|
Guard.NotNull(eventStream, nameof(eventStream)); |
||||
|
Guard.NotNull(liveConsumers, nameof(liveConsumers)); |
||||
|
Guard.NotNull(catchConsumers, nameof(catchConsumers)); |
||||
|
|
||||
|
this.logger = logger; |
||||
|
this.formatter = formatter; |
||||
|
this.eventStream = eventStream; |
||||
|
this.liveConsumers = liveConsumers; |
||||
|
this.catchConsumers = catchConsumers; |
||||
|
} |
||||
|
|
||||
|
public void Subscribe() |
||||
|
{ |
||||
|
if (isSubscribed) |
||||
|
{ |
||||
|
return; |
||||
|
} |
||||
|
|
||||
|
var startTime = SystemClock.Instance.GetCurrentInstant(); |
||||
|
|
||||
|
eventStream.Connect("squidex", eventData => |
||||
|
{ |
||||
|
var @event = ParseEvent(eventData); |
||||
|
|
||||
|
if (@event == null) |
||||
|
{ |
||||
|
return; |
||||
|
} |
||||
|
|
||||
|
var isLive = @event.Headers.Timestamp() >= startTime; |
||||
|
|
||||
|
if (isLive) |
||||
|
{ |
||||
|
DispatchConsumers(liveConsumers.OfType<IEventConsumer>().Union(catchConsumers), @event); |
||||
|
} |
||||
|
else |
||||
|
{ |
||||
|
DispatchConsumers(liveConsumers, @event); |
||||
|
} |
||||
|
}); |
||||
|
|
||||
|
isSubscribed = true; |
||||
|
} |
||||
|
|
||||
|
private void DispatchConsumers(IEnumerable<IEventConsumer> consumers, Envelope<IEvent> @event) |
||||
|
{ |
||||
|
Task.WaitAll(consumers.Select(c => DispatchConsumer(@event, c)).ToArray()); |
||||
|
} |
||||
|
|
||||
|
private async Task DispatchConsumer(Envelope<IEvent> @event, IEventConsumer consumer) |
||||
|
{ |
||||
|
try |
||||
|
{ |
||||
|
await consumer.On(@event); |
||||
|
} |
||||
|
catch (Exception ex) |
||||
|
{ |
||||
|
logger.LogError(InfrastructureErrors.EventHandlingFailed, ex, "[{0}]: Failed to handle event {1} ({2})", consumer, @event.Payload, @event.Headers.EventId()); |
||||
|
} |
||||
|
} |
||||
|
|
||||
|
private Envelope<IEvent> ParseEvent(EventData eventData) |
||||
|
{ |
||||
|
try |
||||
|
{ |
||||
|
var @event = formatter.Parse(eventData); |
||||
|
|
||||
|
return @event; |
||||
|
} |
||||
|
catch (Exception ex) |
||||
|
{ |
||||
|
logger.LogError(InfrastructureErrors.EventDeserializationFailed, ex, "Failed to parse event {0}", eventData.EventId); |
||||
|
|
||||
|
return null; |
||||
|
} |
||||
|
} |
||||
|
} |
||||
|
} |
||||
@ -0,0 +1,23 @@ |
|||||
|
// ==========================================================================
|
||||
|
// EventData.cs
|
||||
|
// Squidex Headless CMS
|
||||
|
// ==========================================================================
|
||||
|
// Copyright (c) Squidex Group
|
||||
|
// All rights reserved.
|
||||
|
// ==========================================================================
|
||||
|
|
||||
|
using System; |
||||
|
|
||||
|
namespace Squidex.Infrastructure.CQRS.Events |
||||
|
{ |
||||
|
public class EventData |
||||
|
{ |
||||
|
public Guid EventId { get; set; } |
||||
|
|
||||
|
public string Payload { get; set; } |
||||
|
|
||||
|
public string Metadata { get; set; } |
||||
|
|
||||
|
public string Type { get; set; } |
||||
|
} |
||||
|
} |
||||
@ -0,0 +1,59 @@ |
|||||
|
// ==========================================================================
|
||||
|
// EventDataFormatter.cs
|
||||
|
// Squidex Headless CMS
|
||||
|
// ==========================================================================
|
||||
|
// Copyright (c) Squidex Group
|
||||
|
// All rights reserved.
|
||||
|
// ==========================================================================
|
||||
|
|
||||
|
using System; |
||||
|
using Newtonsoft.Json; |
||||
|
|
||||
|
// ReSharper disable InconsistentNaming
|
||||
|
|
||||
|
namespace Squidex.Infrastructure.CQRS.Events |
||||
|
{ |
||||
|
public class EventDataFormatter |
||||
|
{ |
||||
|
private readonly JsonSerializerSettings serializerSettings; |
||||
|
|
||||
|
public EventDataFormatter(JsonSerializerSettings serializerSettings = null) |
||||
|
{ |
||||
|
this.serializerSettings = serializerSettings ?? new JsonSerializerSettings(); |
||||
|
} |
||||
|
|
||||
|
public Envelope<IEvent> Parse(EventData eventData) |
||||
|
{ |
||||
|
var headers = ReadJson<PropertiesBag>(eventData.Metadata); |
||||
|
|
||||
|
var eventType = TypeNameRegistry.GetType(eventData.Type); |
||||
|
var eventContent = ReadJson<IEvent>(eventData.Payload, eventType); |
||||
|
|
||||
|
var envelope = new Envelope<IEvent>(eventContent, headers); |
||||
|
|
||||
|
return envelope; |
||||
|
} |
||||
|
|
||||
|
public EventData ToEventData(Envelope<IEvent> envelope, Guid commitId) |
||||
|
{ |
||||
|
var eventType = TypeNameRegistry.GetName(envelope.Payload.GetType()); |
||||
|
|
||||
|
envelope.SetCommitId(commitId); |
||||
|
|
||||
|
var headers = WriteJson(envelope.Headers); |
||||
|
var content = WriteJson(envelope.Payload); |
||||
|
|
||||
|
return new EventData { EventId = envelope.Headers.EventId(), Type = eventType, Payload = content, Metadata = headers }; |
||||
|
} |
||||
|
|
||||
|
private T ReadJson<T>(string data, Type type = null) |
||||
|
{ |
||||
|
return (T)JsonConvert.DeserializeObject(data, type ?? typeof(T), serializerSettings); |
||||
|
} |
||||
|
|
||||
|
private string WriteJson(object value) |
||||
|
{ |
||||
|
return JsonConvert.SerializeObject(value, serializerSettings); |
||||
|
} |
||||
|
} |
||||
|
} |
||||
@ -0,0 +1,15 @@ |
|||||
|
// ==========================================================================
|
||||
|
// IEventPublisher.cs
|
||||
|
// Squidex Headless CMS
|
||||
|
// ==========================================================================
|
||||
|
// Copyright (c) Squidex Group
|
||||
|
// All rights reserved.
|
||||
|
// ==========================================================================
|
||||
|
|
||||
|
namespace Squidex.Infrastructure.CQRS.Events |
||||
|
{ |
||||
|
public interface IEventPublisher |
||||
|
{ |
||||
|
void Publish(EventData events); |
||||
|
} |
||||
|
} |
||||
@ -0,0 +1,23 @@ |
|||||
|
// ==========================================================================
|
||||
|
// IEventStore.cs
|
||||
|
// Squidex Headless CMS
|
||||
|
// ==========================================================================
|
||||
|
// Copyright (c) Squidex Group
|
||||
|
// All rights reserved.
|
||||
|
// ==========================================================================
|
||||
|
|
||||
|
using System; |
||||
|
using System.Collections.Generic; |
||||
|
using System.Threading.Tasks; |
||||
|
|
||||
|
namespace Squidex.Infrastructure.CQRS.Events |
||||
|
{ |
||||
|
public interface IEventStore |
||||
|
{ |
||||
|
IObservable<EventData> GetEventsAsync(); |
||||
|
|
||||
|
IObservable<EventData> GetEventsAsync(string streamName); |
||||
|
|
||||
|
Task AppendEventsAsync(Guid commitId, string streamName, int expectedVersion, IEnumerable<EventData> events); |
||||
|
} |
||||
|
} |
||||
@ -0,0 +1,67 @@ |
|||||
|
// ==========================================================================
|
||||
|
// EnumExtensions.cs
|
||||
|
// Squidex Headless CMS
|
||||
|
// ==========================================================================
|
||||
|
// Copyright (c) Squidex Group
|
||||
|
// All rights reserved.
|
||||
|
// ==========================================================================
|
||||
|
|
||||
|
using System; |
||||
|
|
||||
|
namespace Squidex.Infrastructure |
||||
|
{ |
||||
|
public abstract class DisposableObject : IDisposable |
||||
|
{ |
||||
|
private readonly object disposeLock = new object(); |
||||
|
private bool isDisposed; |
||||
|
public bool IsDisposed |
||||
|
{ |
||||
|
get |
||||
|
{ |
||||
|
return isDisposed; |
||||
|
} |
||||
|
} |
||||
|
|
||||
|
public void Dispose() |
||||
|
{ |
||||
|
Dispose(true); |
||||
|
|
||||
|
GC.SuppressFinalize(this); |
||||
|
} |
||||
|
|
||||
|
protected void Dispose(bool disposing) |
||||
|
{ |
||||
|
if (isDisposed) |
||||
|
{ |
||||
|
return; |
||||
|
} |
||||
|
|
||||
|
if (disposing) |
||||
|
{ |
||||
|
lock (disposeLock) |
||||
|
{ |
||||
|
if (!isDisposed) |
||||
|
{ |
||||
|
DisposeObject(true); |
||||
|
} |
||||
|
} |
||||
|
} |
||||
|
else |
||||
|
{ |
||||
|
DisposeObject(false); |
||||
|
} |
||||
|
|
||||
|
isDisposed = true; |
||||
|
} |
||||
|
|
||||
|
protected abstract void DisposeObject(bool disposing); |
||||
|
|
||||
|
protected void ThrowIfDisposed() |
||||
|
{ |
||||
|
if (isDisposed) |
||||
|
{ |
||||
|
throw new ObjectDisposedException(GetType().Name); |
||||
|
} |
||||
|
} |
||||
|
} |
||||
|
} |
||||
@ -1,59 +0,0 @@ |
|||||
// ==========================================================================
|
|
||||
// MongoStreamPositionStorage.cs
|
|
||||
// Squidex Headless CMS
|
|
||||
// ==========================================================================
|
|
||||
// Copyright (c) Squidex Group
|
|
||||
// All rights reserved.
|
|
||||
// ==========================================================================
|
|
||||
|
|
||||
using System.Threading.Tasks; |
|
||||
using MongoDB.Driver; |
|
||||
using Squidex.Infrastructure; |
|
||||
using Squidex.Infrastructure.CQRS.EventStore; |
|
||||
using Squidex.Store.MongoDb.Utils; |
|
||||
|
|
||||
// ReSharper disable InvertIf
|
|
||||
|
|
||||
namespace Squidex.Store.MongoDb.Infrastructure |
|
||||
{ |
|
||||
public sealed class MongoStreamPositionStorage : MongoRepositoryBase<MongoStreamPositionEntity>, IStreamPositionStorage |
|
||||
{ |
|
||||
public MongoStreamPositionStorage(IMongoDatabase database) |
|
||||
: base(database) |
|
||||
{ |
|
||||
} |
|
||||
|
|
||||
protected override Task SetupCollectionAsync(IMongoCollection<MongoStreamPositionEntity> collection) |
|
||||
{ |
|
||||
return collection.Indexes.CreateOneAsync(IndexKeys.Ascending(x => x.SubscriptionName), new CreateIndexOptions { Unique = true }); |
|
||||
} |
|
||||
|
|
||||
protected override string CollectionName() |
|
||||
{ |
|
||||
return "StreamPositions"; |
|
||||
} |
|
||||
|
|
||||
public int? ReadPosition(string subscriptionName) |
|
||||
{ |
|
||||
Guard.NotNullOrEmpty(subscriptionName, nameof(subscriptionName)); |
|
||||
|
|
||||
var document = Collection.Find(t => t.SubscriptionName == subscriptionName).FirstOrDefault(); |
|
||||
|
|
||||
if (document == null) |
|
||||
{ |
|
||||
document = new MongoStreamPositionEntity { SubscriptionName = subscriptionName }; |
|
||||
|
|
||||
Collection.InsertOne(document); |
|
||||
} |
|
||||
|
|
||||
return document.Position; |
|
||||
} |
|
||||
|
|
||||
public void WritePosition(string subscriptionName, int position) |
|
||||
{ |
|
||||
Guard.NotNullOrEmpty(subscriptionName, nameof(subscriptionName)); |
|
||||
|
|
||||
Collection.UpdateOne(t => t.SubscriptionName == subscriptionName, Update.Set(t => t.Position, position)); |
|
||||
} |
|
||||
} |
|
||||
} |
|
||||
@ -1,55 +0,0 @@ |
|||||
// ==========================================================================
|
|
||||
// EventStoreModule.cs
|
|
||||
// Squidex Headless CMS
|
|
||||
// ==========================================================================
|
|
||||
// Copyright (c) Squidex Group
|
|
||||
// All rights reserved.
|
|
||||
// ==========================================================================
|
|
||||
|
|
||||
using System.Net; |
|
||||
using Autofac; |
|
||||
using EventStore.ClientAPI; |
|
||||
using EventStore.ClientAPI.SystemData; |
|
||||
using Microsoft.Extensions.Options; |
|
||||
using Squidex.Infrastructure.CQRS.EventStore; |
|
||||
|
|
||||
namespace Squidex.Config.EventStore |
|
||||
{ |
|
||||
public class EventStoreModule : Module |
|
||||
{ |
|
||||
protected override void Load(ContainerBuilder builder) |
|
||||
{ |
|
||||
builder.Register(context => |
|
||||
{ |
|
||||
var options = context.Resolve<IOptions<MyEventStoreOptions>>().Value; |
|
||||
|
|
||||
var eventStore = |
|
||||
EventStoreConnection.Create( |
|
||||
ConnectionSettings.Create() |
|
||||
.UseConsoleLogger() |
|
||||
.UseDebugLogger() |
|
||||
.KeepReconnecting() |
|
||||
.KeepRetrying(), |
|
||||
new IPEndPoint(IPAddress.Parse(options.IPAddress), options.Port)); |
|
||||
|
|
||||
eventStore.ConnectAsync().Wait(); |
|
||||
|
|
||||
return eventStore; |
|
||||
}).SingleInstance(); |
|
||||
|
|
||||
builder.Register(context => |
|
||||
{ |
|
||||
var options = context.Resolve<IOptions<MyEventStoreOptions>>().Value; |
|
||||
|
|
||||
return new UserCredentials(options.Username, options.Password); |
|
||||
}).SingleInstance(); |
|
||||
|
|
||||
builder.Register(context => |
|
||||
{ |
|
||||
var options = context.Resolve<IOptions<MyEventStoreOptions>>().Value; |
|
||||
|
|
||||
return new DefaultNameResolver(options.Prefix); |
|
||||
}).As<IStreamNameResolver>().SingleInstance(); |
|
||||
} |
|
||||
} |
|
||||
} |
|
||||
@ -0,0 +1,28 @@ |
|||||
|
// ==========================================================================
|
||||
|
// MongoDbEventStoreModule.cs
|
||||
|
// Squidex Headless CMS
|
||||
|
// ==========================================================================
|
||||
|
// Copyright (c) Squidex Group
|
||||
|
// All rights reserved.
|
||||
|
// ==========================================================================
|
||||
|
|
||||
|
using Autofac; |
||||
|
using Squidex.Infrastructure.CQRS.Events; |
||||
|
using Squidex.Infrastructure.MongoDb.EventStore; |
||||
|
|
||||
|
namespace Squidex.Config.EventStore |
||||
|
{ |
||||
|
public class MongoDbEventStoreModule : Module |
||||
|
{ |
||||
|
protected override void Load(ContainerBuilder builder) |
||||
|
{ |
||||
|
builder.RegisterType<MongoEventStore>() |
||||
|
.As<IEventStore>() |
||||
|
.SingleInstance(); |
||||
|
|
||||
|
builder.RegisterType<DefaultNameResolver>() |
||||
|
.As<IStreamNameResolver>() |
||||
|
.SingleInstance(); |
||||
|
} |
||||
|
} |
||||
|
} |
||||
@ -0,0 +1,39 @@ |
|||||
|
// ==========================================================================
|
||||
|
// RabbitMqEventChannelModule.cs
|
||||
|
// Squidex Headless CMS
|
||||
|
// ==========================================================================
|
||||
|
// Copyright (c) Squidex Group
|
||||
|
// All rights reserved.
|
||||
|
// ==========================================================================
|
||||
|
|
||||
|
using System; |
||||
|
using Autofac; |
||||
|
using Microsoft.Extensions.Options; |
||||
|
using RabbitMQ.Client; |
||||
|
using Squidex.Infrastructure.CQRS.Events; |
||||
|
using Squidex.Infrastructure.RabbitMq; |
||||
|
|
||||
|
namespace Squidex.Config.EventStore |
||||
|
{ |
||||
|
public class RabbitMqEventChannelModule : Module |
||||
|
{ |
||||
|
protected override void Load(ContainerBuilder builder) |
||||
|
{ |
||||
|
builder.Register(context => |
||||
|
{ |
||||
|
var options = context.Resolve<IOptions<MyRabbitMqOptions>>().Value; |
||||
|
|
||||
|
var factory = new ConnectionFactory(); |
||||
|
|
||||
|
factory.SetUri(new Uri(options.ConnectionString)); |
||||
|
|
||||
|
return factory; |
||||
|
}).As<IConnectionFactory>().SingleInstance(); |
||||
|
|
||||
|
builder.RegisterType<RabbitMqEventChannel>() |
||||
|
.As<IEventPublisher>() |
||||
|
.As<IEventStream>() |
||||
|
.SingleInstance(); |
||||
|
} |
||||
|
} |
||||
|
} |
||||
@ -0,0 +1,62 @@ |
|||||
|
// ==========================================================================
|
||||
|
// DisposableObjectTest.cs
|
||||
|
// Squidex Headless CMS
|
||||
|
// ==========================================================================
|
||||
|
// Copyright (c) Squidex Group
|
||||
|
// All rights reserved.
|
||||
|
// ==========================================================================
|
||||
|
|
||||
|
using System; |
||||
|
using Xunit; |
||||
|
|
||||
|
namespace Squidex.Infrastructure |
||||
|
{ |
||||
|
public class DisposableObjectTests |
||||
|
{ |
||||
|
public sealed class MyDisposableObject : DisposableObject |
||||
|
{ |
||||
|
public int DisposeCallCount { get; set; } |
||||
|
|
||||
|
protected override void DisposeObject(bool disposing) |
||||
|
{ |
||||
|
DisposeCallCount++; |
||||
|
} |
||||
|
|
||||
|
public void Ensure() |
||||
|
{ |
||||
|
ThrowIfDisposed(); |
||||
|
} |
||||
|
} |
||||
|
|
||||
|
[Fact] |
||||
|
public void Should_not_throw_exception_when_not_disposed() |
||||
|
{ |
||||
|
var sut = new MyDisposableObject(); |
||||
|
|
||||
|
sut.Ensure(); |
||||
|
} |
||||
|
|
||||
|
[Fact] |
||||
|
public void Should_dispose_once() |
||||
|
{ |
||||
|
var sut = new MyDisposableObject(); |
||||
|
|
||||
|
sut.Dispose(); |
||||
|
sut.Dispose(); |
||||
|
|
||||
|
Assert.True(sut.IsDisposed); |
||||
|
|
||||
|
Assert.Equal(1, sut.DisposeCallCount); |
||||
|
} |
||||
|
|
||||
|
[Fact] |
||||
|
public void Should_throw_exception_when_disposed() |
||||
|
{ |
||||
|
var sut = new MyDisposableObject(); |
||||
|
|
||||
|
sut.Dispose(); |
||||
|
|
||||
|
Assert.Throws<ObjectDisposedException>(() => sut.Ensure()); |
||||
|
} |
||||
|
} |
||||
|
} |
||||
@ -0,0 +1,27 @@ |
|||||
|
// ==========================================================================
|
||||
|
// TaskExtensionsTests.cs
|
||||
|
// Squidex Headless CMS
|
||||
|
// ==========================================================================
|
||||
|
// Copyright (c) Squidex Group
|
||||
|
// All rights reserved.
|
||||
|
// ==========================================================================
|
||||
|
|
||||
|
using System.Threading.Tasks; |
||||
|
using Squidex.Infrastructure.Tasks; |
||||
|
using Xunit; |
||||
|
|
||||
|
namespace Squidex.Infrastructure |
||||
|
{ |
||||
|
public class TaskExtensionsTests |
||||
|
{ |
||||
|
[Fact] |
||||
|
public void Should_do_nothing_on_forget() |
||||
|
{ |
||||
|
var task = Task.FromResult(123); |
||||
|
|
||||
|
task.Forget(); |
||||
|
|
||||
|
Assert.Equal(123, task.Result); |
||||
|
} |
||||
|
} |
||||
|
} |
||||
Loading…
Reference in new issue