Browse Source

Fix event consumer.

pull/805/head
Sebastian 4 years ago
parent
commit
4b0356f122
  1. 13
      backend/src/Squidex.Infrastructure/EventSourcing/Grains/BatchSubscriber.cs
  2. 17
      backend/src/Squidex.Infrastructure/EventSourcing/Grains/EventConsumerGrain.cs
  3. 23
      backend/tests/Squidex.Infrastructure.Tests/EventSourcing/Grains/EventConsumerGrainTests.cs
  4. 16
      frontend/app/features/rules/pages/rule/rule-page.component.html

13
backend/src/Squidex.Infrastructure/EventSourcing/Grains/BatchSubscriber.cs

@ -97,8 +97,6 @@ namespace Squidex.Infrastructure.EventSourcing.Grains
{
await foreach (var task in taskQueue.Reader.ReadAllAsync(completed.Token))
{
var scheduler = TaskScheduler.Current;
var sender = eventSubscription?.Sender;
if (sender == null)
@ -139,6 +137,17 @@ namespace Squidex.Infrastructure.EventSourcing.Grains
{
return;
}
catch (Exception ex)
{
var sender = eventSubscription?.Sender;
if (sender != null)
{
await grain.OnErrorAsync(sender, ex);
}
throw;
}
}
public Task CompleteAsync()

17
backend/src/Squidex.Infrastructure/EventSourcing/Grains/EventConsumerGrain.cs

@ -61,9 +61,18 @@ namespace Squidex.Infrastructure.EventSourcing.Grains
public async Task CompleteAsync()
{
if (currentSubscription is BatchSubscriber batchSubscriber)
{
try
{
await batchSubscriber.CompleteAsync();
}
catch (Exception ex)
{
log.LogFatal(ex, w => w
.WriteProperty("action", "CompleteConsumer")
.WriteProperty("status", "Failed"));
}
}
}
public Task<EventConsumerInfo> GetStateAsync()
@ -227,14 +236,6 @@ namespace Squidex.Infrastructure.EventSourcing.Grains
await state.WriteAsync();
}
}
catch (Exception ex)
{
Unsubscribe();
State = State.Stopped(ex);
DeactivateOnIdle();
}
finally
{
semaphore.Release();

23
backend/tests/Squidex.Infrastructure.Tests/EventSourcing/Grains/EventConsumerGrainTests.cs

@ -7,6 +7,7 @@
using FakeItEasy;
using FluentAssertions;
using Orleans.Storage;
using Squidex.Infrastructure.Orleans;
using Squidex.Infrastructure.TestHelpers;
using Squidex.Log;
@ -479,10 +480,10 @@ namespace Squidex.Infrastructure.EventSourcing.Grains
[Fact]
public async Task Should_start_after_stop_if_handling_failed()
{
var exception = new InvalidOperationException();
var ex = new InvalidOperationException();
A.CallTo(() => eventConsumer.On(envelope))
.Throws(exception);
.Throws(ex);
await sut.ActivateAsync(consumerName);
await sut.ActivateAsync();
@ -510,6 +511,24 @@ namespace Squidex.Infrastructure.EventSourcing.Grains
.MustHaveHappened(2, Times.Exactly);
}
[Fact]
public async Task Should_fail_if_writing_failed()
{
var ex = new InconsistentStateException();
A.CallTo(() => grainState.WriteAsync())
.Throws(ex);
await sut.ActivateAsync(consumerName);
await sut.ActivateAsync();
await OnEventAsync(eventSubscription, storedEvent);
await sut.CompleteAsync();
AssetGrainState(isStopped: true, position: storedEvent.EventPosition, error: ex.ToString(), 1);
}
private Task OnErrorAsync(IEventSubscription subscription, Exception exception)
{
return sut.OnErrorAsync(subscription, exception);

16
frontend/app/features/rules/pages/rule/rule-page.component.html

@ -14,14 +14,6 @@
</ng-container>
<ng-container menu>
<button class="btn btn-outline-secondary btn-run" *ngIf="isManual" [disabled]="!rule?.canTrigger"
(sqxConfirmClick)="trigger()"
confirmTitle="i18n:rules.triggerConfirmTitle"
confirmText="i18n:rules.triggerConfirmText"
confirmRememberKey="triggerRule">
<i class="icon-play-line"></i>
</button>
<div class="btn btn-outline-secondary btn-enabled ms-2" *ngIf="rule">
<span class="me-2" *ngIf="isEnabled">
{{ 'common.enabled' | sqxTranslate }}
@ -34,6 +26,14 @@
<sqx-toggle [(ngModel)]="isEnabled" [ngModelOptions]="{ standalone: true }" [disabled]="!isEditable"></sqx-toggle>
</div>
<button class="btn btn-outline-secondary btn-run ms-2" *ngIf="isManual" [disabled]="!rule?.canTrigger"
(sqxConfirmClick)="trigger()"
confirmTitle="i18n:rules.triggerConfirmTitle"
confirmText="i18n:rules.triggerConfirmText"
confirmRememberKey="triggerRule">
<i class="icon-play-line"></i>
</button>
<button type="button" class="btn btn-primary ms-2" (click)="save()">
{{ 'common.save' | sqxTranslate }}
</button>

Loading…
Cancel
Save