|
|
|
@ -54,6 +54,7 @@ import org.thingsboard.server.common.data.id.RuleChainId; |
|
|
|
import org.thingsboard.server.common.data.id.TenantId; |
|
|
|
import org.thingsboard.server.common.data.id.UserId; |
|
|
|
import org.thingsboard.server.common.data.kv.AttributeKvEntry; |
|
|
|
import org.thingsboard.server.common.data.kv.KvEntry; |
|
|
|
import org.thingsboard.server.common.data.msg.TbMsgType; |
|
|
|
import org.thingsboard.server.common.msg.TbMsg; |
|
|
|
import org.thingsboard.server.common.msg.TbMsgMetaData; |
|
|
|
@ -79,7 +80,10 @@ import org.thingsboard.server.service.telemetry.TelemetrySubscriptionService; |
|
|
|
|
|
|
|
import java.util.ArrayList; |
|
|
|
import java.util.List; |
|
|
|
import java.util.Map; |
|
|
|
import java.util.Set; |
|
|
|
import java.util.UUID; |
|
|
|
import java.util.stream.Collectors; |
|
|
|
|
|
|
|
@Slf4j |
|
|
|
public abstract class BaseTelemetryProcessor extends BaseEdgeProcessor { |
|
|
|
@ -111,7 +115,7 @@ public abstract class BaseTelemetryProcessor extends BaseEdgeProcessor { |
|
|
|
|
|
|
|
abstract protected String getMsgSourceKey(); |
|
|
|
|
|
|
|
public List<ListenableFuture<Void>> processTelemetryMsg(TenantId tenantId, EntityDataProto entityData) { |
|
|
|
public List<ListenableFuture<Void>> processTelemetryMsg(TenantId tenantId, EntityDataProto entityData) throws Exception { |
|
|
|
log.trace("[{}] processTelemetryMsg [{}]", tenantId, entityData); |
|
|
|
List<ListenableFuture<Void>> result = new ArrayList<>(); |
|
|
|
EntityId entityId = constructEntityId(entityData.getEntityType(), entityData.getEntityIdMSB(), entityData.getEntityIdLSB()); |
|
|
|
@ -122,11 +126,14 @@ public abstract class BaseTelemetryProcessor extends BaseEdgeProcessor { |
|
|
|
CustomerId customerId = pair.getValue(); |
|
|
|
metaData.putValue(DataConstants.MSG_SOURCE_KEY, getMsgSourceKey()); |
|
|
|
if (entityData.hasPostAttributesMsg()) { |
|
|
|
result.add(processPostAttributes(tenantId, customerId, entityId, entityData.getPostAttributesMsg(), metaData)); |
|
|
|
metaData.putValue(DataConstants.SCOPE, entityData.getPostAttributeScope()); |
|
|
|
long ts = entityData.hasAttributeTs() ? entityData.getAttributeTs() : System.currentTimeMillis(); |
|
|
|
result.add(processPostAttributes(tenantId, customerId, entityId, entityData.getPostAttributesMsg(), metaData, ts)); |
|
|
|
} |
|
|
|
if (entityData.hasAttributesUpdatedMsg()) { |
|
|
|
metaData.putValue("scope", entityData.getPostAttributeScope()); |
|
|
|
result.add(processAttributesUpdate(tenantId, customerId, entityId, entityData.getAttributesUpdatedMsg(), metaData)); |
|
|
|
metaData.putValue(DataConstants.SCOPE, entityData.getPostAttributeScope()); |
|
|
|
long ts = entityData.hasAttributeTs() ? entityData.getAttributeTs() : System.currentTimeMillis(); |
|
|
|
result.add(processAttributesUpdate(tenantId, customerId, entityId, entityData.getAttributesUpdatedMsg(), metaData, ts)); |
|
|
|
} |
|
|
|
if (entityData.hasPostTelemetryMsg()) { |
|
|
|
result.add(processPostTelemetry(tenantId, customerId, entityId, entityData.getPostTelemetryMsg(), metaData)); |
|
|
|
@ -254,23 +261,39 @@ public abstract class BaseTelemetryProcessor extends BaseEdgeProcessor { |
|
|
|
return new ImmutablePair<>(queueName, ruleChainId); |
|
|
|
} |
|
|
|
|
|
|
|
private ListenableFuture<Void> processPostAttributes(TenantId tenantId, CustomerId customerId, EntityId entityId, TransportProtos.PostAttributeMsg msg, TbMsgMetaData metaData) { |
|
|
|
private ListenableFuture<Void> processPostAttributes(TenantId tenantId, CustomerId customerId, EntityId entityId, |
|
|
|
TransportProtos.PostAttributeMsg msg, TbMsgMetaData metaData, long ts) throws Exception { |
|
|
|
SettableFuture<Void> futureToSet = SettableFuture.create(); |
|
|
|
JsonObject json = JsonUtils.getJsonObject(msg.getKvList()); |
|
|
|
var defaultQueueAndRuleChain = getDefaultQueueNameAndRuleChainId(tenantId, entityId); |
|
|
|
TbMsg tbMsg = TbMsg.newMsg() |
|
|
|
.queueName(defaultQueueAndRuleChain.getKey()) |
|
|
|
.type(TbMsgType.POST_ATTRIBUTES_REQUEST) |
|
|
|
.originator(entityId) |
|
|
|
.customerId(customerId) |
|
|
|
.copyMetaData(metaData) |
|
|
|
.data(gson.toJson(json)) |
|
|
|
.ruleChainId(defaultQueueAndRuleChain.getValue()) |
|
|
|
.build(); |
|
|
|
edgeCtx.getClusterService().pushMsgToRuleEngine(tenantId, tbMsg.getOriginator(), tbMsg, new TbQueueCallback() { |
|
|
|
AttributeScope scope = AttributeScope.valueOf(metaData.getValue(DataConstants.SCOPE)); |
|
|
|
List<AttributeKvEntry> attributes = new ArrayList<>(JsonConverter.convertToAttributes(json, ts)); |
|
|
|
ListenableFuture<List<AttributeKvEntry>> future = filterAttributesByTs(tenantId, entityId, scope, attributes); |
|
|
|
Futures.addCallback(future, new FutureCallback<>() { |
|
|
|
@Override |
|
|
|
public void onSuccess(TbQueueMsgMetadata metadata) { |
|
|
|
futureToSet.set(null); |
|
|
|
public void onSuccess(List<AttributeKvEntry> attributesToSave) { |
|
|
|
JsonObject jsonToSave = filterAttributesFromJson(json, attributesToSave); |
|
|
|
var defaultQueueAndRuleChain = getDefaultQueueNameAndRuleChainId(tenantId, entityId); |
|
|
|
TbMsg tbMsg = TbMsg.newMsg() |
|
|
|
.queueName(defaultQueueAndRuleChain.getKey()) |
|
|
|
.type(TbMsgType.POST_ATTRIBUTES_REQUEST) |
|
|
|
.originator(entityId) |
|
|
|
.customerId(customerId) |
|
|
|
.copyMetaData(metaData) |
|
|
|
.data(gson.toJson(jsonToSave)) |
|
|
|
.ruleChainId(defaultQueueAndRuleChain.getValue()) |
|
|
|
.build(); |
|
|
|
edgeCtx.getClusterService().pushMsgToRuleEngine(tenantId, tbMsg.getOriginator(), tbMsg, new TbQueueCallback() { |
|
|
|
@Override |
|
|
|
public void onSuccess(TbQueueMsgMetadata metadata) { |
|
|
|
futureToSet.set(null); |
|
|
|
} |
|
|
|
|
|
|
|
@Override |
|
|
|
public void onFailure(Throwable t) { |
|
|
|
log.error("[{}] Can't process post attributes [{}]", tenantId, msg, t); |
|
|
|
futureToSet.setException(t); |
|
|
|
} |
|
|
|
}); |
|
|
|
} |
|
|
|
|
|
|
|
@Override |
|
|
|
@ -278,7 +301,7 @@ public abstract class BaseTelemetryProcessor extends BaseEdgeProcessor { |
|
|
|
log.error("[{}] Can't process post attributes [{}]", tenantId, msg, t); |
|
|
|
futureToSet.setException(t); |
|
|
|
} |
|
|
|
}); |
|
|
|
}, dbCallbackExecutorService); |
|
|
|
return futureToSet; |
|
|
|
} |
|
|
|
|
|
|
|
@ -286,33 +309,47 @@ public abstract class BaseTelemetryProcessor extends BaseEdgeProcessor { |
|
|
|
CustomerId customerId, |
|
|
|
EntityId entityId, |
|
|
|
TransportProtos.PostAttributeMsg msg, |
|
|
|
TbMsgMetaData metaData) { |
|
|
|
TbMsgMetaData metaData, |
|
|
|
long ts) { |
|
|
|
SettableFuture<Void> futureToSet = SettableFuture.create(); |
|
|
|
JsonObject json = JsonUtils.getJsonObject(msg.getKvList()); |
|
|
|
List<AttributeKvEntry> attributes = new ArrayList<>(JsonConverter.convertToAttributes(json)); |
|
|
|
String scope = metaData.getValue("scope"); |
|
|
|
tsSubService.saveAttributes(AttributesSaveRequest.builder() |
|
|
|
.tenantId(tenantId) |
|
|
|
.entityId(entityId) |
|
|
|
.scope(AttributeScope.valueOf(scope)) |
|
|
|
.entries(attributes) |
|
|
|
.callback(new FutureCallback<>() { |
|
|
|
@Override |
|
|
|
public void onSuccess(@Nullable Void tmp) { |
|
|
|
var defaultQueueAndRuleChain = getDefaultQueueNameAndRuleChainId(tenantId, entityId); |
|
|
|
TbMsg tbMsg = TbMsg.newMsg() |
|
|
|
.queueName(defaultQueueAndRuleChain.getKey()) |
|
|
|
.type(TbMsgType.ATTRIBUTES_UPDATED) |
|
|
|
.originator(entityId) |
|
|
|
.customerId(customerId) |
|
|
|
.copyMetaData(metaData) |
|
|
|
.data(gson.toJson(json)) |
|
|
|
.ruleChainId(defaultQueueAndRuleChain.getValue()) |
|
|
|
.build(); |
|
|
|
edgeCtx.getClusterService().pushMsgToRuleEngine(tenantId, tbMsg.getOriginator(), tbMsg, new TbQueueCallback() { |
|
|
|
AttributeScope scope = AttributeScope.valueOf(metaData.getValue(DataConstants.SCOPE)); |
|
|
|
List<AttributeKvEntry> attributes = new ArrayList<>(JsonConverter.convertToAttributes(json, ts)); |
|
|
|
ListenableFuture<List<AttributeKvEntry>> future = filterAttributesByTs(tenantId, entityId, scope, attributes); |
|
|
|
Futures.addCallback(future, new FutureCallback<>() { |
|
|
|
@Override |
|
|
|
public void onSuccess(List<AttributeKvEntry> attributesToSave) { |
|
|
|
JsonObject jsonToSave = filterAttributesFromJson(json, attributesToSave); |
|
|
|
tsSubService.saveAttributes(AttributesSaveRequest.builder() |
|
|
|
.tenantId(tenantId) |
|
|
|
.entityId(entityId) |
|
|
|
.scope(scope) |
|
|
|
.entries(attributesToSave) |
|
|
|
.callback(new FutureCallback<>() { |
|
|
|
@Override |
|
|
|
public void onSuccess(TbQueueMsgMetadata metadata) { |
|
|
|
futureToSet.set(null); |
|
|
|
public void onSuccess(@Nullable Void tmp) { |
|
|
|
var defaultQueueAndRuleChain = getDefaultQueueNameAndRuleChainId(tenantId, entityId); |
|
|
|
TbMsg tbMsg = TbMsg.newMsg() |
|
|
|
.queueName(defaultQueueAndRuleChain.getKey()) |
|
|
|
.type(TbMsgType.ATTRIBUTES_UPDATED) |
|
|
|
.originator(entityId) |
|
|
|
.customerId(customerId) |
|
|
|
.copyMetaData(metaData) |
|
|
|
.data(gson.toJson(jsonToSave)) |
|
|
|
.ruleChainId(defaultQueueAndRuleChain.getValue()) |
|
|
|
.build(); |
|
|
|
edgeCtx.getClusterService().pushMsgToRuleEngine(tenantId, tbMsg.getOriginator(), tbMsg, new TbQueueCallback() { |
|
|
|
@Override |
|
|
|
public void onSuccess(TbQueueMsgMetadata metadata) { |
|
|
|
futureToSet.set(null); |
|
|
|
} |
|
|
|
|
|
|
|
@Override |
|
|
|
public void onFailure(Throwable t) { |
|
|
|
log.error("[{}] Can't process attributes update [{}]", tenantId, msg, t); |
|
|
|
futureToSet.setException(t); |
|
|
|
} |
|
|
|
}); |
|
|
|
} |
|
|
|
|
|
|
|
@Override |
|
|
|
@ -320,22 +357,28 @@ public abstract class BaseTelemetryProcessor extends BaseEdgeProcessor { |
|
|
|
log.error("[{}] Can't process attributes update [{}]", tenantId, msg, t); |
|
|
|
futureToSet.setException(t); |
|
|
|
} |
|
|
|
}); |
|
|
|
} |
|
|
|
}) |
|
|
|
.build()); |
|
|
|
} |
|
|
|
|
|
|
|
@Override |
|
|
|
public void onFailure(Throwable t) { |
|
|
|
log.error("[{}] Can't process attributes update [{}]", tenantId, msg, t); |
|
|
|
futureToSet.setException(t); |
|
|
|
} |
|
|
|
}) |
|
|
|
.build()); |
|
|
|
@Override |
|
|
|
public void onFailure(Throwable t) { |
|
|
|
log.error("[{}] Can't process attributes update [{}]", tenantId, msg, t); |
|
|
|
futureToSet.setException(t); |
|
|
|
} |
|
|
|
}, dbCallbackExecutorService); |
|
|
|
return futureToSet; |
|
|
|
} |
|
|
|
|
|
|
|
private ListenableFuture<Void> processAttributeDeleteMsg(TenantId tenantId, EntityId entityId, AttributeDeleteMsg attributeDeleteMsg, |
|
|
|
String entityType) { |
|
|
|
private JsonObject filterAttributesFromJson(JsonObject json, List<AttributeKvEntry> attributesToSave) { |
|
|
|
Set<String> keysToSave = attributesToSave.stream() |
|
|
|
.map(KvEntry::getKey) |
|
|
|
.collect(Collectors.toSet()); |
|
|
|
json.keySet().removeIf(key -> !keysToSave.contains(key)); |
|
|
|
return json; |
|
|
|
} |
|
|
|
|
|
|
|
private ListenableFuture<Void> processAttributeDeleteMsg(TenantId tenantId, EntityId entityId, AttributeDeleteMsg attributeDeleteMsg, String entityType) { |
|
|
|
String scope = attributeDeleteMsg.getScope(); |
|
|
|
List<String> attributeKeys = attributeDeleteMsg.getAttributeNamesList(); |
|
|
|
ListenableFuture<List<String>> removeAllFuture = edgeCtx.getAttributesService().removeAll(tenantId, entityId, AttributeScope.valueOf(scope), attributeKeys); |
|
|
|
@ -386,4 +429,19 @@ public abstract class BaseTelemetryProcessor extends BaseEdgeProcessor { |
|
|
|
return bodyJackson == null ? null : EdgeMsgConstructorUtils.constructEntityDataMsg(tenantId, entityId, actionType, JsonParser.parseString(bodyJackson)); |
|
|
|
} |
|
|
|
|
|
|
|
private ListenableFuture<List<AttributeKvEntry>> filterAttributesByTs(TenantId tenantId, EntityId entityId, AttributeScope scope, |
|
|
|
List<AttributeKvEntry> attributes) { |
|
|
|
List<String> keys = attributes.stream().map(KvEntry::getKey).toList(); |
|
|
|
ListenableFuture<List<AttributeKvEntry>> future = edgeCtx.getAttributesService().find(tenantId, entityId, scope, keys); |
|
|
|
return Futures.transform(future, input -> { |
|
|
|
Map<String, Long> existingAttributesTs = input.stream().collect(Collectors.toMap(KvEntry::getKey, AttributeKvEntry::getLastUpdateTs)); |
|
|
|
return attributes.stream() |
|
|
|
.filter(attribute -> { |
|
|
|
String key = attribute.getKey(); |
|
|
|
long incomingTs = attribute.getLastUpdateTs(); |
|
|
|
return incomingTs > existingAttributesTs.getOrDefault(key, 0L); |
|
|
|
}).toList(); |
|
|
|
}, dbCallbackExecutorService); |
|
|
|
} |
|
|
|
|
|
|
|
} |
|
|
|
|