diff --git a/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java b/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java index fafa9d60ea..cc6653f4c5 100644 --- a/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java +++ b/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java @@ -76,6 +76,7 @@ import org.thingsboard.server.dao.alarm.AlarmCommentService; import org.thingsboard.server.dao.asset.AssetProfileService; import org.thingsboard.server.dao.asset.AssetService; import org.thingsboard.server.dao.attributes.AttributesService; +import org.thingsboard.server.dao.audit.AuditLogService; import org.thingsboard.server.dao.cassandra.CassandraCluster; import org.thingsboard.server.dao.customer.CustomerService; import org.thingsboard.server.dao.dashboard.DashboardService; @@ -86,6 +87,7 @@ import org.thingsboard.server.dao.edge.EdgeEventService; import org.thingsboard.server.dao.edge.EdgeService; import org.thingsboard.server.dao.entity.EntityService; import org.thingsboard.server.dao.entityview.EntityViewService; +import org.thingsboard.server.dao.event.EventService; import org.thingsboard.server.dao.nosql.CassandraStatementTask; import org.thingsboard.server.dao.nosql.TbResultSetFuture; import org.thingsboard.server.dao.notification.NotificationRequestService; @@ -907,6 +909,16 @@ class DefaultTbContext implements TbContext { return mainCtx.getEntityService(); } + @Override + public EventService getEventService() { + return mainCtx.getEventService(); + } + + @Override + public AuditLogService getAuditLogService() { + return mainCtx.getAuditLogService(); + } + private TbMsgMetaData getActionMetaData(RuleNodeId ruleNodeId) { TbMsgMetaData metaData = new TbMsgMetaData(); metaData.putValue("ruleNodeId", ruleNodeId.toString()); diff --git a/application/src/main/java/org/thingsboard/server/actors/tenant/TenantActor.java b/application/src/main/java/org/thingsboard/server/actors/tenant/TenantActor.java index 28e6ad190c..e1ccc7dbf5 100644 --- a/application/src/main/java/org/thingsboard/server/actors/tenant/TenantActor.java +++ b/application/src/main/java/org/thingsboard/server/actors/tenant/TenantActor.java @@ -261,7 +261,7 @@ public class TenantActor extends RuleChainManagerActor { edgeRpcService.updateEdge(tenantId, edge); } } - if (msg.getEntityId().getEntityType() == EntityType.DEVICE && ComponentLifecycleEvent.DELETED == msg.getEvent()) { + if (msg.getEntityId().getEntityType() == EntityType.DEVICE && ComponentLifecycleEvent.DELETED == msg.getEvent() && isMyPartition(msg.getEntityId())) { DeviceId deviceId = (DeviceId) msg.getEntityId(); onToDeviceActorMsg(new DeviceDeleteMsg(tenantId, deviceId), true); deletedDevices.add(deviceId); diff --git a/application/src/main/java/org/thingsboard/server/service/edge/DefaultEdgeNotificationService.java b/application/src/main/java/org/thingsboard/server/service/edge/DefaultEdgeNotificationService.java index b3aba37632..03fb982d6d 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/DefaultEdgeNotificationService.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/DefaultEdgeNotificationService.java @@ -216,6 +216,9 @@ public class DefaultEdgeNotificationService implements EdgeNotificationService { case ALARM: alarmProcessor.processAlarmNotification(tenantId, edgeNotificationMsg); break; + case ALARM_COMMENT: + alarmProcessor.processAlarmCommentNotification(tenantId, edgeNotificationMsg); + break; case RELATION: relationProcessor.processRelationNotification(tenantId, edgeNotificationMsg); break; diff --git a/application/src/main/java/org/thingsboard/server/service/edge/EdgeEventSourcingListener.java b/application/src/main/java/org/thingsboard/server/service/edge/EdgeEventSourcingListener.java index 351f42d1fc..ca7048c7e9 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/EdgeEventSourcingListener.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/EdgeEventSourcingListener.java @@ -23,10 +23,12 @@ import org.springframework.stereotype.Component; import org.springframework.transaction.event.TransactionalEventListener; import org.thingsboard.common.util.JacksonUtil; import org.thingsboard.server.cluster.TbClusterService; +import org.thingsboard.server.common.data.EdgeUtils; import org.thingsboard.server.common.data.OtaPackageInfo; import org.thingsboard.server.common.data.User; import org.thingsboard.server.common.data.alarm.Alarm; import org.thingsboard.server.common.data.alarm.AlarmApiCallResult; +import org.thingsboard.server.common.data.alarm.AlarmComment; import org.thingsboard.server.common.data.edge.EdgeEventActionType; import org.thingsboard.server.common.data.edge.EdgeEventType; import org.thingsboard.server.common.data.relation.EntityRelation; @@ -43,8 +45,6 @@ import org.thingsboard.server.dao.user.UserServiceImpl; import javax.annotation.PostConstruct; -import static org.thingsboard.server.service.entitiy.DefaultTbNotificationEntityService.edgeTypeByActionType; - /** * This event listener does not support async event processing because relay on ThreadLocal * Another possible approach is to implement a special annotation and a bunch of classes similar to TransactionalApplicationListener @@ -79,9 +79,12 @@ public class EdgeEventSourcingListener { return; } log.trace("[{}] SaveEntityEvent called: {}", event.getTenantId(), event); - EdgeEventActionType action = Boolean.TRUE.equals(event.getAdded()) ? EdgeEventActionType.ADDED : EdgeEventActionType.UPDATED; + boolean isCreated = Boolean.TRUE.equals(event.getCreated()); + String body = getBodyMsgForEntityEvent(event.getEntity()); + EdgeEventType type = getEdgeEventTypeForEntityEvent(event.getEntity()); + EdgeEventActionType action = getActionForEntityEvent(event.getEntity(), isCreated); tbClusterService.sendNotificationMsgToEdge(event.getTenantId(), null, event.getEntityId(), - null, null, action, edgeSynchronizationManager.getEdgeId().get()); + body, type, action, edgeSynchronizationManager.getEdgeId().get()); } catch (Exception e) { log.error("[{}] failed to process SaveEntityEvent: {}", event.getTenantId(), event, e); } @@ -91,20 +94,29 @@ public class EdgeEventSourcingListener { public void handleEvent(DeleteEntityEvent event) { try { log.trace("[{}] DeleteEntityEvent called: {}", event.getTenantId(), event); + EdgeEventType type = getEdgeEventTypeForEntityEvent(event.getEntity()); + EdgeEventActionType actionType = getEdgeEventActionTypeForEntityEvent(event.getEntity()); tbClusterService.sendNotificationMsgToEdge(event.getTenantId(), null, event.getEntityId(), - JacksonUtil.toString(event.getEntity()), null, EdgeEventActionType.DELETED, + JacksonUtil.toString(event.getEntity()), type, actionType, edgeSynchronizationManager.getEdgeId().get()); } catch (Exception e) { log.error("[{}] failed to process DeleteEntityEvent: {}", event.getTenantId(), event, e); } } + private EdgeEventActionType getEdgeEventActionTypeForEntityEvent(Object entity) { + if (entity instanceof AlarmComment) { + return EdgeEventActionType.DELETED_COMMENT; + } + return EdgeEventActionType.DELETED; + } + @TransactionalEventListener(fallbackExecution = true) public void handleEvent(ActionEntityEvent event) { try { log.trace("[{}] ActionEntityEvent called: {}", event.getTenantId(), event); tbClusterService.sendNotificationMsgToEdge(event.getTenantId(), event.getEdgeId(), event.getEntityId(), - event.getBody(), null, edgeTypeByActionType(event.getActionType()), + event.getBody(), null, EdgeUtils.getEdgeEventActionTypeByActionType(event.getActionType()), edgeSynchronizationManager.getEdgeId().get()); } catch (Exception e) { log.error("[{}] failed to process ActionEntityEvent: {}", event.getTenantId(), event, e); @@ -125,7 +137,7 @@ public class EdgeEventSourcingListener { } log.trace("[{}] RelationActionEvent called: {}", event.getTenantId(), event); tbClusterService.sendNotificationMsgToEdge(event.getTenantId(), null, null, - JacksonUtil.toString(relation), EdgeEventType.RELATION, edgeTypeByActionType(event.getActionType()), + JacksonUtil.toString(relation), EdgeEventType.RELATION, EdgeUtils.getEdgeEventActionTypeByActionType(event.getActionType()), edgeSynchronizationManager.getEdgeId().get()); } catch (Exception e) { log.error("[{}] failed to process RelationActionEvent: {}", event.getTenantId(), event, e); @@ -173,4 +185,25 @@ public class EdgeEventSourcingListener { } } } + + private EdgeEventType getEdgeEventTypeForEntityEvent(Object entity) { + if (entity instanceof AlarmComment) { + return EdgeEventType.ALARM_COMMENT; + } + return null; + } + + private String getBodyMsgForEntityEvent(Object entity) { + if (entity instanceof AlarmComment) { + return JacksonUtil.toString(entity); + } + return null; + } + + private EdgeEventActionType getActionForEntityEvent(Object entity, boolean isCreated) { + if (entity instanceof AlarmComment) { + return isCreated ? EdgeEventActionType.ADDED_COMMENT : EdgeEventActionType.UPDATED_COMMENT; + } + return isCreated ? EdgeEventActionType.ADDED : EdgeEventActionType.UPDATED; + } } diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcSession.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcSession.java index f1f473f976..5e6f7a263e 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcSession.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcSession.java @@ -39,6 +39,7 @@ import org.thingsboard.server.common.data.page.PageData; import org.thingsboard.server.common.data.page.PageLink; import org.thingsboard.server.common.data.page.SortOrder; import org.thingsboard.server.common.data.page.TimePageLink; +import org.thingsboard.server.gen.edge.v1.AlarmCommentUpdateMsg; import org.thingsboard.server.gen.edge.v1.AlarmUpdateMsg; import org.thingsboard.server.gen.edge.v1.AssetProfileUpdateMsg; import org.thingsboard.server.gen.edge.v1.AssetUpdateMsg; @@ -530,6 +531,9 @@ public final class EdgeGrpcSession implements Closeable { case RPC_CALL: case ASSIGNED_TO_CUSTOMER: case UNASSIGNED_FROM_CUSTOMER: + case ADDED_COMMENT: + case UPDATED_COMMENT: + case DELETED_COMMENT: downlinkMsg = convertEntityEventToDownlink(edgeEvent); log.trace("[{}][{}] entity message processed [{}]", this.tenantId, this.sessionId, downlinkMsg); break; @@ -644,6 +648,8 @@ public final class EdgeGrpcSession implements Closeable { return ctx.getRuleChainProcessor().convertRuleChainMetadataEventToDownlink(edgeEvent, this.edgeVersion); case ALARM: return ctx.getAlarmProcessor().convertAlarmEventToDownlink(edgeEvent, this.edgeVersion); + case ALARM_COMMENT: + return ctx.getAlarmProcessor().convertAlarmCommentEventToDownlink(edgeEvent, this.edgeVersion); case USER: return ctx.getUserProcessor().convertUserEventToDownlink(edgeEvent, this.edgeVersion); case RELATION: @@ -714,6 +720,12 @@ public final class EdgeGrpcSession implements Closeable { .processAlarmMsgFromEdge(edge.getTenantId(), edge.getId(), alarmUpdateMsg)); } } + if (uplinkMsg.getAlarmCommentUpdateMsgCount() > 0) { + for (AlarmCommentUpdateMsg alarmCommentUpdateMsg : uplinkMsg.getAlarmCommentUpdateMsgList()) { + result.add(((AlarmProcessor) ctx.getAlarmEdgeProcessorFactory().getProcessorByEdgeVersion(this.edgeVersion)) + .processAlarmCommentMsgFromEdge(edge.getTenantId(), edge.getId(), alarmCommentUpdateMsg)); + } + } if (uplinkMsg.getEntityViewUpdateMsgCount() > 0) { for (EntityViewUpdateMsg entityViewUpdateMsg : uplinkMsg.getEntityViewUpdateMsgList()) { result.add(((EntityViewProcessor) ctx.getEntityViewProcessorFactory().getProcessorByEdgeVersion(this.edgeVersion)) diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/constructor/alarm/AlarmMsgConstructor.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/constructor/alarm/AlarmMsgConstructor.java index 81606434b5..b5a0f20d6b 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/constructor/alarm/AlarmMsgConstructor.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/constructor/alarm/AlarmMsgConstructor.java @@ -16,6 +16,8 @@ package org.thingsboard.server.service.edge.rpc.constructor.alarm; import org.thingsboard.server.common.data.alarm.Alarm; +import org.thingsboard.server.common.data.alarm.AlarmComment; +import org.thingsboard.server.gen.edge.v1.AlarmCommentUpdateMsg; import org.thingsboard.server.gen.edge.v1.AlarmUpdateMsg; import org.thingsboard.server.gen.edge.v1.UpdateMsgType; import org.thingsboard.server.service.edge.rpc.constructor.MsgConstructor; @@ -23,4 +25,6 @@ import org.thingsboard.server.service.edge.rpc.constructor.MsgConstructor; public interface AlarmMsgConstructor extends MsgConstructor { AlarmUpdateMsg constructAlarmUpdatedMsg(UpdateMsgType msgType, Alarm alarm, String entityName); + + AlarmCommentUpdateMsg constructAlarmCommentUpdatedMsg(UpdateMsgType msgType, AlarmComment alarmComment); } diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/constructor/alarm/AlarmMsgConstructorV1.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/constructor/alarm/AlarmMsgConstructorV1.java index 5abba029de..81a489671f 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/constructor/alarm/AlarmMsgConstructorV1.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/constructor/alarm/AlarmMsgConstructorV1.java @@ -24,7 +24,7 @@ import org.thingsboard.server.queue.util.TbCoreComponent; @Component @TbCoreComponent -public class AlarmMsgConstructorV1 implements AlarmMsgConstructor { +public class AlarmMsgConstructorV1 extends BaseAlarmMsgConstructor { @Override public AlarmUpdateMsg constructAlarmUpdatedMsg(UpdateMsgType msgType, Alarm alarm, String entityName) { diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/constructor/alarm/AlarmMsgConstructorV2.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/constructor/alarm/AlarmMsgConstructorV2.java index d9768b8f9e..143a41b73c 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/constructor/alarm/AlarmMsgConstructorV2.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/constructor/alarm/AlarmMsgConstructorV2.java @@ -24,7 +24,7 @@ import org.thingsboard.server.queue.util.TbCoreComponent; @Component @TbCoreComponent -public class AlarmMsgConstructorV2 implements AlarmMsgConstructor { +public class AlarmMsgConstructorV2 extends BaseAlarmMsgConstructor { @Override public AlarmUpdateMsg constructAlarmUpdatedMsg(UpdateMsgType msgType, Alarm alarm, String entityName) { diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/constructor/alarm/BaseAlarmMsgConstructor.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/constructor/alarm/BaseAlarmMsgConstructor.java new file mode 100644 index 0000000000..c9af86e8cc --- /dev/null +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/constructor/alarm/BaseAlarmMsgConstructor.java @@ -0,0 +1,29 @@ +/** + * Copyright © 2016-2024 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.service.edge.rpc.constructor.alarm; + +import org.thingsboard.common.util.JacksonUtil; +import org.thingsboard.server.common.data.alarm.AlarmComment; +import org.thingsboard.server.gen.edge.v1.AlarmCommentUpdateMsg; +import org.thingsboard.server.gen.edge.v1.UpdateMsgType; + +public abstract class BaseAlarmMsgConstructor implements AlarmMsgConstructor { + + @Override + public AlarmCommentUpdateMsg constructAlarmCommentUpdatedMsg(UpdateMsgType msgType, AlarmComment alarmComment) { + return AlarmCommentUpdateMsg.newBuilder().setMsgType(msgType).setEntity(JacksonUtil.toString(alarmComment)).build(); + } +} diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/BaseEdgeProcessor.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/BaseEdgeProcessor.java index 6f7474b15d..256f191215 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/BaseEdgeProcessor.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/BaseEdgeProcessor.java @@ -51,6 +51,7 @@ import org.thingsboard.server.common.data.id.UserId; import org.thingsboard.server.common.data.kv.AttributeKvEntry; import org.thingsboard.server.common.data.msg.TbMsgType; import org.thingsboard.server.common.data.page.PageData; +import org.thingsboard.server.common.data.page.PageDataIterableByTenantIdEntityId; import org.thingsboard.server.common.data.page.PageLink; import org.thingsboard.server.common.data.relation.EntityRelation; import org.thingsboard.server.common.data.relation.RelationTypeGroup; @@ -59,6 +60,7 @@ import org.thingsboard.server.common.data.rule.RuleChainConnectionInfo; import org.thingsboard.server.common.msg.TbMsg; import org.thingsboard.server.common.msg.TbMsgDataType; import org.thingsboard.server.common.msg.TbMsgMetaData; +import org.thingsboard.server.dao.alarm.AlarmCommentService; import org.thingsboard.server.dao.alarm.AlarmService; import org.thingsboard.server.dao.asset.AssetProfileService; import org.thingsboard.server.dao.asset.AssetService; @@ -145,6 +147,9 @@ public abstract class BaseEdgeProcessor { @Autowired protected AlarmService alarmService; + @Autowired + protected AlarmCommentService alarmCommentService; + @Autowired protected DeviceService deviceService; @@ -350,10 +355,13 @@ public abstract class BaseEdgeProcessor { case ALARM_ASSIGNED: case ALARM_UNASSIGNED: case CREDENTIALS_REQUEST: + case ADDED_COMMENT: + case UPDATED_COMMENT: return true; } switch (type) { case ALARM: + case ALARM_COMMENT: case RULE_CHAIN: case RULE_CHAIN_METADATA: case USER: @@ -441,14 +449,17 @@ public abstract class BaseEdgeProcessor { case CREDENTIALS_UPDATED: case ASSIGNED_TO_CUSTOMER: case UNASSIGNED_FROM_CUSTOMER: + case UPDATED_COMMENT: return UpdateMsgType.ENTITY_UPDATED_RPC_MESSAGE; case ADDED: case ASSIGNED_TO_EDGE: case RELATION_ADD_OR_UPDATE: + case ADDED_COMMENT: return UpdateMsgType.ENTITY_CREATED_RPC_MESSAGE; case DELETED: case UNASSIGNED_FROM_EDGE: case RELATION_DELETED: + case DELETED_COMMENT: return UpdateMsgType.ENTITY_DELETED_RPC_MESSAGE; case ALARM_ACK: return UpdateMsgType.ALARM_ACK_RPC_MESSAGE; @@ -517,22 +528,14 @@ public abstract class BaseEdgeProcessor { private ListenableFuture processNotificationToRelatedEdges(TenantId tenantId, EntityId entityId, EdgeEventType type, EdgeEventActionType actionType, EdgeId sourceEdgeId) { - PageLink pageLink = new PageLink(DEFAULT_PAGE_SIZE); - PageData pageData; List> futures = new ArrayList<>(); - do { - pageData = edgeService.findRelatedEdgeIdsByEntityId(tenantId, entityId, pageLink); - if (pageData != null && pageData.getData() != null && !pageData.getData().isEmpty()) { - for (EdgeId relatedEdgeId : pageData.getData()) { - if (!relatedEdgeId.equals(sourceEdgeId)) { - futures.add(saveEdgeEvent(tenantId, relatedEdgeId, type, actionType, entityId, null)); - } - } - if (pageData.hasNext()) { - pageLink = pageLink.nextPageLink(); - } + PageDataIterableByTenantIdEntityId edgeIds = + new PageDataIterableByTenantIdEntityId<>(edgeService::findRelatedEdgeIdsByEntityId, tenantId, entityId, DEFAULT_PAGE_SIZE); + for (EdgeId relatedEdgeId : edgeIds) { + if (!relatedEdgeId.equals(sourceEdgeId)) { + futures.add(saveEdgeEvent(tenantId, relatedEdgeId, type, actionType, entityId, null)); } - } while (pageData != null && pageData.hasNext()); + } return Futures.transform(Futures.allAsList(futures), voids -> null, dbCallbackExecutorService); } @@ -715,19 +718,13 @@ public abstract class BaseEdgeProcessor { } private boolean isEntityNotAssignedToEdge(TenantId tenantId, EntityId entityId, EdgeId edgeId) { - PageLink pageLink = new PageLink(DEFAULT_PAGE_SIZE); - PageData pageData; - do { - pageData = edgeService.findRelatedEdgeIdsByEntityId(tenantId, entityId, pageLink); - if (pageData != null && pageData.getData() != null && !pageData.getData().isEmpty()) { - if (pageData.getData().contains(edgeId)) { - return false; - } - if (pageData.hasNext()) { - pageLink = pageLink.nextPageLink(); - } + PageDataIterableByTenantIdEntityId edgeIds = + new PageDataIterableByTenantIdEntityId<>(edgeService::findRelatedEdgeIdsByEntityId, tenantId, entityId, DEFAULT_PAGE_SIZE); + for (EdgeId edgeId1 : edgeIds) { + if (edgeId1.equals(edgeId)) { + return false; } - } while (pageData != null && pageData.hasNext()); + } return true; } } diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/alarm/AlarmEdgeProcessor.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/alarm/AlarmEdgeProcessor.java index ea48032b69..f83cbbba9c 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/alarm/AlarmEdgeProcessor.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/alarm/AlarmEdgeProcessor.java @@ -22,6 +22,7 @@ import lombok.extern.slf4j.Slf4j; import org.thingsboard.common.util.JacksonUtil; import org.thingsboard.server.common.data.EdgeUtils; import org.thingsboard.server.common.data.alarm.Alarm; +import org.thingsboard.server.common.data.alarm.AlarmComment; import org.thingsboard.server.common.data.edge.EdgeEvent; import org.thingsboard.server.common.data.edge.EdgeEventActionType; import org.thingsboard.server.common.data.edge.EdgeEventType; @@ -29,12 +30,14 @@ import org.thingsboard.server.common.data.id.AlarmId; import org.thingsboard.server.common.data.id.EdgeId; import org.thingsboard.server.common.data.id.EntityId; import org.thingsboard.server.common.data.id.TenantId; -import org.thingsboard.server.common.data.page.PageData; -import org.thingsboard.server.common.data.page.PageLink; +import org.thingsboard.server.common.data.page.PageDataIterableByTenantIdEntityId; +import org.thingsboard.server.gen.edge.v1.AlarmCommentUpdateMsg; import org.thingsboard.server.gen.edge.v1.AlarmUpdateMsg; import org.thingsboard.server.gen.edge.v1.DownlinkMsg; import org.thingsboard.server.gen.edge.v1.EdgeVersion; +import org.thingsboard.server.gen.edge.v1.UpdateMsgType; import org.thingsboard.server.gen.transport.TransportProtos; +import org.thingsboard.server.service.edge.rpc.constructor.alarm.AlarmMsgConstructor; import java.util.ArrayList; import java.util.List; @@ -67,58 +70,90 @@ public abstract class AlarmEdgeProcessor extends BaseAlarmProcessor implements A return null; } + @Override + public ListenableFuture processAlarmCommentMsgFromEdge(TenantId tenantId, EdgeId edgeId, AlarmCommentUpdateMsg alarmCommentUpdateMsg) { + log.trace("[{}] processAlarmCommentMsgFromEdge [{}]", tenantId, alarmCommentUpdateMsg); + try { + edgeSynchronizationManager.getEdgeId().set(edgeId); + return processAlarmCommentMsg(tenantId, alarmCommentUpdateMsg); + } finally { + edgeSynchronizationManager.getEdgeId().remove(); + } + } + + @Override + public DownlinkMsg convertAlarmCommentEventToDownlink(EdgeEvent edgeEvent, EdgeVersion edgeVersion) { + UpdateMsgType msgType = getUpdateMsgType(edgeEvent.getAction()); + switch (edgeEvent.getAction()) { + case ADDED_COMMENT: + case UPDATED_COMMENT: + case DELETED_COMMENT: + AlarmComment alarmComment = JacksonUtil.convertValue(edgeEvent.getBody(), AlarmComment.class); + if (alarmComment != null) { + return DownlinkMsg.newBuilder() + .setDownlinkMsgId(EdgeUtils.nextPositiveInt()) + .addAlarmCommentUpdateMsg(((AlarmMsgConstructor) alarmMsgConstructorFactory + .getMsgConstructorByEdgeVersion(edgeVersion)).constructAlarmCommentUpdatedMsg(msgType, alarmComment)) + .build(); + } + default: + return null; + } + } + public ListenableFuture processAlarmNotification(TenantId tenantId, TransportProtos.EdgeNotificationMsgProto edgeNotificationMsg) { EdgeEventActionType actionType = EdgeEventActionType.valueOf(edgeNotificationMsg.getAction()); AlarmId alarmId = new AlarmId(new UUID(edgeNotificationMsg.getEntityIdMSB(), edgeNotificationMsg.getEntityIdLSB())); EdgeId originatorEdgeId = safeGetEdgeId(edgeNotificationMsg.getOriginatorEdgeIdMSB(), edgeNotificationMsg.getOriginatorEdgeIdLSB()); - switch (actionType) { - case DELETED: - Alarm deletedAlarm = JacksonUtil.fromString(edgeNotificationMsg.getBody(), Alarm.class); - if (deletedAlarm == null) { - return Futures.immediateFuture(null); - } - List> delFutures = pushEventToAllRelatedEdges(tenantId, deletedAlarm.getOriginator(), - alarmId, actionType, JacksonUtil.valueToTree(deletedAlarm), originatorEdgeId); - return Futures.transform(Futures.allAsList(delFutures), voids -> null, dbCallbackExecutorService); - default: - ListenableFuture alarmFuture = alarmService.findAlarmByIdAsync(tenantId, alarmId); - return Futures.transformAsync(alarmFuture, alarm -> { - if (alarm == null) { - return Futures.immediateFuture(null); - } - EdgeEventType type = EdgeUtils.getEdgeEventTypeByEntityType(alarm.getOriginator().getEntityType()); - if (type == null) { - return Futures.immediateFuture(null); - } - List> futures = pushEventToAllRelatedEdges(tenantId, alarm.getOriginator(), - alarmId, actionType, null, originatorEdgeId); - return Futures.transform(Futures.allAsList(futures), voids -> null, dbCallbackExecutorService); - }, dbCallbackExecutorService); + if (EdgeEventActionType.DELETED.equals(actionType)) { + Alarm deletedAlarm = JacksonUtil.fromString(edgeNotificationMsg.getBody(), Alarm.class); + if (deletedAlarm == null) { + return Futures.immediateFuture(null); + } + List> delFutures = pushEventToAllRelatedEdges(tenantId, deletedAlarm.getOriginator(), + alarmId, actionType, JacksonUtil.valueToTree(deletedAlarm), originatorEdgeId, EdgeEventType.ALARM); + return Futures.transform(Futures.allAsList(delFutures), voids -> null, dbCallbackExecutorService); + } + ListenableFuture alarmFuture = alarmService.findAlarmByIdAsync(tenantId, alarmId); + return Futures.transformAsync(alarmFuture, alarm -> { + if (alarm == null) { + return Futures.immediateFuture(null); + } + EdgeEventType type = EdgeUtils.getEdgeEventTypeByEntityType(alarm.getOriginator().getEntityType()); + if (type == null) { + return Futures.immediateFuture(null); + } + List> futures = pushEventToAllRelatedEdges(tenantId, alarm.getOriginator(), + alarmId, actionType, null, originatorEdgeId, EdgeEventType.ALARM); + return Futures.transform(Futures.allAsList(futures), voids -> null, dbCallbackExecutorService); + }, dbCallbackExecutorService); + } + + public ListenableFuture processAlarmCommentNotification(TenantId tenantId, TransportProtos.EdgeNotificationMsgProto edgeNotificationMsg) { + EdgeEventActionType actionType = EdgeEventActionType.valueOf(edgeNotificationMsg.getAction()); + AlarmId alarmId = new AlarmId(new UUID(edgeNotificationMsg.getEntityIdMSB(), edgeNotificationMsg.getEntityIdLSB())); + EdgeId originatorEdgeId = safeGetEdgeId(edgeNotificationMsg.getOriginatorEdgeIdMSB(), edgeNotificationMsg.getOriginatorEdgeIdLSB()); + AlarmComment alarmComment = JacksonUtil.fromString(edgeNotificationMsg.getBody(), AlarmComment.class); + if (alarmComment == null) { + return Futures.immediateFuture(null); } + Alarm alarmById = alarmService.findAlarmById(tenantId, new AlarmId(alarmComment.getAlarmId().getId())); + List> delFutures = pushEventToAllRelatedEdges(tenantId, alarmById.getOriginator(), + alarmId, actionType, JacksonUtil.valueToTree(alarmComment), originatorEdgeId, EdgeEventType.ALARM_COMMENT); + return Futures.transform(Futures.allAsList(delFutures), voids -> null, dbCallbackExecutorService); } - private List> pushEventToAllRelatedEdges(TenantId tenantId, EntityId originatorId, AlarmId alarmId, EdgeEventActionType actionType, JsonNode body, EdgeId sourceEdgeId) { - PageLink pageLink = new PageLink(DEFAULT_PAGE_SIZE); - PageData pageData; + private List> pushEventToAllRelatedEdges(TenantId tenantId, EntityId originatorId, AlarmId alarmId, + EdgeEventActionType actionType, JsonNode body, EdgeId sourceEdgeId, + EdgeEventType edgeEventType) { List> futures = new ArrayList<>(); - do { - pageData = edgeService.findRelatedEdgeIdsByEntityId(tenantId, originatorId, pageLink); - if (pageData != null && pageData.getData() != null && !pageData.getData().isEmpty()) { - for (EdgeId relatedEdgeId : pageData.getData()) { - if (!relatedEdgeId.equals(sourceEdgeId)) { - futures.add(saveEdgeEvent(tenantId, - relatedEdgeId, - EdgeEventType.ALARM, - actionType, - alarmId, - body)); - } - } - if (pageData.hasNext()) { - pageLink = pageLink.nextPageLink(); - } + PageDataIterableByTenantIdEntityId edgeIds = + new PageDataIterableByTenantIdEntityId<>(edgeService::findRelatedEdgeIdsByEntityId, tenantId, originatorId, DEFAULT_PAGE_SIZE); + for (EdgeId relatedEdgeId : edgeIds) { + if (!relatedEdgeId.equals(sourceEdgeId)) { + futures.add(saveEdgeEvent(tenantId, relatedEdgeId, edgeEventType, actionType, alarmId, body)); } - } while (pageData != null && pageData.hasNext()); + } return futures; } } diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/alarm/AlarmProcessor.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/alarm/AlarmProcessor.java index 68a72bf01e..938269462d 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/alarm/AlarmProcessor.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/alarm/AlarmProcessor.java @@ -19,6 +19,7 @@ import com.google.common.util.concurrent.ListenableFuture; import org.thingsboard.server.common.data.edge.EdgeEvent; import org.thingsboard.server.common.data.id.EdgeId; import org.thingsboard.server.common.data.id.TenantId; +import org.thingsboard.server.gen.edge.v1.AlarmCommentUpdateMsg; import org.thingsboard.server.gen.edge.v1.AlarmUpdateMsg; import org.thingsboard.server.gen.edge.v1.DownlinkMsg; import org.thingsboard.server.gen.edge.v1.EdgeVersion; @@ -29,4 +30,8 @@ public interface AlarmProcessor extends EdgeProcessor { ListenableFuture processAlarmMsgFromEdge(TenantId tenantId, EdgeId edgeId, AlarmUpdateMsg alarmUpdateMsg); DownlinkMsg convertAlarmEventToDownlink(EdgeEvent edgeEvent, EdgeVersion edgeVersion); + + ListenableFuture processAlarmCommentMsgFromEdge(TenantId tenantId, EdgeId edgeId, AlarmCommentUpdateMsg alarmCommentUpdateMsg); + + DownlinkMsg convertAlarmCommentEventToDownlink(EdgeEvent edgeEvent, EdgeVersion edgeVersion); } diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/alarm/BaseAlarmProcessor.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/alarm/BaseAlarmProcessor.java index 546d6e63db..11cd2ab56c 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/alarm/BaseAlarmProcessor.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/alarm/BaseAlarmProcessor.java @@ -19,10 +19,12 @@ import com.fasterxml.jackson.databind.JsonNode; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; import org.thingsboard.common.util.JacksonUtil; import org.thingsboard.server.common.data.Device; import org.thingsboard.server.common.data.EntityView; import org.thingsboard.server.common.data.alarm.Alarm; +import org.thingsboard.server.common.data.alarm.AlarmComment; import org.thingsboard.server.common.data.alarm.AlarmCreateOrUpdateActiveRequest; import org.thingsboard.server.common.data.alarm.AlarmUpdateRequest; import org.thingsboard.server.common.data.asset.Asset; @@ -33,6 +35,8 @@ import org.thingsboard.server.common.data.id.DeviceId; import org.thingsboard.server.common.data.id.EntityId; import org.thingsboard.server.common.data.id.EntityViewId; import org.thingsboard.server.common.data.id.TenantId; +import org.thingsboard.server.dao.alarm.AlarmCommentDao; +import org.thingsboard.server.gen.edge.v1.AlarmCommentUpdateMsg; import org.thingsboard.server.gen.edge.v1.AlarmUpdateMsg; import org.thingsboard.server.gen.edge.v1.EdgeVersion; import org.thingsboard.server.gen.edge.v1.UpdateMsgType; @@ -44,6 +48,9 @@ import java.util.UUID; @Slf4j public abstract class BaseAlarmProcessor extends BaseEdgeProcessor { + @Autowired + protected AlarmCommentDao alarmCommentDao; + public ListenableFuture processAlarmMsg(TenantId tenantId, AlarmUpdateMsg alarmUpdateMsg) { log.trace("[{}] processAlarmMsg [{}]", tenantId, alarmUpdateMsg); AlarmId alarmId = new AlarmId(new UUID(alarmUpdateMsg.getIdMSB(), alarmUpdateMsg.getIdLSB())); @@ -93,6 +100,42 @@ public abstract class BaseAlarmProcessor extends BaseEdgeProcessor { return Futures.immediateFuture(null); } + public ListenableFuture processAlarmCommentMsg(TenantId tenantId, AlarmCommentUpdateMsg alarmCommentUpdateMsg) { + log.trace("[{}] processAlarmCommentMsg [{}]", tenantId, alarmCommentUpdateMsg); + AlarmComment alarmComment = JacksonUtil.fromString(alarmCommentUpdateMsg.getEntity(), AlarmComment.class, true); + if (alarmComment == null) { + throw new RuntimeException("[{" + tenantId + "}] alarmCommentUpdateMsg {" + alarmCommentUpdateMsg + "} cannot be converted to alarm comment"); + } + try { + Alarm alarm = alarmService.findAlarmById(tenantId, new AlarmId(alarmComment.getAlarmId().getId())); + if (alarm == null) { + return Futures.immediateFuture(null); + } + switch (alarmCommentUpdateMsg.getMsgType()) { + case ENTITY_CREATED_RPC_MESSAGE: + alarmCommentDao.createAlarmComment(tenantId, alarmComment); + break; + case ENTITY_UPDATED_RPC_MESSAGE: + alarmCommentService.createOrUpdateAlarmComment(tenantId, alarmComment); + break; + case ENTITY_DELETED_RPC_MESSAGE: + AlarmComment alarmCommentToDelete = alarmCommentService.findAlarmCommentById(tenantId, alarmComment.getId()); + if (alarmCommentToDelete != null) { + alarmCommentService.saveAlarmComment(tenantId, alarmCommentToDelete); + } + break; + case UNRECOGNIZED: + default: + return handleUnsupportedMsgType(alarmCommentUpdateMsg.getMsgType()); + } + } catch (Exception e) { + log.error("[{}] Failed to process alarm comment update msg [{}]", tenantId, alarmCommentUpdateMsg, e); + return Futures.immediateFailedFuture(e); + } + return Futures.immediateFuture(null); + } + + protected abstract EntityId getAlarmOriginatorFromMsg(TenantId tenantId, AlarmUpdateMsg alarmUpdateMsg); protected abstract Alarm constructAlarmFromUpdateMsg(TenantId tenantId, AlarmId alarmId, EntityId originatorId, AlarmUpdateMsg alarmUpdateMsg); diff --git a/application/src/main/java/org/thingsboard/server/service/entitiy/DefaultTbNotificationEntityService.java b/application/src/main/java/org/thingsboard/server/service/entitiy/DefaultTbNotificationEntityService.java index 321cc69604..3a87fff506 100644 --- a/application/src/main/java/org/thingsboard/server/service/entitiy/DefaultTbNotificationEntityService.java +++ b/application/src/main/java/org/thingsboard/server/service/entitiy/DefaultTbNotificationEntityService.java @@ -19,7 +19,6 @@ import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Service; import org.thingsboard.common.util.JacksonUtil; -import org.thingsboard.server.common.msg.rule.engine.DeviceCredentialsUpdateNotificationMsg; import org.thingsboard.server.cluster.TbClusterService; import org.thingsboard.server.common.data.Device; import org.thingsboard.server.common.data.HasName; @@ -27,7 +26,6 @@ import org.thingsboard.server.common.data.Tenant; import org.thingsboard.server.common.data.User; import org.thingsboard.server.common.data.audit.ActionType; import org.thingsboard.server.common.data.edge.Edge; -import org.thingsboard.server.common.data.edge.EdgeEventActionType; import org.thingsboard.server.common.data.id.CustomerId; import org.thingsboard.server.common.data.id.DeviceId; import org.thingsboard.server.common.data.id.EdgeId; @@ -40,6 +38,7 @@ import org.thingsboard.server.common.data.security.DeviceCredentials; import org.thingsboard.server.common.msg.TbMsg; import org.thingsboard.server.common.msg.TbMsgDataType; import org.thingsboard.server.common.msg.TbMsgMetaData; +import org.thingsboard.server.common.msg.rule.engine.DeviceCredentialsUpdateNotificationMsg; import org.thingsboard.server.service.action.EntityActionService; import org.thingsboard.server.service.gateway_device.GatewayNotificationsService; @@ -174,39 +173,4 @@ public class DefaultTbNotificationEntityService implements TbNotificationEntityS metaData.putValue("assignedFromTenantName", tenant.getName()); return metaData; } - - public static EdgeEventActionType edgeTypeByActionType(ActionType actionType) { - switch (actionType) { - case ADDED: - return EdgeEventActionType.ADDED; - case UPDATED: - return EdgeEventActionType.UPDATED; - case ALARM_ACK: - return EdgeEventActionType.ALARM_ACK; - case ALARM_CLEAR: - return EdgeEventActionType.ALARM_CLEAR; - case ALARM_ASSIGNED: - return EdgeEventActionType.ALARM_ASSIGNED; - case ALARM_UNASSIGNED: - return EdgeEventActionType.ALARM_UNASSIGNED; - case DELETED: - return EdgeEventActionType.DELETED; - case RELATION_ADD_OR_UPDATE: - return EdgeEventActionType.RELATION_ADD_OR_UPDATE; - case RELATION_DELETED: - return EdgeEventActionType.RELATION_DELETED; - case ASSIGNED_TO_CUSTOMER: - return EdgeEventActionType.ASSIGNED_TO_CUSTOMER; - case UNASSIGNED_FROM_CUSTOMER: - return EdgeEventActionType.UNASSIGNED_FROM_CUSTOMER; - case ASSIGNED_TO_EDGE: - return EdgeEventActionType.ASSIGNED_TO_EDGE; - case UNASSIGNED_FROM_EDGE: - return EdgeEventActionType.UNASSIGNED_FROM_EDGE; - case CREDENTIALS_UPDATED: - return EdgeEventActionType.CREDENTIALS_UPDATED; - default: - return null; - } - } } diff --git a/application/src/main/java/org/thingsboard/server/service/entitiy/alarm/TbAlarmCommentService.java b/application/src/main/java/org/thingsboard/server/service/entitiy/alarm/TbAlarmCommentService.java index c0274792cb..da7c69050e 100644 --- a/application/src/main/java/org/thingsboard/server/service/entitiy/alarm/TbAlarmCommentService.java +++ b/application/src/main/java/org/thingsboard/server/service/entitiy/alarm/TbAlarmCommentService.java @@ -21,6 +21,7 @@ import org.thingsboard.server.common.data.alarm.AlarmComment; import org.thingsboard.server.common.data.exception.ThingsboardException; public interface TbAlarmCommentService { + AlarmComment saveAlarmComment(Alarm alarm, AlarmComment alarmComment, User user) throws ThingsboardException; void deleteAlarmComment(Alarm alarm, AlarmComment alarmComment, User user) throws ThingsboardException; diff --git a/application/src/main/java/org/thingsboard/server/service/notification/rule/trigger/EntitiesLimitTriggerProcessor.java b/application/src/main/java/org/thingsboard/server/service/notification/rule/trigger/EntitiesLimitTriggerProcessor.java index b7847b9857..e1db043188 100644 --- a/application/src/main/java/org/thingsboard/server/service/notification/rule/trigger/EntitiesLimitTriggerProcessor.java +++ b/application/src/main/java/org/thingsboard/server/service/notification/rule/trigger/EntitiesLimitTriggerProcessor.java @@ -19,10 +19,10 @@ import lombok.RequiredArgsConstructor; import org.springframework.stereotype.Service; import org.thingsboard.server.common.data.notification.info.EntitiesLimitNotificationInfo; import org.thingsboard.server.common.data.notification.info.RuleOriginatedNotificationInfo; +import org.thingsboard.server.common.data.notification.rule.trigger.EntitiesLimitTrigger; import org.thingsboard.server.common.data.notification.rule.trigger.config.EntitiesLimitNotificationRuleTriggerConfig; import org.thingsboard.server.common.data.notification.rule.trigger.config.NotificationRuleTriggerType; import org.thingsboard.server.common.data.tenant.profile.DefaultTenantProfileConfiguration; -import org.thingsboard.server.common.data.notification.rule.trigger.EntitiesLimitTrigger; import org.thingsboard.server.dao.entity.EntityCountService; import org.thingsboard.server.dao.tenant.TbTenantProfileCache; import org.thingsboard.server.dao.tenant.TenantService; @@ -48,6 +48,9 @@ public class EntitiesLimitTriggerProcessor implements NotificationRuleTriggerPro return false; } long currentCount = entityCountService.countByTenantIdAndEntityType(trigger.getTenantId(), trigger.getEntityType()); + if (currentCount == 0) { + return false; + } trigger.setLimit(limit); trigger.setCurrentCount(currentCount); return (int) (limit * triggerConfig.getThreshold()) == currentCount; // strict comparing not to send notification on each new entity @@ -59,7 +62,7 @@ public class EntitiesLimitTriggerProcessor implements NotificationRuleTriggerPro .entityType(trigger.getEntityType()) .currentCount(trigger.getCurrentCount()) .limit(trigger.getLimit()) - .percents((int) (((float)trigger.getCurrentCount() / trigger.getLimit()) * 100)) + .percents((int) (((float) trigger.getCurrentCount() / trigger.getLimit()) * 100)) .tenantId(trigger.getTenantId()) .tenantName(tenantService.findTenantById(trigger.getTenantId()).getName()) .build(); diff --git a/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbCoreConsumerService.java b/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbCoreConsumerService.java index e888dcd526..2d5e6963eb 100644 --- a/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbCoreConsumerService.java +++ b/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbCoreConsumerService.java @@ -373,10 +373,6 @@ public class DefaultTbCoreConsumerService extends AbstractConsumerService 0 ? RpcError.values()[proto.getError()] : null; @@ -164,10 +165,10 @@ public class DefaultTbRuleEngineConsumerService extends AbstractConsumerService< tbDeviceRpcService.processRpcResponseFromDevice(response); callback.onSuccess(); } else if (nfMsg.hasQueueUpdateMsg()) { - ctx.getScheduler().execute(() -> updateQueue(nfMsg.getQueueUpdateMsg())); + updateQueue(nfMsg.getQueueUpdateMsg()); callback.onSuccess(); } else if (nfMsg.hasQueueDeleteMsg()) { - ctx.getScheduler().execute(() -> deleteQueue(nfMsg.getQueueDeleteMsg())); + deleteQueue(nfMsg.getQueueDeleteMsg()); callback.onSuccess(); } else { log.trace("Received notification with missing handler"); @@ -204,13 +205,30 @@ public class DefaultTbRuleEngineConsumerService extends AbstractConsumerService< QueueKey queueKey = new QueueKey(ServiceType.TB_RULE_ENGINE, queueDeleteMsg.getQueueName(), tenantId); var consumerManager = consumers.remove(queueKey); if (consumerManager != null) { - consumerManager.delete(); + consumerManager.delete(true); } partitionService.removeQueue(queueDeleteMsg); partitionService.recalculatePartitions(ctx.getServiceInfoProvider().getServiceInfo(), new ArrayList<>(partitionService.getOtherServices(ServiceType.TB_RULE_ENGINE))); } + @EventListener + public void handleComponentLifecycleEvent(ComponentLifecycleMsg event) { + if (event.getEntityId().getEntityType() == EntityType.TENANT) { + if (event.getEvent() == ComponentLifecycleEvent.DELETED) { + List toRemove = consumers.keySet().stream() + .filter(queueKey -> queueKey.getTenantId().equals(event.getTenantId())) + .collect(Collectors.toList()); + toRemove.forEach(queueKey -> { + var consumerManager = consumers.remove(queueKey); + if (consumerManager != null) { + consumerManager.delete(false); + } + }); + } + } + } + private TbRuleEngineQueueConsumerManager getOrCreateConsumer(QueueKey queueKey) { return consumers.computeIfAbsent(queueKey, key -> new TbRuleEngineQueueConsumerManager(ctx, key)); } diff --git a/application/src/main/java/org/thingsboard/server/service/queue/processing/AbstractConsumerService.java b/application/src/main/java/org/thingsboard/server/service/queue/processing/AbstractConsumerService.java index 394f41883d..3065652d0f 100644 --- a/application/src/main/java/org/thingsboard/server/service/queue/processing/AbstractConsumerService.java +++ b/application/src/main/java/org/thingsboard/server/service/queue/processing/AbstractConsumerService.java @@ -15,7 +15,6 @@ */ package org.thingsboard.server.service.queue.processing; -import com.google.protobuf.ByteString; import lombok.extern.slf4j.Slf4j; import org.springframework.boot.context.event.ApplicationReadyEvent; import org.springframework.context.ApplicationEventPublisher; @@ -30,7 +29,6 @@ import org.thingsboard.server.common.data.id.DeviceProfileId; import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.id.TenantProfileId; import org.thingsboard.server.common.data.plugin.ComponentLifecycleEvent; -import org.thingsboard.server.common.msg.TbActorMsg; import org.thingsboard.server.common.msg.plugin.ComponentLifecycleMsg; import org.thingsboard.server.common.msg.queue.ServiceType; import org.thingsboard.server.common.msg.queue.TbCallback; @@ -166,57 +164,51 @@ public abstract class AbstractConsumerService actorMsgOpt = encodingService.decode(nfMsg.toByteArray()); - actorMsgOpt.ifPresent(tbActorMsg -> handleComponentLifecycleMsg(id, tbActorMsg)); - } - - protected void handleComponentLifecycleMsg(UUID id, TbActorMsg actorMsg) { - if (actorMsg instanceof ComponentLifecycleMsg) { - ComponentLifecycleMsg componentLifecycleMsg = (ComponentLifecycleMsg) actorMsg; - log.debug("[{}][{}][{}] Received Lifecycle event: {}", componentLifecycleMsg.getTenantId(), componentLifecycleMsg.getEntityId().getEntityType(), - componentLifecycleMsg.getEntityId(), componentLifecycleMsg.getEvent()); - if (EntityType.TENANT_PROFILE.equals(componentLifecycleMsg.getEntityId().getEntityType())) { - TenantProfileId tenantProfileId = new TenantProfileId(componentLifecycleMsg.getEntityId().getId()); - tenantProfileCache.evict(tenantProfileId); + protected final void handleComponentLifecycleMsg(UUID id, ComponentLifecycleMsg componentLifecycleMsg) { + TenantId tenantId = componentLifecycleMsg.getTenantId(); + log.debug("[{}][{}][{}] Received Lifecycle event: {}", tenantId, componentLifecycleMsg.getEntityId().getEntityType(), + componentLifecycleMsg.getEntityId(), componentLifecycleMsg.getEvent()); + if (EntityType.TENANT_PROFILE.equals(componentLifecycleMsg.getEntityId().getEntityType())) { + TenantProfileId tenantProfileId = new TenantProfileId(componentLifecycleMsg.getEntityId().getId()); + tenantProfileCache.evict(tenantProfileId); + if (componentLifecycleMsg.getEvent().equals(ComponentLifecycleEvent.UPDATED)) { + apiUsageStateService.onTenantProfileUpdate(tenantProfileId); + } + } else if (EntityType.TENANT.equals(componentLifecycleMsg.getEntityId().getEntityType())) { + if (TenantId.SYS_TENANT_ID.equals(tenantId)) { + jwtSettingsService.ifPresent(JwtSettingsService::reloadJwtSettings); + return; + } else { + tenantProfileCache.evict(tenantId); + partitionService.evictTenantInfo(tenantId); if (componentLifecycleMsg.getEvent().equals(ComponentLifecycleEvent.UPDATED)) { - apiUsageStateService.onTenantProfileUpdate(tenantProfileId); - } - } else if (EntityType.TENANT.equals(componentLifecycleMsg.getEntityId().getEntityType())) { - if (TenantId.SYS_TENANT_ID.equals(componentLifecycleMsg.getTenantId())) { - jwtSettingsService.ifPresent(JwtSettingsService::reloadJwtSettings); - return; - } else { - tenantProfileCache.evict(componentLifecycleMsg.getTenantId()); - partitionService.removeTenant(componentLifecycleMsg.getTenantId()); - if (componentLifecycleMsg.getEvent().equals(ComponentLifecycleEvent.UPDATED)) { - apiUsageStateService.onTenantUpdate(componentLifecycleMsg.getTenantId()); - } else if (componentLifecycleMsg.getEvent().equals(ComponentLifecycleEvent.DELETED)) { - apiUsageStateService.onTenantDelete((TenantId) componentLifecycleMsg.getEntityId()); - } - } - } else if (EntityType.DEVICE_PROFILE.equals(componentLifecycleMsg.getEntityId().getEntityType())) { - deviceProfileCache.evict(componentLifecycleMsg.getTenantId(), new DeviceProfileId(componentLifecycleMsg.getEntityId().getId())); - } else if (EntityType.DEVICE.equals(componentLifecycleMsg.getEntityId().getEntityType())) { - deviceProfileCache.evict(componentLifecycleMsg.getTenantId(), new DeviceId(componentLifecycleMsg.getEntityId().getId())); - } else if (EntityType.ASSET_PROFILE.equals(componentLifecycleMsg.getEntityId().getEntityType())) { - assetProfileCache.evict(componentLifecycleMsg.getTenantId(), new AssetProfileId(componentLifecycleMsg.getEntityId().getId())); - } else if (EntityType.ASSET.equals(componentLifecycleMsg.getEntityId().getEntityType())) { - assetProfileCache.evict(componentLifecycleMsg.getTenantId(), new AssetId(componentLifecycleMsg.getEntityId().getId())); - } else if (EntityType.ENTITY_VIEW.equals(componentLifecycleMsg.getEntityId().getEntityType())) { - actorContext.getTbEntityViewService().onComponentLifecycleMsg(componentLifecycleMsg); - } else if (EntityType.API_USAGE_STATE.equals(componentLifecycleMsg.getEntityId().getEntityType())) { - apiUsageStateService.onApiUsageStateUpdate(componentLifecycleMsg.getTenantId()); - } else if (EntityType.CUSTOMER.equals(componentLifecycleMsg.getEntityId().getEntityType())) { - if (componentLifecycleMsg.getEvent() == ComponentLifecycleEvent.DELETED) { - apiUsageStateService.onCustomerDelete((CustomerId) componentLifecycleMsg.getEntityId()); + apiUsageStateService.onTenantUpdate(tenantId); + } else if (componentLifecycleMsg.getEvent().equals(ComponentLifecycleEvent.DELETED)) { + apiUsageStateService.onTenantDelete(tenantId); + partitionService.removeTenant(tenantId); } } - eventPublisher.publishEvent(componentLifecycleMsg); + } else if (EntityType.DEVICE_PROFILE.equals(componentLifecycleMsg.getEntityId().getEntityType())) { + deviceProfileCache.evict(tenantId, new DeviceProfileId(componentLifecycleMsg.getEntityId().getId())); + } else if (EntityType.DEVICE.equals(componentLifecycleMsg.getEntityId().getEntityType())) { + deviceProfileCache.evict(tenantId, new DeviceId(componentLifecycleMsg.getEntityId().getId())); + } else if (EntityType.ASSET_PROFILE.equals(componentLifecycleMsg.getEntityId().getEntityType())) { + assetProfileCache.evict(tenantId, new AssetProfileId(componentLifecycleMsg.getEntityId().getId())); + } else if (EntityType.ASSET.equals(componentLifecycleMsg.getEntityId().getEntityType())) { + assetProfileCache.evict(tenantId, new AssetId(componentLifecycleMsg.getEntityId().getId())); + } else if (EntityType.ENTITY_VIEW.equals(componentLifecycleMsg.getEntityId().getEntityType())) { + actorContext.getTbEntityViewService().onComponentLifecycleMsg(componentLifecycleMsg); + } else if (EntityType.API_USAGE_STATE.equals(componentLifecycleMsg.getEntityId().getEntityType())) { + apiUsageStateService.onApiUsageStateUpdate(tenantId); + } else if (EntityType.CUSTOMER.equals(componentLifecycleMsg.getEntityId().getEntityType())) { + if (componentLifecycleMsg.getEvent() == ComponentLifecycleEvent.DELETED) { + apiUsageStateService.onCustomerDelete((CustomerId) componentLifecycleMsg.getEntityId()); + } } - log.trace("[{}] Forwarding component lifecycle message to App Actor {}", id, actorMsg); - actorContext.tellWithHighPriority(actorMsg); + + eventPublisher.publishEvent(componentLifecycleMsg); + log.trace("[{}] Forwarding component lifecycle message to App Actor {}", id, componentLifecycleMsg); + actorContext.tellWithHighPriority(componentLifecycleMsg); } protected abstract void handleNotification(UUID id, TbProtoQueueMsg msg, TbCallback callback) throws Exception; diff --git a/application/src/main/java/org/thingsboard/server/service/queue/ruleengine/TbQueueConsumerManagerTask.java b/application/src/main/java/org/thingsboard/server/service/queue/ruleengine/TbQueueConsumerManagerTask.java index 20eea32a6a..3c52e31afe 100644 --- a/application/src/main/java/org/thingsboard/server/service/queue/ruleengine/TbQueueConsumerManagerTask.java +++ b/application/src/main/java/org/thingsboard/server/service/queue/ruleengine/TbQueueConsumerManagerTask.java @@ -15,6 +15,7 @@ */ package org.thingsboard.server.service.queue.ruleengine; +import lombok.AllArgsConstructor; import lombok.Getter; import lombok.ToString; import org.thingsboard.server.common.data.queue.Queue; @@ -24,24 +25,24 @@ import java.util.Set; @Getter @ToString +@AllArgsConstructor public class TbQueueConsumerManagerTask { private final QueueEvent event; private Queue queue; private Set partitions; + private boolean drainQueue; - public TbQueueConsumerManagerTask(QueueEvent event) { - this.event = event; + public static TbQueueConsumerManagerTask delete(boolean drainQueue) { + return new TbQueueConsumerManagerTask(QueueEvent.DELETE, null, null, drainQueue); } - public TbQueueConsumerManagerTask(QueueEvent event, Queue queue) { - this.event = event; - this.queue = queue; + public static TbQueueConsumerManagerTask configUpdate(Queue queue) { + return new TbQueueConsumerManagerTask(QueueEvent.CONFIG_UPDATE, queue, null, false); } - public TbQueueConsumerManagerTask(QueueEvent event, Set partitions) { - this.event = event; - this.partitions = partitions; + public static TbQueueConsumerManagerTask partitionChange(Set partitions) { + return new TbQueueConsumerManagerTask(QueueEvent.PARTITION_CHANGE, null, partitions, false); } } diff --git a/application/src/main/java/org/thingsboard/server/service/queue/ruleengine/TbRuleEngineQueueConsumerManager.java b/application/src/main/java/org/thingsboard/server/service/queue/ruleengine/TbRuleEngineQueueConsumerManager.java index 22db5b7b38..2da3dbc6dc 100644 --- a/application/src/main/java/org/thingsboard/server/service/queue/ruleengine/TbRuleEngineQueueConsumerManager.java +++ b/application/src/main/java/org/thingsboard/server/service/queue/ruleengine/TbRuleEngineQueueConsumerManager.java @@ -95,15 +95,15 @@ public class TbRuleEngineQueueConsumerManager { } public void update(Queue queue) { - addTask(new TbQueueConsumerManagerTask(QueueEvent.CONFIG_UPDATE, queue)); + addTask(TbQueueConsumerManagerTask.configUpdate(queue)); } public void update(Set partitions) { - addTask(new TbQueueConsumerManagerTask(QueueEvent.PARTITION_CHANGE, partitions)); + addTask(TbQueueConsumerManagerTask.partitionChange(partitions)); } - public void delete() { - addTask(new TbQueueConsumerManagerTask(QueueEvent.DELETE)); + public void delete(boolean drainQueue) { + addTask(TbQueueConsumerManagerTask.delete(drainQueue)); } private void addTask(TbQueueConsumerManagerTask todo) { @@ -138,7 +138,7 @@ public class TbRuleEngineQueueConsumerManager { } else if (task.getEvent() == QueueEvent.CONFIG_UPDATE) { newConfiguration = task.getQueue(); } else if (task.getEvent() == QueueEvent.DELETE) { - doDelete(); + doDelete(task.isDrainQueue()); return; } } @@ -205,7 +205,7 @@ public class TbRuleEngineQueueConsumerManager { log.debug("[{}] Unsubscribed and stopped consumers", queueKey); } - private void doDelete() { + private void doDelete(boolean drainQueue) { stopped = true; log.info("[{}] Handling queue deletion", queueKey); consumerWrapper.getConsumers().forEach(TbQueueConsumerTask::awaitCompletion); @@ -213,7 +213,9 @@ public class TbRuleEngineQueueConsumerManager { List>> queueConsumers = consumerWrapper.getConsumers().stream() .map(TbQueueConsumerTask::getConsumer).collect(Collectors.toList()); ctx.getConsumersExecutor().submit(() -> { - drainQueue(queueConsumers); + if (drainQueue) { + drainQueue(queueConsumers); + } queueConsumers.forEach(consumer -> { for (String topic : consumer.getFullTopicNames()) { diff --git a/application/src/main/java/org/thingsboard/server/service/security/auth/oauth2/AbstractOAuth2ClientMapper.java b/application/src/main/java/org/thingsboard/server/service/security/auth/oauth2/AbstractOAuth2ClientMapper.java index f52fc32f4f..d0e3444d0f 100644 --- a/application/src/main/java/org/thingsboard/server/service/security/auth/oauth2/AbstractOAuth2ClientMapper.java +++ b/application/src/main/java/org/thingsboard/server/service/security/auth/oauth2/AbstractOAuth2ClientMapper.java @@ -38,7 +38,6 @@ import org.thingsboard.server.common.data.oauth2.OAuth2MapperConfig; import org.thingsboard.server.common.data.oauth2.OAuth2Registration; import org.thingsboard.server.common.data.page.PageData; import org.thingsboard.server.common.data.page.PageLink; -import org.thingsboard.server.common.data.plugin.ComponentLifecycleEvent; import org.thingsboard.server.common.data.security.Authority; import org.thingsboard.server.common.data.security.UserCredentials; import org.thingsboard.server.dao.customer.CustomerService; @@ -47,12 +46,12 @@ import org.thingsboard.server.dao.oauth2.OAuth2User; import org.thingsboard.server.dao.tenant.TbTenantProfileCache; import org.thingsboard.server.dao.tenant.TenantService; import org.thingsboard.server.dao.user.UserService; +import org.thingsboard.server.service.entitiy.tenant.TbTenantService; import org.thingsboard.server.service.entitiy.user.TbUserService; import org.thingsboard.server.service.install.InstallScripts; import org.thingsboard.server.service.security.model.SecurityUser; import org.thingsboard.server.service.security.model.UserPrincipal; -import java.io.IOException; import java.util.List; import java.util.Optional; import java.util.concurrent.locks.Lock; @@ -71,6 +70,9 @@ public abstract class AbstractOAuth2ClientMapper { @Autowired private TenantService tenantService; + @Autowired + private TbTenantService tbTenantService; + @Autowired private CustomerService customerService; @@ -92,7 +94,7 @@ public abstract class AbstractOAuth2ClientMapper { @Value("${edges.enabled}") @Getter private boolean edgesEnabled; - + private final Lock userCreationLock = new ReentrantLock(); protected SecurityUser getOrCreateSecurityUserFromOAuth2User(OAuth2User oauth2User, OAuth2Registration registration) { @@ -171,19 +173,13 @@ public abstract class AbstractOAuth2ClientMapper { } } - private TenantId getTenantId(String tenantName) throws IOException { + private TenantId getTenantId(String tenantName) throws Exception { List tenants = tenantService.findTenants(new PageLink(1, 0, tenantName)).getData(); Tenant tenant; if (tenants == null || tenants.isEmpty()) { tenant = new Tenant(); tenant.setTitle(tenantName); - tenant = tenantService.saveTenant(tenant); - installScripts.createDefaultRuleChains(tenant.getId()); - installScripts.createDefaultEdgeRuleChains(tenant.getId()); - tenantProfileCache.evict(tenant.getId()); - tbClusterService.onTenantChange(tenant, null); - tbClusterService.broadcastEntityStateChangeEvent(tenant.getId(), tenant.getId(), - ComponentLifecycleEvent.CREATED); + tenant = tbTenantService.save(tenant); } else { tenant = tenants.get(0); } diff --git a/application/src/main/resources/thingsboard.yml b/application/src/main/resources/thingsboard.yml index 2f710f3e92..936f30af60 100644 --- a/application/src/main/resources/thingsboard.yml +++ b/application/src/main/resources/thingsboard.yml @@ -777,7 +777,12 @@ audit-log: # Device state parameters state: - # Should be greater than transport.sessions.report_timeout + # Device inactivity timeout is a global configuration parameter that defines when the device will be marked as "inactive" by the server. + # The parameter value is in seconds. A user can overwrite this parameter for an individual device by setting the “inactivityTimeout” server-side attribute (NOTE: expects value in milliseconds). + # We recommend this parameter to be in sync with session inactivity timeout ("transport.sessions.inactivity_timeout" or TB_TRANSPORT_SESSIONS_INACTIVITY_TIMEOUT) parameter + # which is responsible for detection of the stale device connection sessions. + # The value of the session inactivity timeout parameter should be greater or equal to the device inactivity timeout. + # Note that the session inactivity timeout is set in milliseconds while device inactivity timeout is in seconds. defaultInactivityTimeoutInSec: "${DEFAULT_INACTIVITY_TIMEOUT:600}" defaultStateCheckIntervalInSec: "${DEFAULT_STATE_CHECK_INTERVAL:60}" # Interval for checking the device state after a specified period. Time in seconds # Controls whether we store the device 'active' flag in attributes (default) or telemetry. @@ -866,8 +871,15 @@ js: # Transport configuration parameters transport: sessions: - # Inactivity timeout for device session in transport service. The last activity time of the device session is updated if the device sends any message, including keepalive messages - inactivity_timeout: "${TB_TRANSPORT_SESSIONS_INACTIVITY_TIMEOUT:300000}" + # Session inactivity timeout is a global configuration parameter that defines how long the device transport session will be opened after the last message arrives from the device. + # The parameter value is in milliseconds. + # The last activity time of the device session is updated if the device sends any message, including keepalive messages + # If there is no activity, the session will be closed, and all subscriptions will be deleted. + # We recommend this parameter to be in sync with device inactivity timeout ("state.defaultInactivityTimeoutInSec" or DEFAULT_INACTIVITY_TIMEOUT) parameter + # which is responsible for detection of the device connectivity status in the core service of the platform. + # The value of the session inactivity timeout parameter should be greater or equal to the device inactivity timeout. + # Note that the session inactivity timeout is set in milliseconds while device inactivity timeout is in seconds. + inactivity_timeout: "${TB_TRANSPORT_SESSIONS_INACTIVITY_TIMEOUT:600000}" # Interval of periodic check for expired sessions and report of the changes to session last activity time report_timeout: "${TB_TRANSPORT_SESSIONS_REPORT_TIMEOUT:3000}" activity: diff --git a/application/src/test/java/org/thingsboard/server/actors/tenant/TenantActorTest.java b/application/src/test/java/org/thingsboard/server/actors/tenant/TenantActorTest.java new file mode 100644 index 0000000000..a282452f4e --- /dev/null +++ b/application/src/test/java/org/thingsboard/server/actors/tenant/TenantActorTest.java @@ -0,0 +1,74 @@ +/** + * Copyright © 2016-2024 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.actors.tenant; + +import org.junit.Before; +import org.junit.Test; +import org.thingsboard.server.actors.ActorSystemContext; +import org.thingsboard.server.actors.TbActorCtx; +import org.thingsboard.server.actors.TbActorRef; +import org.thingsboard.server.common.data.id.DeviceId; +import org.thingsboard.server.common.data.id.TenantId; +import org.thingsboard.server.common.data.plugin.ComponentLifecycleEvent; +import org.thingsboard.server.common.msg.plugin.ComponentLifecycleMsg; +import org.thingsboard.server.common.msg.queue.ServiceType; +import org.thingsboard.server.common.msg.queue.TopicPartitionInfo; +import org.thingsboard.server.common.msg.rule.engine.DeviceDeleteMsg; +import org.thingsboard.server.dao.tenant.TenantService; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.reset; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +public class TenantActorTest { + + TenantActor tenantActor; + TbActorCtx ctx; + ActorSystemContext systemContext; + TenantId tenantId = TenantId.SYS_TENANT_ID; + DeviceId deviceId = DeviceId.fromString("78bf9b26-74ef-4af2-9cfb-ad6cf24ad2ec"); + + @Before + public void setUp() throws Exception { + systemContext = mock(ActorSystemContext.class); + ctx = mock(TbActorCtx.class); + tenantActor = (TenantActor) new TenantActor.ActorCreator(systemContext, tenantId).createActor(); + when(systemContext.getTenantService()).thenReturn(mock(TenantService.class)); + tenantActor.init(ctx); + tenantActor.cantFindTenant = false; + } + + @Test + public void deleteDeviceTest() { + TbActorRef deviceActorRef = mock(TbActorRef.class); + when(systemContext.resolve(ServiceType.TB_CORE, tenantId, deviceId)).thenReturn(new TopicPartitionInfo("Main", tenantId, 0,true)); + when(ctx.getOrCreateChildActor(any(), any(), any(), any())).thenReturn(deviceActorRef); + ComponentLifecycleMsg componentLifecycleMsg = new ComponentLifecycleMsg(tenantId, deviceId, ComponentLifecycleEvent.DELETED); + tenantActor.doProcess(componentLifecycleMsg); + verify(deviceActorRef).tellWithHighPriority(eq(new DeviceDeleteMsg(tenantId, deviceId))); + + reset(ctx, deviceActorRef); + when(systemContext.resolve(ServiceType.TB_CORE, tenantId, deviceId)).thenReturn(new TopicPartitionInfo("Main", tenantId, 1,false)); + tenantActor.doProcess(componentLifecycleMsg); + verify(ctx, never()).getOrCreateChildActor(any(), any(), any(), any()); + verify(deviceActorRef, never()).tellWithHighPriority(any()); + } + +} \ No newline at end of file diff --git a/application/src/test/java/org/thingsboard/server/controller/AbstractNotifyEntityTest.java b/application/src/test/java/org/thingsboard/server/controller/AbstractNotifyEntityTest.java index 429d0b765f..44302f08ba 100644 --- a/application/src/test/java/org/thingsboard/server/controller/AbstractNotifyEntityTest.java +++ b/application/src/test/java/org/thingsboard/server/controller/AbstractNotifyEntityTest.java @@ -20,6 +20,7 @@ import org.mockito.ArgumentMatcher; import org.mockito.Mockito; import org.springframework.boot.test.mock.mockito.SpyBean; import org.thingsboard.server.cluster.TbClusterService; +import org.thingsboard.server.common.data.EdgeUtils; import org.thingsboard.server.common.data.EntityType; import org.thingsboard.server.common.data.HasName; import org.thingsboard.server.common.data.HasTenantId; @@ -47,7 +48,6 @@ import java.util.stream.Collectors; import static org.mockito.Mockito.never; import static org.mockito.Mockito.times; -import static org.thingsboard.server.service.entitiy.DefaultTbNotificationEntityService.edgeTypeByActionType; @Slf4j public abstract class AbstractNotifyEntityTest extends AbstractWebTest { @@ -91,7 +91,7 @@ public abstract class AbstractNotifyEntityTest extends AbstractWebTest { int cntTime = 1; Mockito.verify(tbClusterService, times(cntTime)).sendNotificationMsgToEdge(Mockito.eq(tenantId), Mockito.isNull(), Mockito.isNull(), Mockito.any(), Mockito.eq(EdgeEventType.RELATION), - Mockito.eq(edgeTypeByActionType(actionType)), Mockito.any()); + Mockito.eq(EdgeUtils.getEdgeEventActionTypeByActionType(actionType)), Mockito.any()); ArgumentMatcher matcherOriginatorId = argument -> argument.equals(relation.getTo()); ArgumentMatcher matcherEntityClassEquals = Objects::isNull; ArgumentMatcher matcherCustomerId = customerId == null ? @@ -111,7 +111,7 @@ public abstract class AbstractNotifyEntityTest extends AbstractWebTest { ActionType actionType, int cntTime) { Mockito.verify(tbClusterService, times(cntTime)).sendNotificationMsgToEdge(Mockito.eq(tenantId), Mockito.isNull(), Mockito.isNull(), Mockito.any(), Mockito.eq(EdgeEventType.RELATION), - Mockito.eq(edgeTypeByActionType(actionType)), Mockito.any()); + Mockito.eq(EdgeUtils.getEdgeEventActionTypeByActionType(actionType)), Mockito.any()); ArgumentMatcher matcherOriginatorId = argument -> argument.getClass().equals(relation.getFrom().getClass()); ArgumentMatcher matcherEntityClassEquals = Objects::isNull; ArgumentMatcher matcherCustomerId = customerId == null ? @@ -317,7 +317,7 @@ public abstract class AbstractNotifyEntityTest extends AbstractWebTest { private void testNotificationMsgToEdgeServiceNeverWithActionType(EntityId entityId, ActionType actionType) { EdgeEventActionType edgeEventActionType = ActionType.CREDENTIALS_UPDATED.equals(actionType) ? - EdgeEventActionType.CREDENTIALS_UPDATED : edgeTypeByActionType(actionType); + EdgeEventActionType.CREDENTIALS_UPDATED : EdgeUtils.getEdgeEventActionTypeByActionType(actionType); Mockito.verify(tbClusterService, never()).sendNotificationMsgToEdge(Mockito.any(), Mockito.any(), Mockito.any(entityId.getClass()), Mockito.any(), Mockito.any(), Mockito.eq(edgeEventActionType), Mockito.any()); } @@ -353,7 +353,7 @@ public abstract class AbstractNotifyEntityTest extends AbstractWebTest { private void testNotificationMsgToEdgeServiceTime(EntityId entityId, TenantId tenantId, ActionType actionType, int cntTime) { EdgeEventActionType edgeEventActionType = ActionType.CREDENTIALS_UPDATED.equals(actionType) ? - EdgeEventActionType.CREDENTIALS_UPDATED : edgeTypeByActionType(actionType); + EdgeEventActionType.CREDENTIALS_UPDATED : EdgeUtils.getEdgeEventActionTypeByActionType(actionType); ArgumentMatcher matcherEntityId = cntTime == 1 ? argument -> argument.equals(entityId) : argument -> argument.getClass().equals(entityId.getClass()); Mockito.verify(tbClusterService, times(cntTime)).sendNotificationMsgToEdge(Mockito.eq(tenantId), @@ -364,7 +364,7 @@ public abstract class AbstractNotifyEntityTest extends AbstractWebTest { private void testSendNotificationMsgToEdgeServiceTimeEntityEqAny(TenantId tenantId, ActionType actionType, int cntTime) { Mockito.verify(tbClusterService, times(cntTime)).sendNotificationMsgToEdge(Mockito.eq(tenantId), Mockito.any(), Mockito.any(EntityId.class), Mockito.any(), Mockito.isNull(), - Mockito.eq(edgeTypeByActionType(actionType)), Mockito.any()); + Mockito.eq(EdgeUtils.getEdgeEventActionTypeByActionType(actionType)), Mockito.any()); } protected void testBroadcastEntityStateChangeEventTime(EntityId entityId, TenantId tenantId, int cntTime) { diff --git a/application/src/test/java/org/thingsboard/server/controller/AlarmControllerTest.java b/application/src/test/java/org/thingsboard/server/controller/AlarmControllerTest.java index f5c1fbdae9..1415e38d7e 100644 --- a/application/src/test/java/org/thingsboard/server/controller/AlarmControllerTest.java +++ b/application/src/test/java/org/thingsboard/server/controller/AlarmControllerTest.java @@ -110,7 +110,7 @@ public class AlarmControllerTest extends AbstractControllerTest { Alarm alarm = createAlarm(TEST_ALARM_TYPE); - testNotifyEntityAllOneTime(alarm, alarm.getId(), alarm.getOriginator(), + testNotifyEntityOneTimeMsgToEdgeServiceNever(alarm, alarm.getId(), alarm.getOriginator(), tenantId, customerId, customerUserId, CUSTOMER_USER_EMAIL, ActionType.ADDED); } @@ -122,7 +122,7 @@ public class AlarmControllerTest extends AbstractControllerTest { Alarm alarm = createAlarm(TEST_ALARM_TYPE); - testNotifyEntityAllOneTime(alarm, alarm.getId(), alarm.getOriginator(), + testNotifyEntityOneTimeMsgToEdgeServiceNever(alarm, alarm.getId(), alarm.getOriginator(), tenantId, customerId, tenantAdminUserId, TENANT_ADMIN_EMAIL, ActionType.ADDED); } @@ -772,7 +772,7 @@ public class AlarmControllerTest extends AbstractControllerTest { alarm = doPost("/api/alarm", alarm, Alarm.class); Assert.assertNotNull("Saved alarm is null!", alarm); - testNotifyEntityNeverMsgToEdgeServiceOneTime(alarm, alarm.getId(), tenantId, ActionType.ADDED); + testNotifyEntityNever(alarm.getId(), alarm); resetTokens(); diff --git a/application/src/test/java/org/thingsboard/server/controller/ImageControllerTest.java b/application/src/test/java/org/thingsboard/server/controller/ImageControllerTest.java index 3761e0c9c1..e272a40857 100644 --- a/application/src/test/java/org/thingsboard/server/controller/ImageControllerTest.java +++ b/application/src/test/java/org/thingsboard/server/controller/ImageControllerTest.java @@ -130,13 +130,14 @@ public class ImageControllerTest extends AbstractControllerTest { checkPngImageDescriptor(imageInfo.getDescriptor(ImageDescriptor.class)); String newFilename = "my_jpeg_image.png"; - imageInfo = uploadImage(HttpMethod.PUT, "/api/images/tenant/" + filename, newFilename, "image/jpeg", JPEG_IMAGE); + TbResourceInfo newImageInfo = uploadImage(HttpMethod.PUT, "/api/images/tenant/" + filename, newFilename, "image/jpeg", JPEG_IMAGE); - assertThat(imageInfo.getTitle()).isEqualTo(filename); - assertThat(imageInfo.getResourceKey()).isEqualTo(filename); - assertThat(imageInfo.getFileName()).isEqualTo(newFilename); + assertThat(newImageInfo.getTitle()).isEqualTo(filename); + assertThat(newImageInfo.getResourceKey()).isEqualTo(filename); + assertThat(newImageInfo.getFileName()).isEqualTo(newFilename); + assertThat(newImageInfo.getPublicResourceKey()).isEqualTo(imageInfo.getPublicResourceKey()); - ImageDescriptor imageDescriptor = imageInfo.getDescriptor(ImageDescriptor.class); + ImageDescriptor imageDescriptor = newImageInfo.getDescriptor(ImageDescriptor.class); checkJpegImageDescriptor(imageDescriptor); assertThat(downloadImage("tenant", filename)).containsExactly(JPEG_IMAGE); @@ -154,12 +155,15 @@ public class ImageControllerTest extends AbstractControllerTest { assertThat(imageInfo.getFileName()).isEqualTo(filename); String newTitle = "My PNG image"; - imageInfo.setTitle(newTitle); - imageInfo.setDescriptor(JacksonUtil.newObjectNode()); - imageInfo = doPut("/api/images/tenant/" + filename + "/info", imageInfo, TbResourceInfo.class); - - assertThat(imageInfo.getTitle()).isEqualTo(newTitle); - assertThat(imageInfo.getDescriptor(ImageDescriptor.class)).isEqualTo(imageDescriptor); + TbResourceInfo newImageInfo = new TbResourceInfo(imageInfo); + newImageInfo.setTitle(newTitle); + newImageInfo.setDescriptor(JacksonUtil.newObjectNode()); + newImageInfo = doPut("/api/images/tenant/" + filename + "/info", newImageInfo, TbResourceInfo.class); + + assertThat(newImageInfo.getTitle()).isEqualTo(newTitle); + assertThat(newImageInfo.getDescriptor(ImageDescriptor.class)).isEqualTo(imageDescriptor); + assertThat(newImageInfo.getResourceKey()).isEqualTo(imageInfo.getResourceKey()); + assertThat(newImageInfo.getPublicResourceKey()).isEqualTo(newImageInfo.getPublicResourceKey()); } @Test diff --git a/application/src/test/java/org/thingsboard/server/controller/TenantControllerTest.java b/application/src/test/java/org/thingsboard/server/controller/TenantControllerTest.java index 07fa41f8bd..cd47988658 100644 --- a/application/src/test/java/org/thingsboard/server/controller/TenantControllerTest.java +++ b/application/src/test/java/org/thingsboard/server/controller/TenantControllerTest.java @@ -40,6 +40,7 @@ import org.thingsboard.server.common.data.Tenant; import org.thingsboard.server.common.data.TenantInfo; import org.thingsboard.server.common.data.TenantProfile; import org.thingsboard.server.common.data.User; +import org.thingsboard.server.common.data.exception.TenantNotFoundException; import org.thingsboard.server.common.data.id.EntityId; import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.msg.TbMsgType; @@ -64,6 +65,7 @@ import org.thingsboard.server.dao.service.DaoSqlTest; import org.thingsboard.server.gen.transport.TransportProtos; import org.thingsboard.server.queue.TbQueueAdmin; import org.thingsboard.server.queue.discovery.PartitionService; +import org.thingsboard.server.queue.discovery.QueueKey; import java.util.ArrayList; import java.util.Collections; @@ -71,7 +73,6 @@ import java.util.Comparator; import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.Random; import java.util.UUID; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; @@ -80,6 +81,7 @@ import java.util.function.Predicate; import java.util.stream.Collectors; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; import static org.awaitility.Awaitility.await; import static org.hamcrest.Matchers.containsString; import static org.mockito.ArgumentMatchers.argThat; @@ -700,6 +702,45 @@ public class TenantControllerTest extends AbstractControllerTest { }); } + @Test + public void whenTenantIsDeleted_thenDeleteQueues() throws Exception { + loginSysAdmin(); + TenantProfile tenantProfile = new TenantProfile(); + tenantProfile.setName("Test profile"); + TenantProfileData tenantProfileData = new TenantProfileData(); + tenantProfileData.setConfiguration(new DefaultTenantProfileConfiguration()); + tenantProfile.setProfileData(tenantProfileData); + tenantProfile.setIsolatedTbRuleEngine(true); + addQueueConfig(tenantProfile, MAIN_QUEUE_NAME); + tenantProfile = doPost("/api/tenantProfile", tenantProfile, TenantProfile.class); + createDifferentTenant(); + loginSysAdmin(); + savedDifferentTenant.setTenantProfileId(tenantProfile.getId()); + savedDifferentTenant = doPost("/api/tenant", savedDifferentTenant, Tenant.class); + TenantId tenantId = differentTenantId; + await().atMost(10, TimeUnit.SECONDS).untilAsserted(() -> { + assertThat(partitionService.getMyPartitions(new QueueKey(ServiceType.TB_RULE_ENGINE, tenantId))).isNotNull(); + }); + TopicPartitionInfo tpi = partitionService.resolve(ServiceType.TB_RULE_ENGINE, tenantId, tenantId); + assertThat(tpi.getTenantId()).hasValue(tenantId); + TbMsg tbMsg = publishTbMsg(tenantId, tpi); + await().atMost(10, TimeUnit.SECONDS).untilAsserted(() -> { + verify(actorContext).tell(argThat(msg -> { + return msg instanceof QueueToRuleEngineMsg && ((QueueToRuleEngineMsg) msg).getMsg().getId().equals(tbMsg.getId()); + })); + }); + + deleteDifferentTenant(); + + await().atMost(10, TimeUnit.SECONDS).untilAsserted(() -> { + assertThat(partitionService.getMyPartitions(new QueueKey(ServiceType.TB_RULE_ENGINE, tenantId))).isNull(); + assertThatThrownBy(() -> partitionService.resolve(ServiceType.TB_RULE_ENGINE, tenantId, tenantId)) + .isInstanceOf(TenantNotFoundException.class); + + verify(queueAdmin).deleteTopic(eq(tpi.getFullTopicName())); + }); + } + private TbMsg publishTbMsg(TenantId tenantId, TopicPartitionInfo tpi) { TbMsg tbMsg = TbMsg.newMsg(TbMsgType.POST_TELEMETRY_REQUEST, tenantId, TbMsgMetaData.EMPTY, "{\"test\":1}"); TransportProtos.ToRuleEngineMsg msg = TransportProtos.ToRuleEngineMsg.newBuilder() @@ -759,7 +800,7 @@ public class TenantControllerTest extends AbstractControllerTest { queueConfiguration.setName(queueName); queueConfiguration.setTopic(topic); queueConfiguration.setPollInterval(25); - queueConfiguration.setPartitions(1 + new Random().nextInt(99)); + queueConfiguration.setPartitions(12); queueConfiguration.setConsumerPerPartition(true); queueConfiguration.setPackProcessingTimeout(2000); SubmitStrategy submitStrategy = new SubmitStrategy(); @@ -799,20 +840,20 @@ public class TenantControllerTest extends AbstractControllerTest { ArgumentMatcher matcherTenant = cntTime == 1 ? argument -> argument.equals(tenant) : argument -> argument.getClass().equals(Tenant.class); if (ComponentLifecycleEvent.DELETED.equals(event)) { - Mockito.verify(tbClusterService, times( cntTime)).onTenantDelete(Mockito.argThat(matcherTenant), + Mockito.verify(tbClusterService, times(cntTime)).onTenantDelete(Mockito.argThat(matcherTenant), Mockito.isNull()); } else { - Mockito.verify(tbClusterService, times( cntTime)).onTenantChange(Mockito.argThat(matcherTenant), + Mockito.verify(tbClusterService, times(cntTime)).onTenantChange(Mockito.argThat(matcherTenant), Mockito.isNull()); } TenantId tenantId = cntTime == 1 ? tenant.getId() : (TenantId) createEntityId_NULL_UUID(tenant); - testBroadcastEntityStateChangeEventTime(tenantId, tenantId, cntTime); + testBroadcastEntityStateChangeEventTime(tenantId, tenantId, cntTime); Mockito.reset(tbClusterService); } private void testBroadcastEntityStateChangeEventNeverTenant() { Mockito.verify(tbClusterService, never()).onTenantChange(Mockito.any(Tenant.class), - Mockito.isNull()); + Mockito.isNull()); testBroadcastEntityStateChangeEventNever(createEntityId_NULL_UUID(new Tenant())); Mockito.reset(tbClusterService); } diff --git a/application/src/test/java/org/thingsboard/server/edge/AlarmEdgeTest.java b/application/src/test/java/org/thingsboard/server/edge/AlarmEdgeTest.java index 5ada0e69d5..239cb87ab4 100644 --- a/application/src/test/java/org/thingsboard/server/edge/AlarmEdgeTest.java +++ b/application/src/test/java/org/thingsboard/server/edge/AlarmEdgeTest.java @@ -15,21 +15,28 @@ */ package org.thingsboard.server.edge; +import com.datastax.oss.driver.api.core.uuid.Uuids; import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.node.TextNode; import com.google.protobuf.AbstractMessage; import org.junit.Assert; import org.junit.Test; import org.thingsboard.common.util.JacksonUtil; import org.thingsboard.server.common.data.Device; import org.thingsboard.server.common.data.alarm.Alarm; +import org.thingsboard.server.common.data.alarm.AlarmComment; +import org.thingsboard.server.common.data.alarm.AlarmCommentInfo; +import org.thingsboard.server.common.data.alarm.AlarmCommentType; import org.thingsboard.server.common.data.alarm.AlarmInfo; import org.thingsboard.server.common.data.alarm.AlarmSeverity; import org.thingsboard.server.common.data.alarm.AlarmStatus; +import org.thingsboard.server.common.data.id.AlarmCommentId; import org.thingsboard.server.common.data.id.AlarmId; import org.thingsboard.server.common.data.id.DeviceId; import org.thingsboard.server.common.data.page.PageData; import org.thingsboard.server.common.data.page.PageLink; import org.thingsboard.server.dao.service.DaoSqlTest; +import org.thingsboard.server.gen.edge.v1.AlarmCommentUpdateMsg; import org.thingsboard.server.gen.edge.v1.AlarmUpdateMsg; import org.thingsboard.server.gen.edge.v1.UpdateMsgType; import org.thingsboard.server.gen.edge.v1.UplinkMsg; @@ -84,26 +91,18 @@ public class AlarmEdgeTest extends AbstractEdgeTest { alarm.setOriginator(device.getId()); alarm.setType("alarm"); alarm.setSeverity(AlarmSeverity.CRITICAL); - edgeImitator.expectMessageAmount(1); Alarm savedAlarm = doPost("/api/alarm", alarm, Alarm.class); - Assert.assertTrue(edgeImitator.waitForMessages()); - AbstractMessage latestMessage = edgeImitator.getLatestMessage(); - Assert.assertTrue(latestMessage instanceof AlarmUpdateMsg); - AlarmUpdateMsg alarmUpdateMsg = (AlarmUpdateMsg) latestMessage; - Alarm alarmMsg = JacksonUtil.fromString(alarmUpdateMsg.getEntity(), Alarm.class, true); - Assert.assertNotNull(alarmMsg); - Assert.assertEquals(savedAlarm, alarmMsg); - Assert.assertEquals(UpdateMsgType.ENTITY_CREATED_RPC_MESSAGE, alarmUpdateMsg.getMsgType()); + edgeImitator.ignoreType(AlarmCommentUpdateMsg.class); // ack alarm edgeImitator.expectMessageAmount(1); doPost("/api/alarm/" + savedAlarm.getUuidId() + "/ack"); Assert.assertTrue(edgeImitator.waitForMessages()); - latestMessage = edgeImitator.getLatestMessage(); + AbstractMessage latestMessage = edgeImitator.getLatestMessage(); Assert.assertTrue(latestMessage instanceof AlarmUpdateMsg); - alarmUpdateMsg = (AlarmUpdateMsg) latestMessage; + AlarmUpdateMsg alarmUpdateMsg = (AlarmUpdateMsg) latestMessage; Assert.assertEquals(UpdateMsgType.ALARM_ACK_RPC_MESSAGE, alarmUpdateMsg.getMsgType()); - alarmMsg = JacksonUtil.fromString(alarmUpdateMsg.getEntity(), Alarm.class, true); + Alarm alarmMsg = JacksonUtil.fromString(alarmUpdateMsg.getEntity(), Alarm.class, true); Assert.assertNotNull(alarmMsg); Assert.assertEquals(savedAlarm.getType(), alarmMsg.getType()); Assert.assertEquals(savedAlarm.getName(), alarmMsg.getName()); @@ -137,6 +136,99 @@ public class AlarmEdgeTest extends AbstractEdgeTest { Assert.assertEquals(savedAlarm.getType(), alarmMsg.getType()); Assert.assertEquals(savedAlarm.getName(), alarmMsg.getName()); Assert.assertEquals(AlarmStatus.CLEARED_ACK, alarmMsg.getStatus()); + edgeImitator.allowIgnoredTypes(); + } + + @Test + public void testSendAlarmCommentToCloud() throws Exception { + Device device = saveDeviceOnCloudAndVerifyDeliveryToEdge(); + + Alarm alarm = buildAlarmForUplinkMsg(device.getId()); + + UplinkMsg.Builder uplinkMsgBuilder = UplinkMsg.newBuilder(); + AlarmUpdateMsg.Builder alarmUpdateMgBuilder = AlarmUpdateMsg.newBuilder(); + alarmUpdateMgBuilder.setIdMSB(alarm.getUuidId().getMostSignificantBits()); + alarmUpdateMgBuilder.setIdLSB(alarm.getUuidId().getLeastSignificantBits()); + alarmUpdateMgBuilder.setEntity(JacksonUtil.toString(alarm)); + testAutoGeneratedCodeByProtobuf(alarmUpdateMgBuilder); + uplinkMsgBuilder.addAlarmUpdateMsg(alarmUpdateMgBuilder.build()); + + testAutoGeneratedCodeByProtobuf(uplinkMsgBuilder); + + edgeImitator.expectResponsesAmount(1); + edgeImitator.sendUplinkMsg(uplinkMsgBuilder.build()); + Assert.assertTrue(edgeImitator.waitForResponses()); + + AlarmComment alarmComment = buildAlarmCommentForUplinkMsg(alarm.getId()); + + uplinkMsgBuilder = UplinkMsg.newBuilder(); + AlarmCommentUpdateMsg.Builder alarmCommentUpdateMgBuilder = AlarmCommentUpdateMsg.newBuilder(); + alarmCommentUpdateMgBuilder.setEntity(JacksonUtil.toString(alarmComment)); + alarmCommentUpdateMgBuilder.setMsgType(UpdateMsgType.ENTITY_CREATED_RPC_MESSAGE); + testAutoGeneratedCodeByProtobuf(alarmCommentUpdateMgBuilder); + uplinkMsgBuilder.addAlarmCommentUpdateMsg(alarmCommentUpdateMgBuilder.build()); + + testAutoGeneratedCodeByProtobuf(uplinkMsgBuilder); + + edgeImitator.expectResponsesAmount(1); + edgeImitator.sendUplinkMsg(uplinkMsgBuilder.build()); + Assert.assertTrue(edgeImitator.waitForResponses()); + + PageData pageData = doGetTyped("/api/alarm/" + alarmComment.getAlarmId().getId() + "/comment" + "?page=0&pageSize=1", new TypeReference<>() {}); + Assert.assertNotNull("Found pageData is null", pageData); + Assert.assertNotEquals("Expected alarms are not found!", 0, pageData.getTotalElements()); + + Assert.assertNotNull(pageData.getData().get(0)); + AlarmCommentInfo alarmInfo = pageData.getData().get(0); + Assert.assertEquals(alarm.getId(), alarmInfo.getAlarmId()); + Assert.assertEquals(alarmComment.getAlarmId(), alarmInfo.getAlarmId()); + } + + @Test + public void testAlarmComments() throws Exception { + Device device = findDeviceByName("Edge Device 1"); + Alarm alarm = new Alarm(); + alarm.setOriginator(device.getId()); + alarm.setType("alarm"); + alarm.setSeverity(AlarmSeverity.MINOR); + Alarm savedAlarm = doPost("/api/alarm", alarm, Alarm.class); + + // create alarm comment + edgeImitator.expectMessageAmount(1); + AlarmComment alarmComment = new AlarmComment(); + alarmComment.setComment(new TextNode("Test")); + alarmComment.setAlarmId(savedAlarm.getId()); + alarmComment = doPost("/api/alarm/" + savedAlarm.getUuidId() + "/comment", alarmComment, AlarmComment.class); + Assert.assertTrue(edgeImitator.waitForMessages()); + AbstractMessage latestMessage = edgeImitator.getLatestMessage(); + AlarmCommentUpdateMsg alarmCommentUpdateMsg = (AlarmCommentUpdateMsg) latestMessage; + Assert.assertEquals(UpdateMsgType.ENTITY_CREATED_RPC_MESSAGE, alarmCommentUpdateMsg.getMsgType()); + AlarmComment alarmCommentMsg = JacksonUtil.fromString(alarmCommentUpdateMsg.getEntity(), AlarmComment.class, true); + Assert.assertNotNull(alarmCommentMsg); + Assert.assertEquals(alarmComment, alarmCommentMsg); + + // update alarm comment + edgeImitator.expectMessageAmount(1); + alarmComment.setComment(JacksonUtil.newObjectNode().set("text", new TextNode("Updated comment"))); + alarmComment = doPost("/api/alarm/" + savedAlarm.getUuidId() + "/comment", alarmComment, AlarmComment.class); + Assert.assertTrue(edgeImitator.waitForMessages()); + latestMessage = edgeImitator.getLatestMessage(); + alarmCommentUpdateMsg = (AlarmCommentUpdateMsg) latestMessage; + Assert.assertEquals(UpdateMsgType.ENTITY_UPDATED_RPC_MESSAGE, alarmCommentUpdateMsg.getMsgType()); + alarmCommentMsg = JacksonUtil.fromString(alarmCommentUpdateMsg.getEntity(), AlarmComment.class, true); + Assert.assertNotNull(alarmCommentMsg); + Assert.assertEquals(alarmComment, alarmCommentMsg); + + // delete alarm + edgeImitator.expectMessageAmount(1); + doDelete("/api/alarm/" + savedAlarm.getUuidId() + "/comment/" + alarmComment.getUuidId()) + .andExpect(status().isOk()); + Assert.assertTrue(edgeImitator.waitForMessages()); + latestMessage = edgeImitator.getLatestMessage(); + alarmCommentUpdateMsg = (AlarmCommentUpdateMsg) latestMessage; + Assert.assertEquals(UpdateMsgType.ENTITY_DELETED_RPC_MESSAGE, alarmCommentUpdateMsg.getMsgType()); + alarmCommentMsg = JacksonUtil.fromString(alarmCommentUpdateMsg.getEntity(), AlarmComment.class, true); + Assert.assertNotNull(alarmCommentMsg); } private Alarm buildAlarmForUplinkMsg(DeviceId deviceId) { @@ -148,4 +240,16 @@ public class AlarmEdgeTest extends AbstractEdgeTest { alarm.setSeverity(AlarmSeverity.CRITICAL); return alarm; } + + private AlarmComment buildAlarmCommentForUplinkMsg(AlarmId alarmId) { + UUID uuid = Uuids.timeBased(); + AlarmComment alarmComment = new AlarmComment(); + alarmComment.setAlarmId(alarmId); + alarmComment.setType(AlarmCommentType.OTHER); + alarmComment.setUserId(tenantAdminUserId); + alarmComment.setId(new AlarmCommentId(uuid)); + alarmComment.setComment(new TextNode("AlarmComment")); + alarmComment.setCreatedTime(Uuids.unixTimestamp(uuid)); + return alarmComment; + } } diff --git a/application/src/test/java/org/thingsboard/server/edge/imitator/EdgeImitator.java b/application/src/test/java/org/thingsboard/server/edge/imitator/EdgeImitator.java index f7c464fab9..f5e018ed3a 100644 --- a/application/src/test/java/org/thingsboard/server/edge/imitator/EdgeImitator.java +++ b/application/src/test/java/org/thingsboard/server/edge/imitator/EdgeImitator.java @@ -27,6 +27,7 @@ import org.checkerframework.checker.nullness.qual.Nullable; import org.thingsboard.edge.rpc.EdgeGrpcClient; import org.thingsboard.edge.rpc.EdgeRpcClient; import org.thingsboard.server.gen.edge.v1.AdminSettingsUpdateMsg; +import org.thingsboard.server.gen.edge.v1.AlarmCommentUpdateMsg; import org.thingsboard.server.gen.edge.v1.AlarmUpdateMsg; import org.thingsboard.server.gen.edge.v1.AssetProfileUpdateMsg; import org.thingsboard.server.gen.edge.v1.AssetUpdateMsg; @@ -231,6 +232,11 @@ public class EdgeImitator { result.add(saveDownlinkMsg(alarmUpdateMsg)); } } + if (downlinkMsg.getAlarmCommentUpdateMsgCount() > 0) { + for (AlarmCommentUpdateMsg alarmCommentUpdateMsg : downlinkMsg.getAlarmCommentUpdateMsgList()) { + result.add(saveDownlinkMsg(alarmCommentUpdateMsg)); + } + } if (downlinkMsg.getEntityDataCount() > 0) { for (EntityDataProto entityData : downlinkMsg.getEntityDataList()) { if (randomFailuresOnTimeseriesDownlink) { diff --git a/application/src/test/java/org/thingsboard/server/service/edge/rpc/processor/BaseEdgeProcessorTest.java b/application/src/test/java/org/thingsboard/server/service/edge/rpc/processor/BaseEdgeProcessorTest.java index 384c00c8ed..9c45977a7c 100644 --- a/application/src/test/java/org/thingsboard/server/service/edge/rpc/processor/BaseEdgeProcessorTest.java +++ b/application/src/test/java/org/thingsboard/server/service/edge/rpc/processor/BaseEdgeProcessorTest.java @@ -32,6 +32,7 @@ import org.thingsboard.server.common.data.id.DashboardId; import org.thingsboard.server.common.data.id.EdgeId; import org.thingsboard.server.common.data.id.RuleChainId; import org.thingsboard.server.common.data.id.TenantId; +import org.thingsboard.server.dao.alarm.AlarmCommentService; import org.thingsboard.server.dao.alarm.AlarmService; import org.thingsboard.server.dao.asset.AssetProfileService; import org.thingsboard.server.dao.asset.AssetService; @@ -157,6 +158,9 @@ public abstract class BaseEdgeProcessorTest { @MockBean protected AlarmService alarmService; + @MockBean + protected AlarmCommentService alarmCommentService; + @MockBean protected DeviceService deviceService; diff --git a/application/src/test/java/org/thingsboard/server/service/queue/ruleengine/TbRuleEngineQueueConsumerManagerTest.java b/application/src/test/java/org/thingsboard/server/service/queue/ruleengine/TbRuleEngineQueueConsumerManagerTest.java index c03c2babaa..6bd0cd9c0e 100644 --- a/application/src/test/java/org/thingsboard/server/service/queue/ruleengine/TbRuleEngineQueueConsumerManagerTest.java +++ b/application/src/test/java/org/thingsboard/server/service/queue/ruleengine/TbRuleEngineQueueConsumerManagerTest.java @@ -447,7 +447,7 @@ public class TbRuleEngineQueueConsumerManagerTest { verifyMsgProcessed(consumer1.testMsg); verifyMsgProcessed(consumer2.testMsg); - consumerManager.delete(); + consumerManager.delete(true); await().atMost(2, TimeUnit.SECONDS) .untilAsserted(() -> { @@ -488,7 +488,7 @@ public class TbRuleEngineQueueConsumerManagerTest { verifySubscribedAndLaunched(consumer, partitions); verifyMsgProcessed(consumer.testMsg); - consumerManager.delete(); + consumerManager.delete(true); await().atMost(2, TimeUnit.SECONDS) .untilAsserted(() -> { diff --git a/application/src/test/resources/logback-test.xml b/application/src/test/resources/logback-test.xml index 3d0ce3f933..981bcab132 100644 --- a/application/src/test/resources/logback-test.xml +++ b/application/src/test/resources/logback-test.xml @@ -38,8 +38,6 @@ - - diff --git a/common/dao-api/src/main/java/org/thingsboard/server/dao/alarm/AlarmCommentService.java b/common/dao-api/src/main/java/org/thingsboard/server/dao/alarm/AlarmCommentService.java index 7bee9f6983..c822693a71 100644 --- a/common/dao-api/src/main/java/org/thingsboard/server/dao/alarm/AlarmCommentService.java +++ b/common/dao-api/src/main/java/org/thingsboard/server/dao/alarm/AlarmCommentService.java @@ -25,6 +25,7 @@ import org.thingsboard.server.common.data.page.PageData; import org.thingsboard.server.common.data.page.PageLink; public interface AlarmCommentService { + AlarmComment createOrUpdateAlarmComment(TenantId tenantId, AlarmComment alarmComment); AlarmComment saveAlarmComment(TenantId tenantId, AlarmComment alarmComment); diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/EdgeUtils.java b/common/data/src/main/java/org/thingsboard/server/common/data/EdgeUtils.java index dc969bb362..69a3fc7d8c 100644 --- a/common/data/src/main/java/org/thingsboard/server/common/data/EdgeUtils.java +++ b/common/data/src/main/java/org/thingsboard/server/common/data/EdgeUtils.java @@ -18,6 +18,7 @@ package org.thingsboard.server.common.data; import com.fasterxml.jackson.databind.JsonNode; import com.google.common.base.Throwables; import lombok.extern.slf4j.Slf4j; +import org.thingsboard.server.common.data.audit.ActionType; import org.thingsboard.server.common.data.edge.EdgeEvent; import org.thingsboard.server.common.data.edge.EdgeEventActionType; import org.thingsboard.server.common.data.edge.EdgeEventType; @@ -32,6 +33,7 @@ import java.util.concurrent.ThreadLocalRandom; public final class EdgeUtils { private static final EnumMap entityTypeEdgeEventTypeEnumMap; + private static final EnumMap actionTypeEdgeEventActionTypeEnumMap; static { entityTypeEdgeEventTypeEnumMap = new EnumMap<>(EntityType.class); @@ -40,6 +42,13 @@ public final class EdgeUtils { entityTypeEdgeEventTypeEnumMap.put(edgeEventType.getEntityType(), edgeEventType); } } + + actionTypeEdgeEventActionTypeEnumMap = new EnumMap<>(ActionType.class); + for (EdgeEventActionType edgeEventActionType : EdgeEventActionType.values()) { + if (edgeEventActionType.getActionType() != null) { + actionTypeEdgeEventActionTypeEnumMap.put(edgeEventActionType.getActionType(), edgeEventActionType); + } + } } private static final int STACK_TRACE_LIMIT = 10; @@ -54,6 +63,10 @@ public final class EdgeUtils { return entityTypeEdgeEventTypeEnumMap.get(entityType); } + public static EdgeEventActionType getEdgeEventActionTypeByActionType(ActionType actionType) { + return actionTypeEdgeEventActionTypeEnumMap.get(actionType); + } + public static EdgeEvent constructEdgeEvent(TenantId tenantId, EdgeId edgeId, EdgeEventType type, diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/edge/EdgeEventActionType.java b/common/data/src/main/java/org/thingsboard/server/common/data/edge/EdgeEventActionType.java index e355901182..680f744c7b 100644 --- a/common/data/src/main/java/org/thingsboard/server/common/data/edge/EdgeEventActionType.java +++ b/common/data/src/main/java/org/thingsboard/server/common/data/edge/EdgeEventActionType.java @@ -15,26 +15,39 @@ */ package org.thingsboard.server.common.data.edge; +import lombok.Getter; +import org.thingsboard.server.common.data.audit.ActionType; + +@Getter public enum EdgeEventActionType { - ADDED, - DELETED, - UPDATED, - POST_ATTRIBUTES, - ATTRIBUTES_UPDATED, - ATTRIBUTES_DELETED, - TIMESERIES_UPDATED, - CREDENTIALS_UPDATED, - ASSIGNED_TO_CUSTOMER, - UNASSIGNED_FROM_CUSTOMER, - RELATION_ADD_OR_UPDATE, - RELATION_DELETED, - RPC_CALL, - ALARM_ACK, - ALARM_CLEAR, - ALARM_ASSIGNED, - ALARM_UNASSIGNED, - ASSIGNED_TO_EDGE, - UNASSIGNED_FROM_EDGE, - CREDENTIALS_REQUEST, - ENTITY_MERGE_REQUEST // deprecated -} \ No newline at end of file + ADDED(ActionType.ADDED), + UPDATED(ActionType.UPDATED), + DELETED(ActionType.DELETED), + POST_ATTRIBUTES(null), + ATTRIBUTES_UPDATED(ActionType.ATTRIBUTES_UPDATED), + ATTRIBUTES_DELETED(ActionType.ATTRIBUTES_DELETED), + TIMESERIES_UPDATED(ActionType.TIMESERIES_UPDATED), + CREDENTIALS_UPDATED(ActionType.CREDENTIALS_UPDATED), + ASSIGNED_TO_CUSTOMER(ActionType.ASSIGNED_TO_CUSTOMER), + UNASSIGNED_FROM_CUSTOMER(ActionType.UNASSIGNED_FROM_CUSTOMER), + RELATION_ADD_OR_UPDATE(ActionType.RELATION_ADD_OR_UPDATE), + RELATION_DELETED(ActionType.RELATION_DELETED), + RPC_CALL(ActionType.RPC_CALL), + ALARM_ACK(ActionType.ALARM_ACK), + ALARM_CLEAR(ActionType.ALARM_CLEAR), + ALARM_ASSIGNED(ActionType.ALARM_ASSIGNED), + ALARM_UNASSIGNED(ActionType.ALARM_UNASSIGNED), + ADDED_COMMENT(ActionType.ADDED_COMMENT), + UPDATED_COMMENT(ActionType.UPDATED_COMMENT), + DELETED_COMMENT(ActionType.DELETED_COMMENT), + ASSIGNED_TO_EDGE(ActionType.ASSIGNED_TO_EDGE), + UNASSIGNED_FROM_EDGE(ActionType.UNASSIGNED_FROM_EDGE), + CREDENTIALS_REQUEST(null), + ENTITY_MERGE_REQUEST(null); // deprecated + + private final ActionType actionType; + + EdgeEventActionType(ActionType actionType) { + this.actionType = actionType; + } +} diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/edge/EdgeEventType.java b/common/data/src/main/java/org/thingsboard/server/common/data/edge/EdgeEventType.java index a8627d3c0d..6fc255501f 100644 --- a/common/data/src/main/java/org/thingsboard/server/common/data/edge/EdgeEventType.java +++ b/common/data/src/main/java/org/thingsboard/server/common/data/edge/EdgeEventType.java @@ -27,6 +27,7 @@ public enum EdgeEventType { ASSET_PROFILE(true, EntityType.ASSET_PROFILE), ENTITY_VIEW(false, EntityType.ENTITY_VIEW), ALARM(false, EntityType.ALARM), + ALARM_COMMENT(false, null), RULE_CHAIN(false, EntityType.RULE_CHAIN), RULE_CHAIN_METADATA(false, null), EDGE(false, EntityType.EDGE), diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/page/BasePageDataIterable.java b/common/data/src/main/java/org/thingsboard/server/common/data/page/BasePageDataIterable.java index 19c3fc8f4f..8d46a46781 100644 --- a/common/data/src/main/java/org/thingsboard/server/common/data/page/BasePageDataIterable.java +++ b/common/data/src/main/java/org/thingsboard/server/common/data/page/BasePageDataIterable.java @@ -15,6 +15,7 @@ */ package org.thingsboard.server.common.data.page; +import java.util.ArrayList; import java.util.Iterator; import java.util.List; import java.util.NoSuchElementException; @@ -64,8 +65,8 @@ public abstract class BasePageDataIterable implements Iterable, Iterator pageData = fetchPageData(link); currentIdx = 0; - currentItems = pageData.getData(); - hasNextPack = pageData.hasNext(); + currentItems = pageData != null ? pageData.getData() : new ArrayList<>(); + hasNextPack = pageData != null && pageData.hasNext(); nextPackLink = link.nextPageLink(); } diff --git a/common/edge-api/src/main/proto/edge.proto b/common/edge-api/src/main/proto/edge.proto index 2f33c21fd8..34a8c6f093 100644 --- a/common/edge-api/src/main/proto/edge.proto +++ b/common/edge-api/src/main/proto/edge.proto @@ -325,6 +325,11 @@ message AlarmUpdateMsg { string entity = 18; } +message AlarmCommentUpdateMsg { + UpdateMsgType msgType = 1; + string entity = 2; +} + message CustomerUpdateMsg { UpdateMsgType msgType = 1; int64 idMSB = 2; @@ -618,6 +623,7 @@ message UplinkMsg { repeated AssetProfileUpdateMsg assetProfileUpdateMsg = 19; repeated DeviceProfileUpdateMsg deviceProfileUpdateMsg = 20; repeated ResourceUpdateMsg resourceUpdateMsg = 21; + repeated AlarmCommentUpdateMsg alarmCommentUpdateMsg = 22; } message UplinkResponseMsg { @@ -661,5 +667,6 @@ message DownlinkMsg { repeated TenantUpdateMsg tenantUpdateMsg = 26; repeated TenantProfileUpdateMsg tenantProfileUpdateMsg = 27; repeated ResourceUpdateMsg resourceUpdateMsg = 28; + repeated AlarmCommentUpdateMsg alarmCommentUpdateMsg = 29; } diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/discovery/HashPartitionService.java b/common/queue/src/main/java/org/thingsboard/server/queue/discovery/HashPartitionService.java index 1b496c8aca..6caf17aeef 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/discovery/HashPartitionService.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/discovery/HashPartitionService.java @@ -35,7 +35,6 @@ import org.thingsboard.server.queue.discovery.event.ServiceListChangedEvent; import org.thingsboard.server.queue.util.AfterStartUp; import javax.annotation.PostConstruct; -import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; @@ -190,14 +189,25 @@ public class HashPartitionService implements PartitionService { myPartitions.remove(queueKey); partitionTopicsMap.remove(queueKey); partitionSizesMap.remove(queueKey); - //TODO: remove after merging tb entity services - removeTenant(tenantId); - + evictTenantInfo(tenantId); if (serviceInfoProvider.isService(ServiceType.TB_RULE_ENGINE)) { publishPartitionChangeEvent(ServiceType.TB_RULE_ENGINE, Map.of(queueKey, Collections.emptySet())); } } + @Override + public void removeTenant(TenantId tenantId) { + List queueKeys = partitionSizesMap.keySet().stream() + .filter(queueKey -> tenantId.equals(queueKey.getTenantId())) + .collect(Collectors.toList()); + queueKeys.forEach(queueKey -> { + myPartitions.remove(queueKey); + partitionTopicsMap.remove(queueKey); + partitionSizesMap.remove(queueKey); + }); + evictTenantInfo(tenantId); + } + @Override public boolean isManagedByCurrentService(TenantId tenantId) { Set assignedTenantProfiles = serviceInfoProvider.getAssignedTenantProfiles(); @@ -258,6 +268,7 @@ public class HashPartitionService implements PartitionService { @Override public synchronized void recalculatePartitions(ServiceInfo currentService, List otherServices) { + log.info("Recalculating partitions"); tbTransportServicesByType.clear(); responsibleServices.clear(); logServiceInfo(currentService); @@ -274,9 +285,14 @@ public class HashPartitionService implements PartitionService { final ConcurrentMap> newPartitions = new ConcurrentHashMap<>(); partitionSizesMap.forEach((queueKey, size) -> { for (int i = 0; i < size; i++) { - ServiceInfo serviceInfo = resolveByPartitionIdx(queueServicesMap.get(queueKey), queueKey, i); - if (currentService.equals(serviceInfo)) { - newPartitions.computeIfAbsent(queueKey, key -> new ArrayList<>()).add(i); + try { + ServiceInfo serviceInfo = resolveByPartitionIdx(queueServicesMap.get(queueKey), queueKey, i); + log.trace("Server responsible for {}[{}] - {}", queueKey, i, serviceInfo != null ? serviceInfo.getServiceId() : "none"); + if (currentService.equals(serviceInfo)) { + newPartitions.computeIfAbsent(queueKey, key -> new ArrayList<>()).add(i); + } + } catch (Exception e) { + log.warn("Failed to resolve server responsible for {}[{}]", queueKey, i, e); } } }); @@ -399,7 +415,7 @@ public class HashPartitionService implements PartitionService { } @Override - public void removeTenant(TenantId tenantId) { + public void evictTenantInfo(TenantId tenantId) { tenantRoutingInfoMap.remove(tenantId); } diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/discovery/PartitionService.java b/common/queue/src/main/java/org/thingsboard/server/queue/discovery/PartitionService.java index 48e5b55ece..2b2479f59b 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/discovery/PartitionService.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/discovery/PartitionService.java @@ -59,7 +59,7 @@ public interface PartitionService { int resolvePartitionIndex(UUID entityId, int partitions); - void removeTenant(TenantId tenantId); + void evictTenantInfo(TenantId tenantId); int countTransportsByType(String type); @@ -67,6 +67,8 @@ public interface PartitionService { void removeQueue(TransportProtos.QueueDeleteMsg queueDeleteMsg); + void removeTenant(TenantId tenantId); + boolean isManagedByCurrentService(TenantId tenantId); } diff --git a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/activity/AbstractActivityManager.java b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/activity/AbstractActivityManager.java index 84d6d37824..f754d6f5b1 100644 --- a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/activity/AbstractActivityManager.java +++ b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/activity/AbstractActivityManager.java @@ -28,10 +28,9 @@ import java.util.concurrent.ConcurrentMap; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; -import java.util.concurrent.atomic.AtomicReference; @Slf4j -public abstract class AbstractActivityManager implements ActivityManager { +public abstract class AbstractActivityManager implements ActivityManager { private final ConcurrentMap states = new ConcurrentHashMap<>(); @@ -54,8 +53,6 @@ public abstract class AbstractActivityManager implements Activity protected abstract long getReportingPeriodMillis(); - protected abstract ActivityState createNewState(Key key); - protected abstract ActivityStrategy getStrategy(); protected abstract ActivityState updateState(Key key, ActivityState state); @@ -67,7 +64,7 @@ public abstract class AbstractActivityManager implements Activity protected abstract void reportActivity(Key key, Metadata metadata, long timeToReport, ActivityReportCallback callback); @Override - public void onActivity(Key key, long newLastRecordedTime) { + public void onActivity(Key key, Metadata metadata, long newLastRecordedTime) { if (key == null) { log.error("Failed to process activity event: provided activity key is null."); return; @@ -77,36 +74,28 @@ public abstract class AbstractActivityManager implements Activity var shouldReport = new AtomicBoolean(false); var lastRecordedTime = new AtomicLong(); var lastReportedTime = new AtomicLong(); - var metadata = new AtomicReference(); - var activityStateWrapper = states.compute(key, (__, stateWrapper) -> { + states.compute(key, (__, stateWrapper) -> { if (stateWrapper == null) { - var newState = createNewState(key); - if (newState == null) { - return null; - } + ActivityState newState = new ActivityState<>(); stateWrapper = new ActivityStateWrapper(); stateWrapper.setState(newState); stateWrapper.setStrategy(getStrategy()); } var state = stateWrapper.getState(); + state.setMetadata(metadata); if (state.getLastRecordedTime() < newLastRecordedTime) { state.setLastRecordedTime(newLastRecordedTime); } shouldReport.set(stateWrapper.getStrategy().onActivity()); lastRecordedTime.set(state.getLastRecordedTime()); lastReportedTime.set(stateWrapper.getLastReportedTime()); - metadata.set(state.getMetadata()); return stateWrapper; }); - if (activityStateWrapper == null) { - return; - } - if (shouldReport.get() && lastReportedTime.get() < lastRecordedTime.get()) { log.debug("Going to report first activity event for key: [{}].", key); - reportActivity(key, metadata.get(), lastRecordedTime.get(), new ActivityReportCallback<>() { + reportActivity(key, metadata, lastRecordedTime.get(), new ActivityReportCallback<>() { @Override public void onSuccess(Key key, long reportedTime) { updateLastReportedTime(key, reportedTime); diff --git a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/activity/ActivityManager.java b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/activity/ActivityManager.java index 0f2145ab6f..5d27738429 100644 --- a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/activity/ActivityManager.java +++ b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/activity/ActivityManager.java @@ -15,9 +15,9 @@ */ package org.thingsboard.server.common.transport.activity; -public interface ActivityManager { +public interface ActivityManager { - void onActivity(Key key, long activityTimeMillis); + void onActivity(Key key, Metadata metadata, long activityTimeMillis); void onReportingPeriodEnd(); diff --git a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/DefaultTransportService.java b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/DefaultTransportService.java index a6a6f5d196..0d7fbfc332 100644 --- a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/DefaultTransportService.java +++ b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/DefaultTransportService.java @@ -772,7 +772,7 @@ public class DefaultTransportService extends TransportActivityManager implements } private void recordActivityInternal(TransportProtos.SessionInfoProto sessionInfo) { - onActivity(toSessionId(sessionInfo), getCurrentTimeMillis()); + onActivity(toSessionId(sessionInfo), sessionInfo, getCurrentTimeMillis()); } @Override @@ -931,8 +931,8 @@ public class DefaultTransportService extends TransportActivityManager implements Optional profileOpt = dataDecodingEncodingService.decode(msg.getData().toByteArray()); if (profileOpt.isPresent()) { Tenant tenant = profileOpt.get(); - partitionService.removeTenant(tenant.getId()); boolean updated = tenantProfileCache.put(tenant.getId(), tenant.getTenantProfileId()); + partitionService.evictTenantInfo(tenant.getId()); if (updated) { rateLimitService.update(tenant.getId()); } @@ -957,7 +957,9 @@ public class DefaultTransportService extends TransportActivityManager implements } else if (EntityType.TENANT_PROFILE.equals(entityType)) { tenantProfileCache.remove(new TenantProfileId(entityUuid)); } else if (EntityType.TENANT.equals(entityType)) { - rateLimitService.remove(TenantId.fromUUID(entityUuid)); + TenantId tenantId = TenantId.fromUUID(entityUuid); + rateLimitService.remove(tenantId); + partitionService.removeTenant(tenantId); } else if (EntityType.DEVICE.equals(entityType)) { rateLimitService.remove(new DeviceId(entityUuid)); onDeviceDeleted(new DeviceId(entityUuid)); diff --git a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/TransportActivityManager.java b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/TransportActivityManager.java index 1368785466..0c9ebd0e4b 100644 --- a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/TransportActivityManager.java +++ b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/TransportActivityManager.java @@ -57,17 +57,6 @@ public abstract class TransportActivityManager extends AbstractActivityManager createNewState(UUID sessionId) { - SessionMetaData session = sessions.get(sessionId); - if (session == null) { - return null; - } - ActivityState state = new ActivityState<>(); - state.setMetadata(session.getSessionInfo()); - return state; - } - @Override protected ActivityStrategy getStrategy() { return reportingStrategyType.toStrategy(); diff --git a/common/transport/transport-api/src/test/java/org/thingsboard/server/common/transport/service/TransportActivityManagerTest.java b/common/transport/transport-api/src/test/java/org/thingsboard/server/common/transport/service/TransportActivityManagerTest.java index e0f5ef128e..a87532541d 100644 --- a/common/transport/transport-api/src/test/java/org/thingsboard/server/common/transport/service/TransportActivityManagerTest.java +++ b/common/transport/transport-api/src/test/java/org/thingsboard/server/common/transport/service/TransportActivityManagerTest.java @@ -40,6 +40,8 @@ import java.util.concurrent.ConcurrentMap; import java.util.stream.Stream; import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.doCallRealMethod; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; @@ -63,6 +65,34 @@ public class TransportActivityManagerTest { ReflectionTestUtils.setField(transportServiceMock, "sessions", sessions); } + @Test + void givenFirstActivityForAlreadyRemovedSessionAndFirstEventReportingStrategy_whenOnActivity_thenShouldRecordActivityAndReport() { + // GIVEN + ConcurrentMap states = new ConcurrentHashMap<>(); + ReflectionTestUtils.setField(transportServiceMock, "states", states); + + var strategyMock = mock(ActivityStrategy.class); + when(transportServiceMock.getStrategy()).thenReturn(strategyMock); + when(strategyMock.onActivity()).thenReturn(true); + + long activityTime = 123L; + var sessionInfo = TransportProtos.SessionInfoProto.newBuilder() + .setSessionIdMSB(SESSION_ID.getMostSignificantBits()) + .setSessionIdLSB(SESSION_ID.getLeastSignificantBits()) + .build(); + + doCallRealMethod().when(transportServiceMock).getLastRecordedTime(SESSION_ID); + doCallRealMethod().when(transportServiceMock).onActivity(SESSION_ID, sessionInfo, activityTime); + + // WHEN + transportServiceMock.onActivity(SESSION_ID, sessionInfo, activityTime); + + // THEN + assertThat(states).containsKey(SESSION_ID); + assertThat(transportServiceMock.getLastRecordedTime(SESSION_ID)).isEqualTo(activityTime); + verify(transportServiceMock).reportActivity(eq(SESSION_ID), eq(sessionInfo), eq(activityTime), any(ActivityReportCallback.class)); + } + @Test void givenKeyAndTimeToReportAndSessionExists_whenReportingActivity_thenShouldReportActivityWithSubscriptionsAndSessionInfoFromSession() { // GIVEN @@ -175,28 +205,7 @@ public class TransportActivityManagerTest { transportServiceMock.recordActivity(sessionInfo); // THEN - verify(transportServiceMock).onActivity(SESSION_ID, expectedTime); - } - - @Test - void givenKey_whenCreatingNewState_thenShouldCorrectlyCreateNewEmptyState() { - // GIVEN - TransportProtos.SessionInfoProto sessionInfo = TransportProtos.SessionInfoProto.newBuilder() - .setSessionIdMSB(SESSION_ID.getMostSignificantBits()) - .setSessionIdLSB(SESSION_ID.getLeastSignificantBits()) - .build(); - sessions.put(SESSION_ID, new SessionMetaData(sessionInfo, TransportProtos.SessionType.ASYNC, null)); - - when(transportServiceMock.createNewState(SESSION_ID)).thenCallRealMethod(); - - ActivityState expectedState = new ActivityState<>(); - expectedState.setMetadata(sessionInfo); - - // WHEN - ActivityState actualState = transportServiceMock.createNewState(SESSION_ID); - - // THEN - assertThat(actualState).isEqualTo(expectedState); + verify(transportServiceMock).onActivity(SESSION_ID, sessionInfo, expectedTime); } @ParameterizedTest diff --git a/dao/src/main/java/org/thingsboard/server/dao/alarm/BaseAlarmCommentService.java b/dao/src/main/java/org/thingsboard/server/dao/alarm/BaseAlarmCommentService.java index 3bc8ceb1a0..5bdd9803b2 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/alarm/BaseAlarmCommentService.java +++ b/dao/src/main/java/org/thingsboard/server/dao/alarm/BaseAlarmCommentService.java @@ -31,6 +31,8 @@ import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.page.PageData; import org.thingsboard.server.common.data.page.PageLink; import org.thingsboard.server.dao.entity.AbstractEntityService; +import org.thingsboard.server.dao.eventsourcing.DeleteEntityEvent; +import org.thingsboard.server.dao.eventsourcing.SaveEntityEvent; import org.thingsboard.server.dao.service.DataValidator; import java.util.UUID; @@ -50,18 +52,28 @@ public class BaseAlarmCommentService extends AbstractEntityService implements Al @Override public AlarmComment createOrUpdateAlarmComment(TenantId tenantId, AlarmComment alarmComment) { alarmCommentDataValidator.validate(alarmComment, c -> tenantId); - if (alarmComment.getId() == null) { - return createAlarmComment(tenantId, alarmComment); + boolean isCreated = alarmComment.getId() == null; + AlarmComment result; + if (isCreated) { + result = createAlarmComment(tenantId, alarmComment); } else { - return updateAlarmComment(tenantId, alarmComment); + result = updateAlarmComment(tenantId, alarmComment); + } + if (result != null) { + eventPublisher.publishEvent(SaveEntityEvent.builder().tenantId(tenantId).entity(result) + .entityId(result.getAlarmId()).created(isCreated).build()); } + return result; } @Override public AlarmComment saveAlarmComment(TenantId tenantId, AlarmComment alarmComment) { log.debug("Deleting Alarm Comment: {}", alarmComment); alarmCommentDataValidator.validate(alarmComment, c -> tenantId); - return alarmCommentDao.save(tenantId, alarmComment); + AlarmComment result = alarmCommentDao.save(tenantId, alarmComment); + eventPublisher.publishEvent(DeleteEntityEvent.builder().tenantId(tenantId).entity(result) + .entityId(result.getAlarmId()).build()); + return result; } @Override @@ -112,5 +124,4 @@ public class BaseAlarmCommentService extends AbstractEntityService implements Al } return null; } - } diff --git a/dao/src/main/java/org/thingsboard/server/dao/alarm/BaseAlarmService.java b/dao/src/main/java/org/thingsboard/server/dao/alarm/BaseAlarmService.java index ae9763c59c..72e1b9c7bf 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/alarm/BaseAlarmService.java +++ b/dao/src/main/java/org/thingsboard/server/dao/alarm/BaseAlarmService.java @@ -135,7 +135,7 @@ public class BaseAlarmService extends AbstractCachedEntityService { private final T entity; private final T oldEntity; private final EntityId entityId; - private final Boolean added; + private final Boolean created; } diff --git a/dao/src/main/java/org/thingsboard/server/dao/ota/BaseOtaPackageService.java b/dao/src/main/java/org/thingsboard/server/dao/ota/BaseOtaPackageService.java index 24539828c9..53a585c358 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/ota/BaseOtaPackageService.java +++ b/dao/src/main/java/org/thingsboard/server/dao/ota/BaseOtaPackageService.java @@ -84,7 +84,7 @@ public class BaseOtaPackageService extends AbstractCachedEntityService> serviceClass; } diff --git a/monitoring/src/main/java/org/thingsboard/monitoring/data/notification/HighLatencyNotification.java b/monitoring/src/main/java/org/thingsboard/monitoring/data/notification/HighLatencyNotification.java index 971c6ecf8e..f77098c3c3 100644 --- a/monitoring/src/main/java/org/thingsboard/monitoring/data/notification/HighLatencyNotification.java +++ b/monitoring/src/main/java/org/thingsboard/monitoring/data/notification/HighLatencyNotification.java @@ -34,7 +34,7 @@ public class HighLatencyNotification implements Notification { StringBuilder text = new StringBuilder(); text.append("Some of the latencies are higher than ").append(thresholdMs).append(" ms:\n"); highLatencies.forEach(latency -> { - text.append(String.format("[%s] %s\n", latency.getKey(), latency.getFormattedValue())); + text.append(String.format("[%s] *%s*\n", latency.getKey(), latency.getFormattedValue())); }); return text.toString(); } diff --git a/monitoring/src/main/java/org/thingsboard/monitoring/data/notification/ServiceFailureNotification.java b/monitoring/src/main/java/org/thingsboard/monitoring/data/notification/ServiceFailureNotification.java index af6e597dc9..63af34a0fb 100644 --- a/monitoring/src/main/java/org/thingsboard/monitoring/data/notification/ServiceFailureNotification.java +++ b/monitoring/src/main/java/org/thingsboard/monitoring/data/notification/ServiceFailureNotification.java @@ -43,7 +43,7 @@ public class ServiceFailureNotification implements Notification { if (errorMsg == null) { errorMsg = error.getClass().getSimpleName(); } - return String.format("[%s] Failure: %s (number of subsequent failures: %s)", serviceKey, errorMsg, failuresCount); + return String.format("%s - Failure: %s (number of subsequent failures: %s)", serviceKey, errorMsg, failuresCount); } } diff --git a/monitoring/src/main/java/org/thingsboard/monitoring/data/notification/ServiceRecoveryNotification.java b/monitoring/src/main/java/org/thingsboard/monitoring/data/notification/ServiceRecoveryNotification.java index b1735e6645..e6c1e85b83 100644 --- a/monitoring/src/main/java/org/thingsboard/monitoring/data/notification/ServiceRecoveryNotification.java +++ b/monitoring/src/main/java/org/thingsboard/monitoring/data/notification/ServiceRecoveryNotification.java @@ -25,7 +25,7 @@ public class ServiceRecoveryNotification implements Notification { @Override public String getText() { - return String.format("[%s] is OK", serviceKey); + return String.format("%s is OK", serviceKey); } } diff --git a/monitoring/src/main/java/org/thingsboard/monitoring/notification/NotificationService.java b/monitoring/src/main/java/org/thingsboard/monitoring/notification/NotificationService.java index 1086165a40..1949584d7f 100644 --- a/monitoring/src/main/java/org/thingsboard/monitoring/notification/NotificationService.java +++ b/monitoring/src/main/java/org/thingsboard/monitoring/notification/NotificationService.java @@ -17,6 +17,8 @@ package org.thingsboard.monitoring.notification; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.StringUtils; +import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Component; import org.thingsboard.monitoring.data.notification.Notification; import org.thingsboard.monitoring.notification.channels.NotificationChannel; @@ -24,7 +26,6 @@ import org.thingsboard.monitoring.notification.channels.NotificationChannel; import java.util.List; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; -import java.util.function.Consumer; @Component @RequiredArgsConstructor @@ -34,15 +35,20 @@ public class NotificationService { private final List notificationChannels; private final ExecutorService notificationExecutor = Executors.newSingleThreadExecutor(); - public void sendNotification(Notification notification) { - forEachNotificationChannel(notificationChannel -> notificationChannel.sendNotification(notification)); - } + @Value("${monitoring.notifications.message_prefix}") + private String messagePrefix; - private void forEachNotificationChannel(Consumer function) { + public void sendNotification(Notification notification) { + String message; + if (StringUtils.isEmpty(messagePrefix)) { + message = notification.getText(); + } else { + message = messagePrefix + System.lineSeparator() + notification.getText(); + } notificationChannels.forEach(notificationChannel -> { notificationExecutor.submit(() -> { try { - function.accept(notificationChannel); + notificationChannel.sendNotification(message); } catch (Exception e) { log.error("Failed to send notification to {}", notificationChannel.getClass().getSimpleName(), e); } diff --git a/monitoring/src/main/java/org/thingsboard/monitoring/notification/channels/NotificationChannel.java b/monitoring/src/main/java/org/thingsboard/monitoring/notification/channels/NotificationChannel.java index 6841756607..9ee52608ba 100644 --- a/monitoring/src/main/java/org/thingsboard/monitoring/notification/channels/NotificationChannel.java +++ b/monitoring/src/main/java/org/thingsboard/monitoring/notification/channels/NotificationChannel.java @@ -15,10 +15,8 @@ */ package org.thingsboard.monitoring.notification.channels; -import org.thingsboard.monitoring.data.notification.Notification; - public interface NotificationChannel { - void sendNotification(Notification notification); + void sendNotification(String message); } diff --git a/monitoring/src/main/java/org/thingsboard/monitoring/notification/channels/impl/SlackNotificationChannel.java b/monitoring/src/main/java/org/thingsboard/monitoring/notification/channels/impl/SlackNotificationChannel.java index 3a6a7a0efb..e508159b8b 100644 --- a/monitoring/src/main/java/org/thingsboard/monitoring/notification/channels/impl/SlackNotificationChannel.java +++ b/monitoring/src/main/java/org/thingsboard/monitoring/notification/channels/impl/SlackNotificationChannel.java @@ -21,7 +21,6 @@ import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.boot.web.client.RestTemplateBuilder; import org.springframework.stereotype.Component; import org.springframework.web.client.RestTemplate; -import org.thingsboard.monitoring.data.notification.Notification; import org.thingsboard.monitoring.notification.channels.NotificationChannel; import javax.annotation.PostConstruct; @@ -29,11 +28,11 @@ import java.time.Duration; import java.util.Map; @Component -@ConditionalOnProperty(value = "monitoring.notification_channels.slack.enabled", havingValue = "true") +@ConditionalOnProperty(value = "monitoring.notifications.slack.enabled", havingValue = "true") @Slf4j public class SlackNotificationChannel implements NotificationChannel { - @Value("${monitoring.notification_channels.slack.webhook_url}") + @Value("${monitoring.notifications.slack.webhook_url}") private String webhookUrl; private RestTemplate restTemplate; @@ -47,11 +46,7 @@ public class SlackNotificationChannel implements NotificationChannel { } @Override - public void sendNotification(Notification notification) { - sendNotification(notification.getText()); - } - - private void sendNotification(String message) { + public void sendNotification(String message) { restTemplate.postForObject(webhookUrl, Map.of("text", message), String.class); } diff --git a/monitoring/src/main/java/org/thingsboard/monitoring/service/transport/TransportHealthChecker.java b/monitoring/src/main/java/org/thingsboard/monitoring/service/transport/TransportHealthChecker.java index 8dbd39187c..bee65d1f37 100644 --- a/monitoring/src/main/java/org/thingsboard/monitoring/service/transport/TransportHealthChecker.java +++ b/monitoring/src/main/java/org/thingsboard/monitoring/service/transport/TransportHealthChecker.java @@ -29,6 +29,8 @@ import org.thingsboard.monitoring.service.BaseHealthChecker; import org.thingsboard.monitoring.util.ResourceUtils; import org.thingsboard.server.common.data.Device; import org.thingsboard.server.common.data.DeviceProfile; +import org.thingsboard.server.common.data.DeviceProfileType; +import org.thingsboard.server.common.data.DeviceTransportType; import org.thingsboard.server.common.data.TbResource; import org.thingsboard.server.common.data.device.credentials.lwm2m.LwM2MBootstrapClientCredentials; import org.thingsboard.server.common.data.device.credentials.lwm2m.LwM2MDeviceCredentials; @@ -38,6 +40,9 @@ import org.thingsboard.server.common.data.device.data.DefaultDeviceConfiguration import org.thingsboard.server.common.data.device.data.DefaultDeviceTransportConfiguration; import org.thingsboard.server.common.data.device.data.DeviceData; import org.thingsboard.server.common.data.device.data.Lwm2mDeviceTransportConfiguration; +import org.thingsboard.server.common.data.device.profile.DefaultDeviceProfileConfiguration; +import org.thingsboard.server.common.data.device.profile.DefaultDeviceProfileTransportConfiguration; +import org.thingsboard.server.common.data.device.profile.DeviceProfileData; import org.thingsboard.server.common.data.page.PageLink; import org.thingsboard.server.common.data.security.DeviceCredentials; import org.thingsboard.server.common.data.security.DeviceCredentialsType; @@ -45,27 +50,19 @@ import org.thingsboard.server.common.data.security.DeviceCredentialsType; @Slf4j public abstract class TransportHealthChecker extends BaseHealthChecker { - private static final String DEFAULT_DEVICE_NAME = "[Monitoring] %s transport (%s)"; - private static final String DEFAULT_PROFILE_NAME = "[Monitoring] %s"; - public TransportHealthChecker(C config, TransportMonitoringTarget target) { super(config, target); } @Override protected void initialize(TbClient tbClient) { - String deviceName = String.format(DEFAULT_DEVICE_NAME, config.getTransportType(), target.getBaseUrl()); - Device device = tbClient.getTenantDevice(deviceName) - .orElseGet(() -> { - log.info("Creating new device '{}'", deviceName); - return createDevice(config.getTransportType(), deviceName, tbClient); - }); + Device device = getOrCreateDevice(tbClient); DeviceCredentials credentials = tbClient.getDeviceCredentialsByDeviceId(device.getId()) .orElseThrow(() -> new IllegalArgumentException("No credentials found for device " + device.getId())); DeviceConfig deviceConfig = new DeviceConfig(); deviceConfig.setId(device.getId().toString()); - deviceConfig.setName(deviceName); + deviceConfig.setName(device.getName()); deviceConfig.setCredentials(credentials); target.setDevice(deviceConfig); } @@ -77,51 +74,43 @@ public abstract class TransportHealthChecker { - TbResource newResource = ResourceUtils.getResource("lwm2m/resource.json", TbResource.class); - log.info("Creating LwM2M resource"); - return tbClient.saveResource(newResource); - }); - String profileName = String.format(DEFAULT_PROFILE_NAME, transportType); - DeviceProfile profile = tbClient.getDeviceProfiles(new PageLink(1, 0, profileName)).getData() - .stream().findFirst() - .orElseGet(() -> { - DeviceProfile newProfile = ResourceUtils.getResource("lwm2m/device_profile.json", DeviceProfile.class); - newProfile.setName(profileName); - log.info("Creating LwM2M device profile"); - return tbClient.saveDeviceProfile(newProfile); - }); - device.setType(profileName); - device.setDeviceProfileId(profile.getId()); deviceData.setTransportConfiguration(new Lwm2mDeviceTransportConfiguration()); - credentials.setCredentialsType(DeviceCredentialsType.LWM2M_CREDENTIALS); LwM2MDeviceCredentials lwm2mCreds = new LwM2MDeviceCredentials(); NoSecClientCredential client = new NoSecClientCredential(); @@ -133,7 +122,42 @@ public abstract class TransportHealthChecker { + TbResource newResource = ResourceUtils.getResource("lwm2m/resource.json", TbResource.class); + log.info("Creating LwM2M resource"); + return tbClient.saveResource(newResource); + }); + deviceProfile = ResourceUtils.getResource("lwm2m/device_profile.json", DeviceProfile.class); + } + + deviceProfile.setName(profileName); + deviceProfile.setDefaultQueueName(target.getQueue()); + return tbClient.saveDeviceProfile(deviceProfile); + } + } diff --git a/monitoring/src/main/resources/tb-monitoring.yml b/monitoring/src/main/resources/tb-monitoring.yml index 32af8636cd..1f65f48ca9 100644 --- a/monitoring/src/main/resources/tb-monitoring.yml +++ b/monitoring/src/main/resources/tb-monitoring.yml @@ -38,7 +38,7 @@ monitoring: check_timeout_ms: '${CHECK_TIMEOUT_MS:5000}' # Failures threshold for notifying - failures_threshold: '${FAILURES_THRESHOLD:2}' + failures_threshold: '${FAILURES_THRESHOLD:1}' # Notify after each REPEATED_FAILURE_NOTIFICATION subsequent failures, 0 to notify only once on first failure repeated_failure_notification: '${REPEATED_FAILURE_NOTIFICATION:4}' @@ -53,6 +53,8 @@ monitoring: targets: # MQTT transport base url, tcp://DOMAIN:1883 by default - base_url: '${MQTT_TRANSPORT_BASE_URL:tcp://${monitoring.domain}:1883}' + # Queue to use for target device + queue: '${MQTT_TRANSPORT_USED_QUEUE:Main}' # Whether to monitor IPs associated with the domain from base url check_domain_ips: '${MQTT_TRANSPORT_CHECK_DOMAIN_IPS:false}' # To add more targets, use following environment variables: @@ -66,6 +68,8 @@ monitoring: targets: # CoAP transport base url, coap://DOMAIN by default - base_url: '${COAP_TRANSPORT_BASE_URL:coap://${monitoring.domain}}' + # Queue to use for target device + queue: '${COAP_TRANSPORT_USED_QUEUE:Main}' # Whether to monitor IPs associated with the domain from base url check_domain_ips: '${COAP_TRANSPORT_CHECK_DOMAIN_IPS:false}' # To add more targets, use following environment variables: @@ -79,6 +83,8 @@ monitoring: targets: # HTTP transport base url, http://DOMAIN by default - base_url: '${HTTP_TRANSPORT_BASE_URL:http://${monitoring.domain}}' + # Queue to use for target device + queue: '${HTTP_TRANSPORT_USED_QUEUE:Main}' # Whether to monitor IPs associated with the domain from base url check_domain_ips: '${HTTP_TRANSPORT_CHECK_DOMAIN_IPS:false}' # To add more targets, use following environment variables: @@ -92,12 +98,15 @@ monitoring: targets: # LwM2M transport base url, coap://DOMAIN:5685 by default - base_url: '${LWM2M_TRANSPORT_BASE_URL:coap://${monitoring.domain}:5685}' + # Queue to use for target device + queue: '${LWM2M_TRANSPORT_USED_QUEUE:Main}' # Whether to monitor IPs associated with the domain from base url check_domain_ips: '${LWM2M_TRANSPORT_CHECK_DOMAIN_IPS:false}' # To add more targets, use following environment variables: # monitoring.transports.lwm2m.targets[1].base_url, monitoring.transports.lwm2m.targets[2].base_url, etc. - notification_channels: + notifications: + message_prefix: '${NOTIFICATION_MESSAGE_PREFIX:}' slack: # Enable notifying via Slack enabled: '${SLACK_NOTIFICATION_CHANNEL_ENABLED:false}' diff --git a/rest-client/src/main/java/org/thingsboard/rest/client/RestClient.java b/rest-client/src/main/java/org/thingsboard/rest/client/RestClient.java index d19726e2f1..f3a7ee77c7 100644 --- a/rest-client/src/main/java/org/thingsboard/rest/client/RestClient.java +++ b/rest-client/src/main/java/org/thingsboard/rest/client/RestClient.java @@ -18,6 +18,8 @@ package org.thingsboard.rest.client; import com.auth0.jwt.JWT; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.node.ObjectNode; +import com.google.common.base.Strings; +import org.apache.commons.io.IOUtils; import org.springframework.core.ParameterizedTypeReference; import org.springframework.core.io.ByteArrayResource; import org.springframework.core.io.Resource; @@ -53,11 +55,13 @@ import org.thingsboard.server.common.data.EntityType; import org.thingsboard.server.common.data.EntityView; import org.thingsboard.server.common.data.EntityViewInfo; import org.thingsboard.server.common.data.EventInfo; +import org.thingsboard.server.common.data.ImageExportData; import org.thingsboard.server.common.data.OtaPackage; import org.thingsboard.server.common.data.OtaPackageInfo; import org.thingsboard.server.common.data.SaveDeviceWithCredentialsRequest; import org.thingsboard.server.common.data.StringUtils; import org.thingsboard.server.common.data.SystemInfo; +import org.thingsboard.server.common.data.TbImageDeleteResult; import org.thingsboard.server.common.data.TbResource; import org.thingsboard.server.common.data.TbResourceInfo; import org.thingsboard.server.common.data.Tenant; @@ -168,6 +172,7 @@ import org.thingsboard.server.common.data.widget.WidgetTypeInfo; import org.thingsboard.server.common.data.widget.WidgetsBundle; import java.io.Closeable; +import java.io.IOException; import java.net.URI; import java.util.Collections; import java.util.HashMap; @@ -3591,6 +3596,97 @@ public class RestClient implements Closeable { restTemplate.delete("/api/resource/{resourceId}", resourceId.getId().toString()); } + public TbResourceInfo getImageInfo(String type, String key) { + return restTemplate.getForObject(baseURL + "/api/images/{type}/{key}/info", TbResourceInfo.class, Map.of( + "type", type, + "key", key + )); + } + + public PageData getImages(PageLink pageLink, boolean includeSystemImages) { + Map params = new HashMap<>(); + addPageLinkToParam(params, pageLink); + params.put("includeSystemImages", String.valueOf(includeSystemImages)); + return restTemplate.exchange(baseURL + "/api/images?includeSystemImages={includeSystemImages}&" + getUrlParams(pageLink), + HttpMethod.GET, + HttpEntity.EMPTY, + new ParameterizedTypeReference>() {}, + params + ).getBody(); + } + + public TbResourceInfo uploadImage(String fileName, byte[] data, String contentType, String title) { + HttpEntity> request = createMultipartRequest(fileName, data, contentType, Map.of( + "title", Strings.nullToEmpty(title) + )); + return restTemplate.postForObject(baseURL + "/api/image", request, TbResourceInfo.class); + } + + public TbResourceInfo updateImage(String type, String key, String fileName, byte[] data, String contentType) { + HttpEntity> request = createMultipartRequest(fileName, data, contentType, Map.of()); + return restTemplate.exchange(baseURL + "/api/images/{type}/{key}", HttpMethod.PUT, request, TbResourceInfo.class, Map.of( + "type", type, + "key", key + )).getBody(); + } + + public TbResourceInfo updateImageInfo(String type, String key, TbResourceInfo request) { + return restTemplate.exchange(baseURL + "/api/images/{type}/{key}/info", HttpMethod.PUT, new HttpEntity<>(request), TbResourceInfo.class, Map.of( + "type", type, + "key", key + )).getBody(); + } + + public void updateImagePublicStatus(String type, String key, boolean isPublic) { + restTemplate.put(baseURL + "/api/images/{type}/{key}/public/{isPublic}", null, Map.of( + "type", type, + "key", key, + "isPublic", isPublic + )); + } + + public byte[] downloadImage(String type, String key) throws IOException { + Resource image = restTemplate.exchange(baseURL + "/api/images/{type}/{key}", HttpMethod.GET, null, Resource.class, Map.of( + "type", type, + "key", key + )).getBody(); + return IOUtils.toByteArray(image.getInputStream()); + } + + public byte[] downloadImagePreview(String type, String key) throws IOException { + Resource image = restTemplate.exchange(baseURL + "/api/images/{type}/{key}/preview", HttpMethod.GET, null, Resource.class, Map.of( + "type", type, + "key", key + )).getBody(); + return IOUtils.toByteArray(image.getInputStream()); + } + + public byte[] downloadPublicImage(String publicResourceKey) throws IOException { + Resource image = restTemplate.exchange(baseURL + "/api/images/public/{publicResourceKey}", HttpMethod.GET, null, Resource.class, Map.of( + "publicResourceKey", publicResourceKey + )).getBody(); + return IOUtils.toByteArray(image.getInputStream()); + } + + public ImageExportData exportImage(String type, String key) { + return restTemplate.getForObject(baseURL + "/api/images/{type}/{key}/export", ImageExportData.class, Map.of( + "type", type, + "key", key + )); + } + + public TbResourceInfo importImage(ImageExportData exportData) { + return restTemplate.exchange(baseURL + "/api/image/import", HttpMethod.PUT, new HttpEntity<>(exportData), TbResourceInfo.class).getBody(); + } + + public TbImageDeleteResult deleteImage(String type, String key, boolean force) { + return restTemplate.exchange(baseURL + "/api/images/{type}/{key}?force={force}", HttpMethod.DELETE, null, TbImageDeleteResult.class, Map.of( + "type", type, + "key", key, + "force", force + )).getBody(); + } + public ResponseEntity downloadOtaPackage(OtaPackageId otaPackageId) { Map params = new HashMap<>(); params.put("otaPackageId", otaPackageId.getId().toString()); @@ -3640,16 +3736,7 @@ public class RestClient implements Closeable { } public OtaPackageInfo saveOtaPackageData(OtaPackageId otaPackageId, String checkSum, ChecksumAlgorithm checksumAlgorithm, String fileName, byte[] fileBytes) throws Exception { - HttpHeaders header = new HttpHeaders(); - header.setContentType(MediaType.MULTIPART_FORM_DATA); - - MultiValueMap fileMap = new LinkedMultiValueMap<>(); - fileMap.add(HttpHeaders.CONTENT_DISPOSITION, "form-data; name=file; filename=" + fileName); - HttpEntity fileEntity = new HttpEntity<>(new ByteArrayResource(fileBytes), fileMap); - - MultiValueMap body = new LinkedMultiValueMap<>(); - body.add("file", fileEntity); - HttpEntity> requestEntity = new HttpEntity<>(body, header); + HttpEntity> requestEntity = createMultipartRequest(fileName, fileBytes, null, Collections.emptyMap()); Map params = new HashMap<>(); params.put("otaPackageId", otaPackageId.getId().toString()); @@ -3854,6 +3941,23 @@ public class RestClient implements Closeable { return listToString(list.stream().map(Enum::name).collect(Collectors.toList())); } + private HttpEntity> createMultipartRequest(String fileName, byte[] fileData, String fileContentType, Map otherParts) { + HttpHeaders headers = new HttpHeaders(); + headers.setContentType(MediaType.MULTIPART_FORM_DATA); + + MultiValueMap fileMap = new LinkedMultiValueMap<>(); + fileMap.add(HttpHeaders.CONTENT_DISPOSITION, "form-data; name=file; filename=" + fileName); + if (fileContentType != null) { + fileMap.add(HttpHeaders.CONTENT_TYPE, fileContentType); + } + HttpEntity fileEntity = new HttpEntity<>(new ByteArrayResource(fileData), fileMap); + + MultiValueMap body = new LinkedMultiValueMap<>(); + body.setAll(otherParts); + body.add("file", fileEntity); + return new HttpEntity<>(body, headers); + } + @Override public void close() { service.shutdown(); diff --git a/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbContext.java b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbContext.java index a1d7d9a97d..59f5b128a7 100644 --- a/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbContext.java +++ b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbContext.java @@ -49,6 +49,7 @@ import org.thingsboard.server.dao.alarm.AlarmCommentService; import org.thingsboard.server.dao.asset.AssetProfileService; import org.thingsboard.server.dao.asset.AssetService; import org.thingsboard.server.dao.attributes.AttributesService; +import org.thingsboard.server.dao.audit.AuditLogService; import org.thingsboard.server.dao.cassandra.CassandraCluster; import org.thingsboard.server.dao.customer.CustomerService; import org.thingsboard.server.dao.dashboard.DashboardService; @@ -59,6 +60,7 @@ import org.thingsboard.server.dao.edge.EdgeEventService; import org.thingsboard.server.dao.edge.EdgeService; import org.thingsboard.server.dao.entity.EntityService; import org.thingsboard.server.dao.entityview.EntityViewService; +import org.thingsboard.server.dao.event.EventService; import org.thingsboard.server.dao.nosql.CassandraStatementTask; import org.thingsboard.server.dao.nosql.TbResultSetFuture; import org.thingsboard.server.dao.notification.NotificationRequestService; @@ -398,4 +400,8 @@ public interface TbContext { RuleEngineApiUsageStateService getRuleEngineApiUsageStateService(); EntityService getEntityService(); + + EventService getEventService(); + + AuditLogService getAuditLogService(); } diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/action/TbAbstractAlarmNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/action/TbAbstractAlarmNode.java index 7c8afe18bf..855727494e 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/action/TbAbstractAlarmNode.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/action/TbAbstractAlarmNode.java @@ -60,7 +60,7 @@ public abstract class TbAbstractAlarmNode keys = JacksonUtil.convertValue(dataJson.get("attributes"), new TypeReference<>() {}); + List keys = JacksonUtil.convertValue(dataJson.get("attributes"), new TypeReference<>() { + }); entityBody.put("keys", keys); entityBody.put(SCOPE, getScope(metadata)); break; @@ -138,9 +140,8 @@ public abstract class AbstractTbMsgPushNode metadata) { @@ -174,7 +175,7 @@ public abstract class AbstractTbMsgPushNodeSupports next originator types:" + - "
DEVICE" + - "
ASSET" + - "
ENTITY_VIEW" + - "
DASHBOARD" + - "
TENANT" + - "
CUSTOMER" + - "
EDGE

" + - "As well node supports next message types:" + + "Supports next message types:" + "
POST_TELEMETRY_REQUEST" + "
POST_ATTRIBUTES_REQUEST" + "
ATTRIBUTES_UPDATED" + diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/edge/TbMsgPushToEdgeNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/edge/TbMsgPushToEdgeNode.java index 724bfa8359..701d7fd717 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/edge/TbMsgPushToEdgeNode.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/edge/TbMsgPushToEdgeNode.java @@ -31,8 +31,7 @@ import org.thingsboard.server.common.data.edge.EdgeEventActionType; import org.thingsboard.server.common.data.edge.EdgeEventType; import org.thingsboard.server.common.data.id.EdgeId; import org.thingsboard.server.common.data.id.TenantId; -import org.thingsboard.server.common.data.page.PageData; -import org.thingsboard.server.common.data.page.PageLink; +import org.thingsboard.server.common.data.page.PageDataIterableByTenantIdEntityId; import org.thingsboard.server.common.data.plugin.ComponentType; import org.thingsboard.server.common.data.rule.RuleChainType; import org.thingsboard.server.common.msg.TbMsg; @@ -52,21 +51,13 @@ import java.util.UUID; "This node used only on cloud instances to push messages from cloud to edge. " + "Once message arrived into this node it’s going to be converted into edge event and saved to the database. " + "Node doesn't push messages directly to edge, but stores event(s) in the edge queue. " + - "
Supports next originator types:" + - "
DEVICE" + - "
ASSET" + - "
ENTITY_VIEW" + - "
DASHBOARD" + - "
TENANT" + - "
CUSTOMER" + - "
EDGE

" + - "As well node supports next message types:" + + "Supports next message types:" + "
POST_TELEMETRY_REQUEST" + "
POST_ATTRIBUTES_REQUEST" + "
ATTRIBUTES_UPDATED" + "
ATTRIBUTES_DELETED" + "
ALARM

" + - "Message will be routed via Failure route if node was not able to save edge event to database or unsupported originator type/message type arrived. " + + "Message will be routed via Failure route if node was not able to save edge event to database or unsupported message type arrived. " + "In case successful storage edge event to database message will be routed via Success route.", uiResources = {"static/rulenode/rulenode-core-config.js"}, configDirective = "tbActionNodePushToEdgeConfig", @@ -129,21 +120,13 @@ public class TbMsgPushToEdgeNode extends AbstractTbMsgPushNode pageData; List> futures = new ArrayList<>(); - do { - pageData = ctx.getEdgeService().findRelatedEdgeIdsByEntityId(ctx.getTenantId(), msg.getOriginator(), pageLink); - if (pageData != null && pageData.getData() != null && !pageData.getData().isEmpty()) { - for (EdgeId edgeId : pageData.getData()) { - EdgeEvent edgeEvent = buildEvent(msg, ctx); - futures.add(notifyEdge(ctx, edgeEvent, edgeId)); - } - if (pageData.hasNext()) { - pageLink = pageLink.nextPageLink(); - } - } - } while (pageData != null && pageData.hasNext()); + PageDataIterableByTenantIdEntityId edgeIds = new PageDataIterableByTenantIdEntityId<>( + ctx.getEdgeService()::findRelatedEdgeIdsByEntityId, ctx.getTenantId(), msg.getOriginator(), DEFAULT_PAGE_SIZE); + for (EdgeId edgeId : edgeIds) { + EdgeEvent edgeEvent = buildEvent(msg, ctx); + futures.add(notifyEdge(ctx, edgeEvent, edgeId)); + } if (futures.isEmpty()) { // ack in case no edges are related to provided entity @@ -176,5 +159,4 @@ public class TbMsgPushToEdgeNode extends AbstractTbMsgPushNode