|
|
|
@ -28,6 +28,7 @@ import org.eclipse.californium.core.observe.ObserveRelation; |
|
|
|
import org.eclipse.californium.core.server.resources.CoapExchange; |
|
|
|
import org.eclipse.californium.core.server.resources.Resource; |
|
|
|
import org.eclipse.californium.core.server.resources.ResourceObserver; |
|
|
|
import org.springframework.util.CollectionUtils; |
|
|
|
import org.thingsboard.server.coapserver.CoapServerService; |
|
|
|
import org.thingsboard.server.coapserver.TbCoapDtlsSessionInfo; |
|
|
|
import org.thingsboard.server.common.data.DataConstants; |
|
|
|
@ -55,11 +56,14 @@ import org.thingsboard.server.gen.transport.TransportProtos; |
|
|
|
import org.thingsboard.server.transport.coap.adaptors.CoapTransportAdaptor; |
|
|
|
|
|
|
|
import java.util.List; |
|
|
|
import java.util.Map; |
|
|
|
import java.util.Optional; |
|
|
|
import java.util.Random; |
|
|
|
import java.util.Set; |
|
|
|
import java.util.UUID; |
|
|
|
import java.util.concurrent.ConcurrentHashMap; |
|
|
|
import java.util.concurrent.ConcurrentMap; |
|
|
|
import java.util.concurrent.TimeUnit; |
|
|
|
import java.util.concurrent.atomic.AtomicInteger; |
|
|
|
|
|
|
|
@Slf4j |
|
|
|
@ -72,25 +76,30 @@ public class CoapTransportResource extends AbstractCoapTransportResource { |
|
|
|
private static final int REQUEST_ID_POSITION_CERTIFICATE_REQUEST = 4; |
|
|
|
private static final String DTLS_SESSION_ID_KEY = "DTLS_SESSION_ID"; |
|
|
|
|
|
|
|
private final ConcurrentMap<String, TransportProtos.SessionInfoProto> tokenToSessionIdMap = new ConcurrentHashMap<>(); |
|
|
|
private final ConcurrentMap<String, AtomicInteger> tokenToNotificationCounterMap = new ConcurrentHashMap<>(); |
|
|
|
private final ConcurrentMap<String, TransportProtos.SessionInfoProto> tokenToSessionInfoMap = new ConcurrentHashMap<>(); |
|
|
|
private final ConcurrentMap<String, AtomicInteger> tokenToObserveNotificationSeqMap = new ConcurrentHashMap<>(); |
|
|
|
private final ConcurrentMap<TransportProtos.SessionInfoProto, ObserveRelation> sessionInfoToObserveRelationMap = new ConcurrentHashMap<>(); |
|
|
|
private final Set<UUID> rpcSubscriptions = ConcurrentHashMap.newKeySet(); |
|
|
|
private final Set<UUID> attributeSubscriptions = ConcurrentHashMap.newKeySet(); |
|
|
|
|
|
|
|
private ConcurrentMap<String, TbCoapDtlsSessionInfo> dtlsSessionIdMap; |
|
|
|
private long timeout; |
|
|
|
private long sessionReportTimeout; |
|
|
|
|
|
|
|
public CoapTransportResource(CoapTransportContext coapTransportContext, CoapServerService coapServerService, String name) { |
|
|
|
super(coapTransportContext, name); |
|
|
|
public CoapTransportResource(CoapTransportContext ctx, CoapServerService coapServerService, String name) { |
|
|
|
super(ctx, name); |
|
|
|
this.setObservable(true); // enable observing
|
|
|
|
this.addObserver(new CoapResourceObserver()); |
|
|
|
this.dtlsSessionIdMap = coapServerService.getDtlsSessionsMap(); |
|
|
|
this.timeout = coapServerService.getTimeout(); |
|
|
|
// this.setObservable(false); // disable observing
|
|
|
|
// this.setObserveType(CoAP.Type.CON); // configure the notification type to CONs
|
|
|
|
// this.getAttributes().setObservable(); // mark observable in the Link-Format
|
|
|
|
this.sessionReportTimeout = ctx.getSessionReportTimeout(); |
|
|
|
ctx.getScheduler().scheduleAtFixedRate(() -> { |
|
|
|
Set<TransportProtos.SessionInfoProto> observeSessions = sessionInfoToObserveRelationMap.keySet(); |
|
|
|
observeSessions.forEach(this::reportActivity); |
|
|
|
}, new Random().nextInt((int) sessionReportTimeout), sessionReportTimeout, TimeUnit.MILLISECONDS); |
|
|
|
} |
|
|
|
|
|
|
|
@Override |
|
|
|
public void checkObserveRelation(Exchange exchange, Response response) { |
|
|
|
String token = getTokenFromRequest(exchange.getRequest()); |
|
|
|
final ObserveRelation relation = exchange.getRelation(); |
|
|
|
@ -103,11 +112,20 @@ public class CoapTransportResource extends AbstractCoapTransportResource { |
|
|
|
relation.setEstablished(); |
|
|
|
addObserveRelation(relation); |
|
|
|
} |
|
|
|
AtomicInteger notificationCounter = tokenToNotificationCounterMap.computeIfAbsent(token, s -> new AtomicInteger(0)); |
|
|
|
AtomicInteger notificationCounter = tokenToObserveNotificationSeqMap.computeIfAbsent(token, s -> new AtomicInteger(0)); |
|
|
|
response.getOptions().setObserve(notificationCounter.getAndIncrement()); |
|
|
|
} // ObserveLayer takes care of the else case
|
|
|
|
} |
|
|
|
|
|
|
|
public void clearAndNotifyObserveRelation(ObserveRelation relation, CoAP.ResponseCode code) { |
|
|
|
relation.cancel(); |
|
|
|
relation.getExchange().sendResponse(new Response(code)); |
|
|
|
} |
|
|
|
|
|
|
|
public Map<TransportProtos.SessionInfoProto, ObserveRelation> getSessionInfoToObserveRelationMap() { |
|
|
|
return sessionInfoToObserveRelationMap; |
|
|
|
} |
|
|
|
|
|
|
|
@Override |
|
|
|
protected void processHandleGet(CoapExchange exchange) { |
|
|
|
Optional<FeatureType> featureType = getFeatureType(exchange.advanced().getRequest()); |
|
|
|
@ -239,7 +257,7 @@ public class CoapTransportResource extends AbstractCoapTransportResource { |
|
|
|
} |
|
|
|
|
|
|
|
private void processRequest(CoapExchange exchange, SessionMsgType type, Request request, TransportProtos.SessionInfoProto sessionInfo, DeviceProfile deviceProfile) { |
|
|
|
UUID sessionId = new UUID(sessionInfo.getSessionIdMSB(), sessionInfo.getSessionIdLSB()); |
|
|
|
UUID sessionId = toSessionId(sessionInfo); |
|
|
|
try { |
|
|
|
TransportConfigurationContainer transportConfigurationContainer = getTransportConfigurationContainer(deviceProfile); |
|
|
|
CoapTransportAdaptor coapTransportAdaptor = getCoapTransportAdaptor(transportConfigurationContainer.isJsonPayload()); |
|
|
|
@ -249,14 +267,14 @@ public class CoapTransportResource extends AbstractCoapTransportResource { |
|
|
|
coapTransportAdaptor.convertToPostAttributes(sessionId, request, |
|
|
|
transportConfigurationContainer.getAttributesMsgDescriptor()), |
|
|
|
new CoapOkCallback(exchange, CoAP.ResponseCode.CREATED, CoAP.ResponseCode.INTERNAL_SERVER_ERROR)); |
|
|
|
reportActivity(sessionInfo, attributeSubscriptions.contains(sessionId), rpcSubscriptions.contains(sessionId)); |
|
|
|
reportSubscriptionInfo(sessionInfo, attributeSubscriptions.contains(sessionId), rpcSubscriptions.contains(sessionId)); |
|
|
|
break; |
|
|
|
case POST_TELEMETRY_REQUEST: |
|
|
|
transportService.process(sessionInfo, |
|
|
|
coapTransportAdaptor.convertToPostTelemetry(sessionId, request, |
|
|
|
transportConfigurationContainer.getTelemetryMsgDescriptor()), |
|
|
|
new CoapOkCallback(exchange, CoAP.ResponseCode.CREATED, CoAP.ResponseCode.INTERNAL_SERVER_ERROR)); |
|
|
|
reportActivity(sessionInfo, attributeSubscriptions.contains(sessionId), rpcSubscriptions.contains(sessionId)); |
|
|
|
reportSubscriptionInfo(sessionInfo, attributeSubscriptions.contains(sessionId), rpcSubscriptions.contains(sessionId)); |
|
|
|
break; |
|
|
|
case CLAIM_REQUEST: |
|
|
|
transportService.process(sessionInfo, |
|
|
|
@ -264,49 +282,52 @@ public class CoapTransportResource extends AbstractCoapTransportResource { |
|
|
|
new CoapOkCallback(exchange, CoAP.ResponseCode.CREATED, CoAP.ResponseCode.INTERNAL_SERVER_ERROR)); |
|
|
|
break; |
|
|
|
case SUBSCRIBE_ATTRIBUTES_REQUEST: |
|
|
|
TransportProtos.SessionInfoProto currentAttrSession = tokenToSessionIdMap.get(getTokenFromRequest(request)); |
|
|
|
TransportProtos.SessionInfoProto currentAttrSession = tokenToSessionInfoMap.get(getTokenFromRequest(request)); |
|
|
|
if (currentAttrSession == null) { |
|
|
|
attributeSubscriptions.add(sessionId); |
|
|
|
registerAsyncCoapSession(exchange, sessionInfo, coapTransportAdaptor, |
|
|
|
transportConfigurationContainer.getRpcRequestDynamicMessageBuilder(), getTokenFromRequest(request)); |
|
|
|
transportService.process(sessionInfo, |
|
|
|
TransportProtos.SubscribeToAttributeUpdatesMsg.getDefaultInstance(), new CoapNoOpCallback(exchange)); |
|
|
|
transportService.process(sessionInfo, |
|
|
|
TransportProtos.GetAttributeRequestMsg.newBuilder().setOnlyShared(true).build(), |
|
|
|
new CoapNoOpCallback(exchange)); |
|
|
|
} |
|
|
|
break; |
|
|
|
case UNSUBSCRIBE_ATTRIBUTES_REQUEST: |
|
|
|
TransportProtos.SessionInfoProto attrSession = lookupAsyncSessionInfo(getTokenFromRequest(request)); |
|
|
|
if (attrSession != null) { |
|
|
|
UUID attrSessionId = new UUID(attrSession.getSessionIdMSB(), attrSession.getSessionIdLSB()); |
|
|
|
UUID attrSessionId = toSessionId(attrSession); |
|
|
|
attributeSubscriptions.remove(attrSessionId); |
|
|
|
sessionInfoToObserveRelationMap.remove(attrSession); |
|
|
|
transportService.process(attrSession, |
|
|
|
TransportProtos.SubscribeToAttributeUpdatesMsg.newBuilder().setUnsubscribe(true).build(), |
|
|
|
new CoapOkCallback(exchange, CoAP.ResponseCode.DELETED, CoAP.ResponseCode.INTERNAL_SERVER_ERROR)); |
|
|
|
closeAndDeregister(sessionInfo, sessionId); |
|
|
|
closeAndDeregister(sessionInfo); |
|
|
|
} |
|
|
|
break; |
|
|
|
case SUBSCRIBE_RPC_COMMANDS_REQUEST: |
|
|
|
TransportProtos.SessionInfoProto currentRpcSession = tokenToSessionIdMap.get(getTokenFromRequest(request)); |
|
|
|
TransportProtos.SessionInfoProto currentRpcSession = tokenToSessionInfoMap.get(getTokenFromRequest(request)); |
|
|
|
if (currentRpcSession == null) { |
|
|
|
rpcSubscriptions.add(sessionId); |
|
|
|
registerAsyncCoapSession(exchange, sessionInfo, coapTransportAdaptor, |
|
|
|
transportConfigurationContainer.getRpcRequestDynamicMessageBuilder(), getTokenFromRequest(request)); |
|
|
|
transportService.process(sessionInfo, |
|
|
|
TransportProtos.SubscribeToRPCMsg.getDefaultInstance(), |
|
|
|
new CoapNoOpCallback(exchange)); |
|
|
|
} else { |
|
|
|
UUID rpcSessionId = new UUID(currentRpcSession.getSessionIdMSB(), currentRpcSession.getSessionIdLSB()); |
|
|
|
reportActivity(currentRpcSession, attributeSubscriptions.contains(rpcSessionId), rpcSubscriptions.contains(rpcSessionId)); |
|
|
|
new CoapOkCallback(exchange, CoAP.ResponseCode.VALID, CoAP.ResponseCode.INTERNAL_SERVER_ERROR) |
|
|
|
); |
|
|
|
} |
|
|
|
break; |
|
|
|
case UNSUBSCRIBE_RPC_COMMANDS_REQUEST: |
|
|
|
TransportProtos.SessionInfoProto rpcSession = lookupAsyncSessionInfo(getTokenFromRequest(request)); |
|
|
|
if (rpcSession != null) { |
|
|
|
UUID rpcSessionId = new UUID(rpcSession.getSessionIdMSB(), rpcSession.getSessionIdLSB()); |
|
|
|
UUID rpcSessionId = toSessionId(rpcSession); |
|
|
|
rpcSubscriptions.remove(rpcSessionId); |
|
|
|
sessionInfoToObserveRelationMap.remove(rpcSession); |
|
|
|
transportService.process(rpcSession, |
|
|
|
TransportProtos.SubscribeToRPCMsg.newBuilder().setUnsubscribe(true).build(), |
|
|
|
new CoapOkCallback(exchange, CoAP.ResponseCode.DELETED, CoAP.ResponseCode.INTERNAL_SERVER_ERROR)); |
|
|
|
closeAndDeregister(sessionInfo, sessionId); |
|
|
|
closeAndDeregister(sessionInfo); |
|
|
|
} |
|
|
|
break; |
|
|
|
case TO_DEVICE_RPC_RESPONSE: |
|
|
|
@ -341,6 +362,10 @@ public class CoapTransportResource extends AbstractCoapTransportResource { |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
private UUID toSessionId(TransportProtos.SessionInfoProto sessionInfoProto) { |
|
|
|
return new UUID(sessionInfoProto.getSessionIdMSB(), sessionInfoProto.getSessionIdLSB()); |
|
|
|
} |
|
|
|
|
|
|
|
private void getFirmwareCallback(TransportProtos.SessionInfoProto sessionInfo, CoapExchange exchange, FirmwareType firmwareType) { |
|
|
|
TransportProtos.GetFirmwareRequestMsg requestMsg = TransportProtos.GetFirmwareRequestMsg.newBuilder() |
|
|
|
.setTenantIdMSB(sessionInfo.getTenantIdMSB()) |
|
|
|
@ -352,18 +377,18 @@ public class CoapTransportResource extends AbstractCoapTransportResource { |
|
|
|
} |
|
|
|
|
|
|
|
private TransportProtos.SessionInfoProto lookupAsyncSessionInfo(String token) { |
|
|
|
tokenToNotificationCounterMap.remove(token); |
|
|
|
return tokenToSessionIdMap.remove(token); |
|
|
|
tokenToObserveNotificationSeqMap.remove(token); |
|
|
|
return tokenToSessionInfoMap.remove(token); |
|
|
|
} |
|
|
|
|
|
|
|
private void registerAsyncCoapSession(CoapExchange exchange, TransportProtos.SessionInfoProto sessionInfo, CoapTransportAdaptor coapTransportAdaptor, DynamicMessage.Builder rpcRequestDynamicMessageBuilder, String token) { |
|
|
|
tokenToSessionIdMap.putIfAbsent(token, sessionInfo); |
|
|
|
tokenToSessionInfoMap.putIfAbsent(token, sessionInfo); |
|
|
|
transportService.registerAsyncSession(sessionInfo, getCoapSessionListener(exchange, coapTransportAdaptor, rpcRequestDynamicMessageBuilder)); |
|
|
|
transportService.process(sessionInfo, getSessionEventMsg(TransportProtos.SessionEvent.OPEN), null); |
|
|
|
} |
|
|
|
|
|
|
|
private CoapSessionListener getCoapSessionListener(CoapExchange exchange, CoapTransportAdaptor coapTransportAdaptor, DynamicMessage.Builder rpcRequestDynamicMessageBuilder) { |
|
|
|
return new CoapSessionListener(exchange, coapTransportAdaptor, rpcRequestDynamicMessageBuilder); |
|
|
|
return new CoapSessionListener(this, exchange, coapTransportAdaptor, rpcRequestDynamicMessageBuilder); |
|
|
|
} |
|
|
|
|
|
|
|
private String getTokenFromRequest(Request request) { |
|
|
|
@ -481,11 +506,13 @@ public class CoapTransportResource extends AbstractCoapTransportResource { |
|
|
|
|
|
|
|
private static class CoapSessionListener implements SessionMsgListener { |
|
|
|
|
|
|
|
private final CoapTransportResource coapTransportResource; |
|
|
|
private final CoapExchange exchange; |
|
|
|
private final CoapTransportAdaptor coapTransportAdaptor; |
|
|
|
private final DynamicMessage.Builder rpcRequestDynamicMessageBuilder; |
|
|
|
|
|
|
|
CoapSessionListener(CoapExchange exchange, CoapTransportAdaptor coapTransportAdaptor, DynamicMessage.Builder rpcRequestDynamicMessageBuilder) { |
|
|
|
CoapSessionListener(CoapTransportResource coapTransportResource, CoapExchange exchange, CoapTransportAdaptor coapTransportAdaptor, DynamicMessage.Builder rpcRequestDynamicMessageBuilder) { |
|
|
|
this.coapTransportResource = coapTransportResource; |
|
|
|
this.exchange = exchange; |
|
|
|
this.coapTransportAdaptor = coapTransportAdaptor; |
|
|
|
this.rpcRequestDynamicMessageBuilder = rpcRequestDynamicMessageBuilder; |
|
|
|
@ -494,7 +521,7 @@ public class CoapTransportResource extends AbstractCoapTransportResource { |
|
|
|
@Override |
|
|
|
public void onGetAttributesResponse(TransportProtos.GetAttributeResponseMsg msg) { |
|
|
|
try { |
|
|
|
exchange.respond(coapTransportAdaptor.convertToPublish(msg)); |
|
|
|
exchange.respond(coapTransportAdaptor.convertToPublish(isConRequest(), msg)); |
|
|
|
} catch (AdaptorException e) { |
|
|
|
log.trace("Failed to reply due to error", e); |
|
|
|
exchange.respond(CoAP.ResponseCode.INTERNAL_SERVER_ERROR); |
|
|
|
@ -512,8 +539,21 @@ public class CoapTransportResource extends AbstractCoapTransportResource { |
|
|
|
} |
|
|
|
|
|
|
|
@Override |
|
|
|
public void onRemoteSessionCloseCommand(TransportProtos.SessionCloseNotificationProto sessionCloseNotification) { |
|
|
|
exchange.respond(CoAP.ResponseCode.SERVICE_UNAVAILABLE); |
|
|
|
public void onRemoteSessionCloseCommand(UUID sessionId, TransportProtos.SessionCloseNotificationProto sessionCloseNotification) { |
|
|
|
log.trace("[{}] Received the remote command to close the session: {}", sessionId, sessionCloseNotification.getMessage()); |
|
|
|
Map<TransportProtos.SessionInfoProto, ObserveRelation> sessionToObserveRelationMap = coapTransportResource.getSessionInfoToObserveRelationMap(); |
|
|
|
if (coapTransportResource.getObserverCount() > 0 && !CollectionUtils.isEmpty(sessionToObserveRelationMap)) { |
|
|
|
Set<TransportProtos.SessionInfoProto> observeSessions = sessionToObserveRelationMap.keySet(); |
|
|
|
Optional<TransportProtos.SessionInfoProto> observeSessionToClose = observeSessions.stream().filter(sessionInfoProto -> { |
|
|
|
UUID observeSessionId = new UUID(sessionInfoProto.getSessionIdMSB(), sessionInfoProto.getSessionIdLSB()); |
|
|
|
return observeSessionId.equals(sessionId); |
|
|
|
}).findFirst(); |
|
|
|
if (observeSessionToClose.isPresent()) { |
|
|
|
TransportProtos.SessionInfoProto sessionInfoProto = observeSessionToClose.get(); |
|
|
|
ObserveRelation observeRelation = sessionToObserveRelationMap.get(sessionInfoProto); |
|
|
|
coapTransportResource.clearAndNotifyObserveRelation(observeRelation, CoAP.ResponseCode.SERVICE_UNAVAILABLE); |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
@Override |
|
|
|
@ -529,7 +569,7 @@ public class CoapTransportResource extends AbstractCoapTransportResource { |
|
|
|
@Override |
|
|
|
public void onToServerRpcResponse(TransportProtos.ToServerRpcResponseMsg msg) { |
|
|
|
try { |
|
|
|
exchange.respond(coapTransportAdaptor.convertToPublish(msg)); |
|
|
|
exchange.respond(coapTransportAdaptor.convertToPublish(isConRequest(), msg)); |
|
|
|
} catch (AdaptorException e) { |
|
|
|
log.trace("Failed to reply due to error", e); |
|
|
|
exchange.respond(CoAP.ResponseCode.INTERNAL_SERVER_ERROR); |
|
|
|
@ -561,29 +601,29 @@ public class CoapTransportResource extends AbstractCoapTransportResource { |
|
|
|
|
|
|
|
@Override |
|
|
|
public void addedObserveRelation(ObserveRelation relation) { |
|
|
|
if (log.isTraceEnabled()) { |
|
|
|
Request request = relation.getExchange().getRequest(); |
|
|
|
log.trace("Added Observe relation for token: {}", getTokenFromRequest(request)); |
|
|
|
} |
|
|
|
Request request = relation.getExchange().getRequest(); |
|
|
|
String token = getTokenFromRequest(request); |
|
|
|
sessionInfoToObserveRelationMap.putIfAbsent(tokenToSessionInfoMap.get(token), relation); |
|
|
|
log.trace("Added Observe relation for token: {}", token); |
|
|
|
} |
|
|
|
|
|
|
|
@Override |
|
|
|
public void removedObserveRelation(ObserveRelation relation) { |
|
|
|
Request request = relation.getExchange().getRequest(); |
|
|
|
String tokenFromRequest = getTokenFromRequest(request); |
|
|
|
log.trace("Relation removed for token: {}", tokenFromRequest); |
|
|
|
TransportProtos.SessionInfoProto sessionInfoToRemove = lookupAsyncSessionInfo(tokenFromRequest); |
|
|
|
if (sessionInfoToRemove != null) { |
|
|
|
closeAndDeregister(sessionInfoToRemove, new UUID(sessionInfoToRemove.getSessionIdMSB(), sessionInfoToRemove.getDeviceIdLSB())); |
|
|
|
} |
|
|
|
String token = getTokenFromRequest(request); |
|
|
|
TransportProtos.SessionInfoProto session = tokenToSessionInfoMap.get(token); |
|
|
|
sessionInfoToObserveRelationMap.remove(session); |
|
|
|
log.trace("Relation removed for token: {}", token); |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
private void closeAndDeregister(TransportProtos.SessionInfoProto session, UUID sessionId) { |
|
|
|
private void closeAndDeregister(TransportProtos.SessionInfoProto session) { |
|
|
|
UUID sessionId = toSessionId(session); |
|
|
|
transportService.process(session, getSessionEventMsg(TransportProtos.SessionEvent.CLOSED), null); |
|
|
|
transportService.deregisterSession(session); |
|
|
|
rpcSubscriptions.remove(sessionId); |
|
|
|
attributeSubscriptions.remove(sessionId); |
|
|
|
sessionInfoToObserveRelationMap.remove(session); |
|
|
|
} |
|
|
|
|
|
|
|
private TransportConfigurationContainer getTransportConfigurationContainer(DeviceProfile deviceProfile) throws AdaptorException { |
|
|
|
|