|
|
@ -11,7 +11,6 @@ using System.Collections.Concurrent; |
|
|
using System.Linq; |
|
|
using System.Linq; |
|
|
using System.Net; |
|
|
using System.Net; |
|
|
using System.Net.Sockets; |
|
|
using System.Net.Sockets; |
|
|
using System.Text; |
|
|
|
|
|
using System.Threading.Tasks; |
|
|
using System.Threading.Tasks; |
|
|
using EventStore.ClientAPI; |
|
|
using EventStore.ClientAPI; |
|
|
using EventStore.ClientAPI.Exceptions; |
|
|
using EventStore.ClientAPI.Exceptions; |
|
|
@ -39,7 +38,7 @@ namespace Squidex.Infrastructure.GetEventStore |
|
|
this.streamFilter = streamFilter; |
|
|
this.streamFilter = streamFilter; |
|
|
this.projectionHost = projectionHost; |
|
|
this.projectionHost = projectionHost; |
|
|
|
|
|
|
|
|
streamName = CreateStreamName(streamFilter, prefix); |
|
|
streamName = $"by-{prefix.Simplify()}-{streamFilter.Simplify()}"; |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
public void Dispose() |
|
|
public void Dispose() |
|
|
@ -47,15 +46,40 @@ namespace Squidex.Infrastructure.GetEventStore |
|
|
internalSubscription?.Stop(); |
|
|
internalSubscription?.Stop(); |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
public async Task SubscribeAsync(Func<StoredEvent, Task> handler) |
|
|
public async Task SubscribeAsync(Func<StoredEvent, Task> onNext, Func<Exception, Task> onError = null) |
|
|
{ |
|
|
{ |
|
|
Guard.NotNull(handler, nameof(handler)); |
|
|
Guard.NotNull(onNext, nameof(onNext)); |
|
|
|
|
|
|
|
|
if (internalSubscription != null) |
|
|
if (internalSubscription != null) |
|
|
{ |
|
|
{ |
|
|
throw new InvalidOperationException("An handler has already been registered."); |
|
|
throw new InvalidOperationException("An handler has already been registered."); |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
await CreateProjectionAsync(); |
|
|
|
|
|
|
|
|
|
|
|
long? eventStorePosition = null; |
|
|
|
|
|
|
|
|
|
|
|
if (long.TryParse(position, out var parsedPosition)) |
|
|
|
|
|
{ |
|
|
|
|
|
eventStorePosition = parsedPosition; |
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
internalSubscription = connection.SubscribeToStreamFrom(streamName, eventStorePosition, CatchUpSubscriptionSettings.Default, |
|
|
|
|
|
(subscription, resolved) => |
|
|
|
|
|
{ |
|
|
|
|
|
var storedEvent = Formatter.Read(resolved); |
|
|
|
|
|
|
|
|
|
|
|
onNext(storedEvent).Wait(); |
|
|
|
|
|
}, subscriptionDropped: (subscription, reason, ex) => |
|
|
|
|
|
{ |
|
|
|
|
|
var exception = ex ?? new ConnectionClosedException($"Subscription closed with reason {reason}."); |
|
|
|
|
|
|
|
|
|
|
|
onError?.Invoke(exception); |
|
|
|
|
|
}); |
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
private async Task CreateProjectionAsync() |
|
|
|
|
|
{ |
|
|
if (subscriptionsCreated.TryAdd(streamName, true)) |
|
|
if (subscriptionsCreated.TryAdd(streamName, true)) |
|
|
{ |
|
|
{ |
|
|
var projectsManager = await ConnectToProjections(); |
|
|
var projectsManager = await ConnectToProjections(); |
|
|
@ -79,20 +103,6 @@ namespace Squidex.Infrastructure.GetEventStore |
|
|
// Projection already exists.
|
|
|
// Projection already exists.
|
|
|
} |
|
|
} |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
long? eventStorePosition = null; |
|
|
|
|
|
|
|
|
|
|
|
if (long.TryParse(position, out var parsedPosition)) |
|
|
|
|
|
{ |
|
|
|
|
|
eventStorePosition = parsedPosition; |
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
internalSubscription = connection.SubscribeToStreamFrom(streamName, eventStorePosition, CatchUpSubscriptionSettings.Default, (subscription, resolved) => |
|
|
|
|
|
{ |
|
|
|
|
|
var eventData = Formatter.Read(resolved.Event); |
|
|
|
|
|
|
|
|
|
|
|
handler(new StoredEvent(resolved.OriginalEventNumber.ToString(), resolved.Event.EventNumber, eventData)).Wait(); |
|
|
|
|
|
}); |
|
|
|
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
private async Task<ProjectionsManager> ConnectToProjections() |
|
|
private async Task<ProjectionsManager> ConnectToProjections() |
|
|
@ -113,34 +123,5 @@ namespace Squidex.Infrastructure.GetEventStore |
|
|
connection.Settings.OperationTimeout); |
|
|
connection.Settings.OperationTimeout); |
|
|
return projectsManager; |
|
|
return projectsManager; |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
private static string CreateStreamName(string streamFilter, string prefix) |
|
|
|
|
|
{ |
|
|
|
|
|
var sb = new StringBuilder(); |
|
|
|
|
|
|
|
|
|
|
|
sb.Append("by-"); |
|
|
|
|
|
sb.Append(prefix.Trim(' ', '-')); |
|
|
|
|
|
sb.Append("-"); |
|
|
|
|
|
|
|
|
|
|
|
var prevIsLetterOrDigit = false; |
|
|
|
|
|
|
|
|
|
|
|
foreach (var c in streamFilter) |
|
|
|
|
|
{ |
|
|
|
|
|
if (char.IsLetterOrDigit(c)) |
|
|
|
|
|
{ |
|
|
|
|
|
sb.Append(char.ToLowerInvariant(c)); |
|
|
|
|
|
|
|
|
|
|
|
prevIsLetterOrDigit = true; |
|
|
|
|
|
} |
|
|
|
|
|
else if (prevIsLetterOrDigit) |
|
|
|
|
|
{ |
|
|
|
|
|
sb.Append("-"); |
|
|
|
|
|
|
|
|
|
|
|
prevIsLetterOrDigit = false; |
|
|
|
|
|
} |
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
return sb.ToString().Trim(' ', '-'); |
|
|
|
|
|
} |
|
|
|
|
|
} |
|
|
} |
|
|
} |
|
|
} |
|
|
|