How to use RxJava combineLatest operator with more than 9 observables

RxKotlin's combineLatest() method accepts up to 9 observables as parameters. To combine more than 9, or to pass a dynamic list of observables of arbitrary length and mixed types, you can use the array overload as shown below:

First, here is a simple example with only two parameters of different data types:

val name = Observable.just("MyName")
val age = Observable.just(25)
Observables.combineLatest(name, age) { n, a -> "$n - age:${a}" }
                .subscribe({
                    Log.d("combineLatest", "onNext - ${it}")
                })

Now, what if you want to pass many more sources to combineLatest? The example below shows how. (I have used mixed data types, so the same approach should also cover cases with custom types.)

val myList = arrayOf(Observable.just("MyName"),
                Observable.just(2),
                Observable.just(3.55),
                Observable.just("My Another String"),
                Observable.just(5),
                Observable.just(6),
                Observable.just(7),
                Observable.just(8),
                Observable.just(9),
                Observable.just(10),
                Observable.just(11),
                Observable.just(12),
                Observable.just(13),
                Observable.just(14),
                Observable.just(15))

Observable.combineLatest(myList, {
    val a = it[0] as String
    val b = it[1] as Int
    val c = it[2] as Double // 3.55 is a Double; casting it to Float would throw ClassCastException
    val d = it[3] as String
    val e = it[4] as Int
    val f = it[5] as Int
    val g = it[6] as Int
    val h = it[7] as Int
    val i = it[8] as Int
    val j = it[9] as Int
    val k = it[10] as Int
    val l = it[11] as Int
    val m = it[12] as Int
    "$a - age:${b}" })
        .subscribe({
            Log.d("combineLatest", "onNext - ${it}")
        })
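
To make it clearer why the combiner in the array version receives an untyped array (and why every value has to be cast by index), here is a minimal plain-Java sketch of the combineLatest semantics: remember the latest value of each source and call the combiner only once every source has emitted at least once. This is not RxJava's implementation; LatestCombiner, onNext and emitted are hypothetical names used only for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Hypothetical, simplified model of combineLatest over N sources (not RxJava code).
final class LatestCombiner<R> {
    private final Object[] latest;     // most recent value per source
    private final boolean[] hasValue;  // whether source i has emitted yet
    private int remaining;             // number of sources that have not emitted yet
    private final Function<Object[], R> combiner;
    private final List<R> emitted = new ArrayList<>();

    LatestCombiner(int sourceCount, Function<Object[], R> combiner) {
        this.latest = new Object[sourceCount];
        this.hasValue = new boolean[sourceCount];
        this.remaining = sourceCount;
        this.combiner = combiner;
    }

    // Called when the source at `index` emits `value`.
    void onNext(int index, Object value) {
        latest[index] = value;
        if (!hasValue[index]) {
            hasValue[index] = true;
            remaining--;
        }
        if (remaining == 0) {
            // Hand the combiner a copy so it cannot mutate the stored values.
            emitted.add(combiner.apply(latest.clone()));
        }
    }

    // Results produced so far, in emission order.
    List<R> emitted() {
        return emitted;
    }
}
```

With three sources, nothing is combined until the third one emits. After that, every new value from any source produces a new combined result.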

Here is a simple extension function for RxKotlin if you have 10 sources for combineLatest. You can easily create similar functions for more sources or adapt this to work with plain RxJava.


import io.reactivex.Observable
import io.reactivex.rxkotlin.Observables

@Suppress("UNCHECKED_CAST", "unused")
inline fun <T1 : Any, T2 : Any, T3 : Any, T4 : Any, T5 : Any, T6 : Any, T7 : Any, T8 : Any, T9 : Any, T10 : Any, R : Any> Observables.combineLatest(
    source1: Observable<T1>, source2: Observable<T2>,
    source3: Observable<T3>, source4: Observable<T4>,
    source5: Observable<T5>, source6: Observable<T6>,
    source7: Observable<T7>, source8: Observable<T8>,
    source9: Observable<T9>, source10: Observable<T10>,
    crossinline combineFunction: (T1, T2, T3, T4, T5, T6, T7, T8, T9, T10) -> R
): Observable<R> =
    Observable.combineLatest(arrayOf(source1, source2, source3, source4, source5, source6, source7, source8, source9, source10)) {
        combineFunction(
            it[0] as T1,
            it[1] as T2,
            it[2] as T3,
            it[3] as T4,
            it[4] as T5,
            it[5] as T6,
            it[6] as T7,
            it[7] as T8,
            it[8] as T9,
            it[9] as T10
        )
    }

Note: I've created this as an extension function to stay consistent with how combineLatest calls look for fewer than 10 sources (Observables.combineLatest(...)). That way I don't have to think about which combineLatest version I need for a given number of parameters. Technically, there is no need to make it an extension function.
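
If you want the same idea without Kotlin, the core of it is just an adapter from a typed 10-argument function to the Object[]-based combiner shape. Below is a rough plain-Java sketch of that adapter. Function10, Combiners and fromTyped are names I made up for illustration; they are not part of RxJava or the JDK.

```java
import java.util.function.Function;

// Hypothetical 10-argument functional interface; the JDK does not ship one.
@FunctionalInterface
interface Function10<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, R> {
    R apply(T1 v1, T2 v2, T3 v3, T4 v4, T5 v5, T6 v6, T7 v7, T8 v8, T9 v9, T10 v10);
}

final class Combiners {
    private Combiners() {}

    // Adapts a typed 10-argument function to the Object[] shape that
    // array/list combiners receive. The casts are unchecked, so a wrong
    // source order only fails at runtime, when a value is actually used.
    @SuppressWarnings("unchecked")
    static <T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, R> Function<Object[], R> fromTyped(
            Function10<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, R> typed) {
        return args -> {
            if (args.length != 10) {
                throw new IllegalArgumentException("Expected 10 values, got " + args.length);
            }
            return typed.apply(
                    (T1) args[0], (T2) args[1], (T3) args[2], (T4) args[3], (T5) args[4],
                    (T6) args[5], (T7) args[6], (T8) args[7], (T9) args[8], (T10) args[9]);
        };
    }
}
```

The sketch returns a java.util.function.Function to stay dependency-free. RxJava has its own function interfaces, so I assume you would pass the result as a method reference (something like adapter::apply) or change the return type to match the RxJava version you are on.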


There is a combineLatest overload that takes a List of observables. Here's an example of how to use it (this is RxJava 1.x, where the combiner is a FuncN):

List<Observable<?>> list = Arrays.asList(Observable.just(1), Observable.just("2"));
Observable.combineLatest(list, new FuncN<String>() {
    @Override
    public String call(Object... args) {
        String concat = "";
        for (Object value : args) {
            if (value instanceof Integer) {
                concat += (Integer) value;
            } else if (value instanceof String) {
                concat += (String) value;
            }
        }
        return concat;
    }
});
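
Since the combiner only gets Object... args, a wrong cast (for example Float where the source emits Double) shows up as a ClassCastException at runtime. One way to get clearer errors is a small checked accessor. The following is only a sketch in plain Java; CombineArgs and get are hypothetical helper names, not part of RxJava.

```java
// Hypothetical helper for reading typed values out of a combiner's Object[] args.
final class CombineArgs {
    private CombineArgs() {}

    // Returns args[index] as `type`, or fails with a message naming the index
    // and the actual runtime type. Use wrapper classes (Integer.class, not
    // int.class), because the array only ever holds boxed values.
    static <T> T get(Object[] args, int index, Class<T> type) {
        if (index < 0 || index >= args.length) {
            throw new IllegalArgumentException(
                    "No source at index " + index + " (got " + args.length + " values)");
        }
        Object value = args[index];
        if (!type.isInstance(value)) {
            String actual = (value == null) ? "null" : value.getClass().getName();
            throw new IllegalArgumentException(
                    "Source " + index + " emitted " + actual + ", expected " + type.getName());
        }
        return type.cast(value);
    }
}
```

Inside the combiner you would then write something like CombineArgs.get(args, 0, Integer.class) instead of a bare (Integer) args[0].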

To expand on that answer: I am using it to read multiple characteristics at once. It can be done like so:

connectionObservable
                .flatMap((Func1<RxBleConnection, Observable<?>>) rxBleConnection -> {
                    List<Observable<?>> list1 = Arrays.asList(
                            rxBleConnection.readCharacteristic(UUID...),
                            rxBleConnection.readCharacteristic(UUID...),
                            rxBleConnection.readCharacteristic(UUID...),
                            rxBleConnection.readCharacteristic(UUID...),
                            rxBleConnection.readCharacteristic(UUID...),
                            rxBleConnection.readCharacteristic(UUID...),
                            rxBleConnection.readCharacteristic(UUID...),
                            rxBleConnection.readCharacteristic(UUID...),
                            rxBleConnection.readCharacteristic(UUID...),
                            rxBleConnection.readCharacteristic(UUID...),
                            rxBleConnection.readCharacteristic(UUID...),
                            rxBleConnection.readCharacteristic(UUID...),
                            rxBleConnection.readCharacteristic(UUID...),
                            rxBleConnection.readCharacteristic(UUID...),
                            rxBleConnection.readCharacteristic(UUID...),
                            rxBleConnection.readCharacteristic(UUID...),
                            rxBleConnection.readCharacteristic(UUID...),
                            rxBleConnection.readCharacteristic(UUID...));
                    return Observable.combineLatest(list1, args -> {
                       Object o =  doSomethingWithResults(args);
                        return o;
                    });
                })
                .observeOn(AndroidSchedulers.mainThread())
                .doOnUnsubscribe(this::clearConnectionSubscription)
                .subscribe(retVal -> {
                    Log.d(TAG, "result:" + retVal.toString());
                    Log.w(TAG, "SUCCESS");
                    triggerDisconnect();

                }, MyActivity.this::onReadFailure);

Comment if you have suggestions on how to improve this process.
