|
|
|
@ -55,6 +55,9 @@ import org.thingsboard.server.queue.discovery.PartitionService; |
|
|
|
import org.thingsboard.server.queue.discovery.TbServiceInfoProvider; |
|
|
|
import org.thingsboard.server.queue.provider.TbQueueProducerProvider; |
|
|
|
import org.thingsboard.server.queue.provider.TbTransportQueueFactory; |
|
|
|
import org.thingsboard.server.common.msg.stats.MessagesStats; |
|
|
|
import org.thingsboard.server.common.msg.stats.StatsFactory; |
|
|
|
import org.thingsboard.server.common.msg.stats.StatsType; |
|
|
|
|
|
|
|
import javax.annotation.PostConstruct; |
|
|
|
import javax.annotation.PreDestroy; |
|
|
|
@ -101,12 +104,17 @@ public class DefaultTransportService implements TransportService { |
|
|
|
private final TbQueueProducerProvider producerProvider; |
|
|
|
private final PartitionService partitionService; |
|
|
|
private final TbServiceInfoProvider serviceInfoProvider; |
|
|
|
private final StatsFactory statsFactory; |
|
|
|
|
|
|
|
protected TbQueueRequestTemplate<TbProtoQueueMsg<TransportApiRequestMsg>, TbProtoQueueMsg<TransportApiResponseMsg>> transportApiRequestTemplate; |
|
|
|
protected TbQueueProducer<TbProtoQueueMsg<ToRuleEngineMsg>> ruleEngineMsgProducer; |
|
|
|
protected TbQueueProducer<TbProtoQueueMsg<ToCoreMsg>> tbCoreMsgProducer; |
|
|
|
protected TbQueueConsumer<TbProtoQueueMsg<ToTransportMsg>> transportNotificationsConsumer; |
|
|
|
|
|
|
|
protected MessagesStats ruleEngineProducerStats; |
|
|
|
protected MessagesStats tbCoreProducerStats; |
|
|
|
protected MessagesStats transportApiStats; |
|
|
|
|
|
|
|
protected ScheduledExecutorService schedulerExecutor; |
|
|
|
protected ExecutorService transportCallbackExecutor; |
|
|
|
|
|
|
|
@ -119,11 +127,12 @@ public class DefaultTransportService implements TransportService { |
|
|
|
private ExecutorService mainConsumerExecutor = Executors.newSingleThreadExecutor(ThingsBoardThreadFactory.forName("transport-consumer")); |
|
|
|
private volatile boolean stopped = false; |
|
|
|
|
|
|
|
public DefaultTransportService(TbServiceInfoProvider serviceInfoProvider, TbTransportQueueFactory queueProvider, TbQueueProducerProvider producerProvider, PartitionService partitionService) { |
|
|
|
public DefaultTransportService(TbServiceInfoProvider serviceInfoProvider, TbTransportQueueFactory queueProvider, TbQueueProducerProvider producerProvider, PartitionService partitionService, StatsFactory statsFactory) { |
|
|
|
this.serviceInfoProvider = serviceInfoProvider; |
|
|
|
this.queueProvider = queueProvider; |
|
|
|
this.producerProvider = producerProvider; |
|
|
|
this.partitionService = partitionService; |
|
|
|
this.statsFactory = statsFactory; |
|
|
|
} |
|
|
|
|
|
|
|
@PostConstruct |
|
|
|
@ -133,10 +142,14 @@ public class DefaultTransportService implements TransportService { |
|
|
|
new TbRateLimits(perTenantLimitsConf); |
|
|
|
new TbRateLimits(perDevicesLimitsConf); |
|
|
|
} |
|
|
|
this.ruleEngineProducerStats = statsFactory.createMessagesStats(StatsType.RULE_ENGINE.getName() + ".producer"); |
|
|
|
this.tbCoreProducerStats = statsFactory.createMessagesStats(StatsType.CORE.getName() + ".producer"); |
|
|
|
this.transportApiStats = statsFactory.createMessagesStats(StatsType.TRANSPORT.getName() + ".producer"); |
|
|
|
this.schedulerExecutor = Executors.newSingleThreadScheduledExecutor(ThingsBoardThreadFactory.forName("transport-scheduler")); |
|
|
|
this.transportCallbackExecutor = Executors.newWorkStealingPool(20); |
|
|
|
this.schedulerExecutor.scheduleAtFixedRate(this::checkInactivityAndReportActivity, new Random().nextInt((int) sessionReportTimeout), sessionReportTimeout, TimeUnit.MILLISECONDS); |
|
|
|
transportApiRequestTemplate = queueProvider.createTransportApiRequestTemplate(); |
|
|
|
transportApiRequestTemplate.setMessagesStats(transportApiStats); |
|
|
|
ruleEngineMsgProducer = producerProvider.getRuleEngineMsgProducer(); |
|
|
|
tbCoreMsgProducer = producerProvider.getTbCoreMsgProducer(); |
|
|
|
transportNotificationsConsumer = queueProvider.createTransportNotificationsConsumer(); |
|
|
|
@ -557,10 +570,14 @@ public class DefaultTransportService implements TransportService { |
|
|
|
if (log.isTraceEnabled()) { |
|
|
|
log.trace("[{}][{}] Pushing to topic {} message {}", getTenantId(sessionInfo), getDeviceId(sessionInfo), tpi.getFullTopicName(), toDeviceActorMsg); |
|
|
|
} |
|
|
|
TransportTbQueueCallback transportTbQueueCallback = callback != null ? |
|
|
|
new TransportTbQueueCallback(callback) : null; |
|
|
|
tbCoreProducerStats.incrementTotal(); |
|
|
|
StatsCallback wrappedCallback = new StatsCallback(transportTbQueueCallback, tbCoreProducerStats); |
|
|
|
tbCoreMsgProducer.send(tpi, |
|
|
|
new TbProtoQueueMsg<>(getRoutingKey(sessionInfo), |
|
|
|
ToCoreMsg.newBuilder().setToDeviceActorMsg(toDeviceActorMsg).build()), callback != null ? |
|
|
|
new TransportTbQueueCallback(callback) : null); |
|
|
|
ToCoreMsg.newBuilder().setToDeviceActorMsg(toDeviceActorMsg).build()), |
|
|
|
wrappedCallback); |
|
|
|
} |
|
|
|
|
|
|
|
protected void sendToRuleEngine(TenantId tenantId, TbMsg tbMsg, TbQueueCallback callback) { |
|
|
|
@ -571,7 +588,9 @@ public class DefaultTransportService implements TransportService { |
|
|
|
ToRuleEngineMsg msg = ToRuleEngineMsg.newBuilder().setTbMsg(TbMsg.toByteString(tbMsg)) |
|
|
|
.setTenantIdMSB(tenantId.getId().getMostSignificantBits()) |
|
|
|
.setTenantIdLSB(tenantId.getId().getLeastSignificantBits()).build(); |
|
|
|
ruleEngineMsgProducer.send(tpi, new TbProtoQueueMsg<>(tbMsg.getId(), msg), callback); |
|
|
|
ruleEngineProducerStats.incrementTotal(); |
|
|
|
StatsCallback wrappedCallback = new StatsCallback(callback, ruleEngineProducerStats); |
|
|
|
ruleEngineMsgProducer.send(tpi, new TbProtoQueueMsg<>(tbMsg.getId(), msg), wrappedCallback); |
|
|
|
} |
|
|
|
|
|
|
|
private class TransportTbQueueCallback implements TbQueueCallback { |
|
|
|
@ -592,6 +611,30 @@ public class DefaultTransportService implements TransportService { |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
private class StatsCallback implements TbQueueCallback { |
|
|
|
private final TbQueueCallback callback; |
|
|
|
private final MessagesStats stats; |
|
|
|
|
|
|
|
private StatsCallback(TbQueueCallback callback, MessagesStats stats) { |
|
|
|
this.callback = callback; |
|
|
|
this.stats = stats; |
|
|
|
} |
|
|
|
|
|
|
|
@Override |
|
|
|
public void onSuccess(TbQueueMsgMetadata metadata) { |
|
|
|
stats.incrementSuccessful(); |
|
|
|
if (callback != null) |
|
|
|
callback.onSuccess(metadata); |
|
|
|
} |
|
|
|
|
|
|
|
@Override |
|
|
|
public void onFailure(Throwable t) { |
|
|
|
stats.incrementFailed(); |
|
|
|
if (callback != null) |
|
|
|
callback.onFailure(t); |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
private class MsgPackCallback implements TbQueueCallback { |
|
|
|
private final AtomicInteger msgCount; |
|
|
|
private final TransportServiceCallback<Void> callback; |
|
|
|
|