diff --git a/application/src/main/java/org/thingsboard/server/service/cf/CalculatedFieldExecutionService.java b/application/src/main/java/org/thingsboard/server/service/cf/CalculatedFieldExecutionService.java index 6d7cf0e741..a1202ec74b 100644 --- a/application/src/main/java/org/thingsboard/server/service/cf/CalculatedFieldExecutionService.java +++ b/application/src/main/java/org/thingsboard/server/service/cf/CalculatedFieldExecutionService.java @@ -31,4 +31,8 @@ public interface CalculatedFieldExecutionService { void onEntityProfileChanged(TransportProtos.EntityProfileUpdateMsgProto proto, TbCallback callback); + void onEntityAdded(TransportProtos.EntityAddMsgProto proto, TbCallback callback); + + void onEntityDeleted(TransportProtos.EntityDeleteMsg proto, TbCallback callback); + } diff --git a/application/src/main/java/org/thingsboard/server/service/cf/DefaultCalculatedFieldExecutionService.java b/application/src/main/java/org/thingsboard/server/service/cf/DefaultCalculatedFieldExecutionService.java index 0d269f032f..3bb9be065d 100644 --- a/application/src/main/java/org/thingsboard/server/service/cf/DefaultCalculatedFieldExecutionService.java +++ b/application/src/main/java/org/thingsboard/server/service/cf/DefaultCalculatedFieldExecutionService.java @@ -74,8 +74,8 @@ import org.thingsboard.server.queue.util.TbCoreComponent; import org.thingsboard.server.service.cf.ctx.CalculatedFieldEntityCtx; import org.thingsboard.server.service.cf.ctx.CalculatedFieldEntityCtxId; import org.thingsboard.server.service.cf.ctx.state.ArgumentEntry; -import org.thingsboard.server.service.cf.ctx.state.CalculatedFieldState; import org.thingsboard.server.service.cf.ctx.state.CalculatedFieldCtx; +import org.thingsboard.server.service.cf.ctx.state.CalculatedFieldState; import org.thingsboard.server.service.cf.ctx.state.LastRecordsCalculatedFieldState; import org.thingsboard.server.service.cf.ctx.state.ScriptCalculatedFieldState; import org.thingsboard.server.service.cf.ctx.state.SimpleCalculatedFieldState; @@ -227,6 +227,7 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas @Override public void onTelemetryUpdate(TenantId tenantId, CalculatedFieldId calculatedFieldId, Map updatedTelemetry) { try { + log.info("Received telemetry update msg: tenantId=[{}], calculatedFieldId=[{}]", tenantId, calculatedFieldId); CalculatedFieldCtx calculatedFieldCtx = calculatedFieldsCtx.computeIfAbsent(calculatedFieldId, id -> { CalculatedField calculatedField = calculatedFields.computeIfAbsent(id, cfId -> calculatedFieldService.findById(tenantId, id)); return new CalculatedFieldCtx(calculatedField, tbelInvokeService); @@ -247,7 +248,6 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas EntityId entityId = EntityIdFactory.getByTypeAndUuid(proto.getEntityType(), new UUID(proto.getEntityIdMSB(), proto.getEntityIdLSB())); EntityId oldProfileId = EntityIdFactory.getByTypeAndUuid(proto.getEntityProfileType(), new UUID(proto.getOldProfileIdMSB(), proto.getOldProfileIdLSB())); EntityId newProfileId = EntityIdFactory.getByTypeAndUuid(proto.getEntityProfileType(), new UUID(proto.getNewProfileIdMSB(), proto.getNewProfileIdLSB())); - log.info("Received EntityProfileUpdateMsgProto for processing: tenantId=[{}], entityId=[{}]", tenantId, entityId); calculatedFieldService.findCalculatedFieldIdsByEntityId(tenantId, oldProfileId) @@ -257,10 +257,44 @@ public class DefaultCalculatedFieldExecutionService extends AbstractPartitionBas rocksDBService.delete(JacksonUtil.writeValueAsString(ctxId)); }); - calculatedFieldService.findCalculatedFieldIdsByEntityId(tenantId, newProfileId) - .stream() - .map(cfId -> calculatedFieldsCtx.computeIfAbsent(cfId, id -> new CalculatedFieldCtx(calculatedFieldService.findById(tenantId, id), tbelInvokeService))) - .forEach(cfCtx -> initializeStateForEntity(cfCtx, entityId, callback)); + initializeStateForEntityByProfile(tenantId, entityId, newProfileId, callback); + } catch (Exception e) { + log.trace("Failed to process entity type update msg: [{}]", proto, e); + } + } + + @Override + public void onEntityAdded(TransportProtos.EntityAddMsgProto proto, TbCallback callback) { + try { + TenantId tenantId = TenantId.fromUUID(new UUID(proto.getTenantIdMSB(), proto.getTenantIdLSB())); + EntityId entityId = EntityIdFactory.getByTypeAndUuid(proto.getEntityType(), new UUID(proto.getEntityIdMSB(), proto.getEntityIdLSB())); + EntityId profileId = EntityIdFactory.getByTypeAndUuid(proto.getEntityProfileType(), new UUID(proto.getProfileIdMSB(), proto.getProfileIdLSB())); + log.info("Received EntityCreateMsgProto for processing: tenantId=[{}], entityId=[{}]", tenantId, entityId); + + initializeStateForEntityByProfile(tenantId, entityId, profileId, callback); + } catch (Exception e) { + log.trace("Failed to process entity type update msg: [{}]", proto, e); + } + } + + private void initializeStateForEntityByProfile(TenantId tenantId, EntityId entityId, EntityId profileId, TbCallback callback) { + calculatedFieldService.findCalculatedFieldIdsByEntityId(tenantId, profileId) + .stream() + .map(cfId -> calculatedFieldsCtx.computeIfAbsent(cfId, id -> new CalculatedFieldCtx(calculatedFieldService.findById(tenantId, id), tbelInvokeService))) + .forEach(cfCtx -> initializeStateForEntity(cfCtx, entityId, callback)); + } + + @Override + public void onEntityDeleted(TransportProtos.EntityDeleteMsg proto, TbCallback callback) { + try { + EntityId entityId = EntityIdFactory.getByTypeAndUuid(proto.getEntityType(), new UUID(proto.getEntityIdMSB(), proto.getEntityIdLSB())); + log.info("Received EntityDeleteMsg for processing: entityId=[{}]", entityId); + List statesToRemove = states.keySet().stream() + .filter(ctxEntityId -> ctxEntityId.entityId().equals(entityId.getId())) + .map(JacksonUtil::writeValueAsString) + .toList(); + states.keySet().removeIf(ctxEntityId -> ctxEntityId.entityId().equals(entityId.getId())); + rocksDBService.deleteAll(statesToRemove); } catch (Exception e) { log.trace("Failed to process entity type update msg: [{}]", proto, e); } diff --git a/application/src/main/java/org/thingsboard/server/service/entitiy/EntityStateSourcingListener.java b/application/src/main/java/org/thingsboard/server/service/entitiy/EntityStateSourcingListener.java index 154f5f4833..57293169a5 100644 --- a/application/src/main/java/org/thingsboard/server/service/entitiy/EntityStateSourcingListener.java +++ b/application/src/main/java/org/thingsboard/server/service/entitiy/EntityStateSourcingListener.java @@ -146,7 +146,11 @@ public class EntityStateSourcingListener { log.debug("[{}][{}][{}] Handling entity deletion event: {}", tenantId, entityType, entityId, event); switch (entityType) { - case ASSET, ASSET_PROFILE, ENTITY_VIEW, CUSTOMER, EDGE, NOTIFICATION_RULE -> { + case ASSET -> { + Asset asset = (Asset) event.getEntity(); + tbClusterService.onAssetDeleted(tenantId, asset, null); + } + case ASSET_PROFILE, ENTITY_VIEW, CUSTOMER, EDGE, NOTIFICATION_RULE -> { tbClusterService.broadcastEntityStateChangeEvent(tenantId, entityId, ComponentLifecycleEvent.DELETED); } case NOTIFICATION_REQUEST -> { diff --git a/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbClusterService.java b/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbClusterService.java index 58a16d9c0a..78a1f27a55 100644 --- a/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbClusterService.java +++ b/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbClusterService.java @@ -388,11 +388,26 @@ public class DefaultTbClusterService implements TbClusterService { public void onDeviceDeleted(TenantId tenantId, Device device, TbQueueCallback callback) { DeviceId deviceId = device.getId(); gatewayNotificationsService.onDeviceDeleted(device); + handleEntityDelete(tenantId, deviceId, device.getDeviceProfileId()); broadcastEntityDeleteToTransport(tenantId, deviceId, device.getName(), callback); sendDeviceStateServiceEvent(tenantId, deviceId, false, false, true); broadcastEntityStateChangeEvent(tenantId, deviceId, ComponentLifecycleEvent.DELETED); } + @Override + public void onAssetDeleted(TenantId tenantId, Asset asset, TbQueueCallback callback) { + AssetId assetId = asset.getId(); + handleEntityDelete(tenantId, assetId, asset.getAssetProfileId()); + broadcastEntityStateChangeEvent(tenantId, assetId, ComponentLifecycleEvent.DELETED); + } + + private void handleEntityDelete(TenantId tenantId, EntityId entityId, EntityId profileId) { + boolean cfExistsByProfile = calculatedFieldService.existsCalculatedFieldByEntityId(tenantId, profileId); + if (cfExistsByProfile) { + sendEntityDeletedEvent(tenantId, entityId); + } + } + @Override public void onDeviceAssignedToTenant(TenantId oldTenantId, Device device) { onDeviceDeleted(oldTenantId, device, null); @@ -613,11 +628,13 @@ public class DefaultTbClusterService implements TbClusterService { } boolean deviceTypeChanged = !device.getType().equals(old.getType()); if (deviceTypeChanged) { - handleProfileChange(device.getTenantId(), device.getId(), old.getDeviceProfileId(), device.getDeviceProfileId()); + handleProfileUpdate(device.getTenantId(), device.getId(), old.getDeviceProfileId(), device.getDeviceProfileId()); } if (deviceNameChanged || deviceTypeChanged) { pushMsgToCore(new DeviceNameOrTypeUpdateMsg(device.getTenantId(), device.getId(), device.getName(), device.getType()), null); } + } else { + handleEntityCreate(device.getTenantId(), device.getId(), device.getDeviceProfileId()); } broadcastEntityStateChangeEvent(device.getTenantId(), device.getId(), created ? ComponentLifecycleEvent.CREATED : ComponentLifecycleEvent.UPDATED); sendDeviceStateServiceEvent(device.getTenantId(), device.getId(), created, !created, false); @@ -631,16 +648,26 @@ public class DefaultTbClusterService implements TbClusterService { if (old != null) { boolean assetTypeChanged = !asset.getType().equals(old.getType()); if (assetTypeChanged) { - handleProfileChange(asset.getTenantId(), asset.getId(), old.getAssetProfileId(), asset.getAssetProfileId()); + handleProfileUpdate(asset.getTenantId(), asset.getId(), old.getAssetProfileId(), asset.getAssetProfileId()); } + } else { + handleEntityCreate(asset.getTenantId(), asset.getId(), asset.getAssetProfileId()); } broadcastEntityStateChangeEvent(asset.getTenantId(), asset.getId(), created ? ComponentLifecycleEvent.CREATED : ComponentLifecycleEvent.UPDATED); } - private void handleProfileChange(TenantId tenantId, EntityId entityId, EntityId oldProfileId, EntityId newProfileId) { - boolean cfExistsByProfile = calculatedFieldService.existsCalculatedFieldByEntityId(tenantId, oldProfileId); + private void handleProfileUpdate(TenantId tenantId, EntityId entityId, EntityId oldProfileId, EntityId newProfileId) { + boolean cfExistsByOldProfile = calculatedFieldService.existsCalculatedFieldByEntityId(tenantId, oldProfileId); + boolean cfExistsByNewProfile = calculatedFieldService.existsCalculatedFieldByEntityId(tenantId, newProfileId); + if (cfExistsByOldProfile || cfExistsByNewProfile) { + sendEntityProfileUpdatedEvent(tenantId, entityId, oldProfileId, newProfileId); + } + } + + private void handleEntityCreate(TenantId tenantId, EntityId entityId, EntityId profileId) { + boolean cfExistsByProfile = calculatedFieldService.existsCalculatedFieldByEntityId(tenantId, profileId); if (cfExistsByProfile) { - sendEntityTypeUpdatedEvent(tenantId, entityId, oldProfileId, newProfileId); + sendEntityAddedEvent(tenantId, entityId, profileId); } } @@ -801,7 +828,7 @@ public class DefaultTbClusterService implements TbClusterService { pushMsgToCore(tenantId, calculatedFieldId, ToCoreMsg.newBuilder().setCalculatedFieldMsg(msg).build(), null); } - private void sendEntityTypeUpdatedEvent(TenantId tenantId, EntityId entityId, EntityId oldProfileId, EntityId newProfileId) { + private void sendEntityProfileUpdatedEvent(TenantId tenantId, EntityId entityId, EntityId oldProfileId, EntityId newProfileId) { TransportProtos.EntityProfileUpdateMsgProto.Builder builder = TransportProtos.EntityProfileUpdateMsgProto.newBuilder(); builder.setTenantIdMSB(tenantId.getId().getMostSignificantBits()); builder.setTenantIdLSB(tenantId.getId().getLeastSignificantBits()); @@ -817,4 +844,26 @@ public class DefaultTbClusterService implements TbClusterService { pushMsgToCore(tenantId, entityId, ToCoreMsg.newBuilder().setEntityProfileUpdateMsg(msg).build(), null); } + private void sendEntityAddedEvent(TenantId tenantId, EntityId entityId, EntityId profileId) { + TransportProtos.EntityAddMsgProto.Builder builder = TransportProtos.EntityAddMsgProto.newBuilder(); + builder.setTenantIdMSB(tenantId.getId().getMostSignificantBits()); + builder.setTenantIdLSB(tenantId.getId().getLeastSignificantBits()); + builder.setEntityType(entityId.getEntityType().name()); + builder.setEntityIdMSB(entityId.getId().getMostSignificantBits()); + builder.setEntityIdLSB(entityId.getId().getLeastSignificantBits()); + builder.setEntityProfileType(profileId.getEntityType().name()); + builder.setProfileIdMSB(profileId.getId().getMostSignificantBits()); + builder.setProfileIdLSB(profileId.getId().getLeastSignificantBits()); + TransportProtos.EntityAddMsgProto msg = builder.build(); + pushMsgToCore(tenantId, entityId, ToCoreMsg.newBuilder().setEntityAddMsg(msg).build(), null); + } + + private void sendEntityDeletedEvent(TenantId tenantId, EntityId entityId) { + TransportProtos.EntityDeleteMsg.Builder builder = TransportProtos.EntityDeleteMsg.newBuilder(); + builder.setEntityType(entityId.getEntityType().name()); + builder.setEntityIdMSB(entityId.getId().getMostSignificantBits()); + builder.setEntityIdLSB(entityId.getId().getLeastSignificantBits()); + pushMsgToCore(tenantId, entityId, ToCoreMsg.newBuilder().setEntityDeleteMsg(builder).build(), null); + } + } diff --git a/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbCoreConsumerService.java b/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbCoreConsumerService.java index 392e2637d1..d477c6d753 100644 --- a/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbCoreConsumerService.java +++ b/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbCoreConsumerService.java @@ -320,6 +320,10 @@ public class DefaultTbCoreConsumerService extends AbstractConsumerService future = calculatedFieldsExecutor.submit(() -> calculatedFieldExecutionService.onEntityAdded(entityCreateMsg, callback)); + DonAsynchron.withCallback(future, + __ -> callback.onSuccess(), + t -> { + log.warn("[{}] Failed to process entity create message for entityId [{}]", tenantId.getId(), entityId.getId(), t); + callback.onFailure(t); + }); + } + + private void forwardToCalculatedFieldService(TransportProtos.EntityDeleteMsg entityDeleteMsg, TbCallback callback) { + var entityId = EntityIdFactory.getByTypeAndUuid(entityDeleteMsg.getEntityType(), new UUID(entityDeleteMsg.getEntityIdMSB(), entityDeleteMsg.getEntityIdLSB())); + ListenableFuture future = calculatedFieldsExecutor.submit(() -> calculatedFieldExecutionService.onEntityDeleted(entityDeleteMsg, callback)); + DonAsynchron.withCallback(future, + __ -> callback.onSuccess(), + t -> { + log.warn("Failed to process entity delete message for entity [{}]", entityId, t); + callback.onFailure(t); + }); + } + private void forwardToNotificationSchedulerService(TransportProtos.NotificationSchedulerServiceMsg msg, TbCallback callback) { TenantId tenantId = toTenantId(msg.getTenantIdMSB(), msg.getTenantIdLSB()); NotificationRequestId notificationRequestId = new NotificationRequestId(new UUID(msg.getRequestIdMSB(), msg.getRequestIdLSB())); diff --git a/common/cluster-api/src/main/java/org/thingsboard/server/cluster/TbClusterService.java b/common/cluster-api/src/main/java/org/thingsboard/server/cluster/TbClusterService.java index 131b50a52c..69334b774f 100644 --- a/common/cluster-api/src/main/java/org/thingsboard/server/cluster/TbClusterService.java +++ b/common/cluster-api/src/main/java/org/thingsboard/server/cluster/TbClusterService.java @@ -100,6 +100,8 @@ public interface TbClusterService extends TbQueueClusterService { void onAssetUpdated(Asset asset, Asset old); + void onAssetDeleted(TenantId tenantId, Asset asset, TbQueueCallback callback); + void onResourceChange(TbResourceInfo resource, TbQueueCallback callback); void onResourceDeleted(TbResourceInfo resource, TbQueueCallback callback); diff --git a/common/proto/src/main/proto/queue.proto b/common/proto/src/main/proto/queue.proto index bf49a68c80..b1ca9b5e56 100644 --- a/common/proto/src/main/proto/queue.proto +++ b/common/proto/src/main/proto/queue.proto @@ -794,6 +794,17 @@ message EntityProfileUpdateMsgProto { int64 newProfileIdLSB = 10; } +message EntityAddMsgProto { + int64 tenantIdMSB = 1; + int64 tenantIdLSB = 2; + string entityType = 3; + int64 entityIdMSB = 4; + int64 entityIdLSB = 5; + string entityProfileType = 6; + int64 profileIdMSB = 7; + int64 profileIdLSB = 8; +} + //Used to report session state to tb-Service and persist this state in the cache on the tb-Service level. message SubscriptionInfoProto { int64 lastActivityTime = 1; @@ -1527,6 +1538,8 @@ message ToCoreMsg { DeviceInactivityProto deviceInactivityMsg = 52; CalculatedFieldMsgProto calculatedFieldMsg = 53; EntityProfileUpdateMsgProto entityProfileUpdateMsg = 54; + EntityAddMsgProto entityAddMsg = 55; + EntityDeleteMsg entityDeleteMsg = 56; } /* High priority messages with low latency are handled by ThingsBoard Core Service separately */ diff --git a/dao/src/main/java/org/thingsboard/server/dao/asset/BaseAssetService.java b/dao/src/main/java/org/thingsboard/server/dao/asset/BaseAssetService.java index 7242cfde68..564da127ab 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/asset/BaseAssetService.java +++ b/dao/src/main/java/org/thingsboard/server/dao/asset/BaseAssetService.java @@ -239,7 +239,7 @@ public class BaseAssetService extends AbstractCachedEntityService