diff options
Diffstat (limited to 'exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestParquetWriter.java')
-rw-r--r-- | exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestParquetWriter.java | 326 |
1 files changed, 314 insertions, 12 deletions
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestParquetWriter.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestParquetWriter.java index c359e69b6..c1dc643e8 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestParquetWriter.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestParquetWriter.java @@ -19,11 +19,17 @@ package org.apache.drill.exec.physical.impl.writer; import static org.apache.drill.exec.store.parquet.ParquetRecordWriter.DRILL_VERSION_PROPERTY; import static org.apache.drill.test.TestBuilder.convertToLocalTimestamp; +import static org.apache.parquet.format.converter.ParquetMetadataConverter.NO_FILTER; import static org.apache.parquet.format.converter.ParquetMetadataConverter.SKIP_ROW_GROUPS; +import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BINARY; +import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY; +import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT32; +import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT64; import static org.junit.Assert.assertEquals; import java.io.File; import java.io.FileWriter; +import java.io.IOException; import java.math.BigDecimal; import java.nio.file.Paths; import java.sql.Date; @@ -35,6 +41,9 @@ import java.util.List; import java.util.Map; import com.google.common.base.Joiner; +import com.google.common.collect.ImmutableList; +import org.apache.calcite.util.Pair; +import org.apache.drill.exec.util.JsonStringArrayList; import org.apache.drill.test.BaseTestQuery; import org.apache.drill.categories.ParquetTest; import org.apache.drill.categories.SlowTest; @@ -49,6 +58,9 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.parquet.hadoop.ParquetFileReader; import org.apache.parquet.hadoop.metadata.ParquetMetadata; +import org.apache.parquet.schema.MessageType; +import org.apache.parquet.schema.OriginalType; +import org.apache.parquet.schema.PrimitiveType; import org.joda.time.DateTime; import org.joda.time.Period; import org.junit.AfterClass; @@ -72,7 +84,7 @@ public class TestParquetWriter extends BaseTestQuery { dirTestWatcher.copyResourceToRoot(Paths.get("parquet", "int96_dict_change")); } - static FileSystem fs; + private static FileSystem fs; // Map storing a convenient name as well as the cast type necessary // to produce it casting from a varchar @@ -85,11 +97,11 @@ public class TestParquetWriter extends BaseTestQuery { static { allTypes.put("int", "int"); allTypes.put("bigint", "bigint"); - // TODO(DRILL-3367) -// allTypes.put("decimal(9, 4)", "decimal9"); -// allTypes.put("decimal(18,9)", "decimal18"); -// allTypes.put("decimal(28, 14)", "decimal28sparse"); -// allTypes.put("decimal(38, 19)", "decimal38sparse"); + allTypes.put("decimal(9, 4)", "decimal9"); + allTypes.put("decimal(18,9)", "decimal18"); + allTypes.put("decimal(28, 14)", "decimal28sparse"); + allTypes.put("decimal(38, 19)", "decimal38sparse"); + allTypes.put("decimal(38, 15)", "vardecimal"); allTypes.put("date", "date"); allTypes.put("timestamp", "timestamp"); allTypes.put("float", "float4"); @@ -126,8 +138,8 @@ public class TestParquetWriter extends BaseTestQuery { } @AfterClass - public static void disableDecimalDataType() throws Exception { - alterSession(PlannerSettings.ENABLE_DECIMAL_DATA_TYPE_KEY, false); + public static void disableDecimalDataType() { + resetSessionOption(PlannerSettings.ENABLE_DECIMAL_DATA_TYPE_KEY); } @Test @@ -146,7 +158,7 @@ public class TestParquetWriter extends BaseTestQuery { @Test public void testLargeFooter() throws Exception { - StringBuffer sb = new StringBuffer(); + StringBuilder sb = new StringBuilder(); // create a JSON document with a lot of columns sb.append("{"); final int numCols = 1000; @@ -155,13 +167,13 @@ public class TestParquetWriter extends BaseTestQuery { for (int i = 0 ; i < numCols - 1; i++) { sb.append(String.format("\"col_%d\" : 100,", i)); colNames[i] = "col_" + i; - values[i] = 100l; + values[i] = 100L; } // add one column without a comma after it sb.append(String.format("\"col_%d\" : 100", numCols - 1)); sb.append("}"); colNames[numCols - 1] = "col_" + (numCols - 1); - values[numCols - 1] = 100l; + values[numCols - 1] = 100L; String path = "test"; File pathDir = dirTestWatcher.makeRootSubDir(Paths.get(path)); @@ -1002,5 +1014,295 @@ public class TestParquetWriter extends BaseTestQuery { resetSessionOption(ExecConstants.PARQUET_READER_INT96_AS_TIMESTAMP); } } -} + @Test + public void testWriteDecimalIntBigIntFixedLen() throws Exception { + String tableName = "decimalIntBigIntFixedLen"; + try { + alterSession(ExecConstants.PARQUET_WRITER_LOGICAL_TYPE_FOR_DECIMALS, FIXED_LEN_BYTE_ARRAY.name()); + test( + "create table dfs.tmp.%s as\n" + + "select cast('123456.789' as decimal(9, 3)) as decInt,\n" + + "cast('123456.789123456789' as decimal(18, 12)) as decBigInt,\n" + + "cast('123456.789123456789' as decimal(19, 12)) as fixedLen", tableName); + checkTableTypes(tableName, + ImmutableList.of( + Pair.of("decInt", INT32), + Pair.of("decBigInt", INT64), + Pair.of("fixedLen", FIXED_LEN_BYTE_ARRAY)), + true); + testBuilder() + .sqlQuery("select * from dfs.tmp.%s", tableName) + .unOrdered() + .baselineColumns("decInt", "decBigInt", "fixedLen") + .baselineValues(new BigDecimal("123456.789"), + new BigDecimal("123456.789123456789"), + new BigDecimal("123456.789123456789")) + .go(); + } finally { + resetSessionOption(ExecConstants.PARQUET_WRITER_USE_PRIMITIVE_TYPES_FOR_DECIMALS); + resetSessionOption(ExecConstants.PARQUET_WRITER_LOGICAL_TYPE_FOR_DECIMALS); + test("drop table if exists dfs.tmp.%s", tableName); + } + } + + @Test + public void testWriteDecimalIntBigIntBinary() throws Exception { + String tableName = "decimalIntBigIntBinary"; + try { + alterSession(ExecConstants.PARQUET_WRITER_USE_PRIMITIVE_TYPES_FOR_DECIMALS, true); + alterSession(ExecConstants.PARQUET_WRITER_LOGICAL_TYPE_FOR_DECIMALS, BINARY.name()); + test( + "create table dfs.tmp.%s as\n" + + "select cast('123456.789' as decimal(9, 3)) as decInt,\n" + + "cast('123456.789123456789' as decimal(18, 12)) as decBigInt,\n" + + "cast('123456.789123456789' as decimal(19, 12)) as binCol", tableName); + checkTableTypes(tableName, + ImmutableList.of( + Pair.of("decInt", INT32), + Pair.of("decBigInt", INT64), + Pair.of("binCol", BINARY)), + true); + testBuilder() + .sqlQuery("select * from dfs.tmp.%s", tableName) + .unOrdered() + .baselineColumns("decInt", "decBigInt", "binCol") + .baselineValues(new BigDecimal("123456.789"), + new BigDecimal("123456.789123456789"), + new BigDecimal("123456.789123456789")) + .go(); + } finally { + resetSessionOption(ExecConstants.PARQUET_WRITER_USE_PRIMITIVE_TYPES_FOR_DECIMALS); + resetSessionOption(ExecConstants.PARQUET_WRITER_LOGICAL_TYPE_FOR_DECIMALS); + test("drop table if exists dfs.tmp.%s", tableName); + } + } + + @Test + public void testWriteDecimalFixedLenOnly() throws Exception { + String tableName = "decimalFixedLenOnly"; + try { + alterSession(ExecConstants.PARQUET_WRITER_USE_PRIMITIVE_TYPES_FOR_DECIMALS, false); + alterSession(ExecConstants.PARQUET_WRITER_LOGICAL_TYPE_FOR_DECIMALS, FIXED_LEN_BYTE_ARRAY.name()); + test( + "create table dfs.tmp.%s as\n" + + "select cast('123456.789' as decimal(9, 3)) as decInt,\n" + + "cast('123456.789123456789' as decimal(18, 12)) as decBigInt,\n" + + "cast('123456.789123456789' as decimal(19, 12)) as fixedLen", tableName); + checkTableTypes(tableName, + ImmutableList.of( + Pair.of("decInt", FIXED_LEN_BYTE_ARRAY), + Pair.of("decBigInt", FIXED_LEN_BYTE_ARRAY), + Pair.of("fixedLen", FIXED_LEN_BYTE_ARRAY)), + true); + testBuilder() + .sqlQuery("select * from dfs.tmp.%s", tableName) + .unOrdered() + .baselineColumns("decInt", "decBigInt", "fixedLen") + .baselineValues(new BigDecimal("123456.789"), + new BigDecimal("123456.789123456789"), + new BigDecimal("123456.789123456789")) + .go(); + } finally { + resetSessionOption(ExecConstants.PARQUET_WRITER_USE_PRIMITIVE_TYPES_FOR_DECIMALS); + resetSessionOption(ExecConstants.PARQUET_WRITER_LOGICAL_TYPE_FOR_DECIMALS); + test("drop table if exists dfs.tmp.%s", tableName); + } + } + + @Test + public void testWriteDecimalBinaryOnly() throws Exception { + String tableName = "decimalBinaryOnly"; + try { + alterSession(ExecConstants.PARQUET_WRITER_USE_PRIMITIVE_TYPES_FOR_DECIMALS, false); + alterSession(ExecConstants.PARQUET_WRITER_LOGICAL_TYPE_FOR_DECIMALS, BINARY.name()); + test( + "create table dfs.tmp.%s as\n" + + "select cast('123456.789' as decimal(9, 3)) as decInt,\n" + + "cast('123456.789123456789' as decimal(18, 12)) as decBigInt,\n" + + "cast('123456.789123456789' as decimal(19, 12)) as binCol", tableName); + checkTableTypes(tableName, + ImmutableList.of( + Pair.of("decInt", BINARY), + Pair.of("decBigInt", BINARY), + Pair.of("binCol", BINARY)), + true); + testBuilder() + .sqlQuery("select * from dfs.tmp.%s", tableName) + .unOrdered() + .baselineColumns("decInt", "decBigInt", "binCol") + .baselineValues(new BigDecimal("123456.789"), + new BigDecimal("123456.789123456789"), + new BigDecimal("123456.789123456789")) + .go(); + } finally { + resetSessionOption(ExecConstants.PARQUET_WRITER_USE_PRIMITIVE_TYPES_FOR_DECIMALS); + resetSessionOption(ExecConstants.PARQUET_WRITER_LOGICAL_TYPE_FOR_DECIMALS); + test("drop table if exists dfs.tmp.%s", tableName); + } + } + + @Test + public void testWriteDecimalIntBigIntRepeated() throws Exception { + String tableName = "decimalIntBigIntRepeated"; + + JsonStringArrayList<BigDecimal> ints = new JsonStringArrayList<>(); + ints.add(new BigDecimal("999999.999")); + ints.add(new BigDecimal("-999999.999")); + ints.add(new BigDecimal("0.000")); + + JsonStringArrayList<BigDecimal> longs = new JsonStringArrayList<>(); + longs.add(new BigDecimal("999999999.999999999")); + longs.add(new BigDecimal("-999999999.999999999")); + longs.add(new BigDecimal("0.000000000")); + + JsonStringArrayList<BigDecimal> fixedLen = new JsonStringArrayList<>(); + fixedLen.add(new BigDecimal("999999999999.999999")); + fixedLen.add(new BigDecimal("-999999999999.999999")); + fixedLen.add(new BigDecimal("0.000000")); + + try { + alterSession(ExecConstants.PARQUET_WRITER_USE_PRIMITIVE_TYPES_FOR_DECIMALS, true); + alterSession(ExecConstants.PARQUET_WRITER_LOGICAL_TYPE_FOR_DECIMALS, FIXED_LEN_BYTE_ARRAY.name()); + test( + "create table dfs.tmp.%s as\n" + + "select * from cp.`parquet/repeatedIntLondFixedLenBinaryDecimal.parquet`", tableName); + checkTableTypes(tableName, + ImmutableList.of( + Pair.of("decimal_int32", INT32), + Pair.of("decimal_int64", INT64), + Pair.of("decimal_fixedLen", INT64), + Pair.of("decimal_binary", INT64)), + true); + testBuilder() + .sqlQuery("select * from dfs.tmp.%s", tableName) + .unOrdered() + .baselineColumns("decimal_int32", "decimal_int64", "decimal_fixedLen", "decimal_binary") + .baselineValues(ints, longs, fixedLen, fixedLen) + .go(); + } finally { + resetSessionOption(ExecConstants.PARQUET_WRITER_USE_PRIMITIVE_TYPES_FOR_DECIMALS); + resetSessionOption(ExecConstants.PARQUET_WRITER_LOGICAL_TYPE_FOR_DECIMALS); + test("drop table if exists dfs.tmp.%s", tableName); + } + } + + @Test + public void testWriteDecimalFixedLenRepeated() throws Exception { + String tableName = "decimalFixedLenRepeated"; + + JsonStringArrayList<BigDecimal> ints = new JsonStringArrayList<>(); + ints.add(new BigDecimal("999999.999")); + ints.add(new BigDecimal("-999999.999")); + ints.add(new BigDecimal("0.000")); + + JsonStringArrayList<BigDecimal> longs = new JsonStringArrayList<>(); + longs.add(new BigDecimal("999999999.999999999")); + longs.add(new BigDecimal("-999999999.999999999")); + longs.add(new BigDecimal("0.000000000")); + + JsonStringArrayList<BigDecimal> fixedLen = new JsonStringArrayList<>(); + fixedLen.add(new BigDecimal("999999999999.999999")); + fixedLen.add(new BigDecimal("-999999999999.999999")); + fixedLen.add(new BigDecimal("0.000000")); + + try { + alterSession(ExecConstants.PARQUET_WRITER_USE_PRIMITIVE_TYPES_FOR_DECIMALS, false); + alterSession(ExecConstants.PARQUET_WRITER_LOGICAL_TYPE_FOR_DECIMALS, FIXED_LEN_BYTE_ARRAY.name()); + test( + "create table dfs.tmp.%s as\n" + + "select * from cp.`parquet/repeatedIntLondFixedLenBinaryDecimal.parquet`", tableName); + checkTableTypes(tableName, + ImmutableList.of( + Pair.of("decimal_int32", FIXED_LEN_BYTE_ARRAY), + Pair.of("decimal_int64", FIXED_LEN_BYTE_ARRAY), + Pair.of("decimal_fixedLen", FIXED_LEN_BYTE_ARRAY), + Pair.of("decimal_binary", FIXED_LEN_BYTE_ARRAY)), + true); + testBuilder() + .sqlQuery("select * from dfs.tmp.%s", tableName) + .unOrdered() + .baselineColumns("decimal_int32", "decimal_int64", "decimal_fixedLen", "decimal_binary") + .baselineValues(ints, longs, fixedLen, fixedLen) + .go(); + } finally { + resetSessionOption(ExecConstants.PARQUET_WRITER_USE_PRIMITIVE_TYPES_FOR_DECIMALS); + resetSessionOption(ExecConstants.PARQUET_WRITER_LOGICAL_TYPE_FOR_DECIMALS); + test("drop table if exists dfs.tmp.%s", tableName); + } + } + + @Test + public void testWriteDecimalBinaryRepeated() throws Exception { + String tableName = "decimalBinaryRepeated"; + + JsonStringArrayList<BigDecimal> ints = new JsonStringArrayList<>(); + ints.add(new BigDecimal("999999.999")); + ints.add(new BigDecimal("-999999.999")); + ints.add(new BigDecimal("0.000")); + + JsonStringArrayList<BigDecimal> longs = new JsonStringArrayList<>(); + longs.add(new BigDecimal("999999999.999999999")); + longs.add(new BigDecimal("-999999999.999999999")); + longs.add(new BigDecimal("0.000000000")); + + JsonStringArrayList<BigDecimal> fixedLen = new JsonStringArrayList<>(); + fixedLen.add(new BigDecimal("999999999999.999999")); + fixedLen.add(new BigDecimal("-999999999999.999999")); + fixedLen.add(new BigDecimal("0.000000")); + try { + alterSession(ExecConstants.PARQUET_WRITER_USE_PRIMITIVE_TYPES_FOR_DECIMALS, false); + alterSession(ExecConstants.PARQUET_WRITER_LOGICAL_TYPE_FOR_DECIMALS, BINARY.name()); + test( + "create table dfs.tmp.%s as\n" + + "select * from cp.`parquet/repeatedIntLondFixedLenBinaryDecimal.parquet`", tableName); + checkTableTypes(tableName, + ImmutableList.of( + Pair.of("decimal_int32", BINARY), + Pair.of("decimal_int64", BINARY), + Pair.of("decimal_fixedLen", BINARY), + Pair.of("decimal_binary", BINARY)), + true); + testBuilder() + .sqlQuery("select * from dfs.tmp.%s", tableName) + .unOrdered() + .baselineColumns("decimal_int32", "decimal_int64", "decimal_fixedLen", "decimal_binary") + .baselineValues(ints, longs, fixedLen, fixedLen) + .go(); + } finally { + resetSessionOption(ExecConstants.PARQUET_WRITER_USE_PRIMITIVE_TYPES_FOR_DECIMALS); + resetSessionOption(ExecConstants.PARQUET_WRITER_LOGICAL_TYPE_FOR_DECIMALS); + test("drop table if exists dfs.tmp.%s", tableName); + } + } + + /** + * Checks that specified parquet table contains specified columns with specified types. + * + * @param tableName name of the table that should be checked. + * @param columnsToCheck pair of column name and column type that should be checked in the table. + * @param isDecimalType is should be specified columns annotated ad DECIMAL. + * @throws IOException If table file was not found. + */ + private void checkTableTypes(String tableName, + List<Pair<String, PrimitiveType.PrimitiveTypeName>> columnsToCheck, + boolean isDecimalType) throws IOException { + MessageType schema = ParquetFileReader.readFooter( + new Configuration(), + new Path(Paths.get(dirTestWatcher.getDfsTestTmpDir().getPath(), tableName, "0_0_0.parquet").toUri().getPath()), + NO_FILTER).getFileMetaData().getSchema(); + + for (Pair<String, PrimitiveType.PrimitiveTypeName> nameType : columnsToCheck) { + assertEquals( + String.format("Table %s does not contain column %s with type %s", + tableName, nameType.getKey(), nameType.getValue()), + nameType.getValue(), + schema.getType(nameType.getKey()).asPrimitiveType().getPrimitiveTypeName()); + + assertEquals( + String.format("Table %s %s column %s with DECIMAL type", tableName, + isDecimalType ? "does not contain" : "contains unexpected", nameType.getKey()), + isDecimalType, schema.getType(nameType.getKey()).getOriginalType() == OriginalType.DECIMAL); + } + } +} |