diff --git a/application/src/main/conf/logback.xml b/application/src/main/conf/logback.xml
index 898128c1ff..6d2c95ee84 100644
--- a/application/src/main/conf/logback.xml
+++ b/application/src/main/conf/logback.xml
@@ -36,6 +36,8 @@
+
+
diff --git a/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java b/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java
index 733cf75e64..28aaa583ab 100644
--- a/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java
+++ b/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java
@@ -80,6 +80,7 @@ import org.thingsboard.server.queue.usagestats.TbApiUsageClient;
import org.thingsboard.server.service.apiusage.TbApiUsageStateService;
import org.thingsboard.server.service.component.ComponentDiscoveryService;
import org.thingsboard.server.service.edge.rpc.EdgeRpcService;
+import org.thingsboard.server.service.entitiy.entityView.TbEntityViewService;
import org.thingsboard.server.service.executors.DbCallbackExecutorService;
import org.thingsboard.server.service.executors.ExternalCallExecutorService;
import org.thingsboard.server.service.executors.SharedEventLoopGroupService;
@@ -220,6 +221,11 @@ public class ActorSystemContext {
@Getter
private EntityViewService entityViewService;
+ @Lazy
+ @Autowired(required = false)
+ @Getter
+ private TbEntityViewService tbEntityViewService;
+
@Autowired
@Getter
private TelemetrySubscriptionService tsSubService;
diff --git a/application/src/main/java/org/thingsboard/server/controller/ControllerConstants.java b/application/src/main/java/org/thingsboard/server/controller/ControllerConstants.java
index c5d3483faf..a425b0968a 100644
--- a/application/src/main/java/org/thingsboard/server/controller/ControllerConstants.java
+++ b/application/src/main/java/org/thingsboard/server/controller/ControllerConstants.java
@@ -137,6 +137,8 @@ public class ControllerConstants {
protected static final String EDGE_ASSIGN_RECEIVE_STEP_DESCRIPTION = "(Edge will receive this instantly, if it's currently connected, or once it's going to be connected to platform). ";
protected static final String ENTITY_VERSION_TEXT_SEARCH_DESCRIPTION = "The case insensitive 'substring' filter based on the entity version name.";
+ protected static final String VERSION_ID_PARAM_DESCRIPTION = "Version id, for example fd82625bdd7d6131cf8027b44ee967012ecaf990. Represents commit hash.";
+ protected static final String BRANCH_PARAM_DESCRIPTION = "The name of the working branch, for example 'master'";
protected static final String MARKDOWN_CODE_BLOCK_START = "```json\n";
protected static final String MARKDOWN_CODE_BLOCK_END = "\n```";
diff --git a/application/src/main/java/org/thingsboard/server/controller/EntitiesVersionControlController.java b/application/src/main/java/org/thingsboard/server/controller/EntitiesVersionControlController.java
index 1bcc304548..2e37bcbe59 100644
--- a/application/src/main/java/org/thingsboard/server/controller/EntitiesVersionControlController.java
+++ b/application/src/main/java/org/thingsboard/server/controller/EntitiesVersionControlController.java
@@ -20,7 +20,6 @@ import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import io.swagger.annotations.ApiOperation;
import io.swagger.annotations.ApiParam;
-import lombok.Data;
import lombok.RequiredArgsConstructor;
import org.springframework.security.access.prepost.PreAuthorize;
import org.springframework.web.bind.annotation.GetMapping;
@@ -39,6 +38,7 @@ import org.thingsboard.server.common.data.id.EntityIdFactory;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.page.PageData;
import org.thingsboard.server.common.data.page.PageLink;
+import org.thingsboard.server.common.data.sync.vc.BranchInfo;
import org.thingsboard.server.common.data.sync.vc.EntityDataDiff;
import org.thingsboard.server.common.data.sync.vc.EntityDataInfo;
import org.thingsboard.server.common.data.sync.vc.EntityVersion;
@@ -56,15 +56,24 @@ import org.thingsboard.server.service.sync.vc.EntitiesVersionControlService;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
+import java.util.stream.Collectors;
+import static org.thingsboard.server.controller.ControllerConstants.BRANCH_PARAM_DESCRIPTION;
+import static org.thingsboard.server.controller.ControllerConstants.ENTITY_ID_PARAM_DESCRIPTION;
+import static org.thingsboard.server.controller.ControllerConstants.ENTITY_TYPE_PARAM_DESCRIPTION;
+import static org.thingsboard.server.controller.ControllerConstants.MARKDOWN_CODE_BLOCK_END;
+import static org.thingsboard.server.controller.ControllerConstants.MARKDOWN_CODE_BLOCK_START;
import static org.thingsboard.server.controller.ControllerConstants.NEW_LINE;
+import static org.thingsboard.server.controller.ControllerConstants.PAGE_DATA_PARAMETERS;
import static org.thingsboard.server.controller.ControllerConstants.PAGE_SIZE_DESCRIPTION;
import static org.thingsboard.server.controller.ControllerConstants.PAGE_NUMBER_DESCRIPTION;
import static org.thingsboard.server.controller.ControllerConstants.ENTITY_VERSION_TEXT_SEARCH_DESCRIPTION;
import static org.thingsboard.server.controller.ControllerConstants.SORT_PROPERTY_DESCRIPTION;
import static org.thingsboard.server.controller.ControllerConstants.SORT_ORDER_DESCRIPTION;
import static org.thingsboard.server.controller.ControllerConstants.SORT_ORDER_ALLOWABLE_VALUES;
+import static org.thingsboard.server.controller.ControllerConstants.TENANT_AUTHORITY_PARAGRAPH;
import static org.thingsboard.server.controller.ControllerConstants.VC_REQUEST_ID_PARAM_DESCRIPTION;
+import static org.thingsboard.server.controller.ControllerConstants.VERSION_ID_PARAM_DESCRIPTION;
@RestController
@TbCoreComponent
@@ -76,9 +85,20 @@ public class EntitiesVersionControlController extends BaseController {
private final EntitiesVersionControlService versionControlService;
- @ApiOperation(value = "", notes = "" +
- "SINGLE_ENTITY:" + NEW_LINE +
- "```\n{\n" +
+ @ApiOperation(value = "Save entities version (saveEntitiesVersion)", notes = "" +
+ "Creates a new version of entities (or a single entity) by request.\n" +
+ "Supported entity types: CUSTOMER, ASSET, RULE_CHAIN, DASHBOARD, DEVICE_PROFILE, DEVICE, ENTITY_VIEW, WIDGETS_BUNDLE." + NEW_LINE +
+ "There are two available types of request: `SINGLE_ENTITY` and `COMPLEX`. " +
+ "Each of them contains version name (`versionName`) and name of a branch (`branch`) to create version (commit) in. " +
+ "If specified branch does not exists in a remote repo, then new empty branch will be created. " +
+ "Request of the `SINGLE_ENTITY` type has id of an entity (`entityId`) and additional configuration (`config`) " +
+ "which has following options: \n" +
+ "- `saveRelations` - whether to add inbound and outbound relations of type COMMON to created entity version;\n" +
+ "- `saveAttributes` - to save attributes of server scope (and also shared scope for devices);\n" +
+ "- `saveCredentials` - when saving a version of a device, to add its credentials to the version." + NEW_LINE +
+ "An example of a `SINGLE_ENTITY` version create request:\n" +
+ MARKDOWN_CODE_BLOCK_START +
+ "{\n" +
" \"type\": \"SINGLE_ENTITY\",\n" +
"\n" +
" \"versionName\": \"Version 1.0\",\n" +
@@ -89,11 +109,25 @@ public class EntitiesVersionControlController extends BaseController {
" \"id\": \"b79448e0-d4f4-11ec-847b-0f432358ab48\"\n" +
" },\n" +
" \"config\": {\n" +
- " \"saveRelations\": true\n" +
+ " \"saveRelations\": true,\n" +
+ " \"saveAttributes\": true,\n" +
+ " \"saveCredentials\": false\n" +
" }\n" +
- "}\n```" + NEW_LINE +
- "COMPLEX:" + NEW_LINE +
- "```\n{\n" +
+ "}" +
+ MARKDOWN_CODE_BLOCK_END + NEW_LINE +
+ "Second request type (`COMPLEX`), additionally to `branch` and `versionName`, contains following properties:\n" +
+ "- `entityTypes` - a structure with entity types to export and configuration for each entity type; " +
+ " this configuration has all the options available for `SINGLE_ENTITY` and additionally has these ones: \n" +
+ " - `allEntities` and `entityIds` - if you want to save the version of all entities of the entity type " +
+ " then set `allEntities` param to true, otherwise set it to false and specify the list of specific entities (`entityIds`);\n" +
+ " - `syncStrategy` - synchronization strategy to use for this entity type: when set to `OVERWRITE` " +
+ " then the list of remote entities of this type will be overwritten by newly added entities. If set to " +
+ " `MERGE` - existing remote entities of this entity type will not be removed, new entities will just " +
+ " be added on top (or existing remote entities will be updated).\n" +
+ "- `syncStrategy` - default synchronization strategy to use when it is not specified for an entity type." + NEW_LINE +
+ "Example for this type of request:\n" +
+ MARKDOWN_CODE_BLOCK_START +
+ "{\n" +
" \"type\": \"COMPLEX\",\n" +
"\n" +
" \"versionName\": \"Devices and profiles: release 2\",\n" +
@@ -104,7 +138,9 @@ public class EntitiesVersionControlController extends BaseController {
" \"DEVICE\": {\n" +
" \"syncStrategy\": null,\n" +
" \"allEntities\": true,\n" +
- " \"saveRelations\": true\n" +
+ " \"saveRelations\": true,\n" +
+ " \"saveAttributes\": true,\n" +
+ " \"saveCredentials\": true\n" +
" },\n" +
" \"DEVICE_PROFILE\": {\n" +
" \"syncStrategy\": \"MERGE\",\n" +
@@ -115,7 +151,11 @@ public class EntitiesVersionControlController extends BaseController {
" \"saveRelations\": true\n" +
" }\n" +
" }\n" +
- "}\n```")
+ "}" +
+ MARKDOWN_CODE_BLOCK_END + NEW_LINE +
+ "Response wil contain generated request UUID, that can be then used to retrieve " +
+ "status of operation via `getVersionCreateRequestStatus`.\n" +
+ TENANT_AUTHORITY_PARAGRAPH)
@PostMapping("/version")
public DeferredResult saveEntitiesVersion(@RequestBody VersionCreateRequest request) throws Exception {
SecurityUser user = getCurrentUser();
@@ -123,7 +163,32 @@ public class EntitiesVersionControlController extends BaseController {
return wrapFuture(versionControlService.saveEntitiesVersion(user, request));
}
- @ApiOperation(value = "", notes = "")
+ @ApiOperation(value = "Get version create request status (getVersionCreateRequestStatus)", notes = "" +
+ "Returns the status of previously made version create request. " + NEW_LINE +
+ "This status contains following properties:\n" +
+ "- `done` - whether request processing is finished;\n" +
+ "- `version` - created version info: timestamp, version id (commit hash), commit name and commit author;\n" +
+ "- `added` - count of items that were created in the remote repo;\n" +
+ "- `modified` - modified items count;\n" +
+ "- `removed` - removed items count;\n" +
+ "- `error` - error message, if an error occurred while handling the request." + NEW_LINE +
+ "An example of successful status:\n" +
+ MARKDOWN_CODE_BLOCK_START +
+ "{\n" +
+ " \"done\": true,\n" +
+ " \"added\": 10,\n" +
+ " \"modified\": 2,\n" +
+ " \"removed\": 5,\n" +
+ " \"version\": {\n" +
+ " \"timestamp\": 1655198528000,\n" +
+ " \"id\":\"8a834dd389ed80e0759ba8ee338b3f1fd160a114\",\n" +
+ " \"name\": \"My devices v2.0\",\n" +
+ " \"author\": \"John Doe\"\n" +
+ " },\n" +
+ " \"error\": null\n" +
+ "}" +
+ MARKDOWN_CODE_BLOCK_END +
+ TENANT_AUTHORITY_PARAGRAPH)
@GetMapping(value = "/version/{requestId}/status")
public VersionCreationResult getVersionCreateRequestStatus(@ApiParam(value = VC_REQUEST_ID_PARAM_DESCRIPTION, required = true)
@PathVariable UUID requestId) throws Exception {
@@ -131,17 +196,49 @@ public class EntitiesVersionControlController extends BaseController {
return versionControlService.getVersionCreateStatus(getCurrentUser(), requestId);
}
- @ApiOperation(value = "", notes = "" +
- "```\n[\n" +
- " {\n" +
- " \"id\": \"c30c8bcaed3f0813649f0dee51a89d04d0a12b28\",\n" +
- " \"name\": \"Device profile 1 version 1.0\"\n" +
- " }\n" +
- "]\n```")
- @GetMapping(value = "/version/{branch}/{entityType}/{externalEntityUuid}", params = {"pageSize", "page"})
- public DeferredResult> listEntityVersions(@PathVariable String branch,
+ @ApiOperation(value = "List entity versions (listEntityVersions)", notes = "" +
+ "Returns list of versions for a specific entity in a concrete branch. \n" +
+ "You need to specify external id of an entity to list versions for. This is `externalId` property of an entity, " +
+ "or otherwise if not set - simply id of this entity. \n" +
+ "If specified branch does not exist - empty page data will be returned. " + NEW_LINE +
+ "Each version info item has timestamp, id, name and author. Version id can then be used to restore the version. " +
+ PAGE_DATA_PARAMETERS + NEW_LINE +
+ "Response example: \n" +
+ MARKDOWN_CODE_BLOCK_START +
+ "{\n" +
+ " \"data\": [\n" +
+ " {\n" +
+ " \"timestamp\": 1655198593000,\n" +
+ " \"id\": \"fd82625bdd7d6131cf8027b44ee967012ecaf990\",\n" +
+ " \"name\": \"Devices and assets - v2.0\",\n" +
+ " \"author\": \"John Doe \"\n" +
+ " },\n" +
+ " {\n" +
+ " \"timestamp\": 1655198528000,\n" +
+ " \"id\": \"682adcffa9c8a2f863af6f00c4850323acbd4219\",\n" +
+ " \"name\": \"Update my device\",\n" +
+ " \"author\": \"John Doe \"\n" +
+ " },\n" +
+ " {\n" +
+ " \"timestamp\": 1655198280000,\n" +
+ " \"id\": \"d2a6087c2b30e18cc55e7cdda345a8d0dfb959a4\",\n" +
+ " \"name\": \"Devices and assets - v1.0\",\n" +
+ " \"author\": \"John Doe \"\n" +
+ " }\n" +
+ " ],\n" +
+ " \"totalPages\": 1,\n" +
+ " \"totalElements\": 3,\n" +
+ " \"hasNext\": false\n" +
+ "}" +
+ MARKDOWN_CODE_BLOCK_END +
+ TENANT_AUTHORITY_PARAGRAPH)
+ @GetMapping(value = "/version/{entityType}/{externalEntityUuid}", params = {"branch", "pageSize", "page"})
+ public DeferredResult> listEntityVersions(@ApiParam(value = ENTITY_TYPE_PARAM_DESCRIPTION, required = true)
@PathVariable EntityType entityType,
+ @ApiParam(value = "A string value representing external entity id. This is `externalId` property of an entity, or otherwise if not set - simply id of this entity.")
@PathVariable UUID externalEntityUuid,
+ @ApiParam(value = BRANCH_PARAM_DESCRIPTION)
+ @RequestParam String branch,
@ApiParam(value = PAGE_SIZE_DESCRIPTION, required = true)
@RequestParam int pageSize,
@ApiParam(value = PAGE_NUMBER_DESCRIPTION, required = true)
@@ -158,16 +255,17 @@ public class EntitiesVersionControlController extends BaseController {
return wrapFuture(versionControlService.listEntityVersions(getTenantId(), branch, externalEntityId, pageLink));
}
- @ApiOperation(value = "", notes = "" +
- "```\n[\n" +
- " {\n" +
- " \"id\": \"c30c8bcaed3f0813649f0dee51a89d04d0a12b28\",\n" +
- " \"name\": \"Device profiles from dev\"\n" +
- " }\n" +
- "]\n```")
- @GetMapping(value = "/version/{branch}/{entityType}", params = {"pageSize", "page"})
- public DeferredResult> listEntityTypeVersions(@PathVariable String branch,
+ @ApiOperation(value = "List entity type versions (listEntityTypeVersions)", notes = "" +
+ "Returns list of versions of an entity type in a branch. This is a collected list of versions that were created " +
+ "for entities of this type in a remote branch. \n" +
+ "If specified branch does not exist - empty page data will be returned. " +
+ "The response structure is the same as for `listEntityVersions` API method." +
+ TENANT_AUTHORITY_PARAGRAPH)
+ @GetMapping(value = "/version/{entityType}", params = {"branch", "pageSize", "page"})
+ public DeferredResult> listEntityTypeVersions(@ApiParam(value = ENTITY_TYPE_PARAM_DESCRIPTION, required = true)
@PathVariable EntityType entityType,
+ @ApiParam(value = BRANCH_PARAM_DESCRIPTION, required = true)
+ @RequestParam String branch,
@ApiParam(value = PAGE_SIZE_DESCRIPTION, required = true)
@RequestParam int pageSize,
@ApiParam(value = PAGE_NUMBER_DESCRIPTION, required = true)
@@ -183,23 +281,14 @@ public class EntitiesVersionControlController extends BaseController {
return wrapFuture(versionControlService.listEntityTypeVersions(getTenantId(), branch, entityType, pageLink));
}
- @ApiOperation(value = "", notes = "" +
- "```\n[\n" +
- " {\n" +
- " \"id\": \"ba9baaca1742b730e7331f31a6a51da5fc7da7f7\",\n" +
- " \"name\": \"Device 1 removed\"\n" +
- " },\n" +
- " {\n" +
- " \"id\": \"b3c28d722d328324c7c15b0b30047b0c40011cf7\",\n" +
- " \"name\": \"Device profiles added\"\n" +
- " },\n" +
- " {\n" +
- " \"id\": \"c30c8bcaed3f0813649f0dee51a89d04d0a12b28\",\n" +
- " \"name\": \"Devices added\"\n" +
- " }\n" +
- "]\n```")
- @GetMapping(value = "/version/{branch}", params = {"pageSize", "page"})
- public DeferredResult> listVersions(@PathVariable String branch,
+ @ApiOperation(value = "List all versions (listVersions)", notes = "" +
+ "Lists all available versions in a branch for all entity types. \n" +
+ "If specified branch does not exist - empty page data will be returned. " +
+ "The response format is the same as for `listEntityVersions` API method." +
+ TENANT_AUTHORITY_PARAGRAPH)
+ @GetMapping(value = "/version", params = {"branch", "pageSize", "page"})
+ public DeferredResult> listVersions(@ApiParam(value = BRANCH_PARAM_DESCRIPTION, required = true)
+ @RequestParam String branch,
@ApiParam(value = PAGE_SIZE_DESCRIPTION, required = true)
@RequestParam int pageSize,
@ApiParam(value = PAGE_NUMBER_DESCRIPTION, required = true)
@@ -216,43 +305,84 @@ public class EntitiesVersionControlController extends BaseController {
}
- @GetMapping("/entity/{branch}/{entityType}/{versionId}")
- public DeferredResult> listEntitiesAtVersion(@PathVariable String branch,
+ @ApiOperation(value = "List entities at version (listEntitiesAtVersion)", notes = "" +
+ "Returns a list of remote entities of a specific entity type that are available at a concrete version. \n" +
+ "Each entity item in the result has `externalId` property. " +
+ "Entities order will be the same as in the repository." +
+ TENANT_AUTHORITY_PARAGRAPH)
+ @GetMapping(value = "/entity/{entityType}/{versionId}", params = {"branch"})
+ public DeferredResult> listEntitiesAtVersion(@ApiParam(value = ENTITY_TYPE_PARAM_DESCRIPTION, required = true)
@PathVariable EntityType entityType,
- @PathVariable String versionId) throws Exception {
+ @ApiParam(value = VERSION_ID_PARAM_DESCRIPTION, required = true)
+ @PathVariable String versionId,
+ @ApiParam(value = BRANCH_PARAM_DESCRIPTION, required = true)
+ @RequestParam String branch) throws Exception {
accessControlService.checkPermission(getCurrentUser(), Resource.VERSION_CONTROL, Operation.READ);
return wrapFuture(versionControlService.listEntitiesAtVersion(getTenantId(), branch, versionId, entityType));
}
- @GetMapping("/entity/{branch}/{versionId}")
- public DeferredResult> listAllEntitiesAtVersion(@PathVariable String branch,
- @PathVariable String versionId) throws Exception {
+ @ApiOperation(value = "List all entities at version (listAllEntitiesAtVersion)", notes = "" +
+ "Returns a list of all remote entities available in a specific version. " +
+ "Response type is the same as for listAllEntitiesAtVersion API method. \n" +
+ "Returned entities order will be the same as in the repository." +
+ TENANT_AUTHORITY_PARAGRAPH)
+ @GetMapping(value = "/entity/{versionId}", params = {"branch"})
+ public DeferredResult> listAllEntitiesAtVersion(@ApiParam(value = VERSION_ID_PARAM_DESCRIPTION, required = true)
+ @PathVariable String versionId,
+ @ApiParam(value = BRANCH_PARAM_DESCRIPTION, required = true)
+ @RequestParam String branch) throws Exception {
accessControlService.checkPermission(getCurrentUser(), Resource.VERSION_CONTROL, Operation.READ);
return wrapFuture(versionControlService.listAllEntitiesAtVersion(getTenantId(), branch, versionId));
}
+ @ApiOperation(value = "Get entity data info (getEntityDataInfo)", notes = "" +
+ "Retrieves short info about the remote entity by external id at a concrete version. \n" +
+ "Returned entity data info contains following properties: " +
+ "`hasRelations` (whether stored entity data contains relations), `hasAttributes` (contains attributes) and " +
+ "`hasCredentials` (whether stored device data has credentials)." +
+ TENANT_AUTHORITY_PARAGRAPH)
@GetMapping("/info/{versionId}/{entityType}/{externalEntityUuid}")
- public DeferredResult getEntityDataInfo(@PathVariable String versionId,
+ public DeferredResult getEntityDataInfo(@ApiParam(value = VERSION_ID_PARAM_DESCRIPTION, required = true)
+ @PathVariable String versionId,
+ @ApiParam(value = ENTITY_TYPE_PARAM_DESCRIPTION, required = true)
@PathVariable EntityType entityType,
+ @ApiParam(value = "A string value representing external entity id", required = true)
@PathVariable UUID externalEntityUuid) throws Exception {
accessControlService.checkPermission(getCurrentUser(), Resource.VERSION_CONTROL, Operation.READ);
EntityId entityId = EntityIdFactory.getByTypeAndUuid(entityType, externalEntityUuid);
return wrapFuture(versionControlService.getEntityDataInfo(getCurrentUser(), entityId, versionId));
}
- @GetMapping("/diff/{branch}/{entityType}/{internalEntityUuid}")
- public DeferredResult compareEntityDataToVersion(@PathVariable String branch,
+ @ApiOperation(value = "Compare entity data to version (compareEntityDataToVersion)", notes = "" +
+ "Returns an object with current entity data and the one at a specific version. " +
+ "Entity data structure is the same as stored in a repository. " +
+ TENANT_AUTHORITY_PARAGRAPH)
+ @GetMapping(value = "/diff/{entityType}/{internalEntityUuid}", params = {"branch", "versionId"})
+ public DeferredResult compareEntityDataToVersion(@ApiParam(value = ENTITY_TYPE_PARAM_DESCRIPTION, required = true)
@PathVariable EntityType entityType,
+ @ApiParam(value = ENTITY_ID_PARAM_DESCRIPTION, required = true)
@PathVariable UUID internalEntityUuid,
+ @ApiParam(value = BRANCH_PARAM_DESCRIPTION)
+ @RequestParam String branch,
+ @ApiParam(value = VERSION_ID_PARAM_DESCRIPTION, required = true)
@RequestParam String versionId) throws Exception {
accessControlService.checkPermission(getCurrentUser(), Resource.VERSION_CONTROL, Operation.READ);
EntityId entityId = EntityIdFactory.getByTypeAndUuid(entityType, internalEntityUuid);
return wrapFuture(versionControlService.compareEntityDataToVersion(getCurrentUser(), branch, entityId, versionId));
}
- @ApiOperation(value = "", notes = "" +
- "SINGLE_ENTITY:" + NEW_LINE +
- "```\n{\n" +
+ @ApiOperation(value = "Load entities version (loadEntitiesVersion)", notes = "" +
+ "Loads specific version of remote entities (or single entity) by request. " +
+ "Supported entity types: CUSTOMER, ASSET, RULE_CHAIN, DASHBOARD, DEVICE_PROFILE, DEVICE, ENTITY_VIEW, WIDGETS_BUNDLE." + NEW_LINE +
+ "There are multiple types of request. Each of them requires branch name (`branch`) and version id (`versionId`). " +
+ "Request of type `SINGLE_ENTITY` is needed to restore a concrete version of a specific entity. It contains " +
+ "id of a remote entity (`externalEntityId`) and additional configuration (`config`):\n" +
+ "- `loadRelations` - to update relations list (in case `saveRelations` option was enabled during version creation);\n" +
+ "- `loadAttributes` - to load entity attributes (if `saveAttributes` config option was enabled);\n" +
+ "- `loadCredentials` - to update device credentials (if `saveCredentials` option was enabled during version creation)." + NEW_LINE +
+ "An example of such request:\n" +
+ MARKDOWN_CODE_BLOCK_START +
+ "{\n" +
" \"type\": \"SINGLE_ENTITY\",\n" +
" \n" +
" \"branch\": \"dev\",\n" +
@@ -264,11 +394,23 @@ public class EntitiesVersionControlController extends BaseController {
" },\n" +
" \"config\": {\n" +
" \"loadRelations\": false,\n" +
- " \"findExistingEntityByName\": false\n" +
+ " \"loadAttributes\": true,\n" +
+ " \"loadCredentials\": true\n" +
" }\n" +
- "}\n```" + NEW_LINE +
- "ENTITY_TYPE:" + NEW_LINE +
- "```\n{\n" +
+ "}" +
+ MARKDOWN_CODE_BLOCK_END + NEW_LINE +
+ "Another request type (`ENTITY_TYPE`) is needed to load specific version of the whole entity types. " +
+ "It contains a structure with entity types to load and configs for each entity type (`entityTypes`). " +
+ "For each specified entity type, the method will load all remote entities of this type that are present " +
+ "at the version. A config for each entity type contains the same options as in `SINGLE_ENTITY` request type, and " +
+ "additionally contains following options:\n" +
+ "- `removeOtherEntities` - to remove local entities that are not present on the remote - basically to " +
+ " overwrite local entity type with the remote one;\n" +
+ "- `findExistingEntityByName` - when you are loading some remote entities that are not yet present at this tenant, " +
+ " try to find existing entity by name and update it rather than create new." + NEW_LINE +
+ "Here is an example of the request to completely restore version of the whole device entity type:\n" +
+ MARKDOWN_CODE_BLOCK_START +
+ "{\n" +
" \"type\": \"ENTITY_TYPE\",\n" +
"\n" +
" \"branch\": \"dev\",\n" +
@@ -276,12 +418,18 @@ public class EntitiesVersionControlController extends BaseController {
"\n" +
" \"entityTypes\": {\n" +
" \"DEVICE\": {\n" +
- " \"loadRelations\": false,\n" +
+ " \"removeOtherEntities\": true,\n" +
" \"findExistingEntityByName\": false,\n" +
- " \"removeOtherEntities\": true\n" +
+ " \"loadRelations\": true,\n" +
+ " \"loadAttributes\": true,\n" +
+ " \"loadCredentials\": true\n" +
" }\n" +
" }\n" +
- "}\n```")
+ "}" +
+ MARKDOWN_CODE_BLOCK_END + NEW_LINE +
+ "The response will contain generated request UUID that is to be used to check the status of operation " +
+ "via `getVersionLoadRequestStatus`." +
+ TENANT_AUTHORITY_PARAGRAPH)
@PostMapping("/entity")
public UUID loadEntitiesVersion(@RequestBody VersionLoadRequest request) throws Exception {
SecurityUser user = getCurrentUser();
@@ -289,17 +437,54 @@ public class EntitiesVersionControlController extends BaseController {
return versionControlService.loadEntitiesVersion(user, request);
}
- @ApiOperation(value = "", notes = "")
+ @ApiOperation(value = "Get version load request status (getVersionLoadRequestStatus)", notes = "" +
+ "Returns the status of previously made version load request. " +
+ "The structure contains following parameters:\n" +
+ "- `done` - if the request was successfully processed;\n" +
+ "- `result` - a list of load results for each entity type:\n" +
+ " - `created` - created entities count;\n" +
+ " - `updated` - updated entities count;\n" +
+ " - `deleted` - removed entities count.\n" +
+ "- `error` - if an error occurred during processing, error info:\n" +
+ " - `type` - error type;\n" +
+ " - `source` - an external id of remote entity;\n" +
+ " - `target` - if failed to find referenced entity by external id - this external id;\n" +
+ " - `message` - error message." + NEW_LINE +
+ "An example of successfully processed request status:\n" +
+ MARKDOWN_CODE_BLOCK_START +
+ "{\n" +
+ " \"done\": true,\n" +
+ " \"result\": [\n" +
+ " {\n" +
+ " \"entityType\": \"DEVICE\",\n" +
+ " \"created\": 10,\n" +
+ " \"updated\": 5,\n" +
+ " \"deleted\": 5\n" +
+ " },\n" +
+ " {\n" +
+ " \"entityType\": \"ASSET\",\n" +
+ " \"created\": 4,\n" +
+ " \"updated\": 0,\n" +
+ " \"deleted\": 8\n" +
+ " }\n" +
+ " ]\n" +
+ "}" +
+ MARKDOWN_CODE_BLOCK_END +
+ TENANT_AUTHORITY_PARAGRAPH
+ )
@GetMapping(value = "/entity/{requestId}/status")
public VersionLoadResult getVersionLoadRequestStatus(@ApiParam(value = VC_REQUEST_ID_PARAM_DESCRIPTION, required = true)
- @PathVariable UUID requestId) throws Exception {
+ @PathVariable UUID requestId) throws Exception {
accessControlService.checkPermission(getCurrentUser(), Resource.VERSION_CONTROL, Operation.READ);
return versionControlService.getVersionLoadStatus(getCurrentUser(), requestId);
}
- @ApiOperation(value = "", notes = "" +
- "```\n[\n" +
+ @ApiOperation(value = "List branches (listBranches)", notes = "" +
+ "Lists branches available in the remote repository. \n\n" +
+ "Response example: \n" +
+ MARKDOWN_CODE_BLOCK_START +
+ "[\n" +
" {\n" +
" \"name\": \"master\",\n" +
" \"default\": true\n" +
@@ -312,37 +497,29 @@ public class EntitiesVersionControlController extends BaseController {
" \"name\": \"dev-2\",\n" +
" \"default\": false\n" +
" }\n" +
- "]\n\n```")
+ "]" +
+ MARKDOWN_CODE_BLOCK_END)
@GetMapping("/branches")
- public DeferredResult> listBranches() throws ThingsboardException {
- try {
- accessControlService.checkPermission(getCurrentUser(), Resource.VERSION_CONTROL, Operation.READ);
- final TenantId tenantId = getTenantId();
- ListenableFuture> branches = versionControlService.listBranches(tenantId);
- return wrapFuture(Futures.transform(branches, remoteBranches -> {
- List infos = new ArrayList<>();
-
- String defaultBranch = versionControlService.getVersionControlSettings(tenantId).getDefaultBranch();
- if (StringUtils.isNotEmpty(defaultBranch)) {
- infos.add(new BranchInfo(defaultBranch, true));
- }
-
- remoteBranches.forEach(branch -> {
- if (!branch.equals(defaultBranch)) {
- infos.add(new BranchInfo(branch, false));
- }
- });
- return infos;
- }, MoreExecutors.directExecutor()));
- } catch (Exception e) {
- throw handleException(e);
- }
- }
-
- @Data
- public static class BranchInfo {
- private final String name;
- private final boolean isDefault;
+ public DeferredResult> listBranches() throws Exception {
+ accessControlService.checkPermission(getCurrentUser(), Resource.VERSION_CONTROL, Operation.READ);
+ final TenantId tenantId = getTenantId();
+ ListenableFuture> branches = versionControlService.listBranches(tenantId);
+ return wrapFuture(Futures.transform(branches, remoteBranches -> {
+ List infos = new ArrayList<>();
+ BranchInfo defaultBranch;
+ String defaultBranchName = versionControlService.getVersionControlSettings(tenantId).getDefaultBranch();
+ if (StringUtils.isNotEmpty(defaultBranchName)) {
+ defaultBranch = new BranchInfo(defaultBranchName, true);
+ } else {
+ defaultBranch = remoteBranches.stream().filter(BranchInfo::isDefault).findFirst().orElse(null);
+ }
+ if (defaultBranch != null) {
+ infos.add(defaultBranch);
+ }
+ infos.addAll(remoteBranches.stream().filter(b -> !b.equals(defaultBranch))
+ .map(b -> new BranchInfo(b.getName(), false)).collect(Collectors.toList()));
+ return infos;
+ }, MoreExecutors.directExecutor()));
}
}
diff --git a/application/src/main/java/org/thingsboard/server/service/component/AnnotationComponentDiscoveryService.java b/application/src/main/java/org/thingsboard/server/service/component/AnnotationComponentDiscoveryService.java
index f279a29aac..7a95bf78f9 100644
--- a/application/src/main/java/org/thingsboard/server/service/component/AnnotationComponentDiscoveryService.java
+++ b/application/src/main/java/org/thingsboard/server/service/component/AnnotationComponentDiscoveryService.java
@@ -161,20 +161,20 @@ public class AnnotationComponentDiscoveryService implements ComponentDiscoverySe
configurationDescriptor.set("nodeDefinition", node);
scannedComponent.setConfigurationDescriptor(configurationDescriptor);
scannedComponent.setClazz(clazzName);
- log.info("Processing scanned component: {}", scannedComponent);
+ log.debug("Processing scanned component: {}", scannedComponent);
} catch (Exception e) {
log.error("Can't initialize component {}, due to {}", def.getBeanClassName(), e.getMessage(), e);
throw new RuntimeException(e);
}
ComponentDescriptor persistedComponent = componentDescriptorService.findByClazz(TenantId.SYS_TENANT_ID, clazzName);
if (persistedComponent == null) {
- log.info("Persisting new component: {}", scannedComponent);
+ log.debug("Persisting new component: {}", scannedComponent);
scannedComponent = componentDescriptorService.saveComponent(TenantId.SYS_TENANT_ID, scannedComponent);
} else if (scannedComponent.equals(persistedComponent)) {
- log.info("Component is already persisted: {}", persistedComponent);
+ log.debug("Component is already persisted: {}", persistedComponent);
scannedComponent = persistedComponent;
} else {
- log.info("Component {} will be updated to {}", persistedComponent, scannedComponent);
+ log.debug("Component {} will be updated to {}", persistedComponent, scannedComponent);
componentDescriptorService.deleteByClazz(TenantId.SYS_TENANT_ID, persistedComponent.getClazz());
scannedComponent.setId(persistedComponent.getId());
scannedComponent = componentDescriptorService.saveComponent(TenantId.SYS_TENANT_ID, scannedComponent);
@@ -224,7 +224,7 @@ public class AnnotationComponentDiscoveryService implements ComponentDiscoverySe
@Override
public void discoverComponents() {
registerRuleNodeComponents();
- log.info("Found following definitions: {}", components.values());
+ log.debug("Found following definitions: {}", components.values());
}
@Override
diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/BaseEdgeProcessor.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/BaseEdgeProcessor.java
index c67f7a5a14..8f37d743b5 100644
--- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/BaseEdgeProcessor.java
+++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/BaseEdgeProcessor.java
@@ -21,6 +21,7 @@ import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.context.annotation.Lazy;
import org.thingsboard.server.cluster.TbClusterService;
import org.thingsboard.server.common.data.Device;
import org.thingsboard.server.common.data.EdgeUtils;
@@ -53,6 +54,8 @@ import org.thingsboard.server.dao.service.DataValidator;
import org.thingsboard.server.dao.user.UserService;
import org.thingsboard.server.dao.widget.WidgetTypeService;
import org.thingsboard.server.dao.widget.WidgetsBundleService;
+import org.thingsboard.server.queue.discovery.PartitionService;
+import org.thingsboard.server.queue.provider.TbQueueProducerProvider;
import org.thingsboard.server.service.edge.rpc.constructor.AdminSettingsMsgConstructor;
import org.thingsboard.server.service.edge.rpc.constructor.AlarmMsgConstructor;
import org.thingsboard.server.service.edge.rpc.constructor.AssetMsgConstructor;
@@ -142,6 +145,13 @@ public abstract class BaseEdgeProcessor {
@Autowired
protected OtaPackageService otaPackageService;
+ @Autowired
+ protected PartitionService partitionService;
+
+ @Autowired
+ @Lazy
+ protected TbQueueProducerProvider producerProvider;
+
@Autowired
protected DataValidator deviceValidator;
diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/TelemetryEdgeProcessor.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/TelemetryEdgeProcessor.java
index 769d988507..f30111e1ef 100644
--- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/TelemetryEdgeProcessor.java
+++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/TelemetryEdgeProcessor.java
@@ -52,6 +52,8 @@ import org.thingsboard.server.common.data.kv.AttributeKey;
import org.thingsboard.server.common.data.kv.AttributeKvEntry;
import org.thingsboard.server.common.msg.TbMsg;
import org.thingsboard.server.common.msg.TbMsgMetaData;
+import org.thingsboard.server.common.msg.queue.ServiceType;
+import org.thingsboard.server.common.msg.queue.TopicPartitionInfo;
import org.thingsboard.server.common.msg.session.SessionMsgType;
import org.thingsboard.server.common.transport.adaptor.JsonConverter;
import org.thingsboard.server.common.transport.util.JsonUtils;
@@ -61,9 +63,12 @@ import org.thingsboard.server.gen.edge.v1.EntityDataProto;
import org.thingsboard.server.gen.transport.TransportProtos;
import org.thingsboard.server.queue.TbQueueCallback;
import org.thingsboard.server.queue.TbQueueMsgMetadata;
+import org.thingsboard.server.queue.TbQueueProducer;
+import org.thingsboard.server.queue.common.TbProtoQueueMsg;
import org.thingsboard.server.queue.util.TbCoreComponent;
import javax.annotation.Nullable;
+import javax.annotation.PostConstruct;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
@@ -77,14 +82,19 @@ public class TelemetryEdgeProcessor extends BaseEdgeProcessor {
private final Gson gson = new Gson();
+ private TbQueueProducer> tbCoreMsgProducer;
+
+ @PostConstruct
+ public void init() {
+ tbCoreMsgProducer = producerProvider.getTbCoreMsgProducer();
+ }
+
public List> processTelemetryFromEdge(TenantId tenantId, CustomerId customerId, EntityDataProto entityData) {
log.trace("[{}] onTelemetryUpdate [{}]", tenantId, entityData);
List> result = new ArrayList<>();
EntityId entityId = constructEntityId(entityData);
if ((entityData.hasPostAttributesMsg() || entityData.hasPostTelemetryMsg() || entityData.hasAttributesUpdatedMsg()) && entityId != null) {
- // @voba - in terms of performance we should not fetch device from DB by id
- // TbMsgMetaData metaData = constructBaseMsgMetadata(tenantId, entityId);
- TbMsgMetaData metaData = new TbMsgMetaData();
+ TbMsgMetaData metaData = constructBaseMsgMetadata(tenantId, entityId);
metaData.putValue(DataConstants.MSG_SOURCE_KEY, DataConstants.EDGE_MSG_SOURCE);
if (entityData.hasPostAttributesMsg()) {
result.add(processPostAttributes(tenantId, customerId, entityId, entityData.getPostAttributesMsg(), metaData));
@@ -96,6 +106,20 @@ public class TelemetryEdgeProcessor extends BaseEdgeProcessor {
if (entityData.hasPostTelemetryMsg()) {
result.add(processPostTelemetry(tenantId, customerId, entityId, entityData.getPostTelemetryMsg(), metaData));
}
+ if (EntityType.DEVICE.equals(entityId.getEntityType())) {
+ DeviceId deviceId = new DeviceId(entityId.getId());
+
+ TransportProtos.DeviceActivityProto deviceActivityMsg = TransportProtos.DeviceActivityProto.newBuilder()
+ .setTenantIdMSB(tenantId.getId().getMostSignificantBits())
+ .setTenantIdLSB(tenantId.getId().getLeastSignificantBits())
+ .setDeviceIdMSB(deviceId.getId().getMostSignificantBits())
+ .setDeviceIdLSB(deviceId.getId().getLeastSignificantBits())
+ .setLastActivityTime(System.currentTimeMillis()).build();
+
+ TopicPartitionInfo tpi = partitionService.resolve(ServiceType.TB_CORE, tenantId, deviceId);
+ tbCoreMsgProducer.send(tpi, new TbProtoQueueMsg<>(deviceId.getId(),
+ TransportProtos.ToCoreMsg.newBuilder().setDeviceActivityMsg(deviceActivityMsg).build()), null);
+ }
}
if (entityData.hasAttributeDeleteMsg()) {
result.add(processAttributeDeleteMsg(tenantId, entityId, entityData.getAttributeDeleteMsg(), entityData.getEntityType()));
diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/sync/DefaultEdgeRequestsService.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/sync/DefaultEdgeRequestsService.java
index dbbb6ddb1f..210ad30c44 100644
--- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/sync/DefaultEdgeRequestsService.java
+++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/sync/DefaultEdgeRequestsService.java
@@ -25,6 +25,7 @@ import com.google.common.util.concurrent.SettableFuture;
import lombok.extern.slf4j.Slf4j;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.context.annotation.Lazy;
import org.springframework.stereotype.Service;
import org.thingsboard.server.cluster.TbClusterService;
import org.thingsboard.server.common.data.Device;
@@ -72,6 +73,7 @@ import org.thingsboard.server.gen.edge.v1.RelationRequestMsg;
import org.thingsboard.server.gen.edge.v1.RuleChainMetadataRequestMsg;
import org.thingsboard.server.gen.edge.v1.UserCredentialsRequestMsg;
import org.thingsboard.server.gen.edge.v1.WidgetBundleTypesRequestMsg;
+import org.thingsboard.server.service.entitiy.entityView.TbEntityViewService;
import org.thingsboard.server.service.executors.DbCallbackExecutorService;
import java.util.ArrayList;
@@ -100,8 +102,9 @@ public class DefaultEdgeRequestsService implements EdgeRequestsService {
@Autowired
private DeviceService deviceService;
+ @Lazy
@Autowired
- private EntityViewService entityViewService;
+ private TbEntityViewService entityViewService;
@Autowired
private DeviceProfileService deviceProfileService;
diff --git a/application/src/main/java/org/thingsboard/server/service/entitiy/AbstractTbEntityService.java b/application/src/main/java/org/thingsboard/server/service/entitiy/AbstractTbEntityService.java
index 8439c1780b..e3d91b7a7e 100644
--- a/application/src/main/java/org/thingsboard/server/service/entitiy/AbstractTbEntityService.java
+++ b/application/src/main/java/org/thingsboard/server/service/entitiy/AbstractTbEntityService.java
@@ -21,6 +21,7 @@ import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
+import org.springframework.context.annotation.Lazy;
import org.thingsboard.server.cluster.TbClusterService;
import org.thingsboard.server.common.data.EntityType;
import org.thingsboard.server.common.data.HasName;
@@ -64,6 +65,7 @@ import org.thingsboard.server.dao.widget.WidgetsBundleService;
import org.thingsboard.server.service.action.EntityActionService;
import org.thingsboard.server.service.edge.EdgeNotificationService;
import org.thingsboard.server.service.executors.DbCallbackExecutorService;
+import org.thingsboard.server.service.security.model.SecurityUser;
import org.thingsboard.server.service.sync.vc.EntitiesVersionControlService;
import org.thingsboard.server.service.install.InstallScripts;
import org.thingsboard.server.service.ota.OtaPackageStateService;
@@ -77,6 +79,7 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
+import java.util.UUID;
import java.util.stream.Collectors;
@Slf4j
@@ -93,13 +96,13 @@ public abstract class AbstractTbEntityService {
@Autowired
protected DbCallbackExecutorService dbExecutor;
- @Autowired
+ @Autowired(required = false)
protected TbNotificationEntityService notificationEntityService;
@Autowired(required = false)
protected EdgeService edgeService;
@Autowired
protected AlarmService alarmService;
- @Autowired
+ @Autowired(required = false)
protected EntityActionService entityActionService;
@Autowired
protected DeviceService deviceService;
@@ -111,24 +114,27 @@ public abstract class AbstractTbEntityService {
protected TenantService tenantService;
@Autowired
protected CustomerService customerService;
- @Autowired
+ @Lazy
+ @Autowired(required = false)
protected ClaimDevicesService claimDevicesService;
@Autowired
protected TbTenantProfileCache tenantProfileCache;
@Autowired
protected RuleChainService ruleChainService;
- @Autowired
+ @Autowired(required = false)
protected TbRuleChainService tbRuleChainService;
- @Autowired
+ @Autowired(required = false)
protected EdgeNotificationService edgeNotificationService;
@Autowired
protected QueueService queueService;
@Autowired
protected DashboardService dashboardService;
- @Autowired
- protected EntitiesVersionControlService vcService;
+
+ @Autowired(required = false)
+ private EntitiesVersionControlService vcService;
@Autowired
protected EntityViewService entityViewService;
+ @Lazy
@Autowired
protected TelemetrySubscriptionService tsSubService;
@Autowired
@@ -149,7 +155,7 @@ public abstract class AbstractTbEntityService {
protected InstallScripts installScripts;
@Autowired
protected UserService userService;
- @Autowired
+ @Autowired(required = false)
protected TbResourceService resourceService;
@Autowired
protected WidgetsBundleService widgetsBundleService;
@@ -245,4 +251,13 @@ public abstract class AbstractTbEntityService {
}
return result;
}
+
+ protected ListenableFuture autoCommit(SecurityUser user, EntityId entityId) throws Exception {
+ if (vcService != null) {
+ return vcService.autoCommit(user, entityId);
+ } else {
+ // We do not support auto-commit for rule engine
+ return Futures.immediateFailedFuture(new RuntimeException("Operation not supported!"));
+ }
+ }
}
diff --git a/application/src/main/java/org/thingsboard/server/service/entitiy/DefaultTbNotificationEntityService.java b/application/src/main/java/org/thingsboard/server/service/entitiy/DefaultTbNotificationEntityService.java
index 7ee7696f1a..76477c974b 100644
--- a/application/src/main/java/org/thingsboard/server/service/entitiy/DefaultTbNotificationEntityService.java
+++ b/application/src/main/java/org/thingsboard/server/service/entitiy/DefaultTbNotificationEntityService.java
@@ -128,7 +128,7 @@ public class DefaultTbNotificationEntityService implements TbNotificationEntityS
}
@Override
- public void notifyCreateOruUpdateTenant(Tenant tenant, ComponentLifecycleEvent event) {
+ public void notifyCreateOrUpdateTenant(Tenant tenant, ComponentLifecycleEvent event) {
tbClusterService.onTenantChange(tenant, null);
tbClusterService.broadcastEntityStateChangeEvent(tenant.getId(), tenant.getId(), event);
}
diff --git a/application/src/main/java/org/thingsboard/server/service/entitiy/TbNotificationEntityService.java b/application/src/main/java/org/thingsboard/server/service/entitiy/TbNotificationEntityService.java
index c1131761dc..5ce91046d6 100644
--- a/application/src/main/java/org/thingsboard/server/service/entitiy/TbNotificationEntityService.java
+++ b/application/src/main/java/org/thingsboard/server/service/entitiy/TbNotificationEntityService.java
@@ -71,7 +71,7 @@ public interface TbNotificationEntityService {
E entity, ActionType actionType,
SecurityUser user, Object... additionalInfo);
- void notifyCreateOruUpdateTenant(Tenant tenant, ComponentLifecycleEvent event);
+ void notifyCreateOrUpdateTenant(Tenant tenant, ComponentLifecycleEvent event);
void notifyDeleteTenant(Tenant tenant);
diff --git a/application/src/main/java/org/thingsboard/server/service/entitiy/asset/DefaultTbAssetService.java b/application/src/main/java/org/thingsboard/server/service/entitiy/asset/DefaultTbAssetService.java
index e118346588..e71c41b2f3 100644
--- a/application/src/main/java/org/thingsboard/server/service/entitiy/asset/DefaultTbAssetService.java
+++ b/application/src/main/java/org/thingsboard/server/service/entitiy/asset/DefaultTbAssetService.java
@@ -47,7 +47,7 @@ public class DefaultTbAssetService extends AbstractTbEntityService implements Tb
TenantId tenantId = asset.getTenantId();
try {
Asset savedAsset = checkNotNull(assetService.saveAsset(asset));
- vcService.autoCommit(user, savedAsset.getId());
+ autoCommit(user, savedAsset.getId());
notificationEntityService.notifyCreateOrUpdateEntity(tenantId, savedAsset.getId(), savedAsset, savedAsset.getCustomerId(), actionType, user);
return savedAsset;
} catch (Exception e) {
diff --git a/application/src/main/java/org/thingsboard/server/service/entitiy/customer/DefaultTbCustomerService.java b/application/src/main/java/org/thingsboard/server/service/entitiy/customer/DefaultTbCustomerService.java
index 536e1be976..4d5a85d811 100644
--- a/application/src/main/java/org/thingsboard/server/service/entitiy/customer/DefaultTbCustomerService.java
+++ b/application/src/main/java/org/thingsboard/server/service/entitiy/customer/DefaultTbCustomerService.java
@@ -42,7 +42,7 @@ public class DefaultTbCustomerService extends AbstractTbEntityService implements
TenantId tenantId = customer.getTenantId();
try {
Customer savedCustomer = checkNotNull(customerService.saveCustomer(customer));
- vcService.autoCommit(user, savedCustomer.getId());
+ autoCommit(user, savedCustomer.getId());
notificationEntityService.notifyCreateOrUpdateEntity(tenantId, savedCustomer.getId(), savedCustomer, null, actionType, user);
return savedCustomer;
} catch (Exception e) {
diff --git a/application/src/main/java/org/thingsboard/server/service/entitiy/dashboard/DefaultTbDashboardService.java b/application/src/main/java/org/thingsboard/server/service/entitiy/dashboard/DefaultTbDashboardService.java
index a475ff77da..c7a04fbb78 100644
--- a/application/src/main/java/org/thingsboard/server/service/entitiy/dashboard/DefaultTbDashboardService.java
+++ b/application/src/main/java/org/thingsboard/server/service/entitiy/dashboard/DefaultTbDashboardService.java
@@ -48,7 +48,7 @@ public class DefaultTbDashboardService extends AbstractTbEntityService implement
TenantId tenantId = dashboard.getTenantId();
try {
Dashboard savedDashboard = checkNotNull(dashboardService.saveDashboard(dashboard));
- vcService.autoCommit(user, savedDashboard.getId());
+ autoCommit(user, savedDashboard.getId());
notificationEntityService.notifyCreateOrUpdateEntity(tenantId, savedDashboard.getId(), savedDashboard,
null, actionType, user);
return savedDashboard;
diff --git a/application/src/main/java/org/thingsboard/server/service/entitiy/device/DefaultTbDeviceService.java b/application/src/main/java/org/thingsboard/server/service/entitiy/device/DefaultTbDeviceService.java
index 5199887940..5c232e4712 100644
--- a/application/src/main/java/org/thingsboard/server/service/entitiy/device/DefaultTbDeviceService.java
+++ b/application/src/main/java/org/thingsboard/server/service/entitiy/device/DefaultTbDeviceService.java
@@ -54,7 +54,7 @@ public class DefaultTbDeviceService extends AbstractTbEntityService implements T
ActionType actionType = device.getId() == null ? ActionType.ADDED : ActionType.UPDATED;
try {
Device savedDevice = checkNotNull(deviceService.saveDeviceWithAccessToken(device, accessToken));
- vcService.autoCommit(user, savedDevice.getId());
+ autoCommit(user, savedDevice.getId());
notificationEntityService.notifyCreateOrUpdateDevice(tenantId, savedDevice.getId(), savedDevice.getCustomerId(),
savedDevice, oldDevice, actionType, user);
diff --git a/application/src/main/java/org/thingsboard/server/service/entitiy/deviceProfile/DefaultTbDeviceProfileService.java b/application/src/main/java/org/thingsboard/server/service/entitiy/deviceProfile/DefaultTbDeviceProfileService.java
index 3a0c1f1a23..a92e5109f1 100644
--- a/application/src/main/java/org/thingsboard/server/service/entitiy/deviceProfile/DefaultTbDeviceProfileService.java
+++ b/application/src/main/java/org/thingsboard/server/service/entitiy/deviceProfile/DefaultTbDeviceProfileService.java
@@ -54,7 +54,7 @@ public class DefaultTbDeviceProfileService extends AbstractTbEntityService imple
}
}
DeviceProfile savedDeviceProfile = checkNotNull(deviceProfileService.saveDeviceProfile(deviceProfile));
- vcService.autoCommit(user, savedDeviceProfile.getId());
+ autoCommit(user, savedDeviceProfile.getId());
tbClusterService.onDeviceProfileChange(savedDeviceProfile, null);
tbClusterService.broadcastEntityStateChangeEvent(tenantId, savedDeviceProfile.getId(),
actionType.equals(ActionType.ADDED) ? ComponentLifecycleEvent.CREATED : ComponentLifecycleEvent.UPDATED);
diff --git a/application/src/main/java/org/thingsboard/server/service/entitiy/entityView/DefaultTbEntityViewService.java b/application/src/main/java/org/thingsboard/server/service/entitiy/entityView/DefaultTbEntityViewService.java
index 2eee167e75..cd30276fab 100644
--- a/application/src/main/java/org/thingsboard/server/service/entitiy/entityView/DefaultTbEntityViewService.java
+++ b/application/src/main/java/org/thingsboard/server/service/entitiy/entityView/DefaultTbEntityViewService.java
@@ -23,6 +23,7 @@ import com.google.common.util.concurrent.SettableFuture;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
+import org.springframework.util.ConcurrentReferenceHashMap;
import org.thingsboard.server.common.data.Customer;
import org.thingsboard.server.common.data.DataConstants;
import org.thingsboard.server.common.data.EntityType;
@@ -40,8 +41,9 @@ import org.thingsboard.server.common.data.kv.AttributeKvEntry;
import org.thingsboard.server.common.data.kv.BaseReadTsKvQuery;
import org.thingsboard.server.common.data.kv.ReadTsKvQuery;
import org.thingsboard.server.common.data.kv.TsKvEntry;
+import org.thingsboard.server.common.data.plugin.ComponentLifecycleEvent;
+import org.thingsboard.server.common.msg.plugin.ComponentLifecycleMsg;
import org.thingsboard.server.dao.timeseries.TimeseriesService;
-import org.thingsboard.server.queue.util.TbCoreComponent;
import org.thingsboard.server.service.entitiy.AbstractTbEntityService;
import org.thingsboard.server.service.security.model.SecurityUser;
@@ -50,19 +52,22 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
import static org.apache.commons.lang3.StringUtils.isBlank;
@Service
-@TbCoreComponent
@AllArgsConstructor
@Slf4j
public class DefaultTbEntityViewService extends AbstractTbEntityService implements TbEntityViewService {
private final TimeseriesService tsService;
+ final Map>> localCache = new ConcurrentHashMap<>();
+
@Override
public EntityView save(EntityView entityView, EntityView existingEntityView, SecurityUser user) throws ThingsboardException {
ActionType actionType = entityView.getId() == null ? ActionType.ADDED : ActionType.UPDATED;
@@ -71,6 +76,9 @@ public class DefaultTbEntityViewService extends AbstractTbEntityService implemen
this.updateEntityViewAttributes(user, savedEntityView, existingEntityView);
notificationEntityService.notifyCreateOrUpdateEntity(savedEntityView.getTenantId(), savedEntityView.getId(), savedEntityView,
null, actionType, user);
+ localCache.computeIfAbsent(savedEntityView.getTenantId(), (k) -> new ConcurrentReferenceHashMap<>()).clear();
+ tbClusterService.broadcastEntityStateChangeEvent(savedEntityView.getTenantId(), savedEntityView.getId(),
+ entityView.getId() == null ? ComponentLifecycleEvent.CREATED : ComponentLifecycleEvent.UPDATED);
return savedEntityView;
} catch (Exception e) {
notificationEntityService.notifyEntity(user.getTenantId(), emptyId(EntityType.ENTITY_VIEW), entityView, null, actionType, user, e);
@@ -122,6 +130,9 @@ public class DefaultTbEntityViewService extends AbstractTbEntityService implemen
entityViewService.deleteEntityView(tenantId, entityViewId);
notificationEntityService.notifyDeleteEntity(tenantId, entityViewId, entityView, entityView.getCustomerId(), ActionType.DELETED,
relatedEdgeIds, user, entityViewId.toString());
+
+ localCache.computeIfAbsent(tenantId, (k) -> new ConcurrentReferenceHashMap<>()).clear();
+ tbClusterService.broadcastEntityStateChangeEvent(tenantId, entityViewId, ComponentLifecycleEvent.DELETED);
} catch (Exception e) {
notificationEntityService.notifyEntity(tenantId, emptyId(EntityType.ENTITY_VIEW), null, null,
ActionType.DELETED, user, e, entityViewId.toString());
@@ -214,6 +225,51 @@ public class DefaultTbEntityViewService extends AbstractTbEntityService implemen
}
}
+ @Override
+ public ListenableFuture> findEntityViewsByTenantIdAndEntityIdAsync(TenantId tenantId, EntityId entityId) {
+ Map> localCacheByTenant = localCache.computeIfAbsent(tenantId, (k) -> new ConcurrentReferenceHashMap<>());
+ List fromLocalCache = localCacheByTenant.get(entityId);
+ if (fromLocalCache != null) {
+ return Futures.immediateFuture(fromLocalCache);
+ }
+
+ ListenableFuture> future = entityViewService.findEntityViewsByTenantIdAndEntityIdAsync(tenantId, entityId);
+
+ return Futures.transform(future, (entityViewList) -> {
+ localCacheByTenant.put(entityId, entityViewList);
+ return entityViewList;
+ }, MoreExecutors.directExecutor());
+ }
+
+ @Override
+ public void onComponentLifecycleMsg(ComponentLifecycleMsg componentLifecycleMsg) {
+ Map> localCacheByTenant = localCache.computeIfAbsent(componentLifecycleMsg.getTenantId(), (k) -> new ConcurrentReferenceHashMap<>());
+ EntityViewId entityViewId = new EntityViewId(componentLifecycleMsg.getEntityId().getId());
+ deleteOldCacheValue(localCacheByTenant, entityViewId);
+ if (componentLifecycleMsg.getEvent() != ComponentLifecycleEvent.DELETED) {
+ EntityView entityView = entityViewService.findEntityViewById(componentLifecycleMsg.getTenantId(), entityViewId);
+ if (entityView != null) {
+ localCacheByTenant.remove(entityView.getEntityId());
+ }
+ }
+ }
+
+ private void deleteOldCacheValue(Map> localCacheByTenant, EntityViewId entityViewId) {
+ for (var entry : localCacheByTenant.entrySet()) {
+ EntityView toDelete = null;
+ for (EntityView view : entry.getValue()) {
+ if (entityViewId.equals(view.getId())) {
+ toDelete = view;
+ break;
+ }
+ }
+ if (toDelete != null) {
+ entry.getValue().remove(toDelete);
+ break;
+ }
+ }
+ }
+
private ListenableFuture> copyAttributesFromEntityToEntityView(EntityView entityView, String scope, Collection keys, SecurityUser user) throws ThingsboardException {
EntityViewId entityId = entityView.getId();
if (keys != null && !keys.isEmpty()) {
@@ -229,8 +285,8 @@ public class DefaultTbEntityViewService extends AbstractTbEntityService implemen
long lastUpdateTs = attributeKvEntry.getLastUpdateTs();
return startTime == 0 && endTime == 0 ||
(endTime == 0 && startTime < lastUpdateTs) ||
- (startTime == 0 && endTime > lastUpdateTs)
- ? true : startTime < lastUpdateTs && endTime > lastUpdateTs;
+ (startTime == 0 && endTime > lastUpdateTs) ||
+ (startTime < lastUpdateTs && endTime > lastUpdateTs);
}).collect(Collectors.toList());
tsSubService.saveAndNotify(entityView.getTenantId(), entityId, scope, attributes, new FutureCallback() {
@Override
@@ -345,7 +401,7 @@ public class DefaultTbEntityViewService extends AbstractTbEntityService implemen
@Override
public void onFailure(Throwable t) {
try {
- logTimeseriesDeleted(entityView.getTenantId(),user, entityId, keys, t);
+ logTimeseriesDeleted(entityView.getTenantId(), user, entityId, keys, t);
} catch (ThingsboardException e) {
log.error("Failed to log timeseries delete", e);
}
diff --git a/application/src/main/java/org/thingsboard/server/service/entitiy/entityView/TbEntityViewService.java b/application/src/main/java/org/thingsboard/server/service/entitiy/entityView/TbEntityViewService.java
index e7e150531b..e762877bc3 100644
--- a/application/src/main/java/org/thingsboard/server/service/entitiy/entityView/TbEntityViewService.java
+++ b/application/src/main/java/org/thingsboard/server/service/entitiy/entityView/TbEntityViewService.java
@@ -15,22 +15,27 @@
*/
package org.thingsboard.server.service.entitiy.entityView;
+import com.google.common.util.concurrent.ListenableFuture;
import org.thingsboard.server.common.data.Customer;
import org.thingsboard.server.common.data.EntityView;
import org.thingsboard.server.common.data.edge.Edge;
import org.thingsboard.server.common.data.exception.ThingsboardException;
import org.thingsboard.server.common.data.id.CustomerId;
+import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.id.EntityViewId;
import org.thingsboard.server.common.data.id.TenantId;
+import org.thingsboard.server.common.msg.plugin.ComponentLifecycleListener;
import org.thingsboard.server.service.security.model.SecurityUser;
-public interface TbEntityViewService {
+import java.util.List;
+
+public interface TbEntityViewService extends ComponentLifecycleListener {
EntityView save(EntityView entityView, EntityView existingEntityView, SecurityUser user) throws ThingsboardException;
void updateEntityViewAttributes(SecurityUser user, EntityView savedEntityView, EntityView oldEntityView) throws ThingsboardException;
- void delete (EntityView entity, SecurityUser user) throws ThingsboardException;
+ void delete (EntityView entity, SecurityUser user) throws ThingsboardException;
EntityView assignEntityViewToCustomer(TenantId tenantId, EntityViewId entityViewId, Customer customer,
SecurityUser user) throws ThingsboardException;
@@ -46,4 +51,6 @@ public interface TbEntityViewService {
EntityView unassignEntityViewFromCustomer(TenantId tenantId, EntityViewId entityViewId, Customer customer,
SecurityUser user) throws ThingsboardException;
+
+ ListenableFuture> findEntityViewsByTenantIdAndEntityIdAsync(TenantId tenantId, EntityId entityId);
}
diff --git a/application/src/main/java/org/thingsboard/server/service/entitiy/tenant/DefaultTbTenantService.java b/application/src/main/java/org/thingsboard/server/service/entitiy/tenant/DefaultTbTenantService.java
index 22baed8366..d3c12cb3f8 100644
--- a/application/src/main/java/org/thingsboard/server/service/entitiy/tenant/DefaultTbTenantService.java
+++ b/application/src/main/java/org/thingsboard/server/service/entitiy/tenant/DefaultTbTenantService.java
@@ -54,7 +54,7 @@ public class DefaultTbTenantService extends AbstractTbEntityService implements T
installScripts.createDefaultEdgeRuleChains(savedTenant.getId());
}
tenantProfileCache.evict(savedTenant.getId());
- notificationEntityService.notifyCreateOruUpdateTenant(savedTenant, created ?
+ notificationEntityService.notifyCreateOrUpdateTenant(savedTenant, created ?
ComponentLifecycleEvent.CREATED : ComponentLifecycleEvent.UPDATED);
TenantProfile oldTenantProfile = oldTenant != null ? tenantProfileService.findTenantProfileById(TenantId.SYS_TENANT_ID, oldTenant.getTenantProfileId()) : null;
diff --git a/application/src/main/java/org/thingsboard/server/service/install/DefaultSystemDataLoaderService.java b/application/src/main/java/org/thingsboard/server/service/install/DefaultSystemDataLoaderService.java
index 50fcb28387..14e49e056f 100644
--- a/application/src/main/java/org/thingsboard/server/service/install/DefaultSystemDataLoaderService.java
+++ b/application/src/main/java/org/thingsboard/server/service/install/DefaultSystemDataLoaderService.java
@@ -25,6 +25,7 @@ import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Lazy;
import org.springframework.context.annotation.Profile;
import org.springframework.security.crypto.bcrypt.BCryptPasswordEncoder;
import org.springframework.stereotype.Service;
@@ -162,6 +163,7 @@ public class DefaultSystemDataLoaderService implements SystemDataLoaderService {
@Getter
private boolean persistActivityToTelemetry;
+ @Lazy
@Autowired
private QueueService queueService;
diff --git a/application/src/main/java/org/thingsboard/server/service/install/SqlDatabaseUpgradeService.java b/application/src/main/java/org/thingsboard/server/service/install/SqlDatabaseUpgradeService.java
index 306102c66c..e428d55d91 100644
--- a/application/src/main/java/org/thingsboard/server/service/install/SqlDatabaseUpgradeService.java
+++ b/application/src/main/java/org/thingsboard/server/service/install/SqlDatabaseUpgradeService.java
@@ -21,6 +21,7 @@ import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections.CollectionUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
+import org.springframework.context.annotation.Lazy;
import org.springframework.context.annotation.Profile;
import org.springframework.stereotype.Service;
import org.thingsboard.server.common.data.EntitySubtype;
@@ -121,6 +122,7 @@ public class SqlDatabaseUpgradeService implements DatabaseEntitiesUpgradeService
@Autowired
private ApiUsageStateService apiUsageStateService;
+ @Lazy
@Autowired
private QueueService queueService;
diff --git a/application/src/main/java/org/thingsboard/server/service/install/update/DefaultDataUpdateService.java b/application/src/main/java/org/thingsboard/server/service/install/update/DefaultDataUpdateService.java
index c6576097ee..9bfa86b3ba 100644
--- a/application/src/main/java/org/thingsboard/server/service/install/update/DefaultDataUpdateService.java
+++ b/application/src/main/java/org/thingsboard/server/service/install/update/DefaultDataUpdateService.java
@@ -22,6 +22,7 @@ import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.context.annotation.Lazy;
import org.springframework.context.annotation.Profile;
import org.springframework.stereotype.Service;
import org.thingsboard.common.util.JacksonUtil;
@@ -117,6 +118,7 @@ public class DefaultDataUpdateService implements DataUpdateService {
@Autowired
private TenantProfileService tenantProfileService;
+ @Lazy
@Autowired
private QueueService queueService;
diff --git a/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbClusterService.java b/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbClusterService.java
index 01adab5f6f..9e60f69620 100644
--- a/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbClusterService.java
+++ b/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbClusterService.java
@@ -364,13 +364,14 @@ public class DefaultTbClusterService implements TbClusterService {
private void broadcast(ComponentLifecycleMsg msg) {
byte[] msgBytes = encodingService.encode(msg);
TbQueueProducer> toRuleEngineProducer = producerProvider.getRuleEngineNotificationsMsgProducer();
- Set tbRuleEngineServices = new HashSet<>(partitionService.getAllServiceIds(ServiceType.TB_RULE_ENGINE));
+ Set tbRuleEngineServices = partitionService.getAllServiceIds(ServiceType.TB_RULE_ENGINE);
EntityType entityType = msg.getEntityId().getEntityType();
if (entityType.equals(EntityType.TENANT)
|| entityType.equals(EntityType.TENANT_PROFILE)
|| entityType.equals(EntityType.DEVICE_PROFILE)
|| entityType.equals(EntityType.API_USAGE_STATE)
|| (entityType.equals(EntityType.DEVICE) && msg.getEvent() == ComponentLifecycleEvent.UPDATED)
+ || entityType.equals(EntityType.ENTITY_VIEW)
|| entityType.equals(EntityType.EDGE)) {
TbQueueProducer> toCoreNfProducer = producerProvider.getTbCoreNotificationsMsgProducer();
Set tbCoreServices = partitionService.getAllServiceIds(ServiceType.TB_CORE);
diff --git a/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbCoreConsumerService.java b/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbCoreConsumerService.java
index d071d591a1..a3bae8bc4e 100644
--- a/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbCoreConsumerService.java
+++ b/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbCoreConsumerService.java
@@ -28,6 +28,7 @@ import org.thingsboard.common.util.JacksonUtil;
import org.thingsboard.common.util.ThingsBoardThreadFactory;
import org.thingsboard.server.actors.ActorSystemContext;
import org.thingsboard.server.common.data.alarm.Alarm;
+import org.thingsboard.server.common.data.id.DeviceId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.rpc.RpcError;
import org.thingsboard.server.common.msg.MsgType;
@@ -236,6 +237,9 @@ public class DefaultTbCoreConsumerService extends AbstractConsumerService actorMsg = encodingService.decode(toCoreMsg.getToDeviceActorNotificationMsg().toByteArray());
if (actorMsg.isPresent()) {
@@ -520,6 +524,20 @@ public class DefaultTbCoreConsumerService extends AbstractConsumerService counters = new ArrayList<>();
@@ -70,6 +72,7 @@ public class TbCoreConsumerStats {
this.subscriptionMsgCounter = register(statsFactory.createStatsCounter(statsKey, SUBSCRIPTION_MSGS));
this.toCoreNotificationsCounter = register(statsFactory.createStatsCounter(statsKey, TO_CORE_NOTIFICATIONS));
this.edgeNotificationsCounter = register(statsFactory.createStatsCounter(statsKey, EDGE_NOTIFICATIONS));
+ this.deviceActivitiesCounter = register(statsFactory.createStatsCounter(statsKey, DEVICE_ACTIVITIES));
}
private StatsCounter register(StatsCounter counter){
@@ -112,6 +115,11 @@ public class TbCoreConsumerStats {
edgeNotificationsCounter.increment();
}
+ public void log(TransportProtos.DeviceActivityProto msg) {
+ totalCounter.increment();
+ deviceActivitiesCounter.increment();
+ }
+
public void log(TransportProtos.SubscriptionMgrMsgProto msg) {
totalCounter.increment();
subscriptionMsgCounter.increment();
diff --git a/application/src/main/java/org/thingsboard/server/service/queue/processing/AbstractConsumerService.java b/application/src/main/java/org/thingsboard/server/service/queue/processing/AbstractConsumerService.java
index 0c130c9f62..98fb95ad15 100644
--- a/application/src/main/java/org/thingsboard/server/service/queue/processing/AbstractConsumerService.java
+++ b/application/src/main/java/org/thingsboard/server/service/queue/processing/AbstractConsumerService.java
@@ -180,6 +180,8 @@ public abstract class AbstractConsumerService> exportServices = new HashMap<>();
private final Map> importServices = new HashMap<>();
- private final EntityActionService entityActionService;
private final RelationService relationService;
private final RateLimitService rateLimitService;
+ private final TbNotificationEntityService entityNotificationService;
protected static final List SUPPORTED_ENTITY_TYPES = List.of(
EntityType.CUSTOMER, EntityType.ASSET, EntityType.RULE_CHAIN,
@@ -109,10 +110,8 @@ public class DefaultEntitiesExportImportService implements EntitiesExportImportS
relationService.saveRelations(ctx.getTenantId(), new ArrayList<>(ctx.getRelations()));
for (EntityRelation relation : ctx.getRelations()) {
- entityActionService.logEntityAction(ctx.getUser(), relation.getFrom(), null, null,
- ActionType.RELATION_ADD_OR_UPDATE, null, relation);
- entityActionService.logEntityAction(ctx.getUser(), relation.getTo(), null, null,
- ActionType.RELATION_ADD_OR_UPDATE, null, relation);
+ entityNotificationService.notifyCreateOrUpdateOrDeleteRelation(ctx.getTenantId(), null,
+ relation, ctx.getUser(), ActionType.RELATION_ADD_OR_UPDATE, null, relation);
}
}
diff --git a/application/src/main/java/org/thingsboard/server/service/sync/ie/exporting/impl/BaseEntityExportService.java b/application/src/main/java/org/thingsboard/server/service/sync/ie/exporting/impl/BaseEntityExportService.java
index 169b7f273c..01626368dc 100644
--- a/application/src/main/java/org/thingsboard/server/service/sync/ie/exporting/impl/BaseEntityExportService.java
+++ b/application/src/main/java/org/thingsboard/server/service/sync/ie/exporting/impl/BaseEntityExportService.java
@@ -41,8 +41,6 @@ public abstract class BaseEntityExportService();
}
- ;
-
public abstract Set getSupportedEntityTypes();
protected void replaceUuidsRecursively(EntitiesExportCtx> ctx, JsonNode node, Set skipFieldsSet) {
diff --git a/application/src/main/java/org/thingsboard/server/service/sync/ie/exporting/impl/DefaultEntityExportService.java b/application/src/main/java/org/thingsboard/server/service/sync/ie/exporting/impl/DefaultEntityExportService.java
index 7e236370df..a00a7c09aa 100644
--- a/application/src/main/java/org/thingsboard/server/service/sync/ie/exporting/impl/DefaultEntityExportService.java
+++ b/application/src/main/java/org/thingsboard/server/service/sync/ie/exporting/impl/DefaultEntityExportService.java
@@ -30,7 +30,7 @@ import org.thingsboard.server.common.data.relation.RelationTypeGroup;
import org.thingsboard.server.common.data.sync.ie.AttributeExportData;
import org.thingsboard.server.common.data.sync.ie.EntityExportData;
import org.thingsboard.server.dao.attributes.AttributesService;
-import org.thingsboard.server.dao.relation.RelationService;
+import org.thingsboard.server.dao.relation.RelationDao;
import org.thingsboard.server.queue.util.TbCoreComponent;
import org.thingsboard.server.service.sync.ie.exporting.EntityExportService;
import org.thingsboard.server.service.sync.ie.exporting.ExportableEntitiesService;
@@ -55,7 +55,7 @@ public class DefaultEntityExportService exportRelations(EntitiesExportCtx> ctx, E entity) throws ThingsboardException {
List relations = new ArrayList<>();
- List inboundRelations = relationService.findByTo(ctx.getTenantId(), entity.getId(), RelationTypeGroup.COMMON);
+ List inboundRelations = relationDao.findAllByTo(ctx.getTenantId(), entity.getId(), RelationTypeGroup.COMMON);
relations.addAll(inboundRelations);
- List outboundRelations = relationService.findByFrom(ctx.getTenantId(), entity.getId(), RelationTypeGroup.COMMON);
+ List outboundRelations = relationDao.findAllByFrom(ctx.getTenantId(), entity.getId(), RelationTypeGroup.COMMON);
relations.addAll(outboundRelations);
return relations;
}
diff --git a/application/src/main/java/org/thingsboard/server/service/sync/ie/importing/impl/AssetImportService.java b/application/src/main/java/org/thingsboard/server/service/sync/ie/importing/impl/AssetImportService.java
index f2e61b8f2d..3225668004 100644
--- a/application/src/main/java/org/thingsboard/server/service/sync/ie/importing/impl/AssetImportService.java
+++ b/application/src/main/java/org/thingsboard/server/service/sync/ie/importing/impl/AssetImportService.java
@@ -19,14 +19,11 @@ import lombok.RequiredArgsConstructor;
import org.springframework.stereotype.Service;
import org.thingsboard.server.common.data.EntityType;
import org.thingsboard.server.common.data.asset.Asset;
-import org.thingsboard.server.common.data.edge.EdgeEventActionType;
-import org.thingsboard.server.common.data.exception.ThingsboardException;
import org.thingsboard.server.common.data.id.AssetId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.sync.ie.EntityExportData;
import org.thingsboard.server.dao.asset.AssetService;
import org.thingsboard.server.queue.util.TbCoreComponent;
-import org.thingsboard.server.service.security.model.SecurityUser;
import org.thingsboard.server.service.sync.vc.data.EntitiesImportCtx;
@Service
@@ -52,14 +49,6 @@ public class AssetImportService extends BaseEntityImportService existingRelations = new ArrayList<>();
- existingRelations.addAll(relationService.findByTo(tenantId, entity.getId(), RelationTypeGroup.COMMON));
- existingRelations.addAll(relationService.findByFrom(tenantId, entity.getId(), RelationTypeGroup.COMMON));
+ existingRelations.addAll(relationDao.findAllByTo(tenantId, entity.getId(), RelationTypeGroup.COMMON));
+ existingRelations.addAll(relationDao.findAllByFrom(tenantId, entity.getId(), RelationTypeGroup.COMMON));
+ // dao is used here instead of service to avoid getting cached values, because relationService.deleteRelation will evict value from cache only after transaction is committed
for (EntityRelation existingRelation : existingRelations) {
EntityRelation relation = relationsMap.get(existingRelation);
if (relation == null) {
importResult.setUpdatedRelatedEntities(true);
- relationService.deleteRelation(tenantId, existingRelation);
+ relationService.deleteRelation(ctx.getTenantId(), existingRelation.getFrom(), existingRelation.getTo(), existingRelation.getType(), existingRelation.getTypeGroup());
importResult.addSendEventsCallback(() -> {
- entityActionService.logEntityAction(ctx.getUser(), existingRelation.getFrom(), null, null,
- ActionType.RELATION_DELETED, null, existingRelation);
- entityActionService.logEntityAction(ctx.getUser(), existingRelation.getTo(), null, null,
- ActionType.RELATION_DELETED, null, existingRelation);
+ entityNotificationService.notifyCreateOrUpdateOrDeleteRelation(tenantId, null,
+ existingRelation, ctx.getUser(), ActionType.RELATION_DELETED, null, existingRelation);
});
} else if (Objects.equal(relation.getAdditionalInfo(), existingRelation.getAdditionalInfo())) {
relationsMap.remove(relation);
@@ -269,9 +268,8 @@ public abstract class BaseEntityImportService exportData, Dashboard prepared, Dashboard existing) {
+ return super.compare(ctx, exportData, prepared, existing) || !prepared.getConfiguration().equals(existing.getConfiguration());
}
@Override
diff --git a/application/src/main/java/org/thingsboard/server/service/sync/ie/importing/impl/DeviceImportService.java b/application/src/main/java/org/thingsboard/server/service/sync/ie/importing/impl/DeviceImportService.java
index c699f466d7..853657d1c2 100644
--- a/application/src/main/java/org/thingsboard/server/service/sync/ie/importing/impl/DeviceImportService.java
+++ b/application/src/main/java/org/thingsboard/server/service/sync/ie/importing/impl/DeviceImportService.java
@@ -19,6 +19,7 @@ import lombok.RequiredArgsConstructor;
import org.springframework.stereotype.Service;
import org.thingsboard.server.common.data.Device;
import org.thingsboard.server.common.data.EntityType;
+import org.thingsboard.server.common.data.audit.ActionType;
import org.thingsboard.server.common.data.exception.ThingsboardException;
import org.thingsboard.server.common.data.id.DeviceId;
import org.thingsboard.server.common.data.id.TenantId;
@@ -93,8 +94,8 @@ public class DeviceImportService extends BaseEntityImportService T getStatus(SecurityUser user, UUID requestId, Function getter) throws ThingsboardException {
var cacheEntry = taskCache.get(requestId);
if (cacheEntry == null || cacheEntry.get() == null) {
+ log.debug("[{}] No cache record: {}", requestId, cacheEntry);
throw new ThingsboardException(ThingsboardErrorCode.ITEM_NOT_FOUND);
} else {
var entry = cacheEntry.get();
+ log.debug("[{}] Cache get: {}", requestId, entry);
var result = getter.apply(entry);
if (result == null) {
throw new ThingsboardException(ThingsboardErrorCode.BAD_REQUEST_PARAMS);
@@ -358,10 +359,12 @@ public class DefaultEntitiesVersionControlService implements EntitiesVersionCont
return EntityImportSettings.builder()
.updateRelations(config.isLoadRelations())
.saveAttributes(config.isLoadAttributes())
+ .saveCredentials(config.isLoadCredentials())
.findExistingByName(config.isFindExistingEntityByName())
.build();
}
+ @SneakyThrows
@SuppressWarnings({"rawtypes", "unchecked"})
private void importEntities(EntitiesImportCtx ctx, EntityType entityType) {
int limit = 100;
@@ -373,9 +376,9 @@ public class DefaultEntitiesVersionControlService implements EntitiesVersionCont
} catch (InterruptedException | ExecutionException e) {
throw new RuntimeException(e);
}
+ log.debug("[{}] Loading {} entities pack ({})", ctx.getTenantId(), entityType, entityDataList.size());
for (EntityExportData entityData : entityDataList) {
EntityExportData reimportBackup = JacksonUtil.clone(entityData);
- log.debug("[{}] Loading {} entities", ctx.getTenantId(), entityType);
EntityImportResult> importResult;
try {
importResult = exportImportService.importEntity(ctx, entityData);
@@ -391,6 +394,7 @@ public class DefaultEntitiesVersionControlService implements EntitiesVersionCont
ctx.getImportedEntities().computeIfAbsent(entityType, t -> new HashSet<>())
.add(importResult.getSavedEntity().getId());
}
+ log.debug("Imported {} pack for tenant {}", entityType, ctx.getTenantId());
offset += limit;
} while (entityDataList.size() == limit);
}
@@ -456,18 +460,20 @@ public class DefaultEntitiesVersionControlService implements EntitiesVersionCont
EntityId externalId = ((ExportableEntity) entity).getExternalId();
if (externalId == null) externalId = entityId;
- return transformAsync(gitServiceQueue.getEntity(user.getTenantId(), versionId, externalId),
+ return transform(gitServiceQueue.getEntity(user.getTenantId(), versionId, externalId),
otherVersion -> {
SimpleEntitiesExportCtx ctx = new SimpleEntitiesExportCtx(user, null, null, EntityExportSettings.builder()
.exportRelations(otherVersion.hasRelations())
.exportAttributes(otherVersion.hasAttributes())
.exportCredentials(otherVersion.hasCredentials())
.build());
- EntityExportData> currentVersion = exportImportService.exportEntity(ctx, entityId);
- return transform(gitServiceQueue.getContentsDiff(user.getTenantId(),
- JacksonUtil.toPrettyString(currentVersion.sort()),
- JacksonUtil.toPrettyString(otherVersion.sort())),
- rawDiff -> new EntityDataDiff(currentVersion, otherVersion, rawDiff), MoreExecutors.directExecutor());
+ EntityExportData> currentVersion;
+ try {
+ currentVersion = exportImportService.exportEntity(ctx, entityId);
+ } catch (ThingsboardException e) {
+ throw new RuntimeException(e);
+ }
+ return new EntityDataDiff(currentVersion.sort(), otherVersion.sort());
}, MoreExecutors.directExecutor());
}
@@ -479,7 +485,7 @@ public class DefaultEntitiesVersionControlService implements EntitiesVersionCont
@Override
- public ListenableFuture> listBranches(TenantId tenantId) throws Exception {
+ public ListenableFuture> listBranches(TenantId tenantId) throws Exception {
return gitServiceQueue.listBranches(tenantId);
}
diff --git a/application/src/main/java/org/thingsboard/server/service/sync/vc/DefaultGitVersionControlQueueService.java b/application/src/main/java/org/thingsboard/server/service/sync/vc/DefaultGitVersionControlQueueService.java
index 99246d282f..6eaa2df74f 100644
--- a/application/src/main/java/org/thingsboard/server/service/sync/vc/DefaultGitVersionControlQueueService.java
+++ b/application/src/main/java/org/thingsboard/server/service/sync/vc/DefaultGitVersionControlQueueService.java
@@ -15,7 +15,10 @@
*/
package org.thingsboard.server.service.sync.vc;
+import com.google.common.collect.Iterables;
+import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.SettableFuture;
import com.google.protobuf.ByteString;
import lombok.SneakyThrows;
@@ -23,6 +26,7 @@ import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Lazy;
import org.springframework.stereotype.Service;
+import org.thingsboard.common.util.CollectionsUtil;
import org.thingsboard.common.util.JacksonUtil;
import org.thingsboard.server.cluster.TbClusterService;
import org.thingsboard.server.common.data.EntityType;
@@ -35,6 +39,7 @@ import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.page.PageData;
import org.thingsboard.server.common.data.page.PageLink;
import org.thingsboard.server.common.data.sync.ie.EntityExportData;
+import org.thingsboard.server.common.data.sync.vc.BranchInfo;
import org.thingsboard.server.common.data.sync.vc.EntityVersion;
import org.thingsboard.server.common.data.sync.vc.EntityVersionsDiff;
import org.thingsboard.server.common.data.sync.vc.RepositorySettings;
@@ -70,12 +75,16 @@ import org.thingsboard.server.service.sync.vc.data.VersionsDiffGitRequest;
import org.thingsboard.server.service.sync.vc.data.VoidGitRequest;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;
@@ -92,9 +101,12 @@ public class DefaultGitVersionControlQueueService implements GitVersionControlQu
private final SchedulerComponent scheduler;
private final Map> pendingRequestMap = new HashMap<>();
+ private final Map> chunkedMsgs = new ConcurrentHashMap<>();
@Value("${queue.vc.request-timeout:60000}")
private int requestTimeout;
+ @Value("${queue.vc.msg-chunk-size:500000}")
+ private int msgChunkSize;
public DefaultGitVersionControlQueueService(TbServiceInfoProvider serviceInfoProvider, TbClusterService clusterService,
DataDecodingEncodingService encodingService,
@@ -118,20 +130,35 @@ public class DefaultGitVersionControlQueueService implements GitVersionControlQu
return future;
}
+ @SuppressWarnings("UnstableApiUsage")
@Override
public ListenableFuture addToCommit(CommitGitRequest commit, EntityExportData> entityData) {
- SettableFuture future = SettableFuture.create();
-
String path = getRelativePath(entityData.getEntityType(), entityData.getExternalId());
String entityDataJson = JacksonUtil.toPrettyString(entityData.sort());
- registerAndSend(commit, builder -> builder.setCommitRequest(
- buildCommitRequest(commit).setAddMsg(
- TransportProtos.AddMsg.newBuilder()
- .setRelativePath(path).setEntityDataJson(entityDataJson).build()
- ).build()
- ).build(), wrap(future, null));
- return future;
+ Iterable entityDataChunks = StringUtils.split(entityDataJson, msgChunkSize);
+ String chunkedMsgId = UUID.randomUUID().toString();
+ int chunksCount = Iterables.size(entityDataChunks);
+
+ AtomicInteger chunkIndex = new AtomicInteger();
+ List> futures = new ArrayList<>();
+ entityDataChunks.forEach(chunk -> {
+ SettableFuture chunkFuture = SettableFuture.create();
+ log.trace("[{}] sending chunk {} for 'addToCommit'", chunkedMsgId, chunkIndex.get());
+ registerAndSend(commit, builder -> builder.setCommitRequest(
+ buildCommitRequest(commit).setAddMsg(
+ TransportProtos.AddMsg.newBuilder()
+ .setRelativePath(path).setEntityDataJsonChunk(chunk)
+ .setChunkedMsgId(chunkedMsgId).setChunkIndex(chunkIndex.getAndIncrement())
+ .setChunksCount(chunksCount).build()
+ ).build()
+ ).build(), wrap(chunkFuture, null));
+ futures.add(chunkFuture);
+ });
+ return Futures.transform(Futures.allAsList(futures), r -> {
+ log.trace("[{}] sent all chunks for 'addToCommit'", chunkedMsgId);
+ return null;
+ }, MoreExecutors.directExecutor());
}
@Override
@@ -220,7 +247,6 @@ public class DefaultGitVersionControlQueueService implements GitVersionControlQu
@Override
public ListenableFuture> listEntitiesAtVersion(TenantId tenantId, String branch, String versionId, EntityType entityType) {
return listEntitiesAtVersion(tenantId, ListEntitiesRequestMsg.newBuilder()
- .setBranchName(branch)
.setVersionId(versionId)
.setEntityType(entityType.name())
.build());
@@ -229,7 +255,6 @@ public class DefaultGitVersionControlQueueService implements GitVersionControlQu
@Override
public ListenableFuture> listEntitiesAtVersion(TenantId tenantId, String branch, String versionId) {
return listEntitiesAtVersion(tenantId, ListEntitiesRequestMsg.newBuilder()
- .setBranchName(branch)
.setVersionId(versionId)
.build());
}
@@ -240,7 +265,7 @@ public class DefaultGitVersionControlQueueService implements GitVersionControlQu
}
@Override
- public ListenableFuture> listBranches(TenantId tenantId) {
+ public ListenableFuture> listBranches(TenantId tenantId) {
ListBranchesGitRequest request = new ListBranchesGitRequest(tenantId);
return sendRequest(request, builder -> builder.setListBranchesRequest(TransportProtos.ListBranchesRequestMsg.newBuilder().build()));
}
@@ -256,18 +281,11 @@ public class DefaultGitVersionControlQueueService implements GitVersionControlQu
.build()));
}
- @Override
- public ListenableFuture getContentsDiff(TenantId tenantId, String content1, String content2) {
- ContentsDiffGitRequest request = new ContentsDiffGitRequest(tenantId, content1, content2);
- return sendRequest(request, builder -> builder.setContentsDiffRequest(TransportProtos.ContentsDiffRequestMsg.newBuilder()
- .setContent1(content1)
- .setContent2(content2)));
- }
-
@Override
@SuppressWarnings("rawtypes")
public ListenableFuture getEntity(TenantId tenantId, String versionId, EntityId entityId) {
EntityContentGitRequest request = new EntityContentGitRequest(tenantId, versionId, entityId);
+ chunkedMsgs.put(request.getRequestId(), new HashMap<>());
registerAndSend(request, builder -> builder.setEntityContentRequest(EntityContentRequestMsg.newBuilder()
.setVersionId(versionId)
.setEntityType(entityId.getEntityType().name())
@@ -289,9 +307,9 @@ public class DefaultGitVersionControlQueueService implements GitVersionControlQu
var requestBody = enrichFunction.apply(newRequestProto(request, settings));
log.trace("[{}][{}] PUSHING request: {}", request.getTenantId(), request.getRequestId(), requestBody);
clusterService.pushMsgToVersionControl(request.getTenantId(), requestBody, callback);
- request.setTimeoutTask(scheduler.schedule(() -> {
- processTimeout(request.getRequestId());
- }, requestTimeout, TimeUnit.MILLISECONDS));
+ if (request.getTimeoutTask() == null) {
+ request.setTimeoutTask(scheduler.schedule(() -> processTimeout(request.getRequestId()), requestTimeout, TimeUnit.MILLISECONDS));
+ }
} else {
throw new RuntimeException("Future is already done!");
}
@@ -309,7 +327,7 @@ public class DefaultGitVersionControlQueueService implements GitVersionControlQu
@SuppressWarnings("rawtypes")
public ListenableFuture> getEntities(TenantId tenantId, String versionId, EntityType entityType, int offset, int limit) {
EntitiesContentGitRequest request = new EntitiesContentGitRequest(tenantId, versionId, entityType);
-
+ chunkedMsgs.put(request.getRequestId(), new HashMap<>());
registerAndSend(request, builder -> builder.setEntitiesContentRequest(EntitiesContentRequestMsg.newBuilder()
.setVersionId(versionId)
.setEntityType(entityType.name())
@@ -355,15 +373,15 @@ public class DefaultGitVersionControlQueueService implements GitVersionControlQu
@Override
public void processResponse(VersionControlResponseMsg vcResponseMsg) {
UUID requestId = new UUID(vcResponseMsg.getRequestIdMSB(), vcResponseMsg.getRequestIdLSB());
- PendingGitRequest> request = pendingRequestMap.remove(requestId);
+ PendingGitRequest> request = pendingRequestMap.get(requestId);
if (request == null) {
log.debug("[{}] received stale response: {}", requestId, vcResponseMsg);
return;
} else {
log.debug("[{}] processing response: {}", requestId, vcResponseMsg);
- request.getTimeoutTask().cancel(true);
}
var future = request.getFuture();
+ boolean completed = true;
if (!StringUtils.isEmpty(vcResponseMsg.getError())) {
future.setException(new RuntimeException(vcResponseMsg.getError()));
} else {
@@ -382,7 +400,7 @@ public class DefaultGitVersionControlQueueService implements GitVersionControlQu
((CommitGitRequest) request).getFuture().set(commitResult);
} else if (vcResponseMsg.hasListBranchesResponse()) {
var listBranchesResponse = vcResponseMsg.getListBranchesResponse();
- ((ListBranchesGitRequest) request).getFuture().set(listBranchesResponse.getBranchesList());
+ ((ListBranchesGitRequest) request).getFuture().set(listBranchesResponse.getBranchesList().stream().map(this::getBranchInfo).collect(Collectors.toList()));
} else if (vcResponseMsg.hasListEntitiesResponse()) {
var listEntitiesResponse = vcResponseMsg.getListEntitiesResponse();
((ListEntitiesGitRequest) request).getFuture().set(
@@ -391,12 +409,28 @@ public class DefaultGitVersionControlQueueService implements GitVersionControlQu
var listVersionsResponse = vcResponseMsg.getListVersionsResponse();
((ListVersionsGitRequest) request).getFuture().set(toPageData(listVersionsResponse));
} else if (vcResponseMsg.hasEntityContentResponse()) {
- var data = vcResponseMsg.getEntityContentResponse().getData();
- ((EntityContentGitRequest) request).getFuture().set(toData(data));
+ TransportProtos.EntityContentResponseMsg responseMsg = vcResponseMsg.getEntityContentResponse();
+ log.trace("[{}] received chunk {} for 'getEntity'", responseMsg.getChunkedMsgId(), responseMsg.getChunkIndex());
+ var joined = joinChunks(requestId, responseMsg, 1);
+ if (joined.isPresent()) {
+ log.trace("[{}] collected all chunks for 'getEntity'", responseMsg.getChunkedMsgId());
+ ((EntityContentGitRequest) request).getFuture().set(joined.get().get(0));
+ } else {
+ completed = false;
+ }
} else if (vcResponseMsg.hasEntitiesContentResponse()) {
- var dataList = vcResponseMsg.getEntitiesContentResponse().getDataList();
- ((EntitiesContentGitRequest) request).getFuture()
- .set(dataList.stream().map(this::toData).collect(Collectors.toList()));
+ TransportProtos.EntitiesContentResponseMsg responseMsg = vcResponseMsg.getEntitiesContentResponse();
+ TransportProtos.EntityContentResponseMsg item = responseMsg.getItem();
+ if (responseMsg.getItemsCount() > 0) {
+ var joined = joinChunks(requestId, item, responseMsg.getItemsCount());
+ if (joined.isPresent()) {
+ ((EntitiesContentGitRequest) request).getFuture().set(joined.get());
+ } else {
+ completed = false;
+ }
+ } else {
+ ((EntitiesContentGitRequest) request).getFuture().set(Collections.emptyList());
+ }
} else if (vcResponseMsg.hasVersionsDiffResponse()) {
TransportProtos.VersionsDiffResponseMsg diffResponse = vcResponseMsg.getVersionsDiffResponse();
List entityVersionsDiffList = diffResponse.getDiffList().stream()
@@ -411,21 +445,50 @@ public class DefaultGitVersionControlQueueService implements GitVersionControlQu
.build())
.collect(Collectors.toList());
((VersionsDiffGitRequest) request).getFuture().set(entityVersionsDiffList);
- } else if (vcResponseMsg.hasContentsDiffResponse()) {
- String diff = vcResponseMsg.getContentsDiffResponse().getDiff();
- ((ContentsDiffGitRequest) request).getFuture().set(diff);
}
}
+ if (completed) {
+ removePendingRequest(requestId);
+ }
+ }
+
+ @SuppressWarnings("rawtypes")
+ private Optional> joinChunks(UUID requestId, TransportProtos.EntityContentResponseMsg responseMsg, int expectedMsgCount) {
+ var chunksMap = chunkedMsgs.get(requestId);
+ if (chunksMap == null) {
+ return Optional.empty();
+ }
+ String[] msgChunks = chunksMap.computeIfAbsent(responseMsg.getChunkedMsgId(), id -> new String[responseMsg.getChunksCount()]);
+ msgChunks[responseMsg.getChunkIndex()] = responseMsg.getData();
+ if (chunksMap.size() == expectedMsgCount && chunksMap.values().stream()
+ .allMatch(chunks -> CollectionsUtil.countNonNull(chunks) == chunks.length)) {
+ return Optional.of(chunksMap.values().stream()
+ .map(chunks -> String.join("", chunks))
+ .map(this::toData)
+ .collect(Collectors.toList()));
+ } else {
+ return Optional.empty();
+ }
}
private void processTimeout(UUID requestId) {
- PendingGitRequest> pendingRequest = pendingRequestMap.remove(requestId);
+ PendingGitRequest> pendingRequest = removePendingRequest(requestId);
if (pendingRequest != null) {
log.debug("[{}] request timed out ({} ms}", requestId, requestTimeout);
pendingRequest.getFuture().setException(new TimeoutException("Request timed out"));
}
}
+ private PendingGitRequest> removePendingRequest(UUID requestId) {
+ PendingGitRequest> pendingRequest = pendingRequestMap.remove(requestId);
+ if (pendingRequest != null && pendingRequest.getTimeoutTask() != null) {
+ pendingRequest.getTimeoutTask().cancel(true);
+ pendingRequest.setTimeoutTask(null);
+ }
+ chunkedMsgs.remove(requestId);
+ return pendingRequest;
+ }
+
private PageData toPageData(TransportProtos.ListVersionsResponseMsg listVersionsResponse) {
var listVersions = listVersionsResponse.getVersionsList().stream().map(this::getEntityVersion).collect(Collectors.toList());
return new PageData<>(listVersions, listVersionsResponse.getTotalPages(), listVersionsResponse.getTotalElements(), listVersionsResponse.getHasNext());
@@ -439,12 +502,17 @@ public class DefaultGitVersionControlQueueService implements GitVersionControlQu
return new VersionedEntityInfo(EntityIdFactory.getByTypeAndUuid(proto.getEntityType(), new UUID(proto.getEntityIdMSB(), proto.getEntityIdLSB())));
}
+ private BranchInfo getBranchInfo(TransportProtos.BranchInfoProto proto) {
+ return new BranchInfo(proto.getName(), proto.getIsDefault());
+ }
+
@SuppressWarnings("rawtypes")
@SneakyThrows
private EntityExportData toData(String data) {
return JacksonUtil.fromString(data, EntityExportData.class);
}
+ //The future will be completed when the corresponding result arrives from kafka
private static TbQueueCallback wrap(SettableFuture future) {
return new TbQueueCallback() {
@Override
@@ -458,7 +526,8 @@ public class DefaultGitVersionControlQueueService implements GitVersionControlQu
};
}
- private static TbQueueCallback wrap(SettableFuture future, T value) {
+ //The future will be completed when the request is successfully sent to kafka
+ private TbQueueCallback wrap(SettableFuture future, T value) {
return new TbQueueCallback() {
@Override
public void onSuccess(TbQueueMsgMetadata metadata) {
diff --git a/application/src/main/java/org/thingsboard/server/service/sync/vc/EntitiesVersionControlService.java b/application/src/main/java/org/thingsboard/server/service/sync/vc/EntitiesVersionControlService.java
index 3ae68e33d0..5aa4c40008 100644
--- a/application/src/main/java/org/thingsboard/server/service/sync/vc/EntitiesVersionControlService.java
+++ b/application/src/main/java/org/thingsboard/server/service/sync/vc/EntitiesVersionControlService.java
@@ -16,22 +16,21 @@
package org.thingsboard.server.service.sync.vc;
import com.google.common.util.concurrent.ListenableFuture;
-import org.springframework.web.context.request.async.DeferredResult;
import org.thingsboard.server.common.data.EntityType;
import org.thingsboard.server.common.data.exception.ThingsboardException;
import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.page.PageData;
import org.thingsboard.server.common.data.page.PageLink;
+import org.thingsboard.server.common.data.sync.vc.BranchInfo;
import org.thingsboard.server.common.data.sync.vc.EntityDataDiff;
import org.thingsboard.server.common.data.sync.vc.EntityDataInfo;
-import org.thingsboard.server.common.data.sync.vc.VersionLoadResult;
-import org.thingsboard.server.service.security.model.SecurityUser;
-import org.thingsboard.server.common.data.sync.vc.RepositorySettings;
import org.thingsboard.server.common.data.sync.vc.EntityVersion;
+import org.thingsboard.server.common.data.sync.vc.RepositorySettings;
import org.thingsboard.server.common.data.sync.vc.VersionCreationResult;
-import org.thingsboard.server.common.data.sync.vc.EntityTypeLoadResult;
+import org.thingsboard.server.common.data.sync.vc.VersionLoadResult;
import org.thingsboard.server.common.data.sync.vc.VersionedEntityInfo;
+import org.thingsboard.server.service.security.model.SecurityUser;
import org.thingsboard.server.common.data.sync.vc.request.load.VersionLoadRequest;
import org.thingsboard.server.common.data.sync.vc.request.create.VersionCreateRequest;
@@ -60,7 +59,7 @@ public interface EntitiesVersionControlService {
ListenableFuture compareEntityDataToVersion(SecurityUser user, String branch, EntityId entityId, String versionId) throws Exception;
- ListenableFuture> listBranches(TenantId tenantId) throws Exception;
+ ListenableFuture> listBranches(TenantId tenantId) throws Exception;
RepositorySettings getVersionControlSettings(TenantId tenantId);
diff --git a/application/src/main/java/org/thingsboard/server/service/sync/vc/GitVersionControlQueueService.java b/application/src/main/java/org/thingsboard/server/service/sync/vc/GitVersionControlQueueService.java
index a1aad04701..cc83479896 100644
--- a/application/src/main/java/org/thingsboard/server/service/sync/vc/GitVersionControlQueueService.java
+++ b/application/src/main/java/org/thingsboard/server/service/sync/vc/GitVersionControlQueueService.java
@@ -24,14 +24,15 @@ import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.page.PageData;
import org.thingsboard.server.common.data.page.PageLink;
import org.thingsboard.server.common.data.sync.ie.EntityExportData;
-import org.thingsboard.server.common.data.sync.vc.RepositorySettings;
+import org.thingsboard.server.common.data.sync.vc.BranchInfo;
import org.thingsboard.server.common.data.sync.vc.EntityVersion;
+import org.thingsboard.server.common.data.sync.vc.EntityVersionsDiff;
+import org.thingsboard.server.common.data.sync.vc.RepositorySettings;
import org.thingsboard.server.common.data.sync.vc.VersionCreationResult;
import org.thingsboard.server.common.data.sync.vc.VersionedEntityInfo;
import org.thingsboard.server.common.data.sync.vc.request.create.VersionCreateRequest;
import org.thingsboard.server.gen.transport.TransportProtos.VersionControlResponseMsg;
import org.thingsboard.server.service.sync.vc.data.CommitGitRequest;
-import org.thingsboard.server.common.data.sync.vc.EntityVersionsDiff;
import java.util.List;
@@ -55,7 +56,7 @@ public interface GitVersionControlQueueService {
ListenableFuture> listEntitiesAtVersion(TenantId tenantId, String branch, String versionId);
- ListenableFuture> listBranches(TenantId tenantId);
+ ListenableFuture> listBranches(TenantId tenantId);
ListenableFuture getEntity(TenantId tenantId, String versionId, EntityId entityId);
@@ -63,8 +64,6 @@ public interface GitVersionControlQueueService {
ListenableFuture> getVersionsDiff(TenantId tenantId, EntityType entityType, EntityId externalId, String versionId1, String versionId2);
- ListenableFuture getContentsDiff(TenantId tenantId, String rawEntityData1, String rawEntityData2);
-
ListenableFuture initRepository(TenantId tenantId, RepositorySettings settings);
ListenableFuture testRepository(TenantId tenantId, RepositorySettings settings);
diff --git a/application/src/main/java/org/thingsboard/server/service/sync/vc/data/ListBranchesGitRequest.java b/application/src/main/java/org/thingsboard/server/service/sync/vc/data/ListBranchesGitRequest.java
index c045030dc7..4d89efa14d 100644
--- a/application/src/main/java/org/thingsboard/server/service/sync/vc/data/ListBranchesGitRequest.java
+++ b/application/src/main/java/org/thingsboard/server/service/sync/vc/data/ListBranchesGitRequest.java
@@ -16,10 +16,11 @@
package org.thingsboard.server.service.sync.vc.data;
import org.thingsboard.server.common.data.id.TenantId;
+import org.thingsboard.server.common.data.sync.vc.BranchInfo;
import java.util.List;
-public class ListBranchesGitRequest extends PendingGitRequest> {
+public class ListBranchesGitRequest extends PendingGitRequest> {
public ListBranchesGitRequest(TenantId tenantId) {
super(tenantId);
diff --git a/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionService.java b/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionService.java
index 38275c09b6..9ee4543b68 100644
--- a/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionService.java
+++ b/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionService.java
@@ -21,6 +21,7 @@ import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import lombok.extern.slf4j.Slf4j;
import org.jetbrains.annotations.NotNull;
+import org.springframework.context.annotation.Lazy;
import org.springframework.stereotype.Service;
import org.thingsboard.common.util.ThingsBoardThreadFactory;
import org.thingsboard.server.cluster.TbClusterService;
@@ -43,12 +44,12 @@ import org.thingsboard.server.common.msg.queue.ServiceType;
import org.thingsboard.server.common.msg.queue.TbCallback;
import org.thingsboard.server.common.msg.queue.TopicPartitionInfo;
import org.thingsboard.server.dao.attributes.AttributesService;
-import org.thingsboard.server.dao.entityview.EntityViewService;
import org.thingsboard.server.dao.timeseries.TimeseriesService;
import org.thingsboard.server.gen.transport.TransportProtos;
import org.thingsboard.server.queue.discovery.PartitionService;
import org.thingsboard.server.queue.usagestats.TbApiUsageClient;
import org.thingsboard.server.service.apiusage.TbApiUsageStateService;
+import org.thingsboard.server.service.entitiy.entityView.TbEntityViewService;
import org.thingsboard.server.service.subscription.TbSubscriptionUtils;
import javax.annotation.Nullable;
@@ -75,7 +76,7 @@ public class DefaultTelemetrySubscriptionService extends AbstractSubscriptionSer
private final AttributesService attrService;
private final TimeseriesService tsService;
- private final EntityViewService entityViewService;
+ private final TbEntityViewService tbEntityViewService;
private final TbApiUsageClient apiUsageClient;
private final TbApiUsageStateService apiUsageStateService;
@@ -83,7 +84,7 @@ public class DefaultTelemetrySubscriptionService extends AbstractSubscriptionSer
public DefaultTelemetrySubscriptionService(AttributesService attrService,
TimeseriesService tsService,
- EntityViewService entityViewService,
+ @Lazy TbEntityViewService tbEntityViewService,
TbClusterService clusterService,
PartitionService partitionService,
TbApiUsageClient apiUsageClient,
@@ -91,7 +92,7 @@ public class DefaultTelemetrySubscriptionService extends AbstractSubscriptionSer
super(clusterService, partitionService);
this.attrService = attrService;
this.tsService = tsService;
- this.entityViewService = entityViewService;
+ this.tbEntityViewService = tbEntityViewService;
this.apiUsageClient = apiUsageClient;
this.apiUsageStateService = apiUsageStateService;
}
@@ -182,11 +183,11 @@ public class DefaultTelemetrySubscriptionService extends AbstractSubscriptionSer
addMainCallback(saveFuture, callback);
addWsCallback(saveFuture, success -> onTimeSeriesUpdate(tenantId, entityId, ts));
if (EntityType.DEVICE.equals(entityId.getEntityType()) || EntityType.ASSET.equals(entityId.getEntityType())) {
- Futures.addCallback(this.entityViewService.findEntityViewsByTenantIdAndEntityIdAsync(tenantId, entityId),
+ Futures.addCallback(this.tbEntityViewService.findEntityViewsByTenantIdAndEntityIdAsync(tenantId, entityId),
new FutureCallback>() {
@Override
public void onSuccess(@Nullable List result) {
- if (result != null) {
+ if (result != null && !result.isEmpty()) {
Map> tsMap = new HashMap<>();
for (TsKvEntry entry : ts) {
tsMap.computeIfAbsent(entry.getKey(), s -> new ArrayList<>()).add(entry);
diff --git a/application/src/main/resources/logback.xml b/application/src/main/resources/logback.xml
index 995ebb43c4..4784ad2d43 100644
--- a/application/src/main/resources/logback.xml
+++ b/application/src/main/resources/logback.xml
@@ -25,26 +25,31 @@
-
-
-
-
-
-
-
-
-
-
-
-
-
-
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
-
+
-
-
+
+
diff --git a/application/src/main/resources/thingsboard.yml b/application/src/main/resources/thingsboard.yml
index 33e575606a..f5f3c8fd3d 100644
--- a/application/src/main/resources/thingsboard.yml
+++ b/application/src/main/resources/thingsboard.yml
@@ -428,6 +428,9 @@ cache:
timeToLiveInMinutes: "${CACHE_SPECS_VERSION_CONTROL_TASK_TTL:5}"
maxSize: "${CACHE_SPECS_VERSION_CONTROL_TASK_MAX_SIZE:100000}"
+#Disable this because it is not required.
+spring.data.redis.repositories.enabled: false
+
redis:
# standalone or cluster
connection:
@@ -1027,6 +1030,7 @@ queue:
poll-interval: "${TB_QUEUE_VC_INTERVAL_MS:25}"
pack-processing-timeout: "${TB_QUEUE_VC_PACK_PROCESSING_TIMEOUT_MS:60000}"
request-timeout: "${TB_QUEUE_VC_REQUEST_TIMEOUT:60000}"
+ msg-chunk-size: "${TB_QUEUE_VC_MSG_CHUNK_SIZE:500000}"
js:
# JS Eval request topic
request_topic: "${REMOTE_JS_EVAL_REQUEST_TOPIC:js_eval.requests}"
diff --git a/application/src/test/java/org/thingsboard/server/controller/BaseEntityRelationControllerTest.java b/application/src/test/java/org/thingsboard/server/controller/BaseEntityRelationControllerTest.java
index 1a032a392b..65558e1955 100644
--- a/application/src/test/java/org/thingsboard/server/controller/BaseEntityRelationControllerTest.java
+++ b/application/src/test/java/org/thingsboard/server/controller/BaseEntityRelationControllerTest.java
@@ -45,7 +45,7 @@ import java.util.List;
import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status;
@Slf4j
-public class BaseEntityRelationControllerTest extends AbstractControllerTest {
+public abstract class BaseEntityRelationControllerTest extends AbstractControllerTest {
public static final String BASE_DEVICE_NAME = "Test dummy device";
diff --git a/application/src/test/java/org/thingsboard/server/edge/BaseEdgeTest.java b/application/src/test/java/org/thingsboard/server/edge/BaseEdgeTest.java
index 2b56af82cb..c10a9dd12d 100644
--- a/application/src/test/java/org/thingsboard/server/edge/BaseEdgeTest.java
+++ b/application/src/test/java/org/thingsboard/server/edge/BaseEdgeTest.java
@@ -1352,17 +1352,17 @@ abstract public class BaseEdgeTest extends AbstractControllerTest {
String timeseriesKey = "key";
String timeseriesValue = "25";
data.addProperty(timeseriesKey, timeseriesValue);
- UplinkMsg.Builder uplinkMsgBuilder1 = UplinkMsg.newBuilder();
+ UplinkMsg.Builder uplinkMsgBuilder = UplinkMsg.newBuilder();
EntityDataProto.Builder entityDataBuilder = EntityDataProto.newBuilder();
entityDataBuilder.setPostTelemetryMsg(JsonConverter.convertToTelemetryProto(data, System.currentTimeMillis()));
entityDataBuilder.setEntityType(device.getId().getEntityType().name());
entityDataBuilder.setEntityIdMSB(device.getUuidId().getMostSignificantBits());
entityDataBuilder.setEntityIdLSB(device.getUuidId().getLeastSignificantBits());
testAutoGeneratedCodeByProtobuf(entityDataBuilder);
- uplinkMsgBuilder1.addEntityData(entityDataBuilder.build());
+ uplinkMsgBuilder.addEntityData(entityDataBuilder.build());
- testAutoGeneratedCodeByProtobuf(uplinkMsgBuilder1);
- edgeImitator.sendUplinkMsg(uplinkMsgBuilder1.build());
+ testAutoGeneratedCodeByProtobuf(uplinkMsgBuilder);
+ edgeImitator.sendUplinkMsg(uplinkMsgBuilder.build());
JsonObject attributesData = new JsonObject();
String attributesKey = "test_attr";
@@ -1394,9 +1394,24 @@ abstract public class BaseEdgeTest extends AbstractControllerTest {
String attributeValuesUrl = "/api/plugins/telemetry/DEVICE/" + device.getId() + "/values/attributes/" + DataConstants.SERVER_SCOPE;
List