|
|
|
@ -31,11 +31,26 @@ import org.springframework.test.web.servlet.ResultActions; |
|
|
|
import org.thingsboard.common.util.ThingsBoardExecutors; |
|
|
|
import org.thingsboard.server.common.data.Tenant; |
|
|
|
import org.thingsboard.server.common.data.TenantInfo; |
|
|
|
import org.thingsboard.server.common.data.TenantProfile; |
|
|
|
import org.thingsboard.server.common.data.User; |
|
|
|
import org.thingsboard.server.common.data.page.PageData; |
|
|
|
import org.thingsboard.server.common.data.page.PageLink; |
|
|
|
import org.thingsboard.server.common.data.queue.ProcessingStrategy; |
|
|
|
import org.thingsboard.server.common.data.queue.ProcessingStrategyType; |
|
|
|
import org.thingsboard.server.common.data.queue.Queue; |
|
|
|
import org.thingsboard.server.common.data.queue.SubmitStrategy; |
|
|
|
import org.thingsboard.server.common.data.queue.SubmitStrategyType; |
|
|
|
import org.thingsboard.server.common.data.security.Authority; |
|
|
|
import org.thingsboard.server.common.data.tenant.profile.DefaultTenantProfileConfiguration; |
|
|
|
import org.thingsboard.server.common.data.tenant.profile.TenantProfileData; |
|
|
|
import org.thingsboard.server.common.data.tenant.profile.TenantProfileQueueConfiguration; |
|
|
|
|
|
|
|
import java.util.ArrayList; |
|
|
|
import java.util.Comparator; |
|
|
|
import java.util.HashMap; |
|
|
|
import java.util.List; |
|
|
|
import java.util.Map; |
|
|
|
import java.util.Random; |
|
|
|
import java.util.concurrent.TimeUnit; |
|
|
|
import java.util.stream.Collectors; |
|
|
|
|
|
|
|
@ -328,4 +343,180 @@ public abstract class BaseTenantControllerTest extends AbstractControllerTest { |
|
|
|
return Futures.allAsList(futures); |
|
|
|
} |
|
|
|
|
|
|
|
@Test |
|
|
|
public void testUpdateQueueConfigForIsolatedTenant() throws Exception { |
|
|
|
Comparator<Queue> queueComparator = Comparator.comparing(Queue::getName); |
|
|
|
final String username = "isolatedtenant@thingsboard.org"; |
|
|
|
final String password = "123456"; |
|
|
|
loginSysAdmin(); |
|
|
|
|
|
|
|
List<Queue> sysAdminQueues; |
|
|
|
PageLink pageLink = new PageLink(10); |
|
|
|
PageData<Queue> pageData; |
|
|
|
pageData = doGetTypedWithPageLink("/api/queues?serviceType=TB_RULE_ENGINE&", new TypeReference<>() { |
|
|
|
}, pageLink); |
|
|
|
sysAdminQueues = pageData.getData(); |
|
|
|
|
|
|
|
Tenant tenant = new Tenant(); |
|
|
|
tenant.setTitle("Isolated tenant"); |
|
|
|
tenant = doPost("/api/tenant", tenant, Tenant.class); |
|
|
|
|
|
|
|
User tenantUser = new User(); |
|
|
|
tenantUser.setAuthority(Authority.TENANT_ADMIN); |
|
|
|
tenantUser.setTenantId(tenant.getId()); |
|
|
|
tenantUser.setEmail(username); |
|
|
|
createUserAndLogin(tenantUser, password); |
|
|
|
|
|
|
|
List<Queue> foundTenantQueues; |
|
|
|
|
|
|
|
pageLink = new PageLink(10); |
|
|
|
pageData = doGetTypedWithPageLink("/api/queues?serviceType=TB_RULE_ENGINE&", new TypeReference<>() {}, pageLink); |
|
|
|
foundTenantQueues = pageData.getData(); |
|
|
|
|
|
|
|
Assert.assertEquals(sysAdminQueues, foundTenantQueues); |
|
|
|
|
|
|
|
loginSysAdmin(); |
|
|
|
|
|
|
|
TenantProfile tenantProfile = new TenantProfile(); |
|
|
|
tenantProfile.setName("isolated-tb-rule-engine"); |
|
|
|
TenantProfileData tenantProfileData = new TenantProfileData(); |
|
|
|
tenantProfileData.setConfiguration(new DefaultTenantProfileConfiguration()); |
|
|
|
tenantProfile.setProfileData(tenantProfileData); |
|
|
|
tenantProfile.setIsolatedTbRuleEngine(true); |
|
|
|
addQueueConfig(tenantProfile, "Main"); |
|
|
|
addQueueConfig(tenantProfile, "Test"); |
|
|
|
tenantProfile = doPost("/api/tenantProfile", tenantProfile, TenantProfile.class); |
|
|
|
|
|
|
|
tenant.setTenantProfileId(tenantProfile.getId()); |
|
|
|
doPost("/api/tenant", tenant, Tenant.class); |
|
|
|
|
|
|
|
login(username, password); |
|
|
|
|
|
|
|
pageLink = new PageLink(10); |
|
|
|
pageData = doGetTypedWithPageLink("/api/queues?serviceType=TB_RULE_ENGINE&", new TypeReference<>() {}, pageLink); |
|
|
|
foundTenantQueues = pageData.getData(); |
|
|
|
|
|
|
|
Assert.assertEquals(2, foundTenantQueues.size()); |
|
|
|
|
|
|
|
List<Queue> queuesFromConfig = getQueuesFromConfig(tenantProfile.getProfileData().getQueueConfiguration(), foundTenantQueues); |
|
|
|
queuesFromConfig.sort(queueComparator); |
|
|
|
foundTenantQueues.sort(queueComparator); |
|
|
|
|
|
|
|
Assert.assertEquals(queuesFromConfig, foundTenantQueues); |
|
|
|
|
|
|
|
loginSysAdmin(); |
|
|
|
|
|
|
|
TenantProfile tenantProfile2 = new TenantProfile(); |
|
|
|
tenantProfile2.setName("isolated-tb-rule-engine2"); |
|
|
|
TenantProfileData tenantProfileData2 = new TenantProfileData(); |
|
|
|
tenantProfileData2.setConfiguration(new DefaultTenantProfileConfiguration()); |
|
|
|
tenantProfile2.setProfileData(tenantProfileData2); |
|
|
|
tenantProfile2.setIsolatedTbRuleEngine(true); |
|
|
|
addQueueConfig(tenantProfile2, "Main"); |
|
|
|
addQueueConfig(tenantProfile2, "Test"); |
|
|
|
addQueueConfig(tenantProfile2, "Test2"); |
|
|
|
tenantProfile2 = doPost("/api/tenantProfile", tenantProfile2, TenantProfile.class); |
|
|
|
|
|
|
|
tenant.setTenantProfileId(tenantProfile2.getId()); |
|
|
|
doPost("/api/tenant", tenant, Tenant.class); |
|
|
|
|
|
|
|
login(username, password); |
|
|
|
|
|
|
|
pageLink = new PageLink(10); |
|
|
|
pageData = doGetTypedWithPageLink("/api/queues?serviceType=TB_RULE_ENGINE&", new TypeReference<>() {}, pageLink); |
|
|
|
foundTenantQueues = pageData.getData(); |
|
|
|
|
|
|
|
Assert.assertEquals(3, foundTenantQueues.size()); |
|
|
|
|
|
|
|
queuesFromConfig = getQueuesFromConfig(tenantProfile2.getProfileData().getQueueConfiguration(), foundTenantQueues); |
|
|
|
queuesFromConfig.sort(queueComparator); |
|
|
|
foundTenantQueues.sort(queueComparator); |
|
|
|
|
|
|
|
Assert.assertEquals(queuesFromConfig, foundTenantQueues); |
|
|
|
|
|
|
|
loginSysAdmin(); |
|
|
|
|
|
|
|
tenantProfile2.getProfileData().getQueueConfiguration().removeIf(q -> q.getName().equals("Test")); |
|
|
|
tenantProfile2.getProfileData().getQueueConfiguration().removeIf(q -> q.getName().equals("Test2")); |
|
|
|
addQueueConfig(tenantProfile2, "Test2"); |
|
|
|
addQueueConfig(tenantProfile2, "Test3"); |
|
|
|
|
|
|
|
tenantProfile2 = doPost("/api/tenantProfile", tenantProfile2, TenantProfile.class); |
|
|
|
|
|
|
|
login(username, password); |
|
|
|
|
|
|
|
pageLink = new PageLink(10); |
|
|
|
pageData = doGetTypedWithPageLink("/api/queues?serviceType=TB_RULE_ENGINE&", new TypeReference<>() {}, pageLink); |
|
|
|
foundTenantQueues = pageData.getData(); |
|
|
|
|
|
|
|
Assert.assertEquals(3, foundTenantQueues.size()); |
|
|
|
|
|
|
|
queuesFromConfig = getQueuesFromConfig(tenantProfile2.getProfileData().getQueueConfiguration(), foundTenantQueues); |
|
|
|
queuesFromConfig.sort(queueComparator); |
|
|
|
foundTenantQueues.sort(queueComparator); |
|
|
|
|
|
|
|
Assert.assertEquals(queuesFromConfig, foundTenantQueues); |
|
|
|
|
|
|
|
loginSysAdmin(); |
|
|
|
|
|
|
|
tenant.setTenantProfileId(null); |
|
|
|
doPost("/api/tenant", tenant, Tenant.class); |
|
|
|
|
|
|
|
login(username, password); |
|
|
|
for (Queue queue : foundTenantQueues) { |
|
|
|
doGet("/api/queues/" + queue.getId()).andExpect(status().isNotFound()); |
|
|
|
} |
|
|
|
|
|
|
|
loginSysAdmin(); |
|
|
|
doDelete("/api/tenant/" + tenant.getId().getId().toString()).andExpect(status().isOk()); |
|
|
|
} |
|
|
|
|
|
|
|
private void addQueueConfig(TenantProfile tenantProfile, String queueName) { |
|
|
|
TenantProfileQueueConfiguration queueConfiguration = new TenantProfileQueueConfiguration(); |
|
|
|
queueConfiguration.setName(queueName); |
|
|
|
queueConfiguration.setTopic("tb_rule_engine." + queueName.toLowerCase()); |
|
|
|
queueConfiguration.setPollInterval(25); |
|
|
|
queueConfiguration.setPartitions(new Random().nextInt(100)); |
|
|
|
queueConfiguration.setConsumerPerPartition(true); |
|
|
|
queueConfiguration.setPackProcessingTimeout(2000); |
|
|
|
SubmitStrategy submitStrategy = new SubmitStrategy(); |
|
|
|
submitStrategy.setType(SubmitStrategyType.BURST); |
|
|
|
submitStrategy.setBatchSize(1000); |
|
|
|
queueConfiguration.setSubmitStrategy(submitStrategy); |
|
|
|
ProcessingStrategy processingStrategy = new ProcessingStrategy(); |
|
|
|
processingStrategy.setType(ProcessingStrategyType.SKIP_ALL_FAILURES); |
|
|
|
processingStrategy.setRetries(3); |
|
|
|
processingStrategy.setFailurePercentage(0); |
|
|
|
processingStrategy.setPauseBetweenRetries(3); |
|
|
|
processingStrategy.setMaxPauseBetweenRetries(3); |
|
|
|
queueConfiguration.setProcessingStrategy(processingStrategy); |
|
|
|
TenantProfileData profileData = tenantProfile.getProfileData(); |
|
|
|
|
|
|
|
List<TenantProfileQueueConfiguration> configs = profileData.getQueueConfiguration(); |
|
|
|
if (configs == null) { |
|
|
|
configs = new ArrayList<>(); |
|
|
|
} |
|
|
|
configs.add(queueConfiguration); |
|
|
|
profileData.setQueueConfiguration(configs); |
|
|
|
tenantProfile.setProfileData(profileData); |
|
|
|
} |
|
|
|
|
|
|
|
private List<Queue> getQueuesFromConfig(List<TenantProfileQueueConfiguration> queueConfiguration, List<Queue> queues) { |
|
|
|
List<Queue> result = new ArrayList<>(); |
|
|
|
Map<String, Queue> queueMap = new HashMap<>(); |
|
|
|
for (Queue queue : queues) { |
|
|
|
queueMap.put(queue.getName(), queue); |
|
|
|
} |
|
|
|
|
|
|
|
for (TenantProfileQueueConfiguration config : queueConfiguration) { |
|
|
|
Queue queue = queueMap.get(config.getName()); |
|
|
|
if (queue != null) { |
|
|
|
Queue expectedQueue = new Queue(queue.getTenantId(), config); |
|
|
|
expectedQueue.setId(queue.getId()); |
|
|
|
expectedQueue.setCreatedTime(queue.getCreatedTime()); |
|
|
|
result.add(queue); |
|
|
|
} |
|
|
|
} |
|
|
|
return result; |
|
|
|
} |
|
|
|
} |
|
|
|
|