diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcSession.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcSession.java index 3287a58791..623573feff 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcSession.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcSession.java @@ -555,7 +555,7 @@ public final class EdgeGrpcSession implements Closeable { try { if (uplinkMsg.getEntityDataCount() > 0) { for (EntityDataProto entityData : uplinkMsg.getEntityDataList()) { - result.addAll(ctx.getTelemetryProcessor().processTelemetryFromEdge(edge.getTenantId(), entityData)); + result.addAll(ctx.getTelemetryProcessor().processTelemetryMsg(edge.getTenantId(), entityData)); } } if (uplinkMsg.getDeviceUpdateMsgCount() > 0) { diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/alarm/BaseAlarmProcessor.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/alarm/BaseAlarmProcessor.java index af7a613d25..203f301c92 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/alarm/BaseAlarmProcessor.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/alarm/BaseAlarmProcessor.java @@ -37,7 +37,7 @@ import org.thingsboard.server.service.edge.rpc.processor.BaseEdgeProcessor; import java.util.UUID; @Slf4j -public class BaseAlarmProcessor extends BaseEdgeProcessor { +public abstract class BaseAlarmProcessor extends BaseEdgeProcessor { public ListenableFuture processAlarmMsg(TenantId tenantId, AlarmUpdateMsg alarmUpdateMsg) { log.trace("[{}] processAlarmMsg [{}]", tenantId, alarmUpdateMsg); diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/relation/BaseRelationProcessor.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/relation/BaseRelationProcessor.java index dc2ad80bec..e1c937eaad 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/relation/BaseRelationProcessor.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/relation/BaseRelationProcessor.java @@ -38,7 +38,7 @@ import org.thingsboard.server.service.edge.rpc.processor.BaseEdgeProcessor; import java.util.UUID; @Slf4j -public class BaseRelationProcessor extends BaseEdgeProcessor { +public abstract class BaseRelationProcessor extends BaseEdgeProcessor { public ListenableFuture processRelationMsg(TenantId tenantId, RelationUpdateMsg relationUpdateMsg) { log.trace("[{}] processRelationFromEdge [{}]", tenantId, relationUpdateMsg); diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/telemetry/BaseTelemetryProcessor.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/telemetry/BaseTelemetryProcessor.java new file mode 100644 index 0000000000..30979187d1 --- /dev/null +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/telemetry/BaseTelemetryProcessor.java @@ -0,0 +1,342 @@ +/** + * Copyright © 2016-2022 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.service.edge.rpc.processor.telemetry; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.JsonNode; +import com.google.common.util.concurrent.FutureCallback; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.SettableFuture; +import com.google.gson.Gson; +import com.google.gson.JsonElement; +import com.google.gson.JsonObject; +import com.google.gson.JsonParser; +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.tuple.ImmutablePair; +import org.apache.commons.lang3.tuple.Pair; +import org.thingsboard.common.util.JacksonUtil; +import org.thingsboard.rule.engine.api.msg.DeviceAttributesEventNotificationMsg; +import org.thingsboard.server.common.data.DataConstants; +import org.thingsboard.server.common.data.Device; +import org.thingsboard.server.common.data.DeviceProfile; +import org.thingsboard.server.common.data.EntityType; +import org.thingsboard.server.common.data.EntityView; +import org.thingsboard.server.common.data.asset.Asset; +import org.thingsboard.server.common.data.asset.AssetProfile; +import org.thingsboard.server.common.data.edge.Edge; +import org.thingsboard.server.common.data.edge.EdgeEventActionType; +import org.thingsboard.server.common.data.id.AssetId; +import org.thingsboard.server.common.data.id.CustomerId; +import org.thingsboard.server.common.data.id.DashboardId; +import org.thingsboard.server.common.data.id.DeviceId; +import org.thingsboard.server.common.data.id.EdgeId; +import org.thingsboard.server.common.data.id.EntityId; +import org.thingsboard.server.common.data.id.EntityViewId; +import org.thingsboard.server.common.data.id.RuleChainId; +import org.thingsboard.server.common.data.id.TenantId; +import org.thingsboard.server.common.data.id.UserId; +import org.thingsboard.server.common.data.kv.AttributeKvEntry; +import org.thingsboard.server.common.msg.TbMsg; +import org.thingsboard.server.common.msg.TbMsgMetaData; +import org.thingsboard.server.common.msg.queue.ServiceType; +import org.thingsboard.server.common.msg.queue.TopicPartitionInfo; +import org.thingsboard.server.common.msg.session.SessionMsgType; +import org.thingsboard.server.common.transport.adaptor.JsonConverter; +import org.thingsboard.server.common.transport.util.JsonUtils; +import org.thingsboard.server.dao.model.ModelConstants; +import org.thingsboard.server.gen.edge.v1.AttributeDeleteMsg; +import org.thingsboard.server.gen.edge.v1.EntityDataProto; +import org.thingsboard.server.gen.transport.TransportProtos; +import org.thingsboard.server.queue.TbQueueCallback; +import org.thingsboard.server.queue.TbQueueMsgMetadata; +import org.thingsboard.server.queue.TbQueueProducer; +import org.thingsboard.server.queue.common.TbProtoQueueMsg; +import org.thingsboard.server.service.edge.rpc.processor.BaseEdgeProcessor; + +import javax.annotation.Nullable; +import javax.annotation.PostConstruct; +import java.util.ArrayList; +import java.util.List; +import java.util.UUID; + +@Slf4j +public abstract class BaseTelemetryProcessor extends BaseEdgeProcessor { + + private final Gson gson = new Gson(); + + private TbQueueProducer> tbCoreMsgProducer; + + @PostConstruct + public void init() { + tbCoreMsgProducer = producerProvider.getTbCoreMsgProducer(); + } + + abstract protected String getMsgSourceKey(); + + public List> processTelemetryMsg(TenantId tenantId, EntityDataProto entityData) { + log.trace("[{}] processTelemetryMsg [{}]", tenantId, entityData); + List> result = new ArrayList<>(); + EntityId entityId = constructEntityId(entityData.getEntityType(), entityData.getEntityIdMSB(), entityData.getEntityIdLSB()); + if ((entityData.hasPostAttributesMsg() || entityData.hasPostTelemetryMsg() || entityData.hasAttributesUpdatedMsg()) && entityId != null) { + Pair pair = getBaseMsgMetadataAndCustomerId(tenantId, entityId); + TbMsgMetaData metaData = pair.getKey(); + CustomerId customerId = pair.getValue(); + metaData.putValue(DataConstants.MSG_SOURCE_KEY, getMsgSourceKey()); + if (entityData.hasPostAttributesMsg()) { + result.add(processPostAttributes(tenantId, customerId, entityId, entityData.getPostAttributesMsg(), metaData)); + } + if (entityData.hasAttributesUpdatedMsg()) { + metaData.putValue("scope", entityData.getPostAttributeScope()); + result.add(processAttributesUpdate(tenantId, customerId, entityId, entityData.getAttributesUpdatedMsg(), metaData)); + } + if (entityData.hasPostTelemetryMsg()) { + result.add(processPostTelemetry(tenantId, customerId, entityId, entityData.getPostTelemetryMsg(), metaData)); + } + if (EntityType.DEVICE.equals(entityId.getEntityType())) { + DeviceId deviceId = new DeviceId(entityId.getId()); + + long currentTs = System.currentTimeMillis(); + + TransportProtos.DeviceActivityProto deviceActivityMsg = TransportProtos.DeviceActivityProto.newBuilder() + .setTenantIdMSB(tenantId.getId().getMostSignificantBits()) + .setTenantIdLSB(tenantId.getId().getLeastSignificantBits()) + .setDeviceIdMSB(deviceId.getId().getMostSignificantBits()) + .setDeviceIdLSB(deviceId.getId().getLeastSignificantBits()) + .setLastActivityTime(currentTs).build(); + + log.trace("[{}][{}] device activity time is going to be updated, ts {}", tenantId, deviceId, currentTs); + + TopicPartitionInfo tpi = partitionService.resolve(ServiceType.TB_CORE, tenantId, deviceId); + tbCoreMsgProducer.send(tpi, new TbProtoQueueMsg<>(deviceId.getId(), + TransportProtos.ToCoreMsg.newBuilder().setDeviceActivityMsg(deviceActivityMsg).build()), null); + } + } + if (entityData.hasAttributeDeleteMsg()) { + result.add(processAttributeDeleteMsg(tenantId, entityId, entityData.getAttributeDeleteMsg(), entityData.getEntityType())); + } + return result; + } + + private Pair getBaseMsgMetadataAndCustomerId(TenantId tenantId, EntityId entityId) { + TbMsgMetaData metaData = new TbMsgMetaData(); + CustomerId customerId = null; + switch (entityId.getEntityType()) { + case DEVICE: + Device device = deviceService.findDeviceById(tenantId, new DeviceId(entityId.getId())); + if (device != null) { + customerId = device.getCustomerId(); + metaData.putValue("deviceName", device.getName()); + metaData.putValue("deviceType", device.getType()); + } + break; + case ASSET: + Asset asset = assetService.findAssetById(tenantId, new AssetId(entityId.getId())); + if (asset != null) { + customerId = asset.getCustomerId(); + metaData.putValue("assetName", asset.getName()); + metaData.putValue("assetType", asset.getType()); + } + break; + case ENTITY_VIEW: + EntityView entityView = entityViewService.findEntityViewById(tenantId, new EntityViewId(entityId.getId())); + if (entityView != null) { + customerId = entityView.getCustomerId(); + metaData.putValue("entityViewName", entityView.getName()); + metaData.putValue("entityViewType", entityView.getType()); + } + break; + case EDGE: + Edge edge = edgeService.findEdgeById(tenantId, new EdgeId(entityId.getId())); + if (edge != null) { + customerId = edge.getCustomerId(); + metaData.putValue("edgeName", edge.getName()); + metaData.putValue("edgeType", edge.getType()); + } + break; + default: + log.debug("Using empty metadata for entityId [{}]", entityId); + break; + } + return new ImmutablePair<>(metaData, customerId != null ? customerId : new CustomerId(ModelConstants.NULL_UUID)); + } + + private ListenableFuture processPostTelemetry(TenantId tenantId, CustomerId customerId, EntityId entityId, TransportProtos.PostTelemetryMsg msg, TbMsgMetaData metaData) { + SettableFuture futureToSet = SettableFuture.create(); + for (TransportProtos.TsKvListProto tsKv : msg.getTsKvListList()) { + JsonObject json = JsonUtils.getJsonObject(tsKv.getKvList()); + metaData.putValue("ts", tsKv.getTs() + ""); + var defaultQueueAndRuleChain = getDefaultQueueNameAndRuleChainId(tenantId, entityId); + TbMsg tbMsg = TbMsg.newMsg(defaultQueueAndRuleChain.getKey(), SessionMsgType.POST_TELEMETRY_REQUEST.name(), entityId, customerId, metaData, gson.toJson(json), defaultQueueAndRuleChain.getValue(), null); + tbClusterService.pushMsgToRuleEngine(tenantId, tbMsg.getOriginator(), tbMsg, new TbQueueCallback() { + @Override + public void onSuccess(TbQueueMsgMetadata metadata) { + futureToSet.set(null); + } + + @Override + public void onFailure(Throwable t) { + log.error("Can't process post telemetry [{}]", msg, t); + futureToSet.setException(t); + } + }); + } + return futureToSet; + } + + private Pair getDefaultQueueNameAndRuleChainId(TenantId tenantId, EntityId entityId) { + RuleChainId ruleChainId = null; + String queueName = null; + if (EntityType.DEVICE.equals(entityId.getEntityType())) { + DeviceProfile deviceProfile = deviceProfileCache.get(tenantId, new DeviceId(entityId.getId())); + if (deviceProfile == null) { + log.warn("[{}] Device profile is null!", entityId); + } else { + ruleChainId = deviceProfile.getDefaultRuleChainId(); + queueName = deviceProfile.getDefaultQueueName(); + } + } else if (EntityType.ASSET.equals(entityId.getEntityType())) { + AssetProfile assetProfile = assetProfileCache.get(tenantId, new AssetId(entityId.getId())); + if (assetProfile == null) { + log.warn("[{}] Asset profile is null!", entityId); + } else { + ruleChainId = assetProfile.getDefaultRuleChainId(); + queueName = assetProfile.getDefaultQueueName(); + } + } + return new ImmutablePair<>(queueName, ruleChainId); + } + + private ListenableFuture processPostAttributes(TenantId tenantId, CustomerId customerId, EntityId entityId, TransportProtos.PostAttributeMsg msg, TbMsgMetaData metaData) { + SettableFuture futureToSet = SettableFuture.create(); + JsonObject json = JsonUtils.getJsonObject(msg.getKvList()); + var defaultQueueAndRuleChain = getDefaultQueueNameAndRuleChainId(tenantId, entityId); + TbMsg tbMsg = TbMsg.newMsg(defaultQueueAndRuleChain.getKey(), SessionMsgType.POST_ATTRIBUTES_REQUEST.name(), entityId, customerId, metaData, gson.toJson(json), defaultQueueAndRuleChain.getValue(), null); + tbClusterService.pushMsgToRuleEngine(tenantId, tbMsg.getOriginator(), tbMsg, new TbQueueCallback() { + @Override + public void onSuccess(TbQueueMsgMetadata metadata) { + futureToSet.set(null); + } + + @Override + public void onFailure(Throwable t) { + log.error("Can't process post attributes [{}]", msg, t); + futureToSet.setException(t); + } + }); + return futureToSet; + } + + private ListenableFuture processAttributesUpdate(TenantId tenantId, + CustomerId customerId, + EntityId entityId, + TransportProtos.PostAttributeMsg msg, + TbMsgMetaData metaData) { + SettableFuture futureToSet = SettableFuture.create(); + JsonObject json = JsonUtils.getJsonObject(msg.getKvList()); + List attributes = new ArrayList<>(JsonConverter.convertToAttributes(json)); + String scope = metaData.getValue("scope"); + tsSubService.saveAndNotify(tenantId, entityId, scope, attributes, new FutureCallback() { + @Override + public void onSuccess(@Nullable Void tmp) { + var defaultQueueAndRuleChain = getDefaultQueueNameAndRuleChainId(tenantId, entityId); + TbMsg tbMsg = TbMsg.newMsg(defaultQueueAndRuleChain.getKey(), DataConstants.ATTRIBUTES_UPDATED, entityId, + customerId, metaData, gson.toJson(json), defaultQueueAndRuleChain.getValue(), null); + tbClusterService.pushMsgToRuleEngine(tenantId, tbMsg.getOriginator(), tbMsg, new TbQueueCallback() { + @Override + public void onSuccess(TbQueueMsgMetadata metadata) { + futureToSet.set(null); + } + + @Override + public void onFailure(Throwable t) { + log.error("Can't process attributes update [{}]", msg, t); + futureToSet.setException(t); + } + }); + } + + @Override + public void onFailure(Throwable t) { + log.error("Can't process attributes update [{}]", msg, t); + futureToSet.setException(t); + } + }); + return futureToSet; + } + + private ListenableFuture processAttributeDeleteMsg(TenantId tenantId, EntityId entityId, AttributeDeleteMsg attributeDeleteMsg, + String entityType) { + SettableFuture futureToSet = SettableFuture.create(); + String scope = attributeDeleteMsg.getScope(); + List attributeKeys = attributeDeleteMsg.getAttributeNamesList(); + attributesService.removeAll(tenantId, entityId, scope, attributeKeys); + if (EntityType.DEVICE.name().equals(entityType)) { + tbClusterService.pushMsgToCore(DeviceAttributesEventNotificationMsg.onDelete( + tenantId, (DeviceId) entityId, scope, attributeKeys), new TbQueueCallback() { + @Override + public void onSuccess(TbQueueMsgMetadata metadata) { + futureToSet.set(null); + } + + @Override + public void onFailure(Throwable t) { + log.error("Can't process attribute delete msg [{}]", attributeDeleteMsg, t); + futureToSet.setException(t); + } + }); + } + return futureToSet; + } + + public EntityDataProto convertTelemetryEventToEntityDataProto(EntityType entityType, + UUID entityUUID, + EdgeEventActionType actionType, + JsonNode body) throws JsonProcessingException { + EntityId entityId; + switch (entityType) { + case DEVICE: + entityId = new DeviceId(entityUUID); + break; + case ASSET: + entityId = new AssetId(entityUUID); + break; + case ENTITY_VIEW: + entityId = new EntityViewId(entityUUID); + break; + case DASHBOARD: + entityId = new DashboardId(entityUUID); + break; + case TENANT: + entityId = TenantId.fromUUID(entityUUID); + break; + case CUSTOMER: + entityId = new CustomerId(entityUUID); + break; + case USER: + entityId = new UserId(entityUUID); + break; + case EDGE: + entityId = new EdgeId(entityUUID); + break; + default: + log.warn("Unsupported edge event type [{}]", entityType); + return null; + } + JsonElement entityData = JsonParser.parseString(JacksonUtil.OBJECT_MAPPER.writeValueAsString(body)); + return entityDataMsgConstructor.constructEntityDataMsg(entityId, actionType, entityData); + } + +} diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/telemetry/TelemetryEdgeProcessor.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/telemetry/TelemetryEdgeProcessor.java index f828be3ca6..ffdb76b1ce 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/telemetry/TelemetryEdgeProcessor.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/telemetry/TelemetryEdgeProcessor.java @@ -16,335 +16,33 @@ package org.thingsboard.server.service.edge.rpc.processor.telemetry; import com.fasterxml.jackson.core.JsonProcessingException; -import com.google.common.util.concurrent.FutureCallback; -import com.google.common.util.concurrent.ListenableFuture; -import com.google.common.util.concurrent.SettableFuture; -import com.google.gson.Gson; -import com.google.gson.JsonElement; -import com.google.gson.JsonObject; -import com.google.gson.JsonParser; import lombok.extern.slf4j.Slf4j; -import org.apache.commons.lang3.tuple.ImmutablePair; -import org.apache.commons.lang3.tuple.Pair; import org.springframework.stereotype.Component; -import org.thingsboard.common.util.JacksonUtil; -import org.thingsboard.rule.engine.api.msg.DeviceAttributesEventNotificationMsg; import org.thingsboard.server.common.data.DataConstants; -import org.thingsboard.server.common.data.Device; -import org.thingsboard.server.common.data.DeviceProfile; import org.thingsboard.server.common.data.EdgeUtils; import org.thingsboard.server.common.data.EntityType; -import org.thingsboard.server.common.data.EntityView; -import org.thingsboard.server.common.data.asset.Asset; -import org.thingsboard.server.common.data.asset.AssetProfile; -import org.thingsboard.server.common.data.edge.Edge; import org.thingsboard.server.common.data.edge.EdgeEvent; -import org.thingsboard.server.common.data.edge.EdgeEventActionType; -import org.thingsboard.server.common.data.id.AssetId; -import org.thingsboard.server.common.data.id.CustomerId; -import org.thingsboard.server.common.data.id.DashboardId; -import org.thingsboard.server.common.data.id.DeviceId; -import org.thingsboard.server.common.data.id.EdgeId; -import org.thingsboard.server.common.data.id.EntityId; -import org.thingsboard.server.common.data.id.EntityViewId; -import org.thingsboard.server.common.data.id.RuleChainId; -import org.thingsboard.server.common.data.id.TenantId; -import org.thingsboard.server.common.data.id.UserId; -import org.thingsboard.server.common.data.kv.AttributeKvEntry; -import org.thingsboard.server.common.msg.TbMsg; -import org.thingsboard.server.common.msg.TbMsgMetaData; -import org.thingsboard.server.common.msg.queue.ServiceType; -import org.thingsboard.server.common.msg.queue.TopicPartitionInfo; -import org.thingsboard.server.common.msg.session.SessionMsgType; -import org.thingsboard.server.common.transport.adaptor.JsonConverter; -import org.thingsboard.server.common.transport.util.JsonUtils; -import org.thingsboard.server.dao.model.ModelConstants; -import org.thingsboard.server.gen.edge.v1.AttributeDeleteMsg; import org.thingsboard.server.gen.edge.v1.DownlinkMsg; import org.thingsboard.server.gen.edge.v1.EntityDataProto; -import org.thingsboard.server.gen.transport.TransportProtos; -import org.thingsboard.server.queue.TbQueueCallback; -import org.thingsboard.server.queue.TbQueueMsgMetadata; -import org.thingsboard.server.queue.TbQueueProducer; -import org.thingsboard.server.queue.common.TbProtoQueueMsg; import org.thingsboard.server.queue.util.TbCoreComponent; -import org.thingsboard.server.service.edge.rpc.processor.BaseEdgeProcessor; - -import javax.annotation.Nullable; -import javax.annotation.PostConstruct; -import java.util.ArrayList; -import java.util.List; @Component @Slf4j @TbCoreComponent -public class TelemetryEdgeProcessor extends BaseEdgeProcessor { - - private final Gson gson = new Gson(); +public class TelemetryEdgeProcessor extends BaseTelemetryProcessor { - private TbQueueProducer> tbCoreMsgProducer; - - @PostConstruct - public void init() { - tbCoreMsgProducer = producerProvider.getTbCoreMsgProducer(); - } - - public List> processTelemetryFromEdge(TenantId tenantId, EntityDataProto entityData) { - log.trace("[{}] processTelemetryFromEdge [{}]", tenantId, entityData); - List> result = new ArrayList<>(); - EntityId entityId = constructEntityId(entityData.getEntityType(), entityData.getEntityIdMSB(), entityData.getEntityIdLSB()); - if ((entityData.hasPostAttributesMsg() || entityData.hasPostTelemetryMsg() || entityData.hasAttributesUpdatedMsg()) && entityId != null) { - Pair pair = getBaseMsgMetadataAndCustomerId(tenantId, entityId); - TbMsgMetaData metaData = pair.getKey(); - CustomerId customerId = pair.getValue(); - metaData.putValue(DataConstants.MSG_SOURCE_KEY, DataConstants.EDGE_MSG_SOURCE); - if (entityData.hasPostAttributesMsg()) { - result.add(processPostAttributes(tenantId, customerId, entityId, entityData.getPostAttributesMsg(), metaData)); - } - if (entityData.hasAttributesUpdatedMsg()) { - metaData.putValue("scope", entityData.getPostAttributeScope()); - result.add(processAttributesUpdate(tenantId, customerId, entityId, entityData.getAttributesUpdatedMsg(), metaData)); - } - if (entityData.hasPostTelemetryMsg()) { - result.add(processPostTelemetry(tenantId, customerId, entityId, entityData.getPostTelemetryMsg(), metaData)); - } - if (EntityType.DEVICE.equals(entityId.getEntityType())) { - DeviceId deviceId = new DeviceId(entityId.getId()); - - long currentTs = System.currentTimeMillis(); - - TransportProtos.DeviceActivityProto deviceActivityMsg = TransportProtos.DeviceActivityProto.newBuilder() - .setTenantIdMSB(tenantId.getId().getMostSignificantBits()) - .setTenantIdLSB(tenantId.getId().getLeastSignificantBits()) - .setDeviceIdMSB(deviceId.getId().getMostSignificantBits()) - .setDeviceIdLSB(deviceId.getId().getLeastSignificantBits()) - .setLastActivityTime(currentTs).build(); - - log.trace("[{}][{}] device activity time is going to be updated, ts {}", tenantId, deviceId, currentTs); - - TopicPartitionInfo tpi = partitionService.resolve(ServiceType.TB_CORE, tenantId, deviceId); - tbCoreMsgProducer.send(tpi, new TbProtoQueueMsg<>(deviceId.getId(), - TransportProtos.ToCoreMsg.newBuilder().setDeviceActivityMsg(deviceActivityMsg).build()), null); - } - } - if (entityData.hasAttributeDeleteMsg()) { - result.add(processAttributeDeleteMsg(tenantId, entityId, entityData.getAttributeDeleteMsg(), entityData.getEntityType())); - } - return result; - } - - private Pair getBaseMsgMetadataAndCustomerId(TenantId tenantId, EntityId entityId) { - TbMsgMetaData metaData = new TbMsgMetaData(); - CustomerId customerId = null; - switch (entityId.getEntityType()) { - case DEVICE: - Device device = deviceService.findDeviceById(tenantId, new DeviceId(entityId.getId())); - if (device != null) { - customerId = device.getCustomerId(); - metaData.putValue("deviceName", device.getName()); - metaData.putValue("deviceType", device.getType()); - } - break; - case ASSET: - Asset asset = assetService.findAssetById(tenantId, new AssetId(entityId.getId())); - if (asset != null) { - customerId = asset.getCustomerId(); - metaData.putValue("assetName", asset.getName()); - metaData.putValue("assetType", asset.getType()); - } - break; - case ENTITY_VIEW: - EntityView entityView = entityViewService.findEntityViewById(tenantId, new EntityViewId(entityId.getId())); - if (entityView != null) { - customerId = entityView.getCustomerId(); - metaData.putValue("entityViewName", entityView.getName()); - metaData.putValue("entityViewType", entityView.getType()); - } - break; - case EDGE: - Edge edge = edgeService.findEdgeById(tenantId, new EdgeId(entityId.getId())); - if (edge != null) { - customerId = edge.getCustomerId(); - metaData.putValue("edgeName", edge.getName()); - metaData.putValue("edgeType", edge.getType()); - } - break; - default: - log.debug("Using empty metadata for entityId [{}]", entityId); - break; - } - return new ImmutablePair<>(metaData, customerId != null ? customerId : new CustomerId(ModelConstants.NULL_UUID)); - } - - private ListenableFuture processPostTelemetry(TenantId tenantId, CustomerId customerId, EntityId entityId, TransportProtos.PostTelemetryMsg msg, TbMsgMetaData metaData) { - SettableFuture futureToSet = SettableFuture.create(); - for (TransportProtos.TsKvListProto tsKv : msg.getTsKvListList()) { - JsonObject json = JsonUtils.getJsonObject(tsKv.getKvList()); - metaData.putValue("ts", tsKv.getTs() + ""); - var defaultQueueAndRuleChain = getDefaultQueueNameAndRuleChainId(tenantId, entityId); - TbMsg tbMsg = TbMsg.newMsg(defaultQueueAndRuleChain.getKey(), SessionMsgType.POST_TELEMETRY_REQUEST.name(), entityId, customerId, metaData, gson.toJson(json), defaultQueueAndRuleChain.getValue(), null); - tbClusterService.pushMsgToRuleEngine(tenantId, tbMsg.getOriginator(), tbMsg, new TbQueueCallback() { - @Override - public void onSuccess(TbQueueMsgMetadata metadata) { - futureToSet.set(null); - } - - @Override - public void onFailure(Throwable t) { - log.error("Can't process post telemetry [{}]", msg, t); - futureToSet.setException(t); - } - }); - } - return futureToSet; - } - - private Pair getDefaultQueueNameAndRuleChainId(TenantId tenantId, EntityId entityId) { - RuleChainId ruleChainId = null; - String queueName = null; - if (EntityType.DEVICE.equals(entityId.getEntityType())) { - DeviceProfile deviceProfile = deviceProfileCache.get(tenantId, new DeviceId(entityId.getId())); - if (deviceProfile == null) { - log.warn("[{}] Device profile is null!", entityId); - } else { - ruleChainId = deviceProfile.getDefaultRuleChainId(); - queueName = deviceProfile.getDefaultQueueName(); - } - } else if (EntityType.ASSET.equals(entityId.getEntityType())) { - AssetProfile assetProfile = assetProfileCache.get(tenantId, new AssetId(entityId.getId())); - if (assetProfile == null) { - log.warn("[{}] Asset profile is null!", entityId); - } else { - ruleChainId = assetProfile.getDefaultRuleChainId(); - queueName = assetProfile.getDefaultQueueName(); - } - } - return new ImmutablePair<>(queueName, ruleChainId); - } - - private ListenableFuture processPostAttributes(TenantId tenantId, CustomerId customerId, EntityId entityId, TransportProtos.PostAttributeMsg msg, TbMsgMetaData metaData) { - SettableFuture futureToSet = SettableFuture.create(); - JsonObject json = JsonUtils.getJsonObject(msg.getKvList()); - var defaultQueueAndRuleChain = getDefaultQueueNameAndRuleChainId(tenantId, entityId); - TbMsg tbMsg = TbMsg.newMsg(defaultQueueAndRuleChain.getKey(), SessionMsgType.POST_ATTRIBUTES_REQUEST.name(), entityId, customerId, metaData, gson.toJson(json), defaultQueueAndRuleChain.getValue(), null); - tbClusterService.pushMsgToRuleEngine(tenantId, tbMsg.getOriginator(), tbMsg, new TbQueueCallback() { - @Override - public void onSuccess(TbQueueMsgMetadata metadata) { - futureToSet.set(null); - } - - @Override - public void onFailure(Throwable t) { - log.error("Can't process post attributes [{}]", msg, t); - futureToSet.setException(t); - } - }); - return futureToSet; - } - - private ListenableFuture processAttributesUpdate(TenantId tenantId, - CustomerId customerId, - EntityId entityId, - TransportProtos.PostAttributeMsg msg, - TbMsgMetaData metaData) { - SettableFuture futureToSet = SettableFuture.create(); - JsonObject json = JsonUtils.getJsonObject(msg.getKvList()); - List attributes = new ArrayList<>(JsonConverter.convertToAttributes(json)); - String scope = metaData.getValue("scope"); - tsSubService.saveAndNotify(tenantId, entityId, scope, attributes, new FutureCallback() { - @Override - public void onSuccess(@Nullable Void tmp) { - var defaultQueueAndRuleChain = getDefaultQueueNameAndRuleChainId(tenantId, entityId); - TbMsg tbMsg = TbMsg.newMsg(defaultQueueAndRuleChain.getKey(), DataConstants.ATTRIBUTES_UPDATED, entityId, - customerId, metaData, gson.toJson(json), defaultQueueAndRuleChain.getValue(), null); - tbClusterService.pushMsgToRuleEngine(tenantId, tbMsg.getOriginator(), tbMsg, new TbQueueCallback() { - @Override - public void onSuccess(TbQueueMsgMetadata metadata) { - futureToSet.set(null); - } - - @Override - public void onFailure(Throwable t) { - log.error("Can't process attributes update [{}]", msg, t); - futureToSet.setException(t); - } - }); - } - - @Override - public void onFailure(Throwable t) { - log.error("Can't process attributes update [{}]", msg, t); - futureToSet.setException(t); - } - }); - return futureToSet; - } - - private ListenableFuture processAttributeDeleteMsg(TenantId tenantId, EntityId entityId, AttributeDeleteMsg attributeDeleteMsg, - String entityType) { - SettableFuture futureToSet = SettableFuture.create(); - String scope = attributeDeleteMsg.getScope(); - List attributeNames = attributeDeleteMsg.getAttributeNamesList(); - attributesService.removeAll(tenantId, entityId, scope, attributeNames); - if (EntityType.DEVICE.name().equals(entityType)) { - tbClusterService.pushMsgToCore(DeviceAttributesEventNotificationMsg.onDelete( - tenantId, (DeviceId) entityId, scope, attributeNames), new TbQueueCallback() { - @Override - public void onSuccess(TbQueueMsgMetadata metadata) { - futureToSet.set(null); - } - - @Override - public void onFailure(Throwable t) { - log.error("Can't process attribute delete msg [{}]", attributeDeleteMsg, t); - futureToSet.setException(t); - } - }); - } - return futureToSet; + @Override + protected String getMsgSourceKey() { + return DataConstants.EDGE_MSG_SOURCE; } public DownlinkMsg convertTelemetryEventToDownlink(EdgeEvent edgeEvent) throws JsonProcessingException { - EntityId entityId; - switch (edgeEvent.getType()) { - case DEVICE: - entityId = new DeviceId(edgeEvent.getEntityId()); - break; - case ASSET: - entityId = new AssetId(edgeEvent.getEntityId()); - break; - case ENTITY_VIEW: - entityId = new EntityViewId(edgeEvent.getEntityId()); - break; - case DASHBOARD: - entityId = new DashboardId(edgeEvent.getEntityId()); - break; - case TENANT: - entityId = TenantId.fromUUID(edgeEvent.getEntityId()); - break; - case CUSTOMER: - entityId = new CustomerId(edgeEvent.getEntityId()); - break; - case USER: - entityId = new UserId(edgeEvent.getEntityId()); - break; - case EDGE: - entityId = new EdgeId(edgeEvent.getEntityId()); - break; - default: - log.warn("Unsupported edge event type [{}]", edgeEvent); - return null; - } - return constructEntityDataProtoMsg(entityId, edgeEvent.getAction(), - JsonParser.parseString(JacksonUtil.OBJECT_MAPPER.writeValueAsString(edgeEvent.getBody()))); - } - - private DownlinkMsg constructEntityDataProtoMsg(EntityId entityId, EdgeEventActionType actionType, JsonElement entityData) { - EntityDataProto entityDataProto = entityDataMsgConstructor.constructEntityDataMsg(entityId, actionType, entityData); + EntityType entityType = EntityType.valueOf(edgeEvent.getType().name()); + EntityDataProto entityDataProto = convertTelemetryEventToEntityDataProto(entityType, edgeEvent.getEntityId(), + edgeEvent.getAction(), edgeEvent.getBody()); return DownlinkMsg.newBuilder() .setDownlinkMsgId(EdgeUtils.nextPositiveInt()) .addEntityData(entityDataProto) .build(); } - }