diff --git a/application/src/main/java/org/thingsboard/server/service/cf/CalculatedFieldExecutionService.java b/application/src/main/java/org/thingsboard/server/service/cf/CalculatedFieldExecutionService.java index ee9b6d115c..e4b0a7ca1e 100644 --- a/application/src/main/java/org/thingsboard/server/service/cf/CalculatedFieldExecutionService.java +++ b/application/src/main/java/org/thingsboard/server/service/cf/CalculatedFieldExecutionService.java @@ -15,20 +15,15 @@ */ package org.thingsboard.server.service.cf; -import org.thingsboard.server.common.data.id.CalculatedFieldId; -import org.thingsboard.server.common.data.id.EntityId; -import org.thingsboard.server.common.data.id.TenantId; -import org.thingsboard.server.common.data.kv.KvEntry; import org.thingsboard.server.common.msg.queue.TbCallback; import org.thingsboard.server.gen.transport.TransportProtos; - -import java.util.List; +import org.thingsboard.server.service.cf.telemetry.CalculatedFieldTelemetryUpdateRequest; public interface CalculatedFieldExecutionService { void onCalculatedFieldMsg(TransportProtos.CalculatedFieldMsgProto proto, TbCallback callback); - void onTelemetryUpdate(TenantId tenantId, EntityId entityId, List calculatedFieldIds, List telemetry); + void onTelemetryUpdate(CalculatedFieldTelemetryUpdateRequest calculatedFieldTelemetryUpdateRequest); void onCalculatedFieldStateMsg(TransportProtos.CalculatedFieldStateMsgProto proto, TbCallback callback); diff --git a/application/src/main/java/org/thingsboard/server/service/cf/CalculatedFieldResult.java b/application/src/main/java/org/thingsboard/server/service/cf/CalculatedFieldResult.java index e8ea318bf6..52e0a0151b 100644 --- a/application/src/main/java/org/thingsboard/server/service/cf/CalculatedFieldResult.java +++ b/application/src/main/java/org/thingsboard/server/service/cf/CalculatedFieldResult.java @@ -17,17 +17,18 @@ package org.thingsboard.server.service.cf; import lombok.Data; import org.thingsboard.server.common.data.AttributeScope; +import org.thingsboard.server.common.data.cf.configuration.OutputType; import java.util.Map; @Data public final class CalculatedFieldResult { - private final String type; + private final OutputType type; private final AttributeScope scope; private final Map resultMap; - public CalculatedFieldResult(String type, AttributeScope scope, Map resultMap) { + public CalculatedFieldResult(OutputType type, AttributeScope scope, Map resultMap) { this.type = type; this.scope = scope; this.resultMap = resultMap == null ? Map.of() : Map.copyOf(resultMap); diff --git a/application/src/main/java/org/thingsboard/server/service/cf/DefaultCalculatedFieldExecutionService.java b/application/src/main/java/org/thingsboard/server/service/cf/DefaultCalculatedFieldExecutionService.java index af4b3e95e5..27db9ae290 100644 --- a/application/src/main/java/org/thingsboard/server/service/cf/DefaultCalculatedFieldExecutionService.java +++ b/application/src/main/java/org/thingsboard/server/service/cf/DefaultCalculatedFieldExecutionService.java @@ -35,12 +35,15 @@ import org.thingsboard.common.util.JacksonUtil; import org.thingsboard.common.util.ThingsBoardExecutors; import org.thingsboard.script.api.tbel.TbelInvokeService; import org.thingsboard.server.cluster.TbClusterService; +import org.thingsboard.server.common.data.AttributeScope; import org.thingsboard.server.common.data.EntityType; import org.thingsboard.server.common.data.cf.CalculatedField; import org.thingsboard.server.common.data.cf.CalculatedFieldLink; import org.thingsboard.server.common.data.cf.CalculatedFieldType; import org.thingsboard.server.common.data.cf.configuration.Argument; +import org.thingsboard.server.common.data.cf.configuration.ArgumentType; import org.thingsboard.server.common.data.cf.configuration.CalculatedFieldConfiguration; +import org.thingsboard.server.common.data.cf.configuration.OutputType; import org.thingsboard.server.common.data.id.AssetId; import org.thingsboard.server.common.data.id.CalculatedFieldId; import org.thingsboard.server.common.data.id.DeviceId; @@ -48,7 +51,6 @@ import org.thingsboard.server.common.data.id.EntityId; import org.thingsboard.server.common.data.id.EntityIdFactory; import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.kv.Aggregation; -import org.thingsboard.server.common.data.kv.AttributeKvEntry; import org.thingsboard.server.common.data.kv.BaseAttributeKvEntry; import org.thingsboard.server.common.data.kv.BaseReadTsKvQuery; import org.thingsboard.server.common.data.kv.BasicTsKvEntry; @@ -79,11 +81,13 @@ import org.thingsboard.server.service.cf.ctx.state.ScriptCalculatedFieldState; import org.thingsboard.server.service.cf.ctx.state.SimpleCalculatedFieldState; import org.thingsboard.server.service.cf.ctx.state.SingleValueArgumentEntry; import org.thingsboard.server.service.cf.ctx.state.TsRollingArgumentEntry; +import org.thingsboard.server.service.cf.telemetry.CalculatedFieldTelemetryUpdateRequest; import org.thingsboard.server.service.partition.AbstractPartitionBasedService; import org.thingsboard.server.service.profile.TbAssetProfileCache; import org.thingsboard.server.service.profile.TbDeviceProfileCache; import java.util.ArrayList; +import java.util.EnumSet; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -92,7 +96,6 @@ import java.util.Set; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.locks.ReentrantLock; import java.util.function.Consumer; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -120,12 +123,14 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas private ListeningExecutorService calculatedFieldExecutor; private ListeningExecutorService calculatedFieldCallbackExecutor; - private final ConcurrentMap entityLocks = new ConcurrentHashMap<>(); - private final ConcurrentMap states = new ConcurrentHashMap<>(); private static final int MAX_LAST_RECORDS_VALUE = 1024; + private static final Set supportedReferencedEntities = EnumSet.of( + EntityType.DEVICE, EntityType.ASSET, EntityType.CUSTOMER, EntityType.TENANT + ); + @Value("${calculatedField.initFetchPackSize:50000}") @Getter private int initFetchPackSize; @@ -336,28 +341,29 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas } @Override - public void onTelemetryUpdate(TenantId tenantId, EntityId entityId, List calculatedFieldIds, List telemetry) { + public void onTelemetryUpdate(CalculatedFieldTelemetryUpdateRequest calculatedFieldTelemetryUpdateRequest) { try { - EntityType entityType = entityId.getEntityType(); - if (EntityType.DEVICE.equals(entityType) || EntityType.ASSET.equals(entityType) || EntityType.CUSTOMER.equals(entityType) || EntityType.TENANT.equals(entityType)) { - EntityId profileId = null; - if (EntityType.ASSET.equals(entityType)) { - profileId = assetProfileCache.get(tenantId, (AssetId) entityId).getId(); - } else if (EntityType.DEVICE.equals(entityType)) { - profileId = deviceProfileCache.get(tenantId, (DeviceId) entityId).getId(); - } - List cfLinks = new ArrayList<>(calculatedFieldCache.getCalculatedFieldLinksByEntityId(tenantId, entityId)); - Optional.ofNullable(profileId).ifPresent(id -> { - cfLinks.addAll(calculatedFieldCache.getCalculatedFieldLinksByEntityId(tenantId, id)); - }); + TenantId tenantId = calculatedFieldTelemetryUpdateRequest.getTenantId(); + EntityId entityId = calculatedFieldTelemetryUpdateRequest.getEntityId(); + AttributeScope scope = calculatedFieldTelemetryUpdateRequest.getScope(); + List telemetry = calculatedFieldTelemetryUpdateRequest.getKvEntries(); + List calculatedFieldIds = calculatedFieldTelemetryUpdateRequest.getCalculatedFieldIds(); + + if (supportedReferencedEntities.contains(entityId.getEntityType())) { + EntityId profileId = getProfileId(tenantId, entityId); + + List cfLinks = Stream.concat( + calculatedFieldCache.getCalculatedFieldLinksByEntityId(tenantId, entityId).stream(), + profileId != null ? calculatedFieldCache.getCalculatedFieldLinksByEntityId(tenantId, profileId).stream() : Stream.empty() + ).toList(); + cfLinks.forEach(link -> { CalculatedFieldId calculatedFieldId = link.getCalculatedFieldId(); - Map attributes = link.getConfiguration().getAttributes(); - Map timeSeries = link.getConfiguration().getTimeSeries(); + Map telemetryKeys = getTelemetryKeysFromLink(link, scope); Map updatedTelemetry = telemetry.stream() - .filter(entry -> attributes.containsValue(entry.getKey()) || timeSeries.containsValue(entry.getKey())) + .filter(entry -> telemetryKeys.containsValue(entry.getKey())) .collect(Collectors.toMap( - entry -> getMappedKey(entry, attributes, timeSeries), + entry -> getMappedKey(entry, telemetryKeys), entry -> entry, (v1, v2) -> v1 )); @@ -368,25 +374,24 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas }); } } catch (Exception e) { - log.trace("Failed to update telemetry entityId: [{}]", entityId, e); + log.trace("Failed to update telemetry.", e); } } - private String getMappedKey(KvEntry entry, Map attributes, Map timeSeries) { - if (entry instanceof AttributeKvEntry) { - return attributes.entrySet().stream() - .filter(attr -> attr.getValue().equals(entry.getKey())) - .map(Map.Entry::getKey) - .findFirst() - .orElse(entry.getKey()); - } else if (entry instanceof TsKvEntry) { - return timeSeries.entrySet().stream() - .filter(ts -> ts.getValue().equals(entry.getKey())) - .map(Map.Entry::getKey) - .findFirst() - .orElse(entry.getKey()); - } - return entry.getKey(); + private Map getTelemetryKeysFromLink(CalculatedFieldLink link, AttributeScope scope) { + return scope == null ? link.getConfiguration().getTimeSeries() : switch (scope) { + case CLIENT_SCOPE -> link.getConfiguration().getClientAttributes(); + case SERVER_SCOPE -> link.getConfiguration().getServerAttributes(); + case SHARED_SCOPE -> link.getConfiguration().getSharedAttributes(); + }; + } + + private String getMappedKey(KvEntry entry, Map telemetry) { + return telemetry.entrySet().stream() + .filter(kvEntry -> kvEntry.getValue().equals(entry.getKey())) + .map(Map.Entry::getKey) + .findFirst() + .orElse(entry.getKey()); } private void executeTelemetryUpdate(TenantId tenantId, EntityId entityId, CalculatedFieldId calculatedFieldId, List calculatedFieldIds, Map updatedTelemetry) { @@ -539,17 +544,14 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas CalculatedFieldId cfId = calculatedFieldCtx.getCfId(); TopicPartitionInfo tpi = partitionService.resolve(ServiceType.TB_CORE, tenantId, cfId); if (tpi.isMyPartition()) { - ReentrantLock lock = entityLocks.computeIfAbsent(entityId, id -> new ReentrantLock()); - lock.lock(); + CalculatedFieldEntityCtxId entityCtxId = new CalculatedFieldEntityCtxId(cfId.getId(), entityId.getId()); - try { - CalculatedFieldEntityCtxId entityCtxId = new CalculatedFieldEntityCtxId(cfId.getId(), entityId.getId()); - CalculatedFieldEntityCtx calculatedFieldEntityCtx = states.computeIfAbsent(entityCtxId, ctxId -> fetchCalculatedFieldEntityState(ctxId, calculatedFieldCtx.getCfType())); + states.compute(entityCtxId, (ctxId, ctx) -> { + CalculatedFieldEntityCtx calculatedFieldEntityCtx = ctx != null ? ctx : fetchCalculatedFieldEntityState(ctxId, calculatedFieldCtx.getCfType()); Consumer performUpdateState = (state) -> { if (state.updateState(argumentValues)) { calculatedFieldEntityCtx.setState(state); - states.put(entityCtxId, calculatedFieldEntityCtx); rocksDBService.put(JacksonUtil.writeValueAsString(entityCtxId), JacksonUtil.writeValueAsString(calculatedFieldEntityCtx)); Map arguments = state.getArguments(); boolean allArgsPresent = arguments.keySet().containsAll(calculatedFieldCtx.getArguments().keySet()) && @@ -564,12 +566,12 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas boolean allKeysPresent = argumentValues.keySet().containsAll(calculatedFieldCtx.getArguments().keySet()); boolean requiresTsRollingUpdate = calculatedFieldCtx.getArguments().values().stream() - .anyMatch(argument -> "TS_ROLLING".equals(argument.getType()) && state.getArguments().get(argument.getKey()) == null); + .anyMatch(argument -> ArgumentType.TS_ROLLING.equals(argument.getType()) && state.getArguments().get(argument.getKey()) == null); if (!allKeysPresent || requiresTsRollingUpdate) { Map missingArguments = calculatedFieldCtx.getArguments().entrySet().stream() - .filter(entry -> !argumentValues.containsKey(entry.getKey()) || ("TS_ROLLING".equals(entry.getValue().getType()) && state.getArguments().get(entry.getKey()) == null)) + .filter(entry -> !argumentValues.containsKey(entry.getKey()) || (ArgumentType.TS_ROLLING.equals(entry.getValue().getType()) && state.getArguments().get(entry.getKey()) == null)) .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); fetchArguments(calculatedFieldCtx.getTenantId(), entityId, missingArguments, argumentValues::putAll) @@ -578,9 +580,8 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas } else { performUpdateState.accept(state); } - } finally { - lock.unlock(); - } + return calculatedFieldEntityCtx; + }); } else { sendUpdateCalculatedFieldStateMsg(tenantId, cfId, entityId, calculatedFieldIds, argumentValues); } @@ -605,9 +606,9 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas private void pushMsgToRuleEngine(TenantId tenantId, CalculatedFieldId calculatedFieldId, EntityId originatorId, CalculatedFieldResult calculatedFieldResult, List calculatedFieldIds) { try { - String type = calculatedFieldResult.getType(); - TbMsgType msgType = "ATTRIBUTES".equals(type) ? TbMsgType.POST_ATTRIBUTES_REQUEST : TbMsgType.POST_TELEMETRY_REQUEST; - TbMsgMetaData md = "ATTRIBUTES".equals(type) ? new TbMsgMetaData(Map.of(SCOPE, calculatedFieldResult.getScope().name())) : TbMsgMetaData.EMPTY; + OutputType type = calculatedFieldResult.getType(); + TbMsgType msgType = OutputType.ATTRIBUTES.equals(type) ? TbMsgType.POST_ATTRIBUTES_REQUEST : TbMsgType.POST_TELEMETRY_REQUEST; + TbMsgMetaData md = OutputType.ATTRIBUTES.equals(type) ? new TbMsgMetaData(Map.of(SCOPE, calculatedFieldResult.getScope().name())) : TbMsgMetaData.EMPTY; ObjectNode payload = createJsonPayload(calculatedFieldResult); if (calculatedFieldIds == null) { calculatedFieldIds = new ArrayList<>(); @@ -649,19 +650,18 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas private ListenableFuture fetchKvEntry(TenantId tenantId, EntityId entityId, Argument argument) { return switch (argument.getType()) { - case "TS_ROLLING" -> fetchTsRolling(tenantId, entityId, argument); - case "ATTRIBUTE" -> transformSingleValueArgument( + case TS_ROLLING -> fetchTsRolling(tenantId, entityId, argument); + case ATTRIBUTE -> transformSingleValueArgument( Futures.transform( attributesService.find(tenantId, entityId, argument.getScope(), argument.getKey()), - result -> result.or(() -> Optional.of(new BaseAttributeKvEntry(System.currentTimeMillis(), createDefaultKvEntry(argument)))), + result -> result.or(() -> Optional.of(new BaseAttributeKvEntry(createDefaultKvEntry(argument), System.currentTimeMillis(), 0L))), calculatedFieldCallbackExecutor) ); - case "TS_LATEST" -> transformSingleValueArgument( + case TS_LATEST -> transformSingleValueArgument( Futures.transform( timeseriesService.findLatest(tenantId, entityId, argument.getKey()), - result -> result.or(() -> Optional.of(new BasicTsKvEntry(System.currentTimeMillis(), createDefaultKvEntry(argument)))), + result -> result.or(() -> Optional.of(new BasicTsKvEntry(System.currentTimeMillis(), createDefaultKvEntry(argument), 0L))), calculatedFieldCallbackExecutor)); - default -> throw new IllegalArgumentException("Invalid argument type '" + argument.getType() + "'."); }; } @@ -801,4 +801,12 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas return EntityType.DEVICE_PROFILE.equals(entityId.getEntityType()) || EntityType.ASSET_PROFILE.equals(entityId.getEntityType()); } + private EntityId getProfileId(TenantId tenantId, EntityId entityId) { + return switch (entityId.getEntityType()) { + case ASSET -> assetProfileCache.get(tenantId, (AssetId) entityId).getId(); + case DEVICE -> deviceProfileCache.get(tenantId, (DeviceId) entityId).getId(); + default -> null; + }; + } + } diff --git a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/ArgumentEntry.java b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/ArgumentEntry.java index dd56405352..b261840bfd 100644 --- a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/ArgumentEntry.java +++ b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/ArgumentEntry.java @@ -35,7 +35,7 @@ import java.util.List; public interface ArgumentEntry { @JsonIgnore - ArgumentType getType(); + ArgumentEntryType getType(); Object getValue(); diff --git a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/ArgumentType.java b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/ArgumentEntryType.java similarity index 95% rename from application/src/main/java/org/thingsboard/server/service/cf/ctx/state/ArgumentType.java rename to application/src/main/java/org/thingsboard/server/service/cf/ctx/state/ArgumentEntryType.java index 360529a7e9..1a0dfb5ac7 100644 --- a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/ArgumentType.java +++ b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/ArgumentEntryType.java @@ -15,6 +15,6 @@ */ package org.thingsboard.server.service.cf.ctx.state; -public enum ArgumentType { +public enum ArgumentEntryType { SINGLE_VALUE, TS_ROLLING } diff --git a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/BaseCalculatedFieldState.java b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/BaseCalculatedFieldState.java index be7319b930..bc9a421e47 100644 --- a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/BaseCalculatedFieldState.java +++ b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/BaseCalculatedFieldState.java @@ -49,8 +49,12 @@ public abstract class BaseCalculatedFieldState implements CalculatedFieldState { existingTsRollingEntry.addAllTsRecords(newTsRollingEntry.getTsRecords()); } else if (existingEntry instanceof TsRollingArgumentEntry existingTsRollingEntry && newEntry instanceof SingleValueArgumentEntry singleValueEntry) { existingTsRollingEntry.addTsRecord(singleValueEntry.getTs(), singleValueEntry.getValue()); - } else if (existingEntry instanceof SingleValueArgumentEntry existingSingleValueEntry && newEntry instanceof SingleValueArgumentEntry singleValueEntry - && singleValueEntry.getVersion() > existingSingleValueEntry.getVersion()) { + } else if (existingEntry instanceof SingleValueArgumentEntry existingSingleValueEntry && newEntry instanceof SingleValueArgumentEntry singleValueEntry) { +// Long existingVersion = existingSingleValueEntry.getVersion(); +// Long newVersion = singleValueEntry.getVersion(); +// if (newVersion != null && (existingVersion == null || newVersion > existingVersion)) { +// arguments.put(key, newEntry.copy()); +// } arguments.put(key, newEntry.copy()); } else { arguments.put(key, newEntry.copy()); diff --git a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/SingleValueArgumentEntry.java b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/SingleValueArgumentEntry.java index 20b531f562..8d5080e90f 100644 --- a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/SingleValueArgumentEntry.java +++ b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/SingleValueArgumentEntry.java @@ -32,7 +32,7 @@ public class SingleValueArgumentEntry implements ArgumentEntry { private long ts; private Object value; - private long version; + private Long version; public SingleValueArgumentEntry(KvEntry entry) { if (entry instanceof TsKvEntry tsKvEntry) { @@ -54,8 +54,8 @@ public class SingleValueArgumentEntry implements ArgumentEntry { } @Override - public ArgumentType getType() { - return ArgumentType.SINGLE_VALUE; + public ArgumentEntryType getType() { + return ArgumentEntryType.SINGLE_VALUE; } @Override diff --git a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/TsRollingArgumentEntry.java b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/TsRollingArgumentEntry.java index 64de2f8c8a..1118e3af13 100644 --- a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/TsRollingArgumentEntry.java +++ b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/TsRollingArgumentEntry.java @@ -51,8 +51,8 @@ public class TsRollingArgumentEntry implements ArgumentEntry { } @Override - public ArgumentType getType() { - return ArgumentType.TS_ROLLING; + public ArgumentEntryType getType() { + return ArgumentEntryType.TS_ROLLING; } @JsonIgnore diff --git a/application/src/main/java/org/thingsboard/server/service/cf/telemetry/CalculatedFieldAttributeUpdateRequest.java b/application/src/main/java/org/thingsboard/server/service/cf/telemetry/CalculatedFieldAttributeUpdateRequest.java new file mode 100644 index 0000000000..8479ff37d7 --- /dev/null +++ b/application/src/main/java/org/thingsboard/server/service/cf/telemetry/CalculatedFieldAttributeUpdateRequest.java @@ -0,0 +1,38 @@ +/** + * Copyright © 2016-2024 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.service.cf.telemetry; + +import lombok.AllArgsConstructor; +import lombok.Data; +import org.thingsboard.server.common.data.AttributeScope; +import org.thingsboard.server.common.data.id.CalculatedFieldId; +import org.thingsboard.server.common.data.id.EntityId; +import org.thingsboard.server.common.data.id.TenantId; +import org.thingsboard.server.common.data.kv.AttributeKvEntry; + +import java.util.List; + +@Data +@AllArgsConstructor +public class CalculatedFieldAttributeUpdateRequest implements CalculatedFieldTelemetryUpdateRequest { + + private TenantId tenantId; + private EntityId entityId; + private AttributeScope scope; + private List kvEntries; + private List calculatedFieldIds; + +} diff --git a/application/src/main/java/org/thingsboard/server/service/cf/telemetry/CalculatedFieldTelemetryUpdateRequest.java b/application/src/main/java/org/thingsboard/server/service/cf/telemetry/CalculatedFieldTelemetryUpdateRequest.java new file mode 100644 index 0000000000..3c28833f31 --- /dev/null +++ b/application/src/main/java/org/thingsboard/server/service/cf/telemetry/CalculatedFieldTelemetryUpdateRequest.java @@ -0,0 +1,38 @@ +/** + * Copyright © 2016-2024 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.service.cf.telemetry; + +import org.thingsboard.server.common.data.AttributeScope; +import org.thingsboard.server.common.data.id.CalculatedFieldId; +import org.thingsboard.server.common.data.id.EntityId; +import org.thingsboard.server.common.data.id.TenantId; +import org.thingsboard.server.common.data.kv.KvEntry; + +import java.util.List; + +public interface CalculatedFieldTelemetryUpdateRequest { + + TenantId getTenantId(); + + EntityId getEntityId(); + + AttributeScope getScope(); + + List getKvEntries(); + + List getCalculatedFieldIds(); + +} diff --git a/application/src/main/java/org/thingsboard/server/service/cf/telemetry/CalculatedFieldTimeSeriesUpdateRequest.java b/application/src/main/java/org/thingsboard/server/service/cf/telemetry/CalculatedFieldTimeSeriesUpdateRequest.java new file mode 100644 index 0000000000..987d899465 --- /dev/null +++ b/application/src/main/java/org/thingsboard/server/service/cf/telemetry/CalculatedFieldTimeSeriesUpdateRequest.java @@ -0,0 +1,42 @@ +/** + * Copyright © 2016-2024 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.service.cf.telemetry; + +import lombok.AllArgsConstructor; +import lombok.Data; +import org.thingsboard.server.common.data.AttributeScope; +import org.thingsboard.server.common.data.id.CalculatedFieldId; +import org.thingsboard.server.common.data.id.EntityId; +import org.thingsboard.server.common.data.id.TenantId; +import org.thingsboard.server.common.data.kv.TsKvEntry; + +import java.util.List; + +@Data +@AllArgsConstructor +public class CalculatedFieldTimeSeriesUpdateRequest implements CalculatedFieldTelemetryUpdateRequest { + + private TenantId tenantId; + private EntityId entityId; + private List kvEntries; + private List calculatedFieldIds; + + @Override + public AttributeScope getScope() { + return null; + } + +} diff --git a/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionService.java b/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionService.java index e35d3cead6..bf3076be5c 100644 --- a/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionService.java +++ b/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionService.java @@ -50,6 +50,8 @@ import org.thingsboard.server.dao.timeseries.TimeseriesService; import org.thingsboard.server.dao.util.KvUtils; import org.thingsboard.server.service.apiusage.TbApiUsageStateService; import org.thingsboard.server.service.cf.CalculatedFieldExecutionService; +import org.thingsboard.server.service.cf.telemetry.CalculatedFieldAttributeUpdateRequest; +import org.thingsboard.server.service.cf.telemetry.CalculatedFieldTimeSeriesUpdateRequest; import org.thingsboard.server.service.entitiy.entityview.TbEntityViewService; import org.thingsboard.server.service.subscription.TbSubscriptionUtils; @@ -152,7 +154,7 @@ public class DefaultTelemetrySubscriptionService extends AbstractSubscriptionSer if (request.isSaveLatest() && !request.isOnlyLatest()) { addEntityViewCallback(tenantId, entityId, request.getEntries()); } - calculatedFieldExecutionService.onTelemetryUpdate(tenantId, entityId, request.getCalculatedFieldIds(), request.getEntries()); + calculatedFieldExecutionService.onTelemetryUpdate(new CalculatedFieldTimeSeriesUpdateRequest(tenantId, entityId, request.getEntries(), request.getCalculatedFieldIds())); return saveFuture; } @@ -168,7 +170,7 @@ public class DefaultTelemetrySubscriptionService extends AbstractSubscriptionSer ListenableFuture> saveFuture = attrService.save(request.getTenantId(), request.getEntityId(), request.getScope(), request.getEntries()); addMainCallback(saveFuture, request.getCallback()); addWsCallback(saveFuture, success -> onAttributesUpdate(request.getTenantId(), request.getEntityId(), request.getScope().name(), request.getEntries(), request.isNotifyDevice())); - calculatedFieldExecutionService.onTelemetryUpdate(request.getTenantId(), request.getEntityId(), request.getCalculatedFieldIds(), request.getEntries()); + calculatedFieldExecutionService.onTelemetryUpdate(new CalculatedFieldAttributeUpdateRequest(request.getTenantId(), request.getEntityId(), request.getScope(), request.getEntries(), request.getCalculatedFieldIds())); } @Override diff --git a/application/src/test/java/org/thingsboard/server/controller/CalculatedFieldControllerTest.java b/application/src/test/java/org/thingsboard/server/controller/CalculatedFieldControllerTest.java index ba1dfb1fec..5d1467974d 100644 --- a/application/src/test/java/org/thingsboard/server/controller/CalculatedFieldControllerTest.java +++ b/application/src/test/java/org/thingsboard/server/controller/CalculatedFieldControllerTest.java @@ -24,8 +24,10 @@ import org.thingsboard.server.common.data.User; import org.thingsboard.server.common.data.cf.CalculatedField; import org.thingsboard.server.common.data.cf.CalculatedFieldType; import org.thingsboard.server.common.data.cf.configuration.Argument; +import org.thingsboard.server.common.data.cf.configuration.ArgumentType; import org.thingsboard.server.common.data.cf.configuration.CalculatedFieldConfiguration; import org.thingsboard.server.common.data.cf.configuration.Output; +import org.thingsboard.server.common.data.cf.configuration.OutputType; import org.thingsboard.server.common.data.cf.configuration.SimpleCalculatedFieldConfiguration; import org.thingsboard.server.common.data.id.DeviceId; import org.thingsboard.server.common.data.id.EntityId; @@ -140,7 +142,7 @@ public class CalculatedFieldControllerTest extends AbstractControllerTest { Argument argument = new Argument(); argument.setEntityId(referencedEntityId); - argument.setType("TS_LATEST"); + argument.setType(ArgumentType.TS_LATEST); argument.setKey("temperature"); config.setArguments(Map.of("T", argument)); @@ -149,7 +151,7 @@ public class CalculatedFieldControllerTest extends AbstractControllerTest { Output output = new Output(); output.setName("output"); - output.setType("TS_LATEST"); + output.setType(OutputType.TIME_SERIES); config.setOutput(output); diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/cf/CalculatedFieldLinkConfiguration.java b/common/data/src/main/java/org/thingsboard/server/common/data/cf/CalculatedFieldLinkConfiguration.java index c5f81cd572..78513c8b74 100644 --- a/common/data/src/main/java/org/thingsboard/server/common/data/cf/CalculatedFieldLinkConfiguration.java +++ b/common/data/src/main/java/org/thingsboard/server/common/data/cf/CalculatedFieldLinkConfiguration.java @@ -23,7 +23,9 @@ import java.util.Map; @Data public class CalculatedFieldLinkConfiguration { - private Map attributes = new HashMap<>(); + private Map clientAttributes = new HashMap<>(); + private Map serverAttributes = new HashMap<>(); + private Map sharedAttributes = new HashMap<>(); private Map timeSeries = new HashMap<>(); } diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/cf/configuration/Argument.java b/common/data/src/main/java/org/thingsboard/server/common/data/cf/configuration/Argument.java index f34f5e9cb7..4dac866219 100644 --- a/common/data/src/main/java/org/thingsboard/server/common/data/cf/configuration/Argument.java +++ b/common/data/src/main/java/org/thingsboard/server/common/data/cf/configuration/Argument.java @@ -24,7 +24,7 @@ public class Argument { private EntityId entityId; private String key; - private String type; + private ArgumentType type; private AttributeScope scope; private String defaultValue; diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/cf/configuration/ArgumentType.java b/common/data/src/main/java/org/thingsboard/server/common/data/cf/configuration/ArgumentType.java new file mode 100644 index 0000000000..17e2315b52 --- /dev/null +++ b/common/data/src/main/java/org/thingsboard/server/common/data/cf/configuration/ArgumentType.java @@ -0,0 +1,22 @@ +/** + * Copyright © 2016-2024 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.common.data.cf.configuration; + +public enum ArgumentType { + + TS_LATEST, ATTRIBUTE, TS_ROLLING + +} diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/cf/configuration/BaseCalculatedFieldConfiguration.java b/common/data/src/main/java/org/thingsboard/server/common/data/cf/configuration/BaseCalculatedFieldConfiguration.java index ac36991a61..8c86b6c552 100644 --- a/common/data/src/main/java/org/thingsboard/server/common/data/cf/configuration/BaseCalculatedFieldConfiguration.java +++ b/common/data/src/main/java/org/thingsboard/server/common/data/cf/configuration/BaseCalculatedFieldConfiguration.java @@ -65,19 +65,24 @@ public abstract class BaseCalculatedFieldConfiguration implements CalculatedFiel public CalculatedFieldLinkConfiguration getReferencedEntityConfig(EntityId entityId) { CalculatedFieldLinkConfiguration linkConfiguration = new CalculatedFieldLinkConfiguration(); - for (Map.Entry entry : arguments.entrySet()) { - Argument argument = entry.getValue(); - if (argument.getEntityId().equals(entityId)) { - switch (argument.getType()) { - case "ATTRIBUTE": - linkConfiguration.getAttributes().put(entry.getKey(), argument.getKey()); - break; - case "TS_LATEST", "TS_ROLLING": - linkConfiguration.getTimeSeries().put(entry.getKey(), argument.getKey()); - break; - } - } - } + arguments.entrySet().stream() + .filter(entry -> entry.getValue().getEntityId().equals(entityId)) + .forEach(entry -> { + Argument argument = entry.getValue(); + String argumentKey = entry.getKey(); + + switch (argument.getType()) { + case ATTRIBUTE -> { + switch (argument.getScope()) { + case CLIENT_SCOPE -> linkConfiguration.getClientAttributes().put(entry.getKey(), argument.getKey()); + case SERVER_SCOPE -> linkConfiguration.getServerAttributes().put(entry.getKey(), argument.getKey()); + case SHARED_SCOPE -> linkConfiguration.getSharedAttributes().put(entry.getKey(), argument.getKey()); + } + } + case TS_LATEST, TS_ROLLING -> + linkConfiguration.getTimeSeries().put(argumentKey, argument.getKey()); + } + }); return linkConfiguration; } @@ -98,7 +103,7 @@ public abstract class BaseCalculatedFieldConfiguration implements CalculatedFiel argumentNode.put("entityId", entityId.toString()); } argumentNode.put("key", argument.getKey()); - argumentNode.put("type", argument.getType()); + argumentNode.put("type", String.valueOf(argument.getType())); argumentNode.put("scope", String.valueOf(argument.getScope())); argumentNode.put("defaultValue", argument.getDefaultValue()); argumentNode.put("limit", String.valueOf(argument.getLimit())); @@ -112,7 +117,7 @@ public abstract class BaseCalculatedFieldConfiguration implements CalculatedFiel if (output != null) { ObjectNode outputNode = configNode.putObject("output"); outputNode.put("name", output.getName()); - outputNode.put("type", output.getType()); + outputNode.put("type", String.valueOf(output.getType())); if (output.getScope() != null) { outputNode.put("scope", String.valueOf(output.getScope())); } @@ -141,7 +146,10 @@ public abstract class BaseCalculatedFieldConfiguration implements CalculatedFiel argument.setEntityId(EntityIdFactory.getByTypeAndUuid(entityType, entityId)); } argument.setKey(argumentNode.get("key").asText()); - argument.setType(argumentNode.get("type").asText()); + JsonNode type = argumentNode.get("type"); + if (type != null && !type.isNull() && !type.asText().equals("null")) { + argument.setType(ArgumentType.valueOf(type.asText())); + } JsonNode scope = argumentNode.get("scope"); if (scope != null && !scope.isNull() && !scope.asText().equals("null")) { argument.setScope(AttributeScope.valueOf(scope.asText())); @@ -169,7 +177,10 @@ public abstract class BaseCalculatedFieldConfiguration implements CalculatedFiel if (outputNode != null) { Output output = new Output(); output.setName(outputNode.get("name").asText()); - output.setType(outputNode.get("type").asText()); + JsonNode type = outputNode.get("type"); + if (type != null && !type.isNull() && !type.asText().equals("null")) { + output.setType(OutputType.valueOf(type.asText())); + } JsonNode scope = outputNode.get("scope"); if (scope != null && !scope.isNull() && !scope.asText().equals("null")) { output.setScope(AttributeScope.valueOf(scope.asText())); diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/cf/configuration/Output.java b/common/data/src/main/java/org/thingsboard/server/common/data/cf/configuration/Output.java index 46257d1ccc..12cf97338a 100644 --- a/common/data/src/main/java/org/thingsboard/server/common/data/cf/configuration/Output.java +++ b/common/data/src/main/java/org/thingsboard/server/common/data/cf/configuration/Output.java @@ -22,7 +22,7 @@ import org.thingsboard.server.common.data.AttributeScope; public class Output { private String name; - private String type; + private OutputType type; private AttributeScope scope; } diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/cf/configuration/OutputType.java b/common/data/src/main/java/org/thingsboard/server/common/data/cf/configuration/OutputType.java new file mode 100644 index 0000000000..c248bc8042 --- /dev/null +++ b/common/data/src/main/java/org/thingsboard/server/common/data/cf/configuration/OutputType.java @@ -0,0 +1,22 @@ +/** + * Copyright © 2016-2024 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.common.data.cf.configuration; + +public enum OutputType { + + TIME_SERIES, ATTRIBUTES + +} diff --git a/dao/src/test/java/org/thingsboard/server/dao/service/AssetServiceTest.java b/dao/src/test/java/org/thingsboard/server/dao/service/AssetServiceTest.java index b0870f3dc5..9a0b9222f0 100644 --- a/dao/src/test/java/org/thingsboard/server/dao/service/AssetServiceTest.java +++ b/dao/src/test/java/org/thingsboard/server/dao/service/AssetServiceTest.java @@ -33,7 +33,9 @@ import org.thingsboard.server.common.data.asset.AssetProfile; import org.thingsboard.server.common.data.cf.CalculatedField; import org.thingsboard.server.common.data.cf.CalculatedFieldType; import org.thingsboard.server.common.data.cf.configuration.Argument; +import org.thingsboard.server.common.data.cf.configuration.ArgumentType; import org.thingsboard.server.common.data.cf.configuration.Output; +import org.thingsboard.server.common.data.cf.configuration.OutputType; import org.thingsboard.server.common.data.cf.configuration.SimpleCalculatedFieldConfiguration; import org.thingsboard.server.common.data.id.CustomerId; import org.thingsboard.server.common.data.id.TenantId; @@ -884,7 +886,7 @@ public class AssetServiceTest extends AbstractServiceTest { Argument argument = new Argument(); argument.setEntityId(savedAsset.getId()); - argument.setType("TS_LATEST"); + argument.setType(ArgumentType.TS_LATEST); argument.setKey("temperature"); config.setArguments(Map.of("T", argument)); @@ -893,7 +895,7 @@ public class AssetServiceTest extends AbstractServiceTest { Output output = new Output(); output.setName("output"); - output.setType("TS_LATEST"); + output.setType(OutputType.TIME_SERIES); config.setOutput(output); diff --git a/dao/src/test/java/org/thingsboard/server/dao/service/CalculatedFieldServiceTest.java b/dao/src/test/java/org/thingsboard/server/dao/service/CalculatedFieldServiceTest.java index 9a1719e715..5a8f7a2383 100644 --- a/dao/src/test/java/org/thingsboard/server/dao/service/CalculatedFieldServiceTest.java +++ b/dao/src/test/java/org/thingsboard/server/dao/service/CalculatedFieldServiceTest.java @@ -26,8 +26,10 @@ import org.thingsboard.server.common.data.Device; import org.thingsboard.server.common.data.cf.CalculatedField; import org.thingsboard.server.common.data.cf.CalculatedFieldType; import org.thingsboard.server.common.data.cf.configuration.Argument; +import org.thingsboard.server.common.data.cf.configuration.ArgumentType; import org.thingsboard.server.common.data.cf.configuration.CalculatedFieldConfiguration; import org.thingsboard.server.common.data.cf.configuration.Output; +import org.thingsboard.server.common.data.cf.configuration.OutputType; import org.thingsboard.server.common.data.cf.configuration.SimpleCalculatedFieldConfiguration; import org.thingsboard.server.common.data.id.CalculatedFieldId; import org.thingsboard.server.common.data.id.EntityId; @@ -153,7 +155,7 @@ public class CalculatedFieldServiceTest extends AbstractServiceTest { Argument argument = new Argument(); argument.setEntityId(referencedEntityId); - argument.setType("TS_LATEST"); + argument.setType(ArgumentType.TS_LATEST); argument.setKey("temperature"); config.setArguments(Map.of("T", argument)); @@ -162,7 +164,7 @@ public class CalculatedFieldServiceTest extends AbstractServiceTest { Output output = new Output(); output.setName("output"); - output.setType("TS_LATEST"); + output.setType(OutputType.TIME_SERIES); config.setOutput(output); diff --git a/dao/src/test/java/org/thingsboard/server/dao/service/CustomerServiceTest.java b/dao/src/test/java/org/thingsboard/server/dao/service/CustomerServiceTest.java index 6671e0e821..d0ee833261 100644 --- a/dao/src/test/java/org/thingsboard/server/dao/service/CustomerServiceTest.java +++ b/dao/src/test/java/org/thingsboard/server/dao/service/CustomerServiceTest.java @@ -34,7 +34,9 @@ import org.thingsboard.server.common.data.asset.Asset; import org.thingsboard.server.common.data.cf.CalculatedField; import org.thingsboard.server.common.data.cf.CalculatedFieldType; import org.thingsboard.server.common.data.cf.configuration.Argument; +import org.thingsboard.server.common.data.cf.configuration.ArgumentType; import org.thingsboard.server.common.data.cf.configuration.Output; +import org.thingsboard.server.common.data.cf.configuration.OutputType; import org.thingsboard.server.common.data.cf.configuration.SimpleCalculatedFieldConfiguration; import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.page.PageData; @@ -379,7 +381,7 @@ public class CustomerServiceTest extends AbstractServiceTest { Argument argument = new Argument(); argument.setEntityId(savedCustomer.getId()); - argument.setType("TS_LATEST"); + argument.setType(ArgumentType.TS_LATEST); argument.setKey("temperature"); config.setArguments(Map.of("T", argument)); @@ -388,7 +390,7 @@ public class CustomerServiceTest extends AbstractServiceTest { Output output = new Output(); output.setName("output"); - output.setType("TS_LATEST"); + output.setType(OutputType.TIME_SERIES); config.setOutput(output); diff --git a/dao/src/test/java/org/thingsboard/server/dao/service/DeviceServiceTest.java b/dao/src/test/java/org/thingsboard/server/dao/service/DeviceServiceTest.java index f2f8686bc3..5b060ae145 100644 --- a/dao/src/test/java/org/thingsboard/server/dao/service/DeviceServiceTest.java +++ b/dao/src/test/java/org/thingsboard/server/dao/service/DeviceServiceTest.java @@ -42,7 +42,9 @@ import org.thingsboard.server.common.data.TenantProfile; import org.thingsboard.server.common.data.cf.CalculatedField; import org.thingsboard.server.common.data.cf.CalculatedFieldType; import org.thingsboard.server.common.data.cf.configuration.Argument; +import org.thingsboard.server.common.data.cf.configuration.ArgumentType; import org.thingsboard.server.common.data.cf.configuration.Output; +import org.thingsboard.server.common.data.cf.configuration.OutputType; import org.thingsboard.server.common.data.cf.configuration.SimpleCalculatedFieldConfiguration; import org.thingsboard.server.common.data.id.CustomerId; import org.thingsboard.server.common.data.id.DeviceProfileId; @@ -1222,7 +1224,7 @@ public class DeviceServiceTest extends AbstractServiceTest { Argument argument = new Argument(); argument.setEntityId(device.getId()); - argument.setType("TS_LATEST"); + argument.setType(ArgumentType.TS_LATEST); argument.setKey("temperature"); config.setArguments(Map.of("T", argument)); @@ -1231,7 +1233,7 @@ public class DeviceServiceTest extends AbstractServiceTest { Output output = new Output(); output.setName("output"); - output.setType("TS_LATEST"); + output.setType(OutputType.TIME_SERIES); config.setOutput(output);