From fab96c34cfc392b60fffc75b09c5d1927a72f33a Mon Sep 17 00:00:00 2001
From: Parth Chandra
Date: Wed, 20 Aug 2014 13:37:27 -0700
Subject: Use Direct Memory in Parquet Writer

---
 exec/java-exec/pom.xml                              | 12 ++--
 .../parquet/ParquetDirectByteBufferAllocator.java   | 73 ++++++++++++++++++++++
 .../exec/store/parquet/ParquetFormatPlugin.java     |  8 ++-
 .../exec/store/parquet/ParquetRecordWriter.java     | 47 ++++++++++----
 .../hadoop/ColumnChunkPageWriteStoreExposer.java    | 16 ++++-
 .../exec/store/parquet/TestFileGenerator.java       | 11 +++-
 6 files changed, 142 insertions(+), 25 deletions(-)
 create mode 100644 exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetDirectByteBufferAllocator.java

(limited to 'exec')

diff --git a/exec/java-exec/pom.xml b/exec/java-exec/pom.xml
index 685f2fe83..903f953bb 100644
--- a/exec/java-exec/pom.xml
+++ b/exec/java-exec/pom.xml
@@ -125,7 +125,7 @@
     <dependency>
       <groupId>com.twitter</groupId>
       <artifactId>parquet-column</artifactId>
-      <version>1.5.1-drill-r2</version>
+      <version>1.5.1-drill-r3</version>
       <exclusions>
         <exclusion>
           <groupId>org.apache.hadoop</groupId>
@@ -140,7 +140,7 @@
     <dependency>
       <groupId>com.twitter</groupId>
       <artifactId>parquet-hadoop</artifactId>
-      <version>1.5.1-drill-r2</version>
+      <version>1.5.1-drill-r3</version>
       <exclusions>
         <exclusion>
           <groupId>org.apache.hadoop</groupId>
@@ -170,7 +170,7 @@
     <dependency>
       <groupId>com.twitter</groupId>
       <artifactId>parquet-common</artifactId>
-      <version>1.5.1-drill-r2</version>
+      <version>1.5.1-drill-r3</version>
       <exclusions>
         <exclusion>
           <groupId>org.apache.hadoop</groupId>
@@ -185,7 +185,7 @@
     <dependency>
       <groupId>com.twitter</groupId>
       <artifactId>parquet-jackson</artifactId>
-      <version>1.5.1-drill-r2</version>
+      <version>1.5.1-drill-r3</version>
       <exclusions>
         <exclusion>
           <groupId>org.apache.hadoop</groupId>
@@ -200,7 +200,7 @@
     <dependency>
       <groupId>com.twitter</groupId>
       <artifactId>parquet-encoding</artifactId>
-      <version>1.5.1-drill-r2</version>
+      <version>1.5.1-drill-r3</version>
       <exclusions>
         <exclusion>
           <groupId>org.apache.hadoop</groupId>
@@ -215,7 +215,7 @@
     <dependency>
       <groupId>com.twitter</groupId>
       <artifactId>parquet-generator</artifactId>
-      <version>1.5.1-drill-r2</version>
+      <version>1.5.1-drill-r3</version>
       <exclusions>
         <exclusion>
           <groupId>org.apache.hadoop</groupId>
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetDirectByteBufferAllocator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetDirectByteBufferAllocator.java
new file mode 100644
index 000000000..1a49dcd77
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetDirectByteBufferAllocator.java
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.store.parquet;
+
+import io.netty.buffer.ByteBuf;
+import org.apache.drill.exec.ops.OperatorContext;
+import parquet.bytes.ByteBufferAllocator;
+
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+
+public class ParquetDirectByteBufferAllocator implements ByteBufferAllocator {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ParquetDirectByteBufferAllocator.class);
+
+  private OperatorContext oContext;
+  private HashMap<Integer, ByteBuf> allocatedBuffers = new HashMap<Integer, ByteBuf>();
+
+  public ParquetDirectByteBufferAllocator(OperatorContext o){
+    oContext=o;
+  }
+
+
+  @Override
+  public ByteBuffer allocate(int sz) {
+    ByteBuf bb = oContext.getAllocator().buffer(sz);
+    ByteBuffer b = bb.nioBuffer(0, sz);
+    allocatedBuffers.put(System.identityHashCode(b), bb);
+    logger.debug("ParquetDirectByteBufferAllocator: Allocated "+sz+" bytes. Allocated ByteBuffer id: "+System.identityHashCode(b));
+    return b;
+  }
+
+  @Override
+  public void release(ByteBuffer b) {
+    Integer id = System.identityHashCode(b);
+    ByteBuf bb = allocatedBuffers.get(id);
+    // The ByteBuffer passed in may already have been freed or not allocated by this allocator.
+    // If it is not found in the allocated buffers, do nothing
+    if(bb!=null) {
+      logger.debug("ParquetDirectByteBufferAllocator: Freed byte buffer. Allocated ByteBuffer id: "+System.identityHashCode(b));
+      bb.release();
+      allocatedBuffers.remove(id);
+    }
+  }
+
+  public void clear(){
+    Iterator it = allocatedBuffers.entrySet().iterator();
+    while (it.hasNext()) {
+      Map.Entry pair = (Map.Entry)it.next();
+      Integer id = (Integer)pair.getKey();
+      ByteBuf bb = allocatedBuffers.get(id);
+      bb.release();
+      it.remove();
+    }
+  }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFormatPlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFormatPlugin.java
index 37d64036b..eb07d79a9 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFormatPlugin.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFormatPlugin.java
@@ -27,7 +27,9 @@ import org.apache.drill.common.exceptions.ExecutionSetupException;
 import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.common.logical.StoragePluginConfig;
 import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.memory.OutOfMemoryException;
 import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.ops.OperatorContext;
 import org.apache.drill.exec.physical.base.AbstractWriter;
 import org.apache.drill.exec.physical.base.PhysicalOperator;
 import org.apache.drill.exec.physical.impl.WriterRecordBatch;
@@ -71,7 +73,7 @@ public class ParquetFormatPlugin implements FormatPlugin{
   private final ParquetFormatConfig config;
   private final StoragePluginConfig storageConfig;
   private final String name;
-
+
   public ParquetFormatPlugin(String name, DrillbitContext context, DrillFileSystem fs, StoragePluginConfig storageConfig){
     this(name, context, fs, storageConfig, new ParquetFormatConfig());
   }
@@ -118,7 +120,7 @@
     return new ParquetWriter(child, location, this);
   }

-  public RecordWriter getRecordWriter(FragmentContext context, ParquetWriter writer) throws IOException {
+  public RecordWriter getRecordWriter(FragmentContext context, ParquetWriter writer) throws IOException, OutOfMemoryException {
     Map<String, String> options = Maps.newHashMap();

     options.put("location", writer.getLocation());
@@ -131,7 +133,7 @@

     options.put(ExecConstants.PARQUET_BLOCK_SIZE, context.getOptions().getOption(ExecConstants.PARQUET_BLOCK_SIZE).num_val.toString());

-    RecordWriter recordWriter = new ParquetRecordWriter();
+    RecordWriter recordWriter = new ParquetRecordWriter(context, writer);
     recordWriter.init(options);

     return recordWriter;
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordWriter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordWriter.java
index 94ccc133a..2c5f232c8 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordWriter.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordWriter.java
@@ -21,14 +21,14 @@ import com.google.common.collect.Lists;
 import org.apache.drill.common.types.TypeProtos.DataMode;
 import org.apache.drill.common.types.TypeProtos.MinorType;
 import org.apache.drill.exec.ExecConstants;
-import org.apache.drill.exec.expr.holders.ComplexHolder;
+import org.apache.drill.exec.memory.OutOfMemoryException;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.ops.OperatorContext;
 import org.apache.drill.exec.record.BatchSchema;
 import org.apache.drill.exec.record.MaterializedField;
 import org.apache.drill.exec.store.EventBasedRecordWriter;
 import org.apache.drill.exec.store.EventBasedRecordWriter.FieldConverter;
 import org.apache.drill.exec.store.ParquetOutputRecordWriter;
-import org.apache.drill.exec.vector.ValueVector;
-import org.apache.drill.exec.vector.complex.MapVector;
 import org.apache.drill.exec.vector.complex.reader.FieldReader;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
@@ -67,7 +67,7 @@ public class ParquetRecordWriter extends ParquetOutputRecordWriter {
   private static final int MINIMUM_RECORD_COUNT_FOR_CHECK = 100;
   private static final int MAXIMUM_RECORD_COUNT_FOR_CHECK = 10000;

-  private ParquetFileWriter w;
+  private ParquetFileWriter parquetFileWriter;
   private MessageType schema;
   private Map<String, String> extraMetaData = new HashMap<String, String>();
   private int blockSize;
@@ -91,6 +91,13 @@ public class ParquetRecordWriter extends ParquetOutputRecordWriter {
   private String location;
   private String prefix;
   private int index = 0;
+  private OperatorContext oContext;
+  private ParquetDirectByteBufferAllocator allocator;
+
+  public ParquetRecordWriter(FragmentContext context, ParquetWriter writer) throws OutOfMemoryException{
+    super();
+    this.oContext=new OperatorContext(writer, context);
+  }

   @Override
   public void init(Map<String, String> writerOptions) throws IOException {
@@ -121,11 +128,15 @@
     schema = new MessageType("root", types);

     Path fileName = new Path(location, prefix + "_" + index + ".parquet");
-    w = new ParquetFileWriter(conf, schema, fileName);
-    w.start();
+    parquetFileWriter = new ParquetFileWriter(conf, schema, fileName);
+    parquetFileWriter.start();

     int initialBlockBufferSize = max(MINIMUM_BUFFER_SIZE, blockSize / this.schema.getColumns().size() / 5);
-    pageStore = ColumnChunkPageWriteStoreExposer.newColumnChunkPageWriteStore(codec, pageSize, this.schema, initialBlockBufferSize);
+    pageStore = ColumnChunkPageWriteStoreExposer.newColumnChunkPageWriteStore(this.oContext,
+        codec,
+        pageSize,
+        this.schema,
+        initialBlockBufferSize);
     int initialPageBufferSize = max(MINIMUM_BUFFER_SIZE, min(pageSize + pageSize / 10, initialBlockBufferSize));
     store = new ColumnWriteStoreImpl(pageStore, pageSize, initialPageBufferSize, dictionaryPageSize, enableDictionary, writerVersion);
     MessageColumnIO columnIO = new ColumnIOFactory(validating).getColumnIO(this.schema);
@@ -162,12 +173,14 @@
   }

   private void flush() throws IOException {
-    w.startBlock(recordCount);
+    parquetFileWriter.startBlock(recordCount);
     store.flush();
-    ColumnChunkPageWriteStoreExposer.flushPageStore(pageStore, w);
+    ColumnChunkPageWriteStoreExposer.flushPageStore(pageStore, parquetFileWriter);
     recordCount = 0;
-    w.endBlock();
-    w.end(extraMetaData);
+    parquetFileWriter.endBlock();
+    parquetFileWriter.end(extraMetaData);
+    store.close();
+    ColumnChunkPageWriteStoreExposer.close(pageStore);
     store = null;
     pageStore = null;
     index++;
@@ -274,7 +287,17 @@
   @Override
   public void cleanup() throws IOException {
     if (recordCount > 0) {
-      flush();
+      parquetFileWriter.startBlock(recordCount);
+      store.flush();
+      ColumnChunkPageWriteStoreExposer.flushPageStore(pageStore, parquetFileWriter);
+      recordCount = 0;
+      parquetFileWriter.endBlock();
+      parquetFileWriter.end(extraMetaData);
+    }
+    store.close();
+    ColumnChunkPageWriteStoreExposer.close(pageStore);
+    if(oContext!=null){
+      oContext.close();
     }
   }
 }
diff --git a/exec/java-exec/src/main/java/parquet/hadoop/ColumnChunkPageWriteStoreExposer.java b/exec/java-exec/src/main/java/parquet/hadoop/ColumnChunkPageWriteStoreExposer.java
index 54f647a72..0ffc3ba92 100644
--- a/exec/java-exec/src/main/java/parquet/hadoop/ColumnChunkPageWriteStoreExposer.java
+++ b/exec/java-exec/src/main/java/parquet/hadoop/ColumnChunkPageWriteStoreExposer.java
@@ -17,6 +17,8 @@
  */
 package parquet.hadoop;

+import org.apache.drill.exec.ops.OperatorContext;
+import org.apache.drill.exec.store.parquet.ParquetDirectByteBufferAllocator;
 import org.apache.hadoop.conf.Configuration;
 import parquet.column.page.PageWriteStore;
 import parquet.hadoop.CodecFactory.BytesCompressor;
@@ -27,13 +29,23 @@ import java.io.IOException;

 public class ColumnChunkPageWriteStoreExposer {

-  public static ColumnChunkPageWriteStore newColumnChunkPageWriteStore(CompressionCodecName codec, int pageSize, MessageType schema, int initialSize) {
+  public static ColumnChunkPageWriteStore newColumnChunkPageWriteStore(OperatorContext oContext,
+      CompressionCodecName codec,
+      int pageSize,
+      MessageType schema,
+      int initialSize) {
     BytesCompressor compressor = new CodecFactory(new Configuration()).getCompressor(codec, pageSize);
-    return new ColumnChunkPageWriteStore(compressor, schema, initialSize);
+    return new ColumnChunkPageWriteStore(compressor, schema, initialSize, new ParquetDirectByteBufferAllocator(oContext));
   }

   public static void flushPageStore(PageWriteStore pageStore, ParquetFileWriter w) throws IOException {
     ((ColumnChunkPageWriteStore) pageStore).flushToFileWriter(w);
   }

+  public static void close(PageWriteStore pageStore) throws IOException {
+    ((ColumnChunkPageWriteStore) pageStore).close();
+
+  }
+
+
 }
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestFileGenerator.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestFileGenerator.java
index 3c0287dd0..fb1ea60e7 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestFileGenerator.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestFileGenerator.java
@@ -29,6 +29,7 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;

 import parquet.bytes.BytesInput;
+import parquet.bytes.DirectByteBufferAllocator;
 import parquet.column.ColumnDescriptor;
 import parquet.column.values.rle.RunLengthBitPackingHybridValuesWriter;
 import parquet.hadoop.ParquetFileWriter;
@@ -183,8 +184,14 @@ public class TestFileGenerator {
       w.startColumn(c1, props.recordsPerRowGroup, codec);
       int valsPerPage = (int) Math.ceil(props.recordsPerRowGroup / (float) fieldInfo.numberOfPages);
       byte[] bytes;
-      RunLengthBitPackingHybridValuesWriter defLevels = new RunLengthBitPackingHybridValuesWriter(MAX_EXPECTED_BIT_WIDTH_FOR_DEFINITION_LEVELS, valsPerPage);
-      RunLengthBitPackingHybridValuesWriter repLevels = new RunLengthBitPackingHybridValuesWriter(MAX_EXPECTED_BIT_WIDTH_FOR_DEFINITION_LEVELS, valsPerPage);
+      RunLengthBitPackingHybridValuesWriter defLevels = new RunLengthBitPackingHybridValuesWriter(
+          MAX_EXPECTED_BIT_WIDTH_FOR_DEFINITION_LEVELS,
+          valsPerPage,
+          new DirectByteBufferAllocator());
+      RunLengthBitPackingHybridValuesWriter repLevels = new RunLengthBitPackingHybridValuesWriter(
+          MAX_EXPECTED_BIT_WIDTH_FOR_DEFINITION_LEVELS,
+          valsPerPage,
+          new DirectByteBufferAllocator());
       // for variable length binary fields
       int bytesNeededToEncodeLength = 4;
       if ((int) fieldInfo.bitLength > 0) {
-- 
cgit v1.2.3