diff --git a/application/src/main/java/org/thingsboard/server/service/edge/EdgeContextComponent.java b/application/src/main/java/org/thingsboard/server/service/edge/EdgeContextComponent.java index 62940f2931..c1fbcbc69b 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/EdgeContextComponent.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/EdgeContextComponent.java @@ -54,6 +54,17 @@ import org.thingsboard.server.service.edge.rpc.constructor.RuleChainMsgConstruct import org.thingsboard.server.service.edge.rpc.constructor.UserMsgConstructor; import org.thingsboard.server.service.edge.rpc.constructor.WidgetTypeMsgConstructor; import org.thingsboard.server.service.edge.rpc.constructor.WidgetsBundleMsgConstructor; +import org.thingsboard.server.service.edge.rpc.processor.AdminSettingsProcessor; +import org.thingsboard.server.service.edge.rpc.processor.AssetProcessor; +import org.thingsboard.server.service.edge.rpc.processor.CustomerProcessor; +import org.thingsboard.server.service.edge.rpc.processor.DashboardProcessor; +import org.thingsboard.server.service.edge.rpc.processor.DeviceProfileProcessor; +import org.thingsboard.server.service.edge.rpc.processor.EntityProcessor; +import org.thingsboard.server.service.edge.rpc.processor.EntityViewProcessor; +import org.thingsboard.server.service.edge.rpc.processor.RuleChainProcessor; +import org.thingsboard.server.service.edge.rpc.processor.UserProcessor; +import org.thingsboard.server.service.edge.rpc.processor.WidgetBundleProcessor; +import org.thingsboard.server.service.edge.rpc.processor.WidgetTypeProcessor; import org.thingsboard.server.service.edge.rpc.sync.EdgeRequestsService; import org.thingsboard.server.service.edge.rpc.processor.AlarmProcessor; import org.thingsboard.server.service.edge.rpc.processor.DeviceProcessor; @@ -72,9 +83,6 @@ public class EdgeContextComponent { @Autowired private EdgeService edgeService; - @Autowired - private PartitionService partitionService; - @Lazy @Autowired private EdgeEventService edgeEventService; @@ -83,38 +91,14 @@ public class EdgeContextComponent { @Autowired private AssetService assetService; - @Lazy - @Autowired - private DeviceService deviceService; - @Lazy @Autowired private DeviceProfileService deviceProfileService; - @Lazy - @Autowired - private DeviceCredentialsService deviceCredentialsService; - - @Lazy - @Autowired - private EntityViewService entityViewService; - @Lazy @Autowired private AttributesService attributesService; - @Lazy - @Autowired - private CustomerService customerService; - - @Lazy - @Autowired - private RelationService relationService; - - @Lazy - @Autowired - private AlarmService alarmService; - @Lazy @Autowired private DashboardService dashboardService; @@ -127,101 +111,73 @@ public class EdgeContextComponent { @Autowired private UserService userService; - @Lazy - @Autowired - private ActorService actorService; - @Lazy @Autowired private WidgetsBundleService widgetsBundleService; - @Lazy - @Autowired - private WidgetTypeService widgetTypeService; - - @Lazy - @Autowired - private DeviceStateService deviceStateService; - - @Lazy - @Autowired - private TbClusterService tbClusterService; - @Lazy @Autowired private EdgeRequestsService edgeRequestsService; @Lazy @Autowired - private RuleChainMsgConstructor ruleChainMsgConstructor; - - @Lazy - @Autowired - private AlarmMsgConstructor alarmMsgConstructor; - - @Lazy - @Autowired - private DeviceMsgConstructor deviceMsgConstructor; - - @Lazy - @Autowired - private DeviceProfileMsgConstructor deviceProfileMsgConstructor; + private AlarmProcessor alarmProcessor; @Lazy @Autowired - private AssetMsgConstructor assetMsgConstructor; + private DeviceProfileProcessor deviceProfileProcessor; @Lazy @Autowired - private EntityViewMsgConstructor entityViewMsgConstructor; + private DeviceProcessor deviceProcessor; @Lazy @Autowired - private DashboardMsgConstructor dashboardMsgConstructor; + private EntityProcessor entityProcessor; @Lazy @Autowired - private CustomerMsgConstructor customerMsgConstructor; + private AssetProcessor assetProcessor; @Lazy @Autowired - private UserMsgConstructor userMsgConstructor; + private EntityViewProcessor entityViewProcessor; @Lazy @Autowired - private RelationMsgConstructor relationMsgConstructor; + private UserProcessor userProcessor; @Lazy @Autowired - private WidgetsBundleMsgConstructor widgetsBundleMsgConstructor; + private RelationProcessor relationProcessor; @Lazy @Autowired - private WidgetTypeMsgConstructor widgetTypeMsgConstructor; + private TelemetryProcessor telemetryProcessor; @Lazy @Autowired - private AdminSettingsMsgConstructor adminSettingsMsgConstructor; + private DashboardProcessor dashboardProcessor; @Lazy @Autowired - private EntityDataMsgConstructor entityDataMsgConstructor; + private RuleChainProcessor ruleChainProcessor; @Lazy @Autowired - private AlarmProcessor alarmProcessor; + private CustomerProcessor customerProcessor; @Lazy @Autowired - private DeviceProcessor deviceProcessor; + private WidgetBundleProcessor widgetBundleProcessor; @Lazy @Autowired - private RelationProcessor relationProcessor; + private WidgetTypeProcessor widgetTypeProcessor; @Lazy @Autowired - private TelemetryProcessor telemetryProcessor; + private AdminSettingsProcessor adminSettingsProcessor; @Lazy @Autowired diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcSession.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcSession.java index 01fa32795f..e95c5952f4 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcSession.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcSession.java @@ -21,67 +21,32 @@ import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.MoreExecutors; -import com.google.gson.JsonElement; import io.grpc.stub.StreamObserver; import lombok.Data; import lombok.extern.slf4j.Slf4j; import org.checkerframework.checker.nullness.qual.Nullable; import org.thingsboard.common.util.JacksonUtil; -import org.thingsboard.server.common.data.AdminSettings; -import org.thingsboard.server.common.data.Customer; -import org.thingsboard.server.common.data.Dashboard; import org.thingsboard.server.common.data.DataConstants; -import org.thingsboard.server.common.data.Device; -import org.thingsboard.server.common.data.DeviceProfile; -import org.thingsboard.server.common.data.EntityView; -import org.thingsboard.server.common.data.HasCustomerId; -import org.thingsboard.server.common.data.User; -import org.thingsboard.server.common.data.alarm.Alarm; -import org.thingsboard.server.common.data.asset.Asset; import org.thingsboard.server.common.data.edge.Edge; import org.thingsboard.server.common.data.edge.EdgeEvent; import org.thingsboard.server.common.data.edge.EdgeEventActionType; import org.thingsboard.server.common.data.edge.EdgeEventType; -import org.thingsboard.server.common.data.id.AlarmId; -import org.thingsboard.server.common.data.id.AssetId; -import org.thingsboard.server.common.data.id.CustomerId; -import org.thingsboard.server.common.data.id.DashboardId; -import org.thingsboard.server.common.data.id.DeviceId; -import org.thingsboard.server.common.data.id.DeviceProfileId; import org.thingsboard.server.common.data.id.EdgeId; import org.thingsboard.server.common.data.id.EntityId; -import org.thingsboard.server.common.data.id.EntityViewId; -import org.thingsboard.server.common.data.id.RuleChainId; import org.thingsboard.server.common.data.id.TenantId; -import org.thingsboard.server.common.data.id.UserId; -import org.thingsboard.server.common.data.id.WidgetTypeId; -import org.thingsboard.server.common.data.id.WidgetsBundleId; import org.thingsboard.server.common.data.kv.AttributeKvEntry; import org.thingsboard.server.common.data.kv.BaseAttributeKvEntry; import org.thingsboard.server.common.data.kv.LongDataEntry; import org.thingsboard.server.common.data.page.PageData; import org.thingsboard.server.common.data.page.PageLink; -import org.thingsboard.server.common.data.relation.EntityRelation; -import org.thingsboard.server.common.data.rule.RuleChain; -import org.thingsboard.server.common.data.rule.RuleChainMetaData; -import org.thingsboard.server.common.data.security.DeviceCredentials; -import org.thingsboard.server.common.data.security.UserCredentials; -import org.thingsboard.server.common.data.widget.WidgetType; -import org.thingsboard.server.common.data.widget.WidgetsBundle; -import org.thingsboard.server.common.transport.util.JsonUtils; -import org.thingsboard.server.gen.edge.AdminSettingsUpdateMsg; import org.thingsboard.server.gen.edge.AlarmUpdateMsg; -import org.thingsboard.server.gen.edge.AssetUpdateMsg; import org.thingsboard.server.gen.edge.AttributesRequestMsg; import org.thingsboard.server.gen.edge.ConnectRequestMsg; import org.thingsboard.server.gen.edge.ConnectResponseCode; import org.thingsboard.server.gen.edge.ConnectResponseMsg; -import org.thingsboard.server.gen.edge.CustomerUpdateMsg; -import org.thingsboard.server.gen.edge.DashboardUpdateMsg; import org.thingsboard.server.gen.edge.DeviceCredentialsRequestMsg; import org.thingsboard.server.gen.edge.DeviceCredentialsUpdateMsg; import org.thingsboard.server.gen.edge.DeviceProfileDevicesRequestMsg; -import org.thingsboard.server.gen.edge.DeviceProfileUpdateMsg; import org.thingsboard.server.gen.edge.DeviceRpcCallMsg; import org.thingsboard.server.gen.edge.DeviceUpdateMsg; import org.thingsboard.server.gen.edge.DownlinkMsg; @@ -89,7 +54,6 @@ import org.thingsboard.server.gen.edge.DownlinkResponseMsg; import org.thingsboard.server.gen.edge.EdgeConfiguration; import org.thingsboard.server.gen.edge.EdgeUpdateMsg; import org.thingsboard.server.gen.edge.EntityDataProto; -import org.thingsboard.server.gen.edge.EntityViewUpdateMsg; import org.thingsboard.server.gen.edge.EntityViewsRequestMsg; import org.thingsboard.server.gen.edge.RelationRequestMsg; import org.thingsboard.server.gen.edge.RelationUpdateMsg; @@ -97,17 +61,12 @@ import org.thingsboard.server.gen.edge.RequestMsg; import org.thingsboard.server.gen.edge.RequestMsgType; import org.thingsboard.server.gen.edge.ResponseMsg; import org.thingsboard.server.gen.edge.RuleChainMetadataRequestMsg; -import org.thingsboard.server.gen.edge.RuleChainMetadataUpdateMsg; -import org.thingsboard.server.gen.edge.RuleChainUpdateMsg; import org.thingsboard.server.gen.edge.SyncCompletedMsg; import org.thingsboard.server.gen.edge.UpdateMsgType; import org.thingsboard.server.gen.edge.UplinkMsg; import org.thingsboard.server.gen.edge.UplinkResponseMsg; import org.thingsboard.server.gen.edge.UserCredentialsRequestMsg; -import org.thingsboard.server.gen.edge.UserCredentialsUpdateMsg; import org.thingsboard.server.gen.edge.WidgetBundleTypesRequestMsg; -import org.thingsboard.server.gen.edge.WidgetTypeUpdateMsg; -import org.thingsboard.server.gen.edge.WidgetsBundleUpdateMsg; import org.thingsboard.server.service.edge.EdgeContextComponent; import org.thingsboard.server.service.edge.rpc.fetch.AssetsEdgeEventFetcher; import org.thingsboard.server.service.edge.rpc.fetch.CustomerUsersEdgeEventFetcher; @@ -390,7 +349,7 @@ public final class EdgeGrpcSession implements Closeable { } private DownlinkMsg convertToDownlinkMsg(EdgeEvent edgeEvent) { - log.trace("[{}] converting edge event to downlink msg [{}]", this.sessionId, edgeEvent); + log.trace("[{}][{}] converting edge event to downlink msg [{}]", edge.getTenantId(), this.sessionId, edgeEvent); DownlinkMsg downlinkMsg = null; try { switch (edgeEvent.getAction()) { @@ -407,25 +366,26 @@ public final class EdgeGrpcSession implements Closeable { case ASSIGNED_TO_CUSTOMER: case UNASSIGNED_FROM_CUSTOMER: downlinkMsg = processEntityMessage(edgeEvent, edgeEvent.getAction()); + log.trace("[{}][{}] entity message processed [{}]", edgeEvent.getTenantId(), this.sessionId, downlinkMsg); break; case ATTRIBUTES_UPDATED: case POST_ATTRIBUTES: case ATTRIBUTES_DELETED: case TIMESERIES_UPDATED: - downlinkMsg = processTelemetryMessage(edgeEvent); + downlinkMsg = ctx.getTelemetryProcessor().processTelemetryMessageToEdge(edgeEvent); break; case CREDENTIALS_REQUEST: - downlinkMsg = processCredentialsRequestMessage(edgeEvent); + downlinkMsg = ctx.getEntityProcessor().processCredentialsRequestMessageToEdge(edgeEvent); break; case ENTITY_MERGE_REQUEST: - downlinkMsg = processEntityMergeRequestMessage(edgeEvent); + downlinkMsg = ctx.getEntityProcessor().processEntityMergeRequestMessageToEdge(edge, edgeEvent); break; case RPC_CALL: - downlinkMsg = processRpcCallMsg(edgeEvent); + downlinkMsg = ctx.getDeviceProcessor().processRpcCallMsgToEdge(edgeEvent); break; } } catch (Exception e) { - log.error("Exception during converting edge event to downlink msg", e); + log.error("[{}][{}] Exception during converting edge event to downlink msg", edge.getTenantId(), this.sessionId, e); } return downlinkMsg; } @@ -437,49 +397,6 @@ public final class EdgeGrpcSession implements Closeable { .collect(Collectors.toList()); } - private DownlinkMsg processEntityMergeRequestMessage(EdgeEvent edgeEvent) { - DownlinkMsg downlinkMsg = null; - if (EdgeEventType.DEVICE.equals(edgeEvent.getType())) { - DeviceId deviceId = new DeviceId(edgeEvent.getEntityId()); - Device device = ctx.getDeviceService().findDeviceById(edge.getTenantId(), deviceId); - CustomerId customerId = getCustomerIdIfEdgeAssignedToCustomer(device); - String conflictName = null; - if(edgeEvent.getBody() != null) { - conflictName = edgeEvent.getBody().get("conflictName").asText(); - } - DeviceUpdateMsg d = ctx.getDeviceMsgConstructor() - .constructDeviceUpdatedMsg(UpdateMsgType.ENTITY_MERGE_RPC_MESSAGE, device, customerId, conflictName); - downlinkMsg = DownlinkMsg.newBuilder() - .addAllDeviceUpdateMsg(Collections.singletonList(d)) - .build(); - } - return downlinkMsg; - } - - private DownlinkMsg processRpcCallMsg(EdgeEvent edgeEvent) { - log.trace("Executing processRpcCall, edgeEvent [{}]", edgeEvent); - DeviceRpcCallMsg deviceRpcCallMsg = - ctx.getDeviceMsgConstructor().constructDeviceRpcCallMsg(edgeEvent.getEntityId(), edgeEvent.getBody()); - return DownlinkMsg.newBuilder() - .addAllDeviceRpcCallMsg(Collections.singletonList(deviceRpcCallMsg)) - .build(); - } - - private DownlinkMsg processCredentialsRequestMessage(EdgeEvent edgeEvent) { - DownlinkMsg downlinkMsg = null; - if (EdgeEventType.DEVICE.equals(edgeEvent.getType())) { - DeviceId deviceId = new DeviceId(edgeEvent.getEntityId()); - DeviceCredentialsRequestMsg deviceCredentialsRequestMsg = DeviceCredentialsRequestMsg.newBuilder() - .setDeviceIdMSB(deviceId.getId().getMostSignificantBits()) - .setDeviceIdLSB(deviceId.getId().getLeastSignificantBits()) - .build(); - DownlinkMsg.Builder builder = DownlinkMsg.newBuilder() - .addAllDeviceCredentialsRequestMsg(Collections.singletonList(deviceCredentialsRequestMsg)); - downlinkMsg = builder.build(); - } - return downlinkMsg; - } - private ListenableFuture getQueueStartTs() { ListenableFuture> future = ctx.getAttributesService().find(edge.getTenantId(), edge.getId(), DataConstants.SERVER_SCOPE, QUEUE_START_TS_ATTR_KEY); @@ -500,454 +417,44 @@ public final class EdgeGrpcSession implements Closeable { ctx.getAttributesService().save(edge.getTenantId(), edge.getId(), DataConstants.SERVER_SCOPE, attributes); } - private DownlinkMsg processTelemetryMessage(EdgeEvent edgeEvent) { - log.trace("[{}] Executing processTelemetryMessage, edgeEvent [{}]", this.sessionId, edgeEvent); - EntityId entityId = null; - switch (edgeEvent.getType()) { - case DEVICE: - entityId = new DeviceId(edgeEvent.getEntityId()); - break; - case ASSET: - entityId = new AssetId(edgeEvent.getEntityId()); - break; - case ENTITY_VIEW: - entityId = new EntityViewId(edgeEvent.getEntityId()); - break; - case DASHBOARD: - entityId = new DashboardId(edgeEvent.getEntityId()); - break; - case TENANT: - entityId = new TenantId(edgeEvent.getEntityId()); - break; - case CUSTOMER: - entityId = new CustomerId(edgeEvent.getEntityId()); - break; - case EDGE: - entityId = new EdgeId(edgeEvent.getEntityId()); - break; - } - DownlinkMsg downlinkMsg = null; - if (entityId != null) { - log.debug("[{}] Sending telemetry data msg, entityId [{}], body [{}]", this.sessionId, edgeEvent.getEntityId(), edgeEvent.getBody()); - try { - downlinkMsg = constructEntityDataProtoMsg(entityId, edgeEvent.getAction(), JsonUtils.parse(mapper.writeValueAsString(edgeEvent.getBody()))); - } catch (Exception e) { - log.warn("[{}] Can't send telemetry data msg, entityId [{}], body [{}]", this.sessionId, edgeEvent.getEntityId(), edgeEvent.getBody(), e); - } - } - return downlinkMsg; - } - private DownlinkMsg processEntityMessage(EdgeEvent edgeEvent, EdgeEventActionType action) { UpdateMsgType msgType = getResponseMsgType(edgeEvent.getAction()); log.trace("Executing processEntityMessage, edgeEvent [{}], action [{}], msgType [{}]", edgeEvent, action, msgType); switch (edgeEvent.getType()) { case DEVICE: - return processDevice(edgeEvent, msgType, action); + return ctx.getDeviceProcessor().processDeviceToEdge(edge, edgeEvent, msgType, action); case DEVICE_PROFILE: - return processDeviceProfile(edgeEvent, msgType, action); + return ctx.getDeviceProfileProcessor().processDeviceProfileToEdge(edgeEvent, msgType, action); case ASSET: - return processAsset(edgeEvent, msgType, action); + return ctx.getAssetProcessor().processAssetToEdge(edge, edgeEvent, msgType, action); case ENTITY_VIEW: - return processEntityView(edgeEvent, msgType, action); + return ctx.getEntityViewProcessor().processEntityViewToEdge(edge, edgeEvent, msgType, action); case DASHBOARD: - return processDashboard(edgeEvent, msgType, action); + return ctx.getDashboardProcessor().processDashboardToEdge(edge, edgeEvent, msgType, action); case CUSTOMER: - return processCustomer(edgeEvent, msgType, action); + return ctx.getCustomerProcessor().processCustomerToEdge(edgeEvent, msgType, action); case RULE_CHAIN: - return processRuleChain(edgeEvent, msgType, action); + return ctx.getRuleChainProcessor().processRuleChainToEdge(edge, edgeEvent, msgType, action); case RULE_CHAIN_METADATA: - return processRuleChainMetadata(edgeEvent, msgType); + return ctx.getRuleChainProcessor().processRuleChainMetadataToEdge(edgeEvent, msgType); case ALARM: - return processAlarm(edgeEvent, msgType); + return ctx.getAlarmProcessor().processAlarmToEdge(edge, edgeEvent, msgType); case USER: - return processUser(edgeEvent, msgType, action); + return ctx.getUserProcessor().processUserToEdge(edge, edgeEvent, msgType, action); case RELATION: - return processRelation(edgeEvent, msgType); + return ctx.getRelationProcessor().processRelationToEdge(edgeEvent, msgType); case WIDGETS_BUNDLE: - return processWidgetsBundle(edgeEvent, msgType, action); + return ctx.getWidgetBundleProcessor().processWidgetsBundleToEdge(edgeEvent, msgType, action); case WIDGET_TYPE: - return processWidgetType(edgeEvent, msgType, action); + return ctx.getWidgetTypeProcessor().processWidgetTypeToEdge(edgeEvent, msgType, action); case ADMIN_SETTINGS: - return processAdminSettings(edgeEvent); + return ctx.getAdminSettingsProcessor().processAdminSettingsToEdge(edgeEvent); default: log.warn("Unsupported edge event type [{}]", edgeEvent); return null; } } - private DownlinkMsg processDevice(EdgeEvent edgeEvent, UpdateMsgType msgType, EdgeEventActionType edgeEdgeEventActionType) { - DeviceId deviceId = new DeviceId(edgeEvent.getEntityId()); - DownlinkMsg downlinkMsg = null; - switch (edgeEdgeEventActionType) { - case ADDED: - case UPDATED: - case ASSIGNED_TO_EDGE: - case ASSIGNED_TO_CUSTOMER: - case UNASSIGNED_FROM_CUSTOMER: - Device device = ctx.getDeviceService().findDeviceById(edgeEvent.getTenantId(), deviceId); - if (device != null) { - CustomerId customerId = getCustomerIdIfEdgeAssignedToCustomer(device); - DeviceUpdateMsg deviceUpdateMsg = - ctx.getDeviceMsgConstructor().constructDeviceUpdatedMsg(msgType, device, customerId, null); - downlinkMsg = DownlinkMsg.newBuilder() - .addAllDeviceUpdateMsg(Collections.singletonList(deviceUpdateMsg)) - .build(); - } - break; - case DELETED: - case UNASSIGNED_FROM_EDGE: - DeviceUpdateMsg deviceUpdateMsg = - ctx.getDeviceMsgConstructor().constructDeviceDeleteMsg(deviceId); - downlinkMsg = DownlinkMsg.newBuilder() - .addAllDeviceUpdateMsg(Collections.singletonList(deviceUpdateMsg)) - .build(); - break; - case CREDENTIALS_UPDATED: - DeviceCredentials deviceCredentials = ctx.getDeviceCredentialsService().findDeviceCredentialsByDeviceId(edge.getTenantId(), deviceId); - if (deviceCredentials != null) { - DeviceCredentialsUpdateMsg deviceCredentialsUpdateMsg = - ctx.getDeviceMsgConstructor().constructDeviceCredentialsUpdatedMsg(deviceCredentials); - downlinkMsg = DownlinkMsg.newBuilder() - .addAllDeviceCredentialsUpdateMsg(Collections.singletonList(deviceCredentialsUpdateMsg)) - .build(); - } - break; - } - log.trace("[{}] device processed [{}]", this.sessionId, downlinkMsg); - return downlinkMsg; - } - - private DownlinkMsg processDeviceProfile(EdgeEvent edgeEvent, UpdateMsgType msgType, EdgeEventActionType action) { - DeviceProfileId deviceProfileId = new DeviceProfileId(edgeEvent.getEntityId()); - DownlinkMsg downlinkMsg = null; - switch (action) { - case ADDED: - case UPDATED: - DeviceProfile deviceProfile = ctx.getDeviceProfileService().findDeviceProfileById(edgeEvent.getTenantId(), deviceProfileId); - if (deviceProfile != null) { - DeviceProfileUpdateMsg deviceProfileUpdateMsg = - ctx.getDeviceProfileMsgConstructor().constructDeviceProfileUpdatedMsg(msgType, deviceProfile); - downlinkMsg = DownlinkMsg.newBuilder() - .addAllDeviceProfileUpdateMsg(Collections.singletonList(deviceProfileUpdateMsg)) - .build(); - } - break; - case DELETED: - DeviceProfileUpdateMsg deviceProfileUpdateMsg = - ctx.getDeviceProfileMsgConstructor().constructDeviceProfileDeleteMsg(deviceProfileId); - downlinkMsg = DownlinkMsg.newBuilder() - .addAllDeviceProfileUpdateMsg(Collections.singletonList(deviceProfileUpdateMsg)) - .build(); - break; - } - log.trace("[{}] device profile processed [{}]", this.sessionId, downlinkMsg); - return downlinkMsg; - } - - private DownlinkMsg processAsset(EdgeEvent edgeEvent, UpdateMsgType msgType, EdgeEventActionType action) { - AssetId assetId = new AssetId(edgeEvent.getEntityId()); - DownlinkMsg downlinkMsg = null; - switch (action) { - case ADDED: - case UPDATED: - case ASSIGNED_TO_EDGE: - case ASSIGNED_TO_CUSTOMER: - case UNASSIGNED_FROM_CUSTOMER: - Asset asset = ctx.getAssetService().findAssetById(edgeEvent.getTenantId(), assetId); - if (asset != null) { - CustomerId customerId = getCustomerIdIfEdgeAssignedToCustomer(asset); - AssetUpdateMsg assetUpdateMsg = - ctx.getAssetMsgConstructor().constructAssetUpdatedMsg(msgType, asset, customerId); - downlinkMsg = DownlinkMsg.newBuilder() - .addAllAssetUpdateMsg(Collections.singletonList(assetUpdateMsg)) - .build(); - } - break; - case DELETED: - case UNASSIGNED_FROM_EDGE: - AssetUpdateMsg assetUpdateMsg = - ctx.getAssetMsgConstructor().constructAssetDeleteMsg(assetId); - downlinkMsg = DownlinkMsg.newBuilder() - .addAllAssetUpdateMsg(Collections.singletonList(assetUpdateMsg)) - .build(); - break; - } - log.trace("[{}] asset processed [{}]", this.sessionId, downlinkMsg); - return downlinkMsg; - } - - private DownlinkMsg processEntityView(EdgeEvent edgeEvent, UpdateMsgType msgType, EdgeEventActionType action) { - EntityViewId entityViewId = new EntityViewId(edgeEvent.getEntityId()); - DownlinkMsg downlinkMsg = null; - switch (action) { - case ADDED: - case UPDATED: - case ASSIGNED_TO_EDGE: - case ASSIGNED_TO_CUSTOMER: - case UNASSIGNED_FROM_CUSTOMER: - EntityView entityView = ctx.getEntityViewService().findEntityViewById(edgeEvent.getTenantId(), entityViewId); - if (entityView != null) { - CustomerId customerId = getCustomerIdIfEdgeAssignedToCustomer(entityView); - EntityViewUpdateMsg entityViewUpdateMsg = - ctx.getEntityViewMsgConstructor().constructEntityViewUpdatedMsg(msgType, entityView, customerId); - downlinkMsg = DownlinkMsg.newBuilder() - .addAllEntityViewUpdateMsg(Collections.singletonList(entityViewUpdateMsg)) - .build(); - } - break; - case DELETED: - case UNASSIGNED_FROM_EDGE: - EntityViewUpdateMsg entityViewUpdateMsg = - ctx.getEntityViewMsgConstructor().constructEntityViewDeleteMsg(entityViewId); - downlinkMsg = DownlinkMsg.newBuilder() - .addAllEntityViewUpdateMsg(Collections.singletonList(entityViewUpdateMsg)) - .build(); - break; - } - log.trace("[{}] entity view processed [{}]", this.sessionId, downlinkMsg); - return downlinkMsg; - } - - private DownlinkMsg processDashboard(EdgeEvent edgeEvent, UpdateMsgType msgType, EdgeEventActionType action) { - DashboardId dashboardId = new DashboardId(edgeEvent.getEntityId()); - DownlinkMsg downlinkMsg = null; - switch (action) { - case ADDED: - case UPDATED: - case ASSIGNED_TO_EDGE: - case ASSIGNED_TO_CUSTOMER: - case UNASSIGNED_FROM_CUSTOMER: - Dashboard dashboard = ctx.getDashboardService().findDashboardById(edgeEvent.getTenantId(), dashboardId); - if (dashboard != null) { - CustomerId customerId = null; - if (!edge.getCustomerId().isNullUid() && dashboard.isAssignedToCustomer(edge.getCustomerId())) { - customerId = edge.getCustomerId(); - } - DashboardUpdateMsg dashboardUpdateMsg = - ctx.getDashboardMsgConstructor().constructDashboardUpdatedMsg(msgType, dashboard, customerId); - downlinkMsg = DownlinkMsg.newBuilder() - .addAllDashboardUpdateMsg(Collections.singletonList(dashboardUpdateMsg)) - .build(); - } - break; - case DELETED: - case UNASSIGNED_FROM_EDGE: - DashboardUpdateMsg dashboardUpdateMsg = - ctx.getDashboardMsgConstructor().constructDashboardDeleteMsg(dashboardId); - downlinkMsg = DownlinkMsg.newBuilder() - .addAllDashboardUpdateMsg(Collections.singletonList(dashboardUpdateMsg)) - .build(); - break; - } - log.trace("[{}] dashboard processed [{}]", this.sessionId, downlinkMsg); - return downlinkMsg; - } - - private DownlinkMsg processCustomer(EdgeEvent edgeEvent, UpdateMsgType msgType, EdgeEventActionType action) { - CustomerId customerId = new CustomerId(edgeEvent.getEntityId()); - DownlinkMsg downlinkMsg = null; - switch (action) { - case ADDED: - case UPDATED: - Customer customer = ctx.getCustomerService().findCustomerById(edgeEvent.getTenantId(), customerId); - if (customer != null) { - CustomerUpdateMsg customerUpdateMsg = - ctx.getCustomerMsgConstructor().constructCustomerUpdatedMsg(msgType, customer); - downlinkMsg = DownlinkMsg.newBuilder() - .addAllCustomerUpdateMsg(Collections.singletonList(customerUpdateMsg)) - .build(); - } - break; - case DELETED: - CustomerUpdateMsg customerUpdateMsg = - ctx.getCustomerMsgConstructor().constructCustomerDeleteMsg(customerId); - downlinkMsg = DownlinkMsg.newBuilder() - .addAllCustomerUpdateMsg(Collections.singletonList(customerUpdateMsg)) - .build(); - break; - } - log.trace("[{}] customer processed [{}]", this.sessionId, downlinkMsg); - return downlinkMsg; - } - - private DownlinkMsg processRuleChain(EdgeEvent edgeEvent, UpdateMsgType msgType, EdgeEventActionType action) { - RuleChainId ruleChainId = new RuleChainId(edgeEvent.getEntityId()); - DownlinkMsg downlinkMsg = null; - switch (action) { - case ADDED: - case UPDATED: - case ASSIGNED_TO_EDGE: - RuleChain ruleChain = ctx.getRuleChainService().findRuleChainById(edgeEvent.getTenantId(), ruleChainId); - if (ruleChain != null) { - RuleChainUpdateMsg ruleChainUpdateMsg = - ctx.getRuleChainMsgConstructor().constructRuleChainUpdatedMsg(edge.getRootRuleChainId(), msgType, ruleChain); - downlinkMsg = DownlinkMsg.newBuilder() - .addAllRuleChainUpdateMsg(Collections.singletonList(ruleChainUpdateMsg)) - .build(); - } - break; - case DELETED: - case UNASSIGNED_FROM_EDGE: - downlinkMsg = DownlinkMsg.newBuilder() - .addAllRuleChainUpdateMsg(Collections.singletonList(ctx.getRuleChainMsgConstructor().constructRuleChainDeleteMsg(ruleChainId))) - .build(); - break; - } - log.trace("[{}] rule chain processed [{}]", this.sessionId, downlinkMsg); - return downlinkMsg; - } - - private DownlinkMsg processRuleChainMetadata(EdgeEvent edgeEvent, UpdateMsgType msgType) { - RuleChainId ruleChainId = new RuleChainId(edgeEvent.getEntityId()); - RuleChain ruleChain = ctx.getRuleChainService().findRuleChainById(edgeEvent.getTenantId(), ruleChainId); - DownlinkMsg downlinkMsg = null; - if (ruleChain != null) { - RuleChainMetaData ruleChainMetaData = ctx.getRuleChainService().loadRuleChainMetaData(edgeEvent.getTenantId(), ruleChainId); - RuleChainMetadataUpdateMsg ruleChainMetadataUpdateMsg = - ctx.getRuleChainMsgConstructor().constructRuleChainMetadataUpdatedMsg(msgType, ruleChainMetaData); - if (ruleChainMetadataUpdateMsg != null) { - downlinkMsg = DownlinkMsg.newBuilder() - .addAllRuleChainMetadataUpdateMsg(Collections.singletonList(ruleChainMetadataUpdateMsg)) - .build(); - } - } - log.trace("[{}] rule chain metadata processed [{}]", this.sessionId, downlinkMsg); - return downlinkMsg; - } - - private DownlinkMsg processUser(EdgeEvent edgeEvent, UpdateMsgType msgType, EdgeEventActionType edgeEdgeEventActionType) { - UserId userId = new UserId(edgeEvent.getEntityId()); - DownlinkMsg downlinkMsg = null; - switch (edgeEdgeEventActionType) { - case ADDED: - case UPDATED: - User user = ctx.getUserService().findUserById(edgeEvent.getTenantId(), userId); - if (user != null) { - CustomerId customerId = getCustomerIdIfEdgeAssignedToCustomer(user); - downlinkMsg = DownlinkMsg.newBuilder() - .addAllUserUpdateMsg(Collections.singletonList(ctx.getUserMsgConstructor().constructUserUpdatedMsg(msgType, user, customerId))) - .build(); - } - break; - case DELETED: - downlinkMsg = DownlinkMsg.newBuilder() - .addAllUserUpdateMsg(Collections.singletonList(ctx.getUserMsgConstructor().constructUserDeleteMsg(userId))) - .build(); - break; - case CREDENTIALS_UPDATED: - UserCredentials userCredentialsByUserId = ctx.getUserService().findUserCredentialsByUserId(edge.getTenantId(), userId); - if (userCredentialsByUserId != null && userCredentialsByUserId.isEnabled()) { - UserCredentialsUpdateMsg userCredentialsUpdateMsg = - ctx.getUserMsgConstructor().constructUserCredentialsUpdatedMsg(userCredentialsByUserId); - downlinkMsg = DownlinkMsg.newBuilder() - .addAllUserCredentialsUpdateMsg(Collections.singletonList(userCredentialsUpdateMsg)) - .build(); - } - } - log.trace("[{}] user processed [{}]", this.sessionId, downlinkMsg); - return downlinkMsg; - } - - private CustomerId getCustomerIdIfEdgeAssignedToCustomer(HasCustomerId hasCustomerIdEntity) { - if (!edge.getCustomerId().isNullUid() && edge.getCustomerId().equals(hasCustomerIdEntity.getCustomerId())) { - return edge.getCustomerId(); - } else { - return null; - } - } - - private DownlinkMsg processRelation(EdgeEvent edgeEvent, UpdateMsgType msgType) { - EntityRelation entityRelation = mapper.convertValue(edgeEvent.getBody(), EntityRelation.class); - RelationUpdateMsg r = ctx.getRelationMsgConstructor().constructRelationUpdatedMsg(msgType, entityRelation); - DownlinkMsg downlinkMsg = DownlinkMsg.newBuilder() - .addAllRelationUpdateMsg(Collections.singletonList(r)) - .build(); - log.trace("[{}] relation processed [{}]", this.sessionId, downlinkMsg); - return downlinkMsg; - } - - private DownlinkMsg processAlarm(EdgeEvent edgeEvent, UpdateMsgType msgType) { - DownlinkMsg downlinkMsg = null; - try { - AlarmId alarmId = new AlarmId(edgeEvent.getEntityId()); - Alarm alarm = ctx.getAlarmService().findAlarmByIdAsync(edgeEvent.getTenantId(), alarmId).get(); - if (alarm != null) { - downlinkMsg = DownlinkMsg.newBuilder() - .addAllAlarmUpdateMsg(Collections.singletonList(ctx.getAlarmMsgConstructor().constructAlarmUpdatedMsg(edge.getTenantId(), msgType, alarm))) - .build(); - } - } catch (Exception e) { - log.error("Can't process alarm msg [{}] [{}]", edgeEvent, msgType, e); - } - log.trace("[{}] alarm processed [{}]", this.sessionId, downlinkMsg); - return downlinkMsg; - } - - private DownlinkMsg processWidgetsBundle(EdgeEvent edgeEvent, UpdateMsgType msgType, EdgeEventActionType edgeEdgeEventActionType) { - WidgetsBundleId widgetsBundleId = new WidgetsBundleId(edgeEvent.getEntityId()); - DownlinkMsg downlinkMsg = null; - switch (edgeEdgeEventActionType) { - case ADDED: - case UPDATED: - WidgetsBundle widgetsBundle = ctx.getWidgetsBundleService().findWidgetsBundleById(edgeEvent.getTenantId(), widgetsBundleId); - if (widgetsBundle != null) { - WidgetsBundleUpdateMsg widgetsBundleUpdateMsg = - ctx.getWidgetsBundleMsgConstructor().constructWidgetsBundleUpdateMsg(msgType, widgetsBundle); - downlinkMsg = DownlinkMsg.newBuilder() - .addAllWidgetsBundleUpdateMsg(Collections.singletonList(widgetsBundleUpdateMsg)) - .build(); - } - break; - case DELETED: - WidgetsBundleUpdateMsg widgetsBundleUpdateMsg = - ctx.getWidgetsBundleMsgConstructor().constructWidgetsBundleDeleteMsg(widgetsBundleId); - downlinkMsg = DownlinkMsg.newBuilder() - .addAllWidgetsBundleUpdateMsg(Collections.singletonList(widgetsBundleUpdateMsg)) - .build(); - break; - } - log.trace("[{}] widget bundle processed [{}]", this.sessionId, downlinkMsg); - return downlinkMsg; - } - - private DownlinkMsg processWidgetType(EdgeEvent edgeEvent, UpdateMsgType msgType, EdgeEventActionType edgeEdgeEventActionType) { - WidgetTypeId widgetTypeId = new WidgetTypeId(edgeEvent.getEntityId()); - DownlinkMsg downlinkMsg = null; - switch (edgeEdgeEventActionType) { - case ADDED: - case UPDATED: - WidgetType widgetType = ctx.getWidgetTypeService().findWidgetTypeById(edgeEvent.getTenantId(), widgetTypeId); - if (widgetType != null) { - WidgetTypeUpdateMsg widgetTypeUpdateMsg = - ctx.getWidgetTypeMsgConstructor().constructWidgetTypeUpdateMsg(msgType, widgetType); - downlinkMsg = DownlinkMsg.newBuilder() - .addAllWidgetTypeUpdateMsg(Collections.singletonList(widgetTypeUpdateMsg)) - .build(); - } - break; - case DELETED: - WidgetTypeUpdateMsg widgetTypeUpdateMsg = - ctx.getWidgetTypeMsgConstructor().constructWidgetTypeDeleteMsg(widgetTypeId); - downlinkMsg = DownlinkMsg.newBuilder() - .addAllWidgetTypeUpdateMsg(Collections.singletonList(widgetTypeUpdateMsg)) - .build(); - break; - } - log.trace("[{}] widget type processed [{}]", this.sessionId, downlinkMsg); - return downlinkMsg; - } - - private DownlinkMsg processAdminSettings(EdgeEvent edgeEvent) { - AdminSettings adminSettings = mapper.convertValue(edgeEvent.getBody(), AdminSettings.class); - AdminSettingsUpdateMsg t = ctx.getAdminSettingsMsgConstructor().constructAdminSettingsUpdateMsg(adminSettings); - DownlinkMsg downlinkMsg = DownlinkMsg.newBuilder() - .addAllAdminSettingsUpdateMsg(Collections.singletonList(t)) - .build(); - log.trace("[{}] admin settings processed [{}]", this.sessionId, downlinkMsg); - return downlinkMsg; - } - private UpdateMsgType getResponseMsgType(EdgeEventActionType actionType) { switch (actionType) { case UPDATED: @@ -972,41 +479,33 @@ public final class EdgeGrpcSession implements Closeable { } } - private DownlinkMsg constructEntityDataProtoMsg(EntityId entityId, EdgeEventActionType actionType, JsonElement entityData) { - EntityDataProto entityDataProto = ctx.getEntityDataMsgConstructor().constructEntityDataMsg(entityId, actionType, entityData); - DownlinkMsg downlinkMsg = DownlinkMsg.newBuilder() - .addAllEntityData(Collections.singletonList(entityDataProto)) - .build(); - log.trace("[{}] entity data proto processed [{}]", this.sessionId, downlinkMsg); - return downlinkMsg; - } private ListenableFuture> processUplinkMsg(UplinkMsg uplinkMsg) { List> result = new ArrayList<>(); try { if (uplinkMsg.getEntityDataCount() > 0) { for (EntityDataProto entityData : uplinkMsg.getEntityDataList()) { - result.addAll(ctx.getTelemetryProcessor().onTelemetryUpdate(edge.getTenantId(), edge.getCustomerId(), entityData)); + result.addAll(ctx.getTelemetryProcessor().processTelemetryFromEdge(edge.getTenantId(), edge.getCustomerId(), entityData)); } } if (uplinkMsg.getDeviceUpdateMsgCount() > 0) { for (DeviceUpdateMsg deviceUpdateMsg : uplinkMsg.getDeviceUpdateMsgList()) { - result.add(ctx.getDeviceProcessor().onDeviceUpdate(edge.getTenantId(), edge, deviceUpdateMsg)); + result.add(ctx.getDeviceProcessor().processDeviceFromEdge(edge.getTenantId(), edge, deviceUpdateMsg)); } } if (uplinkMsg.getDeviceCredentialsUpdateMsgCount() > 0) { for (DeviceCredentialsUpdateMsg deviceCredentialsUpdateMsg : uplinkMsg.getDeviceCredentialsUpdateMsgList()) { - result.add(ctx.getDeviceProcessor().onDeviceCredentialsUpdate(edge.getTenantId(), deviceCredentialsUpdateMsg)); + result.add(ctx.getDeviceProcessor().processDeviceCredentialsFromEdge(edge.getTenantId(), deviceCredentialsUpdateMsg)); } } if (uplinkMsg.getAlarmUpdateMsgCount() > 0) { for (AlarmUpdateMsg alarmUpdateMsg : uplinkMsg.getAlarmUpdateMsgList()) { - result.add(ctx.getAlarmProcessor().onAlarmUpdate(edge.getTenantId(), alarmUpdateMsg)); + result.add(ctx.getAlarmProcessor().processAlarmFromEdge(edge.getTenantId(), alarmUpdateMsg)); } } if (uplinkMsg.getRelationUpdateMsgCount() > 0) { for (RelationUpdateMsg relationUpdateMsg : uplinkMsg.getRelationUpdateMsgList()) { - result.add(ctx.getRelationProcessor().onRelationUpdate(edge.getTenantId(), relationUpdateMsg)); + result.add(ctx.getRelationProcessor().processRelationFromEdge(edge.getTenantId(), relationUpdateMsg)); } } if (uplinkMsg.getRuleChainMetadataRequestMsgCount() > 0) { @@ -1036,7 +535,7 @@ public final class EdgeGrpcSession implements Closeable { } if (uplinkMsg.getDeviceRpcCallMsgCount() > 0) { for (DeviceRpcCallMsg deviceRpcCallMsg : uplinkMsg.getDeviceRpcCallMsgList()) { - result.add(ctx.getDeviceProcessor().processDeviceRpcCallResponseMsg(edge.getTenantId(), deviceRpcCallMsg)); + result.add(ctx.getDeviceProcessor().processDeviceRpcCallResponseFromEdge(edge.getTenantId(), deviceRpcCallMsg)); } } if (uplinkMsg.getDeviceProfileDevicesRequestMsgCount() > 0) { @@ -1107,7 +606,7 @@ public final class EdgeGrpcSession implements Closeable { .setCloudType("CE"); if (edge.getCustomerId() != null) { builder.setCustomerIdMSB(edge.getCustomerId().getId().getMostSignificantBits()) - .setCustomerIdLSB(edge.getCustomerId().getId().getLeastSignificantBits()); + .setCustomerIdLSB(edge.getCustomerId().getId().getLeastSignificantBits()); } return builder .build(); diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/AdminSettingsProcessor.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/AdminSettingsProcessor.java new file mode 100644 index 0000000000..53f46900fa --- /dev/null +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/AdminSettingsProcessor.java @@ -0,0 +1,41 @@ +/** + * Copyright © 2016-2021 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.service.edge.rpc.processor; + +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Component; +import org.thingsboard.server.common.data.AdminSettings; +import org.thingsboard.server.common.data.edge.EdgeEvent; +import org.thingsboard.server.gen.edge.AdminSettingsUpdateMsg; +import org.thingsboard.server.gen.edge.DownlinkMsg; +import org.thingsboard.server.queue.util.TbCoreComponent; + +import java.util.Collections; + +@Component +@Slf4j +@TbCoreComponent +public class AdminSettingsProcessor extends BaseProcessor { + + public DownlinkMsg processAdminSettingsToEdge(EdgeEvent edgeEvent) { + AdminSettings adminSettings = mapper.convertValue(edgeEvent.getBody(), AdminSettings.class); + AdminSettingsUpdateMsg t = adminSettingsMsgConstructor.constructAdminSettingsUpdateMsg(adminSettings); + return DownlinkMsg.newBuilder() + .addAllAdminSettingsUpdateMsg(Collections.singletonList(t)) + .build(); + } + +} diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/AlarmProcessor.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/AlarmProcessor.java index 5e7e1e5844..5ceb82381a 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/AlarmProcessor.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/AlarmProcessor.java @@ -23,17 +23,24 @@ import org.thingsboard.server.common.data.EntityType; import org.thingsboard.server.common.data.alarm.Alarm; import org.thingsboard.server.common.data.alarm.AlarmSeverity; import org.thingsboard.server.common.data.alarm.AlarmStatus; +import org.thingsboard.server.common.data.edge.Edge; +import org.thingsboard.server.common.data.edge.EdgeEvent; +import org.thingsboard.server.common.data.id.AlarmId; import org.thingsboard.server.common.data.id.EntityId; import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.gen.edge.AlarmUpdateMsg; +import org.thingsboard.server.gen.edge.DownlinkMsg; +import org.thingsboard.server.gen.edge.UpdateMsgType; import org.thingsboard.server.queue.util.TbCoreComponent; +import java.util.Collections; + @Component @Slf4j @TbCoreComponent public class AlarmProcessor extends BaseProcessor { - public ListenableFuture onAlarmUpdate(TenantId tenantId, AlarmUpdateMsg alarmUpdateMsg) { + public ListenableFuture processAlarmFromEdge(TenantId tenantId, AlarmUpdateMsg alarmUpdateMsg) { log.trace("[{}] onAlarmUpdate [{}]", tenantId, alarmUpdateMsg); EntityId originatorId = getAlarmOriginator(tenantId, alarmUpdateMsg.getOriginatorName(), EntityType.valueOf(alarmUpdateMsg.getOriginatorType())); @@ -96,4 +103,20 @@ public class AlarmProcessor extends BaseProcessor { return null; } } + + public DownlinkMsg processAlarmToEdge(Edge edge, EdgeEvent edgeEvent, UpdateMsgType msgType) { + DownlinkMsg downlinkMsg = null; + try { + AlarmId alarmId = new AlarmId(edgeEvent.getEntityId()); + Alarm alarm = alarmService.findAlarmByIdAsync(edgeEvent.getTenantId(), alarmId).get(); + if (alarm != null) { + downlinkMsg = DownlinkMsg.newBuilder() + .addAllAlarmUpdateMsg(Collections.singletonList(alarmMsgConstructor.constructAlarmUpdatedMsg(edge.getTenantId(), msgType, alarm))) + .build(); + } + } catch (Exception e) { + log.error("Can't process alarm msg [{}] [{}]", edgeEvent, msgType, e); + } + return downlinkMsg; + } } diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/AssetProcessor.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/AssetProcessor.java new file mode 100644 index 0000000000..e3c8d9d1e8 --- /dev/null +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/AssetProcessor.java @@ -0,0 +1,68 @@ +/** + * Copyright © 2016-2021 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.service.edge.rpc.processor; + +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Component; +import org.thingsboard.server.common.data.asset.Asset; +import org.thingsboard.server.common.data.edge.Edge; +import org.thingsboard.server.common.data.edge.EdgeEvent; +import org.thingsboard.server.common.data.edge.EdgeEventActionType; +import org.thingsboard.server.common.data.id.AssetId; +import org.thingsboard.server.common.data.id.CustomerId; +import org.thingsboard.server.gen.edge.AssetUpdateMsg; +import org.thingsboard.server.gen.edge.DownlinkMsg; +import org.thingsboard.server.gen.edge.UpdateMsgType; +import org.thingsboard.server.queue.util.TbCoreComponent; + +import java.util.Collections; + +@Component +@Slf4j +@TbCoreComponent +public class AssetProcessor extends BaseProcessor { + + public DownlinkMsg processAssetToEdge(Edge edge, EdgeEvent edgeEvent, UpdateMsgType msgType, EdgeEventActionType action) { + AssetId assetId = new AssetId(edgeEvent.getEntityId()); + DownlinkMsg downlinkMsg = null; + switch (action) { + case ADDED: + case UPDATED: + case ASSIGNED_TO_EDGE: + case ASSIGNED_TO_CUSTOMER: + case UNASSIGNED_FROM_CUSTOMER: + Asset asset = assetService.findAssetById(edgeEvent.getTenantId(), assetId); + if (asset != null) { + CustomerId customerId = getCustomerIdIfEdgeAssignedToCustomer(asset, edge); + AssetUpdateMsg assetUpdateMsg = + assetMsgConstructor.constructAssetUpdatedMsg(msgType, asset, customerId); + downlinkMsg = DownlinkMsg.newBuilder() + .addAllAssetUpdateMsg(Collections.singletonList(assetUpdateMsg)) + .build(); + } + break; + case DELETED: + case UNASSIGNED_FROM_EDGE: + AssetUpdateMsg assetUpdateMsg = + assetMsgConstructor.constructAssetDeleteMsg(assetId); + downlinkMsg = DownlinkMsg.newBuilder() + .addAllAssetUpdateMsg(Collections.singletonList(assetUpdateMsg)) + .build(); + break; + } + return downlinkMsg; + } +} diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/BaseProcessor.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/BaseProcessor.java index cf7f9e21fb..8ea8eac1df 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/BaseProcessor.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/BaseProcessor.java @@ -23,9 +23,12 @@ import com.google.common.util.concurrent.ListenableFuture; import lombok.extern.slf4j.Slf4j; import org.checkerframework.checker.nullness.qual.Nullable; import org.springframework.beans.factory.annotation.Autowired; +import org.thingsboard.server.common.data.HasCustomerId; +import org.thingsboard.server.common.data.edge.Edge; import org.thingsboard.server.common.data.edge.EdgeEvent; import org.thingsboard.server.common.data.edge.EdgeEventActionType; import org.thingsboard.server.common.data.edge.EdgeEventType; +import org.thingsboard.server.common.data.id.CustomerId; import org.thingsboard.server.common.data.id.EdgeId; import org.thingsboard.server.common.data.id.EntityId; import org.thingsboard.server.common.data.id.TenantId; @@ -35,14 +38,31 @@ import org.thingsboard.server.dao.attributes.AttributesService; import org.thingsboard.server.dao.customer.CustomerService; import org.thingsboard.server.dao.dashboard.DashboardService; import org.thingsboard.server.dao.device.DeviceCredentialsService; +import org.thingsboard.server.dao.device.DeviceProfileService; import org.thingsboard.server.dao.device.DeviceService; import org.thingsboard.server.dao.edge.EdgeEventService; import org.thingsboard.server.dao.edge.EdgeService; import org.thingsboard.server.dao.entityview.EntityViewService; import org.thingsboard.server.dao.relation.RelationService; +import org.thingsboard.server.dao.rule.RuleChainService; import org.thingsboard.server.dao.user.UserService; +import org.thingsboard.server.dao.widget.WidgetTypeService; +import org.thingsboard.server.dao.widget.WidgetsBundleService; +import org.thingsboard.server.service.edge.rpc.constructor.AdminSettingsMsgConstructor; +import org.thingsboard.server.service.edge.rpc.constructor.AlarmMsgConstructor; +import org.thingsboard.server.service.edge.rpc.constructor.AssetMsgConstructor; +import org.thingsboard.server.service.edge.rpc.constructor.CustomerMsgConstructor; +import org.thingsboard.server.service.edge.rpc.constructor.DashboardMsgConstructor; +import org.thingsboard.server.service.edge.rpc.constructor.DeviceMsgConstructor; +import org.thingsboard.server.service.edge.rpc.constructor.DeviceProfileMsgConstructor; +import org.thingsboard.server.service.edge.rpc.constructor.EntityDataMsgConstructor; +import org.thingsboard.server.service.edge.rpc.constructor.EntityViewMsgConstructor; +import org.thingsboard.server.service.edge.rpc.constructor.RelationMsgConstructor; +import org.thingsboard.server.service.edge.rpc.constructor.RuleChainMsgConstructor; +import org.thingsboard.server.service.edge.rpc.constructor.UserMsgConstructor; +import org.thingsboard.server.service.edge.rpc.constructor.WidgetTypeMsgConstructor; +import org.thingsboard.server.service.edge.rpc.constructor.WidgetsBundleMsgConstructor; import org.thingsboard.server.service.executors.DbCallbackExecutorService; -import org.thingsboard.server.service.profile.DefaultTbDeviceProfileCache; import org.thingsboard.server.service.profile.TbDeviceProfileCache; import org.thingsboard.server.service.queue.TbClusterService; import org.thingsboard.server.service.state.DeviceStateService; @@ -52,6 +72,9 @@ public abstract class BaseProcessor { protected static final ObjectMapper mapper = new ObjectMapper(); + @Autowired + protected RuleChainService ruleChainService; + @Autowired protected AlarmService alarmService; @@ -79,6 +102,9 @@ public abstract class BaseProcessor { @Autowired protected UserService userService; + @Autowired + protected DeviceProfileService deviceProfileService; + @Autowired protected RelationService relationService; @@ -97,6 +123,54 @@ public abstract class BaseProcessor { @Autowired protected EdgeEventService edgeEventService; + @Autowired + protected WidgetsBundleService widgetsBundleService; + + @Autowired + protected WidgetTypeService widgetTypeService; + + @Autowired + protected EntityDataMsgConstructor entityDataMsgConstructor; + + @Autowired + protected RuleChainMsgConstructor ruleChainMsgConstructor; + + @Autowired + protected AlarmMsgConstructor alarmMsgConstructor; + + @Autowired + protected DeviceMsgConstructor deviceMsgConstructor; + + @Autowired + protected AssetMsgConstructor assetMsgConstructor; + + @Autowired + protected EntityViewMsgConstructor entityViewMsgConstructor; + + @Autowired + protected DashboardMsgConstructor dashboardMsgConstructor; + + @Autowired + protected RelationMsgConstructor relationMsgConstructor; + + @Autowired + protected UserMsgConstructor userMsgConstructor; + + @Autowired + protected CustomerMsgConstructor customerMsgConstructor; + + @Autowired + protected DeviceProfileMsgConstructor deviceProfileMsgConstructor; + + @Autowired + protected WidgetsBundleMsgConstructor widgetsBundleMsgConstructor; + + @Autowired + protected WidgetTypeMsgConstructor widgetTypeMsgConstructor; + + @Autowired + protected AdminSettingsMsgConstructor adminSettingsMsgConstructor; + @Autowired protected DbCallbackExecutorService dbCallbackExecutorService; @@ -133,4 +207,12 @@ public abstract class BaseProcessor { }, dbCallbackExecutorService); return future; } + + protected CustomerId getCustomerIdIfEdgeAssignedToCustomer(HasCustomerId hasCustomerIdEntity, Edge edge) { + if (!edge.getCustomerId().isNullUid() && edge.getCustomerId().equals(hasCustomerIdEntity.getCustomerId())) { + return edge.getCustomerId(); + } else { + return null; + } + } } diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/CustomerProcessor.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/CustomerProcessor.java new file mode 100644 index 0000000000..476458bd05 --- /dev/null +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/CustomerProcessor.java @@ -0,0 +1,61 @@ +/** + * Copyright © 2016-2021 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.service.edge.rpc.processor; + +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Component; +import org.thingsboard.server.common.data.Customer; +import org.thingsboard.server.common.data.edge.EdgeEvent; +import org.thingsboard.server.common.data.edge.EdgeEventActionType; +import org.thingsboard.server.common.data.id.CustomerId; +import org.thingsboard.server.gen.edge.CustomerUpdateMsg; +import org.thingsboard.server.gen.edge.DownlinkMsg; +import org.thingsboard.server.gen.edge.UpdateMsgType; +import org.thingsboard.server.queue.util.TbCoreComponent; + +import java.util.Collections; + +@Component +@Slf4j +@TbCoreComponent +public class CustomerProcessor extends BaseProcessor { + + public DownlinkMsg processCustomerToEdge(EdgeEvent edgeEvent, UpdateMsgType msgType, EdgeEventActionType action) { + CustomerId customerId = new CustomerId(edgeEvent.getEntityId()); + DownlinkMsg downlinkMsg = null; + switch (action) { + case ADDED: + case UPDATED: + Customer customer = customerService.findCustomerById(edgeEvent.getTenantId(), customerId); + if (customer != null) { + CustomerUpdateMsg customerUpdateMsg = + customerMsgConstructor.constructCustomerUpdatedMsg(msgType, customer); + downlinkMsg = DownlinkMsg.newBuilder() + .addAllCustomerUpdateMsg(Collections.singletonList(customerUpdateMsg)) + .build(); + } + break; + case DELETED: + CustomerUpdateMsg customerUpdateMsg = + customerMsgConstructor.constructCustomerDeleteMsg(customerId); + downlinkMsg = DownlinkMsg.newBuilder() + .addAllCustomerUpdateMsg(Collections.singletonList(customerUpdateMsg)) + .build(); + break; + } + return downlinkMsg; + } +} diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/DashboardProcessor.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/DashboardProcessor.java new file mode 100644 index 0000000000..f6df66cf0e --- /dev/null +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/DashboardProcessor.java @@ -0,0 +1,71 @@ +/** + * Copyright © 2016-2021 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.service.edge.rpc.processor; + +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Component; +import org.thingsboard.server.common.data.Dashboard; +import org.thingsboard.server.common.data.edge.Edge; +import org.thingsboard.server.common.data.edge.EdgeEvent; +import org.thingsboard.server.common.data.edge.EdgeEventActionType; +import org.thingsboard.server.common.data.id.CustomerId; +import org.thingsboard.server.common.data.id.DashboardId; +import org.thingsboard.server.gen.edge.DashboardUpdateMsg; +import org.thingsboard.server.gen.edge.DownlinkMsg; +import org.thingsboard.server.gen.edge.UpdateMsgType; +import org.thingsboard.server.queue.util.TbCoreComponent; + +import java.util.Collections; + +@Component +@Slf4j +@TbCoreComponent +public class DashboardProcessor extends BaseProcessor { + + public DownlinkMsg processDashboardToEdge(Edge edge, EdgeEvent edgeEvent, UpdateMsgType msgType, EdgeEventActionType action) { + DashboardId dashboardId = new DashboardId(edgeEvent.getEntityId()); + DownlinkMsg downlinkMsg = null; + switch (action) { + case ADDED: + case UPDATED: + case ASSIGNED_TO_EDGE: + case ASSIGNED_TO_CUSTOMER: + case UNASSIGNED_FROM_CUSTOMER: + Dashboard dashboard = dashboardService.findDashboardById(edgeEvent.getTenantId(), dashboardId); + if (dashboard != null) { + CustomerId customerId = null; + if (!edge.getCustomerId().isNullUid() && dashboard.isAssignedToCustomer(edge.getCustomerId())) { + customerId = edge.getCustomerId(); + } + DashboardUpdateMsg dashboardUpdateMsg = + dashboardMsgConstructor.constructDashboardUpdatedMsg(msgType, dashboard, customerId); + downlinkMsg = DownlinkMsg.newBuilder() + .addAllDashboardUpdateMsg(Collections.singletonList(dashboardUpdateMsg)) + .build(); + } + break; + case DELETED: + case UNASSIGNED_FROM_EDGE: + DashboardUpdateMsg dashboardUpdateMsg = + dashboardMsgConstructor.constructDashboardDeleteMsg(dashboardId); + downlinkMsg = DownlinkMsg.newBuilder() + .addAllDashboardUpdateMsg(Collections.singletonList(dashboardUpdateMsg)) + .build(); + break; + } + return downlinkMsg; + } +} diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/DeviceProcessor.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/DeviceProcessor.java index 216e5560d5..2325b6736d 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/DeviceProcessor.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/DeviceProcessor.java @@ -31,6 +31,7 @@ import org.thingsboard.server.common.data.Customer; import org.thingsboard.server.common.data.DataConstants; import org.thingsboard.server.common.data.Device; import org.thingsboard.server.common.data.edge.Edge; +import org.thingsboard.server.common.data.edge.EdgeEvent; import org.thingsboard.server.common.data.edge.EdgeEventActionType; import org.thingsboard.server.common.data.edge.EdgeEventType; import org.thingsboard.server.common.data.id.CustomerId; @@ -51,12 +52,15 @@ import org.thingsboard.common.util.JacksonUtil; import org.thingsboard.server.gen.edge.DeviceCredentialsUpdateMsg; import org.thingsboard.server.gen.edge.DeviceRpcCallMsg; import org.thingsboard.server.gen.edge.DeviceUpdateMsg; +import org.thingsboard.server.gen.edge.DownlinkMsg; +import org.thingsboard.server.gen.edge.UpdateMsgType; import org.thingsboard.server.queue.TbQueueCallback; import org.thingsboard.server.queue.TbQueueMsgMetadata; import org.thingsboard.server.queue.util.TbCoreComponent; import org.thingsboard.server.service.rpc.FromDeviceRpcResponse; import org.thingsboard.server.service.rpc.FromDeviceRpcResponseActorMsg; +import java.util.Collections; import java.util.List; import java.util.UUID; import java.util.concurrent.locks.ReentrantLock; @@ -68,7 +72,9 @@ public class DeviceProcessor extends BaseProcessor { private static final ReentrantLock deviceCreationLock = new ReentrantLock(); - public ListenableFuture onDeviceUpdate(TenantId tenantId, Edge edge, DeviceUpdateMsg deviceUpdateMsg) { + // TODO onDeviceUpdateFromEdge onDeviceUpdateToEdge + + public ListenableFuture processDeviceFromEdge(TenantId tenantId, Edge edge, DeviceUpdateMsg deviceUpdateMsg) { log.trace("[{}] onDeviceUpdate [{}] from edge [{}]", tenantId, deviceUpdateMsg, edge.getName()); switch (deviceUpdateMsg.getMsgType()) { case ENTITY_CREATED_RPC_MESSAGE: @@ -133,7 +139,7 @@ public class DeviceProcessor extends BaseProcessor { return Futures.immediateFuture(null); } - public ListenableFuture onDeviceCredentialsUpdate(TenantId tenantId, DeviceCredentialsUpdateMsg deviceCredentialsUpdateMsg) { + public ListenableFuture processDeviceCredentialsFromEdge(TenantId tenantId, DeviceCredentialsUpdateMsg deviceCredentialsUpdateMsg) { log.debug("Executing onDeviceCredentialsUpdate, deviceCredentialsUpdateMsg [{}]", deviceCredentialsUpdateMsg); DeviceId deviceId = new DeviceId(new UUID(deviceCredentialsUpdateMsg.getDeviceIdMSB(), deviceCredentialsUpdateMsg.getDeviceIdLSB())); ListenableFuture deviceFuture = deviceService.findDeviceByIdAsync(tenantId, deviceId); @@ -264,7 +270,7 @@ public class DeviceProcessor extends BaseProcessor { return metaData; } - public ListenableFuture processDeviceRpcCallResponseMsg(TenantId tenantId, DeviceRpcCallMsg deviceRpcCallMsg) { + public ListenableFuture processDeviceRpcCallResponseFromEdge(TenantId tenantId, DeviceRpcCallMsg deviceRpcCallMsg) { log.trace("[{}] processDeviceRpcCallResponseMsg [{}]", tenantId, deviceRpcCallMsg); SettableFuture futureToSet = SettableFuture.create(); UUID requestUuid = new UUID(deviceRpcCallMsg.getRequestUuidMSB(), deviceRpcCallMsg.getRequestUuidLSB()); @@ -296,4 +302,54 @@ public class DeviceProcessor extends BaseProcessor { return futureToSet; } + public DownlinkMsg processDeviceToEdge(Edge edge, EdgeEvent edgeEvent, + UpdateMsgType msgType, EdgeEventActionType edgeEdgeEventActionType) { + DeviceId deviceId = new DeviceId(edgeEvent.getEntityId()); + DownlinkMsg downlinkMsg = null; + switch (edgeEdgeEventActionType) { + case ADDED: + case UPDATED: + case ASSIGNED_TO_EDGE: + case ASSIGNED_TO_CUSTOMER: + case UNASSIGNED_FROM_CUSTOMER: + Device device = deviceService.findDeviceById(edgeEvent.getTenantId(), deviceId); + if (device != null) { + CustomerId customerId = getCustomerIdIfEdgeAssignedToCustomer(device, edge); + DeviceUpdateMsg deviceUpdateMsg = + deviceMsgConstructor.constructDeviceUpdatedMsg(msgType, device, customerId, null); + downlinkMsg = DownlinkMsg.newBuilder() + .addAllDeviceUpdateMsg(Collections.singletonList(deviceUpdateMsg)) + .build(); + } + break; + case DELETED: + case UNASSIGNED_FROM_EDGE: + DeviceUpdateMsg deviceUpdateMsg = + deviceMsgConstructor.constructDeviceDeleteMsg(deviceId); + downlinkMsg = DownlinkMsg.newBuilder() + .addAllDeviceUpdateMsg(Collections.singletonList(deviceUpdateMsg)) + .build(); + break; + case CREDENTIALS_UPDATED: + DeviceCredentials deviceCredentials = deviceCredentialsService.findDeviceCredentialsByDeviceId(edge.getTenantId(), deviceId); + if (deviceCredentials != null) { + DeviceCredentialsUpdateMsg deviceCredentialsUpdateMsg = + deviceMsgConstructor.constructDeviceCredentialsUpdatedMsg(deviceCredentials); + downlinkMsg = DownlinkMsg.newBuilder() + .addAllDeviceCredentialsUpdateMsg(Collections.singletonList(deviceCredentialsUpdateMsg)) + .build(); + } + break; + } + return downlinkMsg; + } + + public DownlinkMsg processRpcCallMsgToEdge(EdgeEvent edgeEvent) { + log.trace("Executing processRpcCall, edgeEvent [{}]", edgeEvent); + DeviceRpcCallMsg deviceRpcCallMsg = + deviceMsgConstructor.constructDeviceRpcCallMsg(edgeEvent.getEntityId(), edgeEvent.getBody()); + return DownlinkMsg.newBuilder() + .addAllDeviceRpcCallMsg(Collections.singletonList(deviceRpcCallMsg)) + .build(); + } } diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/DeviceProfileProcessor.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/DeviceProfileProcessor.java new file mode 100644 index 0000000000..6aab6838fd --- /dev/null +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/DeviceProfileProcessor.java @@ -0,0 +1,62 @@ +/** + * Copyright © 2016-2021 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.service.edge.rpc.processor; + +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Component; +import org.thingsboard.server.common.data.DeviceProfile; +import org.thingsboard.server.common.data.edge.EdgeEvent; +import org.thingsboard.server.common.data.edge.EdgeEventActionType; +import org.thingsboard.server.common.data.id.DeviceProfileId; +import org.thingsboard.server.gen.edge.DeviceProfileUpdateMsg; +import org.thingsboard.server.gen.edge.DownlinkMsg; +import org.thingsboard.server.gen.edge.UpdateMsgType; +import org.thingsboard.server.queue.util.TbCoreComponent; + +import java.util.Collections; + +@Component +@Slf4j +@TbCoreComponent +public class DeviceProfileProcessor extends BaseProcessor { + + public DownlinkMsg processDeviceProfileToEdge(EdgeEvent edgeEvent, UpdateMsgType msgType, EdgeEventActionType action) { + DeviceProfileId deviceProfileId = new DeviceProfileId(edgeEvent.getEntityId()); + DownlinkMsg downlinkMsg = null; + switch (action) { + case ADDED: + case UPDATED: + DeviceProfile deviceProfile = deviceProfileService.findDeviceProfileById(edgeEvent.getTenantId(), deviceProfileId); + if (deviceProfile != null) { + DeviceProfileUpdateMsg deviceProfileUpdateMsg = + deviceProfileMsgConstructor.constructDeviceProfileUpdatedMsg(msgType, deviceProfile); + downlinkMsg = DownlinkMsg.newBuilder() + .addAllDeviceProfileUpdateMsg(Collections.singletonList(deviceProfileUpdateMsg)) + .build(); + } + break; + case DELETED: + DeviceProfileUpdateMsg deviceProfileUpdateMsg = + deviceProfileMsgConstructor.constructDeviceProfileDeleteMsg(deviceProfileId); + downlinkMsg = DownlinkMsg.newBuilder() + .addAllDeviceProfileUpdateMsg(Collections.singletonList(deviceProfileUpdateMsg)) + .build(); + break; + } + return downlinkMsg; + } + +} diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/EntityProcessor.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/EntityProcessor.java new file mode 100644 index 0000000000..c5865b0ff9 --- /dev/null +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/EntityProcessor.java @@ -0,0 +1,73 @@ +/** + * Copyright © 2016-2021 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.service.edge.rpc.processor; + +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Component; +import org.thingsboard.server.common.data.Device; +import org.thingsboard.server.common.data.edge.Edge; +import org.thingsboard.server.common.data.edge.EdgeEvent; +import org.thingsboard.server.common.data.edge.EdgeEventType; +import org.thingsboard.server.common.data.id.CustomerId; +import org.thingsboard.server.common.data.id.DeviceId; +import org.thingsboard.server.gen.edge.DeviceCredentialsRequestMsg; +import org.thingsboard.server.gen.edge.DeviceUpdateMsg; +import org.thingsboard.server.gen.edge.DownlinkMsg; +import org.thingsboard.server.gen.edge.UpdateMsgType; +import org.thingsboard.server.queue.util.TbCoreComponent; + +import java.util.Collections; + +@Component +@Slf4j +@TbCoreComponent +public class EntityProcessor extends BaseProcessor { + + public DownlinkMsg processEntityMergeRequestMessageToEdge(Edge edge, EdgeEvent edgeEvent) { + DownlinkMsg downlinkMsg = null; + if (EdgeEventType.DEVICE.equals(edgeEvent.getType())) { + DeviceId deviceId = new DeviceId(edgeEvent.getEntityId()); + Device device = deviceService.findDeviceById(edge.getTenantId(), deviceId); + CustomerId customerId = getCustomerIdIfEdgeAssignedToCustomer(device, edge); + String conflictName = null; + if(edgeEvent.getBody() != null) { + conflictName = edgeEvent.getBody().get("conflictName").asText(); + } + DeviceUpdateMsg d = deviceMsgConstructor + .constructDeviceUpdatedMsg(UpdateMsgType.ENTITY_MERGE_RPC_MESSAGE, device, customerId, conflictName); + downlinkMsg = DownlinkMsg.newBuilder() + .addAllDeviceUpdateMsg(Collections.singletonList(d)) + .build(); + } + return downlinkMsg; + } + + public DownlinkMsg processCredentialsRequestMessageToEdge(EdgeEvent edgeEvent) { + DownlinkMsg downlinkMsg = null; + if (EdgeEventType.DEVICE.equals(edgeEvent.getType())) { + DeviceId deviceId = new DeviceId(edgeEvent.getEntityId()); + DeviceCredentialsRequestMsg deviceCredentialsRequestMsg = DeviceCredentialsRequestMsg.newBuilder() + .setDeviceIdMSB(deviceId.getId().getMostSignificantBits()) + .setDeviceIdLSB(deviceId.getId().getLeastSignificantBits()) + .build(); + DownlinkMsg.Builder builder = DownlinkMsg.newBuilder() + .addAllDeviceCredentialsRequestMsg(Collections.singletonList(deviceCredentialsRequestMsg)); + downlinkMsg = builder.build(); + } + return downlinkMsg; + } +} + diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/EntityViewProcessor.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/EntityViewProcessor.java new file mode 100644 index 0000000000..e62e084471 --- /dev/null +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/EntityViewProcessor.java @@ -0,0 +1,68 @@ +/** + * Copyright © 2016-2021 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.service.edge.rpc.processor; + +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Component; +import org.thingsboard.server.common.data.EntityView; +import org.thingsboard.server.common.data.edge.Edge; +import org.thingsboard.server.common.data.edge.EdgeEvent; +import org.thingsboard.server.common.data.edge.EdgeEventActionType; +import org.thingsboard.server.common.data.id.CustomerId; +import org.thingsboard.server.common.data.id.EntityViewId; +import org.thingsboard.server.gen.edge.DownlinkMsg; +import org.thingsboard.server.gen.edge.EntityViewUpdateMsg; +import org.thingsboard.server.gen.edge.UpdateMsgType; +import org.thingsboard.server.queue.util.TbCoreComponent; + +import java.util.Collections; + +@Component +@Slf4j +@TbCoreComponent +public class EntityViewProcessor extends BaseProcessor { + + public DownlinkMsg processEntityViewToEdge(Edge edge, EdgeEvent edgeEvent, UpdateMsgType msgType, EdgeEventActionType action) { + EntityViewId entityViewId = new EntityViewId(edgeEvent.getEntityId()); + DownlinkMsg downlinkMsg = null; + switch (action) { + case ADDED: + case UPDATED: + case ASSIGNED_TO_EDGE: + case ASSIGNED_TO_CUSTOMER: + case UNASSIGNED_FROM_CUSTOMER: + EntityView entityView = entityViewService.findEntityViewById(edgeEvent.getTenantId(), entityViewId); + if (entityView != null) { + CustomerId customerId = getCustomerIdIfEdgeAssignedToCustomer(entityView, edge); + EntityViewUpdateMsg entityViewUpdateMsg = + entityViewMsgConstructor.constructEntityViewUpdatedMsg(msgType, entityView, customerId); + downlinkMsg = DownlinkMsg.newBuilder() + .addAllEntityViewUpdateMsg(Collections.singletonList(entityViewUpdateMsg)) + .build(); + } + break; + case DELETED: + case UNASSIGNED_FROM_EDGE: + EntityViewUpdateMsg entityViewUpdateMsg = + entityViewMsgConstructor.constructEntityViewDeleteMsg(entityViewId); + downlinkMsg = DownlinkMsg.newBuilder() + .addAllEntityViewUpdateMsg(Collections.singletonList(entityViewUpdateMsg)) + .build(); + break; + } + return downlinkMsg; + } +} diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/RelationProcessor.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/RelationProcessor.java index b21a5bd195..eb96df6729 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/RelationProcessor.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/RelationProcessor.java @@ -20,6 +20,7 @@ import com.google.common.util.concurrent.ListenableFuture; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Component; import org.thingsboard.server.common.data.EntityType; +import org.thingsboard.server.common.data.edge.EdgeEvent; import org.thingsboard.server.common.data.exception.ThingsboardErrorCode; import org.thingsboard.server.common.data.exception.ThingsboardException; import org.thingsboard.server.common.data.id.AssetId; @@ -33,9 +34,12 @@ import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.id.UserId; import org.thingsboard.server.common.data.relation.EntityRelation; import org.thingsboard.server.common.data.relation.RelationTypeGroup; +import org.thingsboard.server.gen.edge.DownlinkMsg; import org.thingsboard.server.gen.edge.RelationUpdateMsg; +import org.thingsboard.server.gen.edge.UpdateMsgType; import org.thingsboard.server.queue.util.TbCoreComponent; +import java.util.Collections; import java.util.UUID; @Component @@ -43,7 +47,7 @@ import java.util.UUID; @TbCoreComponent public class RelationProcessor extends BaseProcessor { - public ListenableFuture onRelationUpdate(TenantId tenantId, RelationUpdateMsg relationUpdateMsg) { + public ListenableFuture processRelationFromEdge(TenantId tenantId, RelationUpdateMsg relationUpdateMsg) { log.trace("[{}] onRelationUpdate [{}]", tenantId, relationUpdateMsg); try { EntityRelation entityRelation = new EntityRelation(); @@ -100,5 +104,12 @@ public class RelationProcessor extends BaseProcessor { } } + public DownlinkMsg processRelationToEdge(EdgeEvent edgeEvent, UpdateMsgType msgType) { + EntityRelation entityRelation = mapper.convertValue(edgeEvent.getBody(), EntityRelation.class); + RelationUpdateMsg r = relationMsgConstructor.constructRelationUpdatedMsg(msgType, entityRelation); + return DownlinkMsg.newBuilder() + .addAllRelationUpdateMsg(Collections.singletonList(r)) + .build(); + } } diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/RuleChainProcessor.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/RuleChainProcessor.java new file mode 100644 index 0000000000..c30e9bf45f --- /dev/null +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/RuleChainProcessor.java @@ -0,0 +1,81 @@ +/** + * Copyright © 2016-2021 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.service.edge.rpc.processor; + +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Component; +import org.thingsboard.server.common.data.edge.Edge; +import org.thingsboard.server.common.data.edge.EdgeEvent; +import org.thingsboard.server.common.data.edge.EdgeEventActionType; +import org.thingsboard.server.common.data.id.RuleChainId; +import org.thingsboard.server.common.data.rule.RuleChain; +import org.thingsboard.server.common.data.rule.RuleChainMetaData; +import org.thingsboard.server.gen.edge.DownlinkMsg; +import org.thingsboard.server.gen.edge.RuleChainMetadataUpdateMsg; +import org.thingsboard.server.gen.edge.RuleChainUpdateMsg; +import org.thingsboard.server.gen.edge.UpdateMsgType; +import org.thingsboard.server.queue.util.TbCoreComponent; + +import java.util.Collections; + +@Component +@Slf4j +@TbCoreComponent +public class RuleChainProcessor extends BaseProcessor { + + public DownlinkMsg processRuleChainToEdge(Edge edge, EdgeEvent edgeEvent, UpdateMsgType msgType, EdgeEventActionType action) { + RuleChainId ruleChainId = new RuleChainId(edgeEvent.getEntityId()); + DownlinkMsg downlinkMsg = null; + switch (action) { + case ADDED: + case UPDATED: + case ASSIGNED_TO_EDGE: + RuleChain ruleChain = ruleChainService.findRuleChainById(edgeEvent.getTenantId(), ruleChainId); + if (ruleChain != null) { + RuleChainUpdateMsg ruleChainUpdateMsg = + ruleChainMsgConstructor.constructRuleChainUpdatedMsg(edge.getRootRuleChainId(), msgType, ruleChain); + downlinkMsg = DownlinkMsg.newBuilder() + .addAllRuleChainUpdateMsg(Collections.singletonList(ruleChainUpdateMsg)) + .build(); + } + break; + case DELETED: + case UNASSIGNED_FROM_EDGE: + downlinkMsg = DownlinkMsg.newBuilder() + .addAllRuleChainUpdateMsg(Collections.singletonList(ruleChainMsgConstructor.constructRuleChainDeleteMsg(ruleChainId))) + .build(); + break; + } + return downlinkMsg; + } + + public DownlinkMsg processRuleChainMetadataToEdge(EdgeEvent edgeEvent, UpdateMsgType msgType) { + RuleChainId ruleChainId = new RuleChainId(edgeEvent.getEntityId()); + RuleChain ruleChain = ruleChainService.findRuleChainById(edgeEvent.getTenantId(), ruleChainId); + DownlinkMsg downlinkMsg = null; + if (ruleChain != null) { + RuleChainMetaData ruleChainMetaData = ruleChainService.loadRuleChainMetaData(edgeEvent.getTenantId(), ruleChainId); + RuleChainMetadataUpdateMsg ruleChainMetadataUpdateMsg = + ruleChainMsgConstructor.constructRuleChainMetadataUpdatedMsg(msgType, ruleChainMetaData); + if (ruleChainMetadataUpdateMsg != null) { + downlinkMsg = DownlinkMsg.newBuilder() + .addAllRuleChainMetadataUpdateMsg(Collections.singletonList(ruleChainMetadataUpdateMsg)) + .build(); + } + } + return downlinkMsg; + } +} diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/TelemetryProcessor.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/TelemetryProcessor.java index 53ef04fe85..b42ceb40ab 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/TelemetryProcessor.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/TelemetryProcessor.java @@ -15,11 +15,13 @@ */ package org.thingsboard.server.service.edge.rpc.processor; +import com.fasterxml.jackson.core.JsonProcessingException; import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.SettableFuture; import com.google.gson.Gson; +import com.google.gson.JsonElement; import com.google.gson.JsonObject; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.tuple.ImmutablePair; @@ -32,10 +34,13 @@ import org.thingsboard.server.common.data.DeviceProfile; import org.thingsboard.server.common.data.EntityType; import org.thingsboard.server.common.data.EntityView; import org.thingsboard.server.common.data.asset.Asset; +import org.thingsboard.server.common.data.edge.EdgeEvent; +import org.thingsboard.server.common.data.edge.EdgeEventActionType; import org.thingsboard.server.common.data.id.AssetId; import org.thingsboard.server.common.data.id.CustomerId; import org.thingsboard.server.common.data.id.DashboardId; import org.thingsboard.server.common.data.id.DeviceId; +import org.thingsboard.server.common.data.id.EdgeId; import org.thingsboard.server.common.data.id.EntityId; import org.thingsboard.server.common.data.id.EntityViewId; import org.thingsboard.server.common.data.id.RuleChainId; @@ -50,6 +55,7 @@ import org.thingsboard.server.common.msg.session.SessionMsgType; import org.thingsboard.server.common.transport.adaptor.JsonConverter; import org.thingsboard.server.common.transport.util.JsonUtils; import org.thingsboard.server.gen.edge.AttributeDeleteMsg; +import org.thingsboard.server.gen.edge.DownlinkMsg; import org.thingsboard.server.gen.edge.EntityDataProto; import org.thingsboard.server.gen.transport.TransportProtos; import org.thingsboard.server.queue.TbQueueCallback; @@ -58,6 +64,7 @@ import org.thingsboard.server.queue.util.TbCoreComponent; import javax.annotation.Nullable; import java.util.ArrayList; +import java.util.Collections; import java.util.HashSet; import java.util.List; import java.util.Set; @@ -70,7 +77,7 @@ public class TelemetryProcessor extends BaseProcessor { private final Gson gson = new Gson(); - public List> onTelemetryUpdate(TenantId tenantId, CustomerId customerId, EntityDataProto entityData) { + public List> processTelemetryFromEdge(TenantId tenantId, CustomerId customerId, EntityDataProto entityData) { log.trace("[{}] onTelemetryUpdate [{}]", tenantId, entityData); List> result = new ArrayList<>(); EntityId entityId = constructEntityId(entityData); @@ -279,4 +286,44 @@ public class TelemetryProcessor extends BaseProcessor { return null; } } + + public DownlinkMsg processTelemetryMessageToEdge(EdgeEvent edgeEvent) throws JsonProcessingException { + EntityId entityId = null; + switch (edgeEvent.getType()) { + case DEVICE: + entityId = new DeviceId(edgeEvent.getEntityId()); + break; + case ASSET: + entityId = new AssetId(edgeEvent.getEntityId()); + break; + case ENTITY_VIEW: + entityId = new EntityViewId(edgeEvent.getEntityId()); + break; + case DASHBOARD: + entityId = new DashboardId(edgeEvent.getEntityId()); + break; + case TENANT: + entityId = new TenantId(edgeEvent.getEntityId()); + break; + case CUSTOMER: + entityId = new CustomerId(edgeEvent.getEntityId()); + break; + case EDGE: + entityId = new EdgeId(edgeEvent.getEntityId()); + break; + } + DownlinkMsg downlinkMsg = null; + if (entityId != null) { + downlinkMsg = constructEntityDataProtoMsg(entityId, edgeEvent.getAction(), JsonUtils.parse(mapper.writeValueAsString(edgeEvent.getBody()))); + } + return downlinkMsg; + } + + private DownlinkMsg constructEntityDataProtoMsg(EntityId entityId, EdgeEventActionType actionType, JsonElement entityData) { + EntityDataProto entityDataProto = entityDataMsgConstructor.constructEntityDataMsg(entityId, actionType, entityData); + return DownlinkMsg.newBuilder() + .addAllEntityData(Collections.singletonList(entityDataProto)) + .build(); + } + } diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/UserProcessor.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/UserProcessor.java new file mode 100644 index 0000000000..74afa02b15 --- /dev/null +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/UserProcessor.java @@ -0,0 +1,71 @@ +/** + * Copyright © 2016-2021 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.service.edge.rpc.processor; + +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Component; +import org.thingsboard.server.common.data.User; +import org.thingsboard.server.common.data.edge.Edge; +import org.thingsboard.server.common.data.edge.EdgeEvent; +import org.thingsboard.server.common.data.edge.EdgeEventActionType; +import org.thingsboard.server.common.data.id.CustomerId; +import org.thingsboard.server.common.data.id.UserId; +import org.thingsboard.server.common.data.security.UserCredentials; +import org.thingsboard.server.gen.edge.DownlinkMsg; +import org.thingsboard.server.gen.edge.UpdateMsgType; +import org.thingsboard.server.gen.edge.UserCredentialsUpdateMsg; +import org.thingsboard.server.queue.util.TbCoreComponent; + +import java.util.Collections; + +@Component +@Slf4j +@TbCoreComponent +public class UserProcessor extends BaseProcessor { + + public DownlinkMsg processUserToEdge(Edge edge, EdgeEvent edgeEvent, UpdateMsgType msgType, EdgeEventActionType edgeEdgeEventActionType) { + UserId userId = new UserId(edgeEvent.getEntityId()); + DownlinkMsg downlinkMsg = null; + switch (edgeEdgeEventActionType) { + case ADDED: + case UPDATED: + User user = userService.findUserById(edgeEvent.getTenantId(), userId); + if (user != null) { + CustomerId customerId = getCustomerIdIfEdgeAssignedToCustomer(user, edge); + downlinkMsg = DownlinkMsg.newBuilder() + .addAllUserUpdateMsg(Collections.singletonList(userMsgConstructor.constructUserUpdatedMsg(msgType, user, customerId))) + .build(); + } + break; + case DELETED: + downlinkMsg = DownlinkMsg.newBuilder() + .addAllUserUpdateMsg(Collections.singletonList(userMsgConstructor.constructUserDeleteMsg(userId))) + .build(); + break; + case CREDENTIALS_UPDATED: + UserCredentials userCredentialsByUserId = userService.findUserCredentialsByUserId(edge.getTenantId(), userId); + if (userCredentialsByUserId != null && userCredentialsByUserId.isEnabled()) { + UserCredentialsUpdateMsg userCredentialsUpdateMsg = + userMsgConstructor.constructUserCredentialsUpdatedMsg(userCredentialsByUserId); + downlinkMsg = DownlinkMsg.newBuilder() + .addAllUserCredentialsUpdateMsg(Collections.singletonList(userCredentialsUpdateMsg)) + .build(); + } + } + return downlinkMsg; + } + +} diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/WidgetBundleProcessor.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/WidgetBundleProcessor.java new file mode 100644 index 0000000000..370cf3419f --- /dev/null +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/WidgetBundleProcessor.java @@ -0,0 +1,62 @@ +/** + * Copyright © 2016-2021 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.service.edge.rpc.processor; + +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Component; +import org.thingsboard.server.common.data.edge.Edge; +import org.thingsboard.server.common.data.edge.EdgeEvent; +import org.thingsboard.server.common.data.edge.EdgeEventActionType; +import org.thingsboard.server.common.data.id.WidgetsBundleId; +import org.thingsboard.server.common.data.widget.WidgetsBundle; +import org.thingsboard.server.gen.edge.DownlinkMsg; +import org.thingsboard.server.gen.edge.UpdateMsgType; +import org.thingsboard.server.gen.edge.WidgetsBundleUpdateMsg; +import org.thingsboard.server.queue.util.TbCoreComponent; + +import java.util.Collections; + +@Component +@Slf4j +@TbCoreComponent +public class WidgetBundleProcessor extends BaseProcessor { + + public DownlinkMsg processWidgetsBundleToEdge(EdgeEvent edgeEvent, UpdateMsgType msgType, EdgeEventActionType edgeEdgeEventActionType) { + WidgetsBundleId widgetsBundleId = new WidgetsBundleId(edgeEvent.getEntityId()); + DownlinkMsg downlinkMsg = null; + switch (edgeEdgeEventActionType) { + case ADDED: + case UPDATED: + WidgetsBundle widgetsBundle = widgetsBundleService.findWidgetsBundleById(edgeEvent.getTenantId(), widgetsBundleId); + if (widgetsBundle != null) { + WidgetsBundleUpdateMsg widgetsBundleUpdateMsg = + widgetsBundleMsgConstructor.constructWidgetsBundleUpdateMsg(msgType, widgetsBundle); + downlinkMsg = DownlinkMsg.newBuilder() + .addAllWidgetsBundleUpdateMsg(Collections.singletonList(widgetsBundleUpdateMsg)) + .build(); + } + break; + case DELETED: + WidgetsBundleUpdateMsg widgetsBundleUpdateMsg = + widgetsBundleMsgConstructor.constructWidgetsBundleDeleteMsg(widgetsBundleId); + downlinkMsg = DownlinkMsg.newBuilder() + .addAllWidgetsBundleUpdateMsg(Collections.singletonList(widgetsBundleUpdateMsg)) + .build(); + break; + } + return downlinkMsg; + } +} diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/WidgetTypeProcessor.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/WidgetTypeProcessor.java new file mode 100644 index 0000000000..795207567e --- /dev/null +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/WidgetTypeProcessor.java @@ -0,0 +1,62 @@ +/** + * Copyright © 2016-2021 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.service.edge.rpc.processor; + +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Component; +import org.thingsboard.server.common.data.edge.EdgeEvent; +import org.thingsboard.server.common.data.edge.EdgeEventActionType; +import org.thingsboard.server.common.data.id.WidgetTypeId; +import org.thingsboard.server.common.data.widget.WidgetType; +import org.thingsboard.server.gen.edge.DownlinkMsg; +import org.thingsboard.server.gen.edge.UpdateMsgType; +import org.thingsboard.server.gen.edge.WidgetTypeUpdateMsg; +import org.thingsboard.server.queue.util.TbCoreComponent; + +import java.util.Collections; + +@Component +@Slf4j +@TbCoreComponent +public class WidgetTypeProcessor extends BaseProcessor { + + public DownlinkMsg processWidgetTypeToEdge(EdgeEvent edgeEvent, UpdateMsgType msgType, EdgeEventActionType edgeEdgeEventActionType) { + WidgetTypeId widgetTypeId = new WidgetTypeId(edgeEvent.getEntityId()); + DownlinkMsg downlinkMsg = null; + switch (edgeEdgeEventActionType) { + case ADDED: + case UPDATED: + WidgetType widgetType = widgetTypeService.findWidgetTypeById(edgeEvent.getTenantId(), widgetTypeId); + if (widgetType != null) { + WidgetTypeUpdateMsg widgetTypeUpdateMsg = + widgetTypeMsgConstructor.constructWidgetTypeUpdateMsg(msgType, widgetType); + downlinkMsg = DownlinkMsg.newBuilder() + .addAllWidgetTypeUpdateMsg(Collections.singletonList(widgetTypeUpdateMsg)) + .build(); + } + break; + case DELETED: + WidgetTypeUpdateMsg widgetTypeUpdateMsg = + widgetTypeMsgConstructor.constructWidgetTypeDeleteMsg(widgetTypeId); + downlinkMsg = DownlinkMsg.newBuilder() + .addAllWidgetTypeUpdateMsg(Collections.singletonList(widgetTypeUpdateMsg)) + .build(); + break; + } + return downlinkMsg; + } + +}