diff options
author | Hanifi Gunes <hgunes@maprtech.com> | 2015-04-29 14:54:24 -0700 |
---|---|---|
committer | Hanifi Gunes <hgunes@maprtech.com> | 2015-05-11 01:20:26 -0700 |
commit | 4689468ef11a70c782f64af451807e1e10cdce65 (patch) | |
tree | 7d38c83034657f0bc26a6e8ed60cb8fd168d6465 /exec/java-exec/src/main | |
parent | a3ec52a721860a966dfa351f719458a200b27cbf (diff) |
DRILL-2150: Create an abstraction for repeated value vectors.
Diffstat (limited to 'exec/java-exec/src/main')
32 files changed, 1145 insertions, 596 deletions
diff --git a/exec/java-exec/src/main/codegen/templates/ComplexReaders.java b/exec/java-exec/src/main/codegen/templates/ComplexReaders.java index fa1dac4ec..068efb444 100644 --- a/exec/java-exec/src/main/codegen/templates/ComplexReaders.java +++ b/exec/java-exec/src/main/codegen/templates/ComplexReaders.java @@ -89,7 +89,7 @@ public class ${nullMode}${name}ReaderImpl extends AbstractFieldReader { } public int size(){ - return vector.getAccessor().getCount(idx()); + return vector.getAccessor().getInnerValueCountAt(idx()); } public void read(int arrayIndex, ${minor.class?cap_first}Holder h){ diff --git a/exec/java-exec/src/main/codegen/templates/ComplexWriters.java b/exec/java-exec/src/main/codegen/templates/ComplexWriters.java index 49c75d132..980f9ac68 100644 --- a/exec/java-exec/src/main/codegen/templates/ComplexWriters.java +++ b/exec/java-exec/src/main/codegen/templates/ComplexWriters.java @@ -77,24 +77,24 @@ public class ${eName}WriterImpl extends AbstractFieldWriter { public void write(${minor.class?cap_first}Holder h){ mutator.addSafe(idx(), h); - vector.getMutator().setValueCount(idx()); + vector.getMutator().setValueCount(idx()+1); } public void write(Nullable${minor.class?cap_first}Holder h){ mutator.addSafe(idx(), h); - vector.getMutator().setValueCount(idx()); + vector.getMutator().setValueCount(idx()+1); } <#if !(minor.class == "Decimal9" || minor.class == "Decimal18" || minor.class == "Decimal28Sparse" || minor.class == "Decimal38Sparse" || minor.class == "Decimal28Dense" || minor.class == "Decimal38Dense")> public void write${minor.class}(<#list fields as field>${field.type} ${field.name}<#if field_has_next>, </#if></#list>){ mutator.addSafe(idx(), <#list fields as field>${field.name}<#if field_has_next>, </#if></#list>); - vector.getMutator().setValueCount(idx()); + vector.getMutator().setValueCount(idx()+1); } </#if> public void setPosition(int idx){ super.setPosition(idx); - mutator.startNewGroup(idx); + mutator.startNewValue(idx); } @@ -102,24 +102,24 @@ public class ${eName}WriterImpl extends AbstractFieldWriter { public void write(${minor.class}Holder h){ mutator.setSafe(idx(), h); - vector.getMutator().setValueCount(idx()); + vector.getMutator().setValueCount(idx()+1); } public void write(Nullable${minor.class}Holder h){ mutator.setSafe(idx(), h); - vector.getMutator().setValueCount(idx()); + vector.getMutator().setValueCount(idx()+1); } <#if !(minor.class == "Decimal9" || minor.class == "Decimal18" || minor.class == "Decimal28Sparse" || minor.class == "Decimal38Sparse" || minor.class == "Decimal28Dense" || minor.class == "Decimal38Dense")> public void write${minor.class}(<#list fields as field>${field.type} ${field.name}<#if field_has_next>, </#if></#list>){ mutator.setSafe(idx(), <#if mode == "Nullable">1, </#if><#list fields as field>${field.name}<#if field_has_next>, </#if></#list>); - vector.getMutator().setValueCount(idx()); + vector.getMutator().setValueCount(idx()+1); } <#if mode == "Nullable"> public void writeNull(){ mutator.setNull(idx()); - vector.getMutator().setValueCount(idx()); + vector.getMutator().setValueCount(idx()+1); } </#if> </#if> diff --git a/exec/java-exec/src/main/codegen/templates/FixedValueVectors.java b/exec/java-exec/src/main/codegen/templates/FixedValueVectors.java index a805b8e23..7d8581035 100644 --- a/exec/java-exec/src/main/codegen/templates/FixedValueVectors.java +++ b/exec/java-exec/src/main/codegen/templates/FixedValueVectors.java @@ -764,6 +764,7 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements F allocationMonitor = 0; } VectorTrimmer.trim(data, idx); + data.writerIndex(valueCount * ${type.width}); } diff --git a/exec/java-exec/src/main/codegen/templates/ListWriters.java b/exec/java-exec/src/main/codegen/templates/ListWriters.java index 6df4248d1..ab78603ef 100644 --- a/exec/java-exec/src/main/codegen/templates/ListWriters.java +++ b/exec/java-exec/src/main/codegen/templates/ListWriters.java @@ -46,7 +46,7 @@ public class ${mode}ListWriter extends AbstractFieldWriter{ protected final ${containerClass} container; private Mode mode = Mode.INIT; private FieldWriter writer; - protected RepeatedVector innerVector; + protected RepeatedValueVector innerVector; <#if mode == "Repeated">private int currentChildIndex = 0;</#if> public ${mode}ListWriter(String name, ${containerClass} container, FieldWriter parent){ @@ -158,7 +158,7 @@ public class ${mode}ListWriter extends AbstractFieldWriter{ public void start(){ final RepeatedListVector list = (RepeatedListVector) container; - final RepeatedListVector.Mutator mutator = list.getMutator(); + final RepeatedListVector.RepeatedMutator mutator = list.getMutator(); // make sure that the current vector can support the end position of this list. if(container.getValueCapacity() <= idx()){ @@ -169,7 +169,7 @@ public class ${mode}ListWriter extends AbstractFieldWriter{ RepeatedListHolder h = new RepeatedListHolder(); list.getAccessor().get(idx(), h); if(h.start >= h.end){ - mutator.startNewGroup(idx()); + mutator.startNewValue(idx()); } currentChildIndex = container.getMutator().add(idx()); if(writer != null){ diff --git a/exec/java-exec/src/main/codegen/templates/MapWriters.java b/exec/java-exec/src/main/codegen/templates/MapWriters.java index 38df84bf2..06a6813d4 100644 --- a/exec/java-exec/src/main/codegen/templates/MapWriters.java +++ b/exec/java-exec/src/main/codegen/templates/MapWriters.java @@ -116,7 +116,7 @@ public class ${mode}MapWriter extends AbstractFieldWriter{ map.getAccessor().get(idx(), h); if(h.start >= h.end){ - container.getMutator().startNewGroup(idx()); + container.getMutator().startNewValue(idx()); } currentChildIndex = container.getMutator().add(idx()); for(FieldWriter w: fields.values()){ diff --git a/exec/java-exec/src/main/codegen/templates/RepeatedValueVectors.java b/exec/java-exec/src/main/codegen/templates/RepeatedValueVectors.java index c0fba660c..37b8fac12 100644 --- a/exec/java-exec/src/main/codegen/templates/RepeatedValueVectors.java +++ b/exec/java-exec/src/main/codegen/templates/RepeatedValueVectors.java @@ -19,9 +19,9 @@ import java.lang.Override; import org.apache.drill.exec.record.TransferPair; +import org.apache.drill.exec.vector.BaseRepeatedValueVector; import org.apache.drill.exec.vector.BaseValueVector; -import org.apache.drill.exec.vector.RepeatedFixedWidthVector; -import org.apache.drill.exec.vector.UInt4Vector; +import org.apache.drill.exec.vector.RepeatedFixedWidthVectorLike; import org.mortbay.jetty.servlet.Holder; <@pp.dropOutputFile /> @@ -48,14 +48,11 @@ package org.apache.drill.exec.vector; * NB: this class is automatically generated from ValueVectorTypes.tdd using FreeMarker. */ -public final class Repeated${minor.class}Vector extends BaseValueVector implements Repeated<#if type.major == "VarLen">VariableWidth<#else>FixedWidth</#if>Vector { +public final class Repeated${minor.class}Vector extends BaseRepeatedValueVector implements Repeated<#if type.major == "VarLen">VariableWidth<#else>FixedWidth</#if>VectorLike { private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(Repeated${minor.class}Vector.class); - private int parentValueCount; - private int childValueCount; - - private final UInt4Vector offsets; // offsets to start of each record - private final ${minor.class}Vector values; + // we maintain local reference to concrete vector type for performance reasons. + private ${minor.class}Vector values; private final FieldReader reader = new Repeated${minor.class}ReaderImpl(Repeated${minor.class}Vector.this); private final Mutator mutator = new Mutator(); private final Accessor accessor = new Accessor(); @@ -63,56 +60,57 @@ public final class Repeated${minor.class}Vector extends BaseValueVector implemen public Repeated${minor.class}Vector(MaterializedField field, BufferAllocator allocator) { super(field, allocator); - this.offsets = new UInt4Vector(null, allocator); - MaterializedField mf = MaterializedField.create(field.getPath(), Types.required(field.getType().getMinorType())); - this.values = new ${minor.class}Vector(mf, allocator); + addOrGetVector(VectorDescriptor.create(Types.required(field.getType().getMinorType()))); } @Override - public FieldReader getReader(){ - return reader; + public Mutator getMutator() { + return mutator; } - public int getValueCapacity(){ - return Math.min(values.getValueCapacity(), offsets.getValueCapacity() - 1); + @Override + public Accessor getAccessor() { + return accessor; } - public int getBufferSize(){ - if(accessor.getGroupCount() == 0){ - return 0; - } - return offsets.getBufferSize() + values.getBufferSize(); + @Override + public FieldReader getReader(){ + return reader; } - public UInt4Vector getOffsetVector(){ - return offsets; - } - - public ${minor.class}Vector getValuesVector(){ + @Override + public ${minor.class}Vector getDataVector(){ return values; } - - public DrillBuf getBuffer(){ - return values.getBuffer(); - } - + + @Override public TransferPair getTransferPair(){ return new TransferImpl(getField()); } + + @Override public TransferPair getTransferPair(FieldReference ref){ return new TransferImpl(getField().clone(ref)); } + @Override public TransferPair makeTransferPair(ValueVector to) { return new TransferImpl((Repeated${minor.class}Vector) to); } - + + @Override + public AddOrGetResult<${minor.class}Vector> addOrGetVector(VectorDescriptor descriptor) { + final AddOrGetResult<${minor.class}Vector> result = super.addOrGetVector(descriptor); + if (result.isCreated()) { + values = result.getVector(); + } + return result; + } + public void transferTo(Repeated${minor.class}Vector target){ target.clear(); offsets.transferTo(target.offsets); values.transferTo(target.values); - target.parentValueCount = parentValueCount; - target.childValueCount = childValueCount; clear(); } @@ -132,8 +130,6 @@ public final class Repeated${minor.class}Vector extends BaseValueVector implemen normalizedPos = a.get(startIndex+i) - startPos; m.set(i, normalizedPos); } - to.parentValueCount = groups; - to.childValueCount = valuesToCopy; m.setValueCount(groups == 0 ? 0 : groups + 1); } @@ -167,33 +163,27 @@ public final class Repeated${minor.class}Vector extends BaseValueVector implemen } public void copyFrom(int inIndex, int outIndex, Repeated${minor.class}Vector v){ - int count = v.getAccessor().getCount(inIndex); - getMutator().startNewGroup(outIndex); + final int count = v.getAccessor().getInnerValueCountAt(inIndex); + getMutator().startNewValue(outIndex); for (int i = 0; i < count; i++) { getMutator().add(outIndex, v.getAccessor().get(inIndex, i)); } } public void copyFromSafe(int inIndex, int outIndex, Repeated${minor.class}Vector v){ - int count = v.getAccessor().getCount(inIndex); - getMutator().startNewGroup(outIndex); + final int count = v.getAccessor().getInnerValueCountAt(inIndex); + getMutator().startNewValue(outIndex); for (int i = 0; i < count; i++) { getMutator().addSafe(outIndex, v.getAccessor().get(inIndex, i)); } } - @Override - public void setInitialCapacity(int numRecords) { - offsets.setInitialCapacity(numRecords + 1); - values.setInitialCapacity(numRecords * DEFAULT_REPEAT_PER_RECORD); - } public boolean allocateNewSafe(){ if(!offsets.allocateNewSafe()) return false; offsets.zeroVector(); if(!values.allocateNewSafe()) return false; mutator.reset(); - accessor.reset(); return true; } @@ -202,36 +192,28 @@ public final class Repeated${minor.class}Vector extends BaseValueVector implemen offsets.zeroVector(); values.allocateNew(); mutator.reset(); - accessor.reset(); } <#if type.major == "VarLen"> @Override - public SerializedField getMetadata() { - return getMetadataBuilder() // - .setGroupCount(this.parentValueCount) // - .setValueCount(this.childValueCount) // - .setVarByteLength(values.getVarByteLength()) // - .setBufferLength(getBufferSize()) // - .build(); + protected SerializedField.Builder getMetadataBuilder() { + return super.getMetadataBuilder() + .setVarByteLength(values.getVarByteLength()); } - public void allocateNew(int totalBytes, int parentValueCount, int childValueCount) { - offsets.allocateNew(parentValueCount+1); + public void allocateNew(int totalBytes, int valueCount, int innerValueCount) { + offsets.allocateNew(valueCount+1); offsets.zeroVector(); - values.allocateNew(totalBytes, childValueCount); + values.allocateNew(totalBytes, innerValueCount); mutator.reset(); - accessor.reset(); } @Override - public int load(int dataBytes, int parentValueCount, int childValueCount, DrillBuf buf){ + public int load(int dataBytes, int valueCount, int innerValueCount, DrillBuf buf){ clear(); - this.parentValueCount = parentValueCount; - this.childValueCount = childValueCount; int loaded = 0; - loaded += offsets.load(parentValueCount+1, buf.slice(loaded, buf.capacity() - loaded)); - loaded += values.load(dataBytes + 4*(childValueCount + 1), childValueCount, buf.slice(loaded, buf.capacity() - loaded)); + loaded += offsets.load(valueCount+1, buf.slice(loaded, buf.capacity() - loaded)); + loaded += values.load(dataBytes + 4*(innerValueCount + 1), innerValueCount, buf.slice(loaded, buf.capacity() - loaded)); return loaded; } @@ -247,32 +229,20 @@ public final class Repeated${minor.class}Vector extends BaseValueVector implemen } <#else> - - @Override - public SerializedField getMetadata() { - return getMetadataBuilder() - .setGroupCount(this.parentValueCount) - .setValueCount(this.childValueCount) - .setBufferLength(getBufferSize()) - .build(); - } - - public void allocateNew(int parentValueCount, int childValueCount) { + + public void allocateNew(int valueCount, int innerValueCount) { clear(); - offsets.allocateNew(parentValueCount+1); + offsets.allocateNew(valueCount+1); offsets.zeroVector(); - values.allocateNew(childValueCount); + values.allocateNew(innerValueCount); mutator.reset(); - accessor.reset(); } - public int load(int parentValueCount, int childValueCount, DrillBuf buf){ + public int load(int valueCount, int innerValueCount, DrillBuf buf){ clear(); - this.parentValueCount = parentValueCount; - this.childValueCount = childValueCount; int loaded = 0; - loaded += offsets.load(parentValueCount+1, buf.slice(loaded, buf.capacity() - loaded)); - loaded += values.load(childValueCount, buf.slice(loaded, buf.capacity() - loaded)); + loaded += offsets.load(valueCount+1, buf.slice(loaded, buf.capacity() - loaded)); + loaded += values.load(innerValueCount, buf.slice(loaded, buf.capacity() - loaded)); return loaded; } @@ -284,49 +254,12 @@ public final class Repeated${minor.class}Vector extends BaseValueVector implemen } </#if> - @Override - public DrillBuf[] getBuffers(boolean clear) { - DrillBuf[] buffers = ObjectArrays.concat(offsets.getBuffers(false), values.getBuffers(false), DrillBuf.class); - if (clear) { - for (DrillBuf buffer:buffers) { - buffer.retain(); - } - clear(); - } - return buffers; - } - - public void clear(){ - offsets.clear(); - values.clear(); - parentValueCount = 0; - childValueCount = 0; - } - - public Mutator getMutator(){ - return mutator; - } - - public Accessor getAccessor(){ - return accessor; - } // This is declared a subclass of the accessor declared inside of FixedWidthVector, this is also used for // variable length vectors, as they should ahve consistent interface as much as possible, if they need to diverge // in the future, the interface shold be declared in the respective value vector superclasses for fixed and variable // and we should refer to each in the generation template - public final class Accessor extends BaseValueVector.BaseAccessor implements RepeatedFixedWidthVector.RepeatedAccessor{ - - /** - * Get the elements at the given index. - */ - public int getCount(int index) { - return offsets.getAccessor().get(index+1) - offsets.getAccessor().get(index); - } - - public ValueVector getAllChildValues() { - return values; - } + public final class Accessor extends BaseRepeatedValueVector.BaseRepeatedAccessor { public List<${friendlyType}> getObject(int index) { List<${friendlyType}> vals = new JsonStringArrayList(); @@ -337,10 +270,6 @@ public final class Repeated${minor.class}Vector extends BaseValueVector implemen } return vals; } - - public int getGroupSizeAtIndex(int index){ - return offsets.getAccessor().get(index+1) - offsets.getAccessor().get(index); - } public ${friendlyType} getSingleObject(int index, int arrayIndex){ int start = offsets.getAccessor().get(index); @@ -360,12 +289,7 @@ public final class Repeated${minor.class}Vector extends BaseValueVector implemen </#if> get(int index, int positionIndex) { return values.getAccessor().get(offsets.getAccessor().get(index) + positionIndex); } - - - public boolean isNull(int index){ - return false; - } - + public void get(int index, Repeated${minor.class}Holder holder){ holder.start = offsets.getAccessor().get(index); holder.end = offsets.getAccessor().get(index+1); @@ -375,61 +299,24 @@ public final class Repeated${minor.class}Vector extends BaseValueVector implemen public void get(int index, int positionIndex, ${minor.class}Holder holder) { int offset = offsets.getAccessor().get(index); assert offset >= 0; - assert positionIndex < getCount(index); + assert positionIndex < getInnerValueCountAt(index); values.getAccessor().get(offset + positionIndex, holder); } public void get(int index, int positionIndex, Nullable${minor.class}Holder holder) { int offset = offsets.getAccessor().get(index); assert offset >= 0; - if (positionIndex >= getCount(index)) { + if (positionIndex >= getInnerValueCountAt(index)) { holder.isSet = 0; return; } values.getAccessor().get(offset + positionIndex, holder); } - - public MaterializedField getField() { - return field; - } - - public int getGroupCount(){ - return parentValueCount; - } - - public int getValueCount(){ - return childValueCount; - } - - public void reset(){ - - } } - public final class Mutator extends BaseValueVector.BaseMutator implements RepeatedMutator { - - - private Mutator(){ - } - - public void setRepetitionAtIndexSafe(int index, int repetitionCount) { - offsets.getMutator().setSafe(index+1, offsets.getAccessor().get(index) + repetitionCount); - } - - public BaseDataValueVector getDataVector() { - return values; - } - - public void setValueCounts(int parentValueCount, int childValueCount){ - Repeated${minor.class}Vector.this.parentValueCount = parentValueCount; - Repeated${minor.class}Vector.this.childValueCount = childValueCount; - values.getMutator().setValueCount(childValueCount); - offsets.getMutator().setValueCount(parentValueCount == 0 ? 0 : parentValueCount + 1); - } + public final class Mutator extends BaseRepeatedValueVector.BaseRepeatedMutator implements RepeatedMutator { - public void startNewGroup(int index) { - offsets.getMutator().setSafe(index+1, offsets.getAccessor().get(index)); - } + private Mutator() { } /** * Add an element to the given record index. This is similar to the set() method in other @@ -468,7 +355,7 @@ public final class Repeated${minor.class}Vector extends BaseValueVector implemen public void setSafe(int index, Repeated${minor.class}Holder h){ ${minor.class}Holder ih = new ${minor.class}Holder(); - getMutator().startNewGroup(index); + getMutator().startNewValue(index); for(int i = h.start; i < h.end; i++){ h.vector.getAccessor().get(i, ih); getMutator().addSafe(index, ih); @@ -511,17 +398,6 @@ public final class Repeated${minor.class}Vector extends BaseValueVector implemen add(index, innerHolder); } } - - /** - * Set the number of value groups in this repeated field. - * @param groupCount Count of Value Groups. - */ - public void setValueCount(int groupCount) { - parentValueCount = groupCount; - childValueCount = offsets.getAccessor().get(groupCount); - offsets.getMutator().setValueCount(groupCount == 0 ? 0 : groupCount+1); - values.getMutator().setValueCount(childValueCount); - } public void generateTestData(final int valCount){ int[] sizes = {1,2,0,6}; diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/exception/SchemaChangeRuntimeException.java b/exec/java-exec/src/main/java/org/apache/drill/exec/exception/SchemaChangeRuntimeException.java new file mode 100644 index 000000000..f2a7e6391 --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/exception/SchemaChangeRuntimeException.java @@ -0,0 +1,42 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.exception; + +import org.apache.drill.common.exceptions.DrillRuntimeException; + +public class SchemaChangeRuntimeException extends DrillRuntimeException { + public SchemaChangeRuntimeException() { + super(); + } + + public SchemaChangeRuntimeException(String message, Throwable cause, boolean enableSuppression, boolean writableStackTrace) { + super(message, cause, enableSuppression, writableStackTrace); + } + + public SchemaChangeRuntimeException(String message, Throwable cause) { + super(message, cause); + } + + public SchemaChangeRuntimeException(String message) { + super(message); + } + + public SchemaChangeRuntimeException(Throwable cause) { + super(cause); + } +} diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenRecordBatch.java index 7a5b352b0..00a78fd59 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenRecordBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenRecordBatch.java @@ -21,15 +21,13 @@ import java.io.IOException; import java.util.List; import com.carrotsearch.hppc.IntOpenHashSet; -import com.google.common.base.Preconditions; -import org.apache.drill.common.exceptions.DrillRuntimeException; +import org.apache.drill.common.exceptions.UserException; import org.apache.drill.common.expression.ErrorCollector; import org.apache.drill.common.expression.ErrorCollectorImpl; import org.apache.drill.common.expression.FieldReference; import org.apache.drill.common.expression.LogicalExpression; import org.apache.drill.common.expression.PathSegment; import org.apache.drill.common.logical.data.NamedExpression; -import org.apache.drill.common.types.TypeProtos; import org.apache.drill.exec.exception.ClassTransformationException; import org.apache.drill.exec.exception.SchemaChangeException; import org.apache.drill.exec.expr.ClassGenerator; @@ -52,9 +50,8 @@ import org.apache.drill.exec.record.TransferPair; import org.apache.drill.exec.record.TypedFieldId; import org.apache.drill.exec.record.VectorContainer; import org.apache.drill.exec.record.VectorWrapper; -import org.apache.drill.exec.vector.RepeatedVector; +import org.apache.drill.exec.vector.RepeatedValueVector; import org.apache.drill.exec.vector.ValueVector; -import org.apache.drill.exec.vector.complex.MapVector; import org.apache.drill.exec.vector.complex.RepeatedMapVector; import org.apache.drill.exec.vector.complex.writer.BaseWriter.ComplexWriter; @@ -129,13 +126,13 @@ public class FlattenRecordBatch extends AbstractSingleRecordBatch<FlattenPOP> { private void setFlattenVector() { try { - flattener.setFlattenField((RepeatedVector) incoming.getValueAccessorById( - incoming.getSchema().getColumn( - incoming.getValueVectorId( - popConfig.getColumn()).getFieldIds()[0]).getValueClass(), - incoming.getValueVectorId(popConfig.getColumn()).getFieldIds()).getValueVector()); + final TypedFieldId typedFieldId = incoming.getValueVectorId(popConfig.getColumn()); + final MaterializedField field = incoming.getSchema().getColumn(typedFieldId.getFieldIds()[0]); + final RepeatedValueVector vector = RepeatedValueVector.class.cast(incoming.getValueAccessorById( + field.getValueClass(), typedFieldId.getFieldIds()).getValueVector()); + flattener.setFlattenField(vector); } catch (Exception ex) { - throw new DrillRuntimeException("Trying to flatten a non-repeated filed."); + throw UserException.unsupportedError(ex).message("Trying to flatten a non-repeated field.").build(); } } @@ -152,7 +149,7 @@ public class FlattenRecordBatch extends AbstractSingleRecordBatch<FlattenPOP> { // inside of the the flattener for the current batch setFlattenVector(); - int childCount = incomingRecordCount == 0 ? 0 : flattener.getFlattenField().getAccessor().getValueCount(); + int childCount = incomingRecordCount == 0 ? 0 : flattener.getFlattenField().getAccessor().getInnerValueCount(); int outputRecords = flattener.flattenRecords(0, incomingRecordCount, 0); // TODO - change this to be based on the repeated vector length if (outputRecords < childCount) { @@ -178,7 +175,7 @@ public class FlattenRecordBatch extends AbstractSingleRecordBatch<FlattenPOP> { } private void handleRemainder() { - int remainingRecordCount = flattener.getFlattenField().getAccessor().getValueCount() - remainderIndex; + int remainingRecordCount = flattener.getFlattenField().getAccessor().getInnerValueCount() - remainderIndex; if (!doAlloc()) { outOfMemory = true; return; @@ -271,7 +268,7 @@ public class FlattenRecordBatch extends AbstractSingleRecordBatch<FlattenPOP> { if (flattenField instanceof RepeatedMapVector) { tp = ((RepeatedMapVector)flattenField).getTransferPairToSingleMap(reference); } else { - ValueVector vvIn = ((RepeatedVector)flattenField).getAccessor().getAllChildValues(); + final ValueVector vvIn = RepeatedValueVector.class.cast(flattenField).getDataVector(); // vvIn may be null because of fast schema return for repeated list vectors if (vvIn != null) { tp = vvIn.getTransferPair(reference); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenTemplate.java index 96209a23b..b8d040c40 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenTemplate.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenTemplate.java @@ -31,8 +31,7 @@ import org.apache.drill.exec.record.selection.SelectionVector4; import com.google.common.collect.ImmutableList; -import org.apache.drill.exec.vector.RepeatedFixedWidthVector.RepeatedAccessor; -import org.apache.drill.exec.vector.RepeatedVector; +import org.apache.drill.exec.vector.RepeatedValueVector; public abstract class FlattenTemplate implements Flattener { static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FlattenTemplate.class); @@ -43,9 +42,9 @@ public abstract class FlattenTemplate implements Flattener { private SelectionVector2 vector2; private SelectionVector4 vector4; private SelectionVectorMode svMode; - RepeatedVector fieldToFlatten; - RepeatedAccessor accessor; - private int groupIndex; + private RepeatedValueVector fieldToFlatten; + private RepeatedValueVector.RepeatedAccessor accessor; + private int valueIndex; // this allows for groups to be written between batches if we run out of space, for cases where we have finished // a batch on the boundary it will be set to 0 @@ -60,12 +59,12 @@ public abstract class FlattenTemplate implements Flattener { } @Override - public void setFlattenField(RepeatedVector flattenField) { + public void setFlattenField(RepeatedValueVector flattenField) { this.fieldToFlatten = flattenField; - this.accessor = flattenField.getAccessor(); + this.accessor = RepeatedValueVector.RepeatedAccessor.class.cast(flattenField.getAccessor()); } - public RepeatedVector getFlattenField() { + public RepeatedValueVector getFlattenField() { return fieldToFlatten; } @@ -84,14 +83,14 @@ public abstract class FlattenTemplate implements Flattener { childIndexWithinCurrGroup = 0; } outer: { - final int groupCount = accessor.getGroupCount(); - for ( ; groupIndex < groupCount; groupIndex++) { - currGroupSize = accessor.getGroupSizeAtIndex(groupIndex); + final int valueCount = accessor.getValueCount(); + for ( ; valueIndex < valueCount; valueIndex++) { + currGroupSize = accessor.getInnerValueCountAt(valueIndex); for ( ; childIndexWithinCurrGroup < currGroupSize; childIndexWithinCurrGroup++) { if (firstOutputIndex == OUTPUT_BATCH_SIZE) { break outer; } - doEval(groupIndex, firstOutputIndex); + doEval(valueIndex, firstOutputIndex); firstOutputIndex++; childIndex++; } @@ -133,7 +132,7 @@ public abstract class FlattenTemplate implements Flattener { @Override public void resetGroupIndex() { - this.groupIndex = 0; + this.valueIndex = 0; this.currGroupSize = 0; this.childIndex = 0; } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/Flattener.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/Flattener.java index 2141ca2c4..323bf4349 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/Flattener.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/Flattener.java @@ -24,14 +24,14 @@ import org.apache.drill.exec.exception.SchemaChangeException; import org.apache.drill.exec.ops.FragmentContext; import org.apache.drill.exec.record.RecordBatch; import org.apache.drill.exec.record.TransferPair; -import org.apache.drill.exec.vector.RepeatedVector; +import org.apache.drill.exec.vector.RepeatedValueVector; public interface Flattener { public abstract void setup(FragmentContext context, RecordBatch incoming, RecordBatch outgoing, List<TransferPair> transfers) throws SchemaChangeException; public abstract int flattenRecords(int startIndex, int recordCount, int firstOutputIndex); - public void setFlattenField(RepeatedVector repeatedColumn); - public RepeatedVector getFlattenField(); + public void setFlattenField(RepeatedValueVector repeatedColumn); + public RepeatedValueVector getFlattenField(); public void resetGroupIndex(); public static TemplateClassDefinition<Flattener> TEMPLATE_DEFINITION = new TemplateClassDefinition<Flattener>(Flattener.class, FlattenTemplate.class); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java index 32ffb6f0b..946d11763 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java @@ -63,6 +63,7 @@ import org.apache.drill.exec.record.VectorWrapper; import org.apache.drill.exec.vector.AllocationHelper; import org.apache.drill.exec.vector.FixedWidthVector; import org.apache.drill.exec.vector.ValueVector; +import org.apache.drill.exec.vector.complex.AbstractContainerVector; import org.apache.drill.exec.vector.complex.writer.BaseWriter.ComplexWriter; import com.carrotsearch.hppc.IntOpenHashSet; @@ -415,7 +416,7 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project> { // The reference name will be passed to ComplexWriter, used as the name of the output vector from the writer. ((DrillComplexWriterFuncHolder) ((DrillFuncHolderExpr) expr).getHolder()).setReference(namedExpression.getRef()); cg.addExpr(expr); - } else{ + } else { // need to do evaluation. final ValueVector vector = container.addOrGet(outputField, callBack); allocationVectors.add(vector); @@ -424,6 +425,15 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project> { final ValueVectorWriteExpression write = new ValueVectorWriteExpression(fid, expr, useSetSafe); final HoldingContainer hc = cg.addExpr(write); + // We cannot do multiple transfers from the same vector. However we still need to instantiate the output vector. + if (expr instanceof ValueVectorReadExpression) { + final ValueVectorReadExpression vectorRead = (ValueVectorReadExpression) expr; + if (!vectorRead.hasReadPath()) { + final TypedFieldId id = vectorRead.getFieldId(); + final ValueVector vvIn = incoming.getValueAccessorById(id.getIntermediateClass(), id.getFieldIds()).getValueVector(); + vvIn.makeTransferPair(vector); + } + } logger.debug("Added eval for project expression."); } } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/VectorHolder.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/VectorHolder.java index 8387d49a6..e602fd745 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/VectorHolder.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/VectorHolder.java @@ -18,9 +18,9 @@ package org.apache.drill.exec.store; import org.apache.drill.exec.vector.AllocationHelper; -import org.apache.drill.exec.vector.RepeatedFixedWidthVector; +import org.apache.drill.exec.vector.RepeatedFixedWidthVectorLike; import org.apache.drill.exec.vector.RepeatedMutator; -import org.apache.drill.exec.vector.RepeatedVariableWidthVector; +import org.apache.drill.exec.vector.RepeatedVariableWidthVectorLike; import org.apache.drill.exec.vector.ValueVector; public class VectorHolder { @@ -35,7 +35,7 @@ public class VectorHolder { public VectorHolder(int length, ValueVector vector) { this.length = length; this.vector = vector; - if (vector instanceof RepeatedFixedWidthVector || vector instanceof RepeatedVariableWidthVector) { + if (vector instanceof RepeatedFixedWidthVectorLike || vector instanceof RepeatedVariableWidthVectorLike) { repeated = true; } } @@ -43,7 +43,7 @@ public class VectorHolder { public VectorHolder(ValueVector vector) { this.length = vector.getValueCapacity(); this.vector = vector; - if (vector instanceof RepeatedFixedWidthVector || vector instanceof RepeatedVariableWidthVector) { + if (vector instanceof RepeatedFixedWidthVectorLike || vector instanceof RepeatedVariableWidthVectorLike) { repeated = true; } } @@ -90,7 +90,7 @@ public class VectorHolder { public void populateVectorLength() { ValueVector.Mutator mutator = vector.getMutator(); - if (vector instanceof RepeatedFixedWidthVector || vector instanceof RepeatedVariableWidthVector) { + if (vector instanceof RepeatedFixedWidthVectorLike || vector instanceof RepeatedVariableWidthVectorLike) { mutator.setValueCount(groupCount); } else { mutator.setValueCount(count); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/RepeatedVarCharOutput.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/RepeatedVarCharOutput.java index 3ad5c2a36..40276f498 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/RepeatedVarCharOutput.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/RepeatedVarCharOutput.java @@ -192,7 +192,7 @@ class RepeatedVarCharOutput extends TextOutput { } private void loadVarCharDataAddress(){ - DrillBuf buf = vector.getValuesVector().getBuffer(); + DrillBuf buf = vector.getDataVector().getBuffer(); checkBuf(buf); this.characterData = buf.memoryAddress(); this.characterDataOriginal = buf.memoryAddress(); @@ -200,7 +200,7 @@ class RepeatedVarCharOutput extends TextOutput { } private void loadVarCharOffsetAddress(){ - DrillBuf buf = vector.getValuesVector().getOffsetVector().getBuffer(); + DrillBuf buf = vector.getDataVector().getOffsetVector().getBuffer(); checkBuf(buf); this.charLengthOffset = buf.memoryAddress() + 4; this.charLengthOffsetOriginal = buf.memoryAddress() + 4; // add four as offsets conceptually start at 1. (first item is 0..1) @@ -208,14 +208,14 @@ class RepeatedVarCharOutput extends TextOutput { } private void expandVarCharOffsets(){ - vector.getValuesVector().getOffsetVector().reAlloc(); + vector.getDataVector().getOffsetVector().reAlloc(); long diff = charLengthOffset - charLengthOffsetOriginal; loadVarCharOffsetAddress(); charLengthOffset += diff; } private void expandVarCharData(){ - vector.getValuesVector().reAlloc(); + vector.getDataVector().reAlloc(); long diff = characterData - characterDataOriginal; loadVarCharDataAddress(); characterData += diff; diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/FixedWidthRepeatedReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/FixedWidthRepeatedReader.java index 7f8b61175..2b929a473 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/FixedWidthRepeatedReader.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/FixedWidthRepeatedReader.java @@ -20,7 +20,10 @@ package org.apache.drill.exec.store.parquet.columnreaders; import java.io.IOException; import org.apache.drill.common.exceptions.ExecutionSetupException; -import org.apache.drill.exec.vector.RepeatedFixedWidthVector; +import org.apache.drill.exec.vector.BaseDataValueVector; +import org.apache.drill.exec.vector.RepeatedFixedWidthVectorLike; +import org.apache.drill.exec.vector.RepeatedValueVector; +import org.apache.drill.exec.vector.UInt4Vector; import org.apache.drill.exec.vector.ValueVector; import parquet.column.ColumnDescriptor; @@ -29,7 +32,7 @@ import parquet.hadoop.metadata.ColumnChunkMetaData; public class FixedWidthRepeatedReader extends VarLengthColumn { - RepeatedFixedWidthVector castedRepeatedVector; + RepeatedValueVector castedRepeatedVector; ColumnReader dataReader; int dataTypeLengthInBytes; // we can do a vector copy of the data once we figure out how much we need to copy @@ -47,9 +50,9 @@ public class FixedWidthRepeatedReader extends VarLengthColumn { boolean notFishedReadingList; byte[] leftOverBytes; - FixedWidthRepeatedReader(ParquetRecordReader parentReader, ColumnReader dataReader, int dataTypeLengthInBytes, int allocateSize, ColumnDescriptor descriptor, ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, ValueVector valueVector, SchemaElement schemaElement) throws ExecutionSetupException { + FixedWidthRepeatedReader(ParquetRecordReader parentReader, ColumnReader dataReader, int dataTypeLengthInBytes, int allocateSize, ColumnDescriptor descriptor, ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, RepeatedValueVector valueVector, SchemaElement schemaElement) throws ExecutionSetupException { super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, valueVector, schemaElement); - castedRepeatedVector = (RepeatedFixedWidthVector) valueVector; + this.castedRepeatedVector = valueVector; this.dataTypeLengthInBytes = dataTypeLengthInBytes; this.dataReader = dataReader; this.dataReader.pageReader.clear(); @@ -65,7 +68,7 @@ public class FixedWidthRepeatedReader extends VarLengthColumn { bytesReadInCurrentPass = 0; valuesReadInCurrentPass = 0; pageReader.valuesReadyToRead = 0; - dataReader.vectorData = castedRepeatedVector.getMutator().getDataVector().getBuffer(); + dataReader.vectorData = BaseDataValueVector.class.cast(castedRepeatedVector.getDataVector()).getBuffer(); dataReader.valuesReadInCurrentPass = 0; repeatedGroupsReadInCurrentPass = 0; } @@ -200,8 +203,8 @@ public class FixedWidthRepeatedReader extends VarLengthColumn { currentValueListLength += numLeftoverVals; } // this should not fail - castedRepeatedVector.getMutator().setRepetitionAtIndexSafe(repeatedGroupsReadInCurrentPass, - currentValueListLength); + final UInt4Vector offsets = castedRepeatedVector.getOffsetVector(); + offsets.getMutator().setSafe(repeatedGroupsReadInCurrentPass + 1, offsets.getAccessor().get(repeatedGroupsReadInCurrentPass)); // This field is being referenced in the superclass determineSize method, so we need to set it here // again going to make this the length in BYTES to avoid repetitive multiplication/division dataTypeLengthInBits = repeatedValuesInCurrentList * dataTypeLengthInBytes; @@ -218,12 +221,13 @@ public class FixedWidthRepeatedReader extends VarLengthColumn { dataReader.valuesReadInCurrentPass = 0; dataReader.readValues(valuesToRead); valuesReadInCurrentPass += valuesToRead; - castedRepeatedVector.getMutator().setValueCounts(repeatedGroupsReadInCurrentPass, valuesReadInCurrentPass); + castedRepeatedVector.getMutator().setValueCount(repeatedGroupsReadInCurrentPass); + castedRepeatedVector.getDataVector().getMutator().setValueCount(valuesReadInCurrentPass); } @Override public int capacity() { - return castedRepeatedVector.getMutator().getDataVector().getBuffer().capacity(); + return BaseDataValueVector.class.cast(castedRepeatedVector.getDataVector()).getBuffer().capacity(); } @Override diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java index 2f07fb352..0cbd48013 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java @@ -41,7 +41,7 @@ import org.apache.drill.exec.store.AbstractRecordReader; import org.apache.drill.exec.store.parquet.DirectCodecFactory; import org.apache.drill.exec.vector.AllocationHelper; import org.apache.drill.exec.vector.NullableIntVector; -import org.apache.drill.exec.vector.RepeatedFixedWidthVector; +import org.apache.drill.exec.vector.RepeatedValueVector; import org.apache.drill.exec.vector.ValueVector; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -274,7 +274,7 @@ public class ParquetRecordReader extends AbstractRecordReader { } try { - ValueVector v; + ValueVector vector; SchemaElement schemaElement; ArrayList<VarLengthColumn> varLengthColumns = new ArrayList<>(); // initialize all of the column read status objects @@ -292,23 +292,24 @@ public class ParquetRecordReader extends AbstractRecordReader { } fieldFixedLength = column.getType() != PrimitiveType.PrimitiveTypeName.BINARY; - v = output.addField(field, (Class<? extends ValueVector>) TypeHelper.getValueVectorClass(type.getMinorType(), type.getMode())); + vector = output.addField(field, (Class<? extends ValueVector>) TypeHelper.getValueVectorClass(type.getMinorType(), type.getMode())); if (column.getType() != PrimitiveType.PrimitiveTypeName.BINARY) { if (column.getMaxRepetitionLevel() > 0) { + final RepeatedValueVector repeatedVector = RepeatedValueVector.class.cast(vector); ColumnReader dataReader = ColumnReaderFactory.createFixedColumnReader(this, fieldFixedLength, column, columnChunkMetaData, recordsPerBatch, - ((RepeatedFixedWidthVector) v).getMutator().getDataVector(), schemaElement); + repeatedVector.getDataVector(), schemaElement); varLengthColumns.add(new FixedWidthRepeatedReader(this, dataReader, - getTypeLengthInBits(column.getType()), -1, column, columnChunkMetaData, false, v, schemaElement)); + getTypeLengthInBits(column.getType()), -1, column, columnChunkMetaData, false, repeatedVector, schemaElement)); } else { columnStatuses.add(ColumnReaderFactory.createFixedColumnReader(this, fieldFixedLength, - column, columnChunkMetaData, recordsPerBatch, v, + column, columnChunkMetaData, recordsPerBatch, vector, schemaElement)); } } else { // create a reader and add it to the appropriate list - varLengthColumns.add(ColumnReaderFactory.getReader(this, -1, column, columnChunkMetaData, false, v, schemaElement)); + varLengthColumns.add(ColumnReaderFactory.getReader(this, -1, column, columnChunkMetaData, false, vector, schemaElement)); } } varLengthReader = new VarLenBinaryReader(this, varLengthColumns); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/text/DrillTextRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/text/DrillTextRecordReader.java index e25bd7408..c59ade9ba 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/text/DrillTextRecordReader.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/text/DrillTextRecordReader.java @@ -162,7 +162,7 @@ public class DrillTextRecordReader extends AbstractRecordReader { // index of the scanned field int p = 0; int i = 0; - vector.getMutator().startNewGroup(recordCount); + vector.getMutator().startNewValue(recordCount); // Process each field in this line while (end < value.getLength() - 1) { if(numCols > 0 && p >= numCols) { diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/RepeatedVector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/AddOrGetResult.java index df4279ac4..7d1f08dab 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/RepeatedVector.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/AddOrGetResult.java @@ -17,9 +17,22 @@ */ package org.apache.drill.exec.vector; -public interface RepeatedVector extends ValueVector { - public static final int DEFAULT_REPEAT_PER_RECORD = 4; +import com.google.common.base.Preconditions; - public RepeatedFixedWidthVector.RepeatedAccessor getAccessor(); +public class AddOrGetResult<V extends ValueVector> { + private final V vector; + private final boolean created; + public AddOrGetResult(V vector, boolean created) { + this.vector = Preconditions.checkNotNull(vector); + this.created = created; + } + + public V getVector() { + return vector; + } + + public boolean isCreated() { + return created; + } } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/AllocationHelper.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/AllocationHelper.java index bf465c799..eddefd0ac 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/AllocationHelper.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/AllocationHelper.java @@ -31,10 +31,10 @@ public class AllocationHelper { ((FixedWidthVector) v).allocateNew(valueCount); } else if (v instanceof VariableWidthVector) { ((VariableWidthVector) v).allocateNew(valueCount * bytesPerValue, valueCount); - }else if(v instanceof RepeatedFixedWidthVector){ - ((RepeatedFixedWidthVector) v).allocateNew(valueCount, childValCount); - }else if(v instanceof RepeatedVariableWidthVector){ - ((RepeatedVariableWidthVector) v).allocateNew(childValCount * bytesPerValue, valueCount, childValCount); + }else if(v instanceof RepeatedFixedWidthVectorLike){ + ((RepeatedFixedWidthVectorLike) v).allocateNew(valueCount, childValCount); + }else if(v instanceof RepeatedVariableWidthVectorLike){ + ((RepeatedVariableWidthVectorLike) v).allocateNew(childValCount * bytesPerValue, valueCount, childValCount); }else{ v.allocateNew(); } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/BaseDataValueVector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/BaseDataValueVector.java index 0c6097c48..6d356f2c2 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/BaseDataValueVector.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/BaseDataValueVector.java @@ -22,8 +22,8 @@ import org.apache.drill.exec.memory.BufferAllocator; import org.apache.drill.exec.record.MaterializedField; -public abstract class BaseDataValueVector<V extends BaseValueVector<V, A, M>, A extends BaseValueVector.BaseAccessor, - M extends BaseValueVector.BaseMutator> extends BaseValueVector<V, A, M> { +public abstract class BaseDataValueVector<A extends BaseValueVector.BaseAccessor, M extends BaseValueVector.BaseMutator> + extends BaseValueVector<A, M> { protected DrillBuf data; @@ -36,6 +36,7 @@ public abstract class BaseDataValueVector<V extends BaseValueVector<V, A, M>, A public void clear() { data.release(); data = allocator.getEmpty(); + super.clear(); } @Override diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/BaseRepeatedValueVector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/BaseRepeatedValueVector.java new file mode 100644 index 000000000..bcf079375 --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/BaseRepeatedValueVector.java @@ -0,0 +1,206 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.vector; + +import java.util.Collections; +import java.util.Iterator; + +import com.google.common.base.Preconditions; +import com.google.common.collect.ObjectArrays; +import io.netty.buffer.DrillBuf; +import org.apache.drill.common.exceptions.DrillRuntimeException; +import org.apache.drill.common.types.TypeProtos; +import org.apache.drill.common.types.Types; +import org.apache.drill.exec.exception.SchemaChangeRuntimeException; +import org.apache.drill.exec.expr.TypeHelper; +import org.apache.drill.exec.memory.BufferAllocator; +import org.apache.drill.exec.proto.UserBitShared; +import org.apache.drill.exec.record.MaterializedField; + +public abstract class BaseRepeatedValueVector<A extends RepeatedValueVector.RepeatedAccessor, M extends RepeatedValueVector.RepeatedMutator> + extends BaseValueVector<A, M> implements RepeatedValueVector<A, M> { + + public final static ValueVector DEFAULT_DATA_VECTOR = ZeroVector.INSTANCE; + public final static String OFFSETS_VECTOR_NAME = "offsets"; + public final static String DATA_VECTOR_NAME = "data"; + + private final static MaterializedField offsetsField = + MaterializedField.create(OFFSETS_VECTOR_NAME, Types.required(TypeProtos.MinorType.UINT4)); + + protected final UInt4Vector offsets; + protected ValueVector vector; + + protected BaseRepeatedValueVector(MaterializedField field, BufferAllocator allocator) { + this(field, allocator, DEFAULT_DATA_VECTOR); + } + + protected BaseRepeatedValueVector(MaterializedField field, BufferAllocator allocator, ValueVector vector) { + super(field, allocator); + this.offsets = new UInt4Vector(offsetsField, allocator); + this.vector = Preconditions.checkNotNull(vector, "data vector cannot be null"); + } + + @Override + public boolean allocateNewSafe() { + if (!offsets.allocateNewSafe()) { + return false; + } + offsets.zeroVector(); + return vector.allocateNewSafe(); + } + + @Override + public UInt4Vector getOffsetVector() { + return offsets; + } + + @Override + public ValueVector getDataVector() { + return vector; + } + + @Override + public void setInitialCapacity(int numRecords) { + offsets.setInitialCapacity(numRecords + 1); + vector.setInitialCapacity(numRecords * RepeatedValueVector.DEFAULT_REPEAT_PER_RECORD); + } + + @Override + public int getValueCapacity() { + final int offsetValueCapacity = offsets.getValueCapacity() - 1; + if (vector == DEFAULT_DATA_VECTOR) { + return offsetValueCapacity; + } + return Math.min(vector.getValueCapacity(), offsetValueCapacity); + } + + @Override + protected UserBitShared.SerializedField.Builder getMetadataBuilder() { + return super.getMetadataBuilder() + .setGroupCount(getAccessor().getValueCount()) + .setValueCount(getAccessor().getInnerValueCount()) + .addChild(vector.getMetadata()); + } + + @Override + public int getBufferSize() { + if (getAccessor().getValueCount() == 0) { + return 0; + } + return offsets.getBufferSize() + vector.getBufferSize(); + } + + @Override + public Iterator<ValueVector> iterator() { + return Collections.singleton(getDataVector()).iterator(); + } + + @Override + public void clear() { + offsets.clear(); + vector.clear(); + super.clear(); + } + + @Override + public DrillBuf[] getBuffers(boolean clear) { + final DrillBuf[] buffers = ObjectArrays.concat(offsets.getBuffers(false), vector.getBuffers(false), DrillBuf.class); + if (clear) { + for (DrillBuf buffer:buffers) { + buffer.retain(); + } + clear(); + } + return buffers; + } + + /** + * Returns 1 if inner vector is explicitly set via #addOrGetVector else 0 + * + * @see {@link ContainerVectorLike#size} + */ + @Override + public int size() { + return vector == DEFAULT_DATA_VECTOR ? 0:1; + } + + @Override + public <T extends ValueVector> AddOrGetResult<T> addOrGetVector(VectorDescriptor descriptor) { + boolean created = false; + if (vector == DEFAULT_DATA_VECTOR) { + vector = TypeHelper.getNewVector(MaterializedField.create(DATA_VECTOR_NAME, descriptor.getType()), allocator); + getField().addChild(vector.getField()); + created = true; + } + + final TypeProtos.MajorType actual = vector.getField().getType(); + if (!actual.equals(descriptor.getType())) { + final String msg = String.format("Inner vector type mismatch. Requested type: [%s], actual type: [%s]", + descriptor.getType(), actual); + throw new SchemaChangeRuntimeException(msg); + } + + return new AddOrGetResult<>((T)vector, created); + } + + public abstract class BaseRepeatedAccessor extends BaseValueVector.BaseAccessor implements RepeatedAccessor { + + @Override + public int getValueCount() { + return Math.max(offsets.getAccessor().getValueCount() - 1, 0); + } + + @Override + public int getInnerValueCount() { + return vector.getAccessor().getValueCount(); + } + + @Override + public int getInnerValueCountAt(int index) { + return offsets.getAccessor().get(index+1) - offsets.getAccessor().get(index); + } + + @Override + public boolean isNull(int index) { + return false; + } + + @Override + public boolean isEmpty(int index) { + return false; + } + } + + public abstract class BaseRepeatedMutator extends BaseValueVector.BaseMutator implements RepeatedMutator { + + @Override + public void startNewValue(int index) { + offsets.getMutator().setSafe(index+1, offsets.getAccessor().get(index)); + setValueCount(index+1); + } + + @Override + public void setValueCount(int valueCount) { + // TODO: populate offset end points + offsets.getMutator().setValueCount(valueCount == 0 ? 0 : valueCount+1); + final int childValueCount = valueCount == 0 ? 0 : offsets.getAccessor().get(valueCount); + vector.getMutator().setValueCount(childValueCount); + } + } + +} diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/BaseValueVector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/BaseValueVector.java index 22f0fe740..67c489daf 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/BaseValueVector.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/BaseValueVector.java @@ -25,9 +25,10 @@ import org.apache.drill.common.expression.FieldReference; import org.apache.drill.exec.memory.BufferAllocator; import org.apache.drill.exec.proto.UserBitShared.SerializedField; import org.apache.drill.exec.record.MaterializedField; +import org.apache.drill.exec.record.TransferPair; -public abstract class BaseValueVector<V extends BaseValueVector<V, A, M>, A extends BaseValueVector.BaseAccessor, - M extends BaseValueVector.BaseMutator> implements ValueVector<V, A, M> { +public abstract class BaseValueVector<A extends ValueVector.Accessor, M extends ValueVector.Mutator> + implements ValueVector<A, M> { private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(BaseValueVector.class); protected final BufferAllocator allocator; @@ -40,6 +41,11 @@ public abstract class BaseValueVector<V extends BaseValueVector<V, A, M>, A exte } @Override + public void clear() { + getMutator().reset(); + } + + @Override public void close() { clear(); } @@ -54,6 +60,11 @@ public abstract class BaseValueVector<V extends BaseValueVector<V, A, M>, A exte } @Override + public TransferPair getTransferPair() { + return getTransferPair(new FieldReference(getField().getPath())); + } + + @Override public SerializedField getMetadata() { return getMetadataBuilder().build(); } @@ -76,11 +87,15 @@ public abstract class BaseValueVector<V extends BaseValueVector<V, A, M>, A exte public abstract static class BaseMutator implements ValueVector.Mutator { protected BaseMutator() { } + @Override + public void generateTestData(int values) { } + + //TODO: consider making mutator stateless(if possible) on another issue. public void reset() { } } @Override - public Iterator<ValueVector<V,A,M>> iterator() { + public Iterator<ValueVector> iterator() { return Iterators.emptyIterator(); } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/ContainerVectorLike.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/ContainerVectorLike.java new file mode 100644 index 000000000..95e3365d8 --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/ContainerVectorLike.java @@ -0,0 +1,39 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.vector; + +/** + * A mix-in used for introducing container vector-like behaviour. + */ +public interface ContainerVectorLike { + + /** + * Creates and adds a child vector if none with the same name exists, else returns the vector instance. + * + * @param descriptor vector descriptor + * @return result of operation wrapping vector corresponding to the given descriptor and whether it's newly created + * @throws org.apache.drill.common.exceptions.DrillRuntimeException + * if schema change is not permissible between the given and existing data vector types. + */ + <T extends ValueVector> AddOrGetResult<T> addOrGetVector(VectorDescriptor descriptor); + + /** + * Returns the number of child vectors in this container vector-like instance. + */ + int size(); +} diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/RepeatedFixedWidthVector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/RepeatedFixedWidthVectorLike.java index eaae7ad60..450c673b7 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/RepeatedFixedWidthVector.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/RepeatedFixedWidthVectorLike.java @@ -19,35 +19,38 @@ package org.apache.drill.exec.vector; import io.netty.buffer.DrillBuf; -public interface RepeatedFixedWidthVector extends ValueVector, RepeatedVector { +/** + * A {@link org.apache.drill.exec.vector.ValueVector} mix-in that can be used in conjunction with + * {@link org.apache.drill.exec.vector.RepeatedValueVector} subtypes. + */ +public interface RepeatedFixedWidthVectorLike { /** * Allocate a new memory space for this vector. Must be called prior to using the ValueVector. * - * @param parentValueCount Number of separate repeating groupings. - * @param childValueCount Number of supported values in the vector. + * @param valueCount Number of separate repeating groupings. + * @param innerValueCount Number of supported values in the vector. */ - public void allocateNew(int parentValueCount, int childValueCount); + public void allocateNew(int valueCount, int innerValueCount); /** * Load the records in the provided buffer based on the given number of values. - * @param parentValueCount Number of separate repeating groupings. - * @param valueCount Number atomic values the buffer contains. + * @param valueCount Number of separate repeating groupings. + * @param innerValueCount Number atomic values the buffer contains. * @param buf Incoming buffer. * @return The number of bytes of the buffer that were consumed. */ - public int load(int parentValueCount, int childValueCount, DrillBuf buf); - - public abstract RepeatedMutator getMutator(); + public int load(int valueCount, int innerValueCount, DrillBuf buf); - public interface RepeatedAccessor extends Accessor { - public int getGroupCount(); - public int getValueCount(); - public int getGroupSizeAtIndex(int index); - public ValueVector getAllChildValues(); - } - public interface RepeatedMutator extends Mutator { - public void setValueCounts(int parentValueCount, int childValueCount); - public void setRepetitionAtIndexSafe(int index, int repetitionCount); - public BaseDataValueVector getDataVector(); - } +// public interface RepeatedAccessor extends Accessor { +// public int getGroupCount(); +// public int getValueCount(); +// public int getGroupSizeAtIndex(int index); +// public ValueVector getAllChildValues(); +// } +// +// public interface RepeatedMutator extends Mutator { +// public void setValueCounts(int parentValueCount, int childValueCount); +// public void setRepetitionAtIndexSafe(int index, int repetitionCount); +// public BaseDataValueVector getDataVector(); +// } } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/RepeatedValueVector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/RepeatedValueVector.java new file mode 100644 index 000000000..d5a82816a --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/RepeatedValueVector.java @@ -0,0 +1,85 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.vector; + +/** + * An abstraction representing repeated value vectors. + * + * A repeated vector contains values that may either be flat or nested. A value consists of zero or more cells(inner values). + * Current design maintains data and offsets vectors. Each cell is stored in the data vector. Repeated vector + * uses the offset vector to determine the sequence of cells pertaining to an individual value. + * + * @param <A> repeated accessor type + * @param <M> repeated mutator type + */ +public interface RepeatedValueVector<A extends RepeatedValueVector.RepeatedAccessor, M extends RepeatedValueVector.RepeatedMutator> + extends ValueVector<A, M>, ContainerVectorLike { + + final static int DEFAULT_REPEAT_PER_RECORD = 5; + + /** + * Returns the underlying offset vector or null if none exists. + * + * TODO(DRILL-2995): eliminate exposing low-level interfaces. + */ + UInt4Vector getOffsetVector(); + + /** + * Returns the underlying data vector or null if none exists. + */ + ValueVector getDataVector(); + + @Override + A getAccessor(); + + @Override + M getMutator(); + + interface RepeatedAccessor extends ValueVector.Accessor { + /** + * Returns total number of cells that vector contains. + * + * The result includes empty, null valued cells. + */ + int getInnerValueCount(); + + + /** + * Returns number of cells that the value at the given index contains. + */ + int getInnerValueCountAt(int index); + + /** + * Returns true if the value at the given index is empty, false otherwise. + * + * @param index value index + */ + boolean isEmpty(int index); + } + + interface RepeatedMutator extends ValueVector.Mutator { + /** + * Starts a new value that is a container of cells. + * + * @param index index of new value to start + */ + void startNewValue(int index); + + + } +} diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/RepeatedVariableWidthVector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/RepeatedVariableWidthVectorLike.java index a49934168..ac8589ef1 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/RepeatedVariableWidthVector.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/RepeatedVariableWidthVectorLike.java @@ -19,7 +19,7 @@ package org.apache.drill.exec.vector; import io.netty.buffer.DrillBuf; -public interface RepeatedVariableWidthVector extends ValueVector, RepeatedVector { +public interface RepeatedVariableWidthVectorLike { /** * Allocate a new memory space for this vector. Must be called prior to using the ValueVector. * diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/SchemaChangeCallBack.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/SchemaChangeCallBack.java index 386ee3495..de05131fd 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/SchemaChangeCallBack.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/SchemaChangeCallBack.java @@ -28,9 +28,9 @@ public class SchemaChangeCallBack implements CallBack { } public boolean getSchemaChange() { - boolean schemaChange = this.schemaChange; - this.schemaChange = false; - return schemaChange; + final boolean current = schemaChange; + schemaChange = false; + return current; } } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/ValueVector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/ValueVector.java index e4a0997ed..ab9992e6f 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/ValueVector.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/ValueVector.java @@ -37,12 +37,11 @@ import org.apache.drill.exec.vector.complex.reader.FieldReader; * A vector when instantiated, relies on a {@link org.apache.drill.exec.record.DeadBuf dead buffer}. It is important * that vector is allocated before attempting to read or write. * - * @param <V> actual value vector type * @param <A> accessor type that supports reading from this vector * @param <M> mutator type that supports writing to this vector */ -public interface ValueVector<V extends ValueVector, A extends ValueVector.Accessor, M extends ValueVector.Mutator> - extends Closeable, Iterable<ValueVector<V, A, M>> { +public interface ValueVector<A extends ValueVector.Accessor, M extends ValueVector.Mutator> + extends Closeable, Iterable<ValueVector> { /** * Allocate new buffers. ValueVector implements logic to determine how much to allocate. @@ -94,7 +93,7 @@ public interface ValueVector<V extends ValueVector, A extends ValueVector.Access * Returns a new {@link org.apache.drill.exec.record.TransferPair transfer pair} that is used to transfer underlying * buffers into the target vector. */ - TransferPair makeTransferPair(V target); + TransferPair makeTransferPair(ValueVector target); /** * Returns an {@link org.apache.drill.exec.vector.ValueVector.Accessor accessor} that is used to read from this vector diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/VectorDescriptor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/VectorDescriptor.java new file mode 100644 index 000000000..9a29848cf --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/VectorDescriptor.java @@ -0,0 +1,57 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.vector; + +import com.google.common.base.Preconditions; +import org.apache.drill.common.types.TypeProtos; + +public class VectorDescriptor { + private static final String DEFAULT_NAME = new String("NONE"); + + private final TypeProtos.MajorType type; + private final String name; + + public VectorDescriptor(TypeProtos.MajorType type) { + this(DEFAULT_NAME, type); + } + + public VectorDescriptor(String name,TypeProtos.MajorType type) { + this.name = Preconditions.checkNotNull(name, "name cannot be null"); + this.type = Preconditions.checkNotNull(type, "type cannot be null"); + } + + public TypeProtos.MajorType getType() { + return type; + } + + public String getName() { + return name; + } + + public boolean hasName() { + return name != DEFAULT_NAME; + } + + public static VectorDescriptor create(String name, TypeProtos.MajorType type) { + return new VectorDescriptor(name, type); + } + + public static VectorDescriptor create(TypeProtos.MajorType type) { + return new VectorDescriptor(type); + } +} diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/ZeroVector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/ZeroVector.java new file mode 100644 index 000000000..db8d327fd --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/ZeroVector.java @@ -0,0 +1,170 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.vector; + +import java.util.Iterator; + +import com.google.common.collect.Iterators; +import io.netty.buffer.DrillBuf; +import org.apache.drill.common.expression.FieldReference; +import org.apache.drill.common.types.Types; +import org.apache.drill.exec.memory.BufferAllocator; +import org.apache.drill.exec.memory.OutOfMemoryRuntimeException; +import org.apache.drill.exec.proto.UserBitShared; +import org.apache.drill.exec.record.MaterializedField; +import org.apache.drill.exec.record.TransferPair; +import org.apache.drill.exec.vector.complex.impl.NullReader; +import org.apache.drill.exec.vector.complex.reader.FieldReader; + +public class ZeroVector implements ValueVector { + public final static ZeroVector INSTANCE = new ZeroVector(); + + private final MaterializedField field = MaterializedField.create("[DEFAULT]", Types.LATE_BIND_TYPE); + + private final TransferPair defaultPair = new TransferPair() { + @Override + public void transfer() { } + + @Override + public void splitAndTransfer(int startIndex, int length) { } + + @Override + public ValueVector getTo() { + return ZeroVector.this; + } + + @Override + public void copyValueSafe(int from, int to) { } + }; + + private final Accessor defaultAccessor = new Accessor() { + @Override + public Object getObject(int index) { + return null; + } + + @Override + public int getValueCount() { + return 0; + } + + @Override + public boolean isNull(int index) { + return true; + } + }; + + private final Mutator defaultMutator = new Mutator() { + @Override + public void setValueCount(int valueCount) { } + + @Override + public void reset() { } + + @Override + public void generateTestData(int values) { } + }; + + public ZeroVector() { } + + @Override + public void close() { } + + @Override + public void clear() { } + + @Override + public MaterializedField getField() { + return field; + } + + @Override + public TransferPair getTransferPair() { + return defaultPair; + } + + @Override + public UserBitShared.SerializedField getMetadata() { + return getField() + .getAsBuilder() + .setBufferLength(getBufferSize()) + .setValueCount(getAccessor().getValueCount()) + .build(); + } + + @Override + public Iterator iterator() { + return Iterators.emptyIterator(); + } + + @Override + public int getBufferSize() { + return 0; + } + + @Override + public DrillBuf[] getBuffers(boolean clear) { + return new DrillBuf[0]; + } + + @Override + public void allocateNew() throws OutOfMemoryRuntimeException { + allocateNewSafe(); + } + + @Override + public boolean allocateNewSafe() { + return true; + } + + @Override + public void setInitialCapacity(int numRecords) { } + + @Override + public int getValueCapacity() { + return 0; + } + + @Override + public TransferPair getTransferPair(FieldReference ref) { + return defaultPair; + } + + @Override + public TransferPair makeTransferPair(ValueVector target) { + return defaultPair; + } + + @Override + public Accessor getAccessor() { + return defaultAccessor; + } + + @Override + public Mutator getMutator() { + return defaultMutator; + } + + @Override + public FieldReader getReader() { + return NullReader.INSTANCE; + } + + @Override + public void load(UserBitShared.SerializedField metadata, DrillBuf buffer) { } +} diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/MapVector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/MapVector.java index 41388393c..b615b66f6 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/MapVector.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/MapVector.java @@ -101,7 +101,8 @@ public class MapVector extends AbstractMapVector { @Override public void setInitialCapacity(int numRecords) { - for (ValueVector v : (ValueVector<?,?,?>)this) { + final Iterable<ValueVector> container = this; + for (ValueVector v : container) { v.setInitialCapacity(numRecords); } } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/RepeatedListVector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/RepeatedListVector.java index c06102999..b5de8b1e2 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/RepeatedListVector.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/RepeatedListVector.java @@ -17,13 +17,12 @@ */ package org.apache.drill.exec.vector.complex; +import com.google.common.base.Preconditions; import io.netty.buffer.DrillBuf; -import java.util.Collections; import java.util.Iterator; import java.util.List; -import org.apache.commons.lang3.ArrayUtils; import org.apache.drill.common.expression.FieldReference; import org.apache.drill.common.expression.SchemaPath; import org.apache.drill.common.types.TypeProtos.MajorType; @@ -33,380 +32,391 @@ import org.apache.drill.exec.expr.TypeHelper; import org.apache.drill.exec.expr.holders.ComplexHolder; import org.apache.drill.exec.expr.holders.RepeatedListHolder; import org.apache.drill.exec.memory.BufferAllocator; +import org.apache.drill.exec.memory.OutOfMemoryRuntimeException; +import org.apache.drill.exec.proto.UserBitShared; import org.apache.drill.exec.proto.UserBitShared.SerializedField; import org.apache.drill.exec.record.MaterializedField; import org.apache.drill.exec.record.TransferPair; import org.apache.drill.exec.util.JsonStringArrayList; -import org.apache.drill.exec.vector.BaseDataValueVector; -import org.apache.drill.exec.vector.RepeatedFixedWidthVector; +import org.apache.drill.exec.vector.AddOrGetResult; +import org.apache.drill.exec.vector.BaseRepeatedValueVector; +import org.apache.drill.exec.vector.RepeatedFixedWidthVectorLike; import org.apache.drill.exec.util.CallBack; +import org.apache.drill.exec.vector.RepeatedValueVector; import org.apache.drill.exec.vector.UInt4Vector; import org.apache.drill.exec.vector.ValueVector; +import org.apache.drill.exec.vector.VectorDescriptor; import org.apache.drill.exec.vector.complex.impl.NullReader; import org.apache.drill.exec.vector.complex.impl.RepeatedListReaderImpl; import org.apache.drill.exec.vector.complex.reader.FieldReader; -import com.google.common.base.Preconditions; - -public class RepeatedListVector extends AbstractContainerVector implements RepeatedFixedWidthVector{ +public class RepeatedListVector extends AbstractContainerVector + implements RepeatedValueVector, RepeatedFixedWidthVectorLike { public final static MajorType TYPE = Types.repeated(MinorType.LIST); - - private final UInt4Vector offsets; // offsets to start of each record - private final Mutator mutator = new Mutator(); - private final RepeatedListAccessor accessor = new RepeatedListAccessor(); - private ValueVector vector; private final RepeatedListReaderImpl reader = new RepeatedListReaderImpl(null, this); - private final EmptyValuePopulator emptyPopulator; + private final DelegateRepeatedVector delegate; + + protected static class DelegateRepeatedVector + extends BaseRepeatedValueVector<DelegateRepeatedVector.RepeatedListAccessor, DelegateRepeatedVector.RepeatedListMutator> { + + private final RepeatedListAccessor accessor = new RepeatedListAccessor(); + private final RepeatedListMutator mutator = new RepeatedListMutator(); + private final EmptyValuePopulator emptyPopulator; + private transient DelegateTransferPair ephPair; + + public class RepeatedListAccessor extends BaseRepeatedValueVector.BaseRepeatedAccessor { + + @Override + public Object getObject(int index) { + List<Object> list = new JsonStringArrayList(); + final int start = offsets.getAccessor().get(index); + final int until = offsets.getAccessor().get(index+1); + for (int i = start; i < until; i++) { + list.add(vector.getAccessor().getObject(i)); + } + return list; + } + public void get(int index, RepeatedListHolder holder) { + assert index <= getValueCapacity(); + holder.start = getOffsetVector().getAccessor().get(index); + holder.end = getOffsetVector().getAccessor().get(index+1); + } - public RepeatedListVector(SchemaPath path, BufferAllocator allocator, CallBack callBack){ - this(MaterializedField.create(path, TYPE), allocator, callBack); - } + public void get(int index, ComplexHolder holder) { + final FieldReader reader = getReader(); + reader.setPosition(index); + holder.reader = reader; + } - public RepeatedListVector(MaterializedField field, BufferAllocator allocator, CallBack callBack){ - super(field, allocator, callBack); - int childrenSize = field.getChildren().size(); + public void get(int index, int arrayIndex, ComplexHolder holder) { + final RepeatedListHolder listHolder = new RepeatedListHolder(); + get(index, listHolder); + int offset = listHolder.start + arrayIndex; + if (offset >= listHolder.end) { + holder.reader = NullReader.INSTANCE; + } else { + FieldReader r = getDataVector().getReader(); + r.setPosition(offset); + holder.reader = r; + } + } + } - // repeated list vector should not have more than one child - assert childrenSize <= 1; + public class RepeatedListMutator extends BaseRepeatedValueVector.BaseRepeatedMutator { - if (childrenSize > 0) { - MaterializedField child = field.getChildren().iterator().next(); - setVector(TypeHelper.getNewVector(child, allocator, callBack)); - } + public int add(int index) { + final int curEnd = getOffsetVector().getAccessor().get(index+1); + getOffsetVector().getMutator().setSafe(index + 1, curEnd + 1); + return curEnd; + } - this.offsets = new UInt4Vector(null, allocator); - this.emptyPopulator = new EmptyValuePopulator(offsets); - } + @Override + public void startNewValue(int index) { + emptyPopulator.populate(index+1); + super.startNewValue(index); + } - @Override - public RepeatedListReaderImpl getReader() { - return reader; - } + @Override + public void setValueCount(int valueCount) { + emptyPopulator.populate(valueCount); + super.setValueCount(valueCount); + } + } - @Override - public int size() { - return vector != null ? 1 : 0; - } + public class DelegateTransferPair implements TransferPair { + private final DelegateRepeatedVector target; + private final TransferPair[] children; - transient private RepeatedListTransferPair ephPair; + public DelegateTransferPair(DelegateRepeatedVector target) { + this.target = Preconditions.checkNotNull(target); + if (target.getDataVector() == DEFAULT_DATA_VECTOR) { + target.addOrGetVector(VectorDescriptor.create(getDataVector().getField().getType())); + target.getDataVector().allocateNew(); + } + this.children = new TransferPair[] { + getOffsetVector().makeTransferPair(target.getOffsetVector()), + getDataVector().makeTransferPair(target.getDataVector()) + }; + } - public void copyFromSafe(int fromIndex, int thisIndex, RepeatedListVector from) { - if(ephPair == null || ephPair.from != from) { - ephPair = (RepeatedListTransferPair) from.makeTransferPair(this); - } - ephPair.copyValueSafe(fromIndex, thisIndex); - } + @Override + public void transfer() { + for (TransferPair child:children) { + child.transfer(); + } + } - public Mutator getMutator() { - return mutator; - } + @Override + public ValueVector getTo() { + return target; + } - public void setInitialCapacity(int numRecords) { - offsets.setInitialCapacity(numRecords + 1); - if (vector != null) { - vector.setInitialCapacity(numRecords * DEFAULT_REPEAT_PER_RECORD); - } - } + @Override + public void splitAndTransfer(int startIndex, int length) { + throw new UnsupportedOperationException("Repeated list does not support split & transfer operation"); + } - @Override - public boolean allocateNewSafe() { - if (!offsets.allocateNewSafe()) { - return false; + @Override + public void copyValueSafe(int srcIndex, int destIndex) { + final RepeatedListHolder holder = new RepeatedListHolder(); + getAccessor().get(srcIndex, holder); + target.emptyPopulator.populate(destIndex+1); + final TransferPair vectorTransfer = children[1]; + int newIndex = target.getOffsetVector().getAccessor().get(destIndex); + //todo: make this a bulk copy. + for (int i = holder.start; i < holder.end; i++, newIndex++) { + vectorTransfer.copyValueSafe(i, newIndex); + } + target.getOffsetVector().getMutator().setSafe(destIndex + 1, newIndex); + } } - offsets.zeroVector(); - if (vector != null) { - return vector.allocateNewSafe(); - } else { - return true; + public DelegateRepeatedVector(SchemaPath path, BufferAllocator allocator) { + this(MaterializedField.create(path, TYPE), allocator); } - } - public void reAlloc() { - offsets.reAlloc(); - } - - public class Mutator implements ValueVector.Mutator, RepeatedMutator{ + public DelegateRepeatedVector(MaterializedField field, BufferAllocator allocator) { + super(field, allocator); + this.emptyPopulator = new EmptyValuePopulator(getOffsetVector()); + } - public void startNewGroup(int index) { - emptyPopulator.populate(index+1); - offsets.getMutator().setSafe(index+1, offsets.getAccessor().get(index)); + @Override + public void allocateNew() throws OutOfMemoryRuntimeException { + if (!allocateNewSafe()) { + throw new OutOfMemoryRuntimeException(); + } } - public int add(int index) { - final int prevEnd = offsets.getAccessor().get(index+1); - offsets.getMutator().setSafe(index+1, prevEnd+1); - return prevEnd; + @Override + protected SerializedField.Builder getMetadataBuilder() { + return super.getMetadataBuilder(); } - public void setValueCount(int groupCount) { - emptyPopulator.populate(groupCount); - offsets.getMutator().setValueCount(groupCount+1); + @Override + public TransferPair getTransferPair(FieldReference ref) { + return makeTransferPair(new DelegateRepeatedVector(ref, allocator)); + } - if (vector != null) { - int valueCount = offsets.getAccessor().get(groupCount); - vector.getMutator().setValueCount(valueCount); - } + @Override + public TransferPair makeTransferPair(ValueVector target) { + return new DelegateTransferPair(DelegateRepeatedVector.class.cast(target)); } @Override - public void reset() { } + public RepeatedListAccessor getAccessor() { + return accessor; + } @Override - public void generateTestData(int values) { + public RepeatedListMutator getMutator() { + return mutator; } @Override - public void setValueCounts(int parentValueCount, int childValueCount) { - // TODO - determine if this should be implemented for this class + public FieldReader getReader() { throw new UnsupportedOperationException(); } @Override - public void setRepetitionAtIndexSafe(int index, int repetitionCount) { + public void load(SerializedField metadata, DrillBuf buffer) { + //TODO(DRILL-2997): get rid of the notion of "group count" completely + final int valueCount = metadata.getGroupCount(); + final int bufOffset = offsets.load(valueCount + 1, buffer); + final SerializedField childField = metadata.getChildList().get(0); + if (getDataVector() == DEFAULT_DATA_VECTOR) { + addOrGetVector(VectorDescriptor.create(childField.getMajorType())); + } + + if (childField.getValueCount() == 0) { + vector.clear(); + } else { + vector.load(childField, buffer.slice(bufOffset, childField.getBufferLength())); + } } - @Override - public BaseDataValueVector getDataVector() { - return null; //To change body of implemented methods use File | Settings | File Templates. + public void copyFromSafe(int fromIndex, int thisIndex, DelegateRepeatedVector from) { + if(ephPair == null || ephPair.target != from) { + ephPair = DelegateTransferPair.class.cast(from.makeTransferPair(this)); + } + ephPair.copyValueSafe(fromIndex, thisIndex); } + } - public class RepeatedListAccessor implements RepeatedAccessor{ + protected class RepeatedListTransferPair implements TransferPair { + private final TransferPair delegate; - @Override - public Object getObject(int index) { - List<Object> l = new JsonStringArrayList(); - int end = offsets.getAccessor().get(index+1); - for (int i = offsets.getAccessor().get(index); i < end; i++) { - l.add(vector.getAccessor().getObject(i)); - } - return l; + public RepeatedListTransferPair(TransferPair delegate) { + this.delegate = delegate; } - public int getGroupSizeAtIndex(int index) { - return offsets.getAccessor().get(index+1) - offsets.getAccessor().get(index); + public void transfer() { + delegate.transfer(); } @Override - public ValueVector getAllChildValues() { - return vector; + public void splitAndTransfer(int startIndex, int length) { + delegate.splitAndTransfer(startIndex, length); } @Override - public int getValueCount() { - return offsets.getAccessor().get(offsets.getAccessor().getValueCount() - 1); + public ValueVector getTo() { + final DelegateRepeatedVector delegateVector = DelegateRepeatedVector.class.cast(delegate.getTo()); + return new RepeatedListVector(getField(), allocator, callBack, delegateVector); } - public void get(int index, RepeatedListHolder holder) { - assert index <= getValueCapacity(); - holder.start = offsets.getAccessor().get(index); - holder.end = offsets.getAccessor().get(index+1); + @Override + public void copyValueSafe(int from, int to) { + delegate.copyValueSafe(from, to); } + } - public void get(int index, ComplexHolder holder) { - FieldReader reader = getReader(); - reader.setPosition(index); - holder.reader = reader; - } + public RepeatedListVector(SchemaPath path, BufferAllocator allocator, CallBack callBack) { + this(MaterializedField.create(path, TYPE), allocator, callBack); + } - public void get(int index, int arrayIndex, ComplexHolder holder) { - RepeatedListHolder h = new RepeatedListHolder(); - get(index, h); - int offset = h.start + arrayIndex; + public RepeatedListVector(MaterializedField field, BufferAllocator allocator, CallBack callBack) { + this(field, allocator, callBack, new DelegateRepeatedVector(field, allocator)); + } - if (offset >= h.end) { - holder.reader = NullReader.INSTANCE; - } else { - FieldReader r = vector.getReader(); - r.setPosition(offset); - holder.reader = r; - } - } + protected RepeatedListVector(MaterializedField field, BufferAllocator allocator, CallBack callBack, DelegateRepeatedVector delegate) { + super(field, allocator, callBack); + int childrenSize = field.getChildren().size(); - @Override - public boolean isNull(int index) { - return false; + // repeated list vector should not have more than one child + assert childrenSize <= 1; + this.delegate = Preconditions.checkNotNull(delegate); + if (childrenSize > 0) { + MaterializedField child = field.getChildren().iterator().next(); + addOrGetVector(VectorDescriptor.create(child.getType())); +// setVector(TypeHelper.getNewVector(child, allocator, callBack)); } + } + @Override - public int getGroupCount() { - final int valueCount = offsets.getAccessor().getValueCount(); - return valueCount == 0 ? 0 : valueCount - 1; - } + public RepeatedListReaderImpl getReader() { + return reader; } @Override - public int getBufferSize() { - return offsets.getBufferSize() + vector.getBufferSize(); + public DelegateRepeatedVector.RepeatedListAccessor getAccessor() { + return delegate.getAccessor(); } @Override - public void close() { - offsets.close(); - super.close(); + public DelegateRepeatedVector.RepeatedListMutator getMutator() { + return delegate.getMutator(); } @Override - public void clear() { - getMutator().reset(); - offsets.clear(); - if (vector != null) { - vector.clear(); - } + public UInt4Vector getOffsetVector() { + return delegate.getOffsetVector(); } @Override - public TransferPair getTransferPair() { - return new RepeatedListTransferPair(getField().getPath()); + public ValueVector getDataVector() { + return delegate.getDataVector(); } - public class RepeatedListTransferPair implements TransferPair{ - private final RepeatedListVector from = RepeatedListVector.this; - private final RepeatedListVector to; - private final TransferPair vectorTransfer; - - private RepeatedListTransferPair(RepeatedListVector to) { - this.to = to; - if (to.vector == null) { - to.vector = to.addOrGet(null, vector.getField().getType(), vector.getClass()); - to.vector.allocateNew(); - } - this.vectorTransfer = vector.makeTransferPair(to.vector); - } + @Override + public void allocateNew() throws OutOfMemoryRuntimeException { + delegate.allocateNew(); + } - private RepeatedListTransferPair(SchemaPath path) { - this.to = new RepeatedListVector(path, allocator, callBack); - vectorTransfer = vector.getTransferPair(); - this.to.vector = vectorTransfer.getTo(); - } + @Override + public boolean allocateNewSafe() { + return delegate.allocateNewSafe(); + } - @Override - public void transfer() { - offsets.transferTo(to.offsets); - vectorTransfer.transfer(); - clear(); + @Override + public <T extends ValueVector> AddOrGetResult<T> addOrGetVector(VectorDescriptor descriptor) { + final AddOrGetResult<T> result = delegate.addOrGetVector(descriptor); + if (result.isCreated() && callBack != null) { + callBack.doWork(); } + return result; + } - @Override - public ValueVector getTo() { - return to; - } + @Override + public int size() { + return delegate.size(); + } - @Override - public void splitAndTransfer(int startIndex, int length) { - throw new UnsupportedOperationException(); - } + @Override + public int getBufferSize() { + return delegate.getBufferSize(); + } - @Override - public void copyValueSafe(int srcIndex, int destIndex) { - RepeatedListHolder holder = new RepeatedListHolder(); - accessor.get(srcIndex, holder); - to.emptyPopulator.populate(destIndex+1); - int newIndex = to.offsets.getAccessor().get(destIndex); - //todo: make this a bulk copy. - for (int i = holder.start; i < holder.end; i++, newIndex++) { - vectorTransfer.copyValueSafe(i, newIndex); - } - to.offsets.getMutator().setSafe(destIndex + 1, newIndex); - } + @Override + public void close() { + delegate.close(); + } + @Override + public void clear() { + delegate.clear(); } @Override - public TransferPair makeTransferPair(ValueVector to) { - if (!(to instanceof RepeatedListVector ) ) { - throw new IllegalArgumentException("You can't make a transfer pair from an incompatible ."); - } - return new RepeatedListTransferPair( (RepeatedListVector) to); + public TransferPair getTransferPair() { + return new RepeatedListTransferPair(delegate.getTransferPair()); } @Override public TransferPair getTransferPair(FieldReference ref) { - return new RepeatedListTransferPair(ref); + return new RepeatedListTransferPair(delegate.getTransferPair(ref)); } @Override - public int getValueCapacity() { - if (vector == null) { - return offsets.getValueCapacity() - 1; - } - return Math.min(offsets.getValueCapacity() - 1, vector.getValueCapacity()); + public TransferPair makeTransferPair(ValueVector to) { + final RepeatedListVector target = RepeatedListVector.class.cast(to); + return new RepeatedListTransferPair(delegate.makeTransferPair(target.delegate)); } @Override - public RepeatedListAccessor getAccessor() { - return accessor; + public int getValueCapacity() { + return delegate.getValueCapacity(); } @Override public DrillBuf[] getBuffers(boolean clear) { - DrillBuf[] buffers = ArrayUtils.addAll(offsets.getBuffers(false), vector.getBuffers(false)); - if (clear) { - // does not make much sense but we have to retain buffers even when clear is set. refactor this interface. - for (DrillBuf buffer:buffers) { - buffer.retain(); - } - clear(); - } - return buffers; + return delegate.getBuffers(clear); } - protected void setVector(ValueVector newVector) { - vector = Preconditions.checkNotNull(newVector); - getField().addChild(newVector.getField()); - } @Override public void load(SerializedField metadata, DrillBuf buf) { - SerializedField childField = metadata.getChildList().get(0); - - int bufOffset = offsets.load(metadata.getValueCount()+1, buf); - - MaterializedField fieldDef = MaterializedField.create(childField); - if (vector == null) { - setVector(TypeHelper.getNewVector(fieldDef, allocator)); - } - - if (childField.getValueCount() == 0) { - vector.clear(); - } else { - vector.load(childField, buf.slice(bufOffset, childField.getBufferLength())); - } + delegate.load(metadata, buf); } @Override public SerializedField getMetadata() { - return getField() // - .getAsBuilder() // - .setBufferLength(getBufferSize()) // - .setValueCount(accessor.getGroupCount()) // - .addChild(vector.getMetadata()) // - .build(); + return delegate.getMetadata(); } @Override public Iterator<ValueVector> iterator() { - return Collections.singleton(vector).iterator(); + return delegate.iterator(); + } + + @Override + public void setInitialCapacity(int numRecords) { + delegate.setInitialCapacity(numRecords); } + /** + * @deprecated + * prefer using {@link #addOrGetVector(org.apache.drill.exec.vector.VectorDescriptor)} instead. + */ @Override public <T extends ValueVector> T addOrGet(String name, MajorType type, Class<T> clazz) { - Preconditions.checkArgument(name == null); - - if(vector == null){ - final MaterializedField child = MaterializedField.create(getField().getPath().getUnindexedArrayChild(), type); - vector = TypeHelper.getNewVector(child, allocator, callBack); - setVector(vector); - if (callBack != null) { - callBack.doWork(); - } - } - return typeify(vector, clazz); + final AddOrGetResult<T> result = addOrGetVector(VectorDescriptor.create(type)); + return result.getVector(); } @Override @@ -414,18 +424,18 @@ public class RepeatedListVector extends AbstractContainerVector implements Repea if (name != null) { return null; } - return typeify(vector, clazz); + return typeify(delegate.getDataVector(), clazz); } @Override - public void allocateNew(int parentValueCount, int childValueCount) { + public void allocateNew(int valueCount, int innerValueCount) { clear(); - offsets.allocateNew(parentValueCount + 1); - mutator.reset(); + getOffsetVector().allocateNew(valueCount + 1); + getMutator().reset(); } @Override - public int load(int parentValueCount, int childValueCount, DrillBuf buf) { + public int load(int valueCount, int innerValueCount, DrillBuf buf) { throw new UnsupportedOperationException(); } @@ -434,7 +444,18 @@ public class RepeatedListVector extends AbstractContainerVector implements Repea if (name != null) { return null; } - return new VectorWithOrdinal(vector, 0); + return new VectorWithOrdinal(delegate.getDataVector(), 0); } + + public void copyFromSafe(int fromIndex, int thisIndex, RepeatedListVector from) { + delegate.copyFromSafe(fromIndex, thisIndex, from.delegate); + } + + +// protected void setVector(ValueVector newVector) { +// vector = Preconditions.checkNotNull(newVector); +// getField().addChild(newVector.getField()); +// } + } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/RepeatedMapVector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/RepeatedMapVector.java index e5d48dd6f..a97847ba0 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/RepeatedMapVector.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/RepeatedMapVector.java @@ -41,11 +41,15 @@ import org.apache.drill.exec.record.TransferPair; import org.apache.drill.exec.record.TypedFieldId; import org.apache.drill.exec.util.CallBack; import org.apache.drill.exec.util.JsonStringArrayList; +import org.apache.drill.exec.vector.AddOrGetResult; import org.apache.drill.exec.vector.AllocationHelper; import org.apache.drill.exec.vector.BaseDataValueVector; -import org.apache.drill.exec.vector.RepeatedFixedWidthVector; +import org.apache.drill.exec.vector.BaseRepeatedValueVector; +import org.apache.drill.exec.vector.RepeatedFixedWidthVectorLike; +import org.apache.drill.exec.vector.RepeatedValueVector; import org.apache.drill.exec.vector.UInt4Vector; import org.apache.drill.exec.vector.ValueVector; +import org.apache.drill.exec.vector.VectorDescriptor; import org.apache.drill.exec.vector.complex.impl.NullReader; import org.apache.drill.exec.vector.complex.impl.RepeatedMapReaderImpl; import org.apache.drill.exec.vector.complex.reader.FieldReader; @@ -53,7 +57,7 @@ import org.apache.drill.exec.vector.complex.reader.FieldReader; import com.google.common.base.Preconditions; import com.google.common.collect.Maps; -public class RepeatedMapVector extends AbstractMapVector implements RepeatedFixedWidthVector { +public class RepeatedMapVector extends AbstractMapVector implements RepeatedValueVector, RepeatedFixedWidthVectorLike { static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(RepeatedMapVector.class); @@ -71,10 +75,27 @@ public class RepeatedMapVector extends AbstractMapVector implements RepeatedFixe this.emptyPopulator = new EmptyValuePopulator(offsets); } + @Override + public UInt4Vector getOffsetVector() { + return offsets; + } + + @Override + public ValueVector getDataVector() { + throw new UnsupportedOperationException(); + } + + @Override + public <T extends ValueVector> AddOrGetResult<T> addOrGetVector(VectorDescriptor descriptor) { + throw new UnsupportedOperationException(); + } + + @Override public void setInitialCapacity(int numRecords) { offsets.setInitialCapacity(numRecords + 1); - for(ValueVector v : (ValueVector<?,?,?>)this) { - v.setInitialCapacity(numRecords * DEFAULT_REPEAT_PER_RECORD); + final Iterable<ValueVector> container = this; + for(ValueVector v : container) { + v.setInitialCapacity(numRecords * RepeatedValueVector.DEFAULT_REPEAT_PER_RECORD); } } @@ -84,20 +105,16 @@ public class RepeatedMapVector extends AbstractMapVector implements RepeatedFixe } @Override - public void allocateNew(int groupCount, int valueCount) { + public void allocateNew(int groupCount, int innerValueCount) { clear(); offsets.allocateNew(groupCount+1); offsets.zeroVector(); for (ValueVector v : getChildren()) { - AllocationHelper.allocatePrecomputedChildCount(v, groupCount, 50, valueCount); + AllocationHelper.allocatePrecomputedChildCount(v, groupCount, 50, innerValueCount); } mutator.reset(); } - public void reAlloc() { - offsets.reAlloc(); - } - public Iterator<String> fieldNameIterator() { return getChildFieldNames().iterator(); } @@ -111,7 +128,7 @@ public class RepeatedMapVector extends AbstractMapVector implements RepeatedFixe @Override public int getBufferSize() { - if (accessor.getGroupCount() == 0) { + if (getAccessor().getValueCount() == 0) { return 0; } long buffer = offsets.getBufferSize(); @@ -425,14 +442,15 @@ public class RepeatedMapVector extends AbstractMapVector implements RepeatedFixe Preconditions.checkArgument(bufOffset == buf.capacity()); } + @Override public SerializedField getMetadata() { SerializedField.Builder b = getField() // .getAsBuilder() // .setBufferLength(getBufferSize()) // - .setGroupCount(accessor.getGroupCount()) + .setGroupCount(accessor.getValueCount()) // while we don't need to actually read this on load, we need it to make sure we don't skip deserialization of this vector - .setValueCount(accessor.getGroupCount()); + .setValueCount(accessor.getInnerValueCount()); for (ValueVector v : getChildren()) { b.addChild(v.getMetadata()); } @@ -467,16 +485,31 @@ public class RepeatedMapVector extends AbstractMapVector implements RepeatedFixe @Override public int getValueCount() { - return offsets.getAccessor().get(offsets.getAccessor().getValueCount() - 1); + return Math.max(offsets.getAccessor().getValueCount() - 1, 0); } - public int getGroupSizeAtIndex(int index) { + @Override + public int getInnerValueCount() { + final int valueCount = getValueCount(); + if (valueCount == 0) { + return 0; + } + return offsets.getAccessor().get(valueCount); + } + + @Override + public int getInnerValueCountAt(int index) { return offsets.getAccessor().get(index+1) - offsets.getAccessor().get(index); } @Override - public ValueVector getAllChildValues() { - throw new UnsupportedOperationException("Cannot retrieve inner vector from repeated map."); + public boolean isEmpty(int index) { + return false; + } + + @Override + public boolean isNull(int index) { + return false; } public void get(int index, RepeatedMapHolder holder) { @@ -504,32 +537,18 @@ public class RepeatedMapVector extends AbstractMapVector implements RepeatedFixe } } - @Override - public boolean isNull(int index) { - return false; - } - - @Override - public int getGroupCount() { - final int valueCount = offsets.getAccessor().getValueCount(); - return valueCount == 0 ? 0 : valueCount - 1; - } } - public class Mutator implements ValueVector.Mutator, RepeatedMutator { + public class Mutator implements RepeatedMutator { - public void startNewGroup(int index) { + @Override + public void startNewValue(int index) { emptyPopulator.populate(index+1); offsets.getMutator().setSafe(index+1, offsets.getAccessor().get(index)); } - public int add(int index) { - final int prevEnd = offsets.getAccessor().get(index+1); - offsets.getMutator().setSafe(index + 1, prevEnd + 1); - return prevEnd; - } - + @Override public void setValueCount(int topLevelValueCount) { emptyPopulator.populate(topLevelValueCount); offsets.getMutator().setValueCount(topLevelValueCount == 0 ? 0 : topLevelValueCount+1); @@ -543,22 +562,12 @@ public class RepeatedMapVector extends AbstractMapVector implements RepeatedFixe public void reset() { } @Override - public void generateTestData(int values) { - } + public void generateTestData(int values) { } - @Override - public void setValueCounts(int parentValueCount, int childValueCount) { - // TODO - determine if this should be implemented for this class - throw new UnsupportedOperationException(); - } - - @Override - public void setRepetitionAtIndexSafe(int index, int repetitionCount) { - } - - @Override - public BaseDataValueVector getDataVector() { - return null; //To change body of implemented methods use File | Settings | File Templates. + public int add(int index) { + final int prevEnd = offsets.getAccessor().get(index+1); + offsets.getMutator().setSafe(index + 1, prevEnd + 1); + return prevEnd; } } @@ -573,7 +582,7 @@ public class RepeatedMapVector extends AbstractMapVector implements RepeatedFixe } @Override - public int load(int parentValueCount, int childValueCount, DrillBuf buf) { + public int load(int valueCount, int innerValueCount, DrillBuf buf) { throw new UnsupportedOperationException(); } } |