Swift Combine alternative to Rx Observable.create
I think I found a way to mimic Observable.create
using a PassthroughSubject
in Combine
. Here is the helper I made:
struct AnyObserver<Output, Failure: Error> {
let onNext: ((Output) -> Void)
let onError: ((Failure) -> Void)
let onComplete: (() -> Void)
}
struct Disposable {
let dispose: () -> Void
}
extension AnyPublisher {
static func create(subscribe: @escaping (AnyObserver<Output, Failure>) -> Disposable) -> Self {
let subject = PassthroughSubject<Output, Failure>()
var disposable: Disposable?
return subject
.handleEvents(receiveSubscription: { subscription in
disposable = subscribe(AnyObserver(
onNext: { output in subject.send(output) },
onError: { failure in subject.send(completion: .failure(failure)) },
onComplete: { subject.send(completion: .finished) }
))
}, receiveCancel: { disposable?.dispose() })
.eraseToAnyPublisher()
}
}
And here is how it looks in usage:
func loadWidgets() -> AnyPublisher<[Widget], Error> {
AnyPublisher.create { observer in
let loadTask = WidgetLoader.request("allWidgets", completion: { widgets in
observer.onNext(widgets)
observer.onComplete()
}, error: { error in
observer.onError(error)
})
return Disposable {
loadTask.cancel()
}
}
}
From what I've learned, the support for initializing an AnyPublisher
with a closure has been dropped in Xcode 11 beta 3. This would be a corresponding solution for Rx's Observable.create
in this case, but for now I believe that the Future
is a goto solution if you only need to propagate single value. In other cases I would go for returning a PassthroughSubject
and propagating multiple values this way, but it will not allow you to start a task when the observation starts and I believe it's far from ideal compared to Observable.create
.
In terms of cancellation, it does not have an isDisposed
property similar to a Disposable
, so it's not possible to directly check the state of it and stop your own tasks from executing. The only way that I can think of right now would be to observe for a cancel
event, but it's surely not as comfortable as a Disposable
.
Also, I'd assume that cancel
might in fact stop tasks like network requests from URLSession
based on the docs here: https://developer.apple.com/documentation/combine/cancellable
Add an isCancelled operation outside the closure and check it in the future's closure. isCancelled can be toggled with the handleEvent() operator.
var isCancelled = false
func loadWidgets() -> AnyPublisher<[Widget], Error> {
return HandleEvents<Future<Any, Error>> { resolve in
// start the request when someone subscribes
let loadTask = WidgetLoader.request("allWidgets", completion: { widgets in
// publish result on success
resolve(.success(widgets))
}, error: { error in
// publish error on failure
resolve(.failure(error))
}
if isCancelled {
loadTask.cancel()
}
).handleEvents(receiveCancel: {
isCancelled = true
})
}
}
and somewhere in the app you do this to cancel the event
loadWidgets().cancel()
Also check this article