Browse Source

Cover cases when entities added during edge disconnect. Refactoring

pull/4918/head
Volodymyr Babak 5 years ago
parent
commit
a76d3a2b70
  1. 6
      application/src/main/java/org/thingsboard/server/actors/tenant/TenantActor.java
  2. 14
      application/src/main/java/org/thingsboard/server/controller/BaseController.java
  3. 5
      application/src/main/java/org/thingsboard/server/controller/EdgeController.java
  4. 2
      application/src/main/java/org/thingsboard/server/service/edge/DefaultEdgeNotificationService.java
  5. 4
      application/src/main/java/org/thingsboard/server/service/edge/EdgeContextComponent.java
  6. 48
      application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeEventUtils.java
  7. 31
      application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcService.java
  8. 298
      application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcSession.java
  9. 9
      application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeRpcService.java
  10. 109
      application/src/main/java/org/thingsboard/server/service/edge/rpc/fetch/AdminSettingsEdgeEventFetcher.java
  11. 58
      application/src/main/java/org/thingsboard/server/service/edge/rpc/fetch/AssetsEdgeEventFetcher.java
  12. 58
      application/src/main/java/org/thingsboard/server/service/edge/rpc/fetch/BaseUsersEdgeEventFetcher.java
  13. 55
      application/src/main/java/org/thingsboard/server/service/edge/rpc/fetch/BaseWidgetsBundlesEdgeEventFetcher.java
  14. 36
      application/src/main/java/org/thingsboard/server/service/edge/rpc/fetch/CustomerUsersEdgeEventFetcher.java
  15. 58
      application/src/main/java/org/thingsboard/server/service/edge/rpc/fetch/DashboardsEdgeEventFetcher.java
  16. 58
      application/src/main/java/org/thingsboard/server/service/edge/rpc/fetch/DeviceProfilesEdgeEventFetcher.java
  17. 31
      application/src/main/java/org/thingsboard/server/service/edge/rpc/fetch/EdgeEventFetcher.java
  18. 50
      application/src/main/java/org/thingsboard/server/service/edge/rpc/fetch/GeneralEdgeEventFetcher.java
  19. 58
      application/src/main/java/org/thingsboard/server/service/edge/rpc/fetch/RuleChainsEdgeEventFetcher.java
  20. 36
      application/src/main/java/org/thingsboard/server/service/edge/rpc/fetch/SystemWidgetsBundlesEdgeEventFetcher.java
  21. 34
      application/src/main/java/org/thingsboard/server/service/edge/rpc/fetch/TenantAdminUsersEdgeEventFetcher.java
  22. 36
      application/src/main/java/org/thingsboard/server/service/edge/rpc/fetch/TenantWidgetsBundlesEdgeEventFetcher.java
  23. 503
      application/src/main/java/org/thingsboard/server/service/edge/rpc/sync/DefaultEdgeRequestsService.java
  24. 15
      application/src/main/java/org/thingsboard/server/service/edge/rpc/sync/EdgeRequestsService.java
  25. 1
      application/src/main/resources/logback.xml
  26. 9
      application/src/test/java/org/thingsboard/server/edge/BaseEdgeTest.java
  27. 2
      application/src/test/java/org/thingsboard/server/edge/imitator/EdgeImitator.java
  28. 7
      common/edge-api/src/main/java/org/thingsboard/edge/rpc/EdgeGrpcClient.java
  29. 2
      common/edge-api/src/main/java/org/thingsboard/edge/rpc/EdgeRpcClient.java
  30. 66
      common/edge-api/src/main/proto/edge.proto
  31. 1
      dao/src/main/java/org/thingsboard/server/dao/edge/EdgeServiceImpl.java

6
application/src/main/java/org/thingsboard/server/actors/tenant/TenantActor.java

@ -245,11 +245,11 @@ public class TenantActor extends RuleChainManagerActor {
EdgeId edgeId = new EdgeId(msg.getEntityId().getId());
EdgeRpcService edgeRpcService = systemContext.getEdgeRpcService();
if (msg.getEvent() == ComponentLifecycleEvent.DELETED) {
edgeRpcService.deleteEdge(edgeId);
edgeRpcService.deleteEdge(tenantId, edgeId);
} else {
Edge edge = systemContext.getEdgeService().findEdgeById(tenantId, edgeId);
if (msg.getEvent() == ComponentLifecycleEvent.UPDATED) {
edgeRpcService.updateEdge(edge);
edgeRpcService.updateEdge(tenantId, edge);
}
}
} else if (isRuleEngineForCurrentTenant) {
@ -277,7 +277,7 @@ public class TenantActor extends RuleChainManagerActor {
private void onToEdgeSessionMsg(EdgeEventUpdateMsg msg) {
log.trace("[{}] onToEdgeSessionMsg [{}]", msg.getTenantId(), msg);
systemContext.getEdgeRpcService().onEdgeEvent(msg.getEdgeId());
systemContext.getEdgeRpcService().onEdgeEvent(tenantId, msg.getEdgeId());
}
public static class ActorCreator extends ContextBasedCreator {

14
application/src/main/java/org/thingsboard/server/controller/BaseController.java

@ -43,8 +43,8 @@ import org.thingsboard.server.common.data.Firmware;
import org.thingsboard.server.common.data.FirmwareInfo;
import org.thingsboard.server.common.data.HasName;
import org.thingsboard.server.common.data.HasTenantId;
import org.thingsboard.server.common.data.TbResourceInfo;
import org.thingsboard.server.common.data.TbResource;
import org.thingsboard.server.common.data.TbResourceInfo;
import org.thingsboard.server.common.data.Tenant;
import org.thingsboard.server.common.data.TenantInfo;
import org.thingsboard.server.common.data.TenantProfile;
@ -71,9 +71,9 @@ import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.id.EntityIdFactory;
import org.thingsboard.server.common.data.id.EntityViewId;
import org.thingsboard.server.common.data.id.FirmwareId;
import org.thingsboard.server.common.data.id.TbResourceId;
import org.thingsboard.server.common.data.id.RuleChainId;
import org.thingsboard.server.common.data.id.RuleNodeId;
import org.thingsboard.server.common.data.id.TbResourceId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.id.TenantProfileId;
import org.thingsboard.server.common.data.id.UserId;
@ -128,10 +128,9 @@ import org.thingsboard.server.queue.discovery.PartitionService;
import org.thingsboard.server.queue.provider.TbQueueProducerProvider;
import org.thingsboard.server.queue.util.TbCoreComponent;
import org.thingsboard.server.service.component.ComponentDiscoveryService;
import org.thingsboard.server.service.firmware.FirmwareStateService;
import org.thingsboard.server.service.edge.EdgeNotificationService;
import org.thingsboard.server.service.edge.rpc.EdgeGrpcService;
import org.thingsboard.server.service.edge.rpc.init.SyncEdgeService;
import org.thingsboard.server.service.edge.rpc.EdgeRpcService;
import org.thingsboard.server.service.firmware.FirmwareStateService;
import org.thingsboard.server.service.lwm2m.LwM2MServerSecurityInfoRepository;
import org.thingsboard.server.service.profile.TbDeviceProfileCache;
import org.thingsboard.server.service.queue.TbClusterService;
@ -276,10 +275,7 @@ public abstract class BaseController {
protected EdgeNotificationService edgeNotificationService;
@Autowired(required = false)
protected SyncEdgeService syncEdgeService;
@Autowired(required = false)
protected EdgeGrpcService edgeGrpcService;
protected EdgeRpcService edgeGrpcService;
@Value("${server.log_controller_error_stack_trace}")
@Getter

5
application/src/main/java/org/thingsboard/server/controller/EdgeController.java

@ -49,7 +49,6 @@ import org.thingsboard.server.dao.exception.DataValidationException;
import org.thingsboard.server.dao.exception.IncorrectParameterException;
import org.thingsboard.server.dao.model.ModelConstants;
import org.thingsboard.server.queue.util.TbCoreComponent;
import org.thingsboard.server.service.edge.rpc.EdgeGrpcSession;
import org.thingsboard.server.service.security.model.SecurityUser;
import org.thingsboard.server.service.security.permission.Operation;
import org.thingsboard.server.service.security.permission.Resource;
@ -536,9 +535,7 @@ public class EdgeController extends BaseController {
edgeId = checkNotNull(edgeId);
SecurityUser user = getCurrentUser();
TenantId tenantId = user.getTenantId();
EdgeGrpcSession session = edgeGrpcService.getEdgeGrpcSessionById(tenantId, edgeId);
Edge edge = session.getEdge();
syncEdgeService.sync(tenantId, edge);
edgeGrpcService.startSyncProcess(tenantId, edgeId);
} else {
throw new ThingsboardException("Edges support disabled", ThingsboardErrorCode.GENERAL);
}

2
application/src/main/java/org/thingsboard/server/service/edge/DefaultEdgeNotificationService.java

@ -326,7 +326,7 @@ public class DefaultEdgeNotificationService implements EdgeNotificationService {
case ASSIGNED_TO_CUSTOMER:
case UNASSIGNED_FROM_CUSTOMER:
edgeIdsFuture = edgeService.findRelatedEdgeIdsByEntityId(tenantId, entityId);
Futures.addCallback(edgeIdsFuture, new FutureCallback<List<EdgeId>>() {
Futures.addCallback(edgeIdsFuture, new FutureCallback<>() {
@Override
public void onSuccess(@Nullable List<EdgeId> edgeIds) {
if (edgeIds != null && !edgeIds.isEmpty()) {

4
application/src/main/java/org/thingsboard/server/service/edge/EdgeContextComponent.java

@ -54,7 +54,7 @@ import org.thingsboard.server.service.edge.rpc.constructor.RuleChainMsgConstruct
import org.thingsboard.server.service.edge.rpc.constructor.UserMsgConstructor;
import org.thingsboard.server.service.edge.rpc.constructor.WidgetTypeMsgConstructor;
import org.thingsboard.server.service.edge.rpc.constructor.WidgetsBundleMsgConstructor;
import org.thingsboard.server.service.edge.rpc.init.SyncEdgeService;
import org.thingsboard.server.service.edge.rpc.sync.EdgeRequestsService;
import org.thingsboard.server.service.edge.rpc.processor.AlarmProcessor;
import org.thingsboard.server.service.edge.rpc.processor.DeviceProcessor;
import org.thingsboard.server.service.edge.rpc.processor.RelationProcessor;
@ -149,7 +149,7 @@ public class EdgeContextComponent {
@Lazy
@Autowired
private SyncEdgeService syncEdgeService;
private EdgeRequestsService edgeRequestsService;
@Lazy
@Autowired

48
application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeEventUtils.java

@ -0,0 +1,48 @@
/**
* Copyright © 2016-2021 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.service.edge.rpc;
import com.fasterxml.jackson.databind.JsonNode;
import org.thingsboard.server.common.data.edge.EdgeEvent;
import org.thingsboard.server.common.data.edge.EdgeEventActionType;
import org.thingsboard.server.common.data.edge.EdgeEventType;
import org.thingsboard.server.common.data.id.EdgeId;
import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.id.TenantId;
public final class EdgeEventUtils {
private EdgeEventUtils() {
}
public static EdgeEvent constructEdgeEvent(TenantId tenantId,
EdgeId edgeId,
EdgeEventType type,
EdgeEventActionType action,
EntityId entityId,
JsonNode body) {
EdgeEvent edgeEvent = new EdgeEvent();
edgeEvent.setTenantId(tenantId);
edgeEvent.setEdgeId(edgeId);
edgeEvent.setType(type);
edgeEvent.setAction(action);
if (entityId != null) {
edgeEvent.setEntityId(entityId.getId());
}
edgeEvent.setBody(body);
return edgeEvent;
}
}

31
application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcService.java

@ -53,6 +53,7 @@ import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
@ -95,6 +96,8 @@ public class EdgeGrpcService extends EdgeRpcServiceGrpc.EdgeRpcServiceImplBase i
private ScheduledExecutorService scheduler;
private ExecutorService syncExecutorService;
@PostConstruct
public void init() {
log.info("Initializing Edge RPC service!");
@ -120,6 +123,8 @@ public class EdgeGrpcService extends EdgeRpcServiceGrpc.EdgeRpcServiceImplBase i
throw new RuntimeException("Failed to start Edge RPC server!");
}
this.scheduler = Executors.newScheduledThreadPool(schedulerPoolSize, ThingsBoardThreadFactory.forName("edge-scheduler"));
this.syncExecutorService = Executors.newFixedThreadPool(
Runtime.getRuntime().availableProcessors(), ThingsBoardThreadFactory.forName("edge-sync"));
log.info("Edge RPC service initialized!");
}
@ -139,29 +144,32 @@ public class EdgeGrpcService extends EdgeRpcServiceGrpc.EdgeRpcServiceImplBase i
if (scheduler != null) {
scheduler.shutdownNow();
}
if (syncExecutorService != null) {
syncExecutorService.shutdownNow();
}
}
@Override
public StreamObserver<RequestMsg> handleMsgs(StreamObserver<ResponseMsg> outputStream) {
return new EdgeGrpcSession(ctx, outputStream, this::onEdgeConnect, this::onEdgeDisconnect, mapper).getInputStream();
return new EdgeGrpcSession(ctx, outputStream, this::onEdgeConnect, this::onEdgeDisconnect, mapper, syncExecutorService).getInputStream();
}
@Override
public void updateEdge(Edge edge) {
public void updateEdge(TenantId tenantId, Edge edge) {
EdgeGrpcSession session = sessions.get(edge.getId());
if (session != null && session.isConnected()) {
log.debug("[{}] Updating configuration for edge [{}] [{}]", edge.getTenantId(), edge.getName(), edge.getId());
log.debug("[{}] Updating configuration for edge [{}] [{}]", tenantId, edge.getName(), edge.getId());
session.onConfigurationUpdate(edge);
} else {
log.debug("[{}] Session doesn't exist for edge [{}] [{}]", edge.getTenantId(), edge.getName(), edge.getId());
log.debug("[{}] Session doesn't exist for edge [{}] [{}]", tenantId, edge.getName(), edge.getId());
}
}
@Override
public void deleteEdge(EdgeId edgeId) {
public void deleteEdge(TenantId tenantId, EdgeId edgeId) {
EdgeGrpcSession session = sessions.get(edgeId);
if (session != null && session.isConnected()) {
log.info("Closing and removing session for edge [{}]", edgeId);
log.info("[{}] Closing and removing session for edge [{}]", tenantId, edgeId);
session.close();
sessions.remove(edgeId);
sessionNewEvents.remove(edgeId);
@ -170,10 +178,10 @@ public class EdgeGrpcService extends EdgeRpcServiceGrpc.EdgeRpcServiceImplBase i
}
@Override
public void onEdgeEvent(EdgeId edgeId) {
log.trace("[{}] onEdgeEvent", edgeId.getId());
public void onEdgeEvent(TenantId tenantId, EdgeId edgeId) {
log.trace("[{}] onEdgeEvent [{}]", tenantId, edgeId.getId());
if (!sessionNewEvents.get(edgeId)) {
log.trace("[{}] set session new events flag to true", edgeId.getId());
log.trace("[{}] set session new events flag to true [{}]", tenantId, edgeId.getId());
sessionNewEvents.put(edgeId, true);
}
}
@ -188,10 +196,11 @@ public class EdgeGrpcService extends EdgeRpcServiceGrpc.EdgeRpcServiceImplBase i
scheduleEdgeEventsCheck(edgeGrpcSession);
}
public EdgeGrpcSession getEdgeGrpcSessionById(TenantId tenantId, EdgeId edgeId) {
@Override
public void startSyncProcess(TenantId tenantId, EdgeId edgeId) {
EdgeGrpcSession session = sessions.get(edgeId);
if (session != null && session.isConnected()) {
return session;
session.startSyncProcess();
} else {
log.error("[{}] Edge is not connected [{}]", tenantId, edgeId);
throw new RuntimeException("Edge is not connected");

298
application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcSession.java

@ -16,9 +16,7 @@
package org.thingsboard.server.service.edge.rpc;
import com.datastax.oss.driver.api.core.uuid.Uuids;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
@ -28,6 +26,7 @@ import io.grpc.stub.StreamObserver;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.thingsboard.common.util.JacksonUtil;
import org.thingsboard.server.common.data.AdminSettings;
import org.thingsboard.server.common.data.Customer;
import org.thingsboard.server.common.data.Dashboard;
@ -61,8 +60,7 @@ import org.thingsboard.server.common.data.kv.AttributeKvEntry;
import org.thingsboard.server.common.data.kv.BaseAttributeKvEntry;
import org.thingsboard.server.common.data.kv.LongDataEntry;
import org.thingsboard.server.common.data.page.PageData;
import org.thingsboard.server.common.data.page.SortOrder;
import org.thingsboard.server.common.data.page.TimePageLink;
import org.thingsboard.server.common.data.page.PageLink;
import org.thingsboard.server.common.data.relation.EntityRelation;
import org.thingsboard.server.common.data.rule.RuleChain;
import org.thingsboard.server.common.data.rule.RuleChainMetaData;
@ -71,7 +69,6 @@ import org.thingsboard.server.common.data.security.UserCredentials;
import org.thingsboard.server.common.data.widget.WidgetType;
import org.thingsboard.server.common.data.widget.WidgetsBundle;
import org.thingsboard.server.common.transport.util.JsonUtils;
import org.thingsboard.common.util.JacksonUtil;
import org.thingsboard.server.gen.edge.AdminSettingsUpdateMsg;
import org.thingsboard.server.gen.edge.AlarmUpdateMsg;
import org.thingsboard.server.gen.edge.AssetUpdateMsg;
@ -83,6 +80,7 @@ import org.thingsboard.server.gen.edge.CustomerUpdateMsg;
import org.thingsboard.server.gen.edge.DashboardUpdateMsg;
import org.thingsboard.server.gen.edge.DeviceCredentialsRequestMsg;
import org.thingsboard.server.gen.edge.DeviceCredentialsUpdateMsg;
import org.thingsboard.server.gen.edge.DeviceProfileDevicesRequestMsg;
import org.thingsboard.server.gen.edge.DeviceProfileUpdateMsg;
import org.thingsboard.server.gen.edge.DeviceRpcCallMsg;
import org.thingsboard.server.gen.edge.DeviceUpdateMsg;
@ -92,6 +90,7 @@ import org.thingsboard.server.gen.edge.EdgeConfiguration;
import org.thingsboard.server.gen.edge.EdgeUpdateMsg;
import org.thingsboard.server.gen.edge.EntityDataProto;
import org.thingsboard.server.gen.edge.EntityViewUpdateMsg;
import org.thingsboard.server.gen.edge.EntityViewsRequestMsg;
import org.thingsboard.server.gen.edge.RelationRequestMsg;
import org.thingsboard.server.gen.edge.RelationUpdateMsg;
import org.thingsboard.server.gen.edge.RequestMsg;
@ -100,14 +99,26 @@ import org.thingsboard.server.gen.edge.ResponseMsg;
import org.thingsboard.server.gen.edge.RuleChainMetadataRequestMsg;
import org.thingsboard.server.gen.edge.RuleChainMetadataUpdateMsg;
import org.thingsboard.server.gen.edge.RuleChainUpdateMsg;
import org.thingsboard.server.gen.edge.SyncCompletedMsg;
import org.thingsboard.server.gen.edge.UpdateMsgType;
import org.thingsboard.server.gen.edge.UplinkMsg;
import org.thingsboard.server.gen.edge.UplinkResponseMsg;
import org.thingsboard.server.gen.edge.UserCredentialsRequestMsg;
import org.thingsboard.server.gen.edge.UserCredentialsUpdateMsg;
import org.thingsboard.server.gen.edge.WidgetBundleTypesRequestMsg;
import org.thingsboard.server.gen.edge.WidgetTypeUpdateMsg;
import org.thingsboard.server.gen.edge.WidgetsBundleUpdateMsg;
import org.thingsboard.server.service.edge.EdgeContextComponent;
import org.thingsboard.server.service.edge.rpc.fetch.AssetsEdgeEventFetcher;
import org.thingsboard.server.service.edge.rpc.fetch.CustomerUsersEdgeEventFetcher;
import org.thingsboard.server.service.edge.rpc.fetch.DashboardsEdgeEventFetcher;
import org.thingsboard.server.service.edge.rpc.fetch.DeviceProfilesEdgeEventFetcher;
import org.thingsboard.server.service.edge.rpc.fetch.EdgeEventFetcher;
import org.thingsboard.server.service.edge.rpc.fetch.GeneralEdgeEventFetcher;
import org.thingsboard.server.service.edge.rpc.fetch.RuleChainsEdgeEventFetcher;
import org.thingsboard.server.service.edge.rpc.fetch.SystemWidgetsBundlesEdgeEventFetcher;
import org.thingsboard.server.service.edge.rpc.fetch.TenantAdminUsersEdgeEventFetcher;
import org.thingsboard.server.service.edge.rpc.fetch.TenantWidgetsBundlesEdgeEventFetcher;
import java.io.Closeable;
import java.util.ArrayList;
@ -117,6 +128,7 @@ import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.BiConsumer;
@ -140,22 +152,26 @@ public final class EdgeGrpcSession implements Closeable {
private StreamObserver<RequestMsg> inputStream;
private StreamObserver<ResponseMsg> outputStream;
private boolean connected;
private boolean syncCompleted;
private ExecutorService syncExecutorService;
private CountDownLatch latch;
EdgeGrpcSession(EdgeContextComponent ctx, StreamObserver<ResponseMsg> outputStream, BiConsumer<EdgeId, EdgeGrpcSession> sessionOpenListener,
Consumer<EdgeId> sessionCloseListener, ObjectMapper mapper) {
Consumer<EdgeId> sessionCloseListener, ObjectMapper mapper, ExecutorService syncExecutorService) {
this.sessionId = UUID.randomUUID();
this.ctx = ctx;
this.outputStream = outputStream;
this.sessionOpenListener = sessionOpenListener;
this.sessionCloseListener = sessionCloseListener;
this.mapper = mapper;
this.syncExecutorService = syncExecutorService;
initInputStream();
}
private void initInputStream() {
this.inputStream = new StreamObserver<RequestMsg>() {
this.inputStream = new StreamObserver<>() {
@Override
public void onNext(RequestMsg requestMsg) {
if (!connected && requestMsg.getMsgType().equals(RequestMsgType.CONNECT_RPC_MESSAGE)) {
@ -170,7 +186,10 @@ public final class EdgeGrpcSession implements Closeable {
}
}
if (connected && requestMsg.getMsgType().equals(RequestMsgType.SYNC_REQUEST_RPC_MESSAGE)) {
ctx.getSyncEdgeService().sync(edge.getTenantId(), edge);
if (requestMsg.getSyncRequestMsg().getSyncRequired()) {
startSyncProcess();
}
syncCompleted = true;
}
if (connected) {
if (requestMsg.getMsgType().equals(RequestMsgType.UPLINK_RPC_MESSAGE) && requestMsg.hasUplinkMsg()) {
@ -198,18 +217,55 @@ public final class EdgeGrpcSession implements Closeable {
if (edge != null) {
try {
sessionCloseListener.accept(edge.getId());
} catch (Exception ignored) {}
} catch (Exception ignored) {
}
}
try {
outputStream.onCompleted();
} catch (Exception ignored) {}
} catch (Exception ignored) {
}
}
};
}
public void startSyncProcess() {
log.trace("[{}][{}] Staring edge sync process", edge.getTenantId(), edge.getId());
syncExecutorService.submit(() -> {
try {
startProcessingEdgeEvents(new TenantWidgetsBundlesEdgeEventFetcher(ctx.getWidgetsBundleService()));
startProcessingEdgeEvents(new SystemWidgetsBundlesEdgeEventFetcher(ctx.getWidgetsBundleService()));
startProcessingEdgeEvents(new DeviceProfilesEdgeEventFetcher(ctx.getDeviceProfileService()));
startProcessingEdgeEvents(new RuleChainsEdgeEventFetcher(ctx.getRuleChainService()));
startProcessingEdgeEvents(new TenantAdminUsersEdgeEventFetcher(ctx.getUserService()));
if (edge.getCustomerId() != null && !EntityId.NULL_UUID.equals(edge.getCustomerId().getId())) {
EdgeEvent customerEdgeEvent = EdgeEventUtils.constructEdgeEvent(edge.getTenantId(), edge.getId(),
EdgeEventType.CUSTOMER, EdgeEventActionType.ADDED, edge.getCustomerId(), null);
DownlinkMsg customerDownlinkMsg = convertToDownlinkMsg(customerEdgeEvent);
sendSingleDownlinkMsg(customerDownlinkMsg);
startProcessingEdgeEvents(new CustomerUsersEdgeEventFetcher(ctx.getUserService(), edge.getCustomerId()));
}
// TODO: voba - implement this functionality
// syncAdminSettings(edge);
startProcessingEdgeEvents(new AssetsEdgeEventFetcher(ctx.getAssetService()));
startProcessingEdgeEvents(new DashboardsEdgeEventFetcher(ctx.getDashboardService()));
DownlinkMsg syncCompleteDownlinkMsg = DownlinkMsg.newBuilder()
.setSyncCompletedMsg(SyncCompletedMsg.newBuilder().build())
.build();
sendSingleDownlinkMsg(syncCompleteDownlinkMsg);
} catch (Exception e) {
log.error("[{}][{}] Exception during sync process", edge.getTenantId(), edge.getId(), e);
}
});
}
private void onUplinkMsg(UplinkMsg uplinkMsg) {
ListenableFuture<List<Void>> future = processUplinkMsg(uplinkMsg);
Futures.addCallback(future, new FutureCallback<List<Void>>() {
Futures.addCallback(future, new FutureCallback<>() {
@Override
public void onSuccess(@Nullable List<Void> result) {
UplinkResponseMsg uplinkResponseMsg = UplinkResponseMsg.newBuilder().setSuccess(true).build();
@ -271,101 +327,136 @@ public final class EdgeGrpcSession implements Closeable {
void processEdgeEvents() throws ExecutionException, InterruptedException {
log.trace("[{}] processHandleMessages started", this.sessionId);
if (isConnected()) {
if (isConnected() && isSyncCompleted()) {
Long queueStartTs = getQueueStartTs().get();
TimePageLink pageLink = new TimePageLink(
GeneralEdgeEventFetcher fetcher = new GeneralEdgeEventFetcher(
ctx.getEdgeEventStorageSettings().getMaxReadRecordsCount(),
0,
null,
new SortOrder("createdTime", SortOrder.Direction.ASC),
queueStartTs,
null);
PageData<EdgeEvent> pageData;
UUID ifOffset = null;
boolean success = true;
do {
pageData = ctx.getEdgeEventService().findEdgeEvents(edge.getTenantId(), edge.getId(), pageLink, true);
if (isConnected() && !pageData.getData().isEmpty()) {
log.trace("[{}] [{}] event(s) are going to be processed.", this.sessionId, pageData.getData().size());
List<DownlinkMsg> downlinkMsgsPack = convertToDownlinkMsgsPack(pageData.getData());
log.trace("[{}] [{}] downlink msg(s) are going to be send.", this.sessionId, downlinkMsgsPack.size());
latch = new CountDownLatch(downlinkMsgsPack.size());
for (DownlinkMsg downlinkMsg : downlinkMsgsPack) {
sendDownlinkMsg(ResponseMsg.newBuilder()
.setDownlinkMsg(downlinkMsg)
.build());
}
ctx.getEdgeEventService());
UUID ifOffset = startProcessingEdgeEvents(fetcher);
if (ifOffset != null) {
Long newStartTs = Uuids.unixTimestamp(ifOffset);
updateQueueStartTs(newStartTs);
}
}
log.trace("[{}] processHandleMessages finished", this.sessionId);
}
ifOffset = pageData.getData().get(pageData.getData().size() - 1).getUuidId();
private UUID startProcessingEdgeEvents(EdgeEventFetcher fetcher) throws InterruptedException {
PageLink pageLink = fetcher.getPageLink();
PageData<EdgeEvent> pageData;
UUID ifOffset = null;
boolean success = true;
do {
pageData = fetcher.fetchEdgeEvents(edge.getTenantId(), edge.getId(), pageLink);
if (isConnected() && !pageData.getData().isEmpty()) {
log.trace("[{}] [{}] event(s) are going to be processed.", this.sessionId, pageData.getData().size());
success = latch.await(10, TimeUnit.SECONDS);
if (!success) {
log.warn("[{}] Failed to deliver the batch: {}", this.sessionId, downlinkMsgsPack);
}
success = processEdgeEventsPack(pageData.getData());
ifOffset = pageData.getData().get(pageData.getData().size() - 1).getUuidId();
}
if (isConnected() && (!success || pageData.hasNext())) {
try {
Thread.sleep(ctx.getEdgeEventStorageSettings().getSleepIntervalBetweenBatches());
} catch (InterruptedException e) {
log.error("[{}] Error during sleep between batches", this.sessionId, e);
}
if (isConnected() && (!success || pageData.hasNext())) {
try {
Thread.sleep(ctx.getEdgeEventStorageSettings().getSleepIntervalBetweenBatches());
} catch (InterruptedException e) {
log.error("[{}] Error during sleep between batches", this.sessionId, e);
}
if (success) {
pageLink = pageLink.nextPageLink();
}
if (success) {
pageLink = pageLink.nextPageLink();
}
} while (isConnected() && (!success || pageData.hasNext()));
}
} while (isConnected() && (!success || pageData.hasNext()));
return ifOffset;
}
if (ifOffset != null) {
Long newStartTs = Uuids.unixTimestamp(ifOffset);
updateQueueStartTs(newStartTs);
private boolean processEdgeEventsPack(List<EdgeEvent> edgeEvents) throws InterruptedException {
List<DownlinkMsg> downlinkMsgsPack = convertToDownlinkMsgsPack(edgeEvents);
log.trace("[{}] [{}] downlink msg(s) are going to be send.", this.sessionId, downlinkMsgsPack.size());
latch = new CountDownLatch(downlinkMsgsPack.size());
for (DownlinkMsg downlinkMsg : downlinkMsgsPack) {
sendDownlinkMsg(ResponseMsg.newBuilder()
.setDownlinkMsg(downlinkMsg)
.build());
}
boolean success = latch.await(10, TimeUnit.SECONDS);
if (!success) {
log.warn("[{}] Failed to deliver the batch: {}", this.sessionId, downlinkMsgsPack);
}
return success;
}
private void sendSingleDownlinkMsg(DownlinkMsg downlinkMsg) throws InterruptedException {
boolean success;
do {
latch = new CountDownLatch(1);
sendDownlinkMsg(ResponseMsg.newBuilder()
.setDownlinkMsg(downlinkMsg)
.build());
success = latch.await(10, TimeUnit.SECONDS);
if (!success) {
log.warn("[{}] Failed to deliver single downlink msg!", this.sessionId);
}
if (isConnected() && !success) {
try {
Thread.sleep(ctx.getEdgeEventStorageSettings().getSleepIntervalBetweenBatches());
} catch (InterruptedException e) {
log.error("[{}] Error during sleep between next send of single downlink msg", this.sessionId, e);
}
}
} while (isConnected() && !success);
}
private DownlinkMsg convertToDownlinkMsg(EdgeEvent edgeEvent) {
log.trace("[{}] converting edge event to downlink msg [{}]", this.sessionId, edgeEvent);
DownlinkMsg downlinkMsg = null;
try {
switch (edgeEvent.getAction()) {
case UPDATED:
case ADDED:
case DELETED:
case ASSIGNED_TO_EDGE:
case UNASSIGNED_FROM_EDGE:
case ALARM_ACK:
case ALARM_CLEAR:
case CREDENTIALS_UPDATED:
case RELATION_ADD_OR_UPDATE:
case RELATION_DELETED:
case ASSIGNED_TO_CUSTOMER:
case UNASSIGNED_FROM_CUSTOMER:
downlinkMsg = processEntityMessage(edgeEvent, edgeEvent.getAction());
break;
case ATTRIBUTES_UPDATED:
case POST_ATTRIBUTES:
case ATTRIBUTES_DELETED:
case TIMESERIES_UPDATED:
downlinkMsg = processTelemetryMessage(edgeEvent);
break;
case CREDENTIALS_REQUEST:
downlinkMsg = processCredentialsRequestMessage(edgeEvent);
break;
case ENTITY_MERGE_REQUEST:
downlinkMsg = processEntityMergeRequestMessage(edgeEvent);
break;
case RPC_CALL:
downlinkMsg = processRpcCallMsg(edgeEvent);
break;
}
} catch (Exception e) {
log.error("Exception during converting edge event to downlink msg", e);
}
log.trace("[{}] processHandleMessages finished", this.sessionId);
return downlinkMsg;
}
private List<DownlinkMsg> convertToDownlinkMsgsPack(List<EdgeEvent> edgeEvents) {
List<DownlinkMsg> result = new ArrayList<>();
for (EdgeEvent edgeEvent : edgeEvents) {
log.trace("[{}] Processing edge event [{}]", this.sessionId, edgeEvent);
try {
DownlinkMsg downlinkMsg = null;
switch (edgeEvent.getAction()) {
case UPDATED:
case ADDED:
case DELETED:
case ASSIGNED_TO_EDGE:
case UNASSIGNED_FROM_EDGE:
case ALARM_ACK:
case ALARM_CLEAR:
case CREDENTIALS_UPDATED:
case RELATION_ADD_OR_UPDATE:
case RELATION_DELETED:
case ASSIGNED_TO_CUSTOMER:
case UNASSIGNED_FROM_CUSTOMER:
downlinkMsg = processEntityMessage(edgeEvent, edgeEvent.getAction());
break;
case ATTRIBUTES_UPDATED:
case POST_ATTRIBUTES:
case ATTRIBUTES_DELETED:
case TIMESERIES_UPDATED:
downlinkMsg = processTelemetryMessage(edgeEvent);
break;
case CREDENTIALS_REQUEST:
downlinkMsg = processCredentialsRequestMessage(edgeEvent);
break;
case ENTITY_MERGE_REQUEST:
downlinkMsg = processEntityMergeRequestMessage(edgeEvent);
break;
case RPC_CALL:
downlinkMsg = processRpcCallMsg(edgeEvent);
break;
}
if (downlinkMsg != null) {
result.add(downlinkMsg);
}
} catch (Exception e) {
log.error("Exception during processing records from queue", e);
DownlinkMsg downlinkMsg = convertToDownlinkMsg(edgeEvent);
if (downlinkMsg != null) {
result.add(downlinkMsg);
}
}
return result;
@ -945,27 +1036,27 @@ public final class EdgeGrpcSession implements Closeable {
}
if (uplinkMsg.getRuleChainMetadataRequestMsgCount() > 0) {
for (RuleChainMetadataRequestMsg ruleChainMetadataRequestMsg : uplinkMsg.getRuleChainMetadataRequestMsgList()) {
result.add(ctx.getSyncEdgeService().processRuleChainMetadataRequestMsg(edge.getTenantId(), edge, ruleChainMetadataRequestMsg));
result.add(ctx.getEdgeRequestsService().processRuleChainMetadataRequestMsg(edge.getTenantId(), edge, ruleChainMetadataRequestMsg));
}
}
if (uplinkMsg.getAttributesRequestMsgCount() > 0) {
for (AttributesRequestMsg attributesRequestMsg : uplinkMsg.getAttributesRequestMsgList()) {
result.add(ctx.getSyncEdgeService().processAttributesRequestMsg(edge.getTenantId(), edge, attributesRequestMsg));
result.add(ctx.getEdgeRequestsService().processAttributesRequestMsg(edge.getTenantId(), edge, attributesRequestMsg));
}
}
if (uplinkMsg.getRelationRequestMsgCount() > 0) {
for (RelationRequestMsg relationRequestMsg : uplinkMsg.getRelationRequestMsgList()) {
result.add(ctx.getSyncEdgeService().processRelationRequestMsg(edge.getTenantId(), edge, relationRequestMsg));
result.add(ctx.getEdgeRequestsService().processRelationRequestMsg(edge.getTenantId(), edge, relationRequestMsg));
}
}
if (uplinkMsg.getUserCredentialsRequestMsgCount() > 0) {
for (UserCredentialsRequestMsg userCredentialsRequestMsg : uplinkMsg.getUserCredentialsRequestMsgList()) {
result.add(ctx.getSyncEdgeService().processUserCredentialsRequestMsg(edge.getTenantId(), edge, userCredentialsRequestMsg));
result.add(ctx.getEdgeRequestsService().processUserCredentialsRequestMsg(edge.getTenantId(), edge, userCredentialsRequestMsg));
}
}
if (uplinkMsg.getDeviceCredentialsRequestMsgCount() > 0) {
for (DeviceCredentialsRequestMsg deviceCredentialsRequestMsg : uplinkMsg.getDeviceCredentialsRequestMsgList()) {
result.add(ctx.getSyncEdgeService().processDeviceCredentialsRequestMsg(edge.getTenantId(), edge, deviceCredentialsRequestMsg));
result.add(ctx.getEdgeRequestsService().processDeviceCredentialsRequestMsg(edge.getTenantId(), edge, deviceCredentialsRequestMsg));
}
}
if (uplinkMsg.getDeviceRpcCallMsgCount() > 0) {
@ -973,6 +1064,21 @@ public final class EdgeGrpcSession implements Closeable {
result.add(ctx.getDeviceProcessor().processDeviceRpcCallResponseMsg(edge.getTenantId(), deviceRpcCallMsg));
}
}
if (uplinkMsg.getDeviceProfileDevicesRequestMsgCount() > 0) {
for (DeviceProfileDevicesRequestMsg deviceProfileDevicesRequestMsg : uplinkMsg.getDeviceProfileDevicesRequestMsgList()) {
result.add(ctx.getEdgeRequestsService().processDeviceProfileDevicesRequestMsg(edge.getTenantId(), edge, deviceProfileDevicesRequestMsg));
}
}
if (uplinkMsg.getWidgetBundleTypesRequestMsgCount() > 0) {
for (WidgetBundleTypesRequestMsg widgetBundleTypesRequestMsg : uplinkMsg.getWidgetBundleTypesRequestMsgList()) {
result.add(ctx.getEdgeRequestsService().processWidgetBundleTypesRequestMsg(edge.getTenantId(), edge, widgetBundleTypesRequestMsg));
}
}
if (uplinkMsg.getEntityViewsRequestMsgCount() > 0) {
for (EntityViewsRequestMsg entityViewRequestMsg : uplinkMsg.getEntityViewsRequestMsgList()) {
result.add(ctx.getEdgeRequestsService().processEntityViewsRequestMsg(edge.getTenantId(), edge, entityViewRequestMsg));
}
}
} catch (Exception e) {
log.error("[{}] Can't process uplink msg [{}]", this.sessionId, uplinkMsg, e);
}

9
application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeRpcService.java

@ -17,12 +17,15 @@ package org.thingsboard.server.service.edge.rpc;
import org.thingsboard.server.common.data.edge.Edge;
import org.thingsboard.server.common.data.id.EdgeId;
import org.thingsboard.server.common.data.id.TenantId;
public interface EdgeRpcService {
void updateEdge(Edge edge);
void updateEdge(TenantId tenantId, Edge edge);
void deleteEdge(EdgeId edgeId);
void deleteEdge(TenantId tenantId, EdgeId edgeId);
void onEdgeEvent(EdgeId edgeId);
void onEdgeEvent(TenantId tenantId, EdgeId edgeId);
void startSyncProcess(TenantId tenantId, EdgeId edgeId);
}

109
application/src/main/java/org/thingsboard/server/service/edge/rpc/fetch/AdminSettingsEdgeEventFetcher.java

@ -0,0 +1,109 @@
/**
* Copyright © 2016-2021 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.service.edge.rpc.fetch;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.thingsboard.server.common.data.edge.EdgeEvent;
import org.thingsboard.server.common.data.id.EdgeId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.page.PageData;
import org.thingsboard.server.common.data.page.PageLink;
@AllArgsConstructor
@Slf4j
public class AdminSettingsEdgeEventFetcher implements EdgeEventFetcher {
@Override
public PageLink getPageLink() {
return new PageLink(DEFAULT_LIMIT);
}
@Override
public PageData<EdgeEvent> fetchEdgeEvents(TenantId tenantId, EdgeId edgeId, PageLink pageLink) {
return null;
}
//
// private void syncAdminSettings(TenantId tenantId, Edge edge) {
// log.trace("[{}] syncAdminSettings [{}]", tenantId, edge.getName());
// try {
// AdminSettings systemMailSettings = adminSettingsService.findAdminSettingsByKey(TenantId.SYS_TENANT_ID, "mail");
// saveEdgeEvent(tenantId, edge.getId(), EdgeEventType.ADMIN_SETTINGS, EdgeEventActionType.UPDATED, null, mapper.valueToTree(systemMailSettings));
// AdminSettings tenantMailSettings = convertToTenantAdminSettings(systemMailSettings.getKey(), (ObjectNode) systemMailSettings.getJsonValue());
// saveEdgeEvent(tenantId, edge.getId(), EdgeEventType.ADMIN_SETTINGS, EdgeEventActionType.UPDATED, null, mapper.valueToTree(tenantMailSettings));
// AdminSettings systemMailTemplates = loadMailTemplates();
// saveEdgeEvent(tenantId, edge.getId(), EdgeEventType.ADMIN_SETTINGS, EdgeEventActionType.UPDATED, null, mapper.valueToTree(systemMailTemplates));
// AdminSettings tenantMailTemplates = convertToTenantAdminSettings(systemMailTemplates.getKey(), (ObjectNode) systemMailTemplates.getJsonValue());
// saveEdgeEvent(tenantId, edge.getId(), EdgeEventType.ADMIN_SETTINGS, EdgeEventActionType.UPDATED, null, mapper.valueToTree(tenantMailTemplates));
// } catch (Exception e) {
// log.error("Can't load admin settings", e);
// }
// }
//
// private AdminSettings loadMailTemplates() throws Exception {
// Map<String, Object> mailTemplates = new HashMap<>();
// Pattern startPattern = Pattern.compile("<div class=\"content\".*?>");
// Pattern endPattern = Pattern.compile("<div class=\"footer\".*?>");
// File[] files = new DefaultResourceLoader().getResource("classpath:/templates/").getFile().listFiles();
// for (File file : files) {
// Map<String, String> mailTemplate = new HashMap<>();
// String name = validateName(file.getName());
// String stringTemplate = FileUtils.readFileToString(file, StandardCharsets.UTF_8);
// Matcher start = startPattern.matcher(stringTemplate);
// Matcher end = endPattern.matcher(stringTemplate);
// if (start.find() && end.find()) {
// String body = StringUtils.substringBetween(stringTemplate, start.group(), end.group()).replaceAll("\t", "");
// String subject = StringUtils.substringBetween(body, "<h2>", "</h2>");
// mailTemplate.put("subject", subject);
// mailTemplate.put("body", body);
// mailTemplates.put(name, mailTemplate);
// } else {
// log.error("Can't load mail template from file {}", file.getName());
// }
// }
// AdminSettings adminSettings = new AdminSettings();
// adminSettings.setId(new AdminSettingsId(Uuids.timeBased()));
// adminSettings.setKey("mailTemplates");
// adminSettings.setJsonValue(mapper.convertValue(mailTemplates, JsonNode.class));
// return adminSettings;
// }
//
// private String validateName(String name) throws Exception {
// StringBuilder nameBuilder = new StringBuilder();
// name = name.replace(".vm", "");
// String[] nameParts = name.split("\\.");
// if (nameParts.length >= 1) {
// nameBuilder.append(nameParts[0]);
// for (int i = 1; i < nameParts.length; i++) {
// String word = WordUtils.capitalize(nameParts[i]);
// nameBuilder.append(word);
// }
// return nameBuilder.toString();
// } else {
// throw new Exception("Error during filename validation");
// }
// }
//
// private AdminSettings convertToTenantAdminSettings(String key, ObjectNode jsonValue) {
// AdminSettings tenantMailSettings = new AdminSettings();
// jsonValue.put("useSystemMailSettings", true);
// tenantMailSettings.setJsonValue(jsonValue);
// tenantMailSettings.setKey(key);
// return tenantMailSettings;
// }
}

58
application/src/main/java/org/thingsboard/server/service/edge/rpc/fetch/AssetsEdgeEventFetcher.java

@ -0,0 +1,58 @@
/**
* Copyright © 2016-2021 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.service.edge.rpc.fetch;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.thingsboard.server.common.data.asset.Asset;
import org.thingsboard.server.common.data.edge.EdgeEvent;
import org.thingsboard.server.common.data.edge.EdgeEventActionType;
import org.thingsboard.server.common.data.edge.EdgeEventType;
import org.thingsboard.server.common.data.id.EdgeId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.page.PageData;
import org.thingsboard.server.common.data.page.PageLink;
import org.thingsboard.server.dao.asset.AssetService;
import org.thingsboard.server.service.edge.rpc.EdgeEventUtils;
import java.util.ArrayList;
import java.util.List;
@AllArgsConstructor
@Slf4j
public class AssetsEdgeEventFetcher implements EdgeEventFetcher {
private final AssetService assetService;
@Override
public PageLink getPageLink() {
return new PageLink(DEFAULT_LIMIT);
}
@Override
public PageData<EdgeEvent> fetchEdgeEvents(TenantId tenantId, EdgeId edgeId, PageLink pageLink) {
log.trace("[{}] start fetching edge events [{}]", tenantId, edgeId);
PageData<Asset> pageData = assetService.findAssetsByTenantIdAndEdgeId(tenantId, edgeId, pageLink);
List<EdgeEvent> result = new ArrayList<>();
if (!pageData.getData().isEmpty()) {
for (Asset asset : pageData.getData()) {
result.add(EdgeEventUtils.constructEdgeEvent(tenantId, edgeId, EdgeEventType.ASSET,
EdgeEventActionType.ADDED, asset.getId(), null));
}
}
return new PageData<>(result, pageData.getTotalPages(), pageData.getTotalElements(), pageData.hasNext());
}
}

58
application/src/main/java/org/thingsboard/server/service/edge/rpc/fetch/BaseUsersEdgeEventFetcher.java

@ -0,0 +1,58 @@
/**
* Copyright © 2016-2021 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.service.edge.rpc.fetch;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.thingsboard.server.common.data.User;
import org.thingsboard.server.common.data.edge.EdgeEvent;
import org.thingsboard.server.common.data.edge.EdgeEventActionType;
import org.thingsboard.server.common.data.edge.EdgeEventType;
import org.thingsboard.server.common.data.id.EdgeId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.page.PageData;
import org.thingsboard.server.common.data.page.PageLink;
import org.thingsboard.server.common.data.widget.WidgetsBundle;
import org.thingsboard.server.dao.user.UserService;
import org.thingsboard.server.service.edge.rpc.EdgeEventUtils;
import java.util.ArrayList;
import java.util.List;
@Slf4j
public abstract class BaseUsersEdgeEventFetcher implements EdgeEventFetcher {
@Override
public PageLink getPageLink() {
return new PageLink(DEFAULT_LIMIT);
}
@Override
public PageData<EdgeEvent> fetchEdgeEvents(TenantId tenantId, EdgeId edgeId, PageLink pageLink) {
log.trace("[{}] start fetching edge events [{}]", tenantId, edgeId);
PageData<User> pageData = findUsers(tenantId, pageLink);
List<EdgeEvent> result = new ArrayList<>();
if (!pageData.getData().isEmpty()) {
for (User user : pageData.getData()) {
result.add(EdgeEventUtils.constructEdgeEvent(tenantId, edgeId, EdgeEventType.USER,
EdgeEventActionType.ADDED, user.getId(), null));
}
}
return new PageData<>(result, pageData.getTotalPages(), pageData.getTotalElements(), pageData.hasNext());
}
protected abstract PageData<User> findUsers(TenantId tenantId, PageLink pageLink);
}

55
application/src/main/java/org/thingsboard/server/service/edge/rpc/fetch/BaseWidgetsBundlesEdgeEventFetcher.java

@ -0,0 +1,55 @@
/**
* Copyright © 2016-2021 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.service.edge.rpc.fetch;
import lombok.extern.slf4j.Slf4j;
import org.thingsboard.server.common.data.edge.EdgeEvent;
import org.thingsboard.server.common.data.edge.EdgeEventActionType;
import org.thingsboard.server.common.data.edge.EdgeEventType;
import org.thingsboard.server.common.data.id.EdgeId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.page.PageData;
import org.thingsboard.server.common.data.page.PageLink;
import org.thingsboard.server.common.data.widget.WidgetsBundle;
import org.thingsboard.server.service.edge.rpc.EdgeEventUtils;
import java.util.ArrayList;
import java.util.List;
@Slf4j
public abstract class BaseWidgetsBundlesEdgeEventFetcher implements EdgeEventFetcher {
@Override
public PageLink getPageLink() {
return new PageLink(DEFAULT_LIMIT);
}
@Override
public PageData<EdgeEvent> fetchEdgeEvents(TenantId tenantId, EdgeId edgeId, PageLink pageLink) {
log.trace("[{}] start fetching edge events [{}]", tenantId, edgeId);
PageData<WidgetsBundle> pageData = findWidgetsBundles(tenantId, pageLink);
List<EdgeEvent> result = new ArrayList<>();
if (!pageData.getData().isEmpty()) {
for (WidgetsBundle widgetsBundle : pageData.getData()) {
result.add(EdgeEventUtils.constructEdgeEvent(tenantId, edgeId, EdgeEventType.WIDGETS_BUNDLE,
EdgeEventActionType.ADDED, widgetsBundle.getId(), null));
}
}
return new PageData<>(result, pageData.getTotalPages(), pageData.getTotalElements(), pageData.hasNext());
}
protected abstract PageData<WidgetsBundle> findWidgetsBundles(TenantId tenantId, PageLink pageLink);
}

36
application/src/main/java/org/thingsboard/server/service/edge/rpc/fetch/CustomerUsersEdgeEventFetcher.java

@ -0,0 +1,36 @@
/**
* Copyright © 2016-2021 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.service.edge.rpc.fetch;
import lombok.AllArgsConstructor;
import org.thingsboard.server.common.data.User;
import org.thingsboard.server.common.data.id.CustomerId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.page.PageData;
import org.thingsboard.server.common.data.page.PageLink;
import org.thingsboard.server.dao.user.UserService;
@AllArgsConstructor
public class CustomerUsersEdgeEventFetcher extends BaseUsersEdgeEventFetcher {
private final UserService userService;
private final CustomerId customerId;
@Override
protected PageData<User> findUsers(TenantId tenantId, PageLink pageLink) {
return userService.findCustomerUsers(tenantId, customerId, pageLink);
}
}

58
application/src/main/java/org/thingsboard/server/service/edge/rpc/fetch/DashboardsEdgeEventFetcher.java

@ -0,0 +1,58 @@
/**
* Copyright © 2016-2021 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.service.edge.rpc.fetch;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.thingsboard.server.common.data.DashboardInfo;
import org.thingsboard.server.common.data.edge.EdgeEvent;
import org.thingsboard.server.common.data.edge.EdgeEventActionType;
import org.thingsboard.server.common.data.edge.EdgeEventType;
import org.thingsboard.server.common.data.id.EdgeId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.page.PageData;
import org.thingsboard.server.common.data.page.PageLink;
import org.thingsboard.server.dao.dashboard.DashboardService;
import org.thingsboard.server.service.edge.rpc.EdgeEventUtils;
import java.util.ArrayList;
import java.util.List;
@AllArgsConstructor
@Slf4j
public class DashboardsEdgeEventFetcher implements EdgeEventFetcher {
private final DashboardService dashboardService;
@Override
public PageLink getPageLink() {
return new PageLink(DEFAULT_LIMIT);
}
@Override
public PageData<EdgeEvent> fetchEdgeEvents(TenantId tenantId, EdgeId edgeId, PageLink pageLink) {
log.trace("[{}] start fetching edge events [{}]", tenantId, edgeId);
PageData<DashboardInfo> pageData = dashboardService.findDashboardsByTenantIdAndEdgeId(tenantId, edgeId, pageLink);
List<EdgeEvent> result = new ArrayList<>();
if (!pageData.getData().isEmpty()) {
for (DashboardInfo dashboardInfo : pageData.getData()) {
result.add(EdgeEventUtils.constructEdgeEvent(tenantId, edgeId, EdgeEventType.DASHBOARD,
EdgeEventActionType.ADDED, dashboardInfo.getId(), null));
}
}
return new PageData<>(result, pageData.getTotalPages(), pageData.getTotalElements(), pageData.hasNext());
}
}

58
application/src/main/java/org/thingsboard/server/service/edge/rpc/fetch/DeviceProfilesEdgeEventFetcher.java

@ -0,0 +1,58 @@
/**
* Copyright © 2016-2021 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.service.edge.rpc.fetch;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.thingsboard.server.common.data.DeviceProfile;
import org.thingsboard.server.common.data.edge.EdgeEvent;
import org.thingsboard.server.common.data.edge.EdgeEventActionType;
import org.thingsboard.server.common.data.edge.EdgeEventType;
import org.thingsboard.server.common.data.id.EdgeId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.page.PageData;
import org.thingsboard.server.common.data.page.PageLink;
import org.thingsboard.server.dao.device.DeviceProfileService;
import org.thingsboard.server.service.edge.rpc.EdgeEventUtils;
import java.util.ArrayList;
import java.util.List;
@AllArgsConstructor
@Slf4j
public class DeviceProfilesEdgeEventFetcher implements EdgeEventFetcher {
private final DeviceProfileService deviceProfileService;
@Override
public PageLink getPageLink() {
return new PageLink(DEFAULT_LIMIT);
}
@Override
public PageData<EdgeEvent> fetchEdgeEvents(TenantId tenantId, EdgeId edgeId, PageLink pageLink) {
log.trace("[{}] start fetching edge events [{}]", tenantId, edgeId);
PageData<DeviceProfile> pageData = deviceProfileService.findDeviceProfiles(tenantId, pageLink);
List<EdgeEvent> result = new ArrayList<>();
if (!pageData.getData().isEmpty()) {
for (DeviceProfile deviceProfile : pageData.getData()) {
result.add(EdgeEventUtils.constructEdgeEvent(tenantId, edgeId, EdgeEventType.DEVICE_PROFILE,
EdgeEventActionType.ADDED, deviceProfile.getId(), null));
}
}
return new PageData<>(result, pageData.getTotalPages(), pageData.getTotalElements(), pageData.hasNext());
}
}

31
application/src/main/java/org/thingsboard/server/service/edge/rpc/fetch/EdgeEventFetcher.java

@ -0,0 +1,31 @@
/**
* Copyright © 2016-2021 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.service.edge.rpc.fetch;
import org.thingsboard.server.common.data.edge.EdgeEvent;
import org.thingsboard.server.common.data.id.EdgeId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.page.PageData;
import org.thingsboard.server.common.data.page.PageLink;
public interface EdgeEventFetcher {
final int DEFAULT_LIMIT = 100;
PageLink getPageLink();
PageData<EdgeEvent> fetchEdgeEvents(TenantId tenantId, EdgeId edgeId, PageLink pageLink);
}

50
application/src/main/java/org/thingsboard/server/service/edge/rpc/fetch/GeneralEdgeEventFetcher.java

@ -0,0 +1,50 @@
/**
* Copyright © 2016-2021 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.service.edge.rpc.fetch;
import lombok.AllArgsConstructor;
import org.thingsboard.server.common.data.edge.EdgeEvent;
import org.thingsboard.server.common.data.id.EdgeId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.page.PageData;
import org.thingsboard.server.common.data.page.PageLink;
import org.thingsboard.server.common.data.page.SortOrder;
import org.thingsboard.server.common.data.page.TimePageLink;
import org.thingsboard.server.dao.edge.EdgeEventService;
@AllArgsConstructor
public class GeneralEdgeEventFetcher implements EdgeEventFetcher {
private final int maxReadRecordsCount;
private final Long queueStartTs;
private final EdgeEventService edgeEventService;
@Override
public PageLink getPageLink() {
return new TimePageLink(
maxReadRecordsCount,
0,
null,
new SortOrder("createdTime", SortOrder.Direction.ASC),
queueStartTs,
null);
}
@Override
public PageData<EdgeEvent> fetchEdgeEvents(TenantId tenantId, EdgeId edgeId, PageLink pageLink) {
return edgeEventService.findEdgeEvents(tenantId, edgeId, (TimePageLink) pageLink, true);
}
}

58
application/src/main/java/org/thingsboard/server/service/edge/rpc/fetch/RuleChainsEdgeEventFetcher.java

@ -0,0 +1,58 @@
/**
* Copyright © 2016-2021 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.service.edge.rpc.fetch;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.thingsboard.server.common.data.edge.EdgeEvent;
import org.thingsboard.server.common.data.edge.EdgeEventActionType;
import org.thingsboard.server.common.data.edge.EdgeEventType;
import org.thingsboard.server.common.data.id.EdgeId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.page.PageData;
import org.thingsboard.server.common.data.page.PageLink;
import org.thingsboard.server.common.data.rule.RuleChain;
import org.thingsboard.server.dao.rule.RuleChainService;
import org.thingsboard.server.service.edge.rpc.EdgeEventUtils;
import java.util.ArrayList;
import java.util.List;
@Slf4j
@AllArgsConstructor
public class RuleChainsEdgeEventFetcher implements EdgeEventFetcher {
private final RuleChainService ruleChainService;
@Override
public PageLink getPageLink() {
return new PageLink(DEFAULT_LIMIT);
}
@Override
public PageData<EdgeEvent> fetchEdgeEvents(TenantId tenantId, EdgeId edgeId, PageLink pageLink) {
log.trace("[{}] start fetching edge events [{}]", tenantId, edgeId);
PageData<RuleChain> pageData = ruleChainService.findRuleChainsByTenantIdAndEdgeId(tenantId, edgeId, pageLink);
List<EdgeEvent> result = new ArrayList<>();
if (!pageData.getData().isEmpty()) {
for (RuleChain ruleChain : pageData.getData()) {
result.add(EdgeEventUtils.constructEdgeEvent(tenantId, edgeId, EdgeEventType.RULE_CHAIN,
EdgeEventActionType.ADDED, ruleChain.getId(), null));
}
}
return new PageData<>(result, pageData.getTotalPages(), pageData.getTotalElements(), pageData.hasNext());
}
}

36
application/src/main/java/org/thingsboard/server/service/edge/rpc/fetch/SystemWidgetsBundlesEdgeEventFetcher.java

@ -0,0 +1,36 @@
/**
* Copyright © 2016-2021 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.service.edge.rpc.fetch;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.page.PageData;
import org.thingsboard.server.common.data.page.PageLink;
import org.thingsboard.server.common.data.widget.WidgetsBundle;
import org.thingsboard.server.dao.widget.WidgetsBundleService;
@Slf4j
@AllArgsConstructor
public class SystemWidgetsBundlesEdgeEventFetcher extends BaseWidgetsBundlesEdgeEventFetcher {
private final WidgetsBundleService widgetsBundleService;
@Override
protected PageData<WidgetsBundle> findWidgetsBundles(TenantId tenantId, PageLink pageLink) {
return widgetsBundleService.findSystemWidgetsBundlesByPageLink(tenantId, pageLink);
}
}

34
application/src/main/java/org/thingsboard/server/service/edge/rpc/fetch/TenantAdminUsersEdgeEventFetcher.java

@ -0,0 +1,34 @@
/**
* Copyright © 2016-2021 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.service.edge.rpc.fetch;
import lombok.AllArgsConstructor;
import org.thingsboard.server.common.data.User;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.page.PageData;
import org.thingsboard.server.common.data.page.PageLink;
import org.thingsboard.server.dao.user.UserService;
@AllArgsConstructor
public class TenantAdminUsersEdgeEventFetcher extends BaseUsersEdgeEventFetcher {
private final UserService userService;
@Override
protected PageData<User> findUsers(TenantId tenantId, PageLink pageLink) {
return userService.findTenantAdmins(tenantId, pageLink);
}
}

36
application/src/main/java/org/thingsboard/server/service/edge/rpc/fetch/TenantWidgetsBundlesEdgeEventFetcher.java

@ -0,0 +1,36 @@
/**
* Copyright © 2016-2021 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.service.edge.rpc.fetch;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.page.PageData;
import org.thingsboard.server.common.data.page.PageLink;
import org.thingsboard.server.common.data.widget.WidgetsBundle;
import org.thingsboard.server.dao.widget.WidgetsBundleService;
@Slf4j
@AllArgsConstructor
public class TenantWidgetsBundlesEdgeEventFetcher extends BaseWidgetsBundlesEdgeEventFetcher implements EdgeEventFetcher {
private final WidgetsBundleService widgetsBundleService;
@Override
protected PageData<WidgetsBundle> findWidgetsBundles(TenantId tenantId, PageLink pageLink) {
return widgetsBundleService.findAllTenantWidgetsBundlesByTenantIdAndPageLink(tenantId, pageLink);
}
}

503
application/src/main/java/org/thingsboard/server/service/edge/rpc/init/DefaultSyncEdgeService.java → application/src/main/java/org/thingsboard/server/service/edge/rpc/sync/DefaultEdgeRequestsService.java

@ -13,9 +13,8 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.service.edge.rpc.init;
package org.thingsboard.server.service.edge.rpc.sync;
import com.datastax.oss.driver.api.core.uuid.Uuids;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
@ -24,34 +23,27 @@ import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.text.WordUtils;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.core.io.DefaultResourceLoader;
import org.springframework.stereotype.Service;
import org.thingsboard.server.common.data.AdminSettings;
import org.thingsboard.server.common.data.DashboardInfo;
import org.thingsboard.server.common.data.Device;
import org.thingsboard.server.common.data.DeviceProfile;
import org.thingsboard.server.common.data.EdgeUtils;
import org.thingsboard.server.common.data.EntityType;
import org.thingsboard.server.common.data.EntityView;
import org.thingsboard.server.common.data.User;
import org.thingsboard.server.common.data.asset.Asset;
import org.thingsboard.server.common.data.edge.Edge;
import org.thingsboard.server.common.data.edge.EdgeEvent;
import org.thingsboard.server.common.data.edge.EdgeEventActionType;
import org.thingsboard.server.common.data.edge.EdgeEventType;
import org.thingsboard.server.common.data.id.AdminSettingsId;
import org.thingsboard.server.common.data.id.DeviceId;
import org.thingsboard.server.common.data.id.DeviceProfileId;
import org.thingsboard.server.common.data.id.EdgeId;
import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.id.EntityIdFactory;
import org.thingsboard.server.common.data.id.RuleChainId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.id.UserId;
import org.thingsboard.server.common.data.id.WidgetsBundleId;
import org.thingsboard.server.common.data.kv.AttributeKvEntry;
import org.thingsboard.server.common.data.kv.DataType;
import org.thingsboard.server.common.data.page.PageData;
@ -59,44 +51,39 @@ import org.thingsboard.server.common.data.page.PageLink;
import org.thingsboard.server.common.data.relation.EntityRelation;
import org.thingsboard.server.common.data.relation.EntityRelationsQuery;
import org.thingsboard.server.common.data.relation.EntitySearchDirection;
import org.thingsboard.server.common.data.relation.RelationTypeGroup;
import org.thingsboard.server.common.data.relation.RelationsSearchParameters;
import org.thingsboard.server.common.data.rule.RuleChain;
import org.thingsboard.server.common.data.widget.WidgetType;
import org.thingsboard.server.common.data.widget.WidgetsBundle;
import org.thingsboard.server.dao.asset.AssetService;
import org.thingsboard.server.dao.attributes.AttributesService;
import org.thingsboard.server.dao.dashboard.DashboardService;
import org.thingsboard.server.dao.device.DeviceProfileService;
import org.thingsboard.server.dao.device.DeviceService;
import org.thingsboard.server.dao.edge.EdgeEventService;
import org.thingsboard.server.dao.entityview.EntityViewService;
import org.thingsboard.server.dao.relation.RelationService;
import org.thingsboard.server.dao.rule.RuleChainService;
import org.thingsboard.server.dao.settings.AdminSettingsService;
import org.thingsboard.server.dao.user.UserService;
import org.thingsboard.server.dao.widget.WidgetTypeService;
import org.thingsboard.server.dao.widget.WidgetsBundleService;
import org.thingsboard.server.gen.edge.AttributesRequestMsg;
import org.thingsboard.server.gen.edge.DeviceCredentialsRequestMsg;
import org.thingsboard.server.gen.edge.DeviceProfileDevicesRequestMsg;
import org.thingsboard.server.gen.edge.EntityViewsRequestMsg;
import org.thingsboard.server.gen.edge.RelationRequestMsg;
import org.thingsboard.server.gen.edge.RuleChainMetadataRequestMsg;
import org.thingsboard.server.gen.edge.UserCredentialsRequestMsg;
import org.thingsboard.server.gen.edge.WidgetBundleTypesRequestMsg;
import org.thingsboard.server.service.edge.rpc.EdgeEventUtils;
import org.thingsboard.server.service.executors.DbCallbackExecutorService;
import org.thingsboard.server.service.queue.TbClusterService;
import java.io.File;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
@Service
@Slf4j
public class DefaultSyncEdgeService implements SyncEdgeService {
public class DefaultEdgeRequestsService implements EdgeRequestsService {
private static final ObjectMapper mapper = new ObjectMapper();
@ -108,29 +95,17 @@ public class DefaultSyncEdgeService implements SyncEdgeService {
@Autowired
private AttributesService attributesService;
@Autowired
private RuleChainService ruleChainService;
@Autowired
private RelationService relationService;
@Autowired
private DeviceService deviceService;
@Autowired
private DeviceProfileService deviceProfileService;
@Autowired
private AssetService assetService;
@Autowired
private EntityViewService entityViewService;
@Autowired
private DashboardService dashboardService;
@Autowired
private UserService userService;
private DeviceProfileService deviceProfileService;
@Autowired
private WidgetsBundleService widgetsBundleService;
@ -138,309 +113,12 @@ public class DefaultSyncEdgeService implements SyncEdgeService {
@Autowired
private WidgetTypeService widgetTypeService;
@Autowired
private AdminSettingsService adminSettingsService;
@Autowired
private DbCallbackExecutorService dbCallbackExecutorService;
@Autowired
private TbClusterService tbClusterService;
@Override
public void sync(TenantId tenantId, Edge edge) {
log.trace("[{}][{}] Staring edge sync process", tenantId, edge.getId());
try {
syncWidgetsBundles(tenantId, edge);
// TODO: voba - implement this functionality
// syncAdminSettings(edge);
syncDeviceProfiles(tenantId, edge);
syncRuleChains(tenantId, edge);
syncUsers(tenantId, edge);
syncAssets(tenantId, edge);
syncEntityViews(tenantId, edge);
syncDashboards(tenantId, edge);
syncWidgetsTypes(tenantId, edge);
syncDevices(tenantId, edge);
} catch (Exception e) {
log.error("[{}][{}] Exception during sync process", tenantId, edge.getId(), e);
}
}
private void syncRuleChains(TenantId tenantId, Edge edge) {
log.trace("[{}] syncRuleChains [{}]", tenantId, edge.getName());
try {
PageLink pageLink = new PageLink(DEFAULT_LIMIT);
PageData<RuleChain> pageData;
do {
pageData = ruleChainService.findRuleChainsByTenantIdAndEdgeId(tenantId, edge.getId(), pageLink);
if (pageData != null && pageData.getData() != null && !pageData.getData().isEmpty()) {
log.trace("[{}] [{}] rule chains(s) are going to be pushed to edge.", edge.getId(), pageData.getData().size());
for (RuleChain ruleChain : pageData.getData()) {
saveEdgeEvent(tenantId, edge.getId(), EdgeEventType.RULE_CHAIN, EdgeEventActionType.ADDED, ruleChain.getId(), null);
}
if (pageData.hasNext()) {
pageLink = pageLink.nextPageLink();
}
}
} while (pageData != null && pageData.hasNext());
} catch (Exception e) {
log.error("Exception during loading edge rule chain(s) on sync!", e);
}
}
private void syncDevices(TenantId tenantId, Edge edge) {
log.trace("[{}] syncDevices [{}]", tenantId, edge.getName());
try {
PageLink pageLink = new PageLink(DEFAULT_LIMIT);
PageData<Device> pageData;
do {
pageData = deviceService.findDevicesByTenantIdAndEdgeId(tenantId, edge.getId(), pageLink);
if (pageData != null && pageData.getData() != null && !pageData.getData().isEmpty()) {
log.trace("[{}] [{}] device(s) are going to be pushed to edge.", edge.getId(), pageData.getData().size());
for (Device device : pageData.getData()) {
saveEdgeEvent(tenantId, edge.getId(), EdgeEventType.DEVICE, EdgeEventActionType.ADDED, device.getId(), null);
}
if (pageData.hasNext()) {
pageLink = pageLink.nextPageLink();
}
}
} while (pageData != null && pageData.hasNext());
} catch (Exception e) {
log.error("Exception during loading edge device(s) on sync!", e);
}
}
private void syncDeviceProfiles(TenantId tenantId, Edge edge) {
log.trace("[{}] syncDeviceProfiles [{}]", tenantId, edge.getName());
try {
PageLink pageLink = new PageLink(DEFAULT_LIMIT);
PageData<DeviceProfile> pageData;
do {
pageData = deviceProfileService.findDeviceProfiles(tenantId, pageLink);
if (pageData != null && pageData.getData() != null && !pageData.getData().isEmpty()) {
log.trace("[{}] [{}] user(s) are going to be pushed to edge.", edge.getId(), pageData.getData().size());
for (DeviceProfile deviceProfile : pageData.getData()) {
saveEdgeEvent(tenantId, edge.getId(), EdgeEventType.DEVICE_PROFILE, EdgeEventActionType.ADDED, deviceProfile.getId(), null);
}
if (pageData.hasNext()) {
pageLink = pageLink.nextPageLink();
}
}
} while (pageData != null && pageData.hasNext());
} catch (Exception e) {
log.error("Exception during loading device profile(s) on sync!", e);
}
}
private void syncAssets(TenantId tenantId, Edge edge) {
log.trace("[{}] syncAssets [{}]", tenantId, edge.getName());
try {
PageLink pageLink = new PageLink(DEFAULT_LIMIT);
PageData<Asset> pageData;
do {
pageData = assetService.findAssetsByTenantIdAndEdgeId(tenantId, edge.getId(), pageLink);
if (pageData != null && pageData.getData() != null && !pageData.getData().isEmpty()) {
log.trace("[{}] [{}] asset(s) are going to be pushed to edge.", edge.getId(), pageData.getData().size());
for (Asset asset : pageData.getData()) {
saveEdgeEvent(tenantId, edge.getId(), EdgeEventType.ASSET, EdgeEventActionType.ADDED, asset.getId(), null);
}
if (pageData.hasNext()) {
pageLink = pageLink.nextPageLink();
}
}
} while (pageData != null && pageData.hasNext());
} catch (Exception e) {
log.error("Exception during loading edge asset(s) on sync!", e);
}
}
private void syncEntityViews(TenantId tenantId, Edge edge) {
log.trace("[{}] syncEntityViews [{}]", tenantId, edge.getName());
try {
PageLink pageLink = new PageLink(DEFAULT_LIMIT);
PageData<EntityView> pageData;
do {
pageData = entityViewService.findEntityViewsByTenantIdAndEdgeId(tenantId, edge.getId(), pageLink);
if (pageData != null && pageData.getData() != null && !pageData.getData().isEmpty()) {
log.trace("[{}] [{}] entity view(s) are going to be pushed to edge.", edge.getId(), pageData.getData().size());
for (EntityView entityView : pageData.getData()) {
saveEdgeEvent(tenantId, edge.getId(), EdgeEventType.ENTITY_VIEW, EdgeEventActionType.ADDED, entityView.getId(), null);
}
if (pageData.hasNext()) {
pageLink = pageLink.nextPageLink();
}
}
} while (pageData != null && pageData.hasNext());
} catch (Exception e) {
log.error("Exception during loading edge entity view(s) on sync!", e);
}
}
private void syncDashboards(TenantId tenantId, Edge edge) {
log.trace("[{}] syncDashboards [{}]", tenantId, edge.getName());
try {
PageLink pageLink = new PageLink(DEFAULT_LIMIT);
PageData<DashboardInfo> pageData;
do {
pageData = dashboardService.findDashboardsByTenantIdAndEdgeId(tenantId, edge.getId(), pageLink);
if (pageData != null && pageData.getData() != null && !pageData.getData().isEmpty()) {
log.trace("[{}] [{}] dashboard(s) are going to be pushed to edge.", edge.getId(), pageData.getData().size());
for (DashboardInfo dashboardInfo : pageData.getData()) {
saveEdgeEvent(tenantId, edge.getId(), EdgeEventType.DASHBOARD, EdgeEventActionType.ADDED, dashboardInfo.getId(), null);
}
if (pageData.hasNext()) {
pageLink = pageLink.nextPageLink();
}
}
} while (pageData != null && pageData.hasNext());
} catch (Exception e) {
log.error("Exception during loading edge dashboard(s) on sync!", e);
}
}
private void syncUsers(TenantId tenantId, Edge edge) {
log.trace("[{}] syncUsers [{}]", tenantId, edge.getName());
try {
PageLink pageLink = new PageLink(DEFAULT_LIMIT);
PageData<User> pageData;
do {
pageData = userService.findTenantAdmins(tenantId, pageLink);
pushUsersToEdge(tenantId, pageData, edge);
if (pageData.hasNext()) {
pageLink = pageLink.nextPageLink();
}
} while (pageData.hasNext());
syncCustomerUsers(tenantId, edge);
} catch (Exception e) {
log.error("Exception during loading edge user(s) on sync!", e);
}
}
private void syncCustomerUsers(TenantId tenantId, Edge edge) {
if (edge.getCustomerId() != null && !EntityId.NULL_UUID.equals(edge.getCustomerId().getId())) {
saveEdgeEvent(tenantId, edge.getId(), EdgeEventType.CUSTOMER, EdgeEventActionType.ADDED, edge.getCustomerId(), null);
PageLink pageLink = new PageLink(DEFAULT_LIMIT);
PageData<User> pageData;
do {
pageData = userService.findCustomerUsers(tenantId, edge.getCustomerId(), pageLink);
pushUsersToEdge(tenantId, pageData, edge);
if (pageData != null && pageData.hasNext()) {
pageLink = pageLink.nextPageLink();
}
} while (pageData != null && pageData.hasNext());
}
}
private void pushUsersToEdge(TenantId tenantId, PageData<User> pageData, Edge edge) {
if (pageData != null && pageData.getData() != null && !pageData.getData().isEmpty()) {
log.trace("[{}] [{}] user(s) are going to be pushed to edge.", edge.getId(), pageData.getData().size());
for (User user : pageData.getData()) {
saveEdgeEvent(tenantId, edge.getId(), EdgeEventType.USER, EdgeEventActionType.ADDED, user.getId(), null);
}
}
}
private void syncWidgetsBundles(TenantId tenantId, Edge edge) {
log.trace("[{}] syncWidgetsBundles [{}]", tenantId, edge.getName());
List<WidgetsBundle> widgetsBundlesToPush = new ArrayList<>();
widgetsBundlesToPush.addAll(widgetsBundleService.findAllTenantWidgetsBundlesByTenantId(tenantId));
widgetsBundlesToPush.addAll(widgetsBundleService.findSystemWidgetsBundles(tenantId));
try {
for (WidgetsBundle widgetsBundle : widgetsBundlesToPush) {
saveEdgeEvent(tenantId, edge.getId(), EdgeEventType.WIDGETS_BUNDLE, EdgeEventActionType.ADDED, widgetsBundle.getId(), null);
}
} catch (Exception e) {
log.error("Exception during loading widgets bundle(s) on sync!", e);
}
}
private void syncWidgetsTypes(TenantId tenantId, Edge edge) {
log.trace("[{}] syncWidgetsTypes [{}]", tenantId, edge.getName());
List<WidgetsBundle> widgetsBundlesToPush = new ArrayList<>();
widgetsBundlesToPush.addAll(widgetsBundleService.findAllTenantWidgetsBundlesByTenantId(tenantId));
widgetsBundlesToPush.addAll(widgetsBundleService.findSystemWidgetsBundles(tenantId));
try {
for (WidgetsBundle widgetsBundle : widgetsBundlesToPush) {
List<WidgetType> widgetTypesToPush =
widgetTypeService.findWidgetTypesByTenantIdAndBundleAlias(widgetsBundle.getTenantId(), widgetsBundle.getAlias());
for (WidgetType widgetType : widgetTypesToPush) {
saveEdgeEvent(tenantId, edge.getId(), EdgeEventType.WIDGET_TYPE, EdgeEventActionType.ADDED, widgetType.getId(), null);
}
}
} catch (Exception e) {
log.error("Exception during loading widgets type(s) on sync!", e);
}
}
private void syncAdminSettings(TenantId tenantId, Edge edge) {
log.trace("[{}] syncAdminSettings [{}]", tenantId, edge.getName());
try {
AdminSettings systemMailSettings = adminSettingsService.findAdminSettingsByKey(TenantId.SYS_TENANT_ID, "mail");
saveEdgeEvent(tenantId, edge.getId(), EdgeEventType.ADMIN_SETTINGS, EdgeEventActionType.UPDATED, null, mapper.valueToTree(systemMailSettings));
AdminSettings tenantMailSettings = convertToTenantAdminSettings(systemMailSettings.getKey(), (ObjectNode) systemMailSettings.getJsonValue());
saveEdgeEvent(tenantId, edge.getId(), EdgeEventType.ADMIN_SETTINGS, EdgeEventActionType.UPDATED, null, mapper.valueToTree(tenantMailSettings));
AdminSettings systemMailTemplates = loadMailTemplates();
saveEdgeEvent(tenantId, edge.getId(), EdgeEventType.ADMIN_SETTINGS, EdgeEventActionType.UPDATED, null, mapper.valueToTree(systemMailTemplates));
AdminSettings tenantMailTemplates = convertToTenantAdminSettings(systemMailTemplates.getKey(), (ObjectNode) systemMailTemplates.getJsonValue());
saveEdgeEvent(tenantId, edge.getId(), EdgeEventType.ADMIN_SETTINGS, EdgeEventActionType.UPDATED, null, mapper.valueToTree(tenantMailTemplates));
} catch (Exception e) {
log.error("Can't load admin settings", e);
}
}
private AdminSettings loadMailTemplates() throws Exception {
Map<String, Object> mailTemplates = new HashMap<>();
Pattern startPattern = Pattern.compile("<div class=\"content\".*?>");
Pattern endPattern = Pattern.compile("<div class=\"footer\".*?>");
File[] files = new DefaultResourceLoader().getResource("classpath:/templates/").getFile().listFiles();
for (File file : files) {
Map<String, String> mailTemplate = new HashMap<>();
String name = validateName(file.getName());
String stringTemplate = FileUtils.readFileToString(file, StandardCharsets.UTF_8);
Matcher start = startPattern.matcher(stringTemplate);
Matcher end = endPattern.matcher(stringTemplate);
if (start.find() && end.find()) {
String body = StringUtils.substringBetween(stringTemplate, start.group(), end.group()).replaceAll("\t", "");
String subject = StringUtils.substringBetween(body, "<h2>", "</h2>");
mailTemplate.put("subject", subject);
mailTemplate.put("body", body);
mailTemplates.put(name, mailTemplate);
} else {
log.error("Can't load mail template from file {}", file.getName());
}
}
AdminSettings adminSettings = new AdminSettings();
adminSettings.setId(new AdminSettingsId(Uuids.timeBased()));
adminSettings.setKey("mailTemplates");
adminSettings.setJsonValue(mapper.convertValue(mailTemplates, JsonNode.class));
return adminSettings;
}
private String validateName(String name) throws Exception {
StringBuilder nameBuilder = new StringBuilder();
name = name.replace(".vm", "");
String[] nameParts = name.split("\\.");
if (nameParts.length >= 1) {
nameBuilder.append(nameParts[0]);
for (int i = 1; i < nameParts.length; i++) {
String word = WordUtils.capitalize(nameParts[i]);
nameBuilder.append(word);
}
return nameBuilder.toString();
} else {
throw new Exception("Error during filename validation");
}
}
private AdminSettings convertToTenantAdminSettings(String key, ObjectNode jsonValue) {
AdminSettings tenantMailSettings = new AdminSettings();
jsonValue.put("useSystemMailSettings", true);
tenantMailSettings.setJsonValue(jsonValue);
tenantMailSettings.setKey(key);
return tenantMailSettings;
}
@Override
public ListenableFuture<Void> processRuleChainMetadataRequestMsg(TenantId tenantId, Edge edge, RuleChainMetadataRequestMsg ruleChainMetadataRequestMsg) {
log.trace("[{}] processRuleChainMetadataRequestMsg [{}][{}]", tenantId, edge.getName(), ruleChainMetadataRequestMsg);
@ -448,7 +126,8 @@ public class DefaultSyncEdgeService implements SyncEdgeService {
if (ruleChainMetadataRequestMsg.getRuleChainIdMSB() != 0 && ruleChainMetadataRequestMsg.getRuleChainIdLSB() != 0) {
RuleChainId ruleChainId =
new RuleChainId(new UUID(ruleChainMetadataRequestMsg.getRuleChainIdMSB(), ruleChainMetadataRequestMsg.getRuleChainIdLSB()));
ListenableFuture<EdgeEvent> future = saveEdgeEvent(tenantId, edge.getId(), EdgeEventType.RULE_CHAIN_METADATA, EdgeEventActionType.ADDED, ruleChainId, null);
ListenableFuture<EdgeEvent> future = saveEdgeEvent(tenantId, edge.getId(),
EdgeEventType.RULE_CHAIN_METADATA, EdgeEventActionType.ADDED, ruleChainId, null);
Futures.addCallback(future, new FutureCallback<EdgeEvent>() {
@Override
public void onSuccess(@Nullable EdgeEvent result) {
@ -584,7 +263,8 @@ public class DefaultSyncEdgeService implements SyncEdgeService {
return futureToSet;
}
private ListenableFuture<List<EntityRelation>> findRelationByQuery(TenantId tenantId, Edge edge, EntityId entityId, EntitySearchDirection direction) {
private ListenableFuture<List<EntityRelation>> findRelationByQuery(TenantId tenantId, Edge edge,
EntityId entityId, EntitySearchDirection direction) {
EntityRelationsQuery query = new EntityRelationsQuery();
query.setParameters(new RelationsSearchParameters(entityId, direction, -1, false));
return relationService.findByQuery(tenantId, query);
@ -596,7 +276,8 @@ public class DefaultSyncEdgeService implements SyncEdgeService {
SettableFuture<Void> futureToSet = SettableFuture.create();
if (deviceCredentialsRequestMsg.getDeviceIdMSB() != 0 && deviceCredentialsRequestMsg.getDeviceIdLSB() != 0) {
DeviceId deviceId = new DeviceId(new UUID(deviceCredentialsRequestMsg.getDeviceIdMSB(), deviceCredentialsRequestMsg.getDeviceIdLSB()));
ListenableFuture<EdgeEvent> future = saveEdgeEvent(tenantId, edge.getId(), EdgeEventType.DEVICE, EdgeEventActionType.CREDENTIALS_UPDATED, deviceId, null);
ListenableFuture<EdgeEvent> future = saveEdgeEvent(tenantId, edge.getId(), EdgeEventType.DEVICE,
EdgeEventActionType.CREDENTIALS_UPDATED, deviceId, null);
Futures.addCallback(future, new FutureCallback<EdgeEvent>() {
@Override
public void onSuccess(@Nullable EdgeEvent result) {
@ -619,8 +300,9 @@ public class DefaultSyncEdgeService implements SyncEdgeService {
SettableFuture<Void> futureToSet = SettableFuture.create();
if (userCredentialsRequestMsg.getUserIdMSB() != 0 && userCredentialsRequestMsg.getUserIdLSB() != 0) {
UserId userId = new UserId(new UUID(userCredentialsRequestMsg.getUserIdMSB(), userCredentialsRequestMsg.getUserIdLSB()));
ListenableFuture<EdgeEvent> future = saveEdgeEvent(tenantId, edge.getId(), EdgeEventType.USER, EdgeEventActionType.CREDENTIALS_UPDATED, userId, null);
Futures.addCallback(future, new FutureCallback<EdgeEvent>() {
ListenableFuture<EdgeEvent> future = saveEdgeEvent(tenantId, edge.getId(), EdgeEventType.USER,
EdgeEventActionType.CREDENTIALS_UPDATED, userId, null);
Futures.addCallback(future, new FutureCallback<>() {
@Override
public void onSuccess(@Nullable EdgeEvent result) {
futureToSet.set(null);
@ -636,6 +318,139 @@ public class DefaultSyncEdgeService implements SyncEdgeService {
return futureToSet;
}
@Override
public ListenableFuture<Void> processDeviceProfileDevicesRequestMsg(TenantId tenantId, Edge edge, DeviceProfileDevicesRequestMsg deviceProfileDevicesRequestMsg) {
log.trace("[{}] processDeviceProfileDevicesRequestMsg [{}][{}]", tenantId, edge.getName(), deviceProfileDevicesRequestMsg);
SettableFuture<Void> futureToSet = SettableFuture.create();
if (deviceProfileDevicesRequestMsg.getDeviceProfileIdMSB() != 0 && deviceProfileDevicesRequestMsg.getDeviceProfileIdLSB() != 0) {
DeviceProfileId deviceProfileId = new DeviceProfileId(new UUID(deviceProfileDevicesRequestMsg.getDeviceProfileIdMSB(), deviceProfileDevicesRequestMsg.getDeviceProfileIdLSB()));
DeviceProfile deviceProfileById = deviceProfileService.findDeviceProfileById(tenantId, deviceProfileId);
List<ListenableFuture<EdgeEvent>> futures;
if (deviceProfileById != null) {
futures = syncDevices(tenantId, edge, deviceProfileById.getName());
} else {
futures = new ArrayList<>();
}
Futures.addCallback(Futures.allAsList(futures), new FutureCallback<>() {
@Override
public void onSuccess(@Nullable List<EdgeEvent> result) {
futureToSet.set(null);
}
@Override
public void onFailure(Throwable t) {
log.error("Can't sync devices by device profile [{}]", deviceProfileDevicesRequestMsg, t);
futureToSet.setException(t);
}
}, dbCallbackExecutorService);
}
return futureToSet;
}
private List<ListenableFuture<EdgeEvent>> syncDevices(TenantId tenantId, Edge edge, String deviceType) {
List<ListenableFuture<EdgeEvent>> futures = new ArrayList<>();
log.trace("[{}] syncDevices [{}][{}]", tenantId, edge.getName(), deviceType);
try {
PageLink pageLink = new PageLink(DEFAULT_LIMIT);
PageData<Device> pageData;
do {
pageData = deviceService.findDevicesByTenantIdAndEdgeIdAndType(tenantId, edge.getId(), deviceType, pageLink);
if (pageData != null && pageData.getData() != null && !pageData.getData().isEmpty()) {
log.trace("[{}] [{}] device(s) are going to be pushed to edge.", edge.getId(), pageData.getData().size());
for (Device device : pageData.getData()) {
futures.add(saveEdgeEvent(tenantId, edge.getId(), EdgeEventType.DEVICE, EdgeEventActionType.ADDED, device.getId(), null));
}
if (pageData.hasNext()) {
pageLink = pageLink.nextPageLink();
}
}
} while (pageData != null && pageData.hasNext());
} catch (Exception e) {
log.error("Exception during loading edge device(s) on sync!", e);
}
return futures;
}
@Override
public ListenableFuture<Void> processWidgetBundleTypesRequestMsg(TenantId tenantId, Edge edge,
WidgetBundleTypesRequestMsg widgetBundleTypesRequestMsg) {
log.trace("[{}] processWidgetBundleTypesRequestMsg [{}][{}]", tenantId, edge.getName(), widgetBundleTypesRequestMsg);
SettableFuture<Void> futureToSet = SettableFuture.create();
if (widgetBundleTypesRequestMsg.getWidgetBundleIdMSB() != 0 && widgetBundleTypesRequestMsg.getWidgetBundleIdLSB() != 0) {
WidgetsBundleId widgetsBundleId = new WidgetsBundleId(new UUID(widgetBundleTypesRequestMsg.getWidgetBundleIdMSB(), widgetBundleTypesRequestMsg.getWidgetBundleIdLSB()));
WidgetsBundle widgetsBundleById = widgetsBundleService.findWidgetsBundleById(tenantId, widgetsBundleId);
List<ListenableFuture<EdgeEvent>> futures = new ArrayList<>();
if (widgetsBundleById != null) {
List<WidgetType> widgetTypesToPush =
widgetTypeService.findWidgetTypesByTenantIdAndBundleAlias(widgetsBundleById.getTenantId(), widgetsBundleById.getAlias());
for (WidgetType widgetType : widgetTypesToPush) {
futures.add(saveEdgeEvent(tenantId, edge.getId(), EdgeEventType.WIDGET_TYPE, EdgeEventActionType.ADDED, widgetType.getId(), null));
}
}
Futures.addCallback(Futures.allAsList(futures), new FutureCallback<>() {
@Override
public void onSuccess(@Nullable List<EdgeEvent> result) {
futureToSet.set(null);
}
@Override
public void onFailure(Throwable t) {
log.error("Can't sync widget types by widget bundle [{}]", widgetBundleTypesRequestMsg, t);
futureToSet.setException(t);
}
}, dbCallbackExecutorService);
}
return futureToSet;
}
@Override
public ListenableFuture<Void> processEntityViewsRequestMsg(TenantId tenantId, Edge edge, EntityViewsRequestMsg entityViewsRequestMsg) {
log.trace("[{}] processEntityViewsRequestMsg [{}][{}]", tenantId, edge.getName(), entityViewsRequestMsg);
EntityId entityId = EntityIdFactory.getByTypeAndUuid(
EntityType.valueOf(entityViewsRequestMsg.getEntityType()),
new UUID(entityViewsRequestMsg.getEntityIdMSB(), entityViewsRequestMsg.getEntityIdLSB()));
SettableFuture<Void> futureToSet = SettableFuture.create();
Futures.addCallback(entityViewService.findEntityViewsByTenantIdAndEntityIdAsync(tenantId, entityId), new FutureCallback<>() {
@Override
public void onSuccess(@Nullable List<EntityView> entityViews) {
try {
if (entityViews != null && !entityViews.isEmpty()) {
for (EntityView entityView : entityViews) {
Futures.addCallback(relationService.checkRelation(tenantId, edge.getId(), entityView.getId(),
EntityRelation.CONTAINS_TYPE, RelationTypeGroup.EDGE), new FutureCallback<>() {
@Override
public void onSuccess(@Nullable Boolean result) {
if (Boolean.TRUE.equals(result)) {
saveEdgeEvent(tenantId, edge.getId(), EdgeEventType.ENTITY_VIEW,
EdgeEventActionType.ADDED, entityView.getId(), null);
}
}
@Override
public void onFailure(Throwable t) {
log.error("Exception during loading relation [{}] to edge on sync!", t, t);
futureToSet.setException(t);
}
}, dbCallbackExecutorService);
}
}
futureToSet.set(null);
} catch (Exception e) {
log.error("Exception during loading relation(s) to edge on sync!", e);
futureToSet.setException(e);
}
}
@Override
public void onFailure(Throwable t) {
log.error("[{}] Can't find entity views by entity id [{}]", tenantId, entityId, t);
futureToSet.setException(t);
}
}, dbCallbackExecutorService);
return futureToSet;
}
private ListenableFuture<EdgeEvent> saveEdgeEvent(TenantId tenantId,
EdgeId edgeId,
EdgeEventType type,
@ -645,17 +460,10 @@ public class DefaultSyncEdgeService implements SyncEdgeService {
log.trace("Pushing edge event to edge queue. tenantId [{}], edgeId [{}], type [{}], action[{}], entityId [{}], body [{}]",
tenantId, edgeId, type, action, entityId, body);
EdgeEvent edgeEvent = new EdgeEvent();
edgeEvent.setTenantId(tenantId);
edgeEvent.setEdgeId(edgeId);
edgeEvent.setType(type);
edgeEvent.setAction(action);
if (entityId != null) {
edgeEvent.setEntityId(entityId.getId());
}
edgeEvent.setBody(body);
EdgeEvent edgeEvent = EdgeEventUtils.constructEdgeEvent(tenantId, edgeId, type, action, entityId, body);
ListenableFuture<EdgeEvent> future = edgeEventService.saveAsync(edgeEvent);
Futures.addCallback(future, new FutureCallback<EdgeEvent>() {
Futures.addCallback(future, new FutureCallback<>() {
@Override
public void onSuccess(@Nullable EdgeEvent result) {
tbClusterService.onEdgeEventUpdate(tenantId, edgeId);
@ -668,4 +476,5 @@ public class DefaultSyncEdgeService implements SyncEdgeService {
}, dbCallbackExecutorService);
return future;
}
}

15
application/src/main/java/org/thingsboard/server/service/edge/rpc/init/SyncEdgeService.java → application/src/main/java/org/thingsboard/server/service/edge/rpc/sync/EdgeRequestsService.java

@ -13,20 +13,21 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.service.edge.rpc.init;
package org.thingsboard.server.service.edge.rpc.sync;
import com.google.common.util.concurrent.ListenableFuture;
import org.thingsboard.server.common.data.edge.Edge;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.gen.edge.AttributesRequestMsg;
import org.thingsboard.server.gen.edge.DeviceCredentialsRequestMsg;
import org.thingsboard.server.gen.edge.DeviceProfileDevicesRequestMsg;
import org.thingsboard.server.gen.edge.EntityViewsRequestMsg;
import org.thingsboard.server.gen.edge.RelationRequestMsg;
import org.thingsboard.server.gen.edge.RuleChainMetadataRequestMsg;
import org.thingsboard.server.gen.edge.UserCredentialsRequestMsg;
import org.thingsboard.server.gen.edge.WidgetBundleTypesRequestMsg;
public interface SyncEdgeService {
void sync(TenantId tenantId, Edge edge);
public interface EdgeRequestsService {
ListenableFuture<Void> processRuleChainMetadataRequestMsg(TenantId tenantId, Edge edge, RuleChainMetadataRequestMsg ruleChainMetadataRequestMsg);
@ -37,4 +38,10 @@ public interface SyncEdgeService {
ListenableFuture<Void> processDeviceCredentialsRequestMsg(TenantId tenantId, Edge edge, DeviceCredentialsRequestMsg deviceCredentialsRequestMsg);
ListenableFuture<Void> processUserCredentialsRequestMsg(TenantId tenantId, Edge edge, UserCredentialsRequestMsg userCredentialsRequestMsg);
ListenableFuture<Void> processDeviceProfileDevicesRequestMsg(TenantId tenantId, Edge edge, DeviceProfileDevicesRequestMsg deviceProfileDevicesRequestMsg);
ListenableFuture<Void> processWidgetBundleTypesRequestMsg(TenantId tenantId, Edge edge, WidgetBundleTypesRequestMsg widgetBundleTypesRequestMsg);
ListenableFuture<Void> processEntityViewsRequestMsg(TenantId tenantId, Edge edge, EntityViewsRequestMsg entityViewsRequestMsg);
}

1
application/src/main/resources/logback.xml

@ -27,6 +27,7 @@
<logger name="org.thingsboard.server" level="INFO" />
<logger name="org.thingsboard.server.transport.snmp" level="TRACE" />
<!-- <logger name="org.thingsboard.server.service.edge.rpc" level="TRACE" />-->
<!-- <logger name="org.thingsboard.server.service.queue" level="TRACE" />-->
<!-- <logger name="org.thingsboard.server.service.transport" level="TRACE" />-->

9
application/src/test/java/org/thingsboard/server/edge/BaseEdgeTest.java

@ -191,7 +191,7 @@ abstract public class BaseEdgeTest extends AbstractControllerTest {
@Test
public void test() throws Exception {
testReceivedInitialData();
int expectedDownlinkSize = 10;
int expectedDownlinkSize = 9;
Assert.assertEquals(expectedDownlinkSize, edgeImitator.getDownlinkMsgs().size());
testDevices();
@ -308,7 +308,8 @@ abstract public class BaseEdgeTest extends AbstractControllerTest {
private void testReceivedInitialData() throws Exception {
log.info("Checking received data");
Assert.assertTrue(edgeImitator.waitForMessages());
boolean condition = edgeImitator.waitForMessages();
Assert.assertTrue(condition);
EdgeConfiguration configuration = edgeImitator.getConfiguration();
Assert.assertNotNull(configuration);
@ -1523,6 +1524,10 @@ abstract public class BaseEdgeTest extends AbstractControllerTest {
private void installation() throws Exception {
edge = doPost("/api/edge", constructEdge("Test Edge", "test"), Edge.class);
Device savedDeviceDefault = saveDevice("Edge Device Default", "Default");
doPost("/api/edge/" + edge.getId().getId().toString()
+ "/device/" + savedDeviceDefault.getId().getId().toString(), Device.class);
DeviceProfile deviceProfile = this.createDeviceProfile(CUSTOM_DEVICE_PROFILE_NAME, null);
extendDeviceProfileData(deviceProfile);
doPost("/api/deviceProfile", deviceProfile, DeviceProfile.class);

2
application/src/test/java/org/thingsboard/server/edge/imitator/EdgeImitator.java

@ -109,7 +109,7 @@ public class EdgeImitator {
this::onDownlink,
this::onClose);
edgeRpcClient.sendSyncRequestMsg();
edgeRpcClient.sendSyncRequestMsg(false);
}
public void disconnect() throws InterruptedException {

7
common/edge-api/src/main/java/org/thingsboard/edge/rpc/EdgeGrpcClient.java

@ -34,6 +34,7 @@ import org.thingsboard.server.gen.edge.EdgeRpcServiceGrpc;
import org.thingsboard.server.gen.edge.RequestMsg;
import org.thingsboard.server.gen.edge.RequestMsgType;
import org.thingsboard.server.gen.edge.ResponseMsg;
import org.thingsboard.server.gen.edge.SyncRequestMsg;
import org.thingsboard.server.gen.edge.UplinkMsg;
import org.thingsboard.server.gen.edge.UplinkResponseMsg;
@ -102,7 +103,7 @@ public class EdgeGrpcClient implements EdgeRpcClient {
Consumer<EdgeConfiguration> onEdgeUpdate,
Consumer<DownlinkMsg> onDownlink,
Consumer<Exception> onError) {
return new StreamObserver<ResponseMsg>() {
return new StreamObserver<>() {
@Override
public void onNext(ResponseMsg responseMsg) {
if (responseMsg.hasConnectResponseMsg()) {
@ -195,11 +196,13 @@ public class EdgeGrpcClient implements EdgeRpcClient {
}
@Override
public void sendSyncRequestMsg() {
public void sendSyncRequestMsg(boolean syncRequired) {
try {
uplinkMsgLock.lock();
SyncRequestMsg syncRequestMsg = SyncRequestMsg.newBuilder().setSyncRequired(syncRequired).build();
this.inputStream.onNext(RequestMsg.newBuilder()
.setMsgType(RequestMsgType.SYNC_REQUEST_RPC_MESSAGE)
.setSyncRequestMsg(syncRequestMsg)
.build());
} finally {
uplinkMsgLock.unlock();

2
common/edge-api/src/main/java/org/thingsboard/edge/rpc/EdgeRpcClient.java

@ -34,7 +34,7 @@ public interface EdgeRpcClient {
void disconnect(boolean onError) throws InterruptedException;
void sendSyncRequestMsg();
void sendSyncRequestMsg(boolean syncRequired);
void sendUplinkMsg(UplinkMsg uplinkMsg);

66
common/edge-api/src/main/proto/edge.proto

@ -38,6 +38,7 @@ message RequestMsg {
ConnectRequestMsg connectRequestMsg = 2;
UplinkMsg uplinkMsg = 3;
DownlinkResponseMsg downlinkResponseMsg = 4;
SyncRequestMsg syncRequestMsg = 5;
}
message ResponseMsg {
@ -74,6 +75,13 @@ message ConnectResponseMsg {
EdgeConfiguration configuration = 3;
}
message SyncRequestMsg {
bool syncRequired = 1;
}
message SyncCompletedMsg {
}
message EdgeConfiguration {
int64 edgeIdMSB = 1;
int64 edgeIdLSB = 2;
@ -358,6 +366,22 @@ message DeviceCredentialsRequestMsg {
int64 deviceIdLSB = 2;
}
message DeviceProfileDevicesRequestMsg {
int64 deviceProfileIdMSB = 1;
int64 deviceProfileIdLSB = 2;
}
message WidgetBundleTypesRequestMsg {
int64 widgetBundleIdMSB = 1;
int64 widgetBundleIdLSB = 2;
}
message EntityViewsRequestMsg {
int64 entityIdMSB = 1;
int64 entityIdLSB = 2;
string entityType = 3;
}
message DeviceRpcCallMsg {
int64 deviceIdMSB = 1;
int64 deviceIdLSB = 2;
@ -402,6 +426,9 @@ message UplinkMsg {
repeated UserCredentialsRequestMsg userCredentialsRequestMsg = 10;
repeated DeviceCredentialsRequestMsg deviceCredentialsRequestMsg = 11;
repeated DeviceRpcCallMsg deviceRpcCallMsg = 12;
repeated DeviceProfileDevicesRequestMsg deviceProfileDevicesRequestMsg = 13;
repeated WidgetBundleTypesRequestMsg widgetBundleTypesRequestMsg = 14;
repeated EntityViewsRequestMsg entityViewsRequestMsg = 15;
}
message UplinkResponseMsg {
@ -416,24 +443,25 @@ message DownlinkResponseMsg {
message DownlinkMsg {
int32 downlinkMsgId = 1;
repeated EntityDataProto entityData = 2;
repeated DeviceCredentialsRequestMsg deviceCredentialsRequestMsg = 3;
repeated DeviceUpdateMsg deviceUpdateMsg = 4;
repeated DeviceProfileUpdateMsg deviceProfileUpdateMsg = 5;
repeated DeviceCredentialsUpdateMsg deviceCredentialsUpdateMsg = 6;
repeated RuleChainUpdateMsg ruleChainUpdateMsg = 7;
repeated RuleChainMetadataUpdateMsg ruleChainMetadataUpdateMsg = 8;
repeated DashboardUpdateMsg dashboardUpdateMsg = 9;
repeated AssetUpdateMsg assetUpdateMsg = 10;
repeated EntityViewUpdateMsg entityViewUpdateMsg = 11;
repeated AlarmUpdateMsg alarmUpdateMsg = 12;
repeated UserUpdateMsg userUpdateMsg = 13;
repeated UserCredentialsUpdateMsg userCredentialsUpdateMsg = 14;
repeated CustomerUpdateMsg customerUpdateMsg = 15;
repeated RelationUpdateMsg relationUpdateMsg = 16;
repeated WidgetsBundleUpdateMsg widgetsBundleUpdateMsg = 17;
repeated WidgetTypeUpdateMsg widgetTypeUpdateMsg = 18;
repeated AdminSettingsUpdateMsg adminSettingsUpdateMsg = 19;
repeated DeviceRpcCallMsg deviceRpcCallMsg = 20;
SyncCompletedMsg syncCompletedMsg = 2;
repeated EntityDataProto entityData = 3;
repeated DeviceCredentialsRequestMsg deviceCredentialsRequestMsg = 4;
repeated DeviceUpdateMsg deviceUpdateMsg = 5;
repeated DeviceProfileUpdateMsg deviceProfileUpdateMsg = 6;
repeated DeviceCredentialsUpdateMsg deviceCredentialsUpdateMsg = 7;
repeated RuleChainUpdateMsg ruleChainUpdateMsg = 8;
repeated RuleChainMetadataUpdateMsg ruleChainMetadataUpdateMsg = 9;
repeated DashboardUpdateMsg dashboardUpdateMsg = 10;
repeated AssetUpdateMsg assetUpdateMsg = 11;
repeated EntityViewUpdateMsg entityViewUpdateMsg = 12;
repeated AlarmUpdateMsg alarmUpdateMsg = 13;
repeated UserUpdateMsg userUpdateMsg = 14;
repeated UserCredentialsUpdateMsg userCredentialsUpdateMsg = 15;
repeated CustomerUpdateMsg customerUpdateMsg = 16;
repeated RelationUpdateMsg relationUpdateMsg = 17;
repeated WidgetsBundleUpdateMsg widgetsBundleUpdateMsg = 18;
repeated WidgetTypeUpdateMsg widgetTypeUpdateMsg = 19;
repeated AdminSettingsUpdateMsg adminSettingsUpdateMsg = 20;
repeated DeviceRpcCallMsg deviceRpcCallMsg = 21;
}

1
dao/src/main/java/org/thingsboard/server/dao/edge/EdgeServiceImpl.java

@ -558,6 +558,7 @@ public class EdgeServiceImpl extends AbstractEntityService implements EdgeServic
if (userById.getCustomerId() == null || userById.getCustomerId().isNullUid()) {
pageData = findEdgesByTenantId(tenantId, pageLink);
} else {
// pageData = findEdgesByTenantIdAndCustomerId(tenantId, userById.getCustomerId(), pageLink);
pageData = findEdgesByTenantIdAndCustomerId(tenantId, new CustomerId(entityId.getId()), pageLink);
}
if (pageData != null && pageData.getData() != null && !pageData.getData().isEmpty()) {

Loading…
Cancel
Save