Browse Source

Address PR review comments for edge session refactoring

- Deduplicate forEach iteration over both session maps (HashSet)
- Use null-safe getEdgeId() instead of getEdge().getId() in remove()
- Add defensive null checks on getState() in onConfigurationUpdate and destroyAndMarkAsZombieIfFailed
- Add null check on pendingMsgsMap.get() in onDownlinkResponse
- Move scheduleAtFixedRate to @PostConstruct in DefaultZombieSessionCleanupService
- Fix misleading "kafka sessions" log message to "zombie sessions"
- Chain IOException in GrpcServer RuntimeException
- Guard against race condition on removeByEdgeId in onEdgeDisconnect
- Rename shadowed parameter in shutdownExecutorSafely
pull/14618/head
Nikita Mazurenko 3 months ago
parent
commit
dc7b86fa1d
  1. 2
      application/src/main/java/org/thingsboard/server/service/edge/rpc/GrpcServer.java
  2. 12
      application/src/main/java/org/thingsboard/server/service/edge/rpc/service/EdgeGrpcService.java
  3. 7
      application/src/main/java/org/thingsboard/server/service/edge/rpc/session/DefaultZombieSessionCleanupService.java
  4. 2
      application/src/main/java/org/thingsboard/server/service/edge/rpc/session/EdgeGrpcSession.java
  5. 10
      application/src/main/java/org/thingsboard/server/service/edge/rpc/session/EdgeSessionsHolder.java
  6. 25
      application/src/main/java/org/thingsboard/server/service/edge/rpc/session/manager/AbstractEdgeGrpcSessionManager.java

2
application/src/main/java/org/thingsboard/server/service/edge/rpc/GrpcServer.java

@ -85,7 +85,7 @@ public class GrpcServer {
server.start();
} catch (IOException e) {
log.error("Failed to start Edge RPC server!", e);
throw new RuntimeException("Failed to start Edge RPC server!");
throw new RuntimeException("Failed to start Edge RPC server!", e);
}
log.info("Edge RPC server initialized!");
}

12
application/src/main/java/org/thingsboard/server/service/edge/rpc/service/EdgeGrpcService.java

@ -221,8 +221,10 @@ public class EdgeGrpcService extends EdgeRpcServiceGrpc.EdgeRpcServiceImplBase i
EdgeGrpcSessionManager current = sessions.getByEdgeId(edgeId);
if (current != null && current.getState().getSessionId().equals(sessionId)) {
EdgeGrpcSessionManager toRemove = sessions.removeByEdgeId(edgeId);
toRemove.onEdgeDisconnect();
toRemove.destroyAndMarkAsZombieIfFailed();
if (toRemove != null) {
toRemove.onEdgeDisconnect();
toRemove.destroyAndMarkAsZombieIfFailed();
}
sessions.removeBySessionId(sessionId);
save(tenantId, edgeId, ACTIVITY_STATE, false);
long lastDisconnectTs = System.currentTimeMillis();
@ -382,9 +384,9 @@ public class EdgeGrpcService extends EdgeRpcServiceGrpc.EdgeRpcServiceImplBase i
}
}
private void shutdownExecutorSafely(ExecutorService sendDownlinkExecutorService) {
if (sendDownlinkExecutorService != null && !sendDownlinkExecutorService.isShutdown()) {
sendDownlinkExecutorService.shutdown();
private void shutdownExecutorSafely(ExecutorService e) {
if (e != null && !e.isShutdown()) {
e.shutdown();
}
}
}

7
application/src/main/java/org/thingsboard/server/service/edge/rpc/session/DefaultZombieSessionCleanupService.java

@ -15,6 +15,7 @@
*/
package org.thingsboard.server.service.edge.rpc.session;
import jakarta.annotation.PostConstruct;
import jakarta.annotation.PreDestroy;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
@ -50,6 +51,10 @@ public class DefaultZombieSessionCleanupService implements ZombieSessionCleanupS
public DefaultZombieSessionCleanupService() {
this.zombieSessions = new ConcurrentLinkedQueue<>();
this.zombieSessionsExecutorService = ThingsBoardExecutors.newSingleThreadScheduledExecutor("zombie-sessions");
}
@PostConstruct
public void init() {
this.zombieSessionsExecutorService.scheduleAtFixedRate(this::cleanupZombieSessions, 30, 60, TimeUnit.SECONDS);
}
@ -86,7 +91,7 @@ public class DefaultZombieSessionCleanupService implements ZombieSessionCleanupS
}
});
} catch (Exception e) {
log.warn("Failed to cleanup kafka sessions", e);
log.warn("Failed to cleanup zombie sessions", e);
}
}

2
application/src/main/java/org/thingsboard/server/service/edge/rpc/session/EdgeGrpcSession.java

@ -334,7 +334,7 @@ public class EdgeGrpcSession implements EdgeSession {
log.debug("[{}][{}][{}] Msg processing failed! Msg Id: [{}], Error msg: {}", getTenantId(), getEdgeId(), getSessionId(), msg.getDownlinkMsgId(), msg.getErrorMsg());
DownlinkMsg downlinkMsg = state.getPendingMsgsMap().get(msg.getDownlinkMsgId());
// if NOT timeseries or attributes failures - ack failed downlink
if (downlinkMsg.getEntityDataCount() == 0) {
if (downlinkMsg != null && downlinkMsg.getEntityDataCount() == 0) {
state.getPendingMsgsMap().remove(msg.getDownlinkMsgId());
}
}

10
application/src/main/java/org/thingsboard/server/service/edge/rpc/session/EdgeSessionsHolder.java

@ -24,6 +24,8 @@ import org.thingsboard.server.queue.util.TbCoreComponent;
import org.thingsboard.server.service.edge.rpc.EdgeSessionState;
import org.thingsboard.server.service.edge.rpc.session.manager.EdgeGrpcSessionManager;
import java.util.HashSet;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
@ -40,8 +42,10 @@ public class EdgeSessionsHolder {
private final ConcurrentMap<UUID, EdgeGrpcSessionManager> sessionsById = new ConcurrentHashMap<>();
public void forEach(Consumer<EdgeGrpcSessionManager> consumer) {
sessions.values().forEach(consumer);
sessionsById.values().forEach(consumer);
Set<EdgeGrpcSessionManager> unique = new HashSet<>(sessions.values());
unique.addAll(sessionsById.values());
unique.forEach(consumer);
}
public void put(EdgeGrpcSessionManager session) {
@ -73,7 +77,7 @@ public class EdgeSessionsHolder {
return;
}
EdgeSessionState sessionState = session.getState();
removeByEdgeId(sessionState.getEdge().getId()); // todo: react to warnings
removeByEdgeId(sessionState.getEdgeId());
removeBySessionId(sessionState.getSessionId());
}
}

25
application/src/main/java/org/thingsboard/server/service/edge/rpc/session/manager/AbstractEdgeGrpcSessionManager.java

@ -101,11 +101,12 @@ public abstract class AbstractEdgeGrpcSessionManager extends EdgeGrpcSessionDele
@Override
public void onConfigurationUpdate(Edge edge) {
EdgeSessionState state = getState();
CustomerId stateCustomerId = null;
if (state != null) {
stateCustomerId = state.getEdge().getCustomerId();
state.setEdge(edge);
if (state == null) {
log.warn("Session state is null, skipping configuration update for edge [{}]", edge.getId());
return;
}
CustomerId stateCustomerId = state.getEdge().getCustomerId();
state.setEdge(edge);
if (stateCustomerId != null && !stateCustomerId.equals(edge.getCustomerId())) {
// do not send edge configuration message on customer update
// message send by separate flow from assign_to or unassing_from customer
@ -128,13 +129,21 @@ public abstract class AbstractEdgeGrpcSessionManager extends EdgeGrpcSessionDele
try (finalSession) {
if (!destroy()) {
log.warn("[{}][{}] Session destroy failed for edge [{}] with session id [{}]. Adding to zombie queue for later cleanup.",
state.getTenantId(), state.getEdgeId(), state.getEdge().getName(), state.getSessionId());
if (state != null) {
log.warn("[{}][{}] Session destroy failed for edge [{}] with session id [{}]. Adding to zombie queue for later cleanup.",
state.getTenantId(), state.getEdgeId(), state.getEdge().getName(), state.getSessionId());
} else {
log.warn("Session destroy failed (state is null). Adding to zombie queue for later cleanup.");
}
zombieSessionCleanupService.add(this);
}
} catch (Exception e) {
log.warn("[{}][{}] Exception during session destroy for edge [{}] with session id [{}]",
state.getTenantId(), state.getEdgeId(), state.getEdge().getName(), state.getSessionId(), e);
if (state != null) {
log.warn("[{}][{}] Exception during session destroy for edge [{}] with session id [{}]",
state.getTenantId(), state.getEdgeId(), state.getEdge().getName(), state.getSessionId(), e);
} else {
log.warn("Exception during session destroy (state is null)", e);
}
}
}
}

Loading…
Cancel
Save