Browse Source

Merge pull request #3602 from vkukhtyn/feature/log-telemetry-updated

[3.2.1] Log telemetry update event to populate it into rule chains
pull/3904/head
Igor Kulikov 6 years ago
committed by GitHub
parent
commit
3c87a137ce
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
  1. 65
      application/src/main/java/org/thingsboard/server/controller/BaseController.java
  2. 20
      application/src/main/java/org/thingsboard/server/controller/TelemetryController.java
  3. 2
      common/data/src/main/java/org/thingsboard/server/common/data/DataConstants.java
  4. 1
      common/data/src/main/java/org/thingsboard/server/common/data/audit/ActionType.java
  5. 30
      dao/src/main/java/org/thingsboard/server/dao/audit/AuditLogServiceImpl.java
  6. 7
      rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/filter/TbMsgTypeSwitchNode.java
  7. 8
      ui-ngx/src/app/shared/models/audit-log.models.ts
  8. 6
      ui-ngx/src/app/shared/models/rule-node.models.ts
  9. 4
      ui-ngx/src/assets/locale/locale.constant-en_US.json

65
application/src/main/java/org/thingsboard/server/controller/BaseController.java

@ -53,6 +53,8 @@ import org.thingsboard.server.common.data.id.WidgetTypeId;
import org.thingsboard.server.common.data.id.WidgetsBundleId;
import org.thingsboard.server.common.data.kv.AttributeKvEntry;
import org.thingsboard.server.common.data.kv.DataType;
import org.thingsboard.server.common.data.kv.KvEntry;
import org.thingsboard.server.common.data.kv.TsKvEntry;
import org.thingsboard.server.common.data.page.PageLink;
import org.thingsboard.server.common.data.page.SortOrder;
import org.thingsboard.server.common.data.page.TimePageLink;
@ -106,9 +108,11 @@ import org.thingsboard.server.service.telemetry.TelemetrySubscriptionService;
import javax.mail.MessagingException;
import javax.servlet.http.HttpServletResponse;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.stream.Collectors;
import static org.thingsboard.server.dao.service.Validator.validateId;
@ -709,6 +713,12 @@ public abstract class BaseController {
case PROVISION_FAILURE:
msgType = DataConstants.PROVISION_FAILURE;
break;
case TIMESERIES_UPDATED:
msgType = DataConstants.TIMESERIES_UPDATED;
break;
case TIMESERIES_DELETED:
msgType = DataConstants.TIMESERIES_DELETED;
break;
}
if (!StringUtils.isEmpty(msgType)) {
try {
@ -753,17 +763,7 @@ public abstract class BaseController {
metaData.putValue("scope", scope);
if (attributes != null) {
for (AttributeKvEntry attr : attributes) {
if (attr.getDataType() == DataType.BOOLEAN) {
entityNode.put(attr.getKey(), attr.getBooleanValue().get());
} else if (attr.getDataType() == DataType.DOUBLE) {
entityNode.put(attr.getKey(), attr.getDoubleValue().get());
} else if (attr.getDataType() == DataType.LONG) {
entityNode.put(attr.getKey(), attr.getLongValue().get());
} else if (attr.getDataType() == DataType.JSON) {
entityNode.set(attr.getKey(), json.readTree(attr.getJsonValue().get()));
} else {
entityNode.put(attr.getKey(), attr.getValueAsString());
}
addKvEntry(entityNode, attr);
}
}
} else if (actionType == ActionType.ATTRIBUTES_DELETED) {
@ -774,6 +774,17 @@ public abstract class BaseController {
if (keys != null) {
keys.forEach(attrsArrayNode::add);
}
} else if (actionType == ActionType.TIMESERIES_UPDATED) {
List<TsKvEntry> timeseries = extractParameter(List.class, 0, additionalInfo);
addTimeseries(entityNode, timeseries);
} else if (actionType == ActionType.TIMESERIES_DELETED) {
List<String> keys = extractParameter(List.class, 0, additionalInfo);
if (keys != null) {
ArrayNode timeseriesArrayNode = entityNode.putArray("timeseries");
keys.forEach(timeseriesArrayNode::add);
}
entityNode.put("startTs", extractParameter(Long.class, 1, additionalInfo));
entityNode.put("endTs", extractParameter(Long.class, 2, additionalInfo));
}
}
TbMsg tbMsg = TbMsg.newMsg(msgType, entityId, metaData, TbMsgDataType.JSON, json.writeValueAsString(entityNode));
@ -790,6 +801,22 @@ public abstract class BaseController {
}
}
private void addKvEntry(ObjectNode entityNode, KvEntry kvEntry) throws Exception {
if (kvEntry.getDataType() == DataType.BOOLEAN) {
kvEntry.getBooleanValue().ifPresent(value -> entityNode.put(kvEntry.getKey(), value));
} else if (kvEntry.getDataType() == DataType.DOUBLE) {
kvEntry.getDoubleValue().ifPresent(value -> entityNode.put(kvEntry.getKey(), value));
} else if (kvEntry.getDataType() == DataType.LONG) {
kvEntry.getLongValue().ifPresent(value -> entityNode.put(kvEntry.getKey(), value));
} else if (kvEntry.getDataType() == DataType.JSON) {
if (kvEntry.getJsonValue().isPresent()) {
entityNode.set(kvEntry.getKey(), json.readTree(kvEntry.getJsonValue().get()));
}
} else {
entityNode.put(kvEntry.getKey(), kvEntry.getValueAsString());
}
}
private <T> T extractParameter(Class<T> clazz, int index, Object... additionalInfo) {
T result = null;
if (additionalInfo != null && additionalInfo.length > index) {
@ -810,4 +837,20 @@ public abstract class BaseController {
return null;
}
private void addTimeseries(ObjectNode entityNode, List<TsKvEntry> timeseries) throws Exception {
if (timeseries != null && !timeseries.isEmpty()) {
ArrayNode result = entityNode.putArray("timeseries");
Map<Long, List<TsKvEntry>> groupedTelemetry = timeseries.stream()
.collect(Collectors.groupingBy(TsKvEntry::getTs));
for (Map.Entry<Long, List<TsKvEntry>> entry : groupedTelemetry.entrySet()) {
ObjectNode element = json.createObjectNode();
element.put("ts", entry.getKey());
ObjectNode values = element.putObject("values");
for (TsKvEntry tsKvEntry : entry.getValue()) {
addKvEntry(values, tsKvEntry);
}
result.add(element);
}
}
}
}

20
application/src/main/java/org/thingsboard/server/controller/TelemetryController.java

@ -298,7 +298,6 @@ public class TelemetryController extends BaseController {
deleteToTs = System.currentTimeMillis();
} else {
if (startTs == null || endTs == null) {
deleteToTs = endTs;
return getImmediateDeferredResult("When deleteAllDataForKeys is false, start and end timestamp values shouldn't be empty", HttpStatus.BAD_REQUEST);
} else {
deleteFromTs = startTs;
@ -316,13 +315,13 @@ public class TelemetryController extends BaseController {
Futures.addCallback(future, new FutureCallback<List<Void>>() {
@Override
public void onSuccess(@Nullable List<Void> tmp) {
logTimeseriesDeleted(user, entityId, keys, null);
logTimeseriesDeleted(user, entityId, keys, deleteFromTs, deleteToTs, null);
result.setResult(new ResponseEntity<>(HttpStatus.OK));
}
@Override
public void onFailure(Throwable t) {
logTimeseriesDeleted(user, entityId, keys, t);
logTimeseriesDeleted(user, entityId, keys, deleteFromTs, deleteToTs, t);
result.setResult(new ResponseEntity<>(HttpStatus.INTERNAL_SERVER_ERROR));
}
}, executor);
@ -443,6 +442,7 @@ public class TelemetryController extends BaseController {
if (entries.isEmpty()) {
return getImmediateDeferredResult("No timeseries data found in request body!", HttpStatus.BAD_REQUEST);
}
SecurityUser user = getCurrentUser();
return accessValidator.validateEntityAndCallback(getCurrentUser(), Operation.WRITE_TELEMETRY, entityIdSrc, (result, tenantId, entityId) -> {
long tenantTtl = ttl;
if (!TenantId.SYS_TENANT_ID.equals(tenantId) && tenantTtl == 0) {
@ -452,11 +452,13 @@ public class TelemetryController extends BaseController {
tsSubService.saveAndNotify(tenantId, entityId, entries, tenantTtl, new FutureCallback<Void>() {
@Override
public void onSuccess(@Nullable Void tmp) {
logTelemetryUpdated(user, entityId, entries, null);
result.setResult(new ResponseEntity(HttpStatus.OK));
}
@Override
public void onFailure(Throwable t) {
logTelemetryUpdated(user, entityId, entries, t);
AccessValidator.handleError(t, result, HttpStatus.INTERNAL_SERVER_ERROR);
}
});
@ -588,15 +590,23 @@ public class TelemetryController extends BaseController {
};
}
private void logTimeseriesDeleted(SecurityUser user, EntityId entityId, List<String> keys, Throwable e) {
private void logTimeseriesDeleted(SecurityUser user, EntityId entityId, List<String> keys, long startTs, long endTs, Throwable e) {
try {
logEntityAction(user, (UUIDBased & EntityId) entityId, null, null, ActionType.TIMESERIES_DELETED, toException(e),
keys);
keys, startTs, endTs);
} catch (ThingsboardException te) {
log.warn("Failed to log timeseries delete", te);
}
}
private void logTelemetryUpdated(SecurityUser user, EntityId entityId, List<TsKvEntry> telemetry, Throwable e) {
try {
logEntityAction(user, (UUIDBased & EntityId) entityId, null, null, ActionType.TIMESERIES_UPDATED, toException(e), telemetry);
} catch (ThingsboardException te) {
log.warn("Failed to log telemetry update");
}
}
private void logAttributesDeleted(SecurityUser user, EntityId entityId, String scope, List<String> keys, Throwable e) {
try {
logEntityAction(user, (UUIDBased & EntityId) entityId, null, null, ActionType.ATTRIBUTES_DELETED, toException(e),

2
common/data/src/main/java/org/thingsboard/server/common/data/DataConstants.java

@ -59,6 +59,8 @@ public class DataConstants {
public static final String ENTITY_UNASSIGNED = "ENTITY_UNASSIGNED";
public static final String ATTRIBUTES_UPDATED = "ATTRIBUTES_UPDATED";
public static final String ATTRIBUTES_DELETED = "ATTRIBUTES_DELETED";
public static final String TIMESERIES_UPDATED = "TIMESERIES_UPDATED";
public static final String TIMESERIES_DELETED = "TIMESERIES_DELETED";
public static final String ALARM_ACK = "ALARM_ACK";
public static final String ALARM_CLEAR = "ALARM_CLEAR";
public static final String ENTITY_ASSIGNED_FROM_TENANT = "ENTITY_ASSIGNED_FROM_TENANT";

1
common/data/src/main/java/org/thingsboard/server/common/data/audit/ActionType.java

@ -24,6 +24,7 @@ public enum ActionType {
UPDATED(false), // log entity
ATTRIBUTES_UPDATED(false), // log attributes/values
ATTRIBUTES_DELETED(false), // log attributes
TIMESERIES_UPDATED(false), // log timeseries update
TIMESERIES_DELETED(false), // log timeseries
RPC_CALL(false), // log method and params
CREDENTIALS_UPDATED(false), // log new credentials

30
dao/src/main/java/org/thingsboard/server/dao/audit/AuditLogServiceImpl.java

@ -39,21 +39,23 @@ import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.id.UserId;
import org.thingsboard.server.common.data.kv.AttributeKvEntry;
import org.thingsboard.server.common.data.kv.TsKvEntry;
import org.thingsboard.server.common.data.page.PageData;
import org.thingsboard.server.common.data.page.TimePageLink;
import org.thingsboard.server.common.data.relation.EntityRelation;
import org.thingsboard.server.common.data.rule.RuleChainMetaData;
import org.thingsboard.server.common.data.security.DeviceCredentials;
import org.thingsboard.server.dao.audit.sink.AuditLogSink;
import org.thingsboard.server.dao.device.provision.ProvisionRequest;
import org.thingsboard.server.dao.entity.EntityService;
import org.thingsboard.server.dao.exception.DataValidationException;
import org.thingsboard.server.dao.device.provision.ProvisionRequest;
import org.thingsboard.server.dao.service.DataValidator;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.util.List;
import java.util.UUID;
import java.util.stream.Collectors;
import static org.thingsboard.server.dao.service.Validator.validateEntityId;
import static org.thingsboard.server.dao.service.Validator.validateId;
@ -265,6 +267,32 @@ public class AuditLogServiceImpl implements AuditLogService {
actionData.set("provisionRequest", objectMapper.valueToTree(request));
}
break;
case TIMESERIES_UPDATED:
actionData.put("entityId", entityId.toString());
List<TsKvEntry> updatedTimeseries = extractParameter(List.class, 0, additionalInfo);
if (updatedTimeseries != null) {
ArrayNode result = actionData.putArray("timeseries");
updatedTimeseries.stream()
.collect(Collectors.groupingBy(TsKvEntry::getTs))
.forEach((k, v) -> {
ObjectNode element = objectMapper.createObjectNode();
element.put("ts", k);
ObjectNode values = element.putObject("values");
v.forEach(kvEntry -> values.put(kvEntry.getKey(), kvEntry.getValueAsString()));
result.add(element);
});
}
break;
case TIMESERIES_DELETED:
actionData.put("entityId", entityId.toString());
List<String> timeseriesKeys = extractParameter(List.class, 0, additionalInfo);
if (timeseriesKeys != null) {
ArrayNode timeseriesArrayNode = actionData.putArray("timeseries");
timeseriesKeys.forEach(timeseriesArrayNode::add);
}
actionData.put("startTs", extractParameter(Long.class, 1, additionalInfo));
actionData.put("endTs", extractParameter(Long.class, 2, additionalInfo));
break;
}
return actionData;
}

7
rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/filter/TbMsgTypeSwitchNode.java

@ -35,7 +35,8 @@ import org.thingsboard.server.common.msg.session.SessionMsgType;
configClazz = EmptyNodeConfiguration.class,
relationTypes = {"Post attributes", "Post telemetry", "RPC Request from Device", "RPC Request to Device", "Activity Event", "Inactivity Event",
"Connect Event", "Disconnect Event", "Entity Created", "Entity Updated", "Entity Deleted", "Entity Assigned",
"Entity Unassigned", "Attributes Updated", "Attributes Deleted", "Alarm Acknowledged", "Alarm Cleared", "Other", "Entity Assigned From Tenant", "Entity Assigned To Tenant"},
"Entity Unassigned", "Attributes Updated", "Attributes Deleted", "Alarm Acknowledged", "Alarm Cleared", "Other", "Entity Assigned From Tenant", "Entity Assigned To Tenant",
"Timeseries Updated", "Timeseries Deleted"},
nodeDescription = "Route incoming messages by Message Type",
nodeDetails = "Sends messages with message types <b>\"Post attributes\", \"Post telemetry\", \"RPC Request\"</b> etc. via corresponding chain, otherwise <b>Other</b> chain is used.",
uiResources = {"static/rulenode/rulenode-core-config.js"},
@ -90,6 +91,10 @@ public class TbMsgTypeSwitchNode implements TbNode {
relationType = "Entity Assigned From Tenant";
} else if (msg.getType().equals(DataConstants.ENTITY_ASSIGNED_TO_TENANT)) {
relationType = "Entity Assigned To Tenant";
} else if (msg.getType().equals(DataConstants.TIMESERIES_UPDATED)) {
relationType = "Timeseries Updated";
} else if (msg.getType().equals(DataConstants.TIMESERIES_DELETED)) {
relationType = "Timeseries Deleted";
} else {
relationType = "Other";
}

8
ui-ngx/src/app/shared/models/audit-log.models.ts

@ -53,7 +53,9 @@ export enum ActionType {
ASSIGNED_FROM_TENANT = 'ASSIGNED_FROM_TENANT',
ASSIGNED_TO_TENANT = 'ASSIGNED_TO_TENANT',
PROVISION_SUCCESS = 'PROVISION_SUCCESS',
PROVISION_FAILURE = 'PROVISION_FAILURE'
PROVISION_FAILURE = 'PROVISION_FAILURE',
TIMESERIES_UPDATED = 'TIMESERIES_UPDATED',
TIMESERIES_DELETED = 'TIMESERIES_DELETED'
}
export enum ActionStatus {
@ -87,7 +89,9 @@ export const actionTypeTranslations = new Map<ActionType, string>(
[ActionType.ASSIGNED_FROM_TENANT, 'audit-log.type-assigned-from-tenant'],
[ActionType.ASSIGNED_TO_TENANT, 'audit-log.type-assigned-to-tenant'],
[ActionType.PROVISION_SUCCESS, 'audit-log.type-provision-success'],
[ActionType.PROVISION_FAILURE, 'audit-log.type-provision-failure']
[ActionType.PROVISION_FAILURE, 'audit-log.type-provision-failure'],
[ActionType.TIMESERIES_UPDATED, 'audit-log.type-timeseries-updated'],
[ActionType.TIMESERIES_DELETED, 'audit-log.type-timeseries-deleted']
]
);

6
ui-ngx/src/app/shared/models/rule-node.models.ts

@ -348,7 +348,8 @@ export enum MessageType {
ENTITY_ASSIGNED = 'ENTITY_ASSIGNED',
ENTITY_UNASSIGNED = 'ENTITY_UNASSIGNED',
ATTRIBUTES_UPDATED = 'ATTRIBUTES_UPDATED',
ATTRIBUTES_DELETED = 'ATTRIBUTES_DELETED'
ATTRIBUTES_DELETED = 'ATTRIBUTES_DELETED',
TIMESERIES_UPDATED = 'TIMESERIES_UPDATED'
}
export const messageTypeNames = new Map<MessageType, string>(
@ -367,7 +368,8 @@ export const messageTypeNames = new Map<MessageType, string>(
[MessageType.ENTITY_ASSIGNED, 'Entity Assigned'],
[MessageType.ENTITY_UNASSIGNED, 'Entity Unassigned'],
[MessageType.ATTRIBUTES_UPDATED, 'Attributes Updated'],
[MessageType.ATTRIBUTES_DELETED, 'Attributes Deleted']
[MessageType.ATTRIBUTES_DELETED, 'Attributes Deleted'],
[MessageType.TIMESERIES_UPDATED, 'Timeseries Updated']
]
);

4
ui-ngx/src/assets/locale/locale.constant-en_US.json

@ -527,7 +527,9 @@
"type-assigned-from-tenant": "Assigned from Tenant",
"type-assigned-to-tenant": "Assigned to Tenant",
"type-provision-success": "Device provisioned",
"type-provision-failure": "Device provisioning was failed"
"type-provision-failure": "Device provisioning was failed",
"type-timeseries-updated": "Telemetry updated",
"type-timeseries-deleted": "Telemetry deleted"
},
"confirm-on-exit": {
"message": "You have unsaved changes. Are you sure you want to leave this page?",

Loading…
Cancel
Save