|
|
|
@ -32,15 +32,17 @@ const maxActiveScripts = Number(config.get('script.max_active_scripts')); |
|
|
|
const slowQueryLogMs = Number(config.get('script.slow_query_log_ms')); |
|
|
|
const slowQueryLogBody = config.get('script.slow_query_log_body') === 'true'; |
|
|
|
|
|
|
|
const {performance} = require('perf_hooks'); |
|
|
|
const { performance } = require('node:perf_hooks'); |
|
|
|
|
|
|
|
function JsInvokeMessageProcessor(producer) { |
|
|
|
this.producer = producer; |
|
|
|
this.executor = new JsExecutor(useSandbox); |
|
|
|
this.scriptMap = new Map(); |
|
|
|
this.scriptIds = []; |
|
|
|
this.executedScriptIdsCounter = []; |
|
|
|
this.executedScriptsCounter = 0; |
|
|
|
this.lastStatTime = performance.now(); |
|
|
|
this.compilationTime = 0; |
|
|
|
} |
|
|
|
|
|
|
|
JsInvokeMessageProcessor.prototype.onJsInvokeMessage = function (message) { |
|
|
|
@ -125,7 +127,8 @@ JsInvokeMessageProcessor.prototype.processInvokeRequest = function (requestId, r |
|
|
|
const msSinceLastStat = nowMs - this.lastStatTime; |
|
|
|
const requestsPerSec = msSinceLastStat == 0 ? statFrequency : statFrequency / msSinceLastStat * 1000; |
|
|
|
this.lastStatTime = nowMs; |
|
|
|
logger.info('STAT[%s]: requests [%s], took [%s]ms, request/s [%s]', this.executedScriptsCounter, statFrequency, msSinceLastStat, requestsPerSec); |
|
|
|
logger.info('STAT[%s]: requests [%s], took [%s]ms, request/s [%s], compilation [%s]ms', this.executedScriptsCounter, statFrequency, msSinceLastStat, requestsPerSec, this.compilationTime); |
|
|
|
this.compilationTime = 0; |
|
|
|
} |
|
|
|
|
|
|
|
if (this.executedScriptsCounter % scriptBodyTraceFrequency == 0) { |
|
|
|
@ -167,6 +170,7 @@ JsInvokeMessageProcessor.prototype.processReleaseRequest = function (requestId, |
|
|
|
var index = this.scriptIds.indexOf(scriptId); |
|
|
|
if (index > -1) { |
|
|
|
this.scriptIds.splice(index, 1); |
|
|
|
this.executedScriptIdsCounter.splice(index, 1); |
|
|
|
} |
|
|
|
this.scriptMap.delete(scriptId); |
|
|
|
} |
|
|
|
@ -198,14 +202,18 @@ JsInvokeMessageProcessor.prototype.getOrCompileScript = function (scriptId, scri |
|
|
|
return new Promise(function (resolve, reject) { |
|
|
|
const script = self.scriptMap.get(scriptId); |
|
|
|
if (script) { |
|
|
|
incrementUseScriptId.call(self, scriptId); |
|
|
|
resolve(script); |
|
|
|
} else { |
|
|
|
const startTime = performance.now(); |
|
|
|
self.executor.compileScript(scriptBody).then( |
|
|
|
(compiledScript) => { |
|
|
|
self.compilationTime += (performance.now() - startTime); |
|
|
|
self.cacheScript(scriptId, compiledScript); |
|
|
|
resolve(compiledScript); |
|
|
|
}, |
|
|
|
(err) => { |
|
|
|
self.compilationTime += (performance.now() - startTime); |
|
|
|
reject(err); |
|
|
|
} |
|
|
|
); |
|
|
|
@ -216,11 +224,10 @@ JsInvokeMessageProcessor.prototype.getOrCompileScript = function (scriptId, scri |
|
|
|
JsInvokeMessageProcessor.prototype.cacheScript = function (scriptId, script) { |
|
|
|
if (!this.scriptMap.has(scriptId)) { |
|
|
|
this.scriptIds.push(scriptId); |
|
|
|
this.executedScriptIdsCounter.push(0); |
|
|
|
while (this.scriptIds.length > maxActiveScripts) { |
|
|
|
logger.info('Active scripts count [%s] exceeds maximum limit [%s]', this.scriptIds.length, maxActiveScripts); |
|
|
|
const prevScriptId = this.scriptIds.shift(); |
|
|
|
logger.info('Removing active script with id [%s]', prevScriptId); |
|
|
|
this.scriptMap.delete(prevScriptId); |
|
|
|
deleteMinUsedScript.apply(this); |
|
|
|
} |
|
|
|
} |
|
|
|
this.scriptMap.set(scriptId, script); |
|
|
|
@ -291,4 +298,27 @@ function getScriptId(request) { |
|
|
|
return Utils.toUUIDString(request.scriptIdMSB, request.scriptIdLSB); |
|
|
|
} |
|
|
|
|
|
|
|
function incrementUseScriptId(scriptId) { |
|
|
|
const index = this.scriptIds.indexOf(scriptId); |
|
|
|
if (this.executedScriptIdsCounter[index] < Number.MAX_SAFE_INTEGER) { |
|
|
|
this.executedScriptIdsCounter[index]++; |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
function deleteMinUsedScript() { |
|
|
|
let min = Infinity; |
|
|
|
let minIndex = 0; |
|
|
|
const scriptIdsLength = this.executedScriptIdsCounter.length - 1; // ignored last added script
|
|
|
|
for (let i = 0; i < scriptIdsLength; i++) { |
|
|
|
if (this.executedScriptIdsCounter[i] < min) { |
|
|
|
min = this.executedScriptIdsCounter[i]; |
|
|
|
minIndex = i; |
|
|
|
} |
|
|
|
} |
|
|
|
const prevScriptId = this.scriptIds.splice(minIndex, 1)[0]; |
|
|
|
this.executedScriptIdsCounter.splice(minIndex, 1) |
|
|
|
logger.info('Removing active script with id [%s]', prevScriptId); |
|
|
|
this.scriptMap.delete(prevScriptId); |
|
|
|
} |
|
|
|
|
|
|
|
module.exports = JsInvokeMessageProcessor; |
|
|
|
|