mirror of https://github.com/Squidex/squidex.git
33 changed files with 212 additions and 146 deletions
@ -0,0 +1,73 @@ |
|||
// ==========================================================================
|
|||
// GetEventStoreSubscription.cs
|
|||
// Squidex Headless CMS
|
|||
// ==========================================================================
|
|||
// Copyright (c) Squidex Group
|
|||
// All rights reserved.
|
|||
// ==========================================================================
|
|||
|
|||
using System.Threading.Tasks; |
|||
using EventStore.ClientAPI; |
|||
using EventStore.ClientAPI.Exceptions; |
|||
using Squidex.Infrastructure.Tasks; |
|||
|
|||
namespace Squidex.Infrastructure.EventSourcing |
|||
{ |
|||
internal sealed class GetEventStoreSubscription : IEventSubscription |
|||
{ |
|||
private readonly IEventStoreConnection eventStoreConnection; |
|||
private readonly IEventSubscriber eventSubscriber; |
|||
private readonly EventStoreCatchUpSubscription subscription; |
|||
private readonly long? position; |
|||
|
|||
public GetEventStoreSubscription( |
|||
IEventStoreConnection eventStoreConnection, |
|||
IEventSubscriber eventSubscriber, |
|||
string projectionHost, |
|||
string prefix, |
|||
string position, |
|||
string streamFilter) |
|||
{ |
|||
Guard.NotNull(eventSubscriber, nameof(eventSubscriber)); |
|||
Guard.NotNullOrEmpty(streamFilter, nameof(streamFilter)); |
|||
|
|||
this.eventStoreConnection = eventStoreConnection; |
|||
this.eventSubscriber = eventSubscriber; |
|||
this.position = ProjectionHelper.ParsePositionOrNull(position); |
|||
|
|||
var streamName = eventStoreConnection.CreateProjectionAsync(projectionHost, prefix, streamFilter).Result; |
|||
|
|||
subscription = SubscribeToStream(streamName); |
|||
} |
|||
|
|||
public Task StopAsync() |
|||
{ |
|||
subscription.Stop(); |
|||
|
|||
return TaskHelper.Done; |
|||
} |
|||
|
|||
private EventStoreCatchUpSubscription SubscribeToStream(string streamName) |
|||
{ |
|||
var settings = CatchUpSubscriptionSettings.Default; |
|||
|
|||
return eventStoreConnection.SubscribeToStreamFrom(streamName, position, settings, |
|||
(s, e) => |
|||
{ |
|||
var storedEvent = Formatter.Read(e); |
|||
|
|||
eventSubscriber.OnEventAsync(this, storedEvent).Wait(); |
|||
}, null, |
|||
(s, reason, ex) => |
|||
{ |
|||
if (reason != SubscriptionDropReason.ConnectionClosed && |
|||
reason != SubscriptionDropReason.UserInitiated) |
|||
{ |
|||
ex = ex ?? new ConnectionClosedException($"Subscription closed with reason {reason}."); |
|||
|
|||
eventSubscriber.OnErrorAsync(this, ex); |
|||
} |
|||
}); |
|||
} |
|||
} |
|||
} |
|||
@ -0,0 +1,15 @@ |
|||
// ==========================================================================
|
|||
// IInitializable.cs
|
|||
// Squidex Headless CMS
|
|||
// ==========================================================================
|
|||
// Copyright (c) Squidex Group
|
|||
// All rights reserved.
|
|||
// ==========================================================================
|
|||
|
|||
namespace Squidex.Infrastructure |
|||
{ |
|||
public interface IInitializable |
|||
{ |
|||
void Initialize(); |
|||
} |
|||
} |
|||
Loading…
Reference in new issue