From 1cb0ca554cecae7aaf2dcfa9677d2a2099b3483b Mon Sep 17 00:00:00 2001 From: Oleksandra Matviienko Date: Fri, 15 May 2026 06:57:07 +0200 Subject: [PATCH] Fixed duplicate alarms created by concurrent rule engine messages When two messages for the same originator and alarm type arrived simultaneously at the create-alarm rule node, both could pass the "no existing active alarm" check inside create_or_update_active_alarm and proceed to INSERT, because SELECT ... FOR UPDATE does not lock a row that does not yet exist and the partial index on (originator_id, type) WHERE cleared = false is not UNIQUE. Serialize concurrent callers per (originator_id, type) with a transaction-scoped advisory lock so the SELECT/INSERT pair inside the function cannot interleave. --- .../main/data/upgrade/lts/schema_update.sql | 78 +++++++++++++++++++ .../main/resources/sql/schema-functions.sql | 3 + .../server/dao/service/AlarmServiceTest.java | 47 +++++++++++ 3 files changed, 128 insertions(+) diff --git a/application/src/main/data/upgrade/lts/schema_update.sql b/application/src/main/data/upgrade/lts/schema_update.sql index 3140b4f253..362df1ebae 100644 --- a/application/src/main/data/upgrade/lts/schema_update.sql +++ b/application/src/main/data/upgrade/lts/schema_update.sql @@ -17,3 +17,81 @@ -- LTS cumulative schema update file. -- All statements must be idempotent (use IF NOT EXISTS, ADD COLUMN IF NOT EXISTS, DO $$ ... END $$ guards, etc.). -- This file is executed by SystemPatchApplier on every version increase within the LTS family. + +-- CREATE_OR_UPDATE_ACTIVE_ALARM CONCURRENT DUPLICATE FIX START + +CREATE OR REPLACE FUNCTION create_or_update_active_alarm( + t_id uuid, c_id uuid, a_id uuid, a_created_ts bigint, + a_o_id uuid, a_o_type integer, a_type varchar, + a_severity varchar, a_start_ts bigint, a_end_ts bigint, + a_details varchar, + a_propagate boolean, a_propagate_to_owner boolean, + a_propagate_to_tenant boolean, a_propagation_types varchar, + a_creation_enabled boolean) + RETURNS varchar + LANGUAGE plpgsql +AS +$$ +DECLARE + null_id constant uuid = '13814000-1dd2-11b2-8080-808080808080'::uuid; + existing alarm; + result alarm_info; + row_count integer; +BEGIN + -- Serialize concurrent callers for the same (originator, type) so that the SELECT/INSERT + -- pair below cannot interleave and produce duplicate active alarms. + PERFORM pg_advisory_xact_lock(hashtext(a_o_id::text), hashtext(a_type)); + SELECT * INTO existing FROM alarm a WHERE a.originator_id = a_o_id AND a.type = a_type AND a.cleared = false ORDER BY a.start_ts DESC FOR UPDATE; + IF existing.id IS NULL THEN + IF a_creation_enabled = FALSE THEN + RETURN json_build_object('success', false)::text; + END IF; + IF c_id = null_id THEN + c_id = NULL; + end if; + INSERT INTO alarm + (tenant_id, customer_id, id, created_time, + originator_id, originator_type, type, + severity, start_ts, end_ts, + additional_info, + propagate, propagate_to_owner, propagate_to_tenant, propagate_relation_types, + acknowledged, ack_ts, + cleared, clear_ts, + assignee_id, assign_ts) + VALUES + (t_id, c_id, a_id, a_created_ts, + a_o_id, a_o_type, a_type, + a_severity, a_start_ts, a_end_ts, + a_details, + a_propagate, a_propagate_to_owner, a_propagate_to_tenant, a_propagation_types, + false, 0, false, 0, NULL, 0); + INSERT INTO alarm_types (tenant_id, type) VALUES (t_id, a_type) ON CONFLICT (tenant_id, type) DO NOTHING; + SELECT * INTO result FROM alarm_info a WHERE a.id = a_id AND a.tenant_id = t_id; + RETURN json_build_object('success', true, 'created', true, 'modified', true, 'alarm', row_to_json(result))::text; + ELSE + UPDATE alarm a + SET severity = a_severity, + start_ts = a_start_ts, + end_ts = a_end_ts, + additional_info = a_details, + propagate = a_propagate, + propagate_to_owner = a_propagate_to_owner, + propagate_to_tenant = a_propagate_to_tenant, + propagate_relation_types = a_propagation_types + WHERE a.id = existing.id + AND a.tenant_id = t_id + AND (severity != a_severity OR start_ts != a_start_ts OR end_ts != a_end_ts OR additional_info != a_details + OR propagate != a_propagate OR propagate_to_owner != a_propagate_to_owner OR + propagate_to_tenant != a_propagate_to_tenant OR propagate_relation_types != a_propagation_types); + GET DIAGNOSTICS row_count = ROW_COUNT; + SELECT * INTO result FROM alarm_info a WHERE a.id = existing.id AND a.tenant_id = t_id; + IF row_count > 0 THEN + RETURN json_build_object('success', true, 'modified', true, 'alarm', row_to_json(result), 'old', row_to_json(existing))::text; + ELSE + RETURN json_build_object('success', true, 'modified', false, 'alarm', row_to_json(result))::text; + END IF; + END IF; +END +$$; + +-- CREATE_OR_UPDATE_ACTIVE_ALARM CONCURRENT DUPLICATE FIX END diff --git a/dao/src/main/resources/sql/schema-functions.sql b/dao/src/main/resources/sql/schema-functions.sql index f9190b5e2f..5be191f87d 100644 --- a/dao/src/main/resources/sql/schema-functions.sql +++ b/dao/src/main/resources/sql/schema-functions.sql @@ -32,6 +32,9 @@ DECLARE result alarm_info; row_count integer; BEGIN + -- Serialize concurrent callers for the same (originator, type) so that the SELECT/INSERT + -- pair below cannot interleave and produce duplicate active alarms. + PERFORM pg_advisory_xact_lock(hashtext(a_o_id::text), hashtext(a_type)); SELECT * INTO existing FROM alarm a WHERE a.originator_id = a_o_id AND a.type = a_type AND a.cleared = false ORDER BY a.start_ts DESC FOR UPDATE; IF existing.id IS NULL THEN IF a_creation_enabled = FALSE THEN diff --git a/dao/src/test/java/org/thingsboard/server/dao/service/AlarmServiceTest.java b/dao/src/test/java/org/thingsboard/server/dao/service/AlarmServiceTest.java index 2329111e22..84562f51bf 100644 --- a/dao/src/test/java/org/thingsboard/server/dao/service/AlarmServiceTest.java +++ b/dao/src/test/java/org/thingsboard/server/dao/service/AlarmServiceTest.java @@ -64,7 +64,11 @@ import org.thingsboard.server.dao.user.UserService; import java.util.Collections; import java.util.List; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; import static org.assertj.core.api.Assertions.assertThat; @@ -1012,4 +1016,47 @@ public class AlarmServiceTest extends AbstractServiceTest { assertThat(result.getAlarm().getId()).isNotNull(); } + @Test + public void testShouldNotCreateDuplicateActiveAlarmsOnConcurrentCreate() throws InterruptedException { + AssetId originatorId = new AssetId(Uuids.timeBased()); + int parallelism = 10; + long ts = System.currentTimeMillis(); + + ExecutorService executor = Executors.newFixedThreadPool(parallelism); + CountDownLatch start = new CountDownLatch(1); + CountDownLatch done = new CountDownLatch(parallelism); + try { + for (int i = 0; i < parallelism; i++) { + executor.submit(() -> { + try { + start.await(); + alarmService.createAlarm(AlarmCreateOrUpdateActiveRequest.builder() + .tenantId(tenantId) + .originator(originatorId) + .type(TEST_ALARM) + .severity(AlarmSeverity.CRITICAL) + .startTs(ts).build()); + } catch (Throwable ignored) { + } finally { + done.countDown(); + } + }); + } + start.countDown(); + Assert.assertTrue("Concurrent createAlarm did not finish in time", done.await(30, TimeUnit.SECONDS)); + } finally { + executor.shutdownNow(); + } + + PageData activeAlarms = alarmService.findAlarms(tenantId, AlarmQuery.builder() + .affectedEntityId(originatorId) + .status(AlarmStatus.ACTIVE_UNACK).pageLink( + new TimePageLink(parallelism * 2, 0, "", + new SortOrder("createdTime", SortOrder.Direction.DESC), 0L, System.currentTimeMillis()) + ).build()); + assertThat(activeAlarms.getData()) + .as("Concurrent createAlarm calls for the same originator and type must produce exactly one active alarm") + .hasSize(1); + } + }