|
|
|
@ -20,7 +20,6 @@ import { JsInvokeMessageProcessor } from '../api/jsInvokeMessageProcessor' |
|
|
|
import { IQueue } from './queue.models'; |
|
|
|
import amqp, { ConfirmChannel, Connection } from 'amqplib'; |
|
|
|
import { Options, Replies } from 'amqplib/properties'; |
|
|
|
import { sleep } from '../api/utils'; |
|
|
|
|
|
|
|
export class RabbitMqTemplate implements IQueue { |
|
|
|
|
|
|
|
@ -32,7 +31,6 @@ export class RabbitMqTemplate implements IQueue { |
|
|
|
private username = config.get('rabbitmq.username'); |
|
|
|
private password = config.get('rabbitmq.password'); |
|
|
|
private queueProperties: string = config.get('rabbitmq.queue_properties'); |
|
|
|
private pollInterval = Number(config.get('js.response_poll_interval')); |
|
|
|
|
|
|
|
private queueOptions: Options.AssertQueue = { |
|
|
|
durable: false, |
|
|
|
@ -41,7 +39,6 @@ export class RabbitMqTemplate implements IQueue { |
|
|
|
}; |
|
|
|
private connection: Connection; |
|
|
|
private channel: ConfirmChannel; |
|
|
|
private stopped = false; |
|
|
|
private topics: string[] = []; |
|
|
|
|
|
|
|
name = 'RabbitMQ'; |
|
|
|
@ -60,20 +57,12 @@ export class RabbitMqTemplate implements IQueue { |
|
|
|
|
|
|
|
const messageProcessor = new JsInvokeMessageProcessor(this); |
|
|
|
|
|
|
|
while (!this.stopped) { |
|
|
|
let pollStartTs = new Date().getTime(); |
|
|
|
let message = await this.channel.get(this.requestTopic); |
|
|
|
|
|
|
|
await this.channel.consume(this.requestTopic, (message) => { |
|
|
|
if (message) { |
|
|
|
messageProcessor.onJsInvokeMessage(JSON.parse(message.content.toString('utf8'))); |
|
|
|
this.channel.ack(message); |
|
|
|
} else { |
|
|
|
let pollDuration = new Date().getTime() - pollStartTs; |
|
|
|
if (pollDuration < this.pollInterval) { |
|
|
|
await sleep(this.pollInterval - pollDuration); |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
}) |
|
|
|
} |
|
|
|
|
|
|
|
async send(responseTopic: string, scriptId: string, rawResponse: Buffer, headers: any): Promise<any> { |
|
|
|
|