diff --git a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/activity/AbstractActivityManager.java b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/activity/AbstractActivityManager.java index 84d6d37824..2c6f6fe268 100644 --- a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/activity/AbstractActivityManager.java +++ b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/activity/AbstractActivityManager.java @@ -28,10 +28,9 @@ import java.util.concurrent.ConcurrentMap; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; -import java.util.concurrent.atomic.AtomicReference; @Slf4j -public abstract class AbstractActivityManager implements ActivityManager { +public abstract class AbstractActivityManager implements ActivityManager { private final ConcurrentMap states = new ConcurrentHashMap<>(); @@ -54,8 +53,6 @@ public abstract class AbstractActivityManager implements Activity protected abstract long getReportingPeriodMillis(); - protected abstract ActivityState createNewState(Key key); - protected abstract ActivityStrategy getStrategy(); protected abstract ActivityState updateState(Key key, ActivityState state); @@ -67,7 +64,7 @@ public abstract class AbstractActivityManager implements Activity protected abstract void reportActivity(Key key, Metadata metadata, long timeToReport, ActivityReportCallback callback); @Override - public void onActivity(Key key, long newLastRecordedTime) { + public void onActivity(Key key, Metadata metadata, long newLastRecordedTime) { if (key == null) { log.error("Failed to process activity event: provided activity key is null."); return; @@ -77,14 +74,11 @@ public abstract class AbstractActivityManager implements Activity var shouldReport = new AtomicBoolean(false); var lastRecordedTime = new AtomicLong(); var lastReportedTime = new AtomicLong(); - var metadata = new AtomicReference(); - var activityStateWrapper = states.compute(key, (__, stateWrapper) -> { + states.compute(key, (__, stateWrapper) -> { if (stateWrapper == null) { - var newState = createNewState(key); - if (newState == null) { - return null; - } + ActivityState newState = new ActivityState<>(); + newState.setMetadata(metadata); stateWrapper = new ActivityStateWrapper(); stateWrapper.setState(newState); stateWrapper.setStrategy(getStrategy()); @@ -96,17 +90,12 @@ public abstract class AbstractActivityManager implements Activity shouldReport.set(stateWrapper.getStrategy().onActivity()); lastRecordedTime.set(state.getLastRecordedTime()); lastReportedTime.set(stateWrapper.getLastReportedTime()); - metadata.set(state.getMetadata()); return stateWrapper; }); - if (activityStateWrapper == null) { - return; - } - if (shouldReport.get() && lastReportedTime.get() < lastRecordedTime.get()) { log.debug("Going to report first activity event for key: [{}].", key); - reportActivity(key, metadata.get(), lastRecordedTime.get(), new ActivityReportCallback<>() { + reportActivity(key, metadata, lastRecordedTime.get(), new ActivityReportCallback<>() { @Override public void onSuccess(Key key, long reportedTime) { updateLastReportedTime(key, reportedTime); diff --git a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/activity/ActivityManager.java b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/activity/ActivityManager.java index 0f2145ab6f..5d27738429 100644 --- a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/activity/ActivityManager.java +++ b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/activity/ActivityManager.java @@ -15,9 +15,9 @@ */ package org.thingsboard.server.common.transport.activity; -public interface ActivityManager { +public interface ActivityManager { - void onActivity(Key key, long activityTimeMillis); + void onActivity(Key key, Metadata metadata, long activityTimeMillis); void onReportingPeriodEnd(); diff --git a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/DefaultTransportService.java b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/DefaultTransportService.java index f397a6c02a..0d7fbfc332 100644 --- a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/DefaultTransportService.java +++ b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/DefaultTransportService.java @@ -96,8 +96,6 @@ import org.thingsboard.server.queue.TbQueueProducer; import org.thingsboard.server.queue.TbQueueRequestTemplate; import org.thingsboard.server.queue.common.AsyncCallbackTemplate; import org.thingsboard.server.queue.common.TbProtoQueueMsg; -import org.thingsboard.server.queue.discovery.QueueKey; -import org.thingsboard.server.queue.discovery.TopicService; import org.thingsboard.server.queue.discovery.PartitionService; import org.thingsboard.server.queue.discovery.TbServiceInfoProvider; import org.thingsboard.server.queue.discovery.TopicService; @@ -774,7 +772,7 @@ public class DefaultTransportService extends TransportActivityManager implements } private void recordActivityInternal(TransportProtos.SessionInfoProto sessionInfo) { - onActivity(toSessionId(sessionInfo), getCurrentTimeMillis()); + onActivity(toSessionId(sessionInfo), sessionInfo, getCurrentTimeMillis()); } @Override diff --git a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/TransportActivityManager.java b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/TransportActivityManager.java index 1368785466..0c9ebd0e4b 100644 --- a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/TransportActivityManager.java +++ b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/TransportActivityManager.java @@ -57,17 +57,6 @@ public abstract class TransportActivityManager extends AbstractActivityManager createNewState(UUID sessionId) { - SessionMetaData session = sessions.get(sessionId); - if (session == null) { - return null; - } - ActivityState state = new ActivityState<>(); - state.setMetadata(session.getSessionInfo()); - return state; - } - @Override protected ActivityStrategy getStrategy() { return reportingStrategyType.toStrategy(); diff --git a/common/transport/transport-api/src/test/java/org/thingsboard/server/common/transport/service/TransportActivityManagerTest.java b/common/transport/transport-api/src/test/java/org/thingsboard/server/common/transport/service/TransportActivityManagerTest.java index e0f5ef128e..b74f6349c8 100644 --- a/common/transport/transport-api/src/test/java/org/thingsboard/server/common/transport/service/TransportActivityManagerTest.java +++ b/common/transport/transport-api/src/test/java/org/thingsboard/server/common/transport/service/TransportActivityManagerTest.java @@ -175,28 +175,7 @@ public class TransportActivityManagerTest { transportServiceMock.recordActivity(sessionInfo); // THEN - verify(transportServiceMock).onActivity(SESSION_ID, expectedTime); - } - - @Test - void givenKey_whenCreatingNewState_thenShouldCorrectlyCreateNewEmptyState() { - // GIVEN - TransportProtos.SessionInfoProto sessionInfo = TransportProtos.SessionInfoProto.newBuilder() - .setSessionIdMSB(SESSION_ID.getMostSignificantBits()) - .setSessionIdLSB(SESSION_ID.getLeastSignificantBits()) - .build(); - sessions.put(SESSION_ID, new SessionMetaData(sessionInfo, TransportProtos.SessionType.ASYNC, null)); - - when(transportServiceMock.createNewState(SESSION_ID)).thenCallRealMethod(); - - ActivityState expectedState = new ActivityState<>(); - expectedState.setMetadata(sessionInfo); - - // WHEN - ActivityState actualState = transportServiceMock.createNewState(SESSION_ID); - - // THEN - assertThat(actualState).isEqualTo(expectedState); + verify(transportServiceMock).onActivity(SESSION_ID, sessionInfo, expectedTime); } @ParameterizedTest