/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.lucene.index;

import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.LongSupplier;
import java.util.stream.Collectors;

import org.apache.lucene.store.AlreadyClosedException;
import org.apache.lucene.store.Directory;
import org.apache.lucene.util.CollectionUtil;
import org.apache.lucene.util.IOUtils;
import org.apache.lucene.util.InfoStream;

/** Holds shared SegmentReader instances. IndexWriter uses
 *  SegmentReaders for 1) applying deletes/DV updates, 2) doing
 *  merges, 3) handing out a real-time reader. This pool
 *  reuses instances of the SegmentReaders in all these
 *  places if it is in "near real-time mode" (getReader()
 *  has been called on this instance). */
final class ReaderPool implements Closeable {

  private final Map<SegmentCommitInfo,ReadersAndUpdates> readerMap = new HashMap<>();
  private final Directory directory;
  private final Directory originalDirectory;
  private final FieldInfos.FieldNumbers fieldNumbers;
  private final LongSupplier completedDelGenSupplier;
  private final InfoStream infoStream;
  private final SegmentInfos segmentInfos;
  private final String softDeletesField;
  private final Map<String, String> readerAttributes;
  // This is a "write once" variable (like the organic dye
  // on a DVD-R that may or may not be heated by a laser and
  // then cooled to permanently record the event): it's
  // false, by default until {@link #enableReaderPooling()}
  // is called for the first time,
  // at which point it's switched to true and never changes
  // back to false. Once this is true, we hold open and
  // reuse SegmentReader instances internally for applying
  // deletes, doing merges, and reopening near real-time
  // readers.
  // in practice this should be called once the readers are likely
  // to be needed and reused ie if IndexWriter#getReader is called.
  private volatile boolean poolReaders;
  private final AtomicBoolean closed = new AtomicBoolean(false);

  ReaderPool(Directory directory, Directory originalDirectory, SegmentInfos segmentInfos,
             FieldInfos.FieldNumbers fieldNumbers, LongSupplier completedDelGenSupplier, InfoStream infoStream,
             String softDeletesField, StandardDirectoryReader reader,
             Map<String, String> readerAttributes) throws IOException {
    this.directory = directory;
    this.originalDirectory = originalDirectory;
    this.segmentInfos = segmentInfos;
    this.fieldNumbers = fieldNumbers;
    this.completedDelGenSupplier = completedDelGenSupplier;
    this.infoStream = infoStream;
    this.softDeletesField = softDeletesField;
    this.readerAttributes = readerAttributes;
    if (reader != null) {
      // Pre-enroll all segment readers into the reader pool; this is necessary so
      // any in-memory NRT live docs are correctly carried over, and so NRT readers
      // pulled from this IW share the same segment reader:
      List<LeafReaderContext> leaves = reader.leaves();
      assert segmentInfos.size() == leaves.size();
      for (int i = 0; i < leaves.size(); i++) {
        LeafReaderContext leaf = leaves.get(i);
        SegmentReader segReader = (SegmentReader) leaf.reader();
        SegmentReader newReader = new SegmentReader(segmentInfos.info(i), segReader, segReader.getLiveDocs(),
            segReader.getHardLiveDocs(), segReader.numDocs(), true);
        readerMap.put(newReader.getOriginalSegmentInfo(),
            new ReadersAndUpdates(segmentInfos.getIndexCreatedVersionMajor(), newReader,
                newPendingDeletes(newReader, newReader.getOriginalSegmentInfo()), readerAttributes));
      }
    }
  }
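  // A minimal lifecycle sketch (hypothetical caller code, not part of this class; the
  // "readerPool" and "info" names are assumptions for illustration). Every get() with
  // create=true must later be balanced by a release():
  //
  //   readerPool.enableReaderPooling();                   // switch to near real-time mode
  //   ReadersAndUpdates rld = readerPool.get(info, true); // incRef'd for the caller
  //   try {
  //     // ... apply deletes or doc-values updates through rld ...
  //   } finally {
  //     readerPool.release(rld, true);                    // matches the incRef in get()
  //   }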
  /** Asserts this info still exists in IW's segment infos */
  synchronized boolean assertInfoIsLive(SegmentCommitInfo info) {
    int idx = segmentInfos.indexOf(info);
    assert idx != -1 : "info=" + info + " isn't live";
    assert segmentInfos.info(idx) == info : "info=" + info + " doesn't match live info in segmentInfos";
    return true;
  }
  /**
   * Drops reader for the given {@link SegmentCommitInfo} if it's pooled
   * @return <code>true</code> if a reader is pooled
   */
  synchronized boolean drop(SegmentCommitInfo info) throws IOException {
    final ReadersAndUpdates rld = readerMap.get(info);
    if (rld != null) {
      assert info == rld.info;
      readerMap.remove(info);
      rld.dropReaders();
      return true;
    }
    return false;
  }
  /**
   * Returns the sum of the ram used by all the buffered readers and updates, in bytes
   */
  synchronized long ramBytesUsed() {
    long bytes = 0;
    for (ReadersAndUpdates rld : readerMap.values()) {
      bytes += rld.ramBytesUsed.get();
    }
    return bytes;
  }
  /**
   * Returns <code>true</code> iff any of the buffered readers and updates has at least one pending delete
   */
  synchronized boolean anyDeletions() {
    for (ReadersAndUpdates rld : readerMap.values()) {
      if (rld.getDelCount() > 0) {
        return true;
      }
    }
    return false;
  }
  /**
   * Enables reader pooling for this pool. This should be called once the readers in this pool are shared with an
   * outside resource like an NRT reader. Once reader pooling is enabled, a {@link ReadersAndUpdates} will be kept
   * around in the reader pool on calling {@link #release(ReadersAndUpdates, boolean)} until the segment gets
   * dropped via calls to {@link #drop(SegmentCommitInfo)}, {@link #dropAll()} or {@link #close()}.
   * Reader pooling is disabled upon construction but can't be disabled again once it's enabled.
   */
  void enableReaderPooling() {
    poolReaders = true;
  }

  boolean isReaderPoolingEnabled() {
    return poolReaders;
  }
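  // Sketch of the one-way latch semantics (illustrative only): poolReaders can only go
  // from false to true, so repeated calls are harmless and pooling cannot be disabled again:
  //
  //   assert readerPool.isReaderPoolingEnabled() == false; // default after construction
  //   readerPool.enableReaderPooling();
  //   readerPool.enableReaderPooling();                    // idempotent
  //   assert readerPool.isReaderPoolingEnabled();          // stays true from here on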
  /**
   * Releases the {@link ReadersAndUpdates}. This should only be called if
   * {@link #get(SegmentCommitInfo, boolean)} is called with the create parameter set to true.
   * @return <code>true</code> if any files were written by this release call.
   */
  synchronized boolean release(ReadersAndUpdates rld, boolean assertInfoLive) throws IOException {
    boolean changed = false;
    // Matches incRef in get:
    rld.decRef();

    if (rld.refCount() == 0) {
      // This happens if the segment was just merged away,
      // while a buffered deletes packet was still applying deletes/updates to it.
      assert readerMap.containsKey(rld.info) == false :
          "seg=" + rld.info + " has refCount 0 but still unexpectedly exists in the reader pool";
    } else {
      // Pool still holds a ref:
      assert rld.refCount() > 0 : "refCount=" + rld.refCount() + " reader=" + rld.info;

      if (poolReaders == false && rld.refCount() == 1 && readerMap.containsKey(rld.info)) {
        // This is the last ref to this RLD, and we're not
        // pooling, so remove it:
        if (rld.writeLiveDocs(directory)) {
          // Make sure we only write del docs for a live segment:
          assert assertInfoLive == false || assertInfoIsLive(rld.info);
          // Must checkpoint because we just
          // created new _X_N.del and field updates files;
          // don't call IW.checkpoint because that also
          // increments SIS.version, which we do not want to
          // do here: it was done previously (after we
          // invoked BDS.applyDeletes), whereas here all we
          // did was move the state to disk:
          changed = true;
        }
        if (rld.writeFieldUpdates(directory, fieldNumbers, completedDelGenSupplier.getAsLong(), infoStream)) {
          changed = true;
        }
        if (rld.getNumDVUpdates() == 0) {
          rld.dropReaders();
          readerMap.remove(rld.info);
        } else {
          // We are forced to pool this segment until its deletes fully apply (no delGen gaps)
        }
      }
    }
    return changed;
  }

  @Override
  public synchronized void close() throws IOException {
    if (closed.compareAndSet(false, true)) {
      dropAll();
    }
  }
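  // Illustration of the release() contract above (hypothetical caller code, assumed names):
  // when pooling is disabled and the caller held the last reference, release() flushes any
  // live docs / doc-values updates and evicts the entry; when pooling is enabled, the entry
  // stays in readerMap until drop()/dropAll()/close():
  //
  //   ReadersAndUpdates rld = readerPool.get(info, true);
  //   // ... mutate rld ...
  //   boolean wroteFiles = readerPool.release(rld, true);
  //   if (wroteFiles) {
  //     // caller is expected to checkpoint, since new _X_N.del / DV update files now exist
  //   }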
  /**
   * Writes all doc values updates to disk if there are any.
   * @return <code>true</code> iff any files were written
   */
  boolean writeAllDocValuesUpdates() throws IOException {
    Collection<ReadersAndUpdates> copy;
    synchronized (this) {
      // this needs to be protected by the reader pool lock otherwise we hit ConcurrentModificationException
      copy = new HashSet<>(readerMap.values());
    }
    boolean any = false;
    for (ReadersAndUpdates rld : copy) {
      any |= rld.writeFieldUpdates(directory, fieldNumbers, completedDelGenSupplier.getAsLong(), infoStream);
    }
    return any;
  }
  /**
   * Writes all doc values updates for the given segments to disk if there are any,
   * and marks each of their pooled readers as merging.
   * @return <code>true</code> iff any files were written
   */
  boolean writeDocValuesUpdatesForMerge(List<SegmentCommitInfo> infos) throws IOException {
    boolean any = false;
    for (SegmentCommitInfo info : infos) {
      ReadersAndUpdates rld = get(info, false);
      if (rld != null) {
        any |= rld.writeFieldUpdates(directory, fieldNumbers, completedDelGenSupplier.getAsLong(), infoStream);
        rld.setIsMerging();
      }
    }
    return any;
  }
  /**
   * Returns a list of all currently maintained ReadersAndUpdates, sorted by their ram consumption largest to
   * smallest. This list can also contain readers that don't consume any ram at this point, i.e. don't have any
   * updates buffered.
   */
  List<ReadersAndUpdates> getReadersByRam() {
    class RamRecordingHolder {
      final ReadersAndUpdates updates;
      final long ramBytesUsed;
      RamRecordingHolder(ReadersAndUpdates updates) {
        this.updates = updates;
        this.ramBytesUsed = updates.ramBytesUsed.get();
      }
    }
    final ArrayList<RamRecordingHolder> readersByRam;
    synchronized (this) {
      if (readerMap.isEmpty()) {
        return Collections.emptyList();
      }
      readersByRam = new ArrayList<>(readerMap.size());
      for (ReadersAndUpdates rld : readerMap.values()) {
        // We have to record the ram usage once and then sort, since the ram usage can change
        // concurrently and that would confuse the sort or hit an assertion; the lock we acquire
        // here is not enough, we would need to lock all ReadersAndUpdates to make sure it
        // doesn't change:
        readersByRam.add(new RamRecordingHolder(rld));
      }
    }
    // Sort this outside of the lock by largest ramBytesUsed:
    CollectionUtil.introSort(readersByRam, (a, b) -> Long.compare(b.ramBytesUsed, a.ramBytesUsed));
    return Collections.unmodifiableList(readersByRam.stream().map(h -> h.updates).collect(Collectors.toList()));
  }
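  // A possible use of getReadersByRam() (a sketch, not necessarily how IndexWriter does it):
  // write out the most RAM-hungry buffered updates first until enough memory is freed.
  // "budgetBytes" is a hypothetical threshold introduced for this illustration:
  //
  //   for (ReadersAndUpdates rld : readerPool.getReadersByRam()) { // largest first
  //     rld.writeFieldUpdates(directory, fieldNumbers, completedDelGenSupplier.getAsLong(), infoStream);
  //     if (readerPool.ramBytesUsed() < budgetBytes) {
  //       break;
  //     }
  //   }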
  /** Removes all our references to readers, and commits
   *  any pending changes. */
  synchronized void dropAll() throws IOException {
    Throwable priorE = null;
    final Iterator<Map.Entry<SegmentCommitInfo,ReadersAndUpdates>> it = readerMap.entrySet().iterator();
    while (it.hasNext()) {
      final ReadersAndUpdates rld = it.next().getValue();

      // Important to remove as-we-go, not with .clear()
      // in the end, in case we hit an exception;
      // otherwise we could over-decref if close() is
      // called again:
      it.remove();

      // NOTE: it is allowed that these decRefs do not
      // actually close the SRs; this happens when a
      // near real-time reader is kept open after the
      // IndexWriter instance is closed:
      try {
        rld.dropReaders();
      } catch (Throwable t) {
        priorE = IOUtils.useOrSuppress(priorE, t);
      }
    }
    assert readerMap.size() == 0;
    if (priorE != null) {
      throw IOUtils.rethrowAlways(priorE);
    }
  }
  /**
   * Commit live docs changes for the segment readers for
   * the provided infos.
   *
   * @throws IOException If there is a low-level I/O error
   */
  synchronized boolean commit(SegmentInfos infos) throws IOException {
    boolean atLeastOneChange = false;
    for (SegmentCommitInfo info : infos) {
      final ReadersAndUpdates rld = readerMap.get(info);
      if (rld != null) {
        assert rld.info == info;
        boolean changed = rld.writeLiveDocs(directory);
        changed |= rld.writeFieldUpdates(directory, fieldNumbers, completedDelGenSupplier.getAsLong(), infoStream);
        if (changed) {
          // Make sure we only write del docs for a live segment:
          assert assertInfoIsLive(info);

          // Must checkpoint because we just
          // created new _X_N.del and field updates files;
          // don't call IW.checkpoint because that also
          // increments SIS.version, which we do not want to
          // do here: it was done previously (after we
          // invoked BDS.applyDeletes), whereas here all we
          // did was move the state to disk:
          atLeastOneChange = true;
        }
      }
    }
    return atLeastOneChange;
  }
  /**
   * Returns <code>true</code> iff there are any buffered doc values updates. Otherwise <code>false</code>.
   */
  synchronized boolean anyDocValuesChanges() {
    for (ReadersAndUpdates rld : readerMap.values()) {
      // NOTE: we don't check for pending deletes because deletes carry over in RAM to NRT readers
      if (rld.getNumDVUpdates() != 0) {
        return true;
      }
    }
    return false;
  }
  /**
   * Obtain a {@link ReadersAndUpdates} instance from the
   * readerPool. If create is true, you must later call
   * {@link #release(ReadersAndUpdates, boolean)}.
   */
  synchronized ReadersAndUpdates get(SegmentCommitInfo info, boolean create) {
    assert info.info.dir == originalDirectory : "info.dir=" + info.info.dir + " vs " + originalDirectory;
    if (closed.get()) {
      assert readerMap.isEmpty() : "Reader map is not empty: " + readerMap;
      throw new AlreadyClosedException("ReaderPool is already closed");
    }

    ReadersAndUpdates rld = readerMap.get(info);
    if (rld == null) {
      if (create == false) {
        return null;
      }
      rld = new ReadersAndUpdates(segmentInfos.getIndexCreatedVersionMajor(), info, newPendingDeletes(info),
          readerAttributes);
      // Steal initial reference:
      readerMap.put(info, rld);
    } else {
      assert rld.info == info : "rld.info=" + rld.info + " info=" + info + " isLive?=" +
          assertInfoIsLive(rld.info) + " vs " + assertInfoIsLive(info);
    }

    if (create) {
      // Return ref to caller:
      rld.incRef();
    }

    assert noDups();

    return rld;
  }

  private PendingDeletes newPendingDeletes(SegmentCommitInfo info) {
    return softDeletesField == null ? new PendingDeletes(info) : new PendingSoftDeletes(softDeletesField, info);
  }

  private PendingDeletes newPendingDeletes(SegmentReader reader, SegmentCommitInfo info) {
    return softDeletesField == null ? new PendingDeletes(reader, info) :
        new PendingSoftDeletes(softDeletesField, reader, info);
  }

  // Make sure that every segment appears only once in the
  // pool:
  private boolean noDups() {
    Set<String> seen = new HashSet<>();
    for (SegmentCommitInfo info : readerMap.keySet()) {
      assert !seen.contains(info.info.name);
      seen.add(info.info.name);
    }
    return true;
  }
}