diff --git a/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java b/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java index f82db6ebe7..bade75df00 100644 --- a/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java +++ b/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java @@ -246,6 +246,7 @@ public class ActorSystemContext { private RuleNodeStateService ruleNodeStateService; @Autowired + @Getter private PartitionService partitionService; @Autowired diff --git a/application/src/main/java/org/thingsboard/server/actors/app/AppActor.java b/application/src/main/java/org/thingsboard/server/actors/app/AppActor.java index 1461654216..98e0bf7c25 100644 --- a/application/src/main/java/org/thingsboard/server/actors/app/AppActor.java +++ b/application/src/main/java/org/thingsboard/server/actors/app/AppActor.java @@ -46,6 +46,7 @@ import org.thingsboard.server.dao.tenant.TenantService; import org.thingsboard.server.service.transport.msg.TransportToDeviceActorMsgWrapper; import java.util.HashSet; +import java.util.Optional; import java.util.Set; @Slf4j @@ -129,8 +130,11 @@ public class AppActor extends ContextAwareActor { PageDataIterable tenantIterator = new PageDataIterable<>(tenantService::findTenants, ENTITY_PACK_LIMIT); for (Tenant tenant : tenantIterator) { log.debug("[{}] Creating tenant actor", tenant.getId()); - getOrCreateTenantActor(tenant.getId()); - log.debug("[{}] Tenant actor created.", tenant.getId()); + getOrCreateTenantActor(tenant.getId()).ifPresentOrElse(tenantActor -> { + log.debug("[{}] Tenant actor created.", tenant.getId()); + }, () -> { + log.debug("[{}] Skipped actor creation", tenant.getId()); + }); } } log.info("Main system actor started."); @@ -143,11 +147,9 @@ public class AppActor extends ContextAwareActor { if (TenantId.SYS_TENANT_ID.equals(msg.getTenantId())) { msg.getMsg().getCallback().onFailure(new RuleEngineException("Message has system tenant id!")); } else { - if (!deletedTenants.contains(msg.getTenantId())) { - getOrCreateTenantActor(msg.getTenantId()).tell(msg); - } else { - msg.getMsg().getCallback().onSuccess(); - } + getOrCreateTenantActor(msg.getTenantId()).ifPresentOrElse(actor -> { + actor.tell(msg); + }, () -> msg.getMsg().getCallback().onSuccess()); } } @@ -164,12 +166,13 @@ public class AppActor extends ContextAwareActor { log.info("[{}] Handling tenant deleted notification: {}", msg.getTenantId(), msg); deletedTenants.add(tenantId); ctx.stop(new TbEntityActorId(tenantId)); - } else { - target = getOrCreateTenantActor(msg.getTenantId()); + return; } - } else { - target = getOrCreateTenantActor(msg.getTenantId()); } + target = getOrCreateTenantActor(msg.getTenantId()).orElseGet(() -> { + log.debug("Ignoring component lifecycle msg for tenant {} because it is not managed by this service", msg.getTenantId()); + return null; + }); } if (target != null) { target.tellWithHighPriority(msg); @@ -179,24 +182,28 @@ public class AppActor extends ContextAwareActor { } private void onToDeviceActorMsg(TenantAwareMsg msg, boolean priority) { - if (!deletedTenants.contains(msg.getTenantId())) { - TbActorRef tenantActor = getOrCreateTenantActor(msg.getTenantId()); + getOrCreateTenantActor(msg.getTenantId()).ifPresentOrElse(tenantActor -> { if (priority) { tenantActor.tellWithHighPriority(msg); } else { tenantActor.tell(msg); } - } else { + }, () -> { if (msg instanceof TransportToDeviceActorMsgWrapper) { ((TransportToDeviceActorMsgWrapper) msg).getCallback().onSuccess(); } - } + }); } - private TbActorRef getOrCreateTenantActor(TenantId tenantId) { - return ctx.getOrCreateChildActor(new TbEntityActorId(tenantId), + private Optional getOrCreateTenantActor(TenantId tenantId) { + if (deletedTenants.contains(tenantId)) { + return Optional.empty(); + } + return Optional.ofNullable(ctx.getOrCreateChildActor(new TbEntityActorId(tenantId), () -> DefaultActorService.TENANT_DISPATCHER_NAME, - () -> new TenantActor.ActorCreator(systemContext, tenantId)); + () -> new TenantActor.ActorCreator(systemContext, tenantId), + () -> systemContext.getServiceInfoProvider().isService(ServiceType.TB_CORE) || + systemContext.getPartitionService().isManagedByCurrentService(tenantId))); } private void onToEdgeSessionMsg(EdgeSessionMsg msg) { @@ -204,7 +211,7 @@ public class AppActor extends ContextAwareActor { if (ModelConstants.SYSTEM_TENANT.equals(msg.getTenantId())) { log.warn("Message has system tenant id: {}", msg); } else { - target = getOrCreateTenantActor(msg.getTenantId()); + target = getOrCreateTenantActor(msg.getTenantId()).orElse(null); } if (target != null) { target.tellWithHighPriority(msg); diff --git a/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainActorMessageProcessor.java b/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainActorMessageProcessor.java index de617367dd..cbfd78180b 100644 --- a/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainActorMessageProcessor.java +++ b/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainActorMessageProcessor.java @@ -167,7 +167,8 @@ public class RuleChainActorMessageProcessor extends ComponentMsgProcessor DefaultActorService.RULE_DISPATCHER_NAME, - () -> new RuleNodeActor.ActorCreator(systemContext, tenantId, entityId, ruleChainName, ruleNode.getId())); + () -> new RuleNodeActor.ActorCreator(systemContext, tenantId, entityId, ruleChainName, ruleNode.getId()), + () -> true); } private void initRoutes(RuleChain ruleChain, List ruleNodeList) { diff --git a/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainManagerActor.java b/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainManagerActor.java index 7f919754fc..987554c683 100644 --- a/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainManagerActor.java +++ b/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainManagerActor.java @@ -94,7 +94,8 @@ public abstract class RuleChainManagerActor extends ContextAwareActor { } else { return new RuleChainActor.ActorCreator(systemContext, tenantId, ruleChain); } - }); + }, + () -> true); } protected TbActorRef getEntityActorRef(EntityId entityId) { diff --git a/application/src/main/java/org/thingsboard/server/actors/tenant/TenantActor.java b/application/src/main/java/org/thingsboard/server/actors/tenant/TenantActor.java index 11a5895fef..c9f5448df3 100644 --- a/application/src/main/java/org/thingsboard/server/actors/tenant/TenantActor.java +++ b/application/src/main/java/org/thingsboard/server/actors/tenant/TenantActor.java @@ -32,7 +32,6 @@ import org.thingsboard.server.actors.service.DefaultActorService; import org.thingsboard.server.common.data.ApiUsageState; import org.thingsboard.server.common.data.EntityType; import org.thingsboard.server.common.data.Tenant; -import org.thingsboard.server.common.data.TenantProfile; import org.thingsboard.server.common.data.edge.Edge; import org.thingsboard.server.common.data.id.DeviceId; import org.thingsboard.server.common.data.id.EdgeId; @@ -82,21 +81,21 @@ public class TenantActor extends RuleChainManagerActor { cantFindTenant = true; log.info("[{}] Started tenant actor for missing tenant.", tenantId); } else { - TenantProfile tenantProfile = systemContext.getTenantProfileCache().get(tenant.getTenantProfileId()); - isCore = systemContext.getServiceInfoProvider().isService(ServiceType.TB_CORE); isRuleEngine = systemContext.getServiceInfoProvider().isService(ServiceType.TB_RULE_ENGINE); if (isRuleEngine) { - try { - if (getApiUsageState().isReExecEnabled()) { - log.debug("[{}] Going to init rule chains", tenantId); - initRuleChains(); - } else { - log.info("[{}] Skip init of the rule chains due to API limits", tenantId); + if (systemContext.getPartitionService().isManagedByCurrentService(tenantId)) { + try { + if (getApiUsageState().isReExecEnabled()) { + log.debug("[{}] Going to init rule chains", tenantId); + initRuleChains(); + } else { + log.info("[{}] Skip init of the rule chains due to API limits", tenantId); + } + } catch (Exception e) { + log.info("Failed to check ApiUsage \"ReExecEnabled\"!!!", e); + cantFindTenant = true; } - } catch (Exception e) { - log.info("Failed to check ApiUsage \"ReExecEnabled\"!!!", e); - cantFindTenant = true; } } log.debug("[{}] Tenant actor started.", tenantId); @@ -270,7 +269,8 @@ public class TenantActor extends RuleChainManagerActor { private TbActorRef getOrCreateDeviceActor(DeviceId deviceId) { return ctx.getOrCreateChildActor(new TbEntityActorId(deviceId), () -> DefaultActorService.DEVICE_DISPATCHER_NAME, - () -> new DeviceActorCreator(systemContext, tenantId, deviceId)); + () -> new DeviceActorCreator(systemContext, tenantId, deviceId), + () -> true); } private void onToEdgeSessionMsg(EdgeSessionMsg msg) { diff --git a/application/src/main/java/org/thingsboard/server/controller/QueueController.java b/application/src/main/java/org/thingsboard/server/controller/QueueController.java index 0e79ae7955..faeea1a4aa 100644 --- a/application/src/main/java/org/thingsboard/server/controller/QueueController.java +++ b/application/src/main/java/org/thingsboard/server/controller/QueueController.java @@ -126,7 +126,6 @@ public class QueueController extends BaseController { @PreAuthorize("hasAnyAuthority('SYS_ADMIN')") @RequestMapping(value = "/queues", params = {"serviceType"}, method = RequestMethod.POST) @ResponseBody - public Queue saveQueue(@ApiParam(value = "A JSON value representing the queue.") @RequestBody Queue queue, @ApiParam(value = QUEUE_SERVICE_TYPE_DESCRIPTION, allowableValues = QUEUE_SERVICE_TYPE_ALLOWABLE_VALUES, required = true) diff --git a/application/src/main/java/org/thingsboard/server/service/entitiy/queue/DefaultTbQueueService.java b/application/src/main/java/org/thingsboard/server/service/entitiy/queue/DefaultTbQueueService.java index 40d294d238..b0ef8f4cde 100644 --- a/application/src/main/java/org/thingsboard/server/service/entitiy/queue/DefaultTbQueueService.java +++ b/application/src/main/java/org/thingsboard/server/service/entitiy/queue/DefaultTbQueueService.java @@ -15,7 +15,7 @@ */ package org.thingsboard.server.service.entitiy.queue; -import lombok.AllArgsConstructor; +import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Service; import org.thingsboard.server.cluster.TbClusterService; @@ -27,7 +27,6 @@ import org.thingsboard.server.common.data.tenant.profile.TenantProfileQueueConfi import org.thingsboard.server.common.msg.queue.TopicPartitionInfo; import org.thingsboard.server.dao.queue.QueueService; import org.thingsboard.server.queue.TbQueueAdmin; -import org.thingsboard.server.queue.scheduler.SchedulerComponent; import org.thingsboard.server.queue.util.TbCoreComponent; import org.thingsboard.server.service.entitiy.AbstractTbEntityService; @@ -35,20 +34,17 @@ import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.Map; -import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; @Slf4j @Service @TbCoreComponent -@AllArgsConstructor +@RequiredArgsConstructor public class DefaultTbQueueService extends AbstractTbEntityService implements TbQueueService { - private static final long DELETE_DELAY = 30; private final QueueService queueService; private final TbClusterService tbClusterService; private final TbQueueAdmin tbQueueAdmin; - private final SchedulerComponent scheduler; @Override public Queue saveQueue(Queue queue) { @@ -90,7 +86,9 @@ public class DefaultTbQueueService extends AbstractTbEntityService implements Tb private void onQueueCreated(Queue queue) { for (int i = 0; i < queue.getPartitions(); i++) { tbQueueAdmin.createTopicIfNotExists( - new TopicPartitionInfo(queue.getTopic(), queue.getTenantId(), i, false).getFullTopicName()); + new TopicPartitionInfo(queue.getTopic(), queue.getTenantId(), i, false).getFullTopicName(), + queue.getCustomProperties() + ); } tbClusterService.onQueueChange(queue); @@ -105,21 +103,15 @@ public class DefaultTbQueueService extends AbstractTbEntityService implements Tb log.info("Added [{}] new partitions to [{}] queue", currentPartitions - oldPartitions, queue.getName()); for (int i = oldPartitions; i < currentPartitions; i++) { tbQueueAdmin.createTopicIfNotExists( - new TopicPartitionInfo(queue.getTopic(), queue.getTenantId(), i, false).getFullTopicName()); + new TopicPartitionInfo(queue.getTopic(), queue.getTenantId(), i, false).getFullTopicName(), + queue.getCustomProperties() + ); } tbClusterService.onQueueChange(queue); } else { log.info("Removed [{}] partitions from [{}] queue", oldPartitions - currentPartitions, queue.getName()); tbClusterService.onQueueChange(queue); - - scheduler.schedule(() -> { - for (int i = currentPartitions; i < oldPartitions; i++) { - String fullTopicName = new TopicPartitionInfo(queue.getTopic(), queue.getTenantId(), i, false).getFullTopicName(); - log.info("Removed partition [{}]", fullTopicName); - tbQueueAdmin.deleteTopic( - fullTopicName); - } - }, DELETE_DELAY, TimeUnit.SECONDS); + // TODO: move all the messages left in old partitions and delete topics } } else if (!oldQueue.equals(queue)) { tbClusterService.onQueueChange(queue); @@ -128,20 +120,7 @@ public class DefaultTbQueueService extends AbstractTbEntityService implements Tb private void onQueueDeleted(Queue queue) { tbClusterService.onQueueDelete(queue); - // queueStatsService.deleteQueueStatsByQueueId(tenantId, queueId); - - scheduler.schedule(() -> { - for (int i = 0; i < queue.getPartitions(); i++) { - String fullTopicName = new TopicPartitionInfo(queue.getTopic(), queue.getTenantId(), i, false).getFullTopicName(); - log.info("Deleting queue [{}]", fullTopicName); - try { - tbQueueAdmin.deleteTopic(fullTopicName); - } catch (Exception e) { - log.error("Failed to delete queue [{}]", fullTopicName); - } - } - }, DELETE_DELAY, TimeUnit.SECONDS); } @Override @@ -193,6 +172,10 @@ public class DefaultTbQueueService extends AbstractTbEntityService implements Tb } } + if (log.isDebugEnabled()) { + log.debug("[{}] Handling profile queue config update: creating queues {}, updating {}, deleting {}. Affected tenants: {}", + newTenantProfile.getUuidId(), toCreate, toUpdate, toRemove, tenantIds); + } tenantIds.forEach(tenantId -> { toCreate.forEach(key -> saveQueue(new Queue(tenantId, newQueues.get(key)))); diff --git a/application/src/main/java/org/thingsboard/server/service/entitiy/tenant/profile/DefaultTbTenantProfileService.java b/application/src/main/java/org/thingsboard/server/service/entitiy/tenant/profile/DefaultTbTenantProfileService.java index 1219a5b929..15bcc2e040 100644 --- a/application/src/main/java/org/thingsboard/server/service/entitiy/tenant/profile/DefaultTbTenantProfileService.java +++ b/application/src/main/java/org/thingsboard/server/service/entitiy/tenant/profile/DefaultTbTenantProfileService.java @@ -44,16 +44,14 @@ public class DefaultTbTenantProfileService extends AbstractTbEntityService imple @Override public TenantProfile save(TenantId tenantId, TenantProfile tenantProfile, TenantProfile oldTenantProfile) throws ThingsboardException { TenantProfile savedTenantProfile = checkNotNull(tenantProfileService.saveTenantProfile(tenantId, tenantProfile)); - if (oldTenantProfile != null && savedTenantProfile.isIsolatedTbRuleEngine()) { - List tenantIds = tenantService.findTenantIdsByTenantProfileId(savedTenantProfile.getId()); - tbQueueService.updateQueuesByTenants(tenantIds, savedTenantProfile, oldTenantProfile); - } - tenantProfileCache.put(savedTenantProfile); tbClusterService.onTenantProfileChange(savedTenantProfile, null); tbClusterService.broadcastEntityStateChangeEvent(TenantId.SYS_TENANT_ID, savedTenantProfile.getId(), tenantProfile.getId() == null ? ComponentLifecycleEvent.CREATED : ComponentLifecycleEvent.UPDATED); + List tenantIds = tenantService.findTenantIdsByTenantProfileId(savedTenantProfile.getId()); + tbQueueService.updateQueuesByTenants(tenantIds, savedTenantProfile, oldTenantProfile); + return savedTenantProfile; } diff --git a/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbRuleEngineConsumerService.java b/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbRuleEngineConsumerService.java index ac7d796e33..670dbcaa6d 100644 --- a/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbRuleEngineConsumerService.java +++ b/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbRuleEngineConsumerService.java @@ -23,11 +23,14 @@ import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Service; import org.thingsboard.common.util.ThingsBoardThreadFactory; import org.thingsboard.server.actors.ActorSystemContext; +import org.thingsboard.server.common.data.id.EntityId; +import org.thingsboard.server.common.data.id.EntityIdFactory; import org.thingsboard.server.common.data.id.QueueId; import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.queue.Queue; import org.thingsboard.server.common.data.rpc.RpcError; import org.thingsboard.server.common.msg.TbMsg; +import org.thingsboard.server.common.msg.gen.MsgProtos; import org.thingsboard.server.common.msg.queue.QueueToRuleEngineMsg; import org.thingsboard.server.common.msg.queue.RuleEngineException; import org.thingsboard.server.common.msg.queue.RuleNodeInfo; @@ -42,12 +45,14 @@ import org.thingsboard.server.dao.tenant.TbTenantProfileCache; import org.thingsboard.server.gen.transport.TransportProtos; import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineMsg; import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineNotificationMsg; +import org.thingsboard.server.queue.TbQueueAdmin; import org.thingsboard.server.queue.TbQueueConsumer; import org.thingsboard.server.queue.common.TbProtoQueueMsg; import org.thingsboard.server.queue.discovery.PartitionService; import org.thingsboard.server.queue.discovery.QueueKey; import org.thingsboard.server.queue.discovery.TbServiceInfoProvider; import org.thingsboard.server.queue.discovery.event.PartitionChangeEvent; +import org.thingsboard.server.queue.provider.TbQueueProducerProvider; import org.thingsboard.server.queue.provider.TbRuleEngineQueueFactory; import org.thingsboard.server.queue.util.DataDecodingEncodingService; import org.thingsboard.server.queue.util.TbRuleEngineComponent; @@ -98,6 +103,8 @@ public class DefaultTbRuleEngineConsumerService extends AbstractConsumerService< private boolean statsEnabled; @Value("${queue.rule-engine.prometheus-stats.enabled:false}") boolean prometheusStatsEnabled; + @Value("${queue.rule-engine.topic-deletion-delay:30}") + private int topicDeletionDelayInSec; private final StatsFactory statsFactory; private final TbRuleEngineSubmitStrategyFactory submitStrategyFactory; @@ -107,7 +114,8 @@ public class DefaultTbRuleEngineConsumerService extends AbstractConsumerService< private final TbRuleEngineDeviceRpcService tbDeviceRpcService; private final TbServiceInfoProvider serviceInfoProvider; private final QueueService queueService; - // private final TenantId tenantId; + private final TbQueueProducerProvider producerProvider; + private final TbQueueAdmin queueAdmin; private final ConcurrentMap>> consumers = new ConcurrentHashMap<>(); private final ConcurrentMap consumerConfigurations = new ConcurrentHashMap<>(); private final ConcurrentMap consumerStats = new ConcurrentHashMap<>(); @@ -128,7 +136,8 @@ public class DefaultTbRuleEngineConsumerService extends AbstractConsumerService< TbTenantProfileCache tenantProfileCache, TbApiUsageStateService apiUsageStateService, PartitionService partitionService, ApplicationEventPublisher eventPublisher, - TbServiceInfoProvider serviceInfoProvider, QueueService queueService) { + TbServiceInfoProvider serviceInfoProvider, QueueService queueService, + TbQueueProducerProvider producerProvider, TbQueueAdmin queueAdmin) { super(actorContext, encodingService, tenantProfileCache, deviceProfileCache, assetProfileCache, apiUsageStateService, partitionService, eventPublisher, tbRuleEngineQueueFactory.createToRuleEngineNotificationsMsgConsumer(), Optional.empty()); this.statisticsService = statisticsService; this.tbRuleEngineQueueFactory = tbRuleEngineQueueFactory; @@ -138,6 +147,8 @@ public class DefaultTbRuleEngineConsumerService extends AbstractConsumerService< this.statsFactory = statsFactory; this.serviceInfoProvider = serviceInfoProvider; this.queueService = queueService; + this.producerProvider = producerProvider; + this.queueAdmin = queueAdmin; } @PostConstruct @@ -145,14 +156,16 @@ public class DefaultTbRuleEngineConsumerService extends AbstractConsumerService< super.init("tb-rule-engine-consumer", "tb-rule-engine-notifications-consumer"); List queues = queueService.findAllQueues(); for (Queue configuration : queues) { - initConsumer(configuration); + if (partitionService.isManagedByCurrentService(configuration.getTenantId())) { + initConsumer(configuration); + } } } private void initConsumer(Queue configuration) { QueueKey queueKey = new QueueKey(ServiceType.TB_RULE_ENGINE, configuration); consumerConfigurations.putIfAbsent(queueKey, configuration); - consumerStats.putIfAbsent(queueKey, new TbRuleEngineConsumerStats(configuration.getName(), statsFactory)); + consumerStats.putIfAbsent(queueKey, new TbRuleEngineConsumerStats(configuration, statsFactory)); if (!configuration.isConsumerPerPartition()) { consumers.computeIfAbsent(queueKey, queueName -> tbRuleEngineQueueFactory.createToRuleEngineMsgConsumer(configuration)); } else { @@ -172,7 +185,12 @@ public class DefaultTbRuleEngineConsumerService extends AbstractConsumerService< if (event.getServiceType().equals(getServiceType())) { String serviceQueue = event.getQueueKey().getQueueName(); log.info("[{}] Subscribing to partitions: {}", serviceQueue, event.getPartitions()); - if (!consumerConfigurations.get(event.getQueueKey()).isConsumerPerPartition()) { + Queue configuration = consumerConfigurations.get(event.getQueueKey()); + if (configuration == null) { + log.warn("Received invalid partition change event for {} that is not managed by this service", event.getQueueKey()); + return; + } + if (!configuration.isConsumerPerPartition()) { consumers.get(event.getQueueKey()).subscribe(event.getPartitions()); } else { log.info("[{}] Subscribing consumer per partition: {}", serviceQueue, event.getPartitions()); @@ -230,7 +248,6 @@ public class DefaultTbRuleEngineConsumerService extends AbstractConsumerService< launchConsumer(consumer, consumerConfigurations.get(queueKey), consumerStats.get(queueKey), "" + queueKey + "-" + tpi.getPartition().orElse(-999999)); consumer.subscribe(Collections.singleton(tpi)); }); - } finally { tbTopicWithConsumerPerPartition.getLock().unlock(); } @@ -278,9 +295,9 @@ public class DefaultTbRuleEngineConsumerService extends AbstractConsumerService< void consumerLoop(TbQueueConsumer> consumer, org.thingsboard.server.common.data.queue.Queue configuration, TbRuleEngineConsumerStats stats, String threadSuffix) { updateCurrentThreadName(threadSuffix); - while (!stopped && !consumer.isStopped()) { + while (!stopped && !consumer.isStopped() && !consumer.isQueueDeleted()) { try { - List> msgs = consumer.poll(pollDuration); + List> msgs = consumer.poll(configuration.getPollInterval()); if (msgs.isEmpty()) { continue; } @@ -328,6 +345,10 @@ public class DefaultTbRuleEngineConsumerService extends AbstractConsumerService< } } } + + if (consumer.isQueueDeleted()) { + processQueueDeletion(configuration, consumer); + } log.info("TB Rule Engine Consumer stopped."); } @@ -425,32 +446,34 @@ public class DefaultTbRuleEngineConsumerService extends AbstractConsumerService< private void updateQueue(TransportProtos.QueueUpdateMsg queueUpdateMsg) { log.info("Received queue update msg: [{}]", queueUpdateMsg); - String queueName = queueUpdateMsg.getQueueName(); TenantId tenantId = new TenantId(new UUID(queueUpdateMsg.getTenantIdMSB(), queueUpdateMsg.getTenantIdLSB())); - QueueId queueId = new QueueId(new UUID(queueUpdateMsg.getQueueIdMSB(), queueUpdateMsg.getQueueIdLSB())); - QueueKey queueKey = new QueueKey(ServiceType.TB_RULE_ENGINE, queueUpdateMsg.getQueueName(), tenantId); - Queue queue = queueService.findQueueById(tenantId, queueId); - Queue oldQueue = consumerConfigurations.remove(queueKey); - if (oldQueue != null) { - if (oldQueue.isConsumerPerPartition()) { - TbTopicWithConsumerPerPartition consumerPerPartition = topicsConsumerPerPartition.remove(queueKey); - ReentrantLock lock = consumerPerPartition.getLock(); - try { - lock.lock(); - consumerPerPartition.getConsumers().values().forEach(TbQueueConsumer::unsubscribe); - } finally { - lock.unlock(); + if (partitionService.isManagedByCurrentService(tenantId)) { + QueueId queueId = new QueueId(new UUID(queueUpdateMsg.getQueueIdMSB(), queueUpdateMsg.getQueueIdLSB())); + String queueName = queueUpdateMsg.getQueueName(); + QueueKey queueKey = new QueueKey(ServiceType.TB_RULE_ENGINE, queueName, tenantId); + Queue queue = queueService.findQueueById(tenantId, queueId); + Queue oldQueue = consumerConfigurations.remove(queueKey); + if (oldQueue != null) { + if (oldQueue.isConsumerPerPartition()) { + TbTopicWithConsumerPerPartition consumerPerPartition = topicsConsumerPerPartition.remove(queueKey); + ReentrantLock lock = consumerPerPartition.getLock(); + try { + lock.lock(); + consumerPerPartition.getConsumers().values().forEach(TbQueueConsumer::unsubscribe); + } finally { + lock.unlock(); + } + } else { + TbQueueConsumer> consumer = consumers.remove(queueKey); + consumer.unsubscribe(); } - } else { - TbQueueConsumer> consumer = consumers.remove(queueKey); - consumer.unsubscribe(); } - } - initConsumer(queue); + initConsumer(queue); - if (!queue.isConsumerPerPartition()) { - launchConsumer(consumers.get(queueKey), consumerConfigurations.get(queueKey), consumerStats.get(queueKey), queueName); + if (!queue.isConsumerPerPartition()) { + launchConsumer(consumers.get(queueKey), consumerConfigurations.get(queueKey), consumerStats.get(queueKey), queueName); + } } partitionService.updateQueue(queueUpdateMsg); @@ -462,22 +485,22 @@ public class DefaultTbRuleEngineConsumerService extends AbstractConsumerService< TenantId tenantId = new TenantId(new UUID(queueDeleteMsg.getTenantIdMSB(), queueDeleteMsg.getTenantIdLSB())); QueueKey queueKey = new QueueKey(ServiceType.TB_RULE_ENGINE, queueDeleteMsg.getQueueName(), tenantId); + partitionService.removeQueue(queueDeleteMsg); Queue queue = consumerConfigurations.remove(queueKey); if (queue != null) { if (queue.isConsumerPerPartition()) { TbTopicWithConsumerPerPartition tbTopicWithConsumerPerPartition = topicsConsumerPerPartition.remove(queueKey); if (tbTopicWithConsumerPerPartition != null) { - tbTopicWithConsumerPerPartition.getConsumers().values().forEach(TbQueueConsumer::unsubscribe); + tbTopicWithConsumerPerPartition.getConsumers().values().forEach(TbQueueConsumer::onQueueDelete); tbTopicWithConsumerPerPartition.getConsumers().clear(); } } else { TbQueueConsumer> consumer = consumers.remove(queueKey); if (consumer != null) { - consumer.unsubscribe(); + consumer.onQueueDelete(); } } } - partitionService.removeQueue(queueDeleteMsg); } private void forwardToRuleEngineActor(String queueName, TenantId tenantId, ToRuleEngineMsg toRuleEngineMsg, TbMsgCallback callback) { @@ -496,6 +519,47 @@ public class DefaultTbRuleEngineConsumerService extends AbstractConsumerService< actorContext.tell(msg); } + private void processQueueDeletion(Queue queue, TbQueueConsumer> consumer) { + long finishTs = System.currentTimeMillis() + TimeUnit.SECONDS.toMillis(topicDeletionDelayInSec); + try { + int n = 0; + while (System.currentTimeMillis() <= finishTs) { + List> msgs = consumer.poll(queue.getPollInterval()); + if (msgs.isEmpty()) { + continue; + } + for (TbProtoQueueMsg msg : msgs) { + try { + MsgProtos.TbMsgProto tbMsgProto = MsgProtos.TbMsgProto.parseFrom(msg.getValue().getTbMsg().toByteArray()); + EntityId originator = EntityIdFactory.getByTypeAndUuid(tbMsgProto.getEntityType(), new UUID(tbMsgProto.getEntityIdMSB(), tbMsgProto.getEntityIdLSB())); + + TopicPartitionInfo tpi = partitionService.resolve(ServiceType.TB_RULE_ENGINE, queue.getName(), TenantId.SYS_TENANT_ID, originator); + producerProvider.getRuleEngineMsgProducer().send(tpi, msg, null); + n++; + } catch (Throwable e) { + log.debug("Failed to move message to system {}: {}", consumer.getTopic(), msg, e); + } + } + consumer.commit(); + } + if (n > 0) { + log.info("Moved {} messages from {} to system {}", n, consumer.getFullTopicNames(), consumer.getTopic()); + } + + consumer.unsubscribe(); + for (String topic : consumer.getFullTopicNames()) { + try { + queueAdmin.deleteTopic(topic); + log.info("Deleted topic {}", topic); + } catch (Exception e) { + log.error("Failed to delete topic {} after unsubscribing", topic, e); + } + } + } catch (Exception e) { + log.error("Failed to process deletion of {} ({})", consumer.getTopic(), queue.getTenantId(), e); + } + } + @Scheduled(fixedDelayString = "${queue.rule-engine.stats.print-interval-ms}") public void printStats() { if (statsEnabled) { diff --git a/application/src/main/java/org/thingsboard/server/service/queue/DefaultTenantRoutingInfoService.java b/application/src/main/java/org/thingsboard/server/service/queue/DefaultTenantRoutingInfoService.java index 400586235e..ea90364ef7 100644 --- a/application/src/main/java/org/thingsboard/server/service/queue/DefaultTenantRoutingInfoService.java +++ b/application/src/main/java/org/thingsboard/server/service/queue/DefaultTenantRoutingInfoService.java @@ -22,7 +22,6 @@ import org.thingsboard.server.common.data.TenantProfile; import org.thingsboard.server.common.data.exception.TenantNotFoundException; import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.dao.tenant.TbTenantProfileCache; -import org.thingsboard.server.dao.tenant.TenantService; import org.thingsboard.server.queue.discovery.TenantRoutingInfo; import org.thingsboard.server.queue.discovery.TenantRoutingInfoService; @@ -31,12 +30,9 @@ import org.thingsboard.server.queue.discovery.TenantRoutingInfoService; @ConditionalOnExpression("'${service.type:null}'=='monolith' || '${service.type:null}'=='tb-core' || '${service.type:null}'=='tb-rule-engine'") public class DefaultTenantRoutingInfoService implements TenantRoutingInfoService { - private final TenantService tenantService; - private final TbTenantProfileCache tenantProfileCache; - public DefaultTenantRoutingInfoService(TenantService tenantService, TbTenantProfileCache tenantProfileCache) { - this.tenantService = tenantService; + public DefaultTenantRoutingInfoService(TbTenantProfileCache tenantProfileCache) { this.tenantProfileCache = tenantProfileCache; } @@ -44,7 +40,7 @@ public class DefaultTenantRoutingInfoService implements TenantRoutingInfoService public TenantRoutingInfo getRoutingInfo(TenantId tenantId) { TenantProfile tenantProfile = tenantProfileCache.get(tenantId); if (tenantProfile != null) { - return new TenantRoutingInfo(tenantId, tenantProfile.isIsolatedTbRuleEngine()); + return new TenantRoutingInfo(tenantId, tenantProfile.getId(), tenantProfile.isIsolatedTbRuleEngine()); } else { throw new TenantNotFoundException(tenantId); } diff --git a/application/src/main/java/org/thingsboard/server/service/queue/TbRuleEngineConsumerStats.java b/application/src/main/java/org/thingsboard/server/service/queue/TbRuleEngineConsumerStats.java index 77e805eb62..2904c299ce 100644 --- a/application/src/main/java/org/thingsboard/server/service/queue/TbRuleEngineConsumerStats.java +++ b/application/src/main/java/org/thingsboard/server/service/queue/TbRuleEngineConsumerStats.java @@ -18,6 +18,7 @@ package org.thingsboard.server.service.queue; import io.micrometer.core.instrument.Timer; import lombok.extern.slf4j.Slf4j; import org.thingsboard.server.common.data.id.TenantId; +import org.thingsboard.server.common.data.queue.Queue; import org.thingsboard.server.common.msg.queue.RuleEngineException; import org.thingsboard.server.common.stats.StatsCounter; import org.thingsboard.server.common.stats.StatsFactory; @@ -63,9 +64,11 @@ public class TbRuleEngineConsumerStats { private final ConcurrentMap tenantExceptions = new ConcurrentHashMap<>(); private final String queueName; + private final TenantId tenantId; - public TbRuleEngineConsumerStats(String queueName, StatsFactory statsFactory) { - this.queueName = queueName; + public TbRuleEngineConsumerStats(Queue queue, StatsFactory statsFactory) { + this.queueName = queue.getName(); + this.tenantId = queue.getTenantId(); this.statsFactory = statsFactory; String statsKey = StatsType.RULE_ENGINE.getName() + "." + queueName; @@ -156,7 +159,11 @@ public class TbRuleEngineConsumerStats { counters.forEach(counter -> { stats.append(counter.getName()).append(" = [").append(counter.get()).append("] "); }); - log.info("[{}] Stats: {}", queueName, stats); + if (tenantId.isSysTenantId()) { + log.info("[{}] Stats: {}", queueName, stats); + } else { + log.info("[{}][{}] Stats: {}", queueName, tenantId, stats); + } } } diff --git a/application/src/main/resources/thingsboard.yml b/application/src/main/resources/thingsboard.yml index e874e41cd7..d5aa55f1be 100644 --- a/application/src/main/resources/thingsboard.yml +++ b/application/src/main/resources/thingsboard.yml @@ -1091,6 +1091,7 @@ queue: fetch_max_bytes: "${TB_QUEUE_KAFKA_FETCH_MAX_BYTES:134217728}" request.timeout.ms: "${TB_QUEUE_KAFKA_REQUEST_TIMEOUT_MS:30000}" # (30 seconds) # refer to https://docs.confluent.io/platform/current/installation/configuration/producer-configs.html#producerconfigs_request.timeout.ms session.timeout.ms: "${TB_QUEUE_KAFKA_SESSION_TIMEOUT_MS:10000}" # (10 seconds) # refer to https://docs.confluent.io/platform/current/installation/configuration/consumer-configs.html#consumerconfigs_session.timeout.ms + auto_offset_reset: "${TB_QUEUE_KAFKA_AUTO_OFFSET_RESET:earliest}" # earliest, latest or none use_confluent_cloud: "${TB_QUEUE_KAFKA_USE_CONFLUENT_CLOUD:false}" confluent: ssl.algorithm: "${TB_QUEUE_KAFKA_CONFLUENT_SSL_ALGORITHM:https}" @@ -1287,6 +1288,8 @@ queue: failure-percentage: "${TB_QUEUE_RE_SQ_PROCESSING_STRATEGY_FAILURE_PERCENTAGE:0}" # Skip retry if failures or timeouts are less then X percentage of messages; pause-between-retries: "${TB_QUEUE_RE_SQ_PROCESSING_STRATEGY_RETRY_PAUSE:5}" # Time in seconds to wait in consumer thread before retries; max-pause-between-retries: "${TB_QUEUE_RE_SQ_PROCESSING_STRATEGY_MAX_RETRY_PAUSE:5}" # Max allowed time in seconds for pause between retries. + # After a queue is deleted (or profile's isolation option was disabled), Rule Engine will continue reading related topics during this period, before deleting the actual topics + topic-deletion-delay: "${TB_QUEUE_RULE_ENGINE_TOPIC_DELETION_DELAY_SEC:30}" transport: # For high priority notifications that require minimum latency and processing time notifications_topic: "${TB_QUEUE_TRANSPORT_NOTIFICATIONS_TOPIC:tb_transport.notifications}" @@ -1300,6 +1303,10 @@ service: type: "${TB_SERVICE_TYPE:monolith}" # monolith or tb-core or tb-rule-engine # Unique id for this service (autogenerated if empty) id: "${TB_SERVICE_ID:}" + rule_engine: + # Comma-separated list of tenant profiles ids assigned to this Rule Engine. + # This Rule Engine will only be responsible for tenants with these profiles (in case 'isolation' option is enabled in profile). + assigned_tenant_profiles: "${TB_RULE_ENGINE_ASSIGNED_TENANT_PROFILES:}" metrics: # Enable/disable actuator metrics. diff --git a/application/src/test/java/org/thingsboard/server/controller/AbstractWebTest.java b/application/src/test/java/org/thingsboard/server/controller/AbstractWebTest.java index c32b4cf8d7..888c9de367 100644 --- a/application/src/test/java/org/thingsboard/server/controller/AbstractWebTest.java +++ b/application/src/test/java/org/thingsboard/server/controller/AbstractWebTest.java @@ -1029,13 +1029,20 @@ public abstract class AbstractWebTest extends AbstractInMemoryStorageTest { return (DeviceActorMessageProcessor) ReflectionTestUtils.getField(actor, "processor"); } - protected void updateDefaultTenantProfile(Consumer updater) throws ThingsboardException { - TenantProfile tenantProfile = tenantProfileService.findDefaultTenantProfile(TenantId.SYS_TENANT_ID); - TenantProfileData profileData = tenantProfile.getProfileData(); - DefaultTenantProfileConfiguration profileConfiguration = (DefaultTenantProfileConfiguration) profileData.getConfiguration(); - updater.accept(profileConfiguration); - tenantProfile.setProfileData(profileData); - tbTenantProfileService.save(TenantId.SYS_TENANT_ID, tenantProfile, null); + protected void updateDefaultTenantProfileConfig(Consumer updater) throws ThingsboardException { + updateDefaultTenantProfile(tenantProfile -> { + TenantProfileData profileData = tenantProfile.getProfileData(); + DefaultTenantProfileConfiguration profileConfiguration = (DefaultTenantProfileConfiguration) profileData.getConfiguration(); + updater.accept(profileConfiguration); + tenantProfile.setProfileData(profileData); + }); + } + + protected void updateDefaultTenantProfile(Consumer updater) throws ThingsboardException { + TenantProfile oldTenantProfile = tenantProfileService.findDefaultTenantProfile(TenantId.SYS_TENANT_ID); + TenantProfile tenantProfile = JacksonUtil.clone(oldTenantProfile); + updater.accept(tenantProfile); + tbTenantProfileService.save(TenantId.SYS_TENANT_ID, tenantProfile, oldTenantProfile); } } diff --git a/application/src/test/java/org/thingsboard/server/controller/TenantControllerTest.java b/application/src/test/java/org/thingsboard/server/controller/TenantControllerTest.java index a5b86fff85..4b594d55f9 100644 --- a/application/src/test/java/org/thingsboard/server/controller/TenantControllerTest.java +++ b/application/src/test/java/org/thingsboard/server/controller/TenantControllerTest.java @@ -27,15 +27,20 @@ import org.junit.Before; import org.junit.Test; import org.mockito.ArgumentMatcher; import org.mockito.Mockito; +import org.springframework.boot.test.mock.mockito.SpyBean; import org.springframework.test.context.TestPropertySource; import org.springframework.test.web.servlet.ResultActions; import org.thingsboard.common.util.ThingsBoardExecutors; +import org.thingsboard.server.actors.ActorSystemContext; import org.thingsboard.server.common.data.DataConstants; +import org.thingsboard.server.common.data.Device; +import org.thingsboard.server.common.data.DeviceProfile; import org.thingsboard.server.common.data.StringUtils; import org.thingsboard.server.common.data.Tenant; import org.thingsboard.server.common.data.TenantInfo; import org.thingsboard.server.common.data.TenantProfile; import org.thingsboard.server.common.data.User; +import org.thingsboard.server.common.data.id.EntityId; import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.page.PageData; import org.thingsboard.server.common.data.page.PageLink; @@ -50,24 +55,47 @@ import org.thingsboard.server.common.data.tenant.profile.DefaultTenantProfileCon import org.thingsboard.server.common.data.tenant.profile.TenantProfileData; import org.thingsboard.server.common.data.tenant.profile.TenantProfileQueueConfiguration; import org.thingsboard.server.dao.service.DaoSqlTest; +import org.thingsboard.server.common.msg.TbMsg; +import org.thingsboard.server.common.msg.TbMsgMetaData; +import org.thingsboard.server.common.msg.queue.QueueToRuleEngineMsg; +import org.thingsboard.server.common.msg.queue.ServiceType; +import org.thingsboard.server.common.msg.queue.TopicPartitionInfo; +import org.thingsboard.server.gen.transport.TransportProtos; +import org.thingsboard.server.queue.TbQueueAdmin; +import org.thingsboard.server.queue.discovery.PartitionService; import java.util.ArrayList; +import java.util.Collections; import java.util.Comparator; +import java.util.Deque; import java.util.HashMap; +import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Random; +import java.util.UUID; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Consumer; +import java.util.function.Predicate; import java.util.stream.Collectors; import static org.assertj.core.api.Assertions.assertThat; +import static org.awaitility.Awaitility.await; import static org.hamcrest.Matchers.containsString; +import static org.mockito.ArgumentMatchers.argThat; +import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.never; +import static org.mockito.Mockito.timeout; import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status; +import static org.thingsboard.server.common.data.DataConstants.MAIN_QUEUE_NAME; +import static org.thingsboard.server.common.data.DataConstants.MAIN_QUEUE_TOPIC; @TestPropertySource(properties = { "js.evaluator=mock", + "queue.rule-engine.topic-deletion-delay=10" }) @Slf4j @DaoSqlTest @@ -80,6 +108,13 @@ public class TenantControllerTest extends AbstractControllerTest { ListeningExecutorService executor; + @SpyBean + private PartitionService partitionService; + @SpyBean + private ActorSystemContext actorContext; + @SpyBean + private TbQueueAdmin queueAdmin; + @Before public void setUp() throws Exception { executor = MoreExecutors.listeningDecorator(ThingsBoardExecutors.newWorkStealingPool(8, getClass())); @@ -87,6 +122,12 @@ public class TenantControllerTest extends AbstractControllerTest { @After public void tearDown() throws Exception { + loginSysAdmin(); + for (Queue queue : doGetTypedWithPageLink("/api/queues?serviceType=TB_RULE_ENGINE&", new TypeReference>() {}, new PageLink(100)).getData()) { + if (!queue.getName().equals(MAIN_QUEUE_NAME)) { + doDelete("/api/queues/" + queue.getId()).andExpect(status().isOk()); + } + } executor.shutdownNow(); } @@ -430,7 +471,7 @@ public class TenantControllerTest extends AbstractControllerTest { tenantProfileData.setConfiguration(new DefaultTenantProfileConfiguration()); tenantProfile.setProfileData(tenantProfileData); tenantProfile.setIsolatedTbRuleEngine(true); - addQueueConfig(tenantProfile, DataConstants.MAIN_QUEUE_NAME); + addQueueConfig(tenantProfile, MAIN_QUEUE_NAME); addQueueConfig(tenantProfile, "Test"); tenantProfile = doPost("/api/tenantProfile", tenantProfile, TenantProfile.class); @@ -459,7 +500,7 @@ public class TenantControllerTest extends AbstractControllerTest { tenantProfileData2.setConfiguration(new DefaultTenantProfileConfiguration()); tenantProfile2.setProfileData(tenantProfileData2); tenantProfile2.setIsolatedTbRuleEngine(true); - addQueueConfig(tenantProfile2, DataConstants.MAIN_QUEUE_NAME); + addQueueConfig(tenantProfile2, MAIN_QUEUE_NAME); addQueueConfig(tenantProfile2, "Test"); addQueueConfig(tenantProfile2, "Test2"); tenantProfile2 = doPost("/api/tenantProfile", tenantProfile2, TenantProfile.class); @@ -520,10 +561,201 @@ public class TenantControllerTest extends AbstractControllerTest { doDelete("/api/tenant/" + tenant.getId().getId().toString()).andExpect(status().isOk()); } + @Test + public void testUpdateTenantProfileToIsolated() throws Exception { + loginSysAdmin(); + doPost("/api/queues?serviceType=TB_RULE_ENGINE", new Queue(TenantId.SYS_TENANT_ID, getQueueConfig(DataConstants.HP_QUEUE_NAME, DataConstants.HP_QUEUE_TOPIC))).andExpect(status().isOk()); + TenantProfile tenantProfile = new TenantProfile(); + tenantProfile.setName("Test profile"); + TenantProfileData tenantProfileData = new TenantProfileData(); + tenantProfileData.setConfiguration(new DefaultTenantProfileConfiguration()); + tenantProfile.setProfileData(tenantProfileData); + tenantProfile.setIsolatedTbRuleEngine(false); + tenantProfile = doPost("/api/tenantProfile", tenantProfile, TenantProfile.class); + createDifferentTenant(); + loginSysAdmin(); + savedDifferentTenant.setTenantProfileId(tenantProfile.getId()); + savedDifferentTenant = doPost("/api/tenant", savedDifferentTenant, Tenant.class); + TenantId tenantId = differentTenantId; + + loginDifferentTenant(); + DeviceProfile hpQueueProfile = createDeviceProfile("HighPriority profile"); + hpQueueProfile.setDefaultQueueName(DataConstants.HP_QUEUE_NAME); + hpQueueProfile = doPost("/api/deviceProfile", hpQueueProfile, DeviceProfile.class); + Device hpQueueDevice = createDevice("HP", hpQueueProfile.getName(), "HP"); + + DeviceProfile mainQueueProfile = createDeviceProfile("Main profile"); + mainQueueProfile.setDefaultQueueName(MAIN_QUEUE_NAME); + mainQueueProfile = doPost("/api/deviceProfile", mainQueueProfile, DeviceProfile.class); + Device mainQueueDevice = createDevice("Main", mainQueueProfile.getName(), "Main"); + + verifyUsedQueueAndMessage(DataConstants.HP_QUEUE_NAME, tenantId, hpQueueDevice.getId(), DataConstants.ATTRIBUTES_UPDATED, () -> { + doPost("/api/plugins/telemetry/DEVICE/" + hpQueueDevice.getId() + "/attributes/SERVER_SCOPE", "{\"test\":123}", String.class); + }, usedTpi -> { + assertThat(usedTpi.getTopic()).isEqualTo(DataConstants.HP_QUEUE_TOPIC); + assertThat(usedTpi.getTenantId()).get().isEqualTo(TenantId.SYS_TENANT_ID); + }); + verifyUsedQueueAndMessage(MAIN_QUEUE_NAME, tenantId, mainQueueDevice.getId(), DataConstants.ATTRIBUTES_UPDATED, () -> { + doPost("/api/plugins/telemetry/DEVICE/" + mainQueueDevice.getId() + "/attributes/SERVER_SCOPE", "{\"test\":123}", String.class); + }, usedTpi -> { + assertThat(usedTpi.getTopic()).isEqualTo(MAIN_QUEUE_TOPIC); + assertThat(usedTpi.getTenantId()).get().isEqualTo(TenantId.SYS_TENANT_ID); + }); + + loginSysAdmin(); + tenantProfile.setIsolatedTbRuleEngine(true); + tenantProfile.getProfileData().setQueueConfiguration(List.of( + getQueueConfig(MAIN_QUEUE_NAME, MAIN_QUEUE_TOPIC) + )); + tenantProfile = doPost("/api/tenantProfile", tenantProfile, TenantProfile.class); + + loginDifferentTenant(); + verifyUsedQueueAndMessage(MAIN_QUEUE_NAME, tenantId, mainQueueDevice.getId(), DataConstants.ATTRIBUTES_UPDATED, () -> { + doPost("/api/plugins/telemetry/DEVICE/" + mainQueueDevice.getId() + "/attributes/SERVER_SCOPE", "{\"test\":123}", String.class); + }, usedTpi -> { + assertThat(usedTpi.getTopic()).isEqualTo(MAIN_QUEUE_TOPIC); + assertThat(usedTpi.getTenantId()).get().isEqualTo(tenantId); + }); + verifyUsedQueueAndMessage(DataConstants.HP_QUEUE_NAME, tenantId, hpQueueDevice.getId(), DataConstants.ATTRIBUTES_UPDATED, () -> { + doPost("/api/plugins/telemetry/DEVICE/" + hpQueueDevice.getId() + "/attributes/SERVER_SCOPE", "{\"test\":123}", String.class); + }, usedTpi -> { + assertThat(usedTpi.getTopic()).isEqualTo(DataConstants.HP_QUEUE_TOPIC); + assertThat(usedTpi.getTenantId()).get().isEqualTo(TenantId.SYS_TENANT_ID); + }); + + loginSysAdmin(); + tenantProfile.setIsolatedTbRuleEngine(true); + tenantProfile.getProfileData().setQueueConfiguration(List.of( + getQueueConfig(MAIN_QUEUE_NAME, MAIN_QUEUE_TOPIC), + getQueueConfig(DataConstants.HP_QUEUE_NAME, DataConstants.HP_QUEUE_TOPIC) + )); + tenantProfile = doPost("/api/tenantProfile", tenantProfile, TenantProfile.class); + + loginDifferentTenant(); + verifyUsedQueueAndMessage(DataConstants.HP_QUEUE_NAME, tenantId, hpQueueDevice.getId(), DataConstants.ATTRIBUTES_UPDATED, () -> { + doPost("/api/plugins/telemetry/DEVICE/" + hpQueueDevice.getId() + "/attributes/SERVER_SCOPE", "{\"test\":123}", String.class); + }, usedTpi -> { + assertThat(usedTpi.getTopic()).isEqualTo(DataConstants.HP_QUEUE_TOPIC); + assertThat(usedTpi.getTenantId()).get().isEqualTo(tenantId); + }); + verifyUsedQueueAndMessage(MAIN_QUEUE_NAME, tenantId, mainQueueDevice.getId(), DataConstants.ATTRIBUTES_UPDATED, () -> { + doPost("/api/plugins/telemetry/DEVICE/" + mainQueueDevice.getId() + "/attributes/SERVER_SCOPE", "{\"test\":123}", String.class); + }, usedTpi -> { + assertThat(usedTpi.getTopic()).isEqualTo(MAIN_QUEUE_TOPIC); + assertThat(usedTpi.getTenantId()).get().isEqualTo(tenantId); + }); + } + + @Test + public void testIsolatedQueueDeletion() throws Exception { + loginSysAdmin(); + TenantProfile tenantProfile = new TenantProfile(); + tenantProfile.setName("Test profile"); + TenantProfileData tenantProfileData = new TenantProfileData(); + tenantProfileData.setConfiguration(new DefaultTenantProfileConfiguration()); + tenantProfile.setProfileData(tenantProfileData); + tenantProfile.setIsolatedTbRuleEngine(true); + addQueueConfig(tenantProfile, MAIN_QUEUE_NAME); + tenantProfile = doPost("/api/tenantProfile", tenantProfile, TenantProfile.class); + createDifferentTenant(); + loginSysAdmin(); + savedDifferentTenant.setTenantProfileId(tenantProfile.getId()); + savedDifferentTenant = doPost("/api/tenant", savedDifferentTenant, Tenant.class); + TenantId tenantId = differentTenantId; + await().atMost(10, TimeUnit.SECONDS) + .until(() -> { + TopicPartitionInfo tpi = partitionService.resolve(ServiceType.TB_RULE_ENGINE, MAIN_QUEUE_NAME, tenantId, tenantId); + return !tpi.getTenantId().get().isSysTenantId(); + }); + TopicPartitionInfo tpi = new TopicPartitionInfo(MAIN_QUEUE_TOPIC, tenantId, 0, false); + String isolatedTopic = tpi.getFullTopicName(); + TbMsg expectedMsg = publishTbMsg(tenantId, tpi); + awaitTbMsg(tbMsg -> tbMsg.getId().equals(expectedMsg.getId()), 10000); // to wait for consumer start + + loginSysAdmin(); + tenantProfile.setIsolatedTbRuleEngine(false); + tenantProfile.getProfileData().setQueueConfiguration(Collections.emptyList()); + tenantProfile = doPost("/api/tenantProfile", tenantProfile, TenantProfile.class); + await().atMost(10, TimeUnit.SECONDS) + .until(() -> partitionService.resolve(ServiceType.TB_RULE_ENGINE, MAIN_QUEUE_NAME, tenantId, tenantId) + .getTenantId().get().isSysTenantId()); + + Deque submittedMsgs = new LinkedList<>(); + await().atLeast(8, TimeUnit.SECONDS) // due to topic-deletion-delay + .atMost(20, TimeUnit.SECONDS) + .pollInterval(1, TimeUnit.SECONDS) + .untilAsserted(() -> { + TbMsg tbMsg = publishTbMsg(tenantId, tpi); + submittedMsgs.add(tbMsg.getId()); + + verify(queueAdmin, times(1)).deleteTopic(eq(isolatedTopic)); + }); + submittedMsgs.removeLast(); + for (UUID msgId : submittedMsgs) { + verify(actorContext, timeout(2000)).tell(argThat(msg -> { + return msg instanceof QueueToRuleEngineMsg && ((QueueToRuleEngineMsg) msg).getMsg().getId().equals(msgId); + })); + } + } + + private TbMsg publishTbMsg(TenantId tenantId, TopicPartitionInfo tpi) { + TbMsg tbMsg = TbMsg.newMsg("POST_TELEMETRY_REQUEST", tenantId, TbMsgMetaData.EMPTY, "{\"test\":1}"); + TransportProtos.ToRuleEngineMsg msg = TransportProtos.ToRuleEngineMsg.newBuilder() + .setTenantIdMSB(tenantId.getId().getMostSignificantBits()) + .setTenantIdLSB(tenantId.getId().getLeastSignificantBits()) + .setTbMsg(TbMsg.toByteString(tbMsg)).build(); + tbClusterService.pushMsgToRuleEngine(tpi, tbMsg.getId(), msg, null); + return tbMsg; + } + + private void verifyUsedQueueAndMessage(String queue, TenantId tenantId, EntityId entityId, String msgType, Runnable action, Consumer tpiAssert) { + await().atMost(15, TimeUnit.SECONDS) + .untilAsserted(() -> { + TopicPartitionInfo tpi = partitionService.resolve(ServiceType.TB_RULE_ENGINE, queue, tenantId, entityId); + tpiAssert.accept(tpi); + }); + action.run(); + TbMsg tbMsg = awaitTbMsg(msg -> msg.getOriginator().equals(entityId) + && msg.getType().equals(msgType), 10000); + assertThat(tbMsg.getQueueName()).isEqualTo(queue); + + TopicPartitionInfo tpi = partitionService.resolve(ServiceType.TB_RULE_ENGINE, queue, tenantId, entityId); + tpiAssert.accept(tpi); + } + + protected TbMsg awaitTbMsg(Predicate predicate, int timeoutMillis) { + AtomicReference tbMsgCaptor = new AtomicReference<>(); + verify(actorContext, timeout(timeoutMillis).atLeastOnce()).tell(argThat(actorMsg -> { + if (!(actorMsg instanceof QueueToRuleEngineMsg)) { + return false; + } + TbMsg tbMsg = ((QueueToRuleEngineMsg) actorMsg).getMsg(); + if (predicate.test(tbMsg)) { + tbMsgCaptor.set(tbMsg); + return true; + } + return false; + })); + return tbMsgCaptor.get(); + } + private void addQueueConfig(TenantProfile tenantProfile, String queueName) { + TenantProfileQueueConfiguration queueConfiguration = getQueueConfig(queueName, "tb_rule_engine." + queueName.toLowerCase()); + TenantProfileData profileData = tenantProfile.getProfileData(); + + List configs = profileData.getQueueConfiguration(); + if (configs == null) { + configs = new ArrayList<>(); + } + configs.add(queueConfiguration); + profileData.setQueueConfiguration(configs); + tenantProfile.setProfileData(profileData); + } + + private TenantProfileQueueConfiguration getQueueConfig(String queueName, String topic) { TenantProfileQueueConfiguration queueConfiguration = new TenantProfileQueueConfiguration(); queueConfiguration.setName(queueName); - queueConfiguration.setTopic("tb_rule_engine." + queueName.toLowerCase()); + queueConfiguration.setTopic(topic); queueConfiguration.setPollInterval(25); queueConfiguration.setPartitions(1 + new Random().nextInt(99)); queueConfiguration.setConsumerPerPartition(true); @@ -539,15 +771,7 @@ public class TenantControllerTest extends AbstractControllerTest { processingStrategy.setPauseBetweenRetries(3); processingStrategy.setMaxPauseBetweenRetries(3); queueConfiguration.setProcessingStrategy(processingStrategy); - TenantProfileData profileData = tenantProfile.getProfileData(); - - List configs = profileData.getQueueConfiguration(); - if (configs == null) { - configs = new ArrayList<>(); - } - configs.add(queueConfiguration); - profileData.setQueueConfiguration(configs); - tenantProfile.setProfileData(profileData); + return queueConfiguration; } private List getQueuesFromConfig(List queueConfiguration, List queues) { diff --git a/application/src/test/java/org/thingsboard/server/controller/TenantProfileControllerTest.java b/application/src/test/java/org/thingsboard/server/controller/TenantProfileControllerTest.java index e6b37143ad..eba9fb8b12 100644 --- a/application/src/test/java/org/thingsboard/server/controller/TenantProfileControllerTest.java +++ b/application/src/test/java/org/thingsboard/server/controller/TenantProfileControllerTest.java @@ -167,23 +167,6 @@ public class TenantProfileControllerTest extends AbstractControllerTest { testBroadcastEntityStateChangeEventNeverTenantProfile(); } - @Test - public void testSaveSameTenantProfileWithDifferentIsolatedTbRuleEngine() throws Exception { - loginSysAdmin(); - TenantProfile tenantProfile = this.createTenantProfile("Tenant Profile"); - TenantProfile savedTenantProfile = doPost("/api/tenantProfile", tenantProfile, TenantProfile.class); - savedTenantProfile.setIsolatedTbRuleEngine(true); - addMainQueueConfig(savedTenantProfile); - - Mockito.reset(tbClusterService); - - doPost("/api/tenantProfile", savedTenantProfile) - .andExpect(status().isBadRequest()) - .andExpect(statusReason(containsString("Can't update isolatedTbRuleEngine property"))); - - testBroadcastEntityStateChangeEventNeverTenantProfile(); - } - @Test public void testDeleteTenantProfileWithExistingTenant() throws Exception { loginSysAdmin(); diff --git a/application/src/test/java/org/thingsboard/server/queue/discovery/HashPartitionServiceTest.java b/application/src/test/java/org/thingsboard/server/queue/discovery/HashPartitionServiceTest.java index 25b06e7840..7bd9ec576f 100644 --- a/application/src/test/java/org/thingsboard/server/queue/discovery/HashPartitionServiceTest.java +++ b/application/src/test/java/org/thingsboard/server/queue/discovery/HashPartitionServiceTest.java @@ -18,6 +18,7 @@ package org.thingsboard.server.queue.discovery; import com.datastax.oss.driver.api.core.uuid.Uuids; import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; +import org.apache.commons.collections4.ListUtils; import org.junit.Assert; import org.junit.Before; import org.junit.Test; @@ -25,24 +26,36 @@ import org.junit.runner.RunWith; import org.mockito.junit.MockitoJUnitRunner; import org.springframework.context.ApplicationEventPublisher; import org.springframework.test.util.ReflectionTestUtils; +import org.thingsboard.server.common.data.DataConstants; import org.thingsboard.server.common.data.id.DeviceId; +import org.thingsboard.server.common.data.id.QueueId; import org.thingsboard.server.common.data.id.TenantId; +import org.thingsboard.server.common.data.id.TenantProfileId; +import org.thingsboard.server.common.data.id.UUIDBased; +import org.thingsboard.server.common.data.queue.Queue; import org.thingsboard.server.common.msg.queue.ServiceType; import org.thingsboard.server.common.msg.queue.TopicPartitionInfo; -import org.thingsboard.server.gen.transport.TransportProtos; +import org.thingsboard.server.gen.transport.TransportProtos.ServiceInfo; import java.text.SimpleDateFormat; import java.util.ArrayList; +import java.util.Collection; import java.util.Collections; import java.util.Comparator; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Random; +import java.util.Set; +import java.util.UUID; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; +import java.util.stream.Stream; +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; @Slf4j @RunWith(MockitoJUnitRunner.class) @@ -57,7 +70,7 @@ public class HashPartitionServiceTest { private ApplicationEventPublisher applicationEventPublisher; private QueueRoutingInfoService queueRoutingInfoService; - private String hashFunctionName = "sha256"; + private String hashFunctionName = "murmur3_128"; @Before public void setup() throws Exception { @@ -74,15 +87,15 @@ public class HashPartitionServiceTest { ReflectionTestUtils.setField(clusterRoutingService, "vcTopic", "tb.vc"); ReflectionTestUtils.setField(clusterRoutingService, "vcPartitions", 10); ReflectionTestUtils.setField(clusterRoutingService, "hashFunctionName", hashFunctionName); - TransportProtos.ServiceInfo currentServer = TransportProtos.ServiceInfo.newBuilder() + ServiceInfo currentServer = ServiceInfo.newBuilder() .setServiceId("tb-core-0") .addAllServiceTypes(Collections.singletonList(ServiceType.TB_CORE.name())) .build(); // when(queueService.resolve(Mockito.any(), Mockito.anyString())).thenAnswer(i -> i.getArguments()[1]); // when(discoveryService.getServiceInfo()).thenReturn(currentServer); - List otherServers = new ArrayList<>(); + List otherServers = new ArrayList<>(); for (int i = 1; i < SERVER_COUNT; i++) { - otherServers.add(TransportProtos.ServiceInfo.newBuilder() + otherServers.add(ServiceInfo.newBuilder() .setServiceId("tb-rule-" + i) .addAllServiceTypes(Collections.singletonList(ServiceType.TB_CORE.name())) .build()); @@ -122,10 +135,10 @@ public class HashPartitionServiceTest { int queueCount = 3; int partitionCount = 3; - List services = new ArrayList<>(); + List services = new ArrayList<>(); for (int i = 0; i < serverCount; i++) { - services.add(TransportProtos.ServiceInfo.newBuilder().setServiceId("RE-" + i).build()); + services.add(ServiceInfo.newBuilder().setServiceId("RE-" + i).build()); } long start = System.currentTimeMillis(); @@ -140,7 +153,7 @@ public class HashPartitionServiceTest { for (int queueIndex = 0; queueIndex < queueCount; queueIndex++) { QueueKey queueKey = new QueueKey(ServiceType.TB_RULE_ENGINE, "queue" + queueIndex, tenantId); for (int partition = 0; partition < partitionCount; partition++) { - TransportProtos.ServiceInfo serviceInfo = clusterRoutingService.resolveByPartitionIdx(services, queueKey, partition); + ServiceInfo serviceInfo = clusterRoutingService.resolveByPartitionIdx(services, queueKey, partition); String serviceId = serviceInfo.getServiceId(); map.put(serviceId, map.get(serviceId) + 1); } @@ -163,4 +176,124 @@ public class HashPartitionServiceTest { Assert.assertTrue(diffPercent < maxDiffPercent); } + @Test + public void testPartitionsAssignmentWithDedicatedServers() { + int isolatedProfilesCount = 5; + int tenantsCountPerProfile = 100; + int dedicatedServerSetsCount = 3; + int serversCountPerSet = 3; + int profilesPerSet = (int) Math.ceil((double) isolatedProfilesCount / dedicatedServerSetsCount); + + List isolatedTenantProfiles = Stream.generate(() -> new TenantProfileId(UUID.randomUUID())) + .limit(isolatedProfilesCount).collect(Collectors.toList()); + Map tenants = new HashMap<>(); + for (TenantProfileId tenantProfileId : isolatedTenantProfiles) { + for (int i = 0; i < tenantsCountPerProfile; i++) { + tenants.put(new TenantId(UUID.randomUUID()), tenantProfileId); + } + } + + List queues = new ArrayList<>(); + Queue systemQueue = new Queue(); + systemQueue.setTenantId(TenantId.SYS_TENANT_ID); + systemQueue.setName("Main"); + systemQueue.setTopic(DataConstants.MAIN_QUEUE_TOPIC); + systemQueue.setPartitions(10); + systemQueue.setId(new QueueId(UUID.randomUUID())); + queues.add(systemQueue); + tenants.forEach((tenantId, profileId) -> { + Queue isolatedQueue = new Queue(); + isolatedQueue.setTenantId(tenantId); + isolatedQueue.setName("Main"); + isolatedQueue.setTopic(DataConstants.MAIN_QUEUE_TOPIC); + isolatedQueue.setPartitions(2); + isolatedQueue.setId(new QueueId(UUID.randomUUID())); + queues.add(isolatedQueue); + when(routingInfoService.getRoutingInfo(eq(tenantId))).thenReturn(new TenantRoutingInfo(tenantId, profileId, true)); + }); + when(queueRoutingInfoService.getAllQueuesRoutingInfo()).thenReturn(queues.stream() + .map(QueueRoutingInfo::new).collect(Collectors.toList())); + + List ruleEngines = new ArrayList<>(); + Map> dedicatedServers = new HashMap<>(); + int serviceId = 0; + for (int i = 0; i < serversCountPerSet; i++) { + ServiceInfo commonServer = ServiceInfo.newBuilder() + .setServiceId("tb-rule-engine-" + serviceId) + .addAllServiceTypes(List.of(ServiceType.TB_RULE_ENGINE.name())) + .build(); + ruleEngines.add(commonServer); + serviceId++; + } + for (int i = 0; i < dedicatedServerSetsCount; i++) { + List assignedProfiles = ListUtils.partition(isolatedTenantProfiles, profilesPerSet).get(i); + for (int j = 0; j < serversCountPerSet; j++) { + ServiceInfo dedicatedServer = ServiceInfo.newBuilder() + .setServiceId("tb-rule-engine-" + serviceId) + .addAllServiceTypes(List.of(ServiceType.TB_RULE_ENGINE.name())) + .addAllAssignedTenantProfiles(assignedProfiles.stream().map(UUIDBased::toString).collect(Collectors.toList())) + .build(); + ruleEngines.add(dedicatedServer); + serviceId++; + + for (TenantProfileId assignedProfileId : assignedProfiles) { + dedicatedServers.computeIfAbsent(assignedProfileId, p -> new ArrayList<>()).add(dedicatedServer); + } + } + } + + Map>> serversPartitions = new HashMap<>(); + clusterRoutingService.init(); + for (ServiceInfo ruleEngine : ruleEngines) { + List other = new ArrayList<>(ruleEngines); + other.removeIf(serviceInfo -> serviceInfo.getServiceId().equals(ruleEngine.getServiceId())); + + clusterRoutingService.recalculatePartitions(ruleEngine, other); + clusterRoutingService.myPartitions.forEach((queueKey, partitions) -> { + serversPartitions.computeIfAbsent(queueKey, k -> new HashMap<>()).put(ruleEngine, partitions); + }); + } + assertThat(serversPartitions.keySet()).containsAll(queues.stream().map(queue -> new QueueKey(ServiceType.TB_RULE_ENGINE, queue)).collect(Collectors.toList())); + + serversPartitions.forEach((queueKey, partitionsPerServer) -> { + if (queueKey.getTenantId().isSysTenantId()) { + partitionsPerServer.forEach((server, partitions) -> { + assertThat(server.getAssignedTenantProfilesCount()).as("system queues are not assigned to dedicated servers").isZero(); + }); + } else { + List responsibleServers = dedicatedServers.get(tenants.get(queueKey.getTenantId())); + partitionsPerServer.forEach((server, partitions) -> { + assertThat(server.getAssignedTenantProfilesCount()).as("isolated queues are only assigned to dedicated servers").isPositive(); + assertThat(responsibleServers).contains(server); + }); + } + + List allPartitions = partitionsPerServer.values().stream() + .flatMap(Collection::stream) + .collect(Collectors.toList()); + assertThat(allPartitions).doesNotHaveDuplicates(); + }); + } + + @Test + public void testIsManagedByCurrentServiceCheck() { + TenantProfileId isolatedProfileId = new TenantProfileId(UUID.randomUUID()); + when(discoveryService.getAssignedTenantProfiles()).thenReturn(Set.of(isolatedProfileId.getId())); // dedicated server + TenantProfileId regularProfileId = new TenantProfileId(UUID.randomUUID()); + + TenantId isolatedTenantId = new TenantId(UUID.randomUUID()); + when(routingInfoService.getRoutingInfo(eq(isolatedTenantId))).thenReturn(new TenantRoutingInfo(isolatedTenantId, isolatedProfileId, true)); + TenantId regularTenantId = new TenantId(UUID.randomUUID()); + when(routingInfoService.getRoutingInfo(eq(regularTenantId))).thenReturn(new TenantRoutingInfo(regularTenantId, regularProfileId, false)); + + assertThat(clusterRoutingService.isManagedByCurrentService(isolatedTenantId)).isTrue(); + assertThat(clusterRoutingService.isManagedByCurrentService(regularTenantId)).isFalse(); + + + when(discoveryService.getAssignedTenantProfiles()).thenReturn(Collections.emptySet()); // common server + + assertThat(clusterRoutingService.isManagedByCurrentService(isolatedTenantId)).isTrue(); + assertThat(clusterRoutingService.isManagedByCurrentService(regularTenantId)).isTrue(); + } + } diff --git a/application/src/test/java/org/thingsboard/server/service/notification/NotificationRuleApiTest.java b/application/src/test/java/org/thingsboard/server/service/notification/NotificationRuleApiTest.java index f89730b689..0a2854a28d 100644 --- a/application/src/test/java/org/thingsboard/server/service/notification/NotificationRuleApiTest.java +++ b/application/src/test/java/org/thingsboard/server/service/notification/NotificationRuleApiTest.java @@ -342,7 +342,7 @@ public class NotificationRuleApiTest extends AbstractNotificationApiTest { @Test public void testNotificationRuleProcessing_entitiesLimit() throws Exception { int limit = 5; - updateDefaultTenantProfile(profileConfiguration -> { + updateDefaultTenantProfileConfig(profileConfiguration -> { profileConfiguration.setMaxDevices(limit); profileConfiguration.setMaxAssets(limit); profileConfiguration.setMaxCustomers(limit); @@ -421,10 +421,10 @@ public class NotificationRuleApiTest extends AbstractNotificationApiTest { int n = 10; updateDefaultTenantProfile(profileConfiguration -> { - profileConfiguration.setTenantEntityExportRateLimit(n + ":600"); - profileConfiguration.setCustomerServerRestLimitsConfiguration(n + ":600"); - profileConfiguration.setTenantNotificationRequestsPerRuleRateLimit(n + ":600"); - profileConfiguration.setTransportDeviceTelemetryMsgRateLimit(n + ":600"); + profileConfiguration.getProfileConfiguration().get().setTenantEntityExportRateLimit(n + ":600"); + profileConfiguration.getProfileConfiguration().get().setCustomerServerRestLimitsConfiguration(n + ":600"); + profileConfiguration.getProfileConfiguration().get().setTenantNotificationRequestsPerRuleRateLimit(n + ":600"); + profileConfiguration.getProfileConfiguration().get().setTransportDeviceTelemetryMsgRateLimit(n + ":600"); }); loginTenantAdmin(); NotificationRule rule = createNotificationRule(AlarmCommentNotificationRuleTriggerConfig.builder() @@ -608,7 +608,7 @@ public class NotificationRuleApiTest extends AbstractNotificationApiTest { @Test public void testNotificationRequestsPerRuleRateLimits() throws Exception { int notificationRequestsLimit = 10; - updateDefaultTenantProfile(profileConfiguration -> { + updateDefaultTenantProfileConfig(profileConfiguration -> { profileConfiguration.setTenantNotificationRequestsPerRuleRateLimit(notificationRequestsLimit + ":300"); }); @@ -691,8 +691,8 @@ public class NotificationRuleApiTest extends AbstractNotificationApiTest { int n = 5; updateDefaultTenantProfile(profileConfiguration -> { - profileConfiguration.setTenantEntityExportRateLimit(n + ":600"); - profileConfiguration.setTransportDeviceTelemetryMsgRateLimit(n + ":800"); + profileConfiguration.getProfileConfiguration().get().setTenantEntityExportRateLimit(n + ":600"); + profileConfiguration.getProfileConfiguration().get().setTransportDeviceTelemetryMsgRateLimit(n + ":800"); }); RateLimitsTrigger expectedTrigger = RateLimitsTrigger.builder() diff --git a/application/src/test/java/org/thingsboard/server/service/ttl/AlarmsCleanUpServiceTest.java b/application/src/test/java/org/thingsboard/server/service/ttl/AlarmsCleanUpServiceTest.java index 6cabff788e..09e78df9ff 100644 --- a/application/src/test/java/org/thingsboard/server/service/ttl/AlarmsCleanUpServiceTest.java +++ b/application/src/test/java/org/thingsboard/server/service/ttl/AlarmsCleanUpServiceTest.java @@ -67,7 +67,7 @@ public class AlarmsCleanUpServiceTest extends AbstractControllerTest { @Test public void testAlarmsCleanUp() throws Exception { int ttlDays = 1; - updateDefaultTenantProfile(profileConfiguration -> { + updateDefaultTenantProfileConfig(profileConfiguration -> { profileConfiguration.setAlarmsTtlDays(ttlDays); }); diff --git a/common/actor/src/main/java/org/thingsboard/server/actors/TbActorCtx.java b/common/actor/src/main/java/org/thingsboard/server/actors/TbActorCtx.java index 2a8f641c4e..bbe19fb3dd 100644 --- a/common/actor/src/main/java/org/thingsboard/server/actors/TbActorCtx.java +++ b/common/actor/src/main/java/org/thingsboard/server/actors/TbActorCtx.java @@ -32,7 +32,7 @@ public interface TbActorCtx extends TbActorRef { void stop(TbActorId target); - TbActorRef getOrCreateChildActor(TbActorId actorId, Supplier dispatcher, Supplier creator); + TbActorRef getOrCreateChildActor(TbActorId actorId, Supplier dispatcher, Supplier creator, Supplier createCondition); void broadcastToChildren(TbActorMsg msg); diff --git a/common/actor/src/main/java/org/thingsboard/server/actors/TbActorMailbox.java b/common/actor/src/main/java/org/thingsboard/server/actors/TbActorMailbox.java index ad1604f7b0..34537c143c 100644 --- a/common/actor/src/main/java/org/thingsboard/server/actors/TbActorMailbox.java +++ b/common/actor/src/main/java/org/thingsboard/server/actors/TbActorMailbox.java @@ -15,7 +15,8 @@ */ package org.thingsboard.server.actors; -import lombok.Data; +import lombok.Getter; +import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.thingsboard.server.common.data.EntityType; import org.thingsboard.server.common.msg.MsgType; @@ -31,7 +32,8 @@ import java.util.function.Predicate; import java.util.function.Supplier; @Slf4j -@Data +@Getter +@RequiredArgsConstructor public final class TbActorMailbox implements TbActorCtx { private static final boolean HIGH_PRIORITY = true; private static final boolean NORMAL_PRIORITY = false; @@ -212,9 +214,9 @@ public final class TbActorMailbox implements TbActorCtx { } @Override - public TbActorRef getOrCreateChildActor(TbActorId actorId, Supplier dispatcher, Supplier creator) { + public TbActorRef getOrCreateChildActor(TbActorId actorId, Supplier dispatcher, Supplier creator, Supplier createCondition) { TbActorRef actorRef = system.getActor(actorId); - if (actorRef == null) { + if (actorRef == null && createCondition.get()) { return system.createChildActor(dispatcher.get(), creator.get(), selfId); } else { return actorRef; diff --git a/common/cluster-api/src/main/java/org/thingsboard/server/queue/TbQueueAdmin.java b/common/cluster-api/src/main/java/org/thingsboard/server/queue/TbQueueAdmin.java index 4b2bde733e..19aa0284ea 100644 --- a/common/cluster-api/src/main/java/org/thingsboard/server/queue/TbQueueAdmin.java +++ b/common/cluster-api/src/main/java/org/thingsboard/server/queue/TbQueueAdmin.java @@ -17,7 +17,11 @@ package org.thingsboard.server.queue; public interface TbQueueAdmin { - void createTopicIfNotExists(String topic); + default void createTopicIfNotExists(String topic) { + createTopicIfNotExists(topic, null); + } + + void createTopicIfNotExists(String topic, String properties); void destroy(); diff --git a/common/cluster-api/src/main/java/org/thingsboard/server/queue/TbQueueConsumer.java b/common/cluster-api/src/main/java/org/thingsboard/server/queue/TbQueueConsumer.java index 04439fc85d..9c41f9d342 100644 --- a/common/cluster-api/src/main/java/org/thingsboard/server/queue/TbQueueConsumer.java +++ b/common/cluster-api/src/main/java/org/thingsboard/server/queue/TbQueueConsumer.java @@ -36,4 +36,10 @@ public interface TbQueueConsumer { boolean isStopped(); + void onQueueDelete(); + + boolean isQueueDeleted(); + + List getFullTopicNames(); + } diff --git a/common/cluster-api/src/main/proto/queue.proto b/common/cluster-api/src/main/proto/queue.proto index 80f44e59be..78614911d4 100644 --- a/common/cluster-api/src/main/proto/queue.proto +++ b/common/cluster-api/src/main/proto/queue.proto @@ -28,6 +28,7 @@ message ServiceInfo { repeated string serviceTypes = 2; repeated string transports = 6; SystemInfoProto systemInfo = 10; + repeated string assignedTenantProfiles = 11; } message SystemInfoProto { diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/queue/Queue.java b/common/data/src/main/java/org/thingsboard/server/common/data/queue/Queue.java index 6839c60bcd..5b613a7c05 100644 --- a/common/data/src/main/java/org/thingsboard/server/common/data/queue/Queue.java +++ b/common/data/src/main/java/org/thingsboard/server/common/data/queue/Queue.java @@ -15,6 +15,8 @@ */ package org.thingsboard.server.common.data.queue; +import com.fasterxml.jackson.annotation.JsonIgnore; +import com.fasterxml.jackson.databind.JsonNode; import lombok.Data; import org.thingsboard.server.common.data.BaseDataWithAdditionalInfo; import org.thingsboard.server.common.data.HasName; @@ -25,6 +27,8 @@ import org.thingsboard.server.common.data.tenant.profile.TenantProfileQueueConfi import org.thingsboard.server.common.data.validation.Length; import org.thingsboard.server.common.data.validation.NoXss; +import java.util.Optional; + @Data public class Queue extends BaseDataWithAdditionalInfo implements HasName, HasTenantId { private TenantId tenantId; @@ -60,4 +64,13 @@ public class Queue extends BaseDataWithAdditionalInfo implements HasNam this.processingStrategy = queueConfiguration.getProcessingStrategy(); setAdditionalInfo(queueConfiguration.getAdditionalInfo()); } -} \ No newline at end of file + + + @JsonIgnore + public String getCustomProperties() { + return Optional.ofNullable(getAdditionalInfo()) + .map(info -> info.get("customProperties")) + .filter(JsonNode::isTextual).map(JsonNode::asText).orElse(null); + } + +} diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/RuleEngineTbQueueAdminFactory.java b/common/queue/src/main/java/org/thingsboard/server/queue/RuleEngineTbQueueAdminFactory.java index fe29c3a04f..7a8764325f 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/RuleEngineTbQueueAdminFactory.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/RuleEngineTbQueueAdminFactory.java @@ -99,7 +99,7 @@ public class RuleEngineTbQueueAdminFactory { return new TbQueueAdmin() { @Override - public void createTopicIfNotExists(String topic) { + public void createTopicIfNotExists(String topic, String properties) { } @Override diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/azure/servicebus/TbServiceBusAdmin.java b/common/queue/src/main/java/org/thingsboard/server/queue/azure/servicebus/TbServiceBusAdmin.java index e171bb7a31..d95d2064a4 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/azure/servicebus/TbServiceBusAdmin.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/azure/servicebus/TbServiceBusAdmin.java @@ -22,6 +22,7 @@ import com.microsoft.azure.servicebus.primitives.MessagingEntityAlreadyExistsExc import com.microsoft.azure.servicebus.primitives.ServiceBusException; import lombok.extern.slf4j.Slf4j; import org.thingsboard.server.queue.TbQueueAdmin; +import org.thingsboard.server.queue.util.PropertyUtils; import java.io.IOException; import java.time.Duration; @@ -60,7 +61,7 @@ public class TbServiceBusAdmin implements TbQueueAdmin { } @Override - public void createTopicIfNotExists(String topic) { + public void createTopicIfNotExists(String topic, String properties) { if (queues.contains(topic)) { return; } @@ -68,7 +69,7 @@ public class TbServiceBusAdmin implements TbQueueAdmin { try { QueueDescription queueDescription = new QueueDescription(topic); queueDescription.setRequiresDuplicateDetection(false); - setQueueConfigs(queueDescription); + setQueueConfigs(queueDescription, PropertyUtils.getProps(queueConfigs, properties)); client.createQueue(queueDescription); queues.add(topic); @@ -107,7 +108,7 @@ public class TbServiceBusAdmin implements TbQueueAdmin { } } - private void setQueueConfigs(QueueDescription queueDescription) { + private void setQueueConfigs(QueueDescription queueDescription, Map queueConfigs) { queueConfigs.forEach((confKey, confValue) -> { switch (confKey) { case MAX_SIZE: diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/common/AbstractTbQueueConsumerTemplate.java b/common/queue/src/main/java/org/thingsboard/server/queue/common/AbstractTbQueueConsumerTemplate.java index 86146dabd1..2ebe41850d 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/common/AbstractTbQueueConsumerTemplate.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/common/AbstractTbQueueConsumerTemplate.java @@ -44,6 +44,7 @@ public abstract class AbstractTbQueueConsumerTemplate i protected volatile Set partitions; protected final ReentrantLock consumerLock = new ReentrantLock(); //NonfairSync final Queue> subscribeQueue = new ConcurrentLinkedQueue<>(); + protected volatile boolean queueDeleted = false; @Getter private final String topic; @@ -94,7 +95,7 @@ public abstract class AbstractTbQueueConsumerTemplate i partitions = subscribeQueue.poll(); } if (!subscribed) { - List topicNames = partitions.stream().map(TopicPartitionInfo::getFullTopicName).collect(Collectors.toList()); + List topicNames = getFullTopicNames(); doSubscribe(topicNames); subscribed = true; } @@ -103,7 +104,9 @@ public abstract class AbstractTbQueueConsumerTemplate i consumerLock.unlock(); } - if (records.isEmpty()) { return sleepAndReturnEmpty(startNanos, durationInMillis); } + if (records.isEmpty() && !isLongPollingSupported()) { + return sleepAndReturnEmpty(startNanos, durationInMillis); + } return decodeRecords(records); } @@ -162,7 +165,9 @@ public abstract class AbstractTbQueueConsumerTemplate i @Override public void unsubscribe() { - log.info("unsubscribe topic and stop consumer {}", getTopic()); + log.info("Unsubscribing from topics and stopping consumer for topics {}", partitions.stream() + .map(TopicPartitionInfo::getFullTopicName) + .collect(Collectors.joining(", "))); stopped = true; consumerLock.lock(); try { @@ -187,4 +192,22 @@ public abstract class AbstractTbQueueConsumerTemplate i abstract protected void doUnsubscribe(); + @Override + public void onQueueDelete() { + queueDeleted = true; + } + + public boolean isQueueDeleted() { + return queueDeleted; + } + + @Override + public List getFullTopicNames() { + return partitions.stream().map(TopicPartitionInfo::getFullTopicName).collect(Collectors.toList()); + } + + protected boolean isLongPollingSupported() { + return false; + } + } diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/discovery/DefaultTbServiceInfoProvider.java b/common/queue/src/main/java/org/thingsboard/server/queue/discovery/DefaultTbServiceInfoProvider.java index 3c85aef350..64a8b70a1f 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/discovery/DefaultTbServiceInfoProvider.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/discovery/DefaultTbServiceInfoProvider.java @@ -23,6 +23,7 @@ import org.springframework.context.ApplicationContext; import org.springframework.stereotype.Component; import org.thingsboard.server.common.data.StringUtils; import org.thingsboard.server.common.data.TbTransportService; +import org.thingsboard.server.common.data.util.CollectionsUtil; import org.thingsboard.server.common.msg.queue.ServiceType; import org.thingsboard.server.gen.transport.TransportProtos; import org.thingsboard.server.gen.transport.TransportProtos.ServiceInfo; @@ -35,6 +36,8 @@ import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.List; +import java.util.Set; +import java.util.UUID; import java.util.stream.Collectors; import static org.thingsboard.common.util.SystemUtil.getCpuCount; @@ -57,6 +60,10 @@ public class DefaultTbServiceInfoProvider implements TbServiceInfoProvider { @Value("${service.type:monolith}") private String serviceType; + @Getter + @Value("${service.rule_engine.assigned_tenant_profiles:}") + private Set assignedTenantProfiles; + @Autowired private ApplicationContext applicationContext; @@ -78,6 +85,9 @@ public class DefaultTbServiceInfoProvider implements TbServiceInfoProvider { } else { serviceTypes = Collections.singletonList(ServiceType.of(serviceType)); } + if (!serviceTypes.contains(ServiceType.TB_RULE_ENGINE) || assignedTenantProfiles == null) { + assignedTenantProfiles = Collections.emptySet(); + } generateNewServiceInfoWithCurrentSystemInfo(); } @@ -111,7 +121,9 @@ public class DefaultTbServiceInfoProvider implements TbServiceInfoProvider { .setServiceId(serviceId) .addAllServiceTypes(serviceTypes.stream().map(ServiceType::name).collect(Collectors.toList())) .setSystemInfo(getCurrentSystemInfoProto()); - + if (CollectionsUtil.isNotEmpty(assignedTenantProfiles)) { + builder.addAllAssignedTenantProfiles(assignedTenantProfiles.stream().map(UUID::toString).collect(Collectors.toList())); + } return serviceInfo = builder.build(); } diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/discovery/HashPartitionService.java b/common/queue/src/main/java/org/thingsboard/server/queue/discovery/HashPartitionService.java index 14fb0369ee..2db342d4e6 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/discovery/HashPartitionService.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/discovery/HashPartitionService.java @@ -24,6 +24,7 @@ import org.springframework.stereotype.Service; import org.thingsboard.server.common.data.exception.TenantNotFoundException; import org.thingsboard.server.common.data.id.EntityId; import org.thingsboard.server.common.data.id.TenantId; +import org.thingsboard.server.common.data.id.TenantProfileId; import org.thingsboard.server.common.msg.queue.ServiceType; import org.thingsboard.server.common.msg.queue.TopicPartitionInfo; import org.thingsboard.server.gen.transport.TransportProtos; @@ -36,6 +37,7 @@ import org.thingsboard.server.queue.util.AfterStartUp; import javax.annotation.PostConstruct; import java.nio.charset.StandardCharsets; import java.util.ArrayList; +import java.util.Collection; import java.util.Collections; import java.util.Comparator; import java.util.HashMap; @@ -48,6 +50,8 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.stream.Collectors; +import static org.thingsboard.server.common.data.DataConstants.MAIN_QUEUE_NAME; + @Service @Slf4j public class HashPartitionService implements PartitionService { @@ -68,15 +72,16 @@ public class HashPartitionService implements PartitionService { private final TenantRoutingInfoService tenantRoutingInfoService; private final QueueRoutingInfoService queueRoutingInfoService; - private volatile ConcurrentMap> myPartitions = new ConcurrentHashMap<>(); + protected volatile ConcurrentMap> myPartitions = new ConcurrentHashMap<>(); private final ConcurrentMap partitionTopicsMap = new ConcurrentHashMap<>(); private final ConcurrentMap partitionSizesMap = new ConcurrentHashMap<>(); private final ConcurrentMap tenantRoutingInfoMap = new ConcurrentHashMap<>(); - private Map> tbTransportServicesByType = new HashMap<>(); private List currentOtherServices; + private final Map> tbTransportServicesByType = new HashMap<>(); + private final Map> responsibleServices = new HashMap<>(); private HashFunction hashFunction; @@ -165,6 +170,9 @@ public class HashPartitionService implements PartitionService { partitionTopicsMap.put(queueKey, queueUpdateMsg.getQueueTopic()); partitionSizesMap.put(queueKey, queueUpdateMsg.getPartitions()); myPartitions.remove(queueKey); + if (!tenantId.isSysTenantId()) { + tenantRoutingInfoMap.remove(tenantId); + } } @Override @@ -178,12 +186,38 @@ public class HashPartitionService implements PartitionService { removeTenant(tenantId); } + @Override + public boolean isManagedByCurrentService(TenantId tenantId) { + Set assignedTenantProfiles = serviceInfoProvider.getAssignedTenantProfiles(); + if (assignedTenantProfiles.isEmpty()) { + // TODO: refactor this for common servers + return true; + } else { + if (tenantId.isSysTenantId()) { + return false; + } + TenantProfileId profileId = tenantRoutingInfoService.getRoutingInfo(tenantId).getProfileId(); + return assignedTenantProfiles.contains(profileId.getId()); + } + } + @Override public TopicPartitionInfo resolve(ServiceType serviceType, String queueName, TenantId tenantId, EntityId entityId) { TenantId isolatedOrSystemTenantId = getIsolatedOrSystemTenantId(serviceType, tenantId); + if (queueName == null) { + queueName = MAIN_QUEUE_NAME; + } QueueKey queueKey = new QueueKey(serviceType, queueName, isolatedOrSystemTenantId); if (!partitionSizesMap.containsKey(queueKey)) { - queueKey = new QueueKey(serviceType, isolatedOrSystemTenantId); + if (isolatedOrSystemTenantId.isSysTenantId()) { + queueKey = new QueueKey(serviceType, TenantId.SYS_TENANT_ID); + } else { + queueKey = new QueueKey(serviceType, queueName, TenantId.SYS_TENANT_ID); + if (!MAIN_QUEUE_NAME.equals(queueName) && !partitionSizesMap.containsKey(queueKey)) { + queueKey = new QueueKey(serviceType, TenantId.SYS_TENANT_ID); + } + log.warn("Using queue {} instead of isolated {} for tenant {}", queueKey, queueName, isolatedOrSystemTenantId); + } } return resolve(queueKey, entityId); } @@ -199,11 +233,12 @@ public class HashPartitionService implements PartitionService { } private TopicPartitionInfo resolve(QueueKey queueKey, EntityId entityId) { - int hash = hashFunction.newHasher() - .putLong(entityId.getId().getMostSignificantBits()) - .putLong(entityId.getId().getLeastSignificantBits()).hash().asInt(); - Integer partitionSize = partitionSizesMap.get(queueKey); + if (partitionSize == null) { + throw new IllegalStateException("Partitions info for queue " + queueKey + " is missing"); + } + + int hash = hash(entityId.getId()); int partition = Math.abs(hash % partitionSize); return buildTopicPartitionInfo(queueKey, partition); @@ -212,6 +247,7 @@ public class HashPartitionService implements PartitionService { @Override public synchronized void recalculatePartitions(ServiceInfo currentService, List otherServices) { tbTransportServicesByType.clear(); + responsibleServices.clear(); logServiceInfo(currentService); otherServices.forEach(this::logServiceInfo); @@ -221,6 +257,7 @@ public class HashPartitionService implements PartitionService { addNode(queueServicesMap, other); } queueServicesMap.values().forEach(list -> list.sort(Comparator.comparing(ServiceInfo::getServiceId))); + responsibleServices.values().forEach(list -> list.sort(Comparator.comparing(ServiceInfo::getServiceId))); final ConcurrentMap> newPartitions = new ConcurrentHashMap<>(); partitionSizesMap.forEach((queueKey, size) -> { @@ -268,6 +305,9 @@ public class HashPartitionService implements PartitionService { changes.addAll(newMap.keySet()); if (!changes.isEmpty()) { applicationEventPublisher.publishEvent(new ClusterTopologyChangeEvent(this, changes)); + responsibleServices.forEach((profileId, serviceInfos) -> { + log.info("Servers responsible for tenant profile {}: {}", profileId, toServiceIds(serviceInfos)); + }); } } @@ -305,9 +345,7 @@ public class HashPartitionService implements PartitionService { @Override public int resolvePartitionIndex(UUID entityId, int partitions) { - int hash = hashFunction.newHasher() - .putLong(entityId.getMostSignificantBits()) - .putLong(entityId.getLeastSignificantBits()).hash().asInt(); + int hash = hash(entityId); return Math.abs(hash % partitions); } @@ -358,16 +396,9 @@ public class HashPartitionService implements PartitionService { if (TenantId.SYS_TENANT_ID.equals(tenantId)) { return false; } - TenantRoutingInfo routingInfo = tenantRoutingInfoMap.get(tenantId); - if (routingInfo == null) { - synchronized (tenantRoutingInfoMap) { - routingInfo = tenantRoutingInfoMap.get(tenantId); - if (routingInfo == null) { - routingInfo = tenantRoutingInfoService.getRoutingInfo(tenantId); - tenantRoutingInfoMap.put(tenantId, routingInfo); - } - } - } + TenantRoutingInfo routingInfo = tenantRoutingInfoMap.computeIfAbsent(tenantId, k -> { + return tenantRoutingInfoService.getRoutingInfo(tenantId); + }); if (routingInfo == null) { throw new TenantNotFoundException(tenantId); } @@ -396,6 +427,19 @@ public class HashPartitionService implements PartitionService { queueServiceList.computeIfAbsent(key, k -> new ArrayList<>()).add(instance); } }); + + if (instance.getAssignedTenantProfilesCount() > 0) { + for (String profileIdStr : instance.getAssignedTenantProfilesList()) { + TenantProfileId profileId; + try { + profileId = new TenantProfileId(UUID.fromString(profileIdStr)); + } catch (IllegalArgumentException e) { + log.warn("Failed to parse '{}' as tenant profile id", profileIdStr); + continue; + } + responsibleServices.computeIfAbsent(profileId, k -> new ArrayList<>()).add(instance); + } + } } else if (ServiceType.TB_CORE.equals(serviceType) || ServiceType.TB_VC_EXECUTOR.equals(serviceType)) { queueServiceList.computeIfAbsent(new QueueKey(serviceType), key -> new ArrayList<>()).add(instance); } @@ -411,18 +455,51 @@ public class HashPartitionService implements PartitionService { return null; } - if (!ServiceType.TB_RULE_ENGINE.equals(queueKey.getType()) || TenantId.SYS_TENANT_ID.equals(queueKey.getTenantId())) { - return servers.get(partition % servers.size()); - } else { - int hash = hashFunction.newHasher().putLong(queueKey.getTenantId().getId().getMostSignificantBits()) - .putLong(queueKey.getTenantId().getId().getLeastSignificantBits()) + TenantId tenantId = queueKey.getTenantId(); + if (queueKey.getType() == ServiceType.TB_RULE_ENGINE) { + if (!responsibleServices.isEmpty()) { // if there are any dedicated servers + TenantProfileId profileId; + if (tenantId != null && !tenantId.isSysTenantId()) { + TenantRoutingInfo routingInfo = tenantRoutingInfoService.getRoutingInfo(tenantId); + profileId = routingInfo.getProfileId(); + } else { + profileId = null; + } + + List responsible = responsibleServices.get(profileId); + if (responsible == null) { + // if there are no dedicated servers for this tenant profile, or for system queues, + // using the servers that are not responsible for any profile + responsible = servers.stream() + .filter(serviceInfo -> serviceInfo.getAssignedTenantProfilesCount() == 0) + .sorted(Comparator.comparing(ServiceInfo::getServiceId)) + .collect(Collectors.toList()); + if (profileId != null) { + log.debug("Using servers {} for profile {}", toServiceIds(responsible), profileId); + } + responsibleServices.put(profileId, responsible); + } + servers = responsible; + } + + int hash = hashFunction.newHasher() + .putLong(tenantId.getId().getMostSignificantBits()) + .putLong(tenantId.getId().getLeastSignificantBits()) .putString(queueKey.getQueueName(), StandardCharsets.UTF_8) .hash().asInt(); - return servers.get(Math.abs((hash + partition) % servers.size())); + } else { + return servers.get(partition % servers.size()); } } + private int hash(UUID key) { + return hashFunction.newHasher() + .putLong(key.getMostSignificantBits()) + .putLong(key.getLeastSignificantBits()) + .hash().asInt(); + } + public static HashFunction forName(String name) { switch (name) { case "murmur3_32": @@ -436,4 +513,8 @@ public class HashPartitionService implements PartitionService { } } + private List toServiceIds(Collection serviceInfos) { + return serviceInfos.stream().map(ServiceInfo::getServiceId).collect(Collectors.toList()); + } + } diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/discovery/PartitionService.java b/common/queue/src/main/java/org/thingsboard/server/queue/discovery/PartitionService.java index faa4d956a8..b55ba79f67 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/discovery/PartitionService.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/discovery/PartitionService.java @@ -64,4 +64,7 @@ public interface PartitionService { void updateQueue(TransportProtos.QueueUpdateMsg queueUpdateMsg); void removeQueue(TransportProtos.QueueDeleteMsg queueDeleteMsg); + + boolean isManagedByCurrentService(TenantId tenantId); + } diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/discovery/TbServiceInfoProvider.java b/common/queue/src/main/java/org/thingsboard/server/queue/discovery/TbServiceInfoProvider.java index e49cbbcfd9..9c7d1630ec 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/discovery/TbServiceInfoProvider.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/discovery/TbServiceInfoProvider.java @@ -18,6 +18,9 @@ package org.thingsboard.server.queue.discovery; import org.thingsboard.server.common.msg.queue.ServiceType; import org.thingsboard.server.gen.transport.TransportProtos.ServiceInfo; +import java.util.Set; +import java.util.UUID; + public interface TbServiceInfoProvider { String getServiceId(); @@ -30,4 +33,6 @@ public interface TbServiceInfoProvider { ServiceInfo generateNewServiceInfoWithCurrentSystemInfo(); + Set getAssignedTenantProfiles(); + } diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/discovery/TenantRoutingInfo.java b/common/queue/src/main/java/org/thingsboard/server/queue/discovery/TenantRoutingInfo.java index c1c0b49dab..8dee68da49 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/discovery/TenantRoutingInfo.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/discovery/TenantRoutingInfo.java @@ -17,9 +17,11 @@ package org.thingsboard.server.queue.discovery; import lombok.Data; import org.thingsboard.server.common.data.id.TenantId; +import org.thingsboard.server.common.data.id.TenantProfileId; @Data public class TenantRoutingInfo { private final TenantId tenantId; + private final TenantProfileId profileId; private final boolean isolatedTbRuleEngine; } diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/discovery/TenantRoutingInfoService.java b/common/queue/src/main/java/org/thingsboard/server/queue/discovery/TenantRoutingInfoService.java index 8dd3ff95e7..e4c0ac8250 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/discovery/TenantRoutingInfoService.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/discovery/TenantRoutingInfoService.java @@ -20,4 +20,5 @@ import org.thingsboard.server.common.data.id.TenantId; public interface TenantRoutingInfoService { TenantRoutingInfo getRoutingInfo(TenantId tenantId); + } diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/kafka/TbKafkaAdmin.java b/common/queue/src/main/java/org/thingsboard/server/queue/kafka/TbKafkaAdmin.java index f15b9258e8..d486d04783 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/kafka/TbKafkaAdmin.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/kafka/TbKafkaAdmin.java @@ -21,6 +21,7 @@ import org.apache.kafka.clients.admin.CreateTopicsResult; import org.apache.kafka.clients.admin.NewTopic; import org.apache.kafka.common.errors.TopicExistsException; import org.thingsboard.server.queue.TbQueueAdmin; +import org.thingsboard.server.queue.util.PropertyUtils; import java.util.Collections; import java.util.Map; @@ -62,12 +63,12 @@ public class TbKafkaAdmin implements TbQueueAdmin { } @Override - public void createTopicIfNotExists(String topic) { + public void createTopicIfNotExists(String topic, String properties) { if (topics.contains(topic)) { return; } try { - NewTopic newTopic = new NewTopic(topic, numPartitions, replicationFactor).configs(topicConfigs); + NewTopic newTopic = new NewTopic(topic, numPartitions, replicationFactor).configs(PropertyUtils.getProps(topicConfigs, properties)); createTopic(newTopic).values().get(topic).get(); topics.add(topic); } catch (ExecutionException ee) { @@ -81,7 +82,6 @@ public class TbKafkaAdmin implements TbQueueAdmin { log.warn("[{}] Failed to create topic", topic, e); throw new RuntimeException(e); } - } @Override diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/kafka/TbKafkaConsumerTemplate.java b/common/queue/src/main/java/org/thingsboard/server/queue/kafka/TbKafkaConsumerTemplate.java index c17a563d46..9f58446966 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/kafka/TbKafkaConsumerTemplate.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/kafka/TbKafkaConsumerTemplate.java @@ -114,7 +114,6 @@ public class TbKafkaConsumerTemplate extends AbstractTbQue @Override protected void doUnsubscribe() { - log.info("unsubscribe topic and close consumer for topic {}", getTopic()); if (consumer != null) { consumer.unsubscribe(); consumer.close(); @@ -123,4 +122,10 @@ public class TbKafkaConsumerTemplate extends AbstractTbQue statsService.unregisterClientGroup(groupId); } } + + @Override + public boolean isLongPollingSupported() { + return true; + } + } diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/kafka/TbKafkaSettings.java b/common/queue/src/main/java/org/thingsboard/server/queue/kafka/TbKafkaSettings.java index 41ae0d5ea3..55f1721c46 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/kafka/TbKafkaSettings.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/kafka/TbKafkaSettings.java @@ -115,6 +115,9 @@ public class TbKafkaSettings { @Value("${queue.kafka.session.timeout.ms:10000}") private int sessionTimeoutMs; + @Value("${queue.kafka.auto_offset_reset:earliest}") + private String autoOffsetReset; + @Value("${queue.kafka.use_confluent_cloud:false}") private boolean useConfluent; @@ -155,6 +158,8 @@ public class TbKafkaSettings { props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, maxPartitionFetchBytes); props.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, fetchMaxBytes); props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, maxPollIntervalMs); + props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset); + props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class); diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/memory/InMemoryTbQueueConsumer.java b/common/queue/src/main/java/org/thingsboard/server/queue/memory/InMemoryTbQueueConsumer.java index 081202315e..8711cbbcf1 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/memory/InMemoryTbQueueConsumer.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/memory/InMemoryTbQueueConsumer.java @@ -31,6 +31,7 @@ public class InMemoryTbQueueConsumer implements TbQueueCon private volatile Set partitions; private volatile boolean stopped; private volatile boolean subscribed; + private volatile boolean queueDeleted; public InMemoryTbQueueConsumer(InMemoryStorage storage, String topic) { this.storage = storage; @@ -103,4 +104,19 @@ public class InMemoryTbQueueConsumer implements TbQueueCon return stopped; } + @Override + public void onQueueDelete() { + queueDeleted = true; + } + + @Override + public boolean isQueueDeleted() { + return queueDeleted; + } + + @Override + public List getFullTopicNames() { + return partitions.stream().map(TopicPartitionInfo::getFullTopicName).collect(Collectors.toList()); + } + } diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/provider/InMemoryTbTransportQueueFactory.java b/common/queue/src/main/java/org/thingsboard/server/queue/provider/InMemoryTbTransportQueueFactory.java index 6b6253acc4..a789816c34 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/provider/InMemoryTbTransportQueueFactory.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/provider/InMemoryTbTransportQueueFactory.java @@ -74,7 +74,7 @@ public class InMemoryTbTransportQueueFactory implements TbTransportQueueFactory templateBuilder.queueAdmin(new TbQueueAdmin() { @Override - public void createTopicIfNotExists(String topic) {} + public void createTopicIfNotExists(String topic, String properties) {} @Override public void destroy() {} diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaMonolithQueueFactory.java b/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaMonolithQueueFactory.java index 364abcff4c..22a16de64f 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaMonolithQueueFactory.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaMonolithQueueFactory.java @@ -187,7 +187,7 @@ public class KafkaMonolithQueueFactory implements TbCoreQueueFactory, TbRuleEngi consumerBuilder.settings(kafkaSettings); consumerBuilder.topic(configuration.getTopic()); consumerBuilder.clientId("re-" + queueName + "-consumer-" + serviceInfoProvider.getServiceId() + "-" + consumerCount.incrementAndGet()); - consumerBuilder.groupId("re-" + queueName + "-consumer"); + consumerBuilder.groupId("re-" + queueName + (configuration.getTenantId().isSysTenantId() ? "" : ("-" + configuration.getTenantId())) + "-consumer"); consumerBuilder.decoder(msg -> new TbProtoQueueMsg<>(msg.getKey(), ToRuleEngineMsg.parseFrom(msg.getData()), msg.getHeaders())); consumerBuilder.admin(ruleEngineAdmin); consumerBuilder.statsService(consumerStatsService); diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaTbRuleEngineQueueFactory.java b/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaTbRuleEngineQueueFactory.java index b8e07a45f7..2e3bf784d7 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaTbRuleEngineQueueFactory.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaTbRuleEngineQueueFactory.java @@ -166,7 +166,7 @@ public class KafkaTbRuleEngineQueueFactory implements TbRuleEngineQueueFactory { consumerBuilder.settings(kafkaSettings); consumerBuilder.topic(configuration.getTopic()); consumerBuilder.clientId("re-" + queueName + "-consumer-" + serviceInfoProvider.getServiceId() + "-" + consumerCount.incrementAndGet()); - consumerBuilder.groupId("re-" + queueName + "-consumer"); + consumerBuilder.groupId("re-" + queueName + (configuration.getTenantId().isSysTenantId() ? "" : ("-" + configuration.getTenantId())) + "-consumer"); consumerBuilder.decoder(msg -> new TbProtoQueueMsg<>(msg.getKey(), ToRuleEngineMsg.parseFrom(msg.getData()), msg.getHeaders())); consumerBuilder.admin(ruleEngineAdmin); consumerBuilder.statsService(consumerStatsService); diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/pubsub/TbPubSubAdmin.java b/common/queue/src/main/java/org/thingsboard/server/queue/pubsub/TbPubSubAdmin.java index d1a4942ad3..f9f20c2448 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/pubsub/TbPubSubAdmin.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/pubsub/TbPubSubAdmin.java @@ -103,7 +103,7 @@ public class TbPubSubAdmin implements TbQueueAdmin { } @Override - public void createTopicIfNotExists(String partition) { + public void createTopicIfNotExists(String partition, String properties) { TopicName topicName = TopicName.newBuilder() .setTopic(partition) .setProject(pubSubSettings.getProjectId()) diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/rabbitmq/TbRabbitMqAdmin.java b/common/queue/src/main/java/org/thingsboard/server/queue/rabbitmq/TbRabbitMqAdmin.java index 00a2ee4c6c..fb646f383a 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/rabbitmq/TbRabbitMqAdmin.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/rabbitmq/TbRabbitMqAdmin.java @@ -18,9 +18,11 @@ package org.thingsboard.server.queue.rabbitmq; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.StringUtils; import org.thingsboard.server.queue.TbQueueAdmin; import java.io.IOException; +import java.util.HashMap; import java.util.Map; import java.util.concurrent.TimeoutException; @@ -50,7 +52,12 @@ public class TbRabbitMqAdmin implements TbQueueAdmin { } @Override - public void createTopicIfNotExists(String topic) { + public void createTopicIfNotExists(String topic, String properties) { + Map arguments = this.arguments; + if (StringUtils.isNotBlank(properties)) { + arguments = new HashMap<>(arguments); + arguments.putAll(TbRabbitMqQueueArguments.getArgs(properties)); + } try { channel.queueDeclare(topic, false, false, false, arguments); } catch (IOException e) { diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/rabbitmq/TbRabbitMqQueueArguments.java b/common/queue/src/main/java/org/thingsboard/server/queue/rabbitmq/TbRabbitMqQueueArguments.java index cb96abdf3c..8fa8c537e6 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/rabbitmq/TbRabbitMqQueueArguments.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/rabbitmq/TbRabbitMqQueueArguments.java @@ -65,7 +65,7 @@ public class TbRabbitMqQueueArguments { vcArgs = getArgs(vcProperties); } - private Map getArgs(String properties) { + public static Map getArgs(String properties) { Map configs = new HashMap<>(); if (StringUtils.isNotEmpty(properties)) { for (String property : properties.split(";")) { @@ -78,7 +78,7 @@ public class TbRabbitMqQueueArguments { return configs; } - private Object getObjectValue(String str) { + private static Object getObjectValue(String str) { if (str.equalsIgnoreCase("true") || str.equalsIgnoreCase("false")) { return Boolean.valueOf(str); } else if (isNumeric(str)) { @@ -87,7 +87,7 @@ public class TbRabbitMqQueueArguments { return str; } - private Object getNumericValue(String str) { + private static Object getNumericValue(String str) { if (str.contains(".")) { return Double.valueOf(str); } else { @@ -97,7 +97,7 @@ public class TbRabbitMqQueueArguments { private static final Pattern PATTERN = Pattern.compile("-?\\d+(\\.\\d+)?"); - public boolean isNumeric(String strNum) { + private static boolean isNumeric(String strNum) { if (strNum == null) { return false; } diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/sqs/TbAwsSqsAdmin.java b/common/queue/src/main/java/org/thingsboard/server/queue/sqs/TbAwsSqsAdmin.java index f88a34941a..ba4eeb6ca4 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/sqs/TbAwsSqsAdmin.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/sqs/TbAwsSqsAdmin.java @@ -26,6 +26,7 @@ import com.amazonaws.services.sqs.model.CreateQueueRequest; import com.amazonaws.services.sqs.model.GetQueueUrlResult; import lombok.extern.slf4j.Slf4j; import org.thingsboard.server.queue.TbQueueAdmin; +import org.thingsboard.server.queue.util.PropertyUtils; import java.util.Map; import java.util.function.Function; @@ -63,11 +64,12 @@ public class TbAwsSqsAdmin implements TbQueueAdmin { } @Override - public void createTopicIfNotExists(String topic) { + public void createTopicIfNotExists(String topic, String properties) { String queueName = convertTopicToQueueName(topic); if (queues.containsKey(queueName)) { return; } + Map attributes = PropertyUtils.getProps(this.attributes, properties, TbAwsSqsQueueAttributes::toConfigs); final CreateQueueRequest createQueueRequest = new CreateQueueRequest(queueName).withAttributes(attributes); String queueUrl = sqsClient.createQueue(createQueueRequest).getQueueUrl(); queues.put(getQueueNameFromUrl(queueUrl), queueUrl); diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/sqs/TbAwsSqsQueueAttributes.java b/common/queue/src/main/java/org/thingsboard/server/queue/sqs/TbAwsSqsQueueAttributes.java index 66110ade74..faa8eccc90 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/sqs/TbAwsSqsQueueAttributes.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/sqs/TbAwsSqsQueueAttributes.java @@ -76,6 +76,12 @@ public class TbAwsSqsQueueAttributes { private Map getConfigs(String properties) { Map configs = new HashMap<>(defaultAttributes); + configs.putAll(toConfigs(properties)); + return configs; + } + + public static Map toConfigs(String properties) { + Map configs = new HashMap<>(); if (StringUtils.isNotEmpty(properties)) { for (String property : properties.split(";")) { int delimiterPosition = property.indexOf(":"); @@ -88,7 +94,7 @@ public class TbAwsSqsQueueAttributes { return configs; } - private void validateAttributeName(String key) { + private static void validateAttributeName(String key) { QueueAttributeName.fromValue(key); } } diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/util/PropertyUtils.java b/common/queue/src/main/java/org/thingsboard/server/queue/util/PropertyUtils.java index 089d7f2219..afee64f382 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/util/PropertyUtils.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/util/PropertyUtils.java @@ -19,6 +19,7 @@ import org.thingsboard.server.common.data.StringUtils; import java.util.HashMap; import java.util.Map; +import java.util.function.Function; public class PropertyUtils { @@ -37,4 +38,17 @@ public class PropertyUtils { return configs; } + public static Map getProps(Map defaultProperties, String propertiesStr) { + return getProps(defaultProperties, propertiesStr, PropertyUtils::getProps); + } + + public static Map getProps(Map defaultProperties, String propertiesStr, Function> parser) { + Map properties = defaultProperties; + if (StringUtils.isNotBlank(propertiesStr)) { + properties = new HashMap<>(properties); + properties.putAll(parser.apply(propertiesStr)); + } + return properties; + } + } diff --git a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/DefaultTransportTenantProfileCache.java b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/DefaultTransportTenantProfileCache.java index d73e445516..12f89f29e6 100644 --- a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/DefaultTransportTenantProfileCache.java +++ b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/DefaultTransportTenantProfileCache.java @@ -82,6 +82,7 @@ public class DefaultTransportTenantProfileCache implements TransportTenantProfil if (profileOpt.isPresent()) { TenantProfile newProfile = profileOpt.get(); log.trace("[{}] put: {}", newProfile.getId(), newProfile); + profiles.put(newProfile.getId(), newProfile); Set affectedTenants = tenantProfileIds.get(newProfile.getId()); return new TenantProfileUpdateResult(newProfile, affectedTenants != null ? affectedTenants : Collections.emptySet()); } else { diff --git a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/TransportTenantRoutingInfoService.java b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/TransportTenantRoutingInfoService.java index c9f126b808..e1192391d5 100644 --- a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/TransportTenantRoutingInfoService.java +++ b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/TransportTenantRoutingInfoService.java @@ -29,7 +29,7 @@ import org.thingsboard.server.queue.discovery.TenantRoutingInfoService; @ConditionalOnExpression("'${service.type:null}'=='tb-transport'") public class TransportTenantRoutingInfoService implements TenantRoutingInfoService { - private TransportTenantProfileCache tenantProfileCache; + private final TransportTenantProfileCache tenantProfileCache; public TransportTenantRoutingInfoService(TransportTenantProfileCache tenantProfileCache) { this.tenantProfileCache = tenantProfileCache; @@ -38,7 +38,7 @@ public class TransportTenantRoutingInfoService implements TenantRoutingInfoServi @Override public TenantRoutingInfo getRoutingInfo(TenantId tenantId) { TenantProfile profile = tenantProfileCache.get(tenantId); - return new TenantRoutingInfo(tenantId, profile.isIsolatedTbRuleEngine()); + return new TenantRoutingInfo(tenantId, profile.getId(), profile.isIsolatedTbRuleEngine()); } } diff --git a/dao/src/main/java/org/thingsboard/server/dao/service/validator/TenantProfileDataValidator.java b/dao/src/main/java/org/thingsboard/server/dao/service/validator/TenantProfileDataValidator.java index 0477ccf32f..aa166ba12e 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/service/validator/TenantProfileDataValidator.java +++ b/dao/src/main/java/org/thingsboard/server/dao/service/validator/TenantProfileDataValidator.java @@ -99,8 +99,6 @@ public class TenantProfileDataValidator extends DataValidator { TenantProfile old = tenantProfileDao.findById(TenantId.SYS_TENANT_ID, tenantProfile.getId().getId()); if (old == null) { throw new DataValidationException("Can't update non existing tenant profile!"); - } else if (old.isIsolatedTbRuleEngine() != tenantProfile.isIsolatedTbRuleEngine()) { - throw new DataValidationException("Can't update isolatedTbRuleEngine property!"); } return old; } diff --git a/dao/src/test/java/org/thingsboard/server/dao/service/TenantProfileServiceTest.java b/dao/src/test/java/org/thingsboard/server/dao/service/TenantProfileServiceTest.java index 51f8aabb57..7b5e016d14 100644 --- a/dao/src/test/java/org/thingsboard/server/dao/service/TenantProfileServiceTest.java +++ b/dao/src/test/java/org/thingsboard/server/dao/service/TenantProfileServiceTest.java @@ -188,17 +188,6 @@ public class TenantProfileServiceTest extends AbstractServiceTest { }); } - @Test - public void testSaveSameTenantProfileWithDifferentIsolatedTbRuleEngine() { - TenantProfile tenantProfile = this.createTenantProfile("Tenant Profile"); - TenantProfile savedTenantProfile = tenantProfileService.saveTenantProfile(TenantId.SYS_TENANT_ID, tenantProfile); - savedTenantProfile.setIsolatedTbRuleEngine(true); - addMainQueueConfig(savedTenantProfile); - Assertions.assertThrows(DataValidationException.class, () -> { - tenantProfileService.saveTenantProfile(TenantId.SYS_TENANT_ID, savedTenantProfile); - }); - } - @Test public void testDeleteTenantProfileWithExistingTenant() { TenantProfile tenantProfile = this.createTenantProfile("Tenant Profile"); diff --git a/msa/vc-executor/src/main/java/org/thingsboard/server/vc/service/VersionControlTenantRoutingInfoService.java b/msa/vc-executor/src/main/java/org/thingsboard/server/vc/service/VersionControlTenantRoutingInfoService.java index fb33ff9931..b343a4791f 100644 --- a/msa/vc-executor/src/main/java/org/thingsboard/server/vc/service/VersionControlTenantRoutingInfoService.java +++ b/msa/vc-executor/src/main/java/org/thingsboard/server/vc/service/VersionControlTenantRoutingInfoService.java @@ -25,6 +25,6 @@ public class VersionControlTenantRoutingInfoService implements TenantRoutingInfo @Override public TenantRoutingInfo getRoutingInfo(TenantId tenantId) { //This dummy implementation is ok since Version Control service does not produce any rule engine messages. - return new TenantRoutingInfo(tenantId, false); + return new TenantRoutingInfo(tenantId, null, false); } } diff --git a/msa/vc-executor/src/main/resources/tb-vc-executor.yml b/msa/vc-executor/src/main/resources/tb-vc-executor.yml index 418184e732..22ad6c60f8 100644 --- a/msa/vc-executor/src/main/resources/tb-vc-executor.yml +++ b/msa/vc-executor/src/main/resources/tb-vc-executor.yml @@ -73,6 +73,7 @@ queue: fetch_max_bytes: "${TB_QUEUE_KAFKA_FETCH_MAX_BYTES:134217728}" request.timeout.ms: "${TB_QUEUE_KAFKA_REQUEST_TIMEOUT_MS:30000}" # (30 seconds) # refer to https://docs.confluent.io/platform/current/installation/configuration/producer-configs.html#producerconfigs_request.timeout.ms session.timeout.ms: "${TB_QUEUE_KAFKA_SESSION_TIMEOUT_MS:10000}" # (10 seconds) # refer to https://docs.confluent.io/platform/current/installation/configuration/consumer-configs.html#consumerconfigs_session.timeout.ms + auto_offset_reset: "${TB_QUEUE_KAFKA_AUTO_OFFSET_RESET:earliest}" # earliest, latest or none use_confluent_cloud: "${TB_QUEUE_KAFKA_USE_CONFLUENT_CLOUD:false}" confluent: ssl.algorithm: "${TB_QUEUE_KAFKA_CONFLUENT_SSL_ALGORITHM:https}" diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/math/TbMathNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/math/TbMathNode.java index fe6deebbcf..753bd36bd4 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/math/TbMathNode.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/math/TbMathNode.java @@ -19,6 +19,8 @@ import com.fasterxml.jackson.databind.node.ObjectNode; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.MoreExecutors; +import lombok.Data; +import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import net.objecthunter.exp4j.Expression; import net.objecthunter.exp4j.ExpressionBuilder; @@ -44,6 +46,8 @@ import java.math.BigDecimal; import java.math.RoundingMode; import java.util.List; import java.util.Optional; +import java.util.Queue; +import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; @@ -81,9 +85,8 @@ import static org.thingsboard.rule.engine.math.TbMathArgumentType.CONSTANT; ) public class TbMathNode implements TbNode { - private static final ConcurrentMap semaphores = new ConcurrentReferenceHashMap<>(); + private static final ConcurrentMap> locks = new ConcurrentReferenceHashMap<>(16, ConcurrentReferenceHashMap.ReferenceType.WEAK); private final ThreadLocal customExpression = new ThreadLocal<>(); - private TbMathNodeConfiguration config; private boolean msgBodyToJsonConversionRequired; @@ -108,42 +111,71 @@ public class TbMathNode implements TbNode { @Override public void onMsg(TbContext ctx, TbMsg msg) { - var originator = msg.getOriginator(); - var originatorSemaphore = semaphores.computeIfAbsent(originator, tmp -> new Semaphore(1, true)); - boolean acquired = tryAcquire(originator, originatorSemaphore); + var semaphoreWithQueue = locks.computeIfAbsent(msg.getOriginator(), SemaphoreWithQueue::new); + semaphoreWithQueue.getQueue().add(new TbMsgTbContext(msg, ctx)); - if (!acquired) { - ctx.tellFailure(msg, new RuntimeException("Failed to process message for originator synchronously")); - return; - } + tryProcessQueue(semaphoreWithQueue); + } - try { - var arguments = config.getArguments(); - Optional msgBodyOpt = convertMsgBodyIfRequired(msg); - var argumentValues = Futures.allAsList(arguments.stream() - .map(arg -> resolveArguments(ctx, msg, msgBodyOpt, arg)).collect(Collectors.toList())); - ListenableFuture resultMsgFuture = Futures.transformAsync(argumentValues, args -> - updateMsgAndDb(ctx, msg, msgBodyOpt, calculateResult(args)), ctx.getDbCallbackExecutor()); - DonAsynchron.withCallback(resultMsgFuture, resultMsg -> { - try { - ctx.tellSuccess(resultMsg); - } finally { - originatorSemaphore.release(); + void tryProcessQueue(SemaphoreWithQueue lockAndQueue) { + final Semaphore semaphore = lockAndQueue.getSemaphore(); + final Queue queue = lockAndQueue.getQueue(); + while (!queue.isEmpty()) { + // The semaphore have to be acquired before EACH poll and released before NEXT poll. + // Otherwise, some message will remain unprocessed in queue + if (!semaphore.tryAcquire()) { + return; + } + TbMsgTbContext tbMsgTbContext = null; + try { + tbMsgTbContext = queue.poll(); + if (tbMsgTbContext == null) { + semaphore.release(); + continue; } - }, t -> { - try { - ctx.tellFailure(msg, t); - } finally { - originatorSemaphore.release(); + final TbMsg msg = tbMsgTbContext.getMsg(); + if (!msg.getCallback().isMsgValid()) { + log.trace("[{}] Skipping non-valid message [{}]", lockAndQueue.getEntityId(), msg); + semaphore.release(); + continue; } - }, ctx.getDbCallbackExecutor()); - } catch (Throwable e) { - originatorSemaphore.release(); - log.warn("[{}] Failed to process message: {}", originator, msg, e); - throw e; + //DO PROCESSING + final TbContext ctx = tbMsgTbContext.getCtx(); + final ListenableFuture resultMsgFuture = processMsgAsync(ctx, msg); + DonAsynchron.withCallback(resultMsgFuture, resultMsg -> { + try { + ctx.tellSuccess(resultMsg); + } finally { + lockAndQueue.getSemaphore().release(); + tryProcessQueue(lockAndQueue); + } + }, t -> { + try { + ctx.tellFailure(msg, t); + } finally { + lockAndQueue.getSemaphore().release(); + tryProcessQueue(lockAndQueue); + } + }, ctx.getDbCallbackExecutor()); + } catch (Throwable e) { + semaphore.release(); + log.warn("[{}] Failed to process message: {}", lockAndQueue.getEntityId(), tbMsgTbContext == null ? null : tbMsgTbContext.getMsg(), e); + throw e; + } + break; //submitted async exact one task. next poll will try on callback } } + ListenableFuture processMsgAsync(TbContext ctx, TbMsg msg) { + var arguments = config.getArguments(); + Optional msgBodyOpt = convertMsgBodyIfRequired(msg); + var argumentValues = Futures.allAsList(arguments.stream() + .map(arg -> resolveArguments(ctx, msg, msgBodyOpt, arg)).collect(Collectors.toList())); + ListenableFuture resultMsgFuture = Futures.transformAsync(argumentValues, args -> + updateMsgAndDb(ctx, msg, msgBodyOpt, calculateResult(args)), ctx.getDbCallbackExecutor()); + return resultMsgFuture; + } + private boolean tryAcquire(EntityId originator, Semaphore originatorSemaphore) { boolean acquired; try { @@ -402,4 +434,20 @@ public class TbMathNode implements TbNode { @Override public void destroy() { } + + @Data + @RequiredArgsConstructor + static public class SemaphoreWithQueue { + final EntityId entityId; + final Semaphore semaphore = new Semaphore(1); + final Queue queue = new ConcurrentLinkedQueue<>(); + } + + @Data + @RequiredArgsConstructor + static public class TbMsgTbContext { + final TbMsg msg; + final TbContext ctx; + } + } diff --git a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/math/TbMathNodeTest.java b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/math/TbMathNodeTest.java index 0a42349433..1e1b8843d5 100644 --- a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/math/TbMathNodeTest.java +++ b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/math/TbMathNodeTest.java @@ -15,20 +15,22 @@ */ package org.thingsboard.rule.engine.math; -import com.datastax.oss.driver.api.core.uuid.Uuids; import com.fasterxml.jackson.databind.node.ObjectNode; import com.google.common.util.concurrent.Futures; import lombok.extern.slf4j.Slf4j; import org.awaitility.Awaitility; -import org.junit.After; import org.junit.Assert; -import org.junit.Before; -import org.junit.Test; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; import org.junit.runner.RunWith; import org.mockito.ArgumentCaptor; import org.mockito.Mock; import org.mockito.Mockito; import org.mockito.junit.MockitoJUnitRunner; +import org.mockito.junit.jupiter.MockitoExtension; +import org.mockito.verification.Timeout; import org.springframework.test.util.ReflectionTestUtils; import org.thingsboard.common.util.AbstractListeningExecutor; import org.thingsboard.common.util.JacksonUtil; @@ -54,24 +56,36 @@ import org.thingsboard.server.dao.timeseries.TimeseriesService; import java.util.Arrays; import java.util.List; import java.util.Optional; +import java.util.UUID; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; +import java.util.stream.IntStream; +import static org.assertj.core.api.Assertions.assertThat; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyDouble; import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.ArgumentMatchers.argThat; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.BDDMockito.willAnswer; import static org.mockito.Mockito.lenient; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.spy; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; @Slf4j -@RunWith(MockitoJUnitRunner.class) +@ExtendWith(MockitoExtension.class) public class TbMathNodeTest { - private EntityId originator = new DeviceId(Uuids.timeBased()); - private TenantId tenantId = TenantId.fromUUID(Uuids.timeBased()); + static final int RULE_DISPATCHER_POOL_SIZE = 2; + static final int DB_CALLBACK_POOL_SIZE = 3; + private final EntityId originator = DeviceId.fromString("ccd71696-0586-422d-940e-755a41ec3b0d"); + private final TenantId tenantId = TenantId.fromUUID(UUID.fromString("e7f46b23-0c7d-42f5-9b06-fc35ab17af8a")); @Mock private TbContext ctx; @@ -81,35 +95,31 @@ public class TbMathNodeTest { private TimeseriesService tsService; @Mock private RuleEngineTelemetryService telemetryService; - private AbstractListeningExecutor dbExecutor; + private AbstractListeningExecutor dbCallbackExecutor; + private AbstractListeningExecutor ruleEngineDispatcherExecutor; - @Before + @BeforeEach public void before() { - dbExecutor = new AbstractListeningExecutor() { - @Override - protected int getThreadPollSize() { - return 3; - } - }; - dbExecutor.init(); - initMocks(); + dbCallbackExecutor = new DBCallbackExecutor(); + dbCallbackExecutor.init(); + ruleEngineDispatcherExecutor = new RuleDispatcherExecutor(); + ruleEngineDispatcherExecutor.init(); + + lenient().when(ctx.getAttributesService()).thenReturn(attributesService); + lenient().when(ctx.getTelemetryService()).thenReturn(telemetryService); + lenient().when(ctx.getTimeseriesService()).thenReturn(tsService); + lenient().when(ctx.getTenantId()).thenReturn(tenantId); + lenient().when(ctx.getDbCallbackExecutor()).thenReturn(dbCallbackExecutor); } - @After + @AfterEach public void after() { - dbExecutor.destroy(); + ruleEngineDispatcherExecutor.executor().shutdownNow(); + dbCallbackExecutor.executor().shutdownNow(); } private void initMocks() { - Mockito.reset(ctx); - Mockito.reset(attributesService); - Mockito.reset(tsService); - Mockito.reset(telemetryService); - lenient().when(ctx.getAttributesService()).thenReturn(attributesService); - lenient().when(ctx.getTelemetryService()).thenReturn(telemetryService); - lenient().when(ctx.getTimeseriesService()).thenReturn(tsService); - lenient().when(ctx.getTenantId()).thenReturn(tenantId); - lenient().when(ctx.getDbCallbackExecutor()).thenReturn(dbExecutor); + Mockito.clearInvocations(ctx, attributesService, tsService, telemetryService); } private TbMathNode initNode(TbRuleNodeMathFunctionType operation, TbMathResult result, TbMathArgument... arguments) { @@ -154,10 +164,8 @@ public class TbMathNodeTest { node.onMsg(ctx, msg); - ConcurrentMap semaphores = (ConcurrentMap) ReflectionTestUtils.getField(node, "semaphores"); + ConcurrentMap> semaphores = (ConcurrentMap>) ReflectionTestUtils.getField(node, "locks"); Assert.assertNotNull(semaphores); - Semaphore originatorSemaphore = semaphores.get(originator); - Assert.assertNotNull(originatorSemaphore); metaData.putValue("key1", "secondMsgResult"); metaData.putValue("key2", "argumentC"); @@ -167,7 +175,7 @@ public class TbMathNodeTest { node.onMsg(ctx, msg); - Awaitility.await("Semaphore released").atMost(5, TimeUnit.SECONDS).until(semaphores.get(originator)::tryAcquire); + Awaitility.await("Semaphore released").atMost(5, TimeUnit.SECONDS).until(() -> semaphores.get(originator).semaphore.tryAcquire()); ArgumentCaptor msgCaptor = ArgumentCaptor.forClass(TbMsg.class); Mockito.verify(ctx, Mockito.times(2)).tellSuccess(msgCaptor.capture()); @@ -534,4 +542,87 @@ public class TbMathNodeTest { }); Assert.assertNotNull(thrown.getMessage()); } + + @Test + public void testExp4j_concurrent() { + TbMathNode node = spy(initNodeWithCustomFunction("2a+3b", + new TbMathResult(TbMathArgumentType.MESSAGE_BODY, "result", 2, false, false, null), + new TbMathArgument(TbMathArgumentType.MESSAGE_BODY, "a"), + new TbMathArgument(TbMathArgumentType.MESSAGE_BODY, "b") + )); + EntityId originatorSlow = DeviceId.fromString("7f01170d-6bba-419c-b95c-2b4c3ba32f30"); + EntityId originatorFast = DeviceId.fromString("c45360ff-7906-4102-a2ae-3495a86168d0"); + CountDownLatch slowProcessingLatch = new CountDownLatch(1); + + List slowMsgList = IntStream.range(0, 5) + .mapToObj(x -> TbMsg.newMsg("TEST", originatorSlow, new TbMsgMetaData(), JacksonUtil.newObjectNode().put("a", 2).put("b", 2).toString())) + .collect(Collectors.toList()); + List fastMsgList = IntStream.range(0, 2) + .mapToObj(x -> TbMsg.newMsg("TEST", originatorFast, new TbMsgMetaData(), JacksonUtil.newObjectNode().put("a", 2).put("b", 2).toString())) + .collect(Collectors.toList()); + + assertThat(slowMsgList.size()).as("slow msgs >= rule-dispatcher pool size").isGreaterThanOrEqualTo(RULE_DISPATCHER_POOL_SIZE); + + log.debug("rule-dispatcher [{}], db-callback [{}], slowMsg [{}], fastMsg [{}]", RULE_DISPATCHER_POOL_SIZE, DB_CALLBACK_POOL_SIZE, slowMsgList.size(), fastMsgList.size()); + + willAnswer(invocation -> { + TbMsg msg = invocation.getArgument(1); + log.debug("\uD83D\uDC0C processMsgAsync slow originator [{}][{}]", msg.getOriginator(), msg); + try { + assertThat(slowProcessingLatch.await(30, TimeUnit.SECONDS)).as("await on slowProcessingLatch").isTrue(); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + return invocation.callRealMethod(); + }).given(node).processMsgAsync(eq(ctx), argThat(slowMsgList::contains)); + + willAnswer(invocation -> { + TbMsg msg = invocation.getArgument(1); + log.debug("\u26A1\uFE0F processMsgAsync FAST originator [{}][{}]", msg.getOriginator(), msg); + return invocation.callRealMethod(); + }).given(node).processMsgAsync(eq(ctx), argThat(fastMsgList::contains)); + + willAnswer(invocation -> { + TbMsg msg = invocation.getArgument(1); + log.debug("submit slow originator onMsg [{}][{}]", msg.getOriginator(), msg); + return invocation.callRealMethod(); + }).given(node).onMsg(eq(ctx), argThat(slowMsgList::contains)); + + willAnswer(invocation -> { + TbMsg msg = invocation.getArgument(1); + log.debug("submit FAST originator onMsg [{}][{}]", msg.getOriginator(), msg); + return invocation.callRealMethod(); + }).given(node).onMsg(eq(ctx), argThat(fastMsgList::contains)); + + // submit slow msg may block all rule engine dispatcher threads + slowMsgList.forEach(msg -> ruleEngineDispatcherExecutor.executeAsync(() -> node.onMsg(ctx, msg))); + // wait until dispatcher threads started with all slowMsg + verify(node, new Timeout(TimeUnit.SECONDS.toMillis(5), times(slowMsgList.size()))).onMsg(eq(ctx), argThat(slowMsgList::contains)); + + // submit fast have to return immediately + fastMsgList.forEach(msg -> ruleEngineDispatcherExecutor.executeAsync(() -> node.onMsg(ctx, msg))); + // wait until all fast messages processed + verify(ctx, new Timeout(TimeUnit.SECONDS.toMillis(5), times(fastMsgList.size()))).tellSuccess(any()); + + slowProcessingLatch.countDown(); + + verify(ctx, new Timeout(TimeUnit.SECONDS.toMillis(5), times(fastMsgList.size() + slowMsgList.size()))).tellSuccess(any()); + + verify(ctx, never()).tellFailure(any(), any()); + } + + static class RuleDispatcherExecutor extends AbstractListeningExecutor { + @Override + protected int getThreadPollSize() { + return RULE_DISPATCHER_POOL_SIZE; + } + } + + static class DBCallbackExecutor extends AbstractListeningExecutor { + @Override + protected int getThreadPollSize() { + return DB_CALLBACK_POOL_SIZE; + } + } + } diff --git a/transport/coap/src/main/resources/tb-coap-transport.yml b/transport/coap/src/main/resources/tb-coap-transport.yml index bbded8aec9..082b6c0536 100644 --- a/transport/coap/src/main/resources/tb-coap-transport.yml +++ b/transport/coap/src/main/resources/tb-coap-transport.yml @@ -184,6 +184,7 @@ queue: fetch_max_bytes: "${TB_QUEUE_KAFKA_FETCH_MAX_BYTES:134217728}" request.timeout.ms: "${TB_QUEUE_KAFKA_REQUEST_TIMEOUT_MS:30000}" # (30 seconds) # refer to https://docs.confluent.io/platform/current/installation/configuration/producer-configs.html#producerconfigs_request.timeout.ms session.timeout.ms: "${TB_QUEUE_KAFKA_SESSION_TIMEOUT_MS:10000}" # (10 seconds) # refer to https://docs.confluent.io/platform/current/installation/configuration/consumer-configs.html#consumerconfigs_session.timeout.ms + auto_offset_reset: "${TB_QUEUE_KAFKA_AUTO_OFFSET_RESET:earliest}" # earliest, latest or none use_confluent_cloud: "${TB_QUEUE_KAFKA_USE_CONFLUENT_CLOUD:false}" confluent: ssl.algorithm: "${TB_QUEUE_KAFKA_CONFLUENT_SSL_ALGORITHM:https}" diff --git a/transport/http/src/main/resources/tb-http-transport.yml b/transport/http/src/main/resources/tb-http-transport.yml index 396d95e63c..6c5ddb9c52 100644 --- a/transport/http/src/main/resources/tb-http-transport.yml +++ b/transport/http/src/main/resources/tb-http-transport.yml @@ -169,6 +169,7 @@ queue: fetch_max_bytes: "${TB_QUEUE_KAFKA_FETCH_MAX_BYTES:134217728}" request.timeout.ms: "${TB_QUEUE_KAFKA_REQUEST_TIMEOUT_MS:30000}" # (30 seconds) # refer to https://docs.confluent.io/platform/current/installation/configuration/producer-configs.html#producerconfigs_request.timeout.ms session.timeout.ms: "${TB_QUEUE_KAFKA_SESSION_TIMEOUT_MS:10000}" # (10 seconds) # refer to https://docs.confluent.io/platform/current/installation/configuration/consumer-configs.html#consumerconfigs_session.timeout.ms + auto_offset_reset: "${TB_QUEUE_KAFKA_AUTO_OFFSET_RESET:earliest}" # earliest, latest or none use_confluent_cloud: "${TB_QUEUE_KAFKA_USE_CONFLUENT_CLOUD:false}" confluent: ssl.algorithm: "${TB_QUEUE_KAFKA_CONFLUENT_SSL_ALGORITHM:https}" diff --git a/transport/lwm2m/src/main/resources/tb-lwm2m-transport.yml b/transport/lwm2m/src/main/resources/tb-lwm2m-transport.yml index 7c0cd950ff..4c0a33929a 100644 --- a/transport/lwm2m/src/main/resources/tb-lwm2m-transport.yml +++ b/transport/lwm2m/src/main/resources/tb-lwm2m-transport.yml @@ -250,6 +250,7 @@ queue: fetch_max_bytes: "${TB_QUEUE_KAFKA_FETCH_MAX_BYTES:134217728}" request.timeout.ms: "${TB_QUEUE_KAFKA_REQUEST_TIMEOUT_MS:30000}" # (30 seconds) # refer to https://docs.confluent.io/platform/current/installation/configuration/producer-configs.html#producerconfigs_request.timeout.ms session.timeout.ms: "${TB_QUEUE_KAFKA_SESSION_TIMEOUT_MS:10000}" # (10 seconds) # refer to https://docs.confluent.io/platform/current/installation/configuration/consumer-configs.html#consumerconfigs_session.timeout.ms + auto_offset_reset: "${TB_QUEUE_KAFKA_AUTO_OFFSET_RESET:earliest}" # earliest, latest or none use_confluent_cloud: "${TB_QUEUE_KAFKA_USE_CONFLUENT_CLOUD:false}" confluent: ssl.algorithm: "${TB_QUEUE_KAFKA_CONFLUENT_SSL_ALGORITHM:https}" diff --git a/transport/mqtt/src/main/resources/tb-mqtt-transport.yml b/transport/mqtt/src/main/resources/tb-mqtt-transport.yml index e6f2b0af60..67986e4bfc 100644 --- a/transport/mqtt/src/main/resources/tb-mqtt-transport.yml +++ b/transport/mqtt/src/main/resources/tb-mqtt-transport.yml @@ -199,6 +199,7 @@ queue: fetch_max_bytes: "${TB_QUEUE_KAFKA_FETCH_MAX_BYTES:134217728}" request.timeout.ms: "${TB_QUEUE_KAFKA_REQUEST_TIMEOUT_MS:30000}" # (30 seconds) # refer to https://docs.confluent.io/platform/current/installation/configuration/producer-configs.html#producerconfigs_request.timeout.ms session.timeout.ms: "${TB_QUEUE_KAFKA_SESSION_TIMEOUT_MS:10000}" # (10 seconds) # refer to https://docs.confluent.io/platform/current/installation/configuration/consumer-configs.html#consumerconfigs_session.timeout.ms + auto_offset_reset: "${TB_QUEUE_KAFKA_AUTO_OFFSET_RESET:earliest}" # earliest, latest or none use_confluent_cloud: "${TB_QUEUE_KAFKA_USE_CONFLUENT_CLOUD:false}" confluent: ssl.algorithm: "${TB_QUEUE_KAFKA_CONFLUENT_SSL_ALGORITHM:https}" diff --git a/transport/snmp/src/main/resources/tb-snmp-transport.yml b/transport/snmp/src/main/resources/tb-snmp-transport.yml index 3fc11bbdfe..84fdf5ddbb 100644 --- a/transport/snmp/src/main/resources/tb-snmp-transport.yml +++ b/transport/snmp/src/main/resources/tb-snmp-transport.yml @@ -145,6 +145,7 @@ queue: fetch_max_bytes: "${TB_QUEUE_KAFKA_FETCH_MAX_BYTES:134217728}" request.timeout.ms: "${TB_QUEUE_KAFKA_REQUEST_TIMEOUT_MS:30000}" # (30 seconds) # refer to https://docs.confluent.io/platform/current/installation/configuration/producer-configs.html#producerconfigs_request.timeout.ms session.timeout.ms: "${TB_QUEUE_KAFKA_SESSION_TIMEOUT_MS:10000}" # (10 seconds) # refer to https://docs.confluent.io/platform/current/installation/configuration/consumer-configs.html#consumerconfigs_session.timeout.ms + auto_offset_reset: "${TB_QUEUE_KAFKA_AUTO_OFFSET_RESET:earliest}" # earliest, latest or none use_confluent_cloud: "${TB_QUEUE_KAFKA_USE_CONFLUENT_CLOUD:false}" confluent: ssl.algorithm: "${TB_QUEUE_KAFKA_CONFLUENT_SSL_ALGORITHM:https}" diff --git a/ui-ngx/src/app/modules/home/components/profile/queue/tenant-profile-queues.component.ts b/ui-ngx/src/app/modules/home/components/profile/queue/tenant-profile-queues.component.ts index ac128b3ec5..298b4fe575 100644 --- a/ui-ngx/src/app/modules/home/components/profile/queue/tenant-profile-queues.component.ts +++ b/ui-ngx/src/app/modules/home/components/profile/queue/tenant-profile-queues.component.ts @@ -173,7 +173,8 @@ export class TenantProfileQueuesComponent implements ControlValueAccessor, Valid }, topic: '', additionalInfo: { - description: '' + description: '', + customProperties: '' } }; this.idMap.push(queue.id); diff --git a/ui-ngx/src/app/modules/home/components/profile/tenant-profile.component.ts b/ui-ngx/src/app/modules/home/components/profile/tenant-profile.component.ts index 0257fb35af..26d6611a3c 100644 --- a/ui-ngx/src/app/modules/home/components/profile/tenant-profile.component.ts +++ b/ui-ngx/src/app/modules/home/components/profile/tenant-profile.component.ts @@ -58,9 +58,9 @@ export class TenantProfileComponent extends EntityComponent { id: guid(), consumerPerPartition: true, name: 'Main', - packProcessingTimeout: 2000, - partitions: 10, - pollInterval: 25, + packProcessingTimeout: 10000, + partitions: 1, + pollInterval: 2000, processingStrategy: { failurePercentage: 0, maxPauseBetweenRetries: 3, @@ -74,7 +74,56 @@ export class TenantProfileComponent extends EntityComponent { }, topic: 'tb_rule_engine.main', additionalInfo: { - description: '' + description: '', + customProperties: '' + } + }, + { + id: guid(), + name: 'HighPriority', + topic: 'tb_rule_engine.hp', + pollInterval: 2000, + partitions: 1, + consumerPerPartition: true, + packProcessingTimeout: 10000, + submitStrategy: { + type: 'BURST', + batchSize: 100 + }, + processingStrategy: { + type: 'RETRY_FAILED_AND_TIMED_OUT', + retries: 0, + failurePercentage: 0, + pauseBetweenRetries: 5, + maxPauseBetweenRetries: 5 + }, + additionalInfo: { + description: '', + customProperties: '' + } + }, + { + id: guid(), + name: 'SequentialByOriginator', + topic: 'tb_rule_engine.sq', + pollInterval: 2000, + partitions: 1, + consumerPerPartition: true, + packProcessingTimeout: 10000, + submitStrategy: { + type: 'SEQUENTIAL_BY_ORIGINATOR', + batchSize: 100 + }, + processingStrategy: { + type: 'RETRY_FAILED_AND_TIMED_OUT', + retries: 3, + failurePercentage: 0, + pauseBetweenRetries: 5, + maxPauseBetweenRetries: 5 + }, + additionalInfo: { + description: '', + customProperties: '' } } ]; @@ -118,9 +167,6 @@ export class TenantProfileComponent extends EntityComponent { if (this.entityForm) { if (this.isEditValue) { this.entityForm.enable({emitEvent: false}); - if (!this.isAdd) { - this.entityForm.get('isolatedTbRuleEngine').disable({emitEvent: false}); - } } else { this.entityForm.disable({emitEvent: false}); } diff --git a/ui-ngx/src/app/modules/home/components/queue/queue-form.component.html b/ui-ngx/src/app/modules/home/components/queue/queue-form.component.html index 56b20d8aa8..4845f7f25a 100644 --- a/ui-ngx/src/app/modules/home/components/queue/queue-form.component.html +++ b/ui-ngx/src/app/modules/home/components/queue/queue-form.component.html @@ -203,6 +203,11 @@ + + queue.custom-properties + + queue.custom-properties-hint + queue.description diff --git a/ui-ngx/src/app/modules/home/components/queue/queue-form.component.ts b/ui-ngx/src/app/modules/home/components/queue/queue-form.component.ts index b123cf2ad5..e4fbcd031b 100644 --- a/ui-ngx/src/app/modules/home/components/queue/queue-form.component.ts +++ b/ui-ngx/src/app/modules/home/components/queue/queue-form.component.ts @@ -117,7 +117,8 @@ export class QueueFormComponent implements ControlValueAccessor, OnInit, OnDestr }), topic: [''], additionalInfo: this.fb.group({ - description: [''] + description: [''], + customProperties: [''] }) }); this.valueChange$ = this.queueFormGroup.valueChanges.subscribe(() => { diff --git a/ui-ngx/src/app/shared/models/queue.models.ts b/ui-ngx/src/app/shared/models/queue.models.ts index 76ee0bf022..07a57e68c2 100644 --- a/ui-ngx/src/app/shared/models/queue.models.ts +++ b/ui-ngx/src/app/shared/models/queue.models.ts @@ -121,5 +121,6 @@ export interface QueueInfo extends BaseData { topic: string; additionalInfo: { description?: string; + customProperties?: string; }; } diff --git a/ui-ngx/src/assets/locale/locale.constant-en_US.json b/ui-ngx/src/assets/locale/locale.constant-en_US.json index d161526958..8f2f338800 100644 --- a/ui-ngx/src/assets/locale/locale.constant-en_US.json +++ b/ui-ngx/src/assets/locale/locale.constant-en_US.json @@ -3612,6 +3612,8 @@ "description": "Description", "description-hint": "This text will be displayed in the Queue description instead of the selected strategy", "alt-description": "Submit Strategy: {{submitStrategy}}, Processing Strategy: {{processingStrategy}}", + "custom-properties": "Custom properties", + "custom-properties-hint": "Custom queue (topic) creation properties, e.g. 'retention.ms:604800000;retention.bytes:1048576000'", "strategies": { "sequential-by-originator-label": "Sequential by originator", "sequential-by-originator-hint": "New message for e.g. device A is not submitted until previous message for device A is acknowledged", @@ -3679,8 +3681,8 @@ "tenant-required": "Tenant is required", "search": "Search tenants", "selected-tenants": "{ count, plural, =1 {1 tenant} other {# tenants} } selected", - "isolated-tb-rule-engine": "Processing in isolated ThingsBoard Rule Engine container", - "isolated-tb-rule-engine-details": "Requires separate microservice(s) per isolated Tenant" + "isolated-tb-rule-engine": "Use isolated ThingsBoard Rule Engine queues", + "isolated-tb-rule-engine-details": "Each tenant will have dedicated Rule Engine queues" }, "tenant-profile": { "tenant-profile": "Tenant profile",