Browse Source

Fix new activity state never created if the underlying session got removed before activity manager tried to create a new state.

pull/10013/head
Dmytro Skarzhynets 2 years ago
committed by Dmytro Skarzhynets
parent
commit
65a825ef55
  1. 23
      common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/activity/AbstractActivityManager.java
  2. 4
      common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/activity/ActivityManager.java
  3. 4
      common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/DefaultTransportService.java
  4. 11
      common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/TransportActivityManager.java
  5. 23
      common/transport/transport-api/src/test/java/org/thingsboard/server/common/transport/service/TransportActivityManagerTest.java

23
common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/activity/AbstractActivityManager.java

@ -28,10 +28,9 @@ import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
@Slf4j
public abstract class AbstractActivityManager<Key, Metadata> implements ActivityManager<Key> {
public abstract class AbstractActivityManager<Key, Metadata> implements ActivityManager<Key, Metadata> {
private final ConcurrentMap<Key, ActivityStateWrapper> states = new ConcurrentHashMap<>();
@ -54,8 +53,6 @@ public abstract class AbstractActivityManager<Key, Metadata> implements Activity
protected abstract long getReportingPeriodMillis();
protected abstract ActivityState<Metadata> createNewState(Key key);
protected abstract ActivityStrategy getStrategy();
protected abstract ActivityState<Metadata> updateState(Key key, ActivityState<Metadata> state);
@ -67,7 +64,7 @@ public abstract class AbstractActivityManager<Key, Metadata> implements Activity
protected abstract void reportActivity(Key key, Metadata metadata, long timeToReport, ActivityReportCallback<Key> callback);
@Override
public void onActivity(Key key, long newLastRecordedTime) {
public void onActivity(Key key, Metadata metadata, long newLastRecordedTime) {
if (key == null) {
log.error("Failed to process activity event: provided activity key is null.");
return;
@ -77,14 +74,11 @@ public abstract class AbstractActivityManager<Key, Metadata> implements Activity
var shouldReport = new AtomicBoolean(false);
var lastRecordedTime = new AtomicLong();
var lastReportedTime = new AtomicLong();
var metadata = new AtomicReference<Metadata>();
var activityStateWrapper = states.compute(key, (__, stateWrapper) -> {
states.compute(key, (__, stateWrapper) -> {
if (stateWrapper == null) {
var newState = createNewState(key);
if (newState == null) {
return null;
}
ActivityState<Metadata> newState = new ActivityState<>();
newState.setMetadata(metadata);
stateWrapper = new ActivityStateWrapper();
stateWrapper.setState(newState);
stateWrapper.setStrategy(getStrategy());
@ -96,17 +90,12 @@ public abstract class AbstractActivityManager<Key, Metadata> implements Activity
shouldReport.set(stateWrapper.getStrategy().onActivity());
lastRecordedTime.set(state.getLastRecordedTime());
lastReportedTime.set(stateWrapper.getLastReportedTime());
metadata.set(state.getMetadata());
return stateWrapper;
});
if (activityStateWrapper == null) {
return;
}
if (shouldReport.get() && lastReportedTime.get() < lastRecordedTime.get()) {
log.debug("Going to report first activity event for key: [{}].", key);
reportActivity(key, metadata.get(), lastRecordedTime.get(), new ActivityReportCallback<>() {
reportActivity(key, metadata, lastRecordedTime.get(), new ActivityReportCallback<>() {
@Override
public void onSuccess(Key key, long reportedTime) {
updateLastReportedTime(key, reportedTime);

4
common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/activity/ActivityManager.java

@ -15,9 +15,9 @@
*/
package org.thingsboard.server.common.transport.activity;
public interface ActivityManager<Key> {
public interface ActivityManager<Key, Metadata> {
void onActivity(Key key, long activityTimeMillis);
void onActivity(Key key, Metadata metadata, long activityTimeMillis);
void onReportingPeriodEnd();

4
common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/DefaultTransportService.java

@ -96,8 +96,6 @@ import org.thingsboard.server.queue.TbQueueProducer;
import org.thingsboard.server.queue.TbQueueRequestTemplate;
import org.thingsboard.server.queue.common.AsyncCallbackTemplate;
import org.thingsboard.server.queue.common.TbProtoQueueMsg;
import org.thingsboard.server.queue.discovery.QueueKey;
import org.thingsboard.server.queue.discovery.TopicService;
import org.thingsboard.server.queue.discovery.PartitionService;
import org.thingsboard.server.queue.discovery.TbServiceInfoProvider;
import org.thingsboard.server.queue.discovery.TopicService;
@ -774,7 +772,7 @@ public class DefaultTransportService extends TransportActivityManager implements
}
private void recordActivityInternal(TransportProtos.SessionInfoProto sessionInfo) {
onActivity(toSessionId(sessionInfo), getCurrentTimeMillis());
onActivity(toSessionId(sessionInfo), sessionInfo, getCurrentTimeMillis());
}
@Override

11
common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/TransportActivityManager.java

@ -57,17 +57,6 @@ public abstract class TransportActivityManager extends AbstractActivityManager<U
return sessionReportTimeout;
}
@Override
protected ActivityState<TransportProtos.SessionInfoProto> createNewState(UUID sessionId) {
SessionMetaData session = sessions.get(sessionId);
if (session == null) {
return null;
}
ActivityState<TransportProtos.SessionInfoProto> state = new ActivityState<>();
state.setMetadata(session.getSessionInfo());
return state;
}
@Override
protected ActivityStrategy getStrategy() {
return reportingStrategyType.toStrategy();

23
common/transport/transport-api/src/test/java/org/thingsboard/server/common/transport/service/TransportActivityManagerTest.java

@ -175,28 +175,7 @@ public class TransportActivityManagerTest {
transportServiceMock.recordActivity(sessionInfo);
// THEN
verify(transportServiceMock).onActivity(SESSION_ID, expectedTime);
}
@Test
void givenKey_whenCreatingNewState_thenShouldCorrectlyCreateNewEmptyState() {
// GIVEN
TransportProtos.SessionInfoProto sessionInfo = TransportProtos.SessionInfoProto.newBuilder()
.setSessionIdMSB(SESSION_ID.getMostSignificantBits())
.setSessionIdLSB(SESSION_ID.getLeastSignificantBits())
.build();
sessions.put(SESSION_ID, new SessionMetaData(sessionInfo, TransportProtos.SessionType.ASYNC, null));
when(transportServiceMock.createNewState(SESSION_ID)).thenCallRealMethod();
ActivityState<TransportProtos.SessionInfoProto> expectedState = new ActivityState<>();
expectedState.setMetadata(sessionInfo);
// WHEN
ActivityState<TransportProtos.SessionInfoProto> actualState = transportServiceMock.createNewState(SESSION_ID);
// THEN
assertThat(actualState).isEqualTo(expectedState);
verify(transportServiceMock).onActivity(SESSION_ID, sessionInfo, expectedTime);
}
@ParameterizedTest

Loading…
Cancel
Save