diff --git a/application/src/main/data/upgrade/2.4.x/schema_update.cql b/application/src/main/data/upgrade/2.6.0/schema_update.cql similarity index 100% rename from application/src/main/data/upgrade/2.4.x/schema_update.cql rename to application/src/main/data/upgrade/2.6.0/schema_update.cql diff --git a/application/src/main/data/upgrade/2.4.x/schema_update.sql b/application/src/main/data/upgrade/2.6.0/schema_update.sql similarity index 100% rename from application/src/main/data/upgrade/2.4.x/schema_update.sql rename to application/src/main/data/upgrade/2.6.0/schema_update.sql diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcSession.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcSession.java index f99c51e8a8..249071058a 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcSession.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcSession.java @@ -26,6 +26,7 @@ import com.google.protobuf.ByteString; import io.grpc.stub.StreamObserver; import lombok.Data; import lombok.extern.slf4j.Slf4j; +import org.apache.commons.codec.binary.Base64; import org.thingsboard.server.common.data.Customer; import org.thingsboard.server.common.data.Dashboard; import org.thingsboard.server.common.data.DataConstants; @@ -81,6 +82,7 @@ import org.thingsboard.server.gen.edge.UplinkResponseMsg; import org.thingsboard.server.gen.edge.UserUpdateMsg; import org.thingsboard.server.service.edge.EdgeContextComponent; +import java.io.Closeable; import java.io.IOException; import java.util.Collections; import java.util.List; @@ -95,7 +97,7 @@ import static org.thingsboard.server.gen.edge.UpdateMsgType.ENTITY_CREATED_RPC_M @Slf4j @Data -public final class EdgeGrpcSession implements Cloneable { +public final class EdgeGrpcSession implements Closeable { private static final ReentrantLock entityCreationLock = new ReentrantLock(); @@ -168,7 +170,7 @@ public final class EdgeGrpcSession implements Cloneable { UUID ifOffset = null; do { pageData = ctx.getEdgeService().findQueueEvents(edge.getTenantId(), edge.getId(), pageLink); - if (!pageData.getData().isEmpty()) { + if (isConnected() && !pageData.getData().isEmpty()) { log.trace("[{}] [{}] event(s) are going to be processed.", this.sessionId, pageData.getData().size()); for (Event event : pageData.getData()) { log.trace("[{}] Processing event [{}]", this.sessionId, event); @@ -196,7 +198,7 @@ public final class EdgeGrpcSession implements Cloneable { ifOffset = event.getUuidId(); } } - if (pageData.hasNext()) { + if (isConnected() && pageData.hasNext()) { pageLink = pageData.getNextPageLink(); try { Thread.sleep(ctx.getEdgeEventStorageSettings().getSleepIntervalBetweenBatches()); @@ -204,7 +206,7 @@ public final class EdgeGrpcSession implements Cloneable { log.error("Error during sleep between batches", e); } } - } while (pageData.hasNext()); + } while (isConnected() && pageData.hasNext()); if (ifOffset != null) { Long newStartTs = UUIDs.unixTimestamp(ifOffset); @@ -287,7 +289,7 @@ public final class EdgeGrpcSession implements Cloneable { private void processCustomDownlinkMessage(EdgeQueueEntry entry) throws IOException { log.trace("Executing processCustomDownlinkMessage, entry [{}]", entry); - TbMsg tbMsg = objectMapper.readValue(entry.getData(), TbMsg.class); + TbMsg tbMsg = TbMsg.fromBytes(Base64.decodeBase64(entry.getData()), TbMsgCallback.EMPTY); String entityName = null; switch (entry.getEntityType()) { case DEVICE: @@ -700,4 +702,14 @@ public final class EdgeGrpcSession implements Cloneable { .setType(edge.getType().toString()) .build(); } + + @Override + public void close() { + connected = false; + try { + outputStream.onCompleted(); + } catch (Exception e) { + log.debug("[{}] Failed to close output stream: {}", sessionId, e.getMessage()); + } + } } diff --git a/application/src/main/java/org/thingsboard/server/service/install/CassandraDatabaseUpgradeService.java b/application/src/main/java/org/thingsboard/server/service/install/CassandraDatabaseUpgradeService.java index e87b9138b2..56b4e011cb 100644 --- a/application/src/main/java/org/thingsboard/server/service/install/CassandraDatabaseUpgradeService.java +++ b/application/src/main/java/org/thingsboard/server/service/install/CassandraDatabaseUpgradeService.java @@ -308,7 +308,7 @@ public class CassandraDatabaseUpgradeService extends AbstractCassandraDatabaseUp break; case "2.5.0": log.info("Updating schema ..."); - schemaUpdateFile = Paths.get(installScripts.getDataDir(), "upgrade", "2.4.x", SCHEMA_UPDATE_CQL); + schemaUpdateFile = Paths.get(installScripts.getDataDir(), "upgrade", "2.6.0", SCHEMA_UPDATE_CQL); loadCql(schemaUpdateFile); try { diff --git a/application/src/main/java/org/thingsboard/server/service/install/SqlDatabaseUpgradeService.java b/application/src/main/java/org/thingsboard/server/service/install/SqlDatabaseUpgradeService.java index 5a73a7a076..bff44636d6 100644 --- a/application/src/main/java/org/thingsboard/server/service/install/SqlDatabaseUpgradeService.java +++ b/application/src/main/java/org/thingsboard/server/service/install/SqlDatabaseUpgradeService.java @@ -211,26 +211,6 @@ public class SqlDatabaseUpgradeService implements DatabaseEntitiesUpgradeService case "2.4.3": try (Connection conn = DriverManager.getConnection(dbUrl, dbUserName, dbPassword)) { log.info("Updating schema ..."); - schemaUpdateFile = Paths.get(installScripts.getDataDir(), "upgrade", "2.4.x", SCHEMA_UPDATE_SQL); - loadSql(schemaUpdateFile, conn); - try { - conn.createStatement().execute("ALTER TABLE asset ADD edge_id varchar(31)"); //NOSONAR, ignoring because method used to execute thingsboard database upgrade script - } catch (Exception e) {} - try { - conn.createStatement().execute("ALTER TABLE device ADD edge_id varchar(31)"); //NOSONAR, ignoring because method used to execute thingsboard database upgrade script - } catch (Exception e) {} - try { - conn.createStatement().execute("ALTER TABLE entity_view ADD edge_id varchar(31)"); //NOSONAR, ignoring because method used to execute thingsboard database upgrade script - } catch (Exception e) {} - try { - conn.createStatement().execute("ALTER TABLE dashboard ADD assigned_edges varchar(10000000)"); //NOSONAR, ignoring because method used to execute thingsboard database upgrade script - } catch (Exception e) {} - try { - conn.createStatement().execute("ALTER TABLE rule_chain ADD assigned_edges varchar(10000000)"); //NOSONAR, ignoring because method used to execute thingsboard database upgrade script - } catch (Exception e) {} - try { - conn.createStatement().execute("ALTER TABLE rule_chain ADD type varchar(255) DEFAULT 'SYSTEM'"); //NOSONAR, ignoring because method used to execute thingsboard database upgrade script - } catch (Exception e) {} try { conn.createStatement().execute("ALTER TABLE attribute_kv ADD COLUMN json_v json;"); } catch (Exception e) { @@ -253,6 +233,32 @@ public class SqlDatabaseUpgradeService implements DatabaseEntitiesUpgradeService log.info("Schema updated."); } break; + case "2.5.0": + try (Connection conn = DriverManager.getConnection(dbUrl, dbUserName, dbPassword)) { + log.info("Updating schema ..."); + schemaUpdateFile = Paths.get(installScripts.getDataDir(), "upgrade", "2.6.0", SCHEMA_UPDATE_SQL); + loadSql(schemaUpdateFile, conn); + try { + conn.createStatement().execute("ALTER TABLE asset ADD edge_id varchar(31)"); //NOSONAR, ignoring because method used to execute thingsboard database upgrade script + } catch (Exception e) {} + try { + conn.createStatement().execute("ALTER TABLE device ADD edge_id varchar(31)"); //NOSONAR, ignoring because method used to execute thingsboard database upgrade script + } catch (Exception e) {} + try { + conn.createStatement().execute("ALTER TABLE entity_view ADD edge_id varchar(31)"); //NOSONAR, ignoring because method used to execute thingsboard database upgrade script + } catch (Exception e) {} + try { + conn.createStatement().execute("ALTER TABLE dashboard ADD assigned_edges varchar(10000000)"); //NOSONAR, ignoring because method used to execute thingsboard database upgrade script + } catch (Exception e) {} + try { + conn.createStatement().execute("ALTER TABLE rule_chain ADD assigned_edges varchar(10000000)"); //NOSONAR, ignoring because method used to execute thingsboard database upgrade script + } catch (Exception e) {} + try { + conn.createStatement().execute("ALTER TABLE rule_chain ADD type varchar(255) DEFAULT 'SYSTEM'"); //NOSONAR, ignoring because method used to execute thingsboard database upgrade script + } catch (Exception e) {} + log.info("Schema updated."); + } + break; default: throw new RuntimeException("Unable to upgrade SQL database, unsupported fromVersion: " + fromVersion); } diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/DashboardInfo.java b/common/data/src/main/java/org/thingsboard/server/common/data/DashboardInfo.java index 817c9d1857..0a3ebe0abd 100644 --- a/common/data/src/main/java/org/thingsboard/server/common/data/DashboardInfo.java +++ b/common/data/src/main/java/org/thingsboard/server/common/data/DashboardInfo.java @@ -124,52 +124,26 @@ public class DashboardInfo extends SearchTextBased implements HasNa } public boolean isAssignedToEdge(EdgeId edgeId) { - return this.assignedEdges != null && this.assignedEdges.contains(new ShortEdgeInfo(edgeId, null, null)); + return EdgeUtils.isAssignedToEdge(this.assignedEdges, edgeId); } public ShortEdgeInfo getAssignedEdgeInfo(EdgeId edgeId) { - if (this.assignedEdges != null) { - for (ShortEdgeInfo edgeInfo : this.assignedEdges) { - if (edgeInfo.getEdgeId().equals(edgeId)) { - return edgeInfo; - } - } - } - return null; + return EdgeUtils.getAssignedEdgeInfo(this.assignedEdges, edgeId); } public boolean addAssignedEdge(Edge edge) { - ShortEdgeInfo edgeInfo = edge.toShortEdgeInfo(); - if (this.assignedEdges != null && this.assignedEdges.contains(edgeInfo)) { - return false; - } else { - if (this.assignedEdges == null) { - this.assignedEdges = new HashSet<>(); - } - this.assignedEdges.add(edgeInfo); - return true; + if (this.assignedEdges == null) { + this.assignedEdges = new HashSet<>(); } + return EdgeUtils.addAssignedEdge(this.assignedEdges, edge); } public boolean updateAssignedEdge(Edge edge) { - ShortEdgeInfo edgeInfo = edge.toShortEdgeInfo(); - if (this.assignedEdges != null && this.assignedEdges.contains(edgeInfo)) { - this.assignedEdges.remove(edgeInfo); - this.assignedEdges.add(edgeInfo); - return true; - } else { - return false; - } + return EdgeUtils.updateAssignedEdge(this.assignedEdges, edge); } public boolean removeAssignedEdge(Edge edge) { - ShortEdgeInfo edgeInfo = edge.toShortEdgeInfo(); - if (this.assignedEdges != null && this.assignedEdges.contains(edgeInfo)) { - this.assignedEdges.remove(edgeInfo); - return true; - } else { - return false; - } + return EdgeUtils.removeAssignedEdge(this.assignedEdges, edge); } @Override diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/EdgeUtils.java b/common/data/src/main/java/org/thingsboard/server/common/data/EdgeUtils.java new file mode 100644 index 0000000000..fd0c21b2cb --- /dev/null +++ b/common/data/src/main/java/org/thingsboard/server/common/data/EdgeUtils.java @@ -0,0 +1,61 @@ +package org.thingsboard.server.common.data; + +import org.thingsboard.server.common.data.edge.Edge; +import org.thingsboard.server.common.data.id.EdgeId; + +import java.util.Set; + +public final class EdgeUtils { + + private EdgeUtils() {} + + public static boolean isAssignedToEdge(Set assignedEdges, EdgeId edgeId) { + return assignedEdges != null && assignedEdges.contains(new ShortEdgeInfo(edgeId, null, null)); + } + + public static ShortEdgeInfo getAssignedEdgeInfo(Set assignedEdges, EdgeId edgeId) { + if (assignedEdges != null) { + for (ShortEdgeInfo edgeInfo : assignedEdges) { + if (edgeInfo.getEdgeId().equals(edgeId)) { + return edgeInfo; + } + } + } + return null; + } + + public static boolean addAssignedEdge(Set assignedEdges, Edge edge) { + ShortEdgeInfo edgeInfo = edge.toShortEdgeInfo(); + if (assignedEdges != null && assignedEdges.contains(edgeInfo)) { + return false; + } else { + if (assignedEdges != null) { + assignedEdges.add(edgeInfo); + return true; + } else { + return false; + } + } + } + + public static boolean updateAssignedEdge(Set assignedEdges, Edge edge) { + ShortEdgeInfo edgeInfo = edge.toShortEdgeInfo(); + if (assignedEdges != null && assignedEdges.contains(edgeInfo)) { + assignedEdges.remove(edgeInfo); + assignedEdges.add(edgeInfo); + return true; + } else { + return false; + } + } + + public static boolean removeAssignedEdge(Set assignedEdges, Edge edge) { + ShortEdgeInfo edgeInfo = edge.toShortEdgeInfo(); + if (assignedEdges != null && assignedEdges.contains(edgeInfo)) { + assignedEdges.remove(edgeInfo); + return true; + } else { + return false; + } + } +} diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/rule/RuleChain.java b/common/data/src/main/java/org/thingsboard/server/common/data/rule/RuleChain.java index a7b649951d..b81f3f7a9a 100644 --- a/common/data/src/main/java/org/thingsboard/server/common/data/rule/RuleChain.java +++ b/common/data/src/main/java/org/thingsboard/server/common/data/rule/RuleChain.java @@ -20,6 +20,7 @@ import com.fasterxml.jackson.databind.JsonNode; import lombok.Data; import lombok.EqualsAndHashCode; import lombok.extern.slf4j.Slf4j; +import org.thingsboard.server.common.data.EdgeUtils; import org.thingsboard.server.common.data.HasName; import org.thingsboard.server.common.data.HasTenantId; import org.thingsboard.server.common.data.SearchTextBasedWithAdditionalInfo; @@ -89,52 +90,28 @@ public class RuleChain extends SearchTextBasedWithAdditionalInfo im setJson(data, json -> this.configuration = json, bytes -> this.configurationBytes = bytes); } + + public boolean isAssignedToEdge(EdgeId edgeId) { - return this.assignedEdges != null && this.assignedEdges.contains(new ShortEdgeInfo(edgeId, null, null)); + return EdgeUtils.isAssignedToEdge(this.assignedEdges, edgeId); } public ShortEdgeInfo getAssignedEdgeInfo(EdgeId edgeId) { - if (this.assignedEdges != null) { - for (ShortEdgeInfo edgeInfo : this.assignedEdges) { - if (edgeInfo.getEdgeId().equals(edgeId)) { - return edgeInfo; - } - } - } - return null; + return EdgeUtils.getAssignedEdgeInfo(this.assignedEdges, edgeId); } public boolean addAssignedEdge(Edge edge) { - ShortEdgeInfo edgeInfo = edge.toShortEdgeInfo(); - if (this.assignedEdges != null && this.assignedEdges.contains(edgeInfo)) { - return false; - } else { - if (this.assignedEdges == null) { - this.assignedEdges = new HashSet<>(); - } - this.assignedEdges.add(edgeInfo); - return true; + if (this.assignedEdges == null) { + this.assignedEdges = new HashSet<>(); } + return EdgeUtils.addAssignedEdge(this.assignedEdges, edge); } public boolean updateAssignedEdge(Edge edge) { - ShortEdgeInfo edgeInfo = edge.toShortEdgeInfo(); - if (this.assignedEdges != null && this.assignedEdges.contains(edgeInfo)) { - this.assignedEdges.remove(edgeInfo); - this.assignedEdges.add(edgeInfo); - return true; - } else { - return false; - } + return EdgeUtils.updateAssignedEdge(this.assignedEdges, edge); } public boolean removeAssignedEdge(Edge edge) { - ShortEdgeInfo edgeInfo = edge.toShortEdgeInfo(); - if (this.assignedEdges != null && this.assignedEdges.contains(edgeInfo)) { - this.assignedEdges.remove(edgeInfo); - return true; - } else { - return false; - } + return EdgeUtils.removeAssignedEdge(this.assignedEdges, edge); } } diff --git a/dao/src/main/java/org/thingsboard/server/dao/edge/EdgeServiceImpl.java b/dao/src/main/java/org/thingsboard/server/dao/edge/EdgeServiceImpl.java index dc64d7df40..ef384a9fba 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/edge/EdgeServiceImpl.java +++ b/dao/src/main/java/org/thingsboard/server/dao/edge/EdgeServiceImpl.java @@ -22,6 +22,7 @@ import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.MoreExecutors; import lombok.extern.slf4j.Slf4j; +import org.apache.commons.codec.binary.Base64; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.cache.Cache; import org.springframework.cache.CacheManager; @@ -301,7 +302,7 @@ public class EdgeServiceImpl extends AbstractEntityService implements EdgeServic log.trace("Executing unassignCustomerEdges, tenantId [{}], customerId [{}]", tenantId, customerId); validateId(tenantId, INCORRECT_TENANT_ID + tenantId); validateId(customerId, INCORRECT_CUSTOMER_ID + customerId); - customerEdgeUnasigner.removeEntities(tenantId, customerId); + customerEdgeUnassigner.removeEntities(tenantId, customerId); } @Override @@ -387,7 +388,7 @@ public class EdgeServiceImpl extends AbstractEntityService implements EdgeServic EdgeQueueEntityType edgeQueueEntityType = getEdgeQueueTypeByEntityType(tbMsg.getOriginator().getEntityType()); if (edgeId != null && edgeQueueEntityType != null) { try { - saveEventToEdgeQueue(tenantId, edgeId, edgeQueueEntityType, tbMsg.getType(), mapper.writeValueAsString(tbMsg), callback); + saveEventToEdgeQueue(tenantId, edgeId, edgeQueueEntityType, tbMsg.getType(), Base64.encodeBase64String(TbMsg.toByteArray(tbMsg)), callback); } catch (IOException e) { log.error("Error while saving custom tbMsg into Edge Queue", e); } @@ -723,7 +724,7 @@ public class EdgeServiceImpl extends AbstractEntityService implements EdgeServic } }; - private PaginatedRemover customerEdgeUnasigner = new PaginatedRemover() { + private PaginatedRemover customerEdgeUnassigner = new PaginatedRemover() { @Override protected List findEntities(TenantId tenantId, CustomerId id, TextPageLink pageLink) {