package org.apache.avro.file;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.io.Closeable;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import org.apache.avro.AvroRuntimeException;
import org.apache.avro.InvalidAvroMagicException;
import org.apache.avro.Schema;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.DatumReader;
public class DataFileStream<D> implements Iterator<D>, Iterable<D>, Closeable {
public static final class {
Schema ;
Map<String, byte[]> = new HashMap<>();
private transient List<String> = new ArrayList<>();
byte[] = new byte[DataFileConstants.SYNC_SIZE];
private () {
}
}
private DatumReader<D> reader;
private long blockSize;
private boolean availableBlock = false;
private Header ;
BinaryDecoder vin;
BinaryDecoder datumIn = null;
ByteBuffer blockBuffer;
long blockCount;
long blockRemaining;
byte[] syncBuffer = new byte[DataFileConstants.SYNC_SIZE];
private Codec codec;
/**
 * Creates a stream over {@code in} and immediately reads the file header from
 * it.
 *
 * @param in     stream positioned at the start of the data file
 * @param reader reader used to decode every entry
 * @throws IOException if the header cannot be read or is not valid
 */
public DataFileStream(InputStream in, DatumReader<D> reader) throws IOException {
this.reader = reader;
initialize(in);
}
/**
 * Creates a stream that only remembers {@code reader}. No input is attached
 * and no header is read here; an initialize method must be called before the
 * stream can be used.
 *
 * @param reader reader used to decode every entry
 * @throws IOException declared in the signature; this body itself does not
 *                     throw it
 */
protected DataFileStream(DatumReader<D> reader) throws IOException {
this.reader = reader;
}
/**
 * Attaches this stream to {@code in} and reads the file header: the magic
 * bytes, the metadata map, and the sync marker. The schema is then parsed
 * from the metadata, the codec is resolved, and the datum reader is told the
 * schema.
 *
 * @param in stream positioned at the start of the data file
 * @throws IOException if the magic bytes cannot be read or do not match, if
 *                     the header has no schema entry, or if reading fails
 */
void initialize(InputStream in) throws IOException {
  this.header = new Header();
  this.vin = DecoderFactory.get().binaryDecoder(in, vin);

  byte[] magic = new byte[DataFileConstants.MAGIC.length];
  try {
    vin.readFixed(magic);
  } catch (IOException e) {
    throw new IOException("Not an Avro data file.", e);
  }
  if (!Arrays.equals(DataFileConstants.MAGIC, magic)) {
    throw new InvalidAvroMagicException("Not an Avro data file.");
  }

  // The metadata map arrives in chunks; mapNext() returns 0 after the last one.
  long l = vin.readMapStart();
  if (l > 0) {
    do {
      for (long i = 0; i < l; i++) {
        String key = vin.readString(null).toString();
        ByteBuffer value = vin.readBytes(null);
        byte[] bb = new byte[value.remaining()];
        value.get(bb);
        header.meta.put(key, bb);
        header.metaKeyList.add(key);
      }
    } while ((l = vin.mapNext()) != 0);
  }
  vin.readFixed(header.sync);

  // Freeze the key list so getMetaKeys() cannot be used to alter the header.
  header.metaKeyList = Collections.unmodifiableList(header.metaKeyList);

  // Without this check a header lacking the schema entry hands null to the
  // parser and fails with an uninformative NullPointerException.
  String schemaJson = getMetaString(DataFileConstants.SCHEMA);
  if (schemaJson == null) {
    throw new IOException(
        "Not an Avro data file: header metadata has no \"" + DataFileConstants.SCHEMA + "\" entry.");
  }
  header.schema = new Schema.Parser().setValidate(false).parse(schemaJson);
  this.codec = resolveCodec();
  reader.setSchema(header.schema);
}
void (InputStream in, Header header) throws IOException {
this.header = header;
this.codec = resolveCodec();
reader.setSchema(header.schema);
}
/**
 * Builds the codec named by the header's codec metadata entry, falling back
 * to the null codec when the entry is absent.
 *
 * @return a new codec instance
 */
Codec resolveCodec() {
  final String codecName = getMetaString(DataFileConstants.CODEC);
  if (codecName == null) {
    return CodecFactory.nullCodec().createInstance();
  }
  return CodecFactory.fromString(codecName).createInstance();
}
public Header () {
return header;
}
/**
 * Returns the schema stored in this stream's header.
 *
 * @return the header's schema
 */
public Schema getSchema() {
return header.schema;
}
/**
 * Returns the header's list of metadata keys. When the header was read by
 * {@code initialize(InputStream)}, that list is wrapped as unmodifiable.
 *
 * @return the metadata keys
 */
public List<String> getMetaKeys() {
return header.metaKeyList;
}
/**
 * Looks up the raw value of a metadata entry in the header.
 *
 * @param key metadata key
 * @return the array stored in the header's map for {@code key} (not a copy),
 *         or whatever the map returns for a key it does not hold
 */
public byte[] getMeta(String key) {
return header.meta.get(key);
}
/**
 * Returns a metadata value decoded as UTF-8 text.
 *
 * @param key metadata key
 * @return the decoded value, or {@code null} when {@link #getMeta(String)}
 *         returns {@code null}
 */
public String getMetaString(String key) {
  final byte[] raw = getMeta(key);
  return (raw == null) ? null : new String(raw, StandardCharsets.UTF_8);
}
/**
 * Returns a metadata value parsed as a decimal {@code long}.
 *
 * @param key metadata key
 * @return the parsed value
 * @throws NumberFormatException if {@link #getMetaString(String)} returns
 *                               {@code null} or text that
 *                               {@link Long#parseLong(String)} rejects
 */
public long getMetaLong(String key) {
return Long.parseLong(getMetaString(key));
}
/**
 * Returns this stream itself as the iterator; no separate iterator object is
 * created.
 *
 * @return {@code this}
 */
@Override
public Iterator<D> iterator() {
return this;
}
/** Block most recently loaded by hasNext(); handed back on the next load so it can be reused. */
private DataBlock block = null;
/**
 * Reports whether another entry can be read, loading and decompressing the
 * next block when the current one has no entries left.
 *
 * @return {@code true} if an entry is available; {@code false} if there is
 *         none or an {@link EOFException} occurred while loading a block
 * @throws AvroRuntimeException wrapping any other {@link IOException},
 *                              including the one raised when the previous
 *                              block's decoder still has unread bytes
 */
@Override
public boolean hasNext() {
  try {
    if (blockRemaining != 0) {
      return true;
    }
    // All counted entries were returned, so the block's decoder must be drained too.
    if (datumIn != null && !datumIn.isEnd()) {
      throw new IOException("Block read partially, the data may be corrupt");
    }
    if (hasNextBlock()) {
      block = nextRawBlock(block);
      block.decompressUsing(codec);
      blockBuffer = block.getAsByteBuffer();
      final int start = blockBuffer.arrayOffset() + blockBuffer.position();
      datumIn = DecoderFactory.get().binaryDecoder(blockBuffer.array(), start, blockBuffer.remaining(), datumIn);
    }
    return blockRemaining != 0;
  } catch (EOFException e) {
    return false;
  } catch (IOException e) {
    throw new AvroRuntimeException(e);
  }
}
/**
 * Returns the next entry without reusing an existing object.
 *
 * @return the next entry
 * @throws NoSuchElementException if no entry is left
 * @throws AvroRuntimeException   wrapping any {@link IOException} raised while
 *                                reading
 */
@Override
public D next() {
try {
return next(null);
} catch (IOException e) {
throw new AvroRuntimeException(e);
}
}
/**
 * Reads the next entry, passing {@code reuse} to the datum reader.
 * {@link #blockFinished()} is invoked after the last entry of a block has
 * been read.
 *
 * @param reuse object handed to the datum reader for reuse; may be
 *              {@code null}
 * @return the entry produced by the datum reader
 * @throws NoSuchElementException if no entry is left
 * @throws IOException            if reading the entry fails
 */
public D next(D reuse) throws IOException {
  if (!hasNext()) {
    throw new NoSuchElementException();
  }
  final D datum = reader.read(reuse, datumIn);
  blockRemaining--;
  if (blockRemaining == 0) {
    blockFinished();
  }
  return datum;
}
/**
 * Returns the current block's buffer as a whole and marks the block as
 * consumed. Only allowed before any entry of the block has been read.
 *
 * @return the buffer holding the current block's data
 * @throws NoSuchElementException if no entry is left
 * @throws IllegalStateException  if some entries of the block were already
 *                                read
 * @throws IOException            declared in the signature
 */
public ByteBuffer nextBlock() throws IOException {
  if (!hasNext()) {
    throw new NoSuchElementException();
  }
  if (blockRemaining != blockCount) {
    throw new IllegalStateException("Not at block start.");
  }
  final ByteBuffer current = blockBuffer;
  // Drop the decoder and the remaining count so the next hasNext() loads a new block.
  datumIn = null;
  blockRemaining = 0;
  return current;
}
/**
 * Returns the entry count taken from the most recently read block header.
 *
 * @return number of entries in the current block
 */
public long getBlockCount() {
return blockCount;
}
/**
 * Returns the byte size taken from the most recently read block header.
 *
 * @return size in bytes of the current block as stored in the file
 */
public long getBlockSize() {
return blockSize;
}
/**
 * Hook called by {@link #next(Object)} right after the last entry of a block
 * has been read. Does nothing here; being protected, it can be overridden by
 * subclasses.
 *
 * @throws IOException declared so that overriding implementations may throw it
 */
protected void blockFinished() throws IOException {
}
/**
 * Reports whether a block is ready to be read, reading the next block header
 * (entry count and byte size) from the input when none is pending.
 * <p>
 * The stream's block state is updated only after the whole header has been
 * read and validated. A header cut short by end of input, or one with an
 * invalid size, therefore leaves {@code blockRemaining}, {@code blockCount}
 * and {@code blockSize} untouched, and {@link #hasNext()} cannot mistake the
 * partial read for a loaded block.
 *
 * @return {@code true} if a block header is pending; {@code false} at end of
 *         input, including when the input ends inside a block header
 * @throws AvroRuntimeException wrapping the {@link IOException} raised for an
 *                              invalid block size or a failed read
 */
boolean hasNextBlock() {
  try {
    if (availableBlock) {
      return true;
    }
    if (vin.isEnd()) {
      return false;
    }
    final long entryCount = vin.readLong();
    final long byteSize = vin.readLong();
    if (byteSize > Integer.MAX_VALUE || byteSize < 0) {
      throw new IOException("Block size invalid or too large for this " + "implementation: " + byteSize);
    }
    // Commit only now that both header fields are known to be good.
    blockRemaining = entryCount;
    blockCount = entryCount;
    blockSize = byteSize;
    availableBlock = true;
    return true;
  } catch (EOFException eof) {
    return false;
  } catch (IOException e) {
    throw new AvroRuntimeException(e);
  }
}
DataBlock (DataBlock reuse) throws IOException {
if (!hasNextBlock()) {
throw new NoSuchElementException();
}
if (reuse == null || reuse.data.length < (int) blockSize) {
reuse = new DataBlock(blockRemaining, (int) blockSize);
} else {
reuse.numEntries = blockRemaining;
reuse.blockSize = (int) blockSize;
}
vin.readFixed(reuse.data, 0, reuse.blockSize);
vin.readFixed(syncBuffer);
availableBlock = false;
if (!Arrays.equals(syncBuffer, header.sync))
throw new IOException("Invalid sync!");
return reuse;
}
/**
 * Not supported.
 *
 * @throws UnsupportedOperationException always
 */
@Override
public void remove() {
throw new UnsupportedOperationException();
}
/**
 * Closes the input stream that the decoder {@code vin} reads from.
 *
 * @throws IOException if closing the stream fails
 */
@Override
public void close() throws IOException {
vin.inputStream().close();
}
/**
 * A block of serialized entries: a region of a byte array, described by
 * {@code offset} and {@code blockSize}, together with the number of entries
 * it contains.
 */
static class DataBlock {
  /** Backing array; the block occupies {@code blockSize} bytes from {@code offset}. */
  private byte[] data;
  private long numEntries;
  private int blockSize;
  private int offset = 0;
  /** Whether {@link #writeBlockTo} flushes the encoder after writing. */
  private boolean flushOnWrite = true;

  /** Allocates an empty block of {@code blockSize} bytes starting at index 0. */
  private DataBlock(long numEntries, int blockSize) {
    this.data = new byte[blockSize];
    this.numEntries = numEntries;
    this.blockSize = blockSize;
  }

  /**
   * Wraps the remaining bytes of {@code block} without copying them.
   *
   * @param block      array-backed buffer whose remaining bytes form the block
   * @param numEntries number of entries the block holds
   */
  DataBlock(ByteBuffer block, long numEntries) {
    this.data = block.array();
    this.blockSize = block.remaining();
    this.offset = block.arrayOffset() + block.position();
    this.numEntries = numEntries;
  }

  /** Returns the backing array; the block may start at a non-zero offset in it. */
  byte[] getData() {
    return data;
  }

  long getNumEntries() {
    return numEntries;
  }

  int getBlockSize() {
    return blockSize;
  }

  boolean isFlushOnWrite() {
    return flushOnWrite;
  }

  void setFlushOnWrite(boolean flushOnWrite) {
    this.flushOnWrite = flushOnWrite;
  }

  /** Returns a buffer over exactly the bytes of this block. */
  ByteBuffer getAsByteBuffer() {
    return ByteBuffer.wrap(data, offset, blockSize);
  }

  /**
   * Replaces this block's contents with the result of decompressing them.
   *
   * @param c codec to decompress with
   * @throws IOException if decompression fails
   */
  void decompressUsing(Codec c) throws IOException {
    adopt(c.decompress(getAsByteBuffer()));
  }

  /**
   * Replaces this block's contents with the result of compressing them.
   *
   * @param c codec to compress with
   * @throws IOException if compression fails
   */
  void compressUsing(Codec c) throws IOException {
    adopt(c.compress(getAsByteBuffer()));
  }

  /**
   * Makes this block describe the remaining bytes of {@code result}. The
   * offset is taken from the buffer as well, so a result that does not start
   * at the old offset of its array is still read from the right place.
   */
  private void adopt(ByteBuffer result) {
    data = result.array();
    offset = result.arrayOffset() + result.position();
    blockSize = result.remaining();
  }

  /**
   * Writes the entry count, the byte size, the block bytes and then
   * {@code sync} to {@code e}, flushing afterwards when
   * {@code flushOnWrite} is set.
   *
   * @param e    encoder to write to
   * @param sync sync marker written after the block bytes
   * @throws IOException if writing fails
   */
  void writeBlockTo(BinaryEncoder e, byte[] sync) throws IOException {
    e.writeLong(this.numEntries);
    e.writeLong(this.blockSize);
    e.writeFixed(this.data, offset, this.blockSize);
    e.writeFixed(sync);
    if (flushOnWrite) {
      e.flush();
    }
  }
}
}