27 changed files with 2648 additions and 1733 deletions
@ -1,30 +0,0 @@ |
|||
/* |
|||
* Copyright © 2016-2022 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
const config = require('config'), |
|||
logger = require('../config/logger')._logger('httpServer'), |
|||
express = require('express'); |
|||
|
|||
const httpPort = Number(config.get('http_port')); |
|||
|
|||
const app = express(); |
|||
|
|||
app.get('/livenessProbe', async (req, res) => { |
|||
const date = new Date(); |
|||
const message = { now: date.toISOString() }; |
|||
res.send(message); |
|||
}) |
|||
|
|||
app.listen(httpPort, () => logger.info(`Started http endpoint on port ${httpPort}. Please, use /livenessProbe !`)) |
|||
@ -0,0 +1,46 @@ |
|||
///
|
|||
/// Copyright © 2016-2022 The Thingsboard Authors
|
|||
///
|
|||
/// Licensed under the Apache License, Version 2.0 (the "License");
|
|||
/// you may not use this file except in compliance with the License.
|
|||
/// You may obtain a copy of the License at
|
|||
///
|
|||
/// http://www.apache.org/licenses/LICENSE-2.0
|
|||
///
|
|||
/// Unless required by applicable law or agreed to in writing, software
|
|||
/// distributed under the License is distributed on an "AS IS" BASIS,
|
|||
/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|||
/// See the License for the specific language governing permissions and
|
|||
/// limitations under the License.
|
|||
///
|
|||
|
|||
import express from 'express'; |
|||
import { _logger} from '../config/logger'; |
|||
|
|||
export class HttpServer { |
|||
|
|||
private logger = _logger('httpServer'); |
|||
private app = express(); |
|||
private server; |
|||
|
|||
constructor(httpPort: number) { |
|||
this.app.get('/livenessProbe', async (req, res) => { |
|||
const message = { |
|||
now: new Date().toISOString() |
|||
}; |
|||
res.send(message); |
|||
}) |
|||
|
|||
this.server = this.app.listen(httpPort, () => { |
|||
this.logger.info('Started http endpoint on port %s. Please, use /livenessProbe !', httpPort); |
|||
}).on('error', (error) => { |
|||
this.logger.error(error); |
|||
}); |
|||
} |
|||
|
|||
stop() { |
|||
this.server.close(() => { |
|||
this.logger.info('Http server stop'); |
|||
}); |
|||
} |
|||
} |
|||
@ -1,90 +0,0 @@ |
|||
/* |
|||
* Copyright © 2016-2022 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
'use strict'; |
|||
|
|||
const vm = require('vm'); |
|||
|
|||
function JsExecutor(useSandbox) { |
|||
this.useSandbox = useSandbox; |
|||
} |
|||
|
|||
JsExecutor.prototype.compileScript = function(code) { |
|||
if (this.useSandbox) { |
|||
return createScript(code); |
|||
} else { |
|||
return createFunction(code); |
|||
} |
|||
} |
|||
|
|||
JsExecutor.prototype.executeScript = function(script, args, timeout) { |
|||
if (this.useSandbox) { |
|||
return invokeScript(script, args, timeout); |
|||
} else { |
|||
return invokeFunction(script, args); |
|||
} |
|||
} |
|||
|
|||
function createScript(code) { |
|||
return new Promise((resolve, reject) => { |
|||
try { |
|||
code = "("+code+")(...args)"; |
|||
var script = new vm.Script(code); |
|||
resolve(script); |
|||
} catch (err) { |
|||
reject(err); |
|||
} |
|||
}); |
|||
} |
|||
|
|||
function invokeScript(script, args, timeout) { |
|||
return new Promise((resolve, reject) => { |
|||
try { |
|||
var sandbox = Object.create(null); |
|||
sandbox.args = args; |
|||
var result = script.runInNewContext(sandbox, {timeout: timeout}); |
|||
resolve(result); |
|||
} catch (err) { |
|||
reject(err); |
|||
} |
|||
}); |
|||
} |
|||
|
|||
|
|||
function createFunction(code) { |
|||
return new Promise((resolve, reject) => { |
|||
try { |
|||
code = "return ("+code+")(...args)"; |
|||
const parsingContext = vm.createContext({}); |
|||
const func = vm.compileFunction(code, ['args'], {parsingContext: parsingContext}); |
|||
resolve(func); |
|||
} catch (err) { |
|||
reject(err); |
|||
} |
|||
}); |
|||
} |
|||
|
|||
function invokeFunction(func, args) { |
|||
return new Promise((resolve, reject) => { |
|||
try { |
|||
var result = func(args); |
|||
resolve(result); |
|||
} catch (err) { |
|||
reject(err); |
|||
} |
|||
}); |
|||
} |
|||
|
|||
module.exports = JsExecutor; |
|||
@ -0,0 +1,69 @@ |
|||
///
|
|||
/// Copyright © 2016-2022 The Thingsboard Authors
|
|||
///
|
|||
/// Licensed under the Apache License, Version 2.0 (the "License");
|
|||
/// you may not use this file except in compliance with the License.
|
|||
/// You may obtain a copy of the License at
|
|||
///
|
|||
/// http://www.apache.org/licenses/LICENSE-2.0
|
|||
///
|
|||
/// Unless required by applicable law or agreed to in writing, software
|
|||
/// distributed under the License is distributed on an "AS IS" BASIS,
|
|||
/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|||
/// See the License for the specific language governing permissions and
|
|||
/// limitations under the License.
|
|||
///
|
|||
|
|||
|
|||
export interface TbMessage { |
|||
scriptIdMSB: string; |
|||
scriptIdLSB: string; |
|||
} |
|||
|
|||
export interface RemoteJsRequest { |
|||
compileRequest?: JsCompileRequest; |
|||
invokeRequest?: JsInvokeRequest; |
|||
releaseRequest?: JsReleaseRequest; |
|||
} |
|||
|
|||
export interface JsReleaseRequest extends TbMessage { |
|||
functionName: string; |
|||
} |
|||
|
|||
export interface JsInvokeRequest extends TbMessage { |
|||
functionName: string; |
|||
scriptBody: string; |
|||
timeout: number; |
|||
args: string[]; |
|||
} |
|||
|
|||
export interface JsCompileRequest extends TbMessage { |
|||
functionName: string; |
|||
scriptBody: string; |
|||
} |
|||
|
|||
|
|||
export interface JsReleaseResponse extends TbMessage { |
|||
success: boolean; |
|||
} |
|||
|
|||
export interface JsCompileResponse extends TbMessage { |
|||
success: boolean; |
|||
errorCode?: number; |
|||
errorDetails?: string; |
|||
} |
|||
|
|||
export interface JsInvokeResponse { |
|||
success: boolean; |
|||
result: string; |
|||
errorCode?: number; |
|||
errorDetails?: string; |
|||
} |
|||
|
|||
export interface RemoteJsResponse { |
|||
requestIdMSB: string; |
|||
requestIdLSB: string; |
|||
compileResponse?: JsCompileResponse; |
|||
invokeResponse?: JsInvokeResponse; |
|||
releaseResponse?: JsReleaseResponse; |
|||
} |
|||
@ -0,0 +1,93 @@ |
|||
///
|
|||
/// Copyright © 2016-2022 The Thingsboard Authors
|
|||
///
|
|||
/// Licensed under the Apache License, Version 2.0 (the "License");
|
|||
/// you may not use this file except in compliance with the License.
|
|||
/// You may obtain a copy of the License at
|
|||
///
|
|||
/// http://www.apache.org/licenses/LICENSE-2.0
|
|||
///
|
|||
/// Unless required by applicable law or agreed to in writing, software
|
|||
/// distributed under the License is distributed on an "AS IS" BASIS,
|
|||
/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|||
/// See the License for the specific language governing permissions and
|
|||
/// limitations under the License.
|
|||
///
|
|||
|
|||
import vm, { Script } from 'vm'; |
|||
|
|||
export type TbScript = Script | Function; |
|||
|
|||
export class JsExecutor { |
|||
useSandbox: boolean; |
|||
|
|||
constructor(useSandbox: boolean) { |
|||
this.useSandbox = useSandbox; |
|||
} |
|||
|
|||
compileScript(code: string): Promise<TbScript> { |
|||
if (this.useSandbox) { |
|||
return this.createScript(code); |
|||
} else { |
|||
return this.createFunction(code); |
|||
} |
|||
} |
|||
|
|||
executeScript(script: TbScript, args: string[], timeout?: number): Promise<any> { |
|||
if (this.useSandbox) { |
|||
return this.invokeScript(script as Script, args, timeout); |
|||
} else { |
|||
return this.invokeFunction(script as Function, args); |
|||
} |
|||
} |
|||
|
|||
private createScript(code: string): Promise<Script> { |
|||
return new Promise((resolve, reject) => { |
|||
try { |
|||
code = "("+code+")(...args)"; |
|||
const script = new vm.Script(code); |
|||
resolve(script); |
|||
} catch (err) { |
|||
reject(err); |
|||
} |
|||
}); |
|||
} |
|||
|
|||
private invokeScript(script: Script, args: string[], timeout: number | undefined): Promise<any> { |
|||
return new Promise((resolve, reject) => { |
|||
try { |
|||
const sandbox = Object.create(null); |
|||
sandbox.args = args; |
|||
const result = script.runInNewContext(sandbox, {timeout: timeout}); |
|||
resolve(result); |
|||
} catch (err) { |
|||
reject(err); |
|||
} |
|||
}); |
|||
} |
|||
|
|||
|
|||
private createFunction(code: string): Promise<Function> { |
|||
return new Promise((resolve, reject) => { |
|||
try { |
|||
code = "return ("+code+")(...args)"; |
|||
const parsingContext = vm.createContext({}); |
|||
const func = vm.compileFunction(code, ['args'], {parsingContext: parsingContext}); |
|||
resolve(func); |
|||
} catch (err) { |
|||
reject(err); |
|||
} |
|||
}); |
|||
} |
|||
|
|||
private invokeFunction(func: Function, args: string[]): Promise<any> { |
|||
return new Promise((resolve, reject) => { |
|||
try { |
|||
const result = func(args); |
|||
resolve(result); |
|||
} catch (err) { |
|||
reject(err); |
|||
} |
|||
}); |
|||
} |
|||
} |
|||
@ -1,344 +0,0 @@ |
|||
/* |
|||
* Copyright © 2016-2022 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
'use strict'; |
|||
|
|||
const COMPILATION_ERROR = 0; |
|||
const RUNTIME_ERROR = 1; |
|||
const TIMEOUT_ERROR = 2; |
|||
const UNRECOGNIZED = -1; |
|||
|
|||
const config = require('config'), |
|||
Long = require('long'), |
|||
logger = require('../config/logger')._logger('JsInvokeMessageProcessor'), |
|||
Utils = require('./utils'), |
|||
JsExecutor = require('./jsExecutor'); |
|||
|
|||
const statFrequency = Number(config.get('script.stat_print_frequency')); |
|||
const scriptBodyTraceFrequency = Number(config.get('script.script_body_trace_frequency')); |
|||
const useSandbox = config.get('script.use_sandbox') === 'true'; |
|||
const maxActiveScripts = Number(config.get('script.max_active_scripts')); |
|||
const slowQueryLogMs = Number(config.get('script.slow_query_log_ms')); |
|||
const slowQueryLogBody = config.get('script.slow_query_log_body') === 'true'; |
|||
|
|||
const { performance } = require('node:perf_hooks'); |
|||
|
|||
function JsInvokeMessageProcessor(producer) { |
|||
this.producer = producer; |
|||
this.executor = new JsExecutor(useSandbox); |
|||
this.scriptMap = new Map(); |
|||
this.scriptIds = []; |
|||
this.executedScriptIdsCounter = []; |
|||
this.executedScriptsCounter = 0; |
|||
this.lastStatTime = performance.now(); |
|||
this.compilationTime = 0; |
|||
} |
|||
|
|||
JsInvokeMessageProcessor.prototype.onJsInvokeMessage = function (message) { |
|||
var tStart = performance.now(); |
|||
let requestId; |
|||
let responseTopic; |
|||
let expireTs; |
|||
let headers; |
|||
let request; |
|||
let buf; |
|||
try { |
|||
request = JSON.parse(Buffer.from(message.data).toString('utf8')); |
|||
headers = message.headers; |
|||
buf = Buffer.from(headers.data['requestId']); |
|||
requestId = Utils.UUIDFromBuffer(buf); |
|||
buf = Buffer.from(headers.data['responseTopic']); |
|||
responseTopic = buf.toString('utf8'); |
|||
buf = Buffer.from(headers.data['expireTs']); |
|||
expireTs = Long.fromBytes(buf, false, false).toNumber(); |
|||
|
|||
const now = Date.now(); |
|||
// if (logger.isDebugEnabled()) {
|
|||
// logger.debug('expireTs is %s, buf is %s. Now is %s, ms to expire left %s', expireTs, buf.toString('hex'), now, expireTs - now)
|
|||
// }
|
|||
|
|||
if (expireTs && expireTs <= now) { |
|||
if (logger.isDebugEnabled()) { |
|||
logger.debug('Message expired! expireTs is %s, buf is %s. Now is %s, ms to expire left %s', expireTs, buf.toString('hex'), now, expireTs - now) |
|||
} |
|||
return; |
|||
} |
|||
|
|||
logger.debug('[%s] Received request, responseTopic: [%s]', requestId, responseTopic); |
|||
|
|||
if (request.compileRequest) { |
|||
this.processCompileRequest(requestId, responseTopic, headers, request.compileRequest); |
|||
} else if (request.invokeRequest) { |
|||
this.processInvokeRequest(requestId, responseTopic, headers, request.invokeRequest); |
|||
} else if (request.releaseRequest) { |
|||
this.processReleaseRequest(requestId, responseTopic, headers, request.releaseRequest); |
|||
} else { |
|||
logger.error('[%s] Unknown request received!', requestId); |
|||
} |
|||
|
|||
} catch (err) { |
|||
logger.error('[%s] Failed to process request: %s', requestId, err.message); |
|||
logger.error(err.stack); |
|||
} |
|||
|
|||
var tFinish = performance.now(); |
|||
var tTook = tFinish - tStart; |
|||
|
|||
if (tTook > slowQueryLogMs) { |
|||
let functionName; |
|||
if (request.invokeRequest) { |
|||
try { |
|||
buf = Buffer.from(request.invokeRequest['functionName']); |
|||
functionName = buf.toString('utf8'); |
|||
} catch (err) { |
|||
logger.error('[%s] Failed to read functionName from message header: %s', requestId, err.message); |
|||
logger.error(err.stack); |
|||
} |
|||
} |
|||
logger.warn('[%s] SLOW PROCESSING [%s]ms, functionName [%s]', requestId, tTook, functionName); |
|||
if (slowQueryLogBody) { |
|||
logger.info('Slow request body: %s', JSON.stringify(request, null, 4)) |
|||
} |
|||
} |
|||
|
|||
} |
|||
|
|||
JsInvokeMessageProcessor.prototype.processCompileRequest = function (requestId, responseTopic, headers, compileRequest) { |
|||
var scriptId = getScriptId(compileRequest); |
|||
logger.debug('[%s] Processing compile request, scriptId: [%s]', requestId, scriptId); |
|||
|
|||
this.executor.compileScript(compileRequest.scriptBody).then( |
|||
(script) => { |
|||
this.cacheScript(scriptId, script); |
|||
var compileResponse = createCompileResponse(scriptId, true); |
|||
logger.debug('[%s] Sending success compile response, scriptId: [%s]', requestId, scriptId); |
|||
this.sendResponse(requestId, responseTopic, headers, scriptId, compileResponse); |
|||
}, |
|||
(err) => { |
|||
var compileResponse = createCompileResponse(scriptId, false, COMPILATION_ERROR, err); |
|||
logger.debug('[%s] Sending failed compile response, scriptId: [%s]', requestId, scriptId); |
|||
this.sendResponse(requestId, responseTopic, headers, scriptId, compileResponse); |
|||
} |
|||
); |
|||
} |
|||
|
|||
JsInvokeMessageProcessor.prototype.processInvokeRequest = function (requestId, responseTopic, headers, invokeRequest) { |
|||
var scriptId = getScriptId(invokeRequest); |
|||
logger.debug('[%s] Processing invoke request, scriptId: [%s]', requestId, scriptId); |
|||
this.executedScriptsCounter++; |
|||
if (this.executedScriptsCounter % statFrequency == 0) { |
|||
const nowMs = performance.now(); |
|||
const msSinceLastStat = nowMs - this.lastStatTime; |
|||
const requestsPerSec = msSinceLastStat == 0 ? statFrequency : statFrequency / msSinceLastStat * 1000; |
|||
this.lastStatTime = nowMs; |
|||
logger.info('STAT[%s]: requests [%s], took [%s]ms, request/s [%s], compilation [%s]ms', this.executedScriptsCounter, statFrequency, msSinceLastStat, requestsPerSec, this.compilationTime); |
|||
this.compilationTime = 0; |
|||
} |
|||
|
|||
if (this.executedScriptsCounter % scriptBodyTraceFrequency == 0) { |
|||
logger.info('[%s] Executing script body: [%s]', scriptId, invokeRequest.scriptBody); |
|||
} |
|||
this.getOrCompileScript(scriptId, invokeRequest.scriptBody).then( |
|||
(script) => { |
|||
this.executor.executeScript(script, invokeRequest.args, invokeRequest.timeout).then( |
|||
(result) => { |
|||
var invokeResponse = createInvokeResponse(result, true); |
|||
logger.debug('[%s] Sending success invoke response, scriptId: [%s]', requestId, scriptId); |
|||
this.sendResponse(requestId, responseTopic, headers, scriptId, undefined, invokeResponse); |
|||
}, |
|||
(err) => { |
|||
var errorCode; |
|||
if (err && isString(err.message) && err.message.includes('Script execution timed out')) { |
|||
errorCode = TIMEOUT_ERROR; |
|||
} else { |
|||
errorCode = RUNTIME_ERROR; |
|||
} |
|||
var invokeResponse = createInvokeResponse("", false, errorCode, err); |
|||
logger.debug('[%s] Sending failed invoke response, scriptId: [%s], errorCode: [%s]', requestId, scriptId, errorCode); |
|||
this.sendResponse(requestId, responseTopic, headers, scriptId, undefined, invokeResponse); |
|||
} |
|||
) |
|||
}, |
|||
(err) => { |
|||
var invokeResponse = createInvokeResponse("", false, COMPILATION_ERROR, err); |
|||
logger.debug('[%s] Sending failed invoke response, scriptId: [%s], errorCode: [%s]', requestId, scriptId, COMPILATION_ERROR); |
|||
this.sendResponse(requestId, responseTopic, headers, scriptId, undefined, invokeResponse); |
|||
} |
|||
); |
|||
} |
|||
|
|||
JsInvokeMessageProcessor.prototype.processReleaseRequest = function (requestId, responseTopic, headers, releaseRequest) { |
|||
var scriptId = getScriptId(releaseRequest); |
|||
logger.debug('[%s] Processing release request, scriptId: [%s]', requestId, scriptId); |
|||
if (this.scriptMap.has(scriptId)) { |
|||
var index = this.scriptIds.indexOf(scriptId); |
|||
if (index > -1) { |
|||
this.scriptIds.splice(index, 1); |
|||
this.executedScriptIdsCounter.splice(index, 1); |
|||
} |
|||
this.scriptMap.delete(scriptId); |
|||
} |
|||
var releaseResponse = createReleaseResponse(scriptId, true); |
|||
logger.debug('[%s] Sending success release response, scriptId: [%s]', requestId, scriptId); |
|||
this.sendResponse(requestId, responseTopic, headers, scriptId, undefined, undefined, releaseResponse); |
|||
} |
|||
|
|||
JsInvokeMessageProcessor.prototype.sendResponse = function (requestId, responseTopic, headers, scriptId, compileResponse, invokeResponse, releaseResponse) { |
|||
var tStartSending = performance.now(); |
|||
var remoteResponse = createRemoteResponse(requestId, compileResponse, invokeResponse, releaseResponse); |
|||
var rawResponse = Buffer.from(JSON.stringify(remoteResponse), 'utf8'); |
|||
logger.debug('[%s] Sending response to queue, scriptId: [%s]', requestId, scriptId); |
|||
this.producer.send(responseTopic, scriptId, rawResponse, headers).then( |
|||
() => { |
|||
logger.debug('[%s] Response sent to queue, took [%s]ms, scriptId: [%s]', requestId, (performance.now() - tStartSending), scriptId); |
|||
}, |
|||
(err) => { |
|||
if (err) { |
|||
logger.error('[%s] Failed to send response to queue: %s', requestId, err.message); |
|||
logger.error(err.stack); |
|||
} |
|||
} |
|||
); |
|||
} |
|||
|
|||
JsInvokeMessageProcessor.prototype.getOrCompileScript = function (scriptId, scriptBody) { |
|||
var self = this; |
|||
return new Promise(function (resolve, reject) { |
|||
const script = self.scriptMap.get(scriptId); |
|||
if (script) { |
|||
incrementUseScriptId.call(self, scriptId); |
|||
resolve(script); |
|||
} else { |
|||
const startTime = performance.now(); |
|||
self.executor.compileScript(scriptBody).then( |
|||
(compiledScript) => { |
|||
self.compilationTime += (performance.now() - startTime); |
|||
self.cacheScript(scriptId, compiledScript); |
|||
resolve(compiledScript); |
|||
}, |
|||
(err) => { |
|||
self.compilationTime += (performance.now() - startTime); |
|||
reject(err); |
|||
} |
|||
); |
|||
} |
|||
}); |
|||
} |
|||
|
|||
JsInvokeMessageProcessor.prototype.cacheScript = function (scriptId, script) { |
|||
if (!this.scriptMap.has(scriptId)) { |
|||
this.scriptIds.push(scriptId); |
|||
this.executedScriptIdsCounter.push(0); |
|||
while (this.scriptIds.length > maxActiveScripts) { |
|||
logger.info('Active scripts count [%s] exceeds maximum limit [%s]', this.scriptIds.length, maxActiveScripts); |
|||
deleteMinUsedScript.apply(this); |
|||
} |
|||
} |
|||
this.scriptMap.set(scriptId, script); |
|||
logger.info("scriptMap size is [%s]", this.scriptMap.size); |
|||
} |
|||
|
|||
function createRemoteResponse(requestId, compileResponse, invokeResponse, releaseResponse) { |
|||
const requestIdBits = Utils.UUIDToBits(requestId); |
|||
return { |
|||
requestIdMSB: requestIdBits[0], |
|||
requestIdLSB: requestIdBits[1], |
|||
compileResponse: compileResponse, |
|||
invokeResponse: invokeResponse, |
|||
releaseResponse: releaseResponse |
|||
}; |
|||
} |
|||
|
|||
function createCompileResponse(scriptId, success, errorCode, err) { |
|||
const scriptIdBits = Utils.UUIDToBits(scriptId); |
|||
return { |
|||
errorCode: errorCode, |
|||
success: success, |
|||
errorDetails: parseJsErrorDetails(err), |
|||
scriptIdMSB: scriptIdBits[0], |
|||
scriptIdLSB: scriptIdBits[1] |
|||
}; |
|||
} |
|||
|
|||
function createInvokeResponse(result, success, errorCode, err) { |
|||
return { |
|||
errorCode: errorCode, |
|||
success: success, |
|||
errorDetails: parseJsErrorDetails(err), |
|||
result: result |
|||
}; |
|||
} |
|||
|
|||
function createReleaseResponse(scriptId, success) { |
|||
const scriptIdBits = Utils.UUIDToBits(scriptId); |
|||
return { |
|||
success: success, |
|||
scriptIdMSB: scriptIdBits[0], |
|||
scriptIdLSB: scriptIdBits[1] |
|||
}; |
|||
} |
|||
|
|||
function parseJsErrorDetails(err) { |
|||
if (!err) { |
|||
return undefined; |
|||
} |
|||
var details = err.name + ': ' + err.message; |
|||
if (err.stack) { |
|||
var lines = err.stack.split('\n'); |
|||
if (lines && lines.length) { |
|||
var line = lines[0]; |
|||
var splitted = line.split(':'); |
|||
if (splitted && splitted.length === 2) { |
|||
if (!isNaN(splitted[1])) { |
|||
details += ' in at line number ' + splitted[1]; |
|||
} |
|||
} |
|||
} |
|||
} |
|||
return details; |
|||
} |
|||
|
|||
function getScriptId(request) { |
|||
return Utils.toUUIDString(request.scriptIdMSB, request.scriptIdLSB); |
|||
} |
|||
|
|||
function incrementUseScriptId(scriptId) { |
|||
const index = this.scriptIds.indexOf(scriptId); |
|||
if (this.executedScriptIdsCounter[index] < Number.MAX_SAFE_INTEGER) { |
|||
this.executedScriptIdsCounter[index]++; |
|||
} |
|||
} |
|||
|
|||
function deleteMinUsedScript() { |
|||
let min = Infinity; |
|||
let minIndex = 0; |
|||
const scriptIdsLength = this.executedScriptIdsCounter.length - 1; // ignored last added script
|
|||
for (let i = 0; i < scriptIdsLength; i++) { |
|||
if (this.executedScriptIdsCounter[i] < min) { |
|||
min = this.executedScriptIdsCounter[i]; |
|||
minIndex = i; |
|||
} |
|||
} |
|||
const prevScriptId = this.scriptIds.splice(minIndex, 1)[0]; |
|||
this.executedScriptIdsCounter.splice(minIndex, 1) |
|||
logger.info('Removing active script with id [%s]', prevScriptId); |
|||
this.scriptMap.delete(prevScriptId); |
|||
} |
|||
|
|||
function isString(value) { |
|||
return typeof value === 'string'; |
|||
} |
|||
|
|||
module.exports = JsInvokeMessageProcessor; |
|||
@ -0,0 +1,331 @@ |
|||
///
|
|||
/// Copyright © 2016-2022 The Thingsboard Authors
|
|||
///
|
|||
/// Licensed under the Apache License, Version 2.0 (the "License");
|
|||
/// you may not use this file except in compliance with the License.
|
|||
/// You may obtain a copy of the License at
|
|||
///
|
|||
/// http://www.apache.org/licenses/LICENSE-2.0
|
|||
///
|
|||
/// Unless required by applicable law or agreed to in writing, software
|
|||
/// distributed under the License is distributed on an "AS IS" BASIS,
|
|||
/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|||
/// See the License for the specific language governing permissions and
|
|||
/// limitations under the License.
|
|||
///
|
|||
|
|||
import config from 'config'; |
|||
import { _logger } from '../config/logger'; |
|||
import { JsExecutor, TbScript } from './jsExecutor'; |
|||
import { performance } from 'perf_hooks'; |
|||
import { isString, parseJsErrorDetails, toUUIDString, UUIDFromBuffer, UUIDToBits } from './utils'; |
|||
import { IQueue } from '../queue/queue.models'; |
|||
import { |
|||
JsCompileRequest, |
|||
JsCompileResponse, |
|||
JsInvokeRequest, |
|||
JsInvokeResponse, |
|||
JsReleaseRequest, |
|||
JsReleaseResponse, |
|||
RemoteJsRequest, |
|||
RemoteJsResponse, |
|||
TbMessage |
|||
} from './jsExecutor.models'; |
|||
import Long from 'long'; |
|||
|
|||
const COMPILATION_ERROR = 0; |
|||
const RUNTIME_ERROR = 1; |
|||
const TIMEOUT_ERROR = 2; |
|||
|
|||
const statFrequency = Number(config.get('script.stat_print_frequency')); |
|||
const scriptBodyTraceFrequency = Number(config.get('script.script_body_trace_frequency')); |
|||
const useSandbox = config.get('script.use_sandbox') === 'true'; |
|||
const maxActiveScripts = Number(config.get('script.max_active_scripts')); |
|||
const slowQueryLogMs = Number(config.get('script.slow_query_log_ms')); |
|||
const slowQueryLogBody = config.get('script.slow_query_log_body') === 'true'; |
|||
|
|||
export class JsInvokeMessageProcessor { |
|||
|
|||
private logger = _logger(`JsInvokeMessageProcessor`); |
|||
private producer: IQueue; |
|||
private executor = new JsExecutor(useSandbox); |
|||
private scriptMap = new Map<string, TbScript>(); |
|||
private scriptIds: string[] = []; |
|||
private executedScriptIdsCounter: number[] = []; |
|||
private executedScriptsCounter = 0; |
|||
private lastStatTime = performance.now(); |
|||
private compilationTime = 0; |
|||
|
|||
constructor(produced: IQueue) { |
|||
this.producer = produced; |
|||
} |
|||
|
|||
onJsInvokeMessage(message: any) { |
|||
const tStart = performance.now(); |
|||
let requestId = ''; |
|||
let responseTopic: string; |
|||
let expireTs; |
|||
let headers; |
|||
let request: RemoteJsRequest = {}; |
|||
let buf: Buffer; |
|||
try { |
|||
request = JSON.parse(Buffer.from(message.data).toString('utf8')); |
|||
headers = message.headers; |
|||
buf = Buffer.from(headers.data['requestId']); |
|||
requestId = UUIDFromBuffer(buf); |
|||
buf = Buffer.from(headers.data['responseTopic']); |
|||
responseTopic = buf.toString('utf8'); |
|||
buf = Buffer.from(headers.data['expireTs']); |
|||
expireTs = Long.fromBytes(Array.from(buf), false, false).toNumber(); |
|||
|
|||
const now = Date.now(); |
|||
|
|||
if (expireTs && expireTs <= now) { |
|||
if (this.logger.isDebugEnabled()) { |
|||
this.logger.debug('Message expired! expireTs is %s, buf is %s. Now is %s, ms to expire left %s', expireTs, buf.toString('hex'), now, expireTs - now) |
|||
} |
|||
return; |
|||
} |
|||
|
|||
this.logger.debug('[%s] Received request, responseTopic: [%s]', requestId, responseTopic); |
|||
|
|||
if (request.compileRequest) { |
|||
this.processCompileRequest(requestId, responseTopic, headers, request.compileRequest); |
|||
} else if (request.invokeRequest) { |
|||
this.processInvokeRequest(requestId, responseTopic, headers, request.invokeRequest); |
|||
} else if (request.releaseRequest) { |
|||
this.processReleaseRequest(requestId, responseTopic, headers, request.releaseRequest); |
|||
} else { |
|||
this.logger.error('[%s] Unknown request received!', requestId); |
|||
} |
|||
|
|||
} catch (err: any) { |
|||
this.logger.error('[%s] Failed to process request: %s', requestId, err.message); |
|||
this.logger.error(err.stack); |
|||
} |
|||
|
|||
const tFinish = performance.now(); |
|||
const tTook = tFinish - tStart; |
|||
|
|||
if (tTook > slowQueryLogMs) { |
|||
let functionName; |
|||
if (request.invokeRequest) { |
|||
try { |
|||
buf = Buffer.from(request.invokeRequest['functionName']); |
|||
functionName = buf.toString('utf8'); |
|||
} catch (err: any) { |
|||
this.logger.error('[%s] Failed to read functionName from message header: %s', requestId, err.message); |
|||
this.logger.error(err.stack); |
|||
} |
|||
} |
|||
this.logger.warn('[%s] SLOW PROCESSING [%s]ms, functionName [%s]', requestId, tTook, functionName); |
|||
if (slowQueryLogBody) { |
|||
this.logger.info('Slow request body: %s', JSON.stringify(request, null, 4)) |
|||
} |
|||
} |
|||
} |
|||
|
|||
processCompileRequest(requestId: string, responseTopic: string, headers: any, compileRequest: JsCompileRequest) { |
|||
const scriptId = JsInvokeMessageProcessor.getScriptId(compileRequest); |
|||
this.logger.debug('[%s] Processing compile request, scriptId: [%s]', requestId, scriptId); |
|||
|
|||
this.executor.compileScript(compileRequest.scriptBody).then( |
|||
(script) => { |
|||
this.cacheScript(scriptId, script); |
|||
const compileResponse = JsInvokeMessageProcessor.createCompileResponse(scriptId, true); |
|||
this.logger.debug('[%s] Sending success compile response, scriptId: [%s]', requestId, scriptId); |
|||
this.sendResponse(requestId, responseTopic, headers, scriptId, compileResponse); |
|||
}, |
|||
(err) => { |
|||
const compileResponse = JsInvokeMessageProcessor.createCompileResponse(scriptId, false, COMPILATION_ERROR, err); |
|||
this.logger.debug('[%s] Sending failed compile response, scriptId: [%s]', requestId, scriptId); |
|||
this.sendResponse(requestId, responseTopic, headers, scriptId, compileResponse); |
|||
} |
|||
); |
|||
} |
|||
|
|||
processInvokeRequest(requestId: string, responseTopic: string, headers: any, invokeRequest: JsInvokeRequest) { |
|||
const scriptId = JsInvokeMessageProcessor.getScriptId(invokeRequest); |
|||
this.logger.debug('[%s] Processing invoke request, scriptId: [%s]', requestId, scriptId); |
|||
this.executedScriptsCounter++; |
|||
if (this.executedScriptsCounter % statFrequency == 0) { |
|||
const nowMs = performance.now(); |
|||
const msSinceLastStat = nowMs - this.lastStatTime; |
|||
const requestsPerSec = msSinceLastStat == 0 ? statFrequency : statFrequency / msSinceLastStat * 1000; |
|||
this.lastStatTime = nowMs; |
|||
this.logger.info('STAT[%s]: requests [%s], took [%s]ms, request/s [%s], compilation [%s]ms', this.executedScriptsCounter, statFrequency, msSinceLastStat, requestsPerSec, this.compilationTime); |
|||
this.compilationTime = 0; |
|||
} |
|||
|
|||
if (this.executedScriptsCounter % scriptBodyTraceFrequency == 0) { |
|||
this.logger.info('[%s] Executing script body: [%s]', scriptId, invokeRequest.scriptBody); |
|||
} |
|||
this.getOrCompileScript(scriptId, invokeRequest.scriptBody).then( |
|||
(script) => { |
|||
this.executor.executeScript(script, invokeRequest.args, invokeRequest.timeout).then( |
|||
(result) => { |
|||
const invokeResponse = JsInvokeMessageProcessor.createInvokeResponse(result, true); |
|||
this.logger.debug('[%s] Sending success invoke response, scriptId: [%s]', requestId, scriptId); |
|||
this.sendResponse(requestId, responseTopic, headers, scriptId, undefined, invokeResponse); |
|||
}, |
|||
(err: any) => { |
|||
let errorCode; |
|||
if (err && isString(err.message) && err.message.includes('Script execution timed out')) { |
|||
errorCode = TIMEOUT_ERROR; |
|||
} else { |
|||
errorCode = RUNTIME_ERROR; |
|||
} |
|||
const invokeResponse = JsInvokeMessageProcessor.createInvokeResponse("", false, errorCode, err); |
|||
this.logger.debug('[%s] Sending failed invoke response, scriptId: [%s], errorCode: [%s]', requestId, scriptId, errorCode); |
|||
this.sendResponse(requestId, responseTopic, headers, scriptId, undefined, invokeResponse); |
|||
} |
|||
) |
|||
}, |
|||
(err: any) => { |
|||
const invokeResponse = JsInvokeMessageProcessor.createInvokeResponse("", false, COMPILATION_ERROR, err); |
|||
this.logger.debug('[%s] Sending failed invoke response, scriptId: [%s], errorCode: [%s]', requestId, scriptId, COMPILATION_ERROR); |
|||
this.sendResponse(requestId, responseTopic, headers, scriptId, undefined, invokeResponse); |
|||
} |
|||
); |
|||
} |
|||
|
|||
processReleaseRequest(requestId: string, responseTopic: string, headers: any, releaseRequest: JsReleaseRequest) { |
|||
const scriptId = JsInvokeMessageProcessor.getScriptId(releaseRequest); |
|||
this.logger.debug('[%s] Processing release request, scriptId: [%s]', requestId, scriptId); |
|||
if (this.scriptMap.has(scriptId)) { |
|||
const index = this.scriptIds.indexOf(scriptId); |
|||
if (index > -1) { |
|||
this.scriptIds.splice(index, 1); |
|||
this.executedScriptIdsCounter.splice(index, 1); |
|||
} |
|||
this.scriptMap.delete(scriptId); |
|||
} |
|||
const releaseResponse = JsInvokeMessageProcessor.createReleaseResponse(scriptId, true); |
|||
this.logger.debug('[%s] Sending success release response, scriptId: [%s]', requestId, scriptId); |
|||
this.sendResponse(requestId, responseTopic, headers, scriptId, undefined, undefined, releaseResponse); |
|||
} |
|||
|
|||
sendResponse(requestId: string, responseTopic: string, headers: any, scriptId: string, |
|||
compileResponse?: JsCompileResponse, invokeResponse?: JsInvokeResponse, releaseResponse?: JsReleaseResponse) { |
|||
const tStartSending = performance.now(); |
|||
const remoteResponse = JsInvokeMessageProcessor.createRemoteResponse(requestId, compileResponse, invokeResponse, releaseResponse); |
|||
const rawResponse = Buffer.from(JSON.stringify(remoteResponse), 'utf8'); |
|||
this.logger.debug('[%s] Sending response to queue, scriptId: [%s]', requestId, scriptId); |
|||
this.producer.send(responseTopic, scriptId, rawResponse, headers).then( |
|||
() => { |
|||
this.logger.debug('[%s] Response sent to queue, took [%s]ms, scriptId: [%s]', requestId, (performance.now() - tStartSending), scriptId); |
|||
}, |
|||
(err: any) => { |
|||
if (err) { |
|||
this.logger.error('[%s] Failed to send response to queue: %s', requestId, err.message); |
|||
this.logger.error(err.stack); |
|||
} |
|||
} |
|||
); |
|||
} |
|||
|
|||
getOrCompileScript(scriptId: string, scriptBody: string): Promise<TbScript> { |
|||
const self = this; |
|||
return new Promise(function (resolve, reject) { |
|||
const script = self.scriptMap.get(scriptId); |
|||
if (script) { |
|||
self.incrementUseScriptId(scriptId); |
|||
resolve(script); |
|||
} else { |
|||
const startTime = performance.now(); |
|||
self.executor.compileScript(scriptBody).then( |
|||
(compiledScript) => { |
|||
self.compilationTime += (performance.now() - startTime); |
|||
self.cacheScript(scriptId, compiledScript); |
|||
resolve(compiledScript); |
|||
}, |
|||
(err) => { |
|||
self.compilationTime += (performance.now() - startTime); |
|||
reject(err); |
|||
} |
|||
); |
|||
} |
|||
}); |
|||
} |
|||
|
|||
cacheScript(scriptId: string, script: TbScript) { |
|||
if (!this.scriptMap.has(scriptId)) { |
|||
this.scriptIds.push(scriptId); |
|||
this.executedScriptIdsCounter.push(0); |
|||
while (this.scriptIds.length > maxActiveScripts) { |
|||
this.logger.info('Active scripts count [%s] exceeds maximum limit [%s]', this.scriptIds.length, maxActiveScripts); |
|||
this.deleteMinUsedScript(); |
|||
} |
|||
} |
|||
this.scriptMap.set(scriptId, script); |
|||
this.logger.info("scriptMap size is [%s]", this.scriptMap.size); |
|||
} |
|||
|
|||
private static createRemoteResponse(requestId: string, compileResponse?: JsCompileResponse, |
|||
invokeResponse?: JsInvokeResponse, releaseResponse?: JsReleaseResponse): RemoteJsResponse { |
|||
const requestIdBits = UUIDToBits(requestId); |
|||
return { |
|||
requestIdMSB: requestIdBits[0], |
|||
requestIdLSB: requestIdBits[1], |
|||
compileResponse: compileResponse, |
|||
invokeResponse: invokeResponse, |
|||
releaseResponse: releaseResponse |
|||
}; |
|||
} |
|||
|
|||
private static createCompileResponse(scriptId: string, success: boolean, errorCode?: number, err?: any): JsCompileResponse { |
|||
const scriptIdBits = UUIDToBits(scriptId); |
|||
return { |
|||
errorCode: errorCode, |
|||
success: success, |
|||
errorDetails: parseJsErrorDetails(err), |
|||
scriptIdMSB: scriptIdBits[0], |
|||
scriptIdLSB: scriptIdBits[1] |
|||
}; |
|||
} |
|||
|
|||
private static createInvokeResponse(result: string, success: boolean, errorCode?: number, err?: any): JsInvokeResponse { |
|||
return { |
|||
errorCode: errorCode, |
|||
success: success, |
|||
errorDetails: parseJsErrorDetails(err), |
|||
result: result |
|||
}; |
|||
} |
|||
|
|||
private static createReleaseResponse(scriptId: string, success: boolean): JsReleaseResponse { |
|||
const scriptIdBits = UUIDToBits(scriptId); |
|||
return { |
|||
success: success, |
|||
scriptIdMSB: scriptIdBits[0], |
|||
scriptIdLSB: scriptIdBits[1] |
|||
}; |
|||
} |
|||
|
|||
private static getScriptId(request: TbMessage): string { |
|||
return toUUIDString(request.scriptIdMSB, request.scriptIdLSB); |
|||
} |
|||
|
|||
private incrementUseScriptId(scriptId: string) { |
|||
const index = this.scriptIds.indexOf(scriptId); |
|||
if (this.executedScriptIdsCounter[index] < Number.MAX_SAFE_INTEGER) { |
|||
this.executedScriptIdsCounter[index]++; |
|||
} |
|||
} |
|||
|
|||
private deleteMinUsedScript() { |
|||
let min = Infinity; |
|||
let minIndex = 0; |
|||
const scriptIdsLength = this.executedScriptIdsCounter.length - 1; // ignored last added script
|
|||
for (let i = 0; i < scriptIdsLength; i++) { |
|||
if (this.executedScriptIdsCounter[i] < min) { |
|||
min = this.executedScriptIdsCounter[i]; |
|||
minIndex = i; |
|||
} |
|||
} |
|||
const prevScriptId = this.scriptIds.splice(minIndex, 1)[0]; |
|||
this.executedScriptIdsCounter.splice(minIndex, 1) |
|||
this.logger.info('Removing active script with id [%s]', prevScriptId); |
|||
this.scriptMap.delete(prevScriptId); |
|||
} |
|||
} |
|||
@ -1,37 +0,0 @@ |
|||
/* |
|||
* Copyright © 2016-2022 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
'use strict'; |
|||
|
|||
const Long = require('long'), |
|||
uuidParse = require('uuid-parse'); |
|||
|
|||
exports.toUUIDString = function(mostSigBits, leastSigBits) { |
|||
var msbBytes = Long.fromValue(mostSigBits, false).toBytes(false); |
|||
var lsbBytes = Long.fromValue(leastSigBits, false).toBytes(false); |
|||
var uuidBytes = msbBytes.concat(lsbBytes); |
|||
return uuidParse.unparse(uuidBytes); |
|||
} |
|||
|
|||
exports.UUIDFromBuffer = function(buf) { |
|||
return uuidParse.unparse(buf); |
|||
} |
|||
|
|||
exports.UUIDToBits = function(uuidString) { |
|||
const bytes = uuidParse.parse(uuidString); |
|||
var msb = Long.fromBytes(bytes.slice(0,8), false, false).toString(); |
|||
var lsb = Long.fromBytes(bytes.slice(-8), false, false).toString(); |
|||
return [msb, lsb]; |
|||
} |
|||
@ -0,0 +1,66 @@ |
|||
///
|
|||
/// Copyright © 2016-2022 The Thingsboard Authors
|
|||
///
|
|||
/// Licensed under the Apache License, Version 2.0 (the "License");
|
|||
/// you may not use this file except in compliance with the License.
|
|||
/// You may obtain a copy of the License at
|
|||
///
|
|||
/// http://www.apache.org/licenses/LICENSE-2.0
|
|||
///
|
|||
/// Unless required by applicable law or agreed to in writing, software
|
|||
/// distributed under the License is distributed on an "AS IS" BASIS,
|
|||
/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|||
/// See the License for the specific language governing permissions and
|
|||
/// limitations under the License.
|
|||
///
|
|||
|
|||
import Long from 'long'; |
|||
import uuidParse from 'uuid-parse'; |
|||
|
|||
export function toUUIDString(mostSigBits: string, leastSigBits: string): string { |
|||
const msbBytes = Long.fromValue(mostSigBits, false).toBytes(false); |
|||
const lsbBytes = Long.fromValue(leastSigBits, false).toBytes(false); |
|||
const uuidBytes = msbBytes.concat(lsbBytes); |
|||
return uuidParse.unparse(uuidBytes as any); |
|||
} |
|||
|
|||
export function UUIDFromBuffer(buf: Buffer): string { |
|||
return uuidParse.unparse(buf); |
|||
} |
|||
|
|||
export function UUIDToBits(uuidString: string): [string, string] { |
|||
const bytes = Array.from(uuidParse.parse(uuidString)); |
|||
const msb = Long.fromBytes(bytes.slice(0, 8), false, false).toString(); |
|||
const lsb = Long.fromBytes(bytes.slice(-8), false, false).toString(); |
|||
return [msb, lsb]; |
|||
} |
|||
|
|||
export function isString(value: any): boolean { |
|||
return typeof value === 'string'; |
|||
} |
|||
|
|||
export function sleep(ms: number): Promise<void> { |
|||
return new Promise((resolve) => { |
|||
setTimeout(resolve, ms); |
|||
}); |
|||
} |
|||
|
|||
export function parseJsErrorDetails(err: any): string | undefined { |
|||
if (!err) { |
|||
return undefined; |
|||
} |
|||
let details = err.name + ': ' + err.message; |
|||
if (err.stack) { |
|||
const lines = err.stack.split('\n'); |
|||
if (lines && lines.length) { |
|||
const line = lines[0]; |
|||
const split = line.split(':'); |
|||
if (split && split.length === 2) { |
|||
if (!isNaN(split[1])) { |
|||
details += ' in at line number ' + split[1]; |
|||
} |
|||
} |
|||
} |
|||
} |
|||
return details; |
|||
} |
|||
@ -1,220 +0,0 @@ |
|||
/* |
|||
* Copyright © 2016-2022 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
'use strict'; |
|||
|
|||
const config = require('config'), |
|||
JsInvokeMessageProcessor = require('../api/jsInvokeMessageProcessor'), |
|||
logger = require('../config/logger')._logger('awsSqsTemplate'); |
|||
const uuid = require('uuid-random'); |
|||
|
|||
const requestTopic = config.get('request_topic'); |
|||
|
|||
const accessKeyId = config.get('aws_sqs.access_key_id'); |
|||
const secretAccessKey = config.get('aws_sqs.secret_access_key'); |
|||
const region = config.get('aws_sqs.region'); |
|||
const AWS = require('aws-sdk'); |
|||
const queueProperties = config.get('aws_sqs.queue_properties'); |
|||
const pollInterval = config.get('js.response_poll_interval'); |
|||
|
|||
let queueAttributes = {FifoQueue: 'true'}; |
|||
let sqsClient; |
|||
let requestQueueURL; |
|||
const queueUrls = new Map(); |
|||
let stopped = false; |
|||
|
|||
function AwsSqsProducer() { |
|||
this.send = async (responseTopic, scriptId, rawResponse, headers) => { |
|||
let msgBody = JSON.stringify( |
|||
{ |
|||
key: scriptId, |
|||
data: [...rawResponse], |
|||
headers: headers |
|||
}); |
|||
|
|||
let responseQueueUrl = queueUrls.get(topicToSqsQueueName(responseTopic)); |
|||
|
|||
if (!responseQueueUrl) { |
|||
responseQueueUrl = await createQueue(responseTopic); |
|||
queueUrls.set(responseTopic, responseQueueUrl); |
|||
} |
|||
|
|||
let msgId = uuid(); |
|||
|
|||
let params = { |
|||
MessageBody: msgBody, |
|||
QueueUrl: responseQueueUrl, |
|||
MessageGroupId: msgId, |
|||
MessageDeduplicationId: msgId |
|||
}; |
|||
|
|||
return new Promise((resolve, reject) => { |
|||
sqsClient.sendMessage(params, function (err, data) { |
|||
if (err) { |
|||
reject(err); |
|||
} else { |
|||
resolve(data); |
|||
} |
|||
}); |
|||
}); |
|||
} |
|||
} |
|||
|
|||
(async () => { |
|||
try { |
|||
logger.info('Starting ThingsBoard JavaScript Executor Microservice...'); |
|||
AWS.config.update({accessKeyId: accessKeyId, secretAccessKey: secretAccessKey, region: region}); |
|||
|
|||
sqsClient = new AWS.SQS({apiVersion: '2012-11-05'}); |
|||
|
|||
const queues = await getQueues(); |
|||
|
|||
if (queues) { |
|||
queues.forEach(queueUrl => { |
|||
const delimiterPosition = queueUrl.lastIndexOf('/'); |
|||
const queueName = queueUrl.substring(delimiterPosition + 1); |
|||
queueUrls.set(queueName, queueUrl); |
|||
}); |
|||
} |
|||
|
|||
parseQueueProperties(); |
|||
|
|||
requestQueueURL = queueUrls.get(topicToSqsQueueName(requestTopic)); |
|||
if (!requestQueueURL) { |
|||
requestQueueURL = await createQueue(requestTopic); |
|||
} |
|||
|
|||
const messageProcessor = new JsInvokeMessageProcessor(new AwsSqsProducer()); |
|||
|
|||
const params = { |
|||
MaxNumberOfMessages: 10, |
|||
QueueUrl: requestQueueURL, |
|||
WaitTimeSeconds: pollInterval / 1000 |
|||
}; |
|||
while (!stopped) { |
|||
let pollStartTs = new Date().getTime(); |
|||
const messages = await new Promise((resolve, reject) => { |
|||
sqsClient.receiveMessage(params, function (err, data) { |
|||
if (err) { |
|||
reject(err); |
|||
} else { |
|||
resolve(data.Messages); |
|||
} |
|||
}); |
|||
}); |
|||
|
|||
if (messages && messages.length > 0) { |
|||
const entries = []; |
|||
|
|||
messages.forEach(message => { |
|||
entries.push({ |
|||
Id: message.MessageId, |
|||
ReceiptHandle: message.ReceiptHandle |
|||
}); |
|||
messageProcessor.onJsInvokeMessage(JSON.parse(message.Body)); |
|||
}); |
|||
|
|||
const deleteBatch = { |
|||
QueueUrl: requestQueueURL, |
|||
Entries: entries |
|||
}; |
|||
sqsClient.deleteMessageBatch(deleteBatch, function (err, data) { |
|||
if (err) { |
|||
logger.error("Failed to delete messages from queue.", err.message); |
|||
} else { |
|||
//do nothing
|
|||
} |
|||
}); |
|||
} else { |
|||
let pollDuration = new Date().getTime() - pollStartTs; |
|||
if (pollDuration < pollInterval) { |
|||
await sleep(pollInterval - pollDuration); |
|||
} |
|||
} |
|||
} |
|||
} catch (e) { |
|||
logger.error('Failed to start ThingsBoard JavaScript Executor Microservice: %s', e.message); |
|||
logger.error(e.stack); |
|||
exit(-1); |
|||
} |
|||
})(); |
|||
|
|||
function createQueue(topic) { |
|||
let queueName = topicToSqsQueueName(topic); |
|||
let queueParams = {QueueName: queueName, Attributes: queueAttributes}; |
|||
|
|||
return new Promise((resolve, reject) => { |
|||
sqsClient.createQueue(queueParams, function (err, data) { |
|||
if (err) { |
|||
reject(err); |
|||
} else { |
|||
resolve(data.QueueUrl); |
|||
} |
|||
}); |
|||
}); |
|||
} |
|||
|
|||
function getQueues() { |
|||
return new Promise((resolve, reject) => { |
|||
sqsClient.listQueues(function (err, data) { |
|||
if (err) { |
|||
reject(err); |
|||
} else { |
|||
resolve(data.QueueUrls); |
|||
} |
|||
}); |
|||
}); |
|||
} |
|||
|
|||
function topicToSqsQueueName(topic) { |
|||
return topic.replace(/\./g, '_') + '.fifo'; |
|||
} |
|||
|
|||
function parseQueueProperties() { |
|||
const props = queueProperties.split(';'); |
|||
props.forEach(p => { |
|||
const delimiterPosition = p.indexOf(':'); |
|||
queueAttributes[p.substring(0, delimiterPosition)] = p.substring(delimiterPosition + 1); |
|||
}); |
|||
} |
|||
|
|||
function sleep(ms) { |
|||
return new Promise((resolve) => { |
|||
setTimeout(resolve, ms); |
|||
}); |
|||
} |
|||
|
|||
process.on('exit', () => { |
|||
stopped = true; |
|||
logger.info('Aws Sqs client stopped.'); |
|||
exit(0); |
|||
}); |
|||
|
|||
async function exit(status) { |
|||
logger.info('Exiting with status: %d ...', status); |
|||
if (sqsClient) { |
|||
logger.info('Stopping Aws Sqs client.') |
|||
try { |
|||
await sqsClient.close(); |
|||
logger.info('Aws Sqs client stopped.') |
|||
process.exit(status); |
|||
} catch (e) { |
|||
logger.info('Aws Sqs client stop error.'); |
|||
process.exit(status); |
|||
} |
|||
} else { |
|||
process.exit(status); |
|||
} |
|||
} |
|||
@ -0,0 +1,215 @@ |
|||
///
|
|||
/// Copyright © 2016-2022 The Thingsboard Authors
|
|||
///
|
|||
/// Licensed under the Apache License, Version 2.0 (the "License");
|
|||
/// you may not use this file except in compliance with the License.
|
|||
/// You may obtain a copy of the License at
|
|||
///
|
|||
/// http://www.apache.org/licenses/LICENSE-2.0
|
|||
///
|
|||
/// Unless required by applicable law or agreed to in writing, software
|
|||
/// distributed under the License is distributed on an "AS IS" BASIS,
|
|||
/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|||
/// See the License for the specific language governing permissions and
|
|||
/// limitations under the License.
|
|||
///
|
|||
|
|||
import config from 'config'; |
|||
import { _logger } from '../config/logger'; |
|||
import { JsInvokeMessageProcessor } from '../api/jsInvokeMessageProcessor' |
|||
import { IQueue } from './queue.models'; |
|||
import { |
|||
CreateQueueCommand, |
|||
CreateQueueRequest, |
|||
DeleteMessageBatchCommand, |
|||
DeleteMessageBatchRequest, |
|||
DeleteMessageBatchRequestEntry, |
|||
ListQueuesCommand, |
|||
ListQueuesResult, |
|||
ReceiveMessageCommand, |
|||
ReceiveMessageRequest, |
|||
ReceiveMessageResult, |
|||
SendMessageCommand, |
|||
SendMessageRequest, |
|||
SQSClient |
|||
} from '@aws-sdk/client-sqs'; |
|||
import uuid from 'uuid-random'; |
|||
import { sleep } from '../api/utils'; |
|||
|
|||
export class AwsSqsTemplate implements IQueue { |
|||
|
|||
private logger = _logger(`awsSqsTemplate`); |
|||
private requestTopic: string = config.get('request_topic'); |
|||
private accessKeyId: string = config.get('aws_sqs.access_key_id'); |
|||
private secretAccessKey: string = config.get('aws_sqs.secret_access_key'); |
|||
private region: string = config.get('aws_sqs.region'); |
|||
private queueProperties: string = config.get('aws_sqs.queue_properties'); |
|||
private pollInterval = Number(config.get('js.response_poll_interval')); |
|||
|
|||
private sqsClient: SQSClient; |
|||
private requestQueueURL: string |
|||
private stopped = false; |
|||
private queueUrls = new Map<string, string>(); |
|||
private queueAttributes: { [n: string]: string } = { |
|||
FifoQueue: 'true' |
|||
}; |
|||
|
|||
constructor() { |
|||
} |
|||
|
|||
async init() { |
|||
try { |
|||
this.logger.info('Starting ThingsBoard JavaScript Executor Microservice...'); |
|||
|
|||
this.sqsClient = new SQSClient({ |
|||
apiVersion: '2012-11-05', |
|||
credentials: { |
|||
accessKeyId: this.accessKeyId, |
|||
secretAccessKey: this.secretAccessKey |
|||
}, |
|||
region: this.region |
|||
}); |
|||
|
|||
const queues = await this.getQueues(); |
|||
|
|||
if (queues.QueueUrls) { |
|||
queues.QueueUrls.forEach(queueUrl => { |
|||
const delimiterPosition = queueUrl.lastIndexOf('/'); |
|||
const queueName = queueUrl.substring(delimiterPosition + 1); |
|||
this.queueUrls.set(queueName, queueUrl); |
|||
}); |
|||
} |
|||
|
|||
this.parseQueueProperties(); |
|||
|
|||
this.requestQueueURL = this.queueUrls.get(AwsSqsTemplate.topicToSqsQueueName(this.requestTopic)) || ''; |
|||
if (!this.requestQueueURL) { |
|||
this.requestQueueURL = await this.createQueue(this.requestTopic); |
|||
} |
|||
|
|||
const messageProcessor = new JsInvokeMessageProcessor(this); |
|||
|
|||
const params: ReceiveMessageRequest = { |
|||
MaxNumberOfMessages: 10, |
|||
QueueUrl: this.requestQueueURL, |
|||
WaitTimeSeconds: this.pollInterval / 1000 |
|||
}; |
|||
while (!this.stopped) { |
|||
let pollStartTs = new Date().getTime(); |
|||
const messagesResponse: ReceiveMessageResult = await this.sqsClient.send(new ReceiveMessageCommand(params)); |
|||
const messages = messagesResponse.Messages; |
|||
|
|||
if (messages && messages.length > 0) { |
|||
const entries: DeleteMessageBatchRequestEntry[] = []; |
|||
|
|||
messages.forEach(message => { |
|||
entries.push({ |
|||
Id: message.MessageId, |
|||
ReceiptHandle: message.ReceiptHandle |
|||
}); |
|||
messageProcessor.onJsInvokeMessage(JSON.parse(message.Body || '')); |
|||
}); |
|||
|
|||
const deleteBatch: DeleteMessageBatchRequest = { |
|||
QueueUrl: this.requestQueueURL, |
|||
Entries: entries |
|||
}; |
|||
try { |
|||
await this.sqsClient.send(new DeleteMessageBatchCommand(deleteBatch)) |
|||
} catch (err: any) { |
|||
this.logger.error("Failed to delete messages from queue.", err.message); |
|||
} |
|||
} else { |
|||
let pollDuration = new Date().getTime() - pollStartTs; |
|||
if (pollDuration < this.pollInterval) { |
|||
await sleep(this.pollInterval - pollDuration); |
|||
} |
|||
} |
|||
} |
|||
} catch (e: any) { |
|||
this.logger.error('Failed to start ThingsBoard JavaScript Executor Microservice: %s', e.message); |
|||
this.logger.error(e.stack); |
|||
await this.exit(-1); |
|||
} |
|||
} |
|||
|
|||
async send(responseTopic: string, scriptId: string, rawResponse: Buffer, headers: any): Promise<any> { |
|||
let msgBody = JSON.stringify( |
|||
{ |
|||
key: scriptId, |
|||
data: [...rawResponse], |
|||
headers: headers |
|||
}); |
|||
|
|||
let responseQueueUrl = this.queueUrls.get(AwsSqsTemplate.topicToSqsQueueName(responseTopic)); |
|||
|
|||
if (!responseQueueUrl) { |
|||
responseQueueUrl = await this.createQueue(responseTopic); |
|||
this.queueUrls.set(responseTopic, responseQueueUrl); |
|||
} |
|||
|
|||
let msgId = uuid(); |
|||
|
|||
let params: SendMessageRequest = { |
|||
MessageBody: msgBody, |
|||
QueueUrl: responseQueueUrl, |
|||
MessageGroupId: msgId, |
|||
MessageDeduplicationId: msgId |
|||
}; |
|||
|
|||
return this.sqsClient.send(new SendMessageCommand(params)) |
|||
} |
|||
|
|||
private async getQueues(): Promise<ListQueuesResult> { |
|||
return this.sqsClient.send(new ListQueuesCommand({})); |
|||
} |
|||
|
|||
private parseQueueProperties() { |
|||
const props = this.queueProperties.split(';'); |
|||
props.forEach(p => { |
|||
const delimiterPosition = p.indexOf(':'); |
|||
this.queueAttributes[p.substring(0, delimiterPosition)] = p.substring(delimiterPosition + 1); |
|||
}); |
|||
} |
|||
|
|||
private static topicToSqsQueueName(topic: string): string { |
|||
return topic.replace(/\./g, '_') + '.fifo'; |
|||
} |
|||
|
|||
private async createQueue(topic: string): Promise<string> { |
|||
let queueName = AwsSqsTemplate.topicToSqsQueueName(topic); |
|||
let queueParams: CreateQueueRequest = { |
|||
QueueName: queueName, |
|||
Attributes: this.queueAttributes |
|||
}; |
|||
|
|||
const result = await this.sqsClient.send(new CreateQueueCommand(queueParams)); |
|||
return result.QueueUrl || ''; |
|||
} |
|||
|
|||
static async build(): Promise<AwsSqsTemplate> { |
|||
const queue = new AwsSqsTemplate(); |
|||
await queue.init(); |
|||
return queue; |
|||
} |
|||
|
|||
async exit(status: number) { |
|||
this.stopped = true; |
|||
this.logger.info('Exiting with status: %d ...', status); |
|||
if (this.sqsClient) { |
|||
this.logger.info('Stopping Aws Sqs client.') |
|||
try { |
|||
this.sqsClient.destroy(); |
|||
// @ts-ignore
|
|||
delete this.sqsClient; |
|||
this.logger.info('Aws Sqs client stopped.') |
|||
process.exit(status); |
|||
} catch (e: any) { |
|||
this.logger.info('Aws Sqs client stop error.'); |
|||
process.exit(status); |
|||
} |
|||
} else { |
|||
process.exit(status); |
|||
} |
|||
} |
|||
} |
|||
@ -1,296 +0,0 @@ |
|||
/* |
|||
* Copyright © 2016-2022 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
const {logLevel, Kafka, CompressionTypes, Partitioners} = require('kafkajs'); |
|||
|
|||
const config = require('config'), |
|||
JsInvokeMessageProcessor = require('../api/jsInvokeMessageProcessor'), |
|||
logger = require('../config/logger')._logger('kafkaTemplate'), |
|||
KafkaJsWinstonLogCreator = require('../config/logger').KafkaJsWinstonLogCreator; |
|||
const replicationFactor = Number(config.get('kafka.replication_factor')); |
|||
const topicProperties = config.get('kafka.topic_properties'); |
|||
const kafkaClientId = config.get('kafka.client_id'); |
|||
const acks = Number(config.get('kafka.acks')); |
|||
const maxBatchSize = Number(config.get('kafka.batch_size')); |
|||
const linger = Number(config.get('kafka.linger_ms')); |
|||
const requestTimeout = Number(config.get('kafka.requestTimeout')); |
|||
const compressionType = (config.get('kafka.compression') === "gzip") ? CompressionTypes.GZIP : CompressionTypes.None; |
|||
const partitionsConsumedConcurrently = Number(config.get('kafka.partitions_consumed_concurrently')); |
|||
|
|||
let kafkaClient; |
|||
let kafkaAdmin; |
|||
let consumer; |
|||
let producer; |
|||
|
|||
const configEntries = []; |
|||
|
|||
let batchMessages = []; |
|||
let sendLoopInstance; |
|||
|
|||
function KafkaProducer() { |
|||
this.send = async (responseTopic, scriptId, rawResponse, headers) => { |
|||
logger.debug('Pending queue response, scriptId: [%s]', scriptId); |
|||
const message = { |
|||
topic: responseTopic, |
|||
messages: [{ |
|||
key: scriptId, |
|||
value: rawResponse, |
|||
headers: headers.data |
|||
}] |
|||
}; |
|||
|
|||
await pushMessageToSendLater(message); |
|||
} |
|||
} |
|||
|
|||
async function pushMessageToSendLater(message) { |
|||
batchMessages.push(message); |
|||
if (batchMessages.length >= maxBatchSize) { |
|||
await sendMessagesAsBatch(true); |
|||
} |
|||
} |
|||
|
|||
function sendLoopWithLinger() { |
|||
if (sendLoopInstance) { |
|||
clearTimeout(sendLoopInstance); |
|||
// } else {
|
|||
// logger.debug("Starting new send loop with linger [%s]", linger)
|
|||
} |
|||
sendLoopInstance = setTimeout(sendMessagesAsBatch, linger); |
|||
} |
|||
|
|||
async function sendMessagesAsBatch(isImmediately) { |
|||
if (sendLoopInstance) { |
|||
// logger.debug("sendMessagesAsBatch: Clear sendLoop scheduler. Starting new send loop with linger [%s]", linger);
|
|||
clearTimeout(sendLoopInstance); |
|||
} |
|||
sendLoopInstance = null; |
|||
if (batchMessages.length > 0) { |
|||
logger.debug('sendMessagesAsBatch, length: [%s], %s', batchMessages.length, isImmediately ? 'immediately' : ''); |
|||
const messagesToSend = batchMessages; |
|||
batchMessages = []; |
|||
try { |
|||
await producer.sendBatch({ |
|||
topicMessages: messagesToSend, |
|||
acks: acks, |
|||
compression: compressionType |
|||
}) |
|||
logger.debug('Response batch sent to kafka, length: [%s]', messagesToSend.length); |
|||
} catch(err) { |
|||
logger.error('Failed batch send to kafka, length: [%s], pending to reprocess msgs', messagesToSend.length); |
|||
logger.error(err.stack); |
|||
batchMessages = messagesToSend.concat(batchMessages); |
|||
} |
|||
} |
|||
sendLoopWithLinger(); |
|||
} |
|||
|
|||
(async () => { |
|||
try { |
|||
logger.info('Starting ThingsBoard JavaScript Executor Microservice...'); |
|||
|
|||
const kafkaBootstrapServers = config.get('kafka.bootstrap.servers'); |
|||
const requestTopic = config.get('request_topic'); |
|||
const useConfluent = config.get('kafka.use_confluent_cloud'); |
|||
|
|||
logger.info('Kafka Bootstrap Servers: %s', kafkaBootstrapServers); |
|||
logger.info('Kafka Requests Topic: %s', requestTopic); |
|||
|
|||
let kafkaConfig = { |
|||
brokers: kafkaBootstrapServers.split(','), |
|||
logLevel: logLevel.INFO, |
|||
logCreator: KafkaJsWinstonLogCreator |
|||
}; |
|||
|
|||
if (kafkaClientId) { |
|||
kafkaConfig['clientId'] = kafkaClientId; |
|||
} else { |
|||
logger.warn('KAFKA_CLIENT_ID is undefined. Consider to define the env variable KAFKA_CLIENT_ID'); |
|||
} |
|||
|
|||
kafkaConfig['requestTimeout'] = requestTimeout; |
|||
|
|||
if (useConfluent) { |
|||
kafkaConfig['sasl'] = { |
|||
mechanism: config.get('kafka.confluent.sasl.mechanism'), |
|||
username: config.get('kafka.confluent.username'), |
|||
password: config.get('kafka.confluent.password') |
|||
}; |
|||
kafkaConfig['ssl'] = true; |
|||
} |
|||
|
|||
kafkaClient = new Kafka(kafkaConfig); |
|||
|
|||
parseTopicProperties(); |
|||
|
|||
kafkaAdmin = kafkaClient.admin(); |
|||
await kafkaAdmin.connect(); |
|||
|
|||
let partitions = 1; |
|||
|
|||
for (let i = 0; i < configEntries.length; i++) { |
|||
let param = configEntries[i]; |
|||
if (param.name === 'partitions') { |
|||
partitions = param.value; |
|||
configEntries.splice(i, 1); |
|||
break; |
|||
} |
|||
} |
|||
|
|||
let topics = await kafkaAdmin.listTopics(); |
|||
|
|||
if (!topics.includes(requestTopic)) { |
|||
let createRequestTopicResult = await createTopic(requestTopic, partitions); |
|||
if (createRequestTopicResult) { |
|||
logger.info('Created new topic: %s', requestTopic); |
|||
} |
|||
} |
|||
|
|||
consumer = kafkaClient.consumer({groupId: 'js-executor-group'}); |
|||
producer = kafkaClient.producer({ createPartitioner: Partitioners.DefaultPartitioner }); |
|||
|
|||
/* |
|||
//producer event instrumentation to debug
|
|||
const { CONNECT } = producer.events; |
|||
const removeListenerC = producer.on(CONNECT, e => logger.info(`producer CONNECT`)); |
|||
const { DISCONNECT } = producer.events; |
|||
const removeListenerD = producer.on(DISCONNECT, e => logger.info(`producer DISCONNECT`)); |
|||
const { REQUEST } = producer.events; |
|||
const removeListenerR = producer.on(REQUEST, e => logger.info(`producer REQUEST ${e.payload.broker}`)); |
|||
const { REQUEST_TIMEOUT } = producer.events; |
|||
const removeListenerRT = producer.on(REQUEST_TIMEOUT, e => logger.info(`producer REQUEST_TIMEOUT ${e.payload.broker}`)); |
|||
const { REQUEST_QUEUE_SIZE } = producer.events; |
|||
const removeListenerRQS = producer.on(REQUEST_QUEUE_SIZE, e => logger.info(`producer REQUEST_QUEUE_SIZE ${e.payload.broker} size ${e.queueSize}`)); |
|||
*/ |
|||
|
|||
/* |
|||
//consumer event instrumentation to debug
|
|||
const removeListeners = {} |
|||
const { FETCH_START } = consumer.events; |
|||
removeListeners[FETCH_START] = consumer.on(FETCH_START, e => logger.info(`consumer FETCH_START`)); |
|||
const { FETCH } = consumer.events; |
|||
removeListeners[FETCH] = consumer.on(FETCH, e => logger.info(`consumer FETCH numberOfBatches ${e.payload.numberOfBatches} duration ${e.payload.duration}`)); |
|||
const { START_BATCH_PROCESS } = consumer.events; |
|||
removeListeners[START_BATCH_PROCESS] = consumer.on(START_BATCH_PROCESS, e => logger.info(`consumer START_BATCH_PROCESS topic ${e.payload.topic} batchSize ${e.payload.batchSize}`)); |
|||
const { END_BATCH_PROCESS } = consumer.events; |
|||
removeListeners[END_BATCH_PROCESS] = consumer.on(END_BATCH_PROCESS, e => logger.info(`consumer END_BATCH_PROCESS topic ${e.payload.topic} batchSize ${e.payload.batchSize}`)); |
|||
const { COMMIT_OFFSETS } = consumer.events; |
|||
removeListeners[COMMIT_OFFSETS] = consumer.on(COMMIT_OFFSETS, e => logger.info(`consumer COMMIT_OFFSETS topics ${e.payload.topics}`)); |
|||
*/ |
|||
|
|||
const { CRASH } = consumer.events; |
|||
|
|||
consumer.on(CRASH, e => { |
|||
logger.error(`Got consumer CRASH event, should restart: ${e.payload.restart}`); |
|||
if (!e.payload.restart) { |
|||
logger.error('Going to exit due to not retryable error!'); |
|||
exit(-1); |
|||
} |
|||
}); |
|||
|
|||
const messageProcessor = new JsInvokeMessageProcessor(new KafkaProducer()); |
|||
await consumer.connect(); |
|||
await producer.connect(); |
|||
sendLoopWithLinger(); |
|||
await consumer.subscribe({topic: requestTopic}); |
|||
|
|||
logger.info('Started ThingsBoard JavaScript Executor Microservice.'); |
|||
await consumer.run({ |
|||
partitionsConsumedConcurrently: partitionsConsumedConcurrently, |
|||
eachMessage: async ({topic, partition, message}) => { |
|||
let headers = message.headers; |
|||
let key = message.key; |
|||
let msg = {}; |
|||
msg.key = key.toString('utf8'); |
|||
msg.data = message.value; |
|||
msg.headers = {data: headers}; |
|||
messageProcessor.onJsInvokeMessage(msg); |
|||
}, |
|||
}); |
|||
|
|||
} catch (e) { |
|||
logger.error('Failed to start ThingsBoard JavaScript Executor Microservice: %s', e.message); |
|||
logger.error(e.stack); |
|||
exit(-1); |
|||
} |
|||
})(); |
|||
|
|||
function createTopic(topic, partitions) { |
|||
return kafkaAdmin.createTopics({ |
|||
topics: [{ |
|||
topic: topic, |
|||
numPartitions: partitions, |
|||
replicationFactor: replicationFactor, |
|||
configEntries: configEntries |
|||
}] |
|||
}); |
|||
} |
|||
|
|||
function parseTopicProperties() { |
|||
const props = topicProperties.split(';'); |
|||
props.forEach(p => { |
|||
const delimiterPosition = p.indexOf(':'); |
|||
configEntries.push({name: p.substring(0, delimiterPosition), value: p.substring(delimiterPosition + 1)}); |
|||
}); |
|||
} |
|||
|
|||
process.on('exit', () => { |
|||
exit(0); |
|||
}); |
|||
|
|||
async function exit(status) { |
|||
logger.info('Exiting with status: %d ...', status); |
|||
|
|||
if (kafkaAdmin) { |
|||
logger.info('Stopping Kafka Admin...'); |
|||
await kafkaAdmin.disconnect(); |
|||
logger.info('Kafka Admin stopped.'); |
|||
} |
|||
|
|||
if (consumer) { |
|||
logger.info('Stopping Kafka Consumer...'); |
|||
let _consumer = consumer; |
|||
consumer = null; |
|||
try { |
|||
await _consumer.disconnect(); |
|||
logger.info('Kafka Consumer stopped.'); |
|||
await disconnectProducer(); |
|||
process.exit(status); |
|||
} catch (e) { |
|||
logger.info('Kafka Consumer stop error.'); |
|||
await disconnectProducer(); |
|||
process.exit(status); |
|||
} |
|||
} else { |
|||
process.exit(status); |
|||
} |
|||
} |
|||
|
|||
async function disconnectProducer() { |
|||
if (producer) { |
|||
logger.info('Stopping Kafka Producer...'); |
|||
var _producer = producer; |
|||
producer = null; |
|||
try { |
|||
logger.info('Stopping loop...'); |
|||
clearTimeout(sendLoopInstance); |
|||
await sendMessagesAsBatch(); |
|||
await _producer.disconnect(); |
|||
logger.info('Kafka Producer stopped.'); |
|||
} catch (e) { |
|||
logger.info('Kafka Producer stop error.'); |
|||
} |
|||
} |
|||
} |
|||
@ -0,0 +1,291 @@ |
|||
///
|
|||
/// Copyright © 2016-2022 The Thingsboard Authors
|
|||
///
|
|||
/// Licensed under the Apache License, Version 2.0 (the "License");
|
|||
/// you may not use this file except in compliance with the License.
|
|||
/// You may obtain a copy of the License at
|
|||
///
|
|||
/// http://www.apache.org/licenses/LICENSE-2.0
|
|||
///
|
|||
/// Unless required by applicable law or agreed to in writing, software
|
|||
/// distributed under the License is distributed on an "AS IS" BASIS,
|
|||
/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|||
/// See the License for the specific language governing permissions and
|
|||
/// limitations under the License.
|
|||
///
|
|||
|
|||
import config from 'config'; |
|||
import { _logger, KafkaJsWinstonLogCreator } from '../config/logger'; |
|||
import { JsInvokeMessageProcessor } from '../api/jsInvokeMessageProcessor' |
|||
import { IQueue } from './queue.models'; |
|||
import { |
|||
Admin, |
|||
CompressionTypes, |
|||
Consumer, |
|||
Kafka, |
|||
KafkaConfig, |
|||
logLevel, |
|||
Partitioners, |
|||
Producer, |
|||
TopicMessages |
|||
} from 'kafkajs'; |
|||
|
|||
export class KafkaTemplate implements IQueue { |
|||
|
|||
private logger = _logger(`kafkaTemplate`); |
|||
private replicationFactor = Number(config.get('kafka.replication_factor')); |
|||
private topicProperties: string = config.get('kafka.topic_properties'); |
|||
private kafkaClientId: string = config.get('kafka.client_id'); |
|||
private acks = Number(config.get('kafka.acks')); |
|||
private maxBatchSize = Number(config.get('kafka.batch_size')); |
|||
private linger = Number(config.get('kafka.linger_ms')); |
|||
private requestTimeout = Number(config.get('kafka.requestTimeout')); |
|||
private compressionType = (config.get('kafka.compression') === "gzip") ? CompressionTypes.GZIP : CompressionTypes.None; |
|||
private partitionsConsumedConcurrently = Number(config.get('kafka.partitions_consumed_concurrently')); |
|||
|
|||
private kafkaClient: Kafka; |
|||
private kafkaAdmin: Admin; |
|||
private consumer: Consumer; |
|||
private producer: Producer; |
|||
private configEntries: any[] = []; |
|||
private batchMessages: TopicMessages[] = []; |
|||
private sendLoopInstance: NodeJS.Timeout; |
|||
|
|||
constructor() { |
|||
} |
|||
|
|||
async init(): Promise<void> { |
|||
try { |
|||
this.logger.info('Starting ThingsBoard JavaScript Executor Microservice...'); |
|||
|
|||
const kafkaBootstrapServers: string = config.get('kafka.bootstrap.servers'); |
|||
const requestTopic: string = config.get('request_topic'); |
|||
const useConfluent = config.get('kafka.use_confluent_cloud'); |
|||
|
|||
this.logger.info('Kafka Bootstrap Servers: %s', kafkaBootstrapServers); |
|||
this.logger.info('Kafka Requests Topic: %s', requestTopic); |
|||
|
|||
let kafkaConfig: KafkaConfig = { |
|||
brokers: kafkaBootstrapServers.split(','), |
|||
logLevel: logLevel.INFO, |
|||
logCreator: KafkaJsWinstonLogCreator |
|||
}; |
|||
|
|||
if (this.kafkaClientId) { |
|||
kafkaConfig['clientId'] = this.kafkaClientId; |
|||
} else { |
|||
this.logger.warn('KAFKA_CLIENT_ID is undefined. Consider to define the env variable KAFKA_CLIENT_ID'); |
|||
} |
|||
|
|||
kafkaConfig['requestTimeout'] = this.requestTimeout; |
|||
|
|||
if (useConfluent) { |
|||
kafkaConfig['sasl'] = { |
|||
mechanism: config.get('kafka.confluent.sasl.mechanism') as any, |
|||
username: config.get('kafka.confluent.username'), |
|||
password: config.get('kafka.confluent.password') |
|||
}; |
|||
kafkaConfig['ssl'] = true; |
|||
} |
|||
|
|||
this.parseTopicProperties(); |
|||
|
|||
this.kafkaClient = new Kafka(kafkaConfig); |
|||
this.kafkaAdmin = this.kafkaClient.admin(); |
|||
await this.kafkaAdmin.connect(); |
|||
|
|||
let partitions = 1; |
|||
|
|||
for (let i = 0; i < this.configEntries.length; i++) { |
|||
let param = this.configEntries[i]; |
|||
if (param.name === 'partitions') { |
|||
partitions = param.value; |
|||
this.configEntries.splice(i, 1); |
|||
break; |
|||
} |
|||
} |
|||
|
|||
let topics = await this.kafkaAdmin.listTopics(); |
|||
|
|||
if (!topics.includes(requestTopic)) { |
|||
let createRequestTopicResult = await this.createTopic(requestTopic, partitions); |
|||
if (createRequestTopicResult) { |
|||
this.logger.info('Created new topic: %s', requestTopic); |
|||
} |
|||
} |
|||
|
|||
this.consumer = this.kafkaClient.consumer({groupId: 'js-executor-group'}); |
|||
this.producer = this.kafkaClient.producer({createPartitioner: Partitioners.DefaultPartitioner}); |
|||
|
|||
const {CRASH} = this.consumer.events; |
|||
|
|||
this.consumer.on(CRASH, e => { |
|||
this.logger.error(`Got consumer CRASH event, should restart: ${e.payload.restart}`); |
|||
if (!e.payload.restart) { |
|||
this.logger.error('Going to exit due to not retryable error!'); |
|||
this.exit(-1); |
|||
} |
|||
}); |
|||
|
|||
const messageProcessor = new JsInvokeMessageProcessor(this); |
|||
await this.consumer.connect(); |
|||
await this.producer.connect(); |
|||
this.sendLoopWithLinger(); |
|||
await this.consumer.subscribe({topic: requestTopic}); |
|||
|
|||
this.logger.info('Started ThingsBoard JavaScript Executor Microservice.'); |
|||
await this.consumer.run({ |
|||
partitionsConsumedConcurrently: this.partitionsConsumedConcurrently, |
|||
eachMessage: async ({topic, partition, message}) => { |
|||
let headers = message.headers; |
|||
let key = message.key || new Buffer([]); |
|||
let msg = { |
|||
key: key.toString('utf8'), |
|||
data: message.value, |
|||
headers: { |
|||
data: headers |
|||
} |
|||
}; |
|||
messageProcessor.onJsInvokeMessage(msg); |
|||
}, |
|||
}); |
|||
|
|||
} catch (e: any) { |
|||
this.logger.error('Failed to start ThingsBoard JavaScript Executor Microservice: %s', e.message); |
|||
this.logger.error(e.stack); |
|||
await this.exit(-1); |
|||
} |
|||
} |
|||
|
|||
async send(responseTopic: string, scriptId: string, rawResponse: Buffer, headers: any): Promise<any> { |
|||
this.logger.debug('Pending queue response, scriptId: [%s]', scriptId); |
|||
const message = { |
|||
topic: responseTopic, |
|||
messages: [{ |
|||
key: scriptId, |
|||
value: rawResponse, |
|||
headers: headers.data |
|||
}] |
|||
}; |
|||
|
|||
await this.pushMessageToSendLater(message); |
|||
} |
|||
|
|||
private async pushMessageToSendLater(message: TopicMessages) { |
|||
this.batchMessages.push(message); |
|||
if (this.batchMessages.length >= this.maxBatchSize) { |
|||
await this.sendMessagesAsBatch(true); |
|||
} |
|||
} |
|||
|
|||
private async sendMessagesAsBatch(isImmediately = false): Promise<void> { |
|||
if (this.sendLoopInstance) { |
|||
clearTimeout(this.sendLoopInstance); |
|||
} |
|||
if (this.batchMessages.length > 0) { |
|||
this.logger.debug('sendMessagesAsBatch, length: [%s], %s', this.batchMessages.length, isImmediately ? 'immediately' : ''); |
|||
const messagesToSend = this.batchMessages; |
|||
this.batchMessages = []; |
|||
try { |
|||
await this.producer.sendBatch({ |
|||
topicMessages: messagesToSend, |
|||
acks: this.acks, |
|||
compression: this.compressionType |
|||
}) |
|||
this.logger.debug('Response batch sent to kafka, length: [%s]', messagesToSend.length); |
|||
} catch (err: any) { |
|||
this.logger.error('Failed batch send to kafka, length: [%s], pending to reprocess msgs', messagesToSend.length); |
|||
this.logger.error(err.stack); |
|||
this.batchMessages = messagesToSend.concat(this.batchMessages); |
|||
} |
|||
} |
|||
this.sendLoopWithLinger(); |
|||
} |
|||
|
|||
private parseTopicProperties() { |
|||
const props = this.topicProperties.split(';'); |
|||
props.forEach(p => { |
|||
const delimiterPosition = p.indexOf(':'); |
|||
this.configEntries.push({ |
|||
name: p.substring(0, delimiterPosition), |
|||
value: p.substring(delimiterPosition + 1) |
|||
}); |
|||
}); |
|||
} |
|||
|
|||
private createTopic(topic: string, partitions: number): Promise<boolean> { |
|||
return this.kafkaAdmin.createTopics({ |
|||
topics: [{ |
|||
topic: topic, |
|||
numPartitions: partitions, |
|||
replicationFactor: this.replicationFactor, |
|||
configEntries: this.configEntries |
|||
}] |
|||
}); |
|||
} |
|||
|
|||
private sendLoopWithLinger() { |
|||
if (this.sendLoopInstance) { |
|||
clearTimeout(this.sendLoopInstance); |
|||
// } else {
|
|||
// this.logger.debug("Starting new send loop with linger [%s]", this.linger)
|
|||
} |
|||
this.sendLoopInstance = setTimeout(async () => { |
|||
await this.sendMessagesAsBatch() |
|||
}, this.linger); |
|||
} |
|||
|
|||
static async build(): Promise<KafkaTemplate> { |
|||
const queue = new KafkaTemplate(); |
|||
await queue.init(); |
|||
return queue; |
|||
} |
|||
|
|||
|
|||
async exit(status: number): Promise<void> { |
|||
this.logger.info('Exiting with status: %d ...', status); |
|||
|
|||
if (this.kafkaAdmin) { |
|||
this.logger.info('Stopping Kafka Admin...'); |
|||
await this.kafkaAdmin.disconnect(); |
|||
// @ts-ignore
|
|||
delete this.kafkaAdmin; |
|||
this.logger.info('Kafka Admin stopped.'); |
|||
} |
|||
|
|||
if (this.consumer) { |
|||
this.logger.info('Stopping Kafka Consumer...'); |
|||
try { |
|||
await this.consumer.disconnect(); |
|||
// @ts-ignore
|
|||
delete this.consumer; |
|||
this.logger.info('Kafka Consumer stopped.'); |
|||
await this.disconnectProducer(); |
|||
process.exit(status); |
|||
} catch (e: any) { |
|||
this.logger.info('Kafka Consumer stop error.'); |
|||
await this.disconnectProducer(); |
|||
process.exit(status); |
|||
} |
|||
} else { |
|||
process.exit(status); |
|||
} |
|||
} |
|||
|
|||
private async disconnectProducer(): Promise<void> { |
|||
if (this.producer) { |
|||
this.logger.info('Stopping Kafka Producer...'); |
|||
try { |
|||
this.logger.info('Stopping loop...'); |
|||
clearTimeout(this.sendLoopInstance); |
|||
await this.sendMessagesAsBatch(); |
|||
await this.producer.disconnect(); |
|||
// @ts-ignore
|
|||
delete this.producer; |
|||
this.logger.info('Kafka Producer stopped.'); |
|||
} catch (e) { |
|||
this.logger.info('Kafka Producer stop error.'); |
|||
} |
|||
} |
|||
} |
|||
} |
|||
@ -1,162 +0,0 @@ |
|||
/* |
|||
* Copyright © 2016-2022 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
'use strict'; |
|||
|
|||
const config = require('config'), |
|||
JsInvokeMessageProcessor = require('../api/jsInvokeMessageProcessor'), |
|||
logger = require('../config/logger')._logger('pubSubTemplate'); |
|||
const {PubSub} = require('@google-cloud/pubsub'); |
|||
|
|||
const projectId = config.get('pubsub.project_id'); |
|||
const credentials = JSON.parse(config.get('pubsub.service_account')); |
|||
const requestTopic = config.get('request_topic'); |
|||
const queueProperties = config.get('pubsub.queue_properties'); |
|||
|
|||
let pubSubClient; |
|||
|
|||
const topics = []; |
|||
const subscriptions = []; |
|||
const queueProps = []; |
|||
|
|||
function PubSubProducer() { |
|||
this.send = async (responseTopic, scriptId, rawResponse, headers) => { |
|||
|
|||
if (!(subscriptions.includes(responseTopic) && topics.includes(requestTopic))) { |
|||
await createTopic(requestTopic); |
|||
} |
|||
|
|||
let data = JSON.stringify( |
|||
{ |
|||
key: scriptId, |
|||
data: [...rawResponse], |
|||
headers: headers |
|||
}); |
|||
let dataBuffer = Buffer.from(data); |
|||
return pubSubClient.topic(responseTopic).publish(dataBuffer); |
|||
} |
|||
} |
|||
|
|||
(async () => { |
|||
try { |
|||
logger.info('Starting ThingsBoard JavaScript Executor Microservice...'); |
|||
pubSubClient = new PubSub({projectId: projectId, credentials: credentials}); |
|||
|
|||
parseQueueProperties(); |
|||
|
|||
const topicList = await pubSubClient.getTopics(); |
|||
|
|||
if (topicList) { |
|||
topicList[0].forEach(topic => { |
|||
topics.push(getName(topic.name)); |
|||
}); |
|||
} |
|||
|
|||
const subscriptionList = await pubSubClient.getSubscriptions(); |
|||
|
|||
if (subscriptionList) { |
|||
topicList[0].forEach(sub => { |
|||
subscriptions.push(getName(sub.name)); |
|||
}); |
|||
} |
|||
|
|||
if (!(subscriptions.includes(requestTopic) && topics.includes(requestTopic))) { |
|||
await createTopic(requestTopic); |
|||
} |
|||
|
|||
const subscription = pubSubClient.subscription(requestTopic); |
|||
|
|||
const messageProcessor = new JsInvokeMessageProcessor(new PubSubProducer()); |
|||
|
|||
const messageHandler = message => { |
|||
|
|||
messageProcessor.onJsInvokeMessage(JSON.parse(message.data.toString('utf8'))); |
|||
message.ack(); |
|||
}; |
|||
|
|||
subscription.on('message', messageHandler); |
|||
|
|||
} catch (e) { |
|||
logger.error('Failed to start ThingsBoard JavaScript Executor Microservice: %s', e.message); |
|||
logger.error(e.stack); |
|||
exit(-1); |
|||
} |
|||
})(); |
|||
|
|||
async function createTopic(topic) { |
|||
if (!topics.includes(topic)) { |
|||
try { |
|||
await pubSubClient.createTopic(topic); |
|||
logger.info('Created new Pub/Sub topic: %s', topic); |
|||
} catch (e) { |
|||
logger.info('Pub/Sub topic already exists'); |
|||
} |
|||
topics.push(topic); |
|||
} |
|||
await createSubscription(topic) |
|||
} |
|||
|
|||
async function createSubscription(topic) { |
|||
if (!subscriptions.includes(topic)) { |
|||
try { |
|||
await pubSubClient.createSubscription(topic, topic, { |
|||
topic: topic, |
|||
subscription: topic, |
|||
ackDeadlineSeconds: queueProps['ackDeadlineInSec'], |
|||
messageRetentionDuration: {seconds: queueProps['messageRetentionInSec']} |
|||
}); |
|||
logger.info('Created new Pub/Sub subscription: %s', topic); |
|||
} catch (e) { |
|||
logger.info('Pub/Sub subscription already exists.'); |
|||
} |
|||
|
|||
subscriptions.push(topic); |
|||
} |
|||
} |
|||
|
|||
function parseQueueProperties() { |
|||
const props = queueProperties.split(';'); |
|||
props.forEach(p => { |
|||
const delimiterPosition = p.indexOf(':'); |
|||
queueProps[p.substring(0, delimiterPosition)] = p.substring(delimiterPosition + 1); |
|||
}); |
|||
} |
|||
|
|||
function getName(fullName) { |
|||
const delimiterPosition = fullName.lastIndexOf('/'); |
|||
return fullName.substring(delimiterPosition + 1); |
|||
} |
|||
|
|||
process.on('exit', () => { |
|||
exit(0); |
|||
}); |
|||
|
|||
async function exit(status) { |
|||
logger.info('Exiting with status: %d ...', status); |
|||
if (pubSubClient) { |
|||
logger.info('Stopping Pub/Sub client.') |
|||
try { |
|||
await pubSubClient.close(); |
|||
logger.info('Pub/Sub client stopped.') |
|||
process.exit(status); |
|||
} catch (e) { |
|||
logger.info('Pub/Sub client stop error.'); |
|||
process.exit(status); |
|||
} |
|||
} else { |
|||
process.exit(status); |
|||
} |
|||
} |
|||
|
|||
@ -0,0 +1,175 @@ |
|||
///
|
|||
/// Copyright © 2016-2022 The Thingsboard Authors
|
|||
///
|
|||
/// Licensed under the Apache License, Version 2.0 (the "License");
|
|||
/// you may not use this file except in compliance with the License.
|
|||
/// You may obtain a copy of the License at
|
|||
///
|
|||
/// http://www.apache.org/licenses/LICENSE-2.0
|
|||
///
|
|||
/// Unless required by applicable law or agreed to in writing, software
|
|||
/// distributed under the License is distributed on an "AS IS" BASIS,
|
|||
/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|||
/// See the License for the specific language governing permissions and
|
|||
/// limitations under the License.
|
|||
///
|
|||
|
|||
import config from 'config'; |
|||
import { _logger } from '../config/logger'; |
|||
import { JsInvokeMessageProcessor } from '../api/jsInvokeMessageProcessor' |
|||
import { PubSub } from '@google-cloud/pubsub'; |
|||
import { IQueue } from './queue.models'; |
|||
import { Message } from '@google-cloud/pubsub/build/src/subscriber'; |
|||
|
|||
export class PubSubTemplate implements IQueue { |
|||
|
|||
private logger = _logger(`pubSubTemplate`); |
|||
private projectId: string = config.get('pubsub.project_id'); |
|||
private credentials = JSON.parse(config.get('pubsub.service_account')); |
|||
private requestTopic: string = config.get('request_topic'); |
|||
private queueProperties: string = config.get('pubsub.queue_properties'); |
|||
|
|||
private pubSubClient: PubSub; |
|||
private queueProps: { [n: string]: string } = {}; |
|||
private topics: string[] = []; |
|||
private subscriptions: string[] = []; |
|||
|
|||
constructor() { |
|||
} |
|||
|
|||
async init() { |
|||
try { |
|||
this.logger.info('Starting ThingsBoard JavaScript Executor Microservice...'); |
|||
this.pubSubClient = new PubSub({ |
|||
projectId: this.projectId, |
|||
credentials: this.credentials |
|||
}); |
|||
|
|||
this.parseQueueProperties(); |
|||
|
|||
const topicList = await this.pubSubClient.getTopics(); |
|||
|
|||
if (topicList) { |
|||
topicList[0].forEach(topic => { |
|||
this.topics.push(PubSubTemplate.getName(topic.name)); |
|||
}); |
|||
} |
|||
|
|||
const subscriptionList = await this.pubSubClient.getSubscriptions(); |
|||
|
|||
if (subscriptionList) { |
|||
topicList[0].forEach(sub => { |
|||
this.subscriptions.push(PubSubTemplate.getName(sub.name)); |
|||
}); |
|||
} |
|||
|
|||
if (!(this.subscriptions.includes(this.requestTopic) && this.topics.includes(this.requestTopic))) { |
|||
await this.createTopic(this.requestTopic); |
|||
await this.createSubscription(this.requestTopic); |
|||
} |
|||
|
|||
const subscription = this.pubSubClient.subscription(this.requestTopic); |
|||
|
|||
const messageProcessor = new JsInvokeMessageProcessor(this); |
|||
|
|||
const messageHandler = (message: Message) => { |
|||
messageProcessor.onJsInvokeMessage(JSON.parse(message.data.toString('utf8'))); |
|||
message.ack(); |
|||
}; |
|||
|
|||
subscription.on('message', messageHandler); |
|||
|
|||
} catch (e: any) { |
|||
this.logger.error('Failed to start ThingsBoard JavaScript Executor Microservice: %s', e.message); |
|||
this.logger.error(e.stack); |
|||
await this.exit(-1); |
|||
} |
|||
} |
|||
|
|||
async send(responseTopic: string, scriptId: string, rawResponse: Buffer, headers: any): Promise<any> { |
|||
if (!(this.subscriptions.includes(responseTopic) && this.topics.includes(this.requestTopic))) { |
|||
await this.createTopic(this.requestTopic); |
|||
await this.createSubscription(this.requestTopic); |
|||
} |
|||
|
|||
let data = JSON.stringify( |
|||
{ |
|||
key: scriptId, |
|||
data: [...rawResponse], |
|||
headers: headers |
|||
}); |
|||
let dataBuffer = Buffer.from(data); |
|||
return this.pubSubClient.topic(responseTopic).publishMessage({data: dataBuffer}); |
|||
} |
|||
|
|||
private parseQueueProperties() { |
|||
const props = this.queueProperties.split(';'); |
|||
props.forEach(p => { |
|||
const delimiterPosition = p.indexOf(':'); |
|||
this.queueProps[p.substring(0, delimiterPosition)] = p.substring(delimiterPosition + 1); |
|||
}); |
|||
} |
|||
|
|||
private static getName(fullName: string): string { |
|||
const delimiterPosition = fullName.lastIndexOf('/'); |
|||
return fullName.substring(delimiterPosition + 1); |
|||
} |
|||
|
|||
private async createTopic(topic: string) { |
|||
if (!this.topics.includes(topic)) { |
|||
try { |
|||
await this.pubSubClient.createTopic(topic); |
|||
this.logger.info('Created new Pub/Sub topic: %s', topic); |
|||
} catch (e) { |
|||
this.logger.info('Pub/Sub topic already exists'); |
|||
} |
|||
this.topics.push(topic); |
|||
} |
|||
} |
|||
|
|||
private async createSubscription(topic: string) { |
|||
if (!this.subscriptions.includes(topic)) { |
|||
try { |
|||
await this.pubSubClient.createSubscription(topic, topic, { |
|||
topic: topic, |
|||
name: topic, |
|||
ackDeadlineSeconds: Number(this.queueProps['ackDeadlineInSec']), |
|||
messageRetentionDuration: { |
|||
seconds: this.queueProps['messageRetentionInSec'] |
|||
} |
|||
}); |
|||
this.logger.info('Created new Pub/Sub subscription: %s', topic); |
|||
} catch (e) { |
|||
this.logger.info('Pub/Sub subscription already exists.'); |
|||
} |
|||
|
|||
this.subscriptions.push(topic); |
|||
} |
|||
} |
|||
|
|||
static async build(): Promise<PubSubTemplate> { |
|||
const queue = new PubSubTemplate(); |
|||
await queue.init(); |
|||
return queue; |
|||
} |
|||
|
|||
async exit(status: number): Promise<void> { |
|||
this.logger.info('Exiting with status: %d ...', status); |
|||
if (this.pubSubClient) { |
|||
this.logger.info('Stopping Pub/Sub client.') |
|||
try { |
|||
await this.pubSubClient.close(); |
|||
// @ts-ignore
|
|||
delete this.pubSubClient; |
|||
this.logger.info('Pub/Sub client stopped.') |
|||
process.exit(status); |
|||
} catch (e) { |
|||
this.logger.info('Pub/Sub client stop error.'); |
|||
process.exit(status); |
|||
} |
|||
} else { |
|||
process.exit(status); |
|||
} |
|||
} |
|||
} |
|||
|
|||
@ -0,0 +1,21 @@ |
|||
///
|
|||
/// Copyright © 2016-2022 The Thingsboard Authors
|
|||
///
|
|||
/// Licensed under the Apache License, Version 2.0 (the "License");
|
|||
/// you may not use this file except in compliance with the License.
|
|||
/// You may obtain a copy of the License at
|
|||
///
|
|||
/// http://www.apache.org/licenses/LICENSE-2.0
|
|||
///
|
|||
/// Unless required by applicable law or agreed to in writing, software
|
|||
/// distributed under the License is distributed on an "AS IS" BASIS,
|
|||
/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|||
/// See the License for the specific language governing permissions and
|
|||
/// limitations under the License.
|
|||
///
|
|||
|
|||
export interface IQueue { |
|||
init(): Promise<void>; |
|||
send(responseTopic: string, scriptId: string, rawResponse: Buffer, headers: any): Promise<any>; |
|||
exit(status: number): Promise<void>; |
|||
} |
|||
@ -1,181 +0,0 @@ |
|||
/* |
|||
* Copyright © 2016-2022 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
'use strict'; |
|||
|
|||
const config = require('config'), |
|||
JsInvokeMessageProcessor = require('../api/jsInvokeMessageProcessor'), |
|||
logger = require('../config/logger')._logger('rabbitmqTemplate'); |
|||
|
|||
const requestTopic = config.get('request_topic'); |
|||
const host = config.get('rabbitmq.host'); |
|||
const port = config.get('rabbitmq.port'); |
|||
const vhost = config.get('rabbitmq.virtual_host'); |
|||
const username = config.get('rabbitmq.username'); |
|||
const password = config.get('rabbitmq.password'); |
|||
const queueProperties = config.get('rabbitmq.queue_properties'); |
|||
const pollInterval = config.get('js.response_poll_interval'); |
|||
|
|||
const amqp = require('amqplib/callback_api'); |
|||
|
|||
let queueOptions = {durable: false, exclusive: false, autoDelete: false}; |
|||
let connection; |
|||
let channel; |
|||
let stopped = false; |
|||
let queues = []; |
|||
|
|||
function RabbitMqProducer() { |
|||
this.send = async (responseTopic, scriptId, rawResponse, headers) => { |
|||
|
|||
if (!queues.includes(responseTopic)) { |
|||
await createQueue(responseTopic); |
|||
queues.push(responseTopic); |
|||
} |
|||
|
|||
let data = JSON.stringify( |
|||
{ |
|||
key: scriptId, |
|||
data: [...rawResponse], |
|||
headers: headers |
|||
}); |
|||
let dataBuffer = Buffer.from(data); |
|||
channel.sendToQueue(responseTopic, dataBuffer); |
|||
return new Promise((resolve, reject) => { |
|||
channel.waitForConfirms((err) => { |
|||
if (err) { |
|||
reject(err); |
|||
} else { |
|||
resolve(); |
|||
} |
|||
}); |
|||
}); |
|||
} |
|||
} |
|||
|
|||
(async () => { |
|||
try { |
|||
logger.info('Starting ThingsBoard JavaScript Executor Microservice...'); |
|||
const url = `amqp://${username}:${password}@${host}:${port}${vhost}`; |
|||
|
|||
connection = await new Promise((resolve, reject) => { |
|||
amqp.connect(url, function (err, connection) { |
|||
if (err) { |
|||
reject(err); |
|||
} else { |
|||
resolve(connection); |
|||
} |
|||
}); |
|||
}); |
|||
|
|||
channel = await new Promise((resolve, reject) => { |
|||
connection.createConfirmChannel(function (err, channel) { |
|||
if (err) { |
|||
reject(err); |
|||
} else { |
|||
resolve(channel); |
|||
} |
|||
}); |
|||
}); |
|||
|
|||
parseQueueProperties(); |
|||
|
|||
await createQueue(requestTopic); |
|||
|
|||
const messageProcessor = new JsInvokeMessageProcessor(new RabbitMqProducer()); |
|||
|
|||
while (!stopped) { |
|||
let pollStartTs = new Date().getTime(); |
|||
let message = await new Promise((resolve, reject) => { |
|||
channel.get(requestTopic, {}, function (err, msg) { |
|||
if (err) { |
|||
reject(err); |
|||
} else { |
|||
resolve(msg); |
|||
} |
|||
}); |
|||
}); |
|||
|
|||
if (message) { |
|||
messageProcessor.onJsInvokeMessage(JSON.parse(message.content.toString('utf8'))); |
|||
channel.ack(message); |
|||
} else { |
|||
let pollDuration = new Date().getTime() - pollStartTs; |
|||
if (pollDuration < pollInterval) { |
|||
await sleep(pollInterval - pollDuration); |
|||
} |
|||
} |
|||
} |
|||
} catch (e) { |
|||
logger.error('Failed to start ThingsBoard JavaScript Executor Microservice: %s', e.message); |
|||
logger.error(e.stack); |
|||
exit(-1); |
|||
} |
|||
})(); |
|||
|
|||
function parseQueueProperties() { |
|||
let args = {}; |
|||
const props = queueProperties.split(';'); |
|||
props.forEach(p => { |
|||
const delimiterPosition = p.indexOf(':'); |
|||
args[p.substring(0, delimiterPosition)] = +p.substring(delimiterPosition + 1); |
|||
}); |
|||
queueOptions['arguments'] = args; |
|||
} |
|||
|
|||
async function createQueue(topic) { |
|||
return new Promise((resolve, reject) => { |
|||
channel.assertQueue(topic, queueOptions, function (err) { |
|||
if (err) { |
|||
reject(err); |
|||
} else { |
|||
resolve(); |
|||
} |
|||
}); |
|||
}); |
|||
} |
|||
|
|||
function sleep(ms) { |
|||
return new Promise((resolve) => { |
|||
setTimeout(resolve, ms); |
|||
}); |
|||
} |
|||
|
|||
process.on('exit', () => { |
|||
exit(0); |
|||
}); |
|||
|
|||
async function exit(status) { |
|||
logger.info('Exiting with status: %d ...', status); |
|||
|
|||
if (channel) { |
|||
logger.info('Stopping RabbitMq chanel.') |
|||
await channel.close(); |
|||
logger.info('RabbitMq chanel stopped'); |
|||
} |
|||
|
|||
if (connection) { |
|||
logger.info('Stopping RabbitMq connection.') |
|||
try { |
|||
await connection.close(); |
|||
logger.info('RabbitMq client connection.') |
|||
process.exit(status); |
|||
} catch (e) { |
|||
logger.info('RabbitMq connection stop error.'); |
|||
process.exit(status); |
|||
} |
|||
} else { |
|||
process.exit(status); |
|||
} |
|||
} |
|||
@ -0,0 +1,151 @@ |
|||
///
|
|||
/// Copyright © 2016-2022 The Thingsboard Authors
|
|||
///
|
|||
/// Licensed under the Apache License, Version 2.0 (the "License");
|
|||
/// you may not use this file except in compliance with the License.
|
|||
/// You may obtain a copy of the License at
|
|||
///
|
|||
/// http://www.apache.org/licenses/LICENSE-2.0
|
|||
///
|
|||
/// Unless required by applicable law or agreed to in writing, software
|
|||
/// distributed under the License is distributed on an "AS IS" BASIS,
|
|||
/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|||
/// See the License for the specific language governing permissions and
|
|||
/// limitations under the License.
|
|||
///
|
|||
|
|||
import config from 'config'; |
|||
import { _logger } from '../config/logger'; |
|||
import { JsInvokeMessageProcessor } from '../api/jsInvokeMessageProcessor' |
|||
import { IQueue } from './queue.models'; |
|||
import amqp, { ConfirmChannel, Connection } from 'amqplib'; |
|||
import { Options, Replies } from 'amqplib/properties'; |
|||
import { sleep } from '../api/utils'; |
|||
|
|||
export class RabbitMqTemplate implements IQueue { |
|||
|
|||
private logger = _logger(`rabbitmqTemplate`); |
|||
private requestTopic: string = config.get('request_topic'); |
|||
private host = config.get('rabbitmq.host'); |
|||
private port = config.get('rabbitmq.port'); |
|||
private vhost = config.get('rabbitmq.virtual_host'); |
|||
private username = config.get('rabbitmq.username'); |
|||
private password = config.get('rabbitmq.password'); |
|||
private queueProperties: string = config.get('rabbitmq.queue_properties'); |
|||
private pollInterval = Number(config.get('js.response_poll_interval')); |
|||
|
|||
private queueOptions: Options.AssertQueue = { |
|||
durable: false, |
|||
exclusive: false, |
|||
autoDelete: false |
|||
}; |
|||
private connection: Connection; |
|||
private channel: ConfirmChannel; |
|||
private stopped = false; |
|||
private topics: string[] = []; |
|||
|
|||
constructor() { |
|||
} |
|||
|
|||
async init(): Promise<void> { |
|||
try { |
|||
this.logger.info('Starting ThingsBoard JavaScript Executor Microservice...'); |
|||
|
|||
const url = `amqp://${this.username}:${this.password}@${this.host}:${this.port}${this.vhost}`; |
|||
this.connection = await amqp.connect(url); |
|||
this.channel = await this.connection.createConfirmChannel(); |
|||
|
|||
this.parseQueueProperties(); |
|||
|
|||
await this.createQueue(this.requestTopic); |
|||
|
|||
const messageProcessor = new JsInvokeMessageProcessor(this); |
|||
|
|||
while (!this.stopped) { |
|||
let pollStartTs = new Date().getTime(); |
|||
let message = await this.channel.get(this.requestTopic); |
|||
|
|||
if (message) { |
|||
messageProcessor.onJsInvokeMessage(JSON.parse(message.content.toString('utf8'))); |
|||
this.channel.ack(message); |
|||
} else { |
|||
let pollDuration = new Date().getTime() - pollStartTs; |
|||
if (pollDuration < this.pollInterval) { |
|||
await sleep(this.pollInterval - pollDuration); |
|||
} |
|||
} |
|||
} |
|||
} catch (e: any) { |
|||
this.logger.error('Failed to start ThingsBoard JavaScript Executor Microservice: %s', e.message); |
|||
this.logger.error(e.stack); |
|||
await this.exit(-1); |
|||
} |
|||
} |
|||
|
|||
async send(responseTopic: string, scriptId: string, rawResponse: Buffer, headers: any): Promise<any> { |
|||
|
|||
if (!this.topics.includes(responseTopic)) { |
|||
await this.createQueue(responseTopic); |
|||
this.topics.push(responseTopic); |
|||
} |
|||
|
|||
let data = JSON.stringify( |
|||
{ |
|||
key: scriptId, |
|||
data: [...rawResponse], |
|||
headers: headers |
|||
}); |
|||
let dataBuffer = Buffer.from(data); |
|||
this.channel.sendToQueue(responseTopic, dataBuffer); |
|||
return this.channel.waitForConfirms() |
|||
} |
|||
|
|||
private parseQueueProperties() { |
|||
let args: { [n: string]: number } = {}; |
|||
const props = this.queueProperties.split(';'); |
|||
props.forEach(p => { |
|||
const delimiterPosition = p.indexOf(':'); |
|||
args[p.substring(0, delimiterPosition)] = Number(p.substring(delimiterPosition + 1)); |
|||
}); |
|||
this.queueOptions['arguments'] = args; |
|||
} |
|||
|
|||
private async createQueue(topic: string): Promise<Replies.AssertQueue> { |
|||
return this.channel.assertQueue(topic, this.queueOptions); |
|||
} |
|||
|
|||
static async build(): Promise<RabbitMqTemplate> { |
|||
const queue = new RabbitMqTemplate(); |
|||
await queue.init(); |
|||
return queue; |
|||
} |
|||
|
|||
async exit(status: number) { |
|||
this.logger.info('Exiting with status: %d ...', status); |
|||
|
|||
if (this.channel) { |
|||
this.logger.info('Stopping RabbitMq chanel.') |
|||
await this.channel.close(); |
|||
// @ts-ignore
|
|||
delete this.channel; |
|||
this.logger.info('RabbitMq chanel stopped'); |
|||
} |
|||
|
|||
if (this.connection) { |
|||
this.logger.info('Stopping RabbitMq connection.') |
|||
try { |
|||
await this.connection.close(); |
|||
// @ts-ignore
|
|||
delete this.connection; |
|||
this.logger.info('RabbitMq client connection.') |
|||
process.exit(status); |
|||
} catch (e) { |
|||
this.logger.info('RabbitMq connection stop error.'); |
|||
process.exit(status); |
|||
} |
|||
} else { |
|||
process.exit(status); |
|||
} |
|||
} |
|||
|
|||
} |
|||
@ -1,177 +0,0 @@ |
|||
/* |
|||
* Copyright © 2016-2022 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
'use strict'; |
|||
|
|||
const config = require('config'), |
|||
JsInvokeMessageProcessor = require('../api/jsInvokeMessageProcessor'), |
|||
logger = require('../config/logger')._logger('serviceBusTemplate'); |
|||
const {ServiceBusClient, ServiceBusAdministrationClient} = require("@azure/service-bus"); |
|||
|
|||
const requestTopic = config.get('request_topic'); |
|||
const namespaceName = config.get('service_bus.namespace_name'); |
|||
const sasKeyName = config.get('service_bus.sas_key_name'); |
|||
const sasKey = config.get('service_bus.sas_key'); |
|||
const queueProperties = config.get('service_bus.queue_properties'); |
|||
|
|||
let sbClient; |
|||
let receiver; |
|||
let serviceBusService; |
|||
|
|||
let queueOptions = {}; |
|||
const queues = []; |
|||
const senderMap = new Map(); |
|||
|
|||
function ServiceBusProducer() { |
|||
this.send = async (responseTopic, scriptId, rawResponse, headers) => { |
|||
if (!queues.includes(requestTopic)) { |
|||
await createQueueIfNotExist(requestTopic); |
|||
queues.push(requestTopic); |
|||
} |
|||
|
|||
let customSender = senderMap.get(responseTopic); |
|||
|
|||
if (!customSender) { |
|||
customSender = new CustomSender(responseTopic); |
|||
senderMap.set(responseTopic, customSender); |
|||
} |
|||
|
|||
let data = { |
|||
key: scriptId, |
|||
data: [...rawResponse], |
|||
headers: headers |
|||
}; |
|||
|
|||
return customSender.send({body: data}); |
|||
} |
|||
} |
|||
|
|||
function CustomSender(topic) { |
|||
this.sender = sbClient.createSender(topic); |
|||
|
|||
this.send = async (message) => { |
|||
return this.sender.sendMessages(message); |
|||
} |
|||
} |
|||
|
|||
(async () => { |
|||
try { |
|||
logger.info('Starting ThingsBoard JavaScript Executor Microservice...'); |
|||
|
|||
const connectionString = `Endpoint=sb://${namespaceName}.servicebus.windows.net/;SharedAccessKeyName=${sasKeyName};SharedAccessKey=${sasKey}`; |
|||
sbClient = new ServiceBusClient(connectionString) |
|||
serviceBusService = new ServiceBusAdministrationClient(connectionString); |
|||
|
|||
parseQueueProperties(); |
|||
|
|||
await new Promise((resolve, reject) => { |
|||
serviceBusService.listQueues((err, data) => { |
|||
if (err) { |
|||
reject(err); |
|||
} else { |
|||
for (const queue of data) { |
|||
queues.push(queue.name); |
|||
} |
|||
resolve(); |
|||
} |
|||
}); |
|||
}); |
|||
|
|||
if (!queues.includes(requestTopic)) { |
|||
await createQueueIfNotExist(requestTopic); |
|||
queues.push(requestTopic); |
|||
} |
|||
|
|||
receiver = sbClient.createReceiver(requestTopic, {receiveMode: 'peekLock'}); |
|||
|
|||
const messageProcessor = new JsInvokeMessageProcessor(new ServiceBusProducer()); |
|||
|
|||
const messageHandler = async (message) => { |
|||
if (message) { |
|||
messageProcessor.onJsInvokeMessage(message.body); |
|||
await message.complete(); |
|||
} |
|||
}; |
|||
const errorHandler = (error) => { |
|||
logger.error('Failed to receive message from queue.', error); |
|||
}; |
|||
receiver.subscribe({processMessage: messageHandler, processError: errorHandler}) |
|||
} catch (e) { |
|||
logger.error('Failed to start ThingsBoard JavaScript Executor Microservice: %s', e.message); |
|||
logger.error(e.stack); |
|||
await exit(-1); |
|||
} |
|||
})(); |
|||
|
|||
async function createQueueIfNotExist(topic) { |
|||
return new Promise((resolve, reject) => { |
|||
serviceBusService.createQueue(topic, queueOptions, (err) => { |
|||
if (err && err.code !== "MessageEntityAlreadyExistsError") { |
|||
reject(err); |
|||
} else { |
|||
resolve(); |
|||
} |
|||
}); |
|||
}); |
|||
} |
|||
|
|||
function parseQueueProperties() { |
|||
let properties = {}; |
|||
const props = queueProperties.split(';'); |
|||
props.forEach(p => { |
|||
const delimiterPosition = p.indexOf(':'); |
|||
properties[p.substring(0, delimiterPosition)] = p.substring(delimiterPosition + 1); |
|||
}); |
|||
queueOptions = { |
|||
requiresDuplicateDetection: false, |
|||
maxSizeInMegabytes: properties['maxSizeInMb'], |
|||
defaultMessageTimeToLive: `PT${properties['messageTimeToLiveInSec']}S`, |
|||
lockDuration: `PT${properties['lockDurationInSec']}S` |
|||
}; |
|||
} |
|||
|
|||
process.on('exit', () => { |
|||
exit(0); |
|||
}); |
|||
|
|||
async function exit(status) { |
|||
logger.info('Exiting with status: %d ...', status); |
|||
logger.info('Stopping Azure Service Bus resources...') |
|||
if (receiver) { |
|||
try { |
|||
await receiver.close(); |
|||
} catch (e) { |
|||
|
|||
} |
|||
} |
|||
|
|||
senderMap.forEach((k, v) => { |
|||
try { |
|||
v.sender.close(); |
|||
} catch (e) { |
|||
|
|||
} |
|||
}); |
|||
|
|||
if (sbClient) { |
|||
try { |
|||
sbClient.close(); |
|||
} catch (e) { |
|||
|
|||
} |
|||
} |
|||
logger.info('Azure Service Bus resources stopped.') |
|||
process.exit(status); |
|||
} |
|||
@ -0,0 +1,175 @@ |
|||
///
|
|||
/// Copyright © 2016-2022 The Thingsboard Authors
|
|||
///
|
|||
/// Licensed under the Apache License, Version 2.0 (the "License");
|
|||
/// you may not use this file except in compliance with the License.
|
|||
/// You may obtain a copy of the License at
|
|||
///
|
|||
/// http://www.apache.org/licenses/LICENSE-2.0
|
|||
///
|
|||
/// Unless required by applicable law or agreed to in writing, software
|
|||
/// distributed under the License is distributed on an "AS IS" BASIS,
|
|||
/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|||
/// See the License for the specific language governing permissions and
|
|||
/// limitations under the License.
|
|||
///
|
|||
|
|||
import config from 'config'; |
|||
import { _logger } from '../config/logger'; |
|||
import { JsInvokeMessageProcessor } from '../api/jsInvokeMessageProcessor' |
|||
import { IQueue } from './queue.models'; |
|||
import { |
|||
CreateQueueOptions, |
|||
ProcessErrorArgs, |
|||
ServiceBusAdministrationClient, |
|||
ServiceBusClient, |
|||
ServiceBusReceivedMessage, |
|||
ServiceBusReceiver, |
|||
ServiceBusSender |
|||
} from '@azure/service-bus'; |
|||
|
|||
export class ServiceBusTemplate implements IQueue { |
|||
|
|||
private logger = _logger(`serviceBusTemplate`); |
|||
private requestTopic: string = config.get('request_topic'); |
|||
private namespaceName = config.get('service_bus.namespace_name'); |
|||
private sasKeyName = config.get('service_bus.sas_key_name'); |
|||
private sasKey = config.get('service_bus.sas_key'); |
|||
private queueProperties: string = config.get('service_bus.queue_properties'); |
|||
|
|||
private sbClient: ServiceBusClient; |
|||
private serviceBusService: ServiceBusAdministrationClient; |
|||
private queueOptions: CreateQueueOptions = {}; |
|||
private queues: string[] = []; |
|||
private receiver: ServiceBusReceiver; |
|||
private senderMap = new Map<string, ServiceBusSender>(); |
|||
|
|||
constructor() { |
|||
} |
|||
|
|||
async init() { |
|||
try { |
|||
this.logger.info('Starting ThingsBoard JavaScript Executor Microservice...'); |
|||
|
|||
const connectionString = `Endpoint=sb://${this.namespaceName}.servicebus.windows.net/;SharedAccessKeyName=${this.sasKeyName};SharedAccessKey=${this.sasKey}`; |
|||
this.sbClient = new ServiceBusClient(connectionString) |
|||
this.serviceBusService = new ServiceBusAdministrationClient(connectionString); |
|||
|
|||
this.parseQueueProperties(); |
|||
|
|||
const listQueues = await this.serviceBusService.listQueues(); |
|||
for await (const queue of listQueues) { |
|||
this.queues.push(queue.name); |
|||
} |
|||
|
|||
if (!this.queues.includes(this.requestTopic)) { |
|||
await this.createQueueIfNotExist(this.requestTopic); |
|||
this.queues.push(this.requestTopic); |
|||
} |
|||
|
|||
this.receiver = this.sbClient.createReceiver(this.requestTopic, {receiveMode: 'peekLock'}); |
|||
|
|||
const messageProcessor = new JsInvokeMessageProcessor(this); |
|||
|
|||
const messageHandler = async (message: ServiceBusReceivedMessage) => { |
|||
if (message) { |
|||
messageProcessor.onJsInvokeMessage(message.body); |
|||
await this.receiver.completeMessage(message); |
|||
} |
|||
}; |
|||
const errorHandler = async (error: ProcessErrorArgs) => { |
|||
this.logger.error('Failed to receive message from queue.', error); |
|||
}; |
|||
this.receiver.subscribe({processMessage: messageHandler, processError: errorHandler}) |
|||
} catch (e: any) { |
|||
this.logger.error('Failed to start ThingsBoard JavaScript Executor Microservice: %s', e.message); |
|||
this.logger.error(e.stack); |
|||
await this.exit(-1); |
|||
} |
|||
} |
|||
|
|||
async send(responseTopic: string, scriptId: string, rawResponse: Buffer, headers: any): Promise<any> { |
|||
if (!this.queues.includes(this.requestTopic)) { |
|||
await this.createQueueIfNotExist(this.requestTopic); |
|||
this.queues.push(this.requestTopic); |
|||
} |
|||
|
|||
let customSender = this.senderMap.get(responseTopic); |
|||
|
|||
if (!customSender) { |
|||
customSender = this.sbClient.createSender(responseTopic); |
|||
this.senderMap.set(responseTopic, customSender); |
|||
} |
|||
|
|||
let data = { |
|||
key: scriptId, |
|||
data: [...rawResponse], |
|||
headers: headers |
|||
}; |
|||
|
|||
return customSender.sendMessages({body: data}); |
|||
} |
|||
|
|||
private parseQueueProperties() { |
|||
let properties: { [n: string]: string } = {}; |
|||
const props = this.queueProperties.split(';'); |
|||
props.forEach(p => { |
|||
const delimiterPosition = p.indexOf(':'); |
|||
properties[p.substring(0, delimiterPosition)] = p.substring(delimiterPosition + 1); |
|||
}); |
|||
this.queueOptions = { |
|||
requiresDuplicateDetection: false, |
|||
maxSizeInMegabytes: Number(properties['maxSizeInMb']), |
|||
defaultMessageTimeToLive: `PT${properties['messageTimeToLiveInSec']}S`, |
|||
lockDuration: `PT${properties['lockDurationInSec']}S` |
|||
}; |
|||
} |
|||
|
|||
private async createQueueIfNotExist(topic: string) { |
|||
try { |
|||
await this.serviceBusService.createQueue(topic, this.queueOptions) |
|||
} catch (err: any) { |
|||
if (err && err.code !== "MessageEntityAlreadyExistsError") { |
|||
throw new Error(err); |
|||
} |
|||
} |
|||
} |
|||
|
|||
static async build(): Promise<ServiceBusTemplate> { |
|||
const queue = new ServiceBusTemplate(); |
|||
await queue.init(); |
|||
return queue; |
|||
} |
|||
|
|||
async exit(status: number) { |
|||
this.logger.info('Exiting with status: %d ...', status); |
|||
this.logger.info('Stopping Azure Service Bus resources...') |
|||
if (this.receiver) { |
|||
try { |
|||
await this.receiver.close(); |
|||
// @ts-ignore
|
|||
delete this.receiver; |
|||
} catch (e) { |
|||
} |
|||
} |
|||
|
|||
this.senderMap.forEach(k => { |
|||
try { |
|||
k.close(); |
|||
} catch (e) { |
|||
} |
|||
}); |
|||
this.senderMap.clear(); |
|||
|
|||
if (this.sbClient) { |
|||
try { |
|||
await this.sbClient.close(); |
|||
// @ts-ignore
|
|||
delete this.sbClient; |
|||
} catch (e) { |
|||
} |
|||
} |
|||
this.logger.info('Azure Service Bus resources stopped.') |
|||
process.exit(status); |
|||
} |
|||
} |
|||
@ -1,55 +0,0 @@ |
|||
/* |
|||
* Copyright © 2016-2022 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
const config = require('config'), logger = require('./config/logger')._logger('main'); |
|||
|
|||
logger.info('===CONFIG BEGIN==='); |
|||
logger.info(JSON.stringify(config, null, 4)); |
|||
logger.info('===CONFIG END==='); |
|||
|
|||
const serviceType = config.get('queue_type'); |
|||
switch (serviceType) { |
|||
case 'kafka': |
|||
logger.info('Starting kafka template.'); |
|||
require('./queue/kafkaTemplate'); |
|||
logger.info('kafka template started.'); |
|||
break; |
|||
case 'pubsub': |
|||
logger.info('Starting Pub/Sub template.') |
|||
require('./queue/pubSubTemplate'); |
|||
logger.info('Pub/Sub template started.') |
|||
break; |
|||
case 'aws-sqs': |
|||
logger.info('Starting Aws Sqs template.') |
|||
require('./queue/awsSqsTemplate'); |
|||
logger.info('Aws Sqs template started.') |
|||
break; |
|||
case 'rabbitmq': |
|||
logger.info('Starting RabbitMq template.') |
|||
require('./queue/rabbitmqTemplate'); |
|||
logger.info('RabbitMq template started.') |
|||
break; |
|||
case 'service-bus': |
|||
logger.info('Starting Azure Service Bus template.') |
|||
require('./queue/serviceBusTemplate'); |
|||
logger.info('Azure Service Bus template started.') |
|||
break; |
|||
default: |
|||
logger.error('Unknown service type: ', serviceType); |
|||
process.exit(-1); |
|||
} |
|||
|
|||
require('./api/httpServer'); |
|||
|
|||
@ -0,0 +1,85 @@ |
|||
///
|
|||
/// Copyright © 2016-2022 The Thingsboard Authors
|
|||
///
|
|||
/// Licensed under the Apache License, Version 2.0 (the "License");
|
|||
/// you may not use this file except in compliance with the License.
|
|||
/// You may obtain a copy of the License at
|
|||
///
|
|||
/// http://www.apache.org/licenses/LICENSE-2.0
|
|||
///
|
|||
/// Unless required by applicable law or agreed to in writing, software
|
|||
/// distributed under the License is distributed on an "AS IS" BASIS,
|
|||
/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|||
/// See the License for the specific language governing permissions and
|
|||
/// limitations under the License.
|
|||
///
|
|||
|
|||
import config from 'config'; |
|||
import { _logger } from './config/logger'; |
|||
import { HttpServer } from './api/httpServer'; |
|||
import { IQueue } from './queue/queue.models'; |
|||
import { KafkaTemplate } from './queue/kafkaTemplate'; |
|||
import { PubSubTemplate } from './queue/pubSubTemplate'; |
|||
import { AwsSqsTemplate } from './queue/awsSqsTemplate'; |
|||
import { RabbitMqTemplate } from './queue/rabbitmqTemplate'; |
|||
import { ServiceBusTemplate } from './queue/serviceBusTemplate'; |
|||
|
|||
const logger = _logger('main'); |
|||
|
|||
logger.info('===CONFIG BEGIN==='); |
|||
logger.info(JSON.stringify(config, null, 4)); |
|||
logger.info('===CONFIG END==='); |
|||
|
|||
const serviceType = config.get('queue_type'); |
|||
const httpPort = Number(config.get('http_port')); |
|||
let queues: IQueue; |
|||
let httpServer: HttpServer; |
|||
|
|||
(async () => { |
|||
switch (serviceType) { |
|||
case 'kafka': |
|||
logger.info('Starting kafka template.'); |
|||
queues = await KafkaTemplate.build(); |
|||
logger.info('kafka template started.'); |
|||
break; |
|||
case 'pubsub': |
|||
logger.info('Starting Pub/Sub template.') |
|||
queues = await PubSubTemplate.build(); |
|||
logger.info('Pub/Sub template started.') |
|||
break; |
|||
case 'aws-sqs': |
|||
logger.info('Starting Aws Sqs template.') |
|||
queues = await AwsSqsTemplate.build(); |
|||
logger.info('Aws Sqs template started.') |
|||
break; |
|||
case 'rabbitmq': |
|||
logger.info('Starting RabbitMq template.') |
|||
queues = await RabbitMqTemplate.build(); |
|||
logger.info('RabbitMq template started.') |
|||
break; |
|||
case 'service-bus': |
|||
logger.info('Starting Azure Service Bus template.') |
|||
queues = await ServiceBusTemplate.build(); |
|||
logger.info('Azure Service Bus template started.') |
|||
break; |
|||
default: |
|||
logger.error('Unknown service type: ', serviceType); |
|||
process.exit(-1); |
|||
} |
|||
|
|||
httpServer = new HttpServer(httpPort); |
|||
})(); |
|||
|
|||
process.on('SIGTERM', () => { |
|||
process.exit(); |
|||
}); |
|||
|
|||
process.on('exit', async () => { |
|||
if (httpServer) { |
|||
httpServer.stop(); |
|||
} |
|||
if (queues) { |
|||
queues.exit(0); |
|||
} |
|||
}); |
|||
|
|||
@ -0,0 +1,13 @@ |
|||
{ |
|||
"compilerOptions": { |
|||
"outDir": "target/src", |
|||
"target": "es2016", |
|||
"module": "commonjs", |
|||
"esModuleInterop": true, |
|||
"forceConsistentCasingInFileNames": true, |
|||
"strict": true, |
|||
"skipLibCheck": true, |
|||
"strictPropertyInitialization": false |
|||
}, |
|||
"exclude": ["node_modules", "target"] |
|||
} |
|||
File diff suppressed because it is too large
Loading…
Reference in new issue