How can I make sure that a job doesn't run twice in Bull?
The problem, I believe is your scan
function is async. So your job.progress
function calls scan
and then immediately calls done
allowing the queue to process another job.
A solution could be to pass the done
callback as a parameter to your scan
and scheduleScan
functions, and invoke it, once you have completed your job (or on error).
Another (better) solution could be to ensure that you always return a Promise
from scan
and scheduleScan
, then await the promise to resolve and then call done
. If doing this, make sure you chain all your promise returns in your scheduleScan
function.
queue.process(1, (job, done) => {
job.progress(100).then(() => {
scan(job)
.then(done)
.catch(done)
})
})
export function scan() {
// business logic
return scheduleScan()
}
// Chain all of your promise returns. Otherwise
// the scan function will return sooner and allow done to be called
// prior to the scheduleScan function finishing it's execution
export function scheduleScan() {
return queue.getJob(..).then(() => {
....
return queue.add()...
....
return queue.add(...)
.catch(e => {
console.log(e);
// propogate errors!
throw e;
})
}
The scan function is an asynchronous function. In you queue.process()
function you have to await the scan function and then call the done()
callback.
export async function scan(job) {
// it does some calculations, then it creates a new schedule.
return scheduleScan(stock, period, milliseconds, "scan.js");
}
queue.process(1, (job, done) => {
job.progress(100).then(async() => {
await scan(job);
done();
});
});
export async function scheduleScan(stock, period, milliseconds, triggeredBy) {
let uniqueId = stringHash(stock + ":" + period);
try {
const existingJob = await queue.getJob(uniqueId);
if (!existingJob) {
const job = await addJob({
queue,
stock,
period,
uniqueId,
milliseconds,
triggeredBy
});
return job;
} else {
const jobState = await existingJob.getState();
if (jobState === "completed") {
await existingJob.remove();
const newJob = await addJob({
queue,
stock,
period,
uniqueId,
milliseconds,
triggeredBy
});
return newJob;
}
}
} catch (err) {
throw new Error(err);
}
}
export function addJob({ queue, stock, period, milliseconds, triggeredBy }) {
if (milliseconds) {
return queue.add(
{ stock, period, triggeredBy },
{ delay: milliseconds, jobId: uniqueId }
);
} else {
return queue.add({ stock, period, triggeredBy }, { jobId: uniqueId });
}
}
Try this! I've tried to refactor the code a bit by using async-await.