diff options
author | Steven Phillips <sphillips@maprtech.com> | 2014-05-18 00:46:02 -0700 |
---|---|---|
committer | Jacques Nadeau <jacques@apache.org> | 2014-07-29 21:42:30 -0700 |
commit | 0d6befca0bd98ad14cd2bb6700cdee5172cd8f90 (patch) | |
tree | fdf1a66496d3b48134f54d975189d7725dea7835 /exec/java-exec/src/main | |
parent | 48229ec90359226638e3f7c274c7adadfc95952d (diff) |
DRILL-1058: Read complex types in parquet
Diffstat (limited to 'exec/java-exec/src/main')
22 files changed, 1055 insertions, 10 deletions
diff --git a/exec/java-exec/src/main/codegen/templates/ComplexWriters.java b/exec/java-exec/src/main/codegen/templates/ComplexWriters.java index 72ff135fe..ce839ef85 100644 --- a/exec/java-exec/src/main/codegen/templates/ComplexWriters.java +++ b/exec/java-exec/src/main/codegen/templates/ComplexWriters.java @@ -74,6 +74,7 @@ public class ${eName}WriterImpl extends AbstractFieldWriter { if(ok()){ // update to inform(addSafe) once available for all repeated vector types for holders. inform(mutator.addSafe(idx(), h)); + vector.setCurrentValueCount(idx()); } } @@ -81,6 +82,7 @@ public class ${eName}WriterImpl extends AbstractFieldWriter { if(ok()){ // update to inform(addSafe) once available for all repeated vector types for holders. inform(mutator.addSafe(idx(), h)); + vector.setCurrentValueCount(idx()); } } @@ -96,6 +98,7 @@ public class ${eName}WriterImpl extends AbstractFieldWriter { if(ok()){ // update to inform(setSafe) once available for all vector types for holders. inform(mutator.setSafe(idx(), h)); + vector.setCurrentValueCount(idx()); } } @@ -103,6 +106,7 @@ public class ${eName}WriterImpl extends AbstractFieldWriter { if(ok()){ // update to inform(setSafe) once available for all vector types for holders. 
inform(mutator.setSafe(idx(), h)); + vector.setCurrentValueCount(idx()); } } diff --git a/exec/java-exec/src/main/codegen/templates/NullableValueVectors.java b/exec/java-exec/src/main/codegen/templates/NullableValueVectors.java index 4873f2a07..6876cabe6 100644 --- a/exec/java-exec/src/main/codegen/templates/NullableValueVectors.java +++ b/exec/java-exec/src/main/codegen/templates/NullableValueVectors.java @@ -61,7 +61,15 @@ public final class ${className} extends BaseValueVector implements <#if type.maj public int getValueCapacity(){ return Math.min(bits.getValueCapacity(), values.getValueCapacity()); } - + + public int getCurrentValueCount() { + return values.getCurrentValueCount(); + } + + public void setCurrentValueCount(int count) { + values.setCurrentValueCount(count); + } + @Override public ByteBuf[] getBuffers() { ByteBuf[] buffers = ObjectArrays.concat(bits.getBuffers(), values.getBuffers(), ByteBuf.class); @@ -143,6 +151,11 @@ public final class ${className} extends BaseValueVector implements <#if type.maj return values.getByteCapacity(); } + @Override + public int getCurrentSizeInBytes(){ + return values.getCurrentSizeInBytes(); + } + <#else> @Override public SerializedField getMetadata() { diff --git a/exec/java-exec/src/main/codegen/templates/RepeatedValueVectors.java b/exec/java-exec/src/main/codegen/templates/RepeatedValueVectors.java index b283d19be..7cbc50cb7 100644 --- a/exec/java-exec/src/main/codegen/templates/RepeatedValueVectors.java +++ b/exec/java-exec/src/main/codegen/templates/RepeatedValueVectors.java @@ -67,6 +67,14 @@ package org.apache.drill.exec.vector; return Math.min(values.getValueCapacity(), offsets.getValueCapacity() - 1); } + public int getCurrentValueCount() { + return values.getCurrentValueCount(); + } + + public void setCurrentValueCount(int count) { + values.setCurrentValueCount(count); + } + public int getBufferSize(){ return offsets.getBufferSize() + values.getBufferSize(); } diff --git 
a/exec/java-exec/src/main/codegen/templates/VariableLengthVectors.java b/exec/java-exec/src/main/codegen/templates/VariableLengthVectors.java index 1d30acb94..b0af1feaf 100644 --- a/exec/java-exec/src/main/codegen/templates/VariableLengthVectors.java +++ b/exec/java-exec/src/main/codegen/templates/VariableLengthVectors.java @@ -78,6 +78,10 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements V public int getByteCapacity(){ return data.capacity(); } + + public int getCurrentSizeInBytes() { + return offsetVector.getAccessor().get(currentValueCount); + } /** * Return the number of bytes contained in the current var len byte vector. diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java index 595fdd745..f2a84ee74 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java @@ -81,6 +81,8 @@ public interface ExecConstants { public static final OptionValidator OUTPUT_FORMAT_VALIDATOR = new StringValidator(OUTPUT_FORMAT_OPTION, "parquet"); public static final String PARQUET_BLOCK_SIZE = "store.parquet.block-size"; public static final OptionValidator PARQUET_BLOCK_SIZE_VALIDATOR = new LongValidator(PARQUET_BLOCK_SIZE, 512*1024*1024); + public static String PARQUET_NEW_RECORD_READER = "store.parquet.use_new_reader"; + public static OptionValidator PARQUET_RECORD_READER_IMPLEMENTATION_VALIDATOR = new BooleanValidator(PARQUET_NEW_RECORD_READER, false); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java index 21a580b99..eaf26d1e7 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java @@ -47,12 +47,17 @@ import 
org.apache.drill.exec.record.WritableBatch; import org.apache.drill.exec.record.selection.SelectionVector2; import org.apache.drill.exec.record.selection.SelectionVector4; import org.apache.drill.exec.store.RecordReader; +import org.apache.drill.exec.util.BatchPrinter; +import org.apache.drill.exec.util.VectorUtil; import org.apache.drill.exec.vector.AllocationHelper; import org.apache.drill.exec.vector.NullableVarCharVector; import org.apache.drill.exec.vector.ValueVector; import com.google.common.collect.Lists; import com.google.common.collect.Maps; +import org.apache.drill.exec.vector.complex.MapVector; +import org.apache.drill.exec.vector.complex.impl.ComplexWriterImpl; +import org.apache.drill.exec.vector.complex.writer.BaseWriter.ComplexWriter; /** * Record batch used for a particular scan. Operators against one or more @@ -136,6 +141,7 @@ public class ScanBatch implements RecordBatch { if (done) { return IterOutcome.NONE; } + long t1 = System.nanoTime(); oContext.getStats().startProcessing(); try { mutator.allocate(MAX_RECORD_CNT); @@ -177,8 +183,14 @@ public class ScanBatch implements RecordBatch { if (mutator.isNewSchema()) { container.buildSchema(SelectionVectorMode.NONE); schema = container.getSchema(); + long t2 = System.nanoTime(); +// System.out.println((t2 - t1) / recordCount); +// BatchPrinter.printBatch(this, "\t"); return IterOutcome.OK_NEW_SCHEMA; } else { + long t2 = System.nanoTime(); +// System.out.println((t2 - t1) / recordCount); +// BatchPrinter.printBatch(this, "\t"); return IterOutcome.OK; } } catch (Exception ex) { diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java index 6a8cc5e08..e49b10745 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java @@ 
-60,6 +60,7 @@ public class SystemOptionManager implements OptionManager{ PlannerSettings.HASH_SINGLE_KEY, ExecConstants.OUTPUT_FORMAT_VALIDATOR, ExecConstants.PARQUET_BLOCK_SIZE_VALIDATOR, + ExecConstants.PARQUET_RECORD_READER_IMPLEMENTATION_VALIDATOR, ExecConstants.SLICE_TARGET_OPTION, ExecConstants.AFFINITY_FACTOR, ExecConstants.MAX_WIDTH_GLOBAL, diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java index b4f02fbe9..ae73af365 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java @@ -38,15 +38,24 @@ import com.google.common.base.Preconditions; import com.google.common.collect.Lists; import org.apache.drill.exec.store.parquet.columnreaders.ParquetRecordReader; +import org.apache.drill.exec.store.parquet2.DrillParquetReader; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import parquet.hadoop.ParquetFileReader; import parquet.hadoop.metadata.ParquetMetadata; +import parquet.schema.MessageType; +import parquet.schema.Type; + public class ParquetScanBatchCreator implements BatchCreator<ParquetRowGroupScan>{ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ParquetScanBatchCreator.class); + private static final String ENABLE_BYTES_READ_COUNTER = "parquet.benchmark.bytes.read"; + private static final String ENABLE_BYTES_TOTAL_COUNTER = "parquet.benchmark.bytes.total"; + private static final String ENABLE_TIME_READ_COUNTER = "parquet.benchmark.time.read"; + @Override public RecordBatch getBatch(FragmentContext context, ParquetRowGroupScan rowGroupScan, List<RecordBatch> children) throws ExecutionSetupException { Preconditions.checkArgument(children.isEmpty()); @@ -74,7 +83,11 @@ 
public class ParquetScanBatchCreator implements BatchCreator<ParquetRowGroupScan FileSystem fs = rowGroupScan.getStorageEngine().getFileSystem().getUnderlying(); - + Configuration conf = fs.getConf(); + conf.setBoolean(ENABLE_BYTES_READ_COUNTER, false); + conf.setBoolean(ENABLE_BYTES_TOTAL_COUNTER, false); + conf.setBoolean(ENABLE_TIME_READ_COUNTER, false); + // keep footers in a map to avoid re-reading them Map<String, ParquetMetadata> footers = new HashMap<String, ParquetMetadata>(); int numParts = 0; @@ -91,14 +104,19 @@ public class ParquetScanBatchCreator implements BatchCreator<ParquetRowGroupScan footers.put(e.getPath(), ParquetFileReader.readFooter( fs.getConf(), new Path(e.getPath()))); } - readers.add( - new ParquetRecordReader( - context, e.getPath(), e.getRowGroupIndex(), fs, - rowGroupScan.getStorageEngine().getCodecFactoryExposer(), - footers.get(e.getPath()), - rowGroupScan.getColumns() - ) - ); + if (!context.getOptions().getOption(ExecConstants.PARQUET_NEW_RECORD_READER).bool_val && !isComplex(footers.get(e.getPath()))) { + readers.add( + new ParquetRecordReader( + context, e.getPath(), e.getRowGroupIndex(), fs, + rowGroupScan.getStorageEngine().getCodecFactoryExposer(), + footers.get(e.getPath()), + rowGroupScan.getColumns() + ) + ); + } else { + ParquetMetadata footer = footers.get(e.getPath()); + readers.add(new DrillParquetReader(footer, e, columns, conf)); + } if (rowGroupScan.getSelectionRoot() != null) { String[] r = rowGroupScan.getSelectionRoot().split("/"); String[] p = e.getPath().split("/"); @@ -125,4 +143,14 @@ public class ParquetScanBatchCreator implements BatchCreator<ParquetRowGroupScan return new ScanBatch(rowGroupScan, context, readers.iterator(), partitionColumns, selectedPartitionColumns); } + + private static boolean isComplex(ParquetMetadata footer) { + MessageType schema = footer.getFileMetaData().getSchema(); + for (Type type : schema.getFields()) { + if (!type.isPrimitive()) { + return true; + } + } + return false; + } } 
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetGroupConverter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetGroupConverter.java new file mode 100644 index 000000000..f9bac6534 --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetGroupConverter.java @@ -0,0 +1,439 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.drill.exec.store.parquet2; + +import com.google.common.collect.Lists; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; + +import org.apache.drill.common.util.DecimalUtility; +import org.apache.drill.exec.expr.holders.BigIntHolder; +import org.apache.drill.exec.expr.holders.BitHolder; +import org.apache.drill.exec.expr.holders.DateHolder; +import org.apache.drill.exec.expr.holders.Decimal18Holder; +import org.apache.drill.exec.expr.holders.Decimal28SparseHolder; +import org.apache.drill.exec.expr.holders.Decimal38SparseHolder; +import org.apache.drill.exec.expr.holders.Decimal9Holder; +import org.apache.drill.exec.expr.holders.Float4Holder; +import org.apache.drill.exec.expr.holders.Float8Holder; +import org.apache.drill.exec.expr.holders.IntHolder; +import org.apache.drill.exec.expr.holders.TimeHolder; +import org.apache.drill.exec.expr.holders.TimeStampHolder; +import org.apache.drill.exec.expr.holders.VarBinaryHolder; +import org.apache.drill.exec.expr.holders.VarCharHolder; +import org.apache.drill.exec.store.ParquetOutputRecordWriter; +import org.apache.drill.exec.store.parquet.columnreaders.NullableFixedByteAlignedReaders; +import org.apache.drill.exec.vector.complex.impl.ComplexWriterImpl; +import org.apache.drill.exec.vector.complex.writer.BaseWriter.MapWriter; +import org.apache.drill.exec.vector.complex.writer.BigIntWriter; +import org.apache.drill.exec.vector.complex.writer.BitWriter; +import org.apache.drill.exec.vector.complex.writer.DateWriter; +import org.apache.drill.exec.vector.complex.writer.Decimal18Writer; +import org.apache.drill.exec.vector.complex.writer.Decimal28SparseWriter; +import org.apache.drill.exec.vector.complex.writer.Decimal38SparseWriter; +import org.apache.drill.exec.vector.complex.writer.Decimal9Writer; +import org.apache.drill.exec.vector.complex.writer.Float4Writer; +import org.apache.drill.exec.vector.complex.writer.Float8Writer; +import 
org.apache.drill.exec.vector.complex.writer.IntWriter; +import org.apache.drill.exec.vector.complex.writer.TimeStampWriter; +import org.apache.drill.exec.vector.complex.writer.TimeWriter; +import org.apache.drill.exec.vector.complex.writer.VarBinaryWriter; +import org.apache.drill.exec.vector.complex.writer.VarCharWriter; +import org.joda.time.DateTimeUtils; + +import parquet.io.api.Binary; +import parquet.io.api.Converter; +import parquet.io.api.GroupConverter; +import parquet.io.api.PrimitiveConverter; +import parquet.schema.DecimalMetadata; +import parquet.schema.GroupType; +import parquet.schema.MessageType; +import parquet.schema.PrimitiveType; +import parquet.schema.Type; +import parquet.schema.Type.Repetition; + +import java.math.BigDecimal; +import java.util.List; + +public class DrillParquetGroupConverter extends GroupConverter { + + private List<Converter> converters; + private MapWriter mapWriter; + + public DrillParquetGroupConverter(ComplexWriterImpl complexWriter, MessageType schema) { + this(complexWriter.rootAsMap(), schema); + } + + public DrillParquetGroupConverter(MapWriter mapWriter, GroupType schema) { + this.mapWriter = mapWriter; + converters = Lists.newArrayList(); + for (Type type : schema.getFields()) { + Repetition rep = type.getRepetition(); + boolean isPrimitive = type.isPrimitive(); + if (!isPrimitive) { + if (rep != Repetition.REPEATED) { + DrillParquetGroupConverter converter = new DrillParquetGroupConverter(mapWriter.map(type.getName()), type.asGroupType()); + converters.add(converter); + } else { + DrillParquetGroupConverter converter = new DrillParquetGroupConverter(mapWriter.list(type.getName()).map(), type.asGroupType()); + converters.add(converter); + } + } else { + PrimitiveConverter converter = getConverterForType(type.asPrimitiveType()); + converters.add(converter); + } + } + } + + private PrimitiveConverter getConverterForType(PrimitiveType type) { + + String name = type.getName(); + switch(type.getPrimitiveTypeName()) { + 
case INT32: { + if (type.getOriginalType() == null) { + IntWriter writer = type.getRepetition() == Repetition.REPEATED ? mapWriter.list(name).integer() : mapWriter.integer(name); + return new DrillIntConverter(writer); + } + switch(type.getOriginalType()) { + case DECIMAL: { + Decimal9Writer writer = type.getRepetition() == Repetition.REPEATED ? mapWriter.list(name).decimal9() : mapWriter.decimal9(name); + return new DrillDecimal9Converter(writer, type.getDecimalMetadata().getPrecision(), type.getDecimalMetadata().getScale()); + } + case DATE: { + DateWriter writer = type.getRepetition() == Repetition.REPEATED ? mapWriter.list(name).date() : mapWriter.date(name); + return new DrillDateConverter(writer); + } + case TIME_MILLIS: { + TimeWriter writer = type.getRepetition() == Repetition.REPEATED ? mapWriter.list(name).time() : mapWriter.time(name); + return new DrillTimeConverter(writer); + } + default: { + throw new UnsupportedOperationException("Unsupported type: " + type.getOriginalType()); + } + } + } + case INT64: { + if (type.getOriginalType() == null) { + BigIntWriter writer = type.getRepetition() == Repetition.REPEATED ? mapWriter.list(name).bigInt() : mapWriter.bigInt(name); + return new DrillBigIntConverter(writer); + } + switch(type.getOriginalType()) { + case DECIMAL: { + Decimal18Writer writer = type.getRepetition() == Repetition.REPEATED ? mapWriter.list(name).decimal18() : mapWriter.decimal18(name); + return new DrillDecimal18Converter(writer, type.getDecimalMetadata().getPrecision(), type.getDecimalMetadata().getScale()); + } + case TIMESTAMP_MILLIS: { + TimeStampWriter writer = type.getRepetition() == Repetition.REPEATED ? mapWriter.list(name).timeStamp() : mapWriter.timeStamp(name); + return new DrillTimeStampConverter(writer); + } + default: { + throw new UnsupportedOperationException("Unsupported type " + type.getOriginalType()); + } + } + } + case FLOAT: { + Float4Writer writer = type.getRepetition() == Repetition.REPEATED ? 
mapWriter.list(name).float4() : mapWriter.float4(name); + return new DrillFloat4Converter(writer); + } + case DOUBLE: { + Float8Writer writer = type.getRepetition() == Repetition.REPEATED ? mapWriter.list(name).float8() : mapWriter.float8(name); + return new DrillFloat8Converter(writer); + } + case BOOLEAN: { + BitWriter writer = type.getRepetition() == Repetition.REPEATED ? mapWriter.list(name).bit() : mapWriter.bit(name); + return new DrillBoolConverter(writer); + } + case BINARY: { + if (type.getOriginalType() == null) { + VarBinaryWriter writer = type.getRepetition() == Repetition.REPEATED ? mapWriter.list(name).varBinary() : mapWriter.varBinary(name); + return new DrillVarBinaryConverter(writer); + } + switch(type.getOriginalType()) { + case UTF8: { + VarCharWriter writer = type.getRepetition() == Repetition.REPEATED ? mapWriter.list(name).varChar() : mapWriter.varChar(name); + return new DrillVarCharConverter(writer); + } + case DECIMAL: { + DecimalMetadata metadata = type.getDecimalMetadata(); + if (metadata.getPrecision() <= 28) { + Decimal28SparseWriter writer = type.getRepetition() == Repetition.REPEATED ? mapWriter.list(name).decimal28Sparse() : mapWriter.decimal28Sparse(name); + return new DrillBinaryToDecimal28Converter(writer, metadata.getPrecision(), metadata.getScale()); + } else { + Decimal38SparseWriter writer = type.getRepetition() == Repetition.REPEATED ? 
mapWriter.list(name).decimal38Sparse() : mapWriter.decimal38Sparse(name); + return new DrillBinaryToDecimal38Converter(writer, metadata.getPrecision(), metadata.getScale()); + } + } + default: { + throw new UnsupportedOperationException("Unsupported type " + type.getOriginalType()); + } + } + } + default: + throw new UnsupportedOperationException("Unsupported type: " + type.getPrimitiveTypeName()); + } + } + + @Override + public Converter getConverter(int i) { + return converters.get(i); + } + + @Override + public void start() { + mapWriter.start(); + } + + @Override + public void end() { + mapWriter.end(); + } + + public static class DrillIntConverter extends PrimitiveConverter { + private IntWriter writer; + private IntHolder holder = new IntHolder(); + + public DrillIntConverter(IntWriter writer) { + super(); + this.writer = writer; + } + + @Override + public void addInt(int value) { + holder.value = value; + writer.write(holder); + } + } + + public static class DrillDecimal9Converter extends PrimitiveConverter { + private Decimal9Writer writer; + private Decimal9Holder holder = new Decimal9Holder(); + int precision; + int scale; + + public DrillDecimal9Converter(Decimal9Writer writer, int precision, int scale) { + this.writer = writer; + this.scale = scale; + this.precision = precision; + } + + @Override + public void addInt(int value) { + holder.value = value; + writer.write(holder); + } + } + + public static class DrillDateConverter extends PrimitiveConverter { + private DateWriter writer; + private DateHolder holder = new DateHolder(); + + public DrillDateConverter(DateWriter writer) { + this.writer = writer; + } + + @Override + public void addInt(int value) { + holder.value = DateTimeUtils.fromJulianDay(value - ParquetOutputRecordWriter.JULIAN_DAY_EPOC - 0.5); + writer.write(holder); + } + } + + public static class DrillTimeConverter extends PrimitiveConverter { + private TimeWriter writer; + private TimeHolder holder = new TimeHolder(); + + public 
DrillTimeConverter(TimeWriter writer) { + this.writer = writer; + } + + @Override + public void addInt(int value) { + holder.value = value; + writer.write(holder); + } + } + + public static class DrillBigIntConverter extends PrimitiveConverter { + private BigIntWriter writer; + private BigIntHolder holder = new BigIntHolder(); + + public DrillBigIntConverter(BigIntWriter writer) { + this.writer = writer; + } + + @Override + public void addLong(long value) { + holder.value = value; + writer.write(holder); + } + } + + public static class DrillTimeStampConverter extends PrimitiveConverter { + private TimeStampWriter writer; + private TimeStampHolder holder = new TimeStampHolder(); + + public DrillTimeStampConverter(TimeStampWriter writer) { + this.writer = writer; + } + + @Override + public void addInt(int value) { + holder.value = value; + writer.write(holder); + } + } + + public static class DrillDecimal18Converter extends PrimitiveConverter { + private Decimal18Writer writer; + private Decimal18Holder holder = new Decimal18Holder(); + + public DrillDecimal18Converter(Decimal18Writer writer, int precision, int scale) { + this.writer = writer; + holder.precision = precision; + holder.scale = scale; + } + + @Override + public void addLong(long value) { + holder.value = value; + writer.write(holder); + } + } + + public static class DrillFloat4Converter extends PrimitiveConverter { + private Float4Writer writer; + private Float4Holder holder = new Float4Holder(); + + public DrillFloat4Converter(Float4Writer writer) { + this.writer = writer; + } + + @Override + public void addFloat(float value) { + holder.value = value; + writer.write(holder); + } + } + + public static class DrillFloat8Converter extends PrimitiveConverter { + private Float8Writer writer; + private Float8Holder holder = new Float8Holder(); + + public DrillFloat8Converter(Float8Writer writer) { + this.writer = writer; + } + + @Override + public void addDouble(double value) { + holder.value = value; + 
writer.write(holder); + } + } + + public static class DrillBoolConverter extends PrimitiveConverter { + private BitWriter writer; + private BitHolder holder = new BitHolder(); + + public DrillBoolConverter(BitWriter writer) { + this.writer = writer; + } + + @Override + public void addBoolean(boolean value) { + holder.value = value ? 1 : 0; + writer.write(holder); + } + } + + public static class DrillVarBinaryConverter extends PrimitiveConverter { + private VarBinaryWriter writer; + private VarBinaryHolder holder = new VarBinaryHolder(); + + public DrillVarBinaryConverter(VarBinaryWriter writer) { + this.writer = writer; + } + + @Override + public void addBinary(Binary value) { + ByteBuf buf = Unpooled.wrappedBuffer(value.toByteBuffer()); + holder.buffer = buf; + holder.start = 0; + holder.end = value.length(); + writer.write(holder); + } + } + + public static class DrillVarCharConverter extends PrimitiveConverter { + private VarCharWriter writer; + private VarCharHolder holder = new VarCharHolder(); + + public DrillVarCharConverter(VarCharWriter writer) { + this.writer = writer; + } + + @Override + public void addBinary(Binary value) { + ByteBuf buf = Unpooled.wrappedBuffer(value.toByteBuffer()); + holder.buffer = buf; + holder.start = 0; + holder.end = value.length(); + writer.write(holder); + } + } + + public static class DrillBinaryToDecimal28Converter extends PrimitiveConverter { + private Decimal28SparseWriter writer; + private Decimal28SparseHolder holder = new Decimal28SparseHolder(); + + public DrillBinaryToDecimal28Converter(Decimal28SparseWriter writer, int precision, int scale) { + this.writer = writer; + holder.precision = precision; + holder.scale = scale; + } + + @Override + public void addBinary(Binary value) { + BigDecimal bigDecimal = DecimalUtility.getBigDecimalFromByteArray(value.getBytes(), 0, value.length(), holder.scale); + ByteBuf buf = Unpooled.wrappedBuffer(new byte[28]); + DecimalUtility.getSparseFromBigDecimal(bigDecimal, buf, 0, 
holder.scale, holder.precision, Decimal28SparseHolder.nDecimalDigits); + holder.buffer = buf; + writer.write(holder); + } + } + + public static class DrillBinaryToDecimal38Converter extends PrimitiveConverter { + private Decimal38SparseWriter writer; + private Decimal38SparseHolder holder = new Decimal38SparseHolder(); + + public DrillBinaryToDecimal38Converter(Decimal38SparseWriter writer, int precision, int scale) { + this.writer = writer; + holder.precision = precision; + holder.scale = scale; + } + + @Override + public void addBinary(Binary value) { + BigDecimal bigDecimal = DecimalUtility.getBigDecimalFromByteArray(value.getBytes(), 0, value.length(), holder.scale); + ByteBuf buf = Unpooled.wrappedBuffer(new byte[38]); + DecimalUtility.getSparseFromBigDecimal(bigDecimal, buf, 0, holder.scale, holder.precision, Decimal38SparseHolder.nDecimalDigits); + holder.buffer = buf; + writer.write(holder); + } + } +} diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetReader.java new file mode 100644 index 000000000..aaeb536a8 --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetReader.java @@ -0,0 +1,204 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
package org.apache.drill.exec.store.parquet2;

import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import org.apache.drill.common.exceptions.ExecutionSetupException;
import org.apache.drill.common.expression.PathSegment;
import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.exec.physical.impl.OutputMutator;
import org.apache.drill.exec.store.RecordReader;
import org.apache.drill.exec.store.parquet.RowGroupReadEntry;
import org.apache.drill.exec.vector.BaseValueVector;
import org.apache.drill.exec.vector.ValueVector;
import org.apache.drill.exec.vector.VariableWidthVector;
import org.apache.drill.exec.vector.complex.impl.VectorContainerWriter;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import parquet.hadoop.CodecFactoryExposer;
import parquet.hadoop.ColumnChunkIncReadStore;
import parquet.hadoop.metadata.BlockMetaData;
import parquet.hadoop.metadata.ColumnChunkMetaData;
import parquet.hadoop.metadata.ColumnPath;
import parquet.hadoop.metadata.ParquetMetadata;
import parquet.io.ColumnIOFactory;
import parquet.io.InvalidRecordException;
import parquet.io.MessageColumnIO;
import parquet.schema.GroupType;
import parquet.schema.MessageType;
import parquet.schema.Type;

import java.io.IOException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Reads a single parquet row group and materializes records — including nested
 * (complex) types — into Drill value vectors through a
 * {@link VectorContainerWriter}.
 */
public class DrillParquetReader implements RecordReader {

  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DrillParquetReader.class);

  // Upper bound on records materialized in a single call to next().
  private static final int MAX_RECORDS_PER_BATCH = 4000;
  // Stop filling the current batch once any primitive vector is this full (percent).
  private static final int FILL_THRESHOLD_PERCENT = 85;
  // Re-check the fill level only every this many records to keep the check cheap.
  private static final int FILL_CHECK_INTERVAL = 100;

  private ParquetMetadata footer;
  private MessageType schema;
  private Configuration conf;
  private RowGroupReadEntry entry;
  private List<SchemaPath> columns;
  private VectorContainerWriter writer;
  private parquet.io.RecordReader<Void> recordReader;
  private DrillParquetRecordMaterializer recordMaterializer;
  private int recordCount;
  private List<ValueVector> primitiveVectors;
  // Retained so cleanup() can release the file streams opened in setup().
  private ColumnChunkIncReadStore pageReadStore;
  // Number of records consumed from the row group so far.
  private long totalRead = 0;

  public DrillParquetReader(ParquetMetadata footer, RowGroupReadEntry entry, List<SchemaPath> columns, Configuration conf) {
    this.footer = footer;
    this.conf = conf;
    this.columns = columns;
    this.entry = entry;
  }

  /**
   * Builds a projected {@link MessageType} containing only the requested column
   * paths. Paths that do not resolve against {@code schema} are logged and
   * skipped.
   *
   * @param schema  full file schema
   * @param columns requested column paths
   * @return the union of all resolvable projections, or null if none resolved
   */
  public static MessageType getProjection(MessageType schema, List<SchemaPath> columns) {
    MessageType projection = null;
    for (SchemaPath path : columns) {
      // Flatten the named segments of the path; array indices are not named
      // and are intentionally ignored here.
      List<String> segments = Lists.newArrayList();
      PathSegment seg = path.getRootSegment();
      String messageName = schema.getName();
      while (seg != null) {
        if (seg.isNamed()) {
          segments.add(seg.getNameSegment().getPath());
        }
        seg = seg.getChild();
      }
      String[] pathSegments = new String[segments.size()];
      segments.toArray(pathSegments);
      Type type = null;
      try {
        type = schema.getType(pathSegments);
      } catch (InvalidRecordException e) {
        logger.warn("Invalid record", e);
      }
      if (type != null) {
        // Rebuild the nested group structure down to the selected leaf.
        Type t = getType(pathSegments, 0, schema);
        if (projection == null) {
          projection = new MessageType(messageName, t);
        } else {
          projection = projection.union(new MessageType(messageName, t));
        }
      }
    }
    return projection;
  }

  @Override
  public void setup(OutputMutator output) throws ExecutionSetupException {
    try {
      schema = footer.getFileMetaData().getSchema();
      MessageType projection;

      if (columns == null || columns.size() == 0) {
        projection = schema;
      } else {
        projection = getProjection(schema, columns);
        if (projection == null) {
          // No requested column resolved; fall back to the full schema.
          projection = schema;
        }
      }

      logger.debug("Requesting schema {}", projection);

      ColumnIOFactory factory = new ColumnIOFactory(false);
      MessageColumnIO columnIO = factory.getColumnIO(projection, schema);
      Map<ColumnPath, ColumnChunkMetaData> paths = new HashMap<>();

      for (ColumnChunkMetaData md : footer.getBlocks().get(entry.getRowGroupIndex()).getColumns()) {
        paths.put(md.getPath(), md);
      }

      CodecFactoryExposer codecFactoryExposer = new CodecFactoryExposer(conf);
      FileSystem fs = FileSystem.get(conf);
      Path filePath = new Path(entry.getPath());

      BlockMetaData blockMetaData = footer.getBlocks().get(entry.getRowGroupIndex());

      recordCount = (int) blockMetaData.getRowCount();

      pageReadStore = new ColumnChunkIncReadStore(recordCount,
          codecFactoryExposer.getCodecFactory(), fs, filePath);

      // Register every primitive column of the (full) schema with the page
      // read store so the record assembler can pull pages on demand.
      for (String[] path : schema.getPaths()) {
        Type type = schema.getType(path);
        if (type.isPrimitive()) {
          ColumnChunkMetaData md = paths.get(ColumnPath.get(path));
          pageReadStore.addColumn(schema.getColumnDescription(path), md);
        }
      }

      writer = new VectorContainerWriter(output);
      recordMaterializer = new DrillParquetRecordMaterializer(writer, projection);
      primitiveVectors = writer.getMapVector().getPrimitiveVectors();
      recordReader = columnIO.getRecordReader(pageReadStore, recordMaterializer);
    } catch (Exception e) {
      throw new ExecutionSetupException(e);
    }
  }

  /**
   * Recursively rebuilds the group nesting for a single leaf path so the
   * resulting type can be unioned into a projection schema.
   */
  private static Type getType(String[] pathSegments, int depth, MessageType schema) {
    Type type = schema.getType(Arrays.copyOfRange(pathSegments, 0, depth + 1));
    if (depth + 1 == pathSegments.length) {
      return type;
    } else {
      Preconditions.checkState(!type.isPrimitive());
      return new GroupType(type.getRepetition(), type.getName(), getType(pathSegments, depth + 1, schema));
    }
  }

  @Override
  public int next() {
    int count = 0;
    while (count < MAX_RECORDS_PER_BATCH && totalRead < recordCount) {
      recordMaterializer.setPosition(count);
      recordReader.read();
      // Count the record BEFORE the early-exit check: the previous for-loop
      // form broke out before its update expression ran, silently dropping
      // the record that had just been materialized at this position.
      count++;
      totalRead++;
      if (count % FILL_CHECK_INTERVAL == 0 && getPercentFilled() > FILL_THRESHOLD_PERCENT) {
        break;
      }
    }
    writer.setValueCount(count);
    return count;
  }

  /**
   * @return the highest fill percentage across all primitive vectors,
   *         considering both value count and (for variable-width vectors)
   *         byte usage. Vectors with zero capacity are skipped to avoid a
   *         divide-by-zero before allocation.
   */
  private int getPercentFilled() {
    int filled = 0;
    for (ValueVector v : primitiveVectors) {
      int valueCapacity = v.getValueCapacity();
      if (valueCapacity > 0) {
        filled = Math.max(filled, ((BaseValueVector) v).getCurrentValueCount() * 100 / valueCapacity);
      }
      if (v instanceof VariableWidthVector) {
        int byteCapacity = ((VariableWidthVector) v).getByteCapacity();
        if (byteCapacity > 0) {
          filled = Math.max(filled, ((VariableWidthVector) v).getCurrentSizeInBytes() * 100 / byteCapacity);
        }
      }
    }
    return filled;
  }

  @Override
  public void cleanup() {
    // Release the file streams opened by setup(); previously this was a no-op
    // and the FSDataInputStreams leaked.
    if (pageReadStore != null) {
      try {
        pageReadStore.close();
      } catch (IOException e) {
        logger.warn("Failure while closing parquet page read store.", e);
      }
      pageReadStore = null;
    }
  }
}
package org.apache.drill.exec.store.parquet2;

import org.apache.drill.exec.vector.ValueVector;
import org.apache.drill.exec.vector.complex.MapVector;
import org.apache.drill.exec.vector.complex.impl.ComplexWriterImpl;
import org.apache.drill.exec.vector.complex.writer.BaseWriter.ComplexWriter;
import parquet.io.api.GroupConverter;
import parquet.io.api.RecordMaterializer;
import parquet.schema.MessageType;

import java.util.List;

/**
 * Bridges parquet record assembly to Drill's complex writers. Records are
 * written directly into value vectors by the converter tree rooted at
 * {@link #root}, so no per-record object is produced (hence {@code Void}).
 */
public class DrillParquetRecordMaterializer extends RecordMaterializer<Void> {

  public DrillParquetGroupConverter root;
  private final ComplexWriter writer;

  public DrillParquetRecordMaterializer(ComplexWriter complexWriter, MessageType schema) {
    writer = complexWriter;
    root = new DrillParquetGroupConverter(complexWriter.rootAsMap(), schema);
  }

  /** Points the underlying writer at the given record index within the batch. */
  public void setPosition(int position) {
    writer.setPosition(position);
  }

  /** @return the underlying writer's ok flag. */
  public boolean ok() {
    return writer.ok();
  }

  @Override
  public Void getCurrentRecord() {
    // Data has already been written into the vectors; nothing to hand back.
    return null;
  }

  @Override
  public GroupConverter getRootConverter() {
    return root;
  }
}
getCurrentValueCount() { + return currentValueCount; + } + @Override public ByteBuf[] getBuffers(){ diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/BaseValueVector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/BaseValueVector.java index f96843502..e310b81be 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/BaseValueVector.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/BaseValueVector.java @@ -58,6 +58,9 @@ public abstract class BaseValueVector implements ValueVector{ return getField().getAsBuilder(); } + public abstract int getCurrentValueCount(); + public abstract void setCurrentValueCount(int count); + abstract public ByteBuf getData(); abstract class BaseAccessor implements ValueVector.Accessor{ diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/ObjectVector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/ObjectVector.java index 6998a7410..25aff57f1 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/ObjectVector.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/ObjectVector.java @@ -125,6 +125,15 @@ public class ObjectVector extends BaseValueVector{ } @Override + public int getCurrentValueCount() { + return 0; + } + + @Override + public void setCurrentValueCount(int count) { + } + + @Override public ByteBuf getData() { throw new UnsupportedOperationException("ObjectVector does not support this"); } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/VariableWidthVector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/VariableWidthVector.java index 6660351c1..5c1d3ab01 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/VariableWidthVector.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/VariableWidthVector.java @@ -34,6 +34,8 @@ public interface VariableWidthVector extends ValueVector{ * @return */ public int getByteCapacity(); + + 
public int getCurrentSizeInBytes(); /** * Load the records in the provided buffer based on the given number of values. diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/AbstractContainerVector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/AbstractContainerVector.java index 9667fd2db..5eb135898 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/AbstractContainerVector.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/AbstractContainerVector.java @@ -24,6 +24,8 @@ import org.apache.drill.common.types.TypeProtos.MinorType; import org.apache.drill.exec.record.TypedFieldId; import org.apache.drill.exec.vector.ValueVector; +import java.util.List; + public abstract class AbstractContainerVector implements ValueVector{ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(AbstractContainerVector.class); @@ -39,6 +41,8 @@ public abstract class AbstractContainerVector implements ValueVector{ } } + public abstract List<ValueVector> getPrimitiveVectors(); + public abstract VectorWithOrdinal getVectorWithOrdinal(String name); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/MapVector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/MapVector.java index ed2ad8a3b..480b86352 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/MapVector.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/MapVector.java @@ -76,6 +76,22 @@ public class MapVector extends AbstractContainerVector { return vectors.size(); } + @Override + public List<ValueVector> getPrimitiveVectors() { + List<ValueVector> primitiveVectors = Lists.newArrayList(); + for (ValueVector v : this.vectors.values()) { + if (v instanceof AbstractContainerVector) { + AbstractContainerVector av = (AbstractContainerVector) v; + for (ValueVector vv : av.getPrimitiveVectors()) { + 
primitiveVectors.add(vv); + } + } else { + primitiveVectors.add(v); + } + } + return primitiveVectors; + } + transient private MapTransferPair ephPair; transient private MapSingleCopier ephPair2; diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/RepeatedListVector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/RepeatedListVector.java index d43bf59f9..57c47d4e9 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/RepeatedListVector.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/RepeatedListVector.java @@ -78,6 +78,20 @@ public class RepeatedListVector extends AbstractContainerVector implements Repea return vector != null ? 1 : 0; } + @Override + public List<ValueVector> getPrimitiveVectors() { + List<ValueVector> primitiveVectors = Lists.newArrayList(); + if (vector instanceof AbstractContainerVector) { + for (ValueVector v : ((AbstractContainerVector) vector).getPrimitiveVectors()) { + primitiveVectors.add(v); + } + } else { + primitiveVectors.add(vector); + } + primitiveVectors.add(offsets); + return primitiveVectors; + } + public RepeatedListVector(SchemaPath path, BufferAllocator allocator){ this(MaterializedField.create(path, TYPE), allocator); } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/RepeatedMapVector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/RepeatedMapVector.java index 952fb4b90..cb770321d 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/RepeatedMapVector.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/RepeatedMapVector.java @@ -92,6 +92,23 @@ public class RepeatedMapVector extends AbstractContainerVector implements Repeat } @Override + public List<ValueVector> getPrimitiveVectors() { + List<ValueVector> primitiveVectors = Lists.newArrayList(); + for (ValueVector v : this.vectors.values()) { + if (v instanceof 
AbstractContainerVector) { + AbstractContainerVector av = (AbstractContainerVector) v; + for (ValueVector vv : av.getPrimitiveVectors()) { + primitiveVectors.add(vv); + } + } else { + primitiveVectors.add(v); + } + } + primitiveVectors.add(offsets); + return primitiveVectors; + } + + @Override public <T extends ValueVector> T addOrGet(String name, MajorType type, Class<T> clazz) { ValueVector v = vectors.get(name); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/impl/VectorContainerWriter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/impl/VectorContainerWriter.java index bc1d3675b..4f669c00e 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/impl/VectorContainerWriter.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/impl/VectorContainerWriter.java @@ -40,6 +40,10 @@ public class VectorContainerWriter extends AbstractFieldWriter implements Comple this.mapRoot = new SingleMapWriter(mapVector, this); } + public MapVector getMapVector() { + return mapVector; + } + public void reset() { setPosition(0); resetState(); diff --git a/exec/java-exec/src/main/java/parquet/hadoop/CodecFactoryExposer.java b/exec/java-exec/src/main/java/parquet/hadoop/CodecFactoryExposer.java index dcd20b1bf..cb3610252 100644 --- a/exec/java-exec/src/main/java/parquet/hadoop/CodecFactoryExposer.java +++ b/exec/java-exec/src/main/java/parquet/hadoop/CodecFactoryExposer.java @@ -22,6 +22,7 @@ import java.io.IOException; import org.apache.hadoop.conf.Configuration; import parquet.bytes.BytesInput; +import parquet.hadoop.CodecFactory.BytesDecompressor; import parquet.hadoop.metadata.CompressionCodecName; public class CodecFactoryExposer{ @@ -39,4 +40,8 @@ public class CodecFactoryExposer{ public BytesInput decompress(BytesInput bytes, int uncompressedSize, CompressionCodecName codecName) throws IOException { return codecFactory.getDecompressor(codecName).decompress(bytes, 
uncompressedSize); } + + public BytesDecompressor getDecompressor(CompressionCodecName codec) { + return codecFactory.getDecompressor(codec); + } } diff --git a/exec/java-exec/src/main/java/parquet/hadoop/ColumnChunkIncReadStore.java b/exec/java-exec/src/main/java/parquet/hadoop/ColumnChunkIncReadStore.java new file mode 100644 index 000000000..e5a477b0f --- /dev/null +++ b/exec/java-exec/src/main/java/parquet/hadoop/ColumnChunkIncReadStore.java @@ -0,0 +1,190 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
package parquet.hadoop;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.compress.Decompressor;
import parquet.bytes.BytesInput;
import parquet.column.ColumnDescriptor;
import parquet.column.page.DictionaryPage;
import parquet.column.page.Page;
import parquet.column.page.PageReadStore;
import parquet.column.page.PageReader;
import parquet.format.PageHeader;
import parquet.format.PageType;
import parquet.format.Util;
import parquet.format.converter.ParquetMetadataConverter;
import parquet.hadoop.CodecFactory.BytesDecompressor;
import parquet.hadoop.metadata.ColumnChunkMetaData;
import parquet.hadoop.metadata.FileMetaData;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * A {@link PageReadStore} that reads column chunk pages incrementally from an
 * open file stream instead of loading whole chunks up front. Lives in the
 * {@code parquet.hadoop} package to access package-private parquet internals.
 */
public class ColumnChunkIncReadStore implements PageReadStore {

  private static final ParquetMetadataConverter parquetMetadataConverter = new ParquetMetadataConverter();

  private final CodecFactory codecFactory;
  private final FileSystem fs;
  private final Path path;
  private final long rowCount;
  // All streams opened via addColumn(); closed together in close().
  private final List<FSDataInputStream> streams = new ArrayList<>();
  private final Map<ColumnDescriptor, ColumnChunkIncPageReader> columns = new HashMap<>();

  public ColumnChunkIncReadStore(long rowCount, CodecFactory codecFactory, FileSystem fs, Path path) {
    this.codecFactory = codecFactory;
    this.fs = fs;
    this.path = path;
    this.rowCount = rowCount;
  }

  /**
   * Page reader for a single column chunk; pages are decoded lazily from the
   * shared input stream as {@link #readPage()} is called.
   */
  public class ColumnChunkIncPageReader implements PageReader {

    ColumnChunkMetaData metaData;
    long fileOffset;
    long size;
    private long valueReadSoFar = 0;

    private DictionaryPage dictionaryPage;
    private FSDataInputStream in;
    private BytesDecompressor decompressor;

    public ColumnChunkIncPageReader(ColumnChunkMetaData metaData, FSDataInputStream in) {
      // Delegate to the explicit-factory constructor using the enclosing
      // store's codec factory, instead of duplicating the field assignments.
      this(metaData, in, codecFactory);
    }

    public ColumnChunkIncPageReader(ColumnChunkMetaData metaData, FSDataInputStream in, CodecFactory codecFactory) {
      this.metaData = metaData;
      this.size = metaData.getTotalSize();
      this.fileOffset = metaData.getStartingPos();
      this.in = in;
      this.decompressor = codecFactory.getDecompressor(metaData.getCodec());
    }

    @Override
    public DictionaryPage readDictionaryPage() {
      if (dictionaryPage == null) {
        try {
          long pos = in.getPos();
          PageHeader pageHeader = Util.readPageHeader(in);
          if (pageHeader.getDictionary_page_header() == null) {
            // First page is not a dictionary page; rewind so readPage()
            // sees it again.
            in.seek(pos);
            return null;
          }
          dictionaryPage =
              new DictionaryPage(
                  decompressor.decompress(BytesInput.from(in, pageHeader.compressed_page_size), pageHeader.getUncompressed_page_size()),
                  pageHeader.getDictionary_page_header().getNum_values(),
                  parquetMetadataConverter.getEncoding(pageHeader.dictionary_page_header.encoding)
              );
        } catch (IOException e) {
          throw new RuntimeException(e);
        }
      }
      return dictionaryPage;
    }

    @Override
    public long getTotalValueCount() {
      return metaData.getValueCount();
    }

    @Override
    public Page readPage() {
      try {
        while (valueReadSoFar < metaData.getValueCount()) {
          PageHeader pageHeader = Util.readPageHeader(in);
          switch (pageHeader.type) {
            case DICTIONARY_PAGE:
              if (dictionaryPage == null) {
                dictionaryPage =
                    new DictionaryPage(
                        decompressor.decompress(BytesInput.from(in, pageHeader.compressed_page_size), pageHeader.getUncompressed_page_size()),
                        pageHeader.uncompressed_page_size,
                        parquetMetadataConverter.getEncoding(pageHeader.dictionary_page_header.encoding)
                    );
              } else {
                // Dictionary already read via readDictionaryPage(); skip it.
                // NOTE(review): InputStream.skip may skip fewer bytes than
                // requested — confirm FSDataInputStream guarantees a full skip.
                in.skip(pageHeader.compressed_page_size);
              }
              break;
            case DATA_PAGE:
              valueReadSoFar += pageHeader.data_page_header.getNum_values();
              return new Page(
                  decompressor.decompress(BytesInput.from(in, pageHeader.compressed_page_size), pageHeader.getUncompressed_page_size()),
                  pageHeader.data_page_header.num_values,
                  pageHeader.uncompressed_page_size,
                  parquetMetadataConverter.getEncoding(pageHeader.data_page_header.repetition_level_encoding),
                  parquetMetadataConverter.getEncoding(pageHeader.data_page_header.definition_level_encoding),
                  parquetMetadataConverter.getEncoding(pageHeader.data_page_header.encoding)
              );
            default:
              // Unknown page type: skip its payload and keep scanning.
              in.skip(pageHeader.compressed_page_size);
              break;
          }
        }
        // All values consumed for this chunk; release the stream.
        in.close();
        return null;
      } catch (IOException e) {
        throw new RuntimeException(e);
      }
    }
  }

  /**
   * Opens a stream positioned at the chunk start and registers a page reader
   * for the given column.
   */
  public void addColumn(ColumnDescriptor descriptor, ColumnChunkMetaData metaData) throws IOException {
    FSDataInputStream in = fs.open(path);
    streams.add(in);
    in.seek(metaData.getStartingPos());
    ColumnChunkIncPageReader reader = new ColumnChunkIncPageReader(metaData, in);

    columns.put(descriptor, reader);
  }

  /** Closes every stream opened through {@link #addColumn}. */
  public void close() throws IOException {
    for (FSDataInputStream stream : streams) {
      stream.close();
    }
  }

  @Override
  public PageReader getPageReader(ColumnDescriptor descriptor) {
    return columns.get(descriptor);
  }

  @Override
  public long getRowCount() {
    return rowCount;
  }
}