@ -45,6 +45,7 @@ import org.thingsboard.server.queue.settings.TbQueueRemoteJsInvokeSettings;
import org.thingsboard.server.queue.settings.TbQueueRuleEngineSettings ;
import org.thingsboard.server.queue.settings.TbQueueTransportApiSettings ;
import org.thingsboard.server.queue.settings.TbQueueTransportNotificationSettings ;
import org.thingsboard.server.queue.settings.TbQueueVersionControlSettings ;
import org.thingsboard.server.queue.sqs.TbAwsSqsAdmin ;
import org.thingsboard.server.queue.sqs.TbAwsSqsConsumerTemplate ;
import org.thingsboard.server.queue.sqs.TbAwsSqsProducerTemplate ;
@ -66,18 +67,22 @@ public class AwsSqsTbCoreQueueFactory implements TbCoreQueueFactory {
private final TbServiceInfoProvider serviceInfoProvider ;
private final TbQueueRemoteJsInvokeSettings jsInvokeSettings ;
private final TbQueueTransportNotificationSettings transportNotificationSettings ;
private final TbQueueVersionControlSettings vcSettings ;
private final TbQueueAdmin coreAdmin ;
private final TbQueueAdmin ruleEngineAdmin ;
private final TbQueueAdmin jsExecutorAdmin ;
private final TbQueueAdmin transportApiAdmin ;
private final TbQueueAdmin notificationAdmin ;
private final TbQueueAdmin otaAdmin ;
private final TbQueueAdmin vcAdmin ;
public AwsSqsTbCoreQueueFactory ( TbAwsSqsSettings sqsSettings ,
TbQueueCoreSettings coreSettings ,
TbQueueTransportApiSettings transportApiSettings ,
TbQueueRuleEngineSettings ruleEngineSettings ,
NotificationsTopicService notificationsTopicService ,
TbQueueVersionControlSettings vcSettings ,
TbServiceInfoProvider serviceInfoProvider ,
TbQueueRemoteJsInvokeSettings jsInvokeSettings ,
TbAwsSqsQueueAttributes sqsQueueAttributes ,
@ -90,12 +95,15 @@ public class AwsSqsTbCoreQueueFactory implements TbCoreQueueFactory {
this . serviceInfoProvider = serviceInfoProvider ;
this . jsInvokeSettings = jsInvokeSettings ;
this . transportNotificationSettings = transportNotificationSettings ;
this . vcSettings = vcSettings ;
this . coreAdmin = new TbAwsSqsAdmin ( sqsSettings , sqsQueueAttributes . getCoreAttributes ( ) ) ;
this . ruleEngineAdmin = new TbAwsSqsAdmin ( sqsSettings , sqsQueueAttributes . getRuleEngineAttributes ( ) ) ;
this . jsExecutorAdmin = new TbAwsSqsAdmin ( sqsSettings , sqsQueueAttributes . getJsExecutorAttributes ( ) ) ;
this . transportApiAdmin = new TbAwsSqsAdmin ( sqsSettings , sqsQueueAttributes . getTransportApiAttributes ( ) ) ;
this . notificationAdmin = new TbAwsSqsAdmin ( sqsSettings , sqsQueueAttributes . getNotificationsAttributes ( ) ) ;
this . otaAdmin = new TbAwsSqsAdmin ( sqsSettings , sqsQueueAttributes . getOtaAttributes ( ) ) ;
this . vcAdmin = new TbAwsSqsAdmin ( sqsSettings , sqsQueueAttributes . getVcAttributes ( ) ) ;
}
@Override
@ -183,19 +191,18 @@ public class AwsSqsTbCoreQueueFactory implements TbCoreQueueFactory {
@Override
public TbQueueConsumer < TbProtoQueueMsg < ToOtaPackageStateServiceMsg > > createToOtaPackageStateServiceMsgConsumer ( ) {
return new TbAwsSqsConsumerTemplate < > ( core Admin, sqsSettings , coreSettings . getOtaPackageTopic ( ) ,
return new TbAwsSqsConsumerTemplate < > ( ota Admin, sqsSettings , coreSettings . getOtaPackageTopic ( ) ,
msg - > new TbProtoQueueMsg < > ( msg . getKey ( ) , ToOtaPackageStateServiceMsg . parseFrom ( msg . getData ( ) ) , msg . getHeaders ( ) ) ) ;
}
@Override
public TbQueueProducer < TbProtoQueueMsg < ToOtaPackageStateServiceMsg > > createToOtaPackageStateServiceMsgProducer ( ) {
return new TbAwsSqsProducerTemplate < > ( core Admin, sqsSettings , coreSettings . getOtaPackageTopic ( ) ) ;
return new TbAwsSqsProducerTemplate < > ( ota Admin, sqsSettings , coreSettings . getOtaPackageTopic ( ) ) ;
}
@Override
public TbQueueProducer < TbProtoQueueMsg < TransportProtos . ToVersionControlServiceMsg > > createVersionControlMsgProducer ( ) {
//TODO: version-control
return null ;
return new TbAwsSqsProducerTemplate < > ( vcAdmin , sqsSettings , vcSettings . getTopic ( ) ) ;
}
@PreDestroy
@ -215,5 +222,11 @@ public class AwsSqsTbCoreQueueFactory implements TbCoreQueueFactory {
if ( notificationAdmin ! = null ) {
notificationAdmin . destroy ( ) ;
}
if ( otaAdmin ! = null ) {
otaAdmin . destroy ( ) ;
}
if ( vcAdmin ! = null ) {
vcAdmin . destroy ( ) ;
}
}
}