|
|
|
@ -55,11 +55,13 @@ import java.io.IOException; |
|
|
|
import java.net.URI; |
|
|
|
import java.security.InvalidParameterException; |
|
|
|
import java.util.Optional; |
|
|
|
import java.util.Queue; |
|
|
|
import java.util.Set; |
|
|
|
import java.util.UUID; |
|
|
|
import java.util.concurrent.ConcurrentHashMap; |
|
|
|
import java.util.concurrent.ConcurrentMap; |
|
|
|
import java.util.concurrent.Semaphore; |
|
|
|
import java.util.concurrent.LinkedBlockingQueue; |
|
|
|
import java.util.concurrent.atomic.AtomicBoolean; |
|
|
|
|
|
|
|
import static org.thingsboard.server.service.ws.DefaultWebSocketService.NUMBER_OF_PING_ATTEMPTS; |
|
|
|
|
|
|
|
@ -140,7 +142,10 @@ public class TbWebSocketHandler extends TextWebSocketHandler implements WebSocke |
|
|
|
if (!checkLimits(session, sessionRef)) { |
|
|
|
return; |
|
|
|
} |
|
|
|
internalSessionMap.put(internalSessionId, new SessionMetaData(session, sessionRef)); |
|
|
|
var tenantProfileConfiguration = getTenantProfileConfiguration(sessionRef); |
|
|
|
internalSessionMap.put(internalSessionId, new SessionMetaData(session, sessionRef, |
|
|
|
tenantProfileConfiguration != null && tenantProfileConfiguration.getWsMsgQueueLimitPerSession() > 0 ? |
|
|
|
tenantProfileConfiguration.getWsMsgQueueLimitPerSession() : 500)); |
|
|
|
|
|
|
|
externalSessionMap.put(externalSessionId, internalSessionId); |
|
|
|
processInWebSocketService(sessionRef, SessionEvent.onEstablished()); |
|
|
|
@ -215,21 +220,22 @@ public class TbWebSocketHandler extends TextWebSocketHandler implements WebSocke |
|
|
|
private final RemoteEndpoint.Async asyncRemote; |
|
|
|
private final WebSocketSessionRef sessionRef; |
|
|
|
|
|
|
|
// TODO: carefully review ( + discuss removal of the msgQueue)
|
|
|
|
private final Semaphore sendingSemaphore = new Semaphore(1); |
|
|
|
private final AtomicBoolean isSending = new AtomicBoolean(false); |
|
|
|
private final Queue<TbWebSocketMsg<?>> msgQueue; |
|
|
|
|
|
|
|
private volatile long lastActivityTime; |
|
|
|
|
|
|
|
SessionMetaData(WebSocketSession session, WebSocketSessionRef sessionRef) { |
|
|
|
SessionMetaData(WebSocketSession session, WebSocketSessionRef sessionRef, int maxMsgQueuePerSession) { |
|
|
|
super(); |
|
|
|
this.session = session; |
|
|
|
Session nativeSession = ((NativeWebSocketSession) session).getNativeSession(Session.class); |
|
|
|
this.asyncRemote = nativeSession.getAsyncRemote(); |
|
|
|
this.sessionRef = sessionRef; |
|
|
|
this.msgQueue = new LinkedBlockingQueue<>(maxMsgQueuePerSession); |
|
|
|
this.lastActivityTime = System.currentTimeMillis(); |
|
|
|
} |
|
|
|
|
|
|
|
void sendPing(long currentTime) { |
|
|
|
synchronized void sendPing(long currentTime) { |
|
|
|
try { |
|
|
|
long timeSinceLastActivity = currentTime - lastActivityTime; |
|
|
|
if (timeSinceLastActivity >= pingTimeout) { |
|
|
|
@ -244,37 +250,40 @@ public class TbWebSocketHandler extends TextWebSocketHandler implements WebSocke |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
void closeSession(CloseStatus reason) { |
|
|
|
private void closeSession(CloseStatus reason) { |
|
|
|
try { |
|
|
|
close(this.sessionRef, reason); |
|
|
|
} catch (IOException ioe) { |
|
|
|
log.trace("[{}] Session transport error", session.getId(), ioe); |
|
|
|
} finally { |
|
|
|
sendingSemaphore.release(); |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
void processPongMessage(long currentTime) { |
|
|
|
synchronized void processPongMessage(long currentTime) { |
|
|
|
lastActivityTime = currentTime; |
|
|
|
} |
|
|
|
|
|
|
|
void sendMsg(String msg) { |
|
|
|
synchronized void sendMsg(String msg) { |
|
|
|
sendMsg(new TbWebSocketTextMsg(msg)); |
|
|
|
} |
|
|
|
|
|
|
|
void sendMsg(TbWebSocketMsg<?> msg) { |
|
|
|
try { |
|
|
|
sendingSemaphore.acquire(); |
|
|
|
synchronized void sendMsg(TbWebSocketMsg<?> msg) { |
|
|
|
if (isSending.compareAndSet(false, true)) { |
|
|
|
sendMsgInternal(msg); |
|
|
|
} catch (InterruptedException e) { |
|
|
|
throw new RuntimeException(e); |
|
|
|
} catch (Exception e) { |
|
|
|
sendingSemaphore.release(); |
|
|
|
throw e; |
|
|
|
} else { |
|
|
|
try { |
|
|
|
msgQueue.add(msg); |
|
|
|
} catch (RuntimeException e) { |
|
|
|
if (log.isTraceEnabled()) { |
|
|
|
log.trace("[{}][{}] Session closed due to queue error", sessionRef.getSecurityCtx().getTenantId(), session.getId(), e); |
|
|
|
} else { |
|
|
|
log.info("[{}][{}] Session closed due to queue error", sessionRef.getSecurityCtx().getTenantId(), session.getId()); |
|
|
|
} |
|
|
|
closeSession(CloseStatus.POLICY_VIOLATION.withReason("Max pending updates limit reached!")); |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
void sendMsgInternal(TbWebSocketMsg<?> msg) { |
|
|
|
private void sendMsgInternal(TbWebSocketMsg<?> msg) { |
|
|
|
try { |
|
|
|
if (TbWebSocketMsgType.TEXT.equals(msg.getType())) { |
|
|
|
TbWebSocketTextMsg textMsg = (TbWebSocketTextMsg) msg; |
|
|
|
@ -282,6 +291,7 @@ public class TbWebSocketHandler extends TextWebSocketHandler implements WebSocke |
|
|
|
} else { |
|
|
|
TbWebSocketPingMsg pingMsg = (TbWebSocketPingMsg) msg; |
|
|
|
this.asyncRemote.sendPing(pingMsg.getMsg()); |
|
|
|
processNextMsg(); |
|
|
|
} |
|
|
|
} catch (Exception e) { |
|
|
|
log.trace("[{}] Failed to send msg", session.getId(), e); |
|
|
|
@ -295,7 +305,16 @@ public class TbWebSocketHandler extends TextWebSocketHandler implements WebSocke |
|
|
|
log.trace("[{}] Failed to send msg", session.getId(), result.getException()); |
|
|
|
closeSession(CloseStatus.SESSION_NOT_RELIABLE); |
|
|
|
} else { |
|
|
|
sendingSemaphore.release(); |
|
|
|
processNextMsg(); |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
private void processNextMsg() { |
|
|
|
TbWebSocketMsg<?> msg = msgQueue.poll(); |
|
|
|
if (msg != null) { |
|
|
|
sendMsgInternal(msg); |
|
|
|
} else { |
|
|
|
isSending.set(false); |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
@ -476,4 +495,9 @@ public class TbWebSocketHandler extends TextWebSocketHandler implements WebSocke |
|
|
|
.map(TenantProfile::getDefaultProfileConfiguration).orElse(null); |
|
|
|
} |
|
|
|
|
|
|
|
private DefaultTenantProfileConfiguration getTenantProfileConfiguration(WebSocketSessionRef sessionRef) { |
|
|
|
return Optional.ofNullable(tenantProfileCache.get(sessionRef.getSecurityCtx().getTenantId())) |
|
|
|
.map(TenantProfile::getDefaultProfileConfiguration).orElse(null); |
|
|
|
} |
|
|
|
|
|
|
|
} |