author:    Vlad Rozov <vrozov@apache.org>  2018-05-09 13:24:11 -0700
committer: Timothy Farkas <timothyfarkas@apache.org>  2018-06-13 17:11:29 -0700
commit:    ac8e69847659582e36c89fd52bb0856ab3bfbd21 (patch)
tree:      917e881ff0bc693badbf152e629a75505ace8a90 /exec/java-exec/src/main/java/org
parent:    98dbc3a222990703aebe983883779763e0cdc1e9 (diff)
DRILL-6353: Upgrade Parquet MR dependencies
closes #1259
Diffstat (limited to 'exec/java-exec/src/main/java/org')
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/expr/stat/ParquetComparisonPredicate.java | 14
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/expr/stat/ParquetIsPredicate.java | 9
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/expr/stat/ParquetPredicatesHelper.java | 4
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/AbstractParquetScanBatchCreator.java | 14
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ColumnDataReader.java | 9
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/FooterGatherer.java | 4
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetDirectByteBufferAllocator.java | 76
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordWriter.java | 13
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/PageReader.java | 51
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLenBulkPageReader.java | 17
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLenColumnBulkInput.java | 4
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/metadata/Metadata.java | 16
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/util/filereader/BufferedDirectBufInputStream.java | 8
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/util/filereader/DirectBufInputStream.java | 11
-rw-r--r--  exec/java-exec/src/main/java/org/apache/parquet/hadoop/ColumnChunkIncReadStore.java | 39
-rw-r--r--  exec/java-exec/src/main/java/org/apache/parquet/hadoop/ParquetColumnChunkPageWriteStore.java | 10
16 files changed, 147 insertions(+), 152 deletions(-)
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/stat/ParquetComparisonPredicate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/stat/ParquetComparisonPredicate.java
index 9e561ad36..ebceefb43 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/stat/ParquetComparisonPredicate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/stat/ParquetComparisonPredicate.java
@@ -113,7 +113,7 @@ public class ParquetComparisonPredicate<C extends Comparable<C>> extends Logical
// can drop when left's max < right's min, or right's max < left's min
final C leftMin = leftStat.genericGetMin();
final C rightMin = rightStat.genericGetMin();
- return leftStat.genericGetMax().compareTo(rightMin) < 0 || rightStat.genericGetMax().compareTo(leftMin) < 0;
+ return (leftStat.compareMaxToValue(rightMin) < 0) || (rightStat.compareMaxToValue(leftMin) < 0);
}) {
@Override
public String toString() {
@@ -132,7 +132,7 @@ public class ParquetComparisonPredicate<C extends Comparable<C>> extends Logical
return new ParquetComparisonPredicate<C>(left, right, (leftStat, rightStat) -> {
// can drop when left's max <= right's min.
final C rightMin = rightStat.genericGetMin();
- return leftStat.genericGetMax().compareTo(rightMin) <= 0;
+ return leftStat.compareMaxToValue(rightMin) <= 0;
});
}
@@ -146,7 +146,7 @@ public class ParquetComparisonPredicate<C extends Comparable<C>> extends Logical
return new ParquetComparisonPredicate<C>(left, right, (leftStat, rightStat) -> {
// can drop when left's max < right's min.
final C rightMin = rightStat.genericGetMin();
- return leftStat.genericGetMax().compareTo(rightMin) < 0;
+ return leftStat.compareMaxToValue(rightMin) < 0;
});
}
@@ -160,7 +160,7 @@ public class ParquetComparisonPredicate<C extends Comparable<C>> extends Logical
return new ParquetComparisonPredicate<C>(left, right, (leftStat, rightStat) -> {
// can drop when right's max <= left's min.
final C leftMin = leftStat.genericGetMin();
- return rightStat.genericGetMax().compareTo(leftMin) <= 0;
+ return rightStat.compareMaxToValue(leftMin) <= 0;
});
}
@@ -173,7 +173,7 @@ public class ParquetComparisonPredicate<C extends Comparable<C>> extends Logical
return new ParquetComparisonPredicate<C>(left, right, (leftStat, rightStat) -> {
// can drop when right's max < left's min.
final C leftMin = leftStat.genericGetMin();
- return rightStat.genericGetMax().compareTo(leftMin) < 0;
+ return rightStat.compareMaxToValue(leftMin) < 0;
});
}
@@ -188,8 +188,8 @@ public class ParquetComparisonPredicate<C extends Comparable<C>> extends Logical
// can drop when there is only one unique value.
final C leftMax = leftStat.genericGetMax();
final C rightMax = rightStat.genericGetMax();
- return leftStat.genericGetMin().compareTo(leftMax) == 0 && rightStat.genericGetMin().compareTo(rightMax) == 0 &&
- leftStat.genericGetMax().compareTo(rightMax) == 0;
+ return leftStat.compareMinToValue(leftMax) == 0 && rightStat.compareMinToValue(rightMax) == 0 &&
+ leftStat.compareMaxToValue(rightMax) == 0;
});
}
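The rewritten checks rely on the comparator-aware Statistics methods available in parquet-column 1.10. A minimal standalone sketch of the range-overlap test used above, assuming that API (the class and method names below are illustrative, not part of the patch):

import org.apache.parquet.column.statistics.Statistics;

class RangeOverlapSketch {
  // Can drop when left's max < right's min, or right's max < left's min,
  // i.e. when the two column value ranges cannot share a common value.
  static <C extends Comparable<C>> boolean rangesDisjoint(Statistics<C> leftStat, Statistics<C> rightStat) {
    C leftMin = leftStat.genericGetMin();
    C rightMin = rightStat.genericGetMin();
    // compareMaxToValue() uses the column's type-aware comparator, replacing the old
    // genericGetMax().compareTo(...) calls that assumed natural ordering.
    return leftStat.compareMaxToValue(rightMin) < 0 || rightStat.compareMaxToValue(leftMin) < 0;
  }
}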
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/stat/ParquetIsPredicate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/stat/ParquetIsPredicate.java
index 9b041020f..547dc0670 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/stat/ParquetIsPredicate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/stat/ParquetIsPredicate.java
@@ -23,6 +23,7 @@ import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.common.expression.TypedFieldExpr;
import org.apache.drill.common.expression.visitors.ExprVisitor;
import org.apache.drill.exec.expr.fn.FunctionGenerationHelper;
+import org.apache.parquet.column.statistics.BooleanStatistics;
import org.apache.parquet.column.statistics.Statistics;
import java.util.ArrayList;
@@ -114,7 +115,7 @@ public class ParquetIsPredicate<C extends Comparable<C>> extends LogicalExpressi
private static LogicalExpression createIsTruePredicate(LogicalExpression expr) {
return new ParquetIsPredicate<Boolean>(expr,
//if max value is not true or if there are all nulls -> canDrop
- (exprStat, evaluator) -> !exprStat.genericGetMax().equals(Boolean.TRUE) || isAllNulls(exprStat, evaluator.getRowCount())
+ (exprStat, evaluator) -> !((BooleanStatistics)exprStat).getMax() || isAllNulls(exprStat, evaluator.getRowCount())
);
}
@@ -124,7 +125,7 @@ public class ParquetIsPredicate<C extends Comparable<C>> extends LogicalExpressi
private static LogicalExpression createIsFalsePredicate(LogicalExpression expr) {
return new ParquetIsPredicate<Boolean>(expr,
//if min value is not false or if there are all nulls -> canDrop
- (exprStat, evaluator) -> !exprStat.genericGetMin().equals(Boolean.FALSE) || isAllNulls(exprStat, evaluator.getRowCount())
+ (exprStat, evaluator) -> ((BooleanStatistics)exprStat).getMin() || isAllNulls(exprStat, evaluator.getRowCount())
);
}
@@ -134,7 +135,7 @@ public class ParquetIsPredicate<C extends Comparable<C>> extends LogicalExpressi
private static LogicalExpression createIsNotTruePredicate(LogicalExpression expr) {
return new ParquetIsPredicate<Boolean>(expr,
//if min value is not false or if there are no nulls -> canDrop
- (exprStat, evaluator) -> !exprStat.genericGetMin().equals(Boolean.FALSE) && hasNoNulls(exprStat)
+ (exprStat, evaluator) -> ((BooleanStatistics)exprStat).getMin() && hasNoNulls(exprStat)
);
}
@@ -144,7 +145,7 @@ public class ParquetIsPredicate<C extends Comparable<C>> extends LogicalExpressi
private static LogicalExpression createIsNotFalsePredicate(LogicalExpression expr) {
return new ParquetIsPredicate<Boolean>(expr,
//if max value is not true or if there are no nulls -> canDrop
- (exprStat, evaluator) -> !exprStat.genericGetMax().equals(Boolean.TRUE) && hasNoNulls(exprStat)
+ (exprStat, evaluator) -> !((BooleanStatistics)exprStat).getMax() && hasNoNulls(exprStat)
);
}
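The boolean predicates now read min/max straight from BooleanStatistics instead of boxing through the generic getters. A standalone sketch of the IS TRUE check, with the null-count test from the updated ParquetPredicatesHelper inlined (the helper name here is illustrative):

import org.apache.parquet.column.statistics.BooleanStatistics;
import org.apache.parquet.column.statistics.Statistics;

class IsTruePruningSketch {
  // "col IS TRUE" can drop the row group when the recorded maximum is false
  // (so no true value exists) or when every row is null.
  static boolean canDropIsTrue(Statistics<?> exprStat, long rowCount) {
    boolean allNulls = exprStat.isNumNullsSet() && exprStat.getNumNulls() == rowCount;
    return !((BooleanStatistics) exprStat).getMax() || allNulls;
  }
}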
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/stat/ParquetPredicatesHelper.java b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/stat/ParquetPredicatesHelper.java
index 7ff103644..f804a7b06 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/stat/ParquetPredicatesHelper.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/stat/ParquetPredicatesHelper.java
@@ -43,7 +43,7 @@ class ParquetPredicatesHelper {
* False if at least one row is not null.
*/
static boolean isAllNulls(Statistics stat, long rowCount) {
- return stat.getNumNulls() == rowCount;
+ return stat.isNumNullsSet() && stat.getNumNulls() == rowCount;
}
/**
@@ -54,7 +54,7 @@ class ParquetPredicatesHelper {
* False if the parquet file hasn't nulls.
*/
static boolean hasNoNulls(Statistics stat) {
- return stat.getNumNulls() == 0;
+ return !stat.isNumNullsSet() || stat.getNumNulls() == 0;
}
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/AbstractParquetScanBatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/AbstractParquetScanBatchCreator.java
index 6a320b85b..dc09ce1b6 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/AbstractParquetScanBatchCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/AbstractParquetScanBatchCreator.java
@@ -33,11 +33,12 @@ import org.apache.drill.exec.store.parquet.columnreaders.ParquetRecordReader;
import org.apache.drill.exec.store.parquet2.DrillParquetReader;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
+import org.apache.parquet.ParquetReadOptions;
import org.apache.parquet.column.ColumnDescriptor;
-import org.apache.parquet.format.converter.ParquetMetadataConverter;
import org.apache.parquet.hadoop.CodecFactory;
import org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.parquet.hadoop.metadata.ParquetMetadata;
+import org.apache.parquet.hadoop.util.HadoopInputFile;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.Type;
@@ -50,6 +51,9 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
+import static org.apache.drill.exec.store.parquet.metadata.Metadata.PARQUET_STRINGS_SIGNED_MIN_MAX_ENABLED;
+import static org.apache.parquet.format.converter.ParquetMetadataConverter.NO_FILTER;
+
public abstract class AbstractParquetScanBatchCreator {
private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(AbstractParquetScanBatchCreator.class);
@@ -146,11 +150,15 @@ public abstract class AbstractParquetScanBatchCreator {
protected abstract AbstractDrillFileSystemManager getDrillFileSystemCreator(OperatorContext operatorContext, OptionManager optionManager);
private ParquetMetadata readFooter(Configuration conf, String path) throws IOException {
- Configuration newConf = new Configuration(conf);
+ conf = new Configuration(conf);
conf.setBoolean(ENABLE_BYTES_READ_COUNTER, false);
conf.setBoolean(ENABLE_BYTES_TOTAL_COUNTER, false);
conf.setBoolean(ENABLE_TIME_READ_COUNTER, false);
- return ParquetFileReader.readFooter(newConf, new Path(path), ParquetMetadataConverter.NO_FILTER);
+ conf.setBoolean(PARQUET_STRINGS_SIGNED_MIN_MAX_ENABLED, true);
+ ParquetReadOptions options = ParquetReadOptions.builder().withMetadataFilter(NO_FILTER).build();
+ try (ParquetFileReader reader = ParquetFileReader.open(HadoopInputFile.fromPath(new Path(path), conf), options)) {
+ return reader.getFooter();
+ }
}
private boolean isComplex(ParquetMetadata footer) {
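The readFooter() change above follows the parquet-hadoop 1.10 pattern of opening a ParquetFileReader over a HadoopInputFile and taking the footer from it. A self-contained sketch of that pattern (the Hadoop counter flags and the signed-min-max option set by the patch are omitted here for brevity):

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.ParquetReadOptions;
import org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.parquet.hadoop.metadata.ParquetMetadata;
import org.apache.parquet.hadoop.util.HadoopInputFile;

import static org.apache.parquet.format.converter.ParquetMetadataConverter.NO_FILTER;

class FooterReadSketch {
  static ParquetMetadata readFooter(Configuration conf, String path) throws IOException {
    // NO_FILTER keeps the whole footer; a range or offset filter could be supplied instead.
    ParquetReadOptions options = ParquetReadOptions.builder()
        .withMetadataFilter(NO_FILTER)
        .build();
    // The reader is AutoCloseable, so the underlying stream is released once the footer is returned.
    try (ParquetFileReader reader =
             ParquetFileReader.open(HadoopInputFile.fromPath(new Path(path), conf), options)) {
      return reader.getFooter();
    }
  }
}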
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ColumnDataReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ColumnDataReader.java
index dcd40cf91..79294daea 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ColumnDataReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ColumnDataReader.java
@@ -21,14 +21,13 @@ import io.netty.buffer.DrillBuf;
import java.io.IOException;
import java.io.OutputStream;
-import java.nio.ByteBuffer;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.parquet.bytes.BytesInput;
import org.apache.parquet.format.PageHeader;
import org.apache.parquet.format.Util;
-import org.apache.parquet.hadoop.util.CompatibilityUtil;
+import org.apache.parquet.hadoop.util.HadoopStreams;
public class ColumnDataReader {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ColumnDataReader.class);
@@ -58,11 +57,7 @@ public class ColumnDataReader {
public void loadPage(DrillBuf target, int pageLength) throws IOException {
target.clear();
- ByteBuffer directBuffer = target.nioBuffer(0, pageLength);
- int lengthLeftToRead = pageLength;
- while (lengthLeftToRead > 0) {
- lengthLeftToRead -= CompatibilityUtil.getBuf(input, directBuffer, lengthLeftToRead);
- }
+ HadoopStreams.wrap(input).read(target.nioBuffer(0, pageLength));
target.writerIndex(pageLength);
}
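CompatibilityUtil is gone in parquet-mr 1.10; the replacement wraps the Hadoop stream in parquet's SeekableInputStream, which reads into a ByteBuffer directly. A minimal sketch of filling a page buffer with that wrapper (the sketch uses readFully(), which loops until the buffer is full; the call above uses read() on a page-sized buffer, while ColumnChunkIncReadStore below uses readFully()):

import java.io.IOException;
import java.nio.ByteBuffer;

import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.parquet.hadoop.util.HadoopStreams;
import org.apache.parquet.io.SeekableInputStream;

class PageLoadSketch {
  // Fill the buffer's remaining bytes from the current stream position.
  static void loadPage(FSDataInputStream input, ByteBuffer pageBuffer) throws IOException {
    SeekableInputStream in = HadoopStreams.wrap(input);
    in.readFully(pageBuffer);
  }
}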
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/FooterGatherer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/FooterGatherer.java
index ea34c7d8b..d1562c48c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/FooterGatherer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/FooterGatherer.java
@@ -42,6 +42,7 @@ import org.apache.parquet.hadoop.metadata.ParquetMetadata;
import com.google.common.base.Preconditions;
import static org.apache.commons.lang3.builder.ToStringStyle.SHORT_PREFIX_STYLE;
+import static org.apache.parquet.format.converter.ParquetMetadataConverter.NO_FILTER;
public class FooterGatherer {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FooterGatherer.class);
@@ -160,7 +161,8 @@ public class FooterGatherer {
footerBytes = ArrayUtils.subarray(footerBytes, start, start + size);
}
- ParquetMetadata metadata = ParquetFormatPlugin.parquetMetadataConverter.readParquetMetadata(new ByteArrayInputStream(footerBytes));
+ final ByteArrayInputStream from = new ByteArrayInputStream(footerBytes);
+ ParquetMetadata metadata = ParquetFormatPlugin.parquetMetadataConverter.readParquetMetadata(from, NO_FILTER);
Footer footer = new Footer(status.getPath(), metadata);
return footer;
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetDirectByteBufferAllocator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetDirectByteBufferAllocator.java
index 09f1b26e2..ba6aac943 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetDirectByteBufferAllocator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetDirectByteBufferAllocator.java
@@ -17,10 +17,11 @@
*/
package org.apache.drill.exec.store.parquet;
-import io.netty.buffer.ByteBuf;
+import io.netty.buffer.DrillBuf;
import java.nio.ByteBuffer;
-import java.util.HashMap;
+import java.util.IdentityHashMap;
+import java.util.Map;
import org.apache.drill.exec.memory.BufferAllocator;
import org.apache.drill.exec.ops.OperatorContext;
@@ -30,44 +31,42 @@ import org.apache.parquet.bytes.ByteBufferAllocator;
/**
* {@link ByteBufferAllocator} implementation that uses Drill's {@link BufferAllocator} to allocate and release
* {@link ByteBuffer} objects.<br>
- * To properly release an allocated {@link ByteBuf}, this class keeps track of it's corresponding {@link ByteBuffer}
+ * To properly release an allocated {@link DrillBuf}, this class keeps track of it's corresponding {@link ByteBuffer}
* that was passed to the Parquet library.
*/
public class ParquetDirectByteBufferAllocator implements ByteBufferAllocator {
private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ParquetDirectByteBufferAllocator.class);
private final BufferAllocator allocator;
- private final HashMap<Key, ByteBuf> allocatedBuffers = new HashMap<>();
+ private final Map<ByteBuffer, DrillBuf> allocatedBuffers = new IdentityHashMap<>();
- public ParquetDirectByteBufferAllocator(OperatorContext o){
- allocator = o.getAllocator();
+ public ParquetDirectByteBufferAllocator(OperatorContext o) {
+ this(o.getAllocator());
}
public ParquetDirectByteBufferAllocator(BufferAllocator allocator) {
this.allocator = allocator;
}
-
@Override
public ByteBuffer allocate(int sz) {
- ByteBuf bb = allocator.buffer(sz);
- ByteBuffer b = bb.nioBuffer(0, sz);
- final Key key = new Key(b);
- allocatedBuffers.put(key, bb);
- logger.debug("ParquetDirectByteBufferAllocator: Allocated {} bytes. Allocated ByteBuffer id: {}", sz, key.hash);
- return b;
+ DrillBuf drillBuf = allocator.buffer(sz);
+ ByteBuffer byteBuffer = drillBuf.nioBuffer(0, sz);
+ allocatedBuffers.put(byteBuffer, drillBuf);
+ logger.debug("{}: Allocated {} bytes. Allocated DrillBuf with id {} and ByteBuffer {}", this, sz, drillBuf.getId(), System.identityHashCode(byteBuffer));
+ return byteBuffer;
}
@Override
- public void release(ByteBuffer b) {
- final Key key = new Key(b);
- final ByteBuf bb = allocatedBuffers.get(key);
+ public void release(ByteBuffer byteBuffer) {
+ final DrillBuf drillBuf = allocatedBuffers.remove(byteBuffer);
// The ByteBuffer passed in may already have been freed or not allocated by this allocator.
// If it is not found in the allocated buffers, do nothing
- if(bb != null) {
- logger.debug("ParquetDirectByteBufferAllocator: Freed byte buffer. Allocated ByteBuffer id: {}", key.hash);
- bb.release();
- allocatedBuffers.remove(key);
+ if (drillBuf != null) {
+ logger.debug("{}: Freed DrillBuf with id {} and ByteBuffer {}", this, drillBuf.getId(), System.identityHashCode(byteBuffer));
+ drillBuf.release();
+ } else {
+ logger.warn("{}: ByteBuffer {} is not present", this, System.identityHashCode(byteBuffer));
}
}
@@ -75,41 +74,4 @@ public class ParquetDirectByteBufferAllocator implements ByteBufferAllocator {
public boolean isDirect() {
return true;
}
-
- /**
- * ByteBuffer wrapper that computes a fixed hashcode.
- * <br><br>
- * Parquet only handles {@link ByteBuffer} objects, so we need to use them as keys to keep track of their corresponding
- * {@link ByteBuf}, but {@link ByteBuffer} is mutable and it can't be used as a {@link HashMap} key as it is.<br>
- * This class solves this by providing a fixed hashcode for {@link ByteBuffer} and uses reference equality in case
- * of collisions (we don't need to compare the content of {@link ByteBuffer} because the object passed to
- * {@link #release(ByteBuffer)} will be the same object returned from a previous {@link #allocate(int)}.
- */
- private class Key {
- final int hash;
- final ByteBuffer buffer;
-
- Key(final ByteBuffer buffer) {
- this.buffer = buffer;
- // remember, we can't use buffer.hashCode()
- this.hash = System.identityHashCode(buffer);
- }
-
- @Override
- public int hashCode() {
- return hash;
- }
-
- @Override
- public boolean equals(Object obj) {
- if (this == obj) {
- return true;
- }
- if (!(obj instanceof Key)) {
- return false;
- }
- final Key key = (Key) obj;
- return hash == key.hash && buffer == key.buffer;
- }
- }
}
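The removed Key wrapper existed only to give ByteBuffer identity-based hashing; an IdentityHashMap provides exactly that. A condensed sketch of the allocate/release bookkeeping after the change, with logging omitted (the class name is illustrative):

import java.nio.ByteBuffer;
import java.util.IdentityHashMap;
import java.util.Map;

import io.netty.buffer.DrillBuf;
import org.apache.drill.exec.memory.BufferAllocator;
import org.apache.parquet.bytes.ByteBufferAllocator;

class DirectBufferAllocatorSketch implements ByteBufferAllocator {
  private final BufferAllocator allocator;
  // IdentityHashMap keys on reference identity, so the mutable ByteBuffer can be used
  // as the map key without a custom hashCode/equals wrapper.
  private final Map<ByteBuffer, DrillBuf> allocatedBuffers = new IdentityHashMap<>();

  DirectBufferAllocatorSketch(BufferAllocator allocator) {
    this.allocator = allocator;
  }

  @Override
  public ByteBuffer allocate(int size) {
    DrillBuf drillBuf = allocator.buffer(size);
    ByteBuffer byteBuffer = drillBuf.nioBuffer(0, size);
    allocatedBuffers.put(byteBuffer, drillBuf);
    return byteBuffer;
  }

  @Override
  public void release(ByteBuffer byteBuffer) {
    // Buffers not allocated here, or already released, are ignored.
    DrillBuf drillBuf = allocatedBuffers.remove(byteBuffer);
    if (drillBuf != null) {
      drillBuf.release();
    }
  }

  @Override
  public boolean isDirect() {
    return true;
  }
}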
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordWriter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordWriter.java
index 0e40c9e36..091792660 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordWriter.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordWriter.java
@@ -54,8 +54,10 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.bytes.CapacityByteArrayOutputStream;
import org.apache.parquet.column.ColumnWriteStore;
+import org.apache.parquet.column.ParquetProperties;
import org.apache.parquet.column.ParquetProperties.WriterVersion;
import org.apache.parquet.column.impl.ColumnWriteStoreV1;
+import org.apache.parquet.column.values.factory.DefaultV1ValuesWriterFactory;
import org.apache.parquet.hadoop.CodecFactory;
import org.apache.parquet.hadoop.ParquetColumnChunkPageWriteStore;
import org.apache.parquet.hadoop.ParquetFileWriter;
@@ -241,8 +243,15 @@ public class ParquetRecordWriter extends ParquetOutputRecordWriter {
// once PARQUET-1006 will be resolved
pageStore = new ParquetColumnChunkPageWriteStore(codecFactory.getCompressor(codec), schema, initialSlabSize,
pageSize, new ParquetDirectByteBufferAllocator(oContext));
- store = new ColumnWriteStoreV1(pageStore, pageSize, initialPageBufferSize, enableDictionary,
- writerVersion, new ParquetDirectByteBufferAllocator(oContext));
+ ParquetProperties parquetProperties = ParquetProperties.builder()
+ .withPageSize(pageSize)
+ .withDictionaryEncoding(enableDictionary)
+ .withDictionaryPageSize(initialPageBufferSize)
+ .withWriterVersion(writerVersion)
+ .withAllocator(new ParquetDirectByteBufferAllocator(oContext))
+ .withValuesWriterFactory(new DefaultV1ValuesWriterFactory())
+ .build();
+ store = new ColumnWriteStoreV1(pageStore, parquetProperties);
MessageColumnIO columnIO = new ColumnIOFactory(false).getColumnIO(this.schema);
consumer = columnIO.getRecordWriter(store);
setUp(schema, consumer);
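ColumnWriteStoreV1 no longer takes page size, dictionary settings and allocator as separate arguments; they are bundled into a ParquetProperties instance. A sketch of building that configuration with the same options the patch passes (parameter names are illustrative):

import org.apache.parquet.bytes.ByteBufferAllocator;
import org.apache.parquet.column.ParquetProperties;
import org.apache.parquet.column.ParquetProperties.WriterVersion;
import org.apache.parquet.column.values.factory.DefaultV1ValuesWriterFactory;

class WriterPropertiesSketch {
  static ParquetProperties build(int pageSize, int dictionaryPageSize, boolean enableDictionary,
                                 WriterVersion writerVersion, ByteBufferAllocator allocator) {
    return ParquetProperties.builder()
        .withPageSize(pageSize)
        .withDictionaryEncoding(enableDictionary)
        .withDictionaryPageSize(dictionaryPageSize)
        .withWriterVersion(writerVersion)
        .withAllocator(allocator)
        // Explicitly select the V1 values writer factory, as the patch does.
        .withValuesWriterFactory(new DefaultV1ValuesWriterFactory())
        .build();
  }
}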
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/PageReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/PageReader.java
index bf75695b6..01d06444b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/PageReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/PageReader.java
@@ -17,6 +17,7 @@
*/
package org.apache.drill.exec.store.parquet.columnreaders;
+import com.google.common.base.Preconditions;
import com.google.common.base.Stopwatch;
import io.netty.buffer.ByteBufUtil;
import org.apache.drill.exec.util.filereader.BufferedDirectBufInputStream;
@@ -30,6 +31,7 @@ import org.apache.drill.exec.util.filereader.DirectBufInputStream;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.parquet.bytes.ByteBufferInputStream;
import org.apache.parquet.bytes.BytesInput;
import org.apache.parquet.column.Dictionary;
import org.apache.parquet.column.Encoding;
@@ -250,7 +252,7 @@ class PageReader {
}
public static BytesInput asBytesInput(DrillBuf buf, int offset, int length) throws IOException {
- return BytesInput.from(buf.nioBuffer(offset, length), 0, length);
+ return BytesInput.from(buf.nioBuffer(offset, length));
}
@@ -319,41 +321,44 @@ class PageReader {
byteLength = pageHeader.uncompressed_page_size;
- final ByteBuffer pageDataBuffer = pageData.nioBuffer(0, pageData.capacity());
+ final ByteBufferInputStream in = ByteBufferInputStream.wrap(pageData.nioBuffer(0, pageData.capacity()));
readPosInBytes = 0;
if (parentColumnReader.getColumnDescriptor().getMaxRepetitionLevel() > 0) {
repetitionLevels = rlEncoding.getValuesReader(parentColumnReader.columnDescriptor, ValuesType.REPETITION_LEVEL);
- repetitionLevels.initFromPage(currentPageCount, pageDataBuffer, (int) readPosInBytes);
+ repetitionLevels.initFromPage(currentPageCount, in);
// we know that the first value will be a 0, at the end of each list of repeated values we will hit another 0 indicating
// a new record, although we don't know the length until we hit it (and this is a one way stream of integers) so we
// read the first zero here to simplify the reading processes, and start reading the first value the same as all
// of the rest. Effectively we are 'reading' the non-existent value in front of the first allowing direct access to
// the first list of repetition levels
- readPosInBytes = repetitionLevels.getNextOffset();
+ readPosInBytes = in.position();
repetitionLevels.readInteger();
}
- if (parentColumnReader.columnDescriptor.getMaxDefinitionLevel() != 0){
+ if (parentColumnReader.columnDescriptor.getMaxDefinitionLevel() != 0) {
parentColumnReader.currDefLevel = -1;
definitionLevels = dlEncoding.getValuesReader(parentColumnReader.columnDescriptor, ValuesType.DEFINITION_LEVEL);
- definitionLevels.initFromPage(currentPageCount, pageDataBuffer, (int) readPosInBytes);
- readPosInBytes = definitionLevels.getNextOffset();
+ definitionLevels.initFromPage(currentPageCount, in);
+ readPosInBytes = in.position();
if (!valueEncoding.usesDictionary()) {
valueReader = valueEncoding.getValuesReader(parentColumnReader.columnDescriptor, ValuesType.VALUES);
- valueReader.initFromPage(currentPageCount, pageDataBuffer, (int) readPosInBytes);
+ valueReader.initFromPage(currentPageCount, in);
}
}
- if (parentColumnReader.columnDescriptor.getType() == PrimitiveType.PrimitiveTypeName.BOOLEAN) {
+ if (valueReader == null && parentColumnReader.columnDescriptor.getType() == PrimitiveType.PrimitiveTypeName.BOOLEAN) {
valueReader = valueEncoding.getValuesReader(parentColumnReader.columnDescriptor, ValuesType.VALUES);
- valueReader.initFromPage(currentPageCount, pageDataBuffer, (int) readPosInBytes);
+ valueReader.initFromPage(currentPageCount, in);
}
if (valueEncoding.usesDictionary()) {
// initialize two of the dictionary readers, one is for determining the lengths of each value, the second is for
// actually copying the values out into the vectors
+ Preconditions.checkState(readPosInBytes < pageData.capacity());
+ int index = (int)readPosInBytes;
+ ByteBuffer byteBuffer = pageData.nioBuffer(index, pageData.capacity() - index);
dictionaryLengthDeterminingReader = new DictionaryValuesReader(dictionary);
- dictionaryLengthDeterminingReader.initFromPage(currentPageCount, pageDataBuffer, (int) readPosInBytes);
+ dictionaryLengthDeterminingReader.initFromPage(currentPageCount, ByteBufferInputStream.wrap(byteBuffer));
dictionaryValueReader = new DictionaryValuesReader(dictionary);
- dictionaryValueReader.initFromPage(currentPageCount, pageDataBuffer, (int) readPosInBytes);
+ dictionaryValueReader.initFromPage(currentPageCount, ByteBufferInputStream.wrap(byteBuffer));
parentColumnReader.usingDictionary = true;
} else {
parentColumnReader.usingDictionary = false;
@@ -445,25 +450,29 @@ class PageReader {
* @throws IOException An IO related condition
*/
void resetDefinitionLevelReader(int skipCount) throws IOException {
- if (parentColumnReader.columnDescriptor.getMaxDefinitionLevel() != 0) {
- throw new UnsupportedOperationException("Unsupoorted Operation");
- }
+ Preconditions.checkState(parentColumnReader.columnDescriptor.getMaxDefinitionLevel() == 1);
+ Preconditions.checkState(currentPageCount > 0);
+ final Encoding rlEncoding = METADATA_CONVERTER.getEncoding(pageHeader.data_page_header.repetition_level_encoding);
final Encoding dlEncoding = METADATA_CONVERTER.getEncoding(pageHeader.data_page_header.definition_level_encoding);
- final ByteBuffer pageDataBuffer = pageData.nioBuffer(0, pageData.capacity());
- final int defStartPos = repetitionLevels != null ? repetitionLevels.getNextOffset() : 0;
+
+ final ByteBufferInputStream in = ByteBufferInputStream.wrap(pageData.nioBuffer(0, pageData.capacity()));
+
+ if (parentColumnReader.getColumnDescriptor().getMaxRepetitionLevel() > 0) {
+ repetitionLevels = rlEncoding.getValuesReader(parentColumnReader.columnDescriptor, ValuesType.REPETITION_LEVEL);
+ repetitionLevels.initFromPage(currentPageCount, in);
+ repetitionLevels.readInteger();
+ }
+
definitionLevels = dlEncoding.getValuesReader(parentColumnReader.columnDescriptor, ValuesType.DEFINITION_LEVEL);
parentColumnReader.currDefLevel = -1;
// Now reinitialize the underlying decoder
- assert currentPageCount > 0 : "Page count should be strictly upper than zero";
- definitionLevels.initFromPage(currentPageCount, pageDataBuffer, defStartPos);
+ definitionLevels.initFromPage(currentPageCount, in);
// Skip values if requested by caller
for (int idx = 0; idx < skipCount; ++idx) {
definitionLevels.skip();
}
}
-
-
}
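In parquet-column 1.10, ValuesReader.initFromPage() consumes a ByteBufferInputStream, and the stream's position() replaces getNextOffset() for locating the next page section. A reduced sketch of initializing the level readers for one V1 data page, assuming that API (the parameters stand in for state held by PageReader):

import java.io.IOException;
import java.nio.ByteBuffer;

import org.apache.parquet.bytes.ByteBufferInputStream;
import org.apache.parquet.column.ColumnDescriptor;
import org.apache.parquet.column.Encoding;
import org.apache.parquet.column.ValuesType;
import org.apache.parquet.column.values.ValuesReader;

class LevelReaderInitSketch {
  // Returns the byte position where the values section starts, i.e. just past the level data.
  static long initLevelReaders(ColumnDescriptor column, Encoding rlEncoding, Encoding dlEncoding,
                               ByteBuffer pageData, int valueCount) throws IOException {
    ByteBufferInputStream in = ByteBufferInputStream.wrap(pageData);
    if (column.getMaxRepetitionLevel() > 0) {
      ValuesReader repetitionLevels = rlEncoding.getValuesReader(column, ValuesType.REPETITION_LEVEL);
      // Each reader consumes its own section from the stream; position() then points past it.
      repetitionLevels.initFromPage(valueCount, in);
    }
    if (column.getMaxDefinitionLevel() > 0) {
      ValuesReader definitionLevels = dlEncoding.getValuesReader(column, ValuesType.DEFINITION_LEVEL);
      definitionLevels.initFromPage(valueCount, in);
    }
    return in.position();
  }
}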
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLenBulkPageReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLenBulkPageReader.java
index b6205c1ef..385cb8369 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLenBulkPageReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLenBulkPageReader.java
@@ -66,12 +66,7 @@ final class VarLenBulkPageReader {
this.buffer.order(ByteOrder.nativeOrder());
if (pageInfoInput != null) {
- this.pageInfo.pageData = pageInfoInput.pageData;
- this.pageInfo.pageDataOff = pageInfoInput.pageDataOff;
- this.pageInfo.pageDataLen = pageInfoInput.pageDataLen;
- this.pageInfo.numPageFieldsRead = pageInfoInput.numPageFieldsRead;
- this.pageInfo.definitionLevels = pageInfoInput.definitionLevels;
- this.pageInfo.dictionaryValueReader = pageInfoInput.dictionaryValueReader;
+ set(pageInfoInput, false);
}
this.columnPrecInfo = columnPrecInfoInput;
@@ -87,15 +82,17 @@ final class VarLenBulkPageReader {
nullableDictionaryReader = new VarLenNullableDictionaryReader(buffer, pageInfo, columnPrecInfo, entry);
}
- final void set(PageDataInfo pageInfoInput) {
+ final void set(PageDataInfo pageInfoInput, boolean clear) {
pageInfo.pageData = pageInfoInput.pageData;
pageInfo.pageDataOff = pageInfoInput.pageDataOff;
pageInfo.pageDataLen = pageInfoInput.pageDataLen;
pageInfo.numPageFieldsRead = pageInfoInput.numPageFieldsRead;
pageInfo.definitionLevels = pageInfoInput.definitionLevels;
pageInfo.dictionaryValueReader = pageInfoInput.dictionaryValueReader;
-
- buffer.clear();
+ pageInfo.numPageValues = pageInfoInput.numPageValues;
+ if (clear) {
+ buffer.clear();
+ }
}
final VarLenColumnBulkEntry getEntry(int valuesToRead) {
@@ -160,4 +157,4 @@ final class VarLenBulkPageReader {
}
}
-} \ No newline at end of file
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLenColumnBulkInput.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLenColumnBulkInput.java
index 8daf2cc1a..1b3073759 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLenColumnBulkInput.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLenColumnBulkInput.java
@@ -204,7 +204,7 @@ final class VarLenColumnBulkInput<V extends ValueVector> implements VarLenBulkIn
buffPagePayload = new VarLenBulkPageReader(pageInfo, columnPrecInfo, callback);
} else {
- buffPagePayload.set(pageInfo);
+ buffPagePayload.set(pageInfo, true);
}
} else {
if (buffPagePayload == null) {
@@ -567,4 +567,4 @@ final class VarLenColumnBulkInput<V extends ValueVector> implements VarLenBulkIn
}
-} \ No newline at end of file
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/metadata/Metadata.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/metadata/Metadata.java
index ab655e921..a61cc1832 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/metadata/Metadata.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/metadata/Metadata.java
@@ -39,17 +39,19 @@ import org.apache.drill.exec.store.parquet.ParquetFormatConfig;
import org.apache.drill.exec.store.parquet.ParquetReaderUtility;
import org.apache.drill.exec.util.DrillFileSystemUtil;
import org.apache.drill.exec.util.ImpersonationUtil;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.parquet.ParquetReadOptions;
import org.apache.parquet.column.statistics.Statistics;
-import org.apache.parquet.format.converter.ParquetMetadataConverter;
import org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.parquet.hadoop.metadata.BlockMetaData;
import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
import org.apache.parquet.hadoop.metadata.ParquetMetadata;
+import org.apache.parquet.hadoop.util.HadoopInputFile;
import org.apache.parquet.schema.GroupType;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.OriginalType;
@@ -87,6 +89,7 @@ public class Metadata {
public static final String[] OLD_METADATA_FILENAMES = {".drill.parquet_metadata.v2"};
public static final String METADATA_FILENAME = ".drill.parquet_metadata";
public static final String METADATA_DIRECTORIES_FILENAME = ".drill.parquet_metadata_directories";
+ public static final String PARQUET_STRINGS_SIGNED_MIN_MAX_ENABLED = "parquet.strings.signed-min-max.enabled";
private final ParquetFormatConfig formatConfig;
@@ -409,9 +412,16 @@ public class Metadata {
final FileStatus file, final FileSystem fs) throws IOException, InterruptedException {
final ParquetMetadata metadata;
final UserGroupInformation processUserUgi = ImpersonationUtil.getProcessUserUGI();
+ final Configuration conf = new Configuration(fs.getConf());
+ final ParquetReadOptions parquetReadOptions = ParquetReadOptions.builder()
+ .useSignedStringMinMax(true)
+ .build();
try {
- metadata = processUserUgi.doAs((PrivilegedExceptionAction<ParquetMetadata>)
- () -> ParquetFileReader.readFooter(fs.getConf(), file, ParquetMetadataConverter.NO_FILTER));
+ metadata = processUserUgi.doAs((PrivilegedExceptionAction<ParquetMetadata>)() -> {
+ try (ParquetFileReader parquetFileReader = ParquetFileReader.open(HadoopInputFile.fromStatus(file, conf), parquetReadOptions)) {
+ return parquetFileReader.getFooter();
+ }
+ });
} catch(Exception e) {
logger.error("Exception while reading footer of parquet file [Details - path: {}, owner: {}] as process user {}",
file.getPath(), file.getOwner(), processUserUgi.getShortUserName(), e);
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/util/filereader/BufferedDirectBufInputStream.java b/exec/java-exec/src/main/java/org/apache/drill/exec/util/filereader/BufferedDirectBufInputStream.java
index f208d6e82..1d764b1b6 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/util/filereader/BufferedDirectBufInputStream.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/util/filereader/BufferedDirectBufInputStream.java
@@ -21,7 +21,7 @@ import com.google.common.base.Preconditions;
import com.google.common.base.Stopwatch;
import io.netty.buffer.DrillBuf;
import org.apache.drill.exec.memory.BufferAllocator;
-import org.apache.parquet.hadoop.util.CompatibilityUtil;
+import org.apache.parquet.hadoop.util.HadoopStreams;
import java.io.Closeable;
import java.io.IOException;
@@ -179,7 +179,7 @@ public class BufferedDirectBufInputStream extends DirectBufInputStream implement
int nBytes = 0;
if (bytesToRead > 0) {
try {
- nBytes = CompatibilityUtil.getBuf(getInputStream(), directBuffer, bytesToRead);
+ nBytes = HadoopStreams.wrap(getInputStream()).read(directBuffer);
} catch (Exception e) {
logger.error("Error reading from stream {}. Error was : {}", this.streamId, e.getMessage());
throw new IOException((e));
@@ -193,8 +193,8 @@ public class BufferedDirectBufInputStream extends DirectBufInputStream implement
logger.trace(
"PERF: Disk read complete. {}, StartOffset: {}, TotalByteSize: {}, BufferSize: {}, BytesRead: {}, Count: {}, "
+ "CurPosInStream: {}, CurPosInBuffer: {}, Time: {} ms", this.streamId, this.startOffset,
- this.totalByteSize, this.bufSize, bytesRead, this.count, this.curPosInStream, this.curPosInBuffer, ((double) timer.elapsed(TimeUnit.MICROSECONDS))
- / 1000);
+ this.totalByteSize, this.bufSize, bytesRead, this.count, this.curPosInStream, this.curPosInBuffer,
+ ((double) timer.elapsed(TimeUnit.MICROSECONDS)) / 1000);
}
}
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/util/filereader/DirectBufInputStream.java b/exec/java-exec/src/main/java/org/apache/drill/exec/util/filereader/DirectBufInputStream.java
index ae09a3708..ea2542eb8 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/util/filereader/DirectBufInputStream.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/util/filereader/DirectBufInputStream.java
@@ -23,7 +23,8 @@ import io.netty.buffer.DrillBuf;
import org.apache.drill.exec.memory.BufferAllocator;
import org.apache.hadoop.fs.ByteBufferReadable;
import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.parquet.hadoop.util.CompatibilityUtil;
+import org.apache.parquet.hadoop.util.HadoopStreams;
+import org.apache.parquet.io.SeekableInputStream;
import java.io.FilterInputStream;
import java.io.IOException;
@@ -86,12 +87,16 @@ public class DirectBufInputStream extends FilterInputStream {
buf.clear();
ByteBuffer directBuffer = buf.nioBuffer(0, len);
int lengthLeftToRead = len;
+ SeekableInputStream seekableInputStream = HadoopStreams.wrap(getInputStream());
while (lengthLeftToRead > 0) {
if(logger.isTraceEnabled()) {
logger.trace("PERF: Disk read start. {}, StartOffset: {}, TotalByteSize: {}", this.streamId, this.startOffset, this.totalByteSize);
}
Stopwatch timer = Stopwatch.createStarted();
- int bytesRead = CompatibilityUtil.getBuf(getInputStream(), directBuffer, lengthLeftToRead);
+ int bytesRead = seekableInputStream.read(directBuffer);
+ if (bytesRead < 0) {
+ return bytesRead;
+ }
lengthLeftToRead -= bytesRead;
if(logger.isTraceEnabled()) {
logger.trace(
@@ -113,7 +118,7 @@ public class DirectBufInputStream extends FilterInputStream {
b.release();
throw e;
}
- if (bytesRead <= -1) {
+ if (bytesRead < 0) {
b.release();
return null;
}
diff --git a/exec/java-exec/src/main/java/org/apache/parquet/hadoop/ColumnChunkIncReadStore.java b/exec/java-exec/src/main/java/org/apache/parquet/hadoop/ColumnChunkIncReadStore.java
index 6e9db7e94..89731ff2a 100644
--- a/exec/java-exec/src/main/java/org/apache/parquet/hadoop/ColumnChunkIncReadStore.java
+++ b/exec/java-exec/src/main/java/org/apache/parquet/hadoop/ColumnChunkIncReadStore.java
@@ -46,7 +46,7 @@ import org.apache.parquet.format.Util;
import org.apache.parquet.format.converter.ParquetMetadataConverter;
import org.apache.parquet.hadoop.CodecFactory.BytesDecompressor;
import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
-import org.apache.parquet.hadoop.util.CompatibilityUtil;
+import org.apache.parquet.hadoop.util.HadoopStreams;
import io.netty.buffer.ByteBuf;
@@ -163,12 +163,10 @@ public class ColumnChunkIncReadStore implements PageReadStore {
ByteBuf buf = allocator.buffer(pageHeader.compressed_page_size);
lastPage = buf;
ByteBuffer buffer = buf.nioBuffer(0, pageHeader.compressed_page_size);
- int lengthLeftToRead = pageHeader.compressed_page_size;
- while (lengthLeftToRead > 0) {
- lengthLeftToRead -= CompatibilityUtil.getBuf(in, buffer, lengthLeftToRead);
- }
+ HadoopStreams.wrap(in).readFully(buffer);
+ buffer.flip();
return new DataPageV1(
- decompressor.decompress(BytesInput.from(buffer, 0, pageHeader.compressed_page_size), pageHeader.getUncompressed_page_size()),
+ decompressor.decompress(BytesInput.from(buffer), pageHeader.getUncompressed_page_size()),
pageHeader.data_page_header.num_values,
pageHeader.uncompressed_page_size,
fromParquetStatistics(pageHeader.data_page_header.statistics, columnDescriptor.getType()),
@@ -182,28 +180,33 @@ public class ColumnChunkIncReadStore implements PageReadStore {
buf = allocator.buffer(pageHeader.compressed_page_size);
lastPage = buf;
buffer = buf.nioBuffer(0, pageHeader.compressed_page_size);
- lengthLeftToRead = pageHeader.compressed_page_size;
- while (lengthLeftToRead > 0) {
- lengthLeftToRead -= CompatibilityUtil.getBuf(in, buffer, lengthLeftToRead);
- }
+ HadoopStreams.wrap(in).readFully(buffer);
+ buffer.flip();
DataPageHeaderV2 dataHeaderV2 = pageHeader.getData_page_header_v2();
int dataSize = compressedPageSize - dataHeaderV2.getRepetition_levels_byte_length() - dataHeaderV2.getDefinition_levels_byte_length();
BytesInput decompressedPageData =
decompressor.decompress(
- BytesInput.from(buffer, 0, pageHeader.compressed_page_size),
+ BytesInput.from(buffer),
pageHeader.uncompressed_page_size);
+ ByteBuffer byteBuffer = decompressedPageData.toByteBuffer();
+ int limit = byteBuffer.limit();
+ byteBuffer.limit(dataHeaderV2.getRepetition_levels_byte_length());
+ BytesInput repetitionLevels = BytesInput.from(byteBuffer.slice());
+ byteBuffer.position(dataHeaderV2.getRepetition_levels_byte_length());
+ byteBuffer.limit(dataHeaderV2.getRepetition_levels_byte_length() + dataHeaderV2.getDefinition_levels_byte_length());
+ BytesInput definitionLevels = BytesInput.from(byteBuffer.slice());
+ byteBuffer.position(dataHeaderV2.getRepetition_levels_byte_length() + dataHeaderV2.getDefinition_levels_byte_length());
+ byteBuffer.limit(limit);
+ BytesInput data = BytesInput.from(byteBuffer.slice());
+
return new DataPageV2(
dataHeaderV2.getNum_rows(),
dataHeaderV2.getNum_nulls(),
dataHeaderV2.getNum_values(),
- BytesInput.from(decompressedPageData.toByteBuffer(), 0, dataHeaderV2.getRepetition_levels_byte_length()),
- BytesInput.from(decompressedPageData.toByteBuffer(),
- dataHeaderV2.getRepetition_levels_byte_length(),
- dataHeaderV2.getDefinition_levels_byte_length()),
+ repetitionLevels,
+ definitionLevels,
parquetMetadataConverter.getEncoding(dataHeaderV2.getEncoding()),
- BytesInput.from(decompressedPageData.toByteBuffer(),
- dataHeaderV2.getRepetition_levels_byte_length() + dataHeaderV2.getDefinition_levels_byte_length(),
- dataSize),
+ data,
uncompressedPageSize,
fromParquetStatistics(dataHeaderV2.getStatistics(), columnDescriptor.getType()),
dataHeaderV2.isIs_compressed()
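BytesInput.from(ByteBuffer, offset, length) is no longer available, so the decompressed V2 page is split into its three sections by positioning and slicing a single ByteBuffer, as above. A standalone sketch of that slicing (the holder class is illustrative; the level lengths come from the V2 data page header):

import java.io.IOException;
import java.nio.ByteBuffer;

import org.apache.parquet.bytes.BytesInput;

class PageV2SectionsSketch {
  final BytesInput repetitionLevels;
  final BytesInput definitionLevels;
  final BytesInput data;

  PageV2SectionsSketch(BytesInput decompressedPage, int rlByteLength, int dlByteLength) throws IOException {
    ByteBuffer byteBuffer = decompressedPage.toByteBuffer();
    int limit = byteBuffer.limit();
    // [0, rlByteLength) holds the repetition levels.
    byteBuffer.limit(rlByteLength);
    repetitionLevels = BytesInput.from(byteBuffer.slice());
    // [rlByteLength, rlByteLength + dlByteLength) holds the definition levels.
    byteBuffer.position(rlByteLength);
    byteBuffer.limit(rlByteLength + dlByteLength);
    definitionLevels = BytesInput.from(byteBuffer.slice());
    // The rest of the buffer is the encoded values.
    byteBuffer.position(rlByteLength + dlByteLength);
    byteBuffer.limit(limit);
    data = BytesInput.from(byteBuffer.slice());
  }
}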
diff --git a/exec/java-exec/src/main/java/org/apache/parquet/hadoop/ParquetColumnChunkPageWriteStore.java b/exec/java-exec/src/main/java/org/apache/parquet/hadoop/ParquetColumnChunkPageWriteStore.java
index 93f9920bd..0ed224525 100644
--- a/exec/java-exec/src/main/java/org/apache/parquet/hadoop/ParquetColumnChunkPageWriteStore.java
+++ b/exec/java-exec/src/main/java/org/apache/parquet/hadoop/ParquetColumnChunkPageWriteStore.java
@@ -17,8 +17,6 @@
*/
package org.apache.parquet.hadoop;
-import static org.apache.parquet.column.statistics.Statistics.getStatsBasedOnType;
-
import java.io.Closeable;
import java.io.IOException;
import java.util.List;
@@ -119,7 +117,7 @@ public class ParquetColumnChunkPageWriteStore implements PageWriteStore, Closeab
this.path = path;
this.compressor = compressor;
this.buf = new CapacityByteArrayOutputStream(initialSlabSize, maxCapacityHint, allocator);
- this.totalStatistics = getStatsBasedOnType(this.path.getType());
+ this.totalStatistics = Statistics.createStats(this.path.getPrimitiveType());
}
@Override
@@ -226,11 +224,7 @@ public class ParquetColumnChunkPageWriteStore implements PageWriteStore, Closeab
writer.writeDictionaryPage(dictionaryPage);
// tracking the dictionary encoding is handled in writeDictionaryPage
}
- List<Encoding> encodings = Lists.newArrayList();
- encodings.addAll(rlEncodings);
- encodings.addAll(dlEncodings);
- encodings.addAll(dataEncodings);
- writer.writeDataPages(BytesInput.from(buf), uncompressedLength, compressedLength, totalStatistics, encodings);
+ writer.writeDataPages(BytesInput.from(buf), uncompressedLength, compressedLength, totalStatistics, rlEncodings, dlEncodings, dataEncodings);
writer.endColumn();
logger.debug(
String.format(