Browse Source

Fix double sharding.

pull/1077/head
Sebastian Stehle 2 years ago
parent
commit
329ae5dd2f
  1. 7
      backend/src/Squidex.Domain.Apps.Entities.MongoDb/Assets/MongoShardedAssetRepository.cs
  2. 7
      backend/src/Squidex.Domain.Apps.Entities.MongoDb/Contents/MongoShardedContentRepository.cs
  3. 19
      backend/src/Squidex.Domain.Apps.Entities.MongoDb/ShardedSnapshotStore.cs
  4. 14
      backend/src/Squidex.Domain.Apps.Entities.MongoDb/Text/MongoShardedTextIndex.cs
  5. 24
      backend/src/Squidex.Infrastructure/States/ShardedService.cs
  6. 4
      backend/tests/Squidex.Infrastructure.Tests/States/ShardedServiceTests.cs

7
backend/src/Squidex.Domain.Apps.Entities.MongoDb/Assets/MongoShardedAssetRepository.cs

@ -16,7 +16,7 @@ namespace Squidex.Domain.Apps.Entities.MongoDb.Assets;
public sealed class MongoShardedAssetRepository : ShardedSnapshotStore<MongoAssetRepository, Asset>, IAssetRepository, IDeleter public sealed class MongoShardedAssetRepository : ShardedSnapshotStore<MongoAssetRepository, Asset>, IAssetRepository, IDeleter
{ {
public MongoShardedAssetRepository(IShardingStrategy sharding, Func<string, MongoAssetRepository> factory) public MongoShardedAssetRepository(IShardingStrategy sharding, Func<string, MongoAssetRepository> factory)
: base(sharding, factory) : base(sharding, factory, x => x.AppId.Id)
{ {
} }
@ -82,9 +82,4 @@ public sealed class MongoShardedAssetRepository : ShardedSnapshotStore<MongoAsse
{ {
return Shard(appId).StreamAll(appId, ct); return Shard(appId).StreamAll(appId, ct);
} }
protected override string GetShardKey(Asset state)
{
return GetShardKey(state.AppId.Id);
}
} }

7
backend/src/Squidex.Domain.Apps.Entities.MongoDb/Contents/MongoShardedContentRepository.cs

@ -21,7 +21,7 @@ namespace Squidex.Domain.Apps.Entities.MongoDb.Contents;
public sealed class MongoShardedContentRepository : ShardedSnapshotStore<MongoContentRepository, WriteContent>, IContentRepository, IDeleter public sealed class MongoShardedContentRepository : ShardedSnapshotStore<MongoContentRepository, WriteContent>, IContentRepository, IDeleter
{ {
public MongoShardedContentRepository(IShardingStrategy sharding, Func<string, MongoContentRepository> factory) public MongoShardedContentRepository(IShardingStrategy sharding, Func<string, MongoContentRepository> factory)
: base(sharding, factory) : base(sharding, factory, x => x.AppId.Id)
{ {
} }
@ -90,9 +90,4 @@ public sealed class MongoShardedContentRepository : ShardedSnapshotStore<MongoCo
} }
} }
} }
protected override string GetShardKey(WriteContent state)
{
return GetShardKey(state.AppId.Id);
}
} }

19
backend/src/Squidex.Domain.Apps.Entities.MongoDb/ShardedSnapshotStore.cs

@ -12,19 +12,20 @@ using Squidex.Infrastructure.States;
namespace Squidex.Domain.Apps.Entities.MongoDb; namespace Squidex.Domain.Apps.Entities.MongoDb;
public abstract class ShardedSnapshotStore<T, TState> : ShardedService<T>, ISnapshotStore<TState>, IDeleter where T : ISnapshotStore<TState>, IDeleter public abstract class ShardedSnapshotStore<TStore, TState> : ShardedService<DomainId, TStore>, ISnapshotStore<TState>, IDeleter where TStore : ISnapshotStore<TState>, IDeleter
{ {
protected ShardedSnapshotStore(IShardingStrategy sharding, Func<string, T> factory) private readonly Func<TState, DomainId> getShardKey;
protected ShardedSnapshotStore(IShardingStrategy sharding, Func<string, TStore> factory, Func<TState, DomainId> getShardKey)
: base(sharding, factory) : base(sharding, factory)
{ {
this.getShardKey = getShardKey;
} }
protected abstract string GetShardKey(TState state);
public Task WriteAsync(SnapshotWriteJob<TState> job, public Task WriteAsync(SnapshotWriteJob<TState> job,
CancellationToken ct = default) CancellationToken ct = default)
{ {
var shard = Shard(GetShardKey(job.Value)); var shard = Shard(getShardKey(job.Value));
return shard.WriteAsync(job, ct); return shard.WriteAsync(job, ct);
} }
@ -77,12 +78,10 @@ public abstract class ShardedSnapshotStore<T, TState> : ShardedService<T>, ISnap
public async Task WriteManyAsync(IEnumerable<SnapshotWriteJob<TState>> jobs, public async Task WriteManyAsync(IEnumerable<SnapshotWriteJob<TState>> jobs,
CancellationToken ct = default) CancellationToken ct = default)
{ {
// Some commands might share a shared, therefore we don't group by app id. // Reduce the number of writes by grouping by shard.
foreach (var byShard in jobs.GroupBy(c => GetShardKey(c.Value))) foreach (var byShard in jobs.GroupBy(c => Shard(getShardKey(c.Value))))
{ {
var shard = Shard(byShard.Key); await byShard.Key.WriteManyAsync(byShard.ToArray(), ct);
await shard.WriteManyAsync(byShard.ToArray(), ct);
} }
} }

14
backend/src/Squidex.Domain.Apps.Entities.MongoDb/Text/MongoShardedTextIndex.cs

@ -13,7 +13,7 @@ using Squidex.Infrastructure.States;
namespace Squidex.Domain.Apps.Entities.MongoDb.Text; namespace Squidex.Domain.Apps.Entities.MongoDb.Text;
public sealed class MongoShardedTextIndex<T> : ShardedService<MongoTextIndexBase<T>>, ITextIndex, IDeleter where T : class public sealed class MongoShardedTextIndex<T> : ShardedService<DomainId, MongoTextIndexBase<T>>, ITextIndex, IDeleter where T : class
{ {
public MongoShardedTextIndex(IShardingStrategy sharding, Func<string, MongoTextIndexBase<T>> factory) public MongoShardedTextIndex(IShardingStrategy sharding, Func<string, MongoTextIndexBase<T>> factory)
: base(sharding, factory) : base(sharding, factory)
@ -32,12 +32,10 @@ public sealed class MongoShardedTextIndex<T> : ShardedService<MongoTextIndexBase
public async Task ExecuteAsync(IndexCommand[] commands, public async Task ExecuteAsync(IndexCommand[] commands,
CancellationToken ct = default) CancellationToken ct = default)
{ {
// Some commands might share a shared, therefore we don't group by app id. // Reduce the number of writes by grouping by shard.
foreach (var byShard in commands.GroupBy(c => GetShardKey(c.UniqueContentId.AppId))) foreach (var byShard in commands.GroupBy(c => Shard(c.UniqueContentId.AppId)))
{ {
var shard = Shard(byShard.Key); await byShard.Key.ExecuteAsync(byShard.ToArray(), ct);
await shard.ExecuteAsync(byShard.ToArray(), ct);
} }
} }
@ -56,9 +54,7 @@ public sealed class MongoShardedTextIndex<T> : ShardedService<MongoTextIndexBase
public async Task DeleteAppAsync(App app, public async Task DeleteAppAsync(App app,
CancellationToken ct) CancellationToken ct)
{ {
var shard = Shard(app.Id) as IDeleter; if (Shard(app.Id) is IDeleter shard)
if (shard != null)
{ {
await shard.DeleteAppAsync(app, ct); await shard.DeleteAppAsync(app, ct);
} }

24
backend/src/Squidex.Infrastructure/States/ShardedService.cs

@ -9,15 +9,15 @@ using Squidex.Hosting;
namespace Squidex.Infrastructure.States; namespace Squidex.Infrastructure.States;
public abstract class ShardedService<T> : IInitializable public abstract class ShardedService<TKey, TService> : IInitializable where TKey : notnull
{ {
private readonly Dictionary<string, T> shards = new Dictionary<string, T>(); private readonly Dictionary<string, TService> shards = [];
private readonly IShardingStrategy sharding; private readonly IShardingStrategy sharding;
private readonly Func<string, T> factory; private readonly Func<string, TService> factory;
protected IEnumerable<T> Shards => shards.Values; protected IEnumerable<TService> Shards => shards.Values;
protected ShardedService(IShardingStrategy sharding, Func<string, T> factory) protected ShardedService(IShardingStrategy sharding, Func<string, TService> factory)
{ {
this.sharding = sharding; this.sharding = sharding;
this.factory = factory; this.factory = factory;
@ -51,18 +51,10 @@ public abstract class ShardedService<T> : IInitializable
} }
} }
protected string GetShardKey<TKey>(TKey key) where TKey : notnull protected TService Shard(TKey key)
{ {
return sharding.GetShardKey(key); var shardKey = sharding.GetShardKey(key);
}
protected T Shard<TKey>(TKey key) where TKey : notnull return shards[shardKey];
{
return shards[GetShardKey(key)];
}
protected string GetShardKey(DomainId appId)
{
return sharding.GetShardKey(appId);
} }
} }

4
backend/tests/Squidex.Infrastructure.Tests/States/ShardedServiceTests.cs

@ -19,14 +19,14 @@ public class ShardedServiceTests
{ {
} }
private class TestSut : ShardedService<IInner> private class TestSut : ShardedService<int, IInner>
{ {
public TestSut(IShardingStrategy sharding, Func<string, IInner> factory) public TestSut(IShardingStrategy sharding, Func<string, IInner> factory)
: base(sharding, factory) : base(sharding, factory)
{ {
} }
public IInner ExposeShard<TKey>(TKey key) where TKey : notnull public IInner ExposeShard(int key)
{ {
return Shard(key); return Shard(key);
} }

Loading…
Cancel
Save