Browse Source

improved implementation of telemetry update

pull/12092/head
IrynaMatveieva 2 years ago
parent
commit
19234df2b8
  1. 60
      application/src/main/java/org/thingsboard/server/service/cf/DefaultCalculatedFieldExecutionService.java
  2. 11
      application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionService.java

60
application/src/main/java/org/thingsboard/server/service/cf/DefaultCalculatedFieldExecutionService.java

@ -177,7 +177,7 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas
onCalculatedFieldDelete(calculatedFieldId, callback);
callback.onSuccess();
}
CalculatedField cf = getOfFetchFromDb(tenantId, calculatedFieldId);
CalculatedField cf = getOrFetchFromDb(tenantId, calculatedFieldId);
if (proto.getUpdated()) {
log.info("Executing onCalculatedFieldUpdate, calculatedFieldId=[{}]", calculatedFieldId);
boolean shouldReinit = onCalculatedFieldUpdate(cf, callback);
@ -226,14 +226,32 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas
public void onTelemetryUpdate(TenantId tenantId, EntityId entityId, CalculatedFieldId calculatedFieldId, Map<String, KvEntry> updatedTelemetry) {
try {
log.info("Received telemetry update msg: tenantId=[{}], calculatedFieldId=[{}]", tenantId, calculatedFieldId);
CalculatedFieldCtx calculatedFieldCtx = calculatedFieldsCtx.computeIfAbsent(calculatedFieldId, id -> {
CalculatedField calculatedField = getOfFetchFromDb(tenantId, id);
return new CalculatedFieldCtx(calculatedField, tbelInvokeService);
});
CalculatedField calculatedField = getOrFetchFromDb(tenantId, calculatedFieldId);
CalculatedFieldCtx calculatedFieldCtx = calculatedFieldsCtx.computeIfAbsent(calculatedFieldId, id -> new CalculatedFieldCtx(calculatedField, tbelInvokeService));
Map<String, ArgumentEntry> argumentValues = updatedTelemetry.entrySet().stream()
.collect(Collectors.toMap(Map.Entry::getKey, entry -> ArgumentEntry.createSingleValueArgument(entry.getValue())));
updateOrInitializeState(calculatedFieldCtx, entityId, argumentValues);
log.info("Successfully updated time series for calculatedFieldId: [{}]", calculatedFieldId);
.collect(Collectors.toMap(Map.Entry::getKey, entry -> {
if (EntityType.TENANT.equals(entityId.getEntityType()) || EntityType.CUSTOMER.equals(entityId.getEntityType())) {
updateStorage(tenantId, entityId, Optional.of(entry.getValue()));
}
return ArgumentEntry.createSingleValueArgument(entry.getValue());
}));
EntityId cfEntityId = calculatedField.getEntityId();
switch (cfEntityId.getEntityType()) {
case ASSET_PROFILE, DEVICE_PROFILE -> {
boolean isCommonEntity = calculatedField.getConfiguration().getReferencedEntities().contains(entityId);
if (isCommonEntity) {
PageDataIterable<? extends EntityId> entities = cfEntityId.getEntityType() == EntityType.ASSET_PROFILE
? new PageDataIterable<>(pageLink -> assetService.findAssetIdsByTenantIdAndAssetProfileId(tenantId, (AssetProfileId) cfEntityId, pageLink), initFetchPackSize)
: new PageDataIterable<>(pageLink -> deviceService.findDeviceIdsByTenantIdAndDeviceProfileId(tenantId, (DeviceProfileId) cfEntityId, pageLink), initFetchPackSize);
entities.forEach(id -> updateOrInitializeState(calculatedFieldCtx, id, argumentValues));
} else {
updateOrInitializeState(calculatedFieldCtx, entityId, argumentValues);
}
}
default -> updateOrInitializeState(calculatedFieldCtx, cfEntityId, argumentValues);
}
log.info("Successfully updated telemetry for calculatedFieldId: [{}]", calculatedFieldId);
} catch (Exception e) {
log.trace("Failed to update telemetry for calculatedFieldId: [{}]", calculatedFieldId, e);
}
@ -299,7 +317,7 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas
}
private boolean onCalculatedFieldUpdate(CalculatedField updatedCalculatedField, TbCallback callback) {
CalculatedField oldCalculatedField = getOfFetchFromDb(updatedCalculatedField.getTenantId(), updatedCalculatedField.getId());
CalculatedField oldCalculatedField = getOrFetchFromDb(updatedCalculatedField.getTenantId(), updatedCalculatedField.getId());
boolean shouldReinit = true;
if (hasSignificantChanges(oldCalculatedField, updatedCalculatedField)) {
onCalculatedFieldDelete(updatedCalculatedField.getId(), callback);
@ -328,7 +346,7 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas
}
}
private CalculatedField getOfFetchFromDb(TenantId tenantId, CalculatedFieldId calculatedFieldId) {
private CalculatedField getOrFetchFromDb(TenantId tenantId, CalculatedFieldId calculatedFieldId) {
return calculatedFields.computeIfAbsent(calculatedFieldId, cfId -> calculatedFieldService.findById(tenantId, calculatedFieldId));
}
@ -435,17 +453,17 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas
}
private void updateStorage(TenantId tenantId, EntityId entityId, Optional<? extends KvEntry> kvEntry) {
if (kvEntry.isEmpty()) {
return;
}
List<KvEntry> kvEntries = switch (entityId.getEntityType()) {
case TENANT -> tenantStorage.computeIfAbsent(tenantId, id -> new ArrayList<>());
case CUSTOMER -> customerStorage.computeIfAbsent((CustomerId) entityId, id -> new ArrayList<>());
default -> null;
};
if (kvEntries != null && !kvEntries.contains(kvEntry.get())) {
kvEntries.add(kvEntry.get());
}
kvEntry.ifPresent(entry -> {
List<KvEntry> kvEntries = switch (entityId.getEntityType()) {
case TENANT -> tenantStorage.computeIfAbsent(tenantId, id -> new ArrayList<>());
case CUSTOMER -> customerStorage.computeIfAbsent((CustomerId) entityId, id -> new ArrayList<>());
default -> null;
};
if (kvEntries != null) {
kvEntries.removeIf(existingEntry -> existingEntry.getKey().equals(entry.getKey()));
kvEntries.add(entry);
}
});
}
private KvEntry createDefaultKvEntry(Argument argument) {

11
application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionService.java

@ -212,16 +212,15 @@ public class DefaultTelemetrySubscriptionService extends AbstractSubscriptionSer
private void updateTelemetryInCalculatedFields(TenantId tenantId, EntityId entityId, List<? extends KvEntry> telemetry) {
EntityType entityType = entityId.getEntityType();
if (EntityType.DEVICE.equals(entityType) || EntityType.ASSET.equals(entityType)) {
EntityId profileId;
if (EntityType.DEVICE.equals(entityType) || EntityType.ASSET.equals(entityType) || EntityType.CUSTOMER.equals(entityType) || EntityType.TENANT.equals(entityType)) {
EntityId profileId = null;
if (EntityType.ASSET.equals(entityType)) {
profileId = assetProfileCache.get(tenantId, (AssetId) entityId).getId();
} else {
} else if (EntityType.DEVICE.equals(entityType)) {
profileId = deviceProfileCache.get(tenantId, (DeviceId) entityId).getId();
}
List<CalculatedFieldLink> cfLinks = new ArrayList<>();
cfLinks.addAll(calculatedFieldService.findAllCalculatedFieldLinksByEntityId(tenantId, entityId));
cfLinks.addAll(calculatedFieldService.findAllCalculatedFieldLinksByEntityId(tenantId, profileId));
List<CalculatedFieldLink> cfLinks = new ArrayList<>(calculatedFieldService.findAllCalculatedFieldLinksByEntityId(tenantId, entityId));
Optional.ofNullable(profileId).ifPresent(id -> cfLinks.addAll(calculatedFieldService.findAllCalculatedFieldLinksByEntityId(tenantId, id)));
if (!cfLinks.isEmpty()) {
cfLinks.forEach(link -> {
CalculatedFieldId calculatedFieldId = link.getCalculatedFieldId();

Loading…
Cancel
Save