diff --git a/application/src/main/java/org/thingsboard/server/actors/app/AppActor.java b/application/src/main/java/org/thingsboard/server/actors/app/AppActor.java index 145ae10d06..7e4773aa8b 100644 --- a/application/src/main/java/org/thingsboard/server/actors/app/AppActor.java +++ b/application/src/main/java/org/thingsboard/server/actors/app/AppActor.java @@ -39,10 +39,13 @@ import org.thingsboard.server.common.msg.aware.TenantAwareMsg; import org.thingsboard.server.common.msg.plugin.ComponentLifecycleMsg; import org.thingsboard.server.common.msg.queue.QueueToRuleEngineMsg; import org.thingsboard.server.common.msg.queue.RuleEngineException; +import org.thingsboard.server.common.msg.queue.ServiceType; import org.thingsboard.server.dao.model.ModelConstants; import org.thingsboard.server.dao.tenant.TenantService; import scala.concurrent.duration.Duration; +import java.util.Optional; + public class AppActor extends ContextAwareActor { private static final TenantId SYSTEM_TENANT = new TenantId(ModelConstants.NULL_UUID); @@ -68,7 +71,7 @@ public class AppActor extends ContextAwareActor { @Override protected boolean process(TbActorMsg msg) { if (!ruleChainsInitialized) { - initRuleChainsAndTenantActors(); + initTenantActors(); ruleChainsInitialized = true; if (msg.getMsgType() != MsgType.APP_INIT_MSG) { log.warn("Rule Chains initialized by unexpected message: {}", msg); @@ -100,15 +103,30 @@ public class AppActor extends ContextAwareActor { return true; } - private void initRuleChainsAndTenantActors() { + private void initTenantActors() { log.info("Starting main system actor."); try { - if (systemContext.isTenantComponentsInitEnabled()) { - PageDataIterable tenantIterator = new PageDataIterable<>(tenantService::findTenants, ENTITY_PACK_LIMIT); - for (Tenant tenant : tenantIterator) { + // This Service may be started for specific tenant only. + Optional isolatedTenantId = systemContext.getServiceInfoProvider().getIsolatedTenant(); + if (isolatedTenantId.isPresent()) { + Tenant tenant = systemContext.getTenantService().findTenantById(isolatedTenantId.get()); + if (tenant != null) { log.debug("[{}] Creating tenant actor", tenant.getId()); getOrCreateTenantActor(tenant.getId()); log.debug("Tenant actor created."); + } else { + log.error("[{}] Tenant with such ID does not exist", isolatedTenantId.get()); + } + } else if (systemContext.isTenantComponentsInitEnabled()) { + PageDataIterable tenantIterator = new PageDataIterable<>(tenantService::findTenants, ENTITY_PACK_LIMIT); + boolean isRuleEngine = systemContext.getServiceInfoProvider().isService(ServiceType.TB_RULE_ENGINE); + boolean isCore = systemContext.getServiceInfoProvider().isService(ServiceType.TB_CORE); + for (Tenant tenant : tenantIterator) { + if (isCore || (isRuleEngine && !tenant.isIsolatedTbRuleEngine())) { + log.debug("[{}] Creating tenant actor", tenant.getId()); + getOrCreateTenantActor(tenant.getId()); + log.debug("[{}] Tenant actor created.", tenant.getId()); + } } } log.info("Main system actor started."); diff --git a/application/src/main/java/org/thingsboard/server/actors/tenant/TenantActor.java b/application/src/main/java/org/thingsboard/server/actors/tenant/TenantActor.java index e54b955419..a1bf343efe 100644 --- a/application/src/main/java/org/thingsboard/server/actors/tenant/TenantActor.java +++ b/application/src/main/java/org/thingsboard/server/actors/tenant/TenantActor.java @@ -22,7 +22,6 @@ import akka.actor.OneForOneStrategy; import akka.actor.Props; import akka.actor.SupervisorStrategy; import akka.actor.Terminated; -import akka.japi.Function; import com.google.common.collect.BiMap; import com.google.common.collect.HashBiMap; import org.thingsboard.server.actors.ActorSystemContext; @@ -31,6 +30,7 @@ import org.thingsboard.server.actors.ruleChain.RuleChainManagerActor; import org.thingsboard.server.actors.service.ContextBasedCreator; import org.thingsboard.server.actors.service.DefaultActorService; import org.thingsboard.server.common.data.EntityType; +import org.thingsboard.server.common.data.Tenant; import org.thingsboard.server.common.data.id.DeviceId; import org.thingsboard.server.common.data.id.RuleChainId; import org.thingsboard.server.common.data.id.TenantId; @@ -47,12 +47,13 @@ import org.thingsboard.server.common.msg.queue.ServiceType; import scala.concurrent.duration.Duration; import java.util.List; +import java.util.Optional; import java.util.stream.Collectors; public class TenantActor extends RuleChainManagerActor { private final BiMap deviceActors; - private boolean isRuleEngine; + private boolean isRuleEngineForCurrentTenant; private boolean isCore; private TenantActor(ActorSystemContext systemContext, TenantId tenantId) { @@ -69,10 +70,19 @@ public class TenantActor extends RuleChainManagerActor { public void preStart() { log.info("[{}] Starting tenant actor.", tenantId); try { - isRuleEngine = systemContext.getServiceInfoProvider().isService(ServiceType.TB_RULE_ENGINE); + Tenant tenant = systemContext.getTenantService().findTenantById(tenantId); + // This Service may be started for specific tenant only. + Optional isolatedTenantId = systemContext.getServiceInfoProvider().getIsolatedTenant(); + + isRuleEngineForCurrentTenant = systemContext.getServiceInfoProvider().isService(ServiceType.TB_RULE_ENGINE); isCore = systemContext.getServiceInfoProvider().isService(ServiceType.TB_CORE); - if (isRuleEngine) { - initRuleChains(); + + if (isRuleEngineForCurrentTenant) { + if (isolatedTenantId.map(id -> id.equals(tenantId)).orElseGet(() -> !tenant.isIsolatedTbRuleEngine())) { + initRuleChains(); + } else { + isRuleEngineForCurrentTenant = false; + } } log.info("[{}] Tenant actor started.", tenantId); } catch (Exception e) { @@ -132,8 +142,9 @@ public class TenantActor extends RuleChainManagerActor { } private void onQueueToRuleEngineMsg(QueueToRuleEngineMsg msg) { - if (!isRuleEngine) { + if (!isRuleEngineForCurrentTenant) { log.warn("RECEIVED INVALID MESSAGE: {}", msg); + return; } TbMsg tbMsg = msg.getTbMsg(); if (tbMsg.getRuleChainId() == null) { @@ -167,7 +178,7 @@ public class TenantActor extends RuleChainManagerActor { } private void onComponentLifecycleMsg(ComponentLifecycleMsg msg) { - if (isRuleEngine) { + if (isRuleEngineForCurrentTenant) { ActorRef target = getEntityActorRef(msg.getEntityId()); if (target != null) { if (msg.getEntityId().getEntityType() == EntityType.RULE_CHAIN) { diff --git a/application/src/main/java/org/thingsboard/server/controller/TenantController.java b/application/src/main/java/org/thingsboard/server/controller/TenantController.java index abd6bf6804..9def943e88 100644 --- a/application/src/main/java/org/thingsboard/server/controller/TenantController.java +++ b/application/src/main/java/org/thingsboard/server/controller/TenantController.java @@ -76,7 +76,6 @@ public class TenantController extends BaseController { accessControlService.checkPermission(getCurrentUser(), Resource.TENANT, operation, tenant.getId(), tenant); - tenant = checkNotNull(tenantService.saveTenant(tenant)); if (newTenant) { installScripts.createDefaultRuleChains(tenant.getId()); @@ -96,7 +95,6 @@ public class TenantController extends BaseController { TenantId tenantId = new TenantId(toUUID(strTenantId)); checkTenantId(tenantId, Operation.DELETE); tenantService.deleteTenant(tenantId); - tbClusterService.onEntityStateChange(tenantId, tenantId, ComponentLifecycleEvent.DELETED); } catch (Exception e) { throw handleException(e); diff --git a/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbClusterService.java b/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbClusterService.java index 5851fd919c..fa6aa9e886 100644 --- a/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbClusterService.java +++ b/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbClusterService.java @@ -54,6 +54,14 @@ public class DefaultTbClusterService implements TbClusterService { @Override public void onToRuleEngineMsg(TenantId tenantId, EntityId entityId, TbMsg tbMsg) { + if (tenantId.isNullUid()) { + if (entityId.getEntityType().equals(EntityType.TENANT)) { + tenantId = new TenantId(entityId.getId()); + } else { + log.warn("[{}][{}] Received invalid message: {}", tenantId, entityId, tbMsg); + return; + } + } TopicPartitionInfo tpi = partitionService.resolve(ServiceType.TB_RULE_ENGINE, tenantId, entityId); ToRuleEngineMsg msg = ToRuleEngineMsg.newBuilder() .setTenantIdMSB(tenantId.getId().getMostSignificantBits()) diff --git a/application/src/main/java/org/thingsboard/server/service/queue/DefaultTenantRoutingInfoService.java b/application/src/main/java/org/thingsboard/server/service/queue/DefaultTenantRoutingInfoService.java new file mode 100644 index 0000000000..aae3cef0ac --- /dev/null +++ b/application/src/main/java/org/thingsboard/server/service/queue/DefaultTenantRoutingInfoService.java @@ -0,0 +1,47 @@ +/** + * Copyright © 2016-2020 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.service.queue; + +import lombok.extern.slf4j.Slf4j; +import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression; +import org.springframework.stereotype.Service; +import org.thingsboard.server.common.data.Tenant; +import org.thingsboard.server.common.data.id.TenantId; +import org.thingsboard.server.dao.tenant.TenantService; +import org.thingsboard.server.queue.discovery.TenantRoutingInfo; +import org.thingsboard.server.queue.discovery.TenantRoutingInfoService; + +@Slf4j +@Service +@ConditionalOnExpression("'${service.type:null}'=='monolith' || '${service.type:null}'=='tb-core' || '${service.type:null}'=='tb-rule-engine'") +public class DefaultTenantRoutingInfoService implements TenantRoutingInfoService { + + private final TenantService tenantService; + + public DefaultTenantRoutingInfoService(TenantService tenantService) { + this.tenantService = tenantService; + } + + @Override + public TenantRoutingInfo getRoutingInfo(TenantId tenantId) { + Tenant tenant = tenantService.findTenantById(tenantId); + if (tenant != null) { + return new TenantRoutingInfo(tenantId, tenant.isIsolatedTbCore(), tenant.isIsolatedTbRuleEngine()); + } else { + throw new RuntimeException("Tenant not found!"); + } + } +} diff --git a/application/src/main/java/org/thingsboard/server/service/subscription/DefaultSubscriptionManagerService.java b/application/src/main/java/org/thingsboard/server/service/subscription/DefaultSubscriptionManagerService.java index 0d151c3f46..e841acae12 100644 --- a/application/src/main/java/org/thingsboard/server/service/subscription/DefaultSubscriptionManagerService.java +++ b/application/src/main/java/org/thingsboard/server/service/subscription/DefaultSubscriptionManagerService.java @@ -173,19 +173,21 @@ public class DefaultSubscriptionManagerService implements SubscriptionManagerSer @Override public void onApplicationEvent(PartitionChangeEvent partitionChangeEvent) { - Set removedPartitions = new HashSet<>(currentPartitions); - removedPartitions.removeAll(partitionChangeEvent.getPartitions()); - - currentPartitions.clear(); - currentPartitions.addAll(partitionChangeEvent.getPartitions()); - - // We no longer manage current partition of devices; - removedPartitions.forEach(partition -> { - Set subs = partitionedSubscriptions.remove(partition); - if (subs != null) { - subs.forEach(this::removeSubscriptionFromEntityMap); - } - }); + if (ServiceType.TB_CORE.equals(partitionChangeEvent.getServiceType())) { + Set removedPartitions = new HashSet<>(currentPartitions); + removedPartitions.removeAll(partitionChangeEvent.getPartitions()); + + currentPartitions.clear(); + currentPartitions.addAll(partitionChangeEvent.getPartitions()); + + // We no longer manage current partition of devices; + removedPartitions.forEach(partition -> { + Set subs = partitionedSubscriptions.remove(partition); + if (subs != null) { + subs.forEach(this::removeSubscriptionFromEntityMap); + } + }); + } } @Override diff --git a/application/src/main/java/org/thingsboard/server/service/transport/DefaultTransportApiService.java b/application/src/main/java/org/thingsboard/server/service/transport/DefaultTransportApiService.java index 4ff647b4d9..7456ca7662 100644 --- a/application/src/main/java/org/thingsboard/server/service/transport/DefaultTransportApiService.java +++ b/application/src/main/java/org/thingsboard/server/service/transport/DefaultTransportApiService.java @@ -24,8 +24,8 @@ import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import org.springframework.util.StringUtils; -import org.thingsboard.server.queue.common.TbProtoQueueMsg; import org.thingsboard.server.common.data.Device; +import org.thingsboard.server.common.data.Tenant; import org.thingsboard.server.common.data.id.DeviceId; import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.relation.EntityRelation; @@ -34,14 +34,18 @@ import org.thingsboard.server.common.data.security.DeviceCredentialsType; import org.thingsboard.server.dao.device.DeviceCredentialsService; import org.thingsboard.server.dao.device.DeviceService; import org.thingsboard.server.dao.relation.RelationService; +import org.thingsboard.server.dao.tenant.TenantService; import org.thingsboard.server.gen.transport.TransportProtos.DeviceInfoProto; import org.thingsboard.server.gen.transport.TransportProtos.GetOrCreateDeviceFromGatewayRequestMsg; import org.thingsboard.server.gen.transport.TransportProtos.GetOrCreateDeviceFromGatewayResponseMsg; +import org.thingsboard.server.gen.transport.TransportProtos.GetTenantRoutingInfoRequestMsg; +import org.thingsboard.server.gen.transport.TransportProtos.GetTenantRoutingInfoResponseMsg; import org.thingsboard.server.gen.transport.TransportProtos.TransportApiRequestMsg; import org.thingsboard.server.gen.transport.TransportProtos.TransportApiResponseMsg; import org.thingsboard.server.gen.transport.TransportProtos.ValidateDeviceCredentialsResponseMsg; import org.thingsboard.server.gen.transport.TransportProtos.ValidateDeviceTokenRequestMsg; import org.thingsboard.server.gen.transport.TransportProtos.ValidateDeviceX509CertRequestMsg; +import org.thingsboard.server.queue.common.TbProtoQueueMsg; import org.thingsboard.server.queue.util.TbCoreComponent; import org.thingsboard.server.service.executors.DbCallbackExecutorService; import org.thingsboard.server.service.state.DeviceStateService; @@ -59,6 +63,10 @@ public class DefaultTransportApiService implements TransportApiService { private static final ObjectMapper mapper = new ObjectMapper(); + //TODO: Constructor dependencies; + @Autowired + private TenantService tenantService; + @Autowired private DeviceService deviceService; @@ -87,6 +95,8 @@ public class DefaultTransportApiService implements TransportApiService { return Futures.transform(validateCredentials(msg.getHash(), DeviceCredentialsType.X509_CERTIFICATE), value -> new TbProtoQueueMsg<>(tbProtoQueueMsg.getKey(), value, tbProtoQueueMsg.getHeaders()), MoreExecutors.directExecutor()); } else if (transportApiRequestMsg.hasGetOrCreateDeviceRequestMsg()) { return Futures.transform(handle(transportApiRequestMsg.getGetOrCreateDeviceRequestMsg()), value -> new TbProtoQueueMsg<>(tbProtoQueueMsg.getKey(), value, tbProtoQueueMsg.getHeaders()), MoreExecutors.directExecutor()); + } else if (transportApiRequestMsg.hasGetTenantRoutingInfoRequestMsg()) { + return Futures.transform(handle(transportApiRequestMsg.getGetTenantRoutingInfoRequestMsg()), value -> new TbProtoQueueMsg<>(tbProtoQueueMsg.getKey(), value, tbProtoQueueMsg.getHeaders()), MoreExecutors.directExecutor()); } return Futures.transform(getEmptyTransportApiResponseFuture(), value -> new TbProtoQueueMsg<>(tbProtoQueueMsg.getKey(), value, tbProtoQueueMsg.getHeaders()), MoreExecutors.directExecutor()); } @@ -129,6 +139,13 @@ public class DefaultTransportApiService implements TransportApiService { }, dbCallbackExecutorService); } + private ListenableFuture handle(GetTenantRoutingInfoRequestMsg requestMsg) { + TenantId tenantId = new TenantId(new UUID(requestMsg.getTenantIdMSB(), requestMsg.getTenantIdLSB())); + ListenableFuture tenantFuture = tenantService.findTenantByIdAsync(TenantId.SYS_TENANT_ID, tenantId); + return Futures.transform(tenantFuture, tenant -> TransportApiResponseMsg.newBuilder() + .setGetTenantRoutingInfoResponseMsg(GetTenantRoutingInfoResponseMsg.newBuilder().setIsolatedTbCore(tenant.isIsolatedTbCore()) + .setIsolatedTbRuleEngine(tenant.isIsolatedTbRuleEngine()).build()).build(), dbCallbackExecutorService); + } private ListenableFuture getDeviceInfo(DeviceId deviceId, DeviceCredentials credentials) { return Futures.transform(deviceService.findDeviceByIdAsync(TenantId.SYS_TENANT_ID, deviceId), device -> { diff --git a/application/src/main/resources/logback.xml b/application/src/main/resources/logback.xml index 8389fa5d71..08a976592d 100644 --- a/application/src/main/resources/logback.xml +++ b/application/src/main/resources/logback.xml @@ -27,6 +27,8 @@ + + diff --git a/application/src/main/resources/thingsboard.yml b/application/src/main/resources/thingsboard.yml index affd7e7908..a2a06b57d0 100644 --- a/application/src/main/resources/thingsboard.yml +++ b/application/src/main/resources/thingsboard.yml @@ -437,18 +437,6 @@ js: print_interval_ms: "${TB_JS_LOCAL_STATS_PRINT_INTERVAL_MS:10000}" # Remote JavaScript environment properties remote: - # JS Eval request topic - request_topic: "${REMOTE_JS_EVAL_REQUEST_TOPIC:js.eval.requests}" - # JS Eval responses topic prefix that is combined with node id - response_topic_prefix: "${REMOTE_JS_EVAL_RESPONSE_TOPIC:js.eval.responses}" - # JS Eval max pending requests - max_pending_requests: "${REMOTE_JS_MAX_PENDING_REQUESTS:10000}" - # JS Eval max request timeout - max_requests_timeout: "${REMOTE_JS_MAX_REQUEST_TIMEOUT:10000}" - # JS response poll interval - response_poll_interval: "${REMOTE_JS_RESPONSE_POLL_INTERVAL_MS:25}" - # JS response auto commit interval - response_auto_commit_interval: "${REMOTE_JS_RESPONSE_AUTO_COMMIT_INTERVAL_MS:100}" # Maximum allowed JavaScript execution errors before JavaScript will be blacklisted max_errors: "${REMOTE_JS_SANDBOX_MAX_ERRORS:3}" # Maximum time in seconds for black listed function to stay in the list. @@ -579,6 +567,19 @@ queue: stats: enabled: "${TB_QUEUE_CORE_STATS_ENABLED:false}" print-interval-ms: "${TB_QUEUE_CORE_STATS_PRINT_INTERVAL_MS:10000}" + js: + # JS Eval request topic + request_topic: "${REMOTE_JS_EVAL_REQUEST_TOPIC:js.eval.requests}" + # JS Eval responses topic prefix that is combined with node id + response_topic_prefix: "${REMOTE_JS_EVAL_RESPONSE_TOPIC:js.eval.responses}" + # JS Eval max pending requests + max_pending_requests: "${REMOTE_JS_MAX_PENDING_REQUESTS:10000}" + # JS Eval max request timeout + max_requests_timeout: "${REMOTE_JS_MAX_REQUEST_TIMEOUT:10000}" + # JS response poll interval + response_poll_interval: "${REMOTE_JS_RESPONSE_POLL_INTERVAL_MS:25}" + # JS response auto commit interval + response_auto_commit_interval: "${REMOTE_JS_RESPONSE_AUTO_COMMIT_INTERVAL_MS:100}" rule-engine: topic: "${TB_QUEUE_RULE_ENGINE_TOPIC:tb.rule-engine}" poll-interval: "${TB_QUEUE_RULE_ENGINE_POLL_INTERVAL_MS:25}" @@ -586,7 +587,7 @@ queue: stats: enabled: "${TB_QUEUE_RULE_ENGINE_STATS_ENABLED:true}" print-interval-ms: "${TB_QUEUE_RULE_ENGINE_STATS_PRINT_INTERVAL_MS:10000}" - queues: # TODO 2.5: specify correct ENV variable names. + queues: - name: "Main" topic: "${TB_QUEUE_RE_MAIN_TOPIC:tb.rule-engine.main}" poll-interval: "${TB_QUEUE_RE_MAIN_POLL_INTERVAL_MS:25}" diff --git a/application/src/test/java/org/thingsboard/server/service/cluster/routing/ConsistentHashParitionServiceTest.java b/application/src/test/java/org/thingsboard/server/service/cluster/routing/ConsistentHashParitionServiceTest.java index 2634d5af99..3f7619ed1e 100644 --- a/application/src/test/java/org/thingsboard/server/service/cluster/routing/ConsistentHashParitionServiceTest.java +++ b/application/src/test/java/org/thingsboard/server/service/cluster/routing/ConsistentHashParitionServiceTest.java @@ -30,6 +30,7 @@ import org.thingsboard.server.common.msg.queue.ServiceType; import org.thingsboard.server.queue.discovery.TbServiceInfoProvider; import org.thingsboard.server.common.msg.queue.TopicPartitionInfo; import org.thingsboard.server.gen.transport.TransportProtos; +import org.thingsboard.server.queue.discovery.TenantRoutingInfoService; import java.util.ArrayList; import java.util.Collections; @@ -49,6 +50,7 @@ public class ConsistentHashParitionServiceTest { private ConsistentHashPartitionService clusterRoutingService; private TbServiceInfoProvider discoveryService; + private TenantRoutingInfoService routingInfoService; private ApplicationEventPublisher applicationEventPublisher; private String hashFunctionName = "murmur3_128"; @@ -59,7 +61,8 @@ public class ConsistentHashParitionServiceTest { public void setup() throws Exception { discoveryService = mock(TbServiceInfoProvider.class); applicationEventPublisher = mock(ApplicationEventPublisher.class); - clusterRoutingService = new ConsistentHashPartitionService(discoveryService, applicationEventPublisher); + routingInfoService = mock(TenantRoutingInfoService.class); + clusterRoutingService = new ConsistentHashPartitionService(discoveryService, routingInfoService, applicationEventPublisher); ReflectionTestUtils.setField(clusterRoutingService, "coreTopic", "tb.core"); ReflectionTestUtils.setField(clusterRoutingService, "corePartitions", 3); ReflectionTestUtils.setField(clusterRoutingService, "ruleEngineTopic", "tb.rule-engine"); diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/discovery/ConsistentHashPartitionService.java b/common/queue/src/main/java/org/thingsboard/server/queue/discovery/ConsistentHashPartitionService.java index 0552aa05c6..dfd1437b5f 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/discovery/ConsistentHashPartitionService.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/discovery/ConsistentHashPartitionService.java @@ -60,11 +60,12 @@ public class ConsistentHashPartitionService implements PartitionService { private final ApplicationEventPublisher applicationEventPublisher; private final TbServiceInfoProvider serviceInfoProvider; + private final TenantRoutingInfoService tenantRoutingInfoService; private final ConcurrentMap partitionTopics = new ConcurrentHashMap<>(); private final ConcurrentMap partitionSizes = new ConcurrentHashMap<>(); + private final ConcurrentMap tenantRoutingInfoMap = new ConcurrentHashMap<>(); + private ConcurrentMap> myPartitions = new ConcurrentHashMap<>(); - //TODO: Fetch this from the database, together with size of partitions for each service for each tenant. - private ConcurrentMap> isolatedTenants = new ConcurrentHashMap<>(); private ConcurrentMap tpiCache = new ConcurrentHashMap<>(); private Map tbCoreNotificationTopics = new HashMap<>(); @@ -73,8 +74,9 @@ public class ConsistentHashPartitionService implements PartitionService { private HashFunction hashFunction; - public ConsistentHashPartitionService(TbServiceInfoProvider serviceInfoProvider, ApplicationEventPublisher applicationEventPublisher) { + public ConsistentHashPartitionService(TbServiceInfoProvider serviceInfoProvider, TenantRoutingInfoService tenantRoutingInfoService, ApplicationEventPublisher applicationEventPublisher) { this.serviceInfoProvider = serviceInfoProvider; + this.tenantRoutingInfoService = tenantRoutingInfoService; this.applicationEventPublisher = applicationEventPublisher; } @@ -122,10 +124,10 @@ public class ConsistentHashPartitionService implements PartitionService { addNode(circles, other); } ConcurrentMap> oldPartitions = myPartitions; - TenantId myTenantId = getSystemOrIsolatedTenantId(currentService); + TenantId myIsolatedOrSystemTenantId = getSystemOrIsolatedTenantId(currentService); myPartitions = new ConcurrentHashMap<>(); partitionSizes.forEach((type, size) -> { - ServiceQueueKey myServiceQueueKey = new ServiceQueueKey(type, myTenantId); + ServiceQueueKey myServiceQueueKey = new ServiceQueueKey(type, myIsolatedOrSystemTenantId); for (int i = 0; i < size; i++) { ServiceInfo serviceInfo = resolveByPartitionIdx(circles.get(myServiceQueueKey), i); if (currentService.equals(serviceInfo)) { @@ -247,7 +249,30 @@ public class ConsistentHashPartitionService implements PartitionService { } private boolean isIsolated(ServiceQueue serviceQueue, TenantId tenantId) { - return isolatedTenants.get(tenantId) != null && isolatedTenants.get(tenantId).contains(serviceQueue.getType()); + if (TenantId.SYS_TENANT_ID.equals(tenantId)) { + return false; + } + TenantRoutingInfo routingInfo = tenantRoutingInfoMap.get(tenantId); + if (routingInfo == null) { + synchronized (tenantRoutingInfoMap) { + routingInfo = tenantRoutingInfoMap.get(tenantId); + if (routingInfo == null) { + routingInfo = tenantRoutingInfoService.getRoutingInfo(tenantId); + tenantRoutingInfoMap.put(tenantId, routingInfo); + } + } + } + if (routingInfo == null) { + throw new RuntimeException("Tenant not found!"); + } + switch (serviceQueue.getType()) { + case TB_CORE: + return routingInfo.isIsolatedTbCore(); + case TB_RULE_ENGINE: + return routingInfo.isIsolatedTbRuleEngine(); + default: + return false; + } } private void logServiceInfo(TransportProtos.ServiceInfo server) { @@ -265,12 +290,6 @@ public class ConsistentHashPartitionService implements PartitionService { private void addNode(Map> circles, ServiceInfo instance) { TenantId tenantId = getSystemOrIsolatedTenantId(instance); - if (!tenantId.isNullUid()) { - isolatedTenants.putIfAbsent(tenantId, new HashSet<>()); - for (String serviceType : instance.getServiceTypesList()) { - isolatedTenants.get(tenantId).add(ServiceType.valueOf(serviceType.toUpperCase())); - } - } for (String serviceTypeStr : instance.getServiceTypesList()) { ServiceType serviceType = ServiceType.valueOf(serviceTypeStr.toUpperCase()); if (ServiceType.TB_RULE_ENGINE.equals(serviceType)) { diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/discovery/TenantRoutingInfo.java b/common/queue/src/main/java/org/thingsboard/server/queue/discovery/TenantRoutingInfo.java new file mode 100644 index 0000000000..07c4c2d9c5 --- /dev/null +++ b/common/queue/src/main/java/org/thingsboard/server/queue/discovery/TenantRoutingInfo.java @@ -0,0 +1,26 @@ +/** + * Copyright © 2016-2020 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.queue.discovery; + +import lombok.Data; +import org.thingsboard.server.common.data.id.TenantId; + +@Data +public class TenantRoutingInfo { + private final TenantId tenantId; + private final boolean isolatedTbCore; + private final boolean isolatedTbRuleEngine; +} diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/discovery/TenantRoutingInfoService.java b/common/queue/src/main/java/org/thingsboard/server/queue/discovery/TenantRoutingInfoService.java new file mode 100644 index 0000000000..685a36df30 --- /dev/null +++ b/common/queue/src/main/java/org/thingsboard/server/queue/discovery/TenantRoutingInfoService.java @@ -0,0 +1,23 @@ +/** + * Copyright © 2016-2020 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.queue.discovery; + +import org.thingsboard.server.common.data.id.TenantId; + +public interface TenantRoutingInfoService { + + TenantRoutingInfo getRoutingInfo(TenantId tenantId); +} diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/settings/TbQueueRemoteJsInvokeSettings.java b/common/queue/src/main/java/org/thingsboard/server/queue/settings/TbQueueRemoteJsInvokeSettings.java index cdad407b19..cd492e87f1 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/settings/TbQueueRemoteJsInvokeSettings.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/settings/TbQueueRemoteJsInvokeSettings.java @@ -22,21 +22,21 @@ import org.springframework.stereotype.Component; @Data @Component public class TbQueueRemoteJsInvokeSettings { - @Value("${js.remote.request_topic}") + @Value("${queue.js.request_topic}") private String requestTopic; - @Value("${js.remote.response_topic_prefix}") + @Value("${queue.js.response_topic_prefix}") private String responseTopic; - @Value("${js.remote.max_pending_requests}") + @Value("${queue.js.max_pending_requests}") private long maxPendingRequests; - @Value("${js.remote.response_poll_interval}") + @Value("${queue.js.response_poll_interval}") private int responsePollInterval; - @Value("${js.remote.response_auto_commit_interval}") + @Value("${queue.js.response_auto_commit_interval}") private int autoCommitInterval; - @Value("${js.remote.max_requests_timeout}") + @Value("${queue.js.max_requests_timeout}") private long maxRequestsTimeout; } diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/util/TbCoreComponent.java b/common/queue/src/main/java/org/thingsboard/server/queue/util/TbCoreComponent.java index c42b9b9f7d..ca2316e174 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/util/TbCoreComponent.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/util/TbCoreComponent.java @@ -17,6 +17,10 @@ package org.thingsboard.server.queue.util; import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; + +@Retention(RetentionPolicy.RUNTIME) @ConditionalOnExpression("'${service.type:null}'=='monolith' || '${service.type:null}'=='tb-core'") public @interface TbCoreComponent { } diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/util/TbRuleEngineComponent.java b/common/queue/src/main/java/org/thingsboard/server/queue/util/TbRuleEngineComponent.java index 855bebb19e..3b99c94dea 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/util/TbRuleEngineComponent.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/util/TbRuleEngineComponent.java @@ -17,6 +17,10 @@ package org.thingsboard.server.queue.util; import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; + +@Retention(RetentionPolicy.RUNTIME) @ConditionalOnExpression("'${service.type:null}'=='monolith' || '${service.type:null}'=='tb-rule-engine'") public @interface TbRuleEngineComponent { } diff --git a/common/queue/src/main/proto/queue.proto b/common/queue/src/main/proto/queue.proto index e36c4e36a9..3db8495d45 100644 --- a/common/queue/src/main/proto/queue.proto +++ b/common/queue/src/main/proto/queue.proto @@ -158,6 +158,16 @@ message GetOrCreateDeviceFromGatewayResponseMsg { DeviceInfoProto deviceInfo = 1; } +message GetTenantRoutingInfoRequestMsg { + int64 tenantIdMSB = 1; + int64 tenantIdLSB = 2; +} + +message GetTenantRoutingInfoResponseMsg { + bool isolatedTbCore = 1; + bool isolatedTbRuleEngine = 2; +} + message SessionCloseNotificationProto { string message = 1; } @@ -359,12 +369,14 @@ message TransportApiRequestMsg { ValidateDeviceTokenRequestMsg validateTokenRequestMsg = 1; ValidateDeviceX509CertRequestMsg validateX509CertRequestMsg = 2; GetOrCreateDeviceFromGatewayRequestMsg getOrCreateDeviceRequestMsg = 3; + GetTenantRoutingInfoRequestMsg getTenantRoutingInfoRequestMsg = 4; } /* Response from ThingsBoard Core Service to Transport Service */ message TransportApiResponseMsg { ValidateDeviceCredentialsResponseMsg validateTokenResponseMsg = 1; GetOrCreateDeviceFromGatewayResponseMsg getOrCreateDeviceResponseMsg = 2; + GetTenantRoutingInfoResponseMsg getTenantRoutingInfoResponseMsg = 4; } /* Messages that are handled by ThingsBoard Core Service */ diff --git a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/TransportService.java b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/TransportService.java index b0772de945..2af55c9206 100644 --- a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/TransportService.java +++ b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/TransportService.java @@ -15,15 +15,19 @@ */ package org.thingsboard.server.common.transport; -import org.thingsboard.server.gen.transport.TransportProtos; import org.thingsboard.server.gen.transport.TransportProtos.ClaimDeviceMsg; import org.thingsboard.server.gen.transport.TransportProtos.GetAttributeRequestMsg; +import org.thingsboard.server.gen.transport.TransportProtos.GetOrCreateDeviceFromGatewayRequestMsg; +import org.thingsboard.server.gen.transport.TransportProtos.GetOrCreateDeviceFromGatewayResponseMsg; +import org.thingsboard.server.gen.transport.TransportProtos.GetTenantRoutingInfoRequestMsg; +import org.thingsboard.server.gen.transport.TransportProtos.GetTenantRoutingInfoResponseMsg; import org.thingsboard.server.gen.transport.TransportProtos.PostAttributeMsg; import org.thingsboard.server.gen.transport.TransportProtos.PostTelemetryMsg; import org.thingsboard.server.gen.transport.TransportProtos.SessionEventMsg; import org.thingsboard.server.gen.transport.TransportProtos.SessionInfoProto; import org.thingsboard.server.gen.transport.TransportProtos.SubscribeToAttributeUpdatesMsg; import org.thingsboard.server.gen.transport.TransportProtos.SubscribeToRPCMsg; +import org.thingsboard.server.gen.transport.TransportProtos.SubscriptionInfoProto; import org.thingsboard.server.gen.transport.TransportProtos.ToDeviceRpcResponseMsg; import org.thingsboard.server.gen.transport.TransportProtos.ToServerRpcRequestMsg; import org.thingsboard.server.gen.transport.TransportProtos.ValidateDeviceCredentialsResponseMsg; @@ -35,14 +39,16 @@ import org.thingsboard.server.gen.transport.TransportProtos.ValidateDeviceX509Ce */ public interface TransportService { + GetTenantRoutingInfoResponseMsg getRoutingInfo(GetTenantRoutingInfoRequestMsg msg); + void process(ValidateDeviceTokenRequestMsg msg, TransportServiceCallback callback); void process(ValidateDeviceX509CertRequestMsg msg, TransportServiceCallback callback); - void process(TransportProtos.GetOrCreateDeviceFromGatewayRequestMsg msg, - TransportServiceCallback callback); + void process(GetOrCreateDeviceFromGatewayRequestMsg msg, + TransportServiceCallback callback); boolean checkLimits(SessionInfoProto sessionInfo, Object msg, TransportServiceCallback callback); @@ -62,7 +68,7 @@ public interface TransportService { void process(SessionInfoProto sessionInfo, ToServerRpcRequestMsg msg, TransportServiceCallback callback); - void process(TransportProtos.SessionInfoProto sessionInfo, TransportProtos.SubscriptionInfoProto msg, TransportServiceCallback callback); + void process(SessionInfoProto sessionInfo, SubscriptionInfoProto msg, TransportServiceCallback callback); void process(SessionInfoProto sessionInfo, ClaimDeviceMsg msg, TransportServiceCallback callback); diff --git a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/DefaultTransportService.java b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/DefaultTransportService.java index d107bbbeeb..7b2f157cb4 100644 --- a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/DefaultTransportService.java +++ b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/DefaultTransportService.java @@ -18,6 +18,7 @@ package org.thingsboard.server.common.transport.service; import com.google.gson.Gson; import com.google.gson.JsonObject; import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression; import org.springframework.stereotype.Service; @@ -43,6 +44,8 @@ import org.thingsboard.server.common.transport.TransportServiceCallback; import org.thingsboard.server.queue.discovery.PartitionService; import org.thingsboard.server.common.msg.queue.ServiceType; import org.thingsboard.server.common.msg.queue.TopicPartitionInfo; +import org.thingsboard.server.queue.discovery.TenantRoutingInfo; +import org.thingsboard.server.queue.discovery.TenantRoutingInfoService; import org.thingsboard.server.queue.provider.TbQueueProducerProvider; import org.thingsboard.server.queue.provider.TbTransportQueueFactory; import org.thingsboard.server.gen.transport.TransportProtos; @@ -61,6 +64,7 @@ import java.util.Random; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; @@ -194,6 +198,34 @@ public class DefaultTransportService implements TransportService { sessions.putIfAbsent(toId(sessionInfo), new SessionMetaData(sessionInfo, TransportProtos.SessionType.ASYNC, listener)); } + @Override + public TransportProtos.GetTenantRoutingInfoResponseMsg getRoutingInfo(TransportProtos.GetTenantRoutingInfoRequestMsg msg) { + TbProtoQueueMsg protoMsg = + new TbProtoQueueMsg<>(UUID.randomUUID(), TransportProtos.TransportApiRequestMsg.newBuilder().setGetTenantRoutingInfoRequestMsg(msg).build()); + try { + TbProtoQueueMsg response = transportApiRequestTemplate.send(protoMsg).get(); + return response.getValue().getGetTenantRoutingInfoResponseMsg(); + } catch (InterruptedException | ExecutionException e) { + throw new RuntimeException(e); + } + } + + public TenantRoutingInfo getRoutingInfo(TenantId tenantId) { + + TransportProtos.GetTenantRoutingInfoRequestMsg msg = TransportProtos.GetTenantRoutingInfoRequestMsg.newBuilder() + .setTenantIdMSB(tenantId.getId().getMostSignificantBits()) + .setTenantIdLSB(tenantId.getId().getLeastSignificantBits()) + .build(); + TbProtoQueueMsg protoMsg = new TbProtoQueueMsg<>(UUID.randomUUID(), TransportApiRequestMsg.newBuilder().setGetTenantRoutingInfoRequestMsg(msg).build()); + try { + TbProtoQueueMsg response = transportApiRequestTemplate.send(protoMsg).get(); + TransportProtos.GetTenantRoutingInfoResponseMsg routingInfo = response.getValue().getGetTenantRoutingInfoResponseMsg(); + return new TenantRoutingInfo(tenantId, routingInfo.getIsolatedTbCore(), routingInfo.getIsolatedTbRuleEngine()); + } catch (InterruptedException | ExecutionException e) { + throw new RuntimeException(e); + } + } + @Override public void process(TransportProtos.ValidateDeviceTokenRequestMsg msg, TransportServiceCallback callback) { log.trace("Processing msg: {}", msg); diff --git a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/TransportTenantRoutingInfoService.java b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/TransportTenantRoutingInfoService.java new file mode 100644 index 0000000000..f2534a64c9 --- /dev/null +++ b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/TransportTenantRoutingInfoService.java @@ -0,0 +1,52 @@ +/** + * Copyright © 2016-2020 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.common.transport.service; + +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression; +import org.springframework.context.annotation.Lazy; +import org.springframework.stereotype.Service; +import org.thingsboard.server.common.data.id.TenantId; +import org.thingsboard.server.common.transport.TransportService; +import org.thingsboard.server.gen.transport.TransportProtos.GetTenantRoutingInfoRequestMsg; +import org.thingsboard.server.gen.transport.TransportProtos.GetTenantRoutingInfoResponseMsg; +import org.thingsboard.server.queue.discovery.TenantRoutingInfo; +import org.thingsboard.server.queue.discovery.TenantRoutingInfoService; + +@Slf4j +@Service +@ConditionalOnExpression("'${service.type:null}'=='tb-transport'") +public class TransportTenantRoutingInfoService implements TenantRoutingInfoService { + + private TransportService transportService; + + @Lazy + @Autowired + public void setTransportService(TransportService transportService) { + this.transportService = transportService; + } + + @Override + public TenantRoutingInfo getRoutingInfo(TenantId tenantId) { + GetTenantRoutingInfoRequestMsg msg = GetTenantRoutingInfoRequestMsg.newBuilder() + .setTenantIdMSB(tenantId.getId().getMostSignificantBits()) + .setTenantIdLSB(tenantId.getId().getLeastSignificantBits()) + .build(); + GetTenantRoutingInfoResponseMsg routingInfo = transportService.getRoutingInfo(msg); + return new TenantRoutingInfo(tenantId, routingInfo.getIsolatedTbCore(), routingInfo.getIsolatedTbRuleEngine()); + } +} diff --git a/dao/src/main/java/org/thingsboard/server/dao/tenant/TenantServiceImpl.java b/dao/src/main/java/org/thingsboard/server/dao/tenant/TenantServiceImpl.java index 9453fdb4c5..eb916cdb83 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/tenant/TenantServiceImpl.java +++ b/dao/src/main/java/org/thingsboard/server/dao/tenant/TenantServiceImpl.java @@ -21,6 +21,7 @@ import org.apache.commons.lang3.StringUtils; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import org.thingsboard.server.common.data.Tenant; +import org.thingsboard.server.common.data.asset.Asset; import org.thingsboard.server.common.data.id.EntityId; import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.page.TextPageData; @@ -126,7 +127,7 @@ public class TenantServiceImpl extends AbstractEntityService implements TenantSe @Override public void deleteTenants() { log.trace("Executing deleteTenants"); - tenantsRemover.removeEntities(new TenantId(EntityId.NULL_UUID),DEFAULT_TENANT_REGION); + tenantsRemover.removeEntities(new TenantId(EntityId.NULL_UUID), DEFAULT_TENANT_REGION); } private DataValidator tenantValidator = @@ -140,19 +141,31 @@ public class TenantServiceImpl extends AbstractEntityService implements TenantSe validateEmail(tenant.getEmail()); } } - }; + + @Override + protected void validateUpdate(TenantId tenantId, Tenant tenant) { + Tenant old = tenantDao.findById(TenantId.SYS_TENANT_ID, tenantId.getId()); + if (old == null) { + throw new DataValidationException("Can't update non existing tenant!"); + } else if (old.isIsolatedTbRuleEngine() != tenant.isIsolatedTbRuleEngine()) { + throw new DataValidationException("Can't update isolatedTbRuleEngine property!"); + } else if (old.isIsolatedTbCore() != tenant.isIsolatedTbCore()) { + throw new DataValidationException("Can't update isolatedTbCore property!"); + } + } + }; private PaginatedRemover tenantsRemover = new PaginatedRemover() { - @Override - protected List findEntities(TenantId tenantId, String region, TextPageLink pageLink) { - return tenantDao.findTenantsByRegion(tenantId, region, pageLink); - } + @Override + protected List findEntities(TenantId tenantId, String region, TextPageLink pageLink) { + return tenantDao.findTenantsByRegion(tenantId, region, pageLink); + } - @Override - protected void removeEntity(TenantId tenantId, Tenant entity) { - deleteTenant(new TenantId(entity.getUuidId())); - } - }; + @Override + protected void removeEntity(TenantId tenantId, Tenant entity) { + deleteTenant(new TenantId(entity.getUuidId())); + } + }; } diff --git a/transport/mqtt/src/main/resources/tb-mqtt-transport.yml b/transport/mqtt/src/main/resources/tb-mqtt-transport.yml index 395f1d09e0..c421c43e33 100644 --- a/transport/mqtt/src/main/resources/tb-mqtt-transport.yml +++ b/transport/mqtt/src/main/resources/tb-mqtt-transport.yml @@ -63,7 +63,7 @@ transport: max_string_value_length: "${JSON_MAX_STRING_VALUE_LENGTH:0}" queue: - type: "${TB_QUEUE_TYPE:kafka}" # kafka or aws-sqs or pubsub or service-bus + type: "${TB_QUEUE_TYPE:in-memory}" # kafka or in-memory or aws-sqs or pubsub or service-bus kafka: bootstrap.servers: "${TB_KAFKA_SERVERS:localhost:9092}" acks: "${TB_KAFKA_ACKS:all}" @@ -111,12 +111,25 @@ queue: response_poll_interval: "${TB_QUEUE_TRANSPORT_RESPONSE_POLL_INTERVAL_MS:25}" core: topic: "${TB_QUEUE_CORE_TOPIC:tb.core}" - poll_interval: "${TB_QUEUE_CORE_POLL_INTERVAL_MS:25}" + poll-interval: "${TB_QUEUE_CORE_POLL_INTERVAL_MS:25}" partitions: "${TB_QUEUE_CORE_PARTITIONS:10}" - pack_processing_timeout: "${TB_QUEUE_CORE_PACK_PROCESSING_TIMEOUT_MS:60000}" + pack-processing-timeout: "${TB_QUEUE_CORE_PACK_PROCESSING_TIMEOUT_MS:60000}" stats: enabled: "${TB_QUEUE_CORE_STATS_ENABLED:false}" - print_interval_ms: "${TB_QUEUE_CORE_STATS_PRINT_INTERVAL_MS:10000}" + print-interval-ms: "${TB_QUEUE_CORE_STATS_PRINT_INTERVAL_MS:10000}" + js: + # JS Eval request topic + request_topic: "${REMOTE_JS_EVAL_REQUEST_TOPIC:js.eval.requests}" + # JS Eval responses topic prefix that is combined with node id + response_topic_prefix: "${REMOTE_JS_EVAL_RESPONSE_TOPIC:js.eval.responses}" + # JS Eval max pending requests + max_pending_requests: "${REMOTE_JS_MAX_PENDING_REQUESTS:10000}" + # JS Eval max request timeout + max_requests_timeout: "${REMOTE_JS_MAX_REQUEST_TIMEOUT:10000}" + # JS response poll interval + response_poll_interval: "${REMOTE_JS_RESPONSE_POLL_INTERVAL_MS:25}" + # JS response auto commit interval + response_auto_commit_interval: "${REMOTE_JS_RESPONSE_AUTO_COMMIT_INTERVAL_MS:100}" rule-engine: topic: "${TB_QUEUE_RULE_ENGINE_TOPIC:tb.rule-engine}" poll-interval: "${TB_QUEUE_RULE_ENGINE_POLL_INTERVAL_MS:25}" @@ -124,32 +137,39 @@ queue: stats: enabled: "${TB_QUEUE_RULE_ENGINE_STATS_ENABLED:true}" print-interval-ms: "${TB_QUEUE_RULE_ENGINE_STATS_PRINT_INTERVAL_MS:10000}" - queues: # TODO 2.5: specify correct ENV variable names. - - - name: "Main" - topic: "${TB_QUEUE_RULE_ENGINE_TOPIC:tb.rule-engine.main}" - poll-interval: "${TB_QUEUE_RULE_ENGINE_POLL_INTERVAL_MS:25}" - partitions: "${TB_QUEUE_RULE_ENGINE_PARTITIONS:10}" - pack-processing-timeout: "${TB_QUEUE_RULE_ENGINE_PACK_PROCESSING_TIMEOUT_MS:60000}" - ack-strategy: - type: "${TB_QUEUE_RULE_ENGINE_STRATEGY_TYPE:RETRY_FAILED_AND_TIMED_OUT}" # SKIP_ALL_FAILURES, RETRY_ALL, RETRY_FAILED, RETRY_TIMED_OUT, RETRY_FAILED_AND_TIMED_OUT + queues: + - name: "Main" + topic: "${TB_QUEUE_RE_MAIN_TOPIC:tb.rule-engine.main}" + poll-interval: "${TB_QUEUE_RE_MAIN_POLL_INTERVAL_MS:25}" + partitions: "${TB_QUEUE_RE_MAIN_PARTITIONS:10}" + pack-processing-timeout: "${TB_QUEUE_RE_MAIN_PACK_PROCESSING_TIMEOUT_MS:60000}" + submit-strategy: + type: "${TB_QUEUE_RE_MAIN_SUBMIT_STRATEGY_TYPE:BURST}" # BURST, BATCH, SEQUENTIAL_WITHIN_ORIGINATOR, SEQUENTIAL_WITHIN_TENANT, SEQUENTIAL + # For BATCH only + batch-size: "${TB_QUEUE_RE_MAIN_SUBMIT_STRATEGY_BATCH_SIZE:1000}" # Maximum number of messages in batch + processing-strategy: + type: "${TB_QUEUE_RE_MAIN_PROCESSING_STRATEGY_TYPE:RETRY_FAILED_AND_TIMED_OUT}" # SKIP_ALL_FAILURES, RETRY_ALL, RETRY_FAILED, RETRY_TIMED_OUT, RETRY_FAILED_AND_TIMED_OUT # For RETRY_ALL, RETRY_FAILED, RETRY_TIMED_OUT, RETRY_FAILED_AND_TIMED_OUT - retries: "${TB_QUEUE_RULE_ENGINE_STRATEGY_RETRIES:3}" # Number of retries, 0 is unlimited - failure-percentage: "${TB_QUEUE_RULE_ENGINE_STRATEGY_FAILURE_PERCENTAGE:0}" # Skip retry if failures or timeouts are less then X percentage of messages; - pause-between-retries: "${TB_QUEUE_RULE_ENGINE_STRATEGY_RETRY_PAUSE:3}"# Time in seconds to wait in consumer thread before retries; - - - name: "HighPriority" - topic: "${TB_QUEUE_RULE_ENGINE_TOPIC:tb.rule-engine.hp}" - poll-interval: "${TB_QUEUE_RULE_ENGINE_POLL_INTERVAL_MS:25}" - partitions: "${TB_QUEUE_RULE_ENGINE_PARTITIONS:3}" - pack-processing-timeout: "${TB_QUEUE_RULE_ENGINE_PACK_PROCESSING_TIMEOUT_MS:60000}" - ack-strategy: - type: "${TB_QUEUE_RULE_ENGINE_STRATEGY_TYPE:RETRY_FAILED_AND_TIMED_OUT}" # SKIP_ALL_FAILURES, RETRY_ALL, RETRY_FAILED, RETRY_TIMED_OUT, RETRY_FAILED_AND_TIMED_OUT + retries: "${TB_QUEUE_RE_MAIN_PROCESSING_STRATEGY_RETRIES:3}" # Number of retries, 0 is unlimited + failure-percentage: "${TB_QUEUE_RE_MAIN_PROCESSING_STRATEGY_FAILURE_PERCENTAGE:0}" # Skip retry if failures or timeouts are less then X percentage of messages; + pause-between-retries: "${TB_QUEUE_RE_MAIN_PROCESSING_STRATEGY_RETRY_PAUSE:3}"# Time in seconds to wait in consumer thread before retries; + - name: "${TB_QUEUE_RE_HP_QUEUE_NAME:HighPriority}" + topic: "${TB_QUEUE_RE_HP_TOPIC:tb.rule-engine.hp}" + poll-interval: "${TB_QUEUE_RE_HP_POLL_INTERVAL_MS:25}" + partitions: "${TB_QUEUE_RE_HP_PARTITIONS:3}" + pack-processing-timeout: "${TB_QUEUE_RE_HP_PACK_PROCESSING_TIMEOUT_MS:60000}" + submit-strategy: + type: "${TB_QUEUE_RE_HP_SUBMIT_STRATEGY_TYPE:SEQUENTIAL_WITHIN_ORIGINATOR}" # BURST, BATCH, SEQUENTIAL_WITHIN_ORIGINATOR, SEQUENTIAL_WITHIN_TENANT, SEQUENTIAL + # For BATCH only + batch-size: "${TB_QUEUE_RE_HP_SUBMIT_STRATEGY_BATCH_SIZE:100}" # Maximum number of messages in batch + processing-strategy: + type: "${TB_QUEUE_RE_HP_PROCESSING_STRATEGY_TYPE:RETRY_FAILED_AND_TIMED_OUT}" # SKIP_ALL_FAILURES, RETRY_ALL, RETRY_FAILED, RETRY_TIMED_OUT, RETRY_FAILED_AND_TIMED_OUT # For RETRY_ALL, RETRY_FAILED, RETRY_TIMED_OUT, RETRY_FAILED_AND_TIMED_OUT - retries: "${TB_QUEUE_RULE_ENGINE_STRATEGY_RETRIES:0}" # Number of retries, 0 is unlimited - failure-percentage: "${TB_QUEUE_RULE_ENGINE_STRATEGY_FAILURE_PERCENTAGE:0}" # Skip retry if failures or timeouts are less then X percentage of messages; - pause-between-retries: "${TB_QUEUE_RULE_ENGINE_STRATEGY_RETRY_PAUSE:1}"# Time in seconds to wait in consumer thread before retries; + retries: "${TB_QUEUE_RE_HP_PROCESSING_STRATEGY_RETRIES:0}" # Number of retries, 0 is unlimited + failure-percentage: "${TB_QUEUE_RE_HP_PROCESSING_STRATEGY_FAILURE_PERCENTAGE:0}" # Skip retry if failures or timeouts are less then X percentage of messages; + pause-between-retries: "${TB_QUEUE_RE_HP_PROCESSING_STRATEGY_RETRY_PAUSE:5}"# Time in seconds to wait in consumer thread before retries; transport: + # For high priority notifications that require minimum latency and processing time notifications_topic: "${TB_QUEUE_TRANSPORT_NOTIFICATIONS_TOPIC:tb.transport.notifications}" poll_interval: "${TB_QUEUE_CORE_POLL_INTERVAL_MS:25}"