|
|
|
@ -101,7 +101,7 @@ namespace Squidex.Infrastructure.EventSourcing.Grains |
|
|
|
await DispatchAsync(events); |
|
|
|
|
|
|
|
State = State.Handled(position, events.Count); |
|
|
|
}); |
|
|
|
}, State.Position); |
|
|
|
} |
|
|
|
|
|
|
|
public Task OnErrorAsync(object sender, Exception exception) |
|
|
|
@ -116,7 +116,7 @@ namespace Squidex.Infrastructure.EventSourcing.Grains |
|
|
|
Unsubscribe(); |
|
|
|
|
|
|
|
State = State.Stopped(exception); |
|
|
|
}); |
|
|
|
}, State.Position); |
|
|
|
} |
|
|
|
|
|
|
|
public async Task ActivateAsync() |
|
|
|
@ -128,7 +128,7 @@ namespace Squidex.Infrastructure.EventSourcing.Grains |
|
|
|
Subscribe(); |
|
|
|
|
|
|
|
State = State.Started(); |
|
|
|
}); |
|
|
|
}, State.Position); |
|
|
|
} |
|
|
|
else if (!State.IsStopped) |
|
|
|
{ |
|
|
|
@ -148,7 +148,7 @@ namespace Squidex.Infrastructure.EventSourcing.Grains |
|
|
|
Subscribe(); |
|
|
|
|
|
|
|
State = State.Started(); |
|
|
|
}); |
|
|
|
}, State.Position); |
|
|
|
|
|
|
|
return CreateInfo(); |
|
|
|
} |
|
|
|
@ -165,7 +165,7 @@ namespace Squidex.Infrastructure.EventSourcing.Grains |
|
|
|
Unsubscribe(); |
|
|
|
|
|
|
|
State = State.Stopped(); |
|
|
|
}); |
|
|
|
}, State.Position); |
|
|
|
|
|
|
|
return CreateInfo(); |
|
|
|
} |
|
|
|
@ -181,7 +181,7 @@ namespace Squidex.Infrastructure.EventSourcing.Grains |
|
|
|
State = EventConsumerState.Initial; |
|
|
|
|
|
|
|
Subscribe(); |
|
|
|
}); |
|
|
|
}, State.Position); |
|
|
|
|
|
|
|
return CreateInfo(); |
|
|
|
} |
|
|
|
@ -194,17 +194,17 @@ namespace Squidex.Infrastructure.EventSourcing.Grains |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
private Task DoAndUpdateStateAsync(Action action, [CallerMemberName] string? caller = null) |
|
|
|
private Task DoAndUpdateStateAsync(Action action, string? position, [CallerMemberName] string? caller = null) |
|
|
|
{ |
|
|
|
return DoAndUpdateStateAsync(() => |
|
|
|
{ |
|
|
|
action(); |
|
|
|
|
|
|
|
return Task.CompletedTask; |
|
|
|
}, caller); |
|
|
|
}, position, caller); |
|
|
|
} |
|
|
|
|
|
|
|
private async Task DoAndUpdateStateAsync(Func<Task> action, [CallerMemberName] string? caller = null) |
|
|
|
private async Task DoAndUpdateStateAsync(Func<Task> action, string? position, [CallerMemberName] string? caller = null) |
|
|
|
{ |
|
|
|
await semaphore.WaitAsync(); |
|
|
|
try |
|
|
|
@ -229,6 +229,7 @@ namespace Squidex.Infrastructure.EventSourcing.Grains |
|
|
|
log.LogFatal(ex, w => w |
|
|
|
.WriteProperty("action", caller) |
|
|
|
.WriteProperty("status", "Failed") |
|
|
|
.WriteProperty("eventPosition", position) |
|
|
|
.WriteProperty("eventConsumer", eventConsumer!.Name)); |
|
|
|
|
|
|
|
State = previousState.Stopped(ex); |
|
|
|
|