diff --git a/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java b/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java index 396ecd3771..0125c7c07d 100644 --- a/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java +++ b/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java @@ -81,6 +81,7 @@ import org.thingsboard.server.dao.edge.EdgeService; import org.thingsboard.server.dao.entity.EntityService; import org.thingsboard.server.dao.entityview.EntityViewService; import org.thingsboard.server.dao.event.EventService; +import org.thingsboard.server.dao.job.JobService; import org.thingsboard.server.dao.mobile.MobileAppBundleService; import org.thingsboard.server.dao.mobile.MobileAppService; import org.thingsboard.server.dao.nosql.CassandraBufferedRateReadExecutor; @@ -97,7 +98,6 @@ import org.thingsboard.server.dao.relation.RelationService; import org.thingsboard.server.dao.resource.ResourceService; import org.thingsboard.server.dao.rule.RuleChainService; import org.thingsboard.server.dao.rule.RuleNodeStateService; -import org.thingsboard.server.dao.job.JobService; import org.thingsboard.server.dao.tenant.TbTenantProfileCache; import org.thingsboard.server.dao.tenant.TenantProfileService; import org.thingsboard.server.dao.tenant.TenantService; @@ -320,7 +320,6 @@ public class ActorSystemContext { @Getter private TbEntityViewService tbEntityViewService; - @Lazy @Autowired @Getter private TelemetrySubscriptionService tsSubService; @@ -453,12 +452,10 @@ public class ActorSystemContext { @Getter private ApiLimitService apiLimitService; - @Lazy @Autowired(required = false) @Getter private RateLimitService rateLimitService; - @Lazy @Autowired(required = false) @Getter private DebugModeRateLimitsConfig debugModeRateLimitsConfig; @@ -539,17 +536,14 @@ public class ActorSystemContext { @Getter private EntityService entityService; - @Lazy @Autowired(required = false) @Getter private CalculatedFieldProcessingService calculatedFieldProcessingService; - @Lazy @Autowired(required = false) @Getter private CalculatedFieldStateService calculatedFieldStateService; - @Lazy @Autowired(required = false) @Getter private CalculatedFieldQueueService calculatedFieldQueueService; diff --git a/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldEntityMessageProcessor.java b/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldEntityMessageProcessor.java index 5ea79e42a8..ebc4e60709 100644 --- a/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldEntityMessageProcessor.java +++ b/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldEntityMessageProcessor.java @@ -19,7 +19,6 @@ import com.google.common.util.concurrent.ListenableFuture; import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; import org.thingsboard.common.util.DebugModeUtil; -import org.thingsboard.common.util.JacksonUtil; import org.thingsboard.server.actors.ActorSystemContext; import org.thingsboard.server.actors.TbActorCtx; import org.thingsboard.server.actors.shared.AbstractContextAwareMsgProcessor; @@ -299,7 +298,7 @@ public class CalculatedFieldEntityMessageProcessor extends AbstractContextAwareM callback.onSuccess(); } if (DebugModeUtil.isDebugAllAvailable(ctx.getCalculatedField())) { - systemContext.persistCalculatedFieldDebugEvent(tenantId, ctx.getCfId(), entityId, state.getArguments(), tbMsgId, tbMsgType, JacksonUtil.writeValueAsString(calculationResult.getResult()), null); + systemContext.persistCalculatedFieldDebugEvent(tenantId, ctx.getCfId(), entityId, state.getArguments(), tbMsgId, tbMsgType, calculationResult.getResult().toString(), null); } } } else { diff --git a/application/src/main/java/org/thingsboard/server/service/cf/AbstractCalculatedFieldStateService.java b/application/src/main/java/org/thingsboard/server/service/cf/AbstractCalculatedFieldStateService.java index f1cb25c6fa..70b41f069e 100644 --- a/application/src/main/java/org/thingsboard/server/service/cf/AbstractCalculatedFieldStateService.java +++ b/application/src/main/java/org/thingsboard/server/service/cf/AbstractCalculatedFieldStateService.java @@ -16,6 +16,7 @@ package org.thingsboard.server.service.cf; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.annotation.Lazy; import org.thingsboard.server.actors.ActorSystemContext; import org.thingsboard.server.actors.calculatedField.CalculatedFieldStateRestoreMsg; import org.thingsboard.server.common.msg.queue.TbCallback; @@ -39,6 +40,7 @@ import static org.thingsboard.server.utils.CalculatedFieldUtils.toProto; public abstract class AbstractCalculatedFieldStateService implements CalculatedFieldStateService { @Autowired + @Lazy private ActorSystemContext actorSystemContext; protected QueueStateService, TbProtoQueueMsg> stateService; diff --git a/application/src/main/java/org/thingsboard/server/service/cf/DefaultCalculatedFieldCache.java b/application/src/main/java/org/thingsboard/server/service/cf/DefaultCalculatedFieldCache.java index 78042ffc0e..95fa7aebb1 100644 --- a/application/src/main/java/org/thingsboard/server/service/cf/DefaultCalculatedFieldCache.java +++ b/application/src/main/java/org/thingsboard/server/service/cf/DefaultCalculatedFieldCache.java @@ -19,7 +19,9 @@ import lombok.Getter; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Value; +import org.springframework.context.annotation.Lazy; import org.springframework.stereotype.Service; +import org.springframework.util.ConcurrentReferenceHashMap; import org.thingsboard.script.api.tbel.TbelInvokeService; import org.thingsboard.server.actors.ActorSystemContext; import org.thingsboard.server.common.data.cf.CalculatedField; @@ -49,14 +51,13 @@ import java.util.concurrent.locks.ReentrantLock; @RequiredArgsConstructor public class DefaultCalculatedFieldCache implements CalculatedFieldCache { - private static final Integer UNKNOWN_PARTITION = -1; - - private final Lock calculatedFieldFetchLock = new ReentrantLock(); + private final ConcurrentReferenceHashMap calculatedFieldFetchLocks = new ConcurrentReferenceHashMap<>(); private final CalculatedFieldService calculatedFieldService; private final TbelInvokeService tbelInvokeService; - private final ActorSystemContext actorSystemContext; private final ApiLimitService apiLimitService; + @Lazy + private final ActorSystemContext actorSystemContext; private final ConcurrentMap calculatedFields = new ConcurrentHashMap<>(); private final ConcurrentMap> entityIdCalculatedFields = new ConcurrentHashMap<>(); @@ -99,19 +100,20 @@ public class DefaultCalculatedFieldCache implements CalculatedFieldCache { @Override public List getCalculatedFieldsByEntityId(EntityId entityId) { - return entityIdCalculatedFields.getOrDefault(entityId, new CopyOnWriteArrayList<>()); + return entityIdCalculatedFields.getOrDefault(entityId, Collections.emptyList()); } @Override public List getCalculatedFieldLinksByEntityId(EntityId entityId) { - return entityIdCalculatedFieldLinks.getOrDefault(entityId, new CopyOnWriteArrayList<>()); + return entityIdCalculatedFieldLinks.getOrDefault(entityId, Collections.emptyList()); } @Override public CalculatedFieldCtx getCalculatedFieldCtx(CalculatedFieldId calculatedFieldId) { CalculatedFieldCtx ctx = calculatedFieldsCtx.get(calculatedFieldId); if (ctx == null) { - calculatedFieldFetchLock.lock(); + Lock lock = getFetchLock(calculatedFieldId); + lock.lock(); try { ctx = calculatedFieldsCtx.get(calculatedFieldId); if (ctx == null) { @@ -123,7 +125,7 @@ public class DefaultCalculatedFieldCache implements CalculatedFieldCache { } } } finally { - calculatedFieldFetchLock.unlock(); + lock.unlock(); } } log.trace("[{}] Found calculated field ctx in cache: {}", calculatedFieldId, ctx); @@ -142,7 +144,8 @@ public class DefaultCalculatedFieldCache implements CalculatedFieldCache { @Override public void addCalculatedField(TenantId tenantId, CalculatedFieldId calculatedFieldId) { - calculatedFieldFetchLock.lock(); + Lock lock = getFetchLock(calculatedFieldId); + lock.lock(); try { CalculatedField calculatedField = calculatedFieldService.findById(tenantId, calculatedFieldId); if (calculatedField == null) { @@ -164,7 +167,7 @@ public class DefaultCalculatedFieldCache implements CalculatedFieldCache { .add(configuration.buildCalculatedFieldLink(tenantId, referencedEntityId, calculatedFieldId)); }); } finally { - calculatedFieldFetchLock.unlock(); + lock.unlock(); } } @@ -188,4 +191,8 @@ public class DefaultCalculatedFieldCache implements CalculatedFieldCache { log.debug("[{}] evict calculated field links from cached links by entity id: {}", calculatedFieldId, oldCalculatedField); } + private Lock getFetchLock(CalculatedFieldId id) { + return calculatedFieldFetchLocks.computeIfAbsent(id, __ -> new ReentrantLock()); + } + } diff --git a/application/src/main/java/org/thingsboard/server/service/cf/DefaultCalculatedFieldProcessingService.java b/application/src/main/java/org/thingsboard/server/service/cf/DefaultCalculatedFieldProcessingService.java index e9a6cb09aa..f2a6916751 100644 --- a/application/src/main/java/org/thingsboard/server/service/cf/DefaultCalculatedFieldProcessingService.java +++ b/application/src/main/java/org/thingsboard/server/service/cf/DefaultCalculatedFieldProcessingService.java @@ -25,7 +25,6 @@ import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.math.NumberUtils; import org.springframework.stereotype.Service; -import org.thingsboard.common.util.JacksonUtil; import org.thingsboard.common.util.ThingsBoardExecutors; import org.thingsboard.server.actors.calculatedField.CalculatedFieldTelemetryMsg; import org.thingsboard.server.actors.calculatedField.MultipleTbCallback; @@ -66,7 +65,6 @@ import org.thingsboard.server.gen.transport.TransportProtos.ToCalculatedFieldNot import org.thingsboard.server.queue.TbQueueCallback; import org.thingsboard.server.queue.TbQueueMsgMetadata; import org.thingsboard.server.queue.discovery.PartitionService; -import org.thingsboard.server.queue.discovery.QueueKey; import org.thingsboard.server.queue.util.TbRuleEngineComponent; import org.thingsboard.server.service.cf.ctx.CalculatedFieldEntityCtxId; import org.thingsboard.server.service.cf.ctx.state.ArgumentEntry; @@ -171,7 +169,7 @@ public class DefaultCalculatedFieldProcessingService implements CalculatedFieldP OutputType type = calculatedFieldResult.getType(); TbMsgType msgType = OutputType.ATTRIBUTES.equals(type) ? TbMsgType.POST_ATTRIBUTES_REQUEST : TbMsgType.POST_TELEMETRY_REQUEST; TbMsgMetaData md = OutputType.ATTRIBUTES.equals(type) ? new TbMsgMetaData(Map.of(SCOPE, calculatedFieldResult.getScope().name())) : TbMsgMetaData.EMPTY; - TbMsg msg = TbMsg.newMsg().type(msgType).originator(entityId).previousCalculatedFieldIds(cfIds).metaData(md).data(JacksonUtil.writeValueAsString(calculatedFieldResult.getResult())).build(); + TbMsg msg = TbMsg.newMsg().type(msgType).originator(entityId).previousCalculatedFieldIds(cfIds).metaData(md).data(calculatedFieldResult.getResult().toString()).build(); clusterService.pushMsgToRuleEngine(tenantId, entityId, msg, new TbQueueCallback() { @Override public void onSuccess(TbQueueMsgMetadata metadata) { diff --git a/application/src/main/java/org/thingsboard/server/service/cf/DefaultCalculatedFieldQueueService.java b/application/src/main/java/org/thingsboard/server/service/cf/DefaultCalculatedFieldQueueService.java index 3dc2b457d7..fc5d75be56 100644 --- a/application/src/main/java/org/thingsboard/server/service/cf/DefaultCalculatedFieldQueueService.java +++ b/application/src/main/java/org/thingsboard/server/service/cf/DefaultCalculatedFieldQueueService.java @@ -87,7 +87,10 @@ public class DefaultCalculatedFieldQueueService implements CalculatedFieldQueueS public void pushRequestToQueue(TimeseriesSaveRequest request, TimeseriesSaveResult result, FutureCallback callback) { var tenantId = request.getTenantId(); var entityId = request.getEntityId(); - checkEntityAndPushToQueue(tenantId, entityId, cf -> cf.matches(request.getEntries()), cf -> cf.linkMatches(entityId, request.getEntries()), + var entries = request.getEntries(); + checkEntityAndPushToQueue(tenantId, entityId, + cf -> cf.matches(entries), + cf -> cf.linkMatches(entityId, entries), () -> toCalculatedFieldTelemetryMsgProto(request, result), callback); } @@ -100,7 +103,11 @@ public class DefaultCalculatedFieldQueueService implements CalculatedFieldQueueS public void pushRequestToQueue(AttributesSaveRequest request, AttributesSaveResult result, FutureCallback callback) { var tenantId = request.getTenantId(); var entityId = request.getEntityId(); - checkEntityAndPushToQueue(tenantId, entityId, cf -> cf.matches(request.getEntries(), request.getScope()), cf -> cf.linkMatches(entityId, request.getEntries(), request.getScope()), + var entries = request.getEntries(); + var scope = request.getScope(); + checkEntityAndPushToQueue(tenantId, entityId, + cf -> cf.matches(entries, scope), + cf -> cf.linkMatches(entityId, entries, scope), () -> toCalculatedFieldTelemetryMsgProto(request, result), callback); } @@ -113,7 +120,10 @@ public class DefaultCalculatedFieldQueueService implements CalculatedFieldQueueS public void pushRequestToQueue(AttributesDeleteRequest request, List result, FutureCallback callback) { var tenantId = request.getTenantId(); var entityId = request.getEntityId(); - checkEntityAndPushToQueue(tenantId, entityId, cf -> cf.matchesKeys(result, request.getScope()), cf -> cf.linkMatchesAttrKeys(entityId, result, request.getScope()), + var scope = request.getScope(); + checkEntityAndPushToQueue(tenantId, entityId, + cf -> cf.matchesKeys(result, scope), + cf -> cf.linkMatchesAttrKeys(entityId, result, scope), () -> toCalculatedFieldTelemetryMsgProto(request, result), callback); } @@ -121,8 +131,9 @@ public class DefaultCalculatedFieldQueueService implements CalculatedFieldQueueS public void pushRequestToQueue(TimeseriesDeleteRequest request, List result, FutureCallback callback) { var tenantId = request.getTenantId(); var entityId = request.getEntityId(); - - checkEntityAndPushToQueue(tenantId, entityId, cf -> cf.matchesKeys(result), cf -> cf.linkMatchesTsKeys(entityId, result), + checkEntityAndPushToQueue(tenantId, entityId, + cf -> cf.matchesKeys(result), + cf -> cf.linkMatchesTsKeys(entityId, result), () -> toCalculatedFieldTelemetryMsgProto(request, result), callback); } @@ -134,7 +145,8 @@ public class DefaultCalculatedFieldQueueService implements CalculatedFieldQueueS } boolean send = checkEntityForCalculatedFields(tenantId, entityId, mainEntityFilter, linkedEntityFilter); if (send) { - clusterService.pushMsgToCalculatedFields(tenantId, entityId, msg.get(), wrap(callback)); + ToCalculatedFieldMsg calculatedFieldMsg = msg.get(); + clusterService.pushMsgToCalculatedFields(tenantId, entityId, calculatedFieldMsg, wrap(callback)); } else { if (callback != null) { callback.onSuccess(null); @@ -143,20 +155,35 @@ public class DefaultCalculatedFieldQueueService implements CalculatedFieldQueueS } private boolean checkEntityForCalculatedFields(TenantId tenantId, EntityId entityId, Predicate filter, Predicate linkedEntityFilter) { - boolean send = false; - if (supportedReferencedEntities.contains(entityId.getEntityType())) { - send = calculatedFieldCache.getCalculatedFieldCtxsByEntityId(entityId).stream().anyMatch(filter); - if (!send) { - send = calculatedFieldCache.getCalculatedFieldCtxsByEntityId(getProfileId(tenantId, entityId)).stream().anyMatch(filter); + if (!supportedReferencedEntities.contains(entityId.getEntityType())) { + return false; + } + List entityCfs = calculatedFieldCache.getCalculatedFieldCtxsByEntityId(entityId); + for (CalculatedFieldCtx ctx : entityCfs) { + if (filter.test(ctx)) { + return true; } - if (!send) { - send = calculatedFieldCache.getCalculatedFieldLinksByEntityId(entityId).stream() - .map(CalculatedFieldLink::getCalculatedFieldId) - .map(calculatedFieldCache::getCalculatedFieldCtx) - .anyMatch(linkedEntityFilter); + } + + EntityId profileId = getProfileId(tenantId, entityId); + if (profileId != null) { + List profileCfs = calculatedFieldCache.getCalculatedFieldCtxsByEntityId(profileId); + for (CalculatedFieldCtx ctx : profileCfs) { + if (filter.test(ctx)) { + return true; + } + } + } + + List links = calculatedFieldCache.getCalculatedFieldLinksByEntityId(entityId); + for (CalculatedFieldLink link : links) { + CalculatedFieldCtx ctx = calculatedFieldCache.getCalculatedFieldCtx(link.getCalculatedFieldId()); + if (ctx != null && linkedEntityFilter.test(ctx)) { + return true; } } - return send; + + return false; } private EntityId getProfileId(TenantId tenantId, EntityId entityId) { @@ -168,8 +195,6 @@ public class DefaultCalculatedFieldQueueService implements CalculatedFieldQueueS } private ToCalculatedFieldMsg toCalculatedFieldTelemetryMsgProto(TimeseriesSaveRequest request, TimeseriesSaveResult result) { - ToCalculatedFieldMsg.Builder msg = ToCalculatedFieldMsg.newBuilder(); - CalculatedFieldTelemetryMsgProto.Builder telemetryMsg = buildTelemetryMsgProto(request.getTenantId(), request.getEntityId(), request.getPreviousCalculatedFieldIds(), request.getTbMsgId(), request.getTbMsgType()); List entries = request.getEntries(); @@ -183,8 +208,7 @@ public class DefaultCalculatedFieldQueueService implements CalculatedFieldQueueS telemetryMsg.addTsData(tsProtoBuilder.build()); } - msg.setTelemetryMsg(telemetryMsg.build()); - return msg.build(); + return ToCalculatedFieldMsg.newBuilder().setTelemetryMsg(telemetryMsg).build(); } private ToCalculatedFieldMsg toCalculatedFieldTelemetryMsgProto(AttributesSaveRequest request, AttributesSaveResult result) { @@ -198,7 +222,9 @@ public class DefaultCalculatedFieldQueueService implements CalculatedFieldQueueS for (int i = 0; i < entries.size(); i++) { AttributeValueProto.Builder attrProtoBuilder = ProtoUtils.toProto(entries.get(i)).toBuilder(); - attrProtoBuilder.setVersion(versions.get(i)); + if (versions != null && !versions.isEmpty() && versions.get(i) != null) { + attrProtoBuilder.setVersion(versions.get(i)); + } telemetryMsg.addAttrData(attrProtoBuilder.build()); } msg.setTelemetryMsg(telemetryMsg.build()); diff --git a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/CalculatedFieldCtx.java b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/CalculatedFieldCtx.java index dff715a6e8..2e3321eece 100644 --- a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/CalculatedFieldCtx.java +++ b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/CalculatedFieldCtx.java @@ -181,26 +181,37 @@ public class CalculatedFieldCtx { } private boolean matchesAttributes(Map argMap, List values, AttributeScope scope) { + if (argMap.isEmpty() || values.isEmpty()) { + return false; + } + for (AttributeKvEntry attrKv : values) { - ReferencedEntityKey attrKey = new ReferencedEntityKey(attrKv.getKey(), ArgumentType.ATTRIBUTE, scope); - if (argMap.containsKey(attrKey)) { + if (argMap.containsKey(new ReferencedEntityKey(attrKv.getKey(), ArgumentType.ATTRIBUTE, scope))) { return true; } } + return false; } private boolean matchesTimeSeries(Map argMap, List values) { + if (argMap.isEmpty() || values.isEmpty()) { + return false; + } + for (TsKvEntry tsKv : values) { + ReferencedEntityKey latestKey = new ReferencedEntityKey(tsKv.getKey(), ArgumentType.TS_LATEST, null); if (argMap.containsKey(latestKey)) { return true; } + ReferencedEntityKey rollingKey = new ReferencedEntityKey(tsKv.getKey(), ArgumentType.TS_ROLLING, null); if (argMap.containsKey(rollingKey)) { return true; } } + return false; } @@ -213,26 +224,38 @@ public class CalculatedFieldCtx { } private boolean matchesAttributesKeys(Map argMap, List keys, AttributeScope scope) { + if (argMap.isEmpty() || keys.isEmpty()) { + return false; + } + for (String key : keys) { ReferencedEntityKey attrKey = new ReferencedEntityKey(key, ArgumentType.ATTRIBUTE, scope); if (argMap.containsKey(attrKey)) { return true; } } + return false; } private boolean matchesTimeSeriesKeys(Map argMap, List keys) { + if (argMap.isEmpty() || keys.isEmpty()) { + return false; + } + for (String key : keys) { + ReferencedEntityKey latestKey = new ReferencedEntityKey(key, ArgumentType.TS_LATEST, null); if (argMap.containsKey(latestKey)) { return true; } + ReferencedEntityKey rollingKey = new ReferencedEntityKey(key, ArgumentType.TS_ROLLING, null); if (argMap.containsKey(rollingKey)) { return true; } } + return false; } diff --git a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/KafkaCalculatedFieldStateService.java b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/KafkaCalculatedFieldStateService.java index ded7adbc34..2b52892744 100644 --- a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/KafkaCalculatedFieldStateService.java +++ b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/KafkaCalculatedFieldStateService.java @@ -114,18 +114,14 @@ public class KafkaCalculatedFieldStateService extends AbstractCalculatedFieldSta stateProducer.send(tpi, stateId.toKey(), msg, new TbQueueCallback() { @Override public void onSuccess(TbQueueMsgMetadata metadata) { - if (callback != null) { - callback.onSuccess(); - } } @Override public void onFailure(Throwable t) { - if (callback != null) { - callback.onFailure(t); - } + log.error("Failed to send state message: {}", stateId, t); } }); + callback.onSuccess(); } @Override diff --git a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/SimpleCalculatedFieldState.java b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/SimpleCalculatedFieldState.java index 111624882b..577ff80219 100644 --- a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/SimpleCalculatedFieldState.java +++ b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/SimpleCalculatedFieldState.java @@ -58,7 +58,13 @@ public class SimpleCalculatedFieldState extends BaseCalculatedFieldState { for (Map.Entry entry : this.arguments.entrySet()) { try { BasicKvEntry kvEntry = ((SingleValueArgumentEntry) entry.getValue()).getKvEntryValue(); - expr.setVariable(entry.getKey(), Double.parseDouble(kvEntry.getValueAsString())); + double value = switch (kvEntry.getDataType()) { + case LONG -> kvEntry.getLongValue().map(Long::doubleValue).orElseThrow(); + case DOUBLE -> kvEntry.getDoubleValue().orElseThrow(); + case BOOLEAN -> kvEntry.getBooleanValue().map(b -> b ? 1.0 : 0.0).orElseThrow(); + case STRING, JSON -> Double.parseDouble(kvEntry.getValueAsString()); + }; + expr.setVariable(entry.getKey(), value); } catch (NumberFormatException e) { throw new IllegalArgumentException("Argument '" + entry.getKey() + "' is not a number."); } @@ -85,7 +91,13 @@ public class SimpleCalculatedFieldState extends BaseCalculatedFieldState { private JsonNode createResultJson(boolean useLatestTs, String outputName, Object result) { ObjectNode valuesNode = JacksonUtil.newObjectNode(); - valuesNode.set(outputName, JacksonUtil.valueToTree(result)); + if (result instanceof Double doubleValue) { + valuesNode.put(outputName, doubleValue); + } else if (result instanceof Integer integerValue) { + valuesNode.put(outputName, integerValue); + } else { + valuesNode.set(outputName, JacksonUtil.valueToTree(result)); + } long latestTs = getLatestTimestamp(); if (useLatestTs && latestTs != -1) { diff --git a/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbClusterService.java b/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbClusterService.java index a14ba10bfb..9d20b4d74e 100644 --- a/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbClusterService.java +++ b/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbClusterService.java @@ -130,11 +130,9 @@ public class DefaultTbClusterService implements TbClusterService { private final AtomicInteger toEdgeNfs = new AtomicInteger(0); @Autowired - @Lazy private PartitionService partitionService; @Autowired - @Lazy private TbQueueProducerProvider producerProvider; @Autowired diff --git a/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionService.java b/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionService.java index 0ff2d42f15..69b41addf9 100644 --- a/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionService.java +++ b/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionService.java @@ -28,7 +28,7 @@ import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Lazy; import org.springframework.stereotype.Service; import org.thingsboard.common.util.DonAsynchron; -import org.thingsboard.common.util.ThingsBoardThreadFactory; +import org.thingsboard.common.util.ThingsBoardExecutors; import org.thingsboard.rule.engine.api.AttributesDeleteRequest; import org.thingsboard.rule.engine.api.AttributesSaveRequest; import org.thingsboard.rule.engine.api.DeviceStateManager; @@ -69,7 +69,6 @@ import java.util.Map; import java.util.Objects; import java.util.Optional; import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; import java.util.function.Consumer; import static java.util.Comparator.comparing; @@ -96,6 +95,8 @@ public class DefaultTelemetrySubscriptionService extends AbstractSubscriptionSer @Value("${sql.ts.value_no_xss_validation:false}") private boolean valueNoXssValidation; + @Value("${sql.ts.callback_thread_pool_size:12}") + private int callbackThreadPoolSize; public DefaultTelemetrySubscriptionService(AttributesService attrService, TimeseriesService tsService, @@ -116,7 +117,7 @@ public class DefaultTelemetrySubscriptionService extends AbstractSubscriptionSer @PostConstruct public void initExecutor() { super.initExecutor(); - tsCallBackExecutor = Executors.newSingleThreadExecutor(ThingsBoardThreadFactory.forName("ts-service-ts-callback")); + tsCallBackExecutor = ThingsBoardExecutors.newWorkStealingPool(callbackThreadPoolSize, "ts-service-ts-callback"); } @Override diff --git a/application/src/main/resources/thingsboard.yml b/application/src/main/resources/thingsboard.yml index 09820a81cd..c3d28f9fce 100644 --- a/application/src/main/resources/thingsboard.yml +++ b/application/src/main/resources/thingsboard.yml @@ -346,6 +346,7 @@ sql: stats_print_interval_ms: "${SQL_TS_BATCH_STATS_PRINT_MS:10000}" # Interval in milliseconds for printing timeseries insert statistic batch_threads: "${SQL_TS_BATCH_THREADS:3}" # batch thread count has to be a prime number like 3 or 5 to gain perfect hash distribution value_no_xss_validation: "${SQL_TS_VALUE_NO_XSS_VALIDATION:false}" # If true telemetry values will be checked for XSS vulnerability + callback_thread_pool_size: "${SQL_TS_CALLBACK_THREAD_POOL_SIZE:12}" # Thread pool size for telemetry callback executor ts_latest: batch_size: "${SQL_TS_LATEST_BATCH_SIZE:1000}" # Batch size for persisting latest telemetry updates batch_max_delay: "${SQL_TS_LATEST_BATCH_MAX_DELAY_MS:50}" # Maximum timeout for latest telemetry entries queue polling. The value set in milliseconds diff --git a/application/src/test/java/org/thingsboard/server/service/cf/ctx/state/SimpleCalculatedFieldStateTest.java b/application/src/test/java/org/thingsboard/server/service/cf/ctx/state/SimpleCalculatedFieldStateTest.java index f0059e4f6f..c1616a85db 100644 --- a/application/src/test/java/org/thingsboard/server/service/cf/ctx/state/SimpleCalculatedFieldStateTest.java +++ b/application/src/test/java/org/thingsboard/server/service/cf/ctx/state/SimpleCalculatedFieldStateTest.java @@ -34,6 +34,7 @@ import org.thingsboard.server.common.data.cf.configuration.SimpleCalculatedField import org.thingsboard.server.common.data.id.AssetId; import org.thingsboard.server.common.data.id.DeviceId; import org.thingsboard.server.common.data.id.TenantId; +import org.thingsboard.server.common.data.kv.BooleanDataEntry; import org.thingsboard.server.common.data.kv.DoubleDataEntry; import org.thingsboard.server.common.data.kv.LongDataEntry; import org.thingsboard.server.common.data.kv.StringDataEntry; @@ -143,7 +144,7 @@ public class SimpleCalculatedFieldStateTest { } @Test - void testPerformCalculationWhenPassedNotNumber() { + void testPerformCalculationWhenPassedString() { state.arguments = new HashMap<>(Map.of( "key1", key1ArgEntry, "key2", new SingleValueArgumentEntry(System.currentTimeMillis() - 9, new StringDataEntry("key2", "string"), 124L), @@ -155,6 +156,23 @@ public class SimpleCalculatedFieldStateTest { .hasMessage("Argument 'key2' is not a number."); } + @Test + void testPerformCalculationWhenPassedBoolean() throws ExecutionException, InterruptedException { + state.arguments = new HashMap<>(Map.of( + "key1", key1ArgEntry, + "key2", new SingleValueArgumentEntry(System.currentTimeMillis() - 9, new BooleanDataEntry("key2", true), 124L),// true is parsed as 1 + "key3", key3ArgEntry + )); + + CalculatedFieldResult result = state.performCalculation(ctx).get(); + + assertThat(result).isNotNull(); + Output output = getCalculatedFieldConfig().getOutput(); + assertThat(result.getType()).isEqualTo(output.getType()); + assertThat(result.getScope()).isEqualTo(output.getScope()); + assertThat(result.getResult()).isEqualTo(JacksonUtil.valueToTree(Map.of("output", 35))); + } + @Test void testPerformCalculationWhenDecimalsByDefault() throws ExecutionException, InterruptedException { state.arguments = new HashMap<>(Map.of( diff --git a/dao/src/main/java/org/thingsboard/server/dao/sql/attributes/AttributeKvInsertRepository.java b/dao/src/main/java/org/thingsboard/server/dao/sql/attributes/AttributeKvInsertRepository.java index 48a1eb6d5a..801c436d2f 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/sql/attributes/AttributeKvInsertRepository.java +++ b/dao/src/main/java/org/thingsboard/server/dao/sql/attributes/AttributeKvInsertRepository.java @@ -16,10 +16,8 @@ package org.thingsboard.server.dao.sql.attributes; import org.springframework.stereotype.Repository; -import org.springframework.transaction.annotation.Transactional; import org.thingsboard.server.dao.AbstractVersionedInsertRepository; import org.thingsboard.server.dao.model.sql.AttributeKvEntity; -import org.thingsboard.server.dao.util.SqlDao; import java.sql.PreparedStatement; import java.sql.SQLException; @@ -27,8 +25,6 @@ import java.sql.Types; import java.util.List; @Repository -@Transactional -@SqlDao public class AttributeKvInsertRepository extends AbstractVersionedInsertRepository { private static final String BATCH_UPDATE = "UPDATE attribute_kv SET str_v = ?, long_v = ?, dbl_v = ?, bool_v = ?, json_v = cast(? AS json), last_update_ts = ?, version = nextval('attribute_kv_version_seq') " + diff --git a/dao/src/main/java/org/thingsboard/server/dao/sqlts/dictionary/JpaKeyDictionaryDao.java b/dao/src/main/java/org/thingsboard/server/dao/sqlts/dictionary/JpaKeyDictionaryDao.java index 53a824f026..c14c069f23 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/sqlts/dictionary/JpaKeyDictionaryDao.java +++ b/dao/src/main/java/org/thingsboard/server/dao/sqlts/dictionary/JpaKeyDictionaryDao.java @@ -29,7 +29,6 @@ import org.thingsboard.server.dao.dictionary.KeyDictionaryDao; import org.thingsboard.server.dao.model.sqlts.dictionary.KeyDictionaryCompositeKey; import org.thingsboard.server.dao.model.sqlts.dictionary.KeyDictionaryEntry; import org.thingsboard.server.dao.sql.JpaAbstractDaoListeningExecutorService; -import org.thingsboard.server.dao.util.SqlDao; import java.util.Optional; import java.util.concurrent.ConcurrentHashMap; @@ -38,7 +37,6 @@ import java.util.concurrent.locks.ReentrantLock; @Component @Slf4j -@SqlDao @RequiredArgsConstructor public class JpaKeyDictionaryDao extends JpaAbstractDaoListeningExecutorService implements KeyDictionaryDao { diff --git a/dao/src/main/java/org/thingsboard/server/dao/sqlts/insert/latest/sql/SqlLatestInsertTsRepository.java b/dao/src/main/java/org/thingsboard/server/dao/sqlts/insert/latest/sql/SqlLatestInsertTsRepository.java index b40fa520f2..c9d16f00ec 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/sqlts/insert/latest/sql/SqlLatestInsertTsRepository.java +++ b/dao/src/main/java/org/thingsboard/server/dao/sqlts/insert/latest/sql/SqlLatestInsertTsRepository.java @@ -18,11 +18,9 @@ package org.thingsboard.server.dao.sqlts.insert.latest.sql; import jakarta.annotation.PostConstruct; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Repository; -import org.springframework.transaction.annotation.Transactional; import org.thingsboard.server.dao.AbstractVersionedInsertRepository; import org.thingsboard.server.dao.model.sqlts.latest.TsKvLatestEntity; import org.thingsboard.server.dao.sqlts.insert.latest.InsertLatestTsRepository; -import org.thingsboard.server.dao.util.SqlDao; import org.thingsboard.server.dao.util.SqlTsLatestAnyDao; import java.sql.PreparedStatement; @@ -32,8 +30,6 @@ import java.util.List; @SqlTsLatestAnyDao @Repository -@Transactional -@SqlDao public class SqlLatestInsertTsRepository extends AbstractVersionedInsertRepository implements InsertLatestTsRepository { @Value("${sql.ts_latest.update_by_latest_ts:true}") diff --git a/dao/src/main/java/org/thingsboard/server/dao/util/KvUtils.java b/dao/src/main/java/org/thingsboard/server/dao/util/KvUtils.java index eeb88959fa..8b95ddcb57 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/util/KvUtils.java +++ b/dao/src/main/java/org/thingsboard/server/dao/util/KvUtils.java @@ -33,9 +33,8 @@ public class KvUtils { static { validatedKeys = Caffeine.newBuilder() - .weakKeys() .expireAfterAccess(24, TimeUnit.HOURS) - .maximumSize(100000).build(); + .maximumSize(50000).build(); } public static void validate(List tsKvEntries, boolean valueNoXssValidation) { @@ -57,11 +56,13 @@ public class KvUtils { throw new DataValidationException("Validation error: key length must be equal or less than 255"); } - if (validatedKeys.getIfPresent(key) == null) { - if (!NoXssValidator.isValid(key)) { - throw new DataValidationException("Validation error: key is malformed"); - } - validatedKeys.put(key, Boolean.TRUE); + Boolean isValid = validatedKeys.asMap().get(key); + if (isValid == null) { + isValid = NoXssValidator.isValid(key); + validatedKeys.put(key, isValid); + } + if (!isValid) { + throw new DataValidationException("Validation error: key is malformed"); } if (valueNoXssValidation) { diff --git a/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/AttributesSaveRequest.java b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/AttributesSaveRequest.java index c3095836bd..19c1956463 100644 --- a/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/AttributesSaveRequest.java +++ b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/AttributesSaveRequest.java @@ -57,6 +57,7 @@ public class AttributesSaveRequest implements CalculatedFieldSystemAwareRequest public static final Strategy PROCESS_ALL = new Strategy(true, true, true); public static final Strategy WS_ONLY = new Strategy(false, true, false); public static final Strategy SKIP_ALL = new Strategy(false, false, false); + public static final Strategy CF_ONLY = new Strategy(false, false, true); } diff --git a/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TimeseriesSaveRequest.java b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TimeseriesSaveRequest.java index c402a0c984..3bacf8d8c3 100644 --- a/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TimeseriesSaveRequest.java +++ b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TimeseriesSaveRequest.java @@ -56,6 +56,7 @@ public class TimeseriesSaveRequest implements CalculatedFieldSystemAwareRequest public static final Strategy WS_ONLY = new Strategy(false, false, true, false); public static final Strategy LATEST_AND_WS = new Strategy(false, true, true, false); public static final Strategy SKIP_ALL = new Strategy(false, false, false, false); + public static final Strategy CF_ONLY = new Strategy(false, false, false, true); } diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/TbCalculatedFieldsNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/TbCalculatedFieldsNode.java index b2cce52c87..4e319500d1 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/TbCalculatedFieldsNode.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/TbCalculatedFieldsNode.java @@ -92,13 +92,14 @@ public class TbCalculatedFieldsNode implements TbNode { .customerId(msg.getCustomerId()) .entityId(msg.getOriginator()) .entries(tsKvEntryList) + .strategy(TimeseriesSaveRequest.Strategy.CF_ONLY) .previousCalculatedFieldIds(msg.getPreviousCalculatedFieldIds()) .tbMsgId(msg.getId()) .tbMsgType(msg.getInternalType()) .callback(new TelemetryNodeCallback(ctx, msg)) .build(); - ctx.getCalculatedFieldQueueService().pushRequestToQueue(timeseriesSaveRequest, timeseriesSaveRequest.getCallback()); + ctx.getTelemetryService().saveTimeseries(timeseriesSaveRequest); } private void processPostAttributesRequest(TbContext ctx, TbMsg msg) { @@ -114,12 +115,13 @@ public class TbCalculatedFieldsNode implements TbNode { .entityId(msg.getOriginator()) .scope(AttributeScope.valueOf(msg.getMetaData().getValue(SCOPE))) .entries(newAttributes) + .strategy(AttributesSaveRequest.Strategy.CF_ONLY) .previousCalculatedFieldIds(msg.getPreviousCalculatedFieldIds()) .tbMsgId(msg.getId()) .tbMsgType(msg.getInternalType()) .callback(new TelemetryNodeCallback(ctx, msg)) .build(); - ctx.getCalculatedFieldQueueService().pushRequestToQueue(attributesSaveRequest, attributesSaveRequest.getCallback()); + ctx.getTelemetryService().saveAttributes(attributesSaveRequest); } }