Browse Source

Issue #1686 Introduced ScheduledFuture in SessionMap to be able to cancel

deregisterSession call after successful responses in CoAP to avoid
5.03 responses after ACK.
pull/1724/head
Mike Lohmann 7 years ago
committed by Jan Christoph Bernack
parent
commit
2db79aa556
No known key found for this signature in database GPG Key ID: BD807E5761329D0A
  1. 13
      common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/AbstractTransportService.java
  2. 26
      common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/SessionMetaData.java

13
common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/AbstractTransportService.java

@ -178,15 +178,24 @@ public abstract class AbstractTransportService implements TransportService {
@Override
public void registerSyncSession(TransportProtos.SessionInfoProto sessionInfo, SessionMsgListener listener, long timeout) {
sessions.putIfAbsent(toId(sessionInfo), new SessionMetaData(sessionInfo, TransportProtos.SessionType.SYNC, listener));
schedulerExecutor.schedule(() -> {
SessionMetaData currentSession = new SessionMetaData(sessionInfo, TransportProtos.SessionType.SYNC, listener);
sessions.putIfAbsent(toId(sessionInfo), currentSession);
ScheduledFuture executorFuture = schedulerExecutor.schedule(() -> {
listener.onRemoteSessionCloseCommand(TransportProtos.SessionCloseNotificationProto.getDefaultInstance());
deregisterSession(sessionInfo);
}, timeout, TimeUnit.MILLISECONDS);
currentSession.setScheduledFuture(executorFuture);
}
@Override
public void deregisterSession(TransportProtos.SessionInfoProto sessionInfo) {
SessionMetaData currentSession = sessions.get(toId(sessionInfo));
if (currentSession.hasScheduledFuture()) {
log.debug("Stopping scheduler to avoid resending response if request has been ack.");
currentSession.getScheduledFuture().cancel(false);
}
sessions.remove(toId(sessionInfo));
}

26
common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/SessionMetaData.java

@ -1,12 +1,12 @@
/**
* Copyright © 2016-2019 The Thingsboard Authors
*
* <p>
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@ -19,6 +19,8 @@ import lombok.Data;
import org.thingsboard.server.common.transport.SessionMsgListener;
import org.thingsboard.server.gen.transport.TransportProtos;
import java.util.concurrent.ScheduledFuture;
/**
* Created by ashvayka on 15.10.18.
*/
@ -29,19 +31,33 @@ class SessionMetaData {
private final TransportProtos.SessionType sessionType;
private final SessionMsgListener listener;
private ScheduledFuture scheduledFuture;
private volatile long lastActivityTime;
private volatile boolean subscribedToAttributes;
private volatile boolean subscribedToRPC;
SessionMetaData(TransportProtos.SessionInfoProto sessionInfo, TransportProtos.SessionType sessionType, SessionMsgListener listener) {
SessionMetaData(
TransportProtos.SessionInfoProto sessionInfo,
TransportProtos.SessionType sessionType,
SessionMsgListener listener
) {
this.sessionInfo = sessionInfo;
this.sessionType = sessionType;
this.listener = listener;
this.lastActivityTime = System.currentTimeMillis();
this.scheduledFuture = null;
}
void updateLastActivityTime() {
this.lastActivityTime = System.currentTimeMillis();
}
void setScheduledFuture(ScheduledFuture scheduledFuture) { this.scheduledFuture = scheduledFuture; }
public ScheduledFuture getScheduledFuture() {
return scheduledFuture;
}
public boolean hasScheduledFuture() { return null != this.scheduledFuture; }
}

Loading…
Cancel
Save