diff --git a/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldManagerMessageProcessor.java b/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldManagerMessageProcessor.java index d5ed8b8aaa..b2ff883236 100644 --- a/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldManagerMessageProcessor.java +++ b/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldManagerMessageProcessor.java @@ -122,7 +122,7 @@ public class CalculatedFieldManagerMessageProcessor extends AbstractContextAware if (calculatedFields.containsKey(msg.getId().cfId())) { getOrCreateActor(msg.getId().entityId()).tell(msg); } else { - // TODO: remove state from storage + cfExecService.deleteStateFromStorage(msg.getId(), msg.getCallback()); } } diff --git a/application/src/main/java/org/thingsboard/server/service/cf/RocksDBService.java b/application/src/main/java/org/thingsboard/server/service/cf/RocksDBService.java index 5eaeff120a..fe800f61ad 100644 --- a/application/src/main/java/org/thingsboard/server/service/cf/RocksDBService.java +++ b/application/src/main/java/org/thingsboard/server/service/cf/RocksDBService.java @@ -59,9 +59,9 @@ public class RocksDBService { } } - public void delete(String key) { + public void delete(CalculatedFieldEntityCtxIdProto key) { try { - db.delete(writeOptions, key.getBytes(StandardCharsets.UTF_8)); + db.delete(writeOptions, key.toByteArray()); } catch (RocksDBException e) { log.error("Failed to delete data from RocksDB", e); } diff --git a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/CalculatedFieldCtx.java b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/CalculatedFieldCtx.java index a56aed64f7..4600628838 100644 --- a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/CalculatedFieldCtx.java +++ b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/CalculatedFieldCtx.java @@ -34,6 +34,7 @@ import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.kv.AttributeKvEntry; import org.thingsboard.server.common.data.kv.TsKvEntry; import org.thingsboard.server.common.data.util.TbPair; +import org.thingsboard.server.common.util.ProtoUtils; import org.thingsboard.server.gen.transport.TransportProtos.CalculatedFieldTelemetryMsgProto; import org.thingsboard.server.service.cf.ctx.CalculatedFieldEntityCtxId; @@ -185,8 +186,18 @@ public class CalculatedFieldCtx { } public boolean linkMatches(EntityId entityId, CalculatedFieldTelemetryMsgProto proto) { - //TODO: IM - implement - return true; + if (!proto.getTsDataList().isEmpty()) { + List updatedTelemetry = proto.getTsDataList().stream() + .map(ProtoUtils::fromProto) + .toList(); + return linkMatches(entityId, updatedTelemetry); + } else { + AttributeScope scope = AttributeScope.valueOf(proto.getScope().name()); + List updatedTelemetry = proto.getAttrDataList().stream() + .map(ProtoUtils::fromProto) + .toList(); + return linkMatches(entityId, updatedTelemetry, scope); + } } public CalculatedFieldEntityCtxId toCalculatedFieldEntityCtxId() { diff --git a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/RocksDBStateService.java b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/RocksDBStateService.java index bfed563182..6487ce1a43 100644 --- a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/RocksDBStateService.java +++ b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/RocksDBStateService.java @@ -18,7 +18,6 @@ package org.thingsboard.server.service.cf.ctx.state; import lombok.RequiredArgsConstructor; import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression; import org.springframework.stereotype.Service; -import org.thingsboard.common.util.JacksonUtil; import org.thingsboard.server.common.data.StringUtils; import org.thingsboard.server.common.data.cf.CalculatedFieldType; import org.thingsboard.server.common.data.id.CalculatedFieldId; @@ -67,7 +66,7 @@ public class RocksDBStateService implements CalculatedFieldStateService { @Override public void removeState(CalculatedFieldEntityCtxId ctxId, TbCallback callback) { - rocksDBService.delete(JacksonUtil.writeValueAsString(ctxId)); + rocksDBService.delete(toProto(ctxId)); callback.onSuccess(); }