From ac8e69847659582e36c89fd52bb0856ab3bfbd21 Mon Sep 17 00:00:00 2001 From: Vlad Rozov Date: Wed, 9 May 2018 13:24:11 -0700 Subject: DRILL-6353: Upgrade Parquet MR dependencies closes #1259 --- .../exec/util/filereader/BufferedDirectBufInputStream.java | 8 ++++---- .../drill/exec/util/filereader/DirectBufInputStream.java | 11 ++++++++--- 2 files changed, 12 insertions(+), 7 deletions(-) (limited to 'exec/java-exec/src/main/java/org/apache/drill/exec/util') diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/util/filereader/BufferedDirectBufInputStream.java b/exec/java-exec/src/main/java/org/apache/drill/exec/util/filereader/BufferedDirectBufInputStream.java index f208d6e82..1d764b1b6 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/util/filereader/BufferedDirectBufInputStream.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/util/filereader/BufferedDirectBufInputStream.java @@ -21,7 +21,7 @@ import com.google.common.base.Preconditions; import com.google.common.base.Stopwatch; import io.netty.buffer.DrillBuf; import org.apache.drill.exec.memory.BufferAllocator; -import org.apache.parquet.hadoop.util.CompatibilityUtil; +import org.apache.parquet.hadoop.util.HadoopStreams; import java.io.Closeable; import java.io.IOException; @@ -179,7 +179,7 @@ public class BufferedDirectBufInputStream extends DirectBufInputStream implement int nBytes = 0; if (bytesToRead > 0) { try { - nBytes = CompatibilityUtil.getBuf(getInputStream(), directBuffer, bytesToRead); + nBytes = HadoopStreams.wrap(getInputStream()).read(directBuffer); } catch (Exception e) { logger.error("Error reading from stream {}. Error was : {}", this.streamId, e.getMessage()); throw new IOException((e)); @@ -193,8 +193,8 @@ public class BufferedDirectBufInputStream extends DirectBufInputStream implement logger.trace( "PERF: Disk read complete. {}, StartOffset: {}, TotalByteSize: {}, BufferSize: {}, BytesRead: {}, Count: {}, " + "CurPosInStream: {}, CurPosInBuffer: {}, Time: {} ms", this.streamId, this.startOffset, - this.totalByteSize, this.bufSize, bytesRead, this.count, this.curPosInStream, this.curPosInBuffer, ((double) timer.elapsed(TimeUnit.MICROSECONDS)) - / 1000); + this.totalByteSize, this.bufSize, bytesRead, this.count, this.curPosInStream, this.curPosInBuffer, + ((double) timer.elapsed(TimeUnit.MICROSECONDS)) / 1000); } } } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/util/filereader/DirectBufInputStream.java b/exec/java-exec/src/main/java/org/apache/drill/exec/util/filereader/DirectBufInputStream.java index ae09a3708..ea2542eb8 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/util/filereader/DirectBufInputStream.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/util/filereader/DirectBufInputStream.java @@ -23,7 +23,8 @@ import io.netty.buffer.DrillBuf; import org.apache.drill.exec.memory.BufferAllocator; import org.apache.hadoop.fs.ByteBufferReadable; import org.apache.hadoop.fs.FSDataInputStream; -import org.apache.parquet.hadoop.util.CompatibilityUtil; +import org.apache.parquet.hadoop.util.HadoopStreams; +import org.apache.parquet.io.SeekableInputStream; import java.io.FilterInputStream; import java.io.IOException; @@ -86,12 +87,16 @@ public class DirectBufInputStream extends FilterInputStream { buf.clear(); ByteBuffer directBuffer = buf.nioBuffer(0, len); int lengthLeftToRead = len; + SeekableInputStream seekableInputStream = HadoopStreams.wrap(getInputStream()); while (lengthLeftToRead > 0) { if(logger.isTraceEnabled()) { logger.trace("PERF: Disk read start. {}, StartOffset: {}, TotalByteSize: {}", this.streamId, this.startOffset, this.totalByteSize); } Stopwatch timer = Stopwatch.createStarted(); - int bytesRead = CompatibilityUtil.getBuf(getInputStream(), directBuffer, lengthLeftToRead); + int bytesRead = seekableInputStream.read(directBuffer); + if (bytesRead < 0) { + return bytesRead; + } lengthLeftToRead -= bytesRead; if(logger.isTraceEnabled()) { logger.trace( @@ -113,7 +118,7 @@ public class DirectBufInputStream extends FilterInputStream { b.release(); throw e; } - if (bytesRead <= -1) { + if (bytesRead < 0) { b.release(); return null; } -- cgit v1.2.3