author     Serhii-Harnyk <serhii.harnyk@gmail.com>  2016-11-24 13:24:03 +0000
committer  Sudheesh Katkam <sudheesh@apache.org>    2017-01-30 10:09:39 -0800
commit     60624af225f90992a15a707e1650e41ccecf5a53 (patch)
tree       6708c9a1dc7b2cf7207f20e89f315834f3117e9c /exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders
parent     5c3924c9844f7d25c0798c649bc032a0022b3a3e (diff)
DRILL-4764: Parquet file with INT_16, etc. logical types not supported by simple SELECT
closes #673
Diffstat (limited to 'exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders')
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ColumnReaderFactory.java                11
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetFixedWidthDictionaryReaders.java  72
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetToDrillTypeConverter.java           9
3 files changed, 92 insertions, 0 deletions
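
For context, DRILL-4764 concerns Parquet files whose INT32/INT64 columns carry logical (converted) type annotations such as INT_16 or UINT_32, which Drill's column readers previously rejected on a simple SELECT. The stand-alone sketch below (not part of this commit) builds such a schema with parquet-mr's MessageTypeParser; the column names are made up for illustration.

import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.MessageTypeParser;

public class Int16SchemaExample {
  public static void main(String[] args) {
    // INT32/INT64 physical columns annotated with the converted types this patch adds support for.
    MessageType schema = MessageTypeParser.parseMessageType(
        "message example {\n"
      + "  required int32 small_col (INT_16);\n"
      + "  required int32 unsigned_col (UINT_32);\n"
      + "  required int64 unsigned_big_col (UINT_64);\n"
      + "}");
    System.out.println(schema);
  }
}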
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ColumnReaderFactory.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ColumnReaderFactory.java
index 662d5c977..495f70bc5 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ColumnReaderFactory.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ColumnReaderFactory.java
@@ -48,6 +48,8 @@ import org.apache.drill.exec.vector.NullableVarBinaryVector;
import org.apache.drill.exec.vector.NullableVarCharVector;
import org.apache.drill.exec.vector.TimeStampVector;
import org.apache.drill.exec.vector.TimeVector;
+import org.apache.drill.exec.vector.UInt4Vector;
+import org.apache.drill.exec.vector.UInt8Vector;
import org.apache.drill.exec.vector.ValueVector;
import org.apache.drill.exec.vector.VarBinaryVector;
import org.apache.drill.exec.vector.VarCharVector;
@@ -121,6 +123,13 @@ public class ColumnReaderFactory {
return new ParquetFixedWidthDictionaryReaders.DictionaryDecimal9Reader(recordReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, (Decimal9Vector) v, schemaElement);
case TIME_MILLIS:
return new ParquetFixedWidthDictionaryReaders.DictionaryTimeReader(recordReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, (TimeVector) v, schemaElement);
+ case INT_8:
+ case INT_16:
+ return new ParquetFixedWidthDictionaryReaders.DictionaryIntReader(recordReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, (IntVector) v, schemaElement);
+ case UINT_8:
+ case UINT_16:
+ case UINT_32:
+ return new ParquetFixedWidthDictionaryReaders.DictionaryUInt4Reader(recordReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, (UInt4Vector) v, schemaElement);
default:
throw new ExecutionSetupException("Unsupported dictionary converted type " + convertedType + " for primitive type INT32");
}
@@ -129,6 +138,8 @@ public class ColumnReaderFactory {
return new ParquetFixedWidthDictionaryReaders.DictionaryBigIntReader(recordReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, (BigIntVector) v, schemaElement);
}
switch (convertedType) {
+ case UINT_64:
+ return new ParquetFixedWidthDictionaryReaders.DictionaryUInt8Reader(recordReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, (UInt8Vector) v, schemaElement);
case DECIMAL:
return new ParquetFixedWidthDictionaryReaders.DictionaryDecimal18Reader(recordReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, (Decimal18Vector) v, schemaElement);
case TIMESTAMP_MILLIS:
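
For orientation, the hunks above route the new converted types to dictionary readers: signed INT_8/INT_16 reuse the existing IntVector reader, unsigned 8/16/32-bit widths go to the new DictionaryUInt4Reader, and UINT_64 (on INT64) goes to the new DictionaryUInt8Reader. The stand-alone sketch below mirrors that dispatch for the INT32 branch only; it is illustrative, not Drill's factory code, and returns descriptive strings instead of reader instances.

import org.apache.parquet.format.ConvertedType;

public class DictionaryReaderDispatchSketch {
  // Illustrative mirror of the INT32 switch in ColumnReaderFactory above.
  static String readerForInt32(ConvertedType convertedType) {
    switch (convertedType) {
      case INT_8:
      case INT_16:
        return "DictionaryIntReader -> IntVector";
      case UINT_8:
      case UINT_16:
      case UINT_32:
        return "DictionaryUInt4Reader -> UInt4Vector";
      default:
        return "unsupported dictionary converted type in this sketch";
    }
  }

  public static void main(String[] args) {
    System.out.println(readerForInt32(ConvertedType.INT_16));  // DictionaryIntReader -> IntVector
    System.out.println(readerForInt32(ConvertedType.UINT_32)); // DictionaryUInt4Reader -> UInt4Vector
  }
}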
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetFixedWidthDictionaryReaders.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetFixedWidthDictionaryReaders.java
index d7b6fbb53..53a68ab1d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetFixedWidthDictionaryReaders.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetFixedWidthDictionaryReaders.java
@@ -26,6 +26,8 @@ import org.apache.drill.exec.vector.Float8Vector;
import org.apache.drill.exec.vector.IntVector;
import org.apache.drill.exec.vector.TimeStampVector;
import org.apache.drill.exec.vector.TimeVector;
+import org.apache.drill.exec.vector.UInt4Vector;
+import org.apache.drill.exec.vector.UInt8Vector;
import org.apache.drill.exec.vector.VarBinaryVector;
import org.apache.parquet.column.ColumnDescriptor;
import org.apache.parquet.format.SchemaElement;
@@ -56,6 +58,41 @@ public class ParquetFixedWidthDictionaryReaders {
}
}
+ /**
+ * This class is used for reading unsigned integer fields.
+ */
+ static class DictionaryUInt4Reader extends FixedByteAlignedReader<UInt4Vector> {
+ DictionaryUInt4Reader(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor,
+ ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, UInt4Vector v,
+ SchemaElement schemaElement) throws ExecutionSetupException {
+ super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
+ }
+
+ // this method is called by its superclass during a read loop
+ @Override
+ protected void readField(long recordsToReadInThisPass) {
+
+ recordsReadInThisIteration = Math.min(pageReader.currentPageCount
+ - pageReader.valuesRead, recordsToReadInThisPass - valuesReadInCurrentPass);
+
+ if (usingDictionary) {
+ UInt4Vector.Mutator mutator = valueVec.getMutator();
+ for (int i = 0; i < recordsReadInThisIteration; i++) {
+ mutator.setSafe(valuesReadInCurrentPass + i, pageReader.dictionaryValueReader.readInteger());
+ }
+ // Set the write Index. The next page that gets read might be a page that does not use dictionary encoding
+ // and we will go into the else condition below. The readField method of the parent class requires the
+ // writer index to be set correctly.
+ readLengthInBits = recordsReadInThisIteration * dataTypeLengthInBits;
+ readLength = (int) Math.ceil(readLengthInBits / 8.0);
+ int writerIndex = valueVec.getBuffer().writerIndex();
+ valueVec.getBuffer().setIndex(0, writerIndex + (int) readLength);
+ } else {
+ super.readField(recordsToReadInThisPass);
+ }
+ }
+ }
+
static class DictionaryFixedBinaryReader extends FixedByteAlignedReader<VarBinaryVector> {
DictionaryFixedBinaryReader(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor,
ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, VarBinaryVector v,
@@ -174,6 +211,41 @@ public class ParquetFixedWidthDictionaryReaders {
}
}
+ /**
+ * This class is used for reading unsigned BigInt fields.
+ */
+ static class DictionaryUInt8Reader extends FixedByteAlignedReader<UInt8Vector> {
+ DictionaryUInt8Reader(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor,
+ ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, UInt8Vector v,
+ SchemaElement schemaElement) throws ExecutionSetupException {
+ super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
+ }
+
+ // this method is called by its superclass during a read loop
+ @Override
+ protected void readField(long recordsToReadInThisPass) {
+
+ recordsReadInThisIteration = Math.min(pageReader.currentPageCount
+ - pageReader.valuesRead, recordsToReadInThisPass - valuesReadInCurrentPass);
+
+ if (usingDictionary) {
+ UInt8Vector.Mutator mutator = valueVec.getMutator();
+ for (int i = 0; i < recordsReadInThisIteration; i++) {
+ mutator.setSafe(valuesReadInCurrentPass + i, pageReader.dictionaryValueReader.readLong());
+ }
+ // Set the write Index. The next page that gets read might be a page that does not use dictionary encoding
+ // and we will go into the else condition below. The readField method of the parent class requires the
+ // writer index to be set correctly.
+ readLengthInBits = recordsReadInThisIteration * dataTypeLengthInBits;
+ readLength = (int) Math.ceil(readLengthInBits / 8.0);
+ int writerIndex = valueVec.getBuffer().writerIndex();
+ valueVec.getBuffer().setIndex(0, writerIndex + (int) readLength);
+ } else {
+ super.readField(recordsToReadInThisPass);
+ }
+ }
+ }
+
static class DictionaryDecimal18Reader extends FixedByteAlignedReader<Decimal18Vector> {
DictionaryDecimal18Reader(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor,
ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, Decimal18Vector v,
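
Both new readers above share the same bookkeeping after a dictionary-encoded page: the decoded values go in through the vector's mutator, so the underlying buffer's writer index has to be advanced by hand before a later non-dictionary page falls back to the parent readField. The stand-alone sketch below (not Drill code) walks through that arithmetic, with a plain Netty ByteBuf standing in for the vector's backing DrillBuf; the record count and 32-bit width are assumed values.

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;

public class WriterIndexSketch {
  public static void main(String[] args) {
    // Assumed values: 100 records of a 4-byte (32-bit) fixed-width type.
    long recordsReadInThisIteration = 100;
    long dataTypeLengthInBits = 32;

    // Same arithmetic as the readers above: bits consumed, rounded up to whole bytes.
    long readLengthInBits = recordsReadInThisIteration * dataTypeLengthInBits;
    int readLength = (int) Math.ceil(readLengthInBits / 8.0); // 400 bytes

    // Plain Netty buffer standing in for the value vector's buffer.
    ByteBuf buf = Unpooled.buffer(1024);
    int writerIndex = buf.writerIndex();        // 0 for a fresh buffer
    buf.setIndex(0, writerIndex + readLength);  // advance the writer index past the decoded values

    System.out.println("writerIndex is now " + buf.writerIndex()); // 400
  }
}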
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetToDrillTypeConverter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetToDrillTypeConverter.java
index be27f3ebd..3f5f3b275 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetToDrillTypeConverter.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetToDrillTypeConverter.java
@@ -61,6 +61,8 @@ public class ParquetToDrillTypeConverter {
return (TypeProtos.MinorType.BIGINT);
}
switch(convertedType) {
+ case UINT_64:
+ return TypeProtos.MinorType.UINT8;
case DECIMAL:
ParquetReaderUtility.checkDecimalTypeEnabled(options);
return TypeProtos.MinorType.DECIMAL18;
@@ -77,6 +79,13 @@ public class ParquetToDrillTypeConverter {
return TypeProtos.MinorType.INT;
}
switch(convertedType) {
+ case UINT_8:
+ case UINT_16:
+ case UINT_32:
+ return TypeProtos.MinorType.UINT4;
+ case INT_8:
+ case INT_16:
+ return TypeProtos.MinorType.INT;
case DECIMAL:
ParquetReaderUtility.checkDecimalTypeEnabled(options);
return TypeProtos.MinorType.DECIMAL9;
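
Taken together, the converter changes above amount to the mapping sketched below. This is an illustrative stand-alone mirror, not the ParquetToDrillTypeConverter code itself: it returns vector names as strings instead of TypeProtos.MinorType values, and the primitiveType parameter is a plain string for brevity.

import org.apache.parquet.format.ConvertedType;

public class ConvertedTypeMappingSketch {
  // Illustrative mirror of the mapping this patch adds to ParquetToDrillTypeConverter.
  static String drillTypeFor(ConvertedType convertedType, String primitiveType) {
    if ("INT64".equals(primitiveType) && convertedType == ConvertedType.UINT_64) {
      return "UINT8"; // 8-byte unsigned vector
    }
    if ("INT32".equals(primitiveType)) {
      switch (convertedType) {
        case UINT_8:
        case UINT_16:
        case UINT_32:
          return "UINT4"; // 4-byte unsigned vector
        case INT_8:
        case INT_16:
          return "INT";   // plain 4-byte signed vector
        default:
          break;
      }
    }
    return "unmapped in this sketch";
  }

  public static void main(String[] args) {
    System.out.println(drillTypeFor(ConvertedType.INT_16, "INT32"));  // INT
    System.out.println(drillTypeFor(ConvertedType.UINT_64, "INT64")); // UINT8
  }
}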