diff --git a/application/src/main/data/upgrade/3.5.1/schema_update.sql b/application/src/main/data/upgrade/3.5.1/schema_update.sql
index 58031ce5c0..1655ecb978 100644
--- a/application/src/main/data/upgrade/3.5.1/schema_update.sql
+++ b/application/src/main/data/upgrade/3.5.1/schema_update.sql
@@ -53,6 +53,69 @@ $$;
-- NOTIFICATION CONFIGS VERSION CONTROL END
+-- EDGE EVENTS MIGRATION START
+DO
+$$
+ DECLARE table_partition RECORD;
+ BEGIN
+ -- in case of running the upgrade script a second time:
+ IF NOT (SELECT exists(SELECT FROM pg_tables WHERE tablename = 'old_edge_event')) THEN
+ ALTER TABLE edge_event RENAME TO old_edge_event;
+ CREATE INDEX IF NOT EXISTS idx_old_edge_event_created_time_tmp ON old_edge_event(created_time);
+ ALTER INDEX IF EXISTS idx_edge_event_tenant_id_and_created_time RENAME TO idx_old_edge_event_tenant_id_and_created_time;
+
+ FOR table_partition IN SELECT tablename AS name, split_part(tablename, '_', 3) AS partition_ts
+ FROM pg_tables WHERE tablename LIKE 'edge_event_%'
+ LOOP
+ EXECUTE format('ALTER TABLE %s RENAME TO old_edge_event_%s', table_partition.name, table_partition.partition_ts);
+ END LOOP;
+ ELSE
+ RAISE NOTICE 'Table old_edge_event already exists, leaving as is';
+ END IF;
+ END;
+$$;
+
+CREATE TABLE IF NOT EXISTS edge_event (
+ seq_id INT GENERATED ALWAYS AS IDENTITY,
+ id uuid NOT NULL,
+ created_time bigint NOT NULL,
+ edge_id uuid,
+ edge_event_type varchar(255),
+ edge_event_uid varchar(255),
+ entity_id uuid,
+ edge_event_action varchar(255),
+ body varchar(10000000),
+ tenant_id uuid,
+ ts bigint NOT NULL
+) PARTITION BY RANGE (created_time);
+CREATE INDEX IF NOT EXISTS idx_edge_event_tenant_id_and_created_time ON edge_event(tenant_id, created_time DESC);
+CREATE INDEX IF NOT EXISTS idx_edge_event_id ON edge_event(id);
+ALTER TABLE IF EXISTS edge_event ALTER COLUMN seq_id SET CYCLE;
+
+CREATE OR REPLACE PROCEDURE migrate_edge_event(IN start_time_ms BIGINT, IN end_time_ms BIGINT, IN partition_size_ms BIGINT)
+ LANGUAGE plpgsql AS
+$$
+DECLARE
+ p RECORD;
+ partition_end_ts BIGINT;
+BEGIN
+ FOR p IN SELECT DISTINCT (created_time - created_time % partition_size_ms) AS partition_ts FROM old_edge_event
+ WHERE created_time >= start_time_ms AND created_time < end_time_ms
+ LOOP
+ partition_end_ts = p.partition_ts + partition_size_ms;
+ RAISE NOTICE '[edge_event] Partition to create : [%-%]', p.partition_ts, partition_end_ts;
+ EXECUTE format('CREATE TABLE IF NOT EXISTS edge_event_%s PARTITION OF edge_event ' ||
+ 'FOR VALUES FROM ( %s ) TO ( %s )', p.partition_ts, p.partition_ts, partition_end_ts);
+ END LOOP;
+
+ INSERT INTO edge_event (id, created_time, edge_id, edge_event_type, edge_event_uid, entity_id, edge_event_action, body, tenant_id, ts)
+ SELECT id, created_time, edge_id, edge_event_type, edge_event_uid, entity_id, edge_event_action, body, tenant_id, ts
+ FROM old_edge_event
+ WHERE created_time >= start_time_ms AND created_time < end_time_ms;
+END;
+$$;
+-- EDGE EVENTS MIGRATION END
+
ALTER TABLE resource
ADD COLUMN IF NOT EXISTS etag varchar;
diff --git a/application/src/main/java/org/thingsboard/server/actors/app/AppActor.java b/application/src/main/java/org/thingsboard/server/actors/app/AppActor.java
index fb6fbbdff2..1461654216 100644
--- a/application/src/main/java/org/thingsboard/server/actors/app/AppActor.java
+++ b/application/src/main/java/org/thingsboard/server/actors/app/AppActor.java
@@ -73,10 +73,14 @@ public class AppActor extends ContextAwareActor {
@Override
protected boolean doProcess(TbActorMsg msg) {
if (!ruleChainsInitialized) {
- initTenantActors();
- ruleChainsInitialized = true;
- if (msg.getMsgType() != MsgType.APP_INIT_MSG && msg.getMsgType() != MsgType.PARTITION_CHANGE_MSG) {
- log.warn("Rule Chains initialized by unexpected message: {}", msg);
+ if (MsgType.APP_INIT_MSG.equals(msg.getMsgType())) {
+ initTenantActors();
+ ruleChainsInitialized = true;
+ } else {
+ if (!msg.getMsgType().isIgnoreOnStart()) {
+ log.warn("Attempt to initialize Rule Chains by unexpected message: {}", msg);
+ }
+ return true;
}
}
switch (msg.getMsgType()) {
diff --git a/application/src/main/java/org/thingsboard/server/controller/ControllerConstants.java b/application/src/main/java/org/thingsboard/server/controller/ControllerConstants.java
index 1cb794c2ec..a6a49f6b3c 100644
--- a/application/src/main/java/org/thingsboard/server/controller/ControllerConstants.java
+++ b/application/src/main/java/org/thingsboard/server/controller/ControllerConstants.java
@@ -207,42 +207,222 @@ public class ControllerConstants {
protected static final String IS_BOOTSTRAP_SERVER_PARAM_DESCRIPTION = "A Boolean value representing the Server SecurityInfo for future Bootstrap client mode settings. Values: 'true' for Bootstrap Server; 'false' for Lwm2m Server. ";
- protected static final String DEVICE_WITH_DEVICE_CREDENTIALS_PARAM_DESCRIPTION =
+ protected static final String DEVICE_WITH_DEVICE_CREDENTIALS_ACCESS_TOKEN_PARAM_DESCRIPTION =
"{\n" +
" \"device\": {\n" +
- " \"name\": \"LwRpk00000000\",\n" +
- " \"type\": \"lwm2mProfileRpk\"\n" +
- " },\n" +
+ " \"name\":\"Name_DeviceWithCredantial_AccessToken\",\n" +
+ " \"label\":\"Label_DeviceWithCredantial_AccessToken\",\n" +
+ " \"deviceProfileId\":{\n" +
+ " \"id\":\"9d9588c0-06c9-11ee-b618-19be30fdeb60\",\n" +
+ " \"entityType\":\"DEVICE_PROFILE\"\n" +
+ " }\n" +
+ " },\n" +
" \"credentials\": {\n" +
- " \"id\": \"null\",\n" +
- " \"createdTime\": 0,\n" +
- " \"deviceId\": \"null\",\n" +
- " \"credentialsType\": \"LWM2M_CREDENTIALS\",\n" +
- " \"credentialsId\": \"LwRpk00000000\",\n" +
- " \"credentialsValue\": {\n" +
- " \"client\": {\n" +
- " \"endpoint\": \"LwRpk00000000\",\n" +
- " \"securityConfigClientMode\": \"RPK\",\n" +
- " \"key\": \"MFkwEwYHKoZIzj0CAQYIKoZIzj0DAQcDQgAEUEBxNl/RcYJNm8mk91CyVXoIJiROYDlXcSSqK6e5bDHwOW4ZiN2lNnXalyF0Jxw8MbAytnDMERXyAja5VEMeVQ==\"\n" +
- " },\n" +
- " \"bootstrap\": {\n" +
- " \"bootstrapServer\": {\n" +
- " \"securityMode\": \"RPK\",\n" +
- " \"clientPublicKeyOrId\": \"MFkwEwYHKoZIzj0CAQYIKoZIzj0DAQcDQgAEUEBxNl/RcYJNm8mk91CyVXoIJiROYDlXcSSqK6e5bDHwOW4ZiN2lNnXalyF0Jxw8MbAytnDMERXyAja5VEMeVQ==\",\n" +
- " \"clientSecretKey\": \"MIGHAgEAMBMGByqGSM49AgEGCCqGSM49AwEHBG0wawIBAQQgd9GAx7yZW37autew5KZykn4IgRpge/tZSjnudnZJnMahRANCAARQQHE2X9Fxgk2byaT3ULJVeggmJE5gOVdxJKorp7lsMfA5bhmI3aU2ddqXIXQnHDwxsDK2cMwRFfICNrlUQx5V\"\n" +
- " },\n" +
- " \"lwm2mServer\": {\n" +
- " \"securityMode\": \"RPK\",\n" +
- " \"clientPublicKeyOrId\": \"MFkwEwYHKoZIzj0CAQYIKoZIzj0DAQcDQgAEUEBxNl/RcYJNm8mk91CyVXoIJiROYDlXcSSqK6e5bDHwOW4ZiN2lNnXalyF0Jxw8MbAytnDMERXyAja5VEMeVQ==\",\n" +
- " \"clientSecretKey\": \"MIGHAgEAMBMGByqGSM49AgEGCCqGSM49AwEHBG0wawIBAQQgd9GAx7yZW37autew5KZykn4IgRpge/tZSjnudnZJnMahRANCAARQQHE2X9Fxgk2byaT3ULJVeggmJE5gOVdxJKorp7lsMfA5bhmI3aU2ddqXIXQnHDwxsDK2cMwRFfICNrlUQx5V\"\n" +
- " }\n" +
- " }\n" +
- " }\n" +
- " }\n" +
+ " \"credentialsType\": \"ACCESS_TOKEN\",\n" +
+ " \"credentialsId\": \"6hmxew8pmmzng4e3une2\"\n" +
+ " }\n" +
"}";
- protected static final String DEVICE_WITH_DEVICE_CREDENTIALS_PARAM_DESCRIPTION_MARKDOWN =
- MARKDOWN_CODE_BLOCK_START + DEVICE_WITH_DEVICE_CREDENTIALS_PARAM_DESCRIPTION + MARKDOWN_CODE_BLOCK_END;
+ protected static final String DEVICE_UPDATE_CREDENTIALS_ACCESS_TOKEN_PARAM_DESCRIPTION =
+ "{\n" +
+ " \"id\": {\n" +
+ " \"id\":\"c886a090-168d-11ee-87c9-6f157dbc816a\"\n" +
+ " },\n" +
+ " \"deviceId\": {\n" +
+ " \"id\":\"c5fb3ac0-168d-11ee-87c9-6f157dbc816a\",\n" +
+ " \"entityType\":\"DEVICE\"\n" +
+ " },\n" +
+ " \"credentialsType\": \"ACCESS_TOKEN\",\n" +
+ " \"credentialsId\": \"6hmxew8pmmzng4e3une4\"\n" +
+ "}";
+
+ protected static final String DEVICE_WITH_DEVICE_CREDENTIALS_ACCESS_TOKEN_DEFAULT_PARAM_DESCRIPTION =
+ "{\n" +
+ " \"device\": {\n" +
+ " \"name\":\"Name_DeviceWithCredantial_AccessToken_Default\",\n" +
+ " \"label\":\"Label_DeviceWithCredantial_AccessToken_Default\",\n" +
+ " \"type\": \"default\"\n" +
+ " },\n" +
+ " \"credentials\": {\n" +
+ " \"credentialsType\": \"ACCESS_TOKEN\",\n" +
+ " \"credentialsId\": \"6hmxew8pmmzng4e3une3\"\n" +
+ " }\n" +
+ "}";
+
+ protected static final String certificateValue = "\"-----BEGIN CERTIFICATE----- " +
+ "MIICMTCCAdegAwIBAgIUI9dBuwN6pTtK6uZ03rkiCwV4wEYwCgYIKoZIzj0EAwIwbjELMAkGA1UEBhMCVVMxETAPBgNVBAgMCE5ldyBZb3JrMRowGAYDVQQKDBFUaGluZ3NCb2FyZCwgSW5jLjEwMC4GA1UEAwwnZGV2aWNlQ2VydGlmaWNhdGVAWDUwOVByb3Zpc2lvblN0cmF0ZWd5MB4XDTIzMDMyOTE0NTYxN1oXDTI0MDMyODE0NTYxN1owbjELMAkGA1UEBhMCVVMxETAPBgNVBAgMCE5ldyBZb3JrMRowGAYDVQQKDBFUaGluZ3NCb2FyZCwgSW5jLjEwMC4GA1UEAwwnZGV2aWNlQ2VydGlmaWNhdGVAWDUwOVByb3Zpc2lvblN0cmF0ZWd5MFkwEwYHKoZIzj0CAQYIKoZIzj0DAQcDQgAE9Zo791qKQiGNBm11r4ZGxh+w+ossZL3xc46ufq5QckQHP7zkD2XDAcmP5GvdkM1sBFN9AWaCkQfNnWmfERsOOKNTMFEwHQYDVR0OBBYEFFFc5uyCyglQoZiKhzXzMcQ3BKORMB8GA1UdIwQYMBaAFFFc5uyCyglQoZiKhzXzMcQ3BKORMA8GA1UdEwEB/wQFMAMBAf8wCgYIKoZIzj0EAwIDSAAwRQIhANbA9CuhoOifZMMmqkpuld+65CR+ItKdXeRAhLMZuccuAiB0FSQB34zMutXrZj1g8Gl5OkE7YryFHbei1z0SveHR8g== " +
+ "-----END CERTIFICATE-----\"";
+
+ protected static final String certificateId = "\"84f5911765abba1f96bf4165604e9e90338fc6214081a8e623b6ff9669aedb27\"";
+
+ protected static final String certificateValueUpdate = "\"-----BEGIN CERTIFICATE----- " +
+ "MIICMTCCAdegAwIBAgIUUEKxS9hTz4l+oLUMF0LV6TC/gCIwCgYIKoZIzj0EAwIwbjELMAkGA1UEBhMCVVMxETAPBgNVBAgMCE5ldyBZb3JrMRowGAYDVQQKDBFUaGluZ3NCb2FyZCwgSW5jLjEwMC4GA1UEAwwnZGV2aWNlUHJvZmlsZUNlcnRAWDUwOVByb3Zpc2lvblN0cmF0ZWd5MB4XDTIzMDMyOTE0NTczNloXDTI0MDMyODE0NTczNlowbjELMAkGA1UEBhMCVVMxETAPBgNVBAgMCE5ldyBZb3JrMRowGAYDVQQKDBFUaGluZ3NCb2FyZCwgSW5jLjEwMC4GA1UEAwwnZGV2aWNlUHJvZmlsZUNlcnRAWDUwOVByb3Zpc2lvblN0cmF0ZWd5MFkwEwYHKoZIzj0CAQYIKoZIzj0DAQcDQgAECMlWO72krDoUL9FQjUmSCetkhaEGJUfQkdSfkLSNa0GyAEIMbfmzI4zITeapunu4rGet3EMyLydQzuQanBicp6NTMFEwHQYDVR0OBBYEFHpZ78tPnztNii4Da/yCw6mhEIL3MB8GA1UdIwQYMBaAFHpZ78tPnztNii4Da/yCw6mhEIL3MA8GA1UdEwEB/wQFMAMBAf8wCgYIKoZIzj0EAwIDSAAwRQIgJ7qyMFqNcwSYkH6o+UlQXzLWfwZbNjVk+aR7foAZNGsCIQDsd7v3WQIGHiArfZeDs1DLEDuV/2h6L+ZNoGNhEKL+1A== " +
+ "-----END CERTIFICATE-----\"";
+
+ protected static final String certificateIdUpdate = "\"6b8adb49015500e51a527acd332b51684ab9b49b4ade03a9582a44c455e2e9b6\"";
+
+ protected static final String DEVICE_WITH_DEVICE_CREDENTIALS_X509_CERTIFICATE_PARAM_DESCRIPTION =
+ "{\n" +
+ " \"device\": {\n" +
+ " \"name\":\"Name_DeviceWithCredantial_X509_Certificate\",\n" +
+ " \"label\":\"Label_DeviceWithCredantial_X509_Certificate\",\n" +
+ " \"deviceProfileId\":{\n" +
+ " \"id\":\"9d9588c0-06c9-11ee-b618-19be30fdeb60\",\n" +
+ " \"entityType\":\"DEVICE_PROFILE\"\n" +
+ " }\n" +
+ " },\n" +
+ " \"credentials\": {\n" +
+ " \"credentialsType\": \"X509_CERTIFICATE\",\n" +
+ " \"credentialsId\": " + certificateId + ",\n" +
+ " \"credentialsValue\": " + certificateValue + "\n" +
+ " }\n" +
+ "}";
+
+ protected static final String DEVICE_UPDATE_CREDENTIALS_X509_CERTIFICATE_PARAM_DESCRIPTION =
+ "{\n" +
+ " \"id\": {\n" +
+ " \"id\":\"309bd9c0-14f4-11ee-9fc9-d9b7463abb63\"\n" +
+ " },\n" +
+ " \"deviceId\": {\n" +
+ " \"id\":\"3092b200-14f4-11ee-9fc9-d9b7463abb63\",\n" +
+ " \"entityType\":\"DEVICE\"\n" +
+ " },\n" +
+ " \"credentialsType\": \"X509_CERTIFICATE\",\n" +
+ " \"credentialsId\": " + certificateIdUpdate + ",\n" +
+ " \"credentialsValue\": " + certificateValueUpdate + "\n" +
+ "}";
+
+ protected static final String MQTT_BASIC_VALUE = "\"{\\\"clientId\\\":\\\"5euh5nzm34bjjh1efmlt\\\",\\\"userName\\\":\\\"onasd1lgwasmjl7v2v7h\\\",\\\"password\\\":\\\"b9xtm4ny8kt9zewaga5o\\\"}\"";
+
+ protected static final String MQTT_BASIC_VALUE_UPDATE = "\"{\\\"clientId\\\":\\\"juy03yv4owqxcmqhqtvk\\\",\\\"userName\\\":\\\"ov19fxca0cyjn7lm7w7u\\\",\\\"password\\\":\\\"twy94he114dfi9usyk1o\\\"}\"";
+
+ protected static final String DEVICE_WITH_DEVICE_CREDENTIALS_MQTT_BASIC_PARAM_DESCRIPTION =
+ "{\n" +
+ " \"device\": {\n" +
+ " \"name\":\"Name_DeviceWithCredantial_MQTT_Basic\",\n" +
+ " \"label\":\"Label_DeviceWithCredantial_MQTT_Basic\",\n" +
+ " \"deviceProfileId\":{\n" +
+ " \"id\":\"9d9588c0-06c9-11ee-b618-19be30fdeb60\",\n" +
+ " \"entityType\":\"DEVICE_PROFILE\"\n" +
+ " }\n" +
+ " },\n" +
+ " \"credentials\": {\n" +
+ " \"credentialsType\": \"MQTT_BASIC\",\n" +
+ " \"credentialsValue\": " + MQTT_BASIC_VALUE + "\n" +
+ " }\n" +
+ "}";
+
+ protected static final String DEVICE_UPDATE_CREDENTIALS_MQTT_BASIC_PARAM_DESCRIPTION =
+ "{\n" +
+ " \"id\": {\n" +
+ " \"id\":\"d877ffb0-14f5-11ee-9fc9-d9b7463abb63\"\n" +
+ " },\n" +
+ " \"deviceId\": {\n" +
+ " \"id\":\"d875dcd0-14f5-11ee-9fc9-d9b7463abb63\",\n" +
+ " \"entityType\":\"DEVICE\"\n" +
+ " },\n" +
+ " \"credentialsType\": \"MQTT_BASIC\",\n" +
+ " \"credentialsValue\": " + MQTT_BASIC_VALUE_UPDATE + "\n" +
+ "}";
+
+ protected static final String CREDENTIALS_VALUE_LVM2M_RPK_DESCRIPTION =
+ " \"{" +
+ "\\\"client\\\":{ " +
+ "\\\"endpoint\\\":\\\"LwRpk00000000\\\", " +
+ "\\\"securityConfigClientMode\\\":\\\"RPK\\\", " +
+ "\\\"key\\\":\\\"MFkwEwYHKoZIzj0CAQYIKoZIzj0DAQcDQgAEUEBxNl/RcYJNm8mk91CyVXoIJiROYDlXcSSqK6e5bDHwOW4ZiN2lNnXalyF0Jxw8MbAytnDMERXyAja5VEMeVQ==\\\"" +
+ " }, " +
+ "\\\"bootstrap\\\":{ " +
+ "\\\"bootstrapServer\\\":{ " +
+ "\\\"securityMode\\\":\\\"RPK\\\", " +
+ "\\\"clientPublicKeyOrId\\\":\\\"MFkwEwYHKoZIzj0CAQYIKoZIzj0DAQcDQgAEUEBxNl/RcYJNm8mk91CyVXoIJiROYDlXcSSqK6e5bDHwOW4ZiN2lNnXalyF0Jxw8MbAytnDMERXyAja5VEMeVQ==\\\", " +
+ "\\\"clientSecretKey\\\":\\\"MIGHAgEAMBMGByqGSM49AgEGCCqGSM49AwEHBG0wawIBAQQgd9GAx7yZW37autew5KZykn4IgRpge/tZSjnudnZJnMahRANCAARQQHE2X9Fxgk2byaT3ULJVeggmJE5gOVdxJKorp7lsMfA5bhmI3aU2ddqXIXQnHDwxsDK2cMwRFfICNrlUQx5V\\\"" +
+ "}, " +
+ "\\\"lwm2mServer\\\":{ \\\"securityMode\\\":\\\"RPK\\\", " +
+ "\\\"clientPublicKeyOrId\\\":\\\"MFkwEwYHKoZIzj0CAQYIKoZIzj0DAQcDQgAEUEBxNl/RcYJNm8mk91CyVXoIJiROYDlXcSSqK6e5bDHwOW4ZiN2lNnXalyF0Jxw8MbAytnDMERXyAja5VEMeVQ==\\\", " +
+ "\\\"clientSecretKey\\\":\\\"MIGHAgEAMBMGByqGSM49AgEGCCqGSM49AwEHBG0wawIBAQQgd9GAx7yZW37autew5KZykn4IgRpge/tZSjnudnZJnMahRANCAARQQHE2X9Fxgk2byaT3ULJVeggmJE5gOVdxJKorp7lsMfA5bhmI3aU2ddqXIXQnHDwxsDK2cMwRFfICNrlUQx5V\\\"" +
+ "}" +
+ "} " +
+ "}\"";
+
+ protected static final String CREDENTIALS_VALUE_UPDATE_LVM2M_RPK_DESCRIPTION =
+ " \"{" +
+ "\\\"client\\\":{ " +
+ "\\\"endpoint\\\":\\\"LwRpk00000000\\\", " +
+ "\\\"securityConfigClientMode\\\":\\\"RPK\\\", " +
+ "\\\"key\\\":\\\"MFkwEwYHKoZIzj0CAQYIKoZIzj0DAQcDQgAEdvBZZ2vQRK9wgDhctj6B1c7bxR3Z0wYg1+YdoYFnVUKWb+rIfTTyYK9tmQJx5Vlb5fxdLnVv1RJOPiwsLIQbAA==\\\"" +
+ " }, " +
+ "\\\"bootstrap\\\":{ " +
+ "\\\"bootstrapServer\\\":{ " +
+ "\\\"securityMode\\\":\\\"RPK\\\", " +
+ "\\\"clientPublicKeyOrId\\\":\\\"MFkwEwYHKoZIzj0CAQYIKoZIzj0DAQcDQgAEUEBxNl/RcYJNm8mk91CyVXoIJiROYDlXcSSqK6e5bDHwOW4ZiN2lNnXalyF0Jxw8MbAytnDMERXyAja5VEMeVQ==\\\", " +
+ "\\\"clientSecretKey\\\":\\\"MIGHAgEAMBMGByqGSM49AgEGCCqGSM49AwEHBG0wawIBAQQgd9GAx7yZW37autew5KZykn4IgRpge/tZSjnudnZJnMahRANCAARQQHE2X9Fxgk2byaT3ULJVeggmJE5gOVdxJKorp7lsMfA5bhmI3aU2ddqXIXQnHDwxsDK2cMwRFfICNrlUQx5V\\\"" +
+ "}, " +
+ "\\\"lwm2mServer\\\":{ \\\"securityMode\\\":\\\"RPK\\\", " +
+ "\\\"clientPublicKeyOrId\\\":\\\"MFkwEwYHKoZIzj0CAQYIKoZIzj0DAQcDQgAEUEBxNl/RcYJNm8mk91CyVXoIJiROYDlXcSSqK6e5bDHwOW4ZiN2lNnXalyF0Jxw8MbAytnDMERXyAja5VEMeVQ==\\\", " +
+ "\\\"clientSecretKey\\\":\\\"MIGHAgEAMBMGByqGSM49AgEGCCqGSM49AwEHBG0wawIBAQQgd9GAx7yZW37autew5KZykn4IgRpge/tZSjnudnZJnMahRANCAARQQHE2X9Fxgk2byaT3ULJVeggmJE5gOVdxJKorp7lsMfA5bhmI3aU2ddqXIXQnHDwxsDK2cMwRFfICNrlUQx5V\\\"" +
+ "}" +
+ "} " +
+ "}\"";
+
+ protected static final String DEVICE_WITH_DEVICE_CREDENTIALS_PARAM_LVM2M_RPK_DESCRIPTION =
+ "{\n" +
+ " \"device\": {\n" +
+ " \"name\":\"Name_LwRpk00000000\",\n" +
+ " \"label\":\"Label_LwRpk00000000\",\n" +
+ " \"deviceProfileId\":{\n" +
+ " \"id\":\"a660bd50-10ef-11ee-8737-b5634e73c779\",\n" +
+ " \"entityType\":\"DEVICE_PROFILE\"\n" +
+ " }\n" +
+ " },\n" +
+ " \"credentials\": {\n" +
+ " \"credentialsType\": \"LWM2M_CREDENTIALS\",\n" +
+ " \"credentialsId\": \"LwRpk00000000\",\n" +
+ " \"credentialsValue\":\n" + CREDENTIALS_VALUE_LVM2M_RPK_DESCRIPTION + "\n" +
+ " }\n" +
+ "}";
+
+ protected static final String DEVICE_UPDATE_CREDENTIALS_PARAM_LVM2M_RPK_DESCRIPTION =
+ "{\n" +
+ " \"id\": {\n" +
+ " \"id\":\"e238d4d0-1689-11ee-98c6-1713c1be5a8e\"\n" +
+ " },\n" +
+ " \"deviceId\": {\n" +
+ " \"id\":\"e232e160-1689-11ee-98c6-1713c1be5a8e\",\n" +
+ " \"entityType\":\"DEVICE\"\n" +
+ " },\n" +
+ " \"credentialsType\": \"LWM2M_CREDENTIALS\",\n" +
+ " \"credentialsId\": \"LwRpk00000000\",\n" +
+ " \"credentialsValue\":\n" + CREDENTIALS_VALUE_UPDATE_LVM2M_RPK_DESCRIPTION + "\n" +
+ "}";
+
+ protected static final String DEVICE_WITH_DEVICE_CREDENTIALS_PARAM_ACCESS_TOKEN_DESCRIPTION_MARKDOWN =
+ MARKDOWN_CODE_BLOCK_START + DEVICE_WITH_DEVICE_CREDENTIALS_ACCESS_TOKEN_PARAM_DESCRIPTION + MARKDOWN_CODE_BLOCK_END;
+
+ protected static final String DEVICE_WITH_DEVICE_CREDENTIALS_PARAM_ACCESS_TOKEN_DEFAULT_DESCRIPTION_MARKDOWN =
+ MARKDOWN_CODE_BLOCK_START + DEVICE_WITH_DEVICE_CREDENTIALS_ACCESS_TOKEN_DEFAULT_PARAM_DESCRIPTION + MARKDOWN_CODE_BLOCK_END;
+
+ protected static final String DEVICE_WITH_DEVICE_CREDENTIALS_PARAM_X509_CERTIFICATE_DESCRIPTION_MARKDOWN =
+ MARKDOWN_CODE_BLOCK_START + DEVICE_WITH_DEVICE_CREDENTIALS_X509_CERTIFICATE_PARAM_DESCRIPTION + MARKDOWN_CODE_BLOCK_END;
+
+ protected static final String DEVICE_WITH_DEVICE_CREDENTIALS_PARAM_MQTT_BASIC_DESCRIPTION_MARKDOWN =
+ MARKDOWN_CODE_BLOCK_START + DEVICE_WITH_DEVICE_CREDENTIALS_MQTT_BASIC_PARAM_DESCRIPTION + MARKDOWN_CODE_BLOCK_END;
+
+ protected static final String DEVICE_WITH_DEVICE_CREDENTIALS_PARAM_LVM2M_RPK_DESCRIPTION_MARKDOWN =
+ MARKDOWN_CODE_BLOCK_START + DEVICE_WITH_DEVICE_CREDENTIALS_PARAM_LVM2M_RPK_DESCRIPTION + MARKDOWN_CODE_BLOCK_END;
+
+ protected static final String DEVICE_UPDATE_CREDENTIALS_PARAM_ACCESS_TOKEN_DESCRIPTION_MARKDOWN =
+ MARKDOWN_CODE_BLOCK_START + DEVICE_UPDATE_CREDENTIALS_ACCESS_TOKEN_PARAM_DESCRIPTION + MARKDOWN_CODE_BLOCK_END;
+
+ protected static final String DEVICE_UPDATE_CREDENTIALS_PARAM_X509_CERTIFICATE_DESCRIPTION_MARKDOWN =
+ MARKDOWN_CODE_BLOCK_START + DEVICE_UPDATE_CREDENTIALS_X509_CERTIFICATE_PARAM_DESCRIPTION + MARKDOWN_CODE_BLOCK_END;
+
+ protected static final String DEVICE_UPDATE_CREDENTIALS_PARAM_MQTT_BASIC_DESCRIPTION_MARKDOWN =
+ MARKDOWN_CODE_BLOCK_START + DEVICE_UPDATE_CREDENTIALS_MQTT_BASIC_PARAM_DESCRIPTION + MARKDOWN_CODE_BLOCK_END;
+
+ protected static final String DEVICE_UPDATE_CREDENTIALS_PARAM_LVM2M_RPK_DESCRIPTION_MARKDOWN =
+ MARKDOWN_CODE_BLOCK_START + DEVICE_UPDATE_CREDENTIALS_PARAM_LVM2M_RPK_DESCRIPTION + MARKDOWN_CODE_BLOCK_END;
+
protected static final String FILTER_VALUE_TYPE = NEW_LINE + "## Value Type and Operations" + NEW_LINE +
@@ -254,7 +434,7 @@ public class ControllerConstants {
" * 'BOOLEAN' - used for boolean values. Operations: EQUAL, NOT_EQUAL;\n" +
" * 'DATE_TIME' - similar to numeric, transforms value to milliseconds since epoch. Operations: EQUAL, NOT_EQUAL, GREATER, LESS, GREATER_OR_EQUAL, LESS_OR_EQUAL; \n";
- protected static final String DEVICE_PROFILE_ALARM_SCHEDULE_SPECIFIC_TIME_EXAMPLE = MARKDOWN_CODE_BLOCK_START +
+ protected static final String DEVICE_PROFILE_ALARM_SCHEDULE_SPECIFIC_TIME_EXAMPLE = MARKDOWN_CODE_BLOCK_START +
"{\n" +
" \"schedule\":{\n" +
" \"type\":\"SPECIFIC_TIME\",\n" +
@@ -269,7 +449,7 @@ public class ControllerConstants {
" }\n" +
"}" +
MARKDOWN_CODE_BLOCK_END;
- protected static final String DEVICE_PROFILE_ALARM_SCHEDULE_CUSTOM_EXAMPLE = MARKDOWN_CODE_BLOCK_START +
+ protected static final String DEVICE_PROFILE_ALARM_SCHEDULE_CUSTOM_EXAMPLE = MARKDOWN_CODE_BLOCK_START +
"{\n" +
" \"schedule\":{\n" +
" \"type\":\"CUSTOM\",\n" +
@@ -321,9 +501,9 @@ public class ControllerConstants {
" }\n" +
"}" +
MARKDOWN_CODE_BLOCK_END;
- protected static final String DEVICE_PROFILE_ALARM_SCHEDULE_ALWAYS_EXAMPLE = MARKDOWN_CODE_BLOCK_START + "\"schedule\": null" + MARKDOWN_CODE_BLOCK_END;
+ protected static final String DEVICE_PROFILE_ALARM_SCHEDULE_ALWAYS_EXAMPLE = MARKDOWN_CODE_BLOCK_START + "\"schedule\": null" + MARKDOWN_CODE_BLOCK_END;
- protected static final String DEVICE_PROFILE_ALARM_CONDITION_REPEATING_EXAMPLE = MARKDOWN_CODE_BLOCK_START +
+ protected static final String DEVICE_PROFILE_ALARM_CONDITION_REPEATING_EXAMPLE = MARKDOWN_CODE_BLOCK_START +
"{\n" +
" \"spec\":{\n" +
" \"type\":\"REPEATING\",\n" +
@@ -339,7 +519,8 @@ public class ControllerConstants {
" }\n" +
"}" +
MARKDOWN_CODE_BLOCK_END;
- protected static final String DEVICE_PROFILE_ALARM_CONDITION_DURATION_EXAMPLE = MARKDOWN_CODE_BLOCK_START +
+
+ protected static final String DEVICE_PROFILE_ALARM_CONDITION_DURATION_EXAMPLE = MARKDOWN_CODE_BLOCK_START +
"{\n" +
" \"spec\":{\n" +
" \"type\":\"DURATION\",\n" +
diff --git a/application/src/main/java/org/thingsboard/server/controller/DeviceController.java b/application/src/main/java/org/thingsboard/server/controller/DeviceController.java
index bb34f6d5b2..8be76856a4 100644
--- a/application/src/main/java/org/thingsboard/server/controller/DeviceController.java
+++ b/application/src/main/java/org/thingsboard/server/controller/DeviceController.java
@@ -93,7 +93,15 @@ import static org.thingsboard.server.controller.ControllerConstants.DEVICE_PROFI
import static org.thingsboard.server.controller.ControllerConstants.DEVICE_SORT_PROPERTY_ALLOWABLE_VALUES;
import static org.thingsboard.server.controller.ControllerConstants.DEVICE_TEXT_SEARCH_DESCRIPTION;
import static org.thingsboard.server.controller.ControllerConstants.DEVICE_TYPE_DESCRIPTION;
-import static org.thingsboard.server.controller.ControllerConstants.DEVICE_WITH_DEVICE_CREDENTIALS_PARAM_DESCRIPTION_MARKDOWN;
+import static org.thingsboard.server.controller.ControllerConstants.DEVICE_UPDATE_CREDENTIALS_PARAM_ACCESS_TOKEN_DESCRIPTION_MARKDOWN;
+import static org.thingsboard.server.controller.ControllerConstants.DEVICE_UPDATE_CREDENTIALS_PARAM_LVM2M_RPK_DESCRIPTION_MARKDOWN;
+import static org.thingsboard.server.controller.ControllerConstants.DEVICE_UPDATE_CREDENTIALS_PARAM_MQTT_BASIC_DESCRIPTION_MARKDOWN;
+import static org.thingsboard.server.controller.ControllerConstants.DEVICE_UPDATE_CREDENTIALS_PARAM_X509_CERTIFICATE_DESCRIPTION_MARKDOWN;
+import static org.thingsboard.server.controller.ControllerConstants.DEVICE_WITH_DEVICE_CREDENTIALS_PARAM_ACCESS_TOKEN_DEFAULT_DESCRIPTION_MARKDOWN;
+import static org.thingsboard.server.controller.ControllerConstants.DEVICE_WITH_DEVICE_CREDENTIALS_PARAM_ACCESS_TOKEN_DESCRIPTION_MARKDOWN;
+import static org.thingsboard.server.controller.ControllerConstants.DEVICE_WITH_DEVICE_CREDENTIALS_PARAM_LVM2M_RPK_DESCRIPTION_MARKDOWN;
+import static org.thingsboard.server.controller.ControllerConstants.DEVICE_WITH_DEVICE_CREDENTIALS_PARAM_MQTT_BASIC_DESCRIPTION_MARKDOWN;
+import static org.thingsboard.server.controller.ControllerConstants.DEVICE_WITH_DEVICE_CREDENTIALS_PARAM_X509_CERTIFICATE_DESCRIPTION_MARKDOWN;
import static org.thingsboard.server.controller.ControllerConstants.EDGE_ASSIGN_ASYNC_FIRST_STEP_DESCRIPTION;
import static org.thingsboard.server.controller.ControllerConstants.EDGE_ASSIGN_RECEIVE_STEP_DESCRIPTION;
import static org.thingsboard.server.controller.ControllerConstants.EDGE_ID_PARAM_DESCRIPTION;
@@ -182,9 +190,20 @@ public class DeviceController extends BaseController {
@ApiOperation(value = "Create Device (saveDevice) with credentials ",
notes = "Create or update the Device. When creating device, platform generates Device Id as " + UUID_WIKI_LINK +
- "Requires to provide the Device Credentials object as well. Useful to create device and credentials in one request. " +
- "You may find the example of LwM2M device and RPK credentials below: \n\n" +
- DEVICE_WITH_DEVICE_CREDENTIALS_PARAM_DESCRIPTION_MARKDOWN +
+ "Requires to provide the Device Credentials object as well as an existing device profile ID or use \"default\".\n" +
+ "You may find the example of device with different type of credentials below: \n\n" +
+ "- Credentials type: \"Access token\" with device profile ID below: \n\n" +
+ DEVICE_WITH_DEVICE_CREDENTIALS_PARAM_ACCESS_TOKEN_DESCRIPTION_MARKDOWN + "\n\n" +
+ "- Credentials type: \"Access token\" with device profile default below: \n\n" +
+ DEVICE_WITH_DEVICE_CREDENTIALS_PARAM_ACCESS_TOKEN_DEFAULT_DESCRIPTION_MARKDOWN + "\n\n" +
+ "- Credentials type: \"X509\" with device profile ID below: \n\n" +
+ "Note: credentialsId - format Sha3Hash, certificateValue - format PEM (with \"--BEGIN CERTIFICATE----\" and -\"----END CERTIFICATE-\").\n\n" +
+ DEVICE_WITH_DEVICE_CREDENTIALS_PARAM_X509_CERTIFICATE_DESCRIPTION_MARKDOWN + "\n\n" +
+ "- Credentials type: \"MQTT_BASIC\" with device profile ID below: \n\n" +
+ DEVICE_WITH_DEVICE_CREDENTIALS_PARAM_MQTT_BASIC_DESCRIPTION_MARKDOWN + "\n\n" +
+ "- You may find the example of LwM2M device and RPK credentials below: \n\n" +
+ "Note: LwM2M device - only existing device profile ID (Transport configuration -> Transport type: \"LWM2M\".\n\n" +
+ DEVICE_WITH_DEVICE_CREDENTIALS_PARAM_LVM2M_RPK_DESCRIPTION_MARKDOWN + "\n\n" +
"Remove 'id', 'tenantId' and optionally 'customerId' from the request body example (below) to create new Device entity. " +
TENANT_OR_CUSTOMER_AUTHORITY_PARAGRAPH)
@PreAuthorize("hasAnyAuthority('TENANT_ADMIN', 'CUSTOMER_USER')")
@@ -277,10 +296,27 @@ public class DeviceController extends BaseController {
return tbDeviceService.getDeviceCredentialsByDeviceId(device, getCurrentUser());
}
- @ApiOperation(value = "Update device credentials (updateDeviceCredentials)", notes = "During device creation, platform generates random 'ACCESS_TOKEN' credentials. " +
- "Use this method to update the device credentials. First use 'getDeviceCredentialsByDeviceId' to get the credentials id and value. " +
- "Then use current method to update the credentials type and value. It is not possible to create multiple device credentials for the same device. " +
- "The structure of device credentials id and value is simple for the 'ACCESS_TOKEN' but is much more complex for the 'MQTT_BASIC' or 'LWM2M_CREDENTIALS'." + TENANT_AUTHORITY_PARAGRAPH)
+ @ApiOperation(value = "Update device credentials (updateDeviceCredentials)",
+ notes = "During device creation, platform generates random 'ACCESS_TOKEN' credentials. \" +\n" +
+ "Use this method to update the device credentials. First use 'getDeviceCredentialsByDeviceId' to get the credentials id and value.\n" +
+ "Then use current method to update the credentials type and value. It is not possible to create multiple device credentials for the same device.\n" +
+ "The structure of device credentials id and value is simple for the 'ACCESS_TOKEN' but is much more complex for the 'MQTT_BASIC' or 'LWM2M_CREDENTIALS'.\n" +
+ "You may find the example of device with different type of credentials below: \n\n" +
+ "- Credentials type: \"Access token\" with device ID and with device ID below: \n\n" +
+ DEVICE_UPDATE_CREDENTIALS_PARAM_ACCESS_TOKEN_DESCRIPTION_MARKDOWN + "\n\n" +
+ "- Credentials type: \"X509\" with device profile ID below: \n\n" +
+ "Note: credentialsId - format Sha3Hash, certificateValue - format PEM (with \"--BEGIN CERTIFICATE----\" and -\"----END CERTIFICATE-\").\n\n" +
+ DEVICE_UPDATE_CREDENTIALS_PARAM_X509_CERTIFICATE_DESCRIPTION_MARKDOWN + "\n\n" +
+ "- Credentials type: \"MQTT_BASIC\" with device profile ID below: \n\n" +
+ DEVICE_UPDATE_CREDENTIALS_PARAM_MQTT_BASIC_DESCRIPTION_MARKDOWN + "\n\n" +
+ "- You may find the example of LwM2M device and RPK credentials below: \n\n" +
+ "Note: LwM2M device - only existing device profile ID (Transport configuration -> Transport type: \"LWM2M\".\n\n" +
+ DEVICE_UPDATE_CREDENTIALS_PARAM_LVM2M_RPK_DESCRIPTION_MARKDOWN + "\n\n" +
+ "Update to real value:\n" +
+ " - 'id' (this is id of Device Credentials -> \"Get Device Credentials (getDeviceCredentialsByDeviceId)\",\n" +
+ " - 'deviceId.id' (this is id of Device).\n" +
+ "Remove 'tenantId' and optionally 'customerId' from the request body example (below) to create new Device entity." +
+ TENANT_AUTHORITY_PARAGRAPH)
@PreAuthorize("hasAuthority('TENANT_ADMIN')")
@RequestMapping(value = "/device/credentials", method = RequestMethod.POST)
@ResponseBody
diff --git a/application/src/main/java/org/thingsboard/server/controller/EdgeEventController.java b/application/src/main/java/org/thingsboard/server/controller/EdgeEventController.java
index fc85f439e0..386b1eab01 100644
--- a/application/src/main/java/org/thingsboard/server/controller/EdgeEventController.java
+++ b/application/src/main/java/org/thingsboard/server/controller/EdgeEventController.java
@@ -85,6 +85,6 @@ public class EdgeEventController extends BaseController {
EdgeId edgeId = new EdgeId(toUUID(strEdgeId));
checkEdgeId(edgeId, Operation.READ);
TimePageLink pageLink = createTimePageLink(pageSize, page, textSearch, sortProperty, sortOrder, startTime, endTime);
- return checkNotNull(edgeEventService.findEdgeEvents(tenantId, edgeId, pageLink, false));
+ return checkNotNull(edgeEventService.findEdgeEvents(tenantId, edgeId, 0L, null, pageLink));
}
}
diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcService.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcService.java
index b7334ebf5a..e7214177d2 100644
--- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcService.java
+++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcService.java
@@ -341,7 +341,10 @@ public class EdgeGrpcService extends EdgeRpcServiceGrpc.EdgeRpcServiceImplBase i
sessionNewEvents.put(edgeId, false);
Futures.addCallback(session.processEdgeEvents(), new FutureCallback<>() {
@Override
- public void onSuccess(Void result) {
+ public void onSuccess(Boolean newEventsAdded) {
+ if (Boolean.TRUE.equals(newEventsAdded)) {
+ sessionNewEvents.put(edgeId, true);
+ }
scheduleEdgeEventsCheck(session);
}
diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcSession.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcSession.java
index 1dd0f31c20..4f0f5d277a 100644
--- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcSession.java
+++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcSession.java
@@ -24,6 +24,7 @@ import io.grpc.stub.StreamObserver;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.checkerframework.checker.nullness.qual.Nullable;
+import org.springframework.data.util.Pair;
import org.thingsboard.server.common.data.DataConstants;
import org.thingsboard.server.common.data.EdgeUtils;
import org.thingsboard.server.common.data.edge.Edge;
@@ -35,6 +36,8 @@ import org.thingsboard.server.common.data.kv.BaseAttributeKvEntry;
import org.thingsboard.server.common.data.kv.LongDataEntry;
import org.thingsboard.server.common.data.page.PageData;
import org.thingsboard.server.common.data.page.PageLink;
+import org.thingsboard.server.common.data.page.SortOrder;
+import org.thingsboard.server.common.data.page.TimePageLink;
import org.thingsboard.server.gen.edge.v1.AlarmUpdateMsg;
import org.thingsboard.server.gen.edge.v1.AttributesRequestMsg;
import org.thingsboard.server.gen.edge.v1.ConnectRequestMsg;
@@ -68,17 +71,15 @@ import org.thingsboard.server.service.edge.rpc.fetch.GeneralEdgeEventFetcher;
import java.io.Closeable;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collections;
import java.util.List;
-import java.util.Objects;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.BiConsumer;
-import java.util.function.Consumer;
-import java.util.stream.Collectors;
@Slf4j
@Data
@@ -89,6 +90,7 @@ public final class EdgeGrpcSession implements Closeable {
private static final int MAX_DOWNLINK_ATTEMPTS = 10; // max number of attemps to send downlink message if edge connected
private static final String QUEUE_START_TS_ATTR_KEY = "queueStartTs";
+ private static final String QUEUE_START_SEQ_ID_ATTR_KEY = "queueStartSeqId";
private final UUID sessionId;
private final BiConsumer sessionOpenListener;
@@ -103,6 +105,12 @@ public final class EdgeGrpcSession implements Closeable {
private boolean connected;
private boolean syncCompleted;
+ private Long newStartTs;
+ private Long previousStartTs;
+ private Long newStartSeqId;
+ private Long previousStartSeqId;
+ private Long seqIdEnd;
+
private EdgeVersion edgeVersion;
private int maxInboundMessageSize;
@@ -204,10 +212,10 @@ public final class EdgeGrpcSession implements Closeable {
EdgeEventFetcher next = cursor.getNext();
log.info("[{}][{}] starting sync process, cursor current idx = {}, class = {}",
edge.getTenantId(), edge.getId(), cursor.getCurrentIdx(), next.getClass().getSimpleName());
- ListenableFuture uuidListenableFuture = startProcessingEdgeEvents(next);
- Futures.addCallback(uuidListenableFuture, new FutureCallback<>() {
+ ListenableFuture> future = startProcessingEdgeEvents(next);
+ Futures.addCallback(future, new FutureCallback<>() {
@Override
- public void onSuccess(@Nullable UUID result) {
+ public void onSuccess(@Nullable Pair result) {
doSync(cursor);
}
@@ -307,36 +315,51 @@ public final class EdgeGrpcSession implements Closeable {
sendDownlinkMsg(edgeConfigMsg);
}
- ListenableFuture processEdgeEvents() throws Exception {
- SettableFuture result = SettableFuture.create();
+ ListenableFuture processEdgeEvents() throws Exception {
+ SettableFuture result = SettableFuture.create();
log.trace("[{}] starting processing edge events", this.sessionId);
if (isConnected() && isSyncCompleted()) {
- Long queueStartTs = getQueueStartTs().get();
+ Pair startTsAndSeqId = getQueueStartTsAndSeqId().get();
+ this.previousStartTs = startTsAndSeqId.getFirst();
+ this.previousStartSeqId = startTsAndSeqId.getSecond();
GeneralEdgeEventFetcher fetcher = new GeneralEdgeEventFetcher(
- queueStartTs,
+ this.previousStartTs,
+ this.previousStartSeqId,
+ this.seqIdEnd,
+ false,
+ Integer.toUnsignedLong(ctx.getEdgeEventStorageSettings().getMaxReadRecordsCount()),
ctx.getEdgeEventService());
- ListenableFuture ifOffsetFuture = startProcessingEdgeEvents(fetcher);
- Futures.addCallback(ifOffsetFuture, new FutureCallback<>() {
+ Futures.addCallback(startProcessingEdgeEvents(fetcher), new FutureCallback<>() {
@Override
- public void onSuccess(@Nullable UUID ifOffset) {
- if (ifOffset != null) {
- Long newStartTs = Uuids.unixTimestamp(ifOffset);
- ListenableFuture> updateFuture = updateQueueStartTs(newStartTs);
+ public void onSuccess(@Nullable Pair newStartTsAndSeqId) {
+ if (newStartTsAndSeqId != null) {
+ ListenableFuture> updateFuture = updateQueueStartTsAndSeqId(newStartTsAndSeqId);
Futures.addCallback(updateFuture, new FutureCallback<>() {
@Override
public void onSuccess(@Nullable List list) {
- log.debug("[{}] queue offset was updated [{}][{}]", sessionId, ifOffset, newStartTs);
- result.set(null);
+ log.debug("[{}] queue offset was updated [{}]", sessionId, newStartTsAndSeqId);
+ if (fetcher.isSeqIdNewCycleStarted()) {
+ seqIdEnd = fetcher.getSeqIdEnd();
+ boolean newEventsAvailable = isNewEdgeEventsAvailable();
+ result.set(newEventsAvailable);
+ } else {
+ seqIdEnd = null;
+ boolean newEventsAvailable = isSeqIdStartedNewCycle();
+ if (!newEventsAvailable) {
+ newEventsAvailable = isNewEdgeEventsAvailable();
+ }
+ result.set(newEventsAvailable);
+ }
}
@Override
public void onFailure(Throwable t) {
- log.error("[{}] Failed to update queue offset [{}]", sessionId, ifOffset, t);
+ log.error("[{}] Failed to update queue offset [{}]", sessionId, newStartTsAndSeqId, t);
result.setException(t);
}
}, ctx.getGrpcCallbackExecutorService());
} else {
- log.trace("[{}] ifOffset is null. Skipping iteration without db update", sessionId);
+ log.trace("[{}] newStartTsAndSeqId is null. Skipping iteration without db update", sessionId);
result.set(null);
}
}
@@ -354,14 +377,14 @@ public final class EdgeGrpcSession implements Closeable {
return result;
}
- private ListenableFuture startProcessingEdgeEvents(EdgeEventFetcher fetcher) {
- SettableFuture result = SettableFuture.create();
+ private ListenableFuture> startProcessingEdgeEvents(EdgeEventFetcher fetcher) {
+ SettableFuture> result = SettableFuture.create();
PageLink pageLink = fetcher.getPageLink(ctx.getEdgeEventStorageSettings().getMaxReadRecordsCount());
processEdgeEvents(fetcher, pageLink, result);
return result;
}
- private void processEdgeEvents(EdgeEventFetcher fetcher, PageLink pageLink, SettableFuture result) {
+ private void processEdgeEvents(EdgeEventFetcher fetcher, PageLink pageLink, SettableFuture> result) {
try {
PageData pageData = fetcher.fetchEdgeEvents(edge.getTenantId(), edge, pageLink);
if (isConnected() && !pageData.getData().isEmpty()) {
@@ -377,8 +400,15 @@ public final class EdgeGrpcSession implements Closeable {
if (isConnected() && pageData.hasNext()) {
processEdgeEvents(fetcher, pageLink.nextPageLink(), result);
} else {
- UUID ifOffset = pageData.getData().get(pageData.getData().size() - 1).getUuidId();
- result.set(ifOffset);
+ EdgeEvent latestEdgeEvent = pageData.getData().get(pageData.getData().size() - 1);
+ UUID idOffset = latestEdgeEvent.getUuidId();
+ if (idOffset != null) {
+ Long newStartTs = Uuids.unixTimestamp(idOffset);
+ long newStartSeqId = latestEdgeEvent.getSeqId();
+ result.set(Pair.of(newStartTs, newStartSeqId));
+ } else {
+ result.set(null);
+ }
}
}
}
@@ -461,69 +491,113 @@ public final class EdgeGrpcSession implements Closeable {
}
}
- private DownlinkMsg convertToDownlinkMsg(EdgeEvent edgeEvent) {
- log.trace("[{}][{}] converting edge event to downlink msg [{}]", edge.getTenantId(), this.sessionId, edgeEvent);
- DownlinkMsg downlinkMsg = null;
- try {
- switch (edgeEvent.getAction()) {
- case UPDATED:
- case ADDED:
- case DELETED:
- case ASSIGNED_TO_EDGE:
- case UNASSIGNED_FROM_EDGE:
- case ALARM_ACK:
- case ALARM_CLEAR:
- case CREDENTIALS_UPDATED:
- case RELATION_ADD_OR_UPDATE:
- case RELATION_DELETED:
- case ASSIGNED_TO_CUSTOMER:
- case UNASSIGNED_FROM_CUSTOMER:
- case CREDENTIALS_REQUEST:
- case RPC_CALL:
- downlinkMsg = convertEntityEventToDownlink(edgeEvent);
- log.trace("[{}][{}] entity message processed [{}]", edgeEvent.getTenantId(), this.sessionId, downlinkMsg);
- break;
- case ATTRIBUTES_UPDATED:
- case POST_ATTRIBUTES:
- case ATTRIBUTES_DELETED:
- case TIMESERIES_UPDATED:
- downlinkMsg = ctx.getTelemetryProcessor().convertTelemetryEventToDownlink(edgeEvent);
- break;
- default:
- log.warn("[{}][{}] Unsupported action type [{}]", edge.getTenantId(), this.sessionId, edgeEvent.getAction());
+ private List convertToDownlinkMsgsPack(List edgeEvents) {
+ List result = new ArrayList<>();
+ for (EdgeEvent edgeEvent : edgeEvents) {
+ log.trace("[{}][{}] converting edge event to downlink msg [{}]", edge.getTenantId(), this.sessionId, edgeEvent);
+ DownlinkMsg downlinkMsg = null;
+ try {
+ switch (edgeEvent.getAction()) {
+ case UPDATED:
+ case ADDED:
+ case DELETED:
+ case ASSIGNED_TO_EDGE:
+ case UNASSIGNED_FROM_EDGE:
+ case ALARM_ACK:
+ case ALARM_CLEAR:
+ case CREDENTIALS_UPDATED:
+ case RELATION_ADD_OR_UPDATE:
+ case RELATION_DELETED:
+ case CREDENTIALS_REQUEST:
+ case RPC_CALL:
+ case ASSIGNED_TO_CUSTOMER:
+ case UNASSIGNED_FROM_CUSTOMER:
+ downlinkMsg = convertEntityEventToDownlink(edgeEvent);
+ log.trace("[{}][{}] entity message processed [{}]", edgeEvent.getTenantId(), this.sessionId, downlinkMsg);
+ break;
+ case ATTRIBUTES_UPDATED:
+ case POST_ATTRIBUTES:
+ case ATTRIBUTES_DELETED:
+ case TIMESERIES_UPDATED:
+ downlinkMsg = ctx.getTelemetryProcessor().convertTelemetryEventToDownlink(edgeEvent);
+ break;
+ default:
+ log.warn("[{}][{}] Unsupported action type [{}]", edge.getTenantId(), this.sessionId, edgeEvent.getAction());
+ }
+ } catch (Exception e) {
+ log.error("[{}][{}] Exception during converting edge event to downlink msg", edge.getTenantId(), this.sessionId, e);
+ }
+ if (downlinkMsg != null) {
+ result.add(downlinkMsg);
+ }
+ }
+ return result;
+ }
+
+ private ListenableFuture> getQueueStartTsAndSeqId() {
+ ListenableFuture> future =
+ ctx.getAttributesService().find(edge.getTenantId(), edge.getId(), DataConstants.SERVER_SCOPE, Arrays.asList(QUEUE_START_TS_ATTR_KEY, QUEUE_START_SEQ_ID_ATTR_KEY));
+ return Futures.transform(future, attributeKvEntries -> {
+ long startTs = 0L;
+ long startSeqId = 0L;
+ for (AttributeKvEntry attributeKvEntry : attributeKvEntries) {
+ if (QUEUE_START_TS_ATTR_KEY.equals(attributeKvEntry.getKey())) {
+ startTs = attributeKvEntry.getLongValue().isPresent() ? attributeKvEntry.getLongValue().get() : 0L;
+ }
+ if (QUEUE_START_SEQ_ID_ATTR_KEY.equals(attributeKvEntry.getKey())) {
+ startSeqId = attributeKvEntry.getLongValue().isPresent() ? attributeKvEntry.getLongValue().get() : 0L;
+ }
+ }
+ if (startSeqId == 0L) {
+ startSeqId = findStartSeqIdFromOldestEventIfAny();
}
+ return Pair.of(startTs, startSeqId);
+ }, ctx.getGrpcCallbackExecutorService());
+ }
+
+ private boolean isSeqIdStartedNewCycle() {
+ try {
+ TimePageLink pageLink = new TimePageLink(ctx.getEdgeEventStorageSettings().getMaxReadRecordsCount(), 0, null, null, this.newStartTs, System.currentTimeMillis());
+ PageData edgeEvents = ctx.getEdgeEventService().findEdgeEvents(edge.getTenantId(), edge.getId(), 0L, this.previousStartSeqId == 0 ? null : this.previousStartSeqId - 1, pageLink);
+ return !edgeEvents.getData().isEmpty();
} catch (Exception e) {
- log.error("[{}][{}] Exception during converting edge event to downlink msg", edge.getTenantId(), this.sessionId, e);
+ log.error("[{}][{}][{}] Failed to execute isSeqIdStartedNewCycle", edge.getTenantId(), edge.getId(), sessionId, e);
}
- return downlinkMsg;
+ return false;
}
- private List convertToDownlinkMsgsPack(List edgeEvents) {
- return edgeEvents
- .stream()
- .map(this::convertToDownlinkMsg)
- .filter(Objects::nonNull)
- .collect(Collectors.toList());
+ private boolean isNewEdgeEventsAvailable() {
+ try {
+ TimePageLink pageLink = new TimePageLink(ctx.getEdgeEventStorageSettings().getMaxReadRecordsCount(), 0, null, null, this.newStartTs, System.currentTimeMillis());
+ PageData edgeEvents = ctx.getEdgeEventService().findEdgeEvents(edge.getTenantId(), edge.getId(), this.newStartSeqId, null, pageLink);
+ return !edgeEvents.getData().isEmpty();
+ } catch (Exception e) {
+ log.error("[{}][{}][{}] Failed to execute isNewEdgeEventsAvailable", edge.getTenantId(), edge.getId(), sessionId, e);
+ }
+ return false;
}
- private ListenableFuture getQueueStartTs() {
- ListenableFuture> future =
- ctx.getAttributesService().find(edge.getTenantId(), edge.getId(), DataConstants.SERVER_SCOPE, QUEUE_START_TS_ATTR_KEY);
- return Futures.transform(future, attributeKvEntryOpt -> {
- if (attributeKvEntryOpt != null && attributeKvEntryOpt.isPresent()) {
- AttributeKvEntry attributeKvEntry = attributeKvEntryOpt.get();
- return attributeKvEntry.getLongValue().isPresent() ? attributeKvEntry.getLongValue().get() : 0L;
- } else {
- return 0L;
+ private long findStartSeqIdFromOldestEventIfAny() {
+ long startSeqId = 0L;
+ try {
+ TimePageLink pageLink = new TimePageLink(1, 0, null, new SortOrder("createdTime"), null, null);
+ PageData edgeEvents = ctx.getEdgeEventService().findEdgeEvents(edge.getTenantId(), edge.getId(), null, null, pageLink);
+ if (!edgeEvents.getData().isEmpty()) {
+ startSeqId = edgeEvents.getData().get(0).getSeqId() - 1;
}
- }, ctx.getGrpcCallbackExecutorService());
+ } catch (Exception e) {
+ log.error("[{}][{}][{}] Failed to execute findStartSeqIdFromOldestEventIfAny", edge.getTenantId(), edge.getId(), sessionId, e);
+ }
+ return startSeqId;
}
- private ListenableFuture> updateQueueStartTs(Long newStartTs) {
- log.trace("[{}] updating QueueStartTs [{}][{}]", this.sessionId, edge.getId(), newStartTs);
- List attributes = Collections.singletonList(
- new BaseAttributeKvEntry(
- new LongDataEntry(QUEUE_START_TS_ATTR_KEY, newStartTs), System.currentTimeMillis()));
+ private ListenableFuture> updateQueueStartTsAndSeqId(Pair pair) {
+ this.newStartTs = pair.getFirst();
+ this.newStartSeqId = pair.getSecond();
+ log.trace("[{}] updateQueueStartTsAndSeqId [{}][{}][{}]", this.sessionId, edge.getId(), this.newStartTs, this.newStartSeqId);
+ List attributes = Arrays.asList(
+ new BaseAttributeKvEntry(new LongDataEntry(QUEUE_START_TS_ATTR_KEY, this.newStartTs), System.currentTimeMillis()),
+ new BaseAttributeKvEntry(new LongDataEntry(QUEUE_START_SEQ_ID_ATTR_KEY, this.newStartSeqId), System.currentTimeMillis()));
return ctx.getAttributesService().save(edge.getTenantId(), edge.getId(), DataConstants.SERVER_SCOPE, attributes);
}
@@ -693,8 +767,11 @@ public final class EdgeGrpcSession implements Closeable {
}
private void interruptPreviousSendDownlinkMsgsTask() {
- log.debug("[{}][{}][{}] Previous send downlink future was not properly completed, stopping it now!", edge.getTenantId(), edge.getId(), this.sessionId);
- stopCurrentSendDownlinkMsgsTask(true);
+ if (sessionState.getSendDownlinkMsgsFuture() != null && !sessionState.getSendDownlinkMsgsFuture().isDone()
+ || sessionState.getScheduledSendDownlinkTask() != null && !sessionState.getScheduledSendDownlinkTask().isCancelled()) {
+ log.debug("[{}][{}][{}] Previous send downlink future was not properly completed, stopping it now!", edge.getTenantId(), edge.getId(), this.sessionId);
+ stopCurrentSendDownlinkMsgsTask(true);
+ }
}
private void interruptGeneralProcessingOnSync() {
diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/constructor/AlarmMsgConstructor.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/constructor/AlarmMsgConstructor.java
index 69a83da0a0..447a73e5cf 100644
--- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/constructor/AlarmMsgConstructor.java
+++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/constructor/AlarmMsgConstructor.java
@@ -18,7 +18,10 @@ package org.thingsboard.server.service.edge.rpc.constructor;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.thingsboard.common.util.JacksonUtil;
+import org.thingsboard.server.common.data.Device;
+import org.thingsboard.server.common.data.EntityView;
import org.thingsboard.server.common.data.alarm.Alarm;
+import org.thingsboard.server.common.data.asset.Asset;
import org.thingsboard.server.common.data.id.AssetId;
import org.thingsboard.server.common.data.id.DeviceId;
import org.thingsboard.server.common.data.id.EntityViewId;
@@ -47,13 +50,22 @@ public class AlarmMsgConstructor {
String entityName = null;
switch (alarm.getOriginator().getEntityType()) {
case DEVICE:
- entityName = deviceService.findDeviceById(tenantId, new DeviceId(alarm.getOriginator().getId())).getName();
+ Device deviceById = deviceService.findDeviceById(tenantId, new DeviceId(alarm.getOriginator().getId()));
+ if (deviceById != null) {
+ entityName = deviceById.getName();
+ }
break;
case ASSET:
- entityName = assetService.findAssetById(tenantId, new AssetId(alarm.getOriginator().getId())).getName();
+ Asset assetById = assetService.findAssetById(tenantId, new AssetId(alarm.getOriginator().getId()));
+ if (assetById != null) {
+ entityName = assetById.getName();
+ }
break;
case ENTITY_VIEW:
- entityName = entityViewService.findEntityViewById(tenantId, new EntityViewId(alarm.getOriginator().getId())).getName();
+ EntityView entityViewById = entityViewService.findEntityViewById(tenantId, new EntityViewId(alarm.getOriginator().getId()));
+ if (entityViewById != null) {
+ entityName = entityViewById.getName();
+ }
break;
}
AlarmUpdateMsg.Builder builder = AlarmUpdateMsg.newBuilder()
diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/fetch/GeneralEdgeEventFetcher.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/fetch/GeneralEdgeEventFetcher.java
index 327184e6a9..24008ece09 100644
--- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/fetch/GeneralEdgeEventFetcher.java
+++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/fetch/GeneralEdgeEventFetcher.java
@@ -16,19 +16,27 @@
package org.thingsboard.server.service.edge.rpc.fetch;
import lombok.AllArgsConstructor;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
import org.thingsboard.server.common.data.edge.Edge;
import org.thingsboard.server.common.data.edge.EdgeEvent;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.page.PageData;
import org.thingsboard.server.common.data.page.PageLink;
-import org.thingsboard.server.common.data.page.SortOrder;
import org.thingsboard.server.common.data.page.TimePageLink;
import org.thingsboard.server.dao.edge.EdgeEventService;
@AllArgsConstructor
+@Slf4j
public class GeneralEdgeEventFetcher implements EdgeEventFetcher {
private final Long queueStartTs;
+ private Long seqIdStart;
+ @Getter
+ private Long seqIdEnd;
+ @Getter
+ private boolean seqIdNewCycleStarted;
+ private Long maxReadRecordsCount;
private final EdgeEventService edgeEventService;
@Override
@@ -37,13 +45,32 @@ public class GeneralEdgeEventFetcher implements EdgeEventFetcher {
pageSize,
0,
null,
- new SortOrder("createdTime", SortOrder.Direction.ASC),
+ null,
queueStartTs,
- null);
+ System.currentTimeMillis());
}
@Override
public PageData fetchEdgeEvents(TenantId tenantId, Edge edge, PageLink pageLink) {
- return edgeEventService.findEdgeEvents(tenantId, edge.getId(), (TimePageLink) pageLink, true);
+ try {
+ PageData edgeEvents = edgeEventService.findEdgeEvents(tenantId, edge.getId(), seqIdStart, seqIdEnd, (TimePageLink) pageLink);
+ if (edgeEvents.getData().isEmpty()) {
+ this.seqIdEnd = Math.max(this.maxReadRecordsCount, seqIdStart - this.maxReadRecordsCount);
+ edgeEvents = edgeEventService.findEdgeEvents(tenantId, edge.getId(), 0L, seqIdEnd, (TimePageLink) pageLink);
+ if (edgeEvents.getData().stream().anyMatch(ee -> ee.getSeqId() < seqIdStart)) {
+ log.info("[{}] seqId column of edge_event table started new cycle [{}]", tenantId, edge.getId());
+ this.seqIdNewCycleStarted = true;
+ this.seqIdStart = 0L;
+ } else {
+ edgeEvents = new PageData<>();
+ log.warn("[{}] unexpected edge notification message received. " +
+ "no new events found and seqId column of edge_event table doesn't started new cycle [{}]", tenantId, edge.getId());
+ }
+ }
+ return edgeEvents;
+ } catch (Exception e) {
+ log.error("[{}] failed to find edge events [{}]", tenantId, edge.getId());
+ }
+ return new PageData<>();
}
}
diff --git a/application/src/main/java/org/thingsboard/server/service/install/update/DefaultDataUpdateService.java b/application/src/main/java/org/thingsboard/server/service/install/update/DefaultDataUpdateService.java
index f832b04c21..dafccfdad6 100644
--- a/application/src/main/java/org/thingsboard/server/service/install/update/DefaultDataUpdateService.java
+++ b/application/src/main/java/org/thingsboard/server/service/install/update/DefaultDataUpdateService.java
@@ -202,22 +202,27 @@ public class DefaultDataUpdateService implements DataUpdateService {
} else {
log.info("Skipping audit logs migration");
}
- boolean skipEdgeEventsMigration = getEnv("TB_SKIP_EDGE_EVENTS_MIGRATION", false);
- if (!skipEdgeEventsMigration) {
- log.info("Starting edge events migration. Can be skipped with TB_SKIP_EDGE_EVENTS_MIGRATION env variable set to true");
- edgeEventDao.migrateEdgeEvents();
- } else {
- log.info("Skipping edge events migration");
- }
+ migrateEdgeEvents("Starting edge events migration. ");
break;
case "3.5.1":
log.info("Updating data from version 3.5.1 to 3.5.2 ...");
+ migrateEdgeEvents("Starting edge events migration - adding seq_id column. ");
break;
default:
throw new RuntimeException("Unable to update data, unsupported fromVersion: " + fromVersion);
}
}
+ private void migrateEdgeEvents(String logPrefix) {
+ boolean skipEdgeEventsMigration = getEnv("TB_SKIP_EDGE_EVENTS_MIGRATION", false);
+ if (!skipEdgeEventsMigration) {
+ log.info(logPrefix + "Can be skipped with TB_SKIP_EDGE_EVENTS_MIGRATION env variable set to true");
+ edgeEventDao.migrateEdgeEvents();
+ } else {
+ log.info("Skipping edge events migration");
+ }
+ }
+
@Override
public void upgradeRuleNodes() {
try {
diff --git a/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbRuleEngineConsumerService.java b/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbRuleEngineConsumerService.java
index f8f6a7d25f..51f4d5283f 100644
--- a/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbRuleEngineConsumerService.java
+++ b/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbRuleEngineConsumerService.java
@@ -259,7 +259,21 @@ public class DefaultTbRuleEngineConsumerService extends AbstractConsumerService<
}
void launchConsumer(TbQueueConsumer> consumer, Queue configuration, TbRuleEngineConsumerStats stats, String threadSuffix) {
- consumersExecutor.execute(() -> consumerLoop(consumer, configuration, stats, threadSuffix));
+ if (isReady) {
+ consumersExecutor.execute(() -> consumerLoop(consumer, configuration, stats, threadSuffix));
+ } else {
+ scheduleLaunchConsumer(consumer, configuration, stats, threadSuffix);
+ }
+ }
+
+ private void scheduleLaunchConsumer(TbQueueConsumer> consumer, Queue configuration, TbRuleEngineConsumerStats stats, String threadSuffix) {
+ repartitionExecutor.schedule(() -> {
+ if (isReady) {
+ consumersExecutor.execute(() -> consumerLoop(consumer, configuration, stats, threadSuffix));
+ } else {
+ scheduleLaunchConsumer(consumer, configuration, stats, threadSuffix);
+ }
+ }, 10, TimeUnit.SECONDS);
}
void consumerLoop(TbQueueConsumer> consumer, org.thingsboard.server.common.data.queue.Queue configuration, TbRuleEngineConsumerStats stats, String threadSuffix) {
diff --git a/application/src/main/java/org/thingsboard/server/service/queue/processing/AbstractConsumerService.java b/application/src/main/java/org/thingsboard/server/service/queue/processing/AbstractConsumerService.java
index 2d517a2213..b59086a350 100644
--- a/application/src/main/java/org/thingsboard/server/service/queue/processing/AbstractConsumerService.java
+++ b/application/src/main/java/org/thingsboard/server/service/queue/processing/AbstractConsumerService.java
@@ -68,7 +68,7 @@ public abstract class AbstractConsumerService {
+
+ public ResourceInfoCaffeineCache(CacheManager cacheManager) {
+ super(cacheManager, CacheConstants.RESOURCE_INFO_CACHE);
+ }
+
+}
diff --git a/common/cache/src/main/java/org/thingsboard/server/cache/resourceInfo/ResourceInfoEvictEvent.java b/common/cache/src/main/java/org/thingsboard/server/cache/resourceInfo/ResourceInfoEvictEvent.java
new file mode 100644
index 0000000000..11272a5e24
--- /dev/null
+++ b/common/cache/src/main/java/org/thingsboard/server/cache/resourceInfo/ResourceInfoEvictEvent.java
@@ -0,0 +1,26 @@
+/**
+ * Copyright © 2016-2023 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.server.cache.resourceInfo;
+
+import lombok.Data;
+import org.thingsboard.server.common.data.id.TbResourceId;
+import org.thingsboard.server.common.data.id.TenantId;
+
+@Data
+public class ResourceInfoEvictEvent {
+ private final TenantId tenantId;
+ private final TbResourceId resourceId;
+}
diff --git a/common/cache/src/main/java/org/thingsboard/server/cache/resourceInfo/ResourceInfoRedisCache.java b/common/cache/src/main/java/org/thingsboard/server/cache/resourceInfo/ResourceInfoRedisCache.java
new file mode 100644
index 0000000000..fee14e1ca1
--- /dev/null
+++ b/common/cache/src/main/java/org/thingsboard/server/cache/resourceInfo/ResourceInfoRedisCache.java
@@ -0,0 +1,35 @@
+/**
+ * Copyright © 2016-2023 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.server.cache.resourceInfo;
+
+import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
+import org.springframework.data.redis.connection.RedisConnectionFactory;
+import org.springframework.stereotype.Service;
+import org.thingsboard.server.cache.CacheSpecsMap;
+import org.thingsboard.server.cache.RedisTbTransactionalCache;
+import org.thingsboard.server.cache.TBRedisCacheConfiguration;
+import org.thingsboard.server.cache.TbFSTRedisSerializer;
+import org.thingsboard.server.common.data.CacheConstants;
+import org.thingsboard.server.common.data.TbResourceInfo;
+
+@ConditionalOnProperty(prefix = "cache", value = "type", havingValue = "redis")
+@Service("ResourceInfoCache")
+public class ResourceInfoRedisCache extends RedisTbTransactionalCache {
+
+ public ResourceInfoRedisCache(TBRedisCacheConfiguration configuration, CacheSpecsMap cacheSpecsMap, RedisConnectionFactory connectionFactory) {
+ super(CacheConstants.RESOURCE_INFO_CACHE, cacheSpecsMap, connectionFactory, configuration, new TbFSTRedisSerializer<>());
+ }
+}
diff --git a/common/dao-api/src/main/java/org/thingsboard/server/dao/edge/EdgeEventService.java b/common/dao-api/src/main/java/org/thingsboard/server/dao/edge/EdgeEventService.java
index dcb3a5232a..9055202f4f 100644
--- a/common/dao-api/src/main/java/org/thingsboard/server/dao/edge/EdgeEventService.java
+++ b/common/dao-api/src/main/java/org/thingsboard/server/dao/edge/EdgeEventService.java
@@ -26,7 +26,7 @@ public interface EdgeEventService {
ListenableFuture saveAsync(EdgeEvent edgeEvent);
- PageData findEdgeEvents(TenantId tenantId, EdgeId edgeId, TimePageLink pageLink, boolean withTsUpdate);
+ PageData findEdgeEvents(TenantId tenantId, EdgeId edgeId, Long seqIdStart, Long seqIdEnd, TimePageLink pageLink);
/**
* Executes stored procedure to cleanup old edge events.
diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/CacheConstants.java b/common/data/src/main/java/org/thingsboard/server/common/data/CacheConstants.java
index ff0032f435..f21b13a674 100644
--- a/common/data/src/main/java/org/thingsboard/server/common/data/CacheConstants.java
+++ b/common/data/src/main/java/org/thingsboard/server/common/data/CacheConstants.java
@@ -44,4 +44,5 @@ public class CacheConstants {
public static final String USER_SETTINGS_CACHE = "userSettings";
public static final String DASHBOARD_TITLES_CACHE = "dashboardTitles";
public static final String ENTITY_COUNT_CACHE = "entityCount";
+ public static final String RESOURCE_INFO_CACHE = "resourceInfo";
}
diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/edge/EdgeEvent.java b/common/data/src/main/java/org/thingsboard/server/common/data/edge/EdgeEvent.java
index 71c35f4bd8..3688f5c6c2 100644
--- a/common/data/src/main/java/org/thingsboard/server/common/data/edge/EdgeEvent.java
+++ b/common/data/src/main/java/org/thingsboard/server/common/data/edge/EdgeEvent.java
@@ -31,6 +31,7 @@ import java.util.UUID;
@ToString(callSuper = true)
public class EdgeEvent extends BaseData {
+ private long seqId;
private TenantId tenantId;
private EdgeId edgeId;
private EdgeEventActionType action;
diff --git a/common/message/src/main/java/org/thingsboard/server/common/msg/MsgType.java b/common/message/src/main/java/org/thingsboard/server/common/msg/MsgType.java
index 04b5d3fb35..0a39d8d51b 100644
--- a/common/message/src/main/java/org/thingsboard/server/common/msg/MsgType.java
+++ b/common/message/src/main/java/org/thingsboard/server/common/msg/MsgType.java
@@ -15,6 +15,7 @@
*/
package org.thingsboard.server.common.msg;
+import lombok.Getter;
import org.thingsboard.server.common.msg.queue.PartitionChangeMsg;
import org.thingsboard.server.common.msg.queue.QueueToRuleEngineMsg;
@@ -28,7 +29,7 @@ public enum MsgType {
*
* See {@link PartitionChangeMsg}
*/
- PARTITION_CHANGE_MSG,
+ PARTITION_CHANGE_MSG(true),
APP_INIT_MSG,
@@ -108,7 +109,7 @@ public enum MsgType {
* Message that is sent from the Device Actor to Rule Engine. Requires acknowledgement
*/
- SESSION_TIMEOUT_MSG,
+ SESSION_TIMEOUT_MSG(true),
STATS_PERSIST_TICK_MSG,
@@ -130,4 +131,14 @@ public enum MsgType {
EDGE_SYNC_REQUEST_TO_EDGE_SESSION_MSG,
EDGE_SYNC_RESPONSE_FROM_EDGE_SESSION_MSG;
+ @Getter
+ private final boolean ignoreOnStart;
+
+ MsgType() {
+ this.ignoreOnStart = false;
+ }
+
+ MsgType(boolean ignoreOnStart) {
+ this.ignoreOnStart = ignoreOnStart;
+ }
}
diff --git a/dao/src/main/java/org/thingsboard/server/dao/edge/BaseEdgeEventService.java b/dao/src/main/java/org/thingsboard/server/dao/edge/BaseEdgeEventService.java
index 0fc451cf18..82058761f8 100644
--- a/dao/src/main/java/org/thingsboard/server/dao/edge/BaseEdgeEventService.java
+++ b/dao/src/main/java/org/thingsboard/server/dao/edge/BaseEdgeEventService.java
@@ -42,8 +42,8 @@ public class BaseEdgeEventService implements EdgeEventService {
}
@Override
- public PageData findEdgeEvents(TenantId tenantId, EdgeId edgeId, TimePageLink pageLink, boolean withTsUpdate) {
- return edgeEventDao.findEdgeEvents(tenantId.getId(), edgeId, pageLink, withTsUpdate);
+ public PageData findEdgeEvents(TenantId tenantId, EdgeId edgeId, Long seqIdStart, Long seqIdEnd, TimePageLink pageLink) {
+ return edgeEventDao.findEdgeEvents(tenantId.getId(), edgeId, seqIdStart, seqIdEnd, pageLink);
}
@Override
diff --git a/dao/src/main/java/org/thingsboard/server/dao/edge/EdgeEventDao.java b/dao/src/main/java/org/thingsboard/server/dao/edge/EdgeEventDao.java
index 84bf8c40d2..942a536674 100644
--- a/dao/src/main/java/org/thingsboard/server/dao/edge/EdgeEventDao.java
+++ b/dao/src/main/java/org/thingsboard/server/dao/edge/EdgeEventDao.java
@@ -43,10 +43,12 @@ public interface EdgeEventDao extends Dao {
*
* @param tenantId the tenantId
* @param edgeId the edgeId
+ * @param seqIdStart the seq id start
+ * @param seqIdEnd the seq id end
* @param pageLink the pageLink
* @return the event list
*/
- PageData findEdgeEvents(UUID tenantId, EdgeId edgeId, TimePageLink pageLink, boolean withTsUpdate);
+ PageData findEdgeEvents(UUID tenantId, EdgeId edgeId, Long seqIdStart, Long seqIdEnd, TimePageLink pageLink);
/**
* Executes stored procedure to cleanup old edge events.
diff --git a/dao/src/main/java/org/thingsboard/server/dao/model/ModelConstants.java b/dao/src/main/java/org/thingsboard/server/dao/model/ModelConstants.java
index 102d5f181e..552f83c749 100644
--- a/dao/src/main/java/org/thingsboard/server/dao/model/ModelConstants.java
+++ b/dao/src/main/java/org/thingsboard/server/dao/model/ModelConstants.java
@@ -535,6 +535,7 @@ public class ModelConstants {
*/
public static final String EDGE_EVENT_TABLE_NAME = "edge_event";
public static final String EDGE_EVENT_TENANT_ID_PROPERTY = TENANT_ID_PROPERTY;
+ public static final String EDGE_EVENT_SEQUENTIAL_ID_PROPERTY = "seq_id";
public static final String EDGE_EVENT_EDGE_ID_PROPERTY = "edge_id";
public static final String EDGE_EVENT_TYPE_PROPERTY = "edge_event_type";
public static final String EDGE_EVENT_ACTION_PROPERTY = "edge_event_action";
diff --git a/dao/src/main/java/org/thingsboard/server/dao/model/sql/EdgeEventEntity.java b/dao/src/main/java/org/thingsboard/server/dao/model/sql/EdgeEventEntity.java
index 1edc47f197..55a30383d1 100644
--- a/dao/src/main/java/org/thingsboard/server/dao/model/sql/EdgeEventEntity.java
+++ b/dao/src/main/java/org/thingsboard/server/dao/model/sql/EdgeEventEntity.java
@@ -43,6 +43,7 @@ import static org.thingsboard.server.dao.model.ModelConstants.EDGE_EVENT_BODY_PR
import static org.thingsboard.server.dao.model.ModelConstants.EDGE_EVENT_TABLE_NAME;
import static org.thingsboard.server.dao.model.ModelConstants.EDGE_EVENT_EDGE_ID_PROPERTY;
import static org.thingsboard.server.dao.model.ModelConstants.EDGE_EVENT_ENTITY_ID_PROPERTY;
+import static org.thingsboard.server.dao.model.ModelConstants.EDGE_EVENT_SEQUENTIAL_ID_PROPERTY;
import static org.thingsboard.server.dao.model.ModelConstants.EDGE_EVENT_TENANT_ID_PROPERTY;
import static org.thingsboard.server.dao.model.ModelConstants.EDGE_EVENT_TYPE_PROPERTY;
import static org.thingsboard.server.dao.model.ModelConstants.EDGE_EVENT_UID_PROPERTY;
@@ -57,6 +58,9 @@ import static org.thingsboard.server.dao.model.ModelConstants.TS_COLUMN;
@NoArgsConstructor
public class EdgeEventEntity extends BaseSqlEntity implements BaseEntity {
+ @Column(name = EDGE_EVENT_SEQUENTIAL_ID_PROPERTY)
+ protected long seqId;
+
@Column(name = EDGE_EVENT_TENANT_ID_PROPERTY)
private UUID tenantId;
@@ -120,6 +124,7 @@ public class EdgeEventEntity extends BaseSqlEntity implements BaseEnt
edgeEvent.setAction(edgeEventAction);
edgeEvent.setBody(entityBody);
edgeEvent.setUid(edgeEventUid);
+ edgeEvent.setSeqId(seqId);
return edgeEvent;
}
diff --git a/dao/src/main/java/org/thingsboard/server/dao/resource/BaseResourceService.java b/dao/src/main/java/org/thingsboard/server/dao/resource/BaseResourceService.java
index 974e3665f1..bc4f47040b 100644
--- a/dao/src/main/java/org/thingsboard/server/dao/resource/BaseResourceService.java
+++ b/dao/src/main/java/org/thingsboard/server/dao/resource/BaseResourceService.java
@@ -20,7 +20,11 @@ import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.hibernate.exception.ConstraintViolationException;
import org.springframework.stereotype.Service;
+import org.springframework.transaction.event.TransactionalEventListener;
+import org.thingsboard.server.cache.device.DeviceCacheKey;
+import org.thingsboard.server.cache.resourceInfo.ResourceInfoEvictEvent;
import org.thingsboard.server.common.data.EntityType;
+import org.thingsboard.server.cache.resourceInfo.ResourceInfoCacheKey;
import org.thingsboard.server.common.data.ResourceType;
import org.thingsboard.server.common.data.TbResource;
import org.thingsboard.server.common.data.TbResourceInfo;
@@ -31,6 +35,7 @@ import org.thingsboard.server.common.data.id.TbResourceId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.page.PageData;
import org.thingsboard.server.common.data.page.PageLink;
+import org.thingsboard.server.dao.entity.AbstractCachedEntityService;
import org.thingsboard.server.dao.exception.DataValidationException;
import org.thingsboard.server.dao.service.DataValidator;
import org.thingsboard.server.dao.service.PaginatedRemover;
@@ -45,7 +50,7 @@ import static org.thingsboard.server.dao.service.Validator.validateId;
@Service("TbResourceDaoService")
@Slf4j
@AllArgsConstructor
-public class BaseResourceService implements ResourceService {
+public class BaseResourceService extends AbstractCachedEntityService implements ResourceService {
public static final String INCORRECT_RESOURCE_ID = "Incorrect resourceId ";
private final TbResourceDao resourceDao;
@@ -55,10 +60,12 @@ public class BaseResourceService implements ResourceService {
@Override
public TbResource saveResource(TbResource resource) {
resourceValidator.validate(resource, TbResourceInfo::getTenantId);
-
try {
- return resourceDao.save(resource.getTenantId(), resource);
+ TbResource saved = resourceDao.save(resource.getTenantId(), resource);
+ publishEvictEvent(new ResourceInfoEvictEvent(resource.getTenantId(), resource.getId()));
+ return saved;
} catch (Exception t) {
+ publishEvictEvent(new ResourceInfoEvictEvent(resource.getTenantId(), resource.getId()));
ConstraintViolationException e = extractConstraintViolationException(t).orElse(null);
if (e != null && e.getConstraintName() != null && e.getConstraintName().equalsIgnoreCase("resource_unq_key")) {
String field = ResourceType.LWM2M_MODEL.equals(resource.getResourceType()) ? "resourceKey" : "fileName";
@@ -86,7 +93,9 @@ public class BaseResourceService implements ResourceService {
public TbResourceInfo findResourceInfoById(TenantId tenantId, TbResourceId resourceId) {
log.trace("Executing findResourceInfoById [{}] [{}]", tenantId, resourceId);
Validator.validateId(resourceId, INCORRECT_RESOURCE_ID + resourceId);
- return resourceInfoDao.findById(tenantId, resourceId.getId());
+
+ return cache.getAndPutInTransaction(new ResourceInfoCacheKey(tenantId, resourceId),
+ () -> resourceInfoDao.findById(tenantId, resourceId.getId()), true);
}
@Override
@@ -169,13 +178,11 @@ public class BaseResourceService implements ResourceService {
}
};
- protected Optional extractConstraintViolationException(Exception t) {
- if (t instanceof ConstraintViolationException) {
- return Optional.of((ConstraintViolationException) t);
- } else if (t.getCause() instanceof ConstraintViolationException) {
- return Optional.of((ConstraintViolationException) (t.getCause()));
- } else {
- return Optional.empty();
+ @TransactionalEventListener(classes = ResourceInfoEvictEvent.class)
+ @Override
+ public void handleEvictEvent(ResourceInfoEvictEvent event) {
+ if (event.getResourceId() != null) {
+ cache.evict(new ResourceInfoCacheKey(event.getTenantId(), event.getResourceId()));
}
}
}
diff --git a/dao/src/main/java/org/thingsboard/server/dao/sql/edge/EdgeEventRepository.java b/dao/src/main/java/org/thingsboard/server/dao/sql/edge/EdgeEventRepository.java
index c4827f1ffe..c3a9697ad6 100644
--- a/dao/src/main/java/org/thingsboard/server/dao/sql/edge/EdgeEventRepository.java
+++ b/dao/src/main/java/org/thingsboard/server/dao/sql/edge/EdgeEventRepository.java
@@ -30,8 +30,10 @@ public interface EdgeEventRepository extends JpaRepository :startTime) " +
+ "AND (:startTime IS NULL OR e.createdTime >= :startTime) " +
"AND (:endTime IS NULL OR e.createdTime <= :endTime) " +
+ "AND (:seqIdStart IS NULL OR e.seqId > :seqIdStart) " +
+ "AND (:seqIdEnd IS NULL OR e.seqId < :seqIdEnd) " +
"AND LOWER(e.edgeEventType) LIKE LOWER(CONCAT('%', :textSearch, '%'))"
)
Page findEdgeEventsByTenantIdAndEdgeId(@Param("tenantId") UUID tenantId,
@@ -39,20 +41,7 @@ public interface EdgeEventRepository extends JpaRepository :startTime) " +
- "AND (:endTime IS NULL OR e.createdTime <= :endTime) " +
- "AND e.edgeEventAction <> 'TIMESERIES_UPDATED' " +
- "AND LOWER(e.edgeEventType) LIKE LOWER(CONCAT('%', :textSearch, '%'))"
- )
- Page findEdgeEventsByTenantIdAndEdgeIdWithoutTimeseriesUpdated(@Param("tenantId") UUID tenantId,
- @Param("edgeId") UUID edgeId,
- @Param("textSearch") String textSearch,
- @Param("startTime") Long startTime,
- @Param("endTime") Long endTime,
- Pageable pageable);
}
diff --git a/dao/src/main/java/org/thingsboard/server/dao/sql/edge/JpaBaseEdgeEventDao.java b/dao/src/main/java/org/thingsboard/server/dao/sql/edge/JpaBaseEdgeEventDao.java
index bb825f504d..9f2eaae273 100644
--- a/dao/src/main/java/org/thingsboard/server/dao/sql/edge/JpaBaseEdgeEventDao.java
+++ b/dao/src/main/java/org/thingsboard/server/dao/sql/edge/JpaBaseEdgeEventDao.java
@@ -28,6 +28,7 @@ import org.thingsboard.server.common.data.edge.EdgeEvent;
import org.thingsboard.server.common.data.id.EdgeEventId;
import org.thingsboard.server.common.data.id.EdgeId;
import org.thingsboard.server.common.data.page.PageData;
+import org.thingsboard.server.common.data.page.SortOrder;
import org.thingsboard.server.common.data.page.TimePageLink;
import org.thingsboard.server.common.stats.StatsFactory;
import org.thingsboard.server.dao.DaoUtil;
@@ -43,7 +44,9 @@ import org.thingsboard.server.dao.util.SqlDao;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
+import java.util.ArrayList;
import java.util.Comparator;
+import java.util.List;
import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
@@ -118,7 +121,7 @@ public class JpaBaseEdgeEventDao extends JpaAbstractDao(params, hashcodeFunction, 1, statsFactory);
- queue.init(logExecutor, v -> edgeEventInsertRepository.save(v),
+ queue.init(logExecutor, edgeEventInsertRepository::save,
Comparator.comparing(EdgeEventEntity::getTs)
);
}
@@ -171,29 +174,23 @@ public class JpaBaseEdgeEventDao extends JpaAbstractDao findEdgeEvents(UUID tenantId, EdgeId edgeId, TimePageLink pageLink, boolean withTsUpdate) {
- if (withTsUpdate) {
- return DaoUtil.toPageData(
- edgeEventRepository
- .findEdgeEventsByTenantIdAndEdgeId(
- tenantId,
- edgeId.getId(),
- Objects.toString(pageLink.getTextSearch(), ""),
- pageLink.getStartTime(),
- pageLink.getEndTime(),
- DaoUtil.toPageable(pageLink)));
- } else {
- return DaoUtil.toPageData(
- edgeEventRepository
- .findEdgeEventsByTenantIdAndEdgeIdWithoutTimeseriesUpdated(
- tenantId,
- edgeId.getId(),
- Objects.toString(pageLink.getTextSearch(), ""),
- pageLink.getStartTime(),
- pageLink.getEndTime(),
- DaoUtil.toPageable(pageLink)));
-
+ public PageData findEdgeEvents(UUID tenantId, EdgeId edgeId, Long seqIdStart, Long seqIdEnd, TimePageLink pageLink) {
+ List sortOrders = new ArrayList<>();
+ if (pageLink.getSortOrder() != null) {
+ sortOrders.add(pageLink.getSortOrder());
}
+ sortOrders.add(new SortOrder("seqId"));
+ return DaoUtil.toPageData(
+ edgeEventRepository
+ .findEdgeEventsByTenantIdAndEdgeId(
+ tenantId,
+ edgeId.getId(),
+ Objects.toString(pageLink.getTextSearch(), ""),
+ pageLink.getStartTime(),
+ pageLink.getEndTime(),
+ seqIdStart,
+ seqIdEnd,
+ DaoUtil.toPageable(pageLink, sortOrders)));
}
@Override
diff --git a/dao/src/main/resources/sql/schema-entities.sql b/dao/src/main/resources/sql/schema-entities.sql
index 7fe3ec6e67..bfb2eed805 100644
--- a/dao/src/main/resources/sql/schema-entities.sql
+++ b/dao/src/main/resources/sql/schema-entities.sql
@@ -720,6 +720,7 @@ CREATE TABLE IF NOT EXISTS edge (
);
CREATE TABLE IF NOT EXISTS edge_event (
+ seq_id INT GENERATED ALWAYS AS IDENTITY,
id uuid NOT NULL,
created_time bigint NOT NULL,
edge_id uuid,
@@ -731,6 +732,7 @@ CREATE TABLE IF NOT EXISTS edge_event (
tenant_id uuid,
ts bigint NOT NULL
) PARTITION BY RANGE(created_time);
+ALTER TABLE IF EXISTS edge_event ALTER COLUMN seq_id SET CYCLE;
CREATE TABLE IF NOT EXISTS rpc (
id uuid NOT NULL CONSTRAINT rpc_pkey PRIMARY KEY,
diff --git a/dao/src/test/java/org/thingsboard/server/dao/service/EdgeEventServiceTest.java b/dao/src/test/java/org/thingsboard/server/dao/service/EdgeEventServiceTest.java
index 63958fe1e4..26e7cc2e1f 100644
--- a/dao/src/test/java/org/thingsboard/server/dao/service/EdgeEventServiceTest.java
+++ b/dao/src/test/java/org/thingsboard/server/dao/service/EdgeEventServiceTest.java
@@ -71,7 +71,7 @@ public class EdgeEventServiceTest extends AbstractServiceTest {
EdgeEvent edgeEvent = generateEdgeEvent(tenantId, edgeId, deviceId, EdgeEventActionType.ADDED);
edgeEventService.saveAsync(edgeEvent).get();
- PageData edgeEvents = edgeEventService.findEdgeEvents(tenantId, edgeId, new TimePageLink(1), false);
+ PageData edgeEvents = edgeEventService.findEdgeEvents(tenantId, edgeId, 0L, null, new TimePageLink(1));
Assert.assertFalse(edgeEvents.getData().isEmpty());
EdgeEvent saved = edgeEvents.getData().get(0);
@@ -113,7 +113,7 @@ public class EdgeEventServiceTest extends AbstractServiceTest {
Futures.allAsList(futures).get();
TimePageLink pageLink = new TimePageLink(2, 0, "", new SortOrder("createdTime", SortOrder.Direction.DESC), startTime, endTime);
- PageData edgeEvents = edgeEventService.findEdgeEvents(tenantId, edgeId, pageLink, true);
+ PageData edgeEvents = edgeEventService.findEdgeEvents(tenantId, edgeId, 0L, null, pageLink);
Assert.assertNotNull(edgeEvents.getData());
Assert.assertEquals(2, edgeEvents.getData().size());
@@ -122,7 +122,7 @@ public class EdgeEventServiceTest extends AbstractServiceTest {
Assert.assertTrue(edgeEvents.hasNext());
Assert.assertNotNull(pageLink.nextPageLink());
- edgeEvents = edgeEventService.findEdgeEvents(tenantId, edgeId, pageLink.nextPageLink(), true);
+ edgeEvents = edgeEventService.findEdgeEvents(tenantId, edgeId, 0L, null, pageLink.nextPageLink());
Assert.assertNotNull(edgeEvents.getData());
Assert.assertEquals(1, edgeEvents.getData().size());
@@ -132,26 +132,6 @@ public class EdgeEventServiceTest extends AbstractServiceTest {
edgeEventService.cleanupEvents(1);
}
- @Test
- public void findEdgeEventsWithTsUpdateAndWithout() throws Exception {
- EdgeId edgeId = new EdgeId(Uuids.timeBased());
- DeviceId deviceId = new DeviceId(Uuids.timeBased());
- TenantId tenantId = TenantId.fromUUID(Uuids.timeBased());
- TimePageLink pageLink = new TimePageLink(1, 0, null, new SortOrder("createdTime", SortOrder.Direction.ASC));
-
- EdgeEvent edgeEventWithTsUpdate = generateEdgeEvent(tenantId, edgeId, deviceId, EdgeEventActionType.TIMESERIES_UPDATED);
- edgeEventService.saveAsync(edgeEventWithTsUpdate).get();
-
- PageData allEdgeEvents = edgeEventService.findEdgeEvents(tenantId, edgeId, pageLink, true);
- PageData edgeEventsWithoutTsUpdate = edgeEventService.findEdgeEvents(tenantId, edgeId, pageLink, false);
-
- Assert.assertNotNull(allEdgeEvents.getData());
- Assert.assertNotNull(edgeEventsWithoutTsUpdate.getData());
- Assert.assertEquals(1, allEdgeEvents.getData().size());
- Assert.assertEquals(allEdgeEvents.getData().get(0).getUuidId(), edgeEventWithTsUpdate.getUuidId());
- Assert.assertTrue(edgeEventsWithoutTsUpdate.getData().isEmpty());
- }
-
private ListenableFuture saveEdgeEventWithProvidedTime(long time, EdgeId edgeId, EntityId entityId, TenantId tenantId) throws Exception {
EdgeEvent edgeEvent = generateEdgeEvent(tenantId, edgeId, entityId, EdgeEventActionType.ADDED);
edgeEvent.setId(new EdgeEventId(Uuids.startOf(time)));
diff --git a/dao/src/test/resources/application-test.properties b/dao/src/test/resources/application-test.properties
index d89211cb2f..98f9091318 100644
--- a/dao/src/test/resources/application-test.properties
+++ b/dao/src/test/resources/application-test.properties
@@ -74,6 +74,9 @@ cache.specs.dashboardTitles.maxSize=10000
cache.specs.entityCount.timeToLiveInMinutes=1440
cache.specs.entityCount.maxSize=10000
+cache.specs.resourceInfo.timeToLiveInMinutes=1440
+cache.specs.resourceInfo.maxSize=10000
+
redis.connection.host=localhost
redis.connection.port=6379
redis.connection.db=0
diff --git a/dao/src/test/resources/sql/system-test-psql.sql b/dao/src/test/resources/sql/system-test-psql.sql
index 172731b9c5..21af327f13 100644
--- a/dao/src/test/resources/sql/system-test-psql.sql
+++ b/dao/src/test/resources/sql/system-test-psql.sql
@@ -1,2 +1,5 @@
--PostgreSQL specific truncate to fit constraints
-TRUNCATE TABLE device_credentials, device, device_profile, asset, asset_profile, ota_package, rule_node_state, rule_node, rule_chain, alarm_comment, alarm, entity_alarm;
\ No newline at end of file
+TRUNCATE TABLE device_credentials, device, device_profile, asset, asset_profile, ota_package, rule_node_state, rule_node, rule_chain, alarm_comment, alarm, entity_alarm;
+
+-- Decrease seq_id column to make sure to cover cases of new sequential cycle during the tests
+ALTER SEQUENCE edge_event_seq_id_seq MAXVALUE 256;
diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/profile/TbDeviceProfileNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/profile/TbDeviceProfileNode.java
index ba987678f3..e2986677b5 100644
--- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/profile/TbDeviceProfileNode.java
+++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/profile/TbDeviceProfileNode.java
@@ -76,6 +76,10 @@ public class TbDeviceProfileNode implements TbNode {
this.ctx = ctx;
scheduleAlarmHarvesting(ctx, null);
ctx.addDeviceProfileListeners(this::onProfileUpdate, this::onDeviceUpdate);
+ initAlarmRuleState(false);
+ }
+
+ private void initAlarmRuleState(boolean printNewlyAddedDeviceStates) {
if (config.isFetchAlarmRulesStateOnStart()) {
log.info("[{}] Fetching alarm rule state", ctx.getSelfId());
int fetchCount = 0;
@@ -86,7 +90,7 @@ public class TbDeviceProfileNode implements TbNode {
for (RuleNodeState rns : states.getData()) {
fetchCount++;
if (rns.getEntityId().getEntityType().equals(EntityType.DEVICE) && ctx.isLocalEntity(rns.getEntityId())) {
- getOrCreateDeviceState(ctx, new DeviceId(rns.getEntityId().getId()), rns);
+ getOrCreateDeviceState(ctx, new DeviceId(rns.getEntityId().getId()), rns, printNewlyAddedDeviceStates);
}
}
}
@@ -130,7 +134,7 @@ public class TbDeviceProfileNode implements TbNode {
removeDeviceState(deviceId);
ctx.tellSuccess(msg);
} else {
- DeviceState deviceState = getOrCreateDeviceState(ctx, deviceId, null);
+ DeviceState deviceState = getOrCreateDeviceState(ctx, deviceId, null, false);
if (deviceState != null) {
deviceState.process(ctx, msg);
} else {
@@ -148,6 +152,7 @@ public class TbDeviceProfileNode implements TbNode {
public void onPartitionChangeMsg(TbContext ctx, PartitionChangeMsg msg) {
// Cleanup the cache for all entities that are no longer assigned to current server partitions
deviceStates.entrySet().removeIf(entry -> !ctx.isLocalEntity(entry.getKey()));
+ initAlarmRuleState(true);
}
@Override
@@ -156,13 +161,16 @@ public class TbDeviceProfileNode implements TbNode {
deviceStates.clear();
}
- protected DeviceState getOrCreateDeviceState(TbContext ctx, DeviceId deviceId, RuleNodeState rns) {
+ protected DeviceState getOrCreateDeviceState(TbContext ctx, DeviceId deviceId, RuleNodeState rns, boolean printNewlyAddedDeviceStates) {
DeviceState deviceState = deviceStates.get(deviceId);
if (deviceState == null) {
DeviceProfile deviceProfile = cache.get(ctx.getTenantId(), deviceId);
if (deviceProfile != null) {
deviceState = new DeviceState(ctx, config, deviceId, new ProfileState(deviceProfile), rns);
deviceStates.put(deviceId, deviceState);
+ if (printNewlyAddedDeviceStates) {
+ log.info("[{}][{}] Device [{}] was added during PartitionChangeMsg", ctx.getTenantId(), ctx.getSelfId(), deviceId);
+ }
}
}
return deviceState;