|
|
|
@ -38,7 +38,7 @@ import org.thingsboard.server.gen.transport.TransportProtos.ToCoreNotificationMs |
|
|
|
import org.thingsboard.server.gen.transport.TransportProtos.TransportToDeviceActorMsg; |
|
|
|
import org.thingsboard.server.queue.common.TbProtoQueueMsg; |
|
|
|
import org.thingsboard.server.queue.provider.TbCoreQueueProvider; |
|
|
|
import org.thingsboard.server.queue.util.TbMonolithOrCoreComponent; |
|
|
|
import org.thingsboard.server.queue.util.TbCoreComponent; |
|
|
|
import org.thingsboard.server.service.encoding.DataDecodingEncodingService; |
|
|
|
import org.thingsboard.server.service.queue.processing.AbstractConsumerService; |
|
|
|
import org.thingsboard.server.service.rpc.FromDeviceRpcResponse; |
|
|
|
@ -56,13 +56,12 @@ import java.util.UUID; |
|
|
|
import java.util.concurrent.ConcurrentHashMap; |
|
|
|
import java.util.concurrent.ConcurrentMap; |
|
|
|
import java.util.concurrent.CountDownLatch; |
|
|
|
import java.util.concurrent.ExecutorService; |
|
|
|
import java.util.concurrent.TimeUnit; |
|
|
|
import java.util.function.Function; |
|
|
|
import java.util.stream.Collectors; |
|
|
|
|
|
|
|
@Service |
|
|
|
@TbMonolithOrCoreComponent |
|
|
|
@TbCoreComponent |
|
|
|
@Slf4j |
|
|
|
public class DefaultTbCoreConsumerService extends AbstractConsumerService<ToCoreMsg, ToCoreNotificationMsg> implements TbCoreConsumerService { |
|
|
|
|
|
|
|
@ -78,7 +77,6 @@ public class DefaultTbCoreConsumerService extends AbstractConsumerService<ToCore |
|
|
|
private final SubscriptionManagerService subscriptionManagerService; |
|
|
|
private final TbCoreDeviceRpcService tbCoreDeviceRpcService; |
|
|
|
private final TbCoreConsumerStats stats = new TbCoreConsumerStats(); |
|
|
|
private volatile boolean stopped = false; |
|
|
|
|
|
|
|
public DefaultTbCoreConsumerService(TbCoreQueueProvider tbCoreQueueProvider, ActorSystemContext actorContext, |
|
|
|
DeviceStateService stateService, TbLocalSubscriptionService localSubscriptionService, |
|
|
|
@ -99,7 +97,6 @@ public class DefaultTbCoreConsumerService extends AbstractConsumerService<ToCore |
|
|
|
|
|
|
|
@Override |
|
|
|
protected void launchMainConsumer() { |
|
|
|
log.info("Launching main consumer"); |
|
|
|
mainConsumerExecutor.execute(() -> { |
|
|
|
while (!stopped) { |
|
|
|
try { |
|
|
|
@ -144,15 +141,17 @@ public class DefaultTbCoreConsumerService extends AbstractConsumerService<ToCore |
|
|
|
} |
|
|
|
mainConsumer.commit(); |
|
|
|
} catch (Exception e) { |
|
|
|
log.warn("Failed to obtain messages from queue.", e); |
|
|
|
try { |
|
|
|
Thread.sleep(pollDuration); |
|
|
|
} catch (InterruptedException e2) { |
|
|
|
log.trace("Failed to wait until the server has capacity to handle new requests", e2); |
|
|
|
if (!stopped) { |
|
|
|
log.warn("Failed to obtain messages from queue.", e); |
|
|
|
try { |
|
|
|
Thread.sleep(pollDuration); |
|
|
|
} catch (InterruptedException e2) { |
|
|
|
log.trace("Failed to wait until the server has capacity to handle new requests", e2); |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
log.info("Tb Core Consumer stopped."); |
|
|
|
log.info("TB Core Consumer stopped."); |
|
|
|
}); |
|
|
|
} |
|
|
|
|
|
|
|
|