How can I create an Observer over a dynamic list in RxJava?
I would consider this approach based on BehaviourSubject. This differs from juanpavergara's solution in that an onNext() will be immediately emitted to the Observer when subscribing to the Observable.
public class ObservableList<T> {
protected final List<T> list;
protected final BehaviorSubject<List<T>> behaviorSubject;
public ObservableList(List<T> list) {
this.list = list;
this.behaviorSubject = BehaviorSubject.create(list);
}
public Observable<List<T>> getObservable() {
return behaviorSubject;
}
public void add(T element) {
list.add(element);
behaviorSubject.onNext(list);
}
}
private void main() {
final List<Integer> list = new ArrayList<>();
list.add(0);
list.add(1);
final ObservableList<Integer> olist = new ObservableList<>(list);
olist.getObservable().subscribe(System.out::println);
olist.add(2);
olist.add(3);
}
This solution may be useful when implementing MVP, when you want to observe one resource (ie: a list of objects) returned by one component in the system (ie: one repository or DataSource), and you want the Observer (ie: Presenter or Interactor) to be notified when an element is added to the list in another part of the system.
There you go. Thanks to Dávid Karnok on RxJava Google Group
import java.util.ArrayList;
import java.util.List;
import rx.Observable;
import rx.subjects.PublishSubject;
public class ObservableListExample {
public static class ObservableList<T> {
protected final List<T> list;
protected final PublishSubject<T> onAdd;
public ObservableList() {
this.list = new ArrayList<T>();
this.onAdd = PublishSubject.create();
}
public void add(T value) {
list.add(value);
onAdd.onNext(value);
}
public Observable<T> getObservable() {
return onAdd;
}
}
public static void main(String[] args) throws Exception {
ObservableList<Integer> olist = new ObservableList<>();
olist.getObservable().subscribe(System.out::println);
olist.add(1);
Thread.sleep(1000);
olist.add(2);
Thread.sleep(1000);
olist.add(3);
}
}
Hi you can use ObservableRxList to observable on addAll, add, remove and update list
public class ObservableRxList<T> {
protected final List<T> list;
protected final PublishSubject<List<T>> subject;
public ObservableRxList() {
this.list = new ArrayList<T>();
this.subject = PublishSubject.create();
}
public void add(T value) {
list.add(value);
subject.onNext(list);
}
public void addAll(List<T> value) {
list.addAll(value);
subject.onNext(list);
}
//not sure about this
public void update(T value) {
for (ListIterator<T> it = list.listIterator(); it.hasNext(); ) {
if (value == it.next()) {
it.set(value);
break;
}
}
subject.onNext(list);
}
public void update(int position, T value) {
list.set(position, value);
subject.onNext(list);
}
public void remove(T value) {
list.remove(value);
subject.onNext(list);
}
public void remove(int index) {
list.remove(index);
subject.onNext(list);
}
public Observable<List<T>> getObservable() {
return subject;
}
public List<T> getCurrentList() {
return list;
}
}
usage: every time use addAll, add, remove or update this observer will fire with whole updated list
mObservableRxList.getObservable().subscribe(productList-> {
this.products.clear();
this.products.addAll(productList;
productAdapter.notifyDataSetChanged();
});
You can merge two observables to one. One of them can be initial list of elements and second can be subject:
import rx.Observable;
import rx.subjects.ReplaySubject;
import java.util.ArrayList;
import java.util.List;
public class ExampleObservableList {
public static void main(String[] args) {
List<Integer> initialNumbers = new ArrayList<Integer>();
initialNumbers.add(1);
initialNumbers.add(2);
Observable<Integer> observableInitial = Observable.from(initialNumbers);
ReplaySubject<Integer> subject = ReplaySubject.create();
Observable<Integer> source = Observable.merge(observableInitial, subject);
source.subscribe(System.out::println);
for (int i = 0; i < 100; ++i) {
subject.onNext(i);
}
}
}
If you don't have initial elements you can use only ReplaySubject
(or other Subject
-> see http://reactivex.io/documentation/subject.html):
public static void main(String[] args) {
ReplaySubject<Integer> source = ReplaySubject.create();
source.subscribe(System.out::println);
for (int i = 0; i < 100; ++i) {
source.onNext(i);
}
}