diff --git a/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbCoreConsumerService.java b/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbCoreConsumerService.java new file mode 100644 index 0000000000..d919857318 --- /dev/null +++ b/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbCoreConsumerService.java @@ -0,0 +1,126 @@ +package org.thingsboard.server.service.queue; + +import akka.actor.ActorRef; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression; +import org.springframework.boot.context.event.ApplicationReadyEvent; +import org.springframework.context.event.EventListener; +import org.springframework.scheduling.annotation.Scheduled; +import org.springframework.stereotype.Service; +import org.thingsboard.common.util.ThingsBoardThreadFactory; +import org.thingsboard.server.TbQueueConsumer; +import org.thingsboard.server.actors.ActorSystemContext; +import org.thingsboard.server.common.TbProtoQueueMsg; +import org.thingsboard.server.gen.transport.TransportProtos.ToCoreMsg; +import org.thingsboard.server.gen.transport.TransportProtos.TransportToDeviceActorMsg; +import org.thingsboard.server.provider.TbCoreQueueProvider; +import org.thingsboard.server.service.transport.RuleEngineStats; +import org.thingsboard.server.service.transport.msg.TransportToDeviceActorMsgWrapper; + +import javax.annotation.PostConstruct; +import javax.annotation.PreDestroy; +import java.util.List; +import java.util.UUID; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.function.Function; +import java.util.stream.Collectors; + +@Service +@ConditionalOnExpression("'${service.type:null}'=='monolith' || '${service.type:null}'=='tb-core')") +@Slf4j +public class DefaultTbCoreConsumerService implements TbCoreConsumerService { + + @Value("${queue.core.poll_interval}") + private long pollDuration; + @Value("${queue.core.pack_processing_timeout}") + private long packProcessingTimeout; + @Value("${queue.core.stats.enabled:false}") + private boolean statsEnabled; + + private final ActorSystemContext actorContext; + private final TbQueueConsumer> consumer; + private final RuleEngineStats stats = new RuleEngineStats(); + private volatile ExecutorService mainConsumerExecutor; + private volatile boolean stopped = false; + + public DefaultTbCoreConsumerService(TbCoreQueueProvider tbCoreQueueProvider, ActorSystemContext actorContext) { + this.consumer = tbCoreQueueProvider.getToCoreMsgConsumer(); + this.actorContext = actorContext; + } + + @PostConstruct + public void init() { + this.consumer.subscribe(); + this.mainConsumerExecutor = Executors.newSingleThreadExecutor(ThingsBoardThreadFactory.forName("tb-core-consumer")); + } + + @EventListener(ApplicationReadyEvent.class) + public void onApplicationEvent(ApplicationReadyEvent event) { + mainConsumerExecutor.execute(() -> { + while (!stopped) { + try { + List> msgs = consumer.poll(pollDuration); + ConcurrentMap> ackMap = msgs.stream().collect( + Collectors.toConcurrentMap(s -> UUID.randomUUID(), Function.identity())); + CountDownLatch processingTimeoutLatch = new CountDownLatch(1); + ackMap.forEach((id, msg) -> { + TbMsgCallback callback = new MsgPackCallback<>(id, processingTimeoutLatch, ackMap); + try { + ToCoreMsg toRuleEngineMsg = msg.getValue(); + log.trace("Forwarding message to rule engine {}", toRuleEngineMsg); + if (toRuleEngineMsg.hasToDeviceActorMsg()) { + forwardToDeviceActor(toRuleEngineMsg.getToDeviceActorMsg(), callback); + } else { + callback.onSuccess(); + } + } catch (Throwable e) { + callback.onFailure(e); + } + }); + if (!processingTimeoutLatch.await(packProcessingTimeout, TimeUnit.MILLISECONDS)) { + ackMap.forEach((id, msg) -> log.warn("[{}] Timeout to process message: {}", id, msg.getValue())); + } + consumer.commit(); + } catch (Exception e) { + log.warn("Failed to obtain messages from queue.", e); + try { + Thread.sleep(pollDuration); + } catch (InterruptedException e2) { + log.trace("Failed to wait until the server has capacity to handle new requests", e2); + } + } + } + }); + } + + private void forwardToDeviceActor(TransportToDeviceActorMsg toDeviceActorMsg, TbMsgCallback callback) { + if (statsEnabled) { + stats.log(toDeviceActorMsg); + } + actorContext.getAppActor().tell(new TransportToDeviceActorMsgWrapper(toDeviceActorMsg, callback), ActorRef.noSender()); + } + + @Scheduled(fixedDelayString = "${queue.core.stats.print_interval_ms}") + public void printStats() { + if (statsEnabled) { + stats.printStats(); + } + } + + @PreDestroy + public void destroy() { + stopped = true; + if (consumer != null) { + consumer.unsubscribe(); + } + if (mainConsumerExecutor != null) { + mainConsumerExecutor.shutdownNow(); + } + } + +} diff --git a/application/src/main/java/org/thingsboard/server/service/queue/MsgPackCallback.java b/application/src/main/java/org/thingsboard/server/service/queue/MsgPackCallback.java new file mode 100644 index 0000000000..c700c18dc0 --- /dev/null +++ b/application/src/main/java/org/thingsboard/server/service/queue/MsgPackCallback.java @@ -0,0 +1,37 @@ +package org.thingsboard.server.service.queue; + +import lombok.extern.slf4j.Slf4j; +import org.thingsboard.server.common.TbProtoQueueMsg; + +import java.util.UUID; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.CountDownLatch; + +@Slf4j +public class MsgPackCallback implements TbMsgCallback { + private final CountDownLatch processingTimeoutLatch; + private final ConcurrentMap> ackMap; + private final UUID id; + + public MsgPackCallback(UUID id, CountDownLatch processingTimeoutLatch, ConcurrentMap> ackMap) { + this.id = id; + this.processingTimeoutLatch = processingTimeoutLatch; + this.ackMap = ackMap; + } + + @Override + public void onSuccess() { + if (ackMap.remove(id) != null && ackMap.isEmpty()) { + processingTimeoutLatch.countDown(); + } + } + + @Override + public void onFailure(Throwable t) { + TbProtoQueueMsg message = ackMap.remove(id); + log.warn("Failed to process message: {}", message.getValue(), t); + if (ackMap.isEmpty()) { + processingTimeoutLatch.countDown(); + } + } +} diff --git a/application/src/main/java/org/thingsboard/server/service/queue/TbCoreConsumerService.java b/application/src/main/java/org/thingsboard/server/service/queue/TbCoreConsumerService.java new file mode 100644 index 0000000000..0541de7c63 --- /dev/null +++ b/application/src/main/java/org/thingsboard/server/service/queue/TbCoreConsumerService.java @@ -0,0 +1,9 @@ +package org.thingsboard.server.service.queue; + +import org.thingsboard.server.gen.transport.TransportProtos; + +import java.util.function.Consumer; + +public interface TbCoreConsumerService { + +} diff --git a/application/src/main/java/org/thingsboard/server/service/queue/TbMsgCallback.java b/application/src/main/java/org/thingsboard/server/service/queue/TbMsgCallback.java new file mode 100644 index 0000000000..44dd058e1e --- /dev/null +++ b/application/src/main/java/org/thingsboard/server/service/queue/TbMsgCallback.java @@ -0,0 +1,9 @@ +package org.thingsboard.server.service.queue; + +public interface TbMsgCallback { + + void onSuccess(); + + void onFailure(Throwable t); + +} diff --git a/application/src/main/java/org/thingsboard/server/service/transport/DefaultTbCoreToTransportService.java b/application/src/main/java/org/thingsboard/server/service/transport/DefaultTbCoreToTransportService.java new file mode 100644 index 0000000000..c1b0608d44 --- /dev/null +++ b/application/src/main/java/org/thingsboard/server/service/transport/DefaultTbCoreToTransportService.java @@ -0,0 +1,72 @@ +package org.thingsboard.server.service.transport; + +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression; +import org.springframework.stereotype.Service; +import org.thingsboard.server.TbQueueCallback; +import org.thingsboard.server.TbQueueMsgMetadata; +import org.thingsboard.server.TbQueueProducer; +import org.thingsboard.server.common.TbProtoQueueMsg; +import org.thingsboard.server.gen.transport.TransportProtos.DeviceActorToTransportMsg; +import org.thingsboard.server.gen.transport.TransportProtos.ToTransportMsg; +import org.thingsboard.server.provider.TbCoreQueueProvider; + +import java.util.UUID; +import java.util.function.Consumer; + +@Slf4j +@Service +@ConditionalOnExpression("'${service.type:null}'=='monolith' || '${service.type:null}'=='tb-core')") +public class DefaultTbCoreToTransportService implements TbCoreToTransportService { + + private final TbCoreQueueProvider tbCoreQueueProvider; + private final TbQueueProducer> tbTransportProducer; + + @Value("${queue.notifications.topic}") + private String notificationsTopic; + + public DefaultTbCoreToTransportService(TbCoreQueueProvider tbCoreQueueProvider) { + this.tbCoreQueueProvider = tbCoreQueueProvider; + this.tbTransportProducer = tbCoreQueueProvider.getTransportMsgProducer(); + } + + @Override + public void process(String nodeId, DeviceActorToTransportMsg msg) { + process(nodeId, msg, null, null); + } + + @Override + public void process(String nodeId, DeviceActorToTransportMsg msg, Runnable onSuccess, Consumer onFailure) { + String topic = notificationsTopic + "." + nodeId; + UUID sessionId = new UUID(msg.getSessionIdMSB(), msg.getSessionIdLSB()); + ToTransportMsg transportMsg = ToTransportMsg.newBuilder().setToDeviceSessionMsg(msg).build(); + log.trace("[{}][{}] Pushing session data to topic: {}", topic, sessionId, transportMsg); + TbProtoQueueMsg queueMsg = new TbProtoQueueMsg<>(sessionId, transportMsg); + tbTransportProducer.send(topic, queueMsg, new QueueCallbackAdaptor(onSuccess, onFailure)); + } + + private static class QueueCallbackAdaptor implements TbQueueCallback { + private final Runnable onSuccess; + private final Consumer onFailure; + + QueueCallbackAdaptor(Runnable onSuccess, Consumer onFailure) { + this.onSuccess = onSuccess; + this.onFailure = onFailure; + } + + @Override + public void onSuccess(TbQueueMsgMetadata metadata) { + if (onSuccess != null) { + onSuccess.run(); + } + } + + @Override + public void onFailure(Throwable t) { + if (onFailure != null) { + onFailure.accept(t); + } + } + } +} diff --git a/application/src/main/java/org/thingsboard/server/service/transport/RemoteTransportApiService.java b/application/src/main/java/org/thingsboard/server/service/transport/RemoteTransportApiService.java index 644d4191b1..354252642f 100644 --- a/application/src/main/java/org/thingsboard/server/service/transport/RemoteTransportApiService.java +++ b/application/src/main/java/org/thingsboard/server/service/transport/RemoteTransportApiService.java @@ -22,6 +22,7 @@ import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.boot.context.event.ApplicationReadyEvent; import org.springframework.context.event.EventListener; import org.springframework.stereotype.Component; +import org.springframework.stereotype.Service; import org.thingsboard.server.TbQueueConsumer; import org.thingsboard.server.TbQueueProducer; import org.thingsboard.server.TbQueueRequestTemplate; @@ -42,7 +43,7 @@ import java.util.concurrent.*; * Created by ashvayka on 05.10.18. */ @Slf4j -@Component +@Service @ConditionalOnProperty(prefix = "transport", value = "type", havingValue = "remote") public class RemoteTransportApiService { diff --git a/application/src/main/java/org/thingsboard/server/service/transport/RuleEngineTransportService.java b/application/src/main/java/org/thingsboard/server/service/transport/RuleEngineTransportService.java deleted file mode 100644 index 0d401e4d9c..0000000000 --- a/application/src/main/java/org/thingsboard/server/service/transport/RuleEngineTransportService.java +++ /dev/null @@ -1,31 +0,0 @@ -/** - * Copyright © 2016-2020 The Thingsboard Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.thingsboard.server.service.transport; - -import org.thingsboard.server.gen.transport.TransportProtos.DeviceActorToTransportMsg; - -import java.util.function.Consumer; - -/** - * Created by ashvayka on 05.10.18. - */ -public interface RuleEngineTransportService { - - void process(String nodeId, DeviceActorToTransportMsg msg); - - void process(String nodeId, DeviceActorToTransportMsg msg, Runnable onSuccess, Consumer onFailure); - -} diff --git a/application/src/main/java/org/thingsboard/server/service/transport/TbCoreToTransportService.java b/application/src/main/java/org/thingsboard/server/service/transport/TbCoreToTransportService.java new file mode 100644 index 0000000000..3324f02440 --- /dev/null +++ b/application/src/main/java/org/thingsboard/server/service/transport/TbCoreToTransportService.java @@ -0,0 +1,13 @@ +package org.thingsboard.server.service.transport; + +import org.thingsboard.server.gen.transport.TransportProtos; + +import java.util.function.Consumer; + +public interface TbCoreToTransportService { + + void process(String nodeId, TransportProtos.DeviceActorToTransportMsg msg); + + void process(String nodeId, TransportProtos.DeviceActorToTransportMsg msg, Runnable onSuccess, Consumer onFailure); + +} diff --git a/application/src/main/java/org/thingsboard/server/service/transport/msg/TransportToDeviceActorMsgWrapper.java b/application/src/main/java/org/thingsboard/server/service/transport/msg/TransportToDeviceActorMsgWrapper.java index 34abac72bb..880d7507ce 100644 --- a/application/src/main/java/org/thingsboard/server/service/transport/msg/TransportToDeviceActorMsgWrapper.java +++ b/application/src/main/java/org/thingsboard/server/service/transport/msg/TransportToDeviceActorMsgWrapper.java @@ -23,6 +23,7 @@ import org.thingsboard.server.common.msg.TbActorMsg; import org.thingsboard.server.common.msg.aware.DeviceAwareMsg; import org.thingsboard.server.common.msg.aware.TenantAwareMsg; import org.thingsboard.server.gen.transport.TransportProtos.TransportToDeviceActorMsg; +import org.thingsboard.server.service.queue.TbMsgCallback; import java.io.Serializable; import java.util.UUID; @@ -36,9 +37,11 @@ public class TransportToDeviceActorMsgWrapper implements TbActorMsg, DeviceAware private final TenantId tenantId; private final DeviceId deviceId; private final TransportToDeviceActorMsg msg; + private final TbMsgCallback callback; - public TransportToDeviceActorMsgWrapper(TransportToDeviceActorMsg msg) { + public TransportToDeviceActorMsgWrapper(TransportToDeviceActorMsg msg, TbMsgCallback callback) { this.msg = msg; + this.callback = callback; this.tenantId = new TenantId(new UUID(msg.getSessionInfo().getTenantIdMSB(), msg.getSessionInfo().getTenantIdLSB())); this.deviceId = new DeviceId(new UUID(msg.getSessionInfo().getDeviceIdMSB(), msg.getSessionInfo().getDeviceIdLSB())); } diff --git a/application/src/main/resources/thingsboard.yml b/application/src/main/resources/thingsboard.yml index dc479191e7..2ded608203 100644 --- a/application/src/main/resources/thingsboard.yml +++ b/application/src/main/resources/thingsboard.yml @@ -466,7 +466,6 @@ js: print_interval_ms: "${TB_JS_REMOTE_STATS_PRINT_INTERVAL_MS:10000}" transport: - type: "${TRANSPORT_TYPE:local}" # local or remote sessions: inactivity_timeout: "${TB_TRANSPORT_SESSIONS_INACTIVITY_TIMEOUT:300000}" report_timeout: "${TB_TRANSPORT_SESSIONS_REPORT_TIMEOUT:30000}" @@ -533,7 +532,6 @@ swagger: url: "${SWAGGER_LICENSE_URL:https://github.com/thingsboard/thingsboard/blob/master/LICENSE}" version: "${SWAGGER_VERSION:2.0}" - queue: type: "${TB_QUEUE_TYPE:kafka}" kafka: @@ -555,6 +553,10 @@ queue: topic: "${TB_QUEUE_CORE_TOPIC:tb.core}" poll_interval: "${TB_QUEUE_CORE_POLL_INTERVAL_MS:25}" partitions: "${TB_QUEUE_CORE_PARTITIONS:100}" + pack_processing_timeout: "${TB_QUEUE_CORE_PACK_PROCESSING_TIMEOUT_MS:60000}" + stats: + enabled: "${TB_QUEUE_CORE_STATS_ENABLED:false}" + print_interval_ms: "${TB_QUEUE_CORE_STATS_PRINT_INTERVAL_MS:10000}" rule_engine: topic: "${TB_QUEUE_RULE_ENGINE_TOPIC:tb.rule-engine}" poll_interval: "${TB_QUEUE_RULE_ENGINE_POLL_INTERVAL_MS:25}" @@ -565,5 +567,5 @@ queue: notifications: topic: "${TB_QUEUE_TRANSPORT_NOTIFICATIONS_TOPIC:tb.transport.notifications}" -rule_engine: - type: "${RULE_ENGINE_TYPE:local}" # local or remote \ No newline at end of file +service: + type: "${TB_SERVICE_TYPE:monolith}" # monolith or tb-core or tb-rule-engine or tb-transport \ No newline at end of file diff --git a/common/queue/src/main/java/org/thingsboard/server/provider/KafkaTbCoreQueueProvider.java b/common/queue/src/main/java/org/thingsboard/server/provider/KafkaTbCoreQueueProvider.java index eac81b80c3..2f9a9ad8f8 100644 --- a/common/queue/src/main/java/org/thingsboard/server/provider/KafkaTbCoreQueueProvider.java +++ b/common/queue/src/main/java/org/thingsboard/server/provider/KafkaTbCoreQueueProvider.java @@ -8,7 +8,7 @@ import org.thingsboard.server.common.TbProtoQueueMsg; import org.thingsboard.server.gen.transport.TransportProtos; @Component -@ConditionalOnExpression("'${transport.type:null}'=='local' || '${transport.type:null}'=='remote'") +@ConditionalOnExpression("'${service.type:null}'=='monolith' || '${service.type:null}'=='tb-core') && ${queue.type:null}'=='kafka'") public class KafkaTbCoreQueueProvider implements TbCoreQueueProvider{ @Override public TbQueueProducer> getTransportMsgProducer() { diff --git a/common/queue/src/main/java/org/thingsboard/server/provider/KafkaTransportQueueProvider.java b/common/queue/src/main/java/org/thingsboard/server/provider/KafkaTransportQueueProvider.java index 6686b3704f..945888a45a 100644 --- a/common/queue/src/main/java/org/thingsboard/server/provider/KafkaTransportQueueProvider.java +++ b/common/queue/src/main/java/org/thingsboard/server/provider/KafkaTransportQueueProvider.java @@ -14,7 +14,7 @@ import org.thingsboard.server.gen.transport.TransportProtos.TransportApiRequestM import org.thingsboard.server.gen.transport.TransportProtos.TransportApiResponseMsg; @Component -@ConditionalOnExpression("'${transport.type:null}'=='null' || '${transport.type}'=='local'") +@ConditionalOnExpression("'${service.type:null}'=='monolith' || '${service.type:null}'=='tb-transport') && ${queue.type:null}'=='kafka'") @Slf4j public class KafkaTransportQueueProvider implements TransportQueueProvider { @Override diff --git a/transport/mqtt/src/main/resources/tb-mqtt-transport.yml b/transport/mqtt/src/main/resources/tb-mqtt-transport.yml index 741adc6795..721db2605d 100644 --- a/transport/mqtt/src/main/resources/tb-mqtt-transport.yml +++ b/transport/mqtt/src/main/resources/tb-mqtt-transport.yml @@ -62,6 +62,9 @@ transport: # Maximum allowed string value length when processing Telemetry/Attributes JSON (0 value disables string value length check) max_string_value_length: "${JSON_MAX_STRING_VALUE_LENGTH:0}" +service: + type: "${TB_SERVICE_TYPE:tb-transport}" # monolith or tb-core or tb-rule-engine or tb-transport + queue: type: "${TB_QUEUE_TYPE:kafka}" kafka: