RXjava 设计原理

以下代码原理分析基于 RXjava 1.3.8 ,RXJava 2.x 和其原理类似。

1、RXJava 线程调度原理

RXJava线程调用使用静态代理(原理类的包裹类),在原线程中调用静态代理,静态代理在其他线程(线程池中)执行原始接口。

Apptalking

2、RXJava 如何保证顺序执行

OnSubscribe 端处理
OnSubscribe 中 call 是在一个线程task中执行的,完全可以保证阻塞顺序执行。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
public final class OperatorSubscribeOn<T> implements OnSubscribe<T> {

final Scheduler scheduler;
final Observable<T> source;
final boolean requestOn;

public OperatorSubscribeOn(Observable<T> source, Scheduler scheduler, boolean requestOn) {
this.scheduler = scheduler;
this.source = source;
this.requestOn = requestOn;
}

@Override
public void call(final Subscriber<? super T> subscriber) {
final Worker inner = scheduler.createWorker();

SubscribeOnSubscriber<T> parent = new SubscribeOnSubscriber<T>(subscriber, requestOn, inner, source);
subscriber.add(parent);
subscriber.add(inner);
// 在异步线程中执行
inner.schedule(parent);
}

static final class SubscribeOnSubscriber<T> extends Subscriber<T> implements Action0 {

final Subscriber<? super T> actual;

final boolean requestOn;

final Worker worker;

Observable<T> source;

Thread t;

SubscribeOnSubscriber(Subscriber<? super T> actual, boolean requestOn, Worker worker, Observable<T> source) {
this.actual = actual;
this.requestOn = requestOn;
this.worker = worker;
this.source = source;
}

@Override
public void onNext(T t) {
actual.onNext(t);
}

@Override
public void onError(Throwable e) {
try {
actual.onError(e);
} finally {
worker.unsubscribe();
}
}

@Override
public void onCompleted() {
try {
actual.onCompleted();
} finally {
worker.unsubscribe();
}
}

@Override
public void call() {
Observable<T> src = source;
source = null;
t = Thread.currentThread();
//调用原始的call方法
src.unsafeSubscribe(this);
}

@Override
public void setProducer(final Producer p) {
actual.setProducer(new Producer() {
@Override
public void request(final long n) {
if (t == Thread.currentThread() || !requestOn) {
p.request(n);
} else {
worker.schedule(new Action0() {
@Override
public void call() {
p.request(n);
}
});
}
}
});
}
}
}

Subscriber 端处理

  • ObserveOnSubscriber 中使用队列存储临时数据,当上游的 doNext 调用时间短于Subscriber中的doNext时候,参数值存放在队列中,可以起到缓冲的作用,同时也不会影响到上游的doNext 执行。
  • 单线程执行,doNext 是顺序阻塞执行的,所以必须在一个线程中依次获取队列的数值,进行执行。单线程设计不是启用一个线程,而是在调用doNext的时候如果没有线程,启动一个线程执行,线程在获取完Message后退出。

Apptalking

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
static final class ObserveOnSubscriber<T> extends Subscriber<T> implements Action0 {
final Subscriber<? super T> child;
final Scheduler.Worker recursiveScheduler;
final boolean delayError;
final Queue<Object> queue;
/** The emission threshold that should trigger a replenishing request. */
final int limit;

// the status of the current stream
volatile boolean finished;

final AtomicLong requested = new AtomicLong();

final AtomicLong counter = new AtomicLong();

/**
* The single exception if not null, should be written before setting finished (release) and read after
* reading finished (acquire).
*/
Throwable error;

/** Remembers how many elements have been emitted before the requests run out. */
long emitted;

// do NOT pass the Subscriber through to couple the subscription chain ... unsubscribing on the parent should
// not prevent anything downstream from consuming, which will happen if the Subscription is chained
public ObserveOnSubscriber(Scheduler scheduler, Subscriber<? super T> child, boolean delayError, int bufferSize) {
this.child = child;
this.recursiveScheduler = scheduler.createWorker();
this.delayError = delayError;
int calculatedSize = (bufferSize > 0) ? bufferSize : RxRingBuffer.SIZE;
// this formula calculates the 75% of the bufferSize, rounded up to the next integer
this.limit = calculatedSize - (calculatedSize >> 2);
if (UnsafeAccess.isUnsafeAvailable()) {
queue = new SpscArrayQueue<Object>(calculatedSize);
} else {
queue = new SpscAtomicArrayQueue<Object>(calculatedSize);
}
// signal that this is an async operator capable of receiving this many
request(calculatedSize);
}

void init() {
// don't want this code in the constructor because `this` can escape through the
// setProducer call
Subscriber<? super T> localChild = child;

localChild.setProducer(new Producer() {

@Override
public void request(long n) {
if (n > 0L) {
BackpressureUtils.getAndAddRequest(requested, n);
schedule();
}
}

});
localChild.add(recursiveScheduler);
localChild.add(this);
}

@Override
public void onNext(final T t) {
if (isUnsubscribed() || finished) {
return;
}
if (!queue.offer(NotificationLite.next(t))) {
onError(new MissingBackpressureException());
return;
}
schedule();
}

@Override
public void onCompleted() {
if (isUnsubscribed() || finished) {
return;
}
finished = true;
schedule();
}

@Override
public void onError(final Throwable e) {
if (isUnsubscribed() || finished) {
RxJavaHooks.onError(e);
return;
}
error = e;
finished = true;
schedule();
}

protected void schedule() {
if (counter.getAndIncrement() == 0) {
recursiveScheduler.schedule(this);
}
}

// only execute this from schedule()
@Override
public void call() {
long missed = 1L;
long currentEmission = emitted;

// these are accessed in a tight loop around atomics so
// loading them into local variables avoids the mandatory re-reading
// of the constant fields
final Queue<Object> q = this.queue;
final Subscriber<? super T> localChild = this.child;

// requested and counter are not included to avoid JIT issues with register spilling
// and their access is is amortized because they are part of the outer loop which runs
// less frequently (usually after each bufferSize elements)

for (;;) {
long requestAmount = requested.get();

while (requestAmount != currentEmission) {
boolean done = finished;
Object v = q.poll();
boolean empty = v == null;

if (checkTerminated(done, empty, localChild, q)) {
return;
}

if (empty) {
break;
}

localChild.onNext(NotificationLite.<T>getValue(v));

currentEmission++;
if (currentEmission == limit) {
requestAmount = BackpressureUtils.produced(requested, currentEmission);
request(currentEmission);
currentEmission = 0L;
}
}

if (requestAmount == currentEmission) {
if (checkTerminated(finished, q.isEmpty(), localChild, q)) {
return;
}
}

emitted = currentEmission;
missed = counter.addAndGet(-missed);
if (missed == 0L) {
break;
}
}
}

boolean checkTerminated(boolean done, boolean isEmpty, Subscriber<? super T> a, Queue<Object> q) {
if (a.isUnsubscribed()) {
q.clear();
return true;
}

if (done) {
if (delayError) {
if (isEmpty) {
Throwable e = error;
try {
if (e != null) {
a.onError(e);
} else {
a.onCompleted();
}
} finally {
recursiveScheduler.unsubscribe();
}
}
} else {
Throwable e = error;
if (e != null) {
q.clear();
try {
a.onError(e);
} finally {
recursiveScheduler.unsubscribe();
}
return true;
} else
if (isEmpty) {
try {
a.onCompleted();
} finally {
recursiveScheduler.unsubscribe();
}
return true;
}
}

}

return false;
}
}