From 371cab26d2dc190ae37e0ed0ead3b6749f3ec735 Mon Sep 17 00:00:00 2001 From: Volodymyr Babak Date: Thu, 22 Jun 2023 16:37:03 +0300 Subject: [PATCH 1/9] HotFix - fixed init of rule chains - init only on APP_INIT msg --- .../thingsboard/server/actors/app/AppActor.java | 12 ++++++++---- .../DefaultTbRuleEngineConsumerService.java | 16 +++++++++++++++- .../processing/AbstractConsumerService.java | 3 ++- .../thingsboard/server/common/msg/MsgType.java | 15 +++++++++++++-- .../rule/engine/profile/TbDeviceProfileNode.java | 14 +++++++++++--- 5 files changed, 49 insertions(+), 11 deletions(-) diff --git a/application/src/main/java/org/thingsboard/server/actors/app/AppActor.java b/application/src/main/java/org/thingsboard/server/actors/app/AppActor.java index fb6fbbdff2..1461654216 100644 --- a/application/src/main/java/org/thingsboard/server/actors/app/AppActor.java +++ b/application/src/main/java/org/thingsboard/server/actors/app/AppActor.java @@ -73,10 +73,14 @@ public class AppActor extends ContextAwareActor { @Override protected boolean doProcess(TbActorMsg msg) { if (!ruleChainsInitialized) { - initTenantActors(); - ruleChainsInitialized = true; - if (msg.getMsgType() != MsgType.APP_INIT_MSG && msg.getMsgType() != MsgType.PARTITION_CHANGE_MSG) { - log.warn("Rule Chains initialized by unexpected message: {}", msg); + if (MsgType.APP_INIT_MSG.equals(msg.getMsgType())) { + initTenantActors(); + ruleChainsInitialized = true; + } else { + if (!msg.getMsgType().isIgnoreOnStart()) { + log.warn("Attempt to initialize Rule Chains by unexpected message: {}", msg); + } + return true; } } switch (msg.getMsgType()) { diff --git a/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbRuleEngineConsumerService.java b/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbRuleEngineConsumerService.java index f8f6a7d25f..51f4d5283f 100644 --- a/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbRuleEngineConsumerService.java +++ b/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbRuleEngineConsumerService.java @@ -259,7 +259,21 @@ public class DefaultTbRuleEngineConsumerService extends AbstractConsumerService< } void launchConsumer(TbQueueConsumer> consumer, Queue configuration, TbRuleEngineConsumerStats stats, String threadSuffix) { - consumersExecutor.execute(() -> consumerLoop(consumer, configuration, stats, threadSuffix)); + if (isReady) { + consumersExecutor.execute(() -> consumerLoop(consumer, configuration, stats, threadSuffix)); + } else { + scheduleLaunchConsumer(consumer, configuration, stats, threadSuffix); + } + } + + private void scheduleLaunchConsumer(TbQueueConsumer> consumer, Queue configuration, TbRuleEngineConsumerStats stats, String threadSuffix) { + repartitionExecutor.schedule(() -> { + if (isReady) { + consumersExecutor.execute(() -> consumerLoop(consumer, configuration, stats, threadSuffix)); + } else { + scheduleLaunchConsumer(consumer, configuration, stats, threadSuffix); + } + }, 10, TimeUnit.SECONDS); } void consumerLoop(TbQueueConsumer> consumer, org.thingsboard.server.common.data.queue.Queue configuration, TbRuleEngineConsumerStats stats, String threadSuffix) { diff --git a/application/src/main/java/org/thingsboard/server/service/queue/processing/AbstractConsumerService.java b/application/src/main/java/org/thingsboard/server/service/queue/processing/AbstractConsumerService.java index 2d517a2213..b59086a350 100644 --- a/application/src/main/java/org/thingsboard/server/service/queue/processing/AbstractConsumerService.java +++ b/application/src/main/java/org/thingsboard/server/service/queue/processing/AbstractConsumerService.java @@ -68,7 +68,7 @@ public abstract class AbstractConsumerService !ctx.isLocalEntity(entry.getKey())); + initAlarmRuleState(true); } @Override @@ -156,13 +161,16 @@ public class TbDeviceProfileNode implements TbNode { deviceStates.clear(); } - protected DeviceState getOrCreateDeviceState(TbContext ctx, DeviceId deviceId, RuleNodeState rns) { + protected DeviceState getOrCreateDeviceState(TbContext ctx, DeviceId deviceId, RuleNodeState rns, boolean printNewlyAddedDeviceStates) { DeviceState deviceState = deviceStates.get(deviceId); if (deviceState == null) { DeviceProfile deviceProfile = cache.get(ctx.getTenantId(), deviceId); if (deviceProfile != null) { deviceState = new DeviceState(ctx, config, deviceId, new ProfileState(deviceProfile), rns); deviceStates.put(deviceId, deviceState); + if (printNewlyAddedDeviceStates) { + log.info("[{}][{}] Device [{}] was added during PartitionChangeMsg", ctx.getTenantId(), ctx.getSelfId(), deviceId); + } } } return deviceState; From 2986700795030df04ceb9d981099474d9dac46b3 Mon Sep 17 00:00:00 2001 From: nickAS21 Date: Mon, 26 Jun 2023 17:44:18 +0300 Subject: [PATCH 2/9] swugger_device_controller: fix bug example request - AccessToken, Lwm2m_RPK --- .../controller/ControllerConstants.java | 115 ++++++++++++------ .../server/controller/DeviceController.java | 16 ++- 2 files changed, 89 insertions(+), 42 deletions(-) diff --git a/application/src/main/java/org/thingsboard/server/controller/ControllerConstants.java b/application/src/main/java/org/thingsboard/server/controller/ControllerConstants.java index 1cb794c2ec..6fe53516ed 100644 --- a/application/src/main/java/org/thingsboard/server/controller/ControllerConstants.java +++ b/application/src/main/java/org/thingsboard/server/controller/ControllerConstants.java @@ -207,45 +207,83 @@ public class ControllerConstants { protected static final String IS_BOOTSTRAP_SERVER_PARAM_DESCRIPTION = "A Boolean value representing the Server SecurityInfo for future Bootstrap client mode settings. Values: 'true' for Bootstrap Server; 'false' for Lwm2m Server. "; - protected static final String DEVICE_WITH_DEVICE_CREDENTIALS_PARAM_DESCRIPTION = + protected static final String DEVICE_WITH_DEVICE_CREDENTIALS_ACCESS_TOKEN_PARAM_DESCRIPTION = "{\n" + " \"device\": {\n" + - " \"name\": \"LwRpk00000000\",\n" + - " \"type\": \"lwm2mProfileRpk\"\n" + - " },\n" + + " \"name\":\"Name_DeviceWithCredantial_AccessToken\",\n" + + " \"label\":\"Label_DeviceWithCredantial_AccessToken\",\n" + + " \"deviceProfileId\":{\n" + + " \"id\":\"9d9588c0-06c9-11ee-b618-19be30fdeb60\",\n" + + " \"entityType\":\"DEVICE_PROFILE\"\n" + + " }\n" + + " },\n" + " \"credentials\": {\n" + - " \"id\": \"null\",\n" + - " \"createdTime\": 0,\n" + - " \"deviceId\": \"null\",\n" + - " \"credentialsType\": \"LWM2M_CREDENTIALS\",\n" + - " \"credentialsId\": \"LwRpk00000000\",\n" + - " \"credentialsValue\": {\n" + - " \"client\": {\n" + - " \"endpoint\": \"LwRpk00000000\",\n" + - " \"securityConfigClientMode\": \"RPK\",\n" + - " \"key\": \"MFkwEwYHKoZIzj0CAQYIKoZIzj0DAQcDQgAEUEBxNl/RcYJNm8mk91CyVXoIJiROYDlXcSSqK6e5bDHwOW4ZiN2lNnXalyF0Jxw8MbAytnDMERXyAja5VEMeVQ==\"\n" + - " },\n" + - " \"bootstrap\": {\n" + - " \"bootstrapServer\": {\n" + - " \"securityMode\": \"RPK\",\n" + - " \"clientPublicKeyOrId\": \"MFkwEwYHKoZIzj0CAQYIKoZIzj0DAQcDQgAEUEBxNl/RcYJNm8mk91CyVXoIJiROYDlXcSSqK6e5bDHwOW4ZiN2lNnXalyF0Jxw8MbAytnDMERXyAja5VEMeVQ==\",\n" + - " \"clientSecretKey\": \"MIGHAgEAMBMGByqGSM49AgEGCCqGSM49AwEHBG0wawIBAQQgd9GAx7yZW37autew5KZykn4IgRpge/tZSjnudnZJnMahRANCAARQQHE2X9Fxgk2byaT3ULJVeggmJE5gOVdxJKorp7lsMfA5bhmI3aU2ddqXIXQnHDwxsDK2cMwRFfICNrlUQx5V\"\n" + - " },\n" + - " \"lwm2mServer\": {\n" + - " \"securityMode\": \"RPK\",\n" + - " \"clientPublicKeyOrId\": \"MFkwEwYHKoZIzj0CAQYIKoZIzj0DAQcDQgAEUEBxNl/RcYJNm8mk91CyVXoIJiROYDlXcSSqK6e5bDHwOW4ZiN2lNnXalyF0Jxw8MbAytnDMERXyAja5VEMeVQ==\",\n" + - " \"clientSecretKey\": \"MIGHAgEAMBMGByqGSM49AgEGCCqGSM49AwEHBG0wawIBAQQgd9GAx7yZW37autew5KZykn4IgRpge/tZSjnudnZJnMahRANCAARQQHE2X9Fxgk2byaT3ULJVeggmJE5gOVdxJKorp7lsMfA5bhmI3aU2ddqXIXQnHDwxsDK2cMwRFfICNrlUQx5V\"\n" + - " }\n" + - " }\n" + - " }\n" + - " }\n" + + " \"credentialsType\": \"ACCESS_TOKEN\",\n" + + " \"credentialsId\": \"6hmxew8pmmzng4e3une2\"\n" + + " }\n" + "}"; - protected static final String DEVICE_WITH_DEVICE_CREDENTIALS_PARAM_DESCRIPTION_MARKDOWN = - MARKDOWN_CODE_BLOCK_START + DEVICE_WITH_DEVICE_CREDENTIALS_PARAM_DESCRIPTION + MARKDOWN_CODE_BLOCK_END; - + protected static final String DEVICE_WITH_DEVICE_CREDENTIALS_ACCESS_TOKEN_DEFAULT_PARAM_DESCRIPTION = + "{\n" + + " \"device\": {\n" + + " \"name\":\"Name_DeviceWithCredantial_AccessToken_Default\",\n" + + " \"label\":\"Label_DeviceWithCredantial_AccessToken_Default\",\n" + + " \"type\": \"default\"\n" + + " },\n" + + " \"credentials\": {\n" + + " \"credentialsType\": \"ACCESS_TOKEN\",\n" + + " \"credentialsId\": \"6hmxew8pmmzng4e3une3\"\n" + + " }\n" + + "}"; - protected static final String FILTER_VALUE_TYPE = NEW_LINE + "## Value Type and Operations" + NEW_LINE + + protected static final String CREDENTIALS_VALUE_LVM2M_RPK_DESCRIPTION = + " \"{" + + "\\\"client\\\":{ " + + "\\\"endpoint\\\":\\\"LwRpk00000000\\\", " + + "\\\"securityConfigClientMode\\\":\\\"RPK\\\", " + + "\\\"key\\\":\\\"MFkwEwYHKoZIzj0CAQYIKoZIzj0DAQcDQgAEUEBxNl/RcYJNm8mk91CyVXoIJiROYDlXcSSqK6e5bDHwOW4ZiN2lNnXalyF0Jxw8MbAytnDMERXyAja5VEMeVQ==\\\"" + + " }, " + + "\\\"bootstrap\\\":{ " + + "\\\"bootstrapServer\\\":{ " + + "\\\"securityMode\\\":\\\"RPK\\\", " + + "\\\"clientPublicKeyOrId\\\":\\\"MFkwEwYHKoZIzj0CAQYIKoZIzj0DAQcDQgAEUEBxNl/RcYJNm8mk91CyVXoIJiROYDlXcSSqK6e5bDHwOW4ZiN2lNnXalyF0Jxw8MbAytnDMERXyAja5VEMeVQ==\\\", " + + "\\\"clientSecretKey\\\":\\\"MIGHAgEAMBMGByqGSM49AgEGCCqGSM49AwEHBG0wawIBAQQgd9GAx7yZW37autew5KZykn4IgRpge/tZSjnudnZJnMahRANCAARQQHE2X9Fxgk2byaT3ULJVeggmJE5gOVdxJKorp7lsMfA5bhmI3aU2ddqXIXQnHDwxsDK2cMwRFfICNrlUQx5V\\\"" + + "}, " + + "\\\"lwm2mServer\\\":{ \\\"securityMode\\\":\\\"RPK\\\", " + + "\\\"clientPublicKeyOrId\\\":\\\"MFkwEwYHKoZIzj0CAQYIKoZIzj0DAQcDQgAEUEBxNl/RcYJNm8mk91CyVXoIJiROYDlXcSSqK6e5bDHwOW4ZiN2lNnXalyF0Jxw8MbAytnDMERXyAja5VEMeVQ==\\\", " + + "\\\"clientSecretKey\\\":\\\"MIGHAgEAMBMGByqGSM49AgEGCCqGSM49AwEHBG0wawIBAQQgd9GAx7yZW37autew5KZykn4IgRpge/tZSjnudnZJnMahRANCAARQQHE2X9Fxgk2byaT3ULJVeggmJE5gOVdxJKorp7lsMfA5bhmI3aU2ddqXIXQnHDwxsDK2cMwRFfICNrlUQx5V\\\"" + + "}" + + "} " + + "}\""; + + protected static final String DEVICE_WITH_DEVICE_CREDENTIALS_PARAM_LVM2M_RPK_DESCRIPTION = + "{\n" + + " \"device\": {\n" + + " \"name\":\"Name_LwRpk00000000\",\n" + + " \"label\":\"Label_LwRpk00000000\",\n" + + " \"deviceProfileId\":{\n" + + " \"id\":\"a660bd50-10ef-11ee-8737-b5634e73c779\",\n" + + " \"entityType\":\"DEVICE_PROFILE\"\n" + + " }\n" + + " },\n" + + " \"credentials\": {\n" + + " \"credentialsType\": \"LWM2M_CREDENTIALS\",\n" + + " \"credentialsId\": \"LwRpk00000000\",\n" + + " \"credentialsValue\":\n" + CREDENTIALS_VALUE_LVM2M_RPK_DESCRIPTION + "\n" + + " }\n" + + "}"; + + protected static final String DEVICE_WITH_DEVICE_CREDENTIALS_PARAM_ACCESS_TOKEN_DESCRIPTION_MARKDOWN = + MARKDOWN_CODE_BLOCK_START + DEVICE_WITH_DEVICE_CREDENTIALS_ACCESS_TOKEN_PARAM_DESCRIPTION + MARKDOWN_CODE_BLOCK_END; + + protected static final String DEVICE_WITH_DEVICE_CREDENTIALS_PARAM_ACCESS_TOKEN_DEFAULT_DESCRIPTION_MARKDOWN = + MARKDOWN_CODE_BLOCK_START + DEVICE_WITH_DEVICE_CREDENTIALS_ACCESS_TOKEN_DEFAULT_PARAM_DESCRIPTION + MARKDOWN_CODE_BLOCK_END; + + protected static final String DEVICE_WITH_DEVICE_CREDENTIALS_PARAM_LVM2M_RPK_DESCRIPTION_MARKDOWN = + MARKDOWN_CODE_BLOCK_START + DEVICE_WITH_DEVICE_CREDENTIALS_PARAM_LVM2M_RPK_DESCRIPTION + MARKDOWN_CODE_BLOCK_END; + + + protected static final String FILTER_VALUE_TYPE = NEW_LINE + "## Value Type and Operations" + NEW_LINE + "Provides a hint about the data type of the entity field that is defined in the filter key. " + "The value type impacts the list of possible operations that you may use in the corresponding predicate. For example, you may use 'STARTS_WITH' or 'END_WITH', but you can't use 'GREATER_OR_EQUAL' for string values." + "The following filter value types and corresponding predicate operations are supported: " + NEW_LINE + @@ -254,7 +292,7 @@ public class ControllerConstants { " * 'BOOLEAN' - used for boolean values. Operations: EQUAL, NOT_EQUAL;\n" + " * 'DATE_TIME' - similar to numeric, transforms value to milliseconds since epoch. Operations: EQUAL, NOT_EQUAL, GREATER, LESS, GREATER_OR_EQUAL, LESS_OR_EQUAL; \n"; - protected static final String DEVICE_PROFILE_ALARM_SCHEDULE_SPECIFIC_TIME_EXAMPLE = MARKDOWN_CODE_BLOCK_START + + protected static final String DEVICE_PROFILE_ALARM_SCHEDULE_SPECIFIC_TIME_EXAMPLE = MARKDOWN_CODE_BLOCK_START + "{\n" + " \"schedule\":{\n" + " \"type\":\"SPECIFIC_TIME\",\n" + @@ -269,7 +307,7 @@ public class ControllerConstants { " }\n" + "}" + MARKDOWN_CODE_BLOCK_END; - protected static final String DEVICE_PROFILE_ALARM_SCHEDULE_CUSTOM_EXAMPLE = MARKDOWN_CODE_BLOCK_START + + protected static final String DEVICE_PROFILE_ALARM_SCHEDULE_CUSTOM_EXAMPLE = MARKDOWN_CODE_BLOCK_START + "{\n" + " \"schedule\":{\n" + " \"type\":\"CUSTOM\",\n" + @@ -321,9 +359,9 @@ public class ControllerConstants { " }\n" + "}" + MARKDOWN_CODE_BLOCK_END; - protected static final String DEVICE_PROFILE_ALARM_SCHEDULE_ALWAYS_EXAMPLE = MARKDOWN_CODE_BLOCK_START + "\"schedule\": null" + MARKDOWN_CODE_BLOCK_END; + protected static final String DEVICE_PROFILE_ALARM_SCHEDULE_ALWAYS_EXAMPLE = MARKDOWN_CODE_BLOCK_START + "\"schedule\": null" + MARKDOWN_CODE_BLOCK_END; - protected static final String DEVICE_PROFILE_ALARM_CONDITION_REPEATING_EXAMPLE = MARKDOWN_CODE_BLOCK_START + + protected static final String DEVICE_PROFILE_ALARM_CONDITION_REPEATING_EXAMPLE = MARKDOWN_CODE_BLOCK_START + "{\n" + " \"spec\":{\n" + " \"type\":\"REPEATING\",\n" + @@ -339,7 +377,8 @@ public class ControllerConstants { " }\n" + "}" + MARKDOWN_CODE_BLOCK_END; - protected static final String DEVICE_PROFILE_ALARM_CONDITION_DURATION_EXAMPLE = MARKDOWN_CODE_BLOCK_START + + + protected static final String DEVICE_PROFILE_ALARM_CONDITION_DURATION_EXAMPLE = MARKDOWN_CODE_BLOCK_START + "{\n" + " \"spec\":{\n" + " \"type\":\"DURATION\",\n" + diff --git a/application/src/main/java/org/thingsboard/server/controller/DeviceController.java b/application/src/main/java/org/thingsboard/server/controller/DeviceController.java index bb34f6d5b2..842e47756d 100644 --- a/application/src/main/java/org/thingsboard/server/controller/DeviceController.java +++ b/application/src/main/java/org/thingsboard/server/controller/DeviceController.java @@ -93,7 +93,9 @@ import static org.thingsboard.server.controller.ControllerConstants.DEVICE_PROFI import static org.thingsboard.server.controller.ControllerConstants.DEVICE_SORT_PROPERTY_ALLOWABLE_VALUES; import static org.thingsboard.server.controller.ControllerConstants.DEVICE_TEXT_SEARCH_DESCRIPTION; import static org.thingsboard.server.controller.ControllerConstants.DEVICE_TYPE_DESCRIPTION; -import static org.thingsboard.server.controller.ControllerConstants.DEVICE_WITH_DEVICE_CREDENTIALS_PARAM_DESCRIPTION_MARKDOWN; +import static org.thingsboard.server.controller.ControllerConstants.DEVICE_WITH_DEVICE_CREDENTIALS_PARAM_ACCESS_TOKEN_DEFAULT_DESCRIPTION_MARKDOWN; +import static org.thingsboard.server.controller.ControllerConstants.DEVICE_WITH_DEVICE_CREDENTIALS_PARAM_ACCESS_TOKEN_DESCRIPTION_MARKDOWN; +import static org.thingsboard.server.controller.ControllerConstants.DEVICE_WITH_DEVICE_CREDENTIALS_PARAM_LVM2M_RPK_DESCRIPTION_MARKDOWN; import static org.thingsboard.server.controller.ControllerConstants.EDGE_ASSIGN_ASYNC_FIRST_STEP_DESCRIPTION; import static org.thingsboard.server.controller.ControllerConstants.EDGE_ASSIGN_RECEIVE_STEP_DESCRIPTION; import static org.thingsboard.server.controller.ControllerConstants.EDGE_ID_PARAM_DESCRIPTION; @@ -182,9 +184,15 @@ public class DeviceController extends BaseController { @ApiOperation(value = "Create Device (saveDevice) with credentials ", notes = "Create or update the Device. When creating device, platform generates Device Id as " + UUID_WIKI_LINK + - "Requires to provide the Device Credentials object as well. Useful to create device and credentials in one request. " + - "You may find the example of LwM2M device and RPK credentials below: \n\n" + - DEVICE_WITH_DEVICE_CREDENTIALS_PARAM_DESCRIPTION_MARKDOWN + + "Requires to provide the Device Credentials object as well as an existing device profile ID or use \"default\".\n" + + "Note: LwM2M device - only existing device profile ID (Transport configuration -> Transport type: \"LWM2M\".\n\n" + + "You may find the example of device with different type of credentials below: \n\n" + + "- Credentials type: \"Access token\" with Device profile ID below: \n\n" + + DEVICE_WITH_DEVICE_CREDENTIALS_PARAM_ACCESS_TOKEN_DESCRIPTION_MARKDOWN + "\n\n" + + "- Credentials type: \"Access token\" with Device profile default below: \n\n" + + DEVICE_WITH_DEVICE_CREDENTIALS_PARAM_ACCESS_TOKEN_DEFAULT_DESCRIPTION_MARKDOWN + "\n\n" + + "- You may find the example of LwM2M device and RPK credentials below: \n\n" + + DEVICE_WITH_DEVICE_CREDENTIALS_PARAM_LVM2M_RPK_DESCRIPTION_MARKDOWN + "\n\n" + "Remove 'id', 'tenantId' and optionally 'customerId' from the request body example (below) to create new Device entity. " + TENANT_OR_CUSTOMER_AUTHORITY_PARAGRAPH) @PreAuthorize("hasAnyAuthority('TENANT_ADMIN', 'CUSTOMER_USER')") From 4a0ff8b968c2f2165215e4f1a18ab3f3d219eeb9 Mon Sep 17 00:00:00 2001 From: Volodymyr Babak Date: Tue, 27 Jun 2023 15:42:23 +0300 Subject: [PATCH 3/9] Edge event seq (#82) * edge event - added seq id auto generated column to fix issue with concurrent write of multiple edge events with the same created time * kotlin Pair replaced by springframework class * Handle cases when seq_id column started new cycle * Added check for null in case entity was deleted * GeneralEdgeEventFetched - sort order by seqId and not created time * Edge event table - added migration script to add seq_id column * Code review updates to be in sync with PE * Improved handling cases when edge_event.seqId started new cycle * Edge event table - seq_id column make to be cycled * Improved handling of cases when seq_id column of edge_event table started new cycle * Improved stability by properly handling exceptions --- .../main/data/upgrade/3.5.1/schema_update.sql | 63 +++++ .../controller/EdgeEventController.java | 2 +- .../service/edge/rpc/EdgeGrpcService.java | 5 +- .../service/edge/rpc/EdgeGrpcSession.java | 237 ++++++++++++------ .../rpc/constructor/AlarmMsgConstructor.java | 18 +- .../rpc/fetch/GeneralEdgeEventFetcher.java | 35 ++- .../update/DefaultDataUpdateService.java | 19 +- .../server/edge/AbstractEdgeTest.java | 9 +- .../server/edge/imitator/EdgeImitator.java | 4 - .../resources/application-test.properties | 1 + .../server/dao/edge/EdgeEventService.java | 2 +- .../server/common/data/edge/EdgeEvent.java | 1 + .../server/dao/edge/BaseEdgeEventService.java | 4 +- .../server/dao/edge/EdgeEventDao.java | 4 +- .../server/dao/model/ModelConstants.java | 1 + .../server/dao/model/sql/EdgeEventEntity.java | 5 + .../dao/sql/edge/EdgeEventRepository.java | 21 +- .../dao/sql/edge/JpaBaseEdgeEventDao.java | 43 ++-- .../main/resources/sql/schema-entities.sql | 2 + .../dao/service/EdgeEventServiceTest.java | 26 +- .../test/resources/sql/system-test-psql.sql | 5 +- 21 files changed, 336 insertions(+), 171 deletions(-) diff --git a/application/src/main/data/upgrade/3.5.1/schema_update.sql b/application/src/main/data/upgrade/3.5.1/schema_update.sql index 58031ce5c0..1655ecb978 100644 --- a/application/src/main/data/upgrade/3.5.1/schema_update.sql +++ b/application/src/main/data/upgrade/3.5.1/schema_update.sql @@ -53,6 +53,69 @@ $$; -- NOTIFICATION CONFIGS VERSION CONTROL END +-- EDGE EVENTS MIGRATION START +DO +$$ + DECLARE table_partition RECORD; + BEGIN + -- in case of running the upgrade script a second time: + IF NOT (SELECT exists(SELECT FROM pg_tables WHERE tablename = 'old_edge_event')) THEN + ALTER TABLE edge_event RENAME TO old_edge_event; + CREATE INDEX IF NOT EXISTS idx_old_edge_event_created_time_tmp ON old_edge_event(created_time); + ALTER INDEX IF EXISTS idx_edge_event_tenant_id_and_created_time RENAME TO idx_old_edge_event_tenant_id_and_created_time; + + FOR table_partition IN SELECT tablename AS name, split_part(tablename, '_', 3) AS partition_ts + FROM pg_tables WHERE tablename LIKE 'edge_event_%' + LOOP + EXECUTE format('ALTER TABLE %s RENAME TO old_edge_event_%s', table_partition.name, table_partition.partition_ts); + END LOOP; + ELSE + RAISE NOTICE 'Table old_edge_event already exists, leaving as is'; + END IF; + END; +$$; + +CREATE TABLE IF NOT EXISTS edge_event ( + seq_id INT GENERATED ALWAYS AS IDENTITY, + id uuid NOT NULL, + created_time bigint NOT NULL, + edge_id uuid, + edge_event_type varchar(255), + edge_event_uid varchar(255), + entity_id uuid, + edge_event_action varchar(255), + body varchar(10000000), + tenant_id uuid, + ts bigint NOT NULL +) PARTITION BY RANGE (created_time); +CREATE INDEX IF NOT EXISTS idx_edge_event_tenant_id_and_created_time ON edge_event(tenant_id, created_time DESC); +CREATE INDEX IF NOT EXISTS idx_edge_event_id ON edge_event(id); +ALTER TABLE IF EXISTS edge_event ALTER COLUMN seq_id SET CYCLE; + +CREATE OR REPLACE PROCEDURE migrate_edge_event(IN start_time_ms BIGINT, IN end_time_ms BIGINT, IN partition_size_ms BIGINT) + LANGUAGE plpgsql AS +$$ +DECLARE + p RECORD; + partition_end_ts BIGINT; +BEGIN + FOR p IN SELECT DISTINCT (created_time - created_time % partition_size_ms) AS partition_ts FROM old_edge_event + WHERE created_time >= start_time_ms AND created_time < end_time_ms + LOOP + partition_end_ts = p.partition_ts + partition_size_ms; + RAISE NOTICE '[edge_event] Partition to create : [%-%]', p.partition_ts, partition_end_ts; + EXECUTE format('CREATE TABLE IF NOT EXISTS edge_event_%s PARTITION OF edge_event ' || + 'FOR VALUES FROM ( %s ) TO ( %s )', p.partition_ts, p.partition_ts, partition_end_ts); + END LOOP; + + INSERT INTO edge_event (id, created_time, edge_id, edge_event_type, edge_event_uid, entity_id, edge_event_action, body, tenant_id, ts) + SELECT id, created_time, edge_id, edge_event_type, edge_event_uid, entity_id, edge_event_action, body, tenant_id, ts + FROM old_edge_event + WHERE created_time >= start_time_ms AND created_time < end_time_ms; +END; +$$; +-- EDGE EVENTS MIGRATION END + ALTER TABLE resource ADD COLUMN IF NOT EXISTS etag varchar; diff --git a/application/src/main/java/org/thingsboard/server/controller/EdgeEventController.java b/application/src/main/java/org/thingsboard/server/controller/EdgeEventController.java index fc85f439e0..386b1eab01 100644 --- a/application/src/main/java/org/thingsboard/server/controller/EdgeEventController.java +++ b/application/src/main/java/org/thingsboard/server/controller/EdgeEventController.java @@ -85,6 +85,6 @@ public class EdgeEventController extends BaseController { EdgeId edgeId = new EdgeId(toUUID(strEdgeId)); checkEdgeId(edgeId, Operation.READ); TimePageLink pageLink = createTimePageLink(pageSize, page, textSearch, sortProperty, sortOrder, startTime, endTime); - return checkNotNull(edgeEventService.findEdgeEvents(tenantId, edgeId, pageLink, false)); + return checkNotNull(edgeEventService.findEdgeEvents(tenantId, edgeId, 0L, null, pageLink)); } } diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcService.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcService.java index b7334ebf5a..e7214177d2 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcService.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcService.java @@ -341,7 +341,10 @@ public class EdgeGrpcService extends EdgeRpcServiceGrpc.EdgeRpcServiceImplBase i sessionNewEvents.put(edgeId, false); Futures.addCallback(session.processEdgeEvents(), new FutureCallback<>() { @Override - public void onSuccess(Void result) { + public void onSuccess(Boolean newEventsAdded) { + if (Boolean.TRUE.equals(newEventsAdded)) { + sessionNewEvents.put(edgeId, true); + } scheduleEdgeEventsCheck(session); } diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcSession.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcSession.java index 1dd0f31c20..4f0f5d277a 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcSession.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcSession.java @@ -24,6 +24,7 @@ import io.grpc.stub.StreamObserver; import lombok.Data; import lombok.extern.slf4j.Slf4j; import org.checkerframework.checker.nullness.qual.Nullable; +import org.springframework.data.util.Pair; import org.thingsboard.server.common.data.DataConstants; import org.thingsboard.server.common.data.EdgeUtils; import org.thingsboard.server.common.data.edge.Edge; @@ -35,6 +36,8 @@ import org.thingsboard.server.common.data.kv.BaseAttributeKvEntry; import org.thingsboard.server.common.data.kv.LongDataEntry; import org.thingsboard.server.common.data.page.PageData; import org.thingsboard.server.common.data.page.PageLink; +import org.thingsboard.server.common.data.page.SortOrder; +import org.thingsboard.server.common.data.page.TimePageLink; import org.thingsboard.server.gen.edge.v1.AlarmUpdateMsg; import org.thingsboard.server.gen.edge.v1.AttributesRequestMsg; import org.thingsboard.server.gen.edge.v1.ConnectRequestMsg; @@ -68,17 +71,15 @@ import org.thingsboard.server.service.edge.rpc.fetch.GeneralEdgeEventFetcher; import java.io.Closeable; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collections; import java.util.List; -import java.util.Objects; import java.util.Optional; import java.util.UUID; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.ReentrantLock; import java.util.function.BiConsumer; -import java.util.function.Consumer; -import java.util.stream.Collectors; @Slf4j @Data @@ -89,6 +90,7 @@ public final class EdgeGrpcSession implements Closeable { private static final int MAX_DOWNLINK_ATTEMPTS = 10; // max number of attemps to send downlink message if edge connected private static final String QUEUE_START_TS_ATTR_KEY = "queueStartTs"; + private static final String QUEUE_START_SEQ_ID_ATTR_KEY = "queueStartSeqId"; private final UUID sessionId; private final BiConsumer sessionOpenListener; @@ -103,6 +105,12 @@ public final class EdgeGrpcSession implements Closeable { private boolean connected; private boolean syncCompleted; + private Long newStartTs; + private Long previousStartTs; + private Long newStartSeqId; + private Long previousStartSeqId; + private Long seqIdEnd; + private EdgeVersion edgeVersion; private int maxInboundMessageSize; @@ -204,10 +212,10 @@ public final class EdgeGrpcSession implements Closeable { EdgeEventFetcher next = cursor.getNext(); log.info("[{}][{}] starting sync process, cursor current idx = {}, class = {}", edge.getTenantId(), edge.getId(), cursor.getCurrentIdx(), next.getClass().getSimpleName()); - ListenableFuture uuidListenableFuture = startProcessingEdgeEvents(next); - Futures.addCallback(uuidListenableFuture, new FutureCallback<>() { + ListenableFuture> future = startProcessingEdgeEvents(next); + Futures.addCallback(future, new FutureCallback<>() { @Override - public void onSuccess(@Nullable UUID result) { + public void onSuccess(@Nullable Pair result) { doSync(cursor); } @@ -307,36 +315,51 @@ public final class EdgeGrpcSession implements Closeable { sendDownlinkMsg(edgeConfigMsg); } - ListenableFuture processEdgeEvents() throws Exception { - SettableFuture result = SettableFuture.create(); + ListenableFuture processEdgeEvents() throws Exception { + SettableFuture result = SettableFuture.create(); log.trace("[{}] starting processing edge events", this.sessionId); if (isConnected() && isSyncCompleted()) { - Long queueStartTs = getQueueStartTs().get(); + Pair startTsAndSeqId = getQueueStartTsAndSeqId().get(); + this.previousStartTs = startTsAndSeqId.getFirst(); + this.previousStartSeqId = startTsAndSeqId.getSecond(); GeneralEdgeEventFetcher fetcher = new GeneralEdgeEventFetcher( - queueStartTs, + this.previousStartTs, + this.previousStartSeqId, + this.seqIdEnd, + false, + Integer.toUnsignedLong(ctx.getEdgeEventStorageSettings().getMaxReadRecordsCount()), ctx.getEdgeEventService()); - ListenableFuture ifOffsetFuture = startProcessingEdgeEvents(fetcher); - Futures.addCallback(ifOffsetFuture, new FutureCallback<>() { + Futures.addCallback(startProcessingEdgeEvents(fetcher), new FutureCallback<>() { @Override - public void onSuccess(@Nullable UUID ifOffset) { - if (ifOffset != null) { - Long newStartTs = Uuids.unixTimestamp(ifOffset); - ListenableFuture> updateFuture = updateQueueStartTs(newStartTs); + public void onSuccess(@Nullable Pair newStartTsAndSeqId) { + if (newStartTsAndSeqId != null) { + ListenableFuture> updateFuture = updateQueueStartTsAndSeqId(newStartTsAndSeqId); Futures.addCallback(updateFuture, new FutureCallback<>() { @Override public void onSuccess(@Nullable List list) { - log.debug("[{}] queue offset was updated [{}][{}]", sessionId, ifOffset, newStartTs); - result.set(null); + log.debug("[{}] queue offset was updated [{}]", sessionId, newStartTsAndSeqId); + if (fetcher.isSeqIdNewCycleStarted()) { + seqIdEnd = fetcher.getSeqIdEnd(); + boolean newEventsAvailable = isNewEdgeEventsAvailable(); + result.set(newEventsAvailable); + } else { + seqIdEnd = null; + boolean newEventsAvailable = isSeqIdStartedNewCycle(); + if (!newEventsAvailable) { + newEventsAvailable = isNewEdgeEventsAvailable(); + } + result.set(newEventsAvailable); + } } @Override public void onFailure(Throwable t) { - log.error("[{}] Failed to update queue offset [{}]", sessionId, ifOffset, t); + log.error("[{}] Failed to update queue offset [{}]", sessionId, newStartTsAndSeqId, t); result.setException(t); } }, ctx.getGrpcCallbackExecutorService()); } else { - log.trace("[{}] ifOffset is null. Skipping iteration without db update", sessionId); + log.trace("[{}] newStartTsAndSeqId is null. Skipping iteration without db update", sessionId); result.set(null); } } @@ -354,14 +377,14 @@ public final class EdgeGrpcSession implements Closeable { return result; } - private ListenableFuture startProcessingEdgeEvents(EdgeEventFetcher fetcher) { - SettableFuture result = SettableFuture.create(); + private ListenableFuture> startProcessingEdgeEvents(EdgeEventFetcher fetcher) { + SettableFuture> result = SettableFuture.create(); PageLink pageLink = fetcher.getPageLink(ctx.getEdgeEventStorageSettings().getMaxReadRecordsCount()); processEdgeEvents(fetcher, pageLink, result); return result; } - private void processEdgeEvents(EdgeEventFetcher fetcher, PageLink pageLink, SettableFuture result) { + private void processEdgeEvents(EdgeEventFetcher fetcher, PageLink pageLink, SettableFuture> result) { try { PageData pageData = fetcher.fetchEdgeEvents(edge.getTenantId(), edge, pageLink); if (isConnected() && !pageData.getData().isEmpty()) { @@ -377,8 +400,15 @@ public final class EdgeGrpcSession implements Closeable { if (isConnected() && pageData.hasNext()) { processEdgeEvents(fetcher, pageLink.nextPageLink(), result); } else { - UUID ifOffset = pageData.getData().get(pageData.getData().size() - 1).getUuidId(); - result.set(ifOffset); + EdgeEvent latestEdgeEvent = pageData.getData().get(pageData.getData().size() - 1); + UUID idOffset = latestEdgeEvent.getUuidId(); + if (idOffset != null) { + Long newStartTs = Uuids.unixTimestamp(idOffset); + long newStartSeqId = latestEdgeEvent.getSeqId(); + result.set(Pair.of(newStartTs, newStartSeqId)); + } else { + result.set(null); + } } } } @@ -461,69 +491,113 @@ public final class EdgeGrpcSession implements Closeable { } } - private DownlinkMsg convertToDownlinkMsg(EdgeEvent edgeEvent) { - log.trace("[{}][{}] converting edge event to downlink msg [{}]", edge.getTenantId(), this.sessionId, edgeEvent); - DownlinkMsg downlinkMsg = null; - try { - switch (edgeEvent.getAction()) { - case UPDATED: - case ADDED: - case DELETED: - case ASSIGNED_TO_EDGE: - case UNASSIGNED_FROM_EDGE: - case ALARM_ACK: - case ALARM_CLEAR: - case CREDENTIALS_UPDATED: - case RELATION_ADD_OR_UPDATE: - case RELATION_DELETED: - case ASSIGNED_TO_CUSTOMER: - case UNASSIGNED_FROM_CUSTOMER: - case CREDENTIALS_REQUEST: - case RPC_CALL: - downlinkMsg = convertEntityEventToDownlink(edgeEvent); - log.trace("[{}][{}] entity message processed [{}]", edgeEvent.getTenantId(), this.sessionId, downlinkMsg); - break; - case ATTRIBUTES_UPDATED: - case POST_ATTRIBUTES: - case ATTRIBUTES_DELETED: - case TIMESERIES_UPDATED: - downlinkMsg = ctx.getTelemetryProcessor().convertTelemetryEventToDownlink(edgeEvent); - break; - default: - log.warn("[{}][{}] Unsupported action type [{}]", edge.getTenantId(), this.sessionId, edgeEvent.getAction()); + private List convertToDownlinkMsgsPack(List edgeEvents) { + List result = new ArrayList<>(); + for (EdgeEvent edgeEvent : edgeEvents) { + log.trace("[{}][{}] converting edge event to downlink msg [{}]", edge.getTenantId(), this.sessionId, edgeEvent); + DownlinkMsg downlinkMsg = null; + try { + switch (edgeEvent.getAction()) { + case UPDATED: + case ADDED: + case DELETED: + case ASSIGNED_TO_EDGE: + case UNASSIGNED_FROM_EDGE: + case ALARM_ACK: + case ALARM_CLEAR: + case CREDENTIALS_UPDATED: + case RELATION_ADD_OR_UPDATE: + case RELATION_DELETED: + case CREDENTIALS_REQUEST: + case RPC_CALL: + case ASSIGNED_TO_CUSTOMER: + case UNASSIGNED_FROM_CUSTOMER: + downlinkMsg = convertEntityEventToDownlink(edgeEvent); + log.trace("[{}][{}] entity message processed [{}]", edgeEvent.getTenantId(), this.sessionId, downlinkMsg); + break; + case ATTRIBUTES_UPDATED: + case POST_ATTRIBUTES: + case ATTRIBUTES_DELETED: + case TIMESERIES_UPDATED: + downlinkMsg = ctx.getTelemetryProcessor().convertTelemetryEventToDownlink(edgeEvent); + break; + default: + log.warn("[{}][{}] Unsupported action type [{}]", edge.getTenantId(), this.sessionId, edgeEvent.getAction()); + } + } catch (Exception e) { + log.error("[{}][{}] Exception during converting edge event to downlink msg", edge.getTenantId(), this.sessionId, e); + } + if (downlinkMsg != null) { + result.add(downlinkMsg); + } + } + return result; + } + + private ListenableFuture> getQueueStartTsAndSeqId() { + ListenableFuture> future = + ctx.getAttributesService().find(edge.getTenantId(), edge.getId(), DataConstants.SERVER_SCOPE, Arrays.asList(QUEUE_START_TS_ATTR_KEY, QUEUE_START_SEQ_ID_ATTR_KEY)); + return Futures.transform(future, attributeKvEntries -> { + long startTs = 0L; + long startSeqId = 0L; + for (AttributeKvEntry attributeKvEntry : attributeKvEntries) { + if (QUEUE_START_TS_ATTR_KEY.equals(attributeKvEntry.getKey())) { + startTs = attributeKvEntry.getLongValue().isPresent() ? attributeKvEntry.getLongValue().get() : 0L; + } + if (QUEUE_START_SEQ_ID_ATTR_KEY.equals(attributeKvEntry.getKey())) { + startSeqId = attributeKvEntry.getLongValue().isPresent() ? attributeKvEntry.getLongValue().get() : 0L; + } + } + if (startSeqId == 0L) { + startSeqId = findStartSeqIdFromOldestEventIfAny(); } + return Pair.of(startTs, startSeqId); + }, ctx.getGrpcCallbackExecutorService()); + } + + private boolean isSeqIdStartedNewCycle() { + try { + TimePageLink pageLink = new TimePageLink(ctx.getEdgeEventStorageSettings().getMaxReadRecordsCount(), 0, null, null, this.newStartTs, System.currentTimeMillis()); + PageData edgeEvents = ctx.getEdgeEventService().findEdgeEvents(edge.getTenantId(), edge.getId(), 0L, this.previousStartSeqId == 0 ? null : this.previousStartSeqId - 1, pageLink); + return !edgeEvents.getData().isEmpty(); } catch (Exception e) { - log.error("[{}][{}] Exception during converting edge event to downlink msg", edge.getTenantId(), this.sessionId, e); + log.error("[{}][{}][{}] Failed to execute isSeqIdStartedNewCycle", edge.getTenantId(), edge.getId(), sessionId, e); } - return downlinkMsg; + return false; } - private List convertToDownlinkMsgsPack(List edgeEvents) { - return edgeEvents - .stream() - .map(this::convertToDownlinkMsg) - .filter(Objects::nonNull) - .collect(Collectors.toList()); + private boolean isNewEdgeEventsAvailable() { + try { + TimePageLink pageLink = new TimePageLink(ctx.getEdgeEventStorageSettings().getMaxReadRecordsCount(), 0, null, null, this.newStartTs, System.currentTimeMillis()); + PageData edgeEvents = ctx.getEdgeEventService().findEdgeEvents(edge.getTenantId(), edge.getId(), this.newStartSeqId, null, pageLink); + return !edgeEvents.getData().isEmpty(); + } catch (Exception e) { + log.error("[{}][{}][{}] Failed to execute isNewEdgeEventsAvailable", edge.getTenantId(), edge.getId(), sessionId, e); + } + return false; } - private ListenableFuture getQueueStartTs() { - ListenableFuture> future = - ctx.getAttributesService().find(edge.getTenantId(), edge.getId(), DataConstants.SERVER_SCOPE, QUEUE_START_TS_ATTR_KEY); - return Futures.transform(future, attributeKvEntryOpt -> { - if (attributeKvEntryOpt != null && attributeKvEntryOpt.isPresent()) { - AttributeKvEntry attributeKvEntry = attributeKvEntryOpt.get(); - return attributeKvEntry.getLongValue().isPresent() ? attributeKvEntry.getLongValue().get() : 0L; - } else { - return 0L; + private long findStartSeqIdFromOldestEventIfAny() { + long startSeqId = 0L; + try { + TimePageLink pageLink = new TimePageLink(1, 0, null, new SortOrder("createdTime"), null, null); + PageData edgeEvents = ctx.getEdgeEventService().findEdgeEvents(edge.getTenantId(), edge.getId(), null, null, pageLink); + if (!edgeEvents.getData().isEmpty()) { + startSeqId = edgeEvents.getData().get(0).getSeqId() - 1; } - }, ctx.getGrpcCallbackExecutorService()); + } catch (Exception e) { + log.error("[{}][{}][{}] Failed to execute findStartSeqIdFromOldestEventIfAny", edge.getTenantId(), edge.getId(), sessionId, e); + } + return startSeqId; } - private ListenableFuture> updateQueueStartTs(Long newStartTs) { - log.trace("[{}] updating QueueStartTs [{}][{}]", this.sessionId, edge.getId(), newStartTs); - List attributes = Collections.singletonList( - new BaseAttributeKvEntry( - new LongDataEntry(QUEUE_START_TS_ATTR_KEY, newStartTs), System.currentTimeMillis())); + private ListenableFuture> updateQueueStartTsAndSeqId(Pair pair) { + this.newStartTs = pair.getFirst(); + this.newStartSeqId = pair.getSecond(); + log.trace("[{}] updateQueueStartTsAndSeqId [{}][{}][{}]", this.sessionId, edge.getId(), this.newStartTs, this.newStartSeqId); + List attributes = Arrays.asList( + new BaseAttributeKvEntry(new LongDataEntry(QUEUE_START_TS_ATTR_KEY, this.newStartTs), System.currentTimeMillis()), + new BaseAttributeKvEntry(new LongDataEntry(QUEUE_START_SEQ_ID_ATTR_KEY, this.newStartSeqId), System.currentTimeMillis())); return ctx.getAttributesService().save(edge.getTenantId(), edge.getId(), DataConstants.SERVER_SCOPE, attributes); } @@ -693,8 +767,11 @@ public final class EdgeGrpcSession implements Closeable { } private void interruptPreviousSendDownlinkMsgsTask() { - log.debug("[{}][{}][{}] Previous send downlink future was not properly completed, stopping it now!", edge.getTenantId(), edge.getId(), this.sessionId); - stopCurrentSendDownlinkMsgsTask(true); + if (sessionState.getSendDownlinkMsgsFuture() != null && !sessionState.getSendDownlinkMsgsFuture().isDone() + || sessionState.getScheduledSendDownlinkTask() != null && !sessionState.getScheduledSendDownlinkTask().isCancelled()) { + log.debug("[{}][{}][{}] Previous send downlink future was not properly completed, stopping it now!", edge.getTenantId(), edge.getId(), this.sessionId); + stopCurrentSendDownlinkMsgsTask(true); + } } private void interruptGeneralProcessingOnSync() { diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/constructor/AlarmMsgConstructor.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/constructor/AlarmMsgConstructor.java index 69a83da0a0..447a73e5cf 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/constructor/AlarmMsgConstructor.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/constructor/AlarmMsgConstructor.java @@ -18,7 +18,10 @@ package org.thingsboard.server.service.edge.rpc.constructor; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import org.thingsboard.common.util.JacksonUtil; +import org.thingsboard.server.common.data.Device; +import org.thingsboard.server.common.data.EntityView; import org.thingsboard.server.common.data.alarm.Alarm; +import org.thingsboard.server.common.data.asset.Asset; import org.thingsboard.server.common.data.id.AssetId; import org.thingsboard.server.common.data.id.DeviceId; import org.thingsboard.server.common.data.id.EntityViewId; @@ -47,13 +50,22 @@ public class AlarmMsgConstructor { String entityName = null; switch (alarm.getOriginator().getEntityType()) { case DEVICE: - entityName = deviceService.findDeviceById(tenantId, new DeviceId(alarm.getOriginator().getId())).getName(); + Device deviceById = deviceService.findDeviceById(tenantId, new DeviceId(alarm.getOriginator().getId())); + if (deviceById != null) { + entityName = deviceById.getName(); + } break; case ASSET: - entityName = assetService.findAssetById(tenantId, new AssetId(alarm.getOriginator().getId())).getName(); + Asset assetById = assetService.findAssetById(tenantId, new AssetId(alarm.getOriginator().getId())); + if (assetById != null) { + entityName = assetById.getName(); + } break; case ENTITY_VIEW: - entityName = entityViewService.findEntityViewById(tenantId, new EntityViewId(alarm.getOriginator().getId())).getName(); + EntityView entityViewById = entityViewService.findEntityViewById(tenantId, new EntityViewId(alarm.getOriginator().getId())); + if (entityViewById != null) { + entityName = entityViewById.getName(); + } break; } AlarmUpdateMsg.Builder builder = AlarmUpdateMsg.newBuilder() diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/fetch/GeneralEdgeEventFetcher.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/fetch/GeneralEdgeEventFetcher.java index 327184e6a9..24008ece09 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/fetch/GeneralEdgeEventFetcher.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/fetch/GeneralEdgeEventFetcher.java @@ -16,19 +16,27 @@ package org.thingsboard.server.service.edge.rpc.fetch; import lombok.AllArgsConstructor; +import lombok.Getter; +import lombok.extern.slf4j.Slf4j; import org.thingsboard.server.common.data.edge.Edge; import org.thingsboard.server.common.data.edge.EdgeEvent; import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.page.PageData; import org.thingsboard.server.common.data.page.PageLink; -import org.thingsboard.server.common.data.page.SortOrder; import org.thingsboard.server.common.data.page.TimePageLink; import org.thingsboard.server.dao.edge.EdgeEventService; @AllArgsConstructor +@Slf4j public class GeneralEdgeEventFetcher implements EdgeEventFetcher { private final Long queueStartTs; + private Long seqIdStart; + @Getter + private Long seqIdEnd; + @Getter + private boolean seqIdNewCycleStarted; + private Long maxReadRecordsCount; private final EdgeEventService edgeEventService; @Override @@ -37,13 +45,32 @@ public class GeneralEdgeEventFetcher implements EdgeEventFetcher { pageSize, 0, null, - new SortOrder("createdTime", SortOrder.Direction.ASC), + null, queueStartTs, - null); + System.currentTimeMillis()); } @Override public PageData fetchEdgeEvents(TenantId tenantId, Edge edge, PageLink pageLink) { - return edgeEventService.findEdgeEvents(tenantId, edge.getId(), (TimePageLink) pageLink, true); + try { + PageData edgeEvents = edgeEventService.findEdgeEvents(tenantId, edge.getId(), seqIdStart, seqIdEnd, (TimePageLink) pageLink); + if (edgeEvents.getData().isEmpty()) { + this.seqIdEnd = Math.max(this.maxReadRecordsCount, seqIdStart - this.maxReadRecordsCount); + edgeEvents = edgeEventService.findEdgeEvents(tenantId, edge.getId(), 0L, seqIdEnd, (TimePageLink) pageLink); + if (edgeEvents.getData().stream().anyMatch(ee -> ee.getSeqId() < seqIdStart)) { + log.info("[{}] seqId column of edge_event table started new cycle [{}]", tenantId, edge.getId()); + this.seqIdNewCycleStarted = true; + this.seqIdStart = 0L; + } else { + edgeEvents = new PageData<>(); + log.warn("[{}] unexpected edge notification message received. " + + "no new events found and seqId column of edge_event table doesn't started new cycle [{}]", tenantId, edge.getId()); + } + } + return edgeEvents; + } catch (Exception e) { + log.error("[{}] failed to find edge events [{}]", tenantId, edge.getId()); + } + return new PageData<>(); } } diff --git a/application/src/main/java/org/thingsboard/server/service/install/update/DefaultDataUpdateService.java b/application/src/main/java/org/thingsboard/server/service/install/update/DefaultDataUpdateService.java index f832b04c21..dafccfdad6 100644 --- a/application/src/main/java/org/thingsboard/server/service/install/update/DefaultDataUpdateService.java +++ b/application/src/main/java/org/thingsboard/server/service/install/update/DefaultDataUpdateService.java @@ -202,22 +202,27 @@ public class DefaultDataUpdateService implements DataUpdateService { } else { log.info("Skipping audit logs migration"); } - boolean skipEdgeEventsMigration = getEnv("TB_SKIP_EDGE_EVENTS_MIGRATION", false); - if (!skipEdgeEventsMigration) { - log.info("Starting edge events migration. Can be skipped with TB_SKIP_EDGE_EVENTS_MIGRATION env variable set to true"); - edgeEventDao.migrateEdgeEvents(); - } else { - log.info("Skipping edge events migration"); - } + migrateEdgeEvents("Starting edge events migration. "); break; case "3.5.1": log.info("Updating data from version 3.5.1 to 3.5.2 ..."); + migrateEdgeEvents("Starting edge events migration - adding seq_id column. "); break; default: throw new RuntimeException("Unable to update data, unsupported fromVersion: " + fromVersion); } } + private void migrateEdgeEvents(String logPrefix) { + boolean skipEdgeEventsMigration = getEnv("TB_SKIP_EDGE_EVENTS_MIGRATION", false); + if (!skipEdgeEventsMigration) { + log.info(logPrefix + "Can be skipped with TB_SKIP_EDGE_EVENTS_MIGRATION env variable set to true"); + edgeEventDao.migrateEdgeEvents(); + } else { + log.info("Skipping edge events migration"); + } + } + @Override public void upgradeRuleNodes() { try { diff --git a/application/src/test/java/org/thingsboard/server/edge/AbstractEdgeTest.java b/application/src/test/java/org/thingsboard/server/edge/AbstractEdgeTest.java index 135b6d87c8..82b2ce3ec2 100644 --- a/application/src/test/java/org/thingsboard/server/edge/AbstractEdgeTest.java +++ b/application/src/test/java/org/thingsboard/server/edge/AbstractEdgeTest.java @@ -100,6 +100,7 @@ import static org.springframework.test.web.servlet.result.MockMvcResultMatchers. @TestPropertySource(properties = { "edges.enabled=true", + "queue.rule-engine.stats.enabled=false", }) abstract public class AbstractEdgeTest extends AbstractControllerTest { @@ -181,14 +182,14 @@ abstract public class AbstractEdgeTest extends AbstractControllerTest { @After public void afterTest() throws Exception { + try { + edgeImitator.disconnect(); + } catch (Exception ignored){} + loginSysAdmin(); doDelete("/api/tenant/" + savedTenant.getUuidId()) .andExpect(status().isOk()); - - try { - edgeImitator.disconnect(); - } catch (Exception ignored) {} } private void installation() { diff --git a/application/src/test/java/org/thingsboard/server/edge/imitator/EdgeImitator.java b/application/src/test/java/org/thingsboard/server/edge/imitator/EdgeImitator.java index 8f05e6810f..0edf070aef 100644 --- a/application/src/test/java/org/thingsboard/server/edge/imitator/EdgeImitator.java +++ b/application/src/test/java/org/thingsboard/server/edge/imitator/EdgeImitator.java @@ -94,8 +94,6 @@ public class EdgeImitator { @Getter private UplinkResponseMsg latestResponseMsg; - private boolean connected = false; - public EdgeImitator(String host, int port, String routingKey, String routingSecret) throws NoSuchFieldException, IllegalAccessException { edgeRpcClient = new EdgeGrpcClient(); messagesLatch = new CountDownLatch(0); @@ -120,7 +118,6 @@ public class EdgeImitator { } public void connect() { - connected = true; edgeRpcClient.connect(routingKey, routingSecret, this::onUplinkResponse, this::onEdgeUpdate, @@ -131,7 +128,6 @@ public class EdgeImitator { } public void disconnect() throws InterruptedException { - connected = false; edgeRpcClient.disconnect(false); } diff --git a/application/src/test/resources/application-test.properties b/application/src/test/resources/application-test.properties index ad86ff736b..99055e0e5f 100644 --- a/application/src/test/resources/application-test.properties +++ b/application/src/test/resources/application-test.properties @@ -14,6 +14,7 @@ edges.enabled=false edges.storage.no_read_records_sleep=500 edges.storage.sleep_between_batches=500 actors.rpc.sequential=true +queue.rule-engine.stats.enabled=true # Transports disabled to speed up the context init. Particular transport will be enabled with @TestPropertySource in respective tests transport.http.enabled=false diff --git a/common/dao-api/src/main/java/org/thingsboard/server/dao/edge/EdgeEventService.java b/common/dao-api/src/main/java/org/thingsboard/server/dao/edge/EdgeEventService.java index dcb3a5232a..9055202f4f 100644 --- a/common/dao-api/src/main/java/org/thingsboard/server/dao/edge/EdgeEventService.java +++ b/common/dao-api/src/main/java/org/thingsboard/server/dao/edge/EdgeEventService.java @@ -26,7 +26,7 @@ public interface EdgeEventService { ListenableFuture saveAsync(EdgeEvent edgeEvent); - PageData findEdgeEvents(TenantId tenantId, EdgeId edgeId, TimePageLink pageLink, boolean withTsUpdate); + PageData findEdgeEvents(TenantId tenantId, EdgeId edgeId, Long seqIdStart, Long seqIdEnd, TimePageLink pageLink); /** * Executes stored procedure to cleanup old edge events. diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/edge/EdgeEvent.java b/common/data/src/main/java/org/thingsboard/server/common/data/edge/EdgeEvent.java index 71c35f4bd8..3688f5c6c2 100644 --- a/common/data/src/main/java/org/thingsboard/server/common/data/edge/EdgeEvent.java +++ b/common/data/src/main/java/org/thingsboard/server/common/data/edge/EdgeEvent.java @@ -31,6 +31,7 @@ import java.util.UUID; @ToString(callSuper = true) public class EdgeEvent extends BaseData { + private long seqId; private TenantId tenantId; private EdgeId edgeId; private EdgeEventActionType action; diff --git a/dao/src/main/java/org/thingsboard/server/dao/edge/BaseEdgeEventService.java b/dao/src/main/java/org/thingsboard/server/dao/edge/BaseEdgeEventService.java index 0fc451cf18..82058761f8 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/edge/BaseEdgeEventService.java +++ b/dao/src/main/java/org/thingsboard/server/dao/edge/BaseEdgeEventService.java @@ -42,8 +42,8 @@ public class BaseEdgeEventService implements EdgeEventService { } @Override - public PageData findEdgeEvents(TenantId tenantId, EdgeId edgeId, TimePageLink pageLink, boolean withTsUpdate) { - return edgeEventDao.findEdgeEvents(tenantId.getId(), edgeId, pageLink, withTsUpdate); + public PageData findEdgeEvents(TenantId tenantId, EdgeId edgeId, Long seqIdStart, Long seqIdEnd, TimePageLink pageLink) { + return edgeEventDao.findEdgeEvents(tenantId.getId(), edgeId, seqIdStart, seqIdEnd, pageLink); } @Override diff --git a/dao/src/main/java/org/thingsboard/server/dao/edge/EdgeEventDao.java b/dao/src/main/java/org/thingsboard/server/dao/edge/EdgeEventDao.java index 84bf8c40d2..942a536674 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/edge/EdgeEventDao.java +++ b/dao/src/main/java/org/thingsboard/server/dao/edge/EdgeEventDao.java @@ -43,10 +43,12 @@ public interface EdgeEventDao extends Dao { * * @param tenantId the tenantId * @param edgeId the edgeId + * @param seqIdStart the seq id start + * @param seqIdEnd the seq id end * @param pageLink the pageLink * @return the event list */ - PageData findEdgeEvents(UUID tenantId, EdgeId edgeId, TimePageLink pageLink, boolean withTsUpdate); + PageData findEdgeEvents(UUID tenantId, EdgeId edgeId, Long seqIdStart, Long seqIdEnd, TimePageLink pageLink); /** * Executes stored procedure to cleanup old edge events. diff --git a/dao/src/main/java/org/thingsboard/server/dao/model/ModelConstants.java b/dao/src/main/java/org/thingsboard/server/dao/model/ModelConstants.java index 102d5f181e..552f83c749 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/model/ModelConstants.java +++ b/dao/src/main/java/org/thingsboard/server/dao/model/ModelConstants.java @@ -535,6 +535,7 @@ public class ModelConstants { */ public static final String EDGE_EVENT_TABLE_NAME = "edge_event"; public static final String EDGE_EVENT_TENANT_ID_PROPERTY = TENANT_ID_PROPERTY; + public static final String EDGE_EVENT_SEQUENTIAL_ID_PROPERTY = "seq_id"; public static final String EDGE_EVENT_EDGE_ID_PROPERTY = "edge_id"; public static final String EDGE_EVENT_TYPE_PROPERTY = "edge_event_type"; public static final String EDGE_EVENT_ACTION_PROPERTY = "edge_event_action"; diff --git a/dao/src/main/java/org/thingsboard/server/dao/model/sql/EdgeEventEntity.java b/dao/src/main/java/org/thingsboard/server/dao/model/sql/EdgeEventEntity.java index 1edc47f197..55a30383d1 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/model/sql/EdgeEventEntity.java +++ b/dao/src/main/java/org/thingsboard/server/dao/model/sql/EdgeEventEntity.java @@ -43,6 +43,7 @@ import static org.thingsboard.server.dao.model.ModelConstants.EDGE_EVENT_BODY_PR import static org.thingsboard.server.dao.model.ModelConstants.EDGE_EVENT_TABLE_NAME; import static org.thingsboard.server.dao.model.ModelConstants.EDGE_EVENT_EDGE_ID_PROPERTY; import static org.thingsboard.server.dao.model.ModelConstants.EDGE_EVENT_ENTITY_ID_PROPERTY; +import static org.thingsboard.server.dao.model.ModelConstants.EDGE_EVENT_SEQUENTIAL_ID_PROPERTY; import static org.thingsboard.server.dao.model.ModelConstants.EDGE_EVENT_TENANT_ID_PROPERTY; import static org.thingsboard.server.dao.model.ModelConstants.EDGE_EVENT_TYPE_PROPERTY; import static org.thingsboard.server.dao.model.ModelConstants.EDGE_EVENT_UID_PROPERTY; @@ -57,6 +58,9 @@ import static org.thingsboard.server.dao.model.ModelConstants.TS_COLUMN; @NoArgsConstructor public class EdgeEventEntity extends BaseSqlEntity implements BaseEntity { + @Column(name = EDGE_EVENT_SEQUENTIAL_ID_PROPERTY) + protected long seqId; + @Column(name = EDGE_EVENT_TENANT_ID_PROPERTY) private UUID tenantId; @@ -120,6 +124,7 @@ public class EdgeEventEntity extends BaseSqlEntity implements BaseEnt edgeEvent.setAction(edgeEventAction); edgeEvent.setBody(entityBody); edgeEvent.setUid(edgeEventUid); + edgeEvent.setSeqId(seqId); return edgeEvent; } diff --git a/dao/src/main/java/org/thingsboard/server/dao/sql/edge/EdgeEventRepository.java b/dao/src/main/java/org/thingsboard/server/dao/sql/edge/EdgeEventRepository.java index c4827f1ffe..c3a9697ad6 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/sql/edge/EdgeEventRepository.java +++ b/dao/src/main/java/org/thingsboard/server/dao/sql/edge/EdgeEventRepository.java @@ -30,8 +30,10 @@ public interface EdgeEventRepository extends JpaRepository :startTime) " + + "AND (:startTime IS NULL OR e.createdTime >= :startTime) " + "AND (:endTime IS NULL OR e.createdTime <= :endTime) " + + "AND (:seqIdStart IS NULL OR e.seqId > :seqIdStart) " + + "AND (:seqIdEnd IS NULL OR e.seqId < :seqIdEnd) " + "AND LOWER(e.edgeEventType) LIKE LOWER(CONCAT('%', :textSearch, '%'))" ) Page findEdgeEventsByTenantIdAndEdgeId(@Param("tenantId") UUID tenantId, @@ -39,20 +41,7 @@ public interface EdgeEventRepository extends JpaRepository :startTime) " + - "AND (:endTime IS NULL OR e.createdTime <= :endTime) " + - "AND e.edgeEventAction <> 'TIMESERIES_UPDATED' " + - "AND LOWER(e.edgeEventType) LIKE LOWER(CONCAT('%', :textSearch, '%'))" - ) - Page findEdgeEventsByTenantIdAndEdgeIdWithoutTimeseriesUpdated(@Param("tenantId") UUID tenantId, - @Param("edgeId") UUID edgeId, - @Param("textSearch") String textSearch, - @Param("startTime") Long startTime, - @Param("endTime") Long endTime, - Pageable pageable); } diff --git a/dao/src/main/java/org/thingsboard/server/dao/sql/edge/JpaBaseEdgeEventDao.java b/dao/src/main/java/org/thingsboard/server/dao/sql/edge/JpaBaseEdgeEventDao.java index bb825f504d..9f2eaae273 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/sql/edge/JpaBaseEdgeEventDao.java +++ b/dao/src/main/java/org/thingsboard/server/dao/sql/edge/JpaBaseEdgeEventDao.java @@ -28,6 +28,7 @@ import org.thingsboard.server.common.data.edge.EdgeEvent; import org.thingsboard.server.common.data.id.EdgeEventId; import org.thingsboard.server.common.data.id.EdgeId; import org.thingsboard.server.common.data.page.PageData; +import org.thingsboard.server.common.data.page.SortOrder; import org.thingsboard.server.common.data.page.TimePageLink; import org.thingsboard.server.common.stats.StatsFactory; import org.thingsboard.server.dao.DaoUtil; @@ -43,7 +44,9 @@ import org.thingsboard.server.dao.util.SqlDao; import javax.annotation.PostConstruct; import javax.annotation.PreDestroy; +import java.util.ArrayList; import java.util.Comparator; +import java.util.List; import java.util.Objects; import java.util.UUID; import java.util.concurrent.TimeUnit; @@ -118,7 +121,7 @@ public class JpaBaseEdgeEventDao extends JpaAbstractDao(params, hashcodeFunction, 1, statsFactory); - queue.init(logExecutor, v -> edgeEventInsertRepository.save(v), + queue.init(logExecutor, edgeEventInsertRepository::save, Comparator.comparing(EdgeEventEntity::getTs) ); } @@ -171,29 +174,23 @@ public class JpaBaseEdgeEventDao extends JpaAbstractDao findEdgeEvents(UUID tenantId, EdgeId edgeId, TimePageLink pageLink, boolean withTsUpdate) { - if (withTsUpdate) { - return DaoUtil.toPageData( - edgeEventRepository - .findEdgeEventsByTenantIdAndEdgeId( - tenantId, - edgeId.getId(), - Objects.toString(pageLink.getTextSearch(), ""), - pageLink.getStartTime(), - pageLink.getEndTime(), - DaoUtil.toPageable(pageLink))); - } else { - return DaoUtil.toPageData( - edgeEventRepository - .findEdgeEventsByTenantIdAndEdgeIdWithoutTimeseriesUpdated( - tenantId, - edgeId.getId(), - Objects.toString(pageLink.getTextSearch(), ""), - pageLink.getStartTime(), - pageLink.getEndTime(), - DaoUtil.toPageable(pageLink))); - + public PageData findEdgeEvents(UUID tenantId, EdgeId edgeId, Long seqIdStart, Long seqIdEnd, TimePageLink pageLink) { + List sortOrders = new ArrayList<>(); + if (pageLink.getSortOrder() != null) { + sortOrders.add(pageLink.getSortOrder()); } + sortOrders.add(new SortOrder("seqId")); + return DaoUtil.toPageData( + edgeEventRepository + .findEdgeEventsByTenantIdAndEdgeId( + tenantId, + edgeId.getId(), + Objects.toString(pageLink.getTextSearch(), ""), + pageLink.getStartTime(), + pageLink.getEndTime(), + seqIdStart, + seqIdEnd, + DaoUtil.toPageable(pageLink, sortOrders))); } @Override diff --git a/dao/src/main/resources/sql/schema-entities.sql b/dao/src/main/resources/sql/schema-entities.sql index 7fe3ec6e67..bfb2eed805 100644 --- a/dao/src/main/resources/sql/schema-entities.sql +++ b/dao/src/main/resources/sql/schema-entities.sql @@ -720,6 +720,7 @@ CREATE TABLE IF NOT EXISTS edge ( ); CREATE TABLE IF NOT EXISTS edge_event ( + seq_id INT GENERATED ALWAYS AS IDENTITY, id uuid NOT NULL, created_time bigint NOT NULL, edge_id uuid, @@ -731,6 +732,7 @@ CREATE TABLE IF NOT EXISTS edge_event ( tenant_id uuid, ts bigint NOT NULL ) PARTITION BY RANGE(created_time); +ALTER TABLE IF EXISTS edge_event ALTER COLUMN seq_id SET CYCLE; CREATE TABLE IF NOT EXISTS rpc ( id uuid NOT NULL CONSTRAINT rpc_pkey PRIMARY KEY, diff --git a/dao/src/test/java/org/thingsboard/server/dao/service/EdgeEventServiceTest.java b/dao/src/test/java/org/thingsboard/server/dao/service/EdgeEventServiceTest.java index 63958fe1e4..26e7cc2e1f 100644 --- a/dao/src/test/java/org/thingsboard/server/dao/service/EdgeEventServiceTest.java +++ b/dao/src/test/java/org/thingsboard/server/dao/service/EdgeEventServiceTest.java @@ -71,7 +71,7 @@ public class EdgeEventServiceTest extends AbstractServiceTest { EdgeEvent edgeEvent = generateEdgeEvent(tenantId, edgeId, deviceId, EdgeEventActionType.ADDED); edgeEventService.saveAsync(edgeEvent).get(); - PageData edgeEvents = edgeEventService.findEdgeEvents(tenantId, edgeId, new TimePageLink(1), false); + PageData edgeEvents = edgeEventService.findEdgeEvents(tenantId, edgeId, 0L, null, new TimePageLink(1)); Assert.assertFalse(edgeEvents.getData().isEmpty()); EdgeEvent saved = edgeEvents.getData().get(0); @@ -113,7 +113,7 @@ public class EdgeEventServiceTest extends AbstractServiceTest { Futures.allAsList(futures).get(); TimePageLink pageLink = new TimePageLink(2, 0, "", new SortOrder("createdTime", SortOrder.Direction.DESC), startTime, endTime); - PageData edgeEvents = edgeEventService.findEdgeEvents(tenantId, edgeId, pageLink, true); + PageData edgeEvents = edgeEventService.findEdgeEvents(tenantId, edgeId, 0L, null, pageLink); Assert.assertNotNull(edgeEvents.getData()); Assert.assertEquals(2, edgeEvents.getData().size()); @@ -122,7 +122,7 @@ public class EdgeEventServiceTest extends AbstractServiceTest { Assert.assertTrue(edgeEvents.hasNext()); Assert.assertNotNull(pageLink.nextPageLink()); - edgeEvents = edgeEventService.findEdgeEvents(tenantId, edgeId, pageLink.nextPageLink(), true); + edgeEvents = edgeEventService.findEdgeEvents(tenantId, edgeId, 0L, null, pageLink.nextPageLink()); Assert.assertNotNull(edgeEvents.getData()); Assert.assertEquals(1, edgeEvents.getData().size()); @@ -132,26 +132,6 @@ public class EdgeEventServiceTest extends AbstractServiceTest { edgeEventService.cleanupEvents(1); } - @Test - public void findEdgeEventsWithTsUpdateAndWithout() throws Exception { - EdgeId edgeId = new EdgeId(Uuids.timeBased()); - DeviceId deviceId = new DeviceId(Uuids.timeBased()); - TenantId tenantId = TenantId.fromUUID(Uuids.timeBased()); - TimePageLink pageLink = new TimePageLink(1, 0, null, new SortOrder("createdTime", SortOrder.Direction.ASC)); - - EdgeEvent edgeEventWithTsUpdate = generateEdgeEvent(tenantId, edgeId, deviceId, EdgeEventActionType.TIMESERIES_UPDATED); - edgeEventService.saveAsync(edgeEventWithTsUpdate).get(); - - PageData allEdgeEvents = edgeEventService.findEdgeEvents(tenantId, edgeId, pageLink, true); - PageData edgeEventsWithoutTsUpdate = edgeEventService.findEdgeEvents(tenantId, edgeId, pageLink, false); - - Assert.assertNotNull(allEdgeEvents.getData()); - Assert.assertNotNull(edgeEventsWithoutTsUpdate.getData()); - Assert.assertEquals(1, allEdgeEvents.getData().size()); - Assert.assertEquals(allEdgeEvents.getData().get(0).getUuidId(), edgeEventWithTsUpdate.getUuidId()); - Assert.assertTrue(edgeEventsWithoutTsUpdate.getData().isEmpty()); - } - private ListenableFuture saveEdgeEventWithProvidedTime(long time, EdgeId edgeId, EntityId entityId, TenantId tenantId) throws Exception { EdgeEvent edgeEvent = generateEdgeEvent(tenantId, edgeId, entityId, EdgeEventActionType.ADDED); edgeEvent.setId(new EdgeEventId(Uuids.startOf(time))); diff --git a/dao/src/test/resources/sql/system-test-psql.sql b/dao/src/test/resources/sql/system-test-psql.sql index 172731b9c5..21af327f13 100644 --- a/dao/src/test/resources/sql/system-test-psql.sql +++ b/dao/src/test/resources/sql/system-test-psql.sql @@ -1,2 +1,5 @@ --PostgreSQL specific truncate to fit constraints -TRUNCATE TABLE device_credentials, device, device_profile, asset, asset_profile, ota_package, rule_node_state, rule_node, rule_chain, alarm_comment, alarm, entity_alarm; \ No newline at end of file +TRUNCATE TABLE device_credentials, device, device_profile, asset, asset_profile, ota_package, rule_node_state, rule_node, rule_chain, alarm_comment, alarm, entity_alarm; + +-- Decrease seq_id column to make sure to cover cases of new sequential cycle during the tests +ALTER SEQUENCE edge_event_seq_id_seq MAXVALUE 256; From 7ff353a4d9758f40314f7e991ef87f1f94dcac34 Mon Sep 17 00:00:00 2001 From: nickAS21 Date: Tue, 27 Jun 2023 17:54:02 +0300 Subject: [PATCH 4/9] swagger_device_controller: fix bug example request - X509, MQTT_BASIC --- .../controller/ControllerConstants.java | 81 ++++++++++++++++++- .../server/controller/DeviceController.java | 13 ++- 2 files changed, 90 insertions(+), 4 deletions(-) diff --git a/application/src/main/java/org/thingsboard/server/controller/ControllerConstants.java b/application/src/main/java/org/thingsboard/server/controller/ControllerConstants.java index 6fe53516ed..bbd2773e6b 100644 --- a/application/src/main/java/org/thingsboard/server/controller/ControllerConstants.java +++ b/application/src/main/java/org/thingsboard/server/controller/ControllerConstants.java @@ -15,6 +15,16 @@ */ package org.thingsboard.server.controller; +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Paths; +import java.util.ArrayList; +import java.util.List; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +import static org.thingsboard.server.common.msg.EncryptionUtil.certTrimNewLinesForChainInDeviceProfile; + public class ControllerConstants { protected static final String NEW_LINE = "\n\n"; @@ -236,7 +246,59 @@ public class ControllerConstants { " }\n" + "}"; - protected static final String CREDENTIALS_VALUE_LVM2M_RPK_DESCRIPTION = + protected static final String[] getCertificateValue() { + String filePath = "src/test/resources/provision/x509ChainProvisionTest.pem"; + try { + String certificateChain = Files.readString(Paths.get(filePath)); + certificateChain = certTrimNewLinesForChainInDeviceProfile(certificateChain); + return fetchLeafCertificateFromChain(certificateChain); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + protected static final String certificateValue = "\"-----BEGIN CERTIFICATE----- " + + "MIICMTCCAdegAwIBAgIUI9dBuwN6pTtK6uZ03rkiCwV4wEYwCgYIKoZIzj0EAwIwbjELMAkGA1UEBhMCVVMxETAPBgNVBAgMCE5ldyBZb3JrMRowGAYDVQQKDBFUaGluZ3NCb2FyZCwgSW5jLjEwMC4GA1UEAwwnZGV2aWNlQ2VydGlmaWNhdGVAWDUwOVByb3Zpc2lvblN0cmF0ZWd5MB4XDTIzMDMyOTE0NTYxN1oXDTI0MDMyODE0NTYxN1owbjELMAkGA1UEBhMCVVMxETAPBgNVBAgMCE5ldyBZb3JrMRowGAYDVQQKDBFUaGluZ3NCb2FyZCwgSW5jLjEwMC4GA1UEAwwnZGV2aWNlQ2VydGlmaWNhdGVAWDUwOVByb3Zpc2lvblN0cmF0ZWd5MFkwEwYHKoZIzj0CAQYIKoZIzj0DAQcDQgAE9Zo791qKQiGNBm11r4ZGxh+w+ossZL3xc46ufq5QckQHP7zkD2XDAcmP5GvdkM1sBFN9AWaCkQfNnWmfERsOOKNTMFEwHQYDVR0OBBYEFFFc5uyCyglQoZiKhzXzMcQ3BKORMB8GA1UdIwQYMBaAFFFc5uyCyglQoZiKhzXzMcQ3BKORMA8GA1UdEwEB/wQFMAMBAf8wCgYIKoZIzj0EAwIDSAAwRQIhANbA9CuhoOifZMMmqkpuld+65CR+ItKdXeRAhLMZuccuAiB0FSQB34zMutXrZj1g8Gl5OkE7YryFHbei1z0SveHR8g== " + + "-----END CERTIFICATE-----\""; + + protected static final String certificateId = "\"84f5911765abba1f96bf4165604e9e90338fc6214081a8e623b6ff9669aedb27\""; + + protected static final String DEVICE_WITH_DEVICE_CREDENTIALS_X509_CERTIFICATE_PARAM_DESCRIPTION = + "{\n" + + " \"device\": {\n" + + " \"name\":\"Name_DeviceWithCredantial_X509_Certificate\",\n" + + " \"label\":\"Label_DeviceWithCredantial_X509_Certificate\",\n" + + " \"deviceProfileId\":{\n" + + " \"id\":\"9d9588c0-06c9-11ee-b618-19be30fdeb60\",\n" + + " \"entityType\":\"DEVICE_PROFILE\"\n" + + " }\n" + + " },\n" + + " \"credentials\": {\n" + + " \"credentialsType\": \"X509_CERTIFICATE\",\n" + + " \"credentialsId\": " + certificateId + ",\n" + + " \"credentialsValue\": " + certificateValue + "\n" + + " }\n" + + "}"; + + protected static final String MQTT_BASIC_VALUE = "\"{\\\"clientId\\\":\\\"5euh5nzm34bjjh1efmlt\\\",\\\"userName\\\":\\\"onasd1lgwasmjl7v2v7h\\\",\\\"password\\\":\\\"b9xtm4ny8kt9zewaga5o\\\"}\""; + + protected static final String DEVICE_WITH_DEVICE_CREDENTIALS_MQTT_BASIC_PARAM_DESCRIPTION = + "{\n" + + " \"device\": {\n" + + " \"name\":\"Name_DeviceWithCredantial_MQTT_Basic\",\n" + + " \"label\":\"Label_DeviceWithCredantial_MQTT_Basic\",\n" + + " \"deviceProfileId\":{\n" + + " \"id\":\"9d9588c0-06c9-11ee-b618-19be30fdeb60\",\n" + + " \"entityType\":\"DEVICE_PROFILE\"\n" + + " }\n" + + " },\n" + + " \"credentials\": {\n" + + " \"credentialsType\": \"MQTT_BASIC\",\n" + + " \"credentialsValue\": " + MQTT_BASIC_VALUE + "\n" + + " }\n" + + "}"; + + protected static final String CREDENTIALS_VALUE_LVM2M_RPK_DESCRIPTION = " \"{" + "\\\"client\\\":{ " + "\\\"endpoint\\\":\\\"LwRpk00000000\\\", " + @@ -279,6 +341,12 @@ public class ControllerConstants { protected static final String DEVICE_WITH_DEVICE_CREDENTIALS_PARAM_ACCESS_TOKEN_DEFAULT_DESCRIPTION_MARKDOWN = MARKDOWN_CODE_BLOCK_START + DEVICE_WITH_DEVICE_CREDENTIALS_ACCESS_TOKEN_DEFAULT_PARAM_DESCRIPTION + MARKDOWN_CODE_BLOCK_END; + protected static final String DEVICE_WITH_DEVICE_CREDENTIALS_PARAM_X509_CERTIFICATE_DESCRIPTION_MARKDOWN = + MARKDOWN_CODE_BLOCK_START + DEVICE_WITH_DEVICE_CREDENTIALS_X509_CERTIFICATE_PARAM_DESCRIPTION + MARKDOWN_CODE_BLOCK_END; + + protected static final String DEVICE_WITH_DEVICE_CREDENTIALS_PARAM_MQTT_BASIC_DESCRIPTION_MARKDOWN = + MARKDOWN_CODE_BLOCK_START + DEVICE_WITH_DEVICE_CREDENTIALS_MQTT_BASIC_PARAM_DESCRIPTION + MARKDOWN_CODE_BLOCK_END; + protected static final String DEVICE_WITH_DEVICE_CREDENTIALS_PARAM_LVM2M_RPK_DESCRIPTION_MARKDOWN = MARKDOWN_CODE_BLOCK_START + DEVICE_WITH_DEVICE_CREDENTIALS_PARAM_LVM2M_RPK_DESCRIPTION + MARKDOWN_CODE_BLOCK_END; @@ -1594,4 +1662,15 @@ public class ControllerConstants { MARKDOWN_CODE_BLOCK_START + "[{\"ts\":1634712287000,\"values\":{\"temperature\":26, \"humidity\":87}}, {\"ts\":1634712588000,\"values\":{\"temperature\":25, \"humidity\":88}}]" + MARKDOWN_CODE_BLOCK_END ; + + private static String[] fetchLeafCertificateFromChain(String value) { + List chain = new ArrayList<>(); + String regex = "-----BEGIN CERTIFICATE-----\\s*.*?\\s*-----END CERTIFICATE-----"; + Pattern pattern = Pattern.compile(regex); + Matcher matcher = pattern.matcher(value); + while (matcher.find()) { + chain.add(matcher.group(0)); + } + return chain.toArray(new String[0]); + } } diff --git a/application/src/main/java/org/thingsboard/server/controller/DeviceController.java b/application/src/main/java/org/thingsboard/server/controller/DeviceController.java index 842e47756d..f033b2758f 100644 --- a/application/src/main/java/org/thingsboard/server/controller/DeviceController.java +++ b/application/src/main/java/org/thingsboard/server/controller/DeviceController.java @@ -96,6 +96,8 @@ import static org.thingsboard.server.controller.ControllerConstants.DEVICE_TYPE_ import static org.thingsboard.server.controller.ControllerConstants.DEVICE_WITH_DEVICE_CREDENTIALS_PARAM_ACCESS_TOKEN_DEFAULT_DESCRIPTION_MARKDOWN; import static org.thingsboard.server.controller.ControllerConstants.DEVICE_WITH_DEVICE_CREDENTIALS_PARAM_ACCESS_TOKEN_DESCRIPTION_MARKDOWN; import static org.thingsboard.server.controller.ControllerConstants.DEVICE_WITH_DEVICE_CREDENTIALS_PARAM_LVM2M_RPK_DESCRIPTION_MARKDOWN; +import static org.thingsboard.server.controller.ControllerConstants.DEVICE_WITH_DEVICE_CREDENTIALS_PARAM_MQTT_BASIC_DESCRIPTION_MARKDOWN; +import static org.thingsboard.server.controller.ControllerConstants.DEVICE_WITH_DEVICE_CREDENTIALS_PARAM_X509_CERTIFICATE_DESCRIPTION_MARKDOWN; import static org.thingsboard.server.controller.ControllerConstants.EDGE_ASSIGN_ASYNC_FIRST_STEP_DESCRIPTION; import static org.thingsboard.server.controller.ControllerConstants.EDGE_ASSIGN_RECEIVE_STEP_DESCRIPTION; import static org.thingsboard.server.controller.ControllerConstants.EDGE_ID_PARAM_DESCRIPTION; @@ -185,13 +187,18 @@ public class DeviceController extends BaseController { @ApiOperation(value = "Create Device (saveDevice) with credentials ", notes = "Create or update the Device. When creating device, platform generates Device Id as " + UUID_WIKI_LINK + "Requires to provide the Device Credentials object as well as an existing device profile ID or use \"default\".\n" + - "Note: LwM2M device - only existing device profile ID (Transport configuration -> Transport type: \"LWM2M\".\n\n" + "You may find the example of device with different type of credentials below: \n\n" + - "- Credentials type: \"Access token\" with Device profile ID below: \n\n" + + "- Credentials type: \"Access token\" with device profile ID below: \n\n" + DEVICE_WITH_DEVICE_CREDENTIALS_PARAM_ACCESS_TOKEN_DESCRIPTION_MARKDOWN + "\n\n" + - "- Credentials type: \"Access token\" with Device profile default below: \n\n" + + "- Credentials type: \"Access token\" with device profile default below: \n\n" + DEVICE_WITH_DEVICE_CREDENTIALS_PARAM_ACCESS_TOKEN_DEFAULT_DESCRIPTION_MARKDOWN + "\n\n" + + "- Credentials type: \"X509\" with device profile ID below: \n\n" + + "Note: credentialsId - format Sha3Hash, certificateValue - format PEM (with \"--BEGIN CERTIFICATE----\" and -\"----END CERTIFICATE-\").\n\n" + + DEVICE_WITH_DEVICE_CREDENTIALS_PARAM_X509_CERTIFICATE_DESCRIPTION_MARKDOWN + "\n\n" + + "- Credentials type: \"MQTT_BASIC\" with device profile ID below: \n\n" + + DEVICE_WITH_DEVICE_CREDENTIALS_PARAM_MQTT_BASIC_DESCRIPTION_MARKDOWN + "\n\n" + "- You may find the example of LwM2M device and RPK credentials below: \n\n" + + "Note: LwM2M device - only existing device profile ID (Transport configuration -> Transport type: \"LWM2M\".\n\n" + DEVICE_WITH_DEVICE_CREDENTIALS_PARAM_LVM2M_RPK_DESCRIPTION_MARKDOWN + "\n\n" + "Remove 'id', 'tenantId' and optionally 'customerId' from the request body example (below) to create new Device entity. " + TENANT_OR_CUSTOMER_AUTHORITY_PARAGRAPH) From 1cb74577d510bf971c57507c24156fa618e5d213 Mon Sep 17 00:00:00 2001 From: nickAS21 Date: Tue, 27 Jun 2023 18:09:21 +0300 Subject: [PATCH 5/9] swagger_device_controller: refactoring fix bug example request - X509, MQTT_BASIC --- .../controller/ControllerConstants.java | 32 ------------------- 1 file changed, 32 deletions(-) diff --git a/application/src/main/java/org/thingsboard/server/controller/ControllerConstants.java b/application/src/main/java/org/thingsboard/server/controller/ControllerConstants.java index bbd2773e6b..7701a19f02 100644 --- a/application/src/main/java/org/thingsboard/server/controller/ControllerConstants.java +++ b/application/src/main/java/org/thingsboard/server/controller/ControllerConstants.java @@ -15,16 +15,6 @@ */ package org.thingsboard.server.controller; -import java.io.IOException; -import java.nio.file.Files; -import java.nio.file.Paths; -import java.util.ArrayList; -import java.util.List; -import java.util.regex.Matcher; -import java.util.regex.Pattern; - -import static org.thingsboard.server.common.msg.EncryptionUtil.certTrimNewLinesForChainInDeviceProfile; - public class ControllerConstants { protected static final String NEW_LINE = "\n\n"; @@ -246,17 +236,6 @@ public class ControllerConstants { " }\n" + "}"; - protected static final String[] getCertificateValue() { - String filePath = "src/test/resources/provision/x509ChainProvisionTest.pem"; - try { - String certificateChain = Files.readString(Paths.get(filePath)); - certificateChain = certTrimNewLinesForChainInDeviceProfile(certificateChain); - return fetchLeafCertificateFromChain(certificateChain); - } catch (IOException e) { - throw new RuntimeException(e); - } - } - protected static final String certificateValue = "\"-----BEGIN CERTIFICATE----- " + "MIICMTCCAdegAwIBAgIUI9dBuwN6pTtK6uZ03rkiCwV4wEYwCgYIKoZIzj0EAwIwbjELMAkGA1UEBhMCVVMxETAPBgNVBAgMCE5ldyBZb3JrMRowGAYDVQQKDBFUaGluZ3NCb2FyZCwgSW5jLjEwMC4GA1UEAwwnZGV2aWNlQ2VydGlmaWNhdGVAWDUwOVByb3Zpc2lvblN0cmF0ZWd5MB4XDTIzMDMyOTE0NTYxN1oXDTI0MDMyODE0NTYxN1owbjELMAkGA1UEBhMCVVMxETAPBgNVBAgMCE5ldyBZb3JrMRowGAYDVQQKDBFUaGluZ3NCb2FyZCwgSW5jLjEwMC4GA1UEAwwnZGV2aWNlQ2VydGlmaWNhdGVAWDUwOVByb3Zpc2lvblN0cmF0ZWd5MFkwEwYHKoZIzj0CAQYIKoZIzj0DAQcDQgAE9Zo791qKQiGNBm11r4ZGxh+w+ossZL3xc46ufq5QckQHP7zkD2XDAcmP5GvdkM1sBFN9AWaCkQfNnWmfERsOOKNTMFEwHQYDVR0OBBYEFFFc5uyCyglQoZiKhzXzMcQ3BKORMB8GA1UdIwQYMBaAFFFc5uyCyglQoZiKhzXzMcQ3BKORMA8GA1UdEwEB/wQFMAMBAf8wCgYIKoZIzj0EAwIDSAAwRQIhANbA9CuhoOifZMMmqkpuld+65CR+ItKdXeRAhLMZuccuAiB0FSQB34zMutXrZj1g8Gl5OkE7YryFHbei1z0SveHR8g== " + "-----END CERTIFICATE-----\""; @@ -1662,15 +1641,4 @@ public class ControllerConstants { MARKDOWN_CODE_BLOCK_START + "[{\"ts\":1634712287000,\"values\":{\"temperature\":26, \"humidity\":87}}, {\"ts\":1634712588000,\"values\":{\"temperature\":25, \"humidity\":88}}]" + MARKDOWN_CODE_BLOCK_END ; - - private static String[] fetchLeafCertificateFromChain(String value) { - List chain = new ArrayList<>(); - String regex = "-----BEGIN CERTIFICATE-----\\s*.*?\\s*-----END CERTIFICATE-----"; - Pattern pattern = Pattern.compile(regex); - Matcher matcher = pattern.matcher(value); - while (matcher.find()) { - chain.add(matcher.group(0)); - } - return chain.toArray(new String[0]); - } } From eab633632a7cc0904ec4ae2002bef8a88c308050 Mon Sep 17 00:00:00 2001 From: dashevchenko Date: Wed, 28 Jun 2023 11:35:54 +0300 Subject: [PATCH 6/9] added cache for TBResourceInfo --- .../src/main/resources/thingsboard.yml | 3 ++ .../resourceinfo/ResourceInfoCacheKey.java | 45 +++++++++++++++++++ .../ResourceInfoCaffeineCache.java | 34 ++++++++++++++ .../resourceinfo/ResourceInfoEvictEvent.java | 23 ++++++++++ .../resourceinfo/ResourceInfoRedisCache.java | 35 +++++++++++++++ .../server/common/data/CacheConstants.java | 1 + .../dao/resource/BaseResourceService.java | 28 +++++++----- 7 files changed, 157 insertions(+), 12 deletions(-) create mode 100644 common/cache/src/main/java/org/thingsboard/server/cache/resourceinfo/ResourceInfoCacheKey.java create mode 100644 common/cache/src/main/java/org/thingsboard/server/cache/resourceinfo/ResourceInfoCaffeineCache.java create mode 100644 common/cache/src/main/java/org/thingsboard/server/cache/resourceinfo/ResourceInfoEvictEvent.java create mode 100644 common/cache/src/main/java/org/thingsboard/server/cache/resourceinfo/ResourceInfoRedisCache.java diff --git a/application/src/main/resources/thingsboard.yml b/application/src/main/resources/thingsboard.yml index 81654e3033..ba49ae5b23 100644 --- a/application/src/main/resources/thingsboard.yml +++ b/application/src/main/resources/thingsboard.yml @@ -497,6 +497,9 @@ cache: entityCount: timeToLiveInMinutes: "${CACHE_SPECS_ENTITY_COUNT_TTL:1440}" maxSize: "${CACHE_SPECS_ENTITY_COUNT_MAX_SIZE:100000}" + resourceInfo: + timeToLiveInMinutes: "${CACHE_SPECS_RESOURCE_INFO_TTL:1440}" + maxSize: "${CACHE_SPECS_USER_SETTINGS_MAX_SIZE:100000}" # deliberately placed outside 'specs' group above notificationRules: diff --git a/common/cache/src/main/java/org/thingsboard/server/cache/resourceinfo/ResourceInfoCacheKey.java b/common/cache/src/main/java/org/thingsboard/server/cache/resourceinfo/ResourceInfoCacheKey.java new file mode 100644 index 0000000000..670866e54b --- /dev/null +++ b/common/cache/src/main/java/org/thingsboard/server/cache/resourceinfo/ResourceInfoCacheKey.java @@ -0,0 +1,45 @@ +/** + * Copyright © 2016-2023 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.cache.resourceinfo; + +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.EqualsAndHashCode; +import lombok.Getter; +import lombok.NoArgsConstructor; +import lombok.RequiredArgsConstructor; +import org.thingsboard.server.common.data.id.DeviceId; +import org.thingsboard.server.common.data.id.TbResourceId; +import org.thingsboard.server.common.data.id.TenantId; + +import java.io.Serializable; +import java.util.UUID; + +@Getter +@EqualsAndHashCode +@RequiredArgsConstructor +@Builder +public class ResourceInfoCacheKey implements Serializable { + + private final TenantId tenantId; + private final TbResourceId tbResourceId; + + @Override + public String toString() { + return tenantId + "_" + tbResourceId; + } +} diff --git a/common/cache/src/main/java/org/thingsboard/server/cache/resourceinfo/ResourceInfoCaffeineCache.java b/common/cache/src/main/java/org/thingsboard/server/cache/resourceinfo/ResourceInfoCaffeineCache.java new file mode 100644 index 0000000000..371f2012cc --- /dev/null +++ b/common/cache/src/main/java/org/thingsboard/server/cache/resourceinfo/ResourceInfoCaffeineCache.java @@ -0,0 +1,34 @@ +/** + * Copyright © 2016-2023 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.cache.resourceinfo; + +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.cache.CacheManager; +import org.springframework.stereotype.Service; +import org.thingsboard.server.cache.CaffeineTbTransactionalCache; +import org.thingsboard.server.common.data.CacheConstants; +import org.thingsboard.server.common.data.TbResourceInfo; + + +@ConditionalOnProperty(prefix = "cache", value = "type", havingValue = "caffeine", matchIfMissing = true) +@Service("ResourceInfoCache") +public class ResourceInfoCaffeineCache extends CaffeineTbTransactionalCache { + + public ResourceInfoCaffeineCache(CacheManager cacheManager) { + super(cacheManager, CacheConstants.RESOURCE_INFO_CACHE); + } + +} diff --git a/common/cache/src/main/java/org/thingsboard/server/cache/resourceinfo/ResourceInfoEvictEvent.java b/common/cache/src/main/java/org/thingsboard/server/cache/resourceinfo/ResourceInfoEvictEvent.java new file mode 100644 index 0000000000..002510b314 --- /dev/null +++ b/common/cache/src/main/java/org/thingsboard/server/cache/resourceinfo/ResourceInfoEvictEvent.java @@ -0,0 +1,23 @@ +/** + * Copyright © 2016-2023 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.cache.resourceinfo; + +import lombok.Data; + +@Data +public class ResourceInfoEvictEvent { + private final ResourceInfoCacheKey key; +} diff --git a/common/cache/src/main/java/org/thingsboard/server/cache/resourceinfo/ResourceInfoRedisCache.java b/common/cache/src/main/java/org/thingsboard/server/cache/resourceinfo/ResourceInfoRedisCache.java new file mode 100644 index 0000000000..617367fb80 --- /dev/null +++ b/common/cache/src/main/java/org/thingsboard/server/cache/resourceinfo/ResourceInfoRedisCache.java @@ -0,0 +1,35 @@ +/** + * Copyright © 2016-2023 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.cache.resourceinfo; + +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.data.redis.connection.RedisConnectionFactory; +import org.springframework.stereotype.Service; +import org.thingsboard.server.cache.CacheSpecsMap; +import org.thingsboard.server.cache.RedisTbTransactionalCache; +import org.thingsboard.server.cache.TBRedisCacheConfiguration; +import org.thingsboard.server.cache.TbFSTRedisSerializer; +import org.thingsboard.server.common.data.CacheConstants; +import org.thingsboard.server.common.data.TbResourceInfo; + +@ConditionalOnProperty(prefix = "cache", value = "type", havingValue = "redis") +@Service("ResourceInfoCache") +public class ResourceInfoRedisCache extends RedisTbTransactionalCache { + + public ResourceInfoRedisCache(TBRedisCacheConfiguration configuration, CacheSpecsMap cacheSpecsMap, RedisConnectionFactory connectionFactory) { + super(CacheConstants.RESOURCE_INFO_CACHE, cacheSpecsMap, connectionFactory, configuration, new TbFSTRedisSerializer<>()); + } +} diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/CacheConstants.java b/common/data/src/main/java/org/thingsboard/server/common/data/CacheConstants.java index ff0032f435..f21b13a674 100644 --- a/common/data/src/main/java/org/thingsboard/server/common/data/CacheConstants.java +++ b/common/data/src/main/java/org/thingsboard/server/common/data/CacheConstants.java @@ -44,4 +44,5 @@ public class CacheConstants { public static final String USER_SETTINGS_CACHE = "userSettings"; public static final String DASHBOARD_TITLES_CACHE = "dashboardTitles"; public static final String ENTITY_COUNT_CACHE = "entityCount"; + public static final String RESOURCE_INFO_CACHE = "resourceInfo"; } diff --git a/dao/src/main/java/org/thingsboard/server/dao/resource/BaseResourceService.java b/dao/src/main/java/org/thingsboard/server/dao/resource/BaseResourceService.java index 974e3665f1..016b9c9d9a 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/resource/BaseResourceService.java +++ b/dao/src/main/java/org/thingsboard/server/dao/resource/BaseResourceService.java @@ -20,7 +20,10 @@ import lombok.AllArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.hibernate.exception.ConstraintViolationException; import org.springframework.stereotype.Service; +import org.springframework.transaction.event.TransactionalEventListener; +import org.thingsboard.server.cache.resourceinfo.ResourceInfoEvictEvent; import org.thingsboard.server.common.data.EntityType; +import org.thingsboard.server.cache.resourceinfo.ResourceInfoCacheKey; import org.thingsboard.server.common.data.ResourceType; import org.thingsboard.server.common.data.TbResource; import org.thingsboard.server.common.data.TbResourceInfo; @@ -31,6 +34,7 @@ import org.thingsboard.server.common.data.id.TbResourceId; import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.page.PageData; import org.thingsboard.server.common.data.page.PageLink; +import org.thingsboard.server.dao.entity.AbstractCachedEntityService; import org.thingsboard.server.dao.exception.DataValidationException; import org.thingsboard.server.dao.service.DataValidator; import org.thingsboard.server.dao.service.PaginatedRemover; @@ -45,7 +49,7 @@ import static org.thingsboard.server.dao.service.Validator.validateId; @Service("TbResourceDaoService") @Slf4j @AllArgsConstructor -public class BaseResourceService implements ResourceService { +public class BaseResourceService extends AbstractCachedEntityService implements ResourceService { public static final String INCORRECT_RESOURCE_ID = "Incorrect resourceId "; private final TbResourceDao resourceDao; @@ -55,10 +59,12 @@ public class BaseResourceService implements ResourceService { @Override public TbResource saveResource(TbResource resource) { resourceValidator.validate(resource, TbResourceInfo::getTenantId); - try { - return resourceDao.save(resource.getTenantId(), resource); + TbResource saved = resourceDao.save(resource.getTenantId(), resource); + publishEvictEvent(new ResourceInfoEvictEvent(new ResourceInfoCacheKey(resource.getTenantId(), resource.getId()))); + return saved; } catch (Exception t) { + publishEvictEvent(new ResourceInfoEvictEvent(new ResourceInfoCacheKey(resource.getTenantId(), resource.getId()))); ConstraintViolationException e = extractConstraintViolationException(t).orElse(null); if (e != null && e.getConstraintName() != null && e.getConstraintName().equalsIgnoreCase("resource_unq_key")) { String field = ResourceType.LWM2M_MODEL.equals(resource.getResourceType()) ? "resourceKey" : "fileName"; @@ -86,7 +92,9 @@ public class BaseResourceService implements ResourceService { public TbResourceInfo findResourceInfoById(TenantId tenantId, TbResourceId resourceId) { log.trace("Executing findResourceInfoById [{}] [{}]", tenantId, resourceId); Validator.validateId(resourceId, INCORRECT_RESOURCE_ID + resourceId); - return resourceInfoDao.findById(tenantId, resourceId.getId()); + + return cache.getAndPutInTransaction(new ResourceInfoCacheKey(tenantId, resourceId), + () -> resourceInfoDao.findById(tenantId, resourceId.getId()), true); } @Override @@ -169,13 +177,9 @@ public class BaseResourceService implements ResourceService { } }; - protected Optional extractConstraintViolationException(Exception t) { - if (t instanceof ConstraintViolationException) { - return Optional.of((ConstraintViolationException) t); - } else if (t.getCause() instanceof ConstraintViolationException) { - return Optional.of((ConstraintViolationException) (t.getCause())); - } else { - return Optional.empty(); - } + @TransactionalEventListener(classes = ResourceInfoCacheKey.class) + @Override + public void handleEvictEvent(ResourceInfoEvictEvent event) { + cache.evict(event.getKey()); } } From e2ba34bbf33a70d6b3307a7284e9c5890da711f3 Mon Sep 17 00:00:00 2001 From: dashevchenko Date: Wed, 28 Jun 2023 13:48:46 +0300 Subject: [PATCH 7/9] refactoring --- application/src/main/resources/thingsboard.yml | 2 +- .../ResourceInfoCacheKey.java | 7 +------ .../ResourceInfoCaffeineCache.java | 2 +- .../ResourceInfoEvictEvent.java | 7 +++++-- .../ResourceInfoRedisCache.java | 2 +- .../server/dao/resource/BaseResourceService.java | 15 +++++++++------ 6 files changed, 18 insertions(+), 17 deletions(-) rename common/cache/src/main/java/org/thingsboard/server/cache/{resourceinfo => resourceInfo}/ResourceInfoCacheKey.java (84%) rename common/cache/src/main/java/org/thingsboard/server/cache/{resourceinfo => resourceInfo}/ResourceInfoCaffeineCache.java (96%) rename common/cache/src/main/java/org/thingsboard/server/cache/{resourceinfo => resourceInfo}/ResourceInfoEvictEvent.java (73%) rename common/cache/src/main/java/org/thingsboard/server/cache/{resourceinfo => resourceInfo}/ResourceInfoRedisCache.java (97%) diff --git a/application/src/main/resources/thingsboard.yml b/application/src/main/resources/thingsboard.yml index ba49ae5b23..c8dfd1b029 100644 --- a/application/src/main/resources/thingsboard.yml +++ b/application/src/main/resources/thingsboard.yml @@ -499,7 +499,7 @@ cache: maxSize: "${CACHE_SPECS_ENTITY_COUNT_MAX_SIZE:100000}" resourceInfo: timeToLiveInMinutes: "${CACHE_SPECS_RESOURCE_INFO_TTL:1440}" - maxSize: "${CACHE_SPECS_USER_SETTINGS_MAX_SIZE:100000}" + maxSize: "${CACHE_SPECS_RESOURCE_INFO_MAX_SIZE:100000}" # deliberately placed outside 'specs' group above notificationRules: diff --git a/common/cache/src/main/java/org/thingsboard/server/cache/resourceinfo/ResourceInfoCacheKey.java b/common/cache/src/main/java/org/thingsboard/server/cache/resourceInfo/ResourceInfoCacheKey.java similarity index 84% rename from common/cache/src/main/java/org/thingsboard/server/cache/resourceinfo/ResourceInfoCacheKey.java rename to common/cache/src/main/java/org/thingsboard/server/cache/resourceInfo/ResourceInfoCacheKey.java index 670866e54b..9db53f86c6 100644 --- a/common/cache/src/main/java/org/thingsboard/server/cache/resourceinfo/ResourceInfoCacheKey.java +++ b/common/cache/src/main/java/org/thingsboard/server/cache/resourceInfo/ResourceInfoCacheKey.java @@ -13,21 +13,16 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.thingsboard.server.cache.resourceinfo; +package org.thingsboard.server.cache.resourceInfo; -import lombok.AllArgsConstructor; import lombok.Builder; -import lombok.Data; import lombok.EqualsAndHashCode; import lombok.Getter; -import lombok.NoArgsConstructor; import lombok.RequiredArgsConstructor; -import org.thingsboard.server.common.data.id.DeviceId; import org.thingsboard.server.common.data.id.TbResourceId; import org.thingsboard.server.common.data.id.TenantId; import java.io.Serializable; -import java.util.UUID; @Getter @EqualsAndHashCode diff --git a/common/cache/src/main/java/org/thingsboard/server/cache/resourceinfo/ResourceInfoCaffeineCache.java b/common/cache/src/main/java/org/thingsboard/server/cache/resourceInfo/ResourceInfoCaffeineCache.java similarity index 96% rename from common/cache/src/main/java/org/thingsboard/server/cache/resourceinfo/ResourceInfoCaffeineCache.java rename to common/cache/src/main/java/org/thingsboard/server/cache/resourceInfo/ResourceInfoCaffeineCache.java index 371f2012cc..95754d891a 100644 --- a/common/cache/src/main/java/org/thingsboard/server/cache/resourceinfo/ResourceInfoCaffeineCache.java +++ b/common/cache/src/main/java/org/thingsboard/server/cache/resourceInfo/ResourceInfoCaffeineCache.java @@ -13,7 +13,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.thingsboard.server.cache.resourceinfo; +package org.thingsboard.server.cache.resourceInfo; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.cache.CacheManager; diff --git a/common/cache/src/main/java/org/thingsboard/server/cache/resourceinfo/ResourceInfoEvictEvent.java b/common/cache/src/main/java/org/thingsboard/server/cache/resourceInfo/ResourceInfoEvictEvent.java similarity index 73% rename from common/cache/src/main/java/org/thingsboard/server/cache/resourceinfo/ResourceInfoEvictEvent.java rename to common/cache/src/main/java/org/thingsboard/server/cache/resourceInfo/ResourceInfoEvictEvent.java index 002510b314..11272a5e24 100644 --- a/common/cache/src/main/java/org/thingsboard/server/cache/resourceinfo/ResourceInfoEvictEvent.java +++ b/common/cache/src/main/java/org/thingsboard/server/cache/resourceInfo/ResourceInfoEvictEvent.java @@ -13,11 +13,14 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.thingsboard.server.cache.resourceinfo; +package org.thingsboard.server.cache.resourceInfo; import lombok.Data; +import org.thingsboard.server.common.data.id.TbResourceId; +import org.thingsboard.server.common.data.id.TenantId; @Data public class ResourceInfoEvictEvent { - private final ResourceInfoCacheKey key; + private final TenantId tenantId; + private final TbResourceId resourceId; } diff --git a/common/cache/src/main/java/org/thingsboard/server/cache/resourceinfo/ResourceInfoRedisCache.java b/common/cache/src/main/java/org/thingsboard/server/cache/resourceInfo/ResourceInfoRedisCache.java similarity index 97% rename from common/cache/src/main/java/org/thingsboard/server/cache/resourceinfo/ResourceInfoRedisCache.java rename to common/cache/src/main/java/org/thingsboard/server/cache/resourceInfo/ResourceInfoRedisCache.java index 617367fb80..fee14e1ca1 100644 --- a/common/cache/src/main/java/org/thingsboard/server/cache/resourceinfo/ResourceInfoRedisCache.java +++ b/common/cache/src/main/java/org/thingsboard/server/cache/resourceInfo/ResourceInfoRedisCache.java @@ -13,7 +13,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.thingsboard.server.cache.resourceinfo; +package org.thingsboard.server.cache.resourceInfo; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.data.redis.connection.RedisConnectionFactory; diff --git a/dao/src/main/java/org/thingsboard/server/dao/resource/BaseResourceService.java b/dao/src/main/java/org/thingsboard/server/dao/resource/BaseResourceService.java index 016b9c9d9a..bc4f47040b 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/resource/BaseResourceService.java +++ b/dao/src/main/java/org/thingsboard/server/dao/resource/BaseResourceService.java @@ -21,9 +21,10 @@ import lombok.extern.slf4j.Slf4j; import org.hibernate.exception.ConstraintViolationException; import org.springframework.stereotype.Service; import org.springframework.transaction.event.TransactionalEventListener; -import org.thingsboard.server.cache.resourceinfo.ResourceInfoEvictEvent; +import org.thingsboard.server.cache.device.DeviceCacheKey; +import org.thingsboard.server.cache.resourceInfo.ResourceInfoEvictEvent; import org.thingsboard.server.common.data.EntityType; -import org.thingsboard.server.cache.resourceinfo.ResourceInfoCacheKey; +import org.thingsboard.server.cache.resourceInfo.ResourceInfoCacheKey; import org.thingsboard.server.common.data.ResourceType; import org.thingsboard.server.common.data.TbResource; import org.thingsboard.server.common.data.TbResourceInfo; @@ -61,10 +62,10 @@ public class BaseResourceService extends AbstractCachedEntityService Date: Wed, 28 Jun 2023 14:50:17 +0300 Subject: [PATCH 8/9] added cache config properties --- dao/src/test/resources/application-test.properties | 3 +++ 1 file changed, 3 insertions(+) diff --git a/dao/src/test/resources/application-test.properties b/dao/src/test/resources/application-test.properties index d89211cb2f..98f9091318 100644 --- a/dao/src/test/resources/application-test.properties +++ b/dao/src/test/resources/application-test.properties @@ -74,6 +74,9 @@ cache.specs.dashboardTitles.maxSize=10000 cache.specs.entityCount.timeToLiveInMinutes=1440 cache.specs.entityCount.maxSize=10000 +cache.specs.resourceInfo.timeToLiveInMinutes=1440 +cache.specs.resourceInfo.maxSize=10000 + redis.connection.host=localhost redis.connection.port=6379 redis.connection.db=0 From 66ec7e523fa2b7d74816bcfe332047129bbd7140 Mon Sep 17 00:00:00 2001 From: nickAS21 Date: Thu, 29 Jun 2023 18:33:51 +0300 Subject: [PATCH 9/9] swagger_device_controller: refactoring device credentials with 4 mode security --- .../controller/ControllerConstants.java | 151 ++++++++++++++---- .../server/controller/DeviceController.java | 29 +++- 2 files changed, 148 insertions(+), 32 deletions(-) diff --git a/application/src/main/java/org/thingsboard/server/controller/ControllerConstants.java b/application/src/main/java/org/thingsboard/server/controller/ControllerConstants.java index 7701a19f02..a6a49f6b3c 100644 --- a/application/src/main/java/org/thingsboard/server/controller/ControllerConstants.java +++ b/application/src/main/java/org/thingsboard/server/controller/ControllerConstants.java @@ -223,6 +223,19 @@ public class ControllerConstants { " }\n" + "}"; + protected static final String DEVICE_UPDATE_CREDENTIALS_ACCESS_TOKEN_PARAM_DESCRIPTION = + "{\n" + + " \"id\": {\n" + + " \"id\":\"c886a090-168d-11ee-87c9-6f157dbc816a\"\n" + + " },\n" + + " \"deviceId\": {\n" + + " \"id\":\"c5fb3ac0-168d-11ee-87c9-6f157dbc816a\",\n" + + " \"entityType\":\"DEVICE\"\n" + + " },\n" + + " \"credentialsType\": \"ACCESS_TOKEN\",\n" + + " \"credentialsId\": \"6hmxew8pmmzng4e3une4\"\n" + + "}"; + protected static final String DEVICE_WITH_DEVICE_CREDENTIALS_ACCESS_TOKEN_DEFAULT_PARAM_DESCRIPTION = "{\n" + " \"device\": {\n" + @@ -242,40 +255,75 @@ public class ControllerConstants { protected static final String certificateId = "\"84f5911765abba1f96bf4165604e9e90338fc6214081a8e623b6ff9669aedb27\""; + protected static final String certificateValueUpdate = "\"-----BEGIN CERTIFICATE----- " + + "MIICMTCCAdegAwIBAgIUUEKxS9hTz4l+oLUMF0LV6TC/gCIwCgYIKoZIzj0EAwIwbjELMAkGA1UEBhMCVVMxETAPBgNVBAgMCE5ldyBZb3JrMRowGAYDVQQKDBFUaGluZ3NCb2FyZCwgSW5jLjEwMC4GA1UEAwwnZGV2aWNlUHJvZmlsZUNlcnRAWDUwOVByb3Zpc2lvblN0cmF0ZWd5MB4XDTIzMDMyOTE0NTczNloXDTI0MDMyODE0NTczNlowbjELMAkGA1UEBhMCVVMxETAPBgNVBAgMCE5ldyBZb3JrMRowGAYDVQQKDBFUaGluZ3NCb2FyZCwgSW5jLjEwMC4GA1UEAwwnZGV2aWNlUHJvZmlsZUNlcnRAWDUwOVByb3Zpc2lvblN0cmF0ZWd5MFkwEwYHKoZIzj0CAQYIKoZIzj0DAQcDQgAECMlWO72krDoUL9FQjUmSCetkhaEGJUfQkdSfkLSNa0GyAEIMbfmzI4zITeapunu4rGet3EMyLydQzuQanBicp6NTMFEwHQYDVR0OBBYEFHpZ78tPnztNii4Da/yCw6mhEIL3MB8GA1UdIwQYMBaAFHpZ78tPnztNii4Da/yCw6mhEIL3MA8GA1UdEwEB/wQFMAMBAf8wCgYIKoZIzj0EAwIDSAAwRQIgJ7qyMFqNcwSYkH6o+UlQXzLWfwZbNjVk+aR7foAZNGsCIQDsd7v3WQIGHiArfZeDs1DLEDuV/2h6L+ZNoGNhEKL+1A== " + + "-----END CERTIFICATE-----\""; + + protected static final String certificateIdUpdate = "\"6b8adb49015500e51a527acd332b51684ab9b49b4ade03a9582a44c455e2e9b6\""; + protected static final String DEVICE_WITH_DEVICE_CREDENTIALS_X509_CERTIFICATE_PARAM_DESCRIPTION = "{\n" + - " \"device\": {\n" + - " \"name\":\"Name_DeviceWithCredantial_X509_Certificate\",\n" + - " \"label\":\"Label_DeviceWithCredantial_X509_Certificate\",\n" + - " \"deviceProfileId\":{\n" + - " \"id\":\"9d9588c0-06c9-11ee-b618-19be30fdeb60\",\n" + - " \"entityType\":\"DEVICE_PROFILE\"\n" + - " }\n" + - " },\n" + - " \"credentials\": {\n" + - " \"credentialsType\": \"X509_CERTIFICATE\",\n" + - " \"credentialsId\": " + certificateId + ",\n" + - " \"credentialsValue\": " + certificateValue + "\n" + - " }\n" + - "}"; + " \"device\": {\n" + + " \"name\":\"Name_DeviceWithCredantial_X509_Certificate\",\n" + + " \"label\":\"Label_DeviceWithCredantial_X509_Certificate\",\n" + + " \"deviceProfileId\":{\n" + + " \"id\":\"9d9588c0-06c9-11ee-b618-19be30fdeb60\",\n" + + " \"entityType\":\"DEVICE_PROFILE\"\n" + + " }\n" + + " },\n" + + " \"credentials\": {\n" + + " \"credentialsType\": \"X509_CERTIFICATE\",\n" + + " \"credentialsId\": " + certificateId + ",\n" + + " \"credentialsValue\": " + certificateValue + "\n" + + " }\n" + + "}"; + + protected static final String DEVICE_UPDATE_CREDENTIALS_X509_CERTIFICATE_PARAM_DESCRIPTION = + "{\n" + + " \"id\": {\n" + + " \"id\":\"309bd9c0-14f4-11ee-9fc9-d9b7463abb63\"\n" + + " },\n" + + " \"deviceId\": {\n" + + " \"id\":\"3092b200-14f4-11ee-9fc9-d9b7463abb63\",\n" + + " \"entityType\":\"DEVICE\"\n" + + " },\n" + + " \"credentialsType\": \"X509_CERTIFICATE\",\n" + + " \"credentialsId\": " + certificateIdUpdate + ",\n" + + " \"credentialsValue\": " + certificateValueUpdate + "\n" + + "}"; protected static final String MQTT_BASIC_VALUE = "\"{\\\"clientId\\\":\\\"5euh5nzm34bjjh1efmlt\\\",\\\"userName\\\":\\\"onasd1lgwasmjl7v2v7h\\\",\\\"password\\\":\\\"b9xtm4ny8kt9zewaga5o\\\"}\""; + protected static final String MQTT_BASIC_VALUE_UPDATE = "\"{\\\"clientId\\\":\\\"juy03yv4owqxcmqhqtvk\\\",\\\"userName\\\":\\\"ov19fxca0cyjn7lm7w7u\\\",\\\"password\\\":\\\"twy94he114dfi9usyk1o\\\"}\""; + protected static final String DEVICE_WITH_DEVICE_CREDENTIALS_MQTT_BASIC_PARAM_DESCRIPTION = "{\n" + - " \"device\": {\n" + - " \"name\":\"Name_DeviceWithCredantial_MQTT_Basic\",\n" + - " \"label\":\"Label_DeviceWithCredantial_MQTT_Basic\",\n" + - " \"deviceProfileId\":{\n" + - " \"id\":\"9d9588c0-06c9-11ee-b618-19be30fdeb60\",\n" + - " \"entityType\":\"DEVICE_PROFILE\"\n" + - " }\n" + - " },\n" + - " \"credentials\": {\n" + - " \"credentialsType\": \"MQTT_BASIC\",\n" + - " \"credentialsValue\": " + MQTT_BASIC_VALUE + "\n" + - " }\n" + - "}"; + " \"device\": {\n" + + " \"name\":\"Name_DeviceWithCredantial_MQTT_Basic\",\n" + + " \"label\":\"Label_DeviceWithCredantial_MQTT_Basic\",\n" + + " \"deviceProfileId\":{\n" + + " \"id\":\"9d9588c0-06c9-11ee-b618-19be30fdeb60\",\n" + + " \"entityType\":\"DEVICE_PROFILE\"\n" + + " }\n" + + " },\n" + + " \"credentials\": {\n" + + " \"credentialsType\": \"MQTT_BASIC\",\n" + + " \"credentialsValue\": " + MQTT_BASIC_VALUE + "\n" + + " }\n" + + "}"; + + protected static final String DEVICE_UPDATE_CREDENTIALS_MQTT_BASIC_PARAM_DESCRIPTION = + "{\n" + + " \"id\": {\n" + + " \"id\":\"d877ffb0-14f5-11ee-9fc9-d9b7463abb63\"\n" + + " },\n" + + " \"deviceId\": {\n" + + " \"id\":\"d875dcd0-14f5-11ee-9fc9-d9b7463abb63\",\n" + + " \"entityType\":\"DEVICE\"\n" + + " },\n" + + " \"credentialsType\": \"MQTT_BASIC\",\n" + + " \"credentialsValue\": " + MQTT_BASIC_VALUE_UPDATE + "\n" + + "}"; protected static final String CREDENTIALS_VALUE_LVM2M_RPK_DESCRIPTION = " \"{" + @@ -297,6 +345,26 @@ public class ControllerConstants { "} " + "}\""; + protected static final String CREDENTIALS_VALUE_UPDATE_LVM2M_RPK_DESCRIPTION = + " \"{" + + "\\\"client\\\":{ " + + "\\\"endpoint\\\":\\\"LwRpk00000000\\\", " + + "\\\"securityConfigClientMode\\\":\\\"RPK\\\", " + + "\\\"key\\\":\\\"MFkwEwYHKoZIzj0CAQYIKoZIzj0DAQcDQgAEdvBZZ2vQRK9wgDhctj6B1c7bxR3Z0wYg1+YdoYFnVUKWb+rIfTTyYK9tmQJx5Vlb5fxdLnVv1RJOPiwsLIQbAA==\\\"" + + " }, " + + "\\\"bootstrap\\\":{ " + + "\\\"bootstrapServer\\\":{ " + + "\\\"securityMode\\\":\\\"RPK\\\", " + + "\\\"clientPublicKeyOrId\\\":\\\"MFkwEwYHKoZIzj0CAQYIKoZIzj0DAQcDQgAEUEBxNl/RcYJNm8mk91CyVXoIJiROYDlXcSSqK6e5bDHwOW4ZiN2lNnXalyF0Jxw8MbAytnDMERXyAja5VEMeVQ==\\\", " + + "\\\"clientSecretKey\\\":\\\"MIGHAgEAMBMGByqGSM49AgEGCCqGSM49AwEHBG0wawIBAQQgd9GAx7yZW37autew5KZykn4IgRpge/tZSjnudnZJnMahRANCAARQQHE2X9Fxgk2byaT3ULJVeggmJE5gOVdxJKorp7lsMfA5bhmI3aU2ddqXIXQnHDwxsDK2cMwRFfICNrlUQx5V\\\"" + + "}, " + + "\\\"lwm2mServer\\\":{ \\\"securityMode\\\":\\\"RPK\\\", " + + "\\\"clientPublicKeyOrId\\\":\\\"MFkwEwYHKoZIzj0CAQYIKoZIzj0DAQcDQgAEUEBxNl/RcYJNm8mk91CyVXoIJiROYDlXcSSqK6e5bDHwOW4ZiN2lNnXalyF0Jxw8MbAytnDMERXyAja5VEMeVQ==\\\", " + + "\\\"clientSecretKey\\\":\\\"MIGHAgEAMBMGByqGSM49AgEGCCqGSM49AwEHBG0wawIBAQQgd9GAx7yZW37autew5KZykn4IgRpge/tZSjnudnZJnMahRANCAARQQHE2X9Fxgk2byaT3ULJVeggmJE5gOVdxJKorp7lsMfA5bhmI3aU2ddqXIXQnHDwxsDK2cMwRFfICNrlUQx5V\\\"" + + "}" + + "} " + + "}\""; + protected static final String DEVICE_WITH_DEVICE_CREDENTIALS_PARAM_LVM2M_RPK_DESCRIPTION = "{\n" + " \"device\": {\n" + @@ -314,6 +382,20 @@ public class ControllerConstants { " }\n" + "}"; + protected static final String DEVICE_UPDATE_CREDENTIALS_PARAM_LVM2M_RPK_DESCRIPTION = + "{\n" + + " \"id\": {\n" + + " \"id\":\"e238d4d0-1689-11ee-98c6-1713c1be5a8e\"\n" + + " },\n" + + " \"deviceId\": {\n" + + " \"id\":\"e232e160-1689-11ee-98c6-1713c1be5a8e\",\n" + + " \"entityType\":\"DEVICE\"\n" + + " },\n" + + " \"credentialsType\": \"LWM2M_CREDENTIALS\",\n" + + " \"credentialsId\": \"LwRpk00000000\",\n" + + " \"credentialsValue\":\n" + CREDENTIALS_VALUE_UPDATE_LVM2M_RPK_DESCRIPTION + "\n" + + "}"; + protected static final String DEVICE_WITH_DEVICE_CREDENTIALS_PARAM_ACCESS_TOKEN_DESCRIPTION_MARKDOWN = MARKDOWN_CODE_BLOCK_START + DEVICE_WITH_DEVICE_CREDENTIALS_ACCESS_TOKEN_PARAM_DESCRIPTION + MARKDOWN_CODE_BLOCK_END; @@ -329,8 +411,21 @@ public class ControllerConstants { protected static final String DEVICE_WITH_DEVICE_CREDENTIALS_PARAM_LVM2M_RPK_DESCRIPTION_MARKDOWN = MARKDOWN_CODE_BLOCK_START + DEVICE_WITH_DEVICE_CREDENTIALS_PARAM_LVM2M_RPK_DESCRIPTION + MARKDOWN_CODE_BLOCK_END; + protected static final String DEVICE_UPDATE_CREDENTIALS_PARAM_ACCESS_TOKEN_DESCRIPTION_MARKDOWN = + MARKDOWN_CODE_BLOCK_START + DEVICE_UPDATE_CREDENTIALS_ACCESS_TOKEN_PARAM_DESCRIPTION + MARKDOWN_CODE_BLOCK_END; + + protected static final String DEVICE_UPDATE_CREDENTIALS_PARAM_X509_CERTIFICATE_DESCRIPTION_MARKDOWN = + MARKDOWN_CODE_BLOCK_START + DEVICE_UPDATE_CREDENTIALS_X509_CERTIFICATE_PARAM_DESCRIPTION + MARKDOWN_CODE_BLOCK_END; + + protected static final String DEVICE_UPDATE_CREDENTIALS_PARAM_MQTT_BASIC_DESCRIPTION_MARKDOWN = + MARKDOWN_CODE_BLOCK_START + DEVICE_UPDATE_CREDENTIALS_MQTT_BASIC_PARAM_DESCRIPTION + MARKDOWN_CODE_BLOCK_END; + + protected static final String DEVICE_UPDATE_CREDENTIALS_PARAM_LVM2M_RPK_DESCRIPTION_MARKDOWN = + MARKDOWN_CODE_BLOCK_START + DEVICE_UPDATE_CREDENTIALS_PARAM_LVM2M_RPK_DESCRIPTION + MARKDOWN_CODE_BLOCK_END; + + - protected static final String FILTER_VALUE_TYPE = NEW_LINE + "## Value Type and Operations" + NEW_LINE + + protected static final String FILTER_VALUE_TYPE = NEW_LINE + "## Value Type and Operations" + NEW_LINE + "Provides a hint about the data type of the entity field that is defined in the filter key. " + "The value type impacts the list of possible operations that you may use in the corresponding predicate. For example, you may use 'STARTS_WITH' or 'END_WITH', but you can't use 'GREATER_OR_EQUAL' for string values." + "The following filter value types and corresponding predicate operations are supported: " + NEW_LINE + diff --git a/application/src/main/java/org/thingsboard/server/controller/DeviceController.java b/application/src/main/java/org/thingsboard/server/controller/DeviceController.java index f033b2758f..8be76856a4 100644 --- a/application/src/main/java/org/thingsboard/server/controller/DeviceController.java +++ b/application/src/main/java/org/thingsboard/server/controller/DeviceController.java @@ -93,6 +93,10 @@ import static org.thingsboard.server.controller.ControllerConstants.DEVICE_PROFI import static org.thingsboard.server.controller.ControllerConstants.DEVICE_SORT_PROPERTY_ALLOWABLE_VALUES; import static org.thingsboard.server.controller.ControllerConstants.DEVICE_TEXT_SEARCH_DESCRIPTION; import static org.thingsboard.server.controller.ControllerConstants.DEVICE_TYPE_DESCRIPTION; +import static org.thingsboard.server.controller.ControllerConstants.DEVICE_UPDATE_CREDENTIALS_PARAM_ACCESS_TOKEN_DESCRIPTION_MARKDOWN; +import static org.thingsboard.server.controller.ControllerConstants.DEVICE_UPDATE_CREDENTIALS_PARAM_LVM2M_RPK_DESCRIPTION_MARKDOWN; +import static org.thingsboard.server.controller.ControllerConstants.DEVICE_UPDATE_CREDENTIALS_PARAM_MQTT_BASIC_DESCRIPTION_MARKDOWN; +import static org.thingsboard.server.controller.ControllerConstants.DEVICE_UPDATE_CREDENTIALS_PARAM_X509_CERTIFICATE_DESCRIPTION_MARKDOWN; import static org.thingsboard.server.controller.ControllerConstants.DEVICE_WITH_DEVICE_CREDENTIALS_PARAM_ACCESS_TOKEN_DEFAULT_DESCRIPTION_MARKDOWN; import static org.thingsboard.server.controller.ControllerConstants.DEVICE_WITH_DEVICE_CREDENTIALS_PARAM_ACCESS_TOKEN_DESCRIPTION_MARKDOWN; import static org.thingsboard.server.controller.ControllerConstants.DEVICE_WITH_DEVICE_CREDENTIALS_PARAM_LVM2M_RPK_DESCRIPTION_MARKDOWN; @@ -292,10 +296,27 @@ public class DeviceController extends BaseController { return tbDeviceService.getDeviceCredentialsByDeviceId(device, getCurrentUser()); } - @ApiOperation(value = "Update device credentials (updateDeviceCredentials)", notes = "During device creation, platform generates random 'ACCESS_TOKEN' credentials. " + - "Use this method to update the device credentials. First use 'getDeviceCredentialsByDeviceId' to get the credentials id and value. " + - "Then use current method to update the credentials type and value. It is not possible to create multiple device credentials for the same device. " + - "The structure of device credentials id and value is simple for the 'ACCESS_TOKEN' but is much more complex for the 'MQTT_BASIC' or 'LWM2M_CREDENTIALS'." + TENANT_AUTHORITY_PARAGRAPH) + @ApiOperation(value = "Update device credentials (updateDeviceCredentials)", + notes = "During device creation, platform generates random 'ACCESS_TOKEN' credentials. \" +\n" + + "Use this method to update the device credentials. First use 'getDeviceCredentialsByDeviceId' to get the credentials id and value.\n" + + "Then use current method to update the credentials type and value. It is not possible to create multiple device credentials for the same device.\n" + + "The structure of device credentials id and value is simple for the 'ACCESS_TOKEN' but is much more complex for the 'MQTT_BASIC' or 'LWM2M_CREDENTIALS'.\n" + + "You may find the example of device with different type of credentials below: \n\n" + + "- Credentials type: \"Access token\" with device ID and with device ID below: \n\n" + + DEVICE_UPDATE_CREDENTIALS_PARAM_ACCESS_TOKEN_DESCRIPTION_MARKDOWN + "\n\n" + + "- Credentials type: \"X509\" with device profile ID below: \n\n" + + "Note: credentialsId - format Sha3Hash, certificateValue - format PEM (with \"--BEGIN CERTIFICATE----\" and -\"----END CERTIFICATE-\").\n\n" + + DEVICE_UPDATE_CREDENTIALS_PARAM_X509_CERTIFICATE_DESCRIPTION_MARKDOWN + "\n\n" + + "- Credentials type: \"MQTT_BASIC\" with device profile ID below: \n\n" + + DEVICE_UPDATE_CREDENTIALS_PARAM_MQTT_BASIC_DESCRIPTION_MARKDOWN + "\n\n" + + "- You may find the example of LwM2M device and RPK credentials below: \n\n" + + "Note: LwM2M device - only existing device profile ID (Transport configuration -> Transport type: \"LWM2M\".\n\n" + + DEVICE_UPDATE_CREDENTIALS_PARAM_LVM2M_RPK_DESCRIPTION_MARKDOWN + "\n\n" + + "Update to real value:\n" + + " - 'id' (this is id of Device Credentials -> \"Get Device Credentials (getDeviceCredentialsByDeviceId)\",\n" + + " - 'deviceId.id' (this is id of Device).\n" + + "Remove 'tenantId' and optionally 'customerId' from the request body example (below) to create new Device entity." + + TENANT_AUTHORITY_PARAGRAPH) @PreAuthorize("hasAuthority('TENANT_ADMIN')") @RequestMapping(value = "/device/credentials", method = RequestMethod.POST) @ResponseBody