Browse Source

Introduced processors for "to edge" msgs

pull/4918/head
Volodymyr Babak 5 years ago
parent
commit
fb6e152f7e
  1. 96
      application/src/main/java/org/thingsboard/server/service/edge/EdgeContextComponent.java
  2. 557
      application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcSession.java
  3. 41
      application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/AdminSettingsProcessor.java
  4. 25
      application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/AlarmProcessor.java
  5. 68
      application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/AssetProcessor.java
  6. 84
      application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/BaseProcessor.java
  7. 61
      application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/CustomerProcessor.java
  8. 71
      application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/DashboardProcessor.java
  9. 62
      application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/DeviceProcessor.java
  10. 62
      application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/DeviceProfileProcessor.java
  11. 73
      application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/EntityProcessor.java
  12. 68
      application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/EntityViewProcessor.java
  13. 13
      application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/RelationProcessor.java
  14. 81
      application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/RuleChainProcessor.java
  15. 49
      application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/TelemetryProcessor.java
  16. 71
      application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/UserProcessor.java
  17. 62
      application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/WidgetBundleProcessor.java
  18. 62
      application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/WidgetTypeProcessor.java

96
application/src/main/java/org/thingsboard/server/service/edge/EdgeContextComponent.java

@ -54,6 +54,17 @@ import org.thingsboard.server.service.edge.rpc.constructor.RuleChainMsgConstruct
import org.thingsboard.server.service.edge.rpc.constructor.UserMsgConstructor;
import org.thingsboard.server.service.edge.rpc.constructor.WidgetTypeMsgConstructor;
import org.thingsboard.server.service.edge.rpc.constructor.WidgetsBundleMsgConstructor;
import org.thingsboard.server.service.edge.rpc.processor.AdminSettingsProcessor;
import org.thingsboard.server.service.edge.rpc.processor.AssetProcessor;
import org.thingsboard.server.service.edge.rpc.processor.CustomerProcessor;
import org.thingsboard.server.service.edge.rpc.processor.DashboardProcessor;
import org.thingsboard.server.service.edge.rpc.processor.DeviceProfileProcessor;
import org.thingsboard.server.service.edge.rpc.processor.EntityProcessor;
import org.thingsboard.server.service.edge.rpc.processor.EntityViewProcessor;
import org.thingsboard.server.service.edge.rpc.processor.RuleChainProcessor;
import org.thingsboard.server.service.edge.rpc.processor.UserProcessor;
import org.thingsboard.server.service.edge.rpc.processor.WidgetBundleProcessor;
import org.thingsboard.server.service.edge.rpc.processor.WidgetTypeProcessor;
import org.thingsboard.server.service.edge.rpc.sync.EdgeRequestsService;
import org.thingsboard.server.service.edge.rpc.processor.AlarmProcessor;
import org.thingsboard.server.service.edge.rpc.processor.DeviceProcessor;
@ -72,9 +83,6 @@ public class EdgeContextComponent {
@Autowired
private EdgeService edgeService;
@Autowired
private PartitionService partitionService;
@Lazy
@Autowired
private EdgeEventService edgeEventService;
@ -83,38 +91,14 @@ public class EdgeContextComponent {
@Autowired
private AssetService assetService;
@Lazy
@Autowired
private DeviceService deviceService;
@Lazy
@Autowired
private DeviceProfileService deviceProfileService;
@Lazy
@Autowired
private DeviceCredentialsService deviceCredentialsService;
@Lazy
@Autowired
private EntityViewService entityViewService;
@Lazy
@Autowired
private AttributesService attributesService;
@Lazy
@Autowired
private CustomerService customerService;
@Lazy
@Autowired
private RelationService relationService;
@Lazy
@Autowired
private AlarmService alarmService;
@Lazy
@Autowired
private DashboardService dashboardService;
@ -127,101 +111,73 @@ public class EdgeContextComponent {
@Autowired
private UserService userService;
@Lazy
@Autowired
private ActorService actorService;
@Lazy
@Autowired
private WidgetsBundleService widgetsBundleService;
@Lazy
@Autowired
private WidgetTypeService widgetTypeService;
@Lazy
@Autowired
private DeviceStateService deviceStateService;
@Lazy
@Autowired
private TbClusterService tbClusterService;
@Lazy
@Autowired
private EdgeRequestsService edgeRequestsService;
@Lazy
@Autowired
private RuleChainMsgConstructor ruleChainMsgConstructor;
@Lazy
@Autowired
private AlarmMsgConstructor alarmMsgConstructor;
@Lazy
@Autowired
private DeviceMsgConstructor deviceMsgConstructor;
@Lazy
@Autowired
private DeviceProfileMsgConstructor deviceProfileMsgConstructor;
private AlarmProcessor alarmProcessor;
@Lazy
@Autowired
private AssetMsgConstructor assetMsgConstructor;
private DeviceProfileProcessor deviceProfileProcessor;
@Lazy
@Autowired
private EntityViewMsgConstructor entityViewMsgConstructor;
private DeviceProcessor deviceProcessor;
@Lazy
@Autowired
private DashboardMsgConstructor dashboardMsgConstructor;
private EntityProcessor entityProcessor;
@Lazy
@Autowired
private CustomerMsgConstructor customerMsgConstructor;
private AssetProcessor assetProcessor;
@Lazy
@Autowired
private UserMsgConstructor userMsgConstructor;
private EntityViewProcessor entityViewProcessor;
@Lazy
@Autowired
private RelationMsgConstructor relationMsgConstructor;
private UserProcessor userProcessor;
@Lazy
@Autowired
private WidgetsBundleMsgConstructor widgetsBundleMsgConstructor;
private RelationProcessor relationProcessor;
@Lazy
@Autowired
private WidgetTypeMsgConstructor widgetTypeMsgConstructor;
private TelemetryProcessor telemetryProcessor;
@Lazy
@Autowired
private AdminSettingsMsgConstructor adminSettingsMsgConstructor;
private DashboardProcessor dashboardProcessor;
@Lazy
@Autowired
private EntityDataMsgConstructor entityDataMsgConstructor;
private RuleChainProcessor ruleChainProcessor;
@Lazy
@Autowired
private AlarmProcessor alarmProcessor;
private CustomerProcessor customerProcessor;
@Lazy
@Autowired
private DeviceProcessor deviceProcessor;
private WidgetBundleProcessor widgetBundleProcessor;
@Lazy
@Autowired
private RelationProcessor relationProcessor;
private WidgetTypeProcessor widgetTypeProcessor;
@Lazy
@Autowired
private TelemetryProcessor telemetryProcessor;
private AdminSettingsProcessor adminSettingsProcessor;
@Lazy
@Autowired

557
application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcSession.java

@ -21,67 +21,32 @@ import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.gson.JsonElement;
import io.grpc.stub.StreamObserver;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.thingsboard.common.util.JacksonUtil;
import org.thingsboard.server.common.data.AdminSettings;
import org.thingsboard.server.common.data.Customer;
import org.thingsboard.server.common.data.Dashboard;
import org.thingsboard.server.common.data.DataConstants;
import org.thingsboard.server.common.data.Device;
import org.thingsboard.server.common.data.DeviceProfile;
import org.thingsboard.server.common.data.EntityView;
import org.thingsboard.server.common.data.HasCustomerId;
import org.thingsboard.server.common.data.User;
import org.thingsboard.server.common.data.alarm.Alarm;
import org.thingsboard.server.common.data.asset.Asset;
import org.thingsboard.server.common.data.edge.Edge;
import org.thingsboard.server.common.data.edge.EdgeEvent;
import org.thingsboard.server.common.data.edge.EdgeEventActionType;
import org.thingsboard.server.common.data.edge.EdgeEventType;
import org.thingsboard.server.common.data.id.AlarmId;
import org.thingsboard.server.common.data.id.AssetId;
import org.thingsboard.server.common.data.id.CustomerId;
import org.thingsboard.server.common.data.id.DashboardId;
import org.thingsboard.server.common.data.id.DeviceId;
import org.thingsboard.server.common.data.id.DeviceProfileId;
import org.thingsboard.server.common.data.id.EdgeId;
import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.id.EntityViewId;
import org.thingsboard.server.common.data.id.RuleChainId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.id.UserId;
import org.thingsboard.server.common.data.id.WidgetTypeId;
import org.thingsboard.server.common.data.id.WidgetsBundleId;
import org.thingsboard.server.common.data.kv.AttributeKvEntry;
import org.thingsboard.server.common.data.kv.BaseAttributeKvEntry;
import org.thingsboard.server.common.data.kv.LongDataEntry;
import org.thingsboard.server.common.data.page.PageData;
import org.thingsboard.server.common.data.page.PageLink;
import org.thingsboard.server.common.data.relation.EntityRelation;
import org.thingsboard.server.common.data.rule.RuleChain;
import org.thingsboard.server.common.data.rule.RuleChainMetaData;
import org.thingsboard.server.common.data.security.DeviceCredentials;
import org.thingsboard.server.common.data.security.UserCredentials;
import org.thingsboard.server.common.data.widget.WidgetType;
import org.thingsboard.server.common.data.widget.WidgetsBundle;
import org.thingsboard.server.common.transport.util.JsonUtils;
import org.thingsboard.server.gen.edge.AdminSettingsUpdateMsg;
import org.thingsboard.server.gen.edge.AlarmUpdateMsg;
import org.thingsboard.server.gen.edge.AssetUpdateMsg;
import org.thingsboard.server.gen.edge.AttributesRequestMsg;
import org.thingsboard.server.gen.edge.ConnectRequestMsg;
import org.thingsboard.server.gen.edge.ConnectResponseCode;
import org.thingsboard.server.gen.edge.ConnectResponseMsg;
import org.thingsboard.server.gen.edge.CustomerUpdateMsg;
import org.thingsboard.server.gen.edge.DashboardUpdateMsg;
import org.thingsboard.server.gen.edge.DeviceCredentialsRequestMsg;
import org.thingsboard.server.gen.edge.DeviceCredentialsUpdateMsg;
import org.thingsboard.server.gen.edge.DeviceProfileDevicesRequestMsg;
import org.thingsboard.server.gen.edge.DeviceProfileUpdateMsg;
import org.thingsboard.server.gen.edge.DeviceRpcCallMsg;
import org.thingsboard.server.gen.edge.DeviceUpdateMsg;
import org.thingsboard.server.gen.edge.DownlinkMsg;
@ -89,7 +54,6 @@ import org.thingsboard.server.gen.edge.DownlinkResponseMsg;
import org.thingsboard.server.gen.edge.EdgeConfiguration;
import org.thingsboard.server.gen.edge.EdgeUpdateMsg;
import org.thingsboard.server.gen.edge.EntityDataProto;
import org.thingsboard.server.gen.edge.EntityViewUpdateMsg;
import org.thingsboard.server.gen.edge.EntityViewsRequestMsg;
import org.thingsboard.server.gen.edge.RelationRequestMsg;
import org.thingsboard.server.gen.edge.RelationUpdateMsg;
@ -97,17 +61,12 @@ import org.thingsboard.server.gen.edge.RequestMsg;
import org.thingsboard.server.gen.edge.RequestMsgType;
import org.thingsboard.server.gen.edge.ResponseMsg;
import org.thingsboard.server.gen.edge.RuleChainMetadataRequestMsg;
import org.thingsboard.server.gen.edge.RuleChainMetadataUpdateMsg;
import org.thingsboard.server.gen.edge.RuleChainUpdateMsg;
import org.thingsboard.server.gen.edge.SyncCompletedMsg;
import org.thingsboard.server.gen.edge.UpdateMsgType;
import org.thingsboard.server.gen.edge.UplinkMsg;
import org.thingsboard.server.gen.edge.UplinkResponseMsg;
import org.thingsboard.server.gen.edge.UserCredentialsRequestMsg;
import org.thingsboard.server.gen.edge.UserCredentialsUpdateMsg;
import org.thingsboard.server.gen.edge.WidgetBundleTypesRequestMsg;
import org.thingsboard.server.gen.edge.WidgetTypeUpdateMsg;
import org.thingsboard.server.gen.edge.WidgetsBundleUpdateMsg;
import org.thingsboard.server.service.edge.EdgeContextComponent;
import org.thingsboard.server.service.edge.rpc.fetch.AssetsEdgeEventFetcher;
import org.thingsboard.server.service.edge.rpc.fetch.CustomerUsersEdgeEventFetcher;
@ -390,7 +349,7 @@ public final class EdgeGrpcSession implements Closeable {
}
private DownlinkMsg convertToDownlinkMsg(EdgeEvent edgeEvent) {
log.trace("[{}] converting edge event to downlink msg [{}]", this.sessionId, edgeEvent);
log.trace("[{}][{}] converting edge event to downlink msg [{}]", edge.getTenantId(), this.sessionId, edgeEvent);
DownlinkMsg downlinkMsg = null;
try {
switch (edgeEvent.getAction()) {
@ -407,25 +366,26 @@ public final class EdgeGrpcSession implements Closeable {
case ASSIGNED_TO_CUSTOMER:
case UNASSIGNED_FROM_CUSTOMER:
downlinkMsg = processEntityMessage(edgeEvent, edgeEvent.getAction());
log.trace("[{}][{}] entity message processed [{}]", edgeEvent.getTenantId(), this.sessionId, downlinkMsg);
break;
case ATTRIBUTES_UPDATED:
case POST_ATTRIBUTES:
case ATTRIBUTES_DELETED:
case TIMESERIES_UPDATED:
downlinkMsg = processTelemetryMessage(edgeEvent);
downlinkMsg = ctx.getTelemetryProcessor().processTelemetryMessageToEdge(edgeEvent);
break;
case CREDENTIALS_REQUEST:
downlinkMsg = processCredentialsRequestMessage(edgeEvent);
downlinkMsg = ctx.getEntityProcessor().processCredentialsRequestMessageToEdge(edgeEvent);
break;
case ENTITY_MERGE_REQUEST:
downlinkMsg = processEntityMergeRequestMessage(edgeEvent);
downlinkMsg = ctx.getEntityProcessor().processEntityMergeRequestMessageToEdge(edge, edgeEvent);
break;
case RPC_CALL:
downlinkMsg = processRpcCallMsg(edgeEvent);
downlinkMsg = ctx.getDeviceProcessor().processRpcCallMsgToEdge(edgeEvent);
break;
}
} catch (Exception e) {
log.error("Exception during converting edge event to downlink msg", e);
log.error("[{}][{}] Exception during converting edge event to downlink msg", edge.getTenantId(), this.sessionId, e);
}
return downlinkMsg;
}
@ -437,49 +397,6 @@ public final class EdgeGrpcSession implements Closeable {
.collect(Collectors.toList());
}
private DownlinkMsg processEntityMergeRequestMessage(EdgeEvent edgeEvent) {
DownlinkMsg downlinkMsg = null;
if (EdgeEventType.DEVICE.equals(edgeEvent.getType())) {
DeviceId deviceId = new DeviceId(edgeEvent.getEntityId());
Device device = ctx.getDeviceService().findDeviceById(edge.getTenantId(), deviceId);
CustomerId customerId = getCustomerIdIfEdgeAssignedToCustomer(device);
String conflictName = null;
if(edgeEvent.getBody() != null) {
conflictName = edgeEvent.getBody().get("conflictName").asText();
}
DeviceUpdateMsg d = ctx.getDeviceMsgConstructor()
.constructDeviceUpdatedMsg(UpdateMsgType.ENTITY_MERGE_RPC_MESSAGE, device, customerId, conflictName);
downlinkMsg = DownlinkMsg.newBuilder()
.addAllDeviceUpdateMsg(Collections.singletonList(d))
.build();
}
return downlinkMsg;
}
private DownlinkMsg processRpcCallMsg(EdgeEvent edgeEvent) {
log.trace("Executing processRpcCall, edgeEvent [{}]", edgeEvent);
DeviceRpcCallMsg deviceRpcCallMsg =
ctx.getDeviceMsgConstructor().constructDeviceRpcCallMsg(edgeEvent.getEntityId(), edgeEvent.getBody());
return DownlinkMsg.newBuilder()
.addAllDeviceRpcCallMsg(Collections.singletonList(deviceRpcCallMsg))
.build();
}
private DownlinkMsg processCredentialsRequestMessage(EdgeEvent edgeEvent) {
DownlinkMsg downlinkMsg = null;
if (EdgeEventType.DEVICE.equals(edgeEvent.getType())) {
DeviceId deviceId = new DeviceId(edgeEvent.getEntityId());
DeviceCredentialsRequestMsg deviceCredentialsRequestMsg = DeviceCredentialsRequestMsg.newBuilder()
.setDeviceIdMSB(deviceId.getId().getMostSignificantBits())
.setDeviceIdLSB(deviceId.getId().getLeastSignificantBits())
.build();
DownlinkMsg.Builder builder = DownlinkMsg.newBuilder()
.addAllDeviceCredentialsRequestMsg(Collections.singletonList(deviceCredentialsRequestMsg));
downlinkMsg = builder.build();
}
return downlinkMsg;
}
private ListenableFuture<Long> getQueueStartTs() {
ListenableFuture<Optional<AttributeKvEntry>> future =
ctx.getAttributesService().find(edge.getTenantId(), edge.getId(), DataConstants.SERVER_SCOPE, QUEUE_START_TS_ATTR_KEY);
@ -500,454 +417,44 @@ public final class EdgeGrpcSession implements Closeable {
ctx.getAttributesService().save(edge.getTenantId(), edge.getId(), DataConstants.SERVER_SCOPE, attributes);
}
private DownlinkMsg processTelemetryMessage(EdgeEvent edgeEvent) {
log.trace("[{}] Executing processTelemetryMessage, edgeEvent [{}]", this.sessionId, edgeEvent);
EntityId entityId = null;
switch (edgeEvent.getType()) {
case DEVICE:
entityId = new DeviceId(edgeEvent.getEntityId());
break;
case ASSET:
entityId = new AssetId(edgeEvent.getEntityId());
break;
case ENTITY_VIEW:
entityId = new EntityViewId(edgeEvent.getEntityId());
break;
case DASHBOARD:
entityId = new DashboardId(edgeEvent.getEntityId());
break;
case TENANT:
entityId = new TenantId(edgeEvent.getEntityId());
break;
case CUSTOMER:
entityId = new CustomerId(edgeEvent.getEntityId());
break;
case EDGE:
entityId = new EdgeId(edgeEvent.getEntityId());
break;
}
DownlinkMsg downlinkMsg = null;
if (entityId != null) {
log.debug("[{}] Sending telemetry data msg, entityId [{}], body [{}]", this.sessionId, edgeEvent.getEntityId(), edgeEvent.getBody());
try {
downlinkMsg = constructEntityDataProtoMsg(entityId, edgeEvent.getAction(), JsonUtils.parse(mapper.writeValueAsString(edgeEvent.getBody())));
} catch (Exception e) {
log.warn("[{}] Can't send telemetry data msg, entityId [{}], body [{}]", this.sessionId, edgeEvent.getEntityId(), edgeEvent.getBody(), e);
}
}
return downlinkMsg;
}
private DownlinkMsg processEntityMessage(EdgeEvent edgeEvent, EdgeEventActionType action) {
UpdateMsgType msgType = getResponseMsgType(edgeEvent.getAction());
log.trace("Executing processEntityMessage, edgeEvent [{}], action [{}], msgType [{}]", edgeEvent, action, msgType);
switch (edgeEvent.getType()) {
case DEVICE:
return processDevice(edgeEvent, msgType, action);
return ctx.getDeviceProcessor().processDeviceToEdge(edge, edgeEvent, msgType, action);
case DEVICE_PROFILE:
return processDeviceProfile(edgeEvent, msgType, action);
return ctx.getDeviceProfileProcessor().processDeviceProfileToEdge(edgeEvent, msgType, action);
case ASSET:
return processAsset(edgeEvent, msgType, action);
return ctx.getAssetProcessor().processAssetToEdge(edge, edgeEvent, msgType, action);
case ENTITY_VIEW:
return processEntityView(edgeEvent, msgType, action);
return ctx.getEntityViewProcessor().processEntityViewToEdge(edge, edgeEvent, msgType, action);
case DASHBOARD:
return processDashboard(edgeEvent, msgType, action);
return ctx.getDashboardProcessor().processDashboardToEdge(edge, edgeEvent, msgType, action);
case CUSTOMER:
return processCustomer(edgeEvent, msgType, action);
return ctx.getCustomerProcessor().processCustomerToEdge(edgeEvent, msgType, action);
case RULE_CHAIN:
return processRuleChain(edgeEvent, msgType, action);
return ctx.getRuleChainProcessor().processRuleChainToEdge(edge, edgeEvent, msgType, action);
case RULE_CHAIN_METADATA:
return processRuleChainMetadata(edgeEvent, msgType);
return ctx.getRuleChainProcessor().processRuleChainMetadataToEdge(edgeEvent, msgType);
case ALARM:
return processAlarm(edgeEvent, msgType);
return ctx.getAlarmProcessor().processAlarmToEdge(edge, edgeEvent, msgType);
case USER:
return processUser(edgeEvent, msgType, action);
return ctx.getUserProcessor().processUserToEdge(edge, edgeEvent, msgType, action);
case RELATION:
return processRelation(edgeEvent, msgType);
return ctx.getRelationProcessor().processRelationToEdge(edgeEvent, msgType);
case WIDGETS_BUNDLE:
return processWidgetsBundle(edgeEvent, msgType, action);
return ctx.getWidgetBundleProcessor().processWidgetsBundleToEdge(edgeEvent, msgType, action);
case WIDGET_TYPE:
return processWidgetType(edgeEvent, msgType, action);
return ctx.getWidgetTypeProcessor().processWidgetTypeToEdge(edgeEvent, msgType, action);
case ADMIN_SETTINGS:
return processAdminSettings(edgeEvent);
return ctx.getAdminSettingsProcessor().processAdminSettingsToEdge(edgeEvent);
default:
log.warn("Unsupported edge event type [{}]", edgeEvent);
return null;
}
}
private DownlinkMsg processDevice(EdgeEvent edgeEvent, UpdateMsgType msgType, EdgeEventActionType edgeEdgeEventActionType) {
DeviceId deviceId = new DeviceId(edgeEvent.getEntityId());
DownlinkMsg downlinkMsg = null;
switch (edgeEdgeEventActionType) {
case ADDED:
case UPDATED:
case ASSIGNED_TO_EDGE:
case ASSIGNED_TO_CUSTOMER:
case UNASSIGNED_FROM_CUSTOMER:
Device device = ctx.getDeviceService().findDeviceById(edgeEvent.getTenantId(), deviceId);
if (device != null) {
CustomerId customerId = getCustomerIdIfEdgeAssignedToCustomer(device);
DeviceUpdateMsg deviceUpdateMsg =
ctx.getDeviceMsgConstructor().constructDeviceUpdatedMsg(msgType, device, customerId, null);
downlinkMsg = DownlinkMsg.newBuilder()
.addAllDeviceUpdateMsg(Collections.singletonList(deviceUpdateMsg))
.build();
}
break;
case DELETED:
case UNASSIGNED_FROM_EDGE:
DeviceUpdateMsg deviceUpdateMsg =
ctx.getDeviceMsgConstructor().constructDeviceDeleteMsg(deviceId);
downlinkMsg = DownlinkMsg.newBuilder()
.addAllDeviceUpdateMsg(Collections.singletonList(deviceUpdateMsg))
.build();
break;
case CREDENTIALS_UPDATED:
DeviceCredentials deviceCredentials = ctx.getDeviceCredentialsService().findDeviceCredentialsByDeviceId(edge.getTenantId(), deviceId);
if (deviceCredentials != null) {
DeviceCredentialsUpdateMsg deviceCredentialsUpdateMsg =
ctx.getDeviceMsgConstructor().constructDeviceCredentialsUpdatedMsg(deviceCredentials);
downlinkMsg = DownlinkMsg.newBuilder()
.addAllDeviceCredentialsUpdateMsg(Collections.singletonList(deviceCredentialsUpdateMsg))
.build();
}
break;
}
log.trace("[{}] device processed [{}]", this.sessionId, downlinkMsg);
return downlinkMsg;
}
private DownlinkMsg processDeviceProfile(EdgeEvent edgeEvent, UpdateMsgType msgType, EdgeEventActionType action) {
DeviceProfileId deviceProfileId = new DeviceProfileId(edgeEvent.getEntityId());
DownlinkMsg downlinkMsg = null;
switch (action) {
case ADDED:
case UPDATED:
DeviceProfile deviceProfile = ctx.getDeviceProfileService().findDeviceProfileById(edgeEvent.getTenantId(), deviceProfileId);
if (deviceProfile != null) {
DeviceProfileUpdateMsg deviceProfileUpdateMsg =
ctx.getDeviceProfileMsgConstructor().constructDeviceProfileUpdatedMsg(msgType, deviceProfile);
downlinkMsg = DownlinkMsg.newBuilder()
.addAllDeviceProfileUpdateMsg(Collections.singletonList(deviceProfileUpdateMsg))
.build();
}
break;
case DELETED:
DeviceProfileUpdateMsg deviceProfileUpdateMsg =
ctx.getDeviceProfileMsgConstructor().constructDeviceProfileDeleteMsg(deviceProfileId);
downlinkMsg = DownlinkMsg.newBuilder()
.addAllDeviceProfileUpdateMsg(Collections.singletonList(deviceProfileUpdateMsg))
.build();
break;
}
log.trace("[{}] device profile processed [{}]", this.sessionId, downlinkMsg);
return downlinkMsg;
}
private DownlinkMsg processAsset(EdgeEvent edgeEvent, UpdateMsgType msgType, EdgeEventActionType action) {
AssetId assetId = new AssetId(edgeEvent.getEntityId());
DownlinkMsg downlinkMsg = null;
switch (action) {
case ADDED:
case UPDATED:
case ASSIGNED_TO_EDGE:
case ASSIGNED_TO_CUSTOMER:
case UNASSIGNED_FROM_CUSTOMER:
Asset asset = ctx.getAssetService().findAssetById(edgeEvent.getTenantId(), assetId);
if (asset != null) {
CustomerId customerId = getCustomerIdIfEdgeAssignedToCustomer(asset);
AssetUpdateMsg assetUpdateMsg =
ctx.getAssetMsgConstructor().constructAssetUpdatedMsg(msgType, asset, customerId);
downlinkMsg = DownlinkMsg.newBuilder()
.addAllAssetUpdateMsg(Collections.singletonList(assetUpdateMsg))
.build();
}
break;
case DELETED:
case UNASSIGNED_FROM_EDGE:
AssetUpdateMsg assetUpdateMsg =
ctx.getAssetMsgConstructor().constructAssetDeleteMsg(assetId);
downlinkMsg = DownlinkMsg.newBuilder()
.addAllAssetUpdateMsg(Collections.singletonList(assetUpdateMsg))
.build();
break;
}
log.trace("[{}] asset processed [{}]", this.sessionId, downlinkMsg);
return downlinkMsg;
}
private DownlinkMsg processEntityView(EdgeEvent edgeEvent, UpdateMsgType msgType, EdgeEventActionType action) {
EntityViewId entityViewId = new EntityViewId(edgeEvent.getEntityId());
DownlinkMsg downlinkMsg = null;
switch (action) {
case ADDED:
case UPDATED:
case ASSIGNED_TO_EDGE:
case ASSIGNED_TO_CUSTOMER:
case UNASSIGNED_FROM_CUSTOMER:
EntityView entityView = ctx.getEntityViewService().findEntityViewById(edgeEvent.getTenantId(), entityViewId);
if (entityView != null) {
CustomerId customerId = getCustomerIdIfEdgeAssignedToCustomer(entityView);
EntityViewUpdateMsg entityViewUpdateMsg =
ctx.getEntityViewMsgConstructor().constructEntityViewUpdatedMsg(msgType, entityView, customerId);
downlinkMsg = DownlinkMsg.newBuilder()
.addAllEntityViewUpdateMsg(Collections.singletonList(entityViewUpdateMsg))
.build();
}
break;
case DELETED:
case UNASSIGNED_FROM_EDGE:
EntityViewUpdateMsg entityViewUpdateMsg =
ctx.getEntityViewMsgConstructor().constructEntityViewDeleteMsg(entityViewId);
downlinkMsg = DownlinkMsg.newBuilder()
.addAllEntityViewUpdateMsg(Collections.singletonList(entityViewUpdateMsg))
.build();
break;
}
log.trace("[{}] entity view processed [{}]", this.sessionId, downlinkMsg);
return downlinkMsg;
}
private DownlinkMsg processDashboard(EdgeEvent edgeEvent, UpdateMsgType msgType, EdgeEventActionType action) {
DashboardId dashboardId = new DashboardId(edgeEvent.getEntityId());
DownlinkMsg downlinkMsg = null;
switch (action) {
case ADDED:
case UPDATED:
case ASSIGNED_TO_EDGE:
case ASSIGNED_TO_CUSTOMER:
case UNASSIGNED_FROM_CUSTOMER:
Dashboard dashboard = ctx.getDashboardService().findDashboardById(edgeEvent.getTenantId(), dashboardId);
if (dashboard != null) {
CustomerId customerId = null;
if (!edge.getCustomerId().isNullUid() && dashboard.isAssignedToCustomer(edge.getCustomerId())) {
customerId = edge.getCustomerId();
}
DashboardUpdateMsg dashboardUpdateMsg =
ctx.getDashboardMsgConstructor().constructDashboardUpdatedMsg(msgType, dashboard, customerId);
downlinkMsg = DownlinkMsg.newBuilder()
.addAllDashboardUpdateMsg(Collections.singletonList(dashboardUpdateMsg))
.build();
}
break;
case DELETED:
case UNASSIGNED_FROM_EDGE:
DashboardUpdateMsg dashboardUpdateMsg =
ctx.getDashboardMsgConstructor().constructDashboardDeleteMsg(dashboardId);
downlinkMsg = DownlinkMsg.newBuilder()
.addAllDashboardUpdateMsg(Collections.singletonList(dashboardUpdateMsg))
.build();
break;
}
log.trace("[{}] dashboard processed [{}]", this.sessionId, downlinkMsg);
return downlinkMsg;
}
private DownlinkMsg processCustomer(EdgeEvent edgeEvent, UpdateMsgType msgType, EdgeEventActionType action) {
CustomerId customerId = new CustomerId(edgeEvent.getEntityId());
DownlinkMsg downlinkMsg = null;
switch (action) {
case ADDED:
case UPDATED:
Customer customer = ctx.getCustomerService().findCustomerById(edgeEvent.getTenantId(), customerId);
if (customer != null) {
CustomerUpdateMsg customerUpdateMsg =
ctx.getCustomerMsgConstructor().constructCustomerUpdatedMsg(msgType, customer);
downlinkMsg = DownlinkMsg.newBuilder()
.addAllCustomerUpdateMsg(Collections.singletonList(customerUpdateMsg))
.build();
}
break;
case DELETED:
CustomerUpdateMsg customerUpdateMsg =
ctx.getCustomerMsgConstructor().constructCustomerDeleteMsg(customerId);
downlinkMsg = DownlinkMsg.newBuilder()
.addAllCustomerUpdateMsg(Collections.singletonList(customerUpdateMsg))
.build();
break;
}
log.trace("[{}] customer processed [{}]", this.sessionId, downlinkMsg);
return downlinkMsg;
}
private DownlinkMsg processRuleChain(EdgeEvent edgeEvent, UpdateMsgType msgType, EdgeEventActionType action) {
RuleChainId ruleChainId = new RuleChainId(edgeEvent.getEntityId());
DownlinkMsg downlinkMsg = null;
switch (action) {
case ADDED:
case UPDATED:
case ASSIGNED_TO_EDGE:
RuleChain ruleChain = ctx.getRuleChainService().findRuleChainById(edgeEvent.getTenantId(), ruleChainId);
if (ruleChain != null) {
RuleChainUpdateMsg ruleChainUpdateMsg =
ctx.getRuleChainMsgConstructor().constructRuleChainUpdatedMsg(edge.getRootRuleChainId(), msgType, ruleChain);
downlinkMsg = DownlinkMsg.newBuilder()
.addAllRuleChainUpdateMsg(Collections.singletonList(ruleChainUpdateMsg))
.build();
}
break;
case DELETED:
case UNASSIGNED_FROM_EDGE:
downlinkMsg = DownlinkMsg.newBuilder()
.addAllRuleChainUpdateMsg(Collections.singletonList(ctx.getRuleChainMsgConstructor().constructRuleChainDeleteMsg(ruleChainId)))
.build();
break;
}
log.trace("[{}] rule chain processed [{}]", this.sessionId, downlinkMsg);
return downlinkMsg;
}
private DownlinkMsg processRuleChainMetadata(EdgeEvent edgeEvent, UpdateMsgType msgType) {
RuleChainId ruleChainId = new RuleChainId(edgeEvent.getEntityId());
RuleChain ruleChain = ctx.getRuleChainService().findRuleChainById(edgeEvent.getTenantId(), ruleChainId);
DownlinkMsg downlinkMsg = null;
if (ruleChain != null) {
RuleChainMetaData ruleChainMetaData = ctx.getRuleChainService().loadRuleChainMetaData(edgeEvent.getTenantId(), ruleChainId);
RuleChainMetadataUpdateMsg ruleChainMetadataUpdateMsg =
ctx.getRuleChainMsgConstructor().constructRuleChainMetadataUpdatedMsg(msgType, ruleChainMetaData);
if (ruleChainMetadataUpdateMsg != null) {
downlinkMsg = DownlinkMsg.newBuilder()
.addAllRuleChainMetadataUpdateMsg(Collections.singletonList(ruleChainMetadataUpdateMsg))
.build();
}
}
log.trace("[{}] rule chain metadata processed [{}]", this.sessionId, downlinkMsg);
return downlinkMsg;
}
private DownlinkMsg processUser(EdgeEvent edgeEvent, UpdateMsgType msgType, EdgeEventActionType edgeEdgeEventActionType) {
UserId userId = new UserId(edgeEvent.getEntityId());
DownlinkMsg downlinkMsg = null;
switch (edgeEdgeEventActionType) {
case ADDED:
case UPDATED:
User user = ctx.getUserService().findUserById(edgeEvent.getTenantId(), userId);
if (user != null) {
CustomerId customerId = getCustomerIdIfEdgeAssignedToCustomer(user);
downlinkMsg = DownlinkMsg.newBuilder()
.addAllUserUpdateMsg(Collections.singletonList(ctx.getUserMsgConstructor().constructUserUpdatedMsg(msgType, user, customerId)))
.build();
}
break;
case DELETED:
downlinkMsg = DownlinkMsg.newBuilder()
.addAllUserUpdateMsg(Collections.singletonList(ctx.getUserMsgConstructor().constructUserDeleteMsg(userId)))
.build();
break;
case CREDENTIALS_UPDATED:
UserCredentials userCredentialsByUserId = ctx.getUserService().findUserCredentialsByUserId(edge.getTenantId(), userId);
if (userCredentialsByUserId != null && userCredentialsByUserId.isEnabled()) {
UserCredentialsUpdateMsg userCredentialsUpdateMsg =
ctx.getUserMsgConstructor().constructUserCredentialsUpdatedMsg(userCredentialsByUserId);
downlinkMsg = DownlinkMsg.newBuilder()
.addAllUserCredentialsUpdateMsg(Collections.singletonList(userCredentialsUpdateMsg))
.build();
}
}
log.trace("[{}] user processed [{}]", this.sessionId, downlinkMsg);
return downlinkMsg;
}
private CustomerId getCustomerIdIfEdgeAssignedToCustomer(HasCustomerId hasCustomerIdEntity) {
if (!edge.getCustomerId().isNullUid() && edge.getCustomerId().equals(hasCustomerIdEntity.getCustomerId())) {
return edge.getCustomerId();
} else {
return null;
}
}
private DownlinkMsg processRelation(EdgeEvent edgeEvent, UpdateMsgType msgType) {
EntityRelation entityRelation = mapper.convertValue(edgeEvent.getBody(), EntityRelation.class);
RelationUpdateMsg r = ctx.getRelationMsgConstructor().constructRelationUpdatedMsg(msgType, entityRelation);
DownlinkMsg downlinkMsg = DownlinkMsg.newBuilder()
.addAllRelationUpdateMsg(Collections.singletonList(r))
.build();
log.trace("[{}] relation processed [{}]", this.sessionId, downlinkMsg);
return downlinkMsg;
}
private DownlinkMsg processAlarm(EdgeEvent edgeEvent, UpdateMsgType msgType) {
DownlinkMsg downlinkMsg = null;
try {
AlarmId alarmId = new AlarmId(edgeEvent.getEntityId());
Alarm alarm = ctx.getAlarmService().findAlarmByIdAsync(edgeEvent.getTenantId(), alarmId).get();
if (alarm != null) {
downlinkMsg = DownlinkMsg.newBuilder()
.addAllAlarmUpdateMsg(Collections.singletonList(ctx.getAlarmMsgConstructor().constructAlarmUpdatedMsg(edge.getTenantId(), msgType, alarm)))
.build();
}
} catch (Exception e) {
log.error("Can't process alarm msg [{}] [{}]", edgeEvent, msgType, e);
}
log.trace("[{}] alarm processed [{}]", this.sessionId, downlinkMsg);
return downlinkMsg;
}
private DownlinkMsg processWidgetsBundle(EdgeEvent edgeEvent, UpdateMsgType msgType, EdgeEventActionType edgeEdgeEventActionType) {
WidgetsBundleId widgetsBundleId = new WidgetsBundleId(edgeEvent.getEntityId());
DownlinkMsg downlinkMsg = null;
switch (edgeEdgeEventActionType) {
case ADDED:
case UPDATED:
WidgetsBundle widgetsBundle = ctx.getWidgetsBundleService().findWidgetsBundleById(edgeEvent.getTenantId(), widgetsBundleId);
if (widgetsBundle != null) {
WidgetsBundleUpdateMsg widgetsBundleUpdateMsg =
ctx.getWidgetsBundleMsgConstructor().constructWidgetsBundleUpdateMsg(msgType, widgetsBundle);
downlinkMsg = DownlinkMsg.newBuilder()
.addAllWidgetsBundleUpdateMsg(Collections.singletonList(widgetsBundleUpdateMsg))
.build();
}
break;
case DELETED:
WidgetsBundleUpdateMsg widgetsBundleUpdateMsg =
ctx.getWidgetsBundleMsgConstructor().constructWidgetsBundleDeleteMsg(widgetsBundleId);
downlinkMsg = DownlinkMsg.newBuilder()
.addAllWidgetsBundleUpdateMsg(Collections.singletonList(widgetsBundleUpdateMsg))
.build();
break;
}
log.trace("[{}] widget bundle processed [{}]", this.sessionId, downlinkMsg);
return downlinkMsg;
}
private DownlinkMsg processWidgetType(EdgeEvent edgeEvent, UpdateMsgType msgType, EdgeEventActionType edgeEdgeEventActionType) {
WidgetTypeId widgetTypeId = new WidgetTypeId(edgeEvent.getEntityId());
DownlinkMsg downlinkMsg = null;
switch (edgeEdgeEventActionType) {
case ADDED:
case UPDATED:
WidgetType widgetType = ctx.getWidgetTypeService().findWidgetTypeById(edgeEvent.getTenantId(), widgetTypeId);
if (widgetType != null) {
WidgetTypeUpdateMsg widgetTypeUpdateMsg =
ctx.getWidgetTypeMsgConstructor().constructWidgetTypeUpdateMsg(msgType, widgetType);
downlinkMsg = DownlinkMsg.newBuilder()
.addAllWidgetTypeUpdateMsg(Collections.singletonList(widgetTypeUpdateMsg))
.build();
}
break;
case DELETED:
WidgetTypeUpdateMsg widgetTypeUpdateMsg =
ctx.getWidgetTypeMsgConstructor().constructWidgetTypeDeleteMsg(widgetTypeId);
downlinkMsg = DownlinkMsg.newBuilder()
.addAllWidgetTypeUpdateMsg(Collections.singletonList(widgetTypeUpdateMsg))
.build();
break;
}
log.trace("[{}] widget type processed [{}]", this.sessionId, downlinkMsg);
return downlinkMsg;
}
private DownlinkMsg processAdminSettings(EdgeEvent edgeEvent) {
AdminSettings adminSettings = mapper.convertValue(edgeEvent.getBody(), AdminSettings.class);
AdminSettingsUpdateMsg t = ctx.getAdminSettingsMsgConstructor().constructAdminSettingsUpdateMsg(adminSettings);
DownlinkMsg downlinkMsg = DownlinkMsg.newBuilder()
.addAllAdminSettingsUpdateMsg(Collections.singletonList(t))
.build();
log.trace("[{}] admin settings processed [{}]", this.sessionId, downlinkMsg);
return downlinkMsg;
}
private UpdateMsgType getResponseMsgType(EdgeEventActionType actionType) {
switch (actionType) {
case UPDATED:
@ -972,41 +479,33 @@ public final class EdgeGrpcSession implements Closeable {
}
}
private DownlinkMsg constructEntityDataProtoMsg(EntityId entityId, EdgeEventActionType actionType, JsonElement entityData) {
EntityDataProto entityDataProto = ctx.getEntityDataMsgConstructor().constructEntityDataMsg(entityId, actionType, entityData);
DownlinkMsg downlinkMsg = DownlinkMsg.newBuilder()
.addAllEntityData(Collections.singletonList(entityDataProto))
.build();
log.trace("[{}] entity data proto processed [{}]", this.sessionId, downlinkMsg);
return downlinkMsg;
}
private ListenableFuture<List<Void>> processUplinkMsg(UplinkMsg uplinkMsg) {
List<ListenableFuture<Void>> result = new ArrayList<>();
try {
if (uplinkMsg.getEntityDataCount() > 0) {
for (EntityDataProto entityData : uplinkMsg.getEntityDataList()) {
result.addAll(ctx.getTelemetryProcessor().onTelemetryUpdate(edge.getTenantId(), edge.getCustomerId(), entityData));
result.addAll(ctx.getTelemetryProcessor().processTelemetryFromEdge(edge.getTenantId(), edge.getCustomerId(), entityData));
}
}
if (uplinkMsg.getDeviceUpdateMsgCount() > 0) {
for (DeviceUpdateMsg deviceUpdateMsg : uplinkMsg.getDeviceUpdateMsgList()) {
result.add(ctx.getDeviceProcessor().onDeviceUpdate(edge.getTenantId(), edge, deviceUpdateMsg));
result.add(ctx.getDeviceProcessor().processDeviceFromEdge(edge.getTenantId(), edge, deviceUpdateMsg));
}
}
if (uplinkMsg.getDeviceCredentialsUpdateMsgCount() > 0) {
for (DeviceCredentialsUpdateMsg deviceCredentialsUpdateMsg : uplinkMsg.getDeviceCredentialsUpdateMsgList()) {
result.add(ctx.getDeviceProcessor().onDeviceCredentialsUpdate(edge.getTenantId(), deviceCredentialsUpdateMsg));
result.add(ctx.getDeviceProcessor().processDeviceCredentialsFromEdge(edge.getTenantId(), deviceCredentialsUpdateMsg));
}
}
if (uplinkMsg.getAlarmUpdateMsgCount() > 0) {
for (AlarmUpdateMsg alarmUpdateMsg : uplinkMsg.getAlarmUpdateMsgList()) {
result.add(ctx.getAlarmProcessor().onAlarmUpdate(edge.getTenantId(), alarmUpdateMsg));
result.add(ctx.getAlarmProcessor().processAlarmFromEdge(edge.getTenantId(), alarmUpdateMsg));
}
}
if (uplinkMsg.getRelationUpdateMsgCount() > 0) {
for (RelationUpdateMsg relationUpdateMsg : uplinkMsg.getRelationUpdateMsgList()) {
result.add(ctx.getRelationProcessor().onRelationUpdate(edge.getTenantId(), relationUpdateMsg));
result.add(ctx.getRelationProcessor().processRelationFromEdge(edge.getTenantId(), relationUpdateMsg));
}
}
if (uplinkMsg.getRuleChainMetadataRequestMsgCount() > 0) {
@ -1036,7 +535,7 @@ public final class EdgeGrpcSession implements Closeable {
}
if (uplinkMsg.getDeviceRpcCallMsgCount() > 0) {
for (DeviceRpcCallMsg deviceRpcCallMsg : uplinkMsg.getDeviceRpcCallMsgList()) {
result.add(ctx.getDeviceProcessor().processDeviceRpcCallResponseMsg(edge.getTenantId(), deviceRpcCallMsg));
result.add(ctx.getDeviceProcessor().processDeviceRpcCallResponseFromEdge(edge.getTenantId(), deviceRpcCallMsg));
}
}
if (uplinkMsg.getDeviceProfileDevicesRequestMsgCount() > 0) {
@ -1107,7 +606,7 @@ public final class EdgeGrpcSession implements Closeable {
.setCloudType("CE");
if (edge.getCustomerId() != null) {
builder.setCustomerIdMSB(edge.getCustomerId().getId().getMostSignificantBits())
.setCustomerIdLSB(edge.getCustomerId().getId().getLeastSignificantBits());
.setCustomerIdLSB(edge.getCustomerId().getId().getLeastSignificantBits());
}
return builder
.build();

41
application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/AdminSettingsProcessor.java

@ -0,0 +1,41 @@
/**
* Copyright © 2016-2021 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.service.edge.rpc.processor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import org.thingsboard.server.common.data.AdminSettings;
import org.thingsboard.server.common.data.edge.EdgeEvent;
import org.thingsboard.server.gen.edge.AdminSettingsUpdateMsg;
import org.thingsboard.server.gen.edge.DownlinkMsg;
import org.thingsboard.server.queue.util.TbCoreComponent;
import java.util.Collections;
@Component
@Slf4j
@TbCoreComponent
public class AdminSettingsProcessor extends BaseProcessor {
public DownlinkMsg processAdminSettingsToEdge(EdgeEvent edgeEvent) {
AdminSettings adminSettings = mapper.convertValue(edgeEvent.getBody(), AdminSettings.class);
AdminSettingsUpdateMsg t = adminSettingsMsgConstructor.constructAdminSettingsUpdateMsg(adminSettings);
return DownlinkMsg.newBuilder()
.addAllAdminSettingsUpdateMsg(Collections.singletonList(t))
.build();
}
}

25
application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/AlarmProcessor.java

@ -23,17 +23,24 @@ import org.thingsboard.server.common.data.EntityType;
import org.thingsboard.server.common.data.alarm.Alarm;
import org.thingsboard.server.common.data.alarm.AlarmSeverity;
import org.thingsboard.server.common.data.alarm.AlarmStatus;
import org.thingsboard.server.common.data.edge.Edge;
import org.thingsboard.server.common.data.edge.EdgeEvent;
import org.thingsboard.server.common.data.id.AlarmId;
import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.gen.edge.AlarmUpdateMsg;
import org.thingsboard.server.gen.edge.DownlinkMsg;
import org.thingsboard.server.gen.edge.UpdateMsgType;
import org.thingsboard.server.queue.util.TbCoreComponent;
import java.util.Collections;
@Component
@Slf4j
@TbCoreComponent
public class AlarmProcessor extends BaseProcessor {
public ListenableFuture<Void> onAlarmUpdate(TenantId tenantId, AlarmUpdateMsg alarmUpdateMsg) {
public ListenableFuture<Void> processAlarmFromEdge(TenantId tenantId, AlarmUpdateMsg alarmUpdateMsg) {
log.trace("[{}] onAlarmUpdate [{}]", tenantId, alarmUpdateMsg);
EntityId originatorId = getAlarmOriginator(tenantId, alarmUpdateMsg.getOriginatorName(),
EntityType.valueOf(alarmUpdateMsg.getOriginatorType()));
@ -96,4 +103,20 @@ public class AlarmProcessor extends BaseProcessor {
return null;
}
}
public DownlinkMsg processAlarmToEdge(Edge edge, EdgeEvent edgeEvent, UpdateMsgType msgType) {
DownlinkMsg downlinkMsg = null;
try {
AlarmId alarmId = new AlarmId(edgeEvent.getEntityId());
Alarm alarm = alarmService.findAlarmByIdAsync(edgeEvent.getTenantId(), alarmId).get();
if (alarm != null) {
downlinkMsg = DownlinkMsg.newBuilder()
.addAllAlarmUpdateMsg(Collections.singletonList(alarmMsgConstructor.constructAlarmUpdatedMsg(edge.getTenantId(), msgType, alarm)))
.build();
}
} catch (Exception e) {
log.error("Can't process alarm msg [{}] [{}]", edgeEvent, msgType, e);
}
return downlinkMsg;
}
}

68
application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/AssetProcessor.java

@ -0,0 +1,68 @@
/**
* Copyright © 2016-2021 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.service.edge.rpc.processor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import org.thingsboard.server.common.data.asset.Asset;
import org.thingsboard.server.common.data.edge.Edge;
import org.thingsboard.server.common.data.edge.EdgeEvent;
import org.thingsboard.server.common.data.edge.EdgeEventActionType;
import org.thingsboard.server.common.data.id.AssetId;
import org.thingsboard.server.common.data.id.CustomerId;
import org.thingsboard.server.gen.edge.AssetUpdateMsg;
import org.thingsboard.server.gen.edge.DownlinkMsg;
import org.thingsboard.server.gen.edge.UpdateMsgType;
import org.thingsboard.server.queue.util.TbCoreComponent;
import java.util.Collections;
@Component
@Slf4j
@TbCoreComponent
public class AssetProcessor extends BaseProcessor {
public DownlinkMsg processAssetToEdge(Edge edge, EdgeEvent edgeEvent, UpdateMsgType msgType, EdgeEventActionType action) {
AssetId assetId = new AssetId(edgeEvent.getEntityId());
DownlinkMsg downlinkMsg = null;
switch (action) {
case ADDED:
case UPDATED:
case ASSIGNED_TO_EDGE:
case ASSIGNED_TO_CUSTOMER:
case UNASSIGNED_FROM_CUSTOMER:
Asset asset = assetService.findAssetById(edgeEvent.getTenantId(), assetId);
if (asset != null) {
CustomerId customerId = getCustomerIdIfEdgeAssignedToCustomer(asset, edge);
AssetUpdateMsg assetUpdateMsg =
assetMsgConstructor.constructAssetUpdatedMsg(msgType, asset, customerId);
downlinkMsg = DownlinkMsg.newBuilder()
.addAllAssetUpdateMsg(Collections.singletonList(assetUpdateMsg))
.build();
}
break;
case DELETED:
case UNASSIGNED_FROM_EDGE:
AssetUpdateMsg assetUpdateMsg =
assetMsgConstructor.constructAssetDeleteMsg(assetId);
downlinkMsg = DownlinkMsg.newBuilder()
.addAllAssetUpdateMsg(Collections.singletonList(assetUpdateMsg))
.build();
break;
}
return downlinkMsg;
}
}

84
application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/BaseProcessor.java

@ -23,9 +23,12 @@ import com.google.common.util.concurrent.ListenableFuture;
import lombok.extern.slf4j.Slf4j;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.springframework.beans.factory.annotation.Autowired;
import org.thingsboard.server.common.data.HasCustomerId;
import org.thingsboard.server.common.data.edge.Edge;
import org.thingsboard.server.common.data.edge.EdgeEvent;
import org.thingsboard.server.common.data.edge.EdgeEventActionType;
import org.thingsboard.server.common.data.edge.EdgeEventType;
import org.thingsboard.server.common.data.id.CustomerId;
import org.thingsboard.server.common.data.id.EdgeId;
import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.id.TenantId;
@ -35,14 +38,31 @@ import org.thingsboard.server.dao.attributes.AttributesService;
import org.thingsboard.server.dao.customer.CustomerService;
import org.thingsboard.server.dao.dashboard.DashboardService;
import org.thingsboard.server.dao.device.DeviceCredentialsService;
import org.thingsboard.server.dao.device.DeviceProfileService;
import org.thingsboard.server.dao.device.DeviceService;
import org.thingsboard.server.dao.edge.EdgeEventService;
import org.thingsboard.server.dao.edge.EdgeService;
import org.thingsboard.server.dao.entityview.EntityViewService;
import org.thingsboard.server.dao.relation.RelationService;
import org.thingsboard.server.dao.rule.RuleChainService;
import org.thingsboard.server.dao.user.UserService;
import org.thingsboard.server.dao.widget.WidgetTypeService;
import org.thingsboard.server.dao.widget.WidgetsBundleService;
import org.thingsboard.server.service.edge.rpc.constructor.AdminSettingsMsgConstructor;
import org.thingsboard.server.service.edge.rpc.constructor.AlarmMsgConstructor;
import org.thingsboard.server.service.edge.rpc.constructor.AssetMsgConstructor;
import org.thingsboard.server.service.edge.rpc.constructor.CustomerMsgConstructor;
import org.thingsboard.server.service.edge.rpc.constructor.DashboardMsgConstructor;
import org.thingsboard.server.service.edge.rpc.constructor.DeviceMsgConstructor;
import org.thingsboard.server.service.edge.rpc.constructor.DeviceProfileMsgConstructor;
import org.thingsboard.server.service.edge.rpc.constructor.EntityDataMsgConstructor;
import org.thingsboard.server.service.edge.rpc.constructor.EntityViewMsgConstructor;
import org.thingsboard.server.service.edge.rpc.constructor.RelationMsgConstructor;
import org.thingsboard.server.service.edge.rpc.constructor.RuleChainMsgConstructor;
import org.thingsboard.server.service.edge.rpc.constructor.UserMsgConstructor;
import org.thingsboard.server.service.edge.rpc.constructor.WidgetTypeMsgConstructor;
import org.thingsboard.server.service.edge.rpc.constructor.WidgetsBundleMsgConstructor;
import org.thingsboard.server.service.executors.DbCallbackExecutorService;
import org.thingsboard.server.service.profile.DefaultTbDeviceProfileCache;
import org.thingsboard.server.service.profile.TbDeviceProfileCache;
import org.thingsboard.server.service.queue.TbClusterService;
import org.thingsboard.server.service.state.DeviceStateService;
@ -52,6 +72,9 @@ public abstract class BaseProcessor {
protected static final ObjectMapper mapper = new ObjectMapper();
@Autowired
protected RuleChainService ruleChainService;
@Autowired
protected AlarmService alarmService;
@ -79,6 +102,9 @@ public abstract class BaseProcessor {
@Autowired
protected UserService userService;
@Autowired
protected DeviceProfileService deviceProfileService;
@Autowired
protected RelationService relationService;
@ -97,6 +123,54 @@ public abstract class BaseProcessor {
@Autowired
protected EdgeEventService edgeEventService;
@Autowired
protected WidgetsBundleService widgetsBundleService;
@Autowired
protected WidgetTypeService widgetTypeService;
@Autowired
protected EntityDataMsgConstructor entityDataMsgConstructor;
@Autowired
protected RuleChainMsgConstructor ruleChainMsgConstructor;
@Autowired
protected AlarmMsgConstructor alarmMsgConstructor;
@Autowired
protected DeviceMsgConstructor deviceMsgConstructor;
@Autowired
protected AssetMsgConstructor assetMsgConstructor;
@Autowired
protected EntityViewMsgConstructor entityViewMsgConstructor;
@Autowired
protected DashboardMsgConstructor dashboardMsgConstructor;
@Autowired
protected RelationMsgConstructor relationMsgConstructor;
@Autowired
protected UserMsgConstructor userMsgConstructor;
@Autowired
protected CustomerMsgConstructor customerMsgConstructor;
@Autowired
protected DeviceProfileMsgConstructor deviceProfileMsgConstructor;
@Autowired
protected WidgetsBundleMsgConstructor widgetsBundleMsgConstructor;
@Autowired
protected WidgetTypeMsgConstructor widgetTypeMsgConstructor;
@Autowired
protected AdminSettingsMsgConstructor adminSettingsMsgConstructor;
@Autowired
protected DbCallbackExecutorService dbCallbackExecutorService;
@ -133,4 +207,12 @@ public abstract class BaseProcessor {
}, dbCallbackExecutorService);
return future;
}
protected CustomerId getCustomerIdIfEdgeAssignedToCustomer(HasCustomerId hasCustomerIdEntity, Edge edge) {
if (!edge.getCustomerId().isNullUid() && edge.getCustomerId().equals(hasCustomerIdEntity.getCustomerId())) {
return edge.getCustomerId();
} else {
return null;
}
}
}

61
application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/CustomerProcessor.java

@ -0,0 +1,61 @@
/**
* Copyright © 2016-2021 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.service.edge.rpc.processor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import org.thingsboard.server.common.data.Customer;
import org.thingsboard.server.common.data.edge.EdgeEvent;
import org.thingsboard.server.common.data.edge.EdgeEventActionType;
import org.thingsboard.server.common.data.id.CustomerId;
import org.thingsboard.server.gen.edge.CustomerUpdateMsg;
import org.thingsboard.server.gen.edge.DownlinkMsg;
import org.thingsboard.server.gen.edge.UpdateMsgType;
import org.thingsboard.server.queue.util.TbCoreComponent;
import java.util.Collections;
@Component
@Slf4j
@TbCoreComponent
public class CustomerProcessor extends BaseProcessor {
public DownlinkMsg processCustomerToEdge(EdgeEvent edgeEvent, UpdateMsgType msgType, EdgeEventActionType action) {
CustomerId customerId = new CustomerId(edgeEvent.getEntityId());
DownlinkMsg downlinkMsg = null;
switch (action) {
case ADDED:
case UPDATED:
Customer customer = customerService.findCustomerById(edgeEvent.getTenantId(), customerId);
if (customer != null) {
CustomerUpdateMsg customerUpdateMsg =
customerMsgConstructor.constructCustomerUpdatedMsg(msgType, customer);
downlinkMsg = DownlinkMsg.newBuilder()
.addAllCustomerUpdateMsg(Collections.singletonList(customerUpdateMsg))
.build();
}
break;
case DELETED:
CustomerUpdateMsg customerUpdateMsg =
customerMsgConstructor.constructCustomerDeleteMsg(customerId);
downlinkMsg = DownlinkMsg.newBuilder()
.addAllCustomerUpdateMsg(Collections.singletonList(customerUpdateMsg))
.build();
break;
}
return downlinkMsg;
}
}

71
application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/DashboardProcessor.java

@ -0,0 +1,71 @@
/**
* Copyright © 2016-2021 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.service.edge.rpc.processor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import org.thingsboard.server.common.data.Dashboard;
import org.thingsboard.server.common.data.edge.Edge;
import org.thingsboard.server.common.data.edge.EdgeEvent;
import org.thingsboard.server.common.data.edge.EdgeEventActionType;
import org.thingsboard.server.common.data.id.CustomerId;
import org.thingsboard.server.common.data.id.DashboardId;
import org.thingsboard.server.gen.edge.DashboardUpdateMsg;
import org.thingsboard.server.gen.edge.DownlinkMsg;
import org.thingsboard.server.gen.edge.UpdateMsgType;
import org.thingsboard.server.queue.util.TbCoreComponent;
import java.util.Collections;
@Component
@Slf4j
@TbCoreComponent
public class DashboardProcessor extends BaseProcessor {
public DownlinkMsg processDashboardToEdge(Edge edge, EdgeEvent edgeEvent, UpdateMsgType msgType, EdgeEventActionType action) {
DashboardId dashboardId = new DashboardId(edgeEvent.getEntityId());
DownlinkMsg downlinkMsg = null;
switch (action) {
case ADDED:
case UPDATED:
case ASSIGNED_TO_EDGE:
case ASSIGNED_TO_CUSTOMER:
case UNASSIGNED_FROM_CUSTOMER:
Dashboard dashboard = dashboardService.findDashboardById(edgeEvent.getTenantId(), dashboardId);
if (dashboard != null) {
CustomerId customerId = null;
if (!edge.getCustomerId().isNullUid() && dashboard.isAssignedToCustomer(edge.getCustomerId())) {
customerId = edge.getCustomerId();
}
DashboardUpdateMsg dashboardUpdateMsg =
dashboardMsgConstructor.constructDashboardUpdatedMsg(msgType, dashboard, customerId);
downlinkMsg = DownlinkMsg.newBuilder()
.addAllDashboardUpdateMsg(Collections.singletonList(dashboardUpdateMsg))
.build();
}
break;
case DELETED:
case UNASSIGNED_FROM_EDGE:
DashboardUpdateMsg dashboardUpdateMsg =
dashboardMsgConstructor.constructDashboardDeleteMsg(dashboardId);
downlinkMsg = DownlinkMsg.newBuilder()
.addAllDashboardUpdateMsg(Collections.singletonList(dashboardUpdateMsg))
.build();
break;
}
return downlinkMsg;
}
}

62
application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/DeviceProcessor.java

@ -31,6 +31,7 @@ import org.thingsboard.server.common.data.Customer;
import org.thingsboard.server.common.data.DataConstants;
import org.thingsboard.server.common.data.Device;
import org.thingsboard.server.common.data.edge.Edge;
import org.thingsboard.server.common.data.edge.EdgeEvent;
import org.thingsboard.server.common.data.edge.EdgeEventActionType;
import org.thingsboard.server.common.data.edge.EdgeEventType;
import org.thingsboard.server.common.data.id.CustomerId;
@ -51,12 +52,15 @@ import org.thingsboard.common.util.JacksonUtil;
import org.thingsboard.server.gen.edge.DeviceCredentialsUpdateMsg;
import org.thingsboard.server.gen.edge.DeviceRpcCallMsg;
import org.thingsboard.server.gen.edge.DeviceUpdateMsg;
import org.thingsboard.server.gen.edge.DownlinkMsg;
import org.thingsboard.server.gen.edge.UpdateMsgType;
import org.thingsboard.server.queue.TbQueueCallback;
import org.thingsboard.server.queue.TbQueueMsgMetadata;
import org.thingsboard.server.queue.util.TbCoreComponent;
import org.thingsboard.server.service.rpc.FromDeviceRpcResponse;
import org.thingsboard.server.service.rpc.FromDeviceRpcResponseActorMsg;
import java.util.Collections;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.locks.ReentrantLock;
@ -68,7 +72,9 @@ public class DeviceProcessor extends BaseProcessor {
private static final ReentrantLock deviceCreationLock = new ReentrantLock();
public ListenableFuture<Void> onDeviceUpdate(TenantId tenantId, Edge edge, DeviceUpdateMsg deviceUpdateMsg) {
// TODO onDeviceUpdateFromEdge onDeviceUpdateToEdge
public ListenableFuture<Void> processDeviceFromEdge(TenantId tenantId, Edge edge, DeviceUpdateMsg deviceUpdateMsg) {
log.trace("[{}] onDeviceUpdate [{}] from edge [{}]", tenantId, deviceUpdateMsg, edge.getName());
switch (deviceUpdateMsg.getMsgType()) {
case ENTITY_CREATED_RPC_MESSAGE:
@ -133,7 +139,7 @@ public class DeviceProcessor extends BaseProcessor {
return Futures.immediateFuture(null);
}
public ListenableFuture<Void> onDeviceCredentialsUpdate(TenantId tenantId, DeviceCredentialsUpdateMsg deviceCredentialsUpdateMsg) {
public ListenableFuture<Void> processDeviceCredentialsFromEdge(TenantId tenantId, DeviceCredentialsUpdateMsg deviceCredentialsUpdateMsg) {
log.debug("Executing onDeviceCredentialsUpdate, deviceCredentialsUpdateMsg [{}]", deviceCredentialsUpdateMsg);
DeviceId deviceId = new DeviceId(new UUID(deviceCredentialsUpdateMsg.getDeviceIdMSB(), deviceCredentialsUpdateMsg.getDeviceIdLSB()));
ListenableFuture<Device> deviceFuture = deviceService.findDeviceByIdAsync(tenantId, deviceId);
@ -264,7 +270,7 @@ public class DeviceProcessor extends BaseProcessor {
return metaData;
}
public ListenableFuture<Void> processDeviceRpcCallResponseMsg(TenantId tenantId, DeviceRpcCallMsg deviceRpcCallMsg) {
public ListenableFuture<Void> processDeviceRpcCallResponseFromEdge(TenantId tenantId, DeviceRpcCallMsg deviceRpcCallMsg) {
log.trace("[{}] processDeviceRpcCallResponseMsg [{}]", tenantId, deviceRpcCallMsg);
SettableFuture<Void> futureToSet = SettableFuture.create();
UUID requestUuid = new UUID(deviceRpcCallMsg.getRequestUuidMSB(), deviceRpcCallMsg.getRequestUuidLSB());
@ -296,4 +302,54 @@ public class DeviceProcessor extends BaseProcessor {
return futureToSet;
}
public DownlinkMsg processDeviceToEdge(Edge edge, EdgeEvent edgeEvent,
UpdateMsgType msgType, EdgeEventActionType edgeEdgeEventActionType) {
DeviceId deviceId = new DeviceId(edgeEvent.getEntityId());
DownlinkMsg downlinkMsg = null;
switch (edgeEdgeEventActionType) {
case ADDED:
case UPDATED:
case ASSIGNED_TO_EDGE:
case ASSIGNED_TO_CUSTOMER:
case UNASSIGNED_FROM_CUSTOMER:
Device device = deviceService.findDeviceById(edgeEvent.getTenantId(), deviceId);
if (device != null) {
CustomerId customerId = getCustomerIdIfEdgeAssignedToCustomer(device, edge);
DeviceUpdateMsg deviceUpdateMsg =
deviceMsgConstructor.constructDeviceUpdatedMsg(msgType, device, customerId, null);
downlinkMsg = DownlinkMsg.newBuilder()
.addAllDeviceUpdateMsg(Collections.singletonList(deviceUpdateMsg))
.build();
}
break;
case DELETED:
case UNASSIGNED_FROM_EDGE:
DeviceUpdateMsg deviceUpdateMsg =
deviceMsgConstructor.constructDeviceDeleteMsg(deviceId);
downlinkMsg = DownlinkMsg.newBuilder()
.addAllDeviceUpdateMsg(Collections.singletonList(deviceUpdateMsg))
.build();
break;
case CREDENTIALS_UPDATED:
DeviceCredentials deviceCredentials = deviceCredentialsService.findDeviceCredentialsByDeviceId(edge.getTenantId(), deviceId);
if (deviceCredentials != null) {
DeviceCredentialsUpdateMsg deviceCredentialsUpdateMsg =
deviceMsgConstructor.constructDeviceCredentialsUpdatedMsg(deviceCredentials);
downlinkMsg = DownlinkMsg.newBuilder()
.addAllDeviceCredentialsUpdateMsg(Collections.singletonList(deviceCredentialsUpdateMsg))
.build();
}
break;
}
return downlinkMsg;
}
public DownlinkMsg processRpcCallMsgToEdge(EdgeEvent edgeEvent) {
log.trace("Executing processRpcCall, edgeEvent [{}]", edgeEvent);
DeviceRpcCallMsg deviceRpcCallMsg =
deviceMsgConstructor.constructDeviceRpcCallMsg(edgeEvent.getEntityId(), edgeEvent.getBody());
return DownlinkMsg.newBuilder()
.addAllDeviceRpcCallMsg(Collections.singletonList(deviceRpcCallMsg))
.build();
}
}

62
application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/DeviceProfileProcessor.java

@ -0,0 +1,62 @@
/**
* Copyright © 2016-2021 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.service.edge.rpc.processor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import org.thingsboard.server.common.data.DeviceProfile;
import org.thingsboard.server.common.data.edge.EdgeEvent;
import org.thingsboard.server.common.data.edge.EdgeEventActionType;
import org.thingsboard.server.common.data.id.DeviceProfileId;
import org.thingsboard.server.gen.edge.DeviceProfileUpdateMsg;
import org.thingsboard.server.gen.edge.DownlinkMsg;
import org.thingsboard.server.gen.edge.UpdateMsgType;
import org.thingsboard.server.queue.util.TbCoreComponent;
import java.util.Collections;
@Component
@Slf4j
@TbCoreComponent
public class DeviceProfileProcessor extends BaseProcessor {
public DownlinkMsg processDeviceProfileToEdge(EdgeEvent edgeEvent, UpdateMsgType msgType, EdgeEventActionType action) {
DeviceProfileId deviceProfileId = new DeviceProfileId(edgeEvent.getEntityId());
DownlinkMsg downlinkMsg = null;
switch (action) {
case ADDED:
case UPDATED:
DeviceProfile deviceProfile = deviceProfileService.findDeviceProfileById(edgeEvent.getTenantId(), deviceProfileId);
if (deviceProfile != null) {
DeviceProfileUpdateMsg deviceProfileUpdateMsg =
deviceProfileMsgConstructor.constructDeviceProfileUpdatedMsg(msgType, deviceProfile);
downlinkMsg = DownlinkMsg.newBuilder()
.addAllDeviceProfileUpdateMsg(Collections.singletonList(deviceProfileUpdateMsg))
.build();
}
break;
case DELETED:
DeviceProfileUpdateMsg deviceProfileUpdateMsg =
deviceProfileMsgConstructor.constructDeviceProfileDeleteMsg(deviceProfileId);
downlinkMsg = DownlinkMsg.newBuilder()
.addAllDeviceProfileUpdateMsg(Collections.singletonList(deviceProfileUpdateMsg))
.build();
break;
}
return downlinkMsg;
}
}

73
application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/EntityProcessor.java

@ -0,0 +1,73 @@
/**
* Copyright © 2016-2021 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.service.edge.rpc.processor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import org.thingsboard.server.common.data.Device;
import org.thingsboard.server.common.data.edge.Edge;
import org.thingsboard.server.common.data.edge.EdgeEvent;
import org.thingsboard.server.common.data.edge.EdgeEventType;
import org.thingsboard.server.common.data.id.CustomerId;
import org.thingsboard.server.common.data.id.DeviceId;
import org.thingsboard.server.gen.edge.DeviceCredentialsRequestMsg;
import org.thingsboard.server.gen.edge.DeviceUpdateMsg;
import org.thingsboard.server.gen.edge.DownlinkMsg;
import org.thingsboard.server.gen.edge.UpdateMsgType;
import org.thingsboard.server.queue.util.TbCoreComponent;
import java.util.Collections;
@Component
@Slf4j
@TbCoreComponent
public class EntityProcessor extends BaseProcessor {
public DownlinkMsg processEntityMergeRequestMessageToEdge(Edge edge, EdgeEvent edgeEvent) {
DownlinkMsg downlinkMsg = null;
if (EdgeEventType.DEVICE.equals(edgeEvent.getType())) {
DeviceId deviceId = new DeviceId(edgeEvent.getEntityId());
Device device = deviceService.findDeviceById(edge.getTenantId(), deviceId);
CustomerId customerId = getCustomerIdIfEdgeAssignedToCustomer(device, edge);
String conflictName = null;
if(edgeEvent.getBody() != null) {
conflictName = edgeEvent.getBody().get("conflictName").asText();
}
DeviceUpdateMsg d = deviceMsgConstructor
.constructDeviceUpdatedMsg(UpdateMsgType.ENTITY_MERGE_RPC_MESSAGE, device, customerId, conflictName);
downlinkMsg = DownlinkMsg.newBuilder()
.addAllDeviceUpdateMsg(Collections.singletonList(d))
.build();
}
return downlinkMsg;
}
public DownlinkMsg processCredentialsRequestMessageToEdge(EdgeEvent edgeEvent) {
DownlinkMsg downlinkMsg = null;
if (EdgeEventType.DEVICE.equals(edgeEvent.getType())) {
DeviceId deviceId = new DeviceId(edgeEvent.getEntityId());
DeviceCredentialsRequestMsg deviceCredentialsRequestMsg = DeviceCredentialsRequestMsg.newBuilder()
.setDeviceIdMSB(deviceId.getId().getMostSignificantBits())
.setDeviceIdLSB(deviceId.getId().getLeastSignificantBits())
.build();
DownlinkMsg.Builder builder = DownlinkMsg.newBuilder()
.addAllDeviceCredentialsRequestMsg(Collections.singletonList(deviceCredentialsRequestMsg));
downlinkMsg = builder.build();
}
return downlinkMsg;
}
}

68
application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/EntityViewProcessor.java

@ -0,0 +1,68 @@
/**
* Copyright © 2016-2021 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.service.edge.rpc.processor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import org.thingsboard.server.common.data.EntityView;
import org.thingsboard.server.common.data.edge.Edge;
import org.thingsboard.server.common.data.edge.EdgeEvent;
import org.thingsboard.server.common.data.edge.EdgeEventActionType;
import org.thingsboard.server.common.data.id.CustomerId;
import org.thingsboard.server.common.data.id.EntityViewId;
import org.thingsboard.server.gen.edge.DownlinkMsg;
import org.thingsboard.server.gen.edge.EntityViewUpdateMsg;
import org.thingsboard.server.gen.edge.UpdateMsgType;
import org.thingsboard.server.queue.util.TbCoreComponent;
import java.util.Collections;
@Component
@Slf4j
@TbCoreComponent
public class EntityViewProcessor extends BaseProcessor {
public DownlinkMsg processEntityViewToEdge(Edge edge, EdgeEvent edgeEvent, UpdateMsgType msgType, EdgeEventActionType action) {
EntityViewId entityViewId = new EntityViewId(edgeEvent.getEntityId());
DownlinkMsg downlinkMsg = null;
switch (action) {
case ADDED:
case UPDATED:
case ASSIGNED_TO_EDGE:
case ASSIGNED_TO_CUSTOMER:
case UNASSIGNED_FROM_CUSTOMER:
EntityView entityView = entityViewService.findEntityViewById(edgeEvent.getTenantId(), entityViewId);
if (entityView != null) {
CustomerId customerId = getCustomerIdIfEdgeAssignedToCustomer(entityView, edge);
EntityViewUpdateMsg entityViewUpdateMsg =
entityViewMsgConstructor.constructEntityViewUpdatedMsg(msgType, entityView, customerId);
downlinkMsg = DownlinkMsg.newBuilder()
.addAllEntityViewUpdateMsg(Collections.singletonList(entityViewUpdateMsg))
.build();
}
break;
case DELETED:
case UNASSIGNED_FROM_EDGE:
EntityViewUpdateMsg entityViewUpdateMsg =
entityViewMsgConstructor.constructEntityViewDeleteMsg(entityViewId);
downlinkMsg = DownlinkMsg.newBuilder()
.addAllEntityViewUpdateMsg(Collections.singletonList(entityViewUpdateMsg))
.build();
break;
}
return downlinkMsg;
}
}

13
application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/RelationProcessor.java

@ -20,6 +20,7 @@ import com.google.common.util.concurrent.ListenableFuture;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import org.thingsboard.server.common.data.EntityType;
import org.thingsboard.server.common.data.edge.EdgeEvent;
import org.thingsboard.server.common.data.exception.ThingsboardErrorCode;
import org.thingsboard.server.common.data.exception.ThingsboardException;
import org.thingsboard.server.common.data.id.AssetId;
@ -33,9 +34,12 @@ import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.id.UserId;
import org.thingsboard.server.common.data.relation.EntityRelation;
import org.thingsboard.server.common.data.relation.RelationTypeGroup;
import org.thingsboard.server.gen.edge.DownlinkMsg;
import org.thingsboard.server.gen.edge.RelationUpdateMsg;
import org.thingsboard.server.gen.edge.UpdateMsgType;
import org.thingsboard.server.queue.util.TbCoreComponent;
import java.util.Collections;
import java.util.UUID;
@Component
@ -43,7 +47,7 @@ import java.util.UUID;
@TbCoreComponent
public class RelationProcessor extends BaseProcessor {
public ListenableFuture<Void> onRelationUpdate(TenantId tenantId, RelationUpdateMsg relationUpdateMsg) {
public ListenableFuture<Void> processRelationFromEdge(TenantId tenantId, RelationUpdateMsg relationUpdateMsg) {
log.trace("[{}] onRelationUpdate [{}]", tenantId, relationUpdateMsg);
try {
EntityRelation entityRelation = new EntityRelation();
@ -100,5 +104,12 @@ public class RelationProcessor extends BaseProcessor {
}
}
public DownlinkMsg processRelationToEdge(EdgeEvent edgeEvent, UpdateMsgType msgType) {
EntityRelation entityRelation = mapper.convertValue(edgeEvent.getBody(), EntityRelation.class);
RelationUpdateMsg r = relationMsgConstructor.constructRelationUpdatedMsg(msgType, entityRelation);
return DownlinkMsg.newBuilder()
.addAllRelationUpdateMsg(Collections.singletonList(r))
.build();
}
}

81
application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/RuleChainProcessor.java

@ -0,0 +1,81 @@
/**
* Copyright © 2016-2021 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.service.edge.rpc.processor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import org.thingsboard.server.common.data.edge.Edge;
import org.thingsboard.server.common.data.edge.EdgeEvent;
import org.thingsboard.server.common.data.edge.EdgeEventActionType;
import org.thingsboard.server.common.data.id.RuleChainId;
import org.thingsboard.server.common.data.rule.RuleChain;
import org.thingsboard.server.common.data.rule.RuleChainMetaData;
import org.thingsboard.server.gen.edge.DownlinkMsg;
import org.thingsboard.server.gen.edge.RuleChainMetadataUpdateMsg;
import org.thingsboard.server.gen.edge.RuleChainUpdateMsg;
import org.thingsboard.server.gen.edge.UpdateMsgType;
import org.thingsboard.server.queue.util.TbCoreComponent;
import java.util.Collections;
@Component
@Slf4j
@TbCoreComponent
public class RuleChainProcessor extends BaseProcessor {
public DownlinkMsg processRuleChainToEdge(Edge edge, EdgeEvent edgeEvent, UpdateMsgType msgType, EdgeEventActionType action) {
RuleChainId ruleChainId = new RuleChainId(edgeEvent.getEntityId());
DownlinkMsg downlinkMsg = null;
switch (action) {
case ADDED:
case UPDATED:
case ASSIGNED_TO_EDGE:
RuleChain ruleChain = ruleChainService.findRuleChainById(edgeEvent.getTenantId(), ruleChainId);
if (ruleChain != null) {
RuleChainUpdateMsg ruleChainUpdateMsg =
ruleChainMsgConstructor.constructRuleChainUpdatedMsg(edge.getRootRuleChainId(), msgType, ruleChain);
downlinkMsg = DownlinkMsg.newBuilder()
.addAllRuleChainUpdateMsg(Collections.singletonList(ruleChainUpdateMsg))
.build();
}
break;
case DELETED:
case UNASSIGNED_FROM_EDGE:
downlinkMsg = DownlinkMsg.newBuilder()
.addAllRuleChainUpdateMsg(Collections.singletonList(ruleChainMsgConstructor.constructRuleChainDeleteMsg(ruleChainId)))
.build();
break;
}
return downlinkMsg;
}
public DownlinkMsg processRuleChainMetadataToEdge(EdgeEvent edgeEvent, UpdateMsgType msgType) {
RuleChainId ruleChainId = new RuleChainId(edgeEvent.getEntityId());
RuleChain ruleChain = ruleChainService.findRuleChainById(edgeEvent.getTenantId(), ruleChainId);
DownlinkMsg downlinkMsg = null;
if (ruleChain != null) {
RuleChainMetaData ruleChainMetaData = ruleChainService.loadRuleChainMetaData(edgeEvent.getTenantId(), ruleChainId);
RuleChainMetadataUpdateMsg ruleChainMetadataUpdateMsg =
ruleChainMsgConstructor.constructRuleChainMetadataUpdatedMsg(msgType, ruleChainMetaData);
if (ruleChainMetadataUpdateMsg != null) {
downlinkMsg = DownlinkMsg.newBuilder()
.addAllRuleChainMetadataUpdateMsg(Collections.singletonList(ruleChainMetadataUpdateMsg))
.build();
}
}
return downlinkMsg;
}
}

49
application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/TelemetryProcessor.java

@ -15,11 +15,13 @@
*/
package org.thingsboard.server.service.edge.rpc.processor;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import com.google.gson.Gson;
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.tuple.ImmutablePair;
@ -32,10 +34,13 @@ import org.thingsboard.server.common.data.DeviceProfile;
import org.thingsboard.server.common.data.EntityType;
import org.thingsboard.server.common.data.EntityView;
import org.thingsboard.server.common.data.asset.Asset;
import org.thingsboard.server.common.data.edge.EdgeEvent;
import org.thingsboard.server.common.data.edge.EdgeEventActionType;
import org.thingsboard.server.common.data.id.AssetId;
import org.thingsboard.server.common.data.id.CustomerId;
import org.thingsboard.server.common.data.id.DashboardId;
import org.thingsboard.server.common.data.id.DeviceId;
import org.thingsboard.server.common.data.id.EdgeId;
import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.id.EntityViewId;
import org.thingsboard.server.common.data.id.RuleChainId;
@ -50,6 +55,7 @@ import org.thingsboard.server.common.msg.session.SessionMsgType;
import org.thingsboard.server.common.transport.adaptor.JsonConverter;
import org.thingsboard.server.common.transport.util.JsonUtils;
import org.thingsboard.server.gen.edge.AttributeDeleteMsg;
import org.thingsboard.server.gen.edge.DownlinkMsg;
import org.thingsboard.server.gen.edge.EntityDataProto;
import org.thingsboard.server.gen.transport.TransportProtos;
import org.thingsboard.server.queue.TbQueueCallback;
@ -58,6 +64,7 @@ import org.thingsboard.server.queue.util.TbCoreComponent;
import javax.annotation.Nullable;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
@ -70,7 +77,7 @@ public class TelemetryProcessor extends BaseProcessor {
private final Gson gson = new Gson();
public List<ListenableFuture<Void>> onTelemetryUpdate(TenantId tenantId, CustomerId customerId, EntityDataProto entityData) {
public List<ListenableFuture<Void>> processTelemetryFromEdge(TenantId tenantId, CustomerId customerId, EntityDataProto entityData) {
log.trace("[{}] onTelemetryUpdate [{}]", tenantId, entityData);
List<ListenableFuture<Void>> result = new ArrayList<>();
EntityId entityId = constructEntityId(entityData);
@ -279,4 +286,44 @@ public class TelemetryProcessor extends BaseProcessor {
return null;
}
}
public DownlinkMsg processTelemetryMessageToEdge(EdgeEvent edgeEvent) throws JsonProcessingException {
EntityId entityId = null;
switch (edgeEvent.getType()) {
case DEVICE:
entityId = new DeviceId(edgeEvent.getEntityId());
break;
case ASSET:
entityId = new AssetId(edgeEvent.getEntityId());
break;
case ENTITY_VIEW:
entityId = new EntityViewId(edgeEvent.getEntityId());
break;
case DASHBOARD:
entityId = new DashboardId(edgeEvent.getEntityId());
break;
case TENANT:
entityId = new TenantId(edgeEvent.getEntityId());
break;
case CUSTOMER:
entityId = new CustomerId(edgeEvent.getEntityId());
break;
case EDGE:
entityId = new EdgeId(edgeEvent.getEntityId());
break;
}
DownlinkMsg downlinkMsg = null;
if (entityId != null) {
downlinkMsg = constructEntityDataProtoMsg(entityId, edgeEvent.getAction(), JsonUtils.parse(mapper.writeValueAsString(edgeEvent.getBody())));
}
return downlinkMsg;
}
private DownlinkMsg constructEntityDataProtoMsg(EntityId entityId, EdgeEventActionType actionType, JsonElement entityData) {
EntityDataProto entityDataProto = entityDataMsgConstructor.constructEntityDataMsg(entityId, actionType, entityData);
return DownlinkMsg.newBuilder()
.addAllEntityData(Collections.singletonList(entityDataProto))
.build();
}
}

71
application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/UserProcessor.java

@ -0,0 +1,71 @@
/**
* Copyright © 2016-2021 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.service.edge.rpc.processor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import org.thingsboard.server.common.data.User;
import org.thingsboard.server.common.data.edge.Edge;
import org.thingsboard.server.common.data.edge.EdgeEvent;
import org.thingsboard.server.common.data.edge.EdgeEventActionType;
import org.thingsboard.server.common.data.id.CustomerId;
import org.thingsboard.server.common.data.id.UserId;
import org.thingsboard.server.common.data.security.UserCredentials;
import org.thingsboard.server.gen.edge.DownlinkMsg;
import org.thingsboard.server.gen.edge.UpdateMsgType;
import org.thingsboard.server.gen.edge.UserCredentialsUpdateMsg;
import org.thingsboard.server.queue.util.TbCoreComponent;
import java.util.Collections;
@Component
@Slf4j
@TbCoreComponent
public class UserProcessor extends BaseProcessor {
public DownlinkMsg processUserToEdge(Edge edge, EdgeEvent edgeEvent, UpdateMsgType msgType, EdgeEventActionType edgeEdgeEventActionType) {
UserId userId = new UserId(edgeEvent.getEntityId());
DownlinkMsg downlinkMsg = null;
switch (edgeEdgeEventActionType) {
case ADDED:
case UPDATED:
User user = userService.findUserById(edgeEvent.getTenantId(), userId);
if (user != null) {
CustomerId customerId = getCustomerIdIfEdgeAssignedToCustomer(user, edge);
downlinkMsg = DownlinkMsg.newBuilder()
.addAllUserUpdateMsg(Collections.singletonList(userMsgConstructor.constructUserUpdatedMsg(msgType, user, customerId)))
.build();
}
break;
case DELETED:
downlinkMsg = DownlinkMsg.newBuilder()
.addAllUserUpdateMsg(Collections.singletonList(userMsgConstructor.constructUserDeleteMsg(userId)))
.build();
break;
case CREDENTIALS_UPDATED:
UserCredentials userCredentialsByUserId = userService.findUserCredentialsByUserId(edge.getTenantId(), userId);
if (userCredentialsByUserId != null && userCredentialsByUserId.isEnabled()) {
UserCredentialsUpdateMsg userCredentialsUpdateMsg =
userMsgConstructor.constructUserCredentialsUpdatedMsg(userCredentialsByUserId);
downlinkMsg = DownlinkMsg.newBuilder()
.addAllUserCredentialsUpdateMsg(Collections.singletonList(userCredentialsUpdateMsg))
.build();
}
}
return downlinkMsg;
}
}

62
application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/WidgetBundleProcessor.java

@ -0,0 +1,62 @@
/**
* Copyright © 2016-2021 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.service.edge.rpc.processor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import org.thingsboard.server.common.data.edge.Edge;
import org.thingsboard.server.common.data.edge.EdgeEvent;
import org.thingsboard.server.common.data.edge.EdgeEventActionType;
import org.thingsboard.server.common.data.id.WidgetsBundleId;
import org.thingsboard.server.common.data.widget.WidgetsBundle;
import org.thingsboard.server.gen.edge.DownlinkMsg;
import org.thingsboard.server.gen.edge.UpdateMsgType;
import org.thingsboard.server.gen.edge.WidgetsBundleUpdateMsg;
import org.thingsboard.server.queue.util.TbCoreComponent;
import java.util.Collections;
@Component
@Slf4j
@TbCoreComponent
public class WidgetBundleProcessor extends BaseProcessor {
public DownlinkMsg processWidgetsBundleToEdge(EdgeEvent edgeEvent, UpdateMsgType msgType, EdgeEventActionType edgeEdgeEventActionType) {
WidgetsBundleId widgetsBundleId = new WidgetsBundleId(edgeEvent.getEntityId());
DownlinkMsg downlinkMsg = null;
switch (edgeEdgeEventActionType) {
case ADDED:
case UPDATED:
WidgetsBundle widgetsBundle = widgetsBundleService.findWidgetsBundleById(edgeEvent.getTenantId(), widgetsBundleId);
if (widgetsBundle != null) {
WidgetsBundleUpdateMsg widgetsBundleUpdateMsg =
widgetsBundleMsgConstructor.constructWidgetsBundleUpdateMsg(msgType, widgetsBundle);
downlinkMsg = DownlinkMsg.newBuilder()
.addAllWidgetsBundleUpdateMsg(Collections.singletonList(widgetsBundleUpdateMsg))
.build();
}
break;
case DELETED:
WidgetsBundleUpdateMsg widgetsBundleUpdateMsg =
widgetsBundleMsgConstructor.constructWidgetsBundleDeleteMsg(widgetsBundleId);
downlinkMsg = DownlinkMsg.newBuilder()
.addAllWidgetsBundleUpdateMsg(Collections.singletonList(widgetsBundleUpdateMsg))
.build();
break;
}
return downlinkMsg;
}
}

62
application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/WidgetTypeProcessor.java

@ -0,0 +1,62 @@
/**
* Copyright © 2016-2021 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.service.edge.rpc.processor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import org.thingsboard.server.common.data.edge.EdgeEvent;
import org.thingsboard.server.common.data.edge.EdgeEventActionType;
import org.thingsboard.server.common.data.id.WidgetTypeId;
import org.thingsboard.server.common.data.widget.WidgetType;
import org.thingsboard.server.gen.edge.DownlinkMsg;
import org.thingsboard.server.gen.edge.UpdateMsgType;
import org.thingsboard.server.gen.edge.WidgetTypeUpdateMsg;
import org.thingsboard.server.queue.util.TbCoreComponent;
import java.util.Collections;
@Component
@Slf4j
@TbCoreComponent
public class WidgetTypeProcessor extends BaseProcessor {
public DownlinkMsg processWidgetTypeToEdge(EdgeEvent edgeEvent, UpdateMsgType msgType, EdgeEventActionType edgeEdgeEventActionType) {
WidgetTypeId widgetTypeId = new WidgetTypeId(edgeEvent.getEntityId());
DownlinkMsg downlinkMsg = null;
switch (edgeEdgeEventActionType) {
case ADDED:
case UPDATED:
WidgetType widgetType = widgetTypeService.findWidgetTypeById(edgeEvent.getTenantId(), widgetTypeId);
if (widgetType != null) {
WidgetTypeUpdateMsg widgetTypeUpdateMsg =
widgetTypeMsgConstructor.constructWidgetTypeUpdateMsg(msgType, widgetType);
downlinkMsg = DownlinkMsg.newBuilder()
.addAllWidgetTypeUpdateMsg(Collections.singletonList(widgetTypeUpdateMsg))
.build();
}
break;
case DELETED:
WidgetTypeUpdateMsg widgetTypeUpdateMsg =
widgetTypeMsgConstructor.constructWidgetTypeDeleteMsg(widgetTypeId);
downlinkMsg = DownlinkMsg.newBuilder()
.addAllWidgetTypeUpdateMsg(Collections.singletonList(widgetTypeUpdateMsg))
.build();
break;
}
return downlinkMsg;
}
}
Loading…
Cancel
Save