Copyright (c) 2016-present, RxJava Contributors. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.
/** * Copyright (c) 2016-present, RxJava Contributors. * * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in * compliance with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software distributed under the License is * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See * the License for the specific language governing permissions and limitations under the License. */
package io.reactivex.internal.operators.maybe; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.atomic.*; import io.reactivex.annotations.Nullable; import org.reactivestreams.Subscriber; import io.reactivex.*; import io.reactivex.disposables.*; import io.reactivex.internal.functions.ObjectHelper; import io.reactivex.internal.fuseable.SimpleQueue; import io.reactivex.internal.subscriptions.*; import io.reactivex.internal.util.*; import io.reactivex.plugins.RxJavaPlugins;
Run all MaybeSources of an array at once and signal their values as they become available.
Type parameters:
  • <T> – the value type
/** * Run all MaybeSources of an array at once and signal their values as they become available. * * @param <T> the value type */
public final class MaybeMergeArray<T> extends Flowable<T> { final MaybeSource<? extends T>[] sources; public MaybeMergeArray(MaybeSource<? extends T>[] sources) { this.sources = sources; } @Override protected void subscribeActual(Subscriber<? super T> s) { MaybeSource<? extends T>[] maybes = sources; int n = maybes.length; SimpleQueueWithConsumerIndex<Object> queue; if (n <= bufferSize()) { queue = new MpscFillOnceSimpleQueue<Object>(n); } else { queue = new ClqSimpleQueue<Object>(); } MergeMaybeObserver<T> parent = new MergeMaybeObserver<T>(s, n, queue); s.onSubscribe(parent); AtomicThrowable e = parent.error; for (MaybeSource<? extends T> source : maybes) { if (parent.isCancelled() || e.get() != null) { return; } source.subscribe(parent); } } static final class MergeMaybeObserver<T> extends BasicIntQueueSubscription<T> implements MaybeObserver<T> { private static final long serialVersionUID = -660395290758764731L; final Subscriber<? super T> downstream; final CompositeDisposable set; final AtomicLong requested; final SimpleQueueWithConsumerIndex<Object> queue; final AtomicThrowable error; final int sourceCount; volatile boolean cancelled; boolean outputFused; long consumed; MergeMaybeObserver(Subscriber<? super T> actual, int sourceCount, SimpleQueueWithConsumerIndex<Object> queue) { this.downstream = actual; this.sourceCount = sourceCount; this.set = new CompositeDisposable(); this.requested = new AtomicLong(); this.error = new AtomicThrowable(); this.queue = queue; } @Override public int requestFusion(int mode) { if ((mode & ASYNC) != 0) { outputFused = true; return ASYNC; } return NONE; } @Nullable @SuppressWarnings("unchecked") @Override public T poll() throws Exception { for (;;) { Object o = queue.poll(); if (o != NotificationLite.COMPLETE) { return (T)o; } } } @Override public boolean isEmpty() { return queue.isEmpty(); } @Override public void clear() { queue.clear(); } @Override public void request(long n) { if (SubscriptionHelper.validate(n)) { BackpressureHelper.add(requested, n); drain(); } } @Override public void cancel() { if (!cancelled) { cancelled = true; set.dispose(); if (getAndIncrement() == 0) { queue.clear(); } } } @Override public void onSubscribe(Disposable d) { set.add(d); } @Override public void onSuccess(T value) { queue.offer(value); drain(); } @Override public void onError(Throwable e) { if (error.addThrowable(e)) { set.dispose(); queue.offer(NotificationLite.COMPLETE); drain(); } else { RxJavaPlugins.onError(e); } } @Override public void onComplete() { queue.offer(NotificationLite.COMPLETE); drain(); } boolean isCancelled() { return cancelled; } @SuppressWarnings("unchecked") void drainNormal() { int missed = 1; Subscriber<? super T> a = downstream; SimpleQueueWithConsumerIndex<Object> q = queue; long e = consumed; for (;;) { long r = requested.get(); while (e != r) { if (cancelled) { q.clear(); return; } Throwable ex = error.get(); if (ex != null) { q.clear(); a.onError(error.terminate()); return; } if (q.consumerIndex() == sourceCount) { a.onComplete(); return; } Object v = q.poll(); if (v == null) { break; } if (v != NotificationLite.COMPLETE) { a.onNext((T)v); e++; } } if (e == r) { Throwable ex = error.get(); if (ex != null) { q.clear(); a.onError(error.terminate()); return; } while (q.peek() == NotificationLite.COMPLETE) { q.drop(); } if (q.consumerIndex() == sourceCount) { a.onComplete(); return; } } consumed = e; missed = addAndGet(-missed); if (missed == 0) { break; } } } void drainFused() { int missed = 1; Subscriber<? super T> a = downstream; SimpleQueueWithConsumerIndex<Object> q = queue; for (;;) { if (cancelled) { q.clear(); return; } Throwable ex = error.get(); if (ex != null) { q.clear(); a.onError(ex); return; } boolean d = q.producerIndex() == sourceCount; if (!q.isEmpty()) { a.onNext(null); } if (d) { a.onComplete(); return; } missed = addAndGet(-missed); if (missed == 0) { break; } } } void drain() { if (getAndIncrement() != 0) { return; } if (outputFused) { drainFused(); } else { drainNormal(); } } } interface SimpleQueueWithConsumerIndex<T> extends SimpleQueue<T> { @Nullable @Override T poll(); T peek(); void drop(); int consumerIndex(); int producerIndex(); } static final class MpscFillOnceSimpleQueue<T> extends AtomicReferenceArray<T> implements SimpleQueueWithConsumerIndex<T> { private static final long serialVersionUID = -7969063454040569579L; final AtomicInteger producerIndex; int consumerIndex; MpscFillOnceSimpleQueue(int length) { super(length); this.producerIndex = new AtomicInteger(); } @Override public boolean offer(T value) { ObjectHelper.requireNonNull(value, "value is null"); int idx = producerIndex.getAndIncrement(); if (idx < length()) { lazySet(idx, value); return true; } return false; } @Override public boolean offer(T v1, T v2) { throw new UnsupportedOperationException(); } @Nullable @Override public T poll() { int ci = consumerIndex; if (ci == length()) { return null; } AtomicInteger pi = producerIndex; for (;;) { T v = get(ci); if (v != null) { consumerIndex = ci + 1; lazySet(ci, null); return v; } if (pi.get() == ci) { return null; } } } @Override public T peek() { int ci = consumerIndex; if (ci == length()) { return null; } return get(ci); } @Override public void drop() { int ci = consumerIndex; lazySet(ci, null); consumerIndex = ci + 1; } @Override public boolean isEmpty() { return consumerIndex == producerIndex(); } @Override public void clear() { while (poll() != null && !isEmpty()) { } } @Override public int consumerIndex() { return consumerIndex; } @Override public int producerIndex() { return producerIndex.get(); } } static final class ClqSimpleQueue<T> extends ConcurrentLinkedQueue<T> implements SimpleQueueWithConsumerIndex<T> { private static final long serialVersionUID = -4025173261791142821L; int consumerIndex; final AtomicInteger producerIndex; ClqSimpleQueue() { this.producerIndex = new AtomicInteger(); } @Override public boolean offer(T v1, T v2) { throw new UnsupportedOperationException(); } @Override public boolean offer(T e) { producerIndex.getAndIncrement(); return super.offer(e); } @Nullable @Override public T poll() { T v = super.poll(); if (v != null) { consumerIndex++; } return v; } @Override public int consumerIndex() { return consumerIndex; } @Override public int producerIndex() { return producerIndex.get(); } @Override public void drop() { poll(); } } }