Copyright (c) 2016-present, RxJava Contributors. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.
/** * Copyright (c) 2016-present, RxJava Contributors. * * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in * compliance with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software distributed under the License is * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See * the License for the specific language governing permissions and limitations under the License. */
package io.reactivex.internal.operators.flowable; import java.util.*; import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicInteger; import org.reactivestreams.Publisher; import io.reactivex.*; import io.reactivex.internal.util.*; import io.reactivex.plugins.RxJavaPlugins; import io.reactivex.subscribers.DisposableSubscriber;
Returns an Iterable that blocks until the Observable emits another item, then returns that item.

Type parameters:
  • <T> – the value type
/** * Returns an Iterable that blocks until the Observable emits another item, then returns that item. * <p> * <img width="640" height="490" src="https://github.com/ReactiveX/RxJava/wiki/images/rx-operators/B.next.png" alt=""> * * @param <T> the value type */
public final class BlockingFlowableNext<T> implements Iterable<T> { final Publisher<? extends T> source; public BlockingFlowableNext(Publisher<? extends T> source) { this.source = source; } @Override public Iterator<T> iterator() { NextSubscriber<T> nextSubscriber = new NextSubscriber<T>(); return new NextIterator<T>(source, nextSubscriber); } // test needs to access the observer.waiting flag static final class NextIterator<T> implements Iterator<T> { private final NextSubscriber<T> subscriber; private final Publisher<? extends T> items; private T next; private boolean hasNext = true; private boolean isNextConsumed = true; private Throwable error; private boolean started; NextIterator(Publisher<? extends T> items, NextSubscriber<T> subscriber) { this.items = items; this.subscriber = subscriber; } @Override public boolean hasNext() { if (error != null) { // If any error has already been thrown, throw it again. throw ExceptionHelper.wrapOrThrow(error); } // Since an iterator should not be used in different thread, // so we do not need any synchronization. if (!hasNext) { // the iterator has reached the end. return false; } // next has not been used yet. return !isNextConsumed || moveToNext(); } private boolean moveToNext() { try { if (!started) { started = true; // if not started, start now subscriber.setWaiting(); Flowable.<T>fromPublisher(items) .materialize().subscribe(subscriber); } Notification<T> nextNotification = subscriber.takeNext(); if (nextNotification.isOnNext()) { isNextConsumed = false; next = nextNotification.getValue(); return true; } // If an observable is completed or fails, // hasNext() always return false. hasNext = false; if (nextNotification.isOnComplete()) { return false; } if (nextNotification.isOnError()) { error = nextNotification.getError(); throw ExceptionHelper.wrapOrThrow(error); } throw new IllegalStateException("Should not reach here"); } catch (InterruptedException e) { subscriber.dispose(); error = e; throw ExceptionHelper.wrapOrThrow(e); } } @Override public T next() { if (error != null) { // If any error has already been thrown, throw it again. throw ExceptionHelper.wrapOrThrow(error); } if (hasNext()) { isNextConsumed = true; return next; } else { throw new NoSuchElementException("No more elements"); } } @Override public void remove() { throw new UnsupportedOperationException("Read only iterator"); } } static final class NextSubscriber<T> extends DisposableSubscriber<Notification<T>> { private final BlockingQueue<Notification<T>> buf = new ArrayBlockingQueue<Notification<T>>(1); final AtomicInteger waiting = new AtomicInteger(); @Override public void onComplete() { // ignore } @Override public void onError(Throwable e) { RxJavaPlugins.onError(e); } @Override public void onNext(Notification<T> args) { if (waiting.getAndSet(0) == 1 || !args.isOnNext()) { Notification<T> toOffer = args; while (!buf.offer(toOffer)) { Notification<T> concurrentItem = buf.poll(); // in case if we won race condition with onComplete/onError method if (concurrentItem != null && !concurrentItem.isOnNext()) { toOffer = concurrentItem; } } } } public Notification<T> takeNext() throws InterruptedException { setWaiting(); BlockingHelper.verifyNonBlocking(); return buf.take(); } void setWaiting() { waiting.set(1); } } }