about summary refs log tree commit diff
path: root/exec/java-exec/src/main
diff options
context:
space:
mode:
author Hanifi Gunes <hgunes@maprtech.com> 2015-04-29 14:54:24 -0700
committer Hanifi Gunes <hgunes@maprtech.com> 2015-05-11 01:20:26 -0700
commit 4689468ef11a70c782f64af451807e1e10cdce65 (patch)
tree 7d38c83034657f0bc26a6e8ed60cb8fd168d6465 /exec/java-exec/src/main
parent a3ec52a721860a966dfa351f719458a200b27cbf (diff)
DRILL-2150: Create an abstraction for repeated value vectors.
Diffstat (limited to 'exec/java-exec/src/main')
-rw-r--r-- exec/java-exec/src/main/codegen/templates/ComplexReaders.java 2
-rw-r--r-- exec/java-exec/src/main/codegen/templates/ComplexWriters.java 16
-rw-r--r-- exec/java-exec/src/main/codegen/templates/FixedValueVectors.java 1
-rw-r--r-- exec/java-exec/src/main/codegen/templates/ListWriters.java 6
-rw-r--r-- exec/java-exec/src/main/codegen/templates/MapWriters.java 2
-rw-r--r-- exec/java-exec/src/main/codegen/templates/RepeatedValueVectors.java 240
-rw-r--r-- exec/java-exec/src/main/java/org/apache/drill/exec/exception/SchemaChangeRuntimeException.java 42
-rw-r--r-- exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenRecordBatch.java 25
-rw-r--r-- exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenTemplate.java 25
-rw-r--r-- exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/Flattener.java 6
-rw-r--r-- exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java 12
-rw-r--r-- exec/java-exec/src/main/java/org/apache/drill/exec/store/VectorHolder.java 10
-rw-r--r-- exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/RepeatedVarCharOutput.java 8
-rw-r--r-- exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/FixedWidthRepeatedReader.java 22
-rw-r--r-- exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java 15
-rw-r--r-- exec/java-exec/src/main/java/org/apache/drill/exec/store/text/DrillTextRecordReader.java 2
-rw-r--r-- exec/java-exec/src/main/java/org/apache/drill/exec/vector/AddOrGetResult.java (renamed from exec/java-exec/src/main/java/org/apache/drill/exec/vector/RepeatedVector.java) 19
-rw-r--r-- exec/java-exec/src/main/java/org/apache/drill/exec/vector/AllocationHelper.java 8
-rw-r--r-- exec/java-exec/src/main/java/org/apache/drill/exec/vector/BaseDataValueVector.java 5
-rw-r--r-- exec/java-exec/src/main/java/org/apache/drill/exec/vector/BaseRepeatedValueVector.java 206
-rw-r--r-- exec/java-exec/src/main/java/org/apache/drill/exec/vector/BaseValueVector.java 21
-rw-r--r-- exec/java-exec/src/main/java/org/apache/drill/exec/vector/ContainerVectorLike.java 39
-rw-r--r-- exec/java-exec/src/main/java/org/apache/drill/exec/vector/RepeatedFixedWidthVectorLike.java (renamed from exec/java-exec/src/main/java/org/apache/drill/exec/vector/RepeatedFixedWidthVector.java) 43
-rw-r--r-- exec/java-exec/src/main/java/org/apache/drill/exec/vector/RepeatedValueVector.java 85
-rw-r--r-- exec/java-exec/src/main/java/org/apache/drill/exec/vector/RepeatedVariableWidthVectorLike.java (renamed from exec/java-exec/src/main/java/org/apache/drill/exec/vector/RepeatedVariableWidthVector.java) 2
-rw-r--r-- exec/java-exec/src/main/java/org/apache/drill/exec/vector/SchemaChangeCallBack.java 6
-rw-r--r-- exec/java-exec/src/main/java/org/apache/drill/exec/vector/ValueVector.java 7
-rw-r--r-- exec/java-exec/src/main/java/org/apache/drill/exec/vector/VectorDescriptor.java 57
-rw-r--r-- exec/java-exec/src/main/java/org/apache/drill/exec/vector/ZeroVector.java 170
-rw-r--r-- exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/MapVector.java 3
-rw-r--r-- exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/RepeatedListVector.java 525
-rw-r--r-- exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/RepeatedMapVector.java 111
32 files changed, 1145 insertions, 596 deletions
diff --git a/exec/java-exec/src/main/codegen/templates/ComplexReaders.java b/exec/java-exec/src/main/codegen/templates/ComplexReaders.java
index fa1dac4ec..068efb444 100644
--- a/exec/java-exec/src/main/codegen/templates/ComplexReaders.java
+++ b/exec/java-exec/src/main/codegen/templates/ComplexReaders.java
@@ -89,7 +89,7 @@ public class ${nullMode}${name}ReaderImpl extends AbstractFieldReader {
}
public int size(){
- return vector.getAccessor().getCount(idx());
+ return vector.getAccessor().getInnerValueCountAt(idx());
}
public void read(int arrayIndex, ${minor.class?cap_first}Holder h){
diff --git a/exec/java-exec/src/main/codegen/templates/ComplexWriters.java b/exec/java-exec/src/main/codegen/templates/ComplexWriters.java
index 49c75d132..980f9ac68 100644
--- a/exec/java-exec/src/main/codegen/templates/ComplexWriters.java
+++ b/exec/java-exec/src/main/codegen/templates/ComplexWriters.java
@@ -77,24 +77,24 @@ public class ${eName}WriterImpl extends AbstractFieldWriter {
public void write(${minor.class?cap_first}Holder h){
mutator.addSafe(idx(), h);
- vector.getMutator().setValueCount(idx());
+ vector.getMutator().setValueCount(idx()+1);
}
public void write(Nullable${minor.class?cap_first}Holder h){
mutator.addSafe(idx(), h);
- vector.getMutator().setValueCount(idx());
+ vector.getMutator().setValueCount(idx()+1);
}
<#if !(minor.class == "Decimal9" || minor.class == "Decimal18" || minor.class == "Decimal28Sparse" || minor.class == "Decimal38Sparse" || minor.class == "Decimal28Dense" || minor.class == "Decimal38Dense")>
public void write${minor.class}(<#list fields as field>${field.type} ${field.name}<#if field_has_next>, </#if></#list>){
mutator.addSafe(idx(), <#list fields as field>${field.name}<#if field_has_next>, </#if></#list>);
- vector.getMutator().setValueCount(idx());
+ vector.getMutator().setValueCount(idx()+1);
}
</#if>
public void setPosition(int idx){
super.setPosition(idx);
- mutator.startNewGroup(idx);
+ mutator.startNewValue(idx);
}
@@ -102,24 +102,24 @@ public class ${eName}WriterImpl extends AbstractFieldWriter {
public void write(${minor.class}Holder h){
mutator.setSafe(idx(), h);
- vector.getMutator().setValueCount(idx());
+ vector.getMutator().setValueCount(idx()+1);
}
public void write(Nullable${minor.class}Holder h){
mutator.setSafe(idx(), h);
- vector.getMutator().setValueCount(idx());
+ vector.getMutator().setValueCount(idx()+1);
}
<#if !(minor.class == "Decimal9" || minor.class == "Decimal18" || minor.class == "Decimal28Sparse" || minor.class == "Decimal38Sparse" || minor.class == "Decimal28Dense" || minor.class == "Decimal38Dense")>
public void write${minor.class}(<#list fields as field>${field.type} ${field.name}<#if field_has_next>, </#if></#list>){
mutator.setSafe(idx(), <#if mode == "Nullable">1, </#if><#list fields as field>${field.name}<#if field_has_next>, </#if></#list>);
- vector.getMutator().setValueCount(idx());
+ vector.getMutator().setValueCount(idx()+1);
}
<#if mode == "Nullable">
public void writeNull(){
mutator.setNull(idx());
- vector.getMutator().setValueCount(idx());
+ vector.getMutator().setValueCount(idx()+1);
}
</#if>
</#if>
diff --git a/exec/java-exec/src/main/codegen/templates/FixedValueVectors.java b/exec/java-exec/src/main/codegen/templates/FixedValueVectors.java
index a805b8e23..7d8581035 100644
--- a/exec/java-exec/src/main/codegen/templates/FixedValueVectors.java
+++ b/exec/java-exec/src/main/codegen/templates/FixedValueVectors.java
@@ -764,6 +764,7 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements F
allocationMonitor = 0;
}
VectorTrimmer.trim(data, idx);
+ data.writerIndex(valueCount * ${type.width});
}
diff --git a/exec/java-exec/src/main/codegen/templates/ListWriters.java b/exec/java-exec/src/main/codegen/templates/ListWriters.java
index 6df4248d1..ab78603ef 100644
--- a/exec/java-exec/src/main/codegen/templates/ListWriters.java
+++ b/exec/java-exec/src/main/codegen/templates/ListWriters.java
@@ -46,7 +46,7 @@ public class ${mode}ListWriter extends AbstractFieldWriter{
protected final ${containerClass} container;
private Mode mode = Mode.INIT;
private FieldWriter writer;
- protected RepeatedVector innerVector;
+ protected RepeatedValueVector innerVector;
<#if mode == "Repeated">private int currentChildIndex = 0;</#if>
public ${mode}ListWriter(String name, ${containerClass} container, FieldWriter parent){
@@ -158,7 +158,7 @@ public class ${mode}ListWriter extends AbstractFieldWriter{
public void start(){
final RepeatedListVector list = (RepeatedListVector) container;
- final RepeatedListVector.Mutator mutator = list.getMutator();
+ final RepeatedListVector.RepeatedMutator mutator = list.getMutator();
// make sure that the current vector can support the end position of this list.
if(container.getValueCapacity() <= idx()){
@@ -169,7 +169,7 @@ public class ${mode}ListWriter extends AbstractFieldWriter{
RepeatedListHolder h = new RepeatedListHolder();
list.getAccessor().get(idx(), h);
if(h.start >= h.end){
- mutator.startNewGroup(idx());
+ mutator.startNewValue(idx());
}
currentChildIndex = container.getMutator().add(idx());
if(writer != null){
diff --git a/exec/java-exec/src/main/codegen/templates/MapWriters.java b/exec/java-exec/src/main/codegen/templates/MapWriters.java
index 38df84bf2..06a6813d4 100644
--- a/exec/java-exec/src/main/codegen/templates/MapWriters.java
+++ b/exec/java-exec/src/main/codegen/templates/MapWriters.java
@@ -116,7 +116,7 @@ public class ${mode}MapWriter extends AbstractFieldWriter{
map.getAccessor().get(idx(), h);
if(h.start >= h.end){
- container.getMutator().startNewGroup(idx());
+ container.getMutator().startNewValue(idx());
}
currentChildIndex = container.getMutator().add(idx());
for(FieldWriter w: fields.values()){
diff --git a/exec/java-exec/src/main/codegen/templates/RepeatedValueVectors.java b/exec/java-exec/src/main/codegen/templates/RepeatedValueVectors.java
index c0fba660c..37b8fac12 100644
--- a/exec/java-exec/src/main/codegen/templates/RepeatedValueVectors.java
+++ b/exec/java-exec/src/main/codegen/templates/RepeatedValueVectors.java
@@ -19,9 +19,9 @@
import java.lang.Override;
import org.apache.drill.exec.record.TransferPair;
+import org.apache.drill.exec.vector.BaseRepeatedValueVector;
import org.apache.drill.exec.vector.BaseValueVector;
-import org.apache.drill.exec.vector.RepeatedFixedWidthVector;
-import org.apache.drill.exec.vector.UInt4Vector;
+import org.apache.drill.exec.vector.RepeatedFixedWidthVectorLike;
import org.mortbay.jetty.servlet.Holder;
<@pp.dropOutputFile />
@@ -48,14 +48,11 @@ package org.apache.drill.exec.vector;
* NB: this class is automatically generated from ValueVectorTypes.tdd using FreeMarker.
*/
-public final class Repeated${minor.class}Vector extends BaseValueVector implements Repeated<#if type.major == "VarLen">VariableWidth<#else>FixedWidth</#if>Vector {
+public final class Repeated${minor.class}Vector extends BaseRepeatedValueVector implements Repeated<#if type.major == "VarLen">VariableWidth<#else>FixedWidth</#if>VectorLike {
private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(Repeated${minor.class}Vector.class);
- private int parentValueCount;
- private int childValueCount;
-
- private final UInt4Vector offsets; // offsets to start of each record
- private final ${minor.class}Vector values;
+ // we maintain local reference to concrete vector type for performance reasons.
+ private ${minor.class}Vector values;
private final FieldReader reader = new Repeated${minor.class}ReaderImpl(Repeated${minor.class}Vector.this);
private final Mutator mutator = new Mutator();
private final Accessor accessor = new Accessor();
@@ -63,56 +60,57 @@ public final class Repeated${minor.class}Vector extends BaseValueVector implemen
public Repeated${minor.class}Vector(MaterializedField field, BufferAllocator allocator) {
super(field, allocator);
- this.offsets = new UInt4Vector(null, allocator);
- MaterializedField mf = MaterializedField.create(field.getPath(), Types.required(field.getType().getMinorType()));
- this.values = new ${minor.class}Vector(mf, allocator);
+ addOrGetVector(VectorDescriptor.create(Types.required(field.getType().getMinorType())));
}
@Override
- public FieldReader getReader(){
- return reader;
+ public Mutator getMutator() {
+ return mutator;
}
- public int getValueCapacity(){
- return Math.min(values.getValueCapacity(), offsets.getValueCapacity() - 1);
+ @Override
+ public Accessor getAccessor() {
+ return accessor;
}
- public int getBufferSize(){
- if(accessor.getGroupCount() == 0){
- return 0;
- }
- return offsets.getBufferSize() + values.getBufferSize();
+ @Override
+ public FieldReader getReader(){
+ return reader;
}
- public UInt4Vector getOffsetVector(){
- return offsets;
- }
-
- public ${minor.class}Vector getValuesVector(){
+ @Override
+ public ${minor.class}Vector getDataVector(){
return values;
}
-
- public DrillBuf getBuffer(){
- return values.getBuffer();
- }
-
+
+ @Override
public TransferPair getTransferPair(){
return new TransferImpl(getField());
}
+
+ @Override
public TransferPair getTransferPair(FieldReference ref){
return new TransferImpl(getField().clone(ref));
}
+ @Override
public TransferPair makeTransferPair(ValueVector to) {
return new TransferImpl((Repeated${minor.class}Vector) to);
}
-
+
+ @Override
+ public AddOrGetResult<${minor.class}Vector> addOrGetVector(VectorDescriptor descriptor) {
+ final AddOrGetResult<${minor.class}Vector> result = super.addOrGetVector(descriptor);
+ if (result.isCreated()) {
+ values = result.getVector();
+ }
+ return result;
+ }
+
public void transferTo(Repeated${minor.class}Vector target){
target.clear();
offsets.transferTo(target.offsets);
values.transferTo(target.values);
- target.parentValueCount = parentValueCount;
- target.childValueCount = childValueCount;
clear();
}
@@ -132,8 +130,6 @@ public final class Repeated${minor.class}Vector extends BaseValueVector implemen
normalizedPos = a.get(startIndex+i) - startPos;
m.set(i, normalizedPos);
}
- to.parentValueCount = groups;
- to.childValueCount = valuesToCopy;
m.setValueCount(groups == 0 ? 0 : groups + 1);
}
@@ -167,33 +163,27 @@ public final class Repeated${minor.class}Vector extends BaseValueVector implemen
}
public void copyFrom(int inIndex, int outIndex, Repeated${minor.class}Vector v){
- int count = v.getAccessor().getCount(inIndex);
- getMutator().startNewGroup(outIndex);
+ final int count = v.getAccessor().getInnerValueCountAt(inIndex);
+ getMutator().startNewValue(outIndex);
for (int i = 0; i < count; i++) {
getMutator().add(outIndex, v.getAccessor().get(inIndex, i));
}
}
public void copyFromSafe(int inIndex, int outIndex, Repeated${minor.class}Vector v){
- int count = v.getAccessor().getCount(inIndex);
- getMutator().startNewGroup(outIndex);
+ final int count = v.getAccessor().getInnerValueCountAt(inIndex);
+ getMutator().startNewValue(outIndex);
for (int i = 0; i < count; i++) {
getMutator().addSafe(outIndex, v.getAccessor().get(inIndex, i));
}
}
- @Override
- public void setInitialCapacity(int numRecords) {
- offsets.setInitialCapacity(numRecords + 1);
- values.setInitialCapacity(numRecords * DEFAULT_REPEAT_PER_RECORD);
- }
public boolean allocateNewSafe(){
if(!offsets.allocateNewSafe()) return false;
offsets.zeroVector();
if(!values.allocateNewSafe()) return false;
mutator.reset();
- accessor.reset();
return true;
}
@@ -202,36 +192,28 @@ public final class Repeated${minor.class}Vector extends BaseValueVector implemen
offsets.zeroVector();
values.allocateNew();
mutator.reset();
- accessor.reset();
}
<#if type.major == "VarLen">
@Override
- public SerializedField getMetadata() {
- return getMetadataBuilder() //
- .setGroupCount(this.parentValueCount) //
- .setValueCount(this.childValueCount) //
- .setVarByteLength(values.getVarByteLength()) //
- .setBufferLength(getBufferSize()) //
- .build();
+ protected SerializedField.Builder getMetadataBuilder() {
+ return super.getMetadataBuilder()
+ .setVarByteLength(values.getVarByteLength());
}
- public void allocateNew(int totalBytes, int parentValueCount, int childValueCount) {
- offsets.allocateNew(parentValueCount+1);
+ public void allocateNew(int totalBytes, int valueCount, int innerValueCount) {
+ offsets.allocateNew(valueCount+1);
offsets.zeroVector();
- values.allocateNew(totalBytes, childValueCount);
+ values.allocateNew(totalBytes, innerValueCount);
mutator.reset();
- accessor.reset();
}
@Override
- public int load(int dataBytes, int parentValueCount, int childValueCount, DrillBuf buf){
+ public int load(int dataBytes, int valueCount, int innerValueCount, DrillBuf buf){
clear();
- this.parentValueCount = parentValueCount;
- this.childValueCount = childValueCount;
int loaded = 0;
- loaded += offsets.load(parentValueCount+1, buf.slice(loaded, buf.capacity() - loaded));
- loaded += values.load(dataBytes + 4*(childValueCount + 1), childValueCount, buf.slice(loaded, buf.capacity() - loaded));
+ loaded += offsets.load(valueCount+1, buf.slice(loaded, buf.capacity() - loaded));
+ loaded += values.load(dataBytes + 4*(innerValueCount + 1), innerValueCount, buf.slice(loaded, buf.capacity() - loaded));
return loaded;
}
@@ -247,32 +229,20 @@ public final class Repeated${minor.class}Vector extends BaseValueVector implemen
}
<#else>
-
- @Override
- public SerializedField getMetadata() {
- return getMetadataBuilder()
- .setGroupCount(this.parentValueCount)
- .setValueCount(this.childValueCount)
- .setBufferLength(getBufferSize())
- .build();
- }
-
- public void allocateNew(int parentValueCount, int childValueCount) {
+
+ public void allocateNew(int valueCount, int innerValueCount) {
clear();
- offsets.allocateNew(parentValueCount+1);
+ offsets.allocateNew(valueCount+1);
offsets.zeroVector();
- values.allocateNew(childValueCount);
+ values.allocateNew(innerValueCount);
mutator.reset();
- accessor.reset();
}
- public int load(int parentValueCount, int childValueCount, DrillBuf buf){
+ public int load(int valueCount, int innerValueCount, DrillBuf buf){
clear();
- this.parentValueCount = parentValueCount;
- this.childValueCount = childValueCount;
int loaded = 0;
- loaded += offsets.load(parentValueCount+1, buf.slice(loaded, buf.capacity() - loaded));
- loaded += values.load(childValueCount, buf.slice(loaded, buf.capacity() - loaded));
+ loaded += offsets.load(valueCount+1, buf.slice(loaded, buf.capacity() - loaded));
+ loaded += values.load(innerValueCount, buf.slice(loaded, buf.capacity() - loaded));
return loaded;
}
@@ -284,49 +254,12 @@ public final class Repeated${minor.class}Vector extends BaseValueVector implemen
}
</#if>
- @Override
- public DrillBuf[] getBuffers(boolean clear) {
- DrillBuf[] buffers = ObjectArrays.concat(offsets.getBuffers(false), values.getBuffers(false), DrillBuf.class);
- if (clear) {
- for (DrillBuf buffer:buffers) {
- buffer.retain();
- }
- clear();
- }
- return buffers;
- }
-
- public void clear(){
- offsets.clear();
- values.clear();
- parentValueCount = 0;
- childValueCount = 0;
- }
-
- public Mutator getMutator(){
- return mutator;
- }
-
- public Accessor getAccessor(){
- return accessor;
- }
// This is declared a subclass of the accessor declared inside of FixedWidthVector, this is also used for
// variable length vectors, as they should ahve consistent interface as much as possible, if they need to diverge
// in the future, the interface shold be declared in the respective value vector superclasses for fixed and variable
// and we should refer to each in the generation template
- public final class Accessor extends BaseValueVector.BaseAccessor implements RepeatedFixedWidthVector.RepeatedAccessor{
-
- /**
- * Get the elements at the given index.
- */
- public int getCount(int index) {
- return offsets.getAccessor().get(index+1) - offsets.getAccessor().get(index);
- }
-
- public ValueVector getAllChildValues() {
- return values;
- }
+ public final class Accessor extends BaseRepeatedValueVector.BaseRepeatedAccessor {
public List<${friendlyType}> getObject(int index) {
List<${friendlyType}> vals = new JsonStringArrayList();
@@ -337,10 +270,6 @@ public final class Repeated${minor.class}Vector extends BaseValueVector implemen
}
return vals;
}
-
- public int getGroupSizeAtIndex(int index){
- return offsets.getAccessor().get(index+1) - offsets.getAccessor().get(index);
- }
public ${friendlyType} getSingleObject(int index, int arrayIndex){
int start = offsets.getAccessor().get(index);
@@ -360,12 +289,7 @@ public final class Repeated${minor.class}Vector extends BaseValueVector implemen
</#if> get(int index, int positionIndex) {
return values.getAccessor().get(offsets.getAccessor().get(index) + positionIndex);
}
-
-
- public boolean isNull(int index){
- return false;
- }
-
+
public void get(int index, Repeated${minor.class}Holder holder){
holder.start = offsets.getAccessor().get(index);
holder.end = offsets.getAccessor().get(index+1);
@@ -375,61 +299,24 @@ public final class Repeated${minor.class}Vector extends BaseValueVector implemen
public void get(int index, int positionIndex, ${minor.class}Holder holder) {
int offset = offsets.getAccessor().get(index);
assert offset >= 0;
- assert positionIndex < getCount(index);
+ assert positionIndex < getInnerValueCountAt(index);
values.getAccessor().get(offset + positionIndex, holder);
}
public void get(int index, int positionIndex, Nullable${minor.class}Holder holder) {
int offset = offsets.getAccessor().get(index);
assert offset >= 0;
- if (positionIndex >= getCount(index)) {
+ if (positionIndex >= getInnerValueCountAt(index)) {
holder.isSet = 0;
return;
}
values.getAccessor().get(offset + positionIndex, holder);
}
-
- public MaterializedField getField() {
- return field;
- }
-
- public int getGroupCount(){
- return parentValueCount;
- }
-
- public int getValueCount(){
- return childValueCount;
- }
-
- public void reset(){
-
- }
}
- public final class Mutator extends BaseValueVector.BaseMutator implements RepeatedMutator {
-
-
- private Mutator(){
- }
-
- public void setRepetitionAtIndexSafe(int index, int repetitionCount) {
- offsets.getMutator().setSafe(index+1, offsets.getAccessor().get(index) + repetitionCount);
- }
-
- public BaseDataValueVector getDataVector() {
- return values;
- }
-
- public void setValueCounts(int parentValueCount, int childValueCount){
- Repeated${minor.class}Vector.this.parentValueCount = parentValueCount;
- Repeated${minor.class}Vector.this.childValueCount = childValueCount;
- values.getMutator().setValueCount(childValueCount);
- offsets.getMutator().setValueCount(parentValueCount == 0 ? 0 : parentValueCount + 1);
- }
+ public final class Mutator extends BaseRepeatedValueVector.BaseRepeatedMutator implements RepeatedMutator {
- public void startNewGroup(int index) {
- offsets.getMutator().setSafe(index+1, offsets.getAccessor().get(index));
- }
+ private Mutator() { }
/**
* Add an element to the given record index. This is similar to the set() method in other
@@ -468,7 +355,7 @@ public final class Repeated${minor.class}Vector extends BaseValueVector implemen
public void setSafe(int index, Repeated${minor.class}Holder h){
${minor.class}Holder ih = new ${minor.class}Holder();
- getMutator().startNewGroup(index);
+ getMutator().startNewValue(index);
for(int i = h.start; i < h.end; i++){
h.vector.getAccessor().get(i, ih);
getMutator().addSafe(index, ih);
@@ -511,17 +398,6 @@ public final class Repeated${minor.class}Vector extends BaseValueVector implemen
add(index, innerHolder);
}
}
-
- /**
- * Set the number of value groups in this repeated field.
- * @param groupCount Count of Value Groups.
- */
- public void setValueCount(int groupCount) {
- parentValueCount = groupCount;
- childValueCount = offsets.getAccessor().get(groupCount);
- offsets.getMutator().setValueCount(groupCount == 0 ? 0 : groupCount+1);
- values.getMutator().setValueCount(childValueCount);
- }
public void generateTestData(final int valCount){
int[] sizes = {1,2,0,6};
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/exception/SchemaChangeRuntimeException.java b/exec/java-exec/src/main/java/org/apache/drill/exec/exception/SchemaChangeRuntimeException.java
new file mode 100644
index 000000000..f2a7e6391
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/exception/SchemaChangeRuntimeException.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.exception;
+
+import org.apache.drill.common.exceptions.DrillRuntimeException;
+
+public class SchemaChangeRuntimeException extends DrillRuntimeException {
+ public SchemaChangeRuntimeException() {
+ super();
+ }
+
+ public SchemaChangeRuntimeException(String message, Throwable cause, boolean enableSuppression, boolean writableStackTrace) {
+ super(message, cause, enableSuppression, writableStackTrace);
+ }
+
+ public SchemaChangeRuntimeException(String message, Throwable cause) {
+ super(message, cause);
+ }
+
+ public SchemaChangeRuntimeException(String message) {
+ super(message);
+ }
+
+ public SchemaChangeRuntimeException(Throwable cause) {
+ super(cause);
+ }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenRecordBatch.java
index 7a5b352b0..00a78fd59 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenRecordBatch.java
@@ -21,15 +21,13 @@ import java.io.IOException;
import java.util.List;
import com.carrotsearch.hppc.IntOpenHashSet;
-import com.google.common.base.Preconditions;
-import org.apache.drill.common.exceptions.DrillRuntimeException;
+import org.apache.drill.common.exceptions.UserException;
import org.apache.drill.common.expression.ErrorCollector;
import org.apache.drill.common.expression.ErrorCollectorImpl;
import org.apache.drill.common.expression.FieldReference;
import org.apache.drill.common.expression.LogicalExpression;
import org.apache.drill.common.expression.PathSegment;
import org.apache.drill.common.logical.data.NamedExpression;
-import org.apache.drill.common.types.TypeProtos;
import org.apache.drill.exec.exception.ClassTransformationException;
import org.apache.drill.exec.exception.SchemaChangeException;
import org.apache.drill.exec.expr.ClassGenerator;
@@ -52,9 +50,8 @@ import org.apache.drill.exec.record.TransferPair;
import org.apache.drill.exec.record.TypedFieldId;
import org.apache.drill.exec.record.VectorContainer;
import org.apache.drill.exec.record.VectorWrapper;
-import org.apache.drill.exec.vector.RepeatedVector;
+import org.apache.drill.exec.vector.RepeatedValueVector;
import org.apache.drill.exec.vector.ValueVector;
-import org.apache.drill.exec.vector.complex.MapVector;
import org.apache.drill.exec.vector.complex.RepeatedMapVector;
import org.apache.drill.exec.vector.complex.writer.BaseWriter.ComplexWriter;
@@ -129,13 +126,13 @@ public class FlattenRecordBatch extends AbstractSingleRecordBatch<FlattenPOP> {
private void setFlattenVector() {
try {
- flattener.setFlattenField((RepeatedVector) incoming.getValueAccessorById(
- incoming.getSchema().getColumn(
- incoming.getValueVectorId(
- popConfig.getColumn()).getFieldIds()[0]).getValueClass(),
- incoming.getValueVectorId(popConfig.getColumn()).getFieldIds()).getValueVector());
+ final TypedFieldId typedFieldId = incoming.getValueVectorId(popConfig.getColumn());
+ final MaterializedField field = incoming.getSchema().getColumn(typedFieldId.getFieldIds()[0]);
+ final RepeatedValueVector vector = RepeatedValueVector.class.cast(incoming.getValueAccessorById(
+ field.getValueClass(), typedFieldId.getFieldIds()).getValueVector());
+ flattener.setFlattenField(vector);
} catch (Exception ex) {
- throw new DrillRuntimeException("Trying to flatten a non-repeated filed.");
+ throw UserException.unsupportedError(ex).message("Trying to flatten a non-repeated field.").build();
}
}
@@ -152,7 +149,7 @@ public class FlattenRecordBatch extends AbstractSingleRecordBatch<FlattenPOP> {
// inside of the the flattener for the current batch
setFlattenVector();
- int childCount = incomingRecordCount == 0 ? 0 : flattener.getFlattenField().getAccessor().getValueCount();
+ int childCount = incomingRecordCount == 0 ? 0 : flattener.getFlattenField().getAccessor().getInnerValueCount();
int outputRecords = flattener.flattenRecords(0, incomingRecordCount, 0);
// TODO - change this to be based on the repeated vector length
if (outputRecords < childCount) {
@@ -178,7 +175,7 @@ public class FlattenRecordBatch extends AbstractSingleRecordBatch<FlattenPOP> {
}
private void handleRemainder() {
- int remainingRecordCount = flattener.getFlattenField().getAccessor().getValueCount() - remainderIndex;
+ int remainingRecordCount = flattener.getFlattenField().getAccessor().getInnerValueCount() - remainderIndex;
if (!doAlloc()) {
outOfMemory = true;
return;
@@ -271,7 +268,7 @@ public class FlattenRecordBatch extends AbstractSingleRecordBatch<FlattenPOP> {
if (flattenField instanceof RepeatedMapVector) {
tp = ((RepeatedMapVector)flattenField).getTransferPairToSingleMap(reference);
} else {
- ValueVector vvIn = ((RepeatedVector)flattenField).getAccessor().getAllChildValues();
+ final ValueVector vvIn = RepeatedValueVector.class.cast(flattenField).getDataVector();
// vvIn may be null because of fast schema return for repeated list vectors
if (vvIn != null) {
tp = vvIn.getTransferPair(reference);
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenTemplate.java
index 96209a23b..b8d040c40 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenTemplate.java
@@ -31,8 +31,7 @@ import org.apache.drill.exec.record.selection.SelectionVector4;
import com.google.common.collect.ImmutableList;
-import org.apache.drill.exec.vector.RepeatedFixedWidthVector.RepeatedAccessor;
-import org.apache.drill.exec.vector.RepeatedVector;
+import org.apache.drill.exec.vector.RepeatedValueVector;
public abstract class FlattenTemplate implements Flattener {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FlattenTemplate.class);
@@ -43,9 +42,9 @@ public abstract class FlattenTemplate implements Flattener {
private SelectionVector2 vector2;
private SelectionVector4 vector4;
private SelectionVectorMode svMode;
- RepeatedVector fieldToFlatten;
- RepeatedAccessor accessor;
- private int groupIndex;
+ private RepeatedValueVector fieldToFlatten;
+ private RepeatedValueVector.RepeatedAccessor accessor;
+ private int valueIndex;
// this allows for groups to be written between batches if we run out of space, for cases where we have finished
// a batch on the boundary it will be set to 0
@@ -60,12 +59,12 @@ public abstract class FlattenTemplate implements Flattener {
}
@Override
- public void setFlattenField(RepeatedVector flattenField) {
+ public void setFlattenField(RepeatedValueVector flattenField) {
this.fieldToFlatten = flattenField;
- this.accessor = flattenField.getAccessor();
+ this.accessor = RepeatedValueVector.RepeatedAccessor.class.cast(flattenField.getAccessor());
}
- public RepeatedVector getFlattenField() {
+ public RepeatedValueVector getFlattenField() {
return fieldToFlatten;
}
@@ -84,14 +83,14 @@ public abstract class FlattenTemplate implements Flattener {
childIndexWithinCurrGroup = 0;
}
outer: {
- final int groupCount = accessor.getGroupCount();
- for ( ; groupIndex < groupCount; groupIndex++) {
- currGroupSize = accessor.getGroupSizeAtIndex(groupIndex);
+ final int valueCount = accessor.getValueCount();
+ for ( ; valueIndex < valueCount; valueIndex++) {
+ currGroupSize = accessor.getInnerValueCountAt(valueIndex);
for ( ; childIndexWithinCurrGroup < currGroupSize; childIndexWithinCurrGroup++) {
if (firstOutputIndex == OUTPUT_BATCH_SIZE) {
break outer;
}
- doEval(groupIndex, firstOutputIndex);
+ doEval(valueIndex, firstOutputIndex);
firstOutputIndex++;
childIndex++;
}
@@ -133,7 +132,7 @@ public abstract class FlattenTemplate implements Flattener {
@Override
public void resetGroupIndex() {
- this.groupIndex = 0;
+ this.valueIndex = 0;
this.currGroupSize = 0;
this.childIndex = 0;
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/Flattener.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/Flattener.java
index 2141ca2c4..323bf4349 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/Flattener.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/Flattener.java
@@ -24,14 +24,14 @@ import org.apache.drill.exec.exception.SchemaChangeException;
import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.record.RecordBatch;
import org.apache.drill.exec.record.TransferPair;
-import org.apache.drill.exec.vector.RepeatedVector;
+import org.apache.drill.exec.vector.RepeatedValueVector;
public interface Flattener {
public abstract void setup(FragmentContext context, RecordBatch incoming, RecordBatch outgoing, List<TransferPair> transfers) throws SchemaChangeException;
public abstract int flattenRecords(int startIndex, int recordCount, int firstOutputIndex);
- public void setFlattenField(RepeatedVector repeatedColumn);
- public RepeatedVector getFlattenField();
+ public void setFlattenField(RepeatedValueVector repeatedColumn);
+ public RepeatedValueVector getFlattenField();
public void resetGroupIndex();
public static TemplateClassDefinition<Flattener> TEMPLATE_DEFINITION = new TemplateClassDefinition<Flattener>(Flattener.class, FlattenTemplate.class);
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java
index 32ffb6f0b..946d11763 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java
@@ -63,6 +63,7 @@ import org.apache.drill.exec.record.VectorWrapper;
import org.apache.drill.exec.vector.AllocationHelper;
import org.apache.drill.exec.vector.FixedWidthVector;
import org.apache.drill.exec.vector.ValueVector;
+import org.apache.drill.exec.vector.complex.AbstractContainerVector;
import org.apache.drill.exec.vector.complex.writer.BaseWriter.ComplexWriter;
import com.carrotsearch.hppc.IntOpenHashSet;
@@ -415,7 +416,7 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project> {
// The reference name will be passed to ComplexWriter, used as the name of the output vector from the writer.
((DrillComplexWriterFuncHolder) ((DrillFuncHolderExpr) expr).getHolder()).setReference(namedExpression.getRef());
cg.addExpr(expr);
- } else{
+ } else {
// need to do evaluation.
final ValueVector vector = container.addOrGet(outputField, callBack);
allocationVectors.add(vector);
@@ -424,6 +425,15 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project> {
final ValueVectorWriteExpression write = new ValueVectorWriteExpression(fid, expr, useSetSafe);
final HoldingContainer hc = cg.addExpr(write);
+ // We cannot do multiple transfers from the same vector. However we still need to instantiate the output vector.
+ if (expr instanceof ValueVectorReadExpression) {
+ final ValueVectorReadExpression vectorRead = (ValueVectorReadExpression) expr;
+ if (!vectorRead.hasReadPath()) {
+ final TypedFieldId id = vectorRead.getFieldId();
+ final ValueVector vvIn = incoming.getValueAccessorById(id.getIntermediateClass(), id.getFieldIds()).getValueVector();
+ vvIn.makeTransferPair(vector);
+ }
+ }
logger.debug("Added eval for project expression.");
}
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/VectorHolder.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/VectorHolder.java
index 8387d49a6..e602fd745 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/VectorHolder.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/VectorHolder.java
@@ -18,9 +18,9 @@
package org.apache.drill.exec.store;
import org.apache.drill.exec.vector.AllocationHelper;
-import org.apache.drill.exec.vector.RepeatedFixedWidthVector;
+import org.apache.drill.exec.vector.RepeatedFixedWidthVectorLike;
import org.apache.drill.exec.vector.RepeatedMutator;
-import org.apache.drill.exec.vector.RepeatedVariableWidthVector;
+import org.apache.drill.exec.vector.RepeatedVariableWidthVectorLike;
import org.apache.drill.exec.vector.ValueVector;
public class VectorHolder {
@@ -35,7 +35,7 @@ public class VectorHolder {
public VectorHolder(int length, ValueVector vector) {
this.length = length;
this.vector = vector;
- if (vector instanceof RepeatedFixedWidthVector || vector instanceof RepeatedVariableWidthVector) {
+ if (vector instanceof RepeatedFixedWidthVectorLike || vector instanceof RepeatedVariableWidthVectorLike) {
repeated = true;
}
}
@@ -43,7 +43,7 @@ public class VectorHolder {
public VectorHolder(ValueVector vector) {
this.length = vector.getValueCapacity();
this.vector = vector;
- if (vector instanceof RepeatedFixedWidthVector || vector instanceof RepeatedVariableWidthVector) {
+ if (vector instanceof RepeatedFixedWidthVectorLike || vector instanceof RepeatedVariableWidthVectorLike) {
repeated = true;
}
}
@@ -90,7 +90,7 @@ public class VectorHolder {
public void populateVectorLength() {
ValueVector.Mutator mutator = vector.getMutator();
- if (vector instanceof RepeatedFixedWidthVector || vector instanceof RepeatedVariableWidthVector) {
+ if (vector instanceof RepeatedFixedWidthVectorLike || vector instanceof RepeatedVariableWidthVectorLike) {
mutator.setValueCount(groupCount);
} else {
mutator.setValueCount(count);
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/RepeatedVarCharOutput.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/RepeatedVarCharOutput.java
index 3ad5c2a36..40276f498 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/RepeatedVarCharOutput.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/RepeatedVarCharOutput.java
@@ -192,7 +192,7 @@ class RepeatedVarCharOutput extends TextOutput {
}
private void loadVarCharDataAddress(){
- DrillBuf buf = vector.getValuesVector().getBuffer();
+ DrillBuf buf = vector.getDataVector().getBuffer();
checkBuf(buf);
this.characterData = buf.memoryAddress();
this.characterDataOriginal = buf.memoryAddress();
@@ -200,7 +200,7 @@ class RepeatedVarCharOutput extends TextOutput {
}
private void loadVarCharOffsetAddress(){
- DrillBuf buf = vector.getValuesVector().getOffsetVector().getBuffer();
+ DrillBuf buf = vector.getDataVector().getOffsetVector().getBuffer();
checkBuf(buf);
this.charLengthOffset = buf.memoryAddress() + 4;
this.charLengthOffsetOriginal = buf.memoryAddress() + 4; // add four as offsets conceptually start at 1. (first item is 0..1)
@@ -208,14 +208,14 @@ class RepeatedVarCharOutput extends TextOutput {
}
private void expandVarCharOffsets(){
- vector.getValuesVector().getOffsetVector().reAlloc();
+ vector.getDataVector().getOffsetVector().reAlloc();
long diff = charLengthOffset - charLengthOffsetOriginal;
loadVarCharOffsetAddress();
charLengthOffset += diff;
}
private void expandVarCharData(){
- vector.getValuesVector().reAlloc();
+ vector.getDataVector().reAlloc();
long diff = characterData - characterDataOriginal;
loadVarCharDataAddress();
characterData += diff;
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/FixedWidthRepeatedReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/FixedWidthRepeatedReader.java
index 7f8b61175..2b929a473 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/FixedWidthRepeatedReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/FixedWidthRepeatedReader.java
@@ -20,7 +20,10 @@ package org.apache.drill.exec.store.parquet.columnreaders;
import java.io.IOException;
import org.apache.drill.common.exceptions.ExecutionSetupException;
-import org.apache.drill.exec.vector.RepeatedFixedWidthVector;
+import org.apache.drill.exec.vector.BaseDataValueVector;
+import org.apache.drill.exec.vector.RepeatedFixedWidthVectorLike;
+import org.apache.drill.exec.vector.RepeatedValueVector;
+import org.apache.drill.exec.vector.UInt4Vector;
import org.apache.drill.exec.vector.ValueVector;
import parquet.column.ColumnDescriptor;
@@ -29,7 +32,7 @@ import parquet.hadoop.metadata.ColumnChunkMetaData;
public class FixedWidthRepeatedReader extends VarLengthColumn {
- RepeatedFixedWidthVector castedRepeatedVector;
+ RepeatedValueVector castedRepeatedVector;
ColumnReader dataReader;
int dataTypeLengthInBytes;
// we can do a vector copy of the data once we figure out how much we need to copy
@@ -47,9 +50,9 @@ public class FixedWidthRepeatedReader extends VarLengthColumn {
boolean notFishedReadingList;
byte[] leftOverBytes;
- FixedWidthRepeatedReader(ParquetRecordReader parentReader, ColumnReader dataReader, int dataTypeLengthInBytes, int allocateSize, ColumnDescriptor descriptor, ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, ValueVector valueVector, SchemaElement schemaElement) throws ExecutionSetupException {
+ FixedWidthRepeatedReader(ParquetRecordReader parentReader, ColumnReader dataReader, int dataTypeLengthInBytes, int allocateSize, ColumnDescriptor descriptor, ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, RepeatedValueVector valueVector, SchemaElement schemaElement) throws ExecutionSetupException {
super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, valueVector, schemaElement);
- castedRepeatedVector = (RepeatedFixedWidthVector) valueVector;
+ this.castedRepeatedVector = valueVector;
this.dataTypeLengthInBytes = dataTypeLengthInBytes;
this.dataReader = dataReader;
this.dataReader.pageReader.clear();
@@ -65,7 +68,7 @@ public class FixedWidthRepeatedReader extends VarLengthColumn {
bytesReadInCurrentPass = 0;
valuesReadInCurrentPass = 0;
pageReader.valuesReadyToRead = 0;
- dataReader.vectorData = castedRepeatedVector.getMutator().getDataVector().getBuffer();
+ dataReader.vectorData = BaseDataValueVector.class.cast(castedRepeatedVector.getDataVector()).getBuffer();
dataReader.valuesReadInCurrentPass = 0;
repeatedGroupsReadInCurrentPass = 0;
}
@@ -200,8 +203,8 @@ public class FixedWidthRepeatedReader extends VarLengthColumn {
currentValueListLength += numLeftoverVals;
}
// this should not fail
- castedRepeatedVector.getMutator().setRepetitionAtIndexSafe(repeatedGroupsReadInCurrentPass,
- currentValueListLength);
+ final UInt4Vector offsets = castedRepeatedVector.getOffsetVector();
+ offsets.getMutator().setSafe(repeatedGroupsReadInCurrentPass + 1, offsets.getAccessor().get(repeatedGroupsReadInCurrentPass));
// This field is being referenced in the superclass determineSize method, so we need to set it here
// again going to make this the length in BYTES to avoid repetitive multiplication/division
dataTypeLengthInBits = repeatedValuesInCurrentList * dataTypeLengthInBytes;
@@ -218,12 +221,13 @@ public class FixedWidthRepeatedReader extends VarLengthColumn {
dataReader.valuesReadInCurrentPass = 0;
dataReader.readValues(valuesToRead);
valuesReadInCurrentPass += valuesToRead;
- castedRepeatedVector.getMutator().setValueCounts(repeatedGroupsReadInCurrentPass, valuesReadInCurrentPass);
+ castedRepeatedVector.getMutator().setValueCount(repeatedGroupsReadInCurrentPass);
+ castedRepeatedVector.getDataVector().getMutator().setValueCount(valuesReadInCurrentPass);
}
@Override
public int capacity() {
- return castedRepeatedVector.getMutator().getDataVector().getBuffer().capacity();
+ return BaseDataValueVector.class.cast(castedRepeatedVector.getDataVector()).getBuffer().capacity();
}
@Override
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java
index 2f07fb352..0cbd48013 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java
@@ -41,7 +41,7 @@ import org.apache.drill.exec.store.AbstractRecordReader;
import org.apache.drill.exec.store.parquet.DirectCodecFactory;
import org.apache.drill.exec.vector.AllocationHelper;
import org.apache.drill.exec.vector.NullableIntVector;
-import org.apache.drill.exec.vector.RepeatedFixedWidthVector;
+import org.apache.drill.exec.vector.RepeatedValueVector;
import org.apache.drill.exec.vector.ValueVector;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -274,7 +274,7 @@ public class ParquetRecordReader extends AbstractRecordReader {
}
try {
- ValueVector v;
+ ValueVector vector;
SchemaElement schemaElement;
ArrayList<VarLengthColumn> varLengthColumns = new ArrayList<>();
// initialize all of the column read status objects
@@ -292,23 +292,24 @@ public class ParquetRecordReader extends AbstractRecordReader {
}
fieldFixedLength = column.getType() != PrimitiveType.PrimitiveTypeName.BINARY;
- v = output.addField(field, (Class<? extends ValueVector>) TypeHelper.getValueVectorClass(type.getMinorType(), type.getMode()));
+ vector = output.addField(field, (Class<? extends ValueVector>) TypeHelper.getValueVectorClass(type.getMinorType(), type.getMode()));
if (column.getType() != PrimitiveType.PrimitiveTypeName.BINARY) {
if (column.getMaxRepetitionLevel() > 0) {
+ final RepeatedValueVector repeatedVector = RepeatedValueVector.class.cast(vector);
ColumnReader dataReader = ColumnReaderFactory.createFixedColumnReader(this, fieldFixedLength,
column, columnChunkMetaData, recordsPerBatch,
- ((RepeatedFixedWidthVector) v).getMutator().getDataVector(), schemaElement);
+ repeatedVector.getDataVector(), schemaElement);
varLengthColumns.add(new FixedWidthRepeatedReader(this, dataReader,
- getTypeLengthInBits(column.getType()), -1, column, columnChunkMetaData, false, v, schemaElement));
+ getTypeLengthInBits(column.getType()), -1, column, columnChunkMetaData, false, repeatedVector, schemaElement));
}
else {
columnStatuses.add(ColumnReaderFactory.createFixedColumnReader(this, fieldFixedLength,
- column, columnChunkMetaData, recordsPerBatch, v,
+ column, columnChunkMetaData, recordsPerBatch, vector,
schemaElement));
}
} else {
// create a reader and add it to the appropriate list
- varLengthColumns.add(ColumnReaderFactory.getReader(this, -1, column, columnChunkMetaData, false, v, schemaElement));
+ varLengthColumns.add(ColumnReaderFactory.getReader(this, -1, column, columnChunkMetaData, false, vector, schemaElement));
}
}
varLengthReader = new VarLenBinaryReader(this, varLengthColumns);
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/text/DrillTextRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/text/DrillTextRecordReader.java
index e25bd7408..c59ade9ba 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/text/DrillTextRecordReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/text/DrillTextRecordReader.java
@@ -162,7 +162,7 @@ public class DrillTextRecordReader extends AbstractRecordReader {
// index of the scanned field
int p = 0;
int i = 0;
- vector.getMutator().startNewGroup(recordCount);
+ vector.getMutator().startNewValue(recordCount);
// Process each field in this line
while (end < value.getLength() - 1) {
if(numCols > 0 && p >= numCols) {
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/RepeatedVector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/AddOrGetResult.java
index df4279ac4..7d1f08dab 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/RepeatedVector.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/AddOrGetResult.java
@@ -17,9 +17,22 @@
*/
package org.apache.drill.exec.vector;
-public interface RepeatedVector extends ValueVector {
- public static final int DEFAULT_REPEAT_PER_RECORD = 4;
+import com.google.common.base.Preconditions;
- public RepeatedFixedWidthVector.RepeatedAccessor getAccessor();
+public class AddOrGetResult<V extends ValueVector> {
+ private final V vector;
+ private final boolean created;
+ public AddOrGetResult(V vector, boolean created) {
+ this.vector = Preconditions.checkNotNull(vector);
+ this.created = created;
+ }
+
+ public V getVector() {
+ return vector;
+ }
+
+ public boolean isCreated() {
+ return created;
+ }
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/AllocationHelper.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/AllocationHelper.java
index bf465c799..eddefd0ac 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/AllocationHelper.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/AllocationHelper.java
@@ -31,10 +31,10 @@ public class AllocationHelper {
((FixedWidthVector) v).allocateNew(valueCount);
} else if (v instanceof VariableWidthVector) {
((VariableWidthVector) v).allocateNew(valueCount * bytesPerValue, valueCount);
- }else if(v instanceof RepeatedFixedWidthVector){
- ((RepeatedFixedWidthVector) v).allocateNew(valueCount, childValCount);
- }else if(v instanceof RepeatedVariableWidthVector){
- ((RepeatedVariableWidthVector) v).allocateNew(childValCount * bytesPerValue, valueCount, childValCount);
+ }else if(v instanceof RepeatedFixedWidthVectorLike){
+ ((RepeatedFixedWidthVectorLike) v).allocateNew(valueCount, childValCount);
+ }else if(v instanceof RepeatedVariableWidthVectorLike){
+ ((RepeatedVariableWidthVectorLike) v).allocateNew(childValCount * bytesPerValue, valueCount, childValCount);
}else{
v.allocateNew();
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/BaseDataValueVector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/BaseDataValueVector.java
index 0c6097c48..6d356f2c2 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/BaseDataValueVector.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/BaseDataValueVector.java
@@ -22,8 +22,8 @@ import org.apache.drill.exec.memory.BufferAllocator;
import org.apache.drill.exec.record.MaterializedField;
-public abstract class BaseDataValueVector<V extends BaseValueVector<V, A, M>, A extends BaseValueVector.BaseAccessor,
- M extends BaseValueVector.BaseMutator> extends BaseValueVector<V, A, M> {
+public abstract class BaseDataValueVector<A extends BaseValueVector.BaseAccessor, M extends BaseValueVector.BaseMutator>
+ extends BaseValueVector<A, M> {
protected DrillBuf data;
@@ -36,6 +36,7 @@ public abstract class BaseDataValueVector<V extends BaseValueVector<V, A, M>, A
public void clear() {
data.release();
data = allocator.getEmpty();
+ super.clear();
}
@Override
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/BaseRepeatedValueVector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/BaseRepeatedValueVector.java
new file mode 100644
index 000000000..bcf079375
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/BaseRepeatedValueVector.java
@@ -0,0 +1,206 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.vector;
+
+import java.util.Collections;
+import java.util.Iterator;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ObjectArrays;
+import io.netty.buffer.DrillBuf;
+import org.apache.drill.common.exceptions.DrillRuntimeException;
+import org.apache.drill.common.types.TypeProtos;
+import org.apache.drill.common.types.Types;
+import org.apache.drill.exec.exception.SchemaChangeRuntimeException;
+import org.apache.drill.exec.expr.TypeHelper;
+import org.apache.drill.exec.memory.BufferAllocator;
+import org.apache.drill.exec.proto.UserBitShared;
+import org.apache.drill.exec.record.MaterializedField;
+
+public abstract class BaseRepeatedValueVector<A extends RepeatedValueVector.RepeatedAccessor, M extends RepeatedValueVector.RepeatedMutator>
+ extends BaseValueVector<A, M> implements RepeatedValueVector<A, M> {
+
+ public final static ValueVector DEFAULT_DATA_VECTOR = ZeroVector.INSTANCE;
+ public final static String OFFSETS_VECTOR_NAME = "offsets";
+ public final static String DATA_VECTOR_NAME = "data";
+
+ private final static MaterializedField offsetsField =
+ MaterializedField.create(OFFSETS_VECTOR_NAME, Types.required(TypeProtos.MinorType.UINT4));
+
+ protected final UInt4Vector offsets;
+ protected ValueVector vector;
+
+ protected BaseRepeatedValueVector(MaterializedField field, BufferAllocator allocator) {
+ this(field, allocator, DEFAULT_DATA_VECTOR);
+ }
+
+ protected BaseRepeatedValueVector(MaterializedField field, BufferAllocator allocator, ValueVector vector) {
+ super(field, allocator);
+ this.offsets = new UInt4Vector(offsetsField, allocator);
+ this.vector = Preconditions.checkNotNull(vector, "data vector cannot be null");
+ }
+
+ @Override
+ public boolean allocateNewSafe() {
+ if (!offsets.allocateNewSafe()) {
+ return false;
+ }
+ offsets.zeroVector();
+ return vector.allocateNewSafe();
+ }
+
+ @Override
+ public UInt4Vector getOffsetVector() {
+ return offsets;
+ }
+
+ @Override
+ public ValueVector getDataVector() {
+ return vector;
+ }
+
+ @Override
+ public void setInitialCapacity(int numRecords) {
+ offsets.setInitialCapacity(numRecords + 1);
+ vector.setInitialCapacity(numRecords * RepeatedValueVector.DEFAULT_REPEAT_PER_RECORD);
+ }
+
+ @Override
+ public int getValueCapacity() {
+ final int offsetValueCapacity = offsets.getValueCapacity() - 1;
+ if (vector == DEFAULT_DATA_VECTOR) {
+ return offsetValueCapacity;
+ }
+ return Math.min(vector.getValueCapacity(), offsetValueCapacity);
+ }
+
+ @Override
+ protected UserBitShared.SerializedField.Builder getMetadataBuilder() {
+ return super.getMetadataBuilder()
+ .setGroupCount(getAccessor().getValueCount())
+ .setValueCount(getAccessor().getInnerValueCount())
+ .addChild(vector.getMetadata());
+ }
+
+ @Override
+ public int getBufferSize() {
+ if (getAccessor().getValueCount() == 0) {
+ return 0;
+ }
+ return offsets.getBufferSize() + vector.getBufferSize();
+ }
+
+ @Override
+ public Iterator<ValueVector> iterator() {
+ return Collections.singleton(getDataVector()).iterator();
+ }
+
+ @Override
+ public void clear() {
+ offsets.clear();
+ vector.clear();
+ super.clear();
+ }
+
+ @Override
+ public DrillBuf[] getBuffers(boolean clear) {
+ final DrillBuf[] buffers = ObjectArrays.concat(offsets.getBuffers(false), vector.getBuffers(false), DrillBuf.class);
+ if (clear) {
+ for (DrillBuf buffer:buffers) {
+ buffer.retain();
+ }
+ clear();
+ }
+ return buffers;
+ }
+
+ /**
+ * Returns 1 if the inner vector has been explicitly set via {@link #addOrGetVector}, else 0.
+ *
+ * @see ContainerVectorLike#size
+ */
+ @Override
+ public int size() {
+ return vector == DEFAULT_DATA_VECTOR ? 0:1;
+ }
+
+ @Override
+ public <T extends ValueVector> AddOrGetResult<T> addOrGetVector(VectorDescriptor descriptor) {
+ boolean created = false;
+ if (vector == DEFAULT_DATA_VECTOR) {
+ vector = TypeHelper.getNewVector(MaterializedField.create(DATA_VECTOR_NAME, descriptor.getType()), allocator);
+ getField().addChild(vector.getField());
+ created = true;
+ }
+
+ final TypeProtos.MajorType actual = vector.getField().getType();
+ if (!actual.equals(descriptor.getType())) {
+ final String msg = String.format("Inner vector type mismatch. Requested type: [%s], actual type: [%s]",
+ descriptor.getType(), actual);
+ throw new SchemaChangeRuntimeException(msg);
+ }
+
+ return new AddOrGetResult<>((T)vector, created);
+ }
+
+ public abstract class BaseRepeatedAccessor extends BaseValueVector.BaseAccessor implements RepeatedAccessor {
+
+ @Override
+ public int getValueCount() {
+ return Math.max(offsets.getAccessor().getValueCount() - 1, 0);
+ }
+
+ @Override
+ public int getInnerValueCount() {
+ return vector.getAccessor().getValueCount();
+ }
+
+ @Override
+ public int getInnerValueCountAt(int index) {
+ return offsets.getAccessor().get(index+1) - offsets.getAccessor().get(index);
+ }
+
+ @Override
+ public boolean isNull(int index) {
+ return false;
+ }
+
+ @Override
+ public boolean isEmpty(int index) {
+ return false;
+ }
+ }
+
+ public abstract class BaseRepeatedMutator extends BaseValueVector.BaseMutator implements RepeatedMutator {
+
+ @Override
+ public void startNewValue(int index) {
+ offsets.getMutator().setSafe(index+1, offsets.getAccessor().get(index));
+ setValueCount(index+1);
+ }
+
+ @Override
+ public void setValueCount(int valueCount) {
+ // TODO: populate offset end points
+ offsets.getMutator().setValueCount(valueCount == 0 ? 0 : valueCount+1);
+ final int childValueCount = valueCount == 0 ? 0 : offsets.getAccessor().get(valueCount);
+ vector.getMutator().setValueCount(childValueCount);
+ }
+ }
+
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/BaseValueVector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/BaseValueVector.java
index 22f0fe740..67c489daf 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/BaseValueVector.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/BaseValueVector.java
@@ -25,9 +25,10 @@ import org.apache.drill.common.expression.FieldReference;
import org.apache.drill.exec.memory.BufferAllocator;
import org.apache.drill.exec.proto.UserBitShared.SerializedField;
import org.apache.drill.exec.record.MaterializedField;
+import org.apache.drill.exec.record.TransferPair;
-public abstract class BaseValueVector<V extends BaseValueVector<V, A, M>, A extends BaseValueVector.BaseAccessor,
- M extends BaseValueVector.BaseMutator> implements ValueVector<V, A, M> {
+public abstract class BaseValueVector<A extends ValueVector.Accessor, M extends ValueVector.Mutator>
+ implements ValueVector<A, M> {
private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(BaseValueVector.class);
protected final BufferAllocator allocator;
@@ -40,6 +41,11 @@ public abstract class BaseValueVector<V extends BaseValueVector<V, A, M>, A exte
}
@Override
+ public void clear() {
+ getMutator().reset();
+ }
+
+ @Override
public void close() {
clear();
}
@@ -54,6 +60,11 @@ public abstract class BaseValueVector<V extends BaseValueVector<V, A, M>, A exte
}
@Override
+ public TransferPair getTransferPair() {
+ return getTransferPair(new FieldReference(getField().getPath()));
+ }
+
+ @Override
public SerializedField getMetadata() {
return getMetadataBuilder().build();
}
@@ -76,11 +87,15 @@ public abstract class BaseValueVector<V extends BaseValueVector<V, A, M>, A exte
public abstract static class BaseMutator implements ValueVector.Mutator {
protected BaseMutator() { }
+ @Override
+ public void generateTestData(int values) { }
+
+ // TODO: consider making the mutator stateless (if possible) in a separate issue.
public void reset() { }
}
@Override
- public Iterator<ValueVector<V,A,M>> iterator() {
+ public Iterator<ValueVector> iterator() {
return Iterators.emptyIterator();
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/ContainerVectorLike.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/ContainerVectorLike.java
new file mode 100644
index 000000000..95e3365d8
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/ContainerVectorLike.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.vector;
+
+/**
+ * A mix-in used for introducing container vector-like behaviour.
+ */
+public interface ContainerVectorLike {
+
+ /**
+ * Creates and adds a child vector if none with the same name exists, else returns the vector instance.
+ *
+ * @param descriptor vector descriptor
+ * @return result of operation wrapping vector corresponding to the given descriptor and whether it's newly created
+ * @throws org.apache.drill.common.exceptions.DrillRuntimeException
+ * if schema change is not permissible between the given and existing data vector types.
+ */
+ <T extends ValueVector> AddOrGetResult<T> addOrGetVector(VectorDescriptor descriptor);
+
+ /**
+ * Returns the number of child vectors in this container vector-like instance.
+ */
+ int size();
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/RepeatedFixedWidthVector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/RepeatedFixedWidthVectorLike.java
index eaae7ad60..450c673b7 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/RepeatedFixedWidthVector.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/RepeatedFixedWidthVectorLike.java
@@ -19,35 +19,38 @@ package org.apache.drill.exec.vector;
import io.netty.buffer.DrillBuf;
-public interface RepeatedFixedWidthVector extends ValueVector, RepeatedVector {
+/**
+ * A {@link org.apache.drill.exec.vector.ValueVector} mix-in that can be used in conjunction with
+ * {@link org.apache.drill.exec.vector.RepeatedValueVector} subtypes.
+ */
+public interface RepeatedFixedWidthVectorLike {
/**
* Allocate a new memory space for this vector. Must be called prior to using the ValueVector.
*
- * @param parentValueCount Number of separate repeating groupings.
- * @param childValueCount Number of supported values in the vector.
+ * @param valueCount Number of separate repeating groupings.
+ * @param innerValueCount Number of supported values in the vector.
*/
- public void allocateNew(int parentValueCount, int childValueCount);
+ public void allocateNew(int valueCount, int innerValueCount);
/**
* Load the records in the provided buffer based on the given number of values.
- * @param parentValueCount Number of separate repeating groupings.
- * @param valueCount Number atomic values the buffer contains.
+ * @param valueCount Number of separate repeating groupings.
+ * @param innerValueCount Number of atomic values the buffer contains.
* @param buf Incoming buffer.
* @return The number of bytes of the buffer that were consumed.
*/
- public int load(int parentValueCount, int childValueCount, DrillBuf buf);
-
- public abstract RepeatedMutator getMutator();
+ public int load(int valueCount, int innerValueCount, DrillBuf buf);
- public interface RepeatedAccessor extends Accessor {
- public int getGroupCount();
- public int getValueCount();
- public int getGroupSizeAtIndex(int index);
- public ValueVector getAllChildValues();
- }
- public interface RepeatedMutator extends Mutator {
- public void setValueCounts(int parentValueCount, int childValueCount);
- public void setRepetitionAtIndexSafe(int index, int repetitionCount);
- public BaseDataValueVector getDataVector();
- }
+// NOTE(review): the former accessor/mutator contracts are kept below as commented-out code only;
+// delete them once nothing needs them for reference.
+// public interface RepeatedAccessor extends Accessor {
+// public int getGroupCount();
+// public int getValueCount();
+// public int getGroupSizeAtIndex(int index);
+// public ValueVector getAllChildValues();
+// }
+//
+// public interface RepeatedMutator extends Mutator {
+// public void setValueCounts(int parentValueCount, int childValueCount);
+// public void setRepetitionAtIndexSafe(int index, int repetitionCount);
+// public BaseDataValueVector getDataVector();
+// }
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/RepeatedValueVector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/RepeatedValueVector.java
new file mode 100644
index 000000000..d5a82816a
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/RepeatedValueVector.java
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.vector;
+
+/**
+ * An abstraction representing repeated value vectors.
+ *
+ * A repeated vector contains values that may either be flat or nested. A value consists of zero or more cells
+ * (inner values). The current design maintains a data vector and an offset vector: each cell is stored in the
+ * data vector, and the repeated vector uses the offset vector to determine the sequence of cells pertaining to
+ * an individual value.
+ *
+ * @param <A> repeated accessor type
+ * @param <M> repeated mutator type
+ */
+public interface RepeatedValueVector<A extends RepeatedValueVector.RepeatedAccessor, M extends RepeatedValueVector.RepeatedMutator>
+ extends ValueVector<A, M>, ContainerVectorLike {
+
+ /**
+ * Default number of cells assumed per value.
+ *
+ * NOTE(review): presumably used as a multiplier when sizing the data vector from a record count -- confirm
+ * against the implementing classes.
+ */
+ final static int DEFAULT_REPEAT_PER_RECORD = 5;
+
+ /**
+ * Returns the underlying offset vector or null if none exists.
+ *
+ * TODO(DRILL-2995): eliminate exposing low-level interfaces.
+ */
+ UInt4Vector getOffsetVector();
+
+ /**
+ * Returns the underlying data vector or null if none exists.
+ */
+ ValueVector getDataVector();
+
+ /**
+ * Returns the accessor, narrowed to the repeated accessor type {@code A}.
+ */
+ @Override
+ A getAccessor();
+
+ /**
+ * Returns the mutator, narrowed to the repeated mutator type {@code M}.
+ */
+ @Override
+ M getMutator();
+
+ /**
+ * Read-side contract of a repeated vector, adding cell-level (inner value) queries to the base accessor.
+ */
+ interface RepeatedAccessor extends ValueVector.Accessor {
+ /**
+ * Returns total number of cells that vector contains.
+ *
+ * The result includes empty, null valued cells.
+ */
+ int getInnerValueCount();
+
+
+ /**
+ * Returns number of cells that the value at the given index contains.
+ *
+ * @param index value index
+ */
+ int getInnerValueCountAt(int index);
+
+ /**
+ * Returns true if the value at the given index is empty, false otherwise.
+ *
+ * @param index value index
+ */
+ boolean isEmpty(int index);
+ }
+
+ /**
+ * Write-side contract of a repeated vector, adding the ability to open a new value to the base mutator.
+ */
+ interface RepeatedMutator extends ValueVector.Mutator {
+ /**
+ * Starts a new value that is a container of cells.
+ *
+ * @param index index of new value to start
+ */
+ void startNewValue(int index);
+
+
+ }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/RepeatedVariableWidthVector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/RepeatedVariableWidthVectorLike.java
index a49934168..ac8589ef1 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/RepeatedVariableWidthVector.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/RepeatedVariableWidthVectorLike.java
@@ -19,7 +19,7 @@ package org.apache.drill.exec.vector;
import io.netty.buffer.DrillBuf;
-public interface RepeatedVariableWidthVector extends ValueVector, RepeatedVector {
+public interface RepeatedVariableWidthVectorLike {
/**
* Allocate a new memory space for this vector. Must be called prior to using the ValueVector.
*
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/SchemaChangeCallBack.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/SchemaChangeCallBack.java
index 386ee3495..de05131fd 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/SchemaChangeCallBack.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/SchemaChangeCallBack.java
@@ -28,9 +28,9 @@ public class SchemaChangeCallBack implements CallBack {
}
+ /**
+ * Returns the value the schema-change flag had on entry and clears the flag, so the same change is not
+ * reported again by a later call.
+ */
public boolean getSchemaChange() {
- boolean schemaChange = this.schemaChange;
- this.schemaChange = false;
- return schemaChange;
+ final boolean current = schemaChange; // snapshot before resetting
+ schemaChange = false;
+ return current;
}
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/ValueVector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/ValueVector.java
index e4a0997ed..ab9992e6f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/ValueVector.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/ValueVector.java
@@ -37,12 +37,11 @@ import org.apache.drill.exec.vector.complex.reader.FieldReader;
* A vector when instantiated, relies on a {@link org.apache.drill.exec.record.DeadBuf dead buffer}. It is important
* that vector is allocated before attempting to read or write.
*
- * @param <V> actual value vector type
* @param <A> accessor type that supports reading from this vector
* @param <M> mutator type that supports writing to this vector
*/
-public interface ValueVector<V extends ValueVector, A extends ValueVector.Accessor, M extends ValueVector.Mutator>
- extends Closeable, Iterable<ValueVector<V, A, M>> {
+public interface ValueVector<A extends ValueVector.Accessor, M extends ValueVector.Mutator>
+ extends Closeable, Iterable<ValueVector> {
/**
* Allocate new buffers. ValueVector implements logic to determine how much to allocate.
@@ -94,7 +93,7 @@ public interface ValueVector<V extends ValueVector, A extends ValueVector.Access
* Returns a new {@link org.apache.drill.exec.record.TransferPair transfer pair} that is used to transfer underlying
* buffers into the target vector.
*/
- TransferPair makeTransferPair(V target);
+ TransferPair makeTransferPair(ValueVector target);
/**
* Returns an {@link org.apache.drill.exec.vector.ValueVector.Accessor accessor} that is used to read from this vector
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/VectorDescriptor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/VectorDescriptor.java
new file mode 100644
index 000000000..9a29848cf
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/VectorDescriptor.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.vector;
+
+import com.google.common.base.Preconditions;
+import org.apache.drill.common.types.TypeProtos;
+
+/**
+ * Immutable pair of a vector's {@link TypeProtos.MajorType type} and name, used to describe a vector to be
+ * created or looked up. A descriptor built without a name carries a private placeholder name and reports
+ * {@code false} from {@link #hasName()}.
+ */
+public class VectorDescriptor {
+ // Sentinel for "no name given". It is deliberately a fresh String instance rather than the interned
+ // literal: hasName() compares by reference, so a caller passing the text "NONE" explicitly is still
+ // treated as named. Do not replace with a plain literal or an equals() comparison without auditing callers.
+ private static final String DEFAULT_NAME = new String("NONE");
+
+ private final TypeProtos.MajorType type;
+ private final String name;
+
+ /**
+ * Creates an unnamed descriptor; {@link #getName()} will return the placeholder name.
+ *
+ * @param type vector type, must not be null
+ */
+ public VectorDescriptor(TypeProtos.MajorType type) {
+ this(DEFAULT_NAME, type);
+ }
+
+ /**
+ * Creates a named descriptor.
+ *
+ * @param name vector name, must not be null
+ * @param type vector type, must not be null
+ * @throws NullPointerException if either argument is null
+ */
+ public VectorDescriptor(String name,TypeProtos.MajorType type) {
+ this.name = Preconditions.checkNotNull(name, "name cannot be null");
+ this.type = Preconditions.checkNotNull(type, "type cannot be null");
+ }
+
+ /**
+ * Returns the vector type; never null.
+ */
+ public TypeProtos.MajorType getType() {
+ return type;
+ }
+
+ /**
+ * Returns the vector name, or the placeholder name when none was supplied; never null.
+ */
+ public String getName() {
+ return name;
+ }
+
+ /**
+ * Returns true unless this descriptor holds the placeholder name instance.
+ *
+ * The check is an identity comparison on purpose (see the sentinel field): a descriptor rebuilt from
+ * another unnamed descriptor's {@link #getName()} result is still unnamed, whereas an equal-looking
+ * string supplied by a caller counts as a real name.
+ */
+ public boolean hasName() {
+ return name != DEFAULT_NAME;
+ }
+
+ /**
+ * Static factory equivalent to {@link #VectorDescriptor(String, TypeProtos.MajorType)}.
+ */
+ public static VectorDescriptor create(String name, TypeProtos.MajorType type) {
+ return new VectorDescriptor(name, type);
+ }
+
+ /**
+ * Static factory equivalent to {@link #VectorDescriptor(TypeProtos.MajorType)}.
+ */
+ public static VectorDescriptor create(TypeProtos.MajorType type) {
+ return new VectorDescriptor(type);
+ }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/ZeroVector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/ZeroVector.java
new file mode 100644
index 000000000..db8d327fd
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/ZeroVector.java
@@ -0,0 +1,170 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.vector;
+
+import java.util.Iterator;
+
+import com.google.common.collect.Iterators;
+import io.netty.buffer.DrillBuf;
+import org.apache.drill.common.expression.FieldReference;
+import org.apache.drill.common.types.Types;
+import org.apache.drill.exec.memory.BufferAllocator;
+import org.apache.drill.exec.memory.OutOfMemoryRuntimeException;
+import org.apache.drill.exec.proto.UserBitShared;
+import org.apache.drill.exec.record.MaterializedField;
+import org.apache.drill.exec.record.TransferPair;
+import org.apache.drill.exec.vector.complex.impl.NullReader;
+import org.apache.drill.exec.vector.complex.reader.FieldReader;
+
+/**
+ * A {@link ValueVector} that never holds any data: it reports zero values, zero capacity and a zero buffer
+ * size, every index reads as null, and every mutating, allocating, loading or transferring operation is a
+ * no-op. A shared instance is available as {@link #INSTANCE}.
+ */
+public class ZeroVector implements ValueVector {
+ /** Shared instance; the constructor is nevertheless public, so other instances may exist. */
+ public final static ZeroVector INSTANCE = new ZeroVector();
+
+ // Fixed field descriptor: placeholder name with the late-bind type.
+ private final MaterializedField field = MaterializedField.create("[DEFAULT]", Types.LATE_BIND_TYPE);
+
+ // Transfer pair returned by every transfer-pair factory below: all operations do nothing and the
+ // destination is this very vector.
+ private final TransferPair defaultPair = new TransferPair() {
+ @Override
+ public void transfer() { }
+
+ @Override
+ public void splitAndTransfer(int startIndex, int length) { }
+
+ @Override
+ public ValueVector getTo() {
+ return ZeroVector.this;
+ }
+
+ @Override
+ public void copyValueSafe(int from, int to) { }
+ };
+
+ // Accessor over an empty vector: no values, every index is null.
+ private final Accessor defaultAccessor = new Accessor() {
+ @Override
+ public Object getObject(int index) {
+ return null;
+ }
+
+ @Override
+ public int getValueCount() {
+ return 0;
+ }
+
+ @Override
+ public boolean isNull(int index) {
+ return true;
+ }
+ };
+
+ // Mutator that ignores every request.
+ private final Mutator defaultMutator = new Mutator() {
+ @Override
+ public void setValueCount(int valueCount) { }
+
+ @Override
+ public void reset() { }
+
+ @Override
+ public void generateTestData(int values) { }
+ };
+
+ public ZeroVector() { }
+
+ /** No-op: there is nothing to release. */
+ @Override
+ public void close() { }
+
+ /** No-op: there is nothing to clear. */
+ @Override
+ public void clear() { }
+
+ @Override
+ public MaterializedField getField() {
+ return field;
+ }
+
+ /** Returns the shared no-op transfer pair whose target is this vector. */
+ @Override
+ public TransferPair getTransferPair() {
+ return defaultPair;
+ }
+
+ /** Builds metadata from the fixed field with a buffer length of 0 and a value count of 0. */
+ @Override
+ public UserBitShared.SerializedField getMetadata() {
+ return getField()
+ .getAsBuilder()
+ .setBufferLength(getBufferSize())
+ .setValueCount(getAccessor().getValueCount())
+ .build();
+ }
+
+ /** Returns an empty iterator. */
+ @Override
+ public Iterator iterator() {
+ return Iterators.emptyIterator();
+ }
+
+ @Override
+ public int getBufferSize() {
+ return 0;
+ }
+
+ /** Returns a new empty array; the {@code clear} flag has no effect. */
+ @Override
+ public DrillBuf[] getBuffers(boolean clear) {
+ return new DrillBuf[0];
+ }
+
+ /** Delegates to {@link #allocateNewSafe()}, which always succeeds, so this never throws. */
+ @Override
+ public void allocateNew() throws OutOfMemoryRuntimeException {
+ allocateNewSafe();
+ }
+
+ /** Always reports success without allocating anything. */
+ @Override
+ public boolean allocateNewSafe() {
+ return true;
+ }
+
+ /** No-op: the requested capacity is ignored. */
+ @Override
+ public void setInitialCapacity(int numRecords) { }
+
+ @Override
+ public int getValueCapacity() {
+ return 0;
+ }
+
+ /** Returns the shared no-op transfer pair; {@code ref} is ignored. */
+ @Override
+ public TransferPair getTransferPair(FieldReference ref) {
+ return defaultPair;
+ }
+
+ /** Returns the shared no-op transfer pair; {@code target} is ignored and is not what getTo() returns. */
+ @Override
+ public TransferPair makeTransferPair(ValueVector target) {
+ return defaultPair;
+ }
+
+ @Override
+ public Accessor getAccessor() {
+ return defaultAccessor;
+ }
+
+ @Override
+ public Mutator getMutator() {
+ return defaultMutator;
+ }
+
+ /** Returns the shared {@link NullReader} instance. */
+ @Override
+ public FieldReader getReader() {
+ return NullReader.INSTANCE;
+ }
+
+ /** No-op: both the metadata and the buffer are ignored. */
+ @Override
+ public void load(UserBitShared.SerializedField metadata, DrillBuf buffer) { }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/MapVector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/MapVector.java
index 41388393c..b615b66f6 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/MapVector.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/MapVector.java
@@ -101,7 +101,8 @@ public class MapVector extends AbstractMapVector {
+ /**
+ * Forwards the requested initial capacity to every vector obtained by iterating over this map vector.
+ *
+ * @param numRecords capacity hint passed unchanged to each iterated vector
+ */
@Override
public void setInitialCapacity(int numRecords) {
- for (ValueVector v : (ValueVector<?,?,?>)this) {
+ // View this vector through its Iterable<ValueVector> supertype so the for-each needs no cast.
+ final Iterable<ValueVector> container = this;
+ for (ValueVector v : container) {
v.setInitialCapacity(numRecords);
}
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/RepeatedListVector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/RepeatedListVector.java
index c06102999..b5de8b1e2 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/RepeatedListVector.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/RepeatedListVector.java
@@ -17,13 +17,12 @@
*/
package org.apache.drill.exec.vector.complex;
+import com.google.common.base.Preconditions;
import io.netty.buffer.DrillBuf;
-import java.util.Collections;
import java.util.Iterator;
import java.util.List;
-import org.apache.commons.lang3.ArrayUtils;
import org.apache.drill.common.expression.FieldReference;
import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.common.types.TypeProtos.MajorType;
@@ -33,380 +32,391 @@ import org.apache.drill.exec.expr.TypeHelper;
import org.apache.drill.exec.expr.holders.ComplexHolder;
import org.apache.drill.exec.expr.holders.RepeatedListHolder;
import org.apache.drill.exec.memory.BufferAllocator;
+import org.apache.drill.exec.memory.OutOfMemoryRuntimeException;
+import org.apache.drill.exec.proto.UserBitShared;
import org.apache.drill.exec.proto.UserBitShared.SerializedField;
import org.apache.drill.exec.record.MaterializedField;
import org.apache.drill.exec.record.TransferPair;
import org.apache.drill.exec.util.JsonStringArrayList;
-import org.apache.drill.exec.vector.BaseDataValueVector;
-import org.apache.drill.exec.vector.RepeatedFixedWidthVector;
+import org.apache.drill.exec.vector.AddOrGetResult;
+import org.apache.drill.exec.vector.BaseRepeatedValueVector;
+import org.apache.drill.exec.vector.RepeatedFixedWidthVectorLike;
import org.apache.drill.exec.util.CallBack;
+import org.apache.drill.exec.vector.RepeatedValueVector;
import org.apache.drill.exec.vector.UInt4Vector;
import org.apache.drill.exec.vector.ValueVector;
+import org.apache.drill.exec.vector.VectorDescriptor;
import org.apache.drill.exec.vector.complex.impl.NullReader;
import org.apache.drill.exec.vector.complex.impl.RepeatedListReaderImpl;
import org.apache.drill.exec.vector.complex.reader.FieldReader;
-import com.google.common.base.Preconditions;
-
-public class RepeatedListVector extends AbstractContainerVector implements RepeatedFixedWidthVector{
+public class RepeatedListVector extends AbstractContainerVector
+ implements RepeatedValueVector, RepeatedFixedWidthVectorLike {
public final static MajorType TYPE = Types.repeated(MinorType.LIST);
-
- private final UInt4Vector offsets; // offsets to start of each record
- private final Mutator mutator = new Mutator();
- private final RepeatedListAccessor accessor = new RepeatedListAccessor();
- private ValueVector vector;
private final RepeatedListReaderImpl reader = new RepeatedListReaderImpl(null, this);
- private final EmptyValuePopulator emptyPopulator;
+ private final DelegateRepeatedVector delegate;
+
+ protected static class DelegateRepeatedVector
+ extends BaseRepeatedValueVector<DelegateRepeatedVector.RepeatedListAccessor, DelegateRepeatedVector.RepeatedListMutator> {
+
+ private final RepeatedListAccessor accessor = new RepeatedListAccessor();
+ private final RepeatedListMutator mutator = new RepeatedListMutator();
+ private final EmptyValuePopulator emptyPopulator;
+ private transient DelegateTransferPair ephPair;
+
+ public class RepeatedListAccessor extends BaseRepeatedValueVector.BaseRepeatedAccessor {
+
+ @Override
+ public Object getObject(int index) {
+ List<Object> list = new JsonStringArrayList();
+ final int start = offsets.getAccessor().get(index);
+ final int until = offsets.getAccessor().get(index+1);
+ for (int i = start; i < until; i++) {
+ list.add(vector.getAccessor().getObject(i));
+ }
+ return list;
+ }
+ public void get(int index, RepeatedListHolder holder) {
+ assert index <= getValueCapacity();
+ holder.start = getOffsetVector().getAccessor().get(index);
+ holder.end = getOffsetVector().getAccessor().get(index+1);
+ }
- public RepeatedListVector(SchemaPath path, BufferAllocator allocator, CallBack callBack){
- this(MaterializedField.create(path, TYPE), allocator, callBack);
- }
+ /**
+ * Positions a reader at {@code index} and stores it in {@code holder.reader}.
+ *
+ * NOTE(review): within this nested accessor, {@code getReader()} presumably resolves to the enclosing
+ * delegate vector's {@code getReader()}, which throws {@link UnsupportedOperationException} in this
+ * change -- confirm whether this overload is reachable, or whether the accessor base class supplies a reader.
+ */
+ public void get(int index, ComplexHolder holder) {
+ final FieldReader reader = getReader();
+ reader.setPosition(index);
+ holder.reader = reader;
+ }
- public RepeatedListVector(MaterializedField field, BufferAllocator allocator, CallBack callBack){
- super(field, allocator, callBack);
- int childrenSize = field.getChildren().size();
+ public void get(int index, int arrayIndex, ComplexHolder holder) {
+ final RepeatedListHolder listHolder = new RepeatedListHolder();
+ get(index, listHolder);
+ int offset = listHolder.start + arrayIndex;
+ if (offset >= listHolder.end) {
+ holder.reader = NullReader.INSTANCE;
+ } else {
+ FieldReader r = getDataVector().getReader();
+ r.setPosition(offset);
+ holder.reader = r;
+ }
+ }
+ }
- // repeated list vector should not have more than one child
- assert childrenSize <= 1;
+ public class RepeatedListMutator extends BaseRepeatedValueVector.BaseRepeatedMutator {
- if (childrenSize > 0) {
- MaterializedField child = field.getChildren().iterator().next();
- setVector(TypeHelper.getNewVector(child, allocator, callBack));
- }
+ public int add(int index) {
+ final int curEnd = getOffsetVector().getAccessor().get(index+1);
+ getOffsetVector().getMutator().setSafe(index + 1, curEnd + 1);
+ return curEnd;
+ }
- this.offsets = new UInt4Vector(null, allocator);
- this.emptyPopulator = new EmptyValuePopulator(offsets);
- }
+ @Override
+ public void startNewValue(int index) {
+ emptyPopulator.populate(index+1);
+ super.startNewValue(index);
+ }
- @Override
- public RepeatedListReaderImpl getReader() {
- return reader;
- }
+ @Override
+ public void setValueCount(int valueCount) {
+ emptyPopulator.populate(valueCount);
+ super.setValueCount(valueCount);
+ }
+ }
- @Override
- public int size() {
- return vector != null ? 1 : 0;
- }
+ public class DelegateTransferPair implements TransferPair {
+ private final DelegateRepeatedVector target;
+ private final TransferPair[] children;
- transient private RepeatedListTransferPair ephPair;
+ public DelegateTransferPair(DelegateRepeatedVector target) {
+ this.target = Preconditions.checkNotNull(target);
+ if (target.getDataVector() == DEFAULT_DATA_VECTOR) {
+ target.addOrGetVector(VectorDescriptor.create(getDataVector().getField().getType()));
+ target.getDataVector().allocateNew();
+ }
+ this.children = new TransferPair[] {
+ getOffsetVector().makeTransferPair(target.getOffsetVector()),
+ getDataVector().makeTransferPair(target.getDataVector())
+ };
+ }
- public void copyFromSafe(int fromIndex, int thisIndex, RepeatedListVector from) {
- if(ephPair == null || ephPair.from != from) {
- ephPair = (RepeatedListTransferPair) from.makeTransferPair(this);
- }
- ephPair.copyValueSafe(fromIndex, thisIndex);
- }
+ @Override
+ public void transfer() {
+ for (TransferPair child:children) {
+ child.transfer();
+ }
+ }
- public Mutator getMutator() {
- return mutator;
- }
+ @Override
+ public ValueVector getTo() {
+ return target;
+ }
- public void setInitialCapacity(int numRecords) {
- offsets.setInitialCapacity(numRecords + 1);
- if (vector != null) {
- vector.setInitialCapacity(numRecords * DEFAULT_REPEAT_PER_RECORD);
- }
- }
+ @Override
+ public void splitAndTransfer(int startIndex, int length) {
+ throw new UnsupportedOperationException("Repeated list does not support split & transfer operation");
+ }
- @Override
- public boolean allocateNewSafe() {
- if (!offsets.allocateNewSafe()) {
- return false;
+ @Override
+ public void copyValueSafe(int srcIndex, int destIndex) {
+ final RepeatedListHolder holder = new RepeatedListHolder();
+ getAccessor().get(srcIndex, holder);
+ target.emptyPopulator.populate(destIndex+1);
+ final TransferPair vectorTransfer = children[1];
+ int newIndex = target.getOffsetVector().getAccessor().get(destIndex);
+ //todo: make this a bulk copy.
+ for (int i = holder.start; i < holder.end; i++, newIndex++) {
+ vectorTransfer.copyValueSafe(i, newIndex);
+ }
+ target.getOffsetVector().getMutator().setSafe(destIndex + 1, newIndex);
+ }
}
- offsets.zeroVector();
- if (vector != null) {
- return vector.allocateNewSafe();
- } else {
- return true;
+ public DelegateRepeatedVector(SchemaPath path, BufferAllocator allocator) {
+ this(MaterializedField.create(path, TYPE), allocator);
}
- }
- public void reAlloc() {
- offsets.reAlloc();
- }
-
- public class Mutator implements ValueVector.Mutator, RepeatedMutator{
+ public DelegateRepeatedVector(MaterializedField field, BufferAllocator allocator) {
+ super(field, allocator);
+ this.emptyPopulator = new EmptyValuePopulator(getOffsetVector());
+ }
- public void startNewGroup(int index) {
- emptyPopulator.populate(index+1);
- offsets.getMutator().setSafe(index+1, offsets.getAccessor().get(index));
+ @Override
+ public void allocateNew() throws OutOfMemoryRuntimeException {
+ if (!allocateNewSafe()) {
+ throw new OutOfMemoryRuntimeException();
+ }
}
- public int add(int index) {
- final int prevEnd = offsets.getAccessor().get(index+1);
- offsets.getMutator().setSafe(index+1, prevEnd+1);
- return prevEnd;
+ @Override
+ protected SerializedField.Builder getMetadataBuilder() {
+ return super.getMetadataBuilder();
}
- public void setValueCount(int groupCount) {
- emptyPopulator.populate(groupCount);
- offsets.getMutator().setValueCount(groupCount+1);
+ @Override
+ public TransferPair getTransferPair(FieldReference ref) {
+ return makeTransferPair(new DelegateRepeatedVector(ref, allocator));
+ }
- if (vector != null) {
- int valueCount = offsets.getAccessor().get(groupCount);
- vector.getMutator().setValueCount(valueCount);
- }
+ @Override
+ public TransferPair makeTransferPair(ValueVector target) {
+ return new DelegateTransferPair(DelegateRepeatedVector.class.cast(target));
}
@Override
- public void reset() { }
+ public RepeatedListAccessor getAccessor() {
+ return accessor;
+ }
@Override
- public void generateTestData(int values) {
+ public RepeatedListMutator getMutator() {
+ return mutator;
}
@Override
- public void setValueCounts(int parentValueCount, int childValueCount) {
- // TODO - determine if this should be implemented for this class
+ public FieldReader getReader() {
throw new UnsupportedOperationException();
}
@Override
- public void setRepetitionAtIndexSafe(int index, int repetitionCount) {
+ /**
+ * Loads this vector from {@code buffer}: first the offsets ({@code groupCount + 1} entries), then the
+ * data vector from the slice that starts where the offsets ended and spans the child's buffer length.
+ * A data vector is created from the child's major type if only the default placeholder is present.
+ *
+ * NOTE(review): {@code getChildList().get(0)} assumes the metadata always carries at least one child
+ * field -- confirm that callers never serialize a repeated list without a child.
+ */
+ public void load(SerializedField metadata, DrillBuf buffer) {
+ //TODO(DRILL-2997): get rid of the notion of "group count" completely
+ final int valueCount = metadata.getGroupCount();
+ // the value returned by the offsets load is used as the start position of the child data in the buffer
+ final int bufOffset = offsets.load(valueCount + 1, buffer);
+ final SerializedField childField = metadata.getChildList().get(0);
+ if (getDataVector() == DEFAULT_DATA_VECTOR) {
+ addOrGetVector(VectorDescriptor.create(childField.getMajorType()));
+ }
+
+ // an empty child has nothing to load; clear the data vector instead
+ if (childField.getValueCount() == 0) {
+ vector.clear();
+ } else {
+ vector.load(childField, buffer.slice(bufOffset, childField.getBufferLength()));
+ }
+ }
- @Override
- public BaseDataValueVector getDataVector() {
- return null; //To change body of implemented methods use File | Settings | File Templates.
+ /**
+ * Copies the value at {@code fromIndex} of {@code from} into position {@code thisIndex} of this vector,
+ * using a transfer pair created by {@code from} with this vector as its target.
+ *
+ * NOTE(review): the cached pair comes from {@code from.makeTransferPair(this)}, so its {@code target}
+ * appears to always be this vector. The guard {@code ephPair.target != from} therefore looks like it
+ * is true on every call unless {@code from == this}, which would rebuild the pair each time. The code
+ * this replaces compared the pair's source ({@code ephPair.from != from}) -- confirm whether the cache
+ * should be keyed on the source vector instead.
+ *
+ * @param fromIndex index of the value to read in {@code from}
+ * @param thisIndex index to write in this vector
+ * @param from source vector
+ */
+ public void copyFromSafe(int fromIndex, int thisIndex, DelegateRepeatedVector from) {
+ if(ephPair == null || ephPair.target != from) {
+ ephPair = DelegateTransferPair.class.cast(from.makeTransferPair(this));
+ }
+ ephPair.copyValueSafe(fromIndex, thisIndex);
+ }
+
}
- public class RepeatedListAccessor implements RepeatedAccessor{
+ protected class RepeatedListTransferPair implements TransferPair {
+ private final TransferPair delegate;
- @Override
- public Object getObject(int index) {
- List<Object> l = new JsonStringArrayList();
- int end = offsets.getAccessor().get(index+1);
- for (int i = offsets.getAccessor().get(index); i < end; i++) {
- l.add(vector.getAccessor().getObject(i));
- }
- return l;
+ public RepeatedListTransferPair(TransferPair delegate) {
+ this.delegate = delegate;
}
- public int getGroupSizeAtIndex(int index) {
- return offsets.getAccessor().get(index+1) - offsets.getAccessor().get(index);
+ public void transfer() {
+ delegate.transfer();
}
@Override
- public ValueVector getAllChildValues() {
- return vector;
+ public void splitAndTransfer(int startIndex, int length) {
+ delegate.splitAndTransfer(startIndex, length);
}
@Override
- public int getValueCount() {
- return offsets.getAccessor().get(offsets.getAccessor().getValueCount() - 1);
+ public ValueVector getTo() {
+ final DelegateRepeatedVector delegateVector = DelegateRepeatedVector.class.cast(delegate.getTo());
+ return new RepeatedListVector(getField(), allocator, callBack, delegateVector);
}
- public void get(int index, RepeatedListHolder holder) {
- assert index <= getValueCapacity();
- holder.start = offsets.getAccessor().get(index);
- holder.end = offsets.getAccessor().get(index+1);
+ @Override
+ public void copyValueSafe(int from, int to) {
+ delegate.copyValueSafe(from, to);
}
+ }
- public void get(int index, ComplexHolder holder) {
- FieldReader reader = getReader();
- reader.setPosition(index);
- holder.reader = reader;
- }
+ public RepeatedListVector(SchemaPath path, BufferAllocator allocator, CallBack callBack) {
+ this(MaterializedField.create(path, TYPE), allocator, callBack);
+ }
- public void get(int index, int arrayIndex, ComplexHolder holder) {
- RepeatedListHolder h = new RepeatedListHolder();
- get(index, h);
- int offset = h.start + arrayIndex;
+ public RepeatedListVector(MaterializedField field, BufferAllocator allocator, CallBack callBack) {
+ this(field, allocator, callBack, new DelegateRepeatedVector(field, allocator));
+ }
- if (offset >= h.end) {
- holder.reader = NullReader.INSTANCE;
- } else {
- FieldReader r = vector.getReader();
- r.setPosition(offset);
- holder.reader = r;
- }
- }
+ protected RepeatedListVector(MaterializedField field, BufferAllocator allocator, CallBack callBack, DelegateRepeatedVector delegate) {
+ super(field, allocator, callBack);
+ int childrenSize = field.getChildren().size();
- @Override
- public boolean isNull(int index) {
- return false;
+ // repeated list vector should not have more than one child
+ assert childrenSize <= 1;
+ this.delegate = Preconditions.checkNotNull(delegate);
+ if (childrenSize > 0) {
+ MaterializedField child = field.getChildren().iterator().next();
+ addOrGetVector(VectorDescriptor.create(child.getType()));
+// setVector(TypeHelper.getNewVector(child, allocator, callBack));
}
+ }
+
@Override
- public int getGroupCount() {
- final int valueCount = offsets.getAccessor().getValueCount();
- return valueCount == 0 ? 0 : valueCount - 1;
- }
+ public RepeatedListReaderImpl getReader() {
+ return reader;
}
@Override
- public int getBufferSize() {
- return offsets.getBufferSize() + vector.getBufferSize();
+ public DelegateRepeatedVector.RepeatedListAccessor getAccessor() {
+ return delegate.getAccessor();
}
@Override
- public void close() {
- offsets.close();
- super.close();
+ public DelegateRepeatedVector.RepeatedListMutator getMutator() {
+ return delegate.getMutator();
}
@Override
- public void clear() {
- getMutator().reset();
- offsets.clear();
- if (vector != null) {
- vector.clear();
- }
+ public UInt4Vector getOffsetVector() {
+ return delegate.getOffsetVector();
}
@Override
- public TransferPair getTransferPair() {
- return new RepeatedListTransferPair(getField().getPath());
+ public ValueVector getDataVector() {
+ return delegate.getDataVector();
}
- public class RepeatedListTransferPair implements TransferPair{
- private final RepeatedListVector from = RepeatedListVector.this;
- private final RepeatedListVector to;
- private final TransferPair vectorTransfer;
-
- private RepeatedListTransferPair(RepeatedListVector to) {
- this.to = to;
- if (to.vector == null) {
- to.vector = to.addOrGet(null, vector.getField().getType(), vector.getClass());
- to.vector.allocateNew();
- }
- this.vectorTransfer = vector.makeTransferPair(to.vector);
- }
+ @Override
+ public void allocateNew() throws OutOfMemoryRuntimeException {
+ delegate.allocateNew();
+ }
- private RepeatedListTransferPair(SchemaPath path) {
- this.to = new RepeatedListVector(path, allocator, callBack);
- vectorTransfer = vector.getTransferPair();
- this.to.vector = vectorTransfer.getTo();
- }
+ @Override
+ public boolean allocateNewSafe() {
+ return delegate.allocateNewSafe();
+ }
- @Override
- public void transfer() {
- offsets.transferTo(to.offsets);
- vectorTransfer.transfer();
- clear();
+ @Override
+ public <T extends ValueVector> AddOrGetResult<T> addOrGetVector(VectorDescriptor descriptor) {
+ final AddOrGetResult<T> result = delegate.addOrGetVector(descriptor);
+ if (result.isCreated() && callBack != null) {
+ callBack.doWork();
}
+ return result;
+ }
- @Override
- public ValueVector getTo() {
- return to;
- }
+ @Override
+ public int size() {
+ return delegate.size();
+ }
- @Override
- public void splitAndTransfer(int startIndex, int length) {
- throw new UnsupportedOperationException();
- }
+ @Override
+ public int getBufferSize() {
+ return delegate.getBufferSize();
+ }
- @Override
- public void copyValueSafe(int srcIndex, int destIndex) {
- RepeatedListHolder holder = new RepeatedListHolder();
- accessor.get(srcIndex, holder);
- to.emptyPopulator.populate(destIndex+1);
- int newIndex = to.offsets.getAccessor().get(destIndex);
- //todo: make this a bulk copy.
- for (int i = holder.start; i < holder.end; i++, newIndex++) {
- vectorTransfer.copyValueSafe(i, newIndex);
- }
- to.offsets.getMutator().setSafe(destIndex + 1, newIndex);
- }
+ @Override
+ public void close() {
+ delegate.close();
+ }
+ @Override
+ public void clear() {
+ delegate.clear();
}
@Override
- public TransferPair makeTransferPair(ValueVector to) {
- if (!(to instanceof RepeatedListVector ) ) {
- throw new IllegalArgumentException("You can't make a transfer pair from an incompatible .");
- }
- return new RepeatedListTransferPair( (RepeatedListVector) to);
+ public TransferPair getTransferPair() {
+ return new RepeatedListTransferPair(delegate.getTransferPair());
}
@Override
public TransferPair getTransferPair(FieldReference ref) {
- return new RepeatedListTransferPair(ref);
+ return new RepeatedListTransferPair(delegate.getTransferPair(ref));
}
@Override
- public int getValueCapacity() {
- if (vector == null) {
- return offsets.getValueCapacity() - 1;
- }
- return Math.min(offsets.getValueCapacity() - 1, vector.getValueCapacity());
+ public TransferPair makeTransferPair(ValueVector to) {
+ final RepeatedListVector target = RepeatedListVector.class.cast(to);
+ return new RepeatedListTransferPair(delegate.makeTransferPair(target.delegate));
}
@Override
- public RepeatedListAccessor getAccessor() {
- return accessor;
+ public int getValueCapacity() {
+ return delegate.getValueCapacity();
}
@Override
public DrillBuf[] getBuffers(boolean clear) {
- DrillBuf[] buffers = ArrayUtils.addAll(offsets.getBuffers(false), vector.getBuffers(false));
- if (clear) {
- // does not make much sense but we have to retain buffers even when clear is set. refactor this interface.
- for (DrillBuf buffer:buffers) {
- buffer.retain();
- }
- clear();
- }
- return buffers;
+ return delegate.getBuffers(clear);
}
- protected void setVector(ValueVector newVector) {
- vector = Preconditions.checkNotNull(newVector);
- getField().addChild(newVector.getField());
- }
@Override
public void load(SerializedField metadata, DrillBuf buf) {
- SerializedField childField = metadata.getChildList().get(0);
-
- int bufOffset = offsets.load(metadata.getValueCount()+1, buf);
-
- MaterializedField fieldDef = MaterializedField.create(childField);
- if (vector == null) {
- setVector(TypeHelper.getNewVector(fieldDef, allocator));
- }
-
- if (childField.getValueCount() == 0) {
- vector.clear();
- } else {
- vector.load(childField, buf.slice(bufOffset, childField.getBufferLength()));
- }
+ delegate.load(metadata, buf);
}
@Override
public SerializedField getMetadata() {
- return getField() //
- .getAsBuilder() //
- .setBufferLength(getBufferSize()) //
- .setValueCount(accessor.getGroupCount()) //
- .addChild(vector.getMetadata()) //
- .build();
+ return delegate.getMetadata();
}
@Override
public Iterator<ValueVector> iterator() {
- return Collections.singleton(vector).iterator();
+ return delegate.iterator();
+ }
+
+ @Override
+ public void setInitialCapacity(int numRecords) {
+ delegate.setInitialCapacity(numRecords);
}
+ /**
+ * @deprecated
+ * prefer using {@link #addOrGetVector(org.apache.drill.exec.vector.VectorDescriptor)} instead.
+ */
@Override
public <T extends ValueVector> T addOrGet(String name, MajorType type, Class<T> clazz) {
- Preconditions.checkArgument(name == null);
-
- if(vector == null){
- final MaterializedField child = MaterializedField.create(getField().getPath().getUnindexedArrayChild(), type);
- vector = TypeHelper.getNewVector(child, allocator, callBack);
- setVector(vector);
- if (callBack != null) {
- callBack.doWork();
- }
- }
- return typeify(vector, clazz);
+ final AddOrGetResult<T> result = addOrGetVector(VectorDescriptor.create(type));
+ return result.getVector();
}
@Override
@@ -414,18 +424,18 @@ public class RepeatedListVector extends AbstractContainerVector implements Repea
if (name != null) {
return null;
}
- return typeify(vector, clazz);
+ return typeify(delegate.getDataVector(), clazz);
}
@Override
- public void allocateNew(int parentValueCount, int childValueCount) {
+ public void allocateNew(int valueCount, int innerValueCount) {
clear();
- offsets.allocateNew(parentValueCount + 1);
- mutator.reset();
+ getOffsetVector().allocateNew(valueCount + 1);
+ getMutator().reset();
}
@Override
- public int load(int parentValueCount, int childValueCount, DrillBuf buf) {
+ public int load(int valueCount, int innerValueCount, DrillBuf buf) {
throw new UnsupportedOperationException();
}
@@ -434,7 +444,18 @@ public class RepeatedListVector extends AbstractContainerVector implements Repea
if (name != null) {
return null;
}
- return new VectorWithOrdinal(vector, 0);
+ return new VectorWithOrdinal(delegate.getDataVector(), 0);
}
+
+ public void copyFromSafe(int fromIndex, int thisIndex, RepeatedListVector from) {
+ delegate.copyFromSafe(fromIndex, thisIndex, from.delegate);
+ }
+
+
+// protected void setVector(ValueVector newVector) {
+// vector = Preconditions.checkNotNull(newVector);
+// getField().addChild(newVector.getField());
+// }
+
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/RepeatedMapVector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/RepeatedMapVector.java
index e5d48dd6f..a97847ba0 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/RepeatedMapVector.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/RepeatedMapVector.java
@@ -41,11 +41,15 @@ import org.apache.drill.exec.record.TransferPair;
import org.apache.drill.exec.record.TypedFieldId;
import org.apache.drill.exec.util.CallBack;
import org.apache.drill.exec.util.JsonStringArrayList;
+import org.apache.drill.exec.vector.AddOrGetResult;
import org.apache.drill.exec.vector.AllocationHelper;
import org.apache.drill.exec.vector.BaseDataValueVector;
-import org.apache.drill.exec.vector.RepeatedFixedWidthVector;
+import org.apache.drill.exec.vector.BaseRepeatedValueVector;
+import org.apache.drill.exec.vector.RepeatedFixedWidthVectorLike;
+import org.apache.drill.exec.vector.RepeatedValueVector;
import org.apache.drill.exec.vector.UInt4Vector;
import org.apache.drill.exec.vector.ValueVector;
+import org.apache.drill.exec.vector.VectorDescriptor;
import org.apache.drill.exec.vector.complex.impl.NullReader;
import org.apache.drill.exec.vector.complex.impl.RepeatedMapReaderImpl;
import org.apache.drill.exec.vector.complex.reader.FieldReader;
@@ -53,7 +57,7 @@ import org.apache.drill.exec.vector.complex.reader.FieldReader;
import com.google.common.base.Preconditions;
import com.google.common.collect.Maps;
-public class RepeatedMapVector extends AbstractMapVector implements RepeatedFixedWidthVector {
+public class RepeatedMapVector extends AbstractMapVector implements RepeatedValueVector, RepeatedFixedWidthVectorLike {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(RepeatedMapVector.class);
@@ -71,10 +75,27 @@ public class RepeatedMapVector extends AbstractMapVector implements RepeatedFixe
this.emptyPopulator = new EmptyValuePopulator(offsets);
}
+ @Override
+ public UInt4Vector getOffsetVector() {
+ return offsets;
+ }
+
+ @Override
+ public ValueVector getDataVector() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public <T extends ValueVector> AddOrGetResult<T> addOrGetVector(VectorDescriptor descriptor) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
public void setInitialCapacity(int numRecords) {
offsets.setInitialCapacity(numRecords + 1);
- for(ValueVector v : (ValueVector<?,?,?>)this) {
- v.setInitialCapacity(numRecords * DEFAULT_REPEAT_PER_RECORD);
+ final Iterable<ValueVector> container = this;
+ for(ValueVector v : container) {
+ v.setInitialCapacity(numRecords * RepeatedValueVector.DEFAULT_REPEAT_PER_RECORD);
}
}
@@ -84,20 +105,16 @@ public class RepeatedMapVector extends AbstractMapVector implements RepeatedFixe
}
@Override
- public void allocateNew(int groupCount, int valueCount) {
+ public void allocateNew(int groupCount, int innerValueCount) {
clear();
offsets.allocateNew(groupCount+1);
offsets.zeroVector();
for (ValueVector v : getChildren()) {
- AllocationHelper.allocatePrecomputedChildCount(v, groupCount, 50, valueCount);
+ AllocationHelper.allocatePrecomputedChildCount(v, groupCount, 50, innerValueCount);
}
mutator.reset();
}
- public void reAlloc() {
- offsets.reAlloc();
- }
-
public Iterator<String> fieldNameIterator() {
return getChildFieldNames().iterator();
}
@@ -111,7 +128,7 @@ public class RepeatedMapVector extends AbstractMapVector implements RepeatedFixe
@Override
public int getBufferSize() {
- if (accessor.getGroupCount() == 0) {
+ if (getAccessor().getValueCount() == 0) {
return 0;
}
long buffer = offsets.getBufferSize();
@@ -425,14 +442,15 @@ public class RepeatedMapVector extends AbstractMapVector implements RepeatedFixe
Preconditions.checkArgument(bufOffset == buf.capacity());
}
+
@Override
public SerializedField getMetadata() {
SerializedField.Builder b = getField() //
.getAsBuilder() //
.setBufferLength(getBufferSize()) //
- .setGroupCount(accessor.getGroupCount())
+ .setGroupCount(accessor.getValueCount())
// while we don't need to actually read this on load, we need it to make sure we don't skip deserialization of this vector
- .setValueCount(accessor.getGroupCount());
+ .setValueCount(accessor.getInnerValueCount());
for (ValueVector v : getChildren()) {
b.addChild(v.getMetadata());
}
@@ -467,16 +485,31 @@ public class RepeatedMapVector extends AbstractMapVector implements RepeatedFixe
@Override
public int getValueCount() {
- return offsets.getAccessor().get(offsets.getAccessor().getValueCount() - 1);
+ return Math.max(offsets.getAccessor().getValueCount() - 1, 0);
}
- public int getGroupSizeAtIndex(int index) {
+ @Override
+ public int getInnerValueCount() {
+ final int valueCount = getValueCount();
+ if (valueCount == 0) {
+ return 0;
+ }
+ return offsets.getAccessor().get(valueCount);
+ }
+
+ @Override
+ public int getInnerValueCountAt(int index) {
return offsets.getAccessor().get(index+1) - offsets.getAccessor().get(index);
}
@Override
- public ValueVector getAllChildValues() {
- throw new UnsupportedOperationException("Cannot retrieve inner vector from repeated map.");
+ public boolean isEmpty(int index) {
+ return false;
+ }
+
+ @Override
+ public boolean isNull(int index) {
+ return false;
}
public void get(int index, RepeatedMapHolder holder) {
@@ -504,32 +537,18 @@ public class RepeatedMapVector extends AbstractMapVector implements RepeatedFixe
}
}
- @Override
- public boolean isNull(int index) {
- return false;
- }
-
- @Override
- public int getGroupCount() {
- final int valueCount = offsets.getAccessor().getValueCount();
- return valueCount == 0 ? 0 : valueCount - 1;
- }
}
- public class Mutator implements ValueVector.Mutator, RepeatedMutator {
+ public class Mutator implements RepeatedMutator {
- public void startNewGroup(int index) {
+ @Override
+ public void startNewValue(int index) {
emptyPopulator.populate(index+1);
offsets.getMutator().setSafe(index+1, offsets.getAccessor().get(index));
}
- public int add(int index) {
- final int prevEnd = offsets.getAccessor().get(index+1);
- offsets.getMutator().setSafe(index + 1, prevEnd + 1);
- return prevEnd;
- }
-
+ @Override
public void setValueCount(int topLevelValueCount) {
emptyPopulator.populate(topLevelValueCount);
offsets.getMutator().setValueCount(topLevelValueCount == 0 ? 0 : topLevelValueCount+1);
@@ -543,22 +562,12 @@ public class RepeatedMapVector extends AbstractMapVector implements RepeatedFixe
public void reset() { }
@Override
- public void generateTestData(int values) {
- }
+ public void generateTestData(int values) { }
- @Override
- public void setValueCounts(int parentValueCount, int childValueCount) {
- // TODO - determine if this should be implemented for this class
- throw new UnsupportedOperationException();
- }
-
- @Override
- public void setRepetitionAtIndexSafe(int index, int repetitionCount) {
- }
-
- @Override
- public BaseDataValueVector getDataVector() {
- return null; //To change body of implemented methods use File | Settings | File Templates.
+ public int add(int index) {
+ final int prevEnd = offsets.getAccessor().get(index+1);
+ offsets.getMutator().setSafe(index + 1, prevEnd + 1);
+ return prevEnd;
}
}
@@ -573,7 +582,7 @@ public class RepeatedMapVector extends AbstractMapVector implements RepeatedFixe
}
@Override
- public int load(int parentValueCount, int childValueCount, DrillBuf buf) {
+ public int load(int valueCount, int innerValueCount, DrillBuf buf) {
throw new UnsupportedOperationException();
}
}