diff --git a/application/src/main/data/json/edge/install_instructions/docker/instructions.md b/application/src/main/data/json/edge/install_instructions/docker/instructions.md index e308a38076..d8f9731889 100644 --- a/application/src/main/data/json/edge/install_instructions/docker/instructions.md +++ b/application/src/main/data/json/edge/install_instructions/docker/instructions.md @@ -51,7 +51,7 @@ services: - ~/.mytb-edge-logs:/var/log/tb-edge postgres: restart: always - image: "postgres:12" + image: "postgres:15" ports: - "5432" environment: diff --git a/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java b/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java index 3d4340f6c9..607d5d8791 100644 --- a/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java +++ b/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java @@ -29,6 +29,7 @@ import org.springframework.data.redis.core.RedisTemplate; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Component; import org.thingsboard.common.util.JacksonUtil; +import org.thingsboard.server.service.executors.PubSubRuleNodeExecutorProvider; import org.thingsboard.rule.engine.api.MailService; import org.thingsboard.rule.engine.api.NotificationCenter; import org.thingsboard.rule.engine.api.SmsService; @@ -322,6 +323,11 @@ public class ActorSystemContext { @Getter private NotificationExecutorService notificationExecutor; + @Lazy + @Autowired + @Getter + private PubSubRuleNodeExecutorProvider pubSubRuleNodeExecutorProvider; + @Autowired @Getter private SharedEventLoopGroupService sharedEventLoopGroupService; diff --git a/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java b/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java index ca1d174322..28963b383a 100644 --- a/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java +++ b/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java @@ -104,6 +104,7 @@ import org.thingsboard.server.dao.widget.WidgetsBundleService; import org.thingsboard.server.gen.transport.TransportProtos; import org.thingsboard.server.queue.TbQueueCallback; import org.thingsboard.server.queue.TbQueueMsgMetadata; +import org.thingsboard.server.service.executors.PubSubRuleNodeExecutorProvider; import org.thingsboard.server.service.script.RuleNodeJsScriptEngine; import org.thingsboard.server.service.script.RuleNodeTbelScriptEngine; @@ -538,6 +539,11 @@ class DefaultTbContext implements TbContext { return mainCtx.getNotificationExecutor(); } + @Override + public PubSubRuleNodeExecutorProvider getPubSubRuleNodeExecutorProvider() { + return mainCtx.getPubSubRuleNodeExecutorProvider(); + } + @Override @Deprecated public ScriptEngine createJsScriptEngine(String script, String... argNames) { diff --git a/application/src/main/java/org/thingsboard/server/controller/TelemetryController.java b/application/src/main/java/org/thingsboard/server/controller/TelemetryController.java index 5f88752ff4..55f4f3afbd 100644 --- a/application/src/main/java/org/thingsboard/server/controller/TelemetryController.java +++ b/application/src/main/java/org/thingsboard/server/controller/TelemetryController.java @@ -74,7 +74,7 @@ import org.thingsboard.server.common.data.kv.ReadTsKvQuery; import org.thingsboard.server.common.data.kv.StringDataEntry; import org.thingsboard.server.common.data.kv.TsKvEntry; import org.thingsboard.server.common.data.tenant.profile.DefaultTenantProfileConfiguration; -import org.thingsboard.server.common.transport.adaptor.JsonConverter; +import org.thingsboard.server.common.adaptor.JsonConverter; import org.thingsboard.server.dao.timeseries.TimeseriesService; import org.thingsboard.server.exception.InvalidParametersException; import org.thingsboard.server.exception.UncheckedApiException; diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/constructor/telemetry/EntityDataMsgConstructor.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/constructor/telemetry/EntityDataMsgConstructor.java index 9a2ea2abf6..4ec6376a4d 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/constructor/telemetry/EntityDataMsgConstructor.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/constructor/telemetry/EntityDataMsgConstructor.java @@ -28,7 +28,7 @@ import org.thingsboard.server.common.data.StringUtils; import org.thingsboard.server.common.data.edge.EdgeEventActionType; import org.thingsboard.server.common.data.id.EntityId; import org.thingsboard.server.common.data.id.TenantId; -import org.thingsboard.server.common.transport.adaptor.JsonConverter; +import org.thingsboard.server.common.adaptor.JsonConverter; import org.thingsboard.server.gen.edge.v1.AttributeDeleteMsg; import org.thingsboard.server.gen.edge.v1.EntityDataProto; import org.thingsboard.server.gen.transport.TransportProtos; diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/telemetry/BaseTelemetryProcessor.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/telemetry/BaseTelemetryProcessor.java index 186f030a79..9cea8050b6 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/telemetry/BaseTelemetryProcessor.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/telemetry/BaseTelemetryProcessor.java @@ -52,8 +52,8 @@ import org.thingsboard.server.common.msg.TbMsg; import org.thingsboard.server.common.msg.TbMsgMetaData; import org.thingsboard.server.common.msg.queue.ServiceType; import org.thingsboard.server.common.msg.queue.TopicPartitionInfo; +import org.thingsboard.server.common.adaptor.JsonConverter; import org.thingsboard.server.common.msg.rule.engine.DeviceAttributesEventNotificationMsg; -import org.thingsboard.server.common.transport.adaptor.JsonConverter; import org.thingsboard.server.common.transport.util.JsonUtils; import org.thingsboard.server.dao.model.ModelConstants; import org.thingsboard.server.gen.edge.v1.AttributeDeleteMsg; diff --git a/application/src/main/java/org/thingsboard/server/service/executors/PubSubRuleNodeExecutorProvider.java b/application/src/main/java/org/thingsboard/server/service/executors/PubSubRuleNodeExecutorProvider.java new file mode 100644 index 0000000000..74e1222acf --- /dev/null +++ b/application/src/main/java/org/thingsboard/server/service/executors/PubSubRuleNodeExecutorProvider.java @@ -0,0 +1,63 @@ +/** + * Copyright © 2016-2023 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.service.executors; + +import org.springframework.beans.factory.annotation.Value; +import org.springframework.context.annotation.Lazy; +import org.springframework.stereotype.Component; +import org.thingsboard.common.util.ExecutorProvider; +import org.thingsboard.common.util.ThingsBoardThreadFactory; +import org.thingsboard.server.queue.util.TbRuleEngineComponent; + +import javax.annotation.PostConstruct; +import javax.annotation.PreDestroy; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; + +@Lazy +@TbRuleEngineComponent +@Component +public class PubSubRuleNodeExecutorProvider implements ExecutorProvider { + + @Value("${service.rule_engine.pubsub.executor_thread_pool_size}") + private Integer threadPoolSize; + + /** + * Refers to com.google.cloud.pubsub.v1.Publisher default executor configuration + */ + private static final int THREADS_PER_CPU = 5; + private ScheduledExecutorService executor; + + @PostConstruct + public void init() { + if (threadPoolSize == null) { + threadPoolSize = THREADS_PER_CPU * Runtime.getRuntime().availableProcessors(); + } + executor = Executors.newScheduledThreadPool(threadPoolSize, ThingsBoardThreadFactory.forName("pubsub-rule-nodes")); + } + + @Override + public ScheduledExecutorService getExecutor() { + return executor; + } + + @PreDestroy + private void destroy() { + if (executor != null) { + executor.shutdownNow(); + } + } +} diff --git a/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbClusterService.java b/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbClusterService.java index af2a54a7ac..14930912d1 100644 --- a/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbClusterService.java +++ b/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbClusterService.java @@ -81,7 +81,7 @@ import java.util.Set; import java.util.UUID; import java.util.concurrent.atomic.AtomicInteger; -import static org.thingsboard.server.service.queue.ProtoUtils.toProto; +import static org.thingsboard.server.common.util.ProtoUtils.toProto; @Service @Slf4j diff --git a/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbCoreConsumerService.java b/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbCoreConsumerService.java index 0336349046..5283713126 100644 --- a/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbCoreConsumerService.java +++ b/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbCoreConsumerService.java @@ -45,6 +45,7 @@ import org.thingsboard.server.common.msg.queue.ServiceType; import org.thingsboard.server.common.msg.queue.TbCallback; import org.thingsboard.server.common.msg.rpc.FromDeviceRpcResponse; import org.thingsboard.server.common.stats.StatsFactory; +import org.thingsboard.server.common.util.ProtoUtils; import org.thingsboard.server.dao.tenant.TbTenantProfileCache; import org.thingsboard.server.gen.transport.TransportProtos; import org.thingsboard.server.gen.transport.TransportProtos.DeviceStateServiceMsgProto; diff --git a/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbRuleEngineConsumerService.java b/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbRuleEngineConsumerService.java index 12fa5d3f27..273d7afc97 100644 --- a/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbRuleEngineConsumerService.java +++ b/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbRuleEngineConsumerService.java @@ -28,6 +28,7 @@ import org.thingsboard.server.common.data.rpc.RpcError; import org.thingsboard.server.common.msg.queue.ServiceType; import org.thingsboard.server.common.msg.queue.TbCallback; import org.thingsboard.server.common.msg.rpc.FromDeviceRpcResponse; +import org.thingsboard.server.common.util.ProtoUtils; import org.thingsboard.server.dao.queue.QueueService; import org.thingsboard.server.dao.tenant.TbTenantProfileCache; import org.thingsboard.server.gen.transport.TransportProtos; diff --git a/application/src/main/java/org/thingsboard/server/service/sync/ie/importing/csv/AbstractBulkImportService.java b/application/src/main/java/org/thingsboard/server/service/sync/ie/importing/csv/AbstractBulkImportService.java index c03dd8dcb7..30765bd959 100644 --- a/application/src/main/java/org/thingsboard/server/service/sync/ie/importing/csv/AbstractBulkImportService.java +++ b/application/src/main/java/org/thingsboard/server/service/sync/ie/importing/csv/AbstractBulkImportService.java @@ -47,7 +47,7 @@ import org.thingsboard.server.common.data.sync.ie.importing.csv.BulkImportColumn import org.thingsboard.server.common.data.sync.ie.importing.csv.BulkImportRequest; import org.thingsboard.server.common.data.sync.ie.importing.csv.BulkImportResult; import org.thingsboard.server.common.data.tenant.profile.DefaultTenantProfileConfiguration; -import org.thingsboard.server.common.transport.adaptor.JsonConverter; +import org.thingsboard.server.common.adaptor.JsonConverter; import org.thingsboard.server.controller.BaseController; import org.thingsboard.server.dao.tenant.TbTenantProfileCache; import org.thingsboard.server.service.action.EntityActionService; diff --git a/application/src/main/resources/thingsboard.yml b/application/src/main/resources/thingsboard.yml index faa799b7e4..93348d02ee 100644 --- a/application/src/main/resources/thingsboard.yml +++ b/application/src/main/resources/thingsboard.yml @@ -1420,6 +1420,8 @@ queue: max_msg_size: "${TB_QUEUE_PUBSUB_MAX_MSG_SIZE:1048576}" # Number of messages per consumer max_messages: "${TB_QUEUE_PUBSUB_MAX_MESSAGES:1000}" + # Thread pool size for pubsub queue executor provider. If set to 0 - default pubsub executor provider value will be used (5 * number of available processors) + executor_thread_pool_size: "${TB_QUEUE_PUBSUB_EXECUTOR_THREAD_POOL_SIZE:0}" queue-properties: # Pub/Sub properties for Rule Engine subscribers, messages which will commit after ackDeadlineInSec period can be consumed again rule-engine: "${TB_QUEUE_PUBSUB_RE_QUEUE_PROPERTIES:ackDeadlineInSec:30;messageRetentionInSec:604800}" @@ -1648,6 +1650,9 @@ service: # Comma-separated list of tenant profile ids assigned to this Rule Engine. # This Rule Engine will only be responsible for tenants with these profiles (in case 'isolation' option is enabled in the profile). assigned_tenant_profiles: "${TB_RULE_ENGINE_ASSIGNED_TENANT_PROFILES:}" + pubsub: + # Thread pool size for pubsub rule node executor provider. If not set - default pubsub executor provider value will be used (5 * number of available processors) + executor_thread_pool_size: "${TB_RULE_ENGINE_PUBSUB_EXECUTOR_THREAD_POOL_SIZE:0}" # Metrics parameters metrics: diff --git a/application/src/test/java/org/thingsboard/server/controller/DeviceControllerTest.java b/application/src/test/java/org/thingsboard/server/controller/DeviceControllerTest.java index cefe24e935..7cafe2bbcb 100644 --- a/application/src/test/java/org/thingsboard/server/controller/DeviceControllerTest.java +++ b/application/src/test/java/org/thingsboard/server/controller/DeviceControllerTest.java @@ -28,12 +28,14 @@ import org.junit.Before; import org.junit.Ignore; import org.junit.Test; import org.mockito.AdditionalAnswers; +import org.mockito.ArgumentCaptor; import org.mockito.Mockito; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.mock.mockito.SpyBean; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Primary; import org.springframework.test.context.ContextConfiguration; +import org.testcontainers.shaded.org.awaitility.Awaitility; import org.thingsboard.common.util.JacksonUtil; import org.thingsboard.common.util.ThingsBoardExecutors; import org.thingsboard.server.common.data.Customer; @@ -72,6 +74,7 @@ import org.thingsboard.server.dao.exception.DataValidationException; import org.thingsboard.server.dao.exception.DeviceCredentialsValidationException; import org.thingsboard.server.dao.model.ModelConstants; import org.thingsboard.server.dao.service.DaoSqlTest; +import org.thingsboard.server.gen.transport.TransportProtos; import org.thingsboard.server.service.gateway_device.GatewayNotificationsService; import org.thingsboard.server.service.state.DeviceStateService; @@ -83,7 +86,6 @@ import java.util.concurrent.TimeUnit; import static org.assertj.core.api.Assertions.assertThat; import static org.hamcrest.Matchers.containsString; import static org.mockito.ArgumentMatchers.any; -import static org.mockito.ArgumentMatchers.argThat; import static org.mockito.Mockito.never; import static org.mockito.Mockito.times; import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status; @@ -1345,27 +1347,27 @@ public class DeviceControllerTest extends AbstractControllerTest { ActionType.ASSIGNED_TO_TENANT, savedDifferentTenant.getId().getId().toString(), savedDifferentTenant.getTitle()); testNotificationUpdateGatewayNever(); - Mockito.verify(deviceStateService, times(1)).onQueueMsg( - argThat(proto -> - proto.getTenantIdMSB() == savedTenant.getUuidId().getMostSignificantBits() && - proto.getTenantIdLSB() == savedTenant.getUuidId().getLeastSignificantBits() && - proto.getDeviceIdMSB() == savedDevice.getUuidId().getMostSignificantBits() && - proto.getDeviceIdLSB() == savedDevice.getUuidId().getLeastSignificantBits() && - proto.getDeleted() - ), - any() - ); - - Mockito.verify(deviceStateService, times(1)).onQueueMsg( - argThat(proto -> - proto.getTenantIdMSB() == savedDifferentTenant.getUuidId().getMostSignificantBits() && - proto.getTenantIdLSB() == savedDifferentTenant.getUuidId().getLeastSignificantBits() && - proto.getDeviceIdMSB() == savedDevice.getUuidId().getMostSignificantBits() && - proto.getDeviceIdLSB() == savedDevice.getUuidId().getLeastSignificantBits() && - proto.getAdded() - ), - any() - ); + ArgumentCaptor protoCaptor = ArgumentCaptor.forClass(TransportProtos.DeviceStateServiceMsgProto.class); + + Awaitility.await().atMost(5, TimeUnit.SECONDS).until(() -> { + Mockito.verify(deviceStateService, Mockito.atLeastOnce()).onQueueMsg(protoCaptor.capture(), any()); + return protoCaptor.getAllValues().stream().anyMatch(proto -> + proto.getTenantIdMSB() == savedTenant.getUuidId().getMostSignificantBits() && + proto.getTenantIdLSB() == savedTenant.getUuidId().getLeastSignificantBits() && + proto.getDeviceIdMSB() == savedDevice.getUuidId().getMostSignificantBits() && + proto.getDeviceIdLSB() == savedDevice.getUuidId().getLeastSignificantBits() && + proto.getDeleted()); + }); + + Awaitility.await().atMost(5, TimeUnit.SECONDS).until(() -> { + Mockito.verify(deviceStateService, Mockito.atLeastOnce()).onQueueMsg(protoCaptor.capture(), any()); + return protoCaptor.getAllValues().stream().anyMatch(proto -> + proto.getTenantIdMSB() == savedDifferentTenant.getUuidId().getMostSignificantBits() && + proto.getTenantIdLSB() == savedDifferentTenant.getUuidId().getLeastSignificantBits() && + proto.getDeviceIdMSB() == savedDevice.getUuidId().getMostSignificantBits() && + proto.getDeviceIdLSB() == savedDevice.getUuidId().getLeastSignificantBits() && + proto.getAdded()); + }); login("tenant9@thingsboard.org", "testPassword1"); diff --git a/application/src/test/java/org/thingsboard/server/edge/DeviceEdgeTest.java b/application/src/test/java/org/thingsboard/server/edge/DeviceEdgeTest.java index 9c3f0a9bb7..56b8897ed3 100644 --- a/application/src/test/java/org/thingsboard/server/edge/DeviceEdgeTest.java +++ b/application/src/test/java/org/thingsboard/server/edge/DeviceEdgeTest.java @@ -52,7 +52,7 @@ import org.thingsboard.server.common.data.security.DeviceCredentials; import org.thingsboard.server.common.data.security.DeviceCredentialsType; import org.thingsboard.server.common.data.tenant.profile.DefaultTenantProfileConfiguration; import org.thingsboard.server.common.msg.session.FeatureType; -import org.thingsboard.server.common.transport.adaptor.JsonConverter; +import org.thingsboard.server.common.adaptor.JsonConverter; import org.thingsboard.server.dao.service.DaoSqlTest; import org.thingsboard.server.gen.edge.v1.AttributesRequestMsg; import org.thingsboard.server.gen.edge.v1.DeviceCredentialsRequestMsg; diff --git a/common/cluster-api/pom.xml b/common/cluster-api/pom.xml index 6a0831de47..cd8f551aa7 100644 --- a/common/cluster-api/pom.xml +++ b/common/cluster-api/pom.xml @@ -40,6 +40,10 @@ org.thingsboard.common data + + org.thingsboard.common + proto + org.thingsboard.common message diff --git a/common/pom.xml b/common/pom.xml index b00b13db57..76387a1773 100644 --- a/common/pom.xml +++ b/common/pom.xml @@ -48,6 +48,7 @@ edge-api version-control script + proto diff --git a/common/proto/pom.xml b/common/proto/pom.xml new file mode 100644 index 0000000000..4ccf154ead --- /dev/null +++ b/common/proto/pom.xml @@ -0,0 +1,94 @@ + + + 4.0.0 + + org.thingsboard + 3.6.2-SNAPSHOT + common + + org.thingsboard.common + proto + jar + + Thingsboard Server Common Protobuf and gRPC structures + https://thingsboard.io + + + UTF-8 + ${basedir}/../.. + + + + + org.thingsboard.common + data + + + org.thingsboard.common + message + + + com.google.protobuf + protobuf-java + + + com.google.protobuf + protobuf-java-util + + + org.springframework.boot + spring-boot-starter-web + provided + + + org.springframework.boot + spring-boot-starter-test + test + + + org.junit.vintage + junit-vintage-engine + test + + + org.awaitility + awaitility + test + + + + + + + org.xolstice.maven.plugins + protobuf-maven-plugin + + + + + + + thingsboard-repo-deploy + ThingsBoard Repo Deployment + https://repo.thingsboard.io/artifactory/libs-release-public + + + + \ No newline at end of file diff --git a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/adaptor/AdaptorException.java b/common/proto/src/main/java/org/thingsboard/server/common/adaptor/AdaptorException.java similarity index 94% rename from common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/adaptor/AdaptorException.java rename to common/proto/src/main/java/org/thingsboard/server/common/adaptor/AdaptorException.java index 8b2908a4b5..1c22163c1f 100644 --- a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/adaptor/AdaptorException.java +++ b/common/proto/src/main/java/org/thingsboard/server/common/adaptor/AdaptorException.java @@ -13,7 +13,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.thingsboard.server.common.transport.adaptor; +package org.thingsboard.server.common.adaptor; public class AdaptorException extends Exception { diff --git a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/adaptor/JsonConverter.java b/common/proto/src/main/java/org/thingsboard/server/common/adaptor/JsonConverter.java similarity index 99% rename from common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/adaptor/JsonConverter.java rename to common/proto/src/main/java/org/thingsboard/server/common/adaptor/JsonConverter.java index 51e74d5097..c51bc784a6 100644 --- a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/adaptor/JsonConverter.java +++ b/common/proto/src/main/java/org/thingsboard/server/common/adaptor/JsonConverter.java @@ -13,7 +13,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.thingsboard.server.common.transport.adaptor; +package org.thingsboard.server.common.adaptor; import com.google.gson.Gson; import com.google.gson.JsonArray; diff --git a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/adaptor/JsonConverterConfig.java b/common/proto/src/main/java/org/thingsboard/server/common/adaptor/JsonConverterConfig.java similarity index 96% rename from common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/adaptor/JsonConverterConfig.java rename to common/proto/src/main/java/org/thingsboard/server/common/adaptor/JsonConverterConfig.java index 0a9df150e5..e160c051f5 100644 --- a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/adaptor/JsonConverterConfig.java +++ b/common/proto/src/main/java/org/thingsboard/server/common/adaptor/JsonConverterConfig.java @@ -13,7 +13,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.thingsboard.server.common.transport.adaptor; +package org.thingsboard.server.common.adaptor; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Value; diff --git a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/adaptor/ProtoConverter.java b/common/proto/src/main/java/org/thingsboard/server/common/adaptor/ProtoConverter.java similarity index 99% rename from common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/adaptor/ProtoConverter.java rename to common/proto/src/main/java/org/thingsboard/server/common/adaptor/ProtoConverter.java index 54358709f0..098108c864 100644 --- a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/adaptor/ProtoConverter.java +++ b/common/proto/src/main/java/org/thingsboard/server/common/adaptor/ProtoConverter.java @@ -13,7 +13,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.thingsboard.server.common.transport.adaptor; +package org.thingsboard.server.common.adaptor; import com.google.gson.Gson; import com.google.gson.JsonElement; diff --git a/application/src/main/java/org/thingsboard/server/service/queue/ProtoUtils.java b/common/proto/src/main/java/org/thingsboard/server/common/util/ProtoUtils.java similarity index 99% rename from application/src/main/java/org/thingsboard/server/service/queue/ProtoUtils.java rename to common/proto/src/main/java/org/thingsboard/server/common/util/ProtoUtils.java index 946993151d..fd9e35b860 100644 --- a/application/src/main/java/org/thingsboard/server/service/queue/ProtoUtils.java +++ b/common/proto/src/main/java/org/thingsboard/server/common/util/ProtoUtils.java @@ -13,7 +13,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.thingsboard.server.service.queue; +package org.thingsboard.server.common.util; import org.thingsboard.server.common.data.EntityType; import org.thingsboard.server.common.data.id.DeviceId; diff --git a/common/cluster-api/src/main/proto/jsinvoke.proto b/common/proto/src/main/proto/jsinvoke.proto similarity index 100% rename from common/cluster-api/src/main/proto/jsinvoke.proto rename to common/proto/src/main/proto/jsinvoke.proto diff --git a/common/cluster-api/src/main/proto/queue.proto b/common/proto/src/main/proto/queue.proto similarity index 100% rename from common/cluster-api/src/main/proto/queue.proto rename to common/proto/src/main/proto/queue.proto diff --git a/common/transport/transport-api/src/main/proto/transport.proto b/common/proto/src/main/proto/transport.proto similarity index 100% rename from common/transport/transport-api/src/main/proto/transport.proto rename to common/proto/src/main/proto/transport.proto diff --git a/common/transport/transport-api/src/test/java/JsonConverterTest.java b/common/proto/src/test/java/org/thingsboard/server/common/adaptor/JsonConverterTest.java similarity index 98% rename from common/transport/transport-api/src/test/java/JsonConverterTest.java rename to common/proto/src/test/java/org/thingsboard/server/common/adaptor/JsonConverterTest.java index 39ed04a29a..8c32677afb 100644 --- a/common/transport/transport-api/src/test/java/JsonConverterTest.java +++ b/common/proto/src/test/java/org/thingsboard/server/common/adaptor/JsonConverterTest.java @@ -13,13 +13,14 @@ * See the License for the specific language governing permissions and * limitations under the License. */ +package org.thingsboard.server.common.adaptor; + import com.google.gson.JsonParser; import com.google.gson.JsonSyntaxException; import org.junit.Assert; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; -import org.thingsboard.server.common.transport.adaptor.JsonConverter; import java.util.ArrayList; diff --git a/application/src/test/java/org/thingsboard/server/service/queue/ProtoUtilsTest.java b/common/proto/src/test/java/org/thingsboard/server/common/util/ProtoUtilsTest.java similarity index 99% rename from application/src/test/java/org/thingsboard/server/service/queue/ProtoUtilsTest.java rename to common/proto/src/test/java/org/thingsboard/server/common/util/ProtoUtilsTest.java index 1a696b730d..a41b2be208 100644 --- a/application/src/test/java/org/thingsboard/server/service/queue/ProtoUtilsTest.java +++ b/common/proto/src/test/java/org/thingsboard/server/common/util/ProtoUtilsTest.java @@ -13,7 +13,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.thingsboard.server.service.queue; +package org.thingsboard.server.common.util; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/pubsub/TbPubSubConsumerTemplate.java b/common/queue/src/main/java/org/thingsboard/server/queue/pubsub/TbPubSubConsumerTemplate.java index 97b7b47843..1a84b80cd2 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/pubsub/TbPubSubConsumerTemplate.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/pubsub/TbPubSubConsumerTemplate.java @@ -17,6 +17,7 @@ package org.thingsboard.server.queue.pubsub; import com.google.api.core.ApiFuture; import com.google.api.core.ApiFutures; +import com.google.api.gax.core.FixedExecutorProvider; import com.google.cloud.pubsub.v1.stub.GrpcSubscriberStub; import com.google.cloud.pubsub.v1.stub.SubscriberStub; import com.google.cloud.pubsub.v1.stub.SubscriberStubSettings; @@ -76,6 +77,7 @@ public class TbPubSubConsumerTemplate extends AbstractPara SubscriberStubSettings.defaultGrpcTransportProviderBuilder() .setMaxInboundMessageSize(pubSubSettings.getMaxMsgSize()) .build()) + .setExecutorProvider(pubSubSettings.getExecutorProvider()) .build(); this.subscriber = GrpcSubscriberStub.create(subscriberStubSettings); } catch (IOException e) { diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/pubsub/TbPubSubProducerTemplate.java b/common/queue/src/main/java/org/thingsboard/server/queue/pubsub/TbPubSubProducerTemplate.java index 0d84a8f839..3107855ab1 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/pubsub/TbPubSubProducerTemplate.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/pubsub/TbPubSubProducerTemplate.java @@ -18,6 +18,7 @@ package org.thingsboard.server.queue.pubsub; import com.google.api.core.ApiFuture; import com.google.api.core.ApiFutureCallback; import com.google.api.core.ApiFutures; +import com.google.api.gax.core.FixedExecutorProvider; import com.google.cloud.pubsub.v1.Publisher; import com.google.gson.Gson; import com.google.protobuf.ByteString; @@ -120,7 +121,10 @@ public class TbPubSubProducerTemplate implements TbQueuePr try { admin.createTopicIfNotExists(topic); ProjectTopicName topicName = ProjectTopicName.of(pubSubSettings.getProjectId(), topic); - Publisher publisher = Publisher.newBuilder(topicName).setCredentialsProvider(pubSubSettings.getCredentialsProvider()).build(); + Publisher publisher = Publisher.newBuilder(topicName) + .setCredentialsProvider(pubSubSettings.getCredentialsProvider()) + .setExecutorProvider(pubSubSettings.getExecutorProvider()) + .build(); publisherMap.put(topic, publisher); return publisher; } catch (IOException e) { diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/pubsub/TbPubSubSettings.java b/common/queue/src/main/java/org/thingsboard/server/queue/pubsub/TbPubSubSettings.java index fe9d85c306..145661fed1 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/pubsub/TbPubSubSettings.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/pubsub/TbPubSubSettings.java @@ -17,16 +17,20 @@ package org.thingsboard.server.queue.pubsub; import com.google.api.gax.core.CredentialsProvider; import com.google.api.gax.core.FixedCredentialsProvider; +import com.google.api.gax.core.FixedExecutorProvider; import com.google.auth.oauth2.ServiceAccountCredentials; import lombok.Data; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Value; import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression; import org.springframework.stereotype.Component; +import org.thingsboard.common.util.ThingsBoardThreadFactory; import javax.annotation.PostConstruct; +import javax.annotation.PreDestroy; import java.io.ByteArrayInputStream; import java.io.IOException; +import java.util.concurrent.Executors; @Slf4j @ConditionalOnExpression("'${queue.type:null}'=='pubsub'") @@ -46,6 +50,16 @@ public class TbPubSubSettings { @Value("${queue.pubsub.max_messages}") private int maxMessages; + @Value("${queue.pubsub.executor_thread_pool_size:0}") + private int threadPoolSize; + + /** + * Refers to com.google.cloud.pubsub.v1.Publisher default executor configuration + */ + private static final int THREADS_PER_CPU = 5; + + private FixedExecutorProvider executorProvider; + private CredentialsProvider credentialsProvider; @PostConstruct @@ -53,6 +67,17 @@ public class TbPubSubSettings { ServiceAccountCredentials credentials = ServiceAccountCredentials.fromStream( new ByteArrayInputStream(serviceAccount.getBytes())); credentialsProvider = FixedCredentialsProvider.create(credentials); + if (threadPoolSize == 0) { + threadPoolSize = THREADS_PER_CPU * Runtime.getRuntime().availableProcessors(); + } + executorProvider = FixedExecutorProvider + .create(Executors.newScheduledThreadPool(threadPoolSize, ThingsBoardThreadFactory.forName("pubsub-queue-executor"))); } + @PreDestroy + private void destroy() { + if (executorProvider != null) { + executorProvider.getExecutor().shutdownNow(); + } + } } diff --git a/common/transport/coap/src/main/java/org/thingsboard/server/transport/coap/CoapTransportResource.java b/common/transport/coap/src/main/java/org/thingsboard/server/transport/coap/CoapTransportResource.java index bd02d9fcc4..63979ce500 100644 --- a/common/transport/coap/src/main/java/org/thingsboard/server/transport/coap/CoapTransportResource.java +++ b/common/transport/coap/src/main/java/org/thingsboard/server/transport/coap/CoapTransportResource.java @@ -35,8 +35,8 @@ import org.thingsboard.server.common.data.TransportPayloadType; import org.thingsboard.server.common.data.security.DeviceTokenCredentials; import org.thingsboard.server.common.msg.session.FeatureType; import org.thingsboard.server.common.transport.TransportServiceCallback; -import org.thingsboard.server.common.transport.adaptor.AdaptorException; -import org.thingsboard.server.common.transport.adaptor.JsonConverter; +import org.thingsboard.server.common.adaptor.AdaptorException; +import org.thingsboard.server.common.adaptor.JsonConverter; import org.thingsboard.server.common.transport.auth.ValidateDeviceCredentialsResponse; import org.thingsboard.server.gen.transport.TransportProtos; import org.thingsboard.server.transport.coap.callback.CoapDeviceAuthCallback; diff --git a/common/transport/coap/src/main/java/org/thingsboard/server/transport/coap/adaptors/CoapAdaptorUtils.java b/common/transport/coap/src/main/java/org/thingsboard/server/transport/coap/adaptors/CoapAdaptorUtils.java index 710f18a8b7..629e0380e0 100644 --- a/common/transport/coap/src/main/java/org/thingsboard/server/transport/coap/adaptors/CoapAdaptorUtils.java +++ b/common/transport/coap/src/main/java/org/thingsboard/server/transport/coap/adaptors/CoapAdaptorUtils.java @@ -17,7 +17,7 @@ package org.thingsboard.server.transport.coap.adaptors; import org.eclipse.californium.core.coap.Request; import org.thingsboard.server.common.data.StringUtils; -import org.thingsboard.server.common.transport.adaptor.AdaptorException; +import org.thingsboard.server.common.adaptor.AdaptorException; import org.thingsboard.server.gen.transport.TransportProtos; import java.util.Arrays; diff --git a/common/transport/coap/src/main/java/org/thingsboard/server/transport/coap/adaptors/CoapTransportAdaptor.java b/common/transport/coap/src/main/java/org/thingsboard/server/transport/coap/adaptors/CoapTransportAdaptor.java index 05a38e1ad1..e88251c93c 100644 --- a/common/transport/coap/src/main/java/org/thingsboard/server/transport/coap/adaptors/CoapTransportAdaptor.java +++ b/common/transport/coap/src/main/java/org/thingsboard/server/transport/coap/adaptors/CoapTransportAdaptor.java @@ -19,7 +19,7 @@ import com.google.protobuf.Descriptors; import com.google.protobuf.DynamicMessage; import org.eclipse.californium.core.coap.Request; import org.eclipse.californium.core.coap.Response; -import org.thingsboard.server.common.transport.adaptor.AdaptorException; +import org.thingsboard.server.common.adaptor.AdaptorException; import org.thingsboard.server.gen.transport.TransportProtos; import org.thingsboard.server.gen.transport.TransportProtos.ProvisionDeviceRequestMsg; diff --git a/common/transport/coap/src/main/java/org/thingsboard/server/transport/coap/adaptors/JsonCoapAdaptor.java b/common/transport/coap/src/main/java/org/thingsboard/server/transport/coap/adaptors/JsonCoapAdaptor.java index d81ff32edd..fffe1a7e1a 100644 --- a/common/transport/coap/src/main/java/org/thingsboard/server/transport/coap/adaptors/JsonCoapAdaptor.java +++ b/common/transport/coap/src/main/java/org/thingsboard/server/transport/coap/adaptors/JsonCoapAdaptor.java @@ -29,8 +29,8 @@ import org.eclipse.californium.core.coap.Response; import org.springframework.stereotype.Component; import org.thingsboard.server.common.data.StringUtils; import org.thingsboard.server.common.data.id.DeviceId; -import org.thingsboard.server.common.transport.adaptor.AdaptorException; -import org.thingsboard.server.common.transport.adaptor.JsonConverter; +import org.thingsboard.server.common.adaptor.AdaptorException; +import org.thingsboard.server.common.adaptor.JsonConverter; import org.thingsboard.server.gen.transport.TransportProtos; import org.thingsboard.server.transport.coap.CoapTransportResource; diff --git a/common/transport/coap/src/main/java/org/thingsboard/server/transport/coap/adaptors/ProtoCoapAdaptor.java b/common/transport/coap/src/main/java/org/thingsboard/server/transport/coap/adaptors/ProtoCoapAdaptor.java index 4d3b0015ef..5287645591 100644 --- a/common/transport/coap/src/main/java/org/thingsboard/server/transport/coap/adaptors/ProtoCoapAdaptor.java +++ b/common/transport/coap/src/main/java/org/thingsboard/server/transport/coap/adaptors/ProtoCoapAdaptor.java @@ -28,9 +28,9 @@ import org.eclipse.californium.core.coap.Response; import org.springframework.stereotype.Component; import org.thingsboard.server.common.data.StringUtils; import org.thingsboard.server.common.data.id.DeviceId; -import org.thingsboard.server.common.transport.adaptor.AdaptorException; -import org.thingsboard.server.common.transport.adaptor.JsonConverter; -import org.thingsboard.server.common.transport.adaptor.ProtoConverter; +import org.thingsboard.server.common.adaptor.AdaptorException; +import org.thingsboard.server.common.adaptor.JsonConverter; +import org.thingsboard.server.common.adaptor.ProtoConverter; import org.thingsboard.server.gen.transport.TransportProtos; import org.thingsboard.server.transport.coap.CoapTransportResource; diff --git a/common/transport/coap/src/main/java/org/thingsboard/server/transport/coap/callback/GetAttributesSyncSessionCallback.java b/common/transport/coap/src/main/java/org/thingsboard/server/transport/coap/callback/GetAttributesSyncSessionCallback.java index 907e49f600..caacd31012 100644 --- a/common/transport/coap/src/main/java/org/thingsboard/server/transport/coap/callback/GetAttributesSyncSessionCallback.java +++ b/common/transport/coap/src/main/java/org/thingsboard/server/transport/coap/callback/GetAttributesSyncSessionCallback.java @@ -20,7 +20,7 @@ import org.eclipse.californium.core.coap.CoAP; import org.eclipse.californium.core.coap.Request; import org.eclipse.californium.core.coap.Response; import org.eclipse.californium.core.server.resources.CoapExchange; -import org.thingsboard.server.common.transport.adaptor.AdaptorException; +import org.thingsboard.server.common.adaptor.AdaptorException; import org.thingsboard.server.gen.transport.TransportProtos; import org.thingsboard.server.transport.coap.client.TbCoapClientState; diff --git a/common/transport/coap/src/main/java/org/thingsboard/server/transport/coap/callback/ToServerRpcSyncSessionCallback.java b/common/transport/coap/src/main/java/org/thingsboard/server/transport/coap/callback/ToServerRpcSyncSessionCallback.java index 2209ea6613..fc35da0bd6 100644 --- a/common/transport/coap/src/main/java/org/thingsboard/server/transport/coap/callback/ToServerRpcSyncSessionCallback.java +++ b/common/transport/coap/src/main/java/org/thingsboard/server/transport/coap/callback/ToServerRpcSyncSessionCallback.java @@ -19,7 +19,7 @@ import lombok.extern.slf4j.Slf4j; import org.eclipse.californium.core.coap.CoAP; import org.eclipse.californium.core.coap.Request; import org.eclipse.californium.core.server.resources.CoapExchange; -import org.thingsboard.server.common.transport.adaptor.AdaptorException; +import org.thingsboard.server.common.adaptor.AdaptorException; import org.thingsboard.server.gen.transport.TransportProtos; import org.thingsboard.server.transport.coap.client.TbCoapClientState; diff --git a/common/transport/coap/src/main/java/org/thingsboard/server/transport/coap/client/CoapClientContext.java b/common/transport/coap/src/main/java/org/thingsboard/server/transport/coap/client/CoapClientContext.java index a8531a7fba..a98a04ce28 100644 --- a/common/transport/coap/src/main/java/org/thingsboard/server/transport/coap/client/CoapClientContext.java +++ b/common/transport/coap/src/main/java/org/thingsboard/server/transport/coap/client/CoapClientContext.java @@ -19,7 +19,7 @@ import org.eclipse.californium.core.observe.ObserveRelation; import org.eclipse.californium.core.server.resources.CoapExchange; import org.thingsboard.server.common.data.DeviceProfile; import org.thingsboard.server.transport.coap.CoapSessionMsgType; -import org.thingsboard.server.common.transport.adaptor.AdaptorException; +import org.thingsboard.server.common.adaptor.AdaptorException; import org.thingsboard.server.common.transport.auth.ValidateDeviceCredentialsResponse; import org.thingsboard.server.gen.transport.TransportProtos; diff --git a/common/transport/coap/src/main/java/org/thingsboard/server/transport/coap/client/DefaultCoapClientContext.java b/common/transport/coap/src/main/java/org/thingsboard/server/transport/coap/client/DefaultCoapClientContext.java index 26effe7058..f4346946a7 100644 --- a/common/transport/coap/src/main/java/org/thingsboard/server/transport/coap/client/DefaultCoapClientContext.java +++ b/common/transport/coap/src/main/java/org/thingsboard/server/transport/coap/client/DefaultCoapClientContext.java @@ -53,7 +53,7 @@ import org.thingsboard.server.common.transport.SessionMsgListener; import org.thingsboard.server.common.transport.TransportDeviceProfileCache; import org.thingsboard.server.common.transport.TransportService; import org.thingsboard.server.common.transport.TransportServiceCallback; -import org.thingsboard.server.common.transport.adaptor.AdaptorException; +import org.thingsboard.server.common.adaptor.AdaptorException; import org.thingsboard.server.common.transport.auth.SessionInfoCreator; import org.thingsboard.server.common.transport.auth.ValidateDeviceCredentialsResponse; import org.thingsboard.server.gen.transport.TransportProtos; diff --git a/common/transport/coap/src/main/java/org/thingsboard/server/transport/coap/efento/CoapEfentoTransportResource.java b/common/transport/coap/src/main/java/org/thingsboard/server/transport/coap/efento/CoapEfentoTransportResource.java index 1c0a599082..b3304c6a97 100644 --- a/common/transport/coap/src/main/java/org/thingsboard/server/transport/coap/efento/CoapEfentoTransportResource.java +++ b/common/transport/coap/src/main/java/org/thingsboard/server/transport/coap/efento/CoapEfentoTransportResource.java @@ -33,8 +33,8 @@ import org.thingsboard.server.common.data.DeviceTransportType; import org.thingsboard.server.common.data.device.profile.CoapDeviceProfileTransportConfiguration; import org.thingsboard.server.common.data.device.profile.DeviceProfileTransportConfiguration; import org.thingsboard.server.common.data.device.profile.EfentoCoapDeviceTypeConfiguration; -import org.thingsboard.server.common.transport.adaptor.AdaptorException; -import org.thingsboard.server.common.transport.adaptor.ProtoConverter; +import org.thingsboard.server.common.adaptor.AdaptorException; +import org.thingsboard.server.common.adaptor.ProtoConverter; import org.thingsboard.server.common.transport.auth.SessionInfoCreator; import org.thingsboard.server.gen.transport.TransportProtos; import org.thingsboard.server.gen.transport.coap.ConfigProtos; @@ -56,7 +56,6 @@ import java.util.Map; import java.util.TreeMap; import java.util.UUID; import java.util.concurrent.TimeUnit; -import java.util.function.Consumer; import static com.google.gson.JsonParser.parseString; import static org.thingsboard.server.transport.coap.CoapTransportService.CONFIGURATION; diff --git a/common/transport/coap/src/main/java/org/thingsboard/server/transport/coap/efento/adaptor/EfentoCoapAdaptor.java b/common/transport/coap/src/main/java/org/thingsboard/server/transport/coap/efento/adaptor/EfentoCoapAdaptor.java index ce75608a7d..fa6394d41b 100644 --- a/common/transport/coap/src/main/java/org/thingsboard/server/transport/coap/efento/adaptor/EfentoCoapAdaptor.java +++ b/common/transport/coap/src/main/java/org/thingsboard/server/transport/coap/efento/adaptor/EfentoCoapAdaptor.java @@ -19,8 +19,8 @@ import com.google.gson.Gson; import com.google.gson.JsonElement; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Component; -import org.thingsboard.server.common.transport.adaptor.AdaptorException; -import org.thingsboard.server.common.transport.adaptor.JsonConverter; +import org.thingsboard.server.common.adaptor.AdaptorException; +import org.thingsboard.server.common.adaptor.JsonConverter; import org.thingsboard.server.gen.transport.TransportProtos; import org.thingsboard.server.transport.coap.efento.CoapEfentoTransportResource; diff --git a/common/transport/http/src/main/java/org/thingsboard/server/transport/http/DeviceApiController.java b/common/transport/http/src/main/java/org/thingsboard/server/transport/http/DeviceApiController.java index 041ae0510d..589e93d803 100644 --- a/common/transport/http/src/main/java/org/thingsboard/server/transport/http/DeviceApiController.java +++ b/common/transport/http/src/main/java/org/thingsboard/server/transport/http/DeviceApiController.java @@ -46,7 +46,7 @@ import org.thingsboard.server.common.transport.SessionMsgListener; import org.thingsboard.server.common.transport.TransportContext; import org.thingsboard.server.common.transport.TransportService; import org.thingsboard.server.common.transport.TransportServiceCallback; -import org.thingsboard.server.common.transport.adaptor.JsonConverter; +import org.thingsboard.server.common.adaptor.JsonConverter; import org.thingsboard.server.common.transport.auth.SessionInfoCreator; import org.thingsboard.server.common.transport.auth.ValidateDeviceCredentialsResponse; import org.thingsboard.server.gen.transport.TransportProtos; diff --git a/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/adaptors/LwM2MJsonAdaptor.java b/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/adaptors/LwM2MJsonAdaptor.java index feb74b47f9..afe1d160ce 100644 --- a/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/adaptors/LwM2MJsonAdaptor.java +++ b/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/adaptors/LwM2MJsonAdaptor.java @@ -19,8 +19,8 @@ import com.google.gson.JsonElement; import com.google.gson.JsonSyntaxException; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Component; -import org.thingsboard.server.common.transport.adaptor.AdaptorException; -import org.thingsboard.server.common.transport.adaptor.JsonConverter; +import org.thingsboard.server.common.adaptor.AdaptorException; +import org.thingsboard.server.common.adaptor.JsonConverter; import org.thingsboard.server.gen.transport.TransportProtos; import org.thingsboard.server.queue.util.TbLwM2mTransportComponent; diff --git a/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/adaptors/LwM2MTransportAdaptor.java b/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/adaptors/LwM2MTransportAdaptor.java index 96e6008c63..fe24b21053 100644 --- a/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/adaptors/LwM2MTransportAdaptor.java +++ b/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/adaptors/LwM2MTransportAdaptor.java @@ -16,7 +16,7 @@ package org.thingsboard.server.transport.lwm2m.server.adaptors; import com.google.gson.JsonElement; -import org.thingsboard.server.common.transport.adaptor.AdaptorException; +import org.thingsboard.server.common.adaptor.AdaptorException; import org.thingsboard.server.gen.transport.TransportProtos; import java.util.Collection; diff --git a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java index 3bafbb518e..87f6a47150 100644 --- a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java +++ b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java @@ -64,7 +64,7 @@ import org.thingsboard.server.common.msg.tools.TbRateLimitsException; import org.thingsboard.server.common.transport.SessionMsgListener; import org.thingsboard.server.common.transport.TransportService; import org.thingsboard.server.common.transport.TransportServiceCallback; -import org.thingsboard.server.common.transport.adaptor.AdaptorException; +import org.thingsboard.server.common.adaptor.AdaptorException; import org.thingsboard.server.common.transport.auth.SessionInfoCreator; import org.thingsboard.server.common.transport.auth.TransportDeviceInfo; import org.thingsboard.server.common.transport.auth.ValidateDeviceCredentialsResponse; diff --git a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/adaptors/BackwardCompatibilityAdaptor.java b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/adaptors/BackwardCompatibilityAdaptor.java index 40be642a1a..73bba60cff 100644 --- a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/adaptors/BackwardCompatibilityAdaptor.java +++ b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/adaptors/BackwardCompatibilityAdaptor.java @@ -21,7 +21,7 @@ import lombok.AllArgsConstructor; import lombok.Data; import lombok.extern.slf4j.Slf4j; import org.thingsboard.server.common.data.ota.OtaPackageType; -import org.thingsboard.server.common.transport.adaptor.AdaptorException; +import org.thingsboard.server.common.adaptor.AdaptorException; import org.thingsboard.server.gen.transport.TransportProtos; import org.thingsboard.server.transport.mqtt.session.MqttDeviceAwareSessionContext; diff --git a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/adaptors/JsonMqttAdaptor.java b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/adaptors/JsonMqttAdaptor.java index 513fd97cf6..a30749b17d 100644 --- a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/adaptors/JsonMqttAdaptor.java +++ b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/adaptors/JsonMqttAdaptor.java @@ -30,8 +30,8 @@ import org.springframework.stereotype.Component; import org.thingsboard.server.common.data.StringUtils; import org.thingsboard.server.common.data.device.profile.MqttTopics; import org.thingsboard.server.common.data.ota.OtaPackageType; -import org.thingsboard.server.common.transport.adaptor.AdaptorException; -import org.thingsboard.server.common.transport.adaptor.JsonConverter; +import org.thingsboard.server.common.adaptor.AdaptorException; +import org.thingsboard.server.common.adaptor.JsonConverter; import org.thingsboard.server.gen.transport.TransportProtos; import org.thingsboard.server.transport.mqtt.session.MqttDeviceAwareSessionContext; diff --git a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/adaptors/MqttTransportAdaptor.java b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/adaptors/MqttTransportAdaptor.java index 3764c2a9a9..10599d0be7 100644 --- a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/adaptors/MqttTransportAdaptor.java +++ b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/adaptors/MqttTransportAdaptor.java @@ -24,7 +24,7 @@ import io.netty.handler.codec.mqtt.MqttMessageType; import io.netty.handler.codec.mqtt.MqttPublishMessage; import io.netty.handler.codec.mqtt.MqttPublishVariableHeader; import org.thingsboard.server.common.data.ota.OtaPackageType; -import org.thingsboard.server.common.transport.adaptor.AdaptorException; +import org.thingsboard.server.common.adaptor.AdaptorException; import org.thingsboard.server.gen.transport.TransportProtos.AttributeUpdateNotificationMsg; import org.thingsboard.server.gen.transport.TransportProtos.ClaimDeviceMsg; import org.thingsboard.server.gen.transport.TransportProtos.GetAttributeRequestMsg; diff --git a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/adaptors/ProtoMqttAdaptor.java b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/adaptors/ProtoMqttAdaptor.java index 39176fa212..3ff4faf8e8 100644 --- a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/adaptors/ProtoMqttAdaptor.java +++ b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/adaptors/ProtoMqttAdaptor.java @@ -25,12 +25,12 @@ import io.netty.handler.codec.mqtt.MqttMessage; import io.netty.handler.codec.mqtt.MqttPublishMessage; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Component; +import org.thingsboard.server.common.adaptor.AdaptorException; +import org.thingsboard.server.common.adaptor.JsonConverter; +import org.thingsboard.server.common.adaptor.ProtoConverter; import org.thingsboard.server.common.data.StringUtils; import org.thingsboard.server.common.data.device.profile.MqttTopics; import org.thingsboard.server.common.data.ota.OtaPackageType; -import org.thingsboard.server.common.transport.adaptor.AdaptorException; -import org.thingsboard.server.common.transport.adaptor.JsonConverter; -import org.thingsboard.server.common.transport.adaptor.ProtoConverter; import org.thingsboard.server.gen.transport.TransportApiProtos; import org.thingsboard.server.gen.transport.TransportProtos; import org.thingsboard.server.transport.mqtt.session.DeviceSessionCtx; diff --git a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/AbstractGatewaySessionHandler.java b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/AbstractGatewaySessionHandler.java index 8f964f8369..64aa448931 100644 --- a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/AbstractGatewaySessionHandler.java +++ b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/AbstractGatewaySessionHandler.java @@ -40,9 +40,9 @@ import org.thingsboard.server.common.data.StringUtils; import org.thingsboard.server.common.data.id.DeviceId; import org.thingsboard.server.common.transport.TransportService; import org.thingsboard.server.common.transport.TransportServiceCallback; -import org.thingsboard.server.common.transport.adaptor.AdaptorException; -import org.thingsboard.server.common.transport.adaptor.JsonConverter; -import org.thingsboard.server.common.transport.adaptor.ProtoConverter; +import org.thingsboard.server.common.adaptor.AdaptorException; +import org.thingsboard.server.common.adaptor.JsonConverter; +import org.thingsboard.server.common.adaptor.ProtoConverter; import org.thingsboard.server.common.transport.auth.GetOrCreateDeviceFromGatewayResponse; import org.thingsboard.server.common.transport.auth.TransportDeviceInfo; import org.thingsboard.server.gen.transport.TransportApiProtos; diff --git a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewaySessionHandler.java b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewaySessionHandler.java index de0f2a502b..6ff505cb36 100644 --- a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewaySessionHandler.java +++ b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewaySessionHandler.java @@ -17,7 +17,7 @@ package org.thingsboard.server.transport.mqtt.session; import io.netty.buffer.ByteBuf; import io.netty.handler.codec.mqtt.MqttPublishMessage; -import org.thingsboard.server.common.transport.adaptor.AdaptorException; +import org.thingsboard.server.common.adaptor.AdaptorException; import org.thingsboard.server.common.transport.auth.GetOrCreateDeviceFromGatewayResponse; import java.util.UUID; diff --git a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/SparkplugNodeSessionHandler.java b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/SparkplugNodeSessionHandler.java index 48998d58cd..f3aaa2708f 100644 --- a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/SparkplugNodeSessionHandler.java +++ b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/SparkplugNodeSessionHandler.java @@ -32,9 +32,9 @@ import org.springframework.util.CollectionUtils; import org.thingsboard.server.common.data.device.profile.MqttDeviceProfileTransportConfiguration; import org.thingsboard.server.common.data.exception.ThingsboardErrorCode; import org.thingsboard.server.common.data.exception.ThingsboardException; -import org.thingsboard.server.common.transport.adaptor.AdaptorException; -import org.thingsboard.server.common.transport.adaptor.JsonConverter; -import org.thingsboard.server.common.transport.adaptor.ProtoConverter; +import org.thingsboard.server.common.adaptor.AdaptorException; +import org.thingsboard.server.common.adaptor.JsonConverter; +import org.thingsboard.server.common.adaptor.ProtoConverter; import org.thingsboard.server.common.transport.auth.GetOrCreateDeviceFromGatewayResponse; import org.thingsboard.server.gen.transport.TransportApiProtos; import org.thingsboard.server.gen.transport.TransportProtos; diff --git a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/util/sparkplug/MetricDataType.java b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/util/sparkplug/MetricDataType.java index d5e781a402..c49286a4b1 100644 --- a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/util/sparkplug/MetricDataType.java +++ b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/util/sparkplug/MetricDataType.java @@ -16,7 +16,7 @@ package org.thingsboard.server.transport.mqtt.util.sparkplug; import lombok.extern.slf4j.Slf4j; -import org.thingsboard.server.common.transport.adaptor.AdaptorException; +import org.thingsboard.server.common.adaptor.AdaptorException; import org.thingsboard.server.gen.transport.mqtt.SparkplugBProto; import java.math.BigInteger; diff --git a/common/transport/snmp/src/main/java/org/thingsboard/server/transport/snmp/service/SnmpTransportService.java b/common/transport/snmp/src/main/java/org/thingsboard/server/transport/snmp/service/SnmpTransportService.java index 0b60db68b1..014fcf8d67 100644 --- a/common/transport/snmp/src/main/java/org/thingsboard/server/transport/snmp/service/SnmpTransportService.java +++ b/common/transport/snmp/src/main/java/org/thingsboard/server/transport/snmp/service/SnmpTransportService.java @@ -53,7 +53,7 @@ import org.thingsboard.server.common.data.transport.snmp.SnmpMethod; import org.thingsboard.server.common.data.transport.snmp.config.RepeatingQueryingSnmpCommunicationConfig; import org.thingsboard.server.common.data.transport.snmp.config.SnmpCommunicationConfig; import org.thingsboard.server.common.transport.TransportService; -import org.thingsboard.server.common.transport.adaptor.JsonConverter; +import org.thingsboard.server.common.adaptor.JsonConverter; import org.thingsboard.server.gen.transport.TransportProtos; import org.thingsboard.server.queue.util.TbSnmpTransportComponent; import org.thingsboard.server.transport.snmp.SnmpTransportContext; diff --git a/common/util/src/main/java/org/thingsboard/common/util/ExecutorProvider.java b/common/util/src/main/java/org/thingsboard/common/util/ExecutorProvider.java new file mode 100644 index 0000000000..61379c8c89 --- /dev/null +++ b/common/util/src/main/java/org/thingsboard/common/util/ExecutorProvider.java @@ -0,0 +1,23 @@ +/** + * Copyright © 2016-2023 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.common.util; + +import java.util.concurrent.ScheduledExecutorService; + +public interface ExecutorProvider { + + ScheduledExecutorService getExecutor(); +} diff --git a/docker/docker-compose.hybrid.yml b/docker/docker-compose.hybrid.yml index 29cd65a127..7f952fe6eb 100644 --- a/docker/docker-compose.hybrid.yml +++ b/docker/docker-compose.hybrid.yml @@ -19,7 +19,7 @@ version: '3.0' services: postgres: restart: always - image: "postgres:12" + image: "postgres:15" ports: - "5432" environment: diff --git a/docker/docker-compose.postgres.yml b/docker/docker-compose.postgres.yml index 7f5f69a61f..218642317b 100644 --- a/docker/docker-compose.postgres.yml +++ b/docker/docker-compose.postgres.yml @@ -19,7 +19,7 @@ version: '3.0' services: postgres: restart: always - image: "postgres:12" + image: "postgres:15" ports: - "5432" environment: diff --git a/msa/tb/docker-cassandra/Dockerfile b/msa/tb/docker-cassandra/Dockerfile index 2f23aec399..0a0c36ed88 100644 --- a/msa/tb/docker-cassandra/Dockerfile +++ b/msa/tb/docker-cassandra/Dockerfile @@ -16,7 +16,7 @@ FROM thingsboard/openjdk11:bullseye-slim -ENV PG_MAJOR=12 +ENV PG_MAJOR=15 ENV DATA_FOLDER=/data diff --git a/msa/vc-executor/src/main/resources/tb-vc-executor.yml b/msa/vc-executor/src/main/resources/tb-vc-executor.yml index 4ffa1ab702..8c363b019c 100644 --- a/msa/vc-executor/src/main/resources/tb-vc-executor.yml +++ b/msa/vc-executor/src/main/resources/tb-vc-executor.yml @@ -172,6 +172,8 @@ queue: max_msg_size: "${TB_QUEUE_PUBSUB_MAX_MSG_SIZE:1048576}" #in bytes # Number of messages per consumer max_messages: "${TB_QUEUE_PUBSUB_MAX_MESSAGES:1000}" + # Thread pool size for pubsub queue executor provider. If not set - default pubsub executor provider value will be used (5 * number of available processors) + executor_thread_pool_size: "${TB_QUEUE_PUBSUB_EXECUTOR_THREAD_POOL_SIZE:0}" queue-properties: # Pub/Sub properties for Core subscribers, messages which will commit after ackDeadlineInSec period can be consumed again core: "${TB_QUEUE_PUBSUB_CORE_QUEUE_PROPERTIES:ackDeadlineInSec:30;messageRetentionInSec:604800}" diff --git a/pom.xml b/pom.xml index 2dca3c54f7..ed62452870 100755 --- a/pom.xml +++ b/pom.xml @@ -895,6 +895,11 @@ version-control ${project.version} + + org.thingsboard.common + proto + ${project.version} + org.thingsboard.common cache diff --git a/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbContext.java b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbContext.java index e645110a9d..40ed2c378a 100644 --- a/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbContext.java +++ b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbContext.java @@ -16,6 +16,7 @@ package org.thingsboard.rule.engine.api; import io.netty.channel.EventLoopGroup; +import org.thingsboard.common.util.ExecutorProvider; import org.thingsboard.common.util.ListeningExecutor; import org.thingsboard.rule.engine.api.slack.SlackService; import org.thingsboard.rule.engine.api.sms.SmsSenderFactory; @@ -318,6 +319,8 @@ public interface TbContext { ListeningExecutor getNotificationExecutor(); + ExecutorProvider getPubSubRuleNodeExecutorProvider(); + MailService getMailService(boolean isSystem); SmsService getSmsService(); diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/action/TbCopyAttributesToEntityViewNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/action/TbCopyAttributesToEntityViewNode.java index c103dd4006..af4c7bc0c2 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/action/TbCopyAttributesToEntityViewNode.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/action/TbCopyAttributesToEntityViewNode.java @@ -36,7 +36,7 @@ import org.thingsboard.server.common.data.objects.AttributesEntityView; import org.thingsboard.server.common.data.plugin.ComponentType; import org.thingsboard.server.common.data.util.CollectionsUtil; import org.thingsboard.server.common.msg.TbMsg; -import org.thingsboard.server.common.transport.adaptor.JsonConverter; +import org.thingsboard.server.common.adaptor.JsonConverter; import javax.annotation.Nullable; import java.util.ArrayList; diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/gcp/pubsub/TbPubSubNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/gcp/pubsub/TbPubSubNode.java index c55113783c..3067947445 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/gcp/pubsub/TbPubSubNode.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/gcp/pubsub/TbPubSubNode.java @@ -20,6 +20,7 @@ import com.google.api.core.ApiFutureCallback; import com.google.api.core.ApiFutures; import com.google.api.gax.core.CredentialsProvider; import com.google.api.gax.core.FixedCredentialsProvider; +import com.google.api.gax.core.FixedExecutorProvider; import com.google.api.gax.retrying.RetrySettings; import com.google.auth.oauth2.ServiceAccountCredentials; import com.google.cloud.pubsub.v1.Publisher; @@ -68,7 +69,7 @@ public class TbPubSubNode extends TbAbstractExternalNode { super.init(ctx); this.config = TbNodeUtils.convert(configuration, TbPubSubNodeConfiguration.class); try { - this.pubSubClient = initPubSubClient(); + this.pubSubClient = initPubSubClient(ctx); } catch (Exception e) { throw new TbNodeException(e); } @@ -128,7 +129,7 @@ public class TbPubSubNode extends TbAbstractExternalNode { return TbMsg.transformMsgMetadata(origMsg, metaData); } - private Publisher initPubSubClient() throws IOException { + private Publisher initPubSubClient(TbContext ctx) throws IOException { ProjectTopicName topicName = ProjectTopicName.of(config.getProjectId(), config.getTopicName()); ServiceAccountCredentials credentials = ServiceAccountCredentials.fromStream( @@ -148,6 +149,7 @@ public class TbPubSubNode extends TbAbstractExternalNode { return Publisher.newBuilder(topicName) .setCredentialsProvider(credProvider) .setRetrySettings(retrySettings) + .setExecutorProvider(FixedExecutorProvider.create(ctx.getPubSubRuleNodeExecutorProvider().getExecutor())) .build(); } } diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/profile/AlarmRuleState.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/profile/AlarmRuleState.java index f31896a8c9..6bb37438a1 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/profile/AlarmRuleState.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/profile/AlarmRuleState.java @@ -41,7 +41,7 @@ import org.thingsboard.server.common.data.query.KeyFilterPredicate; import org.thingsboard.server.common.data.query.NumericFilterPredicate; import org.thingsboard.server.common.data.query.StringFilterPredicate; import org.thingsboard.server.common.msg.tools.SchedulerUtils; -import org.thingsboard.server.common.transport.adaptor.JsonConverter; +import org.thingsboard.server.common.adaptor.JsonConverter; import java.time.Instant; import java.time.ZoneId; diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/profile/DeviceState.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/profile/DeviceState.java index 2aeb4eedf5..b38fd29af7 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/profile/DeviceState.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/profile/DeviceState.java @@ -40,7 +40,7 @@ import org.thingsboard.server.common.data.query.EntityKey; import org.thingsboard.server.common.data.query.EntityKeyType; import org.thingsboard.server.common.data.rule.RuleNodeState; import org.thingsboard.server.common.msg.TbMsg; -import org.thingsboard.server.common.transport.adaptor.JsonConverter; +import org.thingsboard.server.common.adaptor.JsonConverter; import org.thingsboard.server.dao.sql.query.EntityKeyMapping; import java.util.ArrayList; diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/TbMsgAttributesNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/TbMsgAttributesNode.java index 9bbee45477..06498241fb 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/TbMsgAttributesNode.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/TbMsgAttributesNode.java @@ -34,7 +34,7 @@ import org.thingsboard.server.common.data.kv.KvEntry; import org.thingsboard.server.common.data.plugin.ComponentType; import org.thingsboard.server.common.data.util.TbPair; import org.thingsboard.server.common.msg.TbMsg; -import org.thingsboard.server.common.transport.adaptor.JsonConverter; +import org.thingsboard.server.common.adaptor.JsonConverter; import java.util.ArrayList; import java.util.List; diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/TbMsgTimeseriesNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/TbMsgTimeseriesNode.java index 799089d4a8..8762839c07 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/TbMsgTimeseriesNode.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/TbMsgTimeseriesNode.java @@ -31,7 +31,7 @@ import org.thingsboard.server.common.data.kv.TsKvEntry; import org.thingsboard.server.common.data.plugin.ComponentType; import org.thingsboard.server.common.data.tenant.profile.DefaultTenantProfileConfiguration; import org.thingsboard.server.common.msg.TbMsg; -import org.thingsboard.server.common.transport.adaptor.JsonConverter; +import org.thingsboard.server.common.adaptor.JsonConverter; import java.util.ArrayList; import java.util.List; diff --git a/transport/coap/src/main/resources/tb-coap-transport.yml b/transport/coap/src/main/resources/tb-coap-transport.yml index 959b3285de..a45b41803e 100644 --- a/transport/coap/src/main/resources/tb-coap-transport.yml +++ b/transport/coap/src/main/resources/tb-coap-transport.yml @@ -299,6 +299,8 @@ queue: max_msg_size: "${TB_QUEUE_PUBSUB_MAX_MSG_SIZE:1048576}" # Number of messages per consumer max_messages: "${TB_QUEUE_PUBSUB_MAX_MESSAGES:1000}" + # Thread pool size for pubsub queue executor provider. If not set - default pubsub executor provider value will be used (5 * number of available processors) + executor_thread_pool_size: "${TB_QUEUE_PUBSUB_EXECUTOR_THREAD_POOL_SIZE:0}" queue-properties: # Pub/Sub properties for Rule Engine subscribers, messages which will commit after ackDeadlineInSec period can be consumed again rule-engine: "${TB_QUEUE_PUBSUB_RE_QUEUE_PROPERTIES:ackDeadlineInSec:30;messageRetentionInSec:604800}" diff --git a/transport/http/src/main/resources/tb-http-transport.yml b/transport/http/src/main/resources/tb-http-transport.yml index 7edaa640a1..81fdf414bb 100644 --- a/transport/http/src/main/resources/tb-http-transport.yml +++ b/transport/http/src/main/resources/tb-http-transport.yml @@ -282,6 +282,8 @@ queue: max_msg_size: "${TB_QUEUE_PUBSUB_MAX_MSG_SIZE:1048576}" # Number of messages per a consumer max_messages: "${TB_QUEUE_PUBSUB_MAX_MESSAGES:1000}" + # Thread pool size for pubsub queue executor provider. If not set - default pubsub executor provider value will be used (5 * number of available processors) + executor_thread_pool_size: "${TB_QUEUE_PUBSUB_EXECUTOR_THREAD_POOL_SIZE:0}" queue-properties: # Pub/Sub properties for Rule Engine subscribers, messages which will commit after ackDeadlineInSec period can be consume again rule-engine: "${TB_QUEUE_PUBSUB_RE_QUEUE_PROPERTIES:ackDeadlineInSec:30;messageRetentionInSec:604800}" diff --git a/transport/lwm2m/src/main/resources/tb-lwm2m-transport.yml b/transport/lwm2m/src/main/resources/tb-lwm2m-transport.yml index cebc0425d5..f68f158dc1 100644 --- a/transport/lwm2m/src/main/resources/tb-lwm2m-transport.yml +++ b/transport/lwm2m/src/main/resources/tb-lwm2m-transport.yml @@ -378,6 +378,8 @@ queue: max_msg_size: "${TB_QUEUE_PUBSUB_MAX_MSG_SIZE:1048576}" # Number of messages per consumer max_messages: "${TB_QUEUE_PUBSUB_MAX_MESSAGES:1000}" + # Thread pool size for pubsub queue executor provider. If not set - default pubsub executor provider value will be used (5 * number of available processors) + executor_thread_pool_size: "${TB_QUEUE_PUBSUB_EXECUTOR_THREAD_POOL_SIZE:0}" queue-properties: # Pub/Sub properties for Rule Engine subscribers, messages which will commit after ackDeadlineInSec period can be consumed again rule-engine: "${TB_QUEUE_PUBSUB_RE_QUEUE_PROPERTIES:ackDeadlineInSec:30;messageRetentionInSec:604800}" diff --git a/transport/mqtt/src/main/resources/tb-mqtt-transport.yml b/transport/mqtt/src/main/resources/tb-mqtt-transport.yml index 330473a624..16c2c019df 100644 --- a/transport/mqtt/src/main/resources/tb-mqtt-transport.yml +++ b/transport/mqtt/src/main/resources/tb-mqtt-transport.yml @@ -315,6 +315,8 @@ queue: max_msg_size: "${TB_QUEUE_PUBSUB_MAX_MSG_SIZE:1048576}" # Number of messages per consumer max_messages: "${TB_QUEUE_PUBSUB_MAX_MESSAGES:1000}" + # Thread pool size for pubsub queue executor provider. If not set - default pubsub executor provider value will be used (5 * number of available processors) + executor_thread_pool_size: "${TB_QUEUE_PUBSUB_EXECUTOR_THREAD_POOL_SIZE:0}" queue-properties: # Pub/Sub properties for Rule Engine subscribers, messages which will commit after ackDeadlineInSec period can be consumed again rule-engine: "${TB_QUEUE_PUBSUB_RE_QUEUE_PROPERTIES:ackDeadlineInSec:30;messageRetentionInSec:604800}" diff --git a/transport/snmp/src/main/resources/tb-snmp-transport.yml b/transport/snmp/src/main/resources/tb-snmp-transport.yml index ca13212293..2b6cc17d9b 100644 --- a/transport/snmp/src/main/resources/tb-snmp-transport.yml +++ b/transport/snmp/src/main/resources/tb-snmp-transport.yml @@ -268,6 +268,8 @@ queue: max_msg_size: "${TB_QUEUE_PUBSUB_MAX_MSG_SIZE:1048576}" # Number of messages per consumer max_messages: "${TB_QUEUE_PUBSUB_MAX_MESSAGES:1000}" + # Thread pool size for pubsub queue executor provider. If not set - default pubsub executor provider value will be used (5 * number of available processors) + executor_thread_pool_size: "${TB_QUEUE_PUBSUB_EXECUTOR_THREAD_POOL_SIZE:0}" queue-properties: # Pub/Sub properties for Rule Engine subscribers, messages which will commit after ackDeadlineInSec period can be consumed again rule-engine: "${TB_QUEUE_PUBSUB_RE_QUEUE_PROPERTIES:ackDeadlineInSec:30;messageRetentionInSec:604800}"