diff options
author | Jason Altekruse <altekrusejason@gmail.com> | 2014-08-25 14:56:27 -0700 |
---|---|---|
committer | Jacques Nadeau <jacques@apache.org> | 2014-08-29 18:31:11 -0700 |
commit | c1fcb6528cf1d037df91d54629f2c71902cb25cc (patch) | |
tree | 223ecbeb12731e19b1e01ef2543c80e9bad08117 /exec/java-exec/src/main | |
parent | 42bbf6fdd833b786374d790f3f40320ae75fcb7b (diff) |
DRILL-1307: Add support for fixed binary columns in parquet reader.
DRILL-1314: Fix issue reading impala produced files
DRILL-1304: Regression selecting a single column from a parquet file.
Fixed issue with var length dictionary reading.
Reduced memory usage by freeing buffers after we finish reading a page (except for dictionary pages, which need to be kept in memory until the entire row group has been read).
Rebased onto merge branch.
Successfully backed out the changes that had changed the structure of the nullable column readers. This re-introduced some redundancy but avoided a bug that was holding up the release. Ended up falling back on the higher level reader API, only in the case where we are reading a dictionary column and then the next page is not dictionary encoded. This can be fixed to use the optimized read instead, but it doesn't conform to the overall structure of the current reader and is a bit difficult to fix.
Diffstat (limited to 'exec/java-exec/src/main')
15 files changed, 340 insertions, 119 deletions
diff --git a/exec/java-exec/src/main/codegen/templates/NullableValueVectors.java b/exec/java-exec/src/main/codegen/templates/NullableValueVectors.java index d9eae0fb8..492d49517 100644 --- a/exec/java-exec/src/main/codegen/templates/NullableValueVectors.java +++ b/exec/java-exec/src/main/codegen/templates/NullableValueVectors.java @@ -44,7 +44,7 @@ package org.apache.drill.exec.vector; * NB: this class is automatically generated from ValueVectorTypes.tdd using FreeMarker. */ @SuppressWarnings("unused") -public final class ${className} extends BaseValueVector implements <#if type.major == "VarLen">VariableWidth<#else>FixedWidth</#if>Vector{ +public final class ${className} extends BaseValueVector implements <#if type.major == "VarLen">VariableWidth<#else>FixedWidth</#if>Vector, NullableVector{ private int valueCount; final UInt1Vector bits; @@ -96,6 +96,10 @@ public final class ${className} extends BaseValueVector implements <#if type.maj return values.getData(); } + public ${valuesName} getValuesVector() { + return values; + } + <#if type.major == "VarLen"> @Override public SerializedField getMetadata() { diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ColumnReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ColumnReader.java index b240407f6..272a5c352 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ColumnReader.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ColumnReader.java @@ -118,7 +118,6 @@ public abstract class ColumnReader<V extends ValueVector> { readField(recordsToRead); valuesReadInCurrentPass += recordsReadInThisIteration; - totalValuesRead += recordsReadInThisIteration; pageReader.valuesRead += recordsReadInThisIteration; pageReader.readPosInBytes = readStartInBytes + readLength; } diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ColumnReaderFactory.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ColumnReaderFactory.java index 243744e2a..3d36b64ad 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ColumnReaderFactory.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ColumnReaderFactory.java @@ -20,8 +20,12 @@ package org.apache.drill.exec.store.parquet.columnreaders; import org.apache.drill.common.exceptions.ExecutionSetupException; import org.apache.drill.exec.exception.SchemaChangeException; +import org.apache.drill.exec.vector.BigIntVector; import org.apache.drill.exec.vector.Decimal28SparseVector; import org.apache.drill.exec.vector.Decimal38SparseVector; +import org.apache.drill.exec.vector.Float4Vector; +import org.apache.drill.exec.vector.Float8Vector; +import org.apache.drill.exec.vector.IntVector; import org.apache.drill.exec.vector.NullableBigIntVector; import org.apache.drill.exec.vector.NullableDecimal28SparseVector; import org.apache.drill.exec.vector.NullableDecimal38SparseVector; @@ -72,8 +76,19 @@ public class ColumnReaderFactory { return new FixedByteAlignedReader.DateReader(recordReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement); } else{ if (columnChunkMetaData.getEncodings().contains(Encoding.PLAIN_DICTIONARY)) { - return new ParquetFixedWidthDictionaryReader(recordReader, allocateSize, descriptor, columnChunkMetaData, - fixedLength, v, schemaElement); + switch (columnChunkMetaData.getType()) { + case INT32: + return new ParquetFixedWidthDictionaryReaders.DictionaryIntReader(recordReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, (IntVector) v, schemaElement); + case INT64: + return new ParquetFixedWidthDictionaryReaders.DictionaryBigIntReader(recordReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, 
(BigIntVector) v, schemaElement); + case FLOAT: + return new ParquetFixedWidthDictionaryReaders.DictionaryFloat4Reader(recordReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, (Float4Vector) v, schemaElement); + case DOUBLE: + return new ParquetFixedWidthDictionaryReaders.DictionaryFloat8Reader(recordReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, (Float8Vector) v, schemaElement); + default: + throw new ExecutionSetupException("Unsupported dictionary column type " + descriptor.getType().name() ); + } + } else { return new FixedByteAlignedReader(recordReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/FixedByteAlignedReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/FixedByteAlignedReader.java index 54771e4b7..bfbefdb8c 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/FixedByteAlignedReader.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/FixedByteAlignedReader.java @@ -29,6 +29,7 @@ import org.apache.drill.exec.vector.DateVector; import org.apache.drill.exec.vector.Decimal28SparseVector; import org.apache.drill.exec.vector.Decimal38SparseVector; import org.apache.drill.exec.vector.ValueVector; +import org.apache.drill.exec.vector.VariableWidthVector; import org.joda.time.DateTimeUtils; import parquet.column.ColumnDescriptor; @@ -68,6 +69,29 @@ class FixedByteAlignedReader extends ColumnReader { (int) readStartInBytes, (int) readLength); } + public static class FixedBinaryReader extends FixedByteAlignedReader { + // TODO - replace this with fixed binary type in drill + VariableWidthVector castedVector; + + FixedBinaryReader(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor, ColumnChunkMetaData columnChunkMetaData, + VariableWidthVector v, 
SchemaElement schemaElement) throws ExecutionSetupException { + super(parentReader, allocateSize, descriptor, columnChunkMetaData, true, v, schemaElement); + castedVector = v; + } + + protected void readField(long recordsToReadInThisPass) { + // we can use the standard read method to transfer the data + super.readField(recordsToReadInThisPass); + // TODO - replace this with fixed binary type in drill + // now we need to write the lengths of each value + int byteLength = dataTypeLengthInBits / 8; + for (int i = 0; i < recordsToReadInThisPass; i++) { + castedVector.getMutator().setValueLengthSafe(i, byteLength); + } + } + + } + public static abstract class ConvertedReader extends FixedByteAlignedReader { protected int dataTypeLengthInBytes; @@ -104,10 +128,20 @@ class FixedByteAlignedReader extends ColumnReader { @Override void addNext(int start, int index) { - dateVector.getMutator().set(index, DateTimeUtils.fromJulianDay( - NullableFixedByteAlignedReaders.NullableDateReader.readIntLittleEndian(bytebuf, start) +// dateVector.getMutator().set(index, DateTimeUtils.fromJulianDay( +// NullableFixedByteAlignedReaders.NullableDateReader.readIntLittleEndian(bytebuf, start) + dateVector.getMutator().set(index, DateTimeUtils.fromJulianDay(readIntLittleEndian(bytebuf, start) - ParquetOutputRecordWriter.JULIAN_DAY_EPOC - 0.5)); } + + // copied out of parquet library, didn't want to deal with the uneeded throws statement they had declared + public static int readIntLittleEndian(ByteBuf in, int offset) { + int ch4 = in.getByte(offset) & 0xff; + int ch3 = in.getByte(offset + 1) & 0xff; + int ch2 = in.getByte(offset + 2) & 0xff; + int ch1 = in.getByte(offset + 3) & 0xff; + return ((ch1 << 24) + (ch2 << 16) + (ch3 << 8) + (ch4 << 0)); + } } public static class Decimal28Reader extends ConvertedReader { diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/FixedWidthRepeatedReader.java 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/FixedWidthRepeatedReader.java index bbff57420..18618e612 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/FixedWidthRepeatedReader.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/FixedWidthRepeatedReader.java @@ -51,6 +51,7 @@ public class FixedWidthRepeatedReader extends VarLengthColumn { castedRepeatedVector = (RepeatedFixedWidthVector) valueVector; this.dataTypeLengthInBytes = dataTypeLengthInBytes; this.dataReader = dataReader; + this.dataReader.pageReader.clear(); this.dataReader.pageReader = this.pageReader; // this is not in the reset method because it needs to be initialized only for the very first page read // in all other cases if a read ends at a page boundary we will need to keep track of this flag and not @@ -209,5 +210,10 @@ public class FixedWidthRepeatedReader extends VarLengthColumn { public int capacity() { return castedRepeatedVector.getMutator().getDataVector().getData().capacity(); } + + public void clear() { + super.clear(); + dataReader.clear(); + } } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/NullableFixedByteAlignedReaders.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/NullableFixedByteAlignedReaders.java index f88d56a4f..557bd9fc7 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/NullableFixedByteAlignedReaders.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/NullableFixedByteAlignedReaders.java @@ -82,6 +82,10 @@ public class NullableFixedByteAlignedReaders { for (int i = 0; i < recordsToReadInThisPass; i++){ valueVec.getMutator().setSafe(valuesReadInCurrentPass + i, pageReader.dictionaryValueReader.readInteger()); } + } else { + for (int i = 0; i < recordsToReadInThisPass; i++){ + 
valueVec.getMutator().setSafe(valuesReadInCurrentPass + i, pageReader.valueReader.readInteger()); + } } } } @@ -97,8 +101,14 @@ public class NullableFixedByteAlignedReaders { // this method is called by its superclass during a read loop @Override protected void readField(long recordsToReadInThisPass) { - for (int i = 0; i < recordsToReadInThisPass; i++){ - valueVec.getMutator().setSafe(valuesReadInCurrentPass + i, pageReader.dictionaryValueReader.readLong()); + if (usingDictionary) { + for (int i = 0; i < recordsToReadInThisPass; i++){ + valueVec.getMutator().setSafe(valuesReadInCurrentPass + i, pageReader.dictionaryValueReader.readLong()); + } + } else { + for (int i = 0; i < recordsToReadInThisPass; i++){ + valueVec.getMutator().setSafe(valuesReadInCurrentPass + i, pageReader.valueReader.readLong()); + } } } } @@ -114,8 +124,14 @@ public class NullableFixedByteAlignedReaders { // this method is called by its superclass during a read loop @Override protected void readField(long recordsToReadInThisPass) { - for (int i = 0; i < recordsToReadInThisPass; i++){ - valueVec.getMutator().setSafe(valuesReadInCurrentPass + i, pageReader.dictionaryValueReader.readFloat()); + if (usingDictionary) { + for (int i = 0; i < recordsToReadInThisPass; i++){ + valueVec.getMutator().setSafe(valuesReadInCurrentPass + i, pageReader.dictionaryValueReader.readFloat()); + } + } else { + for (int i = 0; i < recordsToReadInThisPass; i++){ + valueVec.getMutator().setSafe(valuesReadInCurrentPass + i, pageReader.valueReader.readFloat()); + } } } } @@ -131,8 +147,14 @@ public class NullableFixedByteAlignedReaders { // this method is called by its superclass during a read loop @Override protected void readField(long recordsToReadInThisPass) { - for (int i = 0; i < recordsToReadInThisPass; i++){ - valueVec.getMutator().setSafe(valuesReadInCurrentPass + i, pageReader.dictionaryValueReader.readDouble()); + if (usingDictionary) { + for (int i = 0; i < recordsToReadInThisPass; i++){ + 
valueVec.getMutator().setSafe(valuesReadInCurrentPass + i, pageReader.dictionaryValueReader.readDouble()); + } + } else { + for (int i = 0; i < recordsToReadInThisPass; i++){ + valueVec.getMutator().setSafe(valuesReadInCurrentPass + i, pageReader.valueReader.readDouble()); + } } } } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/NullableVarLengthValuesColumn.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/NullableVarLengthValuesColumn.java index ba9ff802d..dc29fbd1f 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/NullableVarLengthValuesColumn.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/NullableVarLengthValuesColumn.java @@ -116,6 +116,8 @@ public abstract class NullableVarLengthValuesColumn<V extends ValueVector> exten @Override protected void readField(long recordsToRead) { + // TODO - unlike most implementations of this method, the recordsReadInThisIteration field is not set here + // should verify that this is not breaking anything if (usingDictionary) { currDictValToWrite = pageReader.dictionaryValueReader.readBytes(); // re-purposing this field here for length in BYTES to prevent repetitive multiplication/division diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/PageReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/PageReader.java index 4165cbda8..639577dfd 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/PageReader.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/PageReader.java @@ -91,10 +91,14 @@ final class PageReader { List<ByteBuf> allocatedBuffers; + // These need to be held throughout reading of the entire column chunk + List<ByteBuf> allocatedDictionaryBuffers; + PageReader(ColumnReader parentStatus, FileSystem fs, 
Path path, ColumnChunkMetaData columnChunkMetaData) throws ExecutionSetupException{ this.parentColumnReader = parentStatus; allocatedBuffers = new ArrayList<ByteBuf>(); + allocatedDictionaryBuffers = new ArrayList<ByteBuf>(); long totalByteLength = columnChunkMetaData.getTotalUncompressedSize(); long start = columnChunkMetaData.getFirstDataPageOffset(); @@ -108,7 +112,7 @@ final class PageReader { BytesInput bytesIn; ByteBuf uncompressedData=allocateBuffer(pageHeader.getUncompressed_page_size()); - allocatedBuffers.add(uncompressedData); + allocatedDictionaryBuffers.add(uncompressedData); if(parentColumnReader.columnChunkMetaData.getCodec()==CompressionCodecName.UNCOMPRESSED) { dataReader.getPageAsBytesBuf(uncompressedData, pageHeader.compressed_page_size); bytesIn=parentColumnReader.parentReader.getCodecFactoryExposer().getBytesInput(uncompressedData, @@ -157,6 +161,7 @@ final class PageReader { if(!dataReader.hasRemainder() || parentColumnReader.totalValuesRead == parentColumnReader.columnChunkMetaData.getValueCount()) { return false; } + clearBuffers(); // next, we need to decompress the bytes // TODO - figure out if we need multiple dictionary pages, I believe it may be limited to one @@ -168,7 +173,7 @@ final class PageReader { //TODO: Handle buffer allocation exception BytesInput bytesIn; ByteBuf uncompressedData=allocateBuffer(pageHeader.getUncompressed_page_size()); - allocatedBuffers.add(uncompressedData); + allocatedDictionaryBuffers.add(uncompressedData); if( parentColumnReader.columnChunkMetaData.getCodec()== CompressionCodecName.UNCOMPRESSED) { dataReader.getPageAsBytesBuf(uncompressedData, pageHeader.compressed_page_size); bytesIn=parentColumnReader.parentReader.getCodecFactoryExposer().getBytesInput(uncompressedData, @@ -229,6 +234,7 @@ final class PageReader { } pageDataByteArray = DrillBuf.wrapByteBuffer(currentPage.getBytes().toByteBuffer()); + allocatedBuffers.add(pageDataByteArray); readPosInBytes = 0; if 
(parentColumnReader.getColumnDescriptor().getMaxRepetitionLevel() > 0) { @@ -244,29 +250,29 @@ final class PageReader { } if (parentColumnReader.columnDescriptor.getMaxDefinitionLevel() != 0){ parentColumnReader.currDefLevel = -1; - if (!currentPage.getValueEncoding().usesDictionary()) { - parentColumnReader.usingDictionary = false; - definitionLevels = currentPage.getDlEncoding().getValuesReader(parentColumnReader.columnDescriptor, ValuesType.DEFINITION_LEVEL); - definitionLevels.initFromPage(currentPage.getValueCount(), pageDataByteArray.nioBuffer(), (int) readPosInBytes); - readPosInBytes = definitionLevels.getNextOffset(); - if (parentColumnReader.columnDescriptor.getType() == PrimitiveType.PrimitiveTypeName.BOOLEAN) { - valueReader = currentPage.getValueEncoding().getValuesReader(parentColumnReader.columnDescriptor, ValuesType.VALUES); - valueReader.initFromPage(currentPage.getValueCount(), pageDataByteArray.nioBuffer(), (int) readPosInBytes); - } - } else { - parentColumnReader.usingDictionary = true; - definitionLevels = currentPage.getDlEncoding().getValuesReader(parentColumnReader.columnDescriptor, ValuesType.DEFINITION_LEVEL); - definitionLevels.initFromPage(currentPage.getValueCount(), pageDataByteArray.nioBuffer(), (int) readPosInBytes); - readPosInBytes = definitionLevels.getNextOffset(); - // initialize two of the dictionary readers, one is for determining the lengths of each value, the second is for - // actually copying the values out into the vectors - dictionaryLengthDeterminingReader = new DictionaryValuesReader(dictionary); - dictionaryLengthDeterminingReader.initFromPage(currentPage.getValueCount(), pageDataByteArray.nioBuffer(), (int) readPosInBytes); - dictionaryValueReader = new DictionaryValuesReader(dictionary); - dictionaryValueReader.initFromPage(currentPage.getValueCount(), pageDataByteArray.nioBuffer(), (int) readPosInBytes); - this.parentColumnReader.usingDictionary = true; + definitionLevels = 
currentPage.getDlEncoding().getValuesReader(parentColumnReader.columnDescriptor, ValuesType.DEFINITION_LEVEL); + definitionLevels.initFromPage(currentPage.getValueCount(), pageDataByteArray.nioBuffer(), (int) readPosInBytes); + readPosInBytes = definitionLevels.getNextOffset(); + if ( ! currentPage.getValueEncoding().usesDictionary()) { + valueReader = currentPage.getValueEncoding().getValuesReader(parentColumnReader.columnDescriptor, ValuesType.VALUES); + valueReader.initFromPage(currentPage.getValueCount(), pageDataByteArray.nioBuffer(), (int) readPosInBytes); } } + if (parentColumnReader.columnDescriptor.getType() == PrimitiveType.PrimitiveTypeName.BOOLEAN) { + valueReader = currentPage.getValueEncoding().getValuesReader(parentColumnReader.columnDescriptor, ValuesType.VALUES); + valueReader.initFromPage(currentPage.getValueCount(), pageDataByteArray.nioBuffer(), (int) readPosInBytes); + } + if (currentPage.getValueEncoding().usesDictionary()) { + // initialize two of the dictionary readers, one is for determining the lengths of each value, the second is for + // actually copying the values out into the vectors + dictionaryLengthDeterminingReader = new DictionaryValuesReader(dictionary); + dictionaryLengthDeterminingReader.initFromPage(currentPage.getValueCount(), pageDataByteArray.nioBuffer(), (int) readPosInBytes); + dictionaryValueReader = new DictionaryValuesReader(dictionary); + dictionaryValueReader.initFromPage(currentPage.getValueCount(), pageDataByteArray.nioBuffer(), (int) readPosInBytes); + parentColumnReader.usingDictionary = true; + } else { + parentColumnReader.usingDictionary = false; + } // readPosInBytes is used for actually reading the values after we determine how many will fit in the vector // readyToReadPosInBytes serves a similar purpose for the vector types where we must count up the values that will // fit one record at a time, such as for variable length data. 
Both operations must start in the same location after the @@ -275,13 +281,26 @@ final class PageReader { return true; } + public void clearBuffers() { + for (ByteBuf b : allocatedBuffers) { + b.release(); + } + allocatedBuffers.clear(); + } + + public void clearDictionaryBuffers() { + for (ByteBuf b : allocatedDictionaryBuffers) { + b.release(); + } + allocatedDictionaryBuffers.clear(); + } + public void clear(){ this.dataReader.clear(); // Free all memory, including fixed length types. (Data is being copied for all types not just var length types) //if(!this.parentColumnReader.isFixedLength) { - for (ByteBuf b : allocatedBuffers) { - b.release(); - } + clearBuffers(); + clearDictionaryBuffers(); //} } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetFixedWidthDictionaryReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetFixedWidthDictionaryReader.java deleted file mode 100644 index ad849b4d0..000000000 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetFixedWidthDictionaryReader.java +++ /dev/null @@ -1,53 +0,0 @@ -/******************************************************************************* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - ******************************************************************************/ -package org.apache.drill.exec.store.parquet.columnreaders; - -import org.apache.drill.common.exceptions.ExecutionSetupException; -import org.apache.drill.exec.vector.BigIntVector; -import org.apache.drill.exec.vector.ValueVector; -import parquet.column.ColumnDescriptor; -import parquet.format.SchemaElement; -import parquet.hadoop.metadata.ColumnChunkMetaData; -import parquet.schema.PrimitiveType; - -public class ParquetFixedWidthDictionaryReader extends ColumnReader{ - - ParquetFixedWidthDictionaryReader(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor, - ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, ValueVector v, - SchemaElement schemaElement) throws ExecutionSetupException { - super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement); - } - - @Override - public void readField(long recordsToReadInThisPass) { - - recordsReadInThisIteration = Math.min(pageReader.currentPage.getValueCount() - - pageReader.valuesRead, recordsToReadInThisPass - valuesReadInCurrentPass); - int defLevel; - for (int i = 0; i < recordsReadInThisIteration; i++){ - defLevel = pageReader.definitionLevels.readInteger(); - // if the value is defined - if (defLevel == columnDescriptor.getMaxDefinitionLevel()){ - if (columnDescriptor.getType() == PrimitiveType.PrimitiveTypeName.INT64) - ((BigIntVector)valueVec).getMutator().set(i + valuesReadInCurrentPass, - pageReader.valueReader.readLong() ); - } - // otherwise the value is skipped, because the bit vector indicating nullability is zero filled - } - } -} diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetFixedWidthDictionaryReaders.java 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetFixedWidthDictionaryReaders.java new file mode 100644 index 000000000..bad6e6a9d --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetFixedWidthDictionaryReaders.java @@ -0,0 +1,133 @@ +/******************************************************************************* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ ******************************************************************************/ +package org.apache.drill.exec.store.parquet.columnreaders; + +import org.apache.drill.common.exceptions.ExecutionSetupException; +import org.apache.drill.exec.vector.BigIntVector; +import org.apache.drill.exec.vector.Float4Vector; +import org.apache.drill.exec.vector.Float8Vector; +import org.apache.drill.exec.vector.IntVector; +import org.apache.drill.exec.vector.NullableIntVector; +import org.apache.drill.exec.vector.ValueVector; +import parquet.column.ColumnDescriptor; +import parquet.format.SchemaElement; +import parquet.hadoop.metadata.ColumnChunkMetaData; +import parquet.schema.PrimitiveType; + +public class ParquetFixedWidthDictionaryReaders { + + static class DictionaryIntReader extends FixedByteAlignedReader { + + IntVector castedVector; + + DictionaryIntReader(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor, + ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, IntVector v, + SchemaElement schemaElement) throws ExecutionSetupException { + super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement); + castedVector = v; + } + + // this method is called by its superclass during a read loop + @Override + protected void readField(long recordsToReadInThisPass) { + + recordsReadInThisIteration = Math.min(pageReader.currentPage.getValueCount() + - pageReader.valuesRead, recordsToReadInThisPass - valuesReadInCurrentPass); + + if (usingDictionary) { + for (int i = 0; i < recordsReadInThisIteration; i++){ + castedVector.getMutator().setSafe(valuesReadInCurrentPass + i, pageReader.dictionaryValueReader.readInteger()); + } + } + } + } + + static class DictionaryBigIntReader extends FixedByteAlignedReader { + + BigIntVector castedVector; + + DictionaryBigIntReader(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor, + ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, 
BigIntVector v, + SchemaElement schemaElement) throws ExecutionSetupException { + super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement); + castedVector = v; + } + + // this method is called by its superclass during a read loop + @Override + protected void readField(long recordsToReadInThisPass) { + + recordsReadInThisIteration = Math.min(pageReader.currentPage.getValueCount() + - pageReader.valuesRead, recordsToReadInThisPass - valuesReadInCurrentPass); + + for (int i = 0; i < recordsReadInThisIteration; i++){ + try { + castedVector.getMutator().setSafe(valuesReadInCurrentPass + i, pageReader.dictionaryValueReader.readLong()); + } catch ( Exception ex) { + throw ex; + } + } + } + } + + static class DictionaryFloat4Reader extends FixedByteAlignedReader { + + Float4Vector castedVector; + + DictionaryFloat4Reader(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor, + ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, Float4Vector v, + SchemaElement schemaElement) throws ExecutionSetupException { + super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement); + castedVector = v; + } + + // this method is called by its superclass during a read loop + @Override + protected void readField(long recordsToReadInThisPass) { + recordsReadInThisIteration = Math.min(pageReader.currentPage.getValueCount() + - pageReader.valuesRead, recordsToReadInThisPass - valuesReadInCurrentPass); + + for (int i = 0; i < recordsReadInThisIteration; i++){ + castedVector.getMutator().setSafe(valuesReadInCurrentPass + i, pageReader.dictionaryValueReader.readFloat()); + } + } + } + + static class DictionaryFloat8Reader extends FixedByteAlignedReader { + + Float8Vector castedVector; + + DictionaryFloat8Reader(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor, + ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, Float8Vector v, + 
SchemaElement schemaElement) throws ExecutionSetupException { + super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement); + castedVector = v; + } + + // this method is called by its superclass during a read loop + @Override + protected void readField(long recordsToReadInThisPass) { + recordsReadInThisIteration = Math.min(pageReader.currentPage.getValueCount() + - pageReader.valuesRead, recordsToReadInThisPass - valuesReadInCurrentPass); + + for (int i = 0; i < recordsReadInThisIteration; i++){ + castedVector.getMutator().setSafe(valuesReadInCurrentPass + i, pageReader.dictionaryValueReader.readDouble()); + } + } + } +} diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java index c72e75030..f3d9e2cd2 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java @@ -60,6 +60,7 @@ import parquet.format.SchemaElement; import parquet.format.converter.ParquetMetadataConverter; import parquet.hadoop.CodecFactoryExposer; import parquet.hadoop.ParquetFileWriter; +import parquet.hadoop.metadata.BlockMetaData; import parquet.hadoop.metadata.ColumnChunkMetaData; import parquet.hadoop.metadata.ParquetMetadata; import parquet.schema.PrimitiveType; @@ -110,6 +111,7 @@ public class ParquetRecordReader extends AbstractRecordReader { private final CodecFactoryExposer codecFactoryExposer; int rowGroupIndex; + long totalRecordsRead; public ParquetRecordReader(FragmentContext fragmentContext, // String path, // @@ -223,6 +225,10 @@ public class ParquetRecordReader extends AbstractRecordReader { // ParquetMetadataConverter metaConverter = new ParquetMetadataConverter(); FileMetaData fileMetaData; + logger.debug("Reading 
row group({}) with {} records in file {}.", rowGroupIndex, footer.getBlocks().get(rowGroupIndex).getRowCount(), + hadoopPath.toUri().getPath()); + totalRecordsRead = 0; + // TODO - figure out how to deal with this better once we add nested reading, note also look where this map is used below // store a map from column name to converted types if they are non-null HashMap<String, SchemaElement> schemaElements = new HashMap<>(); @@ -247,13 +253,11 @@ public class ParquetRecordReader extends AbstractRecordReader { if (column.getMaxRepetitionLevel() > 0) { allFieldsFixedLength = false; } - // There is not support for the fixed binary type yet in parquet, leaving a task here as a reminder - // TODO - implement this when the feature is added upstream - if (column.getType() == PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY){ - bitWidthAllFixedFields += se.getType_length() * 8; - } else { - bitWidthAllFixedFields += getTypeLengthInBits(column.getType()); - } + if (column.getType() == PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY){ + bitWidthAllFixedFields += se.getType_length() * 8; + } else { + bitWidthAllFixedFields += getTypeLengthInBits(column.getType()); + } } else { allFieldsFixedLength = false; } @@ -287,13 +291,15 @@ public class ParquetRecordReader extends AbstractRecordReader { v = output.addField(field, (Class<? 
extends ValueVector>) TypeHelper.getValueVectorClass(type.getMinorType(), type.getMode())); if (column.getType() != PrimitiveType.PrimitiveTypeName.BINARY) { if (column.getMaxRepetitionLevel() > 0) { - ColumnReader dataReader = ColumnReaderFactory.createFixedColumnReader(this, fieldFixedLength, column, columnChunkMetaData, recordsPerBatch, + ColumnReader dataReader = ColumnReaderFactory.createFixedColumnReader(this, fieldFixedLength, + column, columnChunkMetaData, recordsPerBatch, ((RepeatedFixedWidthVector) v).getMutator().getDataVector(), schemaElement); varLengthColumns.add(new FixedWidthRepeatedReader(this, dataReader, getTypeLengthInBits(column.getType()), -1, column, columnChunkMetaData, false, v, schemaElement)); } else { - columnStatuses.add(ColumnReaderFactory.createFixedColumnReader(this, fieldFixedLength, column, columnChunkMetaData, recordsPerBatch, v, + columnStatuses.add(ColumnReaderFactory.createFixedColumnReader(this, fieldFixedLength, + column, columnChunkMetaData, recordsPerBatch, v, schemaElement)); } } else { @@ -393,6 +399,7 @@ public class ParquetRecordReader extends AbstractRecordReader { vv.getMutator().setValueCount( (int) recordsToRead); } mockRecordsRead += recordsToRead; + totalRecordsRead += recordsToRead; return (int) recordsToRead; } @@ -418,6 +425,8 @@ public class ParquetRecordReader extends AbstractRecordReader { } } + + totalRecordsRead += firstColumnStatus.getRecordsReadInCurrentPass(); return firstColumnStatus.getRecordsReadInCurrentPass(); } catch (IOException e) { throw new DrillRuntimeException(e); @@ -426,6 +435,10 @@ public class ParquetRecordReader extends AbstractRecordReader { @Override public void cleanup() { + logger.debug("Read {} records out of row group({}) in file '{}'", totalRecordsRead, rowGroupIndex, hadoopPath.toUri().getPath()); + // enable this for debugging when it is know that a whole file will be read + // limit kills upstream operators once it has enough records, so this assert will fail +// assert 
totalRecordsRead == footer.getBlocks().get(rowGroupIndex).getRowCount(); for (ColumnReader column : columnStatuses) { column.clear(); } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetToDrillTypeConverter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetToDrillTypeConverter.java index 5bba6be67..481b28956 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetToDrillTypeConverter.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetToDrillTypeConverter.java @@ -95,8 +95,7 @@ public class ParquetToDrillTypeConverter { case FIXED_LEN_BYTE_ARRAY: if (convertedType == null) { checkArgument(length > 0, "A length greater than zero must be provided for a FixedBinary type."); - return TypeProtos.MajorType.newBuilder().setMinorType(TypeProtos.MinorType.FIXEDBINARY) - .setWidth(length).setMode(mode).build(); + return TypeProtos.MajorType.newBuilder().setMinorType(MinorType.VARBINARY).setMode(mode).build(); } else if (convertedType == ConvertedType.DECIMAL) { return Types.withScaleAndPrecision(getDecimalType(schemaElement), DataMode.OPTIONAL, schemaElement.getScale(), schemaElement.getPrecision()); } @@ -159,8 +158,7 @@ public class ParquetToDrillTypeConverter { case FIXED_LEN_BYTE_ARRAY: if (convertedType == null) { checkArgument(length > 0, "A length greater than zero must be provided for a FixedBinary type."); - return TypeProtos.MajorType.newBuilder().setMinorType(TypeProtos.MinorType.FIXEDBINARY) - .setWidth(length).setMode(mode).build(); + return TypeProtos.MajorType.newBuilder().setMinorType(MinorType.VARBINARY).setMode(mode).build(); } else if (convertedType == ConvertedType.DECIMAL) { return Types.withScaleAndPrecision(getDecimalType(schemaElement), DataMode.REQUIRED, schemaElement.getScale(), schemaElement.getPrecision()); } @@ -223,8 +221,7 @@ public class ParquetToDrillTypeConverter { 
case FIXED_LEN_BYTE_ARRAY: if (convertedType == null) { checkArgument(length > 0, "A length greater than zero must be provided for a FixedBinary type."); - return TypeProtos.MajorType.newBuilder().setMinorType(TypeProtos.MinorType.FIXEDBINARY) - .setWidth(length).setMode(mode).build(); + return TypeProtos.MajorType.newBuilder().setMinorType(MinorType.VARBINARY).setMode(mode).build(); } else if (convertedType == ConvertedType.DECIMAL) { return Types.withScaleAndPrecision(getDecimalType(schemaElement), DataMode.REPEATED, schemaElement.getScale(), schemaElement.getPrecision()); } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLengthColumnReaders.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLengthColumnReaders.java index ecfa11022..2f3711dc1 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLengthColumnReaders.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLengthColumnReaders.java @@ -184,13 +184,13 @@ public class VarLengthColumnReaders { if(index >= varCharVector.getValueCapacity()) return false; if (usingDictionary) { - DrillBuf b = DrillBuf.wrapByteBuffer(currDictVal.toByteBuffer()); + DrillBuf b = DrillBuf.wrapByteBuffer(currDictValToWrite.toByteBuffer()); int st=0; - int len=currDictVal.length(); + int len=currDictValToWrite.length(); VarCharHolder holder = new VarCharHolder(); holder.buffer=b; holder.start=0; - holder.end=currDictVal.length(); + holder.end=currDictValToWrite.length(); success = varCharVector.getMutator().setSafe(index, holder); } else { @@ -230,8 +230,8 @@ public class VarLengthColumnReaders { if(index >= vector.getValueCapacity()) return false; if (usingDictionary) { - DrillBuf b = DrillBuf.wrapByteBuffer(currDictVal.toByteBuffer()); - success = mutator.setSafe(index, 1, 0, currDictVal.length(), b); + DrillBuf b = 
DrillBuf.wrapByteBuffer(currDictValToWrite.toByteBuffer()); + success = mutator.setSafe(index, 1, 0, currDictValToWrite.length(), b); } else { success = mutator.setSafe(index, 1, start, start+length, value); @@ -263,13 +263,13 @@ public class VarLengthColumnReaders { if(index >= varBinaryVector.getValueCapacity()) return false; if (usingDictionary) { - DrillBuf b = DrillBuf.wrapByteBuffer(currDictVal.toByteBuffer()); + DrillBuf b = DrillBuf.wrapByteBuffer(currDictValToWrite.toByteBuffer()); int st=0; - int len=currDictVal.length(); + int len=currDictValToWrite.length(); VarBinaryHolder holder = new VarBinaryHolder(); holder.buffer=b; holder.start=0; - holder.end=currDictVal.length(); + holder.end=currDictValToWrite.length(); success = varBinaryVector.getMutator().setSafe(index, holder); } else { @@ -307,11 +307,11 @@ public class VarLengthColumnReaders { if(index >= nullableVarBinaryVector.getValueCapacity()) return false; if (usingDictionary) { - DrillBuf b = DrillBuf.wrapByteBuffer(currDictVal.toByteBuffer()); + DrillBuf b = DrillBuf.wrapByteBuffer(currDictValToWrite.toByteBuffer()); NullableVarBinaryHolder holder = new NullableVarBinaryHolder(); holder.buffer=b; holder.start=0; - holder.end=currDictVal.length(); + holder.end=currDictValToWrite.length(); holder.isSet=1; success = nullableVarBinaryVector.getMutator().setSafe(index, holder); } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLengthValuesColumn.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLengthValuesColumn.java index 829b44a92..4f02c7010 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLengthValuesColumn.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLengthValuesColumn.java @@ -82,10 +82,17 @@ public abstract class VarLengthValuesColumn<V extends ValueVector> extends VarLe protected boolean 
readAndStoreValueSizeInformation() throws IOException { // re-purposing this field here for length in BYTES to prevent repetitive multiplication/division - try { + if (usingDictionary) { + if (currLengthDeterminingDictVal == null) { + currLengthDeterminingDictVal = pageReader.dictionaryLengthDeterminingReader.readBytes(); + } + currDictValToWrite = currLengthDeterminingDictVal; + // re-purposing this field here for length in BYTES to prevent repetitive multiplication/division + dataTypeLengthInBits = currLengthDeterminingDictVal.length(); + } + else { + // re-purposing this field here for length in BYTES to prevent repetitive multiplication/division dataTypeLengthInBits = pageReader.pageDataByteArray.getInt((int) pageReader.readyToReadPosInBytes); - } catch (Throwable t) { - throw t; } // this should not fail diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/NullableVector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/NullableVector.java new file mode 100644 index 000000000..92f60d66e --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/NullableVector.java @@ -0,0 +1,23 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.drill.exec.vector; + +public interface NullableVector extends ValueVector{ + + public ValueVector getValuesVector(); +} |