aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorshuifeng lu <lushuifeng@gmail.com>2018-09-26 11:22:20 +0800
committerBen-Zvi <bben-zvi@mapr.com>2018-10-26 20:09:12 -0700
commit18e09a1b1c801f2691a05ae7db543bf71874cfea (patch)
treed46867d40be8f526981dcde351bcec51f68c0f70
parentf39c7724cbbbe04a53ce98b74dfada46b37ecb0d (diff)
DRILL-6763: Codegen optimization of SQL functions with constant values(#1481)
closes #1481
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/expr/ClassGenerator.java54
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/expr/EvaluationVisitor.java186
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/interpreter/InterpreterEvaluator.java6
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/ops/BaseFragmentContext.java9
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java11
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java1
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java15
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggregator.java7
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/ChainedHashTable.java2
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/CodeGenMemberInjector.java79
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTable.java10
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableTemplate.java19
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java3
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java68
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/Partitioner.java2
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerTemplate.java14
-rw-r--r--exec/java-exec/src/test/java/org/apache/drill/exec/compile/ExampleTemplateWithInner.java12
-rw-r--r--exec/java-exec/src/test/java/org/apache/drill/exec/compile/TestLargeFileCompilation.java39
-rw-r--r--exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TopN/TopNBatchTest.java17
-rw-r--r--exec/vector/src/main/java/org/apache/drill/exec/vector/ValueHolderHelper.java36
20 files changed, 405 insertions, 185 deletions
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/ClassGenerator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/ClassGenerator.java
index 134f90ee1..5b33acfb6 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/ClassGenerator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/ClassGenerator.java
@@ -22,10 +22,13 @@ import static org.apache.drill.exec.compile.sig.GeneratorMapping.GM;
import java.lang.reflect.Constructor;
import java.lang.reflect.Modifier;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
+import io.netty.buffer.DrillBuf;
+import org.apache.calcite.util.Pair;
import org.apache.drill.common.exceptions.DrillRuntimeException;
import org.apache.drill.common.expression.LogicalExpression;
import org.apache.drill.common.types.TypeProtos;
@@ -39,7 +42,9 @@ import org.apache.drill.exec.compile.sig.MappingSet;
import org.apache.drill.exec.compile.sig.SignatureHolder;
import org.apache.drill.exec.exception.SchemaChangeException;
import org.apache.drill.exec.expr.fn.WorkspaceReference;
+import org.apache.drill.exec.expr.holders.ValueHolder;
import org.apache.drill.exec.record.TypedFieldId;
+import org.apache.drill.shaded.guava.com.google.common.base.Function;
import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
import org.apache.drill.shaded.guava.com.google.common.collect.Maps;
@@ -65,6 +70,7 @@ public class ClassGenerator<T>{
public static final GeneratorMapping DEFAULT_SCALAR_MAP = GM("doSetup", "doEval", null, null);
public static final GeneratorMapping DEFAULT_CONSTANT_MAP = GM("doSetup", "doSetup", null, null);
+ public static final String INNER_CLASS_FIELD_NAME = "innerClassField";
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ClassGenerator.class);
@@ -76,6 +82,7 @@ public class ClassGenerator<T>{
private final Map<String, ClassGenerator<T>> innerClasses = Maps.newHashMap();
private final List<TypedFieldId> workspaceTypes = Lists.newArrayList();
private final Map<WorkspaceReference, JVar> workspaceVectors = Maps.newHashMap();
+ private final Map<Pair<Integer, JVar>, Function<DrillBuf, ? extends ValueHolder>> constantVars;
private final CodeGenerator<T> codeGenerator;
public final JDefinedClass clazz;
@@ -87,6 +94,8 @@ public class ClassGenerator<T>{
private LinkedList<SizedJBlock>[] blocks;
private LinkedList<SizedJBlock>[] oldBlocks;
+ private JVar innerClassField;
+
/**
* Assumed that field has 3 indexes within the constant pull: index of the CONSTANT_Fieldref_info +
* CONSTANT_Fieldref_info.name_and_type_index + CONSTANT_NameAndType_info.name_index.
@@ -135,6 +144,7 @@ public class ClassGenerator<T>{
this.evaluationVisitor = eval;
this.model = model;
this.optionManager = optionManager;
+ constantVars = new HashMap<>();
blocks = (LinkedList<SizedJBlock>[]) new LinkedList[sig.size()];
for (int i =0; i < sig.size(); i++) {
@@ -370,6 +380,7 @@ public class ClassGenerator<T>{
innerClassGenerator.maxIndex += index;
// blocks from the inner class should be used
setupInnerClassBlocks();
+ innerClassField = clazz.field(JMod.NONE, model.ref(innerClassGenerator.clazz.name()), INNER_CLASS_FIELD_NAME);
return true;
}
return innerClassGenerator.createNestedClass();
@@ -425,10 +436,8 @@ public class ClassGenerator<T>{
* Creates methods from the signature {@code sig} with body from the appropriate {@code blocks}.
*/
void flushCode() {
- JVar innerClassField = null;
if (innerClassGenerator != null) {
blocks = oldBlocks;
- innerClassField = clazz.field(JMod.NONE, model.ref(innerClassGenerator.clazz.name()), "innerClassField");
innerClassGenerator.flushCode();
}
int i = 0;
@@ -531,11 +540,48 @@ public class ClassGenerator<T>{
public JVar declareClassField(String prefix, JType t, JExpression init) {
if (innerClassGenerator != null && hasMaxIndexValue()) {
- return innerClassGenerator.clazz.field(JMod.NONE, t, prefix + index++, init);
+ return innerClassGenerator.declareClassField(prefix, t, init);
}
return clazz.field(JMod.NONE, t, prefix + index++, init);
}
+ public Pair<Integer, JVar> declareClassConstField(String prefix, JType t,
+ Function<DrillBuf, ? extends ValueHolder> function) {
+ return declareClassConstField(prefix, t, null, function);
+ }
+
+ /**
+ * declare a constant field for the class.
+ * argument {@code function} holds the constant value which
+ * returns a value holder must be set to the class field when the class instance created.
+ * the class field innerClassField will be created if innerClassGenerator exists.
+ *
+ * @param prefix the prefix name of class field
+ * @param t the type of class field
+ * @param init init expression
+ * @param function the function holds the constant value
+ * @return the depth of nested class, class field
+ */
+ public Pair<Integer, JVar> declareClassConstField(String prefix, JType t, JExpression init,
+ Function<DrillBuf, ? extends ValueHolder> function) {
+ JVar var;
+ int depth = 1;
+ if (innerClassGenerator != null) {
+ Pair<Integer, JVar> nested = innerClassGenerator.declareClassConstField(prefix, t, init, function);
+ depth = nested.getKey() + 1;
+ var = nested.getValue();
+ } else {
+ var = clazz.field(JMod.NONE, t, prefix + index++, init);
+ }
+ Pair<Integer, JVar> depthVar = Pair.of(depth, var);
+ constantVars.put(depthVar, function);
+ return depthVar;
+ }
+
+ public Map<Pair<Integer, JVar>, Function<DrillBuf, ? extends ValueHolder>> getConstantVars() {
+ return constantVars;
+ }
+
public HoldingContainer declare(MajorType t) {
return declare(t, true);
}
@@ -646,7 +692,7 @@ public class ClassGenerator<T>{
Class<?> p = params[i];
childNew.arg(shim.param(model._ref(p), "arg" + i));
}
- shim.body()._return(childNew);
+ shim.body()._return(JExpr._this().invoke("injectMembers").arg(childNew));
}
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/EvaluationVisitor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/EvaluationVisitor.java
index 8685130e2..e13a9adb5 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/EvaluationVisitor.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/EvaluationVisitor.java
@@ -24,6 +24,8 @@ import java.util.Map;
import java.util.Set;
import java.util.Stack;
+import io.netty.buffer.DrillBuf;
+import org.apache.calcite.util.Pair;
import org.apache.drill.common.expression.AnyValueExpression;
import org.apache.drill.common.expression.BooleanOperator;
import org.apache.drill.common.expression.CastExpression;
@@ -56,19 +58,20 @@ import org.apache.drill.common.expression.ValueExpressions.TimeExpression;
import org.apache.drill.common.expression.ValueExpressions.TimeStampExpression;
import org.apache.drill.common.expression.ValueExpressions.VarDecimalExpression;
import org.apache.drill.common.expression.visitors.AbstractExprVisitor;
+import org.apache.drill.common.types.TypeProtos;
import org.apache.drill.common.types.TypeProtos.MajorType;
-import org.apache.drill.common.types.TypeProtos.MinorType;
import org.apache.drill.common.types.Types;
import org.apache.drill.exec.compile.sig.ConstantExpressionIdentifier;
import org.apache.drill.exec.compile.sig.GeneratorMapping;
import org.apache.drill.exec.compile.sig.MappingSet;
-import org.apache.drill.exec.expr.ClassGenerator.BlockType;
import org.apache.drill.exec.expr.ClassGenerator.HoldingContainer;
import org.apache.drill.exec.expr.fn.AbstractFuncHolder;
+import org.apache.drill.exec.expr.holders.ValueHolder;
import org.apache.drill.exec.physical.impl.filter.ReturnValueExpression;
import org.apache.drill.exec.vector.ValueHolderHelper;
import org.apache.drill.exec.vector.complex.reader.FieldReader;
+import org.apache.drill.shaded.guava.com.google.common.base.Function;
import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
import org.apache.drill.shaded.guava.com.google.common.collect.Maps;
import com.sun.codemodel.JBlock;
@@ -76,6 +79,7 @@ import com.sun.codemodel.JClass;
import com.sun.codemodel.JConditional;
import com.sun.codemodel.JExpr;
import com.sun.codemodel.JExpression;
+import com.sun.codemodel.JFieldRef;
import com.sun.codemodel.JInvocation;
import com.sun.codemodel.JLabel;
import com.sun.codemodel.JType;
@@ -265,72 +269,94 @@ public class EvaluationVisitor {
throw new UnsupportedOperationException("All schema paths should have been replaced with ValueVectorExpressions.");
}
+ private HoldingContainer getHoldingContainer(ClassGenerator<?> generator,
+ MajorType majorType,
+ Function<DrillBuf, ? extends ValueHolder> function) {
+ JType holderType = generator.getHolderType(majorType);
+ Pair<Integer, JVar> depthVar = generator.declareClassConstField("const", holderType, function);
+ JFieldRef outputSet = null;
+ JVar var = depthVar.getValue();
+ if (majorType.getMode() == TypeProtos.DataMode.OPTIONAL) {
+ outputSet = var.ref("isSet");
+ }
+ return new HoldingContainer(majorType, var, var.ref("value"), outputSet);
+ }
+
@Override
public HoldingContainer visitLongConstant(LongExpression e, ClassGenerator<?> generator) throws RuntimeException {
- HoldingContainer out = generator.declare(e.getMajorType());
- generator.getEvalBlock().assign(out.getValue(), JExpr.lit(e.getLong()));
- return out;
+ return getHoldingContainer(
+ generator,
+ e.getMajorType(),
+ buffer -> ValueHolderHelper.getBigIntHolder(e.getLong()));
}
@Override
public HoldingContainer visitIntConstant(IntExpression e, ClassGenerator<?> generator) throws RuntimeException {
- HoldingContainer out = generator.declare(e.getMajorType());
- generator.getEvalBlock().assign(out.getValue(), JExpr.lit(e.getInt()));
- return out;
+ return getHoldingContainer(
+ generator,
+ e.getMajorType(),
+ buffer -> ValueHolderHelper.getIntHolder(e.getInt()));
}
@Override
public HoldingContainer visitDateConstant(DateExpression e, ClassGenerator<?> generator) throws RuntimeException {
- HoldingContainer out = generator.declare(e.getMajorType());
- generator.getEvalBlock().assign(out.getValue(), JExpr.lit(e.getDate()));
- return out;
+ return getHoldingContainer(
+ generator,
+ e.getMajorType(),
+ buffer -> ValueHolderHelper.getDateHolder(e.getDate()));
}
@Override
public HoldingContainer visitTimeConstant(TimeExpression e, ClassGenerator<?> generator) throws RuntimeException {
- HoldingContainer out = generator.declare(e.getMajorType());
- generator.getEvalBlock().assign(out.getValue(), JExpr.lit(e.getTime()));
- return out;
+ return getHoldingContainer(
+ generator,
+ e.getMajorType(),
+ buffer -> ValueHolderHelper.getTimeHolder(e.getTime()));
}
@Override
public HoldingContainer visitIntervalYearConstant(IntervalYearExpression e, ClassGenerator<?> generator)
throws RuntimeException {
- HoldingContainer out = generator.declare(e.getMajorType());
- generator.getEvalBlock().assign(out.getValue(), JExpr.lit(e.getIntervalYear()));
- return out;
+ return getHoldingContainer(
+ generator,
+ e.getMajorType(),
+ buffer -> ValueHolderHelper.getIntervalYearHolder(e.getIntervalYear()));
}
@Override
public HoldingContainer visitTimeStampConstant(TimeStampExpression e, ClassGenerator<?> generator)
throws RuntimeException {
- HoldingContainer out = generator.declare(e.getMajorType());
- generator.getEvalBlock().assign(out.getValue(), JExpr.lit(e.getTimeStamp()));
- return out;
+ return getHoldingContainer(
+ generator,
+ e.getMajorType(),
+ buffer -> ValueHolderHelper.getTimeStampHolder(e.getTimeStamp()));
}
@Override
public HoldingContainer visitFloatConstant(FloatExpression e, ClassGenerator<?> generator)
throws RuntimeException {
- HoldingContainer out = generator.declare(e.getMajorType());
- generator.getEvalBlock().assign(out.getValue(), JExpr.lit(e.getFloat()));
- return out;
+ return getHoldingContainer(
+ generator,
+ e.getMajorType(),
+ buffer -> ValueHolderHelper.getFloat4Holder(e.getFloat()));
}
@Override
public HoldingContainer visitDoubleConstant(DoubleExpression e, ClassGenerator<?> generator)
throws RuntimeException {
- HoldingContainer out = generator.declare(e.getMajorType());
- generator.getEvalBlock().assign(out.getValue(), JExpr.lit(e.getDouble()));
- return out;
+ return getHoldingContainer(
+ generator,
+ e.getMajorType(),
+ buffer -> ValueHolderHelper.getFloat8Holder(e.getDouble()));
}
@Override
public HoldingContainer visitBooleanConstant(BooleanExpression e, ClassGenerator<?> generator)
throws RuntimeException {
- HoldingContainer out = generator.declare(e.getMajorType());
- generator.getEvalBlock().assign(out.getValue(), JExpr.lit(e.getBoolean() ? 1 : 0));
- return out;
+ return getHoldingContainer(
+ generator,
+ e.getMajorType(),
+ buffer -> ValueHolderHelper.getBitHolder(e.getBoolean() ? 1 : 0));
}
@Override
@@ -589,110 +615,64 @@ public class EvaluationVisitor {
@Override
public HoldingContainer visitQuotedStringConstant(QuotedString e, ClassGenerator<?> generator)
throws RuntimeException {
- MajorType majorType = e.getMajorType();
- JBlock setup = generator.getBlock(BlockType.SETUP);
- JType holderType = generator.getHolderType(majorType);
- JVar var = generator.declareClassField("string", holderType);
- JExpression stringLiteral = JExpr.lit(e.value);
- JExpression buffer = generator.getMappingSet().getIncoming().invoke("getContext").invoke("getManagedBuffer");
- setup.assign(var,
- generator.getModel().ref(ValueHolderHelper.class).staticInvoke("getVarCharHolder").arg(buffer).arg(stringLiteral));
- return new HoldingContainer(majorType, var, null, null);
+ return getHoldingContainer(
+ generator,
+ e.getMajorType(),
+ buffer -> ValueHolderHelper.getVarCharHolder(buffer, e.getString()));
}
@Override
public HoldingContainer visitIntervalDayConstant(IntervalDayExpression e, ClassGenerator<?> generator)
throws RuntimeException {
- MajorType majorType = Types.required(MinorType.INTERVALDAY);
- JBlock setup = generator.getBlock(BlockType.SETUP);
- JType holderType = generator.getHolderType(majorType);
- JVar var = generator.declareClassField("intervalday", holderType);
- JExpression dayLiteral = JExpr.lit(e.getIntervalDay());
- JExpression millisLiteral = JExpr.lit(e.getIntervalMillis());
- setup.assign(
- var,
- generator.getModel().ref(ValueHolderHelper.class).staticInvoke("getIntervalDayHolder").arg(dayLiteral)
- .arg(millisLiteral));
- return new HoldingContainer(majorType, var, null, null);
+ return getHoldingContainer(
+ generator,
+ e.getMajorType(),
+ buffer -> ValueHolderHelper.getIntervalDayHolder(e.getIntervalDay(), e.getIntervalMillis()));
}
@Override
public HoldingContainer visitDecimal9Constant(Decimal9Expression e, ClassGenerator<?> generator)
throws RuntimeException {
- MajorType majorType = e.getMajorType();
- JBlock setup = generator.getBlock(BlockType.SETUP);
- JType holderType = generator.getHolderType(majorType);
- JVar var = generator.declareClassField("dec9", holderType);
- JExpression valueLiteral = JExpr.lit(e.getIntFromDecimal());
- JExpression scaleLiteral = JExpr.lit(e.getScale());
- JExpression precisionLiteral = JExpr.lit(e.getPrecision());
- setup.assign(
- var,
- generator.getModel().ref(ValueHolderHelper.class).staticInvoke("getDecimal9Holder").arg(valueLiteral)
- .arg(scaleLiteral).arg(precisionLiteral));
- return new HoldingContainer(majorType, var, null, null);
+ return getHoldingContainer(
+ generator,
+ e.getMajorType(),
+ buffer -> ValueHolderHelper.getDecimal9Holder(e.getIntFromDecimal(), e.getScale(), e.getPrecision()));
}
@Override
public HoldingContainer visitDecimal18Constant(Decimal18Expression e, ClassGenerator<?> generator)
throws RuntimeException {
- MajorType majorType = e.getMajorType();
- JBlock setup = generator.getBlock(BlockType.SETUP);
- JType holderType = generator.getHolderType(majorType);
- JVar var = generator.declareClassField("dec18", holderType);
- JExpression valueLiteral = JExpr.lit(e.getLongFromDecimal());
- JExpression scaleLiteral = JExpr.lit(e.getScale());
- JExpression precisionLiteral = JExpr.lit(e.getPrecision());
- setup.assign(
- var,
- generator.getModel().ref(ValueHolderHelper.class).staticInvoke("getDecimal18Holder").arg(valueLiteral)
- .arg(scaleLiteral).arg(precisionLiteral));
- return new HoldingContainer(majorType, var, null, null);
+ return getHoldingContainer(
+ generator,
+ e.getMajorType(),
+ buffer -> ValueHolderHelper.getDecimal18Holder(e.getLongFromDecimal(), e.getScale(), e.getPrecision()));
}
@Override
public HoldingContainer visitDecimal28Constant(Decimal28Expression e, ClassGenerator<?> generator)
throws RuntimeException {
- MajorType majorType = e.getMajorType();
- JBlock setup = generator.getBlock(BlockType.SETUP);
- JType holderType = generator.getHolderType(majorType);
- JVar var = generator.declareClassField("dec28", holderType);
- JExpression stringLiteral = JExpr.lit(e.getBigDecimal().toString());
- JExpression buffer = generator.getMappingSet().getIncoming().invoke("getContext").invoke("getManagedBuffer");
- setup.assign(var,
- generator.getModel().ref(ValueHolderHelper.class).staticInvoke("getDecimal28Holder")
- .arg(buffer).arg(stringLiteral));
- return new HoldingContainer(majorType, var, null, null);
+ return getHoldingContainer(
+ generator,
+ e.getMajorType(),
+ buffer -> ValueHolderHelper.getDecimal28Holder(buffer, e.getBigDecimal()));
}
@Override
public HoldingContainer visitDecimal38Constant(Decimal38Expression e, ClassGenerator<?> generator)
throws RuntimeException {
- MajorType majorType = e.getMajorType();
- JBlock setup = generator.getBlock(BlockType.SETUP);
- JType holderType = generator.getHolderType(majorType);
- JVar var = generator.declareClassField("dec38", holderType);
- JExpression stringLiteral = JExpr.lit(e.getBigDecimal().toString());
- JExpression buffer = generator.getMappingSet().getIncoming().invoke("getContext").invoke("getManagedBuffer");
- setup.assign(var,
- generator.getModel().ref(ValueHolderHelper.class).staticInvoke("getDecimal38Holder")
- .arg(buffer).arg(stringLiteral));
- return new HoldingContainer(majorType, var, null, null);
+ return getHoldingContainer(
+ generator,
+ e.getMajorType(),
+ buffer -> ValueHolderHelper.getDecimal38Holder(buffer, e.getBigDecimal()));
}
@Override
public HoldingContainer visitVarDecimalConstant(VarDecimalExpression e, ClassGenerator<?> generator)
throws RuntimeException {
- MajorType majorType = e.getMajorType();
- JBlock setup = generator.getBlock(BlockType.SETUP);
- JType holderType = generator.getHolderType(majorType);
- JVar var = generator.declareClassField("varDec", holderType);
- JExpression stringLiteral = JExpr.lit(e.getBigDecimal().toString());
- JExpression buffer = generator.getMappingSet().getIncoming().invoke("getContext").invoke("getManagedBuffer");
- setup.assign(var,
- generator.getModel().ref(ValueHolderHelper.class).staticInvoke("getVarDecimalHolder")
- .arg(buffer).arg(stringLiteral));
- return new HoldingContainer(majorType, var, null, null);
+ return getHoldingContainer(
+ generator,
+ e.getMajorType(),
+ buffer -> ValueHolderHelper.getVarDecimalHolder(buffer, e.getBigDecimal()));
}
@Override
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/interpreter/InterpreterEvaluator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/interpreter/InterpreterEvaluator.java
index 72d861453..a0373d9a7 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/interpreter/InterpreterEvaluator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/interpreter/InterpreterEvaluator.java
@@ -227,7 +227,7 @@ public class InterpreterEvaluator {
@Nullable
@Override
public ValueHolder apply(DrillBuf buffer) {
- return ValueHolderHelper.getDecimal28Holder(buffer, decExpr.getBigDecimal().toString());
+ return ValueHolderHelper.getDecimal28Holder(buffer, decExpr.getBigDecimal());
}
});
}
@@ -238,7 +238,7 @@ public class InterpreterEvaluator {
@Nullable
@Override
public ValueHolder apply(DrillBuf buffer) {
- return ValueHolderHelper.getDecimal38Holder(buffer, decExpr.getBigDecimal().toString());
+ return ValueHolderHelper.getDecimal38Holder(buffer, decExpr.getBigDecimal());
}
});
}
@@ -246,7 +246,7 @@ public class InterpreterEvaluator {
@Override
public ValueHolder visitVarDecimalConstant(final ValueExpressions.VarDecimalExpression decExpr, Integer value) throws RuntimeException {
return getConstantValueHolder(decExpr.getBigDecimal().toString(), decExpr.getMajorType().getMinorType(),
- buffer -> ValueHolderHelper.getVarDecimalHolder(Objects.requireNonNull(buffer), decExpr.getBigDecimal().toString()));
+ buffer -> ValueHolderHelper.getVarDecimalHolder(Objects.requireNonNull(buffer), decExpr.getBigDecimal()));
}
@Override
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/BaseFragmentContext.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/BaseFragmentContext.java
index f81d4c983..8005f046a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/BaseFragmentContext.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/BaseFragmentContext.java
@@ -24,6 +24,7 @@ import org.apache.drill.exec.exception.ClassTransformationException;
import org.apache.drill.exec.expr.ClassGenerator;
import org.apache.drill.exec.expr.CodeGenerator;
import org.apache.drill.exec.expr.fn.FunctionImplementationRegistry;
+import org.apache.drill.exec.physical.impl.common.CodeGenMemberInjector;
import io.netty.buffer.DrillBuf;
@@ -53,7 +54,9 @@ public abstract class BaseFragmentContext implements FragmentContext {
@Override
public <T> T getImplementationClass(final CodeGenerator<T> cg)
throws ClassTransformationException, IOException {
- return getCompiler().createInstance(cg);
+ T instance = getCompiler().createInstance(cg);
+ CodeGenMemberInjector.injectMembers(cg.getRoot(), instance, this);
+ return instance;
}
@Override
@@ -63,7 +66,9 @@ public abstract class BaseFragmentContext implements FragmentContext {
@Override
public <T> List<T> getImplementationClass(final CodeGenerator<T> cg, final int instanceCount) throws ClassTransformationException, IOException {
- return getCompiler().createInstances(cg, instanceCount);
+ List<T> instances = getCompiler().createInstances(cg, instanceCount);
+ instances.forEach(instance -> CodeGenMemberInjector.injectMembers(cg.getRoot(), instance, this));
+ return instances;
}
protected abstract BufferManager getBufferManager();
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java
index 22dfdf090..aaca8a586 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java
@@ -379,8 +379,8 @@ public class TopNBatch extends AbstractRecordBatch<TopN> {
private PriorityQueue createNewPriorityQueue(VectorAccessible batch, int limit)
throws SchemaChangeException, ClassTransformationException, IOException {
return createNewPriorityQueue(
- mainMapping, leftMapping, rightMapping, context.getOptions(), context.getFunctionRegistry(), context.getCompiler(),
- config.getOrderings(), batch, unionTypeEnabled, codegenDump, limit, oContext.getAllocator(), schema.getSelectionVectorMode());
+ mainMapping, leftMapping, rightMapping, config.getOrderings(), batch, unionTypeEnabled,
+ codegenDump, limit, oContext.getAllocator(), schema.getSelectionVectorMode(), context);
}
public static MappingSet createMainMappingSet() {
@@ -397,10 +397,11 @@ public class TopNBatch extends AbstractRecordBatch<TopN> {
public static PriorityQueue createNewPriorityQueue(
MappingSet mainMapping, MappingSet leftMapping, MappingSet rightMapping,
- OptionSet optionSet, FunctionLookupContext functionLookupContext, CodeCompiler codeCompiler,
List<Ordering> orderings, VectorAccessible batch, boolean unionTypeEnabled, boolean codegenDump,
- int limit, BufferAllocator allocator, SelectionVectorMode mode)
+ int limit, BufferAllocator allocator, SelectionVectorMode mode, FragmentContext context)
throws ClassTransformationException, IOException, SchemaChangeException {
+ OptionSet optionSet = context.getOptions();
+ FunctionLookupContext functionLookupContext = context.getFunctionRegistry();
CodeGenerator<PriorityQueue> cg = CodeGenerator.get(PriorityQueue.TEMPLATE_DEFINITION, optionSet);
cg.plainJavaCapable(true);
// Uncomment out this line to debug the generated code.
@@ -438,7 +439,7 @@ public class TopNBatch extends AbstractRecordBatch<TopN> {
g.rotateBlock();
g.getEvalBlock()._return(JExpr.lit(0));
- PriorityQueue q = codeCompiler.createInstance(cg);
+ PriorityQueue q = context.getImplementationClass(cg);
q.init(limit, allocator, mode == BatchSchema.SelectionVectorMode.TWO_BYTE);
return q;
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java
index 485d36372..9f51204f7 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java
@@ -433,6 +433,7 @@ public class HashAggBatch extends AbstractRecordBatch<HashAggregate> {
agg.setup(popConfig, htConfig, context, oContext, incoming, this,
aggrExprs,
cgInner.getWorkspaceTypes(),
+ cgInner,
groupByOutFieldIds,
this.container, extraNonNullColumns * 8 /* sizeof(BigInt) */);
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java
index 32db9eaf4..d10a84af9 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java
@@ -38,6 +38,7 @@ import org.apache.drill.exec.compile.sig.RuntimeOverridden;
import org.apache.drill.exec.exception.ClassTransformationException;
import org.apache.drill.exec.exception.OutOfMemoryException;
import org.apache.drill.exec.exception.SchemaChangeException;
+import org.apache.drill.exec.expr.ClassGenerator;
import org.apache.drill.exec.expr.TypeHelper;
import org.apache.drill.exec.memory.BaseAllocator;
@@ -50,6 +51,7 @@ import org.apache.drill.exec.physical.base.AbstractBase;
import org.apache.drill.exec.physical.config.HashAggregate;
import org.apache.drill.exec.physical.impl.common.AbstractSpilledPartitionMetadata;
import org.apache.drill.exec.physical.impl.common.ChainedHashTable;
+import org.apache.drill.exec.physical.impl.common.CodeGenMemberInjector;
import org.apache.drill.exec.physical.impl.common.HashTable;
import org.apache.drill.exec.physical.impl.common.HashTableConfig;
import org.apache.drill.exec.physical.impl.common.HashTableStats;
@@ -137,7 +139,8 @@ public abstract class HashAggTemplate implements HashAggregator {
private HashAggBatch outgoing;
private VectorContainer outContainer;
- private FragmentContext context;
+ protected FragmentContext context;
+ protected ClassGenerator<?> cg;
private OperatorContext oContext;
private BufferAllocator allocator;
@@ -360,7 +363,7 @@ public abstract class HashAggTemplate implements HashAggregator {
@Override
public void setup(HashAggregate hashAggrConfig, HashTableConfig htConfig, FragmentContext context, OperatorContext oContext,
RecordBatch incoming, HashAggBatch outgoing, LogicalExpression[] valueExprs, List<TypedFieldId> valueFieldIds,
- TypedFieldId[] groupByOutFieldIds, VectorContainer outContainer, int extraRowBytes) throws SchemaChangeException, IOException {
+ ClassGenerator<?> cg, TypedFieldId[] groupByOutFieldIds, VectorContainer outContainer, int extraRowBytes) throws SchemaChangeException, IOException {
if (valueExprs == null || valueFieldIds == null) {
throw new IllegalArgumentException("Invalid aggr value exprs or workspace variables.");
@@ -375,6 +378,7 @@ public abstract class HashAggTemplate implements HashAggregator {
this.oContext = oContext;
this.incoming = incoming;
this.outgoing = outgoing;
+ this.cg = cg;
this.outContainer = outContainer;
this.useMemoryPrediction = context.getOptions().getOption(ExecConstants.HASHAGG_USE_MEMORY_PREDICTION_VALIDATOR);
this.phase = hashAggrConfig.getAggPhase();
@@ -1097,7 +1101,12 @@ public abstract class HashAggTemplate implements HashAggregator {
// These methods are overridden in the generated class when created as plain Java code.
protected BatchHolder newBatchHolder(int batchRowCount) {
- return new BatchHolder(batchRowCount);
+ return this.injectMembers(new BatchHolder(batchRowCount));
+ }
+
+ protected BatchHolder injectMembers(BatchHolder batchHolder) {
+ CodeGenMemberInjector.injectMembers(cg, batchHolder, context);
+ return batchHolder;
}
/**
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggregator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggregator.java
index 4c54650cf..5ee77ab27 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggregator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggregator.java
@@ -24,6 +24,7 @@ import org.apache.drill.common.expression.LogicalExpression;
import org.apache.drill.exec.compile.TemplateClassDefinition;
import org.apache.drill.exec.exception.ClassTransformationException;
import org.apache.drill.exec.exception.SchemaChangeException;
+import org.apache.drill.exec.expr.ClassGenerator;
import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.ops.OperatorContext;
import org.apache.drill.exec.physical.config.HashAggregate;
@@ -46,8 +47,10 @@ public interface HashAggregator {
// OK - batch returned, NONE - end of data, RESTART - call again, EMIT - like OK but EMIT
enum AggIterOutcome { AGG_OK, AGG_NONE, AGG_RESTART, AGG_EMIT }
- void setup(HashAggregate hashAggrConfig, HashTableConfig htConfig, FragmentContext context, OperatorContext oContext, RecordBatch incoming, HashAggBatch outgoing,
- LogicalExpression[] valueExprs, List<TypedFieldId> valueFieldIds, TypedFieldId[] keyFieldIds, VectorContainer outContainer, int extraRowBytes) throws SchemaChangeException, IOException, ClassTransformationException;
+ void setup(HashAggregate hashAggrConfig, HashTableConfig htConfig, FragmentContext context,
+ OperatorContext oContext, RecordBatch incoming, HashAggBatch outgoing,
+ LogicalExpression[] valueExprs, List<TypedFieldId> valueFieldIds, ClassGenerator<?> cg,
+ TypedFieldId[] keyFieldIds, VectorContainer outContainer, int extraRowBytes) throws SchemaChangeException, IOException, ClassTransformationException;
IterOutcome getOutcome();
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/ChainedHashTable.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/ChainedHashTable.java
index a14bf8c5f..dcdac954f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/ChainedHashTable.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/ChainedHashTable.java
@@ -228,7 +228,7 @@ public class ChainedHashTable {
setupGetHash(cg /* use top level code generator for getHash */, GetHashIncomingProbeMapping, incomingProbe, keyExprsProbe, true);
HashTable ht = context.getImplementationClass(top);
- ht.setup(htConfig, allocator, incomingBuild.getContainer(), incomingProbe, outgoing, htContainerOrig);
+ ht.setup(htConfig, allocator, incomingBuild.getContainer(), incomingProbe, outgoing, htContainerOrig, context, cgInner);
return ht;
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/CodeGenMemberInjector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/CodeGenMemberInjector.java
new file mode 100644
index 000000000..195f002b3
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/CodeGenMemberInjector.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.common;
+
+import com.sun.codemodel.JVar;
+import io.netty.buffer.DrillBuf;
+import org.apache.calcite.util.Pair;
+import org.apache.drill.exec.expr.ClassGenerator;
+import org.apache.drill.exec.expr.holders.ValueHolder;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.shaded.guava.com.google.common.base.Function;
+
+import java.lang.reflect.Field;
+import java.util.HashMap;
+import java.util.Map;
+
+public class CodeGenMemberInjector {
+
+ /**
+ * Generated code for a class may have several class members, they
+ * are initialized by invoking this method when the instance created.
+ *
+ * @param cg the class generator
+ * @param instance the class instance created by the compiler
+ * @param context the fragment context
+ */
+ public static void injectMembers(ClassGenerator<?> cg, Object instance, FragmentContext context) {
+ Map<Integer, Object> cachedInstances = new HashMap<>();
+ for (Map.Entry<Pair<Integer, JVar>, Function<DrillBuf, ? extends ValueHolder>> setter : cg.getConstantVars().entrySet()) {
+ try {
+ JVar var = setter.getKey().getValue();
+ Integer depth = setter.getKey().getKey();
+ Object varInstance = getFieldInstance(instance, depth, cachedInstances);
+ Field field = varInstance.getClass().getDeclaredField(var.name());
+ field.setAccessible(true);
+ field.set(varInstance, setter.getValue().apply(context.getManagedBuffer()));
+ } catch (ReflectiveOperationException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ }
+
+ private static Object getFieldInstance(Object instance, Integer depth, Map<Integer, Object> cache) throws ReflectiveOperationException {
+ if (depth <= 1) {
+ return instance;
+ }
+ Object methodInstance = cache.get(depth);
+ if (methodInstance != null) {
+ return methodInstance;
+ }
+ methodInstance = getFieldInstance(instance, depth);
+ cache.put(depth, methodInstance);
+ return methodInstance;
+ }
+
+ private static Object getFieldInstance(Object instance, Integer depth) throws ReflectiveOperationException {
+ if (depth <= 1) {
+ return instance;
+ }
+ Field field = instance.getClass().getDeclaredField(ClassGenerator.INNER_CLASS_FIELD_NAME);
+ field.setAccessible(true);
+ return getFieldInstance(field.get(instance), depth - 1);
+ }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTable.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTable.java
index 57324583a..1d9e2679e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTable.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTable.java
@@ -20,7 +20,9 @@ package org.apache.drill.exec.physical.impl.common;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.drill.exec.compile.TemplateClassDefinition;
import org.apache.drill.exec.exception.SchemaChangeException;
+import org.apache.drill.exec.expr.ClassGenerator;
import org.apache.drill.exec.memory.BufferAllocator;
+import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.record.RecordBatch;
import org.apache.drill.exec.record.VectorContainer;
import org.apache.drill.common.exceptions.RetryAfterSpillException;
@@ -48,17 +50,19 @@ public interface HashTable {
int BATCH_MASK = 0x0000FFFF;
/**
- * {@link HashTable#setup(HashTableConfig, BufferAllocator, VectorContainer, RecordBatch, RecordBatch, VectorContainer)} must be called before anything can be done to the
- * {@link HashTable}.
+ * {@link HashTable#setup} must be called before anything can be done to the {@link HashTable}.
+ *
* @param htConfig
* @param allocator
* @param incomingBuild
* @param incomingProbe
* @param outgoing
* @param htContainerOrig
+ * @param context
+ * @param cg
*/
void setup(HashTableConfig htConfig, BufferAllocator allocator, VectorContainer incomingBuild, RecordBatch incomingProbe, RecordBatch outgoing,
- VectorContainer htContainerOrig);
+ VectorContainer htContainerOrig, FragmentContext context, ClassGenerator<?> cg);
/**
* Updates the incoming (build and probe side) value vectors references in the {@link HashTableTemplate.BatchHolder}s.
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableTemplate.java
index ae9a62169..25ada28ee 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableTemplate.java
@@ -23,6 +23,8 @@ import java.util.Set;
import javax.inject.Named;
+import org.apache.drill.exec.expr.ClassGenerator;
+import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.shaded.guava.com.google.common.collect.Sets;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.drill.common.types.TypeProtos.MinorType;
@@ -110,6 +112,10 @@ public abstract class HashTableTemplate implements HashTable {
private MaterializedField dummyIntField;
+ protected FragmentContext context;
+
+ protected ClassGenerator<?> cg;
+
private int numResizing = 0;
private int resizingTime = 0;
@@ -448,7 +454,9 @@ public abstract class HashTableTemplate implements HashTable {
}
@Override
- public void setup(HashTableConfig htConfig, BufferAllocator allocator, VectorContainer incomingBuild, RecordBatch incomingProbe, RecordBatch outgoing, VectorContainer htContainerOrig) {
+ public void setup(HashTableConfig htConfig, BufferAllocator allocator, VectorContainer incomingBuild,
+ RecordBatch incomingProbe, RecordBatch outgoing, VectorContainer htContainerOrig,
+ FragmentContext context, ClassGenerator<?> cg) {
float loadf = htConfig.getLoadFactor();
int initialCap = htConfig.getInitialCapacity();
@@ -472,6 +480,8 @@ public abstract class HashTableTemplate implements HashTable {
this.incomingProbe = incomingProbe;
this.outgoing = outgoing;
this.htContainerOrig = htContainerOrig;
+ this.context = context;
+ this.cg = cg;
this.allocationTracker = new HashTableAllocationTracker(htConfig);
// round up the initial capacity to nearest highest power of 2
@@ -764,7 +774,12 @@ public abstract class HashTableTemplate implements HashTable {
}
protected BatchHolder newBatchHolder(int index, int newBatchHolderSize) { // special method to allow debugging of gen code
- return new BatchHolder(index, newBatchHolderSize);
+ return this.injectMembers(new BatchHolder(index, newBatchHolderSize));
+ }
+
+ protected BatchHolder injectMembers(BatchHolder batchHolder) {
+ CodeGenMemberInjector.injectMembers(cg, batchHolder, context);
+ return batchHolder;
}
// Resize the hash table if needed by creating a new one with double the number of buckets.
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
index a969ffd5a..2f17ff2c1 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
@@ -1415,8 +1415,7 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> implem
// No real code generation !!
- final HashJoinProbe hj = context.getImplementationClass(cg);
- return hj;
+ return context.getImplementationClass(cg);
}
@Override
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java
index 3918d27f6..c185ac7ac 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java
@@ -229,40 +229,7 @@ public class PartitionSenderRootExec extends BaseRootExec {
@VisibleForTesting
protected void createPartitioner() throws SchemaChangeException {
- final int divisor = Math.max(1, outGoingBatchCount/actualPartitions);
- final int longTail = outGoingBatchCount % actualPartitions;
-
- final List<Partitioner> subPartitioners = createClassInstances(actualPartitions);
- int startIndex = 0;
- int endIndex = 0;
-
- boolean success = false;
- try {
- for (int i = 0; i < actualPartitions; i++) {
- startIndex = endIndex;
- endIndex = (i < actualPartitions - 1) ? startIndex + divisor : outGoingBatchCount;
- if (i < longTail) {
- endIndex++;
- }
- final OperatorStats partitionStats = new OperatorStats(stats, true);
- subPartitioners.get(i).setup(context, incoming, popConfig, partitionStats, oContext,
- startIndex, endIndex);
- }
-
- partitioner = new PartitionerDecorator(subPartitioners, stats, context);
- for (int index = 0; index < terminations.size(); index++) {
- partitioner.getOutgoingBatches(terminations.buffer[index]).terminate();
- }
- terminations.clear();
-
- success = true;
- } finally {
- if (!success) {
- for (Partitioner p : subPartitioners) {
- p.clear();
- }
- }
- }
+ createClassInstances(actualPartitions);
}
private List<Partitioner> createClassInstances(int actualPartitions) throws SchemaChangeException {
@@ -297,6 +264,39 @@ public class PartitionSenderRootExec extends BaseRootExec {
try {
// compile and setup generated code
List<Partitioner> subPartitioners = context.getImplementationClass(cg, actualPartitions);
+
+ final int divisor = Math.max(1, outGoingBatchCount/actualPartitions);
+ final int longTail = outGoingBatchCount % actualPartitions;
+ int startIndex = 0;
+ int endIndex = 0;
+
+ boolean success = false;
+ try {
+ for (int i = 0; i < actualPartitions; i++) {
+ startIndex = endIndex;
+ endIndex = (i < actualPartitions - 1) ? startIndex + divisor : outGoingBatchCount;
+ if (i < longTail) {
+ endIndex++;
+ }
+ final OperatorStats partitionStats = new OperatorStats(stats, true);
+ subPartitioners.get(i).setup(context, incoming, popConfig, partitionStats, oContext,
+ cgInner, startIndex, endIndex);
+ }
+
+ partitioner = new PartitionerDecorator(subPartitioners, stats, context);
+ for (int index = 0; index < terminations.size(); index++) {
+ partitioner.getOutgoingBatches(terminations.buffer[index]).terminate();
+ }
+ terminations.clear();
+
+ success = true;
+ } finally {
+ if (!success) {
+ for (Partitioner p : subPartitioners) {
+ p.clear();
+ }
+ }
+ }
return subPartitioners;
} catch (ClassTransformationException | IOException e) {
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/Partitioner.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/Partitioner.java
index a2fc069e4..76c60e8f6 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/Partitioner.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/Partitioner.java
@@ -22,6 +22,7 @@ import java.util.List;
import org.apache.drill.exec.compile.TemplateClassDefinition;
import org.apache.drill.exec.exception.SchemaChangeException;
+import org.apache.drill.exec.expr.ClassGenerator;
import org.apache.drill.exec.ops.ExchangeFragmentContext;
import org.apache.drill.exec.ops.OperatorContext;
import org.apache.drill.exec.ops.OperatorStats;
@@ -39,6 +40,7 @@ public interface Partitioner {
HashPartitionSender popConfig,
OperatorStats stats,
OperatorContext oContext,
+ ClassGenerator<?> cg,
int start, int count) throws SchemaChangeException;
void partitionBatch(RecordBatch incoming) throws IOException;
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerTemplate.java
index ea31a79cb..687ff814a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerTemplate.java
@@ -26,6 +26,7 @@ import javax.inject.Named;
import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.exec.compile.sig.RuntimeOverridden;
import org.apache.drill.exec.exception.SchemaChangeException;
+import org.apache.drill.exec.expr.ClassGenerator;
import org.apache.drill.exec.expr.TypeHelper;
import org.apache.drill.exec.memory.BufferAllocator;
import org.apache.drill.exec.ops.AccountingDataTunnel;
@@ -35,6 +36,7 @@ import org.apache.drill.exec.ops.OperatorContext;
import org.apache.drill.exec.ops.OperatorStats;
import org.apache.drill.exec.physical.MinorFragmentEndpoint;
import org.apache.drill.exec.physical.config.HashPartitionSender;
+import org.apache.drill.exec.physical.impl.common.CodeGenMemberInjector;
import org.apache.drill.exec.physical.impl.partitionsender.PartitionSenderRootExec.Metric;
import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
import org.apache.drill.exec.record.BatchSchema;
@@ -62,6 +64,8 @@ public abstract class PartitionerTemplate implements Partitioner {
private SelectionVector4 sv4;
private RecordBatch incoming;
private OperatorStats stats;
+ protected ClassGenerator<?> cg;
+ protected FragmentContext context;
private int start;
private int end;
private List<OutgoingRecordBatch> outgoingBatches = Lists.newArrayList();
@@ -87,10 +91,13 @@ public abstract class PartitionerTemplate implements Partitioner {
HashPartitionSender popConfig,
OperatorStats stats,
OperatorContext oContext,
+ ClassGenerator<?> cg,
int start, int end) throws SchemaChangeException {
this.incoming = incoming;
this.stats = stats;
+ this.context = context;
+ this.cg = cg;
this.start = start;
this.end = end;
doSetup(context, incoming, null);
@@ -144,7 +151,12 @@ public abstract class PartitionerTemplate implements Partitioner {
protected OutgoingRecordBatch newOutgoingRecordBatch(
OperatorStats stats, HashPartitionSender operator, AccountingDataTunnel tunnel,
FragmentContext context, BufferAllocator allocator, int oppositeMinorFragmentId) {
- return new OutgoingRecordBatch(stats, operator, tunnel, context, allocator, oppositeMinorFragmentId);
+ return this.injectMembers(new OutgoingRecordBatch(stats, operator, tunnel, context, allocator, oppositeMinorFragmentId));
+ }
+
+ protected OutgoingRecordBatch injectMembers(OutgoingRecordBatch outgoingRecordBatch) {
+ CodeGenMemberInjector.injectMembers(cg, outgoingRecordBatch, context);
+ return outgoingRecordBatch;
}
@Override
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/compile/ExampleTemplateWithInner.java b/exec/java-exec/src/test/java/org/apache/drill/exec/compile/ExampleTemplateWithInner.java
index 3153cd0a8..f48315d0e 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/compile/ExampleTemplateWithInner.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/compile/ExampleTemplateWithInner.java
@@ -72,7 +72,11 @@ public abstract class ExampleTemplateWithInner implements ExampleInner{
}
protected DoubleInner newDoubleInner() {
- return new DoubleInner();
+ return this.injectMembers(new DoubleInner());
+ }
+
+ protected DoubleInner injectMembers(DoubleInner doubleInner) {
+ return doubleInner;
}
public class DoubleInner {
@@ -101,6 +105,10 @@ public abstract class ExampleTemplateWithInner implements ExampleInner{
* subclass (or replacement) of the template inner class
*/
protected TheInnerClass newTheInnerClass( ) {
- return new TheInnerClass();
+ return this.injectMembers(new TheInnerClass());
+ }
+
+ protected TheInnerClass injectMembers(TheInnerClass theInnerClass) {
+ return theInnerClass;
}
}
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/compile/TestLargeFileCompilation.java b/exec/java-exec/src/test/java/org/apache/drill/exec/compile/TestLargeFileCompilation.java
index 084107dde..ecff4e10d 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/compile/TestLargeFileCompilation.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/compile/TestLargeFileCompilation.java
@@ -17,6 +17,8 @@
*/
package org.apache.drill.exec.compile;
+import java.util.concurrent.ThreadLocalRandom;
+
import org.apache.drill.categories.SlowTest;
import org.apache.drill.test.BaseTestQuery;
import org.apache.drill.test.TestTools;
@@ -39,6 +41,8 @@ public class TestLargeFileCompilation extends BaseTestQuery {
private static final String LARGE_QUERY_FILTER;
+ private static final String HUGE_STRING_CONST_QUERY;
+
private static final String LARGE_QUERY_WRITER;
private static final String LARGE_QUERY_SELECT_LIST;
@@ -110,6 +114,21 @@ public class TestLargeFileCompilation extends BaseTestQuery {
}
static {
+ final char[] alphabet = "abcdefghijklmnopqrstuvwxyz".toCharArray();
+ int len = 1 << 18;
+ char[] longText = new char[len];
+ for (int j = 0; j < len; ++j) {
+ longText[j] = alphabet[ThreadLocalRandom.current().nextInt(0, alphabet.length)];
+ }
+ StringBuilder sb = new StringBuilder("select *\n")
+ .append("from cp.`employee.json`\n")
+ .append("where last_name ='")
+ .append(longText)
+ .append("'");
+ HUGE_STRING_CONST_QUERY = sb.toString();
+ }
+
+ static {
LARGE_QUERY_WRITER = createTableWithColsCount(NUM_PROJECT_COLUMNS);
LARGE_TABLE_WRITER = createTableWithColsCount(NUM_JOIN_TABLE_COLUMNS);
QUERY_WITH_JOIN = "select * from %1$s t1, %1$s t2 where t1.col1 = t2.col1";
@@ -228,4 +247,24 @@ public class TestLargeFileCompilation extends BaseTestQuery {
testNoResult("drop table if exists %s", tableName);
}
}
+
+ @Test
+ public void testJDKHugeStringConstantCompilation() throws Exception {
+ try {
+ setSessionOption(ClassCompilerSelector.JAVA_COMPILER_OPTION, "JDK");
+ testNoResult(ITERATION_COUNT, HUGE_STRING_CONST_QUERY);
+ } finally {
+ resetSessionOption(ClassCompilerSelector.JAVA_COMPILER_OPTION);
+ }
+ }
+
+ @Test
+ public void testJaninoHugeStringConstantCompilation() throws Exception {
+ try {
+ setSessionOption(ClassCompilerSelector.JAVA_COMPILER_OPTION, "JANINO");
+ testNoResult(ITERATION_COUNT, HUGE_STRING_CONST_QUERY);
+ } finally {
+ resetSessionOption(ClassCompilerSelector.JAVA_COMPILER_OPTION);
+ }
+ }
}
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TopN/TopNBatchTest.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TopN/TopNBatchTest.java
index 7537cfbea..e3731a8e8 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TopN/TopNBatchTest.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TopN/TopNBatchTest.java
@@ -21,6 +21,9 @@ import java.util.List;
import java.util.Properties;
import java.util.Random;
+import org.apache.drill.exec.ops.FragmentContextImpl;
+import org.apache.drill.exec.proto.BitControl;
+import org.apache.drill.exec.server.DrillbitContext;
import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
import org.apache.drill.test.BaseDirTestWatcher;
import org.apache.drill.test.ClusterFixtureBuilder;
@@ -31,7 +34,6 @@ import org.apache.drill.common.expression.FieldReference;
import org.apache.drill.common.logical.data.Order;
import org.apache.drill.common.types.TypeProtos;
import org.apache.drill.common.types.Types;
-import org.apache.drill.exec.compile.CodeCompiler;
import org.apache.drill.exec.expr.fn.FunctionImplementationRegistry;
import org.apache.drill.exec.memory.RootAllocator;
import org.apache.drill.exec.physical.impl.sort.RecordBatchData;
@@ -50,6 +52,8 @@ import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
+import static org.mockito.Mockito.when;
+
@Category(OperatorTest.class)
public class TopNBatchTest extends PopUnitTestBase {
@Rule
@@ -63,6 +67,8 @@ public class TopNBatchTest extends PopUnitTestBase {
public void priorityQueueOrderingTest() throws Exception {
Properties properties = new Properties();
DrillConfig drillConfig = DrillConfig.create(properties);
+ DrillbitContext drillbitContext = mockDrillbitContext();
+ when(drillbitContext.getFunctionImplementationRegistry()).thenReturn(new FunctionImplementationRegistry(drillConfig));
FieldReference expr = FieldReference.getWithQuotedRef("colA");
Order.Ordering ordering = new Order.Ordering(Order.Ordering.ORDER_DESC, expr, Order.Ordering.NULLS_FIRST);
@@ -73,6 +79,9 @@ public class TopNBatchTest extends PopUnitTestBase {
List<MaterializedField> cols = Lists.newArrayList(colA, colB);
BatchSchema batchSchema = new BatchSchema(BatchSchema.SelectionVectorMode.NONE, cols);
+ FragmentContextImpl context = new FragmentContextImpl(drillbitContext,
+ BitControl.PlanFragment.getDefaultInstance(), null,
+ drillbitContext.getFunctionImplementationRegistry());
RowSet expectedRowSet;
try (RootAllocator allocator = new RootAllocator(100_000_000)) {
@@ -100,12 +109,10 @@ public class TopNBatchTest extends PopUnitTestBase {
queue = TopNBatch.createNewPriorityQueue(
TopNBatch.createMainMappingSet(), TopNBatch.createLeftMappingSet(),
- TopNBatch.createRightMappingSet(), optionManager,
- new FunctionImplementationRegistry(drillConfig),
- new CodeCompiler(drillConfig, optionManager),
+ TopNBatch.createRightMappingSet(),
orderings, hyperContainer, false,
true, 10, allocator,
- batchSchema.getSelectionVectorMode());
+ batchSchema.getSelectionVectorMode(), context);
}
List<RecordBatchData> testBatches = Lists.newArrayList();
diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/ValueHolderHelper.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/ValueHolderHelper.java
index 52afbe0cb..7087687c1 100644
--- a/exec/vector/src/main/java/org/apache/drill/exec/vector/ValueHolderHelper.java
+++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/ValueHolderHelper.java
@@ -169,10 +169,13 @@ public class ValueHolderHelper {
}
public static Decimal28SparseHolder getDecimal28Holder(DrillBuf buf, String decimal) {
+ BigDecimal bigDecimal = new BigDecimal(decimal);
- Decimal28SparseHolder dch = new Decimal28SparseHolder();
+ return getDecimal28Holder(buf, bigDecimal);
+ }
- BigDecimal bigDecimal = new BigDecimal(decimal);
+ public static Decimal28SparseHolder getDecimal28Holder(DrillBuf buf, BigDecimal bigDecimal) {
+ Decimal28SparseHolder dch = new Decimal28SparseHolder();
dch.scale = bigDecimal.scale();
dch.precision = bigDecimal.precision();
@@ -180,33 +183,40 @@ public class ValueHolderHelper {
dch.start = 0;
dch.buffer = buf.reallocIfNeeded(5 * DecimalUtility.INTEGER_SIZE);
DecimalUtility
- .getSparseFromBigDecimal(bigDecimal, dch.buffer, dch.start, dch.scale, Decimal28SparseHolder.nDecimalDigits);
+ .getSparseFromBigDecimal(bigDecimal, dch.buffer, dch.start, dch.scale, Decimal28SparseHolder.nDecimalDigits);
return dch;
}
public static Decimal38SparseHolder getDecimal38Holder(DrillBuf buf, String decimal) {
+ BigDecimal bigDecimal = new BigDecimal(decimal);
- Decimal38SparseHolder dch = new Decimal38SparseHolder();
+ return getDecimal38Holder(buf, bigDecimal);
+ }
- BigDecimal bigDecimal = new BigDecimal(decimal);
+ public static Decimal38SparseHolder getDecimal38Holder(DrillBuf buf, BigDecimal bigDecimal) {
+ Decimal38SparseHolder dch = new Decimal38SparseHolder();
- dch.scale = bigDecimal.scale();
- dch.precision = bigDecimal.precision();
- Decimal38SparseHolder.setSign(bigDecimal.signum() == -1, dch.start, dch.buffer);
- dch.start = 0;
+ dch.scale = bigDecimal.scale();
+ dch.precision = bigDecimal.precision();
+ Decimal38SparseHolder.setSign(bigDecimal.signum() == -1, dch.start, dch.buffer);
+ dch.start = 0;
dch.buffer = buf.reallocIfNeeded(Decimal38SparseHolder.maxPrecision * DecimalUtility.INTEGER_SIZE);
DecimalUtility
- .getSparseFromBigDecimal(bigDecimal, dch.buffer, dch.start, dch.scale, Decimal38SparseHolder.nDecimalDigits);
+ .getSparseFromBigDecimal(bigDecimal, dch.buffer, dch.start, dch.scale, Decimal38SparseHolder.nDecimalDigits);
- return dch;
+ return dch;
}
public static VarDecimalHolder getVarDecimalHolder(DrillBuf buf, String decimal) {
- VarDecimalHolder dch = new VarDecimalHolder();
-
BigDecimal bigDecimal = new BigDecimal(decimal);
+ return getVarDecimalHolder(buf, bigDecimal);
+ }
+
+ public static VarDecimalHolder getVarDecimalHolder(DrillBuf buf, BigDecimal bigDecimal) {
+ VarDecimalHolder dch = new VarDecimalHolder();
+
byte[] bytes = bigDecimal.unscaledValue().toByteArray();
int length = bytes.length;