|
|
|
@ -173,7 +173,7 @@ public class SparkplugNodeSessionHandler extends AbstractGatewaySessionHandler<S |
|
|
|
public void onSuccess(@Nullable MqttDeviceAwareSessionContext deviceCtx) { |
|
|
|
TransportProtos.PostAttributeMsg kvListProto = attributesMsg.getMsg(); |
|
|
|
try { |
|
|
|
TransportProtos.PostAttributeMsg postAttributeMsg = ProtoConverter.validatePostAttributeMsg(kvListProto.toByteArray()); |
|
|
|
TransportProtos.PostAttributeMsg postAttributeMsg = ProtoConverter.validatePostAttributeMsg(kvListProto); |
|
|
|
processPostAttributesMsg(deviceCtx, postAttributeMsg, deviceName, msgId); |
|
|
|
} catch (Throwable e) { |
|
|
|
log.warn("[{}][{}] Failed to process device attributes command: {}", gateway.getDeviceId(), deviceName, kvListProto, e); |
|
|
|
@ -233,7 +233,7 @@ public class SparkplugNodeSessionHandler extends AbstractGatewaySessionHandler<S |
|
|
|
try { |
|
|
|
List<TransportProtos.PostTelemetryMsg> msgs = new ArrayList<>(); |
|
|
|
for (SparkplugBProto.Payload.Metric protoMetric : sparkplugBProto.getMetricsList()) { |
|
|
|
if (attributesMetricNames == null || !attributesMetricNames.contains(protoMetric.getName())) { |
|
|
|
if (attributesMetricNames == null || !matches(attributesMetricNames, protoMetric)) { |
|
|
|
long ts = protoMetric.getTimestamp(); |
|
|
|
String key = "bdSeq".equals(protoMetric.getName()) ? |
|
|
|
topicTypeName + " " + protoMetric.getName() : protoMetric.getName(); |
|
|
|
@ -264,7 +264,7 @@ public class SparkplugNodeSessionHandler extends AbstractGatewaySessionHandler<S |
|
|
|
try { |
|
|
|
List<TransportApiProtos.AttributesMsg> msgs = new ArrayList<>(); |
|
|
|
for (SparkplugBProto.Payload.Metric protoMetric : sparkplugBProto.getMetricsList()) { |
|
|
|
if (attributesMetricNames.contains(protoMetric.getName())) { |
|
|
|
if (matches(attributesMetricNames, protoMetric)) { |
|
|
|
TransportApiProtos.AttributesMsg.Builder deviceAttributesMsgBuilder = TransportApiProtos.AttributesMsg.newBuilder(); |
|
|
|
Optional<TransportProtos.PostAttributeMsg> msgOpt = getPostAttributeMsg(protoMetric); |
|
|
|
if (msgOpt.isPresent()) { |
|
|
|
@ -281,11 +281,24 @@ public class SparkplugNodeSessionHandler extends AbstractGatewaySessionHandler<S |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
private boolean matches(Set<String> attributesMetricNames, SparkplugBProto.Payload.Metric protoMetric) { |
|
|
|
String metricName = protoMetric.getName(); |
|
|
|
for (String attributeMetricFilter : attributesMetricNames) { |
|
|
|
if (metricName.equals(attributeMetricFilter) || |
|
|
|
(attributeMetricFilter.endsWith("*") && metricName.startsWith( |
|
|
|
attributeMetricFilter.substring(0, attributeMetricFilter.length() - 1)))) { |
|
|
|
return true; |
|
|
|
} |
|
|
|
} |
|
|
|
return false; |
|
|
|
} |
|
|
|
|
|
|
|
private Optional<TransportProtos.PostAttributeMsg> getPostAttributeMsg(SparkplugBProto.Payload.Metric protoMetric) throws ThingsboardException { |
|
|
|
Optional<TransportProtos.KeyValueProto> keyValueProtoOpt = fromSparkplugBMetricToKeyValueProto(protoMetric.getName(), protoMetric); |
|
|
|
if (keyValueProtoOpt.isPresent()) { |
|
|
|
TransportProtos.PostAttributeMsg.Builder builder = TransportProtos.PostAttributeMsg.newBuilder(); |
|
|
|
builder.addKv(keyValueProtoOpt.get()); |
|
|
|
builder.setShared(true); |
|
|
|
return Optional.of(builder.build()); |
|
|
|
} |
|
|
|
return Optional.empty(); |
|
|
|
|