Browse Source

Cleaner

pull/309/head
Sebastian 8 years ago
parent
commit
ee7d692892
  1. 15
      src/Squidex.Domain.Apps.Entities.MongoDb/Assets/MongoAssetRepository_SnapshotStore.cs
  2. 5
      src/Squidex.Domain.Apps.Entities.MongoDb/Contents/MongoContentCollection.cs
  3. 9
      src/Squidex.Domain.Apps.Entities.MongoDb/Contents/MongoContentDraftCollection.cs
  4. 8
      src/Squidex.Domain.Apps.Entities.MongoDb/Contents/MongoContentRepository.cs
  5. 5
      src/Squidex.Domain.Apps.Entities.MongoDb/Contents/MongoContentRepository_SnapshotStore.cs
  6. 147
      src/Squidex.Domain.Apps.Entities/Backup/CleanerGrain.cs
  7. 17
      src/Squidex.Domain.Apps.Entities/IAppStorage.cs
  8. 2
      src/Squidex.Domain.Apps.Entities/Rules/Indexes/IRulesByAppIndex.cs
  9. 5
      src/Squidex.Domain.Apps.Entities/Rules/Indexes/RulesByAppIndexGrain.cs
  10. 2
      src/Squidex.Domain.Apps.Entities/Schemas/Indexes/ISchemasByAppIndex.cs
  11. 5
      src/Squidex.Domain.Apps.Entities/Schemas/Indexes/SchemasByAppIndexGrain.cs
  12. 2
      src/Squidex.Domain.Apps.Entities/Tags/ITagGrain.cs
  13. 5
      src/Squidex.Domain.Apps.Entities/Tags/TagGrain.cs
  14. 10
      src/Squidex.Infrastructure.GetEventStore/EventSourcing/GetEventStore.cs
  15. 14
      src/Squidex.Infrastructure.MongoDb/EventSourcing/MongoEventStore_Reader.cs
  16. 10
      src/Squidex.Infrastructure.MongoDb/EventSourcing/MongoEventStore_Writer.cs
  17. 8
      src/Squidex.Infrastructure.MongoDb/States/MongoSnapshotStore.cs
  18. 4
      src/Squidex.Infrastructure/EventSourcing/IEventStore.cs
  19. 2
      src/Squidex.Infrastructure/States/IPersistence{TState}.cs
  20. 2
      src/Squidex.Infrastructure/States/ISnapshotStore.cs
  21. 2
      src/Squidex.Infrastructure/States/IStore.cs
  22. 13
      src/Squidex.Infrastructure/States/Persistence{TSnapshot,TKey}.cs
  23. 5
      src/Squidex.Infrastructure/States/Store.cs

15
src/Squidex.Domain.Apps.Entities.MongoDb/Assets/MongoAssetRepository_SnapshotStore.cs

@ -18,11 +18,6 @@ namespace Squidex.Domain.Apps.Entities.MongoDb.Assets
{
public sealed partial class MongoAssetRepository : ISnapshotStore<AssetState, Guid>
{
Task ISnapshotStore<AssetState, Guid>.ReadAllAsync(Func<AssetState, long, Task> callback)
{
throw new NotSupportedException();
}
public async Task<(AssetState Value, long Version)> ReadAsync(Guid key)
{
using (Profiler.TraceMethod<MongoAssetRepository>())
@ -52,5 +47,15 @@ namespace Squidex.Domain.Apps.Entities.MongoDb.Assets
await Collection.ReplaceOneAsync(x => x.Id == key && x.Version == oldVersion, entity, Upsert);
}
}
Task ISnapshotStore<AssetState, Guid>.ReadAllAsync(Func<AssetState, long, Task> callback)
{
throw new NotSupportedException();
}
Task ISnapshotStore<AssetState, Guid>.RemoveAsync(Guid key)
{
throw new NotSupportedException();
}
}
}

5
src/Squidex.Domain.Apps.Entities.MongoDb/Contents/MongoContentCollection.cs

@ -117,5 +117,10 @@ namespace Squidex.Domain.Apps.Entities.MongoDb.Contents
Filter.AnyNe(x => x.ReferencedIdsDeleted, id)),
Update.AddToSet(x => x.ReferencedIdsDeleted, id));
}
public Task RemoveAsync(Guid id)
{
return Collection.DeleteOneAsync(x => x.Id == id);
}
}
}

9
src/Squidex.Domain.Apps.Entities.MongoDb/Contents/MongoContentDraftCollection.cs

@ -58,6 +58,15 @@ namespace Squidex.Domain.Apps.Entities.MongoDb.Contents
return ids.Except(contentEntities.Select(x => Guid.Parse(x["_id"].AsString))).ToList();
}
public async Task<IReadOnlyList<Guid>> QueryIdsAsync(Guid appId)
{
var contentEntities =
await Collection.Find(x => x.IndexedAppId == appId).Only(x => x.Id)
.ToListAsync();
return contentEntities.Select(x => Guid.Parse(x["_id"].AsString)).ToList();
}
public Task QueryScheduledWithoutDataAsync(Instant now, Func<IContentEntity, Task> callback)
{
return Collection.Find(x => x.ScheduledAt < now && x.IsDeleted != true)

8
src/Squidex.Domain.Apps.Entities.MongoDb/Contents/MongoContentRepository.cs

@ -99,6 +99,14 @@ namespace Squidex.Domain.Apps.Entities.MongoDb.Contents
}
}
public async Task<IReadOnlyList<Guid>> QueryIdsAsync(Guid appId)
{
using (Profiler.TraceMethod<MongoContentRepository>())
{
return await contentsDraft.QueryIdsAsync(appId);
}
}
public async Task QueryScheduledWithoutDataAsync(Instant now, Func<IContentEntity, Task> callback)
{
using (Profiler.TraceMethod<MongoContentRepository>())

5
src/Squidex.Domain.Apps.Entities.MongoDb/Contents/MongoContentRepository_SnapshotStore.cs

@ -83,6 +83,11 @@ namespace Squidex.Domain.Apps.Entities.MongoDb.Contents
return schema;
}
Task ISnapshotStore<ContentState, Guid>.RemoveAsync(Guid key)
{
throw new NotSupportedException();
}
Task ISnapshotStore<ContentState, Guid>.ReadAllAsync(Func<ContentState, long, Task> callback)
{
throw new NotSupportedException();

147
src/Squidex.Domain.Apps.Entities/Backup/CleanerGrain.cs

@ -0,0 +1,147 @@
// ==========================================================================
// Squidex Headless CMS
// ==========================================================================
// Copyright (c) Squidex UG (haftungsbeschraenkt)
// All rights reserved. Licensed under the MIT license.
// ==========================================================================
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using Orleans;
using Orleans.Concurrency;
using Orleans.Runtime;
using Squidex.Domain.Apps.Entities.Apps.State;
using Squidex.Domain.Apps.Entities.Rules;
using Squidex.Domain.Apps.Entities.Rules.State;
using Squidex.Domain.Apps.Entities.Schemas;
using Squidex.Domain.Apps.Entities.Schemas.State;
using Squidex.Infrastructure;
using Squidex.Infrastructure.EventSourcing;
using Squidex.Infrastructure.Orleans;
using Squidex.Infrastructure.States;
namespace Squidex.Domain.Apps.Entities.Backup
{
[Reentrant]
public sealed class CleanerGrain : GrainOfGuid, IRemindable
{
private readonly IGrainFactory grainFactory;
private readonly IStore<Guid> store;
private readonly IEventStore eventStore;
private readonly IEnumerable<IAppStorage> storages;
private IPersistence<State> persistence;
private bool isCleaning;
private State state = new State();
[CollectionName("Index_AppsByName")]
public sealed class State
{
public HashSet<Guid> Apps { get; set; } = new HashSet<Guid>();
public HashSet<Guid> PendingApps { get; set; } = new HashSet<Guid>();
}
public CleanerGrain(IGrainFactory grainFactory, IEventStore eventStore, IStore<Guid> store, IEnumerable<IAppStorage> storages)
{
Guard.NotNull(grainFactory, nameof(grainFactory));
Guard.NotNull(store, nameof(store));
Guard.NotNull(storages, nameof(storages));
Guard.NotNull(eventStore, nameof(eventStore));
this.grainFactory = grainFactory;
this.store = store;
this.storages = storages;
this.eventStore = eventStore;
}
public async override Task OnActivateAsync(Guid key)
{
await RegisterOrUpdateReminder("Default", TimeSpan.Zero, TimeSpan.FromMinutes(10));
persistence = store.WithSnapshots<CleanerGrain, State, Guid>(key, s =>
{
state = s;
});
await persistence.ReadAsync();
await CleanAsync();
}
public Task ReceiveReminder(string reminderName, TickStatus status)
{
return CleanAsync();
}
private async Task CleanAsync()
{
if (isCleaning)
{
return;
}
isCleaning = true;
try
{
foreach (var appId in state.Apps.ToList())
{
try
{
await CleanAsync(appId);
state.Apps.Remove(appId);
}
catch (NotSupportedException)
{
state.Apps.Remove(appId);
state.PendingApps.Add(appId);
}
finally
{
await persistence.WriteSnapshotAsync(state);
}
}
}
finally
{
isCleaning = false;
}
}
private async Task CleanAsync(Guid appId)
{
await eventStore.DeleteManyAsync("AppId", appId);
var ruleIds = await grainFactory.GetGrain<IRulesByAppIndex>(appId).GetRuleIdsAsync();
foreach (var ruleId in ruleIds)
{
await store.ClearSnapshotAsync<RuleState>(ruleId);
}
var schemaIds = await grainFactory.GetGrain<ISchemasByAppIndex>(appId).GetSchemaIdsAsync();
foreach (var schemaId in schemaIds)
{
await store.ClearSnapshotAsync<SchemaState>(schemaId);
}
foreach (var storage in storages)
{
await storage.ClearAsync(appId);
}
await store.ClearSnapshotAsync<AppState>(appId;
}
private async Task DeleteAsync<TState>(Guid id)
{
await store.ClearSnapshotAsync<TState>(id);
}
}
}

17
src/Squidex.Domain.Apps.Entities/IAppStorage.cs

@ -0,0 +1,17 @@
// ==========================================================================
// Squidex Headless CMS
// ==========================================================================
// Copyright (c) Squidex UG (haftungsbeschraenkt)
// All rights reserved. Licensed under the MIT license.
// ==========================================================================
using System;
using System.Threading.Tasks;
namespace Squidex.Domain.Apps.Entities
{
public interface IAppStorage
{
Task ClearAsync(Guid appId);
}
}

2
src/Squidex.Domain.Apps.Entities/Rules/Indexes/IRulesByAppIndex.cs

@ -20,6 +20,8 @@ namespace Squidex.Domain.Apps.Entities.Rules
Task RebuildAsync(HashSet<Guid> rules);
Task ClearAsync();
Task<List<Guid>> GetRuleIdsAsync();
}
}

5
src/Squidex.Domain.Apps.Entities/Rules/Indexes/RulesByAppIndexGrain.cs

@ -69,5 +69,10 @@ namespace Squidex.Domain.Apps.Entities.Rules.Indexes
{
return Task.FromResult(state.Rules.ToList());
}
public Task ClearAsync()
{
return persistence.DeleteAsync();
}
}
}

2
src/Squidex.Domain.Apps.Entities/Schemas/Indexes/ISchemasByAppIndex.cs

@ -18,6 +18,8 @@ namespace Squidex.Domain.Apps.Entities.Schemas
Task RemoveSchemaAsync(Guid schemaId);
Task ClearAsync();
Task RebuildAsync(Dictionary<string, Guid> schemas);
Task<Guid> GetSchemaIdAsync(string name);

5
src/Squidex.Domain.Apps.Entities/Schemas/Indexes/SchemasByAppIndexGrain.cs

@ -76,5 +76,10 @@ namespace Squidex.Domain.Apps.Entities.Schemas.Indexes
{
return Task.FromResult(state.Schemas.Values.ToList());
}
public Task ClearAsync()
{
return persistence.DeleteAsync();
}
}
}

2
src/Squidex.Domain.Apps.Entities/Tags/ITagGrain.cs

@ -20,5 +20,7 @@ namespace Squidex.Domain.Apps.Entities.Tags
Task<Dictionary<string, string>> DenormalizeTagsAsync(HashSet<string> ids);
Task<Dictionary<string, int>> GetTagsAsync();
Task ClearAsync();
}
}

5
src/Squidex.Domain.Apps.Entities/Tags/TagGrain.cs

@ -147,5 +147,10 @@ namespace Squidex.Domain.Apps.Entities.Tags
{
return Task.FromResult(state.Tags.Values.ToDictionary(x => x.Name, x => x.Count));
}
public Task ClearAsync()
{
return persistence.DeleteAsync();
}
}
}

10
src/Squidex.Infrastructure.GetEventStore/EventSourcing/GetEventStore.cs

@ -118,6 +118,11 @@ namespace Squidex.Infrastructure.EventSourcing
}
}
public Task DeleteStreamAsync(string streamName)
{
return connection.DeleteStreamAsync(streamName, ExpectedVersion.Any);
}
public Task AppendAsync(Guid commitId, string streamName, ICollection<EventData> events)
{
return AppendEventsInternalAsync(streamName, EtagVersion.Any, events);
@ -163,6 +168,11 @@ namespace Squidex.Infrastructure.EventSourcing
}
}
public Task DeleteManyAsync(string property, object value)
{
throw new NotSupportedException();
}
private string GetStreamName(string streamName)
{
return $"{prefix}-{streamName}";

14
src/Squidex.Infrastructure.MongoDb/EventSourcing/MongoEventStore_Reader.cs

@ -124,8 +124,8 @@ namespace Squidex.Infrastructure.EventSourcing
{
var filters = new List<FilterDefinition<MongoEventCommit>>();
AddPositionFilter(streamPosition, filters);
AddPropertyFitler(property, value, filters);
FilterByPosition(streamPosition, filters);
FilterByProperty(property, value, filters);
return Filter.And(filters);
}
@ -134,18 +134,18 @@ namespace Squidex.Infrastructure.EventSourcing
{
var filters = new List<FilterDefinition<MongoEventCommit>>();
AddPositionFilter(streamPosition, filters);
AddStreamFilter(streamFilter, filters);
FilterByPosition(streamPosition, filters);
FilterByStream(streamFilter, filters);
return Filter.And(filters);
}
private static void AddPropertyFitler(string property, object value, List<FilterDefinition<MongoEventCommit>> filters)
private static void FilterByProperty(string property, object value, List<FilterDefinition<MongoEventCommit>> filters)
{
filters.Add(Filter.Eq(CreateIndexPath(property), value));
}
private static void AddStreamFilter(string streamFilter, List<FilterDefinition<MongoEventCommit>> filters)
private static void FilterByStream(string streamFilter, List<FilterDefinition<MongoEventCommit>> filters)
{
if (!string.IsNullOrWhiteSpace(streamFilter) && !string.Equals(streamFilter, ".*", StringComparison.OrdinalIgnoreCase))
{
@ -160,7 +160,7 @@ namespace Squidex.Infrastructure.EventSourcing
}
}
private static void AddPositionFilter(StreamPosition streamPosition, List<FilterDefinition<MongoEventCommit>> filters)
private static void FilterByPosition(StreamPosition streamPosition, List<FilterDefinition<MongoEventCommit>> filters)
{
if (streamPosition.IsEndOfCommit)
{

10
src/Squidex.Infrastructure.MongoDb/EventSourcing/MongoEventStore_Writer.cs

@ -20,6 +20,16 @@ namespace Squidex.Infrastructure.EventSourcing
private const int MaxWriteAttempts = 20;
private static readonly BsonTimestamp EmptyTimestamp = new BsonTimestamp(0);
public Task DeleteStreamAsync(string streamName)
{
return Collection.DeleteManyAsync(x => x.EventStream == streamName);
}
public Task DeleteManyAsync(string property, object value)
{
return Collection.DeleteManyAsync(Filter.Eq(CreateIndexPath(property), value));
}
public Task AppendAsync(Guid commitId, string streamName, ICollection<EventData> events)
{
return AppendAsync(commitId, streamName, EtagVersion.Any, events);

8
src/Squidex.Infrastructure.MongoDb/States/MongoSnapshotStore.cs

@ -68,5 +68,13 @@ namespace Squidex.Infrastructure.States
await Collection.Find(new BsonDocument()).ForEachAsync(x => callback(x.Doc, x.Version));
}
}
public async Task RemoveAsync(TKey key)
{
using (Profiler.TraceMethod<MongoSnapshotStore<T, TKey>>())
{
await Collection.DeleteOneAsync(x => x.Id.Equals(key));
}
}
}
}

4
src/Squidex.Infrastructure/EventSourcing/IEventStore.cs

@ -26,6 +26,10 @@ namespace Squidex.Infrastructure.EventSourcing
Task AppendAsync(Guid commitId, string streamName, long expectedVersion, ICollection<EventData> events);
Task DeleteStreamAsync(string streamName);
Task DeleteManyAsync(string property, object value);
IEventSubscription CreateSubscription(IEventSubscriber subscriber, string streamFilter, string position = null);
}
}

2
src/Squidex.Infrastructure/States/IPersistence{TState}.cs

@ -15,6 +15,8 @@ namespace Squidex.Infrastructure.States
{
long Version { get; }
Task DeleteAsync();
Task WriteEventsAsync(IEnumerable<Envelope<IEvent>> @events);
Task WriteSnapshotAsync(TState state);

2
src/Squidex.Infrastructure/States/ISnapshotStore.cs

@ -18,6 +18,8 @@ namespace Squidex.Infrastructure.States
Task ClearAsync();
Task RemoveAsync(TKey key);
Task ReadAllAsync(Func<T, long, Task> callback);
}
}

2
src/Squidex.Infrastructure/States/IStore.cs

@ -22,5 +22,7 @@ namespace Squidex.Infrastructure.States
ISnapshotStore<TState, TKey> GetSnapshotStore<TState>();
Task ClearSnapshotsAsync<TState>();
Task ClearSnapshotAsync<TState>(TKey key);
}
}

13
src/Squidex.Infrastructure/States/Persistence{TSnapshot,TKey}.cs

@ -175,6 +175,19 @@ namespace Squidex.Infrastructure.States
UpdateVersion();
}
public async Task DeleteAsync()
{
if (UseEventSourcing())
{
await eventStore.DeleteStreamAsync(GetStreamName());
}
if (UseSnapshots())
{
await snapshotStore.RemoveAsync(ownerKey);
}
}
private EventData[] GetEventData(Envelope<IEvent>[] events, Guid commitId)
{
return @events.Select(x => eventDataFormatter.ToEventData(x, commitId, true)).ToArray();

5
src/Squidex.Infrastructure/States/Store.cs

@ -63,6 +63,11 @@ namespace Squidex.Infrastructure.States
return GetSnapshotStore<TState>().ClearAsync();
}
public Task ClearSnapshotAsync<TState>(TKey key)
{
return GetSnapshotStore<TState>().RemoveAsync(key);
}
public ISnapshotStore<TState, TKey> GetSnapshotStore<TState>()
{
return (ISnapshotStore<TState, TKey>)services.GetService(typeof(ISnapshotStore<TState, TKey>));

Loading…
Cancel
Save