RxJs: poll until interval done or correct data received
A small optimization to the excellent answer from @matt-burnell: you can replace the filter and take operators with the single first operator, as follows:
Rx.Observable
  .fromPromise(submitJobToQueue(jobData))
  .flatMap(jobQueueData =>
    Rx.Observable.interval(1000)
      .flatMap(() => pollQueueForResult(jobQueueData.jobId))
      .first(x => x.completed)
      .map(() => 'Completed')
      .timeout(60000, Rx.Observable.throw(new Error('Timeout')))
  )
  .subscribe(
    x => console.log('Result', x),
    x => console.log('Error', x)
  );
Also, for people that may not know, the flatMap operator is an alias for mergeMap in RxJS 5.0.
Starting from the top, you've got a promise that you turn into an observable. Once this yields a value, you want to make a call once per second until you receive a certain response (success) or until a certain amount of time has passed. We can map each part of this explanation to an Rx method:
"Once this yields a value" = map/flatMap (flatMap in this case because what comes next will also be observables, and we need to flatten them out)
"once per second" = interval
"receive a certain response" = filter
"or" = amb
"certain amount of time has passed" = timer
From there, we can piece it together like so:
Rx.Observable
  .fromPromise(submitJobToQueue(jobData))
  .flatMap(jobQueueData =>
    Rx.Observable.interval(1000)
      .flatMap(() => pollQueueForResult(jobQueueData.jobId))
      .filter(x => x.completed)
      .take(1)
      .map(() => 'Completed')
      .amb(
        Rx.Observable.timer(60000)
          .flatMap(() => Rx.Observable.throw(new Error('Timeout')))
      )
  )
  .subscribe(
    x => console.log('Result', x),
    x => console.log('Error', x)
  );
Once we've got our initial result, we project that into a race between two observables: one that will yield a value when it receives a successful response, and one that will yield a value when a certain amount of time has passed. The second flatMap there is because .throw isn't present on observable instances, and the method on Rx.Observable returns an observable which also needs to be flattened out.
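If the "race" idea is unfamiliar, it has a close analogue in plain promises. The following is only a sketch of that analogy, not Rx code and not part of the original answer; `withTimeout` is a hypothetical helper name introduced here for illustration.

```javascript
// Plain-promise analogue of the amb/timer race (not Rx code).
// `withTimeout` is a hypothetical helper name introduced for this sketch.
function withTimeout(work, ms) {
  let timerId;
  // Rejects after `ms` milliseconds, playing the role of timer(...) + throw.
  const deadline = new Promise((_, reject) => {
    timerId = setTimeout(() => reject(new Error('Timeout')), ms);
  });
  // Whichever promise settles first decides the outcome; the timer is
  // cleared afterwards so it does not keep the process alive.
  return Promise.race([work, deadline]).finally(() => clearTimeout(timerId));
}
```

One difference worth keeping in mind: a promise that loses the race keeps running in the background, whereas the Rx version unsubscribes from the losing observable.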
It turns out that the amb/timer combo can actually be replaced by timeout, like so:
Rx.Observable
  .fromPromise(submitJobToQueue(jobData))
  .flatMap(jobQueueData =>
    Rx.Observable.interval(1000)
      .flatMap(() => pollQueueForResult(jobQueueData.jobId))
      .filter(x => x.completed)
      .take(1)
      .map(() => 'Completed')
      .timeout(60000, Rx.Observable.throw(new Error('Timeout')))
  )
  .subscribe(
    x => console.log('Result', x),
    x => console.log('Error', x)
  );
I omitted the .delay you had in your sample as it wasn't described in your desired logic, but it could be fitted trivially to this solution.
So, to directly answer your questions:
- In the code above there is no need to manually stop anything, as the interval will be disposed of the moment the subscriber count drops to zero, which will occur either when the take(1) or amb/timeout completes.
- Yes, both usages in your original were valid, as in both cases you were projecting each element of an observable into a new observable, and wanting to flatten the resultant observable of observables out into a regular observable.
Here's the jsbin I threw together to test the solution (you can tweak the value returned in pollQueueForResult to obtain the desired success/timeout; times have been divided by 10 for the sake of quick testing).
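The snippets above call submitJobToQueue and pollQueueForResult without defining them. For local experiments, stand-ins along the following lines might do. This is a sketch under assumptions: the two function names and the jobId/completed fields come from the code above, but `createFakeQueue`, its `completeAfterPolls` parameter and the 'job-1' id are invented here.

```javascript
// Hypothetical in-memory stand-ins for the two functions the snippets call.
// `createFakeQueue` and `completeAfterPolls` are invented for this sketch.
function createFakeQueue(completeAfterPolls) {
  let polls = 0;
  return {
    // Resolves with an object carrying a jobId, as the snippets expect.
    submitJobToQueue(jobData) {
      return Promise.resolve({ jobId: 'job-1', jobData });
    },
    // Reports completed: true once it has been polled often enough.
    pollQueueForResult(jobId) {
      polls += 1;
      return Promise.resolve({ jobId, completed: polls >= completeAfterPolls });
    },
  };
}

// The third poll reports completion; earlier polls do not.
const { submitJobToQueue, pollQueueForResult } = createFakeQueue(3);
```

Passing a small number exercises the success path; passing Infinity means completed never becomes true, which should exercise the timeout path.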
Not your question, but I needed the same functionality
import { takeWhileInclusive } from 'rxjs-take-while-inclusive'
import { of, interval, race, throwError } from 'rxjs'
import { mergeMap, delay, switchMapTo, last } from 'rxjs/operators'

const defaultMaxWaitTimeMilliseconds = 5 * 1000

// Placeholder: replace with a real check of the polled result.
function isAsyncThingSatisfied(result) {
  return true
}

export function doAsyncThingSeveralTimesWithTimeout(
  doAsyncThingReturnsPromise,
  maxWaitTimeMilliseconds = defaultMaxWaitTimeMilliseconds,
  checkEveryMilliseconds = 500,
) {
  const subject$ = race(
    interval(checkEveryMilliseconds).pipe(
      mergeMap(() => doAsyncThingReturnsPromise()),
      // Keep polling while the result is NOT yet satisfactory;
      // the inclusive variant also emits the first satisfactory result.
      takeWhileInclusive(result => !isAsyncThingSatisfied(result)),
      // race() is decided by the first emission, so hold back intermediate
      // poll results; otherwise the first poll would cancel the timeout.
      last(),
    ),
    // Error out once the maximum wait time has elapsed.
    of(null).pipe(
      delay(maxWaitTimeMilliseconds),
      switchMapTo(throwError('doAsyncThingSeveralTimesWithTimeout timeout'))
    )
  )
  // Resolves with the first satisfactory result of doAsyncThingReturnsPromise, or rejects on timeout.
  return subject$.toPromise(Promise)
}
Example
// mailhogWaitForNEmails
import { takeWhileInclusive } from 'rxjs-take-while-inclusive'
import { of, interval, race, throwError } from 'rxjs'
import { mergeMap, delay, switchMap, last } from 'rxjs/operators'

const defaultMaxWaitTimeMilliseconds = 5 * 1000

export function mailhogWaitForNEmails(
  mailhogClient,
  numberOfExpectedEmails,
  maxWaitTimeMilliseconds = defaultMaxWaitTimeMilliseconds,
  checkEveryMilliseconds = 500,
) {
  let tries = 0
  const mails$ = race(
    interval(checkEveryMilliseconds).pipe(
      mergeMap(() => mailhogClient.getAll()),
      // Keep polling while fewer emails than expected have arrived.
      takeWhileInclusive(mails => {
        tries += 1
        return mails.total < numberOfExpectedEmails
      }),
      // race() is decided by the first emission, so hold back intermediate
      // poll results; otherwise the first poll would cancel the timeout.
      last(),
    ),
    of(null).pipe(
      delay(maxWaitTimeMilliseconds),
      switchMap(() => throwError(`mailhogWaitForNEmails timeout after ${tries} tries`))
    )
  )
  // toPromise returns a promise that resolves with the last value from the Observable sequence.
  // If the Observable sequence errors, the promise is rejected.
  // If the sequence completes without emitting, the promise resolves with undefined.
  return mails$.toPromise(Promise)
}
// mailhogWaitForEmailAndClean
import { mailhogWaitForNEmails } from './mailhogWaitForNEmails'

export async function mailhogWaitForEmailAndClean(mailhogClient) {
  const mails = await mailhogWaitForNEmails(mailhogClient, 1)
  if (mails.count !== 1) {
    throw new Error(
      `Expected to receive 1 email, but received ${mails.count} emails`,
    )
  }
  await mailhogClient.deleteAll()
  return mails.items[0]
}
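To try these functions without a running MailHog instance, an in-memory fake client could be passed in instead. The sketch below assumes only what the code above relies on: promise-returning getAll() and deleteAll() methods, and a result with total, count and items fields. `createFakeMailhogClient` and its `deliver` helper are hypothetical names, not part of any real client library.

```javascript
// Hypothetical in-memory fake of the client used above; only getAll() and
// deleteAll() are modelled, with the total/count/items fields the code reads.
function createFakeMailhogClient() {
  let items = [];
  return {
    // Test helper (not part of any real client): simulate an arriving email.
    deliver(mail) {
      items.push(mail);
    },
    // Returns a snapshot of the stored messages.
    getAll() {
      return Promise.resolve({
        total: items.length,
        count: items.length,
        items: [...items],
      });
    },
    // Empties the fake mailbox.
    deleteAll() {
      items = [];
      return Promise.resolve();
    },
  };
}
```

For example, calling deliver({ subject: 'Welcome' }) from a setTimeout of about a second and then awaiting mailhogWaitForEmailAndClean(client) should resolve with that message and leave the fake mailbox empty.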