Swift Combine alternative to Rx Observable.create

I think I found a way to mimic Observable.create using a PassthroughSubject in Combine. Here is the helper I made:

struct AnyObserver<Output, Failure: Error> {
    let onNext: ((Output) -> Void)
    let onError: ((Failure) -> Void)
    let onComplete: (() -> Void)
}

struct Disposable {
    let dispose: () -> Void
}

extension AnyPublisher {
    static func create(subscribe: @escaping (AnyObserver<Output, Failure>) -> Disposable) -> Self {
        let subject = PassthroughSubject<Output, Failure>()
        var disposable: Disposable?
        return subject
            .handleEvents(receiveSubscription: { subscription in
                disposable = subscribe(AnyObserver(
                    onNext: { output in subject.send(output) },
                    onError: { failure in subject.send(completion: .failure(failure)) },
                    onComplete: { subject.send(completion: .finished) }
                ))
            }, receiveCancel: { disposable?.dispose() })
            .eraseToAnyPublisher()
    }
}

And here is how it looks in usage:

func loadWidgets() -> AnyPublisher<[Widget], Error> {
    AnyPublisher.create { observer in
        let loadTask = WidgetLoader.request("allWidgets", completion: { widgets in
          observer.onNext(widgets)
          observer.onComplete()
        }, error: { error in
          observer.onError(error)
        })
        return Disposable {
          loadTask.cancel()
        }
    }
}

From what I've learned, the support for initializing an AnyPublisher with a closure has been dropped in Xcode 11 beta 3. This would be a corresponding solution for Rx's Observable.create in this case, but for now I believe that the Future is a goto solution if you only need to propagate single value. In other cases I would go for returning a PassthroughSubject and propagating multiple values this way, but it will not allow you to start a task when the observation starts and I believe it's far from ideal compared to Observable.create.

In terms of cancellation, it does not have an isDisposed property similar to a Disposable, so it's not possible to directly check the state of it and stop your own tasks from executing. The only way that I can think of right now would be to observe for a cancel event, but it's surely not as comfortable as a Disposable. Also, I'd assume that cancel might in fact stop tasks like network requests from URLSession based on the docs here: https://developer.apple.com/documentation/combine/cancellable


Add an isCancelled operation outside the closure and check it in the future's closure. isCancelled can be toggled with the handleEvent() operator.

    var isCancelled = false
    func loadWidgets() -> AnyPublisher<[Widget], Error> {
    return HandleEvents<Future<Any, Error>> { resolve in
        // start the request when someone subscribes
        let loadTask = WidgetLoader.request("allWidgets", completion: { widgets in
            // publish result on success
            resolve(.success(widgets))
        }, error: { error in
            // publish error on failure
            resolve(.failure(error))   
        }
        if isCancelled {
            loadTask.cancel()
        }  
        ).handleEvents(receiveCancel: {
        isCancelled = true
        })
    }
}

and somewhere in the app you do this to cancel the event

loadWidgets().cancel()

Also check this article

Tags:

Swift

Combine