| | | |
|---|---|---|
| author | Steven Phillips <sphillips@maprtech.com> | 2014-08-11 11:58:43 -0700 |
| committer | Steven Phillips <sphillips@maprtech.com> | 2014-08-18 14:46:43 -0700 |
| commit | c3b15af0292270e75a3f98a9225e7dceb24c96c3 | |
| tree | e4bffcb689e37214c80dc68a3a052f677e4865e2 /exec | |
| parent | 02e4824ed9da6e15b5f974f16b15b408695c8ada | |
DRILL-1281: Use ByteBuffer read codepath in complex parquet reader
Diffstat (limited to 'exec')

- exec/java-exec/pom.xml
- exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetReader.java
- exec/java-exec/src/main/java/parquet/hadoop/ColumnChunkIncReadStore.java

3 files changed, 90 insertions, 17 deletions
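The core of the change is in `ColumnChunkIncReadStore.readPage()`: instead of streaming compressed page bytes straight out of the `FSDataInputStream`, each data page is now read into direct memory obtained from Drill's `BufferAllocator`, giving the reader a ref-counted buffer it can release deterministically. A minimal sketch of that allocate-read-release cycle, using only classes that appear in the diff below (the wrapper class and helper method are hypothetical, for illustration):

```java
import io.netty.buffer.ByteBuf;
import java.io.IOException;
import java.nio.ByteBuffer;
import org.apache.drill.exec.memory.BufferAllocator;
import org.apache.hadoop.fs.FSDataInputStream;
import parquet.hadoop.util.CompatibilityUtil;

// Hypothetical helper, not the committed code: illustrates the ByteBuffer
// read codepath the patch adopts for each data page.
class PageBytesReader {
  static ByteBuf readPageBytes(BufferAllocator allocator, FSDataInputStream in,
                               int compressedSize) throws IOException {
    ByteBuf buf = allocator.buffer(compressedSize);     // ref-counted direct memory
    ByteBuffer nio = buf.nioBuffer(0, compressedSize);  // NIO view over the same bytes
    CompatibilityUtil.getBuf(in, nio, compressedSize);  // fill via the ByteBuffer read path
    return buf;                                         // caller must call buf.release()
  }
}
```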
```diff
diff --git a/exec/java-exec/pom.xml b/exec/java-exec/pom.xml
index 21dfc6707..685f2fe83 100644
--- a/exec/java-exec/pom.xml
+++ b/exec/java-exec/pom.xml
@@ -357,6 +357,10 @@
           <groupId>com.sun.jersey</groupId>
           <artifactId>jersey-json</artifactId>
         </exclusion>
+        <exclusion>
+          <groupId>com.sun.jersey</groupId>
+          <artifactId>jersey-client</artifactId>
+        </exclusion>
       </exclusions>
     </dependency>
   </dependencies>
@@ -367,6 +371,16 @@
     <dependency>
       <groupId>org.apache.hadoop</groupId>
       <artifactId>hadoop-client</artifactId>
+      <exclusions>
+        <exclusion>
+          <groupId>com.sun.jersey</groupId>
+          <artifactId>jersey-client</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>com.sun.jersey</groupId>
+          <artifactId>jersey-core</artifactId>
+        </exclusion>
+      </exclusions>
     </dependency>
     <dependency>
       <groupId>com.mapr.hadoop</groupId>
@@ -393,6 +407,42 @@
           <groupId>org.slf4j</groupId>
           <artifactId>slf4j-log4j12</artifactId>
         </exclusion>
+        <exclusion>
+          <groupId>com.sun.jersey</groupId>
+          <artifactId>jersey-client</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>com.sun.jersey</groupId>
+          <artifactId>jersey-core</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>com.sun.jersey</groupId>
+          <artifactId>jersey-json</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>com.sun.jersey</groupId>
+          <artifactId>jersey-server</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.mortbay.jetty</groupId>
+          <artifactId>jetty</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.mortbay.jetty</groupId>
+          <artifactId>jsp-2.1</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.mortbay.jetty</groupId>
+          <artifactId>jsp-api-2.1</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.mortbay.jetty</groupId>
+          <artifactId>servlet-api-2.5</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.apache.hadoop</groupId>
+          <artifactId>hadoop-core</artifactId>
+        </exclusion>
       </exclusions>
     </dependency>
   </dependencies>
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetReader.java
index 0d2a225fa..f47acab07 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetReader.java
@@ -46,6 +46,7 @@
 import parquet.schema.GroupType;
 import parquet.schema.MessageType;
 import parquet.schema.Type;
+import java.io.IOException;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
@@ -61,6 +62,7 @@ public class DrillParquetReader implements RecordReader {
   private RowGroupReadEntry entry;
   private List<SchemaPath> columns;
   private VectorContainerWriter writer;
+  private ColumnChunkIncReadStore pageReadStore;
   private parquet.io.RecordReader<Void> recordReader;
   private DrillParquetRecordMaterializer recordMaterializer;
   private int recordCount;
@@ -142,8 +144,8 @@ public class DrillParquetReader implements RecordReader {
 
     recordCount = (int) blockMetaData.getRowCount();
 
-    ColumnChunkIncReadStore pageReadStore = new ColumnChunkIncReadStore(recordCount,
-        codecFactoryExposer.getCodecFactory(), fs, filePath);
+    pageReadStore = new ColumnChunkIncReadStore(recordCount,
+        codecFactoryExposer.getCodecFactory(), operatorContext.getAllocator(), fs, filePath);
 
     for (String[] path : schema.getPaths()) {
       Type type = schema.getType(path);
@@ -203,6 +205,11 @@ public class DrillParquetReader implements RecordReader {
 
   @Override
   public void cleanup() {
+    try {
+      pageReadStore.close();
+    } catch (IOException e) {
+      logger.warn("Failure while closing PageReadStore", e);
+    }
   }
 
   public void setOperatorContext(OperatorContext operatorContext) {
diff --git a/exec/java-exec/src/main/java/parquet/hadoop/ColumnChunkIncReadStore.java b/exec/java-exec/src/main/java/parquet/hadoop/ColumnChunkIncReadStore.java
index e5a477b0f..379d3e6a5 100644
--- a/exec/java-exec/src/main/java/parquet/hadoop/ColumnChunkIncReadStore.java
+++ b/exec/java-exec/src/main/java/parquet/hadoop/ColumnChunkIncReadStore.java
@@ -17,6 +17,8 @@
  */
 package parquet.hadoop;
 
+import io.netty.buffer.ByteBuf;
+import org.apache.drill.exec.memory.BufferAllocator;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileSystem;
@@ -35,8 +37,10 @@
 import parquet.format.converter.ParquetMetadataConverter;
 import parquet.hadoop.CodecFactory.BytesDecompressor;
 import parquet.hadoop.metadata.ColumnChunkMetaData;
 import parquet.hadoop.metadata.FileMetaData;
+import parquet.hadoop.util.CompatibilityUtil;
 
 import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
@@ -48,13 +52,15 @@ public class ColumnChunkIncReadStore implements PageReadStore {
 
   private static ParquetMetadataConverter parquetMetadataConverter = new ParquetMetadataConverter();
   private CodecFactory codecFactory = new CodecFactory(new Configuration());
+  private BufferAllocator allocator;
   private FileSystem fs;
   private Path path;
   private long rowCount;
   private List<FSDataInputStream> streams = new ArrayList();
 
-  public ColumnChunkIncReadStore(long rowCount, CodecFactory codecFactory, FileSystem fs, Path path) {
+  public ColumnChunkIncReadStore(long rowCount, CodecFactory codecFactory, BufferAllocator allocator, FileSystem fs, Path path) {
     this.codecFactory = codecFactory;
+    this.allocator = allocator;
     this.fs = fs;
     this.path = path;
     this.rowCount = rowCount;
@@ -64,6 +70,7 @@ public class ColumnChunkIncReadStore implements PageReadStore {
   public class ColumnChunkIncPageReader implements PageReader {
 
     ColumnChunkMetaData metaData;
+    ColumnDescriptor columnDescriptor;
     long fileOffset;
     long size;
     private long valueReadSoFar = 0;
@@ -72,16 +79,11 @@ public class ColumnChunkIncReadStore implements PageReadStore {
     private FSDataInputStream in;
     private BytesDecompressor decompressor;
 
-    public ColumnChunkIncPageReader(ColumnChunkMetaData metaData, FSDataInputStream in) {
-      this.metaData = metaData;
-      this.size = metaData.getTotalSize();
-      this.fileOffset = metaData.getStartingPos();
-      this.in = in;
-      this.decompressor = codecFactory.getDecompressor(metaData.getCodec());
-    }
+    private ByteBuf lastPage;
 
-    public ColumnChunkIncPageReader(ColumnChunkMetaData metaData, FSDataInputStream in, CodecFactory codecFactory) {
+    public ColumnChunkIncPageReader(ColumnChunkMetaData metaData, ColumnDescriptor columnDescriptor, FSDataInputStream in) {
       this.metaData = metaData;
+      this.columnDescriptor = columnDescriptor;
       this.size = metaData.getTotalSize();
       this.fileOffset = metaData.getStartingPos();
       this.in = in;
@@ -104,13 +106,9 @@ public class ColumnChunkIncReadStore implements PageReadStore {
               pageHeader.getDictionary_page_header().getNum_values(),
               parquetMetadataConverter.getEncoding(pageHeader.dictionary_page_header.encoding)
           );
-          System.out.println(dictionaryPage);
         } catch (IOException e) {
           throw new RuntimeException(e);
         }
-//        if (dictionaryPage == null) {
-//          throw new RuntimeException("Dictionary page null");
-//        }
       }
       return dictionaryPage;
     }
@@ -123,6 +121,10 @@ public class ColumnChunkIncReadStore implements PageReadStore {
     @Override
    public Page readPage() {
       try {
+        if (lastPage != null) {
+          lastPage.release();
+          lastPage = null;
+        }
         while(valueReadSoFar < metaData.getValueCount()) {
           PageHeader pageHeader = Util.readPageHeader(in);
           switch (pageHeader.type) {
@@ -140,10 +142,15 @@ public class ColumnChunkIncReadStore implements PageReadStore {
               break;
             case DATA_PAGE:
               valueReadSoFar += pageHeader.data_page_header.getNum_values();
+              ByteBuf buf = allocator.buffer(pageHeader.compressed_page_size);
+              lastPage = buf;
+              ByteBuffer buffer = buf.nioBuffer(0, pageHeader.compressed_page_size);
+              CompatibilityUtil.getBuf(in, buffer, pageHeader.compressed_page_size);
               return new Page(
-                  decompressor.decompress(BytesInput.from(in,pageHeader.compressed_page_size), pageHeader.getUncompressed_page_size()),
+                  decompressor.decompress(BytesInput.from(buffer, 0, pageHeader.compressed_page_size), pageHeader.compressed_page_size),
                   pageHeader.data_page_header.num_values,
                   pageHeader.uncompressed_page_size,
+                  parquetMetadataConverter.fromParquetStatistics(pageHeader.data_page_header.statistics, columnDescriptor.getType()),
                   parquetMetadataConverter.getEncoding(pageHeader.data_page_header.repetition_level_encoding),
                   parquetMetadataConverter.getEncoding(pageHeader.data_page_header.definition_level_encoding),
                   parquetMetadataConverter.getEncoding(pageHeader.data_page_header.encoding)
@@ -159,6 +166,12 @@ public class ColumnChunkIncReadStore implements PageReadStore {
         throw new RuntimeException(e);
       }
     }
+
+    void close() {
+      if (lastPage != null) {
+        lastPage.release();
+      }
+    }
   }
 
   private Map<ColumnDescriptor, ColumnChunkIncPageReader> columns = new HashMap();
@@ -167,7 +180,7 @@ public class ColumnChunkIncReadStore implements PageReadStore {
     FSDataInputStream in = fs.open(path);
     streams.add(in);
     in.seek(metaData.getStartingPos());
-    ColumnChunkIncPageReader reader = new ColumnChunkIncPageReader(metaData, in);
+    ColumnChunkIncPageReader reader = new ColumnChunkIncPageReader(metaData, descriptor, in);
     columns.put(descriptor, reader);
   }
 
@@ -176,6 +189,9 @@ public class ColumnChunkIncReadStore implements PageReadStore {
     for (FSDataInputStream stream : streams) {
       stream.close();
     }
+    for (ColumnChunkIncPageReader reader : columns.values()) {
+      reader.close();
+    }
   }
 
   @Override
```
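The allocator-backed buffer behind a page is only borrowed: `readPage()` releases the previous page's buffer before filling the next one, and the new `close()` methods release whatever is still held. An illustrative driver showing that ownership rule end to end (not part of the commit; the wrapper class is hypothetical, and it assumes the column chunk was already registered with the store):

```java
import java.io.IOException;
import parquet.column.ColumnDescriptor;
import parquet.column.page.Page;
import parquet.column.page.PageReader;
import parquet.hadoop.ColumnChunkIncReadStore;

// Hypothetical driver, for illustration only. A Page returned by readPage()
// borrows allocator memory (the reader's 'lastPage'), so it must be fully
// decoded before the next readPage() call on the same column; close()
// releases any buffer still outstanding.
class PageLifecycleSketch {
  void scan(ColumnChunkIncReadStore store, ColumnDescriptor descriptor) throws IOException {
    PageReader reader = store.getPageReader(descriptor); // standard PageReadStore API
    Page page = reader.readPage();   // fills a freshly allocated buffer
    // ... decode 'page' completely here ...
    Page next = reader.readPage();   // releases the previous page's buffer first
    // ... decode 'next' ...
    store.close();                   // closes streams and releases the last page
  }
}
```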