diff --git a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/activity/AbstractActivityManager.java b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/activity/AbstractActivityManager.java index 2124eb13d2..6c41ea70ea 100644 --- a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/activity/AbstractActivityManager.java +++ b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/activity/AbstractActivityManager.java @@ -33,7 +33,6 @@ package org.thingsboard.server.common.transport.activity; import lombok.Data; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; -import org.thingsboard.server.common.data.StringUtils; import org.thingsboard.server.common.transport.activity.strategy.ActivityStrategy; import org.thingsboard.server.queue.scheduler.SchedulerComponent; @@ -52,9 +51,6 @@ public abstract class AbstractActivityManager implements Activity @Autowired protected SchedulerComponent scheduler; - protected String name; - private boolean initialized; - @Data private class ActivityStateWrapper { @@ -64,21 +60,13 @@ public abstract class AbstractActivityManager implements Activity } - @Override - public synchronized void init(String name, long reportingPeriodMillis) { - if (!initialized) { - this.name = StringUtils.notBlankOrDefault(name, "activity-manager"); - log.info("Activity manager with name [{}] is initializing.", this.name); - if (reportingPeriodMillis <= 0) { - reportingPeriodMillis = 3000; - log.error("[{}] Negative or zero reporting period millisecond was provided. Going to use reporting period value of 3 seconds.", this.name); - } - scheduler.scheduleAtFixedRate(this::onReportingPeriodEnd, new Random().nextInt((int) reportingPeriodMillis), reportingPeriodMillis, TimeUnit.MILLISECONDS); - initialized = true; - log.info("Activity manager with name [{}] is initialized.", this.name); - } + protected void init() { + var reportingPeriodMillis = getReportingPeriodMillis(); + scheduler.scheduleAtFixedRate(this::onReportingPeriodEnd, new Random().nextInt((int) reportingPeriodMillis), reportingPeriodMillis, TimeUnit.MILLISECONDS); } + protected abstract long getReportingPeriodMillis(); + protected abstract ActivityState createNewState(Key key); protected abstract ActivityStrategy getStrategy(); @@ -93,15 +81,11 @@ public abstract class AbstractActivityManager implements Activity @Override public void onActivity(Key key) { - if (!initialized) { - log.error("[{}] Failed to process activity event: activity manager is not initialized.", name); - return; - } if (key == null) { - log.error("[{}] Failed to process activity event: provided activity key is null.", name); + log.error("Failed to process activity event: provided activity key is null."); return; } - log.debug("[{}] Received activity event for key: [{}]", name, key); + log.debug("Received activity event for key: [{}]", key); long newLastRecordedTime = System.currentTimeMillis(); var shouldReport = new AtomicBoolean(false); @@ -131,7 +115,7 @@ public abstract class AbstractActivityManager implements Activity long lastRecordedTime = activityState.getLastRecordedTime(); long lastReportedTime = activityStateWrapper.getLastReportedTime(); if (shouldReport.get() && lastReportedTime < lastRecordedTime) { - log.debug("[{}] Going to report first activity event for key: [{}].", name, key); + log.debug("Going to report first activity event for key: [{}].", key); reportActivity(key, activityState.getMetadata(), lastRecordedTime, new ActivityReportCallback<>() { @Override public void onSuccess(Key key, long reportedTime) { @@ -140,7 +124,7 @@ public abstract class AbstractActivityManager implements Activity @Override public void onFailure(Key key, Throwable t) { - log.debug("[{}] Failed to report first activity event for key: [{}].", name, key, t); + log.debug("Failed to report first activity event for key: [{}].", key, t); } }); } @@ -153,7 +137,7 @@ public abstract class AbstractActivityManager implements Activity } private void onReportingPeriodEnd() { - log.debug("[{}] Going to end reporting period.", name); + log.debug("Going to end reporting period."); for (Map.Entry entry : states.entrySet()) { var key = entry.getKey(); var stateWrapper = entry.getValue(); @@ -185,7 +169,7 @@ public abstract class AbstractActivityManager implements Activity } if (shouldReport && lastReportedTime < lastRecordedTime) { - log.debug("[{}] Going to report last activity event for key: [{}].", name, key); + log.debug("Going to report last activity event for key: [{}].", key); reportActivity(key, metadata, lastRecordedTime, new ActivityReportCallback<>() { @Override public void onSuccess(Key key, long reportedTime) { @@ -194,7 +178,7 @@ public abstract class AbstractActivityManager implements Activity @Override public void onFailure(Key key, Throwable t) { - log.debug("[{}] Failed to report last activity event for key: [{}].", name, key, t); + log.debug("Failed to report last activity event for key: [{}].", key, t); } }); } diff --git a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/activity/ActivityManager.java b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/activity/ActivityManager.java index 847e8f4472..f8881d1b61 100644 --- a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/activity/ActivityManager.java +++ b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/activity/ActivityManager.java @@ -32,8 +32,6 @@ package org.thingsboard.server.common.transport.activity; public interface ActivityManager { - void init(String name, long reportingPeriodMillis); - void onActivity(Key key); long getLastRecordedTime(Key key); diff --git a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/DefaultTransportService.java b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/DefaultTransportService.java index fb14936ad5..f733efef67 100644 --- a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/DefaultTransportService.java +++ b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/DefaultTransportService.java @@ -138,7 +138,6 @@ public class DefaultTransportService extends AbstractActivityManager createNewState(UUID sessionId) { SessionMetaData session = sessions.get(sessionId); @@ -827,9 +831,9 @@ public class DefaultTransportService extends AbstractActivityManager callback) { - log.debug("[{}] Reporting activity state for session with id: [{}]. Time to report: [{}].", name, sessionId, timeToReport); + log.debug("Reporting activity state for session with id: [{}]. Time to report: [{}].", sessionId, timeToReport); SessionMetaData session = sessions.get(sessionId); TransportProtos.SubscriptionInfoProto subscriptionInfo = TransportProtos.SubscriptionInfoProto.newBuilder() .setAttributeSubscription(session != null && session.isSubscribedToAttributes())