author | Jacques Nadeau <jacques@apache.org> | 2014-05-11 15:12:19 -0700
---|---|---
committer | Jacques Nadeau <jacques@apache.org> | 2014-05-11 19:46:51 -0700
commit | cdc5daed5218ed70445d178f5fc25df09c573001 (patch) |
tree | 9b6c5bf9d8570bf5bc4a1bd5776b6f698e667f22 /exec/java-exec/src/main/java/org/apache |
parent | 4ffef0183cbe336e30503ceee647b21128ba7738 (diff) |
Add support for RepeatedMapVector, MapVector and RepeatedListVector.
Diffstat (limited to 'exec/java-exec/src/main/java/org/apache')
77 files changed, 4902 insertions, 1150 deletions
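The change that recurs throughout the diff below is the move from addressing a value vector by a single ordinal to addressing it by a path of ordinals, so that columns nested inside the new MapVector, RepeatedMapVector and RepeatedListVector types can be reached: `RecordBatch.getValueAccessorById(int fieldId, Class<?> clazz)` becomes `getValueAccessorById(Class<?> clazz, int... ids)`, and `TypedFieldId` now exposes `getFieldIds()` returning an `int[]`. A minimal before/after sketch, condensed from the PriorityQueueTemplate hunk further down (the wrapping helper class is hypothetical; the Drill calls are the ones shown in the diff):

```java
import org.apache.drill.exec.record.MaterializedField;
import org.apache.drill.exec.record.VectorContainer;

// Hypothetical helper, not part of the commit: copies one column between containers.
class ColumnLookupSketch {
  static void copyColumn(VectorContainer container, VectorContainer newContainer, MaterializedField field) {
    // Old API (removed by this commit): a single int identified a top-level vector.
    //   int id = container.getValueVectorId(field.getAsSchemaPath()).getFieldId();
    //   newContainer.add(container.getValueAccessorById(id, field.getValueClass()).getValueVectors());

    // New API: TypedFieldId carries an ordinal path, so nested map/list columns are addressable.
    int[] ids = container.getValueVectorId(field.getPath()).getFieldIds();
    newContainer.add(container.getValueAccessorById(field.getValueClass(), ids).getValueVectors());
  }
}
```

The commit also introduces holder classes for the new types (ComplexHolder wrapping a FieldReader, plus RepeatedMapHolder and RepeatedListHolder carrying start/end offsets); their full sources appear as new files in the diff.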
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/VectorAccessibleSerializable.java b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/VectorAccessibleSerializable.java index f4a6d7da6..073a8d55f 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/VectorAccessibleSerializable.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/VectorAccessibleSerializable.java @@ -17,26 +17,34 @@ */ package org.apache.drill.exec.cache; -import com.google.common.base.Preconditions; -import com.google.common.collect.Lists; -import com.codahale.metrics.MetricRegistry; -import com.codahale.metrics.Timer; -import com.hazelcast.nio.ObjectDataInput; -import com.hazelcast.nio.ObjectDataOutput; import io.netty.buffer.ByteBuf; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.util.List; + import org.apache.drill.common.util.DataInputInputStream; import org.apache.drill.common.util.DataOutputOutputStream; import org.apache.drill.exec.expr.TypeHelper; import org.apache.drill.exec.memory.BufferAllocator; import org.apache.drill.exec.metrics.DrillMetrics; import org.apache.drill.exec.proto.UserBitShared; -import org.apache.drill.exec.record.*; +import org.apache.drill.exec.proto.UserBitShared.SerializedField; +import org.apache.drill.exec.record.BatchSchema; +import org.apache.drill.exec.record.MaterializedField; +import org.apache.drill.exec.record.VectorAccessible; +import org.apache.drill.exec.record.VectorContainer; +import org.apache.drill.exec.record.WritableBatch; import org.apache.drill.exec.record.selection.SelectionVector2; import org.apache.drill.exec.vector.ValueVector; -import org.apache.drill.exec.proto.UserBitShared.FieldMetadata; -import java.io.*; -import java.util.List; +import com.codahale.metrics.MetricRegistry; +import com.codahale.metrics.Timer; +import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; +import com.hazelcast.nio.ObjectDataInput; +import com.hazelcast.nio.ObjectDataOutput; /** * A wrapper around a VectorAccessible. 
Will serialize a VectorAccessible and write to an OutputStream, or can read @@ -109,10 +117,10 @@ public class VectorAccessibleSerializable implements DrillSerializable { svMode = BatchSchema.SelectionVectorMode.TWO_BYTE; } List<ValueVector> vectorList = Lists.newArrayList(); - List<FieldMetadata> fieldList = batchDef.getFieldList(); - for (FieldMetadata metaData : fieldList) { + List<SerializedField> fieldList = batchDef.getFieldList(); + for (SerializedField metaData : fieldList) { int dataLength = metaData.getBufferLength(); - MaterializedField field = MaterializedField.create(metaData.getDef()); + MaterializedField field = MaterializedField.create(metaData); ByteBuf buf = allocator.buffer(dataLength); buf.writeBytes(input, dataLength); ValueVector vector = TypeHelper.getNewVector(field, allocator); @@ -135,7 +143,7 @@ public class VectorAccessibleSerializable implements DrillSerializable { retain = true; writeToStream(output); } - + /** * Serializes the VectorAccessible va and writes it to an output stream @@ -203,7 +211,7 @@ public class VectorAccessibleSerializable implements DrillSerializable { public VectorAccessible get() { return va; } - + public SelectionVector2 getSv2() { return sv2; } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/ClassGenerator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/ClassGenerator.java index 2280aecaf..fc4855236 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/ClassGenerator.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/ClassGenerator.java @@ -20,7 +20,6 @@ package org.apache.drill.exec.expr; import static org.apache.drill.exec.compile.sig.GeneratorMapping.GM; import java.lang.reflect.Modifier; -import java.util.ArrayList; import java.util.LinkedList; import java.util.List; import java.util.Map; @@ -41,6 +40,7 @@ import org.apache.drill.exec.record.TypedFieldId; import com.beust.jcommander.internal.Lists; import com.beust.jcommander.internal.Maps; import com.google.common.base.Preconditions; +import com.sun.codemodel.JArray; import com.sun.codemodel.JBlock; import com.sun.codemodel.JClass; import com.sun.codemodel.JClassAlreadyExistsException; @@ -49,20 +49,21 @@ import com.sun.codemodel.JDefinedClass; import com.sun.codemodel.JExpr; import com.sun.codemodel.JExpression; import com.sun.codemodel.JFieldRef; +import com.sun.codemodel.JInvocation; import com.sun.codemodel.JMethod; import com.sun.codemodel.JMod; import com.sun.codemodel.JType; import com.sun.codemodel.JVar; public class ClassGenerator<T>{ - + public static final GeneratorMapping DEFAULT_SCALAR_MAP = GM("doSetup", "doEval", null, null); public static final GeneratorMapping DEFAULT_CONSTANT_MAP = GM("doSetup", "doSetup", null, null); - - + + static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ClassGenerator.class); public static enum BlockType {SETUP, EVAL, RESET, CLEANUP}; - + private final SignatureHolder sig; private final EvaluationVisitor evaluationVisitor; private final Map<ValueVectorSetup, JVar> vvDeclaration = Maps.newHashMap(); @@ -74,7 +75,7 @@ public class ClassGenerator<T>{ public final JDefinedClass clazz; private final LinkedList<JBlock>[] blocks; private final JCodeModel model; - + private int index = 0; private MappingSet mappings; @@ -82,7 +83,7 @@ public class ClassGenerator<T>{ return new MappingSet("inIndex", "outIndex", DEFAULT_CONSTANT_MAP, DEFAULT_SCALAR_MAP); } - + @SuppressWarnings("unchecked") ClassGenerator(CodeGenerator<T> codeGenerator, MappingSet mappingSet, 
SignatureHolder signature, EvaluationVisitor eval, JDefinedClass clazz, JCodeModel model) throws JClassAlreadyExistsException { this.codeGenerator = codeGenerator; @@ -96,7 +97,7 @@ public class ClassGenerator<T>{ blocks[i] = Lists.newLinkedList(); } rotateBlock(); - + for(SignatureHolder child : signature.getChildHolders()){ String innerClassName = child.getSignatureClass().getSimpleName(); JDefinedClass innerClazz = clazz._class(Modifier.FINAL + Modifier.PRIVATE, innerClassName); @@ -109,15 +110,15 @@ public class ClassGenerator<T>{ Preconditions.checkNotNull(inner); return inner; } - + public MappingSet getMappingSet(){ return mappings; } - + public void setMappingSet(MappingSet mappings){ this.mappings = mappings; } - + public CodeGenerator<T> getCodeGenerator() { return codeGenerator; } @@ -125,17 +126,17 @@ public class ClassGenerator<T>{ private GeneratorMapping getCurrentMapping(){ return mappings.getCurrentMapping(); } - + public JBlock getBlock(String methodName){ JBlock blk = this.blocks[sig.get(methodName)].getLast(); Preconditions.checkNotNull(blk, "Requested method name of %s was not available for signature %s.", methodName, this.sig); return blk; } - + public JBlock getBlock(BlockType type){ - return getBlock(getCurrentMapping().getMethodName(type)); + return getBlock(getCurrentMapping().getMethodName(type)); } - + public JBlock getSetupBlock(){ return getBlock(getCurrentMapping().getMethodName(BlockType.SETUP)); } @@ -148,17 +149,17 @@ public class ClassGenerator<T>{ public JBlock getCleanupBlock(){ return getBlock(getCurrentMapping().getMethodName(BlockType.CLEANUP)); } - + public JVar declareVectorValueSetupAndMember(String batchName, TypedFieldId fieldId){ return declareVectorValueSetupAndMember( DirectExpression.direct(batchName), fieldId); } public JVar declareVectorValueSetupAndMember(DirectExpression batchName, TypedFieldId fieldId){ final ValueVectorSetup setup = new ValueVectorSetup(batchName, fieldId); - JVar var = this.vvDeclaration.get(setup); - if(var != null) return var; - - Class<?> valueVectorClass = TypeHelper.getValueVectorClass(fieldId.getType().getMinorType(), fieldId.getType().getMode()); +// JVar var = this.vvDeclaration.get(setup); +// if(var != null) return var; + + Class<?> valueVectorClass = fieldId.getIntermediateClass(); JClass vvClass = model.ref(valueVectorClass); JClass retClass = vvClass; String vectorAccess = "getValueVector"; @@ -166,48 +167,56 @@ public class ClassGenerator<T>{ retClass = retClass.array(); vectorAccess = "getValueVectors"; } - + JVar vv = declareClassField("vv", retClass); JClass t = model.ref(SchemaChangeException.class); JType objClass = model.ref(Object.class); JBlock b = getSetupBlock(); + //JExpr.newArray(model.INT). 
+ + JVar fieldArr = b.decl(model.INT.array(), "fieldIds" + index++, JExpr.newArray(model.INT, fieldId.getFieldIds().length)); + int[] fieldIndices = fieldId.getFieldIds(); + for(int i = 0; i < fieldIndices.length; i++){ + b.assign(fieldArr.component(JExpr.lit(i)), JExpr.lit(fieldIndices[i])); + } + + JInvocation invoke = batchName + .invoke("getValueAccessorById") // + .arg( vvClass.dotclass()) + .arg(fieldArr); + JVar obj = b.decl( // objClass, // - getNextVar("tmp"), // - batchName - .invoke("getValueAccessorById") // - .arg(JExpr.lit(fieldId.getFieldId())) // - .arg( vvClass.dotclass()) - .invoke(vectorAccess)// - ); - - + getNextVar("tmp"), // + invoke.invoke(vectorAccess)); + + b._if(obj.eq(JExpr._null()))._then()._throw(JExpr._new(t).arg(JExpr.lit(String.format("Failure while loading vector %s with id: %s.", vv.name(), fieldId.toString())))); //b.assign(vv, JExpr.cast(retClass, ((JExpression) JExpr.cast(wrapperClass, obj) ).invoke(vectorAccess))); b.assign(vv, JExpr.cast(retClass, obj )); vvDeclaration.put(setup, vv); - + return vv; } public HoldingContainer addExpr(LogicalExpression ex){ return addExpr(ex, true); } - + public HoldingContainer addExpr(LogicalExpression ex, boolean rotate){ // logger.debug("Adding next write {}", ex); if(rotate) rotateBlock(); return evaluationVisitor.addExpr(ex, this); } - + public void rotateBlock(){ for(LinkedList<JBlock> b : blocks){ b.add(new JBlock(true, true)); } } - - + + void flushCode(){ int i =0; for(CodeGeneratorMethod method : sig){ @@ -219,19 +228,19 @@ public class ClassGenerator<T>{ m._throws(model.ref(c)); } m._throws(SchemaChangeException.class); - + for(JBlock b : blocks[i++]){ if(!b.isEmpty()) m.body().add(b); } - + } - + for(ClassGenerator<T> child : innerClasses.values()){ child.flushCode(); } } - - + + public JCodeModel getModel() { return model; } @@ -239,11 +248,11 @@ public class ClassGenerator<T>{ public String getNextVar() { return "v" + index++; } - + public String getNextVar(String prefix){ return prefix + index++; } - + public JVar declareClassField(String prefix, JType t){ return clazz.field(JMod.NONE, t, prefix + index++); } @@ -251,11 +260,11 @@ public class ClassGenerator<T>{ public JVar declareClassField(String prefix, JType t, JExpression init){ return clazz.field(JMod.NONE, t, prefix + index++, init); } - + public HoldingContainer declare(MajorType t){ return declare(t, true); } - + public HoldingContainer declare(MajorType t, boolean includeNewInstance){ JType holderType = getHolderType(t); JVar var; @@ -266,12 +275,12 @@ public class ClassGenerator<T>{ } JFieldRef outputSet = null; if(t.getMode() == DataMode.OPTIONAL){ - outputSet = var.ref("isSet"); + outputSet = var.ref("isSet"); } index++; return new HoldingContainer(t, var, var.ref("value"), outputSet); } - + public List<TypedFieldId> getWorkspaceTypes() { return this.workspaceTypes; } @@ -283,7 +292,7 @@ public class ClassGenerator<T>{ private static class ValueVectorSetup{ final DirectExpression batch; final TypedFieldId fieldId; - + public ValueVectorSetup(DirectExpression batch, TypedFieldId fieldId) { super(); this.batch = batch; @@ -321,35 +330,45 @@ public class ClassGenerator<T>{ return true; } - + } - - + + public static class HoldingContainer{ private final JVar holder; private final JFieldRef value; private final JFieldRef isSet; private final MajorType type; private boolean isConstant; - + private final boolean singularRepeated; + public HoldingContainer(MajorType t, JVar holder, JFieldRef value, JFieldRef isSet) { + this(t, holder, value, isSet, 
false); + } + + public HoldingContainer(MajorType t, JVar holder, JFieldRef value, JFieldRef isSet, boolean singularRepeated) { super(); this.holder = holder; this.value = value; this.isSet = isSet; this.type = t; this.isConstant = false; + this.singularRepeated = singularRepeated; } - + + public boolean isSingularRepeated(){ + return singularRepeated; + } + public HoldingContainer setConstant(boolean isConstant) { this.isConstant = isConstant; return this; } - + public boolean isConstant() { return this.isConstant; } - + public JVar getHolder() { return holder; } @@ -357,7 +376,7 @@ public class ClassGenerator<T>{ public JFieldRef getValue() { return value; } - + public MajorType getMajorType(){ return type; } @@ -366,11 +385,11 @@ public class ClassGenerator<T>{ Preconditions.checkNotNull(isSet, "You cannot access the isSet variable when operating on a non-nullable output value."); return isSet; } - + public boolean isOptional(){ return type.getMode() == DataMode.OPTIONAL; } - + public boolean isRepeated(){ return type.getMode() == DataMode.REPEATED; } @@ -379,7 +398,7 @@ public class ClassGenerator<T>{ return type.getMinorType(); } } - + public JType getHolderType(MajorType t){ return TypeHelper.getHolderType(model, t.getMinorType(), t.getMode()); } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/EvaluationVisitor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/EvaluationVisitor.java index dce070f46..d700bf3f0 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/EvaluationVisitor.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/EvaluationVisitor.java @@ -20,8 +20,6 @@ package org.apache.drill.exec.expr; import java.util.List; import java.util.Set; -import io.netty.buffer.ByteBuf; -import com.google.common.collect.Lists; import org.apache.drill.common.expression.CastExpression; import org.apache.drill.common.expression.ConvertExpression; import org.apache.drill.common.expression.FunctionCall; @@ -29,25 +27,24 @@ import org.apache.drill.common.expression.FunctionHolderExpression; import org.apache.drill.common.expression.IfExpression; import org.apache.drill.common.expression.IfExpression.IfCondition; import org.apache.drill.common.expression.LogicalExpression; +import org.apache.drill.common.expression.PathSegment; import org.apache.drill.common.expression.SchemaPath; import org.apache.drill.common.expression.TypedNullConstant; -import org.apache.drill.common.expression.ValueExpressions; import org.apache.drill.common.expression.ValueExpressions.BooleanExpression; -import org.apache.drill.common.expression.ValueExpressions.DoubleExpression; -import org.apache.drill.common.expression.ValueExpressions.LongExpression; -import org.apache.drill.common.expression.ValueExpressions.IntExpression; import org.apache.drill.common.expression.ValueExpressions.DateExpression; -import org.apache.drill.common.expression.ValueExpressions.IntervalYearExpression; -import org.apache.drill.common.expression.ValueExpressions.IntervalDayExpression; -import org.apache.drill.common.expression.ValueExpressions.TimeStampExpression; -import org.apache.drill.common.expression.ValueExpressions.TimeExpression; -import org.apache.drill.common.expression.ValueExpressions.Decimal9Expression; import org.apache.drill.common.expression.ValueExpressions.Decimal18Expression; import org.apache.drill.common.expression.ValueExpressions.Decimal28Expression; import org.apache.drill.common.expression.ValueExpressions.Decimal38Expression; +import 
org.apache.drill.common.expression.ValueExpressions.Decimal9Expression; +import org.apache.drill.common.expression.ValueExpressions.DoubleExpression; +import org.apache.drill.common.expression.ValueExpressions.IntExpression; +import org.apache.drill.common.expression.ValueExpressions.IntervalDayExpression; +import org.apache.drill.common.expression.ValueExpressions.IntervalYearExpression; +import org.apache.drill.common.expression.ValueExpressions.LongExpression; import org.apache.drill.common.expression.ValueExpressions.QuotedString; +import org.apache.drill.common.expression.ValueExpressions.TimeExpression; +import org.apache.drill.common.expression.ValueExpressions.TimeStampExpression; import org.apache.drill.common.expression.visitors.AbstractExprVisitor; -import org.apache.drill.common.types.TypeProtos; import org.apache.drill.common.types.TypeProtos.MajorType; import org.apache.drill.common.types.TypeProtos.MinorType; import org.apache.drill.common.types.Types; @@ -60,6 +57,8 @@ import org.apache.drill.exec.expr.fn.HiveFuncHolder; import org.apache.drill.exec.physical.impl.filter.ReturnValueExpression; import org.apache.drill.exec.record.NullExpression; import org.apache.drill.exec.vector.ValueHolderHelper; +import org.apache.drill.exec.vector.complex.reader.FieldReader; +import org.apache.drill.exec.vector.complex.writer.FieldWriter; import com.google.common.collect.Lists; import com.sun.codemodel.JBlock; @@ -68,6 +67,7 @@ import com.sun.codemodel.JConditional; import com.sun.codemodel.JExpr; import com.sun.codemodel.JExpression; import com.sun.codemodel.JInvocation; +import com.sun.codemodel.JLabel; import com.sun.codemodel.JType; import com.sun.codemodel.JVar; @@ -88,7 +88,7 @@ public class EvaluationVisitor { private class EvalVisitor extends AbstractExprVisitor<HoldingContainer, ClassGenerator<?>, RuntimeException> { - + @Override public HoldingContainer visitFunctionCall(FunctionCall call, ClassGenerator<?> generator) throws RuntimeException { throw new UnsupportedOperationException("FunctionCall is not expected here. 
"+ @@ -143,18 +143,18 @@ public class EvaluationVisitor { JConditional jc = null; JBlock conditionalBlock = new JBlock(false, false); for (IfCondition c : ifExpr.conditions) { - HoldingContainer HoldingContainer = c.condition.accept(this, generator); + HoldingContainer holdingContainer = c.condition.accept(this, generator); if (jc == null) { - if (HoldingContainer.isOptional()) { - jc = conditionalBlock._if(HoldingContainer.getIsSet().cand(HoldingContainer.getValue())); + if (holdingContainer.isOptional()) { + jc = conditionalBlock._if(holdingContainer.getIsSet().cand(holdingContainer.getValue())); } else { - jc = conditionalBlock._if(HoldingContainer.getValue().eq(JExpr.lit(1))); + jc = conditionalBlock._if(holdingContainer.getValue().eq(JExpr.lit(1))); } } else { - if (HoldingContainer.isOptional()) { - jc = jc._else()._if(HoldingContainer.getIsSet().cand(HoldingContainer.getValue())); + if (holdingContainer.isOptional()) { + jc = jc._else()._if(holdingContainer.getIsSet().cand(holdingContainer.getValue())); } else { - jc = jc._else()._if(HoldingContainer.getValue()); + jc = jc._else()._if(holdingContainer.getValue()); } } @@ -184,14 +184,14 @@ public class EvaluationVisitor { return output; } - + @Override public HoldingContainer visitSchemaPath(SchemaPath path, ClassGenerator<?> generator) throws RuntimeException { throw new UnsupportedOperationException("All schema paths should have been replaced with ValueVectorExpressions."); } @Override - public HoldingContainer visitLongConstant(LongExpression e, ClassGenerator<?> generator) throws RuntimeException { + public HoldingContainer visitLongConstant(LongExpression e, ClassGenerator<?> generator) throws RuntimeException { HoldingContainer out = generator.declare(e.getMajorType()); generator.getEvalBlock().assign(out.getValue(), JExpr.lit(e.getLong())); return out; @@ -269,81 +269,180 @@ public class EvaluationVisitor { private HoldingContainer visitValueVectorWriteExpression(ValueVectorWriteExpression e, ClassGenerator<?> generator) { - LogicalExpression child = e.getChild(); - HoldingContainer inputContainer = child.accept(this, generator); + final LogicalExpression child = e.getChild(); + final HoldingContainer inputContainer = child.accept(this, generator); + final boolean complex = Types.isComplex(inputContainer.getMajorType()); + JBlock block = generator.getEvalBlock(); JExpression outIndex = generator.getMappingSet().getValueWriteIndex(); JVar vv = generator.declareVectorValueSetupAndMember(generator.getMappingSet().getOutgoing(), e.getFieldId()); - String setMethod = e.isSafe() ? 
"setSafe" : "set"; - - JInvocation setMeth; - if (Types.usesHolderForGet(inputContainer.getMajorType())) { - setMeth = vv.invoke("getMutator").invoke(setMethod).arg(outIndex).arg(inputContainer.getHolder()); - }else{ - setMeth = vv.invoke("getMutator").invoke(setMethod).arg(outIndex).arg(inputContainer.getValue()); - } - - if(e.isSafe()){ - HoldingContainer outputContainer = generator.declare(Types.REQUIRED_BIT); - block.assign(outputContainer.getValue(), JExpr.lit(1)); - if(inputContainer.isOptional()){ -// block._if(vv.invoke("getMutator").invoke(setMethod).arg(outIndex).not())._then().assign(outputContainer.getValue(), JExpr.lit(0)); - JConditional jc = block._if(inputContainer.getIsSet().eq(JExpr.lit(0)).not()); - block = jc._then(); + + if(complex){ + JType writerImpl = generator.getModel()._ref(TypeHelper.getWriterImpl(inputContainer.getMinorType(), inputContainer.getMajorType().getMode())); + JType writerIFace = generator.getModel()._ref(TypeHelper.getWriterInterface(inputContainer.getMinorType(), inputContainer.getMajorType().getMode())); + JVar writer = generator.declareClassField("writer", writerIFace); + generator.getSetupBlock().assign(writer, JExpr._new(writerImpl).arg(vv).arg(JExpr._null())); + generator.getEvalBlock().add(writer.invoke("setPosition").arg(outIndex)); + String copyMethod = inputContainer.isSingularRepeated() ? "copyAsValueSingle" : "copyAsValue"; + generator.getEvalBlock().add(inputContainer.getHolder().invoke(copyMethod).arg(writer)); + if(e.isSafe()){ + HoldingContainer outputContainer = generator.declare(Types.REQUIRED_BIT); + JConditional ifOut = generator.getEvalBlock()._if(writer.invoke("ok")); + ifOut._then().assign(outputContainer.getValue(), JExpr.lit(1)); + ifOut._else().assign(outputContainer.getValue(), JExpr.lit(0)); + return outputContainer; } - block._if(setMeth.not())._then().assign(outputContainer.getValue(), JExpr.lit(0)); - return outputContainer; }else{ - if (inputContainer.isOptional()) { -// block.add(vv.invoke("getMutator").invoke(setMethod).arg(outIndex)); - JConditional jc = block._if(inputContainer.getIsSet().eq(JExpr.lit(0)).not()); - block = jc._then(); + String setMethod = e.isSafe() ? 
"setSafe" : "set"; + + JInvocation setMeth; + if (Types.usesHolderForGet(inputContainer.getMajorType())) { + setMeth = vv.invoke("getMutator").invoke(setMethod).arg(outIndex).arg(inputContainer.getHolder()); + }else{ + setMeth = vv.invoke("getMutator").invoke(setMethod).arg(outIndex).arg(inputContainer.getValue()); + } + + if(e.isSafe()){ + HoldingContainer outputContainer = generator.declare(Types.REQUIRED_BIT); + block.assign(outputContainer.getValue(), JExpr.lit(1)); + if(inputContainer.isOptional()){ +// block._if(vv.invoke("getMutator").invoke(setMethod).arg(outIndex).not())._then().assign(outputContainer.getValue(), JExpr.lit(0)); + JConditional jc = block._if(inputContainer.getIsSet().eq(JExpr.lit(0)).not()); + block = jc._then(); + } + block._if(setMeth.not())._then().assign(outputContainer.getValue(), JExpr.lit(0)); + return outputContainer; + }else{ + if (inputContainer.isOptional()) { +// block.add(vv.invoke("getMutator").invoke(setMethod).arg(outIndex)); + JConditional jc = block._if(inputContainer.getIsSet().eq(JExpr.lit(0)).not()); + block = jc._then(); + } + block.add(setMeth); } - block.add(setMeth); + } - + + return null; } private HoldingContainer visitValueVectorReadExpression(ValueVectorReadExpression e, ClassGenerator<?> generator) throws RuntimeException { // declare value vector - - JVar vv1 = generator.declareVectorValueSetupAndMember(generator.getMappingSet().getIncoming(), e.getFieldId()); + + JExpression vv1 = generator.declareVectorValueSetupAndMember(generator.getMappingSet().getIncoming(), e.getFieldId()); JExpression indexVariable = generator.getMappingSet().getValueReadIndex(); - JInvocation getValueAccessor = vv1.invoke("getAccessor").invoke("get"); - JInvocation getValueAccessor2 = vv1.invoke("getAccessor"); + JExpression componentVariable = indexVariable.shrz(JExpr.lit(16)); if (e.isSuperReader()) { - - getValueAccessor = ((JExpression) vv1.component(indexVariable.shrz(JExpr.lit(16)))).invoke("getAccessor").invoke("get"); - getValueAccessor2 = ((JExpression) vv1.component(indexVariable.shrz(JExpr.lit(16)))).invoke("getAccessor"); + vv1 = ((JExpression) vv1.component(componentVariable)); indexVariable = indexVariable.band(JExpr.lit((int) Character.MAX_VALUE)); } // evaluation work. 
HoldingContainer out = generator.declare(e.getMajorType()); + final boolean primitive = !Types.usesHolderForGet(e.getMajorType()); + final boolean hasReadPath = e.hasReadPath(); + final boolean complex = Types.isComplex(e.getMajorType()); - if (out.isOptional()) { - JBlock blk = generator.getEvalBlock(); - blk.assign(out.getIsSet(), getValueAccessor2.invoke("isSet").arg(indexVariable)); - JConditional jc = blk._if(out.getIsSet().eq(JExpr.lit(1))); - if (Types.usesHolderForGet(e.getMajorType())) { - jc._then().add(getValueAccessor.arg(indexVariable).arg(out.getHolder())); - } else { - jc._then().assign(out.getValue(), getValueAccessor.arg(indexVariable)); + int[] fieldIds = e.getFieldId().getFieldIds(); + for(int i = 1; i < fieldIds.length; i++){ + + } + + if(!hasReadPath && !complex){ + + JInvocation getValueAccessor = vv1.invoke("getAccessor").invoke("get"); + JInvocation getValueAccessor2 = vv1.invoke("getAccessor"); + JBlock eval = new JBlock(); + + if(primitive){ + eval.assign(out.getValue(), getValueAccessor.arg(indexVariable)); + }else{ + eval.add(getValueAccessor.arg(indexVariable).arg(out.getHolder())); } - } else { - if (Types.usesHolderForGet(e.getMajorType())) { - if (e.isArrayElement()) { - generator.getEvalBlock().add(getValueAccessor.arg(indexVariable).arg(JExpr.lit(e.getIndex())).arg(out.getHolder())); - } else { - generator.getEvalBlock().add(getValueAccessor.arg(indexVariable).arg(out.getHolder())); + + if (out.isOptional()) { + JBlock blk = generator.getEvalBlock(); + blk.assign(out.getIsSet(), getValueAccessor2.invoke("isSet").arg(indexVariable)); + JConditional jc = blk._if(out.getIsSet().eq(JExpr.lit(1))); + jc._then().add(eval); + }else{ + generator.getEvalBlock().add(eval); + } + + }else{ + JExpression vector = e.isSuperReader() ? vv1.component(componentVariable) : vv1; + JExpression expr = vector.invoke("getAccessor").invoke("getReader"); + + JLabel label = generator.getEvalBlock().label("complex"); + JBlock eval = generator.getEvalBlock().block(); + + // position to the correct value. + eval.add(expr.invoke("setPosition").arg(indexVariable)); + PathSegment seg = e.getReadPath(); + int listNum = 0; + boolean lastWasArray = false; + while(true){ + if(seg.isArray()){ + lastWasArray = true; + + if(seg.isLastPath() && !complex) break; + + JVar list = generator.declareClassField("list", generator.getModel()._ref(FieldReader.class)); + generator.getSetupBlock().assign(list, expr); + expr = list; + + // if this is an array, set a single position for the expression to allow us to read the right data lower down. + JVar desiredIndex = eval.decl(generator.getModel().INT, "desiredIndex" + listNum, JExpr.lit(seg.getArraySegment().getIndex())); + // start with negative one so that we are at zero after first call to next. + JVar currentIndex = eval.decl(generator.getModel().INT, "currentIndex" + listNum, JExpr.lit(-1)); + + eval._while( // + currentIndex.lt(desiredIndex) // + .cand(expr.invoke("next")) ).body().assign(currentIndex, currentIndex.plus(JExpr.lit(1))); + + JBlock ifNoVal = eval._if(desiredIndex.ne(currentIndex))._then().block(); + if(!complex) ifNoVal.assign(out.getIsSet(), JExpr.lit(0)); + ifNoVal._break(label); + + listNum++; + + }else{ + lastWasArray = false; + JExpression fieldName = JExpr.lit(seg.getNameSegment().getPath()); + expr = expr.invoke("reader").arg(fieldName); + } + seg = seg.getChild(); + + // stop once we get to last column or when the segment is an array at the end of the reference. 
+ if(seg == null || seg.isLastPath() && seg.isArray()) break; + } + MajorType secondaryType = e.getFieldId().getSecondaryFinal(); + JType readerImpl = generator.getModel()._ref(TypeHelper.getReaderClassName(secondaryType.getMinorType(), secondaryType.getMode())); + JVar complexReader = generator.declareClassField("reader", readerImpl); + generator.getSetupBlock().assign(complexReader, JExpr.cast(readerImpl, expr)); + expr = complexReader; + + if(complex){ + HoldingContainer hc = new HoldingContainer(e.getMajorType(), (JVar) expr, null, null, lastWasArray); + return hc; + //eval.assign(out.getHolder().ref("reader"), expr); + }else{ + if(seg != null){ + eval.add(expr.invoke("read").arg(JExpr.lit(seg.getArraySegment().getIndex())).arg(out.getHolder())); + }else{ + + eval.add(expr.invoke("read").arg(out.getHolder())); } - } else { - generator.getEvalBlock().assign(out.getValue(), getValueAccessor.arg(indexVariable)); } + } + + + + return out; } @@ -352,11 +451,11 @@ public class EvaluationVisitor { // Preconditions.checkArgument(child.getMajorType().equals(Types.REQUIRED_BOOLEAN)); HoldingContainer hc = child.accept(this, generator); if(e.isReturnTrueOnOne()){ - generator.getEvalBlock()._return(hc.getValue().eq(JExpr.lit(1))); + generator.getEvalBlock()._return(hc.getValue().eq(JExpr.lit(1))); }else{ generator.getEvalBlock()._return(hc.getValue()); } - + return null; } @@ -453,6 +552,7 @@ public class EvaluationVisitor { } } + private class ConstantFilter extends EvalVisitor { private Set<LogicalExpression> constantBoundaries; diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/ExpressionTreeMaterializer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/ExpressionTreeMaterializer.java index 95d341b9c..fc7fb6a41 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/ExpressionTreeMaterializer.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/ExpressionTreeMaterializer.java @@ -69,7 +69,7 @@ import com.google.common.collect.Lists; public class ExpressionTreeMaterializer { static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ExpressionTreeMaterializer.class); - + private ExpressionTreeMaterializer() { }; @@ -239,36 +239,12 @@ public class ExpressionTreeMaterializer { @Override public LogicalExpression visitSchemaPath(SchemaPath path, FunctionImplementationRegistry value) { // logger.debug("Visiting schema path {}", path); - PathSegment seg = path.getRootSegment(); - List<String> segments = Lists.newArrayList(); - segments.add(seg.getNameSegment().getPath().toString()); - boolean isArrayElement = false; - int index = -1; - while((seg = seg.getChild()) != null) { - if (seg.isNamed()) { - segments.add(seg.getNameSegment().getPath().toString()); - if (seg.isLastPath()) { - break; - } - } else { - if (!seg.isLastPath()) { - throw new UnsupportedOperationException("Repeated map type not supported"); - } - index = seg.getArraySegment().getIndex(); - isArrayElement = true; - break; - } - } - SchemaPath newPath = SchemaPath.getCompoundPath((String[]) segments.toArray(new String[0])); - TypedFieldId tfId = batch.getValueVectorId(newPath); + TypedFieldId tfId = batch.getValueVectorId(path); if (tfId == null) { logger.warn("Unable to find value vector of path {}, returning null instance.", path); return NullExpression.INSTANCE; } else { - ValueVectorReadExpression e = new ValueVectorReadExpression(tfId, index, isArrayElement); - if (isArrayElement) { - e.required(); - } + ValueVectorReadExpression e = new 
ValueVectorReadExpression(tfId); return e; } } @@ -361,15 +337,15 @@ public class ExpressionTreeMaterializer { @Override public LogicalExpression visitCastExpression(CastExpression e, FunctionImplementationRegistry value){ - + // if the cast is pointless, remove it. LogicalExpression input = e.getInput().accept(this, value); MajorType newMajor = e.getMajorType(); MinorType newMinor = input.getMajorType().getMinorType(); - + if(castEqual(e.getPosition(), newMajor, input.getMajorType())) return input; // don't do pointless cast. - + if(newMinor == MinorType.LATE || newMinor == MinorType.NULL){ // if the type still isn't fully bound, leave as cast expression. return new CastExpression(input, e.getMajorType(), e.getPosition()); @@ -391,10 +367,10 @@ public class ExpressionTreeMaterializer { newArgs.add(new ValueExpressions.LongExpression(type.getScale(), null)); } FunctionCall fc = new FunctionCall(castFuncWithType, newArgs, e.getPosition()); - return fc.accept(this, value); + return fc.accept(this, value); } } - + private boolean castEqual(ExpressionPosition pos, MajorType from, MajorType to){ if(!from.getMinorType().equals(to.getMinorType())) return false; switch(from.getMinorType()){ diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/ValueVectorReadExpression.java b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/ValueVectorReadExpression.java index 4ba503d70..6e2809aa8 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/ValueVectorReadExpression.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/ValueVectorReadExpression.java @@ -21,56 +21,43 @@ import java.util.Iterator; import org.apache.drill.common.expression.ExpressionPosition; import org.apache.drill.common.expression.LogicalExpression; +import org.apache.drill.common.expression.PathSegment; import org.apache.drill.common.expression.visitors.ExprVisitor; -import org.apache.drill.common.types.TypeProtos; +import org.apache.drill.common.types.TypeProtos.DataMode; import org.apache.drill.common.types.TypeProtos.MajorType; import org.apache.drill.common.types.Types; import org.apache.drill.exec.record.TypedFieldId; import com.google.common.collect.Iterators; -import javax.sound.sampled.FloatControl; - public class ValueVectorReadExpression implements LogicalExpression{ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ValueVectorReadExpression.class); - private MajorType type; private final TypedFieldId fieldId; - private final boolean superReader; - private final int index; - private final boolean isArrayElement; - - - public ValueVectorReadExpression(TypedFieldId tfId, int index, boolean isArrayElement){ - this.type = tfId.getType(); + + + public ValueVectorReadExpression(TypedFieldId tfId){ this.fieldId = tfId; - this.superReader = tfId.isHyperReader(); - this.index = index; - this.isArrayElement = isArrayElement; } - public void required() { - type = Types.required(type.getMinorType()); + public boolean hasReadPath(){ + return fieldId.hasRemainder(); } - public boolean isArrayElement() { - return isArrayElement; + public PathSegment getReadPath(){ + return fieldId.getRemainder(); } - public ValueVectorReadExpression(TypedFieldId tfId) { - this(tfId, -1, false); - } - public TypedFieldId getTypedFieldId(){ return fieldId; } - + public boolean isSuperReader(){ - return superReader; + return fieldId.isHyperReader(); } @Override public MajorType getMajorType() { - return type; + return fieldId.getFinalType(); } @Override @@ -82,10 +69,6 @@ public class 
ValueVectorReadExpression implements LogicalExpression{ return fieldId; } - public int getIndex() { - return index; - } - @Override public ExpressionPosition getPosition() { return ExpressionPosition.UNKNOWN; @@ -95,6 +78,6 @@ public class ValueVectorReadExpression implements LogicalExpression{ public Iterator<LogicalExpression> iterator() { return Iterators.emptyIterator(); } - - + + } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/HashFunctions.java.orig b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/HashFunctions.java.orig new file mode 100644 index 000000000..7eeb7308e --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/HashFunctions.java.orig @@ -0,0 +1,252 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.expr.fn.impl; + +import org.apache.drill.exec.expr.DrillSimpleFunc; +import org.apache.drill.exec.expr.annotations.FunctionTemplate; +import org.apache.drill.exec.expr.annotations.FunctionTemplate.FunctionScope; +import org.apache.drill.exec.expr.annotations.Output; +import org.apache.drill.exec.expr.annotations.Param; +import org.apache.drill.exec.expr.holders.*; +import org.apache.drill.exec.record.RecordBatch; + +public class HashFunctions { + + @FunctionTemplate(name = "hash", scope = FunctionScope.SIMPLE, nulls = FunctionTemplate.NullHandling.INTERNAL ) + public static class NullableFloatHash implements DrillSimpleFunc { + + @Param NullableFloat4Holder in; + @Output IntHolder out; + + public void setup(RecordBatch incoming) { + } + + public void eval() { + if (in.isSet == 0) + out.value = 0; + else + out.value = com.google.common.hash.Hashing.murmur3_128().hashInt(Float.floatToIntBits(in.value)).asInt(); + } + } + + @FunctionTemplate(name = "hash", scope = FunctionScope.SIMPLE, nulls = FunctionTemplate.NullHandling.INTERNAL ) + public static class FloatHash implements DrillSimpleFunc { + + @Param Float4Holder in; + @Output IntHolder out; + + public void setup(RecordBatch incoming) { + } + + public void eval() { + out.value = com.google.common.hash.Hashing.murmur3_128().hashInt(Float.floatToIntBits(in.value)).asInt(); + } + } + + @FunctionTemplate(name = "hash", scope = FunctionScope.SIMPLE, nulls = FunctionTemplate.NullHandling.INTERNAL ) + public static class NullableDoubleHash implements DrillSimpleFunc { + + @Param NullableFloat8Holder in; + @Output IntHolder out; + + public void setup(RecordBatch incoming) { + } + + public void eval() { + if (in.isSet == 0) + out.value = 0; + else + out.value = com.google.common.hash.Hashing.murmur3_128().hashLong(Double.doubleToLongBits(in.value)).asInt(); + } + } + + @FunctionTemplate(name = "hash", scope = FunctionScope.SIMPLE, nulls = 
FunctionTemplate.NullHandling.INTERNAL ) + public static class DoubleHash implements DrillSimpleFunc { + + @Param Float8Holder in; + @Output IntHolder out; + + public void setup(RecordBatch incoming) { + } + + public void eval() { + out.value = com.google.common.hash.Hashing.murmur3_128().hashLong(Double.doubleToLongBits(in.value)).asInt(); + } + } + + @FunctionTemplate(name = "hash", scope = FunctionScope.SIMPLE, nulls = FunctionTemplate.NullHandling.INTERNAL ) + public static class NullableVarBinaryHash implements DrillSimpleFunc { + + @Param NullableVarBinaryHolder in; + @Output IntHolder out; + + public void setup(RecordBatch incoming) { + } + + public void eval() { + if (in.isSet == 0) + out.value = 0; + else + out.value = org.apache.drill.exec.expr.fn.impl.HashHelper.hash(in.buffer.nioBuffer(in.start, in.end - in.start), 0); + } + } + + @FunctionTemplate(name = "hash", scope = FunctionScope.SIMPLE, nulls = FunctionTemplate.NullHandling.INTERNAL ) + public static class NullableVarCharHash implements DrillSimpleFunc { + + @Param NullableVarCharHolder in; + @Output IntHolder out; + + public void setup(RecordBatch incoming) { + } + + public void eval() { + if (in.isSet == 0) + out.value = 0; + else + out.value = org.apache.drill.exec.expr.fn.impl.HashHelper.hash(in.buffer.nioBuffer(in.start, in.end - in.start), 0); + } + } + + @FunctionTemplate(name = "hash", scope = FunctionScope.SIMPLE, nulls = FunctionTemplate.NullHandling.INTERNAL ) + public static class NullableVar16CharHash implements DrillSimpleFunc { + + @Param NullableVar16CharHolder in; + @Output IntHolder out; + + public void setup(RecordBatch incoming) { + } + + public void eval() { + if (in.isSet == 0) + out.value = 0; + else + out.value = org.apache.drill.exec.expr.fn.impl.HashHelper.hash(in.buffer.nioBuffer(in.start, in.end - in.start), 0); + } + } + + @FunctionTemplate(name = "hash", scope = FunctionScope.SIMPLE, nulls = FunctionTemplate.NullHandling.INTERNAL) + public static class NullableBigIntHash implements DrillSimpleFunc { + + @Param NullableBigIntHolder in; + @Output IntHolder out; + + public void setup(RecordBatch incoming) { + } + + public void eval() { + // TODO: implement hash function for other types + if (in.isSet == 0) + out.value = 0; + else + out.value = com.google.common.hash.Hashing.murmur3_128().hashLong(in.value).asInt(); + } + } + + @FunctionTemplate(name = "hash", scope = FunctionScope.SIMPLE, nulls = FunctionTemplate.NullHandling.INTERNAL) + public static class NullableIntHash implements DrillSimpleFunc { + @Param NullableIntHolder in; + @Output IntHolder out; + + public void setup(RecordBatch incoming) { + } + + public void eval() { + // TODO: implement hash function for other types + if (in.isSet == 0) + out.value = 0; + else + out.value = com.google.common.hash.Hashing.murmur3_128().hashInt(in.value).asInt(); + } + } + + @FunctionTemplate(name = "hash", scope = FunctionScope.SIMPLE, nulls = FunctionTemplate.NullHandling.INTERNAL) + public static class VarBinaryHash implements DrillSimpleFunc { + + @Param VarBinaryHolder in; + @Output IntHolder out; + + public void setup(RecordBatch incoming) { + } + + public void eval() { + out.value = org.apache.drill.exec.expr.fn.impl.HashHelper.hash(in.buffer.nioBuffer(in.start, in.end - in.start), 0); + } + } + + @FunctionTemplate(name = "hash", scope = FunctionScope.SIMPLE, nulls = FunctionTemplate.NullHandling.INTERNAL) + public static class VarCharHash implements DrillSimpleFunc { + + @Param VarCharHolder in; + @Output IntHolder out; + + public void 
setup(RecordBatch incoming) { + } + + public void eval() { + out.value = org.apache.drill.exec.expr.fn.impl.HashHelper.hash(in.buffer.nioBuffer(in.start, in.end - in.start), 0); + } + } + +<<<<<<< HEAD + @FunctionTemplate(name = "hash", scope = FunctionScope.SIMPLE, nulls = FunctionTemplate.NullHandling.INTERNAL) + public static class Var16CharHash implements DrillSimpleFunc { + + @Param Var16CharHolder in; + @Output IntHolder out; + + public void setup(RecordBatch incoming) { + } + + public void eval() { + out.value = org.apache.drill.exec.expr.fn.impl.HashHelper.hash(in.buffer.nioBuffer(in.start, in.end - in.start), 0); + } + } + +======= +>>>>>>> 450e9e0... Support Complex Types + @FunctionTemplate(name = "hash", scope = FunctionScope.SIMPLE, nulls = FunctionTemplate.NullHandling.INTERNAL) + public static class HashBigInt implements DrillSimpleFunc { + + @Param BigIntHolder in; + @Output IntHolder out; + + public void setup(RecordBatch incoming) { + } + + public void eval() { + // TODO: implement hash function for other types + out.value = com.google.common.hash.Hashing.murmur3_128().hashLong(in.value).asInt(); + } + } + + @FunctionTemplate(name = "hash", scope = FunctionScope.SIMPLE, nulls = FunctionTemplate.NullHandling.INTERNAL) + public static class IntHash implements DrillSimpleFunc { + @Param IntHolder in; + @Output IntHolder out; + + public void setup(RecordBatch incoming) { + } + + public void eval() { + // TODO: implement hash function for other types + out.value = com.google.common.hash.Hashing.murmur3_128().hashInt(in.value).asInt(); + } + } + +} diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/holders/ComplexHolder.java b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/holders/ComplexHolder.java new file mode 100644 index 000000000..e1025dfb8 --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/holders/ComplexHolder.java @@ -0,0 +1,25 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.expr.holders; + +import org.apache.drill.exec.vector.complex.reader.FieldReader; + +public class ComplexHolder implements ValueHolder { + public FieldReader reader; + public int isSet; +} diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/holders/RepeatedListHolder.java b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/holders/RepeatedListHolder.java new file mode 100644 index 000000000..09746da94 --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/holders/RepeatedListHolder.java @@ -0,0 +1,23 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.expr.holders; + +public final class RepeatedListHolder implements ValueHolder{ + public int start; + public int end; +} diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/holders/RepeatedMapHolder.java b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/holders/RepeatedMapHolder.java new file mode 100644 index 000000000..247f75e02 --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/holders/RepeatedMapHolder.java @@ -0,0 +1,23 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.expr.holders; + +public final class RepeatedMapHolder implements ValueHolder{ + public int start; + public int end; +} diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/memory/OutOfMemoryRuntimeException.java b/exec/java-exec/src/main/java/org/apache/drill/exec/memory/OutOfMemoryRuntimeException.java new file mode 100644 index 000000000..305eabd92 --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/memory/OutOfMemoryRuntimeException.java @@ -0,0 +1,49 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.drill.exec.memory; + +import org.apache.drill.common.exceptions.DrillRuntimeException; + +public class OutOfMemoryRuntimeException extends DrillRuntimeException{ + + public OutOfMemoryRuntimeException() { + super(); + + } + + public OutOfMemoryRuntimeException(String message, Throwable cause, boolean enableSuppression, + boolean writableStackTrace) { + super(message, cause, enableSuppression, writableStackTrace); + + } + + public OutOfMemoryRuntimeException(String message, Throwable cause) { + super(message, cause); + + } + + public OutOfMemoryRuntimeException(String message) { + super(message); + + } + + public OutOfMemoryRuntimeException(Throwable cause) { + super(cause); + + } +} diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java index 73ed72322..a49d1a8f0 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java @@ -204,8 +204,8 @@ public class ScanBatch implements RecordBatch { } @Override - public VectorWrapper<?> getValueAccessorById(int fieldId, Class<?> clazz) { - return container.getValueAccessorById(fieldId, clazz); + public VectorWrapper<?> getValueAccessorById(Class<?> clazz, int... ids) { + return container.getValueAccessorById(clazz, ids); } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/PriorityQueueTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/PriorityQueueTemplate.java index e0e7e518f..fc8c430e9 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/PriorityQueueTemplate.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/PriorityQueueTemplate.java @@ -60,8 +60,8 @@ public abstract class PriorityQueueTemplate implements PriorityQueue { BatchSchema schema = container.getSchema(); VectorContainer newContainer = new VectorContainer(); for (MaterializedField field : schema) { - int id = container.getValueVectorId(field.getAsSchemaPath()).getFieldId(); - newContainer.add(container.getValueAccessorById(id, field.getValueClass()).getValueVectors()); + int[] ids = container.getValueVectorId(field.getPath()).getFieldIds(); + newContainer.add(container.getValueAccessorById(field.getValueClass(), ids).getValueVectors()); } newContainer.buildSchema(BatchSchema.SelectionVectorMode.FOUR_BYTE); this.hyperBatch = new ExpandableHyperContainer(newContainer); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java index 2a57aaa34..1c1a6d260 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java @@ -104,8 +104,8 @@ public class TopNBatch extends AbstractRecordBatch<TopN> { return sv4; } - - + + @Override public void cleanup() { if (sv4 != null) { @@ -127,8 +127,8 @@ public class TopNBatch extends AbstractRecordBatch<TopN> { return IterOutcome.NONE; } } - - + + try{ outer: while (true) { Stopwatch watch = new Stopwatch(); @@ -166,7 +166,7 @@ public class TopNBatch extends AbstractRecordBatch<TopN> { throw new UnsupportedOperationException(); } } - + if (schema == null){ // builder may be null at this point if the first incoming batch is 
empty return IterOutcome.NONE; @@ -181,7 +181,7 @@ public class TopNBatch extends AbstractRecordBatch<TopN> { container.buildSchema(BatchSchema.SelectionVectorMode.FOUR_BYTE); return IterOutcome.OK_NEW_SCHEMA; - + }catch(SchemaChangeException | ClassTransformationException | IOException ex){ kill(); logger.error("Failure during query", ex); @@ -239,10 +239,10 @@ public class TopNBatch extends AbstractRecordBatch<TopN> { CodeGenerator<PriorityQueue> cg = CodeGenerator.get(PriorityQueue.TEMPLATE_DEFINITION, context.getFunctionRegistry()); ClassGenerator<PriorityQueue> g = cg.getRoot(); g.setMappingSet(mainMapping); - + for(Ordering od : orderings){ // first, we rewrite the evaluation stack for each side of the comparison. - ErrorCollector collector = new ErrorCollectorImpl(); + ErrorCollector collector = new ErrorCollectorImpl(); final LogicalExpression expr = ExpressionTreeMaterializer.materialize(od.getExpr(), batch, collector, context.getFunctionRegistry()); if(collector.hasErrors()) throw new SchemaChangeException("Failure while materializing expression. " + collector.toErrorString()); g.setMappingSet(leftMapping); @@ -250,26 +250,26 @@ public class TopNBatch extends AbstractRecordBatch<TopN> { g.setMappingSet(rightMapping); HoldingContainer right = g.addExpr(expr, false); g.setMappingSet(mainMapping); - + // next we wrap the two comparison sides and add the expression block for the comparison. LogicalExpression fh = FunctionGenerationHelper.getComparator(left, right, context.getFunctionRegistry()); HoldingContainer out = g.addExpr(fh, false); JConditional jc = g.getEvalBlock()._if(out.getValue().ne(JExpr.lit(0))); - + if(od.getDirection() == Direction.ASCENDING){ jc._then()._return(out.getValue()); }else{ jc._then()._return(out.getValue().minus()); } } - + g.getEvalBlock()._return(JExpr.lit(0)); PriorityQueue q = context.getImplementationClass(cg); q.init(config.getLimit(), context, oContext.getAllocator(), schema.getSelectionVectorMode() == BatchSchema.SelectionVectorMode.TWO_BYTE); return q; } - + @Override public WritableBatch getWritableBatch() { throw new UnsupportedOperationException("A sort batch is not writable."); @@ -332,8 +332,8 @@ public class TopNBatch extends AbstractRecordBatch<TopN> { } @Override - public VectorWrapper<?> getValueAccessorById(int fieldId, Class<?> clazz) { - return container.getValueAccessorById(fieldId, clazz); + public VectorWrapper<?> getValueAccessorById(Class<?> clazz, int... 
ids) { + return container.getValueAccessorById(clazz, ids); } @Override @@ -355,6 +355,6 @@ public class TopNBatch extends AbstractRecordBatch<TopN> { return container.iterator(); } } - + } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WireRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WireRecordBatch.java index 65669b12f..cf3d75e2b 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WireRecordBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WireRecordBatch.java @@ -37,7 +37,7 @@ public class WireRecordBatch implements RecordBatch { private FragmentContext context; private BatchSchema schema; - + public WireRecordBatch(FragmentContext context, RawFragmentBatchProvider fragProvider) throws OutOfMemoryException { this.fragProvider = fragProvider; this.context = context; @@ -83,17 +83,17 @@ public class WireRecordBatch implements RecordBatch { public TypedFieldId getValueVectorId(SchemaPath path) { return batchLoader.getValueVectorId(path); } - + @Override - public VectorWrapper<?> getValueAccessorById(int fieldId, Class<?> clazz) { - return batchLoader.getValueAccessorById(fieldId, clazz); + public VectorWrapper<?> getValueAccessorById(Class<?> clazz, int... ids) { + return batchLoader.getValueAccessorById(clazz, ids); } @Override public IterOutcome next() { try{ RawFragmentBatch batch = fragProvider.getNext(); - + // skip over empty batches. we do this since these are basically control messages. while(batch != null && !batch.getHeader().getIsOutOfMemory() && batch.getHeader().getDef().getRecordCount() == 0){ batch = fragProvider.getNext(); @@ -107,7 +107,7 @@ public class WireRecordBatch implements RecordBatch { if (batch.getHeader().getIsOutOfMemory()) { return IterOutcome.OUT_OF_MEMORY; } - + // logger.debug("Next received batch {}", batch); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java index 5f2605484..039445b96 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java @@ -62,9 +62,9 @@ public abstract class HashAggTemplate implements HashAggregator { private static final long ALLOCATOR_INITIAL_RESERVATION = 1*1024*1024; private static final long ALLOCATOR_MAX_RESERVATION = 20L*1000*1000*1000; - + private static final boolean EXTRA_DEBUG_1 = false; - private static final boolean EXTRA_DEBUG_2 = false; + private static final boolean EXTRA_DEBUG_2 = false; private static final String TOO_BIG_ERROR = "Couldn't add value to an empty batch. 
This likely means that a single value is too long for a varlen field."; private boolean first = true; private boolean newSchema = false; @@ -88,7 +88,7 @@ public abstract class HashAggTemplate implements HashAggregator { List<VectorAllocator> wsAllocators = Lists.newArrayList(); // allocators for the workspace vectors ErrorCollector collector = new ErrorCollectorImpl(); - + private MaterializedField[] materializedValueFields; private boolean allFlushed = false; @@ -102,13 +102,13 @@ public abstract class HashAggTemplate implements HashAggregator { aggrValuesContainer = new VectorContainer(); ValueVector vector ; - - for(int i = 0; i < materializedValueFields.length; i++) { + + for(int i = 0; i < materializedValueFields.length; i++) { MaterializedField outputField = materializedValueFields[i]; // Create a type-specific ValueVector for this value vector = TypeHelper.getNewVector(outputField, allocator) ; VectorAllocator.getAllocator(vector, 50 /* avg. width */).alloc(HashTable.BATCH_SIZE) ; - + aggrValuesContainer.add(vector) ; } @@ -124,8 +124,8 @@ public abstract class HashAggTemplate implements HashAggregator { setupInterior(incoming, outgoing, aggrValuesContainer); } - private boolean outputValues() { - for (int i = 0; i <= maxOccupiedIdx; i++) { + private boolean outputValues() { + for (int i = 0; i <= maxOccupiedIdx; i++) { if (outputRecordValues(i, outputCount) ) { if (EXTRA_DEBUG_2) logger.debug("Outputting values to {}", outputCount) ; outputCount++; @@ -139,7 +139,7 @@ public abstract class HashAggTemplate implements HashAggregator { private void clear() { aggrValuesContainer.clear(); } - + // Code-generated methods (implemented in HashAggBatch) @RuntimeOverridden @@ -155,19 +155,19 @@ public abstract class HashAggTemplate implements HashAggregator { @Override public void setup(HashAggregate hashAggrConfig, FragmentContext context, BufferAllocator allocator, RecordBatch incoming, RecordBatch outgoing, - LogicalExpression[] valueExprs, + LogicalExpression[] valueExprs, List<TypedFieldId> valueFieldIds, TypedFieldId[] groupByOutFieldIds, - VectorAllocator[] keyAllocators, VectorAllocator[] valueAllocators) + VectorAllocator[] keyAllocators, VectorAllocator[] valueAllocators) throws SchemaChangeException, ClassTransformationException, IOException { - + if (valueExprs == null || valueFieldIds == null) { throw new IllegalArgumentException("Invalid aggr value exprs or workspace variables."); } if (valueFieldIds.size() < valueExprs.length) { throw new IllegalArgumentException("Wrong number of workspace variables."); } - + this.context = context; this.allocator = allocator; this.incoming = incoming; @@ -175,11 +175,11 @@ public abstract class HashAggTemplate implements HashAggregator { this.keyAllocators = keyAllocators; this.valueAllocators = valueAllocators; this.outgoing = outgoing; - + this.hashAggrConfig = hashAggrConfig; - // currently, hash aggregation is only applicable if there are group-by expressions. - // For non-grouped (a.k.a Plain) aggregations that don't involve DISTINCT, there is no + // currently, hash aggregation is only applicable if there are group-by expressions. + // For non-grouped (a.k.a Plain) aggregations that don't involve DISTINCT, there is no // need to create hash table. However, for plain aggregations with DISTINCT .. // e.g SELECT COUNT(DISTINCT a1) FROM t1 ; // we need to build a hash table on the aggregation column a1. 
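The comment ending the hunk above explains why a hash table is still needed for a plain aggregation that uses DISTINCT (e.g. SELECT COUNT(DISTINCT a1) FROM t1): the table is what deduplicates the aggregation column. Below is a minimal standalone sketch of that idea only; it is not Drill's hash-aggregate code, just an illustration of how a hash table doubles as the dedup structure, mirroring the KEY_ADDED case used later in this template.

import java.util.HashSet;
import java.util.Set;

// Standalone illustration: COUNT(DISTINCT a1) can be computed by inserting
// every incoming key into a hash table and counting only the inserts that
// actually added a new key (the equivalent of PutStatus.KEY_ADDED below).
public class DistinctCountSketch {
  public static long countDistinct(long[] a1Column) {
    Set<Long> hashTable = new HashSet<>();
    long distinct = 0;
    for (long key : a1Column) {
      if (hashTable.add(key)) {   // true only when the key was not present yet
        distinct++;
      }
    }
    return distinct;
  }

  public static void main(String[] args) {
    System.out.println(countDistinct(new long[]{1, 2, 2, 3, 1})); // prints 3
  }
}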
@@ -188,14 +188,14 @@ public abstract class HashAggTemplate implements HashAggregator { throw new IllegalArgumentException("Currently, hash aggregation is only applicable if there are group-by expressions."); } - this.htIdxHolder = new IntHolder(); + this.htIdxHolder = new IntHolder(); materializedValueFields = new MaterializedField[valueFieldIds.size()]; if (valueFieldIds.size() > 0) { int i = 0; - FieldReference ref = new FieldReference("dummy", ExpressionPosition.UNKNOWN, valueFieldIds.get(0).getType()); + FieldReference ref = new FieldReference("dummy", ExpressionPosition.UNKNOWN, valueFieldIds.get(0).getIntermediateType()); for (TypedFieldId id : valueFieldIds) { - materializedValueFields[i++] = MaterializedField.create(ref, id.getType()); + materializedValueFields[i++] = MaterializedField.create(ref, id.getIntermediateType()); } } @@ -203,7 +203,7 @@ public abstract class HashAggTemplate implements HashAggregator { this.htable = ht.createAndSetupHashTable(groupByOutFieldIds) ; batchHolders = new ArrayList<BatchHolder>(); - addBatchHolder(); + addBatchHolder(); doSetup(incoming); } @@ -211,21 +211,21 @@ public abstract class HashAggTemplate implements HashAggregator { @Override public AggOutcome doWork() { try{ - // Note: Keeping the outer and inner try blocks here to maintain some similarity with - // StreamingAggregate which does somethings conditionally in the outer try block. + // Note: Keeping the outer and inner try blocks here to maintain some similarity with + // StreamingAggregate which does somethings conditionally in the outer try block. // In the future HashAggregate may also need to perform some actions conditionally - // in the outer try block. + // in the outer try block. outside: while(true) { // loop through existing records, aggregating the values as necessary. if (EXTRA_DEBUG_1) logger.debug ("Starting outer loop of doWork()..."); for (; underlyingIndex < incoming.getRecordCount(); incIndex()) { if(EXTRA_DEBUG_2) logger.debug("Doing loop with values underlying {}, current {}", underlyingIndex, currentIndex); - checkGroupAndAggrValues(currentIndex); + checkGroupAndAggrValues(currentIndex); } if (EXTRA_DEBUG_1) logger.debug("Processed {} records", underlyingIndex); - + try{ while(true){ @@ -239,10 +239,10 @@ public abstract class HashAggTemplate implements HashAggregator { case NOT_YET: this.outcome = out; return AggOutcome.RETURN_OUTCOME; - + case OK_NEW_SCHEMA: if(EXTRA_DEBUG_1) logger.debug("Received new schema. Batch has {} records.", incoming.getRecordCount()); - newSchema = true; + newSchema = true; this.cleanup(); // TODO: new schema case needs to be handled appropriately return AggOutcome.UPDATE_AGGREGATOR; @@ -254,20 +254,20 @@ public abstract class HashAggTemplate implements HashAggregator { } else { checkGroupAndAggrValues(currentIndex); incIndex(); - + if(EXTRA_DEBUG_1) logger.debug("Continuing outside loop"); continue outside; } case NONE: outcome = out; - outputKeysAndValues() ; - + outputKeysAndValues() ; + // cleanup my internal state since there is nothing more to return this.cleanup(); // cleanup incoming batch since output of aggregation does not need // any references to the incoming - + incoming.cleanup(); return setOkAndReturn(); @@ -294,7 +294,7 @@ public abstract class HashAggTemplate implements HashAggregator { // now otherwise downstream operators will break. 
// TODO: allow outputting arbitrarily large number of records in batches assert (numGroupedRecords < Character.MAX_VALUE); - + for (VectorAllocator a : keyAllocators) { if(EXTRA_DEBUG_2) logger.debug("Outgoing batch: Allocating {} with {} records.", a, numGroupedRecords); a.alloc(numGroupedRecords); @@ -320,14 +320,14 @@ public abstract class HashAggTemplate implements HashAggregator { public void cleanup(){ htable.clear(); htable = null; - htIdxHolder = null; + htIdxHolder = null; materializedValueFields = null; for (BatchHolder bh : batchHolders) { bh.clear(); } batchHolders.clear(); - batchHolders = null; + batchHolders = null; } private AggOutcome tooBigFailure(){ @@ -335,7 +335,7 @@ public abstract class HashAggTemplate implements HashAggregator { this.outcome = IterOutcome.STOP; return AggOutcome.CLEANUP_AND_RETURN; } - + private final AggOutcome setOkAndReturn(){ if(first){ this.outcome = IterOutcome.OK_NEW_SCHEMA; @@ -356,20 +356,20 @@ public abstract class HashAggTemplate implements HashAggregator { } currentIndex = getVectorIndex(underlyingIndex); } - + private final void resetIndex(){ underlyingIndex = -1; incIndex(); } private void addBatchHolder() { - BatchHolder bh = new BatchHolder(); + BatchHolder bh = new BatchHolder(); batchHolders.add(bh); if (EXTRA_DEBUG_1) logger.debug("HashAggregate: Added new batch; num batches = {}.", batchHolders.size()); int batchIdx = batchHolders.size() - 1; - bh.setup(batchIdx); + bh.setup(batchIdx); } private boolean outputKeysAndValues() { @@ -392,20 +392,20 @@ public abstract class HashAggTemplate implements HashAggregator { return allFlushed; } - // Check if a group is present in the hash table; if not, insert it in the hash table. - // The htIdxHolder contains the index of the group in the hash table container; this same - // index is also used for the aggregation values maintained by the hash aggregate. + // Check if a group is present in the hash table; if not, insert it in the hash table. + // The htIdxHolder contains the index of the group in the hash table container; this same + // index is also used for the aggregation values maintained by the hash aggregate. 
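The comment just above describes the contract of checkGroupAndAggrValues(): a group key is inserted into the hash table on first sight, and the index it receives (htIdxHolder) is reused to address that group's aggregate workspace values. The following is a simplified, self-contained sketch of that pattern with illustrative names (it is not the Drill implementation); a running sum stands in for the code-generated workspace vectors.

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Illustrative only: a group key is resolved to a stable slot index on first
// insertion (KEY_ADDED) or looked up on later rows (KEY_PRESENT), and the same
// index addresses the per-group aggregate workspace, here a running sum.
public class GroupIndexSketch {
  private final Map<String, Integer> groupIndex = new HashMap<>();
  private final List<Long> sums = new ArrayList<>();

  public void addRow(String groupKey, long value) {
    Integer idx = groupIndex.get(groupKey);
    if (idx == null) {                      // equivalent of KEY_ADDED
      idx = sums.size();
      groupIndex.put(groupKey, idx);
      sums.add(0L);                         // allocate workspace for the new group
    }
    sums.set(idx, sums.get(idx) + value);   // update aggregate at the shared index
  }

  public long sumFor(String groupKey) {
    Integer idx = groupIndex.get(groupKey); // null if the group was never seen
    return idx == null ? 0L : sums.get(idx);
  }
}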
private boolean checkGroupAndAggrValues(int incomingRowIdx) { if (incomingRowIdx < 0) { throw new IllegalArgumentException("Invalid incoming row index."); } - /** for debugging + /** for debugging Object tmp = (incoming).getValueAccessorById(0, BigIntVector.class).getValueVector(); BigIntVector vv0 = null; BigIntHolder holder = null; - if (tmp != null) { + if (tmp != null) { vv0 = ((BigIntVector) tmp); holder = new BigIntHolder(); holder.value = vv0.getAccessor().get(incomingRowIdx) ; @@ -432,7 +432,7 @@ public abstract class HashAggTemplate implements HashAggregator { // logger.debug("group-by key = {} already present at hash table index = {}", holder.value, currentIdx) ; //} - } + } else if (putStatus == HashTable.PutStatus.KEY_ADDED) { if (EXTRA_DEBUG_2) logger.debug("Group-by key was added to hash table, inserting new aggregate values") ; @@ -441,17 +441,17 @@ public abstract class HashAggTemplate implements HashAggregator { // logger.debug("group-by key = {} added at hash table index = {}", holder.value, currentIdx) ; //} } - + if (bh.updateAggrValues(incomingRowIdx, idxWithinBatch)) { numGroupedRecords++; return true; } - - } + + } return false; } - + // Code-generated methods (implemented in HashAggBatch) public abstract void doSetup(@Named("incoming") RecordBatch incoming); public abstract int getVectorIndex(@Named("recordIndex") int recordIndex); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/InternalBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/InternalBatch.java index 34845b38e..3e6def128 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/InternalBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/InternalBatch.java @@ -28,12 +28,12 @@ import org.apache.drill.exec.record.selection.SelectionVector4; public class InternalBatch implements Iterable<VectorWrapper<?>>{ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(InternalBatch.class); - + private final VectorContainer container; private final BatchSchema schema; private final SelectionVector2 sv2; private final SelectionVector4 sv4; - + public InternalBatch(RecordBatch incoming){ switch(incoming.getSchema().getSelectionVectorMode()){ case FOUR_BYTE: @@ -42,7 +42,7 @@ public class InternalBatch implements Iterable<VectorWrapper<?>>{ break; case TWO_BYTE: this.sv4 = null; - this.sv2 = incoming.getSelectionVector2().clone(); + this.sv2 = incoming.getSelectionVector2().clone(); break; default: this.sv4 = null; @@ -74,9 +74,9 @@ public class InternalBatch implements Iterable<VectorWrapper<?>>{ if(sv4 != null) sv4.clear(); container.clear(); } - - public VectorWrapper<?> getValueAccessorById(int fieldId, Class<?> clazz){ - return container.getValueAccessorById(fieldId, clazz); + + public VectorWrapper<?> getValueAccessorById(Class<?> clazz, int[] fieldIds){ + return container.getValueAccessorById(clazz, fieldIds); } - + } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java index 883052ae9..72d046261 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java @@ -348,8 +348,8 @@ public class HashJoinBatch extends AbstractRecordBatch<HashJoinPOP> { container.add(v); 
allocators.add(RemovingRecordBatch.getAllocator4(v)); - JVar inVV = g.declareVectorValueSetupAndMember("buildBatch", new TypedFieldId(vv.getField().getType(), fieldId, true)); - JVar outVV = g.declareVectorValueSetupAndMember("outgoing", new TypedFieldId(vv.getField().getType(), fieldId, false)); + JVar inVV = g.declareVectorValueSetupAndMember("buildBatch", new TypedFieldId(vv.getField().getType(), true, fieldId)); + JVar outVV = g.declareVectorValueSetupAndMember("outgoing", new TypedFieldId(vv.getField().getType(), false, fieldId)); g.getEvalBlock()._if(outVV.invoke("copyFromSafe") .arg(buildIndex.band(JExpr.lit((int) Character.MAX_VALUE))) .arg(outIndex) @@ -376,8 +376,8 @@ public class HashJoinBatch extends AbstractRecordBatch<HashJoinPOP> { container.add(v); allocators.add(RemovingRecordBatch.getAllocator4(v)); - JVar inVV = g.declareVectorValueSetupAndMember("probeBatch", new TypedFieldId(vv.getField().getType(), fieldId, false)); - JVar outVV = g.declareVectorValueSetupAndMember("outgoing", new TypedFieldId(vv.getField().getType(), outputFieldId, false)); + JVar inVV = g.declareVectorValueSetupAndMember("probeBatch", new TypedFieldId(vv.getField().getType(), false, fieldId)); + JVar outVV = g.declareVectorValueSetupAndMember("outgoing", new TypedFieldId(vv.getField().getType(), false, outputFieldId)); g.getEvalBlock()._if(outVV.invoke("copyFromSafe").arg(probeIndex).arg(outIndex).arg(inVV).not())._then()._return(JExpr.FALSE); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinStatus.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinStatus.java index baa232e7f..c07878a69 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinStatus.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinStatus.java @@ -43,7 +43,7 @@ public final class JoinStatus { private int rightPosition; private int svRightPosition; private IterOutcome lastRight; - + private int outputPosition; public RightSourceMode rightSourceMode = RightSourceMode.INCOMING; public MergeJoinBatch outputBatch; @@ -54,7 +54,7 @@ public final class JoinStatus { public boolean ok = true; private boolean initialSet = false; private boolean leftRepeating = false; - + public JoinStatus(RecordBatch left, RecordBatch right, MergeJoinBatch output) { super(); this.left = left; @@ -70,7 +70,7 @@ public final class JoinStatus { initialSet = true; } } - + public final void advanceLeft(){ leftPosition++; } @@ -90,6 +90,10 @@ public final class JoinStatus { return (rightSourceMode == RightSourceMode.INCOMING) ? rightPosition : svRightPosition; } + public final int getRightCount(){ + return right.getRecordCount(); + } + public final void setRightPosition(int pos) { rightPosition = pos; } @@ -176,7 +180,7 @@ public final class JoinStatus { } if(b.getSchema().getSelectionVectorMode() == SelectionVectorMode.TWO_BYTE) b.getSelectionVector2().clear(); } - + /** * Check if the left record position can advance by one in the current batch. 
*/ @@ -230,5 +234,5 @@ public final class JoinStatus { private boolean eitherMatches(IterOutcome outcome){ return lastLeft == outcome || lastRight == outcome; } - + } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinTemplate.java index f43934e96..af0d378f5 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinTemplate.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinTemplate.java @@ -97,8 +97,8 @@ public abstract class JoinTemplate implements JoinWorker { while (status.isLeftPositionAllowed()) { if (!doCopyLeft(status.getLeftPosition(), status.getOutPosition())) return false; - - status.incOutputPos(); + + status.incOutputPos(); status.advanceLeft(); } } @@ -113,7 +113,7 @@ public abstract class JoinTemplate implements JoinWorker { case -1: // left key < right key if (((MergeJoinPOP)status.outputBatch.getPopConfig()).getJoinType() == JoinRelType.LEFT) { - if (!doCopyLeft(status.getLeftPosition(), status.getOutPosition())) + if (!doCopyLeft(status.getLeftPosition(), status.getOutPosition())) return false; status.incOutputPos(); } @@ -135,7 +135,7 @@ public abstract class JoinTemplate implements JoinWorker { doCompareNextLeftKey(status.getLeftPosition()) != 0) // this record marks the end of repeated keys status.notifyLeftStoppedRepeating(); - + boolean crossedBatchBoundaries = false; int initialRightPosition = status.getRightPosition(); do { @@ -143,11 +143,11 @@ public abstract class JoinTemplate implements JoinWorker { if (!doCopyLeft(status.getLeftPosition(), status.getOutPosition())) return false; - if (!doCopyRight(status.getRightPosition(), status.getOutPosition())) + if (!doCopyRight(status.getRightPosition(), status.getOutPosition())) return false; - + status.incOutputPos(); - + // If the left key has duplicates and we're about to cross a boundary in the right batch, add the // right table's record batch to the sv4 builder before calling next. These records will need to be // copied again for each duplicate left key. @@ -170,7 +170,7 @@ public abstract class JoinTemplate implements JoinWorker { status.notifyLeftStoppedRepeating(); } else if (status.isLeftRepeating() && crossedBatchBoundaries) { try { - // build the right batches and + // build the right batches and status.outputBatch.batchBuilder.build(); status.setSV4AdvanceMode(); } catch (SchemaChangeException e) { diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java index a2c424fe2..3d496d315 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java @@ -350,10 +350,10 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP> public TypedFieldId getValueVectorId(SchemaPath path) { return outgoingContainer.getValueVectorId(path); } - + @Override - public VectorWrapper<?> getValueAccessorById(int fieldId, Class<?> clazz) { - return outgoingContainer.getValueAccessorById(fieldId, clazz); + public VectorWrapper<?> getValueAccessorById(Class<?> clazz, int... 
ids) { + return outgoingContainer.getValueAccessorById(clazz, ids); } @Override @@ -373,7 +373,7 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP> /** * Creates a generate class which implements the copy and compare methods. - * + * * @return instance of a new merger based on generated code * @throws SchemaChangeException */ @@ -443,8 +443,8 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP> // declare incoming value vector and assign it to the array JVar inVV = cg.declareVectorValueSetupAndMember("incomingBatches[" + batchIdx + "]", new TypedFieldId(vv.getField().getType(), - fieldIdx, - false)); + false, + fieldIdx)); // add vv to initialization list (e.g. { vv1, vv2, vv3 } ) incomingVectorInitBatch.add(inVV); @@ -501,11 +501,12 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP> TypeProtos.MinorType minor = vvRead.getMajorType().getMinorType(); Class cmpVectorClass = TypeHelper.getValueVectorClass(minor, mode); + JExpression arr = JExpr.newArray(cg.getModel().INT).add(JExpr.lit(vvRead.getFieldId().getFieldIds()[0])); comparisonVectorInitBatch.add( ((JExpression) incomingBatchesVar.component(JExpr.lit(b))) .invoke("getValueAccessorById") - .arg(JExpr.lit(vvRead.getFieldId().getFieldId())) .arg(cg.getModel()._ref(cmpVectorClass).boxify().dotclass()) + .arg(arr) .invoke("getValueVector")); } @@ -583,8 +584,7 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP> // declare outgoing value vectors JVar outgoingVV = cg.declareVectorValueSetupAndMember("outgoingBatch", new TypedFieldId(vvOut.getField().getType(), - fieldIdx, - vvOut.isHyper())); + vvOut.isHyper(), fieldIdx)); // assign to the appropriate slot in the outgoingVector array (in order of iteration) cg.getSetupBlock().assign(outgoingVectors.component(JExpr.lit(fieldIdx)), outgoingVV); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionProjectorTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionProjectorTemplate.java index dd7011afb..339844360 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionProjectorTemplate.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionProjectorTemplate.java @@ -82,7 +82,7 @@ public abstract class OrderedPartitionProjectorTemplate implements OrderedPartit this.svMode = incoming.getSchema().getSelectionVectorMode(); this.outBatch = outgoing; this.outputField = outputField; - partitionValues = (IntVector) outBatch.getValueAccessorById(outBatch.getValueVectorId(outputField).getFieldId(), IntVector.class).getValueVector(); + partitionValues = (IntVector) outBatch.getValueAccessorById(IntVector.class, outBatch.getValueVectorId(outputField).getFieldIds()).getValueVector(); switch(svMode){ case FOUR_BYTE: case TWO_BYTE: @@ -98,7 +98,7 @@ public abstract class OrderedPartitionProjectorTemplate implements OrderedPartit public abstract int doEval(@Named("inIndex") int inIndex, @Named("partitionIndex") int partitionIndex); - + } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/OutgoingRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/OutgoingRecordBatch.java index 6e115a7fc..deef25fdc 100644 --- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/OutgoingRecordBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/OutgoingRecordBatch.java @@ -219,8 +219,8 @@ public class OutgoingRecordBatch implements VectorAccessible { } @Override - public VectorWrapper<?> getValueAccessorById(int fieldId, Class<?> clazz) { - return vectorContainer.getValueAccessorById(fieldId, clazz); + public VectorWrapper<?> getValueAccessorById(Class<?> clazz, int... fieldIds) { + return vectorContainer.getValueAccessorById(clazz, fieldIds); } @Override diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java index 604808547..bcd484ca5 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java @@ -94,7 +94,7 @@ public class PartitionSenderRootExec implements RootExec { if (!ok) { stop(); - + return false; } @@ -153,7 +153,7 @@ public class PartitionSenderRootExec implements RootExec { } - + private void generatePartitionFunction() throws SchemaChangeException { LogicalExpression filterExpression = operator.getExpr(); @@ -166,7 +166,7 @@ public class PartitionSenderRootExec implements RootExec { } cg.addExpr(new ReturnValueExpression(expr)); - + try { Partitioner p = context.getImplementationClass(cg); p.setup(context, incoming, outgoing); @@ -214,7 +214,7 @@ public class PartitionSenderRootExec implements RootExec { "outgoingVectors"); // create 2d array and build initialization list. For example: - // outgoingVectors = new ValueVector[][] { + // outgoingVectors = new ValueVector[][] { // new ValueVector[] {vv1, vv2}, // new ValueVector[] {vv3, vv4} // }); @@ -229,8 +229,8 @@ public class PartitionSenderRootExec implements RootExec { // declare outgoing value vector and assign it to the array JVar outVV = cg.declareVectorValueSetupAndMember("outgoing[" + batchId + "]", new TypedFieldId(vv.getField().getType(), - fieldId, - false)); + false, + fieldId)); // add vv to initialization list (e.g. 
{ vv1, vv2, vv3 } ) outgoingVectorInitBatch.add(outVV); ++fieldId; @@ -248,8 +248,8 @@ public class PartitionSenderRootExec implements RootExec { for (VectorWrapper<?> vvIn : incoming) { // declare incoming value vectors JVar incomingVV = cg.declareVectorValueSetupAndMember("incoming", new TypedFieldId(vvIn.getField().getType(), - fieldId, - vvIn.isHyper())); + vvIn.isHyper(), + fieldId)); // generate the copyFrom() invocation with explicit cast to the appropriate type Class<?> vvType = TypeHelper.getValueVectorClass(vvIn.getField().getType().getMinorType(), @@ -307,7 +307,7 @@ public class PartitionSenderRootExec implements RootExec { } } } - + public void stop() { logger.debug("Partition sender stopping."); ok = false; diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java index 347092a36..b94f40373 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java @@ -20,9 +20,7 @@ package org.apache.drill.exec.physical.impl.project; import java.io.IOException; import java.util.HashSet; import java.util.List; -import java.util.Set; -import com.sun.codemodel.JExpr; import org.apache.drill.common.expression.ErrorCollector; import org.apache.drill.common.expression.ErrorCollectorImpl; import org.apache.drill.common.expression.FieldReference; @@ -31,7 +29,6 @@ import org.apache.drill.common.expression.PathSegment; import org.apache.drill.common.expression.PathSegment.NameSegment; import org.apache.drill.common.expression.SchemaPath; import org.apache.drill.common.logical.data.NamedExpression; -import org.apache.drill.common.types.TypeProtos; import org.apache.drill.exec.exception.ClassTransformationException; import org.apache.drill.exec.exception.SchemaChangeException; import org.apache.drill.exec.expr.ClassGenerator; @@ -44,7 +41,6 @@ import org.apache.drill.exec.expr.ValueVectorWriteExpression; import org.apache.drill.exec.memory.OutOfMemoryException; import org.apache.drill.exec.ops.FragmentContext; import org.apache.drill.exec.physical.config.Project; -import org.apache.drill.exec.physical.impl.filter.ReturnValueExpression; import org.apache.drill.exec.record.AbstractSingleRecordBatch; import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode; import org.apache.drill.exec.record.MaterializedField; @@ -52,12 +48,13 @@ import org.apache.drill.exec.record.RecordBatch; import org.apache.drill.exec.record.TransferPair; import org.apache.drill.exec.record.TypedFieldId; import org.apache.drill.exec.record.VectorWrapper; -import org.apache.drill.exec.util.VectorUtil; import org.apache.drill.exec.vector.AllocationHelper; import org.apache.drill.exec.vector.ValueVector; +import com.carrotsearch.hppc.IntOpenHashSet; import com.google.common.base.Preconditions; import com.google.common.collect.Lists; +import com.sun.codemodel.JExpr; public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project>{ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ProjectRecordBatch.class); @@ -92,6 +89,7 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project>{ int incomingRecordCount = incoming.getRecordCount(); for(ValueVector v : this.allocationVectors){ AllocationHelper.allocate(v, incomingRecordCount, 250); +// v.allocateNew(); } int outputRecords = 
projector.projectRecords(0, incomingRecordCount, 0); if (outputRecords < incomingRecordCount) { @@ -177,14 +175,15 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project>{ final ClassGenerator<Projector> cg = CodeGenerator.getRoot(Projector.TEMPLATE_DEFINITION, context.getFunctionRegistry()); - Set<Integer> transferFieldIds = new HashSet(); + IntOpenHashSet transferFieldIds = new IntOpenHashSet(); boolean isAnyWildcard = isAnyWildcard(exprs); if(isAnyWildcard){ for(VectorWrapper<?> wrapper : incoming){ ValueVector vvIn = wrapper.getValueVector(); - String name = vvIn.getField().getDef().getName(vvIn.getField().getDef().getNameCount() - 1).getName(); + + String name = vvIn.getField().getPath().getLastSegment().getNameSegment().getPath(); FieldReference ref = new FieldReference(name); TransferPair tp = wrapper.getValueVector().getTransferPair(ref); transfers.add(tp); @@ -202,17 +201,19 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project>{ // add value vector to transfer if direct reference and this is allowed, otherwise, add to evaluation stack. if(expr instanceof ValueVectorReadExpression && incoming.getSchema().getSelectionVectorMode() == SelectionVectorMode.NONE + && !((ValueVectorReadExpression) expr).hasReadPath() && !isAnyWildcard - &&!transferFieldIds.contains(((ValueVectorReadExpression) expr).getFieldId().getFieldId()) - && !((ValueVectorReadExpression) expr).isArrayElement()) { + && !transferFieldIds.contains(((ValueVectorReadExpression) expr).getFieldId().getFieldIds()[0]) + && !((ValueVectorReadExpression) expr).hasReadPath()) { ValueVectorReadExpression vectorRead = (ValueVectorReadExpression) expr; - ValueVector vvIn = incoming.getValueAccessorById(vectorRead.getFieldId().getFieldId(), TypeHelper.getValueVectorClass(vectorRead.getMajorType().getMinorType(), vectorRead.getMajorType().getMode())).getValueVector(); + TypedFieldId id = vectorRead.getFieldId(); + ValueVector vvIn = incoming.getValueAccessorById(id.getIntermediateClass(), id.getFieldIds()).getValueVector(); Preconditions.checkNotNull(incoming); TransferPair tp = vvIn.getTransferPair(getRef(namedExpression)); transfers.add(tp); container.add(tp.getTo()); - transferFieldIds.add(vectorRead.getFieldId().getFieldId()); + transferFieldIds.add(vectorRead.getFieldId().getFieldIds()[0]); // logger.debug("Added transfer."); }else{ // need to do evaluation. 
@@ -221,6 +222,8 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project>{ TypedFieldId fid = container.add(vector); ValueVectorWriteExpression write = new ValueVectorWriteExpression(fid, expr, true); HoldingContainer hc = cg.addExpr(write); + + cg.getEvalBlock()._if(hc.getValue().eq(JExpr.lit(0)))._then()._return(JExpr.FALSE); logger.debug("Added eval."); } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectorTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectorTemplate.java index 60e599384..aa0ecf66b 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectorTemplate.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectorTemplate.java @@ -33,12 +33,12 @@ import com.google.common.collect.ImmutableList; public abstract class ProjectorTemplate implements Projector { static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ProjectorTemplate.class); - + private ImmutableList<TransferPair> transfers; private SelectionVector2 vector2; private SelectionVector4 vector4; private SelectionVectorMode svMode; - + public ProjectorTemplate() throws SchemaChangeException{ } @@ -47,18 +47,18 @@ public abstract class ProjectorTemplate implements Projector { switch(svMode){ case FOUR_BYTE: throw new UnsupportedOperationException(); - - + + case TWO_BYTE: final int count = recordCount; for(int i = 0; i < count; i++, firstOutputIndex++){ doEval(vector2.getIndex(i), firstOutputIndex); } return recordCount; - - + + case NONE: - + final int countN = recordCount; int i; for (i = startIndex; i < startIndex + countN; i++, firstOutputIndex++) { @@ -76,8 +76,9 @@ public abstract class ProjectorTemplate implements Projector { t.transfer(); } return recordCount; - - + + + default: throw new UnsupportedOperationException(); } @@ -86,7 +87,7 @@ public abstract class ProjectorTemplate implements Projector { @Override public final void setup(FragmentContext context, RecordBatch incoming, RecordBatch outgoing, List<TransferPair> transfers) throws SchemaChangeException{ - this.svMode = incoming.getSchema().getSelectionVectorMode(); + this.svMode = incoming.getSchema().getSelectionVectorMode(); switch(svMode){ case FOUR_BYTE: this.vector4 = incoming.getSelectionVector4(); @@ -103,7 +104,7 @@ public abstract class ProjectorTemplate implements Projector { public abstract boolean doEval(@Named("inIndex") int inIndex, @Named("outIndex") int outIndex); - + } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/RemovingRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/RemovingRecordBatch.java index 4018991b3..62af0b2a3 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/RemovingRecordBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/RemovingRecordBatch.java @@ -86,7 +86,7 @@ public class RemovingRecordBatch extends AbstractSingleRecordBatch<SelectionVect default: throw new UnsupportedOperationException(); } - + container.buildSchema(SelectionVectorMode.NONE); } @@ -156,12 +156,12 @@ public class RemovingRecordBatch extends AbstractSingleRecordBatch<SelectionVect public void cleanup(){ super.cleanup(); } - + private class StraightCopier implements Copier{ private List<TransferPair> pairs = Lists.newArrayList(); private List<ValueVector> out = Lists.newArrayList(); - + @Override 
public void setupRemover(FragmentContext context, RecordBatch incoming, RecordBatch outgoing, VectorAllocator[] allocators){ for(VectorWrapper<?> vv : incoming){ @@ -183,7 +183,7 @@ public class RemovingRecordBatch extends AbstractSingleRecordBatch<SelectionVect public List<ValueVector> getOut() { return out; } - + } private Copier getStraightCopier(){ @@ -192,10 +192,10 @@ public class RemovingRecordBatch extends AbstractSingleRecordBatch<SelectionVect container.addCollection(copier.getOut()); return copier; } - + private Copier getGenerated2Copier() throws SchemaChangeException{ Preconditions.checkArgument(incoming.getSchema().getSelectionVectorMode() == SelectionVectorMode.TWO_BYTE); - + List<VectorAllocator> allocators = Lists.newArrayList(); for(VectorWrapper<?> i : incoming){ ValueVector v = TypeHelper.getNewVector(i.getField(), oContext.getAllocator()); @@ -218,12 +218,12 @@ public class RemovingRecordBatch extends AbstractSingleRecordBatch<SelectionVect Preconditions.checkArgument(incoming.getSchema().getSelectionVectorMode() == SelectionVectorMode.FOUR_BYTE); return getGenerated4Copier(incoming, context, oContext.getAllocator(), container, this); } - + public static Copier getGenerated4Copier(RecordBatch batch, FragmentContext context, BufferAllocator allocator, VectorContainer container, RecordBatch outgoing) throws SchemaChangeException{ List<VectorAllocator> allocators = Lists.newArrayList(); for(VectorWrapper<?> i : batch){ - + ValueVector v = TypeHelper.getNewVector(i.getField(), allocator); container.add(v); allocators.add(getAllocator4(v)); @@ -239,20 +239,20 @@ public class RemovingRecordBatch extends AbstractSingleRecordBatch<SelectionVect throw new SchemaChangeException("Failure while attempting to load generated class", e); } } - + public static void generateCopies(ClassGenerator g, VectorAccessible batch, boolean hyper){ // we have parallel ids for each value vector so we don't actually have to deal with managing the ids at all. 
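The "parallel ids" comment opening generateCopies() means the incoming and outgoing batches expose their vectors in the same ordinal order, so the generated copy loop never has to remap field ids. A hand-written standalone sketch of that shape follows; it is not the generated code, and plain long arrays stand in for value vectors.

// Sketch of the parallel-id copy: for each ordinal position i, copy
// incoming[i] -> outgoing[i]. Returning false on an out-of-range write
// mirrors the copyFromSafe(...) convention used by the generated code.
public class ParallelCopySketch {
  public static boolean copyRecord(long[][] incoming, long[][] outgoing,
                                   int inIndex, int outIndex) {
    for (int fieldId = 0; fieldId < incoming.length; fieldId++) {
      long[] in = incoming[fieldId];
      long[] out = outgoing[fieldId];   // same fieldId on both sides, no remapping
      if (outIndex >= out.length) {
        return false;                   // destination full; caller must retry later
      }
      out[outIndex] = in[inIndex];
    }
    return true;
  }
}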
int fieldId = 0; - + JExpression inIndex = JExpr.direct("inIndex"); JExpression outIndex = JExpr.direct("outIndex"); g.rotateBlock(); for(VectorWrapper<?> vv : batch){ - JVar inVV = g.declareVectorValueSetupAndMember("incoming", new TypedFieldId(vv.getField().getType(), fieldId, vv.isHyper())); - JVar outVV = g.declareVectorValueSetupAndMember("outgoing", new TypedFieldId(vv.getField().getType(), fieldId, false)); + JVar inVV = g.declareVectorValueSetupAndMember("incoming", new TypedFieldId(vv.getField().getType(), vv.isHyper(), fieldId)); + JVar outVV = g.declareVectorValueSetupAndMember("outgoing", new TypedFieldId(vv.getField().getType(), false, fieldId)); if(hyper){ - + g.getEvalBlock()._if( outVV .invoke("copyFromSafe") @@ -268,20 +268,20 @@ public class RemovingRecordBatch extends AbstractSingleRecordBatch<SelectionVect }else{ g.getEvalBlock()._if(outVV.invoke("copyFromSafe").arg(inIndex).arg(outIndex).arg(inVV).not())._then()._return(JExpr.FALSE); } - - + + fieldId++; } g.rotateBlock(); g.getEvalBlock()._return(JExpr.TRUE); } - + @Override public WritableBatch getWritableBatch() { return WritableBatch.get(this); } - + public static VectorAllocator getAllocator4(ValueVector outgoing){ if(outgoing instanceof FixedWidthVector){ return new FixedVectorAllocator((FixedWidthVector) outgoing); @@ -291,6 +291,6 @@ public class RemovingRecordBatch extends AbstractSingleRecordBatch<SelectionVect throw new UnsupportedOperationException(); } } - - + + } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/IteratorValidatorBatchIterator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/IteratorValidatorBatchIterator.java index 8d3a3e5b3..f96a1bda3 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/IteratorValidatorBatchIterator.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/IteratorValidatorBatchIterator.java @@ -100,9 +100,9 @@ public class IteratorValidatorBatchIterator implements RecordBatch { } @Override - public VectorWrapper<?> getValueAccessorById(int fieldId, Class<?> clazz) { + public VectorWrapper<?> getValueAccessorById(Class<?> clazz, int... ids) { validateReadState(); - return incoming.getValueAccessorById(fieldId, clazz); + return incoming.getValueAccessorById(clazz, ids); } @Override diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/BatchGroup.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/BatchGroup.java index d87a9f58c..a54685290 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/BatchGroup.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/BatchGroup.java @@ -242,8 +242,8 @@ public class BatchGroup implements VectorAccessible { } @Override - public VectorWrapper<?> getValueAccessorById(int fieldId, Class<?> clazz) { - return currentContainer.getValueAccessorById(fieldId, clazz); + public VectorWrapper<?> getValueAccessorById(Class<?> clazz, int... 
ids) { + return currentContainer.getValueAccessorById(clazz, ids); } @Override diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java index 930f851bc..4b6c37dd9 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java @@ -373,12 +373,16 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> { ValueVector[] vectors = new ValueVector[batchGroupList.size() * 2]; int i = 0; for (BatchGroup group : batchGroupList) { - vectors[i++] = group.getValueAccessorById(group.getValueVectorId(field.getAsSchemaPath()).getFieldId(), - field.getValueClass()).getValueVector(); + vectors[i++] = group.getValueAccessorById( + field.getValueClass(), + group.getValueVectorId(field.getPath()).getFieldIds() + ).getValueVector(); if (group.hasSecond()) { VectorContainer c = group.getSecondContainer(); - vectors[i++] = c.getValueAccessorById(c.getValueVectorId(field.getAsSchemaPath()).getFieldId(), - field.getValueClass()).getValueVector(); + vectors[i++] = c.getValueAccessorById( + field.getValueClass(), + c.getValueVectorId(field.getPath()).getFieldIds() + ).getValueVector(); } else { vectors[i] = vectors[i - 1].getTransferPair().getTo(); //this vector should never be used. Just want to avoid having null elements in the hyper vector i++; diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java index 214f81c95..844d6dbbd 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java @@ -31,7 +31,7 @@ import org.apache.drill.exec.record.selection.SelectionVector4; public abstract class AbstractRecordBatch<T extends PhysicalOperator> implements RecordBatch{ final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(this.getClass()); - + protected final VectorContainer container = new VectorContainer(); protected final T popConfig; protected final FragmentContext context; @@ -43,7 +43,7 @@ public abstract class AbstractRecordBatch<T extends PhysicalOperator> implements this.popConfig = popConfig; this.oContext = new OperatorContext(popConfig, context); } - + @Override public Iterator<VectorWrapper<?>> iterator() { return container.iterator(); @@ -67,14 +67,14 @@ public abstract class AbstractRecordBatch<T extends PhysicalOperator> implements public void kill() { killIncoming(); } - + protected abstract void killIncoming(); - + public void cleanup(){ container.clear(); oContext.close(); } - + @Override public SelectionVector2 getSelectionVector2() { throw new UnsupportedOperationException(); @@ -91,16 +91,16 @@ public abstract class AbstractRecordBatch<T extends PhysicalOperator> implements } @Override - public VectorWrapper<?> getValueAccessorById(int fieldId, Class<?> clazz) { - return container.getValueAccessorById(fieldId, clazz); + public VectorWrapper<?> getValueAccessorById(Class<?> clazz, int... 
ids) { + return container.getValueAccessorById(clazz, ids); } - + @Override public WritableBatch getWritableBatch() { // logger.debug("Getting writable batch."); WritableBatch batch = WritableBatch.get(this); return batch; - + } } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/HyperVectorWrapper.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/HyperVectorWrapper.java index a6a4621c0..b44a23325 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/HyperVectorWrapper.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/HyperVectorWrapper.java @@ -20,21 +20,25 @@ package org.apache.drill.exec.record; import java.lang.reflect.Array; import com.google.common.base.Preconditions; + import org.apache.commons.lang3.ArrayUtils; +import org.apache.drill.common.expression.SchemaPath; import org.apache.drill.exec.vector.ValueVector; +import org.apache.drill.exec.vector.complex.AbstractContainerVector; +import org.apache.drill.exec.vector.complex.MapVector; public class HyperVectorWrapper<T extends ValueVector> implements VectorWrapper<T>{ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(HyperVectorWrapper.class); - + private T[] vectors; private MaterializedField f; private final boolean releasable; - + public HyperVectorWrapper(MaterializedField f, T[] v){ this(f, v, true); } - + public HyperVectorWrapper(MaterializedField f, T[] v, boolean releasable){ assert(v.length > 0); this.f = f; @@ -72,9 +76,51 @@ public class HyperVectorWrapper<T extends ValueVector> implements VectorWrapper< public void clear() { if(!releasable) return; for(T x : vectors){ - x.clear(); + x.clear(); + } + } + + @Override + public VectorWrapper<?> getChildWrapper(int[] ids) { + if(ids.length == 1) return this; + + ValueVector[] vectors = new ValueVector[this.vectors.length]; + int index = 0; + + for(ValueVector v : this.vectors){ + ValueVector vector = v; + for(int i = 1; i < ids.length; i++){ + MapVector map = (MapVector) vector; + vector = map.getVectorById(ids[i]); + } + vectors[index] = vector; + index++; + } + return new HyperVectorWrapper<ValueVector>(vectors[0].getField(), vectors); + } + + @Override + public TypedFieldId getFieldIdIfMatches(int id, SchemaPath expectedPath) { + ValueVector v = vectors[0]; + if(!expectedPath.getRootSegment().segmentEquals(v.getField().getPath().getRootSegment())) return null; + + if(v instanceof AbstractContainerVector){ + // we're looking for a multi path. 
+ AbstractContainerVector c = (AbstractContainerVector) v; + TypedFieldId.Builder builder = TypedFieldId.newBuilder(); + builder.intermediateType(v.getField().getType()); + builder.hyper(); + builder.addId(id); + return c.getFieldIdIfMatches(builder, true, expectedPath.getRootSegment().getChild()); + + }else{ + return TypedFieldId.newBuilder() // + .intermediateType(v.getField().getType()) // + .finalType(v.getField().getType()) // + .addId(id) // + .hyper() // + .build(); } - } @Override diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/MaterializedField.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/MaterializedField.java index d93e258f4..439552f33 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/MaterializedField.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/MaterializedField.java @@ -17,39 +17,58 @@ */ package org.apache.drill.exec.record; -import java.util.Collections; -import java.util.Iterator; import java.util.List; import org.apache.drill.common.expression.FieldReference; -import org.apache.drill.common.expression.LogicalExpression; import org.apache.drill.common.expression.PathSegment; -import org.apache.drill.common.expression.PathSegment.NameSegment; import org.apache.drill.common.expression.SchemaPath; -import org.apache.drill.common.logical.data.NamedExpression; -import org.apache.drill.common.types.TypeProtos; import org.apache.drill.common.types.TypeProtos.DataMode; import org.apache.drill.common.types.TypeProtos.MajorType; import org.apache.drill.exec.expr.TypeHelper; -import org.apache.drill.exec.proto.SchemaDefProtos.FieldDef; -import org.apache.drill.exec.proto.SchemaDefProtos.NamePart; -import org.apache.drill.exec.proto.SchemaDefProtos.NamePart.Type; +import org.apache.drill.exec.proto.UserBitShared.SerializedField; -import com.beust.jcommander.internal.Lists; +import com.google.hive12.common.collect.Lists; public class MaterializedField{ - private final FieldDef def; + private SchemaPath path; + private MajorType type; + private List<MaterializedField> children = Lists.newArrayList(); - public MaterializedField(FieldDef def) { - this.def = def; + private MaterializedField(SchemaPath path, MajorType type) { + super(); + this.path = path; + this.type = type; } - public static MaterializedField create(FieldDef def){ - return new MaterializedField(def); + public static MaterializedField create(SerializedField serField){ + return new MaterializedField(SchemaPath.create(serField.getNamePart()), serField.getMajorType()); + } + + public SerializedField.Builder getAsBuilder(){ + return SerializedField.newBuilder() // + .setMajorType(type) // + .setNamePart(path.getAsNamePart()); + } + + public void addChild(MaterializedField field){ + children.add(field); } public MaterializedField clone(FieldReference ref){ - return create(ref, def.getMajorType()); + return create(ref, type); + } + + public String getLastName(){ + PathSegment seg = path.getRootSegment(); + while(seg.getChild() != null) seg = seg.getChild(); + return seg.getNameSegment().getPath(); + } + + + // TODO: rewrite without as direct match rather than conversion then match. 
+ public boolean matches(SerializedField field){ + MaterializedField f = create(field); + return f.equals(this); } public static MaterializedField create(String path, MajorType type){ @@ -58,43 +77,20 @@ public class MaterializedField{ } public static MaterializedField create(SchemaPath path, MajorType type) { - FieldDef.Builder b = FieldDef.newBuilder(); - b.setMajorType(type); - addSchemaPathToFieldDef(path, b); - return create(b.build()); - } - - private static void addSchemaPathToFieldDef(SchemaPath path, FieldDef.Builder builder) { - for (PathSegment p = path.getRootSegment();; p = p.getChild()) { - NamePart.Builder b = NamePart.newBuilder(); - if (p.isArray()) { - b.setType(Type.ARRAY); - } else { - b.setName(p.getNameSegment().getPath().toString()); - b.setType(Type.NAME); - } - builder.addName(b.build()); - if(p.isLastPath()) break; - } + return new MaterializedField(path, type); } - public FieldDef getDef() { - return def; + public SchemaPath getPath(){ + return path; } + /** + * Get the schema path. Deprecated, use getPath() instead. + * @return the SchemaPath of this field. + */ + @Deprecated public SchemaPath getAsSchemaPath(){ - List<NamePart> nameList = Lists.newArrayList(def.getNameList()); - Collections.reverse(nameList); - PathSegment seg = null; - for(NamePart p : nameList){ - if(p.getType() == NamePart.Type.ARRAY){ - throw new UnsupportedOperationException(); - }else{ - seg = new NameSegment(p.getName(), seg); - } - } - if( !(seg instanceof NameSegment) ) throw new UnsupportedOperationException(); - return new SchemaPath( (NameSegment) seg); + return path; } // public String getName(){ @@ -119,29 +115,29 @@ public class MaterializedField{ // } public int getWidth() { - return def.getMajorType().getWidth(); + return type.getWidth(); } public MajorType getType() { - return def.getMajorType(); + return type; } public int getScale() { - return def.getMajorType().getScale(); + return type.getScale(); } public int getPrecision() { - return def.getMajorType().getPrecision(); + return type.getPrecision(); } public boolean isNullable() { - return def.getMajorType().getMode() == DataMode.OPTIONAL; + return type.getMode() == DataMode.OPTIONAL; } public DataMode getDataMode() { - return def.getMajorType().getMode(); + return type.getMode(); } public MaterializedField getOtherNullableVersion(){ - MajorType mt = def.getMajorType(); + MajorType mt = type; DataMode newDataMode = null; switch(mt.getMode()){ case OPTIONAL: @@ -153,7 +149,7 @@ public class MaterializedField{ default: throw new UnsupportedOperationException(); } - return new MaterializedField(def.toBuilder().setMajorType(mt.toBuilder().setMode(newDataMode).build()).build()); + return new MaterializedField(path, mt.toBuilder().setMode(newDataMode).build()); } public Class<?> getValueClass() { @@ -161,33 +157,19 @@ public class MaterializedField{ } public boolean matches(SchemaPath path) { - Iterator<NamePart> iter = def.getNameList().iterator(); + if(!path.isSimplePath()) return false; - for (PathSegment p = path.getRootSegment();; p = p.getChild()) { - if(p == null) break; - if (!iter.hasNext()) return false; - NamePart n = iter.next(); - - if (p.isArray()) { - if (n.getType() == Type.ARRAY) continue; - return false; - } else { - if (p.getNameSegment().getPath().equalsIgnoreCase(n.getName())) continue; - return false; - } - - } - // we've reviewed all path segments. confirm that we don't have any extra name parts. 
- return !iter.hasNext(); + return this.path.equals(path); } - @Override public int hashCode() { final int prime = 31; int result = 1; - result = prime * result + ((def == null) ? 0 : def.hashCode()); + result = prime * result + ((children == null) ? 0 : children.hashCode()); + result = prime * result + ((path == null) ? 0 : path.hashCode()); + result = prime * result + ((type == null) ? 0 : type.hashCode()); return result; } @@ -200,20 +182,30 @@ public class MaterializedField{ if (getClass() != obj.getClass()) return false; MaterializedField other = (MaterializedField) obj; - if (def == null) { - if (other.def != null) + if (children == null) { + if (other.children != null) + return false; + } else if (!children.equals(other.children)) + return false; + if (path == null) { + if (other.path != null) + return false; + } else if (!path.equals(other.path)) + return false; + if (type == null) { + if (other.type != null) return false; - } else if (!def.equals(other.def)) + } else if (!type.equals(other.type)) return false; return true; } @Override public String toString() { - return "MaterializedField [" + def.toString() + "]"; + return "MaterializedField [path=" + path + ", type=" + type + "]"; } public String toExpr(){ - return this.getAsSchemaPath().toExpr(); + return path.toExpr(); } }
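The MaterializedField diff above replaces the protobuf-backed FieldDef with a SchemaPath plus MajorType (and a list of child fields for the new map/repeated types). A small usage sketch of the reworked class, based only on the methods visible in this patch (create(String, MajorType), getPath(), getLastName(), isNullable()); the Types.required(...) helper from drill-common is assumed to be available, and the field name is illustrative.

import org.apache.drill.common.types.TypeProtos.MajorType;
import org.apache.drill.common.types.TypeProtos.MinorType;
import org.apache.drill.common.types.Types;
import org.apache.drill.exec.record.MaterializedField;

// Sketch: a MaterializedField is now just a path plus a MajorType; nested
// types would register their members through addChild(). Types.required(...)
// (assumed helper) builds a REQUIRED-mode MajorType for the given minor type.
public class MaterializedFieldSketch {
  public static void main(String[] args) {
    MajorType bigIntType = Types.required(MinorType.BIGINT);
    MaterializedField field = MaterializedField.create("employee_id", bigIntType);

    System.out.println(field.getPath());      // the SchemaPath backing the field
    System.out.println(field.getLastName());  // last name segment: employee_id
    System.out.println(field.isNullable());   // false, since the mode is REQUIRED
  }
}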
\ No newline at end of file diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatch.java index 31283c61b..60fdd4d5f 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatch.java @@ -27,7 +27,7 @@ import org.apache.drill.exec.vector.ValueVector; * A record batch contains a set of field values for a particular range of records. In the case of a record batch * composed of ValueVectors, ideally a batch fits within L2 cache (~256k per core). The set of value vectors do not * change unless the next() IterOutcome is a *_NEW_SCHEMA type. - * + * * A key thing to know is that the Iterator provided by record batch must align with the rank positions of the field ids * provided utilizing getValueVectorId(); */ @@ -56,21 +56,21 @@ public interface RecordBatch extends VectorAccessible { /** * Access the FragmentContext of the current query fragment. Useful for reporting failure information or other query * level information. - * + * * @return */ public FragmentContext getContext(); /** * Provide the schema of the current RecordBatch. This changes if and only if a *_NEW_SCHEMA IterOutcome is provided. - * + * * @return */ public BatchSchema getSchema(); /** * Provide the number of records that are within this record count - * + * * @return */ public int getRecordCount(); @@ -89,7 +89,7 @@ public interface RecordBatch extends VectorAccessible { * Get the value vector type and id for the given schema path. The TypedFieldId should store a fieldId which is the * same as the ordinal position of the field within the Iterator provided this classes implementation of * Iterable<ValueVector>. - * + * * @param path * The path where the vector should be located. * @return The local field id associated with this vector. If no field matches this path, this will return a null @@ -97,24 +97,24 @@ public interface RecordBatch extends VectorAccessible { */ public abstract TypedFieldId getValueVectorId(SchemaPath path); @Override - public abstract VectorWrapper<?> getValueAccessorById(int fieldId, Class<?> clazz); + public abstract VectorWrapper<?> getValueAccessorById(Class<?> clazz, int... ids); /** * Update the data in each Field reading interface for the next range of records. Once a RecordBatch returns an * IterOutcome.NONE, the consumer should no longer next(). Behavior at this point is undetermined and likely to throw * an exception. - * + * * @return An IterOutcome describing the result of the iteration. */ public IterOutcome next(); /** * Get a writable version of this batch. Takes over owernship of existing buffers. 
- * + * * @return */ public WritableBatch getWritableBatch(); - + public void cleanup(); } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchLoader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchLoader.java index ed450af06..10d959fae 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchLoader.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchLoader.java @@ -23,13 +23,14 @@ import java.util.Iterator; import java.util.List; import java.util.Map; +import javax.jdo.metadata.FieldMetadata; + import org.apache.drill.common.expression.SchemaPath; import org.apache.drill.exec.exception.SchemaChangeException; import org.apache.drill.exec.expr.TypeHelper; import org.apache.drill.exec.memory.BufferAllocator; -import org.apache.drill.exec.proto.SchemaDefProtos.FieldDef; -import org.apache.drill.exec.proto.UserBitShared.FieldMetadata; import org.apache.drill.exec.proto.UserBitShared.RecordBatchDef; +import org.apache.drill.exec.proto.UserBitShared.SerializedField; import org.apache.drill.exec.vector.ValueVector; import com.google.common.collect.Maps; @@ -63,25 +64,24 @@ public class RecordBatchLoader implements VectorAccessible, Iterable<VectorWrapp boolean schemaChanged = schema == null; // logger.info("Load, ThreadID: {}", Thread.currentThread().getId(), new RuntimeException("For Stack Trace Only")); // System.out.println("Load, ThreadId: " + Thread.currentThread().getId()); - Map<FieldDef, ValueVector> oldFields = Maps.newHashMap(); + Map<MaterializedField, ValueVector> oldFields = Maps.newHashMap(); for(VectorWrapper<?> w : container){ ValueVector v = w.getValueVector(); - oldFields.put(v.getField().getDef(), v); + oldFields.put(v.getField(), v); } VectorContainer newVectors = new VectorContainer(); - List<FieldMetadata> fields = def.getFieldList(); + List<SerializedField> fields = def.getFieldList(); int bufOffset = 0; - for (FieldMetadata fmd : fields) { - FieldDef fieldDef = fmd.getDef(); + for (SerializedField fmd : fields) { + MaterializedField fieldDef = MaterializedField.create(fmd); ValueVector v = oldFields.remove(fieldDef); if(v == null) { // if we arrive here, we didn't have a matching vector. schemaChanged = true; - MaterializedField m = new MaterializedField(fieldDef); - v = TypeHelper.getNewVector(m, allocator); + v = TypeHelper.getNewVector(fieldDef, allocator); } if (fmd.getValueCount() == 0){ v.clear(); @@ -136,8 +136,8 @@ public class RecordBatchLoader implements VectorAccessible, Iterable<VectorWrapp return valueCount; } - public VectorWrapper<?> getValueAccessorById(int fieldId, Class<?> clazz){ - return container.getValueAccessorById(fieldId, clazz); + public VectorWrapper<?> getValueAccessorById(Class<?> clazz, int... 
ids){ + return container.getValueAccessorById(clazz, ids); } public WritableBatch getWritableBatch(){ diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/SimpleVectorWrapper.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/SimpleVectorWrapper.java index b7a82481d..692fe62f0 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/SimpleVectorWrapper.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/SimpleVectorWrapper.java @@ -17,11 +17,15 @@ */ package org.apache.drill.exec.record; +import org.apache.drill.common.expression.PathSegment; +import org.apache.drill.common.expression.SchemaPath; import org.apache.drill.exec.vector.ValueVector; +import org.apache.drill.exec.vector.complex.AbstractContainerVector; +import org.apache.drill.exec.vector.complex.MapVector; public class SimpleVectorWrapper<T extends ValueVector> implements VectorWrapper<T>{ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(SimpleVectorWrapper.class); - + private T v; public SimpleVectorWrapper(T v){ @@ -53,8 +57,7 @@ public class SimpleVectorWrapper<T extends ValueVector> implements VectorWrapper public boolean isHyper() { return false; } - - + @SuppressWarnings("unchecked") @Override public VectorWrapper<T> cloneAndTransfer() { @@ -71,4 +74,56 @@ public class SimpleVectorWrapper<T extends ValueVector> implements VectorWrapper public static <T extends ValueVector> SimpleVectorWrapper<T> create(T v){ return new SimpleVectorWrapper<T>(v); } + + + @Override + public VectorWrapper<?> getChildWrapper(int[] ids) { + if(ids.length == 1) return this; + + ValueVector vector = v; + + for(int i = 1; i < ids.length; i++){ + MapVector map = (MapVector) vector; + vector = map.getVectorById(ids[i]); + } + + return new SimpleVectorWrapper<ValueVector>(vector); + } + + @Override + public TypedFieldId getFieldIdIfMatches(int id, SchemaPath expectedPath) { + if(!expectedPath.getRootSegment().segmentEquals(v.getField().getPath().getRootSegment())) return null; + PathSegment seg = expectedPath.getRootSegment(); + + if(v instanceof AbstractContainerVector){ + // we're looking for a multi path. 
+ AbstractContainerVector c = (AbstractContainerVector) v; + TypedFieldId.Builder builder = TypedFieldId.newBuilder(); + builder.intermediateType(v.getField().getType()); + builder.addId(id); + return c.getFieldIdIfMatches(builder, true, expectedPath.getRootSegment().getChild()); + + }else{ + TypedFieldId.Builder builder = TypedFieldId.newBuilder(); + builder.intermediateType(v.getField().getType()); + builder.addId(id); + builder.finalType(v.getField().getType()); + if(seg.isLastPath()){ + return builder.build(); + }else{ + PathSegment child = seg.getChild(); + if(child.isArray() && child.isLastPath()){ + builder.remainder(child); + builder.withIndex(); + return builder.build(); + }else{ + return null; + } + + } + + } + } + + } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/TransferPair.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/TransferPair.java index ba2c7b2b2..9645be9fd 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/TransferPair.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/TransferPair.java @@ -23,5 +23,5 @@ public interface TransferPair { public void transfer(); public void splitAndTransfer(int startIndex, int length); public ValueVector getTo(); - public void copyValue(int from, int to); + public boolean copyValueSafe(int from, int to); } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/TypedFieldId.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/TypedFieldId.java index 0fbd0ae56..24a8251ac 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/TypedFieldId.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/TypedFieldId.java @@ -17,34 +17,174 @@ */ package org.apache.drill.exec.record; +import java.util.Arrays; + +import org.apache.commons.lang.ArrayUtils; +import org.apache.drill.common.expression.PathSegment; +import org.apache.drill.common.types.TypeProtos.DataMode; import org.apache.drill.common.types.TypeProtos.MajorType; +import org.apache.drill.exec.expr.TypeHelper; +import org.apache.drill.exec.vector.ValueVector; + +import com.carrotsearch.hppc.IntArrayList; +import com.google.common.base.Preconditions; public class TypedFieldId { - final MajorType type; - final int fieldId; + final MajorType finalType; + final MajorType secondaryFinal; + final MajorType intermediateType; + final int[] fieldIds; final boolean isHyperReader; + final PathSegment remainder; + + public TypedFieldId(MajorType type, int... fieldIds){ + this(type, type, type, false, null, fieldIds); + } - public TypedFieldId(MajorType type, int fieldId){ - this(type, fieldId, false); + public TypedFieldId(MajorType type, IntArrayList breadCrumb, PathSegment remainder){ + this(type, type, type, false, remainder, breadCrumb.toArray()); } - - public TypedFieldId(MajorType type, int fieldId, boolean isHyper) { + + public TypedFieldId(MajorType type, boolean isHyper, int... fieldIds){ + this(type, type, type, isHyper, null, fieldIds); + } + + public TypedFieldId(MajorType intermediateType, MajorType secondaryFinal, MajorType finalType, boolean isHyper, PathSegment remainder, int... 
fieldIds) { super(); - this.type = type; - this.fieldId = fieldId; + this.intermediateType = intermediateType; + this.finalType = finalType; + this.secondaryFinal = secondaryFinal; + this.fieldIds = fieldIds; this.isHyperReader = isHyper; + this.remainder = remainder; + } + + + + public TypedFieldId cloneWithChild(int id){ + int[] fieldIds = ArrayUtils.add(this.fieldIds, id); + return new TypedFieldId(intermediateType, secondaryFinal, finalType, isHyperReader, remainder, fieldIds); + } + + public PathSegment getLastSegment(){ + if(remainder == null) return null; + PathSegment seg = remainder; + while(seg.getChild() != null){ + seg = seg.getChild(); + } + return seg; + } + + public TypedFieldId cloneWithRemainder(PathSegment remainder){ + return new TypedFieldId(intermediateType, secondaryFinal, finalType, isHyperReader, remainder, fieldIds); + } + + public boolean hasRemainder(){ + return remainder != null; + } + + public PathSegment getRemainder(){ + return remainder; } public boolean isHyperReader(){ return isHyperReader; } - - public MajorType getType() { - return type; + + public MajorType getIntermediateType() { + return intermediateType; + } + + public Class<? extends ValueVector> getIntermediateClass(){ + return (Class<? extends ValueVector>) TypeHelper.getValueVectorClass(intermediateType.getMinorType(), intermediateType.getMode()); + } + + public MajorType getFinalType(){ + return finalType; } - public int getFieldId() { - return fieldId; + public int[] getFieldIds() { + return fieldIds; + } + + + + public MajorType getSecondaryFinal() { + return secondaryFinal; + } + + public static Builder newBuilder(){ + return new Builder(); + } + + public static class Builder{ + final IntArrayList ids = new IntArrayList(); + MajorType finalType; + MajorType intermediateType; + PathSegment remainder; + boolean hyperReader = false; + boolean withIndex = false; + + public Builder addId(int id){ + ids.add(id); + return this; + } + + public Builder withIndex(){ + withIndex = true; + return this; + } + + public Builder remainder(PathSegment remainder){ + this.remainder = remainder; + return this; + } + + public Builder hyper(){ + this.hyperReader = true; + return this; + } + + public Builder finalType(MajorType finalType){ + this.finalType = finalType; + return this; + } + + public Builder intermediateType(MajorType intermediateType){ + this.intermediateType = intermediateType; + return this; + } + + public TypedFieldId build(){ + Preconditions.checkNotNull(intermediateType); + Preconditions.checkNotNull(finalType); + + if(intermediateType == null) intermediateType = finalType; + MajorType actualFinalType = finalType; + MajorType secondaryFinal = finalType; + + // if this has an index, switch to required type for output + if(withIndex && intermediateType == finalType) actualFinalType = finalType.toBuilder().setMode(DataMode.REQUIRED).build(); + + // if this isn't a direct access, switch the final type to nullable as offsets may be null. + // TODO: there is a bug here with some things. + if(intermediateType != finalType) actualFinalType = finalType.toBuilder().setMode(DataMode.OPTIONAL).build(); + + return new TypedFieldId(intermediateType, secondaryFinal, actualFinalType, hyperReader, remainder, ids.toArray()); + } + } + + @Override + public int hashCode() { + final int prime = 31; + int result = 1; + result = prime * result + Arrays.hashCode(fieldIds); + result = prime * result + ((finalType == null) ? 0 : finalType.hashCode()); + result = prime * result + ((intermediateType == null) ? 
0 : intermediateType.hashCode()); + result = prime * result + (isHyperReader ? 1231 : 1237); + result = prime * result + ((remainder == null) ? 0 : remainder.hashCode()); + result = prime * result + ((secondaryFinal == null) ? 0 : secondaryFinal.hashCode()); + return result; } @Override @@ -56,20 +196,32 @@ public class TypedFieldId { if (getClass() != obj.getClass()) return false; TypedFieldId other = (TypedFieldId) obj; - if (fieldId != other.fieldId) + if (!Arrays.equals(fieldIds, other.fieldIds)) return false; - if (type == null) { - if (other.type != null) + if (finalType == null) { + if (other.finalType != null) return false; - } else if (!type.equals(other.type)) + } else if (!finalType.equals(other.finalType)) + return false; + if (intermediateType == null) { + if (other.intermediateType != null) + return false; + } else if (!intermediateType.equals(other.intermediateType)) + return false; + if (isHyperReader != other.isHyperReader) + return false; + if (remainder == null) { + if (other.remainder != null) + return false; + } else if (!remainder.equals(other.remainder)) + return false; + if (secondaryFinal == null) { + if (other.secondaryFinal != null) + return false; + } else if (!secondaryFinal.equals(other.secondaryFinal)) return false; return true; } - @Override - public String toString() { - return "TypedFieldId [type=" + type + ", fieldId=" + fieldId + ", isSuperReader=" + isHyperReader + "]"; - } - }
\ No newline at end of file diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorAccessible.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorAccessible.java index a8100b275..474a0a6ea 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorAccessible.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorAccessible.java @@ -28,7 +28,7 @@ import org.apache.drill.exec.vector.ValueVector; * To change this template use File | Settings | File Templates. */ public interface VectorAccessible extends Iterable<VectorWrapper<?>> { - public VectorWrapper<?> getValueAccessorById(int fieldId, Class<?> clazz); + public VectorWrapper<?> getValueAccessorById(Class<?> clazz, int... fieldIds); public TypedFieldId getValueVectorId(SchemaPath path); public BatchSchema getSchema(); public int getRecordCount(); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java index 25289a88c..1c7714e2e 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java @@ -22,13 +22,15 @@ import java.util.Iterator; import java.util.List; import org.apache.drill.common.expression.SchemaPath; +import org.apache.drill.common.types.TypeProtos.MajorType; import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode; import org.apache.drill.exec.vector.ValueVector; +import org.apache.drill.exec.vector.complex.AbstractMapVector; import com.beust.jcommander.internal.Lists; import com.google.common.base.Preconditions; -public class VectorContainer implements Iterable<VectorWrapper<?>>, VectorAccessible { +public class VectorContainer extends AbstractMapVector implements Iterable<VectorWrapper<?>>, VectorAccessible { static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(VectorContainer.class); protected final List<VectorWrapper<?>> wrappers = Lists.newArrayList(); @@ -61,6 +63,10 @@ public class VectorContainer implements Iterable<VectorWrapper<?>>, VectorAccess add(vv, releasable); } + public <T extends ValueVector> T addOrGet(String name, MajorType type, Class<T> clazz){ + return null; + } + /** * Get a set of transferred clones of this container. Note that this guarantees that the vectors in the cloned * container have the same TypedFieldIds as the existing container, allowing interchangeability in generated code. 
In @@ -94,7 +100,7 @@ public class VectorContainer implements Iterable<VectorWrapper<?>>, VectorAccess schema = null; int i = wrappers.size(); wrappers.add(SimpleVectorWrapper.create(vv)); - return new TypedFieldId(vv.getField().getType(), i, false); + return new TypedFieldId(vv.getField().getType(), i); } public void add(ValueVector[] hyperVector) { @@ -129,29 +135,33 @@ public class VectorContainer implements Iterable<VectorWrapper<?>>, VectorAccess public TypedFieldId getValueVectorId(SchemaPath path) { for (int i = 0; i < wrappers.size(); i++) { VectorWrapper<?> va = wrappers.get(i); - SchemaPath w = va.getField().getAsSchemaPath(); - if (w.equals(path)){ - return new TypedFieldId(va.getField().getType(), i, va.isHyper()); + TypedFieldId id = va.getFieldIdIfMatches(i, path); + if(id != null){ + return id; } } - if(path.getRootSegment().isNamed() && path.getRootSegment().getNameSegment().getPath().equals("_MAP") && path.getRootSegment().isLastPath()) throw new UnsupportedOperationException("Drill does not yet support map references."); return null; } + + @Override - public VectorWrapper<?> getValueAccessorById(int fieldId, Class<?> clazz) { - VectorWrapper<?> va = wrappers.get(fieldId); - if(va!= null && clazz == null){ - return (VectorWrapper<?>) va; - } - if (va != null && va.getVectorClass() != clazz) { + public VectorWrapper<?> getValueAccessorById(Class<?> clazz, int... fieldIds) { + Preconditions.checkArgument(fieldIds.length >= 1); + VectorWrapper<?> va = wrappers.get(fieldIds[0]); + + if(va == null) return null; + + if (fieldIds.length == 1 && clazz != null && !clazz.isAssignableFrom(va.getVectorClass())) { throw new IllegalStateException(String.format( "Failure while reading vector. Expected vector class of %s but was holding vector class %s.", clazz.getCanonicalName(), va.getVectorClass().getCanonicalName())); } - return (VectorWrapper<?>) va; + + return (VectorWrapper<?>) va.getChildWrapper(fieldIds); + } public BatchSchema getSchema() { diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorWrapper.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorWrapper.java index 401b50e36..dc8ffe582 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorWrapper.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorWrapper.java @@ -17,11 +17,14 @@ */ package org.apache.drill.exec.record; +import org.apache.drill.common.expression.SchemaPath; import org.apache.drill.exec.vector.ValueVector; public interface VectorWrapper<T extends ValueVector> { + + public Class<T> getVectorClass(); public MaterializedField getField(); public T getValueVector(); @@ -29,4 +32,11 @@ public interface VectorWrapper<T extends ValueVector> { public boolean isHyper(); public void clear(); public VectorWrapper<T> cloneAndTransfer(); + public VectorWrapper<?> getChildWrapper(int[] ids); + + /** + * Traverse the object graph and determine whether the provided SchemaPath matches data within the Wrapper. If so, return a TypedFieldId associated with this path. 
+ * @return TypedFieldId + */ + public TypedFieldId getFieldIdIfMatches(int id, SchemaPath expectedPath); } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/WritableBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/WritableBatch.java index 396834c49..4ff370887 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/WritableBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/WritableBatch.java @@ -18,12 +18,14 @@ package org.apache.drill.exec.record; import io.netty.buffer.ByteBuf; +import io.netty.buffer.CompositeByteBuf; import java.util.List; -import io.netty.buffer.CompositeByteBuf; -import org.apache.drill.exec.proto.UserBitShared.FieldMetadata; +import javax.jdo.metadata.FieldMetadata; + import org.apache.drill.exec.proto.UserBitShared.RecordBatchDef; +import org.apache.drill.exec.proto.UserBitShared.SerializedField; import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode; import org.apache.drill.exec.vector.ValueVector; @@ -63,7 +65,7 @@ public class WritableBatch { Preconditions.checkState(!cleared, "Attempted to reconstruct a container from a WritableBatch after it had been cleared"); if (buffers.length > 0) { /* If we have ByteBuf's associated with value vectors */ - + CompositeByteBuf cbb = new CompositeByteBuf(buffers[0].alloc(), true, buffers.length); /* Copy data from each buffer into the compound buffer */ @@ -71,8 +73,7 @@ public class WritableBatch { cbb.addComponent(buf); } - - List<FieldMetadata> fields = def.getFieldList(); + List<SerializedField> fields = def.getFieldList(); int bufferOffset = 0; @@ -82,7 +83,7 @@ public class WritableBatch { int vectorIndex = 0; for (VectorWrapper<?> vv : container) { - FieldMetadata fmd = fields.get(vectorIndex); + SerializedField fmd = fields.get(vectorIndex); ValueVector v = vv.getValueVector(); ByteBuf bb = cbb.slice(bufferOffset, fmd.getBufferLength()); // v.load(fmd, cbb.slice(bufferOffset, fmd.getBufferLength())); @@ -127,7 +128,7 @@ public class WritableBatch { public static WritableBatch getBatchNoHV(int recordCount, Iterable<ValueVector> vectors, boolean isSV2) { List<ByteBuf> buffers = Lists.newArrayList(); - List<FieldMetadata> metadata = Lists.newArrayList(); + List<SerializedField> metadata = Lists.newArrayList(); for (ValueVector vv : vectors) { metadata.add(vv.getMetadata()); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONFormatPlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONFormatPlugin.java index 872052cd0..04a976819 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONFormatPlugin.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONFormatPlugin.java @@ -44,15 +44,15 @@ public class JSONFormatPlugin extends EasyFormatPlugin<JSONFormatConfig> { public JSONFormatPlugin(String name, DrillbitContext context, DrillFileSystem fs, StoragePluginConfig storageConfig) { this(name, context, fs, storageConfig, new JSONFormatConfig()); } - + public JSONFormatPlugin(String name, DrillbitContext context, DrillFileSystem fs, StoragePluginConfig config, JSONFormatConfig formatPluginConfig) { super(name, context, fs, config, formatPluginConfig, true, false, false, false, Lists.newArrayList("json"), "json"); } - + @Override - public RecordReader getRecordReader(FragmentContext context, FileWork fileWork, + public RecordReader getRecordReader(FragmentContext context, FileWork fileWork, List<SchemaPath> columns) 
throws ExecutionSetupException { - return new JSONRecordReader(context, fileWork.getPath(), this.getFileSystem().getUnderlying(), columns); + return new JSONRecordReader2(context, fileWork.getPath(), this.getFileSystem().getUnderlying(), columns); } @Override @@ -78,6 +78,6 @@ public class JSONFormatPlugin extends EasyFormatPlugin<JSONFormatConfig> { return true; return false; } - + } } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONRecordReader.java deleted file mode 100644 index 1c8539cc0..000000000 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONRecordReader.java +++ /dev/null @@ -1,532 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.drill.exec.store.easy.json; - -import static com.fasterxml.jackson.core.JsonToken.END_ARRAY; -import static com.fasterxml.jackson.core.JsonToken.END_OBJECT; -import static com.fasterxml.jackson.core.JsonToken.FIELD_NAME; - -import java.io.IOException; -import java.nio.charset.Charset; -import java.util.List; -import java.util.Map; - -import org.apache.drill.common.exceptions.DrillRuntimeException; -import org.apache.drill.common.exceptions.ExecutionSetupException; -import org.apache.drill.common.expression.ExpressionPosition; -import org.apache.drill.common.expression.FieldReference; -import org.apache.drill.common.expression.SchemaPath; -import org.apache.drill.common.types.TypeProtos.MajorType; -import org.apache.drill.common.types.TypeProtos.MinorType; -import org.apache.drill.common.types.Types; -import org.apache.drill.exec.exception.SchemaChangeException; -import org.apache.drill.exec.expr.TypeHelper; -import org.apache.drill.exec.expr.holders.NullableBitHolder; -import org.apache.drill.exec.expr.holders.NullableFloat4Holder; -import org.apache.drill.exec.expr.holders.NullableIntHolder; -import org.apache.drill.exec.memory.BufferAllocator; -import org.apache.drill.exec.memory.OutOfMemoryException; -import org.apache.drill.exec.ops.FragmentContext; -import org.apache.drill.exec.physical.impl.OutputMutator; -import org.apache.drill.exec.record.MaterializedField; -import org.apache.drill.exec.schema.DiffSchema; -import org.apache.drill.exec.schema.Field; -import org.apache.drill.exec.schema.NamedField; -import org.apache.drill.exec.schema.ObjectSchema; -import org.apache.drill.exec.schema.RecordSchema; -import org.apache.drill.exec.schema.SchemaIdGenerator; -import org.apache.drill.exec.schema.json.jackson.JacksonHelper; -import org.apache.drill.exec.store.RecordReader; -import org.apache.drill.exec.store.VectorHolder; -import org.apache.drill.exec.vector.*; -import org.apache.hadoop.fs.FileSystem; -import 
org.apache.hadoop.fs.Path; - -import com.fasterxml.jackson.core.JsonFactory; -import com.fasterxml.jackson.core.JsonParser; -import com.fasterxml.jackson.core.JsonToken; -import com.google.common.collect.Iterables; -import com.google.common.collect.Lists; -import com.google.common.collect.Maps; - -public class JSONRecordReader implements RecordReader { - static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(JSONRecordReader.class); - private static final int DEFAULT_LENGTH = 4000; - public static final Charset UTF_8 = Charset.forName("UTF-8"); - - private final Map<String, VectorHolder> valueVectorMap; - private final FileSystem fileSystem; - private final Path hadoopPath; - - private JsonParser parser; - private SchemaIdGenerator generator; - private DiffSchema diffSchema; - private RecordSchema currentSchema; - private List<Field> removedFields; - private OutputMutator outputMutator; - private int batchSize; - private final List<SchemaPath> columns; - - public JSONRecordReader(FragmentContext fragmentContext, String inputPath, FileSystem fileSystem, int batchSize, - List<SchemaPath> columns) throws OutOfMemoryException { - this.hadoopPath = new Path(inputPath); - this.fileSystem = fileSystem; - this.batchSize = batchSize; - valueVectorMap = Maps.newHashMap(); - this.columns = columns; - } - - public JSONRecordReader(FragmentContext fragmentContext, String inputPath, FileSystem fileSystem, - List<SchemaPath> columns) throws OutOfMemoryException { - this(fragmentContext, inputPath, fileSystem, DEFAULT_LENGTH, columns); - } - - private JsonParser getParser() { - return parser; - } - - @Override - public void setup(OutputMutator output) throws ExecutionSetupException { - outputMutator = output; - output.removeAllFields(); - currentSchema = new ObjectSchema(); - diffSchema = new DiffSchema(); - removedFields = Lists.newArrayList(); - - try { - JsonFactory factory = new JsonFactory(); - parser = factory.createJsonParser(fileSystem.open(hadoopPath)); - parser.nextToken(); // Read to the first START_OBJECT token - generator = new SchemaIdGenerator(); - } catch (IOException e) { - throw new ExecutionSetupException(e); - } - } - - @Override - public int next() { - if (parser.isClosed() || !parser.hasCurrentToken()) { - return 0; - } - - resetBatch(); - - int nextRowIndex = 0; - - try { - while (ReadType.OBJECT.readRecord(this, null, nextRowIndex++, 0)) { - parser.nextToken(); // Read to START_OBJECT token - - if (!parser.hasCurrentToken()) { - parser.close(); - break; - } - } - - parser.nextToken(); - - if (!parser.hasCurrentToken()) { - parser.close(); - } - - // Garbage collect fields never referenced in this batch - for (Field field : Iterables.concat(currentSchema.removeUnreadFields(), removedFields)) { - diffSchema.addRemovedField(field); - outputMutator.removeField(field.getAsMaterializedField()); - } - - if (diffSchema.isChanged()) { - outputMutator.setNewSchema(); - } - - - } catch (IOException | SchemaChangeException e) { - logger.error("Error reading next in Json reader", e); - throw new DrillRuntimeException(e); - } - - for (VectorHolder holder : valueVectorMap.values()) { - if (holder.isRepeated()) { - holder.setGroupCount(nextRowIndex); - } - holder.getValueVector().getMutator().setValueCount(nextRowIndex); - } - - return nextRowIndex; - } - - private void resetBatch() { - for (VectorHolder value : valueVectorMap.values()) { - value.reset(); - } - - currentSchema.resetMarkedFields(); - diffSchema.reset(); - removedFields.clear(); - } - - @Override - public void 
cleanup() { - try { - parser.close(); - } catch (IOException e) { - logger.warn("Error closing Json parser", e); - } - } - - - private RecordSchema getCurrentSchema() { - return currentSchema; - } - - private void setCurrentSchema(RecordSchema schema) { - currentSchema = schema; - } - - private List<Field> getRemovedFields() { - return removedFields; - } - - private boolean fieldSelected(String field){ - - SchemaPath sp = SchemaPath.getCompoundPath(field.split("\\.")); - if (this.columns != null && this.columns.size() > 0){ - for (SchemaPath expr : this.columns){ - if ( sp.equals(expr)){ - return true; - } - } - return false; - } - return true; - } - - public static enum ReadType { - ARRAY(END_ARRAY) { - @Override - public Field createField(RecordSchema parentSchema, String prefixFieldName, String fieldName, MajorType fieldType, int index) { - return new NamedField(parentSchema, prefixFieldName, fieldName, fieldType); - } - - @Override - public RecordSchema createSchema() throws IOException { - return new ObjectSchema(); - } - }, - OBJECT(END_OBJECT) { - @Override - public Field createField(RecordSchema parentSchema, - String prefixFieldName, - String fieldName, - MajorType fieldType, - int index) { - return new NamedField(parentSchema, prefixFieldName, fieldName, fieldType); - } - - @Override - public RecordSchema createSchema() throws IOException { - return new ObjectSchema(); - } - }; - - private final JsonToken endObject; - - ReadType(JsonToken endObject) { - this.endObject = endObject; - } - - public JsonToken getEndObject() { - return endObject; - } - - @SuppressWarnings("ConstantConditions") - public boolean readRecord(JSONRecordReader reader, - String prefixFieldName, - int rowIndex, - int groupCount) throws IOException, SchemaChangeException { - JsonParser parser = reader.getParser(); - JsonToken token = parser.nextToken(); - JsonToken endObject = getEndObject(); - int colIndex = 0; - boolean isFull = false; - while (token != endObject) { - if (token == FIELD_NAME) { - token = parser.nextToken(); - continue; - } - - String fieldName = parser.getCurrentName(); - if ( fieldName != null && ! 
reader.fieldSelected(fieldName)){ - // this field was not requested in the query - token = parser.nextToken(); - colIndex += 1; - continue; - } - MajorType fieldType = JacksonHelper.getFieldType(token, this == ReadType.ARRAY); - ReadType readType = null; - switch (token) { - case START_ARRAY: - readType = ReadType.ARRAY; - groupCount++; - break; - case START_OBJECT: - readType = ReadType.OBJECT; - groupCount = 0; - break; - } - - if (fieldType != null) { // Including nulls - boolean currentFieldFull = !recordData( - readType, - reader, - fieldType, - prefixFieldName, - fieldName, - rowIndex, - colIndex, - groupCount); - if(readType == ReadType.ARRAY) { - groupCount--; - } - isFull = isFull || currentFieldFull; - } - token = parser.nextToken(); - colIndex += 1; - } - return !isFull; - } - - private void removeChildFields(List<Field> removedFields, Field field) { - RecordSchema schema = field.getAssignedSchema(); - if (schema == null) { - return; - } - for (Field childField : schema.getFields()) { - removedFields.add(childField); - if (childField.hasSchema()) { - removeChildFields(removedFields, childField); - } - } - } - - private boolean recordData(JSONRecordReader.ReadType readType, - JSONRecordReader reader, - MajorType fieldType, - String prefixFieldName, - String fieldName, - int rowIndex, - int colIndex, - int groupCount) throws IOException, SchemaChangeException { - RecordSchema currentSchema = reader.getCurrentSchema(); - Field field = currentSchema.getField(fieldName == null ? prefixFieldName : fieldName, colIndex); - boolean isFieldFound = field != null; - List<Field> removedFields = reader.getRemovedFields(); - boolean newFieldLateBound = fieldType.getMinorType().equals(MinorType.LATE); - - if (isFieldFound && !field.getFieldType().equals(fieldType)) { - boolean existingFieldLateBound = field.getFieldType().getMinorType().equals(MinorType.LATE); - - if (newFieldLateBound && !existingFieldLateBound) { - fieldType = Types.overrideMinorType(fieldType, field.getFieldType().getMinorType()); - } else if (!newFieldLateBound && existingFieldLateBound) { - field.setFieldType(Types.overrideMinorType(field.getFieldType(), fieldType.getMinorType())); - } else if (!newFieldLateBound && !existingFieldLateBound) { - if (field.hasSchema()) { - removeChildFields(removedFields, field); - } - removedFields.add(field); - currentSchema.removeField(field, colIndex); - - isFieldFound = false; - } - } - - if (!isFieldFound) { - field = createField( - currentSchema, - prefixFieldName, - fieldName, - fieldType, - colIndex - ); - - reader.recordNewField(field); - currentSchema.addField(field); - } - - field.setRead(true); - - VectorHolder holder = getOrCreateVectorHolder(reader, field); - if (readType != null) { - RecordSchema fieldSchema = field.getAssignedSchema(); - RecordSchema newSchema = readType.createSchema(); - - if (readType != ReadType.ARRAY) { - reader.setCurrentSchema(fieldSchema); - if (fieldSchema == null) reader.setCurrentSchema(newSchema); - readType.readRecord(reader, field.getFullFieldName(), rowIndex, groupCount); - } else { - readType.readRecord(reader, field.getFullFieldName(), rowIndex, groupCount); - } - - reader.setCurrentSchema(currentSchema); - - } else if (holder != null && !newFieldLateBound && fieldType.getMinorType() != MinorType.LATE) { - return addValueToVector( - rowIndex, - holder, - JacksonHelper.getValueFromFieldType( - reader.getParser(), - fieldType.getMinorType() - ), - fieldType.getMinorType(), - groupCount - ); - } - - return true; - } - - private static <T> 
boolean addValueToVector(int index, VectorHolder holder, T val, MinorType minorType, int groupCount) { - switch (minorType) { - case BIGINT: { - holder.incAndCheckLength(NullableIntHolder.WIDTH * 8 + 1); - if (groupCount == 0) { - if (val != null) { - NullableBigIntVector int4 = (NullableBigIntVector) holder.getValueVector(); - NullableBigIntVector.Mutator m = int4.getMutator(); - m.set(index, (Long) val); - } - } else { - if (val == null) { - throw new UnsupportedOperationException("Nullable repeated int is not supported."); - } - - RepeatedBigIntVector repeatedInt4 = (RepeatedBigIntVector) holder.getValueVector(); - RepeatedBigIntVector.Mutator m = repeatedInt4.getMutator(); - holder.setGroupCount(index); - m.add(index, (Long) val); - } - - return holder.hasEnoughSpace(NullableIntHolder.WIDTH * 8 + 1); - } - case FLOAT4: { - holder.incAndCheckLength(NullableFloat4Holder.WIDTH * 8 + 1); - if (groupCount == 0) { - if (val != null) { - NullableFloat4Vector float4 = (NullableFloat4Vector) holder.getValueVector(); - NullableFloat4Vector.Mutator m = float4.getMutator(); - m.set(index, (Float) val); - } - } else { - if (val == null) { - throw new UnsupportedOperationException("Nullable repeated float is not supported."); - } - - RepeatedFloat4Vector repeatedFloat4 = (RepeatedFloat4Vector) holder.getValueVector(); - RepeatedFloat4Vector.Mutator m = repeatedFloat4.getMutator(); - holder.setGroupCount(index); - m.add(index, (Float) val); - } - return holder.hasEnoughSpace(NullableFloat4Holder.WIDTH * 8 + 1); - } - case VARCHAR: { - if (val == null) { - return (index + 1) * 4 <= holder.getLength(); - } else { - byte[] bytes = ((String) val).getBytes(UTF_8); - int length = bytes.length; - holder.incAndCheckLength(length); - if (groupCount == 0) { - NullableVarCharVector varLen4 = (NullableVarCharVector) holder.getValueVector(); - NullableVarCharVector.Mutator m = varLen4.getMutator(); - m.set(index, bytes); - } else { - RepeatedVarCharVector repeatedVarLen4 = (RepeatedVarCharVector) holder.getValueVector(); - RepeatedVarCharVector.Mutator m = repeatedVarLen4.getMutator(); - holder.setGroupCount(index); - m.add(index, bytes); - } - return holder.hasEnoughSpace(length + 4 + 1); - } - } - case BIT: { - holder.incAndCheckLength(NullableBitHolder.WIDTH + 1); - if (groupCount == 0) { - if (val != null) { - NullableBitVector bit = (NullableBitVector) holder.getValueVector(); - NullableBitVector.Mutator m = bit.getMutator(); - m.set(index, (Boolean) val ? 1 : 0); - } - } else { - if (val == null) { - throw new UnsupportedOperationException("Nullable repeated boolean is not supported."); - } - - RepeatedBitVector repeatedBit = (RepeatedBitVector) holder.getValueVector(); - RepeatedBitVector.Mutator m = repeatedBit.getMutator(); - holder.setGroupCount(index); - m.add(index, (Boolean) val ? 1 : 0); - } - return holder.hasEnoughSpace(NullableBitHolder.WIDTH + 1); - } - default: - throw new DrillRuntimeException("Type not supported to add value. 
Type: " + minorType); - } - } - - private VectorHolder getOrCreateVectorHolder(JSONRecordReader reader, Field field) throws SchemaChangeException { - return reader.getOrCreateVectorHolder(field); - } - - public abstract RecordSchema createSchema() throws IOException; - - public abstract Field createField(RecordSchema parentSchema, - String prefixFieldName, - String fieldName, - MajorType fieldType, - int index); - } - - private void recordNewField(Field field) { - diffSchema.recordNewField(field); - } - - private VectorHolder getOrCreateVectorHolder(Field field) throws SchemaChangeException { - String fullFieldName = field.getFullFieldName(); - VectorHolder holder = valueVectorMap.get(fullFieldName); - - if (holder == null) { - MajorType type = field.getFieldType(); - MinorType minorType = type.getMinorType(); - - if (minorType.equals(MinorType.MAP) || minorType.equals(MinorType.LATE)) { - return null; - } - - MaterializedField f = MaterializedField.create(SchemaPath.getCompoundPath(fullFieldName.split("\\.")), type); - - ValueVector v = outputMutator.addField(f, TypeHelper.getValueVectorClass(minorType, type.getMode())); - AllocationHelper.allocate(v, batchSize, 50); - holder = new VectorHolder(v); - valueVectorMap.put(fullFieldName, holder); - return holder; - } - return holder; - } -} diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONRecordReader2.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONRecordReader2.java new file mode 100644 index 000000000..bb52a20e8 --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONRecordReader2.java @@ -0,0 +1,116 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.drill.exec.store.easy.json; + +import java.io.IOException; +import java.io.InputStream; +import java.util.List; + +import org.apache.drill.common.exceptions.DrillRuntimeException; +import org.apache.drill.common.exceptions.ExecutionSetupException; +import org.apache.drill.common.expression.SchemaPath; +import org.apache.drill.exec.exception.SchemaChangeException; +import org.apache.drill.exec.memory.OutOfMemoryException; +import org.apache.drill.exec.ops.FragmentContext; +import org.apache.drill.exec.physical.impl.OutputMutator; +import org.apache.drill.exec.store.RecordReader; +import org.apache.drill.exec.vector.complex.fn.JsonReader; +import org.apache.drill.exec.vector.complex.fn.JsonRecordSplitter; +import org.apache.drill.exec.vector.complex.fn.UTF8JsonRecordSplitter; +import org.apache.drill.exec.vector.complex.impl.VectorContainerWriter; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; + +import com.google.hive12.common.collect.Lists; + +public class JSONRecordReader2 implements RecordReader{ + static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(JSONRecordReader2.class); + + private OutputMutator mutator; + private VectorContainerWriter writer; + private Path hadoopPath; + private FileSystem fileSystem; + private InputStream stream; + private JsonReader jsonReader; + + public JSONRecordReader2(FragmentContext fragmentContext, String inputPath, FileSystem fileSystem, + List<SchemaPath> columns) throws OutOfMemoryException { + this.hadoopPath = new Path(inputPath); + this.fileSystem = fileSystem; + } + + @Override + public void setup(OutputMutator output) throws ExecutionSetupException { + try{ + stream = fileSystem.open(hadoopPath); + JsonRecordSplitter splitter = new UTF8JsonRecordSplitter(stream); + this.writer = new VectorContainerWriter(output); + this.mutator = output; + jsonReader = new JsonReader(splitter); + }catch(IOException e){ + throw new ExecutionSetupException("Failure reading JSON file.", e); + } + } + + @Override + public int next() { + writer.allocate(); + writer.reset(); + + int i =0; + + try{ + outside: while(true){ + writer.setPosition(i); + + switch(jsonReader.write(writer)){ + case WRITE_SUCCEED: + i++; + break; + + case NO_MORE: +// System.out.println("no more records - main loop"); + break outside; + + case WRITE_FAILED: +// System.out.println("==== hit bounds at " + i); + break outside; + }; + } + + + writer.setValueCount(i); + mutator.setNewSchema(); + return i; + + }catch(IOException | SchemaChangeException e){ + throw new DrillRuntimeException("Failure while reading JSON file.", e); + } + + } + + @Override + public void cleanup() { + try { + stream.close(); + } catch (IOException e) { + logger.warn("Failure while closing stream.", e); + } + } + +} diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/hive/HiveRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/hive/HiveRecordReader.java index 25931db65..9e0126870 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/hive/HiveRecordReader.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/hive/HiveRecordReader.java @@ -224,7 +224,7 @@ public class HiveRecordReader implements RecordReader { PrimitiveCategory pCat = primitiveCategories.get(i); MajorType type = getMajorType(pCat); MaterializedField field = MaterializedField.create(SchemaPath.getSimplePath(columnNames.get(i)), type); - ValueVector vv = output.addField(field, 
TypeHelper.getValueVectorClass(type.getMinorType(), type.getMode())); + ValueVector vv = output.addField(field, (Class<? extends ValueVector>) TypeHelper.getValueVectorClass(type.getMinorType(), type.getMode())); vectors.add(vv); } for (int i = 0; i < selectedPartitionNames.size(); i++) { diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockRecordReader.java index eb9e7a693..5c07dc5a4 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockRecordReader.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockRecordReader.java @@ -76,7 +76,7 @@ public class MockRecordReader implements RecordReader { for (int i = 0; i < config.getTypes().length; i++) { MajorType type = config.getTypes()[i].getMajorType(); - valueVectors[i] = output.addField(getVector(config.getTypes()[i].getName(), type, batchRecordCount), TypeHelper.getValueVectorClass(type.getMinorType(), type.getMode())); + valueVectors[i] = output.addField(getVector(config.getTypes()[i].getName(), type, batchRecordCount), (Class<? extends ValueVector>) TypeHelper.getValueVectorClass(type.getMinorType(), type.getMode())); } output.setNewSchema(); } catch (SchemaChangeException e) { diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordReader.java index 75cd799fc..5d2845627 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordReader.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordReader.java @@ -25,6 +25,7 @@ import java.util.HashMap; import java.util.List; import com.google.common.base.Preconditions; + import org.apache.drill.common.exceptions.DrillRuntimeException; import org.apache.drill.common.exceptions.ExecutionSetupException; import org.apache.drill.common.expression.ExpressionPosition; @@ -41,10 +42,6 @@ import org.apache.drill.exec.physical.impl.OutputMutator; import org.apache.drill.exec.record.MaterializedField; import org.apache.drill.exec.store.RecordReader; import org.apache.drill.exec.vector.*; -import org.apache.drill.exec.vector.NullableVarBinaryVector; -import org.apache.drill.exec.vector.NullableVarCharVector; -import org.apache.drill.exec.vector.VarBinaryVector; -import org.apache.drill.exec.vector.VarCharVector; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -263,7 +260,7 @@ public class ParquetRecordReader implements RecordReader { //convertedTypes.put() fieldFixedLength = column.getType() != PrimitiveType.PrimitiveTypeName.BINARY; - v = output.addField(field, TypeHelper.getValueVectorClass(type.getMinorType(), type.getMode())); + v = output.addField(field, (Class<? 
extends ValueVector>) TypeHelper.getValueVectorClass(type.getMinorType(), type.getMode())); if (column.getType() != PrimitiveType.PrimitiveTypeName.BINARY) { createFixedColumnReader(fieldFixedLength, column, columnChunkMetaData, recordsPerBatch, v, convertedType); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/AllocationHelper.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/AllocationHelper.java index 86aec44c8..f7a74c258 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/AllocationHelper.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/AllocationHelper.java @@ -19,11 +19,11 @@ package org.apache.drill.exec.vector; public class AllocationHelper { static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(AllocationHelper.class); - + public static void allocate(ValueVector v, int valueCount, int bytesPerValue){ allocate(v, valueCount, bytesPerValue, 5); } - + public static void allocate(ValueVector v, int valueCount, int bytesPerValue, int repeatedPerTop){ if(v instanceof FixedWidthVector){ ((FixedWidthVector) v).allocateNew(valueCount); @@ -34,7 +34,7 @@ public class AllocationHelper { }else if(v instanceof RepeatedVariableWidthVector){ ((RepeatedVariableWidthVector) v).allocateNew(valueCount * bytesPerValue * repeatedPerTop, valueCount, valueCount * repeatedPerTop); }else{ - throw new UnsupportedOperationException(); + v.allocateNew(); } } } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/BaseDataValueVector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/BaseDataValueVector.java index ddddab1a9..9641e6ae8 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/BaseDataValueVector.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/BaseDataValueVector.java @@ -17,21 +17,25 @@ */ package org.apache.drill.exec.vector; +import java.util.Iterator; + import io.netty.buffer.ByteBuf; import org.apache.drill.exec.memory.BufferAllocator; -import org.apache.drill.exec.proto.UserBitShared.FieldMetadata; +import org.apache.drill.exec.proto.UserBitShared.SerializedField; import org.apache.drill.exec.record.DeadBuf; import org.apache.drill.exec.record.MaterializedField; +import com.google.hive12.common.collect.Iterators; + public abstract class BaseDataValueVector extends BaseValueVector{ protected ByteBuf data = DeadBuf.DEAD_BUFFER; protected int valueCount; - + public BaseDataValueVector(MaterializedField field, BufferAllocator allocator) { super(field, allocator); - + } /** @@ -46,7 +50,7 @@ public abstract class BaseDataValueVector extends BaseValueVector{ } } - + @Override public ByteBuf[] getBuffers(){ ByteBuf[] out; @@ -60,18 +64,24 @@ public abstract class BaseDataValueVector extends BaseValueVector{ clear(); return out; } - + public int getBufferSize() { if(valueCount == 0) return 0; return data.writerIndex(); } @Override - public abstract FieldMetadata getMetadata(); + public abstract SerializedField getMetadata(); public ByteBuf getData(){ return data; } - - + + @Override + public Iterator<ValueVector> iterator() { + return Iterators.emptyIterator(); + } + + + } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/BaseValueVector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/BaseValueVector.java index 7cc1adff2..7a61475f8 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/BaseValueVector.java +++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/BaseValueVector.java @@ -17,14 +17,21 @@ */ package org.apache.drill.exec.vector; +import java.util.Iterator; + import org.apache.drill.common.expression.FieldReference; + import io.netty.buffer.ByteBuf; + import org.apache.drill.exec.memory.BufferAllocator; +import org.apache.drill.exec.proto.UserBitShared.SerializedField; import org.apache.drill.exec.record.MaterializedField; +import com.google.hive12.common.collect.Iterators; + public abstract class BaseValueVector implements ValueVector{ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(BaseValueVector.class); - + protected final BufferAllocator allocator; protected final MaterializedField field; @@ -32,21 +39,24 @@ public abstract class BaseValueVector implements ValueVector{ this.allocator = allocator; this.field = field; } - + @Override public void close() { clear(); } - + @Override public MaterializedField getField() { return field; } - + public MaterializedField getField(FieldReference ref){ return getField().clone(ref); } - + + protected SerializedField.Builder getMetadataBuilder(){ + return getField().getAsBuilder(); + } abstract public ByteBuf getData(); @@ -54,12 +64,15 @@ public abstract class BaseValueVector implements ValueVector{ public abstract int getValueCount(); public void reset(){} } - + abstract class BaseMutator implements Mutator{ public void reset(){} } - - - + + @Override + public Iterator<ValueVector> iterator() { + return Iterators.emptyIterator(); + } + } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/BitVector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/BitVector.java index 63384a328..597b0f1e8 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/BitVector.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/BitVector.java @@ -24,9 +24,12 @@ import org.apache.drill.exec.expr.holders.BitHolder; import org.apache.drill.exec.expr.holders.NullableBitHolder; import org.apache.drill.exec.memory.AccountingByteBuf; import org.apache.drill.exec.memory.BufferAllocator; -import org.apache.drill.exec.proto.UserBitShared.FieldMetadata; +import org.apache.drill.exec.memory.OutOfMemoryRuntimeException; +import org.apache.drill.exec.proto.UserBitShared.SerializedField; import org.apache.drill.exec.record.MaterializedField; import org.apache.drill.exec.record.TransferPair; +import org.apache.drill.exec.vector.complex.impl.BitReaderImpl; +import org.apache.drill.exec.vector.complex.reader.FieldReader; /** * Bit implements a vector of bit-width values. 
Elements in the vector are accessed by position from the logical start @@ -49,11 +52,10 @@ public final class BitVector extends BaseDataValueVector implements FixedWidthVe } @Override - public FieldMetadata getMetadata() { - return FieldMetadata.newBuilder() - .setDef(getField().getDef()) - .setValueCount(valueCount) - .setBufferLength( (int) Math.ceil(valueCount / 8.0)) + public SerializedField getMetadata() { + return field.getAsBuilder() // + .setValueCount(valueCount) // + .setBufferLength( (int) Math.ceil(valueCount / 8.0)) // .build(); } @@ -66,13 +68,26 @@ public final class BitVector extends BaseDataValueVector implements FixedWidthVe } public void allocateNew() { + if(!allocateNewSafe()) throw new OutOfMemoryRuntimeException(); + } + + public boolean allocateNewSafe() { clear(); if (allocationMonitor > 5) { allocationValueCount = Math.min(1, (int)(allocationValueCount * 0.9)); } else if (allocationMonitor < -5) { allocationValueCount = (int) (allocationValueCount * 1.1); } - allocateNew(allocationValueCount); + + clear(); + valueCapacity = allocationValueCount; + int valueSize = getSizeFromCount(allocationValueCount); + data = allocator.buffer(valueSize); + if(data == null) return false; + for (int i = 0; i < valueSize; i++) { + data.setByte(i, 0); + } + return true; } /** @@ -112,8 +127,8 @@ public final class BitVector extends BaseDataValueVector implements FixedWidthVe } @Override - public void load(FieldMetadata metadata, ByteBuf buffer) { - assert this.field.getDef().equals(metadata.getDef()); + public void load(SerializedField metadata, ByteBuf buffer) { + assert this.field.matches(metadata); int loaded = load(metadata.getValueCount(), buffer); assert metadata.getBufferLength() == loaded; } @@ -177,9 +192,6 @@ public final class BitVector extends BaseDataValueVector implements FixedWidthVe } } - private void copyTo(int startIndex, int length, BitVector target) { - - } private class TransferImpl implements TransferPair { BitVector to; @@ -205,8 +217,8 @@ public final class BitVector extends BaseDataValueVector implements FixedWidthVe } @Override - public void copyValue(int fromIndex, int toIndex) { - to.copyFrom(fromIndex, toIndex, BitVector.this); + public boolean copyValueSafe(int fromIndex, int toIndex) { + return to.copyFromSafe(fromIndex, toIndex, BitVector.this); } } @@ -233,7 +245,7 @@ public final class BitVector extends BaseDataValueVector implements FixedWidthVe } @Override - public final Object getObject(int index) { + public final Boolean getObject(int index) { return new Boolean(get(index) != 0); } @@ -245,9 +257,15 @@ public final class BitVector extends BaseDataValueVector implements FixedWidthVe holder.value = get(index); } - final void get(int index, NullableBitHolder holder) { + public final void get(int index, NullableBitHolder holder) { + holder.isSet = 1; holder.value = get(index); } + + @Override + public FieldReader getReader() { + return new BitReaderImpl(BitVector.this); + } } /** diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/RepeatedMutator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/RepeatedMutator.java index 8e097e439..ad2ba1b04 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/RepeatedMutator.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/RepeatedMutator.java @@ -18,5 +18,5 @@ package org.apache.drill.exec.vector; public interface RepeatedMutator extends ValueVector.Mutator { - public void startNewGroup(int index); + public boolean startNewGroup(int index); } 
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/ValueVector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/ValueVector.java index 258b354b3..8b871fc5a 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/ValueVector.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/ValueVector.java @@ -22,24 +22,34 @@ import io.netty.buffer.ByteBuf; import java.io.Closeable; import org.apache.drill.common.expression.FieldReference; -import org.apache.drill.exec.proto.UserBitShared.FieldMetadata; +import org.apache.drill.exec.memory.OutOfMemoryRuntimeException; +import org.apache.drill.exec.proto.UserBitShared.SerializedField; import org.apache.drill.exec.record.MaterializedField; import org.apache.drill.exec.record.TransferPair; +import org.apache.drill.exec.vector.complex.reader.FieldReader; /** * ValueVectorTypes defines a set of template-generated classes which implement type-specific value vectors. The * template approach was chosen due to the lack of multiple inheritence. It is also important that all related logic be * as efficient as possible. */ -public interface ValueVector extends Closeable { +public interface ValueVector extends Closeable, Iterable<ValueVector> { + /** * Allocate new buffers. ValueVector implements logic to determine how much to allocate. + * @throws OutOfMemoryRuntimeException Thrown if no memory can be allocated. */ - public void allocateNew(); + public void allocateNew() throws OutOfMemoryRuntimeException; + + /** + * Allocates new buffers. ValueVector implements logic to determine how much to allocate. + * @return Returns true if allocation was succesful. + */ + public boolean allocateNewSafe(); public int getBufferSize(); - + /** * Alternative to clear(). Allows use as closeable in try-with-resources. */ @@ -52,27 +62,26 @@ public interface ValueVector extends Closeable { /** * Get information about how this field is materialized. - * + * * @return */ public MaterializedField getField(); /** - * Get a transfer pair to allow transferring this vectors data between this vector and a destination vector of the same - * type. Will also generate a second instance of this vector class that is connected through the TransferPair. - * - * @return + * Get a transfer pair to allow transferring this vectors data between this vector and a destination vector of the + * same type. Will also generate a second instance of this vector class that is connected through the TransferPair. + * + * @return */ public TransferPair getTransferPair(); public TransferPair makeTransferPair(ValueVector to); - - + public TransferPair getTransferPair(FieldReference ref); /** * Given the current buffer allocation, return the maximum number of values that this buffer can contain. - * + * * @return Maximum values buffer can contain. In the case of a Repeated field, this is the number of atoms, not * repeated groups. */ @@ -80,37 +89,40 @@ public interface ValueVector extends Closeable { /** * Get Accessor to read value vector data. - * + * * @return */ public abstract Accessor getAccessor(); /** - * Return the underlying buffers associated with this vector. Note that this doesn't impact the - * reference counts for this buffer so it only should be used for in-context access. Also note - * that this buffer changes regularly thus external classes shouldn't hold a reference to - * it (unless they change it). + * Return the underlying buffers associated with this vector. 
Note that this doesn't impact the reference counts for + * this buffer so it only should be used for in-context access. Also note that this buffer changes regularly thus + * external classes shouldn't hold a reference to it (unless they change it). * * @return The underlying ByteBuf. */ public abstract ByteBuf[] getBuffers(); - + /** - * Load the data provided in the buffer. Typically used when deserializing from the wire. - * @param metadata Metadata used to decode the incoming buffer. - * @param buffer The buffer that contains the ValueVector. + * Load the data provided in the buffer. Typically used when deserializing from the wire. + * + * @param metadata + * Metadata used to decode the incoming buffer. + * @param buffer + * The buffer that contains the ValueVector. */ - public void load(FieldMetadata metadata, ByteBuf buffer); - + public void load(SerializedField metadata, ByteBuf buffer); + /** - * Get the metadata for this field. Used in serialization + * Get the metadata for this field. Used in serialization + * * @return FieldMetadata for this field. */ - public FieldMetadata getMetadata(); - + public SerializedField getMetadata(); + /** * Get a Mutator to update this vectors data. - * + * * @return */ public abstract Mutator getMutator(); @@ -125,23 +137,25 @@ public interface ValueVector extends Closeable { /** * Get the Java Object representation of the element at the specified position. Useful for testing. - * + * * @param index * Index of the value to get */ public abstract Object getObject(int index); public int getValueCount(); - + public boolean isNull(int index); public void reset(); + + public FieldReader getReader(); } public interface Mutator { /** * Set the top number values (optional/required) or number of value groupings (repeated) in this vector. - * + * * @param valueCount */ public void setValueCount(int valueCount); @@ -150,4 +164,5 @@ public interface ValueVector extends Closeable { public void generateTestData(int values); } + } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/accessor/GenericAccessor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/accessor/GenericAccessor.java index 32f08b0c8..adee17199 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/accessor/GenericAccessor.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/accessor/GenericAccessor.java @@ -40,6 +40,6 @@ public class GenericAccessor extends AbstractSqlAccessor { @Override TypeProtos.MajorType getType() { - return v.getMetadata().getDef().getMajorType(); + return v.getMetadata().getMajorType(); } } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/AbstractContainerVector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/AbstractContainerVector.java new file mode 100644 index 000000000..ab1d2707d --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/AbstractContainerVector.java @@ -0,0 +1,113 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.vector.complex; + +import org.apache.drill.common.expression.PathSegment; +import org.apache.drill.common.types.TypeProtos.MajorType; +import org.apache.drill.exec.record.TypedFieldId; +import org.apache.drill.exec.vector.ValueVector; + +public abstract class AbstractContainerVector implements ValueVector{ + static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(AbstractContainerVector.class); + + public abstract <T extends ValueVector> T addOrGet(String name, MajorType type, Class<T> clazz); + public abstract <T extends ValueVector> T get(String name, Class<T> clazz); + public abstract int size(); + + protected <T extends ValueVector> T typeify(ValueVector v, Class<T> clazz){ + if(clazz.isAssignableFrom(v.getClass())){ + return (T) v; + }else{ + throw new IllegalStateException(String.format("Vector requested [%s] was different than type stored [%s]. Drill doesn't yet support hetergenous types.", clazz.getSimpleName(), v.getClass().getSimpleName())); + } + } + + public abstract VectorWithOrdinal getVectorWithOrdinal(String name); + + + public TypedFieldId getFieldIdIfMatches(TypedFieldId.Builder builder, boolean addToBreadCrumb, PathSegment seg){ + if(seg == null){ + if(addToBreadCrumb) builder.intermediateType(this.getField().getType()); + return builder.finalType(this.getField().getType()).build(); + } + + if(seg.isArray()){ + + if(seg.isLastPath()){ + if(addToBreadCrumb) builder.intermediateType(this.getField().getType()); + return builder // + .remainder(seg) // + .finalType(this.getField().getType()) // + .withIndex() // + .build(); + }else{ + if(addToBreadCrumb){ + addToBreadCrumb = false; + builder.remainder(seg); + } + // this is a complex array reference, which means it doesn't correspond directly to a vector by itself. + seg = seg.getChild(); + + } + + }else{ + // name segment. + } + + VectorWithOrdinal vord = getVectorWithOrdinal(seg.isArray() ? null : seg.getNameSegment().getPath()); + if(vord == null) return null; + + + if(addToBreadCrumb){ + builder.intermediateType(this.getField().getType()); + builder.addId(vord.ordinal); + } + + ValueVector v = vord.vector; + + if(v instanceof AbstractContainerVector){ + // we're looking for a multi path. 
+ AbstractContainerVector c = (AbstractContainerVector) v; + return c.getFieldIdIfMatches(builder, addToBreadCrumb, seg.getChild()); + }else{ + if(seg.isLastPath()){ + if(addToBreadCrumb) builder.intermediateType(v.getField().getType()); + return builder.finalType(v.getField().getType()).build(); + }else{ + logger.warn("You tried to request a complex type inside a scalar object."); + return null; + } + } + + } + + protected boolean supportsDirectRead(){ + return false; + } + + protected class VectorWithOrdinal{ + final ValueVector vector; + final int ordinal; + + public VectorWithOrdinal(ValueVector v, int ordinal){ + this.vector = v; + this.ordinal = ordinal; + } + } +} + diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/AbstractMapVector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/AbstractMapVector.java new file mode 100644 index 000000000..f126e5ce7 --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/AbstractMapVector.java @@ -0,0 +1,22 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.vector.complex; + +public class AbstractMapVector { + static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(AbstractMapVector.class); +} diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/MapVector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/MapVector.java new file mode 100644 index 000000000..91c0be574 --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/MapVector.java @@ -0,0 +1,391 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.drill.exec.vector.complex; + +import io.netty.buffer.ByteBuf; + +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; + +import org.apache.drill.common.expression.FieldReference; +import org.apache.drill.common.expression.SchemaPath; +import org.apache.drill.common.types.TypeProtos.DataMode; +import org.apache.drill.common.types.TypeProtos.MajorType; +import org.apache.drill.common.types.TypeProtos.MinorType; +import org.apache.drill.exec.expr.TypeHelper; +import org.apache.drill.exec.expr.holders.ComplexHolder; +import org.apache.drill.exec.memory.BufferAllocator; +import org.apache.drill.exec.memory.OutOfMemoryRuntimeException; +import org.apache.drill.exec.proto.UserBitShared.SerializedField; +import org.apache.drill.exec.record.MaterializedField; +import org.apache.drill.exec.record.TransferPair; +import org.apache.drill.exec.vector.ValueVector; +import org.apache.drill.exec.vector.complex.RepeatedMapVector.MapSingleCopier; +import org.apache.drill.exec.vector.complex.impl.SingleMapReaderImpl; +import org.apache.drill.exec.vector.complex.reader.FieldReader; +import org.apache.drill.exec.vector.complex.writer.BaseWriter.MapWriter; + +import com.carrotsearch.hppc.IntObjectOpenHashMap; +import com.google.common.base.Preconditions; +import com.google.common.collect.Maps; +import com.google.hive12.common.collect.Lists; + +public class MapVector extends AbstractContainerVector { + static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(MapVector.class); + + public final static MajorType TYPE = MajorType.newBuilder().setMinorType(MinorType.MAP).setMode(DataMode.REQUIRED).build(); + + final HashMap<String, ValueVector> vectors = Maps.newHashMap(); + private final Map<String, VectorWithOrdinal> vectorIds = Maps.newHashMap(); + private final IntObjectOpenHashMap<ValueVector> vectorsById = new IntObjectOpenHashMap<>(); + private final SingleMapReaderImpl reader = new SingleMapReaderImpl(MapVector.this); + private final Accessor accessor = new Accessor(); + private final Mutator mutator = new Mutator(); + private final BufferAllocator allocator; + private MaterializedField field; + private int valueCount; + + public MapVector(String path, BufferAllocator allocator){ + this.field = MaterializedField.create(SchemaPath.getSimplePath(path), TYPE); + this.allocator = allocator; + } + public MapVector(MaterializedField field, BufferAllocator allocator){ + this.field = field; + this.allocator = allocator; + } + + public int size(){ + return vectors.size(); + } + + transient private MapTransferPair ephPair; + transient private MapSingleCopier ephPair2; + + public boolean copyFromSafe(int fromIndex, int thisIndex, MapVector from){ + if(ephPair == null || ephPair.from != from){ + ephPair = (MapTransferPair) from.makeTransferPair(this); + } + return ephPair.copyValueSafe(fromIndex, thisIndex); + } + + public boolean copyFromSafe(int fromSubIndex, int thisIndex, RepeatedMapVector from){ + if(ephPair2 == null || ephPair2.from != from){ + ephPair2 = from.makeSingularCopier(this); + } + return ephPair2.copySafe(fromSubIndex, thisIndex); + } + + @Override + public <T extends ValueVector> T addOrGet(String name, MajorType type, Class<T> clazz) { + ValueVector v = vectors.get(name); + + if(v == null){ + v = TypeHelper.getNewVector(field.getPath(), name, allocator, type); + Preconditions.checkNotNull(v, String.format("Failure to create vector of type %s.", type)); + put(name, v); + } + return typeify(v, clazz); + + } + + 
protected void put(String name, ValueVector vv){ + int ordinal = vectors.size(); + if(vectors.put(name, vv) != null){ + throw new IllegalStateException(); + } + vectorIds.put(name, new VectorWithOrdinal(vv, ordinal)); + vectorsById.put(ordinal, vv); + field.addChild(vv.getField()); + } + + + @Override + protected boolean supportsDirectRead() { + return true; + } + + public Iterator<String> fieldNameIterator(){ + return vectors.keySet().iterator(); + } + + @Override + public void allocateNew() throws OutOfMemoryRuntimeException { + if(!allocateNewSafe()) throw new OutOfMemoryRuntimeException(); + } + + @Override + public boolean allocateNewSafe() { + for(ValueVector v : vectors.values()){ + if(!v.allocateNewSafe()) return false; + } + return true; + } + + @Override + public <T extends ValueVector> T get(String name, Class<T> clazz) { + ValueVector v = vectors.get(name); + if(v == null) throw new IllegalStateException(String.format("Attempting to access invalid map field of name %s.", name)); + return typeify(v, clazz); + } + + @Override + public int getBufferSize() { + if(valueCount == 0 || vectors.isEmpty()) return 0; + long buffer = 0; + for(ValueVector v : this){ + buffer += v.getBufferSize(); + } + + return (int) buffer; + } + + @Override + public void close() { + for(ValueVector v : this){ + v.close(); + } + } + + @Override + public Iterator<ValueVector> iterator() { + return vectors.values().iterator(); + } + + @Override + public MaterializedField getField() { + return field; + } + + @Override + public TransferPair getTransferPair() { + return new MapTransferPair(field.getPath()); + } + + @Override + public TransferPair makeTransferPair(ValueVector to) { + return new MapTransferPair( (MapVector) to); + } + + @Override + public TransferPair getTransferPair(FieldReference ref) { + return new MapTransferPair(ref); + } + + private class MapTransferPair implements TransferPair{ + private MapVector from = MapVector.this; + private TransferPair[] pairs; + private MapVector to; + + public MapTransferPair(SchemaPath path){ + MapVector v = new MapVector(MaterializedField.create(path, TYPE), allocator); + pairs = new TransferPair[vectors.size()]; + int i =0; + for(Map.Entry<String, ValueVector> e : vectors.entrySet()){ + TransferPair otherSide = e.getValue().getTransferPair(); + v.put(e.getKey(), otherSide.getTo()); + pairs[i++] = otherSide; + } + this.to = v; + } + + public MapTransferPair(MapVector to){ + this.to = to; + pairs = new TransferPair[vectors.size()]; + int i =0; + for(Map.Entry<String, ValueVector> e : vectors.entrySet()){ + int preSize = to.vectors.size(); + ValueVector v = to.addOrGet(e.getKey(), e.getValue().getField().getType(), e.getValue().getClass()); + if(to.vectors.size() != preSize) v.allocateNew(); + pairs[i++] = e.getValue().makeTransferPair(v); + } + } + + + @Override + public void transfer() { + for(TransferPair p : pairs){ + p.transfer(); + } + to.valueCount = valueCount; + clear(); + } + + @Override + public ValueVector getTo() { + return to; + } + + @Override + public boolean copyValueSafe(int from, int to) { + for(TransferPair p : pairs){ + if(!p.copyValueSafe(from, to)) return false; + } + return true; + } + + @Override + public void splitAndTransfer(int startIndex, int length) { + throw new UnsupportedOperationException(); + } + + } + + @Override + public int getValueCapacity() { + if(this.vectors.isEmpty()) return 0; + return vectors.values().iterator().next().getValueCapacity(); + } + + @Override + public Accessor getAccessor() { + return accessor; + } + + 
@Override + public ByteBuf[] getBuffers() { + List<ByteBuf> bufs = Lists.newArrayList(); + for(ValueVector v : vectors.values()){ + for(ByteBuf b : v.getBuffers()){ + bufs.add(b); + } + } + return bufs.toArray(new ByteBuf[bufs.size()]); + } + + @Override + public void load(SerializedField metadata, ByteBuf buf) { + List<SerializedField> fields = metadata.getChildList(); + + int bufOffset = 0; + for (SerializedField fmd : fields) { + MaterializedField fieldDef = MaterializedField.create(fmd); + + ValueVector v = vectors.get(fieldDef.getLastName()); + if(v == null) { + // if we arrive here, we didn't have a matching vector. + + v = TypeHelper.getNewVector(fieldDef, allocator); + } + if (fmd.getValueCount() == 0){ + v.clear(); + } else { + v.load(fmd, buf.slice(bufOffset, fmd.getBufferLength())); + } + bufOffset += fmd.getBufferLength(); + put(fieldDef.getLastName(), v); + } + } + + @Override + public SerializedField getMetadata() { + SerializedField.Builder b = getField() // + .getAsBuilder() // + .setBufferLength(getBufferSize()) // + .setValueCount(valueCount); + + + for(ValueVector v : vectors.values()){ + b.addChild(v.getMetadata()); + } + return b.build(); + } + + @Override + public Mutator getMutator() { + return mutator; + } + + public class Accessor implements ValueVector.Accessor{ + + @Override + public Object getObject(int index) { + Map<String, Object> vv = Maps.newHashMap(); + for(Map.Entry<String, ValueVector> e : vectors.entrySet()){ + ValueVector v = e.getValue(); + String k = e.getKey(); + Object value = v.getAccessor().getObject(index); + if(value != null){ + vv.put(k, value); + } + } + return vv; + } + + public void get(int index, ComplexHolder holder){ + reader.setPosition(index); + holder.reader = reader; + } + + @Override + public int getValueCount() { + return valueCount; + } + + @Override + public boolean isNull(int index) { + return false; + } + + @Override + public void reset() { + } + + @Override + public FieldReader getReader() { + return new SingleMapReaderImpl(MapVector.this); + } + + } + + public ValueVector getVectorById(int id){ + return vectorsById.get(id); + } + + public class Mutator implements ValueVector.Mutator{ + + @Override + public void setValueCount(int valueCount) { + for(ValueVector v : vectors.values()){ + v.getMutator().setValueCount(valueCount); + } + MapVector.this.valueCount = valueCount; + } + + @Override + public void reset() { + } + + @Override + public void generateTestData(int values) { + } + + } + + @Override + public void clear() { + for(ValueVector v : vectors.values()){ + v.clear();; + } + } + + public VectorWithOrdinal getVectorWithOrdinal(String name){ + return vectorIds.get(name); + } +} diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/Positionable.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/Positionable.java new file mode 100644 index 000000000..6d86a6438 --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/Positionable.java @@ -0,0 +1,22 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.vector.complex; + +public interface Positionable { + public void setPosition(int index); +} diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/RepeatedListVector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/RepeatedListVector.java new file mode 100644 index 000000000..93930b50c --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/RepeatedListVector.java @@ -0,0 +1,407 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.vector.complex; + +import io.netty.buffer.ByteBuf; + +import java.util.Collections; +import java.util.Iterator; +import java.util.List; + +import org.apache.commons.lang3.ArrayUtils; +import org.apache.drill.common.expression.FieldReference; +import org.apache.drill.common.expression.SchemaPath; +import org.apache.drill.common.types.TypeProtos.MajorType; +import org.apache.drill.common.types.TypeProtos.MinorType; +import org.apache.drill.common.types.Types; +import org.apache.drill.exec.expr.TypeHelper; +import org.apache.drill.exec.expr.holders.ComplexHolder; +import org.apache.drill.exec.expr.holders.RepeatedListHolder; +import org.apache.drill.exec.memory.BufferAllocator; +import org.apache.drill.exec.memory.OutOfMemoryRuntimeException; +import org.apache.drill.exec.proto.UserBitShared.SerializedField; +import org.apache.drill.exec.record.MaterializedField; +import org.apache.drill.exec.record.TransferPair; +import org.apache.drill.exec.vector.RepeatedFixedWidthVector; +import org.apache.drill.exec.vector.UInt4Vector; +import org.apache.drill.exec.vector.ValueVector; +import org.apache.drill.exec.vector.complex.impl.NullReader; +import org.apache.drill.exec.vector.complex.impl.RepeatedListReaderImpl; +import org.apache.drill.exec.vector.complex.reader.FieldReader; + +import com.google.common.collect.Lists; +import com.google.hive12.common.base.Preconditions; + + +public class RepeatedListVector extends AbstractContainerVector implements RepeatedFixedWidthVector{ + + private final UInt4Vector offsets; // offsets to start of each record + private final BufferAllocator allocator; + private final Mutator mutator = new Mutator(); + private final Accessor accessor = new Accessor(); + private ValueVector vector; + private final MaterializedField field; + 
private final RepeatedListReaderImpl reader = new RepeatedListReaderImpl(null, this); + private int allocationValueCount = 4000; + private int allocationMonitor = 0; + + private int lastSet = 0; + + private int valueCount; + + public static MajorType TYPE = Types.repeated(MinorType.LIST); + + public RepeatedListVector(MaterializedField field, BufferAllocator allocator){ + this.allocator = allocator; + this.offsets = new UInt4Vector(null, allocator); + this.field = field; + } + + public int size(){ + return vector != null ? 1 : 0; + } + + public RepeatedListVector(SchemaPath path, BufferAllocator allocator){ + this(MaterializedField.create(path, TYPE), allocator); + } + + transient private RepeatedListTransferPair ephPair; + + public boolean copyFromSafe(int fromIndex, int thisIndex, RepeatedListVector from){ + if(ephPair == null || ephPair.from != from){ + ephPair = (RepeatedListTransferPair) from.makeTransferPair(this); + } + return ephPair.copyValueSafe(fromIndex, thisIndex); + } + + public Mutator getMutator(){ + return mutator; + } + + @Override + public void allocateNew() throws OutOfMemoryRuntimeException { + if(!allocateNewSafe()) throw new OutOfMemoryRuntimeException(); + } + + @Override + public boolean allocateNewSafe() { + if(!offsets.allocateNewSafe()) return false; + + if(vector != null){ + return vector.allocateNewSafe(); + }else{ + return true; + } + + } + + public class Mutator implements ValueVector.Mutator{ + + public void startNewGroup(int index) { + offsets.getMutator().set(index+1, offsets.getAccessor().get(index)); + } + + public int add(int index){ + int endOffset = index+1; + int currentChildOffset = offsets.getAccessor().get(endOffset); + int newChildOffset = currentChildOffset + 1; + boolean success = offsets.getMutator().setSafe(endOffset, newChildOffset); + lastSet = index; + if(!success) return -1; + + // this is done at beginning so return the currentChildOffset, not the new offset. 
+ return currentChildOffset; + + } + + @Override + public void setValueCount(int groupCount) { + populateEmpties(groupCount); + offsets.getMutator().setValueCount(groupCount+1); + + if(vector != null){ + int valueCount = offsets.getAccessor().get(groupCount); + vector.getMutator().setValueCount(valueCount); + } + } + + @Override + public void reset() { + lastSet = 0; + } + + @Override + public void generateTestData(int values) { + } + + } + + public class Accessor implements ValueVector.Accessor { + + @Override + public Object getObject(int index) { + List<Object> l = Lists.newArrayList(); + int end = offsets.getAccessor().get(index+1); + for(int i = offsets.getAccessor().get(index); i < end; i++){ + l.add(vector.getAccessor().getObject(i)); + } + return l; + } + + @Override + public int getValueCount() { + return offsets.getAccessor().getValueCount() - 1; + } + + public void get(int index, RepeatedListHolder holder){ + assert index <= getValueCapacity(); + holder.start = offsets.getAccessor().get(index); + holder.end = offsets.getAccessor().get(index+1); + } + + public void get(int index, ComplexHolder holder){ + FieldReader reader = getReader(); + reader.setPosition(index); + holder.reader = reader; + } + + public void get(int index, int arrayIndex, ComplexHolder holder){ + RepeatedListHolder h = new RepeatedListHolder(); + get(index, h); + int offset = h.start + arrayIndex; + + if(offset >= h.end){ + holder.reader = NullReader.INSTANCE; + }else{ + FieldReader r = vector.getAccessor().getReader(); + r.setPosition(offset); + holder.reader = r; + } + + } + + @Override + public boolean isNull(int index) { + return false; + } + + @Override + public void reset() { + } + + @Override + public FieldReader getReader() { + return reader; + } + + } + + @Override + public int getBufferSize() { + return offsets.getBufferSize() + vector.getBufferSize(); + } + + @Override + public void close() { + offsets.close(); + if(vector != null) vector.close(); + } + + @Override + public void clear() { + lastSet = 0; + offsets.clear(); + if(vector != null) vector.clear(); + } + + @Override + public MaterializedField getField() { + return field; + } + + @Override + public TransferPair getTransferPair() { + return new RepeatedListTransferPair(field.getPath()); + } + + + public class RepeatedListTransferPair implements TransferPair{ + private final RepeatedListVector from = RepeatedListVector.this; + private final RepeatedListVector to; + private final TransferPair vectorTransfer; + + private RepeatedListTransferPair(RepeatedListVector to){ + this.to = to; + if(to.vector == null){ + to.vector = to.addOrGet(null, vector.getField().getType(), vector.getClass()); + to.vector.allocateNew(); + } + this.vectorTransfer = vector.makeTransferPair(to.vector); + } + + private RepeatedListTransferPair(SchemaPath path){ + this.to = new RepeatedListVector(path, allocator); + vectorTransfer = vector.getTransferPair(); + this.to.vector = vectorTransfer.getTo(); + } + + @Override + public void transfer() { + offsets.transferTo(to.offsets); + vectorTransfer.transfer(); + to.valueCount = valueCount; + clear(); + } + + @Override + public ValueVector getTo() { + return to; + } + + @Override + public void splitAndTransfer(int startIndex, int length) { + throw new UnsupportedOperationException(); + } + + + @Override + public boolean copyValueSafe(int from, int to) { + RepeatedListHolder holder = new RepeatedListHolder(); + accessor.get(from, holder); + int newIndex = this.to.offsets.getAccessor().get(to); + //todo: make this a bulk copy. 
+ for(int i = holder.start; i < holder.end; i++, newIndex++){ + if(!vectorTransfer.copyValueSafe(i, newIndex)) return false; + } + if(!this.to.offsets.getMutator().setSafe(to, newIndex)) return false; + + return true; + } + + } + + @Override + public TransferPair makeTransferPair(ValueVector to) { + if(!(to instanceof RepeatedListVector ) ) throw new IllegalArgumentException("You can't make a transfer pair from an incompatible ."); + return new RepeatedListTransferPair( (RepeatedListVector) to); + } + + @Override + public TransferPair getTransferPair(FieldReference ref) { + return new RepeatedListTransferPair(ref); + } + + @Override + public int getValueCapacity() { + if(vector == null) return offsets.getValueCapacity() - 1; + return Math.min(offsets.getValueCapacity() - 1, vector.getValueCapacity()); + } + + @Override + public Accessor getAccessor() { + return accessor; + } + + @Override + public ByteBuf[] getBuffers() { + return ArrayUtils.addAll(offsets.getBuffers(), vector.getBuffers()); + } + + private void setVector(ValueVector v){ + field.addChild(v.getField()); + this.vector = v; + } + + @Override + public void load(SerializedField metadata, ByteBuf buf) { + SerializedField childField = metadata.getChildList().get(0); + + int bufOffset = offsets.load(metadata.getValueCount()+1, buf); + + MaterializedField fieldDef = MaterializedField.create(childField); + if(vector == null) { + setVector(TypeHelper.getNewVector(fieldDef, allocator)); + } + + if (childField.getValueCount() == 0){ + vector.clear(); + } else { + vector.load(childField, buf.slice(bufOffset, childField.getBufferLength())); + } + } + + @Override + public SerializedField getMetadata() { + return getField() // + .getAsBuilder() // + .setBufferLength(getBufferSize()) // + .setValueCount(accessor.getValueCount()) // + .addChild(vector.getMetadata()) // + .build(); + } + + private void populateEmpties(int groupCount){ + int previousEnd = offsets.getAccessor().get(lastSet + 1); + for(int i = lastSet + 2; i <= groupCount; i++){ + offsets.getMutator().setSafe(i, previousEnd); + } + lastSet = groupCount - 1; + } + + @Override + public Iterator<ValueVector> iterator() { + return Collections.singleton(vector).iterator(); + } + + @Override + public <T extends ValueVector> T addOrGet(String name, MajorType type, Class<T> clazz) { + Preconditions.checkArgument(name == null); + + if(vector == null){ + vector = TypeHelper.getNewVector(MaterializedField.create(field.getPath().getUnindexedArrayChild(), type), allocator); + } + return typeify(vector, clazz); + } + + @Override + public <T extends ValueVector> T get(String name, Class<T> clazz) { + if(name != null) return null; + return typeify(vector, clazz); + } + + @Override + public void allocateNew(int parentValueCount, int childValueCount) { + clear(); + offsets.allocateNew(parentValueCount+1); + mutator.reset(); + accessor.reset(); + } + + @Override + public int load(int parentValueCount, int childValueCount, ByteBuf buf) { + throw new UnsupportedOperationException(); + } + + @Override + public VectorWithOrdinal getVectorWithOrdinal(String name) { + if(name != null) return null; + return new VectorWithOrdinal(vector, 0); + } + + +} diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/RepeatedMapVector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/RepeatedMapVector.java new file mode 100644 index 000000000..2492cc89f --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/RepeatedMapVector.java @@ -0,0 
+1,478 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.vector.complex; + +import io.netty.buffer.ByteBuf; + +import java.util.Iterator; +import java.util.List; +import java.util.Map; + +import org.apache.drill.common.expression.FieldReference; +import org.apache.drill.common.expression.SchemaPath; +import org.apache.drill.common.types.TypeProtos.DataMode; +import org.apache.drill.common.types.TypeProtos.MajorType; +import org.apache.drill.common.types.TypeProtos.MinorType; +import org.apache.drill.exec.expr.TypeHelper; +import org.apache.drill.exec.expr.holders.ComplexHolder; +import org.apache.drill.exec.expr.holders.RepeatedMapHolder; +import org.apache.drill.exec.memory.BufferAllocator; +import org.apache.drill.exec.memory.OutOfMemoryRuntimeException; +import org.apache.drill.exec.proto.UserBitShared.SerializedField; +import org.apache.drill.exec.record.MaterializedField; +import org.apache.drill.exec.record.TransferPair; +import org.apache.drill.exec.vector.RepeatedFixedWidthVector; +import org.apache.drill.exec.vector.UInt4Vector; +import org.apache.drill.exec.vector.ValueVector; +import org.apache.drill.exec.vector.complex.impl.NullReader; +import org.apache.drill.exec.vector.complex.impl.RepeatedMapReaderImpl; +import org.apache.drill.exec.vector.complex.reader.FieldReader; + +import com.carrotsearch.hppc.IntObjectOpenHashMap; +import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; + +public class RepeatedMapVector extends AbstractContainerVector implements RepeatedFixedWidthVector { + static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(RepeatedMapVector.class); + + public final static MajorType TYPE = MajorType.newBuilder().setMinorType(MinorType.MAP).setMode(DataMode.REPEATED).build(); + + private final UInt4Vector offsets; // offsets to start of each record + private final Map<String, ValueVector> vectors = Maps.newHashMap(); + private final Map<String, VectorWithOrdinal> vectorIds = Maps.newHashMap(); + private final RepeatedMapReaderImpl reader = new RepeatedMapReaderImpl(RepeatedMapVector.this); + private final IntObjectOpenHashMap<ValueVector> vectorsById = new IntObjectOpenHashMap<>(); + private final Accessor accessor = new Accessor(); + private final Mutator mutator = new Mutator(); + private final BufferAllocator allocator; + private final MaterializedField field; + private int lastSet = 0; + + public RepeatedMapVector(MaterializedField field, BufferAllocator allocator){ + this.field = field; + this.allocator = allocator; + this.offsets = new UInt4Vector(null, allocator); + + } + + @Override + public void allocateNew(int parentValueCount, int childValueCount) { + clear(); + 
offsets.allocateNew(parentValueCount+1); + mutator.reset(); + accessor.reset(); + } + + public Iterator<String> fieldNameIterator(){ + return vectors.keySet().iterator(); + } + + public int size(){ + return vectors.size(); + } + + @Override + public <T extends ValueVector> T addOrGet(String name, MajorType type, Class<T> clazz) { + ValueVector v = vectors.get(name); + + if(v == null){ + v = TypeHelper.getNewVector(field.getPath(), name, allocator, type); + Preconditions.checkNotNull(v, String.format("Failure to create vector of type %s.", type)); + put(name, v); + } + return typeify(v, clazz); + + } + + @Override + public <T extends ValueVector> T get(String name, Class<T> clazz) { + ValueVector v = vectors.get(name); + if(v == null) throw new IllegalStateException(String.format("Attempting to access invalid map field of name %s.", name)); + return typeify(v, clazz); + } + + @Override + public int getBufferSize() { + if(accessor.getValueCount() == 0 || vectors.isEmpty()) return 0; + long buffer = offsets.getBufferSize(); + for(ValueVector v : this){ + buffer += v.getBufferSize(); + } + + return (int) buffer; + } + + @Override + public void close() { + for(ValueVector v : this){ + v.close(); + } + } + + @Override + public Iterator<ValueVector> iterator() { + return vectors.values().iterator(); + } + + @Override + public MaterializedField getField() { + return field; + } + + @Override + public TransferPair getTransferPair() { + return new MapTransferPair(field.getPath()); + } + + @Override + public TransferPair makeTransferPair(ValueVector to) { + return new MapTransferPair( (RepeatedMapVector) to); + } + + MapSingleCopier makeSingularCopier(MapVector to){ + return new MapSingleCopier(to); + } + + + class MapSingleCopier{ + private final TransferPair[] pairs; + final RepeatedMapVector from = RepeatedMapVector.this; + + public MapSingleCopier(MapVector to){ + pairs = new TransferPair[vectors.size()]; + int i =0; + for(Map.Entry<String, ValueVector> e : vectors.entrySet()){ + int preSize = to.vectors.size(); + ValueVector v = to.addOrGet(e.getKey(), e.getValue().getField().getType(), e.getValue().getClass()); + if(to.vectors.size() != preSize) v.allocateNew(); + pairs[i++] = e.getValue().makeTransferPair(v); + } + } + + public boolean copySafe(int fromSubIndex, int toIndex){ + for(TransferPair p : pairs){ + if(!p.copyValueSafe(fromSubIndex, toIndex)) return false; + } + return true; + } + } + + @Override + public TransferPair getTransferPair(FieldReference ref) { + return new MapTransferPair(ref); + } + + @Override + public void allocateNew() throws OutOfMemoryRuntimeException { + if(!allocateNewSafe()) throw new OutOfMemoryRuntimeException(); + } + + @Override + public boolean allocateNewSafe() { + if(!offsets.allocateNewSafe()) return false; + for(ValueVector v : vectors.values()){ + if(!v.allocateNewSafe()) return false; + } + return true; + } + + private class MapTransferPair implements TransferPair{ + + private final TransferPair[] pairs; + private final RepeatedMapVector to; + private final RepeatedMapVector from = RepeatedMapVector.this; + + public MapTransferPair(SchemaPath path){ + RepeatedMapVector v = new RepeatedMapVector(MaterializedField.create(path, TYPE), allocator); + pairs = new TransferPair[vectors.size()]; + int i =0; + for(Map.Entry<String, ValueVector> e : vectors.entrySet()){ + TransferPair otherSide = e.getValue().getTransferPair(); + v.put(e.getKey(), otherSide.getTo()); + pairs[i++] = otherSide; + } + this.to = v; + } + + public MapTransferPair(RepeatedMapVector to){ 
+ this.to = to; + pairs = new TransferPair[vectors.size()]; + int i =0; + for(Map.Entry<String, ValueVector> e : vectors.entrySet()){ + ValueVector v = to.addOrGet(e.getKey(), e.getValue().getField().getType(), e.getValue().getClass()); + pairs[i++] = e.getValue().makeTransferPair(v); + } + } + + + @Override + public void transfer() { + offsets.transferTo(to.offsets); + for(TransferPair p : pairs){ + p.transfer(); + } + clear(); + } + + @Override + public ValueVector getTo() { + return to; + } + + @Override + public boolean copyValueSafe(int from, int to) { + RepeatedMapHolder holder = new RepeatedMapHolder(); + accessor.get(from, holder); + int newIndex = this.to.offsets.getAccessor().get(to); + //todo: make these bulk copies + for(int i = holder.start; i < holder.end; i++, newIndex++){ + for(TransferPair p : pairs){ + if(!p.copyValueSafe(from, to)) return false; + } + } + if(!this.to.offsets.getMutator().setSafe(to, newIndex)) return false; + return true; + } + + @Override + public void splitAndTransfer(int startIndex, int length) { + throw new UnsupportedOperationException(); + } + + } + + + transient private MapTransferPair ephPair; + + public boolean copyFromSafe(int fromIndex, int thisIndex, RepeatedMapVector from){ + if(ephPair == null || ephPair.from != from){ + ephPair = (MapTransferPair) from.makeTransferPair(this); + } + return ephPair.copyValueSafe(fromIndex, thisIndex); + } + + @Override + public int getValueCapacity() { + return offsets.getValueCapacity(); + } + + @Override + public Accessor getAccessor() { + return accessor; + } + + @Override + public ByteBuf[] getBuffers() { + List<ByteBuf> bufs = Lists.newArrayList(offsets.getBuffers()); + + for(ValueVector v : vectors.values()){ + for(ByteBuf b : v.getBuffers()){ + bufs.add(b); + } + } + return bufs.toArray(new ByteBuf[bufs.size()]); + } + + + @Override + public void load(SerializedField metadata, ByteBuf buf) { + List<SerializedField> fields = metadata.getChildList(); + + int bufOffset = offsets.load(metadata.getValueCount()+1, buf); + + for (SerializedField fmd : fields) { + MaterializedField fieldDef = MaterializedField.create(fmd); + + ValueVector v = vectors.get(fieldDef.getLastName()); + if(v == null) { + // if we arrive here, we didn't have a matching vector. 
+ + v = TypeHelper.getNewVector(fieldDef, allocator); + } + if (fmd.getValueCount() == 0){ + v.clear(); + } else { + v.load(fmd, buf.slice(bufOffset, fmd.getBufferLength())); + } + bufOffset += fmd.getBufferLength(); + put(fieldDef.getLastName(), v); + } + } + + @Override + public SerializedField getMetadata() { + SerializedField.Builder b = getField() // + .getAsBuilder() // + .setBufferLength(getBufferSize()) // + .setValueCount(accessor.getValueCount()); + + for(ValueVector v : vectors.values()){ + b.addChild(v.getMetadata()); + } + return b.build(); + } + + protected void put(String name, ValueVector vv){ + int ordinal = vectors.size(); + if(vectors.put(name, vv) != null){ + throw new IllegalStateException(); + } + vectorIds.put(name, new VectorWithOrdinal(vv, ordinal)); + vectorsById.put(ordinal, vv); + field.addChild(vv.getField()); + } + + + @Override + public Mutator getMutator() { + return mutator; + } + + public class Accessor implements ValueVector.Accessor{ + + @Override + public Object getObject(int index) { + List<Object> l = Lists.newArrayList(); + int end = offsets.getAccessor().get(index+1); + for(int i = offsets.getAccessor().get(index); i < end; i++){ + Map<String, Object> vv = Maps.newHashMap(); + for(Map.Entry<String, ValueVector> e : vectors.entrySet()){ + ValueVector v = e.getValue(); + String k = e.getKey(); + Object value = v.getAccessor().getObject(i); + if(value != null){ + vv.put(k,value); + } + } + l.add(vv); + } + return l; + } + + @Override + public int getValueCount() { + return offsets.getAccessor().getValueCount() - 1; + } + + public void get(int index, RepeatedMapHolder holder){ + assert index <= getValueCapacity(); + holder.start = offsets.getAccessor().get(index); + holder.end = offsets.getAccessor().get(index+1); + } + + public void get(int index, ComplexHolder holder){ + FieldReader reader = getReader(); + reader.setPosition(index); + holder.reader = reader; + } + + public void get(int index, int arrayIndex, ComplexHolder holder){ + RepeatedMapHolder h = new RepeatedMapHolder(); + get(index, h); + int offset = h.start + arrayIndex; + + if(offset >= h.end){ + holder.reader = NullReader.INSTANCE; + }else{ + reader.setSinglePosition(index, arrayIndex); + holder.reader = reader; + } + } + + @Override + public boolean isNull(int index) { + return false; + } + + @Override + public void reset() { + } + + @Override + public FieldReader getReader() { + return reader; + } + + } + + private void populateEmpties(int groupCount){ + int previousEnd = offsets.getAccessor().get(lastSet + 1); + for(int i = lastSet + 2; i <= groupCount; i++){ + offsets.getMutator().setSafe(i, previousEnd); + } + lastSet = groupCount - 1; + } + + public class Mutator implements ValueVector.Mutator{ + + public void startNewGroup(int index) { + populateEmpties(index); + lastSet = index; + offsets.getMutator().set(index+1, offsets.getAccessor().get(index)); + } + + public int add(int index){ + int nextOffset = offsets.getAccessor().get(index+1); + boolean success = offsets.getMutator().setSafe(index+1, nextOffset+1); + if(!success) return -1; + return nextOffset; + } + + @Override + public void setValueCount(int groupCount) { + populateEmpties(groupCount); + offsets.getMutator().setValueCount(groupCount+1); + int valueCount = offsets.getAccessor().get(groupCount); + for(ValueVector v : vectors.values()){ + v.getMutator().setValueCount(valueCount); + } + } + + @Override + public void reset() { + lastSet = 0; + } + + @Override + public void generateTestData(int values) { + } + + } + + 
@Override + public void clear() { + lastSet = 0; + offsets.clear(); + for(ValueVector v : vectors.values()){ + v.clear();; + } + } + + @Override + public int load(int parentValueCount, int childValueCount, ByteBuf buf) { + throw new UnsupportedOperationException(); + } + + + @Override + public VectorWithOrdinal getVectorWithOrdinal(String name) { + return vectorIds.get(name); + } + +} diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/StateTool.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/StateTool.java new file mode 100644 index 000000000..99f601056 --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/StateTool.java @@ -0,0 +1,31 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.vector.complex; + +import java.util.Arrays; + +public class StateTool { + static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(StateTool.class); + + public static <T extends Enum<?>> void check(T currentState, T... expectedStates){ + for(T s : expectedStates){ + if(s == currentState) return; + } + throw new IllegalArgumentException(String.format("Expected to be in one of these states %s but was actuall in state %s", Arrays.toString(expectedStates), currentState)); + } +} diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/WriteState.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/WriteState.java new file mode 100644 index 000000000..43dba6573 --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/WriteState.java @@ -0,0 +1,46 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.drill.exec.vector.complex; + +import org.apache.drill.exec.vector.complex.writer.FieldWriter; + + +public class WriteState { + static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(WriteState.class); + + private FieldWriter failPoint; + + public boolean isFailed(){ + return failPoint != null; + } + + public boolean isOk(){ + return failPoint == null; + } + + public void fail(FieldWriter w){ + assert failPoint == null; + failPoint = w; + +// System.out.println("Fail Point " + failPoint); + } + + public void reset(){ + failPoint = null; + } +} diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/JsonReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/JsonReader.java new file mode 100644 index 000000000..761bc797a --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/JsonReader.java @@ -0,0 +1,231 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.vector.complex.fn; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufInputStream; +import io.netty.buffer.UnpooledByteBufAllocator; + +import java.io.IOException; +import java.io.Reader; + +import org.apache.drill.exec.expr.holders.BigIntHolder; +import org.apache.drill.exec.expr.holders.BitHolder; +import org.apache.drill.exec.expr.holders.Float8Holder; +import org.apache.drill.exec.expr.holders.VarCharHolder; +import org.apache.drill.exec.vector.complex.writer.BaseWriter.ComplexWriter; +import org.apache.drill.exec.vector.complex.writer.BaseWriter.ListWriter; +import org.apache.drill.exec.vector.complex.writer.BaseWriter.MapWriter; + +import com.fasterxml.jackson.core.JsonFactory; +import com.fasterxml.jackson.core.JsonGenerator; +import com.fasterxml.jackson.core.JsonParseException; +import com.fasterxml.jackson.core.JsonParser; +import com.fasterxml.jackson.core.JsonToken; +import com.fasterxml.jackson.core.JsonParser.Feature; +import com.google.common.base.Charsets; + +public class JsonReader { + static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(JsonReader.class); + + public static enum WriteState { + WRITE_SUCCEED, WRITE_FAILED, NO_MORE + } + + private final JsonFactory factory = new JsonFactory(); + private ByteBufInputStream stream; + private long byteOffset; + private JsonRecordSplitter splitter; + private Reader reader; + private JsonParser parser; + + public JsonReader(JsonRecordSplitter splitter) throws JsonParseException, IOException { + this.splitter = splitter; + factory.configure(Feature.ALLOW_UNQUOTED_FIELD_NAMES, true); + factory.configure(Feature.ALLOW_COMMENTS, true); + reader = splitter.getNextReader(); + } + + public WriteState write(ComplexWriter writer) throws 
JsonParseException, IOException { + if(reader == null){ + reader = splitter.getNextReader(); + if(reader == null) return WriteState.NO_MORE; + + } + + parser = factory.createJsonParser(reader); + reader.mark(1024*128); + JsonToken t = parser.nextToken(); + while(!parser.hasCurrentToken()) t = parser.nextToken(); + + + switch (t) { + case START_OBJECT: + writeData(writer.rootAsMap()); + break; + case START_ARRAY: + writeData(writer.rootAsList()); + break; + case NOT_AVAILABLE: + return WriteState.NO_MORE; + default: + throw new JsonParseException( + String.format("Failure while parsing JSON. Found token of [%s] Drill currently only supports parsing " + + "json strings that contain either lists or maps. The root object cannot be a scalar.", + t), + parser.getCurrentLocation()); + } + + if (!writer.ok()) { + reader.reset(); + return WriteState.WRITE_FAILED; + } else { + reader = null; + return WriteState.WRITE_SUCCEED; + } + } + + + private void writeData(MapWriter map) throws JsonParseException, IOException { + // + map.start(); + outside: while(true){ + JsonToken t = parser.nextToken(); + if(t == JsonToken.NOT_AVAILABLE || t == JsonToken.END_OBJECT) return; + + assert t == JsonToken.FIELD_NAME : String.format("Expected FIELD_NAME but got %s.", t.name()); + final String fieldName = parser.getText(); + + + switch(parser.nextToken()){ + case START_ARRAY: + writeData(map.list(fieldName)); + break; + case START_OBJECT: + writeData(map.map(fieldName)); + break; + case END_OBJECT: + break outside; + + case VALUE_EMBEDDED_OBJECT: + case VALUE_FALSE: { + BitHolder h = new BitHolder(); + h.value = 0; + map.bit(fieldName).write(h); + break; + } + case VALUE_TRUE: { + BitHolder h = new BitHolder(); + h.value = 1; + map.bit(fieldName).write(h); + break; + } + case VALUE_NULL: + // do nothing as we don't have a type. + break; + case VALUE_NUMBER_FLOAT: + Float8Holder fh = new Float8Holder(); + fh.value = parser.getDoubleValue(); + map.float8(fieldName).write(fh); + break; + case VALUE_NUMBER_INT: + BigIntHolder bh = new BigIntHolder(); + bh.value = parser.getLongValue(); + map.bigInt(fieldName).write(bh); + break; + case VALUE_STRING: + VarCharHolder vh = new VarCharHolder(); + String value = parser.getText(); + byte[] b = value.getBytes(Charsets.UTF_8); + ByteBuf d = UnpooledByteBufAllocator.DEFAULT.buffer(b.length); + d.setBytes(0, b); + vh.buffer = d; + vh.start = 0; + vh.end = b.length; + map.varChar(fieldName).write(vh); + break; + + default: + throw new IllegalStateException("Unexpected token " + parser.getCurrentToken()); + + } + + } + map.end(); + + } + + private void writeData(ListWriter list) throws JsonParseException, IOException { + list.start(); + outside: while(true){ + + switch(parser.nextToken()){ + case START_ARRAY: + writeData(list.list()); + break; + case START_OBJECT: + writeData(list.map()); + break; + case END_ARRAY: + case END_OBJECT: + break outside; + + case VALUE_EMBEDDED_OBJECT: + case VALUE_FALSE:{ + BitHolder h = new BitHolder(); + h.value = 0; + list.bit().write(h); + break; + } + case VALUE_TRUE: { + BitHolder h = new BitHolder(); + h.value = 1; + list.bit().write(h); + break; + } + case VALUE_NULL: + // do nothing as we don't have a type. 
+ break; + case VALUE_NUMBER_FLOAT: + Float8Holder fh = new Float8Holder(); + fh.value = parser.getDoubleValue(); + list.float8().write(fh); + break; + case VALUE_NUMBER_INT: + BigIntHolder bh = new BigIntHolder(); + bh.value = parser.getLongValue(); + list.bigInt().write(bh); + break; + case VALUE_STRING: + VarCharHolder vh = new VarCharHolder(); + String value = parser.getText(); + byte[] b = value.getBytes(Charsets.UTF_8); + ByteBuf d = UnpooledByteBufAllocator.DEFAULT.buffer(b.length); + d.setBytes(0, b); + vh.buffer = d; + vh.start = 0; + vh.end = b.length; + list.varChar().write(vh); + + default: + throw new IllegalStateException("Unexpected token " + parser.getCurrentToken()); + } + } + list.end(); + } +} diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/JsonRecordSplitter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/JsonRecordSplitter.java new file mode 100644 index 000000000..6f6e7afbb --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/JsonRecordSplitter.java @@ -0,0 +1,27 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.vector.complex.fn; + +import java.io.IOException; +import java.io.Reader; + +public interface JsonRecordSplitter { + + public abstract Reader getNextReader() throws IOException; + +}
\ No newline at end of file diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/JsonWriter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/JsonWriter.java new file mode 100644 index 000000000..0624eceb1 --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/JsonWriter.java @@ -0,0 +1,258 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.vector.complex.fn; + +import java.io.IOException; +import java.io.OutputStream; + +import org.apache.drill.common.types.TypeProtos.DataMode; +import org.apache.drill.common.types.TypeProtos.MinorType; +import org.apache.drill.exec.vector.complex.reader.FieldReader; + +import com.fasterxml.jackson.core.JsonFactory; +import com.fasterxml.jackson.core.JsonGenerationException; +import com.fasterxml.jackson.core.JsonGenerator; + +public class JsonWriter { + static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(JsonWriter.class); + + private final JsonFactory factory = new JsonFactory(); + private final JsonGenerator gen; + + public JsonWriter(OutputStream out, boolean pretty) throws IOException{ + JsonGenerator writer = factory.createJsonGenerator(out); + gen = pretty ? 
writer.useDefaultPrettyPrinter() : writer; + } + + public void write(FieldReader reader) throws JsonGenerationException, IOException{ + writeValue(reader); + gen.flush(); + } + + private void writeValue(FieldReader reader) throws JsonGenerationException, IOException{ + final DataMode m = reader.getType().getMode(); + final MinorType mt = reader.getType().getMinorType(); + + switch(m){ + case OPTIONAL: + if(!reader.isSet()){ + gen.writeNull(); + break; + } + + case REQUIRED: + + + switch (mt) { + case FLOAT4: + gen.writeNumber(reader.readFloat()); + break; + case FLOAT8: + gen.writeNumber(reader.readDouble()); + break; + case INT: + Integer i = reader.readInteger(); + if(i == null){ + gen.writeNull(); + }else{ + gen.writeNumber(reader.readInteger()); + } + break; + case SMALLINT: + gen.writeNumber(reader.readShort()); + break; + case TINYINT: + gen.writeNumber(reader.readByte()); + break; + case BIGINT: + Long l = reader.readLong(); + if(l == null){ + gen.writeNull(); + }else{ + gen.writeNumber(reader.readLong()); + } + + break; + case BIT: + gen.writeBoolean(reader.readBoolean()); + break; + + case DATE: + case TIME: + case TIMESTAMP: + case TIMESTAMPTZ: + gen.writeString(reader.readDateTime().toString()); + + case INTERVALYEAR: + case INTERVALDAY: + case INTERVAL: + gen.writeString(reader.readPeriod().toString()); + break; + case DECIMAL28DENSE: + case DECIMAL28SPARSE: + case DECIMAL38DENSE: + case DECIMAL38SPARSE: + case DECIMAL9: + case DECIMAL18: + gen.writeNumber(reader.readBigDecimal()); + break; + + case LIST: + // this is a pseudo class, doesn't actually contain the real reader so we have to drop down. + writeValue(reader.reader()); + break; + case MAP: + gen.writeStartObject(); + for(String name : reader){ + if(reader.isSet()){ + gen.writeFieldName(name); + writeValue(reader.reader(name)); + } + } + gen.writeEndObject(); + break; + case NULL: + gen.writeNull(); + break; + + case VAR16CHAR: + gen.writeString(reader.readString()); + break; + case VARBINARY: + gen.writeBinary(reader.readByteArray()); + break; + case VARCHAR: + gen.writeString(reader.readText().toString()); + break; + + } + break; + + case REPEATED: + gen.writeStartArray(); + switch (mt) { + case FLOAT4: + for(int i = 0; i < reader.size(); i++){ + gen.writeNumber(reader.readFloat(i)); + } + + break; + case FLOAT8: + for(int i = 0; i < reader.size(); i++){ + gen.writeNumber(reader.readDouble(i)); + } + break; + case INT: + for(int i = 0; i < reader.size(); i++){ + gen.writeNumber(reader.readInteger(i)); + } + break; + case SMALLINT: + for(int i = 0; i < reader.size(); i++){ + gen.writeNumber(reader.readShort(i)); + } + break; + case TINYINT: + for(int i = 0; i < reader.size(); i++){ + gen.writeNumber(reader.readByte(i)); + } + break; + case BIGINT: + for(int i = 0; i < reader.size(); i++){ + gen.writeNumber(reader.readLong(i)); + } + break; + case BIT: + for(int i = 0; i < reader.size(); i++){ + gen.writeBoolean(reader.readBoolean(i)); + } + break; + + case DATE: + case TIME: + case TIMESTAMP: + case TIMESTAMPTZ: + for(int i = 0; i < reader.size(); i++){ + gen.writeString(reader.readDateTime(i).toString()); + } + + case INTERVALYEAR: + case INTERVALDAY: + case INTERVAL: + for(int i = 0; i < reader.size(); i++){ + gen.writeString(reader.readPeriod(i).toString()); + } + break; + case DECIMAL28DENSE: + case DECIMAL28SPARSE: + case DECIMAL38DENSE: + case DECIMAL38SPARSE: + case DECIMAL9: + case DECIMAL18: + for(int i = 0; i < reader.size(); i++){ + gen.writeNumber(reader.readBigDecimal(i)); + } + break; + + case LIST: + 
for(int i = 0; i < reader.size(); i++){ + while(reader.next()){ + writeValue(reader.reader()); + } + } + break; + case MAP: + while(reader.next()){ + gen.writeStartObject(); + for(String name : reader){ + FieldReader mapField = reader.reader(name); + if(mapField.isSet()){ + gen.writeFieldName(name); + writeValue(mapField); + } + } + gen.writeEndObject(); + } + break; + case NULL: + break; + + case VAR16CHAR: + for(int i = 0; i < reader.size(); i++){ + gen.writeString(reader.readString(i)); + } + break; + case VARBINARY: + for(int i = 0; i < reader.size(); i++){ + gen.writeBinary(reader.readByteArray(i)); + } + break; + case VARCHAR: + for(int i = 0; i < reader.size(); i++){ + gen.writeString(reader.readText(i).toString()); + } + break; + + } + gen.writeEndArray(); + break; + } + + } + +} diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/ReaderJSONRecordSplitter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/ReaderJSONRecordSplitter.java new file mode 100644 index 000000000..0cdbf852b --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/ReaderJSONRecordSplitter.java @@ -0,0 +1,178 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.drill.exec.vector.complex.fn; + +import java.io.IOException; +import java.io.Reader; +import java.io.StringReader; + +import com.google.common.io.CharStreams; + +public class ReaderJSONRecordSplitter implements JsonRecordSplitter { + static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ReaderJSONRecordSplitter.class); + + private static final int OPEN_CBRACKET = '{'; + private static final int OPEN_BRACKET = '['; + private static final int CLOSE_CBRACKET = '}'; + private static final int CLOSE_BRACKET = ']'; + + private static final int SPACE = ' '; + private static final int TAB = '\t'; + private static final int NEW_LINE = '\n'; + private static final int FORM_FEED = '\f'; + private static final int CR = '\r'; + + private long start = 0; + private Reader reader; + + public ReaderJSONRecordSplitter(Reader reader){ + this.reader = reader; + } + + public ReaderJSONRecordSplitter(String str){ + this.reader = new StringReader(str); + } + + @Override + public Reader getNextReader() throws IOException{ + + boolean inCandidate = false; + boolean found = false; + + reader.mark(128*1024); + long endOffset = start; + outside: while(true){ + int c = reader.read(); +// System.out.println(b); + endOffset++; + + if(c == -1){ + if(inCandidate){ + found = true; + } + break; + } + + switch(c){ + case CLOSE_BRACKET: + case CLOSE_CBRACKET: +// System.out.print("c"); + inCandidate = true; + break; + case OPEN_BRACKET: + case OPEN_CBRACKET: +// System.out.print("o"); + if(inCandidate){ + found = true; + break outside; + } + break; + + case SPACE: + case TAB: + case NEW_LINE: + case CR: + case FORM_FEED: +// System.out.print(' '); + break; + + default: +// System.out.print('-'); + inCandidate = false; + } + } + + if(found){ + long maxBytes = endOffset - 1 - start; + start = endOffset; + reader.reset(); + return new LimitedReader(reader, (int) maxBytes); + }else{ + return null; + } + + } + + private class LimitedReader extends Reader { + + private final Reader incoming; + private final int maxBytes; + private int markedBytes = 0; + private int bytes = 0; + + public LimitedReader(Reader in, int maxBytes) { + this.maxBytes = maxBytes; + this.incoming = in; + } + + @Override + public int read() throws IOException { + if (bytes >= maxBytes){ + return -1; + }else{ + bytes++; + return incoming.read(); + } + + + } + + + @Override + public void mark(int readAheadLimit) throws IOException { + incoming.mark(readAheadLimit); + markedBytes = bytes; + } + + @Override + public void reset() throws IOException { + incoming.reset(); + bytes = markedBytes; + } + + @Override + public int read(char[] cbuf, int off, int len) throws IOException { + int outputLength = Math.min(len, maxBytes - bytes); + if(outputLength > 0){ + incoming.read(cbuf, off, outputLength); + bytes += outputLength; + return outputLength; + }else{ + return -1; + } + } + + @Override + public void close() throws IOException { + } + + } + + public static void main(String[] args) throws Exception{ + String str = " { \"b\": \"hello\", \"c\": \"goodbye\", r: []}\n { \"b\": \"yellow\", \"c\": \"red\"}\n "; + JsonRecordSplitter splitter = new ReaderJSONRecordSplitter(new StringReader(str)); + Reader obj = null; + System.out.println(); + + while( (obj = splitter.getNextReader()) != null){ + System.out.println(); + System.out.println(CharStreams.toString(obj)); + System.out.println("===end obj==="); + } + } +} diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/UTF8JsonRecordSplitter.java 
b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/UTF8JsonRecordSplitter.java new file mode 100644 index 000000000..e46e1bdc2 --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/UTF8JsonRecordSplitter.java @@ -0,0 +1,150 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.vector.complex.fn; + +import java.io.BufferedInputStream; +import java.io.BufferedReader; +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.io.Reader; + +import com.google.common.base.Charsets; +import com.google.common.io.CharStreams; + +public class UTF8JsonRecordSplitter implements JsonRecordSplitter { + static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(UTF8JsonRecordSplitter.class); + + private static final int OPEN_CBRACKET = '{'; + private static final int OPEN_BRACKET = '['; + private static final int CLOSE_CBRACKET = '}'; + private static final int CLOSE_BRACKET = ']'; + + private static final int SPACE = ' '; + private static final int TAB = '\t'; + private static final int NEW_LINE = '\n'; + private static final int FORM_FEED = '\f'; + private static final int CR = '\r'; + + private long start = 0; + private InputStream incoming; + + public UTF8JsonRecordSplitter(InputStream incoming){ + this.incoming = new BufferedInputStream(incoming); + } + + @Override + public Reader getNextReader() throws IOException{ + + boolean inCandidate = false; + boolean found = false; + + incoming.mark(128*1024); + long endOffset = start; + outside: while(true){ + int b = incoming.read(); +// System.out.println(b); + endOffset++; + + if(b == -1){ + if(inCandidate){ + found = true; + } + break; + } + + switch(b){ + case CLOSE_BRACKET: + case CLOSE_CBRACKET: +// System.out.print("c"); + inCandidate = true; + break; + case OPEN_BRACKET: + case OPEN_CBRACKET: +// System.out.print("o"); + if(inCandidate){ + found = true; + break outside; + } + break; + + case SPACE: + case TAB: + case NEW_LINE: + case CR: + case FORM_FEED: +// System.out.print(' '); + break; + + default: +// System.out.print('-'); + inCandidate = false; + } + } + + if(found){ + long maxBytes = endOffset - 1 - start; + start = endOffset; + incoming.reset(); + return new BufferedReader(new InputStreamReader(new DelInputStream(incoming, maxBytes), Charsets.UTF_8)); + }else{ + return null; + } + + } + + private class DelInputStream extends InputStream { + + private final InputStream incoming; + private final long maxBytes; + private long bytes = 0; + + public DelInputStream(InputStream in, long maxBytes) { + this.maxBytes = maxBytes; + this.incoming = in; + } + + @Override + public int read() throws IOException { + if 
(bytes >= maxBytes){ + return -1; + }else{ + bytes++; + return incoming.read(); + } + + + } + + } + + public static void main(String[] args) throws Exception{ + byte[] str = " { \"b\": \"hello\", \"c\": \"goodbye\", r: []}\n { \"b\": \"yellow\", \"c\": \"red\"}\n ".getBytes(Charsets.UTF_8); + InputStream s = new ByteArrayInputStream(str); + JsonRecordSplitter splitter = new UTF8JsonRecordSplitter(s); + Reader obj = null; + System.out.println(); + + while( (obj = splitter.getNextReader()) != null){ + System.out.println(); + System.out.println(CharStreams.toString(obj)); + System.out.println("===end obj==="); + } + } +} diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/impl/AbstractBaseReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/impl/AbstractBaseReader.java new file mode 100644 index 000000000..8f892e73a --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/impl/AbstractBaseReader.java @@ -0,0 +1,63 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.vector.complex.impl; + +import java.util.Iterator; +import java.util.Map.Entry; + +import org.apache.drill.common.types.TypeProtos.MajorType; +import org.apache.drill.exec.vector.complex.reader.FieldReader; + + +abstract class AbstractBaseReader implements FieldReader{ + + static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(AbstractBaseReader.class); + + private int index; + + public AbstractBaseReader() { + super(); + } + + public void setPosition(int index){ + this.index = index; + } + + int idx(){ + return index; + } + + @Override + public Iterator<String> iterator() { + throw new IllegalStateException("The current reader doesn't support reading as a map."); + } + + public MajorType getType(){ + throw new IllegalStateException("The current reader doesn't support getting type information."); + } + + @Override + public boolean next() { + throw new IllegalStateException("The current reader doesn't support getting next information."); + } + + @Override + public int size() { + throw new IllegalStateException("The current reader doesn't support getting size information."); + } +} diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/impl/AbstractBaseWriter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/impl/AbstractBaseWriter.java new file mode 100644 index 000000000..7aa984689 --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/impl/AbstractBaseWriter.java @@ -0,0 +1,72 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.vector.complex.impl; + +import org.apache.drill.exec.vector.complex.WriteState; +import org.apache.drill.exec.vector.complex.writer.FieldWriter; + + +abstract class AbstractBaseWriter implements FieldWriter{ + static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(AbstractBaseWriter.class); + + final WriteState state; + final FieldWriter parent; + private int index; + + public AbstractBaseWriter(FieldWriter parent) { + super(); + this.state = parent == null ? new WriteState() : parent.getState(); + this.parent = parent; + } + + public FieldWriter getParent() { + return parent; + } + + public boolean ok(){ + return state.isOk(); + } + + public boolean isRoot(){ + return parent == null; + } + + int idx(){ + return index; + } + protected void resetState(){ + state.reset(); + } + + public void setPosition(int index){ + this.index = index; + } + + void inform(boolean outcome){ + if(!outcome){ + state.fail(this); + } + } + + public WriteState getState(){ + return state; + } + + public void end(){ + } +} diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/impl/ComplexWriterImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/impl/ComplexWriterImpl.java new file mode 100644 index 000000000..c6ea75bb3 --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/impl/ComplexWriterImpl.java @@ -0,0 +1,164 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.drill.exec.vector.complex.impl; + +import org.apache.drill.common.types.TypeProtos.MinorType; +import org.apache.drill.common.types.Types; +import org.apache.drill.exec.vector.complex.MapVector; +import org.apache.drill.exec.vector.complex.StateTool; +import org.apache.drill.exec.vector.complex.writer.BaseWriter; +import org.apache.drill.exec.vector.complex.writer.BaseWriter.ComplexWriter; + +import com.google.hive12.common.base.Preconditions; + +public class ComplexWriterImpl extends AbstractFieldWriter implements ComplexWriter{ + static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ComplexWriterImpl.class); + + SingleMapWriter mapRoot; + SingleListWriter listRoot; + MapVector container; + + Mode mode = Mode.INIT; + private final String name; + + private enum Mode { INIT, MAP, LIST }; + + public ComplexWriterImpl(String name, MapVector container){ + super(null); + this.name = name; + this.container = container; + } + + private void check(Mode... modes){ + StateTool.check(mode, modes); + } + + public void reset(){ + setPosition(0); + resetState(); + } + + public void clear(){ + switch(mode){ + case MAP: + mapRoot.clear(); + break; + case LIST: + listRoot.clear(); + break; + } + } + + public void setValueCount(int count){ + switch(mode){ + case MAP: + mapRoot.setValueCount(count); + break; + case LIST: + listRoot.setValueCount(count); + break; + } + } + + public void setPosition(int index){ + super.setPosition(index); + switch(mode){ + case MAP: + mapRoot.setPosition(index); + break; + case LIST: + listRoot.setPosition(index); + break; + } + } + + + public MapWriter directMap(){ + Preconditions.checkArgument(name == null); + + switch(mode){ + + case INIT: + MapVector map = (MapVector) container; + mapRoot = new SingleMapWriter(map, this); + mapRoot.setPosition(idx()); + mode = Mode.MAP; + break; + + case MAP: + break; + + default: + check(Mode.INIT, Mode.MAP); + } + + return mapRoot; + } + + @Override + public MapWriter rootAsMap() { + switch(mode){ + + case INIT: + MapVector map = container.addOrGet(name, Types.required(MinorType.MAP), MapVector.class); + mapRoot = new SingleMapWriter(map, this); + mapRoot.setPosition(idx()); + mode = Mode.MAP; + break; + + case MAP: + break; + + default: + check(Mode.INIT, Mode.MAP); + } + + return mapRoot; + } + + + @Override + public void allocate() { + if(mapRoot != null){ + mapRoot.allocate(); + }else if(listRoot != null){ + listRoot.allocate(); + } + } + + @Override + public ListWriter rootAsList() { + switch(mode){ + + case INIT: + listRoot = new SingleListWriter(name, container, this); + listRoot.setPosition(idx()); + mode = Mode.LIST; + break; + + case LIST: + break; + + default: + check(Mode.INIT, Mode.MAP); + } + + return listRoot; + } + +} diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/impl/RepeatedListReaderImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/impl/RepeatedListReaderImpl.java new file mode 100644 index 000000000..c555f35ff --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/impl/RepeatedListReaderImpl.java @@ -0,0 +1,113 @@ + +/******************************************************************************* + + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + ******************************************************************************/ +package org.apache.drill.exec.vector.complex.impl; + + +import org.apache.drill.common.types.TypeProtos.MajorType; +import org.apache.drill.common.types.TypeProtos.MinorType; +import org.apache.drill.common.types.Types; +import org.apache.drill.exec.expr.holders.RepeatedListHolder; +import org.apache.drill.exec.vector.ValueVector; +import org.apache.drill.exec.vector.complex.RepeatedListVector; +import org.apache.drill.exec.vector.complex.reader.FieldReader; +import org.apache.drill.exec.vector.complex.writer.BaseWriter.ListWriter; +import org.apache.drill.exec.vector.complex.writer.BaseWriter.MapWriter; + +public class RepeatedListReaderImpl extends AbstractFieldReader{ + private static final int NO_VALUES = Integer.MAX_VALUE - 1; + private static final MajorType TYPE = Types.repeated(MinorType.LIST); + private final String name; + private final RepeatedListVector container; + private FieldReader reader; + + public RepeatedListReaderImpl(String name, RepeatedListVector container){ + super(); + this.name = name; + this.container = container; + } + + public MajorType getType(){ + return TYPE; + } + + public void copyAsValue(ListWriter writer){ + if(currentOffset == NO_VALUES) return; + RepeatedListWriter impl = (RepeatedListWriter) writer; + impl.inform(impl.container.copyFromSafe(idx(), impl.idx(), container)); + } + + public void copyAsField(String name, MapWriter writer){ + if(currentOffset == NO_VALUES) return; + RepeatedListWriter impl = (RepeatedListWriter) writer.list(name); + impl.inform(impl.container.copyFromSafe(idx(), impl.idx(), container)); + } + + private int currentOffset; + private int maxOffset; + + public int size(){ + return maxOffset - currentOffset; + } + + public void setPosition(int index){ + super.setPosition(index); + RepeatedListHolder h = new RepeatedListHolder(); + container.getAccessor().get(index, h); + if(h.start == h.end){ + currentOffset = NO_VALUES; + }else{ + currentOffset = h.start-1; + maxOffset = h.end; + if(reader != null) reader.setPosition(currentOffset); + } + } + + public boolean next(){ + if(currentOffset +1 < maxOffset){ + currentOffset++; + if(reader != null) reader.setPosition(currentOffset); + return true; + }else{ + currentOffset = NO_VALUES; + return false; + } + } + + @Override + public Object readObject() { + return container.getAccessor().getObject(idx()); + } + + public FieldReader reader(){ + if(reader == null){ + reader = container.get(name, ValueVector.class).getAccessor().getReader(); + reader.setPosition(currentOffset); + } + return reader; + } + + public boolean isSet(){ + return true; + } + + +} + + diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/impl/RepeatedMapReaderImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/impl/RepeatedMapReaderImpl.java new file mode 100644 index 000000000..ab778ff3f --- /dev/null +++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/impl/RepeatedMapReaderImpl.java @@ -0,0 +1,205 @@ + + +/******************************************************************************* + + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + ******************************************************************************/ +package org.apache.drill.exec.vector.complex.impl; + + +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkState; + +import com.google.common.collect.Lists; +import com.google.common.collect.ObjectArrays; +import com.google.common.base.Charsets; +import com.google.common.collect.ObjectArrays; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufInputStream; + +import org.apache.commons.lang3.ArrayUtils; +import org.apache.drill.exec.memory.BufferAllocator; +import org.apache.drill.exec.proto.SchemaDefProtos; +import org.apache.drill.exec.proto.UserBitShared.SerializedField; +import org.apache.drill.exec.record.*; +import org.apache.drill.exec.vector.*; +import org.apache.drill.exec.expr.holders.*; +import org.apache.drill.common.expression.FieldReference; +import org.apache.drill.common.types.TypeProtos.*; +import org.apache.drill.common.types.Types; +import org.apache.drill.exec.vector.complex.*; +import org.apache.drill.exec.vector.complex.reader.*; +import org.apache.drill.exec.vector.complex.writer.*; +import org.apache.drill.exec.vector.complex.writer.BaseWriter.MapWriter; + +import com.sun.codemodel.JType; +import com.sun.codemodel.JCodeModel; + +import java.util.Arrays; +import java.util.Random; +import java.util.List; +import java.io.Closeable; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.sql.Date; +import java.sql.Time; +import java.sql.Timestamp; +import java.math.BigDecimal; +import java.math.BigInteger; + +import org.joda.time.DateTime; +import org.joda.time.Period; +import org.apache.hadoop.io.Text; + + + + + + + + + + + + +import java.util.Map; +import java.util.Map.Entry; + +import org.apache.drill.common.types.TypeProtos.MajorType; +import org.apache.drill.exec.vector.complex.MapVector; + +import com.google.common.collect.Maps; + +@SuppressWarnings("unused") +public class RepeatedMapReaderImpl extends AbstractFieldReader{ + private static final int NO_VALUES = Integer.MAX_VALUE - 1; + + private final RepeatedMapVector vector; + private final Map<String, FieldReader> fields = Maps.newHashMap(); + + public RepeatedMapReaderImpl(RepeatedMapVector vector) { + this.vector = vector; + } + + private void setChildrenPosition(int index){ + for(FieldReader r : fields.values()){ + r.setPosition(index); + } + } + + public FieldReader reader(String name){ + FieldReader reader = fields.get(name); + 
if(reader == null){ + ValueVector child = vector.get(name, ValueVector.class); + if(child == null){ + reader = NullReader.INSTANCE; + }else{ + reader = child.getAccessor().getReader(); + } + fields.put(name, reader); + reader.setPosition(currentOffset); + } + return reader; + } + + + private int currentOffset; + private int maxOffset; + + public int size(){ + return maxOffset - currentOffset; + } + + public void setPosition(int index){ + super.setPosition(index); + RepeatedMapHolder h = new RepeatedMapHolder(); + vector.getAccessor().get(index, h); + if(h.start == h.end){ + currentOffset = NO_VALUES; + }else{ + currentOffset = h.start-1; + maxOffset = h.end; + setChildrenPosition(currentOffset); + } + } + + public void setSinglePosition(int index, int childIndex){ + super.setPosition(index); + RepeatedMapHolder h = new RepeatedMapHolder(); + vector.getAccessor().get(index, h); + if(h.start == h.end){ + currentOffset = NO_VALUES; + }else{ + int singleOffset = h.start + childIndex; + assert singleOffset < h.end; + currentOffset = singleOffset; + maxOffset = singleOffset + 1; + setChildrenPosition(singleOffset); + } + } + + public boolean next(){ + if(currentOffset +1 < maxOffset){ + setChildrenPosition(++currentOffset); + return true; + }else{ + currentOffset = NO_VALUES; + return false; + } + } + + @Override + public Object readObject() { + return vector.getAccessor().getObject(idx()); + } + + public MajorType getType(){ + return vector.getField().getType(); + } + + public java.util.Iterator<String> iterator(){ + return vector.fieldNameIterator(); + } + + @Override + public boolean isSet() { + return false; + } + + public void copyAsValue(MapWriter writer){ + if(currentOffset == NO_VALUES) return; + RepeatedMapWriter impl = (RepeatedMapWriter) writer; + impl.inform(impl.container.copyFromSafe(idx(), impl.idx(), vector)); + } + + public void copyAsValueSingle(MapWriter writer){ + if(currentOffset == NO_VALUES) return; + SingleMapWriter impl = (SingleMapWriter) writer; + impl.inform(impl.container.copyFromSafe(currentOffset, impl.idx(), vector)); + } + + public void copyAsField(String name, MapWriter writer){ + if(currentOffset == NO_VALUES) return; + RepeatedMapWriter impl = (RepeatedMapWriter) writer.map(name); + impl.inform(impl.container.copyFromSafe(idx(), impl.idx(), vector)); + } + + +} + + diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/impl/SingleListReaderImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/impl/SingleListReaderImpl.java new file mode 100644 index 000000000..36e04a7bb --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/impl/SingleListReaderImpl.java @@ -0,0 +1,92 @@ + +/******************************************************************************* + + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + ******************************************************************************/ +package org.apache.drill.exec.vector.complex.impl; + + +import org.apache.drill.common.types.TypeProtos.MajorType; +import org.apache.drill.common.types.TypeProtos.MinorType; +import org.apache.drill.common.types.Types; +import org.apache.drill.exec.vector.ValueVector; +import org.apache.drill.exec.vector.complex.AbstractContainerVector; +import org.apache.drill.exec.vector.complex.reader.FieldReader; +import org.apache.drill.exec.vector.complex.writer.BaseWriter.ListWriter; +import org.apache.drill.exec.vector.complex.writer.BaseWriter.MapWriter; + + + + + + + + +@SuppressWarnings("unused") +public class SingleListReaderImpl extends AbstractFieldReader{ + + private static final MajorType TYPE = Types.optional(MinorType.LIST); + private final String name; + private final AbstractContainerVector container; + private FieldReader reader; + + public SingleListReaderImpl(String name, AbstractContainerVector container){ + super(); + this.name = name; + this.container = container; + } + + public MajorType getType(){ + return TYPE; + } + + + public void setPosition(int index){ + super.setPosition(index); + if(reader != null) reader.setPosition(index); + } + + @Override + public Object readObject() { + return reader.readObject(); + } + + public FieldReader reader(){ + if(reader == null){ + reader = container.get(name, ValueVector.class).getAccessor().getReader(); + setPosition(idx()); + } + return reader; + } + + @Override + public boolean isSet() { + return false; + } + + public void copyAsValue(ListWriter writer){ + throw new UnsupportedOperationException("Generic list copying not yet supported. Please resolve to typed list."); + } + + public void copyAsField(String name, MapWriter writer){ + throw new UnsupportedOperationException("Generic list copying not yet supported. Please resolve to typed list."); + } + + + +} + diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/impl/SingleMapReaderImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/impl/SingleMapReaderImpl.java new file mode 100644 index 000000000..2158fcc8c --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/impl/SingleMapReaderImpl.java @@ -0,0 +1,154 @@ + + +/******************************************************************************* + + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ ******************************************************************************/ +package org.apache.drill.exec.vector.complex.impl; + + +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkState; + +import com.google.common.collect.Lists; +import com.google.common.collect.ObjectArrays; +import com.google.common.base.Charsets; +import com.google.common.collect.ObjectArrays; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufInputStream; + +import org.apache.commons.lang3.ArrayUtils; +import org.apache.drill.exec.memory.BufferAllocator; +import org.apache.drill.exec.proto.SchemaDefProtos; +import org.apache.drill.exec.proto.UserBitShared.SerializedField; +import org.apache.drill.exec.record.*; +import org.apache.drill.exec.vector.*; +import org.apache.drill.exec.expr.holders.*; +import org.apache.drill.common.expression.FieldReference; +import org.apache.drill.common.types.TypeProtos.*; +import org.apache.drill.common.types.Types; +import org.apache.drill.exec.vector.complex.*; +import org.apache.drill.exec.vector.complex.reader.*; +import org.apache.drill.exec.vector.complex.writer.*; +import org.apache.drill.exec.vector.complex.writer.BaseWriter.MapWriter; + +import com.sun.codemodel.JType; +import com.sun.codemodel.JCodeModel; + +import java.util.Arrays; +import java.util.Random; +import java.util.List; +import java.io.Closeable; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.sql.Date; +import java.sql.Time; +import java.sql.Timestamp; +import java.math.BigDecimal; +import java.math.BigInteger; + +import org.joda.time.DateTime; +import org.joda.time.Period; +import org.apache.hadoop.io.Text; + + + + + + + + + + +import java.util.Map; +import java.util.Map.Entry; + +import org.apache.drill.common.types.TypeProtos.MajorType; +import org.apache.drill.exec.vector.complex.MapVector; + +import com.google.common.collect.Maps; + +@SuppressWarnings("unused") +public class SingleMapReaderImpl extends AbstractFieldReader{ + + private final MapVector vector; + private final Map<String, FieldReader> fields = Maps.newHashMap(); + + public SingleMapReaderImpl(MapVector vector) { + this.vector = vector; + } + + private void setChildrenPosition(int index){ + for(FieldReader r : fields.values()){ + r.setPosition(index); + } + } + + public FieldReader reader(String name){ + FieldReader reader = fields.get(name); + if(reader == null){ + ValueVector child = vector.get(name, ValueVector.class); + if(child == null){ + reader = NullReader.INSTANCE; + }else{ + reader = child.getAccessor().getReader(); + } + fields.put(name, reader); + reader.setPosition(idx()); + } + return reader; + } + + public void setPosition(int index){ + super.setPosition(index); + for(FieldReader r : fields.values()){ + r.setPosition(index); + } + } + + @Override + public Object readObject() { + return vector.getAccessor().getObject(idx()); + } + + @Override + public boolean isSet() { + return true; + } + + public MajorType getType(){ + return vector.getField().getType(); + } + + public java.util.Iterator<String> iterator(){ + return vector.fieldNameIterator(); + } + + public void copyAsValue(MapWriter writer){ + SingleMapWriter impl = (SingleMapWriter) writer; + impl.inform(impl.container.copyFromSafe(idx(), impl.idx(), vector)); + } + + public void copyAsField(String name, MapWriter writer){ + SingleMapWriter impl = (SingleMapWriter) writer.map(name); + impl.inform(impl.container.copyFromSafe(idx(), impl.idx(), vector)); 
+ } + + +} + diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/impl/VectorContainerWriter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/impl/VectorContainerWriter.java new file mode 100644 index 000000000..bc1d3675b --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/impl/VectorContainerWriter.java @@ -0,0 +1,104 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.vector.complex.impl; + +import org.apache.drill.common.types.TypeProtos.MajorType; +import org.apache.drill.exec.exception.SchemaChangeException; +import org.apache.drill.exec.physical.impl.OutputMutator; +import org.apache.drill.exec.record.MaterializedField; +import org.apache.drill.exec.vector.ValueVector; +import org.apache.drill.exec.vector.complex.MapVector; +import org.apache.drill.exec.vector.complex.writer.BaseWriter; +import org.apache.drill.exec.vector.complex.writer.BaseWriter.ComplexWriter; + +public class VectorContainerWriter extends AbstractFieldWriter implements ComplexWriter { + static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(VectorContainerWriter.class); + + SingleMapWriter mapRoot; + SpecialMapVector mapVector; + OutputMutator mutator; + + public VectorContainerWriter(OutputMutator mutator) { + super(null); + this.mutator = mutator; + this.mapVector = new SpecialMapVector(); + this.mapRoot = new SingleMapWriter(mapVector, this); + } + + public void reset() { + setPosition(0); + resetState(); + } + + public void clear() { + mapRoot.clear(); + } + + public SingleMapWriter getWriter() { + return mapRoot; + } + + public void setValueCount(int count) { + mapRoot.setValueCount(count); + } + + public void setPosition(int index) { + super.setPosition(index); + mapRoot.setPosition(index); + } + + public MapWriter directMap() { + return mapRoot; + } + + @Override + public void allocate() { + mapRoot.allocate(); + } + + private class SpecialMapVector extends MapVector { + + public SpecialMapVector() { + super("", null); + } + + @Override + public <T extends ValueVector> T addOrGet(String name, MajorType type, Class<T> clazz) { + try { + ValueVector v = mutator.addField(MaterializedField.create(name, type), clazz); + this.put(name, v); + return this.typeify(v, clazz); + } catch (SchemaChangeException e) { + throw new IllegalStateException(e); + } + + } + + } + + @Override + public MapWriter rootAsMap() { + return mapRoot; + } + + @Override + public ListWriter rootAsList() { + throw new UnsupportedOperationException( + "Drill doesn't support objects whose first level is a scalar or array. 
Objects must start as maps."); + } +} diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/reader/FieldReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/reader/FieldReader.java new file mode 100644 index 000000000..caa3aa64b --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/reader/FieldReader.java @@ -0,0 +1,29 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.vector.complex.reader; + +import org.apache.drill.exec.vector.complex.reader.BaseReader.ListReader; +import org.apache.drill.exec.vector.complex.reader.BaseReader.MapReader; +import org.apache.drill.exec.vector.complex.reader.BaseReader.RepeatedListReader; +import org.apache.drill.exec.vector.complex.reader.BaseReader.RepeatedMapReader; +import org.apache.drill.exec.vector.complex.reader.BaseReader.ScalarReader; + + + +public interface FieldReader extends MapReader, ListReader, ScalarReader, RepeatedMapReader, RepeatedListReader { +}
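FieldReader unifies the map, list, scalar, and repeated reader roles, so a positioned reader for any vector can be handed directly to the JsonWriter introduced earlier in this patch. A small helper along these lines shows the intended pairing; it assumes only the JsonWriter(OutputStream, boolean) constructor and write(FieldReader) method defined above, and the class and method names are illustrative rather than part of the commit.

package org.apache.drill.exec.vector.complex.fn;

import java.io.ByteArrayOutputStream;
import java.io.IOException;

import org.apache.drill.exec.vector.complex.reader.FieldReader;

// Illustrative helper, not part of this patch.
public class FieldReaderJson {

  // Renders whatever value the reader is currently positioned on as a JSON string.
  public static String toJsonString(FieldReader reader, boolean pretty) throws IOException {
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    new JsonWriter(out, pretty).write(reader);
    return out.toString("UTF-8");
  }
}

Callers would typically obtain the FieldReader from a vector's accessor (for example vector.getAccessor().getReader(), as the reader implementations above do) and call setPosition(index) before serializing.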
\ No newline at end of file diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/writer/FieldWriter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/writer/FieldWriter.java new file mode 100644 index 000000000..3faa4f700 --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/writer/FieldWriter.java @@ -0,0 +1,29 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.vector.complex.writer; + +import org.apache.drill.exec.vector.complex.writer.BaseWriter.ListWriter; +import org.apache.drill.exec.vector.complex.writer.BaseWriter.MapWriter; +import org.apache.drill.exec.vector.complex.writer.BaseWriter.ScalarWriter; + + + +public interface FieldWriter extends MapWriter, ListWriter, ScalarWriter { + public void allocate(); + public void clear(); +}
\ No newline at end of file
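On the write side, FieldWriter plays the same unifying role: MapWriter, ListWriter, and the scalar writers all funnel through it, and the JSON reader at the top of this change drives them purely through holder-based write calls. The fragment below sketches one hand-written record, {"a": 1, "b": ["x"]}, using only calls that appear in this patch (rootAsMap(), start()/end(), bigInt(name).write(holder), list(name), varChar().write(holder)). Obtaining the ComplexWriter (for example via VectorContainerWriter and an OutputMutator), vector allocation, and setting the final value count are left to the surrounding record reader and are not shown; the class name is illustrative.

package org.apache.drill.exec.vector.complex.impl;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.UnpooledByteBufAllocator;

import org.apache.drill.exec.expr.holders.BigIntHolder;
import org.apache.drill.exec.expr.holders.VarCharHolder;
import org.apache.drill.exec.vector.complex.writer.BaseWriter.ComplexWriter;
import org.apache.drill.exec.vector.complex.writer.BaseWriter.ListWriter;
import org.apache.drill.exec.vector.complex.writer.BaseWriter.MapWriter;

import com.google.common.base.Charsets;

// Illustrative sketch, not part of this patch.
public class ComplexWriterExample {

  // Writes {"a": 1, "b": ["x"]} at the writer's current position.
  public static void writeOneRecord(ComplexWriter writer) {
    MapWriter map = writer.rootAsMap();
    map.start();

    BigIntHolder a = new BigIntHolder();
    a.value = 1;
    map.bigInt("a").write(a);

    ListWriter list = map.list("b");
    list.start();
    byte[] bytes = "x".getBytes(Charsets.UTF_8);
    ByteBuf buf = UnpooledByteBufAllocator.DEFAULT.buffer(bytes.length);
    buf.setBytes(0, bytes);
    VarCharHolder v = new VarCharHolder();
    v.buffer = buf;
    v.start = 0;
    v.end = bytes.length;
    list.varChar().write(v);
    list.end();

    map.end();
  }
}

A failed write is reported through inform()/WriteState rather than an exception, which is why the JSON reader above checks writer.ok() after each record and, on failure, resets its Reader and returns WriteState.WRITE_FAILED so the same record can be retried.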