|
|
|
@ -71,11 +71,11 @@ import org.thingsboard.server.dao.device.DeviceService; |
|
|
|
import org.thingsboard.server.dao.timeseries.TimeseriesService; |
|
|
|
import org.thingsboard.server.gen.transport.TransportProtos; |
|
|
|
import org.thingsboard.server.queue.util.TbCoreComponent; |
|
|
|
import org.thingsboard.server.service.cf.ctx.CalculatedFieldCtx; |
|
|
|
import org.thingsboard.server.service.cf.ctx.CalculatedFieldCtxId; |
|
|
|
import org.thingsboard.server.service.cf.ctx.CalculatedFieldEntityCtx; |
|
|
|
import org.thingsboard.server.service.cf.ctx.CalculatedFieldEntityCtxId; |
|
|
|
import org.thingsboard.server.service.cf.ctx.state.ArgumentEntry; |
|
|
|
import org.thingsboard.server.service.cf.ctx.state.CalculatedFieldState; |
|
|
|
import org.thingsboard.server.service.cf.ctx.state.CalculationContext; |
|
|
|
import org.thingsboard.server.service.cf.ctx.state.CalculatedFieldCtx; |
|
|
|
import org.thingsboard.server.service.cf.ctx.state.LastRecordsCalculatedFieldState; |
|
|
|
import org.thingsboard.server.service.cf.ctx.state.ScriptCalculatedFieldState; |
|
|
|
import org.thingsboard.server.service.cf.ctx.state.SimpleCalculatedFieldState; |
|
|
|
@ -115,7 +115,8 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas |
|
|
|
|
|
|
|
private final ConcurrentMap<CalculatedFieldId, CalculatedField> calculatedFields = new ConcurrentHashMap<>(); |
|
|
|
private final ConcurrentMap<CalculatedFieldId, List<CalculatedFieldLink>> calculatedFieldLinks = new ConcurrentHashMap<>(); |
|
|
|
private final ConcurrentMap<CalculatedFieldCtxId, CalculatedFieldCtx> states = new ConcurrentHashMap<>(); |
|
|
|
private final ConcurrentMap<CalculatedFieldId, CalculatedFieldCtx> calculatedFieldsCtx = new ConcurrentHashMap<>(); |
|
|
|
private final ConcurrentMap<CalculatedFieldEntityCtxId, CalculatedFieldEntityCtx> states = new ConcurrentHashMap<>(); |
|
|
|
|
|
|
|
private static final int MAX_LAST_RECORDS_VALUE = 1024; |
|
|
|
|
|
|
|
@ -188,22 +189,24 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas |
|
|
|
EntityId entityId = cf.getEntityId(); |
|
|
|
calculatedFields.put(calculatedFieldId, cf); |
|
|
|
calculatedFieldLinks.put(calculatedFieldId, links); |
|
|
|
CalculatedFieldCtx calculatedFieldCtx = new CalculatedFieldCtx(cf, tbelInvokeService); |
|
|
|
calculatedFieldsCtx.put(calculatedFieldId, calculatedFieldCtx); |
|
|
|
switch (entityId.getEntityType()) { |
|
|
|
case ASSET, DEVICE -> { |
|
|
|
log.info("Initializing state for entity: tenantId=[{}], entityId=[{}]", tenantId, entityId); |
|
|
|
initializeStateForEntity(cf, entityId, callback); |
|
|
|
initializeStateForEntity(calculatedFieldCtx, entityId, callback); |
|
|
|
} |
|
|
|
case ASSET_PROFILE -> { |
|
|
|
log.info("Initializing state for all assets in profile: tenantId=[{}], assetProfileId=[{}]", tenantId, entityId); |
|
|
|
PageDataIterable<AssetId> assetIds = new PageDataIterable<>(pageLink -> |
|
|
|
assetService.findAssetIdsByTenantIdAndAssetProfileId(tenantId, (AssetProfileId) entityId, pageLink), initFetchPackSize); |
|
|
|
assetIds.forEach(assetId -> initializeStateForEntity(cf, assetId, callback)); |
|
|
|
assetIds.forEach(assetId -> initializeStateForEntity(calculatedFieldCtx, assetId, callback)); |
|
|
|
} |
|
|
|
case DEVICE_PROFILE -> { |
|
|
|
log.info("Initializing state for all devices in profile: tenantId=[{}], deviceProfileId=[{}]", tenantId, entityId); |
|
|
|
PageDataIterable<DeviceId> deviceIds = new PageDataIterable<>(pageLink -> |
|
|
|
deviceService.findDeviceIdsByTenantIdAndDeviceProfileId(tenantId, (DeviceProfileId) entityId, pageLink), initFetchPackSize); |
|
|
|
deviceIds.forEach(deviceId -> initializeStateForEntity(cf, deviceId, callback)); |
|
|
|
deviceIds.forEach(deviceId -> initializeStateForEntity(calculatedFieldCtx, deviceId, callback)); |
|
|
|
} |
|
|
|
default -> |
|
|
|
throw new IllegalArgumentException("Entity type '" + calculatedFieldId.getEntityType() + "' does not support calculated fields."); |
|
|
|
@ -224,10 +227,13 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas |
|
|
|
@Override |
|
|
|
public void onTelemetryUpdate(TenantId tenantId, CalculatedFieldId calculatedFieldId, Map<String, KvEntry> updatedTelemetry) { |
|
|
|
try { |
|
|
|
CalculatedField calculatedField = calculatedFields.computeIfAbsent(calculatedFieldId, id -> calculatedFieldService.findById(tenantId, id)); |
|
|
|
CalculatedFieldCtx calculatedFieldCtx = calculatedFieldsCtx.computeIfAbsent(calculatedFieldId, id -> { |
|
|
|
CalculatedField calculatedField = calculatedFields.computeIfAbsent(id, cfId -> calculatedFieldService.findById(tenantId, id)); |
|
|
|
return new CalculatedFieldCtx(calculatedField, tbelInvokeService); |
|
|
|
}); |
|
|
|
Map<String, ArgumentEntry> argumentValues = updatedTelemetry.entrySet().stream() |
|
|
|
.collect(Collectors.toMap(Map.Entry::getKey, entry -> ArgumentEntry.createArgumentEntry(entry.getValue()))); |
|
|
|
updateOrInitializeState(calculatedField, calculatedField.getEntityId(), argumentValues); |
|
|
|
.collect(Collectors.toMap(Map.Entry::getKey, entry -> ArgumentEntry.createSingleValueArgument(entry.getValue()))); |
|
|
|
updateOrInitializeState(calculatedFieldCtx, calculatedFieldCtx.getEntityId(), argumentValues); |
|
|
|
log.info("Successfully updated time series for calculatedFieldId: [{}]", calculatedFieldId); |
|
|
|
} catch (Exception e) { |
|
|
|
log.trace("Failed to update telemetry for calculatedFieldId: [{}]", calculatedFieldId, e); |
|
|
|
@ -235,7 +241,7 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas |
|
|
|
} |
|
|
|
|
|
|
|
@Override |
|
|
|
public void onEntityTypeChanged(TransportProtos.EntityProfileUpdateMsgProto proto, TbCallback callback) { |
|
|
|
public void onEntityProfileChanged(TransportProtos.EntityProfileUpdateMsgProto proto, TbCallback callback) { |
|
|
|
try { |
|
|
|
TenantId tenantId = TenantId.fromUUID(new UUID(proto.getTenantIdMSB(), proto.getTenantIdLSB())); |
|
|
|
EntityId entityId = EntityIdFactory.getByTypeAndUuid(proto.getEntityType(), new UUID(proto.getEntityIdMSB(), proto.getEntityIdLSB())); |
|
|
|
@ -246,15 +252,15 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas |
|
|
|
|
|
|
|
calculatedFieldService.findCalculatedFieldIdsByEntityId(tenantId, oldProfileId) |
|
|
|
.forEach(cfId -> { |
|
|
|
CalculatedFieldCtxId ctxId = new CalculatedFieldCtxId(cfId.getId(), entityId.getId()); |
|
|
|
CalculatedFieldEntityCtxId ctxId = new CalculatedFieldEntityCtxId(cfId.getId(), entityId.getId()); |
|
|
|
states.remove(ctxId); |
|
|
|
rocksDBService.delete(JacksonUtil.writeValueAsString(ctxId)); |
|
|
|
}); |
|
|
|
|
|
|
|
calculatedFieldService.findCalculatedFieldIdsByEntityId(tenantId, newProfileId) |
|
|
|
.stream() |
|
|
|
.map(cfId -> calculatedFields.computeIfAbsent(cfId, id -> calculatedFieldService.findById(tenantId, id))) |
|
|
|
.forEach(cf -> initializeStateForEntity(cf, entityId, callback)); |
|
|
|
.map(cfId -> calculatedFieldsCtx.computeIfAbsent(cfId, id -> new CalculatedFieldCtx(calculatedFieldService.findById(tenantId, id), tbelInvokeService))) |
|
|
|
.forEach(cfCtx -> initializeStateForEntity(cfCtx, entityId, callback)); |
|
|
|
} catch (Exception e) { |
|
|
|
log.trace("Failed to process entity type update msg: [{}]", proto, e); |
|
|
|
} |
|
|
|
@ -267,6 +273,7 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas |
|
|
|
onCalculatedFieldDelete(newCalculatedField.getId(), callback); |
|
|
|
} else { |
|
|
|
calculatedFields.put(newCalculatedField.getId(), newCalculatedField); |
|
|
|
calculatedFieldsCtx.put(newCalculatedField.getId(), new CalculatedFieldCtx(newCalculatedField, tbelInvokeService)); |
|
|
|
callback.onSuccess(); |
|
|
|
shouldReinit = false; |
|
|
|
} |
|
|
|
@ -277,6 +284,7 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas |
|
|
|
try { |
|
|
|
calculatedFieldLinks.remove(calculatedFieldId); |
|
|
|
calculatedFields.remove(calculatedFieldId); |
|
|
|
calculatedFieldsCtx.remove(calculatedFieldId); |
|
|
|
states.keySet().removeIf(ctxId -> calculatedFields.keySet().stream().noneMatch(id -> ctxId.cfId().equals(id.getId()))); |
|
|
|
List<String> statesToRemove = states.keySet().stream() |
|
|
|
.filter(ctxId -> !calculatedFields.containsKey(new CalculatedFieldId(ctxId.cfId()))) |
|
|
|
@ -309,20 +317,20 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas |
|
|
|
cfs.forEach(cf -> calculatedFields.putIfAbsent(cf.getId(), cf)); |
|
|
|
PageDataIterable<CalculatedFieldLink> cfls = new PageDataIterable<>(calculatedFieldService::findAllCalculatedFieldLinks, initFetchPackSize); |
|
|
|
cfls.forEach(link -> calculatedFieldLinks.computeIfAbsent(link.getCalculatedFieldId(), id -> new ArrayList<>()).add(link)); |
|
|
|
rocksDBService.getAll().forEach((ctxId, ctx) -> states.put(JacksonUtil.fromString(ctxId, CalculatedFieldCtxId.class), JacksonUtil.fromString(ctx, CalculatedFieldCtx.class))); |
|
|
|
rocksDBService.getAll().forEach((ctxId, ctx) -> states.put(JacksonUtil.fromString(ctxId, CalculatedFieldEntityCtxId.class), JacksonUtil.fromString(ctx, CalculatedFieldEntityCtx.class))); |
|
|
|
states.keySet().removeIf(ctxId -> calculatedFields.keySet().stream().noneMatch(id -> ctxId.cfId().equals(id.getId()))); |
|
|
|
} |
|
|
|
|
|
|
|
private void initializeStateForEntity(CalculatedField calculatedField, EntityId entityId, TbCallback callback) { |
|
|
|
Map<String, Argument> arguments = calculatedField.getConfiguration().getArguments(); |
|
|
|
private void initializeStateForEntity(CalculatedFieldCtx calculatedFieldCtx, EntityId entityId, TbCallback callback) { |
|
|
|
Map<String, Argument> arguments = calculatedFieldCtx.getArguments(); |
|
|
|
Map<String, ArgumentEntry> argumentValues = new HashMap<>(); |
|
|
|
AtomicInteger remaining = new AtomicInteger(arguments.size()); |
|
|
|
arguments.forEach((key, argument) -> Futures.addCallback(fetchArgumentValue(calculatedField, entityId, argument), new FutureCallback<>() { |
|
|
|
arguments.forEach((key, argument) -> Futures.addCallback(fetchArgumentValue(calculatedFieldCtx, entityId, argument), new FutureCallback<>() { |
|
|
|
@Override |
|
|
|
public void onSuccess(ArgumentEntry result) { |
|
|
|
argumentValues.put(key, result); |
|
|
|
if (remaining.decrementAndGet() == 0) { |
|
|
|
updateOrInitializeState(calculatedField, entityId, argumentValues); |
|
|
|
updateOrInitializeState(calculatedFieldCtx, entityId, argumentValues); |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
@ -334,28 +342,28 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas |
|
|
|
}, calculatedFieldCallbackExecutor)); |
|
|
|
} |
|
|
|
|
|
|
|
private ListenableFuture<ArgumentEntry> fetchArgumentValue(CalculatedField calculatedField, EntityId targetEntityId, Argument argument) { |
|
|
|
TenantId tenantId = calculatedField.getTenantId(); |
|
|
|
private ListenableFuture<ArgumentEntry> fetchArgumentValue(CalculatedFieldCtx calculatedFieldCtx, EntityId targetEntityId, Argument argument) { |
|
|
|
TenantId tenantId = calculatedFieldCtx.getTenantId(); |
|
|
|
EntityId argumentEntityId = argument.getEntityId(); |
|
|
|
EntityId entityId = EntityType.DEVICE_PROFILE.equals(argumentEntityId.getEntityType()) || EntityType.ASSET_PROFILE.equals(argumentEntityId.getEntityType()) |
|
|
|
? targetEntityId |
|
|
|
: argumentEntityId; |
|
|
|
if (CalculatedFieldType.LAST_RECORDS.equals(calculatedField.getType())) { |
|
|
|
if (CalculatedFieldType.LAST_RECORDS.equals(calculatedFieldCtx.getCfType())) { |
|
|
|
return fetchLastRecords(tenantId, entityId, argument); |
|
|
|
} |
|
|
|
return fetchKvEntry(tenantId, entityId, argument); |
|
|
|
} |
|
|
|
|
|
|
|
private ListenableFuture<ArgumentEntry> fetchLastRecords(TenantId tenantId, EntityId entityId, Argument argument) { |
|
|
|
long startTs = Math.max(argument.getStartTs(), 0); |
|
|
|
long currentTime = System.currentTimeMillis(); |
|
|
|
long timeWindow = argument.getTimeWindow() == 0 ? System.currentTimeMillis() : argument.getTimeWindow(); |
|
|
|
long endTs = startTs + timeWindow; |
|
|
|
long startTs = currentTime - timeWindow; |
|
|
|
int limit = argument.getLimit() == 0 ? MAX_LAST_RECORDS_VALUE : argument.getLimit(); |
|
|
|
|
|
|
|
ReadTsKvQuery query = new BaseReadTsKvQuery(argument.getKey(), startTs, endTs, 0, limit, Aggregation.NONE); |
|
|
|
ReadTsKvQuery query = new BaseReadTsKvQuery(argument.getKey(), startTs, currentTime, 0, limit, Aggregation.NONE); |
|
|
|
ListenableFuture<List<TsKvEntry>> lastRecordsFuture = timeseriesService.findAll(tenantId, entityId, List.of(query)); |
|
|
|
|
|
|
|
return Futures.transform(lastRecordsFuture, ArgumentEntry::createArgumentEntry, calculatedFieldExecutor); |
|
|
|
return Futures.transform(lastRecordsFuture, ArgumentEntry::createLastRecordsArgument, calculatedFieldExecutor); |
|
|
|
} |
|
|
|
|
|
|
|
private ListenableFuture<ArgumentEntry> fetchKvEntry(TenantId tenantId, EntityId entityId, Argument argument) { |
|
|
|
@ -374,7 +382,7 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas |
|
|
|
calculatedFieldExecutor); |
|
|
|
default -> throw new IllegalArgumentException("Invalid argument type '" + argument.getType() + "'."); |
|
|
|
}; |
|
|
|
return Futures.transform(kvEntryFuture, kvEntry -> ArgumentEntry.createArgumentEntry(kvEntry.orElse(null)), calculatedFieldExecutor); |
|
|
|
return Futures.transform(kvEntryFuture, kvEntry -> ArgumentEntry.createSingleValueArgument(kvEntry.orElse(null)), calculatedFieldExecutor); |
|
|
|
} |
|
|
|
|
|
|
|
private KvEntry createDefaultKvEntry(Argument argument) { |
|
|
|
@ -389,32 +397,32 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas |
|
|
|
return new StringDataEntry(key, defaultValue); |
|
|
|
} |
|
|
|
|
|
|
|
private void updateOrInitializeState(CalculatedField calculatedField, EntityId entityId, Map<String, ArgumentEntry> argumentValues) { |
|
|
|
CalculatedFieldCtxId ctxId = new CalculatedFieldCtxId(calculatedField.getUuidId(), entityId.getId()); |
|
|
|
CalculatedFieldCtx calculatedFieldCtx = states.computeIfAbsent(ctxId, ctx -> new CalculatedFieldCtx(ctxId, null)); |
|
|
|
private void updateOrInitializeState(CalculatedFieldCtx calculatedFieldCtx, EntityId entityId, Map<String, ArgumentEntry> argumentValues) { |
|
|
|
CalculatedFieldEntityCtxId entityCtxId = new CalculatedFieldEntityCtxId(calculatedFieldCtx.getCfId().getId(), entityId.getId()); |
|
|
|
CalculatedFieldEntityCtx calculatedFieldEntityCtx = states.computeIfAbsent(entityCtxId, ctx -> new CalculatedFieldEntityCtx(entityCtxId, null)); |
|
|
|
|
|
|
|
CalculatedFieldState state = calculatedFieldCtx.getState(); |
|
|
|
CalculatedFieldState state = calculatedFieldEntityCtx.getState(); |
|
|
|
|
|
|
|
if (state == null) { |
|
|
|
state = createStateByType(calculatedField.getType()); |
|
|
|
state = createStateByType(calculatedFieldCtx.getCfType()); |
|
|
|
} |
|
|
|
state.initState(argumentValues); |
|
|
|
calculatedFieldCtx.setState(state); |
|
|
|
states.put(ctxId, calculatedFieldCtx); |
|
|
|
rocksDBService.put(JacksonUtil.writeValueAsString(ctxId), JacksonUtil.writeValueAsString(calculatedFieldCtx)); |
|
|
|
calculatedFieldEntityCtx.setState(state); |
|
|
|
states.put(entityCtxId, calculatedFieldEntityCtx); |
|
|
|
rocksDBService.put(JacksonUtil.writeValueAsString(entityCtxId), JacksonUtil.writeValueAsString(calculatedFieldEntityCtx)); |
|
|
|
|
|
|
|
ListenableFuture<CalculatedFieldResult> resultFuture = state.performCalculation(buildCalculationContext(calculatedField)); |
|
|
|
ListenableFuture<CalculatedFieldResult> resultFuture = state.performCalculation(calculatedFieldCtx); |
|
|
|
Futures.addCallback(resultFuture, new FutureCallback<>() { |
|
|
|
@Override |
|
|
|
public void onSuccess(CalculatedFieldResult result) { |
|
|
|
if (result != null) { |
|
|
|
pushMsgToRuleEngine(calculatedField.getTenantId(), entityId, result); |
|
|
|
pushMsgToRuleEngine(calculatedFieldCtx.getTenantId(), entityId, result); |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
@Override |
|
|
|
public void onFailure(Throwable t) { |
|
|
|
log.warn("[{}] Failed to perform calculation. entityId: [{}]", calculatedField.getId(), entityId, t); |
|
|
|
log.warn("[{}] Failed to perform calculation. entityId: [{}]", calculatedFieldCtx.getCfId(), entityId, t); |
|
|
|
} |
|
|
|
}, MoreExecutors.directExecutor()); |
|
|
|
|
|
|
|
@ -433,17 +441,6 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
private CalculationContext buildCalculationContext(CalculatedField calculatedField) { |
|
|
|
CalculatedFieldConfiguration configuration = calculatedField.getConfiguration(); |
|
|
|
return CalculationContext.builder() |
|
|
|
.tenantId(calculatedField.getTenantId()) |
|
|
|
.arguments(configuration.getArguments()) |
|
|
|
.output(configuration.getOutput()) |
|
|
|
.expression(configuration.getExpression()) |
|
|
|
.tbelInvokeService(tbelInvokeService) |
|
|
|
.build(); |
|
|
|
} |
|
|
|
|
|
|
|
private ObjectNode createJsonPayload(CalculatedFieldResult calculatedFieldResult) { |
|
|
|
ObjectNode payload = JacksonUtil.newObjectNode(); |
|
|
|
Map<String, Object> resultMap = calculatedFieldResult.getResultMap(); |
|
|
|
|