13 changed files with 283 additions and 39 deletions
@ -0,0 +1,126 @@ |
|||
package org.thingsboard.server.service.queue; |
|||
|
|||
import akka.actor.ActorRef; |
|||
import lombok.extern.slf4j.Slf4j; |
|||
import org.springframework.beans.factory.annotation.Value; |
|||
import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression; |
|||
import org.springframework.boot.context.event.ApplicationReadyEvent; |
|||
import org.springframework.context.event.EventListener; |
|||
import org.springframework.scheduling.annotation.Scheduled; |
|||
import org.springframework.stereotype.Service; |
|||
import org.thingsboard.common.util.ThingsBoardThreadFactory; |
|||
import org.thingsboard.server.TbQueueConsumer; |
|||
import org.thingsboard.server.actors.ActorSystemContext; |
|||
import org.thingsboard.server.common.TbProtoQueueMsg; |
|||
import org.thingsboard.server.gen.transport.TransportProtos.ToCoreMsg; |
|||
import org.thingsboard.server.gen.transport.TransportProtos.TransportToDeviceActorMsg; |
|||
import org.thingsboard.server.provider.TbCoreQueueProvider; |
|||
import org.thingsboard.server.service.transport.RuleEngineStats; |
|||
import org.thingsboard.server.service.transport.msg.TransportToDeviceActorMsgWrapper; |
|||
|
|||
import javax.annotation.PostConstruct; |
|||
import javax.annotation.PreDestroy; |
|||
import java.util.List; |
|||
import java.util.UUID; |
|||
import java.util.concurrent.ConcurrentMap; |
|||
import java.util.concurrent.CountDownLatch; |
|||
import java.util.concurrent.ExecutorService; |
|||
import java.util.concurrent.Executors; |
|||
import java.util.concurrent.TimeUnit; |
|||
import java.util.function.Function; |
|||
import java.util.stream.Collectors; |
|||
|
|||
@Service |
|||
@ConditionalOnExpression("'${service.type:null}'=='monolith' || '${service.type:null}'=='tb-core')") |
|||
@Slf4j |
|||
public class DefaultTbCoreConsumerService implements TbCoreConsumerService { |
|||
|
|||
@Value("${queue.core.poll_interval}") |
|||
private long pollDuration; |
|||
@Value("${queue.core.pack_processing_timeout}") |
|||
private long packProcessingTimeout; |
|||
@Value("${queue.core.stats.enabled:false}") |
|||
private boolean statsEnabled; |
|||
|
|||
private final ActorSystemContext actorContext; |
|||
private final TbQueueConsumer<TbProtoQueueMsg<ToCoreMsg>> consumer; |
|||
private final RuleEngineStats stats = new RuleEngineStats(); |
|||
private volatile ExecutorService mainConsumerExecutor; |
|||
private volatile boolean stopped = false; |
|||
|
|||
public DefaultTbCoreConsumerService(TbCoreQueueProvider tbCoreQueueProvider, ActorSystemContext actorContext) { |
|||
this.consumer = tbCoreQueueProvider.getToCoreMsgConsumer(); |
|||
this.actorContext = actorContext; |
|||
} |
|||
|
|||
@PostConstruct |
|||
public void init() { |
|||
this.consumer.subscribe(); |
|||
this.mainConsumerExecutor = Executors.newSingleThreadExecutor(ThingsBoardThreadFactory.forName("tb-core-consumer")); |
|||
} |
|||
|
|||
@EventListener(ApplicationReadyEvent.class) |
|||
public void onApplicationEvent(ApplicationReadyEvent event) { |
|||
mainConsumerExecutor.execute(() -> { |
|||
while (!stopped) { |
|||
try { |
|||
List<TbProtoQueueMsg<ToCoreMsg>> msgs = consumer.poll(pollDuration); |
|||
ConcurrentMap<UUID, TbProtoQueueMsg<ToCoreMsg>> ackMap = msgs.stream().collect( |
|||
Collectors.toConcurrentMap(s -> UUID.randomUUID(), Function.identity())); |
|||
CountDownLatch processingTimeoutLatch = new CountDownLatch(1); |
|||
ackMap.forEach((id, msg) -> { |
|||
TbMsgCallback callback = new MsgPackCallback<>(id, processingTimeoutLatch, ackMap); |
|||
try { |
|||
ToCoreMsg toRuleEngineMsg = msg.getValue(); |
|||
log.trace("Forwarding message to rule engine {}", toRuleEngineMsg); |
|||
if (toRuleEngineMsg.hasToDeviceActorMsg()) { |
|||
forwardToDeviceActor(toRuleEngineMsg.getToDeviceActorMsg(), callback); |
|||
} else { |
|||
callback.onSuccess(); |
|||
} |
|||
} catch (Throwable e) { |
|||
callback.onFailure(e); |
|||
} |
|||
}); |
|||
if (!processingTimeoutLatch.await(packProcessingTimeout, TimeUnit.MILLISECONDS)) { |
|||
ackMap.forEach((id, msg) -> log.warn("[{}] Timeout to process message: {}", id, msg.getValue())); |
|||
} |
|||
consumer.commit(); |
|||
} catch (Exception e) { |
|||
log.warn("Failed to obtain messages from queue.", e); |
|||
try { |
|||
Thread.sleep(pollDuration); |
|||
} catch (InterruptedException e2) { |
|||
log.trace("Failed to wait until the server has capacity to handle new requests", e2); |
|||
} |
|||
} |
|||
} |
|||
}); |
|||
} |
|||
|
|||
private void forwardToDeviceActor(TransportToDeviceActorMsg toDeviceActorMsg, TbMsgCallback callback) { |
|||
if (statsEnabled) { |
|||
stats.log(toDeviceActorMsg); |
|||
} |
|||
actorContext.getAppActor().tell(new TransportToDeviceActorMsgWrapper(toDeviceActorMsg, callback), ActorRef.noSender()); |
|||
} |
|||
|
|||
@Scheduled(fixedDelayString = "${queue.core.stats.print_interval_ms}") |
|||
public void printStats() { |
|||
if (statsEnabled) { |
|||
stats.printStats(); |
|||
} |
|||
} |
|||
|
|||
@PreDestroy |
|||
public void destroy() { |
|||
stopped = true; |
|||
if (consumer != null) { |
|||
consumer.unsubscribe(); |
|||
} |
|||
if (mainConsumerExecutor != null) { |
|||
mainConsumerExecutor.shutdownNow(); |
|||
} |
|||
} |
|||
|
|||
} |
|||
@ -0,0 +1,37 @@ |
|||
package org.thingsboard.server.service.queue; |
|||
|
|||
import lombok.extern.slf4j.Slf4j; |
|||
import org.thingsboard.server.common.TbProtoQueueMsg; |
|||
|
|||
import java.util.UUID; |
|||
import java.util.concurrent.ConcurrentMap; |
|||
import java.util.concurrent.CountDownLatch; |
|||
|
|||
@Slf4j |
|||
public class MsgPackCallback<T extends com.google.protobuf.GeneratedMessageV3> implements TbMsgCallback { |
|||
private final CountDownLatch processingTimeoutLatch; |
|||
private final ConcurrentMap<UUID, TbProtoQueueMsg<T>> ackMap; |
|||
private final UUID id; |
|||
|
|||
public MsgPackCallback(UUID id, CountDownLatch processingTimeoutLatch, ConcurrentMap<UUID, TbProtoQueueMsg<T>> ackMap) { |
|||
this.id = id; |
|||
this.processingTimeoutLatch = processingTimeoutLatch; |
|||
this.ackMap = ackMap; |
|||
} |
|||
|
|||
@Override |
|||
public void onSuccess() { |
|||
if (ackMap.remove(id) != null && ackMap.isEmpty()) { |
|||
processingTimeoutLatch.countDown(); |
|||
} |
|||
} |
|||
|
|||
@Override |
|||
public void onFailure(Throwable t) { |
|||
TbProtoQueueMsg<T> message = ackMap.remove(id); |
|||
log.warn("Failed to process message: {}", message.getValue(), t); |
|||
if (ackMap.isEmpty()) { |
|||
processingTimeoutLatch.countDown(); |
|||
} |
|||
} |
|||
} |
|||
@ -0,0 +1,9 @@ |
|||
package org.thingsboard.server.service.queue; |
|||
|
|||
import org.thingsboard.server.gen.transport.TransportProtos; |
|||
|
|||
import java.util.function.Consumer; |
|||
|
|||
public interface TbCoreConsumerService { |
|||
|
|||
} |
|||
@ -0,0 +1,9 @@ |
|||
package org.thingsboard.server.service.queue; |
|||
|
|||
public interface TbMsgCallback { |
|||
|
|||
void onSuccess(); |
|||
|
|||
void onFailure(Throwable t); |
|||
|
|||
} |
|||
@ -0,0 +1,72 @@ |
|||
package org.thingsboard.server.service.transport; |
|||
|
|||
import lombok.extern.slf4j.Slf4j; |
|||
import org.springframework.beans.factory.annotation.Value; |
|||
import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression; |
|||
import org.springframework.stereotype.Service; |
|||
import org.thingsboard.server.TbQueueCallback; |
|||
import org.thingsboard.server.TbQueueMsgMetadata; |
|||
import org.thingsboard.server.TbQueueProducer; |
|||
import org.thingsboard.server.common.TbProtoQueueMsg; |
|||
import org.thingsboard.server.gen.transport.TransportProtos.DeviceActorToTransportMsg; |
|||
import org.thingsboard.server.gen.transport.TransportProtos.ToTransportMsg; |
|||
import org.thingsboard.server.provider.TbCoreQueueProvider; |
|||
|
|||
import java.util.UUID; |
|||
import java.util.function.Consumer; |
|||
|
|||
@Slf4j |
|||
@Service |
|||
@ConditionalOnExpression("'${service.type:null}'=='monolith' || '${service.type:null}'=='tb-core')") |
|||
public class DefaultTbCoreToTransportService implements TbCoreToTransportService { |
|||
|
|||
private final TbCoreQueueProvider tbCoreQueueProvider; |
|||
private final TbQueueProducer<TbProtoQueueMsg<ToTransportMsg>> tbTransportProducer; |
|||
|
|||
@Value("${queue.notifications.topic}") |
|||
private String notificationsTopic; |
|||
|
|||
public DefaultTbCoreToTransportService(TbCoreQueueProvider tbCoreQueueProvider) { |
|||
this.tbCoreQueueProvider = tbCoreQueueProvider; |
|||
this.tbTransportProducer = tbCoreQueueProvider.getTransportMsgProducer(); |
|||
} |
|||
|
|||
@Override |
|||
public void process(String nodeId, DeviceActorToTransportMsg msg) { |
|||
process(nodeId, msg, null, null); |
|||
} |
|||
|
|||
@Override |
|||
public void process(String nodeId, DeviceActorToTransportMsg msg, Runnable onSuccess, Consumer<Throwable> onFailure) { |
|||
String topic = notificationsTopic + "." + nodeId; |
|||
UUID sessionId = new UUID(msg.getSessionIdMSB(), msg.getSessionIdLSB()); |
|||
ToTransportMsg transportMsg = ToTransportMsg.newBuilder().setToDeviceSessionMsg(msg).build(); |
|||
log.trace("[{}][{}] Pushing session data to topic: {}", topic, sessionId, transportMsg); |
|||
TbProtoQueueMsg<ToTransportMsg> queueMsg = new TbProtoQueueMsg<>(sessionId, transportMsg); |
|||
tbTransportProducer.send(topic, queueMsg, new QueueCallbackAdaptor(onSuccess, onFailure)); |
|||
} |
|||
|
|||
private static class QueueCallbackAdaptor implements TbQueueCallback { |
|||
private final Runnable onSuccess; |
|||
private final Consumer<Throwable> onFailure; |
|||
|
|||
QueueCallbackAdaptor(Runnable onSuccess, Consumer<Throwable> onFailure) { |
|||
this.onSuccess = onSuccess; |
|||
this.onFailure = onFailure; |
|||
} |
|||
|
|||
@Override |
|||
public void onSuccess(TbQueueMsgMetadata metadata) { |
|||
if (onSuccess != null) { |
|||
onSuccess.run(); |
|||
} |
|||
} |
|||
|
|||
@Override |
|||
public void onFailure(Throwable t) { |
|||
if (onFailure != null) { |
|||
onFailure.accept(t); |
|||
} |
|||
} |
|||
} |
|||
} |
|||
@ -1,31 +0,0 @@ |
|||
/** |
|||
* Copyright © 2016-2020 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
package org.thingsboard.server.service.transport; |
|||
|
|||
import org.thingsboard.server.gen.transport.TransportProtos.DeviceActorToTransportMsg; |
|||
|
|||
import java.util.function.Consumer; |
|||
|
|||
/** |
|||
* Created by ashvayka on 05.10.18. |
|||
*/ |
|||
public interface RuleEngineTransportService { |
|||
|
|||
void process(String nodeId, DeviceActorToTransportMsg msg); |
|||
|
|||
void process(String nodeId, DeviceActorToTransportMsg msg, Runnable onSuccess, Consumer<Throwable> onFailure); |
|||
|
|||
} |
|||
@ -0,0 +1,13 @@ |
|||
package org.thingsboard.server.service.transport; |
|||
|
|||
import org.thingsboard.server.gen.transport.TransportProtos; |
|||
|
|||
import java.util.function.Consumer; |
|||
|
|||
public interface TbCoreToTransportService { |
|||
|
|||
void process(String nodeId, TransportProtos.DeviceActorToTransportMsg msg); |
|||
|
|||
void process(String nodeId, TransportProtos.DeviceActorToTransportMsg msg, Runnable onSuccess, Consumer<Throwable> onFailure); |
|||
|
|||
} |
|||
Loading…
Reference in new issue