From f0bccc7cfdfd92b35c586be9e6b364a1e20678d1 Mon Sep 17 00:00:00 2001 From: Andrew Shvayka Date: Tue, 9 Oct 2018 14:16:08 +0300 Subject: [PATCH] WIP: Implementation --- .../server/actors/ActorSystemContext.java | 7 + .../server/actors/app/AppActor.java | 13 +- .../server/actors/device/DeviceActor.java | 6 +- .../device/DeviceActorMessageProcessor.java | 220 ++++++++++-------- .../actors/device/PendingSessionMsgData.java | 3 +- .../server/actors/device/SessionInfo.java | 16 +- .../actors/session/ASyncMsgProcessor.java | 156 ------------- .../AbstractSessionActorMsgProcessor.java | 122 ---------- .../server/actors/session/SessionActor.java | 143 ------------ .../actors/session/SessionManagerActor.java | 180 -------------- .../actors/session/SyncMsgProcessor.java | 95 -------- .../server/actors/tenant/TenantActor.java | 2 +- .../RemoteRuleEngineTransportService.java | 187 +++++++++++++++ .../transport/RuleEngineTransportService.java | 32 +++ .../transport/ToRuleEngineMsgDecoder.java | 31 +++ .../transport/ToTransportMsgEncoder.java | 29 +++ .../msg/TransportToDeviceActorMsgWrapper.java | 36 +++ .../src/main/resources/thingsboard.yml | 6 +- .../server/common/msg/MsgType.java | 13 +- .../device/BasicDeviceToDeviceActorMsg.java | 107 --------- .../msg/device/DeviceToDeviceActorMsg.java | 41 ---- .../server/kafka/TBKafkaConsumerTemplate.java | 12 +- .../server/kafka/TBKafkaProducerTemplate.java | 29 +-- .../server/kafka/TbKafkaResponseTemplate.java | 109 +++++---- .../common/transport/SessionMsgListener.java | 22 +- .../common/transport/TransportService.java | 34 ++- .../session/DeviceAwareSessionContext.java | 35 +-- .../transport/src/main/proto/transport.proto | 80 +++++-- .../transport/mqtt/MqttTransportHandler.java | 67 ++---- .../mqtt/service/MqttTransportService.java | 168 ++++++++++++- .../mqtt/service/ToRuleEngineMsgEncoder.java | 29 +++ .../ToTransportMsgResponseDecoder.java | 31 +++ .../src/main/resources/tb-mqtt-transport.yml | 4 + 33 files changed, 901 insertions(+), 1164 deletions(-) delete mode 100644 application/src/main/java/org/thingsboard/server/actors/session/ASyncMsgProcessor.java delete mode 100644 application/src/main/java/org/thingsboard/server/actors/session/AbstractSessionActorMsgProcessor.java delete mode 100644 application/src/main/java/org/thingsboard/server/actors/session/SessionActor.java delete mode 100644 application/src/main/java/org/thingsboard/server/actors/session/SessionManagerActor.java delete mode 100644 application/src/main/java/org/thingsboard/server/actors/session/SyncMsgProcessor.java create mode 100644 application/src/main/java/org/thingsboard/server/service/transport/RemoteRuleEngineTransportService.java create mode 100644 application/src/main/java/org/thingsboard/server/service/transport/RuleEngineTransportService.java create mode 100644 application/src/main/java/org/thingsboard/server/service/transport/ToRuleEngineMsgDecoder.java create mode 100644 application/src/main/java/org/thingsboard/server/service/transport/ToTransportMsgEncoder.java create mode 100644 application/src/main/java/org/thingsboard/server/service/transport/msg/TransportToDeviceActorMsgWrapper.java delete mode 100644 common/message/src/main/java/org/thingsboard/server/common/msg/device/BasicDeviceToDeviceActorMsg.java delete mode 100644 common/message/src/main/java/org/thingsboard/server/common/msg/device/DeviceToDeviceActorMsg.java rename application/src/main/java/org/thingsboard/server/actors/session/SessionTerminationMsg.java => common/transport/src/main/java/org/thingsboard/server/common/transport/SessionMsgListener.java (59%) create mode 100644 transport/mqtt-transport/src/main/java/org/thingsboard/server/mqtt/service/ToRuleEngineMsgEncoder.java create mode 100644 transport/mqtt-transport/src/main/java/org/thingsboard/server/mqtt/service/ToTransportMsgResponseDecoder.java diff --git a/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java b/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java index 8930f8476e..8ff61aca46 100644 --- a/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java +++ b/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java @@ -31,6 +31,7 @@ import lombok.Setter; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; +import org.springframework.context.annotation.Lazy; import org.springframework.stereotype.Component; import org.thingsboard.rule.engine.api.MailService; import org.thingsboard.server.actors.service.ActorService; @@ -69,6 +70,7 @@ import org.thingsboard.server.service.script.JsExecutorService; import org.thingsboard.server.service.script.JsInvokeService; import org.thingsboard.server.service.state.DeviceStateService; import org.thingsboard.server.service.telemetry.TelemetrySubscriptionService; +import org.thingsboard.server.service.transport.RuleEngineTransportService; import javax.annotation.Nullable; import java.io.IOException; @@ -204,6 +206,11 @@ public class ActorSystemContext { @Getter private DeviceStateService deviceStateService; + @Lazy + @Autowired + @Getter + private RuleEngineTransportService ruleEngineTransportService; + @Value("${cluster.partition_id}") @Getter private long queuePartitionId; diff --git a/application/src/main/java/org/thingsboard/server/actors/app/AppActor.java b/application/src/main/java/org/thingsboard/server/actors/app/AppActor.java index 6a78f78d1b..14ca58610d 100644 --- a/application/src/main/java/org/thingsboard/server/actors/app/AppActor.java +++ b/application/src/main/java/org/thingsboard/server/actors/app/AppActor.java @@ -39,7 +39,6 @@ import org.thingsboard.server.common.msg.aware.TenantAwareMsg; import org.thingsboard.server.common.msg.cluster.SendToClusterMsg; import org.thingsboard.server.common.msg.cluster.ServerAddress; import org.thingsboard.server.common.msg.core.BasicActorSystemToDeviceSessionActorMsg; -import org.thingsboard.server.common.msg.device.DeviceToDeviceActorMsg; import org.thingsboard.server.common.msg.plugin.ComponentLifecycleMsg; import org.thingsboard.server.common.msg.system.ServiceToRuleEngineMsg; import org.thingsboard.server.dao.model.ModelConstants; @@ -105,7 +104,7 @@ public class AppActor extends RuleChainManagerActor { case SERVICE_TO_RULE_ENGINE_MSG: onServiceToRuleEngineMsg((ServiceToRuleEngineMsg) msg); break; - case DEVICE_SESSION_TO_DEVICE_ACTOR_MSG: + case TRANSPORT_TO_DEVICE_ACTOR_MSG: case DEVICE_ATTRIBUTES_UPDATE_TO_DEVICE_ACTOR_MSG: case DEVICE_CREDENTIALS_UPDATE_TO_DEVICE_ACTOR_MSG: case DEVICE_NAME_OR_TYPE_UPDATE_TO_DEVICE_ACTOR_MSG: @@ -169,16 +168,6 @@ public class AppActor extends RuleChainManagerActor { getOrCreateTenantActor(msg.getTenantId()).tell(msg, ActorRef.noSender()); } - private void processDeviceMsg(DeviceToDeviceActorMsg deviceToDeviceActorMsg) { - TenantId tenantId = deviceToDeviceActorMsg.getTenantId(); - ActorRef tenantActor = getOrCreateTenantActor(tenantId); - if (deviceToDeviceActorMsg.getPayload().getMsgType().requiresRulesProcessing()) { -// tenantActor.tell(new RuleChainDeviceMsg(deviceToDeviceActorMsg, ruleManager.getRuleChain(this.context())), context().self()); - } else { - tenantActor.tell(deviceToDeviceActorMsg, context().self()); - } - } - private ActorRef getOrCreateTenantActor(TenantId tenantId) { return tenantActors.computeIfAbsent(tenantId, k -> context().actorOf(Props.create(new TenantActor.ActorCreator(systemContext, tenantId)) .withDispatcher(DefaultActorService.CORE_DISPATCHER_NAME), tenantId.toString())); diff --git a/application/src/main/java/org/thingsboard/server/actors/device/DeviceActor.java b/application/src/main/java/org/thingsboard/server/actors/device/DeviceActor.java index 99d004574e..7fabe493dd 100644 --- a/application/src/main/java/org/thingsboard/server/actors/device/DeviceActor.java +++ b/application/src/main/java/org/thingsboard/server/actors/device/DeviceActor.java @@ -26,12 +26,12 @@ import org.thingsboard.server.common.data.id.DeviceId; import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.msg.TbActorMsg; import org.thingsboard.server.common.msg.cluster.ClusterEventMsg; -import org.thingsboard.server.common.msg.device.DeviceToDeviceActorMsg; import org.thingsboard.server.common.msg.timeout.DeviceActorClientSideRpcTimeoutMsg; import org.thingsboard.server.common.msg.timeout.DeviceActorQueueTimeoutMsg; import org.thingsboard.server.common.msg.timeout.DeviceActorServerSideRpcTimeoutMsg; import org.thingsboard.server.service.rpc.ToDeviceRpcRequestActorMsg; import org.thingsboard.server.service.rpc.ToServerRpcResponseActorMsg; +import org.thingsboard.server.service.transport.msg.TransportToDeviceActorMsgWrapper; public class DeviceActor extends ContextAwareActor { @@ -50,8 +50,8 @@ public class DeviceActor extends ContextAwareActor { case CLUSTER_EVENT_MSG: processor.processClusterEventMsg((ClusterEventMsg) msg); break; - case DEVICE_SESSION_TO_DEVICE_ACTOR_MSG: - processor.process(context(), (DeviceToDeviceActorMsg) msg); + case TRANSPORT_TO_DEVICE_ACTOR_MSG: + processor.process(context(), (TransportToDeviceActorMsgWrapper) msg); break; case DEVICE_ATTRIBUTES_UPDATE_TO_DEVICE_ACTOR_MSG: processor.processAttributesUpdate(context(), (DeviceAttributesEventNotificationMsg) msg); diff --git a/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java b/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java index a2ea0487cf..1313f60be5 100644 --- a/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java +++ b/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java @@ -1,12 +1,12 @@ /** * Copyright © 2016-2018 The Thingsboard Authors - * + *

* Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

* Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -61,7 +61,6 @@ import org.thingsboard.server.common.msg.core.TelemetryUploadRequest; import org.thingsboard.server.common.msg.core.ToDeviceRpcRequestMsg; import org.thingsboard.server.common.msg.core.ToDeviceRpcResponseMsg; import org.thingsboard.server.common.msg.core.ToServerRpcRequestMsg; -import org.thingsboard.server.common.msg.device.DeviceToDeviceActorMsg; import org.thingsboard.server.common.msg.kv.BasicAttributeKVMsg; import org.thingsboard.server.common.msg.rpc.ToDeviceRpcRequest; import org.thingsboard.server.common.msg.session.FromDeviceMsg; @@ -71,9 +70,11 @@ import org.thingsboard.server.common.msg.session.ToDeviceMsg; import org.thingsboard.server.common.msg.timeout.DeviceActorClientSideRpcTimeoutMsg; import org.thingsboard.server.common.msg.timeout.DeviceActorQueueTimeoutMsg; import org.thingsboard.server.common.msg.timeout.DeviceActorServerSideRpcTimeoutMsg; +import org.thingsboard.server.gen.transport.TransportProtos; import org.thingsboard.server.service.rpc.FromDeviceRpcResponse; import org.thingsboard.server.service.rpc.ToDeviceRpcRequestActorMsg; import org.thingsboard.server.service.rpc.ToServerRpcResponseActorMsg; +import org.thingsboard.server.service.transport.msg.TransportToDeviceActorMsgWrapper; import javax.annotation.Nullable; import java.util.ArrayList; @@ -92,6 +93,8 @@ import java.util.function.Consumer; import java.util.function.Predicate; import java.util.stream.Collectors; +import org.thingsboard.server.gen.transport.TransportProtos.*; + /** * @author Andrew Shvayka */ @@ -99,12 +102,11 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso private final TenantId tenantId; private final DeviceId deviceId; - private final Map sessions; - private final Map attributeSubscriptions; - private final Map rpcSubscriptions; + private final Map sessions; + private final Map attributeSubscriptions; + private final Map rpcSubscriptions; private final Map toDeviceRpcPendingMap; private final Map toServerRpcPendingMap; - private final Map pendingMsgs; private final Gson gson = new Gson(); private final JsonParser jsonParser = new JsonParser(); @@ -123,7 +125,6 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso this.rpcSubscriptions = new HashMap<>(); this.toDeviceRpcPendingMap = new HashMap<>(); this.toServerRpcPendingMap = new HashMap<>(); - this.pendingMsgs = new HashMap<>(); initAttributes(); } @@ -154,11 +155,11 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso boolean sent = rpcSubscriptions.size() > 0; Set syncSessionSet = new HashSet<>(); rpcSubscriptions.entrySet().forEach(sub -> { - ActorSystemToDeviceSessionActorMsg response = new BasicActorSystemToDeviceSessionActorMsg(rpcRequest, sub.getKey()); - sendMsgToSessionActor(response, sub.getValue().getServer()); - if (SessionType.SYNC == sub.getValue().getType()) { - syncSessionSet.add(sub.getKey()); - } +// ActorSystemToDeviceSessionActorMsg response = new BasicActorSystemToDeviceSessionActorMsg(rpcRequest, sub.getKey()); +// sendMsgToSessionActor(response, sub.getValue().getServer()); +// if (SessionType.SYNC == sub.getValue().getType()) { +// syncSessionSet.add(sub.getKey()); +// } }); syncSessionSet.forEach(rpcSubscriptions::remove); @@ -191,15 +192,6 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso } } - void processQueueTimeout(ActorContext context, DeviceActorQueueTimeoutMsg msg) { - PendingSessionMsgData data = pendingMsgs.remove(msg.getId()); - if (data != null) { - logger.debug("[{}] Queue put [{}] timeout detected!", deviceId, msg.getId()); - ToDeviceMsg toDeviceMsg = new RuleEngineErrorMsg(data.getSessionMsgType(), RuleEngineError.QUEUE_PUT_TIMEOUT); - sendMsgToSessionActor(new BasicActorSystemToDeviceSessionActorMsg(toDeviceMsg, data.getSessionId()), data.getServerAddress()); - } - } - void processQueueAck(ActorContext context, RuleEngineQueuePutAckMsg msg) { PendingSessionMsgData data = pendingMsgs.remove(msg.getId()); if (data != null && data.isReplyOnQueueAck()) { @@ -252,31 +244,37 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso }; } - void process(ActorContext context, DeviceToDeviceActorMsg msg) { - processSubscriptionCommands(context, msg); - processRpcResponses(context, msg); - processSessionStateMsgs(msg); + void process(ActorContext context, TransportToDeviceActorMsgWrapper wrapper) { + TransportToDeviceActorMsg msg = wrapper.getMsg(); +// processSubscriptionCommands(context, msg); +// processRpcResponses(context, msg); + if (msg.hasSessionEvent()) { + processSessionStateMsgs(msg.getSessionInfo(), msg.getSessionEvent()); + } - SessionMsgType sessionMsgType = msg.getPayload().getMsgType(); - if (sessionMsgType.requiresRulesProcessing()) { - switch (sessionMsgType) { - case GET_ATTRIBUTES_REQUEST: - handleGetAttributesRequest(msg); - break; - case POST_ATTRIBUTES_REQUEST: - handlePostAttributesRequest(context, msg); - reportActivity(); - break; - case POST_TELEMETRY_REQUEST: - handlePostTelemetryRequest(context, msg); - reportActivity(); - break; - case TO_SERVER_RPC_REQUEST: - handleClientSideRPCRequest(context, msg); - reportActivity(); - break; - } + if (msg.hasPostAttributes()) { + handlePostAttributesRequest(context, msg.getSessionInfo(), msg.getPostAttributes()); + reportActivity(); } + if (msg.hasPostTelemetry()) { + handlePostTelemetryRequest(context, msg.getSessionInfo(), msg.getPostTelemetry()); + reportActivity(); + } + if (msg.hasGetAttributes()) { + handleGetAttributesRequest(context, msg.getSessionInfo(), msg.getGetAttributes()); + } +// SessionMsgType sessionMsgType = msg.getPayload().getMsgType(); +// if (sessionMsgType.requiresRulesProcessing()) { +// switch (sessionMsgType) { +// case GET_ATTRIBUTES_REQUEST: +// handleGetAttributesRequest(msg); +// break; +// case TO_SERVER_RPC_REQUEST: +// handleClientSideRPCRequest(context, msg); +// reportActivity(); +// break; +// } +// } } private void reportActivity() { @@ -291,6 +289,39 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso systemContext.getDeviceStateService().onDeviceDisconnect(deviceId); } + private void handleGetAttributesRequest(ActorContext context, SessionInfoProto sessionInfo, GetAttributeRequestMsg request) { + ListenableFuture> clientAttributesFuture = getAttributeKvEntries(deviceId, DataConstants.CLIENT_SCOPE, toOptionalSet(request.getClientAttributeNamesList())); + ListenableFuture> sharedAttributesFuture = getAttributeKvEntries(deviceId, DataConstants.SHARED_SCOPE, toOptionalSet(request.getSharedAttributeNamesList())); + + Futures.addCallback(Futures.allAsList(Arrays.asList(clientAttributesFuture, sharedAttributesFuture)), new FutureCallback>>() { + @Override + public void onSuccess(@Nullable List> result) { + systemContext.getRuleEngineTransportService().process(); + BasicGetAttributesResponse response = BasicGetAttributesResponse.onSuccess(request.getMsgType(), + request.getRequestId(), BasicAttributeKVMsg.from(result.get(0), result.get(1))); + sendMsgToSessionActor(new BasicActorSystemToDeviceSessionActorMsg(response, src.getSessionId()), src.getServerAddress()); + } + + @Override + public void onFailure(Throwable t) { + if (t instanceof Exception) { + ToDeviceMsg toDeviceMsg = BasicStatusCodeResponse.onError(SessionMsgType.GET_ATTRIBUTES_REQUEST, request.getRequestId(), (Exception) t); + sendMsgToSessionActor(new BasicActorSystemToDeviceSessionActorMsg(toDeviceMsg, src.getSessionId()), src.getServerAddress()); + } else { + logger.error("[{}] Failed to process attributes request", deviceId, t); + } + } + }); + } + + private Optional> toOptionalSet(List strings) { + if (strings == null || strings.isEmpty()) { + return Optional.empty(); + } else { + return Optional.of(new HashSet<>(strings)); + } + } + private void handleGetAttributesRequest(DeviceToDeviceActorMsg src) { GetAttributesRequest request = (GetAttributesRequest) src.getPayload(); ListenableFuture> clientAttributesFuture = getAttributeKvEntries(deviceId, DataConstants.CLIENT_SCOPE, request.getClientAttributeNames()); @@ -328,43 +359,20 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso } } - private void handlePostAttributesRequest(ActorContext context, DeviceToDeviceActorMsg src) { - AttributesUpdateRequest request = (AttributesUpdateRequest) src.getPayload(); - - JsonObject json = new JsonObject(); - for (AttributeKvEntry kv : request.getAttributes()) { - kv.getBooleanValue().ifPresent(v -> json.addProperty(kv.getKey(), v)); - kv.getLongValue().ifPresent(v -> json.addProperty(kv.getKey(), v)); - kv.getDoubleValue().ifPresent(v -> json.addProperty(kv.getKey(), v)); - kv.getStrValue().ifPresent(v -> json.addProperty(kv.getKey(), v)); - } - - TbMsg tbMsg = new TbMsg(UUIDs.timeBased(), SessionMsgType.POST_ATTRIBUTES_REQUEST.name(), deviceId, defaultMetaData.copy(), TbMsgDataType.JSON, gson.toJson(json), null, null, 0L); - PendingSessionMsgData msgData = new PendingSessionMsgData(src.getSessionId(), src.getServerAddress(), - SessionMsgType.POST_ATTRIBUTES_REQUEST, request.getRequestId(), true, 1); - pushToRuleEngineWithTimeout(context, tbMsg, msgData); + private void handlePostAttributesRequest(ActorContext context, SessionInfoProto sessionInfo, PostAttributeMsg postAttributes) { + JsonObject json = getJsonObject(postAttributes.getKvList()); + TbMsg tbMsg = new TbMsg(UUIDs.timeBased(), SessionMsgType.POST_ATTRIBUTES_REQUEST.name(), deviceId, defaultMetaData.copy(), + TbMsgDataType.JSON, gson.toJson(json), null, null, 0L); + pushToRuleEngine(context, tbMsg); } - private void handlePostTelemetryRequest(ActorContext context, DeviceToDeviceActorMsg src) { - TelemetryUploadRequest request = (TelemetryUploadRequest) src.getPayload(); - - Map> tsData = request.getData(); - - PendingSessionMsgData msgData = new PendingSessionMsgData(src.getSessionId(), src.getServerAddress(), - SessionMsgType.POST_TELEMETRY_REQUEST, request.getRequestId(), true, tsData.size()); - - for (Map.Entry> entry : tsData.entrySet()) { - JsonObject json = new JsonObject(); - for (KvEntry kv : entry.getValue()) { - kv.getBooleanValue().ifPresent(v -> json.addProperty(kv.getKey(), v)); - kv.getLongValue().ifPresent(v -> json.addProperty(kv.getKey(), v)); - kv.getDoubleValue().ifPresent(v -> json.addProperty(kv.getKey(), v)); - kv.getStrValue().ifPresent(v -> json.addProperty(kv.getKey(), v)); - } + private void handlePostTelemetryRequest(ActorContext context, SessionInfoProto sessionInfo, PostTelemetryMsg postTelemetry) { + for (TsKvListProto tsKv : postTelemetry.getTsKvListList()) { + JsonObject json = getJsonObject(tsKv.getKvList()); TbMsgMetaData metaData = defaultMetaData.copy(); - metaData.putValue("ts", entry.getKey() + ""); + metaData.putValue("ts", tsKv.getTs() + ""); TbMsg tbMsg = new TbMsg(UUIDs.timeBased(), SessionMsgType.POST_TELEMETRY_REQUEST.name(), deviceId, metaData, TbMsgDataType.JSON, gson.toJson(json), null, null, 0L); - pushToRuleEngineWithTimeout(context, tbMsg, msgData); + pushToRuleEngine(context, tbMsg); } } @@ -401,16 +409,7 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso } } - private void pushToRuleEngineWithTimeout(ActorContext context, TbMsg tbMsg, PendingSessionMsgData pendingMsgData) { - SessionMsgType sessionMsgType = pendingMsgData.getSessionMsgType(); - int requestId = pendingMsgData.getRequestId(); - if (systemContext.isQueuePersistenceEnabled()) { - pendingMsgs.put(tbMsg.getId(), pendingMsgData); - scheduleMsgWithDelay(context, new DeviceActorQueueTimeoutMsg(tbMsg.getId(), systemContext.getQueuePersistenceTimeout()), systemContext.getQueuePersistenceTimeout()); - } else { - ActorSystemToDeviceSessionActorMsg response = new BasicActorSystemToDeviceSessionActorMsg(BasicStatusCodeResponse.onSuccess(sessionMsgType, requestId), pendingMsgData.getSessionId()); - sendMsgToSessionActor(response, pendingMsgData.getServerAddress()); - } + private void pushToRuleEngine(ActorContext context, TbMsg tbMsg) { context.parent().tell(new DeviceActorToRuleEngineMsg(context.self(), tbMsg), context.self()); } @@ -497,13 +496,12 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso } } - private void processSessionStateMsgs(DeviceToDeviceActorMsg msg) { - SessionId sessionId = msg.getSessionId(); - FromDeviceMsg inMsg = msg.getPayload(); - if (inMsg instanceof SessionOpenMsg) { + private void processSessionStateMsgs(SessionInfoProto sessionInfo, SessionEventMsg msg) { + UUID sessionId = new UUID(sessionInfo.getSessionIdMSB(), sessionInfo.getSessionIdLSB()); + if (msg.getEvent() == SessionEvent.OPEN) { logger.debug("[{}] Processing new session [{}]", deviceId, sessionId); if (sessions.size() >= systemContext.getMaxConcurrentSessionsPerDevice()) { - SessionId sessionIdToRemove = sessions.keySet().stream().findFirst().orElse(null); + UUID sessionIdToRemove = sessions.keySet().stream().findFirst().orElse(null); if (sessionIdToRemove != null) { closeSession(sessionIdToRemove, sessions.remove(sessionIdToRemove)); } @@ -512,6 +510,10 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso if (sessions.size() == 1) { reportSessionOpen(); } + } + FromDeviceMsg inMsg = msg.getPayload(); + if (inMsg instanceof SessionOpenMsg) { + logger.debug("[{}] Processing new session [{}]", deviceId, sessionId); } else if (inMsg instanceof SessionCloseMsg) { logger.debug("[{}] Canceling subscriptions for closed session [{}]", deviceId, sessionId); sessions.remove(sessionId); @@ -540,8 +542,12 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso rpcSubscriptions.clear(); } - private void closeSession(SessionId sessionId, SessionInfo sessionInfo) { - sendMsgToSessionActor(new BasicActorSystemToDeviceSessionActorMsg(new SessionCloseNotification(), sessionId), sessionInfo.getServer()); + private void closeSession(UUID sessionId, SessionInfo sessionInfo) { + DeviceActorToTransportMsg msg = DeviceActorToTransportMsg.newBuilder() + .setSessionIdMSB(sessionId.getMostSignificantBits()) + .setSessionIdLSB(sessionId.getLeastSignificantBits()) + .setSessionCloseNotification(SessionCloseNotificationProto.getDefaultInstance()).build(); + systemContext.getRuleEngineTransportService().process(sessionInfo.getNodeId(), msg); } void processNameOrTypeUpdate(DeviceNameOrTypeUpdateMsg msg) { @@ -552,4 +558,24 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso this.defaultMetaData.putValue("deviceType", deviceType); } + private JsonObject getJsonObject(List tsKv) { + JsonObject json = new JsonObject(); + for (KeyValueProto kv : tsKv) { + switch (kv.getType()) { + case BOOLEAN_V: + json.addProperty(kv.getKey(), kv.getBoolV()); + break; + case LONG_V: + json.addProperty(kv.getKey(), kv.getLongV()); + break; + case DOUBLE_V: + json.addProperty(kv.getKey(), kv.getDoubleV()); + break; + case STRING_V: + json.addProperty(kv.getKey(), kv.getStringV()); + break; + } + } + return json; + } } diff --git a/application/src/main/java/org/thingsboard/server/actors/device/PendingSessionMsgData.java b/application/src/main/java/org/thingsboard/server/actors/device/PendingSessionMsgData.java index 23ad966049..dfa07cf065 100644 --- a/application/src/main/java/org/thingsboard/server/actors/device/PendingSessionMsgData.java +++ b/application/src/main/java/org/thingsboard/server/actors/device/PendingSessionMsgData.java @@ -22,6 +22,7 @@ import org.thingsboard.server.common.msg.cluster.ServerAddress; import org.thingsboard.server.common.msg.session.SessionMsgType; import java.util.Optional; +import java.util.UUID; /** * Created by ashvayka on 17.04.18. @@ -30,7 +31,7 @@ import java.util.Optional; @AllArgsConstructor public final class PendingSessionMsgData { - private final SessionId sessionId; + private final UUID sessionId; private final Optional serverAddress; private final SessionMsgType sessionMsgType; private final int requestId; diff --git a/application/src/main/java/org/thingsboard/server/actors/device/SessionInfo.java b/application/src/main/java/org/thingsboard/server/actors/device/SessionInfo.java index 04c457c92e..9faaade208 100644 --- a/application/src/main/java/org/thingsboard/server/actors/device/SessionInfo.java +++ b/application/src/main/java/org/thingsboard/server/actors/device/SessionInfo.java @@ -1,12 +1,12 @@ /** * Copyright © 2016-2018 The Thingsboard Authors - * + *

* Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

* Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -16,10 +16,7 @@ package org.thingsboard.server.actors.device; import lombok.Data; -import org.thingsboard.server.common.msg.cluster.ServerAddress; -import org.thingsboard.server.common.msg.session.SessionType; - -import java.util.Optional; +import org.thingsboard.server.gen.transport.TransportProtos.SessionType; /** * @author Andrew Shvayka @@ -27,5 +24,6 @@ import java.util.Optional; @Data public class SessionInfo { private final SessionType type; - private final Optional server; + private final String nodeId; + } diff --git a/application/src/main/java/org/thingsboard/server/actors/session/ASyncMsgProcessor.java b/application/src/main/java/org/thingsboard/server/actors/session/ASyncMsgProcessor.java deleted file mode 100644 index a8f14fedf5..0000000000 --- a/application/src/main/java/org/thingsboard/server/actors/session/ASyncMsgProcessor.java +++ /dev/null @@ -1,156 +0,0 @@ -/** - * Copyright © 2016-2018 The Thingsboard Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.thingsboard.server.actors.session; - -import akka.actor.ActorContext; -import akka.event.LoggingAdapter; -import org.thingsboard.server.actors.ActorSystemContext; -import org.thingsboard.server.actors.shared.SessionTimeoutMsg; -import org.thingsboard.server.common.data.id.SessionId; -import org.thingsboard.server.common.msg.cluster.ClusterEventMsg; -import org.thingsboard.server.common.msg.cluster.ServerAddress; -import org.thingsboard.server.common.msg.core.AttributesSubscribeMsg; -import org.thingsboard.server.common.msg.core.ResponseMsg; -import org.thingsboard.server.common.msg.core.RpcSubscribeMsg; -import org.thingsboard.server.common.msg.core.SessionCloseMsg; -import org.thingsboard.server.common.msg.core.SessionOpenMsg; -import org.thingsboard.server.common.msg.device.DeviceToDeviceActorMsg; -import org.thingsboard.server.common.msg.session.BasicSessionActorToAdaptorMsg; -import org.thingsboard.server.common.msg.session.FromDeviceMsg; -import org.thingsboard.server.common.msg.session.FromDeviceRequestMsg; -import org.thingsboard.server.common.msg.session.SessionMsgType; -import org.thingsboard.server.common.msg.session.SessionType; -import org.thingsboard.server.common.msg.session.ToDeviceMsg; -import org.thingsboard.server.common.msg.session.TransportToDeviceSessionActorMsg; -import org.thingsboard.server.common.msg.session.ex.SessionException; - -import java.util.HashMap; -import java.util.Map; -import java.util.Optional; - -class ASyncMsgProcessor extends AbstractSessionActorMsgProcessor { - - private boolean firstMsg = true; - private Map pendingMap = new HashMap<>(); - private Optional currentTargetServer; - private boolean subscribedToAttributeUpdates; - private boolean subscribedToRpcCommands; - - public ASyncMsgProcessor(ActorSystemContext ctx, LoggingAdapter logger, SessionId sessionId) { - super(ctx, logger, sessionId); - } - - @Override - protected void processToDeviceActorMsg(ActorContext ctx, TransportToDeviceSessionActorMsg msg) { - updateSessionCtx(msg, SessionType.ASYNC); - DeviceToDeviceActorMsg pendingMsg = toDeviceMsg(msg); - FromDeviceMsg fromDeviceMsg = pendingMsg.getPayload(); - if (firstMsg) { - if (fromDeviceMsg.getMsgType() != SessionMsgType.SESSION_OPEN) { - toDeviceMsg(new SessionOpenMsg()).ifPresent(m -> forwardToAppActor(ctx, m)); - } - firstMsg = false; - } - switch (fromDeviceMsg.getMsgType()) { - case POST_TELEMETRY_REQUEST: - case POST_ATTRIBUTES_REQUEST: - FromDeviceRequestMsg requestMsg = (FromDeviceRequestMsg) fromDeviceMsg; - if (requestMsg.getRequestId() >= 0) { - logger.debug("[{}] Pending request {} registered", requestMsg.getRequestId(), requestMsg.getMsgType()); - //TODO: handle duplicates. - pendingMap.put(requestMsg.getRequestId(), pendingMsg); - } - break; - case SUBSCRIBE_ATTRIBUTES_REQUEST: - subscribedToAttributeUpdates = true; - break; - case UNSUBSCRIBE_ATTRIBUTES_REQUEST: - subscribedToAttributeUpdates = false; - break; - case SUBSCRIBE_RPC_COMMANDS_REQUEST: - subscribedToRpcCommands = true; - break; - case UNSUBSCRIBE_RPC_COMMANDS_REQUEST: - subscribedToRpcCommands = false; - break; - default: - break; - } - currentTargetServer = forwardToAppActor(ctx, pendingMsg); - } - - @Override - public void processToDeviceMsg(ActorContext context, ToDeviceMsg msg) { - try { - if (msg.getSessionMsgType() != SessionMsgType.SESSION_CLOSE) { - switch (msg.getSessionMsgType()) { - case STATUS_CODE_RESPONSE: - case GET_ATTRIBUTES_RESPONSE: - ResponseMsg responseMsg = (ResponseMsg) msg; - if (responseMsg.getRequestId() >= 0) { - logger.debug("[{}] Pending request processed: {}", responseMsg.getRequestId(), responseMsg); - pendingMap.remove(responseMsg.getRequestId()); - } - break; - default: - break; - } - sessionCtx.onMsg(new BasicSessionActorToAdaptorMsg(this.sessionCtx, msg)); - } else { - sessionCtx.onMsg(org.thingsboard.server.common.msg.session.ctrl.SessionCloseMsg.onCredentialsRevoked(sessionCtx.getSessionId())); - } - } catch (SessionException e) { - logger.warning("Failed to push session response msg", e); - } - } - - @Override - public void processTimeoutMsg(ActorContext context, SessionTimeoutMsg msg) { - // TODO Auto-generated method stub - } - - @Override - protected void cleanupSession(ActorContext ctx) { - toDeviceMsg(new SessionCloseMsg()).ifPresent(m -> forwardToAppActor(ctx, m)); - } - - @Override - public void processClusterEvent(ActorContext context, ClusterEventMsg msg) { - if (pendingMap.size() > 0 || subscribedToAttributeUpdates || subscribedToRpcCommands) { - Optional newTargetServer = systemContext.getRoutingService().resolveById(getDeviceId()); - if (!newTargetServer.equals(currentTargetServer)) { - firstMsg = true; - currentTargetServer = newTargetServer; - pendingMap.values().forEach(v -> { - forwardToAppActor(context, v, currentTargetServer); - if (currentTargetServer.isPresent()) { - logger.debug("[{}] Forwarded msg to new server: {}", sessionId, currentTargetServer.get()); - } else { - logger.debug("[{}] Forwarded msg to local server.", sessionId); - } - }); - if (subscribedToAttributeUpdates) { - toDeviceMsg(new AttributesSubscribeMsg()).ifPresent(m -> forwardToAppActor(context, m, currentTargetServer)); - logger.debug("[{}] Forwarded attributes subscription.", sessionId); - } - if (subscribedToRpcCommands) { - toDeviceMsg(new RpcSubscribeMsg()).ifPresent(m -> forwardToAppActor(context, m, currentTargetServer)); - logger.debug("[{}] Forwarded rpc commands subscription.", sessionId); - } - } - } - } -} diff --git a/application/src/main/java/org/thingsboard/server/actors/session/AbstractSessionActorMsgProcessor.java b/application/src/main/java/org/thingsboard/server/actors/session/AbstractSessionActorMsgProcessor.java deleted file mode 100644 index 469cda9c5f..0000000000 --- a/application/src/main/java/org/thingsboard/server/actors/session/AbstractSessionActorMsgProcessor.java +++ /dev/null @@ -1,122 +0,0 @@ -/** - * Copyright © 2016-2018 The Thingsboard Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.thingsboard.server.actors.session; - -import akka.actor.ActorContext; -import akka.actor.ActorRef; -import akka.event.LoggingAdapter; -import org.thingsboard.server.actors.ActorSystemContext; -import org.thingsboard.server.actors.shared.AbstractContextAwareMsgProcessor; -import org.thingsboard.server.actors.shared.SessionTimeoutMsg; -import org.thingsboard.server.common.data.id.DeviceId; -import org.thingsboard.server.common.data.id.SessionId; -import org.thingsboard.server.common.msg.cluster.ClusterEventMsg; -import org.thingsboard.server.common.msg.cluster.SendToClusterMsg; -import org.thingsboard.server.common.msg.cluster.ServerAddress; -import org.thingsboard.server.common.msg.device.BasicDeviceToDeviceActorMsg; -import org.thingsboard.server.common.msg.device.DeviceToDeviceActorMsg; -import org.thingsboard.server.common.msg.session.AdaptorToSessionActorMsg; -import org.thingsboard.server.common.msg.session.FromDeviceMsg; -import org.thingsboard.server.common.msg.session.SessionContext; -import org.thingsboard.server.common.msg.session.SessionCtrlMsg; -import org.thingsboard.server.common.msg.session.SessionType; -import org.thingsboard.server.common.msg.session.ToDeviceMsg; -import org.thingsboard.server.common.msg.session.TransportToDeviceSessionActorMsg; -import org.thingsboard.server.common.msg.session.ctrl.SessionCloseMsg; - -import java.util.Optional; - -abstract class AbstractSessionActorMsgProcessor extends AbstractContextAwareMsgProcessor { - - protected final SessionId sessionId; - protected SessionContext sessionCtx; - protected DeviceToDeviceActorMsg deviceToDeviceActorMsgPrototype; - - protected AbstractSessionActorMsgProcessor(ActorSystemContext ctx, LoggingAdapter logger, SessionId sessionId) { - super(ctx, logger); - this.sessionId = sessionId; - } - - protected abstract void processToDeviceActorMsg(ActorContext ctx, TransportToDeviceSessionActorMsg msg); - - protected abstract void processTimeoutMsg(ActorContext context, SessionTimeoutMsg msg); - - protected abstract void processToDeviceMsg(ActorContext context, ToDeviceMsg msg); - - public abstract void processClusterEvent(ActorContext context, ClusterEventMsg msg); - - protected void processSessionCtrlMsg(ActorContext ctx, SessionCtrlMsg msg) { - if (msg instanceof SessionCloseMsg) { - cleanupSession(ctx); - terminateSession(ctx, sessionId); - } - } - - protected void cleanupSession(ActorContext ctx) { - } - - protected void updateSessionCtx(TransportToDeviceSessionActorMsg msg, SessionType type) { - sessionCtx = msg.getSessionMsg().getSessionContext(); - deviceToDeviceActorMsgPrototype = new BasicDeviceToDeviceActorMsg(msg, type); - } - - protected DeviceToDeviceActorMsg toDeviceMsg(TransportToDeviceSessionActorMsg msg) { - AdaptorToSessionActorMsg adaptorMsg = msg.getSessionMsg(); - return new BasicDeviceToDeviceActorMsg(deviceToDeviceActorMsgPrototype, adaptorMsg.getMsg()); - } - - protected Optional toDeviceMsg(FromDeviceMsg msg) { - if (deviceToDeviceActorMsgPrototype != null) { - return Optional.of(new BasicDeviceToDeviceActorMsg(deviceToDeviceActorMsgPrototype, msg)); - } else { - return Optional.empty(); - } - } - - protected Optional forwardToAppActor(ActorContext ctx, DeviceToDeviceActorMsg toForward) { - Optional address = systemContext.getRoutingService().resolveById(toForward.getDeviceId()); - forwardToAppActor(ctx, toForward, address); - return address; - } - - protected Optional forwardToAppActorIfAddressChanged(ActorContext ctx, DeviceToDeviceActorMsg toForward, Optional oldAddress) { - - Optional newAddress = systemContext.getRoutingService().resolveById(toForward.getDeviceId()); - if (!newAddress.equals(oldAddress)) { - getAppActor().tell(new SendToClusterMsg(toForward.getDeviceId(), toForward - .toOtherAddress(systemContext.getRoutingService().getCurrentServer())), ctx.self()); - } - return newAddress; - } - - protected void forwardToAppActor(ActorContext ctx, DeviceToDeviceActorMsg toForward, Optional address) { - if (address.isPresent()) { - systemContext.getRpcService().tell(systemContext.getEncodingService().convertToProtoDataMessage(address.get(), - toForward.toOtherAddress(systemContext.getRoutingService().getCurrentServer()))); - } else { - getAppActor().tell(toForward, ctx.self()); - } - } - - public static void terminateSession(ActorContext ctx, SessionId sessionId) { - ctx.parent().tell(new SessionTerminationMsg(sessionId), ActorRef.noSender()); - ctx.stop(ctx.self()); - } - - public DeviceId getDeviceId() { - return deviceToDeviceActorMsgPrototype.getDeviceId(); - } -} diff --git a/application/src/main/java/org/thingsboard/server/actors/session/SessionActor.java b/application/src/main/java/org/thingsboard/server/actors/session/SessionActor.java deleted file mode 100644 index 05926c1825..0000000000 --- a/application/src/main/java/org/thingsboard/server/actors/session/SessionActor.java +++ /dev/null @@ -1,143 +0,0 @@ -/** - * Copyright © 2016-2018 The Thingsboard Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.thingsboard.server.actors.session; - -import akka.actor.OneForOneStrategy; -import akka.actor.SupervisorStrategy; -import akka.event.Logging; -import akka.event.LoggingAdapter; -import org.thingsboard.server.actors.ActorSystemContext; -import org.thingsboard.server.actors.service.ContextAwareActor; -import org.thingsboard.server.actors.service.ContextBasedCreator; -import org.thingsboard.server.actors.shared.SessionTimeoutMsg; -import org.thingsboard.server.common.data.id.SessionId; -import org.thingsboard.server.common.msg.TbActorMsg; -import org.thingsboard.server.common.msg.cluster.ClusterEventMsg; -import org.thingsboard.server.common.msg.core.ActorSystemToDeviceSessionActorMsg; -import org.thingsboard.server.common.msg.session.SessionCtrlMsg; -import org.thingsboard.server.common.msg.session.SessionMsg; -import org.thingsboard.server.common.msg.session.SessionType; -import org.thingsboard.server.common.msg.session.TransportToDeviceSessionActorMsg; -import org.thingsboard.server.common.msg.session.ctrl.SessionCloseMsg; -import scala.concurrent.duration.Duration; - -public class SessionActor extends ContextAwareActor { - - private final LoggingAdapter logger = Logging.getLogger(getContext().system(), this); - - private final SessionId sessionId; - private AbstractSessionActorMsgProcessor processor; - - private SessionActor(ActorSystemContext systemContext, SessionId sessionId) { - super(systemContext); - this.sessionId = sessionId; - } - - @Override - public SupervisorStrategy supervisorStrategy() { - return new OneForOneStrategy(-1, Duration.Inf(), - throwable -> { - logger.error(throwable, "Unknown session error"); - if (throwable instanceof Error) { - return OneForOneStrategy.escalate(); - } else { - return OneForOneStrategy.resume(); - } - }); - } - - @Override - protected boolean process(TbActorMsg msg) { - switch (msg.getMsgType()) { - case TRANSPORT_TO_DEVICE_SESSION_ACTOR_MSG: - processTransportToSessionMsg((TransportToDeviceSessionActorMsg) msg); - break; - case ACTOR_SYSTEM_TO_DEVICE_SESSION_ACTOR_MSG: - processActorsToSessionMsg((ActorSystemToDeviceSessionActorMsg) msg); - break; - case SESSION_TIMEOUT_MSG: - processTimeoutMsg((SessionTimeoutMsg) msg); - break; - case SESSION_CTRL_MSG: - processSessionCloseMsg((SessionCtrlMsg) msg); - break; - case CLUSTER_EVENT_MSG: - processClusterEvent((ClusterEventMsg) msg); - break; - default: return false; - } - return true; - } - - private void processClusterEvent(ClusterEventMsg msg) { - processor.processClusterEvent(context(), msg); - } - - private void processTransportToSessionMsg(TransportToDeviceSessionActorMsg msg) { - initProcessor(msg); - processor.processToDeviceActorMsg(context(), msg); - } - - private void processActorsToSessionMsg(ActorSystemToDeviceSessionActorMsg msg) { - processor.processToDeviceMsg(context(), msg.getMsg()); - } - - private void processTimeoutMsg(SessionTimeoutMsg msg) { - if (processor != null) { - processor.processTimeoutMsg(context(), msg); - } else { - logger.warning("[{}] Can't process timeout msg: {} without processor", sessionId, msg); - } - } - - private void processSessionCloseMsg(SessionCtrlMsg msg) { - if (processor != null) { - processor.processSessionCtrlMsg(context(), msg); - } else if (msg instanceof SessionCloseMsg) { - AbstractSessionActorMsgProcessor.terminateSession(context(), sessionId); - } else { - logger.warning("[{}] Can't process session ctrl msg: {} without processor", sessionId, msg); - } - } - - private void initProcessor(TransportToDeviceSessionActorMsg msg) { - if (processor == null) { - SessionMsg sessionMsg = (SessionMsg) msg.getSessionMsg(); - if (sessionMsg.getSessionContext().getSessionType() == SessionType.SYNC) { - processor = new SyncMsgProcessor(systemContext, logger, sessionId); - } else { - processor = new ASyncMsgProcessor(systemContext, logger, sessionId); - } - } - } - - public static class ActorCreator extends ContextBasedCreator { - private static final long serialVersionUID = 1L; - - private final SessionId sessionId; - - public ActorCreator(ActorSystemContext context, SessionId sessionId) { - super(context); - this.sessionId = sessionId; - } - - @Override - public SessionActor create() throws Exception { - return new SessionActor(context, sessionId); - } - } - -} diff --git a/application/src/main/java/org/thingsboard/server/actors/session/SessionManagerActor.java b/application/src/main/java/org/thingsboard/server/actors/session/SessionManagerActor.java deleted file mode 100644 index 9039d302e3..0000000000 --- a/application/src/main/java/org/thingsboard/server/actors/session/SessionManagerActor.java +++ /dev/null @@ -1,180 +0,0 @@ -/** - * Copyright © 2016-2018 The Thingsboard Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.thingsboard.server.actors.session; - -import akka.actor.ActorInitializationException; -import akka.actor.ActorRef; -import akka.actor.InvalidActorNameException; -import akka.actor.LocalActorRef; -import akka.actor.OneForOneStrategy; -import akka.actor.Props; -import akka.actor.SupervisorStrategy; -import akka.actor.Terminated; -import akka.event.Logging; -import akka.event.LoggingAdapter; -import akka.japi.Function; -import org.thingsboard.server.actors.ActorSystemContext; -import org.thingsboard.server.actors.service.ContextAwareActor; -import org.thingsboard.server.actors.service.ContextBasedCreator; -import org.thingsboard.server.actors.service.DefaultActorService; -import org.thingsboard.server.actors.shared.SessionTimeoutMsg; -import org.thingsboard.server.common.data.id.SessionId; -import org.thingsboard.server.common.msg.TbActorMsg; -import org.thingsboard.server.common.msg.aware.SessionAwareMsg; -import org.thingsboard.server.common.msg.cluster.ClusterEventMsg; -import org.thingsboard.server.common.msg.core.ActorSystemToDeviceSessionActorMsg; -import org.thingsboard.server.common.msg.core.SessionCloseMsg; -import org.thingsboard.server.common.msg.session.SessionCtrlMsg; -import scala.concurrent.duration.Duration; - -import java.util.HashMap; -import java.util.Map; - -public class SessionManagerActor extends ContextAwareActor { - - private static final int INITIAL_SESSION_MAP_SIZE = 1024; - - private final LoggingAdapter log = Logging.getLogger(getContext().system(), this); - - private final Map sessionActors; - - SessionManagerActor(ActorSystemContext systemContext) { - super(systemContext); - this.sessionActors = new HashMap<>(INITIAL_SESSION_MAP_SIZE); - } - - @Override - public SupervisorStrategy supervisorStrategy() { - return strategy; - } - - @Override - protected boolean process(TbActorMsg msg) { - //TODO Move everything here, to work with TbActorMsg - return false; - } - - @Override - public void onReceive(Object msg) throws Exception { - if (msg instanceof SessionCtrlMsg) { - onSessionCtrlMsg((SessionCtrlMsg) msg); - } else if (msg instanceof SessionAwareMsg) { - forwardToSessionActor((SessionAwareMsg) msg); - } else if (msg instanceof SessionTerminationMsg) { - onSessionTermination((SessionTerminationMsg) msg); - } else if (msg instanceof Terminated) { - onTermination((Terminated) msg); - } else if (msg instanceof SessionTimeoutMsg) { - onSessionTimeout((SessionTimeoutMsg) msg); - } else if (msg instanceof ClusterEventMsg) { - broadcast(msg); - } - } - - private void broadcast(Object msg) { - sessionActors.values().forEach(actorRef -> actorRef.tell(msg, ActorRef.noSender())); - } - - private void onSessionTimeout(SessionTimeoutMsg msg) { - String sessionIdStr = msg.getSessionId().toUidStr(); - ActorRef sessionActor = sessionActors.get(sessionIdStr); - if (sessionActor != null) { - sessionActor.tell(msg, ActorRef.noSender()); - } - } - - private void onSessionCtrlMsg(SessionCtrlMsg msg) { - String sessionIdStr = msg.getSessionId().toUidStr(); - ActorRef sessionActor = sessionActors.get(sessionIdStr); - if (sessionActor != null) { - sessionActor.tell(msg, ActorRef.noSender()); - } - } - - private void onSessionTermination(SessionTerminationMsg msg) { - String sessionIdStr = msg.getId().toUidStr(); - ActorRef sessionActor = sessionActors.remove(sessionIdStr); - if (sessionActor != null) { - log.debug("[{}] Removed session actor.", sessionIdStr); - //TODO: onSubscriptionUpdate device actor about session close; - } else { - log.debug("[{}] Session actor was already removed.", sessionIdStr); - } - } - - private void forwardToSessionActor(SessionAwareMsg msg) { - if (msg instanceof ActorSystemToDeviceSessionActorMsg || msg instanceof SessionCloseMsg) { - String sessionIdStr = msg.getSessionId().toUidStr(); - ActorRef sessionActor = sessionActors.get(sessionIdStr); - if (sessionActor != null) { - sessionActor.tell(msg, ActorRef.noSender()); - } else { - log.debug("[{}] Session actor was already removed.", sessionIdStr); - } - } else { - try { - getOrCreateSessionActor(msg.getSessionId()).tell(msg, self()); - } catch (InvalidActorNameException e) { - log.info("Invalid msg : {}", msg); - } - } - } - - private ActorRef getOrCreateSessionActor(SessionId sessionId) { - String sessionIdStr = sessionId.toUidStr(); - ActorRef sessionActor = sessionActors.get(sessionIdStr); - if (sessionActor == null) { - log.debug("[{}] Creating session actor.", sessionIdStr); - sessionActor = context().actorOf( - Props.create(new SessionActor.ActorCreator(systemContext, sessionId)).withDispatcher(DefaultActorService.SESSION_DISPATCHER_NAME), - sessionIdStr); - sessionActors.put(sessionIdStr, sessionActor); - log.debug("[{}] Created session actor.", sessionIdStr); - } - return sessionActor; - } - - private void onTermination(Terminated message) { - ActorRef terminated = message.actor(); - if (terminated instanceof LocalActorRef) { - log.info("Removed actor: {}.", terminated); - //TODO: cleanup session actors map - } else { - throw new IllegalStateException("Remote actors are not supported!"); - } - } - - public static class ActorCreator extends ContextBasedCreator { - private static final long serialVersionUID = 1L; - - public ActorCreator(ActorSystemContext context) { - super(context); - } - - @Override - public SessionManagerActor create() throws Exception { - return new SessionManagerActor(context); - } - } - - private final SupervisorStrategy strategy = new OneForOneStrategy(3, Duration.create("1 minute"), new Function() { - @Override - public SupervisorStrategy.Directive apply(Throwable t) { - logger.error(t, "Unknown failure"); - return SupervisorStrategy.stop(); - } - }); -} diff --git a/application/src/main/java/org/thingsboard/server/actors/session/SyncMsgProcessor.java b/application/src/main/java/org/thingsboard/server/actors/session/SyncMsgProcessor.java deleted file mode 100644 index cf8df1385f..0000000000 --- a/application/src/main/java/org/thingsboard/server/actors/session/SyncMsgProcessor.java +++ /dev/null @@ -1,95 +0,0 @@ -/** - * Copyright © 2016-2018 The Thingsboard Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.thingsboard.server.actors.session; - -import akka.actor.ActorContext; -import akka.event.LoggingAdapter; -import org.thingsboard.server.actors.ActorSystemContext; -import org.thingsboard.server.actors.shared.SessionTimeoutMsg; -import org.thingsboard.server.common.data.id.SessionId; -import org.thingsboard.server.common.msg.cluster.ClusterEventMsg; -import org.thingsboard.server.common.msg.cluster.ServerAddress; -import org.thingsboard.server.common.msg.device.DeviceToDeviceActorMsg; -import org.thingsboard.server.common.msg.session.BasicSessionActorToAdaptorMsg; -import org.thingsboard.server.common.msg.session.SessionContext; -import org.thingsboard.server.common.msg.session.SessionType; -import org.thingsboard.server.common.msg.session.ToDeviceMsg; -import org.thingsboard.server.common.msg.session.TransportToDeviceSessionActorMsg; -import org.thingsboard.server.common.msg.session.ctrl.SessionCloseMsg; -import org.thingsboard.server.common.msg.session.ex.SessionException; - -import java.util.Optional; - -class SyncMsgProcessor extends AbstractSessionActorMsgProcessor { - private DeviceToDeviceActorMsg pendingMsg; - private Optional currentTargetServer; - private boolean pendingResponse; - - public SyncMsgProcessor(ActorSystemContext ctx, LoggingAdapter logger, SessionId sessionId) { - super(ctx, logger, sessionId); - } - - @Override - protected void processToDeviceActorMsg(ActorContext ctx, TransportToDeviceSessionActorMsg msg) { - updateSessionCtx(msg, SessionType.SYNC); - pendingMsg = toDeviceMsg(msg); - pendingResponse = true; - currentTargetServer = forwardToAppActor(ctx, pendingMsg); - scheduleMsgWithDelay(ctx, new SessionTimeoutMsg(sessionId), getTimeout(systemContext, msg.getSessionMsg().getSessionContext()), ctx.parent()); - } - - public void processTimeoutMsg(ActorContext context, SessionTimeoutMsg msg) { - if (pendingResponse) { - try { - sessionCtx.onMsg(SessionCloseMsg.onTimeout(sessionId)); - } catch (SessionException e) { - logger.warning("Failed to push session close msg", e); - } - terminateSession(context, this.sessionId); - } - } - - public void processToDeviceMsg(ActorContext context, ToDeviceMsg msg) { - try { - sessionCtx.onMsg(new BasicSessionActorToAdaptorMsg(this.sessionCtx, msg)); - pendingResponse = false; - } catch (SessionException e) { - logger.warning("Failed to push session response msg", e); - } - terminateSession(context, this.sessionId); - } - - @Override - public void processClusterEvent(ActorContext context, ClusterEventMsg msg) { - if (pendingResponse) { - Optional newTargetServer = forwardToAppActorIfAddressChanged(context, pendingMsg, currentTargetServer); - if (logger.isDebugEnabled()) { - if (!newTargetServer.equals(currentTargetServer)) { - if (newTargetServer.isPresent()) { - logger.debug("[{}] Forwarded msg to new server: {}", sessionId, newTargetServer.get()); - } else { - logger.debug("[{}] Forwarded msg to local server.", sessionId); - } - } - } - currentTargetServer = newTargetServer; - } - } - - private long getTimeout(ActorSystemContext ctx, SessionContext sessionCtx) { - return sessionCtx.getTimeout() > 0 ? sessionCtx.getTimeout() : ctx.getSyncSessionTimeout(); - } -} diff --git a/application/src/main/java/org/thingsboard/server/actors/tenant/TenantActor.java b/application/src/main/java/org/thingsboard/server/actors/tenant/TenantActor.java index 460b64c772..347483a185 100644 --- a/application/src/main/java/org/thingsboard/server/actors/tenant/TenantActor.java +++ b/application/src/main/java/org/thingsboard/server/actors/tenant/TenantActor.java @@ -87,7 +87,7 @@ public class TenantActor extends RuleChainManagerActor { case DEVICE_ACTOR_TO_RULE_ENGINE_MSG: onDeviceActorToRuleEngineMsg((DeviceActorToRuleEngineMsg) msg); break; - case DEVICE_SESSION_TO_DEVICE_ACTOR_MSG: + case TRANSPORT_TO_DEVICE_ACTOR_MSG: case DEVICE_ATTRIBUTES_UPDATE_TO_DEVICE_ACTOR_MSG: case DEVICE_CREDENTIALS_UPDATE_TO_DEVICE_ACTOR_MSG: case DEVICE_NAME_OR_TYPE_UPDATE_TO_DEVICE_ACTOR_MSG: diff --git a/application/src/main/java/org/thingsboard/server/service/transport/RemoteRuleEngineTransportService.java b/application/src/main/java/org/thingsboard/server/service/transport/RemoteRuleEngineTransportService.java new file mode 100644 index 0000000000..6b827921cd --- /dev/null +++ b/application/src/main/java/org/thingsboard/server/service/transport/RemoteRuleEngineTransportService.java @@ -0,0 +1,187 @@ +package org.thingsboard.server.service.transport; + +import akka.actor.ActorRef; +import com.fasterxml.jackson.databind.ObjectMapper; +import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.producer.Callback; +import org.apache.kafka.clients.producer.RecordMetadata; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.stereotype.Service; +import org.thingsboard.server.actors.ActorSystemContext; +import org.thingsboard.server.actors.service.ActorService; +import org.thingsboard.server.common.msg.cluster.ServerAddress; +import org.thingsboard.server.gen.transport.TransportProtos.DeviceActorToTransportMsg; +import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineMsg; +import org.thingsboard.server.gen.transport.TransportProtos.ToTransportMsg; +import org.thingsboard.server.gen.transport.TransportProtos.TransportToDeviceActorMsg; +import org.thingsboard.server.kafka.TBKafkaConsumerTemplate; +import org.thingsboard.server.kafka.TBKafkaProducerTemplate; +import org.thingsboard.server.kafka.TbKafkaSettings; +import org.thingsboard.server.service.cluster.discovery.DiscoveryService; +import org.thingsboard.server.service.cluster.routing.ClusterRoutingService; +import org.thingsboard.server.service.cluster.rpc.ClusterRpcService; +import org.thingsboard.server.service.encoding.DataDecodingEncodingService; +import org.thingsboard.server.service.transport.msg.TransportToDeviceActorMsgWrapper; + +import javax.annotation.PostConstruct; +import javax.annotation.PreDestroy; +import java.time.Duration; +import java.util.Optional; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.function.Consumer; + +/** + * Created by ashvayka on 09.10.18. + */ +@Slf4j +@Service +@ConditionalOnProperty(prefix = "transport.remote", value = "enabled", havingValue = "true") +public class RemoteRuleEngineTransportService implements RuleEngineTransportService { + + private static final ObjectMapper mapper = new ObjectMapper(); + + @Value("${transport.remote.rule_engine.topic}") + private String ruleEngineTopic; + @Value("${transport.remote.notifications.topic}") + private String notificationsTopic; + @Value("${transport.remote.rule_engine.poll_interval}") + private int pollDuration; + @Value("${transport.remote.rule_engine.auto_commit_interval}") + private int autoCommitInterval; + + @Autowired + private TbKafkaSettings kafkaSettings; + + @Autowired + private DiscoveryService discoveryService; + + @Autowired + private ActorSystemContext actorContext; + + @Autowired + private ActorService actorService; + + //TODO: completely replace this routing with the Kafka routing by partition ids. + @Autowired + private ClusterRoutingService routingService; + @Autowired + private ClusterRpcService rpcService; + @Autowired + private DataDecodingEncodingService encodingService; + + private TBKafkaConsumerTemplate ruleEngineConsumer; + private TBKafkaProducerTemplate notificationsProducer; + + private ExecutorService mainConsumerExecutor = Executors.newSingleThreadExecutor(); + + private volatile boolean stopped = false; + + @PostConstruct + public void init() { + TBKafkaProducerTemplate.TBKafkaProducerTemplateBuilder notificationsProducerBuilder = TBKafkaProducerTemplate.builder(); + notificationsProducerBuilder.settings(kafkaSettings); + notificationsProducerBuilder.defaultTopic(notificationsTopic); + notificationsProducerBuilder.encoder(new ToTransportMsgEncoder()); + + notificationsProducer = notificationsProducerBuilder.build(); + notificationsProducer.init(); + + TBKafkaConsumerTemplate.TBKafkaConsumerTemplateBuilder ruleEngineConsumerBuilder = TBKafkaConsumerTemplate.builder(); + ruleEngineConsumerBuilder.settings(kafkaSettings); + ruleEngineConsumerBuilder.topic(ruleEngineTopic); + ruleEngineConsumerBuilder.clientId(discoveryService.getNodeId()); + ruleEngineConsumerBuilder.groupId("tb-node"); + ruleEngineConsumerBuilder.autoCommit(true); + ruleEngineConsumerBuilder.autoCommitIntervalMs(autoCommitInterval); + ruleEngineConsumerBuilder.decoder(new ToRuleEngineMsgDecoder()); + + ruleEngineConsumer = ruleEngineConsumerBuilder.build(); + ruleEngineConsumer.subscribe(); + + mainConsumerExecutor.execute(() -> { + while (!stopped) { + try { + ConsumerRecords records = ruleEngineConsumer.poll(Duration.ofMillis(pollDuration)); + records.forEach(record -> { + try { + ToRuleEngineMsg toRuleEngineMsg = ruleEngineConsumer.decode(record); + if (toRuleEngineMsg.hasToDeviceActorMsg()) { + forwardToDeviceActor(toRuleEngineMsg.getToDeviceActorMsg()); + } + } catch (Throwable e) { + log.warn("Failed to process the notification.", e); + } + }); + } catch (Exception e) { + log.warn("Failed to obtain messages from queue.", e); + try { + Thread.sleep(pollDuration); + } catch (InterruptedException e2) { + log.trace("Failed to wait until the server has capacity to handle new requests", e2); + } + } + } + }); + } + + @Override + public void process(String nodeId, DeviceActorToTransportMsg msg) { + process(nodeId, msg, null, null); + } + + @Override + public void process(String nodeId, DeviceActorToTransportMsg msg, Runnable onSuccess, Consumer onFailure) { + notificationsProducer.send(notificationsTopic + "." + nodeId, + ToTransportMsg.newBuilder().setToDeviceSessionMsg(msg).build() + , new QueueCallbackAdaptor(onSuccess, onFailure)); + } + + private void forwardToDeviceActor(TransportToDeviceActorMsg toDeviceActorMsg) { + TransportToDeviceActorMsgWrapper wrapper = new TransportToDeviceActorMsgWrapper(toDeviceActorMsg); + Optional address = routingService.resolveById(wrapper.getDeviceId()); + if (address.isPresent()) { + rpcService.tell(encodingService.convertToProtoDataMessage(address.get(), wrapper)); + } else { + actorContext.getAppActor().tell(wrapper, ActorRef.noSender()); + } + } + + @PreDestroy + public void destroy() { + stopped = true; + if (ruleEngineConsumer != null) { + ruleEngineConsumer.unsubscribe(); + } + if (mainConsumerExecutor != null) { + mainConsumerExecutor.shutdownNow(); + } + } + + private static class QueueCallbackAdaptor implements Callback { + private final Runnable onSuccess; + private final Consumer onFailure; + + QueueCallbackAdaptor(Runnable onSuccess, Consumer onFailure) { + this.onSuccess = onSuccess; + this.onFailure = onFailure; + } + + @Override + public void onCompletion(RecordMetadata metadata, Exception exception) { + if (exception == null) { + if (onSuccess != null) { + onSuccess.run(); + } + } else { + if (onFailure != null) { + onFailure.accept(exception); + } + } + } + } + +} diff --git a/application/src/main/java/org/thingsboard/server/service/transport/RuleEngineTransportService.java b/application/src/main/java/org/thingsboard/server/service/transport/RuleEngineTransportService.java new file mode 100644 index 0000000000..e6b3dd3369 --- /dev/null +++ b/application/src/main/java/org/thingsboard/server/service/transport/RuleEngineTransportService.java @@ -0,0 +1,32 @@ +/** + * Copyright © 2016-2018 The Thingsboard Authors + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.service.transport; + +import org.thingsboard.server.gen.transport.TransportProtos.DeviceActorToTransportMsg; +import org.thingsboard.server.gen.transport.TransportProtos.; + +import java.util.function.Consumer; + +/** + * Created by ashvayka on 05.10.18. + */ +public interface RuleEngineTransportService { + + void process(String nodeId, DeviceActorToTransportMsg msg); + + void process(String nodeId, DeviceActorToTransportMsg msg, Runnable onSuccess, Consumer onFailure); + +} diff --git a/application/src/main/java/org/thingsboard/server/service/transport/ToRuleEngineMsgDecoder.java b/application/src/main/java/org/thingsboard/server/service/transport/ToRuleEngineMsgDecoder.java new file mode 100644 index 0000000000..16954eea10 --- /dev/null +++ b/application/src/main/java/org/thingsboard/server/service/transport/ToRuleEngineMsgDecoder.java @@ -0,0 +1,31 @@ +/** + * Copyright © 2016-2018 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.service.transport; + +import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineMsg; +import org.thingsboard.server.kafka.TbKafkaDecoder; + +import java.io.IOException; + +/** + * Created by ashvayka on 05.10.18. + */ +public class ToRuleEngineMsgDecoder implements TbKafkaDecoder { + @Override + public ToRuleEngineMsg decode(byte[] data) throws IOException { + return ToRuleEngineMsg.parseFrom(data); + } +} diff --git a/application/src/main/java/org/thingsboard/server/service/transport/ToTransportMsgEncoder.java b/application/src/main/java/org/thingsboard/server/service/transport/ToTransportMsgEncoder.java new file mode 100644 index 0000000000..5f3c026249 --- /dev/null +++ b/application/src/main/java/org/thingsboard/server/service/transport/ToTransportMsgEncoder.java @@ -0,0 +1,29 @@ +/** + * Copyright © 2016-2018 The Thingsboard Authors + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.service.transport; + +import org.thingsboard.server.gen.transport.TransportProtos.ToTransportMsg; +import org.thingsboard.server.kafka.TbKafkaEncoder; + +/** + * Created by ashvayka on 05.10.18. + */ +public class ToTransportMsgEncoder implements TbKafkaEncoder { + @Override + public byte[] encode(ToTransportMsg value) { + return value.toByteArray(); + } +} diff --git a/application/src/main/java/org/thingsboard/server/service/transport/msg/TransportToDeviceActorMsgWrapper.java b/application/src/main/java/org/thingsboard/server/service/transport/msg/TransportToDeviceActorMsgWrapper.java new file mode 100644 index 0000000000..3ee185893a --- /dev/null +++ b/application/src/main/java/org/thingsboard/server/service/transport/msg/TransportToDeviceActorMsgWrapper.java @@ -0,0 +1,36 @@ +package org.thingsboard.server.service.transport.msg; + +import lombok.Data; +import org.thingsboard.server.common.data.id.DeviceId; +import org.thingsboard.server.common.data.id.TenantId; +import org.thingsboard.server.common.msg.MsgType; +import org.thingsboard.server.common.msg.TbActorMsg; +import org.thingsboard.server.common.msg.aware.DeviceAwareMsg; +import org.thingsboard.server.common.msg.aware.TenantAwareMsg; +import org.thingsboard.server.common.msg.cluster.ServerAddress; +import org.thingsboard.server.gen.transport.TransportProtos.TransportToDeviceActorMsg; + +import java.io.Serializable; +import java.util.UUID; + +/** + * Created by ashvayka on 09.10.18. + */ +@Data +public class TransportToDeviceActorMsgWrapper implements TbActorMsg, DeviceAwareMsg, TenantAwareMsg, Serializable { + + private final TenantId tenantId; + private final DeviceId deviceId; + private final TransportToDeviceActorMsg msg; + + public TransportToDeviceActorMsgWrapper(TransportToDeviceActorMsg msg) { + this.msg = msg; + this.tenantId = new TenantId(new UUID(msg.getSessionInfo().getTenantIdMSB(), msg.getSessionInfo().getTenantIdLSB())); + this.deviceId = new DeviceId(new UUID(msg.getSessionInfo().getDeviceIdMSB(), msg.getSessionInfo().getDeviceIdLSB())); + } + + @Override + public MsgType getMsgType() { + return MsgType.TRANSPORT_TO_DEVICE_ACTOR_MSG; + } +} diff --git a/application/src/main/resources/thingsboard.yml b/application/src/main/resources/thingsboard.yml index c58f3a1a38..923976c302 100644 --- a/application/src/main/resources/thingsboard.yml +++ b/application/src/main/resources/thingsboard.yml @@ -462,4 +462,8 @@ transport: request_poll_interval: "${TB_TRANSPORT_RESPONSE_POLL_INTERVAL_MS:25}" request_auto_commit_interval: "${TB_TRANSPORT_RESPONSE_AUTO_COMMIT_INTERVAL_MS:1000}" rule_engine: - topic: "${TB_RULE_ENGINE_TOPIC:tb.rule-engine}" \ No newline at end of file + topic: "${TB_RULE_ENGINE_TOPIC:tb.rule-engine}" + poll_interval: "${TB_RULE_ENGINE_POLL_INTERVAL_MS:25}" + auto_commit_interval: "${TB_RULE_ENGINE_AUTO_COMMIT_INTERVAL_MS:100}" + notifications: + topic: "${TB_TRANSPORT_NOTIFICATIONS_TOPIC:tb.transport.notifications}" \ No newline at end of file diff --git a/common/message/src/main/java/org/thingsboard/server/common/msg/MsgType.java b/common/message/src/main/java/org/thingsboard/server/common/msg/MsgType.java index 60e5469fa5..dfd8f98ec3 100644 --- a/common/message/src/main/java/org/thingsboard/server/common/msg/MsgType.java +++ b/common/message/src/main/java/org/thingsboard/server/common/msg/MsgType.java @@ -77,11 +77,6 @@ public enum MsgType { */ RULE_TO_SELF_MSG, - /** - * Message that is sent by Session Actor to Device Actor. Represents messages from the device itself. - */ - DEVICE_SESSION_TO_DEVICE_ACTOR_MSG, - DEVICE_ATTRIBUTES_UPDATE_TO_DEVICE_ACTOR_MSG, DEVICE_CREDENTIALS_UPDATE_TO_DEVICE_ACTOR_MSG, @@ -111,6 +106,12 @@ public enum MsgType { TRANSPORT_TO_DEVICE_SESSION_ACTOR_MSG, SESSION_TIMEOUT_MSG, SESSION_CTRL_MSG, - STATS_PERSIST_TICK_MSG; + STATS_PERSIST_TICK_MSG, + + + /** + * Message that is sent by TransportRuleEngineService to Device Actor. Represents messages from the device itself. + */ + TRANSPORT_TO_DEVICE_ACTOR_MSG; } diff --git a/common/message/src/main/java/org/thingsboard/server/common/msg/device/BasicDeviceToDeviceActorMsg.java b/common/message/src/main/java/org/thingsboard/server/common/msg/device/BasicDeviceToDeviceActorMsg.java deleted file mode 100644 index 92c5105673..0000000000 --- a/common/message/src/main/java/org/thingsboard/server/common/msg/device/BasicDeviceToDeviceActorMsg.java +++ /dev/null @@ -1,107 +0,0 @@ -/** - * Copyright © 2016-2018 The Thingsboard Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.thingsboard.server.common.msg.device; - -import lombok.ToString; -import org.thingsboard.server.common.data.id.CustomerId; -import org.thingsboard.server.common.data.id.DeviceId; -import org.thingsboard.server.common.data.id.SessionId; -import org.thingsboard.server.common.data.id.TenantId; -import org.thingsboard.server.common.msg.MsgType; -import org.thingsboard.server.common.msg.cluster.ServerAddress; -import org.thingsboard.server.common.msg.session.FromDeviceMsg; -import org.thingsboard.server.common.msg.session.SessionType; -import org.thingsboard.server.common.msg.session.TransportToDeviceSessionActorMsg; - -import java.util.Optional; - -@ToString -public class BasicDeviceToDeviceActorMsg implements DeviceToDeviceActorMsg { - - private static final long serialVersionUID = -1866795134993115408L; - - private final TenantId tenantId; - private final CustomerId customerId; - private final DeviceId deviceId; - private final SessionId sessionId; - private final SessionType sessionType; - private final ServerAddress serverAddress; - private final FromDeviceMsg msg; - - public BasicDeviceToDeviceActorMsg(DeviceToDeviceActorMsg other, FromDeviceMsg msg) { - this(null, other.getTenantId(), other.getCustomerId(), other.getDeviceId(), other.getSessionId(), other.getSessionType(), msg); - } - - public BasicDeviceToDeviceActorMsg(TransportToDeviceSessionActorMsg msg, SessionType sessionType) { - this(null, msg.getTenantId(), msg.getCustomerId(), msg.getDeviceId(), msg.getSessionId(), sessionType, msg.getSessionMsg().getMsg()); - } - - private BasicDeviceToDeviceActorMsg(ServerAddress serverAddress, TenantId tenantId, CustomerId customerId, DeviceId deviceId, SessionId sessionId, SessionType sessionType, - FromDeviceMsg msg) { - super(); - this.serverAddress = serverAddress; - this.tenantId = tenantId; - this.customerId = customerId; - this.deviceId = deviceId; - this.sessionId = sessionId; - this.sessionType = sessionType; - this.msg = msg; - } - - @Override - public DeviceId getDeviceId() { - return deviceId; - } - - @Override - public CustomerId getCustomerId() { - return customerId; - } - - public TenantId getTenantId() { - return tenantId; - } - - @Override - public SessionId getSessionId() { - return sessionId; - } - - @Override - public SessionType getSessionType() { - return sessionType; - } - - @Override - public Optional getServerAddress() { - return Optional.ofNullable(serverAddress); - } - - @Override - public FromDeviceMsg getPayload() { - return msg; - } - - @Override - public DeviceToDeviceActorMsg toOtherAddress(ServerAddress otherAddress) { - return new BasicDeviceToDeviceActorMsg(otherAddress, tenantId, customerId, deviceId, sessionId, sessionType, msg); - } - - @Override - public MsgType getMsgType() { - return MsgType.DEVICE_SESSION_TO_DEVICE_ACTOR_MSG; - } -} diff --git a/common/message/src/main/java/org/thingsboard/server/common/msg/device/DeviceToDeviceActorMsg.java b/common/message/src/main/java/org/thingsboard/server/common/msg/device/DeviceToDeviceActorMsg.java deleted file mode 100644 index fe8d9f2c21..0000000000 --- a/common/message/src/main/java/org/thingsboard/server/common/msg/device/DeviceToDeviceActorMsg.java +++ /dev/null @@ -1,41 +0,0 @@ -/** - * Copyright © 2016-2018 The Thingsboard Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.thingsboard.server.common.msg.device; - -import java.io.Serializable; -import java.util.Optional; - -import org.thingsboard.server.common.data.id.SessionId; -import org.thingsboard.server.common.msg.TbActorMsg; -import org.thingsboard.server.common.msg.aware.CustomerAwareMsg; -import org.thingsboard.server.common.msg.aware.DeviceAwareMsg; -import org.thingsboard.server.common.msg.aware.TenantAwareMsg; -import org.thingsboard.server.common.msg.cluster.ServerAddress; -import org.thingsboard.server.common.msg.session.FromDeviceMsg; -import org.thingsboard.server.common.msg.session.SessionType; - -public interface DeviceToDeviceActorMsg extends TbActorMsg, DeviceAwareMsg, CustomerAwareMsg, TenantAwareMsg, Serializable { - - SessionId getSessionId(); - - SessionType getSessionType(); - - Optional getServerAddress(); - - FromDeviceMsg getPayload(); - - DeviceToDeviceActorMsg toOtherAddress(ServerAddress otherAddress); -} diff --git a/common/queue/src/main/java/org/thingsboard/server/kafka/TBKafkaConsumerTemplate.java b/common/queue/src/main/java/org/thingsboard/server/kafka/TBKafkaConsumerTemplate.java index 3adb1c368d..90f15a4892 100644 --- a/common/queue/src/main/java/org/thingsboard/server/kafka/TBKafkaConsumerTemplate.java +++ b/common/queue/src/main/java/org/thingsboard/server/kafka/TBKafkaConsumerTemplate.java @@ -1,12 +1,12 @@ /** * Copyright © 2016-2018 The Thingsboard Authors - * + *

* Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

* Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -49,7 +49,9 @@ public class TBKafkaConsumerTemplate { boolean autoCommit, int autoCommitIntervalMs) { Properties props = settings.toProps(); props.put(ConsumerConfig.CLIENT_ID_CONFIG, clientId); - props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId); + if (groupId != null) { + props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId); + } props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, autoCommit); props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, autoCommitIntervalMs); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer"); diff --git a/common/queue/src/main/java/org/thingsboard/server/kafka/TBKafkaProducerTemplate.java b/common/queue/src/main/java/org/thingsboard/server/kafka/TBKafkaProducerTemplate.java index 1e109d2877..8f4c095ed7 100644 --- a/common/queue/src/main/java/org/thingsboard/server/kafka/TBKafkaProducerTemplate.java +++ b/common/queue/src/main/java/org/thingsboard/server/kafka/TBKafkaProducerTemplate.java @@ -1,12 +1,12 @@ /** * Copyright © 2016-2018 The Thingsboard Authors - * + *

* Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

* Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -20,6 +20,7 @@ import lombok.Getter; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.admin.CreateTopicsResult; import org.apache.kafka.clients.admin.NewTopic; +import org.apache.kafka.clients.producer.Callback; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; @@ -89,28 +90,28 @@ public class TBKafkaProducerTemplate { } } - public Future send(String key, T value) { - return send(key, value, null, null); + public Future send(String key, T value, Callback callback) { + return send(key, value, null, callback); } - public Future send(String key, T value, Iterable

headers) { - return send(key, value, null, headers); + public Future send(String key, T value, Iterable
headers, Callback callback) { + return send(key, value, null, headers, callback); } - public Future send(String key, T value, Long timestamp, Iterable
headers) { - return send(this.defaultTopic, key, value, timestamp, headers); + public Future send(String key, T value, Long timestamp, Iterable
headers, Callback callback) { + return send(this.defaultTopic, key, value, timestamp, headers, callback); } - public Future send(String topic, String key, T value, Iterable
headers) { - return send(topic, key, value, null, headers); + public Future send(String topic, String key, T value, Iterable
headers, Callback callback) { + return send(topic, key, value, null, headers, callback); } - public Future send(String topic, String key, T value, Long timestamp, Iterable
headers) { + public Future send(String topic, String key, T value, Long timestamp, Iterable
headers, Callback callback) { byte[] data = encoder.encode(value); ProducerRecord record; Integer partition = getPartition(topic, key, value, data); record = new ProducerRecord<>(topic, partition, timestamp, key, data, headers); - return producer.send(record); + return producer.send(record, callback); } private Integer getPartition(String topic, String key, T value, byte[] data) { diff --git a/common/queue/src/main/java/org/thingsboard/server/kafka/TbKafkaResponseTemplate.java b/common/queue/src/main/java/org/thingsboard/server/kafka/TbKafkaResponseTemplate.java index 0dbf45db14..6f9e8a791b 100644 --- a/common/queue/src/main/java/org/thingsboard/server/kafka/TbKafkaResponseTemplate.java +++ b/common/queue/src/main/java/org/thingsboard/server/kafka/TbKafkaResponseTemplate.java @@ -1,12 +1,12 @@ /** * Copyright © 2016-2018 The Thingsboard Authors - * + *

* Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

* Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -77,55 +77,64 @@ public class TbKafkaResponseTemplate extends AbstractTbKafkaT requestTemplate.subscribe(); loopExecutor.submit(() -> { while (!stopped) { - while (pendingRequestCount.get() >= maxPendingRequests) { + try { + while (pendingRequestCount.get() >= maxPendingRequests) { + try { + Thread.sleep(pollInterval); + } catch (InterruptedException e) { + log.trace("Failed to wait until the server has capacity to handle new requests", e); + } + } + ConsumerRecords requests = requestTemplate.poll(Duration.ofMillis(pollInterval)); + requests.forEach(request -> { + Header requestIdHeader = request.headers().lastHeader(TbKafkaSettings.REQUEST_ID_HEADER); + if (requestIdHeader == null) { + log.error("[{}] Missing requestId in header", request); + return; + } + UUID requestId = bytesToUuid(requestIdHeader.value()); + if (requestId == null) { + log.error("[{}] Missing requestId in header and body", request); + return; + } + Header responseTopicHeader = request.headers().lastHeader(TbKafkaSettings.RESPONSE_TOPIC_HEADER); + if (responseTopicHeader == null) { + log.error("[{}] Missing response topic in header", request); + return; + } + String responseTopic = bytesToString(responseTopicHeader.value()); + try { + pendingRequestCount.getAndIncrement(); + Request decodedRequest = requestTemplate.decode(request); + AsyncCallbackTemplate.withCallbackAndTimeout(handler.handle(decodedRequest), + response -> { + pendingRequestCount.decrementAndGet(); + reply(requestId, responseTopic, response); + }, + e -> { + pendingRequestCount.decrementAndGet(); + if (e.getCause() != null && e.getCause() instanceof TimeoutException) { + log.warn("[{}] Timedout to process the request: {}", requestId, request, e); + } else { + log.trace("[{}] Failed to process the request: {}", requestId, request, e); + } + }, + requestTimeout, + timeoutExecutor, + callbackExecutor); + } catch (Throwable e) { + pendingRequestCount.decrementAndGet(); + log.warn("[{}] Failed to process the request: {}", requestId, request, e); + } + }); + } catch (Throwable e) { + log.warn("Failed to obtain messages from queue.", e); try { Thread.sleep(pollInterval); - } catch (InterruptedException e) { - log.trace("Failed to wait until the server has capacity to handle new requests", e); + } catch (InterruptedException e2) { + log.trace("Failed to wait until the server has capacity to handle new requests", e2); } } - ConsumerRecords requests = requestTemplate.poll(Duration.ofMillis(pollInterval)); - requests.forEach(request -> { - Header requestIdHeader = request.headers().lastHeader(TbKafkaSettings.REQUEST_ID_HEADER); - if (requestIdHeader == null) { - log.error("[{}] Missing requestId in header", request); - return; - } - UUID requestId = bytesToUuid(requestIdHeader.value()); - if (requestId == null) { - log.error("[{}] Missing requestId in header and body", request); - return; - } - Header responseTopicHeader = request.headers().lastHeader(TbKafkaSettings.RESPONSE_TOPIC_HEADER); - if (responseTopicHeader == null) { - log.error("[{}] Missing response topic in header", request); - return; - } - String responseTopic = bytesToString(responseTopicHeader.value()); - try { - pendingRequestCount.getAndIncrement(); - Request decodedRequest = requestTemplate.decode(request); - AsyncCallbackTemplate.withCallbackAndTimeout(handler.handle(decodedRequest), - response -> { - pendingRequestCount.decrementAndGet(); - reply(requestId, responseTopic, response); - }, - e -> { - pendingRequestCount.decrementAndGet(); - if (e.getCause() != null && e.getCause() instanceof TimeoutException) { - log.warn("[{}] Timedout to process the request: {}", requestId, request, e); - } else { - log.trace("[{}] Failed to process the request: {}", requestId, request, e); - } - }, - requestTimeout, - timeoutExecutor, - callbackExecutor); - } catch (Throwable e) { - pendingRequestCount.decrementAndGet(); - log.warn("[{}] Failed to process the request: {}", requestId, request, e); - } - }); } }); } @@ -141,7 +150,7 @@ public class TbKafkaResponseTemplate extends AbstractTbKafkaT } private void reply(UUID requestId, String topic, Response response) { - responseTemplate.send(topic, requestId.toString(), response, Collections.singletonList(new RecordHeader(TbKafkaSettings.REQUEST_ID_HEADER, uuidToBytes(requestId)))); + responseTemplate.send(topic, requestId.toString(), response, Collections.singletonList(new RecordHeader(TbKafkaSettings.REQUEST_ID_HEADER, uuidToBytes(requestId))), null); } } diff --git a/application/src/main/java/org/thingsboard/server/actors/session/SessionTerminationMsg.java b/common/transport/src/main/java/org/thingsboard/server/common/transport/SessionMsgListener.java similarity index 59% rename from application/src/main/java/org/thingsboard/server/actors/session/SessionTerminationMsg.java rename to common/transport/src/main/java/org/thingsboard/server/common/transport/SessionMsgListener.java index 38cf878fa7..d0e6d18de2 100644 --- a/application/src/main/java/org/thingsboard/server/actors/session/SessionTerminationMsg.java +++ b/common/transport/src/main/java/org/thingsboard/server/common/transport/SessionMsgListener.java @@ -1,26 +1,26 @@ /** * Copyright © 2016-2018 The Thingsboard Authors - * + *

* Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

* Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ -package org.thingsboard.server.actors.session; +package org.thingsboard.server.common.transport; -import org.thingsboard.server.actors.shared.ActorTerminationMsg; -import org.thingsboard.server.common.data.id.SessionId; +import org.thingsboard.server.gen.transport.TransportProtos.GetAttributeResponseMsg; -public class SessionTerminationMsg extends ActorTerminationMsg { +/** + * Created by ashvayka on 04.10.18. + */ +public interface SessionMsgListener { - public SessionTerminationMsg(SessionId id) { - super(id); - } + void onGetAttributesResponse(GetAttributeResponseMsg getAttributesResponse); } diff --git a/common/transport/src/main/java/org/thingsboard/server/common/transport/TransportService.java b/common/transport/src/main/java/org/thingsboard/server/common/transport/TransportService.java index 84d34e12cb..dc2e3062f2 100644 --- a/common/transport/src/main/java/org/thingsboard/server/common/transport/TransportService.java +++ b/common/transport/src/main/java/org/thingsboard/server/common/transport/TransportService.java @@ -1,12 +1,12 @@ /** * Copyright © 2016-2018 The Thingsboard Authors - * + *

* Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

* Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -15,23 +15,33 @@ */ package org.thingsboard.server.common.transport; -import org.thingsboard.server.gen.transport.TransportProtos; +import org.thingsboard.server.gen.transport.TransportProtos.SessionInfoProto; +import org.thingsboard.server.gen.transport.TransportProtos.PostAttributeMsg; +import org.thingsboard.server.gen.transport.TransportProtos.PostTelemetryMsg; +import org.thingsboard.server.gen.transport.TransportProtos.SessionEventMsg; +import org.thingsboard.server.gen.transport.TransportProtos.ValidateDeviceCredentialsResponseMsg; +import org.thingsboard.server.gen.transport.TransportProtos.ValidateDeviceTokenRequestMsg; +import org.thingsboard.server.gen.transport.TransportProtos.ValidateDeviceX509CertRequestMsg; /** * Created by ashvayka on 04.10.18. */ public interface TransportService { - void process(TransportProtos.ValidateDeviceTokenRequestMsg msg, - TransportServiceCallback callback); + void process(ValidateDeviceTokenRequestMsg msg, + TransportServiceCallback callback); - void process(TransportProtos.ValidateDeviceX509CertRequestMsg msg, - TransportServiceCallback callback); + void process(ValidateDeviceX509CertRequestMsg msg, + TransportServiceCallback callback); - void process(TransportProtos.SessionEventMsg msg, TransportServiceCallback callback); + void process(SessionInfoProto sessionInfo, SessionEventMsg msg, TransportServiceCallback callback); - void process(TransportProtos.PostTelemetryMsg msg, TransportServiceCallback callback); + void process(SessionInfoProto sessionInfo, PostTelemetryMsg msg, TransportServiceCallback callback); - void process(TransportProtos.PostAttributeMsg msg, TransportServiceCallback callback); + void process(SessionInfoProto sessionInfo, PostAttributeMsg msg, TransportServiceCallback callback); + + void registerSession(SessionInfoProto sessionInfo, SessionMsgListener listener); + + void deregisterSession(SessionInfoProto sessionInfo); } diff --git a/common/transport/src/main/java/org/thingsboard/server/common/transport/session/DeviceAwareSessionContext.java b/common/transport/src/main/java/org/thingsboard/server/common/transport/session/DeviceAwareSessionContext.java index 7b2c05e400..30689748c5 100644 --- a/common/transport/src/main/java/org/thingsboard/server/common/transport/session/DeviceAwareSessionContext.java +++ b/common/transport/src/main/java/org/thingsboard/server/common/transport/session/DeviceAwareSessionContext.java @@ -1,12 +1,12 @@ /** * Copyright © 2016-2018 The Thingsboard Authors - * + *

* Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

* Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -16,17 +16,12 @@ package org.thingsboard.server.common.transport.session; import lombok.Data; -import lombok.extern.slf4j.Slf4j; -import org.thingsboard.server.common.data.Device; -import org.thingsboard.server.common.data.security.DeviceCredentialsFilter; -import org.thingsboard.server.common.data.security.DeviceTokenCredentials; +import lombok.Getter; +import org.thingsboard.server.common.data.id.DeviceId; import org.thingsboard.server.common.msg.session.SessionContext; -import org.thingsboard.server.common.transport.SessionMsgProcessor; -import org.thingsboard.server.common.transport.auth.DeviceAuthResult; -import org.thingsboard.server.common.transport.auth.DeviceAuthService; -import org.thingsboard.server.gen.transport.TransportProtos; +import org.thingsboard.server.gen.transport.TransportProtos.DeviceInfoProto; -import java.util.Optional; +import java.util.UUID; /** * @author Andrew Shvayka @@ -34,7 +29,9 @@ import java.util.Optional; @Data public abstract class DeviceAwareSessionContext implements SessionContext { - private volatile TransportProtos.DeviceInfoProto deviceInfo; + @Getter + private volatile DeviceId deviceId; + private volatile DeviceInfoProto deviceInfo; public long getDeviceIdMSB() { return deviceInfo.getDeviceIdMSB(); @@ -44,6 +41,16 @@ public abstract class DeviceAwareSessionContext implements SessionContext { return deviceInfo.getDeviceIdLSB(); } + public DeviceId getDeviceId() { + return deviceId; + } + + public void setDeviceInfo(DeviceInfoProto deviceInfo) { + this.deviceInfo = deviceInfo; + this.deviceId = new DeviceId(new UUID(deviceInfo.getDeviceIdMSB(), deviceInfo.getDeviceIdLSB())); + } + + public boolean isConnected() { return deviceInfo != null; } diff --git a/common/transport/src/main/proto/transport.proto b/common/transport/src/main/proto/transport.proto index e78f87351e..f3b3ae87a4 100644 --- a/common/transport/src/main/proto/transport.proto +++ b/common/transport/src/main/proto/transport.proto @@ -23,9 +23,12 @@ option java_outer_classname = "TransportProtos"; * Data Structures; */ message SessionInfoProto { - string nodeId = 1; - int64 sessionIdMSB = 2; - int64 sessionIdLSB = 3; + int64 sessionIdMSB = 1; + int64 sessionIdLSB = 2; + int64 tenantIdMSB = 3; + int64 tenantIdLSB = 4; + int64 deviceIdMSB = 5; + int64 deviceIdLSB = 6; } enum SessionEvent { @@ -33,12 +36,25 @@ enum SessionEvent { CLOSED = 1; } +enum SessionType { + SYNC = 0; + ASYNC = 1; +} + +enum KeyValueType { + BOOLEAN_V = 0; + LONG_V = 1; + DOUBLE_V = 2; + STRING_V = 3; +} + message KeyValueProto { string key = 1; - bool bool_v = 2; - int64 long_v = 3; - double double_v = 4; - string string_v = 5; + KeyValueType type = 2; + bool bool_v = 3; + int64 long_v = 4; + double double_v = 5; + string string_v = 6; } message TsKvListProto { @@ -60,33 +76,28 @@ message DeviceInfoProto { * Messages that use Data Structures; */ message SessionEventMsg { - SessionInfoProto sessionInfo = 1; - int64 deviceIdMSB = 2; - int64 deviceIdLSB = 3; - SessionEvent event = 4; + string nodeId = 1; + SessionType sessionType = 2; + SessionEvent event = 3; } message PostTelemetryMsg { - SessionInfoProto sessionInfo = 1; - repeated TsKvListProto tsKvList = 2; + repeated TsKvListProto tsKvList = 1; } message PostAttributeMsg { - SessionInfoProto sessionInfo = 1; - repeated TsKvListProto tsKvList = 2; + repeated KeyValueProto kv = 1; } message GetAttributeRequestMsg { - SessionInfoProto sessionInfo = 1; - repeated string clientAttributeNames = 2; - repeated string sharedAttributeNames = 3; + repeated string clientAttributeNames = 1; + repeated string sharedAttributeNames = 2; } message GetAttributeResponseMsg { - SessionInfoProto sessionInfo = 1; - repeated TsKvListProto clientAttributeList = 2; - repeated TsKvListProto sharedAttributeList = 3; - repeated string deletedAttributeKeys = 4; + repeated TsKvListProto clientAttributeList = 1; + repeated TsKvListProto sharedAttributeList = 2; + repeated string deletedAttributeKeys = 3; } message ValidateDeviceTokenRequestMsg { @@ -101,11 +112,34 @@ message ValidateDeviceCredentialsResponseMsg { DeviceInfoProto deviceInfo = 1; } +message SessionCloseNotificationProto { + string message = 1; +} + +message TransportToDeviceActorMsg { + SessionInfoProto sessionInfo = 1; + SessionEventMsg sessionEvent = 2; + PostTelemetryMsg postTelemetry = 3; + PostAttributeMsg postAttributes = 4; + GetAttributeRequestMsg getAttributes = 5; +} + +message DeviceActorToTransportMsg { + int64 sessionIdMSB = 1; + int64 sessionIdLSB = 2; + SessionCloseNotificationProto sessionCloseNotification = 3; + GetAttributeResponseMsg getAttributesResponse = 4; +} + /** * Main messages; */ -message TransportToRuleEngineMsg { +message ToRuleEngineMsg { + TransportToDeviceActorMsg toDeviceActorMsg = 1; +} +message ToTransportMsg { + DeviceActorToTransportMsg toDeviceSessionMsg = 1; } message TransportApiRequestMsg { diff --git a/transport/mqtt-common/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java b/transport/mqtt-common/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java index 23bc4cc3bd..d8f1d10b11 100644 --- a/transport/mqtt-common/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java +++ b/transport/mqtt-common/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java @@ -1,12 +1,12 @@ /** * Copyright © 2016-2018 The Thingsboard Authors - * + *

* Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

* Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -26,32 +26,25 @@ import io.netty.handler.codec.mqtt.MqttFixedHeader; import io.netty.handler.codec.mqtt.MqttMessage; import io.netty.handler.codec.mqtt.MqttMessageIdVariableHeader; import io.netty.handler.codec.mqtt.MqttPubAckMessage; -import io.netty.handler.codec.mqtt.MqttPublishMessage; import io.netty.handler.codec.mqtt.MqttQoS; import io.netty.handler.codec.mqtt.MqttSubAckMessage; import io.netty.handler.codec.mqtt.MqttSubAckPayload; -import io.netty.handler.codec.mqtt.MqttSubscribeMessage; -import io.netty.handler.codec.mqtt.MqttTopicSubscription; -import io.netty.handler.codec.mqtt.MqttUnsubscribeMessage; import io.netty.handler.ssl.SslHandler; import io.netty.util.concurrent.Future; import io.netty.util.concurrent.GenericFutureListener; import lombok.extern.slf4j.Slf4j; import org.springframework.util.StringUtils; -import org.thingsboard.server.common.data.Device; -import org.thingsboard.server.common.data.security.DeviceTokenCredentials; -import org.thingsboard.server.common.data.security.DeviceX509Credentials; -import org.thingsboard.server.common.msg.core.SessionOpenMsg; -import org.thingsboard.server.common.msg.session.AdaptorToSessionActorMsg; -import org.thingsboard.server.common.msg.session.BasicAdaptorToSessionActorMsg; -import org.thingsboard.server.common.msg.session.BasicTransportToDeviceSessionActorMsg; -import org.thingsboard.server.common.msg.session.ctrl.SessionCloseMsg; import org.thingsboard.server.common.transport.TransportService; import org.thingsboard.server.common.transport.TransportServiceCallback; -import org.thingsboard.server.common.transport.adaptor.AdaptorException; import org.thingsboard.server.common.transport.quota.QuotaService; import org.thingsboard.server.dao.EncryptionUtil; -import org.thingsboard.server.gen.transport.TransportProtos; +import org.thingsboard.server.gen.transport.TransportProtos.DeviceInfoProto; +import org.thingsboard.server.gen.transport.TransportProtos.SessionEvent; +import org.thingsboard.server.gen.transport.TransportProtos.SessionEventMsg; +import org.thingsboard.server.gen.transport.TransportProtos.SessionInfoProto; +import org.thingsboard.server.gen.transport.TransportProtos.ValidateDeviceCredentialsResponseMsg; +import org.thingsboard.server.gen.transport.TransportProtos.ValidateDeviceTokenRequestMsg; +import org.thingsboard.server.gen.transport.TransportProtos.ValidateDeviceX509CertRequestMsg; import org.thingsboard.server.transport.mqtt.adaptors.MqttTransportAdaptor; import org.thingsboard.server.transport.mqtt.session.DeviceSessionCtx; import org.thingsboard.server.transport.mqtt.session.GatewaySessionCtx; @@ -61,49 +54,19 @@ import javax.net.ssl.SSLPeerUnverifiedException; import javax.security.cert.X509Certificate; import java.io.IOException; import java.net.InetSocketAddress; -import java.util.ArrayList; import java.util.List; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; -import org.thingsboard.server.gen.transport.TransportProtos.*; - import static io.netty.handler.codec.mqtt.MqttConnectReturnCode.CONNECTION_ACCEPTED; import static io.netty.handler.codec.mqtt.MqttConnectReturnCode.CONNECTION_REFUSED_BAD_USER_NAME_OR_PASSWORD; import static io.netty.handler.codec.mqtt.MqttConnectReturnCode.CONNECTION_REFUSED_NOT_AUTHORIZED; import static io.netty.handler.codec.mqtt.MqttMessageType.CONNACK; -import static io.netty.handler.codec.mqtt.MqttMessageType.PINGRESP; import static io.netty.handler.codec.mqtt.MqttMessageType.PUBACK; import static io.netty.handler.codec.mqtt.MqttMessageType.SUBACK; -import static io.netty.handler.codec.mqtt.MqttMessageType.UNSUBACK; import static io.netty.handler.codec.mqtt.MqttQoS.AT_LEAST_ONCE; import static io.netty.handler.codec.mqtt.MqttQoS.AT_MOST_ONCE; -import static io.netty.handler.codec.mqtt.MqttQoS.FAILURE; -import static org.thingsboard.server.common.msg.session.SessionMsgType.GET_ATTRIBUTES_REQUEST; -import static org.thingsboard.server.common.msg.session.SessionMsgType.POST_ATTRIBUTES_REQUEST; -import static org.thingsboard.server.common.msg.session.SessionMsgType.POST_TELEMETRY_REQUEST; -import static org.thingsboard.server.common.msg.session.SessionMsgType.SUBSCRIBE_ATTRIBUTES_REQUEST; -import static org.thingsboard.server.common.msg.session.SessionMsgType.SUBSCRIBE_RPC_COMMANDS_REQUEST; -import static org.thingsboard.server.common.msg.session.SessionMsgType.TO_DEVICE_RPC_RESPONSE; -import static org.thingsboard.server.common.msg.session.SessionMsgType.TO_SERVER_RPC_REQUEST; -import static org.thingsboard.server.common.msg.session.SessionMsgType.UNSUBSCRIBE_ATTRIBUTES_REQUEST; -import static org.thingsboard.server.common.msg.session.SessionMsgType.UNSUBSCRIBE_RPC_COMMANDS_REQUEST; -import static org.thingsboard.server.transport.mqtt.MqttTopics.BASE_GATEWAY_API_TOPIC; -import static org.thingsboard.server.transport.mqtt.MqttTopics.DEVICE_ATTRIBUTES_REQUEST_TOPIC_PREFIX; -import static org.thingsboard.server.transport.mqtt.MqttTopics.DEVICE_ATTRIBUTES_RESPONSES_TOPIC; -import static org.thingsboard.server.transport.mqtt.MqttTopics.DEVICE_ATTRIBUTES_TOPIC; -import static org.thingsboard.server.transport.mqtt.MqttTopics.DEVICE_RPC_REQUESTS_SUB_TOPIC; -import static org.thingsboard.server.transport.mqtt.MqttTopics.DEVICE_RPC_REQUESTS_TOPIC; -import static org.thingsboard.server.transport.mqtt.MqttTopics.DEVICE_RPC_RESPONSE_SUB_TOPIC; -import static org.thingsboard.server.transport.mqtt.MqttTopics.DEVICE_RPC_RESPONSE_TOPIC; -import static org.thingsboard.server.transport.mqtt.MqttTopics.DEVICE_TELEMETRY_TOPIC; -import static org.thingsboard.server.transport.mqtt.MqttTopics.GATEWAY_ATTRIBUTES_REQUEST_TOPIC; -import static org.thingsboard.server.transport.mqtt.MqttTopics.GATEWAY_ATTRIBUTES_TOPIC; -import static org.thingsboard.server.transport.mqtt.MqttTopics.GATEWAY_CONNECT_TOPIC; -import static org.thingsboard.server.transport.mqtt.MqttTopics.GATEWAY_DISCONNECT_TOPIC; -import static org.thingsboard.server.transport.mqtt.MqttTopics.GATEWAY_RPC_TOPIC; -import static org.thingsboard.server.transport.mqtt.MqttTopics.GATEWAY_TELEMETRY_TOPIC; /** * @author Andrew Shvayka @@ -389,7 +352,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement } else { ctx.writeAndFlush(createMqttConnAckMsg(CONNECTION_ACCEPTED)); deviceSessionCtx.setDeviceInfo(msg.getDeviceInfo()); - transportService.process(getSessionEventMsg(SessionEvent.OPEN), null); + transportService.process(deviceSessionCtx, getSessionEventMsg(SessionEvent.OPEN), null); checkGatewaySession(); } } @@ -418,7 +381,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement } else { ctx.writeAndFlush(createMqttConnAckMsg(CONNECTION_ACCEPTED)); deviceSessionCtx.setDeviceInfo(msg.getDeviceInfo()); - transportService.process(getSessionEventMsg(SessionEvent.OPEN), null); + transportService.process(deviceSessionCtx, getSessionEventMsg(SessionEvent.OPEN), null); checkGatewaySession(); } } @@ -452,7 +415,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement private void processDisconnect(ChannelHandlerContext ctx) { ctx.close(); if (deviceSessionCtx.isConnected()) { - transportService.process(getSessionEventMsg(SessionEvent.CLOSED), null); + transportService.process(deviceSessionCtx, getSessionEventMsg(SessionEvent.CLOSED), null); if (gatewaySessionCtx != null) { gatewaySessionCtx.onGatewayDisconnect(); } @@ -534,7 +497,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement @Override public void operationComplete(Future future) throws Exception { if (deviceSessionCtx.isConnected()) { - transportService.process(getSessionEventMsg(SessionEvent.CLOSED), null); + transportService.process(deviceSessionCtx, getSessionEventMsg(SessionEvent.CLOSED), null); } } } diff --git a/transport/mqtt-transport/src/main/java/org/thingsboard/server/mqtt/service/MqttTransportService.java b/transport/mqtt-transport/src/main/java/org/thingsboard/server/mqtt/service/MqttTransportService.java index 4d945de394..76c1c1b8c1 100644 --- a/transport/mqtt-transport/src/main/java/org/thingsboard/server/mqtt/service/MqttTransportService.java +++ b/transport/mqtt-transport/src/main/java/org/thingsboard/server/mqtt/service/MqttTransportService.java @@ -1,12 +1,12 @@ /** * Copyright © 2016-2018 The Thingsboard Authors - * + *

* Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

* Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -15,21 +15,41 @@ */ package org.thingsboard.server.mqtt.service; +import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.producer.Callback; +import org.apache.kafka.clients.producer.RecordMetadata; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Service; +import org.thingsboard.server.common.transport.SessionMsgListener; import org.thingsboard.server.common.transport.TransportService; import org.thingsboard.server.common.transport.TransportServiceCallback; +import org.thingsboard.server.gen.transport.TransportProtos; +import org.thingsboard.server.gen.transport.TransportProtos.PostAttributeMsg; +import org.thingsboard.server.gen.transport.TransportProtos.PostTelemetryMsg; +import org.thingsboard.server.gen.transport.TransportProtos.SessionEventMsg; +import org.thingsboard.server.gen.transport.TransportProtos.SessionInfoProto; +import org.thingsboard.server.gen.transport.TransportProtos.ToTransportMsg; +import org.thingsboard.server.gen.transport.TransportProtos.TransportApiRequestMsg; +import org.thingsboard.server.gen.transport.TransportProtos.TransportApiResponseMsg; +import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineMsg; +import org.thingsboard.server.gen.transport.TransportProtos.ValidateDeviceCredentialsResponseMsg; +import org.thingsboard.server.gen.transport.TransportProtos.ValidateDeviceTokenRequestMsg; +import org.thingsboard.server.gen.transport.TransportProtos.ValidateDeviceX509CertRequestMsg; import org.thingsboard.server.kafka.AsyncCallbackTemplate; import org.thingsboard.server.kafka.TBKafkaConsumerTemplate; import org.thingsboard.server.kafka.TBKafkaProducerTemplate; import org.thingsboard.server.kafka.TbKafkaRequestTemplate; -import org.thingsboard.server.gen.transport.TransportProtos.*; import org.thingsboard.server.kafka.TbKafkaSettings; import org.thingsboard.server.transport.mqtt.MqttTransportContext; import javax.annotation.PostConstruct; import javax.annotation.PreDestroy; +import java.time.Duration; +import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -37,10 +57,17 @@ import java.util.concurrent.Executors; * Created by ashvayka on 05.10.18. */ @Service +@Slf4j public class MqttTransportService implements TransportService { @Value("${kafka.rule_engine.topic}") private String ruleEngineTopic; + @Value("${kafka.notifications.topic}") + private String notificationsTopic; + @Value("${kafka.notifications.poll_interval}") + private int notificationsPollDuration; + @Value("${kafka.notifications.auto_commit_interval}") + private int notificationsAutoCommitInterval; @Value("${kafka.transport_api.requests_topic}") private String transportApiRequestsTopic; @Value("${kafka.transport_api.responses_topic}") @@ -54,6 +81,8 @@ public class MqttTransportService implements TransportService { @Value("${kafka.transport_api.response_auto_commit_interval}") private int autoCommitInterval; + private ConcurrentMap sessions = new ConcurrentHashMap<>(); + @Autowired private TbKafkaSettings kafkaSettings; //We use this to get the node id. We should replace this with a component that provides the node id. @@ -63,6 +92,12 @@ public class MqttTransportService implements TransportService { private ExecutorService transportCallbackExecutor; private TbKafkaRequestTemplate transportApiTemplate; + private TBKafkaProducerTemplate ruleEngineProducer; + private TBKafkaConsumerTemplate mainConsumer; + + private ExecutorService mainConsumerExecutor = Executors.newSingleThreadExecutor(); + + private volatile boolean stopped = false; @PostConstruct public void init() { @@ -77,7 +112,7 @@ public class MqttTransportService implements TransportService { responseBuilder.settings(kafkaSettings); responseBuilder.topic(transportApiResponsesTopic + "." + transportContext.getNodeId()); responseBuilder.clientId(transportContext.getNodeId()); - responseBuilder.groupId("transport-node"); + responseBuilder.groupId(null); responseBuilder.autoCommit(true); responseBuilder.autoCommitIntervalMs(autoCommitInterval); responseBuilder.decoder(new TransportApiResponseDecoder()); @@ -91,16 +126,79 @@ public class MqttTransportService implements TransportService { builder.pollInterval(responsePollDuration); transportApiTemplate = builder.build(); transportApiTemplate.init(); + + TBKafkaProducerTemplate.TBKafkaProducerTemplateBuilder ruleEngineProducerBuilder = TBKafkaProducerTemplate.builder(); + ruleEngineProducerBuilder.settings(kafkaSettings); + ruleEngineProducerBuilder.defaultTopic(ruleEngineTopic); + ruleEngineProducerBuilder.encoder(new ToRuleEngineMsgEncoder()); + ruleEngineProducer = ruleEngineProducerBuilder.build(); + ruleEngineProducer.init(); + + TBKafkaConsumerTemplate.TBKafkaConsumerTemplateBuilder mainConsumerBuilder = TBKafkaConsumerTemplate.builder(); + mainConsumerBuilder.settings(kafkaSettings); + mainConsumerBuilder.topic(notificationsTopic + "." + transportContext.getNodeId()); + mainConsumerBuilder.clientId(transportContext.getNodeId()); + mainConsumerBuilder.groupId(null); + mainConsumerBuilder.autoCommit(true); + mainConsumerBuilder.autoCommitIntervalMs(notificationsAutoCommitInterval); + mainConsumerBuilder.decoder(new ToTransportMsgResponseDecoder()); + mainConsumer = mainConsumerBuilder.build(); + mainConsumer.subscribe(); + + mainConsumerExecutor.execute(() -> { + while (!stopped) { + try { + ConsumerRecords records = mainConsumer.poll(Duration.ofMillis(notificationsPollDuration)); + records.forEach(record -> { + try { + ToTransportMsg toTransportMsg = mainConsumer.decode(record); + if (toTransportMsg.hasToDeviceSessionMsg()) { + TransportProtos.DeviceActorToTransportMsg toSessionMsg = toTransportMsg.getToDeviceSessionMsg(); + UUID sessionId = new UUID(toSessionMsg.getSessionIdMSB(), toSessionMsg.getSessionIdLSB()); + SessionMsgListener listener = sessions.get(sessionId); + if (listener != null) { + transportCallbackExecutor.submit(() -> { + if (toSessionMsg.hasGetAttributesResponse()) { + listener.onGetAttributesResponse(toSessionMsg.getGetAttributesResponse()); + } + }); + } else { + //TODO: should we notify the device actor about missed session? + log.debug("[{}] Missing session.", sessionId); + } + + } + } catch (Throwable e) { + log.warn("Failed to process the notification.", e); + } + }); + } catch (Exception e) { + log.warn("Failed to obtain messages from queue.", e); + try { + Thread.sleep(notificationsPollDuration); + } catch (InterruptedException e2) { + log.trace("Failed to wait until the server has capacity to handle new requests", e2); + } + } + } + }); } @PreDestroy public void destroy() { + stopped = true; if (transportApiTemplate != null) { transportApiTemplate.stop(); } if (transportCallbackExecutor != null) { transportCallbackExecutor.shutdownNow(); } + if (mainConsumer != null) { + mainConsumer.unsubscribe(); + } + if (mainConsumerExecutor != null) { + mainConsumerExecutor.shutdownNow(); + } } @Override @@ -118,17 +216,69 @@ public class MqttTransportService implements TransportService { } @Override - public void process(SessionEventMsg msg, TransportServiceCallback callback) { + public void process(SessionInfoProto sessionInfo, SessionEventMsg msg, TransportServiceCallback callback) { + ToRuleEngineMsg toRuleEngineMsg = ToRuleEngineMsg.newBuilder().setToDeviceActorMsg( + TransportProtos.TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo) + .setSessionEvent(msg).build() + ).build(); + send(sessionInfo, toRuleEngineMsg, callback); + } + @Override + public void process(SessionInfoProto sessionInfo, PostTelemetryMsg msg, TransportServiceCallback callback) { + ToRuleEngineMsg toRuleEngineMsg = ToRuleEngineMsg.newBuilder().setToDeviceActorMsg( + TransportProtos.TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo) + .setPostTelemetry(msg).build() + ).build(); + send(sessionInfo, toRuleEngineMsg, callback); } @Override - public void process(PostTelemetryMsg msg, TransportServiceCallback callback) { + public void process(SessionInfoProto sessionInfo, PostAttributeMsg msg, TransportServiceCallback callback) { + ToRuleEngineMsg toRuleEngineMsg = ToRuleEngineMsg.newBuilder().setToDeviceActorMsg( + TransportProtos.TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo) + .setPostAttributes(msg).build() + ).build(); + send(sessionInfo, toRuleEngineMsg, callback); + } + @Override + public void registerSession(SessionInfoProto sessionInfo, SessionMsgListener listener) { + sessions.putIfAbsent(toId(sessionInfo), listener); + //TODO: monitor sessions periodically: PING REQ/RESP, etc. } @Override - public void process(PostAttributeMsg msg, TransportServiceCallback callback) { + public void deregisterSession(SessionInfoProto sessionInfo) { + sessions.remove(toId(sessionInfo)); + } + + private UUID toId(SessionInfoProto sessionInfo) { + return new UUID(sessionInfo.getSessionIdMSB(), sessionInfo.getSessionIdLSB()); + } + + private String getRoutingKey(SessionInfoProto sessionInfo) { + return new UUID(sessionInfo.getDeviceIdMSB(), sessionInfo.getDeviceIdLSB()).toString(); + } + + private static class TransportCallbackAdaptor implements Callback { + private final TransportServiceCallback callback; + + TransportCallbackAdaptor(TransportServiceCallback callback) { + this.callback = callback; + } + + @Override + public void onCompletion(RecordMetadata metadata, Exception exception) { + if (exception == null) { + callback.onSuccess(null); + } else { + callback.onError(exception); + } + } + } + private void send(SessionInfoProto sessionInfo, ToRuleEngineMsg toRuleEngineMsg, TransportServiceCallback callback) { + ruleEngineProducer.send(getRoutingKey(sessionInfo), toRuleEngineMsg, new TransportCallbackAdaptor(callback)); } } diff --git a/transport/mqtt-transport/src/main/java/org/thingsboard/server/mqtt/service/ToRuleEngineMsgEncoder.java b/transport/mqtt-transport/src/main/java/org/thingsboard/server/mqtt/service/ToRuleEngineMsgEncoder.java new file mode 100644 index 0000000000..ee929ceb6f --- /dev/null +++ b/transport/mqtt-transport/src/main/java/org/thingsboard/server/mqtt/service/ToRuleEngineMsgEncoder.java @@ -0,0 +1,29 @@ +/** + * Copyright © 2016-2018 The Thingsboard Authors + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.mqtt.service; + +import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineMsg; +import org.thingsboard.server.kafka.TbKafkaEncoder; + +/** + * Created by ashvayka on 05.10.18. + */ +public class ToRuleEngineMsgEncoder implements TbKafkaEncoder { + @Override + public byte[] encode(ToRuleEngineMsg value) { + return value.toByteArray(); + } +} diff --git a/transport/mqtt-transport/src/main/java/org/thingsboard/server/mqtt/service/ToTransportMsgResponseDecoder.java b/transport/mqtt-transport/src/main/java/org/thingsboard/server/mqtt/service/ToTransportMsgResponseDecoder.java new file mode 100644 index 0000000000..b8d42049de --- /dev/null +++ b/transport/mqtt-transport/src/main/java/org/thingsboard/server/mqtt/service/ToTransportMsgResponseDecoder.java @@ -0,0 +1,31 @@ +/** + * Copyright © 2016-2018 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.mqtt.service; + +import org.thingsboard.server.gen.transport.TransportProtos.ToTransportMsg; +import org.thingsboard.server.kafka.TbKafkaDecoder; + +import java.io.IOException; + +/** + * Created by ashvayka on 05.10.18. + */ +public class ToTransportMsgResponseDecoder implements TbKafkaDecoder { + @Override + public ToTransportMsg decode(byte[] data) throws IOException { + return ToTransportMsg.parseFrom(data); + } +} diff --git a/transport/mqtt-transport/src/main/resources/tb-mqtt-transport.yml b/transport/mqtt-transport/src/main/resources/tb-mqtt-transport.yml index 707fb4bcc1..735eee03ed 100644 --- a/transport/mqtt-transport/src/main/resources/tb-mqtt-transport.yml +++ b/transport/mqtt-transport/src/main/resources/tb-mqtt-transport.yml @@ -82,3 +82,7 @@ kafka: response_auto_commit_interval: "${TB_TRANSPORT_RESPONSE_AUTO_COMMIT_INTERVAL_MS:100}" rule_engine: topic: "${TB_RULE_ENGINE_TOPIC:tb.rule-engine}" + notifications: + topic: "${TB_TRANSPORT_NOTIFICATIONS_TOPIC:tb.transport.notifications}" + poll_interval: "${TB_TRANSPORT_NOTIFICATIONS_POLL_INTERVAL_MS:25}" + auto_commit_interval: "${TB_TRANSPORT_NOTIFICATIONS_AUTO_COMMIT_INTERVAL_MS:100}"