Browse Source

Merge branch 'master' of github.com:thingsboard/thingsboard into develop/3.0

pull/2729/head
Igor Kulikov 6 years ago
parent
commit
3a129de654
  1. 13
      application/src/main/java/org/thingsboard/server/service/queue/DefaultTbCoreConsumerService.java
  2. 18
      application/src/main/java/org/thingsboard/server/service/queue/DefaultTbRuleEngineConsumerService.java
  3. 23
      common/transport/coap/src/main/java/org/thingsboard/server/transport/coap/CoapTransportResource.java
  4. 13
      common/transport/http/src/main/java/org/thingsboard/server/transport/http/DeviceApiController.java
  5. 13
      common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/TransportServiceCallback.java

13
application/src/main/java/org/thingsboard/server/service/queue/DefaultTbCoreConsumerService.java

@ -23,6 +23,7 @@ import org.springframework.stereotype.Service;
import org.thingsboard.rule.engine.api.RpcError;
import org.thingsboard.server.actors.ActorSystemContext;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.msg.MsgType;
import org.thingsboard.server.common.msg.TbActorMsg;
import org.thingsboard.server.common.msg.queue.ServiceType;
import org.thingsboard.server.common.msg.queue.TbCallback;
@ -45,6 +46,7 @@ import org.thingsboard.server.service.encoding.DataDecodingEncodingService;
import org.thingsboard.server.service.queue.processing.AbstractConsumerService;
import org.thingsboard.server.service.rpc.FromDeviceRpcResponse;
import org.thingsboard.server.service.rpc.TbCoreDeviceRpcService;
import org.thingsboard.server.service.rpc.ToDeviceRpcRequestActorMsg;
import org.thingsboard.server.service.state.DeviceStateService;
import org.thingsboard.server.service.subscription.SubscriptionManagerService;
import org.thingsboard.server.service.subscription.TbLocalSubscriptionService;
@ -100,7 +102,7 @@ public class DefaultTbCoreConsumerService extends AbstractConsumerService<ToCore
}
@PreDestroy
public void destroy(){
public void destroy() {
super.destroy();
}
@ -143,8 +145,13 @@ public class DefaultTbCoreConsumerService extends AbstractConsumerService<ToCore
} else if (toCoreMsg.getToDeviceActorNotificationMsg() != null && !toCoreMsg.getToDeviceActorNotificationMsg().isEmpty()) {
Optional<TbActorMsg> actorMsg = encodingService.decode(toCoreMsg.getToDeviceActorNotificationMsg().toByteArray());
if (actorMsg.isPresent()) {
log.trace("[{}] Forwarding message to App Actor {}", id, actorMsg.get());
actorContext.tell(actorMsg.get(), ActorRef.noSender());
TbActorMsg tbActorMsg = actorMsg.get();
if (tbActorMsg.getMsgType().equals(MsgType.DEVICE_RPC_REQUEST_TO_DEVICE_ACTOR_MSG)) {
tbCoreDeviceRpcService.forwardRpcRequestToDeviceActor((ToDeviceRpcRequestActorMsg) tbActorMsg);
} else {
log.trace("[{}] Forwarding message to App Actor {}", id, actorMsg.get());
actorContext.tell(actorMsg.get(), ActorRef.noSender());
}
}
callback.onSuccess();
}

18
application/src/main/java/org/thingsboard/server/service/queue/DefaultTbRuleEngineConsumerService.java

@ -21,6 +21,7 @@ import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;
import org.thingsboard.rule.engine.api.RpcError;
import org.thingsboard.server.actors.ActorSystemContext;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.msg.TbActorMsg;
@ -31,6 +32,7 @@ import org.thingsboard.server.common.msg.queue.ServiceQueue;
import org.thingsboard.server.common.msg.queue.ServiceType;
import org.thingsboard.server.common.msg.queue.TbCallback;
import org.thingsboard.server.common.msg.queue.TbMsgCallback;
import org.thingsboard.server.gen.transport.TransportProtos;
import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineMsg;
import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineNotificationMsg;
import org.thingsboard.server.queue.TbQueueConsumer;
@ -48,6 +50,9 @@ import org.thingsboard.server.service.queue.processing.TbRuleEngineProcessingStr
import org.thingsboard.server.service.queue.processing.TbRuleEngineProcessingStrategyFactory;
import org.thingsboard.server.service.queue.processing.TbRuleEngineSubmitStrategy;
import org.thingsboard.server.service.queue.processing.TbRuleEngineSubmitStrategyFactory;
import org.thingsboard.server.service.rpc.FromDeviceRpcResponse;
import org.thingsboard.server.service.rpc.TbCoreDeviceRpcService;
import org.thingsboard.server.service.rpc.TbRuleEngineDeviceRpcService;
import org.thingsboard.server.service.stats.RuleEngineStatisticsService;
import javax.annotation.PostConstruct;
@ -81,6 +86,7 @@ public class DefaultTbRuleEngineConsumerService extends AbstractConsumerService<
private final TbRuleEngineQueueFactory tbRuleEngineQueueFactory;
private final TbQueueRuleEngineSettings ruleEngineSettings;
private final RuleEngineStatisticsService statisticsService;
private final TbRuleEngineDeviceRpcService tbDeviceRpcService;
private final ConcurrentMap<String, TbQueueConsumer<TbProtoQueueMsg<ToRuleEngineMsg>>> consumers = new ConcurrentHashMap<>();
private final ConcurrentMap<String, TbRuleEngineQueueConfiguration> consumerConfigurations = new ConcurrentHashMap<>();
private final ConcurrentMap<String, TbRuleEngineConsumerStats> consumerStats = new ConcurrentHashMap<>();
@ -90,13 +96,15 @@ public class DefaultTbRuleEngineConsumerService extends AbstractConsumerService<
TbRuleEngineSubmitStrategyFactory submitStrategyFactory,
TbQueueRuleEngineSettings ruleEngineSettings,
TbRuleEngineQueueFactory tbRuleEngineQueueFactory, RuleEngineStatisticsService statisticsService,
ActorSystemContext actorContext, DataDecodingEncodingService encodingService) {
ActorSystemContext actorContext, DataDecodingEncodingService encodingService,
TbRuleEngineDeviceRpcService tbDeviceRpcService) {
super(actorContext, encodingService, tbRuleEngineQueueFactory.createToRuleEngineNotificationsMsgConsumer());
this.statisticsService = statisticsService;
this.ruleEngineSettings = ruleEngineSettings;
this.tbRuleEngineQueueFactory = tbRuleEngineQueueFactory;
this.submitStrategyFactory = submitStrategyFactory;
this.processingStrategyFactory = processingStrategyFactory;
this.tbDeviceRpcService = tbDeviceRpcService;
}
@PostConstruct
@ -227,7 +235,15 @@ public class DefaultTbRuleEngineConsumerService extends AbstractConsumerService<
actorContext.tell(actorMsg.get(), ActorRef.noSender());
}
callback.onSuccess();
} else if (nfMsg.hasFromDeviceRpcResponse()) {
TransportProtos.FromDeviceRPCResponseProto proto = nfMsg.getFromDeviceRpcResponse();
RpcError error = proto.getError() > 0 ? RpcError.values()[proto.getError()] : null;
FromDeviceRpcResponse response = new FromDeviceRpcResponse(new UUID(proto.getRequestIdMSB(), proto.getRequestIdLSB())
, proto.getResponse(), error);
tbDeviceRpcService.processRpcResponseFromDevice(response);
callback.onSuccess();
} else {
log.trace("Received notification with missing handler");
callback.onSuccess();
}
}

23
common/transport/coap/src/main/java/org/thingsboard/server/transport/coap/CoapTransportResource.java

@ -37,6 +37,7 @@ import org.thingsboard.server.gen.transport.TransportProtos;
import java.lang.reflect.Field;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
@ -55,6 +56,8 @@ public class CoapTransportResource extends CoapResource {
private final Field observerField;
private final long timeout;
private final ConcurrentMap<String, TransportProtos.SessionInfoProto> tokenToSessionIdMap = new ConcurrentHashMap<>();
private final Set<UUID> rpcSubscriptions = ConcurrentHashMap.newKeySet();
private final Set<UUID> attributeSubscriptions = ConcurrentHashMap.newKeySet();
public CoapTransportResource(CoapTransportContext context, String name) {
super(name);
@ -149,11 +152,13 @@ public class CoapTransportResource extends CoapResource {
transportService.process(sessionInfo,
transportContext.getAdaptor().convertToPostAttributes(sessionId, request),
new CoapOkCallback(exchange));
reportActivity(sessionId, sessionInfo);
break;
case POST_TELEMETRY_REQUEST:
transportService.process(sessionInfo,
transportContext.getAdaptor().convertToPostTelemetry(sessionId, request),
new CoapOkCallback(exchange));
reportActivity(sessionId, sessionInfo);
break;
case CLAIM_REQUEST:
transportService.process(sessionInfo,
@ -161,6 +166,7 @@ public class CoapTransportResource extends CoapResource {
new CoapOkCallback(exchange));
break;
case SUBSCRIBE_ATTRIBUTES_REQUEST:
attributeSubscriptions.add(sessionId);
advanced.setObserver(new CoapExchangeObserverProxy((ExchangeObserver) observerField.get(advanced),
registerAsyncCoapSession(exchange, request, sessionInfo, sessionId)));
transportService.process(sessionInfo,
@ -168,6 +174,7 @@ public class CoapTransportResource extends CoapResource {
new CoapNoOpCallback(exchange));
break;
case UNSUBSCRIBE_ATTRIBUTES_REQUEST:
attributeSubscriptions.remove(sessionId);
TransportProtos.SessionInfoProto attrSession = lookupAsyncSessionInfo(request);
if (attrSession != null) {
transportService.process(attrSession,
@ -177,6 +184,7 @@ public class CoapTransportResource extends CoapResource {
}
break;
case SUBSCRIBE_RPC_COMMANDS_REQUEST:
rpcSubscriptions.add(sessionId);
advanced.setObserver(new CoapExchangeObserverProxy((ExchangeObserver) observerField.get(advanced),
registerAsyncCoapSession(exchange, request, sessionInfo, sessionId)));
transportService.process(sessionInfo,
@ -184,13 +192,13 @@ public class CoapTransportResource extends CoapResource {
new CoapNoOpCallback(exchange));
break;
case UNSUBSCRIBE_RPC_COMMANDS_REQUEST:
rpcSubscriptions.remove(sessionId);
TransportProtos.SessionInfoProto rpcSession = lookupAsyncSessionInfo(request);
if (rpcSession != null) {
transportService.process(rpcSession,
TransportProtos.SubscribeToRPCMsg.newBuilder().setUnsubscribe(true).build(),
new CoapOkCallback(exchange));
transportService.process(sessionInfo, getSessionEventMsg(TransportProtos.SessionEvent.CLOSED), null);
transportService.deregisterSession(rpcSession);
closeAndDeregister(sessionInfo);
}
break;
case TO_DEVICE_RPC_RESPONSE:
@ -221,6 +229,14 @@ public class CoapTransportResource extends CoapResource {
}));
}
private void reportActivity(UUID sessionId, TransportProtos.SessionInfoProto sessionInfo) {
transportContext.getTransportService().process(sessionInfo, TransportProtos.SubscriptionInfoProto.newBuilder()
.setAttributeSubscription(attributeSubscriptions.contains(sessionId))
.setRpcSubscription(rpcSubscriptions.contains(sessionId))
.setLastActivityTime(System.currentTimeMillis())
.build(), TransportServiceCallback.EMPTY);
}
private TransportProtos.SessionInfoProto lookupAsyncSessionInfo(Request request) {
String token = request.getSource().getHostAddress() + ":" + request.getSourcePort() + ":" + request.getTokenString();
return tokenToSessionIdMap.remove(token);
@ -438,6 +454,9 @@ public class CoapTransportResource extends CoapResource {
private void closeAndDeregister(TransportProtos.SessionInfoProto session) {
transportService.process(session, getSessionEventMsg(TransportProtos.SessionEvent.CLOSED), null);
transportService.deregisterSession(session);
UUID sessionId = new UUID(session.getSessionIdMSB(), session.getSessionIdLSB());
rpcSubscriptions.remove(sessionId);
attributeSubscriptions.remove(sessionId);
}
}

13
common/transport/http/src/main/java/org/thingsboard/server/transport/http/DeviceApiController.java

@ -36,6 +36,7 @@ import org.thingsboard.server.common.transport.TransportContext;
import org.thingsboard.server.common.transport.TransportService;
import org.thingsboard.server.common.transport.TransportServiceCallback;
import org.thingsboard.server.common.transport.adaptor.JsonConverter;
import org.thingsboard.server.gen.transport.TransportProtos;
import org.thingsboard.server.gen.transport.TransportProtos.AttributeUpdateNotificationMsg;
import org.thingsboard.server.gen.transport.TransportProtos.DeviceInfoProto;
import org.thingsboard.server.gen.transport.TransportProtos.GetAttributeRequestMsg;
@ -102,6 +103,7 @@ public class DeviceApiController {
TransportService transportService = transportContext.getTransportService();
transportService.process(sessionInfo, JsonConverter.convertToAttributesProto(new JsonParser().parse(json)),
new HttpOkCallback(responseWriter));
reportActivity(sessionInfo);
}));
return responseWriter;
}
@ -115,6 +117,7 @@ public class DeviceApiController {
TransportService transportService = transportContext.getTransportService();
transportService.process(sessionInfo, JsonConverter.convertToTelemetryProto(new JsonParser().parse(json)),
new HttpOkCallback(responseWriter));
reportActivity(sessionInfo);
}));
return responseWriter;
}
@ -274,7 +277,6 @@ public class DeviceApiController {
}
}
private static class HttpSessionListener implements SessionMsgListener {
private final DeferredResult<ResponseEntity> responseWriter;
@ -308,4 +310,13 @@ public class DeviceApiController {
responseWriter.setResult(new ResponseEntity<>(JsonConverter.toJson(msg).toString(), HttpStatus.OK));
}
}
private void reportActivity(SessionInfoProto sessionInfo) {
transportContext.getTransportService().process(sessionInfo, TransportProtos.SubscriptionInfoProto.newBuilder()
.setAttributeSubscription(false)
.setRpcSubscription(false)
.setLastActivityTime(System.currentTimeMillis())
.build(), TransportServiceCallback.EMPTY);
}
}

13
common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/TransportServiceCallback.java

@ -20,7 +20,20 @@ package org.thingsboard.server.common.transport;
*/
public interface TransportServiceCallback<T> {
TransportServiceCallback<Void> EMPTY = new TransportServiceCallback<Void>() {
@Override
public void onSuccess(Void msg) {
}
@Override
public void onError(Throwable e) {
}
};
void onSuccess(T msg);
void onError(Throwable e);
}

Loading…
Cancel
Save