Browse Source

Merge branch 'entity-view-optimization-develop34-clean' of github.com:smatvienko-tb/thingsboard into develop/3.4

pull/6809/head
Andrii Shvaika 4 years ago
parent
commit
7bad2782ea
  1. 6
      application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java
  2. 16
      application/src/main/java/org/thingsboard/server/service/entitiy/AbstractTbEntityService.java
  3. 46
      application/src/main/java/org/thingsboard/server/service/entitiy/entityView/DefaultTbEntityViewService.java
  4. 9
      application/src/main/java/org/thingsboard/server/service/entitiy/entityView/TbEntityViewService.java
  5. 3
      application/src/main/java/org/thingsboard/server/service/queue/DefaultTbClusterService.java
  6. 2
      application/src/main/java/org/thingsboard/server/service/queue/processing/AbstractConsumerService.java
  7. 13
      application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionService.java
  8. 1
      common/dao-api/src/main/java/org/thingsboard/server/dao/entityview/EntityViewService.java
  9. 20
      common/message/src/main/java/org/thingsboard/server/common/msg/plugin/ComponentLifecycleListener.java

6
application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java

@ -80,6 +80,7 @@ import org.thingsboard.server.queue.usagestats.TbApiUsageClient;
import org.thingsboard.server.service.apiusage.TbApiUsageStateService;
import org.thingsboard.server.service.component.ComponentDiscoveryService;
import org.thingsboard.server.service.edge.rpc.EdgeRpcService;
import org.thingsboard.server.service.entitiy.entityView.TbEntityViewService;
import org.thingsboard.server.service.executors.DbCallbackExecutorService;
import org.thingsboard.server.service.executors.ExternalCallExecutorService;
import org.thingsboard.server.service.executors.SharedEventLoopGroupService;
@ -220,6 +221,11 @@ public class ActorSystemContext {
@Getter
private EntityViewService entityViewService;
@Lazy
@Autowired(required = false)
@Getter
private TbEntityViewService tbEntityViewService;
@Autowired
@Getter
private TelemetrySubscriptionService tsSubService;

16
application/src/main/java/org/thingsboard/server/service/entitiy/AbstractTbEntityService.java

@ -21,6 +21,7 @@ import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Lazy;
import org.thingsboard.server.cluster.TbClusterService;
import org.thingsboard.server.common.data.EntityType;
import org.thingsboard.server.common.data.HasName;
@ -93,13 +94,13 @@ public abstract class AbstractTbEntityService {
@Autowired
protected DbCallbackExecutorService dbExecutor;
@Autowired
@Autowired(required = false)
protected TbNotificationEntityService notificationEntityService;
@Autowired(required = false)
protected EdgeService edgeService;
@Autowired
protected AlarmService alarmService;
@Autowired
@Autowired(required = false)
protected EntityActionService entityActionService;
@Autowired
protected DeviceService deviceService;
@ -111,24 +112,27 @@ public abstract class AbstractTbEntityService {
protected TenantService tenantService;
@Autowired
protected CustomerService customerService;
@Autowired
@Lazy
@Autowired(required = false)
protected ClaimDevicesService claimDevicesService;
@Autowired
protected TbTenantProfileCache tenantProfileCache;
@Autowired
protected RuleChainService ruleChainService;
@Autowired
@Autowired(required = false)
protected TbRuleChainService tbRuleChainService;
@Autowired
@Autowired(required = false)
protected EdgeNotificationService edgeNotificationService;
@Autowired
protected QueueService queueService;
@Autowired
protected DashboardService dashboardService;
@Autowired
protected EntitiesVersionControlService vcService;
@Autowired
protected EntityViewService entityViewService;
@Lazy
@Autowired
protected TelemetrySubscriptionService tsSubService;
@Autowired
@ -149,7 +153,7 @@ public abstract class AbstractTbEntityService {
protected InstallScripts installScripts;
@Autowired
protected UserService userService;
@Autowired
@Autowired(required = false)
protected TbResourceService resourceService;
@Autowired
protected WidgetsBundleService widgetsBundleService;

46
application/src/main/java/org/thingsboard/server/service/entitiy/entityView/DefaultTbEntityViewService.java

@ -23,6 +23,7 @@ import com.google.common.util.concurrent.SettableFuture;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import org.springframework.util.ConcurrentReferenceHashMap;
import org.thingsboard.server.common.data.Customer;
import org.thingsboard.server.common.data.DataConstants;
import org.thingsboard.server.common.data.EntityType;
@ -40,8 +41,9 @@ import org.thingsboard.server.common.data.kv.AttributeKvEntry;
import org.thingsboard.server.common.data.kv.BaseReadTsKvQuery;
import org.thingsboard.server.common.data.kv.ReadTsKvQuery;
import org.thingsboard.server.common.data.kv.TsKvEntry;
import org.thingsboard.server.common.data.plugin.ComponentLifecycleEvent;
import org.thingsboard.server.common.msg.plugin.ComponentLifecycleMsg;
import org.thingsboard.server.dao.timeseries.TimeseriesService;
import org.thingsboard.server.queue.util.TbCoreComponent;
import org.thingsboard.server.service.entitiy.AbstractTbEntityService;
import org.thingsboard.server.service.security.model.SecurityUser;
@ -50,19 +52,22 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
import static org.apache.commons.lang3.StringUtils.isBlank;
@Service
@TbCoreComponent
@AllArgsConstructor
@Slf4j
public class DefaultTbEntityViewService extends AbstractTbEntityService implements TbEntityViewService {
private final TimeseriesService tsService;
final Map<TenantId, Map<EntityId, List<EntityView>>> localCache = new ConcurrentHashMap<>();
@Override
public EntityView save(EntityView entityView, EntityView existingEntityView, SecurityUser user) throws ThingsboardException {
ActionType actionType = entityView.getId() == null ? ActionType.ADDED : ActionType.UPDATED;
@ -71,6 +76,9 @@ public class DefaultTbEntityViewService extends AbstractTbEntityService implemen
this.updateEntityViewAttributes(user, savedEntityView, existingEntityView);
notificationEntityService.notifyCreateOrUpdateEntity(savedEntityView.getTenantId(), savedEntityView.getId(), savedEntityView,
null, actionType, user);
localCache.computeIfAbsent(savedEntityView.getTenantId(), (k) -> new ConcurrentReferenceHashMap<>()).clear();
tbClusterService.broadcastEntityStateChangeEvent(savedEntityView.getTenantId(), savedEntityView.getId(),
entityView.getId() == null ? ComponentLifecycleEvent.CREATED : ComponentLifecycleEvent.UPDATED);
return savedEntityView;
} catch (Exception e) {
notificationEntityService.notifyEntity(user.getTenantId(), emptyId(EntityType.ENTITY_VIEW), entityView, null, actionType, user, e);
@ -122,6 +130,9 @@ public class DefaultTbEntityViewService extends AbstractTbEntityService implemen
entityViewService.deleteEntityView(tenantId, entityViewId);
notificationEntityService.notifyDeleteEntity(tenantId, entityViewId, entityView, entityView.getCustomerId(), ActionType.DELETED,
relatedEdgeIds, user, entityViewId.toString());
localCache.computeIfAbsent(tenantId, (k) -> new ConcurrentReferenceHashMap<>()).clear();
tbClusterService.broadcastEntityStateChangeEvent(tenantId, entityViewId, ComponentLifecycleEvent.DELETED);
} catch (Exception e) {
notificationEntityService.notifyEntity(tenantId, emptyId(EntityType.ENTITY_VIEW), null, null,
ActionType.DELETED, user, e, entityViewId.toString());
@ -214,6 +225,35 @@ public class DefaultTbEntityViewService extends AbstractTbEntityService implemen
}
}
@Override
public ListenableFuture<List<EntityView>> findEntityViewsByTenantIdAndEntityIdAsync(TenantId tenantId, EntityId entityId) {
Map<EntityId, List<EntityView>> localCacheByTenant = localCache.computeIfAbsent(tenantId, (k) -> new ConcurrentReferenceHashMap<>());
List<EntityView> fromLocalCache = localCacheByTenant.get(entityId);
if (fromLocalCache != null) {
return Futures.immediateFuture(fromLocalCache);
}
ListenableFuture<List<EntityView>> future = entityViewService.findEntityViewsByTenantIdAndEntityIdAsync(tenantId, entityId);
return Futures.transform(future, (entityViewList) -> {
localCacheByTenant.put(entityId, entityViewList);
return entityViewList;
}, MoreExecutors.directExecutor());
}
@Override
public void onComponentLifecycleMsg(ComponentLifecycleMsg componentLifecycleMsg) {
Map<EntityId, List<EntityView>> localCacheByTenant = localCache.computeIfAbsent(componentLifecycleMsg.getTenantId(), (k) -> new ConcurrentReferenceHashMap<>());
if (componentLifecycleMsg.getEvent() == ComponentLifecycleEvent.DELETED) {
localCacheByTenant.clear(); //we don't know which entity was mapped before deletion
} else {
EntityView entityView = entityViewService.findEntityViewById(componentLifecycleMsg.getTenantId(), new EntityViewId(componentLifecycleMsg.getEntityId().getId()));
if (entityView != null) {
localCacheByTenant.remove(entityView.getEntityId());
}
}
}
private ListenableFuture<List<Void>> copyAttributesFromEntityToEntityView(EntityView entityView, String scope, Collection<String> keys, SecurityUser user) throws ThingsboardException {
EntityViewId entityId = entityView.getId();
if (keys != null && !keys.isEmpty()) {
@ -345,7 +385,7 @@ public class DefaultTbEntityViewService extends AbstractTbEntityService implemen
@Override
public void onFailure(Throwable t) {
try {
logTimeseriesDeleted(entityView.getTenantId(),user, entityId, keys, t);
logTimeseriesDeleted(entityView.getTenantId(), user, entityId, keys, t);
} catch (ThingsboardException e) {
log.error("Failed to log timeseries delete", e);
}

9
application/src/main/java/org/thingsboard/server/service/entitiy/entityView/TbEntityViewService.java

@ -15,16 +15,21 @@
*/
package org.thingsboard.server.service.entitiy.entityView;
import com.google.common.util.concurrent.ListenableFuture;
import org.thingsboard.server.common.data.Customer;
import org.thingsboard.server.common.data.EntityView;
import org.thingsboard.server.common.data.edge.Edge;
import org.thingsboard.server.common.data.exception.ThingsboardException;
import org.thingsboard.server.common.data.id.CustomerId;
import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.id.EntityViewId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.msg.plugin.ComponentLifecycleListener;
import org.thingsboard.server.service.security.model.SecurityUser;
public interface TbEntityViewService {
import java.util.List;
public interface TbEntityViewService extends ComponentLifecycleListener {
EntityView save(EntityView entityView, EntityView existingEntityView, SecurityUser user) throws ThingsboardException;
@ -46,4 +51,6 @@ public interface TbEntityViewService {
EntityView unassignEntityViewFromCustomer(TenantId tenantId, EntityViewId entityViewId, Customer customer,
SecurityUser user) throws ThingsboardException;
ListenableFuture<List<EntityView>> findEntityViewsByTenantIdAndEntityIdAsync(TenantId tenantId, EntityId entityId);
}

3
application/src/main/java/org/thingsboard/server/service/queue/DefaultTbClusterService.java

@ -364,13 +364,14 @@ public class DefaultTbClusterService implements TbClusterService {
private void broadcast(ComponentLifecycleMsg msg) {
byte[] msgBytes = encodingService.encode(msg);
TbQueueProducer<TbProtoQueueMsg<ToRuleEngineNotificationMsg>> toRuleEngineProducer = producerProvider.getRuleEngineNotificationsMsgProducer();
Set<String> tbRuleEngineServices = new HashSet<>(partitionService.getAllServiceIds(ServiceType.TB_RULE_ENGINE));
Set<String> tbRuleEngineServices = partitionService.getAllServiceIds(ServiceType.TB_RULE_ENGINE);
EntityType entityType = msg.getEntityId().getEntityType();
if (entityType.equals(EntityType.TENANT)
|| entityType.equals(EntityType.TENANT_PROFILE)
|| entityType.equals(EntityType.DEVICE_PROFILE)
|| entityType.equals(EntityType.API_USAGE_STATE)
|| (entityType.equals(EntityType.DEVICE) && msg.getEvent() == ComponentLifecycleEvent.UPDATED)
|| entityType.equals(EntityType.ENTITY_VIEW)
|| entityType.equals(EntityType.EDGE)) {
TbQueueProducer<TbProtoQueueMsg<ToCoreNotificationMsg>> toCoreNfProducer = producerProvider.getTbCoreNotificationsMsgProducer();
Set<String> tbCoreServices = partitionService.getAllServiceIds(ServiceType.TB_CORE);

2
application/src/main/java/org/thingsboard/server/service/queue/processing/AbstractConsumerService.java

@ -180,6 +180,8 @@ public abstract class AbstractConsumerService<N extends com.google.protobuf.Gene
deviceProfileCache.evict(componentLifecycleMsg.getTenantId(), new DeviceProfileId(componentLifecycleMsg.getEntityId().getId()));
} else if (EntityType.DEVICE.equals(componentLifecycleMsg.getEntityId().getEntityType())) {
deviceProfileCache.evict(componentLifecycleMsg.getTenantId(), new DeviceId(componentLifecycleMsg.getEntityId().getId()));
} else if (EntityType.ENTITY_VIEW.equals(componentLifecycleMsg.getEntityId().getEntityType())) {
actorContext.getTbEntityViewService().onComponentLifecycleMsg(componentLifecycleMsg);
} else if (EntityType.API_USAGE_STATE.equals(componentLifecycleMsg.getEntityId().getEntityType())) {
apiUsageStateService.onApiUsageStateUpdate(componentLifecycleMsg.getTenantId());
} else if (EntityType.CUSTOMER.equals(componentLifecycleMsg.getEntityId().getEntityType())) {

13
application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionService.java

@ -21,6 +21,7 @@ import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import lombok.extern.slf4j.Slf4j;
import org.jetbrains.annotations.NotNull;
import org.springframework.context.annotation.Lazy;
import org.springframework.stereotype.Service;
import org.thingsboard.common.util.ThingsBoardThreadFactory;
import org.thingsboard.server.cluster.TbClusterService;
@ -43,12 +44,12 @@ import org.thingsboard.server.common.msg.queue.ServiceType;
import org.thingsboard.server.common.msg.queue.TbCallback;
import org.thingsboard.server.common.msg.queue.TopicPartitionInfo;
import org.thingsboard.server.dao.attributes.AttributesService;
import org.thingsboard.server.dao.entityview.EntityViewService;
import org.thingsboard.server.dao.timeseries.TimeseriesService;
import org.thingsboard.server.gen.transport.TransportProtos;
import org.thingsboard.server.queue.discovery.PartitionService;
import org.thingsboard.server.queue.usagestats.TbApiUsageClient;
import org.thingsboard.server.service.apiusage.TbApiUsageStateService;
import org.thingsboard.server.service.entitiy.entityView.TbEntityViewService;
import org.thingsboard.server.service.subscription.TbSubscriptionUtils;
import javax.annotation.Nullable;
@ -75,7 +76,7 @@ public class DefaultTelemetrySubscriptionService extends AbstractSubscriptionSer
private final AttributesService attrService;
private final TimeseriesService tsService;
private final EntityViewService entityViewService;
private final TbEntityViewService tbEntityViewService;
private final TbApiUsageClient apiUsageClient;
private final TbApiUsageStateService apiUsageStateService;
@ -83,7 +84,7 @@ public class DefaultTelemetrySubscriptionService extends AbstractSubscriptionSer
public DefaultTelemetrySubscriptionService(AttributesService attrService,
TimeseriesService tsService,
EntityViewService entityViewService,
@Lazy TbEntityViewService tbEntityViewService,
TbClusterService clusterService,
PartitionService partitionService,
TbApiUsageClient apiUsageClient,
@ -91,7 +92,7 @@ public class DefaultTelemetrySubscriptionService extends AbstractSubscriptionSer
super(clusterService, partitionService);
this.attrService = attrService;
this.tsService = tsService;
this.entityViewService = entityViewService;
this.tbEntityViewService = tbEntityViewService;
this.apiUsageClient = apiUsageClient;
this.apiUsageStateService = apiUsageStateService;
}
@ -182,11 +183,11 @@ public class DefaultTelemetrySubscriptionService extends AbstractSubscriptionSer
addMainCallback(saveFuture, callback);
addWsCallback(saveFuture, success -> onTimeSeriesUpdate(tenantId, entityId, ts));
if (EntityType.DEVICE.equals(entityId.getEntityType()) || EntityType.ASSET.equals(entityId.getEntityType())) {
Futures.addCallback(this.entityViewService.findEntityViewsByTenantIdAndEntityIdAsync(tenantId, entityId),
Futures.addCallback(this.tbEntityViewService.findEntityViewsByTenantIdAndEntityIdAsync(tenantId, entityId),
new FutureCallback<List<EntityView>>() {
@Override
public void onSuccess(@Nullable List<EntityView> result) {
if (result != null) {
if (result != null && !result.isEmpty()) {
Map<String, List<TsKvEntry>> tsMap = new HashMap<>();
for (TsKvEntry entry : ts) {
tsMap.computeIfAbsent(entry.getKey(), s -> new ArrayList<>()).add(entry);

1
common/dao-api/src/main/java/org/thingsboard/server/dao/entityview/EntityViewService.java

@ -27,7 +27,6 @@ import org.thingsboard.server.common.data.id.EntityViewId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.page.PageData;
import org.thingsboard.server.common.data.page.PageLink;
import org.thingsboard.server.common.data.page.TimePageLink;
import java.util.List;

20
common/message/src/main/java/org/thingsboard/server/common/msg/plugin/ComponentLifecycleListener.java

@ -0,0 +1,20 @@
/**
* Copyright © 2016-2022 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.common.msg.plugin;
public interface ComponentLifecycleListener {
void onComponentLifecycleMsg(ComponentLifecycleMsg componentLifecycleMsg);
}
Loading…
Cancel
Save