mirror of https://github.com/Budibase/budibase.git
11 changed files with 255 additions and 102 deletions
@ -1,44 +0,0 @@ |
|||
let events = require("events") |
|||
|
|||
// Bull works with a Job wrapper around all messages that contains a lot more information about
|
|||
// the state of the message, implement this for the sake of maintaining API consistency
|
|||
function newJob(queue, message) { |
|||
return { |
|||
timestamp: Date.now(), |
|||
queue: queue, |
|||
data: message, |
|||
} |
|||
} |
|||
|
|||
// designed to replicate Bull (https://github.com/OptimalBits/bull) in memory as a sort of mock
|
|||
class InMemoryQueue { |
|||
// opts is not used by this as there is no real use case when in memory, but is the same API as Bull
|
|||
constructor(name, opts) { |
|||
this._name = name |
|||
this._opts = opts |
|||
this._messages = [] |
|||
this._emitter = new events.EventEmitter() |
|||
} |
|||
|
|||
// same API as bull, provide a callback and it will respond when messages are available
|
|||
process(func) { |
|||
this._emitter.on("message", async () => { |
|||
if (this._messages.length <= 0) { |
|||
return |
|||
} |
|||
let msg = this._messages.shift() |
|||
let resp = func(msg) |
|||
if (resp.then != null) { |
|||
await resp |
|||
} |
|||
}) |
|||
} |
|||
|
|||
// simply puts a message to the queue and emits to the queue for processing
|
|||
add(msg) { |
|||
this._messages.push(newJob(this._name, msg)) |
|||
this._emitter.emit("message") |
|||
} |
|||
} |
|||
|
|||
module.exports = InMemoryQueue |
|||
@ -0,0 +1,73 @@ |
|||
const CouchDB = require("./index") |
|||
const emitter = require("../events/index") |
|||
const InMemoryQueue = require("../utilities/queue/inMemoryQueue") |
|||
|
|||
/** |
|||
* This functionality makes sure that when records with links are created, updated or deleted they are processed |
|||
* correctly - making sure that no stale links are left around and that all links have been made successfully. |
|||
*/ |
|||
|
|||
const EventType = { |
|||
RECORD_SAVE: "record:save", |
|||
RECORD_UPDATE: "record:update", |
|||
RECORD_DELETE: "record:delete", |
|||
MODEL_SAVE: "model:save", |
|||
MODEL_DELETE: "model:delete", |
|||
} |
|||
const linkedRecordQueue = new InMemoryQueue("linkedRecordQueue") |
|||
|
|||
function createEmitterCallback(eventName) { |
|||
emitter.on(eventName, function(event) { |
|||
if (!event || !event.record || !event.record.modelId) { |
|||
return |
|||
} |
|||
linkedRecordQueue.add({ |
|||
type: eventName, |
|||
event, |
|||
}) |
|||
}) |
|||
} |
|||
|
|||
for (let typeKey of Object.keys(EventType)) { |
|||
createEmitterCallback(EventType[typeKey]) |
|||
} |
|||
|
|||
function doesModelHaveLinkedRecords(model) { |
|||
for (let key of Object.keys(model.schema)) { |
|||
const { type } = model.schema[key] |
|||
if (type === "link") { |
|||
return true |
|||
} |
|||
} |
|||
return false |
|||
} |
|||
|
|||
linkedRecordQueue.process(async job => { |
|||
let event = job.data |
|||
// can't operate without these properties
|
|||
if (event.instanceId == null || event.modelId == null) { |
|||
return |
|||
} |
|||
const db = new CouchDB(event.instanceId) |
|||
let model = event.model == null ? await db.get(event.modelId) : event.model |
|||
// model doesn't have links, can stop here
|
|||
if (!doesModelHaveLinkedRecords(model)) { |
|||
return |
|||
} |
|||
// no linked records to operate on
|
|||
if (model == null) { |
|||
return |
|||
} |
|||
switch (event.type) { |
|||
case EventType.RECORD_SAVE: |
|||
break |
|||
case EventType.RECORD_UPDATE: |
|||
break |
|||
case EventType.RECORD_DELETE: |
|||
break |
|||
case EventType.MODEL_SAVE: |
|||
break |
|||
case EventType.MODEL_DELETE: |
|||
break |
|||
} |
|||
}) |
|||
@ -0,0 +1,78 @@ |
|||
let events = require("events") |
|||
|
|||
/** |
|||
* Bull works with a Job wrapper around all messages that contains a lot more information about |
|||
* the state of the message, this object constructor implements the same schema of Bull jobs |
|||
* for the sake of maintaining API consistency. |
|||
* @param {string} queue The name of the queue which the message will be carried on. |
|||
* @param {object} message The JSON message which will be passed back to the consumer. |
|||
* @returns {Object} A new job which can now be put onto the queue, this is mostly an |
|||
* internal structure so that an in memory queue can be easily swapped for a Bull queue. |
|||
*/ |
|||
function newJob(queue, message) { |
|||
return { |
|||
timestamp: Date.now(), |
|||
queue: queue, |
|||
data: message, |
|||
} |
|||
} |
|||
|
|||
/** |
|||
* This is designed to replicate Bull (https://github.com/OptimalBits/bull) in memory as a sort of mock.
|
|||
* It is relatively simple, using an event emitter internally to register when messages are available |
|||
* to the consumers - in can support many inputs and many consumers. |
|||
*/ |
|||
class InMemoryQueue { |
|||
/** |
|||
* The constructor the queue, exactly the same as that of Bulls. |
|||
* @param {string} name The name of the queue which is being configured. |
|||
* @param {object|null} opts This is not used by the in memory queue as there is no real use |
|||
* case when in memory, but is the same API as Bull |
|||
*/ |
|||
constructor(name, opts = null) { |
|||
this._name = name |
|||
this._opts = opts |
|||
this._messages = [] |
|||
this._emitter = new events.EventEmitter() |
|||
} |
|||
|
|||
/** |
|||
* Same callback API as Bull, each callback passed to this will consume messages as they are |
|||
* available. Please note this is a queue service, not a notification service, so each |
|||
* consumer will receive different messages. |
|||
* @param {function<object>} func The callback function which will return a "Job", the same |
|||
* as the Bull API, within this job the property "data" contains the JSON message. Please |
|||
* note this is incredibly limited compared to Bull as in reality the Job would contain |
|||
* a lot more information about the queue and current status of Bull cluster. |
|||
*/ |
|||
process(func) { |
|||
this._emitter.on("message", async () => { |
|||
if (this._messages.length <= 0) { |
|||
return |
|||
} |
|||
let msg = this._messages.shift() |
|||
let resp = func(msg) |
|||
if (resp.then != null) { |
|||
await resp |
|||
} |
|||
}) |
|||
} |
|||
|
|||
// simply puts a message to the queue and emits to the queue for processing
|
|||
/** |
|||
* Simple function to replicate the add message functionality of Bull, putting |
|||
* a new message on the queue. This then emits an event which will be used to |
|||
* return the message to a consumer (if one is attached). |
|||
* @param {object} msg A message to be transported over the queue, this should be |
|||
* a JSON message as this is required by Bull. |
|||
*/ |
|||
add(msg) { |
|||
if (typeof msg !== "object") { |
|||
throw "Queue only supports carrying JSON." |
|||
} |
|||
this._messages.push(newJob(this._name, msg)) |
|||
this._emitter.emit("message") |
|||
} |
|||
} |
|||
|
|||
module.exports = InMemoryQueue |
|||
Loading…
Reference in new issue