Browse Source

Improvement

pull/11924/head
Andrii Landiak 2 years ago
parent
commit
1099ca2aa2
  1. 17
      application/src/main/java/org/thingsboard/server/service/ttl/KafkaEdgeTopicsCleanUpService.java

17
application/src/main/java/org/thingsboard/server/service/ttl/KafkaEdgeTopicsCleanUpService.java

@ -43,7 +43,6 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import static org.thingsboard.server.service.state.DefaultDeviceStateService.LAST_CONNECT_TIME;
@ -60,9 +59,7 @@ public class KafkaEdgeTopicsCleanUpService extends AbstractCleanUpService {
private final TenantService tenantService;
private final EdgeService edgeService;
private final AttributesService attributesService;
private final TbKafkaSettings kafkaSettings;
private final TbKafkaTopicConfigs kafkaTopicConfigs;
private final TbKafkaAdmin kafkaAdmin;
@Value("${sql.ttl.edge_events.edge_events_ttl:2628000}")
private long ttlSeconds;
@ -75,8 +72,7 @@ public class KafkaEdgeTopicsCleanUpService extends AbstractCleanUpService {
this.tenantService = tenantService;
this.edgeService = edgeService;
this.attributesService = attributesService;
this.kafkaSettings = kafkaSettings;
this.kafkaTopicConfigs = kafkaTopicConfigs;
this.kafkaAdmin = new TbKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getEdgeEventConfigs());
}
@Scheduled(initialDelayString = "#{T(org.apache.commons.lang3.RandomUtils).nextLong(0, ${sql.ttl.edge_events.execution_interval_ms})}", fixedDelayString = "${sql.ttl.edge_events.execution_interval_ms}")
@ -85,7 +81,6 @@ public class KafkaEdgeTopicsCleanUpService extends AbstractCleanUpService {
return;
}
TbKafkaAdmin kafkaAdmin = new TbKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getEdgeEventConfigs());
Set<String> topics = kafkaAdmin.getAllTopics();
if (topics == null || topics.isEmpty()) {
return;
@ -103,10 +98,10 @@ public class KafkaEdgeTopicsCleanUpService extends AbstractCleanUpService {
long currentTimeMillis = System.currentTimeMillis();
long ttlMillis = TimeUnit.SECONDS.toMillis(ttlSeconds);
tenantEdgeMap.forEach((tenantId, edgeIds) -> processTenantCleanUp(kafkaAdmin, tenantId, edgeIds, ttlMillis, currentTimeMillis));
tenantEdgeMap.forEach((tenantId, edgeIds) -> processTenantCleanUp(tenantId, edgeIds, ttlMillis, currentTimeMillis));
}
private void processTenantCleanUp(TbKafkaAdmin kafkaAdmin, TenantId tenantId, List<EdgeId> edgeIds, long ttlMillis, long currentTimeMillis) {
private void processTenantCleanUp(TenantId tenantId, List<EdgeId> edgeIds, long ttlMillis, long currentTimeMillis) {
boolean tenantExists = tenantService.tenantExists(tenantId);
if (tenantExists) {
for (EdgeId edgeId : edgeIds) {
@ -129,8 +124,8 @@ public class KafkaEdgeTopicsCleanUpService extends AbstractCleanUpService {
log.info("[{}] Removed topic {} for deleted edge {}", tenantId, topic, edgeId);
}
});
} catch (InterruptedException | ExecutionException e) {
log.error("[{}] Failed to delete topic", tenantId);
} catch (Exception e) {
log.error("[{}] Failed to delete topic for edge {}", tenantId, edgeId, e);
}
}
} else {

Loading…
Cancel
Save