author     Steven Phillips <sphillips@maprtech.com>  2014-08-11 11:58:43 -0700
committer  Steven Phillips <sphillips@maprtech.com>  2014-08-18 14:46:43 -0700
commit     c3b15af0292270e75a3f98a9225e7dceb24c96c3
tree       e4bffcb689e37214c80dc68a3a052f677e4865e2 /exec
parent     02e4824ed9da6e15b5f974f16b15b408695c8ada
DRILL-1281: Use ByteBuffer read codepath in complex parquet reader
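In brief: DrillParquetReader now holds its ColumnChunkIncReadStore as a field and closes it in cleanup(), and ColumnChunkIncPageReader reads each compressed data page into a BufferAllocator-backed ByteBuf (filled through CompatibilityUtil.getBuf) and decompresses from that ByteBuffer instead of streaming bytes straight off the FSDataInputStream. The pom.xml changes add Jersey, Jetty, and hadoop-core exclusions to several Hadoop-related dependencies.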
Diffstat (limited to 'exec')
-rw-r--r--  exec/java-exec/pom.xml                                                                      50
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetReader.java   11
-rw-r--r--  exec/java-exec/src/main/java/parquet/hadoop/ColumnChunkIncReadStore.java                    46
3 files changed, 90 insertions, 17 deletions
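The substance of the change is in ColumnChunkIncPageReader.readPage() (last file below): instead of handing the open FSDataInputStream to BytesInput for each data page, the compressed page bytes are read once into a ByteBuf obtained from Drill's BufferAllocator, exposed as a java.nio.ByteBuffer, and decompression works off that buffer; the buffer is released before the next page is read and again when the reader is closed. A minimal sketch of that pattern (class and method names here are illustrative, not the patch's code):

import io.netty.buffer.ByteBuf;
import org.apache.drill.exec.memory.BufferAllocator;
import org.apache.hadoop.fs.FSDataInputStream;
import parquet.hadoop.util.CompatibilityUtil;

import java.io.IOException;
import java.nio.ByteBuffer;

// Hypothetical helper, not part of the patch: buffers one compressed page in
// allocator-owned memory so later work (decompression) runs against a ByteBuffer.
class PageBufferSketch {
  private final BufferAllocator allocator;
  private ByteBuf lastPage;   // kept so the memory can be released before the next page / on close

  PageBufferSketch(BufferAllocator allocator) {
    this.allocator = allocator;
  }

  ByteBuffer bufferPage(FSDataInputStream in, int compressedSize) throws IOException {
    release();                                            // drop the previously buffered page first
    lastPage = allocator.buffer(compressedSize);          // memory accounted by Drill's allocator
    ByteBuffer page = lastPage.nioBuffer(0, compressedSize);
    CompatibilityUtil.getBuf(in, page, compressedSize);   // same fill call the patch uses
    return page;                                          // callers read it via absolute offsets 0..compressedSize
  }

  void release() {
    if (lastPage != null) {
      lastPage.release();
      lastPage = null;
    }
  }
}

In the patch itself this logic lives inline in readPage(), with lastPage released at the top of the next readPage() call and once more in the new close() methods.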
diff --git a/exec/java-exec/pom.xml b/exec/java-exec/pom.xml
index 21dfc6707..685f2fe83 100644
--- a/exec/java-exec/pom.xml
+++ b/exec/java-exec/pom.xml
@@ -357,6 +357,10 @@
<groupId>com.sun.jersey</groupId>
<artifactId>jersey-json</artifactId>
</exclusion>
+ <exclusion>
+ <groupId>com.sun.jersey</groupId>
+ <artifactId>jersey-client</artifactId>
+ </exclusion>
</exclusions>
</dependency>
</dependencies>
@@ -367,6 +371,16 @@
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
+ <exclusions>
+ <exclusion>
+ <groupId>com.sun.jersey</groupId>
+ <artifactId>jersey-client</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>com.sun.jersey</groupId>
+ <artifactId>jersey-core</artifactId>
+ </exclusion>
+ </exclusions>
</dependency>
<dependency>
<groupId>com.mapr.hadoop</groupId>
@@ -393,6 +407,42 @@
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
+ <exclusion>
+ <groupId>com.sun.jersey</groupId>
+ <artifactId>jersey-client</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>com.sun.jersey</groupId>
+ <artifactId>jersey-core</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>com.sun.jersey</groupId>
+ <artifactId>jersey-json</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>com.sun.jersey</groupId>
+ <artifactId>jersey-server</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.mortbay.jetty</groupId>
+ <artifactId>jetty</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.mortbay.jetty</groupId>
+ <artifactId>jsp-2.1</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.mortbay.jetty</groupId>
+ <artifactId>jsp-api-2.1</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.mortbay.jetty</groupId>
+ <artifactId>servlet-api-2.5</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-core</artifactId>
+ </exclusion>
</exclusions>
</dependency>
</dependencies>
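The exclusions added above keep hadoop-client and the MapR Hadoop dependencies from pulling Jersey, Jetty, and hadoop-core jars onto Drill's classpath; presumably this avoids version conflicts with the artifacts java-exec already manages, though the commit message does not say so.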
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetReader.java
index 0d2a225fa..f47acab07 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetReader.java
@@ -46,6 +46,7 @@ import parquet.schema.GroupType;
import parquet.schema.MessageType;
import parquet.schema.Type;
+import java.io.IOException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
@@ -61,6 +62,7 @@ public class DrillParquetReader implements RecordReader {
private RowGroupReadEntry entry;
private List<SchemaPath> columns;
private VectorContainerWriter writer;
+ private ColumnChunkIncReadStore pageReadStore;
private parquet.io.RecordReader<Void> recordReader;
private DrillParquetRecordMaterializer recordMaterializer;
private int recordCount;
@@ -142,8 +144,8 @@ public class DrillParquetReader implements RecordReader {
recordCount = (int) blockMetaData.getRowCount();
- ColumnChunkIncReadStore pageReadStore = new ColumnChunkIncReadStore(recordCount,
- codecFactoryExposer.getCodecFactory(), fs, filePath);
+ pageReadStore = new ColumnChunkIncReadStore(recordCount,
+ codecFactoryExposer.getCodecFactory(), operatorContext.getAllocator(), fs, filePath);
for (String[] path : schema.getPaths()) {
Type type = schema.getType(path);
@@ -203,6 +205,11 @@ public class DrillParquetReader implements RecordReader {
@Override
public void cleanup() {
+ try {
+ pageReadStore.close();
+ } catch (IOException e) {
+ logger.warn("Failure while closing PageReadStore", e);
+ }
}
public void setOperatorContext(OperatorContext operatorContext) {
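The reader-side change is a lifecycle fix: the page read store becomes a field so cleanup() can close it and hand back the last buffered page. Stripped of the Drill types, the idiom looks like this (illustrative stand-in only; the real class closes its ColumnChunkIncReadStore field):

import java.io.Closeable;
import java.io.IOException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class CleanupSketch {
  private static final Logger logger = LoggerFactory.getLogger(CleanupSketch.class);
  private final Closeable pageReadStore;

  CleanupSketch(Closeable pageReadStore) {
    this.pageReadStore = pageReadStore;
  }

  public void cleanup() {
    try {
      pageReadStore.close();   // releases any page buffer the page readers still hold
    } catch (IOException e) {
      logger.warn("Failure while closing PageReadStore", e);   // log and keep going; cleanup must not throw
    }
  }
}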
diff --git a/exec/java-exec/src/main/java/parquet/hadoop/ColumnChunkIncReadStore.java b/exec/java-exec/src/main/java/parquet/hadoop/ColumnChunkIncReadStore.java
index e5a477b0f..379d3e6a5 100644
--- a/exec/java-exec/src/main/java/parquet/hadoop/ColumnChunkIncReadStore.java
+++ b/exec/java-exec/src/main/java/parquet/hadoop/ColumnChunkIncReadStore.java
@@ -17,6 +17,8 @@
*/
package parquet.hadoop;
+import io.netty.buffer.ByteBuf;
+import org.apache.drill.exec.memory.BufferAllocator;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
@@ -35,8 +37,10 @@ import parquet.format.converter.ParquetMetadataConverter;
import parquet.hadoop.CodecFactory.BytesDecompressor;
import parquet.hadoop.metadata.ColumnChunkMetaData;
import parquet.hadoop.metadata.FileMetaData;
+import parquet.hadoop.util.CompatibilityUtil;
import java.io.IOException;
+import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
@@ -48,13 +52,15 @@ public class ColumnChunkIncReadStore implements PageReadStore {
private static ParquetMetadataConverter parquetMetadataConverter = new ParquetMetadataConverter();
private CodecFactory codecFactory = new CodecFactory(new Configuration());
+ private BufferAllocator allocator;
private FileSystem fs;
private Path path;
private long rowCount;
private List<FSDataInputStream> streams = new ArrayList();
- public ColumnChunkIncReadStore(long rowCount, CodecFactory codecFactory, FileSystem fs, Path path) {
+ public ColumnChunkIncReadStore(long rowCount, CodecFactory codecFactory, BufferAllocator allocator, FileSystem fs, Path path) {
this.codecFactory = codecFactory;
+ this.allocator = allocator;
this.fs = fs;
this.path = path;
this.rowCount = rowCount;
@@ -64,6 +70,7 @@ public class ColumnChunkIncReadStore implements PageReadStore {
public class ColumnChunkIncPageReader implements PageReader {
ColumnChunkMetaData metaData;
+ ColumnDescriptor columnDescriptor;
long fileOffset;
long size;
private long valueReadSoFar = 0;
@@ -72,16 +79,11 @@ public class ColumnChunkIncReadStore implements PageReadStore {
private FSDataInputStream in;
private BytesDecompressor decompressor;
- public ColumnChunkIncPageReader(ColumnChunkMetaData metaData, FSDataInputStream in) {
- this.metaData = metaData;
- this.size = metaData.getTotalSize();
- this.fileOffset = metaData.getStartingPos();
- this.in = in;
- this.decompressor = codecFactory.getDecompressor(metaData.getCodec());
- }
+ private ByteBuf lastPage;
- public ColumnChunkIncPageReader(ColumnChunkMetaData metaData, FSDataInputStream in, CodecFactory codecFactory) {
+ public ColumnChunkIncPageReader(ColumnChunkMetaData metaData, ColumnDescriptor columnDescriptor, FSDataInputStream in) {
this.metaData = metaData;
+ this.columnDescriptor = columnDescriptor;
this.size = metaData.getTotalSize();
this.fileOffset = metaData.getStartingPos();
this.in = in;
@@ -104,13 +106,9 @@ public class ColumnChunkIncReadStore implements PageReadStore {
pageHeader.getDictionary_page_header().getNum_values(),
parquetMetadataConverter.getEncoding(pageHeader.dictionary_page_header.encoding)
);
- System.out.println(dictionaryPage);
} catch (IOException e) {
throw new RuntimeException(e);
}
-// if (dictionaryPage == null) {
-// throw new RuntimeException("Dictionary page null");
-// }
}
return dictionaryPage;
}
@@ -123,6 +121,10 @@ public class ColumnChunkIncReadStore implements PageReadStore {
@Override
public Page readPage() {
try {
+ if (lastPage != null) {
+ lastPage.release();
+ lastPage = null;
+ }
while(valueReadSoFar < metaData.getValueCount()) {
PageHeader pageHeader = Util.readPageHeader(in);
switch (pageHeader.type) {
@@ -140,10 +142,15 @@ public class ColumnChunkIncReadStore implements PageReadStore {
break;
case DATA_PAGE:
valueReadSoFar += pageHeader.data_page_header.getNum_values();
+ ByteBuf buf = allocator.buffer(pageHeader.compressed_page_size);
+ lastPage = buf;
+ ByteBuffer buffer = buf.nioBuffer(0, pageHeader.compressed_page_size);
+ CompatibilityUtil.getBuf(in, buffer, pageHeader.compressed_page_size);
return new Page(
- decompressor.decompress(BytesInput.from(in,pageHeader.compressed_page_size), pageHeader.getUncompressed_page_size()),
+ decompressor.decompress(BytesInput.from(buffer, 0, pageHeader.compressed_page_size), pageHeader.compressed_page_size),
pageHeader.data_page_header.num_values,
pageHeader.uncompressed_page_size,
+ parquetMetadataConverter.fromParquetStatistics(pageHeader.data_page_header.statistics, columnDescriptor.getType()),
parquetMetadataConverter.getEncoding(pageHeader.data_page_header.repetition_level_encoding),
parquetMetadataConverter.getEncoding(pageHeader.data_page_header.definition_level_encoding),
parquetMetadataConverter.getEncoding(pageHeader.data_page_header.encoding)
@@ -159,6 +166,12 @@ public class ColumnChunkIncReadStore implements PageReadStore {
throw new RuntimeException(e);
}
}
+
+ void close() {
+ if (lastPage != null) {
+ lastPage.release();
+ }
+ }
}
private Map<ColumnDescriptor, ColumnChunkIncPageReader> columns = new HashMap();
@@ -167,7 +180,7 @@ public class ColumnChunkIncReadStore implements PageReadStore {
FSDataInputStream in = fs.open(path);
streams.add(in);
in.seek(metaData.getStartingPos());
- ColumnChunkIncPageReader reader = new ColumnChunkIncPageReader(metaData, in);
+ ColumnChunkIncPageReader reader = new ColumnChunkIncPageReader(metaData, descriptor, in);
columns.put(descriptor, reader);
}
@@ -176,6 +189,9 @@ public class ColumnChunkIncReadStore implements PageReadStore {
for (FSDataInputStream stream : streams) {
stream.close();
}
+ for (ColumnChunkIncPageReader reader : columns.values()) {
+ reader.close();
+ }
}
@Override
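Why the new close()/release() calls matter: the page buffer comes from a Netty-style allocator and is reference counted, so its memory is only returned once release() drops the count to zero. Without the release at the top of readPage() each page's buffer would linger, and without close() the final page of every column chunk would never be freed. A tiny self-contained illustration (using Unpooled in place of Drill's allocator):

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;

public class RefCountDemo {
  public static void main(String[] args) {
    ByteBuf buf = Unpooled.directBuffer(1024);                     // stands in for allocator.buffer(size)
    System.out.println("refCnt after allocate: " + buf.refCnt());  // 1 -> memory held
    buf.release();                                                 // what lastPage.release() / close() do
    System.out.println("refCnt after release:  " + buf.refCnt());  // 0 -> memory reclaimed
  }
}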