Browse Source

Fix race condition on tenant creation; refactoring

pull/10472/head
ViacheslavKlimov 2 years ago
parent
commit
9d1f751f3a
  1. 89
      application/src/main/java/org/thingsboard/server/service/entitiy/EntityStateSourcingListener.java
  2. 18
      application/src/main/java/org/thingsboard/server/service/entitiy/tenant/DefaultTbTenantService.java
  3. 16
      application/src/main/java/org/thingsboard/server/service/install/InstallScripts.java
  4. 16
      application/src/test/java/org/thingsboard/server/controller/TenantControllerTest.java
  5. 3
      common/dao-api/src/main/java/org/thingsboard/server/dao/tenant/TenantService.java
  6. 3
      dao/src/main/java/org/thingsboard/server/dao/notification/DefaultNotificationSettingsService.java
  7. 31
      dao/src/main/java/org/thingsboard/server/dao/tenant/TenantServiceImpl.java

89
application/src/main/java/org/thingsboard/server/service/entitiy/EntityStateSourcingListener.java

@ -69,80 +69,71 @@ public class EntityStateSourcingListener {
@TransactionalEventListener(fallbackExecution = true)
public void handleEvent(SaveEntityEvent<?> event) {
log.trace("[{}] SaveEntityEvent called: {}", event.getTenantId(), event);
TenantId tenantId = event.getTenantId();
EntityId entityId = event.getEntityId();
EntityType entityType = entityId.getEntityType();
log.debug("[{}][{}][{}] Handling entity save event: {}", tenantId, entityType, entityId, event);
boolean isCreated = event.getCreated() != null && event.getCreated();
ComponentLifecycleEvent lifecycleEvent = isCreated ? ComponentLifecycleEvent.CREATED : ComponentLifecycleEvent.UPDATED;
switch (entityType) {
case ASSET:
case ASSET_PROFILE:
case ENTITY_VIEW:
case NOTIFICATION_RULE:
case ASSET, ASSET_PROFILE, ENTITY_VIEW, NOTIFICATION_RULE -> {
tbClusterService.broadcastEntityStateChangeEvent(tenantId, entityId, lifecycleEvent);
break;
case RULE_CHAIN:
}
case RULE_CHAIN -> {
RuleChain ruleChain = (RuleChain) event.getEntity();
if (RuleChainType.CORE.equals(ruleChain.getType())) {
tbClusterService.broadcastEntityStateChangeEvent(ruleChain.getTenantId(), ruleChain.getId(), lifecycleEvent);
}
break;
case TENANT:
}
case TENANT -> {
Tenant tenant = (Tenant) event.getEntity();
onTenantUpdate(tenant, lifecycleEvent);
break;
case TENANT_PROFILE:
}
case TENANT_PROFILE -> {
TenantProfile tenantProfile = (TenantProfile) event.getEntity();
onTenantProfileUpdate(tenantProfile, lifecycleEvent);
break;
case DEVICE:
}
case DEVICE -> {
onDeviceUpdate(event.getEntity(), event.getOldEntity());
break;
case DEVICE_PROFILE:
}
case DEVICE_PROFILE -> {
DeviceProfile deviceProfile = (DeviceProfile) event.getEntity();
onDeviceProfileUpdate(deviceProfile, event.getOldEntity(), isCreated);
break;
case EDGE:
}
case EDGE -> {
handleEdgeEvent(tenantId, entityId, event.getEntity(), lifecycleEvent);
break;
case TB_RESOURCE:
}
case TB_RESOURCE -> {
TbResource tbResource = (TbResource) event.getEntity();
tbClusterService.onResourceChange(tbResource, null);
break;
case API_USAGE_STATE:
}
case API_USAGE_STATE -> {
ApiUsageState apiUsageState = (ApiUsageState) event.getEntity();
tbClusterService.onApiStateChange(apiUsageState, null);
break;
default:
break;
}
default -> {}
}
}
@TransactionalEventListener(fallbackExecution = true)
public void handleEvent(DeleteEntityEvent<?> event) {
log.trace("[{}] DeleteEntityEvent called: {}", event.getTenantId(), event);
TenantId tenantId = event.getTenantId();
EntityId entityId = event.getEntityId();
EntityType entityType = entityId.getEntityType();
log.debug("[{}][{}][{}] Handling entity deletion event: {}", tenantId, entityType, entityId, event);
switch (entityType) {
case ASSET:
case ASSET_PROFILE:
case ENTITY_VIEW:
case CUSTOMER:
case EDGE:
case NOTIFICATION_RULE:
case ASSET, ASSET_PROFILE, ENTITY_VIEW, CUSTOMER, EDGE, NOTIFICATION_RULE -> {
tbClusterService.broadcastEntityStateChangeEvent(tenantId, entityId, ComponentLifecycleEvent.DELETED);
break;
case NOTIFICATION_REQUEST:
}
case NOTIFICATION_REQUEST -> {
NotificationRequest request = (NotificationRequest) event.getEntity();
if (request.isScheduled()) {
tbClusterService.broadcastEntityStateChangeEvent(tenantId, entityId, ComponentLifecycleEvent.DELETED);
}
break;
case RULE_CHAIN:
}
case RULE_CHAIN -> {
RuleChain ruleChain = (RuleChain) event.getEntity();
if (RuleChainType.CORE.equals(ruleChain.getType())) {
Set<RuleChainId> referencingRuleChainIds = JacksonUtil.fromString(event.getBody(), new TypeReference<>() {});
@ -152,29 +143,28 @@ public class EntityStateSourcingListener {
}
tbClusterService.broadcastEntityStateChangeEvent(tenantId, ruleChain.getId(), ComponentLifecycleEvent.DELETED);
}
break;
case TENANT:
}
case TENANT -> {
Tenant tenant = (Tenant) event.getEntity();
onTenantDeleted(tenant);
break;
case TENANT_PROFILE:
}
case TENANT_PROFILE -> {
TenantProfile tenantProfile = (TenantProfile) event.getEntity();
tbClusterService.onTenantProfileDelete(tenantProfile, null);
break;
case DEVICE:
}
case DEVICE -> {
Device device = (Device) event.getEntity();
tbClusterService.onDeviceDeleted(tenantId, device, null);
break;
case DEVICE_PROFILE:
}
case DEVICE_PROFILE -> {
DeviceProfile deviceProfile = (DeviceProfile) event.getEntity();
onDeviceProfileDelete(event.getTenantId(), event.getEntityId(), deviceProfile);
break;
case TB_RESOURCE:
}
case TB_RESOURCE -> {
TbResourceInfo tbResource = (TbResourceInfo) event.getEntity();
tbClusterService.onResourceDeleted(tbResource, null);
break;
default:
break;
}
default -> {}
}
}
@ -186,8 +176,7 @@ public class EntityStateSourcingListener {
&& event.getEntity() instanceof DeviceCredentials) {
tbClusterService.pushMsgToCore(new DeviceCredentialsUpdateNotificationMsg(event.getTenantId(),
(DeviceId) event.getEntityId(), (DeviceCredentials) event.getEntity()), null);
} else if (ActionType.ASSIGNED_TO_TENANT.equals(event.getActionType()) && event.getEntity() instanceof Device) {
Device device = (Device) event.getEntity();
} else if (ActionType.ASSIGNED_TO_TENANT.equals(event.getActionType()) && event.getEntity() instanceof Device device) {
Tenant tenant = JacksonUtil.fromString(event.getBody(), Tenant.class);
if (tenant != null) {
tbClusterService.onDeviceAssignedToTenant(tenant.getId(), device);

18
application/src/main/java/org/thingsboard/server/service/entitiy/tenant/DefaultTbTenantService.java

@ -16,12 +16,10 @@
package org.thingsboard.server.service.entitiy.tenant;
import lombok.RequiredArgsConstructor;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.stereotype.Service;
import org.thingsboard.server.common.data.Tenant;
import org.thingsboard.server.common.data.TenantProfile;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.dao.eventsourcing.SaveEntityEvent;
import org.thingsboard.server.dao.tenant.TbTenantProfileCache;
import org.thingsboard.server.dao.tenant.TenantProfileService;
import org.thingsboard.server.dao.tenant.TenantService;
@ -45,25 +43,19 @@ public class DefaultTbTenantService extends AbstractTbEntityService implements T
private final TbQueueService tbQueueService;
private final TenantProfileService tenantProfileService;
private final EntitiesVersionControlService versionControlService;
private final ApplicationEventPublisher eventPublisher;
@Override
public Tenant save(Tenant tenant) throws Exception {
boolean created = tenant.getId() == null;
Tenant oldTenant = !created ? tenantService.findTenantById(tenant.getId()) : null;
Tenant savedTenant = checkNotNull(tenantService.saveTenant(tenant, !created));
if (created) {
installScripts.createDefaultRuleChains(savedTenant.getId());
installScripts.createDefaultEdgeRuleChains(savedTenant.getId());
installScripts.createDefaultTenantDashboards(savedTenant.getId(), null);
}
Tenant savedTenant = tenantService.saveTenant(tenant, tenantId -> {
installScripts.createDefaultRuleChains(tenantId);
installScripts.createDefaultEdgeRuleChains(tenantId);
installScripts.createDefaultTenantDashboards(tenantId, null);
});
tenantProfileCache.evict(savedTenant.getId());
if (created) {
eventPublisher.publishEvent(SaveEntityEvent.builder().tenantId(TenantId.SYS_TENANT_ID).entityId(savedTenant.getId()).entity(savedTenant).created(true).build());
}
TenantProfile oldTenantProfile = oldTenant != null ? tenantProfileService.findTenantProfileById(TenantId.SYS_TENANT_ID, oldTenant.getTenantProfileId()) : null;
TenantProfile newTenantProfile = tenantProfileService.findTenantProfileById(TenantId.SYS_TENANT_ID, savedTenant.getTenantProfileId());
tbQueueService.updateQueuesByTenants(Collections.singletonList(savedTenant.getTenantId()), newTenantProfile, oldTenantProfile);

16
application/src/main/java/org/thingsboard/server/service/install/InstallScripts.java

@ -151,17 +151,18 @@ public class InstallScripts {
}
}
public void createDefaultRuleChains(TenantId tenantId) throws IOException {
public void createDefaultRuleChains(TenantId tenantId) {
Path tenantChainsDir = getTenantRuleChainsDir();
loadRuleChainsFromPath(tenantId, tenantChainsDir);
}
public void createDefaultEdgeRuleChains(TenantId tenantId) throws IOException {
public void createDefaultEdgeRuleChains(TenantId tenantId) {
Path edgeChainsDir = getEdgeRuleChainsDir();
loadRuleChainsFromPath(tenantId, edgeChainsDir);
}
private void loadRuleChainsFromPath(TenantId tenantId, Path ruleChainsPath) throws IOException {
@SneakyThrows
private void loadRuleChainsFromPath(TenantId tenantId, Path ruleChainsPath) {
findRuleChainsFromPath(ruleChainsPath).forEach(path -> {
try {
createRuleChainFromFile(tenantId, path, null);
@ -329,17 +330,18 @@ public class InstallScripts {
}
}
public void loadDashboards(TenantId tenantId, CustomerId customerId) throws Exception {
public void loadDashboards(TenantId tenantId, CustomerId customerId) {
Path dashboardsDir = Paths.get(getDataDir(), JSON_DIR, DEMO_DIR, DASHBOARDS_DIR);
loadDashboardsFromDir(tenantId, customerId, dashboardsDir);
}
public void createDefaultTenantDashboards(TenantId tenantId, CustomerId customerId) throws Exception {
public void createDefaultTenantDashboards(TenantId tenantId, CustomerId customerId) {
Path dashboardsDir = Paths.get(getDataDir(), JSON_DIR, TENANT_DIR, DASHBOARDS_DIR);
loadDashboardsFromDir(tenantId, customerId, dashboardsDir);
}
private void loadDashboardsFromDir(TenantId tenantId, CustomerId customerId, Path dashboardsDir) throws IOException {
@SneakyThrows
private void loadDashboardsFromDir(TenantId tenantId, CustomerId customerId, Path dashboardsDir) {
try (DirectoryStream<Path> dirStream = Files.newDirectoryStream(dashboardsDir, path -> path.toString().endsWith(JSON_EXT))) {
dirStream.forEach(
path -> {
@ -414,7 +416,7 @@ public class InstallScripts {
}
);
} catch (Exception e) {
log.error("Unable to load resources lwm2m object model from file: [{}]", resourceLwm2mPath.toString());
log.error("Unable to load resources lwm2m object model from file: [{}]", resourceLwm2mPath);
throw new RuntimeException("resource lwm2m object model from file", e);
}
}

16
application/src/test/java/org/thingsboard/server/controller/TenantControllerTest.java

@ -663,7 +663,7 @@ public class TenantControllerTest extends AbstractControllerTest {
savedDifferentTenant.setTenantProfileId(tenantProfile.getId());
savedDifferentTenant = doPost("/api/tenant", savedDifferentTenant, Tenant.class);
TenantId tenantId = differentTenantId;
await().atMost(10, TimeUnit.SECONDS)
await().atMost(30, TimeUnit.SECONDS)
.until(() -> {
TopicPartitionInfo tpi = partitionService.resolve(ServiceType.TB_RULE_ENGINE, MAIN_QUEUE_NAME, tenantId, tenantId);
return !tpi.getTenantId().get().isSysTenantId();
@ -677,7 +677,7 @@ public class TenantControllerTest extends AbstractControllerTest {
tenantProfile.setIsolatedTbRuleEngine(false);
tenantProfile.getProfileData().setQueueConfiguration(Collections.emptyList());
tenantProfile = doPost("/api/tenantProfile", tenantProfile, TenantProfile.class);
await().atMost(10, TimeUnit.SECONDS)
await().atMost(30, TimeUnit.SECONDS)
.until(() -> partitionService.resolve(ServiceType.TB_RULE_ENGINE, MAIN_QUEUE_NAME, tenantId, tenantId)
.getTenantId().get().isSysTenantId());
@ -689,11 +689,11 @@ public class TenantControllerTest extends AbstractControllerTest {
submittedMsgs.add(tbMsg.getId());
Thread.sleep(timeLeft / msgs);
}
await().atMost(15, TimeUnit.SECONDS).untilAsserted(() -> {
await().atMost(30, TimeUnit.SECONDS).untilAsserted(() -> {
verify(queueAdmin, times(1)).deleteTopic(eq(isolatedTopic));
});
await().atMost(5, TimeUnit.SECONDS).untilAsserted(() -> {
await().atMost(10, TimeUnit.SECONDS).untilAsserted(() -> {
for (UUID msgId : submittedMsgs) {
verify(actorContext).tell(argThat(msg -> {
return msg instanceof QueueToRuleEngineMsg && ((QueueToRuleEngineMsg) msg).getMsg().getId().equals(msgId);
@ -718,13 +718,13 @@ public class TenantControllerTest extends AbstractControllerTest {
savedDifferentTenant.setTenantProfileId(tenantProfile.getId());
savedDifferentTenant = doPost("/api/tenant", savedDifferentTenant, Tenant.class);
TenantId tenantId = differentTenantId;
await().atMost(10, TimeUnit.SECONDS).untilAsserted(() -> {
await().atMost(30, TimeUnit.SECONDS).untilAsserted(() -> {
assertThat(partitionService.getMyPartitions(new QueueKey(ServiceType.TB_RULE_ENGINE, tenantId))).isNotNull();
});
TopicPartitionInfo tpi = partitionService.resolve(ServiceType.TB_RULE_ENGINE, tenantId, tenantId);
assertThat(tpi.getTenantId()).hasValue(tenantId);
TbMsg tbMsg = publishTbMsg(tenantId, tpi);
await().atMost(10, TimeUnit.SECONDS).untilAsserted(() -> {
await().atMost(30, TimeUnit.SECONDS).untilAsserted(() -> {
verify(actorContext).tell(argThat(msg -> {
return msg instanceof QueueToRuleEngineMsg && ((QueueToRuleEngineMsg) msg).getMsg().getId().equals(tbMsg.getId());
}));
@ -732,7 +732,7 @@ public class TenantControllerTest extends AbstractControllerTest {
deleteDifferentTenant();
await().atMost(10, TimeUnit.SECONDS).untilAsserted(() -> {
await().atMost(30, TimeUnit.SECONDS).untilAsserted(() -> {
assertThat(partitionService.getMyPartitions(new QueueKey(ServiceType.TB_RULE_ENGINE, tenantId))).isNull();
assertThatThrownBy(() -> partitionService.resolve(ServiceType.TB_RULE_ENGINE, tenantId, tenantId))
.isInstanceOf(TenantNotFoundException.class);
@ -752,7 +752,7 @@ public class TenantControllerTest extends AbstractControllerTest {
}
private void verifyUsedQueueAndMessage(String queue, TenantId tenantId, EntityId entityId, String msgType, Runnable action, Consumer<TopicPartitionInfo> tpiAssert) {
await().atMost(15, TimeUnit.SECONDS)
await().atMost(30, TimeUnit.SECONDS)
.untilAsserted(() -> {
TopicPartitionInfo tpi = partitionService.resolve(ServiceType.TB_RULE_ENGINE, queue, tenantId, entityId);
tpiAssert.accept(tpi);

3
common/dao-api/src/main/java/org/thingsboard/server/dao/tenant/TenantService.java

@ -25,6 +25,7 @@ import org.thingsboard.server.common.data.page.PageLink;
import org.thingsboard.server.dao.entity.EntityDaoService;
import java.util.List;
import java.util.function.Consumer;
public interface TenantService extends EntityDaoService {
@ -36,7 +37,7 @@ public interface TenantService extends EntityDaoService {
Tenant saveTenant(Tenant tenant);
Tenant saveTenant(Tenant tenant, boolean publishSaveEvent);
Tenant saveTenant(Tenant tenant, Consumer<TenantId> defaultEntitiesCreator);
boolean tenantExists(TenantId tenantId);

3
dao/src/main/java/org/thingsboard/server/dao/notification/DefaultNotificationSettingsService.java

@ -20,7 +20,6 @@ import lombok.extern.slf4j.Slf4j;
import org.springframework.cache.annotation.CacheEvict;
import org.springframework.cache.annotation.Cacheable;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Propagation;
import org.springframework.transaction.annotation.Transactional;
import org.thingsboard.common.util.JacksonUtil;
import org.thingsboard.server.common.data.AdminSettings;
@ -157,7 +156,7 @@ public class DefaultNotificationSettingsService implements NotificationSettingsS
return new UserNotificationSettings(prefs);
}
@Transactional(propagation = Propagation.NOT_SUPPORTED) // so that parent transaction is not aborted on method failure
@Transactional
@Override
public void createDefaultNotificationConfigs(TenantId tenantId) {
NotificationTarget allUsers = createTarget(tenantId, "All users", new AllUsersFilter(),

31
dao/src/main/java/org/thingsboard/server/dao/tenant/TenantServiceImpl.java

@ -63,6 +63,7 @@ import org.thingsboard.server.dao.widget.WidgetsBundleService;
import java.util.List;
import java.util.Optional;
import java.util.function.Consumer;
import static org.thingsboard.server.dao.service.Validator.validateId;
@ -187,12 +188,12 @@ public class TenantServiceImpl extends AbstractCachedEntityService<TenantId, Ten
@Override
@Transactional
public Tenant saveTenant(Tenant tenant) {
return saveTenant(tenant, true);
return saveTenant(tenant, null);
}
@Override
@Transactional
public Tenant saveTenant(Tenant tenant, boolean publishSaveEvent) {
public Tenant saveTenant(Tenant tenant, Consumer<TenantId> defaultEntitiesCreator) {
log.trace("Executing saveTenant [{}]", tenant);
tenant.setRegion(DEFAULT_TENANT_REGION);
if (tenant.getTenantProfileId() == null) {
@ -201,20 +202,20 @@ public class TenantServiceImpl extends AbstractCachedEntityService<TenantId, Ten
}
tenantValidator.validate(tenant, Tenant::getId);
boolean create = tenant.getId() == null;
Tenant savedTenant = tenantDao.save(tenant.getId(), tenant);
publishEvictEvent(new TenantEvictEvent(savedTenant.getId(), create));
if (publishSaveEvent) {
eventPublisher.publishEvent(SaveEntityEvent.builder().tenantId(savedTenant.getId())
.entityId(savedTenant.getId()).entity(savedTenant).created(create).build());
}
if (tenant.getId() == null) {
deviceProfileService.createDefaultDeviceProfile(savedTenant.getId());
assetProfileService.createDefaultAssetProfile(savedTenant.getId());
apiUsageStateService.createDefaultApiUsageState(savedTenant.getId(), null);
try {
notificationSettingsService.createDefaultNotificationConfigs(savedTenant.getId());
} catch (Throwable e) {
log.error("Failed to create default notification configs for tenant {}", savedTenant.getId(), e);
TenantId tenantId = savedTenant.getId();
publishEvictEvent(new TenantEvictEvent(tenantId, create));
eventPublisher.publishEvent(SaveEntityEvent.builder().tenantId(tenantId)
.entityId(tenantId).entity(savedTenant).created(create).build());
if (create) {
deviceProfileService.createDefaultDeviceProfile(tenantId);
assetProfileService.createDefaultAssetProfile(tenantId);
apiUsageStateService.createDefaultApiUsageState(tenantId, null);
notificationSettingsService.createDefaultNotificationConfigs(tenantId);
if (defaultEntitiesCreator != null) {
defaultEntitiesCreator.accept(tenantId);
}
}
return savedTenant;

Loading…
Cancel
Save