/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.drill.exec.store.parquet;

import org.apache.commons.codec.binary.Base64;
import org.apache.drill.common.exceptions.UserException;
import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.common.types.TypeProtos;
import org.apache.drill.common.types.Types;
import org.apache.drill.exec.planner.physical.PlannerSettings;
import org.apache.drill.exec.server.options.OptionManager;
import org.apache.drill.exec.store.parquet.metadata.MetadataVersion;
import org.apache.drill.exec.util.Utilities;
import org.apache.drill.exec.work.ExecErrorConstants;
import org.apache.hadoop.util.VersionUtil;
import org.apache.parquet.SemanticVersion;
import org.apache.parquet.VersionParser;
import org.apache.parquet.column.ColumnDescriptor;
import org.apache.parquet.column.statistics.IntStatistics;
import org.apache.parquet.format.ConvertedType;
import org.apache.parquet.format.FileMetaData;
import org.apache.parquet.format.SchemaElement;
import org.apache.parquet.format.converter.ParquetMetadataConverter;
import org.apache.parquet.hadoop.ParquetFileWriter;
import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
import org.apache.parquet.hadoop.metadata.ColumnPath;
import org.apache.parquet.hadoop.metadata.ParquetMetadata;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.OriginalType;
import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName;
import org.apache.parquet.schema.Type;
import org.apache.parquet.example.data.simple.NanoTime;
import org.apache.parquet.io.api.Binary;
import org.joda.time.Chronology;
import org.joda.time.DateTimeConstants;
import org.joda.time.DateTimeZone;

import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;

import static org.apache.drill.exec.store.parquet.metadata.Metadata_V2.ColumnTypeMetadata_v2;
import static org.apache.drill.exec.store.parquet.metadata.Metadata_V2.ParquetTableMetadata_v2;
import static org.apache.drill.exec.store.parquet.metadata.Metadata_V3.ColumnTypeMetadata_v3;
import static org.apache.drill.exec.store.parquet.metadata.Metadata_V3.ParquetTableMetadata_v3;
import static org.apache.drill.exec.store.parquet.metadata.MetadataBase.ColumnMetadata;
import static org.apache.drill.exec.store.parquet.metadata.MetadataBase.ParquetTableMetadataBase;
import static org.apache.drill.exec.store.parquet.metadata.MetadataBase.ParquetFileMetadata;
import static org.apache.drill.exec.store.parquet.metadata.MetadataBase.RowGroupMetadata;

/**
 * Utility class that captures common logic shared between the two parquet readers.
 */
public class ParquetReaderUtility {
  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ParquetReaderUtility.class);

  /**
   * Number of days between Julian day epoch (January 1, 4713 BC) and Unix day epoch (January 1, 1970).
   * The value of this constant is {@value}.
   */
  public static final long JULIAN_DAY_NUMBER_FOR_UNIX_EPOCH = 2440588;
  /**
   * All old parquet files (those without the "is.date.correct=true" or "parquet-writer.version"
   * properties in their metadata) have dates shifted by a corrupt offset of {@value} days,
   * i.e. 2 * {@value #JULIAN_DAY_NUMBER_FOR_UNIX_EPOCH}.
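   *
   * <p>A worked example (the date is illustrative): 2019-01-01 is day 17897 of the Unix
   * epoch, but an affected writer stores it shifted by this constant (4881176 days), so the
   * file holds 17897 + 4881176 = 4899073. {@link #autoCorrectCorruptedDate(int)} undoes it:
   * <pre>{@code
   * int stored = 4899073; // corrupt on-disk value for 2019-01-01
   * int corrected = ParquetReaderUtility.autoCorrectCorruptedDate(stored); // 17897
   * }</pre>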
   */
  public static final long CORRECT_CORRUPT_DATE_SHIFT = 2 * JULIAN_DAY_NUMBER_FOR_UNIX_EPOCH;
  private static final Chronology UTC = org.joda.time.chrono.ISOChronology.getInstanceUTC();
  /**
   * The year 5000 (day 1106685 of the Unix epoch) is chosen as the threshold for auto-detecting date corruption.
   * This balances two possible cases of bad auto-correction. External tools writing dates in the future will not
   * be shifted unless they are past this threshold (and we cannot identify them as external files based on the metadata).
   * On the other hand, historical dates written with Drill wouldn't risk being incorrectly shifted unless they were
   * something like 10,000 years in the past.
   */
  public static final int DATE_CORRUPTION_THRESHOLD =
      (int) (UTC.getDateTimeMillis(5000, 1, 1, 0) / DateTimeConstants.MILLIS_PER_DAY);
  /**
   * Version 2 (and later) of the Drill Parquet writer uses the date format described in the
   * <a href="https://github.com/Parquet/parquet-format/blob/master/LogicalTypes.md#date">Parquet spec</a>.
   * Prior versions wrote dates shifted by {@link #CORRECT_CORRUPT_DATE_SHIFT} days.
   */
  public static final int DRILL_WRITER_VERSION_STD_DATE_FORMAT = 2;

  public static final String ALLOWED_DRILL_VERSION_FOR_BINARY = "1.15.0";

  /**
   * For most recently created parquet files, we can determine if we have corrupted dates (see DRILL-4203)
   * based on the file metadata. For older files that lack statistics we must actually test the values
   * in the data pages themselves to see if they are likely corrupt.
   */
  public enum DateCorruptionStatus {
    META_SHOWS_CORRUPTION {
      @Override
      public String toString() {
        return "It is determined from metadata that the date values are definitely CORRUPT";
      }
    },
    META_SHOWS_NO_CORRUPTION {
      @Override
      public String toString() {
        return "It is determined from metadata that the date values are definitely CORRECT";
      }
    },
    META_UNCLEAR_TEST_VALUES {
      @Override
      public String toString() {
        return "Not enough info in metadata, parquet reader will test individual date values";
      }
    }
  }

  public static void checkDecimalTypeEnabled(OptionManager options) {
    if (! options.getOption(PlannerSettings.ENABLE_DECIMAL_DATA_TYPE)) {
      throw UserException.unsupportedError()
        .message(ExecErrorConstants.DECIMAL_DISABLE_ERR_MSG)
        .build(logger);
    }
  }

  /**
   * Reads a 4-byte little-endian signed integer from {@code input} starting at offset {@code start}.
   *
   * @param input byte array to read from
   * @param start index of the first (least significant) byte
   * @return the decoded 32-bit integer
   */
  public static int getIntFromLEBytes(byte[] input, int start) {
    int out = 0;
    int shiftOrder = 0;
    for (int i = start; i < start + 4; i++) {
      out |= (((input[i]) & 0xFF) << shiftOrder);
      shiftOrder += 8;
    }
    return out;
  }

  /**
   * Map full schema paths in format `a`.`b`.`c` to respective SchemaElement objects.
   *
   * @param footer Parquet file metadata
   * @return       schema full path to SchemaElement map
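   *
   * <p>A minimal usage sketch (the column name is illustrative; {@code footer} comes from
   * the surrounding reader code):
   * <pre>{@code
   * Map<String, SchemaElement> elements = getColNameToSchemaElementMapping(footer);
   * SchemaElement amount = elements.get("`trans`.`amount`"); // keys are lower-cased, back-quoted paths
   * }</pre>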
   */
  public static Map<String, SchemaElement> getColNameToSchemaElementMapping(ParquetMetadata footer) {
    Map<String, SchemaElement> schemaElements = new HashMap<>();
    FileMetaData fileMetaData = new ParquetMetadataConverter().toParquetMetadata(ParquetFileWriter.CURRENT_VERSION, footer);

    Iterator<SchemaElement> iter = fileMetaData.getSchema().iterator();

    // The first element in the collection is the default `root` element. We skip it to keep keys
    // in the `a` format instead of `root`.`a`, and thus avoid the need to strip the prefix again
    // when comparing with a SchemaPath string representation.
    if (iter.hasNext()) {
      iter.next();
    }
    while (iter.hasNext()) {
      addSchemaElementMapping(iter, new StringBuilder(), schemaElements);
    }
    return schemaElements;
  }

  /**
   * Populate full path to SchemaElement map by recursively traversing schema elements referenced by the given iterator
   *
   * @param iter file schema values iterator
   * @param path parent schema element path
   * @param schemaElements schema elements map to insert next iterator element into
   */
  private static void addSchemaElementMapping(Iterator<SchemaElement> iter, StringBuilder path,
      Map<String, SchemaElement> schemaElements) {
    SchemaElement schemaElement = iter.next();
    path.append('`').append(schemaElement.getName().toLowerCase()).append('`');
    schemaElements.put(path.toString(), schemaElement);

    // For each element that has children we need to track the remaining children count
    // to exit the current recursion level when no more children are left.
    int remainingChildren = schemaElement.getNum_children();

    while (remainingChildren > 0 && iter.hasNext()) {
      addSchemaElementMapping(iter, new StringBuilder(path).append('.'), schemaElements);
      remainingChildren--;
    }
  }

  /**
   * Generates the full path of the column in the format `a`.`b`.`c`.
   *
   * @param column ColumnDescriptor object
   * @return       full path in format `a`.`b`.`c`
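   *
   * <p>For example (path values illustrative), a descriptor whose {@code getPath()} returns
   * {@code {"Trans", "Amount"}} yields:
   * <pre>{@code
   * String path = getFullColumnPath(column); // "`trans`.`amount`"
   * }</pre>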
   */
  public static String getFullColumnPath(ColumnDescriptor column) {
    StringBuilder sb = new StringBuilder();
    String[] path = column.getPath();
    for (int i = 0; i < path.length; i++) {
      sb.append("`").append(path[i].toLowerCase()).append("`").append(".");
    }

    // remove trailing dot
    if (sb.length() > 0) {
      sb.deleteCharAt(sb.length() - 1);
    }

    return sb.toString();
  }

  /**
   * Map full column paths to all ColumnDescriptors in file schema
   *
   * @param footer Parquet file metadata
   * @return       column full path to ColumnDescriptor object map
   */
  public static Map<String, ColumnDescriptor> getColNameToColumnDescriptorMapping(ParquetMetadata footer) {
    Map<String, ColumnDescriptor> colDescMap = new HashMap<>();
    List<ColumnDescriptor> columns = footer.getFileMetaData().getSchema().getColumns();

    for (ColumnDescriptor column : columns) {
      colDescMap.put(getFullColumnPath(column), column);
    }
    return colDescMap;
  }

  public static int autoCorrectCorruptedDate(int corruptedDate) {
    return (int) (corruptedDate - CORRECT_CORRUPT_DATE_SHIFT);
  }

  public static void correctDatesInMetadataCache(ParquetTableMetadataBase parquetTableMetadata) {
    DateCorruptionStatus cacheFileCanContainCorruptDates =
        new MetadataVersion(parquetTableMetadata.getMetadataVersion()).compareTo(new MetadataVersion(3, 0)) >= 0 ?
        DateCorruptionStatus.META_SHOWS_NO_CORRUPTION : DateCorruptionStatus.META_UNCLEAR_TEST_VALUES;
    if (cacheFileCanContainCorruptDates == DateCorruptionStatus.META_UNCLEAR_TEST_VALUES) {
      // Look for names of columns with the DATE data type in the metadata cache file ("metadata_version" : "v2")
      String[] names = new String[0];
      if (new MetadataVersion(2, 0).equals(new MetadataVersion(parquetTableMetadata.getMetadataVersion()))) {
        for (ColumnTypeMetadata_v2 columnTypeMetadata :
            ((ParquetTableMetadata_v2) parquetTableMetadata).columnTypeInfo.values()) {
          if (OriginalType.DATE.equals(columnTypeMetadata.originalType)) {
            names = columnTypeMetadata.name;
          }
        }
      }
      for (ParquetFileMetadata file : parquetTableMetadata.getFiles()) {
        // Drill has only ever written a single row group per file, so we only need to correct
        // the statistics on the first row group
        RowGroupMetadata rowGroupMetadata = file.getRowGroups().get(0);
        Long rowCount = rowGroupMetadata.getRowCount();
        for (ColumnMetadata columnMetadata : rowGroupMetadata.getColumns()) {
          // Setting Min/Max values for ParquetTableMetadata_v1
          if (new MetadataVersion(1, 0).equals(new MetadataVersion(parquetTableMetadata.getMetadataVersion()))) {
            OriginalType originalType = columnMetadata.getOriginalType();
            if (OriginalType.DATE.equals(originalType) && columnMetadata.hasSingleValue(rowCount) &&
                (Integer) columnMetadata.getMaxValue() > ParquetReaderUtility.DATE_CORRUPTION_THRESHOLD) {
              int newMinMax = ParquetReaderUtility.autoCorrectCorruptedDate((Integer) columnMetadata.getMaxValue());
              columnMetadata.setMax(newMinMax);
              columnMetadata.setMin(newMinMax);
            }
          }
          // Setting Max values for ParquetTableMetadata_v2
          else if (new MetadataVersion(2, 0).equals(new MetadataVersion(parquetTableMetadata.getMetadataVersion())) &&
              columnMetadata.getName() != null && Arrays.equals(columnMetadata.getName(), names) &&
              columnMetadata.hasSingleValue(rowCount) && (Integer) columnMetadata.getMaxValue() >
              ParquetReaderUtility.DATE_CORRUPTION_THRESHOLD) {
            int newMax = ParquetReaderUtility.autoCorrectCorruptedDate((Integer) columnMetadata.getMaxValue());
            columnMetadata.setMax(newMax);
          }
        }
      }
    }
  }

  /**
   * Transforms values of min / max binary statistics into byte arrays.
   * Transformation logic depends on the metadata file version.
   *
   * @param parquetTableMetadata table metadata that should be corrected
   * @param readerConfig parquet reader config
   */
  public static void transformBinaryInMetadataCache(ParquetTableMetadataBase parquetTableMetadata, ParquetReaderConfig readerConfig) {
    // Look up the names of columns with BINARY or FIXED_LEN_BYTE_ARRAY data type
    // in the metadata cache file for v2 and all v3 versions
    Set<List<String>> columnsNames = getBinaryColumnsNames(parquetTableMetadata);
    boolean allowBinaryMetadata = allowBinaryMetadata(parquetTableMetadata.getDrillVersion(), readerConfig);

    for (ParquetFileMetadata file : parquetTableMetadata.getFiles()) {
      for (RowGroupMetadata rowGroupMetadata : file.getRowGroups()) {
        Long rowCount = rowGroupMetadata.getRowCount();
        for (ColumnMetadata columnMetadata : rowGroupMetadata.getColumns()) {
          // Setting Min / Max values for ParquetTableMetadata_v1
          if (new MetadataVersion(1, 0).equals(new MetadataVersion(parquetTableMetadata.getMetadataVersion()))) {
            if (columnMetadata.getPrimitiveType() == PrimitiveTypeName.BINARY
                || columnMetadata.getPrimitiveType() == PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY) {
              setMinMaxValues(columnMetadata, rowCount, allowBinaryMetadata, false);
            }
          }
          // Setting Min / Max values for V2 and all V3 versions prior to V3_3
          else if (new MetadataVersion(parquetTableMetadata.getMetadataVersion()).compareTo(new MetadataVersion(3, 3)) < 0
                    && columnsNames.contains(Arrays.asList(columnMetadata.getName()))) {
            setMinMaxValues(columnMetadata, rowCount, allowBinaryMetadata, false);
          }
          // Setting Min / Max values for V3_3 and all next versions
          else if (new MetadataVersion(parquetTableMetadata.getMetadataVersion()).compareTo(new MetadataVersion(3, 3)) >= 0
                      && columnsNames.contains(Arrays.asList(columnMetadata.getName()))) {
            setMinMaxValues(columnMetadata, rowCount, allowBinaryMetadata, true);
          }
        }
      }
    }
  }

  /**
   * If binary metadata was stored prior to Drill version {@link #ALLOWED_DRILL_VERSION_FOR_BINARY},
   * it might have incorrectly defined min / max values.
   * If the given version is null, we assume it predates {@link #ALLOWED_DRILL_VERSION_FOR_BINARY}.
   * In both cases we allow reading such metadata only if {@link ParquetReaderConfig#enableStringsSignedMinMax()} is true.
   *
   * @param drillVersion drill version used to create metadata file
   * @param readerConfig parquet reader configuration
   * @return true if reading binary min / max values is allowed, false otherwise
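   *
   * <p>A sketch of the version check (version strings illustrative):
   * <pre>{@code
   * // metadata written by Drill 1.16.0: 1.15.0 <= 1.16.0, so binary stats are trusted
   * boolean trusted = VersionUtil.compareVersions("1.15.0", "1.16.0") <= 0; // true
   * }</pre>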
   */
  private static boolean allowBinaryMetadata(String drillVersion, ParquetReaderConfig readerConfig) {
    return readerConfig.enableStringsSignedMinMax() ||
      (drillVersion != null && VersionUtil.compareVersions(ALLOWED_DRILL_VERSION_FOR_BINARY, drillVersion) <= 0);
  }

  /**
   * Returns a set of lists with the names of columns whose data type is BINARY or
   * FIXED_LEN_BYTE_ARRAY, taken from the {@code columnTypeInfo} of the given table metadata,
   * if the metadata has version v2 or v3 (including minor versions).
   *
   * @param parquetTableMetadata table metadata the source of the columns to check
   * @return set of the lists with column names
   */
  private static Set<List<String>> getBinaryColumnsNames(ParquetTableMetadataBase parquetTableMetadata) {
    Set<List<String>> names = new HashSet<>();
    if (parquetTableMetadata instanceof ParquetTableMetadata_v2) {
      for (ColumnTypeMetadata_v2 columnTypeMetadata :
        ((ParquetTableMetadata_v2) parquetTableMetadata).columnTypeInfo.values()) {
        if (columnTypeMetadata.primitiveType == PrimitiveTypeName.BINARY
            || columnTypeMetadata.primitiveType == PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY) {
          names.add(Arrays.asList(columnTypeMetadata.name));
        }
      }
    } else if (parquetTableMetadata instanceof ParquetTableMetadata_v3) {
      for (ColumnTypeMetadata_v3 columnTypeMetadata :
        ((ParquetTableMetadata_v3) parquetTableMetadata).columnTypeInfo.values()) {
        if (columnTypeMetadata.primitiveType == PrimitiveTypeName.BINARY
            || columnTypeMetadata.primitiveType == PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY) {
          names.add(Arrays.asList(columnTypeMetadata.name));
        }
      }
    }
    return names;
  }

  /**
   * If binary metadata is not allowed (logic is defined in
   * {@link ParquetReaderUtility#allowBinaryMetadata(String, ParquetReaderConfig)}),
   * min / max values for the binary column are set only if min and max are equal;
   * otherwise both are set to null.
   *
   * @param columnMetadata column metadata that should be changed
   * @param rowCount row count in the column chunk
   * @param allowBinaryMetadata whether reading binary metadata is allowed
   * @param needsDecoding whether the min and max values are Base64-encoded and should be decoded
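   *
   * <p>A sketch of the decoding applied for metadata V3_3 and later (value illustrative):
   * <pre>{@code
   * byte[] min = Base64.decodeBase64("ZHJpbGw=".getBytes()); // the bytes of "drill"
   * }</pre>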
   */
  private static void setMinMaxValues(ColumnMetadata columnMetadata, long rowCount, boolean allowBinaryMetadata, boolean needsDecoding) {
    byte[] minBytes = null;
    byte[] maxBytes = null;

    boolean hasSingleValue = false;
    if (allowBinaryMetadata || (hasSingleValue = columnMetadata.hasSingleValue(rowCount))) {
      Object minValue = columnMetadata.getMinValue();
      Object maxValue = columnMetadata.getMaxValue();
      if (minValue instanceof String && maxValue instanceof String) {
        minBytes = ((String) minValue).getBytes();
        maxBytes = ((String) maxValue).getBytes();
        if (needsDecoding) {
          minBytes = Base64.decodeBase64(minBytes);
          maxBytes = hasSingleValue ? minBytes : Base64.decodeBase64(maxBytes);
        }
      }
    }

    columnMetadata.setMin(minBytes);
    columnMetadata.setMax(maxBytes);
  }

  /**
   * Checks for corrupted dates in a parquet file. See DRILL-4203.
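   *
   * <p>A minimal usage sketch ({@code footer} and {@code columns} come from the scan setup):
   * <pre>{@code
   * DateCorruptionStatus status = detectCorruptDates(footer, columns, true);
   * if (status == DateCorruptionStatus.META_SHOWS_CORRUPTION) {
   *   // the reader must apply autoCorrectCorruptedDate() to each DATE value
   * }
   * }</pre>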
   */
  public static DateCorruptionStatus detectCorruptDates(ParquetMetadata footer,
                                           List<SchemaPath> columns,
                                           boolean autoCorrectCorruptDates) {
    // Old Drill files have "parquet-mr" as the created-by string and no Drill version; we need to
    // check min/max values to see if they look corrupt.
    //  - there is an option to disable this auto-correction based on the date values, in case
    //    users are storing such dates intentionally

    // Migrated parquet files have a 1.8.1 parquet-mr version with "drill-r0" in the part of the
    // version string that usually contains "SNAPSHOT"

    // New parquet files generated with the "is.date.correct" property have no corrupt dates

    String createdBy = footer.getFileMetaData().getCreatedBy();
    String drillVersion = footer.getFileMetaData().getKeyValueMetaData().get(ParquetRecordWriter.DRILL_VERSION_PROPERTY);
    String writerVersionValue = footer.getFileMetaData().getKeyValueMetaData().get(ParquetRecordWriter.WRITER_VERSION_PROPERTY);
    // This flag may be present in parquet files generated by Drill versions 1.9.0-SNAPSHOT and 1.9.0.
    // Its presence means that the Drill parquet writer version is 2.
    final String isDateCorrectFlag = "is.date.correct";
    String isDateCorrect = footer.getFileMetaData().getKeyValueMetaData().get(isDateCorrectFlag);
    if (drillVersion != null) {
      int writerVersion = 1;
      if (writerVersionValue != null) {
        writerVersion = Integer.parseInt(writerVersionValue);
      }
      else if (Boolean.parseBoolean(isDateCorrect)) {
        writerVersion = DRILL_WRITER_VERSION_STD_DATE_FORMAT;
      }
      return writerVersion >= DRILL_WRITER_VERSION_STD_DATE_FORMAT ? DateCorruptionStatus.META_SHOWS_NO_CORRUPTION
          // loop through parquet column metadata to find date columns, check for corrupt values
          : checkForCorruptDateValuesInStatistics(footer, columns, autoCorrectCorruptDates);
    } else {
      // Possibly an old, un-migrated Drill file; check the column statistics to see if min/max
      // values look corrupt. This only applies if a date column is selected.
      if (createdBy == null || createdBy.equals("parquet-mr")) {
        return checkForCorruptDateValuesInStatistics(footer, columns, autoCorrectCorruptDates);
      } else {
        // check the created by to see if it is a migrated Drill file
        try {
          VersionParser.ParsedVersion parsedCreatedByVersion = VersionParser.parse(createdBy);
          // check if this is a migrated Drill file, lacking a Drill version number, but with
          // "drill" in the parquet created-by string
          if (parsedCreatedByVersion.hasSemanticVersion()) {
            SemanticVersion semVer = parsedCreatedByVersion.getSemanticVersion();
            String pre = semVer.pre + "";
            if (semVer.major == 1 && semVer.minor == 8 && semVer.patch == 1 && pre.contains("drill")) {
              return checkForCorruptDateValuesInStatistics(footer, columns, autoCorrectCorruptDates);
            }
          }
          // Written by a tool other than Drill; the dates are not corrupted.
          return DateCorruptionStatus.META_SHOWS_NO_CORRUPTION;
        } catch (VersionParser.VersionParseException e) {
          // If we couldn't parse "created by" field, check column metadata of date columns
          return checkForCorruptDateValuesInStatistics(footer, columns, autoCorrectCorruptDates);
        }
      }
    }
  }

  /**
   * Detect corrupt date values by looking at the min/max values in the metadata.
   *
   * This should only be used when a file does not have enough metadata to determine if
   * the data was written with an external tool or an older version of Drill
   * ({@link org.apache.drill.exec.store.parquet.ParquetRecordWriter#WRITER_VERSION_PROPERTY} <
   * {@link org.apache.drill.exec.store.parquet.ParquetReaderUtility#DRILL_WRITER_VERSION_STD_DATE_FORMAT})
   *
   * This method only checks the first Row Group, because Drill has only ever written
   * a single Row Group per file.
   *
   * @param footer parquet footer
   * @param columns list of columns schema path
   * @param autoCorrectCorruptDates user setting to allow enabling/disabling of auto-correction
   *                                of corrupt dates. There are some rare cases (storing dates thousands
   *                                of years into the future, with tools other than Drill writing files)
   *                                that would result in the date values being "corrected" into bad values.
   * @return the detected {@link DateCorruptionStatus}
   */
  public static DateCorruptionStatus checkForCorruptDateValuesInStatistics(ParquetMetadata footer,
                                                              List<SchemaPath> columns,
                                                              boolean autoCorrectCorruptDates) {
    // Users can turn off date correction in cases where we detect corruption based on date values
    // that are unlikely to appear in common datasets. In this case, report that no correction
    // needs to happen during the file read.
    if (! autoCorrectCorruptDates) {
      return DateCorruptionStatus.META_SHOWS_NO_CORRUPTION;
    }
    // Drill-produced files have only ever had a single row group; if this changes in the future
    // it won't matter, as we will know from the Drill version written in the files that the dates
    // are correct
    int rowGroupIndex = 0;
    Map<String, SchemaElement> schemaElements = ParquetReaderUtility.getColNameToSchemaElementMapping(footer);
    findDateColWithStatsLoop : for (SchemaPath schemaPath : columns) {
      List<ColumnDescriptor> parquetColumns = footer.getFileMetaData().getSchema().getColumns();
      for (int i = 0; i < parquetColumns.size(); ++i) {
        ColumnDescriptor column = parquetColumns.get(i);
        // This reader only supports flat data; this is enforced in ParquetScanBatchCreator.
        // Creating a NameSegment ensures we use the standard code for comparing names;
        // currently it is all case-insensitive.
        if (Utilities.isStarQuery(columns)
            || getFullColumnPath(column).equalsIgnoreCase(schemaPath.getUnIndexed().toString())) {
          int colIndex = -1;
          ConvertedType convertedType = schemaElements.get(getFullColumnPath(column)).getConverted_type();
          if (convertedType != null && convertedType.equals(ConvertedType.DATE)) {
            List<ColumnChunkMetaData> colChunkList = footer.getBlocks().get(rowGroupIndex).getColumns();
            for (int j = 0; j < colChunkList.size(); j++) {
              if (colChunkList.get(j).getPath().equals(ColumnPath.get(column.getPath()))) {
                colIndex = j;
                break;
              }
            }
          }
          if (colIndex == -1) {
            // column does not appear in this file, skip it
            continue;
          }
          IntStatistics statistics = (IntStatistics) footer.getBlocks().get(rowGroupIndex).getColumns().get(colIndex).getStatistics();
          return (statistics.hasNonNullValue() && statistics.compareMaxToValue(ParquetReaderUtility.DATE_CORRUPTION_THRESHOLD) > 0) ?
              DateCorruptionStatus.META_SHOWS_CORRUPTION : DateCorruptionStatus.META_UNCLEAR_TEST_VALUES;
        }
      }
    }
    return DateCorruptionStatus.META_SHOWS_NO_CORRUPTION;
  }

  /**
   * Utilities for converting a parquet INT96 binary value (Impala / Hive timestamp)
   * to a date-time value. This utilizes the Joda library.
   */
  public static class NanoTimeUtils {

    public static final long NANOS_PER_MILLISECOND = 1000000;

  /**
   * @param binaryTimeStampValue
   *          hive / impala timestamp values with nanosecond precision
   *          are stored in parquet Binary as INT96 (12 constant bytes)
   * @param retainLocalTimezone
   *          parquet files don't keep the local timezone, according to the
   *          <a href="https://github.com/Parquet/parquet-format/blob/master/LogicalTypes.md#timestamp">Parquet spec</a>,
   *          but some tools (Hive, for example) retain the local timezone for parquet files by default.
   *          Note: Impala doesn't retain the local timezone by default.
   * @return  timestamp in milliseconds - the number of milliseconds since January 1, 1970, 00:00:00 GMT
   *          represented by {@code binaryTimeStampValue}.
   *          The nanosecond precision is truncated to millis; therefore the length of a single timestamp value is
   *          {@value org.apache.drill.exec.expr.holders.NullableTimeStampHolder#WIDTH} bytes instead of 12 bytes.
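   *
   * <p>A worked example (values illustrative): Julian day 2440589 is 1970-01-02, so with
   * 3,600,000,000,000 nanos of day (01:00:00) the result is
   * (2440589 - 2440588) * 86400000 + 3600000000000 / 1000000 = 90000000 millis:
   * <pre>{@code
   * Binary int96 = new NanoTime(2440589, 3_600_000_000_000L).toBinary();
   * long millis = NanoTimeUtils.getDateTimeValueFromBinary(int96, false); // 90_000_000L
   * }</pre>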
   */
    public static long getDateTimeValueFromBinary(Binary binaryTimeStampValue, boolean retainLocalTimezone) {
      // binaryTimeStampValue encodes the timestamp as a julian day number (4 bytes)
      // plus the nanos of that day (8 bytes)
      NanoTime nt = NanoTime.fromBinary(binaryTimeStampValue);
      int julianDay = nt.getJulianDay();
      long nanosOfDay = nt.getTimeOfDayNanos();
      long dateTime = (julianDay - JULIAN_DAY_NUMBER_FOR_UNIX_EPOCH) * DateTimeConstants.MILLIS_PER_DAY
          + nanosOfDay / NANOS_PER_MILLISECOND;
      if (retainLocalTimezone) {
        return DateTimeZone.getDefault().convertUTCToLocal(dateTime);
      } else {
        return dateTime;
      }
    }
  }

  /**
   * Builds major type using given {@code OriginalType originalType} or {@code PrimitiveTypeName type}.
   * For DECIMAL, a major type with scale and precision is returned.
   *
   * @param type         parquet primitive type
   * @param originalType parquet original type
   * @param scale        type scale (used for DECIMAL type)
   * @param precision    type precision (used for DECIMAL type)
   * @return major type
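   *
   * <p>For example (a sketch, not an exhaustive mapping):
   * <pre>{@code
   * TypeProtos.MajorType date = getType(PrimitiveTypeName.INT32, OriginalType.DATE, 0, 0);
   * // optional DATE; a DECIMAL original type would instead yield VARDECIMAL
   * // carrying the given scale and precision
   * }</pre>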
   */
  public static TypeProtos.MajorType getType(PrimitiveTypeName type, OriginalType originalType, int scale, int precision) {
    if (originalType != null) {
      switch (originalType) {
        case DECIMAL:
          return Types.withScaleAndPrecision(TypeProtos.MinorType.VARDECIMAL, TypeProtos.DataMode.OPTIONAL, scale, precision);
        case DATE:
          return Types.optional(TypeProtos.MinorType.DATE);
        case TIME_MILLIS:
          return Types.optional(TypeProtos.MinorType.TIME);
        case TIMESTAMP_MILLIS:
          return Types.optional(TypeProtos.MinorType.TIMESTAMP);
        case UTF8:
          return Types.optional(TypeProtos.MinorType.VARCHAR);
        case UINT_8:
          return Types.optional(TypeProtos.MinorType.UINT1);
        case UINT_16:
          return Types.optional(TypeProtos.MinorType.UINT2);
        case UINT_32:
          return Types.optional(TypeProtos.MinorType.UINT4);
        case UINT_64:
          return Types.optional(TypeProtos.MinorType.UINT8);
        case INT_8:
          return Types.optional(TypeProtos.MinorType.TINYINT);
        case INT_16:
          return Types.optional(TypeProtos.MinorType.SMALLINT);
        case INTERVAL:
          return Types.optional(TypeProtos.MinorType.INTERVAL);
      }
    }

    switch (type) {
      case BOOLEAN:
        return Types.optional(TypeProtos.MinorType.BIT);
      case INT32:
        return Types.optional(TypeProtos.MinorType.INT);
      case INT64:
        return Types.optional(TypeProtos.MinorType.BIGINT);
      case FLOAT:
        return Types.optional(TypeProtos.MinorType.FLOAT4);
      case DOUBLE:
        return Types.optional(TypeProtos.MinorType.FLOAT8);
      case BINARY:
      case FIXED_LEN_BYTE_ARRAY:
      case INT96:
        return Types.optional(TypeProtos.MinorType.VARBINARY);
      default:
        // Should never hit this
        throw new UnsupportedOperationException("Unsupported type:" + type);
    }
  }

  /**
   * Checks whether any column in the given list is either nested or repeated.
   *
   * @param footer  Parquet file metadata
   * @param columns list of query SchemaPath objects
   * @return true if at least one column is nested or repeated, false otherwise
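   *
   * <p>A typical call site looks like this ({@code footer} and {@code columns} come from
   * the scan setup):
   * <pre>{@code
   * if (containsComplexColumn(footer, columns)) {
   *   // fall back to the reader that supports nested / repeated data
   * }
   * }</pre>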
   */
  public static boolean containsComplexColumn(ParquetMetadata footer, List<SchemaPath> columns) {

    MessageType schema = footer.getFileMetaData().getSchema();

    if (Utilities.isStarQuery(columns)) {
      for (Type type : schema.getFields()) {
        if (!type.isPrimitive()) {
          return true;
        }
      }
      for (ColumnDescriptor col : schema.getColumns()) {
        if (col.getMaxRepetitionLevel() > 0) {
          return true;
        }
      }
      return false;
    } else {
      Map<String, ColumnDescriptor> colDescMap = ParquetReaderUtility.getColNameToColumnDescriptorMapping(footer);
      Map<String, SchemaElement> schemaElements = ParquetReaderUtility.getColNameToSchemaElementMapping(footer);

      for (SchemaPath schemaPath : columns) {
        // A non-leaf schema path is a complex column
        if (!schemaPath.isLeaf()) {
          logger.trace("rowGroupScan contains complex column: {}", schemaPath.getUnIndexed().toString());
          return true;
        }

        // A column descriptor lookup failure below may mean one of two things, depending on the
        // subsequent SchemaElement lookup:
        // 1. success: the queried column is complex, i.e. a GroupType
        // 2. failure: the queried column is not in the schema and thus non-complex
        ColumnDescriptor column = colDescMap.get(schemaPath.getUnIndexed().toString().toLowerCase());

        if (column == null) {
          SchemaElement schemaElement = schemaElements.get(schemaPath.getUnIndexed().toString().toLowerCase());
          if (schemaElement != null) {
            return true;
          }
        } else {
          if (column.getMaxRepetitionLevel() > 0) {
            logger.trace("rowGroupScan contains repetitive column: {}", schemaPath.getUnIndexed().toString());
            return true;
          }
        }
      }
    }
    return false;
  }
}