diff options
author | Steven Phillips <smp@apache.org> | 2015-07-08 17:35:09 -0700 |
---|---|---|
committer | Steven Phillips <smp@apache.org> | 2015-07-28 18:13:22 -0700 |
commit | 496f14669b485d5cd51b1f2a742b90de794190a9 (patch) | |
tree | ce4fc8534dd02c50364d13fb2f332df019b2308c /exec/java-exec/src/main/java/org/apache/drill/exec | |
parent | 5e33a286a3dad4b44c00dbae7fa40be678742b14 (diff) |
DRILL-3353: Fix dropping nested fields
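Pass the SchemaChangeCallBack through in more places (container.addOrGet calls, the generated four-byte copier, and newly created child vectors) so that schema changes are tracked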
Reset the target vector's ephemeral transfer pairs (ephPair and, for MapVector, ephPair2) when making a new transfer pair for MapVector or RepeatedMapVector
Diffstat (limited to 'exec/java-exec/src/main/java/org/apache/drill/exec')
12 files changed, 61 insertions, 20 deletions
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/OutputMutator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/OutputMutator.java index 0fe79d90c..e109ec07f 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/OutputMutator.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/OutputMutator.java @@ -21,6 +21,7 @@ import io.netty.buffer.DrillBuf; import org.apache.drill.exec.exception.SchemaChangeException; import org.apache.drill.exec.record.MaterializedField; +import org.apache.drill.exec.util.CallBack; import org.apache.drill.exec.vector.ValueVector; /** @@ -61,4 +62,10 @@ public interface OutputMutator { * @return A DrillBuf that will be released at the end of the current query (and can be resized as desired during use). */ public DrillBuf getManagedBuffer(); + + /** + * + * @return the CallBack object for this mutator + */ + public CallBack getCallBack(); } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java index 4b91e1fcd..873ae76b5 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java @@ -51,6 +51,7 @@ import org.apache.drill.exec.server.options.OptionValue; import org.apache.drill.exec.store.RecordReader; import org.apache.drill.exec.testing.ControlsInjector; import org.apache.drill.exec.testing.ControlsInjectorFactory; +import org.apache.drill.exec.util.CallBack; import org.apache.drill.exec.vector.AllocationHelper; import org.apache.drill.exec.vector.NullableVarCharVector; import org.apache.drill.exec.vector.SchemaChangeCallBack; @@ -220,6 +221,11 @@ public class ScanBatch implements CloseableRecordBatch { hasReadNonEmptyFile = true; populatePartitionVectors(); + for (VectorWrapper w : container) { + 
w.getValueVector().getMutator().setValueCount(recordCount); + } + + // this is a slight misuse of this metric but it will allow Readers to report how many records they generated. final boolean isNewSchema = mutator.isNewSchema(); oContext.getStats().batchReceived(0, getRecordCount(), isNewSchema); @@ -354,6 +360,11 @@ public class ScanBatch implements CloseableRecordBatch { public DrillBuf getManagedBuffer() { return oContext.getManagedBuffer(); } + + @Override + public CallBack getCallBack() { + return callBack; + } } @Override diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java index 516b0282f..10f1d7fbf 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java @@ -276,7 +276,7 @@ public class TopNBatch extends AbstractRecordBatch<TopN> { SimpleRecordBatch batch = new SimpleRecordBatch(c, selectionVector4, context); SimpleRecordBatch newBatch = new SimpleRecordBatch(newContainer, null, context); if (copier == null) { - copier = RemovingRecordBatch.getGenerated4Copier(batch, context, oContext.getAllocator(), newContainer, newBatch); + copier = RemovingRecordBatch.getGenerated4Copier(batch, context, oContext.getAllocator(), newContainer, newBatch, null); } else { for (VectorWrapper<?> i : batch) { diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterRecordBatch.java index 5eee9dfe5..c1d78c374 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterRecordBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterRecordBatch.java @@ -193,7 +193,7 @@ public class FilterRecordBatch extends 
AbstractSingleRecordBatch<Filter>{ cg.addExpr(new ReturnValueExpression(expr)); for (VectorWrapper<?> v : incoming) { - TransferPair pair = v.getValueVector().makeTransferPair(container.addOrGet(v.getField())); + TransferPair pair = v.getValueVector().makeTransferPair(container.addOrGet(v.getField(), callBack)); transfers.add(pair); } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/LimitRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/LimitRecordBatch.java index d9330ea75..4ea5a5cc6 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/LimitRecordBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/LimitRecordBatch.java @@ -63,7 +63,7 @@ public class LimitRecordBatch extends AbstractSingleRecordBatch<Limit> { for(VectorWrapper<?> v : incoming){ - TransferPair pair = v.getValueVector().makeTransferPair(container.addOrGet(v.getField())); + TransferPair pair = v.getValueVector().makeTransferPair(container.addOrGet(v.getField(), callBack)); transfers.add(pair); } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java index b6e5dc0ba..5b5c90d47 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java @@ -325,7 +325,7 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project> { continue; } final FieldReference ref = new FieldReference(name); - final ValueVector vvOut = container.addOrGet(MaterializedField.create(ref, vvIn.getField().getType())); + final ValueVector vvOut = container.addOrGet(MaterializedField.create(ref, vvIn.getField().getType()), callBack); final TransferPair tp = 
vvIn.makeTransferPair(vvOut); transfers.add(tp); } @@ -399,7 +399,7 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project> { Preconditions.checkNotNull(incoming); final FieldReference ref = getRef(namedExpression); - final ValueVector vvOut = container.addOrGet(MaterializedField.create(ref, vectorRead.getMajorType())); + final ValueVector vvOut = container.addOrGet(MaterializedField.create(ref, vectorRead.getMajorType()), callBack); final TransferPair tp = vvIn.makeTransferPair(vvOut); transfers.add(tp); transferFieldIds.add(vectorRead.getFieldId().getFieldIds()[0]); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/RemovingRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/RemovingRecordBatch.java index 57e7b55d8..b5b1b0afe 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/RemovingRecordBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/RemovingRecordBatch.java @@ -34,7 +34,9 @@ import org.apache.drill.exec.record.TransferPair; import org.apache.drill.exec.record.VectorContainer; import org.apache.drill.exec.record.VectorWrapper; import org.apache.drill.exec.record.WritableBatch; +import org.apache.drill.exec.util.CallBack; import org.apache.drill.exec.vector.CopyUtil; +import org.apache.drill.exec.vector.SchemaChangeCallBack; import org.apache.drill.exec.vector.ValueVector; import com.google.common.base.Preconditions; @@ -194,7 +196,7 @@ public class RemovingRecordBatch extends AbstractSingleRecordBatch<SelectionVect @Override public void setupRemover(FragmentContext context, RecordBatch incoming, RecordBatch outgoing){ for(VectorWrapper<?> vv : incoming){ - TransferPair tp = vv.getValueVector().makeTransferPair(container.addOrGet(vv.getField())); + TransferPair tp = vv.getValueVector().makeTransferPair(container.addOrGet(vv.getField(), callBack)); pairs.add(tp); } } @@ -220,7 +222,7 
@@ public class RemovingRecordBatch extends AbstractSingleRecordBatch<SelectionVect Preconditions.checkArgument(incoming.getSchema().getSelectionVectorMode() == SelectionVectorMode.TWO_BYTE); for(VectorWrapper<?> vv : incoming){ - TransferPair tp = vv.getValueVector().makeTransferPair(container.addOrGet(vv.getField())); + TransferPair tp = vv.getValueVector().makeTransferPair(container.addOrGet(vv.getField(), callBack)); } try { @@ -237,14 +239,14 @@ public class RemovingRecordBatch extends AbstractSingleRecordBatch<SelectionVect private Copier getGenerated4Copier() throws SchemaChangeException { Preconditions.checkArgument(incoming.getSchema().getSelectionVectorMode() == SelectionVectorMode.FOUR_BYTE); - return getGenerated4Copier(incoming, context, oContext.getAllocator(), container, this); + return getGenerated4Copier(incoming, context, oContext.getAllocator(), container, this, callBack); } - public static Copier getGenerated4Copier(RecordBatch batch, FragmentContext context, BufferAllocator allocator, VectorContainer container, RecordBatch outgoing) throws SchemaChangeException{ + public static Copier getGenerated4Copier(RecordBatch batch, FragmentContext context, BufferAllocator allocator, VectorContainer container, RecordBatch outgoing, SchemaChangeCallBack callBack) throws SchemaChangeException{ for(VectorWrapper<?> vv : batch){ ValueVector v = vv.getValueVectors()[0]; - v.makeTransferPair(container.addOrGet(v.getField())); + v.makeTransferPair(container.addOrGet(v.getField(), callBack)); } try { diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordWriter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordWriter.java index 12b15a9fa..f118535bc 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordWriter.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordWriter.java @@ -143,9 +143,22 @@ public class 
ParquetRecordWriter extends ParquetOutputRecordWriter { enableDictionary = Boolean.parseBoolean(writerOptions.get(ExecConstants.PARQUET_WRITER_ENABLE_DICTIONARY_ENCODING)); } + private boolean containsComplexVectors(BatchSchema schema) { + for (MaterializedField field : schema) { + MinorType type = field.getType().getMinorType(); + switch (type) { + case MAP: + case LIST: + return true; + default: + } + } + return false; + } + @Override public void updateSchema(VectorAccessible batch) throws IOException { - if (this.batchSchema == null || !this.batchSchema.equals(batch.getSchema())) { + if (this.batchSchema == null || !this.batchSchema.equals(batch.getSchema()) || containsComplexVectors(this.batchSchema)) { if (this.batchSchema != null) { flush(); } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/AbstractMapVector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/AbstractMapVector.java index 1df4b81e6..efba46dcb 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/AbstractMapVector.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/AbstractMapVector.java @@ -41,9 +41,10 @@ public abstract class AbstractMapVector extends AbstractContainerVector { private final MapWithOrdinal<String, ValueVector> vectors = new MapWithOrdinal<>(); protected AbstractMapVector(MaterializedField field, BufferAllocator allocator, CallBack callBack) { - super(field, allocator, callBack); + super(field.clone(), allocator, callBack); + MaterializedField clonedField = field.clone(); // create the hierarchy of the child vectors based on the materialized field - for (MaterializedField child : field.getChildren()) { + for (MaterializedField child : clonedField.getChildren()) { if (!child.equals(BaseRepeatedValueVector.OFFSETS_FIELD)) { String fieldName = child.getLastName(); ValueVector v = TypeHelper.getNewVector(child, allocator, callBack); @@ -116,7 +117,7 @@ public abstract class 
AbstractMapVector extends AbstractContainerVector { create = true; } if (create) { - final T vector = (T) TypeHelper.getNewVector(field.getPath(), name, allocator, type); + final T vector = (T) TypeHelper.getNewVector(field.getPath(), name, allocator, type, callBack); putChild(name, vector); if (callBack!=null) { callBack.doWork(); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/MapVector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/MapVector.java index 3032aacad..1e30ea211 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/MapVector.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/MapVector.java @@ -161,6 +161,8 @@ public class MapVector extends AbstractMapVector { this.from = from; this.to = to; this.pairs = new TransferPair[from.size()]; + this.to.ephPair = null; + this.to.ephPair2 = null; int i = 0; ValueVector vector; @@ -294,9 +296,12 @@ public class MapVector extends AbstractMapVector { public Object getObject(int index) { Map<String, Object> vv = new JsonStringHashMap(); for (String child:getChildFieldNames()) { - Object value = getChild(child).getAccessor().getObject(index); - if (value != null) { - vv.put(child, value); + ValueVector v = getChild(child); + if (v != null) { + Object value = v.getAccessor().getObject(index); + if (value != null) { + vv.put(child, value); + } } } return vv; diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/RepeatedMapVector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/RepeatedMapVector.java index 97f5b3966..644e5db67 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/RepeatedMapVector.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/RepeatedMapVector.java @@ -326,6 +326,7 @@ public class RepeatedMapVector extends AbstractMapVector implements RepeatedValu this.from = from; this.to = to; this.pairs = 
new TransferPair[from.size()]; + this.to.ephPair = null; int i = 0; ValueVector vector; diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/impl/VectorContainerWriter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/impl/VectorContainerWriter.java index 6b6ab46a1..5aea0ca50 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/impl/VectorContainerWriter.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/impl/VectorContainerWriter.java @@ -21,6 +21,7 @@ import org.apache.drill.common.types.TypeProtos.MajorType; import org.apache.drill.exec.exception.SchemaChangeException; import org.apache.drill.exec.physical.impl.OutputMutator; import org.apache.drill.exec.record.MaterializedField; +import org.apache.drill.exec.util.CallBack; import org.apache.drill.exec.vector.ValueVector; import org.apache.drill.exec.vector.complex.MapVector; import org.apache.drill.exec.vector.complex.writer.BaseWriter.ComplexWriter; @@ -35,7 +36,7 @@ public class VectorContainerWriter extends AbstractFieldWriter implements Comple public VectorContainerWriter(OutputMutator mutator) { super(null); this.mutator = mutator; - this.mapVector = new SpecialMapVector(); + this.mapVector = new SpecialMapVector(mutator.getCallBack()); this.mapRoot = new SingleMapWriter(mapVector, this); } @@ -81,8 +82,8 @@ public class VectorContainerWriter extends AbstractFieldWriter implements Comple private class SpecialMapVector extends MapVector { - public SpecialMapVector() { - super("", null, null); + public SpecialMapVector(CallBack callback) { + super("", null, callback); } @Override |