author    Jacques Nadeau <jacques@apache.org>  2014-05-11 15:12:19 -0700
committer Jacques Nadeau <jacques@apache.org>  2014-05-11 19:46:51 -0700
commit    cdc5daed5218ed70445d178f5fc25df09c573001 (patch)
tree      9b6c5bf9d8570bf5bc4a1bd5776b6f698e667f22 /exec/java-exec/src/main/java/org/apache
parent    4ffef0183cbe336e30503ceee647b21128ba7738 (diff)
Add support for RepeatedMapVector, MapVector and RepeatedListVector.
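
For orientation: these three vector types back Drill's complex (nested) data model. As a rough illustration of the shapes involved, using plain Java collections rather than Drill's vector classes, a MapVector holds one nested record per row, a RepeatedMapVector a list of records per row, and a RepeatedListVector a list of lists per row:

import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class ComplexShapes {
  public static void main(String[] args) {
    // MapVector: one nested record (struct) per row.
    Map<String, Object> mapRow = new LinkedHashMap<>();
    mapRow.put("a", 1);
    mapRow.put("b", "x");

    // RepeatedMapVector: an array of records per row.
    List<Map<String, Object>> repeatedMapRow = Arrays.asList(mapRow, mapRow);

    // RepeatedListVector: an array of arrays per row.
    List<List<Integer>> repeatedListRow =
        Arrays.asList(Arrays.asList(1, 2), Arrays.asList(3));

    System.out.println(mapRow + " | " + repeatedMapRow + " | " + repeatedListRow);
  }
}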
Diffstat (limited to 'exec/java-exec/src/main/java/org/apache')
-rw-r--r-- exec/java-exec/src/main/java/org/apache/drill/exec/cache/VectorAccessibleSerializable.java 38
-rw-r--r-- exec/java-exec/src/main/java/org/apache/drill/exec/expr/ClassGenerator.java 135
-rw-r--r-- exec/java-exec/src/main/java/org/apache/drill/exec/expr/EvaluationVisitor.java 246
-rw-r--r-- exec/java-exec/src/main/java/org/apache/drill/exec/expr/ExpressionTreeMaterializer.java 40
-rw-r--r-- exec/java-exec/src/main/java/org/apache/drill/exec/expr/ValueVectorReadExpression.java 45
-rw-r--r-- exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/HashFunctions.java.orig 252
-rw-r--r-- exec/java-exec/src/main/java/org/apache/drill/exec/expr/holders/ComplexHolder.java 25
-rw-r--r-- exec/java-exec/src/main/java/org/apache/drill/exec/expr/holders/RepeatedListHolder.java 23
-rw-r--r-- exec/java-exec/src/main/java/org/apache/drill/exec/expr/holders/RepeatedMapHolder.java 23
-rw-r--r-- exec/java-exec/src/main/java/org/apache/drill/exec/memory/OutOfMemoryRuntimeException.java 49
-rw-r--r-- exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java 4
-rw-r--r-- exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/PriorityQueueTemplate.java 4
-rw-r--r-- exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java 30
-rw-r--r-- exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WireRecordBatch.java 12
-rw-r--r-- exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java 96
-rw-r--r-- exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/InternalBatch.java 14
-rw-r--r-- exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java 8
-rw-r--r-- exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinStatus.java 14
-rw-r--r-- exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinTemplate.java 16
-rw-r--r-- exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java 18
-rw-r--r-- exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionProjectorTemplate.java 4
-rw-r--r-- exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/OutgoingRecordBatch.java 4
-rw-r--r-- exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java 18
-rw-r--r-- exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java 25
-rw-r--r-- exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectorTemplate.java 23
-rw-r--r-- exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/RemovingRecordBatch.java 38
-rw-r--r-- exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/IteratorValidatorBatchIterator.java 4
-rw-r--r-- exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/BatchGroup.java 4
-rw-r--r-- exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java 12
-rw-r--r-- exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java 18
-rw-r--r-- exec/java-exec/src/main/java/org/apache/drill/exec/record/HyperVectorWrapper.java 56
-rw-r--r-- exec/java-exec/src/main/java/org/apache/drill/exec/record/MaterializedField.java 152
-rw-r--r-- exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatch.java 18
-rw-r--r-- exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchLoader.java 22
-rw-r--r-- exec/java-exec/src/main/java/org/apache/drill/exec/record/SimpleVectorWrapper.java 61
-rw-r--r-- exec/java-exec/src/main/java/org/apache/drill/exec/record/TransferPair.java 2
-rw-r--r-- exec/java-exec/src/main/java/org/apache/drill/exec/record/TypedFieldId.java 196
-rw-r--r-- exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorAccessible.java 2
-rw-r--r-- exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java 36
-rw-r--r-- exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorWrapper.java 10
-rw-r--r-- exec/java-exec/src/main/java/org/apache/drill/exec/record/WritableBatch.java 15
-rw-r--r-- exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONFormatPlugin.java 10
-rw-r--r-- exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONRecordReader.java 532
-rw-r--r-- exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONRecordReader2.java 116
-rw-r--r-- exec/java-exec/src/main/java/org/apache/drill/exec/store/hive/HiveRecordReader.java 2
-rw-r--r-- exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockRecordReader.java 2
-rw-r--r-- exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordReader.java 7
-rw-r--r-- exec/java-exec/src/main/java/org/apache/drill/exec/vector/AllocationHelper.java 6
-rw-r--r-- exec/java-exec/src/main/java/org/apache/drill/exec/vector/BaseDataValueVector.java 26
-rw-r--r-- exec/java-exec/src/main/java/org/apache/drill/exec/vector/BaseValueVector.java 31
-rw-r--r-- exec/java-exec/src/main/java/org/apache/drill/exec/vector/BitVector.java 50
-rw-r--r-- exec/java-exec/src/main/java/org/apache/drill/exec/vector/RepeatedMutator.java 2
-rw-r--r-- exec/java-exec/src/main/java/org/apache/drill/exec/vector/ValueVector.java 75
-rw-r--r-- exec/java-exec/src/main/java/org/apache/drill/exec/vector/accessor/GenericAccessor.java 2
-rw-r--r-- exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/AbstractContainerVector.java 113
-rw-r--r-- exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/AbstractMapVector.java 22
-rw-r--r-- exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/MapVector.java 391
-rw-r--r-- exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/Positionable.java 22
-rw-r--r-- exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/RepeatedListVector.java 407
-rw-r--r-- exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/RepeatedMapVector.java 478
-rw-r--r-- exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/StateTool.java 31
-rw-r--r-- exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/WriteState.java 46
-rw-r--r-- exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/JsonReader.java 231
-rw-r--r-- exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/JsonRecordSplitter.java 27
-rw-r--r-- exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/JsonWriter.java 258
-rw-r--r-- exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/ReaderJSONRecordSplitter.java 178
-rw-r--r-- exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/UTF8JsonRecordSplitter.java 150
-rw-r--r-- exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/impl/AbstractBaseReader.java 63
-rw-r--r-- exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/impl/AbstractBaseWriter.java 72
-rw-r--r-- exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/impl/ComplexWriterImpl.java 164
-rw-r--r-- exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/impl/RepeatedListReaderImpl.java 113
-rw-r--r-- exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/impl/RepeatedMapReaderImpl.java 205
-rw-r--r-- exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/impl/SingleListReaderImpl.java 92
-rw-r--r-- exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/impl/SingleMapReaderImpl.java 154
-rw-r--r-- exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/impl/VectorContainerWriter.java 104
-rw-r--r-- exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/reader/FieldReader.java 29
-rw-r--r-- exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/writer/FieldWriter.java 29
77 files changed, 4902 insertions, 1150 deletions
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/VectorAccessibleSerializable.java b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/VectorAccessibleSerializable.java
index f4a6d7da6..073a8d55f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/VectorAccessibleSerializable.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/VectorAccessibleSerializable.java
@@ -17,26 +17,34 @@
*/
package org.apache.drill.exec.cache;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-import com.codahale.metrics.MetricRegistry;
-import com.codahale.metrics.Timer;
-import com.hazelcast.nio.ObjectDataInput;
-import com.hazelcast.nio.ObjectDataOutput;
import io.netty.buffer.ByteBuf;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.List;
+
import org.apache.drill.common.util.DataInputInputStream;
import org.apache.drill.common.util.DataOutputOutputStream;
import org.apache.drill.exec.expr.TypeHelper;
import org.apache.drill.exec.memory.BufferAllocator;
import org.apache.drill.exec.metrics.DrillMetrics;
import org.apache.drill.exec.proto.UserBitShared;
-import org.apache.drill.exec.record.*;
+import org.apache.drill.exec.proto.UserBitShared.SerializedField;
+import org.apache.drill.exec.record.BatchSchema;
+import org.apache.drill.exec.record.MaterializedField;
+import org.apache.drill.exec.record.VectorAccessible;
+import org.apache.drill.exec.record.VectorContainer;
+import org.apache.drill.exec.record.WritableBatch;
import org.apache.drill.exec.record.selection.SelectionVector2;
import org.apache.drill.exec.vector.ValueVector;
-import org.apache.drill.exec.proto.UserBitShared.FieldMetadata;
-import java.io.*;
-import java.util.List;
+import com.codahale.metrics.MetricRegistry;
+import com.codahale.metrics.Timer;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.hazelcast.nio.ObjectDataInput;
+import com.hazelcast.nio.ObjectDataOutput;
/**
* A wrapper around a VectorAccessible. Will serialize a VectorAccessible and write to an OutputStream, or can read
@@ -109,10 +117,10 @@ public class VectorAccessibleSerializable implements DrillSerializable {
svMode = BatchSchema.SelectionVectorMode.TWO_BYTE;
}
List<ValueVector> vectorList = Lists.newArrayList();
- List<FieldMetadata> fieldList = batchDef.getFieldList();
- for (FieldMetadata metaData : fieldList) {
+ List<SerializedField> fieldList = batchDef.getFieldList();
+ for (SerializedField metaData : fieldList) {
int dataLength = metaData.getBufferLength();
- MaterializedField field = MaterializedField.create(metaData.getDef());
+ MaterializedField field = MaterializedField.create(metaData);
ByteBuf buf = allocator.buffer(dataLength);
buf.writeBytes(input, dataLength);
ValueVector vector = TypeHelper.getNewVector(field, allocator);
@@ -135,7 +143,7 @@ public class VectorAccessibleSerializable implements DrillSerializable {
retain = true;
writeToStream(output);
}
-
+
/**
* Serializes the VectorAccessible va and writes it to an output stream
@@ -203,7 +211,7 @@ public class VectorAccessibleSerializable implements DrillSerializable {
public VectorAccessible get() {
return va;
}
-
+
public SelectionVector2 getSv2() {
return sv2;
}
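
The hunks above switch batch deserialization from FieldMetadata to the SerializedField message and read each vector back as a length-prefixed buffer. The general pattern is sketched below in plain Java streams; this is not Drill's actual wire format (field metadata there comes from the SerializedField protobuf), just the length-prefixed layout the reader walks:

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class LengthPrefixedFields {
  // Write each field as (length, bytes): one sized buffer per field,
  // analogous to one buffer per SerializedField above.
  static void write(DataOutputStream out, List<byte[]> fields) throws IOException {
    out.writeInt(fields.size());
    for (byte[] field : fields) {
      out.writeInt(field.length);
      out.write(field);
    }
  }

  static List<byte[]> read(DataInputStream in) throws IOException {
    int count = in.readInt();
    List<byte[]> fields = new ArrayList<>();
    for (int i = 0; i < count; i++) {
      byte[] buf = new byte[in.readInt()];
      in.readFully(buf);
      fields.add(buf);
    }
    return fields;
  }

  public static void main(String[] args) throws IOException {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    write(new DataOutputStream(bytes),
        Arrays.asList(new byte[] {1, 2}, new byte[] {3}));
    System.out.println(read(new DataInputStream(
        new ByteArrayInputStream(bytes.toByteArray()))).size()); // 2
  }
}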
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/ClassGenerator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/ClassGenerator.java
index 2280aecaf..fc4855236 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/ClassGenerator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/ClassGenerator.java
@@ -20,7 +20,6 @@ package org.apache.drill.exec.expr;
import static org.apache.drill.exec.compile.sig.GeneratorMapping.GM;
import java.lang.reflect.Modifier;
-import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
@@ -41,6 +40,7 @@ import org.apache.drill.exec.record.TypedFieldId;
import com.beust.jcommander.internal.Lists;
import com.beust.jcommander.internal.Maps;
import com.google.common.base.Preconditions;
+import com.sun.codemodel.JArray;
import com.sun.codemodel.JBlock;
import com.sun.codemodel.JClass;
import com.sun.codemodel.JClassAlreadyExistsException;
@@ -49,20 +49,21 @@ import com.sun.codemodel.JDefinedClass;
import com.sun.codemodel.JExpr;
import com.sun.codemodel.JExpression;
import com.sun.codemodel.JFieldRef;
+import com.sun.codemodel.JInvocation;
import com.sun.codemodel.JMethod;
import com.sun.codemodel.JMod;
import com.sun.codemodel.JType;
import com.sun.codemodel.JVar;
public class ClassGenerator<T>{
-
+
public static final GeneratorMapping DEFAULT_SCALAR_MAP = GM("doSetup", "doEval", null, null);
public static final GeneratorMapping DEFAULT_CONSTANT_MAP = GM("doSetup", "doSetup", null, null);
-
-
+
+
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ClassGenerator.class);
public static enum BlockType {SETUP, EVAL, RESET, CLEANUP};
-
+
private final SignatureHolder sig;
private final EvaluationVisitor evaluationVisitor;
private final Map<ValueVectorSetup, JVar> vvDeclaration = Maps.newHashMap();
@@ -74,7 +75,7 @@ public class ClassGenerator<T>{
public final JDefinedClass clazz;
private final LinkedList<JBlock>[] blocks;
private final JCodeModel model;
-
+
private int index = 0;
private MappingSet mappings;
@@ -82,7 +83,7 @@ public class ClassGenerator<T>{
return new MappingSet("inIndex", "outIndex", DEFAULT_CONSTANT_MAP, DEFAULT_SCALAR_MAP);
}
-
+
@SuppressWarnings("unchecked")
ClassGenerator(CodeGenerator<T> codeGenerator, MappingSet mappingSet, SignatureHolder signature, EvaluationVisitor eval, JDefinedClass clazz, JCodeModel model) throws JClassAlreadyExistsException {
this.codeGenerator = codeGenerator;
@@ -96,7 +97,7 @@ public class ClassGenerator<T>{
blocks[i] = Lists.newLinkedList();
}
rotateBlock();
-
+
for(SignatureHolder child : signature.getChildHolders()){
String innerClassName = child.getSignatureClass().getSimpleName();
JDefinedClass innerClazz = clazz._class(Modifier.FINAL + Modifier.PRIVATE, innerClassName);
@@ -109,15 +110,15 @@ public class ClassGenerator<T>{
Preconditions.checkNotNull(inner);
return inner;
}
-
+
public MappingSet getMappingSet(){
return mappings;
}
-
+
public void setMappingSet(MappingSet mappings){
this.mappings = mappings;
}
-
+
public CodeGenerator<T> getCodeGenerator() {
return codeGenerator;
}
@@ -125,17 +126,17 @@ public class ClassGenerator<T>{
private GeneratorMapping getCurrentMapping(){
return mappings.getCurrentMapping();
}
-
+
public JBlock getBlock(String methodName){
JBlock blk = this.blocks[sig.get(methodName)].getLast();
Preconditions.checkNotNull(blk, "Requested method name of %s was not available for signature %s.", methodName, this.sig);
return blk;
}
-
+
public JBlock getBlock(BlockType type){
- return getBlock(getCurrentMapping().getMethodName(type));
+ return getBlock(getCurrentMapping().getMethodName(type));
}
-
+
public JBlock getSetupBlock(){
return getBlock(getCurrentMapping().getMethodName(BlockType.SETUP));
}
@@ -148,17 +149,17 @@ public class ClassGenerator<T>{
public JBlock getCleanupBlock(){
return getBlock(getCurrentMapping().getMethodName(BlockType.CLEANUP));
}
-
+
public JVar declareVectorValueSetupAndMember(String batchName, TypedFieldId fieldId){
return declareVectorValueSetupAndMember( DirectExpression.direct(batchName), fieldId);
}
public JVar declareVectorValueSetupAndMember(DirectExpression batchName, TypedFieldId fieldId){
final ValueVectorSetup setup = new ValueVectorSetup(batchName, fieldId);
- JVar var = this.vvDeclaration.get(setup);
- if(var != null) return var;
-
- Class<?> valueVectorClass = TypeHelper.getValueVectorClass(fieldId.getType().getMinorType(), fieldId.getType().getMode());
+// JVar var = this.vvDeclaration.get(setup);
+// if(var != null) return var;
+
+ Class<?> valueVectorClass = fieldId.getIntermediateClass();
JClass vvClass = model.ref(valueVectorClass);
JClass retClass = vvClass;
String vectorAccess = "getValueVector";
@@ -166,48 +167,56 @@ public class ClassGenerator<T>{
retClass = retClass.array();
vectorAccess = "getValueVectors";
}
-
+
JVar vv = declareClassField("vv", retClass);
JClass t = model.ref(SchemaChangeException.class);
JType objClass = model.ref(Object.class);
JBlock b = getSetupBlock();
+ //JExpr.newArray(model.INT).
+
+ JVar fieldArr = b.decl(model.INT.array(), "fieldIds" + index++, JExpr.newArray(model.INT, fieldId.getFieldIds().length));
+ int[] fieldIndices = fieldId.getFieldIds();
+ for(int i = 0; i < fieldIndices.length; i++){
+ b.assign(fieldArr.component(JExpr.lit(i)), JExpr.lit(fieldIndices[i]));
+ }
+
+ JInvocation invoke = batchName
+ .invoke("getValueAccessorById") //
+ .arg( vvClass.dotclass())
+ .arg(fieldArr);
+
JVar obj = b.decl( //
objClass, //
- getNextVar("tmp"), //
- batchName
- .invoke("getValueAccessorById") //
- .arg(JExpr.lit(fieldId.getFieldId())) //
- .arg( vvClass.dotclass())
- .invoke(vectorAccess)//
- );
-
-
+ getNextVar("tmp"), //
+ invoke.invoke(vectorAccess));
+
+
b._if(obj.eq(JExpr._null()))._then()._throw(JExpr._new(t).arg(JExpr.lit(String.format("Failure while loading vector %s with id: %s.", vv.name(), fieldId.toString()))));
//b.assign(vv, JExpr.cast(retClass, ((JExpression) JExpr.cast(wrapperClass, obj) ).invoke(vectorAccess)));
b.assign(vv, JExpr.cast(retClass, obj ));
vvDeclaration.put(setup, vv);
-
+
return vv;
}
public HoldingContainer addExpr(LogicalExpression ex){
return addExpr(ex, true);
}
-
+
public HoldingContainer addExpr(LogicalExpression ex, boolean rotate){
// logger.debug("Adding next write {}", ex);
if(rotate) rotateBlock();
return evaluationVisitor.addExpr(ex, this);
}
-
+
public void rotateBlock(){
for(LinkedList<JBlock> b : blocks){
b.add(new JBlock(true, true));
}
}
-
-
+
+
void flushCode(){
int i =0;
for(CodeGeneratorMethod method : sig){
@@ -219,19 +228,19 @@ public class ClassGenerator<T>{
m._throws(model.ref(c));
}
m._throws(SchemaChangeException.class);
-
+
for(JBlock b : blocks[i++]){
if(!b.isEmpty()) m.body().add(b);
}
-
+
}
-
+
for(ClassGenerator<T> child : innerClasses.values()){
child.flushCode();
}
}
-
-
+
+
public JCodeModel getModel() {
return model;
}
@@ -239,11 +248,11 @@ public class ClassGenerator<T>{
public String getNextVar() {
return "v" + index++;
}
-
+
public String getNextVar(String prefix){
return prefix + index++;
}
-
+
public JVar declareClassField(String prefix, JType t){
return clazz.field(JMod.NONE, t, prefix + index++);
}
@@ -251,11 +260,11 @@ public class ClassGenerator<T>{
public JVar declareClassField(String prefix, JType t, JExpression init){
return clazz.field(JMod.NONE, t, prefix + index++, init);
}
-
+
public HoldingContainer declare(MajorType t){
return declare(t, true);
}
-
+
public HoldingContainer declare(MajorType t, boolean includeNewInstance){
JType holderType = getHolderType(t);
JVar var;
@@ -266,12 +275,12 @@ public class ClassGenerator<T>{
}
JFieldRef outputSet = null;
if(t.getMode() == DataMode.OPTIONAL){
- outputSet = var.ref("isSet");
+ outputSet = var.ref("isSet");
}
index++;
return new HoldingContainer(t, var, var.ref("value"), outputSet);
}
-
+
public List<TypedFieldId> getWorkspaceTypes() {
return this.workspaceTypes;
}
@@ -283,7 +292,7 @@ public class ClassGenerator<T>{
private static class ValueVectorSetup{
final DirectExpression batch;
final TypedFieldId fieldId;
-
+
public ValueVectorSetup(DirectExpression batch, TypedFieldId fieldId) {
super();
this.batch = batch;
@@ -321,35 +330,45 @@ public class ClassGenerator<T>{
return true;
}
-
+
}
-
-
+
+
public static class HoldingContainer{
private final JVar holder;
private final JFieldRef value;
private final JFieldRef isSet;
private final MajorType type;
private boolean isConstant;
-
+ private final boolean singularRepeated;
+
public HoldingContainer(MajorType t, JVar holder, JFieldRef value, JFieldRef isSet) {
+ this(t, holder, value, isSet, false);
+ }
+
+ public HoldingContainer(MajorType t, JVar holder, JFieldRef value, JFieldRef isSet, boolean singularRepeated) {
super();
this.holder = holder;
this.value = value;
this.isSet = isSet;
this.type = t;
this.isConstant = false;
+ this.singularRepeated = singularRepeated;
}
-
+
+ public boolean isSingularRepeated(){
+ return singularRepeated;
+ }
+
public HoldingContainer setConstant(boolean isConstant) {
this.isConstant = isConstant;
return this;
}
-
+
public boolean isConstant() {
return this.isConstant;
}
-
+
public JVar getHolder() {
return holder;
}
@@ -357,7 +376,7 @@ public class ClassGenerator<T>{
public JFieldRef getValue() {
return value;
}
-
+
public MajorType getMajorType(){
return type;
}
@@ -366,11 +385,11 @@ public class ClassGenerator<T>{
Preconditions.checkNotNull(isSet, "You cannot access the isSet variable when operating on a non-nullable output value.");
return isSet;
}
-
+
public boolean isOptional(){
return type.getMode() == DataMode.OPTIONAL;
}
-
+
public boolean isRepeated(){
return type.getMode() == DataMode.REPEATED;
}
@@ -379,7 +398,7 @@ public class ClassGenerator<T>{
return type.getMinorType();
}
}
-
+
public JType getHolderType(MajorType t){
return TypeHelper.getHolderType(model, t.getMinorType(), t.getMode());
}
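
The generated setup code above replaces the old single-int vector lookup with an int[] of field ids passed to getValueAccessorById(Class, int...), so a value vector nested inside map vectors can be located level by level. A minimal standalone sketch of that addressing scheme follows, using a hypothetical Node class rather than Drill's container API:

import java.util.ArrayList;
import java.util.List;

public class FieldIdPath {
  static class Node {
    final String name;
    final List<Node> children = new ArrayList<>();
    Node(String name) { this.name = name; }
  }

  // Each id in the path selects a child one nesting level deeper, mirroring
  // how generated code now locates a vector from fieldId.getFieldIds().
  static Node resolve(Node root, int... ids) {
    Node current = root;
    for (int id : ids) {
      current = current.children.get(id);
    }
    return current;
  }

  public static void main(String[] args) {
    Node batch = new Node("batch");
    Node map = new Node("m");
    map.children.add(new Node("m.x"));
    batch.children.add(map);
    System.out.println(resolve(batch, 0, 0).name); // m.x
  }
}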
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/EvaluationVisitor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/EvaluationVisitor.java
index dce070f46..d700bf3f0 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/EvaluationVisitor.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/EvaluationVisitor.java
@@ -20,8 +20,6 @@ package org.apache.drill.exec.expr;
import java.util.List;
import java.util.Set;
-import io.netty.buffer.ByteBuf;
-import com.google.common.collect.Lists;
import org.apache.drill.common.expression.CastExpression;
import org.apache.drill.common.expression.ConvertExpression;
import org.apache.drill.common.expression.FunctionCall;
@@ -29,25 +27,24 @@ import org.apache.drill.common.expression.FunctionHolderExpression;
import org.apache.drill.common.expression.IfExpression;
import org.apache.drill.common.expression.IfExpression.IfCondition;
import org.apache.drill.common.expression.LogicalExpression;
+import org.apache.drill.common.expression.PathSegment;
import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.common.expression.TypedNullConstant;
-import org.apache.drill.common.expression.ValueExpressions;
import org.apache.drill.common.expression.ValueExpressions.BooleanExpression;
-import org.apache.drill.common.expression.ValueExpressions.DoubleExpression;
-import org.apache.drill.common.expression.ValueExpressions.LongExpression;
-import org.apache.drill.common.expression.ValueExpressions.IntExpression;
import org.apache.drill.common.expression.ValueExpressions.DateExpression;
-import org.apache.drill.common.expression.ValueExpressions.IntervalYearExpression;
-import org.apache.drill.common.expression.ValueExpressions.IntervalDayExpression;
-import org.apache.drill.common.expression.ValueExpressions.TimeStampExpression;
-import org.apache.drill.common.expression.ValueExpressions.TimeExpression;
-import org.apache.drill.common.expression.ValueExpressions.Decimal9Expression;
import org.apache.drill.common.expression.ValueExpressions.Decimal18Expression;
import org.apache.drill.common.expression.ValueExpressions.Decimal28Expression;
import org.apache.drill.common.expression.ValueExpressions.Decimal38Expression;
+import org.apache.drill.common.expression.ValueExpressions.Decimal9Expression;
+import org.apache.drill.common.expression.ValueExpressions.DoubleExpression;
+import org.apache.drill.common.expression.ValueExpressions.IntExpression;
+import org.apache.drill.common.expression.ValueExpressions.IntervalDayExpression;
+import org.apache.drill.common.expression.ValueExpressions.IntervalYearExpression;
+import org.apache.drill.common.expression.ValueExpressions.LongExpression;
import org.apache.drill.common.expression.ValueExpressions.QuotedString;
+import org.apache.drill.common.expression.ValueExpressions.TimeExpression;
+import org.apache.drill.common.expression.ValueExpressions.TimeStampExpression;
import org.apache.drill.common.expression.visitors.AbstractExprVisitor;
-import org.apache.drill.common.types.TypeProtos;
import org.apache.drill.common.types.TypeProtos.MajorType;
import org.apache.drill.common.types.TypeProtos.MinorType;
import org.apache.drill.common.types.Types;
@@ -60,6 +57,8 @@ import org.apache.drill.exec.expr.fn.HiveFuncHolder;
import org.apache.drill.exec.physical.impl.filter.ReturnValueExpression;
import org.apache.drill.exec.record.NullExpression;
import org.apache.drill.exec.vector.ValueHolderHelper;
+import org.apache.drill.exec.vector.complex.reader.FieldReader;
+import org.apache.drill.exec.vector.complex.writer.FieldWriter;
import com.google.common.collect.Lists;
import com.sun.codemodel.JBlock;
@@ -68,6 +67,7 @@ import com.sun.codemodel.JConditional;
import com.sun.codemodel.JExpr;
import com.sun.codemodel.JExpression;
import com.sun.codemodel.JInvocation;
+import com.sun.codemodel.JLabel;
import com.sun.codemodel.JType;
import com.sun.codemodel.JVar;
@@ -88,7 +88,7 @@ public class EvaluationVisitor {
private class EvalVisitor extends AbstractExprVisitor<HoldingContainer, ClassGenerator<?>, RuntimeException> {
-
+
@Override
public HoldingContainer visitFunctionCall(FunctionCall call, ClassGenerator<?> generator) throws RuntimeException {
throw new UnsupportedOperationException("FunctionCall is not expected here. "+
@@ -143,18 +143,18 @@ public class EvaluationVisitor {
JConditional jc = null;
JBlock conditionalBlock = new JBlock(false, false);
for (IfCondition c : ifExpr.conditions) {
- HoldingContainer HoldingContainer = c.condition.accept(this, generator);
+ HoldingContainer holdingContainer = c.condition.accept(this, generator);
if (jc == null) {
- if (HoldingContainer.isOptional()) {
- jc = conditionalBlock._if(HoldingContainer.getIsSet().cand(HoldingContainer.getValue()));
+ if (holdingContainer.isOptional()) {
+ jc = conditionalBlock._if(holdingContainer.getIsSet().cand(holdingContainer.getValue()));
} else {
- jc = conditionalBlock._if(HoldingContainer.getValue().eq(JExpr.lit(1)));
+ jc = conditionalBlock._if(holdingContainer.getValue().eq(JExpr.lit(1)));
}
} else {
- if (HoldingContainer.isOptional()) {
- jc = jc._else()._if(HoldingContainer.getIsSet().cand(HoldingContainer.getValue()));
+ if (holdingContainer.isOptional()) {
+ jc = jc._else()._if(holdingContainer.getIsSet().cand(holdingContainer.getValue()));
} else {
- jc = jc._else()._if(HoldingContainer.getValue());
+ jc = jc._else()._if(holdingContainer.getValue());
}
}
@@ -184,14 +184,14 @@ public class EvaluationVisitor {
return output;
}
-
+
@Override
public HoldingContainer visitSchemaPath(SchemaPath path, ClassGenerator<?> generator) throws RuntimeException {
throw new UnsupportedOperationException("All schema paths should have been replaced with ValueVectorExpressions.");
}
@Override
- public HoldingContainer visitLongConstant(LongExpression e, ClassGenerator<?> generator) throws RuntimeException {
+ public HoldingContainer visitLongConstant(LongExpression e, ClassGenerator<?> generator) throws RuntimeException {
HoldingContainer out = generator.declare(e.getMajorType());
generator.getEvalBlock().assign(out.getValue(), JExpr.lit(e.getLong()));
return out;
@@ -269,81 +269,180 @@ public class EvaluationVisitor {
private HoldingContainer visitValueVectorWriteExpression(ValueVectorWriteExpression e, ClassGenerator<?> generator) {
- LogicalExpression child = e.getChild();
- HoldingContainer inputContainer = child.accept(this, generator);
+ final LogicalExpression child = e.getChild();
+ final HoldingContainer inputContainer = child.accept(this, generator);
+ final boolean complex = Types.isComplex(inputContainer.getMajorType());
+
JBlock block = generator.getEvalBlock();
JExpression outIndex = generator.getMappingSet().getValueWriteIndex();
JVar vv = generator.declareVectorValueSetupAndMember(generator.getMappingSet().getOutgoing(), e.getFieldId());
- String setMethod = e.isSafe() ? "setSafe" : "set";
-
- JInvocation setMeth;
- if (Types.usesHolderForGet(inputContainer.getMajorType())) {
- setMeth = vv.invoke("getMutator").invoke(setMethod).arg(outIndex).arg(inputContainer.getHolder());
- }else{
- setMeth = vv.invoke("getMutator").invoke(setMethod).arg(outIndex).arg(inputContainer.getValue());
- }
-
- if(e.isSafe()){
- HoldingContainer outputContainer = generator.declare(Types.REQUIRED_BIT);
- block.assign(outputContainer.getValue(), JExpr.lit(1));
- if(inputContainer.isOptional()){
-// block._if(vv.invoke("getMutator").invoke(setMethod).arg(outIndex).not())._then().assign(outputContainer.getValue(), JExpr.lit(0));
- JConditional jc = block._if(inputContainer.getIsSet().eq(JExpr.lit(0)).not());
- block = jc._then();
+
+ if(complex){
+ JType writerImpl = generator.getModel()._ref(TypeHelper.getWriterImpl(inputContainer.getMinorType(), inputContainer.getMajorType().getMode()));
+ JType writerIFace = generator.getModel()._ref(TypeHelper.getWriterInterface(inputContainer.getMinorType(), inputContainer.getMajorType().getMode()));
+ JVar writer = generator.declareClassField("writer", writerIFace);
+ generator.getSetupBlock().assign(writer, JExpr._new(writerImpl).arg(vv).arg(JExpr._null()));
+ generator.getEvalBlock().add(writer.invoke("setPosition").arg(outIndex));
+ String copyMethod = inputContainer.isSingularRepeated() ? "copyAsValueSingle" : "copyAsValue";
+ generator.getEvalBlock().add(inputContainer.getHolder().invoke(copyMethod).arg(writer));
+ if(e.isSafe()){
+ HoldingContainer outputContainer = generator.declare(Types.REQUIRED_BIT);
+ JConditional ifOut = generator.getEvalBlock()._if(writer.invoke("ok"));
+ ifOut._then().assign(outputContainer.getValue(), JExpr.lit(1));
+ ifOut._else().assign(outputContainer.getValue(), JExpr.lit(0));
+ return outputContainer;
}
- block._if(setMeth.not())._then().assign(outputContainer.getValue(), JExpr.lit(0));
- return outputContainer;
}else{
- if (inputContainer.isOptional()) {
-// block.add(vv.invoke("getMutator").invoke(setMethod).arg(outIndex));
- JConditional jc = block._if(inputContainer.getIsSet().eq(JExpr.lit(0)).not());
- block = jc._then();
+ String setMethod = e.isSafe() ? "setSafe" : "set";
+
+ JInvocation setMeth;
+ if (Types.usesHolderForGet(inputContainer.getMajorType())) {
+ setMeth = vv.invoke("getMutator").invoke(setMethod).arg(outIndex).arg(inputContainer.getHolder());
+ }else{
+ setMeth = vv.invoke("getMutator").invoke(setMethod).arg(outIndex).arg(inputContainer.getValue());
+ }
+
+ if(e.isSafe()){
+ HoldingContainer outputContainer = generator.declare(Types.REQUIRED_BIT);
+ block.assign(outputContainer.getValue(), JExpr.lit(1));
+ if(inputContainer.isOptional()){
+// block._if(vv.invoke("getMutator").invoke(setMethod).arg(outIndex).not())._then().assign(outputContainer.getValue(), JExpr.lit(0));
+ JConditional jc = block._if(inputContainer.getIsSet().eq(JExpr.lit(0)).not());
+ block = jc._then();
+ }
+ block._if(setMeth.not())._then().assign(outputContainer.getValue(), JExpr.lit(0));
+ return outputContainer;
+ }else{
+ if (inputContainer.isOptional()) {
+// block.add(vv.invoke("getMutator").invoke(setMethod).arg(outIndex));
+ JConditional jc = block._if(inputContainer.getIsSet().eq(JExpr.lit(0)).not());
+ block = jc._then();
+ }
+ block.add(setMeth);
}
- block.add(setMeth);
+
}
-
+
+
return null;
}
private HoldingContainer visitValueVectorReadExpression(ValueVectorReadExpression e, ClassGenerator<?> generator)
throws RuntimeException {
// declare value vector
-
- JVar vv1 = generator.declareVectorValueSetupAndMember(generator.getMappingSet().getIncoming(), e.getFieldId());
+
+ JExpression vv1 = generator.declareVectorValueSetupAndMember(generator.getMappingSet().getIncoming(), e.getFieldId());
JExpression indexVariable = generator.getMappingSet().getValueReadIndex();
- JInvocation getValueAccessor = vv1.invoke("getAccessor").invoke("get");
- JInvocation getValueAccessor2 = vv1.invoke("getAccessor");
+ JExpression componentVariable = indexVariable.shrz(JExpr.lit(16));
if (e.isSuperReader()) {
-
- getValueAccessor = ((JExpression) vv1.component(indexVariable.shrz(JExpr.lit(16)))).invoke("getAccessor").invoke("get");
- getValueAccessor2 = ((JExpression) vv1.component(indexVariable.shrz(JExpr.lit(16)))).invoke("getAccessor");
+ vv1 = ((JExpression) vv1.component(componentVariable));
indexVariable = indexVariable.band(JExpr.lit((int) Character.MAX_VALUE));
}
// evaluation work.
HoldingContainer out = generator.declare(e.getMajorType());
+ final boolean primitive = !Types.usesHolderForGet(e.getMajorType());
+ final boolean hasReadPath = e.hasReadPath();
+ final boolean complex = Types.isComplex(e.getMajorType());
- if (out.isOptional()) {
- JBlock blk = generator.getEvalBlock();
- blk.assign(out.getIsSet(), getValueAccessor2.invoke("isSet").arg(indexVariable));
- JConditional jc = blk._if(out.getIsSet().eq(JExpr.lit(1)));
- if (Types.usesHolderForGet(e.getMajorType())) {
- jc._then().add(getValueAccessor.arg(indexVariable).arg(out.getHolder()));
- } else {
- jc._then().assign(out.getValue(), getValueAccessor.arg(indexVariable));
+ int[] fieldIds = e.getFieldId().getFieldIds();
+ for(int i = 1; i < fieldIds.length; i++){
+
+ }
+
+ if(!hasReadPath && !complex){
+
+ JInvocation getValueAccessor = vv1.invoke("getAccessor").invoke("get");
+ JInvocation getValueAccessor2 = vv1.invoke("getAccessor");
+ JBlock eval = new JBlock();
+
+ if(primitive){
+ eval.assign(out.getValue(), getValueAccessor.arg(indexVariable));
+ }else{
+ eval.add(getValueAccessor.arg(indexVariable).arg(out.getHolder()));
}
- } else {
- if (Types.usesHolderForGet(e.getMajorType())) {
- if (e.isArrayElement()) {
- generator.getEvalBlock().add(getValueAccessor.arg(indexVariable).arg(JExpr.lit(e.getIndex())).arg(out.getHolder()));
- } else {
- generator.getEvalBlock().add(getValueAccessor.arg(indexVariable).arg(out.getHolder()));
+
+ if (out.isOptional()) {
+ JBlock blk = generator.getEvalBlock();
+ blk.assign(out.getIsSet(), getValueAccessor2.invoke("isSet").arg(indexVariable));
+ JConditional jc = blk._if(out.getIsSet().eq(JExpr.lit(1)));
+ jc._then().add(eval);
+ }else{
+ generator.getEvalBlock().add(eval);
+ }
+
+ }else{
+ JExpression vector = e.isSuperReader() ? vv1.component(componentVariable) : vv1;
+ JExpression expr = vector.invoke("getAccessor").invoke("getReader");
+
+ JLabel label = generator.getEvalBlock().label("complex");
+ JBlock eval = generator.getEvalBlock().block();
+
+ // position to the correct value.
+ eval.add(expr.invoke("setPosition").arg(indexVariable));
+ PathSegment seg = e.getReadPath();
+ int listNum = 0;
+ boolean lastWasArray = false;
+ while(true){
+ if(seg.isArray()){
+ lastWasArray = true;
+
+ if(seg.isLastPath() && !complex) break;
+
+ JVar list = generator.declareClassField("list", generator.getModel()._ref(FieldReader.class));
+ generator.getSetupBlock().assign(list, expr);
+ expr = list;
+
+ // if this is an array, set a single position for the expression to allow us to read the right data lower down.
+ JVar desiredIndex = eval.decl(generator.getModel().INT, "desiredIndex" + listNum, JExpr.lit(seg.getArraySegment().getIndex()));
+ // start with negative one so that we are at zero after first call to next.
+ JVar currentIndex = eval.decl(generator.getModel().INT, "currentIndex" + listNum, JExpr.lit(-1));
+
+ eval._while( //
+ currentIndex.lt(desiredIndex) //
+ .cand(expr.invoke("next")) ).body().assign(currentIndex, currentIndex.plus(JExpr.lit(1)));
+
+ JBlock ifNoVal = eval._if(desiredIndex.ne(currentIndex))._then().block();
+ if(!complex) ifNoVal.assign(out.getIsSet(), JExpr.lit(0));
+ ifNoVal._break(label);
+
+ listNum++;
+
+ }else{
+ lastWasArray = false;
+ JExpression fieldName = JExpr.lit(seg.getNameSegment().getPath());
+ expr = expr.invoke("reader").arg(fieldName);
+ }
+ seg = seg.getChild();
+
+ // stop once we get to last column or when the segment is an array at the end of the reference.
+ if(seg == null || seg.isLastPath() && seg.isArray()) break;
+ }
+ MajorType secondaryType = e.getFieldId().getSecondaryFinal();
+ JType readerImpl = generator.getModel()._ref(TypeHelper.getReaderClassName(secondaryType.getMinorType(), secondaryType.getMode()));
+ JVar complexReader = generator.declareClassField("reader", readerImpl);
+ generator.getSetupBlock().assign(complexReader, JExpr.cast(readerImpl, expr));
+ expr = complexReader;
+
+ if(complex){
+ HoldingContainer hc = new HoldingContainer(e.getMajorType(), (JVar) expr, null, null, lastWasArray);
+ return hc;
+ //eval.assign(out.getHolder().ref("reader"), expr);
+ }else{
+ if(seg != null){
+ eval.add(expr.invoke("read").arg(JExpr.lit(seg.getArraySegment().getIndex())).arg(out.getHolder()));
+ }else{
+
+ eval.add(expr.invoke("read").arg(out.getHolder()));
}
- } else {
- generator.getEvalBlock().assign(out.getValue(), getValueAccessor.arg(indexVariable));
}
+
}
+
+
+
+
return out;
}
@@ -352,11 +451,11 @@ public class EvaluationVisitor {
// Preconditions.checkArgument(child.getMajorType().equals(Types.REQUIRED_BOOLEAN));
HoldingContainer hc = child.accept(this, generator);
if(e.isReturnTrueOnOne()){
- generator.getEvalBlock()._return(hc.getValue().eq(JExpr.lit(1)));
+ generator.getEvalBlock()._return(hc.getValue().eq(JExpr.lit(1)));
}else{
generator.getEvalBlock()._return(hc.getValue());
}
-
+
return null;
}
@@ -453,6 +552,7 @@ public class EvaluationVisitor {
}
}
+
private class ConstantFilter extends EvalVisitor {
private Set<LogicalExpression> constantBoundaries;
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/ExpressionTreeMaterializer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/ExpressionTreeMaterializer.java
index 95d341b9c..fc7fb6a41 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/ExpressionTreeMaterializer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/ExpressionTreeMaterializer.java
@@ -69,7 +69,7 @@ import com.google.common.collect.Lists;
public class ExpressionTreeMaterializer {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ExpressionTreeMaterializer.class);
-
+
private ExpressionTreeMaterializer() {
};
@@ -239,36 +239,12 @@ public class ExpressionTreeMaterializer {
@Override
public LogicalExpression visitSchemaPath(SchemaPath path, FunctionImplementationRegistry value) {
// logger.debug("Visiting schema path {}", path);
- PathSegment seg = path.getRootSegment();
- List<String> segments = Lists.newArrayList();
- segments.add(seg.getNameSegment().getPath().toString());
- boolean isArrayElement = false;
- int index = -1;
- while((seg = seg.getChild()) != null) {
- if (seg.isNamed()) {
- segments.add(seg.getNameSegment().getPath().toString());
- if (seg.isLastPath()) {
- break;
- }
- } else {
- if (!seg.isLastPath()) {
- throw new UnsupportedOperationException("Repeated map type not supported");
- }
- index = seg.getArraySegment().getIndex();
- isArrayElement = true;
- break;
- }
- }
- SchemaPath newPath = SchemaPath.getCompoundPath((String[]) segments.toArray(new String[0]));
- TypedFieldId tfId = batch.getValueVectorId(newPath);
+ TypedFieldId tfId = batch.getValueVectorId(path);
if (tfId == null) {
logger.warn("Unable to find value vector of path {}, returning null instance.", path);
return NullExpression.INSTANCE;
} else {
- ValueVectorReadExpression e = new ValueVectorReadExpression(tfId, index, isArrayElement);
- if (isArrayElement) {
- e.required();
- }
+ ValueVectorReadExpression e = new ValueVectorReadExpression(tfId);
return e;
}
}
@@ -361,15 +337,15 @@ public class ExpressionTreeMaterializer {
@Override
public LogicalExpression visitCastExpression(CastExpression e, FunctionImplementationRegistry value){
-
+
// if the cast is pointless, remove it.
LogicalExpression input = e.getInput().accept(this, value);
MajorType newMajor = e.getMajorType();
MinorType newMinor = input.getMajorType().getMinorType();
-
+
if(castEqual(e.getPosition(), newMajor, input.getMajorType())) return input; // don't do pointless cast.
-
+
if(newMinor == MinorType.LATE || newMinor == MinorType.NULL){
// if the type still isn't fully bound, leave as cast expression.
return new CastExpression(input, e.getMajorType(), e.getPosition());
@@ -391,10 +367,10 @@ public class ExpressionTreeMaterializer {
newArgs.add(new ValueExpressions.LongExpression(type.getScale(), null));
}
FunctionCall fc = new FunctionCall(castFuncWithType, newArgs, e.getPosition());
- return fc.accept(this, value);
+ return fc.accept(this, value);
}
}
-
+
private boolean castEqual(ExpressionPosition pos, MajorType from, MajorType to){
if(!from.getMinorType().equals(to.getMinorType())) return false;
switch(from.getMinorType()){
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/ValueVectorReadExpression.java b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/ValueVectorReadExpression.java
index 4ba503d70..6e2809aa8 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/ValueVectorReadExpression.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/ValueVectorReadExpression.java
@@ -21,56 +21,43 @@ import java.util.Iterator;
import org.apache.drill.common.expression.ExpressionPosition;
import org.apache.drill.common.expression.LogicalExpression;
+import org.apache.drill.common.expression.PathSegment;
import org.apache.drill.common.expression.visitors.ExprVisitor;
-import org.apache.drill.common.types.TypeProtos;
+import org.apache.drill.common.types.TypeProtos.DataMode;
import org.apache.drill.common.types.TypeProtos.MajorType;
import org.apache.drill.common.types.Types;
import org.apache.drill.exec.record.TypedFieldId;
import com.google.common.collect.Iterators;
-import javax.sound.sampled.FloatControl;
-
public class ValueVectorReadExpression implements LogicalExpression{
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ValueVectorReadExpression.class);
- private MajorType type;
private final TypedFieldId fieldId;
- private final boolean superReader;
- private final int index;
- private final boolean isArrayElement;
-
-
- public ValueVectorReadExpression(TypedFieldId tfId, int index, boolean isArrayElement){
- this.type = tfId.getType();
+
+
+ public ValueVectorReadExpression(TypedFieldId tfId){
this.fieldId = tfId;
- this.superReader = tfId.isHyperReader();
- this.index = index;
- this.isArrayElement = isArrayElement;
}
- public void required() {
- type = Types.required(type.getMinorType());
+ public boolean hasReadPath(){
+ return fieldId.hasRemainder();
}
- public boolean isArrayElement() {
- return isArrayElement;
+ public PathSegment getReadPath(){
+ return fieldId.getRemainder();
}
- public ValueVectorReadExpression(TypedFieldId tfId) {
- this(tfId, -1, false);
- }
-
public TypedFieldId getTypedFieldId(){
return fieldId;
}
-
+
public boolean isSuperReader(){
- return superReader;
+ return fieldId.isHyperReader();
}
@Override
public MajorType getMajorType() {
- return type;
+ return fieldId.getFinalType();
}
@Override
@@ -82,10 +69,6 @@ public class ValueVectorReadExpression implements LogicalExpression{
return fieldId;
}
- public int getIndex() {
- return index;
- }
-
@Override
public ExpressionPosition getPosition() {
return ExpressionPosition.UNKNOWN;
@@ -95,6 +78,6 @@ public class ValueVectorReadExpression implements LogicalExpression{
public Iterator<LogicalExpression> iterator() {
return Iterators.emptyIterator();
}
-
-
+
+
}
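
With this change a read expression delegates all path handling to TypedFieldId: the field ids resolve a vector up front, while any remaining PathSegment (getReadPath) is walked per record through a FieldReader, as in the EvaluationVisitor hunk earlier. A toy sketch of that prefix/remainder split, using illustrative string handling only rather than Drill's PathSegment API:

import java.util.Arrays;

public class PathRemainder {
  public static void main(String[] args) {
    // Suppose only the first segment resolves to a top-level vector and the
    // rest must be read per record through a FieldReader.
    String[] segments = "a.b.c".split("\\.");
    int resolved = 1;
    String[] remainder = Arrays.copyOfRange(segments, resolved, segments.length);
    System.out.println("vector: " + segments[0]
        + ", read path: " + String.join(".", remainder)); // vector: a, read path: b.c
  }
}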
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/HashFunctions.java.orig b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/HashFunctions.java.orig
new file mode 100644
index 000000000..7eeb7308e
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/HashFunctions.java.orig
@@ -0,0 +1,252 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.expr.fn.impl;
+
+import org.apache.drill.exec.expr.DrillSimpleFunc;
+import org.apache.drill.exec.expr.annotations.FunctionTemplate;
+import org.apache.drill.exec.expr.annotations.FunctionTemplate.FunctionScope;
+import org.apache.drill.exec.expr.annotations.Output;
+import org.apache.drill.exec.expr.annotations.Param;
+import org.apache.drill.exec.expr.holders.*;
+import org.apache.drill.exec.record.RecordBatch;
+
+public class HashFunctions {
+
+ @FunctionTemplate(name = "hash", scope = FunctionScope.SIMPLE, nulls = FunctionTemplate.NullHandling.INTERNAL )
+ public static class NullableFloatHash implements DrillSimpleFunc {
+
+ @Param NullableFloat4Holder in;
+ @Output IntHolder out;
+
+ public void setup(RecordBatch incoming) {
+ }
+
+ public void eval() {
+ if (in.isSet == 0)
+ out.value = 0;
+ else
+ out.value = com.google.common.hash.Hashing.murmur3_128().hashInt(Float.floatToIntBits(in.value)).asInt();
+ }
+ }
+
+ @FunctionTemplate(name = "hash", scope = FunctionScope.SIMPLE, nulls = FunctionTemplate.NullHandling.INTERNAL )
+ public static class FloatHash implements DrillSimpleFunc {
+
+ @Param Float4Holder in;
+ @Output IntHolder out;
+
+ public void setup(RecordBatch incoming) {
+ }
+
+ public void eval() {
+ out.value = com.google.common.hash.Hashing.murmur3_128().hashInt(Float.floatToIntBits(in.value)).asInt();
+ }
+ }
+
+ @FunctionTemplate(name = "hash", scope = FunctionScope.SIMPLE, nulls = FunctionTemplate.NullHandling.INTERNAL )
+ public static class NullableDoubleHash implements DrillSimpleFunc {
+
+ @Param NullableFloat8Holder in;
+ @Output IntHolder out;
+
+ public void setup(RecordBatch incoming) {
+ }
+
+ public void eval() {
+ if (in.isSet == 0)
+ out.value = 0;
+ else
+ out.value = com.google.common.hash.Hashing.murmur3_128().hashLong(Double.doubleToLongBits(in.value)).asInt();
+ }
+ }
+
+ @FunctionTemplate(name = "hash", scope = FunctionScope.SIMPLE, nulls = FunctionTemplate.NullHandling.INTERNAL )
+ public static class DoubleHash implements DrillSimpleFunc {
+
+ @Param Float8Holder in;
+ @Output IntHolder out;
+
+ public void setup(RecordBatch incoming) {
+ }
+
+ public void eval() {
+ out.value = com.google.common.hash.Hashing.murmur3_128().hashLong(Double.doubleToLongBits(in.value)).asInt();
+ }
+ }
+
+ @FunctionTemplate(name = "hash", scope = FunctionScope.SIMPLE, nulls = FunctionTemplate.NullHandling.INTERNAL )
+ public static class NullableVarBinaryHash implements DrillSimpleFunc {
+
+ @Param NullableVarBinaryHolder in;
+ @Output IntHolder out;
+
+ public void setup(RecordBatch incoming) {
+ }
+
+ public void eval() {
+ if (in.isSet == 0)
+ out.value = 0;
+ else
+ out.value = org.apache.drill.exec.expr.fn.impl.HashHelper.hash(in.buffer.nioBuffer(in.start, in.end - in.start), 0);
+ }
+ }
+
+ @FunctionTemplate(name = "hash", scope = FunctionScope.SIMPLE, nulls = FunctionTemplate.NullHandling.INTERNAL )
+ public static class NullableVarCharHash implements DrillSimpleFunc {
+
+ @Param NullableVarCharHolder in;
+ @Output IntHolder out;
+
+ public void setup(RecordBatch incoming) {
+ }
+
+ public void eval() {
+ if (in.isSet == 0)
+ out.value = 0;
+ else
+ out.value = org.apache.drill.exec.expr.fn.impl.HashHelper.hash(in.buffer.nioBuffer(in.start, in.end - in.start), 0);
+ }
+ }
+
+ @FunctionTemplate(name = "hash", scope = FunctionScope.SIMPLE, nulls = FunctionTemplate.NullHandling.INTERNAL )
+ public static class NullableVar16CharHash implements DrillSimpleFunc {
+
+ @Param NullableVar16CharHolder in;
+ @Output IntHolder out;
+
+ public void setup(RecordBatch incoming) {
+ }
+
+ public void eval() {
+ if (in.isSet == 0)
+ out.value = 0;
+ else
+ out.value = org.apache.drill.exec.expr.fn.impl.HashHelper.hash(in.buffer.nioBuffer(in.start, in.end - in.start), 0);
+ }
+ }
+
+ @FunctionTemplate(name = "hash", scope = FunctionScope.SIMPLE, nulls = FunctionTemplate.NullHandling.INTERNAL)
+ public static class NullableBigIntHash implements DrillSimpleFunc {
+
+ @Param NullableBigIntHolder in;
+ @Output IntHolder out;
+
+ public void setup(RecordBatch incoming) {
+ }
+
+ public void eval() {
+ // TODO: implement hash function for other types
+ if (in.isSet == 0)
+ out.value = 0;
+ else
+ out.value = com.google.common.hash.Hashing.murmur3_128().hashLong(in.value).asInt();
+ }
+ }
+
+ @FunctionTemplate(name = "hash", scope = FunctionScope.SIMPLE, nulls = FunctionTemplate.NullHandling.INTERNAL)
+ public static class NullableIntHash implements DrillSimpleFunc {
+ @Param NullableIntHolder in;
+ @Output IntHolder out;
+
+ public void setup(RecordBatch incoming) {
+ }
+
+ public void eval() {
+ // TODO: implement hash function for other types
+ if (in.isSet == 0)
+ out.value = 0;
+ else
+ out.value = com.google.common.hash.Hashing.murmur3_128().hashInt(in.value).asInt();
+ }
+ }
+
+ @FunctionTemplate(name = "hash", scope = FunctionScope.SIMPLE, nulls = FunctionTemplate.NullHandling.INTERNAL)
+ public static class VarBinaryHash implements DrillSimpleFunc {
+
+ @Param VarBinaryHolder in;
+ @Output IntHolder out;
+
+ public void setup(RecordBatch incoming) {
+ }
+
+ public void eval() {
+ out.value = org.apache.drill.exec.expr.fn.impl.HashHelper.hash(in.buffer.nioBuffer(in.start, in.end - in.start), 0);
+ }
+ }
+
+ @FunctionTemplate(name = "hash", scope = FunctionScope.SIMPLE, nulls = FunctionTemplate.NullHandling.INTERNAL)
+ public static class VarCharHash implements DrillSimpleFunc {
+
+ @Param VarCharHolder in;
+ @Output IntHolder out;
+
+ public void setup(RecordBatch incoming) {
+ }
+
+ public void eval() {
+ out.value = org.apache.drill.exec.expr.fn.impl.HashHelper.hash(in.buffer.nioBuffer(in.start, in.end - in.start), 0);
+ }
+ }
+
+<<<<<<< HEAD
+ @FunctionTemplate(name = "hash", scope = FunctionScope.SIMPLE, nulls = FunctionTemplate.NullHandling.INTERNAL)
+ public static class Var16CharHash implements DrillSimpleFunc {
+
+ @Param Var16CharHolder in;
+ @Output IntHolder out;
+
+ public void setup(RecordBatch incoming) {
+ }
+
+ public void eval() {
+ out.value = org.apache.drill.exec.expr.fn.impl.HashHelper.hash(in.buffer.nioBuffer(in.start, in.end - in.start), 0);
+ }
+ }
+
+=======
+>>>>>>> 450e9e0... Support Complex Types
+ @FunctionTemplate(name = "hash", scope = FunctionScope.SIMPLE, nulls = FunctionTemplate.NullHandling.INTERNAL)
+ public static class HashBigInt implements DrillSimpleFunc {
+
+ @Param BigIntHolder in;
+ @Output IntHolder out;
+
+ public void setup(RecordBatch incoming) {
+ }
+
+ public void eval() {
+ // TODO: implement hash function for other types
+ out.value = com.google.common.hash.Hashing.murmur3_128().hashLong(in.value).asInt();
+ }
+ }
+
+ @FunctionTemplate(name = "hash", scope = FunctionScope.SIMPLE, nulls = FunctionTemplate.NullHandling.INTERNAL)
+ public static class IntHash implements DrillSimpleFunc {
+ @Param IntHolder in;
+ @Output IntHolder out;
+
+ public void setup(RecordBatch incoming) {
+ }
+
+ public void eval() {
+ // TODO: implement hash function for other types
+ out.value = com.google.common.hash.Hashing.murmur3_128().hashInt(in.value).asInt();
+ }
+ }
+
+}
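
Aside from the leftover conflict markers, the .orig file shows the hashing approach: every implementation delegates to Guava's 128-bit murmur3, with floats and doubles hashed through their IEEE-754 bit patterns and nullable inputs mapped to 0 when isSet == 0. The same Guava calls can be exercised standalone:

import com.google.common.hash.Hashing;

public class MurmurDemo {
  public static void main(String[] args) {
    // Hash the raw bit patterns, so equal values produce equal hashes.
    int fromFloat = Hashing.murmur3_128()
        .hashInt(Float.floatToIntBits(1.5f)).asInt();
    int fromDouble = Hashing.murmur3_128()
        .hashLong(Double.doubleToLongBits(1.5d)).asInt();
    System.out.println(fromFloat + " " + fromDouble);
  }
}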
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/holders/ComplexHolder.java b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/holders/ComplexHolder.java
new file mode 100644
index 000000000..e1025dfb8
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/holders/ComplexHolder.java
@@ -0,0 +1,25 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.expr.holders;
+
+import org.apache.drill.exec.vector.complex.reader.FieldReader;
+
+public class ComplexHolder implements ValueHolder {
+ public FieldReader reader;
+ public int isSet;
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/holders/RepeatedListHolder.java b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/holders/RepeatedListHolder.java
new file mode 100644
index 000000000..09746da94
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/holders/RepeatedListHolder.java
@@ -0,0 +1,23 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.expr.holders;
+
+public final class RepeatedListHolder implements ValueHolder{
+ public int start;
+ public int end;
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/holders/RepeatedMapHolder.java b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/holders/RepeatedMapHolder.java
new file mode 100644
index 000000000..247f75e02
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/holders/RepeatedMapHolder.java
@@ -0,0 +1,23 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.expr.holders;
+
+public final class RepeatedMapHolder implements ValueHolder{
+ public int start;
+ public int end;
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/memory/OutOfMemoryRuntimeException.java b/exec/java-exec/src/main/java/org/apache/drill/exec/memory/OutOfMemoryRuntimeException.java
new file mode 100644
index 000000000..305eabd92
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/memory/OutOfMemoryRuntimeException.java
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.memory;
+
+import org.apache.drill.common.exceptions.DrillRuntimeException;
+
+public class OutOfMemoryRuntimeException extends DrillRuntimeException {
+
+ public OutOfMemoryRuntimeException() {
+ super();
+
+ }
+
+ public OutOfMemoryRuntimeException(String message, Throwable cause, boolean enableSuppression,
+ boolean writableStackTrace) {
+ super(message, cause, enableSuppression, writableStackTrace);
+
+ }
+
+ public OutOfMemoryRuntimeException(String message, Throwable cause) {
+ super(message, cause);
+
+ }
+
+ public OutOfMemoryRuntimeException(String message) {
+ super(message);
+
+ }
+
+ public OutOfMemoryRuntimeException(Throwable cause) {
+ super(cause);
+
+ }
+}
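Being a DrillRuntimeException subtype, this exception lets allocation failures propagate through interfaces that cannot declare the checked OutOfMemoryException. A minimal usage sketch; the null-on-failure convention of the allocation call is an assumption for illustration, not something the patch defines:

    ByteBuf buf = allocator.buffer(size);   // assumed allocation call returning null on failure
    if (buf == null) {
      throw new OutOfMemoryRuntimeException("Unable to allocate " + size + " bytes.");
    }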
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
index 73ed72322..a49d1a8f0 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
@@ -204,8 +204,8 @@ public class ScanBatch implements RecordBatch {
}
@Override
- public VectorWrapper<?> getValueAccessorById(int fieldId, Class<?> clazz) {
- return container.getValueAccessorById(fieldId, clazz);
+ public VectorWrapper<?> getValueAccessorById(Class<?> clazz, int... ids) {
+ return container.getValueAccessorById(clazz, ids);
}
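This signature change recurs throughout the patch: the single field id becomes a varargs id path, so vectors nested inside maps can be addressed directly. A sketch of the two call shapes, with illustrative ids and vector classes:

    VectorWrapper<?> top    = batch.getValueAccessorById(IntVector.class, 3);        // top-level field 3
    VectorWrapper<?> nested = batch.getValueAccessorById(VarCharVector.class, 3, 1); // child 1 inside map field 3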
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/PriorityQueueTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/PriorityQueueTemplate.java
index e0e7e518f..fc8c430e9 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/PriorityQueueTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/PriorityQueueTemplate.java
@@ -60,8 +60,8 @@ public abstract class PriorityQueueTemplate implements PriorityQueue {
BatchSchema schema = container.getSchema();
VectorContainer newContainer = new VectorContainer();
for (MaterializedField field : schema) {
- int id = container.getValueVectorId(field.getAsSchemaPath()).getFieldId();
- newContainer.add(container.getValueAccessorById(id, field.getValueClass()).getValueVectors());
+ int[] ids = container.getValueVectorId(field.getPath()).getFieldIds();
+ newContainer.add(container.getValueAccessorById(field.getValueClass(), ids).getValueVectors());
}
newContainer.buildSchema(BatchSchema.SelectionVectorMode.FOUR_BYTE);
this.hyperBatch = new ExpandableHyperContainer(newContainer);
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java
index 2a57aaa34..1c1a6d260 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java
@@ -104,8 +104,8 @@ public class TopNBatch extends AbstractRecordBatch<TopN> {
return sv4;
}
-
-
+
+
@Override
public void cleanup() {
if (sv4 != null) {
@@ -127,8 +127,8 @@ public class TopNBatch extends AbstractRecordBatch<TopN> {
return IterOutcome.NONE;
}
}
-
-
+
+
try{
outer: while (true) {
Stopwatch watch = new Stopwatch();
@@ -166,7 +166,7 @@ public class TopNBatch extends AbstractRecordBatch<TopN> {
throw new UnsupportedOperationException();
}
}
-
+
if (schema == null){
// builder may be null at this point if the first incoming batch is empty
return IterOutcome.NONE;
@@ -181,7 +181,7 @@ public class TopNBatch extends AbstractRecordBatch<TopN> {
container.buildSchema(BatchSchema.SelectionVectorMode.FOUR_BYTE);
return IterOutcome.OK_NEW_SCHEMA;
-
+
}catch(SchemaChangeException | ClassTransformationException | IOException ex){
kill();
logger.error("Failure during query", ex);
@@ -239,10 +239,10 @@ public class TopNBatch extends AbstractRecordBatch<TopN> {
CodeGenerator<PriorityQueue> cg = CodeGenerator.get(PriorityQueue.TEMPLATE_DEFINITION, context.getFunctionRegistry());
ClassGenerator<PriorityQueue> g = cg.getRoot();
g.setMappingSet(mainMapping);
-
+
for(Ordering od : orderings){
// first, we rewrite the evaluation stack for each side of the comparison.
- ErrorCollector collector = new ErrorCollectorImpl();
+ ErrorCollector collector = new ErrorCollectorImpl();
final LogicalExpression expr = ExpressionTreeMaterializer.materialize(od.getExpr(), batch, collector, context.getFunctionRegistry());
if(collector.hasErrors()) throw new SchemaChangeException("Failure while materializing expression. " + collector.toErrorString());
g.setMappingSet(leftMapping);
@@ -250,26 +250,26 @@ public class TopNBatch extends AbstractRecordBatch<TopN> {
g.setMappingSet(rightMapping);
HoldingContainer right = g.addExpr(expr, false);
g.setMappingSet(mainMapping);
-
+
// next we wrap the two comparison sides and add the expression block for the comparison.
LogicalExpression fh = FunctionGenerationHelper.getComparator(left, right, context.getFunctionRegistry());
HoldingContainer out = g.addExpr(fh, false);
JConditional jc = g.getEvalBlock()._if(out.getValue().ne(JExpr.lit(0)));
-
+
if(od.getDirection() == Direction.ASCENDING){
jc._then()._return(out.getValue());
}else{
jc._then()._return(out.getValue().minus());
}
}
-
+
g.getEvalBlock()._return(JExpr.lit(0));
PriorityQueue q = context.getImplementationClass(cg);
q.init(config.getLimit(), context, oContext.getAllocator(), schema.getSelectionVectorMode() == BatchSchema.SelectionVectorMode.TWO_BYTE);
return q;
}
-
+
@Override
public WritableBatch getWritableBatch() {
throw new UnsupportedOperationException("A sort batch is not writable.");
@@ -332,8 +332,8 @@ public class TopNBatch extends AbstractRecordBatch<TopN> {
}
@Override
- public VectorWrapper<?> getValueAccessorById(int fieldId, Class<?> clazz) {
- return container.getValueAccessorById(fieldId, clazz);
+ public VectorWrapper<?> getValueAccessorById(Class<?> clazz, int... ids) {
+ return container.getValueAccessorById(clazz, ids);
}
@Override
@@ -355,6 +355,6 @@ public class TopNBatch extends AbstractRecordBatch<TopN> {
return container.iterator();
}
}
-
+
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WireRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WireRecordBatch.java
index 65669b12f..cf3d75e2b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WireRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WireRecordBatch.java
@@ -37,7 +37,7 @@ public class WireRecordBatch implements RecordBatch {
private FragmentContext context;
private BatchSchema schema;
-
+
public WireRecordBatch(FragmentContext context, RawFragmentBatchProvider fragProvider) throws OutOfMemoryException {
this.fragProvider = fragProvider;
this.context = context;
@@ -83,17 +83,17 @@ public class WireRecordBatch implements RecordBatch {
public TypedFieldId getValueVectorId(SchemaPath path) {
return batchLoader.getValueVectorId(path);
}
-
+
@Override
- public VectorWrapper<?> getValueAccessorById(int fieldId, Class<?> clazz) {
- return batchLoader.getValueAccessorById(fieldId, clazz);
+ public VectorWrapper<?> getValueAccessorById(Class<?> clazz, int... ids) {
+ return batchLoader.getValueAccessorById(clazz, ids);
}
@Override
public IterOutcome next() {
try{
RawFragmentBatch batch = fragProvider.getNext();
-
+
// skip over empty batches. we do this since these are basically control messages.
while(batch != null && !batch.getHeader().getIsOutOfMemory() && batch.getHeader().getDef().getRecordCount() == 0){
batch = fragProvider.getNext();
@@ -107,7 +107,7 @@ public class WireRecordBatch implements RecordBatch {
if (batch.getHeader().getIsOutOfMemory()) {
return IterOutcome.OUT_OF_MEMORY;
}
-
+
// logger.debug("Next received batch {}", batch);
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java
index 5f2605484..039445b96 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java
@@ -62,9 +62,9 @@ public abstract class HashAggTemplate implements HashAggregator {
private static final long ALLOCATOR_INITIAL_RESERVATION = 1*1024*1024;
private static final long ALLOCATOR_MAX_RESERVATION = 20L*1000*1000*1000;
-
+
private static final boolean EXTRA_DEBUG_1 = false;
- private static final boolean EXTRA_DEBUG_2 = false;
+ private static final boolean EXTRA_DEBUG_2 = false;
private static final String TOO_BIG_ERROR = "Couldn't add value to an empty batch. This likely means that a single value is too long for a varlen field.";
private boolean first = true;
private boolean newSchema = false;
@@ -88,7 +88,7 @@ public abstract class HashAggTemplate implements HashAggregator {
List<VectorAllocator> wsAllocators = Lists.newArrayList(); // allocators for the workspace vectors
ErrorCollector collector = new ErrorCollectorImpl();
-
+
private MaterializedField[] materializedValueFields;
private boolean allFlushed = false;
@@ -102,13 +102,13 @@ public abstract class HashAggTemplate implements HashAggregator {
aggrValuesContainer = new VectorContainer();
ValueVector vector ;
-
- for(int i = 0; i < materializedValueFields.length; i++) {
+
+ for(int i = 0; i < materializedValueFields.length; i++) {
MaterializedField outputField = materializedValueFields[i];
// Create a type-specific ValueVector for this value
vector = TypeHelper.getNewVector(outputField, allocator) ;
VectorAllocator.getAllocator(vector, 50 /* avg. width */).alloc(HashTable.BATCH_SIZE) ;
-
+
aggrValuesContainer.add(vector) ;
}
@@ -124,8 +124,8 @@ public abstract class HashAggTemplate implements HashAggregator {
setupInterior(incoming, outgoing, aggrValuesContainer);
}
- private boolean outputValues() {
- for (int i = 0; i <= maxOccupiedIdx; i++) {
+ private boolean outputValues() {
+ for (int i = 0; i <= maxOccupiedIdx; i++) {
if (outputRecordValues(i, outputCount) ) {
if (EXTRA_DEBUG_2) logger.debug("Outputting values to {}", outputCount) ;
outputCount++;
@@ -139,7 +139,7 @@ public abstract class HashAggTemplate implements HashAggregator {
private void clear() {
aggrValuesContainer.clear();
}
-
+
// Code-generated methods (implemented in HashAggBatch)
@RuntimeOverridden
@@ -155,19 +155,19 @@ public abstract class HashAggTemplate implements HashAggregator {
@Override
public void setup(HashAggregate hashAggrConfig, FragmentContext context, BufferAllocator allocator, RecordBatch incoming, RecordBatch outgoing,
- LogicalExpression[] valueExprs,
+ LogicalExpression[] valueExprs,
List<TypedFieldId> valueFieldIds,
TypedFieldId[] groupByOutFieldIds,
- VectorAllocator[] keyAllocators, VectorAllocator[] valueAllocators)
+ VectorAllocator[] keyAllocators, VectorAllocator[] valueAllocators)
throws SchemaChangeException, ClassTransformationException, IOException {
-
+
if (valueExprs == null || valueFieldIds == null) {
throw new IllegalArgumentException("Invalid aggr value exprs or workspace variables.");
}
if (valueFieldIds.size() < valueExprs.length) {
throw new IllegalArgumentException("Wrong number of workspace variables.");
}
-
+
this.context = context;
this.allocator = allocator;
this.incoming = incoming;
@@ -175,11 +175,11 @@ public abstract class HashAggTemplate implements HashAggregator {
this.keyAllocators = keyAllocators;
this.valueAllocators = valueAllocators;
this.outgoing = outgoing;
-
+
this.hashAggrConfig = hashAggrConfig;
- // currently, hash aggregation is only applicable if there are group-by expressions.
- // For non-grouped (a.k.a Plain) aggregations that don't involve DISTINCT, there is no
+ // Currently, hash aggregation is only applicable if there are group-by expressions.
+ // For non-grouped (a.k.a. Plain) aggregations that don't involve DISTINCT, there is no
// need to create hash table. However, for plain aggregations with DISTINCT ..
// e.g SELECT COUNT(DISTINCT a1) FROM t1 ;
// we need to build a hash table on the aggregation column a1.
@@ -188,14 +188,14 @@ public abstract class HashAggTemplate implements HashAggregator {
throw new IllegalArgumentException("Currently, hash aggregation is only applicable if there are group-by expressions.");
}
- this.htIdxHolder = new IntHolder();
+ this.htIdxHolder = new IntHolder();
materializedValueFields = new MaterializedField[valueFieldIds.size()];
if (valueFieldIds.size() > 0) {
int i = 0;
- FieldReference ref = new FieldReference("dummy", ExpressionPosition.UNKNOWN, valueFieldIds.get(0).getType());
+ FieldReference ref = new FieldReference("dummy", ExpressionPosition.UNKNOWN, valueFieldIds.get(0).getIntermediateType());
for (TypedFieldId id : valueFieldIds) {
- materializedValueFields[i++] = MaterializedField.create(ref, id.getType());
+ materializedValueFields[i++] = MaterializedField.create(ref, id.getIntermediateType());
}
}
@@ -203,7 +203,7 @@ public abstract class HashAggTemplate implements HashAggregator {
this.htable = ht.createAndSetupHashTable(groupByOutFieldIds) ;
batchHolders = new ArrayList<BatchHolder>();
- addBatchHolder();
+ addBatchHolder();
doSetup(incoming);
}
@@ -211,21 +211,21 @@ public abstract class HashAggTemplate implements HashAggregator {
@Override
public AggOutcome doWork() {
try{
- // Note: Keeping the outer and inner try blocks here to maintain some similarity with
- // StreamingAggregate which does somethings conditionally in the outer try block.
+ // Note: Keeping the outer and inner try blocks here to maintain some similarity with
+ // StreamingAggregate which does some things conditionally in the outer try block.
// In the future HashAggregate may also need to perform some actions conditionally
- // in the outer try block.
+ // in the outer try block.
outside: while(true) {
// loop through existing records, aggregating the values as necessary.
if (EXTRA_DEBUG_1) logger.debug ("Starting outer loop of doWork()...");
for (; underlyingIndex < incoming.getRecordCount(); incIndex()) {
if(EXTRA_DEBUG_2) logger.debug("Doing loop with values underlying {}, current {}", underlyingIndex, currentIndex);
- checkGroupAndAggrValues(currentIndex);
+ checkGroupAndAggrValues(currentIndex);
}
if (EXTRA_DEBUG_1) logger.debug("Processed {} records", underlyingIndex);
-
+
try{
while(true){
@@ -239,10 +239,10 @@ public abstract class HashAggTemplate implements HashAggregator {
case NOT_YET:
this.outcome = out;
return AggOutcome.RETURN_OUTCOME;
-
+
case OK_NEW_SCHEMA:
if(EXTRA_DEBUG_1) logger.debug("Received new schema. Batch has {} records.", incoming.getRecordCount());
- newSchema = true;
+ newSchema = true;
this.cleanup();
// TODO: new schema case needs to be handled appropriately
return AggOutcome.UPDATE_AGGREGATOR;
@@ -254,20 +254,20 @@ public abstract class HashAggTemplate implements HashAggregator {
} else {
checkGroupAndAggrValues(currentIndex);
incIndex();
-
+
if(EXTRA_DEBUG_1) logger.debug("Continuing outside loop");
continue outside;
}
case NONE:
outcome = out;
- outputKeysAndValues() ;
-
+ outputKeysAndValues() ;
+
// clean up my internal state since there is nothing more to return
this.cleanup();
// clean up the incoming batch since the output of aggregation does not need
// any references to the incoming
-
+
incoming.cleanup();
return setOkAndReturn();
@@ -294,7 +294,7 @@ public abstract class HashAggTemplate implements HashAggregator {
// now otherwise downstream operators will break.
// TODO: allow outputting arbitrarily large number of records in batches
assert (numGroupedRecords < Character.MAX_VALUE);
-
+
for (VectorAllocator a : keyAllocators) {
if(EXTRA_DEBUG_2) logger.debug("Outgoing batch: Allocating {} with {} records.", a, numGroupedRecords);
a.alloc(numGroupedRecords);
@@ -320,14 +320,14 @@ public abstract class HashAggTemplate implements HashAggregator {
public void cleanup(){
htable.clear();
htable = null;
- htIdxHolder = null;
+ htIdxHolder = null;
materializedValueFields = null;
for (BatchHolder bh : batchHolders) {
bh.clear();
}
batchHolders.clear();
- batchHolders = null;
+ batchHolders = null;
}
private AggOutcome tooBigFailure(){
@@ -335,7 +335,7 @@ public abstract class HashAggTemplate implements HashAggregator {
this.outcome = IterOutcome.STOP;
return AggOutcome.CLEANUP_AND_RETURN;
}
-
+
private final AggOutcome setOkAndReturn(){
if(first){
this.outcome = IterOutcome.OK_NEW_SCHEMA;
@@ -356,20 +356,20 @@ public abstract class HashAggTemplate implements HashAggregator {
}
currentIndex = getVectorIndex(underlyingIndex);
}
-
+
private final void resetIndex(){
underlyingIndex = -1;
incIndex();
}
private void addBatchHolder() {
- BatchHolder bh = new BatchHolder();
+ BatchHolder bh = new BatchHolder();
batchHolders.add(bh);
if (EXTRA_DEBUG_1) logger.debug("HashAggregate: Added new batch; num batches = {}.", batchHolders.size());
int batchIdx = batchHolders.size() - 1;
- bh.setup(batchIdx);
+ bh.setup(batchIdx);
}
private boolean outputKeysAndValues() {
@@ -392,20 +392,20 @@ public abstract class HashAggTemplate implements HashAggregator {
return allFlushed;
}
- // Check if a group is present in the hash table; if not, insert it in the hash table.
- // The htIdxHolder contains the index of the group in the hash table container; this same
- // index is also used for the aggregation values maintained by the hash aggregate.
+ // Check if a group is present in the hash table; if not, insert it in the hash table.
+ // The htIdxHolder contains the index of the group in the hash table container; this same
+ // index is also used for the aggregation values maintained by the hash aggregate.
private boolean checkGroupAndAggrValues(int incomingRowIdx) {
if (incomingRowIdx < 0) {
throw new IllegalArgumentException("Invalid incoming row index.");
}
- /** for debugging
+ /** for debugging
Object tmp = (incoming).getValueAccessorById(0, BigIntVector.class).getValueVector();
BigIntVector vv0 = null;
BigIntHolder holder = null;
- if (tmp != null) {
+ if (tmp != null) {
vv0 = ((BigIntVector) tmp);
holder = new BigIntHolder();
holder.value = vv0.getAccessor().get(incomingRowIdx) ;
@@ -432,7 +432,7 @@ public abstract class HashAggTemplate implements HashAggregator {
// logger.debug("group-by key = {} already present at hash table index = {}", holder.value, currentIdx) ;
//}
- }
+ }
else if (putStatus == HashTable.PutStatus.KEY_ADDED) {
if (EXTRA_DEBUG_2) logger.debug("Group-by key was added to hash table, inserting new aggregate values") ;
@@ -441,17 +441,17 @@ public abstract class HashAggTemplate implements HashAggregator {
// logger.debug("group-by key = {} added at hash table index = {}", holder.value, currentIdx) ;
//}
}
-
+
if (bh.updateAggrValues(incomingRowIdx, idxWithinBatch)) {
numGroupedRecords++;
return true;
}
-
- }
+
+ }
return false;
}
-
+
// Code-generated methods (implemented in HashAggBatch)
public abstract void doSetup(@Named("incoming") RecordBatch incoming);
public abstract int getVectorIndex(@Named("recordIndex") int recordIndex);
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/InternalBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/InternalBatch.java
index 34845b38e..3e6def128 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/InternalBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/InternalBatch.java
@@ -28,12 +28,12 @@ import org.apache.drill.exec.record.selection.SelectionVector4;
public class InternalBatch implements Iterable<VectorWrapper<?>>{
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(InternalBatch.class);
-
+
private final VectorContainer container;
private final BatchSchema schema;
private final SelectionVector2 sv2;
private final SelectionVector4 sv4;
-
+
public InternalBatch(RecordBatch incoming){
switch(incoming.getSchema().getSelectionVectorMode()){
case FOUR_BYTE:
@@ -42,7 +42,7 @@ public class InternalBatch implements Iterable<VectorWrapper<?>>{
break;
case TWO_BYTE:
this.sv4 = null;
- this.sv2 = incoming.getSelectionVector2().clone();
+ this.sv2 = incoming.getSelectionVector2().clone();
break;
default:
this.sv4 = null;
@@ -74,9 +74,9 @@ public class InternalBatch implements Iterable<VectorWrapper<?>>{
if(sv4 != null) sv4.clear();
container.clear();
}
-
- public VectorWrapper<?> getValueAccessorById(int fieldId, Class<?> clazz){
- return container.getValueAccessorById(fieldId, clazz);
+
+ public VectorWrapper<?> getValueAccessorById(Class<?> clazz, int[] fieldIds){
+ return container.getValueAccessorById(clazz, fieldIds);
}
-
+
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
index 883052ae9..72d046261 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
@@ -348,8 +348,8 @@ public class HashJoinBatch extends AbstractRecordBatch<HashJoinPOP> {
container.add(v);
allocators.add(RemovingRecordBatch.getAllocator4(v));
- JVar inVV = g.declareVectorValueSetupAndMember("buildBatch", new TypedFieldId(vv.getField().getType(), fieldId, true));
- JVar outVV = g.declareVectorValueSetupAndMember("outgoing", new TypedFieldId(vv.getField().getType(), fieldId, false));
+ JVar inVV = g.declareVectorValueSetupAndMember("buildBatch", new TypedFieldId(vv.getField().getType(), true, fieldId));
+ JVar outVV = g.declareVectorValueSetupAndMember("outgoing", new TypedFieldId(vv.getField().getType(), false, fieldId));
g.getEvalBlock()._if(outVV.invoke("copyFromSafe")
.arg(buildIndex.band(JExpr.lit((int) Character.MAX_VALUE)))
.arg(outIndex)
@@ -376,8 +376,8 @@ public class HashJoinBatch extends AbstractRecordBatch<HashJoinPOP> {
container.add(v);
allocators.add(RemovingRecordBatch.getAllocator4(v));
- JVar inVV = g.declareVectorValueSetupAndMember("probeBatch", new TypedFieldId(vv.getField().getType(), fieldId, false));
- JVar outVV = g.declareVectorValueSetupAndMember("outgoing", new TypedFieldId(vv.getField().getType(), outputFieldId, false));
+ JVar inVV = g.declareVectorValueSetupAndMember("probeBatch", new TypedFieldId(vv.getField().getType(), false, fieldId));
+ JVar outVV = g.declareVectorValueSetupAndMember("outgoing", new TypedFieldId(vv.getField().getType(), false, outputFieldId));
g.getEvalBlock()._if(outVV.invoke("copyFromSafe").arg(probeIndex).arg(outIndex).arg(inVV).not())._then()._return(JExpr.FALSE);
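The TypedFieldId constructor is reordered to the same end: the hyper flag now precedes a varargs list of ids, and a builder exists for multi-segment id paths. A sketch of both forms; Types.required() is only assumed here as a convenient way to obtain a MajorType, and the ids are illustrative:

    MajorType type = Types.required(MinorType.INT);          // assumed convenience helper
    TypedFieldId simple = new TypedFieldId(type, false, 4);  // non-hyper, top-level field 4
    TypedFieldId nested = TypedFieldId.newBuilder()          // builder form used elsewhere in the patch
        .intermediateType(type)
        .finalType(type)
        .addId(4).addId(0)                                   // id path: field 4, then child 0
        .build();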
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinStatus.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinStatus.java
index baa232e7f..c07878a69 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinStatus.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinStatus.java
@@ -43,7 +43,7 @@ public final class JoinStatus {
private int rightPosition;
private int svRightPosition;
private IterOutcome lastRight;
-
+
private int outputPosition;
public RightSourceMode rightSourceMode = RightSourceMode.INCOMING;
public MergeJoinBatch outputBatch;
@@ -54,7 +54,7 @@ public final class JoinStatus {
public boolean ok = true;
private boolean initialSet = false;
private boolean leftRepeating = false;
-
+
public JoinStatus(RecordBatch left, RecordBatch right, MergeJoinBatch output) {
super();
this.left = left;
@@ -70,7 +70,7 @@ public final class JoinStatus {
initialSet = true;
}
}
-
+
public final void advanceLeft(){
leftPosition++;
}
@@ -90,6 +90,10 @@ public final class JoinStatus {
return (rightSourceMode == RightSourceMode.INCOMING) ? rightPosition : svRightPosition;
}
+ public final int getRightCount(){
+ return right.getRecordCount();
+ }
+
public final void setRightPosition(int pos) {
rightPosition = pos;
}
@@ -176,7 +180,7 @@ public final class JoinStatus {
}
if(b.getSchema().getSelectionVectorMode() == SelectionVectorMode.TWO_BYTE) b.getSelectionVector2().clear();
}
-
+
/**
* Check if the left record position can advance by one in the current batch.
*/
@@ -230,5 +234,5 @@ public final class JoinStatus {
private boolean eitherMatches(IterOutcome outcome){
return lastLeft == outcome || lastRight == outcome;
}
-
+
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinTemplate.java
index f43934e96..af0d378f5 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinTemplate.java
@@ -97,8 +97,8 @@ public abstract class JoinTemplate implements JoinWorker {
while (status.isLeftPositionAllowed()) {
if (!doCopyLeft(status.getLeftPosition(), status.getOutPosition()))
return false;
-
- status.incOutputPos();
+
+ status.incOutputPos();
status.advanceLeft();
}
}
@@ -113,7 +113,7 @@ public abstract class JoinTemplate implements JoinWorker {
case -1:
// left key < right key
if (((MergeJoinPOP)status.outputBatch.getPopConfig()).getJoinType() == JoinRelType.LEFT) {
- if (!doCopyLeft(status.getLeftPosition(), status.getOutPosition()))
+ if (!doCopyLeft(status.getLeftPosition(), status.getOutPosition()))
return false;
status.incOutputPos();
}
@@ -135,7 +135,7 @@ public abstract class JoinTemplate implements JoinWorker {
doCompareNextLeftKey(status.getLeftPosition()) != 0)
// this record marks the end of repeated keys
status.notifyLeftStoppedRepeating();
-
+
boolean crossedBatchBoundaries = false;
int initialRightPosition = status.getRightPosition();
do {
@@ -143,11 +143,11 @@ public abstract class JoinTemplate implements JoinWorker {
if (!doCopyLeft(status.getLeftPosition(), status.getOutPosition()))
return false;
- if (!doCopyRight(status.getRightPosition(), status.getOutPosition()))
+ if (!doCopyRight(status.getRightPosition(), status.getOutPosition()))
return false;
-
+
status.incOutputPos();
-
+
// If the left key has duplicates and we're about to cross a boundary in the right batch, add the
// right table's record batch to the sv4 builder before calling next. These records will need to be
// copied again for each duplicate left key.
@@ -170,7 +170,7 @@ public abstract class JoinTemplate implements JoinWorker {
status.notifyLeftStoppedRepeating();
} else if (status.isLeftRepeating() && crossedBatchBoundaries) {
try {
- // build the right batches and
+ // build the right batches and
status.outputBatch.batchBuilder.build();
status.setSV4AdvanceMode();
} catch (SchemaChangeException e) {
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java
index a2c424fe2..3d496d315 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java
@@ -350,10 +350,10 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP>
public TypedFieldId getValueVectorId(SchemaPath path) {
return outgoingContainer.getValueVectorId(path);
}
-
+
@Override
- public VectorWrapper<?> getValueAccessorById(int fieldId, Class<?> clazz) {
- return outgoingContainer.getValueAccessorById(fieldId, clazz);
+ public VectorWrapper<?> getValueAccessorById(Class<?> clazz, int... ids) {
+ return outgoingContainer.getValueAccessorById(clazz, ids);
}
@Override
@@ -373,7 +373,7 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP>
/**
* Creates a generated class which implements the copy and compare methods.
- *
+ *
* @return instance of a new merger based on generated code
* @throws SchemaChangeException
*/
@@ -443,8 +443,8 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP>
// declare incoming value vector and assign it to the array
JVar inVV = cg.declareVectorValueSetupAndMember("incomingBatches[" + batchIdx + "]",
new TypedFieldId(vv.getField().getType(),
- fieldIdx,
- false));
+ false,
+ fieldIdx));
// add vv to initialization list (e.g. { vv1, vv2, vv3 } )
incomingVectorInitBatch.add(inVV);
@@ -501,11 +501,12 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP>
TypeProtos.MinorType minor = vvRead.getMajorType().getMinorType();
Class cmpVectorClass = TypeHelper.getValueVectorClass(minor, mode);
+ JExpression arr = JExpr.newArray(cg.getModel().INT).add(JExpr.lit(vvRead.getFieldId().getFieldIds()[0]));
comparisonVectorInitBatch.add(
((JExpression) incomingBatchesVar.component(JExpr.lit(b)))
.invoke("getValueAccessorById")
- .arg(JExpr.lit(vvRead.getFieldId().getFieldId()))
.arg(cg.getModel()._ref(cmpVectorClass).boxify().dotclass())
+ .arg(arr)
.invoke("getValueVector"));
}
@@ -583,8 +584,7 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP>
// declare outgoing value vectors
JVar outgoingVV = cg.declareVectorValueSetupAndMember("outgoingBatch",
new TypedFieldId(vvOut.getField().getType(),
- fieldIdx,
- vvOut.isHyper()));
+ vvOut.isHyper(), fieldIdx));
// assign to the appropriate slot in the outgoingVector array (in order of iteration)
cg.getSetupBlock().assign(outgoingVectors.component(JExpr.lit(fieldIdx)), outgoingVV);
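Because the generated code now targets the varargs overload, the field id is emitted as an int[] literal built with JCodeModel, as in the hunk above. A sketch of the emission; batchExpr and vectorClassExpr stand in for expressions built by the surrounding method:

    JExpression ids = JExpr.newArray(cg.getModel().INT).add(JExpr.lit(7));  // emits: new int[] {7}
    JInvocation call = batchExpr.invoke("getValueAccessorById")
        .arg(vectorClassExpr)
        .arg(ids);   // Java accepts an int[] where an int... parameter is expected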
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionProjectorTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionProjectorTemplate.java
index dd7011afb..339844360 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionProjectorTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionProjectorTemplate.java
@@ -82,7 +82,7 @@ public abstract class OrderedPartitionProjectorTemplate implements OrderedPartit
this.svMode = incoming.getSchema().getSelectionVectorMode();
this.outBatch = outgoing;
this.outputField = outputField;
- partitionValues = (IntVector) outBatch.getValueAccessorById(outBatch.getValueVectorId(outputField).getFieldId(), IntVector.class).getValueVector();
+ partitionValues = (IntVector) outBatch.getValueAccessorById(IntVector.class, outBatch.getValueVectorId(outputField).getFieldIds()).getValueVector();
switch(svMode){
case FOUR_BYTE:
case TWO_BYTE:
@@ -98,7 +98,7 @@ public abstract class OrderedPartitionProjectorTemplate implements OrderedPartit
public abstract int doEval(@Named("inIndex") int inIndex, @Named("partitionIndex") int partitionIndex);
-
+
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/OutgoingRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/OutgoingRecordBatch.java
index 6e115a7fc..deef25fdc 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/OutgoingRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/OutgoingRecordBatch.java
@@ -219,8 +219,8 @@ public class OutgoingRecordBatch implements VectorAccessible {
}
@Override
- public VectorWrapper<?> getValueAccessorById(int fieldId, Class<?> clazz) {
- return vectorContainer.getValueAccessorById(fieldId, clazz);
+ public VectorWrapper<?> getValueAccessorById(Class<?> clazz, int... fieldIds) {
+ return vectorContainer.getValueAccessorById(clazz, fieldIds);
}
@Override
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java
index 604808547..bcd484ca5 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java
@@ -94,7 +94,7 @@ public class PartitionSenderRootExec implements RootExec {
if (!ok) {
stop();
-
+
return false;
}
@@ -153,7 +153,7 @@ public class PartitionSenderRootExec implements RootExec {
}
-
+
private void generatePartitionFunction() throws SchemaChangeException {
LogicalExpression filterExpression = operator.getExpr();
@@ -166,7 +166,7 @@ public class PartitionSenderRootExec implements RootExec {
}
cg.addExpr(new ReturnValueExpression(expr));
-
+
try {
Partitioner p = context.getImplementationClass(cg);
p.setup(context, incoming, outgoing);
@@ -214,7 +214,7 @@ public class PartitionSenderRootExec implements RootExec {
"outgoingVectors");
// create 2d array and build initialization list. For example:
- // outgoingVectors = new ValueVector[][] {
+ // outgoingVectors = new ValueVector[][] {
// new ValueVector[] {vv1, vv2},
// new ValueVector[] {vv3, vv4}
// });
@@ -229,8 +229,8 @@ public class PartitionSenderRootExec implements RootExec {
// declare outgoing value vector and assign it to the array
JVar outVV = cg.declareVectorValueSetupAndMember("outgoing[" + batchId + "]",
new TypedFieldId(vv.getField().getType(),
- fieldId,
- false));
+ false,
+ fieldId));
// add vv to initialization list (e.g. { vv1, vv2, vv3 } )
outgoingVectorInitBatch.add(outVV);
++fieldId;
@@ -248,8 +248,8 @@ public class PartitionSenderRootExec implements RootExec {
for (VectorWrapper<?> vvIn : incoming) {
// declare incoming value vectors
JVar incomingVV = cg.declareVectorValueSetupAndMember("incoming", new TypedFieldId(vvIn.getField().getType(),
- fieldId,
- vvIn.isHyper()));
+ vvIn.isHyper(),
+ fieldId));
// generate the copyFrom() invocation with explicit cast to the appropriate type
Class<?> vvType = TypeHelper.getValueVectorClass(vvIn.getField().getType().getMinorType(),
@@ -307,7 +307,7 @@ public class PartitionSenderRootExec implements RootExec {
}
}
}
-
+
public void stop() {
logger.debug("Partition sender stopping.");
ok = false;
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java
index 347092a36..b94f40373 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java
@@ -20,9 +20,7 @@ package org.apache.drill.exec.physical.impl.project;
import java.io.IOException;
import java.util.HashSet;
import java.util.List;
-import java.util.Set;
-import com.sun.codemodel.JExpr;
import org.apache.drill.common.expression.ErrorCollector;
import org.apache.drill.common.expression.ErrorCollectorImpl;
import org.apache.drill.common.expression.FieldReference;
@@ -31,7 +29,6 @@ import org.apache.drill.common.expression.PathSegment;
import org.apache.drill.common.expression.PathSegment.NameSegment;
import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.common.logical.data.NamedExpression;
-import org.apache.drill.common.types.TypeProtos;
import org.apache.drill.exec.exception.ClassTransformationException;
import org.apache.drill.exec.exception.SchemaChangeException;
import org.apache.drill.exec.expr.ClassGenerator;
@@ -44,7 +41,6 @@ import org.apache.drill.exec.expr.ValueVectorWriteExpression;
import org.apache.drill.exec.memory.OutOfMemoryException;
import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.physical.config.Project;
-import org.apache.drill.exec.physical.impl.filter.ReturnValueExpression;
import org.apache.drill.exec.record.AbstractSingleRecordBatch;
import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
import org.apache.drill.exec.record.MaterializedField;
@@ -52,12 +48,13 @@ import org.apache.drill.exec.record.RecordBatch;
import org.apache.drill.exec.record.TransferPair;
import org.apache.drill.exec.record.TypedFieldId;
import org.apache.drill.exec.record.VectorWrapper;
-import org.apache.drill.exec.util.VectorUtil;
import org.apache.drill.exec.vector.AllocationHelper;
import org.apache.drill.exec.vector.ValueVector;
+import com.carrotsearch.hppc.IntOpenHashSet;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
+import com.sun.codemodel.JExpr;
public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project>{
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ProjectRecordBatch.class);
@@ -92,6 +89,7 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project>{
int incomingRecordCount = incoming.getRecordCount();
for(ValueVector v : this.allocationVectors){
AllocationHelper.allocate(v, incomingRecordCount, 250);
+// v.allocateNew();
}
int outputRecords = projector.projectRecords(0, incomingRecordCount, 0);
if (outputRecords < incomingRecordCount) {
@@ -177,14 +175,15 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project>{
final ClassGenerator<Projector> cg = CodeGenerator.getRoot(Projector.TEMPLATE_DEFINITION, context.getFunctionRegistry());
- Set<Integer> transferFieldIds = new HashSet();
+ IntOpenHashSet transferFieldIds = new IntOpenHashSet();
boolean isAnyWildcard = isAnyWildcard(exprs);
if(isAnyWildcard){
for(VectorWrapper<?> wrapper : incoming){
ValueVector vvIn = wrapper.getValueVector();
- String name = vvIn.getField().getDef().getName(vvIn.getField().getDef().getNameCount() - 1).getName();
+
+ String name = vvIn.getField().getPath().getLastSegment().getNameSegment().getPath();
FieldReference ref = new FieldReference(name);
TransferPair tp = wrapper.getValueVector().getTransferPair(ref);
transfers.add(tp);
@@ -202,17 +201,19 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project>{
// add value vector to transfer if direct reference and this is allowed, otherwise, add to evaluation stack.
if(expr instanceof ValueVectorReadExpression && incoming.getSchema().getSelectionVectorMode() == SelectionVectorMode.NONE
+ && !((ValueVectorReadExpression) expr).hasReadPath()
&& !isAnyWildcard
- &&!transferFieldIds.contains(((ValueVectorReadExpression) expr).getFieldId().getFieldId())
- && !((ValueVectorReadExpression) expr).isArrayElement()) {
+ && !transferFieldIds.contains(((ValueVectorReadExpression) expr).getFieldId().getFieldIds()[0])
+ && !((ValueVectorReadExpression) expr).hasReadPath()) {
ValueVectorReadExpression vectorRead = (ValueVectorReadExpression) expr;
- ValueVector vvIn = incoming.getValueAccessorById(vectorRead.getFieldId().getFieldId(), TypeHelper.getValueVectorClass(vectorRead.getMajorType().getMinorType(), vectorRead.getMajorType().getMode())).getValueVector();
+ TypedFieldId id = vectorRead.getFieldId();
+ ValueVector vvIn = incoming.getValueAccessorById(id.getIntermediateClass(), id.getFieldIds()).getValueVector();
Preconditions.checkNotNull(incoming);
TransferPair tp = vvIn.getTransferPair(getRef(namedExpression));
transfers.add(tp);
container.add(tp.getTo());
- transferFieldIds.add(vectorRead.getFieldId().getFieldId());
+ transferFieldIds.add(vectorRead.getFieldId().getFieldIds()[0]);
// logger.debug("Added transfer.");
}else{
// need to do evaluation.
@@ -221,6 +222,8 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project>{
TypedFieldId fid = container.add(vector);
ValueVectorWriteExpression write = new ValueVectorWriteExpression(fid, expr, true);
HoldingContainer hc = cg.addExpr(write);
+
+
cg.getEvalBlock()._if(hc.getValue().eq(JExpr.lit(0)))._then()._return(JExpr.FALSE);
logger.debug("Added eval.");
}
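The transfer-vs-evaluate decision above now also consults hasReadPath(), so any read that descends into a complex type is evaluated rather than transferred. (As committed, the condition checks hasReadPath() twice; one check would suffice.) Folded into a single predicate for clarity, using the same names as the hunk:

    boolean canTransfer =
        expr instanceof ValueVectorReadExpression
        && incoming.getSchema().getSelectionVectorMode() == SelectionVectorMode.NONE
        && !isAnyWildcard
        && !((ValueVectorReadExpression) expr).hasReadPath()   // whole-vector read only
        && !transferFieldIds.contains(
               ((ValueVectorReadExpression) expr).getFieldId().getFieldIds()[0]);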
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectorTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectorTemplate.java
index 60e599384..aa0ecf66b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectorTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectorTemplate.java
@@ -33,12 +33,12 @@ import com.google.common.collect.ImmutableList;
public abstract class ProjectorTemplate implements Projector {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ProjectorTemplate.class);
-
+
private ImmutableList<TransferPair> transfers;
private SelectionVector2 vector2;
private SelectionVector4 vector4;
private SelectionVectorMode svMode;
-
+
public ProjectorTemplate() throws SchemaChangeException{
}
@@ -47,18 +47,18 @@ public abstract class ProjectorTemplate implements Projector {
switch(svMode){
case FOUR_BYTE:
throw new UnsupportedOperationException();
-
-
+
+
case TWO_BYTE:
final int count = recordCount;
for(int i = 0; i < count; i++, firstOutputIndex++){
doEval(vector2.getIndex(i), firstOutputIndex);
}
return recordCount;
-
-
+
+
case NONE:
-
+
final int countN = recordCount;
int i;
for (i = startIndex; i < startIndex + countN; i++, firstOutputIndex++) {
@@ -76,8 +76,9 @@ public abstract class ProjectorTemplate implements Projector {
t.transfer();
}
return recordCount;
-
-
+
+
+
default:
throw new UnsupportedOperationException();
}
@@ -86,7 +87,7 @@ public abstract class ProjectorTemplate implements Projector {
@Override
public final void setup(FragmentContext context, RecordBatch incoming, RecordBatch outgoing, List<TransferPair> transfers) throws SchemaChangeException{
- this.svMode = incoming.getSchema().getSelectionVectorMode();
+ this.svMode = incoming.getSchema().getSelectionVectorMode();
switch(svMode){
case FOUR_BYTE:
this.vector4 = incoming.getSelectionVector4();
@@ -103,7 +104,7 @@ public abstract class ProjectorTemplate implements Projector {
public abstract boolean doEval(@Named("inIndex") int inIndex, @Named("outIndex") int outIndex);
-
+
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/RemovingRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/RemovingRecordBatch.java
index 4018991b3..62af0b2a3 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/RemovingRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/RemovingRecordBatch.java
@@ -86,7 +86,7 @@ public class RemovingRecordBatch extends AbstractSingleRecordBatch<SelectionVect
default:
throw new UnsupportedOperationException();
}
-
+
container.buildSchema(SelectionVectorMode.NONE);
}
@@ -156,12 +156,12 @@ public class RemovingRecordBatch extends AbstractSingleRecordBatch<SelectionVect
public void cleanup(){
super.cleanup();
}
-
+
private class StraightCopier implements Copier{
private List<TransferPair> pairs = Lists.newArrayList();
private List<ValueVector> out = Lists.newArrayList();
-
+
@Override
public void setupRemover(FragmentContext context, RecordBatch incoming, RecordBatch outgoing, VectorAllocator[] allocators){
for(VectorWrapper<?> vv : incoming){
@@ -183,7 +183,7 @@ public class RemovingRecordBatch extends AbstractSingleRecordBatch<SelectionVect
public List<ValueVector> getOut() {
return out;
}
-
+
}
private Copier getStraightCopier(){
@@ -192,10 +192,10 @@ public class RemovingRecordBatch extends AbstractSingleRecordBatch<SelectionVect
container.addCollection(copier.getOut());
return copier;
}
-
+
private Copier getGenerated2Copier() throws SchemaChangeException{
Preconditions.checkArgument(incoming.getSchema().getSelectionVectorMode() == SelectionVectorMode.TWO_BYTE);
-
+
List<VectorAllocator> allocators = Lists.newArrayList();
for(VectorWrapper<?> i : incoming){
ValueVector v = TypeHelper.getNewVector(i.getField(), oContext.getAllocator());
@@ -218,12 +218,12 @@ public class RemovingRecordBatch extends AbstractSingleRecordBatch<SelectionVect
Preconditions.checkArgument(incoming.getSchema().getSelectionVectorMode() == SelectionVectorMode.FOUR_BYTE);
return getGenerated4Copier(incoming, context, oContext.getAllocator(), container, this);
}
-
+
public static Copier getGenerated4Copier(RecordBatch batch, FragmentContext context, BufferAllocator allocator, VectorContainer container, RecordBatch outgoing) throws SchemaChangeException{
List<VectorAllocator> allocators = Lists.newArrayList();
for(VectorWrapper<?> i : batch){
-
+
ValueVector v = TypeHelper.getNewVector(i.getField(), allocator);
container.add(v);
allocators.add(getAllocator4(v));
@@ -239,20 +239,20 @@ public class RemovingRecordBatch extends AbstractSingleRecordBatch<SelectionVect
throw new SchemaChangeException("Failure while attempting to load generated class", e);
}
}
-
+
public static void generateCopies(ClassGenerator g, VectorAccessible batch, boolean hyper){
// we have parallel ids for each value vector so we don't actually have to deal with managing the ids at all.
int fieldId = 0;
-
+
JExpression inIndex = JExpr.direct("inIndex");
JExpression outIndex = JExpr.direct("outIndex");
g.rotateBlock();
for(VectorWrapper<?> vv : batch){
- JVar inVV = g.declareVectorValueSetupAndMember("incoming", new TypedFieldId(vv.getField().getType(), fieldId, vv.isHyper()));
- JVar outVV = g.declareVectorValueSetupAndMember("outgoing", new TypedFieldId(vv.getField().getType(), fieldId, false));
+ JVar inVV = g.declareVectorValueSetupAndMember("incoming", new TypedFieldId(vv.getField().getType(), vv.isHyper(), fieldId));
+ JVar outVV = g.declareVectorValueSetupAndMember("outgoing", new TypedFieldId(vv.getField().getType(), false, fieldId));
if(hyper){
-
+
g.getEvalBlock()._if(
outVV
.invoke("copyFromSafe")
@@ -268,20 +268,20 @@ public class RemovingRecordBatch extends AbstractSingleRecordBatch<SelectionVect
}else{
g.getEvalBlock()._if(outVV.invoke("copyFromSafe").arg(inIndex).arg(outIndex).arg(inVV).not())._then()._return(JExpr.FALSE);
}
-
-
+
+
fieldId++;
}
g.rotateBlock();
g.getEvalBlock()._return(JExpr.TRUE);
}
-
+
@Override
public WritableBatch getWritableBatch() {
return WritableBatch.get(this);
}
-
+
public static VectorAllocator getAllocator4(ValueVector outgoing){
if(outgoing instanceof FixedWidthVector){
return new FixedVectorAllocator((FixedWidthVector) outgoing);
@@ -291,6 +291,6 @@ public class RemovingRecordBatch extends AbstractSingleRecordBatch<SelectionVect
throw new UnsupportedOperationException();
}
}
-
-
+
+
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/IteratorValidatorBatchIterator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/IteratorValidatorBatchIterator.java
index 8d3a3e5b3..f96a1bda3 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/IteratorValidatorBatchIterator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/IteratorValidatorBatchIterator.java
@@ -100,9 +100,9 @@ public class IteratorValidatorBatchIterator implements RecordBatch {
}
@Override
- public VectorWrapper<?> getValueAccessorById(int fieldId, Class<?> clazz) {
+ public VectorWrapper<?> getValueAccessorById(Class<?> clazz, int... ids) {
validateReadState();
- return incoming.getValueAccessorById(fieldId, clazz);
+ return incoming.getValueAccessorById(clazz, ids);
}
@Override
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/BatchGroup.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/BatchGroup.java
index d87a9f58c..a54685290 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/BatchGroup.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/BatchGroup.java
@@ -242,8 +242,8 @@ public class BatchGroup implements VectorAccessible {
}
@Override
- public VectorWrapper<?> getValueAccessorById(int fieldId, Class<?> clazz) {
- return currentContainer.getValueAccessorById(fieldId, clazz);
+ public VectorWrapper<?> getValueAccessorById(Class<?> clazz, int... ids) {
+ return currentContainer.getValueAccessorById(clazz, ids);
}
@Override
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java
index 930f851bc..4b6c37dd9 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java
@@ -373,12 +373,16 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
ValueVector[] vectors = new ValueVector[batchGroupList.size() * 2];
int i = 0;
for (BatchGroup group : batchGroupList) {
- vectors[i++] = group.getValueAccessorById(group.getValueVectorId(field.getAsSchemaPath()).getFieldId(),
- field.getValueClass()).getValueVector();
+ vectors[i++] = group.getValueAccessorById(
+ field.getValueClass(),
+ group.getValueVectorId(field.getPath()).getFieldIds()
+ ).getValueVector();
if (group.hasSecond()) {
VectorContainer c = group.getSecondContainer();
- vectors[i++] = c.getValueAccessorById(c.getValueVectorId(field.getAsSchemaPath()).getFieldId(),
- field.getValueClass()).getValueVector();
+ vectors[i++] = c.getValueAccessorById(
+ field.getValueClass(),
+ c.getValueVectorId(field.getPath()).getFieldIds()
+ ).getValueVector();
} else {
vectors[i] = vectors[i - 1].getTransferPair().getTo(); //this vector should never be used. Just want to avoid having null elements in the hyper vector
i++;
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java
index 214f81c95..844d6dbbd 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java
@@ -31,7 +31,7 @@ import org.apache.drill.exec.record.selection.SelectionVector4;
public abstract class AbstractRecordBatch<T extends PhysicalOperator> implements RecordBatch{
final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(this.getClass());
-
+
protected final VectorContainer container = new VectorContainer();
protected final T popConfig;
protected final FragmentContext context;
@@ -43,7 +43,7 @@ public abstract class AbstractRecordBatch<T extends PhysicalOperator> implements
this.popConfig = popConfig;
this.oContext = new OperatorContext(popConfig, context);
}
-
+
@Override
public Iterator<VectorWrapper<?>> iterator() {
return container.iterator();
@@ -67,14 +67,14 @@ public abstract class AbstractRecordBatch<T extends PhysicalOperator> implements
public void kill() {
killIncoming();
}
-
+
protected abstract void killIncoming();
-
+
public void cleanup(){
container.clear();
oContext.close();
}
-
+
@Override
public SelectionVector2 getSelectionVector2() {
throw new UnsupportedOperationException();
@@ -91,16 +91,16 @@ public abstract class AbstractRecordBatch<T extends PhysicalOperator> implements
}
@Override
- public VectorWrapper<?> getValueAccessorById(int fieldId, Class<?> clazz) {
- return container.getValueAccessorById(fieldId, clazz);
+ public VectorWrapper<?> getValueAccessorById(Class<?> clazz, int... ids) {
+ return container.getValueAccessorById(clazz, ids);
}
-
+
@Override
public WritableBatch getWritableBatch() {
// logger.debug("Getting writable batch.");
WritableBatch batch = WritableBatch.get(this);
return batch;
-
+
}
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/HyperVectorWrapper.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/HyperVectorWrapper.java
index a6a4621c0..b44a23325 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/HyperVectorWrapper.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/HyperVectorWrapper.java
@@ -20,21 +20,25 @@ package org.apache.drill.exec.record;
import java.lang.reflect.Array;
import com.google.common.base.Preconditions;
+
import org.apache.commons.lang3.ArrayUtils;
+import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.exec.vector.ValueVector;
+import org.apache.drill.exec.vector.complex.AbstractContainerVector;
+import org.apache.drill.exec.vector.complex.MapVector;
public class HyperVectorWrapper<T extends ValueVector> implements VectorWrapper<T>{
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(HyperVectorWrapper.class);
-
+
private T[] vectors;
private MaterializedField f;
private final boolean releasable;
-
+
public HyperVectorWrapper(MaterializedField f, T[] v){
this(f, v, true);
}
-
+
public HyperVectorWrapper(MaterializedField f, T[] v, boolean releasable){
assert(v.length > 0);
this.f = f;
@@ -72,9 +76,51 @@ public class HyperVectorWrapper<T extends ValueVector> implements VectorWrapper<
public void clear() {
if(!releasable) return;
for(T x : vectors){
- x.clear();
+ x.clear();
+ }
+ }
+
+ @Override
+ public VectorWrapper<?> getChildWrapper(int[] ids) {
+ if(ids.length == 1) return this;
+
+ ValueVector[] vectors = new ValueVector[this.vectors.length];
+ int index = 0;
+
+ for(ValueVector v : this.vectors){
+ ValueVector vector = v;
+ for(int i = 1; i < ids.length; i++){
+ MapVector map = (MapVector) vector;
+ vector = map.getVectorById(ids[i]);
+ }
+ vectors[index] = vector;
+ index++;
+ }
+ return new HyperVectorWrapper<ValueVector>(vectors[0].getField(), vectors);
+ }
+
+ @Override
+ public TypedFieldId getFieldIdIfMatches(int id, SchemaPath expectedPath) {
+ ValueVector v = vectors[0];
+ if(!expectedPath.getRootSegment().segmentEquals(v.getField().getPath().getRootSegment())) return null;
+
+ if(v instanceof AbstractContainerVector){
+ // we're looking for a multi path.
+ AbstractContainerVector c = (AbstractContainerVector) v;
+ TypedFieldId.Builder builder = TypedFieldId.newBuilder();
+ builder.intermediateType(v.getField().getType());
+ builder.hyper();
+ builder.addId(id);
+ return c.getFieldIdIfMatches(builder, true, expectedPath.getRootSegment().getChild());
+
+ }else{
+ return TypedFieldId.newBuilder() //
+ .intermediateType(v.getField().getType()) //
+ .finalType(v.getField().getType()) //
+ .addId(id) //
+ .hyper() //
+ .build();
}
-
}
@Override
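getChildWrapper() establishes the id-path convention for hyper vectors: ids[0] names this wrapper itself, and each subsequent id selects a child inside a MapVector, applied uniformly across every stacked batch. A sketch with an illustrative path; hyperWrapper is assumed to be an existing HyperVectorWrapper over map vectors:

    int[] ids = {5, 2};                                          // field 5, then child 2 of that map
    VectorWrapper<?> child = hyperWrapper.getChildWrapper(ids);  // one wrapper spanning all batches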
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/MaterializedField.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/MaterializedField.java
index d93e258f4..439552f33 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/MaterializedField.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/MaterializedField.java
@@ -17,39 +17,58 @@
*/
package org.apache.drill.exec.record;
-import java.util.Collections;
-import java.util.Iterator;
import java.util.List;
import org.apache.drill.common.expression.FieldReference;
-import org.apache.drill.common.expression.LogicalExpression;
import org.apache.drill.common.expression.PathSegment;
-import org.apache.drill.common.expression.PathSegment.NameSegment;
import org.apache.drill.common.expression.SchemaPath;
-import org.apache.drill.common.logical.data.NamedExpression;
-import org.apache.drill.common.types.TypeProtos;
import org.apache.drill.common.types.TypeProtos.DataMode;
import org.apache.drill.common.types.TypeProtos.MajorType;
import org.apache.drill.exec.expr.TypeHelper;
-import org.apache.drill.exec.proto.SchemaDefProtos.FieldDef;
-import org.apache.drill.exec.proto.SchemaDefProtos.NamePart;
-import org.apache.drill.exec.proto.SchemaDefProtos.NamePart.Type;
+import org.apache.drill.exec.proto.UserBitShared.SerializedField;
-import com.beust.jcommander.internal.Lists;
+import com.google.hive12.common.collect.Lists;
public class MaterializedField{
- private final FieldDef def;
+ private SchemaPath path;
+ private MajorType type;
+ private List<MaterializedField> children = Lists.newArrayList();
- public MaterializedField(FieldDef def) {
- this.def = def;
+ private MaterializedField(SchemaPath path, MajorType type) {
+ super();
+ this.path = path;
+ this.type = type;
}
- public static MaterializedField create(FieldDef def){
- return new MaterializedField(def);
+ public static MaterializedField create(SerializedField serField){
+ return new MaterializedField(SchemaPath.create(serField.getNamePart()), serField.getMajorType());
+ }
+
+ public SerializedField.Builder getAsBuilder(){
+ return SerializedField.newBuilder() //
+ .setMajorType(type) //
+ .setNamePart(path.getAsNamePart());
+ }
+
+ public void addChild(MaterializedField field){
+ children.add(field);
}
public MaterializedField clone(FieldReference ref){
- return create(ref, def.getMajorType());
+ return create(ref, type);
+ }
+
+ public String getLastName(){
+ PathSegment seg = path.getRootSegment();
+ while(seg.getChild() != null) seg = seg.getChild();
+ return seg.getNameSegment().getPath();
+ }
+
+
+  // TODO: rewrite as a direct match against the SerializedField rather than converting and then comparing.
+ public boolean matches(SerializedField field){
+ MaterializedField f = create(field);
+ return f.equals(this);
}
public static MaterializedField create(String path, MajorType type){
@@ -58,43 +77,20 @@ public class MaterializedField{
}
public static MaterializedField create(SchemaPath path, MajorType type) {
- FieldDef.Builder b = FieldDef.newBuilder();
- b.setMajorType(type);
- addSchemaPathToFieldDef(path, b);
- return create(b.build());
- }
-
- private static void addSchemaPathToFieldDef(SchemaPath path, FieldDef.Builder builder) {
- for (PathSegment p = path.getRootSegment();; p = p.getChild()) {
- NamePart.Builder b = NamePart.newBuilder();
- if (p.isArray()) {
- b.setType(Type.ARRAY);
- } else {
- b.setName(p.getNameSegment().getPath().toString());
- b.setType(Type.NAME);
- }
- builder.addName(b.build());
- if(p.isLastPath()) break;
- }
+ return new MaterializedField(path, type);
}
- public FieldDef getDef() {
- return def;
+ public SchemaPath getPath(){
+ return path;
}
+  /**
+   * Get the schema path of this field.
+   * @return the SchemaPath of this field.
+   * @deprecated use {@link #getPath()} instead.
+   */
+  @Deprecated
public SchemaPath getAsSchemaPath(){
- List<NamePart> nameList = Lists.newArrayList(def.getNameList());
- Collections.reverse(nameList);
- PathSegment seg = null;
- for(NamePart p : nameList){
- if(p.getType() == NamePart.Type.ARRAY){
- throw new UnsupportedOperationException();
- }else{
- seg = new NameSegment(p.getName(), seg);
- }
- }
- if( !(seg instanceof NameSegment) ) throw new UnsupportedOperationException();
- return new SchemaPath( (NameSegment) seg);
+ return path;
}
// public String getName(){
@@ -119,29 +115,29 @@ public class MaterializedField{
// }
public int getWidth() {
- return def.getMajorType().getWidth();
+ return type.getWidth();
}
public MajorType getType() {
- return def.getMajorType();
+ return type;
}
public int getScale() {
- return def.getMajorType().getScale();
+ return type.getScale();
}
public int getPrecision() {
- return def.getMajorType().getPrecision();
+ return type.getPrecision();
}
public boolean isNullable() {
- return def.getMajorType().getMode() == DataMode.OPTIONAL;
+ return type.getMode() == DataMode.OPTIONAL;
}
public DataMode getDataMode() {
- return def.getMajorType().getMode();
+ return type.getMode();
}
public MaterializedField getOtherNullableVersion(){
- MajorType mt = def.getMajorType();
+ MajorType mt = type;
DataMode newDataMode = null;
switch(mt.getMode()){
case OPTIONAL:
@@ -153,7 +149,7 @@ public class MaterializedField{
default:
throw new UnsupportedOperationException();
}
- return new MaterializedField(def.toBuilder().setMajorType(mt.toBuilder().setMode(newDataMode).build()).build());
+ return new MaterializedField(path, mt.toBuilder().setMode(newDataMode).build());
}
public Class<?> getValueClass() {
@@ -161,33 +157,19 @@ public class MaterializedField{
}
public boolean matches(SchemaPath path) {
- Iterator<NamePart> iter = def.getNameList().iterator();
+ if(!path.isSimplePath()) return false;
- for (PathSegment p = path.getRootSegment();; p = p.getChild()) {
- if(p == null) break;
- if (!iter.hasNext()) return false;
- NamePart n = iter.next();
-
- if (p.isArray()) {
- if (n.getType() == Type.ARRAY) continue;
- return false;
- } else {
- if (p.getNameSegment().getPath().equalsIgnoreCase(n.getName())) continue;
- return false;
- }
-
- }
- // we've reviewed all path segments. confirm that we don't have any extra name parts.
- return !iter.hasNext();
+ return this.path.equals(path);
}
-
@Override
public int hashCode() {
final int prime = 31;
int result = 1;
- result = prime * result + ((def == null) ? 0 : def.hashCode());
+ result = prime * result + ((children == null) ? 0 : children.hashCode());
+ result = prime * result + ((path == null) ? 0 : path.hashCode());
+ result = prime * result + ((type == null) ? 0 : type.hashCode());
return result;
}
@@ -200,20 +182,30 @@ public class MaterializedField{
if (getClass() != obj.getClass())
return false;
MaterializedField other = (MaterializedField) obj;
- if (def == null) {
- if (other.def != null)
+ if (children == null) {
+ if (other.children != null)
+ return false;
+ } else if (!children.equals(other.children))
+ return false;
+ if (path == null) {
+ if (other.path != null)
+ return false;
+ } else if (!path.equals(other.path))
+ return false;
+ if (type == null) {
+ if (other.type != null)
return false;
- } else if (!def.equals(other.def))
+ } else if (!type.equals(other.type))
return false;
return true;
}
@Override
public String toString() {
- return "MaterializedField [" + def.toString() + "]";
+ return "MaterializedField [path=" + path + ", type=" + type + "]";
}
public String toExpr(){
- return this.getAsSchemaPath().toExpr();
+ return path.toExpr();
}
} \ No newline at end of file
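MaterializedField is now a plain (path, type) pair that round-trips through the wire-level SerializedField. A minimal sketch of that round trip, assuming Drill's standard Types helper; the field name is a placeholder:

    MaterializedField f = MaterializedField.create(
        SchemaPath.getSimplePath("amount"),
        Types.optional(MinorType.FLOAT8));

    SerializedField wire = f.getAsBuilder().build();
    MaterializedField back = MaterializedField.create(wire);

    // matches() is currently convert-then-equals, per the TODO above.
    assert f.matches(wire) && f.equals(back);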
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatch.java
index 31283c61b..60fdd4d5f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatch.java
@@ -27,7 +27,7 @@ import org.apache.drill.exec.vector.ValueVector;
* A record batch contains a set of field values for a particular range of records. In the case of a record batch
* composed of ValueVectors, ideally a batch fits within L2 cache (~256k per core). The set of value vectors does not
* change unless the next() IterOutcome is a *_NEW_SCHEMA type.
- *
+ *
* A key thing to know is that the Iterator provided by a record batch must align with the rank positions of the field ids
* provided by getValueVectorId().
*/
@@ -56,21 +56,21 @@ public interface RecordBatch extends VectorAccessible {
/**
* Access the FragmentContext of the current query fragment. Useful for reporting failure information or other query
* level information.
- *
+ *
* @return
*/
public FragmentContext getContext();
/**
* Provide the schema of the current RecordBatch. This changes if and only if a *_NEW_SCHEMA IterOutcome is provided.
- *
+ *
* @return
*/
public BatchSchema getSchema();
/**
* Provide the number of records that are within this record batch.
- *
+ *
* @return
*/
public int getRecordCount();
@@ -89,7 +89,7 @@ public interface RecordBatch extends VectorAccessible {
* Get the value vector type and id for the given schema path. The TypedFieldId should store a fieldId which is the
* same as the ordinal position of the field within the Iterator provided by this class's implementation of
* Iterable<ValueVector>.
- *
+ *
* @param path
* The path where the vector should be located.
* @return The local field id associated with this vector. If no field matches this path, this will return a null
@@ -97,24 +97,24 @@ public interface RecordBatch extends VectorAccessible {
*/
public abstract TypedFieldId getValueVectorId(SchemaPath path);
@Override
- public abstract VectorWrapper<?> getValueAccessorById(int fieldId, Class<?> clazz);
+ public abstract VectorWrapper<?> getValueAccessorById(Class<?> clazz, int... ids);
/**
* Update the data in each Field reading interface for the next range of records. Once a RecordBatch returns an
* IterOutcome.NONE, the consumer should no longer call next(). Behavior at that point is undefined and likely to throw
* an exception.
- *
+ *
* @return An IterOutcome describing the result of the iteration.
*/
public IterOutcome next();
/**
* Get a writable version of this batch. Takes over ownership of existing buffers.
- *
+ *
* @return
*/
public WritableBatch getWritableBatch();
-
+
public void cleanup();
}
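Together with getValueVectorId, the varargs form lets callers address fields nested inside maps. A hedged usage sketch; the batch and column names are placeholders:

    TypedFieldId id = batch.getValueVectorId(
        SchemaPath.getCompoundPath("user", "address", "zip"));
    if (id != null) {
      // The ids apply in order: outer container position, then map children.
      VectorWrapper<?> w = batch.getValueAccessorById(
          id.getIntermediateClass(), id.getFieldIds());
    }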
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchLoader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchLoader.java
index ed450af06..10d959fae 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchLoader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchLoader.java
@@ -23,13 +23,14 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.exec.exception.SchemaChangeException;
import org.apache.drill.exec.expr.TypeHelper;
import org.apache.drill.exec.memory.BufferAllocator;
-import org.apache.drill.exec.proto.SchemaDefProtos.FieldDef;
-import org.apache.drill.exec.proto.UserBitShared.FieldMetadata;
import org.apache.drill.exec.proto.UserBitShared.RecordBatchDef;
+import org.apache.drill.exec.proto.UserBitShared.SerializedField;
import org.apache.drill.exec.vector.ValueVector;
import com.google.common.collect.Maps;
@@ -63,25 +64,24 @@ public class RecordBatchLoader implements VectorAccessible, Iterable<VectorWrapp
boolean schemaChanged = schema == null;
// logger.info("Load, ThreadID: {}", Thread.currentThread().getId(), new RuntimeException("For Stack Trace Only"));
// System.out.println("Load, ThreadId: " + Thread.currentThread().getId());
- Map<FieldDef, ValueVector> oldFields = Maps.newHashMap();
+ Map<MaterializedField, ValueVector> oldFields = Maps.newHashMap();
for(VectorWrapper<?> w : container){
ValueVector v = w.getValueVector();
- oldFields.put(v.getField().getDef(), v);
+ oldFields.put(v.getField(), v);
}
VectorContainer newVectors = new VectorContainer();
- List<FieldMetadata> fields = def.getFieldList();
+ List<SerializedField> fields = def.getFieldList();
int bufOffset = 0;
- for (FieldMetadata fmd : fields) {
- FieldDef fieldDef = fmd.getDef();
+ for (SerializedField fmd : fields) {
+ MaterializedField fieldDef = MaterializedField.create(fmd);
ValueVector v = oldFields.remove(fieldDef);
if(v == null) {
// if we arrive here, we didn't have a matching vector.
schemaChanged = true;
- MaterializedField m = new MaterializedField(fieldDef);
- v = TypeHelper.getNewVector(m, allocator);
+ v = TypeHelper.getNewVector(fieldDef, allocator);
}
if (fmd.getValueCount() == 0){
v.clear();
@@ -136,8 +136,8 @@ public class RecordBatchLoader implements VectorAccessible, Iterable<VectorWrapp
return valueCount;
}
- public VectorWrapper<?> getValueAccessorById(int fieldId, Class<?> clazz){
- return container.getValueAccessorById(fieldId, clazz);
+ public VectorWrapper<?> getValueAccessorById(Class<?> clazz, int... ids){
+ return container.getValueAccessorById(clazz, ids);
}
public WritableBatch getWritableBatch(){
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/SimpleVectorWrapper.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/SimpleVectorWrapper.java
index b7a82481d..692fe62f0 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/SimpleVectorWrapper.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/SimpleVectorWrapper.java
@@ -17,11 +17,15 @@
*/
package org.apache.drill.exec.record;
+import org.apache.drill.common.expression.PathSegment;
+import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.exec.vector.ValueVector;
+import org.apache.drill.exec.vector.complex.AbstractContainerVector;
+import org.apache.drill.exec.vector.complex.MapVector;
public class SimpleVectorWrapper<T extends ValueVector> implements VectorWrapper<T>{
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(SimpleVectorWrapper.class);
-
+
private T v;
public SimpleVectorWrapper(T v){
@@ -53,8 +57,7 @@ public class SimpleVectorWrapper<T extends ValueVector> implements VectorWrapper
public boolean isHyper() {
return false;
}
-
-
+
@SuppressWarnings("unchecked")
@Override
public VectorWrapper<T> cloneAndTransfer() {
@@ -71,4 +74,56 @@ public class SimpleVectorWrapper<T extends ValueVector> implements VectorWrapper
public static <T extends ValueVector> SimpleVectorWrapper<T> create(T v){
return new SimpleVectorWrapper<T>(v);
}
+
+
+ @Override
+ public VectorWrapper<?> getChildWrapper(int[] ids) {
+ if(ids.length == 1) return this;
+
+ ValueVector vector = v;
+
+ for(int i = 1; i < ids.length; i++){
+ MapVector map = (MapVector) vector;
+ vector = map.getVectorById(ids[i]);
+ }
+
+ return new SimpleVectorWrapper<ValueVector>(vector);
+ }
+
+ @Override
+ public TypedFieldId getFieldIdIfMatches(int id, SchemaPath expectedPath) {
+ if(!expectedPath.getRootSegment().segmentEquals(v.getField().getPath().getRootSegment())) return null;
+ PathSegment seg = expectedPath.getRootSegment();
+
+ if(v instanceof AbstractContainerVector){
+ // we're looking for a multi path.
+ AbstractContainerVector c = (AbstractContainerVector) v;
+ TypedFieldId.Builder builder = TypedFieldId.newBuilder();
+ builder.intermediateType(v.getField().getType());
+ builder.addId(id);
+ return c.getFieldIdIfMatches(builder, true, expectedPath.getRootSegment().getChild());
+
+ }else{
+ TypedFieldId.Builder builder = TypedFieldId.newBuilder();
+ builder.intermediateType(v.getField().getType());
+ builder.addId(id);
+ builder.finalType(v.getField().getType());
+ if(seg.isLastPath()){
+ return builder.build();
+ }else{
+ PathSegment child = seg.getChild();
+ if(child.isArray() && child.isLastPath()){
+ builder.remainder(child);
+ builder.withIndex();
+ return builder.build();
+ }else{
+ return null;
+ }
+
+ }
+
+ }
+ }
+
+
}
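The non-container branch above accepts a narrow set of paths: the path must end at the vector itself or add exactly one trailing array index. An illustration under that assumption, with a hypothetical wrapper bound to a field named tags:

    // For a simple (non-map) vector bound to "tags":
    //   tags     -> TypedFieldId with no remainder
    //   tags[3]  -> TypedFieldId carrying the array segment, built withIndex()
    //   tags.x   -> null; a name below a scalar cannot match
    TypedFieldId direct = wrapper.getFieldIdIfMatches(0, SchemaPath.getSimplePath("tags"));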
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/TransferPair.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/TransferPair.java
index ba2c7b2b2..9645be9fd 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/TransferPair.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/TransferPair.java
@@ -23,5 +23,5 @@ public interface TransferPair {
public void transfer();
public void splitAndTransfer(int startIndex, int length);
public ValueVector getTo();
- public void copyValue(int from, int to);
+ public boolean copyValueSafe(int from, int to);
}
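The rename from copyValue to copyValueSafe also changes the contract: the boolean reports whether the destination still had room rather than assuming it. A hedged sketch of the resulting calling convention; pair and sourceCount are placeholders:

    int copied = 0;
    while (copied < sourceCount && pair.copyValueSafe(copied, copied)) {
      copied++;
    }
    // On false, the value at `copied` was not transferred; the caller
    // reallocates or flushes, then resumes from that index.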
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/TypedFieldId.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/TypedFieldId.java
index 0fbd0ae56..24a8251ac 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/TypedFieldId.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/TypedFieldId.java
@@ -17,34 +17,174 @@
*/
package org.apache.drill.exec.record;
+import java.util.Arrays;
+
+import org.apache.commons.lang.ArrayUtils;
+import org.apache.drill.common.expression.PathSegment;
+import org.apache.drill.common.types.TypeProtos.DataMode;
import org.apache.drill.common.types.TypeProtos.MajorType;
+import org.apache.drill.exec.expr.TypeHelper;
+import org.apache.drill.exec.vector.ValueVector;
+
+import com.carrotsearch.hppc.IntArrayList;
+import com.google.common.base.Preconditions;
public class TypedFieldId {
- final MajorType type;
- final int fieldId;
+ final MajorType finalType;
+ final MajorType secondaryFinal;
+ final MajorType intermediateType;
+ final int[] fieldIds;
final boolean isHyperReader;
+ final PathSegment remainder;
+
+ public TypedFieldId(MajorType type, int... fieldIds){
+ this(type, type, type, false, null, fieldIds);
+ }
- public TypedFieldId(MajorType type, int fieldId){
- this(type, fieldId, false);
+ public TypedFieldId(MajorType type, IntArrayList breadCrumb, PathSegment remainder){
+ this(type, type, type, false, remainder, breadCrumb.toArray());
}
-
- public TypedFieldId(MajorType type, int fieldId, boolean isHyper) {
+
+ public TypedFieldId(MajorType type, boolean isHyper, int... fieldIds){
+ this(type, type, type, isHyper, null, fieldIds);
+ }
+
+ public TypedFieldId(MajorType intermediateType, MajorType secondaryFinal, MajorType finalType, boolean isHyper, PathSegment remainder, int... fieldIds) {
super();
- this.type = type;
- this.fieldId = fieldId;
+ this.intermediateType = intermediateType;
+ this.finalType = finalType;
+ this.secondaryFinal = secondaryFinal;
+ this.fieldIds = fieldIds;
this.isHyperReader = isHyper;
+ this.remainder = remainder;
+ }
+
+
+
+ public TypedFieldId cloneWithChild(int id){
+ int[] fieldIds = ArrayUtils.add(this.fieldIds, id);
+ return new TypedFieldId(intermediateType, secondaryFinal, finalType, isHyperReader, remainder, fieldIds);
+ }
+
+ public PathSegment getLastSegment(){
+ if(remainder == null) return null;
+ PathSegment seg = remainder;
+ while(seg.getChild() != null){
+ seg = seg.getChild();
+ }
+ return seg;
+ }
+
+ public TypedFieldId cloneWithRemainder(PathSegment remainder){
+ return new TypedFieldId(intermediateType, secondaryFinal, finalType, isHyperReader, remainder, fieldIds);
+ }
+
+ public boolean hasRemainder(){
+ return remainder != null;
+ }
+
+ public PathSegment getRemainder(){
+ return remainder;
}
public boolean isHyperReader(){
return isHyperReader;
}
-
- public MajorType getType() {
- return type;
+
+ public MajorType getIntermediateType() {
+ return intermediateType;
+ }
+
+ public Class<? extends ValueVector> getIntermediateClass(){
+ return (Class<? extends ValueVector>) TypeHelper.getValueVectorClass(intermediateType.getMinorType(), intermediateType.getMode());
+ }
+
+ public MajorType getFinalType(){
+ return finalType;
}
- public int getFieldId() {
- return fieldId;
+ public int[] getFieldIds() {
+ return fieldIds;
+ }
+
+
+
+ public MajorType getSecondaryFinal() {
+ return secondaryFinal;
+ }
+
+ public static Builder newBuilder(){
+ return new Builder();
+ }
+
+ public static class Builder{
+ final IntArrayList ids = new IntArrayList();
+ MajorType finalType;
+ MajorType intermediateType;
+ PathSegment remainder;
+ boolean hyperReader = false;
+ boolean withIndex = false;
+
+ public Builder addId(int id){
+ ids.add(id);
+ return this;
+ }
+
+ public Builder withIndex(){
+ withIndex = true;
+ return this;
+ }
+
+ public Builder remainder(PathSegment remainder){
+ this.remainder = remainder;
+ return this;
+ }
+
+ public Builder hyper(){
+ this.hyperReader = true;
+ return this;
+ }
+
+ public Builder finalType(MajorType finalType){
+ this.finalType = finalType;
+ return this;
+ }
+
+ public Builder intermediateType(MajorType intermediateType){
+ this.intermediateType = intermediateType;
+ return this;
+ }
+
+ public TypedFieldId build(){
+      Preconditions.checkNotNull(finalType);
+
+      // default the intermediate type before validating; null-checking it
+      // first would make this fallback unreachable.
+      if(intermediateType == null) intermediateType = finalType;
+ MajorType actualFinalType = finalType;
+ MajorType secondaryFinal = finalType;
+
+ // if this has an index, switch to required type for output
+      if(withIndex && intermediateType.equals(finalType)) actualFinalType = finalType.toBuilder().setMode(DataMode.REQUIRED).build();
+
+ // if this isn't a direct access, switch the final type to nullable as offsets may be null.
+ // TODO: there is a bug here with some things.
+      if(!intermediateType.equals(finalType)) actualFinalType = finalType.toBuilder().setMode(DataMode.OPTIONAL).build();
+
+ return new TypedFieldId(intermediateType, secondaryFinal, actualFinalType, hyperReader, remainder, ids.toArray());
+ }
+ }
+
+ @Override
+ public int hashCode() {
+ final int prime = 31;
+ int result = 1;
+ result = prime * result + Arrays.hashCode(fieldIds);
+ result = prime * result + ((finalType == null) ? 0 : finalType.hashCode());
+ result = prime * result + ((intermediateType == null) ? 0 : intermediateType.hashCode());
+ result = prime * result + (isHyperReader ? 1231 : 1237);
+ result = prime * result + ((remainder == null) ? 0 : remainder.hashCode());
+ result = prime * result + ((secondaryFinal == null) ? 0 : secondaryFinal.hashCode());
+ return result;
}
@Override
@@ -56,20 +196,32 @@ public class TypedFieldId {
if (getClass() != obj.getClass())
return false;
TypedFieldId other = (TypedFieldId) obj;
- if (fieldId != other.fieldId)
+ if (!Arrays.equals(fieldIds, other.fieldIds))
return false;
- if (type == null) {
- if (other.type != null)
+ if (finalType == null) {
+ if (other.finalType != null)
return false;
- } else if (!type.equals(other.type))
+ } else if (!finalType.equals(other.finalType))
+ return false;
+ if (intermediateType == null) {
+ if (other.intermediateType != null)
+ return false;
+ } else if (!intermediateType.equals(other.intermediateType))
+ return false;
+ if (isHyperReader != other.isHyperReader)
+ return false;
+ if (remainder == null) {
+ if (other.remainder != null)
+ return false;
+ } else if (!remainder.equals(other.remainder))
+ return false;
+ if (secondaryFinal == null) {
+ if (other.secondaryFinal != null)
+ return false;
+ } else if (!secondaryFinal.equals(other.secondaryFinal))
return false;
return true;
}
- @Override
- public String toString() {
- return "TypedFieldId [type=" + type + ", fieldId=" + fieldId + ", isSuperReader=" + isHyperReader + "]";
- }
-
} \ No newline at end of file
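Putting the Builder pieces together: resolving a path like a.b, where a is a map at ordinal 4 holding a bigint b at child position 1, would be assembled roughly as follows (the types and ordinals are hypothetical):

    TypedFieldId id = TypedFieldId.newBuilder()
        .intermediateType(Types.required(MinorType.MAP)) // vector actually held
        .finalType(Types.optional(MinorType.BIGINT))     // leaf value handed out
        .addId(4)  // position of "a" in the container
        .addId(1)  // position of "b" inside the map
        .build();
    // build() keeps the final mode OPTIONAL here because the intermediate and
    // final types differ: offsets along the map path may be null.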
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorAccessible.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorAccessible.java
index a8100b275..474a0a6ea 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorAccessible.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorAccessible.java
@@ -28,7 +28,7 @@ import org.apache.drill.exec.vector.ValueVector;
* To change this template use File | Settings | File Templates.
*/
public interface VectorAccessible extends Iterable<VectorWrapper<?>> {
- public VectorWrapper<?> getValueAccessorById(int fieldId, Class<?> clazz);
+ public VectorWrapper<?> getValueAccessorById(Class<?> clazz, int... fieldIds);
public TypedFieldId getValueVectorId(SchemaPath path);
public BatchSchema getSchema();
public int getRecordCount();
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java
index 25289a88c..1c7714e2e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java
@@ -22,13 +22,15 @@ import java.util.Iterator;
import java.util.List;
import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.common.types.TypeProtos.MajorType;
import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
import org.apache.drill.exec.vector.ValueVector;
+import org.apache.drill.exec.vector.complex.AbstractMapVector;
import com.beust.jcommander.internal.Lists;
import com.google.common.base.Preconditions;
-public class VectorContainer implements Iterable<VectorWrapper<?>>, VectorAccessible {
+public class VectorContainer extends AbstractMapVector implements Iterable<VectorWrapper<?>>, VectorAccessible {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(VectorContainer.class);
protected final List<VectorWrapper<?>> wrappers = Lists.newArrayList();
@@ -61,6 +63,10 @@ public class VectorContainer implements Iterable<VectorWrapper<?>>, VectorAccess
add(vv, releasable);
}
+ public <T extends ValueVector> T addOrGet(String name, MajorType type, Class<T> clazz){
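+    // TODO: not yet implemented; callers currently receive null.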
+ return null;
+ }
+
/**
* Get a set of transferred clones of this container. Note that this guarantees that the vectors in the cloned
* container have the same TypedFieldIds as the existing container, allowing interchangeability in generated code. In
@@ -94,7 +100,7 @@ public class VectorContainer implements Iterable<VectorWrapper<?>>, VectorAccess
schema = null;
int i = wrappers.size();
wrappers.add(SimpleVectorWrapper.create(vv));
- return new TypedFieldId(vv.getField().getType(), i, false);
+ return new TypedFieldId(vv.getField().getType(), i);
}
public void add(ValueVector[] hyperVector) {
@@ -129,29 +135,33 @@ public class VectorContainer implements Iterable<VectorWrapper<?>>, VectorAccess
public TypedFieldId getValueVectorId(SchemaPath path) {
for (int i = 0; i < wrappers.size(); i++) {
VectorWrapper<?> va = wrappers.get(i);
- SchemaPath w = va.getField().getAsSchemaPath();
- if (w.equals(path)){
- return new TypedFieldId(va.getField().getType(), i, va.isHyper());
+ TypedFieldId id = va.getFieldIdIfMatches(i, path);
+ if(id != null){
+ return id;
}
}
- if(path.getRootSegment().isNamed() && path.getRootSegment().getNameSegment().getPath().equals("_MAP") && path.getRootSegment().isLastPath()) throw new UnsupportedOperationException("Drill does not yet support map references.");
return null;
}
+
+
@Override
- public VectorWrapper<?> getValueAccessorById(int fieldId, Class<?> clazz) {
- VectorWrapper<?> va = wrappers.get(fieldId);
- if(va!= null && clazz == null){
- return (VectorWrapper<?>) va;
- }
- if (va != null && va.getVectorClass() != clazz) {
+ public VectorWrapper<?> getValueAccessorById(Class<?> clazz, int... fieldIds) {
+ Preconditions.checkArgument(fieldIds.length >= 1);
+ VectorWrapper<?> va = wrappers.get(fieldIds[0]);
+
+ if(va == null) return null;
+
+ if (fieldIds.length == 1 && clazz != null && !clazz.isAssignableFrom(va.getVectorClass())) {
throw new IllegalStateException(String.format(
"Failure while reading vector. Expected vector class of %s but was holding vector class %s.",
clazz.getCanonicalName(), va.getVectorClass().getCanonicalName()));
}
- return (VectorWrapper<?>) va;
+
+ return (VectorWrapper<?>) va.getChildWrapper(fieldIds);
+
}
public BatchSchema getSchema() {
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorWrapper.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorWrapper.java
index 401b50e36..dc8ffe582 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorWrapper.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorWrapper.java
@@ -17,11 +17,14 @@
*/
package org.apache.drill.exec.record;
+import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.exec.vector.ValueVector;
public interface VectorWrapper<T extends ValueVector> {
+
+
public Class<T> getVectorClass();
public MaterializedField getField();
public T getValueVector();
@@ -29,4 +32,11 @@ public interface VectorWrapper<T extends ValueVector> {
public boolean isHyper();
public void clear();
public VectorWrapper<T> cloneAndTransfer();
+ public VectorWrapper<?> getChildWrapper(int[] ids);
+
+ /**
+   * Traverse the object graph and determine whether the provided SchemaPath matches data within
+   * the wrapper. If so, return a TypedFieldId associated with this path.
+   * @return the matching TypedFieldId, or null if the path does not match
+ */
+ public TypedFieldId getFieldIdIfMatches(int id, SchemaPath expectedPath);
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/WritableBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/WritableBatch.java
index 396834c49..4ff370887 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/WritableBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/WritableBatch.java
@@ -18,12 +18,14 @@
package org.apache.drill.exec.record;
import io.netty.buffer.ByteBuf;
+import io.netty.buffer.CompositeByteBuf;
import java.util.List;
-import io.netty.buffer.CompositeByteBuf;
-import org.apache.drill.exec.proto.UserBitShared.FieldMetadata;
import org.apache.drill.exec.proto.UserBitShared.RecordBatchDef;
+import org.apache.drill.exec.proto.UserBitShared.SerializedField;
import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
import org.apache.drill.exec.vector.ValueVector;
@@ -63,7 +65,7 @@ public class WritableBatch {
Preconditions.checkState(!cleared,
"Attempted to reconstruct a container from a WritableBatch after it had been cleared");
if (buffers.length > 0) { /* If we have ByteBuf's associated with value vectors */
-
+
CompositeByteBuf cbb = new CompositeByteBuf(buffers[0].alloc(), true, buffers.length);
/* Copy data from each buffer into the compound buffer */
@@ -71,8 +73,7 @@ public class WritableBatch {
cbb.addComponent(buf);
}
-
- List<FieldMetadata> fields = def.getFieldList();
+ List<SerializedField> fields = def.getFieldList();
int bufferOffset = 0;
@@ -82,7 +83,7 @@ public class WritableBatch {
int vectorIndex = 0;
for (VectorWrapper<?> vv : container) {
- FieldMetadata fmd = fields.get(vectorIndex);
+ SerializedField fmd = fields.get(vectorIndex);
ValueVector v = vv.getValueVector();
ByteBuf bb = cbb.slice(bufferOffset, fmd.getBufferLength());
// v.load(fmd, cbb.slice(bufferOffset, fmd.getBufferLength()));
@@ -127,7 +128,7 @@ public class WritableBatch {
public static WritableBatch getBatchNoHV(int recordCount, Iterable<ValueVector> vectors, boolean isSV2) {
List<ByteBuf> buffers = Lists.newArrayList();
- List<FieldMetadata> metadata = Lists.newArrayList();
+ List<SerializedField> metadata = Lists.newArrayList();
for (ValueVector vv : vectors) {
metadata.add(vv.getMetadata());
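Reconstruction walks the SerializedField list in lock-step with the composite buffer: each field's getBufferLength() advances the slice offset, so the slices line up exactly with the vectors that produced them. Condensed, the invariant is:

    int offset = 0;
    for (SerializedField field : def.getFieldList()) {
      ByteBuf slice = cbb.slice(offset, field.getBufferLength());
      offset += field.getBufferLength();
      // the vector that wrote these bytes reloads from exactly this slice
    }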
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONFormatPlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONFormatPlugin.java
index 872052cd0..04a976819 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONFormatPlugin.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONFormatPlugin.java
@@ -44,15 +44,15 @@ public class JSONFormatPlugin extends EasyFormatPlugin<JSONFormatConfig> {
public JSONFormatPlugin(String name, DrillbitContext context, DrillFileSystem fs, StoragePluginConfig storageConfig) {
this(name, context, fs, storageConfig, new JSONFormatConfig());
}
-
+
public JSONFormatPlugin(String name, DrillbitContext context, DrillFileSystem fs, StoragePluginConfig config, JSONFormatConfig formatPluginConfig) {
super(name, context, fs, config, formatPluginConfig, true, false, false, false, Lists.newArrayList("json"), "json");
}
-
+
@Override
- public RecordReader getRecordReader(FragmentContext context, FileWork fileWork,
+ public RecordReader getRecordReader(FragmentContext context, FileWork fileWork,
List<SchemaPath> columns) throws ExecutionSetupException {
- return new JSONRecordReader(context, fileWork.getPath(), this.getFileSystem().getUnderlying(), columns);
+ return new JSONRecordReader2(context, fileWork.getPath(), this.getFileSystem().getUnderlying(), columns);
}
@Override
@@ -78,6 +78,6 @@ public class JSONFormatPlugin extends EasyFormatPlugin<JSONFormatConfig> {
return true;
return false;
}
-
+
}
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONRecordReader.java
deleted file mode 100644
index 1c8539cc0..000000000
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONRecordReader.java
+++ /dev/null
@@ -1,532 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.store.easy.json;
-
-import static com.fasterxml.jackson.core.JsonToken.END_ARRAY;
-import static com.fasterxml.jackson.core.JsonToken.END_OBJECT;
-import static com.fasterxml.jackson.core.JsonToken.FIELD_NAME;
-
-import java.io.IOException;
-import java.nio.charset.Charset;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.drill.common.exceptions.DrillRuntimeException;
-import org.apache.drill.common.exceptions.ExecutionSetupException;
-import org.apache.drill.common.expression.ExpressionPosition;
-import org.apache.drill.common.expression.FieldReference;
-import org.apache.drill.common.expression.SchemaPath;
-import org.apache.drill.common.types.TypeProtos.MajorType;
-import org.apache.drill.common.types.TypeProtos.MinorType;
-import org.apache.drill.common.types.Types;
-import org.apache.drill.exec.exception.SchemaChangeException;
-import org.apache.drill.exec.expr.TypeHelper;
-import org.apache.drill.exec.expr.holders.NullableBitHolder;
-import org.apache.drill.exec.expr.holders.NullableFloat4Holder;
-import org.apache.drill.exec.expr.holders.NullableIntHolder;
-import org.apache.drill.exec.memory.BufferAllocator;
-import org.apache.drill.exec.memory.OutOfMemoryException;
-import org.apache.drill.exec.ops.FragmentContext;
-import org.apache.drill.exec.physical.impl.OutputMutator;
-import org.apache.drill.exec.record.MaterializedField;
-import org.apache.drill.exec.schema.DiffSchema;
-import org.apache.drill.exec.schema.Field;
-import org.apache.drill.exec.schema.NamedField;
-import org.apache.drill.exec.schema.ObjectSchema;
-import org.apache.drill.exec.schema.RecordSchema;
-import org.apache.drill.exec.schema.SchemaIdGenerator;
-import org.apache.drill.exec.schema.json.jackson.JacksonHelper;
-import org.apache.drill.exec.store.RecordReader;
-import org.apache.drill.exec.store.VectorHolder;
-import org.apache.drill.exec.vector.*;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-
-import com.fasterxml.jackson.core.JsonFactory;
-import com.fasterxml.jackson.core.JsonParser;
-import com.fasterxml.jackson.core.JsonToken;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-
-public class JSONRecordReader implements RecordReader {
- static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(JSONRecordReader.class);
- private static final int DEFAULT_LENGTH = 4000;
- public static final Charset UTF_8 = Charset.forName("UTF-8");
-
- private final Map<String, VectorHolder> valueVectorMap;
- private final FileSystem fileSystem;
- private final Path hadoopPath;
-
- private JsonParser parser;
- private SchemaIdGenerator generator;
- private DiffSchema diffSchema;
- private RecordSchema currentSchema;
- private List<Field> removedFields;
- private OutputMutator outputMutator;
- private int batchSize;
- private final List<SchemaPath> columns;
-
- public JSONRecordReader(FragmentContext fragmentContext, String inputPath, FileSystem fileSystem, int batchSize,
- List<SchemaPath> columns) throws OutOfMemoryException {
- this.hadoopPath = new Path(inputPath);
- this.fileSystem = fileSystem;
- this.batchSize = batchSize;
- valueVectorMap = Maps.newHashMap();
- this.columns = columns;
- }
-
- public JSONRecordReader(FragmentContext fragmentContext, String inputPath, FileSystem fileSystem,
- List<SchemaPath> columns) throws OutOfMemoryException {
- this(fragmentContext, inputPath, fileSystem, DEFAULT_LENGTH, columns);
- }
-
- private JsonParser getParser() {
- return parser;
- }
-
- @Override
- public void setup(OutputMutator output) throws ExecutionSetupException {
- outputMutator = output;
- output.removeAllFields();
- currentSchema = new ObjectSchema();
- diffSchema = new DiffSchema();
- removedFields = Lists.newArrayList();
-
- try {
- JsonFactory factory = new JsonFactory();
- parser = factory.createJsonParser(fileSystem.open(hadoopPath));
- parser.nextToken(); // Read to the first START_OBJECT token
- generator = new SchemaIdGenerator();
- } catch (IOException e) {
- throw new ExecutionSetupException(e);
- }
- }
-
- @Override
- public int next() {
- if (parser.isClosed() || !parser.hasCurrentToken()) {
- return 0;
- }
-
- resetBatch();
-
- int nextRowIndex = 0;
-
- try {
- while (ReadType.OBJECT.readRecord(this, null, nextRowIndex++, 0)) {
- parser.nextToken(); // Read to START_OBJECT token
-
- if (!parser.hasCurrentToken()) {
- parser.close();
- break;
- }
- }
-
- parser.nextToken();
-
- if (!parser.hasCurrentToken()) {
- parser.close();
- }
-
- // Garbage collect fields never referenced in this batch
- for (Field field : Iterables.concat(currentSchema.removeUnreadFields(), removedFields)) {
- diffSchema.addRemovedField(field);
- outputMutator.removeField(field.getAsMaterializedField());
- }
-
- if (diffSchema.isChanged()) {
- outputMutator.setNewSchema();
- }
-
-
- } catch (IOException | SchemaChangeException e) {
- logger.error("Error reading next in Json reader", e);
- throw new DrillRuntimeException(e);
- }
-
- for (VectorHolder holder : valueVectorMap.values()) {
- if (holder.isRepeated()) {
- holder.setGroupCount(nextRowIndex);
- }
- holder.getValueVector().getMutator().setValueCount(nextRowIndex);
- }
-
- return nextRowIndex;
- }
-
- private void resetBatch() {
- for (VectorHolder value : valueVectorMap.values()) {
- value.reset();
- }
-
- currentSchema.resetMarkedFields();
- diffSchema.reset();
- removedFields.clear();
- }
-
- @Override
- public void cleanup() {
- try {
- parser.close();
- } catch (IOException e) {
- logger.warn("Error closing Json parser", e);
- }
- }
-
-
- private RecordSchema getCurrentSchema() {
- return currentSchema;
- }
-
- private void setCurrentSchema(RecordSchema schema) {
- currentSchema = schema;
- }
-
- private List<Field> getRemovedFields() {
- return removedFields;
- }
-
- private boolean fieldSelected(String field){
-
- SchemaPath sp = SchemaPath.getCompoundPath(field.split("\\."));
- if (this.columns != null && this.columns.size() > 0){
- for (SchemaPath expr : this.columns){
- if ( sp.equals(expr)){
- return true;
- }
- }
- return false;
- }
- return true;
- }
-
- public static enum ReadType {
- ARRAY(END_ARRAY) {
- @Override
- public Field createField(RecordSchema parentSchema, String prefixFieldName, String fieldName, MajorType fieldType, int index) {
- return new NamedField(parentSchema, prefixFieldName, fieldName, fieldType);
- }
-
- @Override
- public RecordSchema createSchema() throws IOException {
- return new ObjectSchema();
- }
- },
- OBJECT(END_OBJECT) {
- @Override
- public Field createField(RecordSchema parentSchema,
- String prefixFieldName,
- String fieldName,
- MajorType fieldType,
- int index) {
- return new NamedField(parentSchema, prefixFieldName, fieldName, fieldType);
- }
-
- @Override
- public RecordSchema createSchema() throws IOException {
- return new ObjectSchema();
- }
- };
-
- private final JsonToken endObject;
-
- ReadType(JsonToken endObject) {
- this.endObject = endObject;
- }
-
- public JsonToken getEndObject() {
- return endObject;
- }
-
- @SuppressWarnings("ConstantConditions")
- public boolean readRecord(JSONRecordReader reader,
- String prefixFieldName,
- int rowIndex,
- int groupCount) throws IOException, SchemaChangeException {
- JsonParser parser = reader.getParser();
- JsonToken token = parser.nextToken();
- JsonToken endObject = getEndObject();
- int colIndex = 0;
- boolean isFull = false;
- while (token != endObject) {
- if (token == FIELD_NAME) {
- token = parser.nextToken();
- continue;
- }
-
- String fieldName = parser.getCurrentName();
- if ( fieldName != null && ! reader.fieldSelected(fieldName)){
- // this field was not requested in the query
- token = parser.nextToken();
- colIndex += 1;
- continue;
- }
- MajorType fieldType = JacksonHelper.getFieldType(token, this == ReadType.ARRAY);
- ReadType readType = null;
- switch (token) {
- case START_ARRAY:
- readType = ReadType.ARRAY;
- groupCount++;
- break;
- case START_OBJECT:
- readType = ReadType.OBJECT;
- groupCount = 0;
- break;
- }
-
- if (fieldType != null) { // Including nulls
- boolean currentFieldFull = !recordData(
- readType,
- reader,
- fieldType,
- prefixFieldName,
- fieldName,
- rowIndex,
- colIndex,
- groupCount);
- if(readType == ReadType.ARRAY) {
- groupCount--;
- }
- isFull = isFull || currentFieldFull;
- }
- token = parser.nextToken();
- colIndex += 1;
- }
- return !isFull;
- }
-
- private void removeChildFields(List<Field> removedFields, Field field) {
- RecordSchema schema = field.getAssignedSchema();
- if (schema == null) {
- return;
- }
- for (Field childField : schema.getFields()) {
- removedFields.add(childField);
- if (childField.hasSchema()) {
- removeChildFields(removedFields, childField);
- }
- }
- }
-
- private boolean recordData(JSONRecordReader.ReadType readType,
- JSONRecordReader reader,
- MajorType fieldType,
- String prefixFieldName,
- String fieldName,
- int rowIndex,
- int colIndex,
- int groupCount) throws IOException, SchemaChangeException {
- RecordSchema currentSchema = reader.getCurrentSchema();
- Field field = currentSchema.getField(fieldName == null ? prefixFieldName : fieldName, colIndex);
- boolean isFieldFound = field != null;
- List<Field> removedFields = reader.getRemovedFields();
- boolean newFieldLateBound = fieldType.getMinorType().equals(MinorType.LATE);
-
- if (isFieldFound && !field.getFieldType().equals(fieldType)) {
- boolean existingFieldLateBound = field.getFieldType().getMinorType().equals(MinorType.LATE);
-
- if (newFieldLateBound && !existingFieldLateBound) {
- fieldType = Types.overrideMinorType(fieldType, field.getFieldType().getMinorType());
- } else if (!newFieldLateBound && existingFieldLateBound) {
- field.setFieldType(Types.overrideMinorType(field.getFieldType(), fieldType.getMinorType()));
- } else if (!newFieldLateBound && !existingFieldLateBound) {
- if (field.hasSchema()) {
- removeChildFields(removedFields, field);
- }
- removedFields.add(field);
- currentSchema.removeField(field, colIndex);
-
- isFieldFound = false;
- }
- }
-
- if (!isFieldFound) {
- field = createField(
- currentSchema,
- prefixFieldName,
- fieldName,
- fieldType,
- colIndex
- );
-
- reader.recordNewField(field);
- currentSchema.addField(field);
- }
-
- field.setRead(true);
-
- VectorHolder holder = getOrCreateVectorHolder(reader, field);
- if (readType != null) {
- RecordSchema fieldSchema = field.getAssignedSchema();
- RecordSchema newSchema = readType.createSchema();
-
- if (readType != ReadType.ARRAY) {
- reader.setCurrentSchema(fieldSchema);
- if (fieldSchema == null) reader.setCurrentSchema(newSchema);
- readType.readRecord(reader, field.getFullFieldName(), rowIndex, groupCount);
- } else {
- readType.readRecord(reader, field.getFullFieldName(), rowIndex, groupCount);
- }
-
- reader.setCurrentSchema(currentSchema);
-
- } else if (holder != null && !newFieldLateBound && fieldType.getMinorType() != MinorType.LATE) {
- return addValueToVector(
- rowIndex,
- holder,
- JacksonHelper.getValueFromFieldType(
- reader.getParser(),
- fieldType.getMinorType()
- ),
- fieldType.getMinorType(),
- groupCount
- );
- }
-
- return true;
- }
-
- private static <T> boolean addValueToVector(int index, VectorHolder holder, T val, MinorType minorType, int groupCount) {
- switch (minorType) {
- case BIGINT: {
- holder.incAndCheckLength(NullableIntHolder.WIDTH * 8 + 1);
- if (groupCount == 0) {
- if (val != null) {
- NullableBigIntVector int4 = (NullableBigIntVector) holder.getValueVector();
- NullableBigIntVector.Mutator m = int4.getMutator();
- m.set(index, (Long) val);
- }
- } else {
- if (val == null) {
- throw new UnsupportedOperationException("Nullable repeated int is not supported.");
- }
-
- RepeatedBigIntVector repeatedInt4 = (RepeatedBigIntVector) holder.getValueVector();
- RepeatedBigIntVector.Mutator m = repeatedInt4.getMutator();
- holder.setGroupCount(index);
- m.add(index, (Long) val);
- }
-
- return holder.hasEnoughSpace(NullableIntHolder.WIDTH * 8 + 1);
- }
- case FLOAT4: {
- holder.incAndCheckLength(NullableFloat4Holder.WIDTH * 8 + 1);
- if (groupCount == 0) {
- if (val != null) {
- NullableFloat4Vector float4 = (NullableFloat4Vector) holder.getValueVector();
- NullableFloat4Vector.Mutator m = float4.getMutator();
- m.set(index, (Float) val);
- }
- } else {
- if (val == null) {
- throw new UnsupportedOperationException("Nullable repeated float is not supported.");
- }
-
- RepeatedFloat4Vector repeatedFloat4 = (RepeatedFloat4Vector) holder.getValueVector();
- RepeatedFloat4Vector.Mutator m = repeatedFloat4.getMutator();
- holder.setGroupCount(index);
- m.add(index, (Float) val);
- }
- return holder.hasEnoughSpace(NullableFloat4Holder.WIDTH * 8 + 1);
- }
- case VARCHAR: {
- if (val == null) {
- return (index + 1) * 4 <= holder.getLength();
- } else {
- byte[] bytes = ((String) val).getBytes(UTF_8);
- int length = bytes.length;
- holder.incAndCheckLength(length);
- if (groupCount == 0) {
- NullableVarCharVector varLen4 = (NullableVarCharVector) holder.getValueVector();
- NullableVarCharVector.Mutator m = varLen4.getMutator();
- m.set(index, bytes);
- } else {
- RepeatedVarCharVector repeatedVarLen4 = (RepeatedVarCharVector) holder.getValueVector();
- RepeatedVarCharVector.Mutator m = repeatedVarLen4.getMutator();
- holder.setGroupCount(index);
- m.add(index, bytes);
- }
- return holder.hasEnoughSpace(length + 4 + 1);
- }
- }
- case BIT: {
- holder.incAndCheckLength(NullableBitHolder.WIDTH + 1);
- if (groupCount == 0) {
- if (val != null) {
- NullableBitVector bit = (NullableBitVector) holder.getValueVector();
- NullableBitVector.Mutator m = bit.getMutator();
- m.set(index, (Boolean) val ? 1 : 0);
- }
- } else {
- if (val == null) {
- throw new UnsupportedOperationException("Nullable repeated boolean is not supported.");
- }
-
- RepeatedBitVector repeatedBit = (RepeatedBitVector) holder.getValueVector();
- RepeatedBitVector.Mutator m = repeatedBit.getMutator();
- holder.setGroupCount(index);
- m.add(index, (Boolean) val ? 1 : 0);
- }
- return holder.hasEnoughSpace(NullableBitHolder.WIDTH + 1);
- }
- default:
- throw new DrillRuntimeException("Type not supported to add value. Type: " + minorType);
- }
- }
-
- private VectorHolder getOrCreateVectorHolder(JSONRecordReader reader, Field field) throws SchemaChangeException {
- return reader.getOrCreateVectorHolder(field);
- }
-
- public abstract RecordSchema createSchema() throws IOException;
-
- public abstract Field createField(RecordSchema parentSchema,
- String prefixFieldName,
- String fieldName,
- MajorType fieldType,
- int index);
- }
-
- private void recordNewField(Field field) {
- diffSchema.recordNewField(field);
- }
-
- private VectorHolder getOrCreateVectorHolder(Field field) throws SchemaChangeException {
- String fullFieldName = field.getFullFieldName();
- VectorHolder holder = valueVectorMap.get(fullFieldName);
-
- if (holder == null) {
- MajorType type = field.getFieldType();
- MinorType minorType = type.getMinorType();
-
- if (minorType.equals(MinorType.MAP) || minorType.equals(MinorType.LATE)) {
- return null;
- }
-
- MaterializedField f = MaterializedField.create(SchemaPath.getCompoundPath(fullFieldName.split("\\.")), type);
-
- ValueVector v = outputMutator.addField(f, TypeHelper.getValueVectorClass(minorType, type.getMode()));
- AllocationHelper.allocate(v, batchSize, 50);
- holder = new VectorHolder(v);
- valueVectorMap.put(fullFieldName, holder);
- return holder;
- }
- return holder;
- }
-}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONRecordReader2.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONRecordReader2.java
new file mode 100644
index 000000000..bb52a20e8
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONRecordReader2.java
@@ -0,0 +1,116 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.easy.json;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.List;
+
+import org.apache.drill.common.exceptions.DrillRuntimeException;
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.exception.SchemaChangeException;
+import org.apache.drill.exec.memory.OutOfMemoryException;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.physical.impl.OutputMutator;
+import org.apache.drill.exec.store.RecordReader;
+import org.apache.drill.exec.vector.complex.fn.JsonReader;
+import org.apache.drill.exec.vector.complex.fn.JsonRecordSplitter;
+import org.apache.drill.exec.vector.complex.fn.UTF8JsonRecordSplitter;
+import org.apache.drill.exec.vector.complex.impl.VectorContainerWriter;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+public class JSONRecordReader2 implements RecordReader{
+ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(JSONRecordReader2.class);
+
+ private OutputMutator mutator;
+ private VectorContainerWriter writer;
+ private Path hadoopPath;
+ private FileSystem fileSystem;
+ private InputStream stream;
+ private JsonReader jsonReader;
+
+ public JSONRecordReader2(FragmentContext fragmentContext, String inputPath, FileSystem fileSystem,
+ List<SchemaPath> columns) throws OutOfMemoryException {
+ this.hadoopPath = new Path(inputPath);
+ this.fileSystem = fileSystem;
+ }
+
+ @Override
+ public void setup(OutputMutator output) throws ExecutionSetupException {
+ try{
+ stream = fileSystem.open(hadoopPath);
+ JsonRecordSplitter splitter = new UTF8JsonRecordSplitter(stream);
+ this.writer = new VectorContainerWriter(output);
+ this.mutator = output;
+ jsonReader = new JsonReader(splitter);
+ }catch(IOException e){
+ throw new ExecutionSetupException("Failure reading JSON file.", e);
+ }
+ }
+
+ @Override
+ public int next() {
+ writer.allocate();
+ writer.reset();
+
+    int i = 0;
+
+ try{
+ outside: while(true){
+ writer.setPosition(i);
+
+ switch(jsonReader.write(writer)){
+ case WRITE_SUCCEED:
+ i++;
+ break;
+
+ case NO_MORE:
+// System.out.println("no more records - main loop");
+ break outside;
+
+ case WRITE_FAILED:
+// System.out.println("==== hit bounds at " + i);
+ break outside;
+      }
+ }
+
+
+ writer.setValueCount(i);
+ mutator.setNewSchema();
+ return i;
+
+ }catch(IOException | SchemaChangeException e){
+ throw new DrillRuntimeException("Failure while reading JSON file.", e);
+ }
+
+ }
+
+ @Override
+ public void cleanup() {
+ try {
+ stream.close();
+ } catch (IOException e) {
+ logger.warn("Failure while closing stream.", e);
+ }
+ }
+
+}
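A hedged sketch of driving the new reader standalone; the Configuration, path, fragment context, and mutator are placeholders for what the scan operator normally supplies:

    FileSystem fs = FileSystem.getLocal(new Configuration());
    JSONRecordReader2 reader = new JSONRecordReader2(
        context, "/tmp/sample.json", fs, null /* columns: read everything */);
    reader.setup(mutator);
    while (reader.next() > 0) {
      // consume the populated vectors for this batch
    }
    reader.cleanup();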
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/hive/HiveRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/hive/HiveRecordReader.java
index 25931db65..9e0126870 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/hive/HiveRecordReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/hive/HiveRecordReader.java
@@ -224,7 +224,7 @@ public class HiveRecordReader implements RecordReader {
PrimitiveCategory pCat = primitiveCategories.get(i);
MajorType type = getMajorType(pCat);
MaterializedField field = MaterializedField.create(SchemaPath.getSimplePath(columnNames.get(i)), type);
- ValueVector vv = output.addField(field, TypeHelper.getValueVectorClass(type.getMinorType(), type.getMode()));
+ ValueVector vv = output.addField(field, (Class<? extends ValueVector>) TypeHelper.getValueVectorClass(type.getMinorType(), type.getMode()));
vectors.add(vv);
}
for (int i = 0; i < selectedPartitionNames.size(); i++) {
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockRecordReader.java
index eb9e7a693..5c07dc5a4 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockRecordReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockRecordReader.java
@@ -76,7 +76,7 @@ public class MockRecordReader implements RecordReader {
for (int i = 0; i < config.getTypes().length; i++) {
MajorType type = config.getTypes()[i].getMajorType();
- valueVectors[i] = output.addField(getVector(config.getTypes()[i].getName(), type, batchRecordCount), TypeHelper.getValueVectorClass(type.getMinorType(), type.getMode()));
+ valueVectors[i] = output.addField(getVector(config.getTypes()[i].getName(), type, batchRecordCount), (Class<? extends ValueVector>) TypeHelper.getValueVectorClass(type.getMinorType(), type.getMode()));
}
output.setNewSchema();
} catch (SchemaChangeException e) {
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordReader.java
index 75cd799fc..5d2845627 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordReader.java
@@ -25,6 +25,7 @@ import java.util.HashMap;
import java.util.List;
import com.google.common.base.Preconditions;
+
import org.apache.drill.common.exceptions.DrillRuntimeException;
import org.apache.drill.common.exceptions.ExecutionSetupException;
import org.apache.drill.common.expression.ExpressionPosition;
@@ -41,10 +42,6 @@ import org.apache.drill.exec.physical.impl.OutputMutator;
import org.apache.drill.exec.record.MaterializedField;
import org.apache.drill.exec.store.RecordReader;
import org.apache.drill.exec.vector.*;
-import org.apache.drill.exec.vector.NullableVarBinaryVector;
-import org.apache.drill.exec.vector.NullableVarCharVector;
-import org.apache.drill.exec.vector.VarBinaryVector;
-import org.apache.drill.exec.vector.VarCharVector;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -263,7 +260,7 @@ public class ParquetRecordReader implements RecordReader {
//convertedTypes.put()
fieldFixedLength = column.getType() != PrimitiveType.PrimitiveTypeName.BINARY;
- v = output.addField(field, TypeHelper.getValueVectorClass(type.getMinorType(), type.getMode()));
+ v = output.addField(field, (Class<? extends ValueVector>) TypeHelper.getValueVectorClass(type.getMinorType(), type.getMode()));
if (column.getType() != PrimitiveType.PrimitiveTypeName.BINARY) {
createFixedColumnReader(fieldFixedLength, column, columnChunkMetaData, recordsPerBatch, v,
convertedType);
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/AllocationHelper.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/AllocationHelper.java
index 86aec44c8..f7a74c258 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/AllocationHelper.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/AllocationHelper.java
@@ -19,11 +19,11 @@ package org.apache.drill.exec.vector;
public class AllocationHelper {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(AllocationHelper.class);
-
+
public static void allocate(ValueVector v, int valueCount, int bytesPerValue){
allocate(v, valueCount, bytesPerValue, 5);
}
-
+
public static void allocate(ValueVector v, int valueCount, int bytesPerValue, int repeatedPerTop){
if(v instanceof FixedWidthVector){
((FixedWidthVector) v).allocateNew(valueCount);
@@ -34,7 +34,7 @@ public class AllocationHelper {
}else if(v instanceof RepeatedVariableWidthVector){
((RepeatedVariableWidthVector) v).allocateNew(valueCount * bytesPerValue * repeatedPerTop, valueCount, valueCount * repeatedPerTop);
}else{
- throw new UnsupportedOperationException();
+ v.allocateNew();
}
}
}
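The changed fallback is what lets the new complex vectors (maps, repeated maps, repeated lists) pass through this helper: they are neither fixed- nor variable-width, so they now receive a plain allocateNew() instead of an UnsupportedOperationException. For example (mapVector is a placeholder):

    // A MapVector fails every instanceof check above and lands in the
    // final branch, which simply delegates:
    AllocationHelper.allocate(mapVector, 4096, 50);  // -> mapVector.allocateNew()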
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/BaseDataValueVector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/BaseDataValueVector.java
index ddddab1a9..9641e6ae8 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/BaseDataValueVector.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/BaseDataValueVector.java
@@ -17,21 +17,25 @@
*/
package org.apache.drill.exec.vector;
+import java.util.Iterator;
+
import io.netty.buffer.ByteBuf;
import org.apache.drill.exec.memory.BufferAllocator;
-import org.apache.drill.exec.proto.UserBitShared.FieldMetadata;
+import org.apache.drill.exec.proto.UserBitShared.SerializedField;
import org.apache.drill.exec.record.DeadBuf;
import org.apache.drill.exec.record.MaterializedField;
+import com.google.hive12.common.collect.Iterators;
+
public abstract class BaseDataValueVector extends BaseValueVector{
protected ByteBuf data = DeadBuf.DEAD_BUFFER;
protected int valueCount;
-
+
public BaseDataValueVector(MaterializedField field, BufferAllocator allocator) {
super(field, allocator);
-
+
}
/**
@@ -46,7 +50,7 @@ public abstract class BaseDataValueVector extends BaseValueVector{
}
}
-
+
@Override
public ByteBuf[] getBuffers(){
ByteBuf[] out;
@@ -60,18 +64,24 @@ public abstract class BaseDataValueVector extends BaseValueVector{
clear();
return out;
}
-
+
public int getBufferSize() {
if(valueCount == 0) return 0;
return data.writerIndex();
}
@Override
- public abstract FieldMetadata getMetadata();
+ public abstract SerializedField getMetadata();
public ByteBuf getData(){
return data;
}
-
-
+
+ @Override
+ public Iterator<ValueVector> iterator() {
+ return Iterators.emptyIterator();
+ }
+
+
+
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/BaseValueVector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/BaseValueVector.java
index 7cc1adff2..7a61475f8 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/BaseValueVector.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/BaseValueVector.java
@@ -17,14 +17,21 @@
*/
package org.apache.drill.exec.vector;
+import java.util.Iterator;
+
import org.apache.drill.common.expression.FieldReference;
+
import io.netty.buffer.ByteBuf;
+
import org.apache.drill.exec.memory.BufferAllocator;
+import org.apache.drill.exec.proto.UserBitShared.SerializedField;
import org.apache.drill.exec.record.MaterializedField;
+import com.google.hive12.common.collect.Iterators;
+
public abstract class BaseValueVector implements ValueVector{
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(BaseValueVector.class);
-
+
protected final BufferAllocator allocator;
protected final MaterializedField field;
@@ -32,21 +39,24 @@ public abstract class BaseValueVector implements ValueVector{
this.allocator = allocator;
this.field = field;
}
-
+
@Override
public void close() {
clear();
}
-
+
@Override
public MaterializedField getField() {
return field;
}
-
+
public MaterializedField getField(FieldReference ref){
return getField().clone(ref);
}
-
+
+ protected SerializedField.Builder getMetadataBuilder(){
+ return getField().getAsBuilder();
+ }
abstract public ByteBuf getData();
@@ -54,12 +64,15 @@ public abstract class BaseValueVector implements ValueVector{
public abstract int getValueCount();
public void reset(){}
}
-
+
abstract class BaseMutator implements Mutator{
public void reset(){}
}
-
-
-
+
+ @Override
+ public Iterator<ValueVector> iterator() {
+ return Iterators.emptyIterator();
+ }
+
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/BitVector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/BitVector.java
index 63384a328..597b0f1e8 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/BitVector.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/BitVector.java
@@ -24,9 +24,12 @@ import org.apache.drill.exec.expr.holders.BitHolder;
import org.apache.drill.exec.expr.holders.NullableBitHolder;
import org.apache.drill.exec.memory.AccountingByteBuf;
import org.apache.drill.exec.memory.BufferAllocator;
-import org.apache.drill.exec.proto.UserBitShared.FieldMetadata;
+import org.apache.drill.exec.memory.OutOfMemoryRuntimeException;
+import org.apache.drill.exec.proto.UserBitShared.SerializedField;
import org.apache.drill.exec.record.MaterializedField;
import org.apache.drill.exec.record.TransferPair;
+import org.apache.drill.exec.vector.complex.impl.BitReaderImpl;
+import org.apache.drill.exec.vector.complex.reader.FieldReader;
/**
* Bit implements a vector of bit-width values. Elements in the vector are accessed by position from the logical start
@@ -49,11 +52,10 @@ public final class BitVector extends BaseDataValueVector implements FixedWidthVe
}
@Override
- public FieldMetadata getMetadata() {
- return FieldMetadata.newBuilder()
- .setDef(getField().getDef())
- .setValueCount(valueCount)
- .setBufferLength( (int) Math.ceil(valueCount / 8.0))
+ public SerializedField getMetadata() {
+ return field.getAsBuilder() //
+ .setValueCount(valueCount) //
+ .setBufferLength( (int) Math.ceil(valueCount / 8.0)) //
.build();
}
@@ -66,13 +68,26 @@ public final class BitVector extends BaseDataValueVector implements FixedWidthVe
}
public void allocateNew() {
+ if(!allocateNewSafe()) throw new OutOfMemoryRuntimeException();
+ }
+
+ public boolean allocateNewSafe() {
clear();
if (allocationMonitor > 5) {
      allocationValueCount = Math.max(1, (int)(allocationValueCount * 0.9));
} else if (allocationMonitor < -5) {
allocationValueCount = (int) (allocationValueCount * 1.1);
}
- allocateNew(allocationValueCount);
+
+ clear();
+ valueCapacity = allocationValueCount;
+ int valueSize = getSizeFromCount(allocationValueCount);
+ data = allocator.buffer(valueSize);
+ if(data == null) return false;
+ for (int i = 0; i < valueSize; i++) {
+ data.setByte(i, 0);
+ }
+ return true;
}
/**
@@ -112,8 +127,8 @@ public final class BitVector extends BaseDataValueVector implements FixedWidthVe
}
@Override
- public void load(FieldMetadata metadata, ByteBuf buffer) {
- assert this.field.getDef().equals(metadata.getDef());
+ public void load(SerializedField metadata, ByteBuf buffer) {
+ assert this.field.matches(metadata);
int loaded = load(metadata.getValueCount(), buffer);
assert metadata.getBufferLength() == loaded;
}
@@ -177,9 +192,6 @@ public final class BitVector extends BaseDataValueVector implements FixedWidthVe
}
}
- private void copyTo(int startIndex, int length, BitVector target) {
-
- }
private class TransferImpl implements TransferPair {
BitVector to;
@@ -205,8 +217,8 @@ public final class BitVector extends BaseDataValueVector implements FixedWidthVe
}
@Override
- public void copyValue(int fromIndex, int toIndex) {
- to.copyFrom(fromIndex, toIndex, BitVector.this);
+ public boolean copyValueSafe(int fromIndex, int toIndex) {
+ return to.copyFromSafe(fromIndex, toIndex, BitVector.this);
}
}
@@ -233,7 +245,7 @@ public final class BitVector extends BaseDataValueVector implements FixedWidthVe
}
@Override
- public final Object getObject(int index) {
+ public final Boolean getObject(int index) {
return new Boolean(get(index) != 0);
}
@@ -245,9 +257,15 @@ public final class BitVector extends BaseDataValueVector implements FixedWidthVe
holder.value = get(index);
}
- final void get(int index, NullableBitHolder holder) {
+ public final void get(int index, NullableBitHolder holder) {
+ holder.isSet = 1;
holder.value = get(index);
}
+
+ @Override
+ public FieldReader getReader() {
+ return new BitReaderImpl(BitVector.this);
+ }
}
/**
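
Note: the allocateNew()/allocateNewSafe() pair above establishes the pattern the rest of this patch follows: the safe variant returns false when the allocator cannot supply a buffer, while the throwing variant wraps that failure in OutOfMemoryRuntimeException. A hedged caller sketch; the field-construction calls mirror ones used elsewhere in this commit, but treat the exact constructor signature as an assumption:

    // `allocator` is an existing BufferAllocator instance.
    boolean tryAllocateBits(BufferAllocator allocator) {
      MaterializedField f = MaterializedField.create(
          SchemaPath.getSimplePath("flags"), Types.required(MinorType.BIT));
      BitVector bits = new BitVector(f, allocator);
      return bits.allocateNewSafe(); // false on OOM; allocateNew() would throw instead
    }
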
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/RepeatedMutator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/RepeatedMutator.java
index 8e097e439..ad2ba1b04 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/RepeatedMutator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/RepeatedMutator.java
@@ -18,5 +18,5 @@
package org.apache.drill.exec.vector;
public interface RepeatedMutator extends ValueVector.Mutator {
- public void startNewGroup(int index);
+ public boolean startNewGroup(int index);
}
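
Note: startNewGroup() now reports success the same way setSafe() and copyValueSafe() do, so writers can detect a full offsets buffer instead of overrunning it. A one-line sketch of the assumed calling convention (flushAndRetry is a hypothetical recovery hook):

    // `mutator` is any RepeatedMutator; false means the offsets buffer is full.
    if (!mutator.startNewGroup(rowIndex)) {
      flushAndRetry(); // hypothetical: flush the current batch, then retry this row
    }
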
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/ValueVector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/ValueVector.java
index 258b354b3..8b871fc5a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/ValueVector.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/ValueVector.java
@@ -22,24 +22,34 @@ import io.netty.buffer.ByteBuf;
import java.io.Closeable;
import org.apache.drill.common.expression.FieldReference;
-import org.apache.drill.exec.proto.UserBitShared.FieldMetadata;
+import org.apache.drill.exec.memory.OutOfMemoryRuntimeException;
+import org.apache.drill.exec.proto.UserBitShared.SerializedField;
import org.apache.drill.exec.record.MaterializedField;
import org.apache.drill.exec.record.TransferPair;
+import org.apache.drill.exec.vector.complex.reader.FieldReader;
/**
* ValueVectorTypes defines a set of template-generated classes which implement type-specific value vectors. The
 * template approach was chosen due to the lack of multiple inheritance. It is also important that all related logic be
* as efficient as possible.
*/
-public interface ValueVector extends Closeable {
+public interface ValueVector extends Closeable, Iterable<ValueVector> {
+
/**
* Allocate new buffers. ValueVector implements logic to determine how much to allocate.
+ * @throws OutOfMemoryRuntimeException Thrown if no memory can be allocated.
*/
- public void allocateNew();
+ public void allocateNew() throws OutOfMemoryRuntimeException;
+
+ /**
+ * Allocates new buffers. ValueVector implements logic to determine how much to allocate.
+   * @return true if allocation was successful.
+ */
+ public boolean allocateNewSafe();
public int getBufferSize();
-
+
/**
* Alternative to clear(). Allows use as closeable in try-with-resources.
*/
@@ -52,27 +62,26 @@ public interface ValueVector extends Closeable {
/**
* Get information about how this field is materialized.
- *
+ *
* @return
*/
public MaterializedField getField();
/**
- * Get a transfer pair to allow transferring this vectors data between this vector and a destination vector of the same
- * type. Will also generate a second instance of this vector class that is connected through the TransferPair.
- *
- * @return
+   * Get a transfer pair to allow transferring this vector's data between this vector and a destination vector of the
+ * same type. Will also generate a second instance of this vector class that is connected through the TransferPair.
+ *
+ * @return
*/
public TransferPair getTransferPair();
public TransferPair makeTransferPair(ValueVector to);
-
-
+
public TransferPair getTransferPair(FieldReference ref);
/**
* Given the current buffer allocation, return the maximum number of values that this buffer can contain.
- *
+ *
* @return Maximum values buffer can contain. In the case of a Repeated field, this is the number of atoms, not
* repeated groups.
*/
@@ -80,37 +89,40 @@ public interface ValueVector extends Closeable {
/**
* Get Accessor to read value vector data.
- *
+ *
* @return
*/
public abstract Accessor getAccessor();
/**
- * Return the underlying buffers associated with this vector. Note that this doesn't impact the
- * reference counts for this buffer so it only should be used for in-context access. Also note
- * that this buffer changes regularly thus external classes shouldn't hold a reference to
- * it (unless they change it).
+   * Return the underlying buffers associated with this vector. Note that this doesn't impact the reference counts for
+   * this buffer, so it should only be used for in-context access. Also note that this buffer changes regularly, so
+   * external classes shouldn't hold a reference to it (unless they change it).
*
* @return The underlying ByteBuf.
*/
public abstract ByteBuf[] getBuffers();
-
+
/**
- * Load the data provided in the buffer. Typically used when deserializing from the wire.
- * @param metadata Metadata used to decode the incoming buffer.
- * @param buffer The buffer that contains the ValueVector.
+ * Load the data provided in the buffer. Typically used when deserializing from the wire.
+ *
+ * @param metadata
+ * Metadata used to decode the incoming buffer.
+ * @param buffer
+ * The buffer that contains the ValueVector.
*/
- public void load(FieldMetadata metadata, ByteBuf buffer);
-
+ public void load(SerializedField metadata, ByteBuf buffer);
+
/**
- * Get the metadata for this field. Used in serialization
+   * Get the metadata for this field. Used in serialization.
+   *
   * @return SerializedField for this field.
*/
- public FieldMetadata getMetadata();
-
+ public SerializedField getMetadata();
+
/**
* Get a Mutator to update this vectors data.
- *
+ *
* @return
*/
public abstract Mutator getMutator();
@@ -125,23 +137,25 @@ public interface ValueVector extends Closeable {
/**
* Get the Java Object representation of the element at the specified position. Useful for testing.
- *
+ *
* @param index
* Index of the value to get
*/
public abstract Object getObject(int index);
public int getValueCount();
-
+
public boolean isNull(int index);
public void reset();
+
+ public FieldReader getReader();
}
public interface Mutator {
/**
* Set the top number values (optional/required) or number of value groupings (repeated) in this vector.
- *
+ *
* @param valueCount
*/
public void setValueCount(int valueCount);
@@ -150,4 +164,5 @@ public interface ValueVector extends Closeable {
public void generateTestData(int values);
}
+
}
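
Note: because ValueVector now extends Iterable&lt;ValueVector&gt;, generic code can walk a vector tree without knowing which vectors are containers: maps and repeated lists yield their children, while the leaf implementations in this patch return an empty iterator. A minimal recursive sketch using only this interface:

    // Counts this vector plus all of its descendants; the empty iterator on
    // leaf vectors is the recursion's base case. The null check covers a
    // repeated list whose child vector has not been set yet.
    int countVectors(ValueVector v) {
      int n = 1;
      for (ValueVector child : v) {
        if (child != null) {
          n += countVectors(child);
        }
      }
      return n;
    }
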
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/accessor/GenericAccessor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/accessor/GenericAccessor.java
index 32f08b0c8..adee17199 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/accessor/GenericAccessor.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/accessor/GenericAccessor.java
@@ -40,6 +40,6 @@ public class GenericAccessor extends AbstractSqlAccessor {
@Override
TypeProtos.MajorType getType() {
- return v.getMetadata().getDef().getMajorType();
+ return v.getMetadata().getMajorType();
}
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/AbstractContainerVector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/AbstractContainerVector.java
new file mode 100644
index 000000000..ab1d2707d
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/AbstractContainerVector.java
@@ -0,0 +1,113 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.vector.complex;
+
+import org.apache.drill.common.expression.PathSegment;
+import org.apache.drill.common.types.TypeProtos.MajorType;
+import org.apache.drill.exec.record.TypedFieldId;
+import org.apache.drill.exec.vector.ValueVector;
+
+public abstract class AbstractContainerVector implements ValueVector{
+ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(AbstractContainerVector.class);
+
+ public abstract <T extends ValueVector> T addOrGet(String name, MajorType type, Class<T> clazz);
+ public abstract <T extends ValueVector> T get(String name, Class<T> clazz);
+ public abstract int size();
+
+ protected <T extends ValueVector> T typeify(ValueVector v, Class<T> clazz){
+ if(clazz.isAssignableFrom(v.getClass())){
+ return (T) v;
+ }else{
+      throw new IllegalStateException(String.format("Vector requested [%s] was different than type stored [%s]. Drill doesn't yet support heterogeneous types.", clazz.getSimpleName(), v.getClass().getSimpleName()));
+ }
+ }
+
+ public abstract VectorWithOrdinal getVectorWithOrdinal(String name);
+
+
+ public TypedFieldId getFieldIdIfMatches(TypedFieldId.Builder builder, boolean addToBreadCrumb, PathSegment seg){
+ if(seg == null){
+ if(addToBreadCrumb) builder.intermediateType(this.getField().getType());
+ return builder.finalType(this.getField().getType()).build();
+ }
+
+ if(seg.isArray()){
+
+ if(seg.isLastPath()){
+ if(addToBreadCrumb) builder.intermediateType(this.getField().getType());
+ return builder //
+ .remainder(seg) //
+ .finalType(this.getField().getType()) //
+ .withIndex() //
+ .build();
+ }else{
+ if(addToBreadCrumb){
+ addToBreadCrumb = false;
+ builder.remainder(seg);
+ }
+ // this is a complex array reference, which means it doesn't correspond directly to a vector by itself.
+ seg = seg.getChild();
+
+ }
+
+ }else{
+ // name segment.
+ }
+
+ VectorWithOrdinal vord = getVectorWithOrdinal(seg.isArray() ? null : seg.getNameSegment().getPath());
+ if(vord == null) return null;
+
+
+ if(addToBreadCrumb){
+ builder.intermediateType(this.getField().getType());
+ builder.addId(vord.ordinal);
+ }
+
+ ValueVector v = vord.vector;
+
+ if(v instanceof AbstractContainerVector){
+ // we're looking for a multi path.
+ AbstractContainerVector c = (AbstractContainerVector) v;
+ return c.getFieldIdIfMatches(builder, addToBreadCrumb, seg.getChild());
+ }else{
+ if(seg.isLastPath()){
+ if(addToBreadCrumb) builder.intermediateType(v.getField().getType());
+ return builder.finalType(v.getField().getType()).build();
+ }else{
+ logger.warn("You tried to request a complex type inside a scalar object.");
+ return null;
+ }
+ }
+
+ }
+
+ protected boolean supportsDirectRead(){
+ return false;
+ }
+
+ protected class VectorWithOrdinal{
+ final ValueVector vector;
+ final int ordinal;
+
+ public VectorWithOrdinal(ValueVector v, int ordinal){
+ this.vector = v;
+ this.ordinal = ordinal;
+ }
+ }
+}
+
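
Note: getFieldIdIfMatches() is the path-resolution workhorse above: given a segment chain such as a.b[2].c it descends through child vectors, recording each child's ordinal in the TypedFieldId so generated code can later re-fetch the same vector by id. A hedged usage sketch; the builder factory is an assumption, since the patch only shows TypedFieldId.Builder being passed in:

    // Resolve `path` against a container; null means no match (for example a
    // complex reference into a scalar, which logs the warning above).
    TypedFieldId resolve(AbstractContainerVector root, SchemaPath path) {
      TypedFieldId.Builder builder = TypedFieldId.newBuilder(); // assumed factory method
      return root.getFieldIdIfMatches(builder, true, path.getRootSegment().getChild());
    }
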
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/AbstractMapVector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/AbstractMapVector.java
new file mode 100644
index 000000000..f126e5ce7
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/AbstractMapVector.java
@@ -0,0 +1,22 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.vector.complex;
+
+public class AbstractMapVector {
+ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(AbstractMapVector.class);
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/MapVector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/MapVector.java
new file mode 100644
index 000000000..91c0be574
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/MapVector.java
@@ -0,0 +1,391 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.vector.complex;
+
+import io.netty.buffer.ByteBuf;
+
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.drill.common.expression.FieldReference;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.common.types.TypeProtos.DataMode;
+import org.apache.drill.common.types.TypeProtos.MajorType;
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.exec.expr.TypeHelper;
+import org.apache.drill.exec.expr.holders.ComplexHolder;
+import org.apache.drill.exec.memory.BufferAllocator;
+import org.apache.drill.exec.memory.OutOfMemoryRuntimeException;
+import org.apache.drill.exec.proto.UserBitShared.SerializedField;
+import org.apache.drill.exec.record.MaterializedField;
+import org.apache.drill.exec.record.TransferPair;
+import org.apache.drill.exec.vector.ValueVector;
+import org.apache.drill.exec.vector.complex.RepeatedMapVector.MapSingleCopier;
+import org.apache.drill.exec.vector.complex.impl.SingleMapReaderImpl;
+import org.apache.drill.exec.vector.complex.reader.FieldReader;
+import org.apache.drill.exec.vector.complex.writer.BaseWriter.MapWriter;
+
+import com.carrotsearch.hppc.IntObjectOpenHashMap;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Maps;
+import com.google.hive12.common.collect.Lists;
+
+public class MapVector extends AbstractContainerVector {
+ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(MapVector.class);
+
+ public final static MajorType TYPE = MajorType.newBuilder().setMinorType(MinorType.MAP).setMode(DataMode.REQUIRED).build();
+
+ final HashMap<String, ValueVector> vectors = Maps.newHashMap();
+ private final Map<String, VectorWithOrdinal> vectorIds = Maps.newHashMap();
+ private final IntObjectOpenHashMap<ValueVector> vectorsById = new IntObjectOpenHashMap<>();
+ private final SingleMapReaderImpl reader = new SingleMapReaderImpl(MapVector.this);
+ private final Accessor accessor = new Accessor();
+ private final Mutator mutator = new Mutator();
+ private final BufferAllocator allocator;
+ private MaterializedField field;
+ private int valueCount;
+
+ public MapVector(String path, BufferAllocator allocator){
+ this.field = MaterializedField.create(SchemaPath.getSimplePath(path), TYPE);
+ this.allocator = allocator;
+ }
+ public MapVector(MaterializedField field, BufferAllocator allocator){
+ this.field = field;
+ this.allocator = allocator;
+ }
+
+ public int size(){
+ return vectors.size();
+ }
+
+ transient private MapTransferPair ephPair;
+ transient private MapSingleCopier ephPair2;
+
+ public boolean copyFromSafe(int fromIndex, int thisIndex, MapVector from){
+ if(ephPair == null || ephPair.from != from){
+ ephPair = (MapTransferPair) from.makeTransferPair(this);
+ }
+ return ephPair.copyValueSafe(fromIndex, thisIndex);
+ }
+
+ public boolean copyFromSafe(int fromSubIndex, int thisIndex, RepeatedMapVector from){
+ if(ephPair2 == null || ephPair2.from != from){
+ ephPair2 = from.makeSingularCopier(this);
+ }
+ return ephPair2.copySafe(fromSubIndex, thisIndex);
+ }
+
+ @Override
+ public <T extends ValueVector> T addOrGet(String name, MajorType type, Class<T> clazz) {
+ ValueVector v = vectors.get(name);
+
+ if(v == null){
+ v = TypeHelper.getNewVector(field.getPath(), name, allocator, type);
+ Preconditions.checkNotNull(v, String.format("Failure to create vector of type %s.", type));
+ put(name, v);
+ }
+ return typeify(v, clazz);
+
+ }
+
+ protected void put(String name, ValueVector vv){
+ int ordinal = vectors.size();
+ if(vectors.put(name, vv) != null){
+ throw new IllegalStateException();
+ }
+ vectorIds.put(name, new VectorWithOrdinal(vv, ordinal));
+ vectorsById.put(ordinal, vv);
+ field.addChild(vv.getField());
+ }
+
+
+ @Override
+ protected boolean supportsDirectRead() {
+ return true;
+ }
+
+ public Iterator<String> fieldNameIterator(){
+ return vectors.keySet().iterator();
+ }
+
+ @Override
+ public void allocateNew() throws OutOfMemoryRuntimeException {
+ if(!allocateNewSafe()) throw new OutOfMemoryRuntimeException();
+ }
+
+ @Override
+ public boolean allocateNewSafe() {
+ for(ValueVector v : vectors.values()){
+ if(!v.allocateNewSafe()) return false;
+ }
+ return true;
+ }
+
+ @Override
+ public <T extends ValueVector> T get(String name, Class<T> clazz) {
+ ValueVector v = vectors.get(name);
+ if(v == null) throw new IllegalStateException(String.format("Attempting to access invalid map field of name %s.", name));
+ return typeify(v, clazz);
+ }
+
+ @Override
+ public int getBufferSize() {
+ if(valueCount == 0 || vectors.isEmpty()) return 0;
+ long buffer = 0;
+ for(ValueVector v : this){
+ buffer += v.getBufferSize();
+ }
+
+ return (int) buffer;
+ }
+
+ @Override
+ public void close() {
+ for(ValueVector v : this){
+ v.close();
+ }
+ }
+
+ @Override
+ public Iterator<ValueVector> iterator() {
+ return vectors.values().iterator();
+ }
+
+ @Override
+ public MaterializedField getField() {
+ return field;
+ }
+
+ @Override
+ public TransferPair getTransferPair() {
+ return new MapTransferPair(field.getPath());
+ }
+
+ @Override
+ public TransferPair makeTransferPair(ValueVector to) {
+ return new MapTransferPair( (MapVector) to);
+ }
+
+ @Override
+ public TransferPair getTransferPair(FieldReference ref) {
+ return new MapTransferPair(ref);
+ }
+
+ private class MapTransferPair implements TransferPair{
+ private MapVector from = MapVector.this;
+ private TransferPair[] pairs;
+ private MapVector to;
+
+ public MapTransferPair(SchemaPath path){
+ MapVector v = new MapVector(MaterializedField.create(path, TYPE), allocator);
+ pairs = new TransferPair[vectors.size()];
+ int i =0;
+ for(Map.Entry<String, ValueVector> e : vectors.entrySet()){
+ TransferPair otherSide = e.getValue().getTransferPair();
+ v.put(e.getKey(), otherSide.getTo());
+ pairs[i++] = otherSide;
+ }
+ this.to = v;
+ }
+
+ public MapTransferPair(MapVector to){
+ this.to = to;
+ pairs = new TransferPair[vectors.size()];
+ int i =0;
+ for(Map.Entry<String, ValueVector> e : vectors.entrySet()){
+ int preSize = to.vectors.size();
+ ValueVector v = to.addOrGet(e.getKey(), e.getValue().getField().getType(), e.getValue().getClass());
+ if(to.vectors.size() != preSize) v.allocateNew();
+ pairs[i++] = e.getValue().makeTransferPair(v);
+ }
+ }
+
+
+ @Override
+ public void transfer() {
+ for(TransferPair p : pairs){
+ p.transfer();
+ }
+ to.valueCount = valueCount;
+ clear();
+ }
+
+ @Override
+ public ValueVector getTo() {
+ return to;
+ }
+
+ @Override
+ public boolean copyValueSafe(int from, int to) {
+ for(TransferPair p : pairs){
+ if(!p.copyValueSafe(from, to)) return false;
+ }
+ return true;
+ }
+
+ @Override
+ public void splitAndTransfer(int startIndex, int length) {
+ throw new UnsupportedOperationException();
+ }
+
+ }
+
+ @Override
+ public int getValueCapacity() {
+ if(this.vectors.isEmpty()) return 0;
+ return vectors.values().iterator().next().getValueCapacity();
+ }
+
+ @Override
+ public Accessor getAccessor() {
+ return accessor;
+ }
+
+ @Override
+ public ByteBuf[] getBuffers() {
+ List<ByteBuf> bufs = Lists.newArrayList();
+ for(ValueVector v : vectors.values()){
+ for(ByteBuf b : v.getBuffers()){
+ bufs.add(b);
+ }
+ }
+ return bufs.toArray(new ByteBuf[bufs.size()]);
+ }
+
+ @Override
+ public void load(SerializedField metadata, ByteBuf buf) {
+ List<SerializedField> fields = metadata.getChildList();
+
+ int bufOffset = 0;
+ for (SerializedField fmd : fields) {
+ MaterializedField fieldDef = MaterializedField.create(fmd);
+
+ ValueVector v = vectors.get(fieldDef.getLastName());
+ if(v == null) {
+ // if we arrive here, we didn't have a matching vector.
+
+ v = TypeHelper.getNewVector(fieldDef, allocator);
+ }
+ if (fmd.getValueCount() == 0){
+ v.clear();
+ } else {
+ v.load(fmd, buf.slice(bufOffset, fmd.getBufferLength()));
+ }
+ bufOffset += fmd.getBufferLength();
+ put(fieldDef.getLastName(), v);
+ }
+ }
+
+ @Override
+ public SerializedField getMetadata() {
+ SerializedField.Builder b = getField() //
+ .getAsBuilder() //
+ .setBufferLength(getBufferSize()) //
+ .setValueCount(valueCount);
+
+
+ for(ValueVector v : vectors.values()){
+ b.addChild(v.getMetadata());
+ }
+ return b.build();
+ }
+
+ @Override
+ public Mutator getMutator() {
+ return mutator;
+ }
+
+ public class Accessor implements ValueVector.Accessor{
+
+ @Override
+ public Object getObject(int index) {
+ Map<String, Object> vv = Maps.newHashMap();
+ for(Map.Entry<String, ValueVector> e : vectors.entrySet()){
+ ValueVector v = e.getValue();
+ String k = e.getKey();
+ Object value = v.getAccessor().getObject(index);
+ if(value != null){
+ vv.put(k, value);
+ }
+ }
+ return vv;
+ }
+
+ public void get(int index, ComplexHolder holder){
+ reader.setPosition(index);
+ holder.reader = reader;
+ }
+
+ @Override
+ public int getValueCount() {
+ return valueCount;
+ }
+
+ @Override
+ public boolean isNull(int index) {
+ return false;
+ }
+
+ @Override
+ public void reset() {
+ }
+
+ @Override
+ public FieldReader getReader() {
+ return new SingleMapReaderImpl(MapVector.this);
+ }
+
+ }
+
+ public ValueVector getVectorById(int id){
+ return vectorsById.get(id);
+ }
+
+ public class Mutator implements ValueVector.Mutator{
+
+ @Override
+ public void setValueCount(int valueCount) {
+ for(ValueVector v : vectors.values()){
+ v.getMutator().setValueCount(valueCount);
+ }
+ MapVector.this.valueCount = valueCount;
+ }
+
+ @Override
+ public void reset() {
+ }
+
+ @Override
+ public void generateTestData(int values) {
+ }
+
+ }
+
+ @Override
+ public void clear() {
+ for(ValueVector v : vectors.values()){
+      v.clear();
+ }
+ }
+
+ public VectorWithOrdinal getVectorWithOrdinal(String name){
+ return vectorIds.get(name);
+ }
+}
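
Note: MapVector keeps one child vector per field and assembles rows only on demand: Accessor.getObject(i) builds a HashMap of the non-null child values at position i, and Mutator.setValueCount() fans the count out to every child. A small read-side sketch against the accessor above:

    // A row comes back as Map<String, Object>; a child that is null at this
    // position is absent from the map rather than mapped to null.
    @SuppressWarnings("unchecked")
    Map<String, Object> row(MapVector map, int index) {
      return (Map<String, Object>) map.getAccessor().getObject(index);
    }
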
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/Positionable.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/Positionable.java
new file mode 100644
index 000000000..6d86a6438
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/Positionable.java
@@ -0,0 +1,22 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.vector.complex;
+
+public interface Positionable {
+ public void setPosition(int index);
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/RepeatedListVector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/RepeatedListVector.java
new file mode 100644
index 000000000..93930b50c
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/RepeatedListVector.java
@@ -0,0 +1,407 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.vector.complex;
+
+import io.netty.buffer.ByteBuf;
+
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.commons.lang3.ArrayUtils;
+import org.apache.drill.common.expression.FieldReference;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.common.types.TypeProtos.MajorType;
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.common.types.Types;
+import org.apache.drill.exec.expr.TypeHelper;
+import org.apache.drill.exec.expr.holders.ComplexHolder;
+import org.apache.drill.exec.expr.holders.RepeatedListHolder;
+import org.apache.drill.exec.memory.BufferAllocator;
+import org.apache.drill.exec.memory.OutOfMemoryRuntimeException;
+import org.apache.drill.exec.proto.UserBitShared.SerializedField;
+import org.apache.drill.exec.record.MaterializedField;
+import org.apache.drill.exec.record.TransferPair;
+import org.apache.drill.exec.vector.RepeatedFixedWidthVector;
+import org.apache.drill.exec.vector.UInt4Vector;
+import org.apache.drill.exec.vector.ValueVector;
+import org.apache.drill.exec.vector.complex.impl.NullReader;
+import org.apache.drill.exec.vector.complex.impl.RepeatedListReaderImpl;
+import org.apache.drill.exec.vector.complex.reader.FieldReader;
+
+import com.google.common.collect.Lists;
+import com.google.hive12.common.base.Preconditions;
+
+
+public class RepeatedListVector extends AbstractContainerVector implements RepeatedFixedWidthVector{
+
+ private final UInt4Vector offsets; // offsets to start of each record
+ private final BufferAllocator allocator;
+ private final Mutator mutator = new Mutator();
+ private final Accessor accessor = new Accessor();
+ private ValueVector vector;
+ private final MaterializedField field;
+ private final RepeatedListReaderImpl reader = new RepeatedListReaderImpl(null, this);
+ private int allocationValueCount = 4000;
+ private int allocationMonitor = 0;
+
+ private int lastSet = 0;
+
+ private int valueCount;
+
+ public static MajorType TYPE = Types.repeated(MinorType.LIST);
+
+ public RepeatedListVector(MaterializedField field, BufferAllocator allocator){
+ this.allocator = allocator;
+ this.offsets = new UInt4Vector(null, allocator);
+ this.field = field;
+ }
+
+ public int size(){
+ return vector != null ? 1 : 0;
+ }
+
+ public RepeatedListVector(SchemaPath path, BufferAllocator allocator){
+ this(MaterializedField.create(path, TYPE), allocator);
+ }
+
+ transient private RepeatedListTransferPair ephPair;
+
+ public boolean copyFromSafe(int fromIndex, int thisIndex, RepeatedListVector from){
+ if(ephPair == null || ephPair.from != from){
+ ephPair = (RepeatedListTransferPair) from.makeTransferPair(this);
+ }
+ return ephPair.copyValueSafe(fromIndex, thisIndex);
+ }
+
+ public Mutator getMutator(){
+ return mutator;
+ }
+
+ @Override
+ public void allocateNew() throws OutOfMemoryRuntimeException {
+ if(!allocateNewSafe()) throw new OutOfMemoryRuntimeException();
+ }
+
+ @Override
+ public boolean allocateNewSafe() {
+ if(!offsets.allocateNewSafe()) return false;
+
+ if(vector != null){
+ return vector.allocateNewSafe();
+ }else{
+ return true;
+ }
+
+ }
+
+ public class Mutator implements ValueVector.Mutator{
+
+ public void startNewGroup(int index) {
+ offsets.getMutator().set(index+1, offsets.getAccessor().get(index));
+ }
+
+ public int add(int index){
+ int endOffset = index+1;
+ int currentChildOffset = offsets.getAccessor().get(endOffset);
+ int newChildOffset = currentChildOffset + 1;
+ boolean success = offsets.getMutator().setSafe(endOffset, newChildOffset);
+ lastSet = index;
+ if(!success) return -1;
+
+ // this is done at beginning so return the currentChildOffset, not the new offset.
+ return currentChildOffset;
+
+ }
+
+ @Override
+ public void setValueCount(int groupCount) {
+ populateEmpties(groupCount);
+ offsets.getMutator().setValueCount(groupCount+1);
+
+ if(vector != null){
+ int valueCount = offsets.getAccessor().get(groupCount);
+ vector.getMutator().setValueCount(valueCount);
+ }
+ }
+
+ @Override
+ public void reset() {
+ lastSet = 0;
+ }
+
+ @Override
+ public void generateTestData(int values) {
+ }
+
+ }
+
+ public class Accessor implements ValueVector.Accessor {
+
+ @Override
+ public Object getObject(int index) {
+ List<Object> l = Lists.newArrayList();
+ int end = offsets.getAccessor().get(index+1);
+ for(int i = offsets.getAccessor().get(index); i < end; i++){
+ l.add(vector.getAccessor().getObject(i));
+ }
+ return l;
+ }
+
+ @Override
+ public int getValueCount() {
+ return offsets.getAccessor().getValueCount() - 1;
+ }
+
+ public void get(int index, RepeatedListHolder holder){
+ assert index <= getValueCapacity();
+ holder.start = offsets.getAccessor().get(index);
+ holder.end = offsets.getAccessor().get(index+1);
+ }
+
+ public void get(int index, ComplexHolder holder){
+ FieldReader reader = getReader();
+ reader.setPosition(index);
+ holder.reader = reader;
+ }
+
+ public void get(int index, int arrayIndex, ComplexHolder holder){
+ RepeatedListHolder h = new RepeatedListHolder();
+ get(index, h);
+ int offset = h.start + arrayIndex;
+
+ if(offset >= h.end){
+ holder.reader = NullReader.INSTANCE;
+ }else{
+ FieldReader r = vector.getAccessor().getReader();
+ r.setPosition(offset);
+ holder.reader = r;
+ }
+
+ }
+
+ @Override
+ public boolean isNull(int index) {
+ return false;
+ }
+
+ @Override
+ public void reset() {
+ }
+
+ @Override
+ public FieldReader getReader() {
+ return reader;
+ }
+
+ }
+
+ @Override
+ public int getBufferSize() {
+ return offsets.getBufferSize() + vector.getBufferSize();
+ }
+
+ @Override
+ public void close() {
+ offsets.close();
+ if(vector != null) vector.close();
+ }
+
+ @Override
+ public void clear() {
+ lastSet = 0;
+ offsets.clear();
+ if(vector != null) vector.clear();
+ }
+
+ @Override
+ public MaterializedField getField() {
+ return field;
+ }
+
+ @Override
+ public TransferPair getTransferPair() {
+ return new RepeatedListTransferPair(field.getPath());
+ }
+
+
+ public class RepeatedListTransferPair implements TransferPair{
+ private final RepeatedListVector from = RepeatedListVector.this;
+ private final RepeatedListVector to;
+ private final TransferPair vectorTransfer;
+
+ private RepeatedListTransferPair(RepeatedListVector to){
+ this.to = to;
+ if(to.vector == null){
+ to.vector = to.addOrGet(null, vector.getField().getType(), vector.getClass());
+ to.vector.allocateNew();
+ }
+ this.vectorTransfer = vector.makeTransferPair(to.vector);
+ }
+
+ private RepeatedListTransferPair(SchemaPath path){
+ this.to = new RepeatedListVector(path, allocator);
+ vectorTransfer = vector.getTransferPair();
+ this.to.vector = vectorTransfer.getTo();
+ }
+
+ @Override
+ public void transfer() {
+ offsets.transferTo(to.offsets);
+ vectorTransfer.transfer();
+ to.valueCount = valueCount;
+ clear();
+ }
+
+ @Override
+ public ValueVector getTo() {
+ return to;
+ }
+
+ @Override
+ public void splitAndTransfer(int startIndex, int length) {
+ throw new UnsupportedOperationException();
+ }
+
+
+ @Override
+ public boolean copyValueSafe(int from, int to) {
+ RepeatedListHolder holder = new RepeatedListHolder();
+ accessor.get(from, holder);
+ int newIndex = this.to.offsets.getAccessor().get(to);
+ //todo: make this a bulk copy.
+ for(int i = holder.start; i < holder.end; i++, newIndex++){
+ if(!vectorTransfer.copyValueSafe(i, newIndex)) return false;
+ }
+ if(!this.to.offsets.getMutator().setSafe(to, newIndex)) return false;
+
+ return true;
+ }
+
+ }
+
+ @Override
+ public TransferPair makeTransferPair(ValueVector to) {
+    if(!(to instanceof RepeatedListVector)) throw new IllegalArgumentException("You can't make a transfer pair from an incompatible type.");
+ return new RepeatedListTransferPair( (RepeatedListVector) to);
+ }
+
+ @Override
+ public TransferPair getTransferPair(FieldReference ref) {
+ return new RepeatedListTransferPair(ref);
+ }
+
+ @Override
+ public int getValueCapacity() {
+ if(vector == null) return offsets.getValueCapacity() - 1;
+ return Math.min(offsets.getValueCapacity() - 1, vector.getValueCapacity());
+ }
+
+ @Override
+ public Accessor getAccessor() {
+ return accessor;
+ }
+
+ @Override
+ public ByteBuf[] getBuffers() {
+ return ArrayUtils.addAll(offsets.getBuffers(), vector.getBuffers());
+ }
+
+ private void setVector(ValueVector v){
+ field.addChild(v.getField());
+ this.vector = v;
+ }
+
+ @Override
+ public void load(SerializedField metadata, ByteBuf buf) {
+ SerializedField childField = metadata.getChildList().get(0);
+
+ int bufOffset = offsets.load(metadata.getValueCount()+1, buf);
+
+ MaterializedField fieldDef = MaterializedField.create(childField);
+ if(vector == null) {
+ setVector(TypeHelper.getNewVector(fieldDef, allocator));
+ }
+
+ if (childField.getValueCount() == 0){
+ vector.clear();
+ } else {
+ vector.load(childField, buf.slice(bufOffset, childField.getBufferLength()));
+ }
+ }
+
+ @Override
+ public SerializedField getMetadata() {
+ return getField() //
+ .getAsBuilder() //
+ .setBufferLength(getBufferSize()) //
+ .setValueCount(accessor.getValueCount()) //
+ .addChild(vector.getMetadata()) //
+ .build();
+ }
+
+ private void populateEmpties(int groupCount){
+ int previousEnd = offsets.getAccessor().get(lastSet + 1);
+ for(int i = lastSet + 2; i <= groupCount; i++){
+ offsets.getMutator().setSafe(i, previousEnd);
+ }
+ lastSet = groupCount - 1;
+ }
+
+ @Override
+ public Iterator<ValueVector> iterator() {
+ return Collections.singleton(vector).iterator();
+ }
+
+ @Override
+ public <T extends ValueVector> T addOrGet(String name, MajorType type, Class<T> clazz) {
+ Preconditions.checkArgument(name == null);
+
+ if(vector == null){
+ vector = TypeHelper.getNewVector(MaterializedField.create(field.getPath().getUnindexedArrayChild(), type), allocator);
+ }
+ return typeify(vector, clazz);
+ }
+
+ @Override
+ public <T extends ValueVector> T get(String name, Class<T> clazz) {
+ if(name != null) return null;
+ return typeify(vector, clazz);
+ }
+
+ @Override
+ public void allocateNew(int parentValueCount, int childValueCount) {
+ clear();
+ offsets.allocateNew(parentValueCount+1);
+ mutator.reset();
+ accessor.reset();
+ }
+
+ @Override
+ public int load(int parentValueCount, int childValueCount, ByteBuf buf) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public VectorWithOrdinal getVectorWithOrdinal(String name) {
+ if(name != null) return null;
+ return new VectorWithOrdinal(vector, 0);
+ }
+
+
+}
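
Note: the offsets vector is the whole story of the repeated layout above: entries i and i+1 bracket the child positions of group i, so getValueCount() is one less than the offsets count and an empty group is just a repeated offset. A self-contained sketch of the arithmetic with plain arrays:

    // Groups of sizes 2, 0 and 3 produce offsets = [0, 2, 2, 5];
    // group i spans child positions [offsets[i], offsets[i+1]).
    int[] offsets = {0, 2, 2, 5};
    for (int i = 0; i + 1 < offsets.length; i++) {
      System.out.printf("group %d has %d values%n", i, offsets[i + 1] - offsets[i]);
    }
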
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/RepeatedMapVector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/RepeatedMapVector.java
new file mode 100644
index 000000000..2492cc89f
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/RepeatedMapVector.java
@@ -0,0 +1,478 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.vector.complex;
+
+import io.netty.buffer.ByteBuf;
+
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.drill.common.expression.FieldReference;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.common.types.TypeProtos.DataMode;
+import org.apache.drill.common.types.TypeProtos.MajorType;
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.exec.expr.TypeHelper;
+import org.apache.drill.exec.expr.holders.ComplexHolder;
+import org.apache.drill.exec.expr.holders.RepeatedMapHolder;
+import org.apache.drill.exec.memory.BufferAllocator;
+import org.apache.drill.exec.memory.OutOfMemoryRuntimeException;
+import org.apache.drill.exec.proto.UserBitShared.SerializedField;
+import org.apache.drill.exec.record.MaterializedField;
+import org.apache.drill.exec.record.TransferPair;
+import org.apache.drill.exec.vector.RepeatedFixedWidthVector;
+import org.apache.drill.exec.vector.UInt4Vector;
+import org.apache.drill.exec.vector.ValueVector;
+import org.apache.drill.exec.vector.complex.impl.NullReader;
+import org.apache.drill.exec.vector.complex.impl.RepeatedMapReaderImpl;
+import org.apache.drill.exec.vector.complex.reader.FieldReader;
+
+import com.carrotsearch.hppc.IntObjectOpenHashMap;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
+public class RepeatedMapVector extends AbstractContainerVector implements RepeatedFixedWidthVector {
+ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(RepeatedMapVector.class);
+
+ public final static MajorType TYPE = MajorType.newBuilder().setMinorType(MinorType.MAP).setMode(DataMode.REPEATED).build();
+
+ private final UInt4Vector offsets; // offsets to start of each record
+ private final Map<String, ValueVector> vectors = Maps.newHashMap();
+ private final Map<String, VectorWithOrdinal> vectorIds = Maps.newHashMap();
+ private final RepeatedMapReaderImpl reader = new RepeatedMapReaderImpl(RepeatedMapVector.this);
+ private final IntObjectOpenHashMap<ValueVector> vectorsById = new IntObjectOpenHashMap<>();
+ private final Accessor accessor = new Accessor();
+ private final Mutator mutator = new Mutator();
+ private final BufferAllocator allocator;
+ private final MaterializedField field;
+ private int lastSet = 0;
+
+ public RepeatedMapVector(MaterializedField field, BufferAllocator allocator){
+ this.field = field;
+ this.allocator = allocator;
+ this.offsets = new UInt4Vector(null, allocator);
+
+ }
+
+ @Override
+ public void allocateNew(int parentValueCount, int childValueCount) {
+ clear();
+ offsets.allocateNew(parentValueCount+1);
+ mutator.reset();
+ accessor.reset();
+ }
+
+ public Iterator<String> fieldNameIterator(){
+ return vectors.keySet().iterator();
+ }
+
+ public int size(){
+ return vectors.size();
+ }
+
+ @Override
+ public <T extends ValueVector> T addOrGet(String name, MajorType type, Class<T> clazz) {
+ ValueVector v = vectors.get(name);
+
+ if(v == null){
+ v = TypeHelper.getNewVector(field.getPath(), name, allocator, type);
+ Preconditions.checkNotNull(v, String.format("Failure to create vector of type %s.", type));
+ put(name, v);
+ }
+ return typeify(v, clazz);
+
+ }
+
+ @Override
+ public <T extends ValueVector> T get(String name, Class<T> clazz) {
+ ValueVector v = vectors.get(name);
+ if(v == null) throw new IllegalStateException(String.format("Attempting to access invalid map field of name %s.", name));
+ return typeify(v, clazz);
+ }
+
+ @Override
+ public int getBufferSize() {
+ if(accessor.getValueCount() == 0 || vectors.isEmpty()) return 0;
+ long buffer = offsets.getBufferSize();
+ for(ValueVector v : this){
+ buffer += v.getBufferSize();
+ }
+
+ return (int) buffer;
+ }
+
+ @Override
+ public void close() {
+ for(ValueVector v : this){
+ v.close();
+ }
+ }
+
+ @Override
+ public Iterator<ValueVector> iterator() {
+ return vectors.values().iterator();
+ }
+
+ @Override
+ public MaterializedField getField() {
+ return field;
+ }
+
+ @Override
+ public TransferPair getTransferPair() {
+ return new MapTransferPair(field.getPath());
+ }
+
+ @Override
+ public TransferPair makeTransferPair(ValueVector to) {
+ return new MapTransferPair( (RepeatedMapVector) to);
+ }
+
+ MapSingleCopier makeSingularCopier(MapVector to){
+ return new MapSingleCopier(to);
+ }
+
+
+ class MapSingleCopier{
+ private final TransferPair[] pairs;
+ final RepeatedMapVector from = RepeatedMapVector.this;
+
+ public MapSingleCopier(MapVector to){
+ pairs = new TransferPair[vectors.size()];
+ int i =0;
+ for(Map.Entry<String, ValueVector> e : vectors.entrySet()){
+ int preSize = to.vectors.size();
+ ValueVector v = to.addOrGet(e.getKey(), e.getValue().getField().getType(), e.getValue().getClass());
+ if(to.vectors.size() != preSize) v.allocateNew();
+ pairs[i++] = e.getValue().makeTransferPair(v);
+ }
+ }
+
+ public boolean copySafe(int fromSubIndex, int toIndex){
+ for(TransferPair p : pairs){
+ if(!p.copyValueSafe(fromSubIndex, toIndex)) return false;
+ }
+ return true;
+ }
+ }
+
+ @Override
+ public TransferPair getTransferPair(FieldReference ref) {
+ return new MapTransferPair(ref);
+ }
+
+ @Override
+ public void allocateNew() throws OutOfMemoryRuntimeException {
+ if(!allocateNewSafe()) throw new OutOfMemoryRuntimeException();
+ }
+
+ @Override
+ public boolean allocateNewSafe() {
+ if(!offsets.allocateNewSafe()) return false;
+ for(ValueVector v : vectors.values()){
+ if(!v.allocateNewSafe()) return false;
+ }
+ return true;
+ }
+
+ private class MapTransferPair implements TransferPair{
+
+ private final TransferPair[] pairs;
+ private final RepeatedMapVector to;
+ private final RepeatedMapVector from = RepeatedMapVector.this;
+
+ public MapTransferPair(SchemaPath path){
+ RepeatedMapVector v = new RepeatedMapVector(MaterializedField.create(path, TYPE), allocator);
+ pairs = new TransferPair[vectors.size()];
+ int i =0;
+ for(Map.Entry<String, ValueVector> e : vectors.entrySet()){
+ TransferPair otherSide = e.getValue().getTransferPair();
+ v.put(e.getKey(), otherSide.getTo());
+ pairs[i++] = otherSide;
+ }
+ this.to = v;
+ }
+
+ public MapTransferPair(RepeatedMapVector to){
+ this.to = to;
+ pairs = new TransferPair[vectors.size()];
+ int i =0;
+ for(Map.Entry<String, ValueVector> e : vectors.entrySet()){
+ ValueVector v = to.addOrGet(e.getKey(), e.getValue().getField().getType(), e.getValue().getClass());
+ pairs[i++] = e.getValue().makeTransferPair(v);
+ }
+ }
+
+
+ @Override
+ public void transfer() {
+ offsets.transferTo(to.offsets);
+ for(TransferPair p : pairs){
+ p.transfer();
+ }
+ clear();
+ }
+
+ @Override
+ public ValueVector getTo() {
+ return to;
+ }
+
+ @Override
+ public boolean copyValueSafe(int from, int to) {
+ RepeatedMapHolder holder = new RepeatedMapHolder();
+ accessor.get(from, holder);
+ int newIndex = this.to.offsets.getAccessor().get(to);
+ //todo: make these bulk copies
+ for(int i = holder.start; i < holder.end; i++, newIndex++){
+ for(TransferPair p : pairs){
+          if(!p.copyValueSafe(i, newIndex)) return false; // copy child positions, as RepeatedListVector does
+ }
+ }
+ if(!this.to.offsets.getMutator().setSafe(to, newIndex)) return false;
+ return true;
+ }
+
+ @Override
+ public void splitAndTransfer(int startIndex, int length) {
+ throw new UnsupportedOperationException();
+ }
+
+ }
+
+
+ transient private MapTransferPair ephPair;
+
+ public boolean copyFromSafe(int fromIndex, int thisIndex, RepeatedMapVector from){
+ if(ephPair == null || ephPair.from != from){
+ ephPair = (MapTransferPair) from.makeTransferPair(this);
+ }
+ return ephPair.copyValueSafe(fromIndex, thisIndex);
+ }
+
+ @Override
+ public int getValueCapacity() {
+ return offsets.getValueCapacity();
+ }
+
+ @Override
+ public Accessor getAccessor() {
+ return accessor;
+ }
+
+ @Override
+ public ByteBuf[] getBuffers() {
+ List<ByteBuf> bufs = Lists.newArrayList(offsets.getBuffers());
+
+ for(ValueVector v : vectors.values()){
+ for(ByteBuf b : v.getBuffers()){
+ bufs.add(b);
+ }
+ }
+ return bufs.toArray(new ByteBuf[bufs.size()]);
+ }
+
+
+ @Override
+ public void load(SerializedField metadata, ByteBuf buf) {
+ List<SerializedField> fields = metadata.getChildList();
+
+ int bufOffset = offsets.load(metadata.getValueCount()+1, buf);
+
+ for (SerializedField fmd : fields) {
+ MaterializedField fieldDef = MaterializedField.create(fmd);
+
+ ValueVector v = vectors.get(fieldDef.getLastName());
+ if(v == null) {
+ // if we arrive here, we didn't have a matching vector.
+
+ v = TypeHelper.getNewVector(fieldDef, allocator);
+ }
+ if (fmd.getValueCount() == 0){
+ v.clear();
+ } else {
+ v.load(fmd, buf.slice(bufOffset, fmd.getBufferLength()));
+ }
+ bufOffset += fmd.getBufferLength();
+ put(fieldDef.getLastName(), v);
+ }
+ }
+
+ @Override
+ public SerializedField getMetadata() {
+ SerializedField.Builder b = getField() //
+ .getAsBuilder() //
+ .setBufferLength(getBufferSize()) //
+ .setValueCount(accessor.getValueCount());
+
+ for(ValueVector v : vectors.values()){
+ b.addChild(v.getMetadata());
+ }
+ return b.build();
+ }
+
+ protected void put(String name, ValueVector vv){
+ int ordinal = vectors.size();
+ if(vectors.put(name, vv) != null){
+ throw new IllegalStateException();
+ }
+ vectorIds.put(name, new VectorWithOrdinal(vv, ordinal));
+ vectorsById.put(ordinal, vv);
+ field.addChild(vv.getField());
+ }
+
+
+ @Override
+ public Mutator getMutator() {
+ return mutator;
+ }
+
+ public class Accessor implements ValueVector.Accessor{
+
+ @Override
+ public Object getObject(int index) {
+ List<Object> l = Lists.newArrayList();
+ int end = offsets.getAccessor().get(index+1);
+ for(int i = offsets.getAccessor().get(index); i < end; i++){
+ Map<String, Object> vv = Maps.newHashMap();
+ for(Map.Entry<String, ValueVector> e : vectors.entrySet()){
+ ValueVector v = e.getValue();
+ String k = e.getKey();
+ Object value = v.getAccessor().getObject(i);
+ if(value != null){
+ vv.put(k,value);
+ }
+ }
+ l.add(vv);
+ }
+ return l;
+ }
+
+ @Override
+ public int getValueCount() {
+ return offsets.getAccessor().getValueCount() - 1;
+ }
+
+ public void get(int index, RepeatedMapHolder holder){
+ assert index <= getValueCapacity();
+ holder.start = offsets.getAccessor().get(index);
+ holder.end = offsets.getAccessor().get(index+1);
+ }
+
+ public void get(int index, ComplexHolder holder){
+ FieldReader reader = getReader();
+ reader.setPosition(index);
+ holder.reader = reader;
+ }
+
+ public void get(int index, int arrayIndex, ComplexHolder holder){
+ RepeatedMapHolder h = new RepeatedMapHolder();
+ get(index, h);
+ int offset = h.start + arrayIndex;
+
+ if(offset >= h.end){
+ holder.reader = NullReader.INSTANCE;
+ }else{
+ reader.setSinglePosition(index, arrayIndex);
+ holder.reader = reader;
+ }
+ }
+
+ @Override
+ public boolean isNull(int index) {
+ return false;
+ }
+
+ @Override
+ public void reset() {
+ }
+
+ @Override
+ public FieldReader getReader() {
+ return reader;
+ }
+
+ }
+
+ private void populateEmpties(int groupCount){
+ int previousEnd = offsets.getAccessor().get(lastSet + 1);
+ for(int i = lastSet + 2; i <= groupCount; i++){
+ offsets.getMutator().setSafe(i, previousEnd);
+ }
+ lastSet = groupCount - 1;
+ }
+
+ public class Mutator implements ValueVector.Mutator{
+
+ public void startNewGroup(int index) {
+ populateEmpties(index);
+ lastSet = index;
+ offsets.getMutator().set(index+1, offsets.getAccessor().get(index));
+ }
+
+ public int add(int index){
+ int nextOffset = offsets.getAccessor().get(index+1);
+ boolean success = offsets.getMutator().setSafe(index+1, nextOffset+1);
+ if(!success) return -1;
+ return nextOffset;
+ }
+
+ @Override
+ public void setValueCount(int groupCount) {
+ populateEmpties(groupCount);
+ offsets.getMutator().setValueCount(groupCount+1);
+ int valueCount = offsets.getAccessor().get(groupCount);
+ for(ValueVector v : vectors.values()){
+ v.getMutator().setValueCount(valueCount);
+ }
+ }
+
+ @Override
+ public void reset() {
+ lastSet = 0;
+ }
+
+ @Override
+ public void generateTestData(int values) {
+ }
+
+ }
+
+ @Override
+ public void clear() {
+ lastSet = 0;
+ offsets.clear();
+ for(ValueVector v : vectors.values()){
+      v.clear();
+ }
+ }
+
+ @Override
+ public int load(int parentValueCount, int childValueCount, ByteBuf buf) {
+ throw new UnsupportedOperationException();
+ }
+
+
+ @Override
+ public VectorWithOrdinal getVectorWithOrdinal(String name) {
+ return vectorIds.get(name);
+ }
+
+}
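
The Mutator above encodes variable-length groups purely through the offsets vector: group i owns child positions [offsets(i), offsets(i+1)), and setValueCount sizes the child vectors from offsets(groupCount). A standalone sketch of that bookkeeping with plain int arrays (illustration only, not the Drill vector API):

    public class OffsetsSketch {
      public static void main(String[] args) {
        int[] offsets = new int[4];       // supports up to 3 groups
        offsets[1] = offsets[0];          // startNewGroup(0)
        offsets[1]++;                     // add(0): group 0 gains a child value
        offsets[1]++;                     // add(0): and another
        offsets[2] = offsets[1];          // startNewGroup(1)
        offsets[2]++;                     // add(1)
        int groupCount = 2;
        int valueCount = offsets[groupCount];               // children to size
        System.out.println(offsets[0] + ".." + offsets[1]); // group 0 -> 0..2
        System.out.println(offsets[1] + ".." + offsets[2]); // group 1 -> 2..3
        System.out.println(valueCount);                     // 3
      }
    }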
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/StateTool.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/StateTool.java
new file mode 100644
index 000000000..99f601056
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/StateTool.java
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.vector.complex;
+
+import java.util.Arrays;
+
+public class StateTool {
+ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(StateTool.class);
+
+ public static <T extends Enum<?>> void check(T currentState, T... expectedStates){
+ for(T s : expectedStates){
+ if(s == currentState) return;
+ }
+ throw new IllegalArgumentException(String.format("Expected to be in one of these states %s but was actually in state %s", Arrays.toString(expectedStates), currentState));
+ }
+}
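
StateTool.check is a varargs guard: it returns silently when the current state is among the expected ones and otherwise throws with both sides of the mismatch. A small usage sketch, assuming a hypothetical Mode enum:

    import org.apache.drill.exec.vector.complex.StateTool;

    public class StateToolExample {
      enum Mode { INIT, MAP, LIST }

      public static void main(String[] args) {
        StateTool.check(Mode.MAP, Mode.INIT, Mode.MAP);     // passes silently
        try {
          StateTool.check(Mode.LIST, Mode.INIT, Mode.MAP);  // mismatch
        } catch (IllegalArgumentException e) {
          System.out.println(e.getMessage());
        }
      }
    }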
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/WriteState.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/WriteState.java
new file mode 100644
index 000000000..43dba6573
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/WriteState.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.vector.complex;
+
+import org.apache.drill.exec.vector.complex.writer.FieldWriter;
+
+
+public class WriteState {
+ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(WriteState.class);
+
+ private FieldWriter failPoint;
+
+ public boolean isFailed(){
+ return failPoint != null;
+ }
+
+ public boolean isOk(){
+ return failPoint == null;
+ }
+
+ public void fail(FieldWriter w){
+ assert failPoint == null;
+ failPoint = w;
+
+// System.out.println("Fail Point " + failPoint);
+ }
+
+ public void reset(){
+ failPoint = null;
+ }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/JsonReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/JsonReader.java
new file mode 100644
index 000000000..761bc797a
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/JsonReader.java
@@ -0,0 +1,231 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.vector.complex.fn;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufInputStream;
+import io.netty.buffer.UnpooledByteBufAllocator;
+
+import java.io.IOException;
+import java.io.Reader;
+
+import org.apache.drill.exec.expr.holders.BigIntHolder;
+import org.apache.drill.exec.expr.holders.BitHolder;
+import org.apache.drill.exec.expr.holders.Float8Holder;
+import org.apache.drill.exec.expr.holders.VarCharHolder;
+import org.apache.drill.exec.vector.complex.writer.BaseWriter.ComplexWriter;
+import org.apache.drill.exec.vector.complex.writer.BaseWriter.ListWriter;
+import org.apache.drill.exec.vector.complex.writer.BaseWriter.MapWriter;
+
+import com.fasterxml.jackson.core.JsonFactory;
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.fasterxml.jackson.core.JsonParseException;
+import com.fasterxml.jackson.core.JsonParser;
+import com.fasterxml.jackson.core.JsonToken;
+import com.fasterxml.jackson.core.JsonParser.Feature;
+import com.google.common.base.Charsets;
+
+public class JsonReader {
+ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(JsonReader.class);
+
+ public static enum WriteState {
+ WRITE_SUCCEED, WRITE_FAILED, NO_MORE
+ }
+
+ private final JsonFactory factory = new JsonFactory();
+ private ByteBufInputStream stream;
+ private long byteOffset;
+ private JsonRecordSplitter splitter;
+ private Reader reader;
+ private JsonParser parser;
+
+ public JsonReader(JsonRecordSplitter splitter) throws JsonParseException, IOException {
+ this.splitter = splitter;
+ factory.configure(Feature.ALLOW_UNQUOTED_FIELD_NAMES, true);
+ factory.configure(Feature.ALLOW_COMMENTS, true);
+ reader = splitter.getNextReader();
+ }
+
+ public WriteState write(ComplexWriter writer) throws JsonParseException, IOException {
+ if(reader == null){
+ reader = splitter.getNextReader();
+ if(reader == null) return WriteState.NO_MORE;
+
+ }
+
+ parser = factory.createJsonParser(reader);
+ reader.mark(1024*128);
+ JsonToken t = parser.nextToken();
+ while(!parser.hasCurrentToken()) t = parser.nextToken();
+
+
+ switch (t) {
+ case START_OBJECT:
+ writeData(writer.rootAsMap());
+ break;
+ case START_ARRAY:
+ writeData(writer.rootAsList());
+ break;
+ case NOT_AVAILABLE:
+ return WriteState.NO_MORE;
+ default:
+ throw new JsonParseException(
+ String.format("Failure while parsing JSON. Found token of [%s] Drill currently only supports parsing "
+ + "json strings that contain either lists or maps. The root object cannot be a scalar.",
+ t),
+ parser.getCurrentLocation());
+ }
+
+ if (!writer.ok()) {
+ reader.reset();
+ return WriteState.WRITE_FAILED;
+ } else {
+ reader = null;
+ return WriteState.WRITE_SUCCEED;
+ }
+ }
+
+
+ private void writeData(MapWriter map) throws JsonParseException, IOException {
+ //
+ map.start();
+ outside: while(true){
+ JsonToken t = parser.nextToken();
+ if(t == JsonToken.NOT_AVAILABLE || t == JsonToken.END_OBJECT) return;
+
+ assert t == JsonToken.FIELD_NAME : String.format("Expected FIELD_NAME but got %s.", t.name());
+ final String fieldName = parser.getText();
+
+
+ switch(parser.nextToken()){
+ case START_ARRAY:
+ writeData(map.list(fieldName));
+ break;
+ case START_OBJECT:
+ writeData(map.map(fieldName));
+ break;
+ case END_OBJECT:
+ break outside;
+
+ case VALUE_EMBEDDED_OBJECT:
+ case VALUE_FALSE: {
+ BitHolder h = new BitHolder();
+ h.value = 0;
+ map.bit(fieldName).write(h);
+ break;
+ }
+ case VALUE_TRUE: {
+ BitHolder h = new BitHolder();
+ h.value = 1;
+ map.bit(fieldName).write(h);
+ break;
+ }
+ case VALUE_NULL:
+ // do nothing as we don't have a type.
+ break;
+ case VALUE_NUMBER_FLOAT:
+ Float8Holder fh = new Float8Holder();
+ fh.value = parser.getDoubleValue();
+ map.float8(fieldName).write(fh);
+ break;
+ case VALUE_NUMBER_INT:
+ BigIntHolder bh = new BigIntHolder();
+ bh.value = parser.getLongValue();
+ map.bigInt(fieldName).write(bh);
+ break;
+ case VALUE_STRING:
+ VarCharHolder vh = new VarCharHolder();
+ String value = parser.getText();
+ byte[] b = value.getBytes(Charsets.UTF_8);
+ ByteBuf d = UnpooledByteBufAllocator.DEFAULT.buffer(b.length);
+ d.setBytes(0, b);
+ vh.buffer = d;
+ vh.start = 0;
+ vh.end = b.length;
+ map.varChar(fieldName).write(vh);
+ break;
+
+ default:
+ throw new IllegalStateException("Unexpected token " + parser.getCurrentToken());
+
+ }
+
+ }
+ map.end();
+
+ }
+
+ private void writeData(ListWriter list) throws JsonParseException, IOException {
+ list.start();
+ outside: while(true){
+
+ switch(parser.nextToken()){
+ case START_ARRAY:
+ writeData(list.list());
+ break;
+ case START_OBJECT:
+ writeData(list.map());
+ break;
+ case END_ARRAY:
+ case END_OBJECT:
+ break outside;
+
+ case VALUE_EMBEDDED_OBJECT:
+ case VALUE_FALSE:{
+ BitHolder h = new BitHolder();
+ h.value = 0;
+ list.bit().write(h);
+ break;
+ }
+ case VALUE_TRUE: {
+ BitHolder h = new BitHolder();
+ h.value = 1;
+ list.bit().write(h);
+ break;
+ }
+ case VALUE_NULL:
+ // do nothing as we don't have a type.
+ break;
+ case VALUE_NUMBER_FLOAT:
+ Float8Holder fh = new Float8Holder();
+ fh.value = parser.getDoubleValue();
+ list.float8().write(fh);
+ break;
+ case VALUE_NUMBER_INT:
+ BigIntHolder bh = new BigIntHolder();
+ bh.value = parser.getLongValue();
+ list.bigInt().write(bh);
+ break;
+ case VALUE_STRING:
+ VarCharHolder vh = new VarCharHolder();
+ String value = parser.getText();
+ byte[] b = value.getBytes(Charsets.UTF_8);
+ ByteBuf d = UnpooledByteBufAllocator.DEFAULT.buffer(b.length);
+ d.setBytes(0, b);
+ vh.buffer = d;
+ vh.start = 0;
+ vh.end = b.length;
+ list.varChar().write(vh);
+ break;
+
+ default:
+ throw new IllegalStateException("Unexpected token " + parser.getCurrentToken());
+ }
+ }
+ list.end();
+ }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/JsonRecordSplitter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/JsonRecordSplitter.java
new file mode 100644
index 000000000..6f6e7afbb
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/JsonRecordSplitter.java
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.vector.complex.fn;
+
+import java.io.IOException;
+import java.io.Reader;
+
+public interface JsonRecordSplitter {
+
+ public abstract Reader getNextReader() throws IOException;
+
+} \ No newline at end of file
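
The contract is deliberately small: hand back one Reader per record, or null when the input is exhausted. A hypothetical one-shot implementation (the class name and placement are illustrative):

    import java.io.Reader;
    import java.io.StringReader;

    import org.apache.drill.exec.vector.complex.fn.JsonRecordSplitter;

    public class SingleRecordSplitter implements JsonRecordSplitter {
      private Reader next;

      public SingleRecordSplitter(String json) {
        this.next = new StringReader(json);
      }

      @Override
      public Reader getNextReader() {
        Reader r = next;   // first call returns the record...
        next = null;       // ...every later call returns null
        return r;
      }
    }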
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/JsonWriter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/JsonWriter.java
new file mode 100644
index 000000000..0624eceb1
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/JsonWriter.java
@@ -0,0 +1,258 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.vector.complex.fn;
+
+import java.io.IOException;
+import java.io.OutputStream;
+
+import org.apache.drill.common.types.TypeProtos.DataMode;
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.exec.vector.complex.reader.FieldReader;
+
+import com.fasterxml.jackson.core.JsonFactory;
+import com.fasterxml.jackson.core.JsonGenerationException;
+import com.fasterxml.jackson.core.JsonGenerator;
+
+public class JsonWriter {
+ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(JsonWriter.class);
+
+ private final JsonFactory factory = new JsonFactory();
+ private final JsonGenerator gen;
+
+ public JsonWriter(OutputStream out, boolean pretty) throws IOException{
+ JsonGenerator writer = factory.createJsonGenerator(out);
+ gen = pretty ? writer.useDefaultPrettyPrinter() : writer;
+ }
+
+ public void write(FieldReader reader) throws JsonGenerationException, IOException{
+ writeValue(reader);
+ gen.flush();
+ }
+
+ private void writeValue(FieldReader reader) throws JsonGenerationException, IOException{
+ final DataMode m = reader.getType().getMode();
+ final MinorType mt = reader.getType().getMinorType();
+
+ switch(m){
+ case OPTIONAL:
+ if(!reader.isSet()){
+ gen.writeNull();
+ break;
+ }
+
+ case REQUIRED:
+
+
+ switch (mt) {
+ case FLOAT4:
+ gen.writeNumber(reader.readFloat());
+ break;
+ case FLOAT8:
+ gen.writeNumber(reader.readDouble());
+ break;
+ case INT:
+ Integer i = reader.readInteger();
+ if(i == null){
+ gen.writeNull();
+ }else{
+ gen.writeNumber(reader.readInteger());
+ }
+ break;
+ case SMALLINT:
+ gen.writeNumber(reader.readShort());
+ break;
+ case TINYINT:
+ gen.writeNumber(reader.readByte());
+ break;
+ case BIGINT:
+ Long l = reader.readLong();
+ if(l == null){
+ gen.writeNull();
+ }else{
+ gen.writeNumber(reader.readLong());
+ }
+
+ break;
+ case BIT:
+ gen.writeBoolean(reader.readBoolean());
+ break;
+
+ case DATE:
+ case TIME:
+ case TIMESTAMP:
+ case TIMESTAMPTZ:
+ gen.writeString(reader.readDateTime().toString());
+ break;
+
+ case INTERVALYEAR:
+ case INTERVALDAY:
+ case INTERVAL:
+ gen.writeString(reader.readPeriod().toString());
+ break;
+ case DECIMAL28DENSE:
+ case DECIMAL28SPARSE:
+ case DECIMAL38DENSE:
+ case DECIMAL38SPARSE:
+ case DECIMAL9:
+ case DECIMAL18:
+ gen.writeNumber(reader.readBigDecimal());
+ break;
+
+ case LIST:
+ // this is a pseudo class, doesn't actually contain the real reader so we have to drop down.
+ writeValue(reader.reader());
+ break;
+ case MAP:
+ gen.writeStartObject();
+ for(String name : reader){
+ if(reader.isSet()){
+ gen.writeFieldName(name);
+ writeValue(reader.reader(name));
+ }
+ }
+ gen.writeEndObject();
+ break;
+ case NULL:
+ gen.writeNull();
+ break;
+
+ case VAR16CHAR:
+ gen.writeString(reader.readString());
+ break;
+ case VARBINARY:
+ gen.writeBinary(reader.readByteArray());
+ break;
+ case VARCHAR:
+ gen.writeString(reader.readText().toString());
+ break;
+
+ }
+ break;
+
+ case REPEATED:
+ gen.writeStartArray();
+ switch (mt) {
+ case FLOAT4:
+ for(int i = 0; i < reader.size(); i++){
+ gen.writeNumber(reader.readFloat(i));
+ }
+
+ break;
+ case FLOAT8:
+ for(int i = 0; i < reader.size(); i++){
+ gen.writeNumber(reader.readDouble(i));
+ }
+ break;
+ case INT:
+ for(int i = 0; i < reader.size(); i++){
+ gen.writeNumber(reader.readInteger(i));
+ }
+ break;
+ case SMALLINT:
+ for(int i = 0; i < reader.size(); i++){
+ gen.writeNumber(reader.readShort(i));
+ }
+ break;
+ case TINYINT:
+ for(int i = 0; i < reader.size(); i++){
+ gen.writeNumber(reader.readByte(i));
+ }
+ break;
+ case BIGINT:
+ for(int i = 0; i < reader.size(); i++){
+ gen.writeNumber(reader.readLong(i));
+ }
+ break;
+ case BIT:
+ for(int i = 0; i < reader.size(); i++){
+ gen.writeBoolean(reader.readBoolean(i));
+ }
+ break;
+
+ case DATE:
+ case TIME:
+ case TIMESTAMP:
+ case TIMESTAMPTZ:
+ for(int i = 0; i < reader.size(); i++){
+ gen.writeString(reader.readDateTime(i).toString());
+ }
+ break;
+
+ case INTERVALYEAR:
+ case INTERVALDAY:
+ case INTERVAL:
+ for(int i = 0; i < reader.size(); i++){
+ gen.writeString(reader.readPeriod(i).toString());
+ }
+ break;
+ case DECIMAL28DENSE:
+ case DECIMAL28SPARSE:
+ case DECIMAL38DENSE:
+ case DECIMAL38SPARSE:
+ case DECIMAL9:
+ case DECIMAL18:
+ for(int i = 0; i < reader.size(); i++){
+ gen.writeNumber(reader.readBigDecimal(i));
+ }
+ break;
+
+ case LIST:
+ for(int i = 0; i < reader.size(); i++){
+ while(reader.next()){
+ writeValue(reader.reader());
+ }
+ }
+ break;
+ case MAP:
+ while(reader.next()){
+ gen.writeStartObject();
+ for(String name : reader){
+ FieldReader mapField = reader.reader(name);
+ if(mapField.isSet()){
+ gen.writeFieldName(name);
+ writeValue(mapField);
+ }
+ }
+ gen.writeEndObject();
+ }
+ break;
+ case NULL:
+ break;
+
+ case VAR16CHAR:
+ for(int i = 0; i < reader.size(); i++){
+ gen.writeString(reader.readString(i));
+ }
+ break;
+ case VARBINARY:
+ for(int i = 0; i < reader.size(); i++){
+ gen.writeBinary(reader.readByteArray(i));
+ }
+ break;
+ case VARCHAR:
+ for(int i = 0; i < reader.size(); i++){
+ gen.writeString(reader.readText(i).toString());
+ }
+ break;
+
+ }
+ gen.writeEndArray();
+ break;
+ }
+
+ }
+
+}
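
The constructor's only branching is whether to configure the generator with Jackson's default pretty printer; everything else funnels through the one final gen. A standalone Jackson sketch of that choice (plain Jackson, no Drill readers):

    import com.fasterxml.jackson.core.JsonFactory;
    import com.fasterxml.jackson.core.JsonGenerator;

    public class JsonGenSketch {
      public static void main(String[] args) throws Exception {
        JsonFactory factory = new JsonFactory();
        JsonGenerator gen = factory.createJsonGenerator(System.out);
        gen = gen.useDefaultPrettyPrinter();  // the pretty == true path
        gen.writeStartObject();
        gen.writeFieldName("a");
        gen.writeNumber(1);
        gen.writeEndObject();
        gen.flush();                          // prints a pretty-printed { "a" : 1 }
      }
    }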
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/ReaderJSONRecordSplitter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/ReaderJSONRecordSplitter.java
new file mode 100644
index 000000000..0cdbf852b
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/ReaderJSONRecordSplitter.java
@@ -0,0 +1,178 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.vector.complex.fn;
+
+import java.io.IOException;
+import java.io.Reader;
+import java.io.StringReader;
+
+import com.google.common.io.CharStreams;
+
+public class ReaderJSONRecordSplitter implements JsonRecordSplitter {
+ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ReaderJSONRecordSplitter.class);
+
+ private static final int OPEN_CBRACKET = '{';
+ private static final int OPEN_BRACKET = '[';
+ private static final int CLOSE_CBRACKET = '}';
+ private static final int CLOSE_BRACKET = ']';
+
+ private static final int SPACE = ' ';
+ private static final int TAB = '\t';
+ private static final int NEW_LINE = '\n';
+ private static final int FORM_FEED = '\f';
+ private static final int CR = '\r';
+
+ private long start = 0;
+ private Reader reader;
+
+ public ReaderJSONRecordSplitter(Reader reader){
+ this.reader = reader;
+ }
+
+ public ReaderJSONRecordSplitter(String str){
+ this.reader = new StringReader(str);
+ }
+
+ @Override
+ public Reader getNextReader() throws IOException{
+
+ boolean inCandidate = false;
+ boolean found = false;
+
+ reader.mark(128*1024);
+ long endOffset = start;
+ outside: while(true){
+ int c = reader.read();
+// System.out.println(b);
+ endOffset++;
+
+ if(c == -1){
+ if(inCandidate){
+ found = true;
+ }
+ break;
+ }
+
+ switch(c){
+ case CLOSE_BRACKET:
+ case CLOSE_CBRACKET:
+// System.out.print("c");
+ inCandidate = true;
+ break;
+ case OPEN_BRACKET:
+ case OPEN_CBRACKET:
+// System.out.print("o");
+ if(inCandidate){
+ found = true;
+ break outside;
+ }
+ break;
+
+ case SPACE:
+ case TAB:
+ case NEW_LINE:
+ case CR:
+ case FORM_FEED:
+// System.out.print(' ');
+ break;
+
+ default:
+// System.out.print('-');
+ inCandidate = false;
+ }
+ }
+
+ if(found){
+ long maxBytes = endOffset - 1 - start;
+ start = endOffset;
+ reader.reset();
+ return new LimitedReader(reader, (int) maxBytes);
+ }else{
+ return null;
+ }
+
+ }
+
+ private class LimitedReader extends Reader {
+
+ private final Reader incoming;
+ private final int maxBytes;
+ private int markedBytes = 0;
+ private int bytes = 0;
+
+ public LimitedReader(Reader in, int maxBytes) {
+ this.maxBytes = maxBytes;
+ this.incoming = in;
+ }
+
+ @Override
+ public int read() throws IOException {
+ if (bytes >= maxBytes){
+ return -1;
+ }else{
+ bytes++;
+ return incoming.read();
+ }
+
+
+ }
+
+
+ @Override
+ public void mark(int readAheadLimit) throws IOException {
+ incoming.mark(readAheadLimit);
+ markedBytes = bytes;
+ }
+
+ @Override
+ public void reset() throws IOException {
+ incoming.reset();
+ bytes = markedBytes;
+ }
+
+ @Override
+ public int read(char[] cbuf, int off, int len) throws IOException {
+ int outputLength = Math.min(len, maxBytes - bytes);
+ if(outputLength > 0){
+ // the underlying reader may return fewer chars than requested,
+ // so count what was actually read rather than what was asked for.
+ int n = incoming.read(cbuf, off, outputLength);
+ if(n > 0) bytes += n;
+ return n;
+ }else{
+ return -1;
+ }
+ }
+
+ @Override
+ public void close() throws IOException {
+ }
+
+ }
+
+ public static void main(String[] args) throws Exception{
+ String str = " { \"b\": \"hello\", \"c\": \"goodbye\", r: []}\n { \"b\": \"yellow\", \"c\": \"red\"}\n ";
+ JsonRecordSplitter splitter = new ReaderJSONRecordSplitter(new StringReader(str));
+ Reader obj = null;
+ System.out.println();
+
+ while( (obj = splitter.getNextReader()) != null){
+ System.out.println();
+ System.out.println(CharStreams.toString(obj));
+ System.out.println("===end obj===");
+ }
+ }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/UTF8JsonRecordSplitter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/UTF8JsonRecordSplitter.java
new file mode 100644
index 000000000..e46e1bdc2
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/UTF8JsonRecordSplitter.java
@@ -0,0 +1,150 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.vector.complex.fn;
+
+import java.io.BufferedInputStream;
+import java.io.BufferedReader;
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.Reader;
+
+import com.google.common.base.Charsets;
+import com.google.common.io.CharStreams;
+
+public class UTF8JsonRecordSplitter implements JsonRecordSplitter {
+ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(UTF8JsonRecordSplitter.class);
+
+ private static final int OPEN_CBRACKET = '{';
+ private static final int OPEN_BRACKET = '[';
+ private static final int CLOSE_CBRACKET = '}';
+ private static final int CLOSE_BRACKET = ']';
+
+ private static final int SPACE = ' ';
+ private static final int TAB = '\t';
+ private static final int NEW_LINE = '\n';
+ private static final int FORM_FEED = '\f';
+ private static final int CR = '\r';
+
+ private long start = 0;
+ private InputStream incoming;
+
+ public UTF8JsonRecordSplitter(InputStream incoming){
+ this.incoming = new BufferedInputStream(incoming);
+ }
+
+ @Override
+ public Reader getNextReader() throws IOException{
+
+ boolean inCandidate = false;
+ boolean found = false;
+
+ incoming.mark(128*1024);
+ long endOffset = start;
+ outside: while(true){
+ int b = incoming.read();
+// System.out.println(b);
+ endOffset++;
+
+ if(b == -1){
+ if(inCandidate){
+ found = true;
+ }
+ break;
+ }
+
+ switch(b){
+ case CLOSE_BRACKET:
+ case CLOSE_CBRACKET:
+// System.out.print("c");
+ inCandidate = true;
+ break;
+ case OPEN_BRACKET:
+ case OPEN_CBRACKET:
+// System.out.print("o");
+ if(inCandidate){
+ found = true;
+ break outside;
+ }
+ break;
+
+ case SPACE:
+ case TAB:
+ case NEW_LINE:
+ case CR:
+ case FORM_FEED:
+// System.out.print(' ');
+ break;
+
+ default:
+// System.out.print('-');
+ inCandidate = false;
+ }
+ }
+
+ if(found){
+ long maxBytes = endOffset - 1 - start;
+ start = endOffset;
+ incoming.reset();
+ return new BufferedReader(new InputStreamReader(new DelInputStream(incoming, maxBytes), Charsets.UTF_8));
+ }else{
+ return null;
+ }
+
+ }
+
+ private class DelInputStream extends InputStream {
+
+ private final InputStream incoming;
+ private final long maxBytes;
+ private long bytes = 0;
+
+ public DelInputStream(InputStream in, long maxBytes) {
+ this.maxBytes = maxBytes;
+ this.incoming = in;
+ }
+
+ @Override
+ public int read() throws IOException {
+ if (bytes >= maxBytes){
+ return -1;
+ }else{
+ bytes++;
+ return incoming.read();
+ }
+
+
+ }
+
+ }
+
+ public static void main(String[] args) throws Exception{
+ byte[] str = " { \"b\": \"hello\", \"c\": \"goodbye\", r: []}\n { \"b\": \"yellow\", \"c\": \"red\"}\n ".getBytes(Charsets.UTF_8);
+ InputStream s = new ByteArrayInputStream(str);
+ JsonRecordSplitter splitter = new UTF8JsonRecordSplitter(s);
+ Reader obj = null;
+ System.out.println();
+
+ while( (obj = splitter.getNextReader()) != null){
+ System.out.println();
+ System.out.println(CharStreams.toString(obj));
+ System.out.println("===end obj===");
+ }
+ }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/impl/AbstractBaseReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/impl/AbstractBaseReader.java
new file mode 100644
index 000000000..8f892e73a
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/impl/AbstractBaseReader.java
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.vector.complex.impl;
+
+import java.util.Iterator;
+import java.util.Map.Entry;
+
+import org.apache.drill.common.types.TypeProtos.MajorType;
+import org.apache.drill.exec.vector.complex.reader.FieldReader;
+
+
+abstract class AbstractBaseReader implements FieldReader{
+
+ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(AbstractBaseReader.class);
+
+ private int index;
+
+ public AbstractBaseReader() {
+ super();
+ }
+
+ public void setPosition(int index){
+ this.index = index;
+ }
+
+ int idx(){
+ return index;
+ }
+
+ @Override
+ public Iterator<String> iterator() {
+ throw new IllegalStateException("The current reader doesn't support reading as a map.");
+ }
+
+ public MajorType getType(){
+ throw new IllegalStateException("The current reader doesn't support getting type information.");
+ }
+
+ @Override
+ public boolean next() {
+ throw new IllegalStateException("The current reader doesn't support iterating with next().");
+ }
+
+ @Override
+ public int size() {
+ throw new IllegalStateException("The current reader doesn't support getting size information.");
+ }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/impl/AbstractBaseWriter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/impl/AbstractBaseWriter.java
new file mode 100644
index 000000000..7aa984689
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/impl/AbstractBaseWriter.java
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.vector.complex.impl;
+
+import org.apache.drill.exec.vector.complex.WriteState;
+import org.apache.drill.exec.vector.complex.writer.FieldWriter;
+
+
+abstract class AbstractBaseWriter implements FieldWriter{
+ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(AbstractBaseWriter.class);
+
+ final WriteState state;
+ final FieldWriter parent;
+ private int index;
+
+ public AbstractBaseWriter(FieldWriter parent) {
+ super();
+ this.state = parent == null ? new WriteState() : parent.getState();
+ this.parent = parent;
+ }
+
+ public FieldWriter getParent() {
+ return parent;
+ }
+
+ public boolean ok(){
+ return state.isOk();
+ }
+
+ public boolean isRoot(){
+ return parent == null;
+ }
+
+ int idx(){
+ return index;
+ }
+ protected void resetState(){
+ state.reset();
+ }
+
+ public void setPosition(int index){
+ this.index = index;
+ }
+
+ void inform(boolean outcome){
+ if(!outcome){
+ state.fail(this);
+ }
+ }
+
+ public WriteState getState(){
+ return state;
+ }
+
+ public void end(){
+ }
+}
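
Note how the constructor threads one WriteState through an entire writer tree: the root allocates it and every child adopts the parent's, so inform() on any writer fails the whole batch. A standalone mimic of that pattern (illustration only; the real WriteState asserts rather than silently keeping the first failure):

    public class SharedStateSketch {
      static class State {
        Object failPoint;
        boolean isOk() { return failPoint == null; }
        void fail(Object writer) { failPoint = writer; }
        void reset() { failPoint = null; }
      }

      public static void main(String[] args) {
        State shared = new State();          // created by the root writer
        Object childWriter = new Object();   // children reuse the same instance
        shared.fail(childWriter);            // any child can fail the batch
        System.out.println(shared.isOk());   // false: caller rolls back, retries
        shared.reset();
        System.out.println(shared.isOk());   // true
      }
    }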
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/impl/ComplexWriterImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/impl/ComplexWriterImpl.java
new file mode 100644
index 000000000..c6ea75bb3
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/impl/ComplexWriterImpl.java
@@ -0,0 +1,164 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.vector.complex.impl;
+
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.common.types.Types;
+import org.apache.drill.exec.vector.complex.MapVector;
+import org.apache.drill.exec.vector.complex.StateTool;
+import org.apache.drill.exec.vector.complex.writer.BaseWriter;
+import org.apache.drill.exec.vector.complex.writer.BaseWriter.ComplexWriter;
+
+import com.google.common.base.Preconditions;
+
+public class ComplexWriterImpl extends AbstractFieldWriter implements ComplexWriter{
+ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ComplexWriterImpl.class);
+
+ SingleMapWriter mapRoot;
+ SingleListWriter listRoot;
+ MapVector container;
+
+ Mode mode = Mode.INIT;
+ private final String name;
+
+ private enum Mode { INIT, MAP, LIST };
+
+ public ComplexWriterImpl(String name, MapVector container){
+ super(null);
+ this.name = name;
+ this.container = container;
+ }
+
+ private void check(Mode... modes){
+ StateTool.check(mode, modes);
+ }
+
+ public void reset(){
+ setPosition(0);
+ resetState();
+ }
+
+ public void clear(){
+ switch(mode){
+ case MAP:
+ mapRoot.clear();
+ break;
+ case LIST:
+ listRoot.clear();
+ break;
+ }
+ }
+
+ public void setValueCount(int count){
+ switch(mode){
+ case MAP:
+ mapRoot.setValueCount(count);
+ break;
+ case LIST:
+ listRoot.setValueCount(count);
+ break;
+ }
+ }
+
+ public void setPosition(int index){
+ super.setPosition(index);
+ switch(mode){
+ case MAP:
+ mapRoot.setPosition(index);
+ break;
+ case LIST:
+ listRoot.setPosition(index);
+ break;
+ }
+ }
+
+
+ public MapWriter directMap(){
+ Preconditions.checkArgument(name == null);
+
+ switch(mode){
+
+ case INIT:
+ MapVector map = (MapVector) container;
+ mapRoot = new SingleMapWriter(map, this);
+ mapRoot.setPosition(idx());
+ mode = Mode.MAP;
+ break;
+
+ case MAP:
+ break;
+
+ default:
+ check(Mode.INIT, Mode.MAP);
+ }
+
+ return mapRoot;
+ }
+
+ @Override
+ public MapWriter rootAsMap() {
+ switch(mode){
+
+ case INIT:
+ MapVector map = container.addOrGet(name, Types.required(MinorType.MAP), MapVector.class);
+ mapRoot = new SingleMapWriter(map, this);
+ mapRoot.setPosition(idx());
+ mode = Mode.MAP;
+ break;
+
+ case MAP:
+ break;
+
+ default:
+ check(Mode.INIT, Mode.MAP);
+ }
+
+ return mapRoot;
+ }
+
+
+ @Override
+ public void allocate() {
+ if(mapRoot != null){
+ mapRoot.allocate();
+ }else if(listRoot != null){
+ listRoot.allocate();
+ }
+ }
+
+ @Override
+ public ListWriter rootAsList() {
+ switch(mode){
+
+ case INIT:
+ listRoot = new SingleListWriter(name, container, this);
+ listRoot.setPosition(idx());
+ mode = Mode.LIST;
+ break;
+
+ case LIST:
+ break;
+
+ default:
+ check(Mode.INIT, Mode.LIST);
+ }
+
+ return listRoot;
+ }
+
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/impl/RepeatedListReaderImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/impl/RepeatedListReaderImpl.java
new file mode 100644
index 000000000..c555f35ff
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/impl/RepeatedListReaderImpl.java
@@ -0,0 +1,113 @@
+
+/*******************************************************************************
+
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.vector.complex.impl;
+
+
+import org.apache.drill.common.types.TypeProtos.MajorType;
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.common.types.Types;
+import org.apache.drill.exec.expr.holders.RepeatedListHolder;
+import org.apache.drill.exec.vector.ValueVector;
+import org.apache.drill.exec.vector.complex.RepeatedListVector;
+import org.apache.drill.exec.vector.complex.reader.FieldReader;
+import org.apache.drill.exec.vector.complex.writer.BaseWriter.ListWriter;
+import org.apache.drill.exec.vector.complex.writer.BaseWriter.MapWriter;
+
+public class RepeatedListReaderImpl extends AbstractFieldReader{
+ private static final int NO_VALUES = Integer.MAX_VALUE - 1;
+ private static final MajorType TYPE = Types.repeated(MinorType.LIST);
+ private final String name;
+ private final RepeatedListVector container;
+ private FieldReader reader;
+
+ public RepeatedListReaderImpl(String name, RepeatedListVector container){
+ super();
+ this.name = name;
+ this.container = container;
+ }
+
+ public MajorType getType(){
+ return TYPE;
+ }
+
+ public void copyAsValue(ListWriter writer){
+ if(currentOffset == NO_VALUES) return;
+ RepeatedListWriter impl = (RepeatedListWriter) writer;
+ impl.inform(impl.container.copyFromSafe(idx(), impl.idx(), container));
+ }
+
+ public void copyAsField(String name, MapWriter writer){
+ if(currentOffset == NO_VALUES) return;
+ RepeatedListWriter impl = (RepeatedListWriter) writer.list(name);
+ impl.inform(impl.container.copyFromSafe(idx(), impl.idx(), container));
+ }
+
+ private int currentOffset;
+ private int maxOffset;
+
+ public int size(){
+ return maxOffset - currentOffset;
+ }
+
+ public void setPosition(int index){
+ super.setPosition(index);
+ RepeatedListHolder h = new RepeatedListHolder();
+ container.getAccessor().get(index, h);
+ if(h.start == h.end){
+ currentOffset = NO_VALUES;
+ }else{
+ currentOffset = h.start-1;
+ maxOffset = h.end;
+ if(reader != null) reader.setPosition(currentOffset);
+ }
+ }
+
+ public boolean next(){
+ if(currentOffset +1 < maxOffset){
+ currentOffset++;
+ if(reader != null) reader.setPosition(currentOffset);
+ return true;
+ }else{
+ currentOffset = NO_VALUES;
+ return false;
+ }
+ }
+
+ @Override
+ public Object readObject() {
+ return container.getAccessor().getObject(idx());
+ }
+
+ public FieldReader reader(){
+ if(reader == null){
+ reader = container.get(name, ValueVector.class).getAccessor().getReader();
+ reader.setPosition(currentOffset);
+ }
+ return reader;
+ }
+
+ public boolean isSet(){
+ return true;
+ }
+
+
+}
+
+
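
The cursor convention above recurs in RepeatedMapReaderImpl below: position at start - 1, advance with next() until maxOffset, and park on the NO_VALUES sentinel when the group is empty or exhausted. A plain-int sketch of the traversal:

    public class CursorSketch {
      static final int NO_VALUES = Integer.MAX_VALUE - 1;

      public static void main(String[] args) {
        int start = 2, end = 5;  // one group's child range from the holder
        int currentOffset = (start == end) ? NO_VALUES : start - 1;
        int maxOffset = end;
        while (currentOffset + 1 < maxOffset) {  // next() returns true
          currentOffset++;
          System.out.println("child position " + currentOffset);  // 2, 3, 4
        }
        currentOffset = NO_VALUES;               // next() returned false
      }
    }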
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/impl/RepeatedMapReaderImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/impl/RepeatedMapReaderImpl.java
new file mode 100644
index 000000000..ab778ff3f
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/impl/RepeatedMapReaderImpl.java
@@ -0,0 +1,205 @@
+
+
+/*******************************************************************************
+
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.vector.complex.impl;
+
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkState;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.ObjectArrays;
+import com.google.common.base.Charsets;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufInputStream;
+
+import org.apache.commons.lang3.ArrayUtils;
+import org.apache.drill.exec.memory.BufferAllocator;
+import org.apache.drill.exec.proto.SchemaDefProtos;
+import org.apache.drill.exec.proto.UserBitShared.SerializedField;
+import org.apache.drill.exec.record.*;
+import org.apache.drill.exec.vector.*;
+import org.apache.drill.exec.expr.holders.*;
+import org.apache.drill.common.expression.FieldReference;
+import org.apache.drill.common.types.TypeProtos.*;
+import org.apache.drill.common.types.Types;
+import org.apache.drill.exec.vector.complex.*;
+import org.apache.drill.exec.vector.complex.reader.*;
+import org.apache.drill.exec.vector.complex.writer.*;
+import org.apache.drill.exec.vector.complex.writer.BaseWriter.MapWriter;
+
+import com.sun.codemodel.JType;
+import com.sun.codemodel.JCodeModel;
+
+import java.util.Arrays;
+import java.util.Random;
+import java.util.List;
+import java.io.Closeable;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.sql.Date;
+import java.sql.Time;
+import java.sql.Timestamp;
+import java.math.BigDecimal;
+import java.math.BigInteger;
+
+import org.joda.time.DateTime;
+import org.joda.time.Period;
+import org.apache.hadoop.io.Text;
+
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.drill.common.types.TypeProtos.MajorType;
+import org.apache.drill.exec.vector.complex.MapVector;
+
+import com.google.common.collect.Maps;
+
+@SuppressWarnings("unused")
+public class RepeatedMapReaderImpl extends AbstractFieldReader{
+ private static final int NO_VALUES = Integer.MAX_VALUE - 1;
+
+ private final RepeatedMapVector vector;
+ private final Map<String, FieldReader> fields = Maps.newHashMap();
+
+ public RepeatedMapReaderImpl(RepeatedMapVector vector) {
+ this.vector = vector;
+ }
+
+ private void setChildrenPosition(int index){
+ for(FieldReader r : fields.values()){
+ r.setPosition(index);
+ }
+ }
+
+ public FieldReader reader(String name){
+ FieldReader reader = fields.get(name);
+ if(reader == null){
+ ValueVector child = vector.get(name, ValueVector.class);
+ if(child == null){
+ reader = NullReader.INSTANCE;
+ }else{
+ reader = child.getAccessor().getReader();
+ }
+ fields.put(name, reader);
+ reader.setPosition(currentOffset);
+ }
+ return reader;
+ }
+
+
+ private int currentOffset;
+ private int maxOffset;
+
+ public int size(){
+ return maxOffset - currentOffset;
+ }
+
+ public void setPosition(int index){
+ super.setPosition(index);
+ RepeatedMapHolder h = new RepeatedMapHolder();
+ vector.getAccessor().get(index, h);
+ if(h.start == h.end){
+ currentOffset = NO_VALUES;
+ }else{
+ currentOffset = h.start-1;
+ maxOffset = h.end;
+ setChildrenPosition(currentOffset);
+ }
+ }
+
+ public void setSinglePosition(int index, int childIndex){
+ super.setPosition(index);
+ RepeatedMapHolder h = new RepeatedMapHolder();
+ vector.getAccessor().get(index, h);
+ if(h.start == h.end){
+ currentOffset = NO_VALUES;
+ }else{
+ int singleOffset = h.start + childIndex;
+ assert singleOffset < h.end;
+ currentOffset = singleOffset;
+ maxOffset = singleOffset + 1;
+ setChildrenPosition(singleOffset);
+ }
+ }
+
+ public boolean next(){
+ if(currentOffset +1 < maxOffset){
+ setChildrenPosition(++currentOffset);
+ return true;
+ }else{
+ currentOffset = NO_VALUES;
+ return false;
+ }
+ }
+
+ @Override
+ public Object readObject() {
+ return vector.getAccessor().getObject(idx());
+ }
+
+ public MajorType getType(){
+ return vector.getField().getType();
+ }
+
+ public java.util.Iterator<String> iterator(){
+ return vector.fieldNameIterator();
+ }
+
+ @Override
+ public boolean isSet() {
+ return false;
+ }
+
+ public void copyAsValue(MapWriter writer){
+ if(currentOffset == NO_VALUES) return;
+ RepeatedMapWriter impl = (RepeatedMapWriter) writer;
+ impl.inform(impl.container.copyFromSafe(idx(), impl.idx(), vector));
+ }
+
+ public void copyAsValueSingle(MapWriter writer){
+ if(currentOffset == NO_VALUES) return;
+ SingleMapWriter impl = (SingleMapWriter) writer;
+ impl.inform(impl.container.copyFromSafe(currentOffset, impl.idx(), vector));
+ }
+
+ public void copyAsField(String name, MapWriter writer){
+ if(currentOffset == NO_VALUES) return;
+ RepeatedMapWriter impl = (RepeatedMapWriter) writer.map(name);
+ impl.inform(impl.container.copyFromSafe(idx(), impl.idx(), vector));
+ }
+
+
+}
+
+
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/impl/SingleListReaderImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/impl/SingleListReaderImpl.java
new file mode 100644
index 000000000..36e04a7bb
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/impl/SingleListReaderImpl.java
@@ -0,0 +1,92 @@
+
+/*******************************************************************************
+
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.vector.complex.impl;
+
+
+import org.apache.drill.common.types.TypeProtos.MajorType;
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.common.types.Types;
+import org.apache.drill.exec.vector.ValueVector;
+import org.apache.drill.exec.vector.complex.AbstractContainerVector;
+import org.apache.drill.exec.vector.complex.reader.FieldReader;
+import org.apache.drill.exec.vector.complex.writer.BaseWriter.ListWriter;
+import org.apache.drill.exec.vector.complex.writer.BaseWriter.MapWriter;
+
+
+@SuppressWarnings("unused")
+public class SingleListReaderImpl extends AbstractFieldReader{
+
+ private static final MajorType TYPE = Types.optional(MinorType.LIST);
+ private final String name;
+ private final AbstractContainerVector container;
+ private FieldReader reader;
+
+ public SingleListReaderImpl(String name, AbstractContainerVector container){
+ super();
+ this.name = name;
+ this.container = container;
+ }
+
+ public MajorType getType(){
+ return TYPE;
+ }
+
+
+ public void setPosition(int index){
+ super.setPosition(index);
+ if(reader != null) reader.setPosition(index);
+ }
+
+ @Override
+ public Object readObject() {
+ return reader.readObject();
+ }
+
+ public FieldReader reader(){
+ if(reader == null){
+ reader = container.get(name, ValueVector.class).getAccessor().getReader();
+ setPosition(idx());
+ }
+ return reader;
+ }
+
+ @Override
+ public boolean isSet() {
+ return false;
+ }
+
+ public void copyAsValue(ListWriter writer){
+ throw new UnsupportedOperationException("Generic list copying not yet supported. Please resolve to typed list.");
+ }
+
+ public void copyAsField(String name, MapWriter writer){
+ throw new UnsupportedOperationException("Generic list copying not yet supported. Please resolve to typed list.");
+ }
+
+
+
+}
+
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/impl/SingleMapReaderImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/impl/SingleMapReaderImpl.java
new file mode 100644
index 000000000..2158fcc8c
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/impl/SingleMapReaderImpl.java
@@ -0,0 +1,154 @@
+
+
+/*******************************************************************************
+
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.vector.complex.impl;
+
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkState;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.ObjectArrays;
+import com.google.common.base.Charsets;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufInputStream;
+
+import org.apache.commons.lang3.ArrayUtils;
+import org.apache.drill.exec.memory.BufferAllocator;
+import org.apache.drill.exec.proto.SchemaDefProtos;
+import org.apache.drill.exec.proto.UserBitShared.SerializedField;
+import org.apache.drill.exec.record.*;
+import org.apache.drill.exec.vector.*;
+import org.apache.drill.exec.expr.holders.*;
+import org.apache.drill.common.expression.FieldReference;
+import org.apache.drill.common.types.TypeProtos.*;
+import org.apache.drill.common.types.Types;
+import org.apache.drill.exec.vector.complex.*;
+import org.apache.drill.exec.vector.complex.reader.*;
+import org.apache.drill.exec.vector.complex.writer.*;
+import org.apache.drill.exec.vector.complex.writer.BaseWriter.MapWriter;
+
+import com.sun.codemodel.JType;
+import com.sun.codemodel.JCodeModel;
+
+import java.util.Arrays;
+import java.util.Random;
+import java.util.List;
+import java.io.Closeable;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.sql.Date;
+import java.sql.Time;
+import java.sql.Timestamp;
+import java.math.BigDecimal;
+import java.math.BigInteger;
+
+import org.joda.time.DateTime;
+import org.joda.time.Period;
+import org.apache.hadoop.io.Text;
+
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.drill.common.types.TypeProtos.MajorType;
+import org.apache.drill.exec.vector.complex.MapVector;
+
+import com.google.common.collect.Maps;
+
+@SuppressWarnings("unused")
+public class SingleMapReaderImpl extends AbstractFieldReader{
+
+ private final MapVector vector;
+ private final Map<String, FieldReader> fields = Maps.newHashMap();
+
+ public SingleMapReaderImpl(MapVector vector) {
+ this.vector = vector;
+ }
+
+ private void setChildrenPosition(int index){
+ for(FieldReader r : fields.values()){
+ r.setPosition(index);
+ }
+ }
+
+ public FieldReader reader(String name){
+ FieldReader reader = fields.get(name);
+ if(reader == null){
+ ValueVector child = vector.get(name, ValueVector.class);
+ if(child == null){
+ reader = NullReader.INSTANCE;
+ }else{
+ reader = child.getAccessor().getReader();
+ }
+ fields.put(name, reader);
+ reader.setPosition(idx());
+ }
+ return reader;
+ }
+
+ public void setPosition(int index){
+ super.setPosition(index);
+ for(FieldReader r : fields.values()){
+ r.setPosition(index);
+ }
+ }
+
+ @Override
+ public Object readObject() {
+ return vector.getAccessor().getObject(idx());
+ }
+
+ @Override
+ public boolean isSet() {
+ return true;
+ }
+
+ public MajorType getType(){
+ return vector.getField().getType();
+ }
+
+ public java.util.Iterator<String> iterator(){
+ return vector.fieldNameIterator();
+ }
+
+ public void copyAsValue(MapWriter writer){
+ SingleMapWriter impl = (SingleMapWriter) writer;
+ impl.inform(impl.container.copyFromSafe(idx(), impl.idx(), vector));
+ }
+
+ public void copyAsField(String name, MapWriter writer){
+ SingleMapWriter impl = (SingleMapWriter) writer.map(name);
+ impl.inform(impl.container.copyFromSafe(idx(), impl.idx(), vector));
+ }
+
+
+}
+
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/impl/VectorContainerWriter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/impl/VectorContainerWriter.java
new file mode 100644
index 000000000..bc1d3675b
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/impl/VectorContainerWriter.java
@@ -0,0 +1,104 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.vector.complex.impl;
+
+import org.apache.drill.common.types.TypeProtos.MajorType;
+import org.apache.drill.exec.exception.SchemaChangeException;
+import org.apache.drill.exec.physical.impl.OutputMutator;
+import org.apache.drill.exec.record.MaterializedField;
+import org.apache.drill.exec.vector.ValueVector;
+import org.apache.drill.exec.vector.complex.MapVector;
+import org.apache.drill.exec.vector.complex.writer.BaseWriter;
+import org.apache.drill.exec.vector.complex.writer.BaseWriter.ComplexWriter;
+
+public class VectorContainerWriter extends AbstractFieldWriter implements ComplexWriter {
+ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(VectorContainerWriter.class);
+
+ SingleMapWriter mapRoot;
+ SpecialMapVector mapVector;
+ OutputMutator mutator;
+
+ public VectorContainerWriter(OutputMutator mutator) {
+ super(null);
+ this.mutator = mutator;
+ this.mapVector = new SpecialMapVector();
+ this.mapRoot = new SingleMapWriter(mapVector, this);
+ }
+
+ public void reset() {
+ setPosition(0);
+ resetState();
+ }
+
+ public void clear() {
+ mapRoot.clear();
+ }
+
+ public SingleMapWriter getWriter() {
+ return mapRoot;
+ }
+
+ public void setValueCount(int count) {
+ mapRoot.setValueCount(count);
+ }
+
+ public void setPosition(int index) {
+ super.setPosition(index);
+ mapRoot.setPosition(index);
+ }
+
+ public MapWriter directMap() {
+ return mapRoot;
+ }
+
+ @Override
+ public void allocate() {
+ mapRoot.allocate();
+ }
+
+ private class SpecialMapVector extends MapVector {
+
+ public SpecialMapVector() {
+ super("", null);
+ }
+
+ @Override
+ public <T extends ValueVector> T addOrGet(String name, MajorType type, Class<T> clazz) {
+ try {
+ ValueVector v = mutator.addField(MaterializedField.create(name, type), clazz);
+ this.put(name, v);
+ return this.typeify(v, clazz);
+ } catch (SchemaChangeException e) {
+ throw new IllegalStateException(e);
+ }
+
+ }
+
+ }
+
+ @Override
+ public MapWriter rootAsMap() {
+ return mapRoot;
+ }
+
+ @Override
+ public ListWriter rootAsList() {
+ throw new UnsupportedOperationException(
+ "Drill doesn't support objects whose first level is a scalar or array. Objects must start as maps.");
+ }
+}
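VectorContainerWriter gives a record reader a single entry point for complex output: construct it over the scan's OutputMutator, position it per record, write through the root map, then publish the value count for the batch. A sketch of that lifecycle, assuming the generated integer(name)/writeInt scalar-writer API and an in-scope mutator and recordCount:

    // Illustrative reader loop; the field name and record count are made up.
    VectorContainerWriter writer = new VectorContainerWriter(mutator);
    writer.allocate();
    writer.reset();
    for (int i = 0; i < recordCount; i++) {
      writer.setPosition(i);
      BaseWriter.MapWriter map = writer.rootAsMap();
      map.integer("id").writeInt(i);  // new fields reach the mutator via SpecialMapVector.addOrGet
    }
    writer.setValueCount(recordCount);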
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/reader/FieldReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/reader/FieldReader.java
new file mode 100644
index 000000000..caa3aa64b
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/reader/FieldReader.java
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.vector.complex.reader;
+
+import org.apache.drill.exec.vector.complex.reader.BaseReader.ListReader;
+import org.apache.drill.exec.vector.complex.reader.BaseReader.MapReader;
+import org.apache.drill.exec.vector.complex.reader.BaseReader.RepeatedListReader;
+import org.apache.drill.exec.vector.complex.reader.BaseReader.RepeatedMapReader;
+import org.apache.drill.exec.vector.complex.reader.BaseReader.ScalarReader;
+
+
+
+/**
+ * Composite of every reader role. Generated code can hold a single FieldReader and use
+ * whichever accessors match the underlying vector, without casting between reader types.
+ */
+public interface FieldReader extends MapReader, ListReader, ScalarReader, RepeatedMapReader, RepeatedListReader {
+}
\ No newline at end of file
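FieldReader collapses every reader role into one type so generated code can traverse nested data through a single reference, without casting between reader interfaces. A sketch of the shape this enables, assuming getType() comes from the base reader and that the map side is iterable over its field names, as the iterator() implementation earlier in this patch suggests:

    // Illustrative traversal over a single FieldReader reference.
    void printMapFieldNames(FieldReader reader) {
      if (reader.getType().getMinorType() == TypeProtos.MinorType.MAP) {
        for (String child : reader) {  // MapReader side: iterate child field names
          System.out.println(child);
        }
      }
      // ScalarReader and ListReader accessors stay available on the same reference.
    }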
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/writer/FieldWriter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/writer/FieldWriter.java
new file mode 100644
index 000000000..3faa4f700
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/writer/FieldWriter.java
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.vector.complex.writer;
+
+import org.apache.drill.exec.vector.complex.writer.BaseWriter.ListWriter;
+import org.apache.drill.exec.vector.complex.writer.BaseWriter.MapWriter;
+import org.apache.drill.exec.vector.complex.writer.BaseWriter.ScalarWriter;
+
+
+
+/**
+ * Composite of the writer roles, plus the allocate/clear lifecycle hooks that the
+ * engine drives around each batch.
+ */
+public interface FieldWriter extends MapWriter, ListWriter, ScalarWriter {
+ public void allocate();
+ public void clear();
+}
\ No newline at end of file
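FieldWriter mirrors FieldReader on the write side but adds the allocate()/clear() hooks that the engine, rather than user code, drives around each batch. A minimal sketch of that contract, assuming setPosition(int) is inherited from the base writer interfaces:

    // Engine-side batch lifecycle implied by the interface (illustrative only).
    void fillBatch(FieldWriter writer, int rows) {
      writer.allocate();        // reserve vector memory up front
      for (int i = 0; i < rows; i++) {
        writer.setPosition(i);  // position before writing row i's fields
        // ... per-row field writes go here ...
      }
      // once downstream has consumed the batch:
      writer.clear();           // release the underlying buffers
    }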