From d684c8777a2d7ec438e8a4b7b7de0a6841eaae49 Mon Sep 17 00:00:00 2001 From: IrynaMatveieva Date: Tue, 26 Nov 2024 17:33:01 +0200 Subject: [PATCH] improved usage of calculation ctx --- .../cf/CalculatedFieldExecutionService.java | 2 +- .../service/cf/CalculatedFieldResult.java | 8 +- ...efaultCalculatedFieldExecutionService.java | 95 +++++++++---------- ...Ctx.java => CalculatedFieldEntityCtx.java} | 8 +- ...d.java => CalculatedFieldEntityCtxId.java} | 2 +- .../service/cf/ctx/state/ArgumentEntry.java | 26 +++-- ...KvArgumentEntry.java => ArgumentType.java} | 15 +-- .../ctx/state/BaseCalculatedFieldState.java | 30 +++--- .../cf/ctx/state/CalculatedFieldCtx.java | 72 ++++++++++++++ .../state/CalculatedFieldScriptEngine.java | 7 +- .../cf/ctx/state/CalculatedFieldState.java | 8 +- .../CalculatedFieldTbelScriptEngine.java | 18 +++- .../ctx/state/LastRecordsArgumentEntry.java | 16 +++- .../LastRecordsCalculatedFieldState.java | 46 +++++---- .../ctx/state/ScriptCalculatedFieldState.java | 47 ++------- .../ctx/state/SimpleCalculatedFieldState.java | 9 +- ...ext.java => SingleValueArgumentEntry.java} | 32 +++---- .../queue/DefaultTbCoreConsumerService.java | 2 +- .../data/cf/configuration/Argument.java | 1 - .../BaseCalculatedFieldConfiguration.java | 4 - 20 files changed, 257 insertions(+), 191 deletions(-) rename application/src/main/java/org/thingsboard/server/service/cf/ctx/{CalculatedFieldCtx.java => CalculatedFieldEntityCtx.java} (79%) rename application/src/main/java/org/thingsboard/server/service/cf/ctx/{CalculatedFieldCtxId.java => CalculatedFieldEntityCtxId.java} (90%) rename application/src/main/java/org/thingsboard/server/service/cf/ctx/state/{KvArgumentEntry.java => ArgumentType.java} (72%) create mode 100644 application/src/main/java/org/thingsboard/server/service/cf/ctx/state/CalculatedFieldCtx.java rename application/src/main/java/org/thingsboard/server/service/cf/ctx/state/{CalculationContext.java => SingleValueArgumentEntry.java} (51%) diff --git a/application/src/main/java/org/thingsboard/server/service/cf/CalculatedFieldExecutionService.java b/application/src/main/java/org/thingsboard/server/service/cf/CalculatedFieldExecutionService.java index 6b7b12655f..6d7cf0e741 100644 --- a/application/src/main/java/org/thingsboard/server/service/cf/CalculatedFieldExecutionService.java +++ b/application/src/main/java/org/thingsboard/server/service/cf/CalculatedFieldExecutionService.java @@ -29,6 +29,6 @@ public interface CalculatedFieldExecutionService { void onTelemetryUpdate(TenantId tenantId, CalculatedFieldId calculatedFieldId, Map updatedTelemetry); - void onEntityTypeChanged(TransportProtos.EntityProfileUpdateMsgProto proto, TbCallback callback); + void onEntityProfileChanged(TransportProtos.EntityProfileUpdateMsgProto proto, TbCallback callback); } diff --git a/application/src/main/java/org/thingsboard/server/service/cf/CalculatedFieldResult.java b/application/src/main/java/org/thingsboard/server/service/cf/CalculatedFieldResult.java index 87f1d08a84..1f8a06c8fa 100644 --- a/application/src/main/java/org/thingsboard/server/service/cf/CalculatedFieldResult.java +++ b/application/src/main/java/org/thingsboard/server/service/cf/CalculatedFieldResult.java @@ -21,13 +21,15 @@ import org.thingsboard.server.common.data.AttributeScope; import java.util.Map; @Data -public class CalculatedFieldResult { +public final class CalculatedFieldResult { private String type; private AttributeScope scope; private Map resultMap; - public CalculatedFieldResult() { + public CalculatedFieldResult(String type, AttributeScope scope, Map resultMap) { + this.type = type; + this.scope = scope; + this.resultMap = resultMap == null ? Map.of() : Map.copyOf(resultMap); } - } diff --git a/application/src/main/java/org/thingsboard/server/service/cf/DefaultCalculatedFieldExecutionService.java b/application/src/main/java/org/thingsboard/server/service/cf/DefaultCalculatedFieldExecutionService.java index 322443c848..0d269f032f 100644 --- a/application/src/main/java/org/thingsboard/server/service/cf/DefaultCalculatedFieldExecutionService.java +++ b/application/src/main/java/org/thingsboard/server/service/cf/DefaultCalculatedFieldExecutionService.java @@ -71,11 +71,11 @@ import org.thingsboard.server.dao.device.DeviceService; import org.thingsboard.server.dao.timeseries.TimeseriesService; import org.thingsboard.server.gen.transport.TransportProtos; import org.thingsboard.server.queue.util.TbCoreComponent; -import org.thingsboard.server.service.cf.ctx.CalculatedFieldCtx; -import org.thingsboard.server.service.cf.ctx.CalculatedFieldCtxId; +import org.thingsboard.server.service.cf.ctx.CalculatedFieldEntityCtx; +import org.thingsboard.server.service.cf.ctx.CalculatedFieldEntityCtxId; import org.thingsboard.server.service.cf.ctx.state.ArgumentEntry; import org.thingsboard.server.service.cf.ctx.state.CalculatedFieldState; -import org.thingsboard.server.service.cf.ctx.state.CalculationContext; +import org.thingsboard.server.service.cf.ctx.state.CalculatedFieldCtx; import org.thingsboard.server.service.cf.ctx.state.LastRecordsCalculatedFieldState; import org.thingsboard.server.service.cf.ctx.state.ScriptCalculatedFieldState; import org.thingsboard.server.service.cf.ctx.state.SimpleCalculatedFieldState; @@ -115,7 +115,8 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas private final ConcurrentMap calculatedFields = new ConcurrentHashMap<>(); private final ConcurrentMap> calculatedFieldLinks = new ConcurrentHashMap<>(); - private final ConcurrentMap states = new ConcurrentHashMap<>(); + private final ConcurrentMap calculatedFieldsCtx = new ConcurrentHashMap<>(); + private final ConcurrentMap states = new ConcurrentHashMap<>(); private static final int MAX_LAST_RECORDS_VALUE = 1024; @@ -188,22 +189,24 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas EntityId entityId = cf.getEntityId(); calculatedFields.put(calculatedFieldId, cf); calculatedFieldLinks.put(calculatedFieldId, links); + CalculatedFieldCtx calculatedFieldCtx = new CalculatedFieldCtx(cf, tbelInvokeService); + calculatedFieldsCtx.put(calculatedFieldId, calculatedFieldCtx); switch (entityId.getEntityType()) { case ASSET, DEVICE -> { log.info("Initializing state for entity: tenantId=[{}], entityId=[{}]", tenantId, entityId); - initializeStateForEntity(cf, entityId, callback); + initializeStateForEntity(calculatedFieldCtx, entityId, callback); } case ASSET_PROFILE -> { log.info("Initializing state for all assets in profile: tenantId=[{}], assetProfileId=[{}]", tenantId, entityId); PageDataIterable assetIds = new PageDataIterable<>(pageLink -> assetService.findAssetIdsByTenantIdAndAssetProfileId(tenantId, (AssetProfileId) entityId, pageLink), initFetchPackSize); - assetIds.forEach(assetId -> initializeStateForEntity(cf, assetId, callback)); + assetIds.forEach(assetId -> initializeStateForEntity(calculatedFieldCtx, assetId, callback)); } case DEVICE_PROFILE -> { log.info("Initializing state for all devices in profile: tenantId=[{}], deviceProfileId=[{}]", tenantId, entityId); PageDataIterable deviceIds = new PageDataIterable<>(pageLink -> deviceService.findDeviceIdsByTenantIdAndDeviceProfileId(tenantId, (DeviceProfileId) entityId, pageLink), initFetchPackSize); - deviceIds.forEach(deviceId -> initializeStateForEntity(cf, deviceId, callback)); + deviceIds.forEach(deviceId -> initializeStateForEntity(calculatedFieldCtx, deviceId, callback)); } default -> throw new IllegalArgumentException("Entity type '" + calculatedFieldId.getEntityType() + "' does not support calculated fields."); @@ -224,10 +227,13 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas @Override public void onTelemetryUpdate(TenantId tenantId, CalculatedFieldId calculatedFieldId, Map updatedTelemetry) { try { - CalculatedField calculatedField = calculatedFields.computeIfAbsent(calculatedFieldId, id -> calculatedFieldService.findById(tenantId, id)); + CalculatedFieldCtx calculatedFieldCtx = calculatedFieldsCtx.computeIfAbsent(calculatedFieldId, id -> { + CalculatedField calculatedField = calculatedFields.computeIfAbsent(id, cfId -> calculatedFieldService.findById(tenantId, id)); + return new CalculatedFieldCtx(calculatedField, tbelInvokeService); + }); Map argumentValues = updatedTelemetry.entrySet().stream() - .collect(Collectors.toMap(Map.Entry::getKey, entry -> ArgumentEntry.createArgumentEntry(entry.getValue()))); - updateOrInitializeState(calculatedField, calculatedField.getEntityId(), argumentValues); + .collect(Collectors.toMap(Map.Entry::getKey, entry -> ArgumentEntry.createSingleValueArgument(entry.getValue()))); + updateOrInitializeState(calculatedFieldCtx, calculatedFieldCtx.getEntityId(), argumentValues); log.info("Successfully updated time series for calculatedFieldId: [{}]", calculatedFieldId); } catch (Exception e) { log.trace("Failed to update telemetry for calculatedFieldId: [{}]", calculatedFieldId, e); @@ -235,7 +241,7 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas } @Override - public void onEntityTypeChanged(TransportProtos.EntityProfileUpdateMsgProto proto, TbCallback callback) { + public void onEntityProfileChanged(TransportProtos.EntityProfileUpdateMsgProto proto, TbCallback callback) { try { TenantId tenantId = TenantId.fromUUID(new UUID(proto.getTenantIdMSB(), proto.getTenantIdLSB())); EntityId entityId = EntityIdFactory.getByTypeAndUuid(proto.getEntityType(), new UUID(proto.getEntityIdMSB(), proto.getEntityIdLSB())); @@ -246,15 +252,15 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas calculatedFieldService.findCalculatedFieldIdsByEntityId(tenantId, oldProfileId) .forEach(cfId -> { - CalculatedFieldCtxId ctxId = new CalculatedFieldCtxId(cfId.getId(), entityId.getId()); + CalculatedFieldEntityCtxId ctxId = new CalculatedFieldEntityCtxId(cfId.getId(), entityId.getId()); states.remove(ctxId); rocksDBService.delete(JacksonUtil.writeValueAsString(ctxId)); }); calculatedFieldService.findCalculatedFieldIdsByEntityId(tenantId, newProfileId) .stream() - .map(cfId -> calculatedFields.computeIfAbsent(cfId, id -> calculatedFieldService.findById(tenantId, id))) - .forEach(cf -> initializeStateForEntity(cf, entityId, callback)); + .map(cfId -> calculatedFieldsCtx.computeIfAbsent(cfId, id -> new CalculatedFieldCtx(calculatedFieldService.findById(tenantId, id), tbelInvokeService))) + .forEach(cfCtx -> initializeStateForEntity(cfCtx, entityId, callback)); } catch (Exception e) { log.trace("Failed to process entity type update msg: [{}]", proto, e); } @@ -267,6 +273,7 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas onCalculatedFieldDelete(newCalculatedField.getId(), callback); } else { calculatedFields.put(newCalculatedField.getId(), newCalculatedField); + calculatedFieldsCtx.put(newCalculatedField.getId(), new CalculatedFieldCtx(newCalculatedField, tbelInvokeService)); callback.onSuccess(); shouldReinit = false; } @@ -277,6 +284,7 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas try { calculatedFieldLinks.remove(calculatedFieldId); calculatedFields.remove(calculatedFieldId); + calculatedFieldsCtx.remove(calculatedFieldId); states.keySet().removeIf(ctxId -> calculatedFields.keySet().stream().noneMatch(id -> ctxId.cfId().equals(id.getId()))); List statesToRemove = states.keySet().stream() .filter(ctxId -> !calculatedFields.containsKey(new CalculatedFieldId(ctxId.cfId()))) @@ -309,20 +317,20 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas cfs.forEach(cf -> calculatedFields.putIfAbsent(cf.getId(), cf)); PageDataIterable cfls = new PageDataIterable<>(calculatedFieldService::findAllCalculatedFieldLinks, initFetchPackSize); cfls.forEach(link -> calculatedFieldLinks.computeIfAbsent(link.getCalculatedFieldId(), id -> new ArrayList<>()).add(link)); - rocksDBService.getAll().forEach((ctxId, ctx) -> states.put(JacksonUtil.fromString(ctxId, CalculatedFieldCtxId.class), JacksonUtil.fromString(ctx, CalculatedFieldCtx.class))); + rocksDBService.getAll().forEach((ctxId, ctx) -> states.put(JacksonUtil.fromString(ctxId, CalculatedFieldEntityCtxId.class), JacksonUtil.fromString(ctx, CalculatedFieldEntityCtx.class))); states.keySet().removeIf(ctxId -> calculatedFields.keySet().stream().noneMatch(id -> ctxId.cfId().equals(id.getId()))); } - private void initializeStateForEntity(CalculatedField calculatedField, EntityId entityId, TbCallback callback) { - Map arguments = calculatedField.getConfiguration().getArguments(); + private void initializeStateForEntity(CalculatedFieldCtx calculatedFieldCtx, EntityId entityId, TbCallback callback) { + Map arguments = calculatedFieldCtx.getArguments(); Map argumentValues = new HashMap<>(); AtomicInteger remaining = new AtomicInteger(arguments.size()); - arguments.forEach((key, argument) -> Futures.addCallback(fetchArgumentValue(calculatedField, entityId, argument), new FutureCallback<>() { + arguments.forEach((key, argument) -> Futures.addCallback(fetchArgumentValue(calculatedFieldCtx, entityId, argument), new FutureCallback<>() { @Override public void onSuccess(ArgumentEntry result) { argumentValues.put(key, result); if (remaining.decrementAndGet() == 0) { - updateOrInitializeState(calculatedField, entityId, argumentValues); + updateOrInitializeState(calculatedFieldCtx, entityId, argumentValues); } } @@ -334,28 +342,28 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas }, calculatedFieldCallbackExecutor)); } - private ListenableFuture fetchArgumentValue(CalculatedField calculatedField, EntityId targetEntityId, Argument argument) { - TenantId tenantId = calculatedField.getTenantId(); + private ListenableFuture fetchArgumentValue(CalculatedFieldCtx calculatedFieldCtx, EntityId targetEntityId, Argument argument) { + TenantId tenantId = calculatedFieldCtx.getTenantId(); EntityId argumentEntityId = argument.getEntityId(); EntityId entityId = EntityType.DEVICE_PROFILE.equals(argumentEntityId.getEntityType()) || EntityType.ASSET_PROFILE.equals(argumentEntityId.getEntityType()) ? targetEntityId : argumentEntityId; - if (CalculatedFieldType.LAST_RECORDS.equals(calculatedField.getType())) { + if (CalculatedFieldType.LAST_RECORDS.equals(calculatedFieldCtx.getCfType())) { return fetchLastRecords(tenantId, entityId, argument); } return fetchKvEntry(tenantId, entityId, argument); } private ListenableFuture fetchLastRecords(TenantId tenantId, EntityId entityId, Argument argument) { - long startTs = Math.max(argument.getStartTs(), 0); + long currentTime = System.currentTimeMillis(); long timeWindow = argument.getTimeWindow() == 0 ? System.currentTimeMillis() : argument.getTimeWindow(); - long endTs = startTs + timeWindow; + long startTs = currentTime - timeWindow; int limit = argument.getLimit() == 0 ? MAX_LAST_RECORDS_VALUE : argument.getLimit(); - ReadTsKvQuery query = new BaseReadTsKvQuery(argument.getKey(), startTs, endTs, 0, limit, Aggregation.NONE); + ReadTsKvQuery query = new BaseReadTsKvQuery(argument.getKey(), startTs, currentTime, 0, limit, Aggregation.NONE); ListenableFuture> lastRecordsFuture = timeseriesService.findAll(tenantId, entityId, List.of(query)); - return Futures.transform(lastRecordsFuture, ArgumentEntry::createArgumentEntry, calculatedFieldExecutor); + return Futures.transform(lastRecordsFuture, ArgumentEntry::createLastRecordsArgument, calculatedFieldExecutor); } private ListenableFuture fetchKvEntry(TenantId tenantId, EntityId entityId, Argument argument) { @@ -374,7 +382,7 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas calculatedFieldExecutor); default -> throw new IllegalArgumentException("Invalid argument type '" + argument.getType() + "'."); }; - return Futures.transform(kvEntryFuture, kvEntry -> ArgumentEntry.createArgumentEntry(kvEntry.orElse(null)), calculatedFieldExecutor); + return Futures.transform(kvEntryFuture, kvEntry -> ArgumentEntry.createSingleValueArgument(kvEntry.orElse(null)), calculatedFieldExecutor); } private KvEntry createDefaultKvEntry(Argument argument) { @@ -389,32 +397,32 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas return new StringDataEntry(key, defaultValue); } - private void updateOrInitializeState(CalculatedField calculatedField, EntityId entityId, Map argumentValues) { - CalculatedFieldCtxId ctxId = new CalculatedFieldCtxId(calculatedField.getUuidId(), entityId.getId()); - CalculatedFieldCtx calculatedFieldCtx = states.computeIfAbsent(ctxId, ctx -> new CalculatedFieldCtx(ctxId, null)); + private void updateOrInitializeState(CalculatedFieldCtx calculatedFieldCtx, EntityId entityId, Map argumentValues) { + CalculatedFieldEntityCtxId entityCtxId = new CalculatedFieldEntityCtxId(calculatedFieldCtx.getCfId().getId(), entityId.getId()); + CalculatedFieldEntityCtx calculatedFieldEntityCtx = states.computeIfAbsent(entityCtxId, ctx -> new CalculatedFieldEntityCtx(entityCtxId, null)); - CalculatedFieldState state = calculatedFieldCtx.getState(); + CalculatedFieldState state = calculatedFieldEntityCtx.getState(); if (state == null) { - state = createStateByType(calculatedField.getType()); + state = createStateByType(calculatedFieldCtx.getCfType()); } state.initState(argumentValues); - calculatedFieldCtx.setState(state); - states.put(ctxId, calculatedFieldCtx); - rocksDBService.put(JacksonUtil.writeValueAsString(ctxId), JacksonUtil.writeValueAsString(calculatedFieldCtx)); + calculatedFieldEntityCtx.setState(state); + states.put(entityCtxId, calculatedFieldEntityCtx); + rocksDBService.put(JacksonUtil.writeValueAsString(entityCtxId), JacksonUtil.writeValueAsString(calculatedFieldEntityCtx)); - ListenableFuture resultFuture = state.performCalculation(buildCalculationContext(calculatedField)); + ListenableFuture resultFuture = state.performCalculation(calculatedFieldCtx); Futures.addCallback(resultFuture, new FutureCallback<>() { @Override public void onSuccess(CalculatedFieldResult result) { if (result != null) { - pushMsgToRuleEngine(calculatedField.getTenantId(), entityId, result); + pushMsgToRuleEngine(calculatedFieldCtx.getTenantId(), entityId, result); } } @Override public void onFailure(Throwable t) { - log.warn("[{}] Failed to perform calculation. entityId: [{}]", calculatedField.getId(), entityId, t); + log.warn("[{}] Failed to perform calculation. entityId: [{}]", calculatedFieldCtx.getCfId(), entityId, t); } }, MoreExecutors.directExecutor()); @@ -433,17 +441,6 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas } } - private CalculationContext buildCalculationContext(CalculatedField calculatedField) { - CalculatedFieldConfiguration configuration = calculatedField.getConfiguration(); - return CalculationContext.builder() - .tenantId(calculatedField.getTenantId()) - .arguments(configuration.getArguments()) - .output(configuration.getOutput()) - .expression(configuration.getExpression()) - .tbelInvokeService(tbelInvokeService) - .build(); - } - private ObjectNode createJsonPayload(CalculatedFieldResult calculatedFieldResult) { ObjectNode payload = JacksonUtil.newObjectNode(); Map resultMap = calculatedFieldResult.getResultMap(); diff --git a/application/src/main/java/org/thingsboard/server/service/cf/ctx/CalculatedFieldCtx.java b/application/src/main/java/org/thingsboard/server/service/cf/ctx/CalculatedFieldEntityCtx.java similarity index 79% rename from application/src/main/java/org/thingsboard/server/service/cf/ctx/CalculatedFieldCtx.java rename to application/src/main/java/org/thingsboard/server/service/cf/ctx/CalculatedFieldEntityCtx.java index 4b2a6c918f..7a8384b6bf 100644 --- a/application/src/main/java/org/thingsboard/server/service/cf/ctx/CalculatedFieldCtx.java +++ b/application/src/main/java/org/thingsboard/server/service/cf/ctx/CalculatedFieldEntityCtx.java @@ -19,15 +19,15 @@ import lombok.Data; import org.thingsboard.server.service.cf.ctx.state.CalculatedFieldState; @Data -public class CalculatedFieldCtx { +public class CalculatedFieldEntityCtx { - private CalculatedFieldCtxId id; + private CalculatedFieldEntityCtxId id; private CalculatedFieldState state; - public CalculatedFieldCtx() { + public CalculatedFieldEntityCtx() { } - public CalculatedFieldCtx(CalculatedFieldCtxId id, CalculatedFieldState state) { + public CalculatedFieldEntityCtx(CalculatedFieldEntityCtxId id, CalculatedFieldState state) { this.id = id; this.state = state; } diff --git a/application/src/main/java/org/thingsboard/server/service/cf/ctx/CalculatedFieldCtxId.java b/application/src/main/java/org/thingsboard/server/service/cf/ctx/CalculatedFieldEntityCtxId.java similarity index 90% rename from application/src/main/java/org/thingsboard/server/service/cf/ctx/CalculatedFieldCtxId.java rename to application/src/main/java/org/thingsboard/server/service/cf/ctx/CalculatedFieldEntityCtxId.java index a316c54b76..f7c451efee 100644 --- a/application/src/main/java/org/thingsboard/server/service/cf/ctx/CalculatedFieldCtxId.java +++ b/application/src/main/java/org/thingsboard/server/service/cf/ctx/CalculatedFieldEntityCtxId.java @@ -17,5 +17,5 @@ package org.thingsboard.server.service.cf.ctx; import java.util.UUID; -public record CalculatedFieldCtxId(UUID cfId, UUID entityId) { +public record CalculatedFieldEntityCtxId(UUID cfId, UUID entityId) { } diff --git a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/ArgumentEntry.java b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/ArgumentEntry.java index 3097056d11..6218f38edf 100644 --- a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/ArgumentEntry.java +++ b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/ArgumentEntry.java @@ -15,25 +15,35 @@ */ package org.thingsboard.server.service.cf.ctx.state; +import com.fasterxml.jackson.annotation.JsonSubTypes; +import com.fasterxml.jackson.annotation.JsonTypeInfo; import org.thingsboard.server.common.data.kv.KvEntry; import org.thingsboard.server.common.data.kv.TsKvEntry; import java.util.List; +import java.util.stream.Collectors; +@JsonTypeInfo( + use = JsonTypeInfo.Id.NAME, + include = JsonTypeInfo.As.PROPERTY, + property = "type" +) +@JsonSubTypes({ + @JsonSubTypes.Type(value = SingleValueArgumentEntry.class, name = "SINGLE_VALUE"), + @JsonSubTypes.Type(value = LastRecordsArgumentEntry.class, name = "LAST_RECORDS") +}) public interface ArgumentEntry { + ArgumentType getType(); + Object getValue(); - static ArgumentEntry createArgumentEntry(KvEntry kvEntry) { - if (kvEntry instanceof TsKvEntry tsKvEntry) { - return new LastRecordsArgumentEntry(List.of(tsKvEntry)); - } else { - return new KvArgumentEntry(kvEntry); - } + static ArgumentEntry createSingleValueArgument(KvEntry kvEntry) { + return new SingleValueArgumentEntry(kvEntry.getValue()); } - static ArgumentEntry createArgumentEntry(List kvEntries) { - return new LastRecordsArgumentEntry(kvEntries); + static ArgumentEntry createLastRecordsArgument(List kvEntries) { + return new LastRecordsArgumentEntry(kvEntries.stream() .collect(Collectors.toMap(TsKvEntry::getTs, TsKvEntry::getValue))); } } diff --git a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/KvArgumentEntry.java b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/ArgumentType.java similarity index 72% rename from application/src/main/java/org/thingsboard/server/service/cf/ctx/state/KvArgumentEntry.java rename to application/src/main/java/org/thingsboard/server/service/cf/ctx/state/ArgumentType.java index 0bd21d452e..f2f0eac60d 100644 --- a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/KvArgumentEntry.java +++ b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/ArgumentType.java @@ -15,17 +15,6 @@ */ package org.thingsboard.server.service.cf.ctx.state; -import lombok.Data; -import org.thingsboard.server.common.data.kv.KvEntry; - -@Data -public class KvArgumentEntry implements ArgumentEntry { - - private final KvEntry kvEntry; - - @Override - public Object getValue() { - return kvEntry; - } - +public enum ArgumentType { + SINGLE_VALUE, LAST_RECORDS } diff --git a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/BaseCalculatedFieldState.java b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/BaseCalculatedFieldState.java index bac318a1b9..54df89e757 100644 --- a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/BaseCalculatedFieldState.java +++ b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/BaseCalculatedFieldState.java @@ -15,34 +15,36 @@ */ package org.thingsboard.server.service.cf.ctx.state; -import org.thingsboard.server.common.data.cf.configuration.Output; -import org.thingsboard.server.common.data.kv.KvEntry; -import org.thingsboard.server.service.cf.CalculatedFieldResult; - import java.util.HashMap; import java.util.Map; +import java.util.stream.Collectors; public abstract class BaseCalculatedFieldState implements CalculatedFieldState { - protected Map arguments; + protected Map arguments; public BaseCalculatedFieldState() { } + @Override + public Map getArguments() { + return this.arguments; + } + @Override public void initState(Map argumentValues) { if (arguments == null) { arguments = new HashMap<>(); } -// argumentValues.forEach((key, value) -> arguments.put(key, value.getKvEntry())); - } - - protected CalculatedFieldResult buildResult(Output output, Map resultMap) { - CalculatedFieldResult result = new CalculatedFieldResult(); - result.setType(output.getType()); - result.setScope(output.getScope()); - result.setResultMap(resultMap); - return result; + arguments.putAll( + argumentValues.entrySet().stream() + .peek(entry -> { + if (entry.getValue() instanceof LastRecordsArgumentEntry) { + throw new IllegalArgumentException("Last records argument entry is not allowed for single calculated field state"); + } + }) + .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)) + ); } } diff --git a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/CalculatedFieldCtx.java b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/CalculatedFieldCtx.java new file mode 100644 index 0000000000..10395a0d5d --- /dev/null +++ b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/CalculatedFieldCtx.java @@ -0,0 +1,72 @@ +/** + * Copyright © 2016-2024 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.service.cf.ctx.state; + +import lombok.Data; +import org.thingsboard.script.api.tbel.TbelInvokeService; +import org.thingsboard.server.common.data.cf.CalculatedField; +import org.thingsboard.server.common.data.cf.CalculatedFieldType; +import org.thingsboard.server.common.data.cf.configuration.Argument; +import org.thingsboard.server.common.data.cf.configuration.CalculatedFieldConfiguration; +import org.thingsboard.server.common.data.cf.configuration.Output; +import org.thingsboard.server.common.data.id.CalculatedFieldId; +import org.thingsboard.server.common.data.id.EntityId; +import org.thingsboard.server.common.data.id.TenantId; + +import java.util.Map; + +@Data +public class CalculatedFieldCtx { + + private CalculatedFieldId cfId; + private TenantId tenantId; + private EntityId entityId; + private CalculatedFieldType cfType; + private Map arguments; + private Output output; + private String expression; + private TbelInvokeService tbelInvokeService; + private CalculatedFieldScriptEngine calculatedFieldScriptEngine; + + public CalculatedFieldCtx(CalculatedField calculatedField, TbelInvokeService tbelInvokeService) { + this.cfId = calculatedField.getId(); + this.tenantId = calculatedField.getTenantId(); + this.entityId = calculatedField.getEntityId(); + this.cfType = calculatedField.getType(); + CalculatedFieldConfiguration configuration = calculatedField.getConfiguration(); + this.arguments = configuration.getArguments(); + this.output = configuration.getOutput(); + this.expression = configuration.getExpression(); + this.tbelInvokeService = tbelInvokeService; + if (CalculatedFieldType.SCRIPT.equals(calculatedField.getType())) { + this.calculatedFieldScriptEngine = initEngine(tenantId, expression, tbelInvokeService); + } + } + + private CalculatedFieldScriptEngine initEngine(TenantId tenantId, String expression, TbelInvokeService tbelInvokeService) { + if (tbelInvokeService == null) { + throw new IllegalArgumentException("TBEL script engine is disabled!"); + } + + return new CalculatedFieldTbelScriptEngine( + tenantId, + tbelInvokeService, + expression, + arguments.keySet().toArray(new String[0]) + ); + } + +} diff --git a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/CalculatedFieldScriptEngine.java b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/CalculatedFieldScriptEngine.java index 6b54536019..212d971398 100644 --- a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/CalculatedFieldScriptEngine.java +++ b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/CalculatedFieldScriptEngine.java @@ -16,13 +16,16 @@ package org.thingsboard.server.service.cf.ctx.state; import com.google.common.util.concurrent.ListenableFuture; -import org.thingsboard.server.common.data.kv.KvEntry; import java.util.Map; public interface CalculatedFieldScriptEngine { - ListenableFuture executeScriptAsync(Map arguments); + ListenableFuture executeScriptAsync(Map arguments); + + ListenableFuture> executeToMapAsync(Map arguments); + + ListenableFuture> executeToMapTransform(Object result); void destroy(); diff --git a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/CalculatedFieldState.java b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/CalculatedFieldState.java index e4c4440921..f0882cbea4 100644 --- a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/CalculatedFieldState.java +++ b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/CalculatedFieldState.java @@ -41,12 +41,14 @@ public interface CalculatedFieldState { @JsonIgnore CalculatedFieldType getType(); - default boolean isValid(Map argumentValues, Map arguments) { - return argumentValues.keySet().containsAll(arguments.keySet()); + Map getArguments(); + + default boolean isValid(Map arguments) { + return getArguments().keySet().containsAll(arguments.keySet()); } void initState(Map argumentValues); - ListenableFuture performCalculation(CalculationContext ctx); + ListenableFuture performCalculation(CalculatedFieldCtx ctx); } diff --git a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/CalculatedFieldTbelScriptEngine.java b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/CalculatedFieldTbelScriptEngine.java index 5cc58b2a95..363eefd44e 100644 --- a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/CalculatedFieldTbelScriptEngine.java +++ b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/CalculatedFieldTbelScriptEngine.java @@ -19,6 +19,7 @@ import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.MoreExecutors; import lombok.extern.slf4j.Slf4j; +import org.thingsboard.common.util.JacksonUtil; import org.thingsboard.script.api.ScriptType; import org.thingsboard.script.api.tbel.TbelInvokeService; import org.thingsboard.server.common.data.id.TenantId; @@ -52,9 +53,9 @@ public class CalculatedFieldTbelScriptEngine implements CalculatedFieldScriptEng } @Override - public ListenableFuture executeScriptAsync(Map arguments) { + public ListenableFuture executeScriptAsync(Map arguments) { log.trace("execute script async, arguments {}", arguments); - Object[] args = arguments.values().stream().map(KvEntry::getValue).toArray(); + Object[] args = arguments.values().stream().map(ArgumentEntry::getValue).toArray(); return Futures.transformAsync(tbelInvokeService.invokeScript(tenantId, null, this.scriptId, args), o -> { try { @@ -71,6 +72,19 @@ public class CalculatedFieldTbelScriptEngine implements CalculatedFieldScriptEng }, MoreExecutors.directExecutor()); } + @Override + public ListenableFuture> executeToMapAsync(Map arguments) { + return Futures.transformAsync(executeScriptAsync(arguments), this::executeToMapTransform, MoreExecutors.directExecutor()); + } + + @Override + public ListenableFuture> executeToMapTransform(Object result) { + if (result instanceof Map) { + return Futures.immediateFuture((Map) result); + } + throw new IllegalArgumentException("Wrong result type: [" + result.getClass().getName() + "]"); + } + @Override public void destroy() { tbelInvokeService.release(this.scriptId); diff --git a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/LastRecordsArgumentEntry.java b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/LastRecordsArgumentEntry.java index 8729f022fa..39da1838bc 100644 --- a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/LastRecordsArgumentEntry.java +++ b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/LastRecordsArgumentEntry.java @@ -15,19 +15,27 @@ */ package org.thingsboard.server.service.cf.ctx.state; +import lombok.AllArgsConstructor; import lombok.Data; -import org.thingsboard.server.common.data.kv.TsKvEntry; +import lombok.NoArgsConstructor; -import java.util.List; +import java.util.Map; @Data +@NoArgsConstructor +@AllArgsConstructor public class LastRecordsArgumentEntry implements ArgumentEntry { - private final List kvEntries; + private Map tsRecords; + + @Override + public ArgumentType getType() { + return ArgumentType.LAST_RECORDS; + } @Override public Object getValue() { - return kvEntries; + return tsRecords.values(); } } diff --git a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/LastRecordsCalculatedFieldState.java b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/LastRecordsCalculatedFieldState.java index 0428d0823d..437707bfdb 100644 --- a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/LastRecordsCalculatedFieldState.java +++ b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/LastRecordsCalculatedFieldState.java @@ -15,14 +15,10 @@ */ package org.thingsboard.server.service.cf.ctx.state; -import aj.org.objectweb.asm.TypeReference; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import lombok.Data; -import org.thingsboard.common.util.JacksonUtil; import org.thingsboard.server.common.data.cf.CalculatedFieldType; -import org.thingsboard.server.common.data.cf.configuration.Argument; -import org.thingsboard.server.common.data.cf.configuration.CalculatedFieldConfiguration; import org.thingsboard.server.common.data.cf.configuration.Output; import org.thingsboard.server.common.data.kv.TsKvEntry; import org.thingsboard.server.service.cf.CalculatedFieldResult; @@ -37,8 +33,6 @@ import java.util.stream.Collectors; @Data public class LastRecordsCalculatedFieldState extends BaseCalculatedFieldState { - private Map> arguments; - public LastRecordsCalculatedFieldState() { } @@ -47,36 +41,46 @@ public class LastRecordsCalculatedFieldState extends BaseCalculatedFieldState { return CalculatedFieldType.LAST_RECORDS; } - @Override public void initState(Map argumentValues) { if (arguments == null) { arguments = new HashMap<>(); } argumentValues.forEach((key, argumentEntry) -> { - List tsKvEntryList = arguments.computeIfAbsent(key, k -> new ArrayList<>()); -// tsKvEntryList.addAll(argumentEntry.getKvEntries()); + LastRecordsArgumentEntry existingArgumentEntry = (LastRecordsArgumentEntry) + arguments.computeIfAbsent(key, k -> new LastRecordsArgumentEntry(new HashMap<>())); + if (argumentEntry instanceof LastRecordsArgumentEntry lastRecordsArgumentEntry) { + existingArgumentEntry.getTsRecords().putAll(lastRecordsArgumentEntry.getTsRecords()); + } else if (argumentEntry instanceof SingleValueArgumentEntry singleValueArgumentEntry + && singleValueArgumentEntry.getValue() instanceof TsKvEntry tsKvEntry) { + existingArgumentEntry.getTsRecords().put(tsKvEntry.getTs(), tsKvEntry.getValue()); + } }); } - @Override - public ListenableFuture performCalculation(CalculationContext ctx) { + public ListenableFuture performCalculation(CalculatedFieldCtx ctx) { Map resultMap = new HashMap<>(); - arguments.replaceAll((key, entries) -> { + arguments.replaceAll((key, argumentEntry) -> { int limit = ctx.getArguments().get(key).getLimit(); - List limitedEntries = entries.stream() - .sorted(Comparator.comparingLong(TsKvEntry::getTs).reversed()) - .limit(limit) - .collect(Collectors.toList()); - Map valueWithTs = limitedEntries.stream() - .collect(Collectors.toMap(TsKvEntry::getTs, TsKvEntry::getValue)); - resultMap.put(key, valueWithTs); + // TODO: implement removing if size > limit + + +// List limitedEntries = entries.stream() +// .sorted(Comparator.comparingLong(TsKvEntry::getTs).reversed()) +// .limit(limit) +// .collect(Collectors.toList()); +// +// Map valueWithTs = limitedEntries.stream() +// .collect(Collectors.toMap(TsKvEntry::getTs, TsKvEntry::getValue)); +// resultMap.put(key, valueWithTs); - return limitedEntries; +// return new LastRecordsArgumentEntry(limitedEntries); + return null; }); - return Futures.immediateFuture(buildResult(ctx.getOutput(), resultMap)); + Output output = ctx.getOutput(); + return Futures.immediateFuture(new CalculatedFieldResult(output.getType(), output.getScope(), resultMap)); } } diff --git a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/ScriptCalculatedFieldState.java b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/ScriptCalculatedFieldState.java index 047bfd0f8c..16905930d3 100644 --- a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/ScriptCalculatedFieldState.java +++ b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/ScriptCalculatedFieldState.java @@ -15,68 +15,37 @@ */ package org.thingsboard.server.service.cf.ctx.state; -import com.fasterxml.jackson.annotation.JsonIgnore; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.MoreExecutors; import lombok.Data; import lombok.extern.slf4j.Slf4j; -import org.thingsboard.common.util.JacksonUtil; -import org.thingsboard.script.api.tbel.TbelInvokeService; import org.thingsboard.server.common.data.cf.CalculatedFieldType; -import org.thingsboard.server.common.data.cf.configuration.CalculatedFieldConfiguration; import org.thingsboard.server.common.data.cf.configuration.Output; -import org.thingsboard.server.common.data.id.TenantId; -import org.thingsboard.server.common.data.kv.KvEntry; import org.thingsboard.server.service.cf.CalculatedFieldResult; -import java.util.HashMap; import java.util.Map; @Data @Slf4j public class ScriptCalculatedFieldState extends BaseCalculatedFieldState { - @JsonIgnore - private CalculatedFieldScriptEngine calculatedFieldScriptEngine; - @Override public CalculatedFieldType getType() { return CalculatedFieldType.SCRIPT; } - @Override - public ListenableFuture performCalculation(CalculationContext ctx) { - if (isValid(this.arguments, ctx.getArguments())) { - if (calculatedFieldScriptEngine == null) { - initEngine(ctx.getTenantId(), ctx.getExpression(), ctx.getTbelInvokeService()); - } - - ListenableFuture resultFuture = calculatedFieldScriptEngine.executeScriptAsync(this.arguments); - - return Futures.transform(resultFuture, result -> { - Map resultMap = result instanceof Map - ? JacksonUtil.convertValue(result, Map.class) - : new HashMap<>(); - - return buildResult(ctx.getOutput(), resultMap); - }, MoreExecutors.directExecutor()); + public ListenableFuture performCalculation(CalculatedFieldCtx ctx) { + Output output = ctx.getOutput(); + if (isValid(ctx.getArguments())) { + ListenableFuture> resultFuture = ctx.getCalculatedFieldScriptEngine().executeToMapAsync(this.arguments); + return Futures.transform(resultFuture, + result -> new CalculatedFieldResult(output.getType(), output.getScope(), result), + MoreExecutors.directExecutor() + ); } return null; } - private void initEngine(TenantId tenantId, String expression, TbelInvokeService tbelInvokeService) { - if (tbelInvokeService == null) { - throw new IllegalArgumentException("TBEL script engine is disabled!"); - } - - calculatedFieldScriptEngine = new CalculatedFieldTbelScriptEngine( - tenantId, - tbelInvokeService, - expression, - this.arguments.keySet().toArray(new String[0]) - ); - } - } diff --git a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/SimpleCalculatedFieldState.java b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/SimpleCalculatedFieldState.java index fbdd1eb354..64eb409d3a 100644 --- a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/SimpleCalculatedFieldState.java +++ b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/SimpleCalculatedFieldState.java @@ -22,7 +22,6 @@ import net.objecthunter.exp4j.Expression; import net.objecthunter.exp4j.ExpressionBuilder; import org.thingsboard.server.common.data.cf.CalculatedFieldType; import org.thingsboard.server.common.data.cf.configuration.Argument; -import org.thingsboard.server.common.data.cf.configuration.CalculatedFieldConfiguration; import org.thingsboard.server.common.data.cf.configuration.Output; import org.thingsboard.server.common.data.kv.KvEntry; import org.thingsboard.server.service.cf.CalculatedFieldResult; @@ -39,8 +38,8 @@ public class SimpleCalculatedFieldState extends BaseCalculatedFieldState { } @Override - public ListenableFuture performCalculation(CalculationContext ctx) { - if (isValid(this.arguments, ctx.getArguments())) { + public ListenableFuture performCalculation(CalculatedFieldCtx ctx) { + if (isValid(ctx.getArguments())) { String expression = ctx.getExpression(); ThreadLocal customExpression = new ThreadLocal<>(); var expr = customExpression.get(); @@ -52,13 +51,13 @@ public class SimpleCalculatedFieldState extends BaseCalculatedFieldState { customExpression.set(expr); } Map variables = new HashMap<>(); - this.arguments.forEach((k, v) -> variables.put(k, Double.parseDouble(v.getValueAsString()))); + this.arguments.forEach((k, v) -> variables.put(k, Double.parseDouble(((KvEntry) v.getValue()).getValueAsString()))); expr.setVariables(variables); double expressionResult = expr.evaluate(); Output output = ctx.getOutput(); - return Futures.immediateFuture(buildResult(output, Map.of(output.getName(), expressionResult))); + return Futures.immediateFuture(new CalculatedFieldResult(output.getType(), output.getScope(), Map.of(output.getName(), expressionResult))); } return null; } diff --git a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/CalculationContext.java b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/SingleValueArgumentEntry.java similarity index 51% rename from application/src/main/java/org/thingsboard/server/service/cf/ctx/state/CalculationContext.java rename to application/src/main/java/org/thingsboard/server/service/cf/ctx/state/SingleValueArgumentEntry.java index aaabaec0d4..504d213748 100644 --- a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/CalculationContext.java +++ b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/SingleValueArgumentEntry.java @@ -15,25 +15,25 @@ */ package org.thingsboard.server.service.cf.ctx.state; -import lombok.Builder; +import lombok.AllArgsConstructor; import lombok.Data; -import org.thingsboard.script.api.tbel.TbelInvokeService; -import org.thingsboard.server.common.data.cf.configuration.Argument; -import org.thingsboard.server.common.data.cf.configuration.CalculatedFieldConfiguration; -import org.thingsboard.server.common.data.cf.configuration.Output; -import org.thingsboard.server.common.data.id.TenantId; -import org.thingsboard.server.common.data.kv.KvEntry; - -import java.util.Map; +import lombok.NoArgsConstructor; @Data -@Builder -public class CalculationContext { +@NoArgsConstructor +@AllArgsConstructor +public class SingleValueArgumentEntry implements ArgumentEntry { + + private Object value; + + @Override + public ArgumentType getType() { + return ArgumentType.SINGLE_VALUE; + } - private TenantId tenantId; - private Map arguments; - private Output output; - private String expression; - private TbelInvokeService tbelInvokeService; + @Override + public Object getValue() { + return value; + } } diff --git a/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbCoreConsumerService.java b/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbCoreConsumerService.java index 3caaab6613..392e2637d1 100644 --- a/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbCoreConsumerService.java +++ b/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbCoreConsumerService.java @@ -685,7 +685,7 @@ public class DefaultTbCoreConsumerService extends AbstractConsumerService future = calculatedFieldsExecutor.submit(() -> calculatedFieldExecutionService.onEntityTypeChanged(profileUpdateMsg, callback)); + ListenableFuture future = calculatedFieldsExecutor.submit(() -> calculatedFieldExecutionService.onEntityProfileChanged(profileUpdateMsg, callback)); DonAsynchron.withCallback(future, __ -> callback.onSuccess(), t -> { diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/cf/configuration/Argument.java b/common/data/src/main/java/org/thingsboard/server/common/data/cf/configuration/Argument.java index 0d70591a38..f34f5e9cb7 100644 --- a/common/data/src/main/java/org/thingsboard/server/common/data/cf/configuration/Argument.java +++ b/common/data/src/main/java/org/thingsboard/server/common/data/cf/configuration/Argument.java @@ -29,7 +29,6 @@ public class Argument { private String defaultValue; private int limit; - private long startTs; private long timeWindow; } diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/cf/configuration/BaseCalculatedFieldConfiguration.java b/common/data/src/main/java/org/thingsboard/server/common/data/cf/configuration/BaseCalculatedFieldConfiguration.java index f7cc53b7cf..f311fb737b 100644 --- a/common/data/src/main/java/org/thingsboard/server/common/data/cf/configuration/BaseCalculatedFieldConfiguration.java +++ b/common/data/src/main/java/org/thingsboard/server/common/data/cf/configuration/BaseCalculatedFieldConfiguration.java @@ -102,7 +102,6 @@ public abstract class BaseCalculatedFieldConfiguration implements CalculatedFiel argumentNode.put("scope", String.valueOf(argument.getScope())); argumentNode.put("defaultValue", argument.getDefaultValue()); argumentNode.put("limit", String.valueOf(argument.getLimit())); - argumentNode.put("startTs", String.valueOf(argument.getStartTs())); argumentNode.put("timeWindow", String.valueOf(argument.getTimeWindow())); }); @@ -153,9 +152,6 @@ public abstract class BaseCalculatedFieldConfiguration implements CalculatedFiel if (argumentNode.hasNonNull("limit")) { argument.setLimit(argumentNode.get("limit").asInt()); } - if (argumentNode.hasNonNull("startTs")) { - argument.setStartTs(argumentNode.get("startTs").asLong()); - } if (argumentNode.hasNonNull("timeWindow")) { argument.setTimeWindow(argumentNode.get("timeWindow").asInt()); }