|
|
|
@ -93,14 +93,13 @@ import static org.thingsboard.server.service.state.DefaultDeviceStateService.LAS |
|
|
|
@TbCoreComponent |
|
|
|
public class EdgeGrpcService extends EdgeRpcServiceGrpc.EdgeRpcServiceImplBase implements EdgeRpcService { |
|
|
|
|
|
|
|
private static final int DESTROY_SESSION_MAX_ATTEMPTS = 10; |
|
|
|
|
|
|
|
private final ConcurrentMap<EdgeId, EdgeGrpcSession> sessions = new ConcurrentHashMap<>(); |
|
|
|
private final ConcurrentMap<EdgeId, Lock> sessionNewEventsLocks = new ConcurrentHashMap<>(); |
|
|
|
private final Map<EdgeId, Boolean> sessionNewEvents = new HashMap<>(); |
|
|
|
private final ConcurrentMap<EdgeId, ScheduledFuture<?>> sessionEdgeEventChecks = new ConcurrentHashMap<>(); |
|
|
|
private final ConcurrentMap<UUID, Consumer<FromEdgeSyncResponse>> localSyncEdgeRequests = new ConcurrentHashMap<>(); |
|
|
|
private final ConcurrentMap<EdgeId, Boolean> edgeEventsMigrationProcessed = new ConcurrentHashMap<>(); |
|
|
|
private final List<EdgeGrpcSession> zombieSessions = new ArrayList<>(); |
|
|
|
|
|
|
|
@Value("${edges.rpc.port}") |
|
|
|
private int rpcPort; |
|
|
|
@ -193,7 +192,7 @@ public class EdgeGrpcService extends EdgeRpcServiceGrpc.EdgeRpcServiceImplBase i |
|
|
|
this.edgeEventProcessingExecutorService = ThingsBoardExecutors.newScheduledThreadPool(schedulerPoolSize, "edge-event-check-scheduler"); |
|
|
|
this.sendDownlinkExecutorService = ThingsBoardExecutors.newScheduledThreadPool(sendSchedulerPoolSize, "edge-send-scheduler"); |
|
|
|
this.executorService = ThingsBoardExecutors.newSingleThreadScheduledExecutor("edge-service"); |
|
|
|
this.executorService.scheduleAtFixedRate(this::destroyKafkaSessionIfDisconnectedAndConsumerActive, 60, 60, TimeUnit.SECONDS); |
|
|
|
this.executorService.scheduleAtFixedRate(this::cleanupZombieSessions, 60, 60, TimeUnit.SECONDS); |
|
|
|
log.info("Edge RPC service initialized!"); |
|
|
|
} |
|
|
|
|
|
|
|
@ -518,14 +517,10 @@ public class EdgeGrpcService extends EdgeRpcServiceGrpc.EdgeRpcServiceImplBase i |
|
|
|
|
|
|
|
private void destroySession(EdgeGrpcSession session) { |
|
|
|
try (session) { |
|
|
|
for (int i = 0; i < DESTROY_SESSION_MAX_ATTEMPTS; i++) { |
|
|
|
if (session.destroy()) { |
|
|
|
break; |
|
|
|
} else { |
|
|
|
try { |
|
|
|
Thread.sleep(100); |
|
|
|
} catch (InterruptedException ignored) {} |
|
|
|
} |
|
|
|
if (!session.destroy()) { |
|
|
|
log.warn("[{}][{}] Session destroy failed for edge [{}] with session id [{}]. Adding to zombie queue for later cleanup.", |
|
|
|
session.getTenantId(), session.getEdge().getId(), session.getEdge().getName(), session.getSessionId()); |
|
|
|
zombieSessions.add(session); |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
@ -634,7 +629,7 @@ public class EdgeGrpcService extends EdgeRpcServiceGrpc.EdgeRpcServiceImplBase i |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
private void destroyKafkaSessionIfDisconnectedAndConsumerActive() { |
|
|
|
private void cleanupZombieSessions() { |
|
|
|
try { |
|
|
|
List<EdgeId> toRemove = new ArrayList<>(); |
|
|
|
for (EdgeGrpcSession session : sessions.values()) { |
|
|
|
@ -655,6 +650,17 @@ public class EdgeGrpcService extends EdgeRpcServiceGrpc.EdgeRpcServiceImplBase i |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
zombieSessions.removeIf(zombie -> { |
|
|
|
if (zombie.destroy()) { |
|
|
|
log.info("[{}][{}] Successfully cleaned up zombie session [{}] for edge [{}].", |
|
|
|
zombie.getTenantId(), zombie.getEdge().getId(), zombie.getSessionId(), zombie.getEdge().getName()); |
|
|
|
return true; |
|
|
|
} else { |
|
|
|
log.warn("[{}][{}] Failed to remove zombie session [{}] for edge [{}].", |
|
|
|
zombie.getTenantId(), zombie.getEdge().getId(), zombie.getSessionId(), zombie.getEdge().getName()); |
|
|
|
return false; |
|
|
|
} |
|
|
|
}); |
|
|
|
} catch (Exception e) { |
|
|
|
log.warn("Failed to cleanup kafka sessions", e); |
|
|
|
} |
|
|
|
|