67 changed files with 2519 additions and 921 deletions
@ -0,0 +1,65 @@ |
|||
/** |
|||
* Copyright © 2016-2023 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
package org.thingsboard.server.service.queue; |
|||
|
|||
import org.thingsboard.server.common.data.EntityType; |
|||
import org.thingsboard.server.common.data.id.EntityIdFactory; |
|||
import org.thingsboard.server.common.data.id.TenantId; |
|||
import org.thingsboard.server.common.data.plugin.ComponentLifecycleEvent; |
|||
import org.thingsboard.server.common.msg.plugin.ComponentLifecycleMsg; |
|||
import org.thingsboard.server.gen.transport.TransportProtos; |
|||
|
|||
import java.util.Arrays; |
|||
import java.util.UUID; |
|||
|
|||
public class ProtoUtils { |
|||
|
|||
private static final EntityType[] entityTypeByProtoNumber; |
|||
|
|||
static { |
|||
int arraySize = Arrays.stream(EntityType.values()).mapToInt(EntityType::getProtoNumber).max().orElse(0); |
|||
entityTypeByProtoNumber = new EntityType[arraySize + 1]; |
|||
Arrays.stream(EntityType.values()).forEach(entityType -> entityTypeByProtoNumber[entityType.getProtoNumber()] = entityType); |
|||
} |
|||
|
|||
public static TransportProtos.ComponentLifecycleMsgProto toProto(ComponentLifecycleMsg msg) { |
|||
return TransportProtos.ComponentLifecycleMsgProto.newBuilder() |
|||
.setTenantIdMSB(msg.getTenantId().getId().getMostSignificantBits()) |
|||
.setTenantIdLSB(msg.getTenantId().getId().getLeastSignificantBits()) |
|||
.setEntityType(toProto(msg.getEntityId().getEntityType())) |
|||
.setEntityIdMSB(msg.getEntityId().getId().getMostSignificantBits()) |
|||
.setEntityIdLSB(msg.getEntityId().getId().getLeastSignificantBits()) |
|||
.setEvent(TransportProtos.ComponentLifecycleEvent.forNumber(msg.getEvent().ordinal())) |
|||
.build(); |
|||
} |
|||
|
|||
public static TransportProtos.EntityTypeProto toProto(EntityType entityType) { |
|||
return TransportProtos.EntityTypeProto.forNumber(entityType.getProtoNumber()); |
|||
} |
|||
|
|||
public static ComponentLifecycleMsg fromProto(TransportProtos.ComponentLifecycleMsgProto proto) { |
|||
return new ComponentLifecycleMsg( |
|||
TenantId.fromUUID(new UUID(proto.getTenantIdMSB(), proto.getTenantIdLSB())), |
|||
EntityIdFactory.getByTypeAndUuid(fromProto(proto.getEntityType()), new UUID(proto.getEntityIdMSB(), proto.getEntityIdLSB())), |
|||
ComponentLifecycleEvent.values()[proto.getEventValue()] |
|||
); |
|||
} |
|||
|
|||
public static EntityType fromProto(TransportProtos.EntityTypeProto entityType) { |
|||
return entityTypeByProtoNumber[entityType.getNumber()]; |
|||
} |
|||
|
|||
} |
|||
@ -0,0 +1,24 @@ |
|||
/** |
|||
* Copyright © 2016-2023 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
package org.thingsboard.server.service.queue.ruleengine; |
|||
|
|||
import java.io.Serializable; |
|||
|
|||
public enum QueueEvent implements Serializable { |
|||
|
|||
PARTITION_CHANGE, CONFIG_UPDATE, DELETE |
|||
|
|||
} |
|||
@ -0,0 +1,47 @@ |
|||
/** |
|||
* Copyright © 2016-2023 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
package org.thingsboard.server.service.queue.ruleengine; |
|||
|
|||
import lombok.Getter; |
|||
import lombok.ToString; |
|||
import org.thingsboard.server.common.data.queue.Queue; |
|||
import org.thingsboard.server.common.msg.queue.TopicPartitionInfo; |
|||
|
|||
import java.util.Set; |
|||
|
|||
@Getter |
|||
@ToString |
|||
public class TbQueueConsumerManagerTask { |
|||
|
|||
private final QueueEvent event; |
|||
private Queue queue; |
|||
private Set<TopicPartitionInfo> partitions; |
|||
|
|||
public TbQueueConsumerManagerTask(QueueEvent event) { |
|||
this.event = event; |
|||
} |
|||
|
|||
public TbQueueConsumerManagerTask(QueueEvent event, Queue queue) { |
|||
this.event = event; |
|||
this.queue = queue; |
|||
} |
|||
|
|||
public TbQueueConsumerManagerTask(QueueEvent event, Set<TopicPartitionInfo> partitions) { |
|||
this.event = event; |
|||
this.partitions = partitions; |
|||
} |
|||
|
|||
} |
|||
@ -0,0 +1,70 @@ |
|||
/** |
|||
* Copyright © 2016-2023 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
package org.thingsboard.server.service.queue.ruleengine; |
|||
|
|||
import lombok.Getter; |
|||
import lombok.RequiredArgsConstructor; |
|||
import lombok.Setter; |
|||
import lombok.extern.slf4j.Slf4j; |
|||
import org.thingsboard.server.common.msg.queue.TopicPartitionInfo; |
|||
import org.thingsboard.server.gen.transport.TransportProtos; |
|||
import org.thingsboard.server.queue.TbQueueConsumer; |
|||
import org.thingsboard.server.queue.common.TbProtoQueueMsg; |
|||
|
|||
import java.util.Set; |
|||
import java.util.concurrent.Future; |
|||
import java.util.concurrent.TimeUnit; |
|||
|
|||
@RequiredArgsConstructor |
|||
@Slf4j |
|||
public class TbQueueConsumerTask { |
|||
|
|||
@Getter |
|||
private final Object key; |
|||
@Getter |
|||
private final TbQueueConsumer<TbProtoQueueMsg<TransportProtos.ToRuleEngineMsg>> consumer; |
|||
|
|||
@Setter |
|||
private Future<?> task; |
|||
|
|||
public void subscribe(Set<TopicPartitionInfo> partitions) { |
|||
log.trace("[{}] Subscribing to partitions: {}", key, partitions); |
|||
consumer.subscribe(partitions); |
|||
} |
|||
|
|||
public void initiateStop() { |
|||
log.debug("[{}] Initiating stop", key); |
|||
consumer.stop(); |
|||
} |
|||
|
|||
public void awaitCompletion() { |
|||
log.trace("[{}] Awaiting finish", key); |
|||
if (isRunning()) { |
|||
try { |
|||
task.get(30, TimeUnit.SECONDS); |
|||
log.trace("[{}] Awaited finish", key); |
|||
} catch (Exception e) { |
|||
log.warn("[{}] Failed to await for consumer to stop", key, e); |
|||
} |
|||
task = null; |
|||
} |
|||
} |
|||
|
|||
public boolean isRunning() { |
|||
return task != null; |
|||
} |
|||
|
|||
} |
|||
@ -0,0 +1,89 @@ |
|||
/** |
|||
* Copyright © 2016-2023 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
package org.thingsboard.server.service.queue.ruleengine; |
|||
|
|||
import lombok.Data; |
|||
import lombok.extern.slf4j.Slf4j; |
|||
import org.springframework.beans.factory.annotation.Value; |
|||
import org.springframework.stereotype.Component; |
|||
import org.thingsboard.common.util.ThingsBoardExecutors; |
|||
import org.thingsboard.common.util.ThingsBoardThreadFactory; |
|||
import org.thingsboard.server.actors.ActorSystemContext; |
|||
import org.thingsboard.server.common.stats.StatsFactory; |
|||
import org.thingsboard.server.queue.TbQueueAdmin; |
|||
import org.thingsboard.server.queue.discovery.PartitionService; |
|||
import org.thingsboard.server.queue.discovery.TbServiceInfoProvider; |
|||
import org.thingsboard.server.queue.provider.TbQueueProducerProvider; |
|||
import org.thingsboard.server.queue.provider.TbRuleEngineQueueFactory; |
|||
import org.thingsboard.server.queue.util.TbRuleEngineComponent; |
|||
import org.thingsboard.server.service.queue.processing.TbRuleEngineProcessingStrategyFactory; |
|||
import org.thingsboard.server.service.queue.processing.TbRuleEngineSubmitStrategyFactory; |
|||
import org.thingsboard.server.service.stats.RuleEngineStatisticsService; |
|||
|
|||
import javax.annotation.PostConstruct; |
|||
import java.util.concurrent.ExecutorService; |
|||
import java.util.concurrent.Executors; |
|||
import java.util.concurrent.ScheduledExecutorService; |
|||
|
|||
@Component |
|||
@TbRuleEngineComponent |
|||
@Slf4j |
|||
@Data |
|||
public class TbRuleEngineConsumerContext { |
|||
|
|||
@Value("${queue.rule-engine.poll-interval}") |
|||
private long pollDuration; |
|||
@Value("${queue.rule-engine.pack-processing-timeout}") |
|||
private long packProcessingTimeout; |
|||
@Value("${queue.rule-engine.stats.enabled:true}") |
|||
private boolean statsEnabled; |
|||
@Value("${queue.rule-engine.prometheus-stats.enabled:false}") |
|||
private boolean prometheusStatsEnabled; |
|||
@Value("${queue.rule-engine.topic-deletion-delay:15}") |
|||
private int topicDeletionDelayInSec; |
|||
@Value("${queue.rule-engine.management-thread-pool-size:12}") |
|||
private int mgmtThreadPoolSize; |
|||
|
|||
private final ActorSystemContext actorContext; |
|||
private final StatsFactory statsFactory; |
|||
private final TbRuleEngineSubmitStrategyFactory submitStrategyFactory; |
|||
private final TbRuleEngineProcessingStrategyFactory processingStrategyFactory; |
|||
private final TbRuleEngineQueueFactory queueFactory; |
|||
private final RuleEngineStatisticsService statisticsService; |
|||
private final TbServiceInfoProvider serviceInfoProvider; |
|||
private final PartitionService partitionService; |
|||
private final TbQueueProducerProvider producerProvider; |
|||
private final TbQueueAdmin queueAdmin; |
|||
|
|||
private ExecutorService consumersExecutor; |
|||
private ExecutorService mgmtExecutor; |
|||
private ScheduledExecutorService scheduler; |
|||
|
|||
private volatile boolean isReady = false; |
|||
|
|||
@PostConstruct |
|||
void init() { |
|||
this.consumersExecutor = Executors.newCachedThreadPool(ThingsBoardThreadFactory.forName("tb-rule-engine-consumer")); |
|||
this.mgmtExecutor = ThingsBoardExecutors.newWorkStealingPool(mgmtThreadPoolSize, "tb-rule-engine-mgmt"); |
|||
this.scheduler = Executors.newSingleThreadScheduledExecutor(ThingsBoardThreadFactory.forName("tb-rule-engine-consumer-scheduler")); |
|||
} |
|||
|
|||
public void stop() { |
|||
scheduler.shutdownNow(); |
|||
consumersExecutor.shutdownNow(); |
|||
mgmtExecutor.shutdownNow(); |
|||
} |
|||
} |
|||
@ -0,0 +1,486 @@ |
|||
/** |
|||
* Copyright © 2016-2023 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
package org.thingsboard.server.service.queue.ruleengine; |
|||
|
|||
import com.google.protobuf.ProtocolStringList; |
|||
import lombok.Getter; |
|||
import lombok.extern.slf4j.Slf4j; |
|||
import org.thingsboard.common.util.ThingsBoardThreadFactory; |
|||
import org.thingsboard.server.common.data.id.EntityId; |
|||
import org.thingsboard.server.common.data.id.EntityIdFactory; |
|||
import org.thingsboard.server.common.data.id.TenantId; |
|||
import org.thingsboard.server.common.data.queue.Queue; |
|||
import org.thingsboard.server.common.msg.TbMsg; |
|||
import org.thingsboard.server.common.msg.gen.MsgProtos; |
|||
import org.thingsboard.server.common.msg.queue.QueueToRuleEngineMsg; |
|||
import org.thingsboard.server.common.msg.queue.RuleEngineException; |
|||
import org.thingsboard.server.common.msg.queue.RuleNodeInfo; |
|||
import org.thingsboard.server.common.msg.queue.ServiceType; |
|||
import org.thingsboard.server.common.msg.queue.TbMsgCallback; |
|||
import org.thingsboard.server.common.msg.queue.TopicPartitionInfo; |
|||
import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineMsg; |
|||
import org.thingsboard.server.queue.TbQueueConsumer; |
|||
import org.thingsboard.server.queue.common.TbProtoQueueMsg; |
|||
import org.thingsboard.server.queue.discovery.QueueKey; |
|||
import org.thingsboard.server.service.queue.TbMsgPackCallback; |
|||
import org.thingsboard.server.service.queue.TbMsgPackProcessingContext; |
|||
import org.thingsboard.server.service.queue.TbRuleEngineConsumerStats; |
|||
import org.thingsboard.server.service.queue.processing.TbRuleEngineProcessingDecision; |
|||
import org.thingsboard.server.service.queue.processing.TbRuleEngineProcessingResult; |
|||
import org.thingsboard.server.service.queue.processing.TbRuleEngineProcessingStrategy; |
|||
import org.thingsboard.server.service.queue.processing.TbRuleEngineSubmitStrategy; |
|||
|
|||
import java.util.Collection; |
|||
import java.util.Collections; |
|||
import java.util.HashMap; |
|||
import java.util.HashSet; |
|||
import java.util.List; |
|||
import java.util.Map; |
|||
import java.util.Set; |
|||
import java.util.UUID; |
|||
import java.util.concurrent.ConcurrentLinkedQueue; |
|||
import java.util.concurrent.Future; |
|||
import java.util.concurrent.TimeUnit; |
|||
import java.util.concurrent.locks.ReentrantLock; |
|||
import java.util.stream.Collectors; |
|||
|
|||
@Slf4j |
|||
public class TbRuleEngineQueueConsumerManager { |
|||
|
|||
public static final String SUCCESSFUL_STATUS = "successful"; |
|||
public static final String FAILED_STATUS = "failed"; |
|||
|
|||
private final TbRuleEngineConsumerContext ctx; |
|||
private final QueueKey queueKey; |
|||
private final TbRuleEngineConsumerStats stats; |
|||
private final ReentrantLock lock = new ReentrantLock(); //NonfairSync
|
|||
|
|||
@Getter |
|||
private volatile Queue queue; |
|||
@Getter |
|||
private volatile Set<TopicPartitionInfo> partitions; |
|||
private volatile ConsumerWrapper consumerWrapper; |
|||
|
|||
private volatile boolean stopped; |
|||
|
|||
private final java.util.Queue<TbQueueConsumerManagerTask> tasks = new ConcurrentLinkedQueue<>(); |
|||
|
|||
public TbRuleEngineQueueConsumerManager(TbRuleEngineConsumerContext ctx, QueueKey queueKey) { |
|||
this.ctx = ctx; |
|||
this.queueKey = queueKey; |
|||
this.stats = new TbRuleEngineConsumerStats(queueKey, ctx.getStatsFactory()); |
|||
} |
|||
|
|||
public void init(Queue queue) { |
|||
this.queue = queue; |
|||
if (queue.isConsumerPerPartition()) { |
|||
this.consumerWrapper = new ConsumerPerPartitionWrapper(); |
|||
} else { |
|||
this.consumerWrapper = new SingleConsumerWrapper(); |
|||
} |
|||
log.debug("[{}] Initialized consumer for queue: {}", queueKey, queue); |
|||
} |
|||
|
|||
public void update(Queue queue) { |
|||
addTask(new TbQueueConsumerManagerTask(QueueEvent.CONFIG_UPDATE, queue)); |
|||
} |
|||
|
|||
public void update(Set<TopicPartitionInfo> partitions) { |
|||
addTask(new TbQueueConsumerManagerTask(QueueEvent.PARTITION_CHANGE, partitions)); |
|||
} |
|||
|
|||
public void delete() { |
|||
addTask(new TbQueueConsumerManagerTask(QueueEvent.DELETE)); |
|||
} |
|||
|
|||
private void addTask(TbQueueConsumerManagerTask todo) { |
|||
if (stopped) { |
|||
return; |
|||
} |
|||
tasks.add(todo); |
|||
log.trace("[{}] Added task: {}", queueKey, todo); |
|||
tryProcessTasks(); |
|||
} |
|||
|
|||
private void tryProcessTasks() { |
|||
if (!ctx.isReady()) { |
|||
log.debug("[{}] TbRuleEngineConsumerContext is not ready yet, will process tasks later", queueKey); |
|||
ctx.getScheduler().schedule(this::tryProcessTasks, 1, TimeUnit.SECONDS); |
|||
return; |
|||
} |
|||
ctx.getMgmtExecutor().submit(() -> { |
|||
if (lock.tryLock()) { |
|||
try { |
|||
Queue newConfiguration = null; |
|||
Set<TopicPartitionInfo> newPartitions = null; |
|||
while (!stopped) { |
|||
TbQueueConsumerManagerTask task = tasks.poll(); |
|||
if (task == null) { |
|||
break; |
|||
} |
|||
log.trace("[{}] Processing task: {}", queueKey, task); |
|||
|
|||
if (task.getEvent() == QueueEvent.PARTITION_CHANGE) { |
|||
newPartitions = task.getPartitions(); |
|||
} else if (task.getEvent() == QueueEvent.CONFIG_UPDATE) { |
|||
newConfiguration = task.getQueue(); |
|||
} else if (task.getEvent() == QueueEvent.DELETE) { |
|||
doDelete(); |
|||
return; |
|||
} |
|||
} |
|||
if (stopped) { |
|||
return; |
|||
} |
|||
if (newConfiguration != null) { |
|||
doUpdate(newConfiguration); |
|||
} |
|||
if (newPartitions != null) { |
|||
doUpdate(newPartitions); |
|||
} |
|||
} catch (Exception e) { |
|||
log.error("[{}] Failed to process tasks", queueKey, e); |
|||
} finally { |
|||
lock.unlock(); |
|||
} |
|||
} else { |
|||
log.trace("[{}] Failed to acquire lock", queueKey); |
|||
ctx.getScheduler().schedule(this::tryProcessTasks, 1, TimeUnit.SECONDS); |
|||
} |
|||
}); |
|||
} |
|||
|
|||
private void doUpdate(Queue newQueue) { |
|||
log.info("[{}] Processing queue update: {}", queueKey, newQueue); |
|||
var oldQueue = this.queue; |
|||
this.queue = newQueue; |
|||
if (log.isTraceEnabled()) { |
|||
log.trace("[{}] Old queue configuration: {}", queueKey, oldQueue); |
|||
log.trace("[{}] New queue configuration: {}", queueKey, newQueue); |
|||
} |
|||
|
|||
if (oldQueue == null) { |
|||
init(queue); |
|||
} else if (newQueue.isConsumerPerPartition() != oldQueue.isConsumerPerPartition()) { |
|||
consumerWrapper.getConsumers().forEach(TbQueueConsumerTask::initiateStop); |
|||
consumerWrapper.getConsumers().forEach(TbQueueConsumerTask::awaitCompletion); |
|||
|
|||
init(queue); |
|||
if (partitions != null) { |
|||
doUpdate(partitions); // even if partitions number was changed, there can be no partition change event
|
|||
} |
|||
} else { |
|||
// do nothing, because partitions change (if they changed) will be handled on PartitionChangeEvent,
|
|||
// and changes to pollInterval/packProcessingTimeout/submitStrategy/processingStrategy will be picked up by consumer on the fly,
|
|||
// and queue topic and name are immutable
|
|||
} |
|||
} |
|||
|
|||
private void doUpdate(Set<TopicPartitionInfo> partitions) { |
|||
this.partitions = partitions; |
|||
consumerWrapper.updatePartitions(partitions); |
|||
} |
|||
|
|||
public void stop() { |
|||
log.debug("[{}] Stopping consumers", queueKey); |
|||
consumerWrapper.getConsumers().forEach(TbQueueConsumerTask::initiateStop); |
|||
stopped = true; |
|||
} |
|||
|
|||
public void awaitStop() { |
|||
consumerWrapper.getConsumers().forEach(TbQueueConsumerTask::awaitCompletion); |
|||
log.debug("[{}] Unsubscribed and stopped consumers", queueKey); |
|||
} |
|||
|
|||
private void doDelete() { |
|||
stopped = true; |
|||
log.info("[{}] Handling queue deletion", queueKey); |
|||
consumerWrapper.getConsumers().forEach(TbQueueConsumerTask::awaitCompletion); |
|||
|
|||
List<TbQueueConsumer<TbProtoQueueMsg<ToRuleEngineMsg>>> queueConsumers = consumerWrapper.getConsumers().stream() |
|||
.map(TbQueueConsumerTask::getConsumer).collect(Collectors.toList()); |
|||
ctx.getConsumersExecutor().submit(() -> { |
|||
drainQueue(queueConsumers); |
|||
|
|||
queueConsumers.forEach(consumer -> { |
|||
for (String topic : consumer.getFullTopicNames()) { |
|||
try { |
|||
ctx.getQueueAdmin().deleteTopic(topic); |
|||
log.info("Deleted topic {}", topic); |
|||
} catch (Exception e) { |
|||
log.error("Failed to delete topic {}", topic, e); |
|||
} |
|||
} |
|||
try { |
|||
consumer.unsubscribe(); |
|||
} catch (Exception e) { |
|||
log.error("[{}] Failed to unsubscribe consumer", queueKey, e); |
|||
} |
|||
}); |
|||
}); |
|||
} |
|||
|
|||
private void launchConsumer(TbQueueConsumerTask consumerTask) { |
|||
log.info("[{}] Launching consumer", consumerTask.getKey()); |
|||
Future<?> consumerLoop = ctx.getConsumersExecutor().submit(() -> { |
|||
ThingsBoardThreadFactory.updateCurrentThreadName(consumerTask.getKey().toString()); |
|||
try { |
|||
consumerLoop(consumerTask.getConsumer()); |
|||
} catch (Throwable e) { |
|||
log.error("Failure in consumer loop", e); |
|||
} |
|||
}); |
|||
consumerTask.setTask(consumerLoop); |
|||
} |
|||
|
|||
private void consumerLoop(TbQueueConsumer<TbProtoQueueMsg<ToRuleEngineMsg>> consumer) { |
|||
while (!stopped && !consumer.isStopped()) { |
|||
try { |
|||
List<TbProtoQueueMsg<ToRuleEngineMsg>> msgs = consumer.poll(queue.getPollInterval()); |
|||
if (msgs.isEmpty()) { |
|||
continue; |
|||
} |
|||
processMsgs(msgs, consumer, queue); |
|||
} catch (Exception e) { |
|||
if (!consumer.isStopped()) { |
|||
log.warn("Failed to process messages from queue", e); |
|||
try { |
|||
Thread.sleep(ctx.getPollDuration()); |
|||
} catch (InterruptedException e2) { |
|||
log.trace("Failed to wait until the server has capacity to handle new requests", e2); |
|||
} |
|||
} |
|||
} |
|||
} |
|||
if (consumer.isStopped()) { |
|||
consumer.unsubscribe(); |
|||
} |
|||
log.info("Rule Engine consumer stopped"); |
|||
} |
|||
|
|||
private void processMsgs(List<TbProtoQueueMsg<ToRuleEngineMsg>> msgs, |
|||
TbQueueConsumer<TbProtoQueueMsg<ToRuleEngineMsg>> consumer, |
|||
Queue queue) throws InterruptedException { |
|||
TbRuleEngineSubmitStrategy submitStrategy = getSubmitStrategy(queue); |
|||
TbRuleEngineProcessingStrategy ackStrategy = getProcessingStrategy(queue); |
|||
submitStrategy.init(msgs); |
|||
while (!stopped && !consumer.isStopped()) { |
|||
TbMsgPackProcessingContext packCtx = new TbMsgPackProcessingContext(queue.getName(), submitStrategy, ackStrategy.isSkipTimeoutMsgs()); |
|||
submitStrategy.submitAttempt((id, msg) -> submitMessage(packCtx, id, msg)); |
|||
|
|||
final boolean timeout = !packCtx.await(queue.getPackProcessingTimeout(), TimeUnit.MILLISECONDS); |
|||
|
|||
TbRuleEngineProcessingResult result = new TbRuleEngineProcessingResult(queue.getName(), timeout, packCtx); |
|||
if (timeout) { |
|||
printFirstOrAll(packCtx, packCtx.getPendingMap(), "Timeout"); |
|||
} |
|||
if (!packCtx.getFailedMap().isEmpty()) { |
|||
printFirstOrAll(packCtx, packCtx.getFailedMap(), "Failed"); |
|||
} |
|||
packCtx.printProfilerStats(); |
|||
|
|||
TbRuleEngineProcessingDecision decision = ackStrategy.analyze(result); |
|||
if (ctx.isStatsEnabled()) { |
|||
stats.log(result, decision.isCommit()); |
|||
} |
|||
|
|||
packCtx.cleanup(); |
|||
|
|||
if (decision.isCommit()) { |
|||
submitStrategy.stop(); |
|||
consumer.commit(); |
|||
break; |
|||
} else { |
|||
submitStrategy.update(decision.getReprocessMap()); |
|||
} |
|||
} |
|||
} |
|||
|
|||
private TbRuleEngineSubmitStrategy getSubmitStrategy(Queue queue) { |
|||
return ctx.getSubmitStrategyFactory().newInstance(queue.getName(), queue.getSubmitStrategy()); |
|||
} |
|||
|
|||
private TbRuleEngineProcessingStrategy getProcessingStrategy(Queue queue) { |
|||
return ctx.getProcessingStrategyFactory().newInstance(queue.getName(), queue.getProcessingStrategy()); |
|||
} |
|||
|
|||
private void submitMessage(TbMsgPackProcessingContext packCtx, UUID id, TbProtoQueueMsg<ToRuleEngineMsg> msg) { |
|||
log.trace("[{}] Creating callback for topic {} message: {}", id, queue.getName(), msg.getValue()); |
|||
ToRuleEngineMsg toRuleEngineMsg = msg.getValue(); |
|||
TenantId tenantId = TenantId.fromUUID(new UUID(toRuleEngineMsg.getTenantIdMSB(), toRuleEngineMsg.getTenantIdLSB())); |
|||
TbMsgCallback callback = ctx.isPrometheusStatsEnabled() ? |
|||
new TbMsgPackCallback(id, tenantId, packCtx, stats.getTimer(tenantId, SUCCESSFUL_STATUS), stats.getTimer(tenantId, FAILED_STATUS)) : |
|||
new TbMsgPackCallback(id, tenantId, packCtx); |
|||
try { |
|||
if (!toRuleEngineMsg.getTbMsg().isEmpty()) { |
|||
forwardToRuleEngineActor(queue.getName(), tenantId, toRuleEngineMsg, callback); |
|||
} else { |
|||
callback.onSuccess(); |
|||
} |
|||
} catch (Exception e) { |
|||
callback.onFailure(new RuleEngineException(e.getMessage(), e)); |
|||
} |
|||
} |
|||
|
|||
private void forwardToRuleEngineActor(String queueName, TenantId tenantId, ToRuleEngineMsg toRuleEngineMsg, TbMsgCallback callback) { |
|||
TbMsg tbMsg = TbMsg.fromBytes(queueName, toRuleEngineMsg.getTbMsg().toByteArray(), callback); |
|||
QueueToRuleEngineMsg msg; |
|||
ProtocolStringList relationTypesList = toRuleEngineMsg.getRelationTypesList(); |
|||
Set<String> relationTypes; |
|||
if (relationTypesList.size() == 1) { |
|||
relationTypes = Collections.singleton(relationTypesList.get(0)); |
|||
} else { |
|||
relationTypes = new HashSet<>(relationTypesList); |
|||
} |
|||
msg = new QueueToRuleEngineMsg(tenantId, tbMsg, relationTypes, toRuleEngineMsg.getFailureMessage()); |
|||
ctx.getActorContext().tell(msg); |
|||
} |
|||
|
|||
private void printFirstOrAll(TbMsgPackProcessingContext ctx, Map<UUID, TbProtoQueueMsg<ToRuleEngineMsg>> map, String prefix) { |
|||
boolean printAll = log.isTraceEnabled(); |
|||
log.info("[{}] {} to process [{}] messages", queueKey, prefix, map.size()); |
|||
for (Map.Entry<UUID, TbProtoQueueMsg<ToRuleEngineMsg>> pending : map.entrySet()) { |
|||
ToRuleEngineMsg tmp = pending.getValue().getValue(); |
|||
TbMsg tmpMsg = TbMsg.fromBytes(queue.getName(), tmp.getTbMsg().toByteArray(), TbMsgCallback.EMPTY); |
|||
RuleNodeInfo ruleNodeInfo = ctx.getLastVisitedRuleNode(pending.getKey()); |
|||
if (printAll) { |
|||
log.trace("[{}][{}] {} to process message: {}, Last Rule Node: {}", queueKey, TenantId.fromUUID(new UUID(tmp.getTenantIdMSB(), tmp.getTenantIdLSB())), prefix, tmpMsg, ruleNodeInfo); |
|||
} else { |
|||
log.info("[{}] {} to process message: {}, Last Rule Node: {}", TenantId.fromUUID(new UUID(tmp.getTenantIdMSB(), tmp.getTenantIdLSB())), prefix, tmpMsg, ruleNodeInfo); |
|||
break; |
|||
} |
|||
} |
|||
} |
|||
|
|||
public void printStats(long ts) { |
|||
stats.printStats(); |
|||
ctx.getStatisticsService().reportQueueStats(ts, stats); |
|||
stats.reset(); |
|||
} |
|||
|
|||
private void drainQueue(List<TbQueueConsumer<TbProtoQueueMsg<ToRuleEngineMsg>>> consumers) { |
|||
long finishTs = System.currentTimeMillis() + TimeUnit.SECONDS.toMillis(ctx.getTopicDeletionDelayInSec()); |
|||
try { |
|||
int n = 0; |
|||
while (System.currentTimeMillis() <= finishTs) { |
|||
for (TbQueueConsumer<TbProtoQueueMsg<ToRuleEngineMsg>> consumer : consumers) { |
|||
List<TbProtoQueueMsg<ToRuleEngineMsg>> msgs = consumer.poll(queue.getPollInterval()); |
|||
if (msgs.isEmpty()) { |
|||
continue; |
|||
} |
|||
for (TbProtoQueueMsg<ToRuleEngineMsg> msg : msgs) { |
|||
try { |
|||
MsgProtos.TbMsgProto tbMsgProto = MsgProtos.TbMsgProto.parseFrom(msg.getValue().getTbMsg().toByteArray()); |
|||
EntityId originator = EntityIdFactory.getByTypeAndUuid(tbMsgProto.getEntityType(), new UUID(tbMsgProto.getEntityIdMSB(), tbMsgProto.getEntityIdLSB())); |
|||
|
|||
TopicPartitionInfo tpi = ctx.getPartitionService().resolve(ServiceType.TB_RULE_ENGINE, queue.getName(), TenantId.SYS_TENANT_ID, originator); |
|||
ctx.getProducerProvider().getRuleEngineMsgProducer().send(tpi, msg, null); |
|||
n++; |
|||
} catch (Throwable e) { |
|||
log.warn("Failed to move message to system {}: {}", consumer.getTopic(), msg, e); |
|||
} |
|||
} |
|||
consumer.commit(); |
|||
} |
|||
} |
|||
if (n > 0) { |
|||
log.info("Moved {} messages from {} to system {}", n, queueKey, queue.getName()); |
|||
} |
|||
} catch (Exception e) { |
|||
log.error("[{}] Failed to drain queue", queueKey, e); |
|||
} |
|||
} |
|||
|
|||
private static String partitionsToString(Collection<TopicPartitionInfo> partitions) { |
|||
return partitions.stream().map(TopicPartitionInfo::getFullTopicName).collect(Collectors.joining(", ", "[", "]")); |
|||
} |
|||
|
|||
interface ConsumerWrapper { |
|||
|
|||
void updatePartitions(Set<TopicPartitionInfo> partitions); |
|||
|
|||
Collection<TbQueueConsumerTask> getConsumers(); |
|||
|
|||
} |
|||
|
|||
class ConsumerPerPartitionWrapper implements ConsumerWrapper { |
|||
private final Map<TopicPartitionInfo, TbQueueConsumerTask> consumers = new HashMap<>(); |
|||
|
|||
@Override |
|||
public void updatePartitions(Set<TopicPartitionInfo> partitions) { |
|||
Set<TopicPartitionInfo> addedPartitions = new HashSet<>(partitions); |
|||
addedPartitions.removeAll(consumers.keySet()); |
|||
|
|||
Set<TopicPartitionInfo> removedPartitions = new HashSet<>(consumers.keySet()); |
|||
removedPartitions.removeAll(partitions); |
|||
log.info("[{}] Added partitions: {}, removed partitions: {}", queueKey, partitionsToString(addedPartitions), partitionsToString(removedPartitions)); |
|||
|
|||
removedPartitions.forEach((tpi) -> { |
|||
consumers.get(tpi).initiateStop(); |
|||
}); |
|||
removedPartitions.forEach((tpi) -> { |
|||
consumers.remove(tpi).awaitCompletion(); |
|||
}); |
|||
|
|||
addedPartitions.forEach((tpi) -> { |
|||
String key = queueKey + "-" + tpi.getPartition().orElse(-999999); |
|||
TbQueueConsumerTask consumer = new TbQueueConsumerTask(key, ctx.getQueueFactory().createToRuleEngineMsgConsumer(queue)); |
|||
consumers.put(tpi, consumer); |
|||
consumer.subscribe(Set.of(tpi)); |
|||
launchConsumer(consumer); |
|||
}); |
|||
} |
|||
|
|||
@Override |
|||
public Collection<TbQueueConsumerTask> getConsumers() { |
|||
return consumers.values(); |
|||
} |
|||
} |
|||
|
|||
class SingleConsumerWrapper implements ConsumerWrapper { |
|||
private TbQueueConsumerTask consumer; |
|||
|
|||
@Override |
|||
public void updatePartitions(Set<TopicPartitionInfo> partitions) { |
|||
log.info("[{}] New partitions: {}", queueKey, partitionsToString(partitions)); |
|||
if (partitions.isEmpty()) { |
|||
if (consumer != null && consumer.isRunning()) { |
|||
consumer.initiateStop(); |
|||
consumer.awaitCompletion(); |
|||
} |
|||
consumer = null; |
|||
return; |
|||
} |
|||
|
|||
if (consumer == null) { |
|||
consumer = new TbQueueConsumerTask(queueKey, ctx.getQueueFactory().createToRuleEngineMsgConsumer(queue)); |
|||
} |
|||
consumer.subscribe(partitions); |
|||
if (!consumer.isRunning()) { |
|||
launchConsumer(consumer); |
|||
} |
|||
} |
|||
|
|||
@Override |
|||
public Collection<TbQueueConsumerTask> getConsumers() { |
|||
if (consumer == null) { |
|||
return Collections.emptyList(); |
|||
} |
|||
return List.of(consumer); |
|||
} |
|||
} |
|||
|
|||
} |
|||
@ -0,0 +1,49 @@ |
|||
/** |
|||
* Copyright © 2016-2023 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
package org.thingsboard.server.service.queue; |
|||
|
|||
import org.junit.jupiter.api.Test; |
|||
import org.thingsboard.server.common.data.EntityType; |
|||
import org.thingsboard.server.common.data.id.EntityId; |
|||
import org.thingsboard.server.common.data.id.RuleChainId; |
|||
import org.thingsboard.server.common.data.id.TenantId; |
|||
import org.thingsboard.server.common.data.plugin.ComponentLifecycleEvent; |
|||
import org.thingsboard.server.common.msg.plugin.ComponentLifecycleMsg; |
|||
import org.thingsboard.server.gen.transport.TransportProtos; |
|||
import java.util.UUID; |
|||
|
|||
import static org.assertj.core.api.Assertions.assertThat; |
|||
|
|||
class ProtoUtilsTest { |
|||
|
|||
TenantId tenantId = TenantId.fromUUID(UUID.fromString("35e10f77-16e7-424d-ae46-ee780f87ac4f")); |
|||
EntityId entityId = new RuleChainId(UUID.fromString("c640b635-4f0f-41e6-b10b-25a86003094e")); |
|||
@Test |
|||
void protoComponentLifecycleSerialization() { |
|||
ComponentLifecycleMsg msg = new ComponentLifecycleMsg(tenantId, entityId, ComponentLifecycleEvent.UPDATED); |
|||
assertThat(ProtoUtils.fromProto(ProtoUtils.toProto(msg))).as("deserialized").isEqualTo(msg); |
|||
msg = new ComponentLifecycleMsg(tenantId, entityId, ComponentLifecycleEvent.STARTED); |
|||
assertThat(ProtoUtils.fromProto(ProtoUtils.toProto(msg))).as("deserialized").isEqualTo(msg); |
|||
} |
|||
|
|||
@Test |
|||
void protoEntityTypeSerialization() { |
|||
for(EntityType entityType : EntityType.values()){ |
|||
assertThat(ProtoUtils.fromProto(ProtoUtils.toProto(entityType))).as(entityType.getNormalName()).isEqualTo(entityType); |
|||
} |
|||
} |
|||
|
|||
} |
|||
@ -0,0 +1,773 @@ |
|||
/** |
|||
* Copyright © 2016-2023 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
package org.thingsboard.server.service.queue.ruleengine; |
|||
|
|||
import lombok.Getter; |
|||
import lombok.SneakyThrows; |
|||
import lombok.extern.slf4j.Slf4j; |
|||
import org.junit.After; |
|||
import org.junit.Before; |
|||
import org.junit.Test; |
|||
import org.junit.runner.RunWith; |
|||
import org.mockito.Mock; |
|||
import org.mockito.Mockito; |
|||
import org.mockito.junit.MockitoJUnitRunner; |
|||
import org.testcontainers.shaded.org.apache.commons.lang3.RandomUtils; |
|||
import org.thingsboard.common.util.JacksonUtil; |
|||
import org.thingsboard.server.actors.ActorSystemContext; |
|||
import org.thingsboard.server.common.data.id.DeviceId; |
|||
import org.thingsboard.server.common.data.id.QueueId; |
|||
import org.thingsboard.server.common.data.id.TenantId; |
|||
import org.thingsboard.server.common.data.msg.TbMsgType; |
|||
import org.thingsboard.server.common.data.queue.ProcessingStrategy; |
|||
import org.thingsboard.server.common.data.queue.ProcessingStrategyType; |
|||
import org.thingsboard.server.common.data.queue.Queue; |
|||
import org.thingsboard.server.common.data.queue.SubmitStrategy; |
|||
import org.thingsboard.server.common.data.queue.SubmitStrategyType; |
|||
import org.thingsboard.server.common.msg.TbMsg; |
|||
import org.thingsboard.server.common.msg.TbMsgMetaData; |
|||
import org.thingsboard.server.common.msg.queue.QueueToRuleEngineMsg; |
|||
import org.thingsboard.server.common.msg.queue.ServiceType; |
|||
import org.thingsboard.server.common.msg.queue.TopicPartitionInfo; |
|||
import org.thingsboard.server.common.stats.StatsFactory; |
|||
import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineMsg; |
|||
import org.thingsboard.server.queue.TbQueueAdmin; |
|||
import org.thingsboard.server.queue.TbQueueProducer; |
|||
import org.thingsboard.server.queue.common.AbstractTbQueueConsumerTemplate; |
|||
import org.thingsboard.server.queue.common.TbProtoQueueMsg; |
|||
import org.thingsboard.server.queue.discovery.PartitionService; |
|||
import org.thingsboard.server.queue.discovery.QueueKey; |
|||
import org.thingsboard.server.queue.discovery.TbServiceInfoProvider; |
|||
import org.thingsboard.server.queue.provider.TbQueueProducerProvider; |
|||
import org.thingsboard.server.queue.provider.TbRuleEngineQueueFactory; |
|||
import org.thingsboard.server.service.queue.processing.TbRuleEngineProcessingStrategyFactory; |
|||
import org.thingsboard.server.service.queue.processing.TbRuleEngineSubmitStrategyFactory; |
|||
import org.thingsboard.server.service.stats.RuleEngineStatisticsService; |
|||
|
|||
import java.io.IOException; |
|||
import java.util.Arrays; |
|||
import java.util.Collections; |
|||
import java.util.List; |
|||
import java.util.Objects; |
|||
import java.util.Set; |
|||
import java.util.UUID; |
|||
import java.util.concurrent.ConcurrentHashMap; |
|||
import java.util.concurrent.TimeUnit; |
|||
import java.util.concurrent.atomic.AtomicInteger; |
|||
import java.util.function.Supplier; |
|||
import java.util.stream.Collectors; |
|||
import java.util.stream.IntStream; |
|||
|
|||
import static org.assertj.core.api.Assertions.assertThat; |
|||
import static org.awaitility.Awaitility.await; |
|||
import static org.mockito.ArgumentMatchers.any; |
|||
import static org.mockito.ArgumentMatchers.argThat; |
|||
import static org.mockito.ArgumentMatchers.eq; |
|||
import static org.mockito.Mockito.after; |
|||
import static org.mockito.Mockito.atLeast; |
|||
import static org.mockito.Mockito.atLeastOnce; |
|||
import static org.mockito.Mockito.clearInvocations; |
|||
import static org.mockito.Mockito.doAnswer; |
|||
import static org.mockito.Mockito.mock; |
|||
import static org.mockito.Mockito.never; |
|||
import static org.mockito.Mockito.spy; |
|||
import static org.mockito.Mockito.times; |
|||
import static org.mockito.Mockito.verify; |
|||
import static org.mockito.Mockito.verifyNoInteractions; |
|||
import static org.mockito.Mockito.verifyNoMoreInteractions; |
|||
import static org.mockito.Mockito.when; |
|||
|
|||
@Slf4j |
|||
@RunWith(MockitoJUnitRunner.class) |
|||
public class TbRuleEngineQueueConsumerManagerTest { |
|||
|
|||
@Mock |
|||
private ActorSystemContext actorContext; |
|||
@Mock |
|||
private StatsFactory statsFactory; |
|||
@Mock |
|||
private TbRuleEngineQueueFactory queueFactory; |
|||
@Mock |
|||
private RuleEngineStatisticsService statisticsService; |
|||
@Mock |
|||
private TbServiceInfoProvider serviceInfoProvider; |
|||
@Mock |
|||
private PartitionService partitionService; |
|||
@Mock |
|||
private TbQueueProducerProvider producerProvider; |
|||
private TbQueueProducer<TbProtoQueueMsg<ToRuleEngineMsg>> ruleEngineMsgProducer; |
|||
@Mock |
|||
private TbQueueAdmin queueAdmin; |
|||
private TbRuleEngineConsumerContext ruleEngineConsumerContext; |
|||
|
|||
private TbRuleEngineQueueConsumerManager consumerManager; |
|||
private Queue queue; |
|||
|
|||
private Set<TestConsumer> consumers; |
|||
private boolean generateQueueMsgs; |
|||
private AtomicInteger totalConsumedMsgs; |
|||
private AtomicInteger totalProcessedMsgs; |
|||
|
|||
@Before |
|||
public void beforeEach() { |
|||
ruleEngineConsumerContext = new TbRuleEngineConsumerContext( |
|||
actorContext, statsFactory, spy(new TbRuleEngineSubmitStrategyFactory()), |
|||
spy(new TbRuleEngineProcessingStrategyFactory()), queueFactory, statisticsService, |
|||
serviceInfoProvider, partitionService, producerProvider, queueAdmin |
|||
); |
|||
consumers = ConcurrentHashMap.newKeySet(); |
|||
generateQueueMsgs = true; |
|||
totalConsumedMsgs = new AtomicInteger(); |
|||
totalProcessedMsgs = new AtomicInteger(); |
|||
doAnswer(inv -> { |
|||
QueueToRuleEngineMsg msg = inv.getArgument(0); |
|||
msg.getMsg().getCallback().onSuccess(); |
|||
totalProcessedMsgs.incrementAndGet(); |
|||
log.trace("totalProcessedMsgs = {}", totalProcessedMsgs); |
|||
return null; |
|||
}).when(actorContext).tell(any()); |
|||
ruleEngineMsgProducer = mock(TbQueueProducer.class); |
|||
when(producerProvider.getRuleEngineMsgProducer()).thenReturn(ruleEngineMsgProducer); |
|||
ruleEngineConsumerContext.setMgmtThreadPoolSize(2); |
|||
ruleEngineConsumerContext.setTopicDeletionDelayInSec(5); |
|||
ruleEngineConsumerContext.init(); |
|||
ruleEngineConsumerContext.setReady(false); |
|||
|
|||
queue = new Queue(); |
|||
queue.setName("Test"); |
|||
queue.setTenantId(TenantId.SYS_TENANT_ID); |
|||
queue.setId(new QueueId(UUID.randomUUID())); |
|||
queue.setTopic("tb_test"); |
|||
queue.setPartitions(10); |
|||
queue.setConsumerPerPartition(true); |
|||
queue.setPollInterval(250); |
|||
queue.setPackProcessingTimeout(2000); |
|||
SubmitStrategy submitStrategy = new SubmitStrategy(); |
|||
submitStrategy.setType(SubmitStrategyType.BURST); |
|||
submitStrategy.setBatchSize(200); |
|||
queue.setSubmitStrategy(submitStrategy); |
|||
ProcessingStrategy processingStrategy = new ProcessingStrategy(); |
|||
processingStrategy.setType(ProcessingStrategyType.SKIP_ALL_FAILURES_AND_TIMED_OUT); |
|||
processingStrategy.setRetries(0); |
|||
queue.setProcessingStrategy(processingStrategy); |
|||
|
|||
doAnswer(i -> { |
|||
TestConsumer consumer = spy(new TestConsumer(queue.getTopic())); |
|||
if (generateQueueMsgs) { |
|||
consumer.setUpTestMsg(); |
|||
} |
|||
consumers.add(consumer); |
|||
return consumer; |
|||
}).when(queueFactory).createToRuleEngineMsgConsumer(any()); |
|||
|
|||
QueueKey queueKey = new QueueKey(ServiceType.TB_RULE_ENGINE, queue); |
|||
consumerManager = new TbRuleEngineQueueConsumerManager(ruleEngineConsumerContext, queueKey); |
|||
} |
|||
|
|||
@After |
|||
public void afterEach() { |
|||
consumerManager.stop(); |
|||
consumerManager.awaitStop(); |
|||
ruleEngineConsumerContext.stop(); |
|||
|
|||
if (generateQueueMsgs) { |
|||
await().atMost(10, TimeUnit.SECONDS) |
|||
.untilAsserted(() -> { |
|||
log.debug("totalConsumedMsgs = {}, totalProcessedMsgs = {}", totalConsumedMsgs.get(), totalProcessedMsgs.get()); |
|||
assertThat(totalProcessedMsgs.get()).isEqualTo(totalConsumedMsgs.get()); |
|||
}); |
|||
} |
|||
} |
|||
|
|||
@Test |
|||
public void testInit_consumerPerPartition() { |
|||
queue.setConsumerPerPartition(true); |
|||
consumerManager.init(queue); |
|||
|
|||
Set<TopicPartitionInfo> partitions = createTpis(2, 3, 4); |
|||
consumerManager.update(partitions); |
|||
partitions = createTpis(3, 4, 5); |
|||
consumerManager.update(partitions); |
|||
partitions = createTpis(1, 2, 3); |
|||
consumerManager.update(partitions); |
|||
// simulated multiple partition change events before consumer is ready; only latest partitions should be processed
|
|||
verifyNoInteractions(queueFactory); |
|||
|
|||
ruleEngineConsumerContext.setReady(true); |
|||
await().atMost(2, TimeUnit.SECONDS) |
|||
.until(() -> consumers.size() == 3); |
|||
for (TopicPartitionInfo partition : partitions) { |
|||
TestConsumer consumer = getConsumer(partition); |
|||
verifySubscribedAndLaunched(consumer, Set.of(partition)); |
|||
} |
|||
} |
|||
|
|||
@Test |
|||
public void testInit_singleConsumer() { |
|||
queue.setConsumerPerPartition(false); |
|||
consumerManager.init(queue); |
|||
|
|||
Set<TopicPartitionInfo> partitions = createTpis(2, 3, 4); |
|||
consumerManager.update(partitions); |
|||
partitions = createTpis(3, 4, 5); |
|||
consumerManager.update(partitions); |
|||
partitions = createTpis(1, 2, 3); |
|||
consumerManager.update(partitions); |
|||
|
|||
verifyNoInteractions(queueFactory); |
|||
|
|||
ruleEngineConsumerContext.setReady(true); |
|||
await().atMost(2, TimeUnit.SECONDS) |
|||
.until(() -> consumers.size() == 1); |
|||
TestConsumer consumer = getConsumer(); |
|||
verifySubscribedAndLaunched(consumer, partitions); |
|||
} |
|||
|
|||
@Test |
|||
public void testPartitionsUpdate_singleConsumer() { |
|||
queue.setConsumerPerPartition(false); |
|||
consumerManager.init(queue); |
|||
ruleEngineConsumerContext.setReady(true); |
|||
|
|||
Set<TopicPartitionInfo> partitions = Collections.emptySet(); |
|||
consumerManager.update(partitions); |
|||
verify(queueFactory, after(1000).never()).createToRuleEngineMsgConsumer(any()); |
|||
|
|||
partitions = createTpis(1); |
|||
consumerManager.update(partitions); |
|||
TestConsumer consumer = getConsumer(); |
|||
verifySubscribedAndLaunched(consumer, partitions); |
|||
|
|||
partitions = createTpis(1, 2, 3); |
|||
consumerManager.update(partitions); |
|||
verifySubscribedAndLaunched(consumer, partitions); |
|||
|
|||
partitions = createTpis(4, 5, 6); |
|||
consumerManager.update(partitions); |
|||
verifySubscribedAndLaunched(consumer, partitions); |
|||
|
|||
partitions = Collections.emptySet(); |
|||
consumerManager.update(partitions); |
|||
verifyUnsubscribedAndStopped(consumer); |
|||
|
|||
partitions = createTpis(1, 2, 3); |
|||
consumerManager.update(partitions); |
|||
consumer = getConsumer(); |
|||
verifySubscribedAndLaunched(consumer, partitions); |
|||
} |
|||
|
|||
@Test |
|||
public void testPartitionsUpdate_consumerPerPartition() { |
|||
queue.setConsumerPerPartition(true); |
|||
consumerManager.init(queue); |
|||
ruleEngineConsumerContext.setReady(true); |
|||
|
|||
consumerManager.update(Collections.emptySet()); |
|||
verify(queueFactory, after(1000).never()).createToRuleEngineMsgConsumer(any()); |
|||
|
|||
consumerManager.update(createTpis(1)); |
|||
TestConsumer consumer1 = getConsumer(1); |
|||
verifySubscribedAndLaunched(consumer1, 1); |
|||
|
|||
consumerManager.update(createTpis(1, 2, 3)); |
|||
TestConsumer consumer2 = getConsumer(2); |
|||
TestConsumer consumer3 = getConsumer(3); |
|||
verifySubscribedAndLaunched(consumer2, 2); |
|||
verifySubscribedAndLaunched(consumer3, 3); |
|||
verifyNotTouched(consumer1); |
|||
|
|||
consumerManager.update(createTpis(3, 4, 5)); |
|||
TestConsumer consumer4 = getConsumer(4); |
|||
TestConsumer consumer5 = getConsumer(5); |
|||
verifySubscribedAndLaunched(consumer4, 4); |
|||
verifySubscribedAndLaunched(consumer5, 5); |
|||
verifyUnsubscribedAndStopped(consumer1); |
|||
verifyUnsubscribedAndStopped(consumer2); |
|||
verifyNotTouched(consumer3); |
|||
|
|||
consumerManager.update(Collections.emptySet()); |
|||
verifyUnsubscribedAndStopped(consumer3); |
|||
verifyUnsubscribedAndStopped(consumer4); |
|||
verifyUnsubscribedAndStopped(consumer5); |
|||
|
|||
consumerManager.update(createTpis(1, 2, 3)); |
|||
consumer1 = getConsumer(1); |
|||
consumer2 = getConsumer(2); |
|||
consumer3 = getConsumer(3); |
|||
verifySubscribedAndLaunched(consumer1, 1); |
|||
verifySubscribedAndLaunched(consumer2, 2); |
|||
verifySubscribedAndLaunched(consumer3, 3); |
|||
} |
|||
|
|||
@Test |
|||
public void testConfigUpdate_singleConsumer() { |
|||
queue.setConsumerPerPartition(false); |
|||
consumerManager.init(queue); |
|||
ruleEngineConsumerContext.setReady(true); |
|||
Set<TopicPartitionInfo> partitions = createTpis(1, 2, 3); |
|||
consumerManager.update(partitions); |
|||
TestConsumer consumer = getConsumer(); |
|||
verifySubscribedAndLaunched(consumer, partitions); |
|||
|
|||
Queue newConfig = JacksonUtil.clone(queue); |
|||
newConfig.setPollInterval(queue.getPollInterval() / 2); |
|||
newConfig.setPartitions(queue.getPartitions() / 2); |
|||
newConfig.setPackProcessingTimeout(queue.getPackProcessingTimeout() * 2); |
|||
newConfig.getSubmitStrategy().setType(SubmitStrategyType.SEQUENTIAL_BY_ORIGINATOR); |
|||
newConfig.getProcessingStrategy().setType(ProcessingStrategyType.RETRY_ALL); |
|||
consumerManager.update(newConfig); |
|||
|
|||
await().atMost(2, TimeUnit.SECONDS) |
|||
.untilAsserted(() -> { |
|||
verify(consumer, atLeastOnce()).poll(eq((long) newConfig.getPollInterval())); |
|||
verify(ruleEngineConsumerContext.getSubmitStrategyFactory(), atLeastOnce()).newInstance(any(), eq(newConfig.getSubmitStrategy())); |
|||
verify(ruleEngineConsumerContext.getProcessingStrategyFactory(), atLeastOnce()).newInstance(any(), eq(newConfig.getProcessingStrategy())); |
|||
}); |
|||
} |
|||
|
|||
@Test |
|||
public void testConfigUpdate_consumerPerPartition() { |
|||
queue.setConsumerPerPartition(true); |
|||
consumerManager.init(queue); |
|||
ruleEngineConsumerContext.setReady(true); |
|||
Set<TopicPartitionInfo> partitions = createTpis(1, 2, 3); |
|||
consumerManager.update(partitions); |
|||
TestConsumer consumer1 = getConsumer(1); |
|||
TestConsumer consumer2 = getConsumer(2); |
|||
TestConsumer consumer3 = getConsumer(3); |
|||
verifySubscribedAndLaunched(consumer1, 1); |
|||
verifySubscribedAndLaunched(consumer2, 2); |
|||
verifySubscribedAndLaunched(consumer3, 3); |
|||
|
|||
Queue newConfig = JacksonUtil.clone(queue); |
|||
newConfig.setPollInterval(queue.getPollInterval() / 2); |
|||
newConfig.setPartitions(queue.getPartitions() / 2); |
|||
newConfig.setPackProcessingTimeout(queue.getPackProcessingTimeout() * 2); |
|||
newConfig.getSubmitStrategy().setType(SubmitStrategyType.SEQUENTIAL_BY_ORIGINATOR); |
|||
newConfig.getProcessingStrategy().setType(ProcessingStrategyType.RETRY_ALL); |
|||
consumerManager.update(newConfig); |
|||
|
|||
await().atMost(2, TimeUnit.SECONDS) |
|||
.untilAsserted(() -> { |
|||
verify(consumer1, atLeastOnce()).poll(eq((long) newConfig.getPollInterval())); |
|||
verify(consumer2, atLeastOnce()).poll(eq((long) newConfig.getPollInterval())); |
|||
verify(consumer3, atLeastOnce()).poll(eq((long) newConfig.getPollInterval())); |
|||
}); |
|||
verifyNotTouched(consumer1); |
|||
verifyNotTouched(consumer2); |
|||
verifyNotTouched(consumer3); |
|||
} |
|||
|
|||
@Test |
|||
public void testConfigUpdate_fromSingleToConsumerPerPartition() { |
|||
queue.setConsumerPerPartition(false); |
|||
consumerManager.init(queue); |
|||
ruleEngineConsumerContext.setReady(true); |
|||
Set<TopicPartitionInfo> partitions = createTpis(1, 2, 3); |
|||
consumerManager.update(partitions); |
|||
TestConsumer consumer = getConsumer(); |
|||
verifySubscribedAndLaunched(consumer, partitions); |
|||
|
|||
Queue newConfig = JacksonUtil.clone(queue); |
|||
newConfig.setConsumerPerPartition(true); |
|||
consumerManager.update(newConfig); |
|||
|
|||
verifyUnsubscribedAndStopped(consumer); |
|||
verifySubscribedAndLaunched(getConsumer(1), 1); |
|||
verifySubscribedAndLaunched(getConsumer(2), 2); |
|||
verifySubscribedAndLaunched(getConsumer(3), 3); |
|||
} |
|||
|
|||
@Test |
|||
public void testConfigUpdate_fromConsumerPerPartitionToSingle() { |
|||
queue.setConsumerPerPartition(true); |
|||
consumerManager.init(queue); |
|||
ruleEngineConsumerContext.setReady(true); |
|||
Set<TopicPartitionInfo> partitions = createTpis(1, 2, 3); |
|||
consumerManager.update(partitions); |
|||
TestConsumer consumer1 = getConsumer(1); |
|||
TestConsumer consumer2 = getConsumer(2); |
|||
TestConsumer consumer3 = getConsumer(3); |
|||
verifySubscribedAndLaunched(consumer1, 1); |
|||
verifySubscribedAndLaunched(consumer2, 2); |
|||
verifySubscribedAndLaunched(consumer3, 3); |
|||
|
|||
Queue newConfig = JacksonUtil.clone(queue); |
|||
newConfig.setConsumerPerPartition(false); |
|||
consumerManager.update(newConfig); |
|||
|
|||
verifyUnsubscribedAndStopped(consumer1); |
|||
verifyUnsubscribedAndStopped(consumer2); |
|||
verifyUnsubscribedAndStopped(consumer3); |
|||
verifySubscribedAndLaunched(getConsumer(), partitions); |
|||
} |
|||
|
|||
@Test |
|||
public void testStop() { |
|||
queue.setConsumerPerPartition(true); |
|||
consumerManager.init(queue); |
|||
ruleEngineConsumerContext.setReady(true); |
|||
consumerManager.update(createTpis(1)); |
|||
TestConsumer consumer = getConsumer(1); |
|||
verifySubscribedAndLaunched(consumer, 1); |
|||
verify(queueFactory, times(1)).createToRuleEngineMsgConsumer(any()); |
|||
|
|||
consumerManager.stop(); |
|||
consumerManager.update(createTpis(1, 2, 3, 4)); // to check that no new tasks after stop are processed
|
|||
consumerManager.update(createTpis(5, 6, 7)); |
|||
|
|||
verifyUnsubscribedAndStopped(consumer); |
|||
verifyNoMoreInteractions(queueFactory); |
|||
} |
|||
|
|||
@Test |
|||
public void testDelete_consumerPerPartition() { |
|||
queue.setConsumerPerPartition(true); |
|||
consumerManager.init(queue); |
|||
ruleEngineConsumerContext.setReady(true); |
|||
Set<TopicPartitionInfo> partitions = createTpis(1, 2); |
|||
consumerManager.update(partitions); |
|||
TestConsumer consumer1 = getConsumer(1); |
|||
TestConsumer consumer2 = getConsumer(2); |
|||
verifySubscribedAndLaunched(consumer1, 1); |
|||
verifySubscribedAndLaunched(consumer2, 2); |
|||
verifyMsgProcessed(consumer1.testMsg); |
|||
verifyMsgProcessed(consumer2.testMsg); |
|||
|
|||
consumerManager.delete(); |
|||
|
|||
await().atMost(2, TimeUnit.SECONDS) |
|||
.untilAsserted(() -> { |
|||
verify(ruleEngineMsgProducer).send(any(), any(), any()); |
|||
}); |
|||
clearInvocations(actorContext); |
|||
verify(consumer1, never()).unsubscribe(); |
|||
verify(consumer2, never()).unsubscribe(); |
|||
int msgCount = totalConsumedMsgs.get(); |
|||
|
|||
await().atLeast(4, TimeUnit.SECONDS) // based on topicDeletionDelayInSec
|
|||
.atMost(7, TimeUnit.SECONDS) |
|||
.untilAsserted(() -> { |
|||
partitions.stream() |
|||
.map(TopicPartitionInfo::getFullTopicName) |
|||
.forEach(topic -> { |
|||
verify(queueAdmin).deleteTopic(eq(topic)); |
|||
}); |
|||
}); |
|||
verify(consumer1).unsubscribe(); |
|||
verify(consumer2).unsubscribe(); |
|||
|
|||
int totalMovedMsgs = totalConsumedMsgs.get() - msgCount; |
|||
assertThat(totalMovedMsgs).isNotZero(); |
|||
verify(ruleEngineMsgProducer, atLeast(totalMovedMsgs)).send(any(), any(), any()); |
|||
verify(actorContext, never()).tell(any()); |
|||
generateQueueMsgs = false; |
|||
} |
|||
|
|||
@Test |
|||
public void testDelete_singleConsumer() { |
|||
queue.setConsumerPerPartition(false); |
|||
consumerManager.init(queue); |
|||
ruleEngineConsumerContext.setReady(true); |
|||
Set<TopicPartitionInfo> partitions = createTpis(1, 2); |
|||
consumerManager.update(partitions); |
|||
TestConsumer consumer = getConsumer(); |
|||
verifySubscribedAndLaunched(consumer, partitions); |
|||
verifyMsgProcessed(consumer.testMsg); |
|||
|
|||
consumerManager.delete(); |
|||
|
|||
await().atMost(2, TimeUnit.SECONDS) |
|||
.untilAsserted(() -> { |
|||
verify(ruleEngineMsgProducer).send(any(), any(), any()); |
|||
}); |
|||
clearInvocations(actorContext); |
|||
verify(consumer, never()).unsubscribe(); |
|||
int msgCount = totalConsumedMsgs.get(); |
|||
|
|||
await().atLeast(4, TimeUnit.SECONDS) |
|||
.atMost(7, TimeUnit.SECONDS) |
|||
.untilAsserted(() -> { |
|||
partitions.stream() |
|||
.map(TopicPartitionInfo::getFullTopicName) |
|||
.forEach(topic -> { |
|||
verify(queueAdmin).deleteTopic(eq(topic)); |
|||
}); |
|||
}); |
|||
verify(consumer).unsubscribe(); |
|||
|
|||
int movedMsgs = totalConsumedMsgs.get() - msgCount; |
|||
assertThat(movedMsgs).isNotZero(); |
|||
verify(ruleEngineMsgProducer, atLeast(movedMsgs)).send(any(), any(), any()); |
|||
verify(actorContext, never()).tell(any()); |
|||
generateQueueMsgs = false; |
|||
} |
|||
|
|||
@Test |
|||
public void testManyDifferentUpdates() throws Exception { |
|||
queue.setConsumerPerPartition(RandomUtils.nextBoolean()); |
|||
consumerManager.init(queue); |
|||
ruleEngineConsumerContext.setReady(true); |
|||
|
|||
Supplier<Queue> queueConfigUpdater = () -> { |
|||
Queue oldConfig = consumerManager.getQueue(); |
|||
Queue newConfig = JacksonUtil.clone(oldConfig); |
|||
newConfig.setConsumerPerPartition(RandomUtils.nextBoolean()); |
|||
newConfig.setPollInterval(RandomUtils.nextInt(100, 501)); |
|||
newConfig.setPartitions(RandomUtils.nextInt(1, 10)); |
|||
newConfig.setPackProcessingTimeout(RandomUtils.nextLong(100, 5001)); |
|||
newConfig.getSubmitStrategy().setType(SubmitStrategyType.values()[RandomUtils.nextInt(0, SubmitStrategyType.values().length)]); |
|||
newConfig.getProcessingStrategy().setType(ProcessingStrategyType.values()[RandomUtils.nextInt(0, ProcessingStrategyType.values().length)]); |
|||
log.info("Generated new config: consumerPerPartition={}, pollInterval={}, processingStrategy={}", |
|||
newConfig.isConsumerPerPartition(), newConfig.getPollInterval(), newConfig.getProcessingStrategy().getType()); |
|||
return newConfig; |
|||
}; |
|||
Supplier<Set<TopicPartitionInfo>> partitionsUpdater = () -> { |
|||
int partitionsCount = RandomUtils.nextInt(0, 20); |
|||
int[] partitions = IntStream.generate(() -> RandomUtils.nextInt(0, 20)) |
|||
.distinct().limit(partitionsCount) |
|||
.sorted().toArray(); |
|||
log.info("Generated new partitions: {}", Arrays.toString(partitions)); |
|||
return createTpis(partitions); |
|||
}; |
|||
|
|||
int iterations = 100; |
|||
Queue latestConfig = queue; |
|||
Set<TopicPartitionInfo> latestPartitions = Collections.emptySet(); |
|||
for (int i = 1; i <= iterations; i++) { |
|||
boolean updateQueueConfig = RandomUtils.nextBoolean(); |
|||
boolean updatePartitions = !updateQueueConfig; |
|||
if (updateQueueConfig) { |
|||
latestConfig = queueConfigUpdater.get(); |
|||
consumerManager.update(latestConfig); |
|||
} |
|||
if (updatePartitions) { |
|||
latestPartitions = partitionsUpdater.get(); |
|||
consumerManager.update(latestPartitions); |
|||
} |
|||
Thread.sleep(RandomUtils.nextLong(0, 200)); |
|||
} |
|||
if (latestPartitions.isEmpty()) { |
|||
do { |
|||
latestPartitions = partitionsUpdater.get(); |
|||
} while (latestPartitions.isEmpty()); |
|||
consumerManager.update(latestPartitions); |
|||
} |
|||
|
|||
Queue expectedConfig = latestConfig; |
|||
Set<TopicPartitionInfo> expectedPartitions = latestPartitions; |
|||
await().atMost(5, TimeUnit.SECONDS) |
|||
.untilAsserted(() -> { |
|||
assertThat(consumerManager.getQueue()).isEqualTo(expectedConfig); |
|||
assertThat(consumerManager.getPartitions()).isEqualTo(expectedPartitions); |
|||
}); |
|||
|
|||
if (expectedConfig.isConsumerPerPartition()) { |
|||
await().atMost(5, TimeUnit.SECONDS).until(() -> { |
|||
for (TopicPartitionInfo partition : expectedPartitions) { |
|||
if (consumers.stream().noneMatch(consumer -> consumer.subscribed && |
|||
consumer.pollingStarted && Set.of(partition).equals(consumer.getPartitions()))) { |
|||
return false; |
|||
} |
|||
} |
|||
return consumers.size() == expectedPartitions.size(); |
|||
}); |
|||
} else { |
|||
await().atMost(5, TimeUnit.SECONDS).until(() -> { |
|||
return consumers.size() == 1 && consumers.stream() |
|||
.anyMatch(consumer -> consumer.subscribed && consumer.pollingStarted && |
|||
expectedPartitions.equals(consumer.getPartitions())); |
|||
}); |
|||
} |
|||
Mockito.reset(ruleEngineConsumerContext.getSubmitStrategyFactory()); |
|||
Mockito.reset(ruleEngineConsumerContext.getProcessingStrategyFactory()); |
|||
consumers.forEach(Mockito::clearInvocations); |
|||
|
|||
await().atMost(5, TimeUnit.SECONDS).untilAsserted(() -> { |
|||
for (TestConsumer consumer : consumers) { |
|||
verify(consumer, atLeastOnce().description("consumer " + consumer.topics)).poll(expectedConfig.getPollInterval()); |
|||
} |
|||
verify(ruleEngineConsumerContext.getSubmitStrategyFactory(), atLeastOnce()).newInstance(any(), eq(expectedConfig.getSubmitStrategy())); |
|||
verify(ruleEngineConsumerContext.getProcessingStrategyFactory(), atLeastOnce()).newInstance(any(), eq(expectedConfig.getProcessingStrategy())); |
|||
}); |
|||
} |
|||
|
|||
private void verifySubscribedAndLaunched(TestConsumer consumer, Set<TopicPartitionInfo> expectedPartitions) { |
|||
await().atMost(2, TimeUnit.SECONDS) |
|||
.until(() -> consumer.subscribed && consumer.getPartitions().equals(expectedPartitions) && consumer.pollingStarted); |
|||
verify(consumer, times(1)).subscribe(any()); |
|||
verify(consumer).subscribe(eq(expectedPartitions)); |
|||
verify(consumer).doSubscribe(argThat(topics -> topics.containsAll(expectedPartitions.stream() |
|||
.map(TopicPartitionInfo::getFullTopicName).collect(Collectors.toList())))); |
|||
verify(consumer, atLeastOnce()).poll(eq((long) queue.getPollInterval())); |
|||
verify(consumer, atLeastOnce()).doPoll(eq((long) queue.getPollInterval())); |
|||
verify(consumer, never()).unsubscribe(); |
|||
Mockito.reset(consumer); |
|||
} |
|||
|
|||
private void verifySubscribedAndLaunched(TestConsumer consumer, int... expectedPartitions) { |
|||
verifySubscribedAndLaunched(consumer, createTpis(expectedPartitions)); |
|||
} |
|||
|
|||
private void verifyUnsubscribedAndStopped(TestConsumer consumer) { |
|||
await().atMost(2, TimeUnit.SECONDS) |
|||
.until(() -> !consumer.subscribed && !consumer.topics.isEmpty()); |
|||
verify(consumer, never()).subscribe(any()); |
|||
verify(consumer, never()).doSubscribe(any()); |
|||
assertThat(consumers).doesNotContain(consumer); |
|||
Mockito.reset(consumer); |
|||
} |
|||
|
|||
private void verifyNotTouched(TestConsumer consumer) { |
|||
verify(consumer, never()).subscribe(any()); |
|||
verify(consumer, never()).subscribe(); |
|||
verify(consumer, never()).doSubscribe(any()); |
|||
verify(consumer, never()).unsubscribe(); |
|||
verify(consumer, never()).doUnsubscribe(); |
|||
} |
|||
|
|||
private void verifyMsgProcessed(TbMsg tbMsg) { |
|||
await().atMost(2, TimeUnit.SECONDS).untilAsserted(() -> { |
|||
verify(actorContext, atLeastOnce()).tell(argThat(msg -> { |
|||
return ((QueueToRuleEngineMsg) msg).getMsg().getId().equals(tbMsg.getId()); |
|||
})); |
|||
}); |
|||
} |
|||
|
|||
// for consumer-per-partition
|
|||
private TestConsumer getConsumer(TopicPartitionInfo tpi) { |
|||
return await().atMost(5, TimeUnit.SECONDS) |
|||
.until(() -> consumers.stream() |
|||
.filter(consumer -> consumer.getPartitions() != null && |
|||
consumer.getPartitions().size() == 1 && |
|||
consumer.getPartitions().contains(tpi)) |
|||
.findFirst().orElse(null), Objects::nonNull); |
|||
} |
|||
|
|||
private TestConsumer getConsumer(int partition) { |
|||
return await().atMost(5, TimeUnit.SECONDS) |
|||
.until(() -> consumers.stream() |
|||
.filter(consumer -> consumer.getPartitions() != null && |
|||
consumer.getPartitions().size() == 1 && |
|||
consumer.getPartitions().stream() |
|||
.anyMatch(tpi -> tpi.getPartition().get().equals(partition))) |
|||
.findFirst().orElse(null), Objects::nonNull); |
|||
} |
|||
|
|||
// for single consumer
|
|||
private TestConsumer getConsumer() { |
|||
return await().atMost(5, TimeUnit.SECONDS) |
|||
.until(() -> consumers.size() == 1 ? consumers.iterator().next() : null, Objects::nonNull); |
|||
} |
|||
|
|||
private Set<TopicPartitionInfo> createTpis(int... partitions) { |
|||
return Arrays.stream(partitions) |
|||
.mapToObj(n -> TopicPartitionInfo.builder() |
|||
.tenantId(queue.getTenantId()) |
|||
.topic(queue.getTopic()) |
|||
.partition(n) |
|||
.myPartition(true) |
|||
.build()) |
|||
.collect(Collectors.toSet()); |
|||
} |
|||
|
|||
|
|||
class TestConsumer extends AbstractTbQueueConsumerTemplate<TbMsg, TbProtoQueueMsg<ToRuleEngineMsg>> { |
|||
|
|||
@Getter |
|||
private List<String> topics; |
|||
|
|||
private boolean subscribed; |
|||
private boolean pollingStarted; |
|||
|
|||
private TbMsg testMsg; |
|||
|
|||
public TestConsumer(String topic) { |
|||
super(topic); |
|||
} |
|||
|
|||
@SneakyThrows |
|||
@Override |
|||
protected List<TbMsg> doPoll(long durationInMillis) { |
|||
log.debug("doPoll({} ms)", durationInMillis); |
|||
if (!subscribed) { |
|||
throw new IllegalStateException("Cannot poll because not subscribed"); |
|||
} |
|||
pollingStarted = true; |
|||
if (testMsg != null && RandomUtils.nextBoolean()) { |
|||
Thread.sleep(100); |
|||
return List.of(testMsg); |
|||
} |
|||
return Collections.emptyList(); |
|||
} |
|||
|
|||
@Override |
|||
protected TbProtoQueueMsg<ToRuleEngineMsg> decode(TbMsg tbMsg) throws IOException { |
|||
log.debug("decode()"); |
|||
UUID tenantId = UUID.randomUUID(); |
|||
return new TbProtoQueueMsg<>(UUID.randomUUID(), ToRuleEngineMsg.newBuilder() |
|||
.setTenantIdMSB(tenantId.getMostSignificantBits()) |
|||
.setTenantIdLSB(tenantId.getLeastSignificantBits()) |
|||
.addRelationTypes("Success") |
|||
.setTbMsg(TbMsg.toByteString(tbMsg)) |
|||
.build()); |
|||
} |
|||
|
|||
@Override |
|||
protected void doSubscribe(List<String> topicNames) { |
|||
log.debug("doSubscribe({})", topicNames); |
|||
this.topics = topicNames; |
|||
subscribed = true; |
|||
} |
|||
|
|||
@Override |
|||
protected void doCommit() { |
|||
if (!subscribed) { |
|||
throw new IllegalStateException("Cannot commit because not subscribed"); |
|||
} |
|||
log.debug("doCommit() totalConsumedMsgs = {}", totalConsumedMsgs.incrementAndGet()); |
|||
} |
|||
|
|||
@Override |
|||
public void unsubscribe() { |
|||
super.unsubscribe(); |
|||
consumers.remove(this); |
|||
} |
|||
|
|||
@Override |
|||
protected void doUnsubscribe() { |
|||
log.debug("doUnsubscribe()"); |
|||
if (!subscribed) { |
|||
throw new IllegalStateException("Already unsubscribed!"); |
|||
} |
|||
subscribed = false; |
|||
} |
|||
|
|||
@Override |
|||
protected boolean isLongPollingSupported() { |
|||
return false; |
|||
} |
|||
|
|||
public Set<TopicPartitionInfo> getPartitions() { |
|||
return partitions; |
|||
} |
|||
|
|||
public void setUpTestMsg() { |
|||
testMsg = TbMsg.newMsg(TbMsgType.POST_TELEMETRY_REQUEST, new DeviceId(UUID.randomUUID()), new TbMsgMetaData(), "{}"); |
|||
} |
|||
} |
|||
|
|||
} |
|||
@ -0,0 +1,105 @@ |
|||
/** |
|||
* Copyright © 2016-2023 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
package org.thingsboard.common.util; |
|||
|
|||
import lombok.SneakyThrows; |
|||
import lombok.extern.slf4j.Slf4j; |
|||
import org.bouncycastle.asn1.pkcs.PrivateKeyInfo; |
|||
import org.bouncycastle.cert.X509CertificateHolder; |
|||
import org.bouncycastle.cert.jcajce.JcaX509CertificateConverter; |
|||
import org.bouncycastle.jce.provider.BouncyCastleProvider; |
|||
import org.bouncycastle.openssl.PEMDecryptorProvider; |
|||
import org.bouncycastle.openssl.PEMEncryptedKeyPair; |
|||
import org.bouncycastle.openssl.PEMKeyPair; |
|||
import org.bouncycastle.openssl.PEMParser; |
|||
import org.bouncycastle.openssl.jcajce.JcaPEMKeyConverter; |
|||
import org.bouncycastle.openssl.jcajce.JcePEMDecryptorProviderBuilder; |
|||
import org.bouncycastle.operator.InputDecryptorProvider; |
|||
import org.bouncycastle.pkcs.PKCS8EncryptedPrivateKeyInfo; |
|||
import org.bouncycastle.pkcs.jcajce.JcePKCSPBEInputDecryptorProviderBuilder; |
|||
import org.thingsboard.server.common.data.StringUtils; |
|||
|
|||
import java.io.StringReader; |
|||
import java.security.PrivateKey; |
|||
import java.security.Security; |
|||
import java.security.cert.X509Certificate; |
|||
import java.util.ArrayList; |
|||
import java.util.List; |
|||
|
|||
@Slf4j |
|||
public class SslUtil { |
|||
|
|||
public static final char[] EMPTY_PASS = {}; |
|||
|
|||
public static final BouncyCastleProvider DEFAULT_PROVIDER = new BouncyCastleProvider(); |
|||
|
|||
static { |
|||
if (Security.getProvider(BouncyCastleProvider.PROVIDER_NAME) == null) { |
|||
Security.addProvider(DEFAULT_PROVIDER); |
|||
} |
|||
} |
|||
|
|||
private SslUtil() { |
|||
} |
|||
|
|||
@SneakyThrows |
|||
public static List<X509Certificate> readCertFile(String fileContent) { |
|||
List<X509Certificate> certificates = new ArrayList<>(); |
|||
JcaX509CertificateConverter certConverter = new JcaX509CertificateConverter(); |
|||
try (PEMParser pemParser = new PEMParser(new StringReader(fileContent))) { |
|||
Object object; |
|||
while ((object = pemParser.readObject()) != null) { |
|||
if (object instanceof X509CertificateHolder) { |
|||
X509Certificate x509Cert = certConverter.getCertificate((X509CertificateHolder) object); |
|||
certificates.add(x509Cert); |
|||
} |
|||
} |
|||
} |
|||
return certificates; |
|||
} |
|||
|
|||
@SneakyThrows |
|||
public static PrivateKey readPrivateKey(String fileContent, String passStr) { |
|||
char[] password = StringUtils.isEmpty(passStr) ? EMPTY_PASS : passStr.toCharArray(); |
|||
|
|||
PrivateKey privateKey = null; |
|||
JcaPEMKeyConverter keyConverter = new JcaPEMKeyConverter(); |
|||
if (StringUtils.isNotEmpty(fileContent)) { |
|||
try (PEMParser pemParser = new PEMParser(new StringReader(fileContent))) { |
|||
Object object; |
|||
while ((object = pemParser.readObject()) != null) { |
|||
if (object instanceof PEMEncryptedKeyPair) { |
|||
PEMDecryptorProvider decProv = new JcePEMDecryptorProviderBuilder().build(password); |
|||
privateKey = keyConverter.getKeyPair(((PEMEncryptedKeyPair) object).decryptKeyPair(decProv)).getPrivate(); |
|||
break; |
|||
} else if (object instanceof PKCS8EncryptedPrivateKeyInfo) { |
|||
InputDecryptorProvider decProv = |
|||
new JcePKCSPBEInputDecryptorProviderBuilder().setProvider(DEFAULT_PROVIDER).build(password); |
|||
privateKey = keyConverter.getPrivateKey(((PKCS8EncryptedPrivateKeyInfo) object).decryptPrivateKeyInfo(decProv)); |
|||
break; |
|||
} else if (object instanceof PEMKeyPair) { |
|||
privateKey = keyConverter.getKeyPair((PEMKeyPair) object).getPrivate(); |
|||
break; |
|||
} else if (object instanceof PrivateKeyInfo) { |
|||
privateKey = keyConverter.getPrivateKey((PrivateKeyInfo) object); |
|||
} |
|||
} |
|||
} |
|||
} |
|||
return privateKey; |
|||
} |
|||
|
|||
} |
|||
@ -0,0 +1,13 @@ |
|||
-----BEGIN CERTIFICATE----- |
|||
MIICCDCCAa2gAwIBAgIUGx/SZqIWza/i/gaKFUVIyTEu2oMwCgYIKoZIzj0EAwIw |
|||
WTELMAkGA1UEBhMCVUExDTALBgNVBAgMBEtZSVYxDTALBgNVBAcMBEtZSVYxCzAJ |
|||
BgNVBAoMAlRCMQswCQYDVQQLDAJUQjESMBAGA1UEAwwJbG9jYWxob3N0MB4XDTIz |
|||
MTAxNjEyMjMyMVoXDTI0MTAxNTEyMjMyMVowWTELMAkGA1UEBhMCVUExDTALBgNV |
|||
BAgMBEtZSVYxDTALBgNVBAcMBEtZSVYxCzAJBgNVBAoMAlRCMQswCQYDVQQLDAJU |
|||
QjESMBAGA1UEAwwJbG9jYWxob3N0MFkwEwYHKoZIzj0CAQYIKoZIzj0DAQcDQgAE |
|||
z4MgawieJfVc5zUOPiw5WFxfHGJf7dOMsHvudDxdOs27PXPbJfi09BVJ3+JjNxA2 |
|||
wQz9KUk877oWRYrN/e+MbKNTMFEwHQYDVR0OBBYEFDTV8VD3m+8IBQOBJ+V/bcbl |
|||
4preMB8GA1UdIwQYMBaAFDTV8VD3m+8IBQOBJ+V/bcbl4preMA8GA1UdEwEB/wQF |
|||
MAMBAf8wCgYIKoZIzj0EAwIDSQAwRgIhAOgIkl8j8m51W7pWlNUAuUnHnOVhVjGr |
|||
h8Rc6cbwTapKAiEA2CLrduTweXEF5fBRtWyOsG8c9af6+MWHKmwHL1IDw9Q= |
|||
-----END CERTIFICATE----- |
|||
@ -0,0 +1,8 @@ |
|||
-----BEGIN EC PARAMETERS----- |
|||
BggqhkjOPQMBBw== |
|||
-----END EC PARAMETERS----- |
|||
-----BEGIN EC PRIVATE KEY----- |
|||
MHcCAQEEIIEd0mMh0EEy3fMbOpbUY6kW0oAYcaYoTvoVpZxDr5qZoAoGCCqGSM49 |
|||
AwEHoUQDQgAEz4MgawieJfVc5zUOPiw5WFxfHGJf7dOMsHvudDxdOs27PXPbJfi0 |
|||
9BVJ3+JjNxA2wQz9KUk877oWRYrN/e+MbA== |
|||
-----END EC PRIVATE KEY----- |
|||
@ -0,0 +1,32 @@ |
|||
-----BEGIN CERTIFICATE----- |
|||
MIIFkzCCA3ugAwIBAgIUUQa3cWUVoF58dzg8ycb/y7SdCj8wDQYJKoZIhvcNAQEL |
|||
BQAwWTELMAkGA1UEBhMCVUExDTALBgNVBAgMBEtZSVYxDTALBgNVBAcMBEtZSVYx |
|||
CzAJBgNVBAoMAlRCMQswCQYDVQQLDAJUQjESMBAGA1UEAwwJbG9jYWxob3N0MB4X |
|||
DTIzMTAxMzEyMzcwMVoXDTI0MTAxMjEyMzcwMVowWTELMAkGA1UEBhMCVUExDTAL |
|||
BgNVBAgMBEtZSVYxDTALBgNVBAcMBEtZSVYxCzAJBgNVBAoMAlRCMQswCQYDVQQL |
|||
DAJUQjESMBAGA1UEAwwJbG9jYWxob3N0MIICIjANBgkqhkiG9w0BAQEFAAOCAg8A |
|||
MIICCgKCAgEAsHn27cH+pYFI0eJYer8ww29g/xlKgr9aarYlkILeXnBhPPHBCXG+ |
|||
FegeMpHa8FUPANIqYJiwM13altO6hMLPa0J7+nQhwF5NCbxzAdi/kU8ofhIwJH+K |
|||
gOsD3BKdR7Ua7KMDQFnGTFRR9ZxsuYZ/0AHuzPHwxSLUvvMbiWbu5P2FYMrEyyLo |
|||
uVVihZPkeBhcnI6SJRyCdMdMy282nWQ+47gAUI3cFa7dXxUcXvRbbToMNPTIDUy4 |
|||
VhxJYhL4T6ED0Ds7tZRsG71LcMfw2RQUgiS1FuYh+O7N8lUMukMy2/umQluM0+qB |
|||
CYWa2p1UCbVzlrW1qgKQm1Q8E91XSR9KL/zdO8m9/uNeI1jyJu6i1cibWR7gnh6J |
|||
ChLxouQlrBzuLzSz7PG8q1MOWi+oHYJWSvmsckbQDhwEsfhFrYVgndJdxnmlkzvS |
|||
1OP7RGSYXLfMF+ZxC2YEJiU65QACCl2IHknyNiL8Jg5ahXgZMNshyfvOv5RB5jnz |
|||
4vzRpGhUYCcyLzORT+5gY9ZYbX/51cOomQV1ryTTQs+zA8mfEVLjbbLqvYdI84LC |
|||
3chMdcOm8Z9U1xdb2FX/c724XDyPnQNy1PLggzqvOFZzLeey0nBVUWyVrcCydbS5 |
|||
PAvVoAucO8kqP6b7uB7QnDeGaCiAVF+9QaXxjyQEdLEu3z5JMM7uH4UCAwEAAaNT |
|||
MFEwHQYDVR0OBBYEFHXrT3L+O3kJ2xNZ4Lh1ThGG6M1vMB8GA1UdIwQYMBaAFHXr |
|||
T3L+O3kJ2xNZ4Lh1ThGG6M1vMA8GA1UdEwEB/wQFMAMBAf8wDQYJKoZIhvcNAQEL |
|||
BQADggIBABFDqkTdxsJyu5L2x3WSpw4jw4vgJYlgUvTSeU8i54DaSzncLZdpWsqb |
|||
37LFHkvlquIfvOi9f9EBT2KuZwaajPQBNE4m7kLchoAv8Mc8a2EXhN2caXamnN3F |
|||
vWAb4QW/VHKKz2vWprfARwqQO58TEPgzU4FcW1lPpX2ULBeoS5kZDDEgyfaFZETF |
|||
FnsSb9E3/YuH6sJCu880kbW8BIyQmbUytrbn+16J/iaZBwc+iD49t2VBLDOsr1x4 |
|||
5qzxknG3h9wiz9ob9v6hWFfMpdiK12S0P5FVsUkCpxoae8jc8rPS7W3HaYowFjVR |
|||
OHOjtWy5/SV2rypKShjg9manf6iwGdTGkD0qoqsRs9JQFabjNR23IQv+1OUbrEVC |
|||
DbS65IjwLJlIZBX8JuJaU3I8zqj/9q7TtRDp1NCiG5W0NgipERRCciWaLJ+Fz6Lu |
|||
QzhI2ZOJrl49hmr6e0bsyNUv9l89WcbKm3/IC+V7o80uADYCOaz2jDGfKbvcPHzN |
|||
mTma8qVsjpcedttsvNMyZOsM/Rpk+dbChgReRVvcmzQV0izEvJJBWFr4HrfcM6Ev |
|||
sZrnUiT8ENUZqiK40d+T3Q6JheHwm+ENI1aUDkYCpoWZ/PzKe+Bj8lR8dPvmeVrc |
|||
eiwS37nMFO/5X7aIkszTouScNO99cN0UqPldfJo+8ZTbai5VFxGD |
|||
-----END CERTIFICATE----- |
|||
@ -0,0 +1,22 @@ |
|||
-----BEGIN CERTIFICATE----- |
|||
MIIDkzCCAnugAwIBAgIUKAylzm/K5OfbXSjm1zY9bX1a8HQwDQYJKoZIhvcNAQEL |
|||
BQAwWTELMAkGA1UEBhMCVUExDTALBgNVBAgMBEtZSVYxDTALBgNVBAcMBEtZSVYx |
|||
CzAJBgNVBAoMAlRCMQswCQYDVQQLDAJUQjESMBAGA1UEAwwJbG9jYWxob3N0MB4X |
|||
DTIzMTAxNjIxMDQwNVoXDTI0MTAxNTIxMDQwNVowWTELMAkGA1UEBhMCVUExDTAL |
|||
BgNVBAgMBEtZSVYxDTALBgNVBAcMBEtZSVYxCzAJBgNVBAoMAlRCMQswCQYDVQQL |
|||
DAJUQjESMBAGA1UEAwwJbG9jYWxob3N0MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8A |
|||
MIIBCgKCAQEA87nLliszEWml8QvyAC+H80NZCxf4TcG826NBOp0AUPJ8xQBHCzc1 |
|||
t1ohVm2/fn2VJZAYXG2xSVcHyXjjjv3iGLE2AIDbXh06/yFg4TVjlbrWrAHFehyN |
|||
FwrK8ez36oGLa3ZVq+mx1fLfBQw5mStbh09NXmKTzqP6m9ggKtt63cUwoWdUTemT |
|||
qrjryJd69LiJi+MVqtbKO2j30/lgAZmaHtbojl9EcvWfeXLb20TnXRIctaIS1VGo |
|||
SluzjbNQErdN/VRW4RAOP6UFsK0xID2EuLODBmAWnI49fXO/OS+u3Kd3suABE0o9 |
|||
slfDXqNTp0r5N0OoSAFcc4EsV3+9Gf+mqwIDAQABo1MwUTAdBgNVHQ4EFgQUhS5K |
|||
XQDxGvaBCpKY1de+JZl8zjYwHwYDVR0jBBgwFoAUhS5KXQDxGvaBCpKY1de+JZl8 |
|||
zjYwDwYDVR0TAQH/BAUwAwEB/zANBgkqhkiG9w0BAQsFAAOCAQEAxez4vLtBCBNM |
|||
l6AQghViNAR9iiwYMxUwKwlU+uZftRGnT+6dXgfTR3PV6LCfMMmtuNs0JTGy0ff8 |
|||
erbzfZxExvHfIFXCwepwTWawQhvRRn9GHOJXIzESDRRhsXoJDzd0JVOx0wWxp1cz |
|||
EUts+ZbKLoC+kIhsOGY+0a+sopeV2rMO5bUMpA8P0mKZlGynEGMLzKxz65E/IA9h |
|||
EQKpJjpvYfN+7eUkF6ZRXNV2LI/8BCoG6mOVoOMEXnloPwwBtOevoCB43U3sT9Er |
|||
WQWgZdbeI4gEyEqgMTibNogZZF0KW+5as3iv7avDd8pCgONvD0iwKSlvi9RNjiw8 |
|||
p6bwNmBcuA== |
|||
-----END CERTIFICATE----- |
|||
@ -0,0 +1,30 @@ |
|||
-----BEGIN ENCRYPTED PRIVATE KEY----- |
|||
MIIFLTBXBgkqhkiG9w0BBQ0wSjApBgkqhkiG9w0BBQwwHAQILTHGLs8mGUkCAggA |
|||
MAwGCCqGSIb3DQIJBQAwHQYJYIZIAWUDBAEqBBAUnb+mChJ9Wu9F7q6ingLYBIIE |
|||
0Mwe8Zl6fs5kiT1AL7gXrSSXmyOVvxDFt0V3TX1w399VIadcUUO0RQeEqXoMEUzl |
|||
5at99Xmoo7ByvZSPWCdV4d/j5Yw+Z15euxzclSZJnmBgvQx8cFPLCTTaqlgv5r/Y |
|||
lTzBrczbgruMFKtzkvHgvZYiagccOtFHDNC1fUBcUR8dkOOsgTiy4QCVo5kkXHt/ |
|||
rblE/uVWI8/E318WZBZaz68HcmGIG6ivdMEsKskSKrH6zA3eLjyGB+zSAIPRB/Mp |
|||
s7Rj+RK74zFSYyaq6fgdTG8lug2f3rHImSOtQThcfme5XL4P66rUgJsh/sml1vqH |
|||
e848VArGoVy3wfvkss6CyXIJevhFh2xVWRyVqG1nraw2QssEnVqIZvdAnaJJONX8 |
|||
r1trjHkZ1JD0nO9Mns1c/bw6hjK6W3UwGfgEMM9VQ7wNI2B6CFOXHKTHg6r3us4k |
|||
UqaQtfbpTv+d0YKF/rolDcK+mK/rkxP3rtJA7Ud8nQ4VjxyYX4jTs3/BzDkP7Tsj |
|||
5gKy9e1zuTF+MUWs3G5oKGQUKVcbgoYJ+iOqgVSd1JbecRo7Pl8XgDv3I9RWHzUr |
|||
EAMjVJjRU9tJuPvILFBkpl1/OPC9sGxJz9Hy9qLtEGhGLUhNz6XmIy/aWPCyA4ea |
|||
ZES/n7f+aYmXIxulcxS7MUejkwl1EtNqVyrKvLiRBXjBk2HPCb7Te8fRu/LpHZXN |
|||
D7wjymg1fGZPPFzdKh7wMdAKiK50KIMGXTxS6kHb6qW/755oSUjWRLPGcPCfdbjn |
|||
UiC5WC9FCog6jfRq1rMlz5b8yjyb+UbJ6N4qFSHeQf+7WLeS0Di8k3cLDSWl6T7M |
|||
z82ePof2V2TrADNpXvAcR78uiDfUpfa7DhkimvbBZpRVaaQVU7unxUPVc93WgCWV |
|||
a7kBuFJAGt+Wt1LPPD/5KOQ5pRINSoh4VhiZzmnY/m7RDPWEaL5gsMjF5bFoP2UZ |
|||
MuyAiTmvO299lJDrdQyQds7yafO9PrTE4msuqpuZSHW7ZZIdRO6EXlfZ1We3icWr |
|||
+jE47bUIEl04k1PvXyv9LeoqlHZJTagxZIerMOEwq976MaVR7RJbqUpRUV9FFNCL |
|||
gTouPCwUcVtLCaTYQjz/+12/YeVkiBHIWkI8Vv5Mn3Vkwy303ygGCQ+brht1e8x+ |
|||
BbgzSpiX8aHiEuDAKooewxnKrf3Dk9BcwbnftxajOZcZ3iphk07t5VLRy86zLKCq |
|||
ZOY+KymcDCGaOPnSHFrZK3lZOOT+BB9Vi6EYAkxZCZgoDsb/voMEdpPlxK7ultf6 |
|||
is5/JQeQbeP9wbNh4Ru2x3p5Ir4wffhh1KT3UsMobusosTo55ErhMHPvH5amppwq |
|||
IrxdM7heo7JMaNKmtol4y45IqSt58iluF5m2Ds4m85xjDteRgEOjtNBStFxPCMAB |
|||
KUEzRxEaplAcJfzYzpYtoHZuZ8W3Gi7yeXQ+BV8Q9DeaZc5DDDhIcIkOoHOAKhit |
|||
d7Gpr8hpwc60AgHRjua8OdbhM4ntT1xnyDEqZbP8mN+UBAohOHMrqo+f4DL1ibB9 |
|||
qSwfdLiVItsEBqlfANV3i9rEeKNH5tOFwFCmmH1yBCSDCtWPmPUJ5tZae6fIetK5 |
|||
uSstFXLaDpm6fcHgkeqrnyteWpnk5X5SQQ+fMHPjQ2vp |
|||
-----END ENCRYPTED PRIVATE KEY----- |
|||
@ -0,0 +1,22 @@ |
|||
-----BEGIN CERTIFICATE----- |
|||
MIIDkzCCAnugAwIBAgIUIo+5l07ZrQR/LxEEmUbnn4yxCwIwDQYJKoZIhvcNAQEL |
|||
BQAwWTELMAkGA1UEBhMCVUExDTALBgNVBAgMBEtZSVYxDTALBgNVBAcMBEtZSVYx |
|||
CzAJBgNVBAoMAlRCMQswCQYDVQQLDAJUQjESMBAGA1UEAwwJbG9jYWxob3N0MB4X |
|||
DTIzMTAxNjIxMDMxNVoXDTI0MTAxNTIxMDMxNVowWTELMAkGA1UEBhMCVUExDTAL |
|||
BgNVBAgMBEtZSVYxDTALBgNVBAcMBEtZSVYxCzAJBgNVBAoMAlRCMQswCQYDVQQL |
|||
DAJUQjESMBAGA1UEAwwJbG9jYWxob3N0MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8A |
|||
MIIBCgKCAQEA87nLliszEWml8QvyAC+H80NZCxf4TcG826NBOp0AUPJ8xQBHCzc1 |
|||
t1ohVm2/fn2VJZAYXG2xSVcHyXjjjv3iGLE2AIDbXh06/yFg4TVjlbrWrAHFehyN |
|||
FwrK8ez36oGLa3ZVq+mx1fLfBQw5mStbh09NXmKTzqP6m9ggKtt63cUwoWdUTemT |
|||
qrjryJd69LiJi+MVqtbKO2j30/lgAZmaHtbojl9EcvWfeXLb20TnXRIctaIS1VGo |
|||
SluzjbNQErdN/VRW4RAOP6UFsK0xID2EuLODBmAWnI49fXO/OS+u3Kd3suABE0o9 |
|||
slfDXqNTp0r5N0OoSAFcc4EsV3+9Gf+mqwIDAQABo1MwUTAdBgNVHQ4EFgQUhS5K |
|||
XQDxGvaBCpKY1de+JZl8zjYwHwYDVR0jBBgwFoAUhS5KXQDxGvaBCpKY1de+JZl8 |
|||
zjYwDwYDVR0TAQH/BAUwAwEB/zANBgkqhkiG9w0BAQsFAAOCAQEAiUTgjnsVIg90 |
|||
Dm+XSlscIPbZEj/mJanoFFfAfbVJz1DadygG9viVUMf3jVQBcsJGeBDckR2b3OHY |
|||
82cQVpdu3Heqld+gnfsyi8QBi7EdK4i0q8NVqFgpw83KxNm9xt7xrgHtxhE0kWfW |
|||
dpTgeIu0hFf0qLUObw/g8+0awBuxNY2crLtLXQM/dRgtv5Zt/DilW3jMLAE5wke+ |
|||
/HM4/emOJO6DSI9BC8iUsmNpIpq45267jcjpczNBo3ap7Bad+jM/paRDng9Uavvr |
|||
VCsaJFaL5HG6TtNXN60npBouOWnivPzUeuTI4PnjGRgdp3lgb0IuXbuwxIW6FVG/ |
|||
73RHc0gGOA== |
|||
-----END CERTIFICATE----- |
|||
@ -0,0 +1,30 @@ |
|||
-----BEGIN RSA PRIVATE KEY----- |
|||
Proc-Type: 4,ENCRYPTED |
|||
DEK-Info: DES-EDE3-CBC,FB5DE36A7A8B25DA |
|||
|
|||
skR15rUvZmdLzGqU5/BF4Yc3E6dxtXTvlOhuGnqH/idItMKUMWIIlQ7ZfWYF/CrC |
|||
CkeAUqhF4y+y+2eR4ejUzZKs6bYjTtkXAXAqQvCsTrTBdQSCcLwbHWLWMro0UG24 |
|||
e23Rx8kD0YC7VHqyr08NlLh94wR7kanhEeRUbmKBonZT/I9AZ5ntiBxq9QVBtc8M |
|||
f7LKIsnQDcd39cVXSo3LOJ/x7YVB//Ln1R1dexwxE0sXOyLq2hhrxzfHGuGXXW41 |
|||
/3+CeTgmX9Rzpawrq9vbVabPUgFcJlrogNRSnUAm9kz4b2zadCxEaCejVmhBy4wx |
|||
z2AJGcmE+D4VkQK1AAj5+AQQrPOIIFQnyGjwHkJSTGVTcKmttRYZBvUjdfQfj1fK |
|||
NsKOSZLzZGknM8Pz5MHgHqk70C7f+nm0uVhVuAiykA4PY4JdCAuTWJxMM2LWM3/q |
|||
rYCEMwxCGa6U92dakfC+W+d9pAbeN+xYOWkDrqG7BdAYg1P70cuMRPdd5bsq3YPp |
|||
G4n5NVyQvLlocGhZgC7NVzUtc18+rGblV4D657+GZwJnBZJN4TYey4+r2D5fv9rO |
|||
kcRVwfR704BbUBkjIzVzD7nXtBbr3ni8HSqde3g4aVL4WyV9XNvjUsYyrZ0u9Mt1 |
|||
IccAsa1xBquUNMxwO1H1mFLtzPKmFzKtlzqiiDsRQoRylwUa1k03sHKUflZRa8Sf |
|||
g4MpTRzK+vo1opMemlonty5YbvWXKlH68ioo49L8N457Y0hIUJOQgywg80NxT33t |
|||
x8y66lawd1Iv+Q7pptVxJtA4JmcdPvGwBLJZY4DacMyp/JqchAQfSQfmQ0tC+RfJ |
|||
z2By/s5wOEuVDksgp8RF1gn+VmvLyOoLK7tq+zpMO1mhfYTCMgSiz2GkNdiW6i25 |
|||
gjNWN/F62YL+9VJo4+olrcsYDFiiJq+deQk3H1tQJzu6qECfDqKDyw7IunvTwFil |
|||
5/d43LvLbRj75Kf/++xwTjfHudeTgw02/yPyELnURkUazvkOFsn7n8tU46Qm5TWh |
|||
fPFXSYxRf3m9rkKZB98YOJo595RuZyiYg9dEQX8Gybl1/7H7l4Cvw6yp71kgLrrI |
|||
JRYEt9pmWbQX97UFC3WTVMdKWakFziYVGPvFKkIzrHgcutbQVNsZ7GbO+rdWMIxr |
|||
SfUe6jCEclzGQjI9Ep4PTLjZvbusUMkoUjGasAluXFXDC4RKtpuXd4RmbOLdVuyN |
|||
OnZ5KZHFjrv4ch5PakRTViWFWSddV5CJ4fMkCG9qUHKrUWGjOvzu5rbcUzq3xJZG |
|||
9loIvlA4ekEAhQPHwx69uBmUwnCgyB9CosQGmUlwmC3KALA/EiXklTA6w0fGiiPk |
|||
uLA9oBGrVcD8Peug9Owfmj4fWbxJGP7x1UR/nZWpynIfzME0AD8MK5uqoWmQTG8I |
|||
cLSjVAB1CO//AZe4LpYQulMPq4dipmE+YnKLi0WPXuZVARDciAGGV2BfH6Iv8j9k |
|||
o9IoklcpGg22zXLoGn4tu+7Y5GcoV/mx68Gun1E/QFuY59damAqoc82EEsxr+UX0 |
|||
4vGX+KZn283xSYiilE3qpCZOER0ZUFInphUwJzzYfW3mW/AWR78tFQIQiuVKV2/a |
|||
-----END RSA PRIVATE KEY----- |
|||
@ -0,0 +1,52 @@ |
|||
-----BEGIN PRIVATE KEY----- |
|||
MIIJQgIBADANBgkqhkiG9w0BAQEFAASCCSwwggkoAgEAAoICAQCwefbtwf6lgUjR |
|||
4lh6vzDDb2D/GUqCv1pqtiWQgt5ecGE88cEJcb4V6B4ykdrwVQ8A0ipgmLAzXdqW |
|||
07qEws9rQnv6dCHAXk0JvHMB2L+RTyh+EjAkf4qA6wPcEp1HtRrsowNAWcZMVFH1 |
|||
nGy5hn/QAe7M8fDFItS+8xuJZu7k/YVgysTLIui5VWKFk+R4GFycjpIlHIJ0x0zL |
|||
bzadZD7juABQjdwVrt1fFRxe9FttOgw09MgNTLhWHEliEvhPoQPQOzu1lGwbvUtw |
|||
x/DZFBSCJLUW5iH47s3yVQy6QzLb+6ZCW4zT6oEJhZranVQJtXOWtbWqApCbVDwT |
|||
3VdJH0ov/N07yb3+414jWPIm7qLVyJtZHuCeHokKEvGi5CWsHO4vNLPs8byrUw5a |
|||
L6gdglZK+axyRtAOHASx+EWthWCd0l3GeaWTO9LU4/tEZJhct8wX5nELZgQmJTrl |
|||
AAIKXYgeSfI2IvwmDlqFeBkw2yHJ+86/lEHmOfPi/NGkaFRgJzIvM5FP7mBj1lht |
|||
f/nVw6iZBXWvJNNCz7MDyZ8RUuNtsuq9h0jzgsLdyEx1w6bxn1TXF1vYVf9zvbhc |
|||
PI+dA3LU8uCDOq84VnMt57LScFVRbJWtwLJ1tLk8C9WgC5w7ySo/pvu4HtCcN4Zo |
|||
KIBUX71BpfGPJAR0sS7fPkkwzu4fhQIDAQABAoICABm8z+yA/Hh60Hn7vte4Bo6a |
|||
MdVChQFokvE5O2VGENRJI4VV5MdR1V0wiybo6rteTF/cRt3rptb2+yhAHNW767BC |
|||
8/3k7f82QZoH5+X/DIFOwCMS1/6as0J2BAwWkuWgXhrg81pxPWBoc8OUWq78FKvr |
|||
fD5bkrfNiqWGox946aJv7wHc0LKnlrVg5IuCtDFnrCoRCPNsowIRBvwsbhSqSBnB |
|||
/hnBdrWa2SJC2+5lSOg3LQyUHpEB/Whhm7o39gr2+q1l1iF3UgUBqHz8S/381bjd |
|||
TaPXUGETwulyyfZoUoSOwQKwg2tsqgEPgTQc+eKomgEC40m2MgzVTiW/hDlf3NvA |
|||
aEaUUU1izN/t8tuXS/UBWSsVwPeVm2oPTWTVovoj9PFMSLrJ4oM9iMHl27lKP/Xt |
|||
aShnCIu0qwVSLqwCM0HxZNZLEvJIFJe3OV7dvFlbnMiEOlDsz4k2sylFdICOOqxC |
|||
Nb61hX8n6iYmAID9hahExOAFcJfpV/MrGnF1IfNDdOim5az1k0PUZlA+50NLjOzK |
|||
umfAQpsa7ZUpjfNq6HkX5mhJXelvv+pWuvbBogG10nio13I4J/YwydC/0qaijrhp |
|||
XTuV3Or2HhGr5Fe9lpzrnWB90q7iqAgGzevds8AagE0EdUIFupx6vr28Gb3mmlvD |
|||
yObUj/9cB+eYIae1jmzJAoIBAQC9pN8I0ltyR5IuuPBcEZcqdrRV04iwZylxEcYy |
|||
TIj1YBYvU6LP8AOg671Q4vGXOCo1uq/UsCzZMPn5yFa5fMg2AHu2B51nM/NILvD0 |
|||
QCgpyvNV64q+Mci9VWoctWZs93QiQyUKe1c/vUMYlWsYXbz/8osChX+r5doxBIQx |
|||
w60aXr9FLKfVfYfBfn1nakdjOVVvAFDPyoV6Dmfn/cAfPH50NIeOGYRpVTPHwHYr |
|||
ZCcIRW9MIzmS00GogAH1BM8JjRr0F9F+rRESeJKdLSqJbgLy2LtJnvU8YjlRUVWO |
|||
FLzqaUyT2PJS9Vqbohrlk52Znq5Gl+SbGhSEx+oTH3JM/n0jAoIBAQDuOZ8bqtRY |
|||
p4BuBOPOaiHRIor+ng48m+nhXec4TuKUlwKLFJHXu+lsEZfs7BWijbRqMH4I5GZl |
|||
10EmE4mpkp843kqbFi71s7l9xWnM7jgXWSauH0O4Cleq8/9l2ZOissYzakLhNz3C |
|||
9IT0JcnHFmPOPlH4McjoKM3zWDniKI2fRn9q4DAEvRxDuB6PYxbz2NY2OAVI6xSA |
|||
bNevnyYA0bwvWeigr6dNCAs1z9QwX2oDGfIEUk3ixdGIqIkpL3WcVPXva2hRPAm4 |
|||
1gaI39+q86rEPiZJWfpasUEBY2Ho1eyENqaPTGHCTfBq5fMVntOVhs79TbQ+s70c |
|||
1Wyfo6sjHh83AoIBAQCuqdLhhRzEPDbe4WY+5dScP4gIJDOYhOseQIiSevsJQ94q |
|||
6JTjfuNYqsZKYTqxVAFMSwz2juw/fWQ+Mc3uOIcNdZR7KrhF/QrsSI+T5iMXmtxT |
|||
HgVC9wczmh+JIWmcoqxLghvzc3YANog9dCCW6H7SHMj7IYldAO3ch5RZYSdlSi5P |
|||
v7k0X9FQ3PcS8Eefk4akHV5Qgu48ZFg+yu7P1h+BV4Ah2E6j1N1D9Hbhr/RjIdBI |
|||
B4lXOUsXrg4fZLZqzZMtjWJdkXhP0sz2BktPGAuPLx4PyF+FpdG0m3x4x5DXNPRa |
|||
l01YKrGw9bRgDXzxp7xLOEpMr9CGGrnzstrLHviRAoIBAFoyuwmQvuHqWfhOJasM |
|||
CE3VFGeflKhiKEXKdjedtrCoFLBwU2ApqBHg/3MXWIG5wavLPI1FXXgF7obqMt9f |
|||
wqWXlQvvdExXhk4Wpx6Ou/IrMTgQYmWWlOcHh5YasYmSwvTIsRXxApOEXarLfADD |
|||
e3qlogelYfp1KLWQnCoDTMwXtzrSM5w3tjH1zqxfylr9qO3SfD3FtHeDvo6iZZM9 |
|||
1lDfa/MbTu8dspDnZeIC3nLaKgZ020SXveROW9CaRZ+xk4TZWCAZ6VxwvPyqN1fU |
|||
9r1jAsAXL3GTV5ec939fMDRHNP1g4Erfk74F3uo6vsYIyuqhtzNefqYiMQSoxa2A |
|||
RDUCggEAFlN3ih1gpyvErW4Vy/wUd1ckSH/lojlNjbbyXocKE2eiUnJUwTPerVwX |
|||
dI8vqvlPohfDIZqfuBVV+8hiJQGeMiAts6roTQ0pu/w1+euQ4DsOpzUErqadVSOj |
|||
h8SpvfxDxrftZSMaN6F7g0Pxlix6qt79XH3Kpfzf9BGOfCG7lslXRAjfuk+HUptK |
|||
PijoVHwMwFuZVlN8GBh3uzg+wvME92c4Vr1tHpwqjTqDwZN4RmdnrfGdDb1HJUJW |
|||
kv+fD65qKnJz1fZ0RTAcWv4bVFi5GJhZarXD3Vr3C5SH8zNZxDeR29OrSuG7+23g |
|||
wOqb/axEbvcj5sV6/4p2zz6AzFPEmQ== |
|||
-----END PRIVATE KEY----- |
|||
Loading…
Reference in new issue