From 52814d2bfcb7e11feb44ccf26a81fc96bd337637 Mon Sep 17 00:00:00 2001 From: Andrii Shvaika Date: Thu, 12 Mar 2020 09:36:48 +0200 Subject: [PATCH] Refactoring of Transport Communication to Queues --- ...e.java => DefaultTransportApiService.java} | 2 +- .../transport/RemoteTransportApiService.java | 57 ++++------- .../transport/TransportApiService.java | 5 +- .../src/main/resources/thingsboard.yml | 1 + common/queue/pom.xml | 9 ++ .../provider/KafkaTbCoreQueueProvider.java | 42 ++++++++ .../provider/KafkaTransportQueueProvider.java | 39 ++++++++ .../server/provider/TbCoreQueueProvider.java | 63 ++++++++++++ .../provider}/TransportQueueProvider.java | 5 +- .../src/main/proto/transport.proto | 72 +++++++++----- common/transport/transport-api/pom.xml | 13 --- .../queue/KafkaTransportQueueProvider.java | 35 ------- .../service/DefaultTransportService.java | 98 +++++++------------ 13 files changed, 260 insertions(+), 181 deletions(-) rename application/src/main/java/org/thingsboard/server/service/transport/{LocalTransportApiService.java => DefaultTransportApiService.java} (99%) create mode 100644 common/queue/src/main/java/org/thingsboard/server/provider/KafkaTbCoreQueueProvider.java create mode 100644 common/queue/src/main/java/org/thingsboard/server/provider/KafkaTransportQueueProvider.java create mode 100644 common/queue/src/main/java/org/thingsboard/server/provider/TbCoreQueueProvider.java rename common/{transport/transport-api/src/main/java/org/thingsboard/server/common/transport/queue => queue/src/main/java/org/thingsboard/server/provider}/TransportQueueProvider.java (82%) rename common/{transport/transport-api => queue}/src/main/proto/transport.proto (75%) delete mode 100644 common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/queue/KafkaTransportQueueProvider.java diff --git a/application/src/main/java/org/thingsboard/server/service/transport/LocalTransportApiService.java b/application/src/main/java/org/thingsboard/server/service/transport/DefaultTransportApiService.java similarity index 99% rename from application/src/main/java/org/thingsboard/server/service/transport/LocalTransportApiService.java rename to application/src/main/java/org/thingsboard/server/service/transport/DefaultTransportApiService.java index f6434036fd..51597877b4 100644 --- a/application/src/main/java/org/thingsboard/server/service/transport/LocalTransportApiService.java +++ b/application/src/main/java/org/thingsboard/server/service/transport/DefaultTransportApiService.java @@ -52,7 +52,7 @@ import java.util.concurrent.locks.ReentrantLock; */ @Slf4j @Service -public class LocalTransportApiService implements TransportApiService { +public class DefaultTransportApiService implements TransportApiService { private static final ObjectMapper mapper = new ObjectMapper(); diff --git a/application/src/main/java/org/thingsboard/server/service/transport/RemoteTransportApiService.java b/application/src/main/java/org/thingsboard/server/service/transport/RemoteTransportApiService.java index e354da7daa..644d4191b1 100644 --- a/application/src/main/java/org/thingsboard/server/service/transport/RemoteTransportApiService.java +++ b/application/src/main/java/org/thingsboard/server/service/transport/RemoteTransportApiService.java @@ -5,7 +5,7 @@ * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -32,6 +32,7 @@ import org.thingsboard.server.common.TbProtoQueueMsg; import org.thingsboard.server.gen.transport.TransportProtos.TransportApiRequestMsg; import org.thingsboard.server.gen.transport.TransportProtos.TransportApiResponseMsg; import org.thingsboard.server.kafka.TbNodeIdProvider; +import org.thingsboard.server.provider.TbCoreQueueProvider; import javax.annotation.PostConstruct; import javax.annotation.PreDestroy; @@ -45,50 +46,32 @@ import java.util.concurrent.*; @ConditionalOnProperty(prefix = "transport", value = "type", havingValue = "remote") public class RemoteTransportApiService { - @Value("${transport.remote.transport_api.requests_topic}") - private String transportApiRequestsTopic; - @Value("${transport.remote.transport_api.max_pending_requests}") + private final TbCoreQueueProvider tbCoreQueueProvider; + private final TransportApiService transportApiService; + + @Value("${queue.transport_api.max_pending_requests:10000}") private int maxPendingRequests; - @Value("${transport.remote.transport_api.request_timeout}") + @Value("${queue.transport_api.max_requests_timeout:10000}") private long requestTimeout; - @Value("${transport.remote.transport_api.request_poll_interval}") + @Value("${queue.transport_api.request_poll_interval:25}") private int responsePollDuration; - @Value("${transport.remote.transport_api.request_auto_commit_interval}") - private int autoCommitInterval; - -// @Autowired -// private TbKafkaSettings kafkaSettings; -// - @Autowired - private TbNodeIdProvider nodeIdProvider; - - @Autowired - private TransportApiService transportApiService; + @Value("${queue.transport_api.max_callback_threads:100}") + private int maxCallbackThreads; private ExecutorService transportCallbackExecutor; + private TbQueueResponseTemplate, + TbProtoQueueMsg> transportApiTemplate; - private TbQueueResponseTemplate, TbProtoQueueMsg> transportApiTemplate; + public RemoteTransportApiService(TbCoreQueueProvider tbCoreQueueProvider, TransportApiService transportApiService) { + this.tbCoreQueueProvider = tbCoreQueueProvider; + this.transportApiService = transportApiService; + } @PostConstruct public void init() { - this.transportCallbackExecutor = Executors.newWorkStealingPool(100); - - TBKafkaProducerTemplate.TBKafkaProducerTemplateBuilder responseBuilder = TBKafkaProducerTemplate.builder(); - responseBuilder.settings(kafkaSettings); - responseBuilder.clientId("producer-transport-api-response-" + nodeIdProvider.getNodeId()); - responseBuilder.encoder(new TransportApiResponseEncoder()); - - TBKafkaConsumerTemplate.TBKafkaConsumerTemplateBuilder requestBuilder = TBKafkaConsumerTemplate.builder(); - requestBuilder.settings(kafkaSettings); - requestBuilder.topic(transportApiRequestsTopic); - requestBuilder.clientId(nodeIdProvider.getNodeId()); - requestBuilder.groupId("tb-node"); - requestBuilder.autoCommit(true); - requestBuilder.autoCommitIntervalMs(autoCommitInterval); - requestBuilder.decoder(new TransportApiRequestDecoder()); - TbQueueProducer> producer = null; - TbQueueConsumer> consumer = null; - + this.transportCallbackExecutor = Executors.newWorkStealingPool(maxCallbackThreads); + TbQueueProducer> producer = tbCoreQueueProvider.getTransportApiResponseProducer(); + TbQueueConsumer> consumer = tbCoreQueueProvider.getTransportApiRequestConsumer(); DefaultTbQueueResponseTemplate.DefaultTbQueueResponseTemplateBuilder , TbProtoQueueMsg> builder = DefaultTbQueueResponseTemplate.builder(); @@ -105,7 +88,7 @@ public class RemoteTransportApiService { @EventListener(ApplicationReadyEvent.class) public void onApplicationEvent(ApplicationReadyEvent applicationReadyEvent) { log.info("Received application ready event. Starting polling for events."); - transportApiTemplate.init(); + transportApiTemplate.init(transportApiService); } @PreDestroy diff --git a/application/src/main/java/org/thingsboard/server/service/transport/TransportApiService.java b/application/src/main/java/org/thingsboard/server/service/transport/TransportApiService.java index 7d572d08ed..eab2e07b0e 100644 --- a/application/src/main/java/org/thingsboard/server/service/transport/TransportApiService.java +++ b/application/src/main/java/org/thingsboard/server/service/transport/TransportApiService.java @@ -17,10 +17,11 @@ package org.thingsboard.server.service.transport; import org.thingsboard.server.TbQueueHandler; import org.thingsboard.server.common.TbProtoQueueMsg; -import org.thingsboard.server.gen.transport.TransportProtos; +import org.thingsboard.server.gen.transport.TransportProtos.TransportApiRequestMsg; +import org.thingsboard.server.gen.transport.TransportProtos.TransportApiResponseMsg; /** * Created by ashvayka on 05.10.18. */ -public interface TransportApiService extends TbQueueHandler, TbProtoQueueMsg> { +public interface TransportApiService extends TbQueueHandler, TbProtoQueueMsg> { } diff --git a/application/src/main/resources/thingsboard.yml b/application/src/main/resources/thingsboard.yml index 0cd5be8fc8..dc479191e7 100644 --- a/application/src/main/resources/thingsboard.yml +++ b/application/src/main/resources/thingsboard.yml @@ -548,6 +548,7 @@ queue: responses_topic: "${TB_QUEUE_TRANSPORT_API_RESPONSE_TOPIC:tb.transport.api.responses}" max_pending_requests: "${TB_QUEUE_TRANSPORT_MAX_PENDING_REQUESTS:10000}" max_requests_timeout: "${TB_QUEUE_TRANSPORT_MAX_REQUEST_TIMEOUT:10000}" + max_callback_threads: "${TB_QUEUE_TRANSPORT_MAX_CALLBACK_THREADS:100}" request_poll_interval: "${TB_QUEUE_TRANSPORT_REQUEST_POLL_INTERVAL_MS:25}" response_poll_interval: "${TB_QUEUE_TRANSPORT_RESPONSE_POLL_INTERVAL_MS:25}" core: diff --git a/common/queue/pom.xml b/common/queue/pom.xml index 952b3c540d..fd26358cc6 100644 --- a/common/queue/pom.xml +++ b/common/queue/pom.xml @@ -100,4 +100,13 @@ + + + + org.xolstice.maven.plugins + protobuf-maven-plugin + + + + diff --git a/common/queue/src/main/java/org/thingsboard/server/provider/KafkaTbCoreQueueProvider.java b/common/queue/src/main/java/org/thingsboard/server/provider/KafkaTbCoreQueueProvider.java new file mode 100644 index 0000000000..eac81b80c3 --- /dev/null +++ b/common/queue/src/main/java/org/thingsboard/server/provider/KafkaTbCoreQueueProvider.java @@ -0,0 +1,42 @@ +package org.thingsboard.server.provider; + +import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression; +import org.springframework.stereotype.Component; +import org.thingsboard.server.TbQueueConsumer; +import org.thingsboard.server.TbQueueProducer; +import org.thingsboard.server.common.TbProtoQueueMsg; +import org.thingsboard.server.gen.transport.TransportProtos; + +@Component +@ConditionalOnExpression("'${transport.type:null}'=='local' || '${transport.type:null}'=='remote'") +public class KafkaTbCoreQueueProvider implements TbCoreQueueProvider{ + @Override + public TbQueueProducer> getTransportMsgProducer() { + return null; + } + + @Override + public TbQueueProducer> getRuleEngineMsgProducer() { + return null; + } + + @Override + public TbQueueProducer> getTbCoreMsgProducer() { + return null; + } + + @Override + public TbQueueConsumer> getToCoreMsgConsumer() { + return null; + } + + @Override + public TbQueueConsumer> getTransportApiRequestConsumer() { + return null; + } + + @Override + public TbQueueProducer> getTransportApiResponseProducer() { + return null; + } +} diff --git a/common/queue/src/main/java/org/thingsboard/server/provider/KafkaTransportQueueProvider.java b/common/queue/src/main/java/org/thingsboard/server/provider/KafkaTransportQueueProvider.java new file mode 100644 index 0000000000..6686b3704f --- /dev/null +++ b/common/queue/src/main/java/org/thingsboard/server/provider/KafkaTransportQueueProvider.java @@ -0,0 +1,39 @@ +package org.thingsboard.server.provider; + +import lombok.extern.slf4j.Slf4j; +import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression; +import org.springframework.stereotype.Component; +import org.thingsboard.server.TbQueueConsumer; +import org.thingsboard.server.TbQueueProducer; +import org.thingsboard.server.TbQueueRequestTemplate; +import org.thingsboard.server.common.TbProtoQueueMsg; +import org.thingsboard.server.gen.transport.TransportProtos.ToCoreMsg; +import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineMsg; +import org.thingsboard.server.gen.transport.TransportProtos.ToTransportMsg; +import org.thingsboard.server.gen.transport.TransportProtos.TransportApiRequestMsg; +import org.thingsboard.server.gen.transport.TransportProtos.TransportApiResponseMsg; + +@Component +@ConditionalOnExpression("'${transport.type:null}'=='null' || '${transport.type}'=='local'") +@Slf4j +public class KafkaTransportQueueProvider implements TransportQueueProvider { + @Override + public TbQueueRequestTemplate, TbProtoQueueMsg> getTransportApiRequestTemplate() { + return null; + } + + @Override + public TbQueueProducer> getRuleEngineMsgProducer() { + return null; + } + + @Override + public TbQueueProducer> getTbCoreMsgProducer() { + return null; + } + + @Override + public TbQueueConsumer> getTransportNotificationsConsumer() { + return null; + } +} diff --git a/common/queue/src/main/java/org/thingsboard/server/provider/TbCoreQueueProvider.java b/common/queue/src/main/java/org/thingsboard/server/provider/TbCoreQueueProvider.java new file mode 100644 index 0000000000..b7e210f0f6 --- /dev/null +++ b/common/queue/src/main/java/org/thingsboard/server/provider/TbCoreQueueProvider.java @@ -0,0 +1,63 @@ +package org.thingsboard.server.provider; + +import org.thingsboard.server.TbQueueConsumer; +import org.thingsboard.server.TbQueueProducer; +import org.thingsboard.server.TbQueueRequestTemplate; +import org.thingsboard.server.TbQueueResponseTemplate; +import org.thingsboard.server.common.TbProtoQueueMsg; +import org.thingsboard.server.gen.transport.TransportProtos.ToCoreMsg; +import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineMsg; +import org.thingsboard.server.gen.transport.TransportProtos.ToTransportMsg; +import org.thingsboard.server.gen.transport.TransportProtos.TransportApiRequestMsg; +import org.thingsboard.server.gen.transport.TransportProtos.TransportApiResponseMsg; + +/** + * Responsible for initialization of various Producers and Consumers used by TB Core Node. + * Implementation Depends on the queue queue.type from yml or TB_QUEUE_TYPE environment variable + */ +public interface TbCoreQueueProvider { + + /** + * Used to push messages to instances of TB Transport Service + * + * @return + */ + TbQueueProducer> getTransportMsgProducer(); + + /** + * Used to push messages to instances of TB RuleEngine Service + * + * @return + */ + TbQueueProducer> getRuleEngineMsgProducer(); + + /** + * Used to push messages to other instances of TB Core Service + * + * @return + */ + TbQueueProducer> getTbCoreMsgProducer(); + + /** + * Used to consume messages by TB Core Service + * + * @return + */ + TbQueueConsumer> getToCoreMsgConsumer(); + + /** + * Used to consume Transport API Calls + * + * @return + */ + TbQueueConsumer> getTransportApiRequestConsumer(); + + /** + * Used to push replies to Transport API Calls + * + * @return + */ + TbQueueProducer> getTransportApiResponseProducer(); + + +} diff --git a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/queue/TransportQueueProvider.java b/common/queue/src/main/java/org/thingsboard/server/provider/TransportQueueProvider.java similarity index 82% rename from common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/queue/TransportQueueProvider.java rename to common/queue/src/main/java/org/thingsboard/server/provider/TransportQueueProvider.java index d0bbafc615..14d8efd534 100644 --- a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/queue/TransportQueueProvider.java +++ b/common/queue/src/main/java/org/thingsboard/server/provider/TransportQueueProvider.java @@ -1,9 +1,10 @@ -package org.thingsboard.server.common.transport.queue; +package org.thingsboard.server.provider; import org.thingsboard.server.TbQueueConsumer; import org.thingsboard.server.TbQueueProducer; import org.thingsboard.server.TbQueueRequestTemplate; import org.thingsboard.server.common.TbProtoQueueMsg; +import org.thingsboard.server.gen.transport.TransportProtos.ToCoreMsg; import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineMsg; import org.thingsboard.server.gen.transport.TransportProtos.ToTransportMsg; import org.thingsboard.server.gen.transport.TransportProtos.TransportApiRequestMsg; @@ -15,7 +16,7 @@ public interface TransportQueueProvider { TbQueueProducer> getRuleEngineMsgProducer(); - TbQueueProducer> getTbCoreMsgProducer(); + TbQueueProducer> getTbCoreMsgProducer(); TbQueueConsumer> getTransportNotificationsConsumer(); diff --git a/common/transport/transport-api/src/main/proto/transport.proto b/common/queue/src/main/proto/transport.proto similarity index 75% rename from common/transport/transport-api/src/main/proto/transport.proto rename to common/queue/src/main/proto/transport.proto index e8b513574a..807aa571bf 100644 --- a/common/transport/transport-api/src/main/proto/transport.proto +++ b/common/queue/src/main/proto/transport.proto @@ -20,16 +20,18 @@ option java_package = "org.thingsboard.server.gen.transport"; option java_outer_classname = "TransportProtos"; /** - * Data Structures; + * Transport Service Data Structures; */ message SessionInfoProto { - string nodeId = 1; + string ServiceId = 1; int64 sessionIdMSB = 2; int64 sessionIdLSB = 3; int64 tenantIdMSB = 4; int64 tenantIdLSB = 5; int64 deviceIdMSB = 6; int64 deviceIdLSB = 7; + string deviceName = 8; + string deviceType = 9; } enum SessionEvent { @@ -81,7 +83,7 @@ message DeviceInfoProto { } /** - * Messages that use Data Structures; + * Transport Service Messages; */ message SessionEventMsg { SessionType sessionType = 1; @@ -181,7 +183,7 @@ message ClaimDeviceMsg { int64 durationMs = 4; } -//Used to report session state to tb-node and persist this state in the cache on the tb-node level. +//Used to report session state to tb-Service and persist this state in the cache on the tb-Service level. message SubscriptionInfoProto { int64 lastActivityTime = 1; bool attributeSubscription = 2; @@ -200,45 +202,61 @@ message DeviceSessionsCacheEntry { message TransportToDeviceActorMsg { SessionInfoProto sessionInfo = 1; SessionEventMsg sessionEvent = 2; - PostTelemetryMsg postTelemetry = 3; - PostAttributeMsg postAttributes = 4; - GetAttributeRequestMsg getAttributes = 5; - SubscribeToAttributeUpdatesMsg subscribeToAttributes = 6; - SubscribeToRPCMsg subscribeToRPC = 7; - ToDeviceRpcResponseMsg toDeviceRPCCallResponse = 8; - ToServerRpcRequestMsg toServerRPCCallRequest = 9; - SubscriptionInfoProto subscriptionInfo = 10; - ClaimDeviceMsg claimDevice = 11; + GetAttributeRequestMsg getAttributes = 3; + SubscribeToAttributeUpdatesMsg subscribeToAttributes = 4; + SubscribeToRPCMsg subscribeToRPC = 5; + ToDeviceRpcResponseMsg toDeviceRPCCallResponse = 6; + SubscriptionInfoProto subscriptionInfo = 7; + ClaimDeviceMsg claimDevice = 8; +} + +message TransportToRuleEngineMsg { + SessionInfoProto sessionInfo = 1; + PostTelemetryMsg postTelemetry = 2; + PostAttributeMsg postAttributes = 3; + ToDeviceRpcResponseMsg toDeviceRPCCallResponse = 4; + ToServerRpcRequestMsg toServerRPCCallRequest = 5; + SubscriptionInfoProto subscriptionInfo = 6; } message DeviceActorToTransportMsg { - int64 sessionIdMSB = 1; - int64 sessionIdLSB = 2; - SessionCloseNotificationProto sessionCloseNotification = 3; - GetAttributeResponseMsg getAttributesResponse = 4; - AttributeUpdateNotificationMsg attributeUpdateNotification = 5; - ToDeviceRpcRequestMsg toDeviceRequest = 6; - ToServerRpcResponseMsg toServerResponse = 7; + int64 sessionIdMSB = 1; + int64 sessionIdLSB = 2; + SessionCloseNotificationProto sessionCloseNotification = 3; + GetAttributeResponseMsg getAttributesResponse = 4; + AttributeUpdateNotificationMsg attributeUpdateNotification = 5; + ToDeviceRpcRequestMsg toDeviceRequest = 6; + ToServerRpcResponseMsg toServerResponse = 7; } /** * Main messages; */ -message ToRuleEngineMsg { - TransportToDeviceActorMsg toDeviceActorMsg = 1; -} - -message ToTransportMsg { - DeviceActorToTransportMsg toDeviceSessionMsg = 1; -} +/* Request from Transport Service to ThingsBoard Core Service */ message TransportApiRequestMsg { ValidateDeviceTokenRequestMsg validateTokenRequestMsg = 1; ValidateDeviceX509CertRequestMsg validateX509CertRequestMsg = 2; GetOrCreateDeviceFromGatewayRequestMsg getOrCreateDeviceRequestMsg = 3; } +/* Response from ThingsBoard Core Service to Transport Service */ message TransportApiResponseMsg { ValidateDeviceCredentialsResponseMsg validateTokenResponseMsg = 1; GetOrCreateDeviceFromGatewayResponseMsg getOrCreateDeviceResponseMsg = 2; } + +/* Messages that are handled by ThingsBoard Core Service */ +message ToCoreMsg { + TransportToDeviceActorMsg toDeviceActorMsg = 1; +} + +/* Messages that are handled by ThingsBoard RuleEngine Service */ +message ToRuleEngineMsg { + TransportToRuleEngineMsg toRuleEngineMsg = 1; +} + +/* Messages that are handled by ThingsBoard Transport Service */ +message ToTransportMsg { + DeviceActorToTransportMsg toDeviceSessionMsg = 1; +} diff --git a/common/transport/transport-api/pom.xml b/common/transport/transport-api/pom.xml index 3920234f7c..d6dcbbabc6 100644 --- a/common/transport/transport-api/pom.xml +++ b/common/transport/transport-api/pom.xml @@ -99,19 +99,6 @@ org.apache.commons commons-lang3 - - com.google.protobuf - protobuf-java - - - - - org.xolstice.maven.plugins - protobuf-maven-plugin - - - - diff --git a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/queue/KafkaTransportQueueProvider.java b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/queue/KafkaTransportQueueProvider.java deleted file mode 100644 index e16c7fd93c..0000000000 --- a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/queue/KafkaTransportQueueProvider.java +++ /dev/null @@ -1,35 +0,0 @@ -package org.thingsboard.server.common.transport.queue; - -import lombok.extern.slf4j.Slf4j; -import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression; -import org.springframework.stereotype.Component; -import org.thingsboard.server.TbQueueConsumer; -import org.thingsboard.server.TbQueueProducer; -import org.thingsboard.server.TbQueueRequestTemplate; -import org.thingsboard.server.common.TbProtoQueueMsg; -import org.thingsboard.server.gen.transport.TransportProtos; - -@Component -@ConditionalOnExpression("'${transport.type:null}'=='null' || '${transport.type}'=='local'") -@Slf4j -public class KafkaTransportQueueProvider implements TransportQueueProvider { - @Override - public TbQueueRequestTemplate, TbProtoQueueMsg> getTransportApiRequestTemplate() { - return null; - } - - @Override - public TbQueueProducer> getRuleEngineMsgProducer() { - return null; - } - - @Override - public TbQueueProducer> getTbCoreMsgProducer() { - return null; - } - - @Override - public TbQueueConsumer> getTransportNotificationsConsumer() { - return null; - } -} diff --git a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/DefaultTransportService.java b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/DefaultTransportService.java index 9ccb5369f8..e4dc2ba1f1 100644 --- a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/DefaultTransportService.java +++ b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/DefaultTransportService.java @@ -34,12 +34,15 @@ import org.thingsboard.server.common.msg.tools.TbRateLimitsException; import org.thingsboard.server.common.transport.SessionMsgListener; import org.thingsboard.server.common.transport.TransportService; import org.thingsboard.server.common.transport.TransportServiceCallback; -import org.thingsboard.server.common.transport.queue.TransportQueueProvider; +import org.thingsboard.server.provider.TransportQueueProvider; import org.thingsboard.server.gen.transport.TransportProtos; +import org.thingsboard.server.gen.transport.TransportProtos.ToCoreMsg; import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineMsg; import org.thingsboard.server.gen.transport.TransportProtos.ToTransportMsg; import org.thingsboard.server.gen.transport.TransportProtos.TransportApiRequestMsg; import org.thingsboard.server.gen.transport.TransportProtos.TransportApiResponseMsg; +import org.thingsboard.server.gen.transport.TransportProtos.TransportToDeviceActorMsg; +import org.thingsboard.server.gen.transport.TransportProtos.TransportToRuleEngineMsg; import org.thingsboard.server.common.AsyncCallbackTemplate; import javax.annotation.PostConstruct; @@ -81,7 +84,7 @@ public class DefaultTransportService implements TransportService { protected TbQueueRequestTemplate, TbProtoQueueMsg> transportApiRequestTemplate; protected TbQueueProducer> ruleEngineMsgProducer; - protected TbQueueProducer> tbCoreMsgProducer; + protected TbQueueProducer> tbCoreMsgProducer; protected TbQueueConsumer> transportNotificationsConsumer; protected ScheduledExecutorService schedulerExecutor; @@ -188,22 +191,16 @@ public class DefaultTransportService implements TransportService { if (log.isTraceEnabled()) { log.trace("[{}] Processing msg: {}", toId(sessionInfo), msg); } - ToRuleEngineMsg toRuleEngineMsg = ToRuleEngineMsg.newBuilder().setToDeviceActorMsg( - TransportProtos.TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo) - .setSubscriptionInfo(msg).build() - ).build(); - sendToDeviceActor(sessionInfo, toRuleEngineMsg, callback); + sendToDeviceActor(sessionInfo, TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo) + .setSubscriptionInfo(msg).build(), callback); } @Override public void process(TransportProtos.SessionInfoProto sessionInfo, TransportProtos.SessionEventMsg msg, TransportServiceCallback callback) { if (checkLimits(sessionInfo, msg, callback)) { reportActivityInternal(sessionInfo); - ToRuleEngineMsg toRuleEngineMsg = ToRuleEngineMsg.newBuilder().setToDeviceActorMsg( - TransportProtos.TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo) - .setSessionEvent(msg).build() - ).build(); - sendToDeviceActor(sessionInfo, toRuleEngineMsg, callback); + sendToDeviceActor(sessionInfo, TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo) + .setSessionEvent(msg).build(), callback); } } @@ -211,11 +208,8 @@ public class DefaultTransportService implements TransportService { public void process(TransportProtos.SessionInfoProto sessionInfo, TransportProtos.PostTelemetryMsg msg, TransportServiceCallback callback) { if (checkLimits(sessionInfo, msg, callback)) { reportActivityInternal(sessionInfo); - ToRuleEngineMsg toRuleEngineMsg = ToRuleEngineMsg.newBuilder().setToDeviceActorMsg( - TransportProtos.TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo) - .setPostTelemetry(msg).build() - ).build(); - sendToRuleEngine(sessionInfo, toRuleEngineMsg, callback); + sendToRuleEngine(sessionInfo, TransportToRuleEngineMsg.newBuilder().setSessionInfo(sessionInfo). + setPostTelemetry(msg).build(), callback); } } @@ -223,11 +217,8 @@ public class DefaultTransportService implements TransportService { public void process(TransportProtos.SessionInfoProto sessionInfo, TransportProtos.PostAttributeMsg msg, TransportServiceCallback callback) { if (checkLimits(sessionInfo, msg, callback)) { reportActivityInternal(sessionInfo); - ToRuleEngineMsg toRuleEngineMsg = ToRuleEngineMsg.newBuilder().setToDeviceActorMsg( - TransportProtos.TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo) - .setPostAttributes(msg).build() - ).build(); - sendToRuleEngine(sessionInfo, toRuleEngineMsg, callback); + sendToRuleEngine(sessionInfo, TransportToRuleEngineMsg.newBuilder().setSessionInfo(sessionInfo). + setPostAttributes(msg).build(), callback); } } @@ -235,11 +226,8 @@ public class DefaultTransportService implements TransportService { public void process(TransportProtos.SessionInfoProto sessionInfo, TransportProtos.GetAttributeRequestMsg msg, TransportServiceCallback callback) { if (checkLimits(sessionInfo, msg, callback)) { reportActivityInternal(sessionInfo); - ToRuleEngineMsg toRuleEngineMsg = ToRuleEngineMsg.newBuilder().setToDeviceActorMsg( - TransportProtos.TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo) - .setGetAttributes(msg).build() - ).build(); - sendToDeviceActor(sessionInfo, toRuleEngineMsg, callback); + sendToDeviceActor(sessionInfo, TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo) + .setGetAttributes(msg).build(), callback); } } @@ -248,11 +236,8 @@ public class DefaultTransportService implements TransportService { if (checkLimits(sessionInfo, msg, callback)) { SessionMetaData sessionMetaData = reportActivityInternal(sessionInfo); sessionMetaData.setSubscribedToAttributes(!msg.getUnsubscribe()); - ToRuleEngineMsg toRuleEngineMsg = ToRuleEngineMsg.newBuilder().setToDeviceActorMsg( - TransportProtos.TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo) - .setSubscribeToAttributes(msg).build() - ).build(); - sendToDeviceActor(sessionInfo, toRuleEngineMsg, callback); + sendToDeviceActor(sessionInfo, TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo) + .setSubscribeToAttributes(msg).build(), callback); } } @@ -261,11 +246,8 @@ public class DefaultTransportService implements TransportService { if (checkLimits(sessionInfo, msg, callback)) { SessionMetaData sessionMetaData = reportActivityInternal(sessionInfo); sessionMetaData.setSubscribedToRPC(!msg.getUnsubscribe()); - ToRuleEngineMsg toRuleEngineMsg = ToRuleEngineMsg.newBuilder().setToDeviceActorMsg( - TransportProtos.TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo) - .setSubscribeToRPC(msg).build() - ).build(); - sendToDeviceActor(sessionInfo, toRuleEngineMsg, callback); + sendToDeviceActor(sessionInfo, TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo) + .setSubscribeToRPC(msg).build(), callback); } } @@ -273,11 +255,8 @@ public class DefaultTransportService implements TransportService { public void process(TransportProtos.SessionInfoProto sessionInfo, TransportProtos.ToDeviceRpcResponseMsg msg, TransportServiceCallback callback) { if (checkLimits(sessionInfo, msg, callback)) { reportActivityInternal(sessionInfo); - ToRuleEngineMsg toRuleEngineMsg = ToRuleEngineMsg.newBuilder().setToDeviceActorMsg( - TransportProtos.TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo) - .setToDeviceRPCCallResponse(msg).build() - ).build(); - sendToDeviceActor(sessionInfo, toRuleEngineMsg, callback); + sendToDeviceActor(sessionInfo, TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo) + .setToDeviceRPCCallResponse(msg).build(), callback); } } @@ -286,23 +265,16 @@ public class DefaultTransportService implements TransportService { public void process(TransportProtos.SessionInfoProto sessionInfo, TransportProtos.ToServerRpcRequestMsg msg, TransportServiceCallback callback) { if (checkLimits(sessionInfo, msg, callback)) { reportActivityInternal(sessionInfo); - ToRuleEngineMsg toRuleEngineMsg = ToRuleEngineMsg.newBuilder().setToDeviceActorMsg( - TransportProtos.TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo) - .setToServerRPCCallRequest(msg).build() - ).build(); - sendToRuleEngine(sessionInfo, toRuleEngineMsg, callback); + sendToRuleEngine(sessionInfo, TransportToRuleEngineMsg.newBuilder().setSessionInfo(sessionInfo). + setToServerRPCCallRequest(msg).build(), callback); } } @Override - public void process(TransportProtos.SessionInfoProto sessionInfo, TransportProtos.ClaimDeviceMsg msg, - TransportServiceCallback callback) { + public void process(TransportProtos.SessionInfoProto sessionInfo, TransportProtos.ClaimDeviceMsg msg, TransportServiceCallback callback) { if (checkLimits(sessionInfo, msg, callback)) { - ToRuleEngineMsg toRuleEngineMsg = ToRuleEngineMsg.newBuilder().setToDeviceActorMsg( - TransportProtos.TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo) - .setClaimDevice(msg).build() - ).build(); - sendToDeviceActor(sessionInfo, toRuleEngineMsg, callback); + sendToDeviceActor(sessionInfo, TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo) + .setClaimDevice(msg).build(), callback); } } @@ -454,13 +426,15 @@ public class DefaultTransportService implements TransportService { .setEvent(event).build(); } - protected void sendToDeviceActor(TransportProtos.SessionInfoProto sessionInfo, ToRuleEngineMsg toRuleEngineMsg, TransportServiceCallback callback) { - tbCoreMsgProducer.send(new TbProtoQueueMsg<>(getRoutingKey(sessionInfo), toRuleEngineMsg), callback != null ? + protected void sendToDeviceActor(TransportProtos.SessionInfoProto sessionInfo, TransportToDeviceActorMsg toDeviceActorMsg, TransportServiceCallback callback) { + tbCoreMsgProducer.send(new TbProtoQueueMsg<>(getRoutingKey(sessionInfo), + ToCoreMsg.newBuilder().setToDeviceActorMsg(toDeviceActorMsg).build()), callback != null ? new TransportTbQueueCallback(callback) : null); } - protected void sendToRuleEngine(TransportProtos.SessionInfoProto sessionInfo, ToRuleEngineMsg toRuleEngineMsg, TransportServiceCallback callback) { - ruleEngineMsgProducer.send(new TbProtoQueueMsg<>(getRoutingKey(sessionInfo), toRuleEngineMsg), callback != null ? + protected void sendToRuleEngine(TransportProtos.SessionInfoProto sessionInfo, TransportToRuleEngineMsg toRuleEngineMsg, TransportServiceCallback callback) { + ruleEngineMsgProducer.send(new TbProtoQueueMsg<>(getRoutingKey(sessionInfo), + ToRuleEngineMsg.newBuilder().setToRuleEngineMsg(toRuleEngineMsg).build()), callback != null ? new TransportTbQueueCallback(callback) : null); } @@ -473,16 +447,12 @@ public class DefaultTransportService implements TransportService { @Override public void onSuccess(TbQueueMsgMetadata metadata) { - DefaultTransportService.this.transportCallbackExecutor.submit(() -> { - callback.onSuccess(null); - }); + DefaultTransportService.this.transportCallbackExecutor.submit(() -> callback.onSuccess(null)); } @Override public void onFailure(Throwable t) { - DefaultTransportService.this.transportCallbackExecutor.submit(() -> { - callback.onError(t); - }); + DefaultTransportService.this.transportCallbackExecutor.submit(() -> callback.onError(t)); } } }