mirror of https://github.com/Squidex/squidex.git
13 changed files with 702 additions and 9 deletions
@ -0,0 +1,33 @@ |
|||||
|
// ==========================================================================
|
||||
|
// Squidex Headless CMS
|
||||
|
// ==========================================================================
|
||||
|
// Copyright (c) Squidex UG (haftungsbeschränkt)
|
||||
|
// All rights reserved. Licensed under the MIT license.
|
||||
|
// ==========================================================================
|
||||
|
|
||||
|
using Newtonsoft.Json; |
||||
|
|
||||
|
namespace Squidex.Infrastructure.EventSourcing |
||||
|
{ |
||||
|
internal sealed class CosmosDbEvent |
||||
|
{ |
||||
|
[JsonProperty] |
||||
|
public string Type { get; set; } |
||||
|
|
||||
|
[JsonProperty] |
||||
|
public string Payload { get; set; } |
||||
|
|
||||
|
[JsonProperty] |
||||
|
public EnvelopeHeaders Headers { get; set; } |
||||
|
|
||||
|
public static CosmosDbEvent FromEventData(EventData data) |
||||
|
{ |
||||
|
return new CosmosDbEvent { Type = data.Type, Headers = data.Headers, Payload = data.Payload }; |
||||
|
} |
||||
|
|
||||
|
public EventData ToEventData() |
||||
|
{ |
||||
|
return new EventData(Type, Headers, Payload); |
||||
|
} |
||||
|
} |
||||
|
} |
||||
@ -0,0 +1,34 @@ |
|||||
|
// ==========================================================================
|
||||
|
// Squidex Headless CMS
|
||||
|
// ==========================================================================
|
||||
|
// Copyright (c) Squidex UG (haftungsbeschränkt)
|
||||
|
// All rights reserved. Licensed under the MIT license.
|
||||
|
// ==========================================================================
|
||||
|
|
||||
|
using System; |
||||
|
using Newtonsoft.Json; |
||||
|
using NodaTime; |
||||
|
|
||||
|
namespace Squidex.Infrastructure.EventSourcing |
||||
|
{ |
||||
|
internal sealed class CosmosDbEventCommit |
||||
|
{ |
||||
|
[JsonProperty] |
||||
|
public Guid Id { get; set; } |
||||
|
|
||||
|
[JsonProperty] |
||||
|
public CosmosDbEvent[] Events { get; set; } |
||||
|
|
||||
|
[JsonProperty] |
||||
|
public long EventStreamOffset { get; set; } |
||||
|
|
||||
|
[JsonProperty] |
||||
|
public long EventsCount { get; set; } |
||||
|
|
||||
|
[JsonProperty] |
||||
|
public string EventStream { get; set; } |
||||
|
|
||||
|
[JsonProperty] |
||||
|
public long Timestamp { get; set; } |
||||
|
} |
||||
|
} |
||||
@ -0,0 +1,67 @@ |
|||||
|
// ==========================================================================
|
||||
|
// Squidex Headless CMS
|
||||
|
// ==========================================================================
|
||||
|
// Copyright (c) Squidex UG (haftungsbeschraenkt)
|
||||
|
// All rights reserved. Licensed under the MIT license.
|
||||
|
// ==========================================================================
|
||||
|
|
||||
|
using System; |
||||
|
using System.Collections.ObjectModel; |
||||
|
using System.Threading; |
||||
|
using System.Threading.Tasks; |
||||
|
using Microsoft.Azure.Documents; |
||||
|
using Microsoft.Azure.Documents.Client; |
||||
|
using Newtonsoft.Json; |
||||
|
|
||||
|
namespace Squidex.Infrastructure.EventSourcing |
||||
|
{ |
||||
|
public sealed partial class CosmosDbEventStore : IEventStore, IInitializable |
||||
|
{ |
||||
|
private readonly DocumentClient documentClient; |
||||
|
private readonly Uri databaseUri; |
||||
|
private readonly Uri collectionUri; |
||||
|
private readonly string database; |
||||
|
|
||||
|
public CosmosDbEventStore(DocumentClient documentClient, string database) |
||||
|
{ |
||||
|
Guard.NotNull(documentClient, nameof(documentClient)); |
||||
|
Guard.NotNullOrEmpty(database, nameof(database)); |
||||
|
|
||||
|
this.documentClient = documentClient; |
||||
|
|
||||
|
this.databaseUri = UriFactory.CreateDatabaseUri(database); |
||||
|
this.database = database; |
||||
|
|
||||
|
collectionUri = UriFactory.CreateDocumentCollectionUri(database, FilterBuilder.Collection); |
||||
|
} |
||||
|
|
||||
|
public async Task InitializeAsync(CancellationToken ct = default) |
||||
|
{ |
||||
|
await documentClient.CreateDatabaseIfNotExistsAsync(new Database { Id = database }); |
||||
|
|
||||
|
await documentClient.CreateDocumentCollectionIfNotExistsAsync(databaseUri, |
||||
|
new DocumentCollection |
||||
|
{ |
||||
|
UniqueKeyPolicy = new UniqueKeyPolicy |
||||
|
{ |
||||
|
UniqueKeys = new Collection<UniqueKey> |
||||
|
{ |
||||
|
new UniqueKey |
||||
|
{ |
||||
|
Paths = new Collection<string> |
||||
|
{ |
||||
|
$"/{FilterBuilder.EventStreamField}", |
||||
|
$"/{FilterBuilder.EventStreamOffsetField}" |
||||
|
} |
||||
|
} |
||||
|
} |
||||
|
}, |
||||
|
Id = FilterBuilder.Collection, |
||||
|
}, |
||||
|
new RequestOptions |
||||
|
{ |
||||
|
PartitionKey = new PartitionKey($"/{FilterBuilder.EventStreamField}") |
||||
|
}); |
||||
|
} |
||||
|
} |
||||
|
} |
||||
@ -0,0 +1,160 @@ |
|||||
|
// ==========================================================================
|
||||
|
// Squidex Headless CMS
|
||||
|
// ==========================================================================
|
||||
|
// Copyright (c) Squidex UG (haftungsbeschränkt)
|
||||
|
// All rights reserved. Licensed under the MIT license.
|
||||
|
// ==========================================================================
|
||||
|
|
||||
|
using System; |
||||
|
using System.Collections.Generic; |
||||
|
using System.Linq; |
||||
|
using System.Threading; |
||||
|
using System.Threading.Tasks; |
||||
|
using Microsoft.Azure.Documents.Linq; |
||||
|
using Squidex.Infrastructure.Log; |
||||
|
|
||||
|
namespace Squidex.Infrastructure.EventSourcing |
||||
|
{ |
||||
|
public delegate bool EventPredicate(EventData data); |
||||
|
|
||||
|
public partial class CosmosDbEventStore : IEventStore, IInitializable |
||||
|
{ |
||||
|
public IEventSubscription CreateSubscription(IEventSubscriber subscriber, string streamFilter, string position = null) |
||||
|
{ |
||||
|
Guard.NotNull(subscriber, nameof(subscriber)); |
||||
|
Guard.NotNullOrEmpty(streamFilter, nameof(streamFilter)); |
||||
|
|
||||
|
return new PollingSubscription(this, subscriber, streamFilter, position); |
||||
|
} |
||||
|
|
||||
|
public Task CreateIndexAsync(string property) |
||||
|
{ |
||||
|
return Task.CompletedTask; |
||||
|
} |
||||
|
|
||||
|
public async Task<IReadOnlyList<StoredEvent>> QueryAsync(string streamName, long streamPosition = 0) |
||||
|
{ |
||||
|
using (Profiler.TraceMethod<CosmosDbEventStore>()) |
||||
|
{ |
||||
|
var query = |
||||
|
documentClient.CreateDocumentQuery<CosmosDbEventCommit>(collectionUri, |
||||
|
FilterBuilder.ByStreamName(streamName, streamPosition)); |
||||
|
|
||||
|
var documentQuery = query.AsDocumentQuery(); |
||||
|
|
||||
|
var result = new List<StoredEvent>(); |
||||
|
|
||||
|
while (documentQuery.HasMoreResults) |
||||
|
{ |
||||
|
var commits = await documentQuery.ExecuteNextAsync<CosmosDbEventCommit>(); |
||||
|
|
||||
|
foreach (var commit in commits) |
||||
|
{ |
||||
|
var eventStreamOffset = (int)commit.EventStreamOffset; |
||||
|
|
||||
|
var commitTimestamp = commit.Timestamp; |
||||
|
var commitOffset = 0; |
||||
|
|
||||
|
foreach (var e in commit.Events) |
||||
|
{ |
||||
|
eventStreamOffset++; |
||||
|
|
||||
|
if (eventStreamOffset >= streamPosition) |
||||
|
{ |
||||
|
var eventData = e.ToEventData(); |
||||
|
var eventToken = new StreamPosition(commitTimestamp, commitOffset, commit.Events.Length); |
||||
|
|
||||
|
result.Add(new StoredEvent(streamName, eventToken, eventStreamOffset, eventData)); |
||||
|
} |
||||
|
} |
||||
|
} |
||||
|
} |
||||
|
|
||||
|
return result; |
||||
|
} |
||||
|
} |
||||
|
|
||||
|
public Task QueryAsync(Func<StoredEvent, Task> callback, string property, object value, string position = null, CancellationToken ct = default) |
||||
|
{ |
||||
|
Guard.NotNull(callback, nameof(callback)); |
||||
|
|
||||
|
StreamPosition lastPosition = position; |
||||
|
|
||||
|
var filterDefinition = CreateFilter(property, value, lastPosition); |
||||
|
var filterExpression = CreateFilterExpression(property, value); |
||||
|
|
||||
|
return QueryAsync(callback, lastPosition, filterDefinition, filterExpression, ct); |
||||
|
} |
||||
|
|
||||
|
public Task QueryAsync(Func<StoredEvent, Task> callback, string streamFilter = null, string position = null, CancellationToken ct = default) |
||||
|
{ |
||||
|
Guard.NotNull(callback, nameof(callback)); |
||||
|
|
||||
|
StreamPosition lastPosition = position; |
||||
|
|
||||
|
var filterDefinition = CreateFilter(streamFilter, lastPosition); |
||||
|
var filterExpression = CreateFilterExpression(null, null); |
||||
|
|
||||
|
return QueryAsync(callback, lastPosition, filterDefinition, filterExpression, ct); |
||||
|
} |
||||
|
|
||||
|
private async Task QueryAsync(Func<StoredEvent, Task> callback, StreamPosition lastPosition, IQueryable<CosmosDbEventCommit> query, EventPredicate filterExpression, CancellationToken ct = default) |
||||
|
{ |
||||
|
using (Profiler.TraceMethod<CosmosDbEventStore>()) |
||||
|
{ |
||||
|
var documentQuery = query.AsDocumentQuery(); |
||||
|
|
||||
|
while (documentQuery.HasMoreResults && !ct.IsCancellationRequested) |
||||
|
{ |
||||
|
var commits = await documentQuery.ExecuteNextAsync<CosmosDbEventCommit>(ct); |
||||
|
|
||||
|
foreach (var commit in commits) |
||||
|
{ |
||||
|
var eventStreamOffset = (int)commit.EventStreamOffset; |
||||
|
|
||||
|
var commitTimestamp = commit.Timestamp; |
||||
|
var commitOffset = 0; |
||||
|
|
||||
|
foreach (var e in commit.Events) |
||||
|
{ |
||||
|
eventStreamOffset++; |
||||
|
|
||||
|
if (commitOffset > lastPosition.CommitOffset || commitTimestamp > lastPosition.Timestamp) |
||||
|
{ |
||||
|
var eventData = e.ToEventData(); |
||||
|
|
||||
|
if (filterExpression(eventData)) |
||||
|
{ |
||||
|
var eventToken = new StreamPosition(commitTimestamp, commitOffset, commit.Events.Length); |
||||
|
|
||||
|
await callback(new StoredEvent(commit.EventStream, eventToken, eventStreamOffset, eventData)); |
||||
|
} |
||||
|
} |
||||
|
|
||||
|
commitOffset++; |
||||
|
} |
||||
|
} |
||||
|
} |
||||
|
} |
||||
|
} |
||||
|
|
||||
|
private IQueryable<CosmosDbEventCommit> CreateFilter(string property, object value, StreamPosition streamPosition) |
||||
|
{ |
||||
|
var query = FilterBuilder.ByProperty(property, value, streamPosition); |
||||
|
|
||||
|
return documentClient.CreateDocumentQuery<CosmosDbEventCommit>(collectionUri, query); |
||||
|
} |
||||
|
|
||||
|
private IQueryable<CosmosDbEventCommit> CreateFilter(string streamFilter, StreamPosition streamPosition) |
||||
|
{ |
||||
|
var query = FilterBuilder.ByFilter(streamFilter, streamPosition); |
||||
|
|
||||
|
return documentClient.CreateDocumentQuery<CosmosDbEventCommit>(collectionUri, query); |
||||
|
} |
||||
|
|
||||
|
private static EventPredicate CreateFilterExpression(string property, object value) |
||||
|
{ |
||||
|
return FilterBuilder.CreateFilterExpression(property, value); |
||||
|
} |
||||
|
} |
||||
|
} |
||||
@ -0,0 +1,138 @@ |
|||||
|
// ==========================================================================
|
||||
|
// Squidex Headless CMS
|
||||
|
// ==========================================================================
|
||||
|
// Copyright (c) Squidex UG (haftungsbeschränkt)
|
||||
|
// All rights reserved. Licensed under the MIT license.
|
||||
|
// ==========================================================================
|
||||
|
|
||||
|
using System; |
||||
|
using System.Collections.Generic; |
||||
|
using System.Linq; |
||||
|
using System.Net; |
||||
|
using System.Threading.Tasks; |
||||
|
using Microsoft.Azure.Documents; |
||||
|
using Microsoft.Azure.Documents.Client; |
||||
|
using NodaTime; |
||||
|
using Squidex.Infrastructure.Log; |
||||
|
|
||||
|
namespace Squidex.Infrastructure.EventSourcing |
||||
|
{ |
||||
|
public partial class CosmosDbEventStore |
||||
|
{ |
||||
|
private const int MaxWriteAttempts = 20; |
||||
|
private const int MaxCommitSize = 10; |
||||
|
private static readonly FeedOptions TakeOne = new FeedOptions { MaxItemCount = 1 }; |
||||
|
|
||||
|
public Task DeleteStreamAsync(string streamName) |
||||
|
{ |
||||
|
return Task.CompletedTask; |
||||
|
} |
||||
|
|
||||
|
public Task AppendAsync(Guid commitId, string streamName, ICollection<EventData> events) |
||||
|
{ |
||||
|
return AppendAsync(commitId, streamName, EtagVersion.Any, events); |
||||
|
} |
||||
|
|
||||
|
public async Task AppendAsync(Guid commitId, string streamName, long expectedVersion, ICollection<EventData> events) |
||||
|
{ |
||||
|
Guard.GreaterEquals(expectedVersion, EtagVersion.Any, nameof(expectedVersion)); |
||||
|
Guard.NotNullOrEmpty(streamName, nameof(streamName)); |
||||
|
Guard.NotNull(events, nameof(events)); |
||||
|
Guard.LessThan(events.Count, MaxCommitSize, "events.Count"); |
||||
|
|
||||
|
using (Profiler.TraceMethod<CosmosDbEventStore>()) |
||||
|
{ |
||||
|
if (events.Count == 0) |
||||
|
{ |
||||
|
return; |
||||
|
} |
||||
|
|
||||
|
var currentVersion = GetEventStreamOffset(streamName); |
||||
|
|
||||
|
if (expectedVersion != EtagVersion.Any && expectedVersion != currentVersion) |
||||
|
{ |
||||
|
throw new WrongEventVersionException(currentVersion, expectedVersion); |
||||
|
} |
||||
|
|
||||
|
var commit = BuildCommit(commitId, streamName, expectedVersion >= -1 ? expectedVersion : currentVersion, events); |
||||
|
|
||||
|
for (var attempt = 0; attempt < MaxWriteAttempts; attempt++) |
||||
|
{ |
||||
|
try |
||||
|
{ |
||||
|
await documentClient.CreateDocumentAsync(collectionUri, commit); |
||||
|
|
||||
|
return; |
||||
|
} |
||||
|
catch (DocumentClientException ex) |
||||
|
{ |
||||
|
if (ex.StatusCode == HttpStatusCode.Conflict) |
||||
|
{ |
||||
|
currentVersion = GetEventStreamOffset(streamName); |
||||
|
|
||||
|
if (expectedVersion != EtagVersion.Any) |
||||
|
{ |
||||
|
throw new WrongEventVersionException(currentVersion, expectedVersion); |
||||
|
} |
||||
|
|
||||
|
if (attempt < MaxWriteAttempts) |
||||
|
{ |
||||
|
expectedVersion = currentVersion; |
||||
|
} |
||||
|
else |
||||
|
{ |
||||
|
throw new TimeoutException("Could not acquire a free slot for the commit within the provided time."); |
||||
|
} |
||||
|
} |
||||
|
else |
||||
|
{ |
||||
|
throw; |
||||
|
} |
||||
|
} |
||||
|
} |
||||
|
} |
||||
|
} |
||||
|
|
||||
|
private long GetEventStreamOffset(string streamName) |
||||
|
{ |
||||
|
var query = |
||||
|
documentClient.CreateDocumentQuery<CosmosDbEventCommit>(collectionUri, |
||||
|
FilterBuilder.LastPosition(streamName)); |
||||
|
|
||||
|
var document = query.ToList().FirstOrDefault(); |
||||
|
|
||||
|
if (document != null) |
||||
|
{ |
||||
|
return document.EventStreamOffset + document.EventsCount; |
||||
|
} |
||||
|
|
||||
|
return EtagVersion.Empty; |
||||
|
} |
||||
|
|
||||
|
private static CosmosDbEventCommit BuildCommit(Guid commitId, string streamName, long expectedVersion, ICollection<EventData> events) |
||||
|
{ |
||||
|
var commitEvents = new CosmosDbEvent[events.Count]; |
||||
|
|
||||
|
var i = 0; |
||||
|
|
||||
|
foreach (var e in events) |
||||
|
{ |
||||
|
var mongoEvent = CosmosDbEvent.FromEventData(e); |
||||
|
|
||||
|
commitEvents[i++] = mongoEvent; |
||||
|
} |
||||
|
|
||||
|
var mongoCommit = new CosmosDbEventCommit |
||||
|
{ |
||||
|
Id = commitId, |
||||
|
Events = commitEvents, |
||||
|
EventsCount = events.Count, |
||||
|
EventStream = streamName, |
||||
|
EventStreamOffset = expectedVersion, |
||||
|
Timestamp = SystemClock.Instance.GetCurrentInstant().ToUnixTimeTicks() |
||||
|
}; |
||||
|
|
||||
|
return mongoCommit; |
||||
|
} |
||||
|
} |
||||
|
} |
||||
@ -0,0 +1,145 @@ |
|||||
|
// ==========================================================================
|
||||
|
// Squidex Headless CMS
|
||||
|
// ==========================================================================
|
||||
|
// Copyright (c) Squidex UG (haftungsbeschraenkt)
|
||||
|
// All rights reserved. Licensed under the MIT license.
|
||||
|
// ==========================================================================
|
||||
|
|
||||
|
using System; |
||||
|
using System.Collections.Generic; |
||||
|
using Microsoft.Azure.Documents; |
||||
|
using Squidex.Infrastructure.Json.Objects; |
||||
|
|
||||
|
namespace Squidex.Infrastructure.EventSourcing |
||||
|
{ |
||||
|
internal static class FilterBuilder |
||||
|
{ |
||||
|
public const string Collection = "Events"; |
||||
|
|
||||
|
public static readonly string EventsCountField = nameof(CosmosDbEventCommit.EventsCount).ToCamelCase(); |
||||
|
public static readonly string EventStreamOffsetField = nameof(CosmosDbEventCommit.EventStreamOffset).ToCamelCase(); |
||||
|
public static readonly string EventStreamField = nameof(CosmosDbEventCommit.EventStream).ToCamelCase(); |
||||
|
public static readonly string TimestampField = nameof(CosmosDbEventCommit.Timestamp).ToCamelCase(); |
||||
|
|
||||
|
public static SqlQuerySpec LastPosition(string streamName) |
||||
|
{ |
||||
|
var query = |
||||
|
$"SELECT TOP 1 " + |
||||
|
$" e.{EventStreamOffsetField}," + |
||||
|
$" e.{EventsCountField} " + |
||||
|
$"FROM {Collection} e " + |
||||
|
$"WHERE " + |
||||
|
$" e.{EventStreamField} = @name " + |
||||
|
$"ORDER BY e.{EventStreamOffsetField} DESC"; |
||||
|
|
||||
|
var parameters = new SqlParameterCollection |
||||
|
{ |
||||
|
new SqlParameter("@name", streamName) |
||||
|
}; |
||||
|
|
||||
|
return new SqlQuerySpec(query, parameters); |
||||
|
} |
||||
|
|
||||
|
public static SqlQuerySpec ByStreamName(string streamName, long streamPosition = 0) |
||||
|
{ |
||||
|
var query = |
||||
|
$"SELECT * " + |
||||
|
$"FROM {Collection} e " + |
||||
|
$"WHERE " + |
||||
|
$" e.{EventStreamField} = @name " + |
||||
|
$"AND e.{EventStreamOffsetField} >= @position " + |
||||
|
$"ORDER BY e.{EventStreamOffsetField} ASC"; |
||||
|
|
||||
|
var parameters = new SqlParameterCollection |
||||
|
{ |
||||
|
new SqlParameter("@name", streamName), |
||||
|
new SqlParameter("@position", streamPosition) |
||||
|
}; |
||||
|
|
||||
|
return new SqlQuerySpec(query, parameters); |
||||
|
} |
||||
|
|
||||
|
public static SqlQuerySpec ByProperty(string property, object value, StreamPosition streamPosition) |
||||
|
{ |
||||
|
var filters = new List<string>(); |
||||
|
|
||||
|
var parameters = new SqlParameterCollection(); |
||||
|
|
||||
|
filters.ForPosition(parameters, streamPosition); |
||||
|
filters.ForProperty(parameters, property, value); |
||||
|
|
||||
|
return BuildQuery(filters, parameters); |
||||
|
} |
||||
|
|
||||
|
public static SqlQuerySpec ByFilter(string streamFilter, StreamPosition streamPosition) |
||||
|
{ |
||||
|
var filters = new List<string>(); |
||||
|
|
||||
|
var parameters = new SqlParameterCollection(); |
||||
|
|
||||
|
filters.ForPosition(parameters, streamPosition); |
||||
|
filters.ForRegex(parameters, streamFilter); |
||||
|
|
||||
|
return BuildQuery(filters, parameters); |
||||
|
} |
||||
|
|
||||
|
private static SqlQuerySpec BuildQuery(List<string> filters, SqlParameterCollection parameters) |
||||
|
{ |
||||
|
var query = $"SELECT * FROM {Collection} e WHERE {string.Join(" AND ", filters)} ORDER BY e.{TimestampField}"; |
||||
|
|
||||
|
return new SqlQuerySpec(query, parameters); |
||||
|
} |
||||
|
|
||||
|
private static void ForProperty(this List<string> filters, SqlParameterCollection parameters, string property, object value) |
||||
|
{ |
||||
|
filters.Add($"e.events.headers.{property} = @value"); |
||||
|
|
||||
|
parameters.Add(new SqlParameter("@value", value)); |
||||
|
} |
||||
|
|
||||
|
private static void ForRegex(this List<string> filters, SqlParameterCollection parameters, string streamFilter) |
||||
|
{ |
||||
|
if (!string.IsNullOrWhiteSpace(streamFilter) && !string.Equals(streamFilter, ".*", StringComparison.OrdinalIgnoreCase)) |
||||
|
{ |
||||
|
if (streamFilter.Contains("^")) |
||||
|
{ |
||||
|
filters.Add($"STARTSWITH(e.{EventStreamField}, @filter)"); |
||||
|
} |
||||
|
else |
||||
|
{ |
||||
|
filters.Add($"e.{EventStreamField} = @filter"); |
||||
|
} |
||||
|
|
||||
|
parameters.Add(new SqlParameter("@filter", streamFilter)); |
||||
|
} |
||||
|
} |
||||
|
|
||||
|
private static void ForPosition(this List<string> filters, SqlParameterCollection parameters, StreamPosition streamPosition) |
||||
|
{ |
||||
|
if (streamPosition.IsEndOfCommit) |
||||
|
{ |
||||
|
filters.Add($"e.{TimestampField} > @time"); |
||||
|
} |
||||
|
else |
||||
|
{ |
||||
|
filters.Add($"e.{TimestampField} >= @time"); |
||||
|
} |
||||
|
|
||||
|
parameters.Add(new SqlParameter("@time", streamPosition.Timestamp)); |
||||
|
} |
||||
|
|
||||
|
public static EventPredicate CreateFilterExpression(string property, object value) |
||||
|
{ |
||||
|
if (!string.IsNullOrWhiteSpace(property)) |
||||
|
{ |
||||
|
var jsonValue = JsonValue.Create(value); |
||||
|
|
||||
|
return x => x.Headers.TryGetValue(property, out var p) && p.Equals(jsonValue); |
||||
|
} |
||||
|
else |
||||
|
{ |
||||
|
return x => true; |
||||
|
} |
||||
|
} |
||||
|
} |
||||
|
} |
||||
@ -0,0 +1,55 @@ |
|||||
|
// ==========================================================================
|
||||
|
// Squidex Headless CMS
|
||||
|
// ==========================================================================
|
||||
|
// Copyright (c) Squidex UG (haftungsbeschränkt)
|
||||
|
// All rights reserved. Licensed under the MIT license.
|
||||
|
// ==========================================================================
|
||||
|
|
||||
|
namespace Squidex.Infrastructure.EventSourcing |
||||
|
{ |
||||
|
internal sealed class StreamPosition |
||||
|
{ |
||||
|
public long Timestamp { get; } |
||||
|
|
||||
|
public long CommitOffset { get; } |
||||
|
|
||||
|
public long CommitSize { get; } |
||||
|
|
||||
|
public bool IsEndOfCommit |
||||
|
{ |
||||
|
get { return CommitOffset == CommitSize - 1; } |
||||
|
} |
||||
|
|
||||
|
public StreamPosition(long timestamp, long commitOffset, long commitSize) |
||||
|
{ |
||||
|
Timestamp = timestamp; |
||||
|
|
||||
|
CommitOffset = commitOffset; |
||||
|
CommitSize = commitSize; |
||||
|
} |
||||
|
|
||||
|
public static implicit operator string(StreamPosition position) |
||||
|
{ |
||||
|
var parts = new object[] |
||||
|
{ |
||||
|
position.Timestamp, |
||||
|
position.CommitOffset, |
||||
|
position.CommitSize |
||||
|
}; |
||||
|
|
||||
|
return string.Join("-", parts); |
||||
|
} |
||||
|
|
||||
|
public static implicit operator StreamPosition(string position) |
||||
|
{ |
||||
|
if (!string.IsNullOrWhiteSpace(position)) |
||||
|
{ |
||||
|
var parts = position.Split('-'); |
||||
|
|
||||
|
return new StreamPosition(long.Parse(parts[0]), long.Parse(parts[1]), long.Parse(parts[2])); |
||||
|
} |
||||
|
|
||||
|
return new StreamPosition(0, -1, -1); |
||||
|
} |
||||
|
} |
||||
|
} |
||||
@ -0,0 +1,32 @@ |
|||||
|
// ==========================================================================
|
||||
|
// Squidex Headless CMS
|
||||
|
// ==========================================================================
|
||||
|
// Copyright (c) Squidex UG (haftungsbeschraenkt)
|
||||
|
// All rights reserved. Licensed under the MIT license.
|
||||
|
// ==========================================================================
|
||||
|
|
||||
|
using System; |
||||
|
using Microsoft.Azure.Documents.Client; |
||||
|
using Squidex.Infrastructure.TestHelpers; |
||||
|
|
||||
|
namespace Squidex.Infrastructure.EventSourcing |
||||
|
{ |
||||
|
public sealed class CosmosDbEventStoreFixture : IDisposable |
||||
|
{ |
||||
|
private readonly DocumentClient client; |
||||
|
|
||||
|
public CosmosDbEventStore EventStore { get; } |
||||
|
|
||||
|
public CosmosDbEventStoreFixture() |
||||
|
{ |
||||
|
client = new DocumentClient(new Uri("https://localhost:8081"), "C2y6yDjf5/R+ob0N8A7Cgv30VRDJIWEHLM+4QDU5DE2nQ9nDuVTqobD4b8mGGyPMbIZnqyMsEcaGQy67XIw/Jw==", JsonHelper.DefaultSettings()); |
||||
|
|
||||
|
EventStore = new CosmosDbEventStore(client, "Test"); |
||||
|
EventStore.InitializeAsync().Wait(); |
||||
|
} |
||||
|
|
||||
|
public void Dispose() |
||||
|
{ |
||||
|
} |
||||
|
} |
||||
|
} |
||||
@ -0,0 +1,29 @@ |
|||||
|
// ==========================================================================
|
||||
|
// Squidex Headless CMS
|
||||
|
// ==========================================================================
|
||||
|
// Copyright (c) Squidex UG (haftungsbeschraenkt)
|
||||
|
// All rights reserved. Licensed under the MIT license.
|
||||
|
// ==========================================================================
|
||||
|
|
||||
|
using Xunit; |
||||
|
|
||||
|
namespace Squidex.Infrastructure.EventSourcing |
||||
|
{ |
||||
|
[Trait("Category", "Dependencies")] |
||||
|
public class CosmosDbEventStoreTests : EventStoreTests<CosmosDbEventStore>, IClassFixture<CosmosDbEventStoreFixture> |
||||
|
{ |
||||
|
private readonly CosmosDbEventStoreFixture fixture; |
||||
|
|
||||
|
protected override int SubscriptionDelayInMs { get; } = 1000; |
||||
|
|
||||
|
public CosmosDbEventStoreTests(CosmosDbEventStoreFixture fixture) |
||||
|
{ |
||||
|
this.fixture = fixture; |
||||
|
} |
||||
|
|
||||
|
public override CosmosDbEventStore CreateStore() |
||||
|
{ |
||||
|
return fixture.EventStore; |
||||
|
} |
||||
|
} |
||||
|
} |
||||
Loading…
Reference in new issue