|
|
|
@ -35,12 +35,15 @@ import org.thingsboard.common.util.JacksonUtil; |
|
|
|
import org.thingsboard.common.util.ThingsBoardExecutors; |
|
|
|
import org.thingsboard.script.api.tbel.TbelInvokeService; |
|
|
|
import org.thingsboard.server.cluster.TbClusterService; |
|
|
|
import org.thingsboard.server.common.data.AttributeScope; |
|
|
|
import org.thingsboard.server.common.data.EntityType; |
|
|
|
import org.thingsboard.server.common.data.cf.CalculatedField; |
|
|
|
import org.thingsboard.server.common.data.cf.CalculatedFieldLink; |
|
|
|
import org.thingsboard.server.common.data.cf.CalculatedFieldType; |
|
|
|
import org.thingsboard.server.common.data.cf.configuration.Argument; |
|
|
|
import org.thingsboard.server.common.data.cf.configuration.ArgumentType; |
|
|
|
import org.thingsboard.server.common.data.cf.configuration.CalculatedFieldConfiguration; |
|
|
|
import org.thingsboard.server.common.data.cf.configuration.OutputType; |
|
|
|
import org.thingsboard.server.common.data.id.AssetId; |
|
|
|
import org.thingsboard.server.common.data.id.CalculatedFieldId; |
|
|
|
import org.thingsboard.server.common.data.id.DeviceId; |
|
|
|
@ -48,7 +51,6 @@ import org.thingsboard.server.common.data.id.EntityId; |
|
|
|
import org.thingsboard.server.common.data.id.EntityIdFactory; |
|
|
|
import org.thingsboard.server.common.data.id.TenantId; |
|
|
|
import org.thingsboard.server.common.data.kv.Aggregation; |
|
|
|
import org.thingsboard.server.common.data.kv.AttributeKvEntry; |
|
|
|
import org.thingsboard.server.common.data.kv.BaseAttributeKvEntry; |
|
|
|
import org.thingsboard.server.common.data.kv.BaseReadTsKvQuery; |
|
|
|
import org.thingsboard.server.common.data.kv.BasicTsKvEntry; |
|
|
|
@ -79,11 +81,13 @@ import org.thingsboard.server.service.cf.ctx.state.ScriptCalculatedFieldState; |
|
|
|
import org.thingsboard.server.service.cf.ctx.state.SimpleCalculatedFieldState; |
|
|
|
import org.thingsboard.server.service.cf.ctx.state.SingleValueArgumentEntry; |
|
|
|
import org.thingsboard.server.service.cf.ctx.state.TsRollingArgumentEntry; |
|
|
|
import org.thingsboard.server.service.cf.telemetry.CalculatedFieldTelemetryUpdateRequest; |
|
|
|
import org.thingsboard.server.service.partition.AbstractPartitionBasedService; |
|
|
|
import org.thingsboard.server.service.profile.TbAssetProfileCache; |
|
|
|
import org.thingsboard.server.service.profile.TbDeviceProfileCache; |
|
|
|
|
|
|
|
import java.util.ArrayList; |
|
|
|
import java.util.EnumSet; |
|
|
|
import java.util.HashMap; |
|
|
|
import java.util.List; |
|
|
|
import java.util.Map; |
|
|
|
@ -92,7 +96,6 @@ import java.util.Set; |
|
|
|
import java.util.UUID; |
|
|
|
import java.util.concurrent.ConcurrentHashMap; |
|
|
|
import java.util.concurrent.ConcurrentMap; |
|
|
|
import java.util.concurrent.locks.ReentrantLock; |
|
|
|
import java.util.function.Consumer; |
|
|
|
import java.util.stream.Collectors; |
|
|
|
import java.util.stream.Stream; |
|
|
|
@ -120,12 +123,14 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas |
|
|
|
private ListeningExecutorService calculatedFieldExecutor; |
|
|
|
private ListeningExecutorService calculatedFieldCallbackExecutor; |
|
|
|
|
|
|
|
private final ConcurrentMap<EntityId, ReentrantLock> entityLocks = new ConcurrentHashMap<>(); |
|
|
|
|
|
|
|
private final ConcurrentMap<CalculatedFieldEntityCtxId, CalculatedFieldEntityCtx> states = new ConcurrentHashMap<>(); |
|
|
|
|
|
|
|
private static final int MAX_LAST_RECORDS_VALUE = 1024; |
|
|
|
|
|
|
|
private static final Set<EntityType> supportedReferencedEntities = EnumSet.of( |
|
|
|
EntityType.DEVICE, EntityType.ASSET, EntityType.CUSTOMER, EntityType.TENANT |
|
|
|
); |
|
|
|
|
|
|
|
@Value("${calculatedField.initFetchPackSize:50000}") |
|
|
|
@Getter |
|
|
|
private int initFetchPackSize; |
|
|
|
@ -336,28 +341,29 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas |
|
|
|
} |
|
|
|
|
|
|
|
@Override |
|
|
|
public void onTelemetryUpdate(TenantId tenantId, EntityId entityId, List<CalculatedFieldId> calculatedFieldIds, List<? extends KvEntry> telemetry) { |
|
|
|
public void onTelemetryUpdate(CalculatedFieldTelemetryUpdateRequest calculatedFieldTelemetryUpdateRequest) { |
|
|
|
try { |
|
|
|
EntityType entityType = entityId.getEntityType(); |
|
|
|
if (EntityType.DEVICE.equals(entityType) || EntityType.ASSET.equals(entityType) || EntityType.CUSTOMER.equals(entityType) || EntityType.TENANT.equals(entityType)) { |
|
|
|
EntityId profileId = null; |
|
|
|
if (EntityType.ASSET.equals(entityType)) { |
|
|
|
profileId = assetProfileCache.get(tenantId, (AssetId) entityId).getId(); |
|
|
|
} else if (EntityType.DEVICE.equals(entityType)) { |
|
|
|
profileId = deviceProfileCache.get(tenantId, (DeviceId) entityId).getId(); |
|
|
|
} |
|
|
|
List<CalculatedFieldLink> cfLinks = new ArrayList<>(calculatedFieldCache.getCalculatedFieldLinksByEntityId(tenantId, entityId)); |
|
|
|
Optional.ofNullable(profileId).ifPresent(id -> { |
|
|
|
cfLinks.addAll(calculatedFieldCache.getCalculatedFieldLinksByEntityId(tenantId, id)); |
|
|
|
}); |
|
|
|
TenantId tenantId = calculatedFieldTelemetryUpdateRequest.getTenantId(); |
|
|
|
EntityId entityId = calculatedFieldTelemetryUpdateRequest.getEntityId(); |
|
|
|
AttributeScope scope = calculatedFieldTelemetryUpdateRequest.getScope(); |
|
|
|
List<? extends KvEntry> telemetry = calculatedFieldTelemetryUpdateRequest.getKvEntries(); |
|
|
|
List<CalculatedFieldId> calculatedFieldIds = calculatedFieldTelemetryUpdateRequest.getCalculatedFieldIds(); |
|
|
|
|
|
|
|
if (supportedReferencedEntities.contains(entityId.getEntityType())) { |
|
|
|
EntityId profileId = getProfileId(tenantId, entityId); |
|
|
|
|
|
|
|
List<CalculatedFieldLink> cfLinks = Stream.concat( |
|
|
|
calculatedFieldCache.getCalculatedFieldLinksByEntityId(tenantId, entityId).stream(), |
|
|
|
profileId != null ? calculatedFieldCache.getCalculatedFieldLinksByEntityId(tenantId, profileId).stream() : Stream.empty() |
|
|
|
).toList(); |
|
|
|
|
|
|
|
cfLinks.forEach(link -> { |
|
|
|
CalculatedFieldId calculatedFieldId = link.getCalculatedFieldId(); |
|
|
|
Map<String, String> attributes = link.getConfiguration().getAttributes(); |
|
|
|
Map<String, String> timeSeries = link.getConfiguration().getTimeSeries(); |
|
|
|
Map<String, String> telemetryKeys = getTelemetryKeysFromLink(link, scope); |
|
|
|
Map<String, KvEntry> updatedTelemetry = telemetry.stream() |
|
|
|
.filter(entry -> attributes.containsValue(entry.getKey()) || timeSeries.containsValue(entry.getKey())) |
|
|
|
.filter(entry -> telemetryKeys.containsValue(entry.getKey())) |
|
|
|
.collect(Collectors.toMap( |
|
|
|
entry -> getMappedKey(entry, attributes, timeSeries), |
|
|
|
entry -> getMappedKey(entry, telemetryKeys), |
|
|
|
entry -> entry, |
|
|
|
(v1, v2) -> v1 |
|
|
|
)); |
|
|
|
@ -368,25 +374,24 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas |
|
|
|
}); |
|
|
|
} |
|
|
|
} catch (Exception e) { |
|
|
|
log.trace("Failed to update telemetry entityId: [{}]", entityId, e); |
|
|
|
log.trace("Failed to update telemetry.", e); |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
private String getMappedKey(KvEntry entry, Map<String, String> attributes, Map<String, String> timeSeries) { |
|
|
|
if (entry instanceof AttributeKvEntry) { |
|
|
|
return attributes.entrySet().stream() |
|
|
|
.filter(attr -> attr.getValue().equals(entry.getKey())) |
|
|
|
.map(Map.Entry::getKey) |
|
|
|
.findFirst() |
|
|
|
.orElse(entry.getKey()); |
|
|
|
} else if (entry instanceof TsKvEntry) { |
|
|
|
return timeSeries.entrySet().stream() |
|
|
|
.filter(ts -> ts.getValue().equals(entry.getKey())) |
|
|
|
.map(Map.Entry::getKey) |
|
|
|
.findFirst() |
|
|
|
.orElse(entry.getKey()); |
|
|
|
} |
|
|
|
return entry.getKey(); |
|
|
|
private Map<String, String> getTelemetryKeysFromLink(CalculatedFieldLink link, AttributeScope scope) { |
|
|
|
return scope == null ? link.getConfiguration().getTimeSeries() : switch (scope) { |
|
|
|
case CLIENT_SCOPE -> link.getConfiguration().getClientAttributes(); |
|
|
|
case SERVER_SCOPE -> link.getConfiguration().getServerAttributes(); |
|
|
|
case SHARED_SCOPE -> link.getConfiguration().getSharedAttributes(); |
|
|
|
}; |
|
|
|
} |
|
|
|
|
|
|
|
private String getMappedKey(KvEntry entry, Map<String, String> telemetry) { |
|
|
|
return telemetry.entrySet().stream() |
|
|
|
.filter(kvEntry -> kvEntry.getValue().equals(entry.getKey())) |
|
|
|
.map(Map.Entry::getKey) |
|
|
|
.findFirst() |
|
|
|
.orElse(entry.getKey()); |
|
|
|
} |
|
|
|
|
|
|
|
private void executeTelemetryUpdate(TenantId tenantId, EntityId entityId, CalculatedFieldId calculatedFieldId, List<CalculatedFieldId> calculatedFieldIds, Map<String, KvEntry> updatedTelemetry) { |
|
|
|
@ -539,17 +544,14 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas |
|
|
|
CalculatedFieldId cfId = calculatedFieldCtx.getCfId(); |
|
|
|
TopicPartitionInfo tpi = partitionService.resolve(ServiceType.TB_CORE, tenantId, cfId); |
|
|
|
if (tpi.isMyPartition()) { |
|
|
|
ReentrantLock lock = entityLocks.computeIfAbsent(entityId, id -> new ReentrantLock()); |
|
|
|
lock.lock(); |
|
|
|
CalculatedFieldEntityCtxId entityCtxId = new CalculatedFieldEntityCtxId(cfId.getId(), entityId.getId()); |
|
|
|
|
|
|
|
try { |
|
|
|
CalculatedFieldEntityCtxId entityCtxId = new CalculatedFieldEntityCtxId(cfId.getId(), entityId.getId()); |
|
|
|
CalculatedFieldEntityCtx calculatedFieldEntityCtx = states.computeIfAbsent(entityCtxId, ctxId -> fetchCalculatedFieldEntityState(ctxId, calculatedFieldCtx.getCfType())); |
|
|
|
states.compute(entityCtxId, (ctxId, ctx) -> { |
|
|
|
CalculatedFieldEntityCtx calculatedFieldEntityCtx = ctx != null ? ctx : fetchCalculatedFieldEntityState(ctxId, calculatedFieldCtx.getCfType()); |
|
|
|
|
|
|
|
Consumer<CalculatedFieldState> performUpdateState = (state) -> { |
|
|
|
if (state.updateState(argumentValues)) { |
|
|
|
calculatedFieldEntityCtx.setState(state); |
|
|
|
states.put(entityCtxId, calculatedFieldEntityCtx); |
|
|
|
rocksDBService.put(JacksonUtil.writeValueAsString(entityCtxId), JacksonUtil.writeValueAsString(calculatedFieldEntityCtx)); |
|
|
|
Map<String, ArgumentEntry> arguments = state.getArguments(); |
|
|
|
boolean allArgsPresent = arguments.keySet().containsAll(calculatedFieldCtx.getArguments().keySet()) && |
|
|
|
@ -564,12 +566,12 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas |
|
|
|
|
|
|
|
boolean allKeysPresent = argumentValues.keySet().containsAll(calculatedFieldCtx.getArguments().keySet()); |
|
|
|
boolean requiresTsRollingUpdate = calculatedFieldCtx.getArguments().values().stream() |
|
|
|
.anyMatch(argument -> "TS_ROLLING".equals(argument.getType()) && state.getArguments().get(argument.getKey()) == null); |
|
|
|
.anyMatch(argument -> ArgumentType.TS_ROLLING.equals(argument.getType()) && state.getArguments().get(argument.getKey()) == null); |
|
|
|
|
|
|
|
if (!allKeysPresent || requiresTsRollingUpdate) { |
|
|
|
|
|
|
|
Map<String, Argument> missingArguments = calculatedFieldCtx.getArguments().entrySet().stream() |
|
|
|
.filter(entry -> !argumentValues.containsKey(entry.getKey()) || ("TS_ROLLING".equals(entry.getValue().getType()) && state.getArguments().get(entry.getKey()) == null)) |
|
|
|
.filter(entry -> !argumentValues.containsKey(entry.getKey()) || (ArgumentType.TS_ROLLING.equals(entry.getValue().getType()) && state.getArguments().get(entry.getKey()) == null)) |
|
|
|
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); |
|
|
|
|
|
|
|
fetchArguments(calculatedFieldCtx.getTenantId(), entityId, missingArguments, argumentValues::putAll) |
|
|
|
@ -578,9 +580,8 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas |
|
|
|
} else { |
|
|
|
performUpdateState.accept(state); |
|
|
|
} |
|
|
|
} finally { |
|
|
|
lock.unlock(); |
|
|
|
} |
|
|
|
return calculatedFieldEntityCtx; |
|
|
|
}); |
|
|
|
} else { |
|
|
|
sendUpdateCalculatedFieldStateMsg(tenantId, cfId, entityId, calculatedFieldIds, argumentValues); |
|
|
|
} |
|
|
|
@ -605,9 +606,9 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas |
|
|
|
|
|
|
|
private void pushMsgToRuleEngine(TenantId tenantId, CalculatedFieldId calculatedFieldId, EntityId originatorId, CalculatedFieldResult calculatedFieldResult, List<CalculatedFieldId> calculatedFieldIds) { |
|
|
|
try { |
|
|
|
String type = calculatedFieldResult.getType(); |
|
|
|
TbMsgType msgType = "ATTRIBUTES".equals(type) ? TbMsgType.POST_ATTRIBUTES_REQUEST : TbMsgType.POST_TELEMETRY_REQUEST; |
|
|
|
TbMsgMetaData md = "ATTRIBUTES".equals(type) ? new TbMsgMetaData(Map.of(SCOPE, calculatedFieldResult.getScope().name())) : TbMsgMetaData.EMPTY; |
|
|
|
OutputType type = calculatedFieldResult.getType(); |
|
|
|
TbMsgType msgType = OutputType.ATTRIBUTES.equals(type) ? TbMsgType.POST_ATTRIBUTES_REQUEST : TbMsgType.POST_TELEMETRY_REQUEST; |
|
|
|
TbMsgMetaData md = OutputType.ATTRIBUTES.equals(type) ? new TbMsgMetaData(Map.of(SCOPE, calculatedFieldResult.getScope().name())) : TbMsgMetaData.EMPTY; |
|
|
|
ObjectNode payload = createJsonPayload(calculatedFieldResult); |
|
|
|
if (calculatedFieldIds == null) { |
|
|
|
calculatedFieldIds = new ArrayList<>(); |
|
|
|
@ -649,19 +650,18 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas |
|
|
|
|
|
|
|
private ListenableFuture<ArgumentEntry> fetchKvEntry(TenantId tenantId, EntityId entityId, Argument argument) { |
|
|
|
return switch (argument.getType()) { |
|
|
|
case "TS_ROLLING" -> fetchTsRolling(tenantId, entityId, argument); |
|
|
|
case "ATTRIBUTE" -> transformSingleValueArgument( |
|
|
|
case TS_ROLLING -> fetchTsRolling(tenantId, entityId, argument); |
|
|
|
case ATTRIBUTE -> transformSingleValueArgument( |
|
|
|
Futures.transform( |
|
|
|
attributesService.find(tenantId, entityId, argument.getScope(), argument.getKey()), |
|
|
|
result -> result.or(() -> Optional.of(new BaseAttributeKvEntry(System.currentTimeMillis(), createDefaultKvEntry(argument)))), |
|
|
|
result -> result.or(() -> Optional.of(new BaseAttributeKvEntry(createDefaultKvEntry(argument), System.currentTimeMillis(), 0L))), |
|
|
|
calculatedFieldCallbackExecutor) |
|
|
|
); |
|
|
|
case "TS_LATEST" -> transformSingleValueArgument( |
|
|
|
case TS_LATEST -> transformSingleValueArgument( |
|
|
|
Futures.transform( |
|
|
|
timeseriesService.findLatest(tenantId, entityId, argument.getKey()), |
|
|
|
result -> result.or(() -> Optional.of(new BasicTsKvEntry(System.currentTimeMillis(), createDefaultKvEntry(argument)))), |
|
|
|
result -> result.or(() -> Optional.of(new BasicTsKvEntry(System.currentTimeMillis(), createDefaultKvEntry(argument), 0L))), |
|
|
|
calculatedFieldCallbackExecutor)); |
|
|
|
default -> throw new IllegalArgumentException("Invalid argument type '" + argument.getType() + "'."); |
|
|
|
}; |
|
|
|
} |
|
|
|
|
|
|
|
@ -801,4 +801,12 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas |
|
|
|
return EntityType.DEVICE_PROFILE.equals(entityId.getEntityType()) || EntityType.ASSET_PROFILE.equals(entityId.getEntityType()); |
|
|
|
} |
|
|
|
|
|
|
|
private EntityId getProfileId(TenantId tenantId, EntityId entityId) { |
|
|
|
return switch (entityId.getEntityType()) { |
|
|
|
case ASSET -> assetProfileCache.get(tenantId, (AssetId) entityId).getId(); |
|
|
|
case DEVICE -> deviceProfileCache.get(tenantId, (DeviceId) entityId).getId(); |
|
|
|
default -> null; |
|
|
|
}; |
|
|
|
} |
|
|
|
|
|
|
|
} |
|
|
|
|