author    Salim Achouche <sachouche2@gmail.com>  2019-03-12 18:06:43 -0700
committer Timothy Farkas <timothytiborfarkas@gmail.com>  2019-03-13 12:17:01 -0700
commit    b20a2e6b5ee82814011a031d8a8282b0fec3ffe1 (patch)
tree      46c4565626f25ffb2b14dd8279e3d183810e8379
parent    e5e8419ab6fc1761cc7a6055b02f4300525e936e (diff)
DRILL-7100: Fixed IllegalArgumentException when reading Parquet data
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/batchsizing/BatchOverflowOptimizer.java | 4
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/batchsizing/BatchSizingMemoryUtil.java | 46
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/batchsizing/RecordBatchSizerManager.java | 49
-rw-r--r--  exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/columnreaders/TestBatchSizingMemoryUtil.java | 12
-rw-r--r--  exec/memory/base/src/main/java/org/apache/drill/exec/memory/BaseAllocator.java | 16
5 files changed, 73 insertions(+), 54 deletions(-)
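The patch is a single widening pass: every byte count that can scale with batch size moves from int to long. The underlying failure is easy to reproduce outside Drill. The sketch below (illustrative, not Drill code) shows how rounding a size just past 2^30 up to the next power of two wraps an int negative; a negative size is exactly the kind of value that later trips an IllegalArgumentException in an allocator precondition.

public class IntOverflowDemo {
  // Mirrors the int-based rounding the old code relied on.
  static int intNextPowerOfTwo(int val) {
    int highestBit = Integer.highestOneBit(val);
    return highestBit == val ? val : highestBit << 1;
  }

  public static void main(String[] args) {
    int bytesNeeded = (1 << 30) + 1;           // just over 1 GiB
    int rounded = intNextPowerOfTwo(bytesNeeded);
    System.out.println(rounded);               // -2147483648: 2^31 does not fit in an int
  }
}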
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/batchsizing/BatchOverflowOptimizer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/batchsizing/BatchOverflowOptimizer.java
index ca8fc055f..3a7177f5e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/batchsizing/BatchOverflowOptimizer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/batchsizing/BatchOverflowOptimizer.java
@@ -91,7 +91,7 @@ final class BatchOverflowOptimizer {
// do not account for null values as we are interested in the
// actual data that is being stored within a batch.
BatchSizingMemoryUtil.getMemoryUsage(stat.vector, stat.numValuesRead, vectorMemoryUsage);
- final int batchColumnPrecision = Math.max(1, vectorMemoryUsage.dataBytesUsed / stat.numValuesRead);
+ final long batchColumnPrecision = Math.max(1, vectorMemoryUsage.dataBytesUsed / stat.numValuesRead);
double currAvgPrecision = columnPrecisionStats.avgPrecision;
double newAvgPrecision = ((numBatches - 1) * currAvgPrecision + batchColumnPrecision) / numBatches;
@@ -138,7 +138,7 @@ final class BatchOverflowOptimizer {
/** Materialized field */
private final MaterializedField field;
/** Average column precision */
- private int avgPrecision;
+ private long avgPrecision;
private ColumnPrecisionStats(MaterializedField field) {
this.field = field;
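The hunk above widens the per-column precision, i.e. the average number of data bytes per value. A small worked example with illustrative figures traces the running-average update; the point is that a multi-GiB dataBytesUsed could not even be represented before the change.

public class PrecisionAverageDemo {
  public static void main(String[] args) {
    long dataBytesUsed = 3L * 1024 * 1024 * 1024;  // 3 GiB of data read in this batch
    int numValuesRead = 1_000_000;
    long batchColumnPrecision = Math.max(1, dataBytesUsed / numValuesRead);  // 3221 bytes/value
    int numBatches = 4;
    double currAvgPrecision = 2900.0;
    double newAvgPrecision =
        ((numBatches - 1) * currAvgPrecision + batchColumnPrecision) / numBatches;
    System.out.println(newAvgPrecision);  // 2980.25
  }
}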
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/batchsizing/BatchSizingMemoryUtil.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/batchsizing/BatchSizingMemoryUtil.java
index 302d0eb5e..878256815 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/batchsizing/BatchSizingMemoryUtil.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/batchsizing/BatchSizingMemoryUtil.java
@@ -59,9 +59,9 @@ public final class BatchSizingMemoryUtil {
* limit; false otherwise
*/
public static boolean canAddNewData(ColumnMemoryUsageInfo columnMemoryUsage,
- int newBitsMemory,
- int newOffsetsMemory,
- int newDataMemory) {
+ long newBitsMemory,
+ long newOffsetsMemory,
+ long newDataMemory) {
// First we need to update the vector memory usage
final VectorMemoryUsageInfo vectorMemoryUsage = columnMemoryUsage.vectorMemoryUsage;
@@ -69,20 +69,20 @@ public final class BatchSizingMemoryUtil {
// We need to compute the new ValueVector memory usage if we attempt to add the new payload
// usedCapacity, newPayload, currentCapacity
- int totalBitsMemory = computeNewVectorCapacity(vectorMemoryUsage.bitsBytesUsed,
+ long totalBitsMemory = computeNewVectorCapacity(vectorMemoryUsage.bitsBytesUsed,
newBitsMemory,
vectorMemoryUsage.bitsBytesCapacity);
- int totalOffsetsMemory = computeNewVectorCapacity(vectorMemoryUsage.offsetsBytesUsed,
+ long totalOffsetsMemory = computeNewVectorCapacity(vectorMemoryUsage.offsetsBytesUsed,
newOffsetsMemory,
vectorMemoryUsage.offsetsByteCapacity);
- int totalDataMemory = computeNewVectorCapacity(vectorMemoryUsage.dataBytesUsed,
+ long totalDataMemory = computeNewVectorCapacity(vectorMemoryUsage.dataBytesUsed,
newDataMemory,
vectorMemoryUsage.dataByteCapacity);
// Alright now we can figure out whether the new payload will take us over the maximum memory threshold
- int totalMemory = totalBitsMemory + totalOffsetsMemory + totalDataMemory;
+ long totalMemory = totalBitsMemory + totalOffsetsMemory + totalDataMemory;
assert totalMemory >= 0;
return totalMemory <= columnMemoryUsage.memoryQuota.getMaxMemoryUsage();
@@ -227,16 +227,16 @@ public final class BatchSizingMemoryUtil {
* @param valueCount number of column values
* @return memory size required to store "valueCount" within a value vector
*/
- public static int computeFixedLengthVectorMemory(ParquetColumnMetadata column, int valueCount) {
+ public static long computeFixedLengthVectorMemory(ParquetColumnMetadata column, int valueCount) {
assert column.isFixedLength();
// Formula: memory-usage = next-power-of-two(byte-size * valueCount) // nullable storage (if any)
// + next-power-of-two(DT_LEN * valueCount) // data storage
- int memoryUsage = BaseAllocator.nextPowerOfTwo(getFixedColumnTypePrecision(column) * valueCount);
+ long memoryUsage = BaseAllocator.longNextPowerOfTwo(getFixedColumnTypePrecision(column) * valueCount);
if (column.getField().isNullable()) {
- memoryUsage += BaseAllocator.nextPowerOfTwo(BYTE_VALUE_WIDTH * valueCount);
+ memoryUsage += BaseAllocator.longNextPowerOfTwo(BYTE_VALUE_WIDTH * valueCount);
}
return memoryUsage;
@@ -248,19 +248,19 @@ public final class BatchSizingMemoryUtil {
* @param valueCount number of column values
* @return memory size required to store "valueCount" within a value vector
*/
- public static int computeVariableLengthVectorMemory(ParquetColumnMetadata column,
- int averagePrecision, int valueCount) {
+ public static long computeVariableLengthVectorMemory(ParquetColumnMetadata column,
+ long averagePrecision, int valueCount) {
assert !column.isFixedLength();
// Formula: memory-usage = next-power-of-two(byte-size * valueCount) // nullable storage (if any)
// + next-power-of-two(int-size * valueCount) // offsets storage
// + next-power-of-two(DT_LEN * valueCount) // data storage
- int memoryUsage = BaseAllocator.nextPowerOfTwo(averagePrecision * valueCount);
- memoryUsage += BaseAllocator.nextPowerOfTwo(INT_VALUE_WIDTH * (valueCount + 1));
+ long memoryUsage = BaseAllocator.longNextPowerOfTwo(averagePrecision * valueCount);
+ memoryUsage += BaseAllocator.longNextPowerOfTwo(INT_VALUE_WIDTH * (valueCount + 1));
if (column.getField().isNullable()) {
- memoryUsage += BaseAllocator.nextPowerOfTwo(valueCount);
+ memoryUsage += BaseAllocator.longNextPowerOfTwo(valueCount);
}
return memoryUsage;
}
@@ -269,8 +269,8 @@ public final class BatchSizingMemoryUtil {
// Internal implementation
// ----------------------------------------------------------------------------
- private static int computeNewVectorCapacity(int usedCapacity, int newPayload, int currentCapacity) {
- int newUsedCapacity = BaseAllocator.nextPowerOfTwo(usedCapacity + newPayload);
+ private static long computeNewVectorCapacity(long usedCapacity, long newPayload, long currentCapacity) {
+ long newUsedCapacity = BaseAllocator.longNextPowerOfTwo(usedCapacity + newPayload);
assert newUsedCapacity >= 0;
return Math.max(currentCapacity, newUsedCapacity);
@@ -299,17 +299,17 @@ public final class BatchSizingMemoryUtil {
*/
public static final class VectorMemoryUsageInfo {
/** Bits vector capacity */
- public int bitsBytesCapacity;
+ public long bitsBytesCapacity;
/** Offsets vector capacity */
- public int offsetsByteCapacity;
+ public long offsetsByteCapacity;
/** Data vector capacity */
- public int dataByteCapacity;
+ public long dataByteCapacity;
/** Bits vector used up capacity */
- public int bitsBytesUsed;
+ public long bitsBytesUsed;
/** Offsets vector used up capacity */
- public int offsetsBytesUsed;
+ public long offsetsBytesUsed;
/** Data vector used up capacity */
- public int dataBytesUsed;
+ public long dataBytesUsed;
public void reset() {
bitsBytesCapacity = 0;
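For reference, the variable-length sizing formula above can be checked by hand. The sketch below replicates the power-of-two rounding locally so it runs standalone, and assumes INT_VALUE_WIDTH is 4 bytes and a nullable column, per the surrounding code; the figures are illustrative.

public class VarLenSizingDemo {
  // Mirrors BaseAllocator.longNextPowerOfTwo (added by this patch).
  static long nextPow2(long val) {
    long highestBit = Long.highestOneBit(val);
    return highestBit == val ? val : highestBit << 1;
  }

  public static void main(String[] args) {
    int valueCount = 65_536;
    long averagePrecision = 40;                              // avg bytes per value
    long data    = nextPow2(averagePrecision * valueCount);  // 2,621,440 -> 4,194,304
    long offsets = nextPow2(4L * (valueCount + 1));          // 262,148   -> 524,288
    long bits    = nextPow2(valueCount);                     // 65,536 is already 2^16
    System.out.println(data + offsets + bits);               // 4784128 bytes, about 4.6 MiB
  }
}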
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/batchsizing/RecordBatchSizerManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/batchsizing/RecordBatchSizerManager.java
index 5ddcf7e92..2e22ce339 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/batchsizing/RecordBatchSizerManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/batchsizing/RecordBatchSizerManager.java
@@ -20,6 +20,7 @@ package org.apache.drill.exec.store.parquet.columnreaders.batchsizing;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
+
import org.apache.drill.common.map.CaseInsensitiveMap;
import org.apache.drill.common.types.TypeProtos.MajorType;
import org.apache.drill.exec.ExecConstants;
@@ -34,6 +35,7 @@ import org.apache.drill.exec.util.record.RecordBatchStats;
import org.apache.drill.exec.util.record.RecordBatchStats.RecordBatchStatsContext;
import org.apache.drill.exec.vector.AllocationHelper;
import org.apache.drill.exec.vector.ValueVector;
+import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
/**
* This class is tasked with managing all aspects of flat Parquet reader record batch sizing logic.
@@ -56,11 +58,11 @@ public final class RecordBatchSizerManager {
/** Configured Parquet records per batch */
private final int configRecordsPerBatch;
/** Configured Parquet memory size per batch */
- private final int configMemorySizePerBatch;
+ private final long configMemorySizePerBatch;
/** An upper bound on the Parquet records per batch based on the configured value and schema */
private int maxRecordsPerBatch;
/** An upper bound on the Parquet memory size per batch based on the configured value and schema */
- private int maxMemorySizePerBatch;
+ private long maxMemorySizePerBatch;
/** The current number of records per batch as it can be dynamically optimized */
private int recordsPerBatch;
@@ -162,7 +164,8 @@ public final class RecordBatchSizerManager {
ColumnMemoryInfo columnMemoryInfo = columnMemoryInfoMap.get(v.getField().getName());
if (columnMemoryInfo != null) {
- AllocationHelper.allocate(v, recordsPerBatch, columnMemoryInfo.columnPrecision, 0);
+ Preconditions.checkState(columnMemoryInfo.columnPrecision <= Integer.MAX_VALUE, "Column precision cannot exceed 2GB");
+ AllocationHelper.allocate(v, recordsPerBatch, (int) columnMemoryInfo.columnPrecision, 0);
} else {
// This column was found in another Parquet file but not the current one; so we inject
// a null value. At this time, we do not account for such columns. Why? the right design is
@@ -219,7 +222,7 @@ public final class RecordBatchSizerManager {
/**
* @return current total memory per batch (may change across batches)
*/
- public int getCurrentMemorySizePerBatch() {
+ public long getCurrentMemorySizePerBatch() {
return maxMemorySizePerBatch; // Current logic doesn't mutate the max-memory after it has been set
}
@@ -233,7 +236,7 @@ public final class RecordBatchSizerManager {
/**
* @return configured memory size per batch (may be different from the enforced one)
*/
- public int getConfigMemorySizePerBatch() {
+ public long getConfigMemorySizePerBatch() {
return configMemorySizePerBatch;
}
@@ -265,13 +268,13 @@ public final class RecordBatchSizerManager {
// Internal implementation logic
// ----------------------------------------------------------------------------
- private int getConfiguredMaxBatchMemory(OptionManager options) {
+ private long getConfiguredMaxBatchMemory(OptionManager options) {
// Use the parquet specific configuration if set
- int maxMemory = (int) options.getLong(ExecConstants.PARQUET_FLAT_BATCH_MEMORY_SIZE);
+ long maxMemory = options.getLong(ExecConstants.PARQUET_FLAT_BATCH_MEMORY_SIZE);
// Otherwise, use the common property
if (maxMemory <= 0) {
- maxMemory = (int) options.getLong(ExecConstants.OUTPUT_BATCH_SIZE);
+ maxMemory = options.getLong(ExecConstants.OUTPUT_BATCH_SIZE);
}
return maxMemory;
}
@@ -304,8 +307,8 @@ public final class RecordBatchSizerManager {
return normalizedNumRecords;
}
- private int normalizeMemorySizePerBatch() {
- int normalizedMemorySize = configMemorySizePerBatch;
+ private long normalizeMemorySizePerBatch() {
+ long normalizedMemorySize = configMemorySizePerBatch;
if (normalizedMemorySize <= 0) {
final String message = String.format("Invalid Parquet memory per batch [%d] byte(s)",
@@ -321,10 +324,10 @@ public final class RecordBatchSizerManager {
return normalizedMemorySize; // NOOP
}
- final int memorySizePerColumn = normalizedMemorySize / numColumns;
+ final long memorySizePerColumn = normalizedMemorySize / numColumns;
if (memorySizePerColumn < MIN_COLUMN_MEMORY_SZ) {
- final int prevValue = normalizedMemorySize;
+ final long prevValue = normalizedMemorySize;
normalizedMemorySize = MIN_COLUMN_MEMORY_SZ * numColumns;
final String message = String.format("The Parquet memory per batch [%d] byte(s) is too low for this query ; using [%d] bytes",
@@ -444,9 +447,9 @@ public final class RecordBatchSizerManager {
return; // we're done
}
- final int totalMemoryNeeded = requiredMemory.fixedLenRequiredMemory + requiredMemory.variableLenRequiredMemory;
- final int extraMemorySpace = maxMemorySizePerBatch - totalMemoryNeeded;
- final int perColumnExtraSpace = extraMemorySpace / numVariableLengthColumns;
+ final long totalMemoryNeeded = requiredMemory.fixedLenRequiredMemory + requiredMemory.variableLenRequiredMemory;
+ final long extraMemorySpace = maxMemorySizePerBatch - totalMemoryNeeded;
+ final long perColumnExtraSpace = extraMemorySpace / numVariableLengthColumns;
if (perColumnExtraSpace == 0) {
return;
@@ -481,7 +484,7 @@ public final class RecordBatchSizerManager {
return remove;
}
- private int computeVectorMemory(ColumnMemoryInfo columnInfo, int numValues) {
+ private long computeVectorMemory(ColumnMemoryInfo columnInfo, int numValues) {
if (columnInfo.columnMeta.isFixedLength()) {
return BatchSizingMemoryUtil.computeFixedLengthVectorMemory(columnInfo.columnMeta, numValues);
}
@@ -506,7 +509,7 @@ public final class RecordBatchSizerManager {
requiredMemory.variableLenRequiredMemory += columnInfo.columnMemoryQuota.maxMemoryUsage;
}
- final int totalMemoryNeeded = requiredMemory.fixedLenRequiredMemory + requiredMemory.variableLenRequiredMemory;
+ final long totalMemoryNeeded = requiredMemory.fixedLenRequiredMemory + requiredMemory.variableLenRequiredMemory;
assert totalMemoryNeeded > 0;
double neededMemoryRatio = ((double) maxMemorySizePerBatch) / totalMemoryNeeded;
@@ -612,7 +615,7 @@ public final class RecordBatchSizerManager {
/** Field memory quota */
public static final class ColumnMemoryQuota {
/** Maximum cumulative memory that could be used */
- private int maxMemoryUsage;
+ private long maxMemoryUsage;
/** Maximum number of values that could be inserted */
private int maxNumValues;
@@ -622,14 +625,14 @@ public final class RecordBatchSizerManager {
/**
* @param maxMemoryUsage maximum cumulative memory that could be used
*/
- public ColumnMemoryQuota(int maxMemoryUsage) {
+ public ColumnMemoryQuota(long maxMemoryUsage) {
this.maxMemoryUsage = maxMemoryUsage;
}
/**
* @return the maxMemoryUsage
*/
- public int getMaxMemoryUsage() {
+ public long getMaxMemoryUsage() {
return maxMemoryUsage;
}
@@ -651,7 +654,7 @@ public final class RecordBatchSizerManager {
/** Column metadata */
ParquetColumnMetadata columnMeta;
/** Column value precision (maximum length for VL columns) */
- int columnPrecision;
+ long columnPrecision;
/** Column current memory quota within a batch */
final ColumnMemoryQuota columnMemoryQuota = new ColumnMemoryQuota();
}
@@ -659,9 +662,9 @@ public final class RecordBatchSizerManager {
/** Memory requirements container */
static final class MemoryRequirementContainer {
/** Memory needed for the fixed length columns given a specific record size */
- private int fixedLenRequiredMemory;
+ private long fixedLenRequiredMemory;
/** Memory needed for the variable length columns given a specific record size */
- private int variableLenRequiredMemory;
+ private long variableLenRequiredMemory;
private void reset() {
this.fixedLenRequiredMemory = 0;
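The one place the patch narrows back from long to int is the AllocationHelper.allocate call, where a Preconditions check guards the cast. A minimal sketch of that pattern follows; checkedInt is a hypothetical helper for illustration, not part of the patch.

public final class NarrowingDemo {
  // Keep sizes as long internally; narrow only at APIs that still take int.
  static int checkedInt(long value) {
    if (value > Integer.MAX_VALUE) {
      throw new IllegalStateException("value " + value + " does not fit in an int");
    }
    return (int) value;
  }

  public static void main(String[] args) {
    System.out.println(checkedInt(4096L));  // 4096
    // checkedInt(3_000_000_000L) would throw instead of silently wrapping negative.
  }
}

The JDK's Math.toIntExact gives the same guarantee, throwing ArithmeticException on overflow instead.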
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/columnreaders/TestBatchSizingMemoryUtil.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/columnreaders/TestBatchSizingMemoryUtil.java
index 896675a3c..4cf150372 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/columnreaders/TestBatchSizingMemoryUtil.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/columnreaders/TestBatchSizingMemoryUtil.java
@@ -93,9 +93,9 @@ public class TestBatchSizingMemoryUtil extends PhysicalOpUnitTestBase {
for (int columnIdx = 0; columnIdx < 3; columnIdx++) {
final ColumnMemoryUsageInfo columnInfo = columnMemoryInfo[columnIdx];
- final int remainingBitsCapacity = getRemainingBitsCapacity(columnInfo);
- final int remainingOffsetsCapacity = getRemainingOffsetsCapacity(columnInfo);
- final int remainingDataCapacity = getRemainingDataCapacity(columnInfo);
+ final long remainingBitsCapacity = getRemainingBitsCapacity(columnInfo);
+ final long remainingOffsetsCapacity = getRemainingOffsetsCapacity(columnInfo);
+ final long remainingDataCapacity = getRemainingDataCapacity(columnInfo);
// Test current VV is within quota (since we are not adding new entries)
Assert.assertTrue(BatchSizingMemoryUtil.canAddNewData(columnInfo, 0, 0, 0));
@@ -152,15 +152,15 @@ public class TestBatchSizingMemoryUtil extends PhysicalOpUnitTestBase {
return result;
}
- private static int getRemainingBitsCapacity(ColumnMemoryUsageInfo columnInfo) {
+ private static long getRemainingBitsCapacity(ColumnMemoryUsageInfo columnInfo) {
return columnInfo.vectorMemoryUsage.bitsBytesCapacity - columnInfo.vectorMemoryUsage.bitsBytesUsed;
}
- private static int getRemainingOffsetsCapacity(ColumnMemoryUsageInfo columnInfo) {
+ private static long getRemainingOffsetsCapacity(ColumnMemoryUsageInfo columnInfo) {
return columnInfo.vectorMemoryUsage.offsetsByteCapacity - columnInfo.vectorMemoryUsage.offsetsBytesUsed;
}
- private static int getRemainingDataCapacity(ColumnMemoryUsageInfo columnInfo) {
+ private static long getRemainingDataCapacity(ColumnMemoryUsageInfo columnInfo) {
return columnInfo.vectorMemoryUsage.dataByteCapacity - columnInfo.vectorMemoryUsage.dataBytesUsed;
}
diff --git a/exec/memory/base/src/main/java/org/apache/drill/exec/memory/BaseAllocator.java b/exec/memory/base/src/main/java/org/apache/drill/exec/memory/BaseAllocator.java
index ab3ff1e5d..408b865ec 100644
--- a/exec/memory/base/src/main/java/org/apache/drill/exec/memory/BaseAllocator.java
+++ b/exec/memory/base/src/main/java/org/apache/drill/exec/memory/BaseAllocator.java
@@ -577,6 +577,22 @@ public abstract class BaseAllocator extends Accountant implements BufferAllocato
}
/**
+ * Rounds up the provided value to the next power of two.
+ *
+ * @param val
+ * a non-negative long value
+ * @return the smallest power of two greater than or equal to val
+ */
+ public static long longNextPowerOfTwo(long val) {
+ long highestBit = Long.highestOneBit(val);
+ if (highestBit == val) {
+ return val;
+ } else {
+ return highestBit << 1;
+ }
+ }
+
+ /**
* Verifies the accounting state of the allocator. Only works for DEBUG.
*
* @throws IllegalStateException
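A quick behavioral check of the new helper, with values verified by hand; it assumes Drill's BaseAllocator is on the classpath. Note that, like the int version, inputs above 2^62 would still overflow on the shift, but batch byte counts never approach that range.

import org.apache.drill.exec.memory.BaseAllocator;

public class LongNextPowerOfTwoDemo {
  public static void main(String[] args) {
    System.out.println(BaseAllocator.longNextPowerOfTwo(0L));              // 0
    System.out.println(BaseAllocator.longNextPowerOfTwo(4096L));           // 4096: exact powers pass through
    System.out.println(BaseAllocator.longNextPowerOfTwo(3_000_000_000L));  // 4294967296, i.e. 2^32
  }
}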