Browse Source

added argument entry interface

pull/12092/head
IrynaMatveieva 2 years ago
parent
commit
80d9b22068
  1. 66
      application/src/main/java/org/thingsboard/server/service/cf/DefaultCalculatedFieldExecutionService.java
  2. 19
      application/src/main/java/org/thingsboard/server/service/cf/ctx/state/ArgumentEntry.java
  3. 48
      application/src/main/java/org/thingsboard/server/service/cf/ctx/state/BaseCalculatedFieldState.java
  4. 6
      application/src/main/java/org/thingsboard/server/service/cf/ctx/state/CalculationContext.java
  5. 31
      application/src/main/java/org/thingsboard/server/service/cf/ctx/state/KvArgumentEntry.java
  6. 33
      application/src/main/java/org/thingsboard/server/service/cf/ctx/state/LastRecordsArgumentEntry.java
  7. 21
      application/src/main/java/org/thingsboard/server/service/cf/ctx/state/LastRecordsCalculatedFieldState.java
  8. 60
      application/src/main/java/org/thingsboard/server/service/cf/ctx/state/ScriptCalculatedFieldState.java
  9. 31
      application/src/main/java/org/thingsboard/server/service/cf/ctx/state/SimpleCalculatedFieldState.java

66
application/src/main/java/org/thingsboard/server/service/cf/DefaultCalculatedFieldExecutionService.java

@ -191,19 +191,19 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas
switch (entityId.getEntityType()) {
case ASSET, DEVICE -> {
log.info("Initializing state for entity: tenantId=[{}], entityId=[{}]", tenantId, entityId);
initializeStateForEntity(tenantId, cf, entityId, callback);
initializeStateForEntity(cf, entityId, callback);
}
case ASSET_PROFILE -> {
log.info("Initializing state for all assets in profile: tenantId=[{}], assetProfileId=[{}]", tenantId, entityId);
PageDataIterable<AssetId> assetIds = new PageDataIterable<>(pageLink ->
assetService.findAssetIdsByTenantIdAndAssetProfileId(tenantId, (AssetProfileId) entityId, pageLink), initFetchPackSize);
assetIds.forEach(assetId -> initializeStateForEntity(tenantId, cf, assetId, callback));
assetIds.forEach(assetId -> initializeStateForEntity(cf, assetId, callback));
}
case DEVICE_PROFILE -> {
log.info("Initializing state for all devices in profile: tenantId=[{}], deviceProfileId=[{}]", tenantId, entityId);
PageDataIterable<DeviceId> deviceIds = new PageDataIterable<>(pageLink ->
deviceService.findDeviceIdsByTenantIdAndDeviceProfileId(tenantId, (DeviceProfileId) entityId, pageLink), initFetchPackSize);
deviceIds.forEach(deviceId -> initializeStateForEntity(tenantId, cf, deviceId, callback));
deviceIds.forEach(deviceId -> initializeStateForEntity(cf, deviceId, callback));
}
default ->
throw new IllegalArgumentException("Entity type '" + calculatedFieldId.getEntityType() + "' does not support calculated fields.");
@ -226,17 +226,7 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas
try {
CalculatedField calculatedField = calculatedFields.computeIfAbsent(calculatedFieldId, id -> calculatedFieldService.findById(tenantId, id));
Map<String, ArgumentEntry> argumentValues = updatedTelemetry.entrySet().stream()
.collect(Collectors.toMap(
Map.Entry::getKey,
entry -> {
ArgumentEntry argumentEntry = new ArgumentEntry();
argumentEntry.setKvEntry(entry.getValue());
if (entry.getValue() instanceof TsKvEntry) {
argumentEntry.setKvEntries(List.of((TsKvEntry) entry.getValue()));
}
return argumentEntry;
}
));
.collect(Collectors.toMap(Map.Entry::getKey, entry -> ArgumentEntry.createArgumentEntry(entry.getValue())));
updateOrInitializeState(calculatedField, calculatedField.getEntityId(), argumentValues);
log.info("Successfully updated time series for calculatedFieldId: [{}]", calculatedFieldId);
} catch (Exception e) {
@ -254,11 +244,6 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas
log.info("Received EntityProfileUpdateMsgProto for processing: tenantId=[{}], entityId=[{}]", tenantId, entityId);
List<CalculatedFieldId> cfIdsOfOldProfile = calculatedFieldService.findCalculatedFieldIdsByEntityId(tenantId, oldProfileId);
cfIdsOfOldProfile.forEach(id -> states.remove(new CalculatedFieldCtxId(id.getId(), entityId.getId())));
List<String> ctxIdsToDelete = cfIdsOfOldProfile.stream().map(cfId -> JacksonUtil.writeValueAsString(new CalculatedFieldCtxId(cfId.getId(), entityId.getId()))).toList();
rocksDBService.deleteAll(ctxIdsToDelete);
calculatedFieldService.findCalculatedFieldIdsByEntityId(tenantId, oldProfileId)
.forEach(cfId -> {
CalculatedFieldCtxId ctxId = new CalculatedFieldCtxId(cfId.getId(), entityId.getId());
@ -269,7 +254,7 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas
calculatedFieldService.findCalculatedFieldIdsByEntityId(tenantId, newProfileId)
.stream()
.map(cfId -> calculatedFields.computeIfAbsent(cfId, id -> calculatedFieldService.findById(tenantId, id)))
.forEach(cf -> initializeStateForEntity(tenantId, cf, entityId, callback));
.forEach(cf -> initializeStateForEntity(cf, entityId, callback));
} catch (Exception e) {
log.trace("Failed to process entity type update msg: [{}]", proto, e);
}
@ -328,11 +313,11 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas
states.keySet().removeIf(ctxId -> calculatedFields.keySet().stream().noneMatch(id -> ctxId.cfId().equals(id.getId())));
}
private void initializeStateForEntity(TenantId tenantId, CalculatedField calculatedField, EntityId entityId, TbCallback callback) {
private void initializeStateForEntity(CalculatedField calculatedField, EntityId entityId, TbCallback callback) {
Map<String, Argument> arguments = calculatedField.getConfiguration().getArguments();
Map<String, ArgumentEntry> argumentValues = new HashMap<>();
AtomicInteger remaining = new AtomicInteger(arguments.size());
arguments.forEach((key, argument) -> Futures.addCallback(fetchArgumentValue(calculatedField, argument), new FutureCallback<>() {
arguments.forEach((key, argument) -> Futures.addCallback(fetchArgumentValue(calculatedField, entityId, argument), new FutureCallback<>() {
@Override
public void onSuccess(ArgumentEntry result) {
argumentValues.put(key, result);
@ -349,12 +334,11 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas
}, calculatedFieldCallbackExecutor));
}
private ListenableFuture<ArgumentEntry> fetchArgumentValue(CalculatedField calculatedField, Argument argument) {
private ListenableFuture<ArgumentEntry> fetchArgumentValue(CalculatedField calculatedField, EntityId targetEntityId, Argument argument) {
TenantId tenantId = calculatedField.getTenantId();
EntityId cfEntityId = calculatedField.getEntityId();
EntityId argumentEntityId = argument.getEntityId();
EntityId entityId = EntityType.DEVICE_PROFILE.equals(argumentEntityId.getEntityType()) || EntityType.ASSET_PROFILE.equals(argumentEntityId.getEntityType())
? cfEntityId
? targetEntityId
: argumentEntityId;
if (CalculatedFieldType.LAST_RECORDS.equals(calculatedField.getType())) {
return fetchLastRecords(tenantId, entityId, argument);
@ -371,11 +355,7 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas
ReadTsKvQuery query = new BaseReadTsKvQuery(argument.getKey(), startTs, endTs, 0, limit, Aggregation.NONE);
ListenableFuture<List<TsKvEntry>> lastRecordsFuture = timeseriesService.findAll(tenantId, entityId, List.of(query));
return Futures.transform(lastRecordsFuture, lastRecords -> {
ArgumentEntry argumentEntry = new ArgumentEntry();
argumentEntry.setKvEntries(lastRecords);
return argumentEntry;
}, calculatedFieldExecutor);
return Futures.transform(lastRecordsFuture, ArgumentEntry::createArgumentEntry, calculatedFieldExecutor);
}
private ListenableFuture<ArgumentEntry> fetchKvEntry(TenantId tenantId, EntityId entityId, Argument argument) {
@ -394,13 +374,7 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas
calculatedFieldExecutor);
default -> throw new IllegalArgumentException("Invalid argument type '" + argument.getType() + "'.");
};
return Futures.transform(kvEntryFuture, kvEntry -> {
ArgumentEntry argumentEntry = new ArgumentEntry();
if (kvEntry.isPresent()) {
argumentEntry.setKvEntry(kvEntry.orElse(null));
}
return argumentEntry;
}, calculatedFieldExecutor);
return Futures.transform(kvEntryFuture, kvEntry -> ArgumentEntry.createArgumentEntry(kvEntry.orElse(null)), calculatedFieldExecutor);
}
private KvEntry createDefaultKvEntry(Argument argument) {
@ -429,12 +403,7 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas
states.put(ctxId, calculatedFieldCtx);
rocksDBService.put(JacksonUtil.writeValueAsString(ctxId), JacksonUtil.writeValueAsString(calculatedFieldCtx));
CalculationContext ctx = CalculationContext.builder()
.tenantId(calculatedField.getTenantId())
.configuration(calculatedField.getConfiguration())
.tbelInvokeService(tbelInvokeService)
.build();
ListenableFuture<CalculatedFieldResult> resultFuture = state.performCalculation(ctx);
ListenableFuture<CalculatedFieldResult> resultFuture = state.performCalculation(buildCalculationContext(calculatedField));
Futures.addCallback(resultFuture, new FutureCallback<>() {
@Override
public void onSuccess(CalculatedFieldResult result) {
@ -464,6 +433,17 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas
}
}
private CalculationContext buildCalculationContext(CalculatedField calculatedField) {
CalculatedFieldConfiguration configuration = calculatedField.getConfiguration();
return CalculationContext.builder()
.tenantId(calculatedField.getTenantId())
.arguments(configuration.getArguments())
.output(configuration.getOutput())
.expression(configuration.getExpression())
.tbelInvokeService(tbelInvokeService)
.build();
}
private ObjectNode createJsonPayload(CalculatedFieldResult calculatedFieldResult) {
ObjectNode payload = JacksonUtil.newObjectNode();
Map<String, Object> resultMap = calculatedFieldResult.getResultMap();

19
application/src/main/java/org/thingsboard/server/service/cf/ctx/state/ArgumentEntry.java

@ -15,16 +15,25 @@
*/
package org.thingsboard.server.service.cf.ctx.state;
import lombok.Data;
import org.thingsboard.server.common.data.kv.KvEntry;
import org.thingsboard.server.common.data.kv.TsKvEntry;
import java.util.List;
@Data
public class ArgumentEntry {
public interface ArgumentEntry {
private KvEntry kvEntry;
private List<TsKvEntry> kvEntries;
Object getValue();
static ArgumentEntry createArgumentEntry(KvEntry kvEntry) {
if (kvEntry instanceof TsKvEntry tsKvEntry) {
return new LastRecordsArgumentEntry(List.of(tsKvEntry));
} else {
return new KvArgumentEntry(kvEntry);
}
}
static ArgumentEntry createArgumentEntry(List<TsKvEntry> kvEntries) {
return new LastRecordsArgumentEntry(kvEntries);
}
}

48
application/src/main/java/org/thingsboard/server/service/cf/ctx/state/BaseCalculatedFieldState.java

@ -0,0 +1,48 @@
/**
* Copyright © 2016-2024 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.service.cf.ctx.state;
import org.thingsboard.server.common.data.cf.configuration.Output;
import org.thingsboard.server.common.data.kv.KvEntry;
import org.thingsboard.server.service.cf.CalculatedFieldResult;
import java.util.HashMap;
import java.util.Map;
public abstract class BaseCalculatedFieldState implements CalculatedFieldState {
protected Map<String, KvEntry> arguments;
public BaseCalculatedFieldState() {
}
@Override
public void initState(Map<String, ArgumentEntry> argumentValues) {
if (arguments == null) {
arguments = new HashMap<>();
}
// argumentValues.forEach((key, value) -> arguments.put(key, value.getKvEntry()));
}
protected CalculatedFieldResult buildResult(Output output, Map<String, Object> resultMap) {
CalculatedFieldResult result = new CalculatedFieldResult();
result.setType(output.getType());
result.setScope(output.getScope());
result.setResultMap(resultMap);
return result;
}
}

6
application/src/main/java/org/thingsboard/server/service/cf/ctx/state/CalculationContext.java

@ -18,7 +18,9 @@ package org.thingsboard.server.service.cf.ctx.state;
import lombok.Builder;
import lombok.Data;
import org.thingsboard.script.api.tbel.TbelInvokeService;
import org.thingsboard.server.common.data.cf.configuration.Argument;
import org.thingsboard.server.common.data.cf.configuration.CalculatedFieldConfiguration;
import org.thingsboard.server.common.data.cf.configuration.Output;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.kv.KvEntry;
@ -29,7 +31,9 @@ import java.util.Map;
public class CalculationContext {
private TenantId tenantId;
private CalculatedFieldConfiguration configuration;
private Map<String, Argument> arguments;
private Output output;
private String expression;
private TbelInvokeService tbelInvokeService;
}

31
application/src/main/java/org/thingsboard/server/service/cf/ctx/state/KvArgumentEntry.java

@ -0,0 +1,31 @@
/**
* Copyright © 2016-2024 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.service.cf.ctx.state;
import lombok.Data;
import org.thingsboard.server.common.data.kv.KvEntry;
@Data
public class KvArgumentEntry implements ArgumentEntry {
private final KvEntry kvEntry;
@Override
public Object getValue() {
return kvEntry;
}
}

33
application/src/main/java/org/thingsboard/server/service/cf/ctx/state/LastRecordsArgumentEntry.java

@ -0,0 +1,33 @@
/**
* Copyright © 2016-2024 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.service.cf.ctx.state;
import lombok.Data;
import org.thingsboard.server.common.data.kv.TsKvEntry;
import java.util.List;
@Data
public class LastRecordsArgumentEntry implements ArgumentEntry {
private final List<TsKvEntry> kvEntries;
@Override
public Object getValue() {
return kvEntries;
}
}

21
application/src/main/java/org/thingsboard/server/service/cf/ctx/state/LastRecordsCalculatedFieldState.java

@ -15,9 +15,11 @@
*/
package org.thingsboard.server.service.cf.ctx.state;
import aj.org.objectweb.asm.TypeReference;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import lombok.Data;
import org.thingsboard.common.util.JacksonUtil;
import org.thingsboard.server.common.data.cf.CalculatedFieldType;
import org.thingsboard.server.common.data.cf.configuration.Argument;
import org.thingsboard.server.common.data.cf.configuration.CalculatedFieldConfiguration;
@ -33,7 +35,7 @@ import java.util.Map;
import java.util.stream.Collectors;
@Data
public class LastRecordsCalculatedFieldState implements CalculatedFieldState {
public class LastRecordsCalculatedFieldState extends BaseCalculatedFieldState {
private Map<String, List<TsKvEntry>> arguments;
@ -53,21 +55,16 @@ public class LastRecordsCalculatedFieldState implements CalculatedFieldState {
}
argumentValues.forEach((key, argumentEntry) -> {
List<TsKvEntry> tsKvEntryList = arguments.computeIfAbsent(key, k -> new ArrayList<>());
tsKvEntryList.addAll(argumentEntry.getKvEntries());
// tsKvEntryList.addAll(argumentEntry.getKvEntries());
});
}
@Override
public ListenableFuture<CalculatedFieldResult> performCalculation(CalculationContext ctx) {
CalculatedFieldConfiguration configuration = ctx.getConfiguration();
Map<String, Argument> configArguments = configuration.getArguments();
Output output = configuration.getOutput();
Map<String, Object> resultMap = new HashMap<>();
arguments.replaceAll((key, entries) -> {
int limit = configArguments.get(key).getLimit();
int limit = ctx.getArguments().get(key).getLimit();
List<TsKvEntry> limitedEntries = entries.stream()
.sorted(Comparator.comparingLong(TsKvEntry::getTs).reversed())
.limit(limit)
@ -79,13 +76,7 @@ public class LastRecordsCalculatedFieldState implements CalculatedFieldState {
return limitedEntries;
});
CalculatedFieldResult calculatedFieldResult = new CalculatedFieldResult();
calculatedFieldResult.setType(output.getType());
calculatedFieldResult.setScope(output.getScope());
calculatedFieldResult.setResultMap(resultMap);
return Futures.immediateFuture(calculatedFieldResult);
return Futures.immediateFuture(buildResult(ctx.getOutput(), resultMap));
}
}

60
application/src/main/java/org/thingsboard/server/service/cf/ctx/state/ScriptCalculatedFieldState.java

@ -35,16 +35,11 @@ import java.util.Map;
@Data
@Slf4j
public class ScriptCalculatedFieldState implements CalculatedFieldState {
public class ScriptCalculatedFieldState extends BaseCalculatedFieldState {
@JsonIgnore
private CalculatedFieldScriptEngine calculatedFieldScriptEngine;
private Map<String, KvEntry> arguments;
public ScriptCalculatedFieldState() {
}
@Override
public CalculatedFieldType getType() {
return CalculatedFieldType.SCRIPT;
@ -52,50 +47,35 @@ public class ScriptCalculatedFieldState implements CalculatedFieldState {
@Override
public void initState(Map<String, ArgumentEntry> argumentValues) {
if (arguments == null) {
arguments = new HashMap<>();
}
argumentValues.forEach((key, value) -> arguments.put(key, value.getKvEntry()));
}
public ListenableFuture<CalculatedFieldResult> performCalculation(CalculationContext ctx) {
if (isValid(this.arguments, ctx.getArguments())) {
if (calculatedFieldScriptEngine == null) {
initEngine(ctx.getTenantId(), ctx.getExpression(), ctx.getTbelInvokeService());
}
ListenableFuture<Object> resultFuture = calculatedFieldScriptEngine.executeScriptAsync(this.arguments);
@Override
public ListenableFuture<CalculatedFieldResult> performCalculation(CalculationContext ctx) {
CalculatedFieldConfiguration calculatedFieldConfiguration = ctx.getConfiguration();
TbelInvokeService tbelInvokeService = ctx.getTbelInvokeService();
return Futures.transform(resultFuture, result -> {
Map<String, Object> resultMap = result instanceof Map<?, ?>
? JacksonUtil.convertValue(result, Map.class)
: new HashMap<>();
if (tbelInvokeService == null) {
throw new IllegalArgumentException("TBEL script engine is disabled!");
return buildResult(ctx.getOutput(), resultMap);
}, MoreExecutors.directExecutor());
}
return null;
}
if (calculatedFieldScriptEngine == null) {
initEngine(ctx.getTenantId(), calculatedFieldConfiguration, tbelInvokeService);
private void initEngine(TenantId tenantId, String expression, TbelInvokeService tbelInvokeService) {
if (tbelInvokeService == null) {
throw new IllegalArgumentException("TBEL script engine is disabled!");
}
ListenableFuture<Object> resultFuture = calculatedFieldScriptEngine.executeScriptAsync(arguments);
return Futures.transform(resultFuture, result -> {
Output output = calculatedFieldConfiguration.getOutput();
Map<String, Object> resultMap = result instanceof Map<?, ?>
? JacksonUtil.convertValue(result, Map.class)
: new HashMap<>();
CalculatedFieldResult calculatedFieldResult = new CalculatedFieldResult();
calculatedFieldResult.setType(output.getType());
calculatedFieldResult.setScope(output.getScope());
calculatedFieldResult.setResultMap(resultMap);
return calculatedFieldResult;
}, MoreExecutors.directExecutor());
}
private void initEngine(TenantId tenantId, CalculatedFieldConfiguration calculatedFieldConfiguration, TbelInvokeService tbelInvokeService) {
calculatedFieldScriptEngine = new CalculatedFieldTbelScriptEngine(
tenantId,
tbelInvokeService,
calculatedFieldConfiguration.getExpression(),
arguments.keySet().toArray(new String[0])
expression,
this.arguments.keySet().toArray(new String[0])
);
}

31
application/src/main/java/org/thingsboard/server/service/cf/ctx/state/SimpleCalculatedFieldState.java

@ -31,36 +31,17 @@ import java.util.HashMap;
import java.util.Map;
@Data
public class SimpleCalculatedFieldState implements CalculatedFieldState {
private Map<String, KvEntry> arguments;
public SimpleCalculatedFieldState() {
}
public class SimpleCalculatedFieldState extends BaseCalculatedFieldState {
@Override
public CalculatedFieldType getType() {
return CalculatedFieldType.SIMPLE;
}
@Override
public void initState(Map<String, ArgumentEntry> argumentValues) {
if (arguments == null) {
arguments = new HashMap<>();
}
argumentValues.forEach((key, value) -> arguments.put(key, value.getKvEntry()));
}
@Override
public ListenableFuture<CalculatedFieldResult> performCalculation(CalculationContext ctx) {
CalculatedFieldConfiguration calculatedFieldConfiguration = ctx.getConfiguration();
Output output = calculatedFieldConfiguration.getOutput();
Map<String, Argument> arguments = calculatedFieldConfiguration.getArguments();
if (isValid(this.arguments, arguments)) {
CalculatedFieldResult result = new CalculatedFieldResult();
String expression = calculatedFieldConfiguration.getExpression();
if (isValid(this.arguments, ctx.getArguments())) {
String expression = ctx.getExpression();
ThreadLocal<Expression> customExpression = new ThreadLocal<>();
var expr = customExpression.get();
if (expr == null) {
@ -76,10 +57,8 @@ public class SimpleCalculatedFieldState implements CalculatedFieldState {
double expressionResult = expr.evaluate();
result.setType(output.getType());
result.setScope(output.getScope());
result.setResultMap(Map.of(output.getName(), expressionResult));
return Futures.immediateFuture(result);
Output output = ctx.getOutput();
return Futures.immediateFuture(buildResult(output, Map.of(output.getName(), expressionResult)));
}
return null;
}

Loading…
Cancel
Save