|
|
|
@ -29,6 +29,9 @@ import org.thingsboard.server.common.msg.TbActorStopReason; |
|
|
|
import org.thingsboard.server.common.msg.plugin.ComponentLifecycleMsg; |
|
|
|
import org.thingsboard.server.common.msg.queue.PartitionChangeMsg; |
|
|
|
|
|
|
|
import java.util.Optional; |
|
|
|
import java.util.concurrent.ScheduledFuture; |
|
|
|
|
|
|
|
/** |
|
|
|
* @author Andrew Shvayka |
|
|
|
*/ |
|
|
|
@ -41,6 +44,7 @@ public abstract class ComponentActor<T extends EntityId, P extends ComponentMsgP |
|
|
|
protected P processor; |
|
|
|
private long messagesProcessed; |
|
|
|
private long errorsOccurred; |
|
|
|
ScheduledFuture<?> statsScheduledFuture = null; |
|
|
|
|
|
|
|
public ComponentActor(ActorSystemContext systemContext, TenantId tenantId, T id) { |
|
|
|
super(systemContext); |
|
|
|
@ -75,7 +79,7 @@ public abstract class ComponentActor<T extends EntityId, P extends ComponentMsgP |
|
|
|
|
|
|
|
private void scheduleStatsPersistTick() { |
|
|
|
try { |
|
|
|
processor.scheduleStatsPersistTick(ctx, systemContext.getStatisticsPersistFrequency()); |
|
|
|
this.statsScheduledFuture = processor.scheduleStatsPersistTick(ctx, systemContext.getStatisticsPersistFrequency()); |
|
|
|
} catch (Exception e) { |
|
|
|
log.error("[{}][{}] Failed to schedule statistics store message. No statistics is going to be stored: {}", tenantId, id, e.getMessage()); |
|
|
|
logAndPersist("onScheduleStatsPersistMsg", e); |
|
|
|
@ -90,6 +94,8 @@ public abstract class ComponentActor<T extends EntityId, P extends ComponentMsgP |
|
|
|
processor.stop(ctx); |
|
|
|
} |
|
|
|
logLifecycleEvent(ComponentLifecycleEvent.STOPPED); |
|
|
|
Optional.ofNullable(statsScheduledFuture).ifPresent(x -> x.cancel(false)); |
|
|
|
statsScheduledFuture = null; |
|
|
|
} catch (Exception e) { |
|
|
|
log.warn("[{}][{}] Failed to stop {} processor: {}", tenantId, id, id.getEntityType(), e.getMessage()); |
|
|
|
logAndPersist("OnStop", e, true); |
|
|
|
|