Browse Source

Refactoring of Transport Communication to Queues

pull/2516/head
Andrii Shvaika 6 years ago
parent
commit
52814d2bfc
  1. 2
      application/src/main/java/org/thingsboard/server/service/transport/DefaultTransportApiService.java
  2. 57
      application/src/main/java/org/thingsboard/server/service/transport/RemoteTransportApiService.java
  3. 5
      application/src/main/java/org/thingsboard/server/service/transport/TransportApiService.java
  4. 1
      application/src/main/resources/thingsboard.yml
  5. 9
      common/queue/pom.xml
  6. 42
      common/queue/src/main/java/org/thingsboard/server/provider/KafkaTbCoreQueueProvider.java
  7. 39
      common/queue/src/main/java/org/thingsboard/server/provider/KafkaTransportQueueProvider.java
  8. 63
      common/queue/src/main/java/org/thingsboard/server/provider/TbCoreQueueProvider.java
  9. 5
      common/queue/src/main/java/org/thingsboard/server/provider/TransportQueueProvider.java
  10. 72
      common/queue/src/main/proto/transport.proto
  11. 13
      common/transport/transport-api/pom.xml
  12. 35
      common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/queue/KafkaTransportQueueProvider.java
  13. 98
      common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/DefaultTransportService.java

2
application/src/main/java/org/thingsboard/server/service/transport/LocalTransportApiService.java → application/src/main/java/org/thingsboard/server/service/transport/DefaultTransportApiService.java

@ -52,7 +52,7 @@ import java.util.concurrent.locks.ReentrantLock;
*/
@Slf4j
@Service
public class LocalTransportApiService implements TransportApiService {
public class DefaultTransportApiService implements TransportApiService {
private static final ObjectMapper mapper = new ObjectMapper();

57
application/src/main/java/org/thingsboard/server/service/transport/RemoteTransportApiService.java

@ -5,7 +5,7 @@
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@ -32,6 +32,7 @@ import org.thingsboard.server.common.TbProtoQueueMsg;
import org.thingsboard.server.gen.transport.TransportProtos.TransportApiRequestMsg;
import org.thingsboard.server.gen.transport.TransportProtos.TransportApiResponseMsg;
import org.thingsboard.server.kafka.TbNodeIdProvider;
import org.thingsboard.server.provider.TbCoreQueueProvider;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
@ -45,50 +46,32 @@ import java.util.concurrent.*;
@ConditionalOnProperty(prefix = "transport", value = "type", havingValue = "remote")
public class RemoteTransportApiService {
@Value("${transport.remote.transport_api.requests_topic}")
private String transportApiRequestsTopic;
@Value("${transport.remote.transport_api.max_pending_requests}")
private final TbCoreQueueProvider tbCoreQueueProvider;
private final TransportApiService transportApiService;
@Value("${queue.transport_api.max_pending_requests:10000}")
private int maxPendingRequests;
@Value("${transport.remote.transport_api.request_timeout}")
@Value("${queue.transport_api.max_requests_timeout:10000}")
private long requestTimeout;
@Value("${transport.remote.transport_api.request_poll_interval}")
@Value("${queue.transport_api.request_poll_interval:25}")
private int responsePollDuration;
@Value("${transport.remote.transport_api.request_auto_commit_interval}")
private int autoCommitInterval;
// @Autowired
// private TbKafkaSettings kafkaSettings;
//
@Autowired
private TbNodeIdProvider nodeIdProvider;
@Autowired
private TransportApiService transportApiService;
@Value("${queue.transport_api.max_callback_threads:100}")
private int maxCallbackThreads;
private ExecutorService transportCallbackExecutor;
private TbQueueResponseTemplate<TbProtoQueueMsg<TransportApiRequestMsg>,
TbProtoQueueMsg<TransportApiResponseMsg>> transportApiTemplate;
private TbQueueResponseTemplate<TbProtoQueueMsg<TransportApiRequestMsg>, TbProtoQueueMsg<TransportApiResponseMsg>> transportApiTemplate;
public RemoteTransportApiService(TbCoreQueueProvider tbCoreQueueProvider, TransportApiService transportApiService) {
this.tbCoreQueueProvider = tbCoreQueueProvider;
this.transportApiService = transportApiService;
}
@PostConstruct
public void init() {
this.transportCallbackExecutor = Executors.newWorkStealingPool(100);
TBKafkaProducerTemplate.TBKafkaProducerTemplateBuilder<TransportApiResponseMsg> responseBuilder = TBKafkaProducerTemplate.builder();
responseBuilder.settings(kafkaSettings);
responseBuilder.clientId("producer-transport-api-response-" + nodeIdProvider.getNodeId());
responseBuilder.encoder(new TransportApiResponseEncoder());
TBKafkaConsumerTemplate.TBKafkaConsumerTemplateBuilder<TransportApiRequestMsg> requestBuilder = TBKafkaConsumerTemplate.builder();
requestBuilder.settings(kafkaSettings);
requestBuilder.topic(transportApiRequestsTopic);
requestBuilder.clientId(nodeIdProvider.getNodeId());
requestBuilder.groupId("tb-node");
requestBuilder.autoCommit(true);
requestBuilder.autoCommitIntervalMs(autoCommitInterval);
requestBuilder.decoder(new TransportApiRequestDecoder());
TbQueueProducer<TbProtoQueueMsg<TransportApiResponseMsg>> producer = null;
TbQueueConsumer<TbProtoQueueMsg<TransportApiRequestMsg>> consumer = null;
this.transportCallbackExecutor = Executors.newWorkStealingPool(maxCallbackThreads);
TbQueueProducer<TbProtoQueueMsg<TransportApiResponseMsg>> producer = tbCoreQueueProvider.getTransportApiResponseProducer();
TbQueueConsumer<TbProtoQueueMsg<TransportApiRequestMsg>> consumer = tbCoreQueueProvider.getTransportApiRequestConsumer();
DefaultTbQueueResponseTemplate.DefaultTbQueueResponseTemplateBuilder
<TbProtoQueueMsg<TransportApiRequestMsg>, TbProtoQueueMsg<TransportApiResponseMsg>> builder = DefaultTbQueueResponseTemplate.builder();
@ -105,7 +88,7 @@ public class RemoteTransportApiService {
@EventListener(ApplicationReadyEvent.class)
public void onApplicationEvent(ApplicationReadyEvent applicationReadyEvent) {
log.info("Received application ready event. Starting polling for events.");
transportApiTemplate.init();
transportApiTemplate.init(transportApiService);
}
@PreDestroy

5
application/src/main/java/org/thingsboard/server/service/transport/TransportApiService.java

@ -17,10 +17,11 @@ package org.thingsboard.server.service.transport;
import org.thingsboard.server.TbQueueHandler;
import org.thingsboard.server.common.TbProtoQueueMsg;
import org.thingsboard.server.gen.transport.TransportProtos;
import org.thingsboard.server.gen.transport.TransportProtos.TransportApiRequestMsg;
import org.thingsboard.server.gen.transport.TransportProtos.TransportApiResponseMsg;
/**
* Created by ashvayka on 05.10.18.
*/
public interface TransportApiService extends TbQueueHandler<TbProtoQueueMsg<TransportProtos.TransportApiRequestMsg>, TbProtoQueueMsg<TransportProtos.TransportApiResponseMsg>> {
public interface TransportApiService extends TbQueueHandler<TbProtoQueueMsg<TransportApiRequestMsg>, TbProtoQueueMsg<TransportApiResponseMsg>> {
}

1
application/src/main/resources/thingsboard.yml

@ -548,6 +548,7 @@ queue:
responses_topic: "${TB_QUEUE_TRANSPORT_API_RESPONSE_TOPIC:tb.transport.api.responses}"
max_pending_requests: "${TB_QUEUE_TRANSPORT_MAX_PENDING_REQUESTS:10000}"
max_requests_timeout: "${TB_QUEUE_TRANSPORT_MAX_REQUEST_TIMEOUT:10000}"
max_callback_threads: "${TB_QUEUE_TRANSPORT_MAX_CALLBACK_THREADS:100}"
request_poll_interval: "${TB_QUEUE_TRANSPORT_REQUEST_POLL_INTERVAL_MS:25}"
response_poll_interval: "${TB_QUEUE_TRANSPORT_RESPONSE_POLL_INTERVAL_MS:25}"
core:

9
common/queue/pom.xml

@ -100,4 +100,13 @@
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.xolstice.maven.plugins</groupId>
<artifactId>protobuf-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>

42
common/queue/src/main/java/org/thingsboard/server/provider/KafkaTbCoreQueueProvider.java

@ -0,0 +1,42 @@
package org.thingsboard.server.provider;
import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
import org.springframework.stereotype.Component;
import org.thingsboard.server.TbQueueConsumer;
import org.thingsboard.server.TbQueueProducer;
import org.thingsboard.server.common.TbProtoQueueMsg;
import org.thingsboard.server.gen.transport.TransportProtos;
@Component
@ConditionalOnExpression("'${transport.type:null}'=='local' || '${transport.type:null}'=='remote'")
public class KafkaTbCoreQueueProvider implements TbCoreQueueProvider{
@Override
public TbQueueProducer<TbProtoQueueMsg<TransportProtos.ToTransportMsg>> getTransportMsgProducer() {
return null;
}
@Override
public TbQueueProducer<TbProtoQueueMsg<TransportProtos.ToRuleEngineMsg>> getRuleEngineMsgProducer() {
return null;
}
@Override
public TbQueueProducer<TbProtoQueueMsg<TransportProtos.ToCoreMsg>> getTbCoreMsgProducer() {
return null;
}
@Override
public TbQueueConsumer<TbProtoQueueMsg<TransportProtos.ToCoreMsg>> getToCoreMsgConsumer() {
return null;
}
@Override
public TbQueueConsumer<TbProtoQueueMsg<TransportProtos.TransportApiRequestMsg>> getTransportApiRequestConsumer() {
return null;
}
@Override
public TbQueueProducer<TbProtoQueueMsg<TransportProtos.TransportApiResponseMsg>> getTransportApiResponseProducer() {
return null;
}
}

39
common/queue/src/main/java/org/thingsboard/server/provider/KafkaTransportQueueProvider.java

@ -0,0 +1,39 @@
package org.thingsboard.server.provider;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
import org.springframework.stereotype.Component;
import org.thingsboard.server.TbQueueConsumer;
import org.thingsboard.server.TbQueueProducer;
import org.thingsboard.server.TbQueueRequestTemplate;
import org.thingsboard.server.common.TbProtoQueueMsg;
import org.thingsboard.server.gen.transport.TransportProtos.ToCoreMsg;
import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineMsg;
import org.thingsboard.server.gen.transport.TransportProtos.ToTransportMsg;
import org.thingsboard.server.gen.transport.TransportProtos.TransportApiRequestMsg;
import org.thingsboard.server.gen.transport.TransportProtos.TransportApiResponseMsg;
@Component
@ConditionalOnExpression("'${transport.type:null}'=='null' || '${transport.type}'=='local'")
@Slf4j
public class KafkaTransportQueueProvider implements TransportQueueProvider {
@Override
public TbQueueRequestTemplate<TbProtoQueueMsg<TransportApiRequestMsg>, TbProtoQueueMsg<TransportApiResponseMsg>> getTransportApiRequestTemplate() {
return null;
}
@Override
public TbQueueProducer<TbProtoQueueMsg<ToRuleEngineMsg>> getRuleEngineMsgProducer() {
return null;
}
@Override
public TbQueueProducer<TbProtoQueueMsg<ToCoreMsg>> getTbCoreMsgProducer() {
return null;
}
@Override
public TbQueueConsumer<TbProtoQueueMsg<ToTransportMsg>> getTransportNotificationsConsumer() {
return null;
}
}

63
common/queue/src/main/java/org/thingsboard/server/provider/TbCoreQueueProvider.java

@ -0,0 +1,63 @@
package org.thingsboard.server.provider;
import org.thingsboard.server.TbQueueConsumer;
import org.thingsboard.server.TbQueueProducer;
import org.thingsboard.server.TbQueueRequestTemplate;
import org.thingsboard.server.TbQueueResponseTemplate;
import org.thingsboard.server.common.TbProtoQueueMsg;
import org.thingsboard.server.gen.transport.TransportProtos.ToCoreMsg;
import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineMsg;
import org.thingsboard.server.gen.transport.TransportProtos.ToTransportMsg;
import org.thingsboard.server.gen.transport.TransportProtos.TransportApiRequestMsg;
import org.thingsboard.server.gen.transport.TransportProtos.TransportApiResponseMsg;
/**
* Responsible for initialization of various Producers and Consumers used by TB Core Node.
* Implementation Depends on the queue queue.type from yml or TB_QUEUE_TYPE environment variable
*/
public interface TbCoreQueueProvider {
/**
* Used to push messages to instances of TB Transport Service
*
* @return
*/
TbQueueProducer<TbProtoQueueMsg<ToTransportMsg>> getTransportMsgProducer();
/**
* Used to push messages to instances of TB RuleEngine Service
*
* @return
*/
TbQueueProducer<TbProtoQueueMsg<ToRuleEngineMsg>> getRuleEngineMsgProducer();
/**
* Used to push messages to other instances of TB Core Service
*
* @return
*/
TbQueueProducer<TbProtoQueueMsg<ToCoreMsg>> getTbCoreMsgProducer();
/**
* Used to consume messages by TB Core Service
*
* @return
*/
TbQueueConsumer<TbProtoQueueMsg<ToCoreMsg>> getToCoreMsgConsumer();
/**
* Used to consume Transport API Calls
*
* @return
*/
TbQueueConsumer<TbProtoQueueMsg<TransportApiRequestMsg>> getTransportApiRequestConsumer();
/**
* Used to push replies to Transport API Calls
*
* @return
*/
TbQueueProducer<TbProtoQueueMsg<TransportApiResponseMsg>> getTransportApiResponseProducer();
}

5
common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/queue/TransportQueueProvider.java → common/queue/src/main/java/org/thingsboard/server/provider/TransportQueueProvider.java

@ -1,9 +1,10 @@
package org.thingsboard.server.common.transport.queue;
package org.thingsboard.server.provider;
import org.thingsboard.server.TbQueueConsumer;
import org.thingsboard.server.TbQueueProducer;
import org.thingsboard.server.TbQueueRequestTemplate;
import org.thingsboard.server.common.TbProtoQueueMsg;
import org.thingsboard.server.gen.transport.TransportProtos.ToCoreMsg;
import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineMsg;
import org.thingsboard.server.gen.transport.TransportProtos.ToTransportMsg;
import org.thingsboard.server.gen.transport.TransportProtos.TransportApiRequestMsg;
@ -15,7 +16,7 @@ public interface TransportQueueProvider {
TbQueueProducer<TbProtoQueueMsg<ToRuleEngineMsg>> getRuleEngineMsgProducer();
TbQueueProducer<TbProtoQueueMsg<ToRuleEngineMsg>> getTbCoreMsgProducer();
TbQueueProducer<TbProtoQueueMsg<ToCoreMsg>> getTbCoreMsgProducer();
TbQueueConsumer<TbProtoQueueMsg<ToTransportMsg>> getTransportNotificationsConsumer();

72
common/transport/transport-api/src/main/proto/transport.proto → common/queue/src/main/proto/transport.proto

@ -20,16 +20,18 @@ option java_package = "org.thingsboard.server.gen.transport";
option java_outer_classname = "TransportProtos";
/**
* Data Structures;
* Transport Service Data Structures;
*/
message SessionInfoProto {
string nodeId = 1;
string ServiceId = 1;
int64 sessionIdMSB = 2;
int64 sessionIdLSB = 3;
int64 tenantIdMSB = 4;
int64 tenantIdLSB = 5;
int64 deviceIdMSB = 6;
int64 deviceIdLSB = 7;
string deviceName = 8;
string deviceType = 9;
}
enum SessionEvent {
@ -81,7 +83,7 @@ message DeviceInfoProto {
}
/**
* Messages that use Data Structures;
* Transport Service Messages;
*/
message SessionEventMsg {
SessionType sessionType = 1;
@ -181,7 +183,7 @@ message ClaimDeviceMsg {
int64 durationMs = 4;
}
//Used to report session state to tb-node and persist this state in the cache on the tb-node level.
//Used to report session state to tb-Service and persist this state in the cache on the tb-Service level.
message SubscriptionInfoProto {
int64 lastActivityTime = 1;
bool attributeSubscription = 2;
@ -200,45 +202,61 @@ message DeviceSessionsCacheEntry {
message TransportToDeviceActorMsg {
SessionInfoProto sessionInfo = 1;
SessionEventMsg sessionEvent = 2;
PostTelemetryMsg postTelemetry = 3;
PostAttributeMsg postAttributes = 4;
GetAttributeRequestMsg getAttributes = 5;
SubscribeToAttributeUpdatesMsg subscribeToAttributes = 6;
SubscribeToRPCMsg subscribeToRPC = 7;
ToDeviceRpcResponseMsg toDeviceRPCCallResponse = 8;
ToServerRpcRequestMsg toServerRPCCallRequest = 9;
SubscriptionInfoProto subscriptionInfo = 10;
ClaimDeviceMsg claimDevice = 11;
GetAttributeRequestMsg getAttributes = 3;
SubscribeToAttributeUpdatesMsg subscribeToAttributes = 4;
SubscribeToRPCMsg subscribeToRPC = 5;
ToDeviceRpcResponseMsg toDeviceRPCCallResponse = 6;
SubscriptionInfoProto subscriptionInfo = 7;
ClaimDeviceMsg claimDevice = 8;
}
message TransportToRuleEngineMsg {
SessionInfoProto sessionInfo = 1;
PostTelemetryMsg postTelemetry = 2;
PostAttributeMsg postAttributes = 3;
ToDeviceRpcResponseMsg toDeviceRPCCallResponse = 4;
ToServerRpcRequestMsg toServerRPCCallRequest = 5;
SubscriptionInfoProto subscriptionInfo = 6;
}
message DeviceActorToTransportMsg {
int64 sessionIdMSB = 1;
int64 sessionIdLSB = 2;
SessionCloseNotificationProto sessionCloseNotification = 3;
GetAttributeResponseMsg getAttributesResponse = 4;
AttributeUpdateNotificationMsg attributeUpdateNotification = 5;
ToDeviceRpcRequestMsg toDeviceRequest = 6;
ToServerRpcResponseMsg toServerResponse = 7;
int64 sessionIdMSB = 1;
int64 sessionIdLSB = 2;
SessionCloseNotificationProto sessionCloseNotification = 3;
GetAttributeResponseMsg getAttributesResponse = 4;
AttributeUpdateNotificationMsg attributeUpdateNotification = 5;
ToDeviceRpcRequestMsg toDeviceRequest = 6;
ToServerRpcResponseMsg toServerResponse = 7;
}
/**
* Main messages;
*/
message ToRuleEngineMsg {
TransportToDeviceActorMsg toDeviceActorMsg = 1;
}
message ToTransportMsg {
DeviceActorToTransportMsg toDeviceSessionMsg = 1;
}
/* Request from Transport Service to ThingsBoard Core Service */
message TransportApiRequestMsg {
ValidateDeviceTokenRequestMsg validateTokenRequestMsg = 1;
ValidateDeviceX509CertRequestMsg validateX509CertRequestMsg = 2;
GetOrCreateDeviceFromGatewayRequestMsg getOrCreateDeviceRequestMsg = 3;
}
/* Response from ThingsBoard Core Service to Transport Service */
message TransportApiResponseMsg {
ValidateDeviceCredentialsResponseMsg validateTokenResponseMsg = 1;
GetOrCreateDeviceFromGatewayResponseMsg getOrCreateDeviceResponseMsg = 2;
}
/* Messages that are handled by ThingsBoard Core Service */
message ToCoreMsg {
TransportToDeviceActorMsg toDeviceActorMsg = 1;
}
/* Messages that are handled by ThingsBoard RuleEngine Service */
message ToRuleEngineMsg {
TransportToRuleEngineMsg toRuleEngineMsg = 1;
}
/* Messages that are handled by ThingsBoard Transport Service */
message ToTransportMsg {
DeviceActorToTransportMsg toDeviceSessionMsg = 1;
}

13
common/transport/transport-api/pom.xml

@ -99,19 +99,6 @@
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
</dependency>
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.xolstice.maven.plugins</groupId>
<artifactId>protobuf-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>

35
common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/queue/KafkaTransportQueueProvider.java

@ -1,35 +0,0 @@
package org.thingsboard.server.common.transport.queue;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
import org.springframework.stereotype.Component;
import org.thingsboard.server.TbQueueConsumer;
import org.thingsboard.server.TbQueueProducer;
import org.thingsboard.server.TbQueueRequestTemplate;
import org.thingsboard.server.common.TbProtoQueueMsg;
import org.thingsboard.server.gen.transport.TransportProtos;
@Component
@ConditionalOnExpression("'${transport.type:null}'=='null' || '${transport.type}'=='local'")
@Slf4j
public class KafkaTransportQueueProvider implements TransportQueueProvider {
@Override
public TbQueueRequestTemplate<TbProtoQueueMsg<TransportProtos.TransportApiRequestMsg>, TbProtoQueueMsg<TransportProtos.TransportApiResponseMsg>> getTransportApiRequestTemplate() {
return null;
}
@Override
public TbQueueProducer<TbProtoQueueMsg<TransportProtos.ToRuleEngineMsg>> getRuleEngineMsgProducer() {
return null;
}
@Override
public TbQueueProducer<TbProtoQueueMsg<TransportProtos.ToRuleEngineMsg>> getTbCoreMsgProducer() {
return null;
}
@Override
public TbQueueConsumer<TbProtoQueueMsg<TransportProtos.ToTransportMsg>> getTransportNotificationsConsumer() {
return null;
}
}

98
common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/DefaultTransportService.java

@ -34,12 +34,15 @@ import org.thingsboard.server.common.msg.tools.TbRateLimitsException;
import org.thingsboard.server.common.transport.SessionMsgListener;
import org.thingsboard.server.common.transport.TransportService;
import org.thingsboard.server.common.transport.TransportServiceCallback;
import org.thingsboard.server.common.transport.queue.TransportQueueProvider;
import org.thingsboard.server.provider.TransportQueueProvider;
import org.thingsboard.server.gen.transport.TransportProtos;
import org.thingsboard.server.gen.transport.TransportProtos.ToCoreMsg;
import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineMsg;
import org.thingsboard.server.gen.transport.TransportProtos.ToTransportMsg;
import org.thingsboard.server.gen.transport.TransportProtos.TransportApiRequestMsg;
import org.thingsboard.server.gen.transport.TransportProtos.TransportApiResponseMsg;
import org.thingsboard.server.gen.transport.TransportProtos.TransportToDeviceActorMsg;
import org.thingsboard.server.gen.transport.TransportProtos.TransportToRuleEngineMsg;
import org.thingsboard.server.common.AsyncCallbackTemplate;
import javax.annotation.PostConstruct;
@ -81,7 +84,7 @@ public class DefaultTransportService implements TransportService {
protected TbQueueRequestTemplate<TbProtoQueueMsg<TransportApiRequestMsg>, TbProtoQueueMsg<TransportApiResponseMsg>> transportApiRequestTemplate;
protected TbQueueProducer<TbProtoQueueMsg<ToRuleEngineMsg>> ruleEngineMsgProducer;
protected TbQueueProducer<TbProtoQueueMsg<ToRuleEngineMsg>> tbCoreMsgProducer;
protected TbQueueProducer<TbProtoQueueMsg<ToCoreMsg>> tbCoreMsgProducer;
protected TbQueueConsumer<TbProtoQueueMsg<ToTransportMsg>> transportNotificationsConsumer;
protected ScheduledExecutorService schedulerExecutor;
@ -188,22 +191,16 @@ public class DefaultTransportService implements TransportService {
if (log.isTraceEnabled()) {
log.trace("[{}] Processing msg: {}", toId(sessionInfo), msg);
}
ToRuleEngineMsg toRuleEngineMsg = ToRuleEngineMsg.newBuilder().setToDeviceActorMsg(
TransportProtos.TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo)
.setSubscriptionInfo(msg).build()
).build();
sendToDeviceActor(sessionInfo, toRuleEngineMsg, callback);
sendToDeviceActor(sessionInfo, TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo)
.setSubscriptionInfo(msg).build(), callback);
}
@Override
public void process(TransportProtos.SessionInfoProto sessionInfo, TransportProtos.SessionEventMsg msg, TransportServiceCallback<Void> callback) {
if (checkLimits(sessionInfo, msg, callback)) {
reportActivityInternal(sessionInfo);
ToRuleEngineMsg toRuleEngineMsg = ToRuleEngineMsg.newBuilder().setToDeviceActorMsg(
TransportProtos.TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo)
.setSessionEvent(msg).build()
).build();
sendToDeviceActor(sessionInfo, toRuleEngineMsg, callback);
sendToDeviceActor(sessionInfo, TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo)
.setSessionEvent(msg).build(), callback);
}
}
@ -211,11 +208,8 @@ public class DefaultTransportService implements TransportService {
public void process(TransportProtos.SessionInfoProto sessionInfo, TransportProtos.PostTelemetryMsg msg, TransportServiceCallback<Void> callback) {
if (checkLimits(sessionInfo, msg, callback)) {
reportActivityInternal(sessionInfo);
ToRuleEngineMsg toRuleEngineMsg = ToRuleEngineMsg.newBuilder().setToDeviceActorMsg(
TransportProtos.TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo)
.setPostTelemetry(msg).build()
).build();
sendToRuleEngine(sessionInfo, toRuleEngineMsg, callback);
sendToRuleEngine(sessionInfo, TransportToRuleEngineMsg.newBuilder().setSessionInfo(sessionInfo).
setPostTelemetry(msg).build(), callback);
}
}
@ -223,11 +217,8 @@ public class DefaultTransportService implements TransportService {
public void process(TransportProtos.SessionInfoProto sessionInfo, TransportProtos.PostAttributeMsg msg, TransportServiceCallback<Void> callback) {
if (checkLimits(sessionInfo, msg, callback)) {
reportActivityInternal(sessionInfo);
ToRuleEngineMsg toRuleEngineMsg = ToRuleEngineMsg.newBuilder().setToDeviceActorMsg(
TransportProtos.TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo)
.setPostAttributes(msg).build()
).build();
sendToRuleEngine(sessionInfo, toRuleEngineMsg, callback);
sendToRuleEngine(sessionInfo, TransportToRuleEngineMsg.newBuilder().setSessionInfo(sessionInfo).
setPostAttributes(msg).build(), callback);
}
}
@ -235,11 +226,8 @@ public class DefaultTransportService implements TransportService {
public void process(TransportProtos.SessionInfoProto sessionInfo, TransportProtos.GetAttributeRequestMsg msg, TransportServiceCallback<Void> callback) {
if (checkLimits(sessionInfo, msg, callback)) {
reportActivityInternal(sessionInfo);
ToRuleEngineMsg toRuleEngineMsg = ToRuleEngineMsg.newBuilder().setToDeviceActorMsg(
TransportProtos.TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo)
.setGetAttributes(msg).build()
).build();
sendToDeviceActor(sessionInfo, toRuleEngineMsg, callback);
sendToDeviceActor(sessionInfo, TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo)
.setGetAttributes(msg).build(), callback);
}
}
@ -248,11 +236,8 @@ public class DefaultTransportService implements TransportService {
if (checkLimits(sessionInfo, msg, callback)) {
SessionMetaData sessionMetaData = reportActivityInternal(sessionInfo);
sessionMetaData.setSubscribedToAttributes(!msg.getUnsubscribe());
ToRuleEngineMsg toRuleEngineMsg = ToRuleEngineMsg.newBuilder().setToDeviceActorMsg(
TransportProtos.TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo)
.setSubscribeToAttributes(msg).build()
).build();
sendToDeviceActor(sessionInfo, toRuleEngineMsg, callback);
sendToDeviceActor(sessionInfo, TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo)
.setSubscribeToAttributes(msg).build(), callback);
}
}
@ -261,11 +246,8 @@ public class DefaultTransportService implements TransportService {
if (checkLimits(sessionInfo, msg, callback)) {
SessionMetaData sessionMetaData = reportActivityInternal(sessionInfo);
sessionMetaData.setSubscribedToRPC(!msg.getUnsubscribe());
ToRuleEngineMsg toRuleEngineMsg = ToRuleEngineMsg.newBuilder().setToDeviceActorMsg(
TransportProtos.TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo)
.setSubscribeToRPC(msg).build()
).build();
sendToDeviceActor(sessionInfo, toRuleEngineMsg, callback);
sendToDeviceActor(sessionInfo, TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo)
.setSubscribeToRPC(msg).build(), callback);
}
}
@ -273,11 +255,8 @@ public class DefaultTransportService implements TransportService {
public void process(TransportProtos.SessionInfoProto sessionInfo, TransportProtos.ToDeviceRpcResponseMsg msg, TransportServiceCallback<Void> callback) {
if (checkLimits(sessionInfo, msg, callback)) {
reportActivityInternal(sessionInfo);
ToRuleEngineMsg toRuleEngineMsg = ToRuleEngineMsg.newBuilder().setToDeviceActorMsg(
TransportProtos.TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo)
.setToDeviceRPCCallResponse(msg).build()
).build();
sendToDeviceActor(sessionInfo, toRuleEngineMsg, callback);
sendToDeviceActor(sessionInfo, TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo)
.setToDeviceRPCCallResponse(msg).build(), callback);
}
}
@ -286,23 +265,16 @@ public class DefaultTransportService implements TransportService {
public void process(TransportProtos.SessionInfoProto sessionInfo, TransportProtos.ToServerRpcRequestMsg msg, TransportServiceCallback<Void> callback) {
if (checkLimits(sessionInfo, msg, callback)) {
reportActivityInternal(sessionInfo);
ToRuleEngineMsg toRuleEngineMsg = ToRuleEngineMsg.newBuilder().setToDeviceActorMsg(
TransportProtos.TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo)
.setToServerRPCCallRequest(msg).build()
).build();
sendToRuleEngine(sessionInfo, toRuleEngineMsg, callback);
sendToRuleEngine(sessionInfo, TransportToRuleEngineMsg.newBuilder().setSessionInfo(sessionInfo).
setToServerRPCCallRequest(msg).build(), callback);
}
}
@Override
public void process(TransportProtos.SessionInfoProto sessionInfo, TransportProtos.ClaimDeviceMsg msg,
TransportServiceCallback<Void> callback) {
public void process(TransportProtos.SessionInfoProto sessionInfo, TransportProtos.ClaimDeviceMsg msg, TransportServiceCallback<Void> callback) {
if (checkLimits(sessionInfo, msg, callback)) {
ToRuleEngineMsg toRuleEngineMsg = ToRuleEngineMsg.newBuilder().setToDeviceActorMsg(
TransportProtos.TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo)
.setClaimDevice(msg).build()
).build();
sendToDeviceActor(sessionInfo, toRuleEngineMsg, callback);
sendToDeviceActor(sessionInfo, TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo)
.setClaimDevice(msg).build(), callback);
}
}
@ -454,13 +426,15 @@ public class DefaultTransportService implements TransportService {
.setEvent(event).build();
}
protected void sendToDeviceActor(TransportProtos.SessionInfoProto sessionInfo, ToRuleEngineMsg toRuleEngineMsg, TransportServiceCallback<Void> callback) {
tbCoreMsgProducer.send(new TbProtoQueueMsg<>(getRoutingKey(sessionInfo), toRuleEngineMsg), callback != null ?
protected void sendToDeviceActor(TransportProtos.SessionInfoProto sessionInfo, TransportToDeviceActorMsg toDeviceActorMsg, TransportServiceCallback<Void> callback) {
tbCoreMsgProducer.send(new TbProtoQueueMsg<>(getRoutingKey(sessionInfo),
ToCoreMsg.newBuilder().setToDeviceActorMsg(toDeviceActorMsg).build()), callback != null ?
new TransportTbQueueCallback(callback) : null);
}
protected void sendToRuleEngine(TransportProtos.SessionInfoProto sessionInfo, ToRuleEngineMsg toRuleEngineMsg, TransportServiceCallback<Void> callback) {
ruleEngineMsgProducer.send(new TbProtoQueueMsg<>(getRoutingKey(sessionInfo), toRuleEngineMsg), callback != null ?
protected void sendToRuleEngine(TransportProtos.SessionInfoProto sessionInfo, TransportToRuleEngineMsg toRuleEngineMsg, TransportServiceCallback<Void> callback) {
ruleEngineMsgProducer.send(new TbProtoQueueMsg<>(getRoutingKey(sessionInfo),
ToRuleEngineMsg.newBuilder().setToRuleEngineMsg(toRuleEngineMsg).build()), callback != null ?
new TransportTbQueueCallback(callback) : null);
}
@ -473,16 +447,12 @@ public class DefaultTransportService implements TransportService {
@Override
public void onSuccess(TbQueueMsgMetadata metadata) {
DefaultTransportService.this.transportCallbackExecutor.submit(() -> {
callback.onSuccess(null);
});
DefaultTransportService.this.transportCallbackExecutor.submit(() -> callback.onSuccess(null));
}
@Override
public void onFailure(Throwable t) {
DefaultTransportService.this.transportCallbackExecutor.submit(() -> {
callback.onError(t);
});
DefaultTransportService.this.transportCallbackExecutor.submit(() -> callback.onError(t));
}
}
}

Loading…
Cancel
Save