Browse Source

Refactor: remove explicit init method from activity manager interface

pull/9980/head
Dmytro Skarzhynets 3 years ago
parent
commit
47c72fb4e5
  1. 40
      common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/activity/AbstractActivityManager.java
  2. 2
      common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/activity/ActivityManager.java
  3. 16
      common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/DefaultTransportService.java

40
common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/activity/AbstractActivityManager.java

@ -33,7 +33,6 @@ package org.thingsboard.server.common.transport.activity;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.thingsboard.server.common.data.StringUtils;
import org.thingsboard.server.common.transport.activity.strategy.ActivityStrategy;
import org.thingsboard.server.queue.scheduler.SchedulerComponent;
@ -52,9 +51,6 @@ public abstract class AbstractActivityManager<Key, Metadata> implements Activity
@Autowired
protected SchedulerComponent scheduler;
protected String name;
private boolean initialized;
@Data
private class ActivityStateWrapper {
@ -64,21 +60,13 @@ public abstract class AbstractActivityManager<Key, Metadata> implements Activity
}
@Override
public synchronized void init(String name, long reportingPeriodMillis) {
if (!initialized) {
this.name = StringUtils.notBlankOrDefault(name, "activity-manager");
log.info("Activity manager with name [{}] is initializing.", this.name);
if (reportingPeriodMillis <= 0) {
reportingPeriodMillis = 3000;
log.error("[{}] Negative or zero reporting period millisecond was provided. Going to use reporting period value of 3 seconds.", this.name);
}
scheduler.scheduleAtFixedRate(this::onReportingPeriodEnd, new Random().nextInt((int) reportingPeriodMillis), reportingPeriodMillis, TimeUnit.MILLISECONDS);
initialized = true;
log.info("Activity manager with name [{}] is initialized.", this.name);
}
protected void init() {
var reportingPeriodMillis = getReportingPeriodMillis();
scheduler.scheduleAtFixedRate(this::onReportingPeriodEnd, new Random().nextInt((int) reportingPeriodMillis), reportingPeriodMillis, TimeUnit.MILLISECONDS);
}
protected abstract long getReportingPeriodMillis();
protected abstract ActivityState<Metadata> createNewState(Key key);
protected abstract ActivityStrategy getStrategy();
@ -93,15 +81,11 @@ public abstract class AbstractActivityManager<Key, Metadata> implements Activity
@Override
public void onActivity(Key key) {
if (!initialized) {
log.error("[{}] Failed to process activity event: activity manager is not initialized.", name);
return;
}
if (key == null) {
log.error("[{}] Failed to process activity event: provided activity key is null.", name);
log.error("Failed to process activity event: provided activity key is null.");
return;
}
log.debug("[{}] Received activity event for key: [{}]", name, key);
log.debug("Received activity event for key: [{}]", key);
long newLastRecordedTime = System.currentTimeMillis();
var shouldReport = new AtomicBoolean(false);
@ -131,7 +115,7 @@ public abstract class AbstractActivityManager<Key, Metadata> implements Activity
long lastRecordedTime = activityState.getLastRecordedTime();
long lastReportedTime = activityStateWrapper.getLastReportedTime();
if (shouldReport.get() && lastReportedTime < lastRecordedTime) {
log.debug("[{}] Going to report first activity event for key: [{}].", name, key);
log.debug("Going to report first activity event for key: [{}].", key);
reportActivity(key, activityState.getMetadata(), lastRecordedTime, new ActivityReportCallback<>() {
@Override
public void onSuccess(Key key, long reportedTime) {
@ -140,7 +124,7 @@ public abstract class AbstractActivityManager<Key, Metadata> implements Activity
@Override
public void onFailure(Key key, Throwable t) {
log.debug("[{}] Failed to report first activity event for key: [{}].", name, key, t);
log.debug("Failed to report first activity event for key: [{}].", key, t);
}
});
}
@ -153,7 +137,7 @@ public abstract class AbstractActivityManager<Key, Metadata> implements Activity
}
private void onReportingPeriodEnd() {
log.debug("[{}] Going to end reporting period.", name);
log.debug("Going to end reporting period.");
for (Map.Entry<Key, ActivityStateWrapper> entry : states.entrySet()) {
var key = entry.getKey();
var stateWrapper = entry.getValue();
@ -185,7 +169,7 @@ public abstract class AbstractActivityManager<Key, Metadata> implements Activity
}
if (shouldReport && lastReportedTime < lastRecordedTime) {
log.debug("[{}] Going to report last activity event for key: [{}].", name, key);
log.debug("Going to report last activity event for key: [{}].", key);
reportActivity(key, metadata, lastRecordedTime, new ActivityReportCallback<>() {
@Override
public void onSuccess(Key key, long reportedTime) {
@ -194,7 +178,7 @@ public abstract class AbstractActivityManager<Key, Metadata> implements Activity
@Override
public void onFailure(Key key, Throwable t) {
log.debug("[{}] Failed to report last activity event for key: [{}].", name, key, t);
log.debug("Failed to report last activity event for key: [{}].", key, t);
}
});
}

2
common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/activity/ActivityManager.java

@ -32,8 +32,6 @@ package org.thingsboard.server.common.transport.activity;
public interface ActivityManager<Key> {
void init(String name, long reportingPeriodMillis);
void onActivity(Key key);
long getLastRecordedTime(Key key);

16
common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/DefaultTransportService.java

@ -138,7 +138,6 @@ public class DefaultTransportService extends AbstractActivityManager<UUID, Trans
public static final String OVERWRITE_ACTIVITY_TIME = "overwriteActivityTime";
public static final String SESSION_EXPIRED_MESSAGE = "Session has expired due to last activity time!";
private static final String ACTIVITY_MANAGER_NAME = "transport-activity-manager";
public static final TransportProtos.SessionEventMsg SESSION_EVENT_MSG_OPEN = getSessionEventMsg(TransportProtos.SessionEvent.OPEN);
public static final TransportProtos.SessionEventMsg SESSION_EVENT_MSG_CLOSED = getSessionEventMsg(TransportProtos.SessionEvent.CLOSED);
public static final TransportProtos.SessionCloseNotificationProto SESSION_EXPIRED_NOTIFICATION_PROTO = TransportProtos.SessionCloseNotificationProto.newBuilder()
@ -239,7 +238,7 @@ public class DefaultTransportService extends AbstractActivityManager<UUID, Trans
@PostConstruct
public void init() {
super.init(ACTIVITY_MANAGER_NAME, sessionReportTimeout);
super.init();
this.ruleEngineProducerStats = statsFactory.createMessagesStats(StatsType.RULE_ENGINE.getName() + ".producer");
this.tbCoreProducerStats = statsFactory.createMessagesStats(StatsType.CORE.getName() + ".producer");
this.transportApiStats = statsFactory.createMessagesStats(StatsType.TRANSPORT.getName() + ".producer");
@ -789,6 +788,11 @@ public class DefaultTransportService extends AbstractActivityManager<UUID, Trans
onActivity(toSessionId(sessionInfo));
}
@Override
protected long getReportingPeriodMillis() {
return sessionReportTimeout;
}
@Override
protected ActivityState<TransportProtos.SessionInfoProto> createNewState(UUID sessionId) {
SessionMetaData session = sessions.get(sessionId);
@ -827,9 +831,9 @@ public class DefaultTransportService extends AbstractActivityManager<UUID, Trans
long lastRecordedTime = state.getLastRecordedTime();
long gwLastRecordedTime = getLastRecordedTime(gwSessionId);
log.debug("[{}] Session with id: [{}] has gateway session with id: [{}] with overwrite activity time enabled. " +
log.debug("Session with id: [{}] has gateway session with id: [{}] with overwrite activity time enabled. " +
"Updating last activity time. Session last recorded time: [{}], gateway session last recorded time: [{}].",
name, sessionId, gwSessionId, lastRecordedTime, gwLastRecordedTime);
sessionId, gwSessionId, lastRecordedTime, gwLastRecordedTime);
state.setLastRecordedTime(Math.max(lastRecordedTime, gwLastRecordedTime));
return state;
}
@ -845,7 +849,7 @@ public class DefaultTransportService extends AbstractActivityManager<UUID, Trans
@Override
protected void onStateExpiry(UUID sessionId, TransportProtos.SessionInfoProto sessionInfo) {
log.debug("[{}] Session with id: [{}] has expired due to last activity time.", name, sessionId);
log.debug("Session with id: [{}] has expired due to last activity time.", sessionId);
SessionMetaData expiredSession = sessions.remove(sessionId);
if (expiredSession != null) {
deregisterSession(sessionInfo);
@ -856,7 +860,7 @@ public class DefaultTransportService extends AbstractActivityManager<UUID, Trans
@Override
protected void reportActivity(UUID sessionId, TransportProtos.SessionInfoProto currentSessionInfo, long timeToReport, ActivityReportCallback<UUID> callback) {
log.debug("[{}] Reporting activity state for session with id: [{}]. Time to report: [{}].", name, sessionId, timeToReport);
log.debug("Reporting activity state for session with id: [{}]. Time to report: [{}].", sessionId, timeToReport);
SessionMetaData session = sessions.get(sessionId);
TransportProtos.SubscriptionInfoProto subscriptionInfo = TransportProtos.SubscriptionInfoProto.newBuilder()
.setAttributeSubscription(session != null && session.isSubscribedToAttributes())

Loading…
Cancel
Save