|
|
|
@ -18,31 +18,42 @@ package org.thingsboard.server.service.telemetry; |
|
|
|
import com.google.common.util.concurrent.FutureCallback; |
|
|
|
import com.google.common.util.concurrent.Futures; |
|
|
|
import com.google.common.util.concurrent.ListenableFuture; |
|
|
|
import com.google.protobuf.InvalidProtocolBufferException; |
|
|
|
import lombok.extern.slf4j.Slf4j; |
|
|
|
import org.springframework.beans.factory.annotation.Autowired; |
|
|
|
import org.springframework.context.annotation.Lazy; |
|
|
|
import org.springframework.stereotype.Service; |
|
|
|
import org.springframework.util.StringUtils; |
|
|
|
import org.thingsboard.rule.engine.DonAsynchron; |
|
|
|
import org.thingsboard.server.common.data.DataConstants; |
|
|
|
import org.thingsboard.server.common.data.EntityType; |
|
|
|
import org.thingsboard.server.common.data.id.DeviceId; |
|
|
|
import org.thingsboard.server.common.data.id.EntityId; |
|
|
|
import org.thingsboard.server.common.data.id.EntityIdFactory; |
|
|
|
import org.thingsboard.server.common.data.kv.AttributeKvEntry; |
|
|
|
import org.thingsboard.server.common.data.kv.BaseAttributeKvEntry; |
|
|
|
import org.thingsboard.server.common.data.kv.BaseTsKvQuery; |
|
|
|
import org.thingsboard.server.common.data.kv.BasicTsKvEntry; |
|
|
|
import org.thingsboard.server.common.data.kv.BooleanDataEntry; |
|
|
|
import org.thingsboard.server.common.data.kv.DataType; |
|
|
|
import org.thingsboard.server.common.data.kv.DoubleDataEntry; |
|
|
|
import org.thingsboard.server.common.data.kv.KvEntry; |
|
|
|
import org.thingsboard.server.common.data.kv.LongDataEntry; |
|
|
|
import org.thingsboard.server.common.data.kv.StringDataEntry; |
|
|
|
import org.thingsboard.server.common.data.kv.TsKvEntry; |
|
|
|
import org.thingsboard.server.common.data.kv.TsKvQuery; |
|
|
|
import org.thingsboard.server.common.msg.cluster.ServerAddress; |
|
|
|
import org.thingsboard.server.dao.attributes.AttributesService; |
|
|
|
import org.thingsboard.server.dao.timeseries.TimeseriesService; |
|
|
|
import org.thingsboard.server.extensions.api.plugins.PluginContext; |
|
|
|
import org.thingsboard.server.extensions.core.plugin.telemetry.handlers.TelemetryFeature; |
|
|
|
import org.thingsboard.server.extensions.core.plugin.telemetry.sub.Subscription; |
|
|
|
import org.thingsboard.server.extensions.core.plugin.telemetry.sub.SubscriptionErrorCode; |
|
|
|
import org.thingsboard.server.extensions.core.plugin.telemetry.sub.SubscriptionState; |
|
|
|
import org.thingsboard.server.extensions.core.plugin.telemetry.sub.SubscriptionUpdate; |
|
|
|
import org.thingsboard.server.gen.cluster.ClusterAPIProtos; |
|
|
|
import org.thingsboard.server.service.cluster.routing.ClusterRoutingService; |
|
|
|
import org.thingsboard.server.service.cluster.rpc.ClusterRpcService; |
|
|
|
import org.thingsboard.server.service.state.DefaultDeviceStateService; |
|
|
|
import org.thingsboard.server.service.state.DeviceStateService; |
|
|
|
|
|
|
|
@ -53,15 +64,18 @@ import java.util.ArrayList; |
|
|
|
import java.util.Collections; |
|
|
|
import java.util.HashMap; |
|
|
|
import java.util.HashSet; |
|
|
|
import java.util.Iterator; |
|
|
|
import java.util.List; |
|
|
|
import java.util.Map; |
|
|
|
import java.util.Optional; |
|
|
|
import java.util.Set; |
|
|
|
import java.util.TreeMap; |
|
|
|
import java.util.concurrent.ExecutorService; |
|
|
|
import java.util.concurrent.Executors; |
|
|
|
import java.util.function.Consumer; |
|
|
|
import java.util.function.Function; |
|
|
|
import java.util.function.Predicate; |
|
|
|
import java.util.stream.Collectors; |
|
|
|
|
|
|
|
/** |
|
|
|
* Created by ashvayka on 27.03.18. |
|
|
|
@ -82,6 +96,9 @@ public class DefaultTelemetrySubscriptionService implements TelemetrySubscriptio |
|
|
|
@Autowired |
|
|
|
private ClusterRoutingService routingService; |
|
|
|
|
|
|
|
@Autowired |
|
|
|
private ClusterRpcService rpcService; |
|
|
|
|
|
|
|
@Autowired |
|
|
|
@Lazy |
|
|
|
private DeviceStateService stateService; |
|
|
|
@ -106,7 +123,6 @@ public class DefaultTelemetrySubscriptionService implements TelemetrySubscriptio |
|
|
|
} |
|
|
|
|
|
|
|
private final Map<EntityId, Set<Subscription>> subscriptionsByEntityId = new HashMap<>(); |
|
|
|
|
|
|
|
private final Map<String, Map<Integer, Subscription>> subscriptionsByWsSessionId = new HashMap<>(); |
|
|
|
|
|
|
|
@Override |
|
|
|
@ -117,7 +133,7 @@ public class DefaultTelemetrySubscriptionService implements TelemetrySubscriptio |
|
|
|
ServerAddress address = server.get(); |
|
|
|
log.trace("[{}] Forwarding subscription [{}] for device [{}] to [{}]", sessionId, sub.getSubscriptionId(), entityId, address); |
|
|
|
subscription = new Subscription(sub, true, address); |
|
|
|
// rpcHandler.onNewSubscription(ctx, address, sessionId, subscription);
|
|
|
|
tellNewSubscription(address, sessionId, subscription); |
|
|
|
} else { |
|
|
|
log.trace("[{}] Registering local subscription [{}] for device [{}]", sessionId, sub.getSubscriptionId(), entityId); |
|
|
|
subscription = new Subscription(sub, true); |
|
|
|
@ -189,6 +205,174 @@ public class DefaultTelemetrySubscriptionService implements TelemetrySubscriptio |
|
|
|
, System.currentTimeMillis())), callback); |
|
|
|
} |
|
|
|
|
|
|
|
@Override |
|
|
|
public void onNewRemoteSubscription(ServerAddress serverAddress, byte[] data) { |
|
|
|
ClusterAPIProtos.SubscriptionProto proto; |
|
|
|
try { |
|
|
|
proto = ClusterAPIProtos.SubscriptionProto.parseFrom(data); |
|
|
|
} catch (InvalidProtocolBufferException e) { |
|
|
|
throw new RuntimeException(e); |
|
|
|
} |
|
|
|
Map<String, Long> statesMap = proto.getKeyStatesList().stream().collect( |
|
|
|
Collectors.toMap(ClusterAPIProtos.SubscriptionKetStateProto::getKey, ClusterAPIProtos.SubscriptionKetStateProto::getTs)); |
|
|
|
Subscription subscription = new Subscription( |
|
|
|
new SubscriptionState(proto.getSessionId(), proto.getSubscriptionId(), |
|
|
|
EntityIdFactory.getByTypeAndId(proto.getEntityType(), proto.getEntityId()), |
|
|
|
TelemetryFeature.valueOf(proto.getType()), proto.getAllKeys(), statesMap, proto.getScope()), |
|
|
|
false, new ServerAddress(serverAddress.getHost(), serverAddress.getPort())); |
|
|
|
|
|
|
|
addRemoteWsSubscription(serverAddress, proto.getSessionId(), subscription); |
|
|
|
} |
|
|
|
|
|
|
|
@Override |
|
|
|
public void onRemoteSubscriptionUpdate(ServerAddress serverAddress, byte[] data) { |
|
|
|
ClusterAPIProtos.SubscriptionUpdateProto proto; |
|
|
|
try { |
|
|
|
proto = ClusterAPIProtos.SubscriptionUpdateProto.parseFrom(data); |
|
|
|
} catch (InvalidProtocolBufferException e) { |
|
|
|
throw new RuntimeException(e); |
|
|
|
} |
|
|
|
SubscriptionUpdate update = convert(proto); |
|
|
|
String sessionId = proto.getSessionId(); |
|
|
|
log.trace("[{}] Processing remote subscription onUpdate [{}]", sessionId, update); |
|
|
|
Optional<Subscription> subOpt = getSubscription(sessionId, update.getSubscriptionId()); |
|
|
|
if (subOpt.isPresent()) { |
|
|
|
updateSubscriptionState(sessionId, subOpt.get(), update); |
|
|
|
wsService.sendWsMsg(sessionId, update); |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
@Override |
|
|
|
public void onRemoteSubscriptionClose(ServerAddress serverAddress, byte[] data) { |
|
|
|
ClusterAPIProtos.SubscriptionCloseProto proto; |
|
|
|
try { |
|
|
|
proto = ClusterAPIProtos.SubscriptionCloseProto.parseFrom(data); |
|
|
|
} catch (InvalidProtocolBufferException e) { |
|
|
|
throw new RuntimeException(e); |
|
|
|
} |
|
|
|
removeSubscription(proto.getSessionId(), proto.getSubscriptionId()); |
|
|
|
} |
|
|
|
|
|
|
|
@Override |
|
|
|
public void onRemoteSessionClose(ServerAddress serverAddress, byte[] data) { |
|
|
|
ClusterAPIProtos.SessionCloseProto proto; |
|
|
|
try { |
|
|
|
proto = ClusterAPIProtos.SessionCloseProto.parseFrom(data); |
|
|
|
} catch (InvalidProtocolBufferException e) { |
|
|
|
throw new RuntimeException(e); |
|
|
|
} |
|
|
|
cleanupRemoteWsSessionSubscriptions(proto.getSessionId()); |
|
|
|
} |
|
|
|
|
|
|
|
@Override |
|
|
|
public void onRemoteAttributesUpdate(ServerAddress serverAddress, byte[] data) { |
|
|
|
ClusterAPIProtos.AttributeUpdateProto proto; |
|
|
|
try { |
|
|
|
proto = ClusterAPIProtos.AttributeUpdateProto.parseFrom(data); |
|
|
|
} catch (InvalidProtocolBufferException e) { |
|
|
|
throw new RuntimeException(e); |
|
|
|
} |
|
|
|
onAttributesUpdate(EntityIdFactory.getByTypeAndId(proto.getEntityType(), proto.getEntityId()), proto.getScope(), |
|
|
|
proto.getDataList().stream().map(this::toAttribute).collect(Collectors.toList())); |
|
|
|
} |
|
|
|
|
|
|
|
@Override |
|
|
|
public void onRemoteTsUpdate(ServerAddress serverAddress, byte[] data) { |
|
|
|
ClusterAPIProtos.TimeseriesUpdateProto proto; |
|
|
|
try { |
|
|
|
proto = ClusterAPIProtos.TimeseriesUpdateProto.parseFrom(data); |
|
|
|
} catch (InvalidProtocolBufferException e) { |
|
|
|
throw new RuntimeException(e); |
|
|
|
} |
|
|
|
onTimeseriesUpdate(EntityIdFactory.getByTypeAndId(proto.getEntityType(), proto.getEntityId()), |
|
|
|
proto.getDataList().stream().map(this::toTimeseries).collect(Collectors.toList())); |
|
|
|
} |
|
|
|
|
|
|
|
@Override |
|
|
|
public void onClusterUpdate() { |
|
|
|
log.trace("Processing cluster onUpdate msg!"); |
|
|
|
Iterator<Map.Entry<EntityId, Set<Subscription>>> deviceIterator = subscriptionsByEntityId.entrySet().iterator(); |
|
|
|
while (deviceIterator.hasNext()) { |
|
|
|
Map.Entry<EntityId, Set<Subscription>> e = deviceIterator.next(); |
|
|
|
Set<Subscription> subscriptions = e.getValue(); |
|
|
|
Optional<ServerAddress> newAddressOptional = routingService.resolveById(e.getKey()); |
|
|
|
if (newAddressOptional.isPresent()) { |
|
|
|
newAddressOptional.ifPresent(serverAddress -> checkSubsciptionsNewAddress(serverAddress, subscriptions)); |
|
|
|
} else { |
|
|
|
checkSubsciptionsPrevAddress(subscriptions); |
|
|
|
} |
|
|
|
if (subscriptions.size() == 0) { |
|
|
|
log.trace("[{}] No more subscriptions for this device on current server.", e.getKey()); |
|
|
|
deviceIterator.remove(); |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
private void checkSubsciptionsNewAddress(ServerAddress newAddress, Set<Subscription> subscriptions) { |
|
|
|
Iterator<Subscription> subscriptionIterator = subscriptions.iterator(); |
|
|
|
while (subscriptionIterator.hasNext()) { |
|
|
|
Subscription s = subscriptionIterator.next(); |
|
|
|
if (s.isLocal()) { |
|
|
|
if (!newAddress.equals(s.getServer())) { |
|
|
|
log.trace("[{}] Local subscription is now handled on new server [{}]", s.getWsSessionId(), newAddress); |
|
|
|
s.setServer(newAddress); |
|
|
|
tellNewSubscription(newAddress, s.getWsSessionId(), s); |
|
|
|
} |
|
|
|
} else { |
|
|
|
log.trace("[{}] Remote subscription is now handled on new server address: [{}]", s.getWsSessionId(), newAddress); |
|
|
|
subscriptionIterator.remove(); |
|
|
|
//TODO: onUpdate state of subscription by WsSessionId and other maps.
|
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
private void checkSubsciptionsPrevAddress(Set<Subscription> subscriptions) { |
|
|
|
for (Subscription s : subscriptions) { |
|
|
|
if (s.isLocal() && s.getServer() != null) { |
|
|
|
log.trace("[{}] Local subscription is no longer handled on remote server address [{}]", s.getWsSessionId(), s.getServer()); |
|
|
|
s.setServer(null); |
|
|
|
} else { |
|
|
|
log.trace("[{}] Remote subscription is on up to date server address.", s.getWsSessionId()); |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
private void addRemoteWsSubscription(ServerAddress address, String sessionId, Subscription subscription) { |
|
|
|
EntityId entityId = subscription.getEntityId(); |
|
|
|
log.trace("[{}] Registering remote subscription [{}] for device [{}] to [{}]", sessionId, subscription.getSubscriptionId(), entityId, address); |
|
|
|
registerSubscription(sessionId, entityId, subscription); |
|
|
|
if (subscription.getType() == TelemetryFeature.ATTRIBUTES) { |
|
|
|
final Map<String, Long> keyStates = subscription.getKeyStates(); |
|
|
|
DonAsynchron.withCallback(attrService.find(entityId, DataConstants.CLIENT_SCOPE, keyStates.keySet()), values -> { |
|
|
|
List<TsKvEntry> missedUpdates = new ArrayList<>(); |
|
|
|
values.forEach(latestEntry -> { |
|
|
|
if (latestEntry.getLastUpdateTs() > keyStates.get(latestEntry.getKey())) { |
|
|
|
missedUpdates.add(new BasicTsKvEntry(latestEntry.getLastUpdateTs(), latestEntry)); |
|
|
|
} |
|
|
|
}); |
|
|
|
if (!missedUpdates.isEmpty()) { |
|
|
|
tellRemoteSubUpdate(address, sessionId, new SubscriptionUpdate(subscription.getSubscriptionId(), missedUpdates)); |
|
|
|
} |
|
|
|
}, |
|
|
|
e -> log.error("Failed to fetch missed updates.", e), tsCallBackExecutor); |
|
|
|
} else if (subscription.getType() == TelemetryFeature.TIMESERIES) { |
|
|
|
long curTs = System.currentTimeMillis(); |
|
|
|
List<TsKvQuery> queries = new ArrayList<>(); |
|
|
|
subscription.getKeyStates().entrySet().forEach(e -> { |
|
|
|
queries.add(new BaseTsKvQuery(e.getKey(), e.getValue() + 1L, curTs)); |
|
|
|
}); |
|
|
|
|
|
|
|
DonAsynchron.withCallback(tsService.findAll(entityId, queries), |
|
|
|
missedUpdates -> { |
|
|
|
if (!missedUpdates.isEmpty()) { |
|
|
|
tellRemoteSubUpdate(address, sessionId, new SubscriptionUpdate(subscription.getSubscriptionId(), missedUpdates)); |
|
|
|
} |
|
|
|
}, |
|
|
|
e -> log.error("Failed to fetch missed updates.", e), |
|
|
|
tsCallBackExecutor); |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
private void onAttributesUpdate(EntityId entityId, String scope, List<AttributeKvEntry> attributes) { |
|
|
|
Optional<ServerAddress> serverAddress = routingService.resolveById(entityId); |
|
|
|
if (!serverAddress.isPresent()) { |
|
|
|
@ -201,7 +385,7 @@ public class DefaultTelemetrySubscriptionService implements TelemetrySubscriptio |
|
|
|
} |
|
|
|
} |
|
|
|
} else { |
|
|
|
// rpcHandler.onAttributesUpdate(ctx, serverAddress.get(), entityId, entries);
|
|
|
|
tellRemoteAttributesUpdate(serverAddress.get(), entityId, scope, attributes); |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
@ -210,7 +394,7 @@ public class DefaultTelemetrySubscriptionService implements TelemetrySubscriptio |
|
|
|
if (!serverAddress.isPresent()) { |
|
|
|
onLocalTimeseriesUpdate(entityId, ts); |
|
|
|
} else { |
|
|
|
// rpcHandler.onTimeseriesUpdate(ctx, serverAddress.get(), entityId, entries);
|
|
|
|
tellRemoteTimeseriesUpdate(serverAddress.get(), entityId, ts); |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
@ -256,8 +440,7 @@ public class DefaultTelemetrySubscriptionService implements TelemetrySubscriptio |
|
|
|
updateSubscriptionState(sessionId, s, update); |
|
|
|
wsService.sendWsMsg(sessionId, update); |
|
|
|
} else { |
|
|
|
//TODO: ashvayka
|
|
|
|
// rpcHandler.onSubscriptionUpdate(ctx, s.getServer(), sessionId, update);
|
|
|
|
tellRemoteSubUpdate(s.getServer(), sessionId, update); |
|
|
|
} |
|
|
|
} |
|
|
|
}); |
|
|
|
@ -278,11 +461,11 @@ public class DefaultTelemetrySubscriptionService implements TelemetrySubscriptio |
|
|
|
sessionSubscriptions.put(subscription.getSubscriptionId(), subscription); |
|
|
|
} |
|
|
|
|
|
|
|
public void cleanupLocalWsSessionSubscriptions(String sessionId) { |
|
|
|
private void cleanupLocalWsSessionSubscriptions(String sessionId) { |
|
|
|
cleanupWsSessionSubscriptions(sessionId, true); |
|
|
|
} |
|
|
|
|
|
|
|
public void cleanupRemoteWsSessionSubscriptions(String sessionId) { |
|
|
|
private void cleanupRemoteWsSessionSubscriptions(String sessionId) { |
|
|
|
cleanupWsSessionSubscriptions(sessionId, false); |
|
|
|
} |
|
|
|
|
|
|
|
@ -320,14 +503,14 @@ public class DefaultTelemetrySubscriptionService implements TelemetrySubscriptio |
|
|
|
} |
|
|
|
for (ServerAddress address : affectedServers) { |
|
|
|
log.debug("[{}] Going to onSubscriptionUpdate [{}] server about session close event", sessionId, address); |
|
|
|
// rpcHandler.onSessionClose(ctx, address, sessionId);
|
|
|
|
tellRemoteSessionClose(address, sessionId); |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
private void processSubscriptionRemoval(String sessionId, Map<Integer, Subscription> sessionSubscriptions, Subscription subscription) { |
|
|
|
EntityId entityId = subscription.getEntityId(); |
|
|
|
if (subscription.isLocal() && subscription.getServer() != null) { |
|
|
|
// rpcHandler.onSubscriptionClose(ctx, subscription.getServer(), sessionId, subscription.getSubscriptionId());
|
|
|
|
tellRemoteSubClose(subscription.getServer(), sessionId, subscription.getSubscriptionId()); |
|
|
|
} |
|
|
|
if (sessionSubscriptions.isEmpty()) { |
|
|
|
log.debug("[{}] Removed last subscription for particular session.", sessionId); |
|
|
|
@ -379,4 +562,151 @@ public class DefaultTelemetrySubscriptionService implements TelemetrySubscriptio |
|
|
|
} |
|
|
|
}, wsCallBackExecutor); |
|
|
|
} |
|
|
|
|
|
|
|
private void tellNewSubscription(ServerAddress address, String sessionId, Subscription sub) { |
|
|
|
ClusterAPIProtos.SubscriptionProto.Builder builder = ClusterAPIProtos.SubscriptionProto.newBuilder(); |
|
|
|
builder.setSessionId(sessionId); |
|
|
|
builder.setSubscriptionId(sub.getSubscriptionId()); |
|
|
|
builder.setEntityType(sub.getEntityId().getEntityType().name()); |
|
|
|
builder.setEntityId(sub.getEntityId().getId().toString()); |
|
|
|
builder.setType(sub.getType().name()); |
|
|
|
builder.setAllKeys(sub.isAllKeys()); |
|
|
|
builder.setScope(sub.getScope()); |
|
|
|
sub.getKeyStates().entrySet().forEach(e -> builder.addKeyStates( |
|
|
|
ClusterAPIProtos.SubscriptionKetStateProto.newBuilder().setKey(e.getKey()).setTs(e.getValue()).build())); |
|
|
|
rpcService.tell(address, ClusterAPIProtos.MessageType.CLUSTER_TELEMETRY_SUBSCRIPTION_CREATE_MESSAGE, builder.build().toByteArray()); |
|
|
|
} |
|
|
|
|
|
|
|
private void tellRemoteSubUpdate(ServerAddress address, String sessionId, SubscriptionUpdate update) { |
|
|
|
ClusterAPIProtos.SubscriptionUpdateProto.Builder builder = ClusterAPIProtos.SubscriptionUpdateProto.newBuilder(); |
|
|
|
builder.setSessionId(sessionId); |
|
|
|
builder.setSubscriptionId(update.getSubscriptionId()); |
|
|
|
builder.setErrorCode(update.getErrorCode()); |
|
|
|
if (update.getErrorMsg() != null) { |
|
|
|
builder.setErrorMsg(update.getErrorMsg()); |
|
|
|
} |
|
|
|
update.getData().entrySet().forEach( |
|
|
|
e -> { |
|
|
|
ClusterAPIProtos.SubscriptionUpdateValueListProto.Builder dataBuilder = ClusterAPIProtos.SubscriptionUpdateValueListProto.newBuilder(); |
|
|
|
|
|
|
|
dataBuilder.setKey(e.getKey()); |
|
|
|
e.getValue().forEach(v -> { |
|
|
|
Object[] array = (Object[]) v; |
|
|
|
dataBuilder.addTs((long) array[0]); |
|
|
|
dataBuilder.addValue((String) array[1]); |
|
|
|
}); |
|
|
|
|
|
|
|
builder.addData(dataBuilder.build()); |
|
|
|
} |
|
|
|
); |
|
|
|
rpcService.tell(address, ClusterAPIProtos.MessageType.CLUSTER_TELEMETRY_SUBSCRIPTION_UPDATE_MESSAGE, builder.build().toByteArray()); |
|
|
|
} |
|
|
|
|
|
|
|
private void tellRemoteAttributesUpdate(ServerAddress address, EntityId entityId, String scope, List<AttributeKvEntry> attributes) { |
|
|
|
ClusterAPIProtos.AttributeUpdateProto.Builder builder = ClusterAPIProtos.AttributeUpdateProto.newBuilder(); |
|
|
|
builder.setEntityId(entityId.getId().toString()); |
|
|
|
builder.setEntityType(entityId.getEntityType().name()); |
|
|
|
builder.setScope(scope); |
|
|
|
attributes.forEach(v -> builder.addData(toKeyValueProto(v.getLastUpdateTs(), v).build())); |
|
|
|
rpcService.tell(address, ClusterAPIProtos.MessageType.CLUSTER_TELEMETRY_ATTR_UPDATE_MESSAGE, builder.build().toByteArray()); |
|
|
|
} |
|
|
|
|
|
|
|
private void tellRemoteTimeseriesUpdate(ServerAddress address, EntityId entityId, List<TsKvEntry> ts) { |
|
|
|
ClusterAPIProtos.TimeseriesUpdateProto.Builder builder = ClusterAPIProtos.TimeseriesUpdateProto.newBuilder(); |
|
|
|
builder.setEntityId(entityId.getId().toString()); |
|
|
|
builder.setEntityType(entityId.getEntityType().name()); |
|
|
|
ts.forEach(v -> builder.addData(toKeyValueProto(v.getTs(), v).build())); |
|
|
|
rpcService.tell(address, ClusterAPIProtos.MessageType.CLUSTER_TELEMETRY_TS_UPDATE_MESSAGE, builder.build().toByteArray()); |
|
|
|
} |
|
|
|
|
|
|
|
private void tellRemoteSessionClose(ServerAddress address, String sessionId) { |
|
|
|
ClusterAPIProtos.SessionCloseProto proto = ClusterAPIProtos.SessionCloseProto.newBuilder().setSessionId(sessionId).build(); |
|
|
|
rpcService.tell(address, ClusterAPIProtos.MessageType.CLUSTER_TELEMETRY_SESSION_CLOSE_MESSAGE, proto.toByteArray()); |
|
|
|
} |
|
|
|
|
|
|
|
private void tellRemoteSubClose(ServerAddress address, String sessionId, int subscriptionId) { |
|
|
|
ClusterAPIProtos.SubscriptionCloseProto proto = ClusterAPIProtos.SubscriptionCloseProto.newBuilder().setSessionId(sessionId).setSubscriptionId(subscriptionId).build(); |
|
|
|
rpcService.tell(address, ClusterAPIProtos.MessageType.CLUSTER_TELEMETRY_SUBSCRIPTION_CLOSE_MESSAGE, proto.toByteArray()); |
|
|
|
} |
|
|
|
|
|
|
|
private ClusterAPIProtos.KeyValueProto.Builder toKeyValueProto(long ts, KvEntry attr) { |
|
|
|
ClusterAPIProtos.KeyValueProto.Builder dataBuilder = ClusterAPIProtos.KeyValueProto.newBuilder(); |
|
|
|
dataBuilder.setKey(attr.getKey()); |
|
|
|
dataBuilder.setTs(ts); |
|
|
|
dataBuilder.setValueType(attr.getDataType().ordinal()); |
|
|
|
switch (attr.getDataType()) { |
|
|
|
case BOOLEAN: |
|
|
|
Optional<Boolean> booleanValue = attr.getBooleanValue(); |
|
|
|
booleanValue.ifPresent(dataBuilder::setBoolValue); |
|
|
|
break; |
|
|
|
case LONG: |
|
|
|
Optional<Long> longValue = attr.getLongValue(); |
|
|
|
longValue.ifPresent(dataBuilder::setLongValue); |
|
|
|
break; |
|
|
|
case DOUBLE: |
|
|
|
Optional<Double> doubleValue = attr.getDoubleValue(); |
|
|
|
doubleValue.ifPresent(dataBuilder::setDoubleValue); |
|
|
|
break; |
|
|
|
case STRING: |
|
|
|
Optional<String> stringValue = attr.getStrValue(); |
|
|
|
stringValue.ifPresent(dataBuilder::setStrValue); |
|
|
|
break; |
|
|
|
} |
|
|
|
return dataBuilder; |
|
|
|
} |
|
|
|
|
|
|
|
private AttributeKvEntry toAttribute(ClusterAPIProtos.KeyValueProto proto) { |
|
|
|
return new BaseAttributeKvEntry(getKvEntry(proto), proto.getTs()); |
|
|
|
} |
|
|
|
|
|
|
|
private TsKvEntry toTimeseries(ClusterAPIProtos.KeyValueProto proto) { |
|
|
|
return new BasicTsKvEntry(proto.getTs(), getKvEntry(proto)); |
|
|
|
} |
|
|
|
|
|
|
|
private KvEntry getKvEntry(ClusterAPIProtos.KeyValueProto proto) { |
|
|
|
KvEntry entry = null; |
|
|
|
DataType type = DataType.values()[proto.getValueType()]; |
|
|
|
switch (type) { |
|
|
|
case BOOLEAN: |
|
|
|
entry = new BooleanDataEntry(proto.getKey(), proto.getBoolValue()); |
|
|
|
break; |
|
|
|
case LONG: |
|
|
|
entry = new LongDataEntry(proto.getKey(), proto.getLongValue()); |
|
|
|
break; |
|
|
|
case DOUBLE: |
|
|
|
entry = new DoubleDataEntry(proto.getKey(), proto.getDoubleValue()); |
|
|
|
break; |
|
|
|
case STRING: |
|
|
|
entry = new StringDataEntry(proto.getKey(), proto.getStrValue()); |
|
|
|
break; |
|
|
|
} |
|
|
|
return entry; |
|
|
|
} |
|
|
|
|
|
|
|
private SubscriptionUpdate convert(ClusterAPIProtos.SubscriptionUpdateProto proto) { |
|
|
|
if (proto.getErrorCode() > 0) { |
|
|
|
return new SubscriptionUpdate(proto.getSubscriptionId(), SubscriptionErrorCode.forCode(proto.getErrorCode()), proto.getErrorMsg()); |
|
|
|
} else { |
|
|
|
Map<String, List<Object>> data = new TreeMap<>(); |
|
|
|
proto.getDataList().forEach(v -> { |
|
|
|
List<Object> values = data.computeIfAbsent(v.getKey(), k -> new ArrayList<>()); |
|
|
|
for (int i = 0; i < v.getTsCount(); i++) { |
|
|
|
Object[] value = new Object[2]; |
|
|
|
value[0] = v.getTs(i); |
|
|
|
value[1] = v.getValue(i); |
|
|
|
values.add(value); |
|
|
|
} |
|
|
|
}); |
|
|
|
return new SubscriptionUpdate(proto.getSubscriptionId(), data); |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
private Optional<Subscription> getSubscription(String sessionId, int subscriptionId) { |
|
|
|
Subscription state = null; |
|
|
|
Map<Integer, Subscription> subMap = subscriptionsByWsSessionId.get(sessionId); |
|
|
|
if (subMap != null) { |
|
|
|
state = subMap.get(subscriptionId); |
|
|
|
} |
|
|
|
return Optional.ofNullable(state); |
|
|
|
} |
|
|
|
} |
|
|
|
|