Browse Source

Merge 1cb0ca554c into be3207ab65

pull/15641/merge
Oleksandra Matviienko 5 days ago
committed by GitHub
parent
commit
674a75cec7
No known key found for this signature in database GPG Key ID: B5690EEEBB952194
  1. 78
      application/src/main/data/upgrade/lts/schema_update.sql
  2. 3
      dao/src/main/resources/sql/schema-functions.sql
  3. 47
      dao/src/test/java/org/thingsboard/server/dao/service/AlarmServiceTest.java

78
application/src/main/data/upgrade/lts/schema_update.sql

@ -17,3 +17,81 @@
-- LTS cumulative schema update file.
-- All statements must be idempotent (use IF NOT EXISTS, ADD COLUMN IF NOT EXISTS, DO $$ ... END $$ guards, etc.).
-- This file is executed by SystemPatchApplier on every version increase within the LTS family.
-- CREATE_OR_UPDATE_ACTIVE_ALARM CONCURRENT DUPLICATE FIX START
CREATE OR REPLACE FUNCTION create_or_update_active_alarm(
t_id uuid, c_id uuid, a_id uuid, a_created_ts bigint,
a_o_id uuid, a_o_type integer, a_type varchar,
a_severity varchar, a_start_ts bigint, a_end_ts bigint,
a_details varchar,
a_propagate boolean, a_propagate_to_owner boolean,
a_propagate_to_tenant boolean, a_propagation_types varchar,
a_creation_enabled boolean)
RETURNS varchar
LANGUAGE plpgsql
AS
$$
DECLARE
null_id constant uuid = '13814000-1dd2-11b2-8080-808080808080'::uuid;
existing alarm;
result alarm_info;
row_count integer;
BEGIN
-- Serialize concurrent callers for the same (originator, type) so that the SELECT/INSERT
-- pair below cannot interleave and produce duplicate active alarms.
PERFORM pg_advisory_xact_lock(hashtext(a_o_id::text), hashtext(a_type));
SELECT * INTO existing FROM alarm a WHERE a.originator_id = a_o_id AND a.type = a_type AND a.cleared = false ORDER BY a.start_ts DESC FOR UPDATE;
IF existing.id IS NULL THEN
IF a_creation_enabled = FALSE THEN
RETURN json_build_object('success', false)::text;
END IF;
IF c_id = null_id THEN
c_id = NULL;
end if;
INSERT INTO alarm
(tenant_id, customer_id, id, created_time,
originator_id, originator_type, type,
severity, start_ts, end_ts,
additional_info,
propagate, propagate_to_owner, propagate_to_tenant, propagate_relation_types,
acknowledged, ack_ts,
cleared, clear_ts,
assignee_id, assign_ts)
VALUES
(t_id, c_id, a_id, a_created_ts,
a_o_id, a_o_type, a_type,
a_severity, a_start_ts, a_end_ts,
a_details,
a_propagate, a_propagate_to_owner, a_propagate_to_tenant, a_propagation_types,
false, 0, false, 0, NULL, 0);
INSERT INTO alarm_types (tenant_id, type) VALUES (t_id, a_type) ON CONFLICT (tenant_id, type) DO NOTHING;
SELECT * INTO result FROM alarm_info a WHERE a.id = a_id AND a.tenant_id = t_id;
RETURN json_build_object('success', true, 'created', true, 'modified', true, 'alarm', row_to_json(result))::text;
ELSE
UPDATE alarm a
SET severity = a_severity,
start_ts = a_start_ts,
end_ts = a_end_ts,
additional_info = a_details,
propagate = a_propagate,
propagate_to_owner = a_propagate_to_owner,
propagate_to_tenant = a_propagate_to_tenant,
propagate_relation_types = a_propagation_types
WHERE a.id = existing.id
AND a.tenant_id = t_id
AND (severity != a_severity OR start_ts != a_start_ts OR end_ts != a_end_ts OR additional_info != a_details
OR propagate != a_propagate OR propagate_to_owner != a_propagate_to_owner OR
propagate_to_tenant != a_propagate_to_tenant OR propagate_relation_types != a_propagation_types);
GET DIAGNOSTICS row_count = ROW_COUNT;
SELECT * INTO result FROM alarm_info a WHERE a.id = existing.id AND a.tenant_id = t_id;
IF row_count > 0 THEN
RETURN json_build_object('success', true, 'modified', true, 'alarm', row_to_json(result), 'old', row_to_json(existing))::text;
ELSE
RETURN json_build_object('success', true, 'modified', false, 'alarm', row_to_json(result))::text;
END IF;
END IF;
END
$$;
-- CREATE_OR_UPDATE_ACTIVE_ALARM CONCURRENT DUPLICATE FIX END

3
dao/src/main/resources/sql/schema-functions.sql

@ -32,6 +32,9 @@ DECLARE
result alarm_info;
row_count integer;
BEGIN
-- Serialize concurrent callers for the same (originator, type) so that the SELECT/INSERT
-- pair below cannot interleave and produce duplicate active alarms.
PERFORM pg_advisory_xact_lock(hashtext(a_o_id::text), hashtext(a_type));
SELECT * INTO existing FROM alarm a WHERE a.originator_id = a_o_id AND a.type = a_type AND a.cleared = false ORDER BY a.start_ts DESC FOR UPDATE;
IF existing.id IS NULL THEN
IF a_creation_enabled = FALSE THEN

47
dao/src/test/java/org/thingsboard/server/dao/service/AlarmServiceTest.java

@ -64,7 +64,11 @@ import org.thingsboard.server.dao.user.UserService;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import static org.assertj.core.api.Assertions.assertThat;
@ -1012,4 +1016,47 @@ public class AlarmServiceTest extends AbstractServiceTest {
assertThat(result.getAlarm().getId()).isNotNull();
}
@Test
public void testShouldNotCreateDuplicateActiveAlarmsOnConcurrentCreate() throws InterruptedException {
AssetId originatorId = new AssetId(Uuids.timeBased());
int parallelism = 10;
long ts = System.currentTimeMillis();
ExecutorService executor = Executors.newFixedThreadPool(parallelism);
CountDownLatch start = new CountDownLatch(1);
CountDownLatch done = new CountDownLatch(parallelism);
try {
for (int i = 0; i < parallelism; i++) {
executor.submit(() -> {
try {
start.await();
alarmService.createAlarm(AlarmCreateOrUpdateActiveRequest.builder()
.tenantId(tenantId)
.originator(originatorId)
.type(TEST_ALARM)
.severity(AlarmSeverity.CRITICAL)
.startTs(ts).build());
} catch (Throwable ignored) {
} finally {
done.countDown();
}
});
}
start.countDown();
Assert.assertTrue("Concurrent createAlarm did not finish in time", done.await(30, TimeUnit.SECONDS));
} finally {
executor.shutdownNow();
}
PageData<AlarmInfo> activeAlarms = alarmService.findAlarms(tenantId, AlarmQuery.builder()
.affectedEntityId(originatorId)
.status(AlarmStatus.ACTIVE_UNACK).pageLink(
new TimePageLink(parallelism * 2, 0, "",
new SortOrder("createdTime", SortOrder.Direction.DESC), 0L, System.currentTimeMillis())
).build());
assertThat(activeAlarms.getData())
.as("Concurrent createAlarm calls for the same originator and type must produce exactly one active alarm")
.hasSize(1);
}
}

Loading…
Cancel
Save