Browse Source

CalculatedField functionality support for Edge

- added CalculatedField functionality for Edge
pull/13494/head
Yevhenii 1 year ago
parent
commit
2ffec81812
  1. 8
      application/src/main/java/org/thingsboard/server/service/edge/EdgeContextComponent.java
  2. 16
      application/src/main/java/org/thingsboard/server/service/edge/EdgeMsgConstructorUtils.java
  3. 7
      application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcSession.java
  4. 3
      application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeSyncCursor.java
  5. 48
      application/src/main/java/org/thingsboard/server/service/edge/rpc/fetch/CalculatedFieldsEdgeEventFetcher.java
  6. 79
      application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/calculated/BaseCalculatedFieldProcessor.java
  7. 133
      application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/calculated/CalculatedFieldEdgeProcessor.java
  8. 28
      application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/calculated/CalculatedFieldProcessor.java
  9. 4
      common/dao-api/src/main/java/org/thingsboard/server/dao/cf/CalculatedFieldService.java
  10. 9
      common/data/src/main/java/org/thingsboard/server/common/data/edge/EdgeEventType.java
  11. 2
      common/data/src/main/java/org/thingsboard/server/common/data/id/EntityIdFactory.java
  12. 10
      common/edge-api/src/main/proto/edge.proto
  13. 23
      dao/src/main/java/org/thingsboard/server/dao/cf/BaseCalculatedFieldService.java
  14. 3
      dao/src/main/java/org/thingsboard/server/dao/cf/CalculatedFieldDao.java
  15. 2
      dao/src/main/java/org/thingsboard/server/dao/sql/cf/CalculatedFieldRepository.java
  16. 7
      dao/src/main/java/org/thingsboard/server/dao/sql/cf/JpaCalculatedFieldDao.java

8
application/src/main/java/org/thingsboard/server/service/edge/EdgeContextComponent.java

@ -29,6 +29,7 @@ import org.thingsboard.server.dao.alarm.AlarmService;
import org.thingsboard.server.dao.asset.AssetProfileService;
import org.thingsboard.server.dao.asset.AssetService;
import org.thingsboard.server.dao.attributes.AttributesService;
import org.thingsboard.server.dao.cf.CalculatedFieldService;
import org.thingsboard.server.dao.customer.CustomerService;
import org.thingsboard.server.dao.dashboard.DashboardService;
import org.thingsboard.server.dao.device.DeviceCredentialsService;
@ -61,6 +62,7 @@ import org.thingsboard.server.service.edge.rpc.processor.alarm.AlarmProcessor;
import org.thingsboard.server.service.edge.rpc.processor.alarm.comment.AlarmCommentProcessor;
import org.thingsboard.server.service.edge.rpc.processor.asset.AssetEdgeProcessor;
import org.thingsboard.server.service.edge.rpc.processor.asset.profile.AssetProfileEdgeProcessor;
import org.thingsboard.server.service.edge.rpc.processor.calculated.CalculatedFieldProcessor;
import org.thingsboard.server.service.edge.rpc.processor.dashboard.DashboardEdgeProcessor;
import org.thingsboard.server.service.edge.rpc.processor.device.DeviceEdgeProcessor;
import org.thingsboard.server.service.edge.rpc.processor.device.profile.DeviceProfileEdgeProcessor;
@ -248,6 +250,12 @@ public class EdgeContextComponent {
@Autowired
private GrpcCallbackExecutorService grpcCallbackExecutorService;
@Autowired
private CalculatedFieldService calculatedFieldService;
@Autowired
private CalculatedFieldProcessor calculatedFieldProcessor;
public EdgeProcessor getProcessor(EdgeEventType edgeEventType) {
EdgeProcessor processor = processorMap.get(edgeEventType);
if (processor == null) {

16
application/src/main/java/org/thingsboard/server/service/edge/EdgeMsgConstructorUtils.java

@ -48,11 +48,13 @@ import org.thingsboard.server.common.data.alarm.Alarm;
import org.thingsboard.server.common.data.alarm.AlarmComment;
import org.thingsboard.server.common.data.asset.Asset;
import org.thingsboard.server.common.data.asset.AssetProfile;
import org.thingsboard.server.common.data.cf.CalculatedField;
import org.thingsboard.server.common.data.domain.DomainInfo;
import org.thingsboard.server.common.data.edge.Edge;
import org.thingsboard.server.common.data.edge.EdgeEventActionType;
import org.thingsboard.server.common.data.id.AssetId;
import org.thingsboard.server.common.data.id.AssetProfileId;
import org.thingsboard.server.common.data.id.CalculatedFieldId;
import org.thingsboard.server.common.data.id.CustomerId;
import org.thingsboard.server.common.data.id.DashboardId;
import org.thingsboard.server.common.data.id.DeviceId;
@ -89,6 +91,7 @@ import org.thingsboard.server.gen.edge.v1.AlarmUpdateMsg;
import org.thingsboard.server.gen.edge.v1.AssetProfileUpdateMsg;
import org.thingsboard.server.gen.edge.v1.AssetUpdateMsg;
import org.thingsboard.server.gen.edge.v1.AttributeDeleteMsg;
import org.thingsboard.server.gen.edge.v1.CalculatedFieldUpdateMsg;
import org.thingsboard.server.gen.edge.v1.CustomerUpdateMsg;
import org.thingsboard.server.gen.edge.v1.DashboardUpdateMsg;
import org.thingsboard.server.gen.edge.v1.DeviceCredentialsUpdateMsg;
@ -638,4 +641,17 @@ public class EdgeMsgConstructorUtils {
.build();
}
public static CalculatedFieldUpdateMsg constructCalculatedFieldUpdatedMsg(UpdateMsgType msgType, CalculatedField calculatedField) {
return CalculatedFieldUpdateMsg.newBuilder().setMsgType(msgType).setEntity(JacksonUtil.toString(calculatedField))
.setIdMSB(calculatedField.getId().getId().getMostSignificantBits())
.setIdLSB(calculatedField.getId().getId().getLeastSignificantBits()).build();
}
public static CalculatedFieldUpdateMsg constructCalculatedFieldDeleteMsg(CalculatedFieldId calculatedFieldId) {
return CalculatedFieldUpdateMsg.newBuilder()
.setMsgType(UpdateMsgType.ENTITY_DELETED_RPC_MESSAGE)
.setIdMSB(calculatedFieldId.getId().getMostSignificantBits())
.setIdLSB(calculatedFieldId.getId().getLeastSignificantBits()).build();
}
}

7
application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcSession.java

@ -42,7 +42,6 @@ import org.thingsboard.server.common.data.limit.LimitedApi;
import org.thingsboard.server.common.data.notification.rule.trigger.EdgeCommunicationFailureTrigger;
import org.thingsboard.server.common.data.page.PageData;
import org.thingsboard.server.common.data.page.PageLink;
import org.thingsboard.server.common.data.page.SortOrder;
import org.thingsboard.server.common.data.page.TimePageLink;
import org.thingsboard.server.common.msg.edge.EdgeEventUpdateMsg;
import org.thingsboard.server.gen.edge.v1.AlarmCommentUpdateMsg;
@ -50,6 +49,7 @@ import org.thingsboard.server.gen.edge.v1.AlarmUpdateMsg;
import org.thingsboard.server.gen.edge.v1.AssetProfileUpdateMsg;
import org.thingsboard.server.gen.edge.v1.AssetUpdateMsg;
import org.thingsboard.server.gen.edge.v1.AttributesRequestMsg;
import org.thingsboard.server.gen.edge.v1.CalculatedFieldUpdateMsg;
import org.thingsboard.server.gen.edge.v1.ConnectRequestMsg;
import org.thingsboard.server.gen.edge.v1.ConnectResponseCode;
import org.thingsboard.server.gen.edge.v1.ConnectResponseMsg;
@ -907,6 +907,11 @@ public abstract class EdgeGrpcSession implements Closeable {
result.add(ctx.getEdgeRequestsService().processEntityViewsRequestMsg(edge.getTenantId(), edge, entityViewRequestMsg));
}
}
if (uplinkMsg.getCalculatedFieldUpdateMsgCount() > 0) {
for (CalculatedFieldUpdateMsg calculatedFieldUpdateMsg : uplinkMsg.getCalculatedFieldUpdateMsgList()) {
result.add(ctx.getCalculatedFieldProcessor().processCalculatedFieldMsgFromEdge(edge.getTenantId(), edge, calculatedFieldUpdateMsg));
}
}
} catch (Exception e) {
String failureMsg = String.format("Can't process uplink msg [%s] from edge", uplinkMsg);
log.trace("[{}][{}] Can't process uplink msg [{}]", edge.getTenantId(), sessionId, uplinkMsg, e);

3
application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeSyncCursor.java

@ -23,6 +23,7 @@ import org.thingsboard.server.service.edge.EdgeContextComponent;
import org.thingsboard.server.service.edge.rpc.fetch.AdminSettingsEdgeEventFetcher;
import org.thingsboard.server.service.edge.rpc.fetch.AssetProfilesEdgeEventFetcher;
import org.thingsboard.server.service.edge.rpc.fetch.AssetsEdgeEventFetcher;
import org.thingsboard.server.service.edge.rpc.fetch.CalculatedFieldsEdgeEventFetcher;
import org.thingsboard.server.service.edge.rpc.fetch.CustomerEdgeEventFetcher;
import org.thingsboard.server.service.edge.rpc.fetch.CustomerUsersEdgeEventFetcher;
import org.thingsboard.server.service.edge.rpc.fetch.DashboardsEdgeEventFetcher;
@ -80,6 +81,7 @@ public class EdgeSyncCursor {
fetchers.add(new DevicesEdgeEventFetcher(ctx.getDeviceService()));
fetchers.add(new AssetsEdgeEventFetcher(ctx.getAssetService()));
fetchers.add(new EntityViewsEdgeEventFetcher(ctx.getEntityViewService()));
fetchers.add(new CalculatedFieldsEdgeEventFetcher(ctx.getCalculatedFieldService()));
if (fullSync) {
fetchers.add(new NotificationTemplateEdgeEventFetcher(ctx.getNotificationTemplateService()));
fetchers.add(new NotificationTargetEdgeEventFetcher(ctx.getNotificationTargetService()));
@ -107,4 +109,5 @@ public class EdgeSyncCursor {
currentIdx++;
return edgeEventFetcher;
}
}

48
application/src/main/java/org/thingsboard/server/service/edge/rpc/fetch/CalculatedFieldsEdgeEventFetcher.java

@ -0,0 +1,48 @@
/**
* Copyright © 2016-2025 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.service.edge.rpc.fetch;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.thingsboard.server.common.data.EdgeUtils;
import org.thingsboard.server.common.data.cf.CalculatedField;
import org.thingsboard.server.common.data.edge.Edge;
import org.thingsboard.server.common.data.edge.EdgeEvent;
import org.thingsboard.server.common.data.edge.EdgeEventActionType;
import org.thingsboard.server.common.data.edge.EdgeEventType;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.page.PageData;
import org.thingsboard.server.common.data.page.PageLink;
import org.thingsboard.server.dao.cf.CalculatedFieldService;
@AllArgsConstructor
@Slf4j
public class CalculatedFieldsEdgeEventFetcher extends BasePageableEdgeEventFetcher<CalculatedField> {
private final CalculatedFieldService calculatedFieldService;
@Override
PageData<CalculatedField> fetchEntities(TenantId tenantId, Edge edge, PageLink pageLink) {
return calculatedFieldService.findCalculatedFieldsByTenantId(tenantId, pageLink);
}
@Override
EdgeEvent constructEdgeEvent(TenantId tenantId, Edge edge, CalculatedField calculatedField) {
return EdgeUtils.constructEdgeEvent(tenantId, edge.getId(), EdgeEventType.CALCULATED_FIELD,
EdgeEventActionType.ADDED, calculatedField.getId(), null);
}
}

79
application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/calculated/BaseCalculatedFieldProcessor.java

@ -0,0 +1,79 @@
/**
* Copyright © 2016-2025 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.service.edge.rpc.processor.calculated;
import com.datastax.oss.driver.api.core.uuid.Uuids;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.util.Pair;
import org.thingsboard.common.util.JacksonUtil;
import org.thingsboard.server.common.data.StringUtils;
import org.thingsboard.server.common.data.cf.CalculatedField;
import org.thingsboard.server.common.data.id.CalculatedFieldId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.dao.service.DataValidator;
import org.thingsboard.server.gen.edge.v1.CalculatedFieldUpdateMsg;
import org.thingsboard.server.service.edge.rpc.processor.BaseEdgeProcessor;
@Slf4j
public abstract class BaseCalculatedFieldProcessor extends BaseEdgeProcessor {
@Autowired
private DataValidator<CalculatedField> calculatedFieldValidator;
protected Pair<Boolean, Boolean> saveOrUpdateCalculatedField(TenantId tenantId, CalculatedFieldId calculatedFieldId, CalculatedFieldUpdateMsg calculatedFieldUpdateMsg) {
boolean isCreated = false;
boolean isNameUpdated = false;
try {
CalculatedField calculatedField = JacksonUtil.fromString(calculatedFieldUpdateMsg.getEntity(), CalculatedField.class, true);
if (calculatedField == null) {
throw new RuntimeException("[{" + tenantId + "}] calculatedFieldUpdateMsg {" + calculatedFieldUpdateMsg + " } cannot be converted to calculatedField");
}
CalculatedField calculatedFieldById = edgeCtx.getCalculatedFieldService().findById(tenantId, calculatedFieldId);
if (calculatedFieldById == null) {
calculatedField.setCreatedTime(Uuids.unixTimestamp(calculatedFieldId.getId()));
isCreated = true;
calculatedField.setId(null);
} else {
calculatedField.setId(calculatedFieldId);
}
String calculatedFieldName = calculatedField.getName();
CalculatedField calculatedFieldByName = edgeCtx.getCalculatedFieldService().findByTenantIdAndName(tenantId, calculatedFieldName);
if (calculatedFieldByName != null && !calculatedFieldByName.getId().equals(calculatedFieldId)) {
calculatedFieldName = calculatedFieldName + "_" + StringUtils.randomAlphabetic(15);
log.warn("[{}] calculatedField with name {} already exists. Renaming calculatedField name to {}",
tenantId, calculatedField.getName(), calculatedFieldByName.getName());
isNameUpdated = true;
}
calculatedField.setName(calculatedFieldName);
calculatedFieldValidator.validate(calculatedField, CalculatedField::getTenantId);
if (isCreated) {
calculatedField.setId(calculatedFieldId);
}
edgeCtx.getCalculatedFieldService().save(calculatedField, false);
} catch (Exception e) {
log.error("[{}] Failed to process calculatedField update msg [{}]", tenantId, calculatedFieldUpdateMsg, e);
throw e;
}
return Pair.of(isCreated, isNameUpdated);
}
}

133
application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/calculated/CalculatedFieldEdgeProcessor.java

@ -0,0 +1,133 @@
/**
* Copyright © 2016-2025 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.service.edge.rpc.processor.calculated;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.util.Pair;
import org.springframework.stereotype.Component;
import org.thingsboard.common.util.JacksonUtil;
import org.thingsboard.server.common.data.EdgeUtils;
import org.thingsboard.server.common.data.cf.CalculatedField;
import org.thingsboard.server.common.data.edge.Edge;
import org.thingsboard.server.common.data.edge.EdgeEvent;
import org.thingsboard.server.common.data.edge.EdgeEventActionType;
import org.thingsboard.server.common.data.edge.EdgeEventType;
import org.thingsboard.server.common.data.id.CalculatedFieldId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.msg.TbMsgType;
import org.thingsboard.server.common.msg.TbMsgMetaData;
import org.thingsboard.server.dao.exception.DataValidationException;
import org.thingsboard.server.gen.edge.v1.CalculatedFieldUpdateMsg;
import org.thingsboard.server.gen.edge.v1.DownlinkMsg;
import org.thingsboard.server.gen.edge.v1.EdgeVersion;
import org.thingsboard.server.gen.edge.v1.UpdateMsgType;
import org.thingsboard.server.queue.util.TbCoreComponent;
import org.thingsboard.server.service.edge.EdgeMsgConstructorUtils;
import java.util.UUID;
@Slf4j
@Component
@TbCoreComponent
public class CalculatedFieldEdgeProcessor extends BaseCalculatedFieldProcessor implements CalculatedFieldProcessor {
@Override
public ListenableFuture<Void> processCalculatedFieldMsgFromEdge(TenantId tenantId, Edge edge, CalculatedFieldUpdateMsg calculatedFieldUpdateMsg) {
CalculatedFieldId calculatedFieldId = new CalculatedFieldId(new UUID(calculatedFieldUpdateMsg.getIdMSB(), calculatedFieldUpdateMsg.getIdLSB()));
try {
edgeSynchronizationManager.getEdgeId().set(edge.getId());
switch (calculatedFieldUpdateMsg.getMsgType()) {
case ENTITY_CREATED_RPC_MESSAGE:
case ENTITY_UPDATED_RPC_MESSAGE:
processCalculatedField(tenantId, calculatedFieldId, calculatedFieldUpdateMsg, edge);
return Futures.immediateFuture(null);
case ENTITY_DELETED_RPC_MESSAGE:
CalculatedField calculatedField = edgeCtx.getCalculatedFieldService().findById(tenantId, calculatedFieldId);
if (calculatedField != null) {
edgeCtx.getCalculatedFieldService().deleteCalculatedField(tenantId, calculatedFieldId);
}
return Futures.immediateFuture(null);
case UNRECOGNIZED:
default:
return handleUnsupportedMsgType(calculatedFieldUpdateMsg.getMsgType());
}
} catch (DataValidationException e) {
log.warn("[{}] Failed to process CalculatedFieldUpdateMsg from Edge [{}]", tenantId, calculatedFieldUpdateMsg, e);
return Futures.immediateFailedFuture(e);
} finally {
edgeSynchronizationManager.getEdgeId().remove();
}
}
@Override
public DownlinkMsg convertEdgeEventToDownlink(EdgeEvent edgeEvent, EdgeVersion edgeVersion) {
CalculatedFieldId calculatedFieldId = new CalculatedFieldId(edgeEvent.getEntityId());
switch (edgeEvent.getAction()) {
case ADDED, UPDATED -> {
CalculatedField calculatedField = edgeCtx.getCalculatedFieldService().findById(edgeEvent.getTenantId(), calculatedFieldId);
if (calculatedField != null) {
UpdateMsgType msgType = getUpdateMsgType(edgeEvent.getAction());
CalculatedFieldUpdateMsg calculatedFieldUpdateMsg = EdgeMsgConstructorUtils.constructCalculatedFieldUpdatedMsg(msgType, calculatedField);
return DownlinkMsg.newBuilder()
.setDownlinkMsgId(EdgeUtils.nextPositiveInt())
.addCalculatedFieldUpdateMsg(calculatedFieldUpdateMsg)
.build();
}
}
case DELETED -> {
CalculatedFieldUpdateMsg calculatedFieldUpdateMsg = EdgeMsgConstructorUtils.constructCalculatedFieldDeleteMsg(calculatedFieldId);
return DownlinkMsg.newBuilder()
.setDownlinkMsgId(EdgeUtils.nextPositiveInt())
.addCalculatedFieldUpdateMsg(calculatedFieldUpdateMsg)
.build();
}
}
return null;
}
@Override
public EdgeEventType getEdgeEventType() {
return EdgeEventType.CALCULATED_FIELD;
}
private void processCalculatedField(TenantId tenantId, CalculatedFieldId calculatedFieldId, CalculatedFieldUpdateMsg calculatedFieldUpdateMsg, Edge edge) {
Pair<Boolean, Boolean> resultPair = super.saveOrUpdateCalculatedField(tenantId, calculatedFieldId, calculatedFieldUpdateMsg);
Boolean wasCreated = resultPair.getFirst();
if (wasCreated) {
pushCalculatedFieldCreatedEventToRuleEngine(tenantId, edge, calculatedFieldId);
}
Boolean nameWasUpdated = resultPair.getSecond();
if (nameWasUpdated) {
saveEdgeEvent(tenantId, edge.getId(), EdgeEventType.CALCULATED_FIELD, EdgeEventActionType.UPDATED, calculatedFieldId, null);
}
}
private void pushCalculatedFieldCreatedEventToRuleEngine(TenantId tenantId, Edge edge, CalculatedFieldId calculatedFieldId) {
try {
CalculatedField calculatedField = edgeCtx.getCalculatedFieldService().findById(tenantId, calculatedFieldId);
String calculatedFieldAsString = JacksonUtil.toString(calculatedField);
TbMsgMetaData msgMetaData = getEdgeActionTbMsgMetaData(edge, edge.getCustomerId());
pushEntityEventToRuleEngine(tenantId, calculatedFieldId, edge.getCustomerId(), TbMsgType.ENTITY_CREATED, calculatedFieldAsString, msgMetaData);
} catch (Exception e) {
log.warn("[{}][{}] Failed to push calculatedField action to rule engine: {}", tenantId, calculatedFieldId, TbMsgType.ENTITY_CREATED.name(), e);
}
}
}

28
application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/calculated/CalculatedFieldProcessor.java

@ -0,0 +1,28 @@
/**
* Copyright © 2016-2025 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.service.edge.rpc.processor.calculated;
import com.google.common.util.concurrent.ListenableFuture;
import org.thingsboard.server.common.data.edge.Edge;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.gen.edge.v1.CalculatedFieldUpdateMsg;
import org.thingsboard.server.service.edge.rpc.processor.EdgeProcessor;
public interface CalculatedFieldProcessor extends EdgeProcessor {
ListenableFuture<Void> processCalculatedFieldMsgFromEdge(TenantId tenantId, Edge edge, CalculatedFieldUpdateMsg calculatedFieldUpdateMsg);
}

4
common/dao-api/src/main/java/org/thingsboard/server/dao/cf/CalculatedFieldService.java

@ -31,8 +31,12 @@ public interface CalculatedFieldService extends EntityDaoService {
CalculatedField save(CalculatedField calculatedField);
CalculatedField save(CalculatedField calculatedField, boolean doValidate);
CalculatedField findById(TenantId tenantId, CalculatedFieldId calculatedFieldId);
CalculatedField findByTenantIdAndName(TenantId tenantId, String name);
List<CalculatedFieldId> findCalculatedFieldIdsByEntityId(TenantId tenantId, EntityId entityId);
List<CalculatedField> findCalculatedFieldsByEntityId(TenantId tenantId, EntityId entityId);

9
common/data/src/main/java/org/thingsboard/server/common/data/edge/EdgeEventType.java

@ -41,12 +41,13 @@ public enum EdgeEventType {
ADMIN_SETTINGS(true, null),
OTA_PACKAGE(true, EntityType.OTA_PACKAGE),
QUEUE(true, EntityType.QUEUE),
NOTIFICATION_RULE (true, EntityType.NOTIFICATION_RULE),
NOTIFICATION_TARGET (true, EntityType.NOTIFICATION_TARGET),
NOTIFICATION_TEMPLATE (true, EntityType.NOTIFICATION_TEMPLATE),
NOTIFICATION_RULE(true, EntityType.NOTIFICATION_RULE),
NOTIFICATION_TARGET(true, EntityType.NOTIFICATION_TARGET),
NOTIFICATION_TEMPLATE(true, EntityType.NOTIFICATION_TEMPLATE),
TB_RESOURCE(true, EntityType.TB_RESOURCE),
OAUTH2_CLIENT(true, EntityType.OAUTH2_CLIENT),
DOMAIN(true, EntityType.DOMAIN);
DOMAIN(true, EntityType.DOMAIN),
CALCULATED_FIELD(true, EntityType.CALCULATED_FIELD);
private final boolean allEdgesRelated;

2
common/data/src/main/java/org/thingsboard/server/common/data/id/EntityIdFactory.java

@ -169,6 +169,8 @@ public class EntityIdFactory {
return new OAuth2ClientId(uuid);
case DOMAIN:
return new DomainId(uuid);
case CALCULATED_FIELD:
return new CalculatedFieldId(uuid);
}
throw new IllegalArgumentException("EdgeEventType " + edgeEventType + " is not supported!");
}

10
common/edge-api/src/main/proto/edge.proto

@ -124,6 +124,14 @@ enum UpdateMsgType {
// use 6 as a next number
}
message CalculatedFieldUpdateMsg{
UpdateMsgType msgType = 1;
int64 idMSB = 2;
int64 idLSB = 3;
string entity = 4;
}
message EntityDataProto {
int64 entityIdMSB = 1;
int64 entityIdLSB = 2;
@ -423,6 +431,7 @@ message UplinkMsg {
repeated AlarmCommentUpdateMsg alarmCommentUpdateMsg = 22;
repeated RuleChainUpdateMsg ruleChainUpdateMsg = 23;
repeated RuleChainMetadataUpdateMsg ruleChainMetadataUpdateMsg = 24;
repeated CalculatedFieldUpdateMsg calculatedFieldUpdateMsg = 25;
}
message UplinkResponseMsg {
@ -472,4 +481,5 @@ message DownlinkMsg {
repeated NotificationTargetUpdateMsg notificationTargetUpdateMsg = 32;
repeated NotificationTemplateUpdateMsg notificationTemplateUpdateMsg = 33;
repeated OAuth2DomainUpdateMsg oAuth2DomainUpdateMsg = 34;
repeated CalculatedFieldUpdateMsg calculatedFieldUpdateMsg = 35;
}

23
dao/src/main/java/org/thingsboard/server/dao/cf/BaseCalculatedFieldService.java

@ -57,7 +57,18 @@ public class BaseCalculatedFieldService extends AbstractEntityService implements
@Override
public CalculatedField save(CalculatedField calculatedField) {
CalculatedField oldCalculatedField = calculatedFieldDataValidator.validate(calculatedField, CalculatedField::getTenantId);
return doSave(calculatedField, true);
}
@Override
public CalculatedField save(CalculatedField calculatedField, boolean doValidate) {
return doSave(calculatedField, doValidate);
}
private CalculatedField doSave(CalculatedField calculatedField, boolean doValidate) {
if (doValidate) {
calculatedFieldDataValidator.validate(calculatedField, CalculatedField::getTenantId);
}
try {
TenantId tenantId = calculatedField.getTenantId();
log.trace("Executing save calculated field, [{}]", calculatedField);
@ -65,7 +76,7 @@ public class BaseCalculatedFieldService extends AbstractEntityService implements
CalculatedField savedCalculatedField = calculatedFieldDao.save(tenantId, calculatedField);
createOrUpdateCalculatedFieldLink(tenantId, savedCalculatedField);
eventPublisher.publishEvent(SaveEntityEvent.builder().tenantId(savedCalculatedField.getTenantId()).entityId(savedCalculatedField.getId())
.entity(savedCalculatedField).oldEntity(oldCalculatedField).created(calculatedField.getId() == null).build());
.entity(savedCalculatedField).oldEntity(calculatedField).created(calculatedField.getId() == null).build());
return savedCalculatedField;
} catch (Exception e) {
checkConstraintViolation(e,
@ -83,6 +94,14 @@ public class BaseCalculatedFieldService extends AbstractEntityService implements
return calculatedFieldDao.findById(tenantId, calculatedFieldId.getId());
}
@Override
public CalculatedField findByTenantIdAndName(TenantId tenantId, String name) {
log.trace("Executing findByTenantIdAndName [{}], calculatedFieldName[{}]", tenantId, name);
validateId(tenantId, id -> INCORRECT_TENANT_ID + id);
return calculatedFieldDao.findByTenantIdAndName(tenantId, name).orElse(null);
}
@Override
public List<CalculatedFieldId> findCalculatedFieldIdsByEntityId(TenantId tenantId, EntityId entityId) {
log.trace("Executing findCalculatedFieldIdsByEntityId [{}]", entityId);

3
dao/src/main/java/org/thingsboard/server/dao/cf/CalculatedFieldDao.java

@ -24,6 +24,7 @@ import org.thingsboard.server.common.data.page.PageLink;
import org.thingsboard.server.dao.Dao;
import java.util.List;
import java.util.Optional;
public interface CalculatedFieldDao extends Dao<CalculatedField> {
@ -35,6 +36,8 @@ public interface CalculatedFieldDao extends Dao<CalculatedField> {
List<CalculatedField> findAll();
Optional<CalculatedField> findByTenantIdAndName(TenantId tenantId, String name);
PageData<CalculatedField> findAll(PageLink pageLink);
PageData<CalculatedField> findAllByTenantId(TenantId tenantId, PageLink pageLink);

2
dao/src/main/java/org/thingsboard/server/dao/sql/cf/CalculatedFieldRepository.java

@ -28,6 +28,8 @@ public interface CalculatedFieldRepository extends JpaRepository<CalculatedField
boolean existsByTenantIdAndEntityId(UUID tenantId, UUID entityId);
CalculatedFieldEntity findByTenantIdAndName(UUID tenantId, String name);
List<CalculatedFieldId> findCalculatedFieldIdsByTenantIdAndEntityId(UUID tenantId, UUID entityId);
List<CalculatedFieldEntity> findAllByTenantIdAndEntityId(UUID tenantId, UUID entityId);

7
dao/src/main/java/org/thingsboard/server/dao/sql/cf/JpaCalculatedFieldDao.java

@ -34,6 +34,7 @@ import org.thingsboard.server.dao.sql.JpaAbstractDao;
import org.thingsboard.server.dao.util.SqlDao;
import java.util.List;
import java.util.Optional;
import java.util.UUID;
@Slf4j
@ -65,6 +66,12 @@ public class JpaCalculatedFieldDao extends JpaAbstractDao<CalculatedFieldEntity,
return DaoUtil.convertDataList(calculatedFieldRepository.findAll());
}
@Override
public Optional<CalculatedField> findByTenantIdAndName(TenantId tenantId, String name) {
CalculatedField calculatedField = DaoUtil.getData(calculatedFieldRepository.findByTenantIdAndName(tenantId.getId(), name));
return Optional.ofNullable(calculatedField);
}
@Override
public PageData<CalculatedField> findAll(PageLink pageLink) {
log.debug("Try to find calculated fields by pageLink [{}]", pageLink);

Loading…
Cancel
Save