|
|
@ -27,7 +27,6 @@ import org.thingsboard.server.common.data.EntityType; |
|
|
import org.thingsboard.server.common.data.ProfileEntityIdInfo; |
|
|
import org.thingsboard.server.common.data.ProfileEntityIdInfo; |
|
|
import org.thingsboard.server.common.data.cf.CalculatedField; |
|
|
import org.thingsboard.server.common.data.cf.CalculatedField; |
|
|
import org.thingsboard.server.common.data.cf.CalculatedFieldLink; |
|
|
import org.thingsboard.server.common.data.cf.CalculatedFieldLink; |
|
|
import org.thingsboard.server.common.data.cf.configuration.ScheduledUpdateSupportedCalculatedFieldConfiguration; |
|
|
|
|
|
import org.thingsboard.server.common.data.id.AssetId; |
|
|
import org.thingsboard.server.common.data.id.AssetId; |
|
|
import org.thingsboard.server.common.data.id.CalculatedFieldId; |
|
|
import org.thingsboard.server.common.data.id.CalculatedFieldId; |
|
|
import org.thingsboard.server.common.data.id.DeviceId; |
|
|
import org.thingsboard.server.common.data.id.DeviceId; |
|
|
@ -57,10 +56,7 @@ import java.util.Collections; |
|
|
import java.util.HashMap; |
|
|
import java.util.HashMap; |
|
|
import java.util.List; |
|
|
import java.util.List; |
|
|
import java.util.Map; |
|
|
import java.util.Map; |
|
|
import java.util.concurrent.ConcurrentHashMap; |
|
|
|
|
|
import java.util.concurrent.CopyOnWriteArrayList; |
|
|
import java.util.concurrent.CopyOnWriteArrayList; |
|
|
import java.util.concurrent.ScheduledFuture; |
|
|
|
|
|
import java.util.concurrent.TimeUnit; |
|
|
|
|
|
import java.util.function.BiConsumer; |
|
|
import java.util.function.BiConsumer; |
|
|
|
|
|
|
|
|
import static org.thingsboard.server.utils.CalculatedFieldUtils.fromProto; |
|
|
import static org.thingsboard.server.utils.CalculatedFieldUtils.fromProto; |
|
|
@ -74,7 +70,6 @@ public class CalculatedFieldManagerMessageProcessor extends AbstractContextAware |
|
|
private final Map<CalculatedFieldId, CalculatedFieldCtx> calculatedFields = new HashMap<>(); |
|
|
private final Map<CalculatedFieldId, CalculatedFieldCtx> calculatedFields = new HashMap<>(); |
|
|
private final Map<EntityId, List<CalculatedFieldCtx>> entityIdCalculatedFields = new HashMap<>(); |
|
|
private final Map<EntityId, List<CalculatedFieldCtx>> entityIdCalculatedFields = new HashMap<>(); |
|
|
private final Map<EntityId, List<CalculatedFieldLink>> entityIdCalculatedFieldLinks = new HashMap<>(); |
|
|
private final Map<EntityId, List<CalculatedFieldLink>> entityIdCalculatedFieldLinks = new HashMap<>(); |
|
|
private final Map<CalculatedFieldId, ScheduledFuture<?>> cfDynamicArgumentsRefreshTasks = new ConcurrentHashMap<>(); |
|
|
|
|
|
|
|
|
|
|
|
private final CalculatedFieldProcessingService cfExecService; |
|
|
private final CalculatedFieldProcessingService cfExecService; |
|
|
private final CalculatedFieldStateService cfStateService; |
|
|
private final CalculatedFieldStateService cfStateService; |
|
|
@ -113,8 +108,6 @@ public class CalculatedFieldManagerMessageProcessor extends AbstractContextAware |
|
|
calculatedFields.clear(); |
|
|
calculatedFields.clear(); |
|
|
entityIdCalculatedFields.clear(); |
|
|
entityIdCalculatedFields.clear(); |
|
|
entityIdCalculatedFieldLinks.clear(); |
|
|
entityIdCalculatedFieldLinks.clear(); |
|
|
cfDynamicArgumentsRefreshTasks.values().forEach(future -> future.cancel(true)); |
|
|
|
|
|
cfDynamicArgumentsRefreshTasks.clear(); |
|
|
|
|
|
ctx.stop(ctx.getSelf()); |
|
|
ctx.stop(ctx.getSelf()); |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
@ -274,7 +267,6 @@ public class CalculatedFieldManagerMessageProcessor extends AbstractContextAware |
|
|
// Alternative approach would be to use any list but avoid modifications to the list (change the complete map value instead)
|
|
|
// Alternative approach would be to use any list but avoid modifications to the list (change the complete map value instead)
|
|
|
entityIdCalculatedFields.computeIfAbsent(cf.getEntityId(), id -> new CopyOnWriteArrayList<>()).add(cfCtx); |
|
|
entityIdCalculatedFields.computeIfAbsent(cf.getEntityId(), id -> new CopyOnWriteArrayList<>()).add(cfCtx); |
|
|
addLinks(cf); |
|
|
addLinks(cf); |
|
|
scheduleDynamicArgumentsRefreshTaskForCfIfNeeded(cfCtx); |
|
|
|
|
|
applyToTargetCfEntityActors(cfCtx, callback, (id, cb) -> initCfForEntity(id, cfCtx, false, cb)); |
|
|
applyToTargetCfEntityActors(cfCtx, callback, (id, cb) -> initCfForEntity(id, cfCtx, false, cb)); |
|
|
} |
|
|
} |
|
|
} |
|
|
} |
|
|
@ -304,12 +296,6 @@ public class CalculatedFieldManagerMessageProcessor extends AbstractContextAware |
|
|
calculatedFields.put(newCf.getId(), newCfCtx); |
|
|
calculatedFields.put(newCf.getId(), newCfCtx); |
|
|
List<CalculatedFieldCtx> oldCfList = entityIdCalculatedFields.get(newCf.getEntityId()); |
|
|
List<CalculatedFieldCtx> oldCfList = entityIdCalculatedFields.get(newCf.getEntityId()); |
|
|
|
|
|
|
|
|
boolean hasSchedulingConfigChanges = newCfCtx.hasSchedulingConfigChanges(oldCfCtx); |
|
|
|
|
|
if (hasSchedulingConfigChanges) { |
|
|
|
|
|
cancelCfDynamicArgumentsRefreshTaskIfExists(cfId, false); |
|
|
|
|
|
scheduleDynamicArgumentsRefreshTaskForCfIfNeeded(newCfCtx); |
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
List<CalculatedFieldCtx> newCfList = new CopyOnWriteArrayList<>(); |
|
|
List<CalculatedFieldCtx> newCfList = new CopyOnWriteArrayList<>(); |
|
|
boolean found = false; |
|
|
boolean found = false; |
|
|
for (CalculatedFieldCtx oldCtx : oldCfList) { |
|
|
for (CalculatedFieldCtx oldCtx : oldCfList) { |
|
|
@ -350,19 +336,9 @@ public class CalculatedFieldManagerMessageProcessor extends AbstractContextAware |
|
|
} |
|
|
} |
|
|
entityIdCalculatedFields.get(cfCtx.getEntityId()).remove(cfCtx); |
|
|
entityIdCalculatedFields.get(cfCtx.getEntityId()).remove(cfCtx); |
|
|
deleteLinks(cfCtx); |
|
|
deleteLinks(cfCtx); |
|
|
cancelCfDynamicArgumentsRefreshTaskIfExists(cfId, true); |
|
|
|
|
|
applyToTargetCfEntityActors(cfCtx, callback, (id, cb) -> deleteCfForEntity(id, cfId, cb)); |
|
|
applyToTargetCfEntityActors(cfCtx, callback, (id, cb) -> deleteCfForEntity(id, cfId, cb)); |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
private void cancelCfDynamicArgumentsRefreshTaskIfExists(CalculatedFieldId cfId, boolean cfDeleted) { |
|
|
|
|
|
var existingTask = cfDynamicArgumentsRefreshTasks.remove(cfId); |
|
|
|
|
|
if (existingTask != null) { |
|
|
|
|
|
existingTask.cancel(false); |
|
|
|
|
|
String reason = cfDeleted ? "deletion" : "update"; |
|
|
|
|
|
log.debug("[{}][{}] Cancelled dynamic arguments refresh task due to CF {}!", tenantId, cfId, reason); |
|
|
|
|
|
} |
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
public void onTelemetryMsg(CalculatedFieldTelemetryMsg msg) { |
|
|
public void onTelemetryMsg(CalculatedFieldTelemetryMsg msg) { |
|
|
EntityId entityId = msg.getEntityId(); |
|
|
EntityId entityId = msg.getEntityId(); |
|
|
log.debug("Received telemetry msg from entity [{}]", entityId); |
|
|
log.debug("Received telemetry msg from entity [{}]", entityId); |
|
|
@ -442,43 +418,6 @@ public class CalculatedFieldManagerMessageProcessor extends AbstractContextAware |
|
|
return result; |
|
|
return result; |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
private void scheduleDynamicArgumentsRefreshTaskForCfIfNeeded(CalculatedFieldCtx cfCtx) { |
|
|
|
|
|
CalculatedField cf = cfCtx.getCalculatedField(); |
|
|
|
|
|
if (!(cf.getConfiguration() instanceof ScheduledUpdateSupportedCalculatedFieldConfiguration scheduledCfConfig)) { |
|
|
|
|
|
return; |
|
|
|
|
|
} |
|
|
|
|
|
if (!scheduledCfConfig.isScheduledUpdateEnabled()) { |
|
|
|
|
|
return; |
|
|
|
|
|
} |
|
|
|
|
|
if (cfDynamicArgumentsRefreshTasks.containsKey(cf.getId())) { |
|
|
|
|
|
log.debug("[{}][{}] Dynamic arguments refresh task for CF already exists!", tenantId, cf.getId()); |
|
|
|
|
|
return; |
|
|
|
|
|
} |
|
|
|
|
|
long refreshDynamicSourceInterval = TimeUnit.SECONDS.toMillis(scheduledCfConfig.getScheduledUpdateInterval()); |
|
|
|
|
|
var scheduledMsg = new CalculatedFieldDynamicArgumentsRefreshMsg(tenantId, cfCtx.getCfId()); |
|
|
|
|
|
|
|
|
|
|
|
ScheduledFuture<?> scheduledFuture = systemContext |
|
|
|
|
|
.schedulePeriodicMsgWithDelay(ctx, scheduledMsg, refreshDynamicSourceInterval, refreshDynamicSourceInterval); |
|
|
|
|
|
cfDynamicArgumentsRefreshTasks.put(cf.getId(), scheduledFuture); |
|
|
|
|
|
log.debug("[{}][{}] Scheduled dynamic arguments refresh task for CF!", tenantId, cf.getId()); |
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
public void onDynamicArgumentsRefreshMsg(CalculatedFieldDynamicArgumentsRefreshMsg msg) { |
|
|
|
|
|
log.debug("[{}] [{}] Processing CF dynamic arguments refresh task.", tenantId, msg.getCfId()); |
|
|
|
|
|
CalculatedFieldCtx cfCtx = calculatedFields.get(msg.getCfId()); |
|
|
|
|
|
if (cfCtx == null) { |
|
|
|
|
|
log.debug("[{}][{}] Failed to find CF context, going to stop dynamic arguments refresh task for CF.", tenantId, msg.getCfId()); |
|
|
|
|
|
cancelCfDynamicArgumentsRefreshTaskIfExists(msg.getCfId(), true); |
|
|
|
|
|
return; |
|
|
|
|
|
} |
|
|
|
|
|
applyToTargetCfEntityActors(cfCtx, msg.getCallback(), (id, cb) -> refreshDynamicArgumentsForEntity(id, msg.getCfId(), cb)); |
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
private void refreshDynamicArgumentsForEntity(EntityId entityId, CalculatedFieldId cfId, TbCallback callback) { |
|
|
|
|
|
log.debug("Pushing CF dynamic arguments refresh msg to specific actor [{}]", entityId); |
|
|
|
|
|
getOrCreateActor(entityId).tell(new EntityCalculatedFieldDynamicArgumentsRefreshMsg(tenantId, cfId, callback)); |
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
private void linkedTelemetryMsgForEntity(EntityId entityId, EntityCalculatedFieldLinkedTelemetryMsg msg) { |
|
|
private void linkedTelemetryMsgForEntity(EntityId entityId, EntityCalculatedFieldLinkedTelemetryMsg msg) { |
|
|
log.debug("Pushing linked telemetry msg to specific actor [{}]", entityId); |
|
|
log.debug("Pushing linked telemetry msg to specific actor [{}]", entityId); |
|
|
getOrCreateActor(entityId).tell(msg); |
|
|
getOrCreateActor(entityId).tell(msg); |
|
|
@ -565,7 +504,6 @@ public class CalculatedFieldManagerMessageProcessor extends AbstractContextAware |
|
|
// We use copy on write lists to safely pass the reference to another actor for the iteration.
|
|
|
// We use copy on write lists to safely pass the reference to another actor for the iteration.
|
|
|
// Alternative approach would be to use any list but avoid modifications to the list (change the complete map value instead)
|
|
|
// Alternative approach would be to use any list but avoid modifications to the list (change the complete map value instead)
|
|
|
entityIdCalculatedFields.computeIfAbsent(cf.getEntityId(), id -> new CopyOnWriteArrayList<>()).add(cfCtx); |
|
|
entityIdCalculatedFields.computeIfAbsent(cf.getEntityId(), id -> new CopyOnWriteArrayList<>()).add(cfCtx); |
|
|
scheduleDynamicArgumentsRefreshTaskForCfIfNeeded(cfCtx); |
|
|
|
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
private void initCalculatedFieldLink(CalculatedFieldLink link) { |
|
|
private void initCalculatedFieldLink(CalculatedFieldLink link) { |
|
|
|