author     Ryan Blue <blue@apache.org>            2018-05-09 12:27:32 +0800
committer  Wenchen Fan <wenchen@databricks.com>   2018-05-09 12:27:32 +0800
commit     cac9b1dea1bb44fa42abf77829c05bf93f70cf20 (patch)
tree       8c3a6c54da25be568380399cb17d6f04176ee09d
parent     7e7350285dc22764f599671d874617c0eea093e5 (diff)
[SPARK-23972][BUILD][SQL] Update Parquet to 1.10.0.
## What changes were proposed in this pull request?

This updates Parquet to 1.10.0 and updates the vectorized path for buffer management changes. Parquet 1.10.0 uses ByteBufferInputStream instead of byte arrays in encoders. This allows Parquet to break allocations into smaller chunks that are better for garbage collection.

## How was this patch tested?

Existing Parquet tests. Running in production at Netflix for about 3 months.

Author: Ryan Blue <blue@apache.org>

Closes #21070 from rdblue/SPARK-23972-update-parquet-to-1.10.0.
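The core API shift behind the vectorized-reader changes below is that Parquet 1.10.0's readers are initialized from a `ByteBufferInputStream` instead of a `(byte[], offset)` pair, with page bytes obtained via `BytesInput.toInputStream()` rather than `toByteArray()`. A minimal, hedged sketch of that pattern follows; the class and method names are illustrative only, not Spark or Parquet code.

```java
// Minimal sketch of the Parquet 1.10.0 buffer-management pattern used below.
// PlainIntReaderSketch is a hypothetical class, not part of Spark or Parquet.
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;

import org.apache.parquet.bytes.ByteBufferInputStream;
import org.apache.parquet.bytes.BytesInput;

class PlainIntReaderSketch {
  private ByteBufferInputStream in;

  // Readers now receive a stream instead of (byte[] bytes, int offset).
  void initFromPage(int valueCount, ByteBufferInputStream in) {
    this.in = in;
  }

  // Values are read by slicing the next N bytes off the stream; the slice may be
  // backed by one of several small buffers rather than a single large byte array,
  // which is what makes the allocations friendlier to the garbage collector.
  int readInt() throws IOException {
    ByteBuffer buffer = in.slice(4).order(ByteOrder.LITTLE_ENDIAN);
    return buffer.getInt();
  }

  static ByteBufferInputStream streamFor(BytesInput pageBytes) throws IOException {
    // BytesInput.toInputStream() replaces the old toByteArray() copy.
    return pageBytes.toInputStream();
  }
}
```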
-rw-r--r--  dev/deps/spark-deps-hadoop-2.6  12
-rw-r--r--  dev/deps/spark-deps-hadoop-2.7  12
-rw-r--r--  dev/deps/spark-deps-hadoop-3.1  12
-rw-r--r--  docs/sql-programming-guide.md  2
-rw-r--r--  pom.xml  8
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala  2
-rw-r--r--  sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java  2
-rw-r--r--  sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java  39
-rw-r--r--  sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedPlainValuesReader.java  166
-rw-r--r--  sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedRleValuesReader.java  163
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetOptions.scala  5
-rw-r--r--  sql/core/src/test/resources/sql-tests/results/describe-part-after-analyze.sql.out  12
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala  4
13 files changed, 241 insertions, 198 deletions
diff --git a/dev/deps/spark-deps-hadoop-2.6 b/dev/deps/spark-deps-hadoop-2.6
index c3d1dd444b..f479c13f00 100644
--- a/dev/deps/spark-deps-hadoop-2.6
+++ b/dev/deps/spark-deps-hadoop-2.6
@@ -162,13 +162,13 @@ orc-mapreduce-1.4.3-nohive.jar
oro-2.0.8.jar
osgi-resource-locator-1.0.1.jar
paranamer-2.8.jar
-parquet-column-1.8.2.jar
-parquet-common-1.8.2.jar
-parquet-encoding-1.8.2.jar
-parquet-format-2.3.1.jar
-parquet-hadoop-1.8.2.jar
+parquet-column-1.10.0.jar
+parquet-common-1.10.0.jar
+parquet-encoding-1.10.0.jar
+parquet-format-2.4.0.jar
+parquet-hadoop-1.10.0.jar
parquet-hadoop-bundle-1.6.0.jar
-parquet-jackson-1.8.2.jar
+parquet-jackson-1.10.0.jar
protobuf-java-2.5.0.jar
py4j-0.10.6.jar
pyrolite-4.13.jar
diff --git a/dev/deps/spark-deps-hadoop-2.7 b/dev/deps/spark-deps-hadoop-2.7
index 290867035f..e7c4599cb5 100644
--- a/dev/deps/spark-deps-hadoop-2.7
+++ b/dev/deps/spark-deps-hadoop-2.7
@@ -163,13 +163,13 @@ orc-mapreduce-1.4.3-nohive.jar
oro-2.0.8.jar
osgi-resource-locator-1.0.1.jar
paranamer-2.8.jar
-parquet-column-1.8.2.jar
-parquet-common-1.8.2.jar
-parquet-encoding-1.8.2.jar
-parquet-format-2.3.1.jar
-parquet-hadoop-1.8.2.jar
+parquet-column-1.10.0.jar
+parquet-common-1.10.0.jar
+parquet-encoding-1.10.0.jar
+parquet-format-2.4.0.jar
+parquet-hadoop-1.10.0.jar
parquet-hadoop-bundle-1.6.0.jar
-parquet-jackson-1.8.2.jar
+parquet-jackson-1.10.0.jar
protobuf-java-2.5.0.jar
py4j-0.10.6.jar
pyrolite-4.13.jar
diff --git a/dev/deps/spark-deps-hadoop-3.1 b/dev/deps/spark-deps-hadoop-3.1
index 97ad65a409..3447cd7395 100644
--- a/dev/deps/spark-deps-hadoop-3.1
+++ b/dev/deps/spark-deps-hadoop-3.1
@@ -181,13 +181,13 @@ orc-mapreduce-1.4.3-nohive.jar
oro-2.0.8.jar
osgi-resource-locator-1.0.1.jar
paranamer-2.8.jar
-parquet-column-1.8.2.jar
-parquet-common-1.8.2.jar
-parquet-encoding-1.8.2.jar
-parquet-format-2.3.1.jar
-parquet-hadoop-1.8.2.jar
+parquet-column-1.10.0.jar
+parquet-common-1.10.0.jar
+parquet-encoding-1.10.0.jar
+parquet-format-2.4.0.jar
+parquet-hadoop-1.10.0.jar
parquet-hadoop-bundle-1.6.0.jar
-parquet-jackson-1.8.2.jar
+parquet-jackson-1.10.0.jar
protobuf-java-2.5.0.jar
py4j-0.10.6.jar
pyrolite-4.13.jar
diff --git a/docs/sql-programming-guide.md b/docs/sql-programming-guide.md
index c521f3cb51..3e8946e424 100644
--- a/docs/sql-programming-guide.md
+++ b/docs/sql-programming-guide.md
@@ -964,7 +964,7 @@ Configuration of Parquet can be done using the `setConf` method on `SparkSession
Sets the compression codec used when writing Parquet files. If either `compression` or
`parquet.compression` is specified in the table-specific options/properties, the precedence would be
`compression`, `parquet.compression`, `spark.sql.parquet.compression.codec`. Acceptable values include:
- none, uncompressed, snappy, gzip, lzo.
+ none, uncompressed, snappy, gzip, lzo, brotli, lz4, zstd.
</td>
</tr>
<tr>
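For reference, a hedged usage sketch of the expanded codec list. The app name and output path below are made up for illustration, and brotli/zstd additionally require the corresponding codec libraries to be available on the classpath at runtime.

```java
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class ParquetCodecExample {
  public static void main(String[] args) {
    SparkSession spark = SparkSession.builder().appName("parquet-codec-example").getOrCreate();

    // Session-wide default; "zstd", "lz4" and "brotli" are newly accepted values.
    spark.conf().set("spark.sql.parquet.compression.codec", "zstd");

    // Per-write option; per the precedence described above, `compression` wins
    // over the session config.
    Dataset<Row> df = spark.range(10).toDF();
    df.write().option("compression", "brotli").parquet("/tmp/parquet_brotli_example");

    spark.stop();
  }
}
```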
diff --git a/pom.xml b/pom.xml
index 88e77ff874..6e37e518d8 100644
--- a/pom.xml
+++ b/pom.xml
@@ -129,7 +129,7 @@
<!-- Version used for internal directory structure -->
<hive.version.short>1.2.1</hive.version.short>
<derby.version>10.12.1.1</derby.version>
- <parquet.version>1.8.2</parquet.version>
+ <parquet.version>1.10.0</parquet.version>
<orc.version>1.4.3</orc.version>
<orc.classifier>nohive</orc.classifier>
<hive.parquet.version>1.6.0</hive.parquet.version>
@@ -1778,6 +1778,12 @@
<artifactId>parquet-hadoop</artifactId>
<version>${parquet.version}</version>
<scope>${parquet.deps.scope}</scope>
+ <exclusions>
+ <exclusion>
+ <groupId>commons-pool</groupId>
+ <artifactId>commons-pool</artifactId>
+ </exclusion>
+ </exclusions>
</dependency>
<dependency>
<groupId>org.apache.parquet</groupId>
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 895e150756..b00edca97c 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -345,7 +345,7 @@ object SQLConf {
"snappy, gzip, lzo.")
.stringConf
.transform(_.toLowerCase(Locale.ROOT))
- .checkValues(Set("none", "uncompressed", "snappy", "gzip", "lzo"))
+ .checkValues(Set("none", "uncompressed", "snappy", "gzip", "lzo", "lz4", "brotli", "zstd"))
.createWithDefault("snappy")
val PARQUET_FILTER_PUSHDOWN_ENABLED = buildConf("spark.sql.parquet.filterPushdown")
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java
index e65cd252c3..10d6ed85a4 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java
@@ -293,7 +293,7 @@ public abstract class SpecificParquetRecordReaderBase<T> extends RecordReader<Vo
return new RLEIntIterator(
new RunLengthBitPackingHybridDecoder(
BytesUtils.getWidthFromMaxInt(maxLevel),
- new ByteArrayInputStream(bytes.toByteArray())));
+ bytes.toInputStream()));
} catch (IOException e) {
throw new IOException("could not read levels in page for col " + descriptor, e);
}
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java
index 72f1d024b0..d5969b55ee 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java
@@ -21,6 +21,8 @@ import java.io.IOException;
import java.util.Arrays;
import java.util.TimeZone;
+import org.apache.parquet.bytes.ByteBufferInputStream;
+import org.apache.parquet.bytes.BytesInput;
import org.apache.parquet.bytes.BytesUtils;
import org.apache.parquet.column.ColumnDescriptor;
import org.apache.parquet.column.Dictionary;
@@ -388,7 +390,8 @@ public class VectorizedColumnReader {
* is guaranteed that num is smaller than the number of values left in the current page.
*/
- private void readBooleanBatch(int rowId, int num, WritableColumnVector column) {
+ private void readBooleanBatch(int rowId, int num, WritableColumnVector column)
+ throws IOException {
if (column.dataType() != DataTypes.BooleanType) {
throw constructConvertNotSupportedException(descriptor, column);
}
@@ -396,7 +399,7 @@ public class VectorizedColumnReader {
num, column, rowId, maxDefLevel, (VectorizedValuesReader) dataColumn);
}
- private void readIntBatch(int rowId, int num, WritableColumnVector column) {
+ private void readIntBatch(int rowId, int num, WritableColumnVector column) throws IOException {
// This is where we implement support for the valid type conversions.
// TODO: implement remaining type conversions
if (column.dataType() == DataTypes.IntegerType || column.dataType() == DataTypes.DateType ||
@@ -414,7 +417,7 @@ public class VectorizedColumnReader {
}
}
- private void readLongBatch(int rowId, int num, WritableColumnVector column) {
+ private void readLongBatch(int rowId, int num, WritableColumnVector column) throws IOException {
// This is where we implement support for the valid type conversions.
if (column.dataType() == DataTypes.LongType ||
DecimalType.is64BitDecimalType(column.dataType()) ||
@@ -434,7 +437,7 @@ public class VectorizedColumnReader {
}
}
- private void readFloatBatch(int rowId, int num, WritableColumnVector column) {
+ private void readFloatBatch(int rowId, int num, WritableColumnVector column) throws IOException {
// This is where we implement support for the valid type conversions.
// TODO: support implicit cast to double?
if (column.dataType() == DataTypes.FloatType) {
@@ -445,7 +448,7 @@ public class VectorizedColumnReader {
}
}
- private void readDoubleBatch(int rowId, int num, WritableColumnVector column) {
+ private void readDoubleBatch(int rowId, int num, WritableColumnVector column) throws IOException {
// This is where we implement support for the valid type conversions.
// TODO: implement remaining type conversions
if (column.dataType() == DataTypes.DoubleType) {
@@ -456,7 +459,7 @@ public class VectorizedColumnReader {
}
}
- private void readBinaryBatch(int rowId, int num, WritableColumnVector column) {
+ private void readBinaryBatch(int rowId, int num, WritableColumnVector column) throws IOException {
// This is where we implement support for the valid type conversions.
// TODO: implement remaining type conversions
VectorizedValuesReader data = (VectorizedValuesReader) dataColumn;
@@ -556,7 +559,7 @@ public class VectorizedColumnReader {
});
}
- private void initDataReader(Encoding dataEncoding, byte[] bytes, int offset) throws IOException {
+ private void initDataReader(Encoding dataEncoding, ByteBufferInputStream in) throws IOException {
this.endOfPageValueCount = valuesRead + pageValueCount;
if (dataEncoding.usesDictionary()) {
this.dataColumn = null;
@@ -581,7 +584,7 @@ public class VectorizedColumnReader {
}
try {
- dataColumn.initFromPage(pageValueCount, bytes, offset);
+ dataColumn.initFromPage(pageValueCount, in);
} catch (IOException e) {
throw new IOException("could not read page in col " + descriptor, e);
}
@@ -602,12 +605,11 @@ public class VectorizedColumnReader {
this.repetitionLevelColumn = new ValuesReaderIntIterator(rlReader);
this.definitionLevelColumn = new ValuesReaderIntIterator(dlReader);
try {
- byte[] bytes = page.getBytes().toByteArray();
- rlReader.initFromPage(pageValueCount, bytes, 0);
- int next = rlReader.getNextOffset();
- dlReader.initFromPage(pageValueCount, bytes, next);
- next = dlReader.getNextOffset();
- initDataReader(page.getValueEncoding(), bytes, next);
+ BytesInput bytes = page.getBytes();
+ ByteBufferInputStream in = bytes.toInputStream();
+ rlReader.initFromPage(pageValueCount, in);
+ dlReader.initFromPage(pageValueCount, in);
+ initDataReader(page.getValueEncoding(), in);
} catch (IOException e) {
throw new IOException("could not read page " + page + " in col " + descriptor, e);
}
@@ -619,12 +621,13 @@ public class VectorizedColumnReader {
page.getRepetitionLevels(), descriptor);
int bitWidth = BytesUtils.getWidthFromMaxInt(descriptor.getMaxDefinitionLevel());
- this.defColumn = new VectorizedRleValuesReader(bitWidth);
+ // do not read the length from the stream. v2 pages handle dividing the page bytes.
+ this.defColumn = new VectorizedRleValuesReader(bitWidth, false);
this.definitionLevelColumn = new ValuesReaderIntIterator(this.defColumn);
- this.defColumn.initFromBuffer(
- this.pageValueCount, page.getDefinitionLevels().toByteArray());
+ this.defColumn.initFromPage(
+ this.pageValueCount, page.getDefinitionLevels().toInputStream());
try {
- initDataReader(page.getDataEncoding(), page.getData().toByteArray(), 0);
+ initDataReader(page.getDataEncoding(), page.getData().toInputStream());
} catch (IOException e) {
throw new IOException("could not read page " + page + " in col " + descriptor, e);
}
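With the stream-based API, the repetition-level, definition-level, and data readers above consume one shared `ByteBufferInputStream` in sequence, which is why the old `getNextOffset()` bookkeeping disappears. A hedged sketch of that pattern, assuming a v1 data page; the helper class is hypothetical, not Spark code.

```java
// Sketch of the sequential-consumption pattern above: one ByteBufferInputStream
// is shared by the level readers and the data reader, so each initFromPage call
// simply advances the same stream instead of returning an offset.
import java.io.IOException;

import org.apache.parquet.bytes.ByteBufferInputStream;
import org.apache.parquet.bytes.BytesInput;
import org.apache.parquet.column.values.ValuesReader;

final class PageV1InitSketch {
  static void initReaders(
      BytesInput pageBytes,
      int pageValueCount,
      ValuesReader rlReader,
      ValuesReader dlReader,
      ValuesReader dataReader) throws IOException {
    ByteBufferInputStream in = pageBytes.toInputStream();
    rlReader.initFromPage(pageValueCount, in);    // reads its length-prefixed section
    dlReader.initFromPage(pageValueCount, in);    // continues where rlReader stopped
    dataReader.initFromPage(pageValueCount, in);  // remaining bytes are the values
  }
}
```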
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedPlainValuesReader.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedPlainValuesReader.java
index 5b75f71933..aacefacfc1 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedPlainValuesReader.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedPlainValuesReader.java
@@ -20,34 +20,30 @@ import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
+import org.apache.parquet.bytes.ByteBufferInputStream;
+import org.apache.parquet.io.ParquetDecodingException;
import org.apache.spark.sql.execution.vectorized.WritableColumnVector;
-import org.apache.spark.unsafe.Platform;
import org.apache.parquet.column.values.ValuesReader;
import org.apache.parquet.io.api.Binary;
+import org.apache.spark.unsafe.Platform;
/**
* An implementation of the Parquet PLAIN decoder that supports the vectorized interface.
*/
public class VectorizedPlainValuesReader extends ValuesReader implements VectorizedValuesReader {
- private byte[] buffer;
- private int offset;
- private int bitOffset; // Only used for booleans.
- private ByteBuffer byteBuffer; // used to wrap the byte array buffer
+ private ByteBufferInputStream in = null;
- private static final boolean bigEndianPlatform =
- ByteOrder.nativeOrder().equals(ByteOrder.BIG_ENDIAN);
+ // Only used for booleans.
+ private int bitOffset;
+ private byte currentByte = 0;
public VectorizedPlainValuesReader() {
}
@Override
- public void initFromPage(int valueCount, byte[] bytes, int offset) throws IOException {
- this.buffer = bytes;
- this.offset = offset + Platform.BYTE_ARRAY_OFFSET;
- if (bigEndianPlatform) {
- byteBuffer = ByteBuffer.wrap(bytes).order(ByteOrder.LITTLE_ENDIAN);
- }
+ public void initFromPage(int valueCount, ByteBufferInputStream in) throws IOException {
+ this.in = in;
}
@Override
@@ -63,115 +59,157 @@ public class VectorizedPlainValuesReader extends ValuesReader implements Vectori
}
}
+ private ByteBuffer getBuffer(int length) {
+ try {
+ return in.slice(length).order(ByteOrder.LITTLE_ENDIAN);
+ } catch (IOException e) {
+ throw new ParquetDecodingException("Failed to read " + length + " bytes", e);
+ }
+ }
+
@Override
public final void readIntegers(int total, WritableColumnVector c, int rowId) {
- c.putIntsLittleEndian(rowId, total, buffer, offset - Platform.BYTE_ARRAY_OFFSET);
- offset += 4 * total;
+ int requiredBytes = total * 4;
+ ByteBuffer buffer = getBuffer(requiredBytes);
+
+ if (buffer.hasArray()) {
+ int offset = buffer.arrayOffset() + buffer.position();
+ c.putIntsLittleEndian(rowId, total, buffer.array(), offset);
+ } else {
+ for (int i = 0; i < total; i += 1) {
+ c.putInt(rowId + i, buffer.getInt());
+ }
+ }
}
@Override
public final void readLongs(int total, WritableColumnVector c, int rowId) {
- c.putLongsLittleEndian(rowId, total, buffer, offset - Platform.BYTE_ARRAY_OFFSET);
- offset += 8 * total;
+ int requiredBytes = total * 8;
+ ByteBuffer buffer = getBuffer(requiredBytes);
+
+ if (buffer.hasArray()) {
+ int offset = buffer.arrayOffset() + buffer.position();
+ c.putLongsLittleEndian(rowId, total, buffer.array(), offset);
+ } else {
+ for (int i = 0; i < total; i += 1) {
+ c.putLong(rowId + i, buffer.getLong());
+ }
+ }
}
@Override
public final void readFloats(int total, WritableColumnVector c, int rowId) {
- c.putFloats(rowId, total, buffer, offset - Platform.BYTE_ARRAY_OFFSET);
- offset += 4 * total;
+ int requiredBytes = total * 4;
+ ByteBuffer buffer = getBuffer(requiredBytes);
+
+ if (buffer.hasArray()) {
+ int offset = buffer.arrayOffset() + buffer.position();
+ c.putFloats(rowId, total, buffer.array(), offset);
+ } else {
+ for (int i = 0; i < total; i += 1) {
+ c.putFloat(rowId + i, buffer.getFloat());
+ }
+ }
}
@Override
public final void readDoubles(int total, WritableColumnVector c, int rowId) {
- c.putDoubles(rowId, total, buffer, offset - Platform.BYTE_ARRAY_OFFSET);
- offset += 8 * total;
+ int requiredBytes = total * 8;
+ ByteBuffer buffer = getBuffer(requiredBytes);
+
+ if (buffer.hasArray()) {
+ int offset = buffer.arrayOffset() + buffer.position();
+ c.putDoubles(rowId, total, buffer.array(), offset);
+ } else {
+ for (int i = 0; i < total; i += 1) {
+ c.putDouble(rowId + i, buffer.getDouble());
+ }
+ }
}
@Override
public final void readBytes(int total, WritableColumnVector c, int rowId) {
- for (int i = 0; i < total; i++) {
- // Bytes are stored as a 4-byte little endian int. Just read the first byte.
- // TODO: consider pushing this in ColumnVector by adding a readBytes with a stride.
- c.putByte(rowId + i, Platform.getByte(buffer, offset));
- offset += 4;
+ // Bytes are stored as a 4-byte little endian int. Just read the first byte.
+ // TODO: consider pushing this in ColumnVector by adding a readBytes with a stride.
+ int requiredBytes = total * 4;
+ ByteBuffer buffer = getBuffer(requiredBytes);
+
+ for (int i = 0; i < total; i += 1) {
+ c.putByte(rowId + i, buffer.get());
+ // skip the next 3 bytes
+ buffer.position(buffer.position() + 3);
}
}
@Override
public final boolean readBoolean() {
- byte b = Platform.getByte(buffer, offset);
- boolean v = (b & (1 << bitOffset)) != 0;
+ // TODO: vectorize decoding and keep boolean[] instead of currentByte
+ if (bitOffset == 0) {
+ try {
+ currentByte = (byte) in.read();
+ } catch (IOException e) {
+ throw new ParquetDecodingException("Failed to read a byte", e);
+ }
+ }
+
+ boolean v = (currentByte & (1 << bitOffset)) != 0;
bitOffset += 1;
if (bitOffset == 8) {
bitOffset = 0;
- offset++;
}
return v;
}
@Override
public final int readInteger() {
- int v = Platform.getInt(buffer, offset);
- if (bigEndianPlatform) {
- v = java.lang.Integer.reverseBytes(v);
- }
- offset += 4;
- return v;
+ return getBuffer(4).getInt();
}
@Override
public final long readLong() {
- long v = Platform.getLong(buffer, offset);
- if (bigEndianPlatform) {
- v = java.lang.Long.reverseBytes(v);
- }
- offset += 8;
- return v;
+ return getBuffer(8).getLong();
}
@Override
public final byte readByte() {
- return (byte)readInteger();
+ return (byte) readInteger();
}
@Override
public final float readFloat() {
- float v;
- if (!bigEndianPlatform) {
- v = Platform.getFloat(buffer, offset);
- } else {
- v = byteBuffer.getFloat(offset - Platform.BYTE_ARRAY_OFFSET);
- }
- offset += 4;
- return v;
+ return getBuffer(4).getFloat();
}
@Override
public final double readDouble() {
- double v;
- if (!bigEndianPlatform) {
- v = Platform.getDouble(buffer, offset);
- } else {
- v = byteBuffer.getDouble(offset - Platform.BYTE_ARRAY_OFFSET);
- }
- offset += 8;
- return v;
+ return getBuffer(8).getDouble();
}
@Override
public final void readBinary(int total, WritableColumnVector v, int rowId) {
for (int i = 0; i < total; i++) {
int len = readInteger();
- int start = offset;
- offset += len;
- v.putByteArray(rowId + i, buffer, start - Platform.BYTE_ARRAY_OFFSET, len);
+ ByteBuffer buffer = getBuffer(len);
+ if (buffer.hasArray()) {
+ v.putByteArray(rowId + i, buffer.array(), buffer.arrayOffset() + buffer.position(), len);
+ } else {
+ byte[] bytes = new byte[len];
+ buffer.get(bytes);
+ v.putByteArray(rowId + i, bytes);
+ }
}
}
@Override
public final Binary readBinary(int len) {
- Binary result = Binary.fromConstantByteArray(buffer, offset - Platform.BYTE_ARRAY_OFFSET, len);
- offset += len;
- return result;
+ ByteBuffer buffer = getBuffer(len);
+ if (buffer.hasArray()) {
+ return Binary.fromConstantByteArray(
+ buffer.array(), buffer.arrayOffset() + buffer.position(), len);
+ } else {
+ byte[] bytes = new byte[len];
+ buffer.get(bytes);
+ return Binary.fromConstantByteArray(bytes);
+ }
}
}
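The readers above use the backing array directly when a slice is heap-backed and fall back to per-value reads for direct buffers. A standalone sketch of that heap/direct split follows; the helper is hypothetical and assumes the slice holds at least `total * 4` bytes.

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;

final class ByteBufferInts {
  // Returns `total` little-endian ints from `slice`, preferring a bulk read of the
  // backing array (the path WritableColumnVector takes above) when one exists.
  static int[] readInts(ByteBuffer slice, int total) {
    ByteBuffer buf = slice.order(ByteOrder.LITTLE_ENDIAN);
    int[] out = new int[total];
    if (buf.hasArray()) {
      // Heap buffer: address the backing byte[] at arrayOffset() + position().
      byte[] bytes = buf.array();
      int offset = buf.arrayOffset() + buf.position();
      ByteBuffer.wrap(bytes, offset, total * 4)
          .order(ByteOrder.LITTLE_ENDIAN)
          .asIntBuffer()
          .get(out);
    } else {
      // Direct buffer: no backing array, so fall back to per-value reads.
      for (int i = 0; i < total; i += 1) {
        out[i] = buf.getInt();
      }
    }
    return out;
  }
}
```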
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedRleValuesReader.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedRleValuesReader.java
index fc7fa70c39..fe3d31ae8e 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedRleValuesReader.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedRleValuesReader.java
@@ -18,6 +18,7 @@
package org.apache.spark.sql.execution.datasources.parquet;
import org.apache.parquet.Preconditions;
+import org.apache.parquet.bytes.ByteBufferInputStream;
import org.apache.parquet.bytes.BytesUtils;
import org.apache.parquet.column.values.ValuesReader;
import org.apache.parquet.column.values.bitpacking.BytePacker;
@@ -27,6 +28,9 @@ import org.apache.parquet.io.api.Binary;
import org.apache.spark.sql.execution.vectorized.WritableColumnVector;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
/**
* A values reader for Parquet's run-length encoded data. This is based off of the version in
* parquet-mr with these changes:
@@ -49,9 +53,7 @@ public final class VectorizedRleValuesReader extends ValuesReader
}
// Encoded data.
- private byte[] in;
- private int end;
- private int offset;
+ private ByteBufferInputStream in;
// bit/byte width of decoded data and utility to batch unpack them.
private int bitWidth;
@@ -70,45 +72,40 @@ public final class VectorizedRleValuesReader extends ValuesReader
// If true, the bit width is fixed. This decoder is used in different places and this also
// controls if we need to read the bitwidth from the beginning of the data stream.
private final boolean fixedWidth;
+ private final boolean readLength;
public VectorizedRleValuesReader() {
- fixedWidth = false;
+ this.fixedWidth = false;
+ this.readLength = false;
}
public VectorizedRleValuesReader(int bitWidth) {
- fixedWidth = true;
+ this.fixedWidth = true;
+ this.readLength = bitWidth != 0;
+ init(bitWidth);
+ }
+
+ public VectorizedRleValuesReader(int bitWidth, boolean readLength) {
+ this.fixedWidth = true;
+ this.readLength = readLength;
init(bitWidth);
}
@Override
- public void initFromPage(int valueCount, byte[] page, int start) {
- this.offset = start;
- this.in = page;
+ public void initFromPage(int valueCount, ByteBufferInputStream in) throws IOException {
+ this.in = in;
if (fixedWidth) {
- if (bitWidth != 0) {
+ // initialize for repetition and definition levels
+ if (readLength) {
int length = readIntLittleEndian();
- this.end = this.offset + length;
+ this.in = in.sliceStream(length);
}
} else {
- this.end = page.length;
- if (this.end != this.offset) init(page[this.offset++] & 255);
- }
- if (bitWidth == 0) {
- // 0 bit width, treat this as an RLE run of valueCount number of 0's.
- this.mode = MODE.RLE;
- this.currentCount = valueCount;
- this.currentValue = 0;
- } else {
- this.currentCount = 0;
+ // initialize for values
+ if (in.available() > 0) {
+ init(in.read());
+ }
}
- }
-
- // Initialize the reader from a buffer. This is used for the V2 page encoding where the
- // definition are in its own buffer.
- public void initFromBuffer(int valueCount, byte[] data) {
- this.offset = 0;
- this.in = data;
- this.end = data.length;
if (bitWidth == 0) {
// 0 bit width, treat this as an RLE run of valueCount number of 0's.
this.mode = MODE.RLE;
@@ -130,11 +127,6 @@ public final class VectorizedRleValuesReader extends ValuesReader
}
@Override
- public int getNextOffset() {
- return this.end;
- }
-
- @Override
public boolean readBoolean() {
return this.readInteger() != 0;
}
@@ -182,7 +174,7 @@ public final class VectorizedRleValuesReader extends ValuesReader
WritableColumnVector c,
int rowId,
int level,
- VectorizedValuesReader data) {
+ VectorizedValuesReader data) throws IOException {
int left = total;
while (left > 0) {
if (this.currentCount == 0) this.readNextGroup();
@@ -217,7 +209,7 @@ public final class VectorizedRleValuesReader extends ValuesReader
WritableColumnVector c,
int rowId,
int level,
- VectorizedValuesReader data) {
+ VectorizedValuesReader data) throws IOException {
int left = total;
while (left > 0) {
if (this.currentCount == 0) this.readNextGroup();
@@ -251,7 +243,7 @@ public final class VectorizedRleValuesReader extends ValuesReader
WritableColumnVector c,
int rowId,
int level,
- VectorizedValuesReader data) {
+ VectorizedValuesReader data) throws IOException {
int left = total;
while (left > 0) {
if (this.currentCount == 0) this.readNextGroup();
@@ -285,7 +277,7 @@ public final class VectorizedRleValuesReader extends ValuesReader
WritableColumnVector c,
int rowId,
int level,
- VectorizedValuesReader data) {
+ VectorizedValuesReader data) throws IOException {
int left = total;
while (left > 0) {
if (this.currentCount == 0) this.readNextGroup();
@@ -321,7 +313,7 @@ public final class VectorizedRleValuesReader extends ValuesReader
WritableColumnVector c,
int rowId,
int level,
- VectorizedValuesReader data) {
+ VectorizedValuesReader data) throws IOException {
int left = total;
while (left > 0) {
if (this.currentCount == 0) this.readNextGroup();
@@ -355,7 +347,7 @@ public final class VectorizedRleValuesReader extends ValuesReader
WritableColumnVector c,
int rowId,
int level,
- VectorizedValuesReader data) {
+ VectorizedValuesReader data) throws IOException {
int left = total;
while (left > 0) {
if (this.currentCount == 0) this.readNextGroup();
@@ -389,7 +381,7 @@ public final class VectorizedRleValuesReader extends ValuesReader
WritableColumnVector c,
int rowId,
int level,
- VectorizedValuesReader data) {
+ VectorizedValuesReader data) throws IOException {
int left = total;
while (left > 0) {
if (this.currentCount == 0) this.readNextGroup();
@@ -423,7 +415,7 @@ public final class VectorizedRleValuesReader extends ValuesReader
WritableColumnVector c,
int rowId,
int level,
- VectorizedValuesReader data) {
+ VectorizedValuesReader data) throws IOException {
int left = total;
while (left > 0) {
if (this.currentCount == 0) this.readNextGroup();
@@ -462,7 +454,7 @@ public final class VectorizedRleValuesReader extends ValuesReader
WritableColumnVector nulls,
int rowId,
int level,
- VectorizedValuesReader data) {
+ VectorizedValuesReader data) throws IOException {
int left = total;
while (left > 0) {
if (this.currentCount == 0) this.readNextGroup();
@@ -559,12 +551,12 @@ public final class VectorizedRleValuesReader extends ValuesReader
/**
* Reads the next varint encoded int.
*/
- private int readUnsignedVarInt() {
+ private int readUnsignedVarInt() throws IOException {
int value = 0;
int shift = 0;
int b;
do {
- b = in[offset++] & 255;
+ b = in.read();
value |= (b & 0x7F) << shift;
shift += 7;
} while ((b & 0x80) != 0);
@@ -574,35 +566,32 @@ public final class VectorizedRleValuesReader extends ValuesReader
/**
* Reads the next 4 byte little endian int.
*/
- private int readIntLittleEndian() {
- int ch4 = in[offset] & 255;
- int ch3 = in[offset + 1] & 255;
- int ch2 = in[offset + 2] & 255;
- int ch1 = in[offset + 3] & 255;
- offset += 4;
+ private int readIntLittleEndian() throws IOException {
+ int ch4 = in.read();
+ int ch3 = in.read();
+ int ch2 = in.read();
+ int ch1 = in.read();
return ((ch1 << 24) + (ch2 << 16) + (ch3 << 8) + (ch4 << 0));
}
/**
* Reads the next byteWidth little endian int.
*/
- private int readIntLittleEndianPaddedOnBitWidth() {
+ private int readIntLittleEndianPaddedOnBitWidth() throws IOException {
switch (bytesWidth) {
case 0:
return 0;
case 1:
- return in[offset++] & 255;
+ return in.read();
case 2: {
- int ch2 = in[offset] & 255;
- int ch1 = in[offset + 1] & 255;
- offset += 2;
+ int ch2 = in.read();
+ int ch1 = in.read();
return (ch1 << 8) + ch2;
}
case 3: {
- int ch3 = in[offset] & 255;
- int ch2 = in[offset + 1] & 255;
- int ch1 = in[offset + 2] & 255;
- offset += 3;
+ int ch3 = in.read();
+ int ch2 = in.read();
+ int ch1 = in.read();
return (ch1 << 16) + (ch2 << 8) + (ch3 << 0);
}
case 4: {
@@ -619,32 +608,36 @@ public final class VectorizedRleValuesReader extends ValuesReader
/**
* Reads the next group.
*/
- private void readNextGroup() {
- int header = readUnsignedVarInt();
- this.mode = (header & 1) == 0 ? MODE.RLE : MODE.PACKED;
- switch (mode) {
- case RLE:
- this.currentCount = header >>> 1;
- this.currentValue = readIntLittleEndianPaddedOnBitWidth();
- return;
- case PACKED:
- int numGroups = header >>> 1;
- this.currentCount = numGroups * 8;
- int bytesToRead = ceil8(this.currentCount * this.bitWidth);
-
- if (this.currentBuffer.length < this.currentCount) {
- this.currentBuffer = new int[this.currentCount];
- }
- currentBufferIdx = 0;
- int valueIndex = 0;
- for (int byteIndex = offset; valueIndex < this.currentCount; byteIndex += this.bitWidth) {
- this.packer.unpack8Values(in, byteIndex, this.currentBuffer, valueIndex);
- valueIndex += 8;
- }
- offset += bytesToRead;
- return;
- default:
- throw new ParquetDecodingException("not a valid mode " + this.mode);
+ private void readNextGroup() {
+ try {
+ int header = readUnsignedVarInt();
+ this.mode = (header & 1) == 0 ? MODE.RLE : MODE.PACKED;
+ switch (mode) {
+ case RLE:
+ this.currentCount = header >>> 1;
+ this.currentValue = readIntLittleEndianPaddedOnBitWidth();
+ return;
+ case PACKED:
+ int numGroups = header >>> 1;
+ this.currentCount = numGroups * 8;
+
+ if (this.currentBuffer.length < this.currentCount) {
+ this.currentBuffer = new int[this.currentCount];
+ }
+ currentBufferIdx = 0;
+ int valueIndex = 0;
+ while (valueIndex < this.currentCount) {
+ // values are bit packed 8 at a time, so reading bitWidth will always work
+ ByteBuffer buffer = in.slice(bitWidth);
+ this.packer.unpack8Values(buffer, buffer.position(), this.currentBuffer, valueIndex);
+ valueIndex += 8;
+ }
+ return;
+ default:
+ throw new ParquetDecodingException("not a valid mode " + this.mode);
+ }
+ } catch (IOException e) {
+ throw new ParquetDecodingException("Failed to read from input stream", e);
}
}
}
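For context, readNextGroup() above decodes Parquet's RLE/bit-packed hybrid header from the stream: an unsigned varint whose low bit selects the mode and whose remaining bits carry the run length or the count of 8-value bit-packed groups. A small self-contained sketch of just that header decoding, against a plain InputStream; the helper is hypothetical, not Spark code.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;

final class RleHeaderSketch {
  static void describeNextGroup(InputStream in) throws IOException {
    int header = readUnsignedVarInt(in);
    if ((header & 1) == 0) {
      // RLE run: the upper bits carry the repeat count; one padded value follows.
      System.out.println("RLE run of " + (header >>> 1) + " values");
    } else {
      // Bit-packed run: the upper bits count groups of 8 bit-packed values.
      System.out.println("bit-packed run of " + ((header >>> 1) * 8) + " values");
    }
  }

  // Same unsigned varint format the reader above consumes byte by byte.
  private static int readUnsignedVarInt(InputStream in) throws IOException {
    int value = 0;
    int shift = 0;
    int b;
    do {
      b = in.read();
      value |= (b & 0x7F) << shift;
      shift += 7;
    } while ((b & 0x80) != 0);
    return value;
  }

  public static void main(String[] args) throws IOException {
    // Header 0x06 = 0b110: low bit 0 -> RLE, count = 3.
    describeNextGroup(new ByteArrayInputStream(new byte[]{0x06}));
    // Header 0x03 = 0b11: low bit 1 -> bit-packed, 1 group of 8 values.
    describeNextGroup(new ByteArrayInputStream(new byte[]{0x03}));
  }
}
```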
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetOptions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetOptions.scala
index f36a89a4c3..9cfc30725f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetOptions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetOptions.scala
@@ -81,7 +81,10 @@ object ParquetOptions {
"uncompressed" -> CompressionCodecName.UNCOMPRESSED,
"snappy" -> CompressionCodecName.SNAPPY,
"gzip" -> CompressionCodecName.GZIP,
- "lzo" -> CompressionCodecName.LZO)
+ "lzo" -> CompressionCodecName.LZO,
+ "lz4" -> CompressionCodecName.LZ4,
+ "brotli" -> CompressionCodecName.BROTLI,
+ "zstd" -> CompressionCodecName.ZSTD)
def getParquetCompressionCodecName(name: String): String = {
shortParquetCompressionCodecNames(name).name()
diff --git a/sql/core/src/test/resources/sql-tests/results/describe-part-after-analyze.sql.out b/sql/core/src/test/resources/sql-tests/results/describe-part-after-analyze.sql.out
index 51dac11102..58ed201e2a 100644
--- a/sql/core/src/test/resources/sql-tests/results/describe-part-after-analyze.sql.out
+++ b/sql/core/src/test/resources/sql-tests/results/describe-part-after-analyze.sql.out
@@ -89,7 +89,7 @@ Database default
Table t
Partition Values [ds=2017-08-01, hr=10]
Location [not included in comparison]sql/core/spark-warehouse/t/ds=2017-08-01/hr=10
-Partition Statistics 1067 bytes, 3 rows
+Partition Statistics 1121 bytes, 3 rows
# Storage Information
Location [not included in comparison]sql/core/spark-warehouse/t
@@ -122,7 +122,7 @@ Database default
Table t
Partition Values [ds=2017-08-01, hr=10]
Location [not included in comparison]sql/core/spark-warehouse/t/ds=2017-08-01/hr=10
-Partition Statistics 1067 bytes, 3 rows
+Partition Statistics 1121 bytes, 3 rows
# Storage Information
Location [not included in comparison]sql/core/spark-warehouse/t
@@ -147,7 +147,7 @@ Database default
Table t
Partition Values [ds=2017-08-01, hr=11]
Location [not included in comparison]sql/core/spark-warehouse/t/ds=2017-08-01/hr=11
-Partition Statistics 1080 bytes, 4 rows
+Partition Statistics 1098 bytes, 4 rows
# Storage Information
Location [not included in comparison]sql/core/spark-warehouse/t
@@ -180,7 +180,7 @@ Database default
Table t
Partition Values [ds=2017-08-01, hr=10]
Location [not included in comparison]sql/core/spark-warehouse/t/ds=2017-08-01/hr=10
-Partition Statistics 1067 bytes, 3 rows
+Partition Statistics 1121 bytes, 3 rows
# Storage Information
Location [not included in comparison]sql/core/spark-warehouse/t
@@ -205,7 +205,7 @@ Database default
Table t
Partition Values [ds=2017-08-01, hr=11]
Location [not included in comparison]sql/core/spark-warehouse/t/ds=2017-08-01/hr=11
-Partition Statistics 1080 bytes, 4 rows
+Partition Statistics 1098 bytes, 4 rows
# Storage Information
Location [not included in comparison]sql/core/spark-warehouse/t
@@ -230,7 +230,7 @@ Database default
Table t
Partition Values [ds=2017-09-01, hr=5]
Location [not included in comparison]sql/core/spark-warehouse/t/ds=2017-09-01/hr=5
-Partition Statistics 1054 bytes, 2 rows
+Partition Statistics 1144 bytes, 2 rows
# Storage Information
Location [not included in comparison]sql/core/spark-warehouse/t
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala
index 863703b15f..efc2f20a90 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala
@@ -503,7 +503,7 @@ class InMemoryColumnarQuerySuite extends QueryTest with SharedSQLContext {
case plan: InMemoryRelation => plan
}.head
// InMemoryRelation's stats is file size before the underlying RDD is materialized
- assert(inMemoryRelation.computeStats().sizeInBytes === 740)
+ assert(inMemoryRelation.computeStats().sizeInBytes === 800)
// InMemoryRelation's stats is updated after materializing RDD
dfFromFile.collect()
@@ -516,7 +516,7 @@ class InMemoryColumnarQuerySuite extends QueryTest with SharedSQLContext {
// Even CBO enabled, InMemoryRelation's stats keeps as the file size before table's stats
// is calculated
- assert(inMemoryRelation2.computeStats().sizeInBytes === 740)
+ assert(inMemoryRelation2.computeStats().sizeInBytes === 800)
// InMemoryRelation's stats should be updated after calculating stats of the table
// clear cache to simulate a fresh environment