How to implement a pseudo blocking async queue in JS/TS?
It's quite simple actually: dequeue will create a promise that enqueue will resolve. We just have to keep the resolvers in a queue, and also handle the case where values are enqueued before they are dequeued, by keeping the already fulfilled promises in a queue as well.
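To make that handshake concrete, here is a minimal sketch of a single exchange without any of the queue bookkeeping. The names `resolvers`, `waitForValue` and `deliver` are made up for this illustration and are not part of the class; the sketch also deliberately ignores the "value arrives first" case.

```javascript
// Sketch of one dequeue/enqueue handshake, without the full queue logic.
// `resolvers`, `waitForValue` and `deliver` are illustrative names only.
const resolvers = [];

// Consumer side: hand out a pending promise and remember how to resolve it.
// The promise executor runs synchronously, so the resolver is stored at once.
function waitForValue() {
  return new Promise(resolve => {
    resolvers.push(resolve);
  });
}

// Producer side: resolve the oldest pending promise, if there is one.
// Returns false when nobody is waiting (the full class has to handle that case).
function deliver(value) {
  const resolve = resolvers.shift();
  if (!resolve) return false;
  resolve(value);
  return true;
}

const pending = waitForValue();   // the consumer "blocks" on this promise
pending.then(value => console.log("received", value));
deliver("hello");                 // the producer unblocks it
```

Everything the class adds on top of this is the second queue for values that arrive before anyone asks for them.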
class AsyncBlockingQueue {
  constructor() {
    // invariant: at least one of the arrays is empty
    this.resolvers = [];
    this.promises = [];
  }
  _add() {
    this.promises.push(new Promise(resolve => {
      this.resolvers.push(resolve);
    }));
  }
  enqueue(t) {
    // if (this.resolvers.length) this.resolvers.shift()(t);
    // else this.promises.push(Promise.resolve(t));
    if (!this.resolvers.length) this._add();
    this.resolvers.shift()(t);
  }
  dequeue() {
    if (!this.promises.length) this._add();
    return this.promises.shift();
  }
  // now some utilities:
  isEmpty() { // there are no values available
    return !this.promises.length; // this.length <= 0
  }
  isBlocked() { // it's waiting for values
    return !!this.resolvers.length; // this.length < 0
  }
  get length() {
    return this.promises.length - this.resolvers.length;
  }
  [Symbol.asyncIterator]() {
    // Todo: Use AsyncIterator.from()
    return {
      next: () => this.dequeue().then(value => ({done: false, value})),
      [Symbol.asyncIterator]() { return this; },
    };
  }
}
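A short usage sketch may help show what "blocking" means here: a consumer that asks for a value too early simply stays suspended on its `await` until a producer enqueues one. The class is repeated in condensed form so that the snippet runs on its own; `sleep`, `consume`, `produce` and `demo` are names invented for this example.

```javascript
// Condensed copy of the class, so that this snippet runs on its own.
class AsyncBlockingQueue {
  constructor() {
    this.resolvers = [];
    this.promises = [];
  }
  _add() {
    this.promises.push(new Promise(resolve => {
      this.resolvers.push(resolve);
    }));
  }
  enqueue(t) {
    if (!this.resolvers.length) this._add();
    this.resolvers.shift()(t);
  }
  dequeue() {
    if (!this.promises.length) this._add();
    return this.promises.shift();
  }
  get length() {
    return this.promises.length - this.resolvers.length;
  }
}

// Hypothetical helper for the demo: resolves after `ms` milliseconds.
const sleep = ms => new Promise(resolve => setTimeout(resolve, ms));

// Consumer: awaits `count` values, suspending whenever none is available yet.
async function consume(queue, count) {
  const received = [];
  for (let i = 0; i < count; i++) {
    received.push(await queue.dequeue());
  }
  return received;
}

// Producer: enqueues the values one by one with a small delay in between.
async function produce(queue, values) {
  for (const value of values) {
    await sleep(10);
    queue.enqueue(value);
  }
}

async function demo() {
  const queue = new AsyncBlockingQueue();
  const consumed = consume(queue, 3);     // starts first and has to wait
  await produce(queue, ["a", "b", "c"]);
  return consumed;                         // the values, in FIFO order
}
```

Note that nothing ever rejects or times out in this design: a `dequeue()` that is never matched by an `enqueue()` stays pending forever.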
I don't know TypeScript, but presumably it's simple to add the necessary type annotations.
For better performance, use a Queue implementation with circular buffers instead of plain arrays, e.g. this one. You might also use only a single queue and remember whether you currently store promises or resolvers.
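The single-queue idea could look roughly like the sketch below. It is only one possible reading of that suggestion: the class name `SingleQueueAsyncBlockingQueue` and the fields `items` and `waiting` are assumptions made for this example, and a plain array is still used where a circular buffer would go.

```javascript
// Sketch: one array that holds either fulfilled promises or waiting resolvers,
// never both at once. `waiting` remembers which kind is currently stored.
class SingleQueueAsyncBlockingQueue {
  constructor() {
    this.items = [];        // promises (waiting === false) or resolvers (waiting === true)
    this.waiting = false;   // invariant: true only while `items` is non-empty
  }
  enqueue(value) {
    if (this.waiting) {
      // Consumers are waiting: hand the value to the oldest one.
      const resolve = this.items.shift();
      if (!this.items.length) this.waiting = false;
      resolve(value);
    } else {
      // Nobody is waiting: buffer an already fulfilled promise.
      this.items.push(Promise.resolve(value));
    }
  }
  dequeue() {
    if (!this.waiting && this.items.length) {
      return this.items.shift();
    }
    // Nothing buffered: store a resolver and return its pending promise.
    this.waiting = true;
    return new Promise(resolve => {
      this.items.push(resolve);
    });
  }
  get length() {
    // Negative while consumers are waiting, as in the two-array version.
    return this.waiting ? -this.items.length : this.items.length;
  }
}
```

The flag replaces the "at least one of the arrays is empty" invariant: instead of two arrays of which one is always empty, there is one array plus a note about what it contains.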
This is simply @Bergi's answer, but with TypeScript generics and a few modifications to make it work in strict mode, for my TypeScript peeps out there.
class AsyncBlockingQueue<T> {
  private _promises: Promise<T>[];
  private _resolvers: ((t: T) => void)[];
  constructor() {
    this._resolvers = [];
    this._promises = [];
  }
  private _add() {
    this._promises.push(new Promise(resolve => {
      this._resolvers.push(resolve);
    }));
  }
  enqueue(t: T) {
    if (!this._resolvers.length) this._add();
    const resolve = this._resolvers.shift()!;
    resolve(t);
  }
  dequeue() {
    if (!this._promises.length) this._add();
    const promise = this._promises.shift()!;
    return promise;
  }
  isEmpty() {
    return !this._promises.length;
  }
  isBlocked() {
    return !!this._resolvers.length;
  }
  get length() {
    return this._promises.length - this._resolvers.length;
  }
}