Browse Source

Cancellable startup.

pull/322/head
Sebastian 7 years ago
parent
commit
006768b21c
  1. 6
      src/Squidex.Domain.Apps.Entities.MongoDb/Assets/MongoAssetRepository.cs
  2. 13
      src/Squidex.Domain.Apps.Entities.MongoDb/Assets/MongoAssetStatsRepository.cs
  3. 5
      src/Squidex.Domain.Apps.Entities.MongoDb/Contents/MongoContentCollection.cs
  4. 36
      src/Squidex.Domain.Apps.Entities.MongoDb/Contents/MongoContentDraftCollection.cs
  5. 21
      src/Squidex.Domain.Apps.Entities.MongoDb/Contents/MongoContentPublishedCollection.cs
  6. 6
      src/Squidex.Domain.Apps.Entities.MongoDb/Contents/MongoContentRepository.cs
  7. 25
      src/Squidex.Domain.Apps.Entities.MongoDb/History/MongoHistoryEventRepository.cs
  8. 15
      src/Squidex.Domain.Apps.Entities.MongoDb/Rules/MongoRuleEventRepository.cs
  9. 14
      src/Squidex.Domain.Users.MongoDb/Infrastructure/MongoPersistedGrantStore.cs
  10. 4
      src/Squidex.Domain.Users.MongoDb/MongoRoleStore.cs
  11. 18
      src/Squidex.Domain.Users.MongoDb/MongoUserStore.cs
  12. 6
      src/Squidex.Infrastructure.Azure/Assets/AzureBlobAssetStore.cs
  13. 8
      src/Squidex.Infrastructure.GetEventStore/EventSourcing/GetEventStore.cs
  14. 3
      src/Squidex.Infrastructure.GetEventStore/EventSourcing/ProjectionClient.cs
  15. 6
      src/Squidex.Infrastructure.GoogleCloud/Assets/GoogleCloudAssetStore.cs
  16. 4
      src/Squidex.Infrastructure.MongoDb/Assets/MongoGridFsAssetStore.cs
  17. 14
      src/Squidex.Infrastructure.MongoDb/EventSourcing/MongoEventStore.cs
  18. 2
      src/Squidex.Infrastructure.MongoDb/EventSourcing/MongoEventStore_Reader.cs
  19. 26
      src/Squidex.Infrastructure.MongoDb/MongoDb/MongoRepositoryBase.cs
  20. 5
      src/Squidex.Infrastructure.MongoDb/UsageTracking/MongoUsageStore.cs
  21. 5
      src/Squidex.Infrastructure.RabbitMq/CQRS/Events/RabbitMqEventConsumer.cs
  22. 7
      src/Squidex.Infrastructure.Redis/RedisPubSub.cs
  23. 14
      src/Squidex.Infrastructure/Assets/FolderAssetStore.cs
  24. 5
      src/Squidex.Infrastructure/IInitializable.cs
  25. 14
      src/Squidex.Infrastructure/IRunnable.cs
  26. 34
      src/Squidex.Infrastructure/Orleans/InitializerStartup.cs
  27. 47
      src/Squidex/Config/Domain/SystemExtensions.cs
  28. 6
      src/Squidex/Config/Orleans/OrleansServices.cs
  29. 24
      src/Squidex/Config/Orleans/SiloWrapper.cs
  30. 6
      src/Squidex/WebStartup.cs
  31. 2
      tests/Squidex.Infrastructure.Tests/Assets/AssetStoreTests.cs
  32. 2
      tests/Squidex.Infrastructure.Tests/Assets/FolderAssetStoreTests.cs

6
src/Squidex.Domain.Apps.Entities.MongoDb/Assets/MongoAssetRepository.cs

@ -8,6 +8,7 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using MongoDB.Driver;
using Squidex.Domain.Apps.Entities.Assets;
@ -32,7 +33,7 @@ namespace Squidex.Domain.Apps.Entities.MongoDb.Assets
return "States_Assets";
}
protected override Task SetupCollectionAsync(IMongoCollection<MongoAssetEntity> collection)
protected override Task SetupCollectionAsync(IMongoCollection<MongoAssetEntity> collection, CancellationToken ct = default(CancellationToken))
{
return collection.Indexes.CreateOneAsync(
new CreateIndexModel<MongoAssetEntity>(
@ -41,7 +42,8 @@ namespace Squidex.Domain.Apps.Entities.MongoDb.Assets
.Ascending(x => x.IsDeleted)
.Ascending(x => x.FileName)
.Ascending(x => x.Tags)
.Descending(x => x.LastModified)));
.Descending(x => x.LastModified)),
cancellationToken: ct);
}
public async Task<IResultList<IAssetEntity>> QueryAsync(Guid appId, Query query)

13
src/Squidex.Domain.Apps.Entities.MongoDb/Assets/MongoAssetStatsRepository.cs

@ -8,6 +8,7 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using MongoDB.Driver;
using Squidex.Domain.Apps.Entities.Assets;
@ -30,12 +31,14 @@ namespace Squidex.Domain.Apps.Entities.MongoDb.Assets
return "Projections_AssetStats";
}
protected override async Task SetupCollectionAsync(IMongoCollection<MongoAssetStatsEntity> collection)
protected override Task SetupCollectionAsync(IMongoCollection<MongoAssetStatsEntity> collection, CancellationToken ct = default(CancellationToken))
{
await collection.Indexes.CreateOneAsync(
new CreateIndexModel<MongoAssetStatsEntity>(Index.Ascending(x => x.AssetId).Ascending(x => x.Date)));
await collection.Indexes.CreateOneAsync(
new CreateIndexModel<MongoAssetStatsEntity>(Index.Ascending(x => x.AssetId).Descending(x => x.Date)));
return collection.Indexes.CreateManyAsync(
new[]
{
new CreateIndexModel<MongoAssetStatsEntity>(Index.Ascending(x => x.AssetId).Ascending(x => x.Date)),
new CreateIndexModel<MongoAssetStatsEntity>(Index.Ascending(x => x.AssetId).Descending(x => x.Date))
}, ct);
}
public async Task<IReadOnlyList<IAssetStatsEntity>> QueryAsync(Guid appId, DateTime fromDate, DateTime toDate)

5
src/Squidex.Domain.Apps.Entities.MongoDb/Contents/MongoContentCollection.cs

@ -8,6 +8,7 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using MongoDB.Driver;
using Squidex.Domain.Apps.Core.Contents;
@ -31,10 +32,10 @@ namespace Squidex.Domain.Apps.Entities.MongoDb.Contents
this.collectionName = collectionName;
}
protected override async Task SetupCollectionAsync(IMongoCollection<MongoContentEntity> collection)
protected override async Task SetupCollectionAsync(IMongoCollection<MongoContentEntity> collection, CancellationToken ct = default(CancellationToken))
{
await collection.Indexes.CreateOneAsync(
new CreateIndexModel<MongoContentEntity>(Index.Ascending(x => x.ReferencedIds)));
new CreateIndexModel<MongoContentEntity>(Index.Ascending(x => x.ReferencedIds)), cancellationToken: ct);
}
protected override string CollectionName()

36
src/Squidex.Domain.Apps.Entities.MongoDb/Contents/MongoContentDraftCollection.cs

@ -8,6 +8,7 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using MongoDB.Driver;
using NodaTime;
@ -29,24 +30,25 @@ namespace Squidex.Domain.Apps.Entities.MongoDb.Contents
{
}
protected override async Task SetupCollectionAsync(IMongoCollection<MongoContentEntity> collection)
protected override async Task SetupCollectionAsync(IMongoCollection<MongoContentEntity> collection, CancellationToken ct = default(CancellationToken))
{
await collection.Indexes.CreateOneAsync(
new CreateIndexModel<MongoContentEntity>(
Index
.Ascending(x => x.IndexedSchemaId)
.Ascending(x => x.Id)
.Ascending(x => x.IsDeleted)));
await collection.Indexes.CreateOneAsync(
new CreateIndexModel<MongoContentEntity>(
Index
.Text(x => x.DataText)
.Ascending(x => x.IndexedSchemaId)
.Ascending(x => x.IsDeleted)
.Ascending(x => x.Status)));
await base.SetupCollectionAsync(collection);
await collection.Indexes.CreateManyAsync(
new[]
{
new CreateIndexModel<MongoContentEntity>(
Index
.Ascending(x => x.IndexedSchemaId)
.Ascending(x => x.Id)
.Ascending(x => x.IsDeleted)),
new CreateIndexModel<MongoContentEntity>(
Index
.Text(x => x.DataText)
.Ascending(x => x.IndexedSchemaId)
.Ascending(x => x.IsDeleted)
.Ascending(x => x.Status)),
}, ct);
await base.SetupCollectionAsync(collection, ct);
}
public async Task<IReadOnlyList<Guid>> QueryNotFoundAsync(Guid appId, Guid schemaId, IList<Guid> ids)

21
src/Squidex.Domain.Apps.Entities.MongoDb/Contents/MongoContentPublishedCollection.cs

@ -6,6 +6,7 @@
// ==========================================================================
using System;
using System.Threading;
using System.Threading.Tasks;
using MongoDB.Driver;
using Squidex.Domain.Apps.Entities.Apps;
@ -22,18 +23,16 @@ namespace Squidex.Domain.Apps.Entities.MongoDb.Contents
{
}
protected override async Task SetupCollectionAsync(IMongoCollection<MongoContentEntity> collection)
protected override async Task SetupCollectionAsync(IMongoCollection<MongoContentEntity> collection, CancellationToken ct = default(CancellationToken))
{
await collection.Indexes.CreateOneAsync(
new CreateIndexModel<MongoContentEntity>(Index.Text(x => x.DataText).Ascending(x => x.IndexedSchemaId)));
await collection.Indexes.CreateOneAsync(
new CreateIndexModel<MongoContentEntity>(
Index
.Ascending(x => x.IndexedSchemaId)
.Ascending(x => x.Id)));
await base.SetupCollectionAsync(collection);
await collection.Indexes.CreateManyAsync(
new[]
{
new CreateIndexModel<MongoContentEntity>(Index.Text(x => x.DataText).Ascending(x => x.IndexedSchemaId)),
new CreateIndexModel<MongoContentEntity>(Index.Ascending(x => x.IndexedSchemaId).Ascending(x => x.Id))
}, ct);
await base.SetupCollectionAsync(collection, ct);
}
public async Task<IContentEntity> FindContentAsync(IAppEntity app, ISchemaEntity schema, Guid id)

6
src/Squidex.Domain.Apps.Entities.MongoDb/Contents/MongoContentRepository.cs

@ -7,6 +7,7 @@
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using MongoDB.Driver;
using NodaTime;
@ -40,10 +41,9 @@ namespace Squidex.Domain.Apps.Entities.MongoDb.Contents
this.database = database;
}
public void Initialize()
public Task InitializeAsync(CancellationToken ct = default(CancellationToken))
{
contentsDraft.Initialize();
contentsPublished.Initialize();
return Task.WhenAll(contentsDraft.InitializeAsync(ct), contentsPublished.InitializeAsync(ct));
}
public async Task<IResultList<IContentEntity>> QueryAsync(IAppEntity app, ISchemaEntity schema, Status[] status, Query query)

25
src/Squidex.Domain.Apps.Entities.MongoDb/History/MongoHistoryEventRepository.cs

@ -8,6 +8,7 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using MongoDB.Driver;
using Squidex.Domain.Apps.Entities.History;
@ -52,18 +53,20 @@ namespace Squidex.Domain.Apps.Entities.MongoDb.History
return "Projections_History";
}
protected override async Task SetupCollectionAsync(IMongoCollection<MongoHistoryEventEntity> collection)
protected override Task SetupCollectionAsync(IMongoCollection<MongoHistoryEventEntity> collection, CancellationToken ct = default(CancellationToken))
{
await collection.Indexes.CreateOneAsync(
new CreateIndexModel<MongoHistoryEventEntity>(
Index
.Ascending(x => x.AppId)
.Ascending(x => x.Channel)
.Descending(x => x.Created)
.Descending(x => x.Version)));
await collection.Indexes.CreateOneAsync(
new CreateIndexModel<MongoHistoryEventEntity>(Index.Ascending(x => x.Created), new CreateIndexOptions { ExpireAfter = TimeSpan.FromDays(365) }));
return collection.Indexes.CreateManyAsync(
new[]
{
new CreateIndexModel<MongoHistoryEventEntity>(
Index
.Ascending(x => x.AppId)
.Ascending(x => x.Channel)
.Descending(x => x.Created)
.Descending(x => x.Version)),
new CreateIndexModel<MongoHistoryEventEntity>(Index.Ascending(x => x.Created),
new CreateIndexOptions { ExpireAfter = TimeSpan.FromDays(365) })
}, ct);
}
public async Task<IReadOnlyList<IHistoryEventEntity>> QueryByChannelAsync(Guid appId, string channelPrefix, int count)

15
src/Squidex.Domain.Apps.Entities.MongoDb/Rules/MongoRuleEventRepository.cs

@ -32,14 +32,15 @@ namespace Squidex.Domain.Apps.Entities.MongoDb.Rules
return "RuleEvents";
}
protected override async Task SetupCollectionAsync(IMongoCollection<MongoRuleEventEntity> collection)
protected override async Task SetupCollectionAsync(IMongoCollection<MongoRuleEventEntity> collection, CancellationToken ct = default(CancellationToken))
{
await collection.Indexes.CreateOneAsync(
new CreateIndexModel<MongoRuleEventEntity>(Index.Ascending(x => x.NextAttempt)));
await collection.Indexes.CreateOneAsync(
new CreateIndexModel<MongoRuleEventEntity>(Index.Ascending(x => x.AppId).Descending(x => x.Created)));
await collection.Indexes.CreateOneAsync(
new CreateIndexModel<MongoRuleEventEntity>(Index.Ascending(x => x.Expires), new CreateIndexOptions { ExpireAfter = TimeSpan.Zero }));
await collection.Indexes.CreateManyAsync(
new[]
{
new CreateIndexModel<MongoRuleEventEntity>(Index.Ascending(x => x.NextAttempt)),
new CreateIndexModel<MongoRuleEventEntity>(Index.Ascending(x => x.AppId).Descending(x => x.Created)),
new CreateIndexModel<MongoRuleEventEntity>(Index.Ascending(x => x.Expires), new CreateIndexOptions { ExpireAfter = TimeSpan.Zero })
}, ct);
}
public Task QueryPendingAsync(Instant now, Func<IRuleEventEntity, Task> callback, CancellationToken ct = default(CancellationToken))

14
src/Squidex.Domain.Users.MongoDb/Infrastructure/MongoPersistedGrantStore.cs

@ -6,6 +6,7 @@
// ==========================================================================
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using IdentityServer4.Models;
using IdentityServer4.Stores;
@ -36,13 +37,14 @@ namespace Squidex.Domain.Users.MongoDb.Infrastructure
return "Identity_PersistedGrants";
}
protected override Task SetupCollectionAsync(IMongoCollection<PersistedGrant> collection)
protected override Task SetupCollectionAsync(IMongoCollection<PersistedGrant> collection, CancellationToken ct = default(CancellationToken))
{
return Task.WhenAll(
collection.Indexes.CreateOneAsync(
new CreateIndexModel<PersistedGrant>(Index.Ascending(x => x.ClientId))),
collection.Indexes.CreateOneAsync(
new CreateIndexModel<PersistedGrant>(Index.Ascending(x => x.SubjectId))));
return collection.Indexes.CreateManyAsync(
new[]
{
new CreateIndexModel<PersistedGrant>(Index.Ascending(x => x.ClientId)),
new CreateIndexModel<PersistedGrant>(Index.Ascending(x => x.SubjectId))
}, ct);
}
public Task StoreAsync(PersistedGrant grant)

4
src/Squidex.Domain.Users.MongoDb/MongoRoleStore.cs

@ -26,10 +26,10 @@ namespace Squidex.Domain.Users.MongoDb
return "Identity_Roles";
}
protected override Task SetupCollectionAsync(IMongoCollection<MongoRole> collection)
protected override Task SetupCollectionAsync(IMongoCollection<MongoRole> collection, CancellationToken ct = default(CancellationToken))
{
return collection.Indexes.CreateOneAsync(
new CreateIndexModel<MongoRole>(Index.Ascending(x => x.NormalizedName), new CreateIndexOptions { Unique = true }));
new CreateIndexModel<MongoRole>(Index.Ascending(x => x.NormalizedName), new CreateIndexOptions { Unique = true }), cancellationToken: ct);
}
protected override MongoCollectionSettings CollectionSettings()

18
src/Squidex.Domain.Users.MongoDb/MongoUserStore.cs

@ -46,15 +46,15 @@ namespace Squidex.Domain.Users.MongoDb
return "Identity_Users";
}
protected override Task SetupCollectionAsync(IMongoCollection<MongoUser> collection)
{
return Task.WhenAll(
collection.Indexes.CreateOneAsync(
new CreateIndexModel<MongoUser>(Index.Ascending("Logins.LoginProvider").Ascending("Logins.ProviderKey"))),
collection.Indexes.CreateOneAsync(
new CreateIndexModel<MongoUser>(Index.Ascending(x => x.NormalizedUserName), new CreateIndexOptions { Unique = true })),
collection.Indexes.CreateOneAsync(
new CreateIndexModel<MongoUser>(Index.Ascending(x => x.NormalizedEmail), new CreateIndexOptions { Unique = true })));
protected override Task SetupCollectionAsync(IMongoCollection<MongoUser> collection, CancellationToken ct = default(CancellationToken))
{
return collection.Indexes.CreateManyAsync(
new[]
{
new CreateIndexModel<MongoUser>(Index.Ascending("Logins.LoginProvider").Ascending("Logins.ProviderKey")),
new CreateIndexModel<MongoUser>(Index.Ascending(x => x.NormalizedUserName), new CreateIndexOptions { Unique = true }),
new CreateIndexModel<MongoUser>(Index.Ascending(x => x.NormalizedEmail), new CreateIndexOptions { Unique = true })
}, ct);
}
protected override MongoCollectionSettings CollectionSettings()

6
src/Squidex.Infrastructure.Azure/Assets/AzureBlobAssetStore.cs

@ -29,7 +29,7 @@ namespace Squidex.Infrastructure.Assets
this.containerName = containerName;
}
public void Initialize()
public async Task InitializeAsync(CancellationToken ct = default(CancellationToken))
{
try
{
@ -38,7 +38,7 @@ namespace Squidex.Infrastructure.Assets
var blobClient = storageAccount.CreateCloudBlobClient();
var blobReference = blobClient.GetContainerReference(containerName);
blobReference.CreateIfNotExistsAsync().Wait();
await blobReference.CreateIfNotExistsAsync();
blobContainer = blobReference;
}
@ -130,7 +130,7 @@ namespace Squidex.Infrastructure.Assets
return blob.DeleteIfExistsAsync();
}
private async Task UploadCoreAsync(string blobName, Stream stream, CancellationToken ct)
private async Task UploadCoreAsync(string blobName, Stream stream, CancellationToken ct = default(CancellationToken))
{
try
{

8
src/Squidex.Infrastructure.GetEventStore/EventSourcing/GetEventStore.cs

@ -34,18 +34,18 @@ namespace Squidex.Infrastructure.EventSourcing
projectionClient = new ProjectionClient(connection, prefix, projectionHost);
}
public void Initialize()
public async Task InitializeAsync(CancellationToken ct = default(CancellationToken))
{
try
{
connection.ConnectAsync().Wait();
await connection.ConnectAsync();
}
catch (Exception ex)
{
throw new ConfigurationException("Cannot connect to event store.", ex);
}
projectionClient.ConnectAsync().Wait();
await projectionClient.ConnectAsync();
}
public IEventSubscription CreateSubscription(IEventSubscriber subscriber, string streamFilter, string position = null)
@ -82,7 +82,7 @@ namespace Squidex.Infrastructure.EventSourcing
}
}
private async Task QueryAsync(Func<StoredEvent, Task> callback, string streamName, long sliceStart, CancellationToken ct)
private async Task QueryAsync(Func<StoredEvent, Task> callback, string streamName, long sliceStart, CancellationToken ct = default(CancellationToken))
{
StreamEventsSlice currentSlice;
do

3
src/Squidex.Infrastructure.GetEventStore/EventSourcing/ProjectionClient.cs

@ -10,6 +10,7 @@ using System.Collections.Concurrent;
using System.Linq;
using System.Net;
using System.Net.Sockets;
using System.Threading;
using System.Threading.Tasks;
using EventStore.ClientAPI;
using EventStore.ClientAPI.Exceptions;
@ -103,7 +104,7 @@ namespace Squidex.Infrastructure.EventSourcing
}
}
public async Task ConnectAsync()
public async Task ConnectAsync(CancellationToken ct = default(CancellationToken))
{
var addressParts = projectionHost.Split(':');

6
src/Squidex.Infrastructure.GoogleCloud/Assets/GoogleCloudAssetStore.cs

@ -30,13 +30,13 @@ namespace Squidex.Infrastructure.Assets
this.bucketName = bucketName;
}
public void Initialize()
public async Task InitializeAsync(CancellationToken ct = default(CancellationToken))
{
try
{
storageClient = StorageClient.Create();
storageClient.GetBucket(bucketName);
await storageClient.GetBucketAsync(bucketName, cancellationToken: ct);
}
catch (Exception ex)
{
@ -103,7 +103,7 @@ namespace Squidex.Infrastructure.Assets
return DeleteCoreAsync(fileName);
}
private async Task UploadCoreAsync(string objectName, Stream stream, CancellationToken ct)
private async Task UploadCoreAsync(string objectName, Stream stream, CancellationToken ct = default(CancellationToken))
{
try
{

4
src/Squidex.Infrastructure.MongoDb/Assets/MongoGridFsAssetStore.cs

@ -27,11 +27,11 @@ namespace Squidex.Infrastructure.Assets
this.bucket = bucket;
}
public void Initialize()
public async Task InitializeAsync(CancellationToken ct = default(CancellationToken))
{
try
{
bucket.Database.ListCollections();
await bucket.Database.ListCollectionsAsync(cancellationToken: ct);
}
catch (MongoException ex)
{

14
src/Squidex.Infrastructure.MongoDb/EventSourcing/MongoEventStore.cs

@ -5,6 +5,7 @@
// All rights reserved. Licensed under the MIT license.
// ==========================================================================
using System.Threading;
using System.Threading.Tasks;
using MongoDB.Bson;
using MongoDB.Driver;
@ -43,13 +44,14 @@ namespace Squidex.Infrastructure.EventSourcing
return new MongoCollectionSettings { ReadPreference = ReadPreference.Primary, WriteConcern = WriteConcern.WMajority };
}
protected override Task SetupCollectionAsync(IMongoCollection<MongoEventCommit> collection)
protected override Task SetupCollectionAsync(IMongoCollection<MongoEventCommit> collection, CancellationToken ct = default(CancellationToken))
{
return Task.WhenAll(
collection.Indexes.CreateOneAsync(
new CreateIndexModel<MongoEventCommit>(Index.Ascending(x => x.Timestamp).Ascending(x => x.EventStream))),
collection.Indexes.CreateOneAsync(
new CreateIndexModel<MongoEventCommit>(Index.Ascending(x => x.EventStream).Descending(x => x.EventStreamOffset), new CreateIndexOptions { Unique = true })));
return collection.Indexes.CreateManyAsync(
new[]
{
new CreateIndexModel<MongoEventCommit>(Index.Ascending(x => x.Timestamp).Ascending(x => x.EventStream)),
new CreateIndexModel<MongoEventCommit>(Index.Ascending(x => x.EventStream).Descending(x => x.EventStreamOffset), new CreateIndexOptions { Unique = true })
}, ct);
}
}
}

2
src/Squidex.Infrastructure.MongoDb/EventSourcing/MongoEventStore_Reader.cs

@ -91,7 +91,7 @@ namespace Squidex.Infrastructure.EventSourcing
return QueryAsync(callback, lastPosition, filter, ct);
}
private async Task QueryAsync(Func<StoredEvent, Task> callback, StreamPosition lastPosition, FilterDefinition<MongoEventCommit> filter, CancellationToken ct)
private async Task QueryAsync(Func<StoredEvent, Task> callback, StreamPosition lastPosition, FilterDefinition<MongoEventCommit> filter, CancellationToken ct = default(CancellationToken))
{
using (Profiler.TraceMethod<MongoEventStore>())
{

26
src/Squidex.Infrastructure.MongoDb/MongoDb/MongoRepositoryBase.cs

@ -7,6 +7,7 @@
using System;
using System.Globalization;
using System.Threading;
using System.Threading.Tasks;
using MongoDB.Driver;
using Squidex.Infrastructure.Tasks;
@ -67,21 +68,12 @@ namespace Squidex.Infrastructure.MongoDb
private Lazy<IMongoCollection<TEntity>> CreateCollection()
{
return new Lazy<IMongoCollection<TEntity>>(() =>
{
return Task.Run(async () =>
{
var databaseCollection = mongoDatabase.GetCollection<TEntity>(
CollectionName(),
CollectionSettings() ?? new MongoCollectionSettings());
await SetupCollectionAsync(databaseCollection).ConfigureAwait(false);
return databaseCollection;
}).Result;
});
mongoDatabase.GetCollection<TEntity>(
CollectionName(),
CollectionSettings() ?? new MongoCollectionSettings()));
}
protected virtual Task SetupCollectionAsync(IMongoCollection<TEntity> collection)
protected virtual Task SetupCollectionAsync(IMongoCollection<TEntity> collection, CancellationToken ct = default(CancellationToken))
{
return TaskHelper.Done;
}
@ -93,7 +85,7 @@ namespace Squidex.Infrastructure.MongoDb
await SetupCollectionAsync(Collection);
}
public async Task<bool> DropCollectionIfExistsAsync()
public async Task<bool> DropCollectionIfExistsAsync(CancellationToken ct = default(CancellationToken))
{
try
{
@ -101,6 +93,8 @@ namespace Squidex.Infrastructure.MongoDb
mongoCollection = CreateCollection();
await SetupCollectionAsync(Collection, ct);
return true;
}
catch
@ -109,11 +103,11 @@ namespace Squidex.Infrastructure.MongoDb
}
}
public void Initialize()
public async Task InitializeAsync(CancellationToken ct = default(CancellationToken))
{
try
{
Database.ListCollections();
await SetupCollectionAsync(Collection, ct);
}
catch (Exception ex)
{

5
src/Squidex.Infrastructure.MongoDb/UsageTracking/MongoUsageStore.cs

@ -8,6 +8,7 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using MongoDB.Driver;
using Squidex.Infrastructure.MongoDb;
@ -26,10 +27,10 @@ namespace Squidex.Infrastructure.UsageTracking
return "Usages";
}
protected override Task SetupCollectionAsync(IMongoCollection<MongoUsage> collection)
protected override Task SetupCollectionAsync(IMongoCollection<MongoUsage> collection, CancellationToken ct = default(CancellationToken))
{
return collection.Indexes.CreateOneAsync(
new CreateIndexModel<MongoUsage>(Index.Ascending(x => x.Key).Ascending(x => x.Category).Ascending(x => x.Date)));
new CreateIndexModel<MongoUsage>(Index.Ascending(x => x.Key).Ascending(x => x.Category).Ascending(x => x.Date)), cancellationToken: ct);
}
public Task TrackUsagesAsync(DateTime date, string key, string category, double count, double elapsedMs)

5
src/Squidex.Infrastructure.RabbitMq/CQRS/Events/RabbitMqEventConsumer.cs

@ -7,6 +7,7 @@
using System;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Newtonsoft.Json;
using RabbitMQ.Client;
@ -61,7 +62,7 @@ namespace Squidex.Infrastructure.CQRS.Events
}
}
public void Initialize()
public Task InitializeAsync(CancellationToken ct = default(CancellationToken))
{
try
{
@ -71,6 +72,8 @@ namespace Squidex.Infrastructure.CQRS.Events
{
throw new ConfigurationException($"RabbitMq event bus failed to connect to {connectionFactory.Endpoint}");
}
return TaskHelper.Done;
}
catch (Exception e)
{

7
src/Squidex.Infrastructure.Redis/RedisPubSub.cs

@ -7,7 +7,10 @@
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;
using Squidex.Infrastructure.Log;
using Squidex.Infrastructure.Tasks;
using StackExchange.Redis;
namespace Squidex.Infrastructure
@ -30,11 +33,13 @@ namespace Squidex.Infrastructure
redisSubscriber = new Lazy<ISubscriber>(() => redis.Value.GetSubscriber());
}
public void Initialize()
public Task InitializeAsync(CancellationToken ct = default(CancellationToken))
{
try
{
redisClient.Value.GetStatus();
return TaskHelper.Done;
}
catch (Exception ex)
{

14
src/Squidex.Infrastructure/Assets/FolderAssetStore.cs

@ -5,6 +5,7 @@
// All rights reserved. Licensed under the MIT license.
// ==========================================================================
using System;
using System.IO;
using System.Linq;
using System.Threading;
@ -30,7 +31,7 @@ namespace Squidex.Infrastructure.Assets
directory = new DirectoryInfo(path);
}
public void Initialize()
public Task InitializeAsync(CancellationToken ct = default(CancellationToken))
{
try
{
@ -42,13 +43,12 @@ namespace Squidex.Infrastructure.Assets
log.LogInformation(w => w
.WriteProperty("action", "FolderAssetStoreConfigured")
.WriteProperty("path", directory.FullName));
return TaskHelper.Done;
}
catch
catch (Exception ex)
{
if (!directory.Exists)
{
throw new ConfigurationException($"Cannot access directory {directory.FullName}");
}
throw new ConfigurationException($"Cannot access directory {directory.FullName}", ex);
}
}
@ -125,7 +125,7 @@ namespace Squidex.Infrastructure.Assets
return TaskHelper.Done;
}
private static async Task UploadCoreAsync(FileInfo file, Stream stream, CancellationToken ct)
private static async Task UploadCoreAsync(FileInfo file, Stream stream, CancellationToken ct = default(CancellationToken))
{
try
{

5
src/Squidex.Infrastructure/IInitializable.cs

@ -5,10 +5,13 @@
// All rights reserved. Licensed under the MIT license.
// ==========================================================================
using System.Threading;
using System.Threading.Tasks;
namespace Squidex.Infrastructure
{
public interface IInitializable
{
void Initialize();
Task InitializeAsync(CancellationToken ct = default(CancellationToken));
}
}

14
src/Squidex.Infrastructure/IRunnable.cs

@ -1,14 +0,0 @@
// ==========================================================================
// Squidex Headless CMS
// ==========================================================================
// Copyright (c) Squidex UG (haftungsbeschränkt)
// All rights reserved. Licensed under the MIT license.
// ==========================================================================
namespace Squidex.Infrastructure
{
public interface IRunnable
{
void Run();
}
}

34
src/Squidex.Infrastructure/Orleans/InitializerStartup.cs

@ -0,0 +1,34 @@
// ==========================================================================
// Squidex Headless CMS
// ==========================================================================
// Copyright (c) Squidex UG (haftungsbeschraenkt)
// All rights reserved. Licensed under the MIT license.
// ==========================================================================
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using Orleans.Runtime;
namespace Squidex.Infrastructure.Orleans
{
public sealed class InitializerStartup : IStartupTask
{
private readonly IEnumerable<IInitializable> initializables;
public InitializerStartup(IEnumerable<IInitializable> initializables)
{
Guard.NotNull(initializables, nameof(initializables));
this.initializables = initializables;
}
public async Task Execute(CancellationToken cancellationToken)
{
foreach (var initializable in initializables)
{
await initializable.InitializeAsync(cancellationToken);
}
}
}
}

47
src/Squidex/Config/Domain/SystemExtensions.cs

@ -5,9 +5,10 @@
// All rights reserved. Licensed under the MIT license.
// ==========================================================================
using System;
using System.Collections.Generic;
using Microsoft.Extensions.DependencyInjection;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.Hosting;
using Squidex.Infrastructure;
using Squidex.Infrastructure.Migrations;
@ -15,31 +16,47 @@ namespace Squidex.Config.Domain
{
public static class SystemExtensions
{
public static void RunInitialization(this IServiceProvider services)
public sealed class InitializeHostedService : IHostedService
{
var systems = services.GetRequiredService<IEnumerable<IInitializable>>();
private readonly IEnumerable<IInitializable> targets;
foreach (var system in systems)
public InitializeHostedService(IEnumerable<IInitializable> targets)
{
system.Initialize();
this.targets = targets;
}
}
public static void RunRunnables(this IServiceProvider services)
{
var systems = services.GetRequiredService<IEnumerable<IRunnable>>();
public async Task StartAsync(CancellationToken cancellationToken)
{
foreach (var target in targets)
{
await target.InitializeAsync(cancellationToken);
}
}
foreach (var system in systems)
public Task StopAsync(CancellationToken cancellationToken)
{
system.Run();
return Task.CompletedTask;
}
}
public static void RunMigrate(this IServiceProvider services)
public sealed class MigratorHostedService : IHostedService
{
var migrator = services.GetRequiredService<Migrator>();
private readonly Migrator migrator;
public MigratorHostedService(Migrator migrator)
{
this.migrator = migrator;
}
public Task StartAsync(CancellationToken cancellationToken)
{
return migrator.MigrateAsync();
}
migrator.MigrateAsync().Wait();
public Task StopAsync(CancellationToken cancellationToken)
{
return Task.CompletedTask;
}
}
}
}

6
src/Squidex/Config/Orleans/OrleansServices.cs

@ -6,8 +6,8 @@
// ==========================================================================
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Orleans;
using Squidex.Infrastructure;
namespace Squidex.Config.Orleans
{
@ -16,7 +16,7 @@ namespace Squidex.Config.Orleans
public static void AddOrleansSilo(this IServiceCollection services)
{
services.AddSingletonAs<SiloWrapper>()
.As<IInitializable>()
.As<IHostedService>()
.AsSelf();
services.AddServicesForSelfHostedDashboard(null, options =>
@ -24,7 +24,7 @@ namespace Squidex.Config.Orleans
options.HideTrace = true;
});
services.AddSingletonAs(c => c.GetRequiredService<SiloWrapper>().Client)
services.AddSingletonAs(c => c.GetRequiredService<IClusterClient>())
.As<IClusterClient>();
services.AddSingletonAs(c => c.GetRequiredService<SiloWrapper>().Client)

24
src/Squidex/Config/Orleans/SiloWrapper.cs

@ -7,9 +7,11 @@
using System;
using System.Net;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using Orleans;
using Orleans.Configuration;
@ -26,7 +28,7 @@ using Squidex.Infrastructure.Tasks;
namespace Squidex.Config.Orleans
{
public sealed class SiloWrapper : DisposableObjectBase, IInitializable, IDisposable
public sealed class SiloWrapper : IHostedService
{
private readonly Lazy<ISiloHost> silo;
private readonly ISemanticLog log;
@ -61,15 +63,10 @@ namespace Squidex.Config.Orleans
.UseDashboard(options => options.HostSelf = false)
.EnableDirectClient()
.AddIncomingGrainCallFilter<LocalCacheFilter>()
.AddStartupTask<InitializerStartup>()
.AddStartupTask<Bootstrap<IContentSchedulerGrain>>()
.AddStartupTask<Bootstrap<IEventConsumerManagerGrain>>()
.AddStartupTask<Bootstrap<IRuleDequeuerGrain>>()
.AddStartupTask((services, ct) =>
{
services.RunInitialization();
return TaskHelper.Done;
})
.Configure<ClusterOptions>(options =>
{
options.Configure();
@ -145,12 +142,12 @@ namespace Squidex.Config.Orleans
});
}
public void Initialize()
public async Task StartAsync(CancellationToken cancellationToken)
{
var watch = ValueStopwatch.StartNew();
try
{
silo.Value.StartAsync().Wait();
await silo.Value.StartAsync(cancellationToken);
}
finally
{
@ -162,14 +159,11 @@ namespace Squidex.Config.Orleans
}
}
protected override void DisposeObject(bool disposing)
public async Task StopAsync(CancellationToken cancellationToken)
{
if (disposing)
if (!silo.IsValueCreated)
{
if (silo.IsValueCreated)
{
Task.Run(() => silo.Value.StopAsync()).Wait();
}
await silo.Value.StopAsync(cancellationToken);
}
}
}

6
src/Squidex/WebStartup.cs

@ -32,14 +32,14 @@ namespace Squidex
{
services.AddOrleansSilo();
services.AddAppServices(configuration);
services.AddHostedService<SystemExtensions.InitializeHostedService>();
services.AddHostedService<SystemExtensions.MigratorHostedService>();
}
public void Configure(IApplicationBuilder app)
{
app.ApplicationServices.LogConfiguration();
app.ApplicationServices.RunInitialization();
app.ApplicationServices.RunMigrate();
app.ApplicationServices.RunRunnables();
app.UseMyLocalCache();
app.UseMyCors();

2
tests/Squidex.Infrastructure.Tests/Assets/AssetStoreTests.cs

@ -23,7 +23,7 @@ namespace Squidex.Infrastructure.Assets
{
sut = new Lazy<T>(CreateStore);
((IInitializable)Sut).Initialize();
((IInitializable)Sut).InitializeAsync().Wait();
}
protected T Sut

2
tests/Squidex.Infrastructure.Tests/Assets/FolderAssetStoreTests.cs

@ -33,7 +33,7 @@ namespace Squidex.Infrastructure.Assets
[Fact]
public void Should_throw_when_creating_directory_failed()
{
Assert.Throws<ConfigurationException>(() => new FolderAssetStore(CreateInvalidPath(), A.Dummy<ISemanticLog>()).Initialize());
Assert.Throws<ConfigurationException>(() => new FolderAssetStore(CreateInvalidPath(), A.Dummy<ISemanticLog>()).InitializeAsync().Wait());
}
[Fact]

Loading…
Cancel
Save