author     Jason Altekruse <altekrusejason@gmail.com>    2014-09-05 15:58:47 -0700
committer  Aditya Kishore <aditya@maprtech.com>          2014-09-11 19:24:08 -0700
commit     20d2aa46e2b789d5fe09b1383ec559b4aa8f5316 (patch)
tree       ed68e16c9227b4dc0c221751054688eaa0bf3590 /exec
parent     00431d2c87040e8f5e79ab09753c20e73b56744e (diff)
DRILL-1389: Incorrect results when reading nullable data out of parquet
Fixed numerous issues in the nullable reader for parquet and removed cruft left over from previous attempts that did not properly address the root causes; the reader is now simpler and cleaner overall. Verified against a variety of existing and new files with complete result checks, comparing against Steven's parquet reader that uses the higher-level interface. Wrote new test tooling to allow comparisons of large result sets. Re-enabled the fixed binary support that was accidentally removed in some of the last changes before 0.5. Fixed a bug where the column data reader was initialized to read the uncompressed size of the column chunk rather than the on-disk size.
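The new test tooling mentioned above works by toggling the `store.parquet.use_new_reader` session option, running the same query through both Parquet readers, and comparing the fully materialized results. As a hedged usage illustration only (the file path here is hypothetical; the real tests below point at files such as /tmp/parquet_with_nulls_should_sum_100000.parquet), a test built on the compareParquetReadersColumnar helper added to TestParquetWriter in this patch would look like:

// Illustration only, assuming this lives in TestParquetWriter (see the diff below),
// where compareParquetReadersColumnar runs the query once with
// store.parquet.use_new_reader = true and once with it = false, then compares
// the merged vector contents column by column.
@Test
public void compareReadersOnNullableFile() throws Exception {
  compareParquetReadersColumnar("*", "dfs.`/tmp/nullable_example.parquet`");
}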
Diffstat (limited to 'exec')
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ColumnReaderFactory.java  |  17
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/NullableColumnReader.java  |  81
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/NullableFixedByteAlignedReaders.java  |  14
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/PageReader.java  |   2
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java  |   2
-rw-r--r--  exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestParquetWriter.java  | 427
-rw-r--r--  exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetRecordReaderTest.java  |  56
-rw-r--r--  exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetResultListener.java  | 179
8 files changed, 591 insertions, 187 deletions
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ColumnReaderFactory.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ColumnReaderFactory.java
index 3d36b64ad..a2a191dfe 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ColumnReaderFactory.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ColumnReaderFactory.java
@@ -37,6 +37,7 @@ import org.apache.drill.exec.vector.NullableVarCharVector;
import org.apache.drill.exec.vector.ValueVector;
import org.apache.drill.exec.vector.VarBinaryVector;
import org.apache.drill.exec.vector.VarCharVector;
+import org.apache.drill.exec.vector.VariableWidthVector;
import parquet.column.ColumnDescriptor;
import parquet.column.Encoding;
import parquet.format.ConvertedType;
@@ -65,12 +66,16 @@ public class ColumnReaderFactory {
if (columnChunkMetaData.getType() == PrimitiveType.PrimitiveTypeName.BOOLEAN){
return new BitReader(recordReader, allocateSize, descriptor, columnChunkMetaData,
fixedLength, v, schemaElement);
- } else if (columnChunkMetaData.getType() == PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY && convertedType == ConvertedType.DECIMAL){
- int length = schemaElement.type_length;
- if (length <= 12) {
- return new FixedByteAlignedReader.Decimal28Reader(recordReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
- } else if (length <= 16) {
- return new FixedByteAlignedReader.Decimal38Reader(recordReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
+ } else if (columnChunkMetaData.getType() == PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY ) {
+ if (convertedType == ConvertedType.DECIMAL){
+ int length = schemaElement.type_length;
+ if (length <= 12) {
+ return new FixedByteAlignedReader.Decimal28Reader(recordReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
+ } else if (length <= 16) {
+ return new FixedByteAlignedReader.Decimal38Reader(recordReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
+ }
+ } else {
+ return new FixedByteAlignedReader.FixedBinaryReader(recordReader, allocateSize, descriptor, columnChunkMetaData, (VariableWidthVector) v, schemaElement);
}
} else if (columnChunkMetaData.getType() == PrimitiveType.PrimitiveTypeName.INT32 && convertedType == ConvertedType.DATE){
return new FixedByteAlignedReader.DateReader(recordReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/NullableColumnReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/NullableColumnReader.java
index 2babc2066..8551ee0d0 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/NullableColumnReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/NullableColumnReader.java
@@ -36,30 +36,26 @@ abstract class NullableColumnReader<V extends ValueVector> extends ColumnReader<
int bitsUsed;
BaseValueVector castedBaseVector;
NullableVectorDefinitionSetter castedVectorMutator;
+ long definitionLevelsRead;
+ long totalDefinitionLevelsRead;
NullableColumnReader(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor, ColumnChunkMetaData columnChunkMetaData,
boolean fixedLength, V v, SchemaElement schemaElement) throws ExecutionSetupException {
super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
castedBaseVector = (BaseValueVector) v;
castedVectorMutator = (NullableVectorDefinitionSetter) v.getMutator();
+ totalDefinitionLevelsRead = 0;
}
+
public void processPages(long recordsToReadInThisPass) throws IOException {
+ int indexInOutputVector = 0;
readStartInBytes = 0;
readLength = 0;
readLengthInBits = 0;
recordsReadInThisIteration = 0;
vectorData = castedBaseVector.getData();
- do {
- // if no page has been read, or all of the records have been read out of a page, read the next one
- if (pageReader.currentPage == null
- || pageReader.valuesRead == pageReader.currentPage.getValueCount()) {
- if (!pageReader.next()) {
- break;
- }
- }
-
// values need to be spaced out where nulls appear in the column
// leaving blank space for nulls allows for random access to values
// to optimize copying data out of the buffered disk stream, runs of defined values
@@ -68,48 +64,54 @@ abstract class NullableColumnReader<V extends ValueVector> extends ColumnReader<
long runStart = pageReader.readPosInBytes;
int runLength;
int currentDefinitionLevel;
- int currentValueIndexInVector = (int) recordsReadInThisIteration;
boolean lastValueWasNull;
- int definitionLevelsRead;
- // loop to find the longest run of defined values available, can be preceded by several nulls
- while (true){
- definitionLevelsRead = 0;
+ boolean lastRunBrokenByNull = false;
+ while (indexInOutputVector < recordsToReadInThisPass && indexInOutputVector < valueVec.getValueCapacity()){
+ // read a page if needed
+ if ( pageReader.currentPage == null
+ || ((readStartInBytes + readLength >= pageReader.byteLength && bitsUsed == 0) &&
+ definitionLevelsRead >= pageReader.currentPage.getValueCount())) {
+ if (!pageReader.next()) {
+ break;
+ }
+ definitionLevelsRead = 0;
+ }
lastValueWasNull = true;
- nullsFound = 0;
runLength = 0;
- if (currentValueIndexInVector == recordsToReadInThisPass
- || currentValueIndexInVector >= valueVec.getValueCapacity()) {
- break;
+ if (lastRunBrokenByNull ) {
+ nullsFound = 1;
+ lastRunBrokenByNull = false;
+ } else {
+ nullsFound = 0;
}
- while(currentValueIndexInVector < recordsToReadInThisPass
- && currentValueIndexInVector < valueVec.getValueCapacity()
- && pageReader.valuesRead + definitionLevelsRead < pageReader.currentPage.getValueCount()){
+ // loop to find the longest run of defined values available, can be preceded by several nulls
+ while(indexInOutputVector < recordsToReadInThisPass
+ && indexInOutputVector < valueVec.getValueCapacity()
+ && definitionLevelsRead < pageReader.currentPage.getValueCount()){
currentDefinitionLevel = pageReader.definitionLevels.readInteger();
definitionLevelsRead++;
+ indexInOutputVector++;
+ totalDefinitionLevelsRead++;
if ( currentDefinitionLevel < columnDescriptor.getMaxDefinitionLevel()){
// a run of non-null values was found, break out of this loop to do a read in the outer loop
- nullsFound++;
if ( ! lastValueWasNull ){
- currentValueIndexInVector++;
+ lastRunBrokenByNull = true;
break;
}
+ nullsFound++;
lastValueWasNull = true;
}
else{
if (lastValueWasNull){
- runStart = pageReader.readPosInBytes;
runLength = 0;
lastValueWasNull = false;
}
runLength++;
- castedVectorMutator.setIndexDefined(currentValueIndexInVector);
+ castedVectorMutator.setIndexDefined(indexInOutputVector - 1);
}
- currentValueIndexInVector++;
}
- pageReader.readPosInBytes = runStart;
- recordsReadInThisIteration = runLength;
+ valuesReadInCurrentPass += nullsFound;
- readField( runLength);
int writerIndex = ((BaseValueVector) valueVec).getData().writerIndex();
if ( dataTypeLengthInBits > 8 || (dataTypeLengthInBits < 8 && totalValuesRead + runLength % 8 == 0)){
castedBaseVector.getData().setIndex(0, writerIndex + (int) Math.ceil( nullsFound * dataTypeLengthInBits / 8.0));
@@ -117,20 +119,21 @@ abstract class NullableColumnReader<V extends ValueVector> extends ColumnReader<
else if (dataTypeLengthInBits < 8){
rightBitShift += dataTypeLengthInBits * nullsFound;
}
+ this.recordsReadInThisIteration = runLength;
+
+ // set up metadata
+ this.readStartInBytes = pageReader.readPosInBytes;
+ this.readLengthInBits = recordsReadInThisIteration * dataTypeLengthInBits;
+ this.readLength = (int) Math.ceil(readLengthInBits / 8.0);
+ readField( runLength);
recordsReadInThisIteration += nullsFound;
- valuesReadInCurrentPass += recordsReadInThisIteration;
+ valuesReadInCurrentPass += runLength;
totalValuesRead += recordsReadInThisIteration;
pageReader.valuesRead += recordsReadInThisIteration;
- if ( (readStartInBytes + readLength >= pageReader.byteLength && bitsUsed == 0)
- || pageReader.valuesRead == pageReader.currentPage.getValueCount()) {
- if (!pageReader.next()) {
- break;
- }
- } else {
- pageReader.readPosInBytes = readStartInBytes + readLength;
- }
+
+ pageReader.readPosInBytes = readStartInBytes + readLength;
}
- } while (valuesReadInCurrentPass < recordsToReadInThisPass && pageReader.currentPage != null);
+ valuesReadInCurrentPass = indexInOutputVector;
valueVec.getMutator().setValueCount(
valuesReadInCurrentPass);
}
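The rewritten processPages loop above walks the column's definition levels to find runs of consecutive defined values, copies each run contiguously, and leaves empty slots in the output vector wherever nulls occur, so defined values remain randomly addressable by row index. A minimal plain-Java model of that layout idea, using arrays in place of Drill's value vectors (the real reader copies whole runs at once via readField for efficiency; this per-value version only shows where data lands):

import java.util.Arrays;

// Toy model of the "space values out over nulls" layout used by NullableColumnReader.
public class NullableRunExpansion {

  // defLevels: one definition level per row; a level equal to maxDefLevel means "defined".
  // packedData: the non-null values in the order they are stored on disk.
  // defined: output flags, one per row, marking which slots hold real values.
  static long[] expand(int[] defLevels, long[] packedData, int maxDefLevel, boolean[] defined) {
    long[] out = new long[defLevels.length];
    int packedIndex = 0;                      // next value to copy out of the packed stream
    for (int row = 0; row < defLevels.length; row++) {
      if (defLevels[row] == maxDefLevel) {    // defined value: copy it and mark the slot
        out[row] = packedData[packedIndex++];
        defined[row] = true;
      }                                       // null: leave a gap, the slot stays unset
    }
    return out;
  }

  public static void main(String[] args) {
    int[] defLevels = {1, 0, 0, 1, 1, 0, 1};  // 1 = defined, 0 = null
    long[] packed = {10, 20, 30, 40};         // only the defined values are stored
    boolean[] defined = new boolean[defLevels.length];
    long[] out = expand(defLevels, packed, 1, defined);
    System.out.println(Arrays.toString(out));      // [10, 0, 0, 20, 30, 0, 40]
    System.out.println(Arrays.toString(defined));  // [true, false, false, true, true, false, true]
  }
}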
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/NullableFixedByteAlignedReaders.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/NullableFixedByteAlignedReaders.java
index 557bd9fc7..fc1df7994 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/NullableFixedByteAlignedReaders.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/NullableFixedByteAlignedReaders.java
@@ -54,12 +54,6 @@ public class NullableFixedByteAlignedReaders {
// this method is called by its superclass during a read loop
@Override
protected void readField(long recordsToReadInThisPass) {
- this.recordsReadInThisIteration = recordsToReadInThisPass;
-
- // set up metadata
- this.readStartInBytes = pageReader.readPosInBytes;
- this.readLengthInBits = recordsReadInThisIteration * dataTypeLengthInBits;
- this.readLength = (int) Math.ceil(readLengthInBits / 8.0);
this.bytebuf = pageReader.pageDataByteArray;
// fill in data.
@@ -171,16 +165,10 @@ public class NullableFixedByteAlignedReaders {
@Override
protected void readField(long recordsToReadInThisPass) {
- this.recordsReadInThisIteration = recordsToReadInThisPass;
-
- // set up metadata
- this.readStartInBytes = pageReader.readPosInBytes;
- this.readLengthInBits = recordsReadInThisIteration * dataTypeLengthInBits;
- this.readLength = (int) Math.ceil(readLengthInBits / 8.0);
this.bytebuf = pageReader.pageDataByteArray;
dataTypeLengthInBytes = (int) Math.ceil(dataTypeLengthInBits / 8.0);
- for (int i = 0; i < recordsReadInThisIteration; i++) {
+ for (int i = 0; i < recordsToReadInThisPass; i++) {
addNext((int) readStartInBytes + i * dataTypeLengthInBytes, i + valuesReadInCurrentPass);
}
}
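The lines removed here are not lost: the metadata setup (recordsReadInThisIteration, readStartInBytes, readLengthInBits, readLength) now happens once in NullableColumnReader.processPages before it calls readField, so each subclass's readField only has to copy data. A simplified standalone sketch of that split of responsibilities (field names mirror Drill's, but this is not the real class hierarchy):

// Parent computes the read window; the subclass just moves the bytes.
abstract class ToyNullableReader {
  long readStartInBytes;
  long readLengthInBits;
  long readLength;
  final int dataTypeLengthInBits = 64;

  final void readRun(long runLength, long readPosInBytes) {
    // Parent responsibility, as NullableColumnReader.processPages now does:
    readStartInBytes = readPosInBytes;
    readLengthInBits = runLength * dataTypeLengthInBits;
    readLength = (long) Math.ceil(readLengthInBits / 8.0);
    readField(runLength);  // subclass responsibility: copy the data for this run
  }

  protected abstract void readField(long recordsToReadInThisPass);
}

class ToyFixedReader extends ToyNullableReader {
  @Override
  protected void readField(long recordsToReadInThisPass) {
    System.out.println("copy " + readLength + " bytes starting at byte " + readStartInBytes);
  }
}

class ToyReaderDemo {
  public static void main(String[] args) {
    new ToyFixedReader().readRun(10, 128);  // prints: copy 80 bytes starting at byte 128
  }
}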
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/PageReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/PageReader.java
index 639577dfd..7b77e0cf6 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/PageReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/PageReader.java
@@ -104,7 +104,7 @@ final class PageReader {
long start = columnChunkMetaData.getFirstDataPageOffset();
try {
FSDataInputStream f = fs.open(path);
- this.dataReader = new ColumnDataReader(f, start, totalByteLength);
+ this.dataReader = new ColumnDataReader(f, start, columnChunkMetaData.getTotalSize());
if (columnChunkMetaData.getDictionaryPageOffset() > 0) {
f.seek(columnChunkMetaData.getDictionaryPageOffset());
PageHeader pageHeader = Util.readPageHeader(f);
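This one-line change matters because Parquet column chunks are generally compressed on disk: ColumnChunkMetaData.getTotalSize() is the compressed, on-disk byte count, while the total previously used corresponded to the uncompressed length, so the ColumnDataReader could be told to read past the end of the chunk. A hedged sketch of the distinction, assuming parquet-mr's ColumnChunkMetaData accessors (getTotalSize() is the one the patch uses; getTotalUncompressedSize() is assumed here for contrast):

import parquet.hadoop.metadata.ColumnChunkMetaData;

// Sketch only: how many bytes to slice out of the file stream for one column chunk.
final class ColumnChunkSizing {
  private ColumnChunkSizing() {}

  static long bytesToReadFromDisk(ColumnChunkMetaData md) {
    long onDisk = md.getTotalSize();                    // bytes as laid out in the file (compressed)
    long uncompressed = md.getTotalUncompressedSize();  // bytes after decompression; usually larger
    // Handing 'uncompressed' to the stream reader overruns the chunk whenever the
    // data is compressed, which is the bug the PageReader change above fixes.
    return onDisk;
  }
}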
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java
index f3d9e2cd2..9f2012a4f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java
@@ -425,7 +425,7 @@ public class ParquetRecordReader extends AbstractRecordReader {
}
}
-
+// logger.debug("So far read {} records out of row group({}) in file '{}'", totalRecordsRead, rowGroupIndex, hadoopPath.toUri().getPath());
totalRecordsRead += firstColumnStatus.getRecordsReadInCurrentPass();
return firstColumnStatus.getRecordsReadInCurrentPass();
} catch (IOException e) {
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestParquetWriter.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestParquetWriter.java
index 268d03d99..210267423 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestParquetWriter.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestParquetWriter.java
@@ -18,8 +18,11 @@
package org.apache.drill.exec.physical.impl.writer;
import java.io.UnsupportedEncodingException;
+import java.lang.reflect.Array;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.HashMap;
+import java.util.Iterator;
import java.util.List;
import java.util.Map;
@@ -28,9 +31,12 @@ import org.apache.drill.exec.ExecConstants;
import org.apache.drill.exec.exception.SchemaChangeException;
import org.apache.drill.exec.proto.UserBitShared;
import org.apache.drill.exec.record.BatchSchema;
+import org.apache.drill.exec.record.HyperVectorWrapper;
+import org.apache.drill.exec.record.MaterializedField;
import org.apache.drill.exec.record.RecordBatchLoader;
import org.apache.drill.exec.record.VectorWrapper;
import org.apache.drill.exec.rpc.user.QueryResultBatch;
+import org.apache.drill.exec.vector.ValueVector;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -40,11 +46,15 @@ import org.junit.BeforeClass;
import org.junit.Ignore;
import org.junit.Test;
+import static org.junit.Assert.assertEquals;
+
public class TestParquetWriter extends BaseTestQuery {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TestParquetWriter.class);
static FileSystem fs;
+ private static final boolean VERBOSE_DEBUG = false;
+
@BeforeClass
public static void initFs() throws Exception {
Configuration conf = new Configuration();
@@ -161,8 +171,6 @@ public class TestParquetWriter extends BaseTestQuery {
runTestAndValidate("*", "*", inputTable, "basic_repeated");
}
- // TODO - this is failing due to the parquet behavior of allowing repeated values to reach across
- // pages. This broke our reading model a bit, but it is possible to work around.
@Test
public void testRepeatedDouble() throws Exception {
String inputTable = "cp.`parquet/repeated_double_data.json`";
@@ -220,14 +228,22 @@ public class TestParquetWriter extends BaseTestQuery {
runTestAndValidate(selection, validateSelection, inputTable, "foodmart_employee_parquet");
}
- @Test
- @Ignore
- public void testParquetRead() throws Exception {
+ public void compareParquetReaders(String selection, String table) throws Exception {
test("alter system set `store.parquet.use_new_reader` = true");
- List<QueryResultBatch> expected = testSqlWithResults("select * from dfs.`/tmp/voter`");
+ List<QueryResultBatch> expected = testSqlWithResults("select " + selection + " from " + table);
+
+ RecordBatchLoader loader = new RecordBatchLoader(getAllocator());
+ BatchSchema schema = null;
+
+ List<Map> expectedRecords = new ArrayList<>();
+ addToMaterializedResults(expectedRecords, expected, loader, schema);
+
test("alter system set `store.parquet.use_new_reader` = false");
- List<QueryResultBatch> results = testSqlWithResults("select * from dfs.`/tmp/voter`");
- compareResults(expected, results);
+ List<QueryResultBatch> results = testSqlWithResults("select " + selection + " from " + table);
+
+ List<Map> actualRecords = new ArrayList<>();
+ addToMaterializedResults(actualRecords, results, loader, schema);
+ compareResults(expectedRecords, actualRecords);
for (QueryResultBatch result : results) {
result.release();
}
@@ -236,14 +252,47 @@ public class TestParquetWriter extends BaseTestQuery {
}
}
- @Ignore
- @Test
- public void testParquetRead2() throws Exception {
+ public void compareParquetReadersColumnar(String selection, String table) throws Exception {
test("alter system set `store.parquet.use_new_reader` = true");
- List<QueryResultBatch> expected = testSqlWithResults("select s_comment,s_suppkey from dfs.`/tmp/sf100_supplier.parquet`");
+ List<QueryResultBatch> expected = testSqlWithResults("select " + selection + " from " + table);
+
+ RecordBatchLoader loader = new RecordBatchLoader(getAllocator());
+ BatchSchema schema = null;
+
+ Map<String, List> expectedSuperVectors = addToCombinedVectorResults(expected, loader, schema);
+
+ test("alter system set `store.parquet.use_new_reader` = false");
+ List<QueryResultBatch> results = testSqlWithResults("select " + selection + " from " + table);
+
+ Map<String, List> actualSuperVectors = addToCombinedVectorResults(results, loader, schema);
+ compareMergedVectors(expectedSuperVectors, actualSuperVectors);
+ for (QueryResultBatch result : results) {
+ result.release();
+ }
+ for (QueryResultBatch result : expected) {
+ result.release();
+ }
+ }
+
+ public void compareParquetReadersHyperVector(String selection, String table) throws Exception {
+ RecordBatchLoader loader = new RecordBatchLoader(getAllocator());
+ BatchSchema schema = null;
+
+ // TODO - It didn't seem to respect the max width per node setting, so I went in and modified the SimpleParalellizer directly.
+ // I backed out the changes after the test passed.
+// test("alter system set `planner.width.max_per_node` = 1");
test("alter system set `store.parquet.use_new_reader` = false");
- List<QueryResultBatch> results = testSqlWithResults("select s_comment,s_suppkey from dfs.`/tmp/sf100_supplier.parquet`");
- compareResults(expected, results);
+ String query = "select " + selection + " from " + table;
+ List<QueryResultBatch> results = testSqlWithResults(query);
+
+ Map<String, HyperVectorValueIterator> actualSuperVectors = addToHyperVectorMap(results, loader, schema);
+
+ test("alter system set `store.parquet.use_new_reader` = true");
+ List<QueryResultBatch> expected = testSqlWithResults(query);
+
+ Map<String, HyperVectorValueIterator> expectedSuperVectors = addToHyperVectorMap(expected, loader, schema);
+
+ compareHyperVectors(expectedSuperVectors, actualSuperVectors);
for (QueryResultBatch result : results) {
result.release();
}
@@ -252,6 +301,85 @@ public class TestParquetWriter extends BaseTestQuery {
}
}
+ @Ignore
+ @Test
+ public void testReadVoter() throws Exception {
+ compareParquetReadersHyperVector("*", "dfs.`/tmp/voter.parquet`");
+ }
+
+ @Ignore
+ @Test
+ public void testReadSf_100_supplier() throws Exception {
+ compareParquetReadersHyperVector("*", "dfs.`/tmp/sf100_supplier.parquet`");
+ }
+
+ @Ignore
+ @Test
+ public void testParquetRead_checkNulls_NullsFirst() throws Exception {
+ compareParquetReadersColumnar("*", "dfs.`/tmp/parquet_with_nulls_should_sum_100000_nulls_first.parquet`");
+ }
+
+ @Ignore
+ @Test
+ public void testParquetRead_checkNulls() throws Exception {
+ compareParquetReadersColumnar("*", "dfs.`/tmp/parquet_with_nulls_should_sum_100000.parquet`");
+ }
+
+ @Ignore
+ @Test
+ public void test958_sql() throws Exception {
+ compareParquetReadersHyperVector("ss_ext_sales_price", "dfs.`/tmp/store_sales`");
+ }
+
+ @Ignore
+ @Test
+ public void testReadSf_1_supplier() throws Exception {
+ compareParquetReadersHyperVector("*", "dfs.`/tmp/orders_part-m-00001.parquet`");
+ }
+
+ @Ignore
+ @Test
+ public void test958_sql_all_columns() throws Exception {
+ compareParquetReadersHyperVector("*", "dfs.`/tmp/store_sales`");
+ compareParquetReadersHyperVector("ss_addr_sk, ss_hdemo_sk", "dfs.`/tmp/store_sales`");
+ // TODO - Drill 1388 - this currently fails, but it is an issue with project, not the reader, pulled out the physical plan
+ // removed the unneeded project in the plan and ran it against both readers, they outputs matched
+// compareParquetReadersHyperVector("pig_schema,ss_sold_date_sk,ss_item_sk,ss_cdemo_sk,ss_addr_sk, ss_hdemo_sk",
+// "dfs.`/tmp/store_sales`");
+ }
+
+ @Ignore
+ @Test
+ public void testDrill_1314() throws Exception {
+ compareParquetReadersColumnar("l_partkey ", "dfs.`/tmp/drill_1314.parquet`");
+ }
+
+ @Ignore
+ @Test
+ public void testDrill_1314_all_columns() throws Exception {
+ compareParquetReadersHyperVector("*", "dfs.`/tmp/drill_1314.parquet`");
+ compareParquetReadersColumnar("l_orderkey,l_partkey,l_suppkey,l_linenumber, l_quantity, l_extendedprice,l_discount,l_tax",
+ "dfs.`/tmp/drill_1314.parquet`");
+ }
+
+ @Ignore
+ @Test
+ public void testParquetRead_checkShortNullLists() throws Exception {
+ compareParquetReadersColumnar("*", "dfs.`/tmp/short_null_lists.parquet`");
+ }
+
+ @Ignore
+ @Test
+ public void testParquetRead_checkStartWithNull() throws Exception {
+ compareParquetReadersColumnar("*", "dfs.`/tmp/start_with_null.parquet`");
+ }
+
+ @Ignore
+ @Test
+ public void testParquetReadWebReturns() throws Exception {
+ compareParquetReadersColumnar("wr_returning_customer_sk", "dfs.`/tmp/web_returns`");
+ }
+
public void runTestAndValidate(String selection, String validationSelection, String inputTable, String outputFile) throws Exception {
Path path = new Path("/tmp/" + outputFile);
@@ -265,9 +393,19 @@ public class TestParquetWriter extends BaseTestQuery {
String create = "CREATE TABLE " + outputFile + " AS " + query;
String validateQuery = String.format("SELECT %s FROM " + outputFile, validationSelection);
test(create);
+
+ RecordBatchLoader loader = new RecordBatchLoader(getAllocator());
+ BatchSchema schema = null;
+
List<QueryResultBatch> expected = testSqlWithResults(query);
+ List<Map> expectedRecords = new ArrayList<>();
+ addToMaterializedResults(expectedRecords, expected, loader, schema);
+
List<QueryResultBatch> results = testSqlWithResults(validateQuery);
- compareResults(expected, results);
+ List<Map> actualRecords = new ArrayList<>();
+ addToMaterializedResults(actualRecords, results, loader, schema);
+
+ compareResults(expectedRecords, actualRecords);
for (QueryResultBatch result : results) {
result.release();
}
@@ -276,17 +414,205 @@ public class TestParquetWriter extends BaseTestQuery {
}
}
+ public void compareHyperVectors(Map<String, HyperVectorValueIterator> expectedRecords,
+ Map<String, HyperVectorValueIterator> actualRecords) throws Exception {
+ for (String s : expectedRecords.keySet()) {
+ assertEquals(expectedRecords.get(s).getTotalRecords(), actualRecords.get(s).getTotalRecords());
+ HyperVectorValueIterator expectedValues = expectedRecords.get(s);
+ HyperVectorValueIterator actualValues = actualRecords.get(s);
+ int i = 0;
+ while (expectedValues.hasNext()) {
+ compareValues(expectedValues.next(), actualValues.next(), i, s);
+ i++;
+ }
+ }
+ for (HyperVectorValueIterator hvi : expectedRecords.values()) {
+ for (ValueVector vv : hvi.hyperVector.getValueVectors()) {
+ vv.clear();
+ }
+ }
+ for (HyperVectorValueIterator hvi : actualRecords.values()) {
+ for (ValueVector vv : hvi.hyperVector.getValueVectors()) {
+ vv.clear();
+ }
+ }
+ }
+
+ public void compareMergedVectors(Map<String, List> expectedRecords, Map<String, List> actualRecords) throws Exception {
+ for (String s : expectedRecords.keySet()) {
+ assertEquals(expectedRecords.get(s).size(), actualRecords.get(s).size());
+ List expectedValues = expectedRecords.get(s);
+ List actualValues = actualRecords.get(s);
+ for (int i = 0; i < expectedValues.size(); i++) {
+ compareValues(expectedValues.get(i), actualValues.get(i), i, s);
+ }
+ }
+ }
+
+ public Map<String, HyperVectorValueIterator> addToHyperVectorMap(List<QueryResultBatch> records, RecordBatchLoader loader,
+ BatchSchema schema) throws SchemaChangeException, UnsupportedEncodingException {
+ // TODO - this does not handle schema changes
+ Map<String, HyperVectorValueIterator> combinedVectors = new HashMap();
+
+ long totalRecords = 0;
+ QueryResultBatch batch;
+ int size = records.size();
+ for (int i = 0; i < size; i++) {
+ batch = records.get(i);
+ loader = new RecordBatchLoader(getAllocator());
+ loader.load(batch.getHeader().getDef(), batch.getData());
+ logger.debug("reading batch with " + loader.getRecordCount() + " rows, total read so far " + totalRecords);
+ totalRecords += loader.getRecordCount();
+ for (VectorWrapper w : loader) {
+ String field = w.getField().toExpr();
+ if ( ! combinedVectors.containsKey(field)) {
+ MaterializedField mf = w.getField();
+ ValueVector[] vvList = (ValueVector[]) Array.newInstance(mf.getValueClass(), 1);
+ vvList[0] = w.getValueVector();
+ combinedVectors.put(mf.getPath().toExpr(), new HyperVectorValueIterator(mf, new HyperVectorWrapper(mf,
+ vvList)));
+ } else {
+ combinedVectors.get(field).hyperVector.addVector(w.getValueVector());
+ }
+
+ }
+ }
+ for (HyperVectorValueIterator hvi : combinedVectors.values()) {
+ hvi.determineTotalSize();
+ }
+ return combinedVectors;
+ }
+
+ public Map<String, List> addToCombinedVectorResults(List<QueryResultBatch> records, RecordBatchLoader loader,
+ BatchSchema schema) throws SchemaChangeException, UnsupportedEncodingException {
+ // TODO - this does not handle schema changes
+ Map<String, List> combinedVectors = new HashMap();
+
+ long totalRecords = 0;
+ QueryResultBatch batch;
+ int size = records.size();
+ for (int i = 0; i < size; i++) {
+ batch = records.get(0);
+ loader.load(batch.getHeader().getDef(), batch.getData());
+ if (schema == null) {
+ schema = loader.getSchema();
+ for (MaterializedField mf : schema) {
+ combinedVectors.put(mf.getPath().toExpr(), new ArrayList());
+ }
+ }
+ logger.debug("reading batch with " + loader.getRecordCount() + " rows, total read so far " + totalRecords);
+ totalRecords += loader.getRecordCount();
+ for (VectorWrapper w : loader) {
+ String field = w.getField().toExpr();
+ for (int j = 0; j < loader.getRecordCount(); j++) {
+ if (totalRecords - loader.getRecordCount() + j > 5000000) continue;
+ Object obj = w.getValueVector().getAccessor().getObject(j);
+ if (obj != null) {
+ if (obj instanceof Text) {
+ obj = obj.toString();
+ if (obj.equals("")) {
+ System.out.println(w.getField());
+ }
+ }
+ else if (obj instanceof byte[]) {
+ obj = new String((byte[]) obj, "UTF-8");
+ }
+ }
+ combinedVectors.get(field).add(obj);
+ }
+ }
+ records.remove(0);
+ batch.release();
+ loader.clear();
+ }
+ return combinedVectors;
+ }
+
+ public static class HyperVectorValueIterator implements Iterator<Object>{
+ private MaterializedField mf;
+ HyperVectorWrapper hyperVector;
+ private int indexInVectorList;
+ private int indexInCurrentVector;
+ private ValueVector currVec;
+ private long totalValues;
+ private long totalValuesRead;
+ // limit how many values will be read out of this iterator
+ private long recordLimit;
+
+ public HyperVectorValueIterator(MaterializedField mf, HyperVectorWrapper hyperVector) {
+ this.mf = mf;
+ this.hyperVector = hyperVector;
+ this.totalValues = 0;
+ this.indexInCurrentVector = 0;
+ this.indexInVectorList = 0;
+ this.recordLimit = -1;
+ }
+
+ public void setRecordLimit(long limit) {
+ this.recordLimit = limit;
+ }
+
+ public long getTotalRecords() {
+ if (recordLimit > 0) {
+ return recordLimit;
+ } else {
+ return totalValues;
+ }
+ }
+
+ public void determineTotalSize() {
+ for (ValueVector vv : hyperVector.getValueVectors()) {
+ this.totalValues += vv.getAccessor().getValueCount();
+ }
+ }
+
+ @Override
+ public boolean hasNext() {
+ if (totalValuesRead == recordLimit) return false;
+ if (indexInVectorList < hyperVector.getValueVectors().length) {
+ return true;
+ } else if ( indexInCurrentVector < currVec.getAccessor().getValueCount()) {
+ return true;
+ }
+ return false;
+ }
+
+ @Override
+ public Object next() {
+ if (currVec == null || indexInCurrentVector == currVec.getValueCapacity()) {
+ currVec = hyperVector.getValueVectors()[indexInVectorList];
+ indexInVectorList++;
+ indexInCurrentVector = 0;
+ }
+ Object obj = currVec.getAccessor().getObject(indexInCurrentVector);
+ indexInCurrentVector++;
+ totalValuesRead++;
+ return obj;
+ }
+
+ @Override
+ public void remove() {
+ throw new UnsupportedOperationException();
+ }
+ }
+
public void addToMaterializedResults(List<Map> materializedRecords, List<QueryResultBatch> records, RecordBatchLoader loader,
BatchSchema schema) throws SchemaChangeException, UnsupportedEncodingException {
- for (QueryResultBatch batch : records) {
+ long totalRecords = 0;
+ QueryResultBatch batch;
+ int size = records.size();
+ for (int i = 0; i < size; i++) {
+ batch = records.get(0);
loader.load(batch.getHeader().getDef(), batch.getData());
if (schema == null) {
schema = loader.getSchema();
}
- for (int i = 0; i < loader.getRecordCount(); i++) {
+ logger.debug("reading batch with " + loader.getRecordCount() + " rows, total read so far " + totalRecords);
+ totalRecords += loader.getRecordCount();
+ for (int j = 0; j < loader.getRecordCount(); j++) {
HashMap<String, Object> record = new HashMap<>();
for (VectorWrapper w : loader) {
- Object obj = w.getValueVector().getAccessor().getObject(i);
+ Object obj = w.getValueVector().getAccessor().getObject(j);
if (obj != null) {
if (obj instanceof Text) {
obj = obj.toString();
@@ -303,47 +629,66 @@ public class TestParquetWriter extends BaseTestQuery {
}
materializedRecords.add(record);
}
+ records.remove(0);
+ batch.release();
loader.clear();
}
}
- public void compareResults(List<QueryResultBatch> expected, List<QueryResultBatch> result) throws Exception {
- List<Map> expectedRecords = new ArrayList<>();
- List<Map> actualRecords = new ArrayList<>();
+ public void compareValues(Object expected, Object actual, int counter, String column) throws Exception {
- BatchSchema schema = null;
- RecordBatchLoader loader = new RecordBatchLoader(getAllocator());
- addToMaterializedResults(expectedRecords, expected, loader, schema);
- addToMaterializedResults(actualRecords, result, loader, schema);
+ if ( expected == null ) {
+ if (actual == null ) {
+ if (VERBOSE_DEBUG) logger.debug("(1) at position " + counter + " column '" + column + "' matched value: " + expected );
+ return;
+ } else {
+ throw new Exception("at position " + counter + " column '" + column + "' mismatched values, expected: " + expected + " but received " + actual);
+ }
+ }
+ if ( actual == null) {
+ throw new Exception("unexpected null at position " + counter + " column '" + column + "' should have been: " + expected);
+ }
+ if (actual instanceof byte[]) {
+ if ( ! Arrays.equals((byte[]) expected, (byte[]) actual)) {
+ throw new Exception("at position " + counter + " column '" + column + "' mismatched values, expected: "
+ + new String((byte[])expected, "UTF-8") + " but received " + new String((byte[])actual, "UTF-8"));
+ } else {
+ if (VERBOSE_DEBUG) logger.debug("at position " + counter + " column '" + column + "' matched value " + new String((byte[])expected, "UTF-8"));
+ return;
+ }
+ }
+ if ( ! expected.equals(actual)) {
+ throw new Exception("at position " + counter + " column '" + column + "' mismatched values, expected: " + expected + " but received " + actual);
+ } else {
+ if (VERBOSE_DEBUG) logger.debug("at position " + counter + " column '" + column + "' matched value: " + expected );
+ }
+ }
+
+ public void compareResults(List<Map> expectedRecords, List<Map> actualRecords) throws Exception {
Assert.assertEquals("Different number of records returned", expectedRecords.size(), actualRecords.size());
- String missing = "";
+ StringBuilder missing = new StringBuilder();
int i = 0;
int counter = 0;
int missmatch;
for (Map<String, Object> record : expectedRecords) {
missmatch = 0;
- counter++;
for (String column : record.keySet()) {
- if ( actualRecords.get(i).get(column) == null && expectedRecords.get(i).get(column) == null ) {
- continue;
- }
- if (actualRecords.get(i).get(column) == null)
- continue;
- if ( (actualRecords.get(i).get(column) == null && record.get(column) == null) || ! actualRecords.get(i).get(column).equals(record.get(column))) {
- missmatch++;
- }
+ compareValues(record.get(column), actualRecords.get(i).get(column), counter, column );
}
- if ( ! actualRecords.remove(record)) {
- missing += missmatch + ",";
+ if ( ! actualRecords.get(i).equals(record)) {
+ System.out.println("mismatch at position " + counter );
+ missing.append(missmatch);
+ missing.append(",");
}
- else {
- i--;
+
+ counter++;
+ if (counter % 100000 == 0 ) {
+ System.out.println("checked so far:" + counter);
}
i++;
}
- logger.debug(missing);
+ logger.debug(missing.toString());
System.out.println(missing);
- Assert.assertEquals(0, actualRecords.size());
}
}
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetRecordReaderTest.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetRecordReaderTest.java
index ecdb990c2..c49c328bd 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetRecordReaderTest.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetRecordReaderTest.java
@@ -48,11 +48,17 @@ import org.apache.drill.exec.physical.impl.OutputMutator;
import org.apache.drill.exec.proto.BitControl;
import org.apache.drill.exec.proto.UserBitShared.QueryType;
import org.apache.drill.exec.record.MaterializedField;
+import org.apache.drill.exec.record.RecordBatchLoader;
+import org.apache.drill.exec.record.VectorWrapper;
+import org.apache.drill.exec.rpc.user.QueryResultBatch;
import org.apache.drill.exec.rpc.user.UserServer;
import org.apache.drill.exec.server.DrillbitContext;
import org.apache.drill.exec.store.CachedSingleFileSystem;
import org.apache.drill.exec.store.TestOutputMutator;
import org.apache.drill.exec.store.parquet.columnreaders.ParquetRecordReader;
+import org.apache.drill.exec.vector.BigIntVector;
+import org.apache.drill.exec.vector.NullableBigIntVector;
+import org.apache.drill.exec.vector.NullableIntVector;
import org.apache.drill.exec.vector.ValueVector;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
@@ -137,6 +143,45 @@ public class ParquetRecordReaderTest extends BaseTestQuery{
testFull(QueryType.SQL, "select L_RECEIPTDATE from dfs.`/tmp/lineitem_null_dict.parquet`", "", 1, 1, 100000, false);
}
+ @Test
+ public void testNullableAgg() throws Exception {
+
+ List<QueryResultBatch> result = testSqlWithResults("select sum(a) as total_sum from dfs.`/tmp/parquet_with_nulls_should_sum_100000_nulls_first.parquet`");
+ assertEquals("Only expected one batch with data, and then the empty finishing batch.", 2, result.size());
+ RecordBatchLoader loader = new RecordBatchLoader(bit.getContext()
+ .getAllocator());
+
+ QueryResultBatch b = result.get(0);
+ loader.load(b.getHeader().getDef(), b.getData());
+
+ VectorWrapper vw = loader.getValueAccessorById(
+ NullableBigIntVector.class, //
+ loader.getValueVectorId(SchemaPath.getCompoundPath("total_sum")).getFieldIds() //
+ );
+ assertEquals(4999950000l, vw.getValueVector().getAccessor().getObject(0));
+ b.release();
+ loader.clear();
+ }
+
+ @Test
+ public void testNullableFilter() throws Exception {
+ List<QueryResultBatch> result = testSqlWithResults("select count(wr_return_quantity) as row_count from dfs.`/tmp/web_returns` where wr_return_quantity = 1");
+ assertEquals("Only expected one batch with data, and then the empty finishing batch.", 2, result.size());
+ RecordBatchLoader loader = new RecordBatchLoader(bit.getContext()
+ .getAllocator());
+
+ QueryResultBatch b = result.get(0);
+ loader.load(b.getHeader().getDef(), b.getData());
+
+ VectorWrapper vw = loader.getValueAccessorById(
+ BigIntVector.class, //
+ loader.getValueVectorId(SchemaPath.getCompoundPath("row_count")).getFieldIds() //
+ );
+ assertEquals(3573l, vw.getValueVector().getAccessor().getObject(0));
+ b.release();
+ loader.clear();
+ }
+
@Test
public void testFixedBinary() throws Exception {
@@ -168,6 +213,12 @@ public class ParquetRecordReaderTest extends BaseTestQuery{
}
@Test
+ public void testDrill_1314_all_columns() throws Exception {
+ testFull(QueryType.SQL, "select * " +
+ "from dfs.`/tmp/drill_1314.parquet`", "", 1,1, 10000, false);
+ }
+
+ @Test
public void testDictionaryError_419() throws Exception {
testFull(QueryType.SQL, "select c_address from dfs.`/tmp/customer_snappyimpala_drill_419.parquet`", "", 1, 1, 150000, false);
}
@@ -190,6 +241,7 @@ public class ParquetRecordReaderTest extends BaseTestQuery{
testFull(QueryType.SQL, "select cust_key, address, non_existent_column, non_existent_col_2 from dfs.`/tmp/customer.dict.parquet`", "", 1, 1, 150000, false);
}
+ @Ignore // ignored for now for performance
@Test
public void testTPCHPerformace_SF1() throws Exception {
testFull(QueryType.SQL, "select * from dfs.`/tmp/orders_part-m-00001.parquet`", "", 1, 1, 150000, false);
@@ -489,10 +541,10 @@ public class ParquetRecordReaderTest extends BaseTestQuery{
@Test
public void test958_sql() throws Exception {
- testFull(QueryType.SQL, "select ss_ext_sales_price from dfs.`/tmp/store_sales`", "", 1, 1, 30000000, false);
+// testFull(QueryType.SQL, "select ss_ext_sales_price from dfs.`/tmp/store_sales`", "", 1, 1, 30000000, false);
+ testFull(QueryType.SQL, "select * from dfs.`/tmp/store_sales`", "", 1, 1, 30000000, false);
}
-
@Test
public void drill_958bugTest() throws Exception {
HashMap<String, FieldInfo> fields = new HashMap<>();
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetResultListener.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetResultListener.java
index a624234d5..dd198f51e 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetResultListener.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetResultListener.java
@@ -112,81 +112,119 @@ public class ParquetResultListener implements UserResultsListener {
}
// used to make sure each vector in the batch has the same number of records
- int valueCount = -1;
+ int valueCount = batchLoader.getRecordCount();
- int recordCount = 0;
// print headers.
if (schemaChanged) {
} // do not believe any change is needed for when the schema changes, with the current mock scan use case
for (VectorWrapper vw : batchLoader) {
ValueVector vv = vw.getValueVector();
- currentField = props.fields.get(vv.getField().getAsSchemaPath().getRootSegment().getPath());
- if (ParquetRecordReaderTest.VERBOSE_DEBUG){
- System.out.println("\n" + vv.getField().getAsSchemaPath().getRootSegment().getPath());
- }
- if ( ! valuesChecked.containsKey(vv.getField().getAsSchemaPath().getRootSegment().getPath())){
- valuesChecked.put(vv.getField().getAsSchemaPath().getRootSegment().getPath(), 0);
+ currentField = props.fields.get(vv.getField().getPath().getRootSegment().getPath());
+ if ( ! valuesChecked.containsKey(vv.getField().getPath().getRootSegment().getPath())){
+ valuesChecked.put(vv.getField().getPath().getRootSegment().getPath(), 0);
columnValCounter = 0;
} else {
- columnValCounter = valuesChecked.get(vv.getField().getAsSchemaPath().getRootSegment().getPath());
+ columnValCounter = valuesChecked.get(vv.getField().getPath().getRootSegment().getPath());
}
- for (int j = 0; j < vv.getAccessor().getValueCount(); j++) {
- if (ParquetRecordReaderTest.VERBOSE_DEBUG){
- Object o = vv.getAccessor().getObject(j);
- if (o instanceof byte[]) {
- try {
- o = new String((byte[])o, "UTF-8");
- } catch (UnsupportedEncodingException e) {
- throw new RuntimeException(e);
- }
- }
- System.out.print(Strings.padStart(o + "", 20, ' ') + " ");
- System.out.print(", " + (j % 25 == 0 ? "\n batch:" + batchCounter + " v:" + j + " - " : ""));
- }
- if (testValues){
+ printColumnMajor(vv);
+
+ if (testValues){
+ for (int j = 0; j < vv.getAccessor().getValueCount(); j++) {
assertField(vv, j, currentField.type,
currentField.values[columnValCounter % 3], currentField.name + "/");
+ columnValCounter++;
}
- columnValCounter++;
- }
- if (ParquetRecordReaderTest.VERBOSE_DEBUG){
- System.out.println("\n" + vv.getAccessor().getValueCount());
- }
- valuesChecked.remove(vv.getField().getAsSchemaPath().getRootSegment().getPath());
- if (valueCount == -1) {
- valueCount = columnValCounter;
- }
- else {
- assertEquals("Mismatched value count for vectors in the same batch.", valueCount, columnValCounter);
+ } else {
+ columnValCounter += vv.getAccessor().getValueCount();
}
- valuesChecked.put(vv.getField().getAsSchemaPath().getRootSegment().getPath(), columnValCounter);
+
+ valuesChecked.remove(vv.getField().getPath().getRootSegment().getPath());
+ assertEquals("Mismatched value count for vectors in the same batch.", valueCount, vv.getAccessor().getValueCount());
+ valuesChecked.put(vv.getField().getPath().getRootSegment().getPath(), columnValCounter);
}
if (ParquetRecordReaderTest.VERBOSE_DEBUG){
- for (int i = 0; i < batchLoader.getRecordCount(); i++) {
- recordCount++;
- if (i % 50 == 0){
- System.out.println();
- for (VectorWrapper vw : batchLoader) {
- ValueVector v = vw.getValueVector();
- System.out.print(Strings.padStart(v.getField().getAsSchemaPath().getRootSegment().getPath(), 20, ' ') + " ");
+ printRowMajor(batchLoader);
+ }
+ batchCounter++;
+ if(result.getHeader().getIsLastChunk()){
+ checkLastChunk(batchLoader, result);
+ }
+
+ batchLoader.clear();
+ result.release();
+ }
+
+ public void checkLastChunk(RecordBatchLoader batchLoader, QueryResultBatch result) {
+ int recordsInBatch = -1;
+ // ensure the right number of columns was returned, especially important to ensure selective column read is working
+ if (testValues) {
+ assertEquals( "Unexpected number of output columns from parquet scan.", props.fields.keySet().size(), valuesChecked.keySet().size() );
+ }
+ for (String s : valuesChecked.keySet()) {
+ try {
+ if (recordsInBatch == -1 ){
+ recordsInBatch = valuesChecked.get(s);
+ } else {
+ assertEquals("Mismatched record counts in vectors.", recordsInBatch, valuesChecked.get(s).intValue());
+ }
+ assertEquals("Record count incorrect for column: " + s, totalRecords, (long) valuesChecked.get(s));
+ } catch (AssertionError e) { submissionFailed(new RpcException(e)); }
+ }
+
+ assert valuesChecked.keySet().size() > 0;
+ batchLoader.clear();
+ result.release();
+ future.set(null);
+ }
+ public void printColumnMajor(ValueVector vv) {
+ if (ParquetRecordReaderTest.VERBOSE_DEBUG){
+ System.out.println("\n" + vv.getField().getAsSchemaPath().getRootSegment().getPath());
+ }
+ for (int j = 0; j < vv.getAccessor().getValueCount(); j++) {
+ if (ParquetRecordReaderTest.VERBOSE_DEBUG){
+ Object o = vv.getAccessor().getObject(j);
+ if (o instanceof byte[]) {
+ try {
+ o = new String((byte[])o, "UTF-8");
+ } catch (UnsupportedEncodingException e) {
+ throw new RuntimeException(e);
}
- System.out.println();
- System.out.println();
}
+ System.out.print(Strings.padStart(o + "", 20, ' ') + " ");
+ System.out.print(", " + (j % 25 == 0 ? "\n batch:" + batchCounter + " v:" + j + " - " : ""));
+ }
+ }
+ if (ParquetRecordReaderTest.VERBOSE_DEBUG){
+ System.out.println("\n" + vv.getAccessor().getValueCount());
+ }
+ }
+ public void printRowMajor(RecordBatchLoader batchLoader) {
+ for (int i = 0; i < batchLoader.getRecordCount(); i++) {
+ if (i % 50 == 0){
+ System.out.println();
for (VectorWrapper vw : batchLoader) {
ValueVector v = vw.getValueVector();
- Object o = v.getAccessor().getObject(i);
- if (o instanceof byte[]) {
- try {
- // TODO - in the dictionary read error test there is some data that does not look correct
- // the output of our reader matches the values of the parquet-mr cat/head tools (no full comparison was made,
- // but from a quick check of a few values it looked consistent
- // this might have gotten corrupted by pig somehow, or maybe this is just how the data is supposed ot look
- // TODO - check this!!
+ System.out.print(Strings.padStart(v.getField().getAsSchemaPath().getRootSegment().getPath(), 20, ' ') + " ");
+
+ }
+ System.out.println();
+ System.out.println();
+ }
+
+ for (VectorWrapper vw : batchLoader) {
+ ValueVector v = vw.getValueVector();
+ Object o = v.getAccessor().getObject(i);
+ if (o instanceof byte[]) {
+ try {
+ // TODO - in the dictionary read error test there is some data that does not look correct
+ // the output of our reader matches the values of the parquet-mr cat/head tools (no full comparison was made,
+ // but from a quick check of a few values it looked consistent
+ // this might have gotten corrupted by pig somehow, or maybe this is just how the data is supposed ot look
+ // TODO - check this!!
// for (int k = 0; k < ((byte[])o).length; k++ ) {
// // check that the value at each position is a valid single character ascii value.
//
@@ -194,42 +232,15 @@ public class ParquetResultListener implements UserResultsListener {
// System.out.println("batch: " + batchCounter + " record: " + recordCount);
// }
// }
- o = new String((byte[])o, "UTF-8");
- } catch (UnsupportedEncodingException e) {
- throw new RuntimeException(e);
- }
+ o = new String((byte[])o, "UTF-8");
+ } catch (UnsupportedEncodingException e) {
+ throw new RuntimeException(e);
}
- System.out.print(Strings.padStart(o + "", 20, ' ') + " ");
}
- System.out.println();
- }
- }
- batchCounter++;
- int recordsInBatch = -1;
- if(result.getHeader().getIsLastChunk()){
- // ensure the right number of columns was returned, especially important to ensure selective column read is working
- if (testValues) {
- assertEquals( "Unexpected number of output columns from parquet scan.", props.fields.keySet().size(), valuesChecked.keySet().size() );
+ System.out.print(Strings.padStart(o + "", 20, ' ') + " ");
}
- for (String s : valuesChecked.keySet()) {
- try {
- if (recordsInBatch == -1 ){
- recordsInBatch = valuesChecked.get(s);
- } else {
- assertEquals("Mismatched record counts in vectors.", recordsInBatch, valuesChecked.get(s).intValue());
- }
- assertEquals("Record count incorrect for column: " + s, totalRecords, (long) valuesChecked.get(s));
- } catch (AssertionError e) { submissionFailed(new RpcException(e)); }
- }
-
- assert valuesChecked.keySet().size() > 0;
- batchLoader.clear();
- result.release();
- future.set(null);
+ System.out.println();
}
-
- batchLoader.clear();
- result.release();
}
public void getResults() throws RpcException{