diff --git a/application/src/main/java/org/thingsboard/server/actors/tenant/TenantActor.java b/application/src/main/java/org/thingsboard/server/actors/tenant/TenantActor.java index 28e6ad190c..e1ccc7dbf5 100644 --- a/application/src/main/java/org/thingsboard/server/actors/tenant/TenantActor.java +++ b/application/src/main/java/org/thingsboard/server/actors/tenant/TenantActor.java @@ -261,7 +261,7 @@ public class TenantActor extends RuleChainManagerActor { edgeRpcService.updateEdge(tenantId, edge); } } - if (msg.getEntityId().getEntityType() == EntityType.DEVICE && ComponentLifecycleEvent.DELETED == msg.getEvent()) { + if (msg.getEntityId().getEntityType() == EntityType.DEVICE && ComponentLifecycleEvent.DELETED == msg.getEvent() && isMyPartition(msg.getEntityId())) { DeviceId deviceId = (DeviceId) msg.getEntityId(); onToDeviceActorMsg(new DeviceDeleteMsg(tenantId, deviceId), true); deletedDevices.add(deviceId); diff --git a/application/src/main/java/org/thingsboard/server/service/notification/rule/trigger/EntitiesLimitTriggerProcessor.java b/application/src/main/java/org/thingsboard/server/service/notification/rule/trigger/EntitiesLimitTriggerProcessor.java index b7847b9857..e1db043188 100644 --- a/application/src/main/java/org/thingsboard/server/service/notification/rule/trigger/EntitiesLimitTriggerProcessor.java +++ b/application/src/main/java/org/thingsboard/server/service/notification/rule/trigger/EntitiesLimitTriggerProcessor.java @@ -19,10 +19,10 @@ import lombok.RequiredArgsConstructor; import org.springframework.stereotype.Service; import org.thingsboard.server.common.data.notification.info.EntitiesLimitNotificationInfo; import org.thingsboard.server.common.data.notification.info.RuleOriginatedNotificationInfo; +import org.thingsboard.server.common.data.notification.rule.trigger.EntitiesLimitTrigger; import org.thingsboard.server.common.data.notification.rule.trigger.config.EntitiesLimitNotificationRuleTriggerConfig; import org.thingsboard.server.common.data.notification.rule.trigger.config.NotificationRuleTriggerType; import org.thingsboard.server.common.data.tenant.profile.DefaultTenantProfileConfiguration; -import org.thingsboard.server.common.data.notification.rule.trigger.EntitiesLimitTrigger; import org.thingsboard.server.dao.entity.EntityCountService; import org.thingsboard.server.dao.tenant.TbTenantProfileCache; import org.thingsboard.server.dao.tenant.TenantService; @@ -48,6 +48,9 @@ public class EntitiesLimitTriggerProcessor implements NotificationRuleTriggerPro return false; } long currentCount = entityCountService.countByTenantIdAndEntityType(trigger.getTenantId(), trigger.getEntityType()); + if (currentCount == 0) { + return false; + } trigger.setLimit(limit); trigger.setCurrentCount(currentCount); return (int) (limit * triggerConfig.getThreshold()) == currentCount; // strict comparing not to send notification on each new entity @@ -59,7 +62,7 @@ public class EntitiesLimitTriggerProcessor implements NotificationRuleTriggerPro .entityType(trigger.getEntityType()) .currentCount(trigger.getCurrentCount()) .limit(trigger.getLimit()) - .percents((int) (((float)trigger.getCurrentCount() / trigger.getLimit()) * 100)) + .percents((int) (((float) trigger.getCurrentCount() / trigger.getLimit()) * 100)) .tenantId(trigger.getTenantId()) .tenantName(tenantService.findTenantById(trigger.getTenantId()).getName()) .build(); diff --git a/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbCoreConsumerService.java b/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbCoreConsumerService.java index e888dcd526..2d5e6963eb 100644 --- a/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbCoreConsumerService.java +++ b/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbCoreConsumerService.java @@ -373,10 +373,6 @@ public class DefaultTbCoreConsumerService extends AbstractConsumerService 0 ? RpcError.values()[proto.getError()] : null; @@ -164,10 +165,10 @@ public class DefaultTbRuleEngineConsumerService extends AbstractConsumerService< tbDeviceRpcService.processRpcResponseFromDevice(response); callback.onSuccess(); } else if (nfMsg.hasQueueUpdateMsg()) { - ctx.getScheduler().execute(() -> updateQueue(nfMsg.getQueueUpdateMsg())); + updateQueue(nfMsg.getQueueUpdateMsg()); callback.onSuccess(); } else if (nfMsg.hasQueueDeleteMsg()) { - ctx.getScheduler().execute(() -> deleteQueue(nfMsg.getQueueDeleteMsg())); + deleteQueue(nfMsg.getQueueDeleteMsg()); callback.onSuccess(); } else { log.trace("Received notification with missing handler"); @@ -204,13 +205,30 @@ public class DefaultTbRuleEngineConsumerService extends AbstractConsumerService< QueueKey queueKey = new QueueKey(ServiceType.TB_RULE_ENGINE, queueDeleteMsg.getQueueName(), tenantId); var consumerManager = consumers.remove(queueKey); if (consumerManager != null) { - consumerManager.delete(); + consumerManager.delete(true); } partitionService.removeQueue(queueDeleteMsg); partitionService.recalculatePartitions(ctx.getServiceInfoProvider().getServiceInfo(), new ArrayList<>(partitionService.getOtherServices(ServiceType.TB_RULE_ENGINE))); } + @EventListener + public void handleComponentLifecycleEvent(ComponentLifecycleMsg event) { + if (event.getEntityId().getEntityType() == EntityType.TENANT) { + if (event.getEvent() == ComponentLifecycleEvent.DELETED) { + List toRemove = consumers.keySet().stream() + .filter(queueKey -> queueKey.getTenantId().equals(event.getTenantId())) + .collect(Collectors.toList()); + toRemove.forEach(queueKey -> { + var consumerManager = consumers.remove(queueKey); + if (consumerManager != null) { + consumerManager.delete(false); + } + }); + } + } + } + private TbRuleEngineQueueConsumerManager getOrCreateConsumer(QueueKey queueKey) { return consumers.computeIfAbsent(queueKey, key -> new TbRuleEngineQueueConsumerManager(ctx, key)); } diff --git a/application/src/main/java/org/thingsboard/server/service/queue/processing/AbstractConsumerService.java b/application/src/main/java/org/thingsboard/server/service/queue/processing/AbstractConsumerService.java index 394f41883d..3065652d0f 100644 --- a/application/src/main/java/org/thingsboard/server/service/queue/processing/AbstractConsumerService.java +++ b/application/src/main/java/org/thingsboard/server/service/queue/processing/AbstractConsumerService.java @@ -15,7 +15,6 @@ */ package org.thingsboard.server.service.queue.processing; -import com.google.protobuf.ByteString; import lombok.extern.slf4j.Slf4j; import org.springframework.boot.context.event.ApplicationReadyEvent; import org.springframework.context.ApplicationEventPublisher; @@ -30,7 +29,6 @@ import org.thingsboard.server.common.data.id.DeviceProfileId; import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.id.TenantProfileId; import org.thingsboard.server.common.data.plugin.ComponentLifecycleEvent; -import org.thingsboard.server.common.msg.TbActorMsg; import org.thingsboard.server.common.msg.plugin.ComponentLifecycleMsg; import org.thingsboard.server.common.msg.queue.ServiceType; import org.thingsboard.server.common.msg.queue.TbCallback; @@ -166,57 +164,51 @@ public abstract class AbstractConsumerService actorMsgOpt = encodingService.decode(nfMsg.toByteArray()); - actorMsgOpt.ifPresent(tbActorMsg -> handleComponentLifecycleMsg(id, tbActorMsg)); - } - - protected void handleComponentLifecycleMsg(UUID id, TbActorMsg actorMsg) { - if (actorMsg instanceof ComponentLifecycleMsg) { - ComponentLifecycleMsg componentLifecycleMsg = (ComponentLifecycleMsg) actorMsg; - log.debug("[{}][{}][{}] Received Lifecycle event: {}", componentLifecycleMsg.getTenantId(), componentLifecycleMsg.getEntityId().getEntityType(), - componentLifecycleMsg.getEntityId(), componentLifecycleMsg.getEvent()); - if (EntityType.TENANT_PROFILE.equals(componentLifecycleMsg.getEntityId().getEntityType())) { - TenantProfileId tenantProfileId = new TenantProfileId(componentLifecycleMsg.getEntityId().getId()); - tenantProfileCache.evict(tenantProfileId); + protected final void handleComponentLifecycleMsg(UUID id, ComponentLifecycleMsg componentLifecycleMsg) { + TenantId tenantId = componentLifecycleMsg.getTenantId(); + log.debug("[{}][{}][{}] Received Lifecycle event: {}", tenantId, componentLifecycleMsg.getEntityId().getEntityType(), + componentLifecycleMsg.getEntityId(), componentLifecycleMsg.getEvent()); + if (EntityType.TENANT_PROFILE.equals(componentLifecycleMsg.getEntityId().getEntityType())) { + TenantProfileId tenantProfileId = new TenantProfileId(componentLifecycleMsg.getEntityId().getId()); + tenantProfileCache.evict(tenantProfileId); + if (componentLifecycleMsg.getEvent().equals(ComponentLifecycleEvent.UPDATED)) { + apiUsageStateService.onTenantProfileUpdate(tenantProfileId); + } + } else if (EntityType.TENANT.equals(componentLifecycleMsg.getEntityId().getEntityType())) { + if (TenantId.SYS_TENANT_ID.equals(tenantId)) { + jwtSettingsService.ifPresent(JwtSettingsService::reloadJwtSettings); + return; + } else { + tenantProfileCache.evict(tenantId); + partitionService.evictTenantInfo(tenantId); if (componentLifecycleMsg.getEvent().equals(ComponentLifecycleEvent.UPDATED)) { - apiUsageStateService.onTenantProfileUpdate(tenantProfileId); - } - } else if (EntityType.TENANT.equals(componentLifecycleMsg.getEntityId().getEntityType())) { - if (TenantId.SYS_TENANT_ID.equals(componentLifecycleMsg.getTenantId())) { - jwtSettingsService.ifPresent(JwtSettingsService::reloadJwtSettings); - return; - } else { - tenantProfileCache.evict(componentLifecycleMsg.getTenantId()); - partitionService.removeTenant(componentLifecycleMsg.getTenantId()); - if (componentLifecycleMsg.getEvent().equals(ComponentLifecycleEvent.UPDATED)) { - apiUsageStateService.onTenantUpdate(componentLifecycleMsg.getTenantId()); - } else if (componentLifecycleMsg.getEvent().equals(ComponentLifecycleEvent.DELETED)) { - apiUsageStateService.onTenantDelete((TenantId) componentLifecycleMsg.getEntityId()); - } - } - } else if (EntityType.DEVICE_PROFILE.equals(componentLifecycleMsg.getEntityId().getEntityType())) { - deviceProfileCache.evict(componentLifecycleMsg.getTenantId(), new DeviceProfileId(componentLifecycleMsg.getEntityId().getId())); - } else if (EntityType.DEVICE.equals(componentLifecycleMsg.getEntityId().getEntityType())) { - deviceProfileCache.evict(componentLifecycleMsg.getTenantId(), new DeviceId(componentLifecycleMsg.getEntityId().getId())); - } else if (EntityType.ASSET_PROFILE.equals(componentLifecycleMsg.getEntityId().getEntityType())) { - assetProfileCache.evict(componentLifecycleMsg.getTenantId(), new AssetProfileId(componentLifecycleMsg.getEntityId().getId())); - } else if (EntityType.ASSET.equals(componentLifecycleMsg.getEntityId().getEntityType())) { - assetProfileCache.evict(componentLifecycleMsg.getTenantId(), new AssetId(componentLifecycleMsg.getEntityId().getId())); - } else if (EntityType.ENTITY_VIEW.equals(componentLifecycleMsg.getEntityId().getEntityType())) { - actorContext.getTbEntityViewService().onComponentLifecycleMsg(componentLifecycleMsg); - } else if (EntityType.API_USAGE_STATE.equals(componentLifecycleMsg.getEntityId().getEntityType())) { - apiUsageStateService.onApiUsageStateUpdate(componentLifecycleMsg.getTenantId()); - } else if (EntityType.CUSTOMER.equals(componentLifecycleMsg.getEntityId().getEntityType())) { - if (componentLifecycleMsg.getEvent() == ComponentLifecycleEvent.DELETED) { - apiUsageStateService.onCustomerDelete((CustomerId) componentLifecycleMsg.getEntityId()); + apiUsageStateService.onTenantUpdate(tenantId); + } else if (componentLifecycleMsg.getEvent().equals(ComponentLifecycleEvent.DELETED)) { + apiUsageStateService.onTenantDelete(tenantId); + partitionService.removeTenant(tenantId); } } - eventPublisher.publishEvent(componentLifecycleMsg); + } else if (EntityType.DEVICE_PROFILE.equals(componentLifecycleMsg.getEntityId().getEntityType())) { + deviceProfileCache.evict(tenantId, new DeviceProfileId(componentLifecycleMsg.getEntityId().getId())); + } else if (EntityType.DEVICE.equals(componentLifecycleMsg.getEntityId().getEntityType())) { + deviceProfileCache.evict(tenantId, new DeviceId(componentLifecycleMsg.getEntityId().getId())); + } else if (EntityType.ASSET_PROFILE.equals(componentLifecycleMsg.getEntityId().getEntityType())) { + assetProfileCache.evict(tenantId, new AssetProfileId(componentLifecycleMsg.getEntityId().getId())); + } else if (EntityType.ASSET.equals(componentLifecycleMsg.getEntityId().getEntityType())) { + assetProfileCache.evict(tenantId, new AssetId(componentLifecycleMsg.getEntityId().getId())); + } else if (EntityType.ENTITY_VIEW.equals(componentLifecycleMsg.getEntityId().getEntityType())) { + actorContext.getTbEntityViewService().onComponentLifecycleMsg(componentLifecycleMsg); + } else if (EntityType.API_USAGE_STATE.equals(componentLifecycleMsg.getEntityId().getEntityType())) { + apiUsageStateService.onApiUsageStateUpdate(tenantId); + } else if (EntityType.CUSTOMER.equals(componentLifecycleMsg.getEntityId().getEntityType())) { + if (componentLifecycleMsg.getEvent() == ComponentLifecycleEvent.DELETED) { + apiUsageStateService.onCustomerDelete((CustomerId) componentLifecycleMsg.getEntityId()); + } } - log.trace("[{}] Forwarding component lifecycle message to App Actor {}", id, actorMsg); - actorContext.tellWithHighPriority(actorMsg); + + eventPublisher.publishEvent(componentLifecycleMsg); + log.trace("[{}] Forwarding component lifecycle message to App Actor {}", id, componentLifecycleMsg); + actorContext.tellWithHighPriority(componentLifecycleMsg); } protected abstract void handleNotification(UUID id, TbProtoQueueMsg msg, TbCallback callback) throws Exception; diff --git a/application/src/main/java/org/thingsboard/server/service/queue/ruleengine/TbQueueConsumerManagerTask.java b/application/src/main/java/org/thingsboard/server/service/queue/ruleengine/TbQueueConsumerManagerTask.java index 20eea32a6a..3c52e31afe 100644 --- a/application/src/main/java/org/thingsboard/server/service/queue/ruleengine/TbQueueConsumerManagerTask.java +++ b/application/src/main/java/org/thingsboard/server/service/queue/ruleengine/TbQueueConsumerManagerTask.java @@ -15,6 +15,7 @@ */ package org.thingsboard.server.service.queue.ruleengine; +import lombok.AllArgsConstructor; import lombok.Getter; import lombok.ToString; import org.thingsboard.server.common.data.queue.Queue; @@ -24,24 +25,24 @@ import java.util.Set; @Getter @ToString +@AllArgsConstructor public class TbQueueConsumerManagerTask { private final QueueEvent event; private Queue queue; private Set partitions; + private boolean drainQueue; - public TbQueueConsumerManagerTask(QueueEvent event) { - this.event = event; + public static TbQueueConsumerManagerTask delete(boolean drainQueue) { + return new TbQueueConsumerManagerTask(QueueEvent.DELETE, null, null, drainQueue); } - public TbQueueConsumerManagerTask(QueueEvent event, Queue queue) { - this.event = event; - this.queue = queue; + public static TbQueueConsumerManagerTask configUpdate(Queue queue) { + return new TbQueueConsumerManagerTask(QueueEvent.CONFIG_UPDATE, queue, null, false); } - public TbQueueConsumerManagerTask(QueueEvent event, Set partitions) { - this.event = event; - this.partitions = partitions; + public static TbQueueConsumerManagerTask partitionChange(Set partitions) { + return new TbQueueConsumerManagerTask(QueueEvent.PARTITION_CHANGE, null, partitions, false); } } diff --git a/application/src/main/java/org/thingsboard/server/service/queue/ruleengine/TbRuleEngineQueueConsumerManager.java b/application/src/main/java/org/thingsboard/server/service/queue/ruleengine/TbRuleEngineQueueConsumerManager.java index 22db5b7b38..2da3dbc6dc 100644 --- a/application/src/main/java/org/thingsboard/server/service/queue/ruleengine/TbRuleEngineQueueConsumerManager.java +++ b/application/src/main/java/org/thingsboard/server/service/queue/ruleengine/TbRuleEngineQueueConsumerManager.java @@ -95,15 +95,15 @@ public class TbRuleEngineQueueConsumerManager { } public void update(Queue queue) { - addTask(new TbQueueConsumerManagerTask(QueueEvent.CONFIG_UPDATE, queue)); + addTask(TbQueueConsumerManagerTask.configUpdate(queue)); } public void update(Set partitions) { - addTask(new TbQueueConsumerManagerTask(QueueEvent.PARTITION_CHANGE, partitions)); + addTask(TbQueueConsumerManagerTask.partitionChange(partitions)); } - public void delete() { - addTask(new TbQueueConsumerManagerTask(QueueEvent.DELETE)); + public void delete(boolean drainQueue) { + addTask(TbQueueConsumerManagerTask.delete(drainQueue)); } private void addTask(TbQueueConsumerManagerTask todo) { @@ -138,7 +138,7 @@ public class TbRuleEngineQueueConsumerManager { } else if (task.getEvent() == QueueEvent.CONFIG_UPDATE) { newConfiguration = task.getQueue(); } else if (task.getEvent() == QueueEvent.DELETE) { - doDelete(); + doDelete(task.isDrainQueue()); return; } } @@ -205,7 +205,7 @@ public class TbRuleEngineQueueConsumerManager { log.debug("[{}] Unsubscribed and stopped consumers", queueKey); } - private void doDelete() { + private void doDelete(boolean drainQueue) { stopped = true; log.info("[{}] Handling queue deletion", queueKey); consumerWrapper.getConsumers().forEach(TbQueueConsumerTask::awaitCompletion); @@ -213,7 +213,9 @@ public class TbRuleEngineQueueConsumerManager { List>> queueConsumers = consumerWrapper.getConsumers().stream() .map(TbQueueConsumerTask::getConsumer).collect(Collectors.toList()); ctx.getConsumersExecutor().submit(() -> { - drainQueue(queueConsumers); + if (drainQueue) { + drainQueue(queueConsumers); + } queueConsumers.forEach(consumer -> { for (String topic : consumer.getFullTopicNames()) { diff --git a/application/src/main/java/org/thingsboard/server/service/security/auth/oauth2/AbstractOAuth2ClientMapper.java b/application/src/main/java/org/thingsboard/server/service/security/auth/oauth2/AbstractOAuth2ClientMapper.java index f52fc32f4f..d0e3444d0f 100644 --- a/application/src/main/java/org/thingsboard/server/service/security/auth/oauth2/AbstractOAuth2ClientMapper.java +++ b/application/src/main/java/org/thingsboard/server/service/security/auth/oauth2/AbstractOAuth2ClientMapper.java @@ -38,7 +38,6 @@ import org.thingsboard.server.common.data.oauth2.OAuth2MapperConfig; import org.thingsboard.server.common.data.oauth2.OAuth2Registration; import org.thingsboard.server.common.data.page.PageData; import org.thingsboard.server.common.data.page.PageLink; -import org.thingsboard.server.common.data.plugin.ComponentLifecycleEvent; import org.thingsboard.server.common.data.security.Authority; import org.thingsboard.server.common.data.security.UserCredentials; import org.thingsboard.server.dao.customer.CustomerService; @@ -47,12 +46,12 @@ import org.thingsboard.server.dao.oauth2.OAuth2User; import org.thingsboard.server.dao.tenant.TbTenantProfileCache; import org.thingsboard.server.dao.tenant.TenantService; import org.thingsboard.server.dao.user.UserService; +import org.thingsboard.server.service.entitiy.tenant.TbTenantService; import org.thingsboard.server.service.entitiy.user.TbUserService; import org.thingsboard.server.service.install.InstallScripts; import org.thingsboard.server.service.security.model.SecurityUser; import org.thingsboard.server.service.security.model.UserPrincipal; -import java.io.IOException; import java.util.List; import java.util.Optional; import java.util.concurrent.locks.Lock; @@ -71,6 +70,9 @@ public abstract class AbstractOAuth2ClientMapper { @Autowired private TenantService tenantService; + @Autowired + private TbTenantService tbTenantService; + @Autowired private CustomerService customerService; @@ -92,7 +94,7 @@ public abstract class AbstractOAuth2ClientMapper { @Value("${edges.enabled}") @Getter private boolean edgesEnabled; - + private final Lock userCreationLock = new ReentrantLock(); protected SecurityUser getOrCreateSecurityUserFromOAuth2User(OAuth2User oauth2User, OAuth2Registration registration) { @@ -171,19 +173,13 @@ public abstract class AbstractOAuth2ClientMapper { } } - private TenantId getTenantId(String tenantName) throws IOException { + private TenantId getTenantId(String tenantName) throws Exception { List tenants = tenantService.findTenants(new PageLink(1, 0, tenantName)).getData(); Tenant tenant; if (tenants == null || tenants.isEmpty()) { tenant = new Tenant(); tenant.setTitle(tenantName); - tenant = tenantService.saveTenant(tenant); - installScripts.createDefaultRuleChains(tenant.getId()); - installScripts.createDefaultEdgeRuleChains(tenant.getId()); - tenantProfileCache.evict(tenant.getId()); - tbClusterService.onTenantChange(tenant, null); - tbClusterService.broadcastEntityStateChangeEvent(tenant.getId(), tenant.getId(), - ComponentLifecycleEvent.CREATED); + tenant = tbTenantService.save(tenant); } else { tenant = tenants.get(0); } diff --git a/application/src/test/java/org/thingsboard/server/actors/tenant/TenantActorTest.java b/application/src/test/java/org/thingsboard/server/actors/tenant/TenantActorTest.java new file mode 100644 index 0000000000..557da0332e --- /dev/null +++ b/application/src/test/java/org/thingsboard/server/actors/tenant/TenantActorTest.java @@ -0,0 +1,74 @@ +/** + * Copyright © 2016-2023 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.actors.tenant; + +import org.junit.Before; +import org.junit.Test; +import org.thingsboard.server.actors.ActorSystemContext; +import org.thingsboard.server.actors.TbActorCtx; +import org.thingsboard.server.actors.TbActorRef; +import org.thingsboard.server.common.data.id.DeviceId; +import org.thingsboard.server.common.data.id.TenantId; +import org.thingsboard.server.common.data.plugin.ComponentLifecycleEvent; +import org.thingsboard.server.common.msg.plugin.ComponentLifecycleMsg; +import org.thingsboard.server.common.msg.queue.ServiceType; +import org.thingsboard.server.common.msg.queue.TopicPartitionInfo; +import org.thingsboard.server.common.msg.rule.engine.DeviceDeleteMsg; +import org.thingsboard.server.dao.tenant.TenantService; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.reset; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +public class TenantActorTest { + + TenantActor tenantActor; + TbActorCtx ctx; + ActorSystemContext systemContext; + TenantId tenantId = TenantId.SYS_TENANT_ID; + DeviceId deviceId = DeviceId.fromString("78bf9b26-74ef-4af2-9cfb-ad6cf24ad2ec"); + + @Before + public void setUp() throws Exception { + systemContext = mock(ActorSystemContext.class); + ctx = mock(TbActorCtx.class); + tenantActor = (TenantActor) new TenantActor.ActorCreator(systemContext, tenantId).createActor(); + when(systemContext.getTenantService()).thenReturn(mock(TenantService.class)); + tenantActor.init(ctx); + tenantActor.cantFindTenant = false; + } + + @Test + public void deleteDeviceTest() { + TbActorRef deviceActorRef = mock(TbActorRef.class); + when(systemContext.resolve(ServiceType.TB_CORE, tenantId, deviceId)).thenReturn(new TopicPartitionInfo("Main", tenantId, 0,true)); + when(ctx.getOrCreateChildActor(any(), any(), any(), any())).thenReturn(deviceActorRef); + ComponentLifecycleMsg componentLifecycleMsg = new ComponentLifecycleMsg(tenantId, deviceId, ComponentLifecycleEvent.DELETED); + tenantActor.doProcess(componentLifecycleMsg); + verify(deviceActorRef).tellWithHighPriority(eq(new DeviceDeleteMsg(tenantId, deviceId))); + + reset(ctx, deviceActorRef); + when(systemContext.resolve(ServiceType.TB_CORE, tenantId, deviceId)).thenReturn(new TopicPartitionInfo("Main", tenantId, 1,false)); + tenantActor.doProcess(componentLifecycleMsg); + verify(ctx, never()).getOrCreateChildActor(any(), any(), any(), any()); + verify(deviceActorRef, never()).tellWithHighPriority(any()); + } + +} \ No newline at end of file diff --git a/application/src/test/java/org/thingsboard/server/controller/ImageControllerTest.java b/application/src/test/java/org/thingsboard/server/controller/ImageControllerTest.java index 3761e0c9c1..e272a40857 100644 --- a/application/src/test/java/org/thingsboard/server/controller/ImageControllerTest.java +++ b/application/src/test/java/org/thingsboard/server/controller/ImageControllerTest.java @@ -130,13 +130,14 @@ public class ImageControllerTest extends AbstractControllerTest { checkPngImageDescriptor(imageInfo.getDescriptor(ImageDescriptor.class)); String newFilename = "my_jpeg_image.png"; - imageInfo = uploadImage(HttpMethod.PUT, "/api/images/tenant/" + filename, newFilename, "image/jpeg", JPEG_IMAGE); + TbResourceInfo newImageInfo = uploadImage(HttpMethod.PUT, "/api/images/tenant/" + filename, newFilename, "image/jpeg", JPEG_IMAGE); - assertThat(imageInfo.getTitle()).isEqualTo(filename); - assertThat(imageInfo.getResourceKey()).isEqualTo(filename); - assertThat(imageInfo.getFileName()).isEqualTo(newFilename); + assertThat(newImageInfo.getTitle()).isEqualTo(filename); + assertThat(newImageInfo.getResourceKey()).isEqualTo(filename); + assertThat(newImageInfo.getFileName()).isEqualTo(newFilename); + assertThat(newImageInfo.getPublicResourceKey()).isEqualTo(imageInfo.getPublicResourceKey()); - ImageDescriptor imageDescriptor = imageInfo.getDescriptor(ImageDescriptor.class); + ImageDescriptor imageDescriptor = newImageInfo.getDescriptor(ImageDescriptor.class); checkJpegImageDescriptor(imageDescriptor); assertThat(downloadImage("tenant", filename)).containsExactly(JPEG_IMAGE); @@ -154,12 +155,15 @@ public class ImageControllerTest extends AbstractControllerTest { assertThat(imageInfo.getFileName()).isEqualTo(filename); String newTitle = "My PNG image"; - imageInfo.setTitle(newTitle); - imageInfo.setDescriptor(JacksonUtil.newObjectNode()); - imageInfo = doPut("/api/images/tenant/" + filename + "/info", imageInfo, TbResourceInfo.class); - - assertThat(imageInfo.getTitle()).isEqualTo(newTitle); - assertThat(imageInfo.getDescriptor(ImageDescriptor.class)).isEqualTo(imageDescriptor); + TbResourceInfo newImageInfo = new TbResourceInfo(imageInfo); + newImageInfo.setTitle(newTitle); + newImageInfo.setDescriptor(JacksonUtil.newObjectNode()); + newImageInfo = doPut("/api/images/tenant/" + filename + "/info", newImageInfo, TbResourceInfo.class); + + assertThat(newImageInfo.getTitle()).isEqualTo(newTitle); + assertThat(newImageInfo.getDescriptor(ImageDescriptor.class)).isEqualTo(imageDescriptor); + assertThat(newImageInfo.getResourceKey()).isEqualTo(imageInfo.getResourceKey()); + assertThat(newImageInfo.getPublicResourceKey()).isEqualTo(newImageInfo.getPublicResourceKey()); } @Test diff --git a/application/src/test/java/org/thingsboard/server/controller/TenantControllerTest.java b/application/src/test/java/org/thingsboard/server/controller/TenantControllerTest.java index 07fa41f8bd..cd47988658 100644 --- a/application/src/test/java/org/thingsboard/server/controller/TenantControllerTest.java +++ b/application/src/test/java/org/thingsboard/server/controller/TenantControllerTest.java @@ -40,6 +40,7 @@ import org.thingsboard.server.common.data.Tenant; import org.thingsboard.server.common.data.TenantInfo; import org.thingsboard.server.common.data.TenantProfile; import org.thingsboard.server.common.data.User; +import org.thingsboard.server.common.data.exception.TenantNotFoundException; import org.thingsboard.server.common.data.id.EntityId; import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.msg.TbMsgType; @@ -64,6 +65,7 @@ import org.thingsboard.server.dao.service.DaoSqlTest; import org.thingsboard.server.gen.transport.TransportProtos; import org.thingsboard.server.queue.TbQueueAdmin; import org.thingsboard.server.queue.discovery.PartitionService; +import org.thingsboard.server.queue.discovery.QueueKey; import java.util.ArrayList; import java.util.Collections; @@ -71,7 +73,6 @@ import java.util.Comparator; import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.Random; import java.util.UUID; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; @@ -80,6 +81,7 @@ import java.util.function.Predicate; import java.util.stream.Collectors; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; import static org.awaitility.Awaitility.await; import static org.hamcrest.Matchers.containsString; import static org.mockito.ArgumentMatchers.argThat; @@ -700,6 +702,45 @@ public class TenantControllerTest extends AbstractControllerTest { }); } + @Test + public void whenTenantIsDeleted_thenDeleteQueues() throws Exception { + loginSysAdmin(); + TenantProfile tenantProfile = new TenantProfile(); + tenantProfile.setName("Test profile"); + TenantProfileData tenantProfileData = new TenantProfileData(); + tenantProfileData.setConfiguration(new DefaultTenantProfileConfiguration()); + tenantProfile.setProfileData(tenantProfileData); + tenantProfile.setIsolatedTbRuleEngine(true); + addQueueConfig(tenantProfile, MAIN_QUEUE_NAME); + tenantProfile = doPost("/api/tenantProfile", tenantProfile, TenantProfile.class); + createDifferentTenant(); + loginSysAdmin(); + savedDifferentTenant.setTenantProfileId(tenantProfile.getId()); + savedDifferentTenant = doPost("/api/tenant", savedDifferentTenant, Tenant.class); + TenantId tenantId = differentTenantId; + await().atMost(10, TimeUnit.SECONDS).untilAsserted(() -> { + assertThat(partitionService.getMyPartitions(new QueueKey(ServiceType.TB_RULE_ENGINE, tenantId))).isNotNull(); + }); + TopicPartitionInfo tpi = partitionService.resolve(ServiceType.TB_RULE_ENGINE, tenantId, tenantId); + assertThat(tpi.getTenantId()).hasValue(tenantId); + TbMsg tbMsg = publishTbMsg(tenantId, tpi); + await().atMost(10, TimeUnit.SECONDS).untilAsserted(() -> { + verify(actorContext).tell(argThat(msg -> { + return msg instanceof QueueToRuleEngineMsg && ((QueueToRuleEngineMsg) msg).getMsg().getId().equals(tbMsg.getId()); + })); + }); + + deleteDifferentTenant(); + + await().atMost(10, TimeUnit.SECONDS).untilAsserted(() -> { + assertThat(partitionService.getMyPartitions(new QueueKey(ServiceType.TB_RULE_ENGINE, tenantId))).isNull(); + assertThatThrownBy(() -> partitionService.resolve(ServiceType.TB_RULE_ENGINE, tenantId, tenantId)) + .isInstanceOf(TenantNotFoundException.class); + + verify(queueAdmin).deleteTopic(eq(tpi.getFullTopicName())); + }); + } + private TbMsg publishTbMsg(TenantId tenantId, TopicPartitionInfo tpi) { TbMsg tbMsg = TbMsg.newMsg(TbMsgType.POST_TELEMETRY_REQUEST, tenantId, TbMsgMetaData.EMPTY, "{\"test\":1}"); TransportProtos.ToRuleEngineMsg msg = TransportProtos.ToRuleEngineMsg.newBuilder() @@ -759,7 +800,7 @@ public class TenantControllerTest extends AbstractControllerTest { queueConfiguration.setName(queueName); queueConfiguration.setTopic(topic); queueConfiguration.setPollInterval(25); - queueConfiguration.setPartitions(1 + new Random().nextInt(99)); + queueConfiguration.setPartitions(12); queueConfiguration.setConsumerPerPartition(true); queueConfiguration.setPackProcessingTimeout(2000); SubmitStrategy submitStrategy = new SubmitStrategy(); @@ -799,20 +840,20 @@ public class TenantControllerTest extends AbstractControllerTest { ArgumentMatcher matcherTenant = cntTime == 1 ? argument -> argument.equals(tenant) : argument -> argument.getClass().equals(Tenant.class); if (ComponentLifecycleEvent.DELETED.equals(event)) { - Mockito.verify(tbClusterService, times( cntTime)).onTenantDelete(Mockito.argThat(matcherTenant), + Mockito.verify(tbClusterService, times(cntTime)).onTenantDelete(Mockito.argThat(matcherTenant), Mockito.isNull()); } else { - Mockito.verify(tbClusterService, times( cntTime)).onTenantChange(Mockito.argThat(matcherTenant), + Mockito.verify(tbClusterService, times(cntTime)).onTenantChange(Mockito.argThat(matcherTenant), Mockito.isNull()); } TenantId tenantId = cntTime == 1 ? tenant.getId() : (TenantId) createEntityId_NULL_UUID(tenant); - testBroadcastEntityStateChangeEventTime(tenantId, tenantId, cntTime); + testBroadcastEntityStateChangeEventTime(tenantId, tenantId, cntTime); Mockito.reset(tbClusterService); } private void testBroadcastEntityStateChangeEventNeverTenant() { Mockito.verify(tbClusterService, never()).onTenantChange(Mockito.any(Tenant.class), - Mockito.isNull()); + Mockito.isNull()); testBroadcastEntityStateChangeEventNever(createEntityId_NULL_UUID(new Tenant())); Mockito.reset(tbClusterService); } diff --git a/application/src/test/java/org/thingsboard/server/service/queue/ruleengine/TbRuleEngineQueueConsumerManagerTest.java b/application/src/test/java/org/thingsboard/server/service/queue/ruleengine/TbRuleEngineQueueConsumerManagerTest.java index c03c2babaa..6bd0cd9c0e 100644 --- a/application/src/test/java/org/thingsboard/server/service/queue/ruleengine/TbRuleEngineQueueConsumerManagerTest.java +++ b/application/src/test/java/org/thingsboard/server/service/queue/ruleengine/TbRuleEngineQueueConsumerManagerTest.java @@ -447,7 +447,7 @@ public class TbRuleEngineQueueConsumerManagerTest { verifyMsgProcessed(consumer1.testMsg); verifyMsgProcessed(consumer2.testMsg); - consumerManager.delete(); + consumerManager.delete(true); await().atMost(2, TimeUnit.SECONDS) .untilAsserted(() -> { @@ -488,7 +488,7 @@ public class TbRuleEngineQueueConsumerManagerTest { verifySubscribedAndLaunched(consumer, partitions); verifyMsgProcessed(consumer.testMsg); - consumerManager.delete(); + consumerManager.delete(true); await().atMost(2, TimeUnit.SECONDS) .untilAsserted(() -> { diff --git a/application/src/test/resources/logback-test.xml b/application/src/test/resources/logback-test.xml index 3d0ce3f933..981bcab132 100644 --- a/application/src/test/resources/logback-test.xml +++ b/application/src/test/resources/logback-test.xml @@ -38,8 +38,6 @@ - - diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/discovery/HashPartitionService.java b/common/queue/src/main/java/org/thingsboard/server/queue/discovery/HashPartitionService.java index 1b496c8aca..6caf17aeef 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/discovery/HashPartitionService.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/discovery/HashPartitionService.java @@ -35,7 +35,6 @@ import org.thingsboard.server.queue.discovery.event.ServiceListChangedEvent; import org.thingsboard.server.queue.util.AfterStartUp; import javax.annotation.PostConstruct; -import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; @@ -190,14 +189,25 @@ public class HashPartitionService implements PartitionService { myPartitions.remove(queueKey); partitionTopicsMap.remove(queueKey); partitionSizesMap.remove(queueKey); - //TODO: remove after merging tb entity services - removeTenant(tenantId); - + evictTenantInfo(tenantId); if (serviceInfoProvider.isService(ServiceType.TB_RULE_ENGINE)) { publishPartitionChangeEvent(ServiceType.TB_RULE_ENGINE, Map.of(queueKey, Collections.emptySet())); } } + @Override + public void removeTenant(TenantId tenantId) { + List queueKeys = partitionSizesMap.keySet().stream() + .filter(queueKey -> tenantId.equals(queueKey.getTenantId())) + .collect(Collectors.toList()); + queueKeys.forEach(queueKey -> { + myPartitions.remove(queueKey); + partitionTopicsMap.remove(queueKey); + partitionSizesMap.remove(queueKey); + }); + evictTenantInfo(tenantId); + } + @Override public boolean isManagedByCurrentService(TenantId tenantId) { Set assignedTenantProfiles = serviceInfoProvider.getAssignedTenantProfiles(); @@ -258,6 +268,7 @@ public class HashPartitionService implements PartitionService { @Override public synchronized void recalculatePartitions(ServiceInfo currentService, List otherServices) { + log.info("Recalculating partitions"); tbTransportServicesByType.clear(); responsibleServices.clear(); logServiceInfo(currentService); @@ -274,9 +285,14 @@ public class HashPartitionService implements PartitionService { final ConcurrentMap> newPartitions = new ConcurrentHashMap<>(); partitionSizesMap.forEach((queueKey, size) -> { for (int i = 0; i < size; i++) { - ServiceInfo serviceInfo = resolveByPartitionIdx(queueServicesMap.get(queueKey), queueKey, i); - if (currentService.equals(serviceInfo)) { - newPartitions.computeIfAbsent(queueKey, key -> new ArrayList<>()).add(i); + try { + ServiceInfo serviceInfo = resolveByPartitionIdx(queueServicesMap.get(queueKey), queueKey, i); + log.trace("Server responsible for {}[{}] - {}", queueKey, i, serviceInfo != null ? serviceInfo.getServiceId() : "none"); + if (currentService.equals(serviceInfo)) { + newPartitions.computeIfAbsent(queueKey, key -> new ArrayList<>()).add(i); + } + } catch (Exception e) { + log.warn("Failed to resolve server responsible for {}[{}]", queueKey, i, e); } } }); @@ -399,7 +415,7 @@ public class HashPartitionService implements PartitionService { } @Override - public void removeTenant(TenantId tenantId) { + public void evictTenantInfo(TenantId tenantId) { tenantRoutingInfoMap.remove(tenantId); } diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/discovery/PartitionService.java b/common/queue/src/main/java/org/thingsboard/server/queue/discovery/PartitionService.java index 48e5b55ece..2b2479f59b 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/discovery/PartitionService.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/discovery/PartitionService.java @@ -59,7 +59,7 @@ public interface PartitionService { int resolvePartitionIndex(UUID entityId, int partitions); - void removeTenant(TenantId tenantId); + void evictTenantInfo(TenantId tenantId); int countTransportsByType(String type); @@ -67,6 +67,8 @@ public interface PartitionService { void removeQueue(TransportProtos.QueueDeleteMsg queueDeleteMsg); + void removeTenant(TenantId tenantId); + boolean isManagedByCurrentService(TenantId tenantId); } diff --git a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/DefaultTransportService.java b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/DefaultTransportService.java index a6a6f5d196..f397a6c02a 100644 --- a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/DefaultTransportService.java +++ b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/DefaultTransportService.java @@ -96,6 +96,8 @@ import org.thingsboard.server.queue.TbQueueProducer; import org.thingsboard.server.queue.TbQueueRequestTemplate; import org.thingsboard.server.queue.common.AsyncCallbackTemplate; import org.thingsboard.server.queue.common.TbProtoQueueMsg; +import org.thingsboard.server.queue.discovery.QueueKey; +import org.thingsboard.server.queue.discovery.TopicService; import org.thingsboard.server.queue.discovery.PartitionService; import org.thingsboard.server.queue.discovery.TbServiceInfoProvider; import org.thingsboard.server.queue.discovery.TopicService; @@ -931,8 +933,8 @@ public class DefaultTransportService extends TransportActivityManager implements Optional profileOpt = dataDecodingEncodingService.decode(msg.getData().toByteArray()); if (profileOpt.isPresent()) { Tenant tenant = profileOpt.get(); - partitionService.removeTenant(tenant.getId()); boolean updated = tenantProfileCache.put(tenant.getId(), tenant.getTenantProfileId()); + partitionService.evictTenantInfo(tenant.getId()); if (updated) { rateLimitService.update(tenant.getId()); } @@ -957,7 +959,9 @@ public class DefaultTransportService extends TransportActivityManager implements } else if (EntityType.TENANT_PROFILE.equals(entityType)) { tenantProfileCache.remove(new TenantProfileId(entityUuid)); } else if (EntityType.TENANT.equals(entityType)) { - rateLimitService.remove(TenantId.fromUUID(entityUuid)); + TenantId tenantId = TenantId.fromUUID(entityUuid); + rateLimitService.remove(tenantId); + partitionService.removeTenant(tenantId); } else if (EntityType.DEVICE.equals(entityType)) { rateLimitService.remove(new DeviceId(entityUuid)); onDeviceDeleted(new DeviceId(entityUuid)); diff --git a/dao/src/main/java/org/thingsboard/server/dao/asset/BaseAssetService.java b/dao/src/main/java/org/thingsboard/server/dao/asset/BaseAssetService.java index 0f7cc40abd..7220a3acdb 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/asset/BaseAssetService.java +++ b/dao/src/main/java/org/thingsboard/server/dao/asset/BaseAssetService.java @@ -215,7 +215,7 @@ public class BaseAssetService extends AbstractCachedEntityService