diff options
author | Salim Achouche <sachouche2@gmail.com> | 2019-03-12 18:06:43 -0700 |
---|---|---|
committer | Timothy Farkas <timothytiborfarkas@gmail.com> | 2019-03-13 12:17:01 -0700 |
commit | b20a2e6b5ee82814011a031d8a8282b0fec3ffe1 (patch) | |
tree | 46c4565626f25ffb2b14dd8279e3d183810e8379 | |
parent | e5e8419ab6fc1761cc7a6055b02f4300525e936e (diff) |
DRILL-7100: Fixed IllegalArgumentException when reading Parquet data
5 files changed, 73 insertions, 54 deletions
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/batchsizing/BatchOverflowOptimizer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/batchsizing/BatchOverflowOptimizer.java index ca8fc055f..3a7177f5e 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/batchsizing/BatchOverflowOptimizer.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/batchsizing/BatchOverflowOptimizer.java @@ -91,7 +91,7 @@ final class BatchOverflowOptimizer { // do not account for null values as we are interested in the // actual data that is being stored within a batch. BatchSizingMemoryUtil.getMemoryUsage(stat.vector, stat.numValuesRead, vectorMemoryUsage); - final int batchColumnPrecision = Math.max(1, vectorMemoryUsage.dataBytesUsed / stat.numValuesRead); + final long batchColumnPrecision = Math.max(1, vectorMemoryUsage.dataBytesUsed / stat.numValuesRead); double currAvgPrecision = columnPrecisionStats.avgPrecision; double newAvgPrecision = ((numBatches - 1) * currAvgPrecision + batchColumnPrecision) / numBatches; @@ -138,7 +138,7 @@ final class BatchOverflowOptimizer { /** Materialized field */ private final MaterializedField field; /** Average column precision */ - private int avgPrecision; + private long avgPrecision; private ColumnPrecisionStats(MaterializedField field) { this.field = field; diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/batchsizing/BatchSizingMemoryUtil.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/batchsizing/BatchSizingMemoryUtil.java index 302d0eb5e..878256815 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/batchsizing/BatchSizingMemoryUtil.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/batchsizing/BatchSizingMemoryUtil.java @@ -59,9 +59,9 @@ public final class BatchSizingMemoryUtil { * limit; false otherwise */ public static boolean canAddNewData(ColumnMemoryUsageInfo columnMemoryUsage, - int newBitsMemory, - int newOffsetsMemory, - int newDataMemory) { + long newBitsMemory, + long newOffsetsMemory, + long newDataMemory) { // First we need to update the vector memory usage final VectorMemoryUsageInfo vectorMemoryUsage = columnMemoryUsage.vectorMemoryUsage; @@ -69,20 +69,20 @@ public final class BatchSizingMemoryUtil { // We need to compute the new ValueVector memory usage if we attempt to add the new payload // usedCapacity, int newPayload, int currentCapacity - int totalBitsMemory = computeNewVectorCapacity(vectorMemoryUsage.bitsBytesUsed, + long totalBitsMemory = computeNewVectorCapacity(vectorMemoryUsage.bitsBytesUsed, newBitsMemory, vectorMemoryUsage.bitsBytesCapacity); - int totalOffsetsMemory = computeNewVectorCapacity(vectorMemoryUsage.offsetsBytesUsed, + long totalOffsetsMemory = computeNewVectorCapacity(vectorMemoryUsage.offsetsBytesUsed, newOffsetsMemory, vectorMemoryUsage.offsetsByteCapacity); - int totalDataMemory = computeNewVectorCapacity(vectorMemoryUsage.dataBytesUsed, + long totalDataMemory = computeNewVectorCapacity(vectorMemoryUsage.dataBytesUsed, newDataMemory, vectorMemoryUsage.dataByteCapacity); // Alright now we can figure out whether the new payload will take us over the maximum memory threshold - int totalMemory = totalBitsMemory + totalOffsetsMemory + totalDataMemory; + long totalMemory = totalBitsMemory + totalOffsetsMemory + totalDataMemory; assert totalMemory >= 0; return totalMemory <= columnMemoryUsage.memoryQuota.getMaxMemoryUsage(); @@ -227,16 +227,16 @@ public final class BatchSizingMemoryUtil { * @param valueCount number of column values * @return memory size required to store "valueCount" within a value vector */ - public static int computeFixedLengthVectorMemory(ParquetColumnMetadata column, int valueCount) { + public static long computeFixedLengthVectorMemory(ParquetColumnMetadata column, int valueCount) { assert column.isFixedLength(); // Formula: memory-usage = next-power-of-two(byte-size * valueCount) // nullable storage (if any) // + next-power-of-two(DT_LEN * valueCount) // data storage - int memoryUsage = BaseAllocator.nextPowerOfTwo(getFixedColumnTypePrecision(column) * valueCount); + long memoryUsage = BaseAllocator.longNextPowerOfTwo(getFixedColumnTypePrecision(column) * valueCount); if (column.getField().isNullable()) { - memoryUsage += BaseAllocator.nextPowerOfTwo(BYTE_VALUE_WIDTH * valueCount); + memoryUsage += BaseAllocator.longNextPowerOfTwo(BYTE_VALUE_WIDTH * valueCount); } return memoryUsage; @@ -248,19 +248,19 @@ public final class BatchSizingMemoryUtil { * @param valueCount number of column values * @return memory size required to store "valueCount" within a value vector */ - public static int computeVariableLengthVectorMemory(ParquetColumnMetadata column, - int averagePrecision, int valueCount) { + public static long computeVariableLengthVectorMemory(ParquetColumnMetadata column, + long averagePrecision, int valueCount) { assert !column.isFixedLength(); // Formula: memory-usage = next-power-of-two(byte-size * valueCount) // nullable storage (if any) // + next-power-of-two(int-size * valueCount) // offsets storage // + next-power-of-two(DT_LEN * valueCount) // data storage - int memoryUsage = BaseAllocator.nextPowerOfTwo(averagePrecision * valueCount); - memoryUsage += BaseAllocator.nextPowerOfTwo(INT_VALUE_WIDTH * (valueCount + 1)); + long memoryUsage = BaseAllocator.longNextPowerOfTwo(averagePrecision * valueCount); + memoryUsage += BaseAllocator.longNextPowerOfTwo(INT_VALUE_WIDTH * (valueCount + 1)); if (column.getField().isNullable()) { - memoryUsage += BaseAllocator.nextPowerOfTwo(valueCount); + memoryUsage += BaseAllocator.longNextPowerOfTwo(valueCount); } return memoryUsage; } @@ -269,8 +269,8 @@ public final class BatchSizingMemoryUtil { // Internal implementation // ---------------------------------------------------------------------------- - private static int computeNewVectorCapacity(int usedCapacity, int newPayload, int currentCapacity) { - int newUsedCapacity = BaseAllocator.nextPowerOfTwo(usedCapacity + newPayload); + private static long computeNewVectorCapacity(long usedCapacity, long newPayload, long currentCapacity) { + long newUsedCapacity = BaseAllocator.longNextPowerOfTwo(usedCapacity + newPayload); assert newUsedCapacity >= 0; return Math.max(currentCapacity, newUsedCapacity); @@ -299,17 +299,17 @@ public final class BatchSizingMemoryUtil { */ public static final class VectorMemoryUsageInfo { /** Bits vector capacity */ - public int bitsBytesCapacity; + public long bitsBytesCapacity; /** Offsets vector capacity */ - public int offsetsByteCapacity; + public long offsetsByteCapacity; /** Data vector capacity */ - public int dataByteCapacity; + public long dataByteCapacity; /** Bits vector used up capacity */ - public int bitsBytesUsed; + public long bitsBytesUsed; /** Offsets vector used up capacity */ - public int offsetsBytesUsed; + public long offsetsBytesUsed; /** Data vector used up capacity */ - public int dataBytesUsed; + public long dataBytesUsed; public void reset() { bitsBytesCapacity = 0; diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/batchsizing/RecordBatchSizerManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/batchsizing/RecordBatchSizerManager.java index 5ddcf7e92..2e22ce339 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/batchsizing/RecordBatchSizerManager.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/batchsizing/RecordBatchSizerManager.java @@ -20,6 +20,7 @@ package org.apache.drill.exec.store.parquet.columnreaders.batchsizing; import java.util.ArrayList; import java.util.List; import java.util.Map; + import org.apache.drill.common.map.CaseInsensitiveMap; import org.apache.drill.common.types.TypeProtos.MajorType; import org.apache.drill.exec.ExecConstants; @@ -34,6 +35,7 @@ import org.apache.drill.exec.util.record.RecordBatchStats; import org.apache.drill.exec.util.record.RecordBatchStats.RecordBatchStatsContext; import org.apache.drill.exec.vector.AllocationHelper; import org.apache.drill.exec.vector.ValueVector; +import org.apache.drill.shaded.guava.com.google.common.base.Preconditions; /** * This class is tasked with managing all aspects of flat Parquet reader record batch sizing logic. @@ -56,11 +58,11 @@ public final class RecordBatchSizerManager { /** Configured Parquet records per batch */ private final int configRecordsPerBatch; /** Configured Parquet memory size per batch */ - private final int configMemorySizePerBatch; + private final long configMemorySizePerBatch; /** An upper bound on the Parquet records per batch based on the configured value and schema */ private int maxRecordsPerBatch; /** An upper bound on the Parquet memory size per batch based on the configured value and schema */ - private int maxMemorySizePerBatch; + private long maxMemorySizePerBatch; /** The current number of records per batch as it can be dynamically optimized */ private int recordsPerBatch; @@ -162,7 +164,8 @@ public final class RecordBatchSizerManager { ColumnMemoryInfo columnMemoryInfo = columnMemoryInfoMap.get(v.getField().getName()); if (columnMemoryInfo != null) { - AllocationHelper.allocate(v, recordsPerBatch, columnMemoryInfo.columnPrecision, 0); + Preconditions.checkState(columnMemoryInfo.columnPrecision <= Integer.MAX_VALUE, "Column precision cannot exceed 2GB"); + AllocationHelper.allocate(v, recordsPerBatch, (int) columnMemoryInfo.columnPrecision, 0); } else { // This column was found in another Parquet file but not the current one; so we inject // a null value. At this time, we do not account for such columns. Why? the right design is @@ -219,7 +222,7 @@ public final class RecordBatchSizerManager { /** * @return current total memory per batch (may change across batches) */ - public int getCurrentMemorySizePerBatch() { + public long getCurrentMemorySizePerBatch() { return maxMemorySizePerBatch; // Current logic doesn't mutate the max-memory after it has been set } @@ -233,7 +236,7 @@ public final class RecordBatchSizerManager { /** * @return configured memory size per batch (may be different from the enforced one) */ - public int getConfigMemorySizePerBatch() { + public long getConfigMemorySizePerBatch() { return configMemorySizePerBatch; } @@ -265,13 +268,13 @@ public final class RecordBatchSizerManager { // Internal implementation logic // ---------------------------------------------------------------------------- - private int getConfiguredMaxBatchMemory(OptionManager options) { + private long getConfiguredMaxBatchMemory(OptionManager options) { // Use the parquet specific configuration if set - int maxMemory = (int) options.getLong(ExecConstants.PARQUET_FLAT_BATCH_MEMORY_SIZE); + long maxMemory = options.getLong(ExecConstants.PARQUET_FLAT_BATCH_MEMORY_SIZE); // Otherwise, use the common property if (maxMemory <= 0) { - maxMemory = (int) options.getLong(ExecConstants.OUTPUT_BATCH_SIZE); + maxMemory = options.getLong(ExecConstants.OUTPUT_BATCH_SIZE); } return maxMemory; } @@ -304,8 +307,8 @@ public final class RecordBatchSizerManager { return normalizedNumRecords; } - private int normalizeMemorySizePerBatch() { - int normalizedMemorySize = configMemorySizePerBatch; + private long normalizeMemorySizePerBatch() { + long normalizedMemorySize = configMemorySizePerBatch; if (normalizedMemorySize <= 0) { final String message = String.format("Invalid Parquet memory per batch [%d] byte(s)", @@ -321,10 +324,10 @@ public final class RecordBatchSizerManager { return normalizedMemorySize; // NOOP } - final int memorySizePerColumn = normalizedMemorySize / numColumns; + final long memorySizePerColumn = normalizedMemorySize / numColumns; if (memorySizePerColumn < MIN_COLUMN_MEMORY_SZ) { - final int prevValue = normalizedMemorySize; + final long prevValue = normalizedMemorySize; normalizedMemorySize = MIN_COLUMN_MEMORY_SZ * numColumns; final String message = String.format("The Parquet memory per batch [%d] byte(s) is too low for this query ; using [%d] bytes", @@ -444,9 +447,9 @@ public final class RecordBatchSizerManager { return; // we're done } - final int totalMemoryNeeded = requiredMemory.fixedLenRequiredMemory + requiredMemory.variableLenRequiredMemory; - final int extraMemorySpace = maxMemorySizePerBatch - totalMemoryNeeded; - final int perColumnExtraSpace = extraMemorySpace / numVariableLengthColumns; + final long totalMemoryNeeded = requiredMemory.fixedLenRequiredMemory + requiredMemory.variableLenRequiredMemory; + final long extraMemorySpace = maxMemorySizePerBatch - totalMemoryNeeded; + final long perColumnExtraSpace = extraMemorySpace / numVariableLengthColumns; if (perColumnExtraSpace == 0) { return; @@ -481,7 +484,7 @@ public final class RecordBatchSizerManager { return remove; } - private int computeVectorMemory(ColumnMemoryInfo columnInfo, int numValues) { + private long computeVectorMemory(ColumnMemoryInfo columnInfo, int numValues) { if (columnInfo.columnMeta.isFixedLength()) { return BatchSizingMemoryUtil.computeFixedLengthVectorMemory(columnInfo.columnMeta, numValues); } @@ -506,7 +509,7 @@ public final class RecordBatchSizerManager { requiredMemory.variableLenRequiredMemory += columnInfo.columnMemoryQuota.maxMemoryUsage; } - final int totalMemoryNeeded = requiredMemory.fixedLenRequiredMemory + requiredMemory.variableLenRequiredMemory; + final long totalMemoryNeeded = requiredMemory.fixedLenRequiredMemory + requiredMemory.variableLenRequiredMemory; assert totalMemoryNeeded > 0; double neededMemoryRatio = ((double) maxMemorySizePerBatch) / totalMemoryNeeded; @@ -612,7 +615,7 @@ public final class RecordBatchSizerManager { /** Field memory quota */ public static final class ColumnMemoryQuota { /** Maximum cumulative memory that could be used */ - private int maxMemoryUsage; + private long maxMemoryUsage; /** Maximum number of values that could be inserted */ private int maxNumValues; @@ -622,14 +625,14 @@ public final class RecordBatchSizerManager { /** * @param maxMemoryUsage maximum cumulative memory that could be used */ - public ColumnMemoryQuota(int maxMemoryUsage) { + public ColumnMemoryQuota(long maxMemoryUsage) { this.maxMemoryUsage = maxMemoryUsage; } /** * @return the maxMemoryUsage */ - public int getMaxMemoryUsage() { + public long getMaxMemoryUsage() { return maxMemoryUsage; } @@ -651,7 +654,7 @@ public final class RecordBatchSizerManager { /** Column metadata */ ParquetColumnMetadata columnMeta; /** Column value precision (maximum length for VL columns) */ - int columnPrecision; + long columnPrecision; /** Column current memory quota within a batch */ final ColumnMemoryQuota columnMemoryQuota = new ColumnMemoryQuota(); } @@ -659,9 +662,9 @@ public final class RecordBatchSizerManager { /** Memory requirements container */ static final class MemoryRequirementContainer { /** Memory needed for the fixed length columns given a specific record size */ - private int fixedLenRequiredMemory; + private long fixedLenRequiredMemory; /** Memory needed for the fixed length columns given a specific record size */ - private int variableLenRequiredMemory; + private long variableLenRequiredMemory; private void reset() { this.fixedLenRequiredMemory = 0; diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/columnreaders/TestBatchSizingMemoryUtil.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/columnreaders/TestBatchSizingMemoryUtil.java index 896675a3c..4cf150372 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/columnreaders/TestBatchSizingMemoryUtil.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/columnreaders/TestBatchSizingMemoryUtil.java @@ -93,9 +93,9 @@ public class TestBatchSizingMemoryUtil extends PhysicalOpUnitTestBase { for (int columnIdx = 0; columnIdx < 3; columnIdx++) { final ColumnMemoryUsageInfo columnInfo = columnMemoryInfo[columnIdx]; - final int remainingBitsCapacity = getRemainingBitsCapacity(columnInfo); - final int remainingOffsetsCapacity = getRemainingOffsetsCapacity(columnInfo); - final int remainingDataCapacity = getRemainingDataCapacity(columnInfo); + final long remainingBitsCapacity = getRemainingBitsCapacity(columnInfo); + final long remainingOffsetsCapacity = getRemainingOffsetsCapacity(columnInfo); + final long remainingDataCapacity = getRemainingDataCapacity(columnInfo); // Test current VV is within quota (since we are not adding new entries) Assert.assertTrue(BatchSizingMemoryUtil.canAddNewData(columnInfo, 0, 0, 0)); @@ -152,15 +152,15 @@ public class TestBatchSizingMemoryUtil extends PhysicalOpUnitTestBase { return result; } - private static int getRemainingBitsCapacity(ColumnMemoryUsageInfo columnInfo) { + private static long getRemainingBitsCapacity(ColumnMemoryUsageInfo columnInfo) { return columnInfo.vectorMemoryUsage.bitsBytesCapacity - columnInfo.vectorMemoryUsage.bitsBytesUsed; } - private static int getRemainingOffsetsCapacity(ColumnMemoryUsageInfo columnInfo) { + private static long getRemainingOffsetsCapacity(ColumnMemoryUsageInfo columnInfo) { return columnInfo.vectorMemoryUsage.offsetsByteCapacity - columnInfo.vectorMemoryUsage.offsetsBytesUsed; } - private static int getRemainingDataCapacity(ColumnMemoryUsageInfo columnInfo) { + private static long getRemainingDataCapacity(ColumnMemoryUsageInfo columnInfo) { return columnInfo.vectorMemoryUsage.dataByteCapacity - columnInfo.vectorMemoryUsage.dataBytesUsed; } diff --git a/exec/memory/base/src/main/java/org/apache/drill/exec/memory/BaseAllocator.java b/exec/memory/base/src/main/java/org/apache/drill/exec/memory/BaseAllocator.java index ab3ff1e5d..408b865ec 100644 --- a/exec/memory/base/src/main/java/org/apache/drill/exec/memory/BaseAllocator.java +++ b/exec/memory/base/src/main/java/org/apache/drill/exec/memory/BaseAllocator.java @@ -577,6 +577,22 @@ public abstract class BaseAllocator extends Accountant implements BufferAllocato } /** + * Rounds up the provided value to the nearest power of two. + * + * @param val + * An integer long value. + * @return The closest power of two of that value. + */ + public static long longNextPowerOfTwo(long val) { + long highestBit = Long.highestOneBit(val); + if (highestBit == val) { + return val; + } else { + return highestBit << 1; + } + } + + /** * Verifies the accounting state of the allocator. Only works for DEBUG. * * @throws IllegalStateException |