diff --git a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/AbstractTransportService.java b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/AbstractTransportService.java index 6140390da3..f55cbbe1d5 100644 --- a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/AbstractTransportService.java +++ b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/AbstractTransportService.java @@ -178,15 +178,24 @@ public abstract class AbstractTransportService implements TransportService { @Override public void registerSyncSession(TransportProtos.SessionInfoProto sessionInfo, SessionMsgListener listener, long timeout) { - sessions.putIfAbsent(toId(sessionInfo), new SessionMetaData(sessionInfo, TransportProtos.SessionType.SYNC, listener)); - schedulerExecutor.schedule(() -> { + SessionMetaData currentSession = new SessionMetaData(sessionInfo, TransportProtos.SessionType.SYNC, listener); + sessions.putIfAbsent(toId(sessionInfo), currentSession); + + ScheduledFuture executorFuture = schedulerExecutor.schedule(() -> { listener.onRemoteSessionCloseCommand(TransportProtos.SessionCloseNotificationProto.getDefaultInstance()); deregisterSession(sessionInfo); }, timeout, TimeUnit.MILLISECONDS); + + currentSession.setScheduledFuture(executorFuture); } @Override public void deregisterSession(TransportProtos.SessionInfoProto sessionInfo) { + SessionMetaData currentSession = sessions.get(toId(sessionInfo)); + if (currentSession.hasScheduledFuture()) { + log.debug("Stopping scheduler to avoid resending response if request has been ack."); + currentSession.getScheduledFuture().cancel(false); + } sessions.remove(toId(sessionInfo)); } diff --git a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/SessionMetaData.java b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/SessionMetaData.java index 1217a63c8f..5a02d635d7 100644 --- a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/SessionMetaData.java +++ b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/SessionMetaData.java @@ -1,12 +1,12 @@ /** * Copyright © 2016-2019 The Thingsboard Authors - * + *
* Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * + *
+ * http://www.apache.org/licenses/LICENSE-2.0 + *
* Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -19,6 +19,8 @@ import lombok.Data; import org.thingsboard.server.common.transport.SessionMsgListener; import org.thingsboard.server.gen.transport.TransportProtos; +import java.util.concurrent.ScheduledFuture; + /** * Created by ashvayka on 15.10.18. */ @@ -29,19 +31,33 @@ class SessionMetaData { private final TransportProtos.SessionType sessionType; private final SessionMsgListener listener; + private ScheduledFuture scheduledFuture; + private volatile long lastActivityTime; private volatile boolean subscribedToAttributes; private volatile boolean subscribedToRPC; - SessionMetaData(TransportProtos.SessionInfoProto sessionInfo, TransportProtos.SessionType sessionType, SessionMsgListener listener) { + SessionMetaData( + TransportProtos.SessionInfoProto sessionInfo, + TransportProtos.SessionType sessionType, + SessionMsgListener listener + ) { this.sessionInfo = sessionInfo; this.sessionType = sessionType; this.listener = listener; this.lastActivityTime = System.currentTimeMillis(); + this.scheduledFuture = null; } void updateLastActivityTime() { this.lastActivityTime = System.currentTimeMillis(); } + void setScheduledFuture(ScheduledFuture scheduledFuture) { this.scheduledFuture = scheduledFuture; } + + public ScheduledFuture getScheduledFuture() { + return scheduledFuture; + } + + public boolean hasScheduledFuture() { return null != this.scheduledFuture; } }