aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorBen-Zvi <bben-zvi@mapr.com>2019-02-28 18:06:02 -0800
committerkarthik <kmanivannan@maprtech.com>2019-03-08 12:21:44 -0800
commit5abcd88642e224beb8252185f938a5e42387b18e (patch)
treeb29efa4041d10725e57e1baa91dd78a0ee172cd8
parent6bd31f33c137e56912bcea04c5b993eafc64a20a (diff)
DRILL-7069: Moved version checks outside loops in transformBinaryInMetadataCache
closes #1667
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetReaderUtility.java72
1 files changed, 50 insertions, 22 deletions
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetReaderUtility.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetReaderUtility.java
index 14a222c37..ad2ecec0b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetReaderUtility.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetReaderUtility.java
@@ -27,6 +27,7 @@ import org.apache.drill.exec.server.options.OptionManager;
import org.apache.drill.exec.store.parquet.metadata.MetadataVersion;
import org.apache.drill.exec.util.Utilities;
import org.apache.drill.exec.work.ExecErrorConstants;
+import org.apache.drill.shaded.guava.com.google.common.base.Stopwatch;
import org.apache.hadoop.util.VersionUtil;
import org.apache.parquet.SemanticVersion;
import org.apache.parquet.VersionParser;
@@ -57,6 +58,7 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.TimeUnit;
import static org.apache.drill.exec.store.parquet.metadata.Metadata_V2.ColumnTypeMetadata_v2;
import static org.apache.drill.exec.store.parquet.metadata.Metadata_V2.ParquetTableMetadata_v2;
@@ -239,9 +241,11 @@ public class ParquetReaderUtility {
new MetadataVersion(parquetTableMetadata.getMetadataVersion()).compareTo(new MetadataVersion(3, 0)) >= 0 ?
DateCorruptionStatus.META_SHOWS_NO_CORRUPTION : DateCorruptionStatus.META_UNCLEAR_TEST_VALUES;
if (cacheFileCanContainsCorruptDates == DateCorruptionStatus.META_UNCLEAR_TEST_VALUES) {
+ boolean mdVersion_1_0 = new MetadataVersion(1, 0).equals(new MetadataVersion(parquetTableMetadata.getMetadataVersion()));
+ boolean mdVersion_2_0 = new MetadataVersion(2, 0).equals(new MetadataVersion(parquetTableMetadata.getMetadataVersion()));
// Looking for the DATE data type of column names in the metadata cache file ("metadata_version" : "v2")
String[] names = new String[0];
- if (new MetadataVersion(2, 0).equals(new MetadataVersion(parquetTableMetadata.getMetadataVersion()))) {
+ if (mdVersion_2_0) {
for (ColumnTypeMetadata_v2 columnTypeMetadata :
((ParquetTableMetadata_v2) parquetTableMetadata).columnTypeInfo.values()) {
if (OriginalType.DATE.equals(columnTypeMetadata.originalType)) {
@@ -256,7 +260,7 @@ public class ParquetReaderUtility {
Long rowCount = rowGroupMetadata.getRowCount();
for (ColumnMetadata columnMetadata : rowGroupMetadata.getColumns()) {
// Setting Min/Max values for ParquetTableMetadata_v1
- if (new MetadataVersion(1, 0).equals(new MetadataVersion(parquetTableMetadata.getMetadataVersion()))) {
+ if (mdVersion_1_0) {
OriginalType originalType = columnMetadata.getOriginalType();
if (OriginalType.DATE.equals(originalType) && columnMetadata.hasSingleValue(rowCount) &&
(Integer) columnMetadata.getMaxValue() > ParquetReaderUtility.DATE_CORRUPTION_THRESHOLD) {
@@ -266,10 +270,11 @@ public class ParquetReaderUtility {
}
}
// Setting Max values for ParquetTableMetadata_v2
- else if (new MetadataVersion(2, 0).equals(new MetadataVersion(parquetTableMetadata.getMetadataVersion())) &&
- columnMetadata.getName() != null && Arrays.equals(columnMetadata.getName(), names) &&
- columnMetadata.hasSingleValue(rowCount) && (Integer) columnMetadata.getMaxValue() >
- ParquetReaderUtility.DATE_CORRUPTION_THRESHOLD) {
+ else if (mdVersion_2_0 &&
+ columnMetadata.getName() != null &&
+ Arrays.equals(columnMetadata.getName(), names) &&
+ columnMetadata.hasSingleValue(rowCount) &&
+ (Integer) columnMetadata.getMaxValue() > ParquetReaderUtility.DATE_CORRUPTION_THRESHOLD) {
int newMax = ParquetReaderUtility.autoCorrectCorruptedDate((Integer) columnMetadata.getMaxValue());
columnMetadata.setMax(newMax);
}
@@ -292,30 +297,53 @@ public class ParquetReaderUtility {
Set<List<String>> columnsNames = getBinaryColumnsNames(parquetTableMetadata);
boolean allowBinaryMetadata = allowBinaryMetadata(parquetTableMetadata.getDrillVersion(), readerConfig);
+ // Setting Min / Max values for ParquetTableMetadata_v1
+ if (new MetadataVersion(1, 0).equals(new MetadataVersion(parquetTableMetadata.getMetadataVersion()))) {
+ for (ParquetFileMetadata file : parquetTableMetadata.getFiles()) {
+ for (RowGroupMetadata rowGroupMetadata : file.getRowGroups()) {
+ Long rowCount = rowGroupMetadata.getRowCount();
+ for (ColumnMetadata columnMetadata : rowGroupMetadata.getColumns()) {
+ if (columnMetadata.getPrimitiveType() == PrimitiveTypeName.BINARY || columnMetadata.getPrimitiveType() == PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY) {
+ setMinMaxValues(columnMetadata, rowCount, allowBinaryMetadata, false);
+ }
+ }
+ }
+ }
+ return;
+ }
+
+ // Variables needed for debugging only
+ Stopwatch timer = logger.isDebugEnabled() ? Stopwatch.createStarted() : null;
+ int maxRowGroups = 0;
+ int minRowGroups = Integer.MAX_VALUE;
+ int maxNumColumns = 0;
+
+ // Setting Min / Max values for V2 and V3 versions; for versions V3_3 and above need to do decoding
+ boolean needDecoding = new MetadataVersion(parquetTableMetadata.getMetadataVersion()).compareTo(new MetadataVersion(3, 3)) >= 0;
for (ParquetFileMetadata file : parquetTableMetadata.getFiles()) {
+ if ( timer != null ) { // for debugging only
+ maxRowGroups = Math.max(maxRowGroups, file.getRowGroups().size());
+ minRowGroups = Math.min(minRowGroups, file.getRowGroups().size());
+ }
for (RowGroupMetadata rowGroupMetadata : file.getRowGroups()) {
Long rowCount = rowGroupMetadata.getRowCount();
+ if ( timer != null ) { // for debugging only
+ maxNumColumns = Math.max(maxNumColumns, rowGroupMetadata.getColumns().size());
+ }
for (ColumnMetadata columnMetadata : rowGroupMetadata.getColumns()) {
- // Setting Min / Max values for ParquetTableMetadata_v1
- if (new MetadataVersion(1, 0).equals(new MetadataVersion(parquetTableMetadata.getMetadataVersion()))) {
- if (columnMetadata.getPrimitiveType() == PrimitiveTypeName.BINARY
- || columnMetadata.getPrimitiveType() == PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY) {
- setMinMaxValues(columnMetadata, rowCount, allowBinaryMetadata, false);
- }
- }
- // Setting Min / Max values for V2 and all V3 versions prior to V3_3
- else if (new MetadataVersion(parquetTableMetadata.getMetadataVersion()).compareTo(new MetadataVersion(3, 3)) < 0
- && columnsNames.contains(Arrays.asList(columnMetadata.getName()))) {
- setMinMaxValues(columnMetadata, rowCount, allowBinaryMetadata, false);
- }
- // Setting Min / Max values for V3_3 and all next versions
- else if (new MetadataVersion(parquetTableMetadata.getMetadataVersion()).compareTo(new MetadataVersion(3, 3)) >= 0
- && columnsNames.contains(Arrays.asList(columnMetadata.getName()))) {
- setMinMaxValues(columnMetadata, rowCount, allowBinaryMetadata, true);
+ if (columnsNames.contains(Arrays.asList(columnMetadata.getName()))) {
+ setMinMaxValues(columnMetadata, rowCount, allowBinaryMetadata, needDecoding);
}
}
}
}
+
+ if (timer != null) { // log a debug message and stop the timer
+ String reportRG = 1 == maxRowGroups ? "1 rowgroup" : "between " + minRowGroups + "-" + maxRowGroups + "rowgroups";
+ logger.debug("Transforming binary in metadata cache took {} ms ({} files, {} per file, max {} columns)", timer.elapsed(TimeUnit.MILLISECONDS),
+ parquetTableMetadata.getFiles().size(), reportRG, maxNumColumns);
+ timer.stop();
+ }
}
/**