
RxJava2(3)

Combining Operators

merge()

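Purpose

Combines two or more Observables into a single stream and forwards their events concurrently, as each source emits them.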

Usage

// RxJava 2: Observable lives in io.reactivex, Function and Consumer in io.reactivex.functions.
// Both sources tick every 100 ms; merge() forwards items from either one as they arrive.
Observable.merge(
        Observable.interval(100, TimeUnit.MILLISECONDS).map(new Function<Long, String>() {
            @Override
            public String apply(Long aLong) {
                return "A" + aLong;
            }
        }),
        Observable.interval(100, TimeUnit.MILLISECONDS).map(new Function<Long, String>() {
            @Override
            public String apply(Long aLong) {
                return "B" + aLong;
            }
        })
).subscribe(new Consumer<String>() {
    @Override
    public void accept(String s) {
        Log.i(TAG, "accept: " + s);
    }
});

Output

accept: A0
accept: B0
accept: A1
accept: B1
accept: A2
accept: B2
accept: A3
accept: B3
...
accept: A13
accept: B13
accept: B14
accept: A14
accept: A15
accept: B15
accept: A16
accept: B16
// The order between the two sources is not guaranteed
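The ordering note above can be illustrated without RxJava. The following is a minimal plain-Java sketch, not RxJava's implementation: the class `MergeSketch` and the helpers `mergeModel` and `itemsOf` are hypothetical names introduced here. It models two producers writing into one shared list: each producer's own items stay in order, while the interleaving between producers may differ from run to run.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

class MergeSketch {

    // Hypothetical helper: starts one producer thread per prefix. Each thread
    // appends prefix+0 .. prefix+(count-1) to a shared, synchronized list.
    static List<String> mergeModel(final int count, String... prefixes) {
        final List<String> merged = Collections.synchronizedList(new ArrayList<String>());
        List<Thread> producers = new ArrayList<>();
        for (final String prefix : prefixes) {
            Thread t = new Thread(() -> {
                for (int i = 0; i < count; i++) {
                    merged.add(prefix + i);
                }
            });
            producers.add(t);
            t.start();
        }
        for (Thread t : producers) {
            try {
                t.join(); // wait until every producer has finished
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return new ArrayList<>(merged);
    }

    // Returns the items that came from one source, in merged order.
    static List<String> itemsOf(List<String> merged, String prefix) {
        List<String> out = new ArrayList<>();
        for (String s : merged) {
            if (s.startsWith(prefix)) {
                out.add(s);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> merged = mergeModel(3, "A", "B");
        System.out.println(merged);               // interleaving may vary between runs
        System.out.println(itemsOf(merged, "A")); // per-source order is preserved
    }
}
```

Only the per-source order is stable here, which is why the log above shows B14 ahead of A14 in one place.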

mergeArray()

Works like merge(), but accepts more than 4 Observables.

zip()

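Purpose

Combines multiple Observables by pairing their events in emission order: the n-th event of one source is combined with the n-th event of each other source. The number of combined events equals the number emitted by the source that emits the fewest.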

Usage

Observable.zip(
        // First source: 1..5, one value per second, mapped to A1..A5
        Observable.intervalRange(1, 5, 1, 1, TimeUnit.SECONDS)
                .map(new Function<Long, String>() {
                    @Override
                    public String apply(Long aLong) throws Exception {
                        return "A" + aLong;
                    }
                }),
        // Second source: 1..6, one value per second, mapped to B1..B6
        Observable.intervalRange(1, 6, 1, 1, TimeUnit.SECONDS)
                .map(new Function<Long, String>() {
                    @Override
                    public String apply(Long aLong) throws Exception {
                        return "B" + aLong;
                    }
                }),
        // Combiner: joins the n-th item of each source
        new BiFunction<String, String, String>() {
            @Override
            public String apply(String s, String s2) throws Exception {
                return s + s2;
            }
        }
).subscribe(new Consumer<String>() {
    @Override
    public void accept(String s) throws Exception {
        Log.i(TAG, "accept: " + s);
    }
});

Output

06-25 21:44:02.480 30781-30804/com.example.bibingwei.androiddemo I/------: accept: A1B1
06-25 21:44:03.480 30781-30804/com.example.bibingwei.androiddemo I/------: accept: A2B2
06-25 21:44:04.480 30781-30804/com.example.bibingwei.androiddemo I/------: accept: A3B3
06-25 21:44:05.480 30781-30804/com.example.bibingwei.androiddemo I/------: accept: A4B4
06-25 21:44:06.480 30781-30804/com.example.bibingwei.androiddemo I/------: accept: A5B5
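The pairing rule can be modelled in plain Java to show why B6 never appears in the log. This is a minimal sketch, not RxJava's implementation: `ZipSketch` and `zipModel` are hypothetical names, and lists stand in for the two sources.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.BiFunction;

class ZipSketch {

    // Hypothetical helper: pairs items by index and stops at the shorter input.
    static <A, B, R> List<R> zipModel(List<A> first, List<B> second,
                                      BiFunction<A, B, R> combiner) {
        int n = Math.min(first.size(), second.size());
        List<R> out = new ArrayList<>(n);
        for (int i = 0; i < n; i++) {
            out.add(combiner.apply(first.get(i), second.get(i)));
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> a = Arrays.asList("A1", "A2", "A3", "A4", "A5");
        List<String> b = Arrays.asList("B1", "B2", "B3", "B4", "B5", "B6");
        List<String> zipped = zipModel(a, b, (x, y) -> x + y);
        System.out.println(zipped); // prints [A1B1, A2B2, A3B3, A4B4, A5B5]
    }
}
```

B6 has no partner from the first source, so it is never combined.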

combineLatest()

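Purpose

Whenever any one of the Observables emits an event, that event is combined with the most recent event from each of the other Observables, and the combined result is emitted. Nothing is emitted until every source has emitted at least once.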

Usage

Observable.combineLatest(
        // Source A: A1..A4, first after 1 s, then one per second
        Observable.intervalRange(1, 4, 1, 1, TimeUnit.SECONDS)
                .map(new Function<Long, String>() {
                    @Override
                    public String apply(Long aLong) throws Exception {
                        Log.i(TAG, "emit: " + "A" + aLong);
                        return "A" + aLong;
                    }
                }),
        // Source B: B1..B5, first after 2 s, then one every 2 s
        Observable.intervalRange(1, 5, 2, 2, TimeUnit.SECONDS)
                .map(new Function<Long, String>() {
                    @Override
                    public String apply(Long aLong) throws Exception {
                        Log.i(TAG, "emit: " + "B" + aLong);
                        return "B" + aLong;
                    }
                }),
        // Combiner: joins the latest A with the latest B
        new BiFunction<String, String, String>() {
            @Override
            public String apply(String s, String s2) throws Exception {
                return s + s2;
            }
        }
).subscribe(new Consumer<String>() {
    @Override
    public void accept(String s) throws Exception {
        Log.i(TAG, "accept: " + s);
    }
});

Output

06-26 10:07:44.310 18849-18914/com.example.bibingwei.androiddemo I/------: emit: A1
06-26 10:07:45.310 18849-18914/com.example.bibingwei.androiddemo I/------: emit: A2
06-26 10:07:45.310 18849-18915/com.example.bibingwei.androiddemo I/------: emit: B1
accept: A2B1
06-26 10:07:46.310 18849-18914/com.example.bibingwei.androiddemo I/------: emit: A3
accept: A3B1
06-26 10:07:47.310 18849-18914/com.example.bibingwei.androiddemo I/------: emit: A4
accept: A4B1
06-26 10:07:47.310 18849-18915/com.example.bibingwei.androiddemo I/------: emit: B2
accept: A4B2
06-26 10:07:49.310 18849-18915/com.example.bibingwei.androiddemo I/------: emit: B3
accept: A4B3
06-26 10:07:51.310 18849-18915/com.example.bibingwei.androiddemo I/------: emit: B4
accept: A4B4
06-26 10:07:53.310 18849-18915/com.example.bibingwei.androiddemo I/------: emit: B5
accept: A4B5
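The log can be reproduced with a small plain-Java model of the "latest value" rule. This is a sketch under assumptions, not RxJava's implementation: `CombineLatestSketch` and `combineLatestModel` are hypothetical names, and the event timeline is written by hand in the order the events appear in the log.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class CombineLatestSketch {

    // Hypothetical helper: each event is a string whose first character names
    // its source ("A..." or "B..."). A combination is produced on every event
    // once both sources have emitted at least one value.
    static List<String> combineLatestModel(List<String> timeline) {
        String latestA = null;
        String latestB = null;
        List<String> out = new ArrayList<>();
        for (String event : timeline) {
            if (event.startsWith("A")) {
                latestA = event;
            } else {
                latestB = event;
            }
            if (latestA != null && latestB != null) {
                out.add(latestA + latestB);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        // Arrival order taken from the log above.
        List<String> timeline = Arrays.asList(
                "A1", "A2", "B1", "A3", "A4", "B2", "B3", "B4", "B5");
        System.out.println(combineLatestModel(timeline));
        // prints [A2B1, A3B1, A4B1, A4B2, A4B3, A4B4, A4B5]
    }
}
```

A1 produces no output because B has not emitted anything yet. After A completes with A4, every later B event is paired with A4.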

reduce()

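Purpose

Applies an accumulator function to the emitted events one after another, feeding each result into the next call, and emits only the final value.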

Usage

Observable.just(1, 2, 3, 4, 5)
        // In RxJava 2, reduce(BiFunction) returns a Maybe<Integer>
        .reduce(new BiFunction<Integer, Integer, Integer>() {
            @Override
            public Integer apply(Integer integer, Integer integer2) throws Exception {
                // integer is the running result, integer2 is the newly emitted item
                Log.i(TAG, "accumulated so far: " + integer + " current item: " + integer2);
                return integer + integer2;
            }
        }).subscribe(new Consumer<Integer>() {
            @Override
            public void accept(Integer integer) throws Exception {
                Log.i(TAG, "accept: " + integer);
            }
        });

Output

06-26 10:27:42.960 19672-19672/com.example.bibingwei.androiddemo I/------: accumulated so far: 1 current item: 2
accumulated so far: 3 current item: 3
accumulated so far: 6 current item: 4
accumulated so far: 10 current item: 5
accept: 15
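The accumulation steps in the log amount to a left fold. A minimal plain-Java sketch follows. It is not RxJava's implementation: `ReduceSketch` and `reduceModel` are hypothetical names, and `Optional` stands in for the `Maybe` that RxJava 2 returns from `reduce(BiFunction)`, since an empty source has no value to emit.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.function.BinaryOperator;

class ReduceSketch {

    // Hypothetical helper: the first item seeds the accumulator, and every
    // later item is folded into it. An empty input yields Optional.empty().
    static <T> Optional<T> reduceModel(List<T> items, BinaryOperator<T> accumulator) {
        if (items.isEmpty()) {
            return Optional.empty();
        }
        T acc = items.get(0);
        for (int i = 1; i < items.size(); i++) {
            acc = accumulator.apply(acc, items.get(i));
        }
        return Optional.of(acc);
    }

    public static void main(String[] args) {
        Optional<Integer> sum = reduceModel(Arrays.asList(1, 2, 3, 4, 5), Integer::sum);
        System.out.println(sum.get()); // prints 15

        Optional<Integer> none = reduceModel(Collections.<Integer>emptyList(), Integer::sum);
        System.out.println(none.isPresent()); // prints false
    }
}
```

The accumulator runs four times for five items, matching the four accumulation lines in the log.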

startWith()

Purpose

Prepends one or more events that are emitted before the source's own events.

Usage

Observable.just(1, 2, 3, 4)
        // Emit 9 first, then the source's own items
        .startWith(9)
        .subscribe(new Consumer<Integer>() {
            @Override
            public void accept(Integer integer) throws Exception {
                Log.i(TAG, "accept: " + integer);
            }
        });

Output

06-26 10:35:48.370 20803-20803/com.example.bibingwei.androiddemo I/------: accept: 9
accept: 1
accept: 2
accept: 3
accept: 4