author    Steven Phillips <sphillips@maprtech.com>  2014-05-18 00:46:02 -0700
committer Jacques Nadeau <jacques@apache.org>       2014-07-29 21:42:30 -0700
commit    0d6befca0bd98ad14cd2bb6700cdee5172cd8f90 (patch)
tree      fdf1a66496d3b48134f54d975189d7725dea7835 /exec/java-exec/src/main
parent    48229ec90359226638e3f7c274c7adadfc95952d (diff)
DRILL-1058: Read complex types in parquet
Diffstat (limited to 'exec/java-exec/src/main')
-rw-r--r--  exec/java-exec/src/main/codegen/templates/ComplexWriters.java | 4
-rw-r--r--  exec/java-exec/src/main/codegen/templates/NullableValueVectors.java | 15
-rw-r--r--  exec/java-exec/src/main/codegen/templates/RepeatedValueVectors.java | 8
-rw-r--r--  exec/java-exec/src/main/codegen/templates/VariableLengthVectors.java | 4
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java | 2
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java | 12
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java | 1
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java | 46
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetGroupConverter.java | 439
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetReader.java | 204
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetRecordMaterializer.java | 57
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/vector/BaseDataValueVector.java | 9
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/vector/BaseValueVector.java | 3
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/vector/ObjectVector.java | 9
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/vector/VariableWidthVector.java | 2
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/AbstractContainerVector.java | 4
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/MapVector.java | 16
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/RepeatedListVector.java | 14
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/RepeatedMapVector.java | 17
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/impl/VectorContainerWriter.java | 4
-rw-r--r--  exec/java-exec/src/main/java/parquet/hadoop/CodecFactoryExposer.java | 5
-rw-r--r--  exec/java-exec/src/main/java/parquet/hadoop/ColumnChunkIncReadStore.java | 190
22 files changed, 1055 insertions, 10 deletions
diff --git a/exec/java-exec/src/main/codegen/templates/ComplexWriters.java b/exec/java-exec/src/main/codegen/templates/ComplexWriters.java
index 72ff135fe..ce839ef85 100644
--- a/exec/java-exec/src/main/codegen/templates/ComplexWriters.java
+++ b/exec/java-exec/src/main/codegen/templates/ComplexWriters.java
@@ -74,6 +74,7 @@ public class ${eName}WriterImpl extends AbstractFieldWriter {
if(ok()){
// update to inform(addSafe) once available for all repeated vector types for holders.
inform(mutator.addSafe(idx(), h));
+ vector.setCurrentValueCount(idx());
}
}
@@ -81,6 +82,7 @@ public class ${eName}WriterImpl extends AbstractFieldWriter {
if(ok()){
// update to inform(addSafe) once available for all repeated vector types for holders.
inform(mutator.addSafe(idx(), h));
+ vector.setCurrentValueCount(idx());
}
}
@@ -96,6 +98,7 @@ public class ${eName}WriterImpl extends AbstractFieldWriter {
if(ok()){
// update to inform(setSafe) once available for all vector types for holders.
inform(mutator.setSafe(idx(), h));
+ vector.setCurrentValueCount(idx());
}
}
@@ -103,6 +106,7 @@ public class ${eName}WriterImpl extends AbstractFieldWriter {
if(ok()){
// update to inform(setSafe) once available for all vector types for holders.
inform(mutator.setSafe(idx(), h));
+ vector.setCurrentValueCount(idx());
}
}
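The template change above makes every successful write also record the highest index written on the backing vector. That running count is what the new reader later uses to decide when a batch is getting full. A minimal sketch of the bookkeeping, with a plain class standing in for the generated vector code (names here are illustrative, not Drill's API):

    // Each write records the current logical value count; a reader can then
    // ask how full a vector is before the batch is finalized.
    class CountingVectorSketch {
      private int currentValueCount;            // highest index written so far
      private final int valueCapacity = 4096;   // allocated slots (assumed)

      void setCurrentValueCount(int count) { currentValueCount = count; }
      int getCurrentValueCount() { return currentValueCount; }

      // same shape as the getPercentFilled() check in DrillParquetReader below
      int percentFilled() { return currentValueCount * 100 / valueCapacity; }
    }
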
diff --git a/exec/java-exec/src/main/codegen/templates/NullableValueVectors.java b/exec/java-exec/src/main/codegen/templates/NullableValueVectors.java
index 4873f2a07..6876cabe6 100644
--- a/exec/java-exec/src/main/codegen/templates/NullableValueVectors.java
+++ b/exec/java-exec/src/main/codegen/templates/NullableValueVectors.java
@@ -61,7 +61,15 @@ public final class ${className} extends BaseValueVector implements <#if type.maj
public int getValueCapacity(){
return Math.min(bits.getValueCapacity(), values.getValueCapacity());
}
-
+
+ public int getCurrentValueCount() {
+ return values.getCurrentValueCount();
+ }
+
+ public void setCurrentValueCount(int count) {
+ values.setCurrentValueCount(count);
+ }
+
@Override
public ByteBuf[] getBuffers() {
ByteBuf[] buffers = ObjectArrays.concat(bits.getBuffers(), values.getBuffers(), ByteBuf.class);
@@ -143,6 +151,11 @@ public final class ${className} extends BaseValueVector implements <#if type.maj
return values.getByteCapacity();
}
+ @Override
+ public int getCurrentSizeInBytes(){
+ return values.getCurrentSizeInBytes();
+ }
+
<#else>
@Override
public SerializedField getMetadata() {
diff --git a/exec/java-exec/src/main/codegen/templates/RepeatedValueVectors.java b/exec/java-exec/src/main/codegen/templates/RepeatedValueVectors.java
index b283d19be..7cbc50cb7 100644
--- a/exec/java-exec/src/main/codegen/templates/RepeatedValueVectors.java
+++ b/exec/java-exec/src/main/codegen/templates/RepeatedValueVectors.java
@@ -67,6 +67,14 @@ package org.apache.drill.exec.vector;
return Math.min(values.getValueCapacity(), offsets.getValueCapacity() - 1);
}
+ public int getCurrentValueCount() {
+ return values.getCurrentValueCount();
+ }
+
+ public void setCurrentValueCount(int count) {
+ values.setCurrentValueCount(count);
+ }
+
public int getBufferSize(){
return offsets.getBufferSize() + values.getBufferSize();
}
diff --git a/exec/java-exec/src/main/codegen/templates/VariableLengthVectors.java b/exec/java-exec/src/main/codegen/templates/VariableLengthVectors.java
index 1d30acb94..b0af1feaf 100644
--- a/exec/java-exec/src/main/codegen/templates/VariableLengthVectors.java
+++ b/exec/java-exec/src/main/codegen/templates/VariableLengthVectors.java
@@ -78,6 +78,10 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements V
public int getByteCapacity(){
return data.capacity();
}
+
+ public int getCurrentSizeInBytes() {
+ return offsetVector.getAccessor().get(currentValueCount);
+ }
/**
* Return the number of bytes contained in the current var len byte vector.
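For variable-width vectors the bytes consumed so far cannot be derived from the value count alone, so getCurrentSizeInBytes() reads the offset vector: entry i holds where value i starts, and the entry at currentValueCount is the end of the last value written. A self-contained sketch with plain arrays standing in for Drill's vectors:

    // Offsets {0, 3, 9, 14} mean three values of 3, 6 and 5 bytes were written;
    // the entry at currentValueCount (3) is the 14 bytes of data in use.
    class VarWidthAccountingSketch {
      int[] offsets = {0, 3, 9, 14};
      int currentValueCount = 3;

      int getCurrentSizeInBytes() {
        return offsets[currentValueCount];
      }
    }
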
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
index 595fdd745..f2a84ee74 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
@@ -81,6 +81,8 @@ public interface ExecConstants {
public static final OptionValidator OUTPUT_FORMAT_VALIDATOR = new StringValidator(OUTPUT_FORMAT_OPTION, "parquet");
public static final String PARQUET_BLOCK_SIZE = "store.parquet.block-size";
public static final OptionValidator PARQUET_BLOCK_SIZE_VALIDATOR = new LongValidator(PARQUET_BLOCK_SIZE, 512*1024*1024);
+ public static String PARQUET_NEW_RECORD_READER = "store.parquet.use_new_reader";
+ public static OptionValidator PARQUET_RECORD_READER_IMPLEMENTATION_VALIDATOR = new BooleanValidator(PARQUET_NEW_RECORD_READER, false);
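The two constants above register a boolean option, store.parquet.use_new_reader, defaulting to false. A sketch of how it is consumed, mirroring the check added in ParquetScanBatchCreator further down (OptionManager comes from the fragment context):

    import org.apache.drill.exec.server.options.OptionManager;

    // Users would opt in with:
    //   ALTER SESSION SET `store.parquet.use_new_reader` = true;
    static boolean useNewReader(OptionManager options) {
      return options.getOption(ExecConstants.PARQUET_NEW_RECORD_READER).bool_val;
    }
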
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
index 21a580b99..eaf26d1e7 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
@@ -47,12 +47,17 @@ import org.apache.drill.exec.record.WritableBatch;
import org.apache.drill.exec.record.selection.SelectionVector2;
import org.apache.drill.exec.record.selection.SelectionVector4;
import org.apache.drill.exec.store.RecordReader;
+import org.apache.drill.exec.util.BatchPrinter;
+import org.apache.drill.exec.util.VectorUtil;
import org.apache.drill.exec.vector.AllocationHelper;
import org.apache.drill.exec.vector.NullableVarCharVector;
import org.apache.drill.exec.vector.ValueVector;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
+import org.apache.drill.exec.vector.complex.MapVector;
+import org.apache.drill.exec.vector.complex.impl.ComplexWriterImpl;
+import org.apache.drill.exec.vector.complex.writer.BaseWriter.ComplexWriter;
/**
* Record batch used for a particular scan. Operators against one or more
@@ -136,6 +141,7 @@ public class ScanBatch implements RecordBatch {
if (done) {
return IterOutcome.NONE;
}
+ long t1 = System.nanoTime();
oContext.getStats().startProcessing();
try {
mutator.allocate(MAX_RECORD_CNT);
@@ -177,8 +183,14 @@ public class ScanBatch implements RecordBatch {
if (mutator.isNewSchema()) {
container.buildSchema(SelectionVectorMode.NONE);
schema = container.getSchema();
+ long t2 = System.nanoTime();
+// System.out.println((t2 - t1) / recordCount);
+// BatchPrinter.printBatch(this, "\t");
return IterOutcome.OK_NEW_SCHEMA;
} else {
+ long t2 = System.nanoTime();
+// System.out.println((t2 - t1) / recordCount);
+// BatchPrinter.printBatch(this, "\t");
return IterOutcome.OK;
}
} catch (Exception ex) {
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
index 6a8cc5e08..e49b10745 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
@@ -60,6 +60,7 @@ public class SystemOptionManager implements OptionManager{
PlannerSettings.HASH_SINGLE_KEY,
ExecConstants.OUTPUT_FORMAT_VALIDATOR,
ExecConstants.PARQUET_BLOCK_SIZE_VALIDATOR,
+ ExecConstants.PARQUET_RECORD_READER_IMPLEMENTATION_VALIDATOR,
ExecConstants.SLICE_TARGET_OPTION,
ExecConstants.AFFINITY_FACTOR,
ExecConstants.MAX_WIDTH_GLOBAL,
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java
index b4f02fbe9..ae73af365 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java
@@ -38,15 +38,24 @@ import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import org.apache.drill.exec.store.parquet.columnreaders.ParquetRecordReader;
+import org.apache.drill.exec.store.parquet2.DrillParquetReader;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import parquet.hadoop.ParquetFileReader;
import parquet.hadoop.metadata.ParquetMetadata;
+import parquet.schema.MessageType;
+import parquet.schema.Type;
+
public class ParquetScanBatchCreator implements BatchCreator<ParquetRowGroupScan>{
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ParquetScanBatchCreator.class);
+ private static final String ENABLE_BYTES_READ_COUNTER = "parquet.benchmark.bytes.read";
+ private static final String ENABLE_BYTES_TOTAL_COUNTER = "parquet.benchmark.bytes.total";
+ private static final String ENABLE_TIME_READ_COUNTER = "parquet.benchmark.time.read";
+
@Override
public RecordBatch getBatch(FragmentContext context, ParquetRowGroupScan rowGroupScan, List<RecordBatch> children) throws ExecutionSetupException {
Preconditions.checkArgument(children.isEmpty());
@@ -74,7 +83,11 @@ public class ParquetScanBatchCreator implements BatchCreator<ParquetRowGroupScan
FileSystem fs = rowGroupScan.getStorageEngine().getFileSystem().getUnderlying();
-
+ Configuration conf = fs.getConf();
+ conf.setBoolean(ENABLE_BYTES_READ_COUNTER, false);
+ conf.setBoolean(ENABLE_BYTES_TOTAL_COUNTER, false);
+ conf.setBoolean(ENABLE_TIME_READ_COUNTER, false);
+
// keep footers in a map to avoid re-reading them
Map<String, ParquetMetadata> footers = new HashMap<String, ParquetMetadata>();
int numParts = 0;
@@ -91,14 +104,19 @@ public class ParquetScanBatchCreator implements BatchCreator<ParquetRowGroupScan
footers.put(e.getPath(),
ParquetFileReader.readFooter( fs.getConf(), new Path(e.getPath())));
}
- readers.add(
- new ParquetRecordReader(
- context, e.getPath(), e.getRowGroupIndex(), fs,
- rowGroupScan.getStorageEngine().getCodecFactoryExposer(),
- footers.get(e.getPath()),
- rowGroupScan.getColumns()
- )
- );
+ if (!context.getOptions().getOption(ExecConstants.PARQUET_NEW_RECORD_READER).bool_val && !isComplex(footers.get(e.getPath()))) {
+ readers.add(
+ new ParquetRecordReader(
+ context, e.getPath(), e.getRowGroupIndex(), fs,
+ rowGroupScan.getStorageEngine().getCodecFactoryExposer(),
+ footers.get(e.getPath()),
+ rowGroupScan.getColumns()
+ )
+ );
+ } else {
+ ParquetMetadata footer = footers.get(e.getPath());
+ readers.add(new DrillParquetReader(footer, e, columns, conf));
+ }
if (rowGroupScan.getSelectionRoot() != null) {
String[] r = rowGroupScan.getSelectionRoot().split("/");
String[] p = e.getPath().split("/");
@@ -125,4 +143,14 @@ public class ParquetScanBatchCreator implements BatchCreator<ParquetRowGroupScan
return new ScanBatch(rowGroupScan, context, readers.iterator(), partitionColumns, selectedPartitionColumns);
}
+
+ private static boolean isComplex(ParquetMetadata footer) {
+ MessageType schema = footer.getFileMetaData().getSchema();
+ for (Type type : schema.getFields()) {
+ if (!type.isPrimitive()) {
+ return true;
+ }
+ }
+ return false;
+ }
}
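Condensing the dispatch above: a row group goes to the new DrillParquetReader when the session option is enabled or when the file schema contains any non-primitive field; otherwise the existing columnreaders path is kept. A standalone sketch of the same predicate:

    import parquet.hadoop.metadata.ParquetMetadata;
    import parquet.schema.Type;

    static boolean useDrillParquetReader(boolean optionEnabled, ParquetMetadata footer) {
      if (optionEnabled) {
        return true;                      // user opted in for flat schemas too
      }
      for (Type type : footer.getFileMetaData().getSchema().getFields()) {
        if (!type.isPrimitive()) {
          return true;                    // complex types require the new reader
        }
      }
      return false;
    }
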
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetGroupConverter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetGroupConverter.java
new file mode 100644
index 000000000..f9bac6534
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetGroupConverter.java
@@ -0,0 +1,439 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.parquet2;
+
+import com.google.common.collect.Lists;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+
+import org.apache.drill.common.util.DecimalUtility;
+import org.apache.drill.exec.expr.holders.BigIntHolder;
+import org.apache.drill.exec.expr.holders.BitHolder;
+import org.apache.drill.exec.expr.holders.DateHolder;
+import org.apache.drill.exec.expr.holders.Decimal18Holder;
+import org.apache.drill.exec.expr.holders.Decimal28SparseHolder;
+import org.apache.drill.exec.expr.holders.Decimal38SparseHolder;
+import org.apache.drill.exec.expr.holders.Decimal9Holder;
+import org.apache.drill.exec.expr.holders.Float4Holder;
+import org.apache.drill.exec.expr.holders.Float8Holder;
+import org.apache.drill.exec.expr.holders.IntHolder;
+import org.apache.drill.exec.expr.holders.TimeHolder;
+import org.apache.drill.exec.expr.holders.TimeStampHolder;
+import org.apache.drill.exec.expr.holders.VarBinaryHolder;
+import org.apache.drill.exec.expr.holders.VarCharHolder;
+import org.apache.drill.exec.store.ParquetOutputRecordWriter;
+import org.apache.drill.exec.store.parquet.columnreaders.NullableFixedByteAlignedReaders;
+import org.apache.drill.exec.vector.complex.impl.ComplexWriterImpl;
+import org.apache.drill.exec.vector.complex.writer.BaseWriter.MapWriter;
+import org.apache.drill.exec.vector.complex.writer.BigIntWriter;
+import org.apache.drill.exec.vector.complex.writer.BitWriter;
+import org.apache.drill.exec.vector.complex.writer.DateWriter;
+import org.apache.drill.exec.vector.complex.writer.Decimal18Writer;
+import org.apache.drill.exec.vector.complex.writer.Decimal28SparseWriter;
+import org.apache.drill.exec.vector.complex.writer.Decimal38SparseWriter;
+import org.apache.drill.exec.vector.complex.writer.Decimal9Writer;
+import org.apache.drill.exec.vector.complex.writer.Float4Writer;
+import org.apache.drill.exec.vector.complex.writer.Float8Writer;
+import org.apache.drill.exec.vector.complex.writer.IntWriter;
+import org.apache.drill.exec.vector.complex.writer.TimeStampWriter;
+import org.apache.drill.exec.vector.complex.writer.TimeWriter;
+import org.apache.drill.exec.vector.complex.writer.VarBinaryWriter;
+import org.apache.drill.exec.vector.complex.writer.VarCharWriter;
+import org.joda.time.DateTimeUtils;
+
+import parquet.io.api.Binary;
+import parquet.io.api.Converter;
+import parquet.io.api.GroupConverter;
+import parquet.io.api.PrimitiveConverter;
+import parquet.schema.DecimalMetadata;
+import parquet.schema.GroupType;
+import parquet.schema.MessageType;
+import parquet.schema.PrimitiveType;
+import parquet.schema.Type;
+import parquet.schema.Type.Repetition;
+
+import java.math.BigDecimal;
+import java.util.List;
+
+public class DrillParquetGroupConverter extends GroupConverter {
+
+ private List<Converter> converters;
+ private MapWriter mapWriter;
+
+ public DrillParquetGroupConverter(ComplexWriterImpl complexWriter, MessageType schema) {
+ this(complexWriter.rootAsMap(), schema);
+ }
+
+ public DrillParquetGroupConverter(MapWriter mapWriter, GroupType schema) {
+ this.mapWriter = mapWriter;
+ converters = Lists.newArrayList();
+ for (Type type : schema.getFields()) {
+ Repetition rep = type.getRepetition();
+ boolean isPrimitive = type.isPrimitive();
+ if (!isPrimitive) {
+ if (rep != Repetition.REPEATED) {
+ DrillParquetGroupConverter converter = new DrillParquetGroupConverter(mapWriter.map(type.getName()), type.asGroupType());
+ converters.add(converter);
+ } else {
+ DrillParquetGroupConverter converter = new DrillParquetGroupConverter(mapWriter.list(type.getName()).map(), type.asGroupType());
+ converters.add(converter);
+ }
+ } else {
+ PrimitiveConverter converter = getConverterForType(type.asPrimitiveType());
+ converters.add(converter);
+ }
+ }
+ }
+
+ private PrimitiveConverter getConverterForType(PrimitiveType type) {
+
+ String name = type.getName();
+ switch(type.getPrimitiveTypeName()) {
+ case INT32: {
+ if (type.getOriginalType() == null) {
+ IntWriter writer = type.getRepetition() == Repetition.REPEATED ? mapWriter.list(name).integer() : mapWriter.integer(name);
+ return new DrillIntConverter(writer);
+ }
+ switch(type.getOriginalType()) {
+ case DECIMAL: {
+ Decimal9Writer writer = type.getRepetition() == Repetition.REPEATED ? mapWriter.list(name).decimal9() : mapWriter.decimal9(name);
+ return new DrillDecimal9Converter(writer, type.getDecimalMetadata().getPrecision(), type.getDecimalMetadata().getScale());
+ }
+ case DATE: {
+ DateWriter writer = type.getRepetition() == Repetition.REPEATED ? mapWriter.list(name).date() : mapWriter.date(name);
+ return new DrillDateConverter(writer);
+ }
+ case TIME_MILLIS: {
+ TimeWriter writer = type.getRepetition() == Repetition.REPEATED ? mapWriter.list(name).time() : mapWriter.time(name);
+ return new DrillTimeConverter(writer);
+ }
+ default: {
+ throw new UnsupportedOperationException("Unsupported type: " + type.getOriginalType());
+ }
+ }
+ }
+ case INT64: {
+ if (type.getOriginalType() == null) {
+ BigIntWriter writer = type.getRepetition() == Repetition.REPEATED ? mapWriter.list(name).bigInt() : mapWriter.bigInt(name);
+ return new DrillBigIntConverter(writer);
+ }
+ switch(type.getOriginalType()) {
+ case DECIMAL: {
+ Decimal18Writer writer = type.getRepetition() == Repetition.REPEATED ? mapWriter.list(name).decimal18() : mapWriter.decimal18(name);
+ return new DrillDecimal18Converter(writer, type.getDecimalMetadata().getPrecision(), type.getDecimalMetadata().getScale());
+ }
+ case TIMESTAMP_MILLIS: {
+ TimeStampWriter writer = type.getRepetition() == Repetition.REPEATED ? mapWriter.list(name).timeStamp() : mapWriter.timeStamp(name);
+ return new DrillTimeStampConverter(writer);
+ }
+ default: {
+ throw new UnsupportedOperationException("Unsupported type " + type.getOriginalType());
+ }
+ }
+ }
+ case FLOAT: {
+ Float4Writer writer = type.getRepetition() == Repetition.REPEATED ? mapWriter.list(name).float4() : mapWriter.float4(name);
+ return new DrillFloat4Converter(writer);
+ }
+ case DOUBLE: {
+ Float8Writer writer = type.getRepetition() == Repetition.REPEATED ? mapWriter.list(name).float8() : mapWriter.float8(name);
+ return new DrillFloat8Converter(writer);
+ }
+ case BOOLEAN: {
+ BitWriter writer = type.getRepetition() == Repetition.REPEATED ? mapWriter.list(name).bit() : mapWriter.bit(name);
+ return new DrillBoolConverter(writer);
+ }
+ case BINARY: {
+ if (type.getOriginalType() == null) {
+ VarBinaryWriter writer = type.getRepetition() == Repetition.REPEATED ? mapWriter.list(name).varBinary() : mapWriter.varBinary(name);
+ return new DrillVarBinaryConverter(writer);
+ }
+ switch(type.getOriginalType()) {
+ case UTF8: {
+ VarCharWriter writer = type.getRepetition() == Repetition.REPEATED ? mapWriter.list(name).varChar() : mapWriter.varChar(name);
+ return new DrillVarCharConverter(writer);
+ }
+ case DECIMAL: {
+ DecimalMetadata metadata = type.getDecimalMetadata();
+ if (metadata.getPrecision() <= 28) {
+ Decimal28SparseWriter writer = type.getRepetition() == Repetition.REPEATED ? mapWriter.list(name).decimal28Sparse() : mapWriter.decimal28Sparse(name);
+ return new DrillBinaryToDecimal28Converter(writer, metadata.getPrecision(), metadata.getScale());
+ } else {
+ Decimal38SparseWriter writer = type.getRepetition() == Repetition.REPEATED ? mapWriter.list(name).decimal38Sparse() : mapWriter.decimal38Sparse(name);
+ return new DrillBinaryToDecimal38Converter(writer, metadata.getPrecision(), metadata.getScale());
+ }
+ }
+ default: {
+ throw new UnsupportedOperationException("Unsupported type " + type.getOriginalType());
+ }
+ }
+ }
+ default:
+ throw new UnsupportedOperationException("Unsupported type: " + type.getPrimitiveTypeName());
+ }
+ }
+
+ @Override
+ public Converter getConverter(int i) {
+ return converters.get(i);
+ }
+
+ @Override
+ public void start() {
+ mapWriter.start();
+ }
+
+ @Override
+ public void end() {
+ mapWriter.end();
+ }
+
+ public static class DrillIntConverter extends PrimitiveConverter {
+ private IntWriter writer;
+ private IntHolder holder = new IntHolder();
+
+ public DrillIntConverter(IntWriter writer) {
+ super();
+ this.writer = writer;
+ }
+
+ @Override
+ public void addInt(int value) {
+ holder.value = value;
+ writer.write(holder);
+ }
+ }
+
+ public static class DrillDecimal9Converter extends PrimitiveConverter {
+ private Decimal9Writer writer;
+ private Decimal9Holder holder = new Decimal9Holder();
+ int precision;
+ int scale;
+
+ public DrillDecimal9Converter(Decimal9Writer writer, int precision, int scale) {
+ this.writer = writer;
+ this.scale = scale;
+ this.precision = precision;
+ }
+
+ @Override
+ public void addInt(int value) {
+ holder.value = value;
+ writer.write(holder);
+ }
+ }
+
+ public static class DrillDateConverter extends PrimitiveConverter {
+ private DateWriter writer;
+ private DateHolder holder = new DateHolder();
+
+ public DrillDateConverter(DateWriter writer) {
+ this.writer = writer;
+ }
+
+ @Override
+ public void addInt(int value) {
+ holder.value = DateTimeUtils.fromJulianDay(value - ParquetOutputRecordWriter.JULIAN_DAY_EPOC - 0.5);
+ writer.write(holder);
+ }
+ }
+
+ public static class DrillTimeConverter extends PrimitiveConverter {
+ private TimeWriter writer;
+ private TimeHolder holder = new TimeHolder();
+
+ public DrillTimeConverter(TimeWriter writer) {
+ this.writer = writer;
+ }
+
+ @Override
+ public void addInt(int value) {
+ holder.value = value;
+ writer.write(holder);
+ }
+ }
+
+ public static class DrillBigIntConverter extends PrimitiveConverter {
+ private BigIntWriter writer;
+ private BigIntHolder holder = new BigIntHolder();
+
+ public DrillBigIntConverter(BigIntWriter writer) {
+ this.writer = writer;
+ }
+
+ @Override
+ public void addLong(long value) {
+ holder.value = value;
+ writer.write(holder);
+ }
+ }
+
+ public static class DrillTimeStampConverter extends PrimitiveConverter {
+ private TimeStampWriter writer;
+ private TimeStampHolder holder = new TimeStampHolder();
+
+ public DrillTimeStampConverter(TimeStampWriter writer) {
+ this.writer = writer;
+ }
+
+ @Override
+ public void addLong(long value) {
+ holder.value = value;
+ writer.write(holder);
+ }
+ }
+
+ public static class DrillDecimal18Converter extends PrimitiveConverter {
+ private Decimal18Writer writer;
+ private Decimal18Holder holder = new Decimal18Holder();
+
+ public DrillDecimal18Converter(Decimal18Writer writer, int precision, int scale) {
+ this.writer = writer;
+ holder.precision = precision;
+ holder.scale = scale;
+ }
+
+ @Override
+ public void addLong(long value) {
+ holder.value = value;
+ writer.write(holder);
+ }
+ }
+
+ public static class DrillFloat4Converter extends PrimitiveConverter {
+ private Float4Writer writer;
+ private Float4Holder holder = new Float4Holder();
+
+ public DrillFloat4Converter(Float4Writer writer) {
+ this.writer = writer;
+ }
+
+ @Override
+ public void addFloat(float value) {
+ holder.value = value;
+ writer.write(holder);
+ }
+ }
+
+ public static class DrillFloat8Converter extends PrimitiveConverter {
+ private Float8Writer writer;
+ private Float8Holder holder = new Float8Holder();
+
+ public DrillFloat8Converter(Float8Writer writer) {
+ this.writer = writer;
+ }
+
+ @Override
+ public void addDouble(double value) {
+ holder.value = value;
+ writer.write(holder);
+ }
+ }
+
+ public static class DrillBoolConverter extends PrimitiveConverter {
+ private BitWriter writer;
+ private BitHolder holder = new BitHolder();
+
+ public DrillBoolConverter(BitWriter writer) {
+ this.writer = writer;
+ }
+
+ @Override
+ public void addBoolean(boolean value) {
+ holder.value = value ? 1 : 0;
+ writer.write(holder);
+ }
+ }
+
+ public static class DrillVarBinaryConverter extends PrimitiveConverter {
+ private VarBinaryWriter writer;
+ private VarBinaryHolder holder = new VarBinaryHolder();
+
+ public DrillVarBinaryConverter(VarBinaryWriter writer) {
+ this.writer = writer;
+ }
+
+ @Override
+ public void addBinary(Binary value) {
+ ByteBuf buf = Unpooled.wrappedBuffer(value.toByteBuffer());
+ holder.buffer = buf;
+ holder.start = 0;
+ holder.end = value.length();
+ writer.write(holder);
+ }
+ }
+
+ public static class DrillVarCharConverter extends PrimitiveConverter {
+ private VarCharWriter writer;
+ private VarCharHolder holder = new VarCharHolder();
+
+ public DrillVarCharConverter(VarCharWriter writer) {
+ this.writer = writer;
+ }
+
+ @Override
+ public void addBinary(Binary value) {
+ ByteBuf buf = Unpooled.wrappedBuffer(value.toByteBuffer());
+ holder.buffer = buf;
+ holder.start = 0;
+ holder.end = value.length();
+ writer.write(holder);
+ }
+ }
+
+ public static class DrillBinaryToDecimal28Converter extends PrimitiveConverter {
+ private Decimal28SparseWriter writer;
+ private Decimal28SparseHolder holder = new Decimal28SparseHolder();
+
+ public DrillBinaryToDecimal28Converter(Decimal28SparseWriter writer, int precision, int scale) {
+ this.writer = writer;
+ holder.precision = precision;
+ holder.scale = scale;
+ }
+
+ @Override
+ public void addBinary(Binary value) {
+ BigDecimal bigDecimal = DecimalUtility.getBigDecimalFromByteArray(value.getBytes(), 0, value.length(), holder.scale);
+ ByteBuf buf = Unpooled.wrappedBuffer(new byte[28]);
+ DecimalUtility.getSparseFromBigDecimal(bigDecimal, buf, 0, holder.scale, holder.precision, Decimal28SparseHolder.nDecimalDigits);
+ holder.buffer = buf;
+ writer.write(holder);
+ }
+ }
+
+ public static class DrillBinaryToDecimal38Converter extends PrimitiveConverter {
+ private Decimal38SparseWriter writer;
+ private Decimal38SparseHolder holder = new Decimal38SparseHolder();
+
+ public DrillBinaryToDecimal38Converter(Decimal38SparseWriter writer, int precision, int scale) {
+ this.writer = writer;
+ holder.precision = precision;
+ holder.scale = scale;
+ }
+
+ @Override
+ public void addBinary(Binary value) {
+ BigDecimal bigDecimal = DecimalUtility.getBigDecimalFromByteArray(value.getBytes(), 0, value.length(), holder.scale);
+ ByteBuf buf = Unpooled.wrappedBuffer(new byte[38]);
+ DecimalUtility.getSparseFromBigDecimal(bigDecimal, buf, 0, holder.scale, holder.precision, Decimal38SparseHolder.nDecimalDigits);
+ holder.buffer = buf;
+ writer.write(holder);
+ }
+ }
+}
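The converter tree mirrors the Parquet schema one-to-one: each field gets a child converter, groups recurse into nested map writers, and repeated groups go through list(name).map(). For a schema such as the one sketched below, the constructor builds the following tree (the class names are the ones defined in this file; the schema is illustrative):

    // message root {
    //   required int32 id;
    //   optional binary name (UTF8);
    //   repeated group tags { required binary label (UTF8); }
    // }
    //
    // DrillParquetGroupConverter              -> root MapWriter
    //   [0] DrillIntConverter                 -> mapWriter.integer("id")
    //   [1] DrillVarCharConverter             -> mapWriter.varChar("name")
    //   [2] DrillParquetGroupConverter        -> mapWriter.list("tags").map()
    //         [0] DrillVarCharConverter       -> varChar("label") on the nested map
    //
    // parquet-mr walks this tree per record, calling start()/end() on groups and
    // addInt()/addBinary()/... on leaves; each call writes straight into vectors.
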
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetReader.java
new file mode 100644
index 000000000..aaeb536a8
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetReader.java
@@ -0,0 +1,204 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.parquet2;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.expression.PathSegment;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.physical.impl.OutputMutator;
+import org.apache.drill.exec.store.RecordReader;
+import org.apache.drill.exec.store.parquet.RowGroupReadEntry;
+import org.apache.drill.exec.vector.BaseValueVector;
+import org.apache.drill.exec.vector.ValueVector;
+import org.apache.drill.exec.vector.VariableWidthVector;
+import org.apache.drill.exec.vector.complex.impl.VectorContainerWriter;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import parquet.hadoop.CodecFactoryExposer;
+import parquet.hadoop.ColumnChunkIncReadStore;
+import parquet.hadoop.metadata.BlockMetaData;
+import parquet.hadoop.metadata.ColumnChunkMetaData;
+import parquet.hadoop.metadata.ColumnPath;
+import parquet.hadoop.metadata.ParquetMetadata;
+import parquet.io.ColumnIOFactory;
+import parquet.io.InvalidRecordException;
+import parquet.io.MessageColumnIO;
+import parquet.schema.GroupType;
+import parquet.schema.MessageType;
+import parquet.schema.Type;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class DrillParquetReader implements RecordReader {
+
+ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DrillParquetReader.class);
+
+ private ParquetMetadata footer;
+ private MessageType schema;
+ private Configuration conf;
+ private RowGroupReadEntry entry;
+ private List<SchemaPath> columns;
+ private VectorContainerWriter writer;
+ private parquet.io.RecordReader<Void> recordReader;
+ private DrillParquetRecordMaterializer recordMaterializer;
+ private int recordCount;
+ private List<ValueVector> primitiveVectors;
+
+ public DrillParquetReader(ParquetMetadata footer, RowGroupReadEntry entry, List<SchemaPath> columns, Configuration conf) {
+ this.footer = footer;
+ this.conf = conf;
+ this.columns = columns;
+ this.entry = entry;
+ }
+
+ public static MessageType getProjection(MessageType schema, List<SchemaPath> columns) {
+ MessageType projection = null;
+ for (SchemaPath path : columns) {
+ List<String> segments = Lists.newArrayList();
+ PathSegment rootSegment = path.getRootSegment();
+ PathSegment seg = rootSegment;
+ String messageName = schema.getName();
+ while(seg != null){
+ if(seg.isNamed()) {
+ segments.add(seg.getNameSegment().getPath());
+ }
+ seg = seg.getChild();
+ }
+ String[] pathSegments = new String[segments.size()];
+ segments.toArray(pathSegments);
+ Type type = null;
+ try {
+ type = schema.getType(pathSegments);
+ } catch (InvalidRecordException e) {
+ logger.warn("Invalid record" , e);
+ }
+ if (type != null) {
+ Type t = getType(pathSegments, 0, schema);
+ if (projection == null) {
+ projection = new MessageType(messageName, t);
+ } else {
+ projection = projection.union(new MessageType(messageName, t));
+ }
+ }
+ }
+ return projection;
+ }
+
+ @Override
+ public void setup(OutputMutator output) throws ExecutionSetupException {
+
+ try {
+ schema = footer.getFileMetaData().getSchema();
+ MessageType projection = null;
+
+ if (columns == null || columns.size() == 0) {
+ projection = schema;
+ } else {
+ projection = getProjection(schema, columns);
+ if (projection == null) {
+ projection = schema;
+ }
+ }
+
+ logger.debug("Requesting schema {}", projection);
+
+ ColumnIOFactory factory = new ColumnIOFactory(false);
+ MessageColumnIO columnIO = factory.getColumnIO(projection, schema);
+ Map<ColumnPath, ColumnChunkMetaData> paths = new HashMap();
+
+ for (ColumnChunkMetaData md : footer.getBlocks().get(entry.getRowGroupIndex()).getColumns()) {
+ paths.put(md.getPath(), md);
+ }
+
+ CodecFactoryExposer codecFactoryExposer = new CodecFactoryExposer(conf);
+ FileSystem fs = FileSystem.get(conf);
+ Path filePath = new Path(entry.getPath());
+
+ BlockMetaData blockMetaData = footer.getBlocks().get(entry.getRowGroupIndex());
+
+ recordCount = (int) blockMetaData.getRowCount();
+
+ ColumnChunkIncReadStore pageReadStore = new ColumnChunkIncReadStore(recordCount,
+ codecFactoryExposer.getCodecFactory(), fs, filePath);
+
+ for (String[] path : schema.getPaths()) {
+ Type type = schema.getType(path);
+ if (type.isPrimitive()) {
+ ColumnChunkMetaData md = paths.get(ColumnPath.get(path));
+ pageReadStore.addColumn(schema.getColumnDescription(path), md);
+ }
+ }
+
+ writer = new VectorContainerWriter(output);
+ recordMaterializer = new DrillParquetRecordMaterializer(writer, projection);
+ primitiveVectors = writer.getMapVector().getPrimitiveVectors();
+ recordReader = columnIO.getRecordReader(pageReadStore, recordMaterializer);
+ } catch (Exception e) {
+ throw new ExecutionSetupException(e);
+ }
+ }
+
+ private static Type getType(String[] pathSegments, int depth, MessageType schema) {
+ Type type = schema.getType(Arrays.copyOfRange(pathSegments, 0, depth + 1));
+ if (depth + 1 == pathSegments.length) {
+ return type;
+ } else {
+ Preconditions.checkState(!type.isPrimitive());
+ return new GroupType(type.getRepetition(), type.getName(), getType(pathSegments, depth + 1, schema));
+ }
+ }
+
+ private long totalRead = 0;
+
+ @Override
+ public int next() {
+ int count = 0;
+ for (; count < 4000 && totalRead < recordCount; count++, totalRead++) {
+ recordMaterializer.setPosition(count);
+ recordReader.read();
+ if (count % 100 == 0) {
+ if (getPercentFilled() > 85) {
+ break;
+ }
+ }
+ }
+ writer.setValueCount(count);
+ return count;
+ }
+
+ private int getPercentFilled() {
+ int filled = 0;
+ for (ValueVector v : primitiveVectors) {
+ filled = Math.max(filled, ((BaseValueVector) v).getCurrentValueCount() * 100 / v.getValueCapacity());
+ if (v instanceof VariableWidthVector) {
+ filled = Math.max(filled, ((VariableWidthVector) v).getCurrentSizeInBytes() * 100 / ((VariableWidthVector) v).getByteCapacity());
+ }
+ }
+ return filled;
+ }
+
+ @Override
+ public void cleanup() {
+ }
+}
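next() above caps a batch at 4000 records, but also polls every 100 records and stops early once any primitive vector is more than 85% full, whether by value count or, for variable-width vectors, by bytes. A self-contained sketch of that heuristic (readOneRecord() and percentFilled() are stand-ins for recordReader.read() and the vector scan):

    class BatchFillSketch {
      int filled;                                   // records written so far
      void readOneRecord(int idx) { filled++; }     // stand-in for recordReader.read()
      int percentFilled() { return filled * 100 / 4096; }

      int fillBatch(int recordCap) {
        int count = 0;
        while (count < recordCap) {
          readOneRecord(count);
          count++;
          if (count % 100 == 0 && percentFilled() > 85) {
            break;                                  // leave headroom in every vector
          }
        }
        return count;
      }
    }
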
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetRecordMaterializer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetRecordMaterializer.java
new file mode 100644
index 000000000..69893dce8
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetRecordMaterializer.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.parquet2;
+
+import org.apache.drill.exec.vector.ValueVector;
+import org.apache.drill.exec.vector.complex.MapVector;
+import org.apache.drill.exec.vector.complex.impl.ComplexWriterImpl;
+import org.apache.drill.exec.vector.complex.writer.BaseWriter.ComplexWriter;
+import parquet.io.api.GroupConverter;
+import parquet.io.api.RecordMaterializer;
+import parquet.schema.MessageType;
+
+import java.util.List;
+
+public class DrillParquetRecordMaterializer extends RecordMaterializer<Void> {
+
+ public DrillParquetGroupConverter root;
+ private ComplexWriter complexWriter;
+
+ public DrillParquetRecordMaterializer(ComplexWriter complexWriter, MessageType schema) {
+ this.complexWriter = complexWriter;
+ root = new DrillParquetGroupConverter(complexWriter.rootAsMap(), schema);
+ }
+
+ public void setPosition(int position) {
+ complexWriter.setPosition(position);
+ }
+
+ public boolean ok() {
+ return complexWriter.ok();
+ }
+
+ @Override
+ public Void getCurrentRecord() {
+ return null;
+ }
+
+ @Override
+ public GroupConverter getRootConverter() {
+ return root;
+ }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/BaseDataValueVector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/BaseDataValueVector.java
index e70f406fd..7d0fbc7ff 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/BaseDataValueVector.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/BaseDataValueVector.java
@@ -32,6 +32,7 @@ public abstract class BaseDataValueVector extends BaseValueVector{
protected ByteBuf data = DeadBuf.DEAD_BUFFER;
protected int valueCount;
+ protected int currentValueCount;
public BaseDataValueVector(MaterializedField field, BufferAllocator allocator) {
super(field, allocator);
@@ -50,6 +51,14 @@ public abstract class BaseDataValueVector extends BaseValueVector{
}
}
+ public void setCurrentValueCount(int count) {
+ currentValueCount = count;
+ }
+
+ public int getCurrentValueCount() {
+ return currentValueCount;
+ }
+
@Override
public ByteBuf[] getBuffers(){
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/BaseValueVector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/BaseValueVector.java
index f96843502..e310b81be 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/BaseValueVector.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/BaseValueVector.java
@@ -58,6 +58,9 @@ public abstract class BaseValueVector implements ValueVector{
return getField().getAsBuilder();
}
+ public abstract int getCurrentValueCount();
+ public abstract void setCurrentValueCount(int count);
+
abstract public ByteBuf getData();
abstract class BaseAccessor implements ValueVector.Accessor{
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/ObjectVector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/ObjectVector.java
index 6998a7410..25aff57f1 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/ObjectVector.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/ObjectVector.java
@@ -125,6 +125,15 @@ public class ObjectVector extends BaseValueVector{
}
@Override
+ public int getCurrentValueCount() {
+ return 0;
+ }
+
+ @Override
+ public void setCurrentValueCount(int count) {
+ }
+
+ @Override
public ByteBuf getData() {
throw new UnsupportedOperationException("ObjectVector does not support this");
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/VariableWidthVector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/VariableWidthVector.java
index 6660351c1..5c1d3ab01 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/VariableWidthVector.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/VariableWidthVector.java
@@ -34,6 +34,8 @@ public interface VariableWidthVector extends ValueVector{
* @return
*/
public int getByteCapacity();
+
+ public int getCurrentSizeInBytes();
/**
* Load the records in the provided buffer based on the given number of values.
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/AbstractContainerVector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/AbstractContainerVector.java
index 9667fd2db..5eb135898 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/AbstractContainerVector.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/AbstractContainerVector.java
@@ -24,6 +24,8 @@ import org.apache.drill.common.types.TypeProtos.MinorType;
import org.apache.drill.exec.record.TypedFieldId;
import org.apache.drill.exec.vector.ValueVector;
+import java.util.List;
+
public abstract class AbstractContainerVector implements ValueVector{
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(AbstractContainerVector.class);
@@ -39,6 +41,8 @@ public abstract class AbstractContainerVector implements ValueVector{
}
}
+ public abstract List<ValueVector> getPrimitiveVectors();
+
public abstract VectorWithOrdinal getVectorWithOrdinal(String name);
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/MapVector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/MapVector.java
index ed2ad8a3b..480b86352 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/MapVector.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/MapVector.java
@@ -76,6 +76,22 @@ public class MapVector extends AbstractContainerVector {
return vectors.size();
}
+ @Override
+ public List<ValueVector> getPrimitiveVectors() {
+ List<ValueVector> primitiveVectors = Lists.newArrayList();
+ for (ValueVector v : this.vectors.values()) {
+ if (v instanceof AbstractContainerVector) {
+ AbstractContainerVector av = (AbstractContainerVector) v;
+ for (ValueVector vv : av.getPrimitiveVectors()) {
+ primitiveVectors.add(vv);
+ }
+ } else {
+ primitiveVectors.add(v);
+ }
+ }
+ return primitiveVectors;
+ }
+
transient private MapTransferPair ephPair;
transient private MapSingleCopier ephPair2;
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/RepeatedListVector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/RepeatedListVector.java
index d43bf59f9..57c47d4e9 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/RepeatedListVector.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/RepeatedListVector.java
@@ -78,6 +78,20 @@ public class RepeatedListVector extends AbstractContainerVector implements Repea
return vector != null ? 1 : 0;
}
+ @Override
+ public List<ValueVector> getPrimitiveVectors() {
+ List<ValueVector> primitiveVectors = Lists.newArrayList();
+ if (vector instanceof AbstractContainerVector) {
+ for (ValueVector v : ((AbstractContainerVector) vector).getPrimitiveVectors()) {
+ primitiveVectors.add(v);
+ }
+ } else {
+ primitiveVectors.add(vector);
+ }
+ primitiveVectors.add(offsets);
+ return primitiveVectors;
+ }
+
public RepeatedListVector(SchemaPath path, BufferAllocator allocator){
this(MaterializedField.create(path, TYPE), allocator);
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/RepeatedMapVector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/RepeatedMapVector.java
index 952fb4b90..cb770321d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/RepeatedMapVector.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/RepeatedMapVector.java
@@ -92,6 +92,23 @@ public class RepeatedMapVector extends AbstractContainerVector implements Repeat
}
@Override
+ public List<ValueVector> getPrimitiveVectors() {
+ List<ValueVector> primitiveVectors = Lists.newArrayList();
+ for (ValueVector v : this.vectors.values()) {
+ if (v instanceof AbstractContainerVector) {
+ AbstractContainerVector av = (AbstractContainerVector) v;
+ for (ValueVector vv : av.getPrimitiveVectors()) {
+ primitiveVectors.add(vv);
+ }
+ } else {
+ primitiveVectors.add(v);
+ }
+ }
+ primitiveVectors.add(offsets);
+ return primitiveVectors;
+ }
+
+ @Override
public <T extends ValueVector> T addOrGet(String name, MajorType type, Class<T> clazz) {
ValueVector v = vectors.get(name);
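The three getPrimitiveVectors() implementations above are one recursive walk: container vectors recurse into their children, leaf vectors are collected, and repeated containers also contribute their offsets vector, whose fill level matters for batch sizing too. The traversal reduced to its core (a sketch; the real methods live on each container class):

    import java.util.List;
    import org.apache.drill.exec.vector.ValueVector;
    import org.apache.drill.exec.vector.complex.AbstractContainerVector;

    static void collectLeaves(ValueVector v, List<ValueVector> out) {
      if (v instanceof AbstractContainerVector) {
        out.addAll(((AbstractContainerVector) v).getPrimitiveVectors());
      } else {
        out.add(v);
      }
    }
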
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/impl/VectorContainerWriter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/impl/VectorContainerWriter.java
index bc1d3675b..4f669c00e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/impl/VectorContainerWriter.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/impl/VectorContainerWriter.java
@@ -40,6 +40,10 @@ public class VectorContainerWriter extends AbstractFieldWriter implements Comple
this.mapRoot = new SingleMapWriter(mapVector, this);
}
+ public MapVector getMapVector() {
+ return mapVector;
+ }
+
public void reset() {
setPosition(0);
resetState();
diff --git a/exec/java-exec/src/main/java/parquet/hadoop/CodecFactoryExposer.java b/exec/java-exec/src/main/java/parquet/hadoop/CodecFactoryExposer.java
index dcd20b1bf..cb3610252 100644
--- a/exec/java-exec/src/main/java/parquet/hadoop/CodecFactoryExposer.java
+++ b/exec/java-exec/src/main/java/parquet/hadoop/CodecFactoryExposer.java
@@ -22,6 +22,7 @@ import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import parquet.bytes.BytesInput;
+import parquet.hadoop.CodecFactory.BytesDecompressor;
import parquet.hadoop.metadata.CompressionCodecName;
public class CodecFactoryExposer{
@@ -39,4 +40,8 @@ public class CodecFactoryExposer{
public BytesInput decompress(BytesInput bytes, int uncompressedSize, CompressionCodecName codecName) throws IOException {
return codecFactory.getDecompressor(codecName).decompress(bytes, uncompressedSize);
}
+
+ public BytesDecompressor getDecompressor(CompressionCodecName codec) {
+ return codecFactory.getDecompressor(codec);
+ }
}
diff --git a/exec/java-exec/src/main/java/parquet/hadoop/ColumnChunkIncReadStore.java b/exec/java-exec/src/main/java/parquet/hadoop/ColumnChunkIncReadStore.java
new file mode 100644
index 000000000..e5a477b0f
--- /dev/null
+++ b/exec/java-exec/src/main/java/parquet/hadoop/ColumnChunkIncReadStore.java
@@ -0,0 +1,190 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package parquet.hadoop;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.compress.Decompressor;
+import parquet.bytes.BytesInput;
+import parquet.column.ColumnDescriptor;
+import parquet.column.page.DictionaryPage;
+import parquet.column.page.Page;
+import parquet.column.page.PageReadStore;
+import parquet.column.page.PageReader;
+import parquet.format.PageHeader;
+import parquet.format.PageType;
+import parquet.format.Util;
+import parquet.format.converter.ParquetMetadataConverter;
+import parquet.hadoop.CodecFactory.BytesDecompressor;
+import parquet.hadoop.metadata.ColumnChunkMetaData;
+import parquet.hadoop.metadata.FileMetaData;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+
+public class ColumnChunkIncReadStore implements PageReadStore {
+
+ private static ParquetMetadataConverter parquetMetadataConverter = new ParquetMetadataConverter();
+
+ private CodecFactory codecFactory = new CodecFactory(new Configuration());
+ private FileSystem fs;
+ private Path path;
+ private long rowCount;
+ private List<FSDataInputStream> streams = new ArrayList();
+
+ public ColumnChunkIncReadStore(long rowCount, CodecFactory codecFactory, FileSystem fs, Path path) {
+ this.codecFactory = codecFactory;
+ this.fs = fs;
+ this.path = path;
+ this.rowCount = rowCount;
+ }
+
+
+ public class ColumnChunkIncPageReader implements PageReader {
+
+ ColumnChunkMetaData metaData;
+ long fileOffset;
+ long size;
+ private long valueReadSoFar = 0;
+
+ private DictionaryPage dictionaryPage;
+ private FSDataInputStream in;
+ private BytesDecompressor decompressor;
+
+ public ColumnChunkIncPageReader(ColumnChunkMetaData metaData, FSDataInputStream in) {
+ this.metaData = metaData;
+ this.size = metaData.getTotalSize();
+ this.fileOffset = metaData.getStartingPos();
+ this.in = in;
+ this.decompressor = codecFactory.getDecompressor(metaData.getCodec());
+ }
+
+ public ColumnChunkIncPageReader(ColumnChunkMetaData metaData, FSDataInputStream in, CodecFactory codecFactory) {
+ this.metaData = metaData;
+ this.size = metaData.getTotalSize();
+ this.fileOffset = metaData.getStartingPos();
+ this.in = in;
+ this.decompressor = codecFactory.getDecompressor(metaData.getCodec());
+ }
+
+ @Override
+ public DictionaryPage readDictionaryPage() {
+ if (dictionaryPage == null) {
+ try {
+ long pos = in.getPos();
+ PageHeader pageHeader = Util.readPageHeader(in);
+ if (pageHeader.getDictionary_page_header() == null) {
+ in.seek(pos);
+ return null;
+ }
+ dictionaryPage =
+ new DictionaryPage(
+ decompressor.decompress(BytesInput.from(in, pageHeader.compressed_page_size), pageHeader.getUncompressed_page_size()),
+ pageHeader.getDictionary_page_header().getNum_values(),
+ parquetMetadataConverter.getEncoding(pageHeader.dictionary_page_header.encoding)
+ );
+ System.out.println(dictionaryPage);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+// if (dictionaryPage == null) {
+// throw new RuntimeException("Dictionary page null");
+// }
+ }
+ return dictionaryPage;
+ }
+
+ @Override
+ public long getTotalValueCount() {
+ return metaData.getValueCount();
+ }
+
+ @Override
+ public Page readPage() {
+ try {
+ while(valueReadSoFar < metaData.getValueCount()) {
+ PageHeader pageHeader = Util.readPageHeader(in);
+ switch (pageHeader.type) {
+ case DICTIONARY_PAGE:
+ if (dictionaryPage == null) {
+ dictionaryPage =
+ new DictionaryPage(
+ decompressor.decompress(BytesInput.from(in, pageHeader.compressed_page_size), pageHeader.getUncompressed_page_size()),
+ pageHeader.uncompressed_page_size,
+ parquetMetadataConverter.getEncoding(pageHeader.dictionary_page_header.encoding)
+ );
+ } else {
+ in.skip(pageHeader.compressed_page_size);
+ }
+ break;
+ case DATA_PAGE:
+ valueReadSoFar += pageHeader.data_page_header.getNum_values();
+ return new Page(
+ decompressor.decompress(BytesInput.from(in,pageHeader.compressed_page_size), pageHeader.getUncompressed_page_size()),
+ pageHeader.data_page_header.num_values,
+ pageHeader.uncompressed_page_size,
+ parquetMetadataConverter.getEncoding(pageHeader.data_page_header.repetition_level_encoding),
+ parquetMetadataConverter.getEncoding(pageHeader.data_page_header.definition_level_encoding),
+ parquetMetadataConverter.getEncoding(pageHeader.data_page_header.encoding)
+ );
+ default:
+ in.skip(pageHeader.compressed_page_size);
+ break;
+ }
+ }
+ in.close();
+ return null;
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ }
+
+ private Map<ColumnDescriptor, ColumnChunkIncPageReader> columns = new HashMap();
+
+ public void addColumn(ColumnDescriptor descriptor, ColumnChunkMetaData metaData) throws IOException {
+ FSDataInputStream in = fs.open(path);
+ streams.add(in);
+ in.seek(metaData.getStartingPos());
+ ColumnChunkIncPageReader reader = new ColumnChunkIncPageReader(metaData, in);
+
+ columns.put(descriptor, reader);
+ }
+
+ public void close() throws IOException {
+ for (FSDataInputStream stream : streams) {
+ stream.close();
+ }
+ }
+
+ @Override
+ public PageReader getPageReader(ColumnDescriptor descriptor) {
+ return columns.get(descriptor);
+ }
+
+ @Override
+ public long getRowCount() {
+ return rowCount;
+ }
+}
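Unlike parquet-mr's default page read store, which materializes a whole column chunk up front, this store keeps one open FSDataInputStream per column and parses page headers on demand, decompressing only as readPage() is called. A hedged sketch of wiring it up, as DrillParquetReader.setup() does above (all parameters are assumed to be supplied by the caller):

    static PageReadStore openRowGroup(MessageType schema, ParquetMetadata footer,
                                      int rowGroupIndex, CodecFactory codecFactory,
                                      FileSystem fs, Path path) throws IOException {
      BlockMetaData block = footer.getBlocks().get(rowGroupIndex);
      ColumnChunkIncReadStore store =
          new ColumnChunkIncReadStore(block.getRowCount(), codecFactory, fs, path);
      for (ColumnChunkMetaData md : block.getColumns()) {
        // only primitive columns carry pages; ColumnPath.toArray() recovers the path
        store.addColumn(schema.getColumnDescription(md.getPath().toArray()), md);
      }
      // MessageColumnIO.getRecordReader(store, materializer) then pulls pages lazily.
      return store;
    }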