about summary refs log tree commit diff
path: root/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders
diff options
context:
space:
mode:
author Rahul Raj <rajrahul@gmail.com> 2018-03-14 12:05:45 +0530
committer Arina Ielchiieva <arina.yelchiyeva@gmail.com> 2018-04-06 12:01:04 +0300
commit 127e4150b9495c465f8c37a534dfd50512013765 (patch)
tree de66eb23912ba6b04f54d235f838baef7ffc8d6c /exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders
parent 67669a000c19a39510118f20366f3a189809200a (diff)
DRILL-6016: Fix for Error reading INT96 created by Apache Spark
closes #1166
Diffstat (limited to 'exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders')
-rw-r--r-- exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ColumnReaderFactory.java 7
-rw-r--r-- exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetFixedWidthDictionaryReaders.java 27
2 files changed, 33 insertions, 1 deletions
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ColumnReaderFactory.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ColumnReaderFactory.java
index 09cdc5d5a..ba5f1decf 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ColumnReaderFactory.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ColumnReaderFactory.java
@@ -156,8 +156,13 @@ public class ColumnReaderFactory {
case DOUBLE:
return new ParquetFixedWidthDictionaryReaders.DictionaryFloat8Reader(recordReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, (Float8Vector) v, schemaElement);
case FIXED_LEN_BYTE_ARRAY:
- case INT96:
return new ParquetFixedWidthDictionaryReaders.DictionaryFixedBinaryReader(recordReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, (VarBinaryVector) v, schemaElement);
+ case INT96:
+ if (recordReader.getFragmentContext().getOptions().getOption(ExecConstants.PARQUET_READER_INT96_AS_TIMESTAMP).bool_val) {
+ return new ParquetFixedWidthDictionaryReaders.DictionaryBinaryAsTimeStampReader(recordReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, (TimeStampVector) v, schemaElement);
+ } else {
+ return new ParquetFixedWidthDictionaryReaders.DictionaryFixedBinaryReader(recordReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, (VarBinaryVector) v, schemaElement);
+ }
default:
throw new ExecutionSetupException("Unsupported dictionary column type " + descriptor.getType().name() );
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetFixedWidthDictionaryReaders.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetFixedWidthDictionaryReaders.java
index 5fbac204e..50330465b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetFixedWidthDictionaryReaders.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetFixedWidthDictionaryReaders.java
@@ -34,6 +34,8 @@ import org.apache.parquet.format.SchemaElement;
import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
import org.apache.parquet.io.api.Binary;
+import static org.apache.drill.exec.store.parquet.ParquetReaderUtility.NanoTimeUtils.getDateTimeValueFromBinary;
+
public class ParquetFixedWidthDictionaryReaders {
static class DictionaryIntReader extends FixedByteAlignedReader<IntVector> {
@@ -294,6 +296,31 @@ public class ParquetFixedWidthDictionaryReaders {
}
}
+ static class DictionaryBinaryAsTimeStampReader extends FixedByteAlignedReader<TimeStampVector> { // reads binary values through the page's dictionary value reader and stores them in a TimeStampVector
+ DictionaryBinaryAsTimeStampReader(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor,
+ ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, TimeStampVector v,
+ SchemaElement schemaElement) throws ExecutionSetupException {
+ super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement); // no extra state here; every argument is forwarded to the superclass
+ }
+
+ // Called by the superclass during a read loop: reads one batch of values via the dictionary value reader and writes each one into valueVec as a date-time value.
+ @Override
+ protected void readField(long recordsToReadInThisPass) {
+
+ recordsReadInThisIteration = Math.min(pageReader.currentPageCount
+ - pageReader.valuesRead, recordsToReadInThisPass - valuesReadInCurrentPass); // batch size: the smaller of (currentPageCount - valuesRead) and (recordsToReadInThisPass - valuesReadInCurrentPass)
+
+ for (int i = 0; i < recordsReadInThisIteration; i++){
+ try {
+ Binary binaryTimeStampValue = pageReader.dictionaryValueReader.readBytes(); // next value from the dictionary value reader, as a Binary
+ valueVec.getMutator().setSafe(valuesReadInCurrentPass + i, getDateTimeValueFromBinary(binaryTimeStampValue, true)); // write at offset i past the values already read in this pass; NOTE(review): the meaning of the 'true' flag is defined in NanoTimeUtils, not visible here - confirm
+ } catch ( Exception ex) { // NOTE(review): the exception is rethrown unchanged below, so this try/catch adds no handling
+ throw ex;
+ }
+ }
+ }
+ }
+
static class DictionaryFloat4Reader extends FixedByteAlignedReader<Float4Vector> {
DictionaryFloat4Reader(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor,
ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, Float4Vector v,