Browse Source

Save time series strategies: ensure Device State Service is notified about inactivity timeout updates

pull/12789/head
Dmytro Skarzhynets 1 year ago
parent
commit
2f22a5e581
No known key found for this signature in database GPG Key ID: 2B51652F224037DF
  1. 4
      application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java
  2. 4
      application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java
  3. 18
      application/src/main/java/org/thingsboard/server/service/queue/DefaultTbCoreConsumerService.java
  4. 8
      application/src/main/java/org/thingsboard/server/service/queue/TbCoreConsumerStats.java
  5. 180
      application/src/main/java/org/thingsboard/server/service/state/DefaultDeviceStateManager.java
  6. 196
      application/src/main/java/org/thingsboard/server/service/state/DefaultRuleEngineDeviceStateManager.java
  7. 3
      application/src/main/java/org/thingsboard/server/service/subscription/DefaultSubscriptionManagerService.java
  8. 36
      application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionService.java
  9. 36
      application/src/test/java/org/thingsboard/server/service/state/DefaultDeviceStateManagerTest.java
  10. 5
      application/src/test/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionServiceTest.java
  11. 9
      common/proto/src/main/proto/queue.proto
  12. 6
      common/util/src/main/java/org/thingsboard/common/util/DonAsynchron.java
  13. 4
      rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/DeviceStateManager.java
  14. 2
      rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbContext.java
  15. 4
      rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/action/TbDeviceStateNode.java
  16. 14
      rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/action/TbDeviceStateNodeTest.java

4
application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java

@ -33,7 +33,7 @@ import org.springframework.stereotype.Component;
import org.thingsboard.common.util.JacksonUtil;
import org.thingsboard.rule.engine.api.MailService;
import org.thingsboard.rule.engine.api.NotificationCenter;
import org.thingsboard.rule.engine.api.RuleEngineDeviceStateManager;
import org.thingsboard.rule.engine.api.DeviceStateManager;
import org.thingsboard.rule.engine.api.SmsService;
import org.thingsboard.rule.engine.api.notification.SlackService;
import org.thingsboard.rule.engine.api.sms.SmsSenderFactory;
@ -206,7 +206,7 @@ public class ActorSystemContext {
@Autowired(required = false)
@Getter
private RuleEngineDeviceStateManager deviceStateManager;
private DeviceStateManager deviceStateManager;
@Autowired
@Getter

4
application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java

@ -29,7 +29,7 @@ import org.thingsboard.rule.engine.api.RuleEngineAlarmService;
import org.thingsboard.rule.engine.api.RuleEngineApiUsageStateService;
import org.thingsboard.rule.engine.api.RuleEngineAssetProfileCache;
import org.thingsboard.rule.engine.api.RuleEngineDeviceProfileCache;
import org.thingsboard.rule.engine.api.RuleEngineDeviceStateManager;
import org.thingsboard.rule.engine.api.DeviceStateManager;
import org.thingsboard.rule.engine.api.RuleEngineRpcService;
import org.thingsboard.rule.engine.api.RuleEngineTelemetryService;
import org.thingsboard.rule.engine.api.ScriptEngine;
@ -724,7 +724,7 @@ public class DefaultTbContext implements TbContext {
}
@Override
public RuleEngineDeviceStateManager getDeviceStateManager() {
public DeviceStateManager getDeviceStateManager() {
return mainCtx.getDeviceStateManager();
}

18
application/src/main/java/org/thingsboard/server/service/queue/DefaultTbCoreConsumerService.java

@ -289,6 +289,9 @@ public class DefaultTbCoreConsumerService extends AbstractConsumerService<ToCore
} else if (toCoreMsg.hasDeviceInactivityMsg()) {
log.trace("[{}] Forwarding message to device state service {}", id, toCoreMsg.getDeviceInactivityMsg());
forwardToStateService(toCoreMsg.getDeviceInactivityMsg(), callback);
} else if (toCoreMsg.hasDeviceInactivityTimeoutUpdateMsg()) {
log.trace("[{}] Forwarding message to device state service {}", id, toCoreMsg.getDeviceInactivityTimeoutUpdateMsg());
forwardToStateService(toCoreMsg.getDeviceInactivityTimeoutUpdateMsg(), callback);
} else if (toCoreMsg.hasToDeviceActorNotification()) {
TbActorMsg actorMsg = ProtoUtils.fromProto(toCoreMsg.getToDeviceActorNotification());
if (actorMsg != null) {
@ -658,6 +661,21 @@ public class DefaultTbCoreConsumerService extends AbstractConsumerService<ToCore
});
}
void forwardToStateService(TransportProtos.DeviceInactivityTimeoutUpdateProto deviceInactivityTimeoutUpdateMsg, TbCallback callback) {
if (statsEnabled) {
stats.log(deviceInactivityTimeoutUpdateMsg);
}
var tenantId = toTenantId(deviceInactivityTimeoutUpdateMsg.getTenantIdMSB(), deviceInactivityTimeoutUpdateMsg.getTenantIdLSB());
var deviceId = new DeviceId(new UUID(deviceInactivityTimeoutUpdateMsg.getDeviceIdMSB(), deviceInactivityTimeoutUpdateMsg.getDeviceIdLSB()));
ListenableFuture<?> future = deviceActivityEventsExecutor.submit(() -> stateService.onDeviceInactivityTimeoutUpdate(tenantId, deviceId, deviceInactivityTimeoutUpdateMsg.getInactivityTimeout()));
DonAsynchron.withCallback(future,
__ -> callback.onSuccess(),
t -> {
log.warn("[{}] Failed to process device inactivity timeout update message for device [{}]", tenantId.getId(), deviceId.getId(), t);
callback.onFailure(t);
});
}
private void forwardToNotificationSchedulerService(TransportProtos.NotificationSchedulerServiceMsg msg, TbCallback callback) {
TenantId tenantId = toTenantId(msg.getTenantIdMSB(), msg.getTenantIdLSB());
NotificationRequestId notificationRequestId = new NotificationRequestId(new UUID(msg.getRequestIdMSB(), msg.getRequestIdLSB()));

8
application/src/main/java/org/thingsboard/server/service/queue/TbCoreConsumerStats.java

@ -40,6 +40,7 @@ public class TbCoreConsumerStats {
public static final String DEVICE_ACTIVITIES = "deviceActivity";
public static final String DEVICE_DISCONNECTS = "deviceDisconnect";
public static final String DEVICE_INACTIVITIES = "deviceInactivity";
public static final String DEVICE_INACTIVITY_TIMEOUT_UPDATES = "deviceInactivityTimeoutUpdate";
public static final String TO_CORE_NF_OTHER = "coreNfOther"; // normally, there is no messages when codebase is fine
public static final String TO_CORE_NF_COMPONENT_LIFECYCLE = "coreNfCompLfcl";
@ -65,6 +66,7 @@ public class TbCoreConsumerStats {
private final StatsCounter deviceActivitiesCounter;
private final StatsCounter deviceDisconnectsCounter;
private final StatsCounter deviceInactivitiesCounter;
private final StatsCounter deviceInactivityTimeoutUpdatesCounter;
private final StatsCounter toCoreNfOtherCounter;
private final StatsCounter toCoreNfComponentLifecycleCounter;
@ -95,6 +97,7 @@ public class TbCoreConsumerStats {
this.deviceActivitiesCounter = register(statsFactory.createStatsCounter(statsKey, DEVICE_ACTIVITIES));
this.deviceDisconnectsCounter = register(statsFactory.createStatsCounter(statsKey, DEVICE_DISCONNECTS));
this.deviceInactivitiesCounter = register(statsFactory.createStatsCounter(statsKey, DEVICE_INACTIVITIES));
this.deviceInactivityTimeoutUpdatesCounter = register(statsFactory.createStatsCounter(statsKey, DEVICE_INACTIVITY_TIMEOUT_UPDATES));
// Core notification counters
this.toCoreNfOtherCounter = register(statsFactory.createStatsCounter(statsKey, TO_CORE_NF_OTHER));
@ -163,6 +166,11 @@ public class TbCoreConsumerStats {
deviceInactivitiesCounter.increment();
}
public void log(TransportProtos.DeviceInactivityTimeoutUpdateProto msg) {
totalCounter.increment();
deviceInactivityTimeoutUpdatesCounter.increment();
}
public void log(TransportProtos.SubscriptionMgrMsgProto msg) {
totalCounter.increment();
subscriptionMsgCounter.increment();

180
application/src/main/java/org/thingsboard/server/service/state/DefaultDeviceStateManager.java

@ -0,0 +1,180 @@
/**
* Copyright © 2016-2025 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.service.state;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import org.thingsboard.rule.engine.api.DeviceStateManager;
import org.thingsboard.server.cluster.TbClusterService;
import org.thingsboard.server.common.data.id.DeviceId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.msg.queue.ServiceType;
import org.thingsboard.server.common.msg.queue.TbCallback;
import org.thingsboard.server.common.msg.queue.TopicPartitionInfo;
import org.thingsboard.server.gen.transport.TransportProtos;
import org.thingsboard.server.queue.common.SimpleTbQueueCallback;
import org.thingsboard.server.queue.discovery.PartitionService;
import org.thingsboard.server.queue.discovery.TbServiceInfoProvider;
import java.util.Optional;
import java.util.function.Consumer;
import java.util.function.Supplier;
@Slf4j
@Service
@RequiredArgsConstructor
public class DefaultDeviceStateManager implements DeviceStateManager {
private final TbServiceInfoProvider serviceInfoProvider;
private final PartitionService partitionService;
private final Optional<DeviceStateService> deviceStateService;
private final TbClusterService clusterService;
@Override
public void onDeviceConnect(TenantId tenantId, DeviceId deviceId, long connectTime, TbCallback callback) {
forwardToDeviceStateService(tenantId, deviceId,
deviceStateService -> {
log.debug("[{}][{}] Forwarding device connect event to local service. Connect time: [{}].", tenantId.getId(), deviceId.getId(), connectTime);
deviceStateService.onDeviceConnect(tenantId, deviceId, connectTime);
},
() -> {
log.debug("[{}][{}] Sending device connect message to core. Connect time: [{}].", tenantId.getId(), deviceId.getId(), connectTime);
var deviceConnectMsg = TransportProtos.DeviceConnectProto.newBuilder()
.setTenantIdMSB(tenantId.getId().getMostSignificantBits())
.setTenantIdLSB(tenantId.getId().getLeastSignificantBits())
.setDeviceIdMSB(deviceId.getId().getMostSignificantBits())
.setDeviceIdLSB(deviceId.getId().getLeastSignificantBits())
.setLastConnectTime(connectTime)
.build();
return TransportProtos.ToCoreMsg.newBuilder()
.setDeviceConnectMsg(deviceConnectMsg)
.build();
}, callback);
}
@Override
public void onDeviceActivity(TenantId tenantId, DeviceId deviceId, long activityTime, TbCallback callback) {
forwardToDeviceStateService(tenantId, deviceId,
deviceStateService -> {
log.debug("[{}][{}] Forwarding device activity event to local service. Activity time: [{}].", tenantId.getId(), deviceId.getId(), activityTime);
deviceStateService.onDeviceActivity(tenantId, deviceId, activityTime);
},
() -> {
log.debug("[{}][{}] Sending device activity message to core. Activity time: [{}].", tenantId.getId(), deviceId.getId(), activityTime);
var deviceActivityMsg = TransportProtos.DeviceActivityProto.newBuilder()
.setTenantIdMSB(tenantId.getId().getMostSignificantBits())
.setTenantIdLSB(tenantId.getId().getLeastSignificantBits())
.setDeviceIdMSB(deviceId.getId().getMostSignificantBits())
.setDeviceIdLSB(deviceId.getId().getLeastSignificantBits())
.setLastActivityTime(activityTime)
.build();
return TransportProtos.ToCoreMsg.newBuilder()
.setDeviceActivityMsg(deviceActivityMsg)
.build();
}, callback);
}
@Override
public void onDeviceDisconnect(TenantId tenantId, DeviceId deviceId, long disconnectTime, TbCallback callback) {
forwardToDeviceStateService(tenantId, deviceId,
deviceStateService -> {
log.debug("[{}][{}] Forwarding device disconnect event to local service. Disconnect time: [{}].", tenantId.getId(), deviceId.getId(), disconnectTime);
deviceStateService.onDeviceDisconnect(tenantId, deviceId, disconnectTime);
},
() -> {
log.debug("[{}][{}] Sending device disconnect message to core. Disconnect time: [{}].", tenantId.getId(), deviceId.getId(), disconnectTime);
var deviceDisconnectMsg = TransportProtos.DeviceDisconnectProto.newBuilder()
.setTenantIdMSB(tenantId.getId().getMostSignificantBits())
.setTenantIdLSB(tenantId.getId().getLeastSignificantBits())
.setDeviceIdMSB(deviceId.getId().getMostSignificantBits())
.setDeviceIdLSB(deviceId.getId().getLeastSignificantBits())
.setLastDisconnectTime(disconnectTime)
.build();
return TransportProtos.ToCoreMsg.newBuilder()
.setDeviceDisconnectMsg(deviceDisconnectMsg)
.build();
}, callback);
}
@Override
public void onDeviceInactivity(TenantId tenantId, DeviceId deviceId, long inactivityTime, TbCallback callback) {
forwardToDeviceStateService(tenantId, deviceId,
deviceStateService -> {
log.debug("[{}][{}] Forwarding device inactivity event to local service. Inactivity time: [{}].", tenantId.getId(), deviceId.getId(), inactivityTime);
deviceStateService.onDeviceInactivity(tenantId, deviceId, inactivityTime);
},
() -> {
log.debug("[{}][{}] Sending device inactivity message to core. Inactivity time: [{}].", tenantId.getId(), deviceId.getId(), inactivityTime);
var deviceInactivityMsg = TransportProtos.DeviceInactivityProto.newBuilder()
.setTenantIdMSB(tenantId.getId().getMostSignificantBits())
.setTenantIdLSB(tenantId.getId().getLeastSignificantBits())
.setDeviceIdMSB(deviceId.getId().getMostSignificantBits())
.setDeviceIdLSB(deviceId.getId().getLeastSignificantBits())
.setLastInactivityTime(inactivityTime)
.build();
return TransportProtos.ToCoreMsg.newBuilder()
.setDeviceInactivityMsg(deviceInactivityMsg)
.build();
}, callback);
}
@Override
public void onDeviceInactivityTimeoutUpdate(TenantId tenantId, DeviceId deviceId, long inactivityTimeout, TbCallback callback) {
forwardToDeviceStateService(tenantId, deviceId,
deviceStateService -> {
log.debug("[{}][{}] Forwarding device inactivity timeout update to local service. Updated inactivity timeout: [{}].", tenantId.getId(), deviceId.getId(), inactivityTimeout);
deviceStateService.onDeviceInactivityTimeoutUpdate(tenantId, deviceId, inactivityTimeout);
},
() -> {
log.debug("[{}][{}] Sending device inactivity timeout update message to core. Updated inactivity timeout: [{}].", tenantId.getId(), deviceId.getId(), inactivityTimeout);
var deviceInactivityTimeoutUpdateMsg = TransportProtos.DeviceInactivityTimeoutUpdateProto.newBuilder()
.setTenantIdMSB(tenantId.getId().getMostSignificantBits())
.setTenantIdLSB(tenantId.getId().getLeastSignificantBits())
.setDeviceIdMSB(deviceId.getId().getMostSignificantBits())
.setDeviceIdLSB(deviceId.getId().getLeastSignificantBits())
.setInactivityTimeout(inactivityTimeout)
.build();
return TransportProtos.ToCoreMsg.newBuilder()
.setDeviceInactivityTimeoutUpdateMsg(deviceInactivityTimeoutUpdateMsg)
.build();
}, callback);
}
private void forwardToDeviceStateService(
TenantId tenantId, DeviceId deviceId,
Consumer<DeviceStateService> toDeviceStateService,
Supplier<TransportProtos.ToCoreMsg> toCore,
TbCallback callback
) {
TopicPartitionInfo tpi = partitionService.resolve(ServiceType.TB_CORE, tenantId, deviceId);
if (serviceInfoProvider.isService(ServiceType.TB_CORE) && tpi.isMyPartition() && deviceStateService.isPresent()) {
try {
toDeviceStateService.accept(deviceStateService.get());
} catch (Exception e) {
log.error("[{}][{}] Failed to process device connectivity event.", tenantId.getId(), deviceId.getId(), e);
callback.onFailure(e);
return;
}
callback.onSuccess();
} else {
TransportProtos.ToCoreMsg toCoreMsg = toCore.get();
clusterService.pushMsgToCore(tpi, deviceId.getId(), toCoreMsg, new SimpleTbQueueCallback(__ -> callback.onSuccess(), callback::onFailure));
}
}
}

196
application/src/main/java/org/thingsboard/server/service/state/DefaultRuleEngineDeviceStateManager.java

@ -1,196 +0,0 @@
/**
* Copyright © 2016-2025 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.service.state;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import org.thingsboard.rule.engine.api.RuleEngineDeviceStateManager;
import org.thingsboard.server.cluster.TbClusterService;
import org.thingsboard.server.common.data.id.DeviceId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.msg.queue.ServiceType;
import org.thingsboard.server.common.msg.queue.TbCallback;
import org.thingsboard.server.common.msg.queue.TopicPartitionInfo;
import org.thingsboard.server.gen.transport.TransportProtos;
import org.thingsboard.server.queue.common.SimpleTbQueueCallback;
import org.thingsboard.server.queue.discovery.PartitionService;
import org.thingsboard.server.queue.discovery.TbServiceInfoProvider;
import org.thingsboard.server.queue.util.TbRuleEngineComponent;
import java.util.Optional;
import java.util.UUID;
@Slf4j
@Service
@TbRuleEngineComponent
public class DefaultRuleEngineDeviceStateManager implements RuleEngineDeviceStateManager {
private final TbServiceInfoProvider serviceInfoProvider;
private final PartitionService partitionService;
private final Optional<DeviceStateService> deviceStateService;
private final TbClusterService clusterService;
public DefaultRuleEngineDeviceStateManager(
TbServiceInfoProvider serviceInfoProvider, PartitionService partitionService,
Optional<DeviceStateService> deviceStateServiceOptional, TbClusterService clusterService
) {
this.serviceInfoProvider = serviceInfoProvider;
this.partitionService = partitionService;
this.deviceStateService = deviceStateServiceOptional;
this.clusterService = clusterService;
}
@Getter
private abstract static class ConnectivityEventInfo {
private final TenantId tenantId;
private final DeviceId deviceId;
private final long eventTime;
private ConnectivityEventInfo(TenantId tenantId, DeviceId deviceId, long eventTime) {
this.tenantId = tenantId;
this.deviceId = deviceId;
this.eventTime = eventTime;
}
abstract void forwardToLocalService();
abstract TransportProtos.ToCoreMsg toQueueMsg();
}
@Override
public void onDeviceConnect(TenantId tenantId, DeviceId deviceId, long connectTime, TbCallback callback) {
routeEvent(new ConnectivityEventInfo(tenantId, deviceId, connectTime) {
@Override
void forwardToLocalService() {
deviceStateService.ifPresent(service -> service.onDeviceConnect(tenantId, deviceId, connectTime));
}
@Override
TransportProtos.ToCoreMsg toQueueMsg() {
var deviceConnectMsg = TransportProtos.DeviceConnectProto.newBuilder()
.setTenantIdMSB(tenantId.getId().getMostSignificantBits())
.setTenantIdLSB(tenantId.getId().getLeastSignificantBits())
.setDeviceIdMSB(deviceId.getId().getMostSignificantBits())
.setDeviceIdLSB(deviceId.getId().getLeastSignificantBits())
.setLastConnectTime(connectTime)
.build();
return TransportProtos.ToCoreMsg.newBuilder()
.setDeviceConnectMsg(deviceConnectMsg)
.build();
}
}, callback);
}
@Override
public void onDeviceActivity(TenantId tenantId, DeviceId deviceId, long activityTime, TbCallback callback) {
routeEvent(new ConnectivityEventInfo(tenantId, deviceId, activityTime) {
@Override
void forwardToLocalService() {
deviceStateService.ifPresent(service -> service.onDeviceActivity(tenantId, deviceId, activityTime));
}
@Override
TransportProtos.ToCoreMsg toQueueMsg() {
var deviceActivityMsg = TransportProtos.DeviceActivityProto.newBuilder()
.setTenantIdMSB(tenantId.getId().getMostSignificantBits())
.setTenantIdLSB(tenantId.getId().getLeastSignificantBits())
.setDeviceIdMSB(deviceId.getId().getMostSignificantBits())
.setDeviceIdLSB(deviceId.getId().getLeastSignificantBits())
.setLastActivityTime(activityTime)
.build();
return TransportProtos.ToCoreMsg.newBuilder()
.setDeviceActivityMsg(deviceActivityMsg)
.build();
}
}, callback);
}
@Override
public void onDeviceDisconnect(TenantId tenantId, DeviceId deviceId, long disconnectTime, TbCallback callback) {
routeEvent(new ConnectivityEventInfo(tenantId, deviceId, disconnectTime) {
@Override
void forwardToLocalService() {
deviceStateService.ifPresent(service -> service.onDeviceDisconnect(tenantId, deviceId, disconnectTime));
}
@Override
TransportProtos.ToCoreMsg toQueueMsg() {
var deviceDisconnectMsg = TransportProtos.DeviceDisconnectProto.newBuilder()
.setTenantIdMSB(tenantId.getId().getMostSignificantBits())
.setTenantIdLSB(tenantId.getId().getLeastSignificantBits())
.setDeviceIdMSB(deviceId.getId().getMostSignificantBits())
.setDeviceIdLSB(deviceId.getId().getLeastSignificantBits())
.setLastDisconnectTime(disconnectTime)
.build();
return TransportProtos.ToCoreMsg.newBuilder()
.setDeviceDisconnectMsg(deviceDisconnectMsg)
.build();
}
}, callback);
}
@Override
public void onDeviceInactivity(TenantId tenantId, DeviceId deviceId, long inactivityTime, TbCallback callback) {
routeEvent(new ConnectivityEventInfo(tenantId, deviceId, inactivityTime) {
@Override
void forwardToLocalService() {
deviceStateService.ifPresent(service -> service.onDeviceInactivity(tenantId, deviceId, inactivityTime));
}
@Override
TransportProtos.ToCoreMsg toQueueMsg() {
var deviceInactivityMsg = TransportProtos.DeviceInactivityProto.newBuilder()
.setTenantIdMSB(tenantId.getId().getMostSignificantBits())
.setTenantIdLSB(tenantId.getId().getLeastSignificantBits())
.setDeviceIdMSB(deviceId.getId().getMostSignificantBits())
.setDeviceIdLSB(deviceId.getId().getLeastSignificantBits())
.setLastInactivityTime(inactivityTime)
.build();
return TransportProtos.ToCoreMsg.newBuilder()
.setDeviceInactivityMsg(deviceInactivityMsg)
.build();
}
}, callback);
}
private void routeEvent(ConnectivityEventInfo eventInfo, TbCallback callback) {
var tenantId = eventInfo.getTenantId();
var deviceId = eventInfo.getDeviceId();
long eventTime = eventInfo.getEventTime();
TopicPartitionInfo tpi = partitionService.resolve(ServiceType.TB_CORE, tenantId, deviceId);
if (serviceInfoProvider.isService(ServiceType.TB_CORE) && tpi.isMyPartition() && deviceStateService.isPresent()) {
log.debug("[{}][{}] Forwarding device connectivity event to local service. Event time: [{}].", tenantId.getId(), deviceId.getId(), eventTime);
try {
eventInfo.forwardToLocalService();
} catch (Exception e) {
log.error("[{}][{}] Failed to process device connectivity event. Event time: [{}].", tenantId.getId(), deviceId.getId(), eventTime, e);
callback.onFailure(e);
return;
}
callback.onSuccess();
} else {
TransportProtos.ToCoreMsg msg = eventInfo.toQueueMsg();
log.debug("[{}][{}] Sending device connectivity message to core. Event time: [{}].", tenantId.getId(), deviceId.getId(), eventTime);
clusterService.pushMsgToCore(tpi, UUID.randomUUID(), msg, new SimpleTbQueueCallback(__ -> callback.onSuccess(), callback::onFailure));
}
}
}

3
application/src/main/java/org/thingsboard/server/service/subscription/DefaultSubscriptionManagerService.java

@ -161,9 +161,6 @@ public class DefaultSubscriptionManagerService extends TbApplicationEventListene
@Override
public void onTimeSeriesUpdate(TenantId tenantId, EntityId entityId, List<TsKvEntry> ts, TbCallback callback) {
onTimeSeriesUpdate(entityId, ts);
if (entityId.getEntityType() == EntityType.DEVICE) {
updateDeviceInactivityTimeout(tenantId, entityId, ts);
}
callback.onSuccess();
}

36
application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionService.java

@ -31,6 +31,7 @@ import org.thingsboard.common.util.DonAsynchron;
import org.thingsboard.common.util.ThingsBoardThreadFactory;
import org.thingsboard.rule.engine.api.AttributesDeleteRequest;
import org.thingsboard.rule.engine.api.AttributesSaveRequest;
import org.thingsboard.rule.engine.api.DeviceStateManager;
import org.thingsboard.rule.engine.api.RuleEngineTelemetryService;
import org.thingsboard.rule.engine.api.TimeseriesDeleteRequest;
import org.thingsboard.rule.engine.api.TimeseriesSaveRequest;
@ -38,9 +39,11 @@ import org.thingsboard.server.common.data.ApiUsageRecordKey;
import org.thingsboard.server.common.data.EntityType;
import org.thingsboard.server.common.data.EntityView;
import org.thingsboard.server.common.data.id.CustomerId;
import org.thingsboard.server.common.data.id.DeviceId;
import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.kv.AttributeKvEntry;
import org.thingsboard.server.common.data.kv.KvEntry;
import org.thingsboard.server.common.data.kv.TsKvEntry;
import org.thingsboard.server.common.data.kv.TsKvLatestRemovingResult;
import org.thingsboard.server.common.msg.queue.TbCallback;
@ -50,6 +53,7 @@ import org.thingsboard.server.dao.timeseries.TimeseriesService;
import org.thingsboard.server.dao.util.KvUtils;
import org.thingsboard.server.service.apiusage.TbApiUsageStateService;
import org.thingsboard.server.service.entitiy.entityview.TbEntityViewService;
import org.thingsboard.server.service.state.DefaultDeviceStateService;
import org.thingsboard.server.service.subscription.TbSubscriptionUtils;
import java.util.ArrayList;
@ -75,6 +79,7 @@ public class DefaultTelemetrySubscriptionService extends AbstractSubscriptionSer
private final TbEntityViewService tbEntityViewService;
private final TbApiUsageReportClient apiUsageClient;
private final TbApiUsageStateService apiUsageStateService;
private final DeviceStateManager deviceStateManager;
private ExecutorService tsCallBackExecutor;
@ -85,12 +90,14 @@ public class DefaultTelemetrySubscriptionService extends AbstractSubscriptionSer
TimeseriesService tsService,
@Lazy TbEntityViewService tbEntityViewService,
TbApiUsageReportClient apiUsageClient,
TbApiUsageStateService apiUsageStateService) {
TbApiUsageStateService apiUsageStateService,
DeviceStateManager deviceStateManager) {
this.attrService = attrService;
this.tsService = tsService;
this.tbEntityViewService = tbEntityViewService;
this.apiUsageClient = apiUsageClient;
this.apiUsageStateService = apiUsageStateService;
this.deviceStateManager = deviceStateManager;
}
@PostConstruct
@ -146,6 +153,14 @@ public class DefaultTelemetrySubscriptionService extends AbstractSubscriptionSer
saveFuture = Futures.immediateFuture(0);
}
if (entityId.getEntityType() == EntityType.DEVICE && request.getStrategy().saveLatest()) {
findNewInactivityTimeout(request.getEntries()).ifPresent(newInactivityTimeout ->
addMainCallback(saveFuture, __ -> deviceStateManager.onDeviceInactivityTimeoutUpdate(
tenantId, new DeviceId(entityId.getId()), newInactivityTimeout, TbCallback.EMPTY)
)
);
}
addMainCallback(saveFuture, request.getCallback());
if (strategy.sendWsUpdate()) {
addWsCallback(saveFuture, success -> onTimeSeriesUpdate(tenantId, entityId, request.getEntries()));
@ -156,6 +171,21 @@ public class DefaultTelemetrySubscriptionService extends AbstractSubscriptionSer
return saveFuture;
}
private static Optional<Long> findNewInactivityTimeout(List<TsKvEntry> entries) {
return entries.stream()
.filter(entry -> Objects.equals(DefaultDeviceStateService.INACTIVITY_TIMEOUT, entry.getKey()))
.findFirst()
.map(DefaultTelemetrySubscriptionService::parseAsLong);
}
private static long parseAsLong(KvEntry kve) {
try {
return Long.parseLong(kve.getValueAsString());
} catch (NumberFormatException e) {
return 0L;
}
}
@Override
public void saveAttributes(AttributesSaveRequest request) {
checkInternalEntity(request.getEntityId());
@ -312,6 +342,10 @@ public class DefaultTelemetrySubscriptionService extends AbstractSubscriptionSer
addMainCallback(saveFuture, result -> callback.onSuccess(null), callback::onFailure);
}
private <S> void addMainCallback(ListenableFuture<S> saveFuture, Consumer<S> onSuccess) {
addMainCallback(saveFuture, onSuccess, null);
}
private <S> void addMainCallback(ListenableFuture<S> saveFuture, Consumer<S> onSuccess, Consumer<Throwable> onFailure) {
DonAsynchron.withCallback(saveFuture, onSuccess, onFailure, tsCallBackExecutor);
}

36
application/src/test/java/org/thingsboard/server/service/state/DefaultRuleEngineDeviceStateManagerTest.java → application/src/test/java/org/thingsboard/server/service/state/DefaultDeviceStateManagerTest.java

@ -52,7 +52,7 @@ import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.never;
@ExtendWith(MockitoExtension.class)
public class DefaultRuleEngineDeviceStateManagerTest {
public class DefaultDeviceStateManagerTest {
@Mock
private DeviceStateService deviceStateServiceMock;
@ -71,7 +71,7 @@ public class DefaultRuleEngineDeviceStateManagerTest {
@Captor
private ArgumentCaptor<TbQueueCallback> queueCallbackCaptor;
private DefaultRuleEngineDeviceStateManager deviceStateManager;
private DefaultDeviceStateManager deviceStateManager;
private static final TenantId TENANT_ID = TenantId.fromUUID(UUID.fromString("57ab2e6c-bc4c-11ee-a506-0242ac120002"));
private static final DeviceId DEVICE_ID = DeviceId.fromString("74a9053e-bc4c-11ee-a506-0242ac120002");
@ -82,7 +82,7 @@ public class DefaultRuleEngineDeviceStateManagerTest {
@BeforeEach
public void setup() {
deviceStateManager = new DefaultRuleEngineDeviceStateManager(serviceInfoProviderMock, partitionServiceMock, Optional.of(deviceStateServiceMock), clusterServiceMock);
deviceStateManager = new DefaultDeviceStateManager(serviceInfoProviderMock, partitionServiceMock, Optional.of(deviceStateServiceMock), clusterServiceMock);
}
@ParameterizedTest
@ -90,7 +90,7 @@ public class DefaultRuleEngineDeviceStateManagerTest {
"when onDeviceX() is called, then should route event to local service and call onSuccess() callback.")
@MethodSource
public void givenRoutedToLocalAndProcessingSuccess_whenOnDeviceAction_thenShouldCallLocalServiceAndSuccessCallback(
BiConsumer<DefaultRuleEngineDeviceStateManager, TbCallback> onDeviceAction, Consumer<DeviceStateService> actionVerification
BiConsumer<DefaultDeviceStateManager, TbCallback> onDeviceAction, Consumer<DeviceStateService> actionVerification
) {
// GIVEN
given(serviceInfoProviderMock.isService(ServiceType.TB_CORE)).willReturn(true);
@ -109,19 +109,19 @@ public class DefaultRuleEngineDeviceStateManagerTest {
private static Stream<Arguments> givenRoutedToLocalAndProcessingSuccess_whenOnDeviceAction_thenShouldCallLocalServiceAndSuccessCallback() {
return Stream.of(
Arguments.of(
(BiConsumer<DefaultRuleEngineDeviceStateManager, TbCallback>) (deviceStateManager, tbCallbackMock) -> deviceStateManager.onDeviceConnect(TENANT_ID, DEVICE_ID, EVENT_TS, tbCallbackMock),
(BiConsumer<DefaultDeviceStateManager, TbCallback>) (deviceStateManager, tbCallbackMock) -> deviceStateManager.onDeviceConnect(TENANT_ID, DEVICE_ID, EVENT_TS, tbCallbackMock),
(Consumer<DeviceStateService>) deviceStateServiceMock -> then(deviceStateServiceMock).should().onDeviceConnect(TENANT_ID, DEVICE_ID, EVENT_TS)
),
Arguments.of(
(BiConsumer<DefaultRuleEngineDeviceStateManager, TbCallback>) (deviceStateManager, tbCallbackMock) -> deviceStateManager.onDeviceActivity(TENANT_ID, DEVICE_ID, EVENT_TS, tbCallbackMock),
(BiConsumer<DefaultDeviceStateManager, TbCallback>) (deviceStateManager, tbCallbackMock) -> deviceStateManager.onDeviceActivity(TENANT_ID, DEVICE_ID, EVENT_TS, tbCallbackMock),
(Consumer<DeviceStateService>) deviceStateServiceMock -> then(deviceStateServiceMock).should().onDeviceActivity(TENANT_ID, DEVICE_ID, EVENT_TS)
),
Arguments.of(
(BiConsumer<DefaultRuleEngineDeviceStateManager, TbCallback>) (deviceStateManager, tbCallbackMock) -> deviceStateManager.onDeviceDisconnect(TENANT_ID, DEVICE_ID, EVENT_TS, tbCallbackMock),
(BiConsumer<DefaultDeviceStateManager, TbCallback>) (deviceStateManager, tbCallbackMock) -> deviceStateManager.onDeviceDisconnect(TENANT_ID, DEVICE_ID, EVENT_TS, tbCallbackMock),
(Consumer<DeviceStateService>) deviceStateServiceMock -> then(deviceStateServiceMock).should().onDeviceDisconnect(TENANT_ID, DEVICE_ID, EVENT_TS)
),
Arguments.of(
(BiConsumer<DefaultRuleEngineDeviceStateManager, TbCallback>) (deviceStateManager, tbCallbackMock) -> deviceStateManager.onDeviceInactivity(TENANT_ID, DEVICE_ID, EVENT_TS, tbCallbackMock),
(BiConsumer<DefaultDeviceStateManager, TbCallback>) (deviceStateManager, tbCallbackMock) -> deviceStateManager.onDeviceInactivity(TENANT_ID, DEVICE_ID, EVENT_TS, tbCallbackMock),
(Consumer<DeviceStateService>) deviceStateServiceMock -> then(deviceStateServiceMock).should().onDeviceInactivity(TENANT_ID, DEVICE_ID, EVENT_TS)
)
);
@ -132,7 +132,7 @@ public class DefaultRuleEngineDeviceStateManagerTest {
"when onDeviceX() is called, then should route event to local service and call onFailure() callback.")
@MethodSource
public void givenRoutedToLocalAndProcessingFailure_whenOnDeviceAction_thenShouldCallLocalServiceAndFailureCallback(
Consumer<DeviceStateService> exceptionThrowSetup, BiConsumer<DefaultRuleEngineDeviceStateManager, TbCallback> onDeviceAction, Consumer<DeviceStateService> actionVerification
Consumer<DeviceStateService> exceptionThrowSetup, BiConsumer<DefaultDeviceStateManager, TbCallback> onDeviceAction, Consumer<DeviceStateService> actionVerification
) {
// GIVEN
given(serviceInfoProviderMock.isService(ServiceType.TB_CORE)).willReturn(true);
@ -155,22 +155,22 @@ public class DefaultRuleEngineDeviceStateManagerTest {
return Stream.of(
Arguments.of(
(Consumer<DeviceStateService>) deviceStateServiceMock -> doThrow(RUNTIME_EXCEPTION).when(deviceStateServiceMock).onDeviceConnect(TENANT_ID, DEVICE_ID, EVENT_TS),
(BiConsumer<DefaultRuleEngineDeviceStateManager, TbCallback>) (deviceStateManager, tbCallbackMock) -> deviceStateManager.onDeviceConnect(TENANT_ID, DEVICE_ID, EVENT_TS, tbCallbackMock),
(BiConsumer<DefaultDeviceStateManager, TbCallback>) (deviceStateManager, tbCallbackMock) -> deviceStateManager.onDeviceConnect(TENANT_ID, DEVICE_ID, EVENT_TS, tbCallbackMock),
(Consumer<DeviceStateService>) deviceStateServiceMock -> then(deviceStateServiceMock).should().onDeviceConnect(TENANT_ID, DEVICE_ID, EVENT_TS)
),
Arguments.of(
(Consumer<DeviceStateService>) deviceStateServiceMock -> doThrow(RUNTIME_EXCEPTION).when(deviceStateServiceMock).onDeviceActivity(TENANT_ID, DEVICE_ID, EVENT_TS),
(BiConsumer<DefaultRuleEngineDeviceStateManager, TbCallback>) (deviceStateManager, tbCallbackMock) -> deviceStateManager.onDeviceActivity(TENANT_ID, DEVICE_ID, EVENT_TS, tbCallbackMock),
(BiConsumer<DefaultDeviceStateManager, TbCallback>) (deviceStateManager, tbCallbackMock) -> deviceStateManager.onDeviceActivity(TENANT_ID, DEVICE_ID, EVENT_TS, tbCallbackMock),
(Consumer<DeviceStateService>) deviceStateServiceMock -> then(deviceStateServiceMock).should().onDeviceActivity(TENANT_ID, DEVICE_ID, EVENT_TS)
),
Arguments.of(
(Consumer<DeviceStateService>) deviceStateServiceMock -> doThrow(RUNTIME_EXCEPTION).when(deviceStateServiceMock).onDeviceDisconnect(TENANT_ID, DEVICE_ID, EVENT_TS),
(BiConsumer<DefaultRuleEngineDeviceStateManager, TbCallback>) (deviceStateManager, tbCallbackMock) -> deviceStateManager.onDeviceDisconnect(TENANT_ID, DEVICE_ID, EVENT_TS, tbCallbackMock),
(BiConsumer<DefaultDeviceStateManager, TbCallback>) (deviceStateManager, tbCallbackMock) -> deviceStateManager.onDeviceDisconnect(TENANT_ID, DEVICE_ID, EVENT_TS, tbCallbackMock),
(Consumer<DeviceStateService>) deviceStateServiceMock -> then(deviceStateServiceMock).should().onDeviceDisconnect(TENANT_ID, DEVICE_ID, EVENT_TS)
),
Arguments.of(
(Consumer<DeviceStateService>) deviceStateServiceMock -> doThrow(RUNTIME_EXCEPTION).when(deviceStateServiceMock).onDeviceInactivity(TENANT_ID, DEVICE_ID, EVENT_TS),
(BiConsumer<DefaultRuleEngineDeviceStateManager, TbCallback>) (deviceStateManager, tbCallbackMock) -> deviceStateManager.onDeviceInactivity(TENANT_ID, DEVICE_ID, EVENT_TS, tbCallbackMock),
(BiConsumer<DefaultDeviceStateManager, TbCallback>) (deviceStateManager, tbCallbackMock) -> deviceStateManager.onDeviceInactivity(TENANT_ID, DEVICE_ID, EVENT_TS, tbCallbackMock),
(Consumer<DeviceStateService>) deviceStateServiceMock -> then(deviceStateServiceMock).should().onDeviceInactivity(TENANT_ID, DEVICE_ID, EVENT_TS)
)
);
@ -181,7 +181,7 @@ public class DefaultRuleEngineDeviceStateManagerTest {
"when onDeviceX() is called, then should send correct queue message to external service with correct callback object.")
@MethodSource
public void givenRoutedToExternal_whenOnDeviceAction_thenShouldSendQueueMsgToExternalServiceWithCorrectCallback(
BiConsumer<DefaultRuleEngineDeviceStateManager, TbCallback> onDeviceAction, BiConsumer<TbClusterService, ArgumentCaptor<TbQueueCallback>> actionVerification
BiConsumer<DefaultDeviceStateManager, TbCallback> onDeviceAction, BiConsumer<TbClusterService, ArgumentCaptor<TbQueueCallback>> actionVerification
) {
// WHEN
ReflectionTestUtils.setField(deviceStateManager, "deviceStateService", Optional.empty());
@ -203,7 +203,7 @@ public class DefaultRuleEngineDeviceStateManagerTest {
private static Stream<Arguments> givenRoutedToExternal_whenOnDeviceAction_thenShouldSendQueueMsgToExternalServiceWithCorrectCallback() {
return Stream.of(
Arguments.of(
(BiConsumer<DefaultRuleEngineDeviceStateManager, TbCallback>) (deviceStateManager, tbCallbackMock) -> deviceStateManager.onDeviceConnect(TENANT_ID, DEVICE_ID, EVENT_TS, tbCallbackMock),
(BiConsumer<DefaultDeviceStateManager, TbCallback>) (deviceStateManager, tbCallbackMock) -> deviceStateManager.onDeviceConnect(TENANT_ID, DEVICE_ID, EVENT_TS, tbCallbackMock),
(BiConsumer<TbClusterService, ArgumentCaptor<TbQueueCallback>>) (clusterServiceMock, queueCallbackCaptor) -> {
var deviceConnectMsg = TransportProtos.DeviceConnectProto.newBuilder()
.setTenantIdMSB(TENANT_ID.getId().getMostSignificantBits())
@ -219,7 +219,7 @@ public class DefaultRuleEngineDeviceStateManagerTest {
}
),
Arguments.of(
(BiConsumer<DefaultRuleEngineDeviceStateManager, TbCallback>) (deviceStateManager, tbCallbackMock) -> deviceStateManager.onDeviceActivity(TENANT_ID, DEVICE_ID, EVENT_TS, tbCallbackMock),
(BiConsumer<DefaultDeviceStateManager, TbCallback>) (deviceStateManager, tbCallbackMock) -> deviceStateManager.onDeviceActivity(TENANT_ID, DEVICE_ID, EVENT_TS, tbCallbackMock),
(BiConsumer<TbClusterService, ArgumentCaptor<TbQueueCallback>>) (clusterServiceMock, queueCallbackCaptor) -> {
var deviceActivityMsg = TransportProtos.DeviceActivityProto.newBuilder()
.setTenantIdMSB(TENANT_ID.getId().getMostSignificantBits())
@ -235,7 +235,7 @@ public class DefaultRuleEngineDeviceStateManagerTest {
}
),
Arguments.of(
(BiConsumer<DefaultRuleEngineDeviceStateManager, TbCallback>) (deviceStateManager, tbCallbackMock) -> deviceStateManager.onDeviceDisconnect(TENANT_ID, DEVICE_ID, EVENT_TS, tbCallbackMock),
(BiConsumer<DefaultDeviceStateManager, TbCallback>) (deviceStateManager, tbCallbackMock) -> deviceStateManager.onDeviceDisconnect(TENANT_ID, DEVICE_ID, EVENT_TS, tbCallbackMock),
(BiConsumer<TbClusterService, ArgumentCaptor<TbQueueCallback>>) (clusterServiceMock, queueCallbackCaptor) -> {
var deviceDisconnectMsg = TransportProtos.DeviceDisconnectProto.newBuilder()
.setTenantIdMSB(TENANT_ID.getId().getMostSignificantBits())
@ -251,7 +251,7 @@ public class DefaultRuleEngineDeviceStateManagerTest {
}
),
Arguments.of(
(BiConsumer<DefaultRuleEngineDeviceStateManager, TbCallback>) (deviceStateManager, tbCallbackMock) -> deviceStateManager.onDeviceInactivity(TENANT_ID, DEVICE_ID, EVENT_TS, tbCallbackMock),
(BiConsumer<DefaultDeviceStateManager, TbCallback>) (deviceStateManager, tbCallbackMock) -> deviceStateManager.onDeviceInactivity(TENANT_ID, DEVICE_ID, EVENT_TS, tbCallbackMock),
(BiConsumer<TbClusterService, ArgumentCaptor<TbQueueCallback>>) (clusterServiceMock, queueCallbackCaptor) -> {
var deviceInactivityMsg = TransportProtos.DeviceInactivityProto.newBuilder()
.setTenantIdMSB(TENANT_ID.getId().getMostSignificantBits())

5
application/src/test/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionServiceTest.java

@ -29,6 +29,7 @@ import org.junit.jupiter.params.provider.MethodSource;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import org.springframework.test.util.ReflectionTestUtils;
import org.thingsboard.rule.engine.api.DeviceStateManager;
import org.thingsboard.rule.engine.api.TimeseriesSaveRequest;
import org.thingsboard.server.cluster.TbClusterService;
import org.thingsboard.server.common.data.ApiUsageRecordKey;
@ -125,12 +126,14 @@ class DefaultTelemetrySubscriptionServiceTest {
TbApiUsageReportClient apiUsageClient;
@Mock
TbApiUsageStateService apiUsageStateService;
@Mock
DeviceStateManager deviceStateManager;
DefaultTelemetrySubscriptionService telemetryService;
@BeforeEach
void setup() {
telemetryService = new DefaultTelemetrySubscriptionService(attrService, tsService, tbEntityViewService, apiUsageClient, apiUsageStateService);
telemetryService = new DefaultTelemetrySubscriptionService(attrService, tsService, tbEntityViewService, apiUsageClient, apiUsageStateService, deviceStateManager);
ReflectionTestUtils.setField(telemetryService, "clusterService", clusterService);
ReflectionTestUtils.setField(telemetryService, "partitionService", partitionService);
ReflectionTestUtils.setField(telemetryService, "subscriptionManagerService", Optional.of(subscriptionManagerService));

9
common/proto/src/main/proto/queue.proto

@ -772,6 +772,14 @@ message DeviceInactivityProto {
int64 lastInactivityTime = 5;
}
message DeviceInactivityTimeoutUpdateProto {
int64 tenantIdMSB = 1;
int64 tenantIdLSB = 2;
int64 deviceIdMSB = 3;
int64 deviceIdLSB = 4;
int64 inactivityTimeout = 5;
}
//Used to report session state to tb-Service and persist this state in the cache on the tb-Service level.
message SubscriptionInfoProto {
int64 lastActivityTime = 1;
@ -1515,6 +1523,7 @@ message ToCoreMsg {
DeviceConnectProto deviceConnectMsg = 50;
DeviceDisconnectProto deviceDisconnectMsg = 51;
DeviceInactivityProto deviceInactivityMsg = 52;
DeviceInactivityTimeoutUpdateProto deviceInactivityTimeoutUpdateMsg = 53;
}
/* High priority messages with low latency are handled by ThingsBoard Core Service separately */

6
common/util/src/main/java/org/thingsboard/common/util/DonAsynchron.java

@ -36,6 +36,9 @@ public class DonAsynchron {
FutureCallback<T> callback = new FutureCallback<T>() {
@Override
public void onSuccess(T result) {
if (onSuccess == null) {
return;
}
try {
onSuccess.accept(result);
} catch (Throwable th) {
@ -45,6 +48,9 @@ public class DonAsynchron {
@Override
public void onFailure(Throwable t) {
if (onFailure == null) {
return;
}
onFailure.accept(t);
}
};

4
rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/RuleEngineDeviceStateManager.java → rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/DeviceStateManager.java

@ -19,7 +19,7 @@ import org.thingsboard.server.common.data.id.DeviceId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.msg.queue.TbCallback;
public interface RuleEngineDeviceStateManager {
public interface DeviceStateManager {
void onDeviceConnect(TenantId tenantId, DeviceId deviceId, long connectTime, TbCallback callback);
@ -29,4 +29,6 @@ public interface RuleEngineDeviceStateManager {
void onDeviceInactivity(TenantId tenantId, DeviceId deviceId, long inactivityTime, TbCallback callback);
void onDeviceInactivityTimeoutUpdate(TenantId tenantId, DeviceId deviceId, long inactivityTimeout, TbCallback callback);
}

2
rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbContext.java

@ -279,7 +279,7 @@ public interface TbContext {
DeviceCredentialsService getDeviceCredentialsService();
RuleEngineDeviceStateManager getDeviceStateManager();
DeviceStateManager getDeviceStateManager();
String getDeviceStateNodeRateLimitConfig();

4
rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/action/TbDeviceStateNode.java

@ -17,7 +17,7 @@ package org.thingsboard.rule.engine.action;
import lombok.extern.slf4j.Slf4j;
import org.springframework.util.ConcurrentReferenceHashMap;
import org.thingsboard.rule.engine.api.RuleEngineDeviceStateManager;
import org.thingsboard.rule.engine.api.DeviceStateManager;
import org.thingsboard.rule.engine.api.RuleNode;
import org.thingsboard.rule.engine.api.TbContext;
import org.thingsboard.rule.engine.api.TbNode;
@ -119,7 +119,7 @@ public class TbDeviceStateNode implements TbNode {
TenantId tenantId = ctx.getTenantId();
long eventTs = msg.getMetaDataTs();
RuleEngineDeviceStateManager deviceStateManager = ctx.getDeviceStateManager();
DeviceStateManager deviceStateManager = ctx.getDeviceStateManager();
TbCallback callback = getMsgEnqueuedCallback(ctx, msg);
switch (event) {

14
rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/action/TbDeviceStateNodeTest.java

@ -29,7 +29,7 @@ import org.mockito.junit.jupiter.MockitoExtension;
import org.springframework.test.util.ReflectionTestUtils;
import org.springframework.util.ConcurrentReferenceHashMap;
import org.thingsboard.common.util.JacksonUtil;
import org.thingsboard.rule.engine.api.RuleEngineDeviceStateManager;
import org.thingsboard.rule.engine.api.DeviceStateManager;
import org.thingsboard.rule.engine.api.TbContext;
import org.thingsboard.rule.engine.api.TbNodeConfiguration;
import org.thingsboard.rule.engine.api.TbNodeException;
@ -66,7 +66,7 @@ public class TbDeviceStateNodeTest {
@Mock
private TbContext ctxMock;
@Mock
private RuleEngineDeviceStateManager deviceStateManagerMock;
private DeviceStateManager deviceStateManagerMock;
@Captor
private ArgumentCaptor<TbCallback> callbackCaptor;
private TbDeviceStateNode node;
@ -263,7 +263,7 @@ public class TbDeviceStateNodeTest {
@ParameterizedTest
@MethodSource
public void givenSupportedEventAndDeviceOriginator_whenOnMsg_thenCorrectEventIsSentWithCorrectCallback(TbMsgType supportedEventType, BiConsumer<RuleEngineDeviceStateManager, ArgumentCaptor<TbCallback>> actionVerification) {
public void givenSupportedEventAndDeviceOriginator_whenOnMsg_thenCorrectEventIsSentWithCorrectCallback(TbMsgType supportedEventType, BiConsumer<DeviceStateManager, ArgumentCaptor<TbCallback>> actionVerification) {
// GIVEN
given(ctxMock.getTenantId()).willReturn(TENANT_ID);
given(ctxMock.getDeviceStateNodeRateLimitConfig()).willReturn("1:1");
@ -297,10 +297,10 @@ public class TbDeviceStateNodeTest {
private static Stream<Arguments> givenSupportedEventAndDeviceOriginator_whenOnMsg_thenCorrectEventIsSentWithCorrectCallback() {
return Stream.of(
Arguments.of(TbMsgType.CONNECT_EVENT, (BiConsumer<RuleEngineDeviceStateManager, ArgumentCaptor<TbCallback>>) (deviceStateManagerMock, callbackCaptor) -> then(deviceStateManagerMock).should().onDeviceConnect(eq(TENANT_ID), eq(DEVICE_ID), eq(METADATA_TS), callbackCaptor.capture())),
Arguments.of(TbMsgType.ACTIVITY_EVENT, (BiConsumer<RuleEngineDeviceStateManager, ArgumentCaptor<TbCallback>>) (deviceStateManagerMock, callbackCaptor) -> then(deviceStateManagerMock).should().onDeviceActivity(eq(TENANT_ID), eq(DEVICE_ID), eq(METADATA_TS), callbackCaptor.capture())),
Arguments.of(TbMsgType.DISCONNECT_EVENT, (BiConsumer<RuleEngineDeviceStateManager, ArgumentCaptor<TbCallback>>) (deviceStateManagerMock, callbackCaptor) -> then(deviceStateManagerMock).should().onDeviceDisconnect(eq(TENANT_ID), eq(DEVICE_ID), eq(METADATA_TS), callbackCaptor.capture())),
Arguments.of(TbMsgType.INACTIVITY_EVENT, (BiConsumer<RuleEngineDeviceStateManager, ArgumentCaptor<TbCallback>>) (deviceStateManagerMock, callbackCaptor) -> then(deviceStateManagerMock).should().onDeviceInactivity(eq(TENANT_ID), eq(DEVICE_ID), eq(METADATA_TS), callbackCaptor.capture()))
Arguments.of(TbMsgType.CONNECT_EVENT, (BiConsumer<DeviceStateManager, ArgumentCaptor<TbCallback>>) (deviceStateManagerMock, callbackCaptor) -> then(deviceStateManagerMock).should().onDeviceConnect(eq(TENANT_ID), eq(DEVICE_ID), eq(METADATA_TS), callbackCaptor.capture())),
Arguments.of(TbMsgType.ACTIVITY_EVENT, (BiConsumer<DeviceStateManager, ArgumentCaptor<TbCallback>>) (deviceStateManagerMock, callbackCaptor) -> then(deviceStateManagerMock).should().onDeviceActivity(eq(TENANT_ID), eq(DEVICE_ID), eq(METADATA_TS), callbackCaptor.capture())),
Arguments.of(TbMsgType.DISCONNECT_EVENT, (BiConsumer<DeviceStateManager, ArgumentCaptor<TbCallback>>) (deviceStateManagerMock, callbackCaptor) -> then(deviceStateManagerMock).should().onDeviceDisconnect(eq(TENANT_ID), eq(DEVICE_ID), eq(METADATA_TS), callbackCaptor.capture())),
Arguments.of(TbMsgType.INACTIVITY_EVENT, (BiConsumer<DeviceStateManager, ArgumentCaptor<TbCallback>>) (deviceStateManagerMock, callbackCaptor) -> then(deviceStateManagerMock).should().onDeviceInactivity(eq(TENANT_ID), eq(DEVICE_ID), eq(METADATA_TS), callbackCaptor.capture()))
);
}

Loading…
Cancel
Save