aboutsummaryrefslogtreecommitdiff
path: root/exec/java-exec/src/main/java/org/apache/drill/exec/util
diff options
context:
space:
mode:
authorVlad Rozov <vrozov@apache.org>2018-05-09 13:24:11 -0700
committerTimothy Farkas <timothyfarkas@apache.org>2018-06-13 17:11:29 -0700
commitac8e69847659582e36c89fd52bb0856ab3bfbd21 (patch)
tree917e881ff0bc693badbf152e629a75505ace8a90 /exec/java-exec/src/main/java/org/apache/drill/exec/util
parent98dbc3a222990703aebe983883779763e0cdc1e9 (diff)
DRILL-6353: Upgrade Parquet MR dependencies
closes #1259
Diffstat (limited to 'exec/java-exec/src/main/java/org/apache/drill/exec/util')
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/util/filereader/BufferedDirectBufInputStream.java8
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/util/filereader/DirectBufInputStream.java11
2 files changed, 12 insertions, 7 deletions
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/util/filereader/BufferedDirectBufInputStream.java b/exec/java-exec/src/main/java/org/apache/drill/exec/util/filereader/BufferedDirectBufInputStream.java
index f208d6e82..1d764b1b6 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/util/filereader/BufferedDirectBufInputStream.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/util/filereader/BufferedDirectBufInputStream.java
@@ -21,7 +21,7 @@ import com.google.common.base.Preconditions;
import com.google.common.base.Stopwatch;
import io.netty.buffer.DrillBuf;
import org.apache.drill.exec.memory.BufferAllocator;
-import org.apache.parquet.hadoop.util.CompatibilityUtil;
+import org.apache.parquet.hadoop.util.HadoopStreams;
import java.io.Closeable;
import java.io.IOException;
@@ -179,7 +179,7 @@ public class BufferedDirectBufInputStream extends DirectBufInputStream implement
int nBytes = 0;
if (bytesToRead > 0) {
try {
- nBytes = CompatibilityUtil.getBuf(getInputStream(), directBuffer, bytesToRead);
+ nBytes = HadoopStreams.wrap(getInputStream()).read(directBuffer);
} catch (Exception e) {
logger.error("Error reading from stream {}. Error was : {}", this.streamId, e.getMessage());
throw new IOException((e));
@@ -193,8 +193,8 @@ public class BufferedDirectBufInputStream extends DirectBufInputStream implement
logger.trace(
"PERF: Disk read complete. {}, StartOffset: {}, TotalByteSize: {}, BufferSize: {}, BytesRead: {}, Count: {}, "
+ "CurPosInStream: {}, CurPosInBuffer: {}, Time: {} ms", this.streamId, this.startOffset,
- this.totalByteSize, this.bufSize, bytesRead, this.count, this.curPosInStream, this.curPosInBuffer, ((double) timer.elapsed(TimeUnit.MICROSECONDS))
- / 1000);
+ this.totalByteSize, this.bufSize, bytesRead, this.count, this.curPosInStream, this.curPosInBuffer,
+ ((double) timer.elapsed(TimeUnit.MICROSECONDS)) / 1000);
}
}
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/util/filereader/DirectBufInputStream.java b/exec/java-exec/src/main/java/org/apache/drill/exec/util/filereader/DirectBufInputStream.java
index ae09a3708..ea2542eb8 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/util/filereader/DirectBufInputStream.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/util/filereader/DirectBufInputStream.java
@@ -23,7 +23,8 @@ import io.netty.buffer.DrillBuf;
import org.apache.drill.exec.memory.BufferAllocator;
import org.apache.hadoop.fs.ByteBufferReadable;
import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.parquet.hadoop.util.CompatibilityUtil;
+import org.apache.parquet.hadoop.util.HadoopStreams;
+import org.apache.parquet.io.SeekableInputStream;
import java.io.FilterInputStream;
import java.io.IOException;
@@ -86,12 +87,16 @@ public class DirectBufInputStream extends FilterInputStream {
buf.clear();
ByteBuffer directBuffer = buf.nioBuffer(0, len);
int lengthLeftToRead = len;
+ SeekableInputStream seekableInputStream = HadoopStreams.wrap(getInputStream());
while (lengthLeftToRead > 0) {
if(logger.isTraceEnabled()) {
logger.trace("PERF: Disk read start. {}, StartOffset: {}, TotalByteSize: {}", this.streamId, this.startOffset, this.totalByteSize);
}
Stopwatch timer = Stopwatch.createStarted();
- int bytesRead = CompatibilityUtil.getBuf(getInputStream(), directBuffer, lengthLeftToRead);
+ int bytesRead = seekableInputStream.read(directBuffer);
+ if (bytesRead < 0) {
+ return bytesRead;
+ }
lengthLeftToRead -= bytesRead;
if(logger.isTraceEnabled()) {
logger.trace(
@@ -113,7 +118,7 @@ public class DirectBufInputStream extends FilterInputStream {
b.release();
throw e;
}
- if (bytesRead <= -1) {
+ if (bytesRead < 0) {
b.release();
return null;
}