author     Jason Altekruse <altekrusejason@gmail.com>  2014-07-22 13:08:33 -0500
committer  Jacques Nadeau <jacques@apache.org>         2014-07-29 08:36:27 -0700
commit     5b73c2140f100bb74cbf4a20cd31d13ebf5a4b88 (patch)
tree       c2d2286ead2a84896ed3b28073f8e9aae58d0c28 /exec/java-exec/src/main/java/org
parent     686a282c93f3412ca4f977a4c32c44264f8b9a23 (diff)
DRILL-945: Implementation of repeated reader and writer for parquet. Includes a fairly substantial refactoring of the overall reader structure.
Fix the record counts expected in the parquet read tests; previously the wrong record counts were expected and mismatches were only sent to the logger as debug messages, not errors, so they did not flag the failing tests. After review: rename PageReadStatus to PageReader, remove the underscore from the package name column_readers, address a review comment from Parth in VarLengthColumnReaders. Fix a regression in the nullable variable-length reader for parquet. Change output names in the parquet writer/reader tests to prevent issues with files not being deleted correctly during tests.
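
To illustrate the test fix described above, here is a minimal, hypothetical sketch (not Drill's actual test code) of the difference between logging a record-count mismatch at debug level and asserting on it:

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.junit.Assert.assertEquals;

class RecordCountCheckSketch {
  static final Logger logger = LoggerFactory.getLogger(RecordCountCheckSketch.class);

  // Anti-pattern described in the commit message: a mismatch is only logged
  // at debug level, so the surrounding test still passes.
  static void checkLoggedOnly(long expected, long actual) {
    if (expected != actual) {
      logger.debug("record count mismatch: expected {} but read {}", expected, actual);
    }
  }

  // The fix: assert on the count so a mismatch actually fails the test.
  static void checkAsserted(long expected, long actual) {
    assertEquals("record count mismatch", expected, actual);
  }
}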
Diffstat (limited to 'exec/java-exec/src/main/java/org')
-rw-r--r-- exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ColumnDataReader.java | 6
-rw-r--r-- exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/NullableFixedByteAlignedReader.java | 156
-rw-r--r-- exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/NullableFixedByteAlignedReaders.java | 169
-rw-r--r-- exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFormatPlugin.java | 2
-rw-r--r-- exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordReader.java | 711
-rw-r--r-- exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordWriter.java | 3
-rw-r--r-- exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java | 2
-rw-r--r-- exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/VarLenBinaryReader.java | 190
-rw-r--r-- exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/BitReader.java (renamed from exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/BitReader.java) | 35
-rw-r--r-- exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ColumnReader.java (renamed from exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ColumnReader.java) | 156
-rw-r--r-- exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ColumnReaderFactory.java | 175
-rw-r--r-- exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/FixedByteAlignedReader.java (renamed from exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/FixedByteAlignedReader.java) | 32
-rw-r--r-- exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/FixedWidthRepeatedReader.java | 213
-rw-r--r-- exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/NullableBitReader.java (renamed from exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/NullableBitReader.java) | 17
-rw-r--r-- exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/NullableColumnReader.java (renamed from exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/NullableColumnReader.java) | 51
-rw-r--r-- exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/NullableFixedByteAlignedReaders.java | 238
-rw-r--r-- exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/NullableVarLengthValuesColumn.java | 130
-rw-r--r-- exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/PageReader.java (renamed from exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/PageReadStatus.java) | 64
-rw-r--r-- exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetFixedWidthDictionaryReader.java (renamed from exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFixedWidthDictionaryReader.java) | 13
-rw-r--r-- exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java | 361
-rw-r--r-- exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetToDrillTypeConverter.java | 236
-rw-r--r-- exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLenBinaryReader.java | 81
-rw-r--r-- exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLengthColumn.java | 60
-rw-r--r-- exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLengthColumnReaders.java (renamed from exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/VarLengthColumnReaders.java) | 107
-rw-r--r-- exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLengthValuesColumn.java | 96
-rw-r--r-- exec/java-exec/src/main/java/org/apache/drill/exec/vector/RepeatedFixedWidthVector.java | 13
-rw-r--r-- exec/java-exec/src/main/java/org/apache/drill/exec/vector/RepeatedVariableWidthVector.java | 2
-rw-r--r-- exec/java-exec/src/main/java/org/apache/drill/exec/vector/VariableWidthVector.java | 12
-rw-r--r-- exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/RepeatedListVector.java | 28
-rw-r--r-- exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/RepeatedMapVector.java | 29
30 files changed, 1923 insertions, 1465 deletions
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ColumnDataReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ColumnDataReader.java
index 8c6f12085..80fbd807c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ColumnDataReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ColumnDataReader.java
@@ -28,7 +28,7 @@ import parquet.bytes.BytesInput;
import parquet.format.PageHeader;
import parquet.format.Util;
-class ColumnDataReader {
+public class ColumnDataReader {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ColumnDataReader.class);
private final long endPosition;
@@ -41,11 +41,7 @@ class ColumnDataReader {
}
public PageHeader readPageHeader() throws IOException{
- try{
return Util.readPageHeader(input);
- }catch (IOException e) {
- throw e;
- }
}
public BytesInput getPageAsBytesInput(int pageLength) throws IOException{
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/NullableFixedByteAlignedReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/NullableFixedByteAlignedReader.java
deleted file mode 100644
index 17759d3d3..000000000
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/NullableFixedByteAlignedReader.java
+++ /dev/null
@@ -1,156 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.store.parquet;
-
-import org.apache.drill.common.exceptions.ExecutionSetupException;
-import org.apache.drill.common.util.DecimalUtility;
-import org.apache.drill.exec.expr.holders.NullableDecimal28SparseHolder;
-import org.apache.drill.exec.expr.holders.NullableDecimal38SparseHolder;
-import org.apache.drill.exec.store.ParquetOutputRecordWriter;
-import org.apache.drill.exec.vector.NullableDateVector;
-import org.apache.drill.exec.vector.NullableDecimal28SparseVector;
-import org.apache.drill.exec.vector.NullableDecimal38SparseVector;
-import org.apache.drill.exec.vector.ValueVector;
-
-import org.joda.time.DateTimeUtils;
-import parquet.bytes.BytesUtils;
-import parquet.column.ColumnDescriptor;
-import parquet.format.ConvertedType;
-import parquet.format.SchemaElement;
-import parquet.hadoop.metadata.ColumnChunkMetaData;
-
-import java.math.BigDecimal;
-
-class NullableFixedByteAlignedReader extends NullableColumnReader {
-
- protected byte[] bytes;
-
- NullableFixedByteAlignedReader(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor,
- ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, ValueVector v, SchemaElement schemaElement) throws ExecutionSetupException {
- super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
- }
-
- // this method is called by its superclass during a read loop
- @Override
- protected void readField(long recordsToReadInThisPass, ColumnReader firstColumnStatus) {
-
- this.recordsReadInThisIteration = recordsToReadInThisPass;
-
- // set up metadata
- this.readStartInBytes = pageReadStatus.readPosInBytes;
- this.readLengthInBits = recordsReadInThisIteration * dataTypeLengthInBits;
- this.readLength = (int) Math.ceil(readLengthInBits / 8.0);
- this.bytes = pageReadStatus.pageDataByteArray;
-
- // fill in data.
- vectorData.writeBytes(bytes, (int) readStartInBytes, (int) readLength);
- }
-
- public static abstract class NullableConvertedReader extends NullableFixedByteAlignedReader {
-
- protected int dataTypeLengthInBytes;
-
- NullableConvertedReader(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor,
- ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, ValueVector v, SchemaElement schemaElement) throws ExecutionSetupException {
- super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
- }
-
- @Override
- protected void readField(long recordsToReadInThisPass, ColumnReader firstColumnStatus) {
-
- this.recordsReadInThisIteration = recordsToReadInThisPass;
-
- // set up metadata
- this.readStartInBytes = pageReadStatus.readPosInBytes;
- this.readLengthInBits = recordsReadInThisIteration * dataTypeLengthInBits;
- this.readLength = (int) Math.ceil(readLengthInBits / 8.0);
- this.bytes = pageReadStatus.pageDataByteArray;
-
- dataTypeLengthInBytes = (int) Math.ceil(dataTypeLengthInBits / 8.0);
- for (int i = 0; i < recordsReadInThisIteration; i++) {
- addNext((int) readStartInBytes + i * dataTypeLengthInBytes, i + valuesReadInCurrentPass);
- }
- }
-
- abstract void addNext(int start, int index);
- }
-
- public static class NullableDateReader extends NullableConvertedReader {
-
- NullableDateVector dateVector;
-
- NullableDateReader(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor, ColumnChunkMetaData columnChunkMetaData,
- boolean fixedLength, ValueVector v, SchemaElement schemaElement) throws ExecutionSetupException {
- super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
- dateVector = (NullableDateVector) v;
- }
-
- @Override
- void addNext(int start, int index) {
- dateVector.getMutator().set(index, DateTimeUtils.fromJulianDay(readIntLittleEndian(bytes, start) - ParquetOutputRecordWriter.JULIAN_DAY_EPOC - 0.5));
- }
-
- // copied out of parquet library, didn't want to deal with the uneeded throws statement they had declared
- public static int readIntLittleEndian(byte[] in, int offset) {
- int ch4 = in[offset] & 0xff;
- int ch3 = in[offset + 1] & 0xff;
- int ch2 = in[offset + 2] & 0xff;
- int ch1 = in[offset + 3] & 0xff;
- return ((ch1 << 24) + (ch2 << 16) + (ch3 << 8) + (ch4 << 0));
- }
-
- }
-
- public static class NullableDecimal28Reader extends NullableConvertedReader {
-
- NullableDecimal28SparseVector decimal28Vector;
-
- NullableDecimal28Reader(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor, ColumnChunkMetaData columnChunkMetaData,
- boolean fixedLength, ValueVector v, SchemaElement schemaElement) throws ExecutionSetupException {
- super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
- decimal28Vector = (NullableDecimal28SparseVector) v;
- }
-
- @Override
- void addNext(int start, int index) {
- int width = NullableDecimal28SparseHolder.WIDTH;
- BigDecimal intermediate = DecimalUtility.getBigDecimalFromByteArray(bytes, start, dataTypeLengthInBytes, schemaElement.getScale());
- DecimalUtility.getSparseFromBigDecimal(intermediate, decimal28Vector.getData(), index * width, schemaElement.getScale(),
- schemaElement.getPrecision(), NullableDecimal28SparseHolder.nDecimalDigits);
- }
- }
-
- public static class NullableDecimal38Reader extends NullableConvertedReader {
-
- NullableDecimal38SparseVector decimal38Vector;
-
- NullableDecimal38Reader(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor, ColumnChunkMetaData columnChunkMetaData,
- boolean fixedLength, ValueVector v, SchemaElement schemaElement) throws ExecutionSetupException {
- super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
- decimal38Vector = (NullableDecimal38SparseVector) v;
- }
-
- @Override
- void addNext(int start, int index) {
- int width = NullableDecimal38SparseHolder.WIDTH;
- BigDecimal intermediate = DecimalUtility.getBigDecimalFromByteArray(bytes, start, dataTypeLengthInBytes, schemaElement.getScale());
- DecimalUtility.getSparseFromBigDecimal(intermediate, decimal38Vector.getData(), index * width, schemaElement.getScale(),
- schemaElement.getPrecision(), NullableDecimal38SparseHolder.nDecimalDigits);
- }
- }
-} \ No newline at end of file
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/NullableFixedByteAlignedReaders.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/NullableFixedByteAlignedReaders.java
deleted file mode 100644
index 76cc93724..000000000
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/NullableFixedByteAlignedReaders.java
+++ /dev/null
@@ -1,169 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.store.parquet;
-
-import org.apache.drill.common.exceptions.ExecutionSetupException;
-import org.apache.drill.exec.vector.NullableBigIntVector;
-import org.apache.drill.exec.vector.NullableFloat4Vector;
-import org.apache.drill.exec.vector.NullableFloat8Vector;
-import org.apache.drill.exec.vector.NullableIntVector;
-import org.apache.drill.exec.vector.ValueVector;
-
-import parquet.column.ColumnDescriptor;
-import parquet.column.Encoding;
-import parquet.format.SchemaElement;
-import parquet.hadoop.metadata.ColumnChunkMetaData;
-import parquet.schema.PrimitiveType;
-
-class NullableFixedByteAlignedReaders {
-
- public static NullableColumnReader getNullableColumnReader(ParquetRecordReader parentReader, int allocateSize,
- ColumnDescriptor columnDescriptor,
- ColumnChunkMetaData columnChunkMetaData,
- boolean fixedLength,
- ValueVector valueVec,
- SchemaElement schemaElement) throws ExecutionSetupException {
- if (! columnChunkMetaData.getEncodings().contains(Encoding.PLAIN_DICTIONARY)) {
- return new NullableFixedByteAlignedReader(parentReader, allocateSize, columnDescriptor, columnChunkMetaData,
- fixedLength, valueVec, schemaElement);
- } else {
- if (columnDescriptor.getType() == PrimitiveType.PrimitiveTypeName.INT64) {
- return new NullableDictionaryBigIntReader(parentReader, allocateSize, columnDescriptor, columnChunkMetaData,
- fixedLength, (NullableBigIntVector)valueVec, schemaElement);
- }
- else if (columnDescriptor.getType() == PrimitiveType.PrimitiveTypeName.INT32) {
- return new NullableDicationaryIntReader(parentReader, allocateSize, columnDescriptor, columnChunkMetaData,
- fixedLength, (NullableIntVector)valueVec, schemaElement);
- }
- else if (columnDescriptor.getType() == PrimitiveType.PrimitiveTypeName.FLOAT) {
- return new NullableDictionaryFloat4Reader(parentReader, allocateSize, columnDescriptor, columnChunkMetaData,
- fixedLength, (NullableFloat4Vector)valueVec, schemaElement);
- }
- else if (columnDescriptor.getType() == PrimitiveType.PrimitiveTypeName.DOUBLE) {
- return new NullableDictionaryFloat8Reader(parentReader, allocateSize, columnDescriptor, columnChunkMetaData,
- fixedLength, (NullableFloat8Vector)valueVec, schemaElement);
- }
- else{
- throw new ExecutionSetupException("Unsupported nullable column type " + columnDescriptor.getType().name() );
- }
- }
- }
-
- private static class NullableFixedByteAlignedReader extends NullableColumnReader {
- private byte[] bytes;
-
- NullableFixedByteAlignedReader(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor,
- ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, ValueVector v, SchemaElement schemaElement) throws ExecutionSetupException {
- super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
- }
-
- // this method is called by its superclass during a read loop
- @Override
- protected void readField(long recordsToReadInThisPass, ColumnReader firstColumnStatus) {
- this.recordsReadInThisIteration = recordsToReadInThisPass;
-
- // set up metadata
- this.readStartInBytes = pageReadStatus.readPosInBytes;
- this.readLengthInBits = recordsReadInThisIteration * dataTypeLengthInBits;
- this.readLength = (int) Math.ceil(readLengthInBits / 8.0);
- this.bytes = pageReadStatus.pageDataByteArray;
-
- // fill in data.
- vectorData.writeBytes(bytes, (int) readStartInBytes, (int) readLength);
- }
- }
-
- private static class NullableDicationaryIntReader extends NullableColumnReader<NullableIntVector> {
-
- private byte[] bytes;
-
- NullableDicationaryIntReader(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor,
- ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, NullableIntVector v,
- SchemaElement schemaElement) throws ExecutionSetupException {
- super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
- }
-
- // this method is called by its superclass during a read loop
- @Override
- protected void readField(long recordsToReadInThisPass, ColumnReader firstColumnStatus) {
- if (usingDictionary) {
- for (int i = 0; i < recordsToReadInThisPass; i++){
- valueVec.getMutator().setSafe(valuesReadInCurrentPass + i, pageReadStatus.valueReader.readInteger());
- }
- }
- }
- }
-
- private static class NullableDictionaryBigIntReader extends NullableColumnReader<NullableBigIntVector> {
-
- private byte[] bytes;
-
- NullableDictionaryBigIntReader(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor,
- ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, NullableBigIntVector v,
- SchemaElement schemaElement) throws ExecutionSetupException {
- super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
- }
-
- // this method is called by its superclass during a read loop
- @Override
- protected void readField(long recordsToReadInThisPass, ColumnReader firstColumnStatus) {
- for (int i = 0; i < recordsToReadInThisPass; i++){
- valueVec.getMutator().setSafe(valuesReadInCurrentPass + i, pageReadStatus.valueReader.readLong());
- }
- }
- }
-
- private static class NullableDictionaryFloat4Reader extends NullableColumnReader<NullableFloat4Vector> {
-
- private byte[] bytes;
-
- NullableDictionaryFloat4Reader(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor,
- ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, NullableFloat4Vector v,
- SchemaElement schemaElement) throws ExecutionSetupException {
- super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
- }
-
- // this method is called by its superclass during a read loop
- @Override
- protected void readField(long recordsToReadInThisPass, ColumnReader firstColumnStatus) {
- for (int i = 0; i < recordsToReadInThisPass; i++){
- valueVec.getMutator().setSafe(valuesReadInCurrentPass + i, pageReadStatus.valueReader.readFloat());
- }
- }
- }
-
- private static class NullableDictionaryFloat8Reader extends NullableColumnReader<NullableFloat8Vector> {
-
- private byte[] bytes;
-
- NullableDictionaryFloat8Reader(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor,
- ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, NullableFloat8Vector v,
- SchemaElement schemaElement) throws ExecutionSetupException {
- super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
- }
-
- // this method is called by its superclass during a read loop
- @Override
- protected void readField(long recordsToReadInThisPass, ColumnReader firstColumnStatus) {
- for (int i = 0; i < recordsToReadInThisPass; i++){
- valueVec.getMutator().setSafe(valuesReadInCurrentPass + i, pageReadStatus.valueReader.readDouble());
- }
- }
- }
-
-} \ No newline at end of file
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFormatPlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFormatPlugin.java
index 0189c9b3c..37d64036b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFormatPlugin.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFormatPlugin.java
@@ -64,7 +64,7 @@ public class ParquetFormatPlugin implements FormatPlugin{
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(MockStorageEngine.class);
private final DrillbitContext context;
- static final ParquetMetadataConverter parquetMetadataConverter = new ParquetMetadataConverter();
+ public static final ParquetMetadataConverter parquetMetadataConverter = new ParquetMetadataConverter();
private CodecFactoryExposer codecFactoryExposer;
private final DrillFileSystem fs;
private final ParquetFormatMatcher formatMatcher;
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordReader.java
deleted file mode 100644
index 703ad1f45..000000000
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordReader.java
+++ /dev/null
@@ -1,711 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.store.parquet;
-
-import static com.google.common.base.Preconditions.checkArgument;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-
-import org.apache.drill.common.exceptions.DrillRuntimeException;
-import org.apache.drill.common.exceptions.ExecutionSetupException;
-import org.apache.drill.common.expression.SchemaPath;
-import org.apache.drill.common.types.TypeProtos;
-import org.apache.drill.common.types.TypeProtos.DataMode;
-import org.apache.drill.common.types.TypeProtos.MajorType;
-import org.apache.drill.common.types.TypeProtos.MinorType;
-import org.apache.drill.common.types.Types;
-import org.apache.drill.exec.exception.SchemaChangeException;
-import org.apache.drill.exec.expr.TypeHelper;
-import org.apache.drill.exec.ops.FragmentContext;
-import org.apache.drill.exec.physical.impl.OutputMutator;
-import org.apache.drill.exec.record.MaterializedField;
-import org.apache.drill.exec.store.RecordReader;
-import org.apache.drill.exec.store.parquet.FixedByteAlignedReader.Decimal28Reader;
-import org.apache.drill.exec.store.parquet.FixedByteAlignedReader.Decimal38Reader;
-import org.apache.drill.exec.store.parquet.NullableFixedByteAlignedReader.NullableDecimal28Reader;
-import org.apache.drill.exec.store.parquet.NullableFixedByteAlignedReader.NullableDecimal38Reader;
-import org.apache.drill.exec.store.parquet.VarLengthColumnReaders.Decimal28Column;
-import org.apache.drill.exec.store.parquet.VarLengthColumnReaders.Decimal38Column;
-import org.apache.drill.exec.store.parquet.VarLengthColumnReaders.NullableDecimal28Column;
-import org.apache.drill.exec.store.parquet.VarLengthColumnReaders.NullableDecimal38Column;
-import org.apache.drill.exec.store.parquet.VarLengthColumnReaders.NullableVarBinaryColumn;
-import org.apache.drill.exec.store.parquet.VarLengthColumnReaders.NullableVarCharColumn;
-import org.apache.drill.exec.store.parquet.VarLengthColumnReaders.NullableVarLengthColumn;
-import org.apache.drill.exec.store.parquet.VarLengthColumnReaders.VarBinaryColumn;
-import org.apache.drill.exec.store.parquet.VarLengthColumnReaders.VarCharColumn;
-import org.apache.drill.exec.store.parquet.VarLengthColumnReaders.VarLengthColumn;
-import org.apache.drill.exec.vector.Decimal28SparseVector;
-import org.apache.drill.exec.vector.Decimal38SparseVector;
-import org.apache.drill.exec.vector.NullableDecimal28SparseVector;
-import org.apache.drill.exec.vector.NullableDecimal38SparseVector;
-import org.apache.drill.exec.vector.NullableVarBinaryVector;
-import org.apache.drill.exec.vector.NullableVarCharVector;
-import org.apache.drill.exec.vector.ValueVector;
-import org.apache.drill.exec.vector.VarBinaryVector;
-import org.apache.drill.exec.vector.VarCharVector;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-
-import parquet.column.ColumnDescriptor;
-import parquet.column.Encoding;
-import parquet.format.ConvertedType;
-import parquet.format.FileMetaData;
-import parquet.format.SchemaElement;
-import parquet.format.converter.ParquetMetadataConverter;
-import parquet.hadoop.CodecFactoryExposer;
-import parquet.hadoop.ParquetFileWriter;
-import parquet.hadoop.metadata.ColumnChunkMetaData;
-import parquet.hadoop.metadata.ParquetMetadata;
-import parquet.schema.PrimitiveType;
-import parquet.schema.PrimitiveType.PrimitiveTypeName;
-
-public class ParquetRecordReader implements RecordReader {
- static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ParquetRecordReader.class);
-
- // this value has been inflated to read in multiple value vectors at once, and then break them up into smaller vectors
- private static final int NUMBER_OF_VECTORS = 1;
- private static final long DEFAULT_BATCH_LENGTH = 256 * 1024 * NUMBER_OF_VECTORS; // 256kb
- private static final long DEFAULT_BATCH_LENGTH_IN_BITS = DEFAULT_BATCH_LENGTH * 8; // 256kb
- private static final char DEFAULT_RECORDS_TO_READ_IF_NOT_FIXED_WIDTH = 32*1024;
-
- // TODO - should probably find a smarter way to set this, currently 1 megabyte
- private static final int VAR_LEN_FIELD_LENGTH = 1024 * 1024 * 1;
- public static final int PARQUET_PAGE_MAX_SIZE = 1024 * 1024 * 1;
- private static final String SEPERATOR = System.getProperty("file.separator");
-
-
- // used for clearing the last n bits of a byte
- public static final byte[] endBitMasks = {-2, -4, -8, -16, -32, -64, -128};
- // used for clearing the first n bits of a byte
- public static final byte[] startBitMasks = {127, 63, 31, 15, 7, 3, 1};
-
- private int bitWidthAllFixedFields;
- private boolean allFieldsFixedLength;
- private int recordsPerBatch;
- private long totalRecords;
- private long rowGroupOffset;
-
- private List<ColumnReader> columnStatuses;
- FileSystem fileSystem;
- private long batchSize;
- Path hadoopPath;
- private VarLenBinaryReader varLengthReader;
- private ParquetMetadata footer;
- private List<SchemaPath> columns;
-
- public CodecFactoryExposer getCodecFactoryExposer() {
- return codecFactoryExposer;
- }
-
- private final CodecFactoryExposer codecFactoryExposer;
-
- int rowGroupIndex;
-
- public ParquetRecordReader(FragmentContext fragmentContext, //
- String path, //
- int rowGroupIndex, //
- FileSystem fs, //
- CodecFactoryExposer codecFactoryExposer, //
- ParquetMetadata footer, //
- List<SchemaPath> columns) throws ExecutionSetupException {
- this(fragmentContext, DEFAULT_BATCH_LENGTH_IN_BITS, path, rowGroupIndex, fs, codecFactoryExposer, footer,
- columns);
- }
-
- public ParquetRecordReader(FragmentContext fragmentContext, long batchSize,
- String path, int rowGroupIndex, FileSystem fs,
- CodecFactoryExposer codecFactoryExposer, ParquetMetadata footer,
- List<SchemaPath> columns) throws ExecutionSetupException {
- hadoopPath = new Path(path);
- fileSystem = fs;
- this.codecFactoryExposer = codecFactoryExposer;
- this.rowGroupIndex = rowGroupIndex;
- this.batchSize = batchSize;
- this.footer = footer;
- this.columns = columns;
- }
-
- public int getRowGroupIndex() {
- return rowGroupIndex;
- }
-
- public int getBitWidthAllFixedFields() {
- return bitWidthAllFixedFields;
- }
-
- public long getBatchSize() {
- return batchSize;
- }
-
- /**
- * @param type a fixed length type from the parquet library enum
- * @return the length in pageDataByteArray of the type
- */
- public static int getTypeLengthInBits(PrimitiveType.PrimitiveTypeName type) {
- switch (type) {
- case INT64: return 64;
- case INT32: return 32;
- case BOOLEAN: return 1;
- case FLOAT: return 32;
- case DOUBLE: return 64;
- case INT96: return 96;
- // binary and fixed length byte array
- default:
- throw new IllegalStateException("Length cannot be determined for type " + type);
- }
- }
-
- private boolean fieldSelected(MaterializedField field){
- // TODO - not sure if this is how we want to represent this
- // for now it makes the existing tests pass, simply selecting
- // all available data if no columns are provided
- if (this.columns != null){
- for (SchemaPath expr : this.columns){
- if ( field.matches(expr)){
- return true;
- }
- }
- return false;
- }
- return true;
- }
-
- @Override
- public void setup(OutputMutator output) throws ExecutionSetupException {
-
- columnStatuses = new ArrayList<>();
- totalRecords = footer.getBlocks().get(rowGroupIndex).getRowCount();
- List<ColumnDescriptor> columns = footer.getFileMetaData().getSchema().getColumns();
- allFieldsFixedLength = true;
- ColumnDescriptor column;
- ColumnChunkMetaData columnChunkMetaData;
- int columnsToScan = 0;
-
- MaterializedField field;
- ParquetMetadataConverter metaConverter = new ParquetMetadataConverter();
- FileMetaData fileMetaData;
-
- // TODO - figure out how to deal with this better once we add nested reading, note also look where this map is used below
- // store a map from column name to converted types if they are non-null
- HashMap<String, SchemaElement> schemaElements = new HashMap<>();
- fileMetaData = new ParquetMetadataConverter().toParquetMetadata(ParquetFileWriter.CURRENT_VERSION, footer);
- for (SchemaElement se : fileMetaData.getSchema()) {
- schemaElements.put(se.getName(), se);
- }
-
- // loop to add up the length of the fixed width columns and build the schema
- for (int i = 0; i < columns.size(); ++i) {
- column = columns.get(i);
- logger.debug("name: " + fileMetaData.getSchema().get(i).name);
- SchemaElement se = schemaElements.get(column.getPath()[0]);
- MajorType mt = toMajorType(column.getType(), se.getType_length(), getDataMode(column), se);
- field = MaterializedField.create(toFieldName(column.getPath()),mt);
- if ( ! fieldSelected(field)){
- continue;
- }
- columnsToScan++;
- // sum the lengths of all of the fixed length fields
- if (column.getType() != PrimitiveType.PrimitiveTypeName.BINARY) {
- // There is not support for the fixed binary type yet in parquet, leaving a task here as a reminder
- // TODO - implement this when the feature is added upstream
- if (column.getType() == PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY){
- bitWidthAllFixedFields += se.getType_length() * 8;
- } else {
- bitWidthAllFixedFields += getTypeLengthInBits(column.getType());
- }
- } else {
- allFieldsFixedLength = false;
- }
- }
- rowGroupOffset = footer.getBlocks().get(rowGroupIndex).getColumns().get(0).getFirstDataPageOffset();
-
- // none of the columns in the parquet file matched the request columns from the query
- if (columnsToScan == 0){
- throw new ExecutionSetupException("Error reading from parquet file. No columns requested were found in the file.");
- }
- if (allFieldsFixedLength) {
- recordsPerBatch = (int) Math.min(Math.min(batchSize / bitWidthAllFixedFields,
- footer.getBlocks().get(0).getColumns().get(0).getValueCount()), 65535);
- }
- else {
- recordsPerBatch = DEFAULT_RECORDS_TO_READ_IF_NOT_FIXED_WIDTH;
- }
-
- try {
- ValueVector v;
- ConvertedType convertedType;
- SchemaElement schemaElement;
- ArrayList<VarLengthColumn> varLengthColumns = new ArrayList<>();
- ArrayList<NullableVarLengthColumn> nullableVarLengthColumns = new ArrayList<>();
- // initialize all of the column read status objects
- boolean fieldFixedLength = false;
- for (int i = 0; i < columns.size(); ++i) {
- column = columns.get(i);
- columnChunkMetaData = footer.getBlocks().get(rowGroupIndex).getColumns().get(i);
- schemaElement = schemaElements.get(column.getPath()[0]);
- convertedType = schemaElement.getConverted_type();
- MajorType type = toMajorType(column.getType(), schemaElement.getType_length(), getDataMode(column), schemaElement);
- field = MaterializedField.create(toFieldName(column.getPath()), type);
- // the field was not requested to be read
- if ( ! fieldSelected(field)) continue;
-
- fieldFixedLength = column.getType() != PrimitiveType.PrimitiveTypeName.BINARY;
- v = output.addField(field, (Class<? extends ValueVector>) TypeHelper.getValueVectorClass(type.getMinorType(), type.getMode()));
- if (column.getType() != PrimitiveType.PrimitiveTypeName.BINARY) {
- createFixedColumnReader(fieldFixedLength, column, columnChunkMetaData, recordsPerBatch, v,
- schemaElement);
- } else {
- // create a reader and add it to the appropriate list
- getReader(this, -1, column, columnChunkMetaData, false, v, schemaElement, varLengthColumns, nullableVarLengthColumns);
- }
- }
- varLengthReader = new VarLenBinaryReader(this, varLengthColumns, nullableVarLengthColumns);
- } catch (SchemaChangeException e) {
- throw new ExecutionSetupException(e);
- }
- }
-
- private SchemaPath toFieldName(String[] paths) {
- return SchemaPath.getCompoundPath(paths);
- }
-
- private TypeProtos.DataMode getDataMode(ColumnDescriptor column) {
- if (column.getMaxDefinitionLevel() == 0) {
- return TypeProtos.DataMode.REQUIRED;
- } else {
- return TypeProtos.DataMode.OPTIONAL;
- }
- }
-
- private void resetBatch() {
- for (ColumnReader column : columnStatuses) {
- column.valuesReadInCurrentPass = 0;
- }
- for (VarLengthColumn r : varLengthReader.columns){
- r.valuesReadInCurrentPass = 0;
- }
- for (NullableVarLengthColumn r : varLengthReader.nullableColumns){
- r.valuesReadInCurrentPass = 0;
- }
- }
-
- /**
- * @param fixedLength
- * @param descriptor
- * @param columnChunkMetaData
- * @param allocateSize - the size of the vector to create
- * @return
- * @throws SchemaChangeException
- */
- private boolean createFixedColumnReader(boolean fixedLength, ColumnDescriptor descriptor,
- ColumnChunkMetaData columnChunkMetaData, int allocateSize, ValueVector v,
- SchemaElement schemaElement)
- throws SchemaChangeException, ExecutionSetupException {
- ConvertedType convertedType = schemaElement.getConverted_type();
- // if the column is required
- if (descriptor.getMaxDefinitionLevel() == 0){
- if (columnChunkMetaData.getType() == PrimitiveType.PrimitiveTypeName.BOOLEAN){
- columnStatuses.add(new BitReader(this, allocateSize, descriptor, columnChunkMetaData,
- fixedLength, v, schemaElement));
- } else if (columnChunkMetaData.getType() == PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY && convertedType == ConvertedType.DECIMAL){
- int length = schemaElement.type_length;
- if (length <= 12) {
- columnStatuses.add(new Decimal28Reader(this, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement));
- } else if (length <= 16) {
- columnStatuses.add(new Decimal38Reader(this, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement));
- }
- } else if (columnChunkMetaData.getType() == PrimitiveTypeName.INT32 && convertedType == ConvertedType.DATE){
- columnStatuses.add(new FixedByteAlignedReader.DateReader(this, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement));
- } else{
- if (columnChunkMetaData.getEncodings().contains(Encoding.PLAIN_DICTIONARY)) {
- columnStatuses.add(new ParquetFixedWidthDictionaryReader(this, allocateSize, descriptor, columnChunkMetaData,
- fixedLength, v, schemaElement));
- } else {
- columnStatuses.add(new FixedByteAlignedReader(this, allocateSize, descriptor, columnChunkMetaData,
- fixedLength, v, schemaElement));
- }
- }
- return true;
- }
- else { // if the column is nullable
- if (columnChunkMetaData.getType() == PrimitiveType.PrimitiveTypeName.BOOLEAN){
- columnStatuses.add(new NullableBitReader(this, allocateSize, descriptor, columnChunkMetaData,
- fixedLength, v, schemaElement));
- } else if (columnChunkMetaData.getType() == PrimitiveTypeName.INT32 && convertedType == ConvertedType.DATE){
- columnStatuses.add(new NullableFixedByteAlignedReader.NullableDateReader(this, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement));
- } else if (columnChunkMetaData.getType() == PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY && convertedType == ConvertedType.DECIMAL){
- int length = schemaElement.type_length;
- if (length <= 12) {
- columnStatuses.add(new NullableDecimal28Reader(this, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement));
- } else if (length <= 16) {
- columnStatuses.add(new NullableDecimal38Reader(this, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement));
- }
- } else {
- columnStatuses.add(NullableFixedByteAlignedReaders.getNullableColumnReader(this, allocateSize, descriptor,
- columnChunkMetaData, fixedLength, v, schemaElement));
- }
- return true;
- }
- }
-
- public void readAllFixedFields(long recordsToRead, ColumnReader firstColumnStatus) throws IOException {
-
- for (ColumnReader crs : columnStatuses){
- crs.readAllFixedFields(recordsToRead, firstColumnStatus);
- }
- }
-
- @Override
- public int next() {
- resetBatch();
- long recordsToRead = 0;
- try {
- ColumnReader firstColumnStatus;
- if (columnStatuses.size() > 0){
- firstColumnStatus = columnStatuses.iterator().next();
- }
- else{
- if (varLengthReader.columns.size() > 0){
- firstColumnStatus = varLengthReader.columns.iterator().next();
- }
- else{
- firstColumnStatus = varLengthReader.nullableColumns.iterator().next();
- }
- }
-
- if (allFieldsFixedLength) {
- recordsToRead = Math.min(recordsPerBatch, firstColumnStatus.columnChunkMetaData.getValueCount() - firstColumnStatus.totalValuesRead);
- } else {
- recordsToRead = DEFAULT_RECORDS_TO_READ_IF_NOT_FIXED_WIDTH;
-
- // going to incorporate looking at length of values and copying the data into a single loop, hopefully it won't
- // get too complicated
-
- //loop through variable length data to find the maximum records that will fit in this batch
- // this will be a bit annoying if we want to loop though row groups, columns, pages and then individual variable
- // length values...
- // jacques believes that variable length fields will be encoded as |length|value|length|value|...
- // cannot find more information on this right now, will keep looking
- }
-
-// logger.debug("records to read in this pass: {}", recordsToRead);
- if (allFieldsFixedLength) {
- readAllFixedFields(recordsToRead, firstColumnStatus);
- } else { // variable length columns
- long fixedRecordsToRead = varLengthReader.readFields(recordsToRead, firstColumnStatus);
- readAllFixedFields(fixedRecordsToRead, firstColumnStatus);
- }
-
- return firstColumnStatus.valuesReadInCurrentPass;
- } catch (IOException e) {
- throw new DrillRuntimeException(e);
- }
- }
-
- static TypeProtos.MajorType toMajorType(PrimitiveType.PrimitiveTypeName primitiveTypeName,
- TypeProtos.DataMode mode, SchemaElement schemaElement) {
- return toMajorType(primitiveTypeName, 0, mode, schemaElement);
- }
-
- // TODO - move this into ParquetTypeHelper and use code generation to create the list
- static TypeProtos.MajorType toMajorType(PrimitiveType.PrimitiveTypeName primitiveTypeName, int length,
- TypeProtos.DataMode mode, SchemaElement schemaElement) {
- ConvertedType convertedType = schemaElement.getConverted_type();
- switch (mode) {
-
- case OPTIONAL:
- switch (primitiveTypeName) {
- case BINARY:
- if (convertedType == null) {
- return Types.optional(TypeProtos.MinorType.VARBINARY);
- }
- switch (convertedType) {
- case UTF8:
- return Types.optional(MinorType.VARCHAR);
- case DECIMAL:
- return Types.withScaleAndPrecision(getDecimalType(schemaElement), DataMode.OPTIONAL, schemaElement.getScale(), schemaElement.getPrecision());
- default:
- throw new UnsupportedOperationException(String.format("unsupported type: %s %s", primitiveTypeName, convertedType));
- }
- case INT64:
- if (convertedType == null) {
- return Types.optional(TypeProtos.MinorType.BIGINT);
- }
- switch(convertedType) {
- case DECIMAL:
- return Types.withScaleAndPrecision(MinorType.DECIMAL18, DataMode.OPTIONAL, schemaElement.getScale(), schemaElement.getPrecision());
- case FINETIME:
- throw new UnsupportedOperationException();
- case TIMESTAMP:
- return Types.optional(MinorType.TIMESTAMP);
- default:
- throw new UnsupportedOperationException(String.format("unsupported type: %s %s", primitiveTypeName, convertedType));
- }
- case INT32:
- if (convertedType == null) {
- return Types.optional(TypeProtos.MinorType.INT);
- }
- switch(convertedType) {
- case DECIMAL:
- return Types.withScaleAndPrecision(MinorType.DECIMAL9, DataMode.OPTIONAL, schemaElement.getScale(), schemaElement.getPrecision());
- case DATE:
- return Types.optional(MinorType.DATE);
- case TIME:
- return Types.optional(MinorType.TIME);
- default:
- throw new UnsupportedOperationException(String.format("unsupported type: %s %s", primitiveTypeName, convertedType));
- }
- case BOOLEAN:
- return Types.optional(TypeProtos.MinorType.BIT);
- case FLOAT:
- return Types.optional(TypeProtos.MinorType.FLOAT4);
- case DOUBLE:
- return Types.optional(TypeProtos.MinorType.FLOAT8);
- // TODO - Both of these are not supported by the parquet library yet (7/3/13),
- // but they are declared here for when they are implemented
- case INT96:
- return TypeProtos.MajorType.newBuilder().setMinorType(TypeProtos.MinorType.FIXEDBINARY).setWidth(12)
- .setMode(mode).build();
- case FIXED_LEN_BYTE_ARRAY:
- if (convertedType == null) {
- checkArgument(length > 0, "A length greater than zero must be provided for a FixedBinary type.");
- return TypeProtos.MajorType.newBuilder().setMinorType(TypeProtos.MinorType.FIXEDBINARY)
- .setWidth(length).setMode(mode).build();
- } else if (convertedType == ConvertedType.DECIMAL) {
- return Types.withScaleAndPrecision(getDecimalType(schemaElement), DataMode.OPTIONAL, schemaElement.getScale(), schemaElement.getPrecision());
- }
- default:
- throw new UnsupportedOperationException("Type not supported: " + primitiveTypeName);
- }
- case REQUIRED:
- switch (primitiveTypeName) {
- case BINARY:
- if (convertedType == null) {
- return Types.required(TypeProtos.MinorType.VARBINARY);
- }
- switch (convertedType) {
- case UTF8:
- return Types.required(MinorType.VARCHAR);
- case DECIMAL:
- return Types.withScaleAndPrecision(getDecimalType(schemaElement), DataMode.REQUIRED, schemaElement.getScale(), schemaElement.getPrecision());
- default:
- throw new UnsupportedOperationException(String.format("unsupported type: %s %s", primitiveTypeName, convertedType));
- }
- case INT64:
- if (convertedType == null) {
- return Types.required(MinorType.BIGINT);
- }
- switch(convertedType) {
- case DECIMAL:
- return Types.withScaleAndPrecision(MinorType.DECIMAL18, DataMode.REQUIRED, schemaElement.getScale(), schemaElement.getPrecision());
- case FINETIME:
- throw new UnsupportedOperationException();
- case TIMESTAMP:
- return Types.required(MinorType.TIMESTAMP);
- default:
- throw new UnsupportedOperationException(String.format("unsupported type: %s %s", primitiveTypeName, convertedType));
- }
- case INT32:
- if (convertedType == null) {
- return Types.required(MinorType.INT);
- }
- switch(convertedType) {
- case DECIMAL:
- return Types.withScaleAndPrecision(MinorType.DECIMAL9, DataMode.REQUIRED, schemaElement.getScale(), schemaElement.getPrecision());
- case DATE:
- return Types.required(MinorType.DATE);
- case TIME:
- return Types.required(MinorType.TIME);
- default:
- throw new UnsupportedOperationException(String.format("unsupported type: %s %s", primitiveTypeName, convertedType));
- }
- case BOOLEAN:
- return Types.required(TypeProtos.MinorType.BIT);
- case FLOAT:
- return Types.required(TypeProtos.MinorType.FLOAT4);
- case DOUBLE:
- return Types.required(TypeProtos.MinorType.FLOAT8);
- // Both of these are not supported by the parquet library yet (7/3/13),
- // but they are declared here for when they are implemented
- case INT96:
- return TypeProtos.MajorType.newBuilder().setMinorType(TypeProtos.MinorType.FIXEDBINARY).setWidth(12)
- .setMode(mode).build();
- case FIXED_LEN_BYTE_ARRAY:
- if (convertedType == null) {
- checkArgument(length > 0, "A length greater than zero must be provided for a FixedBinary type.");
- return TypeProtos.MajorType.newBuilder().setMinorType(TypeProtos.MinorType.FIXEDBINARY)
- .setWidth(length).setMode(mode).build();
- } else if (convertedType == ConvertedType.DECIMAL) {
- return Types.withScaleAndPrecision(getDecimalType(schemaElement), DataMode.REQUIRED, schemaElement.getScale(), schemaElement.getPrecision());
- }
- default:
- throw new UnsupportedOperationException("Type not supported: " + primitiveTypeName);
- }
- case REPEATED:
- switch (primitiveTypeName) {
- case BINARY:
- if (convertedType == null) {
- return Types.repeated(TypeProtos.MinorType.VARBINARY);
- }
- switch (schemaElement.getConverted_type()) {
- case UTF8:
- return Types.repeated(MinorType.VARCHAR);
- case DECIMAL:
- return Types.withScaleAndPrecision(getDecimalType(schemaElement), DataMode.REPEATED, schemaElement.getScale(), schemaElement.getPrecision());
- default:
- throw new UnsupportedOperationException(String.format("unsupported type: %s %s", primitiveTypeName, convertedType));
- }
- case INT64:
- if (convertedType == null) {
- return Types.repeated(MinorType.BIGINT);
- }
- switch(convertedType) {
- case DECIMAL:
- return Types.withScaleAndPrecision(MinorType.DECIMAL18, DataMode.REPEATED, schemaElement.getScale(), schemaElement.getPrecision());
- case FINETIME:
- throw new UnsupportedOperationException();
- case TIMESTAMP:
- return Types.repeated(MinorType.TIMESTAMP);
- default:
- throw new UnsupportedOperationException(String.format("unsupported type: %s %s", primitiveTypeName, convertedType));
- }
- case INT32:
- if (convertedType == null) {
- return Types.repeated(MinorType.INT);
- }
- switch(convertedType) {
- case DECIMAL:
- return Types.withScaleAndPrecision(MinorType.DECIMAL9, DataMode.REPEATED, schemaElement.getScale(), schemaElement.getPrecision());
- case DATE:
- return Types.repeated(MinorType.DATE);
- case TIME:
- return Types.repeated(MinorType.TIME);
- default:
- throw new UnsupportedOperationException(String.format("unsupported type: %s %s", primitiveTypeName, convertedType));
- }
- case BOOLEAN:
- return Types.repeated(TypeProtos.MinorType.BIT);
- case FLOAT:
- return Types.repeated(TypeProtos.MinorType.FLOAT4);
- case DOUBLE:
- return Types.repeated(TypeProtos.MinorType.FLOAT8);
- // Both of these are not supported by the parquet library yet (7/3/13),
- // but they are declared here for when they are implemented
- case INT96:
- return TypeProtos.MajorType.newBuilder().setMinorType(TypeProtos.MinorType.FIXEDBINARY).setWidth(12)
- .setMode(mode).build();
- case FIXED_LEN_BYTE_ARRAY:
- if (convertedType == null) {
- checkArgument(length > 0, "A length greater than zero must be provided for a FixedBinary type.");
- return TypeProtos.MajorType.newBuilder().setMinorType(TypeProtos.MinorType.FIXEDBINARY)
- .setWidth(length).setMode(mode).build();
- } else if (convertedType == ConvertedType.DECIMAL) {
- return Types.withScaleAndPrecision(getDecimalType(schemaElement), DataMode.REPEATED, schemaElement.getScale(), schemaElement.getPrecision());
- }
- default:
- throw new UnsupportedOperationException("Type not supported: " + primitiveTypeName);
- }
- }
- throw new UnsupportedOperationException("Type not supported: " + primitiveTypeName + " Mode: " + mode);
- }
-
- private static void getReader(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor,
- ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, ValueVector v,
- SchemaElement schemaElement, List<VarLengthColumn> varLengthColumns,
- List<NullableVarLengthColumn> nullableVarLengthColumns) throws ExecutionSetupException {
- ConvertedType convertedType = schemaElement.getConverted_type();
- switch (descriptor.getMaxDefinitionLevel()) {
- case 0:
- if (convertedType == null) {
- varLengthColumns.add(new VarBinaryColumn(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, (VarBinaryVector) v, schemaElement));
- return;
- }
- switch (convertedType) {
- case UTF8:
- varLengthColumns.add(new VarCharColumn(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, (VarCharVector) v, schemaElement));
- return;
- case DECIMAL:
- if (v instanceof Decimal28SparseVector) {
- varLengthColumns.add(new Decimal28Column(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, (Decimal28SparseVector) v, schemaElement));
- return;
- } else if (v instanceof Decimal38SparseVector) {
- varLengthColumns.add(new Decimal38Column(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, (Decimal38SparseVector) v, schemaElement));
- return;
- }
- default:
- }
- default:
- if (convertedType == null) {
- nullableVarLengthColumns.add(new NullableVarBinaryColumn(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, (NullableVarBinaryVector) v, schemaElement));
- return;
- }
- switch (convertedType) {
- case UTF8:
- nullableVarLengthColumns.add(new NullableVarCharColumn(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, (NullableVarCharVector) v, schemaElement));
- return;
- case DECIMAL:
- if (v instanceof NullableDecimal28SparseVector) {
- nullableVarLengthColumns.add(new NullableDecimal28Column(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, (NullableDecimal28SparseVector) v, schemaElement));
- return;
- } else if (v instanceof NullableDecimal38SparseVector) {
- nullableVarLengthColumns.add(new NullableDecimal38Column(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, (NullableDecimal38SparseVector) v, schemaElement));
- return;
- }
- default:
- }
- }
- throw new UnsupportedOperationException();
- }
-
- private static MinorType getDecimalType(SchemaElement schemaElement) {
- return schemaElement.getPrecision() <= 28 ? MinorType.DECIMAL28SPARSE : MinorType.DECIMAL38SPARSE;
- }
-
- static String join(String delimiter, String... str) {
- StringBuilder builder = new StringBuilder();
- int i = 0;
- for (String s : str) {
- builder.append(s);
- if (i < str.length) {
- builder.append(delimiter);
- }
- i++;
- }
- return builder.toString();
- }
-
- @Override
- public void cleanup() {
- for (ColumnReader column : columnStatuses) {
- column.clear();
- }
- columnStatuses.clear();
-
- for (VarLengthColumn r : varLengthReader.columns){
- r.clear();
- }
- for (NullableVarLengthColumn r : varLengthReader.nullableColumns){
- r.clear();
- }
- varLengthReader.columns.clear();
- varLengthReader.nullableColumns.clear();
- }
-}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordWriter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordWriter.java
index b26f688e4..a3363168b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordWriter.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordWriter.java
@@ -52,6 +52,7 @@ import static java.lang.Math.min;
import static java.lang.String.format;
public class ParquetRecordWriter extends ParquetOutputRecordWriter {
+ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ParquetRecordWriter.class);
private static final int MINIMUM_BUFFER_SIZE = 64 * 1024;
private static final int MINIMUM_RECORD_COUNT_FOR_CHECK = 100;
@@ -147,7 +148,7 @@ public class ParquetRecordWriter extends ParquetOutputRecordWriter {
if (recordCount >= recordCountForNextMemCheck) { // checking the memory size is relatively expensive, so let's not do it for every record.
long memSize = store.memSize();
if (memSize > blockSize) {
- System.out.println("Reached block size " + blockSize);
+ logger.debug("Reached block size " + blockSize);
flush();
newSchema();
recordCountForNextMemCheck = min(max(MINIMUM_RECORD_COUNT_FOR_CHECK, recordCount / 2), MAXIMUM_RECORD_COUNT_FOR_CHECK);
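As a side note, the memory-check logic visible in the hunk above follows a throttled-check pattern: the relatively expensive size probe runs only once a record-count threshold is reached, and the threshold is re-derived after a flush. The sketch below is a simplified, hypothetical illustration of that pattern; the names and constants are not Drill's actual implementation.

// Simplified illustration of the throttled size check shown in the hunk above.
class ThrottledFlushSketch {
  private static final long MIN_RECORDS_BETWEEN_CHECKS = 100;
  private static final long MAX_RECORDS_BETWEEN_CHECKS = 10_000;

  private long recordCount = 0;
  private long recordCountForNextCheck = MIN_RECORDS_BETWEEN_CHECKS;
  private final long blockSize;

  ThrottledFlushSketch(long blockSize) { this.blockSize = blockSize; }

  void onRecordWritten() {
    recordCount++;
    if (recordCount >= recordCountForNextCheck) {   // skip the probe for most records
      if (bufferedMemorySize() > blockSize) {
        flush();
        // schedule the next probe relative to how many records fit so far,
        // clamped to a sane interval
        recordCountForNextCheck = Math.min(
            Math.max(MIN_RECORDS_BETWEEN_CHECKS, recordCount / 2), MAX_RECORDS_BETWEEN_CHECKS);
      }
    }
  }

  long bufferedMemorySize() { return 0; }  // stand-in for the writer's buffered size
  void flush() { /* cut the current row group and start a new one */ }
}
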
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java
index df6581fb8..b4f02fbe9 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java
@@ -18,7 +18,6 @@
package org.apache.drill.exec.store.parquet;
import java.io.IOException;
-import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -38,6 +37,7 @@ import org.apache.drill.exec.store.RecordReader;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
+import org.apache.drill.exec.store.parquet.columnreaders.ParquetRecordReader;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/VarLenBinaryReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/VarLenBinaryReader.java
deleted file mode 100644
index 813a799f3..000000000
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/VarLenBinaryReader.java
+++ /dev/null
@@ -1,190 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.store.parquet;
-
-import java.io.IOException;
-import java.util.List;
-
-import org.apache.drill.exec.store.parquet.VarLengthColumnReaders.NullableVarLengthColumn;
-import org.apache.drill.exec.store.parquet.VarLengthColumnReaders.VarLengthColumn;
-
-import parquet.bytes.BytesUtils;
-
-public class VarLenBinaryReader {
-
- ParquetRecordReader parentReader;
- final List<VarLengthColumn> columns;
- final List<NullableVarLengthColumn> nullableColumns;
-
- public VarLenBinaryReader(ParquetRecordReader parentReader, List<VarLengthColumn> columns,
- List<NullableVarLengthColumn> nullableColumns){
- this.parentReader = parentReader;
- this.nullableColumns = nullableColumns;
- this.columns = columns;
- }
-
- /**
- * Reads as many variable length values as possible.
- *
- * @param recordsToReadInThisPass - the number of records recommended for reading form the reader
- * @param firstColumnStatus - a reference to the first column status in the parquet file to grab metatdata from
- * @return - the number of fixed length fields that will fit in the batch
- * @throws IOException
- */
- public long readFields(long recordsToReadInThisPass, ColumnReader firstColumnStatus) throws IOException {
-
- long recordsReadInCurrentPass = 0;
- int lengthVarFieldsInCurrentRecord;
- boolean rowGroupFinished = false;
- byte[] bytes;
- // write the first 0 offset
- for (ColumnReader columnReader : columns) {
- columnReader.bytesReadInCurrentPass = 0;
- columnReader.valuesReadInCurrentPass = 0;
- }
- // same for the nullable columns
- for (NullableVarLengthColumn columnReader : nullableColumns) {
- columnReader.bytesReadInCurrentPass = 0;
- columnReader.valuesReadInCurrentPass = 0;
- columnReader.nullsRead = 0;
- }
- outer: do {
- lengthVarFieldsInCurrentRecord = 0;
- for (VarLengthColumn columnReader : columns) {
- if (recordsReadInCurrentPass == columnReader.valueVec.getValueCapacity()){
- rowGroupFinished = true;
- break;
- }
- if (columnReader.pageReadStatus.currentPage == null
- || columnReader.pageReadStatus.valuesRead == columnReader.pageReadStatus.currentPage.getValueCount()) {
- columnReader.totalValuesRead += columnReader.pageReadStatus.valuesRead;
- if (!columnReader.pageReadStatus.next()) {
- rowGroupFinished = true;
- break;
- }
- }
- bytes = columnReader.pageReadStatus.pageDataByteArray;
-
- // re-purposing this field here for length in BYTES to prevent repetitive multiplication/division
- columnReader.dataTypeLengthInBits = BytesUtils.readIntLittleEndian(bytes,
- (int) columnReader.pageReadStatus.readPosInBytes);
- lengthVarFieldsInCurrentRecord += columnReader.dataTypeLengthInBits;
-
- if (columnReader.bytesReadInCurrentPass + columnReader.dataTypeLengthInBits > columnReader.capacity()) {
- break outer;
- }
-
- }
- for (NullableVarLengthColumn columnReader : nullableColumns) {
- // check to make sure there is capacity for the next value (for nullables this is a check to see if there is
- // still space in the nullability recording vector)
- if (recordsReadInCurrentPass == columnReader.valueVec.getValueCapacity()){
- rowGroupFinished = true;
- break;
- }
- if (columnReader.pageReadStatus.currentPage == null
- || columnReader.pageReadStatus.valuesRead == columnReader.pageReadStatus.currentPage.getValueCount()) {
- if (!columnReader.pageReadStatus.next()) {
- rowGroupFinished = true;
- break;
- } else {
- columnReader.currDictVal = null;
- }
- }
- bytes = columnReader.pageReadStatus.pageDataByteArray;
- // we need to read all of the lengths to determine if this value will fit in the current vector,
- // as we can only read each definition level once, we have to store the last one as we will need it
- // at the start of the next read if we decide after reading all of the varlength values in this record
- // that it will not fit in this batch
- if ( columnReader.currDefLevel == -1 ) {
- columnReader.currDefLevel = columnReader.pageReadStatus.definitionLevels.readInteger();
- }
- if ( columnReader.columnDescriptor.getMaxDefinitionLevel() > columnReader.currDefLevel){
- columnReader.currentValNull = true;
- columnReader.dataTypeLengthInBits = 0;
- columnReader.nullsRead++;
- continue;// field is null, no length to add to data vector
- }
-
- if (columnReader.usingDictionary) {
- if (columnReader.currDictVal == null) {
- columnReader.currDictVal = columnReader.pageReadStatus.valueReader.readBytes();
- }
- // re-purposing this field here for length in BYTES to prevent repetitive multiplication/division
- columnReader.dataTypeLengthInBits = columnReader.currDictVal.length();
- }
- else {
- // re-purposing this field here for length in BYTES to prevent repetitive multiplication/division
- columnReader.dataTypeLengthInBits = BytesUtils.readIntLittleEndian(bytes,
- (int) columnReader.pageReadStatus.readPosInBytes);
- }
- lengthVarFieldsInCurrentRecord += columnReader.dataTypeLengthInBits;
-
- if (columnReader.bytesReadInCurrentPass + columnReader.dataTypeLengthInBits > columnReader.capacity()) {
- break outer;
- }
- }
- // check that the next record will fit in the batch
- if (rowGroupFinished || (recordsReadInCurrentPass + 1) * parentReader.getBitWidthAllFixedFields() + lengthVarFieldsInCurrentRecord
- > parentReader.getBatchSize()){
- break outer;
- }
- for (VarLengthColumn columnReader : columns) {
- bytes = columnReader.pageReadStatus.pageDataByteArray;
- // again, I am re-purposing the unused field here, it is a length n BYTES, not bits
- boolean success = columnReader.setSafe(columnReader.valuesReadInCurrentPass, bytes,
- (int) columnReader.pageReadStatus.readPosInBytes + 4, columnReader.dataTypeLengthInBits);
- assert success;
- columnReader.pageReadStatus.readPosInBytes += columnReader.dataTypeLengthInBits + 4;
- columnReader.bytesReadInCurrentPass += columnReader.dataTypeLengthInBits + 4;
- columnReader.pageReadStatus.valuesRead++;
- columnReader.valuesReadInCurrentPass++;
- }
- for (NullableVarLengthColumn columnReader : nullableColumns) {
- bytes = columnReader.pageReadStatus.pageDataByteArray;
- // again, I am re-purposing the unused field here, it is a length n BYTES, not bits
- if (!columnReader.currentValNull && columnReader.dataTypeLengthInBits > 0){
- boolean success = columnReader.setSafe(columnReader.valuesReadInCurrentPass, bytes,
- (int) columnReader.pageReadStatus.readPosInBytes + 4, columnReader.dataTypeLengthInBits);
- assert success;
- }
- columnReader.currentValNull = false;
- columnReader.currDefLevel = -1;
- if (columnReader.dataTypeLengthInBits > 0){
- columnReader.pageReadStatus.readPosInBytes += columnReader.dataTypeLengthInBits + 4;
- columnReader.bytesReadInCurrentPass += columnReader.dataTypeLengthInBits + 4;
- }
- columnReader.pageReadStatus.valuesRead++;
- columnReader.valuesReadInCurrentPass++;
- if ( columnReader.pageReadStatus.valuesRead == columnReader.pageReadStatus.currentPage.getValueCount()) {
- columnReader.totalValuesRead += columnReader.pageReadStatus.valuesRead;
- columnReader.pageReadStatus.next();
- }
- columnReader.currDictVal = null;
- }
- recordsReadInCurrentPass++;
- } while (recordsReadInCurrentPass < recordsToReadInThisPass);
- for (VarLengthColumn columnReader : columns) {
- columnReader.valueVec.getMutator().setValueCount((int) recordsReadInCurrentPass);
- }
- for (NullableVarLengthColumn columnReader : nullableColumns) {
- columnReader.valueVec.getMutator().setValueCount((int) recordsReadInCurrentPass);
- }
- return recordsReadInCurrentPass;
- }
-} \ No newline at end of file
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/BitReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/BitReader.java
index 7ae95cdc4..2c6e4888e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/BitReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/BitReader.java
@@ -15,13 +15,12 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.drill.exec.store.parquet;
+package org.apache.drill.exec.store.parquet.columnreaders;
import org.apache.drill.common.exceptions.ExecutionSetupException;
import org.apache.drill.exec.vector.BaseDataValueVector;
import org.apache.drill.exec.vector.ValueVector;
import parquet.column.ColumnDescriptor;
-import parquet.format.ConvertedType;
import parquet.format.SchemaElement;
import parquet.hadoop.metadata.ColumnChunkMetaData;
@@ -37,40 +36,40 @@ final class BitReader extends ColumnReader {
}
@Override
- protected void readField(long recordsToReadInThisPass, ColumnReader firstColumnStatus) {
+ protected void readField(long recordsToReadInThisPass) {
- recordsReadInThisIteration = Math.min(pageReadStatus.currentPage.getValueCount()
- - pageReadStatus.valuesRead, recordsToReadInThisPass - valuesReadInCurrentPass);
+ recordsReadInThisIteration = Math.min(pageReader.currentPage.getValueCount()
+ - pageReader.valuesRead, recordsToReadInThisPass - valuesReadInCurrentPass);
- readStartInBytes = pageReadStatus.readPosInBytes;
+ readStartInBytes = pageReader.readPosInBytes;
readLengthInBits = recordsReadInThisIteration * dataTypeLengthInBits;
readLength = (int) Math.ceil(readLengthInBits / 8.0);
- bytes = pageReadStatus.pageDataByteArray;
+ bytes = pageReader.pageDataByteArray;
// standard read, using memory mapping
- if (pageReadStatus.bitShift == 0) {
+ if (pageReader.bitShift == 0) {
((BaseDataValueVector) valueVec).getData().writeBytes(bytes,
(int) readStartInBytes, (int) readLength);
} else { // read in individual values, because a bitshift is necessary with where the last page or batch ended
vectorData = ((BaseDataValueVector) valueVec).getData();
- nextByte = bytes[(int) Math.max(0, Math.ceil(pageReadStatus.valuesRead / 8.0) - 1)];
- readLengthInBits = recordsReadInThisIteration + pageReadStatus.bitShift;
+ nextByte = bytes[(int) Math.max(0, Math.ceil(pageReader.valuesRead / 8.0) - 1)];
+ readLengthInBits = recordsReadInThisIteration + pageReader.bitShift;
int i = 0;
// read individual bytes with appropriate shifting
for (; i < (int) readLength; i++) {
currentByte = nextByte;
- currentByte = (byte) (currentByte >>> pageReadStatus.bitShift);
+ currentByte = (byte) (currentByte >>> pageReader.bitShift);
// mask the bits about to be added from the next byte
- currentByte = (byte) (currentByte & ParquetRecordReader.startBitMasks[pageReadStatus.bitShift - 1]);
+ currentByte = (byte) (currentByte & ParquetRecordReader.startBitMasks[pageReader.bitShift - 1]);
// if we are not on the last byte
- if ((int) Math.ceil(pageReadStatus.valuesRead / 8.0) + i < pageReadStatus.byteLength) {
+ if ((int) Math.ceil(pageReader.valuesRead / 8.0) + i < pageReader.byteLength) {
// grab the next byte from the buffer, shift and mask it, and OR it with the leftover bits
- nextByte = bytes[(int) Math.ceil(pageReadStatus.valuesRead / 8.0) + i];
+ nextByte = bytes[(int) Math.ceil(pageReader.valuesRead / 8.0) + i];
currentByte = (byte) (currentByte | nextByte
- << (8 - pageReadStatus.bitShift)
- & ParquetRecordReader.endBitMasks[8 - pageReadStatus.bitShift - 1]);
+ << (8 - pageReader.bitShift)
+ & ParquetRecordReader.endBitMasks[8 - pageReader.bitShift - 1]);
}
vectorData.setByte(valuesReadInCurrentPass / 8 + i, currentByte);
}
@@ -82,9 +81,9 @@ final class BitReader extends ColumnReader {
// check if the values in this page did not end on a byte boundary, store a number of bits the next page must be
// shifted by to read all of the values into the vector without leaving space
if (readLengthInBits % 8 != 0) {
- pageReadStatus.bitShift = (int) readLengthInBits % 8;
+ pageReader.bitShift = (int) readLengthInBits % 8;
} else {
- pageReadStatus.bitShift = 0;
+ pageReader.bitShift = 0;
}
}
}
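The non-aligned branch above stitches each output byte together from two input bytes whenever the previous page or batch ended partway through a byte. The standalone sketch below shows the same idea with explicit shifts and masks instead of the startBitMasks/endBitMasks tables; BitShiftSketch and its method are illustrative names, not Drill code.

// Standalone sketch of re-aligning LSB-first bit-packed values after 'bitShift' bits of
// the first byte were already consumed by the previous batch. Each output byte combines
// the remaining high bits of src[i] with the first bits of src[i + 1].
public class BitShiftSketch {
  static byte[] realign(byte[] src, int bitShift) {
    if (bitShift == 0) return src.clone();               // aligned case: plain copy
    byte[] out = new byte[src.length];
    for (int i = 0; i < src.length; i++) {
      int low = (src[i] & 0xFF) >>> bitShift;            // the 8 - bitShift bits not yet consumed
      int high = 0;
      if (i + 1 < src.length) {
        high = (src[i + 1] & 0xFF) << (8 - bitShift);    // borrow bitShift bits from the next byte
      }
      out[i] = (byte) (low | high);
    }
    return out;
  }

  public static void main(String[] args) {
    byte[] src = {(byte) 0b1010_1100, (byte) 0b0000_0101};
    // With bitShift = 2 the two low bits of the first byte were already read,
    // so the realigned first byte is 0b0110_1011.
    System.out.println(Integer.toBinaryString(realign(src, 2)[0] & 0xFF));
  }
}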
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ColumnReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ColumnReader.java
index 775fc7344..fd672d661 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ColumnReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ColumnReader.java
@@ -15,16 +15,13 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.drill.exec.store.parquet;
+package org.apache.drill.exec.store.parquet.columnreaders;
import io.netty.buffer.ByteBuf;
import org.apache.drill.common.exceptions.ExecutionSetupException;
-import org.apache.drill.exec.store.VectorHolder;
import org.apache.drill.exec.vector.BaseValueVector;
import org.apache.drill.exec.vector.ValueVector;
-import org.apache.hadoop.fs.FSDataInputStream;
import parquet.column.ColumnDescriptor;
-import parquet.format.ConvertedType;
import parquet.format.SchemaElement;
import parquet.hadoop.metadata.ColumnChunkMetaData;
import parquet.schema.PrimitiveType;
@@ -32,8 +29,9 @@ import parquet.schema.PrimitiveType.PrimitiveTypeName;
import java.io.IOException;
-abstract class ColumnReader<V extends ValueVector> {
-
+public abstract class ColumnReader<V extends ValueVector> {
+ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ColumnReader.class);
+
final ParquetRecordReader parentReader;
// Value Vector for this column
@@ -48,7 +46,7 @@ abstract class ColumnReader<V extends ValueVector> {
// metadata of the column, from the parquet library
final ColumnChunkMetaData columnChunkMetaData;
// status information on the current page
- final PageReadStatus pageReadStatus;
+ PageReader pageReader;
final SchemaElement schemaElement;
boolean usingDictionary;
@@ -83,15 +81,8 @@ abstract class ColumnReader<V extends ValueVector> {
this.columnChunkMetaData = columnChunkMetaData;
this.isFixedLength = fixedLength;
this.schemaElement = schemaElement;
-
- if (allocateSize > 1) {
- valueVec = v;
- } else {
- valueVec = v;
- }
-
-
- this.pageReadStatus = new PageReadStatus(this, parentReader.fileSystem, parentReader.hadoopPath, columnChunkMetaData);
+ this.valueVec = v;
+ this.pageReader = new PageReader(this, parentReader.getFileSystem(), parentReader.getHadoopPath(), columnChunkMetaData);
if (columnDescriptor.getType() != PrimitiveType.PrimitiveTypeName.BINARY) {
if (columnDescriptor.getType() == PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY) {
@@ -103,38 +94,127 @@ abstract class ColumnReader<V extends ValueVector> {
}
- public void readAllFixedFields(long recordsToReadInThisPass, ColumnReader firstColumnStatus) throws IOException {
+ public int getRecordsReadInCurrentPass() {
+ return valuesReadInCurrentPass;
+ }
+
+ public void processPages(long recordsToReadInThisPass) throws IOException {
+ reset();
+ do {
+ determineSize(recordsToReadInThisPass, 0);
+
+ } while (valuesReadInCurrentPass < recordsToReadInThisPass && pageReader.currentPage != null);
+ valueVec.getMutator().setValueCount(valuesReadInCurrentPass);
+ }
+
+ public void clear() {
+ valueVec.clear();
+ this.pageReader.clear();
+ }
+
+ public void readValues(long recordsToRead) {
+ readField(recordsToRead);
+
+ valuesReadInCurrentPass += recordsReadInThisIteration;
+ totalValuesRead += recordsReadInThisIteration;
+ pageReader.valuesRead += recordsReadInThisIteration;
+ pageReader.readPosInBytes = readStartInBytes + readLength;
+ }
+
+ protected abstract void readField(long recordsToRead);
+
+ /**
+ * Determines the size of a single value in a variable column.
+ *
+   * The return value indicates whether we have finished a row group and should stop reading.
+   *
+   * @param recordsReadInCurrentPass - the number of records read so far in the current pass
+   * @param lengthVarFieldsInCurrentRecord - running total of variable-width bytes in the current record
+ * @return - true if we should stop reading
+ * @throws IOException
+ */
+ public boolean determineSize(long recordsReadInCurrentPass, Integer lengthVarFieldsInCurrentRecord) throws IOException {
+
+ boolean doneReading = readPage();
+ if (doneReading)
+ return true;
+
+ doneReading = processPageData((int) recordsReadInCurrentPass);
+ if (doneReading)
+ return true;
+
+ lengthVarFieldsInCurrentRecord += dataTypeLengthInBits;
+
+ doneReading = checkVectorCapacityReached();
+ if (doneReading)
+ return true;
+
+ return false;
+ }
+
+ protected void readRecords(int recordsToRead) {
+ for (int i = 0; i < recordsToRead; i++) {
+ readField(i);
+ }
+ pageReader.valuesRead += recordsToRead;
+ }
+
+ protected boolean processPageData(int recordsToReadInThisPass) throws IOException {
+ readValues(recordsToReadInThisPass);
+ return true;
+ }
+
+ public void updatePosition() {}
+
+ public void updateReadyToReadPosition() {}
+
+ public void reset() {
readStartInBytes = 0;
readLength = 0;
readLengthInBits = 0;
recordsReadInThisIteration = 0;
+ bytesReadInCurrentPass = 0;
vectorData = ((BaseValueVector) valueVec).getData();
- do {
- // if no page has been read, or all of the records have been read out of a page, read the next one
- if (pageReadStatus.currentPage == null || pageReadStatus.valuesRead == pageReadStatus.currentPage.getValueCount()) {
- if (!pageReadStatus.next()) {
- break;
- }
- }
+ }
- readField(recordsToReadInThisPass, firstColumnStatus);
+ public int capacity() {
+ return (int) (valueVec.getValueCapacity() * dataTypeLengthInBits / 8.0);
+ }
- valuesReadInCurrentPass += recordsReadInThisIteration;
- totalValuesRead += recordsReadInThisIteration;
- pageReadStatus.valuesRead += recordsReadInThisIteration;
- if (readStartInBytes + readLength >= pageReadStatus.byteLength) {
- pageReadStatus.next();
- } else {
- pageReadStatus.readPosInBytes = readStartInBytes + readLength;
+  // Read a page if we need more data; returns true if the caller should exit the read loop
+ public boolean readPage() throws IOException {
+ if (pageReader.currentPage == null
+ || totalValuesReadAndReadyToReadInPage() == pageReader.currentPage.getValueCount()) {
+ readRecords(pageReader.valuesReadyToRead);
+ if (pageReader.currentPage != null)
+ totalValuesRead += pageReader.currentPage.getValueCount();
+ if (!pageReader.next()) {
+ hitRowGroupEnd();
+ return true;
}
- } while (valuesReadInCurrentPass < recordsToReadInThisPass && pageReadStatus.currentPage != null);
- valueVec.getMutator().setValueCount(valuesReadInCurrentPass);
+ postPageRead();
+ }
+ return false;
}
- public void clear() {
- valueVec.clear();
- this.pageReadStatus.clear();
+ protected int totalValuesReadAndReadyToReadInPage() {
+ return pageReader.valuesRead + pageReader.valuesReadyToRead;
}
- protected abstract void readField(long recordsToRead, ColumnReader firstColumnStatus);
+ protected void postPageRead() {
+ pageReader.valuesReadyToRead = 0;
+ }
+
+ protected void hitRowGroupEnd() {}
+
+ protected boolean checkVectorCapacityReached() {
+ if (bytesReadInCurrentPass + dataTypeLengthInBits > capacity()) {
+ logger.debug("Reached the capacity of the data vector in a variable length value vector.");
+ return true;
+ }
+ else if (valuesReadInCurrentPass > valueVec.getValueCapacity()){
+ return true;
+ }
+ return false;
+ }
}
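The refactor above breaks the fixed-width read loop into a handful of overridable steps (readPage, processPageData, checkVectorCapacityReached) that the variable-width and repeated readers specialize. The sketch below only mirrors the shape of that control flow; ReadLoopSketch, its fields, and its hooks are placeholders, not the Drill types.

// Simplified sketch of the new processPages/determineSize control flow in ColumnReader.
abstract class ReadLoopSketch {
  protected int valuesInCurrentPage;     // stands in for pageReader.currentPage.getValueCount()
  protected int valuesReadFromPage;      // stands in for pageReader.valuesRead
  protected int valuesReadInCurrentPass;
  protected boolean rowGroupFinished;

  public void processPages(long recordsToReadInThisPass) {
    reset();
    do {
      if (determineSize(recordsToReadInThisPass)) break;   // stop at row group end or a full vector
    } while (valuesReadInCurrentPass < recordsToReadInThisPass);
    setValueCount(valuesReadInCurrentPass);
  }

  // One iteration: pull a new page if needed, read values, stop once the vector is full.
  protected boolean determineSize(long recordsToReadInThisPass) {
    if (readPageIfNeeded()) return true;                   // no more pages in this row group
    readValues(recordsToReadInThisPass);                   // subclasses copy a run of values
    return checkVectorCapacityReached();
  }

  protected boolean readPageIfNeeded() {
    if (valuesReadFromPage == valuesInCurrentPage) {
      if (!nextPage()) { rowGroupFinished = true; return true; }
      valuesReadFromPage = 0;
    }
    return false;
  }

  protected abstract void reset();
  protected abstract boolean nextPage();                   // loads the next page, false at row group end
  protected abstract void readValues(long quota);
  protected abstract boolean checkVectorCapacityReached();
  protected abstract void setValueCount(int count);
}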
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ColumnReaderFactory.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ColumnReaderFactory.java
new file mode 100644
index 000000000..243744e2a
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ColumnReaderFactory.java
@@ -0,0 +1,175 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.store.parquet.columnreaders;
+
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.exec.exception.SchemaChangeException;
+
+import org.apache.drill.exec.vector.Decimal28SparseVector;
+import org.apache.drill.exec.vector.Decimal38SparseVector;
+import org.apache.drill.exec.vector.NullableBigIntVector;
+import org.apache.drill.exec.vector.NullableDecimal28SparseVector;
+import org.apache.drill.exec.vector.NullableDecimal38SparseVector;
+import org.apache.drill.exec.vector.NullableFloat4Vector;
+import org.apache.drill.exec.vector.NullableFloat8Vector;
+import org.apache.drill.exec.vector.NullableIntVector;
+import org.apache.drill.exec.vector.NullableVarBinaryVector;
+import org.apache.drill.exec.vector.NullableVarCharVector;
+import org.apache.drill.exec.vector.ValueVector;
+import org.apache.drill.exec.vector.VarBinaryVector;
+import org.apache.drill.exec.vector.VarCharVector;
+import parquet.column.ColumnDescriptor;
+import parquet.column.Encoding;
+import parquet.format.ConvertedType;
+import parquet.format.SchemaElement;
+import parquet.hadoop.metadata.ColumnChunkMetaData;
+import parquet.schema.PrimitiveType;
+
+public class ColumnReaderFactory {
+
+ /**
+   * @param fixedLength - true if the column holds fixed-width values
+   * @param descriptor - parquet column descriptor for the column being read
+   * @param columnChunkMetaData - parquet metadata for the column chunk
+   * @param allocateSize - the size of the vector to create
+   * @return - a ColumnReader matching the column's type and encoding
+   * @throws Exception if the parquet metadata is in an unexpected configuration
+ */
+ static ColumnReader createFixedColumnReader(ParquetRecordReader recordReader, boolean fixedLength, ColumnDescriptor descriptor,
+ ColumnChunkMetaData columnChunkMetaData, int allocateSize, ValueVector v,
+ SchemaElement schemaElement)
+ throws Exception {
+ ConvertedType convertedType = schemaElement.getConverted_type();
+    // if the column is required or repeated (in which case we just want to use this to generate the appropriate
+    // ColumnReader for actually transferring data into the data vector inside of our repeated vector)
+ if (descriptor.getMaxDefinitionLevel() == 0 || descriptor.getMaxRepetitionLevel() > 0){
+ if (columnChunkMetaData.getType() == PrimitiveType.PrimitiveTypeName.BOOLEAN){
+ return new BitReader(recordReader, allocateSize, descriptor, columnChunkMetaData,
+ fixedLength, v, schemaElement);
+ } else if (columnChunkMetaData.getType() == PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY && convertedType == ConvertedType.DECIMAL){
+ int length = schemaElement.type_length;
+ if (length <= 12) {
+ return new FixedByteAlignedReader.Decimal28Reader(recordReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
+ } else if (length <= 16) {
+ return new FixedByteAlignedReader.Decimal38Reader(recordReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
+ }
+ } else if (columnChunkMetaData.getType() == PrimitiveType.PrimitiveTypeName.INT32 && convertedType == ConvertedType.DATE){
+ return new FixedByteAlignedReader.DateReader(recordReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
+ } else{
+ if (columnChunkMetaData.getEncodings().contains(Encoding.PLAIN_DICTIONARY)) {
+ return new ParquetFixedWidthDictionaryReader(recordReader, allocateSize, descriptor, columnChunkMetaData,
+ fixedLength, v, schemaElement);
+ } else {
+ return new FixedByteAlignedReader(recordReader, allocateSize, descriptor, columnChunkMetaData,
+ fixedLength, v, schemaElement);
+ }
+ }
+ }
+ else { // if the column is nullable
+ if (columnChunkMetaData.getType() == PrimitiveType.PrimitiveTypeName.BOOLEAN){
+ return new NullableBitReader(recordReader, allocateSize, descriptor, columnChunkMetaData,
+ fixedLength, v, schemaElement);
+ } else if (columnChunkMetaData.getType() == PrimitiveType.PrimitiveTypeName.INT32 && convertedType == ConvertedType.DATE){
+ return new NullableFixedByteAlignedReaders.NullableDateReader(recordReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
+ } else if (columnChunkMetaData.getType() == PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY && convertedType == ConvertedType.DECIMAL){
+ int length = schemaElement.type_length;
+ if (length <= 12) {
+ return new NullableFixedByteAlignedReaders.NullableDecimal28Reader(recordReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
+ } else if (length <= 16) {
+ return new NullableFixedByteAlignedReaders.NullableDecimal38Reader(recordReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
+ }
+ } else {
+ return getNullableColumnReader(recordReader, allocateSize, descriptor,
+ columnChunkMetaData, fixedLength, v, schemaElement);
+ }
+ }
+ throw new Exception("Unexpected parquet metadata configuration.");
+ }
+
+ static VarLengthValuesColumn getReader(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor,
+ ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, ValueVector v,
+ SchemaElement schemaElement
+ ) throws ExecutionSetupException {
+ ConvertedType convertedType = schemaElement.getConverted_type();
+ switch (descriptor.getMaxDefinitionLevel()) {
+ case 0:
+ if (convertedType == null) {
+ return new VarLengthColumnReaders.VarBinaryColumn(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, (VarBinaryVector) v, schemaElement);
+ }
+ switch (convertedType) {
+ case UTF8:
+ return new VarLengthColumnReaders.VarCharColumn(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, (VarCharVector) v, schemaElement);
+ case DECIMAL:
+ if (v instanceof Decimal28SparseVector) {
+ return new VarLengthColumnReaders.Decimal28Column(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, (Decimal28SparseVector) v, schemaElement);
+ } else if (v instanceof Decimal38SparseVector) {
+ return new VarLengthColumnReaders.Decimal38Column(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, (Decimal38SparseVector) v, schemaElement);
+ }
+ default:
+ }
+ default:
+ if (convertedType == null) {
+ return new VarLengthColumnReaders.NullableVarBinaryColumn(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, (NullableVarBinaryVector) v, schemaElement);
+ }
+ switch (convertedType) {
+ case UTF8:
+ return new VarLengthColumnReaders.NullableVarCharColumn(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, (NullableVarCharVector) v, schemaElement);
+ case DECIMAL:
+ if (v instanceof NullableDecimal28SparseVector) {
+ return new VarLengthColumnReaders.NullableDecimal28Column(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, (NullableDecimal28SparseVector) v, schemaElement);
+ } else if (v instanceof NullableDecimal38SparseVector) {
+ return new VarLengthColumnReaders.NullableDecimal38Column(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, (NullableDecimal38SparseVector) v, schemaElement);
+ }
+ default:
+ }
+ }
+ throw new UnsupportedOperationException();
+ }
+
+ public static NullableColumnReader getNullableColumnReader(ParquetRecordReader parentReader, int allocateSize,
+ ColumnDescriptor columnDescriptor,
+ ColumnChunkMetaData columnChunkMetaData,
+ boolean fixedLength,
+ ValueVector valueVec,
+ SchemaElement schemaElement) throws ExecutionSetupException {
+ if (! columnChunkMetaData.getEncodings().contains(Encoding.PLAIN_DICTIONARY)) {
+ return new NullableFixedByteAlignedReaders.NullableFixedByteAlignedReader(parentReader, allocateSize, columnDescriptor, columnChunkMetaData,
+ fixedLength, valueVec, schemaElement);
+ } else {
+ if (columnDescriptor.getType() == PrimitiveType.PrimitiveTypeName.INT64) {
+ return new NullableFixedByteAlignedReaders.NullableDictionaryBigIntReader(parentReader, allocateSize, columnDescriptor, columnChunkMetaData,
+ fixedLength, (NullableBigIntVector)valueVec, schemaElement);
+ }
+ else if (columnDescriptor.getType() == PrimitiveType.PrimitiveTypeName.INT32) {
+ return new NullableFixedByteAlignedReaders.NullableDictionaryIntReader(parentReader, allocateSize, columnDescriptor, columnChunkMetaData,
+ fixedLength, (NullableIntVector)valueVec, schemaElement);
+ }
+ else if (columnDescriptor.getType() == PrimitiveType.PrimitiveTypeName.FLOAT) {
+ return new NullableFixedByteAlignedReaders.NullableDictionaryFloat4Reader(parentReader, allocateSize, columnDescriptor, columnChunkMetaData,
+ fixedLength, (NullableFloat4Vector)valueVec, schemaElement);
+ }
+ else if (columnDescriptor.getType() == PrimitiveType.PrimitiveTypeName.DOUBLE) {
+ return new NullableFixedByteAlignedReaders.NullableDictionaryFloat8Reader(parentReader, allocateSize, columnDescriptor, columnChunkMetaData,
+ fixedLength, (NullableFloat8Vector)valueVec, schemaElement);
+ }
+ else{
+ throw new ExecutionSetupException("Unsupported nullable column type " + columnDescriptor.getType().name() );
+ }
+ }
+ }
+}
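The factory above picks a reader implementation from four pieces of column metadata: the parquet primitive type, the converted type, whether the chunk is dictionary encoded, and whether the column is required/repeated or nullable. The condensed dispatch sketch below mirrors that branching; the enums and returned names are placeholders for the real Drill and Parquet types.

// Condensed sketch of the branching in ColumnReaderFactory.createFixedColumnReader above.
public class ReaderDispatchSketch {
  enum PrimitiveType { BOOLEAN, INT32, INT64, FLOAT, DOUBLE, FIXED_LEN_BYTE_ARRAY, BINARY }
  enum ConvertedType { NONE, UTF8, DECIMAL, DATE }

  static String chooseFixedReader(PrimitiveType type, ConvertedType converted, int typeLength,
                                  boolean requiredOrRepeated, boolean dictionaryEncoded) {
    if (requiredOrRepeated) {                                     // max def level 0, or max rep level > 0
      if (type == PrimitiveType.BOOLEAN) return "BitReader";
      if (type == PrimitiveType.FIXED_LEN_BYTE_ARRAY && converted == ConvertedType.DECIMAL)
        return typeLength <= 12 ? "Decimal28Reader" : "Decimal38Reader";
      if (type == PrimitiveType.INT32 && converted == ConvertedType.DATE) return "DateReader";
      return dictionaryEncoded ? "ParquetFixedWidthDictionaryReader" : "FixedByteAlignedReader";
    }
    // nullable path
    if (type == PrimitiveType.BOOLEAN) return "NullableBitReader";
    if (type == PrimitiveType.INT32 && converted == ConvertedType.DATE) return "NullableDateReader";
    if (type == PrimitiveType.FIXED_LEN_BYTE_ARRAY && converted == ConvertedType.DECIMAL)
      return typeLength <= 12 ? "NullableDecimal28Reader" : "NullableDecimal38Reader";
    return dictionaryEncoded ? "NullableDictionary*Reader (per primitive type)"
                             : "NullableFixedByteAlignedReader";
  }

  public static void main(String[] args) {
    System.out.println(chooseFixedReader(PrimitiveType.INT32, ConvertedType.DATE, 4, false, false));
    // prints: NullableDateReader
  }
}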
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/FixedByteAlignedReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/FixedByteAlignedReader.java
index 26e1f0920..4513aaafb 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/FixedByteAlignedReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/FixedByteAlignedReader.java
@@ -15,11 +15,10 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.drill.exec.store.parquet;
+package org.apache.drill.exec.store.parquet.columnreaders;
import org.apache.drill.common.exceptions.ExecutionSetupException;
import org.apache.drill.common.util.DecimalUtility;
-import org.apache.drill.exec.expr.holders.DateHolder;
import org.apache.drill.exec.expr.holders.Decimal28SparseHolder;
import org.apache.drill.exec.expr.holders.Decimal38SparseHolder;
import org.apache.drill.exec.store.ParquetOutputRecordWriter;
@@ -29,7 +28,6 @@ import org.apache.drill.exec.vector.Decimal38SparseVector;
import org.apache.drill.exec.vector.ValueVector;
import org.joda.time.DateTimeUtils;
import parquet.column.ColumnDescriptor;
-import parquet.format.ConvertedType;
import parquet.format.SchemaElement;
import parquet.hadoop.metadata.ColumnChunkMetaData;
@@ -47,17 +45,21 @@ class FixedByteAlignedReader extends ColumnReader {
// this method is called by its superclass during a read loop
@Override
- protected void readField(long recordsToReadInThisPass, ColumnReader firstColumnStatus) {
+ protected void readField(long recordsToReadInThisPass) {
- recordsReadInThisIteration = Math.min(pageReadStatus.currentPage.getValueCount()
- - pageReadStatus.valuesRead, recordsToReadInThisPass - valuesReadInCurrentPass);
+ recordsReadInThisIteration = Math.min(pageReader.currentPage.getValueCount()
+ - pageReader.valuesRead, recordsToReadInThisPass - valuesReadInCurrentPass);
- readStartInBytes = pageReadStatus.readPosInBytes;
+ readStartInBytes = pageReader.readPosInBytes;
readLengthInBits = recordsReadInThisIteration * dataTypeLengthInBits;
readLength = (int) Math.ceil(readLengthInBits / 8.0);
- bytes = pageReadStatus.pageDataByteArray;
+ bytes = pageReader.pageDataByteArray;
// vectorData is assigned by the superclass read loop method
+ writeData();
+ }
+
+ protected void writeData() {
vectorData.writeBytes(bytes,
(int) readStartInBytes, (int) readLength);
}
@@ -71,17 +73,7 @@ class FixedByteAlignedReader extends ColumnReader {
super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
}
- @Override
- protected void readField(long recordsToReadInThisPass, ColumnReader firstColumnStatus) {
- recordsReadInThisIteration = Math.min(pageReadStatus.currentPage.getValueCount()
- - pageReadStatus.valuesRead, recordsToReadInThisPass - valuesReadInCurrentPass);
-
- readStartInBytes = pageReadStatus.readPosInBytes;
- readLengthInBits = recordsReadInThisIteration * dataTypeLengthInBits;
- readLength = (int) Math.ceil(readLengthInBits / 8.0);
-
- bytes = pageReadStatus.pageDataByteArray;
-
+ public void writeData() {
dataTypeLengthInBytes = (int) Math.ceil(dataTypeLengthInBits / 8.0);
for (int i = 0; i < recordsReadInThisIteration; i++) {
addNext((int)readStartInBytes + i * dataTypeLengthInBytes, i + valuesReadInCurrentPass);
@@ -109,7 +101,7 @@ class FixedByteAlignedReader extends ColumnReader {
@Override
void addNext(int start, int index) {
dateVector.getMutator().set(index, DateTimeUtils.fromJulianDay(
- NullableFixedByteAlignedReader.NullableDateReader.readIntLittleEndian(bytes, start)
+ NullableFixedByteAlignedReaders.NullableDateReader.readIntLittleEndian(bytes, start)
- ParquetOutputRecordWriter.JULIAN_DAY_EPOC - 0.5));
}
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/FixedWidthRepeatedReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/FixedWidthRepeatedReader.java
new file mode 100644
index 000000000..bbff57420
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/FixedWidthRepeatedReader.java
@@ -0,0 +1,213 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.store.parquet.columnreaders;
+
+import org.apache.drill.exec.vector.RepeatedFixedWidthVector;
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.exec.vector.ValueVector;
+import parquet.column.ColumnDescriptor;
+import parquet.format.SchemaElement;
+import parquet.hadoop.metadata.ColumnChunkMetaData;
+
+import java.io.IOException;
+
+public class FixedWidthRepeatedReader extends VarLengthColumn {
+
+ RepeatedFixedWidthVector castedRepeatedVector;
+ ColumnReader dataReader;
+ int dataTypeLengthInBytes;
+ // we can do a vector copy of the data once we figure out how much we need to copy
+ // this tracks the number of values to transfer (the dataReader will translate this to a number
+ // of bytes to transfer and re-use the code from the non-repeated types)
+ int valuesToRead;
+ int repeatedGroupsReadInCurrentPass;
+ int repeatedValuesInCurrentList;
+  // empty lists are denoted by definition levels; to stop reading at the correct time, we must keep
+  // track of the number of empty lists as well as the combined length of all of the defined lists
+ int definitionLevelsRead;
+  // parquet currently does not prevent lists from reaching across pages for repeated values; this necessitates
+  // tracking when it happens so that some of the state updates can be deferred until we know the full length of
+  // the repeated value for the current record
+ boolean notFishedReadingList;
+ byte[] leftOverBytes;
+
+ FixedWidthRepeatedReader(ParquetRecordReader parentReader, ColumnReader dataReader, int dataTypeLengthInBytes, int allocateSize, ColumnDescriptor descriptor, ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, ValueVector valueVector, SchemaElement schemaElement) throws ExecutionSetupException {
+ super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, valueVector, schemaElement);
+ castedRepeatedVector = (RepeatedFixedWidthVector) valueVector;
+ this.dataTypeLengthInBytes = dataTypeLengthInBytes;
+ this.dataReader = dataReader;
+ this.dataReader.pageReader = this.pageReader;
+    // this is not in the reset method because it needs to be initialized only before the very first page read;
+    // in all other cases, if a read ends at a page boundary, we need to keep track of this flag and not
+    // clear it at the start of the next read loop
+ notFishedReadingList = false;
+ }
+
+ public void reset() {
+ bytesReadInCurrentPass = 0;
+ valuesReadInCurrentPass = 0;
+ pageReader.valuesReadyToRead = 0;
+ dataReader.vectorData = castedRepeatedVector.getMutator().getDataVector().getData();
+ dataReader.valuesReadInCurrentPass = 0;
+ repeatedGroupsReadInCurrentPass = 0;
+ }
+
+ public int getRecordsReadInCurrentPass() {
+ return repeatedGroupsReadInCurrentPass;
+ }
+
+ @Override
+ protected void readField(long recordsToRead) {
+    // intentionally a no-op: repeated columns copy their values in readRecords() through the wrapped dataReader
+ }
+
+ public boolean skipReadyToReadPositionUpdate() {
+ return false;
+ }
+
+ public void updateReadyToReadPosition() {
+ valuesToRead += repeatedValuesInCurrentList;
+ pageReader.valuesReadyToRead += repeatedValuesInCurrentList;
+ repeatedGroupsReadInCurrentPass++;
+ currDictVal = null;
+ if ( ! notFishedReadingList)
+ repeatedValuesInCurrentList = -1;
+ }
+
+ public void updatePosition() {
+ pageReader.readPosInBytes += dataTypeLengthInBits;
+ bytesReadInCurrentPass += dataTypeLengthInBits;
+ valuesReadInCurrentPass++;
+ }
+
+ public void hitRowGroupEnd() {
+ pageReader.valuesReadyToRead = 0;
+ definitionLevelsRead = 0;
+ }
+
+ public void postPageRead() {
+ super.postPageRead();
+    // resetting the current list length unconditionally is no longer correct, as lists can reach across pages
+ if ( ! notFishedReadingList)
+ repeatedValuesInCurrentList = -1;
+ definitionLevelsRead = 0;
+ }
+
+ protected int totalValuesReadAndReadyToReadInPage() {
+ // we need to prevent the page reader from getting rid of the current page in the case where we have a repeated
+ // value split across a page boundary
+ if (notFishedReadingList) {
+ return definitionLevelsRead - repeatedValuesInCurrentList;
+ }
+ return definitionLevelsRead;
+ }
+
+ protected boolean checkVectorCapacityReached() {
+ boolean doneReading = super.checkVectorCapacityReached();
+ if (doneReading)
+ return true;
+ if (valuesReadInCurrentPass + pageReader.valuesReadyToRead + repeatedValuesInCurrentList >= valueVec.getValueCapacity())
+ return true;
+ else
+ return false;
+ }
+
+ protected boolean readAndStoreValueSizeInformation() {
+ boolean readingValsAcrossPageBoundary = false;
+ int numLeftoverVals = 0;
+ if (notFishedReadingList) {
+ numLeftoverVals = repeatedValuesInCurrentList;
+ readRecords(numLeftoverVals);
+ readingValsAcrossPageBoundary = true;
+ notFishedReadingList = false;
+ pageReader.valuesReadyToRead = 0;
+ try {
+ boolean stopReading = readPage();
+ if (stopReading) {
+ // hit the end of a row group
+ return false;
+ }
+ } catch (IOException e) {
+ throw new RuntimeException("Unexpected error reading parquet repeated column.", e);
+ }
+ }
+ if ( currDefLevel == -1 ) {
+ currDefLevel = pageReader.definitionLevels.readInteger();
+ definitionLevelsRead++;
+ }
+ int repLevel;
+ if ( columnDescriptor.getMaxDefinitionLevel() == currDefLevel){
+ if (repeatedValuesInCurrentList == -1 || notFishedReadingList) {
+ repeatedValuesInCurrentList = 1;
+ do {
+ repLevel = pageReader.repetitionLevels.readInteger();
+ if (repLevel > 0) {
+ repeatedValuesInCurrentList++;
+ currDefLevel = pageReader.definitionLevels.readInteger();
+ definitionLevelsRead++;
+
+ // we hit the end of this page, without confirmation that we reached the end of the current record
+ if (definitionLevelsRead == pageReader.currentPage.getValueCount()) {
+            // check that we have not hit the end of the row group (in which case we will not find a repetition level
+            // indicating the end of this record, because there is no next page to check; we have read all the values
+            // in this repetition, so it is okay to include it in this read)
+ if (totalValuesRead + pageReader.valuesReadyToRead + repeatedValuesInCurrentList != columnChunkMetaData.getValueCount()){
+ notFishedReadingList = true;
+              // if we hit this case, we cut off the current batch at the previous value; these extra values, as well
+              // as those that spill into the next page, will be added to the next batch
+ return true;
+ }
+ }
+ }
+ } while (repLevel != 0);
+ }
+ }
+ else {
+ repeatedValuesInCurrentList = 0;
+ }
+ int currentValueListLength = repeatedValuesInCurrentList;
+ if (readingValsAcrossPageBoundary) {
+ currentValueListLength += numLeftoverVals;
+ }
+ // this should not fail
+ if (!castedRepeatedVector.getMutator().setRepetitionAtIndexSafe(repeatedGroupsReadInCurrentPass,
+ currentValueListLength)) {
+ return true;
+ }
+    // This field is referenced in the superclass determineSize method, so we need to set it here.
+    // Again, re-purposing it as a length in BYTES to avoid repetitive multiplication/division.
+ dataTypeLengthInBits = repeatedValuesInCurrentList * dataTypeLengthInBytes;
+ return false;
+ }
+
+ protected void readRecords(int valuesToRead) {
+ if (valuesToRead == 0) return;
+ // TODO - validate that this works in all cases, it fixes a bug when reading from multiple pages into
+ // a single vector
+ dataReader.valuesReadInCurrentPass = 0;
+ dataReader.readValues(valuesToRead);
+ valuesReadInCurrentPass += valuesToRead;
+ castedRepeatedVector.getMutator().setValueCounts(repeatedGroupsReadInCurrentPass, valuesReadInCurrentPass);
+ }
+
+ @Override
+ public int capacity() {
+ return castedRepeatedVector.getMutator().getDataVector().getData().capacity();
+ }
+}
+
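FixedWidthRepeatedReader reconstructs list boundaries from parquet repetition and definition levels: a repetition level of 0 starts a new record, a definition level below the maximum marks an empty list, and the reader must cope with lists spilling across page boundaries. The standalone sketch below decodes level streams into per-record list lengths for the single-level repeated case; the plain arrays stand in for the page's level readers and RepeatedLevelsSketch is an illustrative class, not Drill code.

import java.util.ArrayList;
import java.util.List;

// Standalone sketch of how repetition/definition levels describe repeated values.
public class RepeatedLevelsSketch {
  // Returns the number of values in each record (0 for an empty list).
  static List<Integer> listLengths(int[] repLevels, int[] defLevels, int maxDefLevel) {
    List<Integer> lengths = new ArrayList<>();
    for (int i = 0; i < repLevels.length; i++) {
      if (repLevels[i] == 0) {                 // repetition level 0 starts a new record
        lengths.add(0);
      }
      if (defLevels[i] == maxDefLevel) {       // fully defined -> a real value in the current list
        lengths.set(lengths.size() - 1, lengths.get(lengths.size() - 1) + 1);
      }                                        // otherwise: empty list, length stays 0
    }
    return lengths;
  }

  public static void main(String[] args) {
    // Three records: [a, b], [] (empty list), [c]
    int[] rep = {0, 1, 0, 0};
    int[] def = {1, 1, 0, 1};
    System.out.println(listLengths(rep, def, 1)); // prints [2, 0, 1]
  }
}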
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/NullableBitReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/NullableBitReader.java
index 29ca30add..fbf1dee2c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/NullableBitReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/NullableBitReader.java
@@ -15,20 +15,15 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.drill.exec.store.parquet;
+package org.apache.drill.exec.store.parquet.columnreaders;
import org.apache.drill.common.exceptions.ExecutionSetupException;
-import org.apache.drill.exec.vector.BaseDataValueVector;
-import org.apache.drill.exec.vector.BaseValueVector;
import org.apache.drill.exec.vector.NullableBitVector;
import org.apache.drill.exec.vector.ValueVector;
import parquet.column.ColumnDescriptor;
-import parquet.format.ConvertedType;
import parquet.format.SchemaElement;
import parquet.hadoop.metadata.ColumnChunkMetaData;
-import java.io.IOException;
-
/**
* This class is used in conjunction with its superclass to read nullable bit columns in a parquet file.
* It currently is using an inefficient value-by-value approach.
@@ -45,17 +40,17 @@ final class NullableBitReader extends ColumnReader {
}
@Override
- public void readField(long recordsToReadInThisPass, ColumnReader firstColumnStatus) {
+ public void readField(long recordsToReadInThisPass) {
- recordsReadInThisIteration = Math.min(pageReadStatus.currentPage.getValueCount()
- - pageReadStatus.valuesRead, recordsToReadInThisPass - valuesReadInCurrentPass);
+ recordsReadInThisIteration = Math.min(pageReader.currentPage.getValueCount()
+ - pageReader.valuesRead, recordsToReadInThisPass - valuesReadInCurrentPass);
int defLevel;
for (int i = 0; i < recordsReadInThisIteration; i++){
- defLevel = pageReadStatus.definitionLevels.readInteger();
+ defLevel = pageReader.definitionLevels.readInteger();
// if the value is defined
if (defLevel == columnDescriptor.getMaxDefinitionLevel()){
if (!((NullableBitVector)valueVec).getMutator().setSafe(i + valuesReadInCurrentPass,
- pageReadStatus.valueReader.readBoolean() ? 1 : 0 )) {
+ pageReader.valueReader.readBoolean() ? 1 : 0 )) {
throw new RuntimeException();
}
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/NullableColumnReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/NullableColumnReader.java
index 585fd667f..2babc2066 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/NullableColumnReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/NullableColumnReader.java
@@ -15,14 +15,13 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.drill.exec.store.parquet;
+package org.apache.drill.exec.store.parquet.columnreaders;
import org.apache.drill.common.exceptions.ExecutionSetupException;
import org.apache.drill.exec.vector.BaseValueVector;
import org.apache.drill.exec.vector.NullableVectorDefinitionSetter;
import org.apache.drill.exec.vector.ValueVector;
import parquet.column.ColumnDescriptor;
-import parquet.format.ConvertedType;
import parquet.format.SchemaElement;
import parquet.hadoop.metadata.ColumnChunkMetaData;
@@ -35,24 +34,28 @@ abstract class NullableColumnReader<V extends ValueVector> extends ColumnReader<
int rightBitShift;
// used when copying less than a byte worth of data at a time, to indicate the number of used bits in the current byte
int bitsUsed;
+ BaseValueVector castedBaseVector;
+ NullableVectorDefinitionSetter castedVectorMutator;
NullableColumnReader(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor, ColumnChunkMetaData columnChunkMetaData,
boolean fixedLength, V v, SchemaElement schemaElement) throws ExecutionSetupException {
super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
+ castedBaseVector = (BaseValueVector) v;
+ castedVectorMutator = (NullableVectorDefinitionSetter) v.getMutator();
}
- public void readAllFixedFields(long recordsToReadInThisPass, ColumnReader firstColumnStatus) throws IOException {
+ public void processPages(long recordsToReadInThisPass) throws IOException {
readStartInBytes = 0;
readLength = 0;
readLengthInBits = 0;
recordsReadInThisIteration = 0;
- vectorData = ((BaseValueVector)valueVec).getData();
+ vectorData = castedBaseVector.getData();
do {
// if no page has been read, or all of the records have been read out of a page, read the next one
- if (pageReadStatus.currentPage == null
- || pageReadStatus.valuesRead == pageReadStatus.currentPage.getValueCount()) {
- if (!pageReadStatus.next()) {
+ if (pageReader.currentPage == null
+ || pageReader.valuesRead == pageReader.currentPage.getValueCount()) {
+ if (!pageReader.next()) {
break;
}
}
@@ -62,11 +65,11 @@ abstract class NullableColumnReader<V extends ValueVector> extends ColumnReader<
// to optimize copying data out of the buffered disk stream, runs of defined values
// are located and copied together, rather than copying individual values
- long runStart = pageReadStatus.readPosInBytes;
+ long runStart = pageReader.readPosInBytes;
int runLength;
- int currentDefinitionLevel = 0;
+ int currentDefinitionLevel;
int currentValueIndexInVector = (int) recordsReadInThisIteration;
- boolean lastValueWasNull = true;
+ boolean lastValueWasNull;
int definitionLevelsRead;
// loop to find the longest run of defined values available, can be preceded by several nulls
while (true){
@@ -80,8 +83,8 @@ abstract class NullableColumnReader<V extends ValueVector> extends ColumnReader<
}
while(currentValueIndexInVector < recordsToReadInThisPass
&& currentValueIndexInVector < valueVec.getValueCapacity()
- && pageReadStatus.valuesRead + definitionLevelsRead < pageReadStatus.currentPage.getValueCount()){
- currentDefinitionLevel = pageReadStatus.definitionLevels.readInteger();
+ && pageReader.valuesRead + definitionLevelsRead < pageReader.currentPage.getValueCount()){
+ currentDefinitionLevel = pageReader.definitionLevels.readInteger();
definitionLevelsRead++;
if ( currentDefinitionLevel < columnDescriptor.getMaxDefinitionLevel()){
// a run of non-null values was found, break out of this loop to do a read in the outer loop
@@ -94,22 +97,22 @@ abstract class NullableColumnReader<V extends ValueVector> extends ColumnReader<
}
else{
if (lastValueWasNull){
- runStart = pageReadStatus.readPosInBytes;
+ runStart = pageReader.readPosInBytes;
runLength = 0;
lastValueWasNull = false;
}
runLength++;
- ((NullableVectorDefinitionSetter)valueVec.getMutator()).setIndexDefined(currentValueIndexInVector);
+ castedVectorMutator.setIndexDefined(currentValueIndexInVector);
}
currentValueIndexInVector++;
}
- pageReadStatus.readPosInBytes = runStart;
+ pageReader.readPosInBytes = runStart;
recordsReadInThisIteration = runLength;
- readField( runLength, firstColumnStatus);
+ readField( runLength);
int writerIndex = ((BaseValueVector) valueVec).getData().writerIndex();
if ( dataTypeLengthInBits > 8 || (dataTypeLengthInBits < 8 && totalValuesRead + runLength % 8 == 0)){
- ((BaseValueVector) valueVec).getData().setIndex(0, writerIndex + (int) Math.ceil( nullsFound * dataTypeLengthInBits / 8.0));
+ castedBaseVector.getData().setIndex(0, writerIndex + (int) Math.ceil( nullsFound * dataTypeLengthInBits / 8.0));
}
else if (dataTypeLengthInBits < 8){
rightBitShift += dataTypeLengthInBits * nullsFound;
@@ -117,20 +120,20 @@ abstract class NullableColumnReader<V extends ValueVector> extends ColumnReader<
recordsReadInThisIteration += nullsFound;
valuesReadInCurrentPass += recordsReadInThisIteration;
totalValuesRead += recordsReadInThisIteration;
- pageReadStatus.valuesRead += recordsReadInThisIteration;
- if ( (readStartInBytes + readLength >= pageReadStatus.byteLength && bitsUsed == 0)
- || pageReadStatus.valuesRead == pageReadStatus.currentPage.getValueCount()) {
- if (!pageReadStatus.next()) {
+ pageReader.valuesRead += recordsReadInThisIteration;
+ if ( (readStartInBytes + readLength >= pageReader.byteLength && bitsUsed == 0)
+ || pageReader.valuesRead == pageReader.currentPage.getValueCount()) {
+ if (!pageReader.next()) {
break;
}
} else {
- pageReadStatus.readPosInBytes = readStartInBytes + readLength;
+ pageReader.readPosInBytes = readStartInBytes + readLength;
}
}
- } while (valuesReadInCurrentPass < recordsToReadInThisPass && pageReadStatus.currentPage != null);
+ } while (valuesReadInCurrentPass < recordsToReadInThisPass && pageReader.currentPage != null);
valueVec.getMutator().setValueCount(
valuesReadInCurrentPass);
}
- protected abstract void readField(long recordsToRead, ColumnReader firstColumnStatus);
+ protected abstract void readField(long recordsToRead);
}
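NullableColumnReader above speeds up nullable fixed-width columns by scanning definition levels for runs: a (possibly empty) stretch of nulls followed by a stretch of defined values, so the defined stretch can be copied in one bulk write instead of value by value. The standalone sketch below shows only that run detection; the int[] stands in for the page's definition-level reader and NullRunSketch is an illustrative class, not Drill code.

// Standalone sketch of the run detection used by NullableColumnReader: count leading
// nulls, then the length of the following run of defined values.
public class NullRunSketch {
  static final class Run { int nullsBefore; int definedRun; }

  static Run nextRun(int[] definitionLevels, int start, int maxDefLevel) {
    Run r = new Run();
    int i = start;
    while (i < definitionLevels.length && definitionLevels[i] < maxDefLevel) {   // leading nulls
      r.nullsBefore++; i++;
    }
    while (i < definitionLevels.length && definitionLevels[i] == maxDefLevel) {  // defined run
      r.definedRun++; i++;
    }
    return r;
  }

  public static void main(String[] args) {
    int[] def = {0, 0, 1, 1, 1, 0, 1};
    Run r = nextRun(def, 0, 1);
    System.out.println(r.nullsBefore + " nulls, then " + r.definedRun + " defined values");
    // prints: 2 nulls, then 3 defined values
  }
}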
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/NullableFixedByteAlignedReaders.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/NullableFixedByteAlignedReaders.java
new file mode 100644
index 000000000..c1575dece
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/NullableFixedByteAlignedReaders.java
@@ -0,0 +1,238 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.parquet.columnreaders;
+
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.util.DecimalUtility;
+import org.apache.drill.exec.vector.NullableBigIntVector;
+import org.apache.drill.exec.vector.NullableFloat4Vector;
+import org.apache.drill.exec.vector.NullableFloat8Vector;
+import org.apache.drill.exec.vector.NullableIntVector;
+import org.apache.drill.exec.vector.ValueVector;
+import org.apache.drill.exec.expr.holders.NullableDecimal28SparseHolder;
+import org.apache.drill.exec.expr.holders.NullableDecimal38SparseHolder;
+import org.apache.drill.exec.store.ParquetOutputRecordWriter;
+import org.apache.drill.exec.vector.NullableDateVector;
+import org.apache.drill.exec.vector.NullableDecimal28SparseVector;
+import org.apache.drill.exec.vector.NullableDecimal38SparseVector;
+
+import org.joda.time.DateTimeUtils;
+import parquet.column.ColumnDescriptor;
+import parquet.format.SchemaElement;
+import parquet.hadoop.metadata.ColumnChunkMetaData;
+
+import java.math.BigDecimal;
+
+public class NullableFixedByteAlignedReaders {
+
+ static class NullableFixedByteAlignedReader extends NullableColumnReader {
+ protected byte[] bytes;
+
+ NullableFixedByteAlignedReader(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor,
+ ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, ValueVector v, SchemaElement schemaElement) throws ExecutionSetupException {
+ super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
+ }
+
+ // this method is called by its superclass during a read loop
+ @Override
+ protected void readField(long recordsToReadInThisPass) {
+ this.recordsReadInThisIteration = recordsToReadInThisPass;
+
+ // set up metadata
+ this.readStartInBytes = pageReader.readPosInBytes;
+ this.readLengthInBits = recordsReadInThisIteration * dataTypeLengthInBits;
+ this.readLength = (int) Math.ceil(readLengthInBits / 8.0);
+ this.bytes = pageReader.pageDataByteArray;
+
+ // fill in data.
+ vectorData.writeBytes(bytes, (int) readStartInBytes, (int) readLength);
+ }
+ }
+
+ static class NullableDictionaryIntReader extends NullableColumnReader<NullableIntVector> {
+
+ private byte[] bytes;
+
+ NullableDictionaryIntReader(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor,
+ ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, NullableIntVector v,
+ SchemaElement schemaElement) throws ExecutionSetupException {
+ super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
+ }
+
+ // this method is called by its superclass during a read loop
+ @Override
+ protected void readField(long recordsToReadInThisPass) {
+ if (usingDictionary) {
+ for (int i = 0; i < recordsToReadInThisPass; i++){
+ valueVec.getMutator().setSafe(valuesReadInCurrentPass + i, pageReader.dictionaryValueReader.readInteger());
+ }
+ }
+ }
+ }
+
+ static class NullableDictionaryBigIntReader extends NullableColumnReader<NullableBigIntVector> {
+
+ private byte[] bytes;
+
+ NullableDictionaryBigIntReader(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor,
+ ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, NullableBigIntVector v,
+ SchemaElement schemaElement) throws ExecutionSetupException {
+ super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
+ }
+
+ // this method is called by its superclass during a read loop
+ @Override
+ protected void readField(long recordsToReadInThisPass) {
+ for (int i = 0; i < recordsToReadInThisPass; i++){
+ valueVec.getMutator().setSafe(valuesReadInCurrentPass + i, pageReader.dictionaryValueReader.readLong());
+ }
+ }
+ }
+
+ static class NullableDictionaryFloat4Reader extends NullableColumnReader<NullableFloat4Vector> {
+
+ private byte[] bytes;
+
+ NullableDictionaryFloat4Reader(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor,
+ ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, NullableFloat4Vector v,
+ SchemaElement schemaElement) throws ExecutionSetupException {
+ super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
+ }
+
+ // this method is called by its superclass during a read loop
+ @Override
+ protected void readField(long recordsToReadInThisPass) {
+ for (int i = 0; i < recordsToReadInThisPass; i++){
+ valueVec.getMutator().setSafe(valuesReadInCurrentPass + i, pageReader.dictionaryValueReader.readFloat());
+ }
+ }
+ }
+
+ static class NullableDictionaryFloat8Reader extends NullableColumnReader<NullableFloat8Vector> {
+
+ private byte[] bytes;
+
+ NullableDictionaryFloat8Reader(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor,
+ ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, NullableFloat8Vector v,
+ SchemaElement schemaElement) throws ExecutionSetupException {
+ super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
+ }
+
+ // this method is called by its superclass during a read loop
+ @Override
+ protected void readField(long recordsToReadInThisPass) {
+ for (int i = 0; i < recordsToReadInThisPass; i++){
+ valueVec.getMutator().setSafe(valuesReadInCurrentPass + i, pageReader.dictionaryValueReader.readDouble());
+ }
+ }
+ }
+
+ static abstract class NullableConvertedReader extends NullableFixedByteAlignedReader {
+
+ protected int dataTypeLengthInBytes;
+
+ NullableConvertedReader(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor,
+ ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, ValueVector v, SchemaElement schemaElement) throws ExecutionSetupException {
+ super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
+ }
+
+ @Override
+ protected void readField(long recordsToReadInThisPass) {
+
+ this.recordsReadInThisIteration = recordsToReadInThisPass;
+
+ // set up metadata
+ this.readStartInBytes = pageReader.readPosInBytes;
+ this.readLengthInBits = recordsReadInThisIteration * dataTypeLengthInBits;
+ this.readLength = (int) Math.ceil(readLengthInBits / 8.0);
+ this.bytes = pageReader.pageDataByteArray;
+
+ dataTypeLengthInBytes = (int) Math.ceil(dataTypeLengthInBits / 8.0);
+ for (int i = 0; i < recordsReadInThisIteration; i++) {
+ addNext((int) readStartInBytes + i * dataTypeLengthInBytes, i + valuesReadInCurrentPass);
+ }
+ }
+
+ abstract void addNext(int start, int index);
+ }
+
+ public static class NullableDateReader extends NullableConvertedReader {
+
+ NullableDateVector dateVector;
+
+ NullableDateReader(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor, ColumnChunkMetaData columnChunkMetaData,
+ boolean fixedLength, ValueVector v, SchemaElement schemaElement) throws ExecutionSetupException {
+ super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
+ dateVector = (NullableDateVector) v;
+ }
+
+ @Override
+ void addNext(int start, int index) {
+ dateVector.getMutator().set(index, DateTimeUtils.fromJulianDay(readIntLittleEndian(bytes, start) - ParquetOutputRecordWriter.JULIAN_DAY_EPOC - 0.5));
+ }
+
+    // copied out of the parquet library, didn't want to deal with the unneeded throws statement they had declared
+ public static int readIntLittleEndian(byte[] in, int offset) {
+ int ch4 = in[offset] & 0xff;
+ int ch3 = in[offset + 1] & 0xff;
+ int ch2 = in[offset + 2] & 0xff;
+ int ch1 = in[offset + 3] & 0xff;
+ return ((ch1 << 24) + (ch2 << 16) + (ch3 << 8) + (ch4 << 0));
+ }
+
+ }
+
+ public static class NullableDecimal28Reader extends NullableConvertedReader {
+
+ NullableDecimal28SparseVector decimal28Vector;
+
+ NullableDecimal28Reader(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor, ColumnChunkMetaData columnChunkMetaData,
+ boolean fixedLength, ValueVector v, SchemaElement schemaElement) throws ExecutionSetupException {
+ super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
+ decimal28Vector = (NullableDecimal28SparseVector) v;
+ }
+
+ @Override
+ void addNext(int start, int index) {
+ int width = NullableDecimal28SparseHolder.WIDTH;
+ BigDecimal intermediate = DecimalUtility.getBigDecimalFromByteArray(bytes, start, dataTypeLengthInBytes, schemaElement.getScale());
+ DecimalUtility.getSparseFromBigDecimal(intermediate, decimal28Vector.getData(), index * width, schemaElement.getScale(),
+ schemaElement.getPrecision(), NullableDecimal28SparseHolder.nDecimalDigits);
+ }
+ }
+
+ public static class NullableDecimal38Reader extends NullableConvertedReader {
+
+ NullableDecimal38SparseVector decimal38Vector;
+
+ NullableDecimal38Reader(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor, ColumnChunkMetaData columnChunkMetaData,
+ boolean fixedLength, ValueVector v, SchemaElement schemaElement) throws ExecutionSetupException {
+ super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
+ decimal38Vector = (NullableDecimal38SparseVector) v;
+ }
+
+ @Override
+ void addNext(int start, int index) {
+ int width = NullableDecimal38SparseHolder.WIDTH;
+ BigDecimal intermediate = DecimalUtility.getBigDecimalFromByteArray(bytes, start, dataTypeLengthInBytes, schemaElement.getScale());
+ DecimalUtility.getSparseFromBigDecimal(intermediate, decimal38Vector.getData(), index * width, schemaElement.getScale(),
+ schemaElement.getPrecision(), NullableDecimal38SparseHolder.nDecimalDigits);
+ }
+ }
+
+} \ No newline at end of file
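
Editor's note: the fixed-width nullable readers above copy a contiguous byte range out of the page buffer, and the converted readers (date, decimal) decode individual values at computed offsets. The standalone sketch below illustrates the two pieces of arithmetic involved: the ceil(bits/8) length computation used in readField, and the little-endian int decode that NullableDateReader copies from the parquet library. The sample numbers (an INT32 column and a three-record pass) are hypothetical and only for illustration.

public class FixedWidthReadSketch {
  // mirrors NullableDateReader.readIntLittleEndian: assemble an int from 4 little-endian bytes
  static int readIntLittleEndian(byte[] in, int offset) {
    int ch4 = in[offset] & 0xff;
    int ch3 = in[offset + 1] & 0xff;
    int ch2 = in[offset + 2] & 0xff;
    int ch1 = in[offset + 3] & 0xff;
    return (ch1 << 24) + (ch2 << 16) + (ch3 << 8) + ch4;
  }

  public static void main(String[] args) {
    // hypothetical pass: 3 records of a 32-bit column
    long recordsToRead = 3;
    long dataTypeLengthInBits = 32;
    long readLengthInBits = recordsToRead * dataTypeLengthInBits;
    int readLength = (int) Math.ceil(readLengthInBits / 8.0); // 12 bytes copied in one bulk write
    System.out.println("bytes to copy: " + readLength);

    // little-endian layout of the value 1 (0x00000001)
    byte[] page = {1, 0, 0, 0};
    System.out.println("decoded: " + readIntLittleEndian(page, 0)); // prints 1
  }
}
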
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/NullableVarLengthValuesColumn.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/NullableVarLengthValuesColumn.java
new file mode 100644
index 000000000..2be9a373d
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/NullableVarLengthValuesColumn.java
@@ -0,0 +1,130 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.store.parquet.columnreaders;
+
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.exec.vector.ValueVector;
+import parquet.bytes.BytesUtils;
+import parquet.column.ColumnDescriptor;
+import parquet.format.SchemaElement;
+import parquet.hadoop.metadata.ColumnChunkMetaData;
+
+import java.io.IOException;
+
+public abstract class NullableVarLengthValuesColumn<V extends ValueVector> extends VarLengthValuesColumn<V> {
+
+ int nullsRead;
+ boolean currentValNull = false;
+
+ NullableVarLengthValuesColumn(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor,
+ ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, V v,
+ SchemaElement schemaElement) throws ExecutionSetupException {
+ super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
+ }
+
+ public abstract boolean setSafe(int index, byte[] value, int start, int length);
+
+ public abstract int capacity();
+
+ public void reset() {
+ bytesReadInCurrentPass = 0;
+ valuesReadInCurrentPass = 0;
+ nullsRead = 0;
+ pageReader.valuesReadyToRead = 0;
+ }
+
+ protected void postPageRead() {
+ currLengthDeterminingDictVal = null;
+ pageReader.valuesReadyToRead = 0;
+ }
+
+ protected boolean readAndStoreValueSizeInformation() throws IOException {
+    // we need to read all of the lengths to determine if this value will fit in the current vector.
+    // Because we can only read each definition level once, we store the last one, as we will need it
+    // at the start of the next read if, after reading all of the varlength values in this record,
+    // we decide that it will not fit in this batch
+ currentValNull = false;
+ if ( currDefLevel == -1 ) {
+ currDefLevel = pageReader.definitionLevels.readInteger();
+ }
+ if ( columnDescriptor.getMaxDefinitionLevel() > currDefLevel){
+ nullsRead++;
+      // set a length of zero; each index in the vector defaults to null, so there is no need to set the nullability
+ variableWidthVector.getMutator().setValueLengthSafe(
+ valuesReadInCurrentPass + pageReader.valuesReadyToRead, 0);
+ currentValNull = true;
+ return false;// field is null, no length to add to data vector
+ }
+
+ if (usingDictionary) {
+ if (currLengthDeterminingDictVal == null) {
+ currLengthDeterminingDictVal = pageReader.dictionaryLengthDeterminingReader.readBytes();
+ }
+ currDictValToWrite = currLengthDeterminingDictVal;
+ // re-purposing this field here for length in BYTES to prevent repetitive multiplication/division
+ dataTypeLengthInBits = currLengthDeterminingDictVal.length();
+ }
+ else {
+ // re-purposing this field here for length in BYTES to prevent repetitive multiplication/division
+ dataTypeLengthInBits = BytesUtils.readIntLittleEndian(pageReader.pageDataByteArray,
+ (int) pageReader.readyToReadPosInBytes);
+ }
+    // I think this also needs to happen when the value is null, to preserve random access
+ if (! variableWidthVector.getMutator().setValueLengthSafe((int) valuesReadInCurrentPass + pageReader.valuesReadyToRead, dataTypeLengthInBits)) {
+ return true;
+ }
+ boolean success = setSafe(valuesReadInCurrentPass + pageReader.valuesReadyToRead, pageReader.pageDataByteArray,
+ (int) pageReader.readyToReadPosInBytes + 4, dataTypeLengthInBits);
+ assert success;
+ return false;
+ }
+
+ public void updateReadyToReadPosition() {
+ if (! currentValNull){
+ pageReader.readyToReadPosInBytes += dataTypeLengthInBits + 4;
+ }
+ pageReader.valuesReadyToRead++;
+ currLengthDeterminingDictVal = null;
+ }
+
+ public void updatePosition() {
+ if (! currentValNull){
+ pageReader.readPosInBytes += dataTypeLengthInBits + 4;
+ bytesReadInCurrentPass += dataTypeLengthInBits;
+ }
+ currentValNull = false;
+ valuesReadInCurrentPass++;
+ }
+
+ @Override
+ protected void readField(long recordsToRead) {
+ if (usingDictionary) {
+ currDictValToWrite = pageReader.dictionaryValueReader.readBytes();
+ // re-purposing this field here for length in BYTES to prevent repetitive multiplication/division
+ }
+ dataTypeLengthInBits = variableWidthVector.getAccessor().getValueLength(valuesReadInCurrentPass);
+ currentValNull = variableWidthVector.getAccessor().getObject(valuesReadInCurrentPass) == null;
+    // again, I am re-purposing the unused field here; it is a length in BYTES, not bits
+ if (! currentValNull){
+ boolean success = setSafe(valuesReadInCurrentPass, pageReader.pageDataByteArray,
+ (int) pageReader.readPosInBytes + 4, dataTypeLengthInBits);
+ assert success;
+ }
+ updatePosition();
+ }
+}
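
Editor's note: the nullable variable-length column above makes two passes over the same page data. readAndStoreValueSizeInformation walks the 4-byte little-endian length prefixes to decide how many values will fit, and readField later copies the value bytes, with the two cursors (readyToReadPosInBytes and readPosInBytes) advancing independently. A minimal sketch of that layout and the two passes, assuming plain (non-dictionary) encoding and a hypothetical page holding the strings "ab" and "xyz":

import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.charset.StandardCharsets;

public class VarLenTwoPassSketch {
  public static void main(String[] args) {
    // plain-encoded BYTE_ARRAY page: [len][bytes][len][bytes], lengths are 4-byte little-endian
    ByteBuffer buf = ByteBuffer.allocate(4 + 2 + 4 + 3).order(ByteOrder.LITTLE_ENDIAN);
    buf.putInt(2).put("ab".getBytes(StandardCharsets.UTF_8));
    buf.putInt(3).put("xyz".getBytes(StandardCharsets.UTF_8));
    byte[] page = buf.array();

    // pass 1: walk the length prefixes only (readyToReadPosInBytes in the patch)
    long readyToReadPos = 0;
    int valuesReadyToRead = 0;
    while (readyToReadPos < page.length) {
      int len = ByteBuffer.wrap(page, (int) readyToReadPos, 4).order(ByteOrder.LITTLE_ENDIAN).getInt();
      readyToReadPos += 4 + len; // skip the prefix and the value it describes
      valuesReadyToRead++;
    }

    // pass 2: copy out the values that were counted (readPosInBytes in the patch)
    long readPos = 0;
    for (int i = 0; i < valuesReadyToRead; i++) {
      int len = ByteBuffer.wrap(page, (int) readPos, 4).order(ByteOrder.LITTLE_ENDIAN).getInt();
      String value = new String(page, (int) readPos + 4, len, StandardCharsets.UTF_8);
      System.out.println("value " + i + ": " + value);
      readPos += 4 + len;
    }
  }
}
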
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/PageReadStatus.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/PageReader.java
index 3ad1d6c79..1d300bb62 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/PageReadStatus.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/PageReader.java
@@ -15,23 +15,19 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.drill.exec.store.parquet;
+package org.apache.drill.exec.store.parquet.columnreaders;
import java.io.IOException;
-import java.util.ArrayList;
-import com.google.common.base.Preconditions;
import org.apache.drill.common.exceptions.ExecutionSetupException;
-import org.apache.drill.exec.proto.SchemaDefProtos;
-import org.apache.drill.exec.record.MaterializedField;
-import org.apache.drill.exec.vector.VarBinaryVector;
+import org.apache.drill.exec.store.parquet.ColumnDataReader;
+import org.apache.drill.exec.store.parquet.ParquetFormatPlugin;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import parquet.bytes.BytesInput;
import parquet.column.Dictionary;
-import parquet.column.Encoding;
import parquet.column.ValuesType;
import parquet.column.page.DictionaryPage;
import parquet.column.page.Page;
@@ -44,8 +40,8 @@ import parquet.hadoop.metadata.ColumnChunkMetaData;
import parquet.schema.PrimitiveType;
// class to keep track of the read position of variable length columns
-final class PageReadStatus {
- static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(PageReadStatus.class);
+final class PageReader {
+ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(PageReader.class);
private final ColumnReader parentColumnReader;
private final ColumnDataReader dataReader;
@@ -53,6 +49,10 @@ final class PageReadStatus {
Page currentPage;
// buffer to store bytes of current page
byte[] pageDataByteArray;
+
+ // for variable length data we need to keep track of our current position in the page data
+ // as the values and lengths are intermixed, making random access to the length data impossible
+ long readyToReadPosInBytes;
// read position in the current page, stored in the ByteBuf in ParquetRecordReader called bufferWithAllData
long readPosInBytes;
// bit shift needed for the next page if the last one did not line up with a byte boundary
@@ -60,16 +60,28 @@ final class PageReadStatus {
// storage space for extra bits at the end of a page if they did not line up with a byte boundary
// prevents the need to keep the entire last page, as these pageDataByteArray need to be added to the next batch
//byte extraBits;
+
+ // used for columns where the number of values that will fit in a vector is unknown
+ // currently used for variable length
+ // TODO - reuse this when compressed vectors are added, where fixed length values will take up a
+ // variable amount of space
+ // For example: if nulls are stored without extra space left in the data vector
+ // (this is currently simplifying random access to the data during processing, but increases the size of the vectors)
+ int valuesReadyToRead;
+
// the number of values read out of the last page
int valuesRead;
int byteLength;
//int rowGroupIndex;
ValuesReader definitionLevels;
+ ValuesReader repetitionLevels;
ValuesReader valueReader;
+ ValuesReader dictionaryLengthDeterminingReader;
+ ValuesReader dictionaryValueReader;
Dictionary dictionary;
PageHeader pageHeader = null;
- PageReadStatus(ColumnReader parentStatus, FileSystem fs, Path path, ColumnChunkMetaData columnChunkMetaData) throws ExecutionSetupException{
+ PageReader(ColumnReader parentStatus, FileSystem fs, Path path, ColumnChunkMetaData columnChunkMetaData) throws ExecutionSetupException{
this.parentColumnReader = parentStatus;
long totalByteLength = columnChunkMetaData.getTotalUncompressedSize();
@@ -111,6 +123,7 @@ final class PageReadStatus {
currentPage = null;
valuesRead = 0;
+ valuesReadyToRead = 0;
// TODO - the metatdata for total size appears to be incorrect for impala generated files, need to find cause
// and submit a bug report
@@ -124,7 +137,6 @@ final class PageReadStatus {
do {
pageHeader = dataReader.readPageHeader();
if (pageHeader.getType() == PageType.DICTIONARY_PAGE) {
- System.out.println(pageHeader.dictionary_page_header.getEncoding());
BytesInput bytesIn = parentColumnReader.parentReader.getCodecFactoryExposer()
.decompress( //
dataReader.getPageAsBytesInput(pageHeader.compressed_page_size), //
@@ -163,25 +175,47 @@ final class PageReadStatus {
pageDataByteArray = currentPage.getBytes().toByteArray();
readPosInBytes = 0;
+ if (parentColumnReader.getColumnDescriptor().getMaxRepetitionLevel() > 0) {
+ repetitionLevels = currentPage.getRlEncoding().getValuesReader(parentColumnReader.columnDescriptor, ValuesType.REPETITION_LEVEL);
+ repetitionLevels.initFromPage(currentPage.getValueCount(), pageDataByteArray, (int) readPosInBytes);
+      // we know that the first value will be a 0; at the end of each list of repeated values we will hit another 0, indicating
+      // a new record, although we don't know its length until we hit it (this is a one-way stream of integers). So we
+      // read the first zero here to simplify the reading process, and start reading the first value the same way as all
+      // of the rest. Effectively we are 'reading' the non-existent value in front of the first list, allowing direct access to
+      // the first list of repetition levels
+ readPosInBytes = repetitionLevels.getNextOffset();
+ repetitionLevels.readInteger();
+ }
if (parentColumnReader.columnDescriptor.getMaxDefinitionLevel() != 0){
parentColumnReader.currDefLevel = -1;
if (!currentPage.getValueEncoding().usesDictionary()) {
+ parentColumnReader.usingDictionary = false;
definitionLevels = currentPage.getDlEncoding().getValuesReader(parentColumnReader.columnDescriptor, ValuesType.DEFINITION_LEVEL);
- definitionLevels.initFromPage(currentPage.getValueCount(), pageDataByteArray, 0);
+ definitionLevels.initFromPage(currentPage.getValueCount(), pageDataByteArray, (int) readPosInBytes);
readPosInBytes = definitionLevels.getNextOffset();
if (parentColumnReader.columnDescriptor.getType() == PrimitiveType.PrimitiveTypeName.BOOLEAN) {
valueReader = currentPage.getValueEncoding().getValuesReader(parentColumnReader.columnDescriptor, ValuesType.VALUES);
valueReader.initFromPage(currentPage.getValueCount(), pageDataByteArray, (int) readPosInBytes);
}
} else {
+ parentColumnReader.usingDictionary = true;
definitionLevels = currentPage.getDlEncoding().getValuesReader(parentColumnReader.columnDescriptor, ValuesType.DEFINITION_LEVEL);
- definitionLevels.initFromPage(currentPage.getValueCount(), pageDataByteArray, 0);
+ definitionLevels.initFromPage(currentPage.getValueCount(), pageDataByteArray, (int) readPosInBytes);
readPosInBytes = definitionLevels.getNextOffset();
- valueReader = new DictionaryValuesReader(dictionary);
- valueReader.initFromPage(currentPage.getValueCount(), pageDataByteArray, (int) readPosInBytes);
+      // initialize two dictionary readers: one for determining the lengths of each value, the second for
+      // actually copying the values out into the vectors
+ dictionaryLengthDeterminingReader = new DictionaryValuesReader(dictionary);
+ dictionaryLengthDeterminingReader.initFromPage(currentPage.getValueCount(), pageDataByteArray, (int) readPosInBytes);
+ dictionaryValueReader = new DictionaryValuesReader(dictionary);
+ dictionaryValueReader.initFromPage(currentPage.getValueCount(), pageDataByteArray, (int) readPosInBytes);
this.parentColumnReader.usingDictionary = true;
}
}
+ // readPosInBytes is used for actually reading the values after we determine how many will fit in the vector
+ // readyToReadPosInBytes serves a similar purpose for the vector types where we must count up the values that will
+ // fit one record at a time, such as for variable length data. Both operations must start in the same location after the
+ // definition and repetition level data which is stored alongside the page data itself
+ readyToReadPosInBytes = readPosInBytes;
return true;
}
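
Editor's note: the repetition-level handling above relies on one non-obvious trick — the first repetition level in a page is always 0, so the reader consumes it up front and then treats every subsequent 0 as "a new record starts here". The conceptual sketch below uses a plain int array in place of the parquet ValuesReader API (an illustrative stand-in, not the real interface) to show how that turns the one-way stream of levels into per-record list lengths:

public class RepetitionLevelSketch {
  public static void main(String[] args) {
    // hypothetical repetition levels for 3 records of a repeated column:
    // record 0 has 2 values, record 1 has 3 values, record 2 has 1 value
    int[] levels = {0, 1, 0, 1, 1, 0};

    int pos = 0;
    pos++; // consume the leading 0, as PageReader does right after initializing the reader

    int currentRecordLength = 1; // the value whose level was just consumed
    while (pos < levels.length) {
      if (levels[pos] == 0) { // a 0 marks the start of the next record
        System.out.println("record length: " + currentRecordLength);
        currentRecordLength = 1;
      } else {
        currentRecordLength++;
      }
      pos++;
    }
    System.out.println("record length: " + currentRecordLength); // the last record has no trailing 0
  }
}
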
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFixedWidthDictionaryReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetFixedWidthDictionaryReader.java
index c0720a9fb..ad849b4d0 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFixedWidthDictionaryReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetFixedWidthDictionaryReader.java
@@ -15,13 +15,12 @@
* See the License for the specific language governing permissions and
* limitations under the License.
******************************************************************************/
-package org.apache.drill.exec.store.parquet;
+package org.apache.drill.exec.store.parquet.columnreaders;
import org.apache.drill.common.exceptions.ExecutionSetupException;
import org.apache.drill.exec.vector.BigIntVector;
import org.apache.drill.exec.vector.ValueVector;
import parquet.column.ColumnDescriptor;
-import parquet.format.ConvertedType;
import parquet.format.SchemaElement;
import parquet.hadoop.metadata.ColumnChunkMetaData;
import parquet.schema.PrimitiveType;
@@ -35,18 +34,18 @@ public class ParquetFixedWidthDictionaryReader extends ColumnReader{
}
@Override
- public void readField(long recordsToReadInThisPass, ColumnReader firstColumnStatus) {
+ public void readField(long recordsToReadInThisPass) {
- recordsReadInThisIteration = Math.min(pageReadStatus.currentPage.getValueCount()
- - pageReadStatus.valuesRead, recordsToReadInThisPass - valuesReadInCurrentPass);
+ recordsReadInThisIteration = Math.min(pageReader.currentPage.getValueCount()
+ - pageReader.valuesRead, recordsToReadInThisPass - valuesReadInCurrentPass);
int defLevel;
for (int i = 0; i < recordsReadInThisIteration; i++){
- defLevel = pageReadStatus.definitionLevels.readInteger();
+ defLevel = pageReader.definitionLevels.readInteger();
// if the value is defined
if (defLevel == columnDescriptor.getMaxDefinitionLevel()){
if (columnDescriptor.getType() == PrimitiveType.PrimitiveTypeName.INT64)
((BigIntVector)valueVec).getMutator().set(i + valuesReadInCurrentPass,
- pageReadStatus.valueReader.readLong() );
+ pageReader.valueReader.readLong() );
}
// otherwise the value is skipped, because the bit vector indicating nullability is zero filled
}
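
Editor's note: the dictionary reader above skips a value entirely when its definition level is below the column's maximum, relying on the zero-filled nullability bits to mark the slot null. A small sketch of that pattern, with a plain long array standing in for the BigIntVector and a boolean array for the nullability bits (both hypothetical stand-ins, not Drill vector APIs):

public class DefinitionLevelSketch {
  public static void main(String[] args) {
    int maxDefinitionLevel = 1;
    int[] definitionLevels = {1, 0, 1}; // the second value is null
    long[] decodedValues = {10L, 20L};  // only defined values appear in the data stream

    long[] vector = new long[definitionLevels.length];
    boolean[] isSet = new boolean[definitionLevels.length]; // defaults to false == null

    int valueIndex = 0;
    for (int i = 0; i < definitionLevels.length; i++) {
      if (definitionLevels[i] == maxDefinitionLevel) {
        vector[i] = decodedValues[valueIndex++];
        isSet[i] = true;
      }
      // otherwise skip: the slot keeps its default value and the nullability bit stays unset
    }
    for (int i = 0; i < vector.length; i++) {
      System.out.println(i + ": " + (isSet[i] ? vector[i] : "null"));
    }
  }
}
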
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java
new file mode 100644
index 000000000..2228787c8
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java
@@ -0,0 +1,361 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.parquet.columnreaders;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+
+import org.apache.drill.common.exceptions.DrillRuntimeException;
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.common.types.TypeProtos;
+import org.apache.drill.common.types.TypeProtos.DataMode;
+import org.apache.drill.common.types.TypeProtos.MajorType;
+import org.apache.drill.exec.exception.SchemaChangeException;
+import org.apache.drill.exec.expr.TypeHelper;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.physical.impl.OutputMutator;
+import org.apache.drill.exec.record.MaterializedField;
+import org.apache.drill.exec.store.RecordReader;
+import org.apache.drill.exec.vector.ValueVector;
+import org.apache.drill.exec.vector.RepeatedFixedWidthVector;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import parquet.column.ColumnDescriptor;
+import parquet.format.ConvertedType;
+import parquet.format.FileMetaData;
+import parquet.format.SchemaElement;
+import parquet.format.converter.ParquetMetadataConverter;
+import parquet.hadoop.CodecFactoryExposer;
+import parquet.hadoop.ParquetFileWriter;
+import parquet.hadoop.metadata.ColumnChunkMetaData;
+import parquet.hadoop.metadata.ParquetMetadata;
+import parquet.schema.PrimitiveType;
+
+public class ParquetRecordReader implements RecordReader {
+ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ParquetRecordReader.class);
+
+ // this value has been inflated to read in multiple value vectors at once, and then break them up into smaller vectors
+ private static final int NUMBER_OF_VECTORS = 1;
+ private static final long DEFAULT_BATCH_LENGTH = 256 * 1024 * NUMBER_OF_VECTORS; // 256kb
+  private static final long DEFAULT_BATCH_LENGTH_IN_BITS = DEFAULT_BATCH_LENGTH * 8; // 256kb, expressed in bits
+ private static final char DEFAULT_RECORDS_TO_READ_IF_NOT_FIXED_WIDTH = 32*1024;
+
+ // TODO - should probably find a smarter way to set this, currently 1 megabyte
+ private static final int VAR_LEN_FIELD_LENGTH = 1024 * 1024 * 1;
+ public static final int PARQUET_PAGE_MAX_SIZE = 1024 * 1024 * 1;
+ private static final String SEPERATOR = System.getProperty("file.separator");
+
+ // used for clearing the last n bits of a byte
+ public static final byte[] endBitMasks = {-2, -4, -8, -16, -32, -64, -128};
+ // used for clearing the first n bits of a byte
+ public static final byte[] startBitMasks = {127, 63, 31, 15, 7, 3, 1};
+
+ private int bitWidthAllFixedFields;
+ private boolean allFieldsFixedLength;
+ private int recordsPerBatch;
+ private long totalRecords;
+ private long rowGroupOffset;
+
+ private List<ColumnReader> columnStatuses;
+ private FileSystem fileSystem;
+ private long batchSize;
+ Path hadoopPath;
+ private VarLenBinaryReader varLengthReader;
+ private ParquetMetadata footer;
+ private List<SchemaPath> columns;
+ private final CodecFactoryExposer codecFactoryExposer;
+ int rowGroupIndex;
+
+ public ParquetRecordReader(FragmentContext fragmentContext, //
+ String path, //
+ int rowGroupIndex, //
+ FileSystem fs, //
+ CodecFactoryExposer codecFactoryExposer, //
+ ParquetMetadata footer, //
+ List<SchemaPath> columns) throws ExecutionSetupException {
+ this(fragmentContext, DEFAULT_BATCH_LENGTH_IN_BITS, path, rowGroupIndex, fs, codecFactoryExposer, footer,
+ columns);
+ }
+
+ public ParquetRecordReader(FragmentContext fragmentContext, long batchSize,
+ String path, int rowGroupIndex, FileSystem fs,
+ CodecFactoryExposer codecFactoryExposer, ParquetMetadata footer,
+ List<SchemaPath> columns) throws ExecutionSetupException {
+ hadoopPath = new Path(path);
+ fileSystem = fs;
+ this.codecFactoryExposer = codecFactoryExposer;
+ this.rowGroupIndex = rowGroupIndex;
+ this.batchSize = batchSize;
+ this.footer = footer;
+ this.columns = columns;
+ }
+
+ public CodecFactoryExposer getCodecFactoryExposer() {
+ return codecFactoryExposer;
+ }
+
+ public Path getHadoopPath() {
+ return hadoopPath;
+ }
+
+ public FileSystem getFileSystem() {
+ return fileSystem;
+ }
+
+ public int getRowGroupIndex() {
+ return rowGroupIndex;
+ }
+
+ public int getBitWidthAllFixedFields() {
+ return bitWidthAllFixedFields;
+ }
+
+ public long getBatchSize() {
+ return batchSize;
+ }
+
+ /**
+ * @param type a fixed length type from the parquet library enum
+   * @return the length of the type in bits
+ */
+ public static int getTypeLengthInBits(PrimitiveType.PrimitiveTypeName type) {
+ switch (type) {
+ case INT64: return 64;
+ case INT32: return 32;
+ case BOOLEAN: return 1;
+ case FLOAT: return 32;
+ case DOUBLE: return 64;
+ case INT96: return 96;
+ // binary and fixed length byte array
+ default:
+ throw new IllegalStateException("Length cannot be determined for type " + type);
+ }
+ }
+
+ private boolean fieldSelected(MaterializedField field){
+ // TODO - not sure if this is how we want to represent this
+ // for now it makes the existing tests pass, simply selecting
+ // all available data if no columns are provided
+ if (this.columns != null){
+ for (SchemaPath expr : this.columns){
+ if ( field.matches(expr)){
+ return true;
+ }
+ }
+ return false;
+ }
+ return true;
+ }
+
+ @Override
+ public void setup(OutputMutator output) throws ExecutionSetupException {
+
+ columnStatuses = new ArrayList<>();
+ totalRecords = footer.getBlocks().get(rowGroupIndex).getRowCount();
+ List<ColumnDescriptor> columns = footer.getFileMetaData().getSchema().getColumns();
+ allFieldsFixedLength = true;
+ ColumnDescriptor column;
+ ColumnChunkMetaData columnChunkMetaData;
+ int columnsToScan = 0;
+
+ MaterializedField field;
+ ParquetMetadataConverter metaConverter = new ParquetMetadataConverter();
+ FileMetaData fileMetaData;
+
+    // TODO - figure out how to deal with this better once we add nested reading; also note where this map is used below
+ // store a map from column name to converted types if they are non-null
+ HashMap<String, SchemaElement> schemaElements = new HashMap<>();
+ fileMetaData = new ParquetMetadataConverter().toParquetMetadata(ParquetFileWriter.CURRENT_VERSION, footer);
+ for (SchemaElement se : fileMetaData.getSchema()) {
+ schemaElements.put(se.getName(), se);
+ }
+
+ // loop to add up the length of the fixed width columns and build the schema
+ for (int i = 0; i < columns.size(); ++i) {
+ column = columns.get(i);
+ logger.debug("name: " + fileMetaData.getSchema().get(i).name);
+ SchemaElement se = schemaElements.get(column.getPath()[0]);
+ MajorType mt = ParquetToDrillTypeConverter.toMajorType(column.getType(), se.getType_length(), getDataMode(column), se);
+ field = MaterializedField.create(toFieldName(column.getPath()),mt);
+ if ( ! fieldSelected(field)){
+ continue;
+ }
+ columnsToScan++;
+ // sum the lengths of all of the fixed length fields
+ if (column.getType() != PrimitiveType.PrimitiveTypeName.BINARY) {
+ if (column.getMaxRepetitionLevel() > 0) {
+ allFieldsFixedLength = false;
+ }
+        // There is no support for the fixed binary type yet in parquet, leaving a task here as a reminder
+ // TODO - implement this when the feature is added upstream
+ if (column.getType() == PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY){
+ bitWidthAllFixedFields += se.getType_length() * 8;
+ } else {
+ bitWidthAllFixedFields += getTypeLengthInBits(column.getType());
+ }
+ } else {
+ allFieldsFixedLength = false;
+ }
+ }
+ rowGroupOffset = footer.getBlocks().get(rowGroupIndex).getColumns().get(0).getFirstDataPageOffset();
+
+    // none of the columns in the parquet file matched the requested columns from the query
+ if (columnsToScan == 0){
+ throw new ExecutionSetupException("Error reading from parquet file. No columns requested were found in the file.");
+ }
+ if (allFieldsFixedLength) {
+ recordsPerBatch = (int) Math.min(Math.min(batchSize / bitWidthAllFixedFields,
+ footer.getBlocks().get(0).getColumns().get(0).getValueCount()), 65535);
+ }
+ else {
+ recordsPerBatch = DEFAULT_RECORDS_TO_READ_IF_NOT_FIXED_WIDTH;
+ }
+
+ try {
+ ValueVector v;
+ ConvertedType convertedType;
+ SchemaElement schemaElement;
+ ArrayList<VarLengthColumn> varLengthColumns = new ArrayList<>();
+ // initialize all of the column read status objects
+ boolean fieldFixedLength = false;
+ for (int i = 0; i < columns.size(); ++i) {
+ column = columns.get(i);
+ columnChunkMetaData = footer.getBlocks().get(rowGroupIndex).getColumns().get(i);
+ schemaElement = schemaElements.get(column.getPath()[0]);
+ convertedType = schemaElement.getConverted_type();
+ MajorType type = ParquetToDrillTypeConverter.toMajorType(column.getType(), schemaElement.getType_length(), getDataMode(column), schemaElement);
+ field = MaterializedField.create(toFieldName(column.getPath()), type);
+ // the field was not requested to be read
+ if ( ! fieldSelected(field)) continue;
+
+ fieldFixedLength = column.getType() != PrimitiveType.PrimitiveTypeName.BINARY;
+ v = output.addField(field, (Class<? extends ValueVector>) TypeHelper.getValueVectorClass(type.getMinorType(), type.getMode()));
+ if (column.getType() != PrimitiveType.PrimitiveTypeName.BINARY) {
+ if (column.getMaxRepetitionLevel() > 0) {
+ ColumnReader dataReader = ColumnReaderFactory.createFixedColumnReader(this, fieldFixedLength, column, columnChunkMetaData, recordsPerBatch,
+ ((RepeatedFixedWidthVector) v).getMutator().getDataVector(), schemaElement);
+ varLengthColumns.add(new FixedWidthRepeatedReader(this, dataReader,
+ getTypeLengthInBits(column.getType()), -1, column, columnChunkMetaData, false, v, schemaElement));
+ }
+ else {
+ columnStatuses.add(ColumnReaderFactory.createFixedColumnReader(this, fieldFixedLength, column, columnChunkMetaData, recordsPerBatch, v,
+ schemaElement));
+ }
+ } else {
+ // create a reader and add it to the appropriate list
+ varLengthColumns.add(ColumnReaderFactory.getReader(this, -1, column, columnChunkMetaData, false, v, schemaElement));
+ }
+ }
+ varLengthReader = new VarLenBinaryReader(this, varLengthColumns);
+ } catch (SchemaChangeException e) {
+ throw new ExecutionSetupException(e);
+ } catch (Exception e) {
+ throw new ExecutionSetupException(e);
+ }
+ }
+
+ private SchemaPath toFieldName(String[] paths) {
+ return SchemaPath.getCompoundPath(paths);
+ }
+
+ private TypeProtos.DataMode getDataMode(ColumnDescriptor column) {
+ if (column.getMaxRepetitionLevel() > 0 ) {
+ return DataMode.REPEATED;
+ } else if (column.getMaxDefinitionLevel() == 0) {
+ return TypeProtos.DataMode.REQUIRED;
+ } else {
+ return TypeProtos.DataMode.OPTIONAL;
+ }
+ }
+
+ private void resetBatch() {
+ for (ColumnReader column : columnStatuses) {
+ column.valuesReadInCurrentPass = 0;
+ }
+ for (VarLengthColumn r : varLengthReader.columns){
+ r.valuesReadInCurrentPass = 0;
+ }
+ }
+
+ public void readAllFixedFields(long recordsToRead) throws IOException {
+
+ for (ColumnReader crs : columnStatuses){
+ crs.processPages(recordsToRead);
+ }
+ }
+
+ @Override
+ public int next() {
+ resetBatch();
+ long recordsToRead = 0;
+ try {
+ ColumnReader firstColumnStatus;
+ if (columnStatuses.size() > 0){
+ firstColumnStatus = columnStatuses.iterator().next();
+ }
+ else{
+ if (varLengthReader.columns.size() > 0){
+ firstColumnStatus = varLengthReader.columns.iterator().next();
+ }
+ else{
+ firstColumnStatus = null;
+ }
+ }
+ // TODO - replace this with new functionality of returning batches even if no columns are selected
+ // the query 'select 5 from parquetfile' should return the number of records that the parquet file contains
+ // we don't need to read any of the data, we just need to fill batches with a record count and a useless vector with
+ // the right number of values
+ if (firstColumnStatus == null) throw new DrillRuntimeException("Unexpected error reading parquet file, not reading any columns");
+
+ if (allFieldsFixedLength) {
+ recordsToRead = Math.min(recordsPerBatch, firstColumnStatus.columnChunkMetaData.getValueCount() - firstColumnStatus.totalValuesRead);
+ } else {
+ recordsToRead = DEFAULT_RECORDS_TO_READ_IF_NOT_FIXED_WIDTH;
+
+ }
+
+ if (allFieldsFixedLength) {
+ readAllFixedFields(recordsToRead);
+ } else { // variable length columns
+ long fixedRecordsToRead = varLengthReader.readFields(recordsToRead, firstColumnStatus);
+ readAllFixedFields(fixedRecordsToRead);
+ }
+
+ return firstColumnStatus.getRecordsReadInCurrentPass();
+ } catch (IOException e) {
+ throw new DrillRuntimeException(e);
+ }
+ }
+
+ @Override
+ public void cleanup() {
+ for (ColumnReader column : columnStatuses) {
+ column.clear();
+ }
+ columnStatuses.clear();
+
+ for (VarLengthColumn r : varLengthReader.columns){
+ r.clear();
+ }
+ varLengthReader.columns.clear();
+ }
+}
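
Editor's note: for an all-fixed-width row group, setup() above sizes each batch from the batch budget (expressed in bits) and the summed bit width of the selected columns, then caps the result at the row-group value count and at 65535. A worked sketch of that arithmetic with hypothetical numbers (a 256 KB batch and three columns totalling 97 bits per record; these figures are not taken from the patch):

public class RecordsPerBatchSketch {
  public static void main(String[] args) {
    long batchSizeInBits = 256L * 1024 * 8;   // matches DEFAULT_BATCH_LENGTH_IN_BITS
    int bitWidthAllFixedFields = 64 + 32 + 1; // e.g. BIGINT + INT + BIT
    long valueCountInRowGroup = 1_000_000;

    int recordsPerBatch = (int) Math.min(
        Math.min(batchSizeInBits / bitWidthAllFixedFields, valueCountInRowGroup),
        65535);
    System.out.println("records per batch: " + recordsPerBatch); // 21620 for these numbers
  }
}
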
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetToDrillTypeConverter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetToDrillTypeConverter.java
new file mode 100644
index 000000000..7eeeeaa26
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetToDrillTypeConverter.java
@@ -0,0 +1,236 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.store.parquet.columnreaders;
+
+import org.apache.drill.common.types.TypeProtos;
+import static org.apache.drill.common.types.TypeProtos.MinorType;
+import static org.apache.drill.common.types.TypeProtos.DataMode;
+import static parquet.Preconditions.checkArgument;
+
+import org.apache.drill.common.types.Types;
+import parquet.format.ConvertedType;
+import parquet.format.SchemaElement;
+import parquet.schema.PrimitiveType;
+
+public class ParquetToDrillTypeConverter {
+
+ private static TypeProtos.MinorType getDecimalType(SchemaElement schemaElement) {
+ return schemaElement.getPrecision() <= 28 ? TypeProtos.MinorType.DECIMAL28SPARSE : MinorType.DECIMAL38SPARSE;
+ }
+
+ public static TypeProtos.MajorType toMajorType(PrimitiveType.PrimitiveTypeName primitiveTypeName, int length,
+ TypeProtos.DataMode mode, SchemaElement schemaElement) {
+ ConvertedType convertedType = schemaElement.getConverted_type();
+ switch (mode) {
+
+ case OPTIONAL:
+ switch (primitiveTypeName) {
+ case BINARY:
+ if (convertedType == null) {
+ return Types.optional(TypeProtos.MinorType.VARBINARY);
+ }
+ switch (convertedType) {
+ case UTF8:
+ return Types.optional(TypeProtos.MinorType.VARCHAR);
+ case DECIMAL:
+ return Types.withScaleAndPrecision(getDecimalType(schemaElement), TypeProtos.DataMode.OPTIONAL, schemaElement.getScale(), schemaElement.getPrecision());
+ default:
+ throw new UnsupportedOperationException(String.format("unsupported type: %s %s", primitiveTypeName, convertedType));
+ }
+ case INT64:
+ if (convertedType == null) {
+ return Types.optional(TypeProtos.MinorType.BIGINT);
+ }
+ switch(convertedType) {
+ case DECIMAL:
+ return Types.withScaleAndPrecision(TypeProtos.MinorType.DECIMAL18, DataMode.OPTIONAL, schemaElement.getScale(), schemaElement.getPrecision());
+ case FINETIME:
+ throw new UnsupportedOperationException();
+ case TIMESTAMP:
+ return Types.optional(MinorType.TIMESTAMP);
+ default:
+ throw new UnsupportedOperationException(String.format("unsupported type: %s %s", primitiveTypeName, convertedType));
+ }
+ case INT32:
+ if (convertedType == null) {
+ return Types.optional(TypeProtos.MinorType.INT);
+ }
+ switch(convertedType) {
+ case DECIMAL:
+ return Types.withScaleAndPrecision(MinorType.DECIMAL9, DataMode.OPTIONAL, schemaElement.getScale(), schemaElement.getPrecision());
+ case DATE:
+ return Types.optional(MinorType.DATE);
+ case TIME:
+ return Types.optional(MinorType.TIME);
+ default:
+ throw new UnsupportedOperationException(String.format("unsupported type: %s %s", primitiveTypeName, convertedType));
+ }
+ case BOOLEAN:
+ return Types.optional(TypeProtos.MinorType.BIT);
+ case FLOAT:
+ return Types.optional(TypeProtos.MinorType.FLOAT4);
+ case DOUBLE:
+ return Types.optional(TypeProtos.MinorType.FLOAT8);
+ // TODO - Both of these are not supported by the parquet library yet (7/3/13),
+ // but they are declared here for when they are implemented
+ case INT96:
+ return TypeProtos.MajorType.newBuilder().setMinorType(TypeProtos.MinorType.FIXEDBINARY).setWidth(12)
+ .setMode(mode).build();
+ case FIXED_LEN_BYTE_ARRAY:
+ if (convertedType == null) {
+ checkArgument(length > 0, "A length greater than zero must be provided for a FixedBinary type.");
+ return TypeProtos.MajorType.newBuilder().setMinorType(TypeProtos.MinorType.FIXEDBINARY)
+ .setWidth(length).setMode(mode).build();
+ } else if (convertedType == ConvertedType.DECIMAL) {
+ return Types.withScaleAndPrecision(getDecimalType(schemaElement), DataMode.OPTIONAL, schemaElement.getScale(), schemaElement.getPrecision());
+ }
+ default:
+ throw new UnsupportedOperationException("Type not supported: " + primitiveTypeName);
+ }
+ case REQUIRED:
+ switch (primitiveTypeName) {
+ case BINARY:
+ if (convertedType == null) {
+ return Types.required(TypeProtos.MinorType.VARBINARY);
+ }
+ switch (convertedType) {
+ case UTF8:
+ return Types.required(MinorType.VARCHAR);
+ case DECIMAL:
+ return Types.withScaleAndPrecision(getDecimalType(schemaElement), DataMode.REQUIRED, schemaElement.getScale(), schemaElement.getPrecision());
+ default:
+ throw new UnsupportedOperationException(String.format("unsupported type: %s %s", primitiveTypeName, convertedType));
+ }
+ case INT64:
+ if (convertedType == null) {
+ return Types.required(MinorType.BIGINT);
+ }
+ switch(convertedType) {
+ case DECIMAL:
+ return Types.withScaleAndPrecision(MinorType.DECIMAL18, DataMode.REQUIRED, schemaElement.getScale(), schemaElement.getPrecision());
+ case FINETIME:
+ throw new UnsupportedOperationException();
+ case TIMESTAMP:
+ return Types.required(MinorType.TIMESTAMP);
+ default:
+ throw new UnsupportedOperationException(String.format("unsupported type: %s %s", primitiveTypeName, convertedType));
+ }
+ case INT32:
+ if (convertedType == null) {
+ return Types.required(MinorType.INT);
+ }
+ switch(convertedType) {
+ case DECIMAL:
+ return Types.withScaleAndPrecision(MinorType.DECIMAL9, DataMode.REQUIRED, schemaElement.getScale(), schemaElement.getPrecision());
+ case DATE:
+ return Types.required(MinorType.DATE);
+ case TIME:
+ return Types.required(MinorType.TIME);
+ default:
+ throw new UnsupportedOperationException(String.format("unsupported type: %s %s", primitiveTypeName, convertedType));
+ }
+ case BOOLEAN:
+ return Types.required(TypeProtos.MinorType.BIT);
+ case FLOAT:
+ return Types.required(TypeProtos.MinorType.FLOAT4);
+ case DOUBLE:
+ return Types.required(TypeProtos.MinorType.FLOAT8);
+ // Both of these are not supported by the parquet library yet (7/3/13),
+ // but they are declared here for when they are implemented
+ case INT96:
+ return TypeProtos.MajorType.newBuilder().setMinorType(TypeProtos.MinorType.FIXEDBINARY).setWidth(12)
+ .setMode(mode).build();
+ case FIXED_LEN_BYTE_ARRAY:
+ if (convertedType == null) {
+ checkArgument(length > 0, "A length greater than zero must be provided for a FixedBinary type.");
+ return TypeProtos.MajorType.newBuilder().setMinorType(TypeProtos.MinorType.FIXEDBINARY)
+ .setWidth(length).setMode(mode).build();
+ } else if (convertedType == ConvertedType.DECIMAL) {
+ return Types.withScaleAndPrecision(getDecimalType(schemaElement), DataMode.REQUIRED, schemaElement.getScale(), schemaElement.getPrecision());
+ }
+ default:
+ throw new UnsupportedOperationException("Type not supported: " + primitiveTypeName);
+ }
+ case REPEATED:
+ switch (primitiveTypeName) {
+ case BINARY:
+ if (convertedType == null) {
+ return Types.repeated(TypeProtos.MinorType.VARBINARY);
+ }
+ switch (schemaElement.getConverted_type()) {
+ case UTF8:
+ return Types.repeated(MinorType.VARCHAR);
+ case DECIMAL:
+ return Types.withScaleAndPrecision(getDecimalType(schemaElement), DataMode.REPEATED, schemaElement.getScale(), schemaElement.getPrecision());
+ default:
+ throw new UnsupportedOperationException(String.format("unsupported type: %s %s", primitiveTypeName, convertedType));
+ }
+ case INT64:
+ if (convertedType == null) {
+ return Types.repeated(MinorType.BIGINT);
+ }
+ switch(convertedType) {
+ case DECIMAL:
+ return Types.withScaleAndPrecision(MinorType.DECIMAL18, DataMode.REPEATED, schemaElement.getScale(), schemaElement.getPrecision());
+ case FINETIME:
+ throw new UnsupportedOperationException();
+ case TIMESTAMP:
+ return Types.repeated(MinorType.TIMESTAMP);
+ default:
+ throw new UnsupportedOperationException(String.format("unsupported type: %s %s", primitiveTypeName, convertedType));
+ }
+ case INT32:
+ if (convertedType == null) {
+ return Types.repeated(MinorType.INT);
+ }
+ switch(convertedType) {
+ case DECIMAL:
+ return Types.withScaleAndPrecision(MinorType.DECIMAL9, DataMode.REPEATED, schemaElement.getScale(), schemaElement.getPrecision());
+ case DATE:
+ return Types.repeated(MinorType.DATE);
+ case TIME:
+ return Types.repeated(MinorType.TIME);
+ default:
+ throw new UnsupportedOperationException(String.format("unsupported type: %s %s", primitiveTypeName, convertedType));
+ }
+ case BOOLEAN:
+ return Types.repeated(TypeProtos.MinorType.BIT);
+ case FLOAT:
+ return Types.repeated(TypeProtos.MinorType.FLOAT4);
+ case DOUBLE:
+ return Types.repeated(TypeProtos.MinorType.FLOAT8);
+ // Both of these are not supported by the parquet library yet (7/3/13),
+ // but they are declared here for when they are implemented
+ case INT96:
+ return TypeProtos.MajorType.newBuilder().setMinorType(TypeProtos.MinorType.FIXEDBINARY).setWidth(12)
+ .setMode(mode).build();
+ case FIXED_LEN_BYTE_ARRAY:
+ if (convertedType == null) {
+ checkArgument(length > 0, "A length greater than zero must be provided for a FixedBinary type.");
+ return TypeProtos.MajorType.newBuilder().setMinorType(TypeProtos.MinorType.FIXEDBINARY)
+ .setWidth(length).setMode(mode).build();
+ } else if (convertedType == ConvertedType.DECIMAL) {
+ return Types.withScaleAndPrecision(getDecimalType(schemaElement), DataMode.REPEATED, schemaElement.getScale(), schemaElement.getPrecision());
+ }
+ default:
+ throw new UnsupportedOperationException("Type not supported: " + primitiveTypeName);
+ }
+ }
+ throw new UnsupportedOperationException("Type not supported: " + primitiveTypeName + " Mode: " + mode);
+ }
+}
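
Editor's note: one recurring decision in the converter above is which sparse decimal type to use — a precision of 28 digits or less maps to DECIMAL28SPARSE, anything larger to DECIMAL38SPARSE. A trivial standalone restatement of that rule, using plain strings instead of the TypeProtos enum purely for illustration:

public class DecimalWidthSketch {
  // mirrors ParquetToDrillTypeConverter.getDecimalType
  static String decimalTypeFor(int precision) {
    return precision <= 28 ? "DECIMAL28SPARSE" : "DECIMAL38SPARSE";
  }

  public static void main(String[] args) {
    System.out.println(decimalTypeFor(18)); // DECIMAL28SPARSE
    System.out.println(decimalTypeFor(30)); // DECIMAL38SPARSE
  }
}
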
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLenBinaryReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLenBinaryReader.java
new file mode 100644
index 000000000..409f17d68
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLenBinaryReader.java
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.parquet.columnreaders;
+
+import java.io.IOException;
+import java.util.List;
+
+public class VarLenBinaryReader {
+
+ ParquetRecordReader parentReader;
+ final List<VarLengthColumn> columns;
+
+ public VarLenBinaryReader(ParquetRecordReader parentReader, List<VarLengthColumn> columns){
+ this.parentReader = parentReader;
+ this.columns = columns;
+ }
+
+ /**
+ * Reads as many variable length values as possible.
+ *
+   * @param recordsToReadInThisPass - the number of records recommended for reading from the reader
+   * @param firstColumnStatus - a reference to the first column status in the parquet file to grab metadata from
+   * @return - the number of records read, which also bounds the number of fixed length values to read
+ * @throws IOException
+ */
+ public long readFields(long recordsToReadInThisPass, ColumnReader firstColumnStatus) throws IOException {
+
+ long recordsReadInCurrentPass = 0;
+ int lengthVarFieldsInCurrentRecord;
+ long totalVariableLengthData = 0;
+ boolean exitLengthDeterminingLoop = false;
+    // reset the column readers before determining how many records will fit in this batch
+ for (VarLengthColumn columnReader : columns) {
+ columnReader.reset();
+ }
+
+ do {
+ lengthVarFieldsInCurrentRecord = 0;
+ for (VarLengthColumn columnReader : columns) {
+ if ( ! exitLengthDeterminingLoop )
+ exitLengthDeterminingLoop = columnReader.determineSize(recordsReadInCurrentPass, lengthVarFieldsInCurrentRecord);
+ else
+ break;
+ }
+ // check that the next record will fit in the batch
+ if (exitLengthDeterminingLoop || (recordsReadInCurrentPass + 1) * parentReader.getBitWidthAllFixedFields() + totalVariableLengthData
+ + lengthVarFieldsInCurrentRecord > parentReader.getBatchSize()){
+ break;
+ }
+ for (VarLengthColumn columnReader : columns ) {
+ columnReader.updateReadyToReadPosition();
+ columnReader.currDefLevel = -1;
+ }
+ recordsReadInCurrentPass++;
+ totalVariableLengthData += lengthVarFieldsInCurrentRecord;
+ } while (recordsReadInCurrentPass < recordsToReadInThisPass);
+
+ for (VarLengthColumn columnReader : columns) {
+ columnReader.readRecords(columnReader.pageReader.valuesReadyToRead);
+ }
+ for (VarLengthColumn columnReader : columns) {
+ columnReader.valueVec.getMutator().setValueCount((int) recordsReadInCurrentPass);
+ }
+ return recordsReadInCurrentPass;
+ }
+}
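
Editor's note: the loop above admits one record at a time, accepting it only if the fixed-width bytes for all records so far plus the accumulated variable-length data still fit the batch budget. The standalone sketch below mirrors that admission check with hypothetical per-record sizes, using a single consistent unit for all quantities for readability:

public class BatchFitSketch {
  public static void main(String[] args) {
    // hypothetical sizes, all in the same unit
    long batchBudget = 64;                   // total space available for this batch
    long fixedWidthPerRecord = 8;            // summed width of the fixed length columns
    long[] varLenPerRecord = {10, 4, 40, 2}; // variable length size of each upcoming record

    long recordsAdmitted = 0;
    long totalVariableLengthData = 0;
    for (long varLenThisRecord : varLenPerRecord) {
      // admit the next record only if everything accepted so far plus this record still fits
      if ((recordsAdmitted + 1) * fixedWidthPerRecord + totalVariableLengthData + varLenThisRecord > batchBudget) {
        break;
      }
      recordsAdmitted++;
      totalVariableLengthData += varLenThisRecord;
    }
    System.out.println("records admitted to this batch: " + recordsAdmitted); // prints 2
  }
}
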
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLengthColumn.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLengthColumn.java
new file mode 100644
index 000000000..14ee6318a
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLengthColumn.java
@@ -0,0 +1,60 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.store.parquet.columnreaders;
+
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.exec.vector.ValueVector;
+import parquet.column.ColumnDescriptor;
+import parquet.format.Encoding;
+import parquet.format.SchemaElement;
+import parquet.hadoop.metadata.ColumnChunkMetaData;
+import parquet.io.api.Binary;
+
+import java.io.IOException;
+
+public abstract class VarLengthColumn<V extends ValueVector> extends ColumnReader {
+ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(VarLengthColumn.class);
+
+ Binary currDictVal;
+
+ VarLengthColumn(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor,
+ ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, V v,
+ SchemaElement schemaElement) throws ExecutionSetupException {
+ super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
+ if (columnChunkMetaData.getEncodings().contains(Encoding.PLAIN_DICTIONARY)) {
+ usingDictionary = true;
+ }
+ else {
+ usingDictionary = false;
+ }
+ }
+
+ protected boolean processPageData(int recordsToReadInThisPass) throws IOException {
+ return readAndStoreValueSizeInformation();
+ }
+
+ public void reset() {
+ super.reset();
+ pageReader.valuesReadyToRead = 0;
+ }
+
+ protected abstract boolean readAndStoreValueSizeInformation() throws IOException;
+
+ public abstract boolean skipReadyToReadPositionUpdate();
+
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/VarLengthColumnReaders.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLengthColumnReaders.java
index 56f687c59..979e8c3ae 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/VarLengthColumnReaders.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLengthColumnReaders.java
@@ -15,7 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
******************************************************************************/
-package org.apache.drill.exec.store.parquet;
+package org.apache.drill.exec.store.parquet.columnreaders;
import java.math.BigDecimal;
@@ -29,68 +29,16 @@ import org.apache.drill.exec.vector.NullableDecimal28SparseVector;
import org.apache.drill.exec.vector.NullableDecimal38SparseVector;
import org.apache.drill.exec.vector.NullableVarBinaryVector;
import org.apache.drill.exec.vector.NullableVarCharVector;
-import org.apache.drill.exec.vector.ValueVector;
import org.apache.drill.exec.vector.VarBinaryVector;
import org.apache.drill.exec.vector.VarCharVector;
-
import parquet.column.ColumnDescriptor;
-import parquet.column.Encoding;
import parquet.format.SchemaElement;
import parquet.hadoop.metadata.ColumnChunkMetaData;
-import parquet.io.api.Binary;
public class VarLengthColumnReaders {
+ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(VarLengthColumnReaders.class);
- public static abstract class VarLengthColumn<V extends ValueVector> extends ColumnReader {
-
- Binary currDictVal;
-
- VarLengthColumn(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor,
- ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, V v,
- SchemaElement schemaElement) throws ExecutionSetupException {
- super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
- if (columnChunkMetaData.getEncodings().contains(Encoding.PLAIN_DICTIONARY)) {
- usingDictionary = true;
- }
- else {
- usingDictionary = false;
- }
- }
-
- @Override
- protected void readField(long recordsToRead, ColumnReader firstColumnStatus) {
- throw new UnsupportedOperationException();
- }
-
- public abstract boolean setSafe(int index, byte[] bytes, int start, int length);
-
- public abstract int capacity();
-
- }
-
- public static abstract class NullableVarLengthColumn<V extends ValueVector> extends ColumnReader {
-
- int nullsRead;
- boolean currentValNull = false;
- Binary currDictVal;
-
- NullableVarLengthColumn(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor,
- ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, V v,
- SchemaElement schemaElement) throws ExecutionSetupException {
- super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
- }
-
- public abstract boolean setSafe(int index, byte[] value, int start, int length);
-
- public abstract int capacity();
-
- @Override
- protected void readField(long recordsToRead, ColumnReader firstColumnStatus) {
- throw new UnsupportedOperationException();
- }
- }
-
- public static class Decimal28Column extends VarLengthColumn<Decimal28SparseVector> {
+ public static class Decimal28Column extends VarLengthValuesColumn<Decimal28SparseVector> {
protected Decimal28SparseVector decimal28Vector;
@@ -119,7 +67,7 @@ public class VarLengthColumnReaders {
}
}
- public static class NullableDecimal28Column extends NullableVarLengthColumn<NullableDecimal28SparseVector> {
+ public static class NullableDecimal28Column extends NullableVarLengthValuesColumn<NullableDecimal28SparseVector> {
protected NullableDecimal28SparseVector nullableDecimal28Vector;
@@ -149,7 +97,7 @@ public class VarLengthColumnReaders {
}
}
- public static class Decimal38Column extends VarLengthColumn<Decimal38SparseVector> {
+ public static class Decimal38Column extends VarLengthValuesColumn<Decimal38SparseVector> {
protected Decimal38SparseVector decimal28Vector;
@@ -178,7 +126,7 @@ public class VarLengthColumnReaders {
}
}
- public static class NullableDecimal38Column extends NullableVarLengthColumn<NullableDecimal38SparseVector> {
+ public static class NullableDecimal38Column extends NullableVarLengthValuesColumn<NullableDecimal38SparseVector> {
protected NullableDecimal38SparseVector nullableDecimal38Vector;
@@ -209,7 +157,7 @@ public class VarLengthColumnReaders {
}
- public static class VarCharColumn extends VarLengthColumn <VarCharVector> {
+ public static class VarCharColumn extends VarLengthValuesColumn<VarCharVector> {
// store a hard reference to the vector (which is also stored in the superclass) to prevent repetitive casting
protected VarCharVector varCharVector;
@@ -222,18 +170,13 @@ public class VarLengthColumnReaders {
}
@Override
- protected void readField(long recordsToRead, ColumnReader firstColumnStatus) {
- throw new UnsupportedOperationException();
- }
-
- @Override
public boolean setSafe(int index, byte[] bytes, int start, int length) {
boolean success;
if(index >= varCharVector.getValueCapacity()) return false;
if (usingDictionary) {
- success = varCharVector.getMutator().setSafe(valuesReadInCurrentPass, currDictVal.getBytes(),
- 0, currDictVal.length());
+ success = varCharVector.getMutator().setSafe(index, currDictValToWrite.getBytes(),
+ 0, currDictValToWrite.length());
}
else {
success = varCharVector.getMutator().setSafe(index, bytes, start, length);
@@ -247,7 +190,7 @@ public class VarLengthColumnReaders {
}
}
- public static class NullableVarCharColumn extends NullableVarLengthColumn <NullableVarCharVector> {
+ public static class NullableVarCharColumn extends NullableVarLengthValuesColumn<NullableVarCharVector> {
int nullsRead;
boolean currentValNull = false;
@@ -266,8 +209,8 @@ public class VarLengthColumnReaders {
if(index >= nullableVarCharVector.getValueCapacity()) return false;
if (usingDictionary) {
- success = nullableVarCharVector.getMutator().setSafe(valuesReadInCurrentPass, currDictVal.getBytes(),
- 0, currDictVal.length());
+ success = nullableVarCharVector.getMutator().setSafe(index, currDictValToWrite.getBytes(),
+ 0, currDictValToWrite.length());
}
else {
success = nullableVarCharVector.getMutator().setSafe(index, value, start, length);
@@ -279,14 +222,9 @@ public class VarLengthColumnReaders {
public int capacity() {
return nullableVarCharVector.getData().capacity();
}
-
- @Override
- protected void readField(long recordsToRead, ColumnReader firstColumnStatus) {
- throw new UnsupportedOperationException();
- }
}
- public static class VarBinaryColumn extends VarLengthColumn <VarBinaryVector> {
+ public static class VarBinaryColumn extends VarLengthValuesColumn<VarBinaryVector> {
// store a hard reference to the vector (which is also stored in the superclass) to prevent repetitive casting
protected VarBinaryVector varBinaryVector;
@@ -299,18 +237,13 @@ public class VarLengthColumnReaders {
}
@Override
- protected void readField(long recordsToRead, ColumnReader firstColumnStatus) {
- throw new UnsupportedOperationException();
- }
-
- @Override
public boolean setSafe(int index, byte[] bytes, int start, int length) {
boolean success;
if(index >= varBinaryVector.getValueCapacity()) return false;
if (usingDictionary) {
- success = varBinaryVector.getMutator().setSafe(valuesReadInCurrentPass, currDictVal.getBytes(),
- 0, currDictVal.length());
+ success = varBinaryVector.getMutator().setSafe(index, currDictValToWrite.getBytes(),
+ 0, currDictValToWrite.length());
}
else {
success = varBinaryVector.getMutator().setSafe(index, bytes, start, length);
@@ -324,7 +257,7 @@ public class VarLengthColumnReaders {
}
}
- public static class NullableVarBinaryColumn extends NullableVarLengthColumn <NullableVarBinaryVector> {
+ public static class NullableVarBinaryColumn extends NullableVarLengthValuesColumn<NullableVarBinaryVector> {
int nullsRead;
boolean currentValNull = false;
@@ -343,8 +276,8 @@ public class VarLengthColumnReaders {
if(index >= nullableVarBinaryVector.getValueCapacity()) return false;
if (usingDictionary) {
- success = nullableVarBinaryVector.getMutator().setSafe(valuesReadInCurrentPass, currDictVal.getBytes(),
- 0, currDictVal.length());
+ success = nullableVarBinaryVector.getMutator().setSafe(index, currDictValToWrite.getBytes(),
+ 0, currDictValToWrite.length());
}
else {
success = nullableVarBinaryVector.getMutator().setSafe(index, value, start, length);
@@ -357,9 +290,5 @@ public class VarLengthColumnReaders {
return nullableVarBinaryVector.getData().capacity();
}
- @Override
- protected void readField(long recordsToRead, ColumnReader firstColumnStatus) {
- throw new UnsupportedOperationException();
- }
}
}
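Note: the setSafe(...) implementations above return false once the target vector's value capacity is reached, so a caller can stop filling the current batch and carry the remaining values into the next one. A minimal sketch of that calling pattern follows; fillColumn, column and values are illustrative names only, not part of this patch.

  // Hypothetical sketch: fill a var-length column until the backing vector runs out of room.
  static int fillColumn(VarLengthColumnReaders.VarCharColumn column, java.util.List<byte[]> values) {
    int written = 0;
    for (byte[] value : values) {
      // setSafe returns false when the vector cannot hold another value;
      // stop here and leave the remaining values for the next batch
      if (!column.setSafe(written, value, 0, value.length)) {
        break;
      }
      written++;
    }
    return written;
  }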
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLengthValuesColumn.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLengthValuesColumn.java
new file mode 100644
index 000000000..092c186c6
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLengthValuesColumn.java
@@ -0,0 +1,96 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.store.parquet.columnreaders;
+
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.exec.vector.ValueVector;
+import org.apache.drill.exec.vector.VariableWidthVector;
+import parquet.bytes.BytesUtils;
+import parquet.column.ColumnDescriptor;
+import parquet.format.Encoding;
+import parquet.format.SchemaElement;
+import parquet.hadoop.metadata.ColumnChunkMetaData;
+import parquet.io.api.Binary;
+
+import java.io.IOException;
+
+public abstract class VarLengthValuesColumn<V extends ValueVector> extends VarLengthColumn {
+
+ Binary currLengthDeterminingDictVal;
+ Binary currDictValToWrite;
+ VariableWidthVector variableWidthVector;
+
+ VarLengthValuesColumn(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor,
+ ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, V v,
+ SchemaElement schemaElement) throws ExecutionSetupException {
+ super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
+ variableWidthVector = (VariableWidthVector) valueVec;
+ if (columnChunkMetaData.getEncodings().contains(Encoding.PLAIN_DICTIONARY)) {
+ usingDictionary = true;
+ }
+ else {
+ usingDictionary = false;
+ }
+ }
+
+ public abstract boolean setSafe(int index, byte[] bytes, int start, int length);
+
+ @Override
+ protected void readField(long recordToRead) {
+ dataTypeLengthInBits = variableWidthVector.getAccessor().getValueLength(valuesReadInCurrentPass);
+ // dataTypeLengthInBits is re-purposed here: it holds the value length in BYTES, not bits
+ boolean success = setSafe((int) valuesReadInCurrentPass, pageReader.pageDataByteArray,
+ (int) pageReader.readPosInBytes + 4, dataTypeLengthInBits);
+ assert success;
+ updatePosition();
+ }
+
+ public void updateReadyToReadPosition() {
+ pageReader.readyToReadPosInBytes += dataTypeLengthInBits + 4;
+ pageReader.valuesReadyToRead++;
+ currLengthDeterminingDictVal = null;
+ }
+
+ public void updatePosition() {
+ pageReader.readPosInBytes += dataTypeLengthInBits + 4;
+ bytesReadInCurrentPass += dataTypeLengthInBits;
+ valuesReadInCurrentPass++;
+ }
+
+ public boolean skipReadyToReadPositionUpdate() {
+ return false;
+ }
+
+ protected boolean readAndStoreValueSizeInformation() throws IOException {
+ // re-purposing this field here for length in BYTES to prevent repetitive multiplication/division
+ dataTypeLengthInBits = BytesUtils.readIntLittleEndian(pageReader.pageDataByteArray,
+ (int) pageReader.readyToReadPosInBytes);
+
+ // return true when the length cannot be stored safely (the vector is out of space)
+ if (!variableWidthVector.getMutator().setValueLengthSafe((int) valuesReadInCurrentPass + pageReader.valuesReadyToRead,
+ dataTypeLengthInBits)) {
+ return true;
+ }
+ return false;
+ }
+
+}
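Note: readAndStoreValueSizeInformation() and readField() above assume the plain (non-dictionary) Parquet encoding for var-length values: a 4-byte little-endian length prefix followed by that many bytes, which is why the reader copies from readPosInBytes + 4. A standalone sketch of that layout, with a hypothetical decodePlainVarLengthPage helper for illustration only:

  // Hypothetical sketch: decode a buffer of length-prefixed var-length values.
  static java.util.List<byte[]> decodePlainVarLengthPage(byte[] page) {
    java.util.List<byte[]> values = new java.util.ArrayList<>();
    int pos = 0;
    while (pos + 4 <= page.length) {
      // 4-byte little-endian length prefix, as read by BytesUtils.readIntLittleEndian above
      int len = (page[pos] & 0xFF)
              | (page[pos + 1] & 0xFF) << 8
              | (page[pos + 2] & 0xFF) << 16
              | (page[pos + 3] & 0xFF) << 24;
      if (pos + 4 + len > page.length) {
        break; // truncated value; stop decoding
      }
      values.add(java.util.Arrays.copyOfRange(page, pos + 4, pos + 4 + len));
      pos += 4 + len; // same "+ 4" skip used in readField()
    }
    return values;
  }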
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/RepeatedFixedWidthVector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/RepeatedFixedWidthVector.java
index 9b0a6cd28..6d035410c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/RepeatedFixedWidthVector.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/RepeatedFixedWidthVector.java
@@ -36,4 +36,17 @@ public interface RepeatedFixedWidthVector extends ValueVector{
* @return The number of bytes of the buffer that were consumed.
*/
public int load(int parentValueCount, int childValueCount, ByteBuf buf);
+
+ public abstract RepeatedAccessor getAccessor();
+
+ public abstract RepeatedMutator getMutator();
+
+ public interface RepeatedAccessor extends Accessor {
+ public int getGroupCount();
+ }
+ public interface RepeatedMutator extends Mutator {
+ public void setValueCounts(int parentValueCount, int childValueCount);
+ public boolean setRepetitionAtIndexSafe(int index, int repetitionCount);
+ public BaseDataValueVector getDataVector();
+ }
}
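Note: a sketch of how a repeated column reader might use the new RepeatedMutator contract to record how many child values belong to each group; recordGroups and valuesPerGroup are hypothetical names, not part of this patch.

  // Hypothetical sketch: record per-group repetition counts through the new mutator interface.
  static boolean recordGroups(RepeatedFixedWidthVector vector, int[] valuesPerGroup) {
    RepeatedFixedWidthVector.RepeatedMutator mutator = vector.getMutator();
    for (int group = 0; group < valuesPerGroup.length; group++) {
      // a false return signals the vector could not record another group safely
      if (!mutator.setRepetitionAtIndexSafe(group, valuesPerGroup[group])) {
        return false;
      }
    }
    return true;
  }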
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/RepeatedVariableWidthVector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/RepeatedVariableWidthVector.java
index bd0303850..a2c884e5a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/RepeatedVariableWidthVector.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/RepeatedVariableWidthVector.java
@@ -35,6 +35,8 @@ public interface RepeatedVariableWidthVector extends ValueVector{
*/
public int getByteCapacity();
+ public abstract RepeatedFixedWidthVector.RepeatedAccessor getAccessor();
+
/**
* Load the records in the provided buffer based on the given number of values.
* @param dataBytes The number of bytes associated with the data array.
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/VariableWidthVector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/VariableWidthVector.java
index 2b07750c3..6660351c1 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/VariableWidthVector.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/VariableWidthVector.java
@@ -44,5 +44,15 @@ public interface VariableWidthVector extends ValueVector{
*/
public int load(int dataBytes, int valueCount, ByteBuf buf);
- public abstract Mutator getMutator();
+ public abstract VariableWidthMutator getMutator();
+
+ public abstract VariableWidthAccessor getAccessor();
+
+ public interface VariableWidthAccessor extends Accessor {
+ public int getValueLength(int index);
+ }
+
+ public interface VariableWidthMutator extends Mutator {
+ public boolean setValueLengthSafe(int index, int length);
+ }
}
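Note: a sketch of the two-step use of the new VariableWidthMutator/VariableWidthAccessor pair that VarLengthValuesColumn relies on: the length is stored first so the offsets can be sized, then read back when the raw bytes are copied. storeThenReadBackLength is a hypothetical helper, not part of this patch.

  // Hypothetical sketch: store a value's length, then read it back through the accessor.
  static void storeThenReadBackLength(VariableWidthVector vector, int index, int lengthInBytes) {
    // step 1: record the upcoming value's length (mirrors readAndStoreValueSizeInformation);
    // false means the vector ran out of room and the value belongs in the next batch
    if (!vector.getMutator().setValueLengthSafe(index, lengthInBytes)) {
      return;
    }
    // step 2: the stored length is read back later when the value bytes are copied (mirrors readField)
    int storedLength = vector.getAccessor().getValueLength(index);
    System.out.println("recorded length in bytes: " + storedLength);
  }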
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/RepeatedListVector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/RepeatedListVector.java
index ef8aef8b7..d43bf59f9 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/RepeatedListVector.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/RepeatedListVector.java
@@ -38,6 +38,7 @@ import org.apache.drill.exec.proto.UserBitShared.SerializedField;
import org.apache.drill.exec.record.MaterializedField;
import org.apache.drill.exec.record.TransferPair;
import org.apache.drill.exec.util.JsonStringArrayList;
+import org.apache.drill.exec.vector.BaseDataValueVector;
import org.apache.drill.exec.vector.RepeatedFixedWidthVector;
import org.apache.drill.exec.vector.UInt4Vector;
import org.apache.drill.exec.vector.ValueVector;
@@ -54,7 +55,7 @@ public class RepeatedListVector extends AbstractContainerVector implements Repea
private final UInt4Vector offsets; // offsets to start of each record
private final BufferAllocator allocator;
private final Mutator mutator = new Mutator();
- private final Accessor accessor = new Accessor();
+ private final RepeatedListAccessor accessor = new RepeatedListAccessor();
private ValueVector vector;
private final MaterializedField field;
private final RepeatedListReaderImpl reader = new RepeatedListReaderImpl(null, this);
@@ -112,7 +113,7 @@ public class RepeatedListVector extends AbstractContainerVector implements Repea
}
- public class Mutator implements ValueVector.Mutator{
+ public class Mutator implements ValueVector.Mutator, RepeatedMutator{
public void startNewGroup(int index) {
offsets.getMutator().set(index+1, offsets.getAccessor().get(index));
@@ -151,9 +152,24 @@ public class RepeatedListVector extends AbstractContainerVector implements Repea
public void generateTestData(int values) {
}
+ @Override
+ public void setValueCounts(int parentValueCount, int childValueCount) {
+ // TODO - determine if this should be implemented for this class
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public boolean setRepetitionAtIndexSafe(int index, int repetitionCount) {
+ return false; // not yet implemented for RepeatedListVector
+ }
+
+ @Override
+ public BaseDataValueVector getDataVector() {
+ return null; // not yet implemented for RepeatedListVector
+ }
}
- public class Accessor implements ValueVector.Accessor {
+ public class RepeatedListAccessor implements RepeatedAccessor{
@Override
public Object getObject(int index) {
@@ -211,6 +227,10 @@ public class RepeatedListVector extends AbstractContainerVector implements Repea
return reader;
}
+ @Override
+ public int getGroupCount() {
+ return size();
+ }
}
@Override
@@ -315,7 +335,7 @@ public class RepeatedListVector extends AbstractContainerVector implements Repea
}
@Override
- public Accessor getAccessor() {
+ public RepeatedListAccessor getAccessor() {
return accessor;
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/RepeatedMapVector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/RepeatedMapVector.java
index f05ab1b1a..30f5fc755 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/RepeatedMapVector.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/RepeatedMapVector.java
@@ -37,6 +37,7 @@ import org.apache.drill.exec.proto.UserBitShared.SerializedField;
import org.apache.drill.exec.record.MaterializedField;
import org.apache.drill.exec.record.TransferPair;
import org.apache.drill.exec.util.JsonStringArrayList;
+import org.apache.drill.exec.vector.BaseDataValueVector;
import org.apache.drill.exec.vector.RepeatedFixedWidthVector;
import org.apache.drill.exec.vector.UInt4Vector;
import org.apache.drill.exec.vector.ValueVector;
@@ -50,6 +51,7 @@ import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
public class RepeatedMapVector extends AbstractContainerVector implements RepeatedFixedWidthVector {
+
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(RepeatedMapVector.class);
public final static MajorType TYPE = MajorType.newBuilder().setMinorType(MinorType.MAP).setMode(DataMode.REPEATED).build();
@@ -59,7 +61,7 @@ public class RepeatedMapVector extends AbstractContainerVector implements Repeat
private final Map<String, VectorWithOrdinal> vectorIds = Maps.newHashMap();
private final RepeatedMapReaderImpl reader = new RepeatedMapReaderImpl(RepeatedMapVector.this);
private final IntObjectOpenHashMap<ValueVector> vectorsById = new IntObjectOpenHashMap<>();
- private final Accessor accessor = new Accessor();
+ private final RepeatedMapAccessor accessor = new RepeatedMapAccessor();
private final Mutator mutator = new Mutator();
private final BufferAllocator allocator;
private final MaterializedField field;
@@ -278,7 +280,7 @@ public class RepeatedMapVector extends AbstractContainerVector implements Repeat
}
@Override
- public Accessor getAccessor() {
+ public RepeatedMapAccessor getAccessor() {
return accessor;
}
@@ -349,7 +351,7 @@ public class RepeatedMapVector extends AbstractContainerVector implements Repeat
return mutator;
}
- public class Accessor implements ValueVector.Accessor{
+ public class RepeatedMapAccessor implements RepeatedAccessor {
@Override
public Object getObject(int index) {
@@ -414,6 +416,10 @@ public class RepeatedMapVector extends AbstractContainerVector implements Repeat
return reader;
}
+ @Override
+ public int getGroupCount() {
+ return size();
+ }
}
private void populateEmpties(int groupCount){
@@ -424,7 +430,7 @@ public class RepeatedMapVector extends AbstractContainerVector implements Repeat
lastSet = groupCount - 1;
}
- public class Mutator implements ValueVector.Mutator{
+ public class Mutator implements ValueVector.Mutator, RepeatedMutator {
public void startNewGroup(int index) {
populateEmpties(index);
@@ -458,6 +464,21 @@ public class RepeatedMapVector extends AbstractContainerVector implements Repeat
public void generateTestData(int values) {
}
+ @Override
+ public void setValueCounts(int parentValueCount, int childValueCount) {
+ // TODO - determine if this should be implemented for this class
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public boolean setRepetitionAtIndexSafe(int index, int repetitionCount) {
+ return false; // not yet implemented for RepeatedMapVector
+ }
+
+ @Override
+ public BaseDataValueVector getDataVector() {
+ return null; // not yet implemented for RepeatedMapVector
+ }
}
@Override