|
|
|
@ -50,22 +50,15 @@ public class DefaultTbQueueService extends AbstractTbEntityService implements Tb |
|
|
|
public Queue saveQueue(Queue queue) { |
|
|
|
boolean create = queue.getId() == null; |
|
|
|
Queue oldQueue; |
|
|
|
|
|
|
|
if (create) { |
|
|
|
oldQueue = null; |
|
|
|
} else { |
|
|
|
oldQueue = queueService.findQueueById(queue.getTenantId(), queue.getId()); |
|
|
|
} |
|
|
|
|
|
|
|
//TODO: add checkNotNull
|
|
|
|
Queue savedQueue = queueService.saveQueue(queue); |
|
|
|
|
|
|
|
if (create) { |
|
|
|
onQueueCreated(savedQueue); |
|
|
|
} else { |
|
|
|
onQueueUpdated(savedQueue, oldQueue); |
|
|
|
} |
|
|
|
|
|
|
|
createTopicsIfNeeded(savedQueue, oldQueue); |
|
|
|
tbClusterService.onQueuesUpdate(List.of(savedQueue)); |
|
|
|
return savedQueue; |
|
|
|
} |
|
|
|
|
|
|
|
@ -73,54 +66,14 @@ public class DefaultTbQueueService extends AbstractTbEntityService implements Tb |
|
|
|
public void deleteQueue(TenantId tenantId, QueueId queueId) { |
|
|
|
Queue queue = queueService.findQueueById(tenantId, queueId); |
|
|
|
queueService.deleteQueue(tenantId, queueId); |
|
|
|
onQueueDeleted(queue); |
|
|
|
tbClusterService.onQueuesDelete(List.of(queue)); |
|
|
|
} |
|
|
|
|
|
|
|
@Override |
|
|
|
public void deleteQueueByQueueName(TenantId tenantId, String queueName) { |
|
|
|
Queue queue = queueService.findQueueByTenantIdAndNameInternal(tenantId, queueName); |
|
|
|
queueService.deleteQueue(tenantId, queue.getId()); |
|
|
|
onQueueDeleted(queue); |
|
|
|
} |
|
|
|
|
|
|
|
private void onQueueCreated(Queue queue) { |
|
|
|
for (int i = 0; i < queue.getPartitions(); i++) { |
|
|
|
tbQueueAdmin.createTopicIfNotExists( |
|
|
|
new TopicPartitionInfo(queue.getTopic(), queue.getTenantId(), i, false).getFullTopicName(), |
|
|
|
queue.getCustomProperties() |
|
|
|
); |
|
|
|
} |
|
|
|
|
|
|
|
tbClusterService.onQueueChange(queue); |
|
|
|
} |
|
|
|
|
|
|
|
private void onQueueUpdated(Queue queue, Queue oldQueue) { |
|
|
|
int oldPartitions = oldQueue.getPartitions(); |
|
|
|
int currentPartitions = queue.getPartitions(); |
|
|
|
|
|
|
|
if (currentPartitions != oldPartitions) { |
|
|
|
if (currentPartitions > oldPartitions) { |
|
|
|
log.info("Added [{}] new partitions to [{}] queue", currentPartitions - oldPartitions, queue.getName()); |
|
|
|
for (int i = oldPartitions; i < currentPartitions; i++) { |
|
|
|
tbQueueAdmin.createTopicIfNotExists( |
|
|
|
new TopicPartitionInfo(queue.getTopic(), queue.getTenantId(), i, false).getFullTopicName(), |
|
|
|
queue.getCustomProperties() |
|
|
|
); |
|
|
|
} |
|
|
|
tbClusterService.onQueueChange(queue); |
|
|
|
} else { |
|
|
|
log.info("Removed [{}] partitions from [{}] queue", oldPartitions - currentPartitions, queue.getName()); |
|
|
|
tbClusterService.onQueueChange(queue); |
|
|
|
// TODO: move all the messages left in old partitions and delete topics
|
|
|
|
} |
|
|
|
} else if (!oldQueue.equals(queue)) { |
|
|
|
tbClusterService.onQueueChange(queue); |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
private void onQueueDeleted(Queue queue) { |
|
|
|
tbClusterService.onQueueDelete(queue); |
|
|
|
// queueStatsService.deleteQueueStatsByQueueId(tenantId, queueId);
|
|
|
|
tbClusterService.onQueuesDelete(List.of(queue)); |
|
|
|
} |
|
|
|
|
|
|
|
@Override |
|
|
|
@ -176,26 +129,56 @@ public class DefaultTbQueueService extends AbstractTbEntityService implements Tb |
|
|
|
log.debug("[{}] Handling profile queue config update: creating queues {}, updating {}, deleting {}. Affected tenants: {}", |
|
|
|
newTenantProfile.getUuidId(), toCreate, toUpdate, toRemove, tenantIds); |
|
|
|
} |
|
|
|
tenantIds.forEach(tenantId -> { |
|
|
|
toCreate.forEach(key -> saveQueue(new Queue(tenantId, newQueues.get(key)))); |
|
|
|
|
|
|
|
toUpdate.forEach(key -> { |
|
|
|
Queue queueToUpdate = new Queue(tenantId, newQueues.get(key)); |
|
|
|
Queue foundQueue = queueService.findQueueByTenantIdAndName(tenantId, key); |
|
|
|
queueToUpdate.setId(foundQueue.getId()); |
|
|
|
queueToUpdate.setCreatedTime(foundQueue.getCreatedTime()); |
|
|
|
List<Queue> updated = new ArrayList<>(); |
|
|
|
List<Queue> deleted = new ArrayList<>(); |
|
|
|
for (TenantId tenantId : tenantIds) { |
|
|
|
for (String name : toCreate) { |
|
|
|
updated.add(new Queue(tenantId, newQueues.get(name))); |
|
|
|
} |
|
|
|
|
|
|
|
if (!queueToUpdate.equals(foundQueue)) { |
|
|
|
saveQueue(queueToUpdate); |
|
|
|
for (String name : toUpdate) { |
|
|
|
Queue queue = new Queue(tenantId, newQueues.get(name)); |
|
|
|
Queue foundQueue = queueService.findQueueByTenantIdAndName(tenantId, name); |
|
|
|
if (foundQueue != null) { |
|
|
|
queue.setId(foundQueue.getId()); |
|
|
|
queue.setCreatedTime(foundQueue.getCreatedTime()); |
|
|
|
} |
|
|
|
}); |
|
|
|
if (!queue.equals(foundQueue)) { |
|
|
|
updated.add(queue); |
|
|
|
createTopicsIfNeeded(queue, foundQueue); |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
for (String name : toRemove) { |
|
|
|
Queue queue = queueService.findQueueByTenantIdAndNameInternal(tenantId, name); |
|
|
|
deleted.add(queue); |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
toRemove.forEach(q -> { |
|
|
|
Queue queue = queueService.findQueueByTenantIdAndNameInternal(tenantId, q); |
|
|
|
QueueId queueIdForRemove = queue.getId(); |
|
|
|
deleteQueue(tenantId, queueIdForRemove); |
|
|
|
if (!updated.isEmpty()) { |
|
|
|
updated = updated.stream() |
|
|
|
.map(queueService::saveQueue) |
|
|
|
.collect(Collectors.toList()); |
|
|
|
tbClusterService.onQueuesUpdate(updated); |
|
|
|
} |
|
|
|
if (!deleted.isEmpty()) { |
|
|
|
deleted.forEach(queue -> { |
|
|
|
queueService.deleteQueue(queue.getTenantId(), queue.getId()); |
|
|
|
}); |
|
|
|
}); |
|
|
|
tbClusterService.onQueuesDelete(deleted); |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
private void createTopicsIfNeeded(Queue queue, Queue oldQueue) { |
|
|
|
int newPartitions = queue.getPartitions(); |
|
|
|
int oldPartitions = oldQueue != null ? oldQueue.getPartitions() : 0; |
|
|
|
for (int i = oldPartitions; i < newPartitions; i++) { |
|
|
|
tbQueueAdmin.createTopicIfNotExists( |
|
|
|
new TopicPartitionInfo(queue.getTopic(), queue.getTenantId(), i, false).getFullTopicName(), |
|
|
|
queue.getCustomProperties() |
|
|
|
); |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
} |
|
|
|
|