diff options
author | Serhii-Harnyk <serhii.harnyk@gmail.com> | 2016-11-24 13:24:03 +0000 |
---|---|---|
committer | Sudheesh Katkam <sudheesh@apache.org> | 2017-01-30 10:09:39 -0800 |
commit | 60624af225f90992a15a707e1650e41ccecf5a53 (patch) | |
tree | 6708c9a1dc7b2cf7207f20e89f315834f3117e9c /exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders | |
parent | 5c3924c9844f7d25c0798c649bc032a0022b3a3e (diff) |
DRILL-4764: Parquet file with INT_16, etc. logical types not supported by simple SELECT
closes #673
Diffstat (limited to 'exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders')
3 files changed, 92 insertions, 0 deletions
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ColumnReaderFactory.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ColumnReaderFactory.java index 662d5c977..495f70bc5 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ColumnReaderFactory.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ColumnReaderFactory.java @@ -48,6 +48,8 @@ import org.apache.drill.exec.vector.NullableVarBinaryVector; import org.apache.drill.exec.vector.NullableVarCharVector; import org.apache.drill.exec.vector.TimeStampVector; import org.apache.drill.exec.vector.TimeVector; +import org.apache.drill.exec.vector.UInt4Vector; +import org.apache.drill.exec.vector.UInt8Vector; import org.apache.drill.exec.vector.ValueVector; import org.apache.drill.exec.vector.VarBinaryVector; import org.apache.drill.exec.vector.VarCharVector; @@ -121,6 +123,13 @@ public class ColumnReaderFactory { return new ParquetFixedWidthDictionaryReaders.DictionaryDecimal9Reader(recordReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, (Decimal9Vector) v, schemaElement); case TIME_MILLIS: return new ParquetFixedWidthDictionaryReaders.DictionaryTimeReader(recordReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, (TimeVector) v, schemaElement); + case INT_8: + case INT_16: + return new ParquetFixedWidthDictionaryReaders.DictionaryIntReader(recordReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, (IntVector) v, schemaElement); + case UINT_8: + case UINT_16: + case UINT_32: + return new ParquetFixedWidthDictionaryReaders.DictionaryUInt4Reader(recordReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, (UInt4Vector) v, schemaElement); default: throw new ExecutionSetupException("Unsupported dictionary converted type " + convertedType + " for primitive type INT32"); } @@ -129,6 +138,8 @@ public class 
ColumnReaderFactory { return new ParquetFixedWidthDictionaryReaders.DictionaryBigIntReader(recordReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, (BigIntVector) v, schemaElement); } switch (convertedType) { + case UINT_64: + return new ParquetFixedWidthDictionaryReaders.DictionaryUInt8Reader(recordReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, (UInt8Vector) v, schemaElement); case DECIMAL: return new ParquetFixedWidthDictionaryReaders.DictionaryDecimal18Reader(recordReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, (Decimal18Vector) v, schemaElement); case TIMESTAMP_MILLIS: diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetFixedWidthDictionaryReaders.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetFixedWidthDictionaryReaders.java index d7b6fbb53..53a68ab1d 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetFixedWidthDictionaryReaders.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetFixedWidthDictionaryReaders.java @@ -26,6 +26,8 @@ import org.apache.drill.exec.vector.Float8Vector; import org.apache.drill.exec.vector.IntVector; import org.apache.drill.exec.vector.TimeStampVector; import org.apache.drill.exec.vector.TimeVector; +import org.apache.drill.exec.vector.UInt4Vector; +import org.apache.drill.exec.vector.UInt8Vector; import org.apache.drill.exec.vector.VarBinaryVector; import org.apache.parquet.column.ColumnDescriptor; import org.apache.parquet.format.SchemaElement; @@ -56,6 +58,41 @@ public class ParquetFixedWidthDictionaryReaders { } } + /** + * This class is used for reading unsigned integer fields. 
+ */ + static class DictionaryUInt4Reader extends FixedByteAlignedReader<UInt4Vector> { + DictionaryUInt4Reader(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor, + ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, UInt4Vector v, + SchemaElement schemaElement) throws ExecutionSetupException { + super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement); + } + + // this method is called by its superclass during a read loop + @Override + protected void readField(long recordsToReadInThisPass) { + + recordsReadInThisIteration = Math.min(pageReader.currentPageCount + - pageReader.valuesRead, recordsToReadInThisPass - valuesReadInCurrentPass); + + if (usingDictionary) { + UInt4Vector.Mutator mutator = valueVec.getMutator(); + for (int i = 0; i < recordsReadInThisIteration; i++) { + mutator.setSafe(valuesReadInCurrentPass + i, pageReader.dictionaryValueReader.readInteger()); + } + // Set the write Index. The next page that gets read might be a page that does not use dictionary encoding + // and we will go into the else condition below. The readField method of the parent class requires the + // writer index to be set correctly. + readLengthInBits = recordsReadInThisIteration * dataTypeLengthInBits; + readLength = (int) Math.ceil(readLengthInBits / 8.0); + int writerIndex = valueVec.getBuffer().writerIndex(); + valueVec.getBuffer().setIndex(0, writerIndex + (int) readLength); + } else { + super.readField(recordsToReadInThisPass); + } + } + } + static class DictionaryFixedBinaryReader extends FixedByteAlignedReader<VarBinaryVector> { DictionaryFixedBinaryReader(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor, ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, VarBinaryVector v, @@ -174,6 +211,41 @@ public class ParquetFixedWidthDictionaryReaders { } } + /** + * This class is used for reading unsigned BigInt fields. 
+ */ + static class DictionaryUInt8Reader extends FixedByteAlignedReader<UInt8Vector> { + DictionaryUInt8Reader(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor, + ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, UInt8Vector v, + SchemaElement schemaElement) throws ExecutionSetupException { + super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement); + } + + // this method is called by its superclass during a read loop + @Override + protected void readField(long recordsToReadInThisPass) { + + recordsReadInThisIteration = Math.min(pageReader.currentPageCount + - pageReader.valuesRead, recordsToReadInThisPass - valuesReadInCurrentPass); + + if (usingDictionary) { + UInt8Vector.Mutator mutator = valueVec.getMutator(); + for (int i = 0; i < recordsReadInThisIteration; i++) { + mutator.setSafe(valuesReadInCurrentPass + i, pageReader.dictionaryValueReader.readLong()); + } + // Set the write Index. The next page that gets read might be a page that does not use dictionary encoding + // and we will go into the else condition below. The readField method of the parent class requires the + // writer index to be set correctly. 
+ readLengthInBits = recordsReadInThisIteration * dataTypeLengthInBits; + readLength = (int) Math.ceil(readLengthInBits / 8.0); + int writerIndex = valueVec.getBuffer().writerIndex(); + valueVec.getBuffer().setIndex(0, writerIndex + (int) readLength); + } else { + super.readField(recordsToReadInThisPass); + } + } + } + static class DictionaryDecimal18Reader extends FixedByteAlignedReader<Decimal18Vector> { DictionaryDecimal18Reader(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor, ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, Decimal18Vector v, diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetToDrillTypeConverter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetToDrillTypeConverter.java index be27f3ebd..3f5f3b275 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetToDrillTypeConverter.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetToDrillTypeConverter.java @@ -61,6 +61,8 @@ public class ParquetToDrillTypeConverter { return (TypeProtos.MinorType.BIGINT); } switch(convertedType) { + case UINT_64: + return TypeProtos.MinorType.UINT8; case DECIMAL: ParquetReaderUtility.checkDecimalTypeEnabled(options); return TypeProtos.MinorType.DECIMAL18; @@ -77,6 +79,13 @@ public class ParquetToDrillTypeConverter { return TypeProtos.MinorType.INT; } switch(convertedType) { + case UINT_8: + case UINT_16: + case UINT_32: + return TypeProtos.MinorType.UINT4; + case INT_8: + case INT_16: + return TypeProtos.MinorType.INT; case DECIMAL: ParquetReaderUtility.checkDecimalTypeEnabled(options); return TypeProtos.MinorType.DECIMAL9; |