Browse Source

Merge with master

pull/9097/head
Andrii Shvaika 3 years ago
parent
commit
ca6095402f
  1. 1
      application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java
  2. 45
      application/src/main/java/org/thingsboard/server/actors/app/AppActor.java
  3. 3
      application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainActorMessageProcessor.java
  4. 3
      application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainManagerActor.java
  5. 26
      application/src/main/java/org/thingsboard/server/actors/tenant/TenantActor.java
  6. 1
      application/src/main/java/org/thingsboard/server/controller/QueueController.java
  7. 43
      application/src/main/java/org/thingsboard/server/service/entitiy/queue/DefaultTbQueueService.java
  8. 8
      application/src/main/java/org/thingsboard/server/service/entitiy/tenant/profile/DefaultTbTenantProfileService.java
  9. 128
      application/src/main/java/org/thingsboard/server/service/queue/DefaultTbRuleEngineConsumerService.java
  10. 8
      application/src/main/java/org/thingsboard/server/service/queue/DefaultTenantRoutingInfoService.java
  11. 13
      application/src/main/java/org/thingsboard/server/service/queue/TbRuleEngineConsumerStats.java
  12. 7
      application/src/main/resources/thingsboard.yml
  13. 21
      application/src/test/java/org/thingsboard/server/controller/AbstractWebTest.java
  14. 248
      application/src/test/java/org/thingsboard/server/controller/TenantControllerTest.java
  15. 17
      application/src/test/java/org/thingsboard/server/controller/TenantProfileControllerTest.java
  16. 149
      application/src/test/java/org/thingsboard/server/queue/discovery/HashPartitionServiceTest.java
  17. 16
      application/src/test/java/org/thingsboard/server/service/notification/NotificationRuleApiTest.java
  18. 2
      application/src/test/java/org/thingsboard/server/service/ttl/AlarmsCleanUpServiceTest.java
  19. 2
      common/actor/src/main/java/org/thingsboard/server/actors/TbActorCtx.java
  20. 10
      common/actor/src/main/java/org/thingsboard/server/actors/TbActorMailbox.java
  21. 6
      common/cluster-api/src/main/java/org/thingsboard/server/queue/TbQueueAdmin.java
  22. 6
      common/cluster-api/src/main/java/org/thingsboard/server/queue/TbQueueConsumer.java
  23. 1
      common/cluster-api/src/main/proto/queue.proto
  24. 15
      common/data/src/main/java/org/thingsboard/server/common/data/queue/Queue.java
  25. 2
      common/queue/src/main/java/org/thingsboard/server/queue/RuleEngineTbQueueAdminFactory.java
  26. 7
      common/queue/src/main/java/org/thingsboard/server/queue/azure/servicebus/TbServiceBusAdmin.java
  27. 29
      common/queue/src/main/java/org/thingsboard/server/queue/common/AbstractTbQueueConsumerTemplate.java
  28. 14
      common/queue/src/main/java/org/thingsboard/server/queue/discovery/DefaultTbServiceInfoProvider.java
  29. 133
      common/queue/src/main/java/org/thingsboard/server/queue/discovery/HashPartitionService.java
  30. 3
      common/queue/src/main/java/org/thingsboard/server/queue/discovery/PartitionService.java
  31. 5
      common/queue/src/main/java/org/thingsboard/server/queue/discovery/TbServiceInfoProvider.java
  32. 2
      common/queue/src/main/java/org/thingsboard/server/queue/discovery/TenantRoutingInfo.java
  33. 1
      common/queue/src/main/java/org/thingsboard/server/queue/discovery/TenantRoutingInfoService.java
  34. 6
      common/queue/src/main/java/org/thingsboard/server/queue/kafka/TbKafkaAdmin.java
  35. 7
      common/queue/src/main/java/org/thingsboard/server/queue/kafka/TbKafkaConsumerTemplate.java
  36. 5
      common/queue/src/main/java/org/thingsboard/server/queue/kafka/TbKafkaSettings.java
  37. 16
      common/queue/src/main/java/org/thingsboard/server/queue/memory/InMemoryTbQueueConsumer.java
  38. 2
      common/queue/src/main/java/org/thingsboard/server/queue/provider/InMemoryTbTransportQueueFactory.java
  39. 2
      common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaMonolithQueueFactory.java
  40. 2
      common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaTbRuleEngineQueueFactory.java
  41. 2
      common/queue/src/main/java/org/thingsboard/server/queue/pubsub/TbPubSubAdmin.java
  42. 9
      common/queue/src/main/java/org/thingsboard/server/queue/rabbitmq/TbRabbitMqAdmin.java
  43. 8
      common/queue/src/main/java/org/thingsboard/server/queue/rabbitmq/TbRabbitMqQueueArguments.java
  44. 4
      common/queue/src/main/java/org/thingsboard/server/queue/sqs/TbAwsSqsAdmin.java
  45. 8
      common/queue/src/main/java/org/thingsboard/server/queue/sqs/TbAwsSqsQueueAttributes.java
  46. 14
      common/queue/src/main/java/org/thingsboard/server/queue/util/PropertyUtils.java
  47. 1
      common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/DefaultTransportTenantProfileCache.java
  48. 4
      common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/TransportTenantRoutingInfoService.java
  49. 2
      dao/src/main/java/org/thingsboard/server/dao/service/validator/TenantProfileDataValidator.java
  50. 11
      dao/src/test/java/org/thingsboard/server/dao/service/TenantProfileServiceTest.java
  51. 2
      msa/vc-executor/src/main/java/org/thingsboard/server/vc/service/VersionControlTenantRoutingInfoService.java
  52. 1
      msa/vc-executor/src/main/resources/tb-vc-executor.yml
  53. 110
      rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/math/TbMathNode.java
  54. 155
      rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/math/TbMathNodeTest.java
  55. 1
      transport/coap/src/main/resources/tb-coap-transport.yml
  56. 1
      transport/http/src/main/resources/tb-http-transport.yml
  57. 1
      transport/lwm2m/src/main/resources/tb-lwm2m-transport.yml
  58. 1
      transport/mqtt/src/main/resources/tb-mqtt-transport.yml
  59. 1
      transport/snmp/src/main/resources/tb-snmp-transport.yml
  60. 3
      ui-ngx/src/app/modules/home/components/profile/queue/tenant-profile-queues.component.ts
  61. 60
      ui-ngx/src/app/modules/home/components/profile/tenant-profile.component.ts
  62. 5
      ui-ngx/src/app/modules/home/components/queue/queue-form.component.html
  63. 3
      ui-ngx/src/app/modules/home/components/queue/queue-form.component.ts
  64. 1
      ui-ngx/src/app/shared/models/queue.models.ts
  65. 6
      ui-ngx/src/assets/locale/locale.constant-en_US.json

1
application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java

@ -246,6 +246,7 @@ public class ActorSystemContext {
private RuleNodeStateService ruleNodeStateService;
@Autowired
@Getter
private PartitionService partitionService;
@Autowired

45
application/src/main/java/org/thingsboard/server/actors/app/AppActor.java

@ -46,6 +46,7 @@ import org.thingsboard.server.dao.tenant.TenantService;
import org.thingsboard.server.service.transport.msg.TransportToDeviceActorMsgWrapper;
import java.util.HashSet;
import java.util.Optional;
import java.util.Set;
@Slf4j
@ -129,8 +130,11 @@ public class AppActor extends ContextAwareActor {
PageDataIterable<Tenant> tenantIterator = new PageDataIterable<>(tenantService::findTenants, ENTITY_PACK_LIMIT);
for (Tenant tenant : tenantIterator) {
log.debug("[{}] Creating tenant actor", tenant.getId());
getOrCreateTenantActor(tenant.getId());
log.debug("[{}] Tenant actor created.", tenant.getId());
getOrCreateTenantActor(tenant.getId()).ifPresentOrElse(tenantActor -> {
log.debug("[{}] Tenant actor created.", tenant.getId());
}, () -> {
log.debug("[{}] Skipped actor creation", tenant.getId());
});
}
}
log.info("Main system actor started.");
@ -143,11 +147,9 @@ public class AppActor extends ContextAwareActor {
if (TenantId.SYS_TENANT_ID.equals(msg.getTenantId())) {
msg.getMsg().getCallback().onFailure(new RuleEngineException("Message has system tenant id!"));
} else {
if (!deletedTenants.contains(msg.getTenantId())) {
getOrCreateTenantActor(msg.getTenantId()).tell(msg);
} else {
msg.getMsg().getCallback().onSuccess();
}
getOrCreateTenantActor(msg.getTenantId()).ifPresentOrElse(actor -> {
actor.tell(msg);
}, () -> msg.getMsg().getCallback().onSuccess());
}
}
@ -164,12 +166,13 @@ public class AppActor extends ContextAwareActor {
log.info("[{}] Handling tenant deleted notification: {}", msg.getTenantId(), msg);
deletedTenants.add(tenantId);
ctx.stop(new TbEntityActorId(tenantId));
} else {
target = getOrCreateTenantActor(msg.getTenantId());
return;
}
} else {
target = getOrCreateTenantActor(msg.getTenantId());
}
target = getOrCreateTenantActor(msg.getTenantId()).orElseGet(() -> {
log.debug("Ignoring component lifecycle msg for tenant {} because it is not managed by this service", msg.getTenantId());
return null;
});
}
if (target != null) {
target.tellWithHighPriority(msg);
@ -179,24 +182,28 @@ public class AppActor extends ContextAwareActor {
}
private void onToDeviceActorMsg(TenantAwareMsg msg, boolean priority) {
if (!deletedTenants.contains(msg.getTenantId())) {
TbActorRef tenantActor = getOrCreateTenantActor(msg.getTenantId());
getOrCreateTenantActor(msg.getTenantId()).ifPresentOrElse(tenantActor -> {
if (priority) {
tenantActor.tellWithHighPriority(msg);
} else {
tenantActor.tell(msg);
}
} else {
}, () -> {
if (msg instanceof TransportToDeviceActorMsgWrapper) {
((TransportToDeviceActorMsgWrapper) msg).getCallback().onSuccess();
}
}
});
}
private TbActorRef getOrCreateTenantActor(TenantId tenantId) {
return ctx.getOrCreateChildActor(new TbEntityActorId(tenantId),
private Optional<TbActorRef> getOrCreateTenantActor(TenantId tenantId) {
if (deletedTenants.contains(tenantId)) {
return Optional.empty();
}
return Optional.ofNullable(ctx.getOrCreateChildActor(new TbEntityActorId(tenantId),
() -> DefaultActorService.TENANT_DISPATCHER_NAME,
() -> new TenantActor.ActorCreator(systemContext, tenantId));
() -> new TenantActor.ActorCreator(systemContext, tenantId),
() -> systemContext.getServiceInfoProvider().isService(ServiceType.TB_CORE) ||
systemContext.getPartitionService().isManagedByCurrentService(tenantId)));
}
private void onToEdgeSessionMsg(EdgeSessionMsg msg) {
@ -204,7 +211,7 @@ public class AppActor extends ContextAwareActor {
if (ModelConstants.SYSTEM_TENANT.equals(msg.getTenantId())) {
log.warn("Message has system tenant id: {}", msg);
} else {
target = getOrCreateTenantActor(msg.getTenantId());
target = getOrCreateTenantActor(msg.getTenantId()).orElse(null);
}
if (target != null) {
target.tellWithHighPriority(msg);

3
application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainActorMessageProcessor.java

@ -167,7 +167,8 @@ public class RuleChainActorMessageProcessor extends ComponentMsgProcessor<RuleCh
private TbActorRef createRuleNodeActor(TbActorCtx ctx, RuleNode ruleNode) {
return ctx.getOrCreateChildActor(new TbEntityActorId(ruleNode.getId()),
() -> DefaultActorService.RULE_DISPATCHER_NAME,
() -> new RuleNodeActor.ActorCreator(systemContext, tenantId, entityId, ruleChainName, ruleNode.getId()));
() -> new RuleNodeActor.ActorCreator(systemContext, tenantId, entityId, ruleChainName, ruleNode.getId()),
() -> true);
}
private void initRoutes(RuleChain ruleChain, List<RuleNode> ruleNodeList) {

3
application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainManagerActor.java

@ -94,7 +94,8 @@ public abstract class RuleChainManagerActor extends ContextAwareActor {
} else {
return new RuleChainActor.ActorCreator(systemContext, tenantId, ruleChain);
}
});
},
() -> true);
}
protected TbActorRef getEntityActorRef(EntityId entityId) {

26
application/src/main/java/org/thingsboard/server/actors/tenant/TenantActor.java

@ -32,7 +32,6 @@ import org.thingsboard.server.actors.service.DefaultActorService;
import org.thingsboard.server.common.data.ApiUsageState;
import org.thingsboard.server.common.data.EntityType;
import org.thingsboard.server.common.data.Tenant;
import org.thingsboard.server.common.data.TenantProfile;
import org.thingsboard.server.common.data.edge.Edge;
import org.thingsboard.server.common.data.id.DeviceId;
import org.thingsboard.server.common.data.id.EdgeId;
@ -82,21 +81,21 @@ public class TenantActor extends RuleChainManagerActor {
cantFindTenant = true;
log.info("[{}] Started tenant actor for missing tenant.", tenantId);
} else {
TenantProfile tenantProfile = systemContext.getTenantProfileCache().get(tenant.getTenantProfileId());
isCore = systemContext.getServiceInfoProvider().isService(ServiceType.TB_CORE);
isRuleEngine = systemContext.getServiceInfoProvider().isService(ServiceType.TB_RULE_ENGINE);
if (isRuleEngine) {
try {
if (getApiUsageState().isReExecEnabled()) {
log.debug("[{}] Going to init rule chains", tenantId);
initRuleChains();
} else {
log.info("[{}] Skip init of the rule chains due to API limits", tenantId);
if (systemContext.getPartitionService().isManagedByCurrentService(tenantId)) {
try {
if (getApiUsageState().isReExecEnabled()) {
log.debug("[{}] Going to init rule chains", tenantId);
initRuleChains();
} else {
log.info("[{}] Skip init of the rule chains due to API limits", tenantId);
}
} catch (Exception e) {
log.info("Failed to check ApiUsage \"ReExecEnabled\"!!!", e);
cantFindTenant = true;
}
} catch (Exception e) {
log.info("Failed to check ApiUsage \"ReExecEnabled\"!!!", e);
cantFindTenant = true;
}
}
log.debug("[{}] Tenant actor started.", tenantId);
@ -270,7 +269,8 @@ public class TenantActor extends RuleChainManagerActor {
private TbActorRef getOrCreateDeviceActor(DeviceId deviceId) {
return ctx.getOrCreateChildActor(new TbEntityActorId(deviceId),
() -> DefaultActorService.DEVICE_DISPATCHER_NAME,
() -> new DeviceActorCreator(systemContext, tenantId, deviceId));
() -> new DeviceActorCreator(systemContext, tenantId, deviceId),
() -> true);
}
private void onToEdgeSessionMsg(EdgeSessionMsg msg) {

1
application/src/main/java/org/thingsboard/server/controller/QueueController.java

@ -126,7 +126,6 @@ public class QueueController extends BaseController {
@PreAuthorize("hasAnyAuthority('SYS_ADMIN')")
@RequestMapping(value = "/queues", params = {"serviceType"}, method = RequestMethod.POST)
@ResponseBody
public Queue saveQueue(@ApiParam(value = "A JSON value representing the queue.")
@RequestBody Queue queue,
@ApiParam(value = QUEUE_SERVICE_TYPE_DESCRIPTION, allowableValues = QUEUE_SERVICE_TYPE_ALLOWABLE_VALUES, required = true)

43
application/src/main/java/org/thingsboard/server/service/entitiy/queue/DefaultTbQueueService.java

@ -15,7 +15,7 @@
*/
package org.thingsboard.server.service.entitiy.queue;
import lombok.AllArgsConstructor;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import org.thingsboard.server.cluster.TbClusterService;
@ -27,7 +27,6 @@ import org.thingsboard.server.common.data.tenant.profile.TenantProfileQueueConfi
import org.thingsboard.server.common.msg.queue.TopicPartitionInfo;
import org.thingsboard.server.dao.queue.QueueService;
import org.thingsboard.server.queue.TbQueueAdmin;
import org.thingsboard.server.queue.scheduler.SchedulerComponent;
import org.thingsboard.server.queue.util.TbCoreComponent;
import org.thingsboard.server.service.entitiy.AbstractTbEntityService;
@ -35,20 +34,17 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
@Slf4j
@Service
@TbCoreComponent
@AllArgsConstructor
@RequiredArgsConstructor
public class DefaultTbQueueService extends AbstractTbEntityService implements TbQueueService {
private static final long DELETE_DELAY = 30;
private final QueueService queueService;
private final TbClusterService tbClusterService;
private final TbQueueAdmin tbQueueAdmin;
private final SchedulerComponent scheduler;
@Override
public Queue saveQueue(Queue queue) {
@ -90,7 +86,9 @@ public class DefaultTbQueueService extends AbstractTbEntityService implements Tb
private void onQueueCreated(Queue queue) {
for (int i = 0; i < queue.getPartitions(); i++) {
tbQueueAdmin.createTopicIfNotExists(
new TopicPartitionInfo(queue.getTopic(), queue.getTenantId(), i, false).getFullTopicName());
new TopicPartitionInfo(queue.getTopic(), queue.getTenantId(), i, false).getFullTopicName(),
queue.getCustomProperties()
);
}
tbClusterService.onQueueChange(queue);
@ -105,21 +103,15 @@ public class DefaultTbQueueService extends AbstractTbEntityService implements Tb
log.info("Added [{}] new partitions to [{}] queue", currentPartitions - oldPartitions, queue.getName());
for (int i = oldPartitions; i < currentPartitions; i++) {
tbQueueAdmin.createTopicIfNotExists(
new TopicPartitionInfo(queue.getTopic(), queue.getTenantId(), i, false).getFullTopicName());
new TopicPartitionInfo(queue.getTopic(), queue.getTenantId(), i, false).getFullTopicName(),
queue.getCustomProperties()
);
}
tbClusterService.onQueueChange(queue);
} else {
log.info("Removed [{}] partitions from [{}] queue", oldPartitions - currentPartitions, queue.getName());
tbClusterService.onQueueChange(queue);
scheduler.schedule(() -> {
for (int i = currentPartitions; i < oldPartitions; i++) {
String fullTopicName = new TopicPartitionInfo(queue.getTopic(), queue.getTenantId(), i, false).getFullTopicName();
log.info("Removed partition [{}]", fullTopicName);
tbQueueAdmin.deleteTopic(
fullTopicName);
}
}, DELETE_DELAY, TimeUnit.SECONDS);
// TODO: move all the messages left in old partitions and delete topics
}
} else if (!oldQueue.equals(queue)) {
tbClusterService.onQueueChange(queue);
@ -128,20 +120,7 @@ public class DefaultTbQueueService extends AbstractTbEntityService implements Tb
private void onQueueDeleted(Queue queue) {
tbClusterService.onQueueDelete(queue);
// queueStatsService.deleteQueueStatsByQueueId(tenantId, queueId);
scheduler.schedule(() -> {
for (int i = 0; i < queue.getPartitions(); i++) {
String fullTopicName = new TopicPartitionInfo(queue.getTopic(), queue.getTenantId(), i, false).getFullTopicName();
log.info("Deleting queue [{}]", fullTopicName);
try {
tbQueueAdmin.deleteTopic(fullTopicName);
} catch (Exception e) {
log.error("Failed to delete queue [{}]", fullTopicName);
}
}
}, DELETE_DELAY, TimeUnit.SECONDS);
}
@Override
@ -193,6 +172,10 @@ public class DefaultTbQueueService extends AbstractTbEntityService implements Tb
}
}
if (log.isDebugEnabled()) {
log.debug("[{}] Handling profile queue config update: creating queues {}, updating {}, deleting {}. Affected tenants: {}",
newTenantProfile.getUuidId(), toCreate, toUpdate, toRemove, tenantIds);
}
tenantIds.forEach(tenantId -> {
toCreate.forEach(key -> saveQueue(new Queue(tenantId, newQueues.get(key))));

8
application/src/main/java/org/thingsboard/server/service/entitiy/tenant/profile/DefaultTbTenantProfileService.java

@ -44,16 +44,14 @@ public class DefaultTbTenantProfileService extends AbstractTbEntityService imple
@Override
public TenantProfile save(TenantId tenantId, TenantProfile tenantProfile, TenantProfile oldTenantProfile) throws ThingsboardException {
TenantProfile savedTenantProfile = checkNotNull(tenantProfileService.saveTenantProfile(tenantId, tenantProfile));
if (oldTenantProfile != null && savedTenantProfile.isIsolatedTbRuleEngine()) {
List<TenantId> tenantIds = tenantService.findTenantIdsByTenantProfileId(savedTenantProfile.getId());
tbQueueService.updateQueuesByTenants(tenantIds, savedTenantProfile, oldTenantProfile);
}
tenantProfileCache.put(savedTenantProfile);
tbClusterService.onTenantProfileChange(savedTenantProfile, null);
tbClusterService.broadcastEntityStateChangeEvent(TenantId.SYS_TENANT_ID, savedTenantProfile.getId(),
tenantProfile.getId() == null ? ComponentLifecycleEvent.CREATED : ComponentLifecycleEvent.UPDATED);
List<TenantId> tenantIds = tenantService.findTenantIdsByTenantProfileId(savedTenantProfile.getId());
tbQueueService.updateQueuesByTenants(tenantIds, savedTenantProfile, oldTenantProfile);
return savedTenantProfile;
}

128
application/src/main/java/org/thingsboard/server/service/queue/DefaultTbRuleEngineConsumerService.java

@ -23,11 +23,14 @@ import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;
import org.thingsboard.common.util.ThingsBoardThreadFactory;
import org.thingsboard.server.actors.ActorSystemContext;
import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.id.EntityIdFactory;
import org.thingsboard.server.common.data.id.QueueId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.queue.Queue;
import org.thingsboard.server.common.data.rpc.RpcError;
import org.thingsboard.server.common.msg.TbMsg;
import org.thingsboard.server.common.msg.gen.MsgProtos;
import org.thingsboard.server.common.msg.queue.QueueToRuleEngineMsg;
import org.thingsboard.server.common.msg.queue.RuleEngineException;
import org.thingsboard.server.common.msg.queue.RuleNodeInfo;
@ -42,12 +45,14 @@ import org.thingsboard.server.dao.tenant.TbTenantProfileCache;
import org.thingsboard.server.gen.transport.TransportProtos;
import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineMsg;
import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineNotificationMsg;
import org.thingsboard.server.queue.TbQueueAdmin;
import org.thingsboard.server.queue.TbQueueConsumer;
import org.thingsboard.server.queue.common.TbProtoQueueMsg;
import org.thingsboard.server.queue.discovery.PartitionService;
import org.thingsboard.server.queue.discovery.QueueKey;
import org.thingsboard.server.queue.discovery.TbServiceInfoProvider;
import org.thingsboard.server.queue.discovery.event.PartitionChangeEvent;
import org.thingsboard.server.queue.provider.TbQueueProducerProvider;
import org.thingsboard.server.queue.provider.TbRuleEngineQueueFactory;
import org.thingsboard.server.queue.util.DataDecodingEncodingService;
import org.thingsboard.server.queue.util.TbRuleEngineComponent;
@ -98,6 +103,8 @@ public class DefaultTbRuleEngineConsumerService extends AbstractConsumerService<
private boolean statsEnabled;
@Value("${queue.rule-engine.prometheus-stats.enabled:false}")
boolean prometheusStatsEnabled;
@Value("${queue.rule-engine.topic-deletion-delay:30}")
private int topicDeletionDelayInSec;
private final StatsFactory statsFactory;
private final TbRuleEngineSubmitStrategyFactory submitStrategyFactory;
@ -107,7 +114,8 @@ public class DefaultTbRuleEngineConsumerService extends AbstractConsumerService<
private final TbRuleEngineDeviceRpcService tbDeviceRpcService;
private final TbServiceInfoProvider serviceInfoProvider;
private final QueueService queueService;
// private final TenantId tenantId;
private final TbQueueProducerProvider producerProvider;
private final TbQueueAdmin queueAdmin;
private final ConcurrentMap<QueueKey, TbQueueConsumer<TbProtoQueueMsg<ToRuleEngineMsg>>> consumers = new ConcurrentHashMap<>();
private final ConcurrentMap<QueueKey, Queue> consumerConfigurations = new ConcurrentHashMap<>();
private final ConcurrentMap<QueueKey, TbRuleEngineConsumerStats> consumerStats = new ConcurrentHashMap<>();
@ -128,7 +136,8 @@ public class DefaultTbRuleEngineConsumerService extends AbstractConsumerService<
TbTenantProfileCache tenantProfileCache,
TbApiUsageStateService apiUsageStateService,
PartitionService partitionService, ApplicationEventPublisher eventPublisher,
TbServiceInfoProvider serviceInfoProvider, QueueService queueService) {
TbServiceInfoProvider serviceInfoProvider, QueueService queueService,
TbQueueProducerProvider producerProvider, TbQueueAdmin queueAdmin) {
super(actorContext, encodingService, tenantProfileCache, deviceProfileCache, assetProfileCache, apiUsageStateService, partitionService, eventPublisher, tbRuleEngineQueueFactory.createToRuleEngineNotificationsMsgConsumer(), Optional.empty());
this.statisticsService = statisticsService;
this.tbRuleEngineQueueFactory = tbRuleEngineQueueFactory;
@ -138,6 +147,8 @@ public class DefaultTbRuleEngineConsumerService extends AbstractConsumerService<
this.statsFactory = statsFactory;
this.serviceInfoProvider = serviceInfoProvider;
this.queueService = queueService;
this.producerProvider = producerProvider;
this.queueAdmin = queueAdmin;
}
@PostConstruct
@ -145,14 +156,16 @@ public class DefaultTbRuleEngineConsumerService extends AbstractConsumerService<
super.init("tb-rule-engine-consumer", "tb-rule-engine-notifications-consumer");
List<Queue> queues = queueService.findAllQueues();
for (Queue configuration : queues) {
initConsumer(configuration);
if (partitionService.isManagedByCurrentService(configuration.getTenantId())) {
initConsumer(configuration);
}
}
}
private void initConsumer(Queue configuration) {
QueueKey queueKey = new QueueKey(ServiceType.TB_RULE_ENGINE, configuration);
consumerConfigurations.putIfAbsent(queueKey, configuration);
consumerStats.putIfAbsent(queueKey, new TbRuleEngineConsumerStats(configuration.getName(), statsFactory));
consumerStats.putIfAbsent(queueKey, new TbRuleEngineConsumerStats(configuration, statsFactory));
if (!configuration.isConsumerPerPartition()) {
consumers.computeIfAbsent(queueKey, queueName -> tbRuleEngineQueueFactory.createToRuleEngineMsgConsumer(configuration));
} else {
@ -172,7 +185,12 @@ public class DefaultTbRuleEngineConsumerService extends AbstractConsumerService<
if (event.getServiceType().equals(getServiceType())) {
String serviceQueue = event.getQueueKey().getQueueName();
log.info("[{}] Subscribing to partitions: {}", serviceQueue, event.getPartitions());
if (!consumerConfigurations.get(event.getQueueKey()).isConsumerPerPartition()) {
Queue configuration = consumerConfigurations.get(event.getQueueKey());
if (configuration == null) {
log.warn("Received invalid partition change event for {} that is not managed by this service", event.getQueueKey());
return;
}
if (!configuration.isConsumerPerPartition()) {
consumers.get(event.getQueueKey()).subscribe(event.getPartitions());
} else {
log.info("[{}] Subscribing consumer per partition: {}", serviceQueue, event.getPartitions());
@ -230,7 +248,6 @@ public class DefaultTbRuleEngineConsumerService extends AbstractConsumerService<
launchConsumer(consumer, consumerConfigurations.get(queueKey), consumerStats.get(queueKey), "" + queueKey + "-" + tpi.getPartition().orElse(-999999));
consumer.subscribe(Collections.singleton(tpi));
});
} finally {
tbTopicWithConsumerPerPartition.getLock().unlock();
}
@ -278,9 +295,9 @@ public class DefaultTbRuleEngineConsumerService extends AbstractConsumerService<
void consumerLoop(TbQueueConsumer<TbProtoQueueMsg<ToRuleEngineMsg>> consumer, org.thingsboard.server.common.data.queue.Queue configuration, TbRuleEngineConsumerStats stats, String threadSuffix) {
updateCurrentThreadName(threadSuffix);
while (!stopped && !consumer.isStopped()) {
while (!stopped && !consumer.isStopped() && !consumer.isQueueDeleted()) {
try {
List<TbProtoQueueMsg<ToRuleEngineMsg>> msgs = consumer.poll(pollDuration);
List<TbProtoQueueMsg<ToRuleEngineMsg>> msgs = consumer.poll(configuration.getPollInterval());
if (msgs.isEmpty()) {
continue;
}
@ -328,6 +345,10 @@ public class DefaultTbRuleEngineConsumerService extends AbstractConsumerService<
}
}
}
if (consumer.isQueueDeleted()) {
processQueueDeletion(configuration, consumer);
}
log.info("TB Rule Engine Consumer stopped.");
}
@ -425,32 +446,34 @@ public class DefaultTbRuleEngineConsumerService extends AbstractConsumerService<
private void updateQueue(TransportProtos.QueueUpdateMsg queueUpdateMsg) {
log.info("Received queue update msg: [{}]", queueUpdateMsg);
String queueName = queueUpdateMsg.getQueueName();
TenantId tenantId = new TenantId(new UUID(queueUpdateMsg.getTenantIdMSB(), queueUpdateMsg.getTenantIdLSB()));
QueueId queueId = new QueueId(new UUID(queueUpdateMsg.getQueueIdMSB(), queueUpdateMsg.getQueueIdLSB()));
QueueKey queueKey = new QueueKey(ServiceType.TB_RULE_ENGINE, queueUpdateMsg.getQueueName(), tenantId);
Queue queue = queueService.findQueueById(tenantId, queueId);
Queue oldQueue = consumerConfigurations.remove(queueKey);
if (oldQueue != null) {
if (oldQueue.isConsumerPerPartition()) {
TbTopicWithConsumerPerPartition consumerPerPartition = topicsConsumerPerPartition.remove(queueKey);
ReentrantLock lock = consumerPerPartition.getLock();
try {
lock.lock();
consumerPerPartition.getConsumers().values().forEach(TbQueueConsumer::unsubscribe);
} finally {
lock.unlock();
if (partitionService.isManagedByCurrentService(tenantId)) {
QueueId queueId = new QueueId(new UUID(queueUpdateMsg.getQueueIdMSB(), queueUpdateMsg.getQueueIdLSB()));
String queueName = queueUpdateMsg.getQueueName();
QueueKey queueKey = new QueueKey(ServiceType.TB_RULE_ENGINE, queueName, tenantId);
Queue queue = queueService.findQueueById(tenantId, queueId);
Queue oldQueue = consumerConfigurations.remove(queueKey);
if (oldQueue != null) {
if (oldQueue.isConsumerPerPartition()) {
TbTopicWithConsumerPerPartition consumerPerPartition = topicsConsumerPerPartition.remove(queueKey);
ReentrantLock lock = consumerPerPartition.getLock();
try {
lock.lock();
consumerPerPartition.getConsumers().values().forEach(TbQueueConsumer::unsubscribe);
} finally {
lock.unlock();
}
} else {
TbQueueConsumer<TbProtoQueueMsg<ToRuleEngineMsg>> consumer = consumers.remove(queueKey);
consumer.unsubscribe();
}
} else {
TbQueueConsumer<TbProtoQueueMsg<ToRuleEngineMsg>> consumer = consumers.remove(queueKey);
consumer.unsubscribe();
}
}
initConsumer(queue);
initConsumer(queue);
if (!queue.isConsumerPerPartition()) {
launchConsumer(consumers.get(queueKey), consumerConfigurations.get(queueKey), consumerStats.get(queueKey), queueName);
if (!queue.isConsumerPerPartition()) {
launchConsumer(consumers.get(queueKey), consumerConfigurations.get(queueKey), consumerStats.get(queueKey), queueName);
}
}
partitionService.updateQueue(queueUpdateMsg);
@ -462,22 +485,22 @@ public class DefaultTbRuleEngineConsumerService extends AbstractConsumerService<
TenantId tenantId = new TenantId(new UUID(queueDeleteMsg.getTenantIdMSB(), queueDeleteMsg.getTenantIdLSB()));
QueueKey queueKey = new QueueKey(ServiceType.TB_RULE_ENGINE, queueDeleteMsg.getQueueName(), tenantId);
partitionService.removeQueue(queueDeleteMsg);
Queue queue = consumerConfigurations.remove(queueKey);
if (queue != null) {
if (queue.isConsumerPerPartition()) {
TbTopicWithConsumerPerPartition tbTopicWithConsumerPerPartition = topicsConsumerPerPartition.remove(queueKey);
if (tbTopicWithConsumerPerPartition != null) {
tbTopicWithConsumerPerPartition.getConsumers().values().forEach(TbQueueConsumer::unsubscribe);
tbTopicWithConsumerPerPartition.getConsumers().values().forEach(TbQueueConsumer::onQueueDelete);
tbTopicWithConsumerPerPartition.getConsumers().clear();
}
} else {
TbQueueConsumer<TbProtoQueueMsg<ToRuleEngineMsg>> consumer = consumers.remove(queueKey);
if (consumer != null) {
consumer.unsubscribe();
consumer.onQueueDelete();
}
}
}
partitionService.removeQueue(queueDeleteMsg);
}
private void forwardToRuleEngineActor(String queueName, TenantId tenantId, ToRuleEngineMsg toRuleEngineMsg, TbMsgCallback callback) {
@ -496,6 +519,47 @@ public class DefaultTbRuleEngineConsumerService extends AbstractConsumerService<
actorContext.tell(msg);
}
private void processQueueDeletion(Queue queue, TbQueueConsumer<TbProtoQueueMsg<ToRuleEngineMsg>> consumer) {
long finishTs = System.currentTimeMillis() + TimeUnit.SECONDS.toMillis(topicDeletionDelayInSec);
try {
int n = 0;
while (System.currentTimeMillis() <= finishTs) {
List<TbProtoQueueMsg<ToRuleEngineMsg>> msgs = consumer.poll(queue.getPollInterval());
if (msgs.isEmpty()) {
continue;
}
for (TbProtoQueueMsg<ToRuleEngineMsg> msg : msgs) {
try {
MsgProtos.TbMsgProto tbMsgProto = MsgProtos.TbMsgProto.parseFrom(msg.getValue().getTbMsg().toByteArray());
EntityId originator = EntityIdFactory.getByTypeAndUuid(tbMsgProto.getEntityType(), new UUID(tbMsgProto.getEntityIdMSB(), tbMsgProto.getEntityIdLSB()));
TopicPartitionInfo tpi = partitionService.resolve(ServiceType.TB_RULE_ENGINE, queue.getName(), TenantId.SYS_TENANT_ID, originator);
producerProvider.getRuleEngineMsgProducer().send(tpi, msg, null);
n++;
} catch (Throwable e) {
log.debug("Failed to move message to system {}: {}", consumer.getTopic(), msg, e);
}
}
consumer.commit();
}
if (n > 0) {
log.info("Moved {} messages from {} to system {}", n, consumer.getFullTopicNames(), consumer.getTopic());
}
consumer.unsubscribe();
for (String topic : consumer.getFullTopicNames()) {
try {
queueAdmin.deleteTopic(topic);
log.info("Deleted topic {}", topic);
} catch (Exception e) {
log.error("Failed to delete topic {} after unsubscribing", topic, e);
}
}
} catch (Exception e) {
log.error("Failed to process deletion of {} ({})", consumer.getTopic(), queue.getTenantId(), e);
}
}
@Scheduled(fixedDelayString = "${queue.rule-engine.stats.print-interval-ms}")
public void printStats() {
if (statsEnabled) {

8
application/src/main/java/org/thingsboard/server/service/queue/DefaultTenantRoutingInfoService.java

@ -22,7 +22,6 @@ import org.thingsboard.server.common.data.TenantProfile;
import org.thingsboard.server.common.data.exception.TenantNotFoundException;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.dao.tenant.TbTenantProfileCache;
import org.thingsboard.server.dao.tenant.TenantService;
import org.thingsboard.server.queue.discovery.TenantRoutingInfo;
import org.thingsboard.server.queue.discovery.TenantRoutingInfoService;
@ -31,12 +30,9 @@ import org.thingsboard.server.queue.discovery.TenantRoutingInfoService;
@ConditionalOnExpression("'${service.type:null}'=='monolith' || '${service.type:null}'=='tb-core' || '${service.type:null}'=='tb-rule-engine'")
public class DefaultTenantRoutingInfoService implements TenantRoutingInfoService {
private final TenantService tenantService;
private final TbTenantProfileCache tenantProfileCache;
public DefaultTenantRoutingInfoService(TenantService tenantService, TbTenantProfileCache tenantProfileCache) {
this.tenantService = tenantService;
public DefaultTenantRoutingInfoService(TbTenantProfileCache tenantProfileCache) {
this.tenantProfileCache = tenantProfileCache;
}
@ -44,7 +40,7 @@ public class DefaultTenantRoutingInfoService implements TenantRoutingInfoService
public TenantRoutingInfo getRoutingInfo(TenantId tenantId) {
TenantProfile tenantProfile = tenantProfileCache.get(tenantId);
if (tenantProfile != null) {
return new TenantRoutingInfo(tenantId, tenantProfile.isIsolatedTbRuleEngine());
return new TenantRoutingInfo(tenantId, tenantProfile.getId(), tenantProfile.isIsolatedTbRuleEngine());
} else {
throw new TenantNotFoundException(tenantId);
}

13
application/src/main/java/org/thingsboard/server/service/queue/TbRuleEngineConsumerStats.java

@ -18,6 +18,7 @@ package org.thingsboard.server.service.queue;
import io.micrometer.core.instrument.Timer;
import lombok.extern.slf4j.Slf4j;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.queue.Queue;
import org.thingsboard.server.common.msg.queue.RuleEngineException;
import org.thingsboard.server.common.stats.StatsCounter;
import org.thingsboard.server.common.stats.StatsFactory;
@ -63,9 +64,11 @@ public class TbRuleEngineConsumerStats {
private final ConcurrentMap<TenantId, RuleEngineException> tenantExceptions = new ConcurrentHashMap<>();
private final String queueName;
private final TenantId tenantId;
public TbRuleEngineConsumerStats(String queueName, StatsFactory statsFactory) {
this.queueName = queueName;
public TbRuleEngineConsumerStats(Queue queue, StatsFactory statsFactory) {
this.queueName = queue.getName();
this.tenantId = queue.getTenantId();
this.statsFactory = statsFactory;
String statsKey = StatsType.RULE_ENGINE.getName() + "." + queueName;
@ -156,7 +159,11 @@ public class TbRuleEngineConsumerStats {
counters.forEach(counter -> {
stats.append(counter.getName()).append(" = [").append(counter.get()).append("] ");
});
log.info("[{}] Stats: {}", queueName, stats);
if (tenantId.isSysTenantId()) {
log.info("[{}] Stats: {}", queueName, stats);
} else {
log.info("[{}][{}] Stats: {}", queueName, tenantId, stats);
}
}
}

7
application/src/main/resources/thingsboard.yml

@ -1091,6 +1091,7 @@ queue:
fetch_max_bytes: "${TB_QUEUE_KAFKA_FETCH_MAX_BYTES:134217728}"
request.timeout.ms: "${TB_QUEUE_KAFKA_REQUEST_TIMEOUT_MS:30000}" # (30 seconds) # refer to https://docs.confluent.io/platform/current/installation/configuration/producer-configs.html#producerconfigs_request.timeout.ms
session.timeout.ms: "${TB_QUEUE_KAFKA_SESSION_TIMEOUT_MS:10000}" # (10 seconds) # refer to https://docs.confluent.io/platform/current/installation/configuration/consumer-configs.html#consumerconfigs_session.timeout.ms
auto_offset_reset: "${TB_QUEUE_KAFKA_AUTO_OFFSET_RESET:earliest}" # earliest, latest or none
use_confluent_cloud: "${TB_QUEUE_KAFKA_USE_CONFLUENT_CLOUD:false}"
confluent:
ssl.algorithm: "${TB_QUEUE_KAFKA_CONFLUENT_SSL_ALGORITHM:https}"
@ -1287,6 +1288,8 @@ queue:
failure-percentage: "${TB_QUEUE_RE_SQ_PROCESSING_STRATEGY_FAILURE_PERCENTAGE:0}" # Skip retry if failures or timeouts are less then X percentage of messages;
pause-between-retries: "${TB_QUEUE_RE_SQ_PROCESSING_STRATEGY_RETRY_PAUSE:5}" # Time in seconds to wait in consumer thread before retries;
max-pause-between-retries: "${TB_QUEUE_RE_SQ_PROCESSING_STRATEGY_MAX_RETRY_PAUSE:5}" # Max allowed time in seconds for pause between retries.
# After a queue is deleted (or profile's isolation option was disabled), Rule Engine will continue reading related topics during this period, before deleting the actual topics
topic-deletion-delay: "${TB_QUEUE_RULE_ENGINE_TOPIC_DELETION_DELAY_SEC:30}"
transport:
# For high priority notifications that require minimum latency and processing time
notifications_topic: "${TB_QUEUE_TRANSPORT_NOTIFICATIONS_TOPIC:tb_transport.notifications}"
@ -1300,6 +1303,10 @@ service:
type: "${TB_SERVICE_TYPE:monolith}" # monolith or tb-core or tb-rule-engine
# Unique id for this service (autogenerated if empty)
id: "${TB_SERVICE_ID:}"
rule_engine:
# Comma-separated list of tenant profiles ids assigned to this Rule Engine.
# This Rule Engine will only be responsible for tenants with these profiles (in case 'isolation' option is enabled in profile).
assigned_tenant_profiles: "${TB_RULE_ENGINE_ASSIGNED_TENANT_PROFILES:}"
metrics:
# Enable/disable actuator metrics.

21
application/src/test/java/org/thingsboard/server/controller/AbstractWebTest.java

@ -1029,13 +1029,20 @@ public abstract class AbstractWebTest extends AbstractInMemoryStorageTest {
return (DeviceActorMessageProcessor) ReflectionTestUtils.getField(actor, "processor");
}
protected void updateDefaultTenantProfile(Consumer<DefaultTenantProfileConfiguration> updater) throws ThingsboardException {
TenantProfile tenantProfile = tenantProfileService.findDefaultTenantProfile(TenantId.SYS_TENANT_ID);
TenantProfileData profileData = tenantProfile.getProfileData();
DefaultTenantProfileConfiguration profileConfiguration = (DefaultTenantProfileConfiguration) profileData.getConfiguration();
updater.accept(profileConfiguration);
tenantProfile.setProfileData(profileData);
tbTenantProfileService.save(TenantId.SYS_TENANT_ID, tenantProfile, null);
protected void updateDefaultTenantProfileConfig(Consumer<DefaultTenantProfileConfiguration> updater) throws ThingsboardException {
updateDefaultTenantProfile(tenantProfile -> {
TenantProfileData profileData = tenantProfile.getProfileData();
DefaultTenantProfileConfiguration profileConfiguration = (DefaultTenantProfileConfiguration) profileData.getConfiguration();
updater.accept(profileConfiguration);
tenantProfile.setProfileData(profileData);
});
}
protected void updateDefaultTenantProfile(Consumer<TenantProfile> updater) throws ThingsboardException {
TenantProfile oldTenantProfile = tenantProfileService.findDefaultTenantProfile(TenantId.SYS_TENANT_ID);
TenantProfile tenantProfile = JacksonUtil.clone(oldTenantProfile);
updater.accept(tenantProfile);
tbTenantProfileService.save(TenantId.SYS_TENANT_ID, tenantProfile, oldTenantProfile);
}
}

248
application/src/test/java/org/thingsboard/server/controller/TenantControllerTest.java

@ -27,15 +27,20 @@ import org.junit.Before;
import org.junit.Test;
import org.mockito.ArgumentMatcher;
import org.mockito.Mockito;
import org.springframework.boot.test.mock.mockito.SpyBean;
import org.springframework.test.context.TestPropertySource;
import org.springframework.test.web.servlet.ResultActions;
import org.thingsboard.common.util.ThingsBoardExecutors;
import org.thingsboard.server.actors.ActorSystemContext;
import org.thingsboard.server.common.data.DataConstants;
import org.thingsboard.server.common.data.Device;
import org.thingsboard.server.common.data.DeviceProfile;
import org.thingsboard.server.common.data.StringUtils;
import org.thingsboard.server.common.data.Tenant;
import org.thingsboard.server.common.data.TenantInfo;
import org.thingsboard.server.common.data.TenantProfile;
import org.thingsboard.server.common.data.User;
import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.page.PageData;
import org.thingsboard.server.common.data.page.PageLink;
@ -50,24 +55,47 @@ import org.thingsboard.server.common.data.tenant.profile.DefaultTenantProfileCon
import org.thingsboard.server.common.data.tenant.profile.TenantProfileData;
import org.thingsboard.server.common.data.tenant.profile.TenantProfileQueueConfiguration;
import org.thingsboard.server.dao.service.DaoSqlTest;
import org.thingsboard.server.common.msg.TbMsg;
import org.thingsboard.server.common.msg.TbMsgMetaData;
import org.thingsboard.server.common.msg.queue.QueueToRuleEngineMsg;
import org.thingsboard.server.common.msg.queue.ServiceType;
import org.thingsboard.server.common.msg.queue.TopicPartitionInfo;
import org.thingsboard.server.gen.transport.TransportProtos;
import org.thingsboard.server.queue.TbQueueAdmin;
import org.thingsboard.server.queue.discovery.PartitionService;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.Deque;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import static org.assertj.core.api.Assertions.assertThat;
import static org.awaitility.Awaitility.await;
import static org.hamcrest.Matchers.containsString;
import static org.mockito.ArgumentMatchers.argThat;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.timeout;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status;
import static org.thingsboard.server.common.data.DataConstants.MAIN_QUEUE_NAME;
import static org.thingsboard.server.common.data.DataConstants.MAIN_QUEUE_TOPIC;
@TestPropertySource(properties = {
"js.evaluator=mock",
"queue.rule-engine.topic-deletion-delay=10"
})
@Slf4j
@DaoSqlTest
@ -80,6 +108,13 @@ public class TenantControllerTest extends AbstractControllerTest {
ListeningExecutorService executor;
@SpyBean
private PartitionService partitionService;
@SpyBean
private ActorSystemContext actorContext;
@SpyBean
private TbQueueAdmin queueAdmin;
@Before
public void setUp() throws Exception {
executor = MoreExecutors.listeningDecorator(ThingsBoardExecutors.newWorkStealingPool(8, getClass()));
@ -87,6 +122,12 @@ public class TenantControllerTest extends AbstractControllerTest {
@After
public void tearDown() throws Exception {
loginSysAdmin();
for (Queue queue : doGetTypedWithPageLink("/api/queues?serviceType=TB_RULE_ENGINE&", new TypeReference<PageData<Queue>>() {}, new PageLink(100)).getData()) {
if (!queue.getName().equals(MAIN_QUEUE_NAME)) {
doDelete("/api/queues/" + queue.getId()).andExpect(status().isOk());
}
}
executor.shutdownNow();
}
@ -430,7 +471,7 @@ public class TenantControllerTest extends AbstractControllerTest {
tenantProfileData.setConfiguration(new DefaultTenantProfileConfiguration());
tenantProfile.setProfileData(tenantProfileData);
tenantProfile.setIsolatedTbRuleEngine(true);
addQueueConfig(tenantProfile, DataConstants.MAIN_QUEUE_NAME);
addQueueConfig(tenantProfile, MAIN_QUEUE_NAME);
addQueueConfig(tenantProfile, "Test");
tenantProfile = doPost("/api/tenantProfile", tenantProfile, TenantProfile.class);
@ -459,7 +500,7 @@ public class TenantControllerTest extends AbstractControllerTest {
tenantProfileData2.setConfiguration(new DefaultTenantProfileConfiguration());
tenantProfile2.setProfileData(tenantProfileData2);
tenantProfile2.setIsolatedTbRuleEngine(true);
addQueueConfig(tenantProfile2, DataConstants.MAIN_QUEUE_NAME);
addQueueConfig(tenantProfile2, MAIN_QUEUE_NAME);
addQueueConfig(tenantProfile2, "Test");
addQueueConfig(tenantProfile2, "Test2");
tenantProfile2 = doPost("/api/tenantProfile", tenantProfile2, TenantProfile.class);
@ -520,10 +561,201 @@ public class TenantControllerTest extends AbstractControllerTest {
doDelete("/api/tenant/" + tenant.getId().getId().toString()).andExpect(status().isOk());
}
@Test
public void testUpdateTenantProfileToIsolated() throws Exception {
loginSysAdmin();
doPost("/api/queues?serviceType=TB_RULE_ENGINE", new Queue(TenantId.SYS_TENANT_ID, getQueueConfig(DataConstants.HP_QUEUE_NAME, DataConstants.HP_QUEUE_TOPIC))).andExpect(status().isOk());
TenantProfile tenantProfile = new TenantProfile();
tenantProfile.setName("Test profile");
TenantProfileData tenantProfileData = new TenantProfileData();
tenantProfileData.setConfiguration(new DefaultTenantProfileConfiguration());
tenantProfile.setProfileData(tenantProfileData);
tenantProfile.setIsolatedTbRuleEngine(false);
tenantProfile = doPost("/api/tenantProfile", tenantProfile, TenantProfile.class);
createDifferentTenant();
loginSysAdmin();
savedDifferentTenant.setTenantProfileId(tenantProfile.getId());
savedDifferentTenant = doPost("/api/tenant", savedDifferentTenant, Tenant.class);
TenantId tenantId = differentTenantId;
loginDifferentTenant();
DeviceProfile hpQueueProfile = createDeviceProfile("HighPriority profile");
hpQueueProfile.setDefaultQueueName(DataConstants.HP_QUEUE_NAME);
hpQueueProfile = doPost("/api/deviceProfile", hpQueueProfile, DeviceProfile.class);
Device hpQueueDevice = createDevice("HP", hpQueueProfile.getName(), "HP");
DeviceProfile mainQueueProfile = createDeviceProfile("Main profile");
mainQueueProfile.setDefaultQueueName(MAIN_QUEUE_NAME);
mainQueueProfile = doPost("/api/deviceProfile", mainQueueProfile, DeviceProfile.class);
Device mainQueueDevice = createDevice("Main", mainQueueProfile.getName(), "Main");
verifyUsedQueueAndMessage(DataConstants.HP_QUEUE_NAME, tenantId, hpQueueDevice.getId(), DataConstants.ATTRIBUTES_UPDATED, () -> {
doPost("/api/plugins/telemetry/DEVICE/" + hpQueueDevice.getId() + "/attributes/SERVER_SCOPE", "{\"test\":123}", String.class);
}, usedTpi -> {
assertThat(usedTpi.getTopic()).isEqualTo(DataConstants.HP_QUEUE_TOPIC);
assertThat(usedTpi.getTenantId()).get().isEqualTo(TenantId.SYS_TENANT_ID);
});
verifyUsedQueueAndMessage(MAIN_QUEUE_NAME, tenantId, mainQueueDevice.getId(), DataConstants.ATTRIBUTES_UPDATED, () -> {
doPost("/api/plugins/telemetry/DEVICE/" + mainQueueDevice.getId() + "/attributes/SERVER_SCOPE", "{\"test\":123}", String.class);
}, usedTpi -> {
assertThat(usedTpi.getTopic()).isEqualTo(MAIN_QUEUE_TOPIC);
assertThat(usedTpi.getTenantId()).get().isEqualTo(TenantId.SYS_TENANT_ID);
});
loginSysAdmin();
tenantProfile.setIsolatedTbRuleEngine(true);
tenantProfile.getProfileData().setQueueConfiguration(List.of(
getQueueConfig(MAIN_QUEUE_NAME, MAIN_QUEUE_TOPIC)
));
tenantProfile = doPost("/api/tenantProfile", tenantProfile, TenantProfile.class);
loginDifferentTenant();
verifyUsedQueueAndMessage(MAIN_QUEUE_NAME, tenantId, mainQueueDevice.getId(), DataConstants.ATTRIBUTES_UPDATED, () -> {
doPost("/api/plugins/telemetry/DEVICE/" + mainQueueDevice.getId() + "/attributes/SERVER_SCOPE", "{\"test\":123}", String.class);
}, usedTpi -> {
assertThat(usedTpi.getTopic()).isEqualTo(MAIN_QUEUE_TOPIC);
assertThat(usedTpi.getTenantId()).get().isEqualTo(tenantId);
});
verifyUsedQueueAndMessage(DataConstants.HP_QUEUE_NAME, tenantId, hpQueueDevice.getId(), DataConstants.ATTRIBUTES_UPDATED, () -> {
doPost("/api/plugins/telemetry/DEVICE/" + hpQueueDevice.getId() + "/attributes/SERVER_SCOPE", "{\"test\":123}", String.class);
}, usedTpi -> {
assertThat(usedTpi.getTopic()).isEqualTo(DataConstants.HP_QUEUE_TOPIC);
assertThat(usedTpi.getTenantId()).get().isEqualTo(TenantId.SYS_TENANT_ID);
});
loginSysAdmin();
tenantProfile.setIsolatedTbRuleEngine(true);
tenantProfile.getProfileData().setQueueConfiguration(List.of(
getQueueConfig(MAIN_QUEUE_NAME, MAIN_QUEUE_TOPIC),
getQueueConfig(DataConstants.HP_QUEUE_NAME, DataConstants.HP_QUEUE_TOPIC)
));
tenantProfile = doPost("/api/tenantProfile", tenantProfile, TenantProfile.class);
loginDifferentTenant();
verifyUsedQueueAndMessage(DataConstants.HP_QUEUE_NAME, tenantId, hpQueueDevice.getId(), DataConstants.ATTRIBUTES_UPDATED, () -> {
doPost("/api/plugins/telemetry/DEVICE/" + hpQueueDevice.getId() + "/attributes/SERVER_SCOPE", "{\"test\":123}", String.class);
}, usedTpi -> {
assertThat(usedTpi.getTopic()).isEqualTo(DataConstants.HP_QUEUE_TOPIC);
assertThat(usedTpi.getTenantId()).get().isEqualTo(tenantId);
});
verifyUsedQueueAndMessage(MAIN_QUEUE_NAME, tenantId, mainQueueDevice.getId(), DataConstants.ATTRIBUTES_UPDATED, () -> {
doPost("/api/plugins/telemetry/DEVICE/" + mainQueueDevice.getId() + "/attributes/SERVER_SCOPE", "{\"test\":123}", String.class);
}, usedTpi -> {
assertThat(usedTpi.getTopic()).isEqualTo(MAIN_QUEUE_TOPIC);
assertThat(usedTpi.getTenantId()).get().isEqualTo(tenantId);
});
}
@Test
public void testIsolatedQueueDeletion() throws Exception {
loginSysAdmin();
TenantProfile tenantProfile = new TenantProfile();
tenantProfile.setName("Test profile");
TenantProfileData tenantProfileData = new TenantProfileData();
tenantProfileData.setConfiguration(new DefaultTenantProfileConfiguration());
tenantProfile.setProfileData(tenantProfileData);
tenantProfile.setIsolatedTbRuleEngine(true);
addQueueConfig(tenantProfile, MAIN_QUEUE_NAME);
tenantProfile = doPost("/api/tenantProfile", tenantProfile, TenantProfile.class);
createDifferentTenant();
loginSysAdmin();
savedDifferentTenant.setTenantProfileId(tenantProfile.getId());
savedDifferentTenant = doPost("/api/tenant", savedDifferentTenant, Tenant.class);
TenantId tenantId = differentTenantId;
await().atMost(10, TimeUnit.SECONDS)
.until(() -> {
TopicPartitionInfo tpi = partitionService.resolve(ServiceType.TB_RULE_ENGINE, MAIN_QUEUE_NAME, tenantId, tenantId);
return !tpi.getTenantId().get().isSysTenantId();
});
TopicPartitionInfo tpi = new TopicPartitionInfo(MAIN_QUEUE_TOPIC, tenantId, 0, false);
String isolatedTopic = tpi.getFullTopicName();
TbMsg expectedMsg = publishTbMsg(tenantId, tpi);
awaitTbMsg(tbMsg -> tbMsg.getId().equals(expectedMsg.getId()), 10000); // to wait for consumer start
loginSysAdmin();
tenantProfile.setIsolatedTbRuleEngine(false);
tenantProfile.getProfileData().setQueueConfiguration(Collections.emptyList());
tenantProfile = doPost("/api/tenantProfile", tenantProfile, TenantProfile.class);
await().atMost(10, TimeUnit.SECONDS)
.until(() -> partitionService.resolve(ServiceType.TB_RULE_ENGINE, MAIN_QUEUE_NAME, tenantId, tenantId)
.getTenantId().get().isSysTenantId());
Deque<UUID> submittedMsgs = new LinkedList<>();
await().atLeast(8, TimeUnit.SECONDS) // due to topic-deletion-delay
.atMost(20, TimeUnit.SECONDS)
.pollInterval(1, TimeUnit.SECONDS)
.untilAsserted(() -> {
TbMsg tbMsg = publishTbMsg(tenantId, tpi);
submittedMsgs.add(tbMsg.getId());
verify(queueAdmin, times(1)).deleteTopic(eq(isolatedTopic));
});
submittedMsgs.removeLast();
for (UUID msgId : submittedMsgs) {
verify(actorContext, timeout(2000)).tell(argThat(msg -> {
return msg instanceof QueueToRuleEngineMsg && ((QueueToRuleEngineMsg) msg).getMsg().getId().equals(msgId);
}));
}
}
private TbMsg publishTbMsg(TenantId tenantId, TopicPartitionInfo tpi) {
TbMsg tbMsg = TbMsg.newMsg("POST_TELEMETRY_REQUEST", tenantId, TbMsgMetaData.EMPTY, "{\"test\":1}");
TransportProtos.ToRuleEngineMsg msg = TransportProtos.ToRuleEngineMsg.newBuilder()
.setTenantIdMSB(tenantId.getId().getMostSignificantBits())
.setTenantIdLSB(tenantId.getId().getLeastSignificantBits())
.setTbMsg(TbMsg.toByteString(tbMsg)).build();
tbClusterService.pushMsgToRuleEngine(tpi, tbMsg.getId(), msg, null);
return tbMsg;
}
private void verifyUsedQueueAndMessage(String queue, TenantId tenantId, EntityId entityId, String msgType, Runnable action, Consumer<TopicPartitionInfo> tpiAssert) {
await().atMost(15, TimeUnit.SECONDS)
.untilAsserted(() -> {
TopicPartitionInfo tpi = partitionService.resolve(ServiceType.TB_RULE_ENGINE, queue, tenantId, entityId);
tpiAssert.accept(tpi);
});
action.run();
TbMsg tbMsg = awaitTbMsg(msg -> msg.getOriginator().equals(entityId)
&& msg.getType().equals(msgType), 10000);
assertThat(tbMsg.getQueueName()).isEqualTo(queue);
TopicPartitionInfo tpi = partitionService.resolve(ServiceType.TB_RULE_ENGINE, queue, tenantId, entityId);
tpiAssert.accept(tpi);
}
protected TbMsg awaitTbMsg(Predicate<TbMsg> predicate, int timeoutMillis) {
AtomicReference<TbMsg> tbMsgCaptor = new AtomicReference<>();
verify(actorContext, timeout(timeoutMillis).atLeastOnce()).tell(argThat(actorMsg -> {
if (!(actorMsg instanceof QueueToRuleEngineMsg)) {
return false;
}
TbMsg tbMsg = ((QueueToRuleEngineMsg) actorMsg).getMsg();
if (predicate.test(tbMsg)) {
tbMsgCaptor.set(tbMsg);
return true;
}
return false;
}));
return tbMsgCaptor.get();
}
private void addQueueConfig(TenantProfile tenantProfile, String queueName) {
TenantProfileQueueConfiguration queueConfiguration = getQueueConfig(queueName, "tb_rule_engine." + queueName.toLowerCase());
TenantProfileData profileData = tenantProfile.getProfileData();
List<TenantProfileQueueConfiguration> configs = profileData.getQueueConfiguration();
if (configs == null) {
configs = new ArrayList<>();
}
configs.add(queueConfiguration);
profileData.setQueueConfiguration(configs);
tenantProfile.setProfileData(profileData);
}
private TenantProfileQueueConfiguration getQueueConfig(String queueName, String topic) {
TenantProfileQueueConfiguration queueConfiguration = new TenantProfileQueueConfiguration();
queueConfiguration.setName(queueName);
queueConfiguration.setTopic("tb_rule_engine." + queueName.toLowerCase());
queueConfiguration.setTopic(topic);
queueConfiguration.setPollInterval(25);
queueConfiguration.setPartitions(1 + new Random().nextInt(99));
queueConfiguration.setConsumerPerPartition(true);
@ -539,15 +771,7 @@ public class TenantControllerTest extends AbstractControllerTest {
processingStrategy.setPauseBetweenRetries(3);
processingStrategy.setMaxPauseBetweenRetries(3);
queueConfiguration.setProcessingStrategy(processingStrategy);
TenantProfileData profileData = tenantProfile.getProfileData();
List<TenantProfileQueueConfiguration> configs = profileData.getQueueConfiguration();
if (configs == null) {
configs = new ArrayList<>();
}
configs.add(queueConfiguration);
profileData.setQueueConfiguration(configs);
tenantProfile.setProfileData(profileData);
return queueConfiguration;
}
private List<Queue> getQueuesFromConfig(List<TenantProfileQueueConfiguration> queueConfiguration, List<Queue> queues) {

17
application/src/test/java/org/thingsboard/server/controller/TenantProfileControllerTest.java

@ -167,23 +167,6 @@ public class TenantProfileControllerTest extends AbstractControllerTest {
testBroadcastEntityStateChangeEventNeverTenantProfile();
}
@Test
public void testSaveSameTenantProfileWithDifferentIsolatedTbRuleEngine() throws Exception {
loginSysAdmin();
TenantProfile tenantProfile = this.createTenantProfile("Tenant Profile");
TenantProfile savedTenantProfile = doPost("/api/tenantProfile", tenantProfile, TenantProfile.class);
savedTenantProfile.setIsolatedTbRuleEngine(true);
addMainQueueConfig(savedTenantProfile);
Mockito.reset(tbClusterService);
doPost("/api/tenantProfile", savedTenantProfile)
.andExpect(status().isBadRequest())
.andExpect(statusReason(containsString("Can't update isolatedTbRuleEngine property")));
testBroadcastEntityStateChangeEventNeverTenantProfile();
}
@Test
public void testDeleteTenantProfileWithExistingTenant() throws Exception {
loginSysAdmin();

149
application/src/test/java/org/thingsboard/server/queue/discovery/HashPartitionServiceTest.java

@ -18,6 +18,7 @@ package org.thingsboard.server.queue.discovery;
import com.datastax.oss.driver.api.core.uuid.Uuids;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections4.ListUtils;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
@ -25,24 +26,36 @@ import org.junit.runner.RunWith;
import org.mockito.junit.MockitoJUnitRunner;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.test.util.ReflectionTestUtils;
import org.thingsboard.server.common.data.DataConstants;
import org.thingsboard.server.common.data.id.DeviceId;
import org.thingsboard.server.common.data.id.QueueId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.id.TenantProfileId;
import org.thingsboard.server.common.data.id.UUIDBased;
import org.thingsboard.server.common.data.queue.Queue;
import org.thingsboard.server.common.msg.queue.ServiceType;
import org.thingsboard.server.common.msg.queue.TopicPartitionInfo;
import org.thingsboard.server.gen.transport.TransportProtos;
import org.thingsboard.server.gen.transport.TransportProtos.ServiceInfo;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@Slf4j
@RunWith(MockitoJUnitRunner.class)
@ -57,7 +70,7 @@ public class HashPartitionServiceTest {
private ApplicationEventPublisher applicationEventPublisher;
private QueueRoutingInfoService queueRoutingInfoService;
private String hashFunctionName = "sha256";
private String hashFunctionName = "murmur3_128";
@Before
public void setup() throws Exception {
@ -74,15 +87,15 @@ public class HashPartitionServiceTest {
ReflectionTestUtils.setField(clusterRoutingService, "vcTopic", "tb.vc");
ReflectionTestUtils.setField(clusterRoutingService, "vcPartitions", 10);
ReflectionTestUtils.setField(clusterRoutingService, "hashFunctionName", hashFunctionName);
TransportProtos.ServiceInfo currentServer = TransportProtos.ServiceInfo.newBuilder()
ServiceInfo currentServer = ServiceInfo.newBuilder()
.setServiceId("tb-core-0")
.addAllServiceTypes(Collections.singletonList(ServiceType.TB_CORE.name()))
.build();
// when(queueService.resolve(Mockito.any(), Mockito.anyString())).thenAnswer(i -> i.getArguments()[1]);
// when(discoveryService.getServiceInfo()).thenReturn(currentServer);
List<TransportProtos.ServiceInfo> otherServers = new ArrayList<>();
List<ServiceInfo> otherServers = new ArrayList<>();
for (int i = 1; i < SERVER_COUNT; i++) {
otherServers.add(TransportProtos.ServiceInfo.newBuilder()
otherServers.add(ServiceInfo.newBuilder()
.setServiceId("tb-rule-" + i)
.addAllServiceTypes(Collections.singletonList(ServiceType.TB_CORE.name()))
.build());
@ -122,10 +135,10 @@ public class HashPartitionServiceTest {
int queueCount = 3;
int partitionCount = 3;
List<TransportProtos.ServiceInfo> services = new ArrayList<>();
List<ServiceInfo> services = new ArrayList<>();
for (int i = 0; i < serverCount; i++) {
services.add(TransportProtos.ServiceInfo.newBuilder().setServiceId("RE-" + i).build());
services.add(ServiceInfo.newBuilder().setServiceId("RE-" + i).build());
}
long start = System.currentTimeMillis();
@ -140,7 +153,7 @@ public class HashPartitionServiceTest {
for (int queueIndex = 0; queueIndex < queueCount; queueIndex++) {
QueueKey queueKey = new QueueKey(ServiceType.TB_RULE_ENGINE, "queue" + queueIndex, tenantId);
for (int partition = 0; partition < partitionCount; partition++) {
TransportProtos.ServiceInfo serviceInfo = clusterRoutingService.resolveByPartitionIdx(services, queueKey, partition);
ServiceInfo serviceInfo = clusterRoutingService.resolveByPartitionIdx(services, queueKey, partition);
String serviceId = serviceInfo.getServiceId();
map.put(serviceId, map.get(serviceId) + 1);
}
@ -163,4 +176,124 @@ public class HashPartitionServiceTest {
Assert.assertTrue(diffPercent < maxDiffPercent);
}
@Test
public void testPartitionsAssignmentWithDedicatedServers() {
int isolatedProfilesCount = 5;
int tenantsCountPerProfile = 100;
int dedicatedServerSetsCount = 3;
int serversCountPerSet = 3;
int profilesPerSet = (int) Math.ceil((double) isolatedProfilesCount / dedicatedServerSetsCount);
List<TenantProfileId> isolatedTenantProfiles = Stream.generate(() -> new TenantProfileId(UUID.randomUUID()))
.limit(isolatedProfilesCount).collect(Collectors.toList());
Map<TenantId, TenantProfileId> tenants = new HashMap<>();
for (TenantProfileId tenantProfileId : isolatedTenantProfiles) {
for (int i = 0; i < tenantsCountPerProfile; i++) {
tenants.put(new TenantId(UUID.randomUUID()), tenantProfileId);
}
}
List<Queue> queues = new ArrayList<>();
Queue systemQueue = new Queue();
systemQueue.setTenantId(TenantId.SYS_TENANT_ID);
systemQueue.setName("Main");
systemQueue.setTopic(DataConstants.MAIN_QUEUE_TOPIC);
systemQueue.setPartitions(10);
systemQueue.setId(new QueueId(UUID.randomUUID()));
queues.add(systemQueue);
tenants.forEach((tenantId, profileId) -> {
Queue isolatedQueue = new Queue();
isolatedQueue.setTenantId(tenantId);
isolatedQueue.setName("Main");
isolatedQueue.setTopic(DataConstants.MAIN_QUEUE_TOPIC);
isolatedQueue.setPartitions(2);
isolatedQueue.setId(new QueueId(UUID.randomUUID()));
queues.add(isolatedQueue);
when(routingInfoService.getRoutingInfo(eq(tenantId))).thenReturn(new TenantRoutingInfo(tenantId, profileId, true));
});
when(queueRoutingInfoService.getAllQueuesRoutingInfo()).thenReturn(queues.stream()
.map(QueueRoutingInfo::new).collect(Collectors.toList()));
List<ServiceInfo> ruleEngines = new ArrayList<>();
Map<TenantProfileId, List<ServiceInfo>> dedicatedServers = new HashMap<>();
int serviceId = 0;
for (int i = 0; i < serversCountPerSet; i++) {
ServiceInfo commonServer = ServiceInfo.newBuilder()
.setServiceId("tb-rule-engine-" + serviceId)
.addAllServiceTypes(List.of(ServiceType.TB_RULE_ENGINE.name()))
.build();
ruleEngines.add(commonServer);
serviceId++;
}
for (int i = 0; i < dedicatedServerSetsCount; i++) {
List<TenantProfileId> assignedProfiles = ListUtils.partition(isolatedTenantProfiles, profilesPerSet).get(i);
for (int j = 0; j < serversCountPerSet; j++) {
ServiceInfo dedicatedServer = ServiceInfo.newBuilder()
.setServiceId("tb-rule-engine-" + serviceId)
.addAllServiceTypes(List.of(ServiceType.TB_RULE_ENGINE.name()))
.addAllAssignedTenantProfiles(assignedProfiles.stream().map(UUIDBased::toString).collect(Collectors.toList()))
.build();
ruleEngines.add(dedicatedServer);
serviceId++;
for (TenantProfileId assignedProfileId : assignedProfiles) {
dedicatedServers.computeIfAbsent(assignedProfileId, p -> new ArrayList<>()).add(dedicatedServer);
}
}
}
Map<QueueKey, Map<ServiceInfo, List<Integer>>> serversPartitions = new HashMap<>();
clusterRoutingService.init();
for (ServiceInfo ruleEngine : ruleEngines) {
List<ServiceInfo> other = new ArrayList<>(ruleEngines);
other.removeIf(serviceInfo -> serviceInfo.getServiceId().equals(ruleEngine.getServiceId()));
clusterRoutingService.recalculatePartitions(ruleEngine, other);
clusterRoutingService.myPartitions.forEach((queueKey, partitions) -> {
serversPartitions.computeIfAbsent(queueKey, k -> new HashMap<>()).put(ruleEngine, partitions);
});
}
assertThat(serversPartitions.keySet()).containsAll(queues.stream().map(queue -> new QueueKey(ServiceType.TB_RULE_ENGINE, queue)).collect(Collectors.toList()));
serversPartitions.forEach((queueKey, partitionsPerServer) -> {
if (queueKey.getTenantId().isSysTenantId()) {
partitionsPerServer.forEach((server, partitions) -> {
assertThat(server.getAssignedTenantProfilesCount()).as("system queues are not assigned to dedicated servers").isZero();
});
} else {
List<ServiceInfo> responsibleServers = dedicatedServers.get(tenants.get(queueKey.getTenantId()));
partitionsPerServer.forEach((server, partitions) -> {
assertThat(server.getAssignedTenantProfilesCount()).as("isolated queues are only assigned to dedicated servers").isPositive();
assertThat(responsibleServers).contains(server);
});
}
List<Integer> allPartitions = partitionsPerServer.values().stream()
.flatMap(Collection::stream)
.collect(Collectors.toList());
assertThat(allPartitions).doesNotHaveDuplicates();
});
}
@Test
public void testIsManagedByCurrentServiceCheck() {
TenantProfileId isolatedProfileId = new TenantProfileId(UUID.randomUUID());
when(discoveryService.getAssignedTenantProfiles()).thenReturn(Set.of(isolatedProfileId.getId())); // dedicated server
TenantProfileId regularProfileId = new TenantProfileId(UUID.randomUUID());
TenantId isolatedTenantId = new TenantId(UUID.randomUUID());
when(routingInfoService.getRoutingInfo(eq(isolatedTenantId))).thenReturn(new TenantRoutingInfo(isolatedTenantId, isolatedProfileId, true));
TenantId regularTenantId = new TenantId(UUID.randomUUID());
when(routingInfoService.getRoutingInfo(eq(regularTenantId))).thenReturn(new TenantRoutingInfo(regularTenantId, regularProfileId, false));
assertThat(clusterRoutingService.isManagedByCurrentService(isolatedTenantId)).isTrue();
assertThat(clusterRoutingService.isManagedByCurrentService(regularTenantId)).isFalse();
when(discoveryService.getAssignedTenantProfiles()).thenReturn(Collections.emptySet()); // common server
assertThat(clusterRoutingService.isManagedByCurrentService(isolatedTenantId)).isTrue();
assertThat(clusterRoutingService.isManagedByCurrentService(regularTenantId)).isTrue();
}
}

16
application/src/test/java/org/thingsboard/server/service/notification/NotificationRuleApiTest.java

@ -342,7 +342,7 @@ public class NotificationRuleApiTest extends AbstractNotificationApiTest {
@Test
public void testNotificationRuleProcessing_entitiesLimit() throws Exception {
int limit = 5;
updateDefaultTenantProfile(profileConfiguration -> {
updateDefaultTenantProfileConfig(profileConfiguration -> {
profileConfiguration.setMaxDevices(limit);
profileConfiguration.setMaxAssets(limit);
profileConfiguration.setMaxCustomers(limit);
@ -421,10 +421,10 @@ public class NotificationRuleApiTest extends AbstractNotificationApiTest {
int n = 10;
updateDefaultTenantProfile(profileConfiguration -> {
profileConfiguration.setTenantEntityExportRateLimit(n + ":600");
profileConfiguration.setCustomerServerRestLimitsConfiguration(n + ":600");
profileConfiguration.setTenantNotificationRequestsPerRuleRateLimit(n + ":600");
profileConfiguration.setTransportDeviceTelemetryMsgRateLimit(n + ":600");
profileConfiguration.getProfileConfiguration().get().setTenantEntityExportRateLimit(n + ":600");
profileConfiguration.getProfileConfiguration().get().setCustomerServerRestLimitsConfiguration(n + ":600");
profileConfiguration.getProfileConfiguration().get().setTenantNotificationRequestsPerRuleRateLimit(n + ":600");
profileConfiguration.getProfileConfiguration().get().setTransportDeviceTelemetryMsgRateLimit(n + ":600");
});
loginTenantAdmin();
NotificationRule rule = createNotificationRule(AlarmCommentNotificationRuleTriggerConfig.builder()
@ -608,7 +608,7 @@ public class NotificationRuleApiTest extends AbstractNotificationApiTest {
@Test
public void testNotificationRequestsPerRuleRateLimits() throws Exception {
int notificationRequestsLimit = 10;
updateDefaultTenantProfile(profileConfiguration -> {
updateDefaultTenantProfileConfig(profileConfiguration -> {
profileConfiguration.setTenantNotificationRequestsPerRuleRateLimit(notificationRequestsLimit + ":300");
});
@ -691,8 +691,8 @@ public class NotificationRuleApiTest extends AbstractNotificationApiTest {
int n = 5;
updateDefaultTenantProfile(profileConfiguration -> {
profileConfiguration.setTenantEntityExportRateLimit(n + ":600");
profileConfiguration.setTransportDeviceTelemetryMsgRateLimit(n + ":800");
profileConfiguration.getProfileConfiguration().get().setTenantEntityExportRateLimit(n + ":600");
profileConfiguration.getProfileConfiguration().get().setTransportDeviceTelemetryMsgRateLimit(n + ":800");
});
RateLimitsTrigger expectedTrigger = RateLimitsTrigger.builder()

2
application/src/test/java/org/thingsboard/server/service/ttl/AlarmsCleanUpServiceTest.java

@ -67,7 +67,7 @@ public class AlarmsCleanUpServiceTest extends AbstractControllerTest {
@Test
public void testAlarmsCleanUp() throws Exception {
int ttlDays = 1;
updateDefaultTenantProfile(profileConfiguration -> {
updateDefaultTenantProfileConfig(profileConfiguration -> {
profileConfiguration.setAlarmsTtlDays(ttlDays);
});

2
common/actor/src/main/java/org/thingsboard/server/actors/TbActorCtx.java

@ -32,7 +32,7 @@ public interface TbActorCtx extends TbActorRef {
void stop(TbActorId target);
TbActorRef getOrCreateChildActor(TbActorId actorId, Supplier<String> dispatcher, Supplier<TbActorCreator> creator);
TbActorRef getOrCreateChildActor(TbActorId actorId, Supplier<String> dispatcher, Supplier<TbActorCreator> creator, Supplier<Boolean> createCondition);
void broadcastToChildren(TbActorMsg msg);

10
common/actor/src/main/java/org/thingsboard/server/actors/TbActorMailbox.java

@ -15,7 +15,8 @@
*/
package org.thingsboard.server.actors;
import lombok.Data;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.thingsboard.server.common.data.EntityType;
import org.thingsboard.server.common.msg.MsgType;
@ -31,7 +32,8 @@ import java.util.function.Predicate;
import java.util.function.Supplier;
@Slf4j
@Data
@Getter
@RequiredArgsConstructor
public final class TbActorMailbox implements TbActorCtx {
private static final boolean HIGH_PRIORITY = true;
private static final boolean NORMAL_PRIORITY = false;
@ -212,9 +214,9 @@ public final class TbActorMailbox implements TbActorCtx {
}
@Override
public TbActorRef getOrCreateChildActor(TbActorId actorId, Supplier<String> dispatcher, Supplier<TbActorCreator> creator) {
public TbActorRef getOrCreateChildActor(TbActorId actorId, Supplier<String> dispatcher, Supplier<TbActorCreator> creator, Supplier<Boolean> createCondition) {
TbActorRef actorRef = system.getActor(actorId);
if (actorRef == null) {
if (actorRef == null && createCondition.get()) {
return system.createChildActor(dispatcher.get(), creator.get(), selfId);
} else {
return actorRef;

6
common/cluster-api/src/main/java/org/thingsboard/server/queue/TbQueueAdmin.java

@ -17,7 +17,11 @@ package org.thingsboard.server.queue;
public interface TbQueueAdmin {
void createTopicIfNotExists(String topic);
default void createTopicIfNotExists(String topic) {
createTopicIfNotExists(topic, null);
}
void createTopicIfNotExists(String topic, String properties);
void destroy();

6
common/cluster-api/src/main/java/org/thingsboard/server/queue/TbQueueConsumer.java

@ -36,4 +36,10 @@ public interface TbQueueConsumer<T extends TbQueueMsg> {
boolean isStopped();
void onQueueDelete();
boolean isQueueDeleted();
List<String> getFullTopicNames();
}

1
common/cluster-api/src/main/proto/queue.proto

@ -28,6 +28,7 @@ message ServiceInfo {
repeated string serviceTypes = 2;
repeated string transports = 6;
SystemInfoProto systemInfo = 10;
repeated string assignedTenantProfiles = 11;
}
message SystemInfoProto {

15
common/data/src/main/java/org/thingsboard/server/common/data/queue/Queue.java

@ -15,6 +15,8 @@
*/
package org.thingsboard.server.common.data.queue;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.databind.JsonNode;
import lombok.Data;
import org.thingsboard.server.common.data.BaseDataWithAdditionalInfo;
import org.thingsboard.server.common.data.HasName;
@ -25,6 +27,8 @@ import org.thingsboard.server.common.data.tenant.profile.TenantProfileQueueConfi
import org.thingsboard.server.common.data.validation.Length;
import org.thingsboard.server.common.data.validation.NoXss;
import java.util.Optional;
@Data
public class Queue extends BaseDataWithAdditionalInfo<QueueId> implements HasName, HasTenantId {
private TenantId tenantId;
@ -60,4 +64,13 @@ public class Queue extends BaseDataWithAdditionalInfo<QueueId> implements HasNam
this.processingStrategy = queueConfiguration.getProcessingStrategy();
setAdditionalInfo(queueConfiguration.getAdditionalInfo());
}
}
@JsonIgnore
public String getCustomProperties() {
return Optional.ofNullable(getAdditionalInfo())
.map(info -> info.get("customProperties"))
.filter(JsonNode::isTextual).map(JsonNode::asText).orElse(null);
}
}

2
common/queue/src/main/java/org/thingsboard/server/queue/RuleEngineTbQueueAdminFactory.java

@ -99,7 +99,7 @@ public class RuleEngineTbQueueAdminFactory {
return new TbQueueAdmin() {
@Override
public void createTopicIfNotExists(String topic) {
public void createTopicIfNotExists(String topic, String properties) {
}
@Override

7
common/queue/src/main/java/org/thingsboard/server/queue/azure/servicebus/TbServiceBusAdmin.java

@ -22,6 +22,7 @@ import com.microsoft.azure.servicebus.primitives.MessagingEntityAlreadyExistsExc
import com.microsoft.azure.servicebus.primitives.ServiceBusException;
import lombok.extern.slf4j.Slf4j;
import org.thingsboard.server.queue.TbQueueAdmin;
import org.thingsboard.server.queue.util.PropertyUtils;
import java.io.IOException;
import java.time.Duration;
@ -60,7 +61,7 @@ public class TbServiceBusAdmin implements TbQueueAdmin {
}
@Override
public void createTopicIfNotExists(String topic) {
public void createTopicIfNotExists(String topic, String properties) {
if (queues.contains(topic)) {
return;
}
@ -68,7 +69,7 @@ public class TbServiceBusAdmin implements TbQueueAdmin {
try {
QueueDescription queueDescription = new QueueDescription(topic);
queueDescription.setRequiresDuplicateDetection(false);
setQueueConfigs(queueDescription);
setQueueConfigs(queueDescription, PropertyUtils.getProps(queueConfigs, properties));
client.createQueue(queueDescription);
queues.add(topic);
@ -107,7 +108,7 @@ public class TbServiceBusAdmin implements TbQueueAdmin {
}
}
private void setQueueConfigs(QueueDescription queueDescription) {
private void setQueueConfigs(QueueDescription queueDescription, Map<String, String> queueConfigs) {
queueConfigs.forEach((confKey, confValue) -> {
switch (confKey) {
case MAX_SIZE:

29
common/queue/src/main/java/org/thingsboard/server/queue/common/AbstractTbQueueConsumerTemplate.java

@ -44,6 +44,7 @@ public abstract class AbstractTbQueueConsumerTemplate<R, T extends TbQueueMsg> i
protected volatile Set<TopicPartitionInfo> partitions;
protected final ReentrantLock consumerLock = new ReentrantLock(); //NonfairSync
final Queue<Set<TopicPartitionInfo>> subscribeQueue = new ConcurrentLinkedQueue<>();
protected volatile boolean queueDeleted = false;
@Getter
private final String topic;
@ -94,7 +95,7 @@ public abstract class AbstractTbQueueConsumerTemplate<R, T extends TbQueueMsg> i
partitions = subscribeQueue.poll();
}
if (!subscribed) {
List<String> topicNames = partitions.stream().map(TopicPartitionInfo::getFullTopicName).collect(Collectors.toList());
List<String> topicNames = getFullTopicNames();
doSubscribe(topicNames);
subscribed = true;
}
@ -103,7 +104,9 @@ public abstract class AbstractTbQueueConsumerTemplate<R, T extends TbQueueMsg> i
consumerLock.unlock();
}
if (records.isEmpty()) { return sleepAndReturnEmpty(startNanos, durationInMillis); }
if (records.isEmpty() && !isLongPollingSupported()) {
return sleepAndReturnEmpty(startNanos, durationInMillis);
}
return decodeRecords(records);
}
@ -162,7 +165,9 @@ public abstract class AbstractTbQueueConsumerTemplate<R, T extends TbQueueMsg> i
@Override
public void unsubscribe() {
log.info("unsubscribe topic and stop consumer {}", getTopic());
log.info("Unsubscribing from topics and stopping consumer for topics {}", partitions.stream()
.map(TopicPartitionInfo::getFullTopicName)
.collect(Collectors.joining(", ")));
stopped = true;
consumerLock.lock();
try {
@ -187,4 +192,22 @@ public abstract class AbstractTbQueueConsumerTemplate<R, T extends TbQueueMsg> i
abstract protected void doUnsubscribe();
@Override
public void onQueueDelete() {
queueDeleted = true;
}
public boolean isQueueDeleted() {
return queueDeleted;
}
@Override
public List<String> getFullTopicNames() {
return partitions.stream().map(TopicPartitionInfo::getFullTopicName).collect(Collectors.toList());
}
protected boolean isLongPollingSupported() {
return false;
}
}

14
common/queue/src/main/java/org/thingsboard/server/queue/discovery/DefaultTbServiceInfoProvider.java

@ -23,6 +23,7 @@ import org.springframework.context.ApplicationContext;
import org.springframework.stereotype.Component;
import org.thingsboard.server.common.data.StringUtils;
import org.thingsboard.server.common.data.TbTransportService;
import org.thingsboard.server.common.data.util.CollectionsUtil;
import org.thingsboard.server.common.msg.queue.ServiceType;
import org.thingsboard.server.gen.transport.TransportProtos;
import org.thingsboard.server.gen.transport.TransportProtos.ServiceInfo;
@ -35,6 +36,8 @@ import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.UUID;
import java.util.stream.Collectors;
import static org.thingsboard.common.util.SystemUtil.getCpuCount;
@ -57,6 +60,10 @@ public class DefaultTbServiceInfoProvider implements TbServiceInfoProvider {
@Value("${service.type:monolith}")
private String serviceType;
@Getter
@Value("${service.rule_engine.assigned_tenant_profiles:}")
private Set<UUID> assignedTenantProfiles;
@Autowired
private ApplicationContext applicationContext;
@ -78,6 +85,9 @@ public class DefaultTbServiceInfoProvider implements TbServiceInfoProvider {
} else {
serviceTypes = Collections.singletonList(ServiceType.of(serviceType));
}
if (!serviceTypes.contains(ServiceType.TB_RULE_ENGINE) || assignedTenantProfiles == null) {
assignedTenantProfiles = Collections.emptySet();
}
generateNewServiceInfoWithCurrentSystemInfo();
}
@ -111,7 +121,9 @@ public class DefaultTbServiceInfoProvider implements TbServiceInfoProvider {
.setServiceId(serviceId)
.addAllServiceTypes(serviceTypes.stream().map(ServiceType::name).collect(Collectors.toList()))
.setSystemInfo(getCurrentSystemInfoProto());
if (CollectionsUtil.isNotEmpty(assignedTenantProfiles)) {
builder.addAllAssignedTenantProfiles(assignedTenantProfiles.stream().map(UUID::toString).collect(Collectors.toList()));
}
return serviceInfo = builder.build();
}

133
common/queue/src/main/java/org/thingsboard/server/queue/discovery/HashPartitionService.java

@ -24,6 +24,7 @@ import org.springframework.stereotype.Service;
import org.thingsboard.server.common.data.exception.TenantNotFoundException;
import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.id.TenantProfileId;
import org.thingsboard.server.common.msg.queue.ServiceType;
import org.thingsboard.server.common.msg.queue.TopicPartitionInfo;
import org.thingsboard.server.gen.transport.TransportProtos;
@ -36,6 +37,7 @@ import org.thingsboard.server.queue.util.AfterStartUp;
import javax.annotation.PostConstruct;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
@ -48,6 +50,8 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.stream.Collectors;
import static org.thingsboard.server.common.data.DataConstants.MAIN_QUEUE_NAME;
@Service
@Slf4j
public class HashPartitionService implements PartitionService {
@ -68,15 +72,16 @@ public class HashPartitionService implements PartitionService {
private final TenantRoutingInfoService tenantRoutingInfoService;
private final QueueRoutingInfoService queueRoutingInfoService;
private volatile ConcurrentMap<QueueKey, List<Integer>> myPartitions = new ConcurrentHashMap<>();
protected volatile ConcurrentMap<QueueKey, List<Integer>> myPartitions = new ConcurrentHashMap<>();
private final ConcurrentMap<QueueKey, String> partitionTopicsMap = new ConcurrentHashMap<>();
private final ConcurrentMap<QueueKey, Integer> partitionSizesMap = new ConcurrentHashMap<>();
private final ConcurrentMap<TenantId, TenantRoutingInfo> tenantRoutingInfoMap = new ConcurrentHashMap<>();
private Map<String, List<ServiceInfo>> tbTransportServicesByType = new HashMap<>();
private List<ServiceInfo> currentOtherServices;
private final Map<String, List<ServiceInfo>> tbTransportServicesByType = new HashMap<>();
private final Map<TenantProfileId, List<ServiceInfo>> responsibleServices = new HashMap<>();
private HashFunction hashFunction;
@ -165,6 +170,9 @@ public class HashPartitionService implements PartitionService {
partitionTopicsMap.put(queueKey, queueUpdateMsg.getQueueTopic());
partitionSizesMap.put(queueKey, queueUpdateMsg.getPartitions());
myPartitions.remove(queueKey);
if (!tenantId.isSysTenantId()) {
tenantRoutingInfoMap.remove(tenantId);
}
}
@Override
@ -178,12 +186,38 @@ public class HashPartitionService implements PartitionService {
removeTenant(tenantId);
}
@Override
public boolean isManagedByCurrentService(TenantId tenantId) {
Set<UUID> assignedTenantProfiles = serviceInfoProvider.getAssignedTenantProfiles();
if (assignedTenantProfiles.isEmpty()) {
// TODO: refactor this for common servers
return true;
} else {
if (tenantId.isSysTenantId()) {
return false;
}
TenantProfileId profileId = tenantRoutingInfoService.getRoutingInfo(tenantId).getProfileId();
return assignedTenantProfiles.contains(profileId.getId());
}
}
@Override
public TopicPartitionInfo resolve(ServiceType serviceType, String queueName, TenantId tenantId, EntityId entityId) {
TenantId isolatedOrSystemTenantId = getIsolatedOrSystemTenantId(serviceType, tenantId);
if (queueName == null) {
queueName = MAIN_QUEUE_NAME;
}
QueueKey queueKey = new QueueKey(serviceType, queueName, isolatedOrSystemTenantId);
if (!partitionSizesMap.containsKey(queueKey)) {
queueKey = new QueueKey(serviceType, isolatedOrSystemTenantId);
if (isolatedOrSystemTenantId.isSysTenantId()) {
queueKey = new QueueKey(serviceType, TenantId.SYS_TENANT_ID);
} else {
queueKey = new QueueKey(serviceType, queueName, TenantId.SYS_TENANT_ID);
if (!MAIN_QUEUE_NAME.equals(queueName) && !partitionSizesMap.containsKey(queueKey)) {
queueKey = new QueueKey(serviceType, TenantId.SYS_TENANT_ID);
}
log.warn("Using queue {} instead of isolated {} for tenant {}", queueKey, queueName, isolatedOrSystemTenantId);
}
}
return resolve(queueKey, entityId);
}
@ -199,11 +233,12 @@ public class HashPartitionService implements PartitionService {
}
private TopicPartitionInfo resolve(QueueKey queueKey, EntityId entityId) {
int hash = hashFunction.newHasher()
.putLong(entityId.getId().getMostSignificantBits())
.putLong(entityId.getId().getLeastSignificantBits()).hash().asInt();
Integer partitionSize = partitionSizesMap.get(queueKey);
if (partitionSize == null) {
throw new IllegalStateException("Partitions info for queue " + queueKey + " is missing");
}
int hash = hash(entityId.getId());
int partition = Math.abs(hash % partitionSize);
return buildTopicPartitionInfo(queueKey, partition);
@ -212,6 +247,7 @@ public class HashPartitionService implements PartitionService {
@Override
public synchronized void recalculatePartitions(ServiceInfo currentService, List<ServiceInfo> otherServices) {
tbTransportServicesByType.clear();
responsibleServices.clear();
logServiceInfo(currentService);
otherServices.forEach(this::logServiceInfo);
@ -221,6 +257,7 @@ public class HashPartitionService implements PartitionService {
addNode(queueServicesMap, other);
}
queueServicesMap.values().forEach(list -> list.sort(Comparator.comparing(ServiceInfo::getServiceId)));
responsibleServices.values().forEach(list -> list.sort(Comparator.comparing(ServiceInfo::getServiceId)));
final ConcurrentMap<QueueKey, List<Integer>> newPartitions = new ConcurrentHashMap<>();
partitionSizesMap.forEach((queueKey, size) -> {
@ -268,6 +305,9 @@ public class HashPartitionService implements PartitionService {
changes.addAll(newMap.keySet());
if (!changes.isEmpty()) {
applicationEventPublisher.publishEvent(new ClusterTopologyChangeEvent(this, changes));
responsibleServices.forEach((profileId, serviceInfos) -> {
log.info("Servers responsible for tenant profile {}: {}", profileId, toServiceIds(serviceInfos));
});
}
}
@ -305,9 +345,7 @@ public class HashPartitionService implements PartitionService {
@Override
public int resolvePartitionIndex(UUID entityId, int partitions) {
int hash = hashFunction.newHasher()
.putLong(entityId.getMostSignificantBits())
.putLong(entityId.getLeastSignificantBits()).hash().asInt();
int hash = hash(entityId);
return Math.abs(hash % partitions);
}
@ -358,16 +396,9 @@ public class HashPartitionService implements PartitionService {
if (TenantId.SYS_TENANT_ID.equals(tenantId)) {
return false;
}
TenantRoutingInfo routingInfo = tenantRoutingInfoMap.get(tenantId);
if (routingInfo == null) {
synchronized (tenantRoutingInfoMap) {
routingInfo = tenantRoutingInfoMap.get(tenantId);
if (routingInfo == null) {
routingInfo = tenantRoutingInfoService.getRoutingInfo(tenantId);
tenantRoutingInfoMap.put(tenantId, routingInfo);
}
}
}
TenantRoutingInfo routingInfo = tenantRoutingInfoMap.computeIfAbsent(tenantId, k -> {
return tenantRoutingInfoService.getRoutingInfo(tenantId);
});
if (routingInfo == null) {
throw new TenantNotFoundException(tenantId);
}
@ -396,6 +427,19 @@ public class HashPartitionService implements PartitionService {
queueServiceList.computeIfAbsent(key, k -> new ArrayList<>()).add(instance);
}
});
if (instance.getAssignedTenantProfilesCount() > 0) {
for (String profileIdStr : instance.getAssignedTenantProfilesList()) {
TenantProfileId profileId;
try {
profileId = new TenantProfileId(UUID.fromString(profileIdStr));
} catch (IllegalArgumentException e) {
log.warn("Failed to parse '{}' as tenant profile id", profileIdStr);
continue;
}
responsibleServices.computeIfAbsent(profileId, k -> new ArrayList<>()).add(instance);
}
}
} else if (ServiceType.TB_CORE.equals(serviceType) || ServiceType.TB_VC_EXECUTOR.equals(serviceType)) {
queueServiceList.computeIfAbsent(new QueueKey(serviceType), key -> new ArrayList<>()).add(instance);
}
@ -411,18 +455,51 @@ public class HashPartitionService implements PartitionService {
return null;
}
if (!ServiceType.TB_RULE_ENGINE.equals(queueKey.getType()) || TenantId.SYS_TENANT_ID.equals(queueKey.getTenantId())) {
return servers.get(partition % servers.size());
} else {
int hash = hashFunction.newHasher().putLong(queueKey.getTenantId().getId().getMostSignificantBits())
.putLong(queueKey.getTenantId().getId().getLeastSignificantBits())
TenantId tenantId = queueKey.getTenantId();
if (queueKey.getType() == ServiceType.TB_RULE_ENGINE) {
if (!responsibleServices.isEmpty()) { // if there are any dedicated servers
TenantProfileId profileId;
if (tenantId != null && !tenantId.isSysTenantId()) {
TenantRoutingInfo routingInfo = tenantRoutingInfoService.getRoutingInfo(tenantId);
profileId = routingInfo.getProfileId();
} else {
profileId = null;
}
List<ServiceInfo> responsible = responsibleServices.get(profileId);
if (responsible == null) {
// if there are no dedicated servers for this tenant profile, or for system queues,
// using the servers that are not responsible for any profile
responsible = servers.stream()
.filter(serviceInfo -> serviceInfo.getAssignedTenantProfilesCount() == 0)
.sorted(Comparator.comparing(ServiceInfo::getServiceId))
.collect(Collectors.toList());
if (profileId != null) {
log.debug("Using servers {} for profile {}", toServiceIds(responsible), profileId);
}
responsibleServices.put(profileId, responsible);
}
servers = responsible;
}
int hash = hashFunction.newHasher()
.putLong(tenantId.getId().getMostSignificantBits())
.putLong(tenantId.getId().getLeastSignificantBits())
.putString(queueKey.getQueueName(), StandardCharsets.UTF_8)
.hash().asInt();
return servers.get(Math.abs((hash + partition) % servers.size()));
} else {
return servers.get(partition % servers.size());
}
}
private int hash(UUID key) {
return hashFunction.newHasher()
.putLong(key.getMostSignificantBits())
.putLong(key.getLeastSignificantBits())
.hash().asInt();
}
public static HashFunction forName(String name) {
switch (name) {
case "murmur3_32":
@ -436,4 +513,8 @@ public class HashPartitionService implements PartitionService {
}
}
private List<String> toServiceIds(Collection<ServiceInfo> serviceInfos) {
return serviceInfos.stream().map(ServiceInfo::getServiceId).collect(Collectors.toList());
}
}

3
common/queue/src/main/java/org/thingsboard/server/queue/discovery/PartitionService.java

@ -64,4 +64,7 @@ public interface PartitionService {
void updateQueue(TransportProtos.QueueUpdateMsg queueUpdateMsg);
void removeQueue(TransportProtos.QueueDeleteMsg queueDeleteMsg);
boolean isManagedByCurrentService(TenantId tenantId);
}

5
common/queue/src/main/java/org/thingsboard/server/queue/discovery/TbServiceInfoProvider.java

@ -18,6 +18,9 @@ package org.thingsboard.server.queue.discovery;
import org.thingsboard.server.common.msg.queue.ServiceType;
import org.thingsboard.server.gen.transport.TransportProtos.ServiceInfo;
import java.util.Set;
import java.util.UUID;
public interface TbServiceInfoProvider {
String getServiceId();
@ -30,4 +33,6 @@ public interface TbServiceInfoProvider {
ServiceInfo generateNewServiceInfoWithCurrentSystemInfo();
Set<UUID> getAssignedTenantProfiles();
}

2
common/queue/src/main/java/org/thingsboard/server/queue/discovery/TenantRoutingInfo.java

@ -17,9 +17,11 @@ package org.thingsboard.server.queue.discovery;
import lombok.Data;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.id.TenantProfileId;
@Data
public class TenantRoutingInfo {
private final TenantId tenantId;
private final TenantProfileId profileId;
private final boolean isolatedTbRuleEngine;
}

1
common/queue/src/main/java/org/thingsboard/server/queue/discovery/TenantRoutingInfoService.java

@ -20,4 +20,5 @@ import org.thingsboard.server.common.data.id.TenantId;
public interface TenantRoutingInfoService {
TenantRoutingInfo getRoutingInfo(TenantId tenantId);
}

6
common/queue/src/main/java/org/thingsboard/server/queue/kafka/TbKafkaAdmin.java

@ -21,6 +21,7 @@ import org.apache.kafka.clients.admin.CreateTopicsResult;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.common.errors.TopicExistsException;
import org.thingsboard.server.queue.TbQueueAdmin;
import org.thingsboard.server.queue.util.PropertyUtils;
import java.util.Collections;
import java.util.Map;
@ -62,12 +63,12 @@ public class TbKafkaAdmin implements TbQueueAdmin {
}
@Override
public void createTopicIfNotExists(String topic) {
public void createTopicIfNotExists(String topic, String properties) {
if (topics.contains(topic)) {
return;
}
try {
NewTopic newTopic = new NewTopic(topic, numPartitions, replicationFactor).configs(topicConfigs);
NewTopic newTopic = new NewTopic(topic, numPartitions, replicationFactor).configs(PropertyUtils.getProps(topicConfigs, properties));
createTopic(newTopic).values().get(topic).get();
topics.add(topic);
} catch (ExecutionException ee) {
@ -81,7 +82,6 @@ public class TbKafkaAdmin implements TbQueueAdmin {
log.warn("[{}] Failed to create topic", topic, e);
throw new RuntimeException(e);
}
}
@Override

7
common/queue/src/main/java/org/thingsboard/server/queue/kafka/TbKafkaConsumerTemplate.java

@ -114,7 +114,6 @@ public class TbKafkaConsumerTemplate<T extends TbQueueMsg> extends AbstractTbQue
@Override
protected void doUnsubscribe() {
log.info("unsubscribe topic and close consumer for topic {}", getTopic());
if (consumer != null) {
consumer.unsubscribe();
consumer.close();
@ -123,4 +122,10 @@ public class TbKafkaConsumerTemplate<T extends TbQueueMsg> extends AbstractTbQue
statsService.unregisterClientGroup(groupId);
}
}
@Override
public boolean isLongPollingSupported() {
return true;
}
}

5
common/queue/src/main/java/org/thingsboard/server/queue/kafka/TbKafkaSettings.java

@ -115,6 +115,9 @@ public class TbKafkaSettings {
@Value("${queue.kafka.session.timeout.ms:10000}")
private int sessionTimeoutMs;
@Value("${queue.kafka.auto_offset_reset:earliest}")
private String autoOffsetReset;
@Value("${queue.kafka.use_confluent_cloud:false}")
private boolean useConfluent;
@ -155,6 +158,8 @@ public class TbKafkaSettings {
props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, maxPartitionFetchBytes);
props.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, fetchMaxBytes);
props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, maxPollIntervalMs);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);

16
common/queue/src/main/java/org/thingsboard/server/queue/memory/InMemoryTbQueueConsumer.java

@ -31,6 +31,7 @@ public class InMemoryTbQueueConsumer<T extends TbQueueMsg> implements TbQueueCon
private volatile Set<TopicPartitionInfo> partitions;
private volatile boolean stopped;
private volatile boolean subscribed;
private volatile boolean queueDeleted;
public InMemoryTbQueueConsumer(InMemoryStorage storage, String topic) {
this.storage = storage;
@ -103,4 +104,19 @@ public class InMemoryTbQueueConsumer<T extends TbQueueMsg> implements TbQueueCon
return stopped;
}
@Override
public void onQueueDelete() {
queueDeleted = true;
}
@Override
public boolean isQueueDeleted() {
return queueDeleted;
}
@Override
public List<String> getFullTopicNames() {
return partitions.stream().map(TopicPartitionInfo::getFullTopicName).collect(Collectors.toList());
}
}

2
common/queue/src/main/java/org/thingsboard/server/queue/provider/InMemoryTbTransportQueueFactory.java

@ -74,7 +74,7 @@ public class InMemoryTbTransportQueueFactory implements TbTransportQueueFactory
templateBuilder.queueAdmin(new TbQueueAdmin() {
@Override
public void createTopicIfNotExists(String topic) {}
public void createTopicIfNotExists(String topic, String properties) {}
@Override
public void destroy() {}

2
common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaMonolithQueueFactory.java

@ -187,7 +187,7 @@ public class KafkaMonolithQueueFactory implements TbCoreQueueFactory, TbRuleEngi
consumerBuilder.settings(kafkaSettings);
consumerBuilder.topic(configuration.getTopic());
consumerBuilder.clientId("re-" + queueName + "-consumer-" + serviceInfoProvider.getServiceId() + "-" + consumerCount.incrementAndGet());
consumerBuilder.groupId("re-" + queueName + "-consumer");
consumerBuilder.groupId("re-" + queueName + (configuration.getTenantId().isSysTenantId() ? "" : ("-" + configuration.getTenantId())) + "-consumer");
consumerBuilder.decoder(msg -> new TbProtoQueueMsg<>(msg.getKey(), ToRuleEngineMsg.parseFrom(msg.getData()), msg.getHeaders()));
consumerBuilder.admin(ruleEngineAdmin);
consumerBuilder.statsService(consumerStatsService);

2
common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaTbRuleEngineQueueFactory.java

@ -166,7 +166,7 @@ public class KafkaTbRuleEngineQueueFactory implements TbRuleEngineQueueFactory {
consumerBuilder.settings(kafkaSettings);
consumerBuilder.topic(configuration.getTopic());
consumerBuilder.clientId("re-" + queueName + "-consumer-" + serviceInfoProvider.getServiceId() + "-" + consumerCount.incrementAndGet());
consumerBuilder.groupId("re-" + queueName + "-consumer");
consumerBuilder.groupId("re-" + queueName + (configuration.getTenantId().isSysTenantId() ? "" : ("-" + configuration.getTenantId())) + "-consumer");
consumerBuilder.decoder(msg -> new TbProtoQueueMsg<>(msg.getKey(), ToRuleEngineMsg.parseFrom(msg.getData()), msg.getHeaders()));
consumerBuilder.admin(ruleEngineAdmin);
consumerBuilder.statsService(consumerStatsService);

2
common/queue/src/main/java/org/thingsboard/server/queue/pubsub/TbPubSubAdmin.java

@ -103,7 +103,7 @@ public class TbPubSubAdmin implements TbQueueAdmin {
}
@Override
public void createTopicIfNotExists(String partition) {
public void createTopicIfNotExists(String partition, String properties) {
TopicName topicName = TopicName.newBuilder()
.setTopic(partition)
.setProject(pubSubSettings.getProjectId())

9
common/queue/src/main/java/org/thingsboard/server/queue/rabbitmq/TbRabbitMqAdmin.java

@ -18,9 +18,11 @@ package org.thingsboard.server.queue.rabbitmq;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.thingsboard.server.queue.TbQueueAdmin;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeoutException;
@ -50,7 +52,12 @@ public class TbRabbitMqAdmin implements TbQueueAdmin {
}
@Override
public void createTopicIfNotExists(String topic) {
public void createTopicIfNotExists(String topic, String properties) {
Map<String, Object> arguments = this.arguments;
if (StringUtils.isNotBlank(properties)) {
arguments = new HashMap<>(arguments);
arguments.putAll(TbRabbitMqQueueArguments.getArgs(properties));
}
try {
channel.queueDeclare(topic, false, false, false, arguments);
} catch (IOException e) {

8
common/queue/src/main/java/org/thingsboard/server/queue/rabbitmq/TbRabbitMqQueueArguments.java

@ -65,7 +65,7 @@ public class TbRabbitMqQueueArguments {
vcArgs = getArgs(vcProperties);
}
private Map<String, Object> getArgs(String properties) {
public static Map<String, Object> getArgs(String properties) {
Map<String, Object> configs = new HashMap<>();
if (StringUtils.isNotEmpty(properties)) {
for (String property : properties.split(";")) {
@ -78,7 +78,7 @@ public class TbRabbitMqQueueArguments {
return configs;
}
private Object getObjectValue(String str) {
private static Object getObjectValue(String str) {
if (str.equalsIgnoreCase("true") || str.equalsIgnoreCase("false")) {
return Boolean.valueOf(str);
} else if (isNumeric(str)) {
@ -87,7 +87,7 @@ public class TbRabbitMqQueueArguments {
return str;
}
private Object getNumericValue(String str) {
private static Object getNumericValue(String str) {
if (str.contains(".")) {
return Double.valueOf(str);
} else {
@ -97,7 +97,7 @@ public class TbRabbitMqQueueArguments {
private static final Pattern PATTERN = Pattern.compile("-?\\d+(\\.\\d+)?");
public boolean isNumeric(String strNum) {
private static boolean isNumeric(String strNum) {
if (strNum == null) {
return false;
}

4
common/queue/src/main/java/org/thingsboard/server/queue/sqs/TbAwsSqsAdmin.java

@ -26,6 +26,7 @@ import com.amazonaws.services.sqs.model.CreateQueueRequest;
import com.amazonaws.services.sqs.model.GetQueueUrlResult;
import lombok.extern.slf4j.Slf4j;
import org.thingsboard.server.queue.TbQueueAdmin;
import org.thingsboard.server.queue.util.PropertyUtils;
import java.util.Map;
import java.util.function.Function;
@ -63,11 +64,12 @@ public class TbAwsSqsAdmin implements TbQueueAdmin {
}
@Override
public void createTopicIfNotExists(String topic) {
public void createTopicIfNotExists(String topic, String properties) {
String queueName = convertTopicToQueueName(topic);
if (queues.containsKey(queueName)) {
return;
}
Map<String, String> attributes = PropertyUtils.getProps(this.attributes, properties, TbAwsSqsQueueAttributes::toConfigs);
final CreateQueueRequest createQueueRequest = new CreateQueueRequest(queueName).withAttributes(attributes);
String queueUrl = sqsClient.createQueue(createQueueRequest).getQueueUrl();
queues.put(getQueueNameFromUrl(queueUrl), queueUrl);

8
common/queue/src/main/java/org/thingsboard/server/queue/sqs/TbAwsSqsQueueAttributes.java

@ -76,6 +76,12 @@ public class TbAwsSqsQueueAttributes {
private Map<String, String> getConfigs(String properties) {
Map<String, String> configs = new HashMap<>(defaultAttributes);
configs.putAll(toConfigs(properties));
return configs;
}
public static Map<String, String> toConfigs(String properties) {
Map<String, String> configs = new HashMap<>();
if (StringUtils.isNotEmpty(properties)) {
for (String property : properties.split(";")) {
int delimiterPosition = property.indexOf(":");
@ -88,7 +94,7 @@ public class TbAwsSqsQueueAttributes {
return configs;
}
private void validateAttributeName(String key) {
private static void validateAttributeName(String key) {
QueueAttributeName.fromValue(key);
}
}

14
common/queue/src/main/java/org/thingsboard/server/queue/util/PropertyUtils.java

@ -19,6 +19,7 @@ import org.thingsboard.server.common.data.StringUtils;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;
public class PropertyUtils {
@ -37,4 +38,17 @@ public class PropertyUtils {
return configs;
}
public static Map<String, String> getProps(Map<String, String> defaultProperties, String propertiesStr) {
return getProps(defaultProperties, propertiesStr, PropertyUtils::getProps);
}
public static Map<String, String> getProps(Map<String, String> defaultProperties, String propertiesStr, Function<String, Map<String, String>> parser) {
Map<String, String> properties = defaultProperties;
if (StringUtils.isNotBlank(propertiesStr)) {
properties = new HashMap<>(properties);
properties.putAll(parser.apply(propertiesStr));
}
return properties;
}
}

1
common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/DefaultTransportTenantProfileCache.java

@ -82,6 +82,7 @@ public class DefaultTransportTenantProfileCache implements TransportTenantProfil
if (profileOpt.isPresent()) {
TenantProfile newProfile = profileOpt.get();
log.trace("[{}] put: {}", newProfile.getId(), newProfile);
profiles.put(newProfile.getId(), newProfile);
Set<TenantId> affectedTenants = tenantProfileIds.get(newProfile.getId());
return new TenantProfileUpdateResult(newProfile, affectedTenants != null ? affectedTenants : Collections.emptySet());
} else {

4
common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/TransportTenantRoutingInfoService.java

@ -29,7 +29,7 @@ import org.thingsboard.server.queue.discovery.TenantRoutingInfoService;
@ConditionalOnExpression("'${service.type:null}'=='tb-transport'")
public class TransportTenantRoutingInfoService implements TenantRoutingInfoService {
private TransportTenantProfileCache tenantProfileCache;
private final TransportTenantProfileCache tenantProfileCache;
public TransportTenantRoutingInfoService(TransportTenantProfileCache tenantProfileCache) {
this.tenantProfileCache = tenantProfileCache;
@ -38,7 +38,7 @@ public class TransportTenantRoutingInfoService implements TenantRoutingInfoServi
@Override
public TenantRoutingInfo getRoutingInfo(TenantId tenantId) {
TenantProfile profile = tenantProfileCache.get(tenantId);
return new TenantRoutingInfo(tenantId, profile.isIsolatedTbRuleEngine());
return new TenantRoutingInfo(tenantId, profile.getId(), profile.isIsolatedTbRuleEngine());
}
}

2
dao/src/main/java/org/thingsboard/server/dao/service/validator/TenantProfileDataValidator.java

@ -99,8 +99,6 @@ public class TenantProfileDataValidator extends DataValidator<TenantProfile> {
TenantProfile old = tenantProfileDao.findById(TenantId.SYS_TENANT_ID, tenantProfile.getId().getId());
if (old == null) {
throw new DataValidationException("Can't update non existing tenant profile!");
} else if (old.isIsolatedTbRuleEngine() != tenantProfile.isIsolatedTbRuleEngine()) {
throw new DataValidationException("Can't update isolatedTbRuleEngine property!");
}
return old;
}

11
dao/src/test/java/org/thingsboard/server/dao/service/TenantProfileServiceTest.java

@ -188,17 +188,6 @@ public class TenantProfileServiceTest extends AbstractServiceTest {
});
}
@Test
public void testSaveSameTenantProfileWithDifferentIsolatedTbRuleEngine() {
TenantProfile tenantProfile = this.createTenantProfile("Tenant Profile");
TenantProfile savedTenantProfile = tenantProfileService.saveTenantProfile(TenantId.SYS_TENANT_ID, tenantProfile);
savedTenantProfile.setIsolatedTbRuleEngine(true);
addMainQueueConfig(savedTenantProfile);
Assertions.assertThrows(DataValidationException.class, () -> {
tenantProfileService.saveTenantProfile(TenantId.SYS_TENANT_ID, savedTenantProfile);
});
}
@Test
public void testDeleteTenantProfileWithExistingTenant() {
TenantProfile tenantProfile = this.createTenantProfile("Tenant Profile");

2
msa/vc-executor/src/main/java/org/thingsboard/server/vc/service/VersionControlTenantRoutingInfoService.java

@ -25,6 +25,6 @@ public class VersionControlTenantRoutingInfoService implements TenantRoutingInfo
@Override
public TenantRoutingInfo getRoutingInfo(TenantId tenantId) {
//This dummy implementation is ok since Version Control service does not produce any rule engine messages.
return new TenantRoutingInfo(tenantId, false);
return new TenantRoutingInfo(tenantId, null, false);
}
}

1
msa/vc-executor/src/main/resources/tb-vc-executor.yml

@ -73,6 +73,7 @@ queue:
fetch_max_bytes: "${TB_QUEUE_KAFKA_FETCH_MAX_BYTES:134217728}"
request.timeout.ms: "${TB_QUEUE_KAFKA_REQUEST_TIMEOUT_MS:30000}" # (30 seconds) # refer to https://docs.confluent.io/platform/current/installation/configuration/producer-configs.html#producerconfigs_request.timeout.ms
session.timeout.ms: "${TB_QUEUE_KAFKA_SESSION_TIMEOUT_MS:10000}" # (10 seconds) # refer to https://docs.confluent.io/platform/current/installation/configuration/consumer-configs.html#consumerconfigs_session.timeout.ms
auto_offset_reset: "${TB_QUEUE_KAFKA_AUTO_OFFSET_RESET:earliest}" # earliest, latest or none
use_confluent_cloud: "${TB_QUEUE_KAFKA_USE_CONFLUENT_CLOUD:false}"
confluent:
ssl.algorithm: "${TB_QUEUE_KAFKA_CONFLUENT_SSL_ALGORITHM:https}"

110
rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/math/TbMathNode.java

@ -19,6 +19,8 @@ import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import lombok.Data;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import net.objecthunter.exp4j.Expression;
import net.objecthunter.exp4j.ExpressionBuilder;
@ -44,6 +46,8 @@ import java.math.BigDecimal;
import java.math.RoundingMode;
import java.util.List;
import java.util.Optional;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
@ -81,9 +85,8 @@ import static org.thingsboard.rule.engine.math.TbMathArgumentType.CONSTANT;
)
public class TbMathNode implements TbNode {
private static final ConcurrentMap<EntityId, Semaphore> semaphores = new ConcurrentReferenceHashMap<>();
private static final ConcurrentMap<EntityId, SemaphoreWithQueue<TbMsgTbContext>> locks = new ConcurrentReferenceHashMap<>(16, ConcurrentReferenceHashMap.ReferenceType.WEAK);
private final ThreadLocal<Expression> customExpression = new ThreadLocal<>();
private TbMathNodeConfiguration config;
private boolean msgBodyToJsonConversionRequired;
@ -108,42 +111,71 @@ public class TbMathNode implements TbNode {
@Override
public void onMsg(TbContext ctx, TbMsg msg) {
var originator = msg.getOriginator();
var originatorSemaphore = semaphores.computeIfAbsent(originator, tmp -> new Semaphore(1, true));
boolean acquired = tryAcquire(originator, originatorSemaphore);
var semaphoreWithQueue = locks.computeIfAbsent(msg.getOriginator(), SemaphoreWithQueue::new);
semaphoreWithQueue.getQueue().add(new TbMsgTbContext(msg, ctx));
if (!acquired) {
ctx.tellFailure(msg, new RuntimeException("Failed to process message for originator synchronously"));
return;
}
tryProcessQueue(semaphoreWithQueue);
}
try {
var arguments = config.getArguments();
Optional<ObjectNode> msgBodyOpt = convertMsgBodyIfRequired(msg);
var argumentValues = Futures.allAsList(arguments.stream()
.map(arg -> resolveArguments(ctx, msg, msgBodyOpt, arg)).collect(Collectors.toList()));
ListenableFuture<TbMsg> resultMsgFuture = Futures.transformAsync(argumentValues, args ->
updateMsgAndDb(ctx, msg, msgBodyOpt, calculateResult(args)), ctx.getDbCallbackExecutor());
DonAsynchron.withCallback(resultMsgFuture, resultMsg -> {
try {
ctx.tellSuccess(resultMsg);
} finally {
originatorSemaphore.release();
void tryProcessQueue(SemaphoreWithQueue<TbMsgTbContext> lockAndQueue) {
final Semaphore semaphore = lockAndQueue.getSemaphore();
final Queue<TbMsgTbContext> queue = lockAndQueue.getQueue();
while (!queue.isEmpty()) {
// The semaphore have to be acquired before EACH poll and released before NEXT poll.
// Otherwise, some message will remain unprocessed in queue
if (!semaphore.tryAcquire()) {
return;
}
TbMsgTbContext tbMsgTbContext = null;
try {
tbMsgTbContext = queue.poll();
if (tbMsgTbContext == null) {
semaphore.release();
continue;
}
}, t -> {
try {
ctx.tellFailure(msg, t);
} finally {
originatorSemaphore.release();
final TbMsg msg = tbMsgTbContext.getMsg();
if (!msg.getCallback().isMsgValid()) {
log.trace("[{}] Skipping non-valid message [{}]", lockAndQueue.getEntityId(), msg);
semaphore.release();
continue;
}
}, ctx.getDbCallbackExecutor());
} catch (Throwable e) {
originatorSemaphore.release();
log.warn("[{}] Failed to process message: {}", originator, msg, e);
throw e;
//DO PROCESSING
final TbContext ctx = tbMsgTbContext.getCtx();
final ListenableFuture<TbMsg> resultMsgFuture = processMsgAsync(ctx, msg);
DonAsynchron.withCallback(resultMsgFuture, resultMsg -> {
try {
ctx.tellSuccess(resultMsg);
} finally {
lockAndQueue.getSemaphore().release();
tryProcessQueue(lockAndQueue);
}
}, t -> {
try {
ctx.tellFailure(msg, t);
} finally {
lockAndQueue.getSemaphore().release();
tryProcessQueue(lockAndQueue);
}
}, ctx.getDbCallbackExecutor());
} catch (Throwable e) {
semaphore.release();
log.warn("[{}] Failed to process message: {}", lockAndQueue.getEntityId(), tbMsgTbContext == null ? null : tbMsgTbContext.getMsg(), e);
throw e;
}
break; //submitted async exact one task. next poll will try on callback
}
}
ListenableFuture<TbMsg> processMsgAsync(TbContext ctx, TbMsg msg) {
var arguments = config.getArguments();
Optional<ObjectNode> msgBodyOpt = convertMsgBodyIfRequired(msg);
var argumentValues = Futures.allAsList(arguments.stream()
.map(arg -> resolveArguments(ctx, msg, msgBodyOpt, arg)).collect(Collectors.toList()));
ListenableFuture<TbMsg> resultMsgFuture = Futures.transformAsync(argumentValues, args ->
updateMsgAndDb(ctx, msg, msgBodyOpt, calculateResult(args)), ctx.getDbCallbackExecutor());
return resultMsgFuture;
}
private boolean tryAcquire(EntityId originator, Semaphore originatorSemaphore) {
boolean acquired;
try {
@ -402,4 +434,20 @@ public class TbMathNode implements TbNode {
@Override
public void destroy() {
}
@Data
@RequiredArgsConstructor
static public class SemaphoreWithQueue<T> {
final EntityId entityId;
final Semaphore semaphore = new Semaphore(1);
final Queue<T> queue = new ConcurrentLinkedQueue<>();
}
@Data
@RequiredArgsConstructor
static public class TbMsgTbContext {
final TbMsg msg;
final TbContext ctx;
}
}

155
rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/math/TbMathNodeTest.java

@ -15,20 +15,22 @@
*/
package org.thingsboard.rule.engine.math;
import com.datastax.oss.driver.api.core.uuid.Uuids;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.common.util.concurrent.Futures;
import lombok.extern.slf4j.Slf4j;
import org.awaitility.Awaitility;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.runner.RunWith;
import org.mockito.ArgumentCaptor;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.junit.MockitoJUnitRunner;
import org.mockito.junit.jupiter.MockitoExtension;
import org.mockito.verification.Timeout;
import org.springframework.test.util.ReflectionTestUtils;
import org.thingsboard.common.util.AbstractListeningExecutor;
import org.thingsboard.common.util.JacksonUtil;
@ -54,24 +56,36 @@ import org.thingsboard.server.dao.timeseries.TimeseriesService;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyDouble;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.argThat;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.BDDMockito.willAnswer;
import static org.mockito.Mockito.lenient;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
@Slf4j
@RunWith(MockitoJUnitRunner.class)
@ExtendWith(MockitoExtension.class)
public class TbMathNodeTest {
private EntityId originator = new DeviceId(Uuids.timeBased());
private TenantId tenantId = TenantId.fromUUID(Uuids.timeBased());
static final int RULE_DISPATCHER_POOL_SIZE = 2;
static final int DB_CALLBACK_POOL_SIZE = 3;
private final EntityId originator = DeviceId.fromString("ccd71696-0586-422d-940e-755a41ec3b0d");
private final TenantId tenantId = TenantId.fromUUID(UUID.fromString("e7f46b23-0c7d-42f5-9b06-fc35ab17af8a"));
@Mock
private TbContext ctx;
@ -81,35 +95,31 @@ public class TbMathNodeTest {
private TimeseriesService tsService;
@Mock
private RuleEngineTelemetryService telemetryService;
private AbstractListeningExecutor dbExecutor;
private AbstractListeningExecutor dbCallbackExecutor;
private AbstractListeningExecutor ruleEngineDispatcherExecutor;
@Before
@BeforeEach
public void before() {
dbExecutor = new AbstractListeningExecutor() {
@Override
protected int getThreadPollSize() {
return 3;
}
};
dbExecutor.init();
initMocks();
dbCallbackExecutor = new DBCallbackExecutor();
dbCallbackExecutor.init();
ruleEngineDispatcherExecutor = new RuleDispatcherExecutor();
ruleEngineDispatcherExecutor.init();
lenient().when(ctx.getAttributesService()).thenReturn(attributesService);
lenient().when(ctx.getTelemetryService()).thenReturn(telemetryService);
lenient().when(ctx.getTimeseriesService()).thenReturn(tsService);
lenient().when(ctx.getTenantId()).thenReturn(tenantId);
lenient().when(ctx.getDbCallbackExecutor()).thenReturn(dbCallbackExecutor);
}
@After
@AfterEach
public void after() {
dbExecutor.destroy();
ruleEngineDispatcherExecutor.executor().shutdownNow();
dbCallbackExecutor.executor().shutdownNow();
}
private void initMocks() {
Mockito.reset(ctx);
Mockito.reset(attributesService);
Mockito.reset(tsService);
Mockito.reset(telemetryService);
lenient().when(ctx.getAttributesService()).thenReturn(attributesService);
lenient().when(ctx.getTelemetryService()).thenReturn(telemetryService);
lenient().when(ctx.getTimeseriesService()).thenReturn(tsService);
lenient().when(ctx.getTenantId()).thenReturn(tenantId);
lenient().when(ctx.getDbCallbackExecutor()).thenReturn(dbExecutor);
Mockito.clearInvocations(ctx, attributesService, tsService, telemetryService);
}
private TbMathNode initNode(TbRuleNodeMathFunctionType operation, TbMathResult result, TbMathArgument... arguments) {
@ -154,10 +164,8 @@ public class TbMathNodeTest {
node.onMsg(ctx, msg);
ConcurrentMap<EntityId, Semaphore> semaphores = (ConcurrentMap<EntityId, Semaphore>) ReflectionTestUtils.getField(node, "semaphores");
ConcurrentMap<EntityId, TbMathNode.SemaphoreWithQueue<TbMathNode.TbMsgTbContext>> semaphores = (ConcurrentMap<EntityId, TbMathNode.SemaphoreWithQueue<TbMathNode.TbMsgTbContext>>) ReflectionTestUtils.getField(node, "locks");
Assert.assertNotNull(semaphores);
Semaphore originatorSemaphore = semaphores.get(originator);
Assert.assertNotNull(originatorSemaphore);
metaData.putValue("key1", "secondMsgResult");
metaData.putValue("key2", "argumentC");
@ -167,7 +175,7 @@ public class TbMathNodeTest {
node.onMsg(ctx, msg);
Awaitility.await("Semaphore released").atMost(5, TimeUnit.SECONDS).until(semaphores.get(originator)::tryAcquire);
Awaitility.await("Semaphore released").atMost(5, TimeUnit.SECONDS).until(() -> semaphores.get(originator).semaphore.tryAcquire());
ArgumentCaptor<TbMsg> msgCaptor = ArgumentCaptor.forClass(TbMsg.class);
Mockito.verify(ctx, Mockito.times(2)).tellSuccess(msgCaptor.capture());
@ -534,4 +542,87 @@ public class TbMathNodeTest {
});
Assert.assertNotNull(thrown.getMessage());
}
@Test
public void testExp4j_concurrent() {
TbMathNode node = spy(initNodeWithCustomFunction("2a+3b",
new TbMathResult(TbMathArgumentType.MESSAGE_BODY, "result", 2, false, false, null),
new TbMathArgument(TbMathArgumentType.MESSAGE_BODY, "a"),
new TbMathArgument(TbMathArgumentType.MESSAGE_BODY, "b")
));
EntityId originatorSlow = DeviceId.fromString("7f01170d-6bba-419c-b95c-2b4c3ba32f30");
EntityId originatorFast = DeviceId.fromString("c45360ff-7906-4102-a2ae-3495a86168d0");
CountDownLatch slowProcessingLatch = new CountDownLatch(1);
List<TbMsg> slowMsgList = IntStream.range(0, 5)
.mapToObj(x -> TbMsg.newMsg("TEST", originatorSlow, new TbMsgMetaData(), JacksonUtil.newObjectNode().put("a", 2).put("b", 2).toString()))
.collect(Collectors.toList());
List<TbMsg> fastMsgList = IntStream.range(0, 2)
.mapToObj(x -> TbMsg.newMsg("TEST", originatorFast, new TbMsgMetaData(), JacksonUtil.newObjectNode().put("a", 2).put("b", 2).toString()))
.collect(Collectors.toList());
assertThat(slowMsgList.size()).as("slow msgs >= rule-dispatcher pool size").isGreaterThanOrEqualTo(RULE_DISPATCHER_POOL_SIZE);
log.debug("rule-dispatcher [{}], db-callback [{}], slowMsg [{}], fastMsg [{}]", RULE_DISPATCHER_POOL_SIZE, DB_CALLBACK_POOL_SIZE, slowMsgList.size(), fastMsgList.size());
willAnswer(invocation -> {
TbMsg msg = invocation.getArgument(1);
log.debug("\uD83D\uDC0C processMsgAsync slow originator [{}][{}]", msg.getOriginator(), msg);
try {
assertThat(slowProcessingLatch.await(30, TimeUnit.SECONDS)).as("await on slowProcessingLatch").isTrue();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
return invocation.callRealMethod();
}).given(node).processMsgAsync(eq(ctx), argThat(slowMsgList::contains));
willAnswer(invocation -> {
TbMsg msg = invocation.getArgument(1);
log.debug("\u26A1\uFE0F processMsgAsync FAST originator [{}][{}]", msg.getOriginator(), msg);
return invocation.callRealMethod();
}).given(node).processMsgAsync(eq(ctx), argThat(fastMsgList::contains));
willAnswer(invocation -> {
TbMsg msg = invocation.getArgument(1);
log.debug("submit slow originator onMsg [{}][{}]", msg.getOriginator(), msg);
return invocation.callRealMethod();
}).given(node).onMsg(eq(ctx), argThat(slowMsgList::contains));
willAnswer(invocation -> {
TbMsg msg = invocation.getArgument(1);
log.debug("submit FAST originator onMsg [{}][{}]", msg.getOriginator(), msg);
return invocation.callRealMethod();
}).given(node).onMsg(eq(ctx), argThat(fastMsgList::contains));
// submit slow msg may block all rule engine dispatcher threads
slowMsgList.forEach(msg -> ruleEngineDispatcherExecutor.executeAsync(() -> node.onMsg(ctx, msg)));
// wait until dispatcher threads started with all slowMsg
verify(node, new Timeout(TimeUnit.SECONDS.toMillis(5), times(slowMsgList.size()))).onMsg(eq(ctx), argThat(slowMsgList::contains));
// submit fast have to return immediately
fastMsgList.forEach(msg -> ruleEngineDispatcherExecutor.executeAsync(() -> node.onMsg(ctx, msg)));
// wait until all fast messages processed
verify(ctx, new Timeout(TimeUnit.SECONDS.toMillis(5), times(fastMsgList.size()))).tellSuccess(any());
slowProcessingLatch.countDown();
verify(ctx, new Timeout(TimeUnit.SECONDS.toMillis(5), times(fastMsgList.size() + slowMsgList.size()))).tellSuccess(any());
verify(ctx, never()).tellFailure(any(), any());
}
static class RuleDispatcherExecutor extends AbstractListeningExecutor {
@Override
protected int getThreadPollSize() {
return RULE_DISPATCHER_POOL_SIZE;
}
}
static class DBCallbackExecutor extends AbstractListeningExecutor {
@Override
protected int getThreadPollSize() {
return DB_CALLBACK_POOL_SIZE;
}
}
}

1
transport/coap/src/main/resources/tb-coap-transport.yml

@ -184,6 +184,7 @@ queue:
fetch_max_bytes: "${TB_QUEUE_KAFKA_FETCH_MAX_BYTES:134217728}"
request.timeout.ms: "${TB_QUEUE_KAFKA_REQUEST_TIMEOUT_MS:30000}" # (30 seconds) # refer to https://docs.confluent.io/platform/current/installation/configuration/producer-configs.html#producerconfigs_request.timeout.ms
session.timeout.ms: "${TB_QUEUE_KAFKA_SESSION_TIMEOUT_MS:10000}" # (10 seconds) # refer to https://docs.confluent.io/platform/current/installation/configuration/consumer-configs.html#consumerconfigs_session.timeout.ms
auto_offset_reset: "${TB_QUEUE_KAFKA_AUTO_OFFSET_RESET:earliest}" # earliest, latest or none
use_confluent_cloud: "${TB_QUEUE_KAFKA_USE_CONFLUENT_CLOUD:false}"
confluent:
ssl.algorithm: "${TB_QUEUE_KAFKA_CONFLUENT_SSL_ALGORITHM:https}"

1
transport/http/src/main/resources/tb-http-transport.yml

@ -169,6 +169,7 @@ queue:
fetch_max_bytes: "${TB_QUEUE_KAFKA_FETCH_MAX_BYTES:134217728}"
request.timeout.ms: "${TB_QUEUE_KAFKA_REQUEST_TIMEOUT_MS:30000}" # (30 seconds) # refer to https://docs.confluent.io/platform/current/installation/configuration/producer-configs.html#producerconfigs_request.timeout.ms
session.timeout.ms: "${TB_QUEUE_KAFKA_SESSION_TIMEOUT_MS:10000}" # (10 seconds) # refer to https://docs.confluent.io/platform/current/installation/configuration/consumer-configs.html#consumerconfigs_session.timeout.ms
auto_offset_reset: "${TB_QUEUE_KAFKA_AUTO_OFFSET_RESET:earliest}" # earliest, latest or none
use_confluent_cloud: "${TB_QUEUE_KAFKA_USE_CONFLUENT_CLOUD:false}"
confluent:
ssl.algorithm: "${TB_QUEUE_KAFKA_CONFLUENT_SSL_ALGORITHM:https}"

1
transport/lwm2m/src/main/resources/tb-lwm2m-transport.yml

@ -250,6 +250,7 @@ queue:
fetch_max_bytes: "${TB_QUEUE_KAFKA_FETCH_MAX_BYTES:134217728}"
request.timeout.ms: "${TB_QUEUE_KAFKA_REQUEST_TIMEOUT_MS:30000}" # (30 seconds) # refer to https://docs.confluent.io/platform/current/installation/configuration/producer-configs.html#producerconfigs_request.timeout.ms
session.timeout.ms: "${TB_QUEUE_KAFKA_SESSION_TIMEOUT_MS:10000}" # (10 seconds) # refer to https://docs.confluent.io/platform/current/installation/configuration/consumer-configs.html#consumerconfigs_session.timeout.ms
auto_offset_reset: "${TB_QUEUE_KAFKA_AUTO_OFFSET_RESET:earliest}" # earliest, latest or none
use_confluent_cloud: "${TB_QUEUE_KAFKA_USE_CONFLUENT_CLOUD:false}"
confluent:
ssl.algorithm: "${TB_QUEUE_KAFKA_CONFLUENT_SSL_ALGORITHM:https}"

1
transport/mqtt/src/main/resources/tb-mqtt-transport.yml

@ -199,6 +199,7 @@ queue:
fetch_max_bytes: "${TB_QUEUE_KAFKA_FETCH_MAX_BYTES:134217728}"
request.timeout.ms: "${TB_QUEUE_KAFKA_REQUEST_TIMEOUT_MS:30000}" # (30 seconds) # refer to https://docs.confluent.io/platform/current/installation/configuration/producer-configs.html#producerconfigs_request.timeout.ms
session.timeout.ms: "${TB_QUEUE_KAFKA_SESSION_TIMEOUT_MS:10000}" # (10 seconds) # refer to https://docs.confluent.io/platform/current/installation/configuration/consumer-configs.html#consumerconfigs_session.timeout.ms
auto_offset_reset: "${TB_QUEUE_KAFKA_AUTO_OFFSET_RESET:earliest}" # earliest, latest or none
use_confluent_cloud: "${TB_QUEUE_KAFKA_USE_CONFLUENT_CLOUD:false}"
confluent:
ssl.algorithm: "${TB_QUEUE_KAFKA_CONFLUENT_SSL_ALGORITHM:https}"

1
transport/snmp/src/main/resources/tb-snmp-transport.yml

@ -145,6 +145,7 @@ queue:
fetch_max_bytes: "${TB_QUEUE_KAFKA_FETCH_MAX_BYTES:134217728}"
request.timeout.ms: "${TB_QUEUE_KAFKA_REQUEST_TIMEOUT_MS:30000}" # (30 seconds) # refer to https://docs.confluent.io/platform/current/installation/configuration/producer-configs.html#producerconfigs_request.timeout.ms
session.timeout.ms: "${TB_QUEUE_KAFKA_SESSION_TIMEOUT_MS:10000}" # (10 seconds) # refer to https://docs.confluent.io/platform/current/installation/configuration/consumer-configs.html#consumerconfigs_session.timeout.ms
auto_offset_reset: "${TB_QUEUE_KAFKA_AUTO_OFFSET_RESET:earliest}" # earliest, latest or none
use_confluent_cloud: "${TB_QUEUE_KAFKA_USE_CONFLUENT_CLOUD:false}"
confluent:
ssl.algorithm: "${TB_QUEUE_KAFKA_CONFLUENT_SSL_ALGORITHM:https}"

3
ui-ngx/src/app/modules/home/components/profile/queue/tenant-profile-queues.component.ts

@ -173,7 +173,8 @@ export class TenantProfileQueuesComponent implements ControlValueAccessor, Valid
},
topic: '',
additionalInfo: {
description: ''
description: '',
customProperties: ''
}
};
this.idMap.push(queue.id);

60
ui-ngx/src/app/modules/home/components/profile/tenant-profile.component.ts

@ -58,9 +58,9 @@ export class TenantProfileComponent extends EntityComponent<TenantProfile> {
id: guid(),
consumerPerPartition: true,
name: 'Main',
packProcessingTimeout: 2000,
partitions: 10,
pollInterval: 25,
packProcessingTimeout: 10000,
partitions: 1,
pollInterval: 2000,
processingStrategy: {
failurePercentage: 0,
maxPauseBetweenRetries: 3,
@ -74,7 +74,56 @@ export class TenantProfileComponent extends EntityComponent<TenantProfile> {
},
topic: 'tb_rule_engine.main',
additionalInfo: {
description: ''
description: '',
customProperties: ''
}
},
{
id: guid(),
name: 'HighPriority',
topic: 'tb_rule_engine.hp',
pollInterval: 2000,
partitions: 1,
consumerPerPartition: true,
packProcessingTimeout: 10000,
submitStrategy: {
type: 'BURST',
batchSize: 100
},
processingStrategy: {
type: 'RETRY_FAILED_AND_TIMED_OUT',
retries: 0,
failurePercentage: 0,
pauseBetweenRetries: 5,
maxPauseBetweenRetries: 5
},
additionalInfo: {
description: '',
customProperties: ''
}
},
{
id: guid(),
name: 'SequentialByOriginator',
topic: 'tb_rule_engine.sq',
pollInterval: 2000,
partitions: 1,
consumerPerPartition: true,
packProcessingTimeout: 10000,
submitStrategy: {
type: 'SEQUENTIAL_BY_ORIGINATOR',
batchSize: 100
},
processingStrategy: {
type: 'RETRY_FAILED_AND_TIMED_OUT',
retries: 3,
failurePercentage: 0,
pauseBetweenRetries: 5,
maxPauseBetweenRetries: 5
},
additionalInfo: {
description: '',
customProperties: ''
}
}
];
@ -118,9 +167,6 @@ export class TenantProfileComponent extends EntityComponent<TenantProfile> {
if (this.entityForm) {
if (this.isEditValue) {
this.entityForm.enable({emitEvent: false});
if (!this.isAdd) {
this.entityForm.get('isolatedTbRuleEngine').disable({emitEvent: false});
}
} else {
this.entityForm.disable({emitEvent: false});
}

5
ui-ngx/src/app/modules/home/components/queue/queue-form.component.html

@ -203,6 +203,11 @@
</ng-template>
</mat-expansion-panel>
</mat-accordion>
<mat-form-field class="mat-block" formGroupName="additionalInfo" appearance="fill">
<mat-label translate>queue.custom-properties</mat-label>
<textarea matInput formControlName="customProperties" cdkTextareaAutosize cdkAutosizeMinRows="1"></textarea>
<mat-hint translate>queue.custom-properties-hint</mat-hint>
</mat-form-field>
<mat-form-field class="mat-block" formGroupName="additionalInfo" appearance="fill">
<mat-label translate>queue.description</mat-label>
<textarea matInput formControlName="description" rows="2"></textarea>

3
ui-ngx/src/app/modules/home/components/queue/queue-form.component.ts

@ -117,7 +117,8 @@ export class QueueFormComponent implements ControlValueAccessor, OnInit, OnDestr
}),
topic: [''],
additionalInfo: this.fb.group({
description: ['']
description: [''],
customProperties: ['']
})
});
this.valueChange$ = this.queueFormGroup.valueChanges.subscribe(() => {

1
ui-ngx/src/app/shared/models/queue.models.ts

@ -121,5 +121,6 @@ export interface QueueInfo extends BaseData<QueueId> {
topic: string;
additionalInfo: {
description?: string;
customProperties?: string;
};
}

6
ui-ngx/src/assets/locale/locale.constant-en_US.json

@ -3612,6 +3612,8 @@
"description": "Description",
"description-hint": "This text will be displayed in the Queue description instead of the selected strategy",
"alt-description": "Submit Strategy: {{submitStrategy}}, Processing Strategy: {{processingStrategy}}",
"custom-properties": "Custom properties",
"custom-properties-hint": "Custom queue (topic) creation properties, e.g. 'retention.ms:604800000;retention.bytes:1048576000'",
"strategies": {
"sequential-by-originator-label": "Sequential by originator",
"sequential-by-originator-hint": "New message for e.g. device A is not submitted until previous message for device A is acknowledged",
@ -3679,8 +3681,8 @@
"tenant-required": "Tenant is required",
"search": "Search tenants",
"selected-tenants": "{ count, plural, =1 {1 tenant} other {# tenants} } selected",
"isolated-tb-rule-engine": "Processing in isolated ThingsBoard Rule Engine container",
"isolated-tb-rule-engine-details": "Requires separate microservice(s) per isolated Tenant"
"isolated-tb-rule-engine": "Use isolated ThingsBoard Rule Engine queues",
"isolated-tb-rule-engine-details": "Each tenant will have dedicated Rule Engine queues"
},
"tenant-profile": {
"tenant-profile": "Tenant profile",

Loading…
Cancel
Save