From bf0cc22223b94fb0b73c55eba03a0237ca16eae4 Mon Sep 17 00:00:00 2001 From: ViacheslavKlimov Date: Fri, 19 Jan 2024 17:33:04 +0200 Subject: [PATCH 1/8] Init all actors for isolated rule engines --- .../server/actors/app/AppActor.java | 3 +-- .../server/actors/tenant/TenantActor.java | 20 +++++++++---------- 2 files changed, 10 insertions(+), 13 deletions(-) diff --git a/application/src/main/java/org/thingsboard/server/actors/app/AppActor.java b/application/src/main/java/org/thingsboard/server/actors/app/AppActor.java index 98e0bf7c25..fcd6d7a1f9 100644 --- a/application/src/main/java/org/thingsboard/server/actors/app/AppActor.java +++ b/application/src/main/java/org/thingsboard/server/actors/app/AppActor.java @@ -202,8 +202,7 @@ public class AppActor extends ContextAwareActor { return Optional.ofNullable(ctx.getOrCreateChildActor(new TbEntityActorId(tenantId), () -> DefaultActorService.TENANT_DISPATCHER_NAME, () -> new TenantActor.ActorCreator(systemContext, tenantId), - () -> systemContext.getServiceInfoProvider().isService(ServiceType.TB_CORE) || - systemContext.getPartitionService().isManagedByCurrentService(tenantId))); + () -> true)); } private void onToEdgeSessionMsg(EdgeSessionMsg msg) { diff --git a/application/src/main/java/org/thingsboard/server/actors/tenant/TenantActor.java b/application/src/main/java/org/thingsboard/server/actors/tenant/TenantActor.java index 01b01a7d4a..04c0f1c9e9 100644 --- a/application/src/main/java/org/thingsboard/server/actors/tenant/TenantActor.java +++ b/application/src/main/java/org/thingsboard/server/actors/tenant/TenantActor.java @@ -90,18 +90,16 @@ public class TenantActor extends RuleChainManagerActor { isCore = systemContext.getServiceInfoProvider().isService(ServiceType.TB_CORE); isRuleEngine = systemContext.getServiceInfoProvider().isService(ServiceType.TB_RULE_ENGINE); if (isRuleEngine) { - if (systemContext.getPartitionService().isManagedByCurrentService(tenantId)) { - try { - if (getApiUsageState().isReExecEnabled()) { - log.debug("[{}] Going to init rule chains", tenantId); - initRuleChains(); - } else { - log.info("[{}] Skip init of the rule chains due to API limits", tenantId); - } - } catch (Exception e) { - log.info("Failed to check ApiUsage \"ReExecEnabled\"!!!", e); - cantFindTenant = true; + try { + if (getApiUsageState().isReExecEnabled()) { + log.debug("[{}] Going to init rule chains", tenantId); + initRuleChains(); + } else { + log.info("[{}] Skip init of the rule chains due to API limits", tenantId); } + } catch (Exception e) { + log.info("Failed to check ApiUsage \"ReExecEnabled\"!!!", e); + cantFindTenant = true; } } log.debug("[{}] Tenant actor started.", tenantId); From a571153b7cf0826302030c0e6d50f7d52b8c375f Mon Sep 17 00:00:00 2001 From: ViacheslavKlimov Date: Wed, 24 Jan 2024 18:08:01 +0200 Subject: [PATCH 2/8] Single update message for multiple queues --- .../entitiy/queue/DefaultTbQueueService.java | 115 ++++++++---------- .../queue/DefaultTbClusterService.java | 66 +++++----- .../queue/DefaultTbCoreConsumerService.java | 10 +- .../DefaultTbRuleEngineConsumerService.java | 71 ++++++----- .../service/queue/TbCoreConsumerStats.java | 4 +- .../discovery/HashPartitionServiceTest.java | 6 +- .../queue/DefaultTbClusterServiceTest.java | 9 +- .../server/queue/TbQueueClusterService.java | 8 +- common/proto/src/main/proto/queue.proto | 12 +- .../queue/discovery/HashPartitionService.java | 41 ++++--- .../queue/discovery/PartitionService.java | 4 +- .../service/DefaultTransportService.java | 13 +- .../server/dao/queue/BaseQueueService.java | 3 - 13 files changed, 184 insertions(+), 178 deletions(-) diff --git a/application/src/main/java/org/thingsboard/server/service/entitiy/queue/DefaultTbQueueService.java b/application/src/main/java/org/thingsboard/server/service/entitiy/queue/DefaultTbQueueService.java index b0ef8f4cde..e13342eee0 100644 --- a/application/src/main/java/org/thingsboard/server/service/entitiy/queue/DefaultTbQueueService.java +++ b/application/src/main/java/org/thingsboard/server/service/entitiy/queue/DefaultTbQueueService.java @@ -50,22 +50,15 @@ public class DefaultTbQueueService extends AbstractTbEntityService implements Tb public Queue saveQueue(Queue queue) { boolean create = queue.getId() == null; Queue oldQueue; - if (create) { oldQueue = null; } else { oldQueue = queueService.findQueueById(queue.getTenantId(), queue.getId()); } - //TODO: add checkNotNull Queue savedQueue = queueService.saveQueue(queue); - - if (create) { - onQueueCreated(savedQueue); - } else { - onQueueUpdated(savedQueue, oldQueue); - } - + createTopicsIfNeeded(savedQueue, oldQueue); + tbClusterService.onQueuesUpdate(List.of(savedQueue)); return savedQueue; } @@ -73,54 +66,14 @@ public class DefaultTbQueueService extends AbstractTbEntityService implements Tb public void deleteQueue(TenantId tenantId, QueueId queueId) { Queue queue = queueService.findQueueById(tenantId, queueId); queueService.deleteQueue(tenantId, queueId); - onQueueDeleted(queue); + tbClusterService.onQueuesDelete(List.of(queue)); } @Override public void deleteQueueByQueueName(TenantId tenantId, String queueName) { Queue queue = queueService.findQueueByTenantIdAndNameInternal(tenantId, queueName); queueService.deleteQueue(tenantId, queue.getId()); - onQueueDeleted(queue); - } - - private void onQueueCreated(Queue queue) { - for (int i = 0; i < queue.getPartitions(); i++) { - tbQueueAdmin.createTopicIfNotExists( - new TopicPartitionInfo(queue.getTopic(), queue.getTenantId(), i, false).getFullTopicName(), - queue.getCustomProperties() - ); - } - - tbClusterService.onQueueChange(queue); - } - - private void onQueueUpdated(Queue queue, Queue oldQueue) { - int oldPartitions = oldQueue.getPartitions(); - int currentPartitions = queue.getPartitions(); - - if (currentPartitions != oldPartitions) { - if (currentPartitions > oldPartitions) { - log.info("Added [{}] new partitions to [{}] queue", currentPartitions - oldPartitions, queue.getName()); - for (int i = oldPartitions; i < currentPartitions; i++) { - tbQueueAdmin.createTopicIfNotExists( - new TopicPartitionInfo(queue.getTopic(), queue.getTenantId(), i, false).getFullTopicName(), - queue.getCustomProperties() - ); - } - tbClusterService.onQueueChange(queue); - } else { - log.info("Removed [{}] partitions from [{}] queue", oldPartitions - currentPartitions, queue.getName()); - tbClusterService.onQueueChange(queue); - // TODO: move all the messages left in old partitions and delete topics - } - } else if (!oldQueue.equals(queue)) { - tbClusterService.onQueueChange(queue); - } - } - - private void onQueueDeleted(Queue queue) { - tbClusterService.onQueueDelete(queue); -// queueStatsService.deleteQueueStatsByQueueId(tenantId, queueId); + tbClusterService.onQueuesDelete(List.of(queue)); } @Override @@ -176,26 +129,56 @@ public class DefaultTbQueueService extends AbstractTbEntityService implements Tb log.debug("[{}] Handling profile queue config update: creating queues {}, updating {}, deleting {}. Affected tenants: {}", newTenantProfile.getUuidId(), toCreate, toUpdate, toRemove, tenantIds); } - tenantIds.forEach(tenantId -> { - toCreate.forEach(key -> saveQueue(new Queue(tenantId, newQueues.get(key)))); - toUpdate.forEach(key -> { - Queue queueToUpdate = new Queue(tenantId, newQueues.get(key)); - Queue foundQueue = queueService.findQueueByTenantIdAndName(tenantId, key); - queueToUpdate.setId(foundQueue.getId()); - queueToUpdate.setCreatedTime(foundQueue.getCreatedTime()); + List updated = new ArrayList<>(); + List deleted = new ArrayList<>(); + for (TenantId tenantId : tenantIds) { + for (String name : toCreate) { + updated.add(new Queue(tenantId, newQueues.get(name))); + } - if (!queueToUpdate.equals(foundQueue)) { - saveQueue(queueToUpdate); + for (String name : toUpdate) { + Queue queue = new Queue(tenantId, newQueues.get(name)); + Queue foundQueue = queueService.findQueueByTenantIdAndName(tenantId, name); + if (foundQueue != null) { + queue.setId(foundQueue.getId()); + queue.setCreatedTime(foundQueue.getCreatedTime()); } - }); + if (!queue.equals(foundQueue)) { + updated.add(queue); + createTopicsIfNeeded(queue, foundQueue); + } + } + + for (String name : toRemove) { + Queue queue = queueService.findQueueByTenantIdAndNameInternal(tenantId, name); + deleted.add(queue); + } + } - toRemove.forEach(q -> { - Queue queue = queueService.findQueueByTenantIdAndNameInternal(tenantId, q); - QueueId queueIdForRemove = queue.getId(); - deleteQueue(tenantId, queueIdForRemove); + if (!updated.isEmpty()) { + updated = updated.stream() + .map(queueService::saveQueue) + .collect(Collectors.toList()); + tbClusterService.onQueuesUpdate(updated); + } + if (!deleted.isEmpty()) { + deleted.forEach(queue -> { + queueService.deleteQueue(queue.getTenantId(), queue.getId()); }); - }); + tbClusterService.onQueuesDelete(deleted); + } + } + + private void createTopicsIfNeeded(Queue queue, Queue oldQueue) { + int newPartitions = queue.getPartitions(); + int oldPartitions = oldQueue != null ? oldQueue.getPartitions() : 0; + for (int i = oldPartitions; i < newPartitions; i++) { + tbQueueAdmin.createTopicIfNotExists( + new TopicPartitionInfo(queue.getTopic(), queue.getTenantId(), i, false).getFullTopicName(), + queue.getCustomProperties() + ); + } } } diff --git a/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbClusterService.java b/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbClusterService.java index 14930912d1..1f27f06502 100644 --- a/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbClusterService.java +++ b/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbClusterService.java @@ -59,6 +59,8 @@ import org.thingsboard.server.common.msg.rule.engine.DeviceEdgeUpdateMsg; import org.thingsboard.server.common.msg.rule.engine.DeviceNameOrTypeUpdateMsg; import org.thingsboard.server.gen.transport.TransportProtos; import org.thingsboard.server.gen.transport.TransportProtos.FromDeviceRPCResponseProto; +import org.thingsboard.server.gen.transport.TransportProtos.QueueDeleteMsg; +import org.thingsboard.server.gen.transport.TransportProtos.QueueUpdateMsg; import org.thingsboard.server.gen.transport.TransportProtos.ToCoreMsg; import org.thingsboard.server.gen.transport.TransportProtos.ToCoreNotificationMsg; import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineMsg; @@ -68,8 +70,8 @@ import org.thingsboard.server.queue.TbQueueCallback; import org.thingsboard.server.queue.TbQueueProducer; import org.thingsboard.server.queue.common.MultipleTbQueueCallbackWrapper; import org.thingsboard.server.queue.common.TbProtoQueueMsg; -import org.thingsboard.server.queue.discovery.TopicService; import org.thingsboard.server.queue.discovery.PartitionService; +import org.thingsboard.server.queue.discovery.TopicService; import org.thingsboard.server.queue.provider.TbQueueProducerProvider; import org.thingsboard.server.queue.util.DataDecodingEncodingService; import org.thingsboard.server.service.gateway_device.GatewayNotificationsService; @@ -77,9 +79,11 @@ import org.thingsboard.server.service.ota.OtaPackageStateService; import org.thingsboard.server.service.profile.TbAssetProfileCache; import org.thingsboard.server.service.profile.TbDeviceProfileCache; +import java.util.List; import java.util.Set; import java.util.UUID; import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.Collectors; import static org.thingsboard.server.common.util.ProtoUtils.toProto; @@ -552,40 +556,40 @@ public class DefaultTbClusterService implements TbClusterService { } @Override - public void onQueueChange(Queue queue) { - log.trace("[{}][{}] Processing queue change [{}] event", queue.getTenantId(), queue.getId(), queue.getName()); - - TransportProtos.QueueUpdateMsg queueUpdateMsg = TransportProtos.QueueUpdateMsg.newBuilder() - .setTenantIdMSB(queue.getTenantId().getId().getMostSignificantBits()) - .setTenantIdLSB(queue.getTenantId().getId().getLeastSignificantBits()) - .setQueueIdMSB(queue.getId().getId().getMostSignificantBits()) - .setQueueIdLSB(queue.getId().getId().getLeastSignificantBits()) - .setQueueName(queue.getName()) - .setQueueTopic(queue.getTopic()) - .setPartitions(queue.getPartitions()) - .build(); - - ToRuleEngineNotificationMsg ruleEngineMsg = ToRuleEngineNotificationMsg.newBuilder().setQueueUpdateMsg(queueUpdateMsg).build(); - ToCoreNotificationMsg coreMsg = ToCoreNotificationMsg.newBuilder().setQueueUpdateMsg(queueUpdateMsg).build(); - ToTransportMsg transportMsg = ToTransportMsg.newBuilder().setQueueUpdateMsg(queueUpdateMsg).build(); + public void onQueuesUpdate(List queues) { + List queueUpdateMsgs = queues.stream() + .map(queue -> QueueUpdateMsg.newBuilder() + .setTenantIdMSB(queue.getTenantId().getId().getMostSignificantBits()) + .setTenantIdLSB(queue.getTenantId().getId().getLeastSignificantBits()) + .setQueueIdMSB(queue.getId().getId().getMostSignificantBits()) + .setQueueIdLSB(queue.getId().getId().getLeastSignificantBits()) + .setQueueName(queue.getName()) + .setQueueTopic(queue.getTopic()) + .setPartitions(queue.getPartitions()) + .build()) + .collect(Collectors.toList()); + + ToRuleEngineNotificationMsg ruleEngineMsg = ToRuleEngineNotificationMsg.newBuilder().addAllQueueUpdateMsgs(queueUpdateMsgs).build(); + ToCoreNotificationMsg coreMsg = ToCoreNotificationMsg.newBuilder().addAllQueueUpdateMsgs(queueUpdateMsgs).build(); + ToTransportMsg transportMsg = ToTransportMsg.newBuilder().addAllQueueUpdateMsgs(queueUpdateMsgs).build(); doSendQueueNotifications(ruleEngineMsg, coreMsg, transportMsg); } @Override - public void onQueueDelete(Queue queue) { - log.trace("[{}][{}] Processing queue delete [{}] event", queue.getTenantId(), queue.getId(), queue.getName()); - - TransportProtos.QueueDeleteMsg queueDeleteMsg = TransportProtos.QueueDeleteMsg.newBuilder() - .setTenantIdMSB(queue.getTenantId().getId().getMostSignificantBits()) - .setTenantIdLSB(queue.getTenantId().getId().getLeastSignificantBits()) - .setQueueIdMSB(queue.getId().getId().getMostSignificantBits()) - .setQueueIdLSB(queue.getId().getId().getLeastSignificantBits()) - .setQueueName(queue.getName()) - .build(); - - ToRuleEngineNotificationMsg ruleEngineMsg = ToRuleEngineNotificationMsg.newBuilder().setQueueDeleteMsg(queueDeleteMsg).build(); - ToCoreNotificationMsg coreMsg = ToCoreNotificationMsg.newBuilder().setQueueDeleteMsg(queueDeleteMsg).build(); - ToTransportMsg transportMsg = ToTransportMsg.newBuilder().setQueueDeleteMsg(queueDeleteMsg).build(); + public void onQueuesDelete(List queues) { + List queueDeleteMsgs = queues.stream() + .map(queue -> QueueDeleteMsg.newBuilder() + .setTenantIdMSB(queue.getTenantId().getId().getMostSignificantBits()) + .setTenantIdLSB(queue.getTenantId().getId().getLeastSignificantBits()) + .setQueueIdMSB(queue.getId().getId().getMostSignificantBits()) + .setQueueIdLSB(queue.getId().getId().getLeastSignificantBits()) + .setQueueName(queue.getName()) + .build()) + .collect(Collectors.toList()); + + ToRuleEngineNotificationMsg ruleEngineMsg = ToRuleEngineNotificationMsg.newBuilder().addAllQueueDeleteMsgs(queueDeleteMsgs).build(); + ToCoreNotificationMsg coreMsg = ToCoreNotificationMsg.newBuilder().addAllQueueDeleteMsgs(queueDeleteMsgs).build(); + ToTransportMsg transportMsg = ToTransportMsg.newBuilder().addAllQueueDeleteMsgs(queueDeleteMsgs).build(); doSendQueueNotifications(ruleEngineMsg, coreMsg, transportMsg); } diff --git a/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbCoreConsumerService.java b/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbCoreConsumerService.java index 8e5fde9ae8..f544c4fe25 100644 --- a/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbCoreConsumerService.java +++ b/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbCoreConsumerService.java @@ -391,13 +391,11 @@ public class DefaultTbCoreConsumerService extends AbstractConsumerService 0) { + partitionService.updateQueues(toCoreNotification.getQueueUpdateMsgsList()); callback.onSuccess(); - } else if (toCoreNotification.hasQueueDeleteMsg()) { - TransportProtos.QueueDeleteMsg queue = toCoreNotification.getQueueDeleteMsg(); - partitionService.removeQueue(queue); + } else if (toCoreNotification.getQueueDeleteMsgsCount() > 0) { + partitionService.removeQueues(toCoreNotification.getQueueDeleteMsgsList()); callback.onSuccess(); } else if (toCoreNotification.hasVcResponseMsg()) { vcQueueService.processResponse(toCoreNotification.getVcResponseMsg()); diff --git a/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbRuleEngineConsumerService.java b/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbRuleEngineConsumerService.java index 8cc477999f..6a205ed5a2 100644 --- a/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbRuleEngineConsumerService.java +++ b/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbRuleEngineConsumerService.java @@ -36,6 +36,8 @@ import org.thingsboard.server.common.util.ProtoUtils; import org.thingsboard.server.dao.queue.QueueService; import org.thingsboard.server.dao.tenant.TbTenantProfileCache; import org.thingsboard.server.gen.transport.TransportProtos; +import org.thingsboard.server.gen.transport.TransportProtos.QueueDeleteMsg; +import org.thingsboard.server.gen.transport.TransportProtos.QueueUpdateMsg; import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineNotificationMsg; import org.thingsboard.server.queue.common.TbProtoQueueMsg; import org.thingsboard.server.queue.discovery.PartitionService; @@ -164,11 +166,11 @@ public class DefaultTbRuleEngineConsumerService extends AbstractConsumerService< , proto.getResponse(), error); tbDeviceRpcService.processRpcResponseFromDevice(response); callback.onSuccess(); - } else if (nfMsg.hasQueueUpdateMsg()) { - updateQueue(nfMsg.getQueueUpdateMsg()); + } else if (nfMsg.getQueueUpdateMsgsCount() > 0) { + updateQueues(nfMsg.getQueueUpdateMsgsList()); callback.onSuccess(); - } else if (nfMsg.hasQueueDeleteMsg()) { - deleteQueue(nfMsg.getQueueDeleteMsg()); + } else if (nfMsg.getQueueDeleteMsgsCount() > 0) { + deleteQueues(nfMsg.getQueueDeleteMsgsList()); callback.onSuccess(); } else { log.trace("Received notification with missing handler"); @@ -176,39 +178,48 @@ public class DefaultTbRuleEngineConsumerService extends AbstractConsumerService< } } - private void updateQueue(TransportProtos.QueueUpdateMsg queueUpdateMsg) { - log.info("Received queue update msg: [{}]", queueUpdateMsg); - TenantId tenantId = new TenantId(new UUID(queueUpdateMsg.getTenantIdMSB(), queueUpdateMsg.getTenantIdLSB())); - if (partitionService.isManagedByCurrentService(tenantId)) { - QueueId queueId = new QueueId(new UUID(queueUpdateMsg.getQueueIdMSB(), queueUpdateMsg.getQueueIdLSB())); - String queueName = queueUpdateMsg.getQueueName(); - QueueKey queueKey = new QueueKey(ServiceType.TB_RULE_ENGINE, queueName, tenantId); - Queue queue = queueService.findQueueById(tenantId, queueId); - - TbRuleEngineQueueConsumerManager consumerManager = getOrCreateConsumer(queueKey); - Queue oldQueue = consumerManager.getQueue(); - consumerManager.update(queue); - - if (oldQueue != null && queue.getPartitions() == oldQueue.getPartitions()) { - return; + private void updateQueues(List queueUpdateMsgs) { + boolean partitionsChanged = false; + for (QueueUpdateMsg queueUpdateMsg : queueUpdateMsgs) { + log.info("Received queue update msg: [{}]", queueUpdateMsg); + TenantId tenantId = new TenantId(new UUID(queueUpdateMsg.getTenantIdMSB(), queueUpdateMsg.getTenantIdLSB())); + if (partitionService.isManagedByCurrentService(tenantId)) { + QueueId queueId = new QueueId(new UUID(queueUpdateMsg.getQueueIdMSB(), queueUpdateMsg.getQueueIdLSB())); + String queueName = queueUpdateMsg.getQueueName(); + QueueKey queueKey = new QueueKey(ServiceType.TB_RULE_ENGINE, queueName, tenantId); + Queue queue = queueService.findQueueById(tenantId, queueId); + + TbRuleEngineQueueConsumerManager consumerManager = getOrCreateConsumer(queueKey); + Queue oldQueue = consumerManager.getQueue(); + consumerManager.update(queue); + + if (oldQueue == null || queue.getPartitions() != oldQueue.getPartitions()) { + partitionsChanged = true; + } + } else { + partitionsChanged = true; } } - partitionService.updateQueue(queueUpdateMsg); - partitionService.recalculatePartitions(ctx.getServiceInfoProvider().getServiceInfo(), - new ArrayList<>(partitionService.getOtherServices(ServiceType.TB_RULE_ENGINE))); + if (partitionsChanged) { + partitionService.updateQueues(queueUpdateMsgs); + partitionService.recalculatePartitions(ctx.getServiceInfoProvider().getServiceInfo(), + new ArrayList<>(partitionService.getOtherServices(ServiceType.TB_RULE_ENGINE))); + } } - private void deleteQueue(TransportProtos.QueueDeleteMsg queueDeleteMsg) { - log.info("Received queue delete msg: [{}]", queueDeleteMsg); - TenantId tenantId = new TenantId(new UUID(queueDeleteMsg.getTenantIdMSB(), queueDeleteMsg.getTenantIdLSB())); - QueueKey queueKey = new QueueKey(ServiceType.TB_RULE_ENGINE, queueDeleteMsg.getQueueName(), tenantId); - var consumerManager = consumers.remove(queueKey); - if (consumerManager != null) { - consumerManager.delete(true); + private void deleteQueues(List queueDeleteMsgs) { + for (QueueDeleteMsg queueDeleteMsg : queueDeleteMsgs) { + log.info("Received queue delete msg: [{}]", queueDeleteMsg); + TenantId tenantId = new TenantId(new UUID(queueDeleteMsg.getTenantIdMSB(), queueDeleteMsg.getTenantIdLSB())); + QueueKey queueKey = new QueueKey(ServiceType.TB_RULE_ENGINE, queueDeleteMsg.getQueueName(), tenantId); + var consumerManager = consumers.remove(queueKey); + if (consumerManager != null) { + consumerManager.delete(true); + } } - partitionService.removeQueue(queueDeleteMsg); + partitionService.removeQueues(queueDeleteMsgs); partitionService.recalculatePartitions(ctx.getServiceInfoProvider().getServiceInfo(), new ArrayList<>(partitionService.getOtherServices(ServiceType.TB_RULE_ENGINE))); } diff --git a/application/src/main/java/org/thingsboard/server/service/queue/TbCoreConsumerStats.java b/application/src/main/java/org/thingsboard/server/service/queue/TbCoreConsumerStats.java index 5ff72e97e3..da3171bfc0 100644 --- a/application/src/main/java/org/thingsboard/server/service/queue/TbCoreConsumerStats.java +++ b/application/src/main/java/org/thingsboard/server/service/queue/TbCoreConsumerStats.java @@ -184,9 +184,9 @@ public class TbCoreConsumerStats { toCoreNfEdgeSyncResponseCounter.increment(); } else if (!msg.getFromEdgeSyncResponseMsg().isEmpty()) { toCoreNfEdgeSyncResponseCounter.increment(); - } else if (msg.hasQueueUpdateMsg()) { + } else if (msg.getQueueUpdateMsgsCount() > 0) { toCoreNfQueueUpdateCounter.increment(); - } else if (msg.hasQueueDeleteMsg()) { + } else if (msg.getQueueDeleteMsgsCount() > 0) { toCoreNfQueueDeleteCounter.increment(); } else if (msg.hasVcResponseMsg()) { toCoreNfVersionControlResponseCounter.increment(); diff --git a/application/src/test/java/org/thingsboard/server/queue/discovery/HashPartitionServiceTest.java b/application/src/test/java/org/thingsboard/server/queue/discovery/HashPartitionServiceTest.java index 4ea648119b..669b3dda4a 100644 --- a/application/src/test/java/org/thingsboard/server/queue/discovery/HashPartitionServiceTest.java +++ b/application/src/test/java/org/thingsboard/server/queue/discovery/HashPartitionServiceTest.java @@ -315,7 +315,7 @@ public class HashPartitionServiceTest { .setPartitions(isolatedQueue.getPartitions()) .build(); - partitionService_common.updateQueue(queueUpdateMsg); + partitionService_common.updateQueues(List.of(queueUpdateMsg)); partitionService_common.recalculatePartitions(commonRuleEngine, List.of(dedicatedRuleEngine)); // expecting event about no partitions for isolated queue key verifyPartitionChangeEvent(event -> { @@ -323,7 +323,7 @@ public class HashPartitionServiceTest { return event.getPartitionsMap().get(queueKey).isEmpty(); }); - partitionService_dedicated.updateQueue(queueUpdateMsg); + partitionService_dedicated.updateQueues(List.of(queueUpdateMsg)); partitionService_dedicated.recalculatePartitions(dedicatedRuleEngine, List.of(commonRuleEngine)); verifyPartitionChangeEvent(event -> { QueueKey queueKey = new QueueKey(ServiceType.TB_RULE_ENGINE, DataConstants.MAIN_QUEUE_NAME, tenantId); @@ -342,7 +342,7 @@ public class HashPartitionServiceTest { .setQueueIdLSB(isolatedQueue.getUuidId().getLeastSignificantBits()) .setQueueName(isolatedQueue.getName()) .build(); - partitionService_dedicated.removeQueue(queueDeleteMsg); + partitionService_dedicated.removeQueues(List.of(queueDeleteMsg)); verifyPartitionChangeEvent(event -> { QueueKey queueKey = new QueueKey(ServiceType.TB_RULE_ENGINE, DataConstants.MAIN_QUEUE_NAME, tenantId); return event.getPartitionsMap().get(queueKey).isEmpty(); diff --git a/application/src/test/java/org/thingsboard/server/service/queue/DefaultTbClusterServiceTest.java b/application/src/test/java/org/thingsboard/server/service/queue/DefaultTbClusterServiceTest.java index 233839bb5f..57bf94fec3 100644 --- a/application/src/test/java/org/thingsboard/server/service/queue/DefaultTbClusterServiceTest.java +++ b/application/src/test/java/org/thingsboard/server/service/queue/DefaultTbClusterServiceTest.java @@ -40,6 +40,7 @@ import org.thingsboard.server.service.gateway_device.GatewayNotificationsService import org.thingsboard.server.service.profile.TbAssetProfileCache; import org.thingsboard.server.service.profile.TbDeviceProfileCache; +import java.util.List; import java.util.UUID; import static org.mockito.ArgumentMatchers.any; @@ -92,7 +93,7 @@ public class DefaultTbClusterServiceTest { when(producerProvider.getRuleEngineNotificationsMsgProducer()).thenReturn(tbQueueProducer); - clusterService.onQueueChange(createTestQueue()); + clusterService.onQueuesUpdate(List.of(createTestQueue())); verify(topicService, times(1)).getNotificationsTopic(ServiceType.TB_RULE_ENGINE, MONOLITH); verify(topicService, never()).getNotificationsTopic(eq(ServiceType.TB_CORE), any()); @@ -117,7 +118,7 @@ public class DefaultTbClusterServiceTest { when(producerProvider.getRuleEngineNotificationsMsgProducer()).thenReturn(tbQueueProducer); - clusterService.onQueueChange(createTestQueue()); + clusterService.onQueuesUpdate(List.of(createTestQueue())); verify(topicService, times(1)).getNotificationsTopic(ServiceType.TB_RULE_ENGINE, monolith1); verify(topicService, times(1)).getNotificationsTopic(ServiceType.TB_RULE_ENGINE, monolith2); @@ -145,7 +146,7 @@ public class DefaultTbClusterServiceTest { when(producerProvider.getRuleEngineNotificationsMsgProducer()).thenReturn(tbREQueueProducer); when(producerProvider.getTransportNotificationsMsgProducer()).thenReturn(tbTransportQueueProducer); - clusterService.onQueueChange(createTestQueue()); + clusterService.onQueuesUpdate(List.of(createTestQueue())); verify(topicService, times(1)).getNotificationsTopic(ServiceType.TB_RULE_ENGINE, MONOLITH); verify(topicService, times(1)).getNotificationsTopic(ServiceType.TB_TRANSPORT, TRANSPORT); @@ -191,7 +192,7 @@ public class DefaultTbClusterServiceTest { when(producerProvider.getTbCoreNotificationsMsgProducer()).thenReturn(tbCoreQueueProducer); when(producerProvider.getTransportNotificationsMsgProducer()).thenReturn(tbTransportQueueProducer); - clusterService.onQueueChange(createTestQueue()); + clusterService.onQueuesUpdate(List.of(createTestQueue())); verify(topicService, times(1)).getNotificationsTopic(ServiceType.TB_RULE_ENGINE, monolith1); verify(topicService, times(1)).getNotificationsTopic(ServiceType.TB_RULE_ENGINE, monolith2); diff --git a/common/cluster-api/src/main/java/org/thingsboard/server/queue/TbQueueClusterService.java b/common/cluster-api/src/main/java/org/thingsboard/server/queue/TbQueueClusterService.java index c852a24fa7..49017df678 100644 --- a/common/cluster-api/src/main/java/org/thingsboard/server/queue/TbQueueClusterService.java +++ b/common/cluster-api/src/main/java/org/thingsboard/server/queue/TbQueueClusterService.java @@ -17,8 +17,12 @@ package org.thingsboard.server.queue; import org.thingsboard.server.common.data.queue.Queue; +import java.util.List; + public interface TbQueueClusterService { - void onQueueChange(Queue queue); - void onQueueDelete(Queue queue); + void onQueuesUpdate(List queues); + + void onQueuesDelete(List queues); + } diff --git a/common/proto/src/main/proto/queue.proto b/common/proto/src/main/proto/queue.proto index fea4848654..b1362bf27c 100644 --- a/common/proto/src/main/proto/queue.proto +++ b/common/proto/src/main/proto/queue.proto @@ -1280,8 +1280,8 @@ message ToCoreNotificationMsg { FromDeviceRPCResponseProto fromDeviceRpcResponse = 2; bytes componentLifecycleMsg = 3 [deprecated = true]; bytes edgeEventUpdateMsg = 4 [deprecated = true]; - QueueUpdateMsg queueUpdateMsg = 5; - QueueDeleteMsg queueDeleteMsg = 6; + repeated QueueUpdateMsg queueUpdateMsgs = 5; + repeated QueueDeleteMsg queueDeleteMsgs = 6; VersionControlResponseMsg vcResponseMsg = 7; bytes toEdgeSyncRequestMsg = 8 [deprecated = true]; bytes fromEdgeSyncResponseMsg = 9 [deprecated = true]; @@ -1307,8 +1307,8 @@ message ToRuleEngineMsg { message ToRuleEngineNotificationMsg { bytes componentLifecycleMsg = 1 [deprecated = true]; FromDeviceRPCResponseProto fromDeviceRpcResponse = 2; - QueueUpdateMsg queueUpdateMsg = 3; - QueueDeleteMsg queueDeleteMsg = 4; + repeated QueueUpdateMsg queueUpdateMsgs = 3; + repeated QueueDeleteMsg queueDeleteMsgs = 4; ComponentLifecycleMsgProto componentLifecycle = 5; } @@ -1328,8 +1328,8 @@ message ToTransportMsg { ResourceUpdateMsg resourceUpdateMsg = 12; ResourceDeleteMsg resourceDeleteMsg = 13; UplinkNotificationMsg uplinkNotificationMsg = 14; - QueueUpdateMsg queueUpdateMsg = 15; - QueueDeleteMsg queueDeleteMsg = 16; + repeated QueueUpdateMsg queueUpdateMsgs = 15; + repeated QueueDeleteMsg queueDeleteMsgs = 16; } message UsageStatsKVProto{ diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/discovery/HashPartitionService.java b/common/queue/src/main/java/org/thingsboard/server/queue/discovery/HashPartitionService.java index bfc50e076b..acb26842d5 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/discovery/HashPartitionService.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/discovery/HashPartitionService.java @@ -171,27 +171,36 @@ public class HashPartitionService implements PartitionService { } @Override - public void updateQueue(TransportProtos.QueueUpdateMsg queueUpdateMsg) { - TenantId tenantId = new TenantId(new UUID(queueUpdateMsg.getTenantIdMSB(), queueUpdateMsg.getTenantIdLSB())); - QueueKey queueKey = new QueueKey(ServiceType.TB_RULE_ENGINE, queueUpdateMsg.getQueueName(), tenantId); - partitionTopicsMap.put(queueKey, queueUpdateMsg.getQueueTopic()); - partitionSizesMap.put(queueKey, queueUpdateMsg.getPartitions()); - myPartitions.remove(queueKey); - if (!tenantId.isSysTenantId()) { - tenantRoutingInfoMap.remove(tenantId); + public void updateQueues(List queueUpdateMsgs) { + for (TransportProtos.QueueUpdateMsg queueUpdateMsg : queueUpdateMsgs) { + TenantId tenantId = TenantId.fromUUID(new UUID(queueUpdateMsg.getTenantIdMSB(), queueUpdateMsg.getTenantIdLSB())); + QueueKey queueKey = new QueueKey(ServiceType.TB_RULE_ENGINE, queueUpdateMsg.getQueueName(), tenantId); + partitionTopicsMap.put(queueKey, queueUpdateMsg.getQueueTopic()); + partitionSizesMap.put(queueKey, queueUpdateMsg.getPartitions()); + myPartitions.remove(queueKey); + if (!tenantId.isSysTenantId()) { + tenantRoutingInfoMap.remove(tenantId); + } } } @Override - public void removeQueue(TransportProtos.QueueDeleteMsg queueDeleteMsg) { - TenantId tenantId = new TenantId(new UUID(queueDeleteMsg.getTenantIdMSB(), queueDeleteMsg.getTenantIdLSB())); - QueueKey queueKey = new QueueKey(ServiceType.TB_RULE_ENGINE, queueDeleteMsg.getQueueName(), tenantId); - myPartitions.remove(queueKey); - partitionTopicsMap.remove(queueKey); - partitionSizesMap.remove(queueKey); - evictTenantInfo(tenantId); + public void removeQueues(List queueDeleteMsgs) { + List queueKeys = queueDeleteMsgs.stream() + .map(queueDeleteMsg -> { + TenantId tenantId = TenantId.fromUUID(new UUID(queueDeleteMsg.getTenantIdMSB(), queueDeleteMsg.getTenantIdLSB())); + return new QueueKey(ServiceType.TB_RULE_ENGINE, queueDeleteMsg.getQueueName(), tenantId); + }) + .collect(Collectors.toList()); + queueKeys.forEach(queueKey -> { + myPartitions.remove(queueKey); + partitionTopicsMap.remove(queueKey); + partitionSizesMap.remove(queueKey); + evictTenantInfo(queueKey.getTenantId()); + }); if (serviceInfoProvider.isService(ServiceType.TB_RULE_ENGINE)) { - publishPartitionChangeEvent(ServiceType.TB_RULE_ENGINE, Map.of(queueKey, Collections.emptySet())); + publishPartitionChangeEvent(ServiceType.TB_RULE_ENGINE, queueKeys.stream() + .collect(Collectors.toMap(k -> k, k -> Collections.emptySet()))); } } diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/discovery/PartitionService.java b/common/queue/src/main/java/org/thingsboard/server/queue/discovery/PartitionService.java index e9049a8085..27417e5f20 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/discovery/PartitionService.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/discovery/PartitionService.java @@ -63,9 +63,9 @@ public interface PartitionService { int countTransportsByType(String type); - void updateQueue(TransportProtos.QueueUpdateMsg queueUpdateMsg); + void updateQueues(List queueUpdateMsgs); - void removeQueue(TransportProtos.QueueDeleteMsg queueDeleteMsg); + void removeQueues(List queueDeleteMsgs); void removeTenant(TenantId tenantId); diff --git a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/DefaultTransportService.java b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/DefaultTransportService.java index 6f962ca269..2d6ab0fb8b 100644 --- a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/DefaultTransportService.java +++ b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/DefaultTransportService.java @@ -50,9 +50,9 @@ import org.thingsboard.server.common.data.id.RuleChainId; import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.id.TenantProfileId; import org.thingsboard.server.common.data.limit.LimitedApi; +import org.thingsboard.server.common.data.msg.TbMsgType; import org.thingsboard.server.common.data.notification.rule.trigger.RateLimitsTrigger; import org.thingsboard.server.common.data.plugin.ComponentLifecycleEvent; -import org.thingsboard.server.common.data.msg.TbMsgType; import org.thingsboard.server.common.data.rpc.RpcStatus; import org.thingsboard.server.common.msg.TbMsg; import org.thingsboard.server.common.msg.TbMsgMetaData; @@ -96,10 +96,9 @@ import org.thingsboard.server.queue.TbQueueProducer; import org.thingsboard.server.queue.TbQueueRequestTemplate; import org.thingsboard.server.queue.common.AsyncCallbackTemplate; import org.thingsboard.server.queue.common.TbProtoQueueMsg; -import org.thingsboard.server.queue.discovery.QueueKey; -import org.thingsboard.server.queue.discovery.TopicService; import org.thingsboard.server.queue.discovery.PartitionService; import org.thingsboard.server.queue.discovery.TbServiceInfoProvider; +import org.thingsboard.server.queue.discovery.TopicService; import org.thingsboard.server.queue.provider.TbQueueProducerProvider; import org.thingsboard.server.queue.provider.TbTransportQueueFactory; import org.thingsboard.server.queue.scheduler.SchedulerComponent; @@ -1056,10 +1055,10 @@ public class DefaultTransportService implements TransportService { log.warn("ResourceDelete - [{}] [{}]", id, mdRez); transportCallbackExecutor.submit(() -> mdRez.getListener().onResourceDelete(msg)); }); - } else if (toSessionMsg.hasQueueUpdateMsg()) { - partitionService.updateQueue(toSessionMsg.getQueueUpdateMsg()); - } else if (toSessionMsg.hasQueueDeleteMsg()) { - partitionService.removeQueue(toSessionMsg.getQueueDeleteMsg()); + } else if (toSessionMsg.getQueueUpdateMsgsCount() > 0) { + partitionService.updateQueues(toSessionMsg.getQueueUpdateMsgsList()); + } else if (toSessionMsg.getQueueDeleteMsgsCount() > 0) { + partitionService.removeQueues(toSessionMsg.getQueueDeleteMsgsList()); } else { //TODO: should we notify the device actor about missed session? log.debug("[{}] Missing session.", sessionId); diff --git a/dao/src/main/java/org/thingsboard/server/dao/queue/BaseQueueService.java b/dao/src/main/java/org/thingsboard/server/dao/queue/BaseQueueService.java index c1b0c66875..1bf9e25fc0 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/queue/BaseQueueService.java +++ b/dao/src/main/java/org/thingsboard/server/dao/queue/BaseQueueService.java @@ -57,9 +57,6 @@ public class BaseQueueService extends AbstractEntityService implements QueueServ @Autowired private DataValidator queueValidator; -// @Autowired -// private QueueStatsService queueStatsService; - @Override public Queue saveQueue(Queue queue) { log.trace("Executing createOrUpdateQueue [{}]", queue); From 27f448a5430220ca8c6acb41449cc7cdcd4496ec Mon Sep 17 00:00:00 2001 From: ViacheslavKlimov Date: Thu, 25 Jan 2024 12:04:18 +0200 Subject: [PATCH 3/8] Improve partition service logs --- .../queue/discovery/HashPartitionService.java | 17 ++++++----------- 1 file changed, 6 insertions(+), 11 deletions(-) diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/discovery/HashPartitionService.java b/common/queue/src/main/java/org/thingsboard/server/queue/discovery/HashPartitionService.java index acb26842d5..2d0e03ecf9 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/discovery/HashPartitionService.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/discovery/HashPartitionService.java @@ -325,13 +325,11 @@ public class HashPartitionService implements PartitionService { .forEach(removed::add); } removed.forEach(queueKey -> { - log.info("[{}] NO MORE PARTITIONS FOR CURRENT KEY", queueKey); changedPartitionsMap.put(queueKey, Collections.emptySet()); }); myPartitions.forEach((queueKey, partitions) -> { if (!partitions.equals(oldPartitions.get(queueKey))) { - log.info("[{}] NEW PARTITIONS: {}", queueKey, partitions); Set tpiList = partitions.stream() .map(partition -> buildTopicPartitionInfo(queueKey, partition)) .collect(Collectors.toSet()); @@ -377,14 +375,11 @@ public class HashPartitionService implements PartitionService { } private void publishPartitionChangeEvent(ServiceType serviceType, Map> partitionsMap) { - if (log.isDebugEnabled()) { - log.debug("Publishing partition change event for service type " + serviceType + ":" + System.lineSeparator() + - partitionsMap.entrySet().stream() - .map(entry -> entry.getKey() + " - " + entry.getValue().stream() - .map(TopicPartitionInfo::getFullTopicName).sorted() - .collect(Collectors.toList())) - .collect(Collectors.joining(System.lineSeparator()))); - } + log.info("Partitions changed: {}", System.lineSeparator() + partitionsMap.entrySet().stream() + .map(entry -> "[" + entry.getKey() + "] - [" + entry.getValue().stream() + .map(tpi -> tpi.getPartition().orElse(-1).toString()).sorted() + .collect(Collectors.joining(", ")) + "]") + .collect(Collectors.joining(System.lineSeparator()))); applicationEventPublisher.publishEvent(new PartitionChangeEvent(this, serviceType, partitionsMap)); } @@ -489,7 +484,7 @@ public class HashPartitionService implements PartitionService { } private void logServiceInfo(TransportProtos.ServiceInfo server) { - log.info("[{}] Found common server: [{}]", server.getServiceId(), server.getServiceTypesList()); + log.info("[{}] Found common server: {}", server.getServiceId(), server.getServiceTypesList()); } private void addNode(Map> queueServiceList, ServiceInfo instance) { From 2f7351200504126c159cb7ef920eb447c8ce7f36 Mon Sep 17 00:00:00 2001 From: ViacheslavKlimov Date: Thu, 25 Jan 2024 15:47:26 +0200 Subject: [PATCH 4/8] Don't remove from myPartitions on queue update --- .../thingsboard/server/queue/discovery/HashPartitionService.java | 1 - 1 file changed, 1 deletion(-) diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/discovery/HashPartitionService.java b/common/queue/src/main/java/org/thingsboard/server/queue/discovery/HashPartitionService.java index 2d0e03ecf9..9b4cf0e974 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/discovery/HashPartitionService.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/discovery/HashPartitionService.java @@ -177,7 +177,6 @@ public class HashPartitionService implements PartitionService { QueueKey queueKey = new QueueKey(ServiceType.TB_RULE_ENGINE, queueUpdateMsg.getQueueName(), tenantId); partitionTopicsMap.put(queueKey, queueUpdateMsg.getQueueTopic()); partitionSizesMap.put(queueKey, queueUpdateMsg.getPartitions()); - myPartitions.remove(queueKey); if (!tenantId.isSysTenantId()) { tenantRoutingInfoMap.remove(tenantId); } From bf9c4af2aa738655dcbfc5dabde934184b38cbc2 Mon Sep 17 00:00:00 2001 From: YevhenBondarenko Date: Thu, 1 Feb 2024 20:23:12 +0100 Subject: [PATCH 5/8] fixed infinit tell failure --- .../AbstractTbRuleEngineSubmitStrategy.java | 14 +++++++++++++- 1 file changed, 13 insertions(+), 1 deletion(-) diff --git a/application/src/main/java/org/thingsboard/server/service/queue/processing/AbstractTbRuleEngineSubmitStrategy.java b/application/src/main/java/org/thingsboard/server/service/queue/processing/AbstractTbRuleEngineSubmitStrategy.java index 3801e43b7f..6f9a4c49c6 100644 --- a/application/src/main/java/org/thingsboard/server/service/queue/processing/AbstractTbRuleEngineSubmitStrategy.java +++ b/application/src/main/java/org/thingsboard/server/service/queue/processing/AbstractTbRuleEngineSubmitStrategy.java @@ -15,6 +15,7 @@ */ package org.thingsboard.server.service.queue.processing; +import org.thingsboard.server.common.data.StringUtils; import org.thingsboard.server.gen.transport.TransportProtos; import org.thingsboard.server.queue.common.TbProtoQueueMsg; @@ -51,7 +52,18 @@ public abstract class AbstractTbRuleEngineSubmitStrategy implements TbRuleEngine List> newOrderedMsgList = new ArrayList<>(reprocessMap.size()); for (IdMsgPair pair : orderedMsgList) { if (reprocessMap.containsKey(pair.uuid)) { - newOrderedMsgList.add(pair); + var oldValue = pair.getMsg().getValue(); + if (StringUtils.isNotEmpty(oldValue.getFailureMessage())) { + var newValue = TransportProtos.ToRuleEngineMsg.newBuilder() + .setTenantIdMSB(oldValue.getTenantIdMSB()) + .setTenantIdLSB(oldValue.getTenantIdLSB()) + .setTbMsg(oldValue.getTbMsg()) + .build(); + var newMsg = new TbProtoQueueMsg<>(pair.getMsg().getKey(), newValue, pair.getMsg().getHeaders()); + newOrderedMsgList.add(new IdMsgPair<>(pair.getUuid(), newMsg)); + } else { + newOrderedMsgList.add(pair); + } } } orderedMsgList = newOrderedMsgList; From 100cdd5cf18f12859940db9284c38f17829a0473 Mon Sep 17 00:00:00 2001 From: YevhenBondarenko Date: Fri, 2 Feb 2024 14:00:30 +0100 Subject: [PATCH 6/8] refactored due to comments --- .../AbstractTbRuleEngineSubmitStrategy.java | 12 +++++------- 1 file changed, 5 insertions(+), 7 deletions(-) diff --git a/application/src/main/java/org/thingsboard/server/service/queue/processing/AbstractTbRuleEngineSubmitStrategy.java b/application/src/main/java/org/thingsboard/server/service/queue/processing/AbstractTbRuleEngineSubmitStrategy.java index 6f9a4c49c6..bfec9f747f 100644 --- a/application/src/main/java/org/thingsboard/server/service/queue/processing/AbstractTbRuleEngineSubmitStrategy.java +++ b/application/src/main/java/org/thingsboard/server/service/queue/processing/AbstractTbRuleEngineSubmitStrategy.java @@ -52,14 +52,12 @@ public abstract class AbstractTbRuleEngineSubmitStrategy implements TbRuleEngine List> newOrderedMsgList = new ArrayList<>(reprocessMap.size()); for (IdMsgPair pair : orderedMsgList) { if (reprocessMap.containsKey(pair.uuid)) { - var oldValue = pair.getMsg().getValue(); - if (StringUtils.isNotEmpty(oldValue.getFailureMessage())) { - var newValue = TransportProtos.ToRuleEngineMsg.newBuilder() - .setTenantIdMSB(oldValue.getTenantIdMSB()) - .setTenantIdLSB(oldValue.getTenantIdLSB()) - .setTbMsg(oldValue.getTbMsg()) + if (StringUtils.isNotEmpty(pair.getMsg().getValue().getFailureMessage())) { + var toRuleEngineMsg = TransportProtos.ToRuleEngineMsg.newBuilder(pair.getMsg().getValue()) + .clearFailureMessage() + .clearRelationTypes() .build(); - var newMsg = new TbProtoQueueMsg<>(pair.getMsg().getKey(), newValue, pair.getMsg().getHeaders()); + var newMsg = new TbProtoQueueMsg<>(pair.getMsg().getKey(), toRuleEngineMsg, pair.getMsg().getHeaders()); newOrderedMsgList.add(new IdMsgPair<>(pair.getUuid(), newMsg)); } else { newOrderedMsgList.add(pair); From e3883413a1ce52ece9b89cb7b20f816b78d12a79 Mon Sep 17 00:00:00 2001 From: Volodymyr Babak Date: Fri, 26 Jan 2024 15:48:01 +0200 Subject: [PATCH 7/8] Make saveRuleChainMetaData @Transactional --- .../org/thingsboard/server/dao/rule/BaseRuleChainService.java | 1 + 1 file changed, 1 insertion(+) diff --git a/dao/src/main/java/org/thingsboard/server/dao/rule/BaseRuleChainService.java b/dao/src/main/java/org/thingsboard/server/dao/rule/BaseRuleChainService.java index 6257bb0420..aaa274db4a 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/rule/BaseRuleChainService.java +++ b/dao/src/main/java/org/thingsboard/server/dao/rule/BaseRuleChainService.java @@ -149,6 +149,7 @@ public class BaseRuleChainService extends AbstractEntityService implements RuleC } @Override + @Transactional public RuleChainUpdateResult saveRuleChainMetaData(TenantId tenantId, RuleChainMetaData ruleChainMetaData, Function ruleNodeUpdater) { Validator.validateId(ruleChainMetaData.getRuleChainId(), "Incorrect rule chain id."); RuleChain ruleChain = findRuleChainById(tenantId, ruleChainMetaData.getRuleChainId()); From e502c5ebbad9e6a331ca4ad40a1d950b93cd4200 Mon Sep 17 00:00:00 2001 From: ViacheslavKlimov Date: Tue, 6 Feb 2024 15:07:38 +0200 Subject: [PATCH 8/8] Revert "Init all actors for isolated rule engines" This reverts commit bf0cc22223b94fb0b73c55eba03a0237ca16eae4. --- .../server/actors/app/AppActor.java | 3 ++- .../server/actors/tenant/TenantActor.java | 20 ++++++++++--------- 2 files changed, 13 insertions(+), 10 deletions(-) diff --git a/application/src/main/java/org/thingsboard/server/actors/app/AppActor.java b/application/src/main/java/org/thingsboard/server/actors/app/AppActor.java index fcd6d7a1f9..98e0bf7c25 100644 --- a/application/src/main/java/org/thingsboard/server/actors/app/AppActor.java +++ b/application/src/main/java/org/thingsboard/server/actors/app/AppActor.java @@ -202,7 +202,8 @@ public class AppActor extends ContextAwareActor { return Optional.ofNullable(ctx.getOrCreateChildActor(new TbEntityActorId(tenantId), () -> DefaultActorService.TENANT_DISPATCHER_NAME, () -> new TenantActor.ActorCreator(systemContext, tenantId), - () -> true)); + () -> systemContext.getServiceInfoProvider().isService(ServiceType.TB_CORE) || + systemContext.getPartitionService().isManagedByCurrentService(tenantId))); } private void onToEdgeSessionMsg(EdgeSessionMsg msg) { diff --git a/application/src/main/java/org/thingsboard/server/actors/tenant/TenantActor.java b/application/src/main/java/org/thingsboard/server/actors/tenant/TenantActor.java index 04c0f1c9e9..01b01a7d4a 100644 --- a/application/src/main/java/org/thingsboard/server/actors/tenant/TenantActor.java +++ b/application/src/main/java/org/thingsboard/server/actors/tenant/TenantActor.java @@ -90,16 +90,18 @@ public class TenantActor extends RuleChainManagerActor { isCore = systemContext.getServiceInfoProvider().isService(ServiceType.TB_CORE); isRuleEngine = systemContext.getServiceInfoProvider().isService(ServiceType.TB_RULE_ENGINE); if (isRuleEngine) { - try { - if (getApiUsageState().isReExecEnabled()) { - log.debug("[{}] Going to init rule chains", tenantId); - initRuleChains(); - } else { - log.info("[{}] Skip init of the rule chains due to API limits", tenantId); + if (systemContext.getPartitionService().isManagedByCurrentService(tenantId)) { + try { + if (getApiUsageState().isReExecEnabled()) { + log.debug("[{}] Going to init rule chains", tenantId); + initRuleChains(); + } else { + log.info("[{}] Skip init of the rule chains due to API limits", tenantId); + } + } catch (Exception e) { + log.info("Failed to check ApiUsage \"ReExecEnabled\"!!!", e); + cantFindTenant = true; } - } catch (Exception e) { - log.info("Failed to check ApiUsage \"ReExecEnabled\"!!!", e); - cantFindTenant = true; } } log.debug("[{}] Tenant actor started.", tenantId);