|
|
|
@ -76,6 +76,10 @@ public class TbDeviceProfileNode implements TbNode { |
|
|
|
this.ctx = ctx; |
|
|
|
scheduleAlarmHarvesting(ctx, null); |
|
|
|
ctx.addDeviceProfileListeners(this::onProfileUpdate, this::onDeviceUpdate); |
|
|
|
initAlarmRuleState(false); |
|
|
|
} |
|
|
|
|
|
|
|
private void initAlarmRuleState(boolean printNewlyAddedDeviceStates) { |
|
|
|
if (config.isFetchAlarmRulesStateOnStart()) { |
|
|
|
log.info("[{}] Fetching alarm rule state", ctx.getSelfId()); |
|
|
|
int fetchCount = 0; |
|
|
|
@ -86,7 +90,7 @@ public class TbDeviceProfileNode implements TbNode { |
|
|
|
for (RuleNodeState rns : states.getData()) { |
|
|
|
fetchCount++; |
|
|
|
if (rns.getEntityId().getEntityType().equals(EntityType.DEVICE) && ctx.isLocalEntity(rns.getEntityId())) { |
|
|
|
getOrCreateDeviceState(ctx, new DeviceId(rns.getEntityId().getId()), rns); |
|
|
|
getOrCreateDeviceState(ctx, new DeviceId(rns.getEntityId().getId()), rns, printNewlyAddedDeviceStates); |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
@ -130,7 +134,7 @@ public class TbDeviceProfileNode implements TbNode { |
|
|
|
removeDeviceState(deviceId); |
|
|
|
ctx.tellSuccess(msg); |
|
|
|
} else { |
|
|
|
DeviceState deviceState = getOrCreateDeviceState(ctx, deviceId, null); |
|
|
|
DeviceState deviceState = getOrCreateDeviceState(ctx, deviceId, null, false); |
|
|
|
if (deviceState != null) { |
|
|
|
deviceState.process(ctx, msg); |
|
|
|
} else { |
|
|
|
@ -148,6 +152,7 @@ public class TbDeviceProfileNode implements TbNode { |
|
|
|
public void onPartitionChangeMsg(TbContext ctx, PartitionChangeMsg msg) { |
|
|
|
// Cleanup the cache for all entities that are no longer assigned to current server partitions
|
|
|
|
deviceStates.entrySet().removeIf(entry -> !ctx.isLocalEntity(entry.getKey())); |
|
|
|
initAlarmRuleState(true); |
|
|
|
} |
|
|
|
|
|
|
|
@Override |
|
|
|
@ -156,13 +161,16 @@ public class TbDeviceProfileNode implements TbNode { |
|
|
|
deviceStates.clear(); |
|
|
|
} |
|
|
|
|
|
|
|
protected DeviceState getOrCreateDeviceState(TbContext ctx, DeviceId deviceId, RuleNodeState rns) { |
|
|
|
protected DeviceState getOrCreateDeviceState(TbContext ctx, DeviceId deviceId, RuleNodeState rns, boolean printNewlyAddedDeviceStates) { |
|
|
|
DeviceState deviceState = deviceStates.get(deviceId); |
|
|
|
if (deviceState == null) { |
|
|
|
DeviceProfile deviceProfile = cache.get(ctx.getTenantId(), deviceId); |
|
|
|
if (deviceProfile != null) { |
|
|
|
deviceState = new DeviceState(ctx, config, deviceId, new ProfileState(deviceProfile), rns); |
|
|
|
deviceStates.put(deviceId, deviceState); |
|
|
|
if (printNewlyAddedDeviceStates) { |
|
|
|
log.info("[{}][{}] Device [{}] was added during PartitionChangeMsg", ctx.getTenantId(), ctx.getSelfId(), deviceId); |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
return deviceState; |
|
|
|
|