@ -15,11 +15,14 @@
* /
package org.thingsboard.server.transport.snmp ;
import jakarta.annotation.PreDestroy ;
import lombok.Getter ;
import lombok.RequiredArgsConstructor ;
import lombok.extern.slf4j.Slf4j ;
import org.springframework.beans.factory.annotation.Value ;
import org.springframework.context.event.EventListener ;
import org.springframework.stereotype.Component ;
import org.thingsboard.common.util.ThingsBoardThreadFactory ;
import org.thingsboard.server.common.data.Device ;
import org.thingsboard.server.common.data.DeviceProfile ;
import org.thingsboard.server.common.data.DeviceTransportType ;
@ -53,9 +56,12 @@ import java.util.LinkedList;
import java.util.List ;
import java.util.Map ;
import java.util.Optional ;
import java.util.Set ;
import java.util.UUID ;
import java.util.concurrent.ConcurrentHashMap ;
import java.util.concurrent.ConcurrentLinkedDeque ;
import java.util.concurrent.ExecutorService ;
import java.util.concurrent.Executors ;
import java.util.concurrent.TimeUnit ;
@TbSnmpTransportComponent
@Component
@ -72,30 +78,52 @@ public class SnmpTransportContext extends TransportContext {
private final SnmpAuthService snmpAuthService ;
private final Map < DeviceId , DeviceSessionContext > sessions = new ConcurrentHashMap < > ( ) ;
private final Collection < DeviceId > allSnmpDevicesIds = new ConcurrentLinkedDeque < > ( ) ;
private final Set < DeviceId > allSnmpDevicesIds = ConcurrentHashMap . newKeySet ( ) ;
private final ExecutorService snmpExecutor = Executors . newSingleThreadExecutor ( ThingsBoardThreadFactory . forName ( "snmp-bootstrap" ) ) ;
@Value ( "${transport.snmp.batch_retries}" )
private int snmpBootstrapBatchRetries ;
@AfterStartUp ( order = AfterStartUp . AFTER_TRANSPORT_SERVICE )
public void fetchDevicesAndEstablishSessions ( ) {
log . info ( "Initializing SNMP devices sessions" ) ;
snmpExecutor . execute ( this : : bootstrapWithRetries ) ;
}
private void bootstrapWithRetries ( ) {
log . info ( "Initializing SNMP devices sessions" ) ;
int batchIndex = 0 ;
int batchSize = 512 ;
boolean nextBatchExists = true ;
while ( nextBatchExists ) {
TransportProtos . GetSnmpDevicesResponseMsg snmpDevicesResponse = protoEntityService . getSnmpDevicesIds ( batchIndex , batchSize ) ;
snmpDevicesResponse . getIdsList ( ) . stream ( )
. map ( id - > new DeviceId ( UUID . fromString ( id ) ) )
. peek ( allSnmpDevicesIds : : add )
. filter ( deviceId - > balancingService . isManagedByCurrentTransport ( deviceId . getId ( ) ) )
. map ( protoEntityService : : getDeviceById )
. forEach ( device - > getExecutor ( ) . execute ( ( ) - > establishDeviceSession ( device ) ) ) ;
nextBatchExists = snmpDevicesResponse . getHasNextPage ( ) ;
batchIndex + + ;
for ( int attempt = 1 ; attempt < = snmpBootstrapBatchRetries ; attempt + + ) {
try {
TransportProtos . GetSnmpDevicesResponseMsg snmpDevicesResponse = protoEntityService . getSnmpDevicesIds ( batchIndex , batchSize ) ;
snmpDevicesResponse . getIdsList ( ) . stream ( )
. map ( id - > new DeviceId ( UUID . fromString ( id ) ) )
. peek ( allSnmpDevicesIds : : add )
. filter ( deviceId - > balancingService . isManagedByCurrentTransport ( deviceId . getId ( ) ) )
. map ( protoEntityService : : getDeviceById )
. forEach ( device - > getExecutor ( ) . execute ( ( ) - > establishDeviceSession ( device ) ) ) ;
nextBatchExists = snmpDevicesResponse . getHasNextPage ( ) ;
batchIndex + + ;
break ;
} catch ( Exception e ) {
if ( attempt > = snmpBootstrapBatchRetries ) {
log . error ( "SNMP bootstrap: batch {} failed after {} attempts." , batchIndex , attempt , e ) ;
return ;
}
log . warn ( "SNMP bootstrap: batch {} attempt {}/{} failed." , batchIndex , attempt , snmpBootstrapBatchRetries , e ) ;
try {
TimeUnit . SECONDS . sleep ( 10 ) ;
} catch ( InterruptedException ex ) {
log . warn ( "SNMP bootstrap interrupted. Stopping bootstrap task." ) ;
return ;
}
}
}
}
log . debug ( "Found all SNMP devices ids: {}" , allSnmpDevicesIds ) ;
log . debug ( "Found SNMP devices ids: {}" , allSnmpDevicesIds ) ;
}
private void establishDeviceSession ( Device device ) {
@ -300,4 +328,9 @@ public class SnmpTransportContext extends TransportContext {
return sessions . values ( ) ;
}
@PreDestroy
public void destroy ( ) {
snmpExecutor . shutdown ( ) ;
}
}