|
|
|
@ -16,7 +16,6 @@ |
|
|
|
package org.thingsboard.server.service.telemetry; |
|
|
|
|
|
|
|
import com.fasterxml.jackson.core.JsonProcessingException; |
|
|
|
import com.fasterxml.jackson.databind.ObjectMapper; |
|
|
|
import com.google.common.base.Function; |
|
|
|
import com.google.common.util.concurrent.FutureCallback; |
|
|
|
import com.google.common.util.concurrent.Futures; |
|
|
|
@ -27,6 +26,7 @@ import org.springframework.beans.factory.annotation.Autowired; |
|
|
|
import org.springframework.beans.factory.annotation.Value; |
|
|
|
import org.springframework.stereotype.Service; |
|
|
|
import org.springframework.web.socket.CloseStatus; |
|
|
|
import org.thingsboard.common.util.JacksonUtil; |
|
|
|
import org.thingsboard.common.util.ThingsBoardExecutors; |
|
|
|
import org.thingsboard.common.util.ThingsBoardThreadFactory; |
|
|
|
import org.thingsboard.server.common.data.DataConstants; |
|
|
|
@ -112,7 +112,6 @@ public class DefaultTelemetryWebSocketService implements TelemetryWebSocketServi |
|
|
|
private static final Aggregation DEFAULT_AGGREGATION = Aggregation.NONE; |
|
|
|
private static final int UNKNOWN_SUBSCRIPTION_ID = 0; |
|
|
|
private static final String PROCESSING_MSG = "[{}] Processing: {}"; |
|
|
|
private static final ObjectMapper jsonMapper = new ObjectMapper(); |
|
|
|
private static final String FAILED_TO_FETCH_DATA = "Failed to fetch data!"; |
|
|
|
private static final String FAILED_TO_FETCH_ATTRIBUTES = "Failed to fetch attributes!"; |
|
|
|
private static final String SESSION_META_DATA_NOT_FOUND = "Session meta-data not found!"; |
|
|
|
@ -147,10 +146,10 @@ public class DefaultTelemetryWebSocketService implements TelemetryWebSocketServi |
|
|
|
@Value("${server.ws.ping_timeout:30000}") |
|
|
|
private long pingTimeout; |
|
|
|
|
|
|
|
private ConcurrentMap<TenantId, Set<String>> tenantSubscriptionsMap = new ConcurrentHashMap<>(); |
|
|
|
private ConcurrentMap<CustomerId, Set<String>> customerSubscriptionsMap = new ConcurrentHashMap<>(); |
|
|
|
private ConcurrentMap<UserId, Set<String>> regularUserSubscriptionsMap = new ConcurrentHashMap<>(); |
|
|
|
private ConcurrentMap<UserId, Set<String>> publicUserSubscriptionsMap = new ConcurrentHashMap<>(); |
|
|
|
private final ConcurrentMap<TenantId, Set<String>> tenantSubscriptionsMap = new ConcurrentHashMap<>(); |
|
|
|
private final ConcurrentMap<CustomerId, Set<String>> customerSubscriptionsMap = new ConcurrentHashMap<>(); |
|
|
|
private final ConcurrentMap<UserId, Set<String>> regularUserSubscriptionsMap = new ConcurrentHashMap<>(); |
|
|
|
private final ConcurrentMap<UserId, Set<String>> publicUserSubscriptionsMap = new ConcurrentHashMap<>(); |
|
|
|
|
|
|
|
private ExecutorService executor; |
|
|
|
private String serviceId; |
|
|
|
@ -204,7 +203,7 @@ public class DefaultTelemetryWebSocketService implements TelemetryWebSocketServi |
|
|
|
} |
|
|
|
|
|
|
|
try { |
|
|
|
TelemetryPluginCmdsWrapper cmdsWrapper = jsonMapper.readValue(msg, TelemetryPluginCmdsWrapper.class); |
|
|
|
TelemetryPluginCmdsWrapper cmdsWrapper = JacksonUtil.OBJECT_MAPPER.readValue(msg, TelemetryPluginCmdsWrapper.class); |
|
|
|
if (cmdsWrapper != null) { |
|
|
|
if (cmdsWrapper.getAttrSubCmds() != null) { |
|
|
|
cmdsWrapper.getAttrSubCmds().forEach(cmd -> { |
|
|
|
@ -793,7 +792,7 @@ public class DefaultTelemetryWebSocketService implements TelemetryWebSocketServi |
|
|
|
|
|
|
|
private void sendWsMsg(TelemetryWebSocketSessionRef sessionRef, int cmdId, Object update) { |
|
|
|
try { |
|
|
|
String msg = jsonMapper.writeValueAsString(update); |
|
|
|
String msg = JacksonUtil.OBJECT_MAPPER.writeValueAsString(update); |
|
|
|
executor.submit(() -> { |
|
|
|
try { |
|
|
|
msgEndpoint.send(sessionRef, cmdId, msg); |
|
|
|
|