|
|
|
@ -52,8 +52,6 @@ import org.thingsboard.server.common.data.kv.AttributeKey; |
|
|
|
import org.thingsboard.server.common.data.kv.AttributeKvEntry; |
|
|
|
import org.thingsboard.server.common.msg.TbMsg; |
|
|
|
import org.thingsboard.server.common.msg.TbMsgMetaData; |
|
|
|
import org.thingsboard.server.common.msg.queue.ServiceType; |
|
|
|
import org.thingsboard.server.common.msg.queue.TopicPartitionInfo; |
|
|
|
import org.thingsboard.server.common.msg.session.SessionMsgType; |
|
|
|
import org.thingsboard.server.common.transport.adaptor.JsonConverter; |
|
|
|
import org.thingsboard.server.common.transport.util.JsonUtils; |
|
|
|
@ -63,12 +61,9 @@ import org.thingsboard.server.gen.edge.v1.EntityDataProto; |
|
|
|
import org.thingsboard.server.gen.transport.TransportProtos; |
|
|
|
import org.thingsboard.server.queue.TbQueueCallback; |
|
|
|
import org.thingsboard.server.queue.TbQueueMsgMetadata; |
|
|
|
import org.thingsboard.server.queue.TbQueueProducer; |
|
|
|
import org.thingsboard.server.queue.common.TbProtoQueueMsg; |
|
|
|
import org.thingsboard.server.queue.util.TbCoreComponent; |
|
|
|
|
|
|
|
import javax.annotation.Nullable; |
|
|
|
import javax.annotation.PostConstruct; |
|
|
|
import java.util.ArrayList; |
|
|
|
import java.util.HashSet; |
|
|
|
import java.util.List; |
|
|
|
@ -82,13 +77,6 @@ public class TelemetryEdgeProcessor extends BaseEdgeProcessor { |
|
|
|
|
|
|
|
private final Gson gson = new Gson(); |
|
|
|
|
|
|
|
private TbQueueProducer<TbProtoQueueMsg<TransportProtos.ToCoreMsg>> tbCoreMsgProducer; |
|
|
|
|
|
|
|
@PostConstruct |
|
|
|
public void init() { |
|
|
|
tbCoreMsgProducer = producerProvider.getTbCoreMsgProducer(); |
|
|
|
} |
|
|
|
|
|
|
|
public List<ListenableFuture<Void>> processTelemetryFromEdge(TenantId tenantId, CustomerId customerId, EntityDataProto entityData, UUID edgeSessionId) { |
|
|
|
log.trace("[{}] onTelemetryUpdate [{}]", tenantId, entityData); |
|
|
|
List<ListenableFuture<Void>> result = new ArrayList<>(); |
|
|
|
@ -107,9 +95,7 @@ public class TelemetryEdgeProcessor extends BaseEdgeProcessor { |
|
|
|
result.add(processPostTelemetry(tenantId, customerId, entityId, entityData.getPostTelemetryMsg(), metaData)); |
|
|
|
} |
|
|
|
if (EntityType.DEVICE.equals(entityId.getEntityType())) { |
|
|
|
Device device = deviceService.findDeviceById(tenantId, new DeviceId(entityId.getId())); |
|
|
|
// for edge context sessionId is exact edgeSessionId
|
|
|
|
reportActivity(device, edgeSessionId); |
|
|
|
deviceStateService.onDeviceActivity(tenantId, new DeviceId(entityId.getId()), System.currentTimeMillis()); |
|
|
|
} |
|
|
|
} |
|
|
|
if (entityData.hasAttributeDeleteMsg()) { |
|
|
|
@ -118,37 +104,6 @@ public class TelemetryEdgeProcessor extends BaseEdgeProcessor { |
|
|
|
return result; |
|
|
|
} |
|
|
|
|
|
|
|
private void reportActivity(Device device, UUID sessionId) { |
|
|
|
TransportProtos.SessionInfoProto.Builder builder = TransportProtos.SessionInfoProto.newBuilder() |
|
|
|
.setSessionIdMSB(sessionId.getMostSignificantBits()) |
|
|
|
.setSessionIdLSB(sessionId.getLeastSignificantBits()) |
|
|
|
.setTenantIdMSB(device.getTenantId().getId().getMostSignificantBits()) |
|
|
|
.setTenantIdLSB(device.getTenantId().getId().getLeastSignificantBits()) |
|
|
|
.setDeviceIdMSB(device.getId().getId().getMostSignificantBits()) |
|
|
|
.setDeviceIdLSB(device.getId().getId().getLeastSignificantBits()) |
|
|
|
.setDeviceName(device.getName()) |
|
|
|
.setDeviceType(device.getType()) |
|
|
|
.setDeviceProfileIdMSB(device.getDeviceProfileId().getId().getMostSignificantBits()) |
|
|
|
.setDeviceProfileIdLSB(device.getDeviceProfileId().getId().getLeastSignificantBits()); |
|
|
|
|
|
|
|
if (device.getCustomerId() != null && !device.getCustomerId().isNullUid()) { |
|
|
|
builder.setCustomerIdMSB(device.getCustomerId().getId().getMostSignificantBits()); |
|
|
|
builder.setCustomerIdLSB(device.getCustomerId().getId().getLeastSignificantBits()); |
|
|
|
} |
|
|
|
reportActivity(device.getTenantId(), device.getId(), device.getUuidId(), builder.build()); |
|
|
|
} |
|
|
|
|
|
|
|
private void reportActivity(TenantId tenantId, DeviceId deviceId, UUID routingKey, TransportProtos.SessionInfoProto sessionInfo) { |
|
|
|
TransportProtos.SubscriptionInfoProto subscriptionInfoProto = TransportProtos.SubscriptionInfoProto.newBuilder() |
|
|
|
.setAttributeSubscription(false).setRpcSubscription(false) |
|
|
|
.setLastActivityTime(System.currentTimeMillis()).build(); |
|
|
|
TransportProtos.TransportToDeviceActorMsg msg = TransportProtos.TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo) |
|
|
|
.setSubscriptionInfo(subscriptionInfoProto).build(); |
|
|
|
TopicPartitionInfo tpi = partitionService.resolve(ServiceType.TB_CORE, tenantId, deviceId); |
|
|
|
tbCoreMsgProducer.send(tpi, new TbProtoQueueMsg<>(routingKey, |
|
|
|
TransportProtos.ToCoreMsg.newBuilder().setToDeviceActorMsg(msg).build()), null); |
|
|
|
} |
|
|
|
|
|
|
|
private TbMsgMetaData constructBaseMsgMetadata(TenantId tenantId, EntityId entityId) { |
|
|
|
TbMsgMetaData metaData = new TbMsgMetaData(); |
|
|
|
switch (entityId.getEntityType()) { |
|
|
|
|