diff options
author | Rahul Raj <rajrahul@gmail.com> | 2018-03-14 12:05:45 +0530 |
---|---|---|
committer | Arina Ielchiieva <arina.yelchiyeva@gmail.com> | 2018-04-06 12:01:04 +0300 |
commit | 127e4150b9495c465f8c37a534dfd50512013765 (patch) | |
tree | de66eb23912ba6b04f54d235f838baef7ffc8d6c /exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders | |
parent | 67669a000c19a39510118f20366f3a189809200a (diff) |
DRILL-6016: Fix for Error reading INT96 created by Apache Spark
closes #1166
Diffstat (limited to 'exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders')
2 files changed, 33 insertions, 1 deletions
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ColumnReaderFactory.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ColumnReaderFactory.java index 09cdc5d5a..ba5f1decf 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ColumnReaderFactory.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ColumnReaderFactory.java @@ -156,8 +156,13 @@ public class ColumnReaderFactory { case DOUBLE: return new ParquetFixedWidthDictionaryReaders.DictionaryFloat8Reader(recordReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, (Float8Vector) v, schemaElement); case FIXED_LEN_BYTE_ARRAY: - case INT96: return new ParquetFixedWidthDictionaryReaders.DictionaryFixedBinaryReader(recordReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, (VarBinaryVector) v, schemaElement); + case INT96: + if (recordReader.getFragmentContext().getOptions().getOption(ExecConstants.PARQUET_READER_INT96_AS_TIMESTAMP).bool_val) { + return new ParquetFixedWidthDictionaryReaders.DictionaryBinaryAsTimeStampReader(recordReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, (TimeStampVector) v, schemaElement); + } else { + return new ParquetFixedWidthDictionaryReaders.DictionaryFixedBinaryReader(recordReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, (VarBinaryVector) v, schemaElement); + } default: throw new ExecutionSetupException("Unsupported dictionary column type " + descriptor.getType().name() ); } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetFixedWidthDictionaryReaders.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetFixedWidthDictionaryReaders.java index 5fbac204e..50330465b 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetFixedWidthDictionaryReaders.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetFixedWidthDictionaryReaders.java @@ -34,6 +34,8 @@ import org.apache.parquet.format.SchemaElement; import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData; import org.apache.parquet.io.api.Binary; +import static org.apache.drill.exec.store.parquet.ParquetReaderUtility.NanoTimeUtils.getDateTimeValueFromBinary; + public class ParquetFixedWidthDictionaryReaders { static class DictionaryIntReader extends FixedByteAlignedReader<IntVector> { @@ -294,6 +296,31 @@ public class ParquetFixedWidthDictionaryReaders { } } + static class DictionaryBinaryAsTimeStampReader extends FixedByteAlignedReader<TimeStampVector> { + DictionaryBinaryAsTimeStampReader(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor, + ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, TimeStampVector v, + SchemaElement schemaElement) throws ExecutionSetupException { + super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement); + } + + // this method is called by its superclass during a read loop + @Override + protected void readField(long recordsToReadInThisPass) { + + recordsReadInThisIteration = Math.min(pageReader.currentPageCount + - pageReader.valuesRead, recordsToReadInThisPass - valuesReadInCurrentPass); + + for (int i = 0; i < recordsReadInThisIteration; i++){ + try { + Binary binaryTimeStampValue = pageReader.dictionaryValueReader.readBytes(); + valueVec.getMutator().setSafe(valuesReadInCurrentPass + i, getDateTimeValueFromBinary(binaryTimeStampValue, true)); + } catch ( Exception ex) { + throw ex; + } + } + } + } + static class DictionaryFloat4Reader extends FixedByteAlignedReader<Float4Vector> { DictionaryFloat4Reader(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor, ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, Float4Vector v, |