From 962755735b55cef48e8148664014487bde74b02b Mon Sep 17 00:00:00 2001 From: Andrii Shvaika Date: Mon, 4 May 2020 13:22:36 +0300 Subject: [PATCH 1/2] Fix for Server-side RPC in cluster mode --- .../queue/DefaultTbCoreConsumerService.java | 13 ++++++++++--- .../DefaultTbRuleEngineConsumerService.java | 18 +++++++++++++++++- 2 files changed, 27 insertions(+), 4 deletions(-) diff --git a/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbCoreConsumerService.java b/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbCoreConsumerService.java index a5abd77ad5..b7f6bd3dd2 100644 --- a/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbCoreConsumerService.java +++ b/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbCoreConsumerService.java @@ -23,6 +23,7 @@ import org.springframework.stereotype.Service; import org.thingsboard.rule.engine.api.RpcError; import org.thingsboard.server.actors.ActorSystemContext; import org.thingsboard.server.common.data.id.TenantId; +import org.thingsboard.server.common.msg.MsgType; import org.thingsboard.server.common.msg.TbActorMsg; import org.thingsboard.server.common.msg.queue.ServiceType; import org.thingsboard.server.common.msg.queue.TbCallback; @@ -45,6 +46,7 @@ import org.thingsboard.server.service.encoding.DataDecodingEncodingService; import org.thingsboard.server.service.queue.processing.AbstractConsumerService; import org.thingsboard.server.service.rpc.FromDeviceRpcResponse; import org.thingsboard.server.service.rpc.TbCoreDeviceRpcService; +import org.thingsboard.server.service.rpc.ToDeviceRpcRequestActorMsg; import org.thingsboard.server.service.state.DeviceStateService; import org.thingsboard.server.service.subscription.SubscriptionManagerService; import org.thingsboard.server.service.subscription.TbLocalSubscriptionService; @@ -100,7 +102,7 @@ public class DefaultTbCoreConsumerService extends AbstractConsumerService actorMsg = encodingService.decode(toCoreMsg.getToDeviceActorNotificationMsg().toByteArray()); if (actorMsg.isPresent()) { - log.trace("[{}] Forwarding message to App Actor {}", id, actorMsg.get()); - actorContext.tell(actorMsg.get(), ActorRef.noSender()); + TbActorMsg tbActorMsg = actorMsg.get(); + if (tbActorMsg.getMsgType().equals(MsgType.DEVICE_RPC_REQUEST_TO_DEVICE_ACTOR_MSG)) { + tbCoreDeviceRpcService.forwardRpcRequestToDeviceActor((ToDeviceRpcRequestActorMsg) tbActorMsg); + } else { + log.trace("[{}] Forwarding message to App Actor {}", id, actorMsg.get()); + actorContext.tell(actorMsg.get(), ActorRef.noSender()); + } } callback.onSuccess(); } diff --git a/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbRuleEngineConsumerService.java b/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbRuleEngineConsumerService.java index 0c48c54412..182a62d397 100644 --- a/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbRuleEngineConsumerService.java +++ b/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbRuleEngineConsumerService.java @@ -21,6 +21,7 @@ import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Value; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Service; +import org.thingsboard.rule.engine.api.RpcError; import org.thingsboard.server.actors.ActorSystemContext; import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.msg.TbActorMsg; @@ -31,6 +32,7 @@ import org.thingsboard.server.common.msg.queue.ServiceQueue; import org.thingsboard.server.common.msg.queue.ServiceType; import org.thingsboard.server.common.msg.queue.TbCallback; import org.thingsboard.server.common.msg.queue.TbMsgCallback; +import org.thingsboard.server.gen.transport.TransportProtos; import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineMsg; import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineNotificationMsg; import org.thingsboard.server.queue.TbQueueConsumer; @@ -48,6 +50,9 @@ import org.thingsboard.server.service.queue.processing.TbRuleEngineProcessingStr import org.thingsboard.server.service.queue.processing.TbRuleEngineProcessingStrategyFactory; import org.thingsboard.server.service.queue.processing.TbRuleEngineSubmitStrategy; import org.thingsboard.server.service.queue.processing.TbRuleEngineSubmitStrategyFactory; +import org.thingsboard.server.service.rpc.FromDeviceRpcResponse; +import org.thingsboard.server.service.rpc.TbCoreDeviceRpcService; +import org.thingsboard.server.service.rpc.TbRuleEngineDeviceRpcService; import org.thingsboard.server.service.stats.RuleEngineStatisticsService; import javax.annotation.PostConstruct; @@ -81,6 +86,7 @@ public class DefaultTbRuleEngineConsumerService extends AbstractConsumerService< private final TbRuleEngineQueueFactory tbRuleEngineQueueFactory; private final TbQueueRuleEngineSettings ruleEngineSettings; private final RuleEngineStatisticsService statisticsService; + private final TbRuleEngineDeviceRpcService tbDeviceRpcService; private final ConcurrentMap>> consumers = new ConcurrentHashMap<>(); private final ConcurrentMap consumerConfigurations = new ConcurrentHashMap<>(); private final ConcurrentMap consumerStats = new ConcurrentHashMap<>(); @@ -90,13 +96,15 @@ public class DefaultTbRuleEngineConsumerService extends AbstractConsumerService< TbRuleEngineSubmitStrategyFactory submitStrategyFactory, TbQueueRuleEngineSettings ruleEngineSettings, TbRuleEngineQueueFactory tbRuleEngineQueueFactory, RuleEngineStatisticsService statisticsService, - ActorSystemContext actorContext, DataDecodingEncodingService encodingService) { + ActorSystemContext actorContext, DataDecodingEncodingService encodingService, + TbRuleEngineDeviceRpcService tbDeviceRpcService) { super(actorContext, encodingService, tbRuleEngineQueueFactory.createToRuleEngineNotificationsMsgConsumer()); this.statisticsService = statisticsService; this.ruleEngineSettings = ruleEngineSettings; this.tbRuleEngineQueueFactory = tbRuleEngineQueueFactory; this.submitStrategyFactory = submitStrategyFactory; this.processingStrategyFactory = processingStrategyFactory; + this.tbDeviceRpcService = tbDeviceRpcService; } @PostConstruct @@ -227,7 +235,15 @@ public class DefaultTbRuleEngineConsumerService extends AbstractConsumerService< actorContext.tell(actorMsg.get(), ActorRef.noSender()); } callback.onSuccess(); + } else if (nfMsg.hasFromDeviceRpcResponse()) { + TransportProtos.FromDeviceRPCResponseProto proto = nfMsg.getFromDeviceRpcResponse(); + RpcError error = proto.getError() > 0 ? RpcError.values()[proto.getError()] : null; + FromDeviceRpcResponse response = new FromDeviceRpcResponse(new UUID(proto.getRequestIdMSB(), proto.getRequestIdLSB()) + , proto.getResponse(), error); + tbDeviceRpcService.processRpcResponseFromDevice(response); + callback.onSuccess(); } else { + log.trace("Received notification with missing handler"); callback.onSuccess(); } } From 830d02ef6a360ba93cca3c1629f5313303b4bac1 Mon Sep 17 00:00:00 2001 From: Andrii Shvaika Date: Mon, 4 May 2020 14:00:18 +0300 Subject: [PATCH 2/2] Fix for HTTP and CoAP device activity --- .../transport/coap/CoapTransportResource.java | 23 +++++++++++++++++-- .../transport/http/DeviceApiController.java | 13 ++++++++++- .../transport/TransportServiceCallback.java | 13 +++++++++++ 3 files changed, 46 insertions(+), 3 deletions(-) diff --git a/common/transport/coap/src/main/java/org/thingsboard/server/transport/coap/CoapTransportResource.java b/common/transport/coap/src/main/java/org/thingsboard/server/transport/coap/CoapTransportResource.java index 82678fde5d..3b4b26aa47 100644 --- a/common/transport/coap/src/main/java/org/thingsboard/server/transport/coap/CoapTransportResource.java +++ b/common/transport/coap/src/main/java/org/thingsboard/server/transport/coap/CoapTransportResource.java @@ -37,6 +37,7 @@ import org.thingsboard.server.gen.transport.TransportProtos; import java.lang.reflect.Field; import java.util.List; import java.util.Optional; +import java.util.Set; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; @@ -55,6 +56,8 @@ public class CoapTransportResource extends CoapResource { private final Field observerField; private final long timeout; private final ConcurrentMap tokenToSessionIdMap = new ConcurrentHashMap<>(); + private final Set rpcSubscriptions = ConcurrentHashMap.newKeySet(); + private final Set attributeSubscriptions = ConcurrentHashMap.newKeySet(); public CoapTransportResource(CoapTransportContext context, String name) { super(name); @@ -149,11 +152,13 @@ public class CoapTransportResource extends CoapResource { transportService.process(sessionInfo, transportContext.getAdaptor().convertToPostAttributes(sessionId, request), new CoapOkCallback(exchange)); + reportActivity(sessionId, sessionInfo); break; case POST_TELEMETRY_REQUEST: transportService.process(sessionInfo, transportContext.getAdaptor().convertToPostTelemetry(sessionId, request), new CoapOkCallback(exchange)); + reportActivity(sessionId, sessionInfo); break; case CLAIM_REQUEST: transportService.process(sessionInfo, @@ -161,6 +166,7 @@ public class CoapTransportResource extends CoapResource { new CoapOkCallback(exchange)); break; case SUBSCRIBE_ATTRIBUTES_REQUEST: + attributeSubscriptions.add(sessionId); advanced.setObserver(new CoapExchangeObserverProxy((ExchangeObserver) observerField.get(advanced), registerAsyncCoapSession(exchange, request, sessionInfo, sessionId))); transportService.process(sessionInfo, @@ -168,6 +174,7 @@ public class CoapTransportResource extends CoapResource { new CoapNoOpCallback(exchange)); break; case UNSUBSCRIBE_ATTRIBUTES_REQUEST: + attributeSubscriptions.remove(sessionId); TransportProtos.SessionInfoProto attrSession = lookupAsyncSessionInfo(request); if (attrSession != null) { transportService.process(attrSession, @@ -177,6 +184,7 @@ public class CoapTransportResource extends CoapResource { } break; case SUBSCRIBE_RPC_COMMANDS_REQUEST: + rpcSubscriptions.add(sessionId); advanced.setObserver(new CoapExchangeObserverProxy((ExchangeObserver) observerField.get(advanced), registerAsyncCoapSession(exchange, request, sessionInfo, sessionId))); transportService.process(sessionInfo, @@ -184,13 +192,13 @@ public class CoapTransportResource extends CoapResource { new CoapNoOpCallback(exchange)); break; case UNSUBSCRIBE_RPC_COMMANDS_REQUEST: + rpcSubscriptions.remove(sessionId); TransportProtos.SessionInfoProto rpcSession = lookupAsyncSessionInfo(request); if (rpcSession != null) { transportService.process(rpcSession, TransportProtos.SubscribeToRPCMsg.newBuilder().setUnsubscribe(true).build(), new CoapOkCallback(exchange)); - transportService.process(sessionInfo, getSessionEventMsg(TransportProtos.SessionEvent.CLOSED), null); - transportService.deregisterSession(rpcSession); + closeAndDeregister(sessionInfo); } break; case TO_DEVICE_RPC_RESPONSE: @@ -221,6 +229,14 @@ public class CoapTransportResource extends CoapResource { })); } + private void reportActivity(UUID sessionId, TransportProtos.SessionInfoProto sessionInfo) { + transportContext.getTransportService().process(sessionInfo, TransportProtos.SubscriptionInfoProto.newBuilder() + .setAttributeSubscription(attributeSubscriptions.contains(sessionId)) + .setRpcSubscription(rpcSubscriptions.contains(sessionId)) + .setLastActivityTime(System.currentTimeMillis()) + .build(), TransportServiceCallback.EMPTY); + } + private TransportProtos.SessionInfoProto lookupAsyncSessionInfo(Request request) { String token = request.getSource().getHostAddress() + ":" + request.getSourcePort() + ":" + request.getTokenString(); return tokenToSessionIdMap.remove(token); @@ -438,6 +454,9 @@ public class CoapTransportResource extends CoapResource { private void closeAndDeregister(TransportProtos.SessionInfoProto session) { transportService.process(session, getSessionEventMsg(TransportProtos.SessionEvent.CLOSED), null); transportService.deregisterSession(session); + UUID sessionId = new UUID(session.getSessionIdMSB(), session.getSessionIdLSB()); + rpcSubscriptions.remove(sessionId); + attributeSubscriptions.remove(sessionId); } } diff --git a/common/transport/http/src/main/java/org/thingsboard/server/transport/http/DeviceApiController.java b/common/transport/http/src/main/java/org/thingsboard/server/transport/http/DeviceApiController.java index c2f171e39b..727600a626 100644 --- a/common/transport/http/src/main/java/org/thingsboard/server/transport/http/DeviceApiController.java +++ b/common/transport/http/src/main/java/org/thingsboard/server/transport/http/DeviceApiController.java @@ -36,6 +36,7 @@ import org.thingsboard.server.common.transport.TransportContext; import org.thingsboard.server.common.transport.TransportService; import org.thingsboard.server.common.transport.TransportServiceCallback; import org.thingsboard.server.common.transport.adaptor.JsonConverter; +import org.thingsboard.server.gen.transport.TransportProtos; import org.thingsboard.server.gen.transport.TransportProtos.AttributeUpdateNotificationMsg; import org.thingsboard.server.gen.transport.TransportProtos.DeviceInfoProto; import org.thingsboard.server.gen.transport.TransportProtos.GetAttributeRequestMsg; @@ -102,6 +103,7 @@ public class DeviceApiController { TransportService transportService = transportContext.getTransportService(); transportService.process(sessionInfo, JsonConverter.convertToAttributesProto(new JsonParser().parse(json)), new HttpOkCallback(responseWriter)); + reportActivity(sessionInfo); })); return responseWriter; } @@ -115,6 +117,7 @@ public class DeviceApiController { TransportService transportService = transportContext.getTransportService(); transportService.process(sessionInfo, JsonConverter.convertToTelemetryProto(new JsonParser().parse(json)), new HttpOkCallback(responseWriter)); + reportActivity(sessionInfo); })); return responseWriter; } @@ -274,7 +277,6 @@ public class DeviceApiController { } } - private static class HttpSessionListener implements SessionMsgListener { private final DeferredResult responseWriter; @@ -308,4 +310,13 @@ public class DeviceApiController { responseWriter.setResult(new ResponseEntity<>(JsonConverter.toJson(msg).toString(), HttpStatus.OK)); } } + + private void reportActivity(SessionInfoProto sessionInfo) { + transportContext.getTransportService().process(sessionInfo, TransportProtos.SubscriptionInfoProto.newBuilder() + .setAttributeSubscription(false) + .setRpcSubscription(false) + .setLastActivityTime(System.currentTimeMillis()) + .build(), TransportServiceCallback.EMPTY); + } + } diff --git a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/TransportServiceCallback.java b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/TransportServiceCallback.java index 1f0d6c4c2c..f5b9ff22dd 100644 --- a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/TransportServiceCallback.java +++ b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/TransportServiceCallback.java @@ -20,7 +20,20 @@ package org.thingsboard.server.common.transport; */ public interface TransportServiceCallback { + TransportServiceCallback EMPTY = new TransportServiceCallback() { + @Override + public void onSuccess(Void msg) { + + } + + @Override + public void onError(Throwable e) { + + } + }; + void onSuccess(T msg); + void onError(Throwable e); }