diff --git a/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/session/DefaultLwM2MSessionManager.java b/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/session/DefaultLwM2MSessionManager.java index ee6e4a779b..a9198f9fcd 100644 --- a/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/session/DefaultLwM2MSessionManager.java +++ b/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/session/DefaultLwM2MSessionManager.java @@ -19,7 +19,6 @@ import lombok.extern.slf4j.Slf4j; import org.springframework.context.annotation.Lazy; import org.springframework.stereotype.Service; import org.thingsboard.server.common.transport.TransportService; -import org.thingsboard.server.common.transport.service.DefaultTransportService; import org.thingsboard.server.gen.transport.TransportProtos; import org.thingsboard.server.queue.util.TbLwM2mTransportComponent; import org.thingsboard.server.transport.lwm2m.server.LwM2mSessionMsgListener; @@ -27,6 +26,11 @@ import org.thingsboard.server.transport.lwm2m.server.attributes.LwM2MAttributesS import org.thingsboard.server.transport.lwm2m.server.rpc.LwM2MRpcRequestHandler; import org.thingsboard.server.transport.lwm2m.server.uplink.LwM2mUplinkMsgHandler; +import static org.thingsboard.server.common.transport.service.DefaultTransportService.SESSION_EVENT_MSG_CLOSED; +import static org.thingsboard.server.common.transport.service.DefaultTransportService.SESSION_EVENT_MSG_OPEN; +import static org.thingsboard.server.common.transport.service.DefaultTransportService.SUBSCRIBE_TO_ATTRIBUTE_UPDATES_ASYNC_MSG; +import static org.thingsboard.server.common.transport.service.DefaultTransportService.SUBSCRIBE_TO_RPC_ASYNC_MSG; + @Slf4j @Service @TbLwM2mTransportComponent @@ -52,16 +56,16 @@ public class DefaultLwM2MSessionManager implements LwM2MSessionManager { transportService.registerAsyncSession(sessionInfo, new LwM2mSessionMsgListener(uplinkHandler, attributesService, rpcHandler, sessionInfo, transportService)); TransportProtos.TransportToDeviceActorMsg msg = TransportProtos.TransportToDeviceActorMsg.newBuilder() .setSessionInfo(sessionInfo) - .setSessionEvent(DefaultTransportService.getSessionEventMsg(TransportProtos.SessionEvent.OPEN)) - .setSubscribeToAttributes(TransportProtos.SubscribeToAttributeUpdatesMsg.newBuilder().setSessionType(TransportProtos.SessionType.ASYNC).build()) - .setSubscribeToRPC(TransportProtos.SubscribeToRPCMsg.newBuilder().setSessionType(TransportProtos.SessionType.ASYNC).build()) + .setSessionEvent(SESSION_EVENT_MSG_OPEN) + .setSubscribeToAttributes(SUBSCRIBE_TO_ATTRIBUTE_UPDATES_ASYNC_MSG) + .setSubscribeToRPC(SUBSCRIBE_TO_RPC_ASYNC_MSG) .build(); transportService.process(msg, null); } @Override public void deregister(TransportProtos.SessionInfoProto sessionInfo) { - transportService.process(sessionInfo, DefaultTransportService.getSessionEventMsg(TransportProtos.SessionEvent.CLOSED), null); + transportService.process(sessionInfo, SESSION_EVENT_MSG_CLOSED, null); transportService.deregisterSession(sessionInfo); } } diff --git a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java index fe39721f27..c99b4e1a45 100644 --- a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java +++ b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java @@ -65,7 +65,6 @@ import org.thingsboard.server.common.transport.service.SessionMetaData; import org.thingsboard.server.common.transport.util.SslUtil; import org.thingsboard.server.gen.transport.TransportProtos; import org.thingsboard.server.gen.transport.TransportProtos.ProvisionDeviceResponseMsg; -import org.thingsboard.server.gen.transport.TransportProtos.SessionEvent; import org.thingsboard.server.gen.transport.TransportProtos.ValidateDeviceX509CertRequestMsg; import org.thingsboard.server.queue.scheduler.SchedulerComponent; import org.thingsboard.server.transport.mqtt.adaptors.MqttTransportAdaptor; @@ -102,6 +101,8 @@ import static io.netty.handler.codec.mqtt.MqttMessageType.UNSUBACK; import static io.netty.handler.codec.mqtt.MqttQoS.AT_LEAST_ONCE; import static io.netty.handler.codec.mqtt.MqttQoS.AT_MOST_ONCE; import static io.netty.handler.codec.mqtt.MqttQoS.FAILURE; +import static org.thingsboard.server.common.transport.service.DefaultTransportService.SESSION_EVENT_MSG_CLOSED; +import static org.thingsboard.server.common.transport.service.DefaultTransportService.SESSION_EVENT_MSG_OPEN; /** * @author Andrew Shvayka @@ -929,7 +930,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement public void doDisconnect() { if (deviceSessionCtx.isConnected()) { log.debug("[{}] Client disconnected!", sessionId); - transportService.process(deviceSessionCtx.getSessionInfo(), DefaultTransportService.getSessionEventMsg(SessionEvent.CLOSED), null); + transportService.process(deviceSessionCtx.getSessionInfo(), SESSION_EVENT_MSG_CLOSED, null); transportService.deregisterSession(deviceSessionCtx.getSessionInfo()); if (gatewaySessionHandler != null) { gatewaySessionHandler.onGatewayDisconnect(); @@ -948,7 +949,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement deviceSessionCtx.setDeviceInfo(msg.getDeviceInfo()); deviceSessionCtx.setDeviceProfile(msg.getDeviceProfile()); deviceSessionCtx.setSessionInfo(SessionInfoCreator.create(msg, context, sessionId)); - transportService.process(deviceSessionCtx.getSessionInfo(), DefaultTransportService.getSessionEventMsg(SessionEvent.OPEN), new TransportServiceCallback() { + transportService.process(deviceSessionCtx.getSessionInfo(), SESSION_EVENT_MSG_OPEN, new TransportServiceCallback() { @Override public void onSuccess(Void msg) { SessionMetaData sessionMetaData = transportService.registerAsyncSession(deviceSessionCtx.getSessionInfo(), MqttTransportHandler.this); diff --git a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewaySessionHandler.java b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewaySessionHandler.java index 7b45b5f9d9..cb984327da 100644 --- a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewaySessionHandler.java +++ b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/session/GatewaySessionHandler.java @@ -44,7 +44,6 @@ import org.thingsboard.server.common.transport.adaptor.JsonConverter; import org.thingsboard.server.common.transport.adaptor.ProtoConverter; import org.thingsboard.server.common.transport.auth.GetOrCreateDeviceFromGatewayResponse; import org.thingsboard.server.common.transport.auth.TransportDeviceInfo; -import org.thingsboard.server.common.transport.service.DefaultTransportService; import org.thingsboard.server.gen.transport.TransportApiProtos; import org.thingsboard.server.gen.transport.TransportProtos; import org.thingsboard.server.gen.transport.TransportProtos.GetOrCreateDeviceFromGatewayRequestMsg; @@ -68,6 +67,10 @@ import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; import static org.springframework.util.ConcurrentReferenceHashMap.ReferenceType; +import static org.thingsboard.server.common.transport.service.DefaultTransportService.SESSION_EVENT_MSG_CLOSED; +import static org.thingsboard.server.common.transport.service.DefaultTransportService.SESSION_EVENT_MSG_OPEN; +import static org.thingsboard.server.common.transport.service.DefaultTransportService.SUBSCRIBE_TO_ATTRIBUTE_UPDATES_ASYNC_MSG; +import static org.thingsboard.server.common.transport.service.DefaultTransportService.SUBSCRIBE_TO_RPC_ASYNC_MSG; /** * Created by ashvayka on 19.01.17. @@ -267,11 +270,9 @@ public class GatewaySessionHandler { transportService.registerAsyncSession(deviceSessionInfo, deviceSessionCtx); transportService.process(TransportProtos.TransportToDeviceActorMsg.newBuilder() .setSessionInfo(deviceSessionInfo) - .setSessionEvent(DefaultTransportService.getSessionEventMsg(TransportProtos.SessionEvent.OPEN)) - .setSubscribeToAttributes(TransportProtos.SubscribeToAttributeUpdatesMsg.newBuilder() - .setSessionType(TransportProtos.SessionType.ASYNC).build()) - .setSubscribeToRPC(TransportProtos.SubscribeToRPCMsg.newBuilder() - .setSessionType(TransportProtos.SessionType.ASYNC).build()) + .setSessionEvent(SESSION_EVENT_MSG_OPEN) + .setSubscribeToAttributes(SUBSCRIBE_TO_ATTRIBUTE_UPDATES_ASYNC_MSG) + .setSubscribeToRPC(SUBSCRIBE_TO_RPC_ASYNC_MSG) .build(), null); } futureToSet.set(devices.get(deviceName)); @@ -720,7 +721,7 @@ public class GatewaySessionHandler { private void deregisterSession(String deviceName, GatewayDeviceSessionCtx deviceSessionCtx) { transportService.deregisterSession(deviceSessionCtx.getSessionInfo()); - transportService.process(deviceSessionCtx.getSessionInfo(), DefaultTransportService.getSessionEventMsg(TransportProtos.SessionEvent.CLOSED), null); + transportService.process(deviceSessionCtx.getSessionInfo(), SESSION_EVENT_MSG_CLOSED, null); log.debug("[{}] Removed device [{}] from the gateway session", sessionId, deviceName); } diff --git a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/DefaultTransportService.java b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/DefaultTransportService.java index 57d1da63b4..98ffcd721e 100644 --- a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/DefaultTransportService.java +++ b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/DefaultTransportService.java @@ -21,7 +21,6 @@ import com.google.common.util.concurrent.MoreExecutors; import com.google.gson.Gson; import com.google.gson.JsonObject; import com.google.protobuf.ByteString; -import lombok.Getter; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.ApplicationEventPublisher; @@ -124,6 +123,15 @@ import java.util.stream.Collectors; public class DefaultTransportService implements TransportService { public static final String OVERWRITE_ACTIVITY_TIME = "overwriteActivityTime"; + public static final String SESSION_EXPIRED_MESSAGE = "Session has expired due to last activity time!"; + public static final TransportProtos.SessionEventMsg SESSION_EVENT_MSG_OPEN = getSessionEventMsg(TransportProtos.SessionEvent.OPEN); + public static final TransportProtos.SessionEventMsg SESSION_EVENT_MSG_CLOSED = getSessionEventMsg(TransportProtos.SessionEvent.CLOSED); + public static final TransportProtos.SessionCloseNotificationProto SESSION_CLOSE_NOTIFICATION_PROTO = TransportProtos.SessionCloseNotificationProto.newBuilder() + .setMessage(SESSION_EXPIRED_MESSAGE).build(); + public static final TransportProtos.SubscribeToAttributeUpdatesMsg SUBSCRIBE_TO_ATTRIBUTE_UPDATES_ASYNC_MSG = TransportProtos.SubscribeToAttributeUpdatesMsg.newBuilder() + .setSessionType(TransportProtos.SessionType.ASYNC).build(); + public static final TransportProtos.SubscribeToRPCMsg SUBSCRIBE_TO_RPC_ASYNC_MSG = TransportProtos.SubscribeToRPCMsg.newBuilder() + .setSessionType(TransportProtos.SessionType.ASYNC).build(); private final AtomicInteger atomicTs = new AtomicInteger(0); @@ -267,9 +275,7 @@ public class DefaultTransportService implements TransportService { @Override public SessionMetaData registerAsyncSession(TransportProtos.SessionInfoProto sessionInfo, SessionMsgListener listener) { - SessionMetaData newValue = new SessionMetaData(sessionInfo, TransportProtos.SessionType.ASYNC, listener); - SessionMetaData oldValue = sessions.putIfAbsent(toSessionId(sessionInfo), newValue); - return oldValue != null ? oldValue : newValue; + return sessions.computeIfAbsent(toSessionId(sessionInfo), (x) -> new SessionMetaData(sessionInfo, TransportProtos.SessionType.ASYNC, listener)); } @Override @@ -719,12 +725,8 @@ public class DefaultTransportService implements TransportService { } sessions.remove(uuid); sessionsToRemove.add(uuid); - process(sessionInfo, getSessionEventMsg(TransportProtos.SessionEvent.CLOSED), null); - TransportProtos.SessionCloseNotificationProto sessionCloseNotificationProto = TransportProtos.SessionCloseNotificationProto - .newBuilder() - .setMessage("Session has expired due to last activity time!") - .build(); - sessionMD.getListener().onRemoteSessionCloseCommand(uuid, sessionCloseNotificationProto); + process(sessionInfo, SESSION_EVENT_MSG_CLOSED, null); + sessionMD.getListener().onRemoteSessionCloseCommand(uuid, SESSION_CLOSE_NOTIFICATION_PROTO); } } else { if (lastActivityTime > sessionAD.getLastReportedActivityTime()) { @@ -1018,7 +1020,7 @@ public class DefaultTransportService implements TransportService { return new DeviceId(new UUID(sessionInfo.getDeviceIdMSB(), sessionInfo.getDeviceIdLSB())); } - public static TransportProtos.SessionEventMsg getSessionEventMsg(TransportProtos.SessionEvent event) { + private static TransportProtos.SessionEventMsg getSessionEventMsg(TransportProtos.SessionEvent event) { return TransportProtos.SessionEventMsg.newBuilder() .setSessionType(TransportProtos.SessionType.ASYNC) .setEvent(event).build();