diff --git a/application/src/main/java/org/thingsboard/server/controller/BaseController.java b/application/src/main/java/org/thingsboard/server/controller/BaseController.java index 869cbea127..72d1dd8799 100644 --- a/application/src/main/java/org/thingsboard/server/controller/BaseController.java +++ b/application/src/main/java/org/thingsboard/server/controller/BaseController.java @@ -53,6 +53,8 @@ import org.thingsboard.server.common.data.id.WidgetTypeId; import org.thingsboard.server.common.data.id.WidgetsBundleId; import org.thingsboard.server.common.data.kv.AttributeKvEntry; import org.thingsboard.server.common.data.kv.DataType; +import org.thingsboard.server.common.data.kv.KvEntry; +import org.thingsboard.server.common.data.kv.TsKvEntry; import org.thingsboard.server.common.data.page.PageLink; import org.thingsboard.server.common.data.page.SortOrder; import org.thingsboard.server.common.data.page.TimePageLink; @@ -106,9 +108,11 @@ import org.thingsboard.server.service.telemetry.TelemetrySubscriptionService; import javax.mail.MessagingException; import javax.servlet.http.HttpServletResponse; import java.util.List; +import java.util.Map; import java.util.Optional; import java.util.Set; import java.util.UUID; +import java.util.stream.Collectors; import static org.thingsboard.server.dao.service.Validator.validateId; @@ -709,6 +713,12 @@ public abstract class BaseController { case PROVISION_FAILURE: msgType = DataConstants.PROVISION_FAILURE; break; + case TIMESERIES_UPDATED: + msgType = DataConstants.TIMESERIES_UPDATED; + break; + case TIMESERIES_DELETED: + msgType = DataConstants.TIMESERIES_DELETED; + break; } if (!StringUtils.isEmpty(msgType)) { try { @@ -753,17 +763,7 @@ public abstract class BaseController { metaData.putValue("scope", scope); if (attributes != null) { for (AttributeKvEntry attr : attributes) { - if (attr.getDataType() == DataType.BOOLEAN) { - entityNode.put(attr.getKey(), attr.getBooleanValue().get()); - } else if (attr.getDataType() == DataType.DOUBLE) { - entityNode.put(attr.getKey(), attr.getDoubleValue().get()); - } else if (attr.getDataType() == DataType.LONG) { - entityNode.put(attr.getKey(), attr.getLongValue().get()); - } else if (attr.getDataType() == DataType.JSON) { - entityNode.set(attr.getKey(), json.readTree(attr.getJsonValue().get())); - } else { - entityNode.put(attr.getKey(), attr.getValueAsString()); - } + addKvEntry(entityNode, attr); } } } else if (actionType == ActionType.ATTRIBUTES_DELETED) { @@ -774,6 +774,17 @@ public abstract class BaseController { if (keys != null) { keys.forEach(attrsArrayNode::add); } + } else if (actionType == ActionType.TIMESERIES_UPDATED) { + List timeseries = extractParameter(List.class, 0, additionalInfo); + addTimeseries(entityNode, timeseries); + } else if (actionType == ActionType.TIMESERIES_DELETED) { + List keys = extractParameter(List.class, 0, additionalInfo); + if (keys != null) { + ArrayNode timeseriesArrayNode = entityNode.putArray("timeseries"); + keys.forEach(timeseriesArrayNode::add); + } + entityNode.put("startTs", extractParameter(Long.class, 1, additionalInfo)); + entityNode.put("endTs", extractParameter(Long.class, 2, additionalInfo)); } } TbMsg tbMsg = TbMsg.newMsg(msgType, entityId, metaData, TbMsgDataType.JSON, json.writeValueAsString(entityNode)); @@ -790,6 +801,22 @@ public abstract class BaseController { } } + private void addKvEntry(ObjectNode entityNode, KvEntry kvEntry) throws Exception { + if (kvEntry.getDataType() == DataType.BOOLEAN) { + kvEntry.getBooleanValue().ifPresent(value -> entityNode.put(kvEntry.getKey(), value)); + } else if (kvEntry.getDataType() == DataType.DOUBLE) { + kvEntry.getDoubleValue().ifPresent(value -> entityNode.put(kvEntry.getKey(), value)); + } else if (kvEntry.getDataType() == DataType.LONG) { + kvEntry.getLongValue().ifPresent(value -> entityNode.put(kvEntry.getKey(), value)); + } else if (kvEntry.getDataType() == DataType.JSON) { + if (kvEntry.getJsonValue().isPresent()) { + entityNode.set(kvEntry.getKey(), json.readTree(kvEntry.getJsonValue().get())); + } + } else { + entityNode.put(kvEntry.getKey(), kvEntry.getValueAsString()); + } + } + private T extractParameter(Class clazz, int index, Object... additionalInfo) { T result = null; if (additionalInfo != null && additionalInfo.length > index) { @@ -810,4 +837,20 @@ public abstract class BaseController { return null; } + private void addTimeseries(ObjectNode entityNode, List timeseries) throws Exception { + if (timeseries != null && !timeseries.isEmpty()) { + ArrayNode result = entityNode.putArray("timeseries"); + Map> groupedTelemetry = timeseries.stream() + .collect(Collectors.groupingBy(TsKvEntry::getTs)); + for (Map.Entry> entry : groupedTelemetry.entrySet()) { + ObjectNode element = json.createObjectNode(); + element.put("ts", entry.getKey()); + ObjectNode values = element.putObject("values"); + for (TsKvEntry tsKvEntry : entry.getValue()) { + addKvEntry(values, tsKvEntry); + } + result.add(element); + } + } + } } diff --git a/application/src/main/java/org/thingsboard/server/controller/TelemetryController.java b/application/src/main/java/org/thingsboard/server/controller/TelemetryController.java index 2141027fa7..18c240ef7f 100644 --- a/application/src/main/java/org/thingsboard/server/controller/TelemetryController.java +++ b/application/src/main/java/org/thingsboard/server/controller/TelemetryController.java @@ -298,7 +298,6 @@ public class TelemetryController extends BaseController { deleteToTs = System.currentTimeMillis(); } else { if (startTs == null || endTs == null) { - deleteToTs = endTs; return getImmediateDeferredResult("When deleteAllDataForKeys is false, start and end timestamp values shouldn't be empty", HttpStatus.BAD_REQUEST); } else { deleteFromTs = startTs; @@ -316,13 +315,13 @@ public class TelemetryController extends BaseController { Futures.addCallback(future, new FutureCallback>() { @Override public void onSuccess(@Nullable List tmp) { - logTimeseriesDeleted(user, entityId, keys, null); + logTimeseriesDeleted(user, entityId, keys, deleteFromTs, deleteToTs, null); result.setResult(new ResponseEntity<>(HttpStatus.OK)); } @Override public void onFailure(Throwable t) { - logTimeseriesDeleted(user, entityId, keys, t); + logTimeseriesDeleted(user, entityId, keys, deleteFromTs, deleteToTs, t); result.setResult(new ResponseEntity<>(HttpStatus.INTERNAL_SERVER_ERROR)); } }, executor); @@ -443,6 +442,7 @@ public class TelemetryController extends BaseController { if (entries.isEmpty()) { return getImmediateDeferredResult("No timeseries data found in request body!", HttpStatus.BAD_REQUEST); } + SecurityUser user = getCurrentUser(); return accessValidator.validateEntityAndCallback(getCurrentUser(), Operation.WRITE_TELEMETRY, entityIdSrc, (result, tenantId, entityId) -> { long tenantTtl = ttl; if (!TenantId.SYS_TENANT_ID.equals(tenantId) && tenantTtl == 0) { @@ -452,11 +452,13 @@ public class TelemetryController extends BaseController { tsSubService.saveAndNotify(tenantId, entityId, entries, tenantTtl, new FutureCallback() { @Override public void onSuccess(@Nullable Void tmp) { + logTelemetryUpdated(user, entityId, entries, null); result.setResult(new ResponseEntity(HttpStatus.OK)); } @Override public void onFailure(Throwable t) { + logTelemetryUpdated(user, entityId, entries, t); AccessValidator.handleError(t, result, HttpStatus.INTERNAL_SERVER_ERROR); } }); @@ -588,15 +590,23 @@ public class TelemetryController extends BaseController { }; } - private void logTimeseriesDeleted(SecurityUser user, EntityId entityId, List keys, Throwable e) { + private void logTimeseriesDeleted(SecurityUser user, EntityId entityId, List keys, long startTs, long endTs, Throwable e) { try { logEntityAction(user, (UUIDBased & EntityId) entityId, null, null, ActionType.TIMESERIES_DELETED, toException(e), - keys); + keys, startTs, endTs); } catch (ThingsboardException te) { log.warn("Failed to log timeseries delete", te); } } + private void logTelemetryUpdated(SecurityUser user, EntityId entityId, List telemetry, Throwable e) { + try { + logEntityAction(user, (UUIDBased & EntityId) entityId, null, null, ActionType.TIMESERIES_UPDATED, toException(e), telemetry); + } catch (ThingsboardException te) { + log.warn("Failed to log telemetry update"); + } + } + private void logAttributesDeleted(SecurityUser user, EntityId entityId, String scope, List keys, Throwable e) { try { logEntityAction(user, (UUIDBased & EntityId) entityId, null, null, ActionType.ATTRIBUTES_DELETED, toException(e), diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/DataConstants.java b/common/data/src/main/java/org/thingsboard/server/common/data/DataConstants.java index 5aadca44ec..1c2a0c9186 100644 --- a/common/data/src/main/java/org/thingsboard/server/common/data/DataConstants.java +++ b/common/data/src/main/java/org/thingsboard/server/common/data/DataConstants.java @@ -59,6 +59,8 @@ public class DataConstants { public static final String ENTITY_UNASSIGNED = "ENTITY_UNASSIGNED"; public static final String ATTRIBUTES_UPDATED = "ATTRIBUTES_UPDATED"; public static final String ATTRIBUTES_DELETED = "ATTRIBUTES_DELETED"; + public static final String TIMESERIES_UPDATED = "TIMESERIES_UPDATED"; + public static final String TIMESERIES_DELETED = "TIMESERIES_DELETED"; public static final String ALARM_ACK = "ALARM_ACK"; public static final String ALARM_CLEAR = "ALARM_CLEAR"; public static final String ENTITY_ASSIGNED_FROM_TENANT = "ENTITY_ASSIGNED_FROM_TENANT"; diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/audit/ActionType.java b/common/data/src/main/java/org/thingsboard/server/common/data/audit/ActionType.java index e30e20090d..23105fb251 100644 --- a/common/data/src/main/java/org/thingsboard/server/common/data/audit/ActionType.java +++ b/common/data/src/main/java/org/thingsboard/server/common/data/audit/ActionType.java @@ -24,6 +24,7 @@ public enum ActionType { UPDATED(false), // log entity ATTRIBUTES_UPDATED(false), // log attributes/values ATTRIBUTES_DELETED(false), // log attributes + TIMESERIES_UPDATED(false), // log timeseries update TIMESERIES_DELETED(false), // log timeseries RPC_CALL(false), // log method and params CREDENTIALS_UPDATED(false), // log new credentials diff --git a/dao/src/main/java/org/thingsboard/server/dao/audit/AuditLogServiceImpl.java b/dao/src/main/java/org/thingsboard/server/dao/audit/AuditLogServiceImpl.java index 6b3f1650a2..142f1bb092 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/audit/AuditLogServiceImpl.java +++ b/dao/src/main/java/org/thingsboard/server/dao/audit/AuditLogServiceImpl.java @@ -39,21 +39,23 @@ import org.thingsboard.server.common.data.id.EntityId; import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.id.UserId; import org.thingsboard.server.common.data.kv.AttributeKvEntry; +import org.thingsboard.server.common.data.kv.TsKvEntry; import org.thingsboard.server.common.data.page.PageData; import org.thingsboard.server.common.data.page.TimePageLink; import org.thingsboard.server.common.data.relation.EntityRelation; import org.thingsboard.server.common.data.rule.RuleChainMetaData; import org.thingsboard.server.common.data.security.DeviceCredentials; import org.thingsboard.server.dao.audit.sink.AuditLogSink; +import org.thingsboard.server.dao.device.provision.ProvisionRequest; import org.thingsboard.server.dao.entity.EntityService; import org.thingsboard.server.dao.exception.DataValidationException; -import org.thingsboard.server.dao.device.provision.ProvisionRequest; import org.thingsboard.server.dao.service.DataValidator; import java.io.PrintWriter; import java.io.StringWriter; import java.util.List; import java.util.UUID; +import java.util.stream.Collectors; import static org.thingsboard.server.dao.service.Validator.validateEntityId; import static org.thingsboard.server.dao.service.Validator.validateId; @@ -265,6 +267,32 @@ public class AuditLogServiceImpl implements AuditLogService { actionData.set("provisionRequest", objectMapper.valueToTree(request)); } break; + case TIMESERIES_UPDATED: + actionData.put("entityId", entityId.toString()); + List updatedTimeseries = extractParameter(List.class, 0, additionalInfo); + if (updatedTimeseries != null) { + ArrayNode result = actionData.putArray("timeseries"); + updatedTimeseries.stream() + .collect(Collectors.groupingBy(TsKvEntry::getTs)) + .forEach((k, v) -> { + ObjectNode element = objectMapper.createObjectNode(); + element.put("ts", k); + ObjectNode values = element.putObject("values"); + v.forEach(kvEntry -> values.put(kvEntry.getKey(), kvEntry.getValueAsString())); + result.add(element); + }); + } + break; + case TIMESERIES_DELETED: + actionData.put("entityId", entityId.toString()); + List timeseriesKeys = extractParameter(List.class, 0, additionalInfo); + if (timeseriesKeys != null) { + ArrayNode timeseriesArrayNode = actionData.putArray("timeseries"); + timeseriesKeys.forEach(timeseriesArrayNode::add); + } + actionData.put("startTs", extractParameter(Long.class, 1, additionalInfo)); + actionData.put("endTs", extractParameter(Long.class, 2, additionalInfo)); + break; } return actionData; } diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/filter/TbMsgTypeSwitchNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/filter/TbMsgTypeSwitchNode.java index c5739e24ff..8cd40688d9 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/filter/TbMsgTypeSwitchNode.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/filter/TbMsgTypeSwitchNode.java @@ -35,7 +35,8 @@ import org.thingsboard.server.common.msg.session.SessionMsgType; configClazz = EmptyNodeConfiguration.class, relationTypes = {"Post attributes", "Post telemetry", "RPC Request from Device", "RPC Request to Device", "Activity Event", "Inactivity Event", "Connect Event", "Disconnect Event", "Entity Created", "Entity Updated", "Entity Deleted", "Entity Assigned", - "Entity Unassigned", "Attributes Updated", "Attributes Deleted", "Alarm Acknowledged", "Alarm Cleared", "Other", "Entity Assigned From Tenant", "Entity Assigned To Tenant"}, + "Entity Unassigned", "Attributes Updated", "Attributes Deleted", "Alarm Acknowledged", "Alarm Cleared", "Other", "Entity Assigned From Tenant", "Entity Assigned To Tenant", + "Timeseries Updated", "Timeseries Deleted"}, nodeDescription = "Route incoming messages by Message Type", nodeDetails = "Sends messages with message types \"Post attributes\", \"Post telemetry\", \"RPC Request\" etc. via corresponding chain, otherwise Other chain is used.", uiResources = {"static/rulenode/rulenode-core-config.js"}, @@ -90,6 +91,10 @@ public class TbMsgTypeSwitchNode implements TbNode { relationType = "Entity Assigned From Tenant"; } else if (msg.getType().equals(DataConstants.ENTITY_ASSIGNED_TO_TENANT)) { relationType = "Entity Assigned To Tenant"; + } else if (msg.getType().equals(DataConstants.TIMESERIES_UPDATED)) { + relationType = "Timeseries Updated"; + } else if (msg.getType().equals(DataConstants.TIMESERIES_DELETED)) { + relationType = "Timeseries Deleted"; } else { relationType = "Other"; } diff --git a/ui-ngx/src/app/shared/models/audit-log.models.ts b/ui-ngx/src/app/shared/models/audit-log.models.ts index 46efd740a3..9db4bdee9d 100644 --- a/ui-ngx/src/app/shared/models/audit-log.models.ts +++ b/ui-ngx/src/app/shared/models/audit-log.models.ts @@ -53,7 +53,9 @@ export enum ActionType { ASSIGNED_FROM_TENANT = 'ASSIGNED_FROM_TENANT', ASSIGNED_TO_TENANT = 'ASSIGNED_TO_TENANT', PROVISION_SUCCESS = 'PROVISION_SUCCESS', - PROVISION_FAILURE = 'PROVISION_FAILURE' + PROVISION_FAILURE = 'PROVISION_FAILURE', + TIMESERIES_UPDATED = 'TIMESERIES_UPDATED', + TIMESERIES_DELETED = 'TIMESERIES_DELETED' } export enum ActionStatus { @@ -87,7 +89,9 @@ export const actionTypeTranslations = new Map( [ActionType.ASSIGNED_FROM_TENANT, 'audit-log.type-assigned-from-tenant'], [ActionType.ASSIGNED_TO_TENANT, 'audit-log.type-assigned-to-tenant'], [ActionType.PROVISION_SUCCESS, 'audit-log.type-provision-success'], - [ActionType.PROVISION_FAILURE, 'audit-log.type-provision-failure'] + [ActionType.PROVISION_FAILURE, 'audit-log.type-provision-failure'], + [ActionType.TIMESERIES_UPDATED, 'audit-log.type-timeseries-updated'], + [ActionType.TIMESERIES_DELETED, 'audit-log.type-timeseries-deleted'] ] ); diff --git a/ui-ngx/src/app/shared/models/rule-node.models.ts b/ui-ngx/src/app/shared/models/rule-node.models.ts index 0312ad3198..df989401d9 100644 --- a/ui-ngx/src/app/shared/models/rule-node.models.ts +++ b/ui-ngx/src/app/shared/models/rule-node.models.ts @@ -348,7 +348,8 @@ export enum MessageType { ENTITY_ASSIGNED = 'ENTITY_ASSIGNED', ENTITY_UNASSIGNED = 'ENTITY_UNASSIGNED', ATTRIBUTES_UPDATED = 'ATTRIBUTES_UPDATED', - ATTRIBUTES_DELETED = 'ATTRIBUTES_DELETED' + ATTRIBUTES_DELETED = 'ATTRIBUTES_DELETED', + TIMESERIES_UPDATED = 'TIMESERIES_UPDATED' } export const messageTypeNames = new Map( @@ -367,7 +368,8 @@ export const messageTypeNames = new Map( [MessageType.ENTITY_ASSIGNED, 'Entity Assigned'], [MessageType.ENTITY_UNASSIGNED, 'Entity Unassigned'], [MessageType.ATTRIBUTES_UPDATED, 'Attributes Updated'], - [MessageType.ATTRIBUTES_DELETED, 'Attributes Deleted'] + [MessageType.ATTRIBUTES_DELETED, 'Attributes Deleted'], + [MessageType.TIMESERIES_UPDATED, 'Timeseries Updated'] ] ); diff --git a/ui-ngx/src/assets/locale/locale.constant-en_US.json b/ui-ngx/src/assets/locale/locale.constant-en_US.json index 14ed8a3f52..e3afb69d33 100644 --- a/ui-ngx/src/assets/locale/locale.constant-en_US.json +++ b/ui-ngx/src/assets/locale/locale.constant-en_US.json @@ -527,7 +527,9 @@ "type-assigned-from-tenant": "Assigned from Tenant", "type-assigned-to-tenant": "Assigned to Tenant", "type-provision-success": "Device provisioned", - "type-provision-failure": "Device provisioning was failed" + "type-provision-failure": "Device provisioning was failed", + "type-timeseries-updated": "Telemetry updated", + "type-timeseries-deleted": "Telemetry deleted" }, "confirm-on-exit": { "message": "You have unsaved changes. Are you sure you want to leave this page?",