diff --git a/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java b/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java index 3dca3417b4..77bf7c32fa 100644 --- a/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java +++ b/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java @@ -865,6 +865,8 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso } private void notifyTransportAboutClosedSessionMaxSessionsLimit(UUID sessionId, SessionInfoMetaData sessionMd) { + attributeSubscriptions.remove(sessionId); + rpcSubscriptions.remove(sessionId); notifyTransportAboutClosedSession(sessionId, sessionMd, TransportSessionCloseReason.MAX_CONCURRENT_SESSIONS_LIMIT_REACHED); } diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/dashboard/BaseDashboardProcessor.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/dashboard/BaseDashboardProcessor.java index 52277b8527..faf001fbba 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/dashboard/BaseDashboardProcessor.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/dashboard/BaseDashboardProcessor.java @@ -26,6 +26,7 @@ import org.thingsboard.server.dao.service.DataValidator; import org.thingsboard.server.gen.edge.v1.DashboardUpdateMsg; import org.thingsboard.server.service.edge.rpc.processor.BaseEdgeProcessor; +import java.util.HashSet; import java.util.Set; @Slf4j @@ -40,54 +41,63 @@ public abstract class BaseDashboardProcessor extends BaseEdgeProcessor { if (dashboard == null) { throw new RuntimeException("[{" + tenantId + "}] dashboardUpdateMsg {" + dashboardUpdateMsg + "} cannot be converted to dashboard"); } - Set assignedCustomers = null; + Set newAssignedCustomers = new HashSet<>(dashboard.getAssignedCustomers()); Dashboard dashboardById = edgeCtx.getDashboardService().findDashboardById(tenantId, dashboardId); if (dashboardById == null) { created = true; dashboard.setId(null); + dashboard.setAssignedCustomers(null); } else { dashboard.setId(dashboardId); - assignedCustomers = filterNonExistingCustomers(tenantId, dashboardById.getAssignedCustomers()); + dashboard.setAssignedCustomers(dashboardById.getAssignedCustomers()); } dashboardValidator.validate(dashboard, Dashboard::getTenantId); if (created) { dashboard.setId(dashboardId); } - Set msgAssignedCustomers = filterNonExistingCustomers(tenantId, dashboard.getAssignedCustomers()); - if (msgAssignedCustomers != null) { - if (assignedCustomers == null) { - assignedCustomers = msgAssignedCustomers; - } else { - assignedCustomers.addAll(msgAssignedCustomers); + + Dashboard savedDashboard = edgeCtx.getDashboardService().saveDashboard(dashboard, false); + + updateDashboardAssignments(tenantId, dashboardById, savedDashboard, newAssignedCustomers); + + return created; + } + + private void updateDashboardAssignments(TenantId tenantId, Dashboard dashboardById, Dashboard savedDashboard, Set newAssignedCustomers) { + Set currentAssignedCustomers = new HashSet<>(); + if (dashboardById != null) { + if (dashboardById.getAssignedCustomers() != null) { + currentAssignedCustomers.addAll(dashboardById.getAssignedCustomers()); } } - dashboard.setAssignedCustomers(assignedCustomers); - Dashboard savedDashboard = edgeCtx.getDashboardService().saveDashboard(dashboard, false); - if (msgAssignedCustomers != null && !msgAssignedCustomers.isEmpty()) { - for (ShortCustomerInfo assignedCustomer : msgAssignedCustomers) { - if (assignedCustomer.getCustomerId().equals(customerId)) { - edgeCtx.getDashboardService().assignDashboardToCustomer(tenantId, savedDashboard.getId(), assignedCustomer.getCustomerId()); - } + + newAssignedCustomers = filterNonExistingCustomers(tenantId, currentAssignedCustomers, newAssignedCustomers); + + Set addedCustomerIds = new HashSet<>(); + Set removedCustomerIds = new HashSet<>(); + for (ShortCustomerInfo newAssignedCustomer : newAssignedCustomers) { + if (!savedDashboard.isAssignedToCustomer(newAssignedCustomer.getCustomerId())) { + addedCustomerIds.add(newAssignedCustomer.getCustomerId()); } - } else { - unassignCustomersFromDashboard(tenantId, savedDashboard, customerId); } - return created; - } - private void unassignCustomersFromDashboard(TenantId tenantId, Dashboard dashboard, CustomerId customerId) { - if (dashboard.getAssignedCustomers() != null && !dashboard.getAssignedCustomers().isEmpty()) { - for (ShortCustomerInfo assignedCustomer : dashboard.getAssignedCustomers()) { - if (assignedCustomer.getCustomerId().equals(customerId)) { - edgeCtx.getDashboardService().unassignDashboardFromCustomer(tenantId, dashboard.getId(), assignedCustomer.getCustomerId()); - } + for (ShortCustomerInfo currentAssignedCustomer : currentAssignedCustomers) { + if (!newAssignedCustomers.contains(currentAssignedCustomer)) { + removedCustomerIds.add(currentAssignedCustomer.getCustomerId()); } } + + for (CustomerId customerIdToAdd : addedCustomerIds) { + edgeCtx.getDashboardService().assignDashboardToCustomer(tenantId, savedDashboard.getId(), customerIdToAdd); + } + for (CustomerId customerIdToRemove : removedCustomerIds) { + edgeCtx.getDashboardService().unassignDashboardFromCustomer(tenantId, savedDashboard.getId(), customerIdToRemove); + } } protected abstract Dashboard constructDashboardFromUpdateMsg(TenantId tenantId, DashboardId dashboardId, DashboardUpdateMsg dashboardUpdateMsg); - protected abstract Set filterNonExistingCustomers(TenantId tenantId, Set assignedCustomers); + protected abstract Set filterNonExistingCustomers(TenantId tenantId, Set currentAssignedCustomers, Set newAssignedCustomers); } diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/dashboard/DashboardEdgeProcessor.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/dashboard/DashboardEdgeProcessor.java index cb54977c3c..8d953907b6 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/dashboard/DashboardEdgeProcessor.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/dashboard/DashboardEdgeProcessor.java @@ -128,9 +128,9 @@ public abstract class DashboardEdgeProcessor extends BaseDashboardProcessor impl } @Override - protected Set filterNonExistingCustomers(TenantId tenantId, Set assignedCustomers) { - // do nothing on cloud - return assignedCustomers; + protected Set filterNonExistingCustomers(TenantId tenantId, Set currentAssignedCustomers, Set newAssignedCustomers) { + newAssignedCustomers.addAll(currentAssignedCustomers); + return newAssignedCustomers; } } diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/dashboard/DashboardEdgeProcessorV1.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/dashboard/DashboardEdgeProcessorV1.java index 20ab6019bf..c13c85779a 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/dashboard/DashboardEdgeProcessorV1.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/dashboard/DashboardEdgeProcessorV1.java @@ -26,6 +26,7 @@ import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.gen.edge.v1.DashboardUpdateMsg; import org.thingsboard.server.queue.util.TbCoreComponent; +import java.util.HashSet; import java.util.Set; @Component @@ -44,7 +45,7 @@ public class DashboardEdgeProcessorV1 extends DashboardEdgeProcessor { Set assignedCustomers; if (dashboardUpdateMsg.hasAssignedCustomers()) { assignedCustomers = JacksonUtil.fromString(dashboardUpdateMsg.getAssignedCustomers(), new TypeReference<>() {}); - assignedCustomers = filterNonExistingCustomers(tenantId, assignedCustomers); + assignedCustomers = filterNonExistingCustomers(tenantId, new HashSet<>(), assignedCustomers); dashboard.setAssignedCustomers(assignedCustomers); } dashboard.setMobileOrder(dashboardUpdateMsg.hasMobileOrder() ? dashboardUpdateMsg.getMobileOrder() : null); diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/resource/BaseResourceProcessor.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/resource/BaseResourceProcessor.java index ac7bd6768c..a9a3bcf9fb 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/resource/BaseResourceProcessor.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/resource/BaseResourceProcessor.java @@ -53,6 +53,9 @@ public abstract class BaseResourceProcessor extends BaseEdgeProcessor { } String resourceKey = resource.getResourceKey(); ResourceType resourceType = resource.getResourceType(); + if (!created && !resourceType.isUpdatable()) { + resource.setData(null); + } PageDataIterable resourcesIterable = new PageDataIterable<>( link -> edgeCtx.getResourceService().findTenantResourcesByResourceTypeAndPageLink(tenantId, resourceType, link), 1024); for (TbResource tbResource : resourcesIterable) { diff --git a/application/src/main/java/org/thingsboard/server/service/rpc/DefaultTbRuleEngineRpcService.java b/application/src/main/java/org/thingsboard/server/service/rpc/DefaultTbRuleEngineRpcService.java index debc1cb987..27a64dc443 100644 --- a/application/src/main/java/org/thingsboard/server/service/rpc/DefaultTbRuleEngineRpcService.java +++ b/application/src/main/java/org/thingsboard/server/service/rpc/DefaultTbRuleEngineRpcService.java @@ -115,8 +115,9 @@ public class DefaultTbRuleEngineRpcService implements TbRuleEngineDeviceRpcServi ToDeviceRpcRequest request = new ToDeviceRpcRequest(src.getRequestUUID(), src.getTenantId(), src.getDeviceId(), src.isOneway(), src.getExpirationTime(), new ToDeviceRpcRequestBody(src.getMethod(), src.getBody()), src.isPersisted(), src.getRetries(), src.getAdditionalInfo()); forwardRpcRequestToDeviceActor(request, response -> { - if (src.isRestApiCall()) { - sendRpcResponseToTbCore(src.getOriginServiceId(), response); + String originServiceId = src.getOriginServiceId(); + if (src.isRestApiCall() && originServiceId != null) { + sendRpcResponseToTbCore(originServiceId, response); } consumer.accept(RuleEngineDeviceRpcResponse.builder() .deviceId(src.getDeviceId()) diff --git a/application/src/test/java/org/thingsboard/server/edge/DashboardEdgeTest.java b/application/src/test/java/org/thingsboard/server/edge/DashboardEdgeTest.java index 3b252046ab..5b09e05a09 100644 --- a/application/src/test/java/org/thingsboard/server/edge/DashboardEdgeTest.java +++ b/application/src/test/java/org/thingsboard/server/edge/DashboardEdgeTest.java @@ -16,6 +16,7 @@ package org.thingsboard.server.edge; import com.fasterxml.jackson.core.type.TypeReference; +import com.google.common.collect.Sets; import com.google.protobuf.AbstractMessage; import org.junit.Assert; import org.junit.Test; @@ -175,7 +176,11 @@ public class DashboardEdgeTest extends AbstractEdgeTest { @Test public void testSendDashboardToCloud() throws Exception { - Dashboard dashboard = buildDashboardForUplinkMsg(); + Customer customer = new Customer(); + customer.setTitle("Edge Customer"); + Customer savedCustomer = doPost("/api/customer", customer, Customer.class); + + Dashboard dashboard = buildDashboardForUplinkMsg(savedCustomer); UplinkMsg.Builder uplinkMsgBuilder = UplinkMsg.newBuilder(); DashboardUpdateMsg.Builder dashboardUpdateMsgBuilder = DashboardUpdateMsg.newBuilder(); @@ -196,6 +201,11 @@ public class DashboardEdgeTest extends AbstractEdgeTest { Dashboard foundDashboard = doGet("/api/dashboard/" + dashboard.getUuidId(), Dashboard.class); Assert.assertNotNull(foundDashboard); Assert.assertEquals("Edge Test Dashboard", foundDashboard.getName()); + + PageData pageData = doGetTypedWithPageLink("/api/customer/" + savedCustomer.getId().toString() + "/dashboards?", + new TypeReference<>() {}, new PageLink(100)); + Assert.assertEquals(1, pageData.getData().size()); + Assert.assertEquals("Edge Test Dashboard", pageData.getData().get(0).getTitle()); } @Test @@ -242,11 +252,12 @@ public class DashboardEdgeTest extends AbstractEdgeTest { return savedDashboard; } - private Dashboard buildDashboardForUplinkMsg() { + private Dashboard buildDashboardForUplinkMsg(Customer savedCustomer) { Dashboard dashboard = new Dashboard(); dashboard.setId(new DashboardId(UUID.randomUUID())); dashboard.setTenantId(tenantId); dashboard.setTitle("Edge Test Dashboard"); + dashboard.setAssignedCustomers(Sets.newHashSet(new ShortCustomerInfo(savedCustomer.getId(), savedCustomer.getTitle(), savedCustomer.isPublic()))); return dashboard; } diff --git a/application/src/test/java/org/thingsboard/server/edge/ResourceEdgeTest.java b/application/src/test/java/org/thingsboard/server/edge/ResourceEdgeTest.java index c27489d207..cc5aa4da0e 100644 --- a/application/src/test/java/org/thingsboard/server/edge/ResourceEdgeTest.java +++ b/application/src/test/java/org/thingsboard/server/edge/ResourceEdgeTest.java @@ -17,6 +17,7 @@ package org.thingsboard.server.edge; import com.datastax.oss.driver.api.core.uuid.Uuids; import com.google.protobuf.AbstractMessage; +import com.google.protobuf.InvalidProtocolBufferException; import org.junit.Assert; import org.junit.Test; import org.thingsboard.common.util.JacksonUtil; @@ -98,30 +99,23 @@ public class ResourceEdgeTest extends AbstractEdgeTest { public void testSendResourceToCloud() throws Exception { TbResource tbResource = createTbResource(); UUID uuid = Uuids.timeBased(); + UplinkMsg uplinkMsg = getUplinkMsg(uuid, tbResource, UpdateMsgType.ENTITY_CREATED_RPC_MESSAGE); - UplinkMsg.Builder uplinkMsgBuilder = UplinkMsg.newBuilder(); - ResourceUpdateMsg.Builder resourceUpdateMsgBuilder = ResourceUpdateMsg.newBuilder(); - resourceUpdateMsgBuilder.setIdMSB(uuid.getMostSignificantBits()); - resourceUpdateMsgBuilder.setIdLSB(uuid.getLeastSignificantBits()); - resourceUpdateMsgBuilder.setEntity(JacksonUtil.toString(tbResource)); - resourceUpdateMsgBuilder.setMsgType(UpdateMsgType.ENTITY_CREATED_RPC_MESSAGE); - testAutoGeneratedCodeByProtobuf(resourceUpdateMsgBuilder); - uplinkMsgBuilder.addResourceUpdateMsg(resourceUpdateMsgBuilder.build()); + checkResourceOnCloud(uplinkMsg, uuid, tbResource.getTitle()); + } - testAutoGeneratedCodeByProtobuf(uplinkMsgBuilder); + @Test + public void testUpdateResourceTitleOnCloud() throws Exception { + TbResource tbResource = createTbResource(); + UUID uuid = Uuids.timeBased(); + UplinkMsg uplinkMsg = getUplinkMsg(uuid, tbResource, UpdateMsgType.ENTITY_CREATED_RPC_MESSAGE); - edgeImitator.expectResponsesAmount(1); - edgeImitator.sendUplinkMsg(uplinkMsgBuilder.build()); + checkResourceOnCloud(uplinkMsg, uuid, tbResource.getTitle()); - Assert.assertTrue(edgeImitator.waitForResponses()); + tbResource.setTitle("Updated Edge Test Resource"); + UplinkMsg updatedUplinkMsg = getUplinkMsg(uuid, tbResource, UpdateMsgType.ENTITY_UPDATED_RPC_MESSAGE); - UplinkResponseMsg latestResponseMsg = edgeImitator.getLatestResponseMsg(); - Assert.assertTrue(latestResponseMsg.getSuccess()); - - TbResource tb = doGet("/api/resource/" + uuid, TbResource.class); - Assert.assertNotNull(tb); - Assert.assertEquals("Edge Test Resource", tb.getName()); - Assert.assertEquals(TEST_DATA, tb.getEncodedData()); + checkResourceOnCloud(updatedUplinkMsg, uuid, tbResource.getTitle()); } @Test @@ -134,21 +128,12 @@ public class ResourceEdgeTest extends AbstractEdgeTest { UUID uuid = Uuids.timeBased(); - UplinkMsg.Builder uplinkMsgBuilder = UplinkMsg.newBuilder(); - ResourceUpdateMsg.Builder resourceUpdateMsgBuilder = ResourceUpdateMsg.newBuilder(); - resourceUpdateMsgBuilder.setIdMSB(uuid.getMostSignificantBits()); - resourceUpdateMsgBuilder.setIdLSB(uuid.getLeastSignificantBits()); - resourceUpdateMsgBuilder.setEntity(JacksonUtil.toString(resource)); - resourceUpdateMsgBuilder.setMsgType(UpdateMsgType.ENTITY_CREATED_RPC_MESSAGE); - testAutoGeneratedCodeByProtobuf(resourceUpdateMsgBuilder); - uplinkMsgBuilder.addResourceUpdateMsg(resourceUpdateMsgBuilder.build()); - - testAutoGeneratedCodeByProtobuf(uplinkMsgBuilder); + UplinkMsg uplinkMsg = getUplinkMsg(uuid, resource, UpdateMsgType.ENTITY_CREATED_RPC_MESSAGE); edgeImitator.expectResponsesAmount(1); edgeImitator.expectMessageAmount(1); - edgeImitator.sendUplinkMsg(uplinkMsgBuilder.build()); + edgeImitator.sendUplinkMsg(uplinkMsg); Assert.assertTrue(edgeImitator.waitForResponses()); Assert.assertTrue(edgeImitator.waitForMessages()); @@ -177,4 +162,35 @@ public class ResourceEdgeTest extends AbstractEdgeTest { tbResource.setEncodedData(TEST_DATA); return tbResource; } + + private UplinkMsg getUplinkMsg(UUID uuid, TbResource tbResource, UpdateMsgType updateMsgType) throws InvalidProtocolBufferException { + UplinkMsg.Builder uplinkMsgBuilder = UplinkMsg.newBuilder(); + ResourceUpdateMsg.Builder resourceUpdateMsgBuilder = ResourceUpdateMsg.newBuilder(); + resourceUpdateMsgBuilder.setIdMSB(uuid.getMostSignificantBits()); + resourceUpdateMsgBuilder.setIdLSB(uuid.getLeastSignificantBits()); + resourceUpdateMsgBuilder.setEntity(JacksonUtil.toString(tbResource)); + resourceUpdateMsgBuilder.setMsgType(updateMsgType); + testAutoGeneratedCodeByProtobuf(resourceUpdateMsgBuilder); + uplinkMsgBuilder.addResourceUpdateMsg(resourceUpdateMsgBuilder.build()); + + testAutoGeneratedCodeByProtobuf(uplinkMsgBuilder); + + return uplinkMsgBuilder.build(); + } + + private void checkResourceOnCloud(UplinkMsg uplinkMsg, UUID uuid, String resourceTitle) throws Exception { + edgeImitator.expectResponsesAmount(1); + edgeImitator.sendUplinkMsg(uplinkMsg); + + Assert.assertTrue(edgeImitator.waitForResponses()); + + UplinkResponseMsg latestResponseMsg = edgeImitator.getLatestResponseMsg(); + Assert.assertTrue(latestResponseMsg.getSuccess()); + + TbResource tb = doGet("/api/resource/" + uuid, TbResource.class); + Assert.assertNotNull(tb); + Assert.assertEquals(resourceTitle, tb.getName()); + Assert.assertEquals(TEST_DATA, tb.getEncodedData()); + } + } diff --git a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java index e669016514..e6efcf8206 100644 --- a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java +++ b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java @@ -963,7 +963,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement unSubResults.add((short) MqttReasonCodes.UnsubAck.NO_SUBSCRIPTION_EXISTED.byteValue()); } } - if (!activityReported) { + if (!activityReported && !deviceSessionCtx.isProvisionOnly()) { transportService.recordActivity(deviceSessionCtx.getSessionInfo()); } ctx.writeAndFlush(createUnSubAckMessage(mqttMsg.variableHeader().messageId(), unSubResults)); diff --git a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/DefaultTransportService.java b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/DefaultTransportService.java index af9dca8583..547eab6172 100644 --- a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/DefaultTransportService.java +++ b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/DefaultTransportService.java @@ -735,7 +735,11 @@ public class DefaultTransportService extends TransportActivityManager implements } private void recordActivityInternal(TransportProtos.SessionInfoProto sessionInfo) { - onActivity(toSessionId(sessionInfo), sessionInfo, getCurrentTimeMillis()); + if (sessionInfo != null) { + onActivity(toSessionId(sessionInfo), sessionInfo, getCurrentTimeMillis()); + } else { + log.warn("Session info is missing, unable to record activity"); + } } @Override diff --git a/ui-ngx/src/app/modules/home/pages/admin/queue/queue.component.html b/ui-ngx/src/app/modules/home/pages/admin/queue/queue.component.html index f25cd1c857..68b73e7357 100644 --- a/ui-ngx/src/app/modules/home/pages/admin/queue/queue.component.html +++ b/ui-ngx/src/app/modules/home/pages/admin/queue/queue.component.html @@ -22,7 +22,7 @@ [class.!hidden]="isEdit || isDetailsPage"> {{'common.open-details-page' | translate }} - - - - - - - - -