/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.lucene.index;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.locks.ReentrantLock;

import org.apache.lucene.util.ThreadInterruptedException;

DocumentsWriterPerThreadPool controls ThreadState instances and their thread assignments during indexing. Each ThreadState holds a reference to a DocumentsWriterPerThread that is once a ThreadState is obtained from the pool exclusively used for indexing a single document by the obtaining thread. Each indexing thread must obtain such a ThreadState to make progress. Depending on the DocumentsWriterPerThreadPool implementation ThreadState assignments might differ from document to document.

Once a DocumentsWriterPerThread is selected for flush the thread pool is reusing the flushing DocumentsWriterPerThreads ThreadState with a new DocumentsWriterPerThread instance.

/** * {@link DocumentsWriterPerThreadPool} controls {@link ThreadState} instances * and their thread assignments during indexing. Each {@link ThreadState} holds * a reference to a {@link DocumentsWriterPerThread} that is once a * {@link ThreadState} is obtained from the pool exclusively used for indexing a * single document by the obtaining thread. Each indexing thread must obtain * such a {@link ThreadState} to make progress. Depending on the * {@link DocumentsWriterPerThreadPool} implementation {@link ThreadState} * assignments might differ from document to document. * <p> * Once a {@link DocumentsWriterPerThread} is selected for flush the thread pool * is reusing the flushing {@link DocumentsWriterPerThread}s ThreadState with a * new {@link DocumentsWriterPerThread} instance. * </p> */
final class DocumentsWriterPerThreadPool {
ThreadState references and guards a DocumentsWriterPerThread instance that is used during indexing to build a in-memory index segment. ThreadState also holds all flush related per-thread data controlled by DocumentsWriterFlushControl.

A ThreadState, its methods and members should only accessed by one thread a time. Users must acquire the lock via ReentrantLock.lock() and release the lock in a finally block via ReentrantLock.unlock() before accessing the state.

/** * {@link ThreadState} references and guards a * {@link DocumentsWriterPerThread} instance that is used during indexing to * build a in-memory index segment. {@link ThreadState} also holds all flush * related per-thread data controlled by {@link DocumentsWriterFlushControl}. * <p> * A {@link ThreadState}, its methods and members should only accessed by one * thread a time. Users must acquire the lock via {@link ThreadState#lock()} * and release the lock in a finally block via {@link ThreadState#unlock()} * before accessing the state. */
@SuppressWarnings("serial") final static class ThreadState extends ReentrantLock { DocumentsWriterPerThread dwpt; // TODO this should really be part of DocumentsWriterFlushControl // write access guarded by DocumentsWriterFlushControl volatile boolean flushPending = false; // TODO this should really be part of DocumentsWriterFlushControl // write access guarded by DocumentsWriterFlushControl long bytesUsed = 0; // set by DocumentsWriter after each indexing op finishes volatile long lastSeqNo; ThreadState(DocumentsWriterPerThread dpwt) { this.dwpt = dpwt; } private void reset() { assert this.isHeldByCurrentThread(); this.dwpt = null; this.bytesUsed = 0; this.flushPending = false; } boolean isInitialized() { assert this.isHeldByCurrentThread(); return dwpt != null; }
Returns the number of currently active bytes in this ThreadState's DocumentsWriterPerThread
/** * Returns the number of currently active bytes in this ThreadState's * {@link DocumentsWriterPerThread} */
public long getBytesUsedPerThread() { assert this.isHeldByCurrentThread(); // public for FlushPolicy return bytesUsed; } /** * Returns this {@link ThreadState}s {@link DocumentsWriterPerThread} */ public DocumentsWriterPerThread getDocumentsWriterPerThread() { assert this.isHeldByCurrentThread(); // public for FlushPolicy return dwpt; }
Returns true iff this ThreadState is marked as flush pending otherwise false
/** * Returns <code>true</code> iff this {@link ThreadState} is marked as flush * pending otherwise <code>false</code> */
public boolean isFlushPending() { return flushPending; } } private final List<ThreadState> threadStates = new ArrayList<>(); private final List<ThreadState> freeList = new ArrayList<>(); private int takenThreadStatePermits = 0;
Returns the active number of ThreadState instances.
/** * Returns the active number of {@link ThreadState} instances. */
synchronized int getActiveThreadStateCount() { return threadStates.size(); } synchronized void lockNewThreadStates() { // this is similar to a semaphore - we need to acquire all permits ie. takenThreadStatePermits must be == 0 // any call to lockNewThreadStates() must be followed by unlockNewThreadStates() otherwise we will deadlock at some // point assert takenThreadStatePermits >= 0; takenThreadStatePermits++; } synchronized void unlockNewThreadStates() { assert takenThreadStatePermits > 0; takenThreadStatePermits--; if (takenThreadStatePermits == 0) { notifyAll(); } }
Returns a new ThreadState iff any new state is available otherwise null.

NOTE: the returned ThreadState is already locked iff non- null.

Returns:a new ThreadState iff any new state is available otherwise null
/** * Returns a new {@link ThreadState} iff any new state is available otherwise * <code>null</code>. * <p> * NOTE: the returned {@link ThreadState} is already locked iff non- * <code>null</code>. * * @return a new {@link ThreadState} iff any new state is available otherwise * <code>null</code> */
private synchronized ThreadState newThreadState() { assert takenThreadStatePermits >= 0; while (takenThreadStatePermits > 0) { // we can't create new thread-states while not all permits are available try { wait(); } catch (InterruptedException ie) { throw new ThreadInterruptedException(ie); } } ThreadState threadState = new ThreadState(null); threadState.lock(); // lock so nobody else will get this ThreadState threadStates.add(threadState); return threadState; } DocumentsWriterPerThread reset(ThreadState threadState) { assert threadState.isHeldByCurrentThread(); final DocumentsWriterPerThread dwpt = threadState.dwpt; threadState.reset(); return dwpt; } void recycle(DocumentsWriterPerThread dwpt) { // don't recycle DWPT by default } // TODO: maybe we should try to do load leveling here: we want roughly even numbers // of items (docs, deletes, DV updates) to most take advantage of concurrency while flushing
This method is used by DocumentsWriter/FlushControl to obtain a ThreadState to do an indexing operation (add/updateDocument).
/** This method is used by DocumentsWriter/FlushControl to obtain a ThreadState to do an indexing operation (add/updateDocument). */
ThreadState getAndLock() { ThreadState threadState = null; synchronized (this) { if (freeList.isEmpty()) { // ThreadState is already locked before return by this method: return newThreadState(); } else { // Important that we are LIFO here! This way if number of concurrent indexing threads was once high, but has now reduced, we only use a // limited number of thread states: threadState = freeList.remove(freeList.size()-1); if (threadState.dwpt == null) { // This thread-state is not initialized, e.g. it // was just flushed. See if we can instead find // another free thread state that already has docs // indexed. This way if incoming thread concurrency // has decreased, we don't leave docs // indefinitely buffered, tying up RAM. This // will instead get those thread states flushed, // freeing up RAM for larger segment flushes: for(int i=0;i<freeList.size();i++) { ThreadState ts = freeList.get(i); if (ts.dwpt != null) { // Use this one instead, and swap it with // the un-initialized one: freeList.set(i, threadState); threadState = ts; break; } } } } } // This could take time, e.g. if the threadState is [briefly] checked for flushing: threadState.lock(); return threadState; } void release(ThreadState state) { state.unlock(); synchronized (this) { freeList.add(state); } }
Returns the ith active ThreadState where i is the given ord.
Params:
Returns:the ith active ThreadState where i is the given ord.
/** * Returns the <i>i</i>th active {@link ThreadState} where <i>i</i> is the * given ord. * * @param ord * the ordinal of the {@link ThreadState} * @return the <i>i</i>th active {@link ThreadState} where <i>i</i> is the * given ord. */
synchronized ThreadState getThreadState(int ord) { return threadStates.get(ord); } // TODO: merge this with getActiveThreadStateCount: they are the same! synchronized int getMaxThreadStates() { return threadStates.size(); } }