diff --git a/Squidex.sln b/Squidex.sln
index 2165f46ce..385ea40c2 100644
--- a/Squidex.sln
+++ b/Squidex.sln
@@ -40,6 +40,10 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "tests", "tests", "{B56EBCEC
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Benchmarks", "tests\Benchmarks\Benchmarks.csproj", "{D48A03DF-BCD3-4667-8747-2F251347E2B6}"
EndProject
+Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "migrations", "migrations", "{94207AA6-4923-4183-A558-E0F8196B8CA3}"
+EndProject
+Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Migrate_01", "tools\Migrate_01\Migrate_01.csproj", "{B51126A8-0D75-4A79-867D-10724EC6AC84}"
+EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
@@ -190,6 +194,18 @@ Global
{D48A03DF-BCD3-4667-8747-2F251347E2B6}.Release|x64.Build.0 = Release|Any CPU
{D48A03DF-BCD3-4667-8747-2F251347E2B6}.Release|x86.ActiveCfg = Release|Any CPU
{D48A03DF-BCD3-4667-8747-2F251347E2B6}.Release|x86.Build.0 = Release|Any CPU
+ {B51126A8-0D75-4A79-867D-10724EC6AC84}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
+ {B51126A8-0D75-4A79-867D-10724EC6AC84}.Debug|Any CPU.Build.0 = Debug|Any CPU
+ {B51126A8-0D75-4A79-867D-10724EC6AC84}.Debug|x64.ActiveCfg = Debug|Any CPU
+ {B51126A8-0D75-4A79-867D-10724EC6AC84}.Debug|x64.Build.0 = Debug|Any CPU
+ {B51126A8-0D75-4A79-867D-10724EC6AC84}.Debug|x86.ActiveCfg = Debug|Any CPU
+ {B51126A8-0D75-4A79-867D-10724EC6AC84}.Debug|x86.Build.0 = Debug|Any CPU
+ {B51126A8-0D75-4A79-867D-10724EC6AC84}.Release|Any CPU.ActiveCfg = Release|Any CPU
+ {B51126A8-0D75-4A79-867D-10724EC6AC84}.Release|Any CPU.Build.0 = Release|Any CPU
+ {B51126A8-0D75-4A79-867D-10724EC6AC84}.Release|x64.ActiveCfg = Release|Any CPU
+ {B51126A8-0D75-4A79-867D-10724EC6AC84}.Release|x64.Build.0 = Release|Any CPU
+ {B51126A8-0D75-4A79-867D-10724EC6AC84}.Release|x86.ActiveCfg = Release|Any CPU
+ {B51126A8-0D75-4A79-867D-10724EC6AC84}.Release|x86.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
@@ -210,5 +226,6 @@ Global
{C1E5BBB6-6B6A-4DE5-B19D-0538304DE343} = {8CF53B92-5EB1-461D-98F8-70DA9B603FBF}
{945871B1-77B8-43FB-B53C-27CF385AB756} = {8CF53B92-5EB1-461D-98F8-70DA9B603FBF}
{D48A03DF-BCD3-4667-8747-2F251347E2B6} = {B56EBCEC-9C50-46A7-848C-65502DE69C5C}
+ {B51126A8-0D75-4A79-867D-10724EC6AC84} = {94207AA6-4923-4183-A558-E0F8196B8CA3}
EndGlobalSection
EndGlobal
diff --git a/src/Squidex.Core/Squidex.Core.csproj b/src/Squidex.Core/Squidex.Core.csproj
index bb10ba971..d8a4583d2 100644
--- a/src/Squidex.Core/Squidex.Core.csproj
+++ b/src/Squidex.Core/Squidex.Core.csproj
@@ -14,7 +14,7 @@
-
+
diff --git a/src/Squidex.Events/Squidex.Events.csproj b/src/Squidex.Events/Squidex.Events.csproj
index 7a80cf949..ab936853b 100644
--- a/src/Squidex.Events/Squidex.Events.csproj
+++ b/src/Squidex.Events/Squidex.Events.csproj
@@ -13,5 +13,6 @@
+
diff --git a/src/Squidex.Infrastructure.GoogleCloud/Squidex.Infrastructure.GoogleCloud.csproj b/src/Squidex.Infrastructure.GoogleCloud/Squidex.Infrastructure.GoogleCloud.csproj
index e3b4db52b..8fd29a45b 100644
--- a/src/Squidex.Infrastructure.GoogleCloud/Squidex.Infrastructure.GoogleCloud.csproj
+++ b/src/Squidex.Infrastructure.GoogleCloud/Squidex.Infrastructure.GoogleCloud.csproj
@@ -7,7 +7,8 @@
True
-
+
+
diff --git a/src/Squidex.Infrastructure.MongoDb/EventStore/MongoEventCommit.cs b/src/Squidex.Infrastructure.MongoDb/EventStore/MongoEventCommit.cs
index 7c4cabf31..6b1fefc86 100644
--- a/src/Squidex.Infrastructure.MongoDb/EventStore/MongoEventCommit.cs
+++ b/src/Squidex.Infrastructure.MongoDb/EventStore/MongoEventCommit.cs
@@ -10,7 +10,6 @@ using System;
using System.Collections.Generic;
using MongoDB.Bson;
using MongoDB.Bson.Serialization.Attributes;
-using NodaTime;
namespace Squidex.Infrastructure.MongoDb.EventStore
{
@@ -23,15 +22,11 @@ namespace Squidex.Infrastructure.MongoDb.EventStore
[BsonRequired]
[BsonElement]
- public Instant Timestamp { get; set; }
+ public BsonTimestamp Timestamp { get; set; }
[BsonElement]
[BsonRequired]
- public List Events { get; set; }
-
- [BsonElement]
- [BsonRequired]
- public long EventsOffset { get; set; }
+ public MongoEvent[] Events { get; set; }
[BsonElement]
[BsonRequired]
@@ -39,10 +34,10 @@ namespace Squidex.Infrastructure.MongoDb.EventStore
[BsonElement]
[BsonRequired]
- public string EventStream { get; set; }
+ public long EventsCount { get; set; }
[BsonElement]
[BsonRequired]
- public long EventsCount { get; set; }
+ public string EventStream { get; set; }
}
}
diff --git a/src/Squidex.Infrastructure.MongoDb/EventStore/MongoEventStore.cs b/src/Squidex.Infrastructure.MongoDb/EventStore/MongoEventStore.cs
index c23adfade..e10f27d30 100644
--- a/src/Squidex.Infrastructure.MongoDb/EventStore/MongoEventStore.cs
+++ b/src/Squidex.Infrastructure.MongoDb/EventStore/MongoEventStore.cs
@@ -8,15 +8,12 @@
using System;
using System.Collections.Generic;
-using System.Linq;
using System.Reactive.Linq;
using System.Threading;
using System.Threading.Tasks;
using MongoDB.Bson;
using MongoDB.Driver;
-using NodaTime;
using Squidex.Infrastructure.CQRS.Events;
-using Squidex.Infrastructure.Reflection;
// ReSharper disable ConvertIfStatementToConditionalTernaryExpression
// ReSharper disable ClassNeverInstantiated.Local
@@ -28,18 +25,14 @@ namespace Squidex.Infrastructure.MongoDb.EventStore
public class MongoEventStore : MongoRepositoryBase, IEventStore
{
private const int Retries = 500;
+ private static readonly BsonTimestamp EmptyTimestamp = new BsonTimestamp(0);
private readonly IEventNotifier notifier;
- private readonly IClock clock;
- private string eventsOffsetIndex;
- public MongoEventStore(IMongoDatabase database, IEventNotifier notifier, IClock clock)
+ public MongoEventStore(IMongoDatabase database, IEventNotifier notifier)
: base(database)
{
- Guard.NotNull(clock, nameof(clock));
Guard.NotNull(notifier, nameof(notifier));
- this.clock = clock;
-
this.notifier = notifier;
}
@@ -55,17 +48,11 @@ namespace Squidex.Infrastructure.MongoDb.EventStore
protected override async Task SetupCollectionAsync(IMongoCollection collection)
{
- var indexNames =
- await Task.WhenAll(
- collection.Indexes.CreateOneAsync(Index.Ascending(x => x.EventsOffset), new CreateIndexOptions { Unique = true }),
- collection.Indexes.CreateOneAsync(Index.Ascending(x => x.EventStreamOffset).Ascending(x => x.EventStream), new CreateIndexOptions { Unique = true }),
- collection.Indexes.CreateOneAsync(Index.Descending(x => x.EventsOffset), new CreateIndexOptions { Unique = true }),
- collection.Indexes.CreateOneAsync(Index.Descending(x => x.EventStreamOffset).Ascending(x => x.EventStream), new CreateIndexOptions { Unique = true }));
-
- eventsOffsetIndex = indexNames[0];
+ await collection.Indexes.CreateOneAsync(Index.Ascending(x => x.EventStream).Ascending(x => x.Timestamp));
+ await collection.Indexes.CreateOneAsync(Index.Ascending(x => x.EventStreamOffset).Ascending(x => x.EventStream), new CreateIndexOptions { Unique = true });
}
- public IObservable GetEventsAsync(string streamFilter, long lastReceivedEventNumber = -1)
+ public IObservable GetEventsAsync(string streamFilter = null, string position = null)
{
return Observable.Create((observer, ct) =>
{
@@ -74,22 +61,29 @@ namespace Squidex.Infrastructure.MongoDb.EventStore
observer.OnNext(storedEvent);
return Tasks.TaskHelper.Done;
- }, ct, streamFilter, lastReceivedEventNumber);
+ }, ct, streamFilter, position);
});
}
- public async Task GetEventsAsync(Func callback, CancellationToken cancellationToken, string streamFilter = null, long lastReceivedEventNumber = -1)
+ public async Task GetEventsAsync(Func callback, CancellationToken cancellationToken, string streamFilter = null, string position = null)
{
Guard.NotNull(callback, nameof(callback));
-
- var filters = new List>();
- if (lastReceivedEventNumber >= 0)
+ var tokenTimestamp = EmptyTimestamp;
+ var tokenEventStreamNumber = -1;
+
+ if (position != null)
{
- var commitOffset = await GetPreviousOffsetAsync(lastReceivedEventNumber);
+ var token = ParsePosition(position);
- filters.Add(Filter.Gte(x => x.EventsOffset, commitOffset));
+ tokenTimestamp = token.Timestamp;
+ tokenEventStreamNumber = token.EventStreamNumber;
}
+
+ var filters = new List>
+ {
+ Filter.Gte(x => x.Timestamp, tokenTimestamp)
+ };
if (!string.IsNullOrWhiteSpace(streamFilter) && !string.Equals(streamFilter, "*", StringComparison.OrdinalIgnoreCase))
{
@@ -114,140 +108,106 @@ namespace Squidex.Infrastructure.MongoDb.EventStore
filter = filters[0];
}
- await Collection.Find(filter).SortBy(x => x.EventsOffset).ForEachAsync(async commit =>
+ await Collection.Find(filter).SortBy(x => x.Timestamp).ForEachAsync(async commit =>
{
- var eventNumber = commit.EventsOffset;
- var eventStreamNumber = commit.EventStreamOffset;
+ var eventStreamNumber = (int)commit.EventStreamOffset;
- foreach (var mongoEvent in commit.Events)
+ foreach (var e in commit.Events)
{
- eventNumber++;
eventStreamNumber++;
- if (eventNumber > lastReceivedEventNumber)
+ if (eventStreamNumber > tokenEventStreamNumber)
{
- var eventData = SimpleMapper.Map(mongoEvent, new EventData());
+ var eventData = new EventData { EventId = e.EventId, Metadata = e.Metadata, Payload = e.Payload, Type = e.Type };
+ var eventToken = CreateToken(commit.Timestamp, eventStreamNumber);
- await callback(new StoredEvent(eventNumber, eventStreamNumber, eventData));
+ await callback(new StoredEvent(eventToken, eventStreamNumber, eventData));
}
}
}, cancellationToken);
}
- public async Task AppendEventsAsync(Guid commitId, string streamName, int expectedVersion, IEnumerable events)
+ public async Task AppendEventsAsync(Guid commitId, string streamName, int expectedVersion, ICollection events)
{
Guard.NotNullOrEmpty(streamName, nameof(streamName));
Guard.NotNull(events, nameof(events));
- var currentVersion = await GetEventStreamOffset(streamName);
+ var eventsCount = events.Count;
- if (currentVersion != expectedVersion)
+ if (eventsCount > 0)
{
- throw new WrongEventVersionException(currentVersion, expectedVersion);
- }
+ var commitEvents = new MongoEvent[events.Count];
- var now = clock.GetCurrentInstant();
+ var i = 0;
- var commitEvents = events.Select(x => SimpleMapper.Map(x, new MongoEvent())).ToList();
+ foreach (var e in events)
+ {
+ var mongoEvent = new MongoEvent { EventId = e.EventId, Metadata = e.Metadata, Payload = e.Payload, Type = e.Type };
- if (commitEvents.Any())
- {
- var offset = await GetEventOffsetAsync();
+ commitEvents[i++] = mongoEvent;
+ }
var commit = new MongoEventCommit
{
Id = commitId,
Events = commitEvents,
- EventsOffset = offset,
- EventsCount = commitEvents.Count,
+ EventsCount = eventsCount,
EventStream = streamName,
EventStreamOffset = expectedVersion,
- Timestamp = now
+ Timestamp = EmptyTimestamp
};
+
+ try
+ {
+ await Collection.InsertOneAsync(commit);
- for (var retry = 0; retry < Retries; retry++)
+ notifier.NotifyEventsStored();
+ }
+ catch (MongoWriteException ex)
{
- try
+ if (ex.WriteError?.Category == ServerErrorCategory.DuplicateKey)
{
- await Collection.InsertOneAsync(commit);
+ var currentVersion = await GetEventStreamOffset(streamName);
- notifier.NotifyEventsStored();
-
- return;
- }
- catch (MongoWriteException ex)
- {
- if (ex.Message.IndexOf(eventsOffsetIndex, StringComparison.OrdinalIgnoreCase) >= 0)
- {
- commit.EventsOffset = await GetEventOffsetAsync();
- }
- else if (ex.WriteError?.Category == ServerErrorCategory.DuplicateKey)
- {
- currentVersion = await GetEventStreamOffset(streamName);
-
- throw new WrongEventVersionException(currentVersion, expectedVersion);
- }
- else
- {
- throw;
- }
+ throw new WrongEventVersionException(currentVersion, expectedVersion);
}
+
+ throw;
}
}
}
- private async Task GetPreviousOffsetAsync(long startEventNumber)
+ private async Task GetEventStreamOffset(string streamName)
{
var document =
- await Collection.Find(x => x.EventsOffset <= startEventNumber)
+ await Collection.Find(x => x.EventStream == streamName)
.Project(Project
- .Include(x => x.EventsOffset))
- .SortByDescending(x => x.EventsOffset).Limit(1)
+ .Include(x => x.EventStreamOffset)
+ .Include(x => x.EventsCount))
+ .SortByDescending(x => x.EventStreamOffset).Limit(1)
.FirstOrDefaultAsync();
if (document != null)
{
- return document["EventsOffset"].ToInt64();
+ return document["EventStreamOffset"].ToInt64() + document["EventsCount"].ToInt64();
}
return -1;
}
- private async Task GetEventOffsetAsync()
+ private static string CreateToken(BsonTimestamp timestamp, int eventStreamNumber)
{
- var document =
- await Collection.Find(new BsonDocument())
- .Project(Project
- .Include(x => x.EventsOffset)
- .Include(x => x.EventsCount))
- .SortByDescending(x => x.EventsOffset).Limit(1)
- .FirstOrDefaultAsync();
+ var parts = new object[] { timestamp.Timestamp, timestamp.Increment, eventStreamNumber };
- if (document != null)
- {
- return document["EventsOffset"].ToInt64() + document["EventsCount"].ToInt64();
- }
-
- return -1;
+ return string.Join("$", parts);
}
- private async Task GetEventStreamOffset(string streamName)
+ private static (BsonTimestamp Timestamp, int EventStreamNumber) ParsePosition(string position)
{
- var document =
- await Collection.Find(x => x.EventStream == streamName)
- .Project(Project
- .Include(x => x.EventStreamOffset)
- .Include(x => x.EventsCount))
- .SortByDescending(x => x.EventsOffset).Limit(1)
- .FirstOrDefaultAsync();
-
- if (document != null)
- {
- return document["EventStreamOffset"].ToInt64() + document["EventsCount"].ToInt64();
- }
+ var parts = position.Split('$');
- return -1;
+ return (new BsonTimestamp(int.Parse(parts[0]), int.Parse(parts[1])), int.Parse(parts[2]));
}
}
}
diff --git a/src/Squidex.Infrastructure.MongoDb/MongoEventConsumerInfo.cs b/src/Squidex.Infrastructure.MongoDb/MongoEventConsumerInfo.cs
index b1b179913..2c7fec6d5 100644
--- a/src/Squidex.Infrastructure.MongoDb/MongoEventConsumerInfo.cs
+++ b/src/Squidex.Infrastructure.MongoDb/MongoEventConsumerInfo.cs
@@ -32,6 +32,6 @@ namespace Squidex.Infrastructure.MongoDb
[BsonElement]
[BsonRequired]
- public long LastHandledEventNumber { get; set; }
+ public string Position { get; set; }
}
}
\ No newline at end of file
diff --git a/src/Squidex.Infrastructure.MongoDb/MongoEventConsumerInfoRepository.cs b/src/Squidex.Infrastructure.MongoDb/MongoEventConsumerInfoRepository.cs
index dabe44358..11dfdbad3 100644
--- a/src/Squidex.Infrastructure.MongoDb/MongoEventConsumerInfoRepository.cs
+++ b/src/Squidex.Infrastructure.MongoDb/MongoEventConsumerInfoRepository.cs
@@ -47,7 +47,7 @@ namespace Squidex.Infrastructure.MongoDb
{
try
{
- await Collection.InsertOneAsync(new MongoEventConsumerInfo { Name = consumerName, LastHandledEventNumber = -1 });
+ await Collection.InsertOneAsync(new MongoEventConsumerInfo { Name = consumerName, Position = null });
}
catch (MongoWriteException ex)
{
@@ -61,31 +61,27 @@ namespace Squidex.Infrastructure.MongoDb
public Task StartAsync(string consumerName)
{
- return Collection.UpdateOneAsync(x => x.Name == consumerName,
- Update.Unset(x => x.IsStopped));
+ return Collection.UpdateOneAsync(x => x.Name == consumerName, Update.Unset(x => x.IsStopped));
}
public Task StopAsync(string consumerName, string error = null)
{
- return Collection.UpdateOneAsync(x => x.Name == consumerName,
- Update.Set(x => x.IsStopped, true).Set(x => x.Error, error));
+ return Collection.UpdateOneAsync(x => x.Name == consumerName, Update.Set(x => x.IsStopped, true).Set(x => x.Error, error));
}
public Task ResetAsync(string consumerName)
{
- return Collection.UpdateOneAsync(x => x.Name == consumerName,
- Update.Set(x => x.IsResetting, true));
+ return Collection.UpdateOneAsync(x => x.Name == consumerName, Update.Set(x => x.IsResetting, true));
}
- public Task SetLastHandledEventNumberAsync(string consumerName, long eventNumber)
+ public Task SetLastHandledEventNumberAsync(string consumerName, string position)
{
- return Collection.ReplaceOneAsync(x => x.Name == consumerName,
- CreateEntity(consumerName, eventNumber));
+ return Collection.ReplaceOneAsync(x => x.Name == consumerName, CreateEntity(consumerName, position));
}
- private static MongoEventConsumerInfo CreateEntity(string consumerName, long eventNumber)
+ private static MongoEventConsumerInfo CreateEntity(string consumerName, string position)
{
- return new MongoEventConsumerInfo { Name = consumerName, LastHandledEventNumber = eventNumber };
+ return new MongoEventConsumerInfo { Name = consumerName, Position = position };
}
}
}
diff --git a/src/Squidex.Infrastructure.MongoDb/Squidex.Infrastructure.MongoDb.csproj b/src/Squidex.Infrastructure.MongoDb/Squidex.Infrastructure.MongoDb.csproj
index e2d8bc9af..df9225f17 100644
--- a/src/Squidex.Infrastructure.MongoDb/Squidex.Infrastructure.MongoDb.csproj
+++ b/src/Squidex.Infrastructure.MongoDb/Squidex.Infrastructure.MongoDb.csproj
@@ -11,5 +11,6 @@
+
diff --git a/src/Squidex.Infrastructure.RabbitMq/Squidex.Infrastructure.RabbitMq.csproj b/src/Squidex.Infrastructure.RabbitMq/Squidex.Infrastructure.RabbitMq.csproj
index ab2f8af4c..4d863d14f 100644
--- a/src/Squidex.Infrastructure.RabbitMq/Squidex.Infrastructure.RabbitMq.csproj
+++ b/src/Squidex.Infrastructure.RabbitMq/Squidex.Infrastructure.RabbitMq.csproj
@@ -8,6 +8,7 @@
+
diff --git a/src/Squidex.Infrastructure.Redis/Squidex.Infrastructure.Redis.csproj b/src/Squidex.Infrastructure.Redis/Squidex.Infrastructure.Redis.csproj
index 6cb63c49e..dc7458aa0 100644
--- a/src/Squidex.Infrastructure.Redis/Squidex.Infrastructure.Redis.csproj
+++ b/src/Squidex.Infrastructure.Redis/Squidex.Infrastructure.Redis.csproj
@@ -10,6 +10,7 @@
-
+
+
diff --git a/src/Squidex.Infrastructure/CQRS/Events/EnvelopeExtensions.cs b/src/Squidex.Infrastructure/CQRS/Events/EnvelopeExtensions.cs
index 80886571a..485ae0668 100644
--- a/src/Squidex.Infrastructure/CQRS/Events/EnvelopeExtensions.cs
+++ b/src/Squidex.Infrastructure/CQRS/Events/EnvelopeExtensions.cs
@@ -14,12 +14,12 @@ namespace Squidex.Infrastructure.CQRS.Events
{
public static class EnvelopeExtensions
{
- public static long EventNumber(this EnvelopeHeaders headers)
+ public static string EventPosition(this EnvelopeHeaders headers)
{
- return headers[CommonHeaders.EventNumber].ToInt32(CultureInfo.InvariantCulture);
+ return headers[CommonHeaders.EventNumber].ToString();
}
- public static Envelope SetEventNumber(this Envelope envelope, long value) where T : class
+ public static Envelope SetEventPosition(this Envelope envelope, string value) where T : class
{
envelope.Headers.Set(CommonHeaders.EventNumber, value);
diff --git a/src/Squidex.Infrastructure/CQRS/Events/EventReceiver.cs b/src/Squidex.Infrastructure/CQRS/Events/EventReceiver.cs
index 08b169e32..44ccbef61 100644
--- a/src/Squidex.Infrastructure/CQRS/Events/EventReceiver.cs
+++ b/src/Squidex.Infrastructure/CQRS/Events/EventReceiver.cs
@@ -11,6 +11,7 @@ using System.Threading.Tasks;
using Squidex.Infrastructure.Log;
using Squidex.Infrastructure.Timers;
+// ReSharper disable ConvertToLambdaExpression
// ReSharper disable MethodSupportsCancellation
// ReSharper disable ConvertIfStatementToConditionalTernaryExpression
// ReSharper disable InvertIf
@@ -97,21 +98,23 @@ namespace Squidex.Infrastructure.CQRS.Events
{
var status = await eventConsumerInfoRepository.FindAsync(consumerName);
- var lastHandledEventNumber = status.LastHandledEventNumber;
+ var position = status.Position;
if (status.IsResetting)
{
await ResetAsync(eventConsumer, consumerName);
- lastHandledEventNumber = -1;
+ position = null;
}
else if (status.IsStopped)
{
return;
}
- await eventStore.GetEventsAsync(se => HandleEventAsync(eventConsumer, se, consumerName), ct,
- eventConsumer.EventsFilter, lastHandledEventNumber);
+ await eventStore.GetEventsAsync(se =>
+ {
+ return HandleEventAsync(eventConsumer, se, consumerName);
+ }, ct, eventConsumer.EventsFilter, position);
}
catch (Exception ex)
{
@@ -133,8 +136,9 @@ namespace Squidex.Infrastructure.CQRS.Events
return;
}
- await DispatchConsumer(@event, eventConsumer);
- await eventConsumerInfoRepository.SetLastHandledEventNumberAsync(consumerName, storedEvent.EventNumber);
+ await DispatchConsumer(@event, eventConsumer, consumerName);
+
+ await eventConsumerInfoRepository.SetLastHandledEventNumberAsync(consumerName, storedEvent.EventPosition);
}
private async Task ResetAsync(IEventConsumer eventConsumer, string consumerName)
@@ -149,7 +153,7 @@ namespace Squidex.Infrastructure.CQRS.Events
.WriteProperty("eventConsumer", eventConsumer.GetType().Name));
await eventConsumer.ClearAsync();
- await eventConsumerInfoRepository.SetLastHandledEventNumberAsync(consumerName, -1);
+ await eventConsumerInfoRepository.SetLastHandledEventNumberAsync(consumerName, null);
log.LogInformation(w => w
.WriteProperty("action", "EventConsumerReset")
@@ -169,7 +173,7 @@ namespace Squidex.Infrastructure.CQRS.Events
}
}
- private async Task DispatchConsumer(Envelope @event, IEventConsumer eventConsumer)
+ private async Task DispatchConsumer(Envelope @event, IEventConsumer eventConsumer, string consumerName)
{
var eventId = @event.Headers.EventId().ToString();
var eventType = @event.Payload.GetType().Name;
@@ -181,7 +185,7 @@ namespace Squidex.Infrastructure.CQRS.Events
.WriteProperty("state", "Started")
.WriteProperty("eventId", eventId)
.WriteProperty("eventType", eventType)
- .WriteProperty("eventConsumer", eventConsumer.GetType().Name));
+ .WriteProperty("eventConsumer", consumerName));
await eventConsumer.On(@event);
@@ -191,7 +195,7 @@ namespace Squidex.Infrastructure.CQRS.Events
.WriteProperty("state", "Completed")
.WriteProperty("eventId", eventId)
.WriteProperty("eventType", eventType)
- .WriteProperty("eventConsumer", eventConsumer.GetType().Name));
+ .WriteProperty("eventConsumer", consumerName));
}
catch (Exception ex)
{
@@ -201,7 +205,7 @@ namespace Squidex.Infrastructure.CQRS.Events
.WriteProperty("state", "Started")
.WriteProperty("eventId", eventId)
.WriteProperty("eventType", eventType)
- .WriteProperty("eventConsumer", eventConsumer.GetType().Name));
+ .WriteProperty("eventConsumer", consumerName));
throw;
}
@@ -213,7 +217,7 @@ namespace Squidex.Infrastructure.CQRS.Events
{
var @event = formatter.Parse(storedEvent.Data);
- @event.SetEventNumber(storedEvent.EventNumber);
+ @event.SetEventPosition(storedEvent.EventPosition);
@event.SetEventStreamNumber(storedEvent.EventStreamNumber);
return @event;
@@ -228,7 +232,7 @@ namespace Squidex.Infrastructure.CQRS.Events
.WriteProperty("action", "ParseEvent")
.WriteProperty("state", "Failed")
.WriteProperty("eventId", storedEvent.Data.EventId.ToString())
- .WriteProperty("eventNumber", storedEvent.EventNumber));
+ .WriteProperty("eventPosition", storedEvent.EventPosition));
throw;
}
diff --git a/src/Squidex.Infrastructure/CQRS/Events/IEventConsumerInfo.cs b/src/Squidex.Infrastructure/CQRS/Events/IEventConsumerInfo.cs
index 492c73a8b..3e3551f8c 100644
--- a/src/Squidex.Infrastructure/CQRS/Events/IEventConsumerInfo.cs
+++ b/src/Squidex.Infrastructure/CQRS/Events/IEventConsumerInfo.cs
@@ -10,8 +10,6 @@ namespace Squidex.Infrastructure.CQRS.Events
{
public interface IEventConsumerInfo
{
- long LastHandledEventNumber { get; }
-
bool IsStopped { get; }
bool IsResetting { get; }
@@ -19,5 +17,7 @@ namespace Squidex.Infrastructure.CQRS.Events
string Name { get; }
string Error { get; }
+
+ string Position { get; }
}
}
\ No newline at end of file
diff --git a/src/Squidex.Infrastructure/CQRS/Events/IEventConsumerInfoRepository.cs b/src/Squidex.Infrastructure/CQRS/Events/IEventConsumerInfoRepository.cs
index 5e77c01ae..6df6475e2 100644
--- a/src/Squidex.Infrastructure/CQRS/Events/IEventConsumerInfoRepository.cs
+++ b/src/Squidex.Infrastructure/CQRS/Events/IEventConsumerInfoRepository.cs
@@ -25,6 +25,6 @@ namespace Squidex.Infrastructure.CQRS.Events
Task ResetAsync(string consumerName);
- Task SetLastHandledEventNumberAsync(string consumerName, long eventNumber);
+ Task SetLastHandledEventNumberAsync(string consumerName, string position);
}
}
diff --git a/src/Squidex.Infrastructure/CQRS/Events/IEventStore.cs b/src/Squidex.Infrastructure/CQRS/Events/IEventStore.cs
index ef69f6b03..f3f6491eb 100644
--- a/src/Squidex.Infrastructure/CQRS/Events/IEventStore.cs
+++ b/src/Squidex.Infrastructure/CQRS/Events/IEventStore.cs
@@ -15,10 +15,10 @@ namespace Squidex.Infrastructure.CQRS.Events
{
public interface IEventStore
{
- IObservable GetEventsAsync(string streamFilter = null, long lastReceivedEventNumber = -1);
+ IObservable GetEventsAsync(string streamFilter = null, string position = null);
- Task GetEventsAsync(Func callback, CancellationToken cancellationToken, string streamFilter = null, long lastReceivedEventNumber = -1);
+ Task GetEventsAsync(Func callback, CancellationToken cancellationToken, string streamFilter = null, string position = null);
- Task AppendEventsAsync(Guid commitId, string streamName, int expectedVersion, IEnumerable events);
+ Task AppendEventsAsync(Guid commitId, string streamName, int expectedVersion, ICollection events);
}
}
diff --git a/src/Squidex.Infrastructure/CQRS/Events/StoredEvent.cs b/src/Squidex.Infrastructure/CQRS/Events/StoredEvent.cs
index 547956c13..2029df12e 100644
--- a/src/Squidex.Infrastructure/CQRS/Events/StoredEvent.cs
+++ b/src/Squidex.Infrastructure/CQRS/Events/StoredEvent.cs
@@ -10,31 +10,32 @@ namespace Squidex.Infrastructure.CQRS.Events
{
public sealed class StoredEvent
{
- private readonly long eventNumber;
- private readonly long eventStreamNumber;
+ private readonly string eventPosition;
+ private readonly int eventStreamNumber;
private readonly EventData data;
- public long EventNumber
+ public string EventPosition
{
- get { return eventNumber; }
+ get { return eventPosition; }
}
- public long EventStreamNumber
+ public EventData Data
{
- get { return eventStreamNumber; }
+ get { return data; }
}
- public EventData Data
+ public int EventStreamNumber
{
- get { return data; }
+ get { return eventStreamNumber; }
}
- public StoredEvent(long eventNumber, long eventStreamNumber, EventData data)
+ public StoredEvent(string eventPosition, int eventStreamNumber, EventData data)
{
+ Guard.NotNullOrEmpty(eventPosition, nameof(eventPosition));
Guard.NotNull(data, nameof(data));
this.data = data;
- this.eventNumber = eventNumber;
+ this.eventPosition = eventPosition;
this.eventStreamNumber = eventStreamNumber;
}
}
diff --git a/src/Squidex.Infrastructure/Squidex.Infrastructure.csproj b/src/Squidex.Infrastructure/Squidex.Infrastructure.csproj
index a52fa4d98..e38f9d1ca 100644
--- a/src/Squidex.Infrastructure/Squidex.Infrastructure.csproj
+++ b/src/Squidex.Infrastructure/Squidex.Infrastructure.csproj
@@ -11,11 +11,12 @@
-
+
+
diff --git a/src/Squidex.Read.MongoDb/Squidex.Read.MongoDb.csproj b/src/Squidex.Read.MongoDb/Squidex.Read.MongoDb.csproj
index ab6a992d6..3d222610b 100644
--- a/src/Squidex.Read.MongoDb/Squidex.Read.MongoDb.csproj
+++ b/src/Squidex.Read.MongoDb/Squidex.Read.MongoDb.csproj
@@ -19,6 +19,7 @@
+
diff --git a/src/Squidex.Read/Squidex.Read.csproj b/src/Squidex.Read/Squidex.Read.csproj
index 4bc4ca905..e949d599b 100644
--- a/src/Squidex.Read/Squidex.Read.csproj
+++ b/src/Squidex.Read/Squidex.Read.csproj
@@ -18,5 +18,6 @@
+
diff --git a/src/Squidex.Write/Squidex.Write.csproj b/src/Squidex.Write/Squidex.Write.csproj
index 55bc89d81..89971bac2 100644
--- a/src/Squidex.Write/Squidex.Write.csproj
+++ b/src/Squidex.Write/Squidex.Write.csproj
@@ -15,5 +15,6 @@
+
diff --git a/src/Squidex/Squidex.csproj b/src/Squidex/Squidex.csproj
index 49d21c99f..b4c6a37e9 100644
--- a/src/Squidex/Squidex.csproj
+++ b/src/Squidex/Squidex.csproj
@@ -65,13 +65,14 @@
-
+
-
+
-
-
+
+
+
diff --git a/tests/Benchmarks/Benchmarks.csproj b/tests/Benchmarks/Benchmarks.csproj
index 664de6b95..e6995c8d6 100644
--- a/tests/Benchmarks/Benchmarks.csproj
+++ b/tests/Benchmarks/Benchmarks.csproj
@@ -9,5 +9,6 @@
+
diff --git a/tests/Benchmarks/IBenchmark.cs b/tests/Benchmarks/IBenchmark.cs
index 19eff0e07..a67b4171f 100644
--- a/tests/Benchmarks/IBenchmark.cs
+++ b/tests/Benchmarks/IBenchmark.cs
@@ -5,7 +5,6 @@
// Copyright (c) Squidex Group
// All rights reserved.
// ==========================================================================
-
namespace Benchmarks
{
public interface IBenchmark
diff --git a/tests/Benchmarks/Program.cs b/tests/Benchmarks/Program.cs
index 595223d34..5859fe367 100644
--- a/tests/Benchmarks/Program.cs
+++ b/tests/Benchmarks/Program.cs
@@ -1,4 +1,12 @@
-using System;
+// ==========================================================================
+// Program.cs
+// Squidex Headless CMS
+// ==========================================================================
+// Copyright (c) Squidex Group
+// All rights reserved.
+// ==========================================================================
+
+using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
@@ -10,7 +18,8 @@ namespace Benchmarks
{
private static readonly List Benchmarks = new List
{
- new AppendToEventStore()
+ new AppendToEventStore(),
+ new AppendToEventStoreParallel()
};
public static void Main(string[] args)
diff --git a/tests/Benchmarks/Properties/launchSettings.json b/tests/Benchmarks/Properties/launchSettings.json
index 2e81874bd..5d43f5fe1 100644
--- a/tests/Benchmarks/Properties/launchSettings.json
+++ b/tests/Benchmarks/Properties/launchSettings.json
@@ -2,7 +2,7 @@
"profiles": {
"Benchmarks": {
"commandName": "Project",
- "commandLineArgs": "appendToEventStore"
+ "commandLineArgs": "appendToEventStoreParallel"
}
}
}
\ No newline at end of file
diff --git a/tests/Benchmarks/Tests/AppendToEventStore.cs b/tests/Benchmarks/Tests/AppendToEventStore.cs
index 5eab44b5c..6387bce5e 100644
--- a/tests/Benchmarks/Tests/AppendToEventStore.cs
+++ b/tests/Benchmarks/Tests/AppendToEventStore.cs
@@ -7,8 +7,8 @@
// ==========================================================================
using System;
+using Benchmarks.Utils;
using MongoDB.Driver;
-using NodaTime;
using Squidex.Infrastructure;
using Squidex.Infrastructure.CQRS.Events;
using Squidex.Infrastructure.MongoDb.EventStore;
@@ -20,14 +20,6 @@ namespace Benchmarks.Tests
private IMongoClient mongoClient;
private IMongoDatabase mongoDatabase;
- private static readonly EventData EventData = new EventData
- {
- EventId = Guid.NewGuid(),
- Metadata = "EventMetdata",
- Payload = "EventPayload",
- Type = "MyEvent"
- };
-
public string Id
{
get { return "appendToEventStore"; }
@@ -46,15 +38,14 @@ namespace Benchmarks.Tests
public void RunInitialize()
{
mongoDatabase = mongoClient.GetDatabase(Guid.NewGuid().ToString());
- mongoDatabase.CreateCollection("Test");
}
public long Run()
{
- const long numCommits = 10;
+ const long numCommits = 200;
const long eventStreams = 10;
- var eventStore = new MongoEventStore(mongoDatabase, new DefaultEventNotifier(new InMemoryPubSub()), SystemClock.Instance);
+ var eventStore = new MongoEventStore(mongoDatabase, new DefaultEventNotifier(new InMemoryPubSub()));
for (var streamId = 0; streamId < eventStreams; streamId++)
{
@@ -63,7 +54,7 @@ namespace Benchmarks.Tests
for (var commitId = 0; commitId < numCommits; commitId++)
{
- eventStore.AppendEventsAsync(Guid.NewGuid(), streamName, eventOffset, new[] { EventData }).Wait();
+ eventStore.AppendEventsAsync(Guid.NewGuid(), streamName, eventOffset, new[] { Helper.CreateEventData() }).Wait();
eventOffset++;
}
diff --git a/tests/Benchmarks/Tests/AppendToEventStoreParallel.cs b/tests/Benchmarks/Tests/AppendToEventStoreParallel.cs
new file mode 100644
index 000000000..ababb07f5
--- /dev/null
+++ b/tests/Benchmarks/Tests/AppendToEventStoreParallel.cs
@@ -0,0 +1,76 @@
+// ==========================================================================
+// AppendToEventStoreParallel.cs
+// Squidex Headless CMS
+// ==========================================================================
+// Copyright (c) Squidex Group
+// All rights reserved.
+// ==========================================================================
+
+using System;
+using System.Threading.Tasks;
+using Benchmarks.Utils;
+using MongoDB.Driver;
+using Squidex.Infrastructure;
+using Squidex.Infrastructure.CQRS.Events;
+using Squidex.Infrastructure.MongoDb.EventStore;
+
+namespace Benchmarks.Tests
+{
+ public sealed class AppendToEventStoreParallel : IBenchmark
+ {
+ private IMongoClient mongoClient;
+ private IMongoDatabase mongoDatabase;
+
+ public string Id
+ {
+ get { return "appendToEventStoreParallel"; }
+ }
+
+ public string Name
+ {
+ get { return "Append Events to EventStore Parallel"; }
+ }
+
+ public void Initialize()
+ {
+ mongoClient = new MongoClient("mongodb://localhost");
+ }
+
+ public void RunInitialize()
+ {
+ mongoDatabase = mongoClient.GetDatabase(Guid.NewGuid().ToString());
+ }
+
+ public long Run()
+ {
+ const long numCommits = 200;
+ const long eventStreams = 10;
+
+ var eventStore = new MongoEventStore(mongoDatabase, new DefaultEventNotifier(new InMemoryPubSub()));
+
+ Parallel.For(0, eventStreams, streamId =>
+ {
+ var eventOffset = -1;
+ var streamName = streamId.ToString();
+
+ for (var commitId = 0; commitId < numCommits; commitId++)
+ {
+ eventStore.AppendEventsAsync(Guid.NewGuid(), streamName, eventOffset, new[] { Helper.CreateEventData() }).Wait();
+
+ eventOffset++;
+ }
+ });
+
+ return numCommits * eventStreams;
+ }
+
+ public void RunCleanup()
+ {
+ mongoClient.DropDatabase(mongoDatabase.DatabaseNamespace.DatabaseName);
+ }
+
+ public void Cleanup()
+ {
+ }
+ }
+}
diff --git a/tests/Benchmarks/Utils/Helper.cs b/tests/Benchmarks/Utils/Helper.cs
new file mode 100644
index 000000000..576740b70
--- /dev/null
+++ b/tests/Benchmarks/Utils/Helper.cs
@@ -0,0 +1,22 @@
+// ==========================================================================
+// Helper.cs
+// Squidex Headless CMS
+// ==========================================================================
+// Copyright (c) Squidex Group
+// All rights reserved.
+// ==========================================================================
+
+using System;
+using Squidex.Infrastructure.CQRS.Events;
+
+namespace Benchmarks.Utils
+{
+ public static class Helper
+ {
+ public static EventData CreateEventData()
+ {
+ return new EventData { EventId = Guid.NewGuid(), Metadata = "EventMetdata", Payload = "EventPayload", Type = "MyEvent" };
+
+ }
+ }
+}
diff --git a/tests/RunCoverage.ps1 b/tests/RunCoverage.ps1
index aa0978b6f..5e9194cca 100644
--- a/tests/RunCoverage.ps1
+++ b/tests/RunCoverage.ps1
@@ -50,6 +50,6 @@ New-Item -ItemType directory -Path $reportsFolder
-output:"$workingFolder\$reportsFolder\Read.xml" `
-oldStyle
-&"$userProfile\.nuget\packages\ReportGenerator\2.5.8\tools\ReportGenerator.exe" `
+&"$userProfile\.nuget\packages\ReportGenerator\2.5.9\tools\ReportGenerator.exe" `
-reports:"$workingFolder\$reportsFolder\*.xml" `
-targetdir:"$workingFolder\$reportsFolder\Output"
\ No newline at end of file
diff --git a/tests/Squidex.Core.Tests/Squidex.Core.Tests.csproj b/tests/Squidex.Core.Tests/Squidex.Core.Tests.csproj
index 27b4d85aa..1ca55e6c7 100644
--- a/tests/Squidex.Core.Tests/Squidex.Core.Tests.csproj
+++ b/tests/Squidex.Core.Tests/Squidex.Core.Tests.csproj
@@ -10,9 +10,10 @@
-
+
-
+
+
diff --git a/tests/Squidex.Infrastructure.Tests/CQRS/Commands/DefaultDomainObjectRepositoryTests.cs b/tests/Squidex.Infrastructure.Tests/CQRS/Commands/DefaultDomainObjectRepositoryTests.cs
index 167f44a9a..00c598911 100644
--- a/tests/Squidex.Infrastructure.Tests/CQRS/Commands/DefaultDomainObjectRepositoryTests.cs
+++ b/tests/Squidex.Infrastructure.Tests/CQRS/Commands/DefaultDomainObjectRepositoryTests.cs
@@ -74,7 +74,7 @@ namespace Squidex.Infrastructure.CQRS.Commands
[Fact]
public async Task Should_throw_exception_when_event_store_returns_no_events()
{
- eventStore.Setup(x => x.GetEventsAsync(streamName, -1)).Returns(Observable.Empty());
+ eventStore.Setup(x => x.GetEventsAsync(streamName, null)).Returns(Observable.Empty());
await Assert.ThrowsAsync(() => sut.GetByIdAsync(aggregateId));
}
@@ -90,11 +90,11 @@ namespace Squidex.Infrastructure.CQRS.Commands
var events = new[]
{
- new StoredEvent(0, 0, eventData1),
- new StoredEvent(1, 1, eventData2)
+ new StoredEvent("0", 0, eventData1),
+ new StoredEvent("1", 1, eventData2)
};
- eventStore.Setup(x => x.GetEventsAsync(streamName, -1)).Returns(events.ToObservable());
+ eventStore.Setup(x => x.GetEventsAsync(streamName, null)).Returns(events.ToObservable());
eventDataFormatter.Setup(x => x.Parse(eventData1)).Returns(new Envelope(event1));
eventDataFormatter.Setup(x => x.Parse(eventData2)).Returns(new Envelope(event2));
@@ -115,11 +115,11 @@ namespace Squidex.Infrastructure.CQRS.Commands
var events = new[]
{
- new StoredEvent(0, 0, eventData1),
- new StoredEvent(1, 1, eventData2)
+ new StoredEvent("0", 0, eventData1),
+ new StoredEvent("1", 1, eventData2)
};
- eventStore.Setup(x => x.GetEventsAsync(streamName, -1)).Returns(events.ToObservable());
+ eventStore.Setup(x => x.GetEventsAsync(streamName, null)).Returns(events.ToObservable());
eventDataFormatter.Setup(x => x.Parse(eventData1)).Returns(new Envelope(event1));
eventDataFormatter.Setup(x => x.Parse(eventData2)).Returns(new Envelope(event2));
@@ -141,7 +141,7 @@ namespace Squidex.Infrastructure.CQRS.Commands
eventDataFormatter.Setup(x => x.ToEventData(It.Is>(e => e.Payload == event1), commitId)).Returns(eventData1);
eventDataFormatter.Setup(x => x.ToEventData(It.Is>(e => e.Payload == event2), commitId)).Returns(eventData2);
- eventStore.Setup(x => x.AppendEventsAsync(commitId, streamName, 123, It.Is>(e => e.Count() == 2)))
+ eventStore.Setup(x => x.AppendEventsAsync(commitId, streamName, 123, It.Is>(e => e.Count() == 2)))
.Returns(TaskHelper.Done)
.Verifiable();
diff --git a/tests/Squidex.Infrastructure.Tests/CQRS/Events/EnvelopeExtensionsTests.cs b/tests/Squidex.Infrastructure.Tests/CQRS/Events/EnvelopeExtensionsTests.cs
index c66c25fe3..f626afb6e 100644
--- a/tests/Squidex.Infrastructure.Tests/CQRS/Events/EnvelopeExtensionsTests.cs
+++ b/tests/Squidex.Infrastructure.Tests/CQRS/Events/EnvelopeExtensionsTests.cs
@@ -65,12 +65,12 @@ namespace Squidex.Infrastructure.CQRS.Events
[Fact]
public void Should_set_and_get_event_number()
{
- const int eventNumber = 123;
+ const string eventNumber = "123";
- sut.SetEventNumber(eventNumber);
+ sut.SetEventPosition(eventNumber);
- Assert.Equal(eventNumber, sut.Headers.EventNumber());
- Assert.Equal(eventNumber, sut.Headers["EventNumber"].ToInt32(culture));
+ Assert.Equal(eventNumber, sut.Headers.EventPosition());
+ Assert.Equal(eventNumber, sut.Headers["EventNumber"].ToString());
}
[Fact]
diff --git a/tests/Squidex.Infrastructure.Tests/CQRS/Events/EventDataFormatterTests.cs b/tests/Squidex.Infrastructure.Tests/CQRS/Events/EventDataFormatterTests.cs
index e76e34877..d0d55be94 100644
--- a/tests/Squidex.Infrastructure.Tests/CQRS/Events/EventDataFormatterTests.cs
+++ b/tests/Squidex.Infrastructure.Tests/CQRS/Events/EventDataFormatterTests.cs
@@ -41,7 +41,7 @@ namespace Squidex.Infrastructure.CQRS.Events
inputEvent.SetAggregateId(Guid.NewGuid());
inputEvent.SetCommitId(commitId);
inputEvent.SetEventId(Guid.NewGuid());
- inputEvent.SetEventNumber(1);
+ inputEvent.SetEventPosition("1");
inputEvent.SetEventStreamNumber(1);
inputEvent.SetTimestamp(SystemClock.Instance.GetCurrentInstant());
diff --git a/tests/Squidex.Infrastructure.Tests/CQRS/Events/EventReceiverTests.cs b/tests/Squidex.Infrastructure.Tests/CQRS/Events/EventReceiverTests.cs
index c8c6844a2..15a99eba6 100644
--- a/tests/Squidex.Infrastructure.Tests/CQRS/Events/EventReceiverTests.cs
+++ b/tests/Squidex.Infrastructure.Tests/CQRS/Events/EventReceiverTests.cs
@@ -27,13 +27,15 @@ namespace Squidex.Infrastructure.CQRS.Events
private sealed class MyEventConsumerInfo : IEventConsumerInfo
{
- public long LastHandledEventNumber { get; set; }
-
public bool IsStopped { get; set; }
+
public bool IsResetting { get; set; }
public string Name { get; set; }
+
public string Error { get; set; }
+
+ public string Position { get; set; }
}
private sealed class MyEventStore : IEventStore
@@ -45,7 +47,7 @@ namespace Squidex.Infrastructure.CQRS.Events
this.storedEvents = storedEvents;
}
- public async Task GetEventsAsync(Func callback, CancellationToken cancellationToken, string streamFilter = null, long lastReceivedEventNumber = -1)
+ public async Task GetEventsAsync(Func callback, CancellationToken cancellationToken, string streamFilter = null, string position = null)
{
foreach (var @event in storedEvents)
{
@@ -53,12 +55,12 @@ namespace Squidex.Infrastructure.CQRS.Events
}
}
- public IObservable GetEventsAsync(string streamFilter = null, long lastReceivedEventNumber = -1)
+ public IObservable GetEventsAsync(string streamFilter = null, string position = null)
{
throw new NotSupportedException();
}
- public Task AppendEventsAsync(Guid commitId, string streamName, int expectedVersion, IEnumerable events)
+ public Task AppendEventsAsync(Guid commitId, string streamName, int expectedVersion, ICollection events)
{
throw new NotSupportedException();
}
@@ -83,9 +85,9 @@ namespace Squidex.Infrastructure.CQRS.Events
{
var events = new[]
{
- new StoredEvent(3, 3, eventData1),
- new StoredEvent(4, 4, eventData2),
- new StoredEvent(5, 5, eventData3)
+ new StoredEvent("3", 3, eventData1),
+ new StoredEvent("4", 4, eventData2),
+ new StoredEvent("5", 5, eventData3)
};
consumerName = eventConsumer.Object.GetType().Name;
@@ -116,7 +118,7 @@ namespace Squidex.Infrastructure.CQRS.Events
[Fact]
public void Should_subscribe_to_consumer_and_handle_events()
{
- consumerInfo.LastHandledEventNumber = 2L;
+ consumerInfo.Position = "2";
sut.Subscribe(eventConsumer.Object);
sut.Next();
@@ -130,7 +132,7 @@ namespace Squidex.Infrastructure.CQRS.Events
[Fact]
public void Should_abort_if_handling_failed()
{
- consumerInfo.LastHandledEventNumber = 2L;
+ consumerInfo.Position = "2";
eventConsumer.Setup(x => x.On(envelope1)).Returns(TaskHelper.True);
eventConsumer.Setup(x => x.On(envelope2)).Throws(new InvalidOperationException());
@@ -149,7 +151,7 @@ namespace Squidex.Infrastructure.CQRS.Events
[Fact]
public void Should_abort_if_serialization_failed()
{
- consumerInfo.LastHandledEventNumber = 2L;
+ consumerInfo.Position = "2";
formatter.Setup(x => x.Parse(eventData2)).Throws(new InvalidOperationException());
@@ -168,7 +170,7 @@ namespace Squidex.Infrastructure.CQRS.Events
public void Should_reset_if_requested()
{
consumerInfo.IsResetting = true;
- consumerInfo.LastHandledEventNumber = 2L;
+ consumerInfo.Position = "2";
sut.Subscribe(eventConsumer.Object);
sut.Next();
diff --git a/tests/Squidex.Infrastructure.Tests/Squidex.Infrastructure.Tests.csproj b/tests/Squidex.Infrastructure.Tests/Squidex.Infrastructure.Tests.csproj
index f03670b95..d552c8f72 100644
--- a/tests/Squidex.Infrastructure.Tests/Squidex.Infrastructure.Tests.csproj
+++ b/tests/Squidex.Infrastructure.Tests/Squidex.Infrastructure.Tests.csproj
@@ -8,11 +8,12 @@
-
+
-
+
+
diff --git a/tests/Squidex.Read.Tests/Squidex.Read.Tests.csproj b/tests/Squidex.Read.Tests/Squidex.Read.Tests.csproj
index 4c7a3cb0a..9cda57a9f 100644
--- a/tests/Squidex.Read.Tests/Squidex.Read.Tests.csproj
+++ b/tests/Squidex.Read.Tests/Squidex.Read.Tests.csproj
@@ -12,10 +12,11 @@
-
+
-
+
+
diff --git a/tests/Squidex.Write.Tests/Squidex.Write.Tests.csproj b/tests/Squidex.Write.Tests/Squidex.Write.Tests.csproj
index 4e86bea48..c4e6ef2e5 100644
--- a/tests/Squidex.Write.Tests/Squidex.Write.Tests.csproj
+++ b/tests/Squidex.Write.Tests/Squidex.Write.Tests.csproj
@@ -11,10 +11,11 @@
-
+
-
+
+
diff --git a/tools/Migrate_01/Migrate_01.csproj b/tools/Migrate_01/Migrate_01.csproj
new file mode 100644
index 000000000..8403c40b3
--- /dev/null
+++ b/tools/Migrate_01/Migrate_01.csproj
@@ -0,0 +1,9 @@
+
+
+ Exe
+ netcoreapp1.1
+
+
+
+
+
diff --git a/tools/Migrate_01/Program.cs b/tools/Migrate_01/Program.cs
new file mode 100644
index 000000000..e0aed2cf0
--- /dev/null
+++ b/tools/Migrate_01/Program.cs
@@ -0,0 +1,94 @@
+// ==========================================================================
+// Program.cs
+// Squidex Headless CMS
+// ==========================================================================
+// Copyright (c) Squidex Group
+// All rights reserved.
+// ==========================================================================
+
+using System;
+using MongoDB.Bson;
+using MongoDB.Driver;
+
+namespace Migrate_01
+{
+ public class Program
+ {
+ public static void Main(string[] args)
+ {
+ Console.WriteLine("Migrate EventStore");
+
+ var mongoClient = new MongoClient(GetMongoConnectionValue());
+ var mongoDatabase = mongoClient.GetDatabase(GetMongoDatabaseValue());
+
+ var collection = mongoDatabase.GetCollection("Events");
+
+ Console.Write("Migrate Indices.....");
+
+ collection.Indexes.DropAll();
+
+ Console.WriteLine("DONE");
+
+ var query =
+ collection.Find(new BsonDocument())
+ .Project(
+ Builders.Projection.Include(Field("EventsOffset")))
+ .ToList();
+
+ Console.Write("Migrate Documents...");
+
+ foreach (var eventCommit in query)
+ {
+ var eventsOffset = (int)eventCommit["EventsOffset"].AsInt64;
+
+ var ts = new BsonTimestamp(eventsOffset + 10, 1);
+
+ collection.UpdateOne(
+ Builders.Filter
+ .Eq(Field("_id"), eventCommit["_id"].AsString),
+ Builders.Update
+ .Set(Field("Timestamp"), ts).Unset(Field("EventsOffset")));
+ }
+
+ Console.WriteLine("DONE");
+ }
+
+ private static StringFieldDefinition Field(string fieldName)
+ {
+ return new StringFieldDefinition(fieldName);
+ }
+
+ private static StringFieldDefinition Field(string fieldName)
+ {
+ return new StringFieldDefinition(fieldName);
+ }
+
+ private static string GetMongoConnectionValue()
+ {
+ Console.Write("Mongo Connection (ENTER for 'mongodb://localhost'): ");
+
+ var mongoConnection = Console.ReadLine();
+
+ if (string.IsNullOrWhiteSpace(mongoConnection))
+ {
+ mongoConnection = "mongodb://localhost";
+ }
+
+ return mongoConnection;
+ }
+
+ private static string GetMongoDatabaseValue()
+ {
+ Console.Write("Mongo Database (ENTER for 'Squidex'): ");
+
+ var mongoDatabase = Console.ReadLine();
+
+ if (string.IsNullOrWhiteSpace(mongoDatabase))
+ {
+ mongoDatabase = "Squidex";
+ }
+
+ return mongoDatabase;
+ }
+ }
+}