Browse Source

Stream reader.

pull/311/head
Sebastian 8 years ago
parent
commit
feaf4788f6
  1. 13
      src/Squidex.Domain.Apps.Entities/Backup/BackupGrain.cs
  2. 87
      src/Squidex.Domain.Apps.Entities/Backup/EventStreamReader.cs
  3. 16
      src/Squidex.Domain.Apps.Entities/Backup/EventStreamWriter.cs
  4. 18
      src/Squidex.Domain.Apps.Entities/Backup/RestoreGrain.cs
  5. 85
      tests/Squidex.Domain.Apps.Entities.Tests/Backup/EventStreamTests.cs

13
src/Squidex.Domain.Apps.Entities/Backup/BackupGrain.cs

@ -26,7 +26,7 @@ using Squidex.Infrastructure.States;
namespace Squidex.Domain.Apps.Entities.Backup
{
[Reentrant]
public sealed class BackupGrain : Grain, IBackupGrain
public sealed class BackupGrain : GrainOfGuid, IBackupGrain
{
private const int MaxBackups = 10;
private static readonly Duration UpdateDuration = Duration.FromSeconds(1);
@ -69,16 +69,11 @@ namespace Squidex.Domain.Apps.Entities.Backup
this.log = log;
}
public override Task OnActivateAsync()
public override async Task OnActivateAsync(Guid key)
{
return OnActivateAsync(this.GetPrimaryKey());
}
public async Task OnActivateAsync(Guid appId)
{
this.appId = appId;
appId = key;
persistence = store.WithSnapshots<BackupState, Guid>(GetType(), appId, s => state = s);
persistence = store.WithSnapshots<BackupState, Guid>(GetType(), key, s => state = s);
await ReadAsync();
await CleanupAsync();

87
src/Squidex.Domain.Apps.Entities/Backup/EventStreamReader.cs

@ -0,0 +1,87 @@
// ==========================================================================
// Squidex Headless CMS
// ==========================================================================
// Copyright (c) Squidex UG (haftungsbeschraenkt)
// All rights reserved. Licensed under the MIT license.
// ==========================================================================
using System;
using System.IO;
using System.IO.Compression;
using System.Threading.Tasks;
using Newtonsoft.Json;
using Squidex.Infrastructure;
using Squidex.Infrastructure.EventSourcing;
namespace Squidex.Domain.Apps.Entities.Backup
{
public sealed class EventStreamReader : DisposableObjectBase
{
private const int MaxItemsPerFolder = 1000;
private static readonly JsonSerializer JsonSerializer = JsonSerializer.CreateDefault();
private readonly ZipArchive archive;
public EventStreamReader(Stream stream)
{
archive = new ZipArchive(stream, ZipArchiveMode.Read, true);
}
protected override void DisposeObject(bool disposing)
{
if (disposing)
{
archive.Dispose();
}
}
public async Task ReadEventsAsync(Func<EventData, Stream, Task> eventHandler)
{
Guard.NotNull(eventHandler, nameof(eventHandler));
var readEvents = 0;
var readAttachments = 0;
while (true)
{
var eventFolder = readEvents / MaxItemsPerFolder;
var eventPath = $"events/{eventFolder}/{readEvents}.json";
var eventEntry = archive.GetEntry(eventPath);
if (eventEntry == null)
{
break;
}
EventData eventData;
using (var stream = eventEntry.Open())
{
using (var textReader = new StreamReader(stream))
{
eventData = (EventData)JsonSerializer.Deserialize(textReader, typeof(EventData));
}
}
readEvents++;
var attachmentFolder = readAttachments / MaxItemsPerFolder;
var attachmentPath = $"attachments/{attachmentFolder}/{readEvents}.blob";
var attachmentEntry = archive.GetEntry(attachmentPath);
if (attachmentEntry != null)
{
using (var stream = attachmentEntry.Open())
{
await eventHandler(eventData, stream);
readAttachments++;
}
}
else
{
await eventHandler(eventData, null);
}
}
}
}
}

16
src/Squidex.Domain.Apps.Entities/Backup/EventStreamWriter.cs

@ -28,6 +28,14 @@ namespace Squidex.Domain.Apps.Entities.Backup
archive = new ZipArchive(stream, ZipArchiveMode.Update, true);
}
protected override void DisposeObject(bool disposing)
{
if (disposing)
{
archive.Dispose();
}
}
public async Task WriteEventAsync(EventData eventData, Func<Stream, Task> attachment = null)
{
var eventObject =
@ -67,13 +75,5 @@ namespace Squidex.Domain.Apps.Entities.Backup
writtenAttachments++;
}
}
protected override void DisposeObject(bool disposing)
{
if (disposing)
{
archive.Dispose();
}
}
}
}

18
src/Squidex.Domain.Apps.Entities/Backup/RestoreGrain.cs

@ -0,0 +1,18 @@
// ==========================================================================
// Squidex Headless CMS
// ==========================================================================
// Copyright (c) Squidex UG (haftungsbeschraenkt)
// All rights reserved. Licensed under the MIT license.
// ==========================================================================
using Squidex.Infrastructure.Orleans;
namespace Squidex.Domain.Apps.Entities.Backup
{
public sealed class RestoreGrain : GrainOfString
{
public RestoreGrain()
{
}
}
}

85
tests/Squidex.Domain.Apps.Entities.Tests/Backup/EventStreamTests.cs

@ -0,0 +1,85 @@
// ==========================================================================
// Squidex Headless CMS
// ==========================================================================
// Copyright (c) Squidex UG (haftungsbeschraenkt)
// All rights reserved. Licensed under the MIT license.
// ==========================================================================
using System.Collections.Generic;
using System.IO;
using System.Threading.Tasks;
using FluentAssertions;
using Squidex.Infrastructure.EventSourcing;
using Xunit;
namespace Squidex.Domain.Apps.Entities.Backup
{
public class EventStreamTests
{
public sealed class EventInfo
{
public EventData Data { get; set; }
public byte[] Attachment { get; set; }
}
[Fact]
public async Task Should_write_and_read_events()
{
var stream = new MemoryStream();
var sourceEvents = new List<EventInfo>();
for (var i = 0; i < 1000; i++)
{
var eventData = new EventData { Type = i.ToString(), Metadata = i, Payload = i };
var eventInfo = new EventInfo { Data = eventData };
if (i % 10 == 0)
{
eventInfo.Attachment = new byte[] { (byte)i };
}
sourceEvents.Add(eventInfo);
}
using (var reader = new EventStreamWriter(stream))
{
foreach (var @event in sourceEvents)
{
if (@event.Attachment == null)
{
await reader.WriteEventAsync(@event.Data);
}
else
{
await reader.WriteEventAsync(@event.Data, s => s.WriteAsync(@event.Attachment, 0, 1));
}
}
}
stream.Position = 0;
var readEvents = new List<EventInfo>();
using (var reader = new EventStreamReader(stream))
{
await reader.ReadEventsAsync(async (eventData, attachment) =>
{
var eventInfo = new EventInfo { Data = eventData };
if (attachment != null)
{
eventInfo.Attachment = new byte[1];
await attachment.ReadAsync(eventInfo.Attachment, 0, 1);
}
readEvents.Add(eventInfo);
});
}
readEvents.Should().BeEquivalentTo(sourceEvents);
}
}
}
Loading…
Cancel
Save