|
|
|
@ -41,6 +41,8 @@ public abstract class AbstractPartitionBasedService<T extends EntityId> extends |
|
|
|
|
|
|
|
protected ListeningScheduledExecutorService scheduledExecutor; |
|
|
|
|
|
|
|
abstract protected String getServiceName(); |
|
|
|
|
|
|
|
abstract protected String getSchedulerExecutorName(); |
|
|
|
|
|
|
|
abstract protected void onAddedPartitions(Set<TopicPartitionInfo> addedPartitions); |
|
|
|
@ -53,7 +55,7 @@ public abstract class AbstractPartitionBasedService<T extends EntityId> extends |
|
|
|
|
|
|
|
protected void init() { |
|
|
|
// Should be always single threaded due to absence of locks.
|
|
|
|
scheduledExecutor = MoreExecutors.listeningDecorator(Executors.newSingleThreadScheduledExecutor(ThingsBoardThreadFactory.forName("device-state-scheduled"))); |
|
|
|
scheduledExecutor = MoreExecutors.listeningDecorator(Executors.newSingleThreadScheduledExecutor(ThingsBoardThreadFactory.forName(getSchedulerExecutorName()))); |
|
|
|
} |
|
|
|
|
|
|
|
protected ServiceType getServiceType() { |
|
|
|
@ -93,18 +95,18 @@ public abstract class AbstractPartitionBasedService<T extends EntityId> extends |
|
|
|
|
|
|
|
private void initStateFromDB(Set<TopicPartitionInfo> partitions) { |
|
|
|
try { |
|
|
|
log.info("CURRENT PARTITIONS: {}", partitionedEntities.keySet()); |
|
|
|
log.info("NEW PARTITIONS: {}", partitions); |
|
|
|
log.info("[{}] CURRENT PARTITIONS: {}", getServiceName(), partitionedEntities.keySet()); |
|
|
|
log.info("[{}] NEW PARTITIONS: {}", getServiceName(), partitions); |
|
|
|
|
|
|
|
Set<TopicPartitionInfo> addedPartitions = new HashSet<>(partitions); |
|
|
|
addedPartitions.removeAll(partitionedEntities.keySet()); |
|
|
|
|
|
|
|
log.info("ADDED PARTITIONS: {}", addedPartitions); |
|
|
|
log.info("[{}] ADDED PARTITIONS: {}", getServiceName(), addedPartitions); |
|
|
|
|
|
|
|
Set<TopicPartitionInfo> removedPartitions = new HashSet<>(partitionedEntities.keySet()); |
|
|
|
removedPartitions.removeAll(partitions); |
|
|
|
|
|
|
|
log.info("REMOVED PARTITIONS: {}", removedPartitions); |
|
|
|
log.info("[{}] REMOVED PARTITIONS: {}", getServiceName(), removedPartitions); |
|
|
|
|
|
|
|
// We no longer manage current partition of entities;
|
|
|
|
removedPartitions.forEach(partition -> { |
|
|
|
@ -120,12 +122,12 @@ public abstract class AbstractPartitionBasedService<T extends EntityId> extends |
|
|
|
onAddedPartitions(addedPartitions); |
|
|
|
} |
|
|
|
|
|
|
|
log.info("Managing following partitions:"); |
|
|
|
log.info("[{}] Managing following partitions:", getServiceName()); |
|
|
|
partitionedEntities.forEach((tpi, entities) -> { |
|
|
|
log.info("[{}]: {} entities", tpi.getFullTopicName(), entities.size()); |
|
|
|
log.info("[{}][{}]: {} entities", getServiceName(), tpi.getFullTopicName(), entities.size()); |
|
|
|
}); |
|
|
|
} catch (Throwable t) { |
|
|
|
log.warn("Failed to init entities state from DB", t); |
|
|
|
log.warn("[{}] Failed to init entities state from DB", getServiceName(), t); |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|