RXJava MainThread 线程调度

以下代码原理分析基于 RXJava 2.2.8 。

1、线程调度

RXJava 线程调度的原理在我的另一篇文中已经做了介绍 《RXjava 设计原理

2、Main Thread 线程调度

RXJava 默认是在创建自身的线程的中执行的,我们设定为在main thread 线程中创建了RXJava 。 RXJava 线程调度是通过静态的代理模式在其它的线程中执行真实的接口实现的。在线程池中执行完后,重新回到 main thread 中执行,因为线程是已经存在的,不可能重新创建线程,也不可能使用线程池,这里和其他的线程调度的方式是不一样的。

线程调度原理
线程调度的原理是通过阻塞队列实现的,在 main thread 中创建阻塞队列,并且死循环获取队列中的信息,遇到结束的消息时候终止死循环。 阻塞队列中的消息是在其他线程中执行真实接口的时候方进去的。

参考下图

Apptalking

3、源码分析

实例代码

1
2
3
4
5
6
7
8
9
public class RxJava2 {
public static void main(String[] args) {
Flowable.fromArray(1, 2, 3)
.subscribeOn(Schedulers.computation())
.blockingSubscribe(integer -> {
System.out.println(Thread.currentThread().getName());
});
}
}

FlowableBlockingSubscribe
FlowableBlockingSubscribe 是一个 Subscribe 静态的代理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
public static <T> void subscribe(Publisher<? extends T> o, Subscriber<? super T> subscriber) {
// 创建阻塞队列,用于获取消息
final BlockingQueue<Object> queue = new LinkedBlockingQueue<Object>();
// 用于消息的接收
BlockingSubscriber<T> bs = new BlockingSubscriber<T>(queue);

o.subscribe(bs);
// 死循环获取消息
try {
for (;;) {
if (bs.isCancelled()) {
break;
}
Object v = queue.poll();
if (v == null) {
if (bs.isCancelled()) {
break;
}
BlockingHelper.verifyNonBlocking();
v = queue.take();
}
if (bs.isCancelled()) {
break;
}
if (v == BlockingSubscriber.TERMINATED
// 接收消息
|| NotificationLite.acceptFull(v, subscriber)) {
break;
}
}
} catch (InterruptedException e) {
bs.cancel();
subscriber.onError(e);
}
}

BlockingSubscriber

BlockingSubscriber 接收消息,并放入到阻塞队里里。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
public final class BlockingSubscriber<T> extends AtomicReference<Subscription> implements FlowableSubscriber<T>, Subscription {

private static final long serialVersionUID = -4875965440900746268L;

public static final Object TERMINATED = new Object();

final Queue<Object> queue;

public BlockingSubscriber(Queue<Object> queue) {
this.queue = queue;
}

@Override
public void onSubscribe(Subscription s) {
if (SubscriptionHelper.setOnce(this, s)) {
queue.offer(NotificationLite.subscription(this));
}
}

@Override
public void onNext(T t) {
queue.offer(NotificationLite.next(t));
}

@Override
public void onError(Throwable t) {
queue.offer(NotificationLite.error(t));
}

@Override
public void onComplete() {
queue.offer(NotificationLite.complete());
}

@Override
public void request(long n) {
get().request(n);
}

@Override
public void cancel() {
if (SubscriptionHelper.cancel(this)) {
queue.offer(TERMINATED);
}
}

public boolean isCancelled() {
return get() == SubscriptionHelper.CANCELLED;
}
}

NotificationLite
NotificationLite 用于消息的处理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public static <T> boolean acceptFull(Object o, Subscriber<? super T> s) {
if (o == COMPLETE) {
s.onComplete();
return true;
} else
if (o instanceof ErrorNotification) {
s.onError(((ErrorNotification)o).e);
return true;
} else
if (o instanceof SubscriptionNotification) {
s.onSubscribe(((SubscriptionNotification)o).upstream);
return false;
}
s.onNext((T)o);
return false;
}