author    Jinfeng Ni <jni@apache.org>    2017-05-17 16:08:00 -0700
committer Jinfeng Ni <jni@apache.org>    2017-09-05 12:07:23 -0700
commit    fde0a1df1734e0742b49aabdd28b02202ee2b044 (patch)
tree      f5d408914895d1b9bea8cdc86bab26365ed8c81d /exec/java-exec/src/main/java/org/apache/drill/exec
parent    e1649dd7d9fb2c30632f4df6ea17c483379c9775 (diff)
DRILL-5546: Handle schema change exception failure caused by empty input or empty batch.
1. Modify ScanBatch's logic when it iterates its list of RecordReaders. 1) Skip a RecordReader if it returns 0 rows and presents the same schema. A new schema (detected by calling Mutator.isNewSchema()) means a new top-level field was added, a field inside a nested field was added, or an existing field's type changed. 2) Implicit columns are presumed to have a constant schema and are added to the outgoing container before any regular column. 3) ScanBatch returns NONE directly (a "fast NONE") if all of its RecordReaders have empty input and are therefore skipped, instead of returning OK_NEW_SCHEMA first.
2. Modify IteratorValidatorBatchIterator to allow 1) a fast NONE (before seeing an OK_NEW_SCHEMA) and 2) a batch with an empty list of columns.
3. Modify JSONRecordReader when it gets 0 rows: do not insert a nullable-int column for 0-row input. Together with the ScanBatch change, Drill will skip empty JSON files.
4. Modify binary operators such as join and union to handle a fast NONE from one side or both sides. Abstract the shared logic into AbstractBinaryRecordBatch, except for MergeJoin, whose implementation is quite different from the others. (A sketch of the shared prefetch logic appears after this message.)
5. Fix and refactor the union-all operator. 1) Correct the union operator's handling of 0 input rows. Previously it ignored inputs with 0 rows and put a nullable-int into the output schema, which caused various schema change issues in downstream operators. The new behavior takes a 0-row schema into account when determining the output schema, in the same way as schemas with > 0 input rows. This ensures the union operator does not behave like a schema-lossy operator. 2) Add a UnionInputIterator to simplify iterating over the left/right inputs, removing a significant chunk of duplicated code from the previous implementation. The new union-all operator is about half the size of the old one.
6. Introduce UntypedNullVector to handle the convertFromJSON() function when the input batch contains 0 rows. Problem: convertFromJSON() differs from regular functions in that its output schema is known only after evaluation. With 0 input rows, Drill has no way to know the output type and previously assumed Map type. That worked only under the assumption that operators like union ignore 0-row batches, which is no longer the case. Solution: use MinorType.NULL as the output type of convertFromJSON() when the input contains 0 rows; the new UntypedNullVector represents a column of MinorType.NULL.
7. HBaseGroupScan converts the star column into a list of row_key plus the column families. HBaseRecordReader now rejects the star column, since it expects the star to have been converted elsewhere. In HBase a column family always has map type and a non-rowkey column always has nullable varbinary type, so HBaseRecordReaders across different HBase regions produce the same top-level schema even if a region is empty or all of its rows are pruned by filter pushdown. In other words, we will not see different top-level schemas from different HBaseRecordReaders for the same table. However, this change cannot handle a hard schema change (for example, c1 exists in cf1 in one region but not in another); further work is required for that.
8. Modify scan cost estimation when the query involves the * column, to remove planning randomness: previously two different operators could end up with the same cost.
9. Add a new flag 'outputProj' to the Project operator to indicate whether the Project is for the query's final output. Such a Project is added by TopProjectVisitor to handle a fast NONE when all inputs to the query are empty and are skipped. 1) The star column is replaced with an empty list of columns. 2) A regular column reference is replaced with a nullable-int column. 3) An expression goes through ExpressionTreeMaterializer, and the type of the materialized expression is used as the output type. 4) Return OK_NEW_SCHEMA with the schema built by the above rules, then return NONE to the downstream operator.
10. Add unit tests for operators handling empty input.
11. Add unit tests for queries whose inputs are all empty.

DRILL-5546: Revise code based on review comments.
Handle implicit columns in ScanBatch and change the ScanBatch constructor interface: 1) Ensure that either the implicit column list is empty, or all readers have the same set of implicit columns. 2) Skip the implicit columns when checking whether a record reader introduced a schema change. 3) ScanBatch accepts a list instead of an iterator, since the implicit column list may need to be traversed multiple times and the sizes of the two lists must be verified to match.
ScanBatch code review comments. Add more unit tests.
Share a code path in ProjectRecordBatch between the normal setupNewSchema() and handleNullInput().
- Move SimpleRecordBatch out of TopNBatch to make it shareable across different places.
- Add a unit test verifying the schema for star-column queries against multilevel tables.
Unit test framework changes:
- Fix a memory leak in the unit test framework.
- Allow SchemaTestBuilder to pass in a BatchSchema.
close #906
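AbstractBinaryRecordBatch.java appears in the diffstat below, but its body is not reproduced in this excerpt. The following is a rough sketch of what the shared prefetch logic might look like, reconstructed from the call sites in HashJoinBatch, NestedLoopJoinBatch and UnionAllRecordBatch further down; the field names, constructor shapes and method body are assumptions, not the committed code.

import org.apache.drill.exec.exception.OutOfMemoryException;
import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.physical.base.PhysicalOperator;
import org.apache.drill.exec.record.AbstractRecordBatch;
import org.apache.drill.exec.record.RecordBatch;

// Sketch only: shared base for binary operators that must tolerate a "fast NONE".
public abstract class AbstractBinaryRecordBatch<T extends PhysicalOperator> extends AbstractRecordBatch<T> {
  protected final RecordBatch left;
  protected final RecordBatch right;
  protected IterOutcome leftUpstream = IterOutcome.NONE;
  protected IterOutcome rightUpstream = IterOutcome.NONE;

  protected AbstractBinaryRecordBatch(T popConfig, FragmentContext context,
      RecordBatch left, RecordBatch right) throws OutOfMemoryException {
    this(popConfig, context, true, left, right);
  }

  protected AbstractBinaryRecordBatch(T popConfig, FragmentContext context, boolean buildSchema,
      RecordBatch left, RecordBatch right) throws OutOfMemoryException {
    super(popConfig, context, buildSchema);
    this.left = left;
    this.right = right;
  }

  // Fetch the first batch from each side; return false when buildSchema() should stop early.
  protected boolean prefetchFirstBatchFromBothSides() {
    leftUpstream = next(0, left);    // input indexes are illustrative
    rightUpstream = next(1, right);

    if (leftUpstream == IterOutcome.STOP || rightUpstream == IterOutcome.STOP) {
      state = BatchState.STOP;
      return false;
    }
    if (leftUpstream == IterOutcome.OUT_OF_MEMORY || rightUpstream == IterOutcome.OUT_OF_MEMORY) {
      state = BatchState.OUT_OF_MEMORY;
      return false;
    }
    if (leftUpstream == IterOutcome.NONE && rightUpstream == IterOutcome.NONE) {
      // Both sides returned a fast NONE: no rows and no schema will ever arrive.
      state = BatchState.DONE;
      return false;
    }
    return true;
  }
}

The buildSchema() hunks of HashJoinBatch, NestedLoopJoinBatch and UnionAllRecordBatch below simply return early when this method returns false.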
Diffstat (limited to 'exec/java-exec/src/main/java/org/apache/drill/exec')
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/Project.java | 23
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java | 407
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java | 73
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java | 33
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java | 6
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoinBatch.java | 31
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java | 117
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionAllRecordBatch.java | 737
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/IteratorValidatorBatchIterator.java | 18
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/values/ValuesBatchCreator.java | 2
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillScanRel.java | 10
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ProjectPrel.java | 31
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/TopProjectVisitor.java | 20
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractBinaryRecordBatch.java | 75
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractSingleRecordBatch.java | 26
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/record/SimpleRecordBatch.java | 98
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/store/AbstractRecordReader.java | 20
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/store/ColumnExplorer.java | 3
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyFormatPlugin.java | 2
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/store/direct/DirectBatchCreator.java | 2
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONRecordReader.java | 6
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaBatchCreator.java | 2
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockScanBatchCreator.java | 2
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/Metadata.java | 4
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetReaderUtility.java | 4
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java | 2
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetSchema.java | 3
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTableBatchCreator.java | 2
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/util/Utilities.java | 23
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/util/VectorUtil.java | 15
30 files changed, 889 insertions, 908 deletions
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/Project.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/Project.java
index 7b58ecdf1..b0188ea34 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/Project.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/Project.java
@@ -36,8 +36,19 @@ public class Project extends AbstractSingle{
private final List<NamedExpression> exprs;
+ /**
+ * See {@link org.apache.drill.exec.planner.physical.ProjectPrel} for the meaning of the flag 'outputProj'.
+ */
+ private boolean outputProj = false;
+
@JsonCreator
- public Project(@JsonProperty("exprs") List<NamedExpression> exprs, @JsonProperty("child") PhysicalOperator child) {
+ public Project(@JsonProperty("exprs") List<NamedExpression> exprs, @JsonProperty("child") PhysicalOperator child, @JsonProperty("outputProj") boolean outputProj) {
+ super(child);
+ this.exprs = exprs;
+ this.outputProj = outputProj;
+ }
+
+ public Project(List<NamedExpression> exprs, PhysicalOperator child) {
super(child);
this.exprs = exprs;
}
@@ -46,6 +57,14 @@ public class Project extends AbstractSingle{
return exprs;
}
+ /**
+ * @return true if this Project is for the query's final output. Such a Project is added by TopProjectVisitor
+ * to handle a fast NONE when all the inputs to the query are empty and are skipped.
+ */
+ public boolean isOutputProj() {
+ return outputProj;
+ }
+
@Override
public <T, X, E extends Throwable> T accept(PhysicalVisitor<T, X, E> physicalVisitor, X value) throws E{
return physicalVisitor.visitProject(this, value);
@@ -53,7 +72,7 @@ public class Project extends AbstractSingle{
@Override
protected PhysicalOperator getNewWithChild(PhysicalOperator child) {
- return new Project(exprs, child);
+ return new Project(exprs, child, outputProj);
}
@Override
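The planner-side changes that actually set this flag (ProjectPrel and TopProjectVisitor, listed in the diffstat above) are not shown in this excerpt. Below is a minimal, hypothetical sketch of how the two constructors would be used; the helper class and method names are illustrative only, not part of the commit.

import java.util.List;
import org.apache.drill.common.logical.data.NamedExpression;
import org.apache.drill.exec.physical.base.PhysicalOperator;
import org.apache.drill.exec.physical.config.Project;

class ProjectFlagSketch {
  // The top-level project emitted for the query's final output: outputProj = true,
  // so ProjectRecordBatch.handleNullInput() builds a schema even on a fast NONE.
  static Project topLevelProject(List<NamedExpression> exprs, PhysicalOperator child) {
    return new Project(exprs, child, true);
  }

  // Any other project keeps the two-argument form; outputProj defaults to false.
  static Project intermediateProject(List<NamedExpression> exprs, PhysicalOperator child) {
    return new Project(exprs, child);
  }
}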
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
index 803bd4893..64be129db 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
@@ -17,16 +17,13 @@
*/
package org.apache.drill.exec.physical.impl;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
import io.netty.buffer.DrillBuf;
-
-import java.util.Collections;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-
import org.apache.drill.common.exceptions.ExecutionSetupException;
import org.apache.drill.common.exceptions.UserException;
import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.common.map.CaseInsensitiveMap;
import org.apache.drill.common.types.TypeProtos.MinorType;
import org.apache.drill.common.types.Types;
import org.apache.drill.exec.exception.OutOfMemoryException;
@@ -55,10 +52,11 @@ import org.apache.drill.exec.vector.AllocationHelper;
import org.apache.drill.exec.vector.NullableVarCharVector;
import org.apache.drill.exec.vector.SchemaChangeCallBack;
import org.apache.drill.exec.vector.ValueVector;
-import org.apache.drill.common.map.CaseInsensitiveMap;
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.collect.Maps;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
/**
* Record batch used for a particular scan. Operators against one or more
@@ -78,50 +76,56 @@ public class ScanBatch implements CloseableRecordBatch {
private BatchSchema schema;
private final Mutator mutator;
private boolean done = false;
- private boolean hasReadNonEmptyFile = false;
- private Map<String, ValueVector> implicitVectors;
private Iterator<Map<String, String>> implicitColumns;
private Map<String, String> implicitValues;
private final BufferAllocator allocator;
-
+ private final List<Map<String, String>> implicitColumnList;
+ private String currentReaderClassName;
+ /**
+ *
+ * @param subScanConfig
+ * @param context
+ * @param oContext
+ * @param readerList
+ * @param implicitColumnList either an empty list when none of the readers has implicit
+ *        columns, or a list with a one-to-one mapping between readers and their implicit columns.
+ */
public ScanBatch(PhysicalOperator subScanConfig, FragmentContext context,
- OperatorContext oContext, Iterator<RecordReader> readers,
- List<Map<String, String>> implicitColumns) {
+ OperatorContext oContext, List<RecordReader> readerList,
+ List<Map<String, String>> implicitColumnList) {
this.context = context;
- this.readers = readers;
+ this.readers = readerList.iterator();
+ this.implicitColumns = implicitColumnList.iterator();
if (!readers.hasNext()) {
throw UserException.systemError(
new ExecutionSetupException("A scan batch must contain at least one reader."))
.build(logger);
}
- currentReader = readers.next();
+
this.oContext = oContext;
allocator = oContext.getAllocator();
mutator = new Mutator(oContext, allocator, container);
+ oContext.getStats().startProcessing();
try {
- oContext.getStats().startProcessing();
- currentReader.setup(oContext, mutator);
- } catch (ExecutionSetupException e) {
- try {
- currentReader.close();
- } catch(final Exception e2) {
- logger.error("Close failed for reader " + currentReader.getClass().getSimpleName(), e2);
- }
- throw UserException.systemError(e)
- .addContext("Setup failed for", currentReader.getClass().getSimpleName())
+ if (!verifyImplicitColumns(readerList.size(), implicitColumnList)) {
+ Exception ex = new ExecutionSetupException("Either the implicit column list does not have the same cardinality as the reader list, "
+ + "or the implicit columns are not the same across all the record readers!");
+ throw UserException.systemError(ex)
+ .addContext("Setup failed for", readerList.get(0).getClass().getSimpleName())
.build(logger);
+ }
+
+ this.implicitColumnList = implicitColumnList;
+ addImplicitVectors();
+ currentReader = null;
} finally {
oContext.getStats().stopProcessing();
}
- this.implicitColumns = implicitColumns.iterator();
- this.implicitValues = this.implicitColumns.hasNext() ? this.implicitColumns.next() : null;
-
- addImplicitVectors();
}
public ScanBatch(PhysicalOperator subScanConfig, FragmentContext context,
- Iterator<RecordReader> readers)
+ List<RecordReader> readers)
throws ExecutionSetupException {
this(subScanConfig, context,
context.newOperatorContext(subScanConfig),
@@ -152,16 +156,6 @@ public class ScanBatch implements CloseableRecordBatch {
}
}
- private void releaseAssets() {
- container.zeroVectors();
- }
-
- private void clearFieldVectorMap() {
- for (final ValueVector v : mutator.fieldVectorMap().values()) {
- v.clear();
- }
- }
-
@Override
public IterOutcome next() {
if (done) {
@@ -169,82 +163,57 @@ public class ScanBatch implements CloseableRecordBatch {
}
oContext.getStats().startProcessing();
try {
- try {
+ while (true) {
+ if (currentReader == null && !getNextReaderIfHas()) {
+ releaseAssets(); // All data has been read. Release resource.
+ done = true;
+ return IterOutcome.NONE;
+ }
injector.injectChecked(context.getExecutionControls(), "next-allocate", OutOfMemoryException.class);
-
currentReader.allocate(mutator.fieldVectorMap());
- } catch (OutOfMemoryException e) {
- clearFieldVectorMap();
- throw UserException.memoryError(e).build(logger);
- }
- while ((recordCount = currentReader.next()) == 0) {
- try {
- if (!readers.hasNext()) {
- // We're on the last reader, and it has no (more) rows.
- currentReader.close();
- releaseAssets();
- done = true; // have any future call to next() return NONE
- if (mutator.isNewSchema()) {
- // This last reader has a new schema (e.g., we have a zero-row
- // file or other source). (Note that some sources have a non-
- // null/non-trivial schema even when there are no rows.)
-
- container.buildSchema(SelectionVectorMode.NONE);
- schema = container.getSchema();
-
- return IterOutcome.OK_NEW_SCHEMA;
- }
- return IterOutcome.NONE;
- }
- // At this point, the reader that hit its end is not the last reader.
-
- // If all the files we have read so far are just empty, the schema is not useful
- if (! hasReadNonEmptyFile) {
- container.clear();
- clearFieldVectorMap();
- mutator.clear();
- }
+ recordCount = currentReader.next();
+ Preconditions.checkArgument(recordCount >= 0, "recordCount from RecordReader.next() should not be negative");
+ boolean isNewSchema = mutator.isNewSchema();
+ populateImplicitVectorsAndSetCount();
+ oContext.getStats().batchReceived(0, recordCount, isNewSchema);
+ if (recordCount == 0) {
currentReader.close();
- currentReader = readers.next();
- implicitValues = implicitColumns.hasNext() ? implicitColumns.next() : null;
- currentReader.setup(oContext, mutator);
- try {
- currentReader.allocate(mutator.fieldVectorMap());
- } catch (OutOfMemoryException e) {
- clearFieldVectorMap();
- throw UserException.memoryError(e).build(logger);
- }
- addImplicitVectors();
- } catch (ExecutionSetupException e) {
- releaseAssets();
- throw UserException.systemError(e).build(logger);
+ currentReader = null; // indicate the current reader is complete,
+ // and fetch the next reader in the next loop iteration if required.
}
- }
-
- // At this point, the current reader has read 1 or more rows.
-
- hasReadNonEmptyFile = true;
- populateImplicitVectors();
-
- for (VectorWrapper<?> w : container) {
- w.getValueVector().getMutator().setValueCount(recordCount);
- }
- // this is a slight misuse of this metric but it will allow Readers to report how many records they generated.
- final boolean isNewSchema = mutator.isNewSchema();
- oContext.getStats().batchReceived(0, getRecordCount(), isNewSchema);
+ if (isNewSchema) {
+ // Even when recordCount = 0, we should return OK_NEW_SCHEMA if the current reader presents a new schema.
+ // This could happen when a data source has a non-trivial schema but 0 rows.
+ container.buildSchema(SelectionVectorMode.NONE);
+ schema = container.getSchema();
+ return IterOutcome.OK_NEW_SCHEMA;
+ }
- if (isNewSchema) {
- container.buildSchema(SelectionVectorMode.NONE);
- schema = container.getSchema();
- return IterOutcome.OK_NEW_SCHEMA;
- } else {
- return IterOutcome.OK;
+ // Handle case of same schema.
+ if (recordCount == 0) {
+ continue; // Skip to the next loop iteration if the reader returns 0 rows with the same schema.
+ } else {
+ // return OK if recordCount > 0 && ! isNewSchema
+ return IterOutcome.OK;
+ }
}
} catch (OutOfMemoryException ex) {
+ clearFieldVectorMap();
throw UserException.memoryError(ex).build(logger);
+ } catch (ExecutionSetupException e) {
+ if (currentReader != null) {
+ try {
+ currentReader.close();
+ } catch (final Exception e2) {
+ logger.error("Close failed for reader " + currentReaderClassName, e2);
+ }
+ }
+ throw UserException.systemError(e)
+ .addContext("Setup failed for", currentReaderClassName)
+ .build(logger);
} catch (Exception ex) {
throw UserException.systemError(ex).build(logger);
} finally {
@@ -252,21 +221,38 @@ public class ScanBatch implements CloseableRecordBatch {
}
}
+ private void releaseAssets() {
+ container.zeroVectors();
+ }
+
+ private void clearFieldVectorMap() {
+ for (final ValueVector v : mutator.fieldVectorMap().values()) {
+ v.clear();
+ }
+ for (final ValueVector v : mutator.implicitFieldVectorMap.values()) {
+ v.clear();
+ }
+ }
+
+ private boolean getNextReaderIfHas() throws ExecutionSetupException {
+ if (readers.hasNext()) {
+ currentReader = readers.next();
+ implicitValues = implicitColumns.hasNext() ? implicitColumns.next() : null;
+ currentReader.setup(oContext, mutator);
+ currentReaderClassName = currentReader.getClass().getSimpleName();
+ return true;
+ } else {
+ return false;
+ }
+ }
+
private void addImplicitVectors() {
try {
- if (implicitVectors != null) {
- for (ValueVector v : implicitVectors.values()) {
- v.clear();
- }
- }
- implicitVectors = Maps.newHashMap();
-
- if (implicitValues != null) {
- for (String column : implicitValues.keySet()) {
+ if (!implicitColumnList.isEmpty()) {
+ for (String column : implicitColumnList.get(0).keySet()) {
final MaterializedField field = MaterializedField.create(column, Types.optional(MinorType.VARCHAR));
@SuppressWarnings("resource")
- final ValueVector v = mutator.addField(field, NullableVarCharVector.class);
- implicitVectors.put(column, v);
+ final ValueVector v = mutator.addField(field, NullableVarCharVector.class, true /*implicit field*/);
}
}
} catch(SchemaChangeException e) {
@@ -277,24 +263,11 @@ public class ScanBatch implements CloseableRecordBatch {
}
}
- private void populateImplicitVectors() {
- if (implicitValues != null) {
- for (Map.Entry<String, String> entry : implicitValues.entrySet()) {
- @SuppressWarnings("resource")
- final NullableVarCharVector v = (NullableVarCharVector) implicitVectors.get(entry.getKey());
- String val;
- if ((val = entry.getValue()) != null) {
- AllocationHelper.allocate(v, recordCount, val.length());
- final byte[] bytes = val.getBytes();
- for (int j = 0; j < recordCount; j++) {
- v.getMutator().setSafe(j, bytes, 0, bytes.length);
- }
- v.getMutator().setValueCount(recordCount);
- } else {
- AllocationHelper.allocate(v, recordCount, 0);
- v.getMutator().setValueCount(recordCount);
- }
- }
+ private void populateImplicitVectorsAndSetCount() {
+ mutator.populateImplicitVectors(implicitValues, recordCount);
+ for (Map.Entry<String, ValueVector> entry: mutator.fieldVectorMap().entrySet()) {
+ logger.debug("set record count {} for vv {}", recordCount, entry.getKey());
+ entry.getValue().getMutator().setValueCount(recordCount);
}
}
@@ -329,14 +302,20 @@ public class ScanBatch implements CloseableRecordBatch {
@VisibleForTesting
public static class Mutator implements OutputMutator {
- /** Whether schema has changed since last inquiry (via #isNewSchema}). Is
- * true before first inquiry. */
- private boolean schemaChanged = true;
-
- /** Fields' value vectors indexed by fields' keys. */
- private final CaseInsensitiveMap<ValueVector> fieldVectorMap =
+ /** Flag tracking whether the top-level schema has changed since the last inquiry (via {@link #isNewSchema()}).
+ * It is initialized to false and reset to false by {@link #isNewSchema()} or {@link #clear()}, until a new value vector
+ * or a value vector with a different type is added to fieldVectorMap.
+ */
+ private boolean schemaChanged;
+
+ /** Regular fields' value vectors indexed by fields' keys. */
+ private final CaseInsensitiveMap<ValueVector> regularFieldVectorMap =
CaseInsensitiveMap.newHashMap();
+ /** Implicit fields' value vectors indexed by fields' keys. */
+ private final CaseInsensitiveMap<ValueVector> implicitFieldVectorMap =
+ CaseInsensitiveMap.newHashMap();
+
private final SchemaChangeCallBack callBack = new SchemaChangeCallBack();
private final BufferAllocator allocator;
@@ -348,46 +327,27 @@ public class ScanBatch implements CloseableRecordBatch {
this.oContext = oContext;
this.allocator = allocator;
this.container = container;
+ this.schemaChanged = false;
}
public Map<String, ValueVector> fieldVectorMap() {
- return fieldVectorMap;
+ return regularFieldVectorMap;
+ }
+
+ public Map<String, ValueVector> implicitFieldVectorMap() {
+ return implicitFieldVectorMap;
}
@SuppressWarnings("resource")
@Override
public <T extends ValueVector> T addField(MaterializedField field,
Class<T> clazz) throws SchemaChangeException {
- // Check if the field exists.
- ValueVector v = fieldVectorMap.get(field.getName());
- if (v == null || v.getClass() != clazz) {
- // Field does not exist--add it to the map and the output container.
- v = TypeHelper.getNewVector(field, allocator, callBack);
- if (!clazz.isAssignableFrom(v.getClass())) {
- throw new SchemaChangeException(
- String.format(
- "The class that was provided, %s, does not correspond to the "
- + "expected vector type of %s.",
- clazz.getSimpleName(), v.getClass().getSimpleName()));
- }
-
- final ValueVector old = fieldVectorMap.put(field.getName(), v);
- if (old != null) {
- old.clear();
- container.remove(old);
- }
-
- container.add(v);
- // Added new vectors to the container--mark that the schema has changed.
- schemaChanged = true;
- }
-
- return clazz.cast(v);
+ return addField(field, clazz, false);
}
@Override
public void allocate(int recordCount) {
- for (final ValueVector v : fieldVectorMap.values()) {
+ for (final ValueVector v : regularFieldVectorMap.values()) {
AllocationHelper.allocate(v, recordCount, 50, 10);
}
}
@@ -423,10 +383,82 @@ public class ScanBatch implements CloseableRecordBatch {
}
public void clear() {
- fieldVectorMap.clear();
+ regularFieldVectorMap.clear();
+ implicitFieldVectorMap.clear();
+ schemaChanged = false;
+ }
+
+ private <T extends ValueVector> T addField(MaterializedField field,
+ Class<T> clazz, boolean isImplicitField) throws SchemaChangeException {
+ Map<String, ValueVector> fieldVectorMap;
+
+ if (isImplicitField) {
+ fieldVectorMap = implicitFieldVectorMap;
+ } else {
+ fieldVectorMap = regularFieldVectorMap;
+ }
+
+ if (!isImplicitField && implicitFieldVectorMap.containsKey(field.getName()) ||
+ isImplicitField && regularFieldVectorMap.containsKey(field.getName())) {
+ throw new SchemaChangeException(
+ String.format(
+ "It's not allowed to have regular field and implicit field share common name %s. "
+ + "Either change regular field name in datasource, or change the default implicit field names.",
+ field.getName()));
+ }
+
+ // Check if the field exists.
+ ValueVector v = fieldVectorMap.get(field.getName());
+ if (v == null || v.getClass() != clazz) {
+ // Field does not exist--add it to the map and the output container.
+ v = TypeHelper.getNewVector(field, allocator, callBack);
+ if (!clazz.isAssignableFrom(v.getClass())) {
+ throw new SchemaChangeException(
+ String.format(
+ "The class that was provided, %s, does not correspond to the "
+ + "expected vector type of %s.",
+ clazz.getSimpleName(), v.getClass().getSimpleName()));
+ }
+
+ final ValueVector old = fieldVectorMap.put(field.getName(), v);
+ if (old != null) {
+ old.clear();
+ container.remove(old);
+ }
+
+ container.add(v);
+ // Only mark schema change for regular vectors added to the container; implicit schema is constant.
+ if (!isImplicitField) {
+ schemaChanged = true;
+ }
+ }
+
+ return clazz.cast(v);
+ }
+
+ private void populateImplicitVectors(Map<String, String> implicitValues, int recordCount) {
+ if (implicitValues != null) {
+ for (Map.Entry<String, String> entry : implicitValues.entrySet()) {
+ @SuppressWarnings("resource")
+ final NullableVarCharVector v = (NullableVarCharVector) implicitFieldVectorMap.get(entry.getKey());
+ String val;
+ if ((val = entry.getValue()) != null) {
+ AllocationHelper.allocate(v, recordCount, val.length());
+ final byte[] bytes = val.getBytes();
+ for (int j = 0; j < recordCount; j++) {
+ v.getMutator().setSafe(j, bytes, 0, bytes.length);
+ }
+ v.getMutator().setValueCount(recordCount);
+ } else {
+ AllocationHelper.allocate(v, recordCount, 0);
+ v.getMutator().setValueCount(recordCount);
+ }
+ }
+ }
}
}
+
@Override
public Iterator<VectorWrapper<?>> iterator() {
return container.iterator();
@@ -440,11 +472,10 @@ public class ScanBatch implements CloseableRecordBatch {
@Override
public void close() throws Exception {
container.clear();
- for (final ValueVector v : implicitVectors.values()) {
- v.clear();
- }
mutator.clear();
- currentReader.close();
+ if (currentReader != null) {
+ currentReader.close();
+ }
}
@Override
@@ -453,4 +484,34 @@ public class ScanBatch implements CloseableRecordBatch {
String.format("You should not call getOutgoingContainer() for class %s",
this.getClass().getCanonicalName()));
}
+
+ /**
+ * Verify that the list of implicit column values is valid input:
+ * - Either the implicit column list is empty;
+ * - Or the implicit column list has the same size as the reader list, and the key set is the same across all the readers.
+ * @param numReaders
+ * @param implicitColumnList
+ * @return true if the implicit column list is valid
+ */
+ private boolean verifyImplicitColumns(int numReaders, List<Map<String, String>> implicitColumnList) {
+ if (implicitColumnList.isEmpty()) {
+ return true;
+ }
+
+ if (numReaders != implicitColumnList.size()) {
+ return false;
+ }
+
+ Map<String, String> firstMap = implicitColumnList.get(0);
+
+ for (int i = 1; i < implicitColumnList.size(); i++) {
+ Map<String, String> nonFirstMap = implicitColumnList.get(i);
+
+ if (!firstMap.keySet().equals(nonFirstMap.keySet())) {
+ return false;
+ }
+ }
+
+ return true;
+ }
}
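Since the constructor now takes a List<RecordReader> together with a parallel list of implicit-column maps, a storage plugin's batch creator is expected to build the two lists side by side. A small sketch under that assumption; the 'filename' column, the helper class and the file-name list are illustrative, not code from this commit.

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.ops.OperatorContext;
import org.apache.drill.exec.physical.base.PhysicalOperator;
import org.apache.drill.exec.physical.impl.ScanBatch;
import org.apache.drill.exec.store.RecordReader;

class ScanBatchWiringSketch {
  // One implicit-column map per reader, all sharing the same key set; an empty
  // implicit list is also legal. A size mismatch now fails in the constructor.
  static ScanBatch createScan(PhysicalOperator subScanConfig, FragmentContext context,
      OperatorContext oContext, List<RecordReader> readerList, List<String> fileNames) {
    List<Map<String, String>> implicitColumnList = new ArrayList<>();
    for (String fileName : fileNames) {           // assumed to match readerList in size
      Map<String, String> implicitCols = new HashMap<>();
      implicitCols.put("filename", fileName);     // illustrative implicit column
      implicitColumnList.add(implicitCols);
    }
    return new ScanBatch(subScanConfig, context, oContext, readerList, implicitColumnList);
  }
}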
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java
index d2497f1e5..e77c186c8 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java
@@ -52,6 +52,7 @@ import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
import org.apache.drill.exec.record.ExpandableHyperContainer;
import org.apache.drill.exec.record.RecordBatch;
import org.apache.drill.exec.record.SchemaUtil;
+import org.apache.drill.exec.record.SimpleRecordBatch;
import org.apache.drill.exec.record.TypedFieldId;
import org.apache.drill.exec.record.VectorAccessible;
import org.apache.drill.exec.record.VectorContainer;
@@ -66,6 +67,8 @@ import com.google.common.base.Stopwatch;
import com.sun.codemodel.JConditional;
import com.sun.codemodel.JExpr;
+
public class TopNBatch extends AbstractRecordBatch<TopN> {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TopNBatch.class);
@@ -290,8 +293,8 @@ public class TopNBatch extends AbstractRecordBatch<TopN> {
VectorContainer newContainer = new VectorContainer(oContext);
@SuppressWarnings("resource")
SelectionVector4 selectionVector4 = priorityQueue.getHeapSv4();
- SimpleRecordBatch batch = new SimpleRecordBatch(c, selectionVector4, context);
- SimpleRecordBatch newBatch = new SimpleRecordBatch(newContainer, null, context);
+ SimpleSV4RecordBatch batch = new SimpleSV4RecordBatch(c, selectionVector4, context);
+ SimpleSV4RecordBatch newBatch = new SimpleSV4RecordBatch(newContainer, null, context);
if (copier == null) {
copier = RemovingRecordBatch.getGenerated4Copier(batch, context, oContext.getAllocator(), newContainer, newBatch, null);
} else {
@@ -391,8 +394,8 @@ public class TopNBatch extends AbstractRecordBatch<TopN> {
final VectorContainer newContainer = new VectorContainer(oContext);
@SuppressWarnings("resource")
final SelectionVector4 selectionVector4 = priorityQueue.getHeapSv4();
- final SimpleRecordBatch batch = new SimpleRecordBatch(c, selectionVector4, context);
- final SimpleRecordBatch newBatch = new SimpleRecordBatch(newContainer, null, context);
+ final SimpleSV4RecordBatch batch = new SimpleSV4RecordBatch(c, selectionVector4, context);
+ final SimpleSV4RecordBatch newBatch = new SimpleSV4RecordBatch(newContainer, null, context);
copier = RemovingRecordBatch.getGenerated4Copier(batch, context, oContext.getAllocator(), newContainer, newBatch, null);
@SuppressWarnings("resource")
SortRecordBatchBuilder builder = new SortRecordBatchBuilder(oContext.getAllocator());
@@ -440,26 +443,12 @@ public class TopNBatch extends AbstractRecordBatch<TopN> {
}
- public static class SimpleRecordBatch implements RecordBatch {
-
- private VectorContainer container;
+ public static class SimpleSV4RecordBatch extends SimpleRecordBatch {
private SelectionVector4 sv4;
- private FragmentContext context;
- public SimpleRecordBatch(VectorContainer container, SelectionVector4 sv4, FragmentContext context) {
- this.container = container;
+ public SimpleSV4RecordBatch(VectorContainer container, SelectionVector4 sv4, FragmentContext context) {
+ super(container, context);
this.sv4 = sv4;
- this.context = context;
- }
-
- @Override
- public FragmentContext getContext() {
- return context;
- }
-
- @Override
- public BatchSchema getSchema() {
- return container.getSchema();
}
@Override
@@ -467,54 +456,14 @@ public class TopNBatch extends AbstractRecordBatch<TopN> {
if (sv4 != null) {
return sv4.getCount();
} else {
- return container.getRecordCount();
+ return super.getRecordCount();
}
}
@Override
- public void kill(boolean sendUpstream) {
- }
-
- @Override
- public SelectionVector2 getSelectionVector2() {
- throw new UnsupportedOperationException();
- }
-
- @Override
public SelectionVector4 getSelectionVector4() {
return sv4;
}
-
- @Override
- public TypedFieldId getValueVectorId(SchemaPath path) {
- return container.getValueVectorId(path);
- }
-
- @Override
- public VectorWrapper<?> getValueAccessorById(Class<?> clazz, int... ids) {
- return container.getValueAccessorById(clazz, ids);
- }
-
- @Override
- public IterOutcome next() {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public WritableBatch getWritableBatch() {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public Iterator<VectorWrapper<?>> iterator() {
- return container.iterator();
- }
-
- @Override
- public VectorContainer getOutgoingContainer() {
- throw new UnsupportedOperationException(String.format(" You should not call getOutgoingContainer() for class %s", this.getClass().getCanonicalName()));
- }
-
}
}
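The extracted SimpleRecordBatch.java (listed in the diffstat) is not reproduced in this excerpt. Judging from the methods removed from the old inner class above, the shared class plausibly has roughly the following shape; this is a reconstruction, not the committed file.

import java.util.Iterator;
import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.record.BatchSchema;
import org.apache.drill.exec.record.RecordBatch;
import org.apache.drill.exec.record.TypedFieldId;
import org.apache.drill.exec.record.VectorContainer;
import org.apache.drill.exec.record.VectorWrapper;
import org.apache.drill.exec.record.WritableBatch;
import org.apache.drill.exec.record.selection.SelectionVector2;
import org.apache.drill.exec.record.selection.SelectionVector4;

// Sketch: a plain RecordBatch view over a VectorContainer, with no selection vectors.
public class SimpleRecordBatch implements RecordBatch {
  private final VectorContainer container;
  private final FragmentContext context;

  public SimpleRecordBatch(VectorContainer container, FragmentContext context) {
    this.container = container;
    this.context = context;
  }

  @Override public FragmentContext getContext() { return context; }
  @Override public BatchSchema getSchema() { return container.getSchema(); }
  @Override public int getRecordCount() { return container.getRecordCount(); }
  @Override public void kill(boolean sendUpstream) { }
  @Override public SelectionVector2 getSelectionVector2() { throw new UnsupportedOperationException(); }
  @Override public SelectionVector4 getSelectionVector4() { throw new UnsupportedOperationException(); }
  @Override public TypedFieldId getValueVectorId(SchemaPath path) { return container.getValueVectorId(path); }
  @Override public VectorWrapper<?> getValueAccessorById(Class<?> clazz, int... ids) { return container.getValueAccessorById(clazz, ids); }
  @Override public IterOutcome next() { throw new UnsupportedOperationException(); }
  @Override public WritableBatch getWritableBatch() { throw new UnsupportedOperationException(); }
  @Override public Iterator<VectorWrapper<?>> iterator() { return container.iterator(); }
  @Override public VectorContainer getOutgoingContainer() { throw new UnsupportedOperationException(); }
}

SimpleSV4RecordBatch in the hunk above then only adds the SV4-specific behavior on top of this base.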
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
index 1f74ba1c4..8c899aacd 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
@@ -47,6 +47,7 @@ import org.apache.drill.exec.physical.impl.common.HashTableStats;
import org.apache.drill.exec.physical.impl.common.IndexPointer;
import org.apache.drill.exec.physical.impl.common.Comparator;
import org.apache.drill.exec.physical.impl.sort.RecordBatchData;
+import org.apache.drill.exec.record.AbstractBinaryRecordBatch;
import org.apache.drill.exec.record.AbstractRecordBatch;
import org.apache.drill.exec.record.BatchSchema;
import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
@@ -64,16 +65,10 @@ import com.sun.codemodel.JExpr;
import com.sun.codemodel.JExpression;
import com.sun.codemodel.JVar;
-public class HashJoinBatch extends AbstractRecordBatch<HashJoinPOP> {
+public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> {
public static final long ALLOCATOR_INITIAL_RESERVATION = 1 * 1024 * 1024;
public static final long ALLOCATOR_MAX_RESERVATION = 20L * 1000 * 1000 * 1000;
- // Probe side record batch
- private final RecordBatch left;
-
- // Build side record batch
- private final RecordBatch right;
-
// Join type, INNER, LEFT, RIGHT or OUTER
private final JoinRelType joinType;
@@ -145,9 +140,6 @@ public class HashJoinBatch extends AbstractRecordBatch<HashJoinPOP> {
// indicates if we have previously returned an output batch
boolean firstOutputBatch = true;
- IterOutcome leftUpstream = IterOutcome.NONE;
- IterOutcome rightUpstream = IterOutcome.NONE;
-
private final HashTableStats htStats = new HashTableStats();
public enum Metric implements MetricDef {
@@ -172,16 +164,7 @@ public class HashJoinBatch extends AbstractRecordBatch<HashJoinPOP> {
@Override
protected void buildSchema() throws SchemaChangeException {
- leftUpstream = next(left);
- rightUpstream = next(right);
-
- if (leftUpstream == IterOutcome.STOP || rightUpstream == IterOutcome.STOP) {
- state = BatchState.STOP;
- return;
- }
-
- if (leftUpstream == IterOutcome.OUT_OF_MEMORY || rightUpstream == IterOutcome.OUT_OF_MEMORY) {
- state = BatchState.OUT_OF_MEMORY;
+ if (! prefetchFirstBatchFromBothSides()) {
return;
}
@@ -503,11 +486,11 @@ public class HashJoinBatch extends AbstractRecordBatch<HashJoinPOP> {
}
}
- public HashJoinBatch(HashJoinPOP popConfig, FragmentContext context, RecordBatch left,
- RecordBatch right) throws OutOfMemoryException {
- super(popConfig, context, true);
- this.left = left;
- this.right = right;
+ public HashJoinBatch(HashJoinPOP popConfig, FragmentContext context,
+ RecordBatch left, /*Probe side record batch*/
+ RecordBatch right /*Build side record batch*/
+ ) throws OutOfMemoryException {
+ super(popConfig, context, true, left, right);
joinType = popConfig.getJoinType();
conditions = popConfig.getConditions();
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java
index e5997025e..a1b8dc212 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java
@@ -151,6 +151,12 @@ public class MergeJoinBatch extends AbstractRecordBatch<MergeJoinPOP> {
state = BatchState.OUT_OF_MEMORY;
return;
}
+
+ if (leftOutcome == IterOutcome.NONE && rightOutcome == IterOutcome.NONE) {
+ state = BatchState.DONE;
+ return;
+ }
+
allocateBatch(true);
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoinBatch.java
index 35cc71076..b390e418e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoinBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoinBatch.java
@@ -42,6 +42,7 @@ import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.physical.config.NestedLoopJoinPOP;
import org.apache.drill.exec.physical.impl.filter.ReturnValueExpression;
import org.apache.drill.exec.physical.impl.sort.RecordBatchData;
+import org.apache.drill.exec.record.AbstractBinaryRecordBatch;
import org.apache.drill.exec.record.AbstractRecordBatch;
import org.apache.drill.exec.record.BatchSchema;
import org.apache.drill.exec.record.ExpandableHyperContainer;
@@ -62,7 +63,7 @@ import org.apache.drill.exec.vector.complex.AbstractContainerVector;
/*
* RecordBatch implementation for the nested loop join operator
*/
-public class NestedLoopJoinBatch extends AbstractRecordBatch<NestedLoopJoinPOP> {
+public class NestedLoopJoinBatch extends AbstractBinaryRecordBatch<NestedLoopJoinPOP> {
private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(NestedLoopJoinBatch.class);
// Maximum number records in the outgoing batch
@@ -72,24 +73,12 @@ public class NestedLoopJoinBatch extends AbstractRecordBatch<NestedLoopJoinPOP>
protected static final int LEFT_INPUT = 0;
protected static final int RIGHT_INPUT = 1;
- // Left input to the nested loop join operator
- private final RecordBatch left;
-
// Schema on the left side
private BatchSchema leftSchema = null;
- // state (IterOutcome) of the left input
- private IterOutcome leftUpstream = IterOutcome.NONE;
-
- // Right input to the nested loop join operator.
- private final RecordBatch right;
-
// Schema on the right side
private BatchSchema rightSchema = null;
- // state (IterOutcome) of the right input
- private IterOutcome rightUpstream = IterOutcome.NONE;
-
// Runtime generated class implementing the NestedLoopJoin interface
private NestedLoopJoin nljWorker = null;
@@ -134,11 +123,9 @@ public class NestedLoopJoinBatch extends AbstractRecordBatch<NestedLoopJoinPOP>
EMIT_LEFT_CONSTANT, EMIT_LEFT);
protected NestedLoopJoinBatch(NestedLoopJoinPOP popConfig, FragmentContext context, RecordBatch left, RecordBatch right) throws OutOfMemoryException {
- super(popConfig, context);
+ super(popConfig, context, left, right);
Preconditions.checkNotNull(left);
Preconditions.checkNotNull(right);
- this.left = left;
- this.right = right;
}
/**
@@ -352,18 +339,8 @@ public class NestedLoopJoinBatch extends AbstractRecordBatch<NestedLoopJoinPOP>
*/
@Override
protected void buildSchema() throws SchemaChangeException {
-
try {
- leftUpstream = next(LEFT_INPUT, left);
- rightUpstream = next(RIGHT_INPUT, right);
-
- if (leftUpstream == IterOutcome.STOP || rightUpstream == IterOutcome.STOP) {
- state = BatchState.STOP;
- return;
- }
-
- if (leftUpstream == IterOutcome.OUT_OF_MEMORY || rightUpstream == IterOutcome.OUT_OF_MEMORY) {
- state = BatchState.OUT_OF_MEMORY;
+ if (! prefetchFirstBatchFromBothSides()) {
return;
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java
index 9a72fcb3f..30efeece3 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java
@@ -17,10 +17,10 @@
*/
package org.apache.drill.exec.physical.impl.project;
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.List;
-
+import com.carrotsearch.hppc.IntHashSet;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
import org.apache.commons.collections.map.CaseInsensitiveMap;
import org.apache.drill.common.expression.ConvertExpression;
import org.apache.drill.common.expression.ErrorCollector;
@@ -35,6 +35,7 @@ import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.common.expression.ValueExpressions;
import org.apache.drill.common.expression.fn.CastFunctions;
import org.apache.drill.common.logical.data.NamedExpression;
+import org.apache.drill.common.types.TypeProtos;
import org.apache.drill.common.types.TypeProtos.MinorType;
import org.apache.drill.common.types.Types;
import org.apache.drill.exec.exception.ClassTransformationException;
@@ -51,24 +52,31 @@ import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.physical.config.Project;
import org.apache.drill.exec.planner.StarColumnHelper;
import org.apache.drill.exec.record.AbstractSingleRecordBatch;
+import org.apache.drill.exec.record.BatchSchema;
import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
import org.apache.drill.exec.record.MaterializedField;
import org.apache.drill.exec.record.RecordBatch;
+import org.apache.drill.exec.record.SimpleRecordBatch;
import org.apache.drill.exec.record.TransferPair;
import org.apache.drill.exec.record.TypedFieldId;
import org.apache.drill.exec.record.VectorContainer;
import org.apache.drill.exec.record.VectorWrapper;
+import org.apache.drill.exec.record.WritableBatch;
+import org.apache.drill.exec.record.selection.SelectionVector2;
+import org.apache.drill.exec.record.selection.SelectionVector4;
import org.apache.drill.exec.store.ColumnExplorer;
import org.apache.drill.exec.vector.AllocationHelper;
import org.apache.drill.exec.vector.FixedWidthVector;
+import org.apache.drill.exec.vector.UntypedNullHolder;
+import org.apache.drill.exec.vector.UntypedNullVector;
import org.apache.drill.exec.vector.ValueVector;
-import org.apache.drill.exec.vector.complex.MapVector;
import org.apache.drill.exec.vector.complex.writer.BaseWriter.ComplexWriter;
-import com.carrotsearch.hppc.IntHashSet;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project> {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ProjectRecordBatch.class);
@@ -165,8 +173,8 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project> {
// Only need to add the schema for the complex exprs because others should already have
// been setup during setupNewSchema
for (FieldReference fieldReference : complexFieldReferencesList) {
- container.addOrGet(fieldReference.getRootSegment().getPath(),
- Types.required(MinorType.MAP), MapVector.class);
+ MaterializedField field = MaterializedField.create(fieldReference.getAsNamePart().getName(), UntypedNullHolder.TYPE);
+ container.add(new UntypedNullVector(field, container.getAllocator()));
}
container.buildSchema(SelectionVectorMode.NONE);
wasNone = true;
@@ -302,8 +310,7 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project> {
return expr.getPath().contains(StarColumnHelper.STAR_COLUMN);
}
- @Override
- protected boolean setupNewSchema() throws SchemaChangeException {
+ private void setupNewSchemaFromInput(RecordBatch incomingBatch) throws SchemaChangeException {
if (allocationVectors != null) {
for (final ValueVector v : allocationVectors) {
v.clear();
@@ -322,7 +329,7 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project> {
final ClassGenerator<Projector> cg = CodeGenerator.getRoot(Projector.TEMPLATE_DEFINITION, context.getFunctionRegistry(), context.getOptions());
cg.getCodeGenerator().plainJavaCapable(true);
// Uncomment out this line to debug the generated code.
-// cg.getCodeGenerator().saveCodeForDebugging(true);
+ // cg.getCodeGenerator().saveCodeForDebugging(true);
final IntHashSet transferFieldIds = new IntHashSet();
@@ -335,14 +342,14 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project> {
result.clear();
if (classify && namedExpression.getExpr() instanceof SchemaPath) {
- classifyExpr(namedExpression, incoming, result);
+ classifyExpr(namedExpression, incomingBatch, result);
if (result.isStar) {
// The value indicates which wildcard we are processing now
final Integer value = result.prefixMap.get(result.prefix);
if (value != null && value == 1) {
int k = 0;
- for (final VectorWrapper<?> wrapper : incoming) {
+ for (final VectorWrapper<?> wrapper : incomingBatch) {
final ValueVector vvIn = wrapper.getValueVector();
if (k > result.outputNames.size() - 1) {
assert false;
@@ -363,7 +370,7 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project> {
}
} else if (value != null && value > 1) { // subsequent wildcards should do a copy of incoming valuevectors
int k = 0;
- for (final VectorWrapper<?> wrapper : incoming) {
+ for (final VectorWrapper<?> wrapper : incomingBatch) {
final ValueVector vvIn = wrapper.getValueVector();
final SchemaPath originalPath = SchemaPath.getSimplePath(vvIn.getField().getName());
if (k > result.outputNames.size() - 1) {
@@ -378,9 +385,9 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project> {
continue;
}
- final LogicalExpression expr = ExpressionTreeMaterializer.materialize(originalPath, incoming, collector, context.getFunctionRegistry());
+ final LogicalExpression expr = ExpressionTreeMaterializer.materialize(originalPath, incomingBatch, collector, context.getFunctionRegistry() );
if (collector.hasErrors()) {
- throw new SchemaChangeException(String.format("Failure while trying to materialize incoming schema. Errors:\n %s.", collector.toErrorString()));
+ throw new SchemaChangeException(String.format("Failure while trying to materialize incomingBatch schema. Errors:\n %s.", collector.toErrorString()));
}
final MaterializedField outputField = MaterializedField.create(name, expr.getMajorType());
@@ -417,23 +424,23 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project> {
}
}
- final LogicalExpression expr = ExpressionTreeMaterializer.materialize(namedExpression.getExpr(), incoming,
- collector, context.getFunctionRegistry(), true, unionTypeEnabled);
+ final LogicalExpression expr = ExpressionTreeMaterializer.materialize(namedExpression.getExpr(), incomingBatch,
+ collector, context.getFunctionRegistry(), true, unionTypeEnabled);
final MaterializedField outputField = MaterializedField.create(outputName, expr.getMajorType());
if (collector.hasErrors()) {
throw new SchemaChangeException(String.format("Failure while trying to materialize incoming schema. Errors:\n %s.", collector.toErrorString()));
}
// add value vector to transfer if direct reference and this is allowed, otherwise, add to evaluation stack.
- if (expr instanceof ValueVectorReadExpression && incoming.getSchema().getSelectionVectorMode() == SelectionVectorMode.NONE
+ if (expr instanceof ValueVectorReadExpression && incomingBatch.getSchema().getSelectionVectorMode() == SelectionVectorMode.NONE
&& !((ValueVectorReadExpression) expr).hasReadPath()
&& !isAnyWildcard
&& !transferFieldIds.contains(((ValueVectorReadExpression) expr).getFieldId().getFieldIds()[0])) {
final ValueVectorReadExpression vectorRead = (ValueVectorReadExpression) expr;
final TypedFieldId id = vectorRead.getFieldId();
- final ValueVector vvIn = incoming.getValueAccessorById(id.getIntermediateClass(), id.getFieldIds()).getValueVector();
- Preconditions.checkNotNull(incoming);
+ final ValueVector vvIn = incomingBatch.getValueAccessorById(id.getIntermediateClass(), id.getFieldIds()).getValueVector();
+ Preconditions.checkNotNull(incomingBatch);
final FieldReference ref = getRef(namedExpression);
final ValueVector vvOut = container.addOrGet(MaterializedField.create(ref.getLastSegment().getNameSegment().getPath(),
@@ -473,7 +480,7 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project> {
final ValueVectorReadExpression vectorRead = (ValueVectorReadExpression) expr;
if (!vectorRead.hasReadPath()) {
final TypedFieldId id = vectorRead.getFieldId();
- final ValueVector vvIn = incoming.getValueAccessorById(id.getIntermediateClass(), id.getFieldIds()).getValueVector();
+ final ValueVector vvIn = incomingBatch.getValueAccessorById(id.getIntermediateClass(), id.getFieldIds()).getValueVector();
vvIn.makeTransferPair(vector);
}
}
@@ -485,12 +492,17 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project> {
CodeGenerator<Projector> codeGen = cg.getCodeGenerator();
codeGen.plainJavaCapable(true);
// Uncomment out this line to debug the generated code.
-// codeGen.saveCodeForDebugging(true);
+ // codeGen.saveCodeForDebugging(true);
this.projector = context.getImplementationClass(codeGen);
- projector.setup(context, incoming, this, transfers);
+ projector.setup(context, incomingBatch, this, transfers);
} catch (ClassTransformationException | IOException e) {
throw new SchemaChangeException("Failure while attempting to load generated class", e);
}
+ }
+
+ @Override
+ protected boolean setupNewSchema() throws SchemaChangeException {
+ setupNewSchemaFromInput(this.incoming);
if (container.isSchemaChanged()) {
container.buildSchema(SelectionVectorMode.NONE);
return true;
@@ -624,11 +636,6 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project> {
final int incomingSchemaSize = incoming.getSchema().getFieldCount();
- // for debugging..
- // if (incomingSchemaSize > 9) {
- // assert false;
- // }
-
// input is '*' and output is 'prefix_*'
if (exprIsStar && refHasPrefix && refEndsWithStar) {
final String[] components = ref.getPath().split(StarColumnHelper.PREFIX_DELIMITER, 2);
@@ -768,4 +775,50 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project> {
}
}
}
+
+ /**
+ * Handle null input specially when the Project operator is for the query's final output. This happens when the input returns 0 batches
+ * (returns a fast NONE directly).
+ *
+ * <p>
+ * The Project operator has to return a batch with a schema derived using the following 3 rules:
+ * </p>
+ * <ul>
+ * <li>Case 1: * ==> expand into an empty list of columns. </li>
+ * <li>Case 2: regular column reference ==> treat as a nullable-int column </li>
+ * <li>Case 3: expressions ==> run ExpressionTreeMaterializer over an empty vector container.
+ * Once the expression is materialized without error, use the output type of the materialized
+ * expression. </li>
+ * </ul>
+ *
+ * <p>
+ * The batch is constructed with the above rules, and recordCount = 0.
+ * It is returned with OK_NEW_SCHEMA to the downstream operator.
+ * </p>
+ */
+ @Override
+ protected IterOutcome handleNullInput() {
+ if (! popConfig.isOutputProj()) {
+ return super.handleNullInput();
+ }
+
+ VectorContainer emptyVC = new VectorContainer();
+ emptyVC.buildSchema(SelectionVectorMode.NONE);
+ RecordBatch emptyIncomingBatch = new SimpleRecordBatch(emptyVC, context);
+
+ try {
+ setupNewSchemaFromInput(emptyIncomingBatch);
+ } catch (SchemaChangeException e) {
+ kill(false);
+ logger.error("Failure during query", e);
+ context.fail(e);
+ return IterOutcome.STOP;
+ }
+
+ doAlloc(0);
+ container.buildSchema(SelectionVectorMode.NONE);
+ wasNone = true;
+ return IterOutcome.OK_NEW_SCHEMA;
+ }
+
}
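As an illustration (not from the commit itself): for a query such as SELECT *, colA, colA + 1 FROM t in which every input file is empty and the scan returns a fast NONE, the output Project built by handleNullInput() would expand * into no columns, emit colA as a nullable INT, and give colA + 1 the type produced by ExpressionTreeMaterializer; the resulting zero-row batch is returned once with OK_NEW_SCHEMA, after which the operator returns NONE.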
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionAllRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionAllRecordBatch.java
index 5afe66bd4..4d623cf15 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionAllRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionAllRecordBatch.java
@@ -17,18 +17,15 @@
*/
package org.apache.drill.exec.physical.impl.union;
-import java.io.IOException;
-import java.util.Iterator;
-import java.util.List;
-
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import org.apache.calcite.util.Pair;
import org.apache.drill.common.exceptions.DrillRuntimeException;
import org.apache.drill.common.expression.ErrorCollector;
import org.apache.drill.common.expression.ErrorCollectorImpl;
import org.apache.drill.common.expression.LogicalExpression;
import org.apache.drill.common.expression.SchemaPath;
-import org.apache.drill.common.types.TypeProtos.DataMode;
-import org.apache.drill.common.types.TypeProtos.MajorType;
-import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.common.types.TypeProtos;
import org.apache.drill.common.types.Types;
import org.apache.drill.exec.exception.ClassTransformationException;
import org.apache.drill.exec.exception.OutOfMemoryException;
@@ -39,88 +36,96 @@ import org.apache.drill.exec.expr.ExpressionTreeMaterializer;
import org.apache.drill.exec.expr.ValueVectorWriteExpression;
import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.physical.config.UnionAll;
-import org.apache.drill.exec.record.AbstractRecordBatch;
+import org.apache.drill.exec.record.AbstractBinaryRecordBatch;
import org.apache.drill.exec.record.BatchSchema;
import org.apache.drill.exec.record.MaterializedField;
import org.apache.drill.exec.record.RecordBatch;
import org.apache.drill.exec.record.TransferPair;
import org.apache.drill.exec.record.TypedFieldId;
+import org.apache.drill.exec.record.VectorAccessibleUtilities;
import org.apache.drill.exec.record.VectorWrapper;
-import org.apache.drill.exec.record.WritableBatch;
-import org.apache.drill.exec.record.selection.SelectionVector2;
-import org.apache.drill.exec.record.selection.SelectionVector4;
import org.apache.drill.exec.resolver.TypeCastRules;
-import org.apache.drill.exec.vector.AllocationHelper;
+import org.apache.drill.exec.util.VectorUtil;
import org.apache.drill.exec.vector.FixedWidthVector;
import org.apache.drill.exec.vector.SchemaChangeCallBack;
import org.apache.drill.exec.vector.ValueVector;
-import com.google.common.collect.Lists;
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.Stack;
-public class UnionAllRecordBatch extends AbstractRecordBatch<UnionAll> {
+public class UnionAllRecordBatch extends AbstractBinaryRecordBatch<UnionAll> {
private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(UnionAllRecordBatch.class);
- private List<MaterializedField> outputFields;
+ private SchemaChangeCallBack callBack = new SchemaChangeCallBack();
private UnionAller unionall;
- private UnionAllInput unionAllInput;
- private RecordBatch current;
-
private final List<TransferPair> transfers = Lists.newArrayList();
- private List<ValueVector> allocationVectors;
- protected SchemaChangeCallBack callBack = new SchemaChangeCallBack();
+ private List<ValueVector> allocationVectors = Lists.newArrayList();
private int recordCount = 0;
- private boolean schemaAvailable = false;
+ private UnionInputIterator unionInputIterator;
public UnionAllRecordBatch(UnionAll config, List<RecordBatch> children, FragmentContext context) throws OutOfMemoryException {
- super(config, context, false);
- assert (children.size() == 2) : "The number of the operands of Union must be 2";
- unionAllInput = new UnionAllInput(this, children.get(0), children.get(1));
- }
-
- @Override
- public int getRecordCount() {
- return recordCount;
+ super(config, context, true, children.get(0), children.get(1));
}
@Override
protected void killIncoming(boolean sendUpstream) {
- unionAllInput.getLeftRecordBatch().kill(sendUpstream);
- unionAllInput.getRightRecordBatch().kill(sendUpstream);
+ left.kill(sendUpstream);
+ right.kill(sendUpstream);
}
- @Override
- public SelectionVector2 getSelectionVector2() {
- throw new UnsupportedOperationException("UnionAllRecordBatch does not support selection vector");
- }
+ protected void buildSchema() throws SchemaChangeException {
+ if (! prefetchFirstBatchFromBothSides()) {
+ return;
+ }
- @Override
- public SelectionVector4 getSelectionVector4() {
- throw new UnsupportedOperationException("UnionAllRecordBatch does not support selection vector");
+ unionInputIterator = new UnionInputIterator(leftUpstream, left, rightUpstream, right);
+
+ if (leftUpstream == IterOutcome.NONE && rightUpstream == IterOutcome.OK_NEW_SCHEMA) {
+ inferOutputFieldsOneSide(right.getSchema());
+ } else if (rightUpstream == IterOutcome.NONE && leftUpstream == IterOutcome.OK_NEW_SCHEMA) {
+ inferOutputFieldsOneSide((left.getSchema()));
+ } else if (leftUpstream == IterOutcome.OK_NEW_SCHEMA && rightUpstream == IterOutcome.OK_NEW_SCHEMA) {
+ inferOutputFieldsBothSide(left.getSchema(), right.getSchema());
+ }
+
+ container.buildSchema(BatchSchema.SelectionVectorMode.NONE);
+
+ VectorAccessibleUtilities.allocateVectors(container, 0);
+ VectorAccessibleUtilities.setValueCount(container, 0);
}
@Override
public IterOutcome innerNext() {
try {
- IterOutcome upstream = unionAllInput.nextBatch();
- logger.debug("Upstream of Union-All: {}", upstream);
- switch (upstream) {
+ while (true) {
+ if (!unionInputIterator.hasNext()) {
+ return IterOutcome.NONE;
+ }
+
+ Pair<IterOutcome, RecordBatch> nextBatch = unionInputIterator.next();
+ IterOutcome upstream = nextBatch.left;
+ RecordBatch incoming = nextBatch.right;
+
+ switch (upstream) {
case NONE:
case OUT_OF_MEMORY:
case STOP:
return upstream;
-
case OK_NEW_SCHEMA:
- outputFields = unionAllInput.getOutputFields();
+ return doWork(nextBatch.right, true);
case OK:
- IterOutcome workOutcome = doWork();
-
- if (workOutcome != IterOutcome.OK) {
- return workOutcome;
- } else {
- return upstream;
+ // Skip batches that have the same schema as the previous one but contain 0 rows.
+ if (incoming.getRecordCount() == 0) {
+ VectorAccessibleUtilities.clear(incoming);
+ continue;
}
+ return doWork(nextBatch.right, false);
default:
throw new IllegalStateException(String.format("Unknown state %s.", upstream));
+ }
}
} catch (ClassTransformationException | IOException | SchemaChangeException ex) {
context.fail(ex);
@@ -130,120 +135,75 @@ public class UnionAllRecordBatch extends AbstractRecordBatch<UnionAll> {
}
@Override
- public WritableBatch getWritableBatch() {
- return WritableBatch.get(this);
+ public int getRecordCount() {
+ return recordCount;
}
- private void setValueCount(int count) {
- for (ValueVector v : allocationVectors) {
- ValueVector.Mutator m = v.getMutator();
- m.setValueCount(count);
- }
- }
-
- private boolean doAlloc() {
- for (ValueVector v : allocationVectors) {
- try {
- AllocationHelper.allocateNew(v, current.getRecordCount());
- } catch (OutOfMemoryException ex) {
- return false;
- }
- }
- return true;
- }
@SuppressWarnings("resource")
- private IterOutcome doWork() throws ClassTransformationException, IOException, SchemaChangeException {
- if (allocationVectors != null) {
- for (ValueVector v : allocationVectors) {
- v.clear();
- }
+ private IterOutcome doWork(RecordBatch inputBatch, boolean newSchema) throws ClassTransformationException, IOException, SchemaChangeException {
+ Preconditions.checkArgument(inputBatch.getSchema().getFieldCount() == container.getSchema().getFieldCount(),
+ "Input batch and output batch have different field counthas!");
+
+ if (newSchema) {
+ createUnionAller(inputBatch);
}
- allocationVectors = Lists.newArrayList();
- transfers.clear();
+ container.zeroVectors();
+ VectorUtil.allocateVectors(allocationVectors, inputBatch.getRecordCount());
+ recordCount = unionall.unionRecords(0, inputBatch.getRecordCount(), 0);
+ VectorUtil.setValueCount(allocationVectors, recordCount);
- // If both sides of Union-All are empty
- if (unionAllInput.isBothSideEmpty()) {
- for (MaterializedField materializedField : outputFields) {
- final String colName = materializedField.getName();
- final MajorType majorType = MajorType.newBuilder()
- .setMinorType(MinorType.INT)
- .setMode(DataMode.OPTIONAL)
- .build();
-
- MaterializedField outputField = MaterializedField.create(colName, majorType);
- ValueVector vv = container.addOrGet(outputField, callBack);
- allocationVectors.add(vv);
- }
-
- container.buildSchema(BatchSchema.SelectionVectorMode.NONE);
+ if (callBack.getSchemaChangedAndReset()) {
return IterOutcome.OK_NEW_SCHEMA;
+ } else {
+ return IterOutcome.OK;
}
+ }
+
+ private void createUnionAller(RecordBatch inputBatch) throws ClassTransformationException, IOException, SchemaChangeException {
+ transfers.clear();
+ allocationVectors.clear();
final ClassGenerator<UnionAller> cg = CodeGenerator.getRoot(UnionAller.TEMPLATE_DEFINITION, context.getFunctionRegistry(), context.getOptions());
cg.getCodeGenerator().plainJavaCapable(true);
// Uncomment out this line to debug the generated code.
-// cg.getCodeGenerator().saveCodeForDebugging(true);
+ // cg.getCodeGenerator().saveCodeForDebugging(true);
+
int index = 0;
- for (VectorWrapper<?> vw : current) {
- ValueVector vvIn = vw.getValueVector();
- // get the original input column names
- SchemaPath inputPath = SchemaPath.getSimplePath(vvIn.getField().getName());
- // get the renamed column names
- SchemaPath outputPath = SchemaPath.getSimplePath(outputFields.get(index).getName());
+ for(VectorWrapper<?> vw : inputBatch) {
+ ValueVector vvIn = vw.getValueVector();
+ ValueVector vvOut = container.getValueVector(index).getValueVector();
final ErrorCollector collector = new ErrorCollectorImpl();
// According to input data names, Minortypes, Datamodes, choose to
// transfer directly,
// rename columns or
// cast data types (Minortype or DataMode)
- if (hasSameTypeAndMode(outputFields.get(index), vw.getValueVector().getField())) {
+ if (container.getSchema().getColumn(index).hasSameTypeAndMode(vvIn.getField())
+ && vvIn.getField().getType().getMinorType() != TypeProtos.MinorType.MAP // Per DRILL-5521, existing bug for map transfer
+ ) {
// Transfer column
+ TransferPair tp = vvIn.makeTransferPair(vvOut);
+ transfers.add(tp);
+ } else if (vvIn.getField().getType().getMinorType() == TypeProtos.MinorType.NULL) {
+ continue;
+ } else { // Copy data in order to rename the column
+ SchemaPath inputPath = SchemaPath.getSimplePath(vvIn.getField().getName());
+ MaterializedField inField = vvIn.getField();
+ MaterializedField outputField = vvOut.getField();
- MajorType outputFieldType = outputFields.get(index).getType();
- MaterializedField outputField = MaterializedField.create(outputPath.getLastSegment().getNameSegment().getPath(),
- outputFieldType);
-
- /*
- todo: Fix if condition when DRILL-4824 is merged
- If condition should be changed to:
- `if (outputFields.get(index).getName().equals(inputPath.getRootSegmentPath())) {`
- DRILL-5419 has changed condition to correct one but this caused regression (DRILL-5521).
- Root cause is missing indication of child column in map types when it is null.
- DRILL-4824 is re-working json reader implementation, including map types and will fix this problem.
- Reverting condition to previous one to avoid regression till DRILL-4824 is merged.
- Unit test - TestJsonReader.testKvgenWithUnionAll().
- */
- if (outputFields.get(index).getName().equals(inputPath)) {
- ValueVector vvOut = container.addOrGet(outputField);
- TransferPair tp = vvIn.makeTransferPair(vvOut);
- transfers.add(tp);
- // Copy data in order to rename the column
- } else {
- final LogicalExpression expr = ExpressionTreeMaterializer.materialize(inputPath, current, collector, context.getFunctionRegistry() );
- if (collector.hasErrors()) {
- throw new SchemaChangeException(String.format("Failure while trying to materialize incoming schema. Errors:\n %s.", collector.toErrorString()));
- }
+ LogicalExpression expr = ExpressionTreeMaterializer.materialize(inputPath, inputBatch, collector, context.getFunctionRegistry());
- ValueVector vv = container.addOrGet(outputField, callBack);
- allocationVectors.add(vv);
- TypedFieldId fid = container.getValueVectorId(SchemaPath.getSimplePath(outputField.getName()));
- ValueVectorWriteExpression write = new ValueVectorWriteExpression(fid, expr, true);
- cg.addExpr(write);
- }
- // Cast is necessary
- } else {
- LogicalExpression expr = ExpressionTreeMaterializer.materialize(inputPath, current, collector, context.getFunctionRegistry());
if (collector.hasErrors()) {
throw new SchemaChangeException(String.format("Failure while trying to materialize incoming schema. Errors:\n %s.", collector.toErrorString()));
}
// If the inputs' DataMode is required and the outputs' DataMode is not required
// cast to the one with the least restriction
- if (vvIn.getField().getType().getMode() == DataMode.REQUIRED
- && outputFields.get(index).getType().getMode() != DataMode.REQUIRED) {
- expr = ExpressionTreeMaterializer.convertToNullableType(expr, vvIn.getField().getType().getMinorType(), context.getFunctionRegistry(), collector);
+ if(inField.getType().getMode() == TypeProtos.DataMode.REQUIRED
+ && outputField.getType().getMode() != TypeProtos.DataMode.REQUIRED) {
+ expr = ExpressionTreeMaterializer.convertToNullableType(expr, inField.getType().getMinorType(), context.getFunctionRegistry(), collector);
if (collector.hasErrors()) {
throw new SchemaChangeException(String.format("Failure while trying to materialize incoming schema. Errors:\n %s.", collector.toErrorString()));
}
@@ -251,442 +211,163 @@ public class UnionAllRecordBatch extends AbstractRecordBatch<UnionAll> {
// If two inputs' MinorTypes are different,
// Insert a cast before the Union operation
- if (vvIn.getField().getType().getMinorType() != outputFields.get(index).getType().getMinorType()) {
- expr = ExpressionTreeMaterializer.addCastExpression(expr, outputFields.get(index).getType(), context.getFunctionRegistry(), collector);
+ if(inField.getType().getMinorType() != outputField.getType().getMinorType()) {
+ expr = ExpressionTreeMaterializer.addCastExpression(expr, outputField.getType(), context.getFunctionRegistry(), collector);
if (collector.hasErrors()) {
throw new SchemaChangeException(String.format("Failure while trying to materialize incoming schema. Errors:\n %s.", collector.toErrorString()));
}
}
- final MaterializedField outputField = MaterializedField.create(outputPath.getLastSegment().getNameSegment().getPath(),
- expr.getMajorType());
- ValueVector vector = container.addOrGet(outputField, callBack);
- allocationVectors.add(vector);
- TypedFieldId fid = container.getValueVectorId(outputPath);
+ TypedFieldId fid = container.getValueVectorId(SchemaPath.getSimplePath(outputField.getName()));
- boolean useSetSafe = !(vector instanceof FixedWidthVector);
+ boolean useSetSafe = !(vvOut instanceof FixedWidthVector);
ValueVectorWriteExpression write = new ValueVectorWriteExpression(fid, expr, useSetSafe);
cg.addExpr(write);
+
+ allocationVectors.add(vvOut);
}
++index;
}
unionall = context.getImplementationClass(cg.getCodeGenerator());
- unionall.setup(context, current, this, transfers);
-
- if (!schemaAvailable) {
- container.buildSchema(BatchSchema.SelectionVectorMode.NONE);
- schemaAvailable = true;
- }
-
- if (!doAlloc()) {
- return IterOutcome.OUT_OF_MEMORY;
- }
-
- recordCount = unionall.unionRecords(0, current.getRecordCount(), 0);
- setValueCount(recordCount);
- return IterOutcome.OK;
- }
-
- public static boolean hasSameTypeAndMode(MaterializedField leftField, MaterializedField rightField) {
- return (leftField.getType().getMinorType() == rightField.getType().getMinorType())
- && (leftField.getType().getMode() == rightField.getType().getMode());
+ unionall.setup(context, inputBatch, this, transfers);
}
- // This method is used by inner class to point the reference `current` to the correct record batch
- private void setCurrentRecordBatch(RecordBatch target) {
- this.current = target;
- }
- // This method is used by inner class to clear the current record batch
- private void clearCurrentRecordBatch() {
- for (VectorWrapper<?> v: current) {
- v.clear();
- }
- }
-
- public static class UnionAllInput {
- private UnionAllRecordBatch unionAllRecordBatch;
- private List<MaterializedField> outputFields;
- private OneSideInput leftSide;
- private OneSideInput rightSide;
- private IterOutcome upstream = IterOutcome.NOT_YET;
- private boolean leftIsFinish = false;
- private boolean rightIsFinish = false;
-
- // These two schemas are obtained from the first record batches of the left and right inputs
- // They are used to check if the schema is changed between recordbatches
- private BatchSchema leftSchema;
- private BatchSchema rightSchema;
- private boolean bothEmpty = false;
-
- public UnionAllInput(UnionAllRecordBatch unionAllRecordBatch, RecordBatch left, RecordBatch right) {
- this.unionAllRecordBatch = unionAllRecordBatch;
- leftSide = new OneSideInput(left);
- rightSide = new OneSideInput(right);
- }
-
- private void setBothSideEmpty(boolean bothEmpty) {
- this.bothEmpty = bothEmpty;
- }
-
- private boolean isBothSideEmpty() {
- return bothEmpty;
- }
-
- public IterOutcome nextBatch() throws SchemaChangeException {
- if (upstream == RecordBatch.IterOutcome.NOT_YET) {
- IterOutcome iterLeft = leftSide.nextBatch();
- switch (iterLeft) {
- case OK_NEW_SCHEMA:
- /*
- * If the first few record batches are all empty,
- * there is no way to tell whether these empty batches are coming from empty files.
- * It is incorrect to infer output types when either side could be coming from empty.
- *
- * Thus, while-loop is necessary to skip those empty batches.
- */
- whileLoop:
- while (leftSide.getRecordBatch().getRecordCount() == 0) {
- iterLeft = leftSide.nextBatch();
-
- switch(iterLeft) {
- case STOP:
- case OUT_OF_MEMORY:
- return iterLeft;
-
- case NONE:
- // Special Case: The left side was an empty input.
- leftIsFinish = true;
- break whileLoop;
-
- case NOT_YET:
- case OK_NEW_SCHEMA:
- case OK:
- continue whileLoop;
-
- default:
- throw new IllegalStateException(
- String.format("Unexpected state %s.", iterLeft));
- }
- }
-
- break;
- case STOP:
- case OUT_OF_MEMORY:
- return iterLeft;
-
- default:
- throw new IllegalStateException(
- String.format("Unexpected state %s.", iterLeft));
- }
-
- IterOutcome iterRight = rightSide.nextBatch();
- switch (iterRight) {
- case OK_NEW_SCHEMA:
- // Unless there is no record batch on the left side of the inputs,
- // always start processing from the left side.
- if (leftIsFinish) {
- unionAllRecordBatch.setCurrentRecordBatch(rightSide.getRecordBatch());
- } else {
- unionAllRecordBatch.setCurrentRecordBatch(leftSide.getRecordBatch());
- }
- // If the record count of the first batch from right input is zero,
- // there are two possibilities:
- // 1. The right side is an empty input (e.g., file).
- // 2. There will be more records carried by later batches.
-
- /*
- * If the first few record batches are all empty,
- * there is no way to tell whether these empty batches are coming from empty files.
- * It is incorrect to infer output types when either side could be coming from empty.
- *
- * Thus, while-loop is necessary to skip those empty batches.
- */
- whileLoop:
- while (rightSide.getRecordBatch().getRecordCount() == 0) {
- iterRight = rightSide.nextBatch();
- switch (iterRight) {
- case STOP:
- case OUT_OF_MEMORY:
- return iterRight;
-
- case NONE:
- // Special Case: The right side was an empty input.
- rightIsFinish = true;
- break whileLoop;
-
- case NOT_YET:
- case OK_NEW_SCHEMA:
- case OK:
- continue whileLoop;
-
- default:
- throw new IllegalStateException(
- String.format("Unexpected state %s.", iterRight));
- }
- }
-
- if (leftIsFinish && rightIsFinish) {
- setBothSideEmpty(true);
- }
-
- inferOutputFields();
- break;
-
- case STOP:
- case OUT_OF_MEMORY:
- return iterRight;
-
- default:
- throw new IllegalStateException(
- String.format("Unexpected state %s.", iterRight));
- }
-
-
-
- upstream = IterOutcome.OK_NEW_SCHEMA;
- return upstream;
+ // The output table's column names always follow the left table,
+ // where the output type is chosen based on DRILL's implicit casting rules
+ private void inferOutputFieldsBothSide(final BatchSchema leftSchema, final BatchSchema rightSchema) {
+// outputFields = Lists.newArrayList();
+ final Iterator<MaterializedField> leftIter = leftSchema.iterator();
+ final Iterator<MaterializedField> rightIter = rightSchema.iterator();
+
+ int index = 1;
+ while (leftIter.hasNext() && rightIter.hasNext()) {
+ MaterializedField leftField = leftIter.next();
+ MaterializedField rightField = rightIter.next();
+
+ if (leftField.hasSameTypeAndMode(rightField)) {
+ TypeProtos.MajorType.Builder builder = TypeProtos.MajorType.newBuilder().setMinorType(leftField.getType().getMinorType()).setMode(leftField.getDataMode());
+ builder = Types.calculateTypePrecisionAndScale(leftField.getType(), rightField.getType(), builder);
+ container.addOrGet(MaterializedField.create(leftField.getName(), builder.build()), callBack);
+ } else if (Types.isUntypedNull(rightField.getType())) {
+ container.addOrGet(leftField, callBack);
+ } else if (Types.isUntypedNull(leftField.getType())) {
+ container.addOrGet(MaterializedField.create(leftField.getName(), rightField.getType()), callBack);
} else {
- if (isBothSideEmpty()) {
- return IterOutcome.NONE;
- }
-
- unionAllRecordBatch.clearCurrentRecordBatch();
-
- if (leftIsFinish && rightIsFinish) {
- upstream = IterOutcome.NONE;
- return upstream;
- } else if (leftIsFinish) {
- IterOutcome iterOutcome = rightSide.nextBatch();
-
- switch (iterOutcome) {
- case NONE:
- rightIsFinish = true;
- // fall through
- case STOP:
- case OUT_OF_MEMORY:
- upstream = iterOutcome;
- return upstream;
-
- case OK_NEW_SCHEMA:
- if (!rightSide.getRecordBatch().getSchema().equals(rightSchema)) {
- throw new SchemaChangeException("Schema change detected in the right input of Union-All. This is not currently supported");
- }
- iterOutcome = IterOutcome.OK;
- // fall through
- case OK:
- unionAllRecordBatch.setCurrentRecordBatch(rightSide.getRecordBatch());
- upstream = iterOutcome;
- return upstream;
-
- default:
- throw new IllegalStateException(String.format("Unknown state %s.", upstream));
- }
- } else if (rightIsFinish) {
- IterOutcome iterOutcome = leftSide.nextBatch();
- switch (iterOutcome) {
- case STOP:
- case OUT_OF_MEMORY:
- case NONE:
- upstream = iterOutcome;
- return upstream;
-
- case OK:
- unionAllRecordBatch.setCurrentRecordBatch(leftSide.getRecordBatch());
- upstream = iterOutcome;
- return upstream;
-
- default:
- throw new IllegalStateException(String.format("Unknown state %s.", iterOutcome));
- }
- } else {
- IterOutcome iterOutcome = leftSide.nextBatch();
-
- switch (iterOutcome) {
- case STOP:
- case OUT_OF_MEMORY:
- upstream = iterOutcome;
- return upstream;
-
- case OK_NEW_SCHEMA:
- if (!leftSide.getRecordBatch().getSchema().equals(leftSchema)) {
- throw new SchemaChangeException("Schema change detected in the left input of Union-All. This is not currently supported");
- }
-
- iterOutcome = IterOutcome.OK;
- // fall through
- case OK:
- unionAllRecordBatch.setCurrentRecordBatch(leftSide.getRecordBatch());
- upstream = iterOutcome;
- return upstream;
-
- case NONE:
- unionAllRecordBatch.setCurrentRecordBatch(rightSide.getRecordBatch());
- upstream = IterOutcome.OK;
- leftIsFinish = true;
- return upstream;
-
- default:
- throw new IllegalStateException(String.format("Unknown state %s.", upstream));
- }
- }
- }
- }
-
- /**
- *
- * Summarize the inference in the four different situations:
- * First of all, the field names are always determined by the left side
- * (Even when the left side is from an empty file, we have the column names.)
- *
- * Cases:
- * 1. Left: non-empty; Right: non-empty
- * types determined by both sides with implicit casting involved
- * 2. Left: empty; Right: non-empty
- * type from the right
- * 3. Left: non-empty; Right: empty
- * types from the left
- * 4. Left: empty; Right: empty
- * types are nullable integer
- */
- private void inferOutputFields() {
- if (!leftIsFinish && !rightIsFinish) {
- // Both sides are non-empty
- inferOutputFieldsBothSide();
- } else if (!rightIsFinish) {
- // Left side is non-empty
- // While use left side's column names as output column names,
- // use right side's column types as output column types.
- inferOutputFieldsFromSingleSide(
- leftSide.getRecordBatch().getSchema(),
- rightSide.getRecordBatch().getSchema());
- } else {
- // Either right side is empty or both are empty
- // Using left side's schema is sufficient
- inferOutputFieldsFromSingleSide(
- leftSide.getRecordBatch().getSchema(),
- leftSide.getRecordBatch().getSchema());
- }
- }
-
- // The output table's column names always follow the left table,
- // where the output type is chosen based on DRILL's implicit casting rules
- private void inferOutputFieldsBothSide() {
- outputFields = Lists.newArrayList();
- leftSchema = leftSide.getRecordBatch().getSchema();
- rightSchema = rightSide.getRecordBatch().getSchema();
- Iterator<MaterializedField> leftIter = leftSchema.iterator();
- Iterator<MaterializedField> rightIter = rightSchema.iterator();
-
- int index = 1;
- while (leftIter.hasNext() && rightIter.hasNext()) {
- MaterializedField leftField = leftIter.next();
- MaterializedField rightField = rightIter.next();
-
- if (hasSameTypeAndMode(leftField, rightField)) {
- MajorType.Builder builder = MajorType.newBuilder().setMinorType(leftField.getType().getMinorType()).setMode(leftField.getDataMode());
+ // If the output type is not the same,
+ // cast the column of one of the table to a data type which is the Least Restrictive
+ TypeProtos.MajorType.Builder builder = TypeProtos.MajorType.newBuilder();
+ if (leftField.getType().getMinorType() == rightField.getType().getMinorType()) {
+ builder.setMinorType(leftField.getType().getMinorType());
builder = Types.calculateTypePrecisionAndScale(leftField.getType(), rightField.getType(), builder);
- outputFields.add(MaterializedField.create(leftField.getName(), builder.build()));
} else {
- // If the output type is not the same,
- // cast the column of one of the table to a data type which is the Least Restrictive
- MajorType.Builder builder = MajorType.newBuilder();
- if (leftField.getType().getMinorType() == rightField.getType().getMinorType()) {
- builder.setMinorType(leftField.getType().getMinorType());
- builder = Types.calculateTypePrecisionAndScale(leftField.getType(), rightField.getType(), builder);
- } else {
- List<MinorType> types = Lists.newLinkedList();
- types.add(leftField.getType().getMinorType());
- types.add(rightField.getType().getMinorType());
- MinorType outputMinorType = TypeCastRules.getLeastRestrictiveType(types);
- if (outputMinorType == null) {
- throw new DrillRuntimeException("Type mismatch between " + leftField.getType().getMinorType().toString() +
- " on the left side and " + rightField.getType().getMinorType().toString() +
- " on the right side in column " + index + " of UNION ALL");
- }
- builder.setMinorType(outputMinorType);
+ List<TypeProtos.MinorType> types = Lists.newLinkedList();
+ types.add(leftField.getType().getMinorType());
+ types.add(rightField.getType().getMinorType());
+ TypeProtos.MinorType outputMinorType = TypeCastRules.getLeastRestrictiveType(types);
+ if (outputMinorType == null) {
+ throw new DrillRuntimeException("Type mismatch between " + leftField.getType().getMinorType().toString() +
+ " on the left side and " + rightField.getType().getMinorType().toString() +
+ " on the right side in column " + index + " of UNION ALL");
}
-
- // The output data mode should be as flexible as the more flexible one from the two input tables
- List<DataMode> dataModes = Lists.newLinkedList();
- dataModes.add(leftField.getType().getMode());
- dataModes.add(rightField.getType().getMode());
- builder.setMode(TypeCastRules.getLeastRestrictiveDataMode(dataModes));
-
- outputFields.add(MaterializedField.create(leftField.getName(), builder.build()));
+ builder.setMinorType(outputMinorType);
}
- ++index;
- }
- assert !leftIter.hasNext() && ! rightIter.hasNext() : "Mis-match of column count should have been detected when validating sqlNode at planning";
- }
-
- private void inferOutputFieldsFromSingleSide(final BatchSchema schemaForNames, final BatchSchema schemaForTypes) {
- outputFields = Lists.newArrayList();
+ // The output data mode should be as flexible as the more flexible one from the two input tables
+ List<TypeProtos.DataMode> dataModes = Lists.newLinkedList();
+ dataModes.add(leftField.getType().getMode());
+ dataModes.add(rightField.getType().getMode());
+ builder.setMode(TypeCastRules.getLeastRestrictiveDataMode(dataModes));
- final List<String> outputColumnNames = Lists.newArrayList();
- for (MaterializedField materializedField : schemaForNames) {
- outputColumnNames.add(materializedField.getName());
- }
-
- final Iterator<MaterializedField> iterForTypes = schemaForTypes.iterator();
- for (int i = 0; iterForTypes.hasNext(); ++i) {
- MaterializedField field = iterForTypes.next();
- outputFields.add(MaterializedField.create(outputColumnNames.get(i), field.getType()));
+ container.addOrGet(MaterializedField.create(leftField.getName(), builder.build()), callBack);
}
+ ++index;
}
- public List<MaterializedField> getOutputFields() {
- if (outputFields == null) {
- throw new NullPointerException("Output fields have not been inferred");
- }
-
- return outputFields;
- }
+ assert !leftIter.hasNext() && ! rightIter.hasNext() : "Mis-match of column count should have been detected when validating sqlNode at planning";
+ }
- public void killIncoming(boolean sendUpstream) {
- leftSide.getRecordBatch().kill(sendUpstream);
- rightSide.getRecordBatch().kill(sendUpstream);
+ private void inferOutputFieldsOneSide(final BatchSchema schema) {
+ for (MaterializedField field : schema) {
+ container.addOrGet(field, callBack);
}
+ }
- public RecordBatch getLeftRecordBatch() {
- return leftSide.getRecordBatch();
- }
+ private static boolean hasSameTypeAndMode(MaterializedField leftField, MaterializedField rightField) {
+ return (leftField.getType().getMinorType() == rightField.getType().getMinorType())
+ && (leftField.getType().getMode() == rightField.getType().getMode());
+ }
- public RecordBatch getRightRecordBatch() {
- return rightSide.getRecordBatch();
+ private class BatchStatusWrappper {
+ boolean prefetched;
+ final RecordBatch batch;
+ final int inputIndex;
+ final IterOutcome outcome;
+
+ BatchStatusWrappper(boolean prefetched, IterOutcome outcome, RecordBatch batch, int inputIndex) {
+ this.prefetched = prefetched;
+ this.outcome = outcome;
+ this.batch = batch;
+ this.inputIndex = inputIndex;
}
+ }
- private class OneSideInput {
- private IterOutcome upstream = IterOutcome.NOT_YET;
- private RecordBatch recordBatch;
+ private class UnionInputIterator implements Iterator<Pair<IterOutcome, RecordBatch>> {
+ private Stack<BatchStatusWrappper> batchStatusStack = new Stack<>();
- public OneSideInput(RecordBatch recordBatch) {
- this.recordBatch = recordBatch;
+ UnionInputIterator(IterOutcome leftOutCome, RecordBatch left, IterOutcome rightOutCome, RecordBatch right) {
+ if (rightOutCome == IterOutcome.OK_NEW_SCHEMA) {
+ batchStatusStack.push(new BatchStatusWrappper(true, IterOutcome.OK_NEW_SCHEMA, right, 1));
}
- public RecordBatch getRecordBatch() {
- return recordBatch;
+ if (leftOutCome == IterOutcome.OK_NEW_SCHEMA) {
+ batchStatusStack.push(new BatchStatusWrappper(true, IterOutcome.OK_NEW_SCHEMA, left, 0));
}
+ }
- public IterOutcome nextBatch() {
- if (upstream == IterOutcome.NONE) {
- throw new IllegalStateException(String.format("Unknown state %s.", upstream));
- }
+ @Override
+ public boolean hasNext() {
+ return ! batchStatusStack.isEmpty();
+ }
- if (upstream == IterOutcome.NOT_YET) {
- upstream = unionAllRecordBatch.next(recordBatch);
+ @Override
+ public Pair<IterOutcome, RecordBatch> next() {
+ while (!batchStatusStack.isEmpty()) {
+ BatchStatusWrappper topStatus = batchStatusStack.peek();
- return upstream;
+ if (topStatus.prefetched) {
+ topStatus.prefetched = false;
+ return Pair.of(topStatus.outcome, topStatus.batch);
} else {
- do {
- upstream = unionAllRecordBatch.next(recordBatch);
- } while (upstream == IterOutcome.OK && recordBatch.getRecordCount() == 0);
-
- return upstream;
+ IterOutcome outcome = UnionAllRecordBatch.this.next(topStatus.inputIndex, topStatus.batch);
+ switch (outcome) {
+ case OK:
+ case OK_NEW_SCHEMA:
+ return Pair.of(outcome, topStatus.batch);
+ case OUT_OF_MEMORY:
+ case STOP:
+ batchStatusStack.pop();
+ return Pair.of(outcome, topStatus.batch);
+ case NONE:
+ batchStatusStack.pop();
+ if (batchStatusStack.isEmpty()) {
+ return Pair.of(IterOutcome.NONE, null);
+ }
+ break;
+ default:
+ throw new IllegalStateException(String.format("Unexpected state %s", outcome));
+ }
}
}
+
+ throw new NoSuchElementException();
+ }
+
+ @Override
+ public void remove() {
+ throw new UnsupportedOperationException();
}
}
+
}
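The UnionInputIterator above drives both inputs through a single stack: the prefetched left batch is pushed on top of the right one, and a side is popped once it reports NONE. A minimal standalone sketch of the same pattern (simplified types, prefetch flag omitted; not Drill code):

// Standalone sketch of the stack-driven iteration used by UnionInputIterator.
// Outcome and Input are simplified stand-ins, not Drill classes.
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Deque;

public class StackedInputSketch {
  enum Outcome { OK, OK_NEW_SCHEMA, NONE }

  // Stand-in for one upstream input: replays a fixed sequence of outcomes, then NONE.
  static class Input {
    final String name;
    final Deque<Outcome> outcomes;
    Input(String name, Outcome... outcomes) {
      this.name = name;
      this.outcomes = new ArrayDeque<>(Arrays.asList(outcomes));
    }
    Outcome next() {
      return outcomes.isEmpty() ? Outcome.NONE : outcomes.poll();
    }
  }

  public static void main(String[] args) {
    // Left is pushed last so it is drained first, mirroring the union-all iterator.
    Deque<Input> stack = new ArrayDeque<>();
    stack.push(new Input("right", Outcome.OK_NEW_SCHEMA, Outcome.OK));
    stack.push(new Input("left", Outcome.OK_NEW_SCHEMA, Outcome.OK, Outcome.OK));

    while (!stack.isEmpty()) {
      Input top = stack.peek();
      Outcome outcome = top.next();
      if (outcome == Outcome.NONE) {
        stack.pop();   // this side is exhausted; continue with the side underneath
        continue;
      }
      System.out.println(top.name + " -> " + outcome);
    }
  }
}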
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/IteratorValidatorBatchIterator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/IteratorValidatorBatchIterator.java
index 2be1ed528..a8ee0dee3 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/IteratorValidatorBatchIterator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/IteratorValidatorBatchIterator.java
@@ -249,14 +249,8 @@ public class IteratorValidatorBatchIterator implements CloseableRecordBatch {
// OK doesn't change high-level state.
break;
case NONE:
- // NONE is allowed as long as OK_NEW_SCHEMA was seen, except if
- // already terminated (checked above).
- if (validationState != ValidationState.HAVE_SCHEMA) {
- throw new IllegalStateException(
- String.format(
- "next() returned %s without first returning %s [#%d, %s]",
- batchState, OK_NEW_SCHEMA, instNum, batchTypeName));
- }
+ // NONE is allowed even without seeing an OK_NEW_SCHEMA. Such a NONE is called
+ // a FAST NONE.
// NONE moves to terminal high-level state.
validationState = ValidationState.TERMINAL;
break;
@@ -306,12 +300,8 @@ public class IteratorValidatorBatchIterator implements CloseableRecordBatch {
"Incoming batch [#%d, %s] has a null schema. This is not allowed.",
instNum, batchTypeName));
}
- if (lastSchema.getFieldCount() == 0) {
- throw new IllegalStateException(
- String.format(
- "Incoming batch [#%d, %s] has an empty schema. This is not allowed.",
- instNum, batchTypeName));
- }
+ // It's legal for a batch to have zero fields. For instance, a relational table could have
+ // zero columns. Querying such a table requires execution operators to process batches with 0 fields.
if (incoming.getRecordCount() > MAX_BATCH_SIZE) {
throw new IllegalStateException(
String.format(
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/values/ValuesBatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/values/ValuesBatchCreator.java
index 2298df596..a8eddbc62 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/values/ValuesBatchCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/values/ValuesBatchCreator.java
@@ -39,6 +39,6 @@ public class ValuesBatchCreator implements BatchCreator<Values> {
assert children.isEmpty();
JSONRecordReader reader = new JSONRecordReader(context, config.getContent().asNode(), null, Collections.singletonList(SchemaPath.getSimplePath("*")));
- return new ScanBatch(config, context, Iterators.singletonIterator((RecordReader) reader));
+ return new ScanBatch(config, context, Collections.singletonList((RecordReader) reader));
}
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillScanRel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillScanRel.java
index 7e4483bcf..df80a10fd 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillScanRel.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillScanRel.java
@@ -42,8 +42,7 @@ import org.apache.calcite.plan.RelTraitSet;
import org.apache.calcite.rel.type.RelDataType;
import com.google.common.base.Preconditions;
-import com.google.common.base.Predicate;
-import com.google.common.collect.Iterables;
+import org.apache.drill.exec.util.Utilities;
/**
* GroupScan of a Drill table.
@@ -160,12 +159,7 @@ public class DrillScanRel extends DrillScanRelBase implements DrillRel {
final ScanStats stats = groupScan.getScanStats(settings);
int columnCount = getRowType().getFieldCount();
double ioCost = 0;
- boolean isStarQuery = Iterables.tryFind(getRowType().getFieldNames(), new Predicate<String>() {
- @Override
- public boolean apply(String input) {
- return Preconditions.checkNotNull(input).equals("*");
- }
- }).isPresent();
+ boolean isStarQuery = Utilities.isStarQuery(columns);
if (isStarQuery) {
columnCount = STAR_COLUMN_COST;
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ProjectPrel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ProjectPrel.java
index 25cd71737..d974badda 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ProjectPrel.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ProjectPrel.java
@@ -35,18 +35,45 @@ import org.apache.calcite.rex.RexCall;
import org.apache.calcite.rex.RexNode;
import org.apache.calcite.sql.SqlKind;
+/**
+ * A physical Prel node for Project operator.
+ */
public class ProjectPrel extends DrillProjectRelBase implements Prel{
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ProjectPrel.class);
+ private final boolean outputProj;
public ProjectPrel(RelOptCluster cluster, RelTraitSet traits, RelNode child, List<RexNode> exps,
RelDataType rowType) {
+ this(cluster, traits, child, exps, rowType, false);
+ }
+
+ /**
+ * Constructor for ProjectPrel.
+ * @param cluster
+ * @param traits traits of ProjectPrel node
+ * @param child input
+ * @param exps list of RexNode, representing expressions of projection.
+ * @param rowType output rowType of projection expression.
+ * @param outputProj true if ProjectPrel is inserted by {@link org.apache.drill.exec.planner.physical.visitor.TopProjectVisitor}
+ * Such a top Project operator does the following processing before the result is presented to Screen/Writer:
+ * <ol>
+ * <li>ensure final output field names are preserved</li>
+ * <li>handle cases where the input does not return any batch (a fast NONE; see ProjectRecordBatch.handleNullInput())</li>
+ * <li>handle cases where expressions in the upstream operator were evaluated to NULL type
+ * (NULL type will be converted into nullable INT)</li>
+ * </ol>
+ * false otherwise.
+ */
+ public ProjectPrel(RelOptCluster cluster, RelTraitSet traits, RelNode child, List<RexNode> exps,
+ RelDataType rowType, boolean outputProj) {
super(DRILL_PHYSICAL, cluster, traits, child, exps, rowType);
+ this.outputProj = outputProj;
}
@Override
public Project copy(RelTraitSet traitSet, RelNode input, List<RexNode> exps, RelDataType rowType) {
- return new ProjectPrel(getCluster(), traitSet, input, exps, rowType);
+ return new ProjectPrel(getCluster(), traitSet, input, exps, rowType, this.outputProj);
}
@@ -57,7 +84,7 @@ public class ProjectPrel extends DrillProjectRelBase implements Prel{
PhysicalOperator childPOP = child.getPhysicalOperator(creator);
org.apache.drill.exec.physical.config.Project p = new org.apache.drill.exec.physical.config.Project(
- this.getProjectExpressions(new DrillParseContext(PrelUtil.getSettings(getCluster()))), childPOP);
+ this.getProjectExpressions(new DrillParseContext(PrelUtil.getSettings(getCluster()))), childPOP, outputProj);
return creator.addMetadata(this, p);
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/TopProjectVisitor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/TopProjectVisitor.java
index 587b00623..08bd9e719 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/TopProjectVisitor.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/TopProjectVisitor.java
@@ -133,9 +133,23 @@ public class TopProjectVisitor extends BasePrelVisitor<Prel, Void, RuntimeExcept
prel.getCluster().getTypeFactory().getTypeSystem().isSchemaCaseSensitive());
RelDataType newRowType = RexUtil.createStructType(prel.getCluster().getTypeFactory(), projections, fieldNames, null);
- ProjectPrel topProject = new ProjectPrel(prel.getCluster(), prel.getTraitSet(), prel, projections, newRowType);
-
- return prel instanceof Project && DrillRelOptUtil.isTrivialProject(topProject, true) ? prel : topProject;
+ ProjectPrel topProject = new ProjectPrel(prel.getCluster(),
+ prel.getTraitSet(),
+ prel,
+ projections,
+ newRowType,
+ true); // outputProj = true : convert NONE into OK_NEW_SCHEMA, and handle expressions with NULL type.
+
+ if (prel instanceof Project && DrillRelOptUtil.isTrivialProject(topProject, true)) {
+ return new ProjectPrel(prel.getCluster(),
+ prel.getTraitSet(),
+ ((Project) prel).getInput(),
+ ((Project) prel).getProjects(),
+ prel.getRowType(),
+ true); // outputProj = true : convert NONE into OK_NEW_SCHEMA, and handle expressions with NULL type.
+ } else {
+ return topProject;
+ }
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractBinaryRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractBinaryRecordBatch.java
new file mode 100644
index 000000000..1137922ae
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractBinaryRecordBatch.java
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.record;
+
+import org.apache.drill.exec.exception.OutOfMemoryException;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+
+public abstract class AbstractBinaryRecordBatch<T extends PhysicalOperator> extends AbstractRecordBatch<T> {
+ protected final RecordBatch left;
+ protected final RecordBatch right;
+
+ // state (IterOutcome) of the left input
+ protected IterOutcome leftUpstream = IterOutcome.NONE;
+
+ // state (IterOutcome) of the right input
+ protected IterOutcome rightUpstream = IterOutcome.NONE;
+
+ protected AbstractBinaryRecordBatch(final T popConfig, final FragmentContext context, RecordBatch left,
+ RecordBatch right) throws OutOfMemoryException {
+ super(popConfig, context, true, context.newOperatorContext(popConfig));
+ this.left = left;
+ this.right = right;
+ }
+
+ protected AbstractBinaryRecordBatch(final T popConfig, final FragmentContext context, final boolean buildSchema, RecordBatch left,
+ RecordBatch right) throws OutOfMemoryException {
+ super(popConfig, context, buildSchema);
+ this.left = left;
+ this.right = right;
+ }
+
+ /**
+ * Prefetch the first batch from both inputs.
+ * @return true if the caller should continue processing,
+ *         false if the caller should stop and exit from processing.
+ */
+ protected boolean prefetchFirstBatchFromBothSides() {
+ leftUpstream = next(0, left);
+ rightUpstream = next(1, right);
+
+ if (leftUpstream == IterOutcome.STOP || rightUpstream == IterOutcome.STOP) {
+ state = BatchState.STOP;
+ return false;
+ }
+
+ if (leftUpstream == IterOutcome.OUT_OF_MEMORY || rightUpstream == IterOutcome.OUT_OF_MEMORY) {
+ state = BatchState.OUT_OF_MEMORY;
+ return false;
+ }
+
+ if (leftUpstream == IterOutcome.NONE && rightUpstream == IterOutcome.NONE) {
+ state = BatchState.DONE;
+ return false;
+ }
+
+ return true;
+ }
+}
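A sketch of the decision prefetchFirstBatchFromBothSides() makes for subclasses such as UnionAllRecordBatch.buildSchema() above, reduced to a standalone example (simplified enum, not Drill's IterOutcome): only STOP, OUT_OF_MEMORY, or a fast NONE from both sides stops the operator; a fast NONE from a single side is fine, since the schema can still be taken from the other side.

// Standalone sketch of the prefetch-and-branch pattern used by AbstractBinaryRecordBatch.
public class PrefetchSketch {
  enum IterOutcome { OK_NEW_SCHEMA, NONE, STOP, OUT_OF_MEMORY }

  static boolean shouldContinue(IterOutcome left, IterOutcome right) {
    if (left == IterOutcome.STOP || right == IterOutcome.STOP) {
      return false;                                  // operator moves to STOP state
    }
    if (left == IterOutcome.OUT_OF_MEMORY || right == IterOutcome.OUT_OF_MEMORY) {
      return false;                                  // operator moves to OUT_OF_MEMORY state
    }
    if (left == IterOutcome.NONE && right == IterOutcome.NONE) {
      return false;                                  // both inputs empty: operator is DONE
    }
    return true;
  }

  public static void main(String[] args) {
    // One empty side: continue, and infer the output schema from the non-empty side.
    System.out.println(shouldContinue(IterOutcome.NONE, IterOutcome.OK_NEW_SCHEMA)); // true
    // Both sides empty (fast NONE on both): nothing to emit downstream.
    System.out.println(shouldContinue(IterOutcome.NONE, IterOutcome.NONE));          // false
  }
}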
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractSingleRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractSingleRecordBatch.java
index 65d164dfe..4a9828c63 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractSingleRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractSingleRecordBatch.java
@@ -61,6 +61,10 @@ public abstract class AbstractSingleRecordBatch<T extends PhysicalOperator> exte
}
switch (upstream) {
case NONE:
+ if (state == BatchState.FIRST) {
+ return handleNullInput();
+ }
+ return upstream;
case NOT_YET:
case STOP:
if (state == BatchState.FIRST) {
@@ -125,4 +129,26 @@ public abstract class AbstractSingleRecordBatch<T extends PhysicalOperator> exte
protected abstract boolean setupNewSchema() throws SchemaChangeException;
protected abstract IterOutcome doWork();
+
+ /**
+ * Default behavior to handle NULL input (aka FAST NONE): the incoming batch returns NONE before returning an OK_NEW_SCHEMA.
+ * This could happen when the underlying Scan operators do not produce any batch with a schema.
+ *
+ * <p>
+ * Notice that NULL input is different from input with an empty batch. In the latter case, the input provides
+ * at least one batch, though it's empty.
+ *</p>
+ *
+ * <p>
+ * This behavior can be overridden in each individual operator, if the operator's semantics require
+ * injecting a batch with a schema.
+ *</p>
+ *
+ * @return IterOutcome.NONE.
+ */
+ protected IterOutcome handleNullInput() {
+ container.buildSchema(SelectionVectorMode.NONE);
+ return IterOutcome.NONE;
+ }
+
}
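The javadoc above notes that handleNullInput() can be overridden when an operator must still inject a batch with a schema. A hypothetical override might look roughly like this fragment (illustrative only; buildFixedOutputSchema() is a made-up helper and imports are omitted):

// Hypothetical override (illustrative only, not part of this patch): an operator whose
// output schema is statically known can publish an empty batch with that schema instead
// of returning a fast NONE.
@Override
protected IterOutcome handleNullInput() {
  buildFixedOutputSchema();   // made-up helper: adds the operator's known fields to 'container'
  container.buildSchema(SelectionVectorMode.NONE);
  VectorAccessibleUtilities.allocateVectors(container, 0);
  VectorAccessibleUtilities.setValueCount(container, 0);
  return IterOutcome.OK_NEW_SCHEMA;
}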
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/SimpleRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/SimpleRecordBatch.java
new file mode 100644
index 000000000..9bcea5073
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/SimpleRecordBatch.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.record;
+
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.record.selection.SelectionVector2;
+import org.apache.drill.exec.record.selection.SelectionVector4;
+
+import java.util.Iterator;
+
+/**
+ * Wrap a VectorContainer into a record batch.
+ */
+public class SimpleRecordBatch implements RecordBatch {
+ private VectorContainer container;
+ private FragmentContext context;
+
+ public SimpleRecordBatch(VectorContainer container, FragmentContext context) {
+ this.container = container;
+ this.context = context;
+ }
+
+ @Override
+ public FragmentContext getContext() {
+ return context;
+ }
+
+ @Override
+ public BatchSchema getSchema() {
+ return container.getSchema();
+ }
+
+ @Override
+ public int getRecordCount() {
+ return container.getRecordCount();
+ }
+
+ @Override
+ public void kill(boolean sendUpstream) {
+ }
+
+ @Override
+ public SelectionVector2 getSelectionVector2() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public SelectionVector4 getSelectionVector4() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public TypedFieldId getValueVectorId(SchemaPath path) {
+ return container.getValueVectorId(path);
+ }
+
+ @Override
+ public VectorWrapper<?> getValueAccessorById(Class<?> clazz, int... ids) {
+ return container.getValueAccessorById(clazz, ids);
+ }
+
+ @Override
+ public IterOutcome next() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public WritableBatch getWritableBatch() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public Iterator<VectorWrapper<?>> iterator() {
+ return container.iterator();
+ }
+
+ @Override
+ public VectorContainer getOutgoingContainer() {
+ throw new UnsupportedOperationException(String.format(" You should not call getOutgoingContainer() for class %s", this.getClass().getCanonicalName()));
+ }
+}
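A likely way to use this wrapper is to hand a bare VectorContainer to code that expects a RecordBatch, for example expression materialization. An illustrative fragment, assuming a populated VectorContainer container, a FragmentContext context, and a LogicalExpression expr are already in scope (not code from this patch):

// Illustrative fragment: wrap a populated VectorContainer so it can be passed to code
// that expects a RecordBatch, e.g. ExpressionTreeMaterializer.materialize().
RecordBatch batch = new SimpleRecordBatch(container, context);
ErrorCollector collector = new ErrorCollectorImpl();
LogicalExpression materialized =
    ExpressionTreeMaterializer.materialize(expr, batch, collector, context.getFunctionRegistry());
if (collector.hasErrors()) {
  throw new SchemaChangeException(String.format(
      "Failure while trying to materialize incoming schema. Errors:\n %s.", collector.toErrorString()));
}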
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/AbstractRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/AbstractRecordReader.java
index 215202563..3a95d25b0 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/AbstractRecordReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/AbstractRecordReader.java
@@ -26,19 +26,14 @@ import org.apache.drill.common.expression.PathSegment;
import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.exec.exception.OutOfMemoryException;
import org.apache.drill.exec.physical.base.GroupScan;
-import org.apache.drill.exec.record.MaterializedField;
+import org.apache.drill.exec.util.Utilities;
import org.apache.drill.exec.vector.ValueVector;
import com.google.common.base.Preconditions;
-import com.google.common.base.Predicate;
-import com.google.common.collect.Iterables;
public abstract class AbstractRecordReader implements RecordReader {
private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(AbstractRecordReader.class);
- private static final String COL_NULL_ERROR = "Columns cannot be null. Use star column to select all fields.";
- public static final SchemaPath STAR_COLUMN = SchemaPath.getSimplePath("*");
-
// For text reader, the default columns to read is "columns[0]".
protected static final List<SchemaPath> DEFAULT_TEXT_COLS_TO_READ = ImmutableList.of(new SchemaPath(new PathSegment.NameSegment("columns", new PathSegment.ArraySegment(0))));
@@ -62,7 +57,7 @@ public abstract class AbstractRecordReader implements RecordReader {
* 2) NULL : is NOT allowed. It requires the planner's rule, or GroupScan or ScanBatchCreator to handle NULL.
*/
protected final void setColumns(Collection<SchemaPath> projected) {
- Preconditions.checkNotNull(projected, COL_NULL_ERROR);
+ Preconditions.checkNotNull(projected, Utilities.COL_NULL_ERROR);
isSkipQuery = projected.isEmpty();
Collection<SchemaPath> columnsToRead = projected;
@@ -73,7 +68,7 @@ public abstract class AbstractRecordReader implements RecordReader {
columnsToRead = getDefaultColumnsToRead();
}
- isStarQuery = isStarQuery(columnsToRead);
+ isStarQuery = Utilities.isStarQuery(columnsToRead);
columns = transformColumns(columnsToRead);
logger.debug("columns to read : {}", columns);
@@ -99,15 +94,6 @@ public abstract class AbstractRecordReader implements RecordReader {
return isSkipQuery;
}
- public static boolean isStarQuery(Collection<SchemaPath> projected) {
- return Iterables.tryFind(Preconditions.checkNotNull(projected, COL_NULL_ERROR), new Predicate<SchemaPath>() {
- @Override
- public boolean apply(SchemaPath path) {
- return Preconditions.checkNotNull(path).equals(STAR_COLUMN);
- }
- }).isPresent();
- }
-
@Override
public void allocate(Map<String, ValueVector> vectorMap) throws OutOfMemoryException {
for (final ValueVector v : vectorMap.values()) {
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/ColumnExplorer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/ColumnExplorer.java
index fa8121e33..4b71b0fe9 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/ColumnExplorer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/ColumnExplorer.java
@@ -28,6 +28,7 @@ import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.server.options.OptionManager;
import org.apache.drill.exec.server.options.OptionValue;
import org.apache.drill.exec.store.dfs.easy.FileWork;
+import org.apache.drill.exec.util.Utilities;
import org.apache.hadoop.fs.Path;
import java.util.List;
@@ -63,7 +64,7 @@ public class ColumnExplorer {
public ColumnExplorer(OptionManager optionManager, List<SchemaPath> columns) {
this.partitionDesignator = optionManager.getOption(ExecConstants.FILESYSTEM_PARTITION_COLUMN_LABEL).string_val;
this.columns = columns;
- this.isStarQuery = columns != null && AbstractRecordReader.isStarQuery(columns);
+ this.isStarQuery = columns != null && Utilities.isStarQuery(columns);
this.selectedPartitionColumns = Lists.newArrayList();
this.tableColumns = Lists.newArrayList();
this.allImplicitColumns = initImplicitFileColumns(optionManager);
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyFormatPlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyFormatPlugin.java
index 1f7bce937..f81f74e94 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyFormatPlugin.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyFormatPlugin.java
@@ -162,7 +162,7 @@ public abstract class EasyFormatPlugin<T extends FormatPluginConfig> implements
map.putAll(Maps.difference(map, diff).entriesOnlyOnRight());
}
- return new ScanBatch(scan, context, oContext, readers.iterator(), implicitColumns);
+ return new ScanBatch(scan, context, oContext, readers, implicitColumns);
}
public abstract RecordWriter getRecordWriter(FragmentContext context, EasyWriter writer) throws IOException;
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/direct/DirectBatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/direct/DirectBatchCreator.java
index d59cda21b..8442c3274 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/direct/DirectBatchCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/direct/DirectBatchCreator.java
@@ -32,6 +32,6 @@ public class DirectBatchCreator implements BatchCreator<DirectSubScan>{
@Override
public ScanBatch getBatch(FragmentContext context, DirectSubScan config, List<RecordBatch> children)
throws ExecutionSetupException {
- return new ScanBatch(config, context, Collections.singleton(config.getReader()).iterator());
+ return new ScanBatch(config, context, Collections.singletonList(config.getReader()));
}
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONRecordReader.java
index ceb1deb2f..c406bb35c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONRecordReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONRecordReader.java
@@ -229,7 +229,11 @@ public class JSONRecordReader extends AbstractRecordReader {
handleAndRaise("Error parsing JSON", ex);
}
}
- jsonReader.ensureAtLeastOneField(writer);
+ // Skip an empty JSON file with 0 rows.
+ // Only when the data source has > 0 rows do we ensure the batch has at least one field.
+ if (recordCount > 0) {
+ jsonReader.ensureAtLeastOneField(writer);
+ }
writer.setValueCount(recordCount);
updateRunningCount();
return recordCount;
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaBatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaBatchCreator.java
index 199119d92..60581a7f5 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaBatchCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaBatchCreator.java
@@ -34,6 +34,6 @@ public class InfoSchemaBatchCreator implements BatchCreator<InfoSchemaSubScan>{
public ScanBatch getBatch(FragmentContext context, InfoSchemaSubScan config, List<RecordBatch> children)
throws ExecutionSetupException {
RecordReader rr = config.getTable().getRecordReader(context.getRootSchema(), config.getFilter(), context.getOptions());
- return new ScanBatch(config, context, Collections.singleton(rr).iterator());
+ return new ScanBatch(config, context, Collections.singletonList(rr));
}
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockScanBatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockScanBatchCreator.java
index 9a7563add..8f89effa8 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockScanBatchCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockScanBatchCreator.java
@@ -47,6 +47,6 @@ public class MockScanBatchCreator implements BatchCreator<MockSubScanPOP> {
readers.add(new MockRecordReader(context, e));
}
}
- return new ScanBatch(config, context, readers.iterator());
+ return new ScanBatch(config, context, readers);
}
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/Metadata.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/Metadata.java
index 4a8c5f343..5ac10e686 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/Metadata.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/Metadata.java
@@ -29,11 +29,11 @@ import java.util.concurrent.TimeUnit;
import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.common.util.DrillVersionInfo;
-import org.apache.drill.exec.store.AbstractRecordReader;
import org.apache.drill.exec.store.TimedRunnable;
import org.apache.drill.exec.util.DrillFileSystemUtil;
import org.apache.drill.exec.store.dfs.MetadataContext;
import org.apache.drill.exec.util.ImpersonationUtil;
+import org.apache.drill.exec.util.Utilities;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
@@ -429,7 +429,7 @@ public class Metadata {
List<RowGroupMetadata_v3> rowGroupMetadataList = Lists.newArrayList();
ArrayList<SchemaPath> ALL_COLS = new ArrayList<>();
- ALL_COLS.add(AbstractRecordReader.STAR_COLUMN);
+ ALL_COLS.add(Utilities.STAR_COLUMN);
boolean autoCorrectCorruptDates = formatConfig.areCorruptDatesAutoCorrected();
ParquetReaderUtility.DateCorruptionStatus containsCorruptDates = ParquetReaderUtility.detectCorruptDates(metadata, ALL_COLS, autoCorrectCorruptDates);
if (logger.isDebugEnabled()) {
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetReaderUtility.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetReaderUtility.java
index 78e965577..84e969a8c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetReaderUtility.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetReaderUtility.java
@@ -23,7 +23,7 @@ import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.exec.expr.holders.NullableTimeStampHolder;
import org.apache.drill.exec.planner.physical.PlannerSettings;
import org.apache.drill.exec.server.options.OptionManager;
-import org.apache.drill.exec.store.AbstractRecordReader;
+import org.apache.drill.exec.util.Utilities;
import org.apache.drill.exec.work.ExecErrorConstants;
import org.apache.parquet.SemanticVersion;
import org.apache.parquet.VersionParser;
@@ -281,7 +281,7 @@ public class ParquetReaderUtility {
// this reader only supports flat data, this is restricted in the ParquetScanBatchCreator
// creating a NameSegment makes sure we are using the standard code for comparing names,
// currently it is all case-insensitive
- if (AbstractRecordReader.isStarQuery(columns)
+ if (Utilities.isStarQuery(columns)
|| new PathSegment.NameSegment(column.getPath()[0]).equals(schemaPath.getRootSegment())) {
int colIndex = -1;
ConvertedType convertedType = schemaElements.get(column.getPath()[0]).getConverted_type();
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java
index 21fc4ef9d..60179482f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java
@@ -153,7 +153,7 @@ public class ParquetScanBatchCreator implements BatchCreator<ParquetRowGroupScan
map.putAll(Maps.difference(map, diff).entriesOnlyOnRight());
}
- return new ScanBatch(rowGroupScan, context, oContext, readers.iterator(), implicitColumns);
+ return new ScanBatch(rowGroupScan, context, oContext, readers, implicitColumns);
}
private static boolean isComplex(ParquetMetadata footer) {
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetSchema.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetSchema.java
index 9814b537f..10187b7a9 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetSchema.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetSchema.java
@@ -34,6 +34,7 @@ import org.apache.drill.exec.physical.impl.OutputMutator;
import org.apache.drill.exec.record.MaterializedField;
import org.apache.drill.exec.server.options.OptionManager;
import org.apache.drill.exec.store.parquet.ParquetReaderUtility;
+import org.apache.drill.exec.util.Utilities;
import org.apache.drill.exec.vector.NullableIntVector;
import org.apache.parquet.column.ColumnDescriptor;
import org.apache.parquet.format.SchemaElement;
@@ -225,7 +226,7 @@ public class ParquetSchema {
for (int i = 0; i < columnsFound.length; i++) {
SchemaPath col = projectedColumns.get(i);
assert col != null;
- if ( ! columnsFound[i] && ! col.equals(ParquetRecordReader.STAR_COLUMN)) {
+ if ( ! columnsFound[i] && ! col.equals(Utilities.STAR_COLUMN)) {
nullFilledVectors.add(createMissingColumn(col, output));
}
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTableBatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTableBatchCreator.java
index 2b0ef3f78..ab87a4ad2 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTableBatchCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTableBatchCreator.java
@@ -46,6 +46,6 @@ public class SystemTableBatchCreator implements BatchCreator<SystemTableScan> {
final Iterator<Object> iterator = table.getIterator(context);
final RecordReader reader = new PojoRecordReader(table.getPojoClass(), ImmutableList.copyOf(iterator));
- return new ScanBatch(scan, context, Collections.singleton(reader).iterator());
+ return new ScanBatch(scan, context, Collections.singletonList(reader));
}
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/util/Utilities.java b/exec/java-exec/src/main/java/org/apache/drill/exec/util/Utilities.java
index 6ee31604b..35358c25f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/util/Utilities.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/util/Utilities.java
@@ -17,14 +17,23 @@
*/
package org.apache.drill.exec.util;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Predicate;
+import com.google.common.collect.Iterables;
+import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.exec.expr.fn.impl.DateUtility;
import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.proto.BitControl.QueryContextInformation;
import org.apache.drill.exec.proto.ExecProtos;
import org.apache.drill.exec.proto.helper.QueryIdHelper;
+import java.util.Collection;
+
public class Utilities {
+ public static final SchemaPath STAR_COLUMN = SchemaPath.getSimplePath("*");
+ public static final String COL_NULL_ERROR = "Columns cannot be null. Use star column to select all fields.";
+
public static String getFileNameForQueryFragment(FragmentContext context, String location, String tag) {
/*
* From the context, get the query id, major fragment id, minor fragment id. This will be used as the file name to
@@ -68,4 +77,18 @@ public class Utilities {
String v = Utilities.class.getPackage().getImplementationVersion();
return v;
}
+
+ /**
+   * Return true if the list of schema paths contains the star column.
+   * @param projected the projected schema paths; must not be null
+   * @return true if any projected path is the star column
+ */
+ public static boolean isStarQuery(Collection<SchemaPath> projected) {
+ return Iterables.tryFind(Preconditions.checkNotNull(projected, COL_NULL_ERROR), new Predicate<SchemaPath>() {
+ @Override
+ public boolean apply(SchemaPath path) {
+ return Preconditions.checkNotNull(path).equals(STAR_COLUMN);
+ }
+ }).isPresent();
+ }
}
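
The relocated STAR_COLUMN constant and the new Utilities.isStarQuery() above are what the Metadata, ParquetReaderUtility and ParquetSchema hunks now reference instead of AbstractRecordReader/ParquetRecordReader. The following self-contained sketch mirrors the same null-check-then-search contract using plain java.util.stream in place of Guava; Path and StarQuerySketch are illustrative stand-ins for SchemaPath and Utilities.

import java.util.Arrays;
import java.util.Collection;
import java.util.Objects;

// Illustrative stand-in for SchemaPath; only equality matters here.
final class Path {
  private final String name;
  Path(String name) { this.name = name; }
  @Override public boolean equals(Object o) {
    return o instanceof Path && ((Path) o).name.equals(name);
  }
  @Override public int hashCode() { return name.hashCode(); }
}

public class StarQuerySketch {
  static final Path STAR_COLUMN = new Path("*");

  // Same contract as Utilities.isStarQuery(): reject a null projection list,
  // then report whether any projected path equals the star column.
  static boolean isStarQuery(Collection<Path> projected) {
    Objects.requireNonNull(projected, "Columns cannot be null. Use star column to select all fields.");
    return projected.stream().anyMatch(p -> Objects.requireNonNull(p).equals(STAR_COLUMN));
  }

  public static void main(String[] args) {
    System.out.println(isStarQuery(Arrays.asList(new Path("a"), new Path("*")))); // true
    System.out.println(isStarQuery(Arrays.asList(new Path("a"), new Path("b")))); // false
  }
}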
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/util/VectorUtil.java b/exec/java-exec/src/main/java/org/apache/drill/exec/util/VectorUtil.java
index d836bfc0b..4de4c2ad4 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/util/VectorUtil.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/util/VectorUtil.java
@@ -27,6 +27,8 @@ import org.apache.drill.exec.record.VectorWrapper;
import com.google.common.base.Joiner;
import com.google.common.collect.Lists;
+import org.apache.drill.exec.vector.AllocationHelper;
+import org.apache.drill.exec.vector.ValueVector;
import org.joda.time.DateTime;
import org.joda.time.format.DateTimeFormat;
@@ -178,9 +180,20 @@ public class VectorUtil {
}
}
+ public static void allocateVectors(Iterable<ValueVector> valueVectors, int count) {
+ for (final ValueVector v : valueVectors) {
+ AllocationHelper.allocateNew(v, count);
+ }
+ }
+
+ public static void setValueCount(Iterable<ValueVector> valueVectors, int count) {
+ for (final ValueVector v : valueVectors) {
+ v.getMutator().setValueCount(count);
+ }
+ }
+
private static int getColumnWidth(int[] columnWidths, int columnIndex) {
return (columnWidths == null) ? DEFAULT_COLUMN_WIDTH
: (columnWidths.length > columnIndex) ? columnWidths[columnIndex] : columnWidths[0];
}
-
}
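
The two VectorUtil helpers added above factor out a loop that callers repeat when building a batch: size every vector for the expected row count, write the values, then record the final count on each vector. A compact, self-contained sketch of that sequence follows; Vector and VectorBatchSketch are illustrative stand-ins for Drill's ValueVector/Mutator pair, and the real allocateVectors() delegates to AllocationHelper.allocateNew() rather than calling the vector directly.

import java.util.Arrays;
import java.util.List;

// Toy stand-in modelling only the calls the new helpers make.
interface Vector {
  void allocate(int capacity);
  void setValueCount(int count);
}

public class VectorBatchSketch {
  // Same shape as VectorUtil.allocateVectors(): size every vector up front.
  static void allocateVectors(Iterable<Vector> vectors, int count) {
    for (Vector v : vectors) {
      v.allocate(count);
    }
  }

  // Same shape as VectorUtil.setValueCount(): record how many values were written.
  static void setValueCount(Iterable<Vector> vectors, int count) {
    for (Vector v : vectors) {
      v.setValueCount(count);
    }
  }

  public static void main(String[] args) {
    Vector v = new Vector() {
      public void allocate(int capacity) { System.out.println("allocate capacity for " + capacity); }
      public void setValueCount(int count) { System.out.println("value count set to " + count); }
    };
    List<Vector> batch = Arrays.asList(v);
    allocateVectors(batch, 0); // a count of zero is valid; the vectors still carry their types
    setValueCount(batch, 0);
  }
}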