path: root/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestParquetWriter.java
Diffstat (limited to 'exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestParquetWriter.java')
-rw-r--r--  exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestParquetWriter.java  326
1 file changed, 314 insertions, 12 deletions
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestParquetWriter.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestParquetWriter.java
index c359e69b6..c1dc643e8 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestParquetWriter.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestParquetWriter.java
@@ -19,11 +19,17 @@ package org.apache.drill.exec.physical.impl.writer;
import static org.apache.drill.exec.store.parquet.ParquetRecordWriter.DRILL_VERSION_PROPERTY;
import static org.apache.drill.test.TestBuilder.convertToLocalTimestamp;
+import static org.apache.parquet.format.converter.ParquetMetadataConverter.NO_FILTER;
import static org.apache.parquet.format.converter.ParquetMetadataConverter.SKIP_ROW_GROUPS;
+import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BINARY;
+import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY;
+import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT32;
+import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT64;
import static org.junit.Assert.assertEquals;
import java.io.File;
import java.io.FileWriter;
+import java.io.IOException;
import java.math.BigDecimal;
import java.nio.file.Paths;
import java.sql.Date;
@@ -35,6 +41,9 @@ import java.util.List;
import java.util.Map;
import com.google.common.base.Joiner;
+import com.google.common.collect.ImmutableList;
+import org.apache.calcite.util.Pair;
+import org.apache.drill.exec.util.JsonStringArrayList;
import org.apache.drill.test.BaseTestQuery;
import org.apache.drill.categories.ParquetTest;
import org.apache.drill.categories.SlowTest;
@@ -49,6 +58,9 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.parquet.hadoop.metadata.ParquetMetadata;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.OriginalType;
+import org.apache.parquet.schema.PrimitiveType;
import org.joda.time.DateTime;
import org.joda.time.Period;
import org.junit.AfterClass;
@@ -72,7 +84,7 @@ public class TestParquetWriter extends BaseTestQuery {
dirTestWatcher.copyResourceToRoot(Paths.get("parquet", "int96_dict_change"));
}
- static FileSystem fs;
+ private static FileSystem fs;
// Map storing a convenient name as well as the cast type necessary
// to produce it casting from a varchar
@@ -85,11 +97,11 @@ public class TestParquetWriter extends BaseTestQuery {
static {
allTypes.put("int", "int");
allTypes.put("bigint", "bigint");
- // TODO(DRILL-3367)
-// allTypes.put("decimal(9, 4)", "decimal9");
-// allTypes.put("decimal(18,9)", "decimal18");
-// allTypes.put("decimal(28, 14)", "decimal28sparse");
-// allTypes.put("decimal(38, 19)", "decimal38sparse");
+ allTypes.put("decimal(9, 4)", "decimal9");
+ allTypes.put("decimal(18,9)", "decimal18");
+ allTypes.put("decimal(28, 14)", "decimal28sparse");
+ allTypes.put("decimal(38, 19)", "decimal38sparse");
+ allTypes.put("decimal(38, 15)", "vardecimal");
allTypes.put("date", "date");
allTypes.put("timestamp", "timestamp");
allTypes.put("float", "float4");
@@ -126,8 +138,8 @@ public class TestParquetWriter extends BaseTestQuery {
}
@AfterClass
- public static void disableDecimalDataType() throws Exception {
- alterSession(PlannerSettings.ENABLE_DECIMAL_DATA_TYPE_KEY, false);
+ public static void disableDecimalDataType() {
+ resetSessionOption(PlannerSettings.ENABLE_DECIMAL_DATA_TYPE_KEY);
}
@Test
@@ -146,7 +158,7 @@ public class TestParquetWriter extends BaseTestQuery {
@Test
public void testLargeFooter() throws Exception {
- StringBuffer sb = new StringBuffer();
+ StringBuilder sb = new StringBuilder();
// create a JSON document with a lot of columns
sb.append("{");
final int numCols = 1000;
@@ -155,13 +167,13 @@ public class TestParquetWriter extends BaseTestQuery {
for (int i = 0 ; i < numCols - 1; i++) {
sb.append(String.format("\"col_%d\" : 100,", i));
colNames[i] = "col_" + i;
- values[i] = 100l;
+ values[i] = 100L;
}
// add one column without a comma after it
sb.append(String.format("\"col_%d\" : 100", numCols - 1));
sb.append("}");
colNames[numCols - 1] = "col_" + (numCols - 1);
- values[numCols - 1] = 100l;
+ values[numCols - 1] = 100L;
String path = "test";
File pathDir = dirTestWatcher.makeRootSubDir(Paths.get(path));
@@ -1002,5 +1014,295 @@ public class TestParquetWriter extends BaseTestQuery {
resetSessionOption(ExecConstants.PARQUET_READER_INT96_AS_TIMESTAMP);
}
}
-}
+ @Test
+ public void testWriteDecimalIntBigIntFixedLen() throws Exception {
+ String tableName = "decimalIntBigIntFixedLen";
+ try {
+ alterSession(ExecConstants.PARQUET_WRITER_LOGICAL_TYPE_FOR_DECIMALS, FIXED_LEN_BYTE_ARRAY.name());
+ test(
+ "create table dfs.tmp.%s as\n" +
+ "select cast('123456.789' as decimal(9, 3)) as decInt,\n" +
+ "cast('123456.789123456789' as decimal(18, 12)) as decBigInt,\n" +
+ "cast('123456.789123456789' as decimal(19, 12)) as fixedLen", tableName);
+ checkTableTypes(tableName,
+ ImmutableList.of(
+ Pair.of("decInt", INT32),
+ Pair.of("decBigInt", INT64),
+ Pair.of("fixedLen", FIXED_LEN_BYTE_ARRAY)),
+ true);
+ testBuilder()
+ .sqlQuery("select * from dfs.tmp.%s", tableName)
+ .unOrdered()
+ .baselineColumns("decInt", "decBigInt", "fixedLen")
+ .baselineValues(new BigDecimal("123456.789"),
+ new BigDecimal("123456.789123456789"),
+ new BigDecimal("123456.789123456789"))
+ .go();
+ } finally {
+ resetSessionOption(ExecConstants.PARQUET_WRITER_USE_PRIMITIVE_TYPES_FOR_DECIMALS);
+ resetSessionOption(ExecConstants.PARQUET_WRITER_LOGICAL_TYPE_FOR_DECIMALS);
+ test("drop table if exists dfs.tmp.%s", tableName);
+ }
+ }
+
+ @Test
+ public void testWriteDecimalIntBigIntBinary() throws Exception {
+ String tableName = "decimalIntBigIntBinary";
+ try {
+ alterSession(ExecConstants.PARQUET_WRITER_USE_PRIMITIVE_TYPES_FOR_DECIMALS, true);
+ alterSession(ExecConstants.PARQUET_WRITER_LOGICAL_TYPE_FOR_DECIMALS, BINARY.name());
+ test(
+ "create table dfs.tmp.%s as\n" +
+ "select cast('123456.789' as decimal(9, 3)) as decInt,\n" +
+ "cast('123456.789123456789' as decimal(18, 12)) as decBigInt,\n" +
+ "cast('123456.789123456789' as decimal(19, 12)) as binCol", tableName);
+ checkTableTypes(tableName,
+ ImmutableList.of(
+ Pair.of("decInt", INT32),
+ Pair.of("decBigInt", INT64),
+ Pair.of("binCol", BINARY)),
+ true);
+ testBuilder()
+ .sqlQuery("select * from dfs.tmp.%s", tableName)
+ .unOrdered()
+ .baselineColumns("decInt", "decBigInt", "binCol")
+ .baselineValues(new BigDecimal("123456.789"),
+ new BigDecimal("123456.789123456789"),
+ new BigDecimal("123456.789123456789"))
+ .go();
+ } finally {
+ resetSessionOption(ExecConstants.PARQUET_WRITER_USE_PRIMITIVE_TYPES_FOR_DECIMALS);
+ resetSessionOption(ExecConstants.PARQUET_WRITER_LOGICAL_TYPE_FOR_DECIMALS);
+ test("drop table if exists dfs.tmp.%s", tableName);
+ }
+ }
+
+ @Test
+ public void testWriteDecimalFixedLenOnly() throws Exception {
+ String tableName = "decimalFixedLenOnly";
+ try {
+ alterSession(ExecConstants.PARQUET_WRITER_USE_PRIMITIVE_TYPES_FOR_DECIMALS, false);
+ alterSession(ExecConstants.PARQUET_WRITER_LOGICAL_TYPE_FOR_DECIMALS, FIXED_LEN_BYTE_ARRAY.name());
+ test(
+ "create table dfs.tmp.%s as\n" +
+ "select cast('123456.789' as decimal(9, 3)) as decInt,\n" +
+ "cast('123456.789123456789' as decimal(18, 12)) as decBigInt,\n" +
+ "cast('123456.789123456789' as decimal(19, 12)) as fixedLen", tableName);
+ checkTableTypes(tableName,
+ ImmutableList.of(
+ Pair.of("decInt", FIXED_LEN_BYTE_ARRAY),
+ Pair.of("decBigInt", FIXED_LEN_BYTE_ARRAY),
+ Pair.of("fixedLen", FIXED_LEN_BYTE_ARRAY)),
+ true);
+ testBuilder()
+ .sqlQuery("select * from dfs.tmp.%s", tableName)
+ .unOrdered()
+ .baselineColumns("decInt", "decBigInt", "fixedLen")
+ .baselineValues(new BigDecimal("123456.789"),
+ new BigDecimal("123456.789123456789"),
+ new BigDecimal("123456.789123456789"))
+ .go();
+ } finally {
+ resetSessionOption(ExecConstants.PARQUET_WRITER_USE_PRIMITIVE_TYPES_FOR_DECIMALS);
+ resetSessionOption(ExecConstants.PARQUET_WRITER_LOGICAL_TYPE_FOR_DECIMALS);
+ test("drop table if exists dfs.tmp.%s", tableName);
+ }
+ }
+
+ @Test
+ public void testWriteDecimalBinaryOnly() throws Exception {
+ String tableName = "decimalBinaryOnly";
+ try {
+ alterSession(ExecConstants.PARQUET_WRITER_USE_PRIMITIVE_TYPES_FOR_DECIMALS, false);
+ alterSession(ExecConstants.PARQUET_WRITER_LOGICAL_TYPE_FOR_DECIMALS, BINARY.name());
+ test(
+ "create table dfs.tmp.%s as\n" +
+ "select cast('123456.789' as decimal(9, 3)) as decInt,\n" +
+ "cast('123456.789123456789' as decimal(18, 12)) as decBigInt,\n" +
+ "cast('123456.789123456789' as decimal(19, 12)) as binCol", tableName);
+ checkTableTypes(tableName,
+ ImmutableList.of(
+ Pair.of("decInt", BINARY),
+ Pair.of("decBigInt", BINARY),
+ Pair.of("binCol", BINARY)),
+ true);
+ testBuilder()
+ .sqlQuery("select * from dfs.tmp.%s", tableName)
+ .unOrdered()
+ .baselineColumns("decInt", "decBigInt", "binCol")
+ .baselineValues(new BigDecimal("123456.789"),
+ new BigDecimal("123456.789123456789"),
+ new BigDecimal("123456.789123456789"))
+ .go();
+ } finally {
+ resetSessionOption(ExecConstants.PARQUET_WRITER_USE_PRIMITIVE_TYPES_FOR_DECIMALS);
+ resetSessionOption(ExecConstants.PARQUET_WRITER_LOGICAL_TYPE_FOR_DECIMALS);
+ test("drop table if exists dfs.tmp.%s", tableName);
+ }
+ }
+
+ @Test
+ public void testWriteDecimalIntBigIntRepeated() throws Exception {
+ String tableName = "decimalIntBigIntRepeated";
+
+ JsonStringArrayList<BigDecimal> ints = new JsonStringArrayList<>();
+ ints.add(new BigDecimal("999999.999"));
+ ints.add(new BigDecimal("-999999.999"));
+ ints.add(new BigDecimal("0.000"));
+
+ JsonStringArrayList<BigDecimal> longs = new JsonStringArrayList<>();
+ longs.add(new BigDecimal("999999999.999999999"));
+ longs.add(new BigDecimal("-999999999.999999999"));
+ longs.add(new BigDecimal("0.000000000"));
+
+ JsonStringArrayList<BigDecimal> fixedLen = new JsonStringArrayList<>();
+ fixedLen.add(new BigDecimal("999999999999.999999"));
+ fixedLen.add(new BigDecimal("-999999999999.999999"));
+ fixedLen.add(new BigDecimal("0.000000"));
+
+ try {
+ alterSession(ExecConstants.PARQUET_WRITER_USE_PRIMITIVE_TYPES_FOR_DECIMALS, true);
+ alterSession(ExecConstants.PARQUET_WRITER_LOGICAL_TYPE_FOR_DECIMALS, FIXED_LEN_BYTE_ARRAY.name());
+ test(
+ "create table dfs.tmp.%s as\n" +
+ "select * from cp.`parquet/repeatedIntLondFixedLenBinaryDecimal.parquet`", tableName);
+ checkTableTypes(tableName,
+ ImmutableList.of(
+ Pair.of("decimal_int32", INT32),
+ Pair.of("decimal_int64", INT64),
+ Pair.of("decimal_fixedLen", INT64),
+ Pair.of("decimal_binary", INT64)),
+ true);
+ testBuilder()
+ .sqlQuery("select * from dfs.tmp.%s", tableName)
+ .unOrdered()
+ .baselineColumns("decimal_int32", "decimal_int64", "decimal_fixedLen", "decimal_binary")
+ .baselineValues(ints, longs, fixedLen, fixedLen)
+ .go();
+ } finally {
+ resetSessionOption(ExecConstants.PARQUET_WRITER_USE_PRIMITIVE_TYPES_FOR_DECIMALS);
+ resetSessionOption(ExecConstants.PARQUET_WRITER_LOGICAL_TYPE_FOR_DECIMALS);
+ test("drop table if exists dfs.tmp.%s", tableName);
+ }
+ }
+
+ @Test
+ public void testWriteDecimalFixedLenRepeated() throws Exception {
+ String tableName = "decimalFixedLenRepeated";
+
+ JsonStringArrayList<BigDecimal> ints = new JsonStringArrayList<>();
+ ints.add(new BigDecimal("999999.999"));
+ ints.add(new BigDecimal("-999999.999"));
+ ints.add(new BigDecimal("0.000"));
+
+ JsonStringArrayList<BigDecimal> longs = new JsonStringArrayList<>();
+ longs.add(new BigDecimal("999999999.999999999"));
+ longs.add(new BigDecimal("-999999999.999999999"));
+ longs.add(new BigDecimal("0.000000000"));
+
+ JsonStringArrayList<BigDecimal> fixedLen = new JsonStringArrayList<>();
+ fixedLen.add(new BigDecimal("999999999999.999999"));
+ fixedLen.add(new BigDecimal("-999999999999.999999"));
+ fixedLen.add(new BigDecimal("0.000000"));
+
+ try {
+ alterSession(ExecConstants.PARQUET_WRITER_USE_PRIMITIVE_TYPES_FOR_DECIMALS, false);
+ alterSession(ExecConstants.PARQUET_WRITER_LOGICAL_TYPE_FOR_DECIMALS, FIXED_LEN_BYTE_ARRAY.name());
+ test(
+ "create table dfs.tmp.%s as\n" +
+ "select * from cp.`parquet/repeatedIntLondFixedLenBinaryDecimal.parquet`", tableName);
+ checkTableTypes(tableName,
+ ImmutableList.of(
+ Pair.of("decimal_int32", FIXED_LEN_BYTE_ARRAY),
+ Pair.of("decimal_int64", FIXED_LEN_BYTE_ARRAY),
+ Pair.of("decimal_fixedLen", FIXED_LEN_BYTE_ARRAY),
+ Pair.of("decimal_binary", FIXED_LEN_BYTE_ARRAY)),
+ true);
+ testBuilder()
+ .sqlQuery("select * from dfs.tmp.%s", tableName)
+ .unOrdered()
+ .baselineColumns("decimal_int32", "decimal_int64", "decimal_fixedLen", "decimal_binary")
+ .baselineValues(ints, longs, fixedLen, fixedLen)
+ .go();
+ } finally {
+ resetSessionOption(ExecConstants.PARQUET_WRITER_USE_PRIMITIVE_TYPES_FOR_DECIMALS);
+ resetSessionOption(ExecConstants.PARQUET_WRITER_LOGICAL_TYPE_FOR_DECIMALS);
+ test("drop table if exists dfs.tmp.%s", tableName);
+ }
+ }
+
+ @Test
+ public void testWriteDecimalBinaryRepeated() throws Exception {
+ String tableName = "decimalBinaryRepeated";
+
+ JsonStringArrayList<BigDecimal> ints = new JsonStringArrayList<>();
+ ints.add(new BigDecimal("999999.999"));
+ ints.add(new BigDecimal("-999999.999"));
+ ints.add(new BigDecimal("0.000"));
+
+ JsonStringArrayList<BigDecimal> longs = new JsonStringArrayList<>();
+ longs.add(new BigDecimal("999999999.999999999"));
+ longs.add(new BigDecimal("-999999999.999999999"));
+ longs.add(new BigDecimal("0.000000000"));
+
+ JsonStringArrayList<BigDecimal> fixedLen = new JsonStringArrayList<>();
+ fixedLen.add(new BigDecimal("999999999999.999999"));
+ fixedLen.add(new BigDecimal("-999999999999.999999"));
+ fixedLen.add(new BigDecimal("0.000000"));
+ try {
+ alterSession(ExecConstants.PARQUET_WRITER_USE_PRIMITIVE_TYPES_FOR_DECIMALS, false);
+ alterSession(ExecConstants.PARQUET_WRITER_LOGICAL_TYPE_FOR_DECIMALS, BINARY.name());
+ test(
+ "create table dfs.tmp.%s as\n" +
+ "select * from cp.`parquet/repeatedIntLondFixedLenBinaryDecimal.parquet`", tableName);
+ checkTableTypes(tableName,
+ ImmutableList.of(
+ Pair.of("decimal_int32", BINARY),
+ Pair.of("decimal_int64", BINARY),
+ Pair.of("decimal_fixedLen", BINARY),
+ Pair.of("decimal_binary", BINARY)),
+ true);
+ testBuilder()
+ .sqlQuery("select * from dfs.tmp.%s", tableName)
+ .unOrdered()
+ .baselineColumns("decimal_int32", "decimal_int64", "decimal_fixedLen", "decimal_binary")
+ .baselineValues(ints, longs, fixedLen, fixedLen)
+ .go();
+ } finally {
+ resetSessionOption(ExecConstants.PARQUET_WRITER_USE_PRIMITIVE_TYPES_FOR_DECIMALS);
+ resetSessionOption(ExecConstants.PARQUET_WRITER_LOGICAL_TYPE_FOR_DECIMALS);
+ test("drop table if exists dfs.tmp.%s", tableName);
+ }
+ }
+
+ /**
+ * Checks that the specified Parquet table contains the specified columns with the specified types.
+ *
+ * @param tableName name of the table that should be checked.
+ * @param columnsToCheck pairs of column name and column type that should be checked in the table.
+ * @param isDecimalType whether the specified columns should be annotated as DECIMAL.
+ * @throws IOException if the table file was not found.
+ */
+ private void checkTableTypes(String tableName,
+ List<Pair<String, PrimitiveType.PrimitiveTypeName>> columnsToCheck,
+ boolean isDecimalType) throws IOException {
+ MessageType schema = ParquetFileReader.readFooter(
+ new Configuration(),
+ new Path(Paths.get(dirTestWatcher.getDfsTestTmpDir().getPath(), tableName, "0_0_0.parquet").toUri().getPath()),
+ NO_FILTER).getFileMetaData().getSchema();
+
+ for (Pair<String, PrimitiveType.PrimitiveTypeName> nameType : columnsToCheck) {
+ assertEquals(
+ String.format("Table %s does not contain column %s with type %s",
+ tableName, nameType.getKey(), nameType.getValue()),
+ nameType.getValue(),
+ schema.getType(nameType.getKey()).asPrimitiveType().getPrimitiveTypeName());
+
+ assertEquals(
+ String.format("Table %s %s column %s with DECIMAL type", tableName,
+ isDecimalType ? "does not contain" : "contains unexpected", nameType.getKey()),
+ isDecimalType, schema.getType(nameType.getKey()).getOriginalType() == OriginalType.DECIMAL);
+ }
+ }
+}