diff --git a/application/src/main/java/org/thingsboard/server/service/cf/DefaultCalculatedFieldExecutionService.java b/application/src/main/java/org/thingsboard/server/service/cf/DefaultCalculatedFieldExecutionService.java index a985f093c9..322443c848 100644 --- a/application/src/main/java/org/thingsboard/server/service/cf/DefaultCalculatedFieldExecutionService.java +++ b/application/src/main/java/org/thingsboard/server/service/cf/DefaultCalculatedFieldExecutionService.java @@ -191,19 +191,19 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas switch (entityId.getEntityType()) { case ASSET, DEVICE -> { log.info("Initializing state for entity: tenantId=[{}], entityId=[{}]", tenantId, entityId); - initializeStateForEntity(tenantId, cf, entityId, callback); + initializeStateForEntity(cf, entityId, callback); } case ASSET_PROFILE -> { log.info("Initializing state for all assets in profile: tenantId=[{}], assetProfileId=[{}]", tenantId, entityId); PageDataIterable assetIds = new PageDataIterable<>(pageLink -> assetService.findAssetIdsByTenantIdAndAssetProfileId(tenantId, (AssetProfileId) entityId, pageLink), initFetchPackSize); - assetIds.forEach(assetId -> initializeStateForEntity(tenantId, cf, assetId, callback)); + assetIds.forEach(assetId -> initializeStateForEntity(cf, assetId, callback)); } case DEVICE_PROFILE -> { log.info("Initializing state for all devices in profile: tenantId=[{}], deviceProfileId=[{}]", tenantId, entityId); PageDataIterable deviceIds = new PageDataIterable<>(pageLink -> deviceService.findDeviceIdsByTenantIdAndDeviceProfileId(tenantId, (DeviceProfileId) entityId, pageLink), initFetchPackSize); - deviceIds.forEach(deviceId -> initializeStateForEntity(tenantId, cf, deviceId, callback)); + deviceIds.forEach(deviceId -> initializeStateForEntity(cf, deviceId, callback)); } default -> throw new IllegalArgumentException("Entity type '" + calculatedFieldId.getEntityType() + "' does not support calculated fields."); @@ -226,17 +226,7 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas try { CalculatedField calculatedField = calculatedFields.computeIfAbsent(calculatedFieldId, id -> calculatedFieldService.findById(tenantId, id)); Map argumentValues = updatedTelemetry.entrySet().stream() - .collect(Collectors.toMap( - Map.Entry::getKey, - entry -> { - ArgumentEntry argumentEntry = new ArgumentEntry(); - argumentEntry.setKvEntry(entry.getValue()); - if (entry.getValue() instanceof TsKvEntry) { - argumentEntry.setKvEntries(List.of((TsKvEntry) entry.getValue())); - } - return argumentEntry; - } - )); + .collect(Collectors.toMap(Map.Entry::getKey, entry -> ArgumentEntry.createArgumentEntry(entry.getValue()))); updateOrInitializeState(calculatedField, calculatedField.getEntityId(), argumentValues); log.info("Successfully updated time series for calculatedFieldId: [{}]", calculatedFieldId); } catch (Exception e) { @@ -254,11 +244,6 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas log.info("Received EntityProfileUpdateMsgProto for processing: tenantId=[{}], entityId=[{}]", tenantId, entityId); - List cfIdsOfOldProfile = calculatedFieldService.findCalculatedFieldIdsByEntityId(tenantId, oldProfileId); - cfIdsOfOldProfile.forEach(id -> states.remove(new CalculatedFieldCtxId(id.getId(), entityId.getId()))); - List ctxIdsToDelete = cfIdsOfOldProfile.stream().map(cfId -> JacksonUtil.writeValueAsString(new CalculatedFieldCtxId(cfId.getId(), entityId.getId()))).toList(); - rocksDBService.deleteAll(ctxIdsToDelete); - calculatedFieldService.findCalculatedFieldIdsByEntityId(tenantId, oldProfileId) .forEach(cfId -> { CalculatedFieldCtxId ctxId = new CalculatedFieldCtxId(cfId.getId(), entityId.getId()); @@ -269,7 +254,7 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas calculatedFieldService.findCalculatedFieldIdsByEntityId(tenantId, newProfileId) .stream() .map(cfId -> calculatedFields.computeIfAbsent(cfId, id -> calculatedFieldService.findById(tenantId, id))) - .forEach(cf -> initializeStateForEntity(tenantId, cf, entityId, callback)); + .forEach(cf -> initializeStateForEntity(cf, entityId, callback)); } catch (Exception e) { log.trace("Failed to process entity type update msg: [{}]", proto, e); } @@ -328,11 +313,11 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas states.keySet().removeIf(ctxId -> calculatedFields.keySet().stream().noneMatch(id -> ctxId.cfId().equals(id.getId()))); } - private void initializeStateForEntity(TenantId tenantId, CalculatedField calculatedField, EntityId entityId, TbCallback callback) { + private void initializeStateForEntity(CalculatedField calculatedField, EntityId entityId, TbCallback callback) { Map arguments = calculatedField.getConfiguration().getArguments(); Map argumentValues = new HashMap<>(); AtomicInteger remaining = new AtomicInteger(arguments.size()); - arguments.forEach((key, argument) -> Futures.addCallback(fetchArgumentValue(calculatedField, argument), new FutureCallback<>() { + arguments.forEach((key, argument) -> Futures.addCallback(fetchArgumentValue(calculatedField, entityId, argument), new FutureCallback<>() { @Override public void onSuccess(ArgumentEntry result) { argumentValues.put(key, result); @@ -349,12 +334,11 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas }, calculatedFieldCallbackExecutor)); } - private ListenableFuture fetchArgumentValue(CalculatedField calculatedField, Argument argument) { + private ListenableFuture fetchArgumentValue(CalculatedField calculatedField, EntityId targetEntityId, Argument argument) { TenantId tenantId = calculatedField.getTenantId(); - EntityId cfEntityId = calculatedField.getEntityId(); EntityId argumentEntityId = argument.getEntityId(); EntityId entityId = EntityType.DEVICE_PROFILE.equals(argumentEntityId.getEntityType()) || EntityType.ASSET_PROFILE.equals(argumentEntityId.getEntityType()) - ? cfEntityId + ? targetEntityId : argumentEntityId; if (CalculatedFieldType.LAST_RECORDS.equals(calculatedField.getType())) { return fetchLastRecords(tenantId, entityId, argument); @@ -371,11 +355,7 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas ReadTsKvQuery query = new BaseReadTsKvQuery(argument.getKey(), startTs, endTs, 0, limit, Aggregation.NONE); ListenableFuture> lastRecordsFuture = timeseriesService.findAll(tenantId, entityId, List.of(query)); - return Futures.transform(lastRecordsFuture, lastRecords -> { - ArgumentEntry argumentEntry = new ArgumentEntry(); - argumentEntry.setKvEntries(lastRecords); - return argumentEntry; - }, calculatedFieldExecutor); + return Futures.transform(lastRecordsFuture, ArgumentEntry::createArgumentEntry, calculatedFieldExecutor); } private ListenableFuture fetchKvEntry(TenantId tenantId, EntityId entityId, Argument argument) { @@ -394,13 +374,7 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas calculatedFieldExecutor); default -> throw new IllegalArgumentException("Invalid argument type '" + argument.getType() + "'."); }; - return Futures.transform(kvEntryFuture, kvEntry -> { - ArgumentEntry argumentEntry = new ArgumentEntry(); - if (kvEntry.isPresent()) { - argumentEntry.setKvEntry(kvEntry.orElse(null)); - } - return argumentEntry; - }, calculatedFieldExecutor); + return Futures.transform(kvEntryFuture, kvEntry -> ArgumentEntry.createArgumentEntry(kvEntry.orElse(null)), calculatedFieldExecutor); } private KvEntry createDefaultKvEntry(Argument argument) { @@ -429,12 +403,7 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas states.put(ctxId, calculatedFieldCtx); rocksDBService.put(JacksonUtil.writeValueAsString(ctxId), JacksonUtil.writeValueAsString(calculatedFieldCtx)); - CalculationContext ctx = CalculationContext.builder() - .tenantId(calculatedField.getTenantId()) - .configuration(calculatedField.getConfiguration()) - .tbelInvokeService(tbelInvokeService) - .build(); - ListenableFuture resultFuture = state.performCalculation(ctx); + ListenableFuture resultFuture = state.performCalculation(buildCalculationContext(calculatedField)); Futures.addCallback(resultFuture, new FutureCallback<>() { @Override public void onSuccess(CalculatedFieldResult result) { @@ -464,6 +433,17 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas } } + private CalculationContext buildCalculationContext(CalculatedField calculatedField) { + CalculatedFieldConfiguration configuration = calculatedField.getConfiguration(); + return CalculationContext.builder() + .tenantId(calculatedField.getTenantId()) + .arguments(configuration.getArguments()) + .output(configuration.getOutput()) + .expression(configuration.getExpression()) + .tbelInvokeService(tbelInvokeService) + .build(); + } + private ObjectNode createJsonPayload(CalculatedFieldResult calculatedFieldResult) { ObjectNode payload = JacksonUtil.newObjectNode(); Map resultMap = calculatedFieldResult.getResultMap(); diff --git a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/ArgumentEntry.java b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/ArgumentEntry.java index 29e3417bde..3097056d11 100644 --- a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/ArgumentEntry.java +++ b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/ArgumentEntry.java @@ -15,16 +15,25 @@ */ package org.thingsboard.server.service.cf.ctx.state; -import lombok.Data; import org.thingsboard.server.common.data.kv.KvEntry; import org.thingsboard.server.common.data.kv.TsKvEntry; import java.util.List; -@Data -public class ArgumentEntry { +public interface ArgumentEntry { - private KvEntry kvEntry; - private List kvEntries; + Object getValue(); + + static ArgumentEntry createArgumentEntry(KvEntry kvEntry) { + if (kvEntry instanceof TsKvEntry tsKvEntry) { + return new LastRecordsArgumentEntry(List.of(tsKvEntry)); + } else { + return new KvArgumentEntry(kvEntry); + } + } + + static ArgumentEntry createArgumentEntry(List kvEntries) { + return new LastRecordsArgumentEntry(kvEntries); + } } diff --git a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/BaseCalculatedFieldState.java b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/BaseCalculatedFieldState.java new file mode 100644 index 0000000000..bac318a1b9 --- /dev/null +++ b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/BaseCalculatedFieldState.java @@ -0,0 +1,48 @@ +/** + * Copyright © 2016-2024 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.service.cf.ctx.state; + +import org.thingsboard.server.common.data.cf.configuration.Output; +import org.thingsboard.server.common.data.kv.KvEntry; +import org.thingsboard.server.service.cf.CalculatedFieldResult; + +import java.util.HashMap; +import java.util.Map; + +public abstract class BaseCalculatedFieldState implements CalculatedFieldState { + + protected Map arguments; + + public BaseCalculatedFieldState() { + } + + @Override + public void initState(Map argumentValues) { + if (arguments == null) { + arguments = new HashMap<>(); + } +// argumentValues.forEach((key, value) -> arguments.put(key, value.getKvEntry())); + } + + protected CalculatedFieldResult buildResult(Output output, Map resultMap) { + CalculatedFieldResult result = new CalculatedFieldResult(); + result.setType(output.getType()); + result.setScope(output.getScope()); + result.setResultMap(resultMap); + return result; + } + +} diff --git a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/CalculationContext.java b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/CalculationContext.java index 656763ea48..aaabaec0d4 100644 --- a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/CalculationContext.java +++ b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/CalculationContext.java @@ -18,7 +18,9 @@ package org.thingsboard.server.service.cf.ctx.state; import lombok.Builder; import lombok.Data; import org.thingsboard.script.api.tbel.TbelInvokeService; +import org.thingsboard.server.common.data.cf.configuration.Argument; import org.thingsboard.server.common.data.cf.configuration.CalculatedFieldConfiguration; +import org.thingsboard.server.common.data.cf.configuration.Output; import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.kv.KvEntry; @@ -29,7 +31,9 @@ import java.util.Map; public class CalculationContext { private TenantId tenantId; - private CalculatedFieldConfiguration configuration; + private Map arguments; + private Output output; + private String expression; private TbelInvokeService tbelInvokeService; } diff --git a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/KvArgumentEntry.java b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/KvArgumentEntry.java new file mode 100644 index 0000000000..0bd21d452e --- /dev/null +++ b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/KvArgumentEntry.java @@ -0,0 +1,31 @@ +/** + * Copyright © 2016-2024 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.service.cf.ctx.state; + +import lombok.Data; +import org.thingsboard.server.common.data.kv.KvEntry; + +@Data +public class KvArgumentEntry implements ArgumentEntry { + + private final KvEntry kvEntry; + + @Override + public Object getValue() { + return kvEntry; + } + +} diff --git a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/LastRecordsArgumentEntry.java b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/LastRecordsArgumentEntry.java new file mode 100644 index 0000000000..8729f022fa --- /dev/null +++ b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/LastRecordsArgumentEntry.java @@ -0,0 +1,33 @@ +/** + * Copyright © 2016-2024 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.service.cf.ctx.state; + +import lombok.Data; +import org.thingsboard.server.common.data.kv.TsKvEntry; + +import java.util.List; + +@Data +public class LastRecordsArgumentEntry implements ArgumentEntry { + + private final List kvEntries; + + @Override + public Object getValue() { + return kvEntries; + } + +} diff --git a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/LastRecordsCalculatedFieldState.java b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/LastRecordsCalculatedFieldState.java index 2f26d71f91..0428d0823d 100644 --- a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/LastRecordsCalculatedFieldState.java +++ b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/LastRecordsCalculatedFieldState.java @@ -15,9 +15,11 @@ */ package org.thingsboard.server.service.cf.ctx.state; +import aj.org.objectweb.asm.TypeReference; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import lombok.Data; +import org.thingsboard.common.util.JacksonUtil; import org.thingsboard.server.common.data.cf.CalculatedFieldType; import org.thingsboard.server.common.data.cf.configuration.Argument; import org.thingsboard.server.common.data.cf.configuration.CalculatedFieldConfiguration; @@ -33,7 +35,7 @@ import java.util.Map; import java.util.stream.Collectors; @Data -public class LastRecordsCalculatedFieldState implements CalculatedFieldState { +public class LastRecordsCalculatedFieldState extends BaseCalculatedFieldState { private Map> arguments; @@ -53,21 +55,16 @@ public class LastRecordsCalculatedFieldState implements CalculatedFieldState { } argumentValues.forEach((key, argumentEntry) -> { List tsKvEntryList = arguments.computeIfAbsent(key, k -> new ArrayList<>()); - tsKvEntryList.addAll(argumentEntry.getKvEntries()); +// tsKvEntryList.addAll(argumentEntry.getKvEntries()); }); } @Override public ListenableFuture performCalculation(CalculationContext ctx) { - CalculatedFieldConfiguration configuration = ctx.getConfiguration(); - Map configArguments = configuration.getArguments(); - Output output = configuration.getOutput(); - Map resultMap = new HashMap<>(); - arguments.replaceAll((key, entries) -> { - int limit = configArguments.get(key).getLimit(); + int limit = ctx.getArguments().get(key).getLimit(); List limitedEntries = entries.stream() .sorted(Comparator.comparingLong(TsKvEntry::getTs).reversed()) .limit(limit) @@ -79,13 +76,7 @@ public class LastRecordsCalculatedFieldState implements CalculatedFieldState { return limitedEntries; }); - - CalculatedFieldResult calculatedFieldResult = new CalculatedFieldResult(); - calculatedFieldResult.setType(output.getType()); - calculatedFieldResult.setScope(output.getScope()); - calculatedFieldResult.setResultMap(resultMap); - - return Futures.immediateFuture(calculatedFieldResult); + return Futures.immediateFuture(buildResult(ctx.getOutput(), resultMap)); } } diff --git a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/ScriptCalculatedFieldState.java b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/ScriptCalculatedFieldState.java index 0e9b00ad7d..047bfd0f8c 100644 --- a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/ScriptCalculatedFieldState.java +++ b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/ScriptCalculatedFieldState.java @@ -35,16 +35,11 @@ import java.util.Map; @Data @Slf4j -public class ScriptCalculatedFieldState implements CalculatedFieldState { +public class ScriptCalculatedFieldState extends BaseCalculatedFieldState { @JsonIgnore private CalculatedFieldScriptEngine calculatedFieldScriptEngine; - private Map arguments; - - public ScriptCalculatedFieldState() { - } - @Override public CalculatedFieldType getType() { return CalculatedFieldType.SCRIPT; @@ -52,50 +47,35 @@ public class ScriptCalculatedFieldState implements CalculatedFieldState { @Override - public void initState(Map argumentValues) { - if (arguments == null) { - arguments = new HashMap<>(); - } - argumentValues.forEach((key, value) -> arguments.put(key, value.getKvEntry())); - } + public ListenableFuture performCalculation(CalculationContext ctx) { + if (isValid(this.arguments, ctx.getArguments())) { + if (calculatedFieldScriptEngine == null) { + initEngine(ctx.getTenantId(), ctx.getExpression(), ctx.getTbelInvokeService()); + } + ListenableFuture resultFuture = calculatedFieldScriptEngine.executeScriptAsync(this.arguments); - @Override - public ListenableFuture performCalculation(CalculationContext ctx) { - CalculatedFieldConfiguration calculatedFieldConfiguration = ctx.getConfiguration(); - TbelInvokeService tbelInvokeService = ctx.getTbelInvokeService(); + return Futures.transform(resultFuture, result -> { + Map resultMap = result instanceof Map + ? JacksonUtil.convertValue(result, Map.class) + : new HashMap<>(); - if (tbelInvokeService == null) { - throw new IllegalArgumentException("TBEL script engine is disabled!"); + return buildResult(ctx.getOutput(), resultMap); + }, MoreExecutors.directExecutor()); } + return null; + } - if (calculatedFieldScriptEngine == null) { - initEngine(ctx.getTenantId(), calculatedFieldConfiguration, tbelInvokeService); + private void initEngine(TenantId tenantId, String expression, TbelInvokeService tbelInvokeService) { + if (tbelInvokeService == null) { + throw new IllegalArgumentException("TBEL script engine is disabled!"); } - ListenableFuture resultFuture = calculatedFieldScriptEngine.executeScriptAsync(arguments); - - return Futures.transform(resultFuture, result -> { - Output output = calculatedFieldConfiguration.getOutput(); - Map resultMap = result instanceof Map - ? JacksonUtil.convertValue(result, Map.class) - : new HashMap<>(); - - CalculatedFieldResult calculatedFieldResult = new CalculatedFieldResult(); - calculatedFieldResult.setType(output.getType()); - calculatedFieldResult.setScope(output.getScope()); - calculatedFieldResult.setResultMap(resultMap); - - return calculatedFieldResult; - }, MoreExecutors.directExecutor()); - } - - private void initEngine(TenantId tenantId, CalculatedFieldConfiguration calculatedFieldConfiguration, TbelInvokeService tbelInvokeService) { calculatedFieldScriptEngine = new CalculatedFieldTbelScriptEngine( tenantId, tbelInvokeService, - calculatedFieldConfiguration.getExpression(), - arguments.keySet().toArray(new String[0]) + expression, + this.arguments.keySet().toArray(new String[0]) ); } diff --git a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/SimpleCalculatedFieldState.java b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/SimpleCalculatedFieldState.java index d3d3b5f636..fbdd1eb354 100644 --- a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/SimpleCalculatedFieldState.java +++ b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/SimpleCalculatedFieldState.java @@ -31,36 +31,17 @@ import java.util.HashMap; import java.util.Map; @Data -public class SimpleCalculatedFieldState implements CalculatedFieldState { - - private Map arguments; - - public SimpleCalculatedFieldState() { - } +public class SimpleCalculatedFieldState extends BaseCalculatedFieldState { @Override public CalculatedFieldType getType() { return CalculatedFieldType.SIMPLE; } - @Override - public void initState(Map argumentValues) { - if (arguments == null) { - arguments = new HashMap<>(); - } - argumentValues.forEach((key, value) -> arguments.put(key, value.getKvEntry())); - } - @Override public ListenableFuture performCalculation(CalculationContext ctx) { - CalculatedFieldConfiguration calculatedFieldConfiguration = ctx.getConfiguration(); - - Output output = calculatedFieldConfiguration.getOutput(); - Map arguments = calculatedFieldConfiguration.getArguments(); - - if (isValid(this.arguments, arguments)) { - CalculatedFieldResult result = new CalculatedFieldResult(); - String expression = calculatedFieldConfiguration.getExpression(); + if (isValid(this.arguments, ctx.getArguments())) { + String expression = ctx.getExpression(); ThreadLocal customExpression = new ThreadLocal<>(); var expr = customExpression.get(); if (expr == null) { @@ -76,10 +57,8 @@ public class SimpleCalculatedFieldState implements CalculatedFieldState { double expressionResult = expr.evaluate(); - result.setType(output.getType()); - result.setScope(output.getScope()); - result.setResultMap(Map.of(output.getName(), expressionResult)); - return Futures.immediateFuture(result); + Output output = ctx.getOutput(); + return Futures.immediateFuture(buildResult(output, Map.of(output.getName(), expressionResult))); } return null; }