Browse Source

Test

pull/95/head
Sebastian Stehle 8 years ago
parent
commit
d90e5efc87
  1. 16
      src/Squidex.Infrastructure.GetEventStore/CQRS/Events/GetEventStore.cs
  2. 88
      src/Squidex.Infrastructure.MongoDb/CQRS/Events/MongoEventStore.cs
  3. 2
      src/Squidex.Infrastructure/CQRS/Events/IEventStore.cs
  4. 4
      tests/Benchmarks/Tests/AppendToEventStoreWithManyWriters.cs
  5. 2
      tests/Benchmarks/Utils/Helper.cs
  6. 5
      tests/Squidex.Infrastructure.Tests/CQRS/Events/EventReceiverTests.cs

16
src/Squidex.Infrastructure.GetEventStore/CQRS/Events/GetEventStore.cs

@ -80,10 +80,22 @@ namespace Squidex.Infrastructure.CQRS.Events
return result;
}
public async Task AppendEventsAsync(Guid commitId, string streamName, int expectedVersion, ICollection<EventData> events)
public Task AppendEventsAsync(Guid commitId, string streamName, ICollection<EventData> events)
{
return AppendEventsInternalAsync(streamName, ExpectedVersion.Any, events);
}
public Task AppendEventsAsync(Guid commitId, string streamName, int expectedVersion, ICollection<EventData> events)
{
Guard.GreaterEquals(expectedVersion, -1, nameof(expectedVersion));
return AppendEventsInternalAsync(streamName, expectedVersion, events);
}
private async Task AppendEventsInternalAsync(string streamName, long expectedVersion, ICollection<EventData> events)
{
Guard.NotNull(events, nameof(events));
Guard.NotNullOrEmpty(streamName, nameof(streamName));
Guard.NotNull(events, nameof(events));
if (events.Count == 0)
{

88
src/Squidex.Infrastructure.MongoDb/CQRS/Events/MongoEventStore.cs

@ -25,6 +25,8 @@ namespace Squidex.Infrastructure.CQRS.Events
{
public class MongoEventStore : MongoRepositoryBase<MongoEventCommit>, IEventStore
{
private const long AnyVersion = long.MinValue;
private const int AppendTimeoutMs = 1000000;
private static readonly BsonTimestamp EmptyTimestamp = new BsonTimestamp(0);
private static readonly FieldDefinition<MongoEventCommit, BsonTimestamp> TimestampField = Fields.Build(x => x.Timestamp);
private static readonly FieldDefinition<MongoEventCommit, long> EventsCountField = Fields.Build(x => x.EventsCount);
@ -107,39 +109,44 @@ namespace Squidex.Infrastructure.CQRS.Events
}, cancellationToken);
}
public async Task AppendEventsAsync(Guid commitId, string streamName, int expectedVersion, ICollection<EventData> events)
public Task AppendEventsAsync(Guid commitId, string streamName, ICollection<EventData> events)
{
return AppendEventsInternalAsync(commitId, streamName, AnyVersion, events);
}
public Task AppendEventsAsync(Guid commitId, string streamName, int expectedVersion, ICollection<EventData> events)
{
Guard.GreaterEquals(expectedVersion, -1, nameof(expectedVersion));
return AppendEventsInternalAsync(commitId, streamName, expectedVersion, events);
}
private async Task AppendEventsInternalAsync(Guid commitId, string streamName, long expectedVersion, ICollection<EventData> events)
{
Guard.NotNullOrEmpty(streamName, nameof(streamName));
Guard.NotNull(events, nameof(events));
var eventsCount = events.Count;
if (eventsCount > 0)
if (events.Count == 0)
{
var commitEvents = new MongoEvent[events.Count];
return;
}
var i = 0;
var currentVersion = await GetEventStreamOffset(streamName);
foreach (var e in events)
if (expectedVersion != AnyVersion && expectedVersion != currentVersion)
{
var mongoEvent = new MongoEvent { EventId = e.EventId, Metadata = e.Metadata, Payload = e.Payload, Type = e.Type };
commitEvents[i++] = mongoEvent;
throw new WrongEventVersionException(currentVersion, expectedVersion);
}
try
var cts = new CancellationTokenSource(AppendTimeoutMs);
var commit = BuildCommit(commitId, streamName, expectedVersion >= -1 ? expectedVersion : currentVersion, events);
while (!cts.IsCancellationRequested)
{
var document = new MongoEventCommit
try
{
Id = commitId,
Events = commitEvents,
EventsCount = eventsCount,
EventStream = streamName,
EventStreamOffset = expectedVersion,
Timestamp = EmptyTimestamp
};
await Collection.InsertOneAsync(document);
await Collection.InsertOneAsync(commit, new InsertOneOptions(), cts.Token);
notifier.NotifyEventsStored();
}
@ -147,10 +154,21 @@ namespace Squidex.Infrastructure.CQRS.Events
{
if (ex.WriteError?.Category == ServerErrorCategory.DuplicateKey)
{
var currentVersion = await GetEventStreamOffset(streamName);
currentVersion = await GetEventStreamOffset(streamName);
if (expectedVersion != AnyVersion)
{
throw new WrongEventVersionException(currentVersion, expectedVersion);
}
else if (!cts.IsCancellationRequested)
{
commit.EventStreamOffset = currentVersion;
}
else
{
throw new TaskCanceledException("Could not acquire a free slot for the commit within the provided time.");
}
}
else
{
throw;
@ -204,5 +222,31 @@ namespace Squidex.Infrastructure.CQRS.Events
return Filter.And(filters);
}
private static MongoEventCommit BuildCommit(Guid commitId, string streamName, long expectedVersion, ICollection<EventData> events)
{
var commitEvents = new MongoEvent[events.Count];
var i = 0;
foreach (var e in events)
{
var mongoEvent = new MongoEvent { EventId = e.EventId, Metadata = e.Metadata, Payload = e.Payload, Type = e.Type };
commitEvents[i++] = mongoEvent;
}
var mongoCommit = new MongoEventCommit
{
Id = commitId,
Events = commitEvents,
EventsCount = events.Count,
EventStream = streamName,
EventStreamOffset = expectedVersion,
Timestamp = EmptyTimestamp
};
return mongoCommit;
}
}
}

2
src/Squidex.Infrastructure/CQRS/Events/IEventStore.cs

@ -16,6 +16,8 @@ namespace Squidex.Infrastructure.CQRS.Events
{
Task<IReadOnlyList<StoredEvent>> GetEventsAsync(string streamName);
Task AppendEventsAsync(Guid commitId, string streamName, ICollection<EventData> events);
Task AppendEventsAsync(Guid commitId, string streamName, int expectedVersion, ICollection<EventData> events);
IEventSubscription CreateSubscription(string streamFilter = null, string position = null);

4
tests/Benchmarks/Tests/AppendToEventStoreWithManyWriters.cs

@ -51,13 +51,11 @@ namespace Benchmarks.Tests
Parallel.For(0, numStreams, streamId =>
{
var eventOffset = -1;
var streamName = streamId.ToString();
for (var commitId = 0; commitId < numCommits; commitId++)
{
eventStore.AppendEventsAsync(Guid.NewGuid(), streamName, eventOffset, new[] { Helper.CreateEventData() }).Wait();
eventOffset++;
eventStore.AppendEventsAsync(Guid.NewGuid(), streamName, new[] { Helper.CreateEventData() }).Wait();
}
});

2
tests/Benchmarks/Utils/Helper.cs

@ -21,7 +21,7 @@ namespace Benchmarks.Utils
public static void Warmup(this IEventStore eventStore)
{
eventStore.AppendEventsAsync(Guid.NewGuid(), "my-stream", -1, new List<EventData> { CreateEventData() }).Wait();
eventStore.AppendEventsAsync(Guid.NewGuid(), "my-stream", new List<EventData> { CreateEventData() }).Wait();
}
}
}

5
tests/Squidex.Infrastructure.Tests/CQRS/Events/EventReceiverTests.cs

@ -88,6 +88,11 @@ namespace Squidex.Infrastructure.CQRS.Events
throw new NotSupportedException();
}
public Task AppendEventsAsync(Guid commitId, string streamName, ICollection<EventData> events)
{
throw new NotSupportedException();
}
public Task AppendEventsAsync(Guid commitId, string streamName, int expectedVersion, ICollection<EventData> events)
{
throw new NotSupportedException();

Loading…
Cancel
Save