Browse Source

Log telemetry update event to populate it into rule chain

pull/3602/head
Viacheslav Kukhtyn 6 years ago
parent
commit
7bf0cb90fa
  1. 41
      application/src/main/java/org/thingsboard/server/controller/BaseController.java
  2. 10
      application/src/main/java/org/thingsboard/server/controller/TelemetryController.java
  3. 1
      common/data/src/main/java/org/thingsboard/server/common/data/DataConstants.java
  4. 1
      common/data/src/main/java/org/thingsboard/server/common/data/audit/ActionType.java

41
application/src/main/java/org/thingsboard/server/controller/BaseController.java

@ -35,7 +35,6 @@ import org.thingsboard.server.common.data.asset.AssetInfo;
import org.thingsboard.server.common.data.audit.ActionType;
import org.thingsboard.server.common.data.exception.ThingsboardErrorCode;
import org.thingsboard.server.common.data.exception.ThingsboardException;
import org.thingsboard.server.common.data.id.*;
import org.thingsboard.server.common.data.id.AlarmId;
import org.thingsboard.server.common.data.id.AssetId;
import org.thingsboard.server.common.data.id.CustomerId;
@ -54,6 +53,8 @@ import org.thingsboard.server.common.data.id.WidgetTypeId;
import org.thingsboard.server.common.data.id.WidgetsBundleId;
import org.thingsboard.server.common.data.kv.AttributeKvEntry;
import org.thingsboard.server.common.data.kv.DataType;
import org.thingsboard.server.common.data.kv.KvEntry;
import org.thingsboard.server.common.data.kv.TsKvEntry;
import org.thingsboard.server.common.data.page.PageLink;
import org.thingsboard.server.common.data.page.SortOrder;
import org.thingsboard.server.common.data.page.TimePageLink;
@ -710,6 +711,9 @@ public abstract class BaseController {
case PROVISION_FAILURE:
msgType = DataConstants.PROVISION_FAILURE;
break;
case TIMESERIES_UPDATED:
msgType = DataConstants.TIMESERIES_UPDATED;
break;
}
if (!StringUtils.isEmpty(msgType)) {
try {
@ -754,17 +758,7 @@ public abstract class BaseController {
metaData.putValue("scope", scope);
if (attributes != null) {
for (AttributeKvEntry attr : attributes) {
if (attr.getDataType() == DataType.BOOLEAN) {
entityNode.put(attr.getKey(), attr.getBooleanValue().get());
} else if (attr.getDataType() == DataType.DOUBLE) {
entityNode.put(attr.getKey(), attr.getDoubleValue().get());
} else if (attr.getDataType() == DataType.LONG) {
entityNode.put(attr.getKey(), attr.getLongValue().get());
} else if (attr.getDataType() == DataType.JSON) {
entityNode.set(attr.getKey(), json.readTree(attr.getJsonValue().get()));
} else {
entityNode.put(attr.getKey(), attr.getValueAsString());
}
addKvEntry(entityNode, attr);
}
}
} else if (actionType == ActionType.ATTRIBUTES_DELETED) {
@ -775,6 +769,13 @@ public abstract class BaseController {
if (keys != null) {
keys.forEach(attrsArrayNode::add);
}
} else if (actionType == ActionType.TIMESERIES_UPDATED) {
List<TsKvEntry> telemetry = extractParameter(List.class, 0, additionalInfo);
if (telemetry != null) {
for (TsKvEntry tsKvEntry : telemetry) {
addKvEntry(entityNode, tsKvEntry);
}
}
}
}
TbMsg tbMsg = TbMsg.newMsg(msgType, entityId, metaData, TbMsgDataType.JSON, json.writeValueAsString(entityNode));
@ -791,6 +792,22 @@ public abstract class BaseController {
}
}
private void addKvEntry(ObjectNode entityNode, KvEntry kvEntry) throws Exception {
if (kvEntry.getDataType() == DataType.BOOLEAN) {
kvEntry.getBooleanValue().ifPresent(value -> entityNode.put(kvEntry.getKey(), value));
} else if (kvEntry.getDataType() == DataType.DOUBLE) {
kvEntry.getDoubleValue().ifPresent(value -> entityNode.put(kvEntry.getKey(), value));
} else if (kvEntry.getDataType() == DataType.LONG) {
kvEntry.getLongValue().ifPresent(value -> entityNode.put(kvEntry.getKey(), value));
} else if (kvEntry.getDataType() == DataType.JSON) {
if (kvEntry.getJsonValue().isPresent()) {
entityNode.set(kvEntry.getKey(), json.readTree(kvEntry.getJsonValue().get()));
}
} else {
entityNode.put(kvEntry.getKey(), kvEntry.getValueAsString());
}
}
private <T> T extractParameter(Class<T> clazz, int index, Object... additionalInfo) {
T result = null;
if (additionalInfo != null && additionalInfo.length > index) {

10
application/src/main/java/org/thingsboard/server/controller/TelemetryController.java

@ -440,11 +440,13 @@ public class TelemetryController extends BaseController {
tsSubService.saveAndNotify(tenantId, entityId, entries, ttl, new FutureCallback<Void>() {
@Override
public void onSuccess(@Nullable Void tmp) {
logTelemetryUpdated(user, entityId, entries, null);
result.setResult(new ResponseEntity(HttpStatus.OK));
}
@Override
public void onFailure(Throwable t) {
logTelemetryUpdated(user, entityId, entries, t);
AccessValidator.handleError(t, result, HttpStatus.INTERNAL_SERVER_ERROR);
}
});
@ -585,6 +587,14 @@ public class TelemetryController extends BaseController {
}
}
private void logTelemetryUpdated(SecurityUser user, EntityId entityId, List<TsKvEntry> telemetry, Throwable e) {
try {
logEntityAction(user, (UUIDBased & EntityId) entityId, null, null, ActionType.TIMESERIES_UPDATED, toException(e), telemetry);
} catch (ThingsboardException te) {
log.warn("Failed to log telemetry update");
}
}
private void logAttributesDeleted(SecurityUser user, EntityId entityId, String scope, List<String> keys, Throwable e) {
try {
logEntityAction(user, (UUIDBased & EntityId) entityId, null, null, ActionType.ATTRIBUTES_DELETED, toException(e),

1
common/data/src/main/java/org/thingsboard/server/common/data/DataConstants.java

@ -59,6 +59,7 @@ public class DataConstants {
public static final String ENTITY_UNASSIGNED = "ENTITY_UNASSIGNED";
public static final String ATTRIBUTES_UPDATED = "ATTRIBUTES_UPDATED";
public static final String ATTRIBUTES_DELETED = "ATTRIBUTES_DELETED";
public static final String TIMESERIES_UPDATED = "TIMESERIES_UPDATED";
public static final String ALARM_ACK = "ALARM_ACK";
public static final String ALARM_CLEAR = "ALARM_CLEAR";
public static final String ENTITY_ASSIGNED_FROM_TENANT = "ENTITY_ASSIGNED_FROM_TENANT";

1
common/data/src/main/java/org/thingsboard/server/common/data/audit/ActionType.java

@ -25,6 +25,7 @@ public enum ActionType {
ATTRIBUTES_UPDATED(false), // log attributes/values
ATTRIBUTES_DELETED(false), // log attributes
TIMESERIES_DELETED(false), // log timeseries
TIMESERIES_UPDATED(false), // log timeseries update
RPC_CALL(false), // log method and params
CREDENTIALS_UPDATED(false), // log new credentials
ASSIGNED_TO_CUSTOMER(false), // log customer name

Loading…
Cancel
Save