Browse Source

Implemented dedicated RE per tenant

pull/2618/head
Andrii Shvaika 6 years ago
parent
commit
6c4b50a380
  1. 28
      application/src/main/java/org/thingsboard/server/actors/app/AppActor.java
  2. 25
      application/src/main/java/org/thingsboard/server/actors/tenant/TenantActor.java
  3. 2
      application/src/main/java/org/thingsboard/server/controller/TenantController.java
  4. 8
      application/src/main/java/org/thingsboard/server/service/queue/DefaultTbClusterService.java
  5. 47
      application/src/main/java/org/thingsboard/server/service/queue/DefaultTenantRoutingInfoService.java
  6. 28
      application/src/main/java/org/thingsboard/server/service/subscription/DefaultSubscriptionManagerService.java
  7. 19
      application/src/main/java/org/thingsboard/server/service/transport/DefaultTransportApiService.java
  8. 2
      application/src/main/resources/logback.xml
  9. 27
      application/src/main/resources/thingsboard.yml
  10. 5
      application/src/test/java/org/thingsboard/server/service/cluster/routing/ConsistentHashParitionServiceTest.java
  11. 43
      common/queue/src/main/java/org/thingsboard/server/queue/discovery/ConsistentHashPartitionService.java
  12. 26
      common/queue/src/main/java/org/thingsboard/server/queue/discovery/TenantRoutingInfo.java
  13. 23
      common/queue/src/main/java/org/thingsboard/server/queue/discovery/TenantRoutingInfoService.java
  14. 12
      common/queue/src/main/java/org/thingsboard/server/queue/settings/TbQueueRemoteJsInvokeSettings.java
  15. 4
      common/queue/src/main/java/org/thingsboard/server/queue/util/TbCoreComponent.java
  16. 4
      common/queue/src/main/java/org/thingsboard/server/queue/util/TbRuleEngineComponent.java
  17. 12
      common/queue/src/main/proto/queue.proto
  18. 14
      common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/TransportService.java
  19. 32
      common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/DefaultTransportService.java
  20. 52
      common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/TransportTenantRoutingInfoService.java
  21. 35
      dao/src/main/java/org/thingsboard/server/dao/tenant/TenantServiceImpl.java
  22. 74
      transport/mqtt/src/main/resources/tb-mqtt-transport.yml

28
application/src/main/java/org/thingsboard/server/actors/app/AppActor.java

@ -39,10 +39,13 @@ import org.thingsboard.server.common.msg.aware.TenantAwareMsg;
import org.thingsboard.server.common.msg.plugin.ComponentLifecycleMsg;
import org.thingsboard.server.common.msg.queue.QueueToRuleEngineMsg;
import org.thingsboard.server.common.msg.queue.RuleEngineException;
import org.thingsboard.server.common.msg.queue.ServiceType;
import org.thingsboard.server.dao.model.ModelConstants;
import org.thingsboard.server.dao.tenant.TenantService;
import scala.concurrent.duration.Duration;
import java.util.Optional;
public class AppActor extends ContextAwareActor {
private static final TenantId SYSTEM_TENANT = new TenantId(ModelConstants.NULL_UUID);
@ -68,7 +71,7 @@ public class AppActor extends ContextAwareActor {
@Override
protected boolean process(TbActorMsg msg) {
if (!ruleChainsInitialized) {
initRuleChainsAndTenantActors();
initTenantActors();
ruleChainsInitialized = true;
if (msg.getMsgType() != MsgType.APP_INIT_MSG) {
log.warn("Rule Chains initialized by unexpected message: {}", msg);
@ -100,15 +103,30 @@ public class AppActor extends ContextAwareActor {
return true;
}
private void initRuleChainsAndTenantActors() {
private void initTenantActors() {
log.info("Starting main system actor.");
try {
if (systemContext.isTenantComponentsInitEnabled()) {
PageDataIterable<Tenant> tenantIterator = new PageDataIterable<>(tenantService::findTenants, ENTITY_PACK_LIMIT);
for (Tenant tenant : tenantIterator) {
// This Service may be started for specific tenant only.
Optional<TenantId> isolatedTenantId = systemContext.getServiceInfoProvider().getIsolatedTenant();
if (isolatedTenantId.isPresent()) {
Tenant tenant = systemContext.getTenantService().findTenantById(isolatedTenantId.get());
if (tenant != null) {
log.debug("[{}] Creating tenant actor", tenant.getId());
getOrCreateTenantActor(tenant.getId());
log.debug("Tenant actor created.");
} else {
log.error("[{}] Tenant with such ID does not exist", isolatedTenantId.get());
}
} else if (systemContext.isTenantComponentsInitEnabled()) {
PageDataIterable<Tenant> tenantIterator = new PageDataIterable<>(tenantService::findTenants, ENTITY_PACK_LIMIT);
boolean isRuleEngine = systemContext.getServiceInfoProvider().isService(ServiceType.TB_RULE_ENGINE);
boolean isCore = systemContext.getServiceInfoProvider().isService(ServiceType.TB_CORE);
for (Tenant tenant : tenantIterator) {
if (isCore || (isRuleEngine && !tenant.isIsolatedTbRuleEngine())) {
log.debug("[{}] Creating tenant actor", tenant.getId());
getOrCreateTenantActor(tenant.getId());
log.debug("[{}] Tenant actor created.", tenant.getId());
}
}
}
log.info("Main system actor started.");

25
application/src/main/java/org/thingsboard/server/actors/tenant/TenantActor.java

@ -22,7 +22,6 @@ import akka.actor.OneForOneStrategy;
import akka.actor.Props;
import akka.actor.SupervisorStrategy;
import akka.actor.Terminated;
import akka.japi.Function;
import com.google.common.collect.BiMap;
import com.google.common.collect.HashBiMap;
import org.thingsboard.server.actors.ActorSystemContext;
@ -31,6 +30,7 @@ import org.thingsboard.server.actors.ruleChain.RuleChainManagerActor;
import org.thingsboard.server.actors.service.ContextBasedCreator;
import org.thingsboard.server.actors.service.DefaultActorService;
import org.thingsboard.server.common.data.EntityType;
import org.thingsboard.server.common.data.Tenant;
import org.thingsboard.server.common.data.id.DeviceId;
import org.thingsboard.server.common.data.id.RuleChainId;
import org.thingsboard.server.common.data.id.TenantId;
@ -47,12 +47,13 @@ import org.thingsboard.server.common.msg.queue.ServiceType;
import scala.concurrent.duration.Duration;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
public class TenantActor extends RuleChainManagerActor {
private final BiMap<DeviceId, ActorRef> deviceActors;
private boolean isRuleEngine;
private boolean isRuleEngineForCurrentTenant;
private boolean isCore;
private TenantActor(ActorSystemContext systemContext, TenantId tenantId) {
@ -69,10 +70,19 @@ public class TenantActor extends RuleChainManagerActor {
public void preStart() {
log.info("[{}] Starting tenant actor.", tenantId);
try {
isRuleEngine = systemContext.getServiceInfoProvider().isService(ServiceType.TB_RULE_ENGINE);
Tenant tenant = systemContext.getTenantService().findTenantById(tenantId);
// This Service may be started for specific tenant only.
Optional<TenantId> isolatedTenantId = systemContext.getServiceInfoProvider().getIsolatedTenant();
isRuleEngineForCurrentTenant = systemContext.getServiceInfoProvider().isService(ServiceType.TB_RULE_ENGINE);
isCore = systemContext.getServiceInfoProvider().isService(ServiceType.TB_CORE);
if (isRuleEngine) {
initRuleChains();
if (isRuleEngineForCurrentTenant) {
if (isolatedTenantId.map(id -> id.equals(tenantId)).orElseGet(() -> !tenant.isIsolatedTbRuleEngine())) {
initRuleChains();
} else {
isRuleEngineForCurrentTenant = false;
}
}
log.info("[{}] Tenant actor started.", tenantId);
} catch (Exception e) {
@ -132,8 +142,9 @@ public class TenantActor extends RuleChainManagerActor {
}
private void onQueueToRuleEngineMsg(QueueToRuleEngineMsg msg) {
if (!isRuleEngine) {
if (!isRuleEngineForCurrentTenant) {
log.warn("RECEIVED INVALID MESSAGE: {}", msg);
return;
}
TbMsg tbMsg = msg.getTbMsg();
if (tbMsg.getRuleChainId() == null) {
@ -167,7 +178,7 @@ public class TenantActor extends RuleChainManagerActor {
}
private void onComponentLifecycleMsg(ComponentLifecycleMsg msg) {
if (isRuleEngine) {
if (isRuleEngineForCurrentTenant) {
ActorRef target = getEntityActorRef(msg.getEntityId());
if (target != null) {
if (msg.getEntityId().getEntityType() == EntityType.RULE_CHAIN) {

2
application/src/main/java/org/thingsboard/server/controller/TenantController.java

@ -76,7 +76,6 @@ public class TenantController extends BaseController {
accessControlService.checkPermission(getCurrentUser(), Resource.TENANT, operation,
tenant.getId(), tenant);
tenant = checkNotNull(tenantService.saveTenant(tenant));
if (newTenant) {
installScripts.createDefaultRuleChains(tenant.getId());
@ -96,7 +95,6 @@ public class TenantController extends BaseController {
TenantId tenantId = new TenantId(toUUID(strTenantId));
checkTenantId(tenantId, Operation.DELETE);
tenantService.deleteTenant(tenantId);
tbClusterService.onEntityStateChange(tenantId, tenantId, ComponentLifecycleEvent.DELETED);
} catch (Exception e) {
throw handleException(e);

8
application/src/main/java/org/thingsboard/server/service/queue/DefaultTbClusterService.java

@ -54,6 +54,14 @@ public class DefaultTbClusterService implements TbClusterService {
@Override
public void onToRuleEngineMsg(TenantId tenantId, EntityId entityId, TbMsg tbMsg) {
if (tenantId.isNullUid()) {
if (entityId.getEntityType().equals(EntityType.TENANT)) {
tenantId = new TenantId(entityId.getId());
} else {
log.warn("[{}][{}] Received invalid message: {}", tenantId, entityId, tbMsg);
return;
}
}
TopicPartitionInfo tpi = partitionService.resolve(ServiceType.TB_RULE_ENGINE, tenantId, entityId);
ToRuleEngineMsg msg = ToRuleEngineMsg.newBuilder()
.setTenantIdMSB(tenantId.getId().getMostSignificantBits())

47
application/src/main/java/org/thingsboard/server/service/queue/DefaultTenantRoutingInfoService.java

@ -0,0 +1,47 @@
/**
* Copyright © 2016-2020 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.service.queue;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
import org.springframework.stereotype.Service;
import org.thingsboard.server.common.data.Tenant;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.dao.tenant.TenantService;
import org.thingsboard.server.queue.discovery.TenantRoutingInfo;
import org.thingsboard.server.queue.discovery.TenantRoutingInfoService;
@Slf4j
@Service
@ConditionalOnExpression("'${service.type:null}'=='monolith' || '${service.type:null}'=='tb-core' || '${service.type:null}'=='tb-rule-engine'")
public class DefaultTenantRoutingInfoService implements TenantRoutingInfoService {
private final TenantService tenantService;
public DefaultTenantRoutingInfoService(TenantService tenantService) {
this.tenantService = tenantService;
}
@Override
public TenantRoutingInfo getRoutingInfo(TenantId tenantId) {
Tenant tenant = tenantService.findTenantById(tenantId);
if (tenant != null) {
return new TenantRoutingInfo(tenantId, tenant.isIsolatedTbCore(), tenant.isIsolatedTbRuleEngine());
} else {
throw new RuntimeException("Tenant not found!");
}
}
}

28
application/src/main/java/org/thingsboard/server/service/subscription/DefaultSubscriptionManagerService.java

@ -173,19 +173,21 @@ public class DefaultSubscriptionManagerService implements SubscriptionManagerSer
@Override
public void onApplicationEvent(PartitionChangeEvent partitionChangeEvent) {
Set<TopicPartitionInfo> removedPartitions = new HashSet<>(currentPartitions);
removedPartitions.removeAll(partitionChangeEvent.getPartitions());
currentPartitions.clear();
currentPartitions.addAll(partitionChangeEvent.getPartitions());
// We no longer manage current partition of devices;
removedPartitions.forEach(partition -> {
Set<TbSubscription> subs = partitionedSubscriptions.remove(partition);
if (subs != null) {
subs.forEach(this::removeSubscriptionFromEntityMap);
}
});
if (ServiceType.TB_CORE.equals(partitionChangeEvent.getServiceType())) {
Set<TopicPartitionInfo> removedPartitions = new HashSet<>(currentPartitions);
removedPartitions.removeAll(partitionChangeEvent.getPartitions());
currentPartitions.clear();
currentPartitions.addAll(partitionChangeEvent.getPartitions());
// We no longer manage current partition of devices;
removedPartitions.forEach(partition -> {
Set<TbSubscription> subs = partitionedSubscriptions.remove(partition);
if (subs != null) {
subs.forEach(this::removeSubscriptionFromEntityMap);
}
});
}
}
@Override

19
application/src/main/java/org/thingsboard/server/service/transport/DefaultTransportApiService.java

@ -24,8 +24,8 @@ import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.util.StringUtils;
import org.thingsboard.server.queue.common.TbProtoQueueMsg;
import org.thingsboard.server.common.data.Device;
import org.thingsboard.server.common.data.Tenant;
import org.thingsboard.server.common.data.id.DeviceId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.relation.EntityRelation;
@ -34,14 +34,18 @@ import org.thingsboard.server.common.data.security.DeviceCredentialsType;
import org.thingsboard.server.dao.device.DeviceCredentialsService;
import org.thingsboard.server.dao.device.DeviceService;
import org.thingsboard.server.dao.relation.RelationService;
import org.thingsboard.server.dao.tenant.TenantService;
import org.thingsboard.server.gen.transport.TransportProtos.DeviceInfoProto;
import org.thingsboard.server.gen.transport.TransportProtos.GetOrCreateDeviceFromGatewayRequestMsg;
import org.thingsboard.server.gen.transport.TransportProtos.GetOrCreateDeviceFromGatewayResponseMsg;
import org.thingsboard.server.gen.transport.TransportProtos.GetTenantRoutingInfoRequestMsg;
import org.thingsboard.server.gen.transport.TransportProtos.GetTenantRoutingInfoResponseMsg;
import org.thingsboard.server.gen.transport.TransportProtos.TransportApiRequestMsg;
import org.thingsboard.server.gen.transport.TransportProtos.TransportApiResponseMsg;
import org.thingsboard.server.gen.transport.TransportProtos.ValidateDeviceCredentialsResponseMsg;
import org.thingsboard.server.gen.transport.TransportProtos.ValidateDeviceTokenRequestMsg;
import org.thingsboard.server.gen.transport.TransportProtos.ValidateDeviceX509CertRequestMsg;
import org.thingsboard.server.queue.common.TbProtoQueueMsg;
import org.thingsboard.server.queue.util.TbCoreComponent;
import org.thingsboard.server.service.executors.DbCallbackExecutorService;
import org.thingsboard.server.service.state.DeviceStateService;
@ -59,6 +63,10 @@ public class DefaultTransportApiService implements TransportApiService {
private static final ObjectMapper mapper = new ObjectMapper();
//TODO: Constructor dependencies;
@Autowired
private TenantService tenantService;
@Autowired
private DeviceService deviceService;
@ -87,6 +95,8 @@ public class DefaultTransportApiService implements TransportApiService {
return Futures.transform(validateCredentials(msg.getHash(), DeviceCredentialsType.X509_CERTIFICATE), value -> new TbProtoQueueMsg<>(tbProtoQueueMsg.getKey(), value, tbProtoQueueMsg.getHeaders()), MoreExecutors.directExecutor());
} else if (transportApiRequestMsg.hasGetOrCreateDeviceRequestMsg()) {
return Futures.transform(handle(transportApiRequestMsg.getGetOrCreateDeviceRequestMsg()), value -> new TbProtoQueueMsg<>(tbProtoQueueMsg.getKey(), value, tbProtoQueueMsg.getHeaders()), MoreExecutors.directExecutor());
} else if (transportApiRequestMsg.hasGetTenantRoutingInfoRequestMsg()) {
return Futures.transform(handle(transportApiRequestMsg.getGetTenantRoutingInfoRequestMsg()), value -> new TbProtoQueueMsg<>(tbProtoQueueMsg.getKey(), value, tbProtoQueueMsg.getHeaders()), MoreExecutors.directExecutor());
}
return Futures.transform(getEmptyTransportApiResponseFuture(), value -> new TbProtoQueueMsg<>(tbProtoQueueMsg.getKey(), value, tbProtoQueueMsg.getHeaders()), MoreExecutors.directExecutor());
}
@ -129,6 +139,13 @@ public class DefaultTransportApiService implements TransportApiService {
}, dbCallbackExecutorService);
}
private ListenableFuture<TransportApiResponseMsg> handle(GetTenantRoutingInfoRequestMsg requestMsg) {
TenantId tenantId = new TenantId(new UUID(requestMsg.getTenantIdMSB(), requestMsg.getTenantIdLSB()));
ListenableFuture<Tenant> tenantFuture = tenantService.findTenantByIdAsync(TenantId.SYS_TENANT_ID, tenantId);
return Futures.transform(tenantFuture, tenant -> TransportApiResponseMsg.newBuilder()
.setGetTenantRoutingInfoResponseMsg(GetTenantRoutingInfoResponseMsg.newBuilder().setIsolatedTbCore(tenant.isIsolatedTbCore())
.setIsolatedTbRuleEngine(tenant.isIsolatedTbRuleEngine()).build()).build(), dbCallbackExecutorService);
}
private ListenableFuture<TransportApiResponseMsg> getDeviceInfo(DeviceId deviceId, DeviceCredentials credentials) {
return Futures.transform(deviceService.findDeviceByIdAsync(TenantId.SYS_TENANT_ID, deviceId), device -> {

2
application/src/main/resources/logback.xml

@ -27,6 +27,8 @@
<logger name="org.thingsboard.server" level="INFO" />
<logger name="akka" level="INFO" />
<logger name="org.springframework.boot.autoconfigure.logging" level="DEBUG" />
<!-- <logger name="org.thingsboard.server.service.queue" level="TRACE" />-->
<!-- <logger name="org.thingsboard.server.service.transport" level="TRACE" />-->

27
application/src/main/resources/thingsboard.yml

@ -437,18 +437,6 @@ js:
print_interval_ms: "${TB_JS_LOCAL_STATS_PRINT_INTERVAL_MS:10000}"
# Remote JavaScript environment properties
remote:
# JS Eval request topic
request_topic: "${REMOTE_JS_EVAL_REQUEST_TOPIC:js.eval.requests}"
# JS Eval responses topic prefix that is combined with node id
response_topic_prefix: "${REMOTE_JS_EVAL_RESPONSE_TOPIC:js.eval.responses}"
# JS Eval max pending requests
max_pending_requests: "${REMOTE_JS_MAX_PENDING_REQUESTS:10000}"
# JS Eval max request timeout
max_requests_timeout: "${REMOTE_JS_MAX_REQUEST_TIMEOUT:10000}"
# JS response poll interval
response_poll_interval: "${REMOTE_JS_RESPONSE_POLL_INTERVAL_MS:25}"
# JS response auto commit interval
response_auto_commit_interval: "${REMOTE_JS_RESPONSE_AUTO_COMMIT_INTERVAL_MS:100}"
# Maximum allowed JavaScript execution errors before JavaScript will be blacklisted
max_errors: "${REMOTE_JS_SANDBOX_MAX_ERRORS:3}"
# Maximum time in seconds for black listed function to stay in the list.
@ -579,6 +567,19 @@ queue:
stats:
enabled: "${TB_QUEUE_CORE_STATS_ENABLED:false}"
print-interval-ms: "${TB_QUEUE_CORE_STATS_PRINT_INTERVAL_MS:10000}"
js:
# JS Eval request topic
request_topic: "${REMOTE_JS_EVAL_REQUEST_TOPIC:js.eval.requests}"
# JS Eval responses topic prefix that is combined with node id
response_topic_prefix: "${REMOTE_JS_EVAL_RESPONSE_TOPIC:js.eval.responses}"
# JS Eval max pending requests
max_pending_requests: "${REMOTE_JS_MAX_PENDING_REQUESTS:10000}"
# JS Eval max request timeout
max_requests_timeout: "${REMOTE_JS_MAX_REQUEST_TIMEOUT:10000}"
# JS response poll interval
response_poll_interval: "${REMOTE_JS_RESPONSE_POLL_INTERVAL_MS:25}"
# JS response auto commit interval
response_auto_commit_interval: "${REMOTE_JS_RESPONSE_AUTO_COMMIT_INTERVAL_MS:100}"
rule-engine:
topic: "${TB_QUEUE_RULE_ENGINE_TOPIC:tb.rule-engine}"
poll-interval: "${TB_QUEUE_RULE_ENGINE_POLL_INTERVAL_MS:25}"
@ -586,7 +587,7 @@ queue:
stats:
enabled: "${TB_QUEUE_RULE_ENGINE_STATS_ENABLED:true}"
print-interval-ms: "${TB_QUEUE_RULE_ENGINE_STATS_PRINT_INTERVAL_MS:10000}"
queues: # TODO 2.5: specify correct ENV variable names.
queues:
- name: "Main"
topic: "${TB_QUEUE_RE_MAIN_TOPIC:tb.rule-engine.main}"
poll-interval: "${TB_QUEUE_RE_MAIN_POLL_INTERVAL_MS:25}"

5
application/src/test/java/org/thingsboard/server/service/cluster/routing/ConsistentHashParitionServiceTest.java

@ -30,6 +30,7 @@ import org.thingsboard.server.common.msg.queue.ServiceType;
import org.thingsboard.server.queue.discovery.TbServiceInfoProvider;
import org.thingsboard.server.common.msg.queue.TopicPartitionInfo;
import org.thingsboard.server.gen.transport.TransportProtos;
import org.thingsboard.server.queue.discovery.TenantRoutingInfoService;
import java.util.ArrayList;
import java.util.Collections;
@ -49,6 +50,7 @@ public class ConsistentHashParitionServiceTest {
private ConsistentHashPartitionService clusterRoutingService;
private TbServiceInfoProvider discoveryService;
private TenantRoutingInfoService routingInfoService;
private ApplicationEventPublisher applicationEventPublisher;
private String hashFunctionName = "murmur3_128";
@ -59,7 +61,8 @@ public class ConsistentHashParitionServiceTest {
public void setup() throws Exception {
discoveryService = mock(TbServiceInfoProvider.class);
applicationEventPublisher = mock(ApplicationEventPublisher.class);
clusterRoutingService = new ConsistentHashPartitionService(discoveryService, applicationEventPublisher);
routingInfoService = mock(TenantRoutingInfoService.class);
clusterRoutingService = new ConsistentHashPartitionService(discoveryService, routingInfoService, applicationEventPublisher);
ReflectionTestUtils.setField(clusterRoutingService, "coreTopic", "tb.core");
ReflectionTestUtils.setField(clusterRoutingService, "corePartitions", 3);
ReflectionTestUtils.setField(clusterRoutingService, "ruleEngineTopic", "tb.rule-engine");

43
common/queue/src/main/java/org/thingsboard/server/queue/discovery/ConsistentHashPartitionService.java

@ -60,11 +60,12 @@ public class ConsistentHashPartitionService implements PartitionService {
private final ApplicationEventPublisher applicationEventPublisher;
private final TbServiceInfoProvider serviceInfoProvider;
private final TenantRoutingInfoService tenantRoutingInfoService;
private final ConcurrentMap<ServiceQueue, String> partitionTopics = new ConcurrentHashMap<>();
private final ConcurrentMap<ServiceQueue, Integer> partitionSizes = new ConcurrentHashMap<>();
private final ConcurrentMap<TenantId, TenantRoutingInfo> tenantRoutingInfoMap = new ConcurrentHashMap<>();
private ConcurrentMap<ServiceQueueKey, List<Integer>> myPartitions = new ConcurrentHashMap<>();
//TODO: Fetch this from the database, together with size of partitions for each service for each tenant.
private ConcurrentMap<TenantId, Set<ServiceType>> isolatedTenants = new ConcurrentHashMap<>();
private ConcurrentMap<TopicPartitionInfoKey, TopicPartitionInfo> tpiCache = new ConcurrentHashMap<>();
private Map<String, TopicPartitionInfo> tbCoreNotificationTopics = new HashMap<>();
@ -73,8 +74,9 @@ public class ConsistentHashPartitionService implements PartitionService {
private HashFunction hashFunction;
public ConsistentHashPartitionService(TbServiceInfoProvider serviceInfoProvider, ApplicationEventPublisher applicationEventPublisher) {
public ConsistentHashPartitionService(TbServiceInfoProvider serviceInfoProvider, TenantRoutingInfoService tenantRoutingInfoService, ApplicationEventPublisher applicationEventPublisher) {
this.serviceInfoProvider = serviceInfoProvider;
this.tenantRoutingInfoService = tenantRoutingInfoService;
this.applicationEventPublisher = applicationEventPublisher;
}
@ -122,10 +124,10 @@ public class ConsistentHashPartitionService implements PartitionService {
addNode(circles, other);
}
ConcurrentMap<ServiceQueueKey, List<Integer>> oldPartitions = myPartitions;
TenantId myTenantId = getSystemOrIsolatedTenantId(currentService);
TenantId myIsolatedOrSystemTenantId = getSystemOrIsolatedTenantId(currentService);
myPartitions = new ConcurrentHashMap<>();
partitionSizes.forEach((type, size) -> {
ServiceQueueKey myServiceQueueKey = new ServiceQueueKey(type, myTenantId);
ServiceQueueKey myServiceQueueKey = new ServiceQueueKey(type, myIsolatedOrSystemTenantId);
for (int i = 0; i < size; i++) {
ServiceInfo serviceInfo = resolveByPartitionIdx(circles.get(myServiceQueueKey), i);
if (currentService.equals(serviceInfo)) {
@ -247,7 +249,30 @@ public class ConsistentHashPartitionService implements PartitionService {
}
private boolean isIsolated(ServiceQueue serviceQueue, TenantId tenantId) {
return isolatedTenants.get(tenantId) != null && isolatedTenants.get(tenantId).contains(serviceQueue.getType());
if (TenantId.SYS_TENANT_ID.equals(tenantId)) {
return false;
}
TenantRoutingInfo routingInfo = tenantRoutingInfoMap.get(tenantId);
if (routingInfo == null) {
synchronized (tenantRoutingInfoMap) {
routingInfo = tenantRoutingInfoMap.get(tenantId);
if (routingInfo == null) {
routingInfo = tenantRoutingInfoService.getRoutingInfo(tenantId);
tenantRoutingInfoMap.put(tenantId, routingInfo);
}
}
}
if (routingInfo == null) {
throw new RuntimeException("Tenant not found!");
}
switch (serviceQueue.getType()) {
case TB_CORE:
return routingInfo.isIsolatedTbCore();
case TB_RULE_ENGINE:
return routingInfo.isIsolatedTbRuleEngine();
default:
return false;
}
}
private void logServiceInfo(TransportProtos.ServiceInfo server) {
@ -265,12 +290,6 @@ public class ConsistentHashPartitionService implements PartitionService {
private void addNode(Map<ServiceQueueKey, ConsistentHashCircle<ServiceInfo>> circles, ServiceInfo instance) {
TenantId tenantId = getSystemOrIsolatedTenantId(instance);
if (!tenantId.isNullUid()) {
isolatedTenants.putIfAbsent(tenantId, new HashSet<>());
for (String serviceType : instance.getServiceTypesList()) {
isolatedTenants.get(tenantId).add(ServiceType.valueOf(serviceType.toUpperCase()));
}
}
for (String serviceTypeStr : instance.getServiceTypesList()) {
ServiceType serviceType = ServiceType.valueOf(serviceTypeStr.toUpperCase());
if (ServiceType.TB_RULE_ENGINE.equals(serviceType)) {

26
common/queue/src/main/java/org/thingsboard/server/queue/discovery/TenantRoutingInfo.java

@ -0,0 +1,26 @@
/**
* Copyright © 2016-2020 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.queue.discovery;
import lombok.Data;
import org.thingsboard.server.common.data.id.TenantId;
@Data
public class TenantRoutingInfo {
private final TenantId tenantId;
private final boolean isolatedTbCore;
private final boolean isolatedTbRuleEngine;
}

23
common/queue/src/main/java/org/thingsboard/server/queue/discovery/TenantRoutingInfoService.java

@ -0,0 +1,23 @@
/**
* Copyright © 2016-2020 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.queue.discovery;
import org.thingsboard.server.common.data.id.TenantId;
public interface TenantRoutingInfoService {
TenantRoutingInfo getRoutingInfo(TenantId tenantId);
}

12
common/queue/src/main/java/org/thingsboard/server/queue/settings/TbQueueRemoteJsInvokeSettings.java

@ -22,21 +22,21 @@ import org.springframework.stereotype.Component;
@Data
@Component
public class TbQueueRemoteJsInvokeSettings {
@Value("${js.remote.request_topic}")
@Value("${queue.js.request_topic}")
private String requestTopic;
@Value("${js.remote.response_topic_prefix}")
@Value("${queue.js.response_topic_prefix}")
private String responseTopic;
@Value("${js.remote.max_pending_requests}")
@Value("${queue.js.max_pending_requests}")
private long maxPendingRequests;
@Value("${js.remote.response_poll_interval}")
@Value("${queue.js.response_poll_interval}")
private int responsePollInterval;
@Value("${js.remote.response_auto_commit_interval}")
@Value("${queue.js.response_auto_commit_interval}")
private int autoCommitInterval;
@Value("${js.remote.max_requests_timeout}")
@Value("${queue.js.max_requests_timeout}")
private long maxRequestsTimeout;
}

4
common/queue/src/main/java/org/thingsboard/server/queue/util/TbCoreComponent.java

@ -17,6 +17,10 @@ package org.thingsboard.server.queue.util;
import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
@Retention(RetentionPolicy.RUNTIME)
@ConditionalOnExpression("'${service.type:null}'=='monolith' || '${service.type:null}'=='tb-core'")
public @interface TbCoreComponent {
}

4
common/queue/src/main/java/org/thingsboard/server/queue/util/TbRuleEngineComponent.java

@ -17,6 +17,10 @@ package org.thingsboard.server.queue.util;
import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
@Retention(RetentionPolicy.RUNTIME)
@ConditionalOnExpression("'${service.type:null}'=='monolith' || '${service.type:null}'=='tb-rule-engine'")
public @interface TbRuleEngineComponent {
}

12
common/queue/src/main/proto/queue.proto

@ -158,6 +158,16 @@ message GetOrCreateDeviceFromGatewayResponseMsg {
DeviceInfoProto deviceInfo = 1;
}
message GetTenantRoutingInfoRequestMsg {
int64 tenantIdMSB = 1;
int64 tenantIdLSB = 2;
}
message GetTenantRoutingInfoResponseMsg {
bool isolatedTbCore = 1;
bool isolatedTbRuleEngine = 2;
}
message SessionCloseNotificationProto {
string message = 1;
}
@ -359,12 +369,14 @@ message TransportApiRequestMsg {
ValidateDeviceTokenRequestMsg validateTokenRequestMsg = 1;
ValidateDeviceX509CertRequestMsg validateX509CertRequestMsg = 2;
GetOrCreateDeviceFromGatewayRequestMsg getOrCreateDeviceRequestMsg = 3;
GetTenantRoutingInfoRequestMsg getTenantRoutingInfoRequestMsg = 4;
}
/* Response from ThingsBoard Core Service to Transport Service */
message TransportApiResponseMsg {
ValidateDeviceCredentialsResponseMsg validateTokenResponseMsg = 1;
GetOrCreateDeviceFromGatewayResponseMsg getOrCreateDeviceResponseMsg = 2;
GetTenantRoutingInfoResponseMsg getTenantRoutingInfoResponseMsg = 4;
}
/* Messages that are handled by ThingsBoard Core Service */

14
common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/TransportService.java

@ -15,15 +15,19 @@
*/
package org.thingsboard.server.common.transport;
import org.thingsboard.server.gen.transport.TransportProtos;
import org.thingsboard.server.gen.transport.TransportProtos.ClaimDeviceMsg;
import org.thingsboard.server.gen.transport.TransportProtos.GetAttributeRequestMsg;
import org.thingsboard.server.gen.transport.TransportProtos.GetOrCreateDeviceFromGatewayRequestMsg;
import org.thingsboard.server.gen.transport.TransportProtos.GetOrCreateDeviceFromGatewayResponseMsg;
import org.thingsboard.server.gen.transport.TransportProtos.GetTenantRoutingInfoRequestMsg;
import org.thingsboard.server.gen.transport.TransportProtos.GetTenantRoutingInfoResponseMsg;
import org.thingsboard.server.gen.transport.TransportProtos.PostAttributeMsg;
import org.thingsboard.server.gen.transport.TransportProtos.PostTelemetryMsg;
import org.thingsboard.server.gen.transport.TransportProtos.SessionEventMsg;
import org.thingsboard.server.gen.transport.TransportProtos.SessionInfoProto;
import org.thingsboard.server.gen.transport.TransportProtos.SubscribeToAttributeUpdatesMsg;
import org.thingsboard.server.gen.transport.TransportProtos.SubscribeToRPCMsg;
import org.thingsboard.server.gen.transport.TransportProtos.SubscriptionInfoProto;
import org.thingsboard.server.gen.transport.TransportProtos.ToDeviceRpcResponseMsg;
import org.thingsboard.server.gen.transport.TransportProtos.ToServerRpcRequestMsg;
import org.thingsboard.server.gen.transport.TransportProtos.ValidateDeviceCredentialsResponseMsg;
@ -35,14 +39,16 @@ import org.thingsboard.server.gen.transport.TransportProtos.ValidateDeviceX509Ce
*/
public interface TransportService {
GetTenantRoutingInfoResponseMsg getRoutingInfo(GetTenantRoutingInfoRequestMsg msg);
void process(ValidateDeviceTokenRequestMsg msg,
TransportServiceCallback<ValidateDeviceCredentialsResponseMsg> callback);
void process(ValidateDeviceX509CertRequestMsg msg,
TransportServiceCallback<ValidateDeviceCredentialsResponseMsg> callback);
void process(TransportProtos.GetOrCreateDeviceFromGatewayRequestMsg msg,
TransportServiceCallback<TransportProtos.GetOrCreateDeviceFromGatewayResponseMsg> callback);
void process(GetOrCreateDeviceFromGatewayRequestMsg msg,
TransportServiceCallback<GetOrCreateDeviceFromGatewayResponseMsg> callback);
boolean checkLimits(SessionInfoProto sessionInfo, Object msg, TransportServiceCallback<Void> callback);
@ -62,7 +68,7 @@ public interface TransportService {
void process(SessionInfoProto sessionInfo, ToServerRpcRequestMsg msg, TransportServiceCallback<Void> callback);
void process(TransportProtos.SessionInfoProto sessionInfo, TransportProtos.SubscriptionInfoProto msg, TransportServiceCallback<Void> callback);
void process(SessionInfoProto sessionInfo, SubscriptionInfoProto msg, TransportServiceCallback<Void> callback);
void process(SessionInfoProto sessionInfo, ClaimDeviceMsg msg, TransportServiceCallback<Void> callback);

32
common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/DefaultTransportService.java

@ -18,6 +18,7 @@ package org.thingsboard.server.common.transport.service;
import com.google.gson.Gson;
import com.google.gson.JsonObject;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
import org.springframework.stereotype.Service;
@ -43,6 +44,8 @@ import org.thingsboard.server.common.transport.TransportServiceCallback;
import org.thingsboard.server.queue.discovery.PartitionService;
import org.thingsboard.server.common.msg.queue.ServiceType;
import org.thingsboard.server.common.msg.queue.TopicPartitionInfo;
import org.thingsboard.server.queue.discovery.TenantRoutingInfo;
import org.thingsboard.server.queue.discovery.TenantRoutingInfoService;
import org.thingsboard.server.queue.provider.TbQueueProducerProvider;
import org.thingsboard.server.queue.provider.TbTransportQueueFactory;
import org.thingsboard.server.gen.transport.TransportProtos;
@ -61,6 +64,7 @@ import java.util.Random;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
@ -194,6 +198,34 @@ public class DefaultTransportService implements TransportService {
sessions.putIfAbsent(toId(sessionInfo), new SessionMetaData(sessionInfo, TransportProtos.SessionType.ASYNC, listener));
}
@Override
public TransportProtos.GetTenantRoutingInfoResponseMsg getRoutingInfo(TransportProtos.GetTenantRoutingInfoRequestMsg msg) {
TbProtoQueueMsg<TransportProtos.TransportApiRequestMsg> protoMsg =
new TbProtoQueueMsg<>(UUID.randomUUID(), TransportProtos.TransportApiRequestMsg.newBuilder().setGetTenantRoutingInfoRequestMsg(msg).build());
try {
TbProtoQueueMsg<TransportApiResponseMsg> response = transportApiRequestTemplate.send(protoMsg).get();
return response.getValue().getGetTenantRoutingInfoResponseMsg();
} catch (InterruptedException | ExecutionException e) {
throw new RuntimeException(e);
}
}
public TenantRoutingInfo getRoutingInfo(TenantId tenantId) {
TransportProtos.GetTenantRoutingInfoRequestMsg msg = TransportProtos.GetTenantRoutingInfoRequestMsg.newBuilder()
.setTenantIdMSB(tenantId.getId().getMostSignificantBits())
.setTenantIdLSB(tenantId.getId().getLeastSignificantBits())
.build();
TbProtoQueueMsg<TransportApiRequestMsg> protoMsg = new TbProtoQueueMsg<>(UUID.randomUUID(), TransportApiRequestMsg.newBuilder().setGetTenantRoutingInfoRequestMsg(msg).build());
try {
TbProtoQueueMsg<TransportApiResponseMsg> response = transportApiRequestTemplate.send(protoMsg).get();
TransportProtos.GetTenantRoutingInfoResponseMsg routingInfo = response.getValue().getGetTenantRoutingInfoResponseMsg();
return new TenantRoutingInfo(tenantId, routingInfo.getIsolatedTbCore(), routingInfo.getIsolatedTbRuleEngine());
} catch (InterruptedException | ExecutionException e) {
throw new RuntimeException(e);
}
}
@Override
public void process(TransportProtos.ValidateDeviceTokenRequestMsg msg, TransportServiceCallback<TransportProtos.ValidateDeviceCredentialsResponseMsg> callback) {
log.trace("Processing msg: {}", msg);

52
common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/TransportTenantRoutingInfoService.java

@ -0,0 +1,52 @@
/**
* Copyright © 2016-2020 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.common.transport.service;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
import org.springframework.context.annotation.Lazy;
import org.springframework.stereotype.Service;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.transport.TransportService;
import org.thingsboard.server.gen.transport.TransportProtos.GetTenantRoutingInfoRequestMsg;
import org.thingsboard.server.gen.transport.TransportProtos.GetTenantRoutingInfoResponseMsg;
import org.thingsboard.server.queue.discovery.TenantRoutingInfo;
import org.thingsboard.server.queue.discovery.TenantRoutingInfoService;
@Slf4j
@Service
@ConditionalOnExpression("'${service.type:null}'=='tb-transport'")
public class TransportTenantRoutingInfoService implements TenantRoutingInfoService {
private TransportService transportService;
@Lazy
@Autowired
public void setTransportService(TransportService transportService) {
this.transportService = transportService;
}
@Override
public TenantRoutingInfo getRoutingInfo(TenantId tenantId) {
GetTenantRoutingInfoRequestMsg msg = GetTenantRoutingInfoRequestMsg.newBuilder()
.setTenantIdMSB(tenantId.getId().getMostSignificantBits())
.setTenantIdLSB(tenantId.getId().getLeastSignificantBits())
.build();
GetTenantRoutingInfoResponseMsg routingInfo = transportService.getRoutingInfo(msg);
return new TenantRoutingInfo(tenantId, routingInfo.getIsolatedTbCore(), routingInfo.getIsolatedTbRuleEngine());
}
}

35
dao/src/main/java/org/thingsboard/server/dao/tenant/TenantServiceImpl.java

@ -21,6 +21,7 @@ import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.thingsboard.server.common.data.Tenant;
import org.thingsboard.server.common.data.asset.Asset;
import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.page.TextPageData;
@ -126,7 +127,7 @@ public class TenantServiceImpl extends AbstractEntityService implements TenantSe
@Override
public void deleteTenants() {
log.trace("Executing deleteTenants");
tenantsRemover.removeEntities(new TenantId(EntityId.NULL_UUID),DEFAULT_TENANT_REGION);
tenantsRemover.removeEntities(new TenantId(EntityId.NULL_UUID), DEFAULT_TENANT_REGION);
}
private DataValidator<Tenant> tenantValidator =
@ -140,19 +141,31 @@ public class TenantServiceImpl extends AbstractEntityService implements TenantSe
validateEmail(tenant.getEmail());
}
}
};
@Override
protected void validateUpdate(TenantId tenantId, Tenant tenant) {
Tenant old = tenantDao.findById(TenantId.SYS_TENANT_ID, tenantId.getId());
if (old == null) {
throw new DataValidationException("Can't update non existing tenant!");
} else if (old.isIsolatedTbRuleEngine() != tenant.isIsolatedTbRuleEngine()) {
throw new DataValidationException("Can't update isolatedTbRuleEngine property!");
} else if (old.isIsolatedTbCore() != tenant.isIsolatedTbCore()) {
throw new DataValidationException("Can't update isolatedTbCore property!");
}
}
};
private PaginatedRemover<String, Tenant> tenantsRemover =
new PaginatedRemover<String, Tenant>() {
@Override
protected List<Tenant> findEntities(TenantId tenantId, String region, TextPageLink pageLink) {
return tenantDao.findTenantsByRegion(tenantId, region, pageLink);
}
@Override
protected List<Tenant> findEntities(TenantId tenantId, String region, TextPageLink pageLink) {
return tenantDao.findTenantsByRegion(tenantId, region, pageLink);
}
@Override
protected void removeEntity(TenantId tenantId, Tenant entity) {
deleteTenant(new TenantId(entity.getUuidId()));
}
};
@Override
protected void removeEntity(TenantId tenantId, Tenant entity) {
deleteTenant(new TenantId(entity.getUuidId()));
}
};
}

74
transport/mqtt/src/main/resources/tb-mqtt-transport.yml

@ -63,7 +63,7 @@ transport:
max_string_value_length: "${JSON_MAX_STRING_VALUE_LENGTH:0}"
queue:
type: "${TB_QUEUE_TYPE:kafka}" # kafka or aws-sqs or pubsub or service-bus
type: "${TB_QUEUE_TYPE:in-memory}" # kafka or in-memory or aws-sqs or pubsub or service-bus
kafka:
bootstrap.servers: "${TB_KAFKA_SERVERS:localhost:9092}"
acks: "${TB_KAFKA_ACKS:all}"
@ -111,12 +111,25 @@ queue:
response_poll_interval: "${TB_QUEUE_TRANSPORT_RESPONSE_POLL_INTERVAL_MS:25}"
core:
topic: "${TB_QUEUE_CORE_TOPIC:tb.core}"
poll_interval: "${TB_QUEUE_CORE_POLL_INTERVAL_MS:25}"
poll-interval: "${TB_QUEUE_CORE_POLL_INTERVAL_MS:25}"
partitions: "${TB_QUEUE_CORE_PARTITIONS:10}"
pack_processing_timeout: "${TB_QUEUE_CORE_PACK_PROCESSING_TIMEOUT_MS:60000}"
pack-processing-timeout: "${TB_QUEUE_CORE_PACK_PROCESSING_TIMEOUT_MS:60000}"
stats:
enabled: "${TB_QUEUE_CORE_STATS_ENABLED:false}"
print_interval_ms: "${TB_QUEUE_CORE_STATS_PRINT_INTERVAL_MS:10000}"
print-interval-ms: "${TB_QUEUE_CORE_STATS_PRINT_INTERVAL_MS:10000}"
js:
# JS Eval request topic
request_topic: "${REMOTE_JS_EVAL_REQUEST_TOPIC:js.eval.requests}"
# JS Eval responses topic prefix that is combined with node id
response_topic_prefix: "${REMOTE_JS_EVAL_RESPONSE_TOPIC:js.eval.responses}"
# JS Eval max pending requests
max_pending_requests: "${REMOTE_JS_MAX_PENDING_REQUESTS:10000}"
# JS Eval max request timeout
max_requests_timeout: "${REMOTE_JS_MAX_REQUEST_TIMEOUT:10000}"
# JS response poll interval
response_poll_interval: "${REMOTE_JS_RESPONSE_POLL_INTERVAL_MS:25}"
# JS response auto commit interval
response_auto_commit_interval: "${REMOTE_JS_RESPONSE_AUTO_COMMIT_INTERVAL_MS:100}"
rule-engine:
topic: "${TB_QUEUE_RULE_ENGINE_TOPIC:tb.rule-engine}"
poll-interval: "${TB_QUEUE_RULE_ENGINE_POLL_INTERVAL_MS:25}"
@ -124,32 +137,39 @@ queue:
stats:
enabled: "${TB_QUEUE_RULE_ENGINE_STATS_ENABLED:true}"
print-interval-ms: "${TB_QUEUE_RULE_ENGINE_STATS_PRINT_INTERVAL_MS:10000}"
queues: # TODO 2.5: specify correct ENV variable names.
-
name: "Main"
topic: "${TB_QUEUE_RULE_ENGINE_TOPIC:tb.rule-engine.main}"
poll-interval: "${TB_QUEUE_RULE_ENGINE_POLL_INTERVAL_MS:25}"
partitions: "${TB_QUEUE_RULE_ENGINE_PARTITIONS:10}"
pack-processing-timeout: "${TB_QUEUE_RULE_ENGINE_PACK_PROCESSING_TIMEOUT_MS:60000}"
ack-strategy:
type: "${TB_QUEUE_RULE_ENGINE_STRATEGY_TYPE:RETRY_FAILED_AND_TIMED_OUT}" # SKIP_ALL_FAILURES, RETRY_ALL, RETRY_FAILED, RETRY_TIMED_OUT, RETRY_FAILED_AND_TIMED_OUT
queues:
- name: "Main"
topic: "${TB_QUEUE_RE_MAIN_TOPIC:tb.rule-engine.main}"
poll-interval: "${TB_QUEUE_RE_MAIN_POLL_INTERVAL_MS:25}"
partitions: "${TB_QUEUE_RE_MAIN_PARTITIONS:10}"
pack-processing-timeout: "${TB_QUEUE_RE_MAIN_PACK_PROCESSING_TIMEOUT_MS:60000}"
submit-strategy:
type: "${TB_QUEUE_RE_MAIN_SUBMIT_STRATEGY_TYPE:BURST}" # BURST, BATCH, SEQUENTIAL_WITHIN_ORIGINATOR, SEQUENTIAL_WITHIN_TENANT, SEQUENTIAL
# For BATCH only
batch-size: "${TB_QUEUE_RE_MAIN_SUBMIT_STRATEGY_BATCH_SIZE:1000}" # Maximum number of messages in batch
processing-strategy:
type: "${TB_QUEUE_RE_MAIN_PROCESSING_STRATEGY_TYPE:RETRY_FAILED_AND_TIMED_OUT}" # SKIP_ALL_FAILURES, RETRY_ALL, RETRY_FAILED, RETRY_TIMED_OUT, RETRY_FAILED_AND_TIMED_OUT
# For RETRY_ALL, RETRY_FAILED, RETRY_TIMED_OUT, RETRY_FAILED_AND_TIMED_OUT
retries: "${TB_QUEUE_RULE_ENGINE_STRATEGY_RETRIES:3}" # Number of retries, 0 is unlimited
failure-percentage: "${TB_QUEUE_RULE_ENGINE_STRATEGY_FAILURE_PERCENTAGE:0}" # Skip retry if failures or timeouts are less then X percentage of messages;
pause-between-retries: "${TB_QUEUE_RULE_ENGINE_STRATEGY_RETRY_PAUSE:3}"# Time in seconds to wait in consumer thread before retries;
-
name: "HighPriority"
topic: "${TB_QUEUE_RULE_ENGINE_TOPIC:tb.rule-engine.hp}"
poll-interval: "${TB_QUEUE_RULE_ENGINE_POLL_INTERVAL_MS:25}"
partitions: "${TB_QUEUE_RULE_ENGINE_PARTITIONS:3}"
pack-processing-timeout: "${TB_QUEUE_RULE_ENGINE_PACK_PROCESSING_TIMEOUT_MS:60000}"
ack-strategy:
type: "${TB_QUEUE_RULE_ENGINE_STRATEGY_TYPE:RETRY_FAILED_AND_TIMED_OUT}" # SKIP_ALL_FAILURES, RETRY_ALL, RETRY_FAILED, RETRY_TIMED_OUT, RETRY_FAILED_AND_TIMED_OUT
retries: "${TB_QUEUE_RE_MAIN_PROCESSING_STRATEGY_RETRIES:3}" # Number of retries, 0 is unlimited
failure-percentage: "${TB_QUEUE_RE_MAIN_PROCESSING_STRATEGY_FAILURE_PERCENTAGE:0}" # Skip retry if failures or timeouts are less then X percentage of messages;
pause-between-retries: "${TB_QUEUE_RE_MAIN_PROCESSING_STRATEGY_RETRY_PAUSE:3}"# Time in seconds to wait in consumer thread before retries;
- name: "${TB_QUEUE_RE_HP_QUEUE_NAME:HighPriority}"
topic: "${TB_QUEUE_RE_HP_TOPIC:tb.rule-engine.hp}"
poll-interval: "${TB_QUEUE_RE_HP_POLL_INTERVAL_MS:25}"
partitions: "${TB_QUEUE_RE_HP_PARTITIONS:3}"
pack-processing-timeout: "${TB_QUEUE_RE_HP_PACK_PROCESSING_TIMEOUT_MS:60000}"
submit-strategy:
type: "${TB_QUEUE_RE_HP_SUBMIT_STRATEGY_TYPE:SEQUENTIAL_WITHIN_ORIGINATOR}" # BURST, BATCH, SEQUENTIAL_WITHIN_ORIGINATOR, SEQUENTIAL_WITHIN_TENANT, SEQUENTIAL
# For BATCH only
batch-size: "${TB_QUEUE_RE_HP_SUBMIT_STRATEGY_BATCH_SIZE:100}" # Maximum number of messages in batch
processing-strategy:
type: "${TB_QUEUE_RE_HP_PROCESSING_STRATEGY_TYPE:RETRY_FAILED_AND_TIMED_OUT}" # SKIP_ALL_FAILURES, RETRY_ALL, RETRY_FAILED, RETRY_TIMED_OUT, RETRY_FAILED_AND_TIMED_OUT
# For RETRY_ALL, RETRY_FAILED, RETRY_TIMED_OUT, RETRY_FAILED_AND_TIMED_OUT
retries: "${TB_QUEUE_RULE_ENGINE_STRATEGY_RETRIES:0}" # Number of retries, 0 is unlimited
failure-percentage: "${TB_QUEUE_RULE_ENGINE_STRATEGY_FAILURE_PERCENTAGE:0}" # Skip retry if failures or timeouts are less then X percentage of messages;
pause-between-retries: "${TB_QUEUE_RULE_ENGINE_STRATEGY_RETRY_PAUSE:1}"# Time in seconds to wait in consumer thread before retries;
retries: "${TB_QUEUE_RE_HP_PROCESSING_STRATEGY_RETRIES:0}" # Number of retries, 0 is unlimited
failure-percentage: "${TB_QUEUE_RE_HP_PROCESSING_STRATEGY_FAILURE_PERCENTAGE:0}" # Skip retry if failures or timeouts are less then X percentage of messages;
pause-between-retries: "${TB_QUEUE_RE_HP_PROCESSING_STRATEGY_RETRY_PAUSE:5}"# Time in seconds to wait in consumer thread before retries;
transport:
# For high priority notifications that require minimum latency and processing time
notifications_topic: "${TB_QUEUE_TRANSPORT_NOTIFICATIONS_TOPIC:tb.transport.notifications}"
poll_interval: "${TB_QUEUE_CORE_POLL_INTERVAL_MS:25}"

Loading…
Cancel
Save