Diffstat (limited to 'exec/java-exec/src/main')
-rw-r--r--  exec/java-exec/src/main/codegen/templates/NullableValueVectors.java  6
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ColumnReader.java  1
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ColumnReaderFactory.java  19
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/FixedByteAlignedReader.java  38
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/FixedWidthRepeatedReader.java  6
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/NullableFixedByteAlignedReaders.java  34
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/NullableVarLengthValuesColumn.java  2
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/PageReader.java  71
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetFixedWidthDictionaryReader.java  53
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetFixedWidthDictionaryReaders.java  133
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java  31
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetToDrillTypeConverter.java  9
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLengthColumnReaders.java  20
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLengthValuesColumn.java  13
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/vector/NullableVector.java  23
15 files changed, 340 insertions(+), 119 deletions(-)
diff --git a/exec/java-exec/src/main/codegen/templates/NullableValueVectors.java b/exec/java-exec/src/main/codegen/templates/NullableValueVectors.java
index d9eae0fb8..492d49517 100644
--- a/exec/java-exec/src/main/codegen/templates/NullableValueVectors.java
+++ b/exec/java-exec/src/main/codegen/templates/NullableValueVectors.java
@@ -44,7 +44,7 @@ package org.apache.drill.exec.vector;
* NB: this class is automatically generated from ValueVectorTypes.tdd using FreeMarker.
*/
@SuppressWarnings("unused")
-public final class ${className} extends BaseValueVector implements <#if type.major == "VarLen">VariableWidth<#else>FixedWidth</#if>Vector{
+public final class ${className} extends BaseValueVector implements <#if type.major == "VarLen">VariableWidth<#else>FixedWidth</#if>Vector, NullableVector{
private int valueCount;
final UInt1Vector bits;
@@ -96,6 +96,10 @@ public final class ${className} extends BaseValueVector implements <#if type.maj
return values.getData();
}
+ public ${valuesName} getValuesVector() {
+ return values;
+ }
+
<#if type.major == "VarLen">
@Override
public SerializedField getMetadata() {
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ColumnReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ColumnReader.java
index b240407f6..272a5c352 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ColumnReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ColumnReader.java
@@ -118,7 +118,6 @@ public abstract class ColumnReader<V extends ValueVector> {
readField(recordsToRead);
valuesReadInCurrentPass += recordsReadInThisIteration;
- totalValuesRead += recordsReadInThisIteration;
pageReader.valuesRead += recordsReadInThisIteration;
pageReader.readPosInBytes = readStartInBytes + readLength;
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ColumnReaderFactory.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ColumnReaderFactory.java
index 243744e2a..3d36b64ad 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ColumnReaderFactory.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ColumnReaderFactory.java
@@ -20,8 +20,12 @@ package org.apache.drill.exec.store.parquet.columnreaders;
import org.apache.drill.common.exceptions.ExecutionSetupException;
import org.apache.drill.exec.exception.SchemaChangeException;
+import org.apache.drill.exec.vector.BigIntVector;
import org.apache.drill.exec.vector.Decimal28SparseVector;
import org.apache.drill.exec.vector.Decimal38SparseVector;
+import org.apache.drill.exec.vector.Float4Vector;
+import org.apache.drill.exec.vector.Float8Vector;
+import org.apache.drill.exec.vector.IntVector;
import org.apache.drill.exec.vector.NullableBigIntVector;
import org.apache.drill.exec.vector.NullableDecimal28SparseVector;
import org.apache.drill.exec.vector.NullableDecimal38SparseVector;
@@ -72,8 +76,19 @@ public class ColumnReaderFactory {
return new FixedByteAlignedReader.DateReader(recordReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
} else{
if (columnChunkMetaData.getEncodings().contains(Encoding.PLAIN_DICTIONARY)) {
- return new ParquetFixedWidthDictionaryReader(recordReader, allocateSize, descriptor, columnChunkMetaData,
- fixedLength, v, schemaElement);
+ switch (columnChunkMetaData.getType()) {
+ case INT32:
+ return new ParquetFixedWidthDictionaryReaders.DictionaryIntReader(recordReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, (IntVector) v, schemaElement);
+ case INT64:
+ return new ParquetFixedWidthDictionaryReaders.DictionaryBigIntReader(recordReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, (BigIntVector) v, schemaElement);
+ case FLOAT:
+ return new ParquetFixedWidthDictionaryReaders.DictionaryFloat4Reader(recordReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, (Float4Vector) v, schemaElement);
+ case DOUBLE:
+ return new ParquetFixedWidthDictionaryReaders.DictionaryFloat8Reader(recordReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, (Float8Vector) v, schemaElement);
+ default:
+ throw new ExecutionSetupException("Unsupported dictionary column type " + descriptor.getType().name() );
+ }
+
} else {
return new FixedByteAlignedReader(recordReader, allocateSize, descriptor, columnChunkMetaData,
fixedLength, v, schemaElement);
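For context on the dispatch above: PLAIN_DICTIONARY stores each distinct value once in a dictionary page and encodes the data pages as small indices into it, which is why the factory now needs one reader per physical type to pull typed values back out. A minimal standalone sketch of the decode idea (values invented for illustration):

    public class DictionaryDecodeSketch {
      public static void main(String[] args) {
        long[] dictionary = {1000L, 2000L, 3000L}; // decoded once per column chunk
        int[] encodedPage = {0, 0, 2, 1, 0};       // indices read from the data page
        long[] decoded = new long[encodedPage.length];
        for (int i = 0; i < encodedPage.length; i++) {
          decoded[i] = dictionary[encodedPage[i]]; // one dictionary lookup per record
        }
        System.out.println(java.util.Arrays.toString(decoded)); // [1000, 1000, 3000, 2000, 1000]
      }
    }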
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/FixedByteAlignedReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/FixedByteAlignedReader.java
index 54771e4b7..bfbefdb8c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/FixedByteAlignedReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/FixedByteAlignedReader.java
@@ -29,6 +29,7 @@ import org.apache.drill.exec.vector.DateVector;
import org.apache.drill.exec.vector.Decimal28SparseVector;
import org.apache.drill.exec.vector.Decimal38SparseVector;
import org.apache.drill.exec.vector.ValueVector;
+import org.apache.drill.exec.vector.VariableWidthVector;
import org.joda.time.DateTimeUtils;
import parquet.column.ColumnDescriptor;
@@ -68,6 +69,29 @@ class FixedByteAlignedReader extends ColumnReader {
(int) readStartInBytes, (int) readLength);
}
+ public static class FixedBinaryReader extends FixedByteAlignedReader {
+ // TODO - replace this with fixed binary type in drill
+ VariableWidthVector castedVector;
+
+ FixedBinaryReader(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor, ColumnChunkMetaData columnChunkMetaData,
+ VariableWidthVector v, SchemaElement schemaElement) throws ExecutionSetupException {
+ super(parentReader, allocateSize, descriptor, columnChunkMetaData, true, v, schemaElement);
+ castedVector = v;
+ }
+
+ protected void readField(long recordsToReadInThisPass) {
+ // we can use the standard read method to transfer the data
+ super.readField(recordsToReadInThisPass);
+ // TODO - replace this with fixed binary type in drill
+ // now we need to write the lengths of each value
+ int byteLength = dataTypeLengthInBits / 8;
+ for (int i = 0; i < recordsToReadInThisPass; i++) {
+ castedVector.getMutator().setValueLengthSafe(i, byteLength);
+ }
+ }
+
+ }
+
public static abstract class ConvertedReader extends FixedByteAlignedReader {
protected int dataTypeLengthInBytes;
@@ -104,10 +128,20 @@ class FixedByteAlignedReader extends ColumnReader {
@Override
void addNext(int start, int index) {
- dateVector.getMutator().set(index, DateTimeUtils.fromJulianDay(
- NullableFixedByteAlignedReaders.NullableDateReader.readIntLittleEndian(bytebuf, start)
+// dateVector.getMutator().set(index, DateTimeUtils.fromJulianDay(
+// NullableFixedByteAlignedReaders.NullableDateReader.readIntLittleEndian(bytebuf, start)
+ dateVector.getMutator().set(index, DateTimeUtils.fromJulianDay(readIntLittleEndian(bytebuf, start)
- ParquetOutputRecordWriter.JULIAN_DAY_EPOC - 0.5));
}
+
+ // copied out of the parquet library; didn't want to deal with the unneeded throws statement they had declared
+ public static int readIntLittleEndian(ByteBuf in, int offset) {
+ int ch4 = in.getByte(offset) & 0xff;
+ int ch3 = in.getByte(offset + 1) & 0xff;
+ int ch2 = in.getByte(offset + 2) & 0xff;
+ int ch1 = in.getByte(offset + 3) & 0xff;
+ return ((ch1 << 24) + (ch2 << 16) + (ch3 << 8) + (ch4 << 0));
+ }
}
public static class Decimal28Reader extends ConvertedReader {
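The readIntLittleEndian helper copied in above reassembles four bytes with the low byte first. A quick standalone check of that reconstruction, assuming Netty's Unpooled buffer API (which the ByteBuf used here comes from):

    import io.netty.buffer.ByteBuf;
    import io.netty.buffer.Unpooled;

    public class LittleEndianCheck {
      // same reconstruction as readIntLittleEndian above
      static int readIntLittleEndian(ByteBuf in, int offset) {
        int ch4 = in.getByte(offset) & 0xff;
        int ch3 = in.getByte(offset + 1) & 0xff;
        int ch2 = in.getByte(offset + 2) & 0xff;
        int ch1 = in.getByte(offset + 3) & 0xff;
        return ((ch1 << 24) + (ch2 << 16) + (ch3 << 8) + (ch4 << 0));
      }

      public static void main(String[] args) {
        ByteBuf buf = Unpooled.buffer(4);
        buf.writeByte(0x78).writeByte(0x56).writeByte(0x34).writeByte(0x12); // 0x12345678 stored little-endian
        System.out.println(Integer.toHexString(readIntLittleEndian(buf, 0))); // prints 12345678
      }
    }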
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/FixedWidthRepeatedReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/FixedWidthRepeatedReader.java
index bbff57420..18618e612 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/FixedWidthRepeatedReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/FixedWidthRepeatedReader.java
@@ -51,6 +51,7 @@ public class FixedWidthRepeatedReader extends VarLengthColumn {
castedRepeatedVector = (RepeatedFixedWidthVector) valueVector;
this.dataTypeLengthInBytes = dataTypeLengthInBytes;
this.dataReader = dataReader;
+ this.dataReader.pageReader.clear();
this.dataReader.pageReader = this.pageReader;
// this is not in the reset method because it needs to be initialized only for the very first page read
// in all other cases if a read ends at a page boundary we will need to keep track of this flag and not
@@ -209,5 +210,10 @@ public class FixedWidthRepeatedReader extends VarLengthColumn {
public int capacity() {
return castedRepeatedVector.getMutator().getDataVector().getData().capacity();
}
+
+ public void clear() {
+ super.clear();
+ dataReader.clear();
+ }
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/NullableFixedByteAlignedReaders.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/NullableFixedByteAlignedReaders.java
index f88d56a4f..557bd9fc7 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/NullableFixedByteAlignedReaders.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/NullableFixedByteAlignedReaders.java
@@ -82,6 +82,10 @@ public class NullableFixedByteAlignedReaders {
for (int i = 0; i < recordsToReadInThisPass; i++){
valueVec.getMutator().setSafe(valuesReadInCurrentPass + i, pageReader.dictionaryValueReader.readInteger());
}
+ } else {
+ for (int i = 0; i < recordsToReadInThisPass; i++){
+ valueVec.getMutator().setSafe(valuesReadInCurrentPass + i, pageReader.valueReader.readInteger());
+ }
}
}
}
@@ -97,8 +101,14 @@ public class NullableFixedByteAlignedReaders {
// this method is called by its superclass during a read loop
@Override
protected void readField(long recordsToReadInThisPass) {
- for (int i = 0; i < recordsToReadInThisPass; i++){
- valueVec.getMutator().setSafe(valuesReadInCurrentPass + i, pageReader.dictionaryValueReader.readLong());
+ if (usingDictionary) {
+ for (int i = 0; i < recordsToReadInThisPass; i++){
+ valueVec.getMutator().setSafe(valuesReadInCurrentPass + i, pageReader.dictionaryValueReader.readLong());
+ }
+ } else {
+ for (int i = 0; i < recordsToReadInThisPass; i++){
+ valueVec.getMutator().setSafe(valuesReadInCurrentPass + i, pageReader.valueReader.readLong());
+ }
}
}
}
@@ -114,8 +124,14 @@ public class NullableFixedByteAlignedReaders {
// this method is called by its superclass during a read loop
@Override
protected void readField(long recordsToReadInThisPass) {
- for (int i = 0; i < recordsToReadInThisPass; i++){
- valueVec.getMutator().setSafe(valuesReadInCurrentPass + i, pageReader.dictionaryValueReader.readFloat());
+ if (usingDictionary) {
+ for (int i = 0; i < recordsToReadInThisPass; i++){
+ valueVec.getMutator().setSafe(valuesReadInCurrentPass + i, pageReader.dictionaryValueReader.readFloat());
+ }
+ } else {
+ for (int i = 0; i < recordsToReadInThisPass; i++){
+ valueVec.getMutator().setSafe(valuesReadInCurrentPass + i, pageReader.valueReader.readFloat());
+ }
}
}
}
@@ -131,8 +147,14 @@ public class NullableFixedByteAlignedReaders {
// this method is called by its superclass during a read loop
@Override
protected void readField(long recordsToReadInThisPass) {
- for (int i = 0; i < recordsToReadInThisPass; i++){
- valueVec.getMutator().setSafe(valuesReadInCurrentPass + i, pageReader.dictionaryValueReader.readDouble());
+ if (usingDictionary) {
+ for (int i = 0; i < recordsToReadInThisPass; i++){
+ valueVec.getMutator().setSafe(valuesReadInCurrentPass + i, pageReader.dictionaryValueReader.readDouble());
+ }
+ } else {
+ for (int i = 0; i < recordsToReadInThisPass; i++){
+ valueVec.getMutator().setSafe(valuesReadInCurrentPass + i, pageReader.valueReader.readDouble());
+ }
}
}
}
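The four nullable readers above repeat the same if (usingDictionary) shape around different read calls. A hedged sketch of that common pattern, with a hypothetical functional interface standing in for the mutator/reader pair (the actual classes stay as written in the diff):

    public class ReadFieldSketch {
      interface ValueCopier { void copy(int targetIndex); }

      static void readField(long recordsToRead, boolean usingDictionary,
                            ValueCopier fromDictionary, ValueCopier fromPlain) {
        // choose the decoder once, then run the identical copy loop
        ValueCopier copier = usingDictionary ? fromDictionary : fromPlain;
        for (int i = 0; i < recordsToRead; i++) {
          copier.copy(i); // e.g. valueVec.getMutator().setSafe(valuesReadInCurrentPass + i, reader.readLong())
        }
      }
    }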
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/NullableVarLengthValuesColumn.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/NullableVarLengthValuesColumn.java
index ba9ff802d..dc29fbd1f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/NullableVarLengthValuesColumn.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/NullableVarLengthValuesColumn.java
@@ -116,6 +116,8 @@ public abstract class NullableVarLengthValuesColumn<V extends ValueVector> exten
@Override
protected void readField(long recordsToRead) {
+ // TODO - unlike most implementations of this method, the recordsReadInThisIteration field is not set here
+ // should verify that this is not breaking anything
if (usingDictionary) {
currDictValToWrite = pageReader.dictionaryValueReader.readBytes();
// re-purposing this field here for length in BYTES to prevent repetitive multiplication/division
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/PageReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/PageReader.java
index 4165cbda8..639577dfd 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/PageReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/PageReader.java
@@ -91,10 +91,14 @@ final class PageReader {
List<ByteBuf> allocatedBuffers;
+ // These need to be held throughout reading of the entire column chunk
+ List<ByteBuf> allocatedDictionaryBuffers;
+
PageReader(ColumnReader parentStatus, FileSystem fs, Path path, ColumnChunkMetaData columnChunkMetaData)
throws ExecutionSetupException{
this.parentColumnReader = parentStatus;
allocatedBuffers = new ArrayList<ByteBuf>();
+ allocatedDictionaryBuffers = new ArrayList<ByteBuf>();
long totalByteLength = columnChunkMetaData.getTotalUncompressedSize();
long start = columnChunkMetaData.getFirstDataPageOffset();
@@ -108,7 +112,7 @@ final class PageReader {
BytesInput bytesIn;
ByteBuf uncompressedData=allocateBuffer(pageHeader.getUncompressed_page_size());
- allocatedBuffers.add(uncompressedData);
+ allocatedDictionaryBuffers.add(uncompressedData);
if(parentColumnReader.columnChunkMetaData.getCodec()==CompressionCodecName.UNCOMPRESSED) {
dataReader.getPageAsBytesBuf(uncompressedData, pageHeader.compressed_page_size);
bytesIn=parentColumnReader.parentReader.getCodecFactoryExposer().getBytesInput(uncompressedData,
@@ -157,6 +161,7 @@ final class PageReader {
if(!dataReader.hasRemainder() || parentColumnReader.totalValuesRead == parentColumnReader.columnChunkMetaData.getValueCount()) {
return false;
}
+ clearBuffers();
// next, we need to decompress the bytes
// TODO - figure out if we need multiple dictionary pages, I believe it may be limited to one
@@ -168,7 +173,7 @@ final class PageReader {
//TODO: Handle buffer allocation exception
BytesInput bytesIn;
ByteBuf uncompressedData=allocateBuffer(pageHeader.getUncompressed_page_size());
- allocatedBuffers.add(uncompressedData);
+ allocatedDictionaryBuffers.add(uncompressedData);
if( parentColumnReader.columnChunkMetaData.getCodec()== CompressionCodecName.UNCOMPRESSED) {
dataReader.getPageAsBytesBuf(uncompressedData, pageHeader.compressed_page_size);
bytesIn=parentColumnReader.parentReader.getCodecFactoryExposer().getBytesInput(uncompressedData,
@@ -229,6 +234,7 @@ final class PageReader {
}
pageDataByteArray = DrillBuf.wrapByteBuffer(currentPage.getBytes().toByteBuffer());
+ allocatedBuffers.add(pageDataByteArray);
readPosInBytes = 0;
if (parentColumnReader.getColumnDescriptor().getMaxRepetitionLevel() > 0) {
@@ -244,29 +250,29 @@ final class PageReader {
}
if (parentColumnReader.columnDescriptor.getMaxDefinitionLevel() != 0){
parentColumnReader.currDefLevel = -1;
- if (!currentPage.getValueEncoding().usesDictionary()) {
- parentColumnReader.usingDictionary = false;
- definitionLevels = currentPage.getDlEncoding().getValuesReader(parentColumnReader.columnDescriptor, ValuesType.DEFINITION_LEVEL);
- definitionLevels.initFromPage(currentPage.getValueCount(), pageDataByteArray.nioBuffer(), (int) readPosInBytes);
- readPosInBytes = definitionLevels.getNextOffset();
- if (parentColumnReader.columnDescriptor.getType() == PrimitiveType.PrimitiveTypeName.BOOLEAN) {
- valueReader = currentPage.getValueEncoding().getValuesReader(parentColumnReader.columnDescriptor, ValuesType.VALUES);
- valueReader.initFromPage(currentPage.getValueCount(), pageDataByteArray.nioBuffer(), (int) readPosInBytes);
- }
- } else {
- parentColumnReader.usingDictionary = true;
- definitionLevels = currentPage.getDlEncoding().getValuesReader(parentColumnReader.columnDescriptor, ValuesType.DEFINITION_LEVEL);
- definitionLevels.initFromPage(currentPage.getValueCount(), pageDataByteArray.nioBuffer(), (int) readPosInBytes);
- readPosInBytes = definitionLevels.getNextOffset();
- // initialize two of the dictionary readers, one is for determining the lengths of each value, the second is for
- // actually copying the values out into the vectors
- dictionaryLengthDeterminingReader = new DictionaryValuesReader(dictionary);
- dictionaryLengthDeterminingReader.initFromPage(currentPage.getValueCount(), pageDataByteArray.nioBuffer(), (int) readPosInBytes);
- dictionaryValueReader = new DictionaryValuesReader(dictionary);
- dictionaryValueReader.initFromPage(currentPage.getValueCount(), pageDataByteArray.nioBuffer(), (int) readPosInBytes);
- this.parentColumnReader.usingDictionary = true;
+ definitionLevels = currentPage.getDlEncoding().getValuesReader(parentColumnReader.columnDescriptor, ValuesType.DEFINITION_LEVEL);
+ definitionLevels.initFromPage(currentPage.getValueCount(), pageDataByteArray.nioBuffer(), (int) readPosInBytes);
+ readPosInBytes = definitionLevels.getNextOffset();
+ if ( ! currentPage.getValueEncoding().usesDictionary()) {
+ valueReader = currentPage.getValueEncoding().getValuesReader(parentColumnReader.columnDescriptor, ValuesType.VALUES);
+ valueReader.initFromPage(currentPage.getValueCount(), pageDataByteArray.nioBuffer(), (int) readPosInBytes);
}
}
+ if (parentColumnReader.columnDescriptor.getType() == PrimitiveType.PrimitiveTypeName.BOOLEAN) {
+ valueReader = currentPage.getValueEncoding().getValuesReader(parentColumnReader.columnDescriptor, ValuesType.VALUES);
+ valueReader.initFromPage(currentPage.getValueCount(), pageDataByteArray.nioBuffer(), (int) readPosInBytes);
+ }
+ if (currentPage.getValueEncoding().usesDictionary()) {
+ // initialize two of the dictionary readers, one is for determining the lengths of each value, the second is for
+ // actually copying the values out into the vectors
+ dictionaryLengthDeterminingReader = new DictionaryValuesReader(dictionary);
+ dictionaryLengthDeterminingReader.initFromPage(currentPage.getValueCount(), pageDataByteArray.nioBuffer(), (int) readPosInBytes);
+ dictionaryValueReader = new DictionaryValuesReader(dictionary);
+ dictionaryValueReader.initFromPage(currentPage.getValueCount(), pageDataByteArray.nioBuffer(), (int) readPosInBytes);
+ parentColumnReader.usingDictionary = true;
+ } else {
+ parentColumnReader.usingDictionary = false;
+ }
// readPosInBytes is used for actually reading the values after we determine how many will fit in the vector
// readyToReadPosInBytes serves a similar purpose for the vector types where we must count up the values that will
// fit one record at a time, such as for variable length data. Both operations must start in the same location after the
@@ -275,13 +281,26 @@ final class PageReader {
return true;
}
+ public void clearBuffers() {
+ for (ByteBuf b : allocatedBuffers) {
+ b.release();
+ }
+ allocatedBuffers.clear();
+ }
+
+ public void clearDictionaryBuffers() {
+ for (ByteBuf b : allocatedDictionaryBuffers) {
+ b.release();
+ }
+ allocatedDictionaryBuffers.clear();
+ }
+
public void clear(){
this.dataReader.clear();
// Free all memory, including fixed length types. (Data is being copied for all types not just var length types)
//if(!this.parentColumnReader.isFixedLength) {
- for (ByteBuf b : allocatedBuffers) {
- b.release();
- }
+ clearBuffers();
+ clearDictionaryBuffers();
//}
}
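The split between allocatedBuffers and allocatedDictionaryBuffers exists because the two have different lifetimes: page buffers can be released as soon as the next page is loaded (clearBuffers() now runs at the top of next()), while dictionary buffers must survive until the whole column chunk is finished, since every dictionary-encoded page resolves values against them. A sketch of that lifecycle, assuming Netty's reference-counted buffers:

    import io.netty.buffer.ByteBuf;
    import io.netty.buffer.Unpooled;
    import java.util.ArrayList;
    import java.util.List;

    public class BufferLifecycleSketch {
      public static void main(String[] args) {
        List<ByteBuf> pageBuffers = new ArrayList<>();
        List<ByteBuf> dictionaryBuffers = new ArrayList<>();

        dictionaryBuffers.add(Unpooled.buffer(64)); // held for the whole column chunk

        for (int page = 0; page < 3; page++) {
          // release the previous page's memory before decompressing the next one
          for (ByteBuf b : pageBuffers) { b.release(); }
          pageBuffers.clear();
          pageBuffers.add(Unpooled.buffer(64));     // current page's uncompressed data
        }

        // column chunk finished: now both lists can go
        for (ByteBuf b : pageBuffers) { b.release(); }
        for (ByteBuf b : dictionaryBuffers) { b.release(); }
      }
    }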
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetFixedWidthDictionaryReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetFixedWidthDictionaryReader.java
deleted file mode 100644
index ad849b4d0..000000000
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetFixedWidthDictionaryReader.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*******************************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- ******************************************************************************/
-package org.apache.drill.exec.store.parquet.columnreaders;
-
-import org.apache.drill.common.exceptions.ExecutionSetupException;
-import org.apache.drill.exec.vector.BigIntVector;
-import org.apache.drill.exec.vector.ValueVector;
-import parquet.column.ColumnDescriptor;
-import parquet.format.SchemaElement;
-import parquet.hadoop.metadata.ColumnChunkMetaData;
-import parquet.schema.PrimitiveType;
-
-public class ParquetFixedWidthDictionaryReader extends ColumnReader{
-
- ParquetFixedWidthDictionaryReader(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor,
- ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, ValueVector v,
- SchemaElement schemaElement) throws ExecutionSetupException {
- super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
- }
-
- @Override
- public void readField(long recordsToReadInThisPass) {
-
- recordsReadInThisIteration = Math.min(pageReader.currentPage.getValueCount()
- - pageReader.valuesRead, recordsToReadInThisPass - valuesReadInCurrentPass);
- int defLevel;
- for (int i = 0; i < recordsReadInThisIteration; i++){
- defLevel = pageReader.definitionLevels.readInteger();
- // if the value is defined
- if (defLevel == columnDescriptor.getMaxDefinitionLevel()){
- if (columnDescriptor.getType() == PrimitiveType.PrimitiveTypeName.INT64)
- ((BigIntVector)valueVec).getMutator().set(i + valuesReadInCurrentPass,
- pageReader.valueReader.readLong() );
- }
- // otherwise the value is skipped, because the bit vector indicating nullability is zero filled
- }
- }
-}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetFixedWidthDictionaryReaders.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetFixedWidthDictionaryReaders.java
new file mode 100644
index 000000000..bad6e6a9d
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetFixedWidthDictionaryReaders.java
@@ -0,0 +1,133 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.store.parquet.columnreaders;
+
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.exec.vector.BigIntVector;
+import org.apache.drill.exec.vector.Float4Vector;
+import org.apache.drill.exec.vector.Float8Vector;
+import org.apache.drill.exec.vector.IntVector;
+import org.apache.drill.exec.vector.NullableIntVector;
+import org.apache.drill.exec.vector.ValueVector;
+import parquet.column.ColumnDescriptor;
+import parquet.format.SchemaElement;
+import parquet.hadoop.metadata.ColumnChunkMetaData;
+import parquet.schema.PrimitiveType;
+
+public class ParquetFixedWidthDictionaryReaders {
+
+ static class DictionaryIntReader extends FixedByteAlignedReader {
+
+ IntVector castedVector;
+
+ DictionaryIntReader(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor,
+ ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, IntVector v,
+ SchemaElement schemaElement) throws ExecutionSetupException {
+ super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
+ castedVector = v;
+ }
+
+ // this method is called by its superclass during a read loop
+ @Override
+ protected void readField(long recordsToReadInThisPass) {
+
+ recordsReadInThisIteration = Math.min(pageReader.currentPage.getValueCount()
+ - pageReader.valuesRead, recordsToReadInThisPass - valuesReadInCurrentPass);
+
+ if (usingDictionary) {
+ for (int i = 0; i < recordsReadInThisIteration; i++){
+ castedVector.getMutator().setSafe(valuesReadInCurrentPass + i, pageReader.dictionaryValueReader.readInteger());
+ }
+ }
+ }
+ }
+
+ static class DictionaryBigIntReader extends FixedByteAlignedReader {
+
+ BigIntVector castedVector;
+
+ DictionaryBigIntReader(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor,
+ ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, BigIntVector v,
+ SchemaElement schemaElement) throws ExecutionSetupException {
+ super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
+ castedVector = v;
+ }
+
+ // this method is called by its superclass during a read loop
+ @Override
+ protected void readField(long recordsToReadInThisPass) {
+
+ recordsReadInThisIteration = Math.min(pageReader.currentPage.getValueCount()
+ - pageReader.valuesRead, recordsToReadInThisPass - valuesReadInCurrentPass);
+
+ for (int i = 0; i < recordsReadInThisIteration; i++){
+ try {
+ castedVector.getMutator().setSafe(valuesReadInCurrentPass + i, pageReader.dictionaryValueReader.readLong());
+ } catch ( Exception ex) {
+ throw ex;
+ }
+ }
+ }
+ }
+
+ static class DictionaryFloat4Reader extends FixedByteAlignedReader {
+
+ Float4Vector castedVector;
+
+ DictionaryFloat4Reader(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor,
+ ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, Float4Vector v,
+ SchemaElement schemaElement) throws ExecutionSetupException {
+ super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
+ castedVector = v;
+ }
+
+ // this method is called by its superclass during a read loop
+ @Override
+ protected void readField(long recordsToReadInThisPass) {
+ recordsReadInThisIteration = Math.min(pageReader.currentPage.getValueCount()
+ - pageReader.valuesRead, recordsToReadInThisPass - valuesReadInCurrentPass);
+
+ for (int i = 0; i < recordsReadInThisIteration; i++){
+ castedVector.getMutator().setSafe(valuesReadInCurrentPass + i, pageReader.dictionaryValueReader.readFloat());
+ }
+ }
+ }
+
+ static class DictionaryFloat8Reader extends FixedByteAlignedReader {
+
+ Float8Vector castedVector;
+
+ DictionaryFloat8Reader(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor,
+ ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, Float8Vector v,
+ SchemaElement schemaElement) throws ExecutionSetupException {
+ super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
+ castedVector = v;
+ }
+
+ // this method is called by its superclass during a read loop
+ @Override
+ protected void readField(long recordsToReadInThisPass) {
+ recordsReadInThisIteration = Math.min(pageReader.currentPage.getValueCount()
+ - pageReader.valuesRead, recordsToReadInThisPass - valuesReadInCurrentPass);
+
+ for (int i = 0; i < recordsReadInThisIteration; i++){
+ castedVector.getMutator().setSafe(valuesReadInCurrentPass + i, pageReader.dictionaryValueReader.readDouble());
+ }
+ }
+ }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java
index c72e75030..f3d9e2cd2 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java
@@ -60,6 +60,7 @@ import parquet.format.SchemaElement;
import parquet.format.converter.ParquetMetadataConverter;
import parquet.hadoop.CodecFactoryExposer;
import parquet.hadoop.ParquetFileWriter;
+import parquet.hadoop.metadata.BlockMetaData;
import parquet.hadoop.metadata.ColumnChunkMetaData;
import parquet.hadoop.metadata.ParquetMetadata;
import parquet.schema.PrimitiveType;
@@ -110,6 +111,7 @@ public class ParquetRecordReader extends AbstractRecordReader {
private final CodecFactoryExposer codecFactoryExposer;
int rowGroupIndex;
+ long totalRecordsRead;
public ParquetRecordReader(FragmentContext fragmentContext, //
String path, //
@@ -223,6 +225,10 @@ public class ParquetRecordReader extends AbstractRecordReader {
// ParquetMetadataConverter metaConverter = new ParquetMetadataConverter();
FileMetaData fileMetaData;
+ logger.debug("Reading row group({}) with {} records in file {}.", rowGroupIndex, footer.getBlocks().get(rowGroupIndex).getRowCount(),
+ hadoopPath.toUri().getPath());
+ totalRecordsRead = 0;
+
// TODO - figure out how to deal with this better once we add nested reading, note also look where this map is used below
// store a map from column name to converted types if they are non-null
HashMap<String, SchemaElement> schemaElements = new HashMap<>();
@@ -247,13 +253,11 @@ public class ParquetRecordReader extends AbstractRecordReader {
if (column.getMaxRepetitionLevel() > 0) {
allFieldsFixedLength = false;
}
- // There is not support for the fixed binary type yet in parquet, leaving a task here as a reminder
- // TODO - implement this when the feature is added upstream
- if (column.getType() == PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY){
- bitWidthAllFixedFields += se.getType_length() * 8;
- } else {
- bitWidthAllFixedFields += getTypeLengthInBits(column.getType());
- }
+ if (column.getType() == PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY){
+ bitWidthAllFixedFields += se.getType_length() * 8;
+ } else {
+ bitWidthAllFixedFields += getTypeLengthInBits(column.getType());
+ }
} else {
allFieldsFixedLength = false;
}
@@ -287,13 +291,15 @@ public class ParquetRecordReader extends AbstractRecordReader {
v = output.addField(field, (Class<? extends ValueVector>) TypeHelper.getValueVectorClass(type.getMinorType(), type.getMode()));
if (column.getType() != PrimitiveType.PrimitiveTypeName.BINARY) {
if (column.getMaxRepetitionLevel() > 0) {
- ColumnReader dataReader = ColumnReaderFactory.createFixedColumnReader(this, fieldFixedLength, column, columnChunkMetaData, recordsPerBatch,
+ ColumnReader dataReader = ColumnReaderFactory.createFixedColumnReader(this, fieldFixedLength,
+ column, columnChunkMetaData, recordsPerBatch,
((RepeatedFixedWidthVector) v).getMutator().getDataVector(), schemaElement);
varLengthColumns.add(new FixedWidthRepeatedReader(this, dataReader,
getTypeLengthInBits(column.getType()), -1, column, columnChunkMetaData, false, v, schemaElement));
}
else {
- columnStatuses.add(ColumnReaderFactory.createFixedColumnReader(this, fieldFixedLength, column, columnChunkMetaData, recordsPerBatch, v,
+ columnStatuses.add(ColumnReaderFactory.createFixedColumnReader(this, fieldFixedLength,
+ column, columnChunkMetaData, recordsPerBatch, v,
schemaElement));
}
} else {
@@ -393,6 +399,7 @@ public class ParquetRecordReader extends AbstractRecordReader {
vv.getMutator().setValueCount( (int) recordsToRead);
}
mockRecordsRead += recordsToRead;
+ totalRecordsRead += recordsToRead;
return (int) recordsToRead;
}
@@ -418,6 +425,8 @@ public class ParquetRecordReader extends AbstractRecordReader {
}
}
+
+ totalRecordsRead += firstColumnStatus.getRecordsReadInCurrentPass();
return firstColumnStatus.getRecordsReadInCurrentPass();
} catch (IOException e) {
throw new DrillRuntimeException(e);
@@ -426,6 +435,10 @@ public class ParquetRecordReader extends AbstractRecordReader {
@Override
public void cleanup() {
+ logger.debug("Read {} records out of row group({}) in file '{}'", totalRecordsRead, rowGroupIndex, hadoopPath.toUri().getPath());
+ // enable this for debugging when it is known that a whole file will be read
+ // limit kills upstream operators once it has enough records, so this assert will fail
+// assert totalRecordsRead == footer.getBlocks().get(rowGroupIndex).getRowCount();
for (ColumnReader column : columnStatuses) {
column.clear();
}
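The new totalRecordsRead counter accumulates across both the mock-read path and the real read loop, so the cleanup log reflects everything handed upstream; as the comment notes, the disabled assert only holds when the scan drains the row group. A toy sketch of that invariant (row counts invented):

    public class RecordCountSketch {
      public static void main(String[] args) {
        long rowsInRowGroup = 1000;  // footer.getBlocks().get(rowGroupIndex).getRowCount()
        int batchSize = 300;         // records per batch
        long totalRecordsRead = 0;
        while (totalRecordsRead < rowsInRowGroup) {
          long read = Math.min(batchSize, rowsInRowGroup - totalRecordsRead);
          totalRecordsRead += read;  // mirrors the increments added in next()
        }
        // holds only if nothing upstream (e.g. a LIMIT) stopped the scan early
        assert totalRecordsRead == rowsInRowGroup;
        System.out.println(totalRecordsRead);
      }
    }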
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetToDrillTypeConverter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetToDrillTypeConverter.java
index 5bba6be67..481b28956 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetToDrillTypeConverter.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetToDrillTypeConverter.java
@@ -95,8 +95,7 @@ public class ParquetToDrillTypeConverter {
case FIXED_LEN_BYTE_ARRAY:
if (convertedType == null) {
checkArgument(length > 0, "A length greater than zero must be provided for a FixedBinary type.");
- return TypeProtos.MajorType.newBuilder().setMinorType(TypeProtos.MinorType.FIXEDBINARY)
- .setWidth(length).setMode(mode).build();
+ return TypeProtos.MajorType.newBuilder().setMinorType(MinorType.VARBINARY).setMode(mode).build();
} else if (convertedType == ConvertedType.DECIMAL) {
return Types.withScaleAndPrecision(getDecimalType(schemaElement), DataMode.OPTIONAL, schemaElement.getScale(), schemaElement.getPrecision());
}
@@ -159,8 +158,7 @@ public class ParquetToDrillTypeConverter {
case FIXED_LEN_BYTE_ARRAY:
if (convertedType == null) {
checkArgument(length > 0, "A length greater than zero must be provided for a FixedBinary type.");
- return TypeProtos.MajorType.newBuilder().setMinorType(TypeProtos.MinorType.FIXEDBINARY)
- .setWidth(length).setMode(mode).build();
+ return TypeProtos.MajorType.newBuilder().setMinorType(MinorType.VARBINARY).setMode(mode).build();
} else if (convertedType == ConvertedType.DECIMAL) {
return Types.withScaleAndPrecision(getDecimalType(schemaElement), DataMode.REQUIRED, schemaElement.getScale(), schemaElement.getPrecision());
}
@@ -223,8 +221,7 @@ public class ParquetToDrillTypeConverter {
case FIXED_LEN_BYTE_ARRAY:
if (convertedType == null) {
checkArgument(length > 0, "A length greater than zero must be provided for a FixedBinary type.");
- return TypeProtos.MajorType.newBuilder().setMinorType(TypeProtos.MinorType.FIXEDBINARY)
- .setWidth(length).setMode(mode).build();
+ return TypeProtos.MajorType.newBuilder().setMinorType(MinorType.VARBINARY).setMode(mode).build();
} else if (convertedType == ConvertedType.DECIMAL) {
return Types.withScaleAndPrecision(getDecimalType(schemaElement), DataMode.REPEATED, schemaElement.getScale(), schemaElement.getPrecision());
}
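Since Drill has no fixed-width binary type yet (see the TODOs on FixedBinaryReader above), a FIXED_LEN_BYTE_ARRAY column with no converted type now surfaces as VARBINARY, and the reader writes each value's length explicitly. A sketch of the resulting type using the same builder calls as above (the TypeProtos package path is assumed to be Drill's org.apache.drill.common.types):

    import org.apache.drill.common.types.TypeProtos;
    import org.apache.drill.common.types.TypeProtos.DataMode;
    import org.apache.drill.common.types.TypeProtos.MinorType;

    public class FixedBinaryTypeSketch {
      public static void main(String[] args) {
        // A FIXED_LEN_BYTE_ARRAY(16) column with no converted type maps to VARBINARY;
        // the fixed width is no longer recorded on the MajorType itself.
        TypeProtos.MajorType t = TypeProtos.MajorType.newBuilder()
            .setMinorType(MinorType.VARBINARY)
            .setMode(DataMode.REQUIRED)
            .build();
        System.out.println(t.getMinorType()); // VARBINARY
      }
    }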
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLengthColumnReaders.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLengthColumnReaders.java
index ecfa11022..2f3711dc1 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLengthColumnReaders.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLengthColumnReaders.java
@@ -184,13 +184,13 @@ public class VarLengthColumnReaders {
if(index >= varCharVector.getValueCapacity()) return false;
if (usingDictionary) {
- DrillBuf b = DrillBuf.wrapByteBuffer(currDictVal.toByteBuffer());
+ DrillBuf b = DrillBuf.wrapByteBuffer(currDictValToWrite.toByteBuffer());
int st=0;
- int len=currDictVal.length();
+ int len=currDictValToWrite.length();
VarCharHolder holder = new VarCharHolder();
holder.buffer=b;
holder.start=0;
- holder.end=currDictVal.length();
+ holder.end=currDictValToWrite.length();
success = varCharVector.getMutator().setSafe(index, holder);
}
else {
@@ -230,8 +230,8 @@ public class VarLengthColumnReaders {
if(index >= vector.getValueCapacity()) return false;
if (usingDictionary) {
- DrillBuf b = DrillBuf.wrapByteBuffer(currDictVal.toByteBuffer());
- success = mutator.setSafe(index, 1, 0, currDictVal.length(), b);
+ DrillBuf b = DrillBuf.wrapByteBuffer(currDictValToWrite.toByteBuffer());
+ success = mutator.setSafe(index, 1, 0, currDictValToWrite.length(), b);
}
else {
success = mutator.setSafe(index, 1, start, start+length, value);
@@ -263,13 +263,13 @@ public class VarLengthColumnReaders {
if(index >= varBinaryVector.getValueCapacity()) return false;
if (usingDictionary) {
- DrillBuf b = DrillBuf.wrapByteBuffer(currDictVal.toByteBuffer());
+ DrillBuf b = DrillBuf.wrapByteBuffer(currDictValToWrite.toByteBuffer());
int st=0;
- int len=currDictVal.length();
+ int len=currDictValToWrite.length();
VarBinaryHolder holder = new VarBinaryHolder();
holder.buffer=b;
holder.start=0;
- holder.end=currDictVal.length();
+ holder.end=currDictValToWrite.length();
success = varBinaryVector.getMutator().setSafe(index, holder);
}
else {
@@ -307,11 +307,11 @@ public class VarLengthColumnReaders {
if(index >= nullableVarBinaryVector.getValueCapacity()) return false;
if (usingDictionary) {
- DrillBuf b = DrillBuf.wrapByteBuffer(currDictVal.toByteBuffer());
+ DrillBuf b = DrillBuf.wrapByteBuffer(currDictValToWrite.toByteBuffer());
NullableVarBinaryHolder holder = new NullableVarBinaryHolder();
holder.buffer=b;
holder.start=0;
- holder.end=currDictVal.length();
+ holder.end=currDictValToWrite.length();
holder.isSet=1;
success = nullableVarBinaryVector.getMutator().setSafe(index, holder);
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLengthValuesColumn.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLengthValuesColumn.java
index 829b44a92..4f02c7010 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLengthValuesColumn.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLengthValuesColumn.java
@@ -82,10 +82,17 @@ public abstract class VarLengthValuesColumn<V extends ValueVector> extends VarLe
protected boolean readAndStoreValueSizeInformation() throws IOException {
// re-purposing this field here for length in BYTES to prevent repetitive multiplication/division
- try {
+ if (usingDictionary) {
+ if (currLengthDeterminingDictVal == null) {
+ currLengthDeterminingDictVal = pageReader.dictionaryLengthDeterminingReader.readBytes();
+ }
+ currDictValToWrite = currLengthDeterminingDictVal;
+ // re-purposing this field here for length in BYTES to prevent repetitive multiplication/division
+ dataTypeLengthInBits = currLengthDeterminingDictVal.length();
+ }
+ else {
+ // re-purposing this field here for length in BYTES to prevent repetitive multiplication/division
dataTypeLengthInBits = pageReader.pageDataByteArray.getInt((int) pageReader.readyToReadPosInBytes);
- } catch (Throwable t) {
- throw t;
}
// this should not fail
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/NullableVector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/NullableVector.java
new file mode 100644
index 000000000..92f60d66e
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/NullableVector.java
@@ -0,0 +1,23 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.vector;
+
+public interface NullableVector extends ValueVector{
+
+ public ValueVector getValuesVector();
+}
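A hypothetical caller-side sketch of the new interface: code that needs the raw inner vector of any nullable vector can now unwrap it generically instead of casting to each generated class (the helper below is illustrative, not part of this change):

    import org.apache.drill.exec.vector.NullableVector;
    import org.apache.drill.exec.vector.ValueVector;

    public class UnwrapSketch {
      // Returns the inner data vector when null tracking should be bypassed;
      // before this change the caller had to cast to each generated class by name.
      static ValueVector dataVector(ValueVector v) {
        if (v instanceof NullableVector) {
          return ((NullableVector) v).getValuesVector();
        }
        return v; // already a plain (required-mode) vector
      }
    }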