Browse Source

Tests for dequeuer.

pull/157/head
Sebastian Stehle 8 years ago
parent
commit
43081693d8
  1. 2
      src/Squidex.Domain.Apps.Core.Model/Rules/RuleJob.cs
  2. 8
      src/Squidex.Domain.Apps.Core.Operations/HandleRules/RuleService.cs
  3. 40
      src/Squidex.Domain.Apps.Read.MongoDb/Rules/MongoRuleEventRepository.cs
  4. 6
      src/Squidex.Domain.Apps.Read/Rules/Repositories/IRuleEventRepository.cs
  5. 27
      src/Squidex.Domain.Apps.Read/Rules/RuleDequeuer.cs
  6. 128
      tests/Squidex.Domain.Apps.Read.Tests/Webhooks/WebhookDequeuerTests.cs
  7. 2
      tests/Squidex.Domain.Apps.Read.Tests/Webhooks/WebhookEnqueuerTests.cs

2
src/Squidex.Domain.Apps.Core.Model/Rules/RuleJob.cs

@ -13,7 +13,7 @@ namespace Squidex.Domain.Apps.Core.Rules
{ {
public sealed class RuleJob public sealed class RuleJob
{ {
public Guid Id { get; set; } public Guid RuleId { get; set; }
public Guid AppId { get; set; } public Guid AppId { get; set; }

8
src/Squidex.Domain.Apps.Core.Operations/HandleRules/RuleService.cs

@ -21,7 +21,7 @@ using Squidex.Infrastructure.CQRS.Events;
namespace Squidex.Domain.Apps.Core.HandleRules namespace Squidex.Domain.Apps.Core.HandleRules
{ {
public sealed class RuleService public class RuleService
{ {
private const string ContentPrefix = "Content"; private const string ContentPrefix = "Content";
private static readonly Duration ExpirationTime = Duration.FromDays(2); private static readonly Duration ExpirationTime = Duration.FromDays(2);
@ -49,7 +49,7 @@ namespace Squidex.Domain.Apps.Core.HandleRules
this.clock = clock; this.clock = clock;
} }
public RuleJob CreateJob(Rule rule, Envelope<IEvent> @event) public virtual RuleJob CreateJob(Rule rule, Envelope<IEvent> @event)
{ {
Guard.NotNull(rule, nameof(rule)); Guard.NotNull(rule, nameof(rule));
Guard.NotNull(@event, nameof(@event)); Guard.NotNull(@event, nameof(@event));
@ -87,7 +87,7 @@ namespace Squidex.Domain.Apps.Core.HandleRules
var job = new RuleJob var job = new RuleJob
{ {
Id = Guid.NewGuid(), RuleId = Guid.NewGuid(),
ActionName = actionName, ActionName = actionName,
ActionData = actionData.Data, ActionData = actionData.Data,
AppId = appEvent.AppId.Id, AppId = appEvent.AppId.Id,
@ -100,7 +100,7 @@ namespace Squidex.Domain.Apps.Core.HandleRules
return job; return job;
} }
public async Task<(string Dump, RuleResult Result, TimeSpan Elapsed)> InvokeAsync(string actionName, RuleJobData job) public virtual async Task<(string Dump, RuleResult Result, TimeSpan Elapsed)> InvokeAsync(string actionName, RuleJobData job)
{ {
try try
{ {

40
src/Squidex.Domain.Apps.Read.MongoDb/Rules/MongoRuleEventRepository.cs

@ -16,7 +16,6 @@ using Squidex.Domain.Apps.Core.HandleRules;
using Squidex.Domain.Apps.Core.Rules; using Squidex.Domain.Apps.Core.Rules;
using Squidex.Domain.Apps.Read.Rules; using Squidex.Domain.Apps.Read.Rules;
using Squidex.Domain.Apps.Read.Rules.Repositories; using Squidex.Domain.Apps.Read.Rules.Repositories;
using Squidex.Infrastructure;
using Squidex.Infrastructure.MongoDb; using Squidex.Infrastructure.MongoDb;
using Squidex.Infrastructure.Reflection; using Squidex.Infrastructure.Reflection;
@ -24,14 +23,9 @@ namespace Squidex.Domain.Apps.Read.MongoDb.Rules
{ {
public sealed class MongoRuleEventRepository : MongoRepositoryBase<MongoRuleEventEntity>, IRuleEventRepository public sealed class MongoRuleEventRepository : MongoRepositoryBase<MongoRuleEventEntity>, IRuleEventRepository
{ {
private readonly IClock clock; public MongoRuleEventRepository(IMongoDatabase database)
public MongoRuleEventRepository(IMongoDatabase database, IClock clock)
: base(database) : base(database)
{ {
Guard.NotNull(clock, nameof(clock));
this.clock = clock;
} }
protected override string CollectionName() protected override string CollectionName()
@ -47,10 +41,8 @@ namespace Squidex.Domain.Apps.Read.MongoDb.Rules
collection.Indexes.CreateOneAsync(Index.Ascending(x => x.Expires), new CreateIndexOptions { ExpireAfter = TimeSpan.Zero })); collection.Indexes.CreateOneAsync(Index.Ascending(x => x.Expires), new CreateIndexOptions { ExpireAfter = TimeSpan.Zero }));
} }
public Task QueryPendingAsync(Func<IRuleEventEntity, Task> callback, CancellationToken cancellationToken = default(CancellationToken)) public Task QueryPendingAsync(Instant now, Func<IRuleEventEntity, Task> callback, CancellationToken cancellationToken = default(CancellationToken))
{ {
var now = clock.GetCurrentInstant();
return Collection.Find(x => x.NextAttempt < now && !x.IsSending).ForEachAsync(callback, cancellationToken); return Collection.Find(x => x.NextAttempt < now && !x.IsSending).ForEachAsync(callback, cancellationToken);
} }
@ -63,15 +55,6 @@ namespace Squidex.Domain.Apps.Read.MongoDb.Rules
return webhookEventEntities; return webhookEventEntities;
} }
public async Task<IRuleEventEntity> FindAsync(Guid id)
{
var webhookEventEntity =
await Collection.Find(x => x.Id == id)
.FirstOrDefaultAsync();
return webhookEventEntity;
}
public async Task<int> CountByAppAsync(Guid appId) public async Task<int> CountByAppAsync(Guid appId)
{ {
return (int)await Collection.CountAsync(x => x.AppId == appId); return (int)await Collection.CountAsync(x => x.AppId == appId);
@ -84,7 +67,7 @@ namespace Squidex.Domain.Apps.Read.MongoDb.Rules
public Task EnqueueAsync(RuleJob job, Instant nextAttempt) public Task EnqueueAsync(RuleJob job, Instant nextAttempt)
{ {
var entity = SimpleMapper.Map(job, new MongoRuleEventEntity { Created = clock.GetCurrentInstant(), NextAttempt = nextAttempt }); var entity = SimpleMapper.Map(job, new MongoRuleEventEntity { Created = nextAttempt, NextAttempt = nextAttempt });
return Collection.InsertOneIfNotExistsAsync(entity); return Collection.InsertOneIfNotExistsAsync(entity);
} }
@ -94,23 +77,8 @@ namespace Squidex.Domain.Apps.Read.MongoDb.Rules
return Collection.UpdateOneAsync(x => x.Id == jobId, Update.Set(x => x.IsSending, true)); return Collection.UpdateOneAsync(x => x.Id == jobId, Update.Set(x => x.IsSending, true));
} }
public Task MarkSentAsync(Guid jobId, string dump, RuleResult result, TimeSpan elapsed, Instant? nextAttempt) public Task MarkSentAsync(Guid jobId, string dump, RuleResult result, RuleJobResult jobResult, TimeSpan elapsed, Instant? nextAttempt)
{ {
RuleJobResult jobResult;
if (result != RuleResult.Success && nextAttempt == null)
{
jobResult = RuleJobResult.Failed;
}
else if (result != RuleResult.Success && nextAttempt.HasValue)
{
jobResult = RuleJobResult.Retry;
}
else
{
jobResult = RuleJobResult.Success;
}
return Collection.UpdateOneAsync(x => x.Id == jobId, return Collection.UpdateOneAsync(x => x.Id == jobId,
Update.Set(x => x.Result, result) Update.Set(x => x.Result, result)
.Set(x => x.LastDump, dump) .Set(x => x.LastDump, dump)

6
src/Squidex.Domain.Apps.Read/Rules/Repositories/IRuleEventRepository.cs

@ -24,14 +24,12 @@ namespace Squidex.Domain.Apps.Read.Rules.Repositories
Task MarkSendingAsync(Guid jobId); Task MarkSendingAsync(Guid jobId);
Task MarkSentAsync(Guid jobId, string dump, RuleResult result, TimeSpan elapsed, Instant? nextCall); Task MarkSentAsync(Guid jobId, string dump, RuleResult result, RuleJobResult jobResult, TimeSpan elapsed, Instant? nextCall);
Task QueryPendingAsync(Func<IRuleEventEntity, Task> callback, CancellationToken cancellationToken = default(CancellationToken)); Task QueryPendingAsync(Instant now, Func<IRuleEventEntity, Task> callback, CancellationToken cancellationToken = default(CancellationToken));
Task<int> CountByAppAsync(Guid appId); Task<int> CountByAppAsync(Guid appId);
Task<IReadOnlyList<IRuleEventEntity>> QueryByAppAsync(Guid appId, int skip = 0, int take = 20); Task<IReadOnlyList<IRuleEventEntity>> QueryByAppAsync(Guid appId, int skip = 0, int take = 20);
Task<IRuleEventEntity> FindAsync(Guid id);
} }
} }

27
src/Squidex.Domain.Apps.Read/Rules/RuleDequeuer.cs

@ -26,17 +26,21 @@ namespace Squidex.Domain.Apps.Read.Rules
private readonly IRuleEventRepository ruleEventRepository; private readonly IRuleEventRepository ruleEventRepository;
private readonly RuleService ruleService; private readonly RuleService ruleService;
private readonly CompletionTimer timer; private readonly CompletionTimer timer;
private readonly IClock clock;
private readonly ISemanticLog log; private readonly ISemanticLog log;
public RuleDequeuer(RuleService ruleService, IRuleEventRepository ruleEventRepository, ISemanticLog log) public RuleDequeuer(RuleService ruleService, IRuleEventRepository ruleEventRepository, ISemanticLog log, IClock clock)
{ {
Guard.NotNull(ruleEventRepository, nameof(ruleEventRepository)); Guard.NotNull(ruleEventRepository, nameof(ruleEventRepository));
Guard.NotNull(ruleService, nameof(ruleService)); Guard.NotNull(ruleService, nameof(ruleService));
Guard.NotNull(clock, nameof(clock));
Guard.NotNull(log, nameof(log)); Guard.NotNull(log, nameof(log));
this.ruleEventRepository = ruleEventRepository; this.ruleEventRepository = ruleEventRepository;
this.ruleService = ruleService; this.ruleService = ruleService;
this.clock = clock;
this.log = log; this.log = log;
requestBlock = requestBlock =
@ -76,7 +80,9 @@ namespace Squidex.Domain.Apps.Read.Rules
{ {
try try
{ {
await ruleEventRepository.QueryPendingAsync(blockBlock.SendAsync, cancellationToken); var now = clock.GetCurrentInstant();
await ruleEventRepository.QueryPendingAsync(now, blockBlock.SendAsync, cancellationToken);
} }
catch (Exception ex) catch (Exception ex)
{ {
@ -133,7 +139,22 @@ namespace Squidex.Domain.Apps.Read.Rules
} }
} }
await ruleEventRepository.MarkSentAsync(@event.Id, response.Dump, response.Result, response.Elapsed, nextCall); RuleJobResult jobResult;
if (response.Result != RuleResult.Success && !nextCall.HasValue)
{
jobResult = RuleJobResult.Failed;
}
else if (response.Result != RuleResult.Success && nextCall.HasValue)
{
jobResult = RuleJobResult.Retry;
}
else
{
jobResult = RuleJobResult.Success;
}
await ruleEventRepository.MarkSentAsync(@event.Id, response.Dump, response.Result, jobResult, response.Elapsed, nextCall);
} }
catch (Exception ex) catch (Exception ex)
{ {

128
tests/Squidex.Domain.Apps.Read.Tests/Webhooks/WebhookDequeuerTests.cs

@ -1,128 +0,0 @@
// ==========================================================================
// WebhookDequeuerTests.cs
// Squidex Headless CMS
// ==========================================================================
// Copyright (c) Squidex Group
// All rights reserved.
// ==========================================================================
using System;
using System.Threading;
using System.Threading.Tasks;
using FakeItEasy;
using NodaTime;
using Squidex.Domain.Apps.Read.Webhooks.Repositories;
using Squidex.Infrastructure.Log;
using Xunit;
#pragma warning disable RECS0165 // Asynchronous methods should return a Task instead of void
namespace Squidex.Domain.Apps.Read.Webhooks
{
public class WebhookDequeuerTests
{
private readonly IClock clock = A.Fake<IClock>();
private readonly IWebhookRepository webhookRepository = A.Fake<IWebhookRepository>();
private readonly IWebhookEventRepository webhookEventRepository = A.Fake<IWebhookEventRepository>();
private readonly WebhookSender webhookSender = A.Fake<WebhookSender>();
private readonly Instant now = SystemClock.Instance.GetCurrentInstant();
public WebhookDequeuerTests()
{
A.CallTo(() => clock.GetCurrentInstant()).Returns(now);
}
[Fact]
public void Should_update_repositories_on_successful_requests()
{
var @event = CreateEvent(0);
var requestResult = WebhookResult.Success;
var requestTime = TimeSpan.FromMinutes(1);
var requestDump = "Dump";
SetupSender(@event, requestDump, requestResult, requestTime);
SetupPendingEvents(@event);
var sut = new WebhookDequeuer(
webhookSender,
webhookEventRepository,
webhookRepository,
clock, A.Fake<ISemanticLog>());
sut.Next();
sut.Dispose();
VerifyRepositories(@event, requestDump, requestResult, requestTime, null);
}
[Theory]
[InlineData(0, 5)]
[InlineData(1, 60)]
[InlineData(2, 300)]
[InlineData(3, 360)]
public void Should_set_next_attempt_based_on_num_calls(int calls, int minutes)
{
var @event = CreateEvent(calls);
var requestResult = WebhookResult.Failed;
var requestTime = TimeSpan.FromMinutes(1);
var requestDump = "Dump";
SetupSender(@event, requestDump, requestResult, requestTime);
SetupPendingEvents(@event);
var sut = new WebhookDequeuer(
webhookSender,
webhookEventRepository,
webhookRepository,
clock, A.Fake<ISemanticLog>());
sut.Next();
sut.Dispose();
VerifyRepositories(@event, requestDump, requestResult, requestTime, now.Plus(Duration.FromMinutes(minutes)));
}
private void SetupSender(IWebhookEventEntity @event, string requestDump, WebhookResult requestResult, TimeSpan requestTime)
{
A.CallTo(() => webhookSender.SendAsync(@event.Job))
.Returns((requestDump, requestResult, requestTime));
}
private void SetupPendingEvents(IWebhookEventEntity @event)
{
A.CallTo(() => webhookEventRepository.QueryPendingAsync(A<Func<IWebhookEventEntity, Task>>.Ignored, A<CancellationToken>.Ignored))
.Invokes(async (Func<IWebhookEventEntity, Task> callback, CancellationToken ct) =>
{
await callback(@event);
});
}
private void VerifyRepositories(IWebhookEventEntity @event, string requestDump, WebhookResult requestResult, TimeSpan requestTime, Instant? nextAttempt)
{
A.CallTo(() => webhookEventRepository.TraceSendingAsync(@event.Id))
.MustHaveHappened();
A.CallTo(() => webhookEventRepository.TraceSendingAsync(@event.Id))
.MustHaveHappened();
A.CallTo(() => webhookEventRepository.TraceSentAsync(@event.Id, requestDump, requestResult, requestTime, nextAttempt))
.MustHaveHappened();
A.CallTo(() => webhookRepository.TraceSentAsync(@event.Job.WebhookId, requestResult, requestTime))
.MustHaveHappened();
}
private static IWebhookEventEntity CreateEvent(int numCalls)
{
var @event = A.Fake<IWebhookEventEntity>();
A.CallTo(() => @event.Id).Returns(Guid.NewGuid());
A.CallTo(() => @event.Job).Returns(new WebhookJob { WebhookId = Guid.NewGuid() });
A.CallTo(() => @event.NumCalls).Returns(numCalls);
return @event;
}
}
}

2
tests/Squidex.Domain.Apps.Read.Tests/Webhooks/WebhookEnqueuerTests.cs

@ -6,6 +6,7 @@
// All rights reserved. // All rights reserved.
// ========================================================================== // ==========================================================================
/*
using System; using System;
using System.Collections.Generic; using System.Collections.Generic;
using System.Threading.Tasks; using System.Threading.Tasks;
@ -119,3 +120,4 @@ namespace Squidex.Domain.Apps.Read.Webhooks
} }
} }
} }
*/
Loading…
Cancel
Save