196 changed files with 5361 additions and 3802 deletions
@ -0,0 +1,143 @@ |
|||
{ |
|||
"ruleChain": { |
|||
"additionalInfo": null, |
|||
"name": "Edge Root Rule Chain", |
|||
"type": "EDGE", |
|||
"firstRuleNodeId": null, |
|||
"root": true, |
|||
"debugMode": false, |
|||
"configuration": null |
|||
}, |
|||
"metadata": { |
|||
"firstNodeIndex": 2, |
|||
"nodes": [ |
|||
{ |
|||
"additionalInfo": { |
|||
"layoutX": 823, |
|||
"layoutY": 157 |
|||
}, |
|||
"type": "org.thingsboard.rule.engine.telemetry.TbMsgTimeseriesNode", |
|||
"name": "Save Timeseries", |
|||
"debugMode": false, |
|||
"configuration": { |
|||
"defaultTTL": 0 |
|||
} |
|||
}, |
|||
{ |
|||
"additionalInfo": { |
|||
"layoutX": 824, |
|||
"layoutY": 52 |
|||
}, |
|||
"type": "org.thingsboard.rule.engine.telemetry.TbMsgAttributesNode", |
|||
"name": "Save Client Attributes", |
|||
"debugMode": false, |
|||
"configuration": { |
|||
"scope": "CLIENT_SCOPE" |
|||
} |
|||
}, |
|||
{ |
|||
"additionalInfo": { |
|||
"layoutX": 347, |
|||
"layoutY": 149 |
|||
}, |
|||
"type": "org.thingsboard.rule.engine.filter.TbMsgTypeSwitchNode", |
|||
"name": "Message Type Switch", |
|||
"debugMode": false, |
|||
"configuration": { |
|||
"version": 0 |
|||
} |
|||
}, |
|||
{ |
|||
"additionalInfo": { |
|||
"layoutX": 825, |
|||
"layoutY": 266 |
|||
}, |
|||
"type": "org.thingsboard.rule.engine.action.TbLogNode", |
|||
"name": "Log RPC from Device", |
|||
"debugMode": false, |
|||
"configuration": { |
|||
"jsScript": "return '\\nIncoming message:\\n' + JSON.stringify(msg) + '\\nIncoming metadata:\\n' + JSON.stringify(metadata);" |
|||
} |
|||
}, |
|||
{ |
|||
"additionalInfo": { |
|||
"layoutX": 824, |
|||
"layoutY": 378 |
|||
}, |
|||
"type": "org.thingsboard.rule.engine.action.TbLogNode", |
|||
"name": "Log Other", |
|||
"debugMode": false, |
|||
"configuration": { |
|||
"jsScript": "return '\\nIncoming message:\\n' + JSON.stringify(msg) + '\\nIncoming metadata:\\n' + JSON.stringify(metadata);" |
|||
} |
|||
}, |
|||
{ |
|||
"additionalInfo": { |
|||
"layoutX": 824, |
|||
"layoutY": 466 |
|||
}, |
|||
"type": "org.thingsboard.rule.engine.rpc.TbSendRPCRequestNode", |
|||
"name": "RPC Call Request", |
|||
"debugMode": false, |
|||
"configuration": { |
|||
"timeoutInSeconds": 60 |
|||
} |
|||
}, |
|||
{ |
|||
"additionalInfo": { |
|||
"layoutX": 1134, |
|||
"layoutY": 132 |
|||
}, |
|||
"type": "org.thingsboard.rule.engine.edge.TbMsgPushToCloudNode", |
|||
"name": "Push to cloud", |
|||
"debugMode": false, |
|||
"configuration": { |
|||
"version": 0 |
|||
} |
|||
} |
|||
], |
|||
"connections": [ |
|||
{ |
|||
"fromIndex": 0, |
|||
"toIndex": 6, |
|||
"type": "Success" |
|||
}, |
|||
{ |
|||
"fromIndex": 1, |
|||
"toIndex": 6, |
|||
"type": "Success" |
|||
}, |
|||
{ |
|||
"fromIndex": 2, |
|||
"toIndex": 4, |
|||
"type": "Other" |
|||
}, |
|||
{ |
|||
"fromIndex": 2, |
|||
"toIndex": 1, |
|||
"type": "Post attributes" |
|||
}, |
|||
{ |
|||
"fromIndex": 2, |
|||
"toIndex": 0, |
|||
"type": "Post telemetry" |
|||
}, |
|||
{ |
|||
"fromIndex": 2, |
|||
"toIndex": 3, |
|||
"type": "RPC Request from Device" |
|||
}, |
|||
{ |
|||
"fromIndex": 2, |
|||
"toIndex": 5, |
|||
"type": "RPC Request to Device" |
|||
}, |
|||
{ |
|||
"fromIndex": 3, |
|||
"toIndex": 6, |
|||
"type": "Success" |
|||
} |
|||
], |
|||
"ruleChainConnections": null |
|||
} |
|||
} |
|||
@ -1,63 +0,0 @@ |
|||
-- |
|||
-- Copyright © 2016-2020 The Thingsboard Authors |
|||
-- |
|||
-- Licensed under the Apache License, Version 2.0 (the "License"); |
|||
-- you may not use this file except in compliance with the License. |
|||
-- You may obtain a copy of the License at |
|||
-- |
|||
-- http://www.apache.org/licenses/LICENSE-2.0 |
|||
-- |
|||
-- Unless required by applicable law or agreed to in writing, software |
|||
-- distributed under the License is distributed on an "AS IS" BASIS, |
|||
-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
-- See the License for the specific language governing permissions and |
|||
-- limitations under the License. |
|||
-- |
|||
|
|||
CREATE TABLE IF NOT EXISTS thingsboard.edge ( |
|||
id timeuuid, |
|||
tenant_id timeuuid, |
|||
customer_id timeuuid, |
|||
name text, |
|||
search_text text, |
|||
configuration text, |
|||
additional_info text, |
|||
PRIMARY KEY (id, tenant_id) |
|||
); |
|||
|
|||
CREATE MATERIALIZED VIEW IF NOT EXISTS thingsboard.edge_by_tenant_and_name AS |
|||
SELECT * |
|||
from thingsboard.edge |
|||
WHERE tenant_id IS NOT NULL AND customer_id IS NOT NULL AND type IS NOT NULL AND name IS NOT NULL AND id IS NOT NULL |
|||
PRIMARY KEY ( tenant_id, name, id, customer_id, type) |
|||
WITH CLUSTERING ORDER BY ( name ASC, id DESC, customer_id DESC); |
|||
|
|||
CREATE MATERIALIZED VIEW IF NOT EXISTS thingsboard.edge_by_tenant_and_search_text AS |
|||
SELECT * |
|||
from thingsboard.edge |
|||
WHERE tenant_id IS NOT NULL AND customer_id IS NOT NULL AND type IS NOT NULL AND search_text IS NOT NULL AND id IS NOT NULL |
|||
PRIMARY KEY ( tenant_id, search_text, id, customer_id, type) |
|||
WITH CLUSTERING ORDER BY ( search_text ASC, id DESC, customer_id DESC); |
|||
|
|||
CREATE MATERIALIZED VIEW IF NOT EXISTS thingsboard.edge_by_tenant_by_type_and_search_text AS |
|||
SELECT * |
|||
from thingsboard.edge |
|||
WHERE tenant_id IS NOT NULL AND customer_id IS NOT NULL AND type IS NOT NULL AND search_text IS NOT NULL AND id IS NOT NULL |
|||
PRIMARY KEY ( tenant_id, type, search_text, id, customer_id) |
|||
WITH CLUSTERING ORDER BY ( type ASC, search_text ASC, id DESC, customer_id DESC); |
|||
|
|||
CREATE MATERIALIZED VIEW IF NOT EXISTS thingsboard.edge_by_customer_and_search_text AS |
|||
SELECT * |
|||
from thingsboard.edge |
|||
WHERE tenant_id IS NOT NULL AND customer_id IS NOT NULL AND type IS NOT NULL AND search_text IS NOT NULL AND id IS NOT NULL |
|||
PRIMARY KEY ( customer_id, tenant_id, search_text, id, type ) |
|||
WITH CLUSTERING ORDER BY ( tenant_id DESC, search_text ASC, id DESC ); |
|||
|
|||
CREATE MATERIALIZED VIEW IF NOT EXISTS thingsboard.edge_by_customer_by_type_and_search_text AS |
|||
SELECT * |
|||
from thingsboard.edge |
|||
WHERE tenant_id IS NOT NULL AND customer_id IS NOT NULL AND type IS NOT NULL AND search_text IS NOT NULL AND id IS NOT NULL |
|||
PRIMARY KEY ( customer_id, tenant_id, type, search_text, id ) |
|||
WITH CLUSTERING ORDER BY ( tenant_id DESC, type ASC, search_text ASC, id DESC ); |
|||
|
|||
-- VOBA ADD changes for the MATERIALIZED view for DEVICE ASSET ENTITY_VIEW RULE_CHAIN |
|||
@ -0,0 +1,63 @@ |
|||
-- |
|||
-- Copyright © 2016-2020 The Thingsboard Authors |
|||
-- |
|||
-- Licensed under the Apache License, Version 2.0 (the "License"); |
|||
-- you may not use this file except in compliance with the License. |
|||
-- You may obtain a copy of the License at |
|||
-- |
|||
-- http://www.apache.org/licenses/LICENSE-2.0 |
|||
-- |
|||
-- Unless required by applicable law or agreed to in writing, software |
|||
-- distributed under the License is distributed on an "AS IS" BASIS, |
|||
-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
-- See the License for the specific language governing permissions and |
|||
-- limitations under the License. |
|||
-- |
|||
|
|||
CREATE TABLE IF NOT EXISTS thingsboard.edge ( |
|||
id timeuuid, |
|||
tenant_id timeuuid, |
|||
customer_id timeuuid, |
|||
name text, |
|||
search_text text, |
|||
configuration text, |
|||
additional_info text, |
|||
PRIMARY KEY (id, tenant_id) |
|||
); |
|||
|
|||
CREATE MATERIALIZED VIEW IF NOT EXISTS thingsboard.edge_by_tenant_and_name AS |
|||
SELECT * |
|||
from thingsboard.edge |
|||
WHERE tenant_id IS NOT NULL AND customer_id IS NOT NULL AND type IS NOT NULL AND name IS NOT NULL AND id IS NOT NULL |
|||
PRIMARY KEY ( tenant_id, name, id, customer_id, type) |
|||
WITH CLUSTERING ORDER BY ( name ASC, id DESC, customer_id DESC); |
|||
|
|||
CREATE MATERIALIZED VIEW IF NOT EXISTS thingsboard.edge_by_tenant_and_search_text AS |
|||
SELECT * |
|||
from thingsboard.edge |
|||
WHERE tenant_id IS NOT NULL AND customer_id IS NOT NULL AND type IS NOT NULL AND search_text IS NOT NULL AND id IS NOT NULL |
|||
PRIMARY KEY ( tenant_id, search_text, id, customer_id, type) |
|||
WITH CLUSTERING ORDER BY ( search_text ASC, id DESC, customer_id DESC); |
|||
|
|||
CREATE MATERIALIZED VIEW IF NOT EXISTS thingsboard.edge_by_tenant_by_type_and_search_text AS |
|||
SELECT * |
|||
from thingsboard.edge |
|||
WHERE tenant_id IS NOT NULL AND customer_id IS NOT NULL AND type IS NOT NULL AND search_text IS NOT NULL AND id IS NOT NULL |
|||
PRIMARY KEY ( tenant_id, type, search_text, id, customer_id) |
|||
WITH CLUSTERING ORDER BY ( type ASC, search_text ASC, id DESC, customer_id DESC); |
|||
|
|||
CREATE MATERIALIZED VIEW IF NOT EXISTS thingsboard.edge_by_customer_and_search_text AS |
|||
SELECT * |
|||
from thingsboard.edge |
|||
WHERE tenant_id IS NOT NULL AND customer_id IS NOT NULL AND type IS NOT NULL AND search_text IS NOT NULL AND id IS NOT NULL |
|||
PRIMARY KEY ( customer_id, tenant_id, search_text, id, type ) |
|||
WITH CLUSTERING ORDER BY ( tenant_id DESC, search_text ASC, id DESC ); |
|||
|
|||
CREATE MATERIALIZED VIEW IF NOT EXISTS thingsboard.edge_by_customer_by_type_and_search_text AS |
|||
SELECT * |
|||
from thingsboard.edge |
|||
WHERE tenant_id IS NOT NULL AND customer_id IS NOT NULL AND type IS NOT NULL AND search_text IS NOT NULL AND id IS NOT NULL |
|||
PRIMARY KEY ( customer_id, tenant_id, type, search_text, id ) |
|||
WITH CLUSTERING ORDER BY ( tenant_id DESC, type ASC, search_text ASC, id DESC ); |
|||
|
|||
-- VOBA ADD changes for the MATERIALIZED view for DEVICE ASSET ENTITY_VIEW RULE_CHAIN |
|||
@ -0,0 +1,322 @@ |
|||
/** |
|||
* Copyright © 2016-2020 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
package org.thingsboard.server.service.edge; |
|||
|
|||
import com.fasterxml.jackson.databind.JsonNode; |
|||
import com.fasterxml.jackson.databind.ObjectMapper; |
|||
import com.google.common.util.concurrent.Futures; |
|||
import com.google.common.util.concurrent.ListenableFuture; |
|||
import lombok.extern.slf4j.Slf4j; |
|||
import org.springframework.beans.factory.annotation.Autowired; |
|||
import org.springframework.stereotype.Service; |
|||
import org.thingsboard.server.common.data.EntityType; |
|||
import org.thingsboard.server.common.data.alarm.Alarm; |
|||
import org.thingsboard.server.common.data.audit.ActionType; |
|||
import org.thingsboard.server.common.data.edge.Edge; |
|||
import org.thingsboard.server.common.data.edge.EdgeEvent; |
|||
import org.thingsboard.server.common.data.edge.EdgeEventType; |
|||
import org.thingsboard.server.common.data.id.AlarmId; |
|||
import org.thingsboard.server.common.data.id.DashboardId; |
|||
import org.thingsboard.server.common.data.id.EdgeId; |
|||
import org.thingsboard.server.common.data.id.EntityId; |
|||
import org.thingsboard.server.common.data.id.EntityIdFactory; |
|||
import org.thingsboard.server.common.data.id.IdBased; |
|||
import org.thingsboard.server.common.data.id.RuleChainId; |
|||
import org.thingsboard.server.common.data.id.TenantId; |
|||
import org.thingsboard.server.common.data.page.PageData; |
|||
import org.thingsboard.server.common.data.page.PageLink; |
|||
import org.thingsboard.server.common.data.page.TimePageLink; |
|||
import org.thingsboard.server.common.data.relation.EntityRelation; |
|||
import org.thingsboard.server.common.data.relation.RelationTypeGroup; |
|||
import org.thingsboard.server.common.data.rule.RuleChainMetaData; |
|||
import org.thingsboard.server.common.msg.queue.TbCallback; |
|||
import org.thingsboard.server.dao.alarm.AlarmService; |
|||
import org.thingsboard.server.dao.edge.EdgeEventService; |
|||
import org.thingsboard.server.dao.edge.EdgeService; |
|||
import org.thingsboard.server.dao.relation.RelationService; |
|||
import org.thingsboard.server.dao.rule.RuleChainService; |
|||
import org.thingsboard.server.gen.transport.TransportProtos; |
|||
import org.thingsboard.server.queue.util.TbCoreComponent; |
|||
import org.thingsboard.server.service.executors.DbCallbackExecutorService; |
|||
|
|||
import javax.annotation.PostConstruct; |
|||
import javax.annotation.PreDestroy; |
|||
import java.io.IOException; |
|||
import java.util.ArrayList; |
|||
import java.util.Collections; |
|||
import java.util.HashSet; |
|||
import java.util.List; |
|||
import java.util.Set; |
|||
import java.util.UUID; |
|||
import java.util.concurrent.ExecutorService; |
|||
import java.util.concurrent.Executors; |
|||
import java.util.stream.Collectors; |
|||
|
|||
@Service |
|||
@TbCoreComponent |
|||
@Slf4j |
|||
public class DefaultEdgeNotificationService implements EdgeNotificationService { |
|||
|
|||
private static final ObjectMapper mapper = new ObjectMapper(); |
|||
|
|||
@Autowired |
|||
private EdgeService edgeService; |
|||
|
|||
@Autowired |
|||
private RuleChainService ruleChainService; |
|||
|
|||
@Autowired |
|||
private AlarmService alarmService; |
|||
|
|||
@Autowired |
|||
private RelationService relationService; |
|||
|
|||
@Autowired |
|||
private EdgeEventService edgeEventService; |
|||
|
|||
@Autowired |
|||
private DbCallbackExecutorService dbCallbackExecutorService; |
|||
|
|||
private ExecutorService tsCallBackExecutor; |
|||
|
|||
@PostConstruct |
|||
public void initExecutor() { |
|||
tsCallBackExecutor = Executors.newSingleThreadExecutor(); |
|||
} |
|||
|
|||
@PreDestroy |
|||
public void shutdownExecutor() { |
|||
if (tsCallBackExecutor != null) { |
|||
tsCallBackExecutor.shutdownNow(); |
|||
} |
|||
} |
|||
|
|||
@Override |
|||
public PageData<EdgeEvent> findEdgeEvents(TenantId tenantId, EdgeId edgeId, TimePageLink pageLink) { |
|||
return edgeEventService.findEdgeEvents(tenantId, edgeId, pageLink); |
|||
} |
|||
|
|||
@Override |
|||
public Edge setEdgeRootRuleChain(TenantId tenantId, Edge edge, RuleChainId ruleChainId) throws IOException { |
|||
edge.setRootRuleChainId(ruleChainId); |
|||
Edge savedEdge = edgeService.saveEdge(edge); |
|||
saveEdgeEvent(tenantId, edge.getId(), EdgeEventType.RULE_CHAIN, ActionType.UPDATED, ruleChainId, null); |
|||
return savedEdge; |
|||
} |
|||
|
|||
private void saveEdgeEvent(TenantId tenantId, |
|||
EdgeId edgeId, |
|||
EdgeEventType edgeEventType, |
|||
ActionType edgeEventAction, |
|||
EntityId entityId, |
|||
JsonNode entityBody) { |
|||
log.debug("Pushing edge event to edge queue. tenantId [{}], edgeId [{}], edgeEventType [{}], edgeEventAction[{}], entityId [{}], entityBody [{}]", |
|||
tenantId, edgeId, edgeEventType, edgeEventAction, entityId, entityBody); |
|||
|
|||
EdgeEvent edgeEvent = new EdgeEvent(); |
|||
edgeEvent.setEdgeId(edgeId); |
|||
edgeEvent.setTenantId(tenantId); |
|||
edgeEvent.setEdgeEventType(edgeEventType); |
|||
edgeEvent.setEdgeEventAction(edgeEventAction.name()); |
|||
if (entityId != null) { |
|||
edgeEvent.setEntityId(entityId.getId()); |
|||
} |
|||
edgeEvent.setEntityBody(entityBody); |
|||
edgeEventService.saveAsync(edgeEvent); |
|||
} |
|||
|
|||
@Override |
|||
public void pushNotificationToEdge(TransportProtos.EdgeNotificationMsgProto edgeNotificationMsg, TbCallback callback) { |
|||
try { |
|||
TenantId tenantId = new TenantId(new UUID(edgeNotificationMsg.getTenantIdMSB(), edgeNotificationMsg.getTenantIdLSB())); |
|||
EdgeEventType edgeEventType = EdgeEventType.valueOf(edgeNotificationMsg.getEdgeEventType()); |
|||
switch (edgeEventType) { |
|||
// TODO: voba - handle edge updates
|
|||
// case EDGE:
|
|||
case ASSET: |
|||
case DEVICE: |
|||
case ENTITY_VIEW: |
|||
case DASHBOARD: |
|||
case RULE_CHAIN: |
|||
processEntities(tenantId, edgeNotificationMsg); |
|||
break; |
|||
case ALARM: |
|||
processAlarm(tenantId, edgeNotificationMsg); |
|||
break; |
|||
case RELATION: |
|||
processRelation(tenantId, edgeNotificationMsg); |
|||
break; |
|||
default: |
|||
log.debug("Edge event type [{}] is not designed to be pushed to edge", edgeEventType); |
|||
} |
|||
} catch (Exception e) { |
|||
callback.onFailure(e); |
|||
log.error("Can't push to edge updates, edgeNotificationMsg [{}]", edgeNotificationMsg, e); |
|||
} finally { |
|||
callback.onSuccess(); |
|||
} |
|||
} |
|||
|
|||
private void processEntities(TenantId tenantId, TransportProtos.EdgeNotificationMsgProto edgeNotificationMsg) { |
|||
ActionType edgeEventActionType = ActionType.valueOf(edgeNotificationMsg.getEdgeEventAction()); |
|||
EdgeEventType edgeEventType = EdgeEventType.valueOf(edgeNotificationMsg.getEdgeEventType()); |
|||
EntityId entityId = EntityIdFactory.getByEdgeEventTypeAndUuid(edgeEventType, new UUID(edgeNotificationMsg.getEntityIdMSB(), edgeNotificationMsg.getEntityIdLSB())); |
|||
switch (edgeEventActionType) { |
|||
// TODO: voba - ADDED is not required for CE version ?
|
|||
// case ADDED:
|
|||
case UPDATED: |
|||
ListenableFuture<List<EdgeId>> edgeIdsFuture = findRelatedEdgeIdsByEntityId(tenantId, entityId); |
|||
Futures.transform(edgeIdsFuture, edgeIds -> { |
|||
if (edgeIds != null && !edgeIds.isEmpty()) { |
|||
for (EdgeId edgeId : edgeIds) { |
|||
try { |
|||
saveEdgeEvent(tenantId, edgeId, edgeEventType, edgeEventActionType, entityId, null); |
|||
if (edgeEventType.equals(EdgeEventType.RULE_CHAIN)) { |
|||
RuleChainMetaData ruleChainMetaData = ruleChainService.loadRuleChainMetaData(tenantId, new RuleChainId(entityId.getId())); |
|||
saveEdgeEvent(tenantId, edgeId, EdgeEventType.RULE_CHAIN_METADATA, edgeEventActionType, ruleChainMetaData.getRuleChainId(), null); |
|||
} |
|||
} catch (Exception e) { |
|||
log.error("[{}] Failed to push event to edge, edgeId [{}], edgeEventType [{}], edgeEventActionType [{}], entityId [{}]", |
|||
tenantId, edgeId, edgeEventType, edgeEventActionType, entityId, e); |
|||
} |
|||
} |
|||
} |
|||
return null; |
|||
}, dbCallbackExecutorService); |
|||
break; |
|||
case DELETED: |
|||
PageData<Edge> edgesByTenantId = edgeService.findEdgesByTenantId(tenantId, new PageLink(Integer.MAX_VALUE)); |
|||
if (edgesByTenantId != null && edgesByTenantId.getData() != null && !edgesByTenantId.getData().isEmpty()) { |
|||
for (Edge edge : edgesByTenantId.getData()) { |
|||
saveEdgeEvent(tenantId, edge.getId(), edgeEventType, edgeEventActionType, entityId, null); |
|||
} |
|||
} |
|||
break; |
|||
case ASSIGNED_TO_EDGE: |
|||
case UNASSIGNED_FROM_EDGE: |
|||
EdgeId edgeId = new EdgeId(new UUID(edgeNotificationMsg.getEdgeIdMSB(), edgeNotificationMsg.getEdgeIdLSB())); |
|||
saveEdgeEvent(tenantId, edgeId, edgeEventType, edgeEventActionType, entityId, null); |
|||
break; |
|||
case RELATIONS_DELETED: |
|||
// TODO: voba - add support for relations deleted
|
|||
break; |
|||
} |
|||
} |
|||
|
|||
private void processAlarm(TenantId tenantId, TransportProtos.EdgeNotificationMsgProto edgeNotificationMsg) { |
|||
AlarmId alarmId = new AlarmId(new UUID(edgeNotificationMsg.getEntityIdMSB(), edgeNotificationMsg.getEntityIdLSB())); |
|||
ListenableFuture<Alarm> alarmFuture = alarmService.findAlarmByIdAsync(tenantId, alarmId); |
|||
Futures.transform(alarmFuture, alarm -> { |
|||
if (alarm != null) { |
|||
EdgeEventType edgeEventType = getEdgeQueueTypeByEntityType(alarm.getOriginator().getEntityType()); |
|||
if (edgeEventType != null) { |
|||
ListenableFuture<List<EdgeId>> relatedEdgeIdsByEntityIdFuture = findRelatedEdgeIdsByEntityId(tenantId, alarm.getOriginator()); |
|||
Futures.transform(relatedEdgeIdsByEntityIdFuture, relatedEdgeIdsByEntityId -> { |
|||
if (relatedEdgeIdsByEntityId != null) { |
|||
for (EdgeId edgeId : relatedEdgeIdsByEntityId) { |
|||
saveEdgeEvent(tenantId, |
|||
edgeId, |
|||
EdgeEventType.ALARM, |
|||
ActionType.valueOf(edgeNotificationMsg.getEdgeEventAction()), |
|||
alarmId, |
|||
null); |
|||
} |
|||
} |
|||
return null; |
|||
}, dbCallbackExecutorService); |
|||
} |
|||
} |
|||
return null; |
|||
}, dbCallbackExecutorService); |
|||
} |
|||
|
|||
private void processRelation(TenantId tenantId, TransportProtos.EdgeNotificationMsgProto edgeNotificationMsg) { |
|||
EntityRelation entityRelation = mapper.convertValue(edgeNotificationMsg.getEntityBody(), EntityRelation.class); |
|||
List<ListenableFuture<List<EdgeId>>> futures = new ArrayList<>(); |
|||
futures.add(findRelatedEdgeIdsByEntityId(tenantId, entityRelation.getTo())); |
|||
futures.add(findRelatedEdgeIdsByEntityId(tenantId, entityRelation.getFrom())); |
|||
ListenableFuture<List<List<EdgeId>>> combinedFuture = Futures.allAsList(futures); |
|||
Futures.transform(combinedFuture, listOfListsEdgeIds -> { |
|||
Set<EdgeId> uniqueEdgeIds = new HashSet<>(); |
|||
if (listOfListsEdgeIds != null && !listOfListsEdgeIds.isEmpty()) { |
|||
for (List<EdgeId> listOfListsEdgeId : listOfListsEdgeIds) { |
|||
if (listOfListsEdgeId != null) { |
|||
uniqueEdgeIds.addAll(listOfListsEdgeId); |
|||
} |
|||
} |
|||
} |
|||
if (!uniqueEdgeIds.isEmpty()) { |
|||
for (EdgeId edgeId : uniqueEdgeIds) { |
|||
saveEdgeEvent(tenantId, |
|||
edgeId, |
|||
EdgeEventType.RELATION, |
|||
ActionType.valueOf(edgeNotificationMsg.getEdgeEventAction()), |
|||
null, |
|||
mapper.valueToTree(entityRelation)); |
|||
} |
|||
} |
|||
return null; |
|||
}, dbCallbackExecutorService); |
|||
} |
|||
|
|||
private ListenableFuture<List<EdgeId>> findRelatedEdgeIdsByEntityId(TenantId tenantId, EntityId entityId) { |
|||
switch (entityId.getEntityType()) { |
|||
case DEVICE: |
|||
case ASSET: |
|||
case ENTITY_VIEW: |
|||
ListenableFuture<List<EntityRelation>> originatorEdgeRelationsFuture = relationService.findByToAndTypeAsync(tenantId, entityId, EntityRelation.CONTAINS_TYPE, RelationTypeGroup.EDGE); |
|||
return Futures.transform(originatorEdgeRelationsFuture, originatorEdgeRelations -> { |
|||
if (originatorEdgeRelations != null && originatorEdgeRelations.size() > 0) { |
|||
return Collections.singletonList(new EdgeId(originatorEdgeRelations.get(0).getFrom().getId())); |
|||
} else { |
|||
return Collections.emptyList(); |
|||
} |
|||
}, dbCallbackExecutorService); |
|||
case DASHBOARD: |
|||
return convertToEdgeIds(edgeService.findEdgesByTenantIdAndDashboardId(tenantId, new DashboardId(entityId.getId()))); |
|||
case RULE_CHAIN: |
|||
return convertToEdgeIds(edgeService.findEdgesByTenantIdAndRuleChainId(tenantId, new RuleChainId(entityId.getId()))); |
|||
default: |
|||
return Futures.immediateFuture(Collections.emptyList()); |
|||
} |
|||
} |
|||
|
|||
private ListenableFuture<List<EdgeId>> convertToEdgeIds(ListenableFuture<List<Edge>> future) { |
|||
return Futures.transform(future, edges -> { |
|||
if (edges != null && !edges.isEmpty()) { |
|||
return edges.stream().map(IdBased::getId).collect(Collectors.toList()); |
|||
} else { |
|||
return Collections.emptyList(); |
|||
} |
|||
}, dbCallbackExecutorService); |
|||
} |
|||
|
|||
private EdgeEventType getEdgeQueueTypeByEntityType(EntityType entityType) { |
|||
switch (entityType) { |
|||
case DEVICE: |
|||
return EdgeEventType.DEVICE; |
|||
case ASSET: |
|||
return EdgeEventType.ASSET; |
|||
case ENTITY_VIEW: |
|||
return EdgeEventType.ENTITY_VIEW; |
|||
default: |
|||
log.info("Unsupported entity type: [{}]", entityType); |
|||
return null; |
|||
} |
|||
} |
|||
} |
|||
|
|||
|
|||
@ -0,0 +1,37 @@ |
|||
/** |
|||
* Copyright © 2016-2020 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
package org.thingsboard.server.service.edge; |
|||
|
|||
import org.thingsboard.server.common.data.edge.Edge; |
|||
import org.thingsboard.server.common.data.edge.EdgeEvent; |
|||
import org.thingsboard.server.common.data.id.EdgeId; |
|||
import org.thingsboard.server.common.data.id.RuleChainId; |
|||
import org.thingsboard.server.common.data.id.TenantId; |
|||
import org.thingsboard.server.common.data.page.PageData; |
|||
import org.thingsboard.server.common.data.page.TimePageLink; |
|||
import org.thingsboard.server.common.msg.queue.TbCallback; |
|||
import org.thingsboard.server.gen.transport.TransportProtos; |
|||
|
|||
import java.io.IOException; |
|||
|
|||
public interface EdgeNotificationService { |
|||
|
|||
PageData<EdgeEvent> findEdgeEvents(TenantId tenantId, EdgeId edgeId, TimePageLink pageLink); |
|||
|
|||
Edge setEdgeRootRuleChain(TenantId tenantId, Edge edge, RuleChainId ruleChainId) throws IOException; |
|||
|
|||
void pushNotificationToEdge(TransportProtos.EdgeNotificationMsgProto edgeNotificationMsg, TbCallback callback); |
|||
} |
|||
File diff suppressed because it is too large
@ -0,0 +1,56 @@ |
|||
/** |
|||
* Copyright © 2016-2020 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
package org.thingsboard.server.service.edge.rpc.constructor; |
|||
|
|||
import com.google.gson.JsonElement; |
|||
import lombok.extern.slf4j.Slf4j; |
|||
import org.springframework.stereotype.Component; |
|||
import org.thingsboard.server.common.data.audit.ActionType; |
|||
import org.thingsboard.server.common.data.id.EntityId; |
|||
import org.thingsboard.server.common.transport.adaptor.JsonConverter; |
|||
import org.thingsboard.server.gen.edge.EntityDataProto; |
|||
|
|||
@Component |
|||
@Slf4j |
|||
public class EntityDataMsgConstructor { |
|||
|
|||
public EntityDataProto constructEntityDataMsg(EntityId entityId, ActionType actionType, JsonElement entityData) { |
|||
EntityDataProto.Builder builder = EntityDataProto.newBuilder() |
|||
.setEntityIdMSB(entityId.getId().getMostSignificantBits()) |
|||
.setEntityIdLSB(entityId.getId().getLeastSignificantBits()) |
|||
.setEntityType(entityId.getEntityType().name()); |
|||
switch (actionType) { |
|||
case TIMESERIES_UPDATED: |
|||
try { |
|||
builder.setPostTelemetryMsg(JsonConverter.convertToTelemetryProto(entityData)); |
|||
} catch (Exception e) { |
|||
log.warn("Can't convert to telemetry proto, entityData [{}]", entityData, e); |
|||
} |
|||
break; |
|||
case ATTRIBUTES_UPDATED: |
|||
try { |
|||
builder.setPostAttributesMsg(JsonConverter.convertToAttributesProto(entityData)); |
|||
} catch (Exception e) { |
|||
log.warn("Can't convert to attributes proto, entityData [{}]", entityData, e); |
|||
} |
|||
break; |
|||
// TODO: voba - add support for attribute delete
|
|||
// case ATTRIBUTES_DELETED:
|
|||
} |
|||
return builder.build(); |
|||
} |
|||
|
|||
} |
|||
@ -0,0 +1,45 @@ |
|||
/** |
|||
* Copyright © 2016-2020 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
package org.thingsboard.server.service.edge.rpc.constructor; |
|||
|
|||
import lombok.extern.slf4j.Slf4j; |
|||
import org.springframework.stereotype.Component; |
|||
import org.thingsboard.server.common.data.relation.EntityRelation; |
|||
import org.thingsboard.server.dao.util.mapping.JacksonUtil; |
|||
import org.thingsboard.server.gen.edge.RelationUpdateMsg; |
|||
import org.thingsboard.server.gen.edge.UpdateMsgType; |
|||
|
|||
@Component |
|||
@Slf4j |
|||
public class RelationUpdateMsgConstructor { |
|||
|
|||
public RelationUpdateMsg constructRelationUpdatedMsg(UpdateMsgType msgType, EntityRelation entityRelation) { |
|||
RelationUpdateMsg.Builder builder = RelationUpdateMsg.newBuilder() |
|||
.setMsgType(msgType) |
|||
.setFromIdMSB(entityRelation.getFrom().getId().getMostSignificantBits()) |
|||
.setFromIdLSB(entityRelation.getFrom().getId().getLeastSignificantBits()) |
|||
.setFromEntityType(entityRelation.getFrom().getEntityType().name()) |
|||
.setToIdMSB(entityRelation.getTo().getId().getMostSignificantBits()) |
|||
.setToIdLSB(entityRelation.getTo().getId().getLeastSignificantBits()) |
|||
.setToEntityType(entityRelation.getTo().getEntityType().name()) |
|||
.setType(entityRelation.getType()) |
|||
.setAdditionalInfo(JacksonUtil.toString(entityRelation.getAdditionalInfo())); |
|||
if (entityRelation.getTypeGroup() != null) { |
|||
builder.setTypeGroup(entityRelation.getTypeGroup().name()); |
|||
} |
|||
return builder.build(); |
|||
} |
|||
} |
|||
@ -0,0 +1,72 @@ |
|||
/** |
|||
* Copyright © 2016-2020 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
package org.thingsboard.server.service.edge.rpc.constructor; |
|||
|
|||
import lombok.extern.slf4j.Slf4j; |
|||
import org.springframework.beans.factory.annotation.Autowired; |
|||
import org.springframework.stereotype.Component; |
|||
import org.thingsboard.server.common.data.User; |
|||
import org.thingsboard.server.common.data.id.UserId; |
|||
import org.thingsboard.server.common.data.security.UserCredentials; |
|||
import org.thingsboard.server.dao.user.UserService; |
|||
import org.thingsboard.server.dao.util.mapping.JacksonUtil; |
|||
import org.thingsboard.server.gen.edge.UpdateMsgType; |
|||
import org.thingsboard.server.gen.edge.UserUpdateMsg; |
|||
|
|||
@Component |
|||
@Slf4j |
|||
public class UserUpdateMsgConstructor { |
|||
|
|||
@Autowired |
|||
private UserService userService; |
|||
|
|||
public UserUpdateMsg constructUserUpdatedMsg(UpdateMsgType msgType, User user) { |
|||
UserUpdateMsg.Builder builder = UserUpdateMsg.newBuilder() |
|||
.setMsgType(msgType) |
|||
.setIdMSB(user.getId().getId().getMostSignificantBits()) |
|||
.setIdLSB(user.getId().getId().getLeastSignificantBits()) |
|||
.setEmail(user.getEmail()) |
|||
.setAuthority(user.getAuthority().name()) |
|||
.setEnabled(false); |
|||
if (user.getFirstName() != null) { |
|||
builder.setFirstName(user.getFirstName()); |
|||
} |
|||
if (user.getLastName() != null) { |
|||
builder.setLastName(user.getLastName()); |
|||
} |
|||
if (user.getAdditionalInfo() != null) { |
|||
builder.setAdditionalInfo(JacksonUtil.toString(user.getAdditionalInfo())); |
|||
} |
|||
if (user.getAdditionalInfo() != null) { |
|||
builder.setAdditionalInfo(JacksonUtil.toString(user.getAdditionalInfo())); |
|||
} |
|||
if (msgType.equals(UpdateMsgType.ENTITY_CREATED_RPC_MESSAGE) || |
|||
msgType.equals(UpdateMsgType.ENTITY_UPDATED_RPC_MESSAGE)) { |
|||
UserCredentials userCredentials = userService.findUserCredentialsByUserId(user.getTenantId(), user.getId()); |
|||
if (userCredentials != null) { |
|||
builder.setEnabled(userCredentials.isEnabled()).setPassword(userCredentials.getPassword()); |
|||
} |
|||
} |
|||
return builder.build(); |
|||
} |
|||
|
|||
public UserUpdateMsg constructUserDeleteMsg(UserId userId) { |
|||
return UserUpdateMsg.newBuilder() |
|||
.setMsgType(UpdateMsgType.ENTITY_DELETED_RPC_MESSAGE) |
|||
.setIdMSB(userId.getId().getMostSignificantBits()) |
|||
.setIdLSB(userId.getId().getLeastSignificantBits()).build(); |
|||
} |
|||
} |
|||
@ -1,271 +0,0 @@ |
|||
/** |
|||
* Copyright © 2016-2020 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
package org.thingsboard.server.service.edge.rpc.init; |
|||
|
|||
import io.grpc.stub.StreamObserver; |
|||
import lombok.extern.slf4j.Slf4j; |
|||
import org.springframework.beans.factory.annotation.Autowired; |
|||
import org.springframework.stereotype.Service; |
|||
import org.thingsboard.server.common.data.Dashboard; |
|||
import org.thingsboard.server.common.data.DashboardInfo; |
|||
import org.thingsboard.server.common.data.Device; |
|||
import org.thingsboard.server.common.data.EntityView; |
|||
import org.thingsboard.server.common.data.asset.Asset; |
|||
import org.thingsboard.server.common.data.edge.Edge; |
|||
import org.thingsboard.server.common.data.id.RuleChainId; |
|||
import org.thingsboard.server.common.data.page.PageData; |
|||
import org.thingsboard.server.common.data.page.PageLink; |
|||
import org.thingsboard.server.common.data.page.TimePageLink; |
|||
import org.thingsboard.server.common.data.rule.RuleChain; |
|||
import org.thingsboard.server.common.data.rule.RuleChainMetaData; |
|||
import org.thingsboard.server.dao.asset.AssetService; |
|||
import org.thingsboard.server.dao.dashboard.DashboardService; |
|||
import org.thingsboard.server.dao.device.DeviceService; |
|||
import org.thingsboard.server.dao.entityview.EntityViewService; |
|||
import org.thingsboard.server.dao.rule.RuleChainService; |
|||
import org.thingsboard.server.gen.edge.AssetUpdateMsg; |
|||
import org.thingsboard.server.gen.edge.DashboardUpdateMsg; |
|||
import org.thingsboard.server.gen.edge.DeviceUpdateMsg; |
|||
import org.thingsboard.server.gen.edge.EntityUpdateMsg; |
|||
import org.thingsboard.server.gen.edge.EntityViewUpdateMsg; |
|||
import org.thingsboard.server.gen.edge.ResponseMsg; |
|||
import org.thingsboard.server.gen.edge.RuleChainMetadataRequestMsg; |
|||
import org.thingsboard.server.gen.edge.RuleChainMetadataUpdateMsg; |
|||
import org.thingsboard.server.gen.edge.RuleChainUpdateMsg; |
|||
import org.thingsboard.server.gen.edge.UpdateMsgType; |
|||
import org.thingsboard.server.service.edge.rpc.constructor.AssetUpdateMsgConstructor; |
|||
import org.thingsboard.server.service.edge.rpc.constructor.DashboardUpdateMsgConstructor; |
|||
import org.thingsboard.server.service.edge.rpc.constructor.DeviceUpdateMsgConstructor; |
|||
import org.thingsboard.server.service.edge.rpc.constructor.EntityViewUpdateMsgConstructor; |
|||
import org.thingsboard.server.service.edge.rpc.constructor.RuleChainUpdateMsgConstructor; |
|||
|
|||
import java.util.UUID; |
|||
|
|||
@Service |
|||
@Slf4j |
|||
public class DefaultInitEdgeService implements InitEdgeService { |
|||
|
|||
@Autowired |
|||
private RuleChainService ruleChainService; |
|||
|
|||
@Autowired |
|||
private DeviceService deviceService; |
|||
|
|||
@Autowired |
|||
private AssetService assetService; |
|||
|
|||
@Autowired |
|||
private EntityViewService entityViewService; |
|||
|
|||
@Autowired |
|||
private DashboardService dashboardService; |
|||
|
|||
@Autowired |
|||
private RuleChainUpdateMsgConstructor ruleChainUpdateMsgConstructor; |
|||
|
|||
@Autowired |
|||
private DeviceUpdateMsgConstructor deviceUpdateMsgConstructor; |
|||
|
|||
@Autowired |
|||
private AssetUpdateMsgConstructor assetUpdateMsgConstructor; |
|||
|
|||
@Autowired |
|||
private EntityViewUpdateMsgConstructor entityViewUpdateMsgConstructor; |
|||
|
|||
@Autowired |
|||
private DashboardUpdateMsgConstructor dashboardUpdateMsgConstructor; |
|||
|
|||
@Override |
|||
public void init(Edge edge, StreamObserver<ResponseMsg> outputStream) { |
|||
initRuleChains(edge, outputStream); |
|||
initDevices(edge, outputStream); |
|||
initAssets(edge, outputStream); |
|||
initEntityViews(edge, outputStream); |
|||
initDashboards(edge, outputStream); |
|||
} |
|||
|
|||
private void initDevices(Edge edge, StreamObserver<ResponseMsg> outputStream) { |
|||
try { |
|||
PageLink pageLink = new PageLink(100); |
|||
PageData<Device> pageData; |
|||
do { |
|||
pageData = deviceService.findDevicesByTenantIdAndEdgeId(edge.getTenantId(), edge.getId(), pageLink); |
|||
if (!pageData.getData().isEmpty()) { |
|||
log.trace("[{}] [{}] device(s) are going to be pushed to edge.", edge.getId(), pageData.getData().size()); |
|||
for (Device device : pageData.getData()) { |
|||
DeviceUpdateMsg deviceUpdateMsg = |
|||
deviceUpdateMsgConstructor.constructDeviceUpdatedMsg( |
|||
UpdateMsgType.ENTITY_CREATED_RPC_MESSAGE, |
|||
device); |
|||
EntityUpdateMsg entityUpdateMsg = EntityUpdateMsg.newBuilder() |
|||
.setDeviceUpdateMsg(deviceUpdateMsg) |
|||
.build(); |
|||
outputStream.onNext(ResponseMsg.newBuilder() |
|||
.setEntityUpdateMsg(entityUpdateMsg) |
|||
.build()); |
|||
} |
|||
} |
|||
if (pageData.hasNext()) { |
|||
pageLink = pageLink.nextPageLink(); |
|||
} |
|||
} while (pageData.hasNext()); |
|||
} catch (Exception e) { |
|||
log.error("Exception during loading edge device(s) on init!"); |
|||
} |
|||
} |
|||
|
|||
private void initAssets(Edge edge, StreamObserver<ResponseMsg> outputStream) { |
|||
try { |
|||
PageLink pageLink = new PageLink(100); |
|||
PageData<Asset> pageData; |
|||
do { |
|||
pageData = assetService.findAssetsByTenantIdAndEdgeId(edge.getTenantId(), edge.getId(), pageLink); |
|||
if (!pageData.getData().isEmpty()) { |
|||
log.trace("[{}] [{}] asset(s) are going to be pushed to edge.", edge.getId(), pageData.getData().size()); |
|||
for (Asset asset : pageData.getData()) { |
|||
AssetUpdateMsg assetUpdateMsg = |
|||
assetUpdateMsgConstructor.constructAssetUpdatedMsg( |
|||
UpdateMsgType.ENTITY_CREATED_RPC_MESSAGE, |
|||
asset); |
|||
EntityUpdateMsg entityUpdateMsg = EntityUpdateMsg.newBuilder() |
|||
.setAssetUpdateMsg(assetUpdateMsg) |
|||
.build(); |
|||
outputStream.onNext(ResponseMsg.newBuilder() |
|||
.setEntityUpdateMsg(entityUpdateMsg) |
|||
.build()); |
|||
} |
|||
} |
|||
if (pageData.hasNext()) { |
|||
pageLink = pageLink.nextPageLink(); |
|||
} |
|||
} while (pageData.hasNext()); |
|||
} catch (Exception e) { |
|||
log.error("Exception during loading edge asset(s) on init!"); |
|||
} |
|||
} |
|||
|
|||
private void initEntityViews(Edge edge, StreamObserver<ResponseMsg> outputStream) { |
|||
try { |
|||
PageLink pageLink = new PageLink(100); |
|||
PageData<EntityView> pageData; |
|||
do { |
|||
pageData = entityViewService.findEntityViewsByTenantIdAndEdgeId(edge.getTenantId(), edge.getId(), pageLink); |
|||
if (!pageData.getData().isEmpty()) { |
|||
log.trace("[{}] [{}] entity view(s) are going to be pushed to edge.", edge.getId(), pageData.getData().size()); |
|||
for (EntityView entityView : pageData.getData()) { |
|||
EntityViewUpdateMsg entityViewUpdateMsg = |
|||
entityViewUpdateMsgConstructor.constructEntityViewUpdatedMsg( |
|||
UpdateMsgType.ENTITY_CREATED_RPC_MESSAGE, |
|||
entityView); |
|||
EntityUpdateMsg entityUpdateMsg = EntityUpdateMsg.newBuilder() |
|||
.setEntityViewUpdateMsg(entityViewUpdateMsg) |
|||
.build(); |
|||
outputStream.onNext(ResponseMsg.newBuilder() |
|||
.setEntityUpdateMsg(entityUpdateMsg) |
|||
.build()); |
|||
} |
|||
} |
|||
if (pageData.hasNext()) { |
|||
pageLink = pageLink.nextPageLink(); |
|||
} |
|||
} while (pageData.hasNext()); |
|||
} catch (Exception e) { |
|||
log.error("Exception during loading edge entity view(s) on init!"); |
|||
} |
|||
} |
|||
|
|||
private void initDashboards(Edge edge, StreamObserver<ResponseMsg> outputStream) { |
|||
try { |
|||
TimePageLink pageLink = new TimePageLink(100); |
|||
PageData<DashboardInfo> pageData; |
|||
do { |
|||
pageData = dashboardService.findDashboardsByTenantIdAndEdgeId(edge.getTenantId(), edge.getId(), pageLink); |
|||
if (!pageData.getData().isEmpty()) { |
|||
log.trace("[{}] [{}] dashboard(s) are going to be pushed to edge.", edge.getId(), pageData.getData().size()); |
|||
for (DashboardInfo dashboardInfo : pageData.getData()) { |
|||
Dashboard dashboard = dashboardService.findDashboardById(edge.getTenantId(), dashboardInfo.getId()); |
|||
DashboardUpdateMsg dashboardUpdateMsg = |
|||
dashboardUpdateMsgConstructor.constructDashboardUpdatedMsg( |
|||
UpdateMsgType.ENTITY_CREATED_RPC_MESSAGE, |
|||
dashboard); |
|||
EntityUpdateMsg entityUpdateMsg = EntityUpdateMsg.newBuilder() |
|||
.setDashboardUpdateMsg(dashboardUpdateMsg) |
|||
.build(); |
|||
outputStream.onNext(ResponseMsg.newBuilder() |
|||
.setEntityUpdateMsg(entityUpdateMsg) |
|||
.build()); |
|||
} |
|||
} |
|||
if (pageData.hasNext()) { |
|||
pageLink = pageLink.nextPageLink(); |
|||
} |
|||
} while (pageData.hasNext()); |
|||
} catch (Exception e) { |
|||
log.error("Exception during loading edge dashboard(s) on init!"); |
|||
} |
|||
} |
|||
|
|||
private void initRuleChains(Edge edge, StreamObserver<ResponseMsg> outputStream) { |
|||
try { |
|||
TimePageLink pageLink = new TimePageLink(100); |
|||
PageData<RuleChain> pageData; |
|||
do { |
|||
pageData = ruleChainService.findRuleChainsByTenantIdAndEdgeId(edge.getTenantId(), edge.getId(), pageLink); |
|||
if (!pageData.getData().isEmpty()) { |
|||
log.trace("[{}] [{}] rule chains(s) are going to be pushed to edge.", edge.getId(), pageData.getData().size()); |
|||
for (RuleChain ruleChain : pageData.getData()) { |
|||
RuleChainUpdateMsg ruleChainUpdateMsg = |
|||
ruleChainUpdateMsgConstructor.constructRuleChainUpdatedMsg( |
|||
edge, |
|||
UpdateMsgType.ENTITY_UPDATED_RPC_MESSAGE, |
|||
ruleChain); |
|||
EntityUpdateMsg entityUpdateMsg = EntityUpdateMsg.newBuilder() |
|||
.setRuleChainUpdateMsg(ruleChainUpdateMsg) |
|||
.build(); |
|||
outputStream.onNext(ResponseMsg.newBuilder() |
|||
.setEntityUpdateMsg(entityUpdateMsg) |
|||
.build()); |
|||
} |
|||
} |
|||
if (pageData.hasNext()) { |
|||
pageLink = pageLink.nextPageLink(); |
|||
} |
|||
} while (pageData.hasNext()); |
|||
} catch (Exception e) { |
|||
log.error("Exception during loading edge rule chain(s) on init!"); |
|||
} |
|||
} |
|||
|
|||
@Override |
|||
public void initRuleChainMetadata(Edge edge, RuleChainMetadataRequestMsg ruleChainMetadataRequestMsg, StreamObserver<ResponseMsg> outputStream) { |
|||
if (ruleChainMetadataRequestMsg.getRuleChainIdMSB() != 0 && ruleChainMetadataRequestMsg.getRuleChainIdLSB() != 0) { |
|||
RuleChainId ruleChainId = new RuleChainId(new UUID(ruleChainMetadataRequestMsg.getRuleChainIdMSB(), ruleChainMetadataRequestMsg.getRuleChainIdLSB())); |
|||
RuleChainMetaData ruleChainMetaData = ruleChainService.loadRuleChainMetaData(edge.getTenantId(), ruleChainId); |
|||
RuleChainMetadataUpdateMsg ruleChainMetadataUpdateMsg = |
|||
ruleChainUpdateMsgConstructor.constructRuleChainMetadataUpdatedMsg( |
|||
UpdateMsgType.ENTITY_UPDATED_RPC_MESSAGE, |
|||
ruleChainMetaData); |
|||
if (ruleChainMetadataUpdateMsg != null) { |
|||
EntityUpdateMsg entityUpdateMsg = EntityUpdateMsg.newBuilder() |
|||
.setRuleChainMetadataUpdateMsg(ruleChainMetadataUpdateMsg) |
|||
.build(); |
|||
outputStream.onNext(ResponseMsg.newBuilder() |
|||
.setEntityUpdateMsg(entityUpdateMsg) |
|||
.build()); |
|||
} |
|||
} |
|||
} |
|||
} |
|||
@ -0,0 +1,398 @@ |
|||
/** |
|||
* Copyright © 2016-2020 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
package org.thingsboard.server.service.edge.rpc.init; |
|||
|
|||
import com.google.common.util.concurrent.Futures; |
|||
import com.google.common.util.concurrent.ListenableFuture; |
|||
import com.google.common.util.concurrent.MoreExecutors; |
|||
import io.grpc.stub.StreamObserver; |
|||
import lombok.extern.slf4j.Slf4j; |
|||
import org.springframework.beans.factory.annotation.Autowired; |
|||
import org.springframework.stereotype.Service; |
|||
import org.thingsboard.server.common.data.Dashboard; |
|||
import org.thingsboard.server.common.data.DashboardInfo; |
|||
import org.thingsboard.server.common.data.Device; |
|||
import org.thingsboard.server.common.data.EntityView; |
|||
import org.thingsboard.server.common.data.User; |
|||
import org.thingsboard.server.common.data.asset.Asset; |
|||
import org.thingsboard.server.common.data.edge.Edge; |
|||
import org.thingsboard.server.common.data.id.EntityId; |
|||
import org.thingsboard.server.common.data.id.RuleChainId; |
|||
import org.thingsboard.server.common.data.page.PageData; |
|||
import org.thingsboard.server.common.data.page.PageLink; |
|||
import org.thingsboard.server.common.data.page.TimePageLink; |
|||
import org.thingsboard.server.common.data.relation.EntityRelation; |
|||
import org.thingsboard.server.common.data.relation.EntityRelationsQuery; |
|||
import org.thingsboard.server.common.data.relation.EntitySearchDirection; |
|||
import org.thingsboard.server.common.data.relation.RelationsSearchParameters; |
|||
import org.thingsboard.server.common.data.rule.RuleChain; |
|||
import org.thingsboard.server.common.data.rule.RuleChainMetaData; |
|||
import org.thingsboard.server.dao.asset.AssetService; |
|||
import org.thingsboard.server.dao.dashboard.DashboardService; |
|||
import org.thingsboard.server.dao.device.DeviceService; |
|||
import org.thingsboard.server.dao.entityview.EntityViewService; |
|||
import org.thingsboard.server.dao.relation.RelationService; |
|||
import org.thingsboard.server.dao.rule.RuleChainService; |
|||
import org.thingsboard.server.dao.user.UserService; |
|||
import org.thingsboard.server.gen.edge.AssetUpdateMsg; |
|||
import org.thingsboard.server.gen.edge.DashboardUpdateMsg; |
|||
import org.thingsboard.server.gen.edge.DeviceUpdateMsg; |
|||
import org.thingsboard.server.gen.edge.EntityUpdateMsg; |
|||
import org.thingsboard.server.gen.edge.EntityViewUpdateMsg; |
|||
import org.thingsboard.server.gen.edge.RelationUpdateMsg; |
|||
import org.thingsboard.server.gen.edge.ResponseMsg; |
|||
import org.thingsboard.server.gen.edge.RuleChainMetadataRequestMsg; |
|||
import org.thingsboard.server.gen.edge.RuleChainMetadataUpdateMsg; |
|||
import org.thingsboard.server.gen.edge.RuleChainUpdateMsg; |
|||
import org.thingsboard.server.gen.edge.UpdateMsgType; |
|||
import org.thingsboard.server.gen.edge.UserUpdateMsg; |
|||
import org.thingsboard.server.service.edge.EdgeContextComponent; |
|||
import org.thingsboard.server.service.edge.rpc.constructor.AssetUpdateMsgConstructor; |
|||
import org.thingsboard.server.service.edge.rpc.constructor.DashboardUpdateMsgConstructor; |
|||
import org.thingsboard.server.service.edge.rpc.constructor.DeviceUpdateMsgConstructor; |
|||
import org.thingsboard.server.service.edge.rpc.constructor.EntityViewUpdateMsgConstructor; |
|||
import org.thingsboard.server.service.edge.rpc.constructor.RelationUpdateMsgConstructor; |
|||
import org.thingsboard.server.service.edge.rpc.constructor.RuleChainUpdateMsgConstructor; |
|||
import org.thingsboard.server.service.edge.rpc.constructor.UserUpdateMsgConstructor; |
|||
|
|||
import java.util.ArrayList; |
|||
import java.util.HashSet; |
|||
import java.util.List; |
|||
import java.util.Set; |
|||
import java.util.UUID; |
|||
|
|||
@Service |
|||
@Slf4j |
|||
public class DefaultSyncEdgeService implements SyncEdgeService { |
|||
|
|||
@Autowired |
|||
private RuleChainService ruleChainService; |
|||
|
|||
@Autowired |
|||
private RelationService relationService; |
|||
|
|||
@Autowired |
|||
private DeviceService deviceService; |
|||
|
|||
@Autowired |
|||
private AssetService assetService; |
|||
|
|||
@Autowired |
|||
private EntityViewService entityViewService; |
|||
|
|||
@Autowired |
|||
private DashboardService dashboardService; |
|||
|
|||
@Autowired |
|||
private UserService userService; |
|||
|
|||
@Autowired |
|||
private RuleChainUpdateMsgConstructor ruleChainUpdateMsgConstructor; |
|||
|
|||
@Autowired |
|||
private DeviceUpdateMsgConstructor deviceUpdateMsgConstructor; |
|||
|
|||
@Autowired |
|||
private AssetUpdateMsgConstructor assetUpdateMsgConstructor; |
|||
|
|||
@Autowired |
|||
private EntityViewUpdateMsgConstructor entityViewUpdateMsgConstructor; |
|||
|
|||
@Autowired |
|||
private DashboardUpdateMsgConstructor dashboardUpdateMsgConstructor; |
|||
|
|||
@Autowired |
|||
private UserUpdateMsgConstructor userUpdateMsgConstructor; |
|||
|
|||
@Autowired |
|||
private RelationUpdateMsgConstructor relationUpdateMsgConstructor; |
|||
|
|||
@Override |
|||
public void sync(EdgeContextComponent ctx, Edge edge, StreamObserver<ResponseMsg> outputStream) { |
|||
Set<EntityId> pushedEntityIds = new HashSet<>(); |
|||
syncUsers(ctx, edge, pushedEntityIds, outputStream); |
|||
List<ListenableFuture<Void>> futures = new ArrayList<>(); |
|||
futures.add(syncRuleChains(ctx, edge, pushedEntityIds, outputStream)); |
|||
futures.add(syncDevices(ctx, edge, pushedEntityIds, outputStream)); |
|||
futures.add(syncAssets(ctx, edge, pushedEntityIds, outputStream)); |
|||
futures.add(syncEntityViews(ctx, edge, pushedEntityIds, outputStream)); |
|||
futures.add(syncDashboards(ctx, edge, pushedEntityIds, outputStream)); |
|||
ListenableFuture<List<Void>> joinFuture = Futures.allAsList(futures); |
|||
Futures.transform(joinFuture, result -> { |
|||
syncRelations(ctx, edge, pushedEntityIds, outputStream); |
|||
return null; |
|||
}, MoreExecutors.directExecutor()); |
|||
} |
|||
|
|||
private ListenableFuture<Void> syncRuleChains(EdgeContextComponent ctx, Edge edge, Set<EntityId> pushedEntityIds, StreamObserver<ResponseMsg> outputStream) { |
|||
try { |
|||
ListenableFuture<PageData<RuleChain>> future = ruleChainService.findRuleChainsByTenantIdAndEdgeId(edge.getTenantId(), edge.getId(), new TimePageLink(Integer.MAX_VALUE)); |
|||
return Futures.transform(future, pageData -> { |
|||
try { |
|||
if (pageData != null && pageData.getData() != null && !pageData.getData().isEmpty()) { |
|||
log.trace("[{}] [{}] rule chains(s) are going to be pushed to edge.", edge.getId(), pageData.getData().size()); |
|||
for (RuleChain ruleChain : pageData.getData()) { |
|||
RuleChainUpdateMsg ruleChainUpdateMsg = |
|||
ruleChainUpdateMsgConstructor.constructRuleChainUpdatedMsg( |
|||
edge.getRootRuleChainId(), |
|||
UpdateMsgType.ENTITY_UPDATED_RPC_MESSAGE, |
|||
ruleChain); |
|||
EntityUpdateMsg entityUpdateMsg = EntityUpdateMsg.newBuilder() |
|||
.setRuleChainUpdateMsg(ruleChainUpdateMsg) |
|||
.build(); |
|||
outputStream.onNext(ResponseMsg.newBuilder() |
|||
.setEntityUpdateMsg(entityUpdateMsg) |
|||
.build()); |
|||
pushedEntityIds.add(ruleChain.getId()); |
|||
} |
|||
} |
|||
} catch (Exception e) { |
|||
log.error("Exception during loading edge rule chain(s) on sync!", e); |
|||
} |
|||
return null; |
|||
}, ctx.getDbCallbackExecutor()); |
|||
} catch (Exception e) { |
|||
log.error("Exception during loading edge rule chain(s) on sync!", e); |
|||
return Futures.immediateFuture(null); |
|||
} |
|||
} |
|||
|
|||
private ListenableFuture<Void> syncDevices(EdgeContextComponent ctx, Edge edge, Set<EntityId> pushedEntityIds, StreamObserver<ResponseMsg> outputStream) { |
|||
try { |
|||
ListenableFuture<PageData<Device>> future = deviceService.findDevicesByTenantIdAndEdgeId(edge.getTenantId(), edge.getId(), new TimePageLink(Integer.MAX_VALUE)); |
|||
return Futures.transform(future, pageData -> { |
|||
if (pageData != null && pageData.getData() != null && !pageData.getData().isEmpty()) { |
|||
log.trace("[{}] [{}] device(s) are going to be pushed to edge.", edge.getId(), pageData.getData().size()); |
|||
for (Device device : pageData.getData()) { |
|||
DeviceUpdateMsg deviceUpdateMsg = |
|||
deviceUpdateMsgConstructor.constructDeviceUpdatedMsg( |
|||
UpdateMsgType.ENTITY_CREATED_RPC_MESSAGE, |
|||
device); |
|||
EntityUpdateMsg entityUpdateMsg = EntityUpdateMsg.newBuilder() |
|||
.setDeviceUpdateMsg(deviceUpdateMsg) |
|||
.build(); |
|||
outputStream.onNext(ResponseMsg.newBuilder() |
|||
.setEntityUpdateMsg(entityUpdateMsg) |
|||
.build()); |
|||
pushedEntityIds.add(device.getId()); |
|||
} |
|||
} |
|||
return null; |
|||
}, ctx.getDbCallbackExecutor()); |
|||
} catch (Exception e) { |
|||
log.error("Exception during loading edge device(s) on sync!", e); |
|||
return Futures.immediateFuture(null); |
|||
} |
|||
} |
|||
|
|||
private ListenableFuture<Void> syncAssets(EdgeContextComponent ctx, Edge edge, Set<EntityId> pushedEntityIds, StreamObserver<ResponseMsg> outputStream) { |
|||
try { |
|||
ListenableFuture<PageData<Asset>> future = assetService.findAssetsByTenantIdAndEdgeId(edge.getTenantId(), edge.getId(), new TimePageLink(Integer.MAX_VALUE)); |
|||
return Futures.transform(future, pageData -> { |
|||
if (pageData != null && pageData.getData() != null && !pageData.getData().isEmpty()) { |
|||
log.trace("[{}] [{}] asset(s) are going to be pushed to edge.", edge.getId(), pageData.getData().size()); |
|||
for (Asset asset : pageData.getData()) { |
|||
AssetUpdateMsg assetUpdateMsg = |
|||
assetUpdateMsgConstructor.constructAssetUpdatedMsg( |
|||
UpdateMsgType.ENTITY_CREATED_RPC_MESSAGE, |
|||
asset); |
|||
EntityUpdateMsg entityUpdateMsg = EntityUpdateMsg.newBuilder() |
|||
.setAssetUpdateMsg(assetUpdateMsg) |
|||
.build(); |
|||
outputStream.onNext(ResponseMsg.newBuilder() |
|||
.setEntityUpdateMsg(entityUpdateMsg) |
|||
.build()); |
|||
pushedEntityIds.add(asset.getId()); |
|||
} |
|||
} |
|||
return null; |
|||
}, ctx.getDbCallbackExecutor()); |
|||
} catch (Exception e) { |
|||
log.error("Exception during loading edge asset(s) on sync!", e); |
|||
return Futures.immediateFuture(null); |
|||
} |
|||
} |
|||
|
|||
private ListenableFuture<Void> syncEntityViews(EdgeContextComponent ctx, Edge edge, Set<EntityId> pushedEntityIds, StreamObserver<ResponseMsg> outputStream) { |
|||
try { |
|||
ListenableFuture<PageData<EntityView>> future = entityViewService.findEntityViewsByTenantIdAndEdgeId(edge.getTenantId(), edge.getId(), new TimePageLink(Integer.MAX_VALUE)); |
|||
return Futures.transform(future, pageData -> { |
|||
try { |
|||
if (pageData != null && pageData.getData() != null && !pageData.getData().isEmpty()) { |
|||
log.trace("[{}] [{}] entity view(s) are going to be pushed to edge.", edge.getId(), pageData.getData().size()); |
|||
for (EntityView entityView : pageData.getData()) { |
|||
EntityViewUpdateMsg entityViewUpdateMsg = |
|||
entityViewUpdateMsgConstructor.constructEntityViewUpdatedMsg( |
|||
UpdateMsgType.ENTITY_CREATED_RPC_MESSAGE, |
|||
entityView); |
|||
EntityUpdateMsg entityUpdateMsg = EntityUpdateMsg.newBuilder() |
|||
.setEntityViewUpdateMsg(entityViewUpdateMsg) |
|||
.build(); |
|||
outputStream.onNext(ResponseMsg.newBuilder() |
|||
.setEntityUpdateMsg(entityUpdateMsg) |
|||
.build()); |
|||
pushedEntityIds.add(entityView.getId()); |
|||
} |
|||
} |
|||
} catch (Exception e) { |
|||
log.error("Exception during loading edge entity view(s) on sync!", e); |
|||
} |
|||
return null; |
|||
}, ctx.getDbCallbackExecutor()); |
|||
} catch (Exception e) { |
|||
log.error("Exception during loading edge entity view(s) on sync!", e); |
|||
return Futures.immediateFuture(null); |
|||
} |
|||
} |
|||
|
|||
private ListenableFuture<Void> syncDashboards(EdgeContextComponent ctx, Edge edge, Set<EntityId> pushedEntityIds, StreamObserver<ResponseMsg> outputStream) { |
|||
try { |
|||
ListenableFuture<PageData<DashboardInfo>> future = dashboardService.findDashboardsByTenantIdAndEdgeId(edge.getTenantId(), edge.getId(), new TimePageLink(Integer.MAX_VALUE)); |
|||
return Futures.transform(future, pageData -> { |
|||
try { |
|||
if (pageData != null && pageData.getData() != null && !pageData.getData().isEmpty()) { |
|||
log.trace("[{}] [{}] dashboard(s) are going to be pushed to edge.", edge.getId(), pageData.getData().size()); |
|||
for (DashboardInfo dashboardInfo : pageData.getData()) { |
|||
Dashboard dashboard = dashboardService.findDashboardById(edge.getTenantId(), dashboardInfo.getId()); |
|||
DashboardUpdateMsg dashboardUpdateMsg = |
|||
dashboardUpdateMsgConstructor.constructDashboardUpdatedMsg( |
|||
UpdateMsgType.ENTITY_CREATED_RPC_MESSAGE, |
|||
dashboard); |
|||
EntityUpdateMsg entityUpdateMsg = EntityUpdateMsg.newBuilder() |
|||
.setDashboardUpdateMsg(dashboardUpdateMsg) |
|||
.build(); |
|||
outputStream.onNext(ResponseMsg.newBuilder() |
|||
.setEntityUpdateMsg(entityUpdateMsg) |
|||
.build()); |
|||
pushedEntityIds.add(dashboard.getId()); |
|||
} |
|||
} |
|||
} catch (Exception e) { |
|||
log.error("Exception during loading edge dashboard(s) on sync!", e); |
|||
} |
|||
return null; |
|||
}, ctx.getDbCallbackExecutor()); |
|||
} catch (Exception e) { |
|||
log.error("Exception during loading edge dashboard(s) on sync!", e); |
|||
return Futures.immediateFuture(null); |
|||
} |
|||
} |
|||
|
|||
private void syncUsers(EdgeContextComponent ctx, Edge edge, Set<EntityId> pushedEntityIds, StreamObserver<ResponseMsg> outputStream) { |
|||
try { |
|||
PageData<User> pageData = userService.findTenantAdmins(edge.getTenantId(), new PageLink(Integer.MAX_VALUE)); |
|||
pushUsersToEdge(pageData, edge, pushedEntityIds, outputStream); |
|||
if (edge.getCustomerId() != null && !EntityId.NULL_UUID.equals(edge.getCustomerId().getId())) { |
|||
pageData = userService.findCustomerUsers(edge.getTenantId(), edge.getCustomerId(), new PageLink(Integer.MAX_VALUE)); |
|||
pushUsersToEdge(pageData, edge, pushedEntityIds, outputStream); |
|||
} |
|||
} catch (Exception e) { |
|||
log.error("Exception during loading edge user(s) on sync!", e); |
|||
} |
|||
} |
|||
|
|||
private void pushUsersToEdge(PageData<User> pageData, Edge edge, Set<EntityId> pushedEntityIds, StreamObserver<ResponseMsg> outputStream) { |
|||
if (pageData != null && pageData.getData() != null && !pageData.getData().isEmpty()) { |
|||
log.trace("[{}] [{}] user(s) are going to be pushed to edge.", edge.getId(), pageData.getData().size()); |
|||
for (User user : pageData.getData()) { |
|||
UserUpdateMsg userUpdateMsg = |
|||
userUpdateMsgConstructor.constructUserUpdatedMsg( |
|||
UpdateMsgType.ENTITY_CREATED_RPC_MESSAGE, |
|||
user); |
|||
EntityUpdateMsg entityUpdateMsg = EntityUpdateMsg.newBuilder() |
|||
.setUserUpdateMsg(userUpdateMsg) |
|||
.build(); |
|||
outputStream.onNext(ResponseMsg.newBuilder() |
|||
.setEntityUpdateMsg(entityUpdateMsg) |
|||
.build()); |
|||
pushedEntityIds.add(user.getId()); |
|||
} |
|||
} |
|||
} |
|||
|
|||
private ListenableFuture<Void> syncRelations(EdgeContextComponent ctx, Edge edge, Set<EntityId> pushedEntityIds, StreamObserver<ResponseMsg> outputStream) { |
|||
if (!pushedEntityIds.isEmpty()) { |
|||
List<ListenableFuture<List<EntityRelation>>> futures = new ArrayList<>(); |
|||
for (EntityId entityId : pushedEntityIds) { |
|||
futures.add(syncRelations(edge, entityId, EntitySearchDirection.FROM)); |
|||
futures.add(syncRelations(edge, entityId, EntitySearchDirection.TO)); |
|||
} |
|||
ListenableFuture<List<List<EntityRelation>>> relationsListFuture = Futures.allAsList(futures); |
|||
return Futures.transform(relationsListFuture, relationsList -> { |
|||
try { |
|||
Set<EntityRelation> uniqueEntityRelations = new HashSet<>(); |
|||
if (!relationsList.isEmpty()) { |
|||
for (List<EntityRelation> entityRelations : relationsList) { |
|||
if (!entityRelations.isEmpty()) { |
|||
uniqueEntityRelations.addAll(entityRelations); |
|||
} |
|||
} |
|||
} |
|||
if (!uniqueEntityRelations.isEmpty()) { |
|||
log.trace("[{}] [{}] relation(s) are going to be pushed to edge.", edge.getId(), uniqueEntityRelations.size()); |
|||
for (EntityRelation relation : uniqueEntityRelations) { |
|||
try { |
|||
RelationUpdateMsg relationUpdateMsg = |
|||
relationUpdateMsgConstructor.constructRelationUpdatedMsg( |
|||
UpdateMsgType.ENTITY_CREATED_RPC_MESSAGE, |
|||
relation); |
|||
EntityUpdateMsg entityUpdateMsg = EntityUpdateMsg.newBuilder() |
|||
.setRelationUpdateMsg(relationUpdateMsg) |
|||
.build(); |
|||
outputStream.onNext(ResponseMsg.newBuilder() |
|||
.setEntityUpdateMsg(entityUpdateMsg) |
|||
.build()); |
|||
} catch (Exception e) { |
|||
log.error("Exception during loading relation [{}] to edge on sync!", relation, e); |
|||
} |
|||
} |
|||
} |
|||
} catch (Exception e) { |
|||
log.error("Exception during loading relation(s) to edge on sync!", e); |
|||
} |
|||
return null; |
|||
}, ctx.getDbCallbackExecutor()); |
|||
} else { |
|||
return Futures.immediateFuture(null); |
|||
} |
|||
} |
|||
|
|||
private ListenableFuture<List<EntityRelation>> syncRelations(Edge edge, EntityId entityId, EntitySearchDirection direction) { |
|||
EntityRelationsQuery query = new EntityRelationsQuery(); |
|||
query.setParameters(new RelationsSearchParameters(entityId, direction, -1, false)); |
|||
return relationService.findByQuery(edge.getTenantId(), query); |
|||
} |
|||
|
|||
@Override |
|||
public void syncRuleChainMetadata(Edge edge, RuleChainMetadataRequestMsg ruleChainMetadataRequestMsg, StreamObserver<ResponseMsg> outputStream) { |
|||
if (ruleChainMetadataRequestMsg.getRuleChainIdMSB() != 0 && ruleChainMetadataRequestMsg.getRuleChainIdLSB() != 0) { |
|||
RuleChainId ruleChainId = new RuleChainId(new UUID(ruleChainMetadataRequestMsg.getRuleChainIdMSB(), ruleChainMetadataRequestMsg.getRuleChainIdLSB())); |
|||
RuleChainMetaData ruleChainMetaData = ruleChainService.loadRuleChainMetaData(edge.getTenantId(), ruleChainId); |
|||
RuleChainMetadataUpdateMsg ruleChainMetadataUpdateMsg = |
|||
ruleChainUpdateMsgConstructor.constructRuleChainMetadataUpdatedMsg( |
|||
UpdateMsgType.ENTITY_UPDATED_RPC_MESSAGE, |
|||
ruleChainMetaData); |
|||
if (ruleChainMetadataUpdateMsg != null) { |
|||
EntityUpdateMsg entityUpdateMsg = EntityUpdateMsg.newBuilder() |
|||
.setRuleChainMetadataUpdateMsg(ruleChainMetadataUpdateMsg) |
|||
.build(); |
|||
outputStream.onNext(ResponseMsg.newBuilder() |
|||
.setEntityUpdateMsg(entityUpdateMsg) |
|||
.build()); |
|||
} |
|||
} |
|||
} |
|||
} |
|||
@ -0,0 +1,35 @@ |
|||
/** |
|||
* Copyright © 2016-2020 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
package org.thingsboard.server.dao.edge; |
|||
|
|||
import com.google.common.util.concurrent.ListenableFuture; |
|||
import org.thingsboard.server.common.data.EntityType; |
|||
import org.thingsboard.server.common.data.edge.EdgeEvent; |
|||
import org.thingsboard.server.common.data.edge.EdgeEventType; |
|||
import org.thingsboard.server.common.data.id.EdgeId; |
|||
import org.thingsboard.server.common.data.id.TenantId; |
|||
import org.thingsboard.server.common.data.page.PageData; |
|||
import org.thingsboard.server.common.data.page.TimePageLink; |
|||
|
|||
public interface EdgeEventService { |
|||
|
|||
EdgeEventType getEdgeEventTypeByEntityType(EntityType entityType); |
|||
|
|||
ListenableFuture<EdgeEvent> saveAsync(EdgeEvent edgeEvent); |
|||
|
|||
PageData<EdgeEvent> findEdgeEvents(TenantId tenantId, EdgeId edgeId, TimePageLink pageLink); |
|||
|
|||
} |
|||
@ -0,0 +1,73 @@ |
|||
/** |
|||
* Copyright © 2016-2020 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
package org.thingsboard.server.common.data; |
|||
|
|||
import org.thingsboard.server.common.data.id.EdgeId; |
|||
|
|||
import java.util.Set; |
|||
|
|||
public final class EdgeUtils { |
|||
|
|||
private EdgeUtils() { |
|||
} |
|||
|
|||
public static boolean isAssignedToEdge(Set<ShortEdgeInfo> assignedEdges, EdgeId edgeId) { |
|||
return assignedEdges != null && assignedEdges.contains(new ShortEdgeInfo(edgeId, null, null)); |
|||
} |
|||
|
|||
public static ShortEdgeInfo getAssignedEdgeInfo(Set<ShortEdgeInfo> assignedEdges, EdgeId edgeId) { |
|||
if (assignedEdges != null) { |
|||
for (ShortEdgeInfo edgeInfo : assignedEdges) { |
|||
if (edgeInfo.getEdgeId().equals(edgeId)) { |
|||
return edgeInfo; |
|||
} |
|||
} |
|||
} |
|||
return null; |
|||
} |
|||
|
|||
public static boolean addAssignedEdge(Set<ShortEdgeInfo> assignedEdges, ShortEdgeInfo edgeInfo) { |
|||
if (assignedEdges != null && assignedEdges.contains(edgeInfo)) { |
|||
return false; |
|||
} else { |
|||
if (assignedEdges != null) { |
|||
assignedEdges.add(edgeInfo); |
|||
return true; |
|||
} else { |
|||
return false; |
|||
} |
|||
} |
|||
} |
|||
|
|||
public static boolean updateAssignedEdge(Set<ShortEdgeInfo> assignedEdges, ShortEdgeInfo edgeInfo) { |
|||
if (assignedEdges != null && assignedEdges.contains(edgeInfo)) { |
|||
assignedEdges.remove(edgeInfo); |
|||
assignedEdges.add(edgeInfo); |
|||
return true; |
|||
} else { |
|||
return false; |
|||
} |
|||
} |
|||
|
|||
public static boolean removeAssignedEdge(Set<ShortEdgeInfo> assignedEdges, ShortEdgeInfo edgeInfo) { |
|||
if (assignedEdges != null && assignedEdges.contains(edgeInfo)) { |
|||
assignedEdges.remove(edgeInfo); |
|||
return true; |
|||
} else { |
|||
return false; |
|||
} |
|||
} |
|||
} |
|||
@ -0,0 +1,50 @@ |
|||
/** |
|||
* Copyright © 2016-2020 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
package org.thingsboard.server.common.data.edge; |
|||
|
|||
import com.fasterxml.jackson.databind.JsonNode; |
|||
import lombok.Data; |
|||
import org.thingsboard.server.common.data.BaseData; |
|||
import org.thingsboard.server.common.data.id.EdgeEventId; |
|||
import org.thingsboard.server.common.data.id.EdgeId; |
|||
import org.thingsboard.server.common.data.id.EntityId; |
|||
import org.thingsboard.server.common.data.id.TenantId; |
|||
|
|||
import java.util.UUID; |
|||
|
|||
@Data |
|||
public class EdgeEvent extends BaseData<EdgeEventId> { |
|||
|
|||
private TenantId tenantId; |
|||
private EdgeId edgeId; |
|||
private String edgeEventAction; |
|||
private UUID entityId; |
|||
private EdgeEventType edgeEventType; |
|||
private transient JsonNode entityBody; |
|||
|
|||
public EdgeEvent() { |
|||
super(); |
|||
} |
|||
|
|||
public EdgeEvent(EdgeEventId id) { |
|||
super(id); |
|||
} |
|||
|
|||
public EdgeEvent(EdgeEvent event) { |
|||
super(event); |
|||
} |
|||
|
|||
} |
|||
@ -0,0 +1,84 @@ |
|||
/** |
|||
* Copyright © 2016-2020 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
package org.thingsboard.server.dao.edge; |
|||
|
|||
import com.google.common.util.concurrent.ListenableFuture; |
|||
import lombok.extern.slf4j.Slf4j; |
|||
import org.apache.commons.lang3.StringUtils; |
|||
import org.springframework.beans.factory.annotation.Autowired; |
|||
import org.springframework.stereotype.Service; |
|||
import org.thingsboard.server.common.data.EntityType; |
|||
import org.thingsboard.server.common.data.edge.EdgeEvent; |
|||
import org.thingsboard.server.common.data.edge.EdgeEventType; |
|||
import org.thingsboard.server.common.data.id.EdgeId; |
|||
import org.thingsboard.server.common.data.id.TenantId; |
|||
import org.thingsboard.server.common.data.page.PageData; |
|||
import org.thingsboard.server.common.data.page.TimePageLink; |
|||
import org.thingsboard.server.dao.exception.DataValidationException; |
|||
import org.thingsboard.server.dao.service.DataValidator; |
|||
|
|||
@Service |
|||
@Slf4j |
|||
public class BaseEdgeEventService implements EdgeEventService { |
|||
|
|||
@Autowired |
|||
public EdgeEventDao edgeEventDao; |
|||
|
|||
@Override |
|||
public EdgeEventType getEdgeEventTypeByEntityType(EntityType entityType) { |
|||
switch (entityType) { |
|||
case DEVICE: |
|||
return EdgeEventType.DEVICE; |
|||
case ASSET: |
|||
return EdgeEventType.ASSET; |
|||
case ENTITY_VIEW: |
|||
return EdgeEventType.ENTITY_VIEW; |
|||
case DASHBOARD: |
|||
return EdgeEventType.DASHBOARD; |
|||
case USER: |
|||
return EdgeEventType.USER; |
|||
case ALARM: |
|||
return EdgeEventType.ALARM; |
|||
default: |
|||
log.warn("Failed to push notification to edge service. Unsupported entity type [{}]", entityType); |
|||
return null; |
|||
} |
|||
} |
|||
|
|||
@Override |
|||
public ListenableFuture<EdgeEvent> saveAsync(EdgeEvent edgeEvent) { |
|||
edgeEventValidator.validate(edgeEvent, EdgeEvent::getTenantId); |
|||
return edgeEventDao.saveAsync(edgeEvent); |
|||
} |
|||
|
|||
@Override |
|||
public PageData<EdgeEvent> findEdgeEvents(TenantId tenantId, EdgeId edgeId, TimePageLink pageLink) { |
|||
return edgeEventDao.findEdgeEvents(tenantId.getId(), edgeId, pageLink); |
|||
} |
|||
|
|||
private DataValidator<EdgeEvent> edgeEventValidator = |
|||
new DataValidator<EdgeEvent>() { |
|||
@Override |
|||
protected void validateDataImpl(TenantId tenantId, EdgeEvent edgeEvent) { |
|||
if (edgeEvent.getEdgeId() == null) { |
|||
throw new DataValidationException("Edge id should be specified!"); |
|||
} |
|||
if (StringUtils.isEmpty(edgeEvent.getEdgeEventAction())) { |
|||
throw new DataValidationException("Edge Event action should be specified!"); |
|||
} |
|||
} |
|||
}; |
|||
} |
|||
@ -1,743 +0,0 @@ |
|||
/** |
|||
* Copyright © 2016-2020 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
package org.thingsboard.server.dao.edge; |
|||
|
|||
import com.fasterxml.jackson.databind.ObjectMapper; |
|||
import com.google.common.base.Function; |
|||
import com.google.common.util.concurrent.FutureCallback; |
|||
import com.google.common.util.concurrent.Futures; |
|||
import com.google.common.util.concurrent.ListenableFuture; |
|||
import com.google.common.util.concurrent.MoreExecutors; |
|||
import lombok.extern.slf4j.Slf4j; |
|||
import org.springframework.beans.factory.annotation.Autowired; |
|||
import org.springframework.cache.Cache; |
|||
import org.springframework.cache.CacheManager; |
|||
import org.springframework.cache.annotation.CacheEvict; |
|||
import org.springframework.cache.annotation.Cacheable; |
|||
import org.springframework.stereotype.Service; |
|||
import org.springframework.util.StringUtils; |
|||
import org.thingsboard.server.common.data.Customer; |
|||
import org.thingsboard.server.common.data.Dashboard; |
|||
import org.thingsboard.server.common.data.DataConstants; |
|||
import org.thingsboard.server.common.data.Device; |
|||
import org.thingsboard.server.common.data.EntitySubtype; |
|||
import org.thingsboard.server.common.data.EntityType; |
|||
import org.thingsboard.server.common.data.EntityView; |
|||
import org.thingsboard.server.common.data.Event; |
|||
import org.thingsboard.server.common.data.ShortEdgeInfo; |
|||
import org.thingsboard.server.common.data.Tenant; |
|||
import org.thingsboard.server.common.data.alarm.Alarm; |
|||
import org.thingsboard.server.common.data.asset.Asset; |
|||
import org.thingsboard.server.common.data.edge.Edge; |
|||
import org.thingsboard.server.common.data.edge.EdgeInfo; |
|||
import org.thingsboard.server.common.data.edge.EdgeQueueEntityType; |
|||
import org.thingsboard.server.common.data.edge.EdgeQueueEntry; |
|||
import org.thingsboard.server.common.data.edge.EdgeSearchQuery; |
|||
import org.thingsboard.server.common.data.id.AssetId; |
|||
import org.thingsboard.server.common.data.id.CustomerId; |
|||
import org.thingsboard.server.common.data.id.DeviceId; |
|||
import org.thingsboard.server.common.data.id.EdgeId; |
|||
import org.thingsboard.server.common.data.id.EntityId; |
|||
import org.thingsboard.server.common.data.id.EntityViewId; |
|||
import org.thingsboard.server.common.data.id.RuleChainId; |
|||
import org.thingsboard.server.common.data.id.TenantId; |
|||
import org.thingsboard.server.common.data.page.PageData; |
|||
import org.thingsboard.server.common.data.page.PageLink; |
|||
import org.thingsboard.server.common.data.page.TimePageLink; |
|||
import org.thingsboard.server.common.data.relation.EntityRelation; |
|||
import org.thingsboard.server.common.data.relation.EntitySearchDirection; |
|||
import org.thingsboard.server.common.data.rule.RuleChain; |
|||
import org.thingsboard.server.common.data.rule.RuleChainMetaData; |
|||
import org.thingsboard.server.common.msg.TbMsg; |
|||
import org.thingsboard.server.common.msg.session.SessionMsgType; |
|||
import org.thingsboard.server.dao.asset.AssetService; |
|||
import org.thingsboard.server.dao.customer.CustomerDao; |
|||
import org.thingsboard.server.dao.dashboard.DashboardService; |
|||
import org.thingsboard.server.dao.device.DeviceService; |
|||
import org.thingsboard.server.dao.entity.AbstractEntityService; |
|||
import org.thingsboard.server.dao.entityview.EntityViewService; |
|||
import org.thingsboard.server.dao.event.EventService; |
|||
import org.thingsboard.server.dao.exception.DataValidationException; |
|||
import org.thingsboard.server.dao.rule.RuleChainService; |
|||
import org.thingsboard.server.dao.service.DataValidator; |
|||
import org.thingsboard.server.dao.service.PaginatedRemover; |
|||
import org.thingsboard.server.dao.service.Validator; |
|||
import org.thingsboard.server.dao.tenant.TenantDao; |
|||
|
|||
import javax.annotation.Nullable; |
|||
import javax.annotation.PostConstruct; |
|||
import javax.annotation.PreDestroy; |
|||
import java.io.IOException; |
|||
import java.util.ArrayList; |
|||
import java.util.Collections; |
|||
import java.util.Comparator; |
|||
import java.util.List; |
|||
import java.util.Optional; |
|||
import java.util.UUID; |
|||
import java.util.concurrent.ExecutorService; |
|||
import java.util.concurrent.Executors; |
|||
import java.util.stream.Collectors; |
|||
|
|||
import static org.thingsboard.server.common.data.CacheConstants.EDGE_CACHE; |
|||
import static org.thingsboard.server.dao.DaoUtil.toUUIDs; |
|||
import static org.thingsboard.server.dao.model.ModelConstants.NULL_UUID; |
|||
import static org.thingsboard.server.dao.service.Validator.validateId; |
|||
import static org.thingsboard.server.dao.service.Validator.validateIds; |
|||
import static org.thingsboard.server.dao.service.Validator.validatePageLink; |
|||
import static org.thingsboard.server.dao.service.Validator.validateString; |
|||
|
|||
@Service |
|||
@Slf4j |
|||
public class BaseEdgeService extends AbstractEntityService implements EdgeService { |
|||
|
|||
private static final ObjectMapper mapper = new ObjectMapper(); |
|||
|
|||
public static final String INCORRECT_TENANT_ID = "Incorrect tenantId "; |
|||
public static final String INCORRECT_PAGE_LINK = "Incorrect page link "; |
|||
public static final String INCORRECT_CUSTOMER_ID = "Incorrect customerId "; |
|||
public static final String INCORRECT_EDGE_ID = "Incorrect edgeId "; |
|||
|
|||
@Autowired |
|||
private EdgeDao edgeDao; |
|||
|
|||
@Autowired |
|||
private TenantDao tenantDao; |
|||
|
|||
@Autowired |
|||
private CustomerDao customerDao; |
|||
|
|||
@Autowired |
|||
private CacheManager cacheManager; |
|||
|
|||
@Autowired |
|||
private EventService eventService; |
|||
|
|||
@Autowired |
|||
private DashboardService dashboardService; |
|||
|
|||
@Autowired |
|||
private RuleChainService ruleChainService; |
|||
|
|||
@Autowired |
|||
private DeviceService deviceService; |
|||
|
|||
@Autowired |
|||
private AssetService assetService; |
|||
|
|||
@Autowired |
|||
private EntityViewService entityViewService; |
|||
|
|||
private ExecutorService tsCallBackExecutor; |
|||
|
|||
@PostConstruct |
|||
public void initExecutor() { |
|||
tsCallBackExecutor = Executors.newSingleThreadExecutor(); |
|||
} |
|||
|
|||
@PreDestroy |
|||
public void shutdownExecutor() { |
|||
if (tsCallBackExecutor != null) { |
|||
tsCallBackExecutor.shutdownNow(); |
|||
} |
|||
} |
|||
|
|||
@Override |
|||
public Edge findEdgeById(TenantId tenantId, EdgeId edgeId) { |
|||
log.trace("Executing findEdgeById [{}]", edgeId); |
|||
validateId(edgeId, INCORRECT_EDGE_ID + edgeId); |
|||
return edgeDao.findById(tenantId, edgeId.getId()); |
|||
} |
|||
|
|||
@Override |
|||
public ListenableFuture<Edge> findEdgeByIdAsync(TenantId tenantId, EdgeId edgeId) { |
|||
log.trace("Executing findEdgeById [{}]", edgeId); |
|||
validateId(edgeId, INCORRECT_EDGE_ID + edgeId); |
|||
return edgeDao.findByIdAsync(tenantId, edgeId.getId()); |
|||
} |
|||
|
|||
@Cacheable(cacheNames = EDGE_CACHE, key = "{#tenantId, #name}") |
|||
@Override |
|||
public Edge findEdgeByTenantIdAndName(TenantId tenantId, String name) { |
|||
log.trace("Executing findEdgeByTenantIdAndName [{}][{}]", tenantId, name); |
|||
validateId(tenantId, INCORRECT_TENANT_ID + tenantId); |
|||
Optional<Edge> edgeOpt = edgeDao.findEdgeByTenantIdAndName(tenantId.getId(), name); |
|||
return edgeOpt.orElse(null); |
|||
} |
|||
|
|||
@Override |
|||
public Optional<Edge> findEdgeByRoutingKey(TenantId tenantId, String routingKey) { |
|||
log.trace("Executing findEdgeByRoutingKey [{}]", routingKey); |
|||
Validator.validateString(routingKey, "Incorrect edge routingKey for search request."); |
|||
return edgeDao.findByRoutingKey(tenantId.getId(), routingKey); |
|||
} |
|||
|
|||
@CacheEvict(cacheNames = EDGE_CACHE, key = "{#edge.tenantId, #edge.name}") |
|||
@Override |
|||
public Edge saveEdge(Edge edge) { |
|||
log.trace("Executing saveEdge [{}]", edge); |
|||
edgeValidator.validate(edge, Edge::getTenantId); |
|||
Edge savedEdge = edgeDao.save(edge.getTenantId(), edge); |
|||
dashboardService.updateEdgeDashboards(savedEdge.getTenantId(), savedEdge.getId()); |
|||
return savedEdge; |
|||
} |
|||
|
|||
@Override |
|||
public Edge assignEdgeToCustomer(TenantId tenantId, EdgeId edgeId, CustomerId customerId) { |
|||
Edge edge = findEdgeById(tenantId, edgeId); |
|||
edge.setCustomerId(customerId); |
|||
return saveEdge(edge); |
|||
} |
|||
|
|||
@Override |
|||
public Edge unassignEdgeFromCustomer(TenantId tenantId, EdgeId edgeId) { |
|||
Edge edge = findEdgeById(tenantId, edgeId); |
|||
edge.setCustomerId(null); |
|||
return saveEdge(edge); |
|||
} |
|||
|
|||
@Override |
|||
public void deleteEdge(TenantId tenantId, EdgeId edgeId) { |
|||
log.trace("Executing deleteEdge [{}]", edgeId); |
|||
validateId(edgeId, INCORRECT_EDGE_ID + edgeId); |
|||
|
|||
Edge edge = edgeDao.findById(tenantId, edgeId.getId()); |
|||
|
|||
dashboardService.unassignEdgeDashboards(tenantId, edgeId); |
|||
// TODO: validate that rule chains are removed by deleteEntityRelations(tenantId, edgeId); call
|
|||
ruleChainService.unassignEdgeRuleChains(tenantId, edgeId); |
|||
|
|||
List<Object> list = new ArrayList<>(); |
|||
list.add(edge.getTenantId()); |
|||
list.add(edge.getName()); |
|||
Cache cache = cacheManager.getCache(EDGE_CACHE); |
|||
cache.evict(list); |
|||
|
|||
deleteEntityRelations(tenantId, edgeId); |
|||
|
|||
edgeDao.removeById(tenantId, edgeId.getId()); |
|||
} |
|||
|
|||
@Override |
|||
public PageData<Edge> findEdgesByTenantId(TenantId tenantId, PageLink pageLink) { |
|||
log.trace("Executing findEdgesByTenantId, tenantId [{}], pageLink [{}]", tenantId, pageLink); |
|||
validateId(tenantId, INCORRECT_TENANT_ID + tenantId); |
|||
validatePageLink(pageLink); |
|||
return edgeDao.findEdgesByTenantId(tenantId.getId(), pageLink); |
|||
} |
|||
|
|||
@Override |
|||
public PageData<Edge> findEdgesByTenantIdAndType(TenantId tenantId, String type, PageLink pageLink) { |
|||
log.trace("Executing findEdgesByTenantIdAndType, tenantId [{}], type [{}], pageLink [{}]", tenantId, type, pageLink); |
|||
validateId(tenantId, INCORRECT_TENANT_ID + tenantId); |
|||
validateString(type, "Incorrect type " + type); |
|||
validatePageLink(pageLink); |
|||
return edgeDao.findEdgesByTenantIdAndType(tenantId.getId(), type, pageLink); |
|||
} |
|||
|
|||
@Override |
|||
public PageData<EdgeInfo> findEdgeInfosByTenantIdAndType(TenantId tenantId, String type, PageLink pageLink) { |
|||
log.trace("Executing findEdgeInfosByTenantIdAndType, tenantId [{}], type [{}], pageLink [{}]", tenantId, type, pageLink); |
|||
validateId(tenantId, INCORRECT_TENANT_ID + tenantId); |
|||
validateString(type, "Incorrect type " + type); |
|||
validatePageLink(pageLink); |
|||
return edgeDao.findEdgeInfosByTenantIdAndType(tenantId.getId(), type, pageLink); |
|||
} |
|||
|
|||
@Override |
|||
public PageData<EdgeInfo> findEdgeInfosByTenantId(TenantId tenantId, PageLink pageLink) { |
|||
log.trace("Executing findEdgeInfosByTenantId, tenantId [{}], pageLink [{}]", tenantId, pageLink); |
|||
validateId(tenantId, INCORRECT_TENANT_ID + tenantId); |
|||
validatePageLink(pageLink); |
|||
return edgeDao.findEdgeInfosByTenantId(tenantId.getId(), pageLink); |
|||
} |
|||
|
|||
|
|||
@Override |
|||
public ListenableFuture<List<Edge>> findEdgesByTenantIdAndIdsAsync(TenantId tenantId, List<EdgeId> edgeIds) { |
|||
log.trace("Executing findEdgesByTenantIdAndIdsAsync, tenantId [{}], edgeIds [{}]", tenantId, edgeIds); |
|||
validateId(tenantId, INCORRECT_TENANT_ID + tenantId); |
|||
validateIds(edgeIds, "Incorrect edgeIds " + edgeIds); |
|||
return edgeDao.findEdgesByTenantIdAndIdsAsync(tenantId.getId(), toUUIDs(edgeIds)); |
|||
} |
|||
|
|||
|
|||
@Override |
|||
public void deleteEdgesByTenantId(TenantId tenantId) { |
|||
log.trace("Executing deleteEdgesByTenantId, tenantId [{}]", tenantId); |
|||
validateId(tenantId, INCORRECT_TENANT_ID + tenantId); |
|||
tenantEdgesRemover.removeEntities(tenantId, tenantId); |
|||
} |
|||
|
|||
@Override |
|||
public PageData<Edge> findEdgesByTenantIdAndCustomerId(TenantId tenantId, CustomerId customerId, PageLink pageLink) { |
|||
log.trace("Executing findEdgesByTenantIdAndCustomerId, tenantId [{}], customerId [{}], pageLink [{}]", tenantId, customerId, pageLink); |
|||
validateId(tenantId, INCORRECT_TENANT_ID + tenantId); |
|||
validateId(customerId, INCORRECT_CUSTOMER_ID + customerId); |
|||
validatePageLink(pageLink); |
|||
return edgeDao.findEdgesByTenantIdAndCustomerId(tenantId.getId(), customerId.getId(), pageLink); |
|||
} |
|||
|
|||
@Override |
|||
public PageData<Edge> findEdgesByTenantIdAndCustomerIdAndType(TenantId tenantId, CustomerId customerId, String type, PageLink pageLink) { |
|||
log.trace("Executing findEdgesByTenantIdAndCustomerIdAndType, tenantId [{}], customerId [{}], type [{}], pageLink [{}]", tenantId, customerId, type, pageLink); |
|||
validateId(tenantId, INCORRECT_TENANT_ID + tenantId); |
|||
validateId(customerId, INCORRECT_CUSTOMER_ID + customerId); |
|||
validateString(type, "Incorrect type " + type); |
|||
validatePageLink(pageLink); |
|||
return edgeDao.findEdgesByTenantIdAndCustomerIdAndType(tenantId.getId(), customerId.getId(), type, pageLink); |
|||
} |
|||
|
|||
@Override |
|||
public ListenableFuture<List<Edge>> findEdgesByTenantIdCustomerIdAndIdsAsync(TenantId tenantId, CustomerId customerId, List<EdgeId> edgeIds) { |
|||
log.trace("Executing findEdgesByTenantIdCustomerIdAndIdsAsync, tenantId [{}], customerId [{}], edgeIds [{}]", tenantId, customerId, edgeIds); |
|||
validateId(tenantId, INCORRECT_TENANT_ID + tenantId); |
|||
validateId(customerId, INCORRECT_CUSTOMER_ID + customerId); |
|||
validateIds(edgeIds, "Incorrect edgeIds " + edgeIds); |
|||
return edgeDao.findEdgesByTenantIdCustomerIdAndIdsAsync(tenantId.getId(), |
|||
customerId.getId(), toUUIDs(edgeIds)); |
|||
} |
|||
|
|||
@Override |
|||
public void unassignCustomerEdges(TenantId tenantId, CustomerId customerId) { |
|||
log.trace("Executing unassignCustomerEdges, tenantId [{}], customerId [{}]", tenantId, customerId); |
|||
validateId(tenantId, INCORRECT_TENANT_ID + tenantId); |
|||
validateId(customerId, INCORRECT_CUSTOMER_ID + customerId); |
|||
customerEdgeUnasigner.removeEntities(tenantId, customerId); |
|||
} |
|||
|
|||
@Override |
|||
public ListenableFuture<List<Edge>> findEdgesByQuery(TenantId tenantId, EdgeSearchQuery query) { |
|||
ListenableFuture<List<EntityRelation>> relations = relationService.findByQuery(tenantId, query.toEntitySearchQuery()); |
|||
ListenableFuture<List<Edge>> edges = Futures.transformAsync(relations, r -> { |
|||
EntitySearchDirection direction = query.toEntitySearchQuery().getParameters().getDirection(); |
|||
List<ListenableFuture<Edge>> futures = new ArrayList<>(); |
|||
for (EntityRelation relation : r) { |
|||
EntityId entityId = direction == EntitySearchDirection.FROM ? relation.getTo() : relation.getFrom(); |
|||
if (entityId.getEntityType() == EntityType.EDGE) { |
|||
futures.add(findEdgeByIdAsync(tenantId, new EdgeId(entityId.getId()))); |
|||
} |
|||
} |
|||
return Futures.successfulAsList(futures); |
|||
}, MoreExecutors.directExecutor()); |
|||
|
|||
edges = Futures.transform(edges, new Function<List<Edge>, List<Edge>>() { |
|||
@Nullable |
|||
@Override |
|||
public List<Edge> apply(@Nullable List<Edge> edgeList) { |
|||
return edgeList == null ? Collections.emptyList() : edgeList.stream().filter(edge -> query.getEdgeTypes().contains(edge.getType())).collect(Collectors.toList()); |
|||
} |
|||
}, MoreExecutors.directExecutor()); |
|||
|
|||
return edges; |
|||
} |
|||
|
|||
@Override |
|||
public ListenableFuture<List<EntitySubtype>> findEdgeTypesByTenantId(TenantId tenantId) { |
|||
log.trace("Executing findEdgeTypesByTenantId, tenantId [{}]", tenantId); |
|||
validateId(tenantId, INCORRECT_TENANT_ID + tenantId); |
|||
ListenableFuture<List<EntitySubtype>> tenantEdgeTypes = edgeDao.findTenantEdgeTypesAsync(tenantId.getId()); |
|||
return Futures.transform(tenantEdgeTypes, |
|||
edgeTypes -> { |
|||
edgeTypes.sort(Comparator.comparing(EntitySubtype::getType)); |
|||
return edgeTypes; |
|||
}, MoreExecutors.directExecutor()); |
|||
} |
|||
|
|||
@Override |
|||
public void pushEventToEdge(TenantId tenantId, TbMsg tbMsg, FutureCallback<Void> callback) { |
|||
if (tbMsg.getType().equals(SessionMsgType.POST_TELEMETRY_REQUEST.name()) || |
|||
tbMsg.getType().equals(SessionMsgType.POST_ATTRIBUTES_REQUEST.name()) || |
|||
tbMsg.getType().equals(DataConstants.ATTRIBUTES_UPDATED) || |
|||
tbMsg.getType().equals(DataConstants.ATTRIBUTES_DELETED)) { |
|||
processCustomTbMsg(tenantId, tbMsg, callback); |
|||
} else { |
|||
try { |
|||
switch (tbMsg.getOriginator().getEntityType()) { |
|||
case EDGE: |
|||
processEdge(tenantId, tbMsg, callback); |
|||
break; |
|||
case ASSET: |
|||
processAsset(tenantId, tbMsg, callback); |
|||
break; |
|||
case DEVICE: |
|||
processDevice(tenantId, tbMsg, callback); |
|||
break; |
|||
case DASHBOARD: |
|||
processDashboard(tenantId, tbMsg, callback); |
|||
break; |
|||
case RULE_CHAIN: |
|||
processRuleChain(tenantId, tbMsg, callback); |
|||
break; |
|||
case ENTITY_VIEW: |
|||
processEntityView(tenantId, tbMsg, callback); |
|||
break; |
|||
case ALARM: |
|||
processAlarm(tenantId, tbMsg, callback); |
|||
break; |
|||
default: |
|||
log.debug("Entity type [{}] is not designed to be pushed to edge", tbMsg.getOriginator().getEntityType()); |
|||
} |
|||
} catch (IOException e) { |
|||
log.error("Can't push to edge updates, entity type [{}], data [{}]", tbMsg.getOriginator().getEntityType(), tbMsg.getData(), e); |
|||
} |
|||
} |
|||
} |
|||
|
|||
private void processCustomTbMsg(TenantId tenantId, TbMsg tbMsg, FutureCallback<Void> callback) { |
|||
EdgeId edgeId = getEdgeIdByOriginatorId(tenantId, tbMsg.getOriginator()); |
|||
EdgeQueueEntityType edgeQueueEntityType = getEdgeQueueTypeByEntityType(tbMsg.getOriginator().getEntityType()); |
|||
if (edgeId != null && edgeQueueEntityType != null) { |
|||
try { |
|||
saveEventToEdgeQueue(tenantId, edgeId, edgeQueueEntityType, tbMsg.getType(), mapper.writeValueAsString(tbMsg), callback); |
|||
} catch (IOException e) { |
|||
log.error("Error while saving custom tbMsg into Edge Queue", e); |
|||
} |
|||
} |
|||
} |
|||
|
|||
private EdgeQueueEntityType getEdgeQueueTypeByEntityType(EntityType entityType) { |
|||
switch (entityType) { |
|||
case DEVICE: |
|||
return EdgeQueueEntityType.DEVICE; |
|||
case ASSET: |
|||
return EdgeQueueEntityType.ASSET; |
|||
case ENTITY_VIEW: |
|||
return EdgeQueueEntityType.ENTITY_VIEW; |
|||
default: |
|||
log.info("Unsupported entity type: [{}]", entityType); |
|||
return null; |
|||
} |
|||
} |
|||
|
|||
private EdgeId getEdgeIdByOriginatorId(TenantId tenantId, EntityId originatorId) { |
|||
switch (originatorId.getEntityType()) { |
|||
case DEVICE: |
|||
Device device = deviceService.findDeviceById(tenantId, new DeviceId(originatorId.getId())); |
|||
return device.getEdgeId(); |
|||
case ASSET: |
|||
Asset asset = assetService.findAssetById(tenantId, new AssetId(originatorId.getId())); |
|||
return asset.getEdgeId(); |
|||
case ENTITY_VIEW: |
|||
EntityView entityView = entityViewService.findEntityViewById(tenantId, new EntityViewId(originatorId.getId())); |
|||
return entityView.getEdgeId(); |
|||
default: |
|||
log.info("Unsupported entity type: [{}]", originatorId.getEntityType()); |
|||
return null; |
|||
} |
|||
} |
|||
|
|||
private void processDevice(TenantId tenantId, TbMsg tbMsg, FutureCallback<Void> callback) throws IOException { |
|||
switch (tbMsg.getType()) { |
|||
case DataConstants.ENTITY_ASSIGNED_TO_EDGE: |
|||
case DataConstants.ENTITY_UNASSIGNED_FROM_EDGE: |
|||
processAssignedEntity(tenantId, tbMsg, EdgeQueueEntityType.DEVICE, callback); |
|||
break; |
|||
case DataConstants.ENTITY_DELETED: |
|||
case DataConstants.ENTITY_CREATED: |
|||
case DataConstants.ENTITY_UPDATED: |
|||
Device device = mapper.readValue(tbMsg.getData(), Device.class); |
|||
if (device.getEdgeId() != null) { |
|||
pushEventToEdge(tenantId, device.getEdgeId(), EdgeQueueEntityType.DEVICE, tbMsg, callback); |
|||
} |
|||
break; |
|||
default: |
|||
log.warn("Unsupported msgType [{}], tbMsg [{}]", tbMsg.getType(), tbMsg); |
|||
} |
|||
} |
|||
|
|||
private void processEdge(TenantId tenantId, TbMsg tbMsg, FutureCallback<Void> callback) throws IOException { |
|||
switch (tbMsg.getType()) { |
|||
case DataConstants.ENTITY_DELETED: |
|||
case DataConstants.ENTITY_CREATED: |
|||
case DataConstants.ENTITY_UPDATED: |
|||
Edge edge = mapper.readValue(tbMsg.getData(), Edge.class); |
|||
if (edge != null) { |
|||
pushEventToEdge(tenantId, edge.getId(), EdgeQueueEntityType.EDGE, tbMsg, callback); |
|||
} |
|||
break; |
|||
default: |
|||
log.warn("Unsupported msgType [{}], tbMsg [{}]", tbMsg.getType(), tbMsg); |
|||
} |
|||
} |
|||
|
|||
private void processAsset(TenantId tenantId, TbMsg tbMsg, FutureCallback<Void> callback) throws IOException { |
|||
switch (tbMsg.getType()) { |
|||
case DataConstants.ENTITY_ASSIGNED_TO_EDGE: |
|||
case DataConstants.ENTITY_UNASSIGNED_FROM_EDGE: |
|||
processAssignedEntity(tenantId, tbMsg, EdgeQueueEntityType.ASSET, callback); |
|||
break; |
|||
case DataConstants.ENTITY_DELETED: |
|||
case DataConstants.ENTITY_CREATED: |
|||
case DataConstants.ENTITY_UPDATED: |
|||
Asset asset = mapper.readValue(tbMsg.getData(), Asset.class); |
|||
if (asset.getEdgeId() != null) { |
|||
pushEventToEdge(tenantId, asset.getEdgeId(), EdgeQueueEntityType.ASSET, tbMsg, callback); |
|||
} |
|||
break; |
|||
default: |
|||
log.warn("Unsupported msgType [{}], tbMsg [{}]", tbMsg.getType(), tbMsg); |
|||
} |
|||
} |
|||
|
|||
private void processEntityView(TenantId tenantId, TbMsg tbMsg, FutureCallback<Void> callback) throws IOException { |
|||
switch (tbMsg.getType()) { |
|||
case DataConstants.ENTITY_ASSIGNED_TO_EDGE: |
|||
case DataConstants.ENTITY_UNASSIGNED_FROM_EDGE: |
|||
processAssignedEntity(tenantId, tbMsg, EdgeQueueEntityType.ENTITY_VIEW, callback); |
|||
break; |
|||
case DataConstants.ENTITY_DELETED: |
|||
case DataConstants.ENTITY_CREATED: |
|||
case DataConstants.ENTITY_UPDATED: |
|||
EntityView entityView = mapper.readValue(tbMsg.getData(), EntityView.class); |
|||
if (entityView.getEdgeId() != null) { |
|||
pushEventToEdge(tenantId, entityView.getEdgeId(), EdgeQueueEntityType.ENTITY_VIEW, tbMsg, callback); |
|||
} |
|||
break; |
|||
default: |
|||
log.warn("Unsupported msgType [{}], tbMsg [{}]", tbMsg.getType(), tbMsg); |
|||
} |
|||
} |
|||
|
|||
private void processAlarm(TenantId tenantId, TbMsg tbMsg, FutureCallback<Void> callback) throws IOException { |
|||
switch (tbMsg.getType()) { |
|||
case DataConstants.ENTITY_DELETED: |
|||
case DataConstants.ENTITY_CREATED: |
|||
case DataConstants.ENTITY_UPDATED: |
|||
case DataConstants.ALARM_ACK: |
|||
case DataConstants.ALARM_CLEAR: |
|||
Alarm alarm = mapper.readValue(tbMsg.getData(), Alarm.class); |
|||
EdgeId edgeId = getEdgeIdByOriginatorId(tenantId, alarm.getOriginator()); |
|||
EdgeQueueEntityType edgeQueueEntityType = getEdgeQueueTypeByEntityType(alarm.getOriginator().getEntityType()); |
|||
if (edgeId != null && edgeQueueEntityType != null) { |
|||
pushEventToEdge(tenantId, edgeId, EdgeQueueEntityType.ALARM, tbMsg, callback); |
|||
} |
|||
break; |
|||
default: |
|||
log.warn("Unsupported msgType [{}], tbMsg [{}]", tbMsg.getType(), tbMsg); |
|||
} |
|||
} |
|||
|
|||
private void processDashboard(TenantId tenantId, TbMsg tbMsg, FutureCallback<Void> callback) throws IOException { |
|||
processAssignedEntity(tenantId, tbMsg, EdgeQueueEntityType.DASHBOARD, callback); |
|||
} |
|||
|
|||
private void processRuleChain(TenantId tenantId, TbMsg tbMsg, FutureCallback<Void> callback) throws IOException { |
|||
switch (tbMsg.getType()) { |
|||
case DataConstants.ENTITY_ASSIGNED_TO_EDGE: |
|||
case DataConstants.ENTITY_UNASSIGNED_FROM_EDGE: |
|||
processAssignedEntity(tenantId, tbMsg, EdgeQueueEntityType.RULE_CHAIN, callback); |
|||
break; |
|||
case DataConstants.ENTITY_DELETED: |
|||
case DataConstants.ENTITY_CREATED: |
|||
case DataConstants.ENTITY_UPDATED: |
|||
RuleChain ruleChain = mapper.readValue(tbMsg.getData(), RuleChain.class); |
|||
if (ruleChain.getAssignedEdges() != null && !ruleChain.getAssignedEdges().isEmpty()) { |
|||
for (ShortEdgeInfo assignedEdge : ruleChain.getAssignedEdges()) { |
|||
pushEventToEdge(tenantId, assignedEdge.getEdgeId(), EdgeQueueEntityType.RULE_CHAIN, tbMsg, callback); |
|||
} |
|||
} |
|||
break; |
|||
default: |
|||
log.warn("Unsupported msgType [{}], tbMsg [{}]", tbMsg.getType(), tbMsg); |
|||
} |
|||
} |
|||
|
|||
private void processAssignedEntity(TenantId tenantId, TbMsg tbMsg, EdgeQueueEntityType entityType, FutureCallback<Void> callback) throws IOException { |
|||
EdgeId edgeId; |
|||
switch (tbMsg.getType()) { |
|||
case DataConstants.ENTITY_ASSIGNED_TO_EDGE: |
|||
edgeId = new EdgeId(UUID.fromString(tbMsg.getMetaData().getValue("assignedEdgeId"))); |
|||
pushEventToEdge(tenantId, edgeId, entityType, tbMsg, callback); |
|||
break; |
|||
case DataConstants.ENTITY_UNASSIGNED_FROM_EDGE: |
|||
edgeId = new EdgeId(UUID.fromString(tbMsg.getMetaData().getValue("unassignedEdgeId"))); |
|||
pushEventToEdge(tenantId, edgeId, entityType, tbMsg, callback); |
|||
break; |
|||
case DataConstants.ENTITY_DELETED: |
|||
case DataConstants.ENTITY_CREATED: |
|||
case DataConstants.ENTITY_UPDATED: |
|||
Dashboard dashboard = mapper.readValue(tbMsg.getData(), Dashboard.class); |
|||
if (dashboard.getAssignedEdges() != null && !dashboard.getAssignedEdges().isEmpty()) { |
|||
for (ShortEdgeInfo assignedEdge : dashboard.getAssignedEdges()) { |
|||
pushEventToEdge(tenantId, assignedEdge.getEdgeId(), EdgeQueueEntityType.DASHBOARD, tbMsg, callback); |
|||
} |
|||
} |
|||
break; |
|||
} |
|||
} |
|||
|
|||
private void pushEventToEdge(TenantId tenantId, EdgeId edgeId, EdgeQueueEntityType entityType, TbMsg tbMsg, FutureCallback<Void> callback) throws IOException { |
|||
log.debug("Pushing event(s) to edge queue. tenantId [{}], edgeId [{}], entityType [{}], tbMsg [{}]", tenantId, edgeId, entityType, tbMsg); |
|||
|
|||
saveEventToEdgeQueue(tenantId, edgeId, entityType, tbMsg.getType(), tbMsg.getData(), callback); |
|||
|
|||
if (entityType.equals(EdgeQueueEntityType.RULE_CHAIN)) { |
|||
pushRuleChainMetadataToEdge(tenantId, edgeId, tbMsg, callback); |
|||
} |
|||
} |
|||
|
|||
private void saveEventToEdgeQueue(TenantId tenantId, EdgeId edgeId, EdgeQueueEntityType entityType, String type, String data, FutureCallback<Void> callback) throws IOException { |
|||
log.debug("Pushing single event to edge queue. tenantId [{}], edgeId [{}], entityType [{}], type[{}], data [{}]", tenantId, edgeId, entityType, type, data); |
|||
|
|||
EdgeQueueEntry queueEntry = new EdgeQueueEntry(); |
|||
queueEntry.setEntityType(entityType); |
|||
queueEntry.setType(type); |
|||
queueEntry.setData(data); |
|||
|
|||
Event event = new Event(); |
|||
event.setEntityId(edgeId); |
|||
event.setTenantId(tenantId); |
|||
event.setType(DataConstants.EDGE_QUEUE_EVENT_TYPE); |
|||
event.setBody(mapper.valueToTree(queueEntry)); |
|||
ListenableFuture<Event> saveFuture = eventService.saveAsync(event); |
|||
|
|||
addMainCallback(saveFuture, callback); |
|||
} |
|||
|
|||
private void addMainCallback(ListenableFuture<Event> saveFuture, final FutureCallback<Void> callback) { |
|||
Futures.addCallback(saveFuture, new FutureCallback<Event>() { |
|||
@Override |
|||
public void onSuccess(@Nullable Event result) { |
|||
callback.onSuccess(null); |
|||
} |
|||
|
|||
@Override |
|||
public void onFailure(Throwable t) { |
|||
callback.onFailure(t); |
|||
} |
|||
}, tsCallBackExecutor); |
|||
} |
|||
|
|||
private void pushRuleChainMetadataToEdge(TenantId tenantId, EdgeId edgeId, TbMsg tbMsg, FutureCallback<Void> callback) throws IOException { |
|||
RuleChain ruleChain = mapper.readValue(tbMsg.getData(), RuleChain.class); |
|||
switch (tbMsg.getType()) { |
|||
case DataConstants.ENTITY_ASSIGNED_TO_EDGE: |
|||
case DataConstants.ENTITY_UNASSIGNED_FROM_EDGE: |
|||
case DataConstants.ENTITY_UPDATED: |
|||
RuleChainMetaData ruleChainMetaData = ruleChainService.loadRuleChainMetaData(tenantId, ruleChain.getId()); |
|||
saveEventToEdgeQueue(tenantId, edgeId, EdgeQueueEntityType.RULE_CHAIN_METADATA, tbMsg.getType(), mapper.writeValueAsString(ruleChainMetaData), callback); |
|||
break; |
|||
default: |
|||
log.warn("Unsupported msgType [{}], tbMsg [{}]", tbMsg.getType(), tbMsg); |
|||
} |
|||
} |
|||
|
|||
@Override |
|||
public PageData<Event> findQueueEvents(TenantId tenantId, EdgeId edgeId, TimePageLink pageLink) { |
|||
return eventService.findEvents(tenantId, edgeId, DataConstants.EDGE_QUEUE_EVENT_TYPE, pageLink); |
|||
} |
|||
|
|||
@Override |
|||
public Edge setRootRuleChain(TenantId tenantId, Edge edge, RuleChainId ruleChainId) throws IOException { |
|||
edge.setRootRuleChainId(ruleChainId); |
|||
Edge savedEdge = saveEdge(edge); |
|||
ruleChainService.updateEdgeRuleChains(tenantId, savedEdge.getId()); |
|||
RuleChain ruleChain = ruleChainService.findRuleChainById(tenantId, ruleChainId); |
|||
saveEventToEdgeQueue(tenantId, edge.getId(), EdgeQueueEntityType.RULE_CHAIN, DataConstants.ENTITY_UPDATED, mapper.writeValueAsString(ruleChain), new FutureCallback<Void>() { |
|||
@Override |
|||
public void onSuccess(@Nullable Void aVoid) { |
|||
log.debug("Event saved successfully!"); |
|||
} |
|||
|
|||
@Override |
|||
public void onFailure(Throwable t) { |
|||
log.debug("Failure during event save", t); |
|||
} |
|||
}); |
|||
return savedEdge; |
|||
} |
|||
|
|||
private DataValidator<Edge> edgeValidator = |
|||
new DataValidator<Edge>() { |
|||
|
|||
@Override |
|||
protected void validateCreate(TenantId tenantId, Edge edge) { |
|||
edgeDao.findEdgeByTenantIdAndName(edge.getTenantId().getId(), edge.getName()).ifPresent( |
|||
d -> { |
|||
throw new DataValidationException("Edge with such name already exists!"); |
|||
} |
|||
); |
|||
} |
|||
|
|||
@Override |
|||
protected void validateUpdate(TenantId tenantId, Edge edge) { |
|||
edgeDao.findEdgeByTenantIdAndName(edge.getTenantId().getId(), edge.getName()).ifPresent( |
|||
e -> { |
|||
if (!e.getUuidId().equals(edge.getUuidId())) { |
|||
throw new DataValidationException("Edge with such name already exists!"); |
|||
} |
|||
} |
|||
); |
|||
} |
|||
|
|||
@Override |
|||
protected void validateDataImpl(TenantId tenantId, Edge edge) { |
|||
if (StringUtils.isEmpty(edge.getType())) { |
|||
throw new DataValidationException("Edge type should be specified!"); |
|||
} |
|||
if (StringUtils.isEmpty(edge.getName())) { |
|||
throw new DataValidationException("Edge name should be specified!"); |
|||
} |
|||
if (edge.getTenantId() == null) { |
|||
throw new DataValidationException("Edge should be assigned to tenant!"); |
|||
} else { |
|||
Tenant tenant = tenantDao.findById(edge.getTenantId(), edge.getTenantId().getId()); |
|||
if (tenant == null) { |
|||
throw new DataValidationException("Edge is referencing to non-existent tenant!"); |
|||
} |
|||
} |
|||
if (edge.getCustomerId() == null) { |
|||
edge.setCustomerId(new CustomerId(NULL_UUID)); |
|||
} else if (!edge.getCustomerId().getId().equals(NULL_UUID)) { |
|||
Customer customer = customerDao.findById(edge.getTenantId(), edge.getCustomerId().getId()); |
|||
if (customer == null) { |
|||
throw new DataValidationException("Can't assign edge to non-existent customer!"); |
|||
} |
|||
if (!customer.getTenantId().getId().equals(edge.getTenantId().getId())) { |
|||
throw new DataValidationException("Can't assign edge to customer from different tenant!"); |
|||
} |
|||
} |
|||
} |
|||
}; |
|||
|
|||
private PaginatedRemover<TenantId, Edge> tenantEdgesRemover = |
|||
new PaginatedRemover<TenantId, Edge>() { |
|||
|
|||
@Override |
|||
protected PageData<Edge> findEntities(TenantId tenantId, TenantId id, PageLink pageLink) { |
|||
return edgeDao.findEdgesByTenantId(id.getId(), pageLink); |
|||
} |
|||
|
|||
@Override |
|||
protected void removeEntity(TenantId tenantId, Edge entity) { |
|||
deleteEdge(tenantId, new EdgeId(entity.getUuidId())); |
|||
} |
|||
}; |
|||
|
|||
private PaginatedRemover<CustomerId, Edge> customerEdgeUnasigner = new PaginatedRemover<CustomerId, Edge>() { |
|||
|
|||
@Override |
|||
protected PageData<Edge> findEntities(TenantId tenantId, CustomerId id, PageLink pageLink) { |
|||
return edgeDao.findEdgesByTenantIdAndCustomerId(tenantId.getId(), id.getId(), pageLink); |
|||
} |
|||
|
|||
@Override |
|||
protected void removeEntity(TenantId tenantId, Edge entity) { |
|||
unassignEdgeFromCustomer(tenantId, new EdgeId(entity.getUuidId())); |
|||
} |
|||
}; |
|||
|
|||
} |
|||
@ -0,0 +1,51 @@ |
|||
/** |
|||
* Copyright © 2016-2020 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
package org.thingsboard.server.dao.edge; |
|||
|
|||
import com.google.common.util.concurrent.ListenableFuture; |
|||
import org.thingsboard.server.common.data.edge.EdgeEvent; |
|||
import org.thingsboard.server.common.data.id.EdgeId; |
|||
import org.thingsboard.server.common.data.page.PageData; |
|||
import org.thingsboard.server.common.data.page.TimePageLink; |
|||
import org.thingsboard.server.dao.Dao; |
|||
|
|||
import java.util.UUID; |
|||
|
|||
/** |
|||
* The Interface EdgeEventDao. |
|||
*/ |
|||
public interface EdgeEventDao extends Dao<EdgeEvent> { |
|||
|
|||
/** |
|||
* Save or update edge event object async |
|||
* |
|||
* @param edgeEvent the event object |
|||
* @return saved edge event object future |
|||
*/ |
|||
ListenableFuture<EdgeEvent> saveAsync(EdgeEvent edgeEvent); |
|||
|
|||
|
|||
/** |
|||
* Find edge events by tenantId, edgeId and pageLink. |
|||
* |
|||
* @param tenantId the tenantId |
|||
* @param edgeId the edgeId |
|||
* @param pageLink the pageLink |
|||
* @return the event list |
|||
*/ |
|||
PageData<EdgeEvent> findEdgeEvents(UUID tenantId, EdgeId edgeId, TimePageLink pageLink); |
|||
|
|||
} |
|||
@ -0,0 +1,407 @@ |
|||
/** |
|||
* Copyright © 2016-2020 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
package org.thingsboard.server.dao.edge; |
|||
|
|||
import com.google.common.base.Function; |
|||
import com.google.common.util.concurrent.Futures; |
|||
import com.google.common.util.concurrent.ListenableFuture; |
|||
import com.google.common.util.concurrent.MoreExecutors; |
|||
import lombok.extern.slf4j.Slf4j; |
|||
import org.springframework.beans.factory.annotation.Autowired; |
|||
import org.springframework.cache.Cache; |
|||
import org.springframework.cache.CacheManager; |
|||
import org.springframework.cache.annotation.CacheEvict; |
|||
import org.springframework.cache.annotation.Cacheable; |
|||
import org.springframework.stereotype.Service; |
|||
import org.springframework.util.StringUtils; |
|||
import org.thingsboard.server.common.data.Customer; |
|||
import org.thingsboard.server.common.data.EntitySubtype; |
|||
import org.thingsboard.server.common.data.EntityType; |
|||
import org.thingsboard.server.common.data.Tenant; |
|||
import org.thingsboard.server.common.data.edge.Edge; |
|||
import org.thingsboard.server.common.data.edge.EdgeInfo; |
|||
import org.thingsboard.server.common.data.edge.EdgeSearchQuery; |
|||
import org.thingsboard.server.common.data.id.CustomerId; |
|||
import org.thingsboard.server.common.data.id.DashboardId; |
|||
import org.thingsboard.server.common.data.id.EdgeId; |
|||
import org.thingsboard.server.common.data.id.EntityId; |
|||
import org.thingsboard.server.common.data.id.RuleChainId; |
|||
import org.thingsboard.server.common.data.id.TenantId; |
|||
import org.thingsboard.server.common.data.page.PageData; |
|||
import org.thingsboard.server.common.data.page.PageLink; |
|||
import org.thingsboard.server.common.data.relation.EntityRelation; |
|||
import org.thingsboard.server.common.data.relation.EntitySearchDirection; |
|||
import org.thingsboard.server.common.data.rule.RuleChain; |
|||
import org.thingsboard.server.dao.customer.CustomerDao; |
|||
import org.thingsboard.server.dao.dashboard.DashboardService; |
|||
import org.thingsboard.server.dao.entity.AbstractEntityService; |
|||
import org.thingsboard.server.dao.exception.DataValidationException; |
|||
import org.thingsboard.server.dao.relation.RelationService; |
|||
import org.thingsboard.server.dao.rule.RuleChainService; |
|||
import org.thingsboard.server.dao.service.DataValidator; |
|||
import org.thingsboard.server.dao.service.PaginatedRemover; |
|||
import org.thingsboard.server.dao.service.Validator; |
|||
import org.thingsboard.server.dao.tenant.TenantDao; |
|||
|
|||
import javax.annotation.Nullable; |
|||
import java.util.ArrayList; |
|||
import java.util.Collections; |
|||
import java.util.Comparator; |
|||
import java.util.List; |
|||
import java.util.Optional; |
|||
import java.util.stream.Collectors; |
|||
|
|||
import static org.thingsboard.server.common.data.CacheConstants.EDGE_CACHE; |
|||
import static org.thingsboard.server.dao.DaoUtil.toUUIDs; |
|||
import static org.thingsboard.server.dao.model.ModelConstants.NULL_UUID; |
|||
import static org.thingsboard.server.dao.service.Validator.validateId; |
|||
import static org.thingsboard.server.dao.service.Validator.validateIds; |
|||
import static org.thingsboard.server.dao.service.Validator.validatePageLink; |
|||
import static org.thingsboard.server.dao.service.Validator.validateString; |
|||
|
|||
@Service |
|||
@Slf4j |
|||
public class EdgeServiceImpl extends AbstractEntityService implements EdgeService { |
|||
|
|||
public static final String INCORRECT_TENANT_ID = "Incorrect tenantId "; |
|||
public static final String INCORRECT_PAGE_LINK = "Incorrect page link "; |
|||
public static final String INCORRECT_CUSTOMER_ID = "Incorrect customerId "; |
|||
public static final String INCORRECT_EDGE_ID = "Incorrect edgeId "; |
|||
|
|||
@Autowired |
|||
private EdgeDao edgeDao; |
|||
|
|||
@Autowired |
|||
private TenantDao tenantDao; |
|||
|
|||
@Autowired |
|||
private CustomerDao customerDao; |
|||
|
|||
@Autowired |
|||
private CacheManager cacheManager; |
|||
|
|||
@Autowired |
|||
private DashboardService dashboardService; |
|||
|
|||
@Autowired |
|||
private RuleChainService ruleChainService; |
|||
|
|||
@Autowired |
|||
private RelationService relationService; |
|||
|
|||
@Override |
|||
public Edge findEdgeById(TenantId tenantId, EdgeId edgeId) { |
|||
log.trace("Executing findEdgeById [{}]", edgeId); |
|||
validateId(edgeId, INCORRECT_EDGE_ID + edgeId); |
|||
return edgeDao.findById(tenantId, edgeId.getId()); |
|||
} |
|||
|
|||
@Override |
|||
public ListenableFuture<Edge> findEdgeByIdAsync(TenantId tenantId, EdgeId edgeId) { |
|||
log.trace("Executing findEdgeById [{}]", edgeId); |
|||
validateId(edgeId, INCORRECT_EDGE_ID + edgeId); |
|||
return edgeDao.findByIdAsync(tenantId, edgeId.getId()); |
|||
} |
|||
|
|||
@Cacheable(cacheNames = EDGE_CACHE, key = "{#tenantId, #name}") |
|||
@Override |
|||
public Edge findEdgeByTenantIdAndName(TenantId tenantId, String name) { |
|||
log.trace("Executing findEdgeByTenantIdAndName [{}][{}]", tenantId, name); |
|||
validateId(tenantId, INCORRECT_TENANT_ID + tenantId); |
|||
Optional<Edge> edgeOpt = edgeDao.findEdgeByTenantIdAndName(tenantId.getId(), name); |
|||
return edgeOpt.orElse(null); |
|||
} |
|||
|
|||
@Override |
|||
public Optional<Edge> findEdgeByRoutingKey(TenantId tenantId, String routingKey) { |
|||
log.trace("Executing findEdgeByRoutingKey [{}]", routingKey); |
|||
Validator.validateString(routingKey, "Incorrect edge routingKey for search request."); |
|||
return edgeDao.findByRoutingKey(tenantId.getId(), routingKey); |
|||
} |
|||
|
|||
@CacheEvict(cacheNames = EDGE_CACHE, key = "{#edge.tenantId, #edge.name}") |
|||
@Override |
|||
public Edge saveEdge(Edge edge) { |
|||
log.trace("Executing saveEdge [{}]", edge); |
|||
edgeValidator.validate(edge, Edge::getTenantId); |
|||
return edgeDao.save(edge.getTenantId(), edge); |
|||
} |
|||
|
|||
@Override |
|||
public Edge assignEdgeToCustomer(TenantId tenantId, EdgeId edgeId, CustomerId customerId) { |
|||
Edge edge = findEdgeById(tenantId, edgeId); |
|||
edge.setCustomerId(customerId); |
|||
return saveEdge(edge); |
|||
} |
|||
|
|||
@Override |
|||
public Edge unassignEdgeFromCustomer(TenantId tenantId, EdgeId edgeId) { |
|||
Edge edge = findEdgeById(tenantId, edgeId); |
|||
edge.setCustomerId(null); |
|||
return saveEdge(edge); |
|||
} |
|||
|
|||
@Override |
|||
public void deleteEdge(TenantId tenantId, EdgeId edgeId) { |
|||
log.trace("Executing deleteEdge [{}]", edgeId); |
|||
validateId(edgeId, INCORRECT_EDGE_ID + edgeId); |
|||
|
|||
Edge edge = edgeDao.findById(tenantId, edgeId.getId()); |
|||
|
|||
dashboardService.unassignEdgeDashboards(tenantId, edgeId); |
|||
// TODO: validate that rule chains are removed by deleteEntityRelations(tenantId, edgeId); call
|
|||
ruleChainService.unassignEdgeRuleChains(tenantId, edgeId); |
|||
|
|||
List<Object> list = new ArrayList<>(); |
|||
list.add(edge.getTenantId()); |
|||
list.add(edge.getName()); |
|||
Cache cache = cacheManager.getCache(EDGE_CACHE); |
|||
cache.evict(list); |
|||
|
|||
deleteEntityRelations(tenantId, edgeId); |
|||
|
|||
edgeDao.removeById(tenantId, edgeId.getId()); |
|||
} |
|||
|
|||
@Override |
|||
public PageData<Edge> findEdgesByTenantId(TenantId tenantId, PageLink pageLink) { |
|||
log.trace("Executing findEdgesByTenantId, tenantId [{}], pageLink [{}]", tenantId, pageLink); |
|||
validateId(tenantId, INCORRECT_TENANT_ID + tenantId); |
|||
validatePageLink(pageLink); |
|||
return edgeDao.findEdgesByTenantId(tenantId.getId(), pageLink); |
|||
} |
|||
|
|||
@Override |
|||
public PageData<Edge> findEdgesByTenantIdAndType(TenantId tenantId, String type, PageLink pageLink) { |
|||
log.trace("Executing findEdgesByTenantIdAndType, tenantId [{}], type [{}], pageLink [{}]", tenantId, type, pageLink); |
|||
validateId(tenantId, INCORRECT_TENANT_ID + tenantId); |
|||
validateString(type, "Incorrect type " + type); |
|||
validatePageLink(pageLink); |
|||
return edgeDao.findEdgesByTenantIdAndType(tenantId.getId(), type, pageLink); |
|||
} |
|||
|
|||
@Override |
|||
public PageData<EdgeInfo> findEdgeInfosByTenantIdAndType(TenantId tenantId, String type, PageLink pageLink) { |
|||
log.trace("Executing findEdgeInfosByTenantIdAndType, tenantId [{}], type [{}], pageLink [{}]", tenantId, type, pageLink); |
|||
validateId(tenantId, INCORRECT_TENANT_ID + tenantId); |
|||
validateString(type, "Incorrect type " + type); |
|||
validatePageLink(pageLink); |
|||
return edgeDao.findEdgeInfosByTenantIdAndType(tenantId.getId(), type, pageLink); |
|||
} |
|||
|
|||
@Override |
|||
public PageData<EdgeInfo> findEdgeInfosByTenantId(TenantId tenantId, PageLink pageLink) { |
|||
log.trace("Executing findEdgeInfosByTenantId, tenantId [{}], pageLink [{}]", tenantId, pageLink); |
|||
validateId(tenantId, INCORRECT_TENANT_ID + tenantId); |
|||
validatePageLink(pageLink); |
|||
return edgeDao.findEdgeInfosByTenantId(tenantId.getId(), pageLink); |
|||
} |
|||
|
|||
@Override |
|||
public ListenableFuture<List<Edge>> findEdgesByTenantIdAndIdsAsync(TenantId tenantId, List<EdgeId> edgeIds) { |
|||
log.trace("Executing findEdgesByTenantIdAndIdsAsync, tenantId [{}], edgeIds [{}]", tenantId, edgeIds); |
|||
validateId(tenantId, INCORRECT_TENANT_ID + tenantId); |
|||
validateIds(edgeIds, "Incorrect edgeIds " + edgeIds); |
|||
return edgeDao.findEdgesByTenantIdAndIdsAsync(tenantId.getId(), toUUIDs(edgeIds)); |
|||
} |
|||
|
|||
@Override |
|||
public void deleteEdgesByTenantId(TenantId tenantId) { |
|||
log.trace("Executing deleteEdgesByTenantId, tenantId [{}]", tenantId); |
|||
validateId(tenantId, INCORRECT_TENANT_ID + tenantId); |
|||
tenantEdgesRemover.removeEntities(tenantId, tenantId); |
|||
} |
|||
|
|||
@Override |
|||
public PageData<Edge> findEdgesByTenantIdAndCustomerId(TenantId tenantId, CustomerId customerId, PageLink pageLink) { |
|||
log.trace("Executing findEdgesByTenantIdAndCustomerId, tenantId [{}], customerId [{}], pageLink [{}]", tenantId, customerId, pageLink); |
|||
validateId(tenantId, INCORRECT_TENANT_ID + tenantId); |
|||
validateId(customerId, INCORRECT_CUSTOMER_ID + customerId); |
|||
validatePageLink(pageLink); |
|||
return edgeDao.findEdgesByTenantIdAndCustomerId(tenantId.getId(), customerId.getId(), pageLink); |
|||
} |
|||
|
|||
@Override |
|||
public PageData<Edge> findEdgesByTenantIdAndCustomerIdAndType(TenantId tenantId, CustomerId customerId, String type, PageLink pageLink) { |
|||
log.trace("Executing findEdgesByTenantIdAndCustomerIdAndType, tenantId [{}], customerId [{}], type [{}], pageLink [{}]", tenantId, customerId, type, pageLink); |
|||
validateId(tenantId, INCORRECT_TENANT_ID + tenantId); |
|||
validateId(customerId, INCORRECT_CUSTOMER_ID + customerId); |
|||
validateString(type, "Incorrect type " + type); |
|||
validatePageLink(pageLink); |
|||
return edgeDao.findEdgesByTenantIdAndCustomerIdAndType(tenantId.getId(), customerId.getId(), type, pageLink); |
|||
} |
|||
|
|||
@Override |
|||
public ListenableFuture<List<Edge>> findEdgesByTenantIdCustomerIdAndIdsAsync(TenantId tenantId, CustomerId customerId, List<EdgeId> edgeIds) { |
|||
log.trace("Executing findEdgesByTenantIdCustomerIdAndIdsAsync, tenantId [{}], customerId [{}], edgeIds [{}]", tenantId, customerId, edgeIds); |
|||
validateId(tenantId, INCORRECT_TENANT_ID + tenantId); |
|||
validateId(customerId, INCORRECT_CUSTOMER_ID + customerId); |
|||
validateIds(edgeIds, "Incorrect edgeIds " + edgeIds); |
|||
return edgeDao.findEdgesByTenantIdCustomerIdAndIdsAsync(tenantId.getId(), |
|||
customerId.getId(), toUUIDs(edgeIds)); |
|||
} |
|||
|
|||
@Override |
|||
public void unassignCustomerEdges(TenantId tenantId, CustomerId customerId) { |
|||
log.trace("Executing unassignCustomerEdges, tenantId [{}], customerId [{}]", tenantId, customerId); |
|||
validateId(tenantId, INCORRECT_TENANT_ID + tenantId); |
|||
validateId(customerId, INCORRECT_CUSTOMER_ID + customerId); |
|||
customerEdgeUnassigner.removeEntities(tenantId, customerId); |
|||
} |
|||
|
|||
@Override |
|||
public ListenableFuture<List<Edge>> findEdgesByQuery(TenantId tenantId, EdgeSearchQuery query) { |
|||
ListenableFuture<List<EntityRelation>> relations = relationService.findByQuery(tenantId, query.toEntitySearchQuery()); |
|||
ListenableFuture<List<Edge>> edges = Futures.transformAsync(relations, r -> { |
|||
EntitySearchDirection direction = query.toEntitySearchQuery().getParameters().getDirection(); |
|||
List<ListenableFuture<Edge>> futures = new ArrayList<>(); |
|||
for (EntityRelation relation : r) { |
|||
EntityId entityId = direction == EntitySearchDirection.FROM ? relation.getTo() : relation.getFrom(); |
|||
if (entityId.getEntityType() == EntityType.EDGE) { |
|||
futures.add(findEdgeByIdAsync(tenantId, new EdgeId(entityId.getId()))); |
|||
} |
|||
} |
|||
return Futures.successfulAsList(futures); |
|||
}, MoreExecutors.directExecutor()); |
|||
|
|||
edges = Futures.transform(edges, new Function<List<Edge>, List<Edge>>() { |
|||
@Nullable |
|||
@Override |
|||
public List<Edge> apply(@Nullable List<Edge> edgeList) { |
|||
return edgeList == null ? Collections.emptyList() : edgeList.stream().filter(edge -> query.getEdgeTypes().contains(edge.getType())).collect(Collectors.toList()); |
|||
} |
|||
}, MoreExecutors.directExecutor()); |
|||
|
|||
return edges; |
|||
} |
|||
|
|||
@Override |
|||
public ListenableFuture<List<EntitySubtype>> findEdgeTypesByTenantId(TenantId tenantId) { |
|||
log.trace("Executing findEdgeTypesByTenantId, tenantId [{}]", tenantId); |
|||
validateId(tenantId, INCORRECT_TENANT_ID + tenantId); |
|||
ListenableFuture<List<EntitySubtype>> tenantEdgeTypes = edgeDao.findTenantEdgeTypesAsync(tenantId.getId()); |
|||
return Futures.transform(tenantEdgeTypes, |
|||
edgeTypes -> { |
|||
edgeTypes.sort(Comparator.comparing(EntitySubtype::getType)); |
|||
return edgeTypes; |
|||
}, MoreExecutors.directExecutor()); |
|||
} |
|||
|
|||
@Override |
|||
public void assignDefaultRuleChainsToEdge(TenantId tenantId, EdgeId edgeId) { |
|||
log.trace("Executing assignDefaultRuleChainsToEdge, tenantId [{}], edgeId [{}]", tenantId, edgeId); |
|||
ListenableFuture<List<RuleChain>> future = ruleChainService.findDefaultEdgeRuleChainsByTenantId(tenantId); |
|||
Futures.transform(future, ruleChains -> { |
|||
if (ruleChains != null && !ruleChains.isEmpty()) { |
|||
for (RuleChain ruleChain : ruleChains) { |
|||
ruleChainService.assignRuleChainToEdge(tenantId, ruleChain.getId(), edgeId); |
|||
} |
|||
} |
|||
return null; |
|||
}, MoreExecutors.directExecutor()); |
|||
} |
|||
|
|||
@Override |
|||
public ListenableFuture<List<Edge>> findEdgesByTenantIdAndRuleChainId(TenantId tenantId, RuleChainId ruleChainId) { |
|||
log.trace("Executing findEdgesByTenantIdAndRuleChainId, tenantId [{}], ruleChainId [{}]", tenantId, ruleChainId); |
|||
Validator.validateId(tenantId, "Incorrect tenantId " + tenantId); |
|||
Validator.validateId(ruleChainId, "Incorrect ruleChainId " + ruleChainId); |
|||
return edgeDao.findEdgesByTenantIdAndRuleChainId(tenantId.getId(), ruleChainId.getId()); |
|||
} |
|||
|
|||
@Override |
|||
public ListenableFuture<List<Edge>> findEdgesByTenantIdAndDashboardId(TenantId tenantId, DashboardId dashboardId) { |
|||
log.trace("Executing findEdgesByTenantIdAndDashboardId, tenantId [{}], dashboardId [{}]", tenantId, dashboardId); |
|||
Validator.validateId(tenantId, "Incorrect tenantId " + tenantId); |
|||
Validator.validateId(dashboardId, "Incorrect dashboardId " + dashboardId); |
|||
return edgeDao.findEdgesByTenantIdAndDashboardId(tenantId.getId(), dashboardId.getId()); |
|||
} |
|||
|
|||
private DataValidator<Edge> edgeValidator = |
|||
new DataValidator<Edge>() { |
|||
|
|||
@Override |
|||
protected void validateCreate(TenantId tenantId, Edge edge) { |
|||
} |
|||
|
|||
@Override |
|||
protected void validateUpdate(TenantId tenantId, Edge edge) { |
|||
} |
|||
|
|||
@Override |
|||
protected void validateDataImpl(TenantId tenantId, Edge edge) { |
|||
if (StringUtils.isEmpty(edge.getType())) { |
|||
throw new DataValidationException("Edge type should be specified!"); |
|||
} |
|||
if (StringUtils.isEmpty(edge.getName())) { |
|||
throw new DataValidationException("Edge name should be specified!"); |
|||
} |
|||
if (StringUtils.isEmpty(edge.getSecret())) { |
|||
throw new DataValidationException("Edge secret should be specified!"); |
|||
} |
|||
if (StringUtils.isEmpty(edge.getRoutingKey())) { |
|||
throw new DataValidationException("Edge routing key should be specified!"); |
|||
} |
|||
if (edge.getTenantId() == null) { |
|||
throw new DataValidationException("Edge should be assigned to tenant!"); |
|||
} else { |
|||
Tenant tenant = tenantDao.findById(edge.getTenantId(), edge.getTenantId().getId()); |
|||
if (tenant == null) { |
|||
throw new DataValidationException("Edge is referencing to non-existent tenant!"); |
|||
} |
|||
} |
|||
if (edge.getCustomerId() == null) { |
|||
edge.setCustomerId(new CustomerId(NULL_UUID)); |
|||
} else if (!edge.getCustomerId().getId().equals(NULL_UUID)) { |
|||
Customer customer = customerDao.findById(edge.getTenantId(), edge.getCustomerId().getId()); |
|||
if (customer == null) { |
|||
throw new DataValidationException("Can't assign edge to non-existent customer!"); |
|||
} |
|||
if (!customer.getTenantId().getId().equals(edge.getTenantId().getId())) { |
|||
throw new DataValidationException("Can't assign edge to customer from different tenant!"); |
|||
} |
|||
} |
|||
} |
|||
}; |
|||
|
|||
private PaginatedRemover<TenantId, Edge> tenantEdgesRemover = |
|||
new PaginatedRemover<TenantId, Edge>() { |
|||
|
|||
@Override |
|||
protected PageData<Edge> findEntities(TenantId tenantId, TenantId id, PageLink pageLink) { |
|||
return edgeDao.findEdgesByTenantId(id.getId(), pageLink); |
|||
} |
|||
|
|||
@Override |
|||
protected void removeEntity(TenantId tenantId, Edge entity) { |
|||
deleteEdge(tenantId, new EdgeId(entity.getUuidId())); |
|||
} |
|||
}; |
|||
|
|||
private PaginatedRemover<CustomerId, Edge> customerEdgeUnassigner = new PaginatedRemover<CustomerId, Edge>() { |
|||
|
|||
@Override |
|||
protected PageData<Edge> findEntities(TenantId tenantId, CustomerId id, PageLink pageLink) { |
|||
return edgeDao.findEdgesByTenantIdAndCustomerId(tenantId.getId(), id.getId(), pageLink); |
|||
} |
|||
|
|||
@Override |
|||
protected void removeEntity(TenantId tenantId, Edge entity) { |
|||
unassignEdgeFromCustomer(tenantId, new EdgeId(entity.getUuidId())); |
|||
} |
|||
}; |
|||
|
|||
} |
|||
@ -0,0 +1,121 @@ |
|||
/** |
|||
* Copyright © 2016-2020 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
package org.thingsboard.server.dao.model.sql; |
|||
|
|||
import com.datastax.oss.driver.api.core.uuid.Uuids; |
|||
import com.fasterxml.jackson.databind.JsonNode; |
|||
import lombok.Data; |
|||
import lombok.EqualsAndHashCode; |
|||
import lombok.NoArgsConstructor; |
|||
import org.hibernate.annotations.Type; |
|||
import org.hibernate.annotations.TypeDef; |
|||
import org.thingsboard.server.common.data.edge.EdgeEvent; |
|||
import org.thingsboard.server.common.data.edge.EdgeEventType; |
|||
import org.thingsboard.server.common.data.id.EdgeEventId; |
|||
import org.thingsboard.server.common.data.id.EdgeId; |
|||
import org.thingsboard.server.common.data.id.TenantId; |
|||
import org.thingsboard.server.dao.model.BaseEntity; |
|||
import org.thingsboard.server.dao.model.BaseSqlEntity; |
|||
import org.thingsboard.server.dao.util.mapping.JsonStringType; |
|||
|
|||
import javax.persistence.Column; |
|||
import javax.persistence.Entity; |
|||
import javax.persistence.EnumType; |
|||
import javax.persistence.Enumerated; |
|||
import javax.persistence.Table; |
|||
import java.util.UUID; |
|||
|
|||
import static org.thingsboard.server.dao.model.ModelConstants.EDGE_EVENT_ACTION_PROPERTY; |
|||
import static org.thingsboard.server.dao.model.ModelConstants.EDGE_EVENT_COLUMN_FAMILY_NAME; |
|||
import static org.thingsboard.server.dao.model.ModelConstants.EDGE_EVENT_EDGE_ID_PROPERTY; |
|||
import static org.thingsboard.server.dao.model.ModelConstants.EDGE_EVENT_ENTITY_BODY_PROPERTY; |
|||
import static org.thingsboard.server.dao.model.ModelConstants.EDGE_EVENT_ENTITY_ID_PROPERTY; |
|||
import static org.thingsboard.server.dao.model.ModelConstants.EDGE_EVENT_TENANT_ID_PROPERTY; |
|||
import static org.thingsboard.server.dao.model.ModelConstants.EDGE_EVENT_TYPE_PROPERTY; |
|||
import static org.thingsboard.server.dao.model.ModelConstants.EPOCH_DIFF; |
|||
import static org.thingsboard.server.dao.model.ModelConstants.TS_COLUMN; |
|||
|
|||
@Data |
|||
@EqualsAndHashCode(callSuper = true) |
|||
@Entity |
|||
@TypeDef(name = "json", typeClass = JsonStringType.class) |
|||
@Table(name = EDGE_EVENT_COLUMN_FAMILY_NAME) |
|||
@NoArgsConstructor |
|||
public class EdgeEventEntity extends BaseSqlEntity<EdgeEvent> implements BaseEntity<EdgeEvent> { |
|||
|
|||
@Column(name = EDGE_EVENT_TENANT_ID_PROPERTY) |
|||
private String tenantId; |
|||
|
|||
@Column(name = EDGE_EVENT_EDGE_ID_PROPERTY) |
|||
private String edgeId; |
|||
|
|||
@Column(name = EDGE_EVENT_ENTITY_ID_PROPERTY) |
|||
private String entityId; |
|||
|
|||
@Enumerated(EnumType.STRING) |
|||
@Column(name = EDGE_EVENT_TYPE_PROPERTY) |
|||
private EdgeEventType edgeEventType; |
|||
|
|||
@Column(name = EDGE_EVENT_ACTION_PROPERTY) |
|||
private String edgeEventAction; |
|||
|
|||
@Type(type = "json") |
|||
@Column(name = EDGE_EVENT_ENTITY_BODY_PROPERTY) |
|||
private JsonNode entityBody; |
|||
|
|||
@Column(name = TS_COLUMN) |
|||
private long ts; |
|||
|
|||
public EdgeEventEntity(EdgeEvent edgeEvent) { |
|||
if (edgeEvent.getId() != null) { |
|||
this.setUuid(edgeEvent.getId().getId()); |
|||
this.ts = getTs(edgeEvent.getId().getId()); |
|||
} else { |
|||
this.ts = System.currentTimeMillis(); |
|||
} |
|||
if (edgeEvent.getTenantId() != null) { |
|||
this.tenantId = toString(edgeEvent.getTenantId().getId()); |
|||
} |
|||
if (edgeEvent.getEdgeId() != null) { |
|||
this.edgeId = toString(edgeEvent.getEdgeId().getId()); |
|||
} |
|||
if (edgeEvent.getEntityId() != null) { |
|||
this.entityId = toString(edgeEvent.getEntityId()); |
|||
} |
|||
this.edgeEventType = edgeEvent.getEdgeEventType(); |
|||
this.edgeEventAction = edgeEvent.getEdgeEventAction(); |
|||
this.entityBody = edgeEvent.getEntityBody(); |
|||
} |
|||
|
|||
@Override |
|||
public EdgeEvent toData() { |
|||
EdgeEvent edgeEvent = new EdgeEvent(new EdgeEventId(this.getUuid())); |
|||
edgeEvent.setCreatedTime(Uuids.unixTimestamp(this.getUuid())); |
|||
edgeEvent.setTenantId(new TenantId(toUUID(tenantId))); |
|||
edgeEvent.setEdgeId(new EdgeId(toUUID(edgeId))); |
|||
if (entityId != null) { |
|||
edgeEvent.setEntityId(toUUID(entityId)); |
|||
} |
|||
edgeEvent.setEdgeEventType(edgeEventType); |
|||
edgeEvent.setEdgeEventAction(edgeEventAction); |
|||
edgeEvent.setEntityBody(entityBody); |
|||
return edgeEvent; |
|||
} |
|||
|
|||
private static long getTs(UUID uuid) { |
|||
return (uuid.timestamp() - EPOCH_DIFF) / 10000; |
|||
} |
|||
} |
|||
Some files were not shown because too many files changed in this diff
Loading…
Reference in new issue