diff --git a/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldManagerMessageProcessor.java b/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldManagerMessageProcessor.java index b2ff883236..ec86e640d4 100644 --- a/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldManagerMessageProcessor.java +++ b/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldManagerMessageProcessor.java @@ -119,7 +119,11 @@ public class CalculatedFieldManagerMessageProcessor extends AbstractContextAware } public void onStateRestoreMsg(CalculatedFieldStateRestoreMsg msg) { - if (calculatedFields.containsKey(msg.getId().cfId())) { + var cfId = msg.getId().cfId(); + var calculatedField = calculatedFields.get(cfId); + + if (calculatedField != null) { + msg.getState().setRequiredArguments(calculatedField.getArgNames()); getOrCreateActor(msg.getId().entityId()).tell(msg); } else { cfExecService.deleteStateFromStorage(msg.getId(), msg.getCallback()); diff --git a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/BaseCalculatedFieldState.java b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/BaseCalculatedFieldState.java index f5f681b505..21105a44cb 100644 --- a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/BaseCalculatedFieldState.java +++ b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/BaseCalculatedFieldState.java @@ -15,13 +15,14 @@ */ package org.thingsboard.server.service.cf.ctx.state; -import lombok.NoArgsConstructor; +import lombok.Data; +import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; -@NoArgsConstructor +@Data public abstract class BaseCalculatedFieldState implements CalculatedFieldState { protected List requiredArguments; @@ -32,14 +33,9 @@ public abstract class BaseCalculatedFieldState implements CalculatedFieldState { this.arguments = new HashMap<>(); } - @Override - public Map getArguments() { - return arguments; - } - - @Override - public List getRequiredArguments() { - return requiredArguments; + public BaseCalculatedFieldState() { + this.requiredArguments = new ArrayList<>(); + this.arguments = new HashMap<>(); } @Override diff --git a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/CalculatedFieldState.java b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/CalculatedFieldState.java index 173d299da6..0792b8240d 100644 --- a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/CalculatedFieldState.java +++ b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/CalculatedFieldState.java @@ -43,6 +43,8 @@ public interface CalculatedFieldState { List getRequiredArguments(); + void setRequiredArguments(List requiredArguments); + boolean updateState(Map argumentValues); ListenableFuture performCalculation(CalculatedFieldCtx ctx); diff --git a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/RocksDBStateService.java b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/RocksDBStateService.java index 6487ce1a43..18e604118d 100644 --- a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/RocksDBStateService.java +++ b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/RocksDBStateService.java @@ -25,19 +25,19 @@ import org.thingsboard.server.common.data.id.EntityId; import org.thingsboard.server.common.data.id.EntityIdFactory; import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.kv.BasicKvEntry; -import org.thingsboard.server.common.data.kv.KvEntry; import org.thingsboard.server.common.msg.queue.TbCallback; -import org.thingsboard.server.common.util.ProtoUtils; +import org.thingsboard.server.common.util.KvProtoUtil; import org.thingsboard.server.gen.transport.TransportProtos.CalculatedFieldEntityCtxIdProto; import org.thingsboard.server.gen.transport.TransportProtos.CalculatedFieldStateProto; -import org.thingsboard.server.gen.transport.TransportProtos.RollingArgumentProto; import org.thingsboard.server.gen.transport.TransportProtos.SingleValueArgumentProto; -import org.thingsboard.server.gen.transport.TransportProtos.SingleValueProto; +import org.thingsboard.server.gen.transport.TransportProtos.TsValueListProto; +import org.thingsboard.server.gen.transport.TransportProtos.TsValueProto; import org.thingsboard.server.service.cf.RocksDBService; import org.thingsboard.server.service.cf.ctx.CalculatedFieldEntityCtxId; import org.thingsboard.server.service.cf.ctx.CalculatedFieldStateService; import java.util.Map; +import java.util.Optional; import java.util.TreeMap; import java.util.UUID; import java.util.stream.Collectors; @@ -92,8 +92,7 @@ public class RocksDBStateService implements CalculatedFieldStateService { private CalculatedFieldStateProto toProto(CalculatedFieldEntityCtxId stateId, CalculatedFieldState state) { CalculatedFieldStateProto.Builder builder = CalculatedFieldStateProto.newBuilder() .setId(toProto(stateId)) - .setType(state.getType().name()) - .addAllRequiredArguments(state.getRequiredArguments()); + .setType(state.getType().name()); state.getArguments().forEach((argName, argEntry) -> { if (argEntry instanceof SingleValueArgumentEntry singleValueArgumentEntry) { @@ -107,42 +106,21 @@ public class RocksDBStateService implements CalculatedFieldStateService { } private SingleValueArgumentProto toSingleValueArgumentProto(String argName, SingleValueArgumentEntry entry) { - SingleValueProto.Builder singleValueProtoBuilder = SingleValueProto.newBuilder() - .setTs(entry.getTs()); - - if (entry.getVersion() != null) { - singleValueProtoBuilder.setVersion(entry.getVersion()); - } - - KvEntry value = entry.getKvEntryValue(); - if (value != null) { - singleValueProtoBuilder.setHasV(true) - .setValue(ProtoUtils.toKeyValueProto(value)); - } - - return SingleValueArgumentProto.newBuilder() + SingleValueArgumentProto.Builder builder = SingleValueArgumentProto.newBuilder() .setArgName(argName) - .setValue(singleValueProtoBuilder.build()) - .build(); - } + .setValue(KvProtoUtil.toTsValueProto(entry.getTs(), entry.getKvEntryValue())); - private RollingArgumentProto toRollingArgumentProto(String argName, TsRollingArgumentEntry entry) { - RollingArgumentProto.Builder rollingArgumentProtoBuilder = RollingArgumentProto.newBuilder() - .setArgName(argName); + Optional.ofNullable(entry.getVersion()).ifPresent(builder::setVersion); - entry.getTsRecords().forEach((ts, value) -> { - SingleValueProto.Builder singleValueProtoBuilder = SingleValueProto.newBuilder() - .setTs(ts); + return builder.build(); + } - if (value != null) { - singleValueProtoBuilder.setHasV(true) - .setValue(ProtoUtils.toKeyValueProto(value)); - } + private TsValueListProto toRollingArgumentProto(String argName, TsRollingArgumentEntry entry) { + TsValueListProto.Builder builder = TsValueListProto.newBuilder().setKey(argName); - rollingArgumentProtoBuilder.addValues(singleValueProtoBuilder.build()); - }); + entry.getTsRecords().forEach((ts, value) -> builder.addTsValue(KvProtoUtil.toTsValueProto(ts, value))); - return rollingArgumentProtoBuilder.build(); + return builder.build(); } private CalculatedFieldState fromProto(CalculatedFieldStateProto proto) { @@ -153,8 +131,8 @@ public class RocksDBStateService implements CalculatedFieldStateService { CalculatedFieldType type = CalculatedFieldType.valueOf(proto.getType()); CalculatedFieldState state = switch (type) { - case SIMPLE -> new SimpleCalculatedFieldState(proto.getRequiredArgumentsList()); - case SCRIPT -> new ScriptCalculatedFieldState(proto.getRequiredArgumentsList()); + case SIMPLE -> new SimpleCalculatedFieldState(); + case SCRIPT -> new ScriptCalculatedFieldState(); }; proto.getSingleValueArgumentsList().forEach(argProto -> @@ -162,27 +140,25 @@ public class RocksDBStateService implements CalculatedFieldStateService { if (CalculatedFieldType.SCRIPT.equals(type)) { proto.getRollingValueArgumentsList().forEach(argProto -> - state.getArguments().put(argProto.getArgName(), fromRollingArgumentProto(argProto))); + state.getArguments().put(argProto.getKey(), fromRollingArgumentProto(argProto))); } return state; } private SingleValueArgumentEntry fromSingleValueArgumentProto(SingleValueArgumentProto proto) { - SingleValueProto valueProto = proto.getValue(); - BasicKvEntry value = valueProto.getHasV() ? ProtoUtils.fromProto(valueProto.getValue()) : null; - - return new SingleValueArgumentEntry(valueProto.getTs(), value, valueProto.getVersion()); + TsValueProto tsValueProto = proto.getValue(); + long ts = tsValueProto.getTs(); + BasicKvEntry kvEntry = (BasicKvEntry) KvProtoUtil.fromTsValueProto(proto.getArgName(), tsValueProto); + return new SingleValueArgumentEntry(ts, kvEntry, proto.getVersion()); } - private TsRollingArgumentEntry fromRollingArgumentProto(RollingArgumentProto proto) { + private TsRollingArgumentEntry fromRollingArgumentProto(TsValueListProto proto) { TreeMap tsRecords = new TreeMap<>(); - - proto.getValuesList().forEach(singleValueProto -> { - BasicKvEntry value = singleValueProto.getHasV() ? ProtoUtils.fromProto(singleValueProto.getValue()) : null; - tsRecords.put(singleValueProto.getTs(), value); + proto.getTsValueList().forEach(tsValueProto -> { + BasicKvEntry kvEntry = (BasicKvEntry) KvProtoUtil.fromTsValueProto(proto.getKey(), tsValueProto); + tsRecords.put(tsValueProto.getTs(), kvEntry); }); - return new TsRollingArgumentEntry(tsRecords); } diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/tenant/profile/DefaultTenantProfileConfiguration.java b/common/data/src/main/java/org/thingsboard/server/common/data/tenant/profile/DefaultTenantProfileConfiguration.java index 0fd630fead..054adbab1b 100644 --- a/common/data/src/main/java/org/thingsboard/server/common/data/tenant/profile/DefaultTenantProfileConfiguration.java +++ b/common/data/src/main/java/org/thingsboard/server/common/data/tenant/profile/DefaultTenantProfileConfiguration.java @@ -135,11 +135,11 @@ public class DefaultTenantProfileConfiguration implements TenantProfileConfigura private double warnThreshold; - private long maxCalculatedFieldsPerTenant; - private long maxCalculatedFieldsPerEntity; + private long maxCalculatedFields; private long maxArgumentsPerCF; private long maxDataPointsPerRollingArg; private long maxStateSizeInKBytes; + private long maxSingleValueArgumentSizeInKBytes; @Override public long getProfileThreshold(ApiUsageRecordKey key) { @@ -181,7 +181,7 @@ public class DefaultTenantProfileConfiguration implements TenantProfileConfigura case DASHBOARD -> maxDashboards; case RULE_CHAIN -> maxRuleChains; case EDGE -> maxEdges; - case CALCULATED_FIELD -> maxCalculatedFieldsPerTenant; + case CALCULATED_FIELD -> maxCalculatedFields; default -> 0; }; } diff --git a/common/proto/src/main/proto/queue.proto b/common/proto/src/main/proto/queue.proto index 6ee4108f9a..4a2f3710ae 100644 --- a/common/proto/src/main/proto/queue.proto +++ b/common/proto/src/main/proto/queue.proto @@ -790,7 +790,7 @@ message CalculatedFieldTelemetryMsgProto { message CalculatedFieldLinkedTelemetryMsgProto { CalculatedFieldTelemetryMsgProto msg = 1; - repeated CalculatedFieldEntityCtxIdProto links = 2; + repeated CalculatedFieldEntityCtxIdProto links = 2; } message CalculatedFieldEntityCtxIdProto { @@ -808,30 +808,18 @@ message CalculatedFieldIdProto { int64 calculatedFieldIdLSB = 2; } -message SingleValueProto { - int64 ts = 1; - int64 version = 2; - bool has_v = 4; - KeyValueProto value = 5; -} - message SingleValueArgumentProto { string argName = 1; - SingleValueProto value = 2; -} - -message RollingArgumentProto { - string argName = 1; - repeated SingleValueProto values = 2; + TsValueProto value = 2; + int64 version = 3; } message CalculatedFieldStateProto { CalculatedFieldEntityCtxIdProto id = 1; // int32 version = 2; string type = 3; - repeated string requiredArguments = 4; - repeated SingleValueArgumentProto singleValueArguments = 5; - repeated RollingArgumentProto rollingValueArguments = 6; + repeated SingleValueArgumentProto singleValueArguments = 4; + repeated TsValueListProto rollingValueArguments = 5; } //Used to report session state to tb-Service and persist this state in the cache on the tb-Service level. diff --git a/dao/src/main/java/org/thingsboard/server/dao/service/validator/CalculatedFieldDataValidator.java b/dao/src/main/java/org/thingsboard/server/dao/service/validator/CalculatedFieldDataValidator.java index db8997ceb8..b52c2fe9b7 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/service/validator/CalculatedFieldDataValidator.java +++ b/dao/src/main/java/org/thingsboard/server/dao/service/validator/CalculatedFieldDataValidator.java @@ -19,7 +19,6 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import org.thingsboard.server.common.data.EntityType; import org.thingsboard.server.common.data.cf.CalculatedField; -import org.thingsboard.server.common.data.id.EntityId; import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.tenant.profile.DefaultTenantProfileConfiguration; import org.thingsboard.server.dao.cf.CalculatedFieldDao; @@ -39,7 +38,6 @@ public class CalculatedFieldDataValidator extends DataValidator @Override protected void validateCreate(TenantId tenantId, CalculatedField calculatedField) { validateNumberOfEntitiesPerTenant(tenantId, EntityType.CALCULATED_FIELD); - validateNumberOfCFsPerEntity(tenantId, calculatedField.getEntityId()); validateNumberOfArgumentsPerCF(tenantId, calculatedField); } @@ -53,17 +51,11 @@ public class CalculatedFieldDataValidator extends DataValidator return old; } - private void validateNumberOfCFsPerEntity(TenantId tenantId, EntityId entityId) { - long maxCFsPerEntity = apiLimitService.getLimit(tenantId, DefaultTenantProfileConfiguration::getMaxCalculatedFieldsPerEntity); - long countCFByEntityId = calculatedFieldDao.countCFByEntityId(tenantId, entityId); - - if (countCFByEntityId == maxCFsPerEntity) { - throw new DataValidationException("Calculated fields per entity limit reached!"); - } - } - private void validateNumberOfArgumentsPerCF(TenantId tenantId, CalculatedField calculatedField) { long maxArgumentsPerCF = apiLimitService.getLimit(tenantId, DefaultTenantProfileConfiguration::getMaxArgumentsPerCF); + if (maxArgumentsPerCF <= 0) { + return; + } if (calculatedField.getConfiguration().getArguments().size() > maxArgumentsPerCF) { throw new DataValidationException("Calculated field arguments limit reached!"); }