|
|
|
@ -1,13 +1,14 @@ |
|
|
|
import { threadSetup } from "./utils" |
|
|
|
threadSetup() |
|
|
|
import { isRecurring } from "../automations/utils" |
|
|
|
import { default as actions } from "../automations/actions" |
|
|
|
import { default as automationUtils } from "../automations/automationUtils" |
|
|
|
import { default as AutomationEmitter } from "../events/AutomationEmitter" |
|
|
|
import { generateAutomationMetadataID } from "../db/utils" |
|
|
|
import { definitions as triggerDefs } from "../automations/triggerInfo" |
|
|
|
import { AutomationErrors } from "../constants" |
|
|
|
import { AutomationErrors, MAX_AUTOMATION_RECURRING_ERRORS } from "../constants" |
|
|
|
import { storeLog } from "../automations/logging" |
|
|
|
import { Automation, AutomationStep } from "@budibase/types" |
|
|
|
import { Automation, AutomationStep, AutomationStatus } from "@budibase/types" |
|
|
|
import { |
|
|
|
LoopStep, |
|
|
|
LoopStepTypes, |
|
|
|
@ -15,13 +16,15 @@ import { |
|
|
|
AutomationEvent, |
|
|
|
TriggerOutput, |
|
|
|
AutomationContext, |
|
|
|
AutomationMetadata, |
|
|
|
} from "../definitions/automations" |
|
|
|
const { doInAppContext, getAppDB } = require("@budibase/backend-core/context") |
|
|
|
const { logAlertWithInfo } = require("@budibase/backend-core/logging") |
|
|
|
const { processObject } = require("@budibase/string-templates") |
|
|
|
const FILTER_STEP_ID = actions.ACTION_DEFINITIONS.FILTER.stepId |
|
|
|
const LOOP_STEP_ID = actions.ACTION_DEFINITIONS.LOOP.stepId |
|
|
|
const CRON_STEP_ID = triggerDefs.CRON.stepId |
|
|
|
const STOPPED_STATUS = { success: true, status: "STOPPED" } |
|
|
|
const STOPPED_STATUS = { success: true, status: AutomationStatus.STOPPED } |
|
|
|
const { cloneDeep } = require("lodash/fp") |
|
|
|
const env = require("../environment") |
|
|
|
|
|
|
|
@ -64,7 +67,6 @@ function getLoopIterations(loopStep: LoopStep, input: LoopInput) { |
|
|
|
* inputs and handles any outputs. |
|
|
|
*/ |
|
|
|
class Orchestrator { |
|
|
|
_metadata: any |
|
|
|
_chainCount: number |
|
|
|
_appId: string |
|
|
|
_automation: Automation |
|
|
|
@ -73,8 +75,8 @@ class Orchestrator { |
|
|
|
executionOutput: AutomationContext |
|
|
|
|
|
|
|
constructor(automation: Automation, triggerOutput: TriggerOutput) { |
|
|
|
this._metadata = triggerOutput.metadata |
|
|
|
this._chainCount = this._metadata ? this._metadata.automationChainCount : 0 |
|
|
|
const metadata = triggerOutput.metadata |
|
|
|
this._chainCount = metadata ? metadata.automationChainCount : 0 |
|
|
|
this._appId = triggerOutput.appId as string |
|
|
|
const triggerStepId = automation.definition.trigger.stepId |
|
|
|
triggerOutput = this.cleanupTriggerOutputs(triggerStepId, triggerOutput) |
|
|
|
@ -108,14 +110,63 @@ class Orchestrator { |
|
|
|
return step |
|
|
|
} |
|
|
|
|
|
|
|
async getMetadata() { |
|
|
|
async getMetadata(): Promise<AutomationMetadata> { |
|
|
|
const metadataId = generateAutomationMetadataID(this._automation._id) |
|
|
|
const db = getAppDB() |
|
|
|
let metadata: any |
|
|
|
let metadata: AutomationMetadata |
|
|
|
try { |
|
|
|
metadata = await db.get(metadataId) |
|
|
|
} catch (err) { |
|
|
|
metadata = {} |
|
|
|
metadata = { |
|
|
|
_id: metadataId, |
|
|
|
errorCount: 0, |
|
|
|
} |
|
|
|
} |
|
|
|
return metadata |
|
|
|
} |
|
|
|
|
|
|
|
async checkIfShouldStop(metadata: AutomationMetadata): Promise<boolean> { |
|
|
|
if (!metadata.errorCount) { |
|
|
|
return false |
|
|
|
} |
|
|
|
const automation = this._automation |
|
|
|
const trigger = automation.definition.trigger |
|
|
|
if (metadata.errorCount >= MAX_AUTOMATION_RECURRING_ERRORS) { |
|
|
|
// TODO: need to disable the recurring here
|
|
|
|
this.updateExecutionOutput(trigger.id, trigger.stepId, {}, STOPPED_STATUS) |
|
|
|
await storeLog(automation, this.executionOutput) |
|
|
|
return true |
|
|
|
} |
|
|
|
return false |
|
|
|
} |
|
|
|
|
|
|
|
async updateMetadata(metadata: AutomationMetadata) { |
|
|
|
const output = this.executionOutput, |
|
|
|
automation = this._automation |
|
|
|
if (!output || !isRecurring(automation)) { |
|
|
|
return |
|
|
|
} |
|
|
|
const count = metadata.errorCount |
|
|
|
const isError = output.status === AutomationStatus.ERROR |
|
|
|
// nothing to do in this scenario, escape
|
|
|
|
if (!count && !isError) { |
|
|
|
return |
|
|
|
} |
|
|
|
if (isError) { |
|
|
|
metadata.errorCount = count ? count + 1 : 1 |
|
|
|
} else { |
|
|
|
metadata.errorCount = 0 |
|
|
|
} |
|
|
|
const db = getAppDB() |
|
|
|
try { |
|
|
|
await db.put(metadata) |
|
|
|
} catch (err) { |
|
|
|
logAlertWithInfo( |
|
|
|
"Failed to write automation metadata", |
|
|
|
db.name, |
|
|
|
automation._id, |
|
|
|
err |
|
|
|
) |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
@ -162,6 +213,17 @@ class Orchestrator { |
|
|
|
let stepCount = 0 |
|
|
|
let loopStepNumber: any = undefined |
|
|
|
let loopSteps: LoopStep[] | undefined = [] |
|
|
|
let metadata |
|
|
|
|
|
|
|
// check if this is a recurring automation,
|
|
|
|
if (isRecurring(automation)) { |
|
|
|
metadata = await this.getMetadata() |
|
|
|
const shouldStop = await this.checkIfShouldStop(metadata) |
|
|
|
if (shouldStop) { |
|
|
|
return |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
for (let step of automation.definition.steps) { |
|
|
|
stepCount++ |
|
|
|
let input, |
|
|
|
@ -349,11 +411,14 @@ class Orchestrator { |
|
|
|
|
|
|
|
// store the logs for the automation run
|
|
|
|
await storeLog(this._automation, this.executionOutput) |
|
|
|
if (isRecurring(automation) && metadata) { |
|
|
|
await this.updateMetadata(metadata) |
|
|
|
} |
|
|
|
return this.executionOutput |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
module.exports = ( |
|
|
|
export default ( |
|
|
|
input: AutomationEvent, |
|
|
|
callback: (error: any, response?: any) => void |
|
|
|
) => { |
|
|
|
|