How to use RxJava combineLatest operator with more than 9 observables
RxKotlin's Observables.combineLatest() overloads accept up to 9 source observables. To combine more than 9, or a dynamic number of sources, you can pass an array or list of observables to Observable.combineLatest(), as shown below.
First, here is a simple example with only two sources of different types:
val name = Observable.just("MyName")
val age = Observable.just(25)
Observables.combineLatest(name, age) { n, a -> "$n - age:${a}" }
    .subscribe({
        Log.d("combineLatest", "onNext - ${it}")
    })
Now, what if you want to pass more sources to combineLatest? The example below shows how. (I have used mixed data types, so it should also cover cases with custom types.)
val myList = arrayOf(
    Observable.just("MyName"),
    Observable.just(2),
    Observable.just(3.55),
    Observable.just("My Another String"),
    Observable.just(5),
    Observable.just(6),
    Observable.just(7),
    Observable.just(8),
    Observable.just(9),
    Observable.just(10),
    Observable.just(11),
    Observable.just(12),
    Observable.just(13),
    Observable.just(14),
    Observable.just(15)
)
Observable.combineLatest(myList, {
    // `it` is an untyped array of the latest values, in source order,
    // so each element has to be cast to its actual runtime type.
    val a = it[0] as String
    val b = it[1] as Int
    val c = it[2] as Double // 3.55 is a Double; casting it to Float would throw ClassCastException
    val d = it[3] as String
    val e = it[4] as Int
    val f = it[5] as Int
    val g = it[6] as Int
    val h = it[7] as Int
    val i = it[8] as Int
    val j = it[9] as Int
    val k = it[10] as Int
    val l = it[11] as Int
    val m = it[12] as Int
    // Only a and b are used here; the remaining values are unpacked for illustration.
    "$a - age:${b}"
})
    .subscribe({
        Log.d("combineLatest", "onNext - ${it}")
    })
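Because the combiner receives its inputs as an untyped array, every cast has to match the runtime type of the value that the source actually emitted. The following is a minimal plain-Java sketch of that constraint, with no RxJava dependency: the class name CombinerCasts and the method combine are hypothetical stand-ins for the combiner function, not part of any library.

```java
import java.util.Locale;

// Hypothetical stand-in for a combineLatest combiner; not an RxJava class.
class CombinerCasts {
    // Receives the latest values as an untyped array, in source order.
    static String combine(Object[] values) {
        String name = (String) values[0];
        int count = (Integer) values[1];
        // Going through Number works whether the source emitted a Float or a Double.
        double ratio = ((Number) values[2]).doubleValue();
        return String.format(Locale.ROOT, "%s - %d - %.2f", name, count, ratio);
    }

    public static void main(String[] args) {
        Object[] latest = {"MyName", 2, 3.55};
        System.out.println(combine(latest)); // prints "MyName - 2 - 3.55"
        try {
            Float wrong = (Float) latest[2]; // 3.55 is boxed as a Double, so this cast fails
            System.out.println(wrong);
        } catch (ClassCastException e) {
            System.out.println("ClassCastException"); // this branch runs
        }
    }
}
```

Casting numeric slots through Number is one way to keep the combiner tolerant of a source changing between Float and Double; an exact cast is equally valid when the emitted type is known.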
Here is a simple extension function for RxKotlin if you have 10 sources for combineLatest. You can easily create similar functions for more sources or adapt this to work with plain RxJava.
import io.reactivex.Observable
import io.reactivex.rxkotlin.Observables

@Suppress("UNCHECKED_CAST", "unused")
inline fun <T1 : Any, T2 : Any, T3 : Any, T4 : Any, T5 : Any, T6 : Any, T7 : Any, T8 : Any, T9 : Any, T10 : Any, R : Any> Observables.combineLatest(
    source1: Observable<T1>, source2: Observable<T2>,
    source3: Observable<T3>, source4: Observable<T4>,
    source5: Observable<T5>, source6: Observable<T6>,
    source7: Observable<T7>, source8: Observable<T8>,
    source9: Observable<T9>, source10: Observable<T10>,
    crossinline combineFunction: (T1, T2, T3, T4, T5, T6, T7, T8, T9, T10) -> R
): Observable<R> =
    Observable.combineLatest(arrayOf(source1, source2, source3, source4, source5, source6, source7, source8, source9, source10)) {
        combineFunction(
            it[0] as T1,
            it[1] as T2,
            it[2] as T3,
            it[3] as T4,
            it[4] as T5,
            it[5] as T6,
            it[6] as T7,
            it[7] as T8,
            it[8] as T9,
            it[9] as T10
        )
    }
Note: I've created this as an extension function to stay consistent with how combineLatest calls look for fewer than 10 sources (Observables.combineLatest(...)). That way I don't have to think about which combineLatest version I need for a given number of parameters. Technically there is no need to make it an extension function.
There is a combineLatest overload that takes a List of observables. Here's an example of how to use it:
List<Observable<?>> list = Arrays.asList(Observable.just(1), Observable.just("2"));
Observable.combineLatest(list, new FuncN<String>() {
    @Override
    public String call(Object... args) {
        String concat = "";
        for (Object value : args) {
            if (value instanceof Integer) {
                concat += (Integer) value;
            } else if (value instanceof String) {
                concat += (String) value;
            }
        }
        return concat;
    }
});
To expand on that answer: I am using it to read multiple characteristics at once. It can be done like so:
connectionObservable
        .flatMap((Func1<RxBleConnection, Observable<?>>) rxBleConnection -> {
            List<Observable<?>> list1 = Arrays.asList(
                    rxBleConnection.readCharacteristic(UUID...),
                    rxBleConnection.readCharacteristic(UUID...),
                    rxBleConnection.readCharacteristic(UUID...),
                    rxBleConnection.readCharacteristic(UUID...),
                    rxBleConnection.readCharacteristic(UUID...),
                    rxBleConnection.readCharacteristic(UUID...),
                    rxBleConnection.readCharacteristic(UUID...),
                    rxBleConnection.readCharacteristic(UUID...),
                    rxBleConnection.readCharacteristic(UUID...),
                    rxBleConnection.readCharacteristic(UUID...),
                    rxBleConnection.readCharacteristic(UUID...),
                    rxBleConnection.readCharacteristic(UUID...),
                    rxBleConnection.readCharacteristic(UUID...),
                    rxBleConnection.readCharacteristic(UUID...),
                    rxBleConnection.readCharacteristic(UUID...),
                    rxBleConnection.readCharacteristic(UUID...),
                    rxBleConnection.readCharacteristic(UUID...),
                    rxBleConnection.readCharacteristic(UUID...));
            return Observable.combineLatest(list1, args -> {
                Object o = doSomethingWithResults(args);
                return o;
            });
        })
        .observeOn(AndroidSchedulers.mainThread())
        .doOnUnsubscribe(this::clearConnectionSubscription)
        .subscribe(retVal -> {
            Log.d(TAG, "result:" + retVal.toString());
            Log.w(TAG, "SUCCESS");
            triggerDisconnect();
        }, MyActivity.this::onReadFailure);
}
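The doSomethingWithResults helper is not shown in the snippet above. As a rough sketch of what such a combiner body might look like, the following plain-Java example assumes that every read emits a byte[] and simply converts each result to a hex string. The class name CharacteristicResults and the hex conversion are illustrative assumptions, not the original author's implementation.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper class; only the method name comes from the snippet above.
class CharacteristicResults {
    // Assumes each element of args is the byte[] emitted by one characteristic read,
    // in the same order as the observables in the list passed to combineLatest.
    static List<String> doSomethingWithResults(Object[] args) {
        List<String> hexValues = new ArrayList<>();
        for (Object arg : args) {
            byte[] value = (byte[]) arg;
            StringBuilder sb = new StringBuilder();
            for (byte b : value) {
                sb.append(String.format("%02X", b)); // two uppercase hex digits per byte
            }
            hexValues.add(sb.toString());
        }
        return hexValues;
    }

    public static void main(String[] args) {
        Object[] latest = { new byte[]{0x01, 0x2A}, new byte[]{(byte) 0xFF} };
        System.out.println(doSomethingWithResults(latest)); // prints "[012A, FF]"
    }
}
```

Since the results arrive positionally, keeping the order of the reads in the list stable is what lets the combiner tell the characteristics apart.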
Please comment if you have suggestions on how to improve this process.