diff options
3 files changed, 218 insertions, 166 deletions
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/AvroRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/AvroRecordReader.java index 744f98297..89e220c26 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/AvroRecordReader.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/AvroRecordReader.java @@ -218,7 +218,7 @@ public class AvroRecordReader extends AbstractRecordReader { if (schema.getTypes().get(0).getType() != Schema.Type.NULL) { throw new UnsupportedOperationException("Avro union type must be of the format : [\"null\", \"some-type\"]"); } - process(value, schema.getTypes().get(1), fieldName, writer, fieldSelection.getChild(fieldName)); + process(value, schema.getTypes().get(1), fieldName, writer, fieldSelection); break; case MAP: @SuppressWarnings("unchecked") diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/avro/AvroFormatTest.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/avro/AvroFormatTest.java index b68f19ddc..af4d0e6a1 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/avro/AvroFormatTest.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/avro/AvroFormatTest.java @@ -17,13 +17,20 @@ */ package org.apache.drill.exec.store.avro; +import com.google.common.collect.Lists; import org.apache.drill.BaseTestQuery; import org.apache.drill.TestBuilder; import org.apache.drill.common.exceptions.UserException; import org.apache.drill.common.exceptions.UserRemoteException; +import org.apache.drill.exec.util.JsonStringHashMap; import org.junit.Assert; import org.junit.Test; +import java.util.List; +import java.util.Map; + +import static org.apache.drill.TestBuilder.listOf; + /** * Unit tests for Avro record reader. */ @@ -133,24 +140,37 @@ public class AvroFormatTest extends BaseTestQuery { @Test public void testSimplePrimitiveSchema_StarQuery() throws Exception { + simpleAvroTestHelper(AvroTestUtil.generateSimplePrimitiveSchema_NoNullValues(), "select * from dfs_test.`%s`"); + } - final AvroTestUtil.AvroTestRecordWriter testSetup = AvroTestUtil.generateSimplePrimitiveSchema_NoNullValues(); - final String file = testSetup.getFilePath(); - final String sql = "select * from dfs_test.`" + file + "`"; - test(sql); - testBuilder() - .sqlQuery(sql) - .unOrdered() - .baselineRecords(testSetup.getExpectedRecords()) - .go(); + private List<Map<String, Object>> project( + List<Map<String,Object>> incomingRecords, + List<String> projectCols) { + List<Map<String,Object>> output = Lists.newArrayList(); + for (Map<String, Object> incomingRecord : incomingRecords) { + final JsonStringHashMap<String, Object> newRecord = new JsonStringHashMap<>(); + for (String s : incomingRecord.keySet()) { + if (projectCols.contains(s)) { + newRecord.put(s, incomingRecord.get(s)); + } + } + output.add(newRecord); + } + return output; } @Test public void testSimplePrimitiveSchema_SelectColumnSubset() throws Exception { - final String file = AvroTestUtil.generateSimplePrimitiveSchema_NoNullValues().getFilePath(); + final AvroTestUtil.AvroTestRecordWriter testSetup = AvroTestUtil.generateSimplePrimitiveSchema_NoNullValues(); + final String file = testSetup.getFilePath(); final String sql = "select h_boolean, e_double from dfs_test.`" + file + "`"; - test(sql); + List<String> projectList = Lists.newArrayList("`h_boolean`", "`e_double`"); + testBuilder() + .sqlQuery(sql) + .unOrdered() + .baselineRecords(project(testSetup.getExpectedRecords(), projectList)) + .go(); } @Test @@ -182,8 +202,7 @@ public class AvroFormatTest extends BaseTestQuery { @Test public void testSimpleArraySchema_NoNullValues() throws Exception { - - final String file = AvroTestUtil.generateSimpleArraySchema_NoNullValues(); + final String file = AvroTestUtil.generateSimpleArraySchema_NoNullValues().getFilePath(); final String sql = "select a_string, c_string_array[0], e_float_array[2] " + "from dfs_test.`" + file + "`"; test(sql); @@ -191,16 +210,12 @@ public class AvroFormatTest extends BaseTestQuery { @Test public void testSimpleArraySchema_StarQuery() throws Exception { - - final String file = AvroTestUtil.generateSimpleArraySchema_NoNullValues(); - final String sql = "select * from dfs_test.`" + file + "`"; - test(sql); + simpleAvroTestHelper(AvroTestUtil.generateSimpleArraySchema_NoNullValues(), "select * from dfs_test.`%s`"); } @Test public void testDoubleNestedSchema_NoNullValues_NotAllColumnsProjected() throws Exception { - - final String file = AvroTestUtil.generateDoubleNestedSchema_NoNullValues(); + final String file = AvroTestUtil.generateDoubleNestedSchema_NoNullValues().getFilePath(); final String sql = "select t.c_record.nested_1_int, " + "t.c_record.nested_1_record.double_nested_1_int " + "from dfs_test.`" + file + "` t"; @@ -210,67 +225,89 @@ public class AvroFormatTest extends BaseTestQuery { @Test public void testSimpleNestedSchema_NoNullValues() throws Exception { - final String file = AvroTestUtil.generateSimpleNestedSchema_NoNullValues(); + final AvroTestUtil.AvroTestRecordWriter testSetup = AvroTestUtil.generateSimpleNestedSchema_NoNullValues(); + final String file = testSetup.getFilePath(); final String sql = "select a_string, b_int, t.c_record.nested_1_string, t.c_record.nested_1_int " + - "from dfs_test.`" + file + "` t"; + "from dfs_test.`" + file + "` t"; test(sql); } @Test public void testSimpleNestedSchema_StarQuery() throws Exception { - final String file = AvroTestUtil.generateSimpleNestedSchema_NoNullValues(); + final AvroTestUtil.AvroTestRecordWriter testSetup = AvroTestUtil.generateSimpleNestedSchema_NoNullValues(); + final String file = testSetup.getFilePath(); final String sql = "select * from dfs_test.`" + file + "`"; - test(sql); + testBuilder() + .sqlQuery(sql) + .unOrdered() + .baselineRecords(testSetup.getExpectedRecords()) + .go(); } - @Test public void testDoubleNestedSchema_NoNullValues() throws Exception { - - final String file = AvroTestUtil.generateDoubleNestedSchema_NoNullValues(); + final String file = AvroTestUtil.generateDoubleNestedSchema_NoNullValues().getFilePath(); final String sql = "select a_string, b_int, t.c_record.nested_1_string, t.c_record.nested_1_int, " + "t.c_record.nested_1_record.double_nested_1_string, " + "t.c_record.nested_1_record.double_nested_1_int " + "from dfs_test.`" + file + "` t"; test(sql); + + final String sql2 = "select t.c_record.nested_1_string " + + "from dfs_test.`" + file + "` t limit 1"; + TestBuilder testBuilder = testBuilder() + .sqlQuery(sql2) + .unOrdered() + .baselineColumns("EXPR$0"); + for (int i = 0; i < 1; i++) { + testBuilder + .baselineValues("nested_1_string_" + i); + } + testBuilder.go(); } @Test public void testDoubleNestedSchema_StarQuery() throws Exception { + simpleAvroTestHelper(AvroTestUtil.generateDoubleNestedSchema_NoNullValues(), "select * from dfs_test.`%s`"); + } - final String file = AvroTestUtil.generateDoubleNestedSchema_NoNullValues(); - final String sql = "select * from dfs_test.`" + file + "`"; - test(sql); + private static void simpleAvroTestHelper(AvroTestUtil.AvroTestRecordWriter testSetup, final String sql) throws Exception { + final String file = testSetup.getFilePath(); + final String sqlWithTable = String.format(sql, file); + testBuilder() + .sqlQuery(sqlWithTable) + .unOrdered() + .baselineRecords(testSetup.getExpectedRecords()) + .go(); } @Test public void testSimpleEnumSchema_NoNullValues() throws Exception { - - final String file = AvroTestUtil.generateSimpleEnumSchema_NoNullValues(); + final AvroTestUtil.AvroTestRecordWriter testSetup = AvroTestUtil.generateSimpleEnumSchema_NoNullValues(); + final String file = testSetup.getFilePath(); final String sql = "select a_string, b_enum from dfs_test.`" + file + "`"; - test(sql); + List<String> projectList = Lists.newArrayList("`a_string`", "`b_enum`"); + testBuilder() + .sqlQuery(sql) + .unOrdered() + .baselineRecords(project(testSetup.getExpectedRecords(), projectList)) + .go(); } @Test public void testSimpleEnumSchema_StarQuery() throws Exception { - - final String file = AvroTestUtil.generateSimpleEnumSchema_NoNullValues(); - final String sql = "select * from dfs_test.`" + file + "`"; - test(sql); + simpleAvroTestHelper(AvroTestUtil.generateSimpleEnumSchema_NoNullValues(), "select * from dfs_test.`%s`"); } @Test public void testSimpleUnionSchema_StarQuery() throws Exception { - - final String file = AvroTestUtil.generateUnionSchema_WithNullValues(); - final String sql = "select * from dfs_test.`" + file + "`"; - test(sql); + simpleAvroTestHelper(AvroTestUtil.generateUnionSchema_WithNullValues(), "select * from dfs_test.`%s`"); } @Test public void testShouldFailSimpleUnionNonNullSchema_StarQuery() throws Exception { - final String file = AvroTestUtil.generateUnionSchema_WithNonNullValues(); + final String file = AvroTestUtil.generateUnionSchema_WithNonNullValues().getFilePath(); final String sql = "select * from dfs_test.`" + file + "`"; try { test(sql); @@ -284,7 +321,7 @@ public class AvroFormatTest extends BaseTestQuery { @Test public void testNestedUnionSchema_withNullValues() throws Exception { - final String file = AvroTestUtil.generateUnionNestedSchema_withNullValues(); + final String file = AvroTestUtil.generateUnionNestedSchema_withNullValues().getFilePath(); final String sql = "select t.c_record.nested_1_string,t.c_record.nested_1_int from dfs_test.`" + file + "` t"; test(sql); } @@ -292,7 +329,7 @@ public class AvroFormatTest extends BaseTestQuery { @Test public void testNestedUnionArraySchema_withNullValues() throws Exception { - final String file = AvroTestUtil.generateUnionNestedArraySchema_withNullValues(); + final String file = AvroTestUtil.generateUnionNestedArraySchema_withNullValues().getFilePath(); final String sql = "select t.c_array[0].nested_1_string,t.c_array[0].nested_1_int from dfs_test.`" + file + "` t"; test(sql); } @@ -300,7 +337,7 @@ public class AvroFormatTest extends BaseTestQuery { @Test public void testMapSchema_withNullValues() throws Exception { - final String file = AvroTestUtil.generateMapSchema_withNullValues(); + final String file = AvroTestUtil.generateMapSchema_withNullValues().getFilePath(); final String sql = "select c_map['key1'],c_map['key2'] from dfs_test.`" + file + "`"; test(sql); } @@ -308,22 +345,33 @@ public class AvroFormatTest extends BaseTestQuery { @Test public void testMapSchemaComplex_withNullValues() throws Exception { - final String file = AvroTestUtil.generateMapSchemaComplex_withNullValues(); - final String sql = "select d_map['key1'],d_map['key2'] from dfs_test.`" + file + "`"; - test(sql); + final String file = AvroTestUtil.generateMapSchemaComplex_withNullValues().getFilePath(); + final String sql = "select d_map['key1'] nested_key1, d_map['key2'] nested_key2 from dfs_test.`" + file + "`"; + + TestBuilder testBuilder = testBuilder() + .sqlQuery(sql) + .unOrdered() + .baselineColumns("nested_key1", "nested_key2"); + + final List<Object> expectedList = Lists.newArrayList(); + for (int i = 0; i < AvroTestUtil.ARRAY_SIZE; i++) { + expectedList.add((double)i); + } + final List<Object> emptyList = listOf(); + for (int i = 0; i < AvroTestUtil.RECORD_COUNT; i += 2) { + testBuilder.baselineValues(expectedList, expectedList); + testBuilder.baselineValues(emptyList, emptyList); + } + testBuilder.go(); } @Test public void testStringAndUtf8Data() throws Exception { - - final String file = AvroTestUtil.generateStringAndUtf8Data(); - final String sql = "select * from dfs_test.`" + file + "`"; - test(sql); + simpleAvroTestHelper(AvroTestUtil.generateStringAndUtf8Data(), "select * from dfs_test.`%s`"); } @Test public void testLinkedList() throws Exception { - final String file = AvroTestUtil.generateLinkedList(); final String sql = "select * from dfs_test.`" + file + "`"; test(sql); @@ -331,10 +379,14 @@ public class AvroFormatTest extends BaseTestQuery { @Test public void testCountStar() throws Exception { - - final String file = AvroTestUtil.generateStringAndUtf8Data(); - final String sql = "select count(*) from dfs_test.`" + file + "`"; - test(sql); + final String file = AvroTestUtil.generateStringAndUtf8Data().getFilePath(); + final String sql = "select count(*) as row_count from dfs_test.`" + file + "`"; + testBuilder() + .sqlQuery(sql) + .ordered() + .baselineColumns("row_count") + .baselineValues((long)AvroTestUtil.RECORD_COUNT) + .go(); } } diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/avro/AvroTestUtil.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/avro/AvroTestUtil.java index ce3f90ade..96508d826 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/avro/AvroTestUtil.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/avro/AvroTestUtil.java @@ -25,6 +25,7 @@ import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.TreeMap; import org.apache.avro.Schema; import org.apache.avro.Schema.Type; @@ -36,23 +37,17 @@ import org.apache.avro.generic.GenericDatumWriter; import org.apache.avro.generic.GenericRecord; import com.google.common.base.Charsets; +import org.apache.drill.exec.util.JsonStringArrayList; +import org.apache.drill.exec.util.JsonStringHashMap; +import org.apache.drill.exec.util.Text; /** * Utilities for generating Avro test data. */ public class AvroTestUtil { - public static final int RECORD_COUNT = 10; - - public static class AvroTestSetup { - private String filePath; - private List<Map<String, ?>> expectedRecords; - - public AvroTestSetup(String filePath, List<Map<String, ?>> expectedRecords) { - this.filePath = filePath; - this.expectedRecords = expectedRecords; - } - } + public static final int RECORD_COUNT = 10_000; + public static int ARRAY_SIZE = 20; /** * Class to write records to an Avro file while simultaneously @@ -62,7 +57,7 @@ public class AvroTestUtil { public static class AvroTestRecordWriter implements Closeable { private final List<Map<String, Object>> expectedRecords; GenericData.Record currentAvroRecord; - Map<String, Object> currentExpectedRecord; + TreeMap<String, Object> currentExpectedRecord; private Schema schema; private final DataFileWriter<GenericData.Record> writer; private final String filePath; @@ -75,28 +70,53 @@ public class AvroTestUtil { throw new RuntimeException("Error creating file in Avro test setup.", e); } this.schema = schema; - currentExpectedRecord = new HashMap<>(); + currentExpectedRecord = new TreeMap<>(); expectedRecords = new ArrayList<>(); filePath = file.getAbsolutePath(); } public void startRecord() { currentAvroRecord = new GenericData.Record(schema); - currentExpectedRecord = new HashMap<>(); + currentExpectedRecord = new TreeMap<>(); } public void put(String key, Object value) { currentAvroRecord.put(key, value); // convert binary values into byte[], the format they will be given // in the Drill result set in the test framework + currentExpectedRecord.put("`" + key + "`", convertAvroValToDrill(value, true)); + } + + // TODO - fix this the test wrapper to prevent the need for this hack + // to make the root behave differently than nested fields for String vs. Text + private Object convertAvroValToDrill(Object value, boolean root) { if (value instanceof ByteBuffer) { ByteBuffer bb = ((ByteBuffer)value); byte[] drillVal = new byte[((ByteBuffer)value).remaining()]; bb.get(drillVal); bb.position(0); value = drillVal; + } else if (!root && value instanceof CharSequence) { + value = new Text(value.toString()); + } else if (value instanceof GenericData.Array) { + GenericData.Array array = ((GenericData.Array) value); + final JsonStringArrayList<Object> drillList = new JsonStringArrayList<>(); + for (Object o : array) { + drillList.add(convertAvroValToDrill(o, false)); + } + value = drillList; + } else if (value instanceof GenericData.EnumSymbol) { + value = value.toString(); + } else if (value instanceof GenericData.Record) { + GenericData.Record rec = ((GenericData.Record) value); + final JsonStringHashMap<String, Object> newRecord = new JsonStringHashMap<>(); + for (Schema.Field field : rec.getSchema().getFields()) { + Object val = rec.get(field.name()); + newRecord.put(field.name(), convertAvroValToDrill(val, false)); + } + value = newRecord; } - currentExpectedRecord.put("`" + key + "`", value); + return value; } public void endRecord() throws IOException { @@ -167,7 +187,7 @@ public class AvroTestUtil { return record; } - public static String generateUnionSchema_WithNullValues() throws Exception { + public static AvroTestRecordWriter generateUnionSchema_WithNullValues() throws Exception { final Schema schema = SchemaBuilder.record("AvroRecordReaderTest") .namespace("org.apache.drill.exec.store.avro") @@ -186,15 +206,14 @@ public class AvroTestUtil { final File file = File.createTempFile("avro-primitive-test", ".avro"); file.deleteOnExit(); - final DataFileWriter<GenericRecord> writer = new DataFileWriter<GenericRecord>(new GenericDatumWriter<GenericRecord>(schema)); + final AvroTestRecordWriter record = new AvroTestRecordWriter(schema, file); try { - writer.create(schema, file); ByteBuffer bb = ByteBuffer.allocate(1); bb.put(0, (byte) 1); for (int i = 0; i < RECORD_COUNT; i++) { - final GenericRecord record = new GenericData.Record(schema); + record.startRecord(); record.put("a_string", "a_" + i); record.put("b_int", i); record.put("c_long", (long) i); @@ -204,16 +223,16 @@ public class AvroTestUtil { record.put("g_null", null); record.put("h_boolean", (i % 2 == 0)); record.put("i_union", (i % 2 == 0 ? (double) i : null)); - writer.append(record); + record.endRecord(); } } finally { - writer.close(); + record.close(); } - return file.getAbsolutePath(); + return record; } - public static String generateUnionSchema_WithNonNullValues() throws Exception { + public static AvroTestRecordWriter generateUnionSchema_WithNonNullValues() throws Exception { final Schema schema = SchemaBuilder.record("AvroRecordReaderTest") .namespace("org.apache.drill.exec.store.avro") @@ -232,15 +251,14 @@ public class AvroTestUtil { final File file = File.createTempFile("avro-primitive-test", ".avro"); file.deleteOnExit(); - final DataFileWriter<GenericRecord> writer = new DataFileWriter<GenericRecord>(new GenericDatumWriter<GenericRecord>(schema)); + final AvroTestRecordWriter record = new AvroTestRecordWriter(schema, file); try { - writer.create(schema, file); ByteBuffer bb = ByteBuffer.allocate(1); bb.put(0, (byte) 1); for (int i = 0; i < RECORD_COUNT; i++) { - final GenericRecord record = new GenericData.Record(schema); + record.startRecord(); record.put("a_string", "a_" + i); record.put("b_int", i); record.put("c_long", (long) i); @@ -250,16 +268,16 @@ public class AvroTestUtil { record.put("g_null", null); record.put("h_boolean", (i % 2 == 0)); record.put("i_union", (i % 2 == 0 ? (double) i : (long) i)); - writer.append(record); + record.endRecord(); } } finally { - writer.close(); + record.close(); } - return file.getAbsolutePath(); + return record; } - public static String generateSimpleEnumSchema_NoNullValues() throws Exception { + public static AvroTestRecordWriter generateSimpleEnumSchema_NoNullValues() throws Exception { final String[] symbols = { "E_SYM_A", "E_SYM_B", "E_SYM_C", "E_SYM_D" }; @@ -275,27 +293,25 @@ public class AvroTestUtil { final Schema enumSchema = schema.getField("b_enum").schema(); - final DataFileWriter<GenericRecord> writer = new DataFileWriter<GenericRecord>(new GenericDatumWriter<GenericRecord>(schema)); - + final AvroTestRecordWriter record = new AvroTestRecordWriter(schema, file); try { - writer.create(schema, file); for (int i = 0; i < RECORD_COUNT; i++) { - final GenericRecord record = new GenericData.Record(schema); + record.startRecord(); record.put("a_string", "a_" + i); final GenericData.EnumSymbol symbol = new GenericData.EnumSymbol(enumSchema, symbols[(i + symbols.length) % symbols.length]); record.put("b_enum", symbol); - writer.append(record); + record.endRecord(); } } finally { - writer.close(); + record.close(); } - return file.getAbsolutePath(); + return record; } - public static String generateSimpleArraySchema_NoNullValues() throws Exception { + public static AvroTestRecordWriter generateSimpleArraySchema_NoNullValues() throws Exception { final File file = File.createTempFile("avro-array-test", ".avro"); file.deleteOnExit(); @@ -310,46 +326,43 @@ public class AvroTestUtil { .name("e_float_array").type().array().items().floatType().noDefault() .endRecord(); - final DataFileWriter<GenericRecord> writer = new DataFileWriter<GenericRecord>(new GenericDatumWriter<GenericRecord>(schema)); + final AvroTestRecordWriter record = new AvroTestRecordWriter(schema, file); try { - writer.create(schema, file); - for (int i = 0; i < RECORD_COUNT; i++) { - final GenericRecord record = new GenericData.Record(schema); + record.startRecord(); record.put("a_string", "a_" + i); record.put("b_int", i); { - GenericArray<String> array = new GenericData.Array<>(RECORD_COUNT, schema.getField("c_string_array").schema()); - for (int j = 0; j < RECORD_COUNT; j++) { + GenericArray<String> array = new GenericData.Array<>(ARRAY_SIZE, schema.getField("c_string_array").schema()); + for (int j = 0; j < ARRAY_SIZE; j++) { array.add(j, "c_string_array_" + i + "_" + j); } record.put("c_string_array", array); } { - GenericArray<Integer> array = new GenericData.Array<>(RECORD_COUNT, schema.getField("d_int_array").schema()); - for (int j = 0; j < RECORD_COUNT; j++) { + GenericArray<Integer> array = new GenericData.Array<>(ARRAY_SIZE, schema.getField("d_int_array").schema()); + for (int j = 0; j < ARRAY_SIZE; j++) { array.add(j, i * j); } record.put("d_int_array", array); } { - GenericArray<Float> array = new GenericData.Array<>(RECORD_COUNT, schema.getField("e_float_array").schema()); - for (int j = 0; j < RECORD_COUNT; j++) { + GenericArray<Float> array = new GenericData.Array<>(ARRAY_SIZE, schema.getField("e_float_array").schema()); + for (int j = 0; j < ARRAY_SIZE; j++) { array.add(j, (float) (i * j)); } record.put("e_float_array", array); } - - writer.append(record); + record.endRecord(); } } finally { - writer.close(); + record.close(); } - return file.getAbsolutePath(); + return record; } - public static String generateSimpleNestedSchema_NoNullValues() throws Exception { + public static AvroTestRecordWriter generateSimpleNestedSchema_NoNullValues() throws Exception { final File file = File.createTempFile("avro-nested-test", ".avro"); file.deleteOnExit(); @@ -370,12 +383,10 @@ public class AvroTestUtil { final Schema nestedSchema = schema.getField("c_record").schema(); - final DataFileWriter<GenericRecord> writer = new DataFileWriter<GenericRecord>(new GenericDatumWriter<GenericRecord>(schema)); - writer.create(schema, file); - + final AvroTestRecordWriter record = new AvroTestRecordWriter(schema, file); try { for (int i = 0; i < RECORD_COUNT; i++) { - final GenericRecord record = new GenericData.Record(schema); + record.startRecord(); record.put("a_string", "a_" + i); record.put("b_int", i); @@ -384,16 +395,16 @@ public class AvroTestUtil { nestedRecord.put("nested_1_int", i * i); record.put("c_record", nestedRecord); - writer.append(record); + record.endRecord(); } } finally { - writer.close(); + record.close(); } - return file.getAbsolutePath(); + return record; } - public static String generateUnionNestedArraySchema_withNullValues() throws Exception { + public static AvroTestRecordWriter generateUnionNestedArraySchema_withNullValues() throws Exception { final File file = File.createTempFile("avro-nested-test", ".avro"); file.deleteOnExit(); @@ -414,12 +425,10 @@ public class AvroTestUtil { final Schema arraySchema = nestedSchema.getTypes().get(1); final Schema itemSchema = arraySchema.getElementType(); - final DataFileWriter<GenericRecord> writer = new DataFileWriter<GenericRecord>(new GenericDatumWriter<GenericRecord>(schema)); - writer.create(schema, file); - + final AvroTestRecordWriter record = new AvroTestRecordWriter(schema, file); try { for (int i = 0; i < RECORD_COUNT; i++) { - final GenericRecord record = new GenericData.Record(schema); + record.startRecord(); record.put("a_string", "a_" + i); record.put("b_int", i); @@ -431,16 +440,16 @@ public class AvroTestUtil { array.add(nestedRecord); record.put("c_array", array); } - writer.append(record); + record.endRecord(); } } finally { - writer.close(); + record.close(); } - return file.getAbsolutePath(); + return record; } - public static String generateMapSchema_withNullValues() throws Exception { + public static AvroTestRecordWriter generateMapSchema_withNullValues() throws Exception { final File file = File.createTempFile("avro-nested-test", ".avro"); file.deleteOnExit(); @@ -453,12 +462,10 @@ public class AvroTestUtil { .name("c_map").type().optional().map().values(Schema.create(Type.STRING)) .endRecord(); - final DataFileWriter<GenericRecord> writer = new DataFileWriter<GenericRecord>(new GenericDatumWriter<GenericRecord>(schema)); - writer.create(schema, file); - + final AvroTestRecordWriter record = new AvroTestRecordWriter(schema, file); try { for (int i = 0; i < RECORD_COUNT; i++) { - final GenericRecord record = new GenericData.Record(schema); + record.startRecord(); record.put("a_string", "a_" + i); record.put("b_int", i); @@ -468,16 +475,16 @@ public class AvroTestUtil { strMap.put("key2", "nested_1_string_" + (i + 1 )); record.put("c_map", strMap); } - writer.append(record); + record.endRecord(); } } finally { - writer.close(); + record.close(); } - return file.getAbsolutePath(); + return record; } - public static String generateMapSchemaComplex_withNullValues() throws Exception { + public static AvroTestRecordWriter generateMapSchemaComplex_withNullValues() throws Exception { final File file = File.createTempFile("avro-nested-test", ".avro"); file.deleteOnExit(); @@ -494,12 +501,10 @@ public class AvroTestUtil { final Schema arrayMapSchema = schema.getField("d_map").schema(); final Schema arrayItemSchema = arrayMapSchema.getTypes().get(1).getValueType(); - final DataFileWriter<GenericRecord> writer = new DataFileWriter<GenericRecord>(new GenericDatumWriter<GenericRecord>(schema)); - writer.create(schema, file); - + final AvroTestRecordWriter record = new AvroTestRecordWriter(schema, file); try { for (int i = 0; i < RECORD_COUNT; i++) { - final GenericRecord record = new GenericData.Record(schema); + record.startRecord(); record.put("a_string", "a_" + i); record.put("b_int", i); @@ -510,8 +515,8 @@ public class AvroTestUtil { record.put("c_map", c_map); } else { Map<String, GenericArray<Double>> d_map = new HashMap<>(); - GenericArray<Double> array = new GenericData.Array<>(RECORD_COUNT, arrayItemSchema); - for (int j = 0; j < RECORD_COUNT; j++) { + GenericArray<Double> array = new GenericData.Array<>(ARRAY_SIZE, arrayItemSchema); + for (int j = 0; j < ARRAY_SIZE; j++) { array.add((double)j); } d_map.put("key1", array); @@ -519,16 +524,16 @@ public class AvroTestUtil { record.put("d_map", d_map); } - writer.append(record); + record.endRecord(); } } finally { - writer.close(); + record.close(); } - return file.getAbsolutePath(); + return record; } - public static String generateUnionNestedSchema_withNullValues() throws Exception { + public static AvroTestRecordWriter generateUnionNestedSchema_withNullValues() throws Exception { final File file = File.createTempFile("avro-nested-test", ".avro"); file.deleteOnExit(); @@ -548,12 +553,10 @@ public class AvroTestUtil { final Schema nestedSchema = schema.getField("c_record").schema(); final Schema optionalSchema = nestedSchema.getTypes().get(1); - final DataFileWriter<GenericRecord> writer = new DataFileWriter<GenericRecord>(new GenericDatumWriter<GenericRecord>(schema)); - writer.create(schema, file); - + final AvroTestRecordWriter record = new AvroTestRecordWriter(schema, file); try { for (int i = 0; i < RECORD_COUNT; i++) { - final GenericRecord record = new GenericData.Record(schema); + record.startRecord(); record.put("a_string", "a_" + i); record.put("b_int", i); @@ -563,16 +566,16 @@ public class AvroTestUtil { nestedRecord.put("nested_1_int", i * i); record.put("c_record", nestedRecord); } - writer.append(record); + record.endRecord(); } } finally { - writer.close(); + record.close(); } - return file.getAbsolutePath(); + return record; } - public static String generateDoubleNestedSchema_NoNullValues() throws Exception { + public static AvroTestRecordWriter generateDoubleNestedSchema_NoNullValues() throws Exception { final File file = File.createTempFile("avro-double-nested-test", ".avro"); file.deleteOnExit(); @@ -601,12 +604,10 @@ public class AvroTestUtil { final Schema nestedSchema = schema.getField("c_record").schema(); final Schema doubleNestedSchema = nestedSchema.getField("nested_1_record").schema(); - final DataFileWriter<GenericRecord> writer = new DataFileWriter<GenericRecord>(new GenericDatumWriter<GenericRecord>(schema)); - writer.create(schema, file); - + final AvroTestRecordWriter record = new AvroTestRecordWriter(schema, file); try { for (int i = 0; i < RECORD_COUNT; i++) { - final GenericRecord record = new GenericData.Record(schema); + record.startRecord(); record.put("a_string", "a_" + i); record.put("b_int", i); @@ -621,13 +622,13 @@ public class AvroTestUtil { nestedRecord.put("nested_1_record", doubleNestedRecord); record.put("c_record", nestedRecord); - writer.append(record); + record.endRecord(); } } finally { - writer.close(); + record.close(); } - return file.getAbsolutePath(); + return record; } public static String generateLinkedList() throws Exception { @@ -665,7 +666,7 @@ public class AvroTestUtil { return file.getAbsolutePath(); } - public static String generateStringAndUtf8Data() throws Exception { + public static AvroTestRecordWriter generateStringAndUtf8Data() throws Exception { final Schema schema = SchemaBuilder.record("AvroRecordReaderTest") .namespace("org.apache.drill.exec.store.avro") @@ -677,23 +678,22 @@ public class AvroTestUtil { final File file = File.createTempFile("avro-primitive-test", ".avro"); file.deleteOnExit(); - final DataFileWriter<GenericRecord> writer = new DataFileWriter<GenericRecord>(new GenericDatumWriter<GenericRecord>(schema)); + final AvroTestRecordWriter record = new AvroTestRecordWriter(schema, file); try { - writer.create(schema, file); ByteBuffer bb = ByteBuffer.allocate(1); bb.put(0, (byte) 1); for (int i = 0; i < RECORD_COUNT; i++) { - final GenericRecord record = new GenericData.Record(schema); + record.startRecord(); record.put("a_string", "a_" + i); record.put("b_utf8", "b_" + i); - writer.append(record); + record.endRecord(); } } finally { - writer.close(); + record.close(); } - return file.getAbsolutePath(); + return record; } } |