aboutsummaryrefslogtreecommitdiff
path: root/exec/java-exec/src/main/java/org/apache/drill/exec
diff options
context:
space:
mode:
authorSteven Phillips <smp@apache.org>2015-07-08 17:35:09 -0700
committerSteven Phillips <smp@apache.org>2015-07-28 18:13:22 -0700
commit496f14669b485d5cd51b1f2a742b90de794190a9 (patch)
treece4fc8534dd02c50364d13fb2f332df019b2308c /exec/java-exec/src/main/java/org/apache/drill/exec
parent5e33a286a3dad4b44c00dbae7fa40be678742b14 (diff)
DRILL-3353: Fix dropping nested fields
Use the SchemaChangeCallBack in more places to track schema changes Reset the ephemeral transfer pair when making a new transfer pair for Map or RepeatedMap
Diffstat (limited to 'exec/java-exec/src/main/java/org/apache/drill/exec')
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/OutputMutator.java7
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java11
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java2
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterRecordBatch.java2
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/LimitRecordBatch.java2
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java4
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/RemovingRecordBatch.java12
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordWriter.java15
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/AbstractMapVector.java7
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/MapVector.java11
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/RepeatedMapVector.java1
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/impl/VectorContainerWriter.java7
12 files changed, 61 insertions, 20 deletions
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/OutputMutator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/OutputMutator.java
index 0fe79d90c..e109ec07f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/OutputMutator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/OutputMutator.java
@@ -21,6 +21,7 @@ import io.netty.buffer.DrillBuf;
import org.apache.drill.exec.exception.SchemaChangeException;
import org.apache.drill.exec.record.MaterializedField;
+import org.apache.drill.exec.util.CallBack;
import org.apache.drill.exec.vector.ValueVector;
/**
@@ -61,4 +62,10 @@ public interface OutputMutator {
* @return A DrillBuf that will be released at the end of the current query (and can be resized as desired during use).
*/
public DrillBuf getManagedBuffer();
+
+ /**
+ *
+ * @return the CallBack object for this mutator
+ */
+ public CallBack getCallBack();
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
index 4b91e1fcd..873ae76b5 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
@@ -51,6 +51,7 @@ import org.apache.drill.exec.server.options.OptionValue;
import org.apache.drill.exec.store.RecordReader;
import org.apache.drill.exec.testing.ControlsInjector;
import org.apache.drill.exec.testing.ControlsInjectorFactory;
+import org.apache.drill.exec.util.CallBack;
import org.apache.drill.exec.vector.AllocationHelper;
import org.apache.drill.exec.vector.NullableVarCharVector;
import org.apache.drill.exec.vector.SchemaChangeCallBack;
@@ -220,6 +221,11 @@ public class ScanBatch implements CloseableRecordBatch {
hasReadNonEmptyFile = true;
populatePartitionVectors();
+ for (VectorWrapper w : container) {
+ w.getValueVector().getMutator().setValueCount(recordCount);
+ }
+
+
// this is a slight misuse of this metric but it will allow Readers to report how many records they generated.
final boolean isNewSchema = mutator.isNewSchema();
oContext.getStats().batchReceived(0, getRecordCount(), isNewSchema);
@@ -354,6 +360,11 @@ public class ScanBatch implements CloseableRecordBatch {
public DrillBuf getManagedBuffer() {
return oContext.getManagedBuffer();
}
+
+ @Override
+ public CallBack getCallBack() {
+ return callBack;
+ }
}
@Override
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java
index 516b0282f..10f1d7fbf 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java
@@ -276,7 +276,7 @@ public class TopNBatch extends AbstractRecordBatch<TopN> {
SimpleRecordBatch batch = new SimpleRecordBatch(c, selectionVector4, context);
SimpleRecordBatch newBatch = new SimpleRecordBatch(newContainer, null, context);
if (copier == null) {
- copier = RemovingRecordBatch.getGenerated4Copier(batch, context, oContext.getAllocator(), newContainer, newBatch);
+ copier = RemovingRecordBatch.getGenerated4Copier(batch, context, oContext.getAllocator(), newContainer, newBatch, null);
} else {
for (VectorWrapper<?> i : batch) {
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterRecordBatch.java
index 5eee9dfe5..c1d78c374 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterRecordBatch.java
@@ -193,7 +193,7 @@ public class FilterRecordBatch extends AbstractSingleRecordBatch<Filter>{
cg.addExpr(new ReturnValueExpression(expr));
for (VectorWrapper<?> v : incoming) {
- TransferPair pair = v.getValueVector().makeTransferPair(container.addOrGet(v.getField()));
+ TransferPair pair = v.getValueVector().makeTransferPair(container.addOrGet(v.getField(), callBack));
transfers.add(pair);
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/LimitRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/LimitRecordBatch.java
index d9330ea75..4ea5a5cc6 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/LimitRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/LimitRecordBatch.java
@@ -63,7 +63,7 @@ public class LimitRecordBatch extends AbstractSingleRecordBatch<Limit> {
for(VectorWrapper<?> v : incoming){
- TransferPair pair = v.getValueVector().makeTransferPair(container.addOrGet(v.getField()));
+ TransferPair pair = v.getValueVector().makeTransferPair(container.addOrGet(v.getField(), callBack));
transfers.add(pair);
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java
index b6e5dc0ba..5b5c90d47 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java
@@ -325,7 +325,7 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project> {
continue;
}
final FieldReference ref = new FieldReference(name);
- final ValueVector vvOut = container.addOrGet(MaterializedField.create(ref, vvIn.getField().getType()));
+ final ValueVector vvOut = container.addOrGet(MaterializedField.create(ref, vvIn.getField().getType()), callBack);
final TransferPair tp = vvIn.makeTransferPair(vvOut);
transfers.add(tp);
}
@@ -399,7 +399,7 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project> {
Preconditions.checkNotNull(incoming);
final FieldReference ref = getRef(namedExpression);
- final ValueVector vvOut = container.addOrGet(MaterializedField.create(ref, vectorRead.getMajorType()));
+ final ValueVector vvOut = container.addOrGet(MaterializedField.create(ref, vectorRead.getMajorType()), callBack);
final TransferPair tp = vvIn.makeTransferPair(vvOut);
transfers.add(tp);
transferFieldIds.add(vectorRead.getFieldId().getFieldIds()[0]);
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/RemovingRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/RemovingRecordBatch.java
index 57e7b55d8..b5b1b0afe 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/RemovingRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/RemovingRecordBatch.java
@@ -34,7 +34,9 @@ import org.apache.drill.exec.record.TransferPair;
import org.apache.drill.exec.record.VectorContainer;
import org.apache.drill.exec.record.VectorWrapper;
import org.apache.drill.exec.record.WritableBatch;
+import org.apache.drill.exec.util.CallBack;
import org.apache.drill.exec.vector.CopyUtil;
+import org.apache.drill.exec.vector.SchemaChangeCallBack;
import org.apache.drill.exec.vector.ValueVector;
import com.google.common.base.Preconditions;
@@ -194,7 +196,7 @@ public class RemovingRecordBatch extends AbstractSingleRecordBatch<SelectionVect
@Override
public void setupRemover(FragmentContext context, RecordBatch incoming, RecordBatch outgoing){
for(VectorWrapper<?> vv : incoming){
- TransferPair tp = vv.getValueVector().makeTransferPair(container.addOrGet(vv.getField()));
+ TransferPair tp = vv.getValueVector().makeTransferPair(container.addOrGet(vv.getField(), callBack));
pairs.add(tp);
}
}
@@ -220,7 +222,7 @@ public class RemovingRecordBatch extends AbstractSingleRecordBatch<SelectionVect
Preconditions.checkArgument(incoming.getSchema().getSelectionVectorMode() == SelectionVectorMode.TWO_BYTE);
for(VectorWrapper<?> vv : incoming){
- TransferPair tp = vv.getValueVector().makeTransferPair(container.addOrGet(vv.getField()));
+ TransferPair tp = vv.getValueVector().makeTransferPair(container.addOrGet(vv.getField(), callBack));
}
try {
@@ -237,14 +239,14 @@ public class RemovingRecordBatch extends AbstractSingleRecordBatch<SelectionVect
private Copier getGenerated4Copier() throws SchemaChangeException {
Preconditions.checkArgument(incoming.getSchema().getSelectionVectorMode() == SelectionVectorMode.FOUR_BYTE);
- return getGenerated4Copier(incoming, context, oContext.getAllocator(), container, this);
+ return getGenerated4Copier(incoming, context, oContext.getAllocator(), container, this, callBack);
}
- public static Copier getGenerated4Copier(RecordBatch batch, FragmentContext context, BufferAllocator allocator, VectorContainer container, RecordBatch outgoing) throws SchemaChangeException{
+ public static Copier getGenerated4Copier(RecordBatch batch, FragmentContext context, BufferAllocator allocator, VectorContainer container, RecordBatch outgoing, SchemaChangeCallBack callBack) throws SchemaChangeException{
for(VectorWrapper<?> vv : batch){
ValueVector v = vv.getValueVectors()[0];
- v.makeTransferPair(container.addOrGet(v.getField()));
+ v.makeTransferPair(container.addOrGet(v.getField(), callBack));
}
try {
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordWriter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordWriter.java
index 12b15a9fa..f118535bc 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordWriter.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordWriter.java
@@ -143,9 +143,22 @@ public class ParquetRecordWriter extends ParquetOutputRecordWriter {
enableDictionary = Boolean.parseBoolean(writerOptions.get(ExecConstants.PARQUET_WRITER_ENABLE_DICTIONARY_ENCODING));
}
+ private boolean containsComplexVectors(BatchSchema schema) {
+ for (MaterializedField field : schema) {
+ MinorType type = field.getType().getMinorType();
+ switch (type) {
+ case MAP:
+ case LIST:
+ return true;
+ default:
+ }
+ }
+ return false;
+ }
+
@Override
public void updateSchema(VectorAccessible batch) throws IOException {
- if (this.batchSchema == null || !this.batchSchema.equals(batch.getSchema())) {
+ if (this.batchSchema == null || !this.batchSchema.equals(batch.getSchema()) || containsComplexVectors(this.batchSchema)) {
if (this.batchSchema != null) {
flush();
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/AbstractMapVector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/AbstractMapVector.java
index 1df4b81e6..efba46dcb 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/AbstractMapVector.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/AbstractMapVector.java
@@ -41,9 +41,10 @@ public abstract class AbstractMapVector extends AbstractContainerVector {
private final MapWithOrdinal<String, ValueVector> vectors = new MapWithOrdinal<>();
protected AbstractMapVector(MaterializedField field, BufferAllocator allocator, CallBack callBack) {
- super(field, allocator, callBack);
+ super(field.clone(), allocator, callBack);
+ MaterializedField clonedField = field.clone();
// create the hierarchy of the child vectors based on the materialized field
- for (MaterializedField child : field.getChildren()) {
+ for (MaterializedField child : clonedField.getChildren()) {
if (!child.equals(BaseRepeatedValueVector.OFFSETS_FIELD)) {
String fieldName = child.getLastName();
ValueVector v = TypeHelper.getNewVector(child, allocator, callBack);
@@ -116,7 +117,7 @@ public abstract class AbstractMapVector extends AbstractContainerVector {
create = true;
}
if (create) {
- final T vector = (T) TypeHelper.getNewVector(field.getPath(), name, allocator, type);
+ final T vector = (T) TypeHelper.getNewVector(field.getPath(), name, allocator, type, callBack);
putChild(name, vector);
if (callBack!=null) {
callBack.doWork();
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/MapVector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/MapVector.java
index 3032aacad..1e30ea211 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/MapVector.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/MapVector.java
@@ -161,6 +161,8 @@ public class MapVector extends AbstractMapVector {
this.from = from;
this.to = to;
this.pairs = new TransferPair[from.size()];
+ this.to.ephPair = null;
+ this.to.ephPair2 = null;
int i = 0;
ValueVector vector;
@@ -294,9 +296,12 @@ public class MapVector extends AbstractMapVector {
public Object getObject(int index) {
Map<String, Object> vv = new JsonStringHashMap();
for (String child:getChildFieldNames()) {
- Object value = getChild(child).getAccessor().getObject(index);
- if (value != null) {
- vv.put(child, value);
+ ValueVector v = getChild(child);
+ if (v != null) {
+ Object value = v.getAccessor().getObject(index);
+ if (value != null) {
+ vv.put(child, value);
+ }
}
}
return vv;
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/RepeatedMapVector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/RepeatedMapVector.java
index 97f5b3966..644e5db67 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/RepeatedMapVector.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/RepeatedMapVector.java
@@ -326,6 +326,7 @@ public class RepeatedMapVector extends AbstractMapVector implements RepeatedValu
this.from = from;
this.to = to;
this.pairs = new TransferPair[from.size()];
+ this.to.ephPair = null;
int i = 0;
ValueVector vector;
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/impl/VectorContainerWriter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/impl/VectorContainerWriter.java
index 6b6ab46a1..5aea0ca50 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/impl/VectorContainerWriter.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/impl/VectorContainerWriter.java
@@ -21,6 +21,7 @@ import org.apache.drill.common.types.TypeProtos.MajorType;
import org.apache.drill.exec.exception.SchemaChangeException;
import org.apache.drill.exec.physical.impl.OutputMutator;
import org.apache.drill.exec.record.MaterializedField;
+import org.apache.drill.exec.util.CallBack;
import org.apache.drill.exec.vector.ValueVector;
import org.apache.drill.exec.vector.complex.MapVector;
import org.apache.drill.exec.vector.complex.writer.BaseWriter.ComplexWriter;
@@ -35,7 +36,7 @@ public class VectorContainerWriter extends AbstractFieldWriter implements Comple
public VectorContainerWriter(OutputMutator mutator) {
super(null);
this.mutator = mutator;
- this.mapVector = new SpecialMapVector();
+ this.mapVector = new SpecialMapVector(mutator.getCallBack());
this.mapRoot = new SingleMapWriter(mapVector, this);
}
@@ -81,8 +82,8 @@ public class VectorContainerWriter extends AbstractFieldWriter implements Comple
private class SpecialMapVector extends MapVector {
- public SpecialMapVector() {
- super("", null, null);
+ public SpecialMapVector(CallBack callback) {
+ super("", null, callback);
}
@Override