From 2ffec818125bf16e7fd736ea901272c3b7edb5b2 Mon Sep 17 00:00:00 2001 From: Yevhenii Date: Mon, 2 Jun 2025 12:28:44 +0300 Subject: [PATCH] CalculatedField functionality support for Edge - added CalculatedField functionality for Edge --- .../service/edge/EdgeContextComponent.java | 8 ++ .../service/edge/EdgeMsgConstructorUtils.java | 16 +++ .../service/edge/rpc/EdgeGrpcSession.java | 7 +- .../service/edge/rpc/EdgeSyncCursor.java | 3 + .../CalculatedFieldsEdgeEventFetcher.java | 48 +++++++ .../BaseCalculatedFieldProcessor.java | 79 +++++++++++ .../CalculatedFieldEdgeProcessor.java | 133 ++++++++++++++++++ .../calculated/CalculatedFieldProcessor.java | 28 ++++ .../server/dao/cf/CalculatedFieldService.java | 4 + .../common/data/edge/EdgeEventType.java | 9 +- .../common/data/id/EntityIdFactory.java | 2 + common/edge-api/src/main/proto/edge.proto | 10 ++ .../dao/cf/BaseCalculatedFieldService.java | 23 ++- .../server/dao/cf/CalculatedFieldDao.java | 3 + .../dao/sql/cf/CalculatedFieldRepository.java | 2 + .../dao/sql/cf/JpaCalculatedFieldDao.java | 7 + 16 files changed, 375 insertions(+), 7 deletions(-) create mode 100644 application/src/main/java/org/thingsboard/server/service/edge/rpc/fetch/CalculatedFieldsEdgeEventFetcher.java create mode 100644 application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/calculated/BaseCalculatedFieldProcessor.java create mode 100644 application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/calculated/CalculatedFieldEdgeProcessor.java create mode 100644 application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/calculated/CalculatedFieldProcessor.java diff --git a/application/src/main/java/org/thingsboard/server/service/edge/EdgeContextComponent.java b/application/src/main/java/org/thingsboard/server/service/edge/EdgeContextComponent.java index 00ae4a45bd..abdc89ebd9 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/EdgeContextComponent.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/EdgeContextComponent.java @@ -29,6 +29,7 @@ import org.thingsboard.server.dao.alarm.AlarmService; import org.thingsboard.server.dao.asset.AssetProfileService; import org.thingsboard.server.dao.asset.AssetService; import org.thingsboard.server.dao.attributes.AttributesService; +import org.thingsboard.server.dao.cf.CalculatedFieldService; import org.thingsboard.server.dao.customer.CustomerService; import org.thingsboard.server.dao.dashboard.DashboardService; import org.thingsboard.server.dao.device.DeviceCredentialsService; @@ -61,6 +62,7 @@ import org.thingsboard.server.service.edge.rpc.processor.alarm.AlarmProcessor; import org.thingsboard.server.service.edge.rpc.processor.alarm.comment.AlarmCommentProcessor; import org.thingsboard.server.service.edge.rpc.processor.asset.AssetEdgeProcessor; import org.thingsboard.server.service.edge.rpc.processor.asset.profile.AssetProfileEdgeProcessor; +import org.thingsboard.server.service.edge.rpc.processor.calculated.CalculatedFieldProcessor; import org.thingsboard.server.service.edge.rpc.processor.dashboard.DashboardEdgeProcessor; import org.thingsboard.server.service.edge.rpc.processor.device.DeviceEdgeProcessor; import org.thingsboard.server.service.edge.rpc.processor.device.profile.DeviceProfileEdgeProcessor; @@ -248,6 +250,12 @@ public class EdgeContextComponent { @Autowired private GrpcCallbackExecutorService grpcCallbackExecutorService; + @Autowired + private CalculatedFieldService calculatedFieldService; + + @Autowired + private CalculatedFieldProcessor calculatedFieldProcessor; + public EdgeProcessor getProcessor(EdgeEventType edgeEventType) { EdgeProcessor processor = processorMap.get(edgeEventType); if (processor == null) { diff --git a/application/src/main/java/org/thingsboard/server/service/edge/EdgeMsgConstructorUtils.java b/application/src/main/java/org/thingsboard/server/service/edge/EdgeMsgConstructorUtils.java index 5f5fd771cf..192c56692d 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/EdgeMsgConstructorUtils.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/EdgeMsgConstructorUtils.java @@ -48,11 +48,13 @@ import org.thingsboard.server.common.data.alarm.Alarm; import org.thingsboard.server.common.data.alarm.AlarmComment; import org.thingsboard.server.common.data.asset.Asset; import org.thingsboard.server.common.data.asset.AssetProfile; +import org.thingsboard.server.common.data.cf.CalculatedField; import org.thingsboard.server.common.data.domain.DomainInfo; import org.thingsboard.server.common.data.edge.Edge; import org.thingsboard.server.common.data.edge.EdgeEventActionType; import org.thingsboard.server.common.data.id.AssetId; import org.thingsboard.server.common.data.id.AssetProfileId; +import org.thingsboard.server.common.data.id.CalculatedFieldId; import org.thingsboard.server.common.data.id.CustomerId; import org.thingsboard.server.common.data.id.DashboardId; import org.thingsboard.server.common.data.id.DeviceId; @@ -89,6 +91,7 @@ import org.thingsboard.server.gen.edge.v1.AlarmUpdateMsg; import org.thingsboard.server.gen.edge.v1.AssetProfileUpdateMsg; import org.thingsboard.server.gen.edge.v1.AssetUpdateMsg; import org.thingsboard.server.gen.edge.v1.AttributeDeleteMsg; +import org.thingsboard.server.gen.edge.v1.CalculatedFieldUpdateMsg; import org.thingsboard.server.gen.edge.v1.CustomerUpdateMsg; import org.thingsboard.server.gen.edge.v1.DashboardUpdateMsg; import org.thingsboard.server.gen.edge.v1.DeviceCredentialsUpdateMsg; @@ -638,4 +641,17 @@ public class EdgeMsgConstructorUtils { .build(); } + public static CalculatedFieldUpdateMsg constructCalculatedFieldUpdatedMsg(UpdateMsgType msgType, CalculatedField calculatedField) { + return CalculatedFieldUpdateMsg.newBuilder().setMsgType(msgType).setEntity(JacksonUtil.toString(calculatedField)) + .setIdMSB(calculatedField.getId().getId().getMostSignificantBits()) + .setIdLSB(calculatedField.getId().getId().getLeastSignificantBits()).build(); + } + + public static CalculatedFieldUpdateMsg constructCalculatedFieldDeleteMsg(CalculatedFieldId calculatedFieldId) { + return CalculatedFieldUpdateMsg.newBuilder() + .setMsgType(UpdateMsgType.ENTITY_DELETED_RPC_MESSAGE) + .setIdMSB(calculatedFieldId.getId().getMostSignificantBits()) + .setIdLSB(calculatedFieldId.getId().getLeastSignificantBits()).build(); + } + } diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcSession.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcSession.java index 4a9b68fc6d..f3335d6d11 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcSession.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcSession.java @@ -42,7 +42,6 @@ import org.thingsboard.server.common.data.limit.LimitedApi; import org.thingsboard.server.common.data.notification.rule.trigger.EdgeCommunicationFailureTrigger; import org.thingsboard.server.common.data.page.PageData; import org.thingsboard.server.common.data.page.PageLink; -import org.thingsboard.server.common.data.page.SortOrder; import org.thingsboard.server.common.data.page.TimePageLink; import org.thingsboard.server.common.msg.edge.EdgeEventUpdateMsg; import org.thingsboard.server.gen.edge.v1.AlarmCommentUpdateMsg; @@ -50,6 +49,7 @@ import org.thingsboard.server.gen.edge.v1.AlarmUpdateMsg; import org.thingsboard.server.gen.edge.v1.AssetProfileUpdateMsg; import org.thingsboard.server.gen.edge.v1.AssetUpdateMsg; import org.thingsboard.server.gen.edge.v1.AttributesRequestMsg; +import org.thingsboard.server.gen.edge.v1.CalculatedFieldUpdateMsg; import org.thingsboard.server.gen.edge.v1.ConnectRequestMsg; import org.thingsboard.server.gen.edge.v1.ConnectResponseCode; import org.thingsboard.server.gen.edge.v1.ConnectResponseMsg; @@ -907,6 +907,11 @@ public abstract class EdgeGrpcSession implements Closeable { result.add(ctx.getEdgeRequestsService().processEntityViewsRequestMsg(edge.getTenantId(), edge, entityViewRequestMsg)); } } + if (uplinkMsg.getCalculatedFieldUpdateMsgCount() > 0) { + for (CalculatedFieldUpdateMsg calculatedFieldUpdateMsg : uplinkMsg.getCalculatedFieldUpdateMsgList()) { + result.add(ctx.getCalculatedFieldProcessor().processCalculatedFieldMsgFromEdge(edge.getTenantId(), edge, calculatedFieldUpdateMsg)); + } + } } catch (Exception e) { String failureMsg = String.format("Can't process uplink msg [%s] from edge", uplinkMsg); log.trace("[{}][{}] Can't process uplink msg [{}]", edge.getTenantId(), sessionId, uplinkMsg, e); diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeSyncCursor.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeSyncCursor.java index 351c9b411b..389f0202fa 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeSyncCursor.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeSyncCursor.java @@ -23,6 +23,7 @@ import org.thingsboard.server.service.edge.EdgeContextComponent; import org.thingsboard.server.service.edge.rpc.fetch.AdminSettingsEdgeEventFetcher; import org.thingsboard.server.service.edge.rpc.fetch.AssetProfilesEdgeEventFetcher; import org.thingsboard.server.service.edge.rpc.fetch.AssetsEdgeEventFetcher; +import org.thingsboard.server.service.edge.rpc.fetch.CalculatedFieldsEdgeEventFetcher; import org.thingsboard.server.service.edge.rpc.fetch.CustomerEdgeEventFetcher; import org.thingsboard.server.service.edge.rpc.fetch.CustomerUsersEdgeEventFetcher; import org.thingsboard.server.service.edge.rpc.fetch.DashboardsEdgeEventFetcher; @@ -80,6 +81,7 @@ public class EdgeSyncCursor { fetchers.add(new DevicesEdgeEventFetcher(ctx.getDeviceService())); fetchers.add(new AssetsEdgeEventFetcher(ctx.getAssetService())); fetchers.add(new EntityViewsEdgeEventFetcher(ctx.getEntityViewService())); + fetchers.add(new CalculatedFieldsEdgeEventFetcher(ctx.getCalculatedFieldService())); if (fullSync) { fetchers.add(new NotificationTemplateEdgeEventFetcher(ctx.getNotificationTemplateService())); fetchers.add(new NotificationTargetEdgeEventFetcher(ctx.getNotificationTargetService())); @@ -107,4 +109,5 @@ public class EdgeSyncCursor { currentIdx++; return edgeEventFetcher; } + } diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/fetch/CalculatedFieldsEdgeEventFetcher.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/fetch/CalculatedFieldsEdgeEventFetcher.java new file mode 100644 index 0000000000..4f3e91354e --- /dev/null +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/fetch/CalculatedFieldsEdgeEventFetcher.java @@ -0,0 +1,48 @@ +/** + * Copyright © 2016-2025 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.service.edge.rpc.fetch; + +import lombok.AllArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.thingsboard.server.common.data.EdgeUtils; +import org.thingsboard.server.common.data.cf.CalculatedField; +import org.thingsboard.server.common.data.edge.Edge; +import org.thingsboard.server.common.data.edge.EdgeEvent; +import org.thingsboard.server.common.data.edge.EdgeEventActionType; +import org.thingsboard.server.common.data.edge.EdgeEventType; +import org.thingsboard.server.common.data.id.TenantId; +import org.thingsboard.server.common.data.page.PageData; +import org.thingsboard.server.common.data.page.PageLink; +import org.thingsboard.server.dao.cf.CalculatedFieldService; + +@AllArgsConstructor +@Slf4j +public class CalculatedFieldsEdgeEventFetcher extends BasePageableEdgeEventFetcher { + + private final CalculatedFieldService calculatedFieldService; + + @Override + PageData fetchEntities(TenantId tenantId, Edge edge, PageLink pageLink) { + return calculatedFieldService.findCalculatedFieldsByTenantId(tenantId, pageLink); + } + + @Override + EdgeEvent constructEdgeEvent(TenantId tenantId, Edge edge, CalculatedField calculatedField) { + return EdgeUtils.constructEdgeEvent(tenantId, edge.getId(), EdgeEventType.CALCULATED_FIELD, + EdgeEventActionType.ADDED, calculatedField.getId(), null); + } + +} diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/calculated/BaseCalculatedFieldProcessor.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/calculated/BaseCalculatedFieldProcessor.java new file mode 100644 index 0000000000..ddb1b23d53 --- /dev/null +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/calculated/BaseCalculatedFieldProcessor.java @@ -0,0 +1,79 @@ +/** + * Copyright © 2016-2025 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.service.edge.rpc.processor.calculated; + +import com.datastax.oss.driver.api.core.uuid.Uuids; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.data.util.Pair; +import org.thingsboard.common.util.JacksonUtil; +import org.thingsboard.server.common.data.StringUtils; +import org.thingsboard.server.common.data.cf.CalculatedField; +import org.thingsboard.server.common.data.id.CalculatedFieldId; +import org.thingsboard.server.common.data.id.TenantId; +import org.thingsboard.server.dao.service.DataValidator; +import org.thingsboard.server.gen.edge.v1.CalculatedFieldUpdateMsg; +import org.thingsboard.server.service.edge.rpc.processor.BaseEdgeProcessor; + +@Slf4j +public abstract class BaseCalculatedFieldProcessor extends BaseEdgeProcessor { + + @Autowired + private DataValidator calculatedFieldValidator; + + protected Pair saveOrUpdateCalculatedField(TenantId tenantId, CalculatedFieldId calculatedFieldId, CalculatedFieldUpdateMsg calculatedFieldUpdateMsg) { + boolean isCreated = false; + boolean isNameUpdated = false; + try { + CalculatedField calculatedField = JacksonUtil.fromString(calculatedFieldUpdateMsg.getEntity(), CalculatedField.class, true); + if (calculatedField == null) { + throw new RuntimeException("[{" + tenantId + "}] calculatedFieldUpdateMsg {" + calculatedFieldUpdateMsg + " } cannot be converted to calculatedField"); + } + + CalculatedField calculatedFieldById = edgeCtx.getCalculatedFieldService().findById(tenantId, calculatedFieldId); + if (calculatedFieldById == null) { + calculatedField.setCreatedTime(Uuids.unixTimestamp(calculatedFieldId.getId())); + isCreated = true; + calculatedField.setId(null); + } else { + calculatedField.setId(calculatedFieldId); + } + + String calculatedFieldName = calculatedField.getName(); + CalculatedField calculatedFieldByName = edgeCtx.getCalculatedFieldService().findByTenantIdAndName(tenantId, calculatedFieldName); + if (calculatedFieldByName != null && !calculatedFieldByName.getId().equals(calculatedFieldId)) { + calculatedFieldName = calculatedFieldName + "_" + StringUtils.randomAlphabetic(15); + log.warn("[{}] calculatedField with name {} already exists. Renaming calculatedField name to {}", + tenantId, calculatedField.getName(), calculatedFieldByName.getName()); + isNameUpdated = true; + } + calculatedField.setName(calculatedFieldName); + + calculatedFieldValidator.validate(calculatedField, CalculatedField::getTenantId); + + if (isCreated) { + calculatedField.setId(calculatedFieldId); + } + + edgeCtx.getCalculatedFieldService().save(calculatedField, false); + } catch (Exception e) { + log.error("[{}] Failed to process calculatedField update msg [{}]", tenantId, calculatedFieldUpdateMsg, e); + throw e; + } + return Pair.of(isCreated, isNameUpdated); + } + +} diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/calculated/CalculatedFieldEdgeProcessor.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/calculated/CalculatedFieldEdgeProcessor.java new file mode 100644 index 0000000000..0ddb62e874 --- /dev/null +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/calculated/CalculatedFieldEdgeProcessor.java @@ -0,0 +1,133 @@ +/** + * Copyright © 2016-2025 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.service.edge.rpc.processor.calculated; + +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import lombok.extern.slf4j.Slf4j; +import org.springframework.data.util.Pair; +import org.springframework.stereotype.Component; +import org.thingsboard.common.util.JacksonUtil; +import org.thingsboard.server.common.data.EdgeUtils; +import org.thingsboard.server.common.data.cf.CalculatedField; +import org.thingsboard.server.common.data.edge.Edge; +import org.thingsboard.server.common.data.edge.EdgeEvent; +import org.thingsboard.server.common.data.edge.EdgeEventActionType; +import org.thingsboard.server.common.data.edge.EdgeEventType; +import org.thingsboard.server.common.data.id.CalculatedFieldId; +import org.thingsboard.server.common.data.id.TenantId; +import org.thingsboard.server.common.data.msg.TbMsgType; +import org.thingsboard.server.common.msg.TbMsgMetaData; +import org.thingsboard.server.dao.exception.DataValidationException; +import org.thingsboard.server.gen.edge.v1.CalculatedFieldUpdateMsg; +import org.thingsboard.server.gen.edge.v1.DownlinkMsg; +import org.thingsboard.server.gen.edge.v1.EdgeVersion; +import org.thingsboard.server.gen.edge.v1.UpdateMsgType; +import org.thingsboard.server.queue.util.TbCoreComponent; +import org.thingsboard.server.service.edge.EdgeMsgConstructorUtils; + +import java.util.UUID; + +@Slf4j +@Component +@TbCoreComponent +public class CalculatedFieldEdgeProcessor extends BaseCalculatedFieldProcessor implements CalculatedFieldProcessor { + + @Override + public ListenableFuture processCalculatedFieldMsgFromEdge(TenantId tenantId, Edge edge, CalculatedFieldUpdateMsg calculatedFieldUpdateMsg) { + CalculatedFieldId calculatedFieldId = new CalculatedFieldId(new UUID(calculatedFieldUpdateMsg.getIdMSB(), calculatedFieldUpdateMsg.getIdLSB())); + try { + edgeSynchronizationManager.getEdgeId().set(edge.getId()); + + switch (calculatedFieldUpdateMsg.getMsgType()) { + case ENTITY_CREATED_RPC_MESSAGE: + case ENTITY_UPDATED_RPC_MESSAGE: + processCalculatedField(tenantId, calculatedFieldId, calculatedFieldUpdateMsg, edge); + return Futures.immediateFuture(null); + case ENTITY_DELETED_RPC_MESSAGE: + CalculatedField calculatedField = edgeCtx.getCalculatedFieldService().findById(tenantId, calculatedFieldId); + if (calculatedField != null) { + edgeCtx.getCalculatedFieldService().deleteCalculatedField(tenantId, calculatedFieldId); + } + return Futures.immediateFuture(null); + case UNRECOGNIZED: + default: + return handleUnsupportedMsgType(calculatedFieldUpdateMsg.getMsgType()); + } + } catch (DataValidationException e) { + log.warn("[{}] Failed to process CalculatedFieldUpdateMsg from Edge [{}]", tenantId, calculatedFieldUpdateMsg, e); + return Futures.immediateFailedFuture(e); + } finally { + edgeSynchronizationManager.getEdgeId().remove(); + } + } + + @Override + public DownlinkMsg convertEdgeEventToDownlink(EdgeEvent edgeEvent, EdgeVersion edgeVersion) { + CalculatedFieldId calculatedFieldId = new CalculatedFieldId(edgeEvent.getEntityId()); + switch (edgeEvent.getAction()) { + case ADDED, UPDATED -> { + CalculatedField calculatedField = edgeCtx.getCalculatedFieldService().findById(edgeEvent.getTenantId(), calculatedFieldId); + if (calculatedField != null) { + UpdateMsgType msgType = getUpdateMsgType(edgeEvent.getAction()); + CalculatedFieldUpdateMsg calculatedFieldUpdateMsg = EdgeMsgConstructorUtils.constructCalculatedFieldUpdatedMsg(msgType, calculatedField); + return DownlinkMsg.newBuilder() + .setDownlinkMsgId(EdgeUtils.nextPositiveInt()) + .addCalculatedFieldUpdateMsg(calculatedFieldUpdateMsg) + .build(); + } + } + case DELETED -> { + CalculatedFieldUpdateMsg calculatedFieldUpdateMsg = EdgeMsgConstructorUtils.constructCalculatedFieldDeleteMsg(calculatedFieldId); + return DownlinkMsg.newBuilder() + .setDownlinkMsgId(EdgeUtils.nextPositiveInt()) + .addCalculatedFieldUpdateMsg(calculatedFieldUpdateMsg) + .build(); + } + } + return null; + } + + @Override + public EdgeEventType getEdgeEventType() { + return EdgeEventType.CALCULATED_FIELD; + } + + + private void processCalculatedField(TenantId tenantId, CalculatedFieldId calculatedFieldId, CalculatedFieldUpdateMsg calculatedFieldUpdateMsg, Edge edge) { + Pair resultPair = super.saveOrUpdateCalculatedField(tenantId, calculatedFieldId, calculatedFieldUpdateMsg); + Boolean wasCreated = resultPair.getFirst(); + if (wasCreated) { + pushCalculatedFieldCreatedEventToRuleEngine(tenantId, edge, calculatedFieldId); + } + Boolean nameWasUpdated = resultPair.getSecond(); + if (nameWasUpdated) { + saveEdgeEvent(tenantId, edge.getId(), EdgeEventType.CALCULATED_FIELD, EdgeEventActionType.UPDATED, calculatedFieldId, null); + } + } + + private void pushCalculatedFieldCreatedEventToRuleEngine(TenantId tenantId, Edge edge, CalculatedFieldId calculatedFieldId) { + try { + CalculatedField calculatedField = edgeCtx.getCalculatedFieldService().findById(tenantId, calculatedFieldId); + String calculatedFieldAsString = JacksonUtil.toString(calculatedField); + TbMsgMetaData msgMetaData = getEdgeActionTbMsgMetaData(edge, edge.getCustomerId()); + pushEntityEventToRuleEngine(tenantId, calculatedFieldId, edge.getCustomerId(), TbMsgType.ENTITY_CREATED, calculatedFieldAsString, msgMetaData); + } catch (Exception e) { + log.warn("[{}][{}] Failed to push calculatedField action to rule engine: {}", tenantId, calculatedFieldId, TbMsgType.ENTITY_CREATED.name(), e); + } + } + +} diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/calculated/CalculatedFieldProcessor.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/calculated/CalculatedFieldProcessor.java new file mode 100644 index 0000000000..ba9c8b27e1 --- /dev/null +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/calculated/CalculatedFieldProcessor.java @@ -0,0 +1,28 @@ +/** + * Copyright © 2016-2025 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.service.edge.rpc.processor.calculated; + +import com.google.common.util.concurrent.ListenableFuture; +import org.thingsboard.server.common.data.edge.Edge; +import org.thingsboard.server.common.data.id.TenantId; +import org.thingsboard.server.gen.edge.v1.CalculatedFieldUpdateMsg; +import org.thingsboard.server.service.edge.rpc.processor.EdgeProcessor; + +public interface CalculatedFieldProcessor extends EdgeProcessor { + + ListenableFuture processCalculatedFieldMsgFromEdge(TenantId tenantId, Edge edge, CalculatedFieldUpdateMsg calculatedFieldUpdateMsg); + +} diff --git a/common/dao-api/src/main/java/org/thingsboard/server/dao/cf/CalculatedFieldService.java b/common/dao-api/src/main/java/org/thingsboard/server/dao/cf/CalculatedFieldService.java index 5101d6d57e..a645903896 100644 --- a/common/dao-api/src/main/java/org/thingsboard/server/dao/cf/CalculatedFieldService.java +++ b/common/dao-api/src/main/java/org/thingsboard/server/dao/cf/CalculatedFieldService.java @@ -31,8 +31,12 @@ public interface CalculatedFieldService extends EntityDaoService { CalculatedField save(CalculatedField calculatedField); + CalculatedField save(CalculatedField calculatedField, boolean doValidate); + CalculatedField findById(TenantId tenantId, CalculatedFieldId calculatedFieldId); + CalculatedField findByTenantIdAndName(TenantId tenantId, String name); + List findCalculatedFieldIdsByEntityId(TenantId tenantId, EntityId entityId); List findCalculatedFieldsByEntityId(TenantId tenantId, EntityId entityId); diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/edge/EdgeEventType.java b/common/data/src/main/java/org/thingsboard/server/common/data/edge/EdgeEventType.java index 6f1ae8150f..9c97935329 100644 --- a/common/data/src/main/java/org/thingsboard/server/common/data/edge/EdgeEventType.java +++ b/common/data/src/main/java/org/thingsboard/server/common/data/edge/EdgeEventType.java @@ -41,12 +41,13 @@ public enum EdgeEventType { ADMIN_SETTINGS(true, null), OTA_PACKAGE(true, EntityType.OTA_PACKAGE), QUEUE(true, EntityType.QUEUE), - NOTIFICATION_RULE (true, EntityType.NOTIFICATION_RULE), - NOTIFICATION_TARGET (true, EntityType.NOTIFICATION_TARGET), - NOTIFICATION_TEMPLATE (true, EntityType.NOTIFICATION_TEMPLATE), + NOTIFICATION_RULE(true, EntityType.NOTIFICATION_RULE), + NOTIFICATION_TARGET(true, EntityType.NOTIFICATION_TARGET), + NOTIFICATION_TEMPLATE(true, EntityType.NOTIFICATION_TEMPLATE), TB_RESOURCE(true, EntityType.TB_RESOURCE), OAUTH2_CLIENT(true, EntityType.OAUTH2_CLIENT), - DOMAIN(true, EntityType.DOMAIN); + DOMAIN(true, EntityType.DOMAIN), + CALCULATED_FIELD(true, EntityType.CALCULATED_FIELD); private final boolean allEdgesRelated; diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/id/EntityIdFactory.java b/common/data/src/main/java/org/thingsboard/server/common/data/id/EntityIdFactory.java index f5dd4b12a0..01727b0ebd 100644 --- a/common/data/src/main/java/org/thingsboard/server/common/data/id/EntityIdFactory.java +++ b/common/data/src/main/java/org/thingsboard/server/common/data/id/EntityIdFactory.java @@ -169,6 +169,8 @@ public class EntityIdFactory { return new OAuth2ClientId(uuid); case DOMAIN: return new DomainId(uuid); + case CALCULATED_FIELD: + return new CalculatedFieldId(uuid); } throw new IllegalArgumentException("EdgeEventType " + edgeEventType + " is not supported!"); } diff --git a/common/edge-api/src/main/proto/edge.proto b/common/edge-api/src/main/proto/edge.proto index 023ac00634..904a938f1e 100644 --- a/common/edge-api/src/main/proto/edge.proto +++ b/common/edge-api/src/main/proto/edge.proto @@ -124,6 +124,14 @@ enum UpdateMsgType { // use 6 as a next number } +message CalculatedFieldUpdateMsg{ + UpdateMsgType msgType = 1; + int64 idMSB = 2; + int64 idLSB = 3; + string entity = 4; +} + + message EntityDataProto { int64 entityIdMSB = 1; int64 entityIdLSB = 2; @@ -423,6 +431,7 @@ message UplinkMsg { repeated AlarmCommentUpdateMsg alarmCommentUpdateMsg = 22; repeated RuleChainUpdateMsg ruleChainUpdateMsg = 23; repeated RuleChainMetadataUpdateMsg ruleChainMetadataUpdateMsg = 24; + repeated CalculatedFieldUpdateMsg calculatedFieldUpdateMsg = 25; } message UplinkResponseMsg { @@ -472,4 +481,5 @@ message DownlinkMsg { repeated NotificationTargetUpdateMsg notificationTargetUpdateMsg = 32; repeated NotificationTemplateUpdateMsg notificationTemplateUpdateMsg = 33; repeated OAuth2DomainUpdateMsg oAuth2DomainUpdateMsg = 34; + repeated CalculatedFieldUpdateMsg calculatedFieldUpdateMsg = 35; } diff --git a/dao/src/main/java/org/thingsboard/server/dao/cf/BaseCalculatedFieldService.java b/dao/src/main/java/org/thingsboard/server/dao/cf/BaseCalculatedFieldService.java index 0c5df18e80..141adf49aa 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/cf/BaseCalculatedFieldService.java +++ b/dao/src/main/java/org/thingsboard/server/dao/cf/BaseCalculatedFieldService.java @@ -57,7 +57,18 @@ public class BaseCalculatedFieldService extends AbstractEntityService implements @Override public CalculatedField save(CalculatedField calculatedField) { - CalculatedField oldCalculatedField = calculatedFieldDataValidator.validate(calculatedField, CalculatedField::getTenantId); + return doSave(calculatedField, true); + } + + @Override + public CalculatedField save(CalculatedField calculatedField, boolean doValidate) { + return doSave(calculatedField, doValidate); + } + + private CalculatedField doSave(CalculatedField calculatedField, boolean doValidate) { + if (doValidate) { + calculatedFieldDataValidator.validate(calculatedField, CalculatedField::getTenantId); + } try { TenantId tenantId = calculatedField.getTenantId(); log.trace("Executing save calculated field, [{}]", calculatedField); @@ -65,7 +76,7 @@ public class BaseCalculatedFieldService extends AbstractEntityService implements CalculatedField savedCalculatedField = calculatedFieldDao.save(tenantId, calculatedField); createOrUpdateCalculatedFieldLink(tenantId, savedCalculatedField); eventPublisher.publishEvent(SaveEntityEvent.builder().tenantId(savedCalculatedField.getTenantId()).entityId(savedCalculatedField.getId()) - .entity(savedCalculatedField).oldEntity(oldCalculatedField).created(calculatedField.getId() == null).build()); + .entity(savedCalculatedField).oldEntity(calculatedField).created(calculatedField.getId() == null).build()); return savedCalculatedField; } catch (Exception e) { checkConstraintViolation(e, @@ -83,6 +94,14 @@ public class BaseCalculatedFieldService extends AbstractEntityService implements return calculatedFieldDao.findById(tenantId, calculatedFieldId.getId()); } + @Override + public CalculatedField findByTenantIdAndName(TenantId tenantId, String name) { + log.trace("Executing findByTenantIdAndName [{}], calculatedFieldName[{}]", tenantId, name); + validateId(tenantId, id -> INCORRECT_TENANT_ID + id); + + return calculatedFieldDao.findByTenantIdAndName(tenantId, name).orElse(null); + } + @Override public List findCalculatedFieldIdsByEntityId(TenantId tenantId, EntityId entityId) { log.trace("Executing findCalculatedFieldIdsByEntityId [{}]", entityId); diff --git a/dao/src/main/java/org/thingsboard/server/dao/cf/CalculatedFieldDao.java b/dao/src/main/java/org/thingsboard/server/dao/cf/CalculatedFieldDao.java index aadae93893..44dbb26b81 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/cf/CalculatedFieldDao.java +++ b/dao/src/main/java/org/thingsboard/server/dao/cf/CalculatedFieldDao.java @@ -24,6 +24,7 @@ import org.thingsboard.server.common.data.page.PageLink; import org.thingsboard.server.dao.Dao; import java.util.List; +import java.util.Optional; public interface CalculatedFieldDao extends Dao { @@ -35,6 +36,8 @@ public interface CalculatedFieldDao extends Dao { List findAll(); + Optional findByTenantIdAndName(TenantId tenantId, String name); + PageData findAll(PageLink pageLink); PageData findAllByTenantId(TenantId tenantId, PageLink pageLink); diff --git a/dao/src/main/java/org/thingsboard/server/dao/sql/cf/CalculatedFieldRepository.java b/dao/src/main/java/org/thingsboard/server/dao/sql/cf/CalculatedFieldRepository.java index be122816ba..91721c74f3 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/sql/cf/CalculatedFieldRepository.java +++ b/dao/src/main/java/org/thingsboard/server/dao/sql/cf/CalculatedFieldRepository.java @@ -28,6 +28,8 @@ public interface CalculatedFieldRepository extends JpaRepository findCalculatedFieldIdsByTenantIdAndEntityId(UUID tenantId, UUID entityId); List findAllByTenantIdAndEntityId(UUID tenantId, UUID entityId); diff --git a/dao/src/main/java/org/thingsboard/server/dao/sql/cf/JpaCalculatedFieldDao.java b/dao/src/main/java/org/thingsboard/server/dao/sql/cf/JpaCalculatedFieldDao.java index 4bb52c29db..f18fe10f8c 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/sql/cf/JpaCalculatedFieldDao.java +++ b/dao/src/main/java/org/thingsboard/server/dao/sql/cf/JpaCalculatedFieldDao.java @@ -34,6 +34,7 @@ import org.thingsboard.server.dao.sql.JpaAbstractDao; import org.thingsboard.server.dao.util.SqlDao; import java.util.List; +import java.util.Optional; import java.util.UUID; @Slf4j @@ -65,6 +66,12 @@ public class JpaCalculatedFieldDao extends JpaAbstractDao findByTenantIdAndName(TenantId tenantId, String name) { + CalculatedField calculatedField = DaoUtil.getData(calculatedFieldRepository.findByTenantIdAndName(tenantId.getId(), name)); + return Optional.ofNullable(calculatedField); + } + @Override public PageData findAll(PageLink pageLink) { log.debug("Try to find calculated fields by pageLink [{}]", pageLink);