mirror of https://github.com/Budibase/budibase.git
committed by
GitHub
31 changed files with 375 additions and 148 deletions
@ -0,0 +1,6 @@ |
|||
export interface IntegrationBase { |
|||
create?(query: any): Promise<[any]> |
|||
read?(query: any): Promise<[any]> |
|||
update?(query: any): Promise<[any]> |
|||
delete?(query: any): Promise<[any]> |
|||
} |
|||
@ -0,0 +1,60 @@ |
|||
const workerFarm = require("worker-farm") |
|||
const env = require("../environment") |
|||
|
|||
const ThreadType = { |
|||
QUERY: "query", |
|||
AUTOMATION: "automation", |
|||
} |
|||
|
|||
function typeToFile(type) { |
|||
let filename = null |
|||
switch (type) { |
|||
case ThreadType.QUERY: |
|||
filename = "./query" |
|||
break |
|||
case ThreadType.AUTOMATION: |
|||
filename = "./automation" |
|||
break |
|||
default: |
|||
throw "Unknown thread type" |
|||
} |
|||
return require.resolve(filename) |
|||
} |
|||
|
|||
class Thread { |
|||
constructor(type, opts = { timeoutMs: null, count: 1 }) { |
|||
this.type = type |
|||
if (!env.isTest()) { |
|||
const workerOpts = { |
|||
autoStart: true, |
|||
maxConcurrentWorkers: opts.count ? opts.count : 1, |
|||
} |
|||
if (opts.timeoutMs) { |
|||
workerOpts.maxCallTime = opts.timeoutMs |
|||
} |
|||
this.workers = workerFarm(workerOpts, typeToFile(type)) |
|||
} |
|||
} |
|||
|
|||
run(data) { |
|||
return new Promise((resolve, reject) => { |
|||
let fncToCall |
|||
// if in test then don't use threading
|
|||
if (env.isTest()) { |
|||
fncToCall = require(typeToFile(this.type)) |
|||
} else { |
|||
fncToCall = this.workers |
|||
} |
|||
fncToCall(data, (err, response) => { |
|||
if (err) { |
|||
reject(err) |
|||
} else { |
|||
resolve(response) |
|||
} |
|||
}) |
|||
}) |
|||
} |
|||
} |
|||
|
|||
module.exports.Thread = Thread |
|||
module.exports.ThreadType = ThreadType |
|||
@ -0,0 +1,63 @@ |
|||
const ScriptRunner = require("../utilities/scriptRunner") |
|||
const { integrations } = require("../integrations") |
|||
|
|||
function formatResponse(resp) { |
|||
if (typeof resp === "string") { |
|||
try { |
|||
resp = JSON.parse(resp) |
|||
} catch (err) { |
|||
resp = { response: resp } |
|||
} |
|||
} |
|||
return resp |
|||
} |
|||
|
|||
async function runAndTransform(datasource, queryVerb, query, transformer) { |
|||
const Integration = integrations[datasource.source] |
|||
if (!Integration) { |
|||
throw "Integration type does not exist." |
|||
} |
|||
const integration = new Integration(datasource.config) |
|||
|
|||
let rows = formatResponse(await integration[queryVerb](query)) |
|||
|
|||
// transform as required
|
|||
if (transformer) { |
|||
const runner = new ScriptRunner(transformer, { data: rows }) |
|||
rows = runner.execute() |
|||
} |
|||
|
|||
// needs to an array for next step
|
|||
if (!Array.isArray(rows)) { |
|||
rows = [rows] |
|||
} |
|||
|
|||
// map into JSON if just raw primitive here
|
|||
if (rows.find(row => typeof row !== "object")) { |
|||
rows = rows.map(value => ({ value })) |
|||
} |
|||
|
|||
// get all the potential fields in the schema
|
|||
let keys = rows.flatMap(Object.keys) |
|||
|
|||
if (integration.end) { |
|||
integration.end() |
|||
} |
|||
|
|||
return { rows, keys } |
|||
} |
|||
|
|||
module.exports = (input, callback) => { |
|||
runAndTransform( |
|||
input.datasource, |
|||
input.queryVerb, |
|||
input.query, |
|||
input.transformer |
|||
) |
|||
.then(response => { |
|||
callback(null, response) |
|||
}) |
|||
.catch(err => { |
|||
callback(err) |
|||
}) |
|||
} |
|||
Loading…
Reference in new issue