author     Vlad Rozov <vrozov@apache.org>              2018-05-09 13:24:11 -0700
committer  Timothy Farkas <timothyfarkas@apache.org>   2018-06-13 17:11:29 -0700
commit     ac8e69847659582e36c89fd52bb0856ab3bfbd21 (patch)
tree       917e881ff0bc693badbf152e629a75505ace8a90 /exec/java-exec/src/main/java/org
parent     98dbc3a222990703aebe983883779763e0cdc1e9 (diff)
DRILL-6353: Upgrade Parquet MR dependencies
closes #1259
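
Note: the first group of files in this change reworks Drill's statistics-based Parquet predicates for the upgraded parquet-mr API: min/max comparisons move from genericGetMin()/genericGetMax() to compareMinToValue()/compareMaxToValue(), boolean columns are read through BooleanStatistics, and null-count checks must now tolerate statistics whose null count is not set. A minimal sketch of the null-count guard, assuming only the parquet-mr Statistics API (the wrapper class name is illustrative, not part of the patch):

    import org.apache.parquet.column.statistics.Statistics;

    // Illustrative helpers mirroring the guarded null-count checks in ParquetPredicatesHelper.
    public class NullCountGuardExample {

      // Only claim "all nulls" when the statistics actually carry a null count.
      static boolean isAllNulls(Statistics<?> stat, long rowCount) {
        return stat.isNumNullsSet() && stat.getNumNulls() == rowCount;
      }

      // An unset null count is treated as "no nulls known", matching the patched hasNoNulls().
      static boolean hasNoNulls(Statistics<?> stat) {
        return !stat.isNumNullsSet() || stat.getNumNulls() == 0;
      }
    }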
Diffstat (limited to 'exec/java-exec/src/main/java/org')
16 files changed, 147 insertions, 152 deletions
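
Note: the most repeated change across the files below is the move from the deprecated static ParquetFileReader.readFooter(...) calls and CompatibilityUtil buffer reads to the InputFile / ParquetReadOptions / HadoopStreams API of the newer parquet-mr. A minimal sketch of the footer-reading idiom adopted in AbstractParquetScanBatchCreator and Metadata, assuming only standard parquet-mr and Hadoop classes (the wrapper class and method names are illustrative):

    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.parquet.ParquetReadOptions;
    import org.apache.parquet.hadoop.ParquetFileReader;
    import org.apache.parquet.hadoop.metadata.ParquetMetadata;
    import org.apache.parquet.hadoop.util.HadoopInputFile;

    import static org.apache.parquet.format.converter.ParquetMetadataConverter.NO_FILTER;

    public class FooterReadExample {
      // Read a full footer (no row-group filtering) through the non-deprecated API.
      static ParquetMetadata readFooter(Configuration conf, String path) throws IOException {
        ParquetReadOptions options = ParquetReadOptions.builder()
            .withMetadataFilter(NO_FILTER)
            .build();
        try (ParquetFileReader reader =
                 ParquetFileReader.open(HadoopInputFile.fromPath(new Path(path), conf), options)) {
          return reader.getFooter(); // the footer remains usable after the reader is closed
        }
      }
    }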
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/stat/ParquetComparisonPredicate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/stat/ParquetComparisonPredicate.java index 9e561ad36..ebceefb43 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/stat/ParquetComparisonPredicate.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/stat/ParquetComparisonPredicate.java @@ -113,7 +113,7 @@ public class ParquetComparisonPredicate<C extends Comparable<C>> extends Logical // can drop when left's max < right's min, or right's max < left's min final C leftMin = leftStat.genericGetMin(); final C rightMin = rightStat.genericGetMin(); - return leftStat.genericGetMax().compareTo(rightMin) < 0 || rightStat.genericGetMax().compareTo(leftMin) < 0; + return (leftStat.compareMaxToValue(rightMin) < 0) || (rightStat.compareMaxToValue(leftMin) < 0); }) { @Override public String toString() { @@ -132,7 +132,7 @@ public class ParquetComparisonPredicate<C extends Comparable<C>> extends Logical return new ParquetComparisonPredicate<C>(left, right, (leftStat, rightStat) -> { // can drop when left's max <= right's min. final C rightMin = rightStat.genericGetMin(); - return leftStat.genericGetMax().compareTo(rightMin) <= 0; + return leftStat.compareMaxToValue(rightMin) <= 0; }); } @@ -146,7 +146,7 @@ public class ParquetComparisonPredicate<C extends Comparable<C>> extends Logical return new ParquetComparisonPredicate<C>(left, right, (leftStat, rightStat) -> { // can drop when left's max < right's min. final C rightMin = rightStat.genericGetMin(); - return leftStat.genericGetMax().compareTo(rightMin) < 0; + return leftStat.compareMaxToValue(rightMin) < 0; }); } @@ -160,7 +160,7 @@ public class ParquetComparisonPredicate<C extends Comparable<C>> extends Logical return new ParquetComparisonPredicate<C>(left, right, (leftStat, rightStat) -> { // can drop when right's max <= left's min. final C leftMin = leftStat.genericGetMin(); - return rightStat.genericGetMax().compareTo(leftMin) <= 0; + return rightStat.compareMaxToValue(leftMin) <= 0; }); } @@ -173,7 +173,7 @@ public class ParquetComparisonPredicate<C extends Comparable<C>> extends Logical return new ParquetComparisonPredicate<C>(left, right, (leftStat, rightStat) -> { // can drop when right's max < left's min. final C leftMin = leftStat.genericGetMin(); - return rightStat.genericGetMax().compareTo(leftMin) < 0; + return rightStat.compareMaxToValue(leftMin) < 0; }); } @@ -188,8 +188,8 @@ public class ParquetComparisonPredicate<C extends Comparable<C>> extends Logical // can drop when there is only one unique value. 
final C leftMax = leftStat.genericGetMax(); final C rightMax = rightStat.genericGetMax(); - return leftStat.genericGetMin().compareTo(leftMax) == 0 && rightStat.genericGetMin().compareTo(rightMax) == 0 && - leftStat.genericGetMax().compareTo(rightMax) == 0; + return leftStat.compareMinToValue(leftMax) == 0 && rightStat.compareMinToValue(rightMax) == 0 && + leftStat.compareMaxToValue(rightMax) == 0; }); } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/stat/ParquetIsPredicate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/stat/ParquetIsPredicate.java index 9b041020f..547dc0670 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/stat/ParquetIsPredicate.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/stat/ParquetIsPredicate.java @@ -23,6 +23,7 @@ import org.apache.drill.common.expression.SchemaPath; import org.apache.drill.common.expression.TypedFieldExpr; import org.apache.drill.common.expression.visitors.ExprVisitor; import org.apache.drill.exec.expr.fn.FunctionGenerationHelper; +import org.apache.parquet.column.statistics.BooleanStatistics; import org.apache.parquet.column.statistics.Statistics; import java.util.ArrayList; @@ -114,7 +115,7 @@ public class ParquetIsPredicate<C extends Comparable<C>> extends LogicalExpressi private static LogicalExpression createIsTruePredicate(LogicalExpression expr) { return new ParquetIsPredicate<Boolean>(expr, //if max value is not true or if there are all nulls -> canDrop - (exprStat, evaluator) -> !exprStat.genericGetMax().equals(Boolean.TRUE) || isAllNulls(exprStat, evaluator.getRowCount()) + (exprStat, evaluator) -> !((BooleanStatistics)exprStat).getMax() || isAllNulls(exprStat, evaluator.getRowCount()) ); } @@ -124,7 +125,7 @@ public class ParquetIsPredicate<C extends Comparable<C>> extends LogicalExpressi private static LogicalExpression createIsFalsePredicate(LogicalExpression expr) { return new ParquetIsPredicate<Boolean>(expr, //if min value is not false or if there are all nulls -> canDrop - (exprStat, evaluator) -> !exprStat.genericGetMin().equals(Boolean.FALSE) || isAllNulls(exprStat, evaluator.getRowCount()) + (exprStat, evaluator) -> ((BooleanStatistics)exprStat).getMin() || isAllNulls(exprStat, evaluator.getRowCount()) ); } @@ -134,7 +135,7 @@ public class ParquetIsPredicate<C extends Comparable<C>> extends LogicalExpressi private static LogicalExpression createIsNotTruePredicate(LogicalExpression expr) { return new ParquetIsPredicate<Boolean>(expr, //if min value is not false or if there are no nulls -> canDrop - (exprStat, evaluator) -> !exprStat.genericGetMin().equals(Boolean.FALSE) && hasNoNulls(exprStat) + (exprStat, evaluator) -> ((BooleanStatistics)exprStat).getMin() && hasNoNulls(exprStat) ); } @@ -144,7 +145,7 @@ public class ParquetIsPredicate<C extends Comparable<C>> extends LogicalExpressi private static LogicalExpression createIsNotFalsePredicate(LogicalExpression expr) { return new ParquetIsPredicate<Boolean>(expr, //if max value is not true or if there are no nulls -> canDrop - (exprStat, evaluator) -> !exprStat.genericGetMax().equals(Boolean.TRUE) && hasNoNulls(exprStat) + (exprStat, evaluator) -> !((BooleanStatistics)exprStat).getMax() && hasNoNulls(exprStat) ); } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/stat/ParquetPredicatesHelper.java b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/stat/ParquetPredicatesHelper.java index 7ff103644..f804a7b06 100644 --- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/stat/ParquetPredicatesHelper.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/stat/ParquetPredicatesHelper.java @@ -43,7 +43,7 @@ class ParquetPredicatesHelper { * False if at least one row is not null. */ static boolean isAllNulls(Statistics stat, long rowCount) { - return stat.getNumNulls() == rowCount; + return stat.isNumNullsSet() && stat.getNumNulls() == rowCount; } /** @@ -54,7 +54,7 @@ class ParquetPredicatesHelper { * False if the parquet file hasn't nulls. */ static boolean hasNoNulls(Statistics stat) { - return stat.getNumNulls() == 0; + return !stat.isNumNullsSet() || stat.getNumNulls() == 0; } } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/AbstractParquetScanBatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/AbstractParquetScanBatchCreator.java index 6a320b85b..dc09ce1b6 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/AbstractParquetScanBatchCreator.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/AbstractParquetScanBatchCreator.java @@ -33,11 +33,12 @@ import org.apache.drill.exec.store.parquet.columnreaders.ParquetRecordReader; import org.apache.drill.exec.store.parquet2.DrillParquetReader; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; +import org.apache.parquet.ParquetReadOptions; import org.apache.parquet.column.ColumnDescriptor; -import org.apache.parquet.format.converter.ParquetMetadataConverter; import org.apache.parquet.hadoop.CodecFactory; import org.apache.parquet.hadoop.ParquetFileReader; import org.apache.parquet.hadoop.metadata.ParquetMetadata; +import org.apache.parquet.hadoop.util.HadoopInputFile; import org.apache.parquet.schema.MessageType; import org.apache.parquet.schema.Type; @@ -50,6 +51,9 @@ import java.util.List; import java.util.Map; import java.util.concurrent.TimeUnit; +import static org.apache.drill.exec.store.parquet.metadata.Metadata.PARQUET_STRINGS_SIGNED_MIN_MAX_ENABLED; +import static org.apache.parquet.format.converter.ParquetMetadataConverter.NO_FILTER; + public abstract class AbstractParquetScanBatchCreator { private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(AbstractParquetScanBatchCreator.class); @@ -146,11 +150,15 @@ public abstract class AbstractParquetScanBatchCreator { protected abstract AbstractDrillFileSystemManager getDrillFileSystemCreator(OperatorContext operatorContext, OptionManager optionManager); private ParquetMetadata readFooter(Configuration conf, String path) throws IOException { - Configuration newConf = new Configuration(conf); + conf = new Configuration(conf); conf.setBoolean(ENABLE_BYTES_READ_COUNTER, false); conf.setBoolean(ENABLE_BYTES_TOTAL_COUNTER, false); conf.setBoolean(ENABLE_TIME_READ_COUNTER, false); - return ParquetFileReader.readFooter(newConf, new Path(path), ParquetMetadataConverter.NO_FILTER); + conf.setBoolean(PARQUET_STRINGS_SIGNED_MIN_MAX_ENABLED, true); + ParquetReadOptions options = ParquetReadOptions.builder().withMetadataFilter(NO_FILTER).build(); + try (ParquetFileReader reader = ParquetFileReader.open(HadoopInputFile.fromPath(new Path(path), conf), options)) { + return reader.getFooter(); + } } private boolean isComplex(ParquetMetadata footer) { diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ColumnDataReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ColumnDataReader.java index 
dcd40cf91..79294daea 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ColumnDataReader.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ColumnDataReader.java @@ -21,14 +21,13 @@ import io.netty.buffer.DrillBuf; import java.io.IOException; import java.io.OutputStream; -import java.nio.ByteBuffer; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.parquet.bytes.BytesInput; import org.apache.parquet.format.PageHeader; import org.apache.parquet.format.Util; -import org.apache.parquet.hadoop.util.CompatibilityUtil; +import org.apache.parquet.hadoop.util.HadoopStreams; public class ColumnDataReader { static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ColumnDataReader.class); @@ -58,11 +57,7 @@ public class ColumnDataReader { public void loadPage(DrillBuf target, int pageLength) throws IOException { target.clear(); - ByteBuffer directBuffer = target.nioBuffer(0, pageLength); - int lengthLeftToRead = pageLength; - while (lengthLeftToRead > 0) { - lengthLeftToRead -= CompatibilityUtil.getBuf(input, directBuffer, lengthLeftToRead); - } + HadoopStreams.wrap(input).read(target.nioBuffer(0, pageLength)); target.writerIndex(pageLength); } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/FooterGatherer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/FooterGatherer.java index ea34c7d8b..d1562c48c 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/FooterGatherer.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/FooterGatherer.java @@ -42,6 +42,7 @@ import org.apache.parquet.hadoop.metadata.ParquetMetadata; import com.google.common.base.Preconditions; import static org.apache.commons.lang3.builder.ToStringStyle.SHORT_PREFIX_STYLE; +import static org.apache.parquet.format.converter.ParquetMetadataConverter.NO_FILTER; public class FooterGatherer { static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FooterGatherer.class); @@ -160,7 +161,8 @@ public class FooterGatherer { footerBytes = ArrayUtils.subarray(footerBytes, start, start + size); } - ParquetMetadata metadata = ParquetFormatPlugin.parquetMetadataConverter.readParquetMetadata(new ByteArrayInputStream(footerBytes)); + final ByteArrayInputStream from = new ByteArrayInputStream(footerBytes); + ParquetMetadata metadata = ParquetFormatPlugin.parquetMetadataConverter.readParquetMetadata(from, NO_FILTER); Footer footer = new Footer(status.getPath(), metadata); return footer; } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetDirectByteBufferAllocator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetDirectByteBufferAllocator.java index 09f1b26e2..ba6aac943 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetDirectByteBufferAllocator.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetDirectByteBufferAllocator.java @@ -17,10 +17,11 @@ */ package org.apache.drill.exec.store.parquet; -import io.netty.buffer.ByteBuf; +import io.netty.buffer.DrillBuf; import java.nio.ByteBuffer; -import java.util.HashMap; +import java.util.IdentityHashMap; +import java.util.Map; import org.apache.drill.exec.memory.BufferAllocator; import org.apache.drill.exec.ops.OperatorContext; @@ -30,44 +31,42 @@ import org.apache.parquet.bytes.ByteBufferAllocator; /** * {@link ByteBufferAllocator} implementation that uses Drill's {@link BufferAllocator} 
to allocate and release * {@link ByteBuffer} objects.<br> - * To properly release an allocated {@link ByteBuf}, this class keeps track of it's corresponding {@link ByteBuffer} + * To properly release an allocated {@link DrillBuf}, this class keeps track of it's corresponding {@link ByteBuffer} * that was passed to the Parquet library. */ public class ParquetDirectByteBufferAllocator implements ByteBufferAllocator { private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ParquetDirectByteBufferAllocator.class); private final BufferAllocator allocator; - private final HashMap<Key, ByteBuf> allocatedBuffers = new HashMap<>(); + private final Map<ByteBuffer, DrillBuf> allocatedBuffers = new IdentityHashMap<>(); - public ParquetDirectByteBufferAllocator(OperatorContext o){ - allocator = o.getAllocator(); + public ParquetDirectByteBufferAllocator(OperatorContext o) { + this(o.getAllocator()); } public ParquetDirectByteBufferAllocator(BufferAllocator allocator) { this.allocator = allocator; } - @Override public ByteBuffer allocate(int sz) { - ByteBuf bb = allocator.buffer(sz); - ByteBuffer b = bb.nioBuffer(0, sz); - final Key key = new Key(b); - allocatedBuffers.put(key, bb); - logger.debug("ParquetDirectByteBufferAllocator: Allocated {} bytes. Allocated ByteBuffer id: {}", sz, key.hash); - return b; + DrillBuf drillBuf = allocator.buffer(sz); + ByteBuffer byteBuffer = drillBuf.nioBuffer(0, sz); + allocatedBuffers.put(byteBuffer, drillBuf); + logger.debug("{}: Allocated {} bytes. Allocated DrillBuf with id {} and ByteBuffer {}", this, sz, drillBuf.getId(), System.identityHashCode(byteBuffer)); + return byteBuffer; } @Override - public void release(ByteBuffer b) { - final Key key = new Key(b); - final ByteBuf bb = allocatedBuffers.get(key); + public void release(ByteBuffer byteBuffer) { + final DrillBuf drillBuf = allocatedBuffers.remove(byteBuffer); // The ByteBuffer passed in may already have been freed or not allocated by this allocator. // If it is not found in the allocated buffers, do nothing - if(bb != null) { - logger.debug("ParquetDirectByteBufferAllocator: Freed byte buffer. Allocated ByteBuffer id: {}", key.hash); - bb.release(); - allocatedBuffers.remove(key); + if (drillBuf != null) { + logger.debug("{}: Freed DrillBuf with id {} and ByteBuffer {}", this, drillBuf.getId(), System.identityHashCode(byteBuffer)); + drillBuf.release(); + } else { + logger.warn("{}: ByteBuffer {} is not present", this, System.identityHashCode(byteBuffer)); } } @@ -75,41 +74,4 @@ public class ParquetDirectByteBufferAllocator implements ByteBufferAllocator { public boolean isDirect() { return true; } - - /** - * ByteBuffer wrapper that computes a fixed hashcode. - * <br><br> - * Parquet only handles {@link ByteBuffer} objects, so we need to use them as keys to keep track of their corresponding - * {@link ByteBuf}, but {@link ByteBuffer} is mutable and it can't be used as a {@link HashMap} key as it is.<br> - * This class solves this by providing a fixed hashcode for {@link ByteBuffer} and uses reference equality in case - * of collisions (we don't need to compare the content of {@link ByteBuffer} because the object passed to - * {@link #release(ByteBuffer)} will be the same object returned from a previous {@link #allocate(int)}. 
- */ - private class Key { - final int hash; - final ByteBuffer buffer; - - Key(final ByteBuffer buffer) { - this.buffer = buffer; - // remember, we can't use buffer.hashCode() - this.hash = System.identityHashCode(buffer); - } - - @Override - public int hashCode() { - return hash; - } - - @Override - public boolean equals(Object obj) { - if (this == obj) { - return true; - } - if (!(obj instanceof Key)) { - return false; - } - final Key key = (Key) obj; - return hash == key.hash && buffer == key.buffer; - } - } } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordWriter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordWriter.java index 0e40c9e36..091792660 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordWriter.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordWriter.java @@ -54,8 +54,10 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.parquet.bytes.CapacityByteArrayOutputStream; import org.apache.parquet.column.ColumnWriteStore; +import org.apache.parquet.column.ParquetProperties; import org.apache.parquet.column.ParquetProperties.WriterVersion; import org.apache.parquet.column.impl.ColumnWriteStoreV1; +import org.apache.parquet.column.values.factory.DefaultV1ValuesWriterFactory; import org.apache.parquet.hadoop.CodecFactory; import org.apache.parquet.hadoop.ParquetColumnChunkPageWriteStore; import org.apache.parquet.hadoop.ParquetFileWriter; @@ -241,8 +243,15 @@ public class ParquetRecordWriter extends ParquetOutputRecordWriter { // once PARQUET-1006 will be resolved pageStore = new ParquetColumnChunkPageWriteStore(codecFactory.getCompressor(codec), schema, initialSlabSize, pageSize, new ParquetDirectByteBufferAllocator(oContext)); - store = new ColumnWriteStoreV1(pageStore, pageSize, initialPageBufferSize, enableDictionary, - writerVersion, new ParquetDirectByteBufferAllocator(oContext)); + ParquetProperties parquetProperties = ParquetProperties.builder() + .withPageSize(pageSize) + .withDictionaryEncoding(enableDictionary) + .withDictionaryPageSize(initialPageBufferSize) + .withWriterVersion(writerVersion) + .withAllocator(new ParquetDirectByteBufferAllocator(oContext)) + .withValuesWriterFactory(new DefaultV1ValuesWriterFactory()) + .build(); + store = new ColumnWriteStoreV1(pageStore, parquetProperties); MessageColumnIO columnIO = new ColumnIOFactory(false).getColumnIO(this.schema); consumer = columnIO.getRecordWriter(store); setUp(schema, consumer); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/PageReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/PageReader.java index bf75695b6..01d06444b 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/PageReader.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/PageReader.java @@ -17,6 +17,7 @@ */ package org.apache.drill.exec.store.parquet.columnreaders; +import com.google.common.base.Preconditions; import com.google.common.base.Stopwatch; import io.netty.buffer.ByteBufUtil; import org.apache.drill.exec.util.filereader.BufferedDirectBufInputStream; @@ -30,6 +31,7 @@ import org.apache.drill.exec.util.filereader.DirectBufInputStream; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import 
org.apache.parquet.bytes.ByteBufferInputStream; import org.apache.parquet.bytes.BytesInput; import org.apache.parquet.column.Dictionary; import org.apache.parquet.column.Encoding; @@ -250,7 +252,7 @@ class PageReader { } public static BytesInput asBytesInput(DrillBuf buf, int offset, int length) throws IOException { - return BytesInput.from(buf.nioBuffer(offset, length), 0, length); + return BytesInput.from(buf.nioBuffer(offset, length)); } @@ -319,41 +321,44 @@ class PageReader { byteLength = pageHeader.uncompressed_page_size; - final ByteBuffer pageDataBuffer = pageData.nioBuffer(0, pageData.capacity()); + final ByteBufferInputStream in = ByteBufferInputStream.wrap(pageData.nioBuffer(0, pageData.capacity())); readPosInBytes = 0; if (parentColumnReader.getColumnDescriptor().getMaxRepetitionLevel() > 0) { repetitionLevels = rlEncoding.getValuesReader(parentColumnReader.columnDescriptor, ValuesType.REPETITION_LEVEL); - repetitionLevels.initFromPage(currentPageCount, pageDataBuffer, (int) readPosInBytes); + repetitionLevels.initFromPage(currentPageCount, in); // we know that the first value will be a 0, at the end of each list of repeated values we will hit another 0 indicating // a new record, although we don't know the length until we hit it (and this is a one way stream of integers) so we // read the first zero here to simplify the reading processes, and start reading the first value the same as all // of the rest. Effectively we are 'reading' the non-existent value in front of the first allowing direct access to // the first list of repetition levels - readPosInBytes = repetitionLevels.getNextOffset(); + readPosInBytes = in.position(); repetitionLevels.readInteger(); } - if (parentColumnReader.columnDescriptor.getMaxDefinitionLevel() != 0){ + if (parentColumnReader.columnDescriptor.getMaxDefinitionLevel() != 0) { parentColumnReader.currDefLevel = -1; definitionLevels = dlEncoding.getValuesReader(parentColumnReader.columnDescriptor, ValuesType.DEFINITION_LEVEL); - definitionLevels.initFromPage(currentPageCount, pageDataBuffer, (int) readPosInBytes); - readPosInBytes = definitionLevels.getNextOffset(); + definitionLevels.initFromPage(currentPageCount, in); + readPosInBytes = in.position(); if (!valueEncoding.usesDictionary()) { valueReader = valueEncoding.getValuesReader(parentColumnReader.columnDescriptor, ValuesType.VALUES); - valueReader.initFromPage(currentPageCount, pageDataBuffer, (int) readPosInBytes); + valueReader.initFromPage(currentPageCount, in); } } - if (parentColumnReader.columnDescriptor.getType() == PrimitiveType.PrimitiveTypeName.BOOLEAN) { + if (valueReader == null && parentColumnReader.columnDescriptor.getType() == PrimitiveType.PrimitiveTypeName.BOOLEAN) { valueReader = valueEncoding.getValuesReader(parentColumnReader.columnDescriptor, ValuesType.VALUES); - valueReader.initFromPage(currentPageCount, pageDataBuffer, (int) readPosInBytes); + valueReader.initFromPage(currentPageCount, in); } if (valueEncoding.usesDictionary()) { // initialize two of the dictionary readers, one is for determining the lengths of each value, the second is for // actually copying the values out into the vectors + Preconditions.checkState(readPosInBytes < pageData.capacity()); + int index = (int)readPosInBytes; + ByteBuffer byteBuffer = pageData.nioBuffer(index, pageData.capacity() - index); dictionaryLengthDeterminingReader = new DictionaryValuesReader(dictionary); - dictionaryLengthDeterminingReader.initFromPage(currentPageCount, pageDataBuffer, (int) readPosInBytes); + 
dictionaryLengthDeterminingReader.initFromPage(currentPageCount, ByteBufferInputStream.wrap(byteBuffer)); dictionaryValueReader = new DictionaryValuesReader(dictionary); - dictionaryValueReader.initFromPage(currentPageCount, pageDataBuffer, (int) readPosInBytes); + dictionaryValueReader.initFromPage(currentPageCount, ByteBufferInputStream.wrap(byteBuffer)); parentColumnReader.usingDictionary = true; } else { parentColumnReader.usingDictionary = false; @@ -445,25 +450,29 @@ class PageReader { * @throws IOException An IO related condition */ void resetDefinitionLevelReader(int skipCount) throws IOException { - if (parentColumnReader.columnDescriptor.getMaxDefinitionLevel() != 0) { - throw new UnsupportedOperationException("Unsupoorted Operation"); - } + Preconditions.checkState(parentColumnReader.columnDescriptor.getMaxDefinitionLevel() == 1); + Preconditions.checkState(currentPageCount > 0); + final Encoding rlEncoding = METADATA_CONVERTER.getEncoding(pageHeader.data_page_header.repetition_level_encoding); final Encoding dlEncoding = METADATA_CONVERTER.getEncoding(pageHeader.data_page_header.definition_level_encoding); - final ByteBuffer pageDataBuffer = pageData.nioBuffer(0, pageData.capacity()); - final int defStartPos = repetitionLevels != null ? repetitionLevels.getNextOffset() : 0; + + final ByteBufferInputStream in = ByteBufferInputStream.wrap(pageData.nioBuffer(0, pageData.capacity())); + + if (parentColumnReader.getColumnDescriptor().getMaxRepetitionLevel() > 0) { + repetitionLevels = rlEncoding.getValuesReader(parentColumnReader.columnDescriptor, ValuesType.REPETITION_LEVEL); + repetitionLevels.initFromPage(currentPageCount, in); + repetitionLevels.readInteger(); + } + definitionLevels = dlEncoding.getValuesReader(parentColumnReader.columnDescriptor, ValuesType.DEFINITION_LEVEL); parentColumnReader.currDefLevel = -1; // Now reinitialize the underlying decoder - assert currentPageCount > 0 : "Page count should be strictly upper than zero"; - definitionLevels.initFromPage(currentPageCount, pageDataBuffer, defStartPos); + definitionLevels.initFromPage(currentPageCount, in); // Skip values if requested by caller for (int idx = 0; idx < skipCount; ++idx) { definitionLevels.skip(); } } - - } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLenBulkPageReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLenBulkPageReader.java index b6205c1ef..385cb8369 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLenBulkPageReader.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLenBulkPageReader.java @@ -66,12 +66,7 @@ final class VarLenBulkPageReader { this.buffer.order(ByteOrder.nativeOrder()); if (pageInfoInput != null) { - this.pageInfo.pageData = pageInfoInput.pageData; - this.pageInfo.pageDataOff = pageInfoInput.pageDataOff; - this.pageInfo.pageDataLen = pageInfoInput.pageDataLen; - this.pageInfo.numPageFieldsRead = pageInfoInput.numPageFieldsRead; - this.pageInfo.definitionLevels = pageInfoInput.definitionLevels; - this.pageInfo.dictionaryValueReader = pageInfoInput.dictionaryValueReader; + set(pageInfoInput, false); } this.columnPrecInfo = columnPrecInfoInput; @@ -87,15 +82,17 @@ final class VarLenBulkPageReader { nullableDictionaryReader = new VarLenNullableDictionaryReader(buffer, pageInfo, columnPrecInfo, entry); } - final void set(PageDataInfo pageInfoInput) { + final void set(PageDataInfo pageInfoInput, boolean 
clear) { pageInfo.pageData = pageInfoInput.pageData; pageInfo.pageDataOff = pageInfoInput.pageDataOff; pageInfo.pageDataLen = pageInfoInput.pageDataLen; pageInfo.numPageFieldsRead = pageInfoInput.numPageFieldsRead; pageInfo.definitionLevels = pageInfoInput.definitionLevels; pageInfo.dictionaryValueReader = pageInfoInput.dictionaryValueReader; - - buffer.clear(); + pageInfo.numPageValues = pageInfoInput.numPageValues; + if (clear) { + buffer.clear(); + } } final VarLenColumnBulkEntry getEntry(int valuesToRead) { @@ -160,4 +157,4 @@ final class VarLenBulkPageReader { } } -}
\ No newline at end of file +} diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLenColumnBulkInput.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLenColumnBulkInput.java index 8daf2cc1a..1b3073759 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLenColumnBulkInput.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLenColumnBulkInput.java @@ -204,7 +204,7 @@ final class VarLenColumnBulkInput<V extends ValueVector> implements VarLenBulkIn buffPagePayload = new VarLenBulkPageReader(pageInfo, columnPrecInfo, callback); } else { - buffPagePayload.set(pageInfo); + buffPagePayload.set(pageInfo, true); } } else { if (buffPagePayload == null) { @@ -567,4 +567,4 @@ final class VarLenColumnBulkInput<V extends ValueVector> implements VarLenBulkIn } -}
\ No newline at end of file +} diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/metadata/Metadata.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/metadata/Metadata.java index ab655e921..a61cc1832 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/metadata/Metadata.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/metadata/Metadata.java @@ -39,17 +39,19 @@ import org.apache.drill.exec.store.parquet.ParquetFormatConfig; import org.apache.drill.exec.store.parquet.ParquetReaderUtility; import org.apache.drill.exec.util.DrillFileSystemUtil; import org.apache.drill.exec.util.ImpersonationUtil; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.BlockLocation; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.security.UserGroupInformation; +import org.apache.parquet.ParquetReadOptions; import org.apache.parquet.column.statistics.Statistics; -import org.apache.parquet.format.converter.ParquetMetadataConverter; import org.apache.parquet.hadoop.ParquetFileReader; import org.apache.parquet.hadoop.metadata.BlockMetaData; import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData; import org.apache.parquet.hadoop.metadata.ParquetMetadata; +import org.apache.parquet.hadoop.util.HadoopInputFile; import org.apache.parquet.schema.GroupType; import org.apache.parquet.schema.MessageType; import org.apache.parquet.schema.OriginalType; @@ -87,6 +89,7 @@ public class Metadata { public static final String[] OLD_METADATA_FILENAMES = {".drill.parquet_metadata.v2"}; public static final String METADATA_FILENAME = ".drill.parquet_metadata"; public static final String METADATA_DIRECTORIES_FILENAME = ".drill.parquet_metadata_directories"; + public static final String PARQUET_STRINGS_SIGNED_MIN_MAX_ENABLED = "parquet.strings.signed-min-max.enabled"; private final ParquetFormatConfig formatConfig; @@ -409,9 +412,16 @@ public class Metadata { final FileStatus file, final FileSystem fs) throws IOException, InterruptedException { final ParquetMetadata metadata; final UserGroupInformation processUserUgi = ImpersonationUtil.getProcessUserUGI(); + final Configuration conf = new Configuration(fs.getConf()); + final ParquetReadOptions parquetReadOptions = ParquetReadOptions.builder() + .useSignedStringMinMax(true) + .build(); try { - metadata = processUserUgi.doAs((PrivilegedExceptionAction<ParquetMetadata>) - () -> ParquetFileReader.readFooter(fs.getConf(), file, ParquetMetadataConverter.NO_FILTER)); + metadata = processUserUgi.doAs((PrivilegedExceptionAction<ParquetMetadata>)() -> { + try (ParquetFileReader parquetFileReader = ParquetFileReader.open(HadoopInputFile.fromStatus(file, conf), parquetReadOptions)) { + return parquetFileReader.getFooter(); + } + }); } catch(Exception e) { logger.error("Exception while reading footer of parquet file [Details - path: {}, owner: {}] as process user {}", file.getPath(), file.getOwner(), processUserUgi.getShortUserName(), e); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/util/filereader/BufferedDirectBufInputStream.java b/exec/java-exec/src/main/java/org/apache/drill/exec/util/filereader/BufferedDirectBufInputStream.java index f208d6e82..1d764b1b6 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/util/filereader/BufferedDirectBufInputStream.java +++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/util/filereader/BufferedDirectBufInputStream.java @@ -21,7 +21,7 @@ import com.google.common.base.Preconditions; import com.google.common.base.Stopwatch; import io.netty.buffer.DrillBuf; import org.apache.drill.exec.memory.BufferAllocator; -import org.apache.parquet.hadoop.util.CompatibilityUtil; +import org.apache.parquet.hadoop.util.HadoopStreams; import java.io.Closeable; import java.io.IOException; @@ -179,7 +179,7 @@ public class BufferedDirectBufInputStream extends DirectBufInputStream implement int nBytes = 0; if (bytesToRead > 0) { try { - nBytes = CompatibilityUtil.getBuf(getInputStream(), directBuffer, bytesToRead); + nBytes = HadoopStreams.wrap(getInputStream()).read(directBuffer); } catch (Exception e) { logger.error("Error reading from stream {}. Error was : {}", this.streamId, e.getMessage()); throw new IOException((e)); @@ -193,8 +193,8 @@ public class BufferedDirectBufInputStream extends DirectBufInputStream implement logger.trace( "PERF: Disk read complete. {}, StartOffset: {}, TotalByteSize: {}, BufferSize: {}, BytesRead: {}, Count: {}, " + "CurPosInStream: {}, CurPosInBuffer: {}, Time: {} ms", this.streamId, this.startOffset, - this.totalByteSize, this.bufSize, bytesRead, this.count, this.curPosInStream, this.curPosInBuffer, ((double) timer.elapsed(TimeUnit.MICROSECONDS)) - / 1000); + this.totalByteSize, this.bufSize, bytesRead, this.count, this.curPosInStream, this.curPosInBuffer, + ((double) timer.elapsed(TimeUnit.MICROSECONDS)) / 1000); } } } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/util/filereader/DirectBufInputStream.java b/exec/java-exec/src/main/java/org/apache/drill/exec/util/filereader/DirectBufInputStream.java index ae09a3708..ea2542eb8 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/util/filereader/DirectBufInputStream.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/util/filereader/DirectBufInputStream.java @@ -23,7 +23,8 @@ import io.netty.buffer.DrillBuf; import org.apache.drill.exec.memory.BufferAllocator; import org.apache.hadoop.fs.ByteBufferReadable; import org.apache.hadoop.fs.FSDataInputStream; -import org.apache.parquet.hadoop.util.CompatibilityUtil; +import org.apache.parquet.hadoop.util.HadoopStreams; +import org.apache.parquet.io.SeekableInputStream; import java.io.FilterInputStream; import java.io.IOException; @@ -86,12 +87,16 @@ public class DirectBufInputStream extends FilterInputStream { buf.clear(); ByteBuffer directBuffer = buf.nioBuffer(0, len); int lengthLeftToRead = len; + SeekableInputStream seekableInputStream = HadoopStreams.wrap(getInputStream()); while (lengthLeftToRead > 0) { if(logger.isTraceEnabled()) { logger.trace("PERF: Disk read start. 
{}, StartOffset: {}, TotalByteSize: {}", this.streamId, this.startOffset, this.totalByteSize); } Stopwatch timer = Stopwatch.createStarted(); - int bytesRead = CompatibilityUtil.getBuf(getInputStream(), directBuffer, lengthLeftToRead); + int bytesRead = seekableInputStream.read(directBuffer); + if (bytesRead < 0) { + return bytesRead; + } lengthLeftToRead -= bytesRead; if(logger.isTraceEnabled()) { logger.trace( @@ -113,7 +118,7 @@ public class DirectBufInputStream extends FilterInputStream { b.release(); throw e; } - if (bytesRead <= -1) { + if (bytesRead < 0) { b.release(); return null; } diff --git a/exec/java-exec/src/main/java/org/apache/parquet/hadoop/ColumnChunkIncReadStore.java b/exec/java-exec/src/main/java/org/apache/parquet/hadoop/ColumnChunkIncReadStore.java index 6e9db7e94..89731ff2a 100644 --- a/exec/java-exec/src/main/java/org/apache/parquet/hadoop/ColumnChunkIncReadStore.java +++ b/exec/java-exec/src/main/java/org/apache/parquet/hadoop/ColumnChunkIncReadStore.java @@ -46,7 +46,7 @@ import org.apache.parquet.format.Util; import org.apache.parquet.format.converter.ParquetMetadataConverter; import org.apache.parquet.hadoop.CodecFactory.BytesDecompressor; import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData; -import org.apache.parquet.hadoop.util.CompatibilityUtil; +import org.apache.parquet.hadoop.util.HadoopStreams; import io.netty.buffer.ByteBuf; @@ -163,12 +163,10 @@ public class ColumnChunkIncReadStore implements PageReadStore { ByteBuf buf = allocator.buffer(pageHeader.compressed_page_size); lastPage = buf; ByteBuffer buffer = buf.nioBuffer(0, pageHeader.compressed_page_size); - int lengthLeftToRead = pageHeader.compressed_page_size; - while (lengthLeftToRead > 0) { - lengthLeftToRead -= CompatibilityUtil.getBuf(in, buffer, lengthLeftToRead); - } + HadoopStreams.wrap(in).readFully(buffer); + buffer.flip(); return new DataPageV1( - decompressor.decompress(BytesInput.from(buffer, 0, pageHeader.compressed_page_size), pageHeader.getUncompressed_page_size()), + decompressor.decompress(BytesInput.from(buffer), pageHeader.getUncompressed_page_size()), pageHeader.data_page_header.num_values, pageHeader.uncompressed_page_size, fromParquetStatistics(pageHeader.data_page_header.statistics, columnDescriptor.getType()), @@ -182,28 +180,33 @@ public class ColumnChunkIncReadStore implements PageReadStore { buf = allocator.buffer(pageHeader.compressed_page_size); lastPage = buf; buffer = buf.nioBuffer(0, pageHeader.compressed_page_size); - lengthLeftToRead = pageHeader.compressed_page_size; - while (lengthLeftToRead > 0) { - lengthLeftToRead -= CompatibilityUtil.getBuf(in, buffer, lengthLeftToRead); - } + HadoopStreams.wrap(in).readFully(buffer); + buffer.flip(); DataPageHeaderV2 dataHeaderV2 = pageHeader.getData_page_header_v2(); int dataSize = compressedPageSize - dataHeaderV2.getRepetition_levels_byte_length() - dataHeaderV2.getDefinition_levels_byte_length(); BytesInput decompressedPageData = decompressor.decompress( - BytesInput.from(buffer, 0, pageHeader.compressed_page_size), + BytesInput.from(buffer), pageHeader.uncompressed_page_size); + ByteBuffer byteBuffer = decompressedPageData.toByteBuffer(); + int limit = byteBuffer.limit(); + byteBuffer.limit(dataHeaderV2.getRepetition_levels_byte_length()); + BytesInput repetitionLevels = BytesInput.from(byteBuffer.slice()); + byteBuffer.position(dataHeaderV2.getRepetition_levels_byte_length()); + byteBuffer.limit(dataHeaderV2.getRepetition_levels_byte_length() + dataHeaderV2.getDefinition_levels_byte_length()); + BytesInput 
definitionLevels = BytesInput.from(byteBuffer.slice()); + byteBuffer.position(dataHeaderV2.getRepetition_levels_byte_length() + dataHeaderV2.getDefinition_levels_byte_length()); + byteBuffer.limit(limit); + BytesInput data = BytesInput.from(byteBuffer.slice()); + return new DataPageV2( dataHeaderV2.getNum_rows(), dataHeaderV2.getNum_nulls(), dataHeaderV2.getNum_values(), - BytesInput.from(decompressedPageData.toByteBuffer(), 0, dataHeaderV2.getRepetition_levels_byte_length()), - BytesInput.from(decompressedPageData.toByteBuffer(), - dataHeaderV2.getRepetition_levels_byte_length(), - dataHeaderV2.getDefinition_levels_byte_length()), + repetitionLevels, + definitionLevels, parquetMetadataConverter.getEncoding(dataHeaderV2.getEncoding()), - BytesInput.from(decompressedPageData.toByteBuffer(), - dataHeaderV2.getRepetition_levels_byte_length() + dataHeaderV2.getDefinition_levels_byte_length(), - dataSize), + data, uncompressedPageSize, fromParquetStatistics(dataHeaderV2.getStatistics(), columnDescriptor.getType()), dataHeaderV2.isIs_compressed() diff --git a/exec/java-exec/src/main/java/org/apache/parquet/hadoop/ParquetColumnChunkPageWriteStore.java b/exec/java-exec/src/main/java/org/apache/parquet/hadoop/ParquetColumnChunkPageWriteStore.java index 93f9920bd..0ed224525 100644 --- a/exec/java-exec/src/main/java/org/apache/parquet/hadoop/ParquetColumnChunkPageWriteStore.java +++ b/exec/java-exec/src/main/java/org/apache/parquet/hadoop/ParquetColumnChunkPageWriteStore.java @@ -17,8 +17,6 @@ */ package org.apache.parquet.hadoop; -import static org.apache.parquet.column.statistics.Statistics.getStatsBasedOnType; - import java.io.Closeable; import java.io.IOException; import java.util.List; @@ -119,7 +117,7 @@ public class ParquetColumnChunkPageWriteStore implements PageWriteStore, Closeab this.path = path; this.compressor = compressor; this.buf = new CapacityByteArrayOutputStream(initialSlabSize, maxCapacityHint, allocator); - this.totalStatistics = getStatsBasedOnType(this.path.getType()); + this.totalStatistics = Statistics.createStats(this.path.getPrimitiveType()); } @Override @@ -226,11 +224,7 @@ public class ParquetColumnChunkPageWriteStore implements PageWriteStore, Closeab writer.writeDictionaryPage(dictionaryPage); // tracking the dictionary encoding is handled in writeDictionaryPage } - List<Encoding> encodings = Lists.newArrayList(); - encodings.addAll(rlEncodings); - encodings.addAll(dlEncodings); - encodings.addAll(dataEncodings); - writer.writeDataPages(BytesInput.from(buf), uncompressedLength, compressedLength, totalStatistics, encodings); + writer.writeDataPages(BytesInput.from(buf), uncompressedLength, compressedLength, totalStatistics, rlEncodings, dlEncodings, dataEncodings); writer.endColumn(); logger.debug( String.format( |
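
Note: because BytesInput.from(ByteBuffer) in the upgraded parquet-mr no longer accepts offset/length arguments, the DataPageV2 branch in ColumnChunkIncReadStore above carves the decompressed page into repetition levels, definition levels and values by adjusting the buffer's position/limit and slicing. A standalone sketch of that slicing pattern, with illustrative names that are not part of the patch:

    import java.nio.ByteBuffer;

    import org.apache.parquet.bytes.BytesInput;

    public class PageSliceExample {
      // Split a decompressed DataPageV2 payload into its three sections without copying.
      // Assumes page.position() == 0 on entry.
      static BytesInput[] splitPage(ByteBuffer page, int rlLength, int dlLength) {
        int limit = page.limit();

        page.limit(rlLength);                        // [0, rlLength) -> repetition levels
        BytesInput repetitionLevels = BytesInput.from(page.slice());

        page.position(rlLength);
        page.limit(rlLength + dlLength);             // [rlLength, rlLength + dlLength) -> definition levels
        BytesInput definitionLevels = BytesInput.from(page.slice());

        page.position(rlLength + dlLength);
        page.limit(limit);                           // remainder -> encoded values
        BytesInput data = BytesInput.from(page.slice());

        return new BytesInput[] { repetitionLevels, definitionLevels, data };
      }
    }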