diff --git a/src/Squidex.Infrastructure.Azure/EventSourcing/CosmosDbEvent.cs b/src/Squidex.Infrastructure.Azure/EventSourcing/CosmosDbEvent.cs new file mode 100644 index 000000000..ce1beb819 --- /dev/null +++ b/src/Squidex.Infrastructure.Azure/EventSourcing/CosmosDbEvent.cs @@ -0,0 +1,33 @@ +// ========================================================================== +// Squidex Headless CMS +// ========================================================================== +// Copyright (c) Squidex UG (haftungsbeschränkt) +// All rights reserved. Licensed under the MIT license. +// ========================================================================== + +using Newtonsoft.Json; + +namespace Squidex.Infrastructure.EventSourcing +{ + internal sealed class CosmosDbEvent + { + [JsonProperty] + public string Type { get; set; } + + [JsonProperty] + public string Payload { get; set; } + + [JsonProperty] + public EnvelopeHeaders Headers { get; set; } + + public static CosmosDbEvent FromEventData(EventData data) + { + return new CosmosDbEvent { Type = data.Type, Headers = data.Headers, Payload = data.Payload }; + } + + public EventData ToEventData() + { + return new EventData(Type, Headers, Payload); + } + } +} \ No newline at end of file diff --git a/src/Squidex.Infrastructure.Azure/EventSourcing/CosmosDbEventCommit.cs b/src/Squidex.Infrastructure.Azure/EventSourcing/CosmosDbEventCommit.cs new file mode 100644 index 000000000..761123f68 --- /dev/null +++ b/src/Squidex.Infrastructure.Azure/EventSourcing/CosmosDbEventCommit.cs @@ -0,0 +1,34 @@ +// ========================================================================== +// Squidex Headless CMS +// ========================================================================== +// Copyright (c) Squidex UG (haftungsbeschränkt) +// All rights reserved. Licensed under the MIT license. +// ========================================================================== + +using System; +using Newtonsoft.Json; +using NodaTime; + +namespace Squidex.Infrastructure.EventSourcing +{ + internal sealed class CosmosDbEventCommit + { + [JsonProperty] + public Guid Id { get; set; } + + [JsonProperty] + public CosmosDbEvent[] Events { get; set; } + + [JsonProperty] + public long EventStreamOffset { get; set; } + + [JsonProperty] + public long EventsCount { get; set; } + + [JsonProperty] + public string EventStream { get; set; } + + [JsonProperty] + public long Timestamp { get; set; } + } +} diff --git a/src/Squidex.Infrastructure.Azure/EventSourcing/CosmosDbEventStore.cs b/src/Squidex.Infrastructure.Azure/EventSourcing/CosmosDbEventStore.cs new file mode 100644 index 000000000..0adcadc73 --- /dev/null +++ b/src/Squidex.Infrastructure.Azure/EventSourcing/CosmosDbEventStore.cs @@ -0,0 +1,67 @@ +// ========================================================================== +// Squidex Headless CMS +// ========================================================================== +// Copyright (c) Squidex UG (haftungsbeschraenkt) +// All rights reserved. Licensed under the MIT license. +// ========================================================================== + +using System; +using System.Collections.ObjectModel; +using System.Threading; +using System.Threading.Tasks; +using Microsoft.Azure.Documents; +using Microsoft.Azure.Documents.Client; +using Newtonsoft.Json; + +namespace Squidex.Infrastructure.EventSourcing +{ + public sealed partial class CosmosDbEventStore : IEventStore, IInitializable + { + private readonly DocumentClient documentClient; + private readonly Uri databaseUri; + private readonly Uri collectionUri; + private readonly string database; + + public CosmosDbEventStore(DocumentClient documentClient, string database) + { + Guard.NotNull(documentClient, nameof(documentClient)); + Guard.NotNullOrEmpty(database, nameof(database)); + + this.documentClient = documentClient; + + this.databaseUri = UriFactory.CreateDatabaseUri(database); + this.database = database; + + collectionUri = UriFactory.CreateDocumentCollectionUri(database, FilterBuilder.Collection); + } + + public async Task InitializeAsync(CancellationToken ct = default) + { + await documentClient.CreateDatabaseIfNotExistsAsync(new Database { Id = database }); + + await documentClient.CreateDocumentCollectionIfNotExistsAsync(databaseUri, + new DocumentCollection + { + UniqueKeyPolicy = new UniqueKeyPolicy + { + UniqueKeys = new Collection + { + new UniqueKey + { + Paths = new Collection + { + $"/{FilterBuilder.EventStreamField}", + $"/{FilterBuilder.EventStreamOffsetField}" + } + } + } + }, + Id = FilterBuilder.Collection, + }, + new RequestOptions + { + PartitionKey = new PartitionKey($"/{FilterBuilder.EventStreamField}") + }); + } + } +} diff --git a/src/Squidex.Infrastructure.Azure/EventSourcing/CosmosDbEventStore_Reader.cs b/src/Squidex.Infrastructure.Azure/EventSourcing/CosmosDbEventStore_Reader.cs new file mode 100644 index 000000000..f2d2e2194 --- /dev/null +++ b/src/Squidex.Infrastructure.Azure/EventSourcing/CosmosDbEventStore_Reader.cs @@ -0,0 +1,160 @@ +// ========================================================================== +// Squidex Headless CMS +// ========================================================================== +// Copyright (c) Squidex UG (haftungsbeschränkt) +// All rights reserved. Licensed under the MIT license. +// ========================================================================== + +using System; +using System.Collections.Generic; +using System.Linq; +using System.Threading; +using System.Threading.Tasks; +using Microsoft.Azure.Documents.Linq; +using Squidex.Infrastructure.Log; + +namespace Squidex.Infrastructure.EventSourcing +{ + public delegate bool EventPredicate(EventData data); + + public partial class CosmosDbEventStore : IEventStore, IInitializable + { + public IEventSubscription CreateSubscription(IEventSubscriber subscriber, string streamFilter, string position = null) + { + Guard.NotNull(subscriber, nameof(subscriber)); + Guard.NotNullOrEmpty(streamFilter, nameof(streamFilter)); + + return new PollingSubscription(this, subscriber, streamFilter, position); + } + + public Task CreateIndexAsync(string property) + { + return Task.CompletedTask; + } + + public async Task> QueryAsync(string streamName, long streamPosition = 0) + { + using (Profiler.TraceMethod()) + { + var query = + documentClient.CreateDocumentQuery(collectionUri, + FilterBuilder.ByStreamName(streamName, streamPosition)); + + var documentQuery = query.AsDocumentQuery(); + + var result = new List(); + + while (documentQuery.HasMoreResults) + { + var commits = await documentQuery.ExecuteNextAsync(); + + foreach (var commit in commits) + { + var eventStreamOffset = (int)commit.EventStreamOffset; + + var commitTimestamp = commit.Timestamp; + var commitOffset = 0; + + foreach (var e in commit.Events) + { + eventStreamOffset++; + + if (eventStreamOffset >= streamPosition) + { + var eventData = e.ToEventData(); + var eventToken = new StreamPosition(commitTimestamp, commitOffset, commit.Events.Length); + + result.Add(new StoredEvent(streamName, eventToken, eventStreamOffset, eventData)); + } + } + } + } + + return result; + } + } + + public Task QueryAsync(Func callback, string property, object value, string position = null, CancellationToken ct = default) + { + Guard.NotNull(callback, nameof(callback)); + + StreamPosition lastPosition = position; + + var filterDefinition = CreateFilter(property, value, lastPosition); + var filterExpression = CreateFilterExpression(property, value); + + return QueryAsync(callback, lastPosition, filterDefinition, filterExpression, ct); + } + + public Task QueryAsync(Func callback, string streamFilter = null, string position = null, CancellationToken ct = default) + { + Guard.NotNull(callback, nameof(callback)); + + StreamPosition lastPosition = position; + + var filterDefinition = CreateFilter(streamFilter, lastPosition); + var filterExpression = CreateFilterExpression(null, null); + + return QueryAsync(callback, lastPosition, filterDefinition, filterExpression, ct); + } + + private async Task QueryAsync(Func callback, StreamPosition lastPosition, IQueryable query, EventPredicate filterExpression, CancellationToken ct = default) + { + using (Profiler.TraceMethod()) + { + var documentQuery = query.AsDocumentQuery(); + + while (documentQuery.HasMoreResults && !ct.IsCancellationRequested) + { + var commits = await documentQuery.ExecuteNextAsync(ct); + + foreach (var commit in commits) + { + var eventStreamOffset = (int)commit.EventStreamOffset; + + var commitTimestamp = commit.Timestamp; + var commitOffset = 0; + + foreach (var e in commit.Events) + { + eventStreamOffset++; + + if (commitOffset > lastPosition.CommitOffset || commitTimestamp > lastPosition.Timestamp) + { + var eventData = e.ToEventData(); + + if (filterExpression(eventData)) + { + var eventToken = new StreamPosition(commitTimestamp, commitOffset, commit.Events.Length); + + await callback(new StoredEvent(commit.EventStream, eventToken, eventStreamOffset, eventData)); + } + } + + commitOffset++; + } + } + } + } + } + + private IQueryable CreateFilter(string property, object value, StreamPosition streamPosition) + { + var query = FilterBuilder.ByProperty(property, value, streamPosition); + + return documentClient.CreateDocumentQuery(collectionUri, query); + } + + private IQueryable CreateFilter(string streamFilter, StreamPosition streamPosition) + { + var query = FilterBuilder.ByFilter(streamFilter, streamPosition); + + return documentClient.CreateDocumentQuery(collectionUri, query); + } + + private static EventPredicate CreateFilterExpression(string property, object value) + { + return FilterBuilder.CreateFilterExpression(property, value); + } + } +} \ No newline at end of file diff --git a/src/Squidex.Infrastructure.Azure/EventSourcing/CosmosDbEventStore_Writer.cs b/src/Squidex.Infrastructure.Azure/EventSourcing/CosmosDbEventStore_Writer.cs new file mode 100644 index 000000000..912713204 --- /dev/null +++ b/src/Squidex.Infrastructure.Azure/EventSourcing/CosmosDbEventStore_Writer.cs @@ -0,0 +1,138 @@ +// ========================================================================== +// Squidex Headless CMS +// ========================================================================== +// Copyright (c) Squidex UG (haftungsbeschränkt) +// All rights reserved. Licensed under the MIT license. +// ========================================================================== + +using System; +using System.Collections.Generic; +using System.Linq; +using System.Net; +using System.Threading.Tasks; +using Microsoft.Azure.Documents; +using Microsoft.Azure.Documents.Client; +using NodaTime; +using Squidex.Infrastructure.Log; + +namespace Squidex.Infrastructure.EventSourcing +{ + public partial class CosmosDbEventStore + { + private const int MaxWriteAttempts = 20; + private const int MaxCommitSize = 10; + private static readonly FeedOptions TakeOne = new FeedOptions { MaxItemCount = 1 }; + + public Task DeleteStreamAsync(string streamName) + { + return Task.CompletedTask; + } + + public Task AppendAsync(Guid commitId, string streamName, ICollection events) + { + return AppendAsync(commitId, streamName, EtagVersion.Any, events); + } + + public async Task AppendAsync(Guid commitId, string streamName, long expectedVersion, ICollection events) + { + Guard.GreaterEquals(expectedVersion, EtagVersion.Any, nameof(expectedVersion)); + Guard.NotNullOrEmpty(streamName, nameof(streamName)); + Guard.NotNull(events, nameof(events)); + Guard.LessThan(events.Count, MaxCommitSize, "events.Count"); + + using (Profiler.TraceMethod()) + { + if (events.Count == 0) + { + return; + } + + var currentVersion = GetEventStreamOffset(streamName); + + if (expectedVersion != EtagVersion.Any && expectedVersion != currentVersion) + { + throw new WrongEventVersionException(currentVersion, expectedVersion); + } + + var commit = BuildCommit(commitId, streamName, expectedVersion >= -1 ? expectedVersion : currentVersion, events); + + for (var attempt = 0; attempt < MaxWriteAttempts; attempt++) + { + try + { + await documentClient.CreateDocumentAsync(collectionUri, commit); + + return; + } + catch (DocumentClientException ex) + { + if (ex.StatusCode == HttpStatusCode.Conflict) + { + currentVersion = GetEventStreamOffset(streamName); + + if (expectedVersion != EtagVersion.Any) + { + throw new WrongEventVersionException(currentVersion, expectedVersion); + } + + if (attempt < MaxWriteAttempts) + { + expectedVersion = currentVersion; + } + else + { + throw new TimeoutException("Could not acquire a free slot for the commit within the provided time."); + } + } + else + { + throw; + } + } + } + } + } + + private long GetEventStreamOffset(string streamName) + { + var query = + documentClient.CreateDocumentQuery(collectionUri, + FilterBuilder.LastPosition(streamName)); + + var document = query.ToList().FirstOrDefault(); + + if (document != null) + { + return document.EventStreamOffset + document.EventsCount; + } + + return EtagVersion.Empty; + } + + private static CosmosDbEventCommit BuildCommit(Guid commitId, string streamName, long expectedVersion, ICollection events) + { + var commitEvents = new CosmosDbEvent[events.Count]; + + var i = 0; + + foreach (var e in events) + { + var mongoEvent = CosmosDbEvent.FromEventData(e); + + commitEvents[i++] = mongoEvent; + } + + var mongoCommit = new CosmosDbEventCommit + { + Id = commitId, + Events = commitEvents, + EventsCount = events.Count, + EventStream = streamName, + EventStreamOffset = expectedVersion, + Timestamp = SystemClock.Instance.GetCurrentInstant().ToUnixTimeTicks() + }; + + return mongoCommit; + } + } +} \ No newline at end of file diff --git a/src/Squidex.Infrastructure.Azure/EventSourcing/FilterBuilder.cs b/src/Squidex.Infrastructure.Azure/EventSourcing/FilterBuilder.cs new file mode 100644 index 000000000..98eed6335 --- /dev/null +++ b/src/Squidex.Infrastructure.Azure/EventSourcing/FilterBuilder.cs @@ -0,0 +1,145 @@ +// ========================================================================== +// Squidex Headless CMS +// ========================================================================== +// Copyright (c) Squidex UG (haftungsbeschraenkt) +// All rights reserved. Licensed under the MIT license. +// ========================================================================== + +using System; +using System.Collections.Generic; +using Microsoft.Azure.Documents; +using Squidex.Infrastructure.Json.Objects; + +namespace Squidex.Infrastructure.EventSourcing +{ + internal static class FilterBuilder + { + public const string Collection = "Events"; + + public static readonly string EventsCountField = nameof(CosmosDbEventCommit.EventsCount).ToCamelCase(); + public static readonly string EventStreamOffsetField = nameof(CosmosDbEventCommit.EventStreamOffset).ToCamelCase(); + public static readonly string EventStreamField = nameof(CosmosDbEventCommit.EventStream).ToCamelCase(); + public static readonly string TimestampField = nameof(CosmosDbEventCommit.Timestamp).ToCamelCase(); + + public static SqlQuerySpec LastPosition(string streamName) + { + var query = + $"SELECT TOP 1 " + + $" e.{EventStreamOffsetField}," + + $" e.{EventsCountField} " + + $"FROM {Collection} e " + + $"WHERE " + + $" e.{EventStreamField} = @name " + + $"ORDER BY e.{EventStreamOffsetField} DESC"; + + var parameters = new SqlParameterCollection + { + new SqlParameter("@name", streamName) + }; + + return new SqlQuerySpec(query, parameters); + } + + public static SqlQuerySpec ByStreamName(string streamName, long streamPosition = 0) + { + var query = + $"SELECT * " + + $"FROM {Collection} e " + + $"WHERE " + + $" e.{EventStreamField} = @name " + + $"AND e.{EventStreamOffsetField} >= @position " + + $"ORDER BY e.{EventStreamOffsetField} ASC"; + + var parameters = new SqlParameterCollection + { + new SqlParameter("@name", streamName), + new SqlParameter("@position", streamPosition) + }; + + return new SqlQuerySpec(query, parameters); + } + + public static SqlQuerySpec ByProperty(string property, object value, StreamPosition streamPosition) + { + var filters = new List(); + + var parameters = new SqlParameterCollection(); + + filters.ForPosition(parameters, streamPosition); + filters.ForProperty(parameters, property, value); + + return BuildQuery(filters, parameters); + } + + public static SqlQuerySpec ByFilter(string streamFilter, StreamPosition streamPosition) + { + var filters = new List(); + + var parameters = new SqlParameterCollection(); + + filters.ForPosition(parameters, streamPosition); + filters.ForRegex(parameters, streamFilter); + + return BuildQuery(filters, parameters); + } + + private static SqlQuerySpec BuildQuery(List filters, SqlParameterCollection parameters) + { + var query = $"SELECT * FROM {Collection} e WHERE {string.Join(" AND ", filters)} ORDER BY e.{TimestampField}"; + + return new SqlQuerySpec(query, parameters); + } + + private static void ForProperty(this List filters, SqlParameterCollection parameters, string property, object value) + { + filters.Add($"e.events.headers.{property} = @value"); + + parameters.Add(new SqlParameter("@value", value)); + } + + private static void ForRegex(this List filters, SqlParameterCollection parameters, string streamFilter) + { + if (!string.IsNullOrWhiteSpace(streamFilter) && !string.Equals(streamFilter, ".*", StringComparison.OrdinalIgnoreCase)) + { + if (streamFilter.Contains("^")) + { + filters.Add($"STARTSWITH(e.{EventStreamField}, @filter)"); + } + else + { + filters.Add($"e.{EventStreamField} = @filter"); + } + + parameters.Add(new SqlParameter("@filter", streamFilter)); + } + } + + private static void ForPosition(this List filters, SqlParameterCollection parameters, StreamPosition streamPosition) + { + if (streamPosition.IsEndOfCommit) + { + filters.Add($"e.{TimestampField} > @time"); + } + else + { + filters.Add($"e.{TimestampField} >= @time"); + } + + parameters.Add(new SqlParameter("@time", streamPosition.Timestamp)); + } + + public static EventPredicate CreateFilterExpression(string property, object value) + { + if (!string.IsNullOrWhiteSpace(property)) + { + var jsonValue = JsonValue.Create(value); + + return x => x.Headers.TryGetValue(property, out var p) && p.Equals(jsonValue); + } + else + { + return x => true; + } + } + } +} diff --git a/src/Squidex.Infrastructure.Azure/EventSourcing/StreamPosition.cs b/src/Squidex.Infrastructure.Azure/EventSourcing/StreamPosition.cs new file mode 100644 index 000000000..f0626ee5d --- /dev/null +++ b/src/Squidex.Infrastructure.Azure/EventSourcing/StreamPosition.cs @@ -0,0 +1,55 @@ +// ========================================================================== +// Squidex Headless CMS +// ========================================================================== +// Copyright (c) Squidex UG (haftungsbeschränkt) +// All rights reserved. Licensed under the MIT license. +// ========================================================================== + +namespace Squidex.Infrastructure.EventSourcing +{ + internal sealed class StreamPosition + { + public long Timestamp { get; } + + public long CommitOffset { get; } + + public long CommitSize { get; } + + public bool IsEndOfCommit + { + get { return CommitOffset == CommitSize - 1; } + } + + public StreamPosition(long timestamp, long commitOffset, long commitSize) + { + Timestamp = timestamp; + + CommitOffset = commitOffset; + CommitSize = commitSize; + } + + public static implicit operator string(StreamPosition position) + { + var parts = new object[] + { + position.Timestamp, + position.CommitOffset, + position.CommitSize + }; + + return string.Join("-", parts); + } + + public static implicit operator StreamPosition(string position) + { + if (!string.IsNullOrWhiteSpace(position)) + { + var parts = position.Split('-'); + + return new StreamPosition(long.Parse(parts[0]), long.Parse(parts[1]), long.Parse(parts[2])); + } + + return new StreamPosition(0, -1, -1); + } + } +} diff --git a/src/Squidex.Infrastructure.Azure/Squidex.Infrastructure.Azure.csproj b/src/Squidex.Infrastructure.Azure/Squidex.Infrastructure.Azure.csproj index 755262980..d4d6148da 100644 --- a/src/Squidex.Infrastructure.Azure/Squidex.Infrastructure.Azure.csproj +++ b/src/Squidex.Infrastructure.Azure/Squidex.Infrastructure.Azure.csproj @@ -5,6 +5,7 @@ 7.3 + diff --git a/src/Squidex.Infrastructure.MongoDb/EventSourcing/MongoEventStore.cs b/src/Squidex.Infrastructure.MongoDb/EventSourcing/MongoEventStore.cs index fe20956ed..c33c11f40 100644 --- a/src/Squidex.Infrastructure.MongoDb/EventSourcing/MongoEventStore.cs +++ b/src/Squidex.Infrastructure.MongoDb/EventSourcing/MongoEventStore.cs @@ -15,7 +15,6 @@ namespace Squidex.Infrastructure.EventSourcing { public partial class MongoEventStore : MongoRepositoryBase, IEventStore { - private const int MaxCommitSize = 10; private static readonly FieldDefinition TimestampField = Fields.Build(x => x.Timestamp); private static readonly FieldDefinition EventsCountField = Fields.Build(x => x.EventsCount); private static readonly FieldDefinition EventStreamOffsetField = Fields.Build(x => x.EventStreamOffset); diff --git a/src/Squidex.Infrastructure.MongoDb/EventSourcing/MongoEventStore_Writer.cs b/src/Squidex.Infrastructure.MongoDb/EventSourcing/MongoEventStore_Writer.cs index aa0707f2e..a7dbe12de 100644 --- a/src/Squidex.Infrastructure.MongoDb/EventSourcing/MongoEventStore_Writer.cs +++ b/src/Squidex.Infrastructure.MongoDb/EventSourcing/MongoEventStore_Writer.cs @@ -16,6 +16,7 @@ namespace Squidex.Infrastructure.EventSourcing { public partial class MongoEventStore { + private const int MaxCommitSize = 10; private const int MaxWriteAttempts = 20; private static readonly BsonTimestamp EmptyTimestamp = new BsonTimestamp(0); @@ -31,20 +32,19 @@ namespace Squidex.Infrastructure.EventSourcing public async Task AppendAsync(Guid commitId, string streamName, long expectedVersion, ICollection events) { + Guard.GreaterEquals(expectedVersion, EtagVersion.Any, nameof(expectedVersion)); + Guard.NotNullOrEmpty(streamName, nameof(streamName)); + Guard.NotNull(events, nameof(events)); Guard.LessThan(events.Count, MaxCommitSize, "events.Count"); using (Profiler.TraceMethod()) { - Guard.GreaterEquals(expectedVersion, EtagVersion.Any, nameof(expectedVersion)); - Guard.NotNullOrEmpty(streamName, nameof(streamName)); - Guard.NotNull(events, nameof(events)); - if (events.Count == 0) { return; } - var currentVersion = await GetEventStreamOffset(streamName); + var currentVersion = await GetEventStreamOffsetAsync(streamName); if (expectedVersion != EtagVersion.Any && expectedVersion != currentVersion) { @@ -67,7 +67,7 @@ namespace Squidex.Infrastructure.EventSourcing { if (ex.WriteError?.Category == ServerErrorCategory.DuplicateKey) { - currentVersion = await GetEventStreamOffset(streamName); + currentVersion = await GetEventStreamOffsetAsync(streamName); if (expectedVersion != EtagVersion.Any) { @@ -92,7 +92,7 @@ namespace Squidex.Infrastructure.EventSourcing } } - private async Task GetEventStreamOffset(string streamName) + private async Task GetEventStreamOffsetAsync(string streamName) { var document = await Collection.Find(Filter.Eq(EventStreamField, streamName)) diff --git a/src/Squidex.Infrastructure.MongoDb/EventSourcing/StreamPosition.cs b/src/Squidex.Infrastructure.MongoDb/EventSourcing/StreamPosition.cs index 45c0910e9..661c83435 100644 --- a/src/Squidex.Infrastructure.MongoDb/EventSourcing/StreamPosition.cs +++ b/src/Squidex.Infrastructure.MongoDb/EventSourcing/StreamPosition.cs @@ -9,7 +9,7 @@ using MongoDB.Bson; namespace Squidex.Infrastructure.EventSourcing { - public sealed class StreamPosition + internal sealed class StreamPosition { private static readonly BsonTimestamp EmptyTimestamp = new BsonTimestamp(0); diff --git a/tests/Squidex.Infrastructure.Tests/EventSourcing/CosmosDbEventStoreFixture.cs b/tests/Squidex.Infrastructure.Tests/EventSourcing/CosmosDbEventStoreFixture.cs new file mode 100644 index 000000000..221a6c305 --- /dev/null +++ b/tests/Squidex.Infrastructure.Tests/EventSourcing/CosmosDbEventStoreFixture.cs @@ -0,0 +1,32 @@ +// ========================================================================== +// Squidex Headless CMS +// ========================================================================== +// Copyright (c) Squidex UG (haftungsbeschraenkt) +// All rights reserved. Licensed under the MIT license. +// ========================================================================== + +using System; +using Microsoft.Azure.Documents.Client; +using Squidex.Infrastructure.TestHelpers; + +namespace Squidex.Infrastructure.EventSourcing +{ + public sealed class CosmosDbEventStoreFixture : IDisposable + { + private readonly DocumentClient client; + + public CosmosDbEventStore EventStore { get; } + + public CosmosDbEventStoreFixture() + { + client = new DocumentClient(new Uri("https://localhost:8081"), "C2y6yDjf5/R+ob0N8A7Cgv30VRDJIWEHLM+4QDU5DE2nQ9nDuVTqobD4b8mGGyPMbIZnqyMsEcaGQy67XIw/Jw==", JsonHelper.DefaultSettings()); + + EventStore = new CosmosDbEventStore(client, "Test"); + EventStore.InitializeAsync().Wait(); + } + + public void Dispose() + { + } + } +} diff --git a/tests/Squidex.Infrastructure.Tests/EventSourcing/CosmosDbEventStoreTests.cs b/tests/Squidex.Infrastructure.Tests/EventSourcing/CosmosDbEventStoreTests.cs new file mode 100644 index 000000000..f49d2f064 --- /dev/null +++ b/tests/Squidex.Infrastructure.Tests/EventSourcing/CosmosDbEventStoreTests.cs @@ -0,0 +1,29 @@ +// ========================================================================== +// Squidex Headless CMS +// ========================================================================== +// Copyright (c) Squidex UG (haftungsbeschraenkt) +// All rights reserved. Licensed under the MIT license. +// ========================================================================== + +using Xunit; + +namespace Squidex.Infrastructure.EventSourcing +{ + [Trait("Category", "Dependencies")] + public class CosmosDbEventStoreTests : EventStoreTests, IClassFixture + { + private readonly CosmosDbEventStoreFixture fixture; + + protected override int SubscriptionDelayInMs { get; } = 1000; + + public CosmosDbEventStoreTests(CosmosDbEventStoreFixture fixture) + { + this.fixture = fixture; + } + + public override CosmosDbEventStore CreateStore() + { + return fixture.EventStore; + } + } +}