diff options
Diffstat (limited to 'exec/java-exec/src/main')
15 files changed, 340 insertions, 119 deletions
diff --git a/exec/java-exec/src/main/codegen/templates/NullableValueVectors.java b/exec/java-exec/src/main/codegen/templates/NullableValueVectors.java index d9eae0fb8..492d49517 100644 --- a/exec/java-exec/src/main/codegen/templates/NullableValueVectors.java +++ b/exec/java-exec/src/main/codegen/templates/NullableValueVectors.java @@ -44,7 +44,7 @@ package org.apache.drill.exec.vector; * NB: this class is automatically generated from ValueVectorTypes.tdd using FreeMarker. */ @SuppressWarnings("unused") -public final class ${className} extends BaseValueVector implements <#if type.major == "VarLen">VariableWidth<#else>FixedWidth</#if>Vector{ +public final class ${className} extends BaseValueVector implements <#if type.major == "VarLen">VariableWidth<#else>FixedWidth</#if>Vector, NullableVector{ private int valueCount; final UInt1Vector bits; @@ -96,6 +96,10 @@ public final class ${className} extends BaseValueVector implements <#if type.maj return values.getData(); } + public ${valuesName} getValuesVector() { + return values; + } + <#if type.major == "VarLen"> @Override public SerializedField getMetadata() { diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ColumnReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ColumnReader.java index b240407f6..272a5c352 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ColumnReader.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ColumnReader.java @@ -118,7 +118,6 @@ public abstract class ColumnReader<V extends ValueVector> { readField(recordsToRead); valuesReadInCurrentPass += recordsReadInThisIteration; - totalValuesRead += recordsReadInThisIteration; pageReader.valuesRead += recordsReadInThisIteration; pageReader.readPosInBytes = readStartInBytes + readLength; } diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ColumnReaderFactory.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ColumnReaderFactory.java index 243744e2a..3d36b64ad 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ColumnReaderFactory.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ColumnReaderFactory.java @@ -20,8 +20,12 @@ package org.apache.drill.exec.store.parquet.columnreaders; import org.apache.drill.common.exceptions.ExecutionSetupException; import org.apache.drill.exec.exception.SchemaChangeException; +import org.apache.drill.exec.vector.BigIntVector; import org.apache.drill.exec.vector.Decimal28SparseVector; import org.apache.drill.exec.vector.Decimal38SparseVector; +import org.apache.drill.exec.vector.Float4Vector; +import org.apache.drill.exec.vector.Float8Vector; +import org.apache.drill.exec.vector.IntVector; import org.apache.drill.exec.vector.NullableBigIntVector; import org.apache.drill.exec.vector.NullableDecimal28SparseVector; import org.apache.drill.exec.vector.NullableDecimal38SparseVector; @@ -72,8 +76,19 @@ public class ColumnReaderFactory { return new FixedByteAlignedReader.DateReader(recordReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement); } else{ if (columnChunkMetaData.getEncodings().contains(Encoding.PLAIN_DICTIONARY)) { - return new ParquetFixedWidthDictionaryReader(recordReader, allocateSize, descriptor, columnChunkMetaData, - fixedLength, v, schemaElement); + switch (columnChunkMetaData.getType()) { + case INT32: + return new ParquetFixedWidthDictionaryReaders.DictionaryIntReader(recordReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, (IntVector) v, schemaElement); + case INT64: + return new ParquetFixedWidthDictionaryReaders.DictionaryBigIntReader(recordReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, 
(BigIntVector) v, schemaElement); + case FLOAT: + return new ParquetFixedWidthDictionaryReaders.DictionaryFloat4Reader(recordReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, (Float4Vector) v, schemaElement); + case DOUBLE: + return new ParquetFixedWidthDictionaryReaders.DictionaryFloat8Reader(recordReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, (Float8Vector) v, schemaElement); + default: + throw new ExecutionSetupException("Unsupported dictionary column type " + descriptor.getType().name() ); + } + } else { return new FixedByteAlignedReader(recordReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/FixedByteAlignedReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/FixedByteAlignedReader.java index 54771e4b7..bfbefdb8c 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/FixedByteAlignedReader.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/FixedByteAlignedReader.java @@ -29,6 +29,7 @@ import org.apache.drill.exec.vector.DateVector; import org.apache.drill.exec.vector.Decimal28SparseVector; import org.apache.drill.exec.vector.Decimal38SparseVector; import org.apache.drill.exec.vector.ValueVector; +import org.apache.drill.exec.vector.VariableWidthVector; import org.joda.time.DateTimeUtils; import parquet.column.ColumnDescriptor; @@ -68,6 +69,29 @@ class FixedByteAlignedReader extends ColumnReader { (int) readStartInBytes, (int) readLength); } + public static class FixedBinaryReader extends FixedByteAlignedReader { + // TODO - replace this with fixed binary type in drill + VariableWidthVector castedVector; + + FixedBinaryReader(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor, ColumnChunkMetaData columnChunkMetaData, + VariableWidthVector v, 
SchemaElement schemaElement) throws ExecutionSetupException { + super(parentReader, allocateSize, descriptor, columnChunkMetaData, true, v, schemaElement); + castedVector = v; + } + + protected void readField(long recordsToReadInThisPass) { + // we can use the standard read method to transfer the data + super.readField(recordsToReadInThisPass); + // TODO - replace this with fixed binary type in drill + // now we need to write the lengths of each value + int byteLength = dataTypeLengthInBits / 8; + for (int i = 0; i < recordsToReadInThisPass; i++) { + castedVector.getMutator().setValueLengthSafe(i, byteLength); + } + } + + } + public static abstract class ConvertedReader extends FixedByteAlignedReader { protected int dataTypeLengthInBytes; @@ -104,10 +128,20 @@ class FixedByteAlignedReader extends ColumnReader { @Override void addNext(int start, int index) { - dateVector.getMutator().set(index, DateTimeUtils.fromJulianDay( - NullableFixedByteAlignedReaders.NullableDateReader.readIntLittleEndian(bytebuf, start) +// dateVector.getMutator().set(index, DateTimeUtils.fromJulianDay( +// NullableFixedByteAlignedReaders.NullableDateReader.readIntLittleEndian(bytebuf, start) + dateVector.getMutator().set(index, DateTimeUtils.fromJulianDay(readIntLittleEndian(bytebuf, start) - ParquetOutputRecordWriter.JULIAN_DAY_EPOC - 0.5)); } + + // copied out of parquet library, didn't want to deal with the uneeded throws statement they had declared + public static int readIntLittleEndian(ByteBuf in, int offset) { + int ch4 = in.getByte(offset) & 0xff; + int ch3 = in.getByte(offset + 1) & 0xff; + int ch2 = in.getByte(offset + 2) & 0xff; + int ch1 = in.getByte(offset + 3) & 0xff; + return ((ch1 << 24) + (ch2 << 16) + (ch3 << 8) + (ch4 << 0)); + } } public static class Decimal28Reader extends ConvertedReader { diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/FixedWidthRepeatedReader.java 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/FixedWidthRepeatedReader.java index bbff57420..18618e612 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/FixedWidthRepeatedReader.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/FixedWidthRepeatedReader.java @@ -51,6 +51,7 @@ public class FixedWidthRepeatedReader extends VarLengthColumn { castedRepeatedVector = (RepeatedFixedWidthVector) valueVector; this.dataTypeLengthInBytes = dataTypeLengthInBytes; this.dataReader = dataReader; + this.dataReader.pageReader.clear(); this.dataReader.pageReader = this.pageReader; // this is not in the reset method because it needs to be initialized only for the very first page read // in all other cases if a read ends at a page boundary we will need to keep track of this flag and not @@ -209,5 +210,10 @@ public class FixedWidthRepeatedReader extends VarLengthColumn { public int capacity() { return castedRepeatedVector.getMutator().getDataVector().getData().capacity(); } + + public void clear() { + super.clear(); + dataReader.clear(); + } } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/NullableFixedByteAlignedReaders.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/NullableFixedByteAlignedReaders.java index f88d56a4f..557bd9fc7 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/NullableFixedByteAlignedReaders.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/NullableFixedByteAlignedReaders.java @@ -82,6 +82,10 @@ public class NullableFixedByteAlignedReaders { for (int i = 0; i < recordsToReadInThisPass; i++){ valueVec.getMutator().setSafe(valuesReadInCurrentPass + i, pageReader.dictionaryValueReader.readInteger()); } + } else { + for (int i = 0; i < recordsToReadInThisPass; i++){ + 
valueVec.getMutator().setSafe(valuesReadInCurrentPass + i, pageReader.valueReader.readInteger()); + } } } } @@ -97,8 +101,14 @@ public class NullableFixedByteAlignedReaders { // this method is called by its superclass during a read loop @Override protected void readField(long recordsToReadInThisPass) { - for (int i = 0; i < recordsToReadInThisPass; i++){ - valueVec.getMutator().setSafe(valuesReadInCurrentPass + i, pageReader.dictionaryValueReader.readLong()); + if (usingDictionary) { + for (int i = 0; i < recordsToReadInThisPass; i++){ + valueVec.getMutator().setSafe(valuesReadInCurrentPass + i, pageReader.dictionaryValueReader.readLong()); + } + } else { + for (int i = 0; i < recordsToReadInThisPass; i++){ + valueVec.getMutator().setSafe(valuesReadInCurrentPass + i, pageReader.valueReader.readLong()); + } } } } @@ -114,8 +124,14 @@ public class NullableFixedByteAlignedReaders { // this method is called by its superclass during a read loop @Override protected void readField(long recordsToReadInThisPass) { - for (int i = 0; i < recordsToReadInThisPass; i++){ - valueVec.getMutator().setSafe(valuesReadInCurrentPass + i, pageReader.dictionaryValueReader.readFloat()); + if (usingDictionary) { + for (int i = 0; i < recordsToReadInThisPass; i++){ + valueVec.getMutator().setSafe(valuesReadInCurrentPass + i, pageReader.dictionaryValueReader.readFloat()); + } + } else { + for (int i = 0; i < recordsToReadInThisPass; i++){ + valueVec.getMutator().setSafe(valuesReadInCurrentPass + i, pageReader.valueReader.readFloat()); + } } } } @@ -131,8 +147,14 @@ public class NullableFixedByteAlignedReaders { // this method is called by its superclass during a read loop @Override protected void readField(long recordsToReadInThisPass) { - for (int i = 0; i < recordsToReadInThisPass; i++){ - valueVec.getMutator().setSafe(valuesReadInCurrentPass + i, pageReader.dictionaryValueReader.readDouble()); + if (usingDictionary) { + for (int i = 0; i < recordsToReadInThisPass; i++){ + 
valueVec.getMutator().setSafe(valuesReadInCurrentPass + i, pageReader.dictionaryValueReader.readDouble()); + } + } else { + for (int i = 0; i < recordsToReadInThisPass; i++){ + valueVec.getMutator().setSafe(valuesReadInCurrentPass + i, pageReader.valueReader.readDouble()); + } } } } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/NullableVarLengthValuesColumn.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/NullableVarLengthValuesColumn.java index ba9ff802d..dc29fbd1f 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/NullableVarLengthValuesColumn.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/NullableVarLengthValuesColumn.java @@ -116,6 +116,8 @@ public abstract class NullableVarLengthValuesColumn<V extends ValueVector> exten @Override protected void readField(long recordsToRead) { + // TODO - unlike most implementations of this method, the recordsReadInThisIteration field is not set here + // should verify that this is not breaking anything if (usingDictionary) { currDictValToWrite = pageReader.dictionaryValueReader.readBytes(); // re-purposing this field here for length in BYTES to prevent repetitive multiplication/division diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/PageReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/PageReader.java index 4165cbda8..639577dfd 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/PageReader.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/PageReader.java @@ -91,10 +91,14 @@ final class PageReader { List<ByteBuf> allocatedBuffers; + // These need to be held throughout reading of the entire column chunk + List<ByteBuf> allocatedDictionaryBuffers; + PageReader(ColumnReader parentStatus, FileSystem fs, 
Path path, ColumnChunkMetaData columnChunkMetaData) throws ExecutionSetupException{ this.parentColumnReader = parentStatus; allocatedBuffers = new ArrayList<ByteBuf>(); + allocatedDictionaryBuffers = new ArrayList<ByteBuf>(); long totalByteLength = columnChunkMetaData.getTotalUncompressedSize(); long start = columnChunkMetaData.getFirstDataPageOffset(); @@ -108,7 +112,7 @@ final class PageReader { BytesInput bytesIn; ByteBuf uncompressedData=allocateBuffer(pageHeader.getUncompressed_page_size()); - allocatedBuffers.add(uncompressedData); + allocatedDictionaryBuffers.add(uncompressedData); if(parentColumnReader.columnChunkMetaData.getCodec()==CompressionCodecName.UNCOMPRESSED) { dataReader.getPageAsBytesBuf(uncompressedData, pageHeader.compressed_page_size); bytesIn=parentColumnReader.parentReader.getCodecFactoryExposer().getBytesInput(uncompressedData, @@ -157,6 +161,7 @@ final class PageReader { if(!dataReader.hasRemainder() || parentColumnReader.totalValuesRead == parentColumnReader.columnChunkMetaData.getValueCount()) { return false; } + clearBuffers(); // next, we need to decompress the bytes // TODO - figure out if we need multiple dictionary pages, I believe it may be limited to one @@ -168,7 +173,7 @@ final class PageReader { //TODO: Handle buffer allocation exception BytesInput bytesIn; ByteBuf uncompressedData=allocateBuffer(pageHeader.getUncompressed_page_size()); - allocatedBuffers.add(uncompressedData); + allocatedDictionaryBuffers.add(uncompressedData); if( parentColumnReader.columnChunkMetaData.getCodec()== CompressionCodecName.UNCOMPRESSED) { dataReader.getPageAsBytesBuf(uncompressedData, pageHeader.compressed_page_size); bytesIn=parentColumnReader.parentReader.getCodecFactoryExposer().getBytesInput(uncompressedData, @@ -229,6 +234,7 @@ final class PageReader { } pageDataByteArray = DrillBuf.wrapByteBuffer(currentPage.getBytes().toByteBuffer()); + allocatedBuffers.add(pageDataByteArray); readPosInBytes = 0; if 
(parentColumnReader.getColumnDescriptor().getMaxRepetitionLevel() > 0) { @@ -244,29 +250,29 @@ final class PageReader { } if (parentColumnReader.columnDescriptor.getMaxDefinitionLevel() != 0){ parentColumnReader.currDefLevel = -1; - if (!currentPage.getValueEncoding().usesDictionary()) { - parentColumnReader.usingDictionary = false; - definitionLevels = currentPage.getDlEncoding().getValuesReader(parentColumnReader.columnDescriptor, ValuesType.DEFINITION_LEVEL); - definitionLevels.initFromPage(currentPage.getValueCount(), pageDataByteArray.nioBuffer(), (int) readPosInBytes); - readPosInBytes = definitionLevels.getNextOffset(); - if (parentColumnReader.columnDescriptor.getType() == PrimitiveType.PrimitiveTypeName.BOOLEAN) { - valueReader = currentPage.getValueEncoding().getValuesReader(parentColumnReader.columnDescriptor, ValuesType.VALUES); - valueReader.initFromPage(currentPage.getValueCount(), pageDataByteArray.nioBuffer(), (int) readPosInBytes); - } - } else { - parentColumnReader.usingDictionary = true; - definitionLevels = currentPage.getDlEncoding().getValuesReader(parentColumnReader.columnDescriptor, ValuesType.DEFINITION_LEVEL); - definitionLevels.initFromPage(currentPage.getValueCount(), pageDataByteArray.nioBuffer(), (int) readPosInBytes); - readPosInBytes = definitionLevels.getNextOffset(); - // initialize two of the dictionary readers, one is for determining the lengths of each value, the second is for - // actually copying the values out into the vectors - dictionaryLengthDeterminingReader = new DictionaryValuesReader(dictionary); - dictionaryLengthDeterminingReader.initFromPage(currentPage.getValueCount(), pageDataByteArray.nioBuffer(), (int) readPosInBytes); - dictionaryValueReader = new DictionaryValuesReader(dictionary); - dictionaryValueReader.initFromPage(currentPage.getValueCount(), pageDataByteArray.nioBuffer(), (int) readPosInBytes); - this.parentColumnReader.usingDictionary = true; + definitionLevels = 
currentPage.getDlEncoding().getValuesReader(parentColumnReader.columnDescriptor, ValuesType.DEFINITION_LEVEL); + definitionLevels.initFromPage(currentPage.getValueCount(), pageDataByteArray.nioBuffer(), (int) readPosInBytes); + readPosInBytes = definitionLevels.getNextOffset(); + if ( ! currentPage.getValueEncoding().usesDictionary()) { + valueReader = currentPage.getValueEncoding().getValuesReader(parentColumnReader.columnDescriptor, ValuesType.VALUES); + valueReader.initFromPage(currentPage.getValueCount(), pageDataByteArray.nioBuffer(), (int) readPosInBytes); } } + if (parentColumnReader.columnDescriptor.getType() == PrimitiveType.PrimitiveTypeName.BOOLEAN) { + valueReader = currentPage.getValueEncoding().getValuesReader(parentColumnReader.columnDescriptor, ValuesType.VALUES); + valueReader.initFromPage(currentPage.getValueCount(), pageDataByteArray.nioBuffer(), (int) readPosInBytes); + } + if (currentPage.getValueEncoding().usesDictionary()) { + // initialize two of the dictionary readers, one is for determining the lengths of each value, the second is for + // actually copying the values out into the vectors + dictionaryLengthDeterminingReader = new DictionaryValuesReader(dictionary); + dictionaryLengthDeterminingReader.initFromPage(currentPage.getValueCount(), pageDataByteArray.nioBuffer(), (int) readPosInBytes); + dictionaryValueReader = new DictionaryValuesReader(dictionary); + dictionaryValueReader.initFromPage(currentPage.getValueCount(), pageDataByteArray.nioBuffer(), (int) readPosInBytes); + parentColumnReader.usingDictionary = true; + } else { + parentColumnReader.usingDictionary = false; + } // readPosInBytes is used for actually reading the values after we determine how many will fit in the vector // readyToReadPosInBytes serves a similar purpose for the vector types where we must count up the values that will // fit one record at a time, such as for variable length data. 
Both operations must start in the same location after the @@ -275,13 +281,26 @@ final class PageReader { return true; } + public void clearBuffers() { + for (ByteBuf b : allocatedBuffers) { + b.release(); + } + allocatedBuffers.clear(); + } + + public void clearDictionaryBuffers() { + for (ByteBuf b : allocatedDictionaryBuffers) { + b.release(); + } + allocatedDictionaryBuffers.clear(); + } + public void clear(){ this.dataReader.clear(); // Free all memory, including fixed length types. (Data is being copied for all types not just var length types) //if(!this.parentColumnReader.isFixedLength) { - for (ByteBuf b : allocatedBuffers) { - b.release(); - } + clearBuffers(); + clearDictionaryBuffers(); //} } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetFixedWidthDictionaryReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetFixedWidthDictionaryReader.java deleted file mode 100644 index ad849b4d0..000000000 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetFixedWidthDictionaryReader.java +++ /dev/null @@ -1,53 +0,0 @@ -/******************************************************************************* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - ******************************************************************************/ -package org.apache.drill.exec.store.parquet.columnreaders; - -import org.apache.drill.common.exceptions.ExecutionSetupException; -import org.apache.drill.exec.vector.BigIntVector; -import org.apache.drill.exec.vector.ValueVector; -import parquet.column.ColumnDescriptor; -import parquet.format.SchemaElement; -import parquet.hadoop.metadata.ColumnChunkMetaData; -import parquet.schema.PrimitiveType; - -public class ParquetFixedWidthDictionaryReader extends ColumnReader{ - - ParquetFixedWidthDictionaryReader(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor, - ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, ValueVector v, - SchemaElement schemaElement) throws ExecutionSetupException { - super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement); - } - - @Override - public void readField(long recordsToReadInThisPass) { - - recordsReadInThisIteration = Math.min(pageReader.currentPage.getValueCount() - - pageReader.valuesRead, recordsToReadInThisPass - valuesReadInCurrentPass); - int defLevel; - for (int i = 0; i < recordsReadInThisIteration; i++){ - defLevel = pageReader.definitionLevels.readInteger(); - // if the value is defined - if (defLevel == columnDescriptor.getMaxDefinitionLevel()){ - if (columnDescriptor.getType() == PrimitiveType.PrimitiveTypeName.INT64) - ((BigIntVector)valueVec).getMutator().set(i + valuesReadInCurrentPass, - pageReader.valueReader.readLong() ); - } - // otherwise the value is skipped, because the bit vector indicating nullability is zero filled - } - } -} diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetFixedWidthDictionaryReaders.java 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetFixedWidthDictionaryReaders.java new file mode 100644 index 000000000..bad6e6a9d --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetFixedWidthDictionaryReaders.java @@ -0,0 +1,133 @@ +/******************************************************************************* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ ******************************************************************************/ +package org.apache.drill.exec.store.parquet.columnreaders; + +import org.apache.drill.common.exceptions.ExecutionSetupException; +import org.apache.drill.exec.vector.BigIntVector; +import org.apache.drill.exec.vector.Float4Vector; +import org.apache.drill.exec.vector.Float8Vector; +import org.apache.drill.exec.vector.IntVector; +import org.apache.drill.exec.vector.NullableIntVector; +import org.apache.drill.exec.vector.ValueVector; +import parquet.column.ColumnDescriptor; +import parquet.format.SchemaElement; +import parquet.hadoop.metadata.ColumnChunkMetaData; +import parquet.schema.PrimitiveType; + +public class ParquetFixedWidthDictionaryReaders { + + static class DictionaryIntReader extends FixedByteAlignedReader { + + IntVector castedVector; + + DictionaryIntReader(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor, + ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, IntVector v, + SchemaElement schemaElement) throws ExecutionSetupException { + super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement); + castedVector = v; + } + + // this method is called by its superclass during a read loop + @Override + protected void readField(long recordsToReadInThisPass) { + + recordsReadInThisIteration = Math.min(pageReader.currentPage.getValueCount() + - pageReader.valuesRead, recordsToReadInThisPass - valuesReadInCurrentPass); + + if (usingDictionary) { + for (int i = 0; i < recordsReadInThisIteration; i++){ + castedVector.getMutator().setSafe(valuesReadInCurrentPass + i, pageReader.dictionaryValueReader.readInteger()); + } + } + } + } + + static class DictionaryBigIntReader extends FixedByteAlignedReader { + + BigIntVector castedVector; + + DictionaryBigIntReader(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor, + ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, 
BigIntVector v, + SchemaElement schemaElement) throws ExecutionSetupException { + super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement); + castedVector = v; + } + + // this method is called by its superclass during a read loop + @Override + protected void readField(long recordsToReadInThisPass) { + + recordsReadInThisIteration = Math.min(pageReader.currentPage.getValueCount() + - pageReader.valuesRead, recordsToReadInThisPass - valuesReadInCurrentPass); + + for (int i = 0; i < recordsReadInThisIteration; i++){ + try { + castedVector.getMutator().setSafe(valuesReadInCurrentPass + i, pageReader.dictionaryValueReader.readLong()); + } catch ( Exception ex) { + throw ex; + } + } + } + } + + static class DictionaryFloat4Reader extends FixedByteAlignedReader { + + Float4Vector castedVector; + + DictionaryFloat4Reader(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor, + ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, Float4Vector v, + SchemaElement schemaElement) throws ExecutionSetupException { + super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement); + castedVector = v; + } + + // this method is called by its superclass during a read loop + @Override + protected void readField(long recordsToReadInThisPass) { + recordsReadInThisIteration = Math.min(pageReader.currentPage.getValueCount() + - pageReader.valuesRead, recordsToReadInThisPass - valuesReadInCurrentPass); + + for (int i = 0; i < recordsReadInThisIteration; i++){ + castedVector.getMutator().setSafe(valuesReadInCurrentPass + i, pageReader.dictionaryValueReader.readFloat()); + } + } + } + + static class DictionaryFloat8Reader extends FixedByteAlignedReader { + + Float8Vector castedVector; + + DictionaryFloat8Reader(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor, + ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, Float8Vector v, + 
SchemaElement schemaElement) throws ExecutionSetupException { + super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement); + castedVector = v; + } + + // this method is called by its superclass during a read loop + @Override + protected void readField(long recordsToReadInThisPass) { + recordsReadInThisIteration = Math.min(pageReader.currentPage.getValueCount() + - pageReader.valuesRead, recordsToReadInThisPass - valuesReadInCurrentPass); + + for (int i = 0; i < recordsReadInThisIteration; i++){ + castedVector.getMutator().setSafe(valuesReadInCurrentPass + i, pageReader.dictionaryValueReader.readDouble()); + } + } + } +} diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java index c72e75030..f3d9e2cd2 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java @@ -60,6 +60,7 @@ import parquet.format.SchemaElement; import parquet.format.converter.ParquetMetadataConverter; import parquet.hadoop.CodecFactoryExposer; import parquet.hadoop.ParquetFileWriter; +import parquet.hadoop.metadata.BlockMetaData; import parquet.hadoop.metadata.ColumnChunkMetaData; import parquet.hadoop.metadata.ParquetMetadata; import parquet.schema.PrimitiveType; @@ -110,6 +111,7 @@ public class ParquetRecordReader extends AbstractRecordReader { private final CodecFactoryExposer codecFactoryExposer; int rowGroupIndex; + long totalRecordsRead; public ParquetRecordReader(FragmentContext fragmentContext, // String path, // @@ -223,6 +225,10 @@ public class ParquetRecordReader extends AbstractRecordReader { // ParquetMetadataConverter metaConverter = new ParquetMetadataConverter(); FileMetaData fileMetaData; + logger.debug("Reading 
row group({}) with {} records in file {}.", rowGroupIndex, footer.getBlocks().get(rowGroupIndex).getRowCount(), + hadoopPath.toUri().getPath()); + totalRecordsRead = 0; + // TODO - figure out how to deal with this better once we add nested reading, note also look where this map is used below // store a map from column name to converted types if they are non-null HashMap<String, SchemaElement> schemaElements = new HashMap<>(); @@ -247,13 +253,11 @@ public class ParquetRecordReader extends AbstractRecordReader { if (column.getMaxRepetitionLevel() > 0) { allFieldsFixedLength = false; } - // There is not support for the fixed binary type yet in parquet, leaving a task here as a reminder - // TODO - implement this when the feature is added upstream - if (column.getType() == PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY){ - bitWidthAllFixedFields += se.getType_length() * 8; - } else { - bitWidthAllFixedFields += getTypeLengthInBits(column.getType()); - } + if (column.getType() == PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY){ + bitWidthAllFixedFields += se.getType_length() * 8; + } else { + bitWidthAllFixedFields += getTypeLengthInBits(column.getType()); + } } else { allFieldsFixedLength = false; } @@ -287,13 +291,15 @@ public class ParquetRecordReader extends AbstractRecordReader { v = output.addField(field, (Class<? 
extends ValueVector>) TypeHelper.getValueVectorClass(type.getMinorType(), type.getMode())); if (column.getType() != PrimitiveType.PrimitiveTypeName.BINARY) { if (column.getMaxRepetitionLevel() > 0) { - ColumnReader dataReader = ColumnReaderFactory.createFixedColumnReader(this, fieldFixedLength, column, columnChunkMetaData, recordsPerBatch, + ColumnReader dataReader = ColumnReaderFactory.createFixedColumnReader(this, fieldFixedLength, + column, columnChunkMetaData, recordsPerBatch, ((RepeatedFixedWidthVector) v).getMutator().getDataVector(), schemaElement); varLengthColumns.add(new FixedWidthRepeatedReader(this, dataReader, getTypeLengthInBits(column.getType()), -1, column, columnChunkMetaData, false, v, schemaElement)); } else { - columnStatuses.add(ColumnReaderFactory.createFixedColumnReader(this, fieldFixedLength, column, columnChunkMetaData, recordsPerBatch, v, + columnStatuses.add(ColumnReaderFactory.createFixedColumnReader(this, fieldFixedLength, + column, columnChunkMetaData, recordsPerBatch, v, schemaElement)); } } else { @@ -393,6 +399,7 @@ public class ParquetRecordReader extends AbstractRecordReader { vv.getMutator().setValueCount( (int) recordsToRead); } mockRecordsRead += recordsToRead; + totalRecordsRead += recordsToRead; return (int) recordsToRead; } @@ -418,6 +425,8 @@ public class ParquetRecordReader extends AbstractRecordReader { } } + + totalRecordsRead += firstColumnStatus.getRecordsReadInCurrentPass(); return firstColumnStatus.getRecordsReadInCurrentPass(); } catch (IOException e) { throw new DrillRuntimeException(e); @@ -426,6 +435,10 @@ public class ParquetRecordReader extends AbstractRecordReader { @Override public void cleanup() { + logger.debug("Read {} records out of row group({}) in file '{}'", totalRecordsRead, rowGroupIndex, hadoopPath.toUri().getPath()); + // enable this for debugging when it is known that a whole file will be read + // limit kills upstream operators once it has enough records, so this assert will fail +// assert 
totalRecordsRead == footer.getBlocks().get(rowGroupIndex).getRowCount(); for (ColumnReader column : columnStatuses) { column.clear(); } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetToDrillTypeConverter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetToDrillTypeConverter.java index 5bba6be67..481b28956 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetToDrillTypeConverter.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetToDrillTypeConverter.java @@ -95,8 +95,7 @@ public class ParquetToDrillTypeConverter { case FIXED_LEN_BYTE_ARRAY: if (convertedType == null) { checkArgument(length > 0, "A length greater than zero must be provided for a FixedBinary type."); - return TypeProtos.MajorType.newBuilder().setMinorType(TypeProtos.MinorType.FIXEDBINARY) - .setWidth(length).setMode(mode).build(); + return TypeProtos.MajorType.newBuilder().setMinorType(MinorType.VARBINARY).setMode(mode).build(); } else if (convertedType == ConvertedType.DECIMAL) { return Types.withScaleAndPrecision(getDecimalType(schemaElement), DataMode.OPTIONAL, schemaElement.getScale(), schemaElement.getPrecision()); } @@ -159,8 +158,7 @@ public class ParquetToDrillTypeConverter { case FIXED_LEN_BYTE_ARRAY: if (convertedType == null) { checkArgument(length > 0, "A length greater than zero must be provided for a FixedBinary type."); - return TypeProtos.MajorType.newBuilder().setMinorType(TypeProtos.MinorType.FIXEDBINARY) - .setWidth(length).setMode(mode).build(); + return TypeProtos.MajorType.newBuilder().setMinorType(MinorType.VARBINARY).setMode(mode).build(); } else if (convertedType == ConvertedType.DECIMAL) { return Types.withScaleAndPrecision(getDecimalType(schemaElement), DataMode.REQUIRED, schemaElement.getScale(), schemaElement.getPrecision()); } @@ -223,8 +221,7 @@ public class ParquetToDrillTypeConverter { 
case FIXED_LEN_BYTE_ARRAY: if (convertedType == null) { checkArgument(length > 0, "A length greater than zero must be provided for a FixedBinary type."); - return TypeProtos.MajorType.newBuilder().setMinorType(TypeProtos.MinorType.FIXEDBINARY) - .setWidth(length).setMode(mode).build(); + return TypeProtos.MajorType.newBuilder().setMinorType(MinorType.VARBINARY).setMode(mode).build(); } else if (convertedType == ConvertedType.DECIMAL) { return Types.withScaleAndPrecision(getDecimalType(schemaElement), DataMode.REPEATED, schemaElement.getScale(), schemaElement.getPrecision()); } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLengthColumnReaders.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLengthColumnReaders.java index ecfa11022..2f3711dc1 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLengthColumnReaders.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLengthColumnReaders.java @@ -184,13 +184,13 @@ public class VarLengthColumnReaders { if(index >= varCharVector.getValueCapacity()) return false; if (usingDictionary) { - DrillBuf b = DrillBuf.wrapByteBuffer(currDictVal.toByteBuffer()); + DrillBuf b = DrillBuf.wrapByteBuffer(currDictValToWrite.toByteBuffer()); int st=0; - int len=currDictVal.length(); + int len=currDictValToWrite.length(); VarCharHolder holder = new VarCharHolder(); holder.buffer=b; holder.start=0; - holder.end=currDictVal.length(); + holder.end=currDictValToWrite.length(); success = varCharVector.getMutator().setSafe(index, holder); } else { @@ -230,8 +230,8 @@ public class VarLengthColumnReaders { if(index >= vector.getValueCapacity()) return false; if (usingDictionary) { - DrillBuf b = DrillBuf.wrapByteBuffer(currDictVal.toByteBuffer()); - success = mutator.setSafe(index, 1, 0, currDictVal.length(), b); + DrillBuf b = 
DrillBuf.wrapByteBuffer(currDictValToWrite.toByteBuffer()); + success = mutator.setSafe(index, 1, 0, currDictValToWrite.length(), b); } else { success = mutator.setSafe(index, 1, start, start+length, value); @@ -263,13 +263,13 @@ public class VarLengthColumnReaders { if(index >= varBinaryVector.getValueCapacity()) return false; if (usingDictionary) { - DrillBuf b = DrillBuf.wrapByteBuffer(currDictVal.toByteBuffer()); + DrillBuf b = DrillBuf.wrapByteBuffer(currDictValToWrite.toByteBuffer()); int st=0; - int len=currDictVal.length(); + int len=currDictValToWrite.length(); VarBinaryHolder holder = new VarBinaryHolder(); holder.buffer=b; holder.start=0; - holder.end=currDictVal.length(); + holder.end=currDictValToWrite.length(); success = varBinaryVector.getMutator().setSafe(index, holder); } else { @@ -307,11 +307,11 @@ public class VarLengthColumnReaders { if(index >= nullableVarBinaryVector.getValueCapacity()) return false; if (usingDictionary) { - DrillBuf b = DrillBuf.wrapByteBuffer(currDictVal.toByteBuffer()); + DrillBuf b = DrillBuf.wrapByteBuffer(currDictValToWrite.toByteBuffer()); NullableVarBinaryHolder holder = new NullableVarBinaryHolder(); holder.buffer=b; holder.start=0; - holder.end=currDictVal.length(); + holder.end=currDictValToWrite.length(); holder.isSet=1; success = nullableVarBinaryVector.getMutator().setSafe(index, holder); } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLengthValuesColumn.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLengthValuesColumn.java index 829b44a92..4f02c7010 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLengthValuesColumn.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLengthValuesColumn.java @@ -82,10 +82,17 @@ public abstract class VarLengthValuesColumn<V extends ValueVector> extends VarLe protected boolean 
readAndStoreValueSizeInformation() throws IOException { // re-purposing this field here for length in BYTES to prevent repetitive multiplication/division - try { + if (usingDictionary) { + if (currLengthDeterminingDictVal == null) { + currLengthDeterminingDictVal = pageReader.dictionaryLengthDeterminingReader.readBytes(); + } + currDictValToWrite = currLengthDeterminingDictVal; + // re-purposing this field here for length in BYTES to prevent repetitive multiplication/division + dataTypeLengthInBits = currLengthDeterminingDictVal.length(); + } + else { + // re-purposing this field here for length in BYTES to prevent repetitive multiplication/division dataTypeLengthInBits = pageReader.pageDataByteArray.getInt((int) pageReader.readyToReadPosInBytes); - } catch (Throwable t) { - throw t; } // this should not fail diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/NullableVector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/NullableVector.java new file mode 100644 index 000000000..92f60d66e --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/NullableVector.java @@ -0,0 +1,23 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.drill.exec.vector; + +public interface NullableVector extends ValueVector{ + + public ValueVector getValuesVector(); +} |