diff options
author | Aditya Kishore <aditya@maprtech.com> | 2014-09-09 13:26:06 -0700 |
---|---|---|
committer | Aditya Kishore <aditya@maprtech.com> | 2014-09-11 19:24:11 -0700 |
commit | 39990292e833bbfb419565a3503312c552110378 (patch) | |
tree | 6c1e527a082b7801162fa727b8a5d26f06c6072f /exec/java-exec/src/main/java/org/apache/drill/exec/physical | |
parent | 20d2aa46e2b789d5fe09b1383ec559b4aa8f5316 (diff) |
DRILL-634: Cleanup/organize Java imports and trailing whitespaces from Drill code
Diffstat (limited to 'exec/java-exec/src/main/java/org/apache/drill/exec/physical')
109 files changed, 374 insertions, 452 deletions
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/PhysicalPlan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/PhysicalPlan.java index 4082661f3..78b882b79 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/PhysicalPlan.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/PhysicalPlan.java @@ -20,10 +20,9 @@ package org.apache.drill.exec.physical; import java.io.IOException; import java.util.List; -import org.apache.drill.common.logical.PlanProperties; -import org.apache.drill.common.config.DrillConfig; import org.apache.drill.common.graph.Graph; import org.apache.drill.common.graph.GraphAlgos; +import org.apache.drill.common.logical.PlanProperties; import org.apache.drill.exec.physical.base.Leaf; import org.apache.drill.exec.physical.base.PhysicalOperator; import org.apache.drill.exec.physical.base.Root; @@ -39,23 +38,23 @@ import com.google.common.collect.Lists; @JsonPropertyOrder({ "head", "graph" }) public class PhysicalPlan { static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(PhysicalPlan.class); - + PlanProperties properties; - + Graph<PhysicalOperator, Root, Leaf> graph; - + @JsonCreator public PhysicalPlan(@JsonProperty("head") PlanProperties properties, @JsonProperty("graph") List<PhysicalOperator> operators){ this.properties = properties; this.graph = Graph.newGraph(operators, Root.class, Leaf.class); } - + @JsonProperty("graph") public List<PhysicalOperator> getSortedOperators(){ // reverse the list so that nested references are flattened rather than nested. return getSortedOperators(true); } - + public List<PhysicalOperator> getSortedOperators(boolean reverse){ List<PhysicalOperator> list = GraphAlgos.TopoSorter.sort(graph); if(reverse){ @@ -63,7 +62,7 @@ public class PhysicalPlan { }else{ return list; } - + } @@ -90,5 +89,5 @@ public class PhysicalPlan { throw new RuntimeException(e); } } - + } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractBase.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractBase.java index 54f9ef836..e54e67c3d 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractBase.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractBase.java @@ -20,7 +20,6 @@ package org.apache.drill.exec.physical.base; import org.apache.drill.common.graph.GraphVisitor; import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode; -import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.base.Preconditions; public abstract class AbstractBase implements PhysicalOperator{ diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractMultiple.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractMultiple.java index 7326be6f9..909a15232 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractMultiple.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractMultiple.java @@ -17,10 +17,10 @@ */ package org.apache.drill.exec.physical.base; -import com.google.common.collect.Iterators; - import java.util.Iterator; +import com.google.common.collect.Iterators; + /** * Describes an operator that expects more than one children operators as its input. */ diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractPhysicalVisitor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractPhysicalVisitor.java index 5e854251b..9e7beec47 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractPhysicalVisitor.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractPhysicalVisitor.java @@ -30,7 +30,6 @@ import org.apache.drill.exec.physical.config.MergingReceiverPOP; import org.apache.drill.exec.physical.config.OrderedPartitionSender; import org.apache.drill.exec.physical.config.ProducerConsumer; import org.apache.drill.exec.physical.config.Project; -import org.apache.drill.exec.physical.config.UnorderedReceiver; import org.apache.drill.exec.physical.config.RangeSender; import org.apache.drill.exec.physical.config.Screen; import org.apache.drill.exec.physical.config.SingleSender; @@ -39,6 +38,7 @@ import org.apache.drill.exec.physical.config.StreamingAggregate; import org.apache.drill.exec.physical.config.Trace; import org.apache.drill.exec.physical.config.UnionAll; import org.apache.drill.exec.physical.config.UnionExchange; +import org.apache.drill.exec.physical.config.UnorderedReceiver; public abstract class AbstractPhysicalVisitor<T, X, E extends Throwable> implements PhysicalVisitor<T, X, E> { static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(AbstractPhysicalVisitor.class); @@ -90,9 +90,9 @@ public abstract class AbstractPhysicalVisitor<T, X, E extends Throwable> impleme @Override public T visitHashAggregate(HashAggregate agg, X value) throws E { - return visitOp(agg, value); + return visitOp(agg, value); } - + @Override public T visitSender(Sender sender, X value) throws E { return visitOp(sender, value); @@ -118,14 +118,14 @@ public abstract class AbstractPhysicalVisitor<T, X, E extends Throwable> impleme return visitOp(store, value); } - + public T visitChildren(PhysicalOperator op, X value) throws E{ for(PhysicalOperator child : op){ child.accept(this, value); } return null; } - + @Override public T visitMergeJoin(MergeJoinPOP join, X value) throws E { return visitOp(join, value); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractSingle.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractSingle.java index 6c0b98f86..2b10e6d0a 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractSingle.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractSingle.java @@ -20,7 +20,6 @@ package org.apache.drill.exec.physical.base; import java.util.Iterator; import java.util.List; - import com.fasterxml.jackson.annotation.JsonIgnore; import com.google.common.base.Preconditions; import com.google.common.collect.Iterators; diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/FragmentRoot.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/FragmentRoot.java index f4a358c7c..1721fcf62 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/FragmentRoot.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/FragmentRoot.java @@ -19,7 +19,7 @@ package org.apache.drill.exec.physical.base; /** - * Describes the root operation within a particular Fragment. This includes things Sender nodes. + * Describes the root operation within a particular Fragment. This includes things Sender nodes. */ public interface FragmentRoot extends FragmentLeaf{ } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/GroupScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/GroupScan.java index 9c27c0c37..a88a5ec83 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/GroupScan.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/GroupScan.java @@ -19,13 +19,13 @@ package org.apache.drill.exec.physical.base; import java.util.List; -import com.google.common.collect.Lists; import org.apache.drill.common.exceptions.ExecutionSetupException; import org.apache.drill.common.exceptions.PhysicalOperatorSetupException; import org.apache.drill.common.expression.SchemaPath; import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint; import com.fasterxml.jackson.annotation.JsonIgnore; +import com.google.common.collect.Lists; /** * A GroupScan operator represents all data which will be scanned by a given physical diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/HasAffinity.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/HasAffinity.java index faf082968..52462db77 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/HasAffinity.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/HasAffinity.java @@ -27,7 +27,7 @@ import com.fasterxml.jackson.annotation.JsonIgnore; * Describes a physical operator that has affinity to particular nodes. Used for assignment decisions. */ public interface HasAffinity extends PhysicalOperator { - + /** * Get the list of Endpoints with associated affinities that this operator has preference for. * @return List of EndpointAffinity objects. diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalOperatorUtil.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalOperatorUtil.java index a5dbd6a6b..dfcb11393 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalOperatorUtil.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalOperatorUtil.java @@ -23,9 +23,9 @@ import org.apache.drill.common.util.PathScanner; public class PhysicalOperatorUtil { static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(PhysicalOperatorUtil.class); - + private PhysicalOperatorUtil(){} - + public synchronized static Class<?>[] getSubTypes(DrillConfig config){ Class<?>[] ops = PathScanner.scanForImplementationsArr(PhysicalOperator.class, config.getStringList(CommonConstants.PHYSICAL_OPERATOR_SCAN_PACKAGES)); logger.debug("Adding Physical Operator sub types: {}", ((Object) ops) ); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalVisitor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalVisitor.java index f0b0b9a84..8da06cbb8 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalVisitor.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalVisitor.java @@ -30,7 +30,6 @@ import org.apache.drill.exec.physical.config.MergingReceiverPOP; import org.apache.drill.exec.physical.config.OrderedPartitionSender; import org.apache.drill.exec.physical.config.ProducerConsumer; import org.apache.drill.exec.physical.config.Project; -import org.apache.drill.exec.physical.config.UnorderedReceiver; import org.apache.drill.exec.physical.config.RangeSender; import org.apache.drill.exec.physical.config.Screen; import org.apache.drill.exec.physical.config.SingleSender; @@ -39,6 +38,7 @@ import org.apache.drill.exec.physical.config.StreamingAggregate; import org.apache.drill.exec.physical.config.Trace; import org.apache.drill.exec.physical.config.UnionAll; import org.apache.drill.exec.physical.config.UnionExchange; +import org.apache.drill.exec.physical.config.UnorderedReceiver; /** * Visitor class designed to traversal of a operator tree. Basis for a number of operator manipulations including fragmentation and materialization. @@ -48,8 +48,8 @@ import org.apache.drill.exec.physical.config.UnionExchange; */ public interface PhysicalVisitor<RETURN, EXTRA, EXCEP extends Throwable> { static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(PhysicalVisitor.class); - - + + public RETURN visitExchange(Exchange exchange, EXTRA value) throws EXCEP; public RETURN visitGroupScan(GroupScan groupScan, EXTRA value) throws EXCEP; public RETURN visitSubScan(SubScan subScan, EXTRA value) throws EXCEP; @@ -69,7 +69,7 @@ public interface PhysicalVisitor<RETURN, EXTRA, EXCEP extends Throwable> { public RETURN visitHashAggregate(HashAggregate agg, EXTRA value) throws EXCEP; public RETURN visitWriter(Writer op, EXTRA value) throws EXCEP; public RETURN visitOp(PhysicalOperator op, EXTRA value) throws EXCEP; - + public RETURN visitHashPartitionSender(HashPartitionSender op, EXTRA value) throws EXCEP; public RETURN visitOrderedPartitionSender(OrderedPartitionSender op, EXTRA value) throws EXCEP; public RETURN visitUnorderedReceiver(UnorderedReceiver op, EXTRA value) throws EXCEP; @@ -81,6 +81,6 @@ public interface PhysicalVisitor<RETURN, EXTRA, EXCEP extends Throwable> { public RETURN visitSingleSender(SingleSender op, EXTRA value) throws EXCEP; public RETURN visitUnionExchange(UnionExchange op, EXTRA value) throws EXCEP; public RETURN visitProducerConsumer(ProducerConsumer op, EXTRA value) throws EXCEP; - + public RETURN visitIteratorValidator(IteratorValidator op, EXTRA value) throws EXCEP; } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/Receiver.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/Receiver.java index ef6535f01..0c67770e9 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/Receiver.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/Receiver.java @@ -29,7 +29,7 @@ import com.fasterxml.jackson.annotation.JsonProperty; * corresponding Senders. Receivers are a special type of Physical Operator that are typically only expressed within the execution plan. */ public interface Receiver extends FragmentLeaf { - + /** * A receiver is expecting streams from one or more providing endpoints. This method should return a list of the expected sending endpoints. * @return List of counterpart sending DrillbitEndpoints. @@ -40,7 +40,7 @@ public interface Receiver extends FragmentLeaf { * Whether or not this receive supports out of order exchange. This provides a hint for the scheduling node on whether * the receiver can start work if only a subset of all sending endpoints are currently providing data. A random * receiver would supports this form of operation. A NWAY receiver would not. - * + * * @return True if this receiver supports working on a streaming/out of order input. */ @JsonIgnore diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/Sender.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/Sender.java index 00d94eb70..bbd1b2c8d 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/Sender.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/Sender.java @@ -28,13 +28,13 @@ import com.fasterxml.jackson.annotation.JsonProperty; * record set to a set of destination locations. This is typically only utilized at the level of the execution plan. */ public interface Sender extends FragmentRoot { - + /** * Get the list of destination endpoints that this Sender will be communicating with. * @return List of DrillbitEndpoints. */ public abstract List<DrillbitEndpoint> getDestinations(); - + /** * Get the receiver major fragment id that is opposite this sender. * @return diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/Store.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/Store.java index beefb9bae..acf53f2be 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/Store.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/Store.java @@ -35,7 +35,7 @@ public interface Store extends HasAffinity { * Inform the Store node about the actual decided DrillbitEndpoint assignments desired for storage purposes. This is a * precursor to the execution planner running a set of getSpecificStore() method calls for full Store node * materialization. - * + * * @param endpoints * The list of endpoints that this Store node are going to be executed on. * @throws PhysicalOperatorSetupException @@ -44,7 +44,7 @@ public interface Store extends HasAffinity { /** * Provides full materialized Store operators for execution purposes. - * + * * @param child * The child operator that this operator will consume from. * @param minorFragmentId @@ -60,7 +60,7 @@ public interface Store extends HasAffinity { * parallelizations that it can support. For example, a Screen return cannot be parallelized at all. In this case, a * maxWidth value of 1 will be returned. In the case that there is no limit for parallelization, this method should * return Integer.MAX_VALUE. - * + * * @return */ @JsonIgnore diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/ExternalSort.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/ExternalSort.java index c196a962c..456b9c0cc 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/ExternalSort.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/ExternalSort.java @@ -39,10 +39,12 @@ public class ExternalSort extends Sort { super(child, orderings, reverse); } + @Override public List<Ordering> getOrderings() { return orderings; } + @Override public boolean getReverse() { return reverse; } @@ -68,6 +70,7 @@ public class ExternalSort extends Sort { this.maxAllocation = Math.max(initialAllocation, maxAllocation); } + @Override public long getInitialAllocation() { return initialAllocation; } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/HashToMergeExchange.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/HashToMergeExchange.java index 8bae26f78..f62d9224f 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/HashToMergeExchange.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/HashToMergeExchange.java @@ -35,16 +35,16 @@ import com.fasterxml.jackson.annotation.JsonTypeName; public class HashToMergeExchange extends AbstractExchange{ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(HashToMergeExchange.class); - + private final LogicalExpression distExpr; private final List<Ordering> orderExprs; //ephemeral for setup tasks. private List<DrillbitEndpoint> senderLocations; private List<DrillbitEndpoint> receiverLocations; - + @JsonCreator - public HashToMergeExchange(@JsonProperty("child") PhysicalOperator child, + public HashToMergeExchange(@JsonProperty("child") PhysicalOperator child, @JsonProperty("expr") LogicalExpression expr, @JsonProperty("orderings") List<Ordering> orderExprs) { super(child); @@ -88,6 +88,6 @@ public class HashToMergeExchange extends AbstractExchange{ return orderExprs; } - - + + } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/HashToRandomExchange.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/HashToRandomExchange.java index dddaf8382..fac374bcd 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/HashToRandomExchange.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/HashToRandomExchange.java @@ -17,7 +17,6 @@ */ package org.apache.drill.exec.physical.config; -import java.beans.Transient; import java.util.List; import org.apache.drill.common.expression.LogicalExpression; @@ -35,13 +34,13 @@ import com.fasterxml.jackson.annotation.JsonTypeName; public class HashToRandomExchange extends AbstractExchange{ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(HashToRandomExchange.class); - + private final LogicalExpression expr; //ephemeral for setup tasks. private List<DrillbitEndpoint> senderLocations; private List<DrillbitEndpoint> receiverLocations; - + @JsonCreator public HashToRandomExchange(@JsonProperty("child") PhysicalOperator child, @JsonProperty("expr") LogicalExpression expr) { super(child); @@ -84,6 +83,6 @@ public class HashToRandomExchange extends AbstractExchange{ return expr; } - - + + } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/MergingReceiverPOP.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/MergingReceiverPOP.java index d76ec8058..f5dca1abf 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/MergingReceiverPOP.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/MergingReceiverPOP.java @@ -19,7 +19,6 @@ package org.apache.drill.exec.physical.config; import java.util.List; -import com.fasterxml.jackson.annotation.JsonIgnore; import org.apache.drill.common.logical.data.Order.Ordering; import org.apache.drill.exec.physical.base.AbstractReceiver; import org.apache.drill.exec.physical.base.PhysicalVisitor; @@ -27,6 +26,7 @@ import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint; import org.apache.drill.exec.proto.UserBitShared.CoreOperatorType; import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.annotation.JsonTypeName; diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/PartitionRange.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/PartitionRange.java index 00bb32869..25e184341 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/PartitionRange.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/PartitionRange.java @@ -24,10 +24,10 @@ import com.fasterxml.jackson.annotation.JsonProperty; public class PartitionRange { static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(PartitionRange.class); - + private LogicalExpression start; private LogicalExpression finish; - + @JsonCreator public PartitionRange(@JsonProperty("start") LogicalExpression start, @JsonProperty("finish") LogicalExpression finish) { super(); @@ -42,6 +42,6 @@ public class PartitionRange { public LogicalExpression getFinish() { return finish; } - - + + } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/ProducerConsumer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/ProducerConsumer.java index 87655d149..3159ef852 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/ProducerConsumer.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/ProducerConsumer.java @@ -17,14 +17,15 @@ */ package org.apache.drill.exec.physical.config; -import com.fasterxml.jackson.annotation.JsonCreator; -import com.fasterxml.jackson.annotation.JsonProperty; -import com.fasterxml.jackson.annotation.JsonTypeName; import org.apache.drill.exec.physical.base.AbstractSingle; import org.apache.drill.exec.physical.base.PhysicalOperator; import org.apache.drill.exec.physical.base.PhysicalVisitor; import org.apache.drill.exec.proto.UserBitShared.CoreOperatorType; +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.annotation.JsonTypeName; + @JsonTypeName("producer-consumer") public class ProducerConsumer extends AbstractSingle{ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ProducerConsumer.class); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/TopN.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/TopN.java index 4e3d9cd0b..4d2f1f051 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/TopN.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/TopN.java @@ -40,10 +40,12 @@ public class TopN extends Sort { this.limit = limit; } + @Override public List<Ordering> getOrderings() { return orderings; } + @Override public boolean getReverse() { return reverse; } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/UnorderedReceiver.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/UnorderedReceiver.java index 357d62d60..3a4dd0ed6 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/UnorderedReceiver.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/UnorderedReceiver.java @@ -19,13 +19,13 @@ package org.apache.drill.exec.physical.config; import java.util.List; -import com.fasterxml.jackson.annotation.JsonIgnore; import org.apache.drill.exec.physical.base.AbstractReceiver; import org.apache.drill.exec.physical.base.PhysicalVisitor; import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint; import org.apache.drill.exec.proto.UserBitShared.CoreOperatorType; import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.annotation.JsonTypeName; diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/BatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/BatchCreator.java index b55abefc4..8dc0a6b11 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/BatchCreator.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/BatchCreator.java @@ -20,13 +20,12 @@ package org.apache.drill.exec.physical.impl; import java.util.List; import org.apache.drill.common.exceptions.ExecutionSetupException; -import org.apache.drill.exec.memory.OutOfMemoryException; import org.apache.drill.exec.ops.FragmentContext; import org.apache.drill.exec.physical.base.PhysicalOperator; import org.apache.drill.exec.record.RecordBatch; public interface BatchCreator<T extends PhysicalOperator> { static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(BatchCreator.class); - + public RecordBatch getBatch(FragmentContext context, T config, List<RecordBatch> children) throws ExecutionSetupException; } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ImplCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ImplCreator.java index 873a040b0..7f9762415 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ImplCreator.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ImplCreator.java @@ -20,7 +20,6 @@ package org.apache.drill.exec.physical.impl; import java.util.List; import java.util.concurrent.TimeUnit; -import com.google.common.base.Stopwatch; import org.apache.drill.common.exceptions.ExecutionSetupException; import org.apache.drill.exec.ops.FragmentContext; import org.apache.drill.exec.physical.base.AbstractPhysicalVisitor; @@ -31,6 +30,7 @@ import org.apache.drill.exec.record.RecordBatch; import org.apache.drill.exec.util.AssertionUtil; import com.google.common.base.Preconditions; +import com.google.common.base.Stopwatch; import com.google.common.collect.Lists; /** @@ -79,7 +79,7 @@ public class ImplCreator extends AbstractPhysicalVisitor<RecordBatch, FragmentCo public static RootExec getExec(FragmentContext context, FragmentRoot root) throws ExecutionSetupException { ImplCreator i = new ImplCreator(); if(AssertionUtil.isAssertionsEnabled()){ - root = IteratorValidatorInjector.rewritePlanWithIteratorValidator(context, root); + root = IteratorValidatorInjector.rewritePlanWithIteratorValidator(context, root); } Stopwatch watch = new Stopwatch(); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/MergingReceiverCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/MergingReceiverCreator.java index 4058a7949..528611e3b 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/MergingReceiverCreator.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/MergingReceiverCreator.java @@ -17,6 +17,8 @@ */ package org.apache.drill.exec.physical.impl; +import java.util.List; + import org.apache.drill.common.exceptions.ExecutionSetupException; import org.apache.drill.exec.ops.FragmentContext; import org.apache.drill.exec.physical.config.MergingReceiverPOP; @@ -25,8 +27,6 @@ import org.apache.drill.exec.record.RecordBatch; import org.apache.drill.exec.work.batch.IncomingBuffers; import org.apache.drill.exec.work.batch.RawBatchBuffer; -import java.util.List; - public class MergingReceiverCreator implements BatchCreator<MergingReceiverPOP> { static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(MergingReceiverCreator.class); @@ -44,6 +44,6 @@ public class MergingReceiverCreator implements BatchCreator<MergingReceiverPOP> return new MergingRecordBatch(context, receiver, buffers); } - - + + } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/RootCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/RootCreator.java index 0bae6feb0..f3d95243c 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/RootCreator.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/RootCreator.java @@ -26,6 +26,6 @@ import org.apache.drill.exec.record.RecordBatch; public interface RootCreator<T extends PhysicalOperator> { static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(RootCreator.class); - + public RootExec getRoot(FragmentContext context, T config, List<RecordBatch> children) throws ExecutionSetupException; } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/RootExec.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/RootExec.java index 42ac4f66d..4250e27f4 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/RootExec.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/RootExec.java @@ -26,13 +26,13 @@ import org.apache.drill.exec.proto.ExecProtos.FragmentHandle; */ public interface RootExec { static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(RootExec.class); - + /** - * Do the next batch of work. + * Do the next batch of work. * @return Whether or not additional batches of work are necessary. False means that this fragment is done. */ public boolean next(); - + /** * Inform all children to clean up and go away. */ @@ -43,5 +43,5 @@ public interface RootExec { * @param handle */ public void receivingFragmentFinished(FragmentHandle handle); - + } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java index 79a25dcee..c2a03b9d4 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java @@ -32,7 +32,6 @@ import org.apache.drill.common.types.Types; import org.apache.drill.exec.ExecConstants; import org.apache.drill.exec.exception.SchemaChangeException; import org.apache.drill.exec.expr.TypeHelper; -import org.apache.drill.exec.expr.holders.NullableVarCharHolder; import org.apache.drill.exec.memory.OutOfMemoryException; import org.apache.drill.exec.ops.FragmentContext; import org.apache.drill.exec.ops.OperatorContext; diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScreenCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScreenCreator.java index d96bdf356..bd15ac9ae 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScreenCreator.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScreenCreator.java @@ -26,7 +26,6 @@ import org.apache.drill.exec.ExecConstants; import org.apache.drill.exec.memory.OutOfMemoryException; import org.apache.drill.exec.ops.FragmentContext; import org.apache.drill.exec.ops.MetricDef; -import org.apache.drill.exec.ops.OperatorContext; import org.apache.drill.exec.physical.config.Screen; import org.apache.drill.exec.physical.impl.materialize.QueryWritableBatch; import org.apache.drill.exec.physical.impl.materialize.RecordMaterializer; @@ -71,13 +70,13 @@ public class ScreenCreator implements RootCreator<Screen>{ public enum Metric implements MetricDef { BYTES_SENT; - + @Override public int metricId() { return ordinal(); } } - + public ScreenRoot(FragmentContext context, RecordBatch incoming, Screen config) throws OutOfMemoryException { super(context, config); assert context.getConnection() != null : "A screen root should only be run on the driving node which is connected directly to the client. As such, this should always be true."; diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SendingAccountor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SendingAccountor.java index d67d214b4..7af7b65f1 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SendingAccountor.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SendingAccountor.java @@ -22,7 +22,7 @@ import java.util.concurrent.Semaphore; /** * Account for whether all messages sent have been completed. Necessary before finishing a task so we don't think * buffers are hanging when they will be released. - * + * * TODO: Need to update to use long for number of pending messages. */ public class SendingAccountor { diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SingleSenderCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SingleSenderCreator.java index 26aa5abd2..2b7fdf3b6 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SingleSenderCreator.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SingleSenderCreator.java @@ -25,8 +25,6 @@ import org.apache.drill.common.exceptions.ExecutionSetupException; import org.apache.drill.exec.memory.OutOfMemoryException; import org.apache.drill.exec.ops.FragmentContext; import org.apache.drill.exec.ops.MetricDef; -import org.apache.drill.exec.ops.OperatorContext; -import org.apache.drill.exec.ops.OperatorStats; import org.apache.drill.exec.physical.config.SingleSender; import org.apache.drill.exec.proto.ExecProtos.FragmentHandle; import org.apache.drill.exec.proto.GeneralRPCProtos.Ack; @@ -45,9 +43,9 @@ public class SingleSenderCreator implements RootCreator<SingleSender>{ assert children != null && children.size() == 1; return new SingleSenderRootExec(context, children.iterator().next(), config); } - - - + + + private static class SingleSenderRootExec extends BaseRootExec { static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(SingleSenderRootExec.class); private RecordBatch incoming; @@ -61,13 +59,13 @@ public class SingleSenderCreator implements RootCreator<SingleSender>{ public enum Metric implements MetricDef { BYTES_SENT; - + @Override public int metricId() { return ordinal(); } } - + public SingleSenderRootExec(FragmentContext context, RecordBatch batch, SingleSender config) throws OutOfMemoryException { super(context, config); this.incoming = batch; @@ -78,12 +76,12 @@ public class SingleSenderCreator implements RootCreator<SingleSender>{ this.tunnel = context.getDataTunnel(config.getDestination(), opposite); this.context = context; } - + @Override public boolean innerNext() { if(!ok){ incoming.kill(false); - + return false; } @@ -128,9 +126,9 @@ public class SingleSenderCreator implements RootCreator<SingleSender>{ throw new IllegalStateException(); } } - + public void updateStats(FragmentWritableBatch writableBatch) { - stats.addLongStat(Metric.BYTES_SENT, writableBatch.getByteCount()); + stats.addLongStat(Metric.BYTES_SENT, writableBatch.getByteCount()); } @Override @@ -167,10 +165,10 @@ public class SingleSenderCreator implements RootCreator<SingleSender>{ context.fail(new RpcException("A downstream fragment batch wasn't accepted. This fragment thus fails.")); stop(); } - + } - + } - + } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java index fb9554c10..6eede30dc 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java @@ -58,7 +58,6 @@ import org.apache.drill.exec.record.WritableBatch; import org.apache.drill.exec.record.selection.SelectionVector2; import org.apache.drill.exec.record.selection.SelectionVector4; import org.apache.drill.exec.vector.ValueVector; -import org.apache.drill.exec.vector.allocator.VectorAllocator; import org.eigenbase.rel.RelFieldCollation.Direction; import com.google.common.base.Stopwatch; diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNSortBatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNSortBatchCreator.java index 08c754395..aa8b6115d 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNSortBatchCreator.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNSortBatchCreator.java @@ -17,16 +17,15 @@ */ package org.apache.drill.exec.physical.impl.TopN; -import com.google.common.base.Preconditions; +import java.util.List; + import org.apache.drill.common.exceptions.ExecutionSetupException; import org.apache.drill.exec.ops.FragmentContext; -import org.apache.drill.exec.physical.config.Sort; import org.apache.drill.exec.physical.config.TopN; import org.apache.drill.exec.physical.impl.BatchCreator; -import org.apache.drill.exec.physical.impl.sort.SortBatch; import org.apache.drill.exec.record.RecordBatch; -import java.util.List; +import com.google.common.base.Preconditions; public class TopNSortBatchCreator implements BatchCreator<TopN>{ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TopNSortBatchCreator.class); @@ -36,6 +35,6 @@ public class TopNSortBatchCreator implements BatchCreator<TopN>{ Preconditions.checkArgument(children.size() == 1); return new TopNBatch(config, context, children.iterator().next()); } - - + + } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TraceInjector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TraceInjector.java index dddb53f01..58dd247e0 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TraceInjector.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TraceInjector.java @@ -18,6 +18,8 @@ package org.apache.drill.exec.physical.impl; +import java.util.List; + import org.apache.drill.common.exceptions.ExecutionSetupException; import org.apache.drill.exec.ops.FragmentContext; import org.apache.drill.exec.physical.base.AbstractPhysicalVisitor; @@ -25,8 +27,6 @@ import org.apache.drill.exec.physical.base.FragmentRoot; import org.apache.drill.exec.physical.base.PhysicalOperator; import org.apache.drill.exec.physical.config.Trace; -import java.util.List; - import com.google.common.collect.Lists; diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WriterRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WriterRecordBatch.java index ef4db2a78..99eeed374 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WriterRecordBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WriterRecordBatch.java @@ -32,7 +32,6 @@ import org.apache.drill.exec.record.AbstractRecordBatch; import org.apache.drill.exec.record.BatchSchema; import org.apache.drill.exec.record.MaterializedField; import org.apache.drill.exec.record.RecordBatch; -import org.apache.drill.exec.record.RecordValueAccessor; import org.apache.drill.exec.record.VectorWrapper; import org.apache.drill.exec.store.EventBasedRecordWriter; import org.apache.drill.exec.store.RecordWriter; diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/BatchIterator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/BatchIterator.java index feb44897f..8a9259e95 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/BatchIterator.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/BatchIterator.java @@ -21,6 +21,6 @@ import org.apache.drill.exec.record.RecordBatch.IterOutcome; public interface BatchIterator { static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(BatchIterator.class); - + public IterOutcome next(); } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java index 393fa4f58..e9be2ac99 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java @@ -18,7 +18,6 @@ package org.apache.drill.exec.physical.impl.aggregate; import java.io.IOException; -import java.util.List; import org.apache.drill.common.exceptions.ExecutionSetupException; import org.apache.drill.common.expression.ErrorCollector; @@ -30,31 +29,27 @@ import org.apache.drill.exec.compile.sig.GeneratorMapping; import org.apache.drill.exec.compile.sig.MappingSet; import org.apache.drill.exec.exception.ClassTransformationException; import org.apache.drill.exec.exception.SchemaChangeException; -import org.apache.drill.exec.expr.CodeGenerator; import org.apache.drill.exec.expr.ClassGenerator; import org.apache.drill.exec.expr.ClassGenerator.BlockType; import org.apache.drill.exec.expr.ClassGenerator.HoldingContainer; +import org.apache.drill.exec.expr.CodeGenerator; import org.apache.drill.exec.expr.ExpressionTreeMaterializer; import org.apache.drill.exec.expr.TypeHelper; import org.apache.drill.exec.expr.ValueVectorWriteExpression; import org.apache.drill.exec.ops.FragmentContext; import org.apache.drill.exec.physical.config.HashAggregate; +import org.apache.drill.exec.physical.impl.aggregate.HashAggregator.AggOutcome; +import org.apache.drill.exec.physical.impl.common.HashTable; +import org.apache.drill.exec.physical.impl.common.HashTableConfig; import org.apache.drill.exec.record.AbstractRecordBatch; -import org.apache.drill.exec.record.BatchSchema; import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode; import org.apache.drill.exec.record.MaterializedField; import org.apache.drill.exec.record.RecordBatch; -import org.apache.drill.exec.record.RecordBatch.IterOutcome; import org.apache.drill.exec.record.TypedFieldId; import org.apache.drill.exec.record.selection.SelectionVector2; import org.apache.drill.exec.record.selection.SelectionVector4; import org.apache.drill.exec.vector.ValueVector; -import org.apache.drill.exec.vector.allocator.VectorAllocator; -import org.apache.drill.exec.physical.impl.aggregate.HashAggregator.AggOutcome; -import org.apache.drill.exec.physical.impl.common.HashTable; -import org.apache.drill.exec.physical.impl.common.HashTableConfig; -import com.google.common.collect.Lists; import com.sun.codemodel.JExpr; import com.sun.codemodel.JVar; @@ -242,13 +237,13 @@ public class HashAggBatch extends AbstractRecordBatch<HashAggregate> { HashTable.DEFAULT_LOAD_FACTOR, popConfig.getGroupByExprs(), null /* no probe exprs */) ; - + agg.setup(popConfig, htConfig, context, this.stats, oContext.getAllocator(), incoming, this, aggrExprs, cgInner.getWorkspaceTypes(), - groupByOutFieldIds, - this.container); + groupByOutFieldIds, + this.container); return agg; } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatchCreator.java index 5e0167e43..8c605415d 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatchCreator.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatchCreator.java @@ -23,7 +23,6 @@ import org.apache.drill.common.exceptions.ExecutionSetupException; import org.apache.drill.exec.ops.FragmentContext; import org.apache.drill.exec.physical.config.HashAggregate; import org.apache.drill.exec.physical.impl.BatchCreator; -import org.apache.drill.exec.physical.impl.aggregate.HashAggBatch; import org.apache.drill.exec.record.RecordBatch; import com.google.common.base.Preconditions; @@ -36,6 +35,6 @@ public class HashAggBatchCreator implements BatchCreator<HashAggregate>{ Preconditions.checkArgument(children.size() == 1); return new HashAggBatch(config, children.iterator().next(), context); } - - + + } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java index 22df5f600..b6b887415 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java @@ -29,10 +29,10 @@ import org.apache.drill.common.expression.ErrorCollectorImpl; import org.apache.drill.common.expression.ExpressionPosition; import org.apache.drill.common.expression.FieldReference; import org.apache.drill.common.expression.LogicalExpression; +import org.apache.drill.exec.compile.sig.RuntimeOverridden; import org.apache.drill.exec.exception.ClassTransformationException; import org.apache.drill.exec.exception.SchemaChangeException; import org.apache.drill.exec.expr.TypeHelper; -import org.apache.drill.exec.expr.holders.IntHolder; import org.apache.drill.exec.memory.BufferAllocator; import org.apache.drill.exec.ops.FragmentContext; import org.apache.drill.exec.ops.MetricDef; @@ -52,7 +52,6 @@ import org.apache.drill.exec.record.VectorContainer; import org.apache.drill.exec.record.VectorWrapper; import org.apache.drill.exec.vector.ValueVector; import org.apache.drill.exec.vector.allocator.VectorAllocator; -import org.apache.drill.exec.compile.sig.RuntimeOverridden; import com.google.common.collect.Lists; diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggregator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggregator.java index 421bd5315..4277f2306 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggregator.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggregator.java @@ -19,7 +19,6 @@ package org.apache.drill.exec.physical.impl.aggregate; import java.io.IOException; import java.util.List; -import java.util.Collection; import org.apache.drill.common.expression.LogicalExpression; import org.apache.drill.exec.compile.TemplateClassDefinition; @@ -34,7 +33,6 @@ import org.apache.drill.exec.record.RecordBatch; import org.apache.drill.exec.record.RecordBatch.IterOutcome; import org.apache.drill.exec.record.TypedFieldId; import org.apache.drill.exec.record.VectorContainer; -import org.apache.drill.exec.vector.allocator.VectorAllocator; public interface HashAggregator { @@ -43,13 +41,13 @@ public interface HashAggregator { public static enum AggOutcome { RETURN_OUTCOME, CLEANUP_AND_RETURN, UPDATE_AGGREGATOR } - - public abstract void setup(HashAggregate hashAggrConfig, HashTableConfig htConfig, FragmentContext context, + + public abstract void setup(HashAggregate hashAggrConfig, HashTableConfig htConfig, FragmentContext context, OperatorStats stats, BufferAllocator allocator, RecordBatch incoming, - HashAggBatch outgoing, LogicalExpression[] valueExprs, + HashAggBatch outgoing, LogicalExpression[] valueExprs, List<TypedFieldId> valueFieldIds, TypedFieldId[] keyFieldIds, - VectorContainer outContainer) + VectorContainer outContainer) throws SchemaChangeException, IOException, ClassTransformationException; public abstract IterOutcome getOutcome(); @@ -61,9 +59,9 @@ public interface HashAggregator { public abstract void cleanup(); public abstract boolean allFlushed(); - + public abstract boolean buildComplete(); - + public abstract IterOutcome outputCurrentBatch(); - + } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java index 39131125d..820f7229b 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java @@ -18,12 +18,9 @@ package org.apache.drill.exec.physical.impl.aggregate; import java.io.IOException; -import java.util.List; import org.apache.drill.common.expression.ErrorCollector; import org.apache.drill.common.expression.ErrorCollectorImpl; -import org.apache.drill.common.expression.ExpressionPosition; -import org.apache.drill.common.expression.FunctionCall; import org.apache.drill.common.expression.LogicalExpression; import org.apache.drill.common.logical.data.NamedExpression; import org.apache.drill.exec.compile.sig.GeneratorMapping; @@ -39,13 +36,11 @@ import org.apache.drill.exec.expr.HoldingContainerExpression; import org.apache.drill.exec.expr.TypeHelper; import org.apache.drill.exec.expr.ValueVectorWriteExpression; import org.apache.drill.exec.expr.fn.FunctionGenerationHelper; -import org.apache.drill.exec.memory.BufferAllocator; import org.apache.drill.exec.memory.OutOfMemoryException; import org.apache.drill.exec.ops.FragmentContext; import org.apache.drill.exec.physical.config.StreamingAggregate; import org.apache.drill.exec.physical.impl.aggregate.StreamingAggregator.AggOutcome; import org.apache.drill.exec.record.AbstractRecordBatch; -import org.apache.drill.exec.record.BatchSchema; import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode; import org.apache.drill.exec.record.MaterializedField; import org.apache.drill.exec.record.RecordBatch; @@ -53,10 +48,7 @@ import org.apache.drill.exec.record.TypedFieldId; import org.apache.drill.exec.record.selection.SelectionVector2; import org.apache.drill.exec.record.selection.SelectionVector4; import org.apache.drill.exec.vector.ValueVector; -import org.apache.drill.exec.vector.allocator.VectorAllocator; -import com.google.common.collect.ImmutableList; -import com.google.common.collect.Lists; import com.sun.codemodel.JExpr; import com.sun.codemodel.JVar; diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatchCreator.java index 96b71fecf..0203b8144 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatchCreator.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatchCreator.java @@ -35,6 +35,6 @@ public class StreamingAggBatchCreator implements BatchCreator<StreamingAggregate Preconditions.checkArgument(children.size() == 1); return new StreamingAggBatch(config, children.iterator().next(), context); } - - + + } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggTemplate.java index 6ed37e747..53ac1ed4f 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggTemplate.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggTemplate.java @@ -25,8 +25,6 @@ import org.apache.drill.exec.record.BatchSchema; import org.apache.drill.exec.record.RecordBatch; import org.apache.drill.exec.record.RecordBatch.IterOutcome; import org.apache.drill.exec.record.VectorWrapper; -import org.apache.drill.exec.vector.ValueVector; -import org.apache.drill.exec.vector.allocator.VectorAllocator; public abstract class StreamingAggTemplate implements StreamingAggregator { private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(StreamingAggregator.class); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggregator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggregator.java index c624c9aac..8f5f29be1 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggregator.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggregator.java @@ -22,7 +22,6 @@ import org.apache.drill.exec.exception.SchemaChangeException; import org.apache.drill.exec.ops.FragmentContext; import org.apache.drill.exec.record.RecordBatch; import org.apache.drill.exec.record.RecordBatch.IterOutcome; -import org.apache.drill.exec.vector.allocator.VectorAllocator; public interface StreamingAggregator { diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/broadcastsender/BroadcastSenderCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/broadcastsender/BroadcastSenderCreator.java index add5117be..01122bebd 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/broadcastsender/BroadcastSenderCreator.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/broadcastsender/BroadcastSenderCreator.java @@ -17,7 +17,8 @@ */ package org.apache.drill.exec.physical.impl.broadcastsender; -import com.google.common.collect.Iterators; +import java.util.List; + import org.apache.drill.common.exceptions.ExecutionSetupException; import org.apache.drill.exec.ops.FragmentContext; import org.apache.drill.exec.physical.config.BroadcastSender; @@ -25,7 +26,7 @@ import org.apache.drill.exec.physical.impl.RootCreator; import org.apache.drill.exec.physical.impl.RootExec; import org.apache.drill.exec.record.RecordBatch; -import java.util.List; +import com.google.common.collect.Iterators; public class BroadcastSenderCreator implements RootCreator<BroadcastSender> { @Override diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/broadcastsender/BroadcastSenderRootExec.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/broadcastsender/BroadcastSenderRootExec.java index 36e54f944..d09559d29 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/broadcastsender/BroadcastSenderRootExec.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/broadcastsender/BroadcastSenderRootExec.java @@ -25,17 +25,14 @@ import org.apache.drill.exec.ExecConstants; import org.apache.drill.exec.memory.OutOfMemoryException; import org.apache.drill.exec.ops.FragmentContext; import org.apache.drill.exec.ops.MetricDef; -import org.apache.drill.exec.ops.OperatorContext; -import org.apache.drill.exec.ops.OperatorStats; import org.apache.drill.exec.physical.config.BroadcastSender; import org.apache.drill.exec.physical.impl.BaseRootExec; -import org.apache.drill.exec.physical.impl.RootExec; import org.apache.drill.exec.physical.impl.SendingAccountor; import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint; import org.apache.drill.exec.proto.ExecProtos; import org.apache.drill.exec.proto.ExecProtos.FragmentHandle; -import org.apache.drill.exec.proto.GeneralRPCProtos.Ack; import org.apache.drill.exec.proto.GeneralRPCProtos; +import org.apache.drill.exec.proto.GeneralRPCProtos.Ack; import org.apache.drill.exec.record.FragmentWritableBatch; import org.apache.drill.exec.record.RecordBatch; import org.apache.drill.exec.record.WritableBatch; @@ -57,7 +54,7 @@ public class BroadcastSenderRootExec extends BaseRootExec { private final ExecProtos.FragmentHandle handle; private volatile boolean ok; private final RecordBatch incoming; - + public enum Metric implements MetricDef { N_RECEIVERS, BYTES_SENT; @@ -113,7 +110,7 @@ public class BroadcastSenderRootExec extends BaseRootExec { case OK: WritableBatch writableBatch = incoming.getWritableBatch(); if (tunnels.length > 1) { - writableBatch.retainBuffers(tunnels.length - 1); + writableBatch.retainBuffers(tunnels.length - 1); } for (int i = 0; i < tunnels.length; ++i) { FragmentWritableBatch batch = new FragmentWritableBatch(false, handle.getQueryId(), handle.getMajorFragmentId(), handle.getMinorFragmentId(), config.getOppositeMajorFragmentId(), i, writableBatch); @@ -134,10 +131,10 @@ public class BroadcastSenderRootExec extends BaseRootExec { throw new IllegalStateException(); } } - + public void updateStats(FragmentWritableBatch writableBatch) { stats.setLongStat(Metric.N_RECEIVERS, tunnels.length); - stats.addLongStat(Metric.BYTES_SENT, writableBatch.getByteCount()); + stats.addLongStat(Metric.BYTES_SENT, writableBatch.getByteCount()); } /* @@ -162,7 +159,7 @@ public class BroadcastSenderRootExec extends BaseRootExec { return true; } */ - + @Override public void stop() { ok = false; @@ -170,12 +167,12 @@ public class BroadcastSenderRootExec extends BaseRootExec { oContext.close(); incoming.cleanup(); } - + private StatusHandler statusHandler = new StatusHandler(); private class StatusHandler extends BaseRpcOutcomeListener<GeneralRPCProtos.Ack> { volatile RpcException ex; private final SendingAccountor sendCount = new SendingAccountor(); - + @Override public void success(Ack value, ByteBuf buffer) { sendCount.decrement(); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/ChainedHashTable.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/ChainedHashTable.java index 91d203763..195d24900 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/ChainedHashTable.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/ChainedHashTable.java @@ -21,8 +21,6 @@ import java.io.IOException; import org.apache.drill.common.expression.ErrorCollector; import org.apache.drill.common.expression.ErrorCollectorImpl; -import org.apache.drill.common.expression.ExpressionPosition; -import org.apache.drill.common.expression.FunctionCall; import org.apache.drill.common.expression.LogicalExpression; import org.apache.drill.common.logical.data.NamedExpression; import org.apache.drill.common.types.TypeProtos.MinorType; @@ -35,12 +33,10 @@ import org.apache.drill.exec.expr.ClassGenerator; import org.apache.drill.exec.expr.ClassGenerator.HoldingContainer; import org.apache.drill.exec.expr.CodeGenerator; import org.apache.drill.exec.expr.ExpressionTreeMaterializer; -import org.apache.drill.exec.expr.HoldingContainerExpression; import org.apache.drill.exec.expr.TypeHelper; import org.apache.drill.exec.expr.ValueVectorReadExpression; import org.apache.drill.exec.expr.ValueVectorWriteExpression; import org.apache.drill.exec.expr.fn.FunctionGenerationHelper; -import org.apache.drill.exec.expr.fn.impl.BitFunctions; import org.apache.drill.exec.memory.BufferAllocator; import org.apache.drill.exec.ops.FragmentContext; import org.apache.drill.exec.record.MaterializedField; @@ -48,9 +44,7 @@ import org.apache.drill.exec.record.RecordBatch; import org.apache.drill.exec.record.TypedFieldId; import org.apache.drill.exec.record.VectorContainer; import org.apache.drill.exec.vector.ValueVector; -import org.apache.drill.exec.vector.allocator.VectorAllocator; -import com.google.common.collect.ImmutableList; import com.sun.codemodel.JConditional; import com.sun.codemodel.JExpr; @@ -58,40 +52,40 @@ import com.sun.codemodel.JExpr; public class ChainedHashTable { static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ChainedHashTable.class); - private static final GeneratorMapping KEY_MATCH_BUILD = - GeneratorMapping.create("setupInterior" /* setup method */, "isKeyMatchInternalBuild" /* eval method */, + private static final GeneratorMapping KEY_MATCH_BUILD = + GeneratorMapping.create("setupInterior" /* setup method */, "isKeyMatchInternalBuild" /* eval method */, null /* reset */, null /* cleanup */); - private static final GeneratorMapping KEY_MATCH_PROBE = - GeneratorMapping.create("setupInterior" /* setup method */, "isKeyMatchInternalProbe" /* eval method */, + private static final GeneratorMapping KEY_MATCH_PROBE = + GeneratorMapping.create("setupInterior" /* setup method */, "isKeyMatchInternalProbe" /* eval method */, null /* reset */, null /* cleanup */); - private static final GeneratorMapping GET_HASH_BUILD = - GeneratorMapping.create("doSetup" /* setup method */, "getHashBuild" /* eval method */, + private static final GeneratorMapping GET_HASH_BUILD = + GeneratorMapping.create("doSetup" /* setup method */, "getHashBuild" /* eval method */, null /* reset */, null /* cleanup */); - private static final GeneratorMapping GET_HASH_PROBE = - GeneratorMapping.create("doSetup" /* setup method */, "getHashProbe" /* eval method */, + private static final GeneratorMapping GET_HASH_PROBE = + GeneratorMapping.create("doSetup" /* setup method */, "getHashProbe" /* eval method */, null /* reset */, null /* cleanup */); - private static final GeneratorMapping SET_VALUE = - GeneratorMapping.create("setupInterior" /* setup method */, "setValue" /* eval method */, + private static final GeneratorMapping SET_VALUE = + GeneratorMapping.create("setupInterior" /* setup method */, "setValue" /* eval method */, null /* reset */, null /* cleanup */); - - private static final GeneratorMapping OUTPUT_KEYS = + + private static final GeneratorMapping OUTPUT_KEYS = GeneratorMapping.create("setupInterior" /* setup method */, "outputRecordKeys" /* eval method */, null /* reset */, null /* cleanup */) ; // GM for putting constant expression into method "setupInterior" - private static final GeneratorMapping SETUP_INTERIOR_CONSTANT = - GeneratorMapping.create("setupInterior" /* setup method */, "setupInterior" /* eval method */, + private static final GeneratorMapping SETUP_INTERIOR_CONSTANT = + GeneratorMapping.create("setupInterior" /* setup method */, "setupInterior" /* eval method */, null /* reset */, null /* cleanup */); - // GM for putting constant expression into method "doSetup" - private static final GeneratorMapping DO_SETUP_CONSTANT = - GeneratorMapping.create("doSetup" /* setup method */, "doSetup" /* eval method */, + // GM for putting constant expression into method "doSetup" + private static final GeneratorMapping DO_SETUP_CONSTANT = + GeneratorMapping.create("doSetup" /* setup method */, "doSetup" /* eval method */, null /* reset */, null /* cleanup */); - + private final MappingSet KeyMatchIncomingBuildMapping = new MappingSet("incomingRowIdx", null, "incomingBuild", null, SETUP_INTERIOR_CONSTANT, KEY_MATCH_BUILD); private final MappingSet KeyMatchIncomingProbeMapping = new MappingSet("incomingRowIdx", null, "incomingProbe", null, SETUP_INTERIOR_CONSTANT, KEY_MATCH_PROBE); private final MappingSet KeyMatchHtableMapping = new MappingSet("htRowIdx", null, "htContainer", null, SETUP_INTERIOR_CONSTANT, KEY_MATCH_BUILD); @@ -109,10 +103,10 @@ public class ChainedHashTable { private final RecordBatch incomingProbe; private final RecordBatch outgoing; - public ChainedHashTable(HashTableConfig htConfig, + public ChainedHashTable(HashTableConfig htConfig, FragmentContext context, BufferAllocator allocator, - RecordBatch incomingBuild, + RecordBatch incomingBuild, RecordBatch incomingProbe, RecordBatch outgoing) { @@ -135,31 +129,31 @@ public class ChainedHashTable { if (isProbe) { keyExprsProbe = new LogicalExpression[htConfig.getKeyExprsProbe().length]; } - + ErrorCollector collector = new ErrorCollectorImpl(); VectorContainer htContainerOrig = new VectorContainer(); // original ht container from which others may be cloned LogicalExpression[] htKeyExprs = new LogicalExpression[htConfig.getKeyExprsBuild().length]; TypedFieldId[] htKeyFieldIds = new TypedFieldId[htConfig.getKeyExprsBuild().length]; int i = 0; - for (NamedExpression ne : htConfig.getKeyExprsBuild()) { + for (NamedExpression ne : htConfig.getKeyExprsBuild()) { final LogicalExpression expr = ExpressionTreeMaterializer.materialize(ne.getExpr(), incomingBuild, collector, context.getFunctionRegistry()); if(collector.hasErrors()) throw new SchemaChangeException("Failure while materializing expression. " + collector.toErrorString()); if (expr == null) continue; keyExprsBuild[i] = expr; - + final MaterializedField outputField = MaterializedField.create(ne.getRef(), expr.getMajorType()); // create a type-specific ValueVector for this key ValueVector vv = TypeHelper.getNewVector(outputField, allocator); vv.allocateNew(); htKeyFieldIds[i] = htContainerOrig.add(vv); - + i++; } if (isProbe) { i = 0; - for (NamedExpression ne : htConfig.getKeyExprsProbe()) { + for (NamedExpression ne : htConfig.getKeyExprsProbe()) { final LogicalExpression expr = ExpressionTreeMaterializer.materialize(ne.getExpr(), incomingProbe, collector, context.getFunctionRegistry()); if(collector.hasErrors()) throw new SchemaChangeException("Failure while materializing expression. " + collector.toErrorString()); if (expr == null) continue; @@ -189,9 +183,9 @@ public class ChainedHashTable { return ht; } - - private void setupIsKeyMatchInternal(ClassGenerator<HashTable> cg, MappingSet incomingMapping, MappingSet htableMapping, + + private void setupIsKeyMatchInternal(ClassGenerator<HashTable> cg, MappingSet incomingMapping, MappingSet htableMapping, LogicalExpression[] keyExprs, TypedFieldId[] htKeyFieldIds) throws SchemaChangeException { cg.setMappingSet(incomingMapping); @@ -202,10 +196,10 @@ public class ChainedHashTable { } int i = 0; - for (LogicalExpression expr : keyExprs) { + for (LogicalExpression expr : keyExprs) { cg.setMappingSet(incomingMapping); HoldingContainer left = cg.addExpr(expr, false); - + cg.setMappingSet(htableMapping); ValueVectorReadExpression vvrExpr = new ValueVectorReadExpression(htKeyFieldIds[i++]); HoldingContainer right = cg.addExpr(vvrExpr, false); @@ -216,7 +210,7 @@ public class ChainedHashTable { // check if two values are not equal (comparator result != 0) JConditional jc = cg.getEvalBlock()._if(out.getValue().ne(JExpr.lit(0))); - + jc._then()._return(JExpr.FALSE); } @@ -224,7 +218,7 @@ public class ChainedHashTable { cg.getEvalBlock()._return(JExpr.TRUE); } - private void setupSetValue(ClassGenerator<HashTable> cg, LogicalExpression[] keyExprs, TypedFieldId[] htKeyFieldIds) + private void setupSetValue(ClassGenerator<HashTable> cg, LogicalExpression[] keyExprs, TypedFieldId[] htKeyFieldIds) throws SchemaChangeException { cg.setMappingSet(SetValueMapping); @@ -234,7 +228,7 @@ public class ChainedHashTable { ValueVectorWriteExpression vvwExpr = new ValueVectorWriteExpression(htKeyFieldIds[i++], expr, true) ; HoldingContainer hc = cg.addExpr(vvwExpr, false); // this will write to the htContainer at htRowIdx - cg.getEvalBlock()._if(hc.getValue().eq(JExpr.lit(0)))._then()._return(JExpr.FALSE); + cg.getEvalBlock()._if(hc.getValue().eq(JExpr.lit(0)))._then()._return(JExpr.FALSE); } cg.getEvalBlock()._return(JExpr.TRUE); @@ -267,22 +261,22 @@ public class ChainedHashTable { cg.getEvalBlock()._return(JExpr.lit(0)); return; } - + HoldingContainer combinedHashValue = null; for (int i = 0; i < keyExprs.length; i++) { LogicalExpression expr = keyExprs[i]; - + cg.setMappingSet(incomingMapping); HoldingContainer input = cg.addExpr(expr, false); // compute the hash(expr) - LogicalExpression hashfunc = FunctionGenerationHelper.getFunctionExpression("hash", Types.required(MinorType.INT), context.getFunctionRegistry(), input); + LogicalExpression hashfunc = FunctionGenerationHelper.getFunctionExpression("hash", Types.required(MinorType.INT), context.getFunctionRegistry(), input); HoldingContainer hashValue = cg.addExpr(hashfunc, false); if (i == 0) { - combinedHashValue = hashValue; // first expression..just use the hash value - } + combinedHashValue = hashValue; // first expression..just use the hash value + } else { // compute the combined hash value using XOR diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTable.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTable.java index d9499c7ba..6028a04b4 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTable.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTable.java @@ -17,10 +17,7 @@ */ package org.apache.drill.exec.physical.impl.common; -import org.apache.drill.common.expression.LogicalExpression; import org.apache.drill.exec.compile.TemplateClassDefinition; -import org.apache.drill.exec.expr.holders.BitHolder; -import org.apache.drill.exec.expr.holders.IntHolder; import org.apache.drill.exec.memory.BufferAllocator; import org.apache.drill.exec.ops.FragmentContext; import org.apache.drill.exec.record.RecordBatch; diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableConfig.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableConfig.java index 98892c069..fa6d4b5e9 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableConfig.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableConfig.java @@ -34,8 +34,8 @@ public class HashTableConfig { private final NamedExpression[] keyExprsProbe; @JsonCreator - public HashTableConfig(@JsonProperty("initialCapacity") int initialCapacity, @JsonProperty("loadFactor") float loadFactor, - @JsonProperty("keyExprsBuild") NamedExpression[] keyExprsBuild, + public HashTableConfig(@JsonProperty("initialCapacity") int initialCapacity, @JsonProperty("loadFactor") float loadFactor, + @JsonProperty("keyExprsBuild") NamedExpression[] keyExprsBuild, @JsonProperty("keyExprsProbe") NamedExpression[] keyExprsProbe) { this.initialCapacity = initialCapacity; this.loadFactor = loadFactor; @@ -47,15 +47,15 @@ public class HashTableConfig { return initialCapacity; } - public float getLoadFactor() { + public float getLoadFactor() { return loadFactor; } - public NamedExpression[] getKeyExprsBuild() { + public NamedExpression[] getKeyExprsBuild() { return keyExprsBuild; } - public NamedExpression[] getKeyExprsProbe() { + public NamedExpression[] getKeyExprsProbe() { return keyExprsProbe; } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableTemplate.java index e6c55bdbe..b03880cce 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableTemplate.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableTemplate.java @@ -23,18 +23,17 @@ import java.util.Iterator; import javax.inject.Named; import org.apache.drill.common.expression.SchemaPath; -import org.apache.drill.common.types.Types; import org.apache.drill.common.types.TypeProtos.MinorType; +import org.apache.drill.common.types.Types; +import org.apache.drill.exec.compile.sig.RuntimeOverridden; +import org.apache.drill.exec.expr.TypeHelper; import org.apache.drill.exec.memory.BufferAllocator; import org.apache.drill.exec.ops.FragmentContext; import org.apache.drill.exec.record.MaterializedField; +import org.apache.drill.exec.record.RecordBatch; import org.apache.drill.exec.record.TransferPair; import org.apache.drill.exec.record.VectorContainer; import org.apache.drill.exec.record.VectorWrapper; -import org.apache.drill.exec.compile.sig.RuntimeOverridden; -import org.apache.drill.exec.expr.TypeHelper; -import org.apache.drill.exec.expr.holders.IntHolder; -import org.apache.drill.exec.record.RecordBatch; import org.apache.drill.exec.vector.BigIntVector; import org.apache.drill.exec.vector.IntVector; import org.apache.drill.exec.vector.ValueVector; diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/EvaluationPredicate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/EvaluationPredicate.java index dd0e26374..dc0abd961 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/EvaluationPredicate.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/EvaluationPredicate.java @@ -21,10 +21,10 @@ import org.apache.drill.exec.record.selection.SelectionVector2; public class EvaluationPredicate { private SelectionVector2 vector; - + EvaluationPredicate(String pred){ - + } - + } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterBatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterBatchCreator.java index c5c81c6e7..7f2fe8e24 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterBatchCreator.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterBatchCreator.java @@ -20,7 +20,6 @@ package org.apache.drill.exec.physical.impl.filter; import java.util.List; import org.apache.drill.common.exceptions.ExecutionSetupException; -import org.apache.drill.exec.memory.OutOfMemoryException; import org.apache.drill.exec.ops.FragmentContext; import org.apache.drill.exec.physical.config.Filter; import org.apache.drill.exec.physical.impl.BatchCreator; @@ -36,6 +35,6 @@ public class FilterBatchCreator implements BatchCreator<Filter>{ Preconditions.checkArgument(children.size() == 1); return new FilterRecordBatch(config, children.iterator().next(), context); } - - + + } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterRecordBatch.java index 3ece98beb..bf00194aa 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterRecordBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterRecordBatch.java @@ -40,7 +40,6 @@ import org.apache.drill.exec.record.VectorWrapper; import org.apache.drill.exec.record.selection.SelectionVector2; import org.apache.drill.exec.record.selection.SelectionVector4; import org.apache.drill.exec.vector.ValueVector; -import org.apache.drill.exec.vector.allocator.VectorAllocator; import com.google.common.collect.Lists; diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterSignature.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterSignature.java index 39519b476..74a5d1671 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterSignature.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterSignature.java @@ -24,8 +24,8 @@ import org.apache.drill.exec.ops.FragmentContext; import org.apache.drill.exec.record.RecordBatch; public interface FilterSignature extends CodeGeneratorSignature{ - + public void doSetup(@Named("context") FragmentContext context, @Named("incoming") RecordBatch incoming, @Named("outgoing") RecordBatch outgoing); public boolean doEval(@Named("inIndex") int inIndex, @Named("outIndex") int outIndex); - + } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterTemplate2.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterTemplate2.java index 5b0b88f54..bd8541847 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterTemplate2.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterTemplate2.java @@ -28,7 +28,7 @@ import org.apache.drill.exec.record.selection.SelectionVector2; public abstract class FilterTemplate2 implements Filterer{ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FilterTemplate2.class); - + private SelectionVector2 outgoingSelectionVector; private SelectionVector2 incomingSelectionVector; private SelectionVectorMode svMode; @@ -39,7 +39,7 @@ public abstract class FilterTemplate2 implements Filterer{ this.transfers = transfers; this.outgoingSelectionVector = outgoing.getSelectionVector2(); this.svMode = incoming.getSchema().getSelectionVectorMode(); - + switch(svMode){ case NONE: break; @@ -58,7 +58,7 @@ public abstract class FilterTemplate2 implements Filterer{ t.transfer(); } } - + public void filterBatch(int recordCount){ if (! outgoingSelectionVector.allocateNew(recordCount)) { throw new UnsupportedOperationException("Unable to allocate filter batch"); @@ -75,7 +75,7 @@ public abstract class FilterTemplate2 implements Filterer{ } doTransfers(); } - + private void filterBatchSV2(int recordCount){ int svIndex = 0; final int count = recordCount; @@ -99,7 +99,7 @@ public abstract class FilterTemplate2 implements Filterer{ } outgoingSelectionVector.setRecordCount(svIndex); } - + public abstract void doSetup(@Named("context") FragmentContext context, @Named("incoming") RecordBatch incoming, @Named("outgoing") RecordBatch outgoing); public abstract boolean doEval(@Named("inIndex") int inIndex, @Named("outIndex") int outIndex); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterTemplate4.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterTemplate4.java index cfb1f5b7c..a1769b90d 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterTemplate4.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterTemplate4.java @@ -17,15 +17,13 @@ */ package org.apache.drill.exec.physical.impl.filter; +import javax.inject.Named; + import org.apache.drill.exec.exception.SchemaChangeException; import org.apache.drill.exec.ops.FragmentContext; -import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode; import org.apache.drill.exec.record.RecordBatch; import org.apache.drill.exec.record.TransferPair; import org.apache.drill.exec.record.selection.SelectionVector4; -import org.apache.drill.exec.vector.allocator.VectorAllocator; - -import javax.inject.Named; public abstract class FilterTemplate4 implements Filterer { static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FilterTemplate4.class); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/Filterer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/Filterer.java index 8e8cb2e72..fd7a13f65 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/Filterer.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/Filterer.java @@ -25,10 +25,10 @@ import org.apache.drill.exec.record.TransferPair; public interface Filterer { static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(Filterer.class); - + public void setup(FragmentContext context, RecordBatch incoming, RecordBatch outgoing, TransferPair[] transfers) throws SchemaChangeException; public void filterBatch(int recordCount); - + public static TemplateClassDefinition<Filterer> TEMPLATE_DEFINITION2 = new TemplateClassDefinition<Filterer>(Filterer.class, FilterTemplate2.class); public static TemplateClassDefinition<Filterer> TEMPLATE_DEFINITION4 = new TemplateClassDefinition<Filterer>(Filterer.class, FilterTemplate4.class); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/ReturnValueExpression.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/ReturnValueExpression.java index 1aa300635..c81fb2c93 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/ReturnValueExpression.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/ReturnValueExpression.java @@ -22,8 +22,8 @@ import java.util.Iterator; import org.apache.drill.common.expression.ExpressionPosition; import org.apache.drill.common.expression.LogicalExpression; import org.apache.drill.common.expression.visitors.ExprVisitor; -import org.apache.drill.common.types.Types; import org.apache.drill.common.types.TypeProtos.MajorType; +import org.apache.drill.common.types.Types; import com.google.common.collect.Iterators; @@ -32,11 +32,11 @@ public class ReturnValueExpression implements LogicalExpression{ private LogicalExpression child; private boolean returnTrueOnOne; - + public ReturnValueExpression(LogicalExpression child) { this(child, true); } - + public ReturnValueExpression(LogicalExpression child, boolean returnTrueOnOne) { this.child = child; this.returnTrueOnOne = returnTrueOnOne; @@ -60,7 +60,7 @@ public class ReturnValueExpression implements LogicalExpression{ public ExpressionPosition getPosition() { return ExpressionPosition.UNKNOWN; } - + @Override public Iterator<LogicalExpression> iterator() { return Iterators.singletonIterator(child); @@ -70,12 +70,12 @@ public class ReturnValueExpression implements LogicalExpression{ return returnTrueOnOne; } - public int getSelfCost() { - throw new UnsupportedOperationException(String.format("The type of %s doesn't currently support LogicalExpression.getSelfCost().", this.getClass().getCanonicalName())); + public int getSelfCost() { + throw new UnsupportedOperationException(String.format("The type of %s doesn't currently support LogicalExpression.getSelfCost().", this.getClass().getCanonicalName())); } - - public int getCumulativeCost() { - throw new UnsupportedOperationException(String.format("The type of %s doesn't currently support LogicalExpression.getCumulativeCost().", this.getClass().getCanonicalName())); + + public int getCumulativeCost() { + throw new UnsupportedOperationException(String.format("The type of %s doesn't currently support LogicalExpression.getCumulativeCost().", this.getClass().getCanonicalName())); } - + } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatchCreator.java index d92595897..bfe89c020 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatchCreator.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatchCreator.java @@ -17,7 +17,7 @@ */ package org.apache.drill.exec.physical.impl.join; -import com.google.common.base.Preconditions; +import java.util.List; import org.apache.drill.common.exceptions.ExecutionSetupException; import org.apache.drill.exec.ops.FragmentContext; @@ -25,7 +25,7 @@ import org.apache.drill.exec.physical.config.HashJoinPOP; import org.apache.drill.exec.physical.impl.BatchCreator; import org.apache.drill.exec.record.RecordBatch; -import java.util.List; +import com.google.common.base.Preconditions; public class HashJoinBatchCreator implements BatchCreator<HashJoinPOP> { diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinHelper.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinHelper.java index aec0f31be..a3c33ed24 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinHelper.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinHelper.java @@ -18,17 +18,17 @@ package org.apache.drill.exec.physical.impl.join; +import io.netty.buffer.ByteBuf; + import java.util.ArrayList; import java.util.BitSet; import java.util.List; -import io.netty.buffer.ByteBuf; - import org.apache.drill.exec.exception.SchemaChangeException; import org.apache.drill.exec.memory.BufferAllocator; import org.apache.drill.exec.ops.FragmentContext; -import org.apache.drill.exec.record.selection.SelectionVector4; import org.apache.drill.exec.physical.impl.common.HashTable; +import org.apache.drill.exec.record.selection.SelectionVector4; /* @@ -71,7 +71,7 @@ public class HashJoinHelper { public static final int LEFT_INPUT = 0; public static final int RIGHT_INPUT = 1; - + public HashJoinHelper(FragmentContext context, BufferAllocator allocator) { this.context = context; this.allocator = allocator; diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbe.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbe.java index ae703399d..7599f9e9f 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbe.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbe.java @@ -18,20 +18,17 @@ package org.apache.drill.exec.physical.impl.join; +import java.io.IOException; + import org.apache.drill.exec.compile.TemplateClassDefinition; import org.apache.drill.exec.exception.ClassTransformationException; import org.apache.drill.exec.exception.SchemaChangeException; import org.apache.drill.exec.ops.FragmentContext; import org.apache.drill.exec.physical.impl.common.HashTable; -import org.apache.drill.exec.physical.impl.common.HashTableConfig; -import org.apache.drill.exec.record.ExpandableHyperContainer; import org.apache.drill.exec.record.RecordBatch; -import org.apache.drill.exec.record.RecordBatch.IterOutcome; import org.apache.drill.exec.record.VectorContainer; import org.eigenbase.rel.JoinRelType; -import java.io.IOException; - public interface HashJoinProbe { public static TemplateClassDefinition<HashJoinProbe> TEMPLATE_DEFINITION = new TemplateClassDefinition<HashJoinProbe>(HashJoinProbe.class, HashJoinProbeTemplate.class); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbeTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbeTemplate.java index 0b90362f6..785deae79 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbeTemplate.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbeTemplate.java @@ -17,23 +17,21 @@ */ package org.apache.drill.exec.physical.impl.join; +import java.io.IOException; +import java.util.List; + import javax.inject.Named; import org.apache.drill.exec.exception.ClassTransformationException; import org.apache.drill.exec.exception.SchemaChangeException; import org.apache.drill.exec.ops.FragmentContext; import org.apache.drill.exec.physical.impl.common.HashTable; -import org.apache.drill.exec.record.VectorWrapper; import org.apache.drill.exec.record.RecordBatch; import org.apache.drill.exec.record.RecordBatch.IterOutcome; import org.apache.drill.exec.record.VectorContainer; -import org.apache.drill.exec.record.AbstractRecordBatch; - +import org.apache.drill.exec.record.VectorWrapper; import org.eigenbase.rel.JoinRelType; -import java.io.IOException; -import java.util.List; - public abstract class HashJoinProbeTemplate implements HashJoinProbe { // Probe side record batch diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinTemplate.java index c704a8a3d..bb3b9ac6d 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinTemplate.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinTemplate.java @@ -19,7 +19,6 @@ package org.apache.drill.exec.physical.impl.join; import javax.inject.Named; -import org.apache.drill.common.logical.data.Join; import org.apache.drill.exec.exception.SchemaChangeException; import org.apache.drill.exec.ops.FragmentContext; import org.apache.drill.exec.physical.config.MergeJoinPOP; diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinWorker.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinWorker.java index 8643d662e..e4e13d1ec 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinWorker.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinWorker.java @@ -24,14 +24,14 @@ import org.apache.drill.exec.record.VectorContainer; public interface JoinWorker { - + public static enum JoinOutcome { NO_MORE_DATA, BATCH_RETURNED, SCHEMA_CHANGED, WAITING, FAILURE; } public void setupJoin(FragmentContext context, JoinStatus status, VectorContainer outgoing) throws SchemaChangeException; public boolean doJoin(JoinStatus status); - + public static TemplateClassDefinition<JoinWorker> TEMPLATE_DEFINITION = new TemplateClassDefinition<>(JoinWorker.class, JoinTemplate.class); - + } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java index 7a6273c23..b24b5348a 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java @@ -505,7 +505,7 @@ public class MergeJoinBatch extends AbstractRecordBatch<MergeJoinPOP> { HoldingContainer out = cg.addExpr(fh, false); // If not 0, it means not equal. We return this out value. - // Null compares to Null should returns null (unknown). In such case, we return 1 to indicate they are not equal. + // Null compares to Null should returns null (unknown). In such case, we return 1 to indicate they are not equal. if (compareLeftExprHolder.isOptional() && compareRightExprHolder.isOptional()) { JConditional jc = cg.getEvalBlock()._if(compareLeftExprHolder.getIsSet().eq(JExpr.lit(0)). cand(compareRightExprHolder.getIsSet().eq(JExpr.lit(0)))); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinCreator.java index 3549a33bc..d6b566cce 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinCreator.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinCreator.java @@ -37,8 +37,8 @@ public class MergeJoinCreator implements BatchCreator<MergeJoinPOP> { if(config.getJoinType() == JoinRelType.RIGHT){ return new MergeJoinBatch(config.flipIfRight(), context, children.get(1), children.get(0)); }else{ - return new MergeJoinBatch(config, context, children.get(0), children.get(1)); + return new MergeJoinBatch(config, context, children.get(0), children.get(1)); } - + } } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/LimitBatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/LimitBatchCreator.java index ccbf755fa..e71dababa 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/LimitBatchCreator.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/LimitBatchCreator.java @@ -17,14 +17,15 @@ */ package org.apache.drill.exec.physical.impl.limit; -import com.google.common.collect.Iterables; +import java.util.List; + import org.apache.drill.common.exceptions.ExecutionSetupException; import org.apache.drill.exec.ops.FragmentContext; import org.apache.drill.exec.physical.config.Limit; import org.apache.drill.exec.physical.impl.BatchCreator; import org.apache.drill.exec.record.RecordBatch; -import java.util.List; +import com.google.common.collect.Iterables; public class LimitBatchCreator implements BatchCreator<Limit> { @Override diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/LimitRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/LimitRecordBatch.java index 32eb70909..f5bc9f91b 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/LimitRecordBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/LimitRecordBatch.java @@ -23,13 +23,11 @@ import org.apache.drill.exec.exception.SchemaChangeException; import org.apache.drill.exec.memory.OutOfMemoryException; import org.apache.drill.exec.ops.FragmentContext; import org.apache.drill.exec.physical.config.Limit; -import org.apache.drill.exec.physical.impl.project.ProjectRecordBatch; import org.apache.drill.exec.record.AbstractSingleRecordBatch; import org.apache.drill.exec.record.BatchSchema; import org.apache.drill.exec.record.RecordBatch; import org.apache.drill.exec.record.TransferPair; import org.apache.drill.exec.record.VectorWrapper; -import org.apache.drill.exec.record.RecordBatch.IterOutcome; import org.apache.drill.exec.record.selection.SelectionVector2; import com.google.common.collect.Lists; @@ -37,7 +35,7 @@ import com.google.common.collect.Lists; public class LimitRecordBatch extends AbstractSingleRecordBatch<Limit> { static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(LimitRecordBatch.class); - + private SelectionVector2 outgoingSv; private SelectionVector2 incomingSv; private int recordsToSkip; @@ -96,7 +94,7 @@ public class LimitRecordBatch extends AbstractSingleRecordBatch<Limit> { if (first) { return produceEmptyFirstBatch(); } - + incoming.kill(true); IterOutcome upStream = incoming.next(); @@ -159,7 +157,7 @@ public class LimitRecordBatch extends AbstractSingleRecordBatch<Limit> { incoming.kill(true); return IterOutcome.OK_NEW_SCHEMA; } - + private void limitWithNoSV(int recordCount) { int offset = Math.max(0, Math.min(recordCount - 1, recordsToSkip)); recordsToSkip -= offset; @@ -211,5 +209,5 @@ public class LimitRecordBatch extends AbstractSingleRecordBatch<Limit> { outgoingSv.clear(); super.cleanup(); } - + } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/materialize/QueryWritableBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/materialize/QueryWritableBatch.java index 9d4a62979..cef41013f 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/materialize/QueryWritableBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/materialize/QueryWritableBatch.java @@ -17,27 +17,27 @@ */ package org.apache.drill.exec.physical.impl.materialize; +import io.netty.buffer.ByteBuf; + import java.util.Arrays; import java.util.List; -import com.google.common.collect.Lists; -import io.netty.buffer.ByteBuf; - import org.apache.drill.exec.proto.UserBitShared.QueryId; import org.apache.drill.exec.proto.UserBitShared.QueryResult; import org.apache.drill.exec.proto.UserBitShared.RecordBatchDef; import org.apache.drill.exec.proto.UserBitShared.SerializedField; import org.apache.drill.exec.record.BatchSchema; import org.apache.drill.exec.record.MaterializedField; -import org.apache.drill.exec.record.WritableBatch; + +import com.google.common.collect.Lists; public class QueryWritableBatch { static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(QueryWritableBatch.class); - + private final QueryResult header; private final ByteBuf[] buffers; - - + + public QueryWritableBatch(QueryResult header, ByteBuf... buffers) { super(); this.header = header; @@ -55,7 +55,7 @@ public class QueryWritableBatch { } return n; } - + public QueryResult getHeader() { return header; } @@ -79,5 +79,5 @@ public class QueryWritableBatch { .build(); return new QueryWritableBatch(header); } - + } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/materialize/RecordMaterializer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/materialize/RecordMaterializer.java index aa7f86ecf..221fc3437 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/materialize/RecordMaterializer.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/materialize/RecordMaterializer.java @@ -19,7 +19,7 @@ package org.apache.drill.exec.physical.impl.materialize; public interface RecordMaterializer { - + public QueryWritableBatch convertNext(boolean isLast); - + } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/materialize/VectorRecordMaterializer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/materialize/VectorRecordMaterializer.java index b4e4871a9..cc1b3bf9f 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/materialize/VectorRecordMaterializer.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/materialize/VectorRecordMaterializer.java @@ -35,7 +35,7 @@ public class VectorRecordMaterializer implements RecordMaterializer{ this.batch = batch; BatchSchema schema = batch.getSchema(); assert schema != null : "Schema must be defined."; - + // for (MaterializedField f : batch.getSchema()) { // logger.debug("New Field: {}", f); // } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingReceiverGeneratorBase.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingReceiverGeneratorBase.java index 237f2f868..2885c529b 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingReceiverGeneratorBase.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingReceiverGeneratorBase.java @@ -17,18 +17,16 @@ */ package org.apache.drill.exec.physical.impl.mergereceiver; +import static org.apache.drill.exec.compile.sig.GeneratorMapping.GM; + import org.apache.drill.exec.compile.TemplateClassDefinition; import org.apache.drill.exec.compile.sig.MappingSet; import org.apache.drill.exec.exception.SchemaChangeException; import org.apache.drill.exec.ops.FragmentContext; -import org.apache.drill.exec.record.RecordBatch; -import org.apache.drill.exec.record.RecordBatchLoader; import org.apache.drill.exec.record.VectorAccessible; -import static org.apache.drill.exec.compile.sig.GeneratorMapping.GM; - public interface MergingReceiverGeneratorBase { - + public abstract void doSetup(FragmentContext context, VectorAccessible incoming, VectorAccessible outgoing) throws SchemaChangeException; diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingReceiverTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingReceiverTemplate.java index 002e0544f..c29ef75c7 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingReceiverTemplate.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingReceiverTemplate.java @@ -17,14 +17,12 @@ */ package org.apache.drill.exec.physical.impl.mergereceiver; +import javax.inject.Named; + import org.apache.drill.exec.exception.SchemaChangeException; import org.apache.drill.exec.ops.FragmentContext; -import org.apache.drill.exec.record.RecordBatch; -import org.apache.drill.exec.record.RecordBatchLoader; import org.apache.drill.exec.record.VectorAccessible; -import javax.inject.Named; - public abstract class MergingReceiverTemplate implements MergingReceiverGeneratorBase { static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(MergingReceiverTemplate.class); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java index b8e18afa5..cf2e36f9d 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java @@ -18,13 +18,14 @@ package org.apache.drill.exec.physical.impl.mergereceiver; * limitations under the License. */ +import io.netty.buffer.ByteBuf; + import java.io.IOException; import java.util.Comparator; import java.util.Iterator; import java.util.List; import java.util.PriorityQueue; -import io.netty.buffer.ByteBuf; import org.apache.drill.common.expression.ErrorCollector; import org.apache.drill.common.expression.ErrorCollectorImpl; import org.apache.drill.common.expression.LogicalExpression; @@ -68,7 +69,6 @@ import org.apache.drill.exec.rpc.RpcException; import org.apache.drill.exec.rpc.RpcOutcomeListener; import org.apache.drill.exec.vector.CopyUtil; import org.apache.drill.exec.vector.ValueVector; -import org.apache.drill.exec.vector.allocator.VectorAllocator; import org.eigenbase.rel.RelFieldCollation.Direction; import parquet.Preconditions; diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionProjector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionProjector.java index 2265150fc..3213d1146 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionProjector.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionProjector.java @@ -17,6 +17,8 @@ */ package org.apache.drill.exec.physical.impl.orderedpartitioner; +import java.util.List; + import org.apache.drill.common.expression.SchemaPath; import org.apache.drill.exec.compile.TemplateClassDefinition; import org.apache.drill.exec.exception.SchemaChangeException; @@ -26,8 +28,6 @@ import org.apache.drill.exec.record.TransferPair; import org.apache.drill.exec.record.VectorAccessible; import org.apache.drill.exec.record.VectorContainer; -import java.util.List; - public interface OrderedPartitionProjector { public abstract void setup(FragmentContext context, VectorAccessible incoming, RecordBatch outgoing, List<TransferPair> transfers, diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionProjectorTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionProjectorTemplate.java index 339844360..f5068b4aa 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionProjectorTemplate.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionProjectorTemplate.java @@ -17,7 +17,10 @@ */ package org.apache.drill.exec.physical.impl.orderedpartitioner; -import com.google.common.collect.ImmutableList; +import java.util.List; + +import javax.inject.Named; + import org.apache.drill.common.expression.SchemaPath; import org.apache.drill.exec.exception.SchemaChangeException; import org.apache.drill.exec.ops.FragmentContext; @@ -30,8 +33,7 @@ import org.apache.drill.exec.record.selection.SelectionVector2; import org.apache.drill.exec.record.selection.SelectionVector4; import org.apache.drill.exec.vector.IntVector; -import javax.inject.Named; -import java.util.List; +import com.google.common.collect.ImmutableList; public abstract class OrderedPartitionProjectorTemplate implements OrderedPartitionProjector { static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(OrderedPartitionProjectorTemplate.class); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/SampleCopierTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/SampleCopierTemplate.java index 73fcd1fd7..1371e1ce2 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/SampleCopierTemplate.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/SampleCopierTemplate.java @@ -17,16 +17,16 @@ */ package org.apache.drill.exec.physical.impl.orderedpartitioner; +import javax.inject.Named; + import org.apache.drill.exec.exception.SchemaChangeException; import org.apache.drill.exec.ops.FragmentContext; import org.apache.drill.exec.record.VectorAccessible; import org.apache.drill.exec.record.selection.SelectionVector4; -import javax.inject.Named; - public abstract class SampleCopierTemplate implements SampleCopier { static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(SampleCopierTemplate.class); - + private SelectionVector4 sv4; private int outputRecords = 0; @@ -41,7 +41,7 @@ public abstract class SampleCopierTemplate implements SampleCopier { public int getOutputRecords() { return outputRecords; } - + @Override public boolean copyRecords(int skip, int start, int total) { @@ -57,10 +57,10 @@ public abstract class SampleCopierTemplate implements SampleCopier { } return true; } - + public abstract void doSetup(@Named("context") FragmentContext context, @Named("incoming") VectorAccessible incoming, @Named("outgoing") VectorAccessible outgoing); public abstract boolean doEval(@Named("inIndex") int inIndex, @Named("outIndex") int outIndex); - + } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/SampleSortTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/SampleSortTemplate.java index 5a8354142..3d5c548b2 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/SampleSortTemplate.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/SampleSortTemplate.java @@ -17,6 +17,8 @@ */ package org.apache.drill.exec.physical.impl.orderedpartitioner; +import javax.inject.Named; + import org.apache.drill.exec.exception.SchemaChangeException; import org.apache.drill.exec.ops.FragmentContext; import org.apache.drill.exec.record.RecordBatch; @@ -25,8 +27,6 @@ import org.apache.drill.exec.record.selection.SelectionVector2; import org.apache.hadoop.util.IndexedSortable; import org.apache.hadoop.util.QuickSort; -import javax.inject.Named; - public abstract class SampleSortTemplate implements SampleSorter, IndexedSortable{ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(SampleSortTemplate.class); @@ -36,7 +36,7 @@ public abstract class SampleSortTemplate implements SampleSorter, IndexedSortabl this.vector2 = vector2; doSetup(context, sampleBatch, null); } - + @Override public void sort(SelectionVector2 vector2, VectorContainer container){ QuickSort qs = new QuickSort(); @@ -49,7 +49,7 @@ public abstract class SampleSortTemplate implements SampleSorter, IndexedSortabl vector2.setIndex(sv0, vector2.getIndex(sv1)); vector2.setIndex(sv1, tmp); } - + @Override public int compare(int leftIndex, int rightIndex) { return doEval(leftIndex, rightIndex); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/SampleSorter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/SampleSorter.java index b58f15ed1..1eb5790ad 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/SampleSorter.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/SampleSorter.java @@ -27,7 +27,7 @@ import org.apache.drill.exec.record.selection.SelectionVector2; public interface SampleSorter { public void setup(FragmentContext context, RecordBatch hyperBatch) throws SchemaChangeException; public void sort(SelectionVector2 vector2, VectorContainer container); - + public static TemplateClassDefinition<SampleSorter> TEMPLATE_DEFINITION = new TemplateClassDefinition<SampleSorter>(SampleSorter.class, SampleSortTemplate.class); } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderCreator.java index 2ced9dd5d..06fd1155a 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderCreator.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderCreator.java @@ -17,13 +17,14 @@ */ package org.apache.drill.exec.physical.impl.partitionsender; +import java.util.List; + import org.apache.drill.common.exceptions.ExecutionSetupException; import org.apache.drill.exec.ops.FragmentContext; import org.apache.drill.exec.physical.config.HashPartitionSender; import org.apache.drill.exec.physical.impl.RootCreator; import org.apache.drill.exec.physical.impl.RootExec; import org.apache.drill.exec.record.RecordBatch; -import java.util.List; public class PartitionSenderCreator implements RootCreator<HashPartitionSender> { diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java index 14cf092ee..6ff041879 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java @@ -66,10 +66,10 @@ public class PartitionSenderRootExec extends BaseRootExec { private final AtomicIntegerArray remainingReceivers; private final AtomicInteger remaingReceiverCount; private volatile boolean done = false; - + long minReceiverRecordCount = Long.MAX_VALUE; long maxReceiverRecordCount = Long.MIN_VALUE; - + public enum Metric implements MetricDef { BATCHES_SENT, RECORDS_SENT, @@ -249,7 +249,7 @@ public class PartitionSenderRootExec extends BaseRootExec { } } } - + public void stop() { logger.debug("Partition sender stopping."); ok = false; diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/Partitioner.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/Partitioner.java index c5fe154ab..5ed9c392f 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/Partitioner.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/Partitioner.java @@ -17,6 +17,9 @@ */ package org.apache.drill.exec.physical.impl.partitionsender; +import java.io.IOException; +import java.util.List; + import org.apache.drill.exec.compile.TemplateClassDefinition; import org.apache.drill.exec.exception.SchemaChangeException; import org.apache.drill.exec.ops.FragmentContext; @@ -26,9 +29,6 @@ import org.apache.drill.exec.physical.config.HashPartitionSender; import org.apache.drill.exec.physical.impl.SendingAccountor; import org.apache.drill.exec.record.RecordBatch; -import java.io.IOException; -import java.util.List; - public interface Partitioner { public abstract void setup(FragmentContext context, diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerTemplate.java index 3141aed1b..338a704ca 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerTemplate.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerTemplate.java @@ -35,7 +35,6 @@ import org.apache.drill.exec.physical.config.HashPartitionSender; import org.apache.drill.exec.physical.impl.SendingAccountor; import org.apache.drill.exec.physical.impl.partitionsender.PartitionSenderRootExec.Metric; import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint; -import org.apache.drill.exec.proto.ExecProtos; import org.apache.drill.exec.proto.ExecProtos.FragmentHandle; import org.apache.drill.exec.record.BatchSchema; import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode; @@ -324,7 +323,7 @@ public abstract class PartitionerTemplate implements Partitioner { throw new IOException(statusHandler.getException()); } } - + public void updateStats(FragmentWritableBatch writableBatch) { stats.addLongStat(Metric.BYTES_SENT, writableBatch.getByteCount()); stats.addLongStat(Metric.BATCHES_SENT, 1); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/StatusHandler.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/StatusHandler.java index 7c3fd52df..469140c96 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/StatusHandler.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/StatusHandler.java @@ -19,6 +19,7 @@ package org.apache.drill.exec.physical.impl.partitionsender; import io.netty.buffer.ByteBuf; + import org.apache.drill.exec.ExecConstants; import org.apache.drill.exec.ops.FragmentContext; import org.apache.drill.exec.physical.impl.SendingAccountor; diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/producer/ProducerConsumerBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/producer/ProducerConsumerBatch.java index 91d364751..051a590f2 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/producer/ProducerConsumerBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/producer/ProducerConsumerBatch.java @@ -17,8 +17,11 @@ */ package org.apache.drill.exec.physical.impl.producer; +import java.util.concurrent.BlockingDeque; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.LinkedBlockingDeque; + import org.apache.drill.common.types.TypeProtos.MajorType; -import org.apache.drill.exec.exception.SchemaChangeException; import org.apache.drill.exec.expr.TypeHelper; import org.apache.drill.exec.memory.OutOfMemoryException; import org.apache.drill.exec.ops.FragmentContext; @@ -29,20 +32,11 @@ import org.apache.drill.exec.record.BatchSchema; import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode; import org.apache.drill.exec.record.MaterializedField; import org.apache.drill.exec.record.RecordBatch; -import org.apache.drill.exec.record.RecordBatchLoader; import org.apache.drill.exec.record.TransferPair; import org.apache.drill.exec.record.VectorContainer; import org.apache.drill.exec.record.VectorWrapper; -import org.apache.drill.exec.record.WritableBatch; import org.apache.drill.exec.vector.ValueVector; -import java.util.Queue; -import java.util.concurrent.ArrayBlockingQueue; -import java.util.concurrent.BlockingDeque; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.LinkedBlockingDeque; - public class ProducerConsumerBatch extends AbstractRecordBatch { static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ProducerConsumerBatch.class); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/producer/ProducerConsumerBatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/producer/ProducerConsumerBatchCreator.java index 0fcf4f358..c568ed4d4 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/producer/ProducerConsumerBatchCreator.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/producer/ProducerConsumerBatchCreator.java @@ -17,16 +17,15 @@ */ package org.apache.drill.exec.physical.impl.producer; -import com.google.common.collect.Iterables; +import java.util.List; + import org.apache.drill.common.exceptions.ExecutionSetupException; import org.apache.drill.exec.ops.FragmentContext; -import org.apache.drill.exec.physical.config.Limit; import org.apache.drill.exec.physical.config.ProducerConsumer; import org.apache.drill.exec.physical.impl.BatchCreator; -import org.apache.drill.exec.physical.impl.producer.ProducerConsumerBatch; import org.apache.drill.exec.record.RecordBatch; -import java.util.List; +import com.google.common.collect.Iterables; public class ProducerConsumerBatchCreator implements BatchCreator<ProducerConsumer> { @Override diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java index 734088e12..ec29cac55 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java @@ -54,7 +54,6 @@ import org.apache.drill.exec.ops.FragmentContext; import org.apache.drill.exec.physical.config.Project; import org.apache.drill.exec.planner.StarColumnHelper; import org.apache.drill.exec.record.AbstractSingleRecordBatch; -import org.apache.drill.exec.record.BatchSchema; import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode; import org.apache.drill.exec.record.MaterializedField; import org.apache.drill.exec.record.RecordBatch; @@ -82,8 +81,8 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project>{ private int recordCount; private static final String EMPTY_STRING = ""; - - private class ClassifierResult { + + private class ClassifierResult { public boolean isStar = false; public List<String> outputNames; public String prefix = ""; @@ -91,13 +90,13 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project>{ public CaseInsensitiveMap outputMap = new CaseInsensitiveMap(); private CaseInsensitiveMap sequenceMap = new CaseInsensitiveMap(); - private void clear() { + private void clear() { isStar = false; prefix = ""; - if (outputNames != null) { + if (outputNames != null) { outputNames.clear(); } - + // note: don't clear the internal maps since they have cumulative data.. } } @@ -128,6 +127,7 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project>{ return super.innerNext(); } + @Override public VectorContainer getOutgoingContainer() { return this.container; } @@ -262,17 +262,17 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project>{ IntOpenHashSet transferFieldIds = new IntOpenHashSet(); boolean isAnyWildcard = false; - - ClassifierResult result = new ClassifierResult(); + + ClassifierResult result = new ClassifierResult(); boolean classify = isClassificationNeeded(exprs); - + for(int i = 0; i < exprs.size(); i++){ final NamedExpression namedExpression = exprs.get(i); result.clear(); - + if (classify && namedExpression.getExpr() instanceof SchemaPath) { classifyExpr(namedExpression, incoming, result); - + if (result.isStar) { isAnyWildcard = true; Integer value = result.prefixMap.get(result.prefix); @@ -284,12 +284,12 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project>{ if (k > result.outputNames.size()-1) { assert false; } - String name = result.outputNames.get(k++); // get the renamed column names + String name = result.outputNames.get(k++); // get the renamed column names if (name == EMPTY_STRING) continue; FieldReference ref = new FieldReference(name); TransferPair tp = wrapper.getValueVector().getTransferPair(ref); transfers.add(tp); - container.add(tp.getTo()); + container.add(tp.getTo()); } } else if (value != null && value.intValue() > 1) { // subsequent wildcards should do a copy of incoming valuevectors int k = 0; @@ -305,7 +305,7 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project>{ final LogicalExpression expr = ExpressionTreeMaterializer.materialize(originalPath, incoming, collector, context.getFunctionRegistry() ); if(collector.hasErrors()){ throw new SchemaChangeException(String.format("Failure while trying to materialize incoming schema. Errors:\n %s.", collector.toErrorString())); - } + } MaterializedField outputField = MaterializedField.create(name, expr.getMajorType()); ValueVector vv = TypeHelper.getNewVector(outputField, oContext.getAllocator()); @@ -330,7 +330,7 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project>{ } } } - + final LogicalExpression expr = ExpressionTreeMaterializer.materialize(namedExpression.getExpr(), incoming, collector, context.getFunctionRegistry(), true); final MaterializedField outputField = MaterializedField.create(outputName, expr.getMajorType()); if(collector.hasErrors()){ @@ -426,7 +426,7 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project>{ NameSegment ref = ex.getRef().getRootSegment(); boolean refHasPrefix = ref.getPath().contains(StarColumnHelper.PREFIX_DELIMITER); boolean exprContainsStar = expr.getPath().contains(StarColumnHelper.STAR_COLUMN); - + if (refHasPrefix || exprContainsStar) { needed = true; break; @@ -434,19 +434,19 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project>{ } return needed; } - + private String getUniqueName(String name, ClassifierResult result) { Integer currentSeq = (Integer) result.sequenceMap.get(name); if (currentSeq == null) { // name is unique, so return the original name Integer n = -1; result.sequenceMap.put(name, n); - return name; + return name; } // create a new name Integer newSeq = currentSeq + 1; - result.sequenceMap.put(name, newSeq); - - String newName = name + newSeq; + result.sequenceMap.put(name, newSeq); + + String newName = name + newSeq; return newName; } @@ -462,8 +462,8 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project>{ result.outputNames.add(EMPTY_STRING); } } - - private void classifyExpr(NamedExpression ex, RecordBatch incoming, ClassifierResult result) { + + private void classifyExpr(NamedExpression ex, RecordBatch incoming, ClassifierResult result) { NameSegment expr = ((SchemaPath)ex.getExpr()).getRootSegment(); NameSegment ref = ex.getRef().getRootSegment(); boolean exprHasPrefix = expr.getPath().contains(StarColumnHelper.PREFIX_DELIMITER); @@ -475,11 +475,11 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project>{ String exprPrefix = EMPTY_STRING; String exprSuffix = expr.getPath(); - + if (exprHasPrefix) { // get the prefix of the expr String[] exprComponents = expr.getPath().split(StarColumnHelper.PREFIX_DELIMITER, 2); - assert(exprComponents.length == 2); + assert(exprComponents.length == 2); exprPrefix = exprComponents[0]; exprSuffix = exprComponents[1]; result.prefix = exprPrefix; @@ -496,18 +496,18 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project>{ result.prefixMap.put(exprPrefix, n); } } - + int incomingSchemaSize = incoming.getSchema().getFieldCount(); - // for debugging.. + // for debugging.. // if (incomingSchemaSize > 9) { // assert false; // } - + // input is '*' and output is 'prefix_*' - if (exprIsStar && refHasPrefix && refEndsWithStar) { + if (exprIsStar && refHasPrefix && refEndsWithStar) { String[] components = ref.getPath().split(StarColumnHelper.PREFIX_DELIMITER, 2); - assert(components.length == 2); + assert(components.length == 2); String prefix = components[0]; result.outputNames = Lists.newArrayList(); for(VectorWrapper<?> wrapper : incoming) { @@ -518,7 +518,7 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project>{ String newName = prefix + StarColumnHelper.PREFIX_DELIMITER + name; addToResultMaps(newName, result, false); } - } + } // input and output are the same else if (expr.getPath().equals(ref.getPath())) { if (exprContainsStar && exprHasPrefix) { @@ -533,16 +533,16 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project>{ for(VectorWrapper<?> wrapper : incoming) { ValueVector vvIn = wrapper.getValueVector(); String incomingName = vvIn.getField().getPath().getRootSegment().getPath(); - // get the prefix of the name + // get the prefix of the name String[] nameComponents = incomingName.split(StarColumnHelper.PREFIX_DELIMITER, 2); // if incoming valuevector does not have a prefix, ignore it since this expression is not referencing it if (nameComponents.length <= 1) { k++; continue; - } + } String namePrefix = nameComponents[0]; if (exprPrefix.equals(namePrefix)) { - String newName = incomingName; + String newName = incomingName; if (!result.outputMap.containsKey(newName)) { result.outputNames.set(k, newName); result.outputMap.put(newName, newName); @@ -593,13 +593,13 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project>{ if (components.length <= 1) { k++; continue; - } + } String namePrefix = components[0]; String nameSuffix = components[1]; if (exprPrefix.equals(namePrefix)) { if (refContainsStar) { - // remove the prefix from the incoming column names - String newName = getUniqueName(nameSuffix, result); // for top level we need to make names unique + // remove the prefix from the incoming column names + String newName = getUniqueName(nameSuffix, result); // for top level we need to make names unique result.outputNames.set(k, newName); } else if (exprSuffix.equals(nameSuffix)) { // example: ref: $f1, expr: T0<PREFIX><column_name> @@ -611,15 +611,15 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project>{ } k++; } - } + } // input and output have prefixes although they could be different... else if (exprHasPrefix && refHasPrefix) { String[] input = expr.getPath().split(StarColumnHelper.PREFIX_DELIMITER, 2); - assert(input.length == 2); - assert false : "Unexpected project expression or reference"; // not handled yet - } + assert(input.length == 2); + assert false : "Unexpected project expression or reference"; // not handled yet + } else { - // if the incoming schema's column name matches the expression name of the Project, + // if the incoming schema's column name matches the expression name of the Project, // then we just want to pick the ref name as the output column name result.outputNames = Lists.newArrayListWithCapacity(incomingSchemaSize); for (int j=0; j < incomingSchemaSize; j++) { @@ -629,10 +629,10 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project>{ int k = 0; for(VectorWrapper<?> wrapper : incoming) { ValueVector vvIn = wrapper.getValueVector(); - String incomingName = vvIn.getField().getPath().getRootSegment().getPath(); + String incomingName = vvIn.getField().getPath().getRootSegment().getPath(); if (expr.getPath().equals(incomingName)) { - String newName = ref.getPath(); + String newName = ref.getPath(); if (!result.outputMap.containsKey(newName)) { result.outputNames.set(k, newName); result.outputMap.put(newName, newName); @@ -640,7 +640,7 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project>{ } k++; } - } + } } } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/RecordBatchData.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/RecordBatchData.java index 02cad5a78..8116869c4 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/RecordBatchData.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/RecordBatchData.java @@ -35,11 +35,11 @@ import com.google.common.collect.Lists; */ public class RecordBatchData { static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(RecordBatchData.class); - + private SelectionVector2 sv2; private int recordCount; VectorContainer container = new VectorContainer(); - + public RecordBatchData(VectorAccessible batch){ List<ValueVector> vectors = Lists.newArrayList(); if (batch instanceof RecordBatch && batch.getSchema().getSelectionVectorMode() == SelectionVectorMode.TWO_BYTE) { @@ -47,7 +47,7 @@ public class RecordBatchData { } else { this.sv2 = null; } - + for(VectorWrapper<?> v : batch){ if(v.isHyper()) throw new UnsupportedOperationException("Record batch data can't be created based on a hyper batch."); TransferPair tp = v.getValueVector().getTransferPair(); @@ -66,7 +66,7 @@ public class RecordBatchData { container = VectorContainer.canonicalize(container); container.buildSchema(mode); } - + public int getRecordCount(){ return recordCount; } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortBatch.java index dbb547dca..3a374910c 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortBatch.java @@ -32,7 +32,6 @@ import org.apache.drill.exec.expr.ClassGenerator.HoldingContainer; import org.apache.drill.exec.expr.CodeGenerator; import org.apache.drill.exec.expr.ExpressionTreeMaterializer; import org.apache.drill.exec.expr.fn.FunctionGenerationHelper; -import org.apache.drill.exec.memory.BufferAllocator; import org.apache.drill.exec.memory.OutOfMemoryException; import org.apache.drill.exec.ops.FragmentContext; import org.apache.drill.exec.physical.config.Sort; diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortBatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortBatchCreator.java index cd5079fca..217acf216 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortBatchCreator.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortBatchCreator.java @@ -35,6 +35,6 @@ public class SortBatchCreator implements BatchCreator<Sort>{ Preconditions.checkArgument(children.size() == 1); return new SortBatch(config, context, children.iterator().next()); } - - + + } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortTemplate.java index e3971cbc3..593db0dee 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortTemplate.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortTemplate.java @@ -17,10 +17,10 @@ */ package org.apache.drill.exec.physical.impl.sort; +import java.util.concurrent.TimeUnit; + import javax.inject.Named; -import com.google.common.base.Preconditions; -import com.google.common.base.Stopwatch; import org.apache.drill.exec.exception.SchemaChangeException; import org.apache.drill.exec.ops.FragmentContext; import org.apache.drill.exec.record.RecordBatch; @@ -29,21 +29,22 @@ import org.apache.drill.exec.record.selection.SelectionVector4; import org.apache.hadoop.util.IndexedSortable; import org.apache.hadoop.util.QuickSort; -import java.util.concurrent.TimeUnit; +import com.google.common.base.Preconditions; +import com.google.common.base.Stopwatch; public abstract class SortTemplate implements Sorter, IndexedSortable{ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(SortTemplate.class); - + private SelectionVector4 vector4; - + public void setup(FragmentContext context, SelectionVector4 vector4, VectorContainer hyperBatch) throws SchemaChangeException{ // we pass in the local hyperBatch since that is where we'll be reading data. Preconditions.checkNotNull(vector4); this.vector4 = vector4; doSetup(context, hyperBatch, null); } - + @Override public void sort(SelectionVector4 vector4, VectorContainer container){ Stopwatch watch = new Stopwatch(); @@ -59,7 +60,7 @@ public abstract class SortTemplate implements Sorter, IndexedSortable{ vector4.set(sv0, vector4.get(sv1)); vector4.set(sv1, tmp); } - + @Override public int compare(int leftIndex, int rightIndex) { int sv1 = vector4.get(leftIndex); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/Sorter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/Sorter.java index dcb159c60..eb8dfdf3e 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/Sorter.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/Sorter.java @@ -20,14 +20,13 @@ package org.apache.drill.exec.physical.impl.sort; import org.apache.drill.exec.compile.TemplateClassDefinition; import org.apache.drill.exec.exception.SchemaChangeException; import org.apache.drill.exec.ops.FragmentContext; -import org.apache.drill.exec.record.RecordBatch; import org.apache.drill.exec.record.VectorContainer; import org.apache.drill.exec.record.selection.SelectionVector4; public interface Sorter { public void setup(FragmentContext context, SelectionVector4 vector4, VectorContainer hyperBatch) throws SchemaChangeException; public void sort(SelectionVector4 vector4, VectorContainer container); - + public static TemplateClassDefinition<Sorter> TEMPLATE_DEFINITION = new TemplateClassDefinition<Sorter>(Sorter.class, SortTemplate.class); } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/Copier.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/Copier.java index dfc37c6ac..8ead6ab93 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/Copier.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/Copier.java @@ -21,7 +21,6 @@ import org.apache.drill.exec.compile.TemplateClassDefinition; import org.apache.drill.exec.exception.SchemaChangeException; import org.apache.drill.exec.ops.FragmentContext; import org.apache.drill.exec.record.RecordBatch; -import org.apache.drill.exec.vector.allocator.VectorAllocator; public interface Copier { public static TemplateClassDefinition<Copier> TEMPLATE_DEFINITION2 = new TemplateClassDefinition<Copier>(Copier.class, CopierTemplate2.class); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/CopierTemplate4.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/CopierTemplate4.java index 7a1c0290f..c42332d81 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/CopierTemplate4.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/CopierTemplate4.java @@ -23,9 +23,7 @@ import org.apache.drill.exec.exception.SchemaChangeException; import org.apache.drill.exec.ops.FragmentContext; import org.apache.drill.exec.record.RecordBatch; import org.apache.drill.exec.record.VectorWrapper; -import org.apache.drill.exec.record.selection.SelectionVector2; import org.apache.drill.exec.record.selection.SelectionVector4; -import org.apache.drill.exec.vector.allocator.VectorAllocator; public abstract class CopierTemplate4 implements Copier{ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(CopierTemplate4.class); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/RemovingRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/RemovingRecordBatch.java index 4ca2cdfba..97f3608b2 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/RemovingRecordBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/RemovingRecordBatch.java @@ -23,26 +23,19 @@ import java.util.List; import org.apache.drill.exec.exception.ClassTransformationException; import org.apache.drill.exec.exception.SchemaChangeException; import org.apache.drill.exec.expr.CodeGenerator; -import org.apache.drill.exec.expr.TypeHelper; import org.apache.drill.exec.memory.BufferAllocator; import org.apache.drill.exec.memory.OutOfMemoryException; import org.apache.drill.exec.ops.FragmentContext; import org.apache.drill.exec.physical.config.SelectionVectorRemover; import org.apache.drill.exec.record.AbstractSingleRecordBatch; import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode; -import org.apache.drill.exec.record.HyperVectorWrapper; import org.apache.drill.exec.record.RecordBatch; import org.apache.drill.exec.record.TransferPair; import org.apache.drill.exec.record.VectorContainer; import org.apache.drill.exec.record.VectorWrapper; import org.apache.drill.exec.record.WritableBatch; import org.apache.drill.exec.vector.CopyUtil; -import org.apache.drill.exec.vector.FixedWidthVector; import org.apache.drill.exec.vector.ValueVector; -import org.apache.drill.exec.vector.VariableWidthVector; -import org.apache.drill.exec.vector.allocator.FixedVectorAllocator; -import org.apache.drill.exec.vector.allocator.VariableEstimatedVector; -import org.apache.drill.exec.vector.allocator.VectorAllocator; import com.google.common.base.Preconditions; import com.google.common.collect.Lists; @@ -186,6 +179,7 @@ public class RemovingRecordBatch extends AbstractSingleRecordBatch<SelectionVect incoming.getSchema())); } + @Override public void cleanup(){ super.cleanup(); } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/SVRemoverCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/SVRemoverCreator.java index a932b4499..455a5f920 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/SVRemoverCreator.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/SVRemoverCreator.java @@ -35,6 +35,6 @@ public class SVRemoverCreator implements BatchCreator<SelectionVectorRemover>{ Preconditions.checkArgument(children.size() == 1); return new RemovingRecordBatch(config, context, children.iterator().next()); } - - + + } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/trace/TraceBatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/trace/TraceBatchCreator.java index a24ec705a..12afa3303 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/trace/TraceBatchCreator.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/trace/TraceBatchCreator.java @@ -18,16 +18,14 @@ package org.apache.drill.exec.physical.impl.trace; +import java.util.List; + import org.apache.drill.common.exceptions.ExecutionSetupException; import org.apache.drill.exec.ops.FragmentContext; import org.apache.drill.exec.physical.config.Trace; import org.apache.drill.exec.physical.impl.BatchCreator; import org.apache.drill.exec.record.RecordBatch; -import com.google.common.base.Preconditions; - -import java.util.List; - public class TraceBatchCreator implements BatchCreator<Trace> { static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TraceBatchCreator.class); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/trace/TraceRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/trace/TraceRecordBatch.java index b012cec1d..609cb29bb 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/trace/TraceRecordBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/trace/TraceRecordBatch.java @@ -27,8 +27,6 @@ import org.apache.drill.exec.cache.VectorAccessibleSerializable; import org.apache.drill.exec.exception.SchemaChangeException; import org.apache.drill.exec.ops.FragmentContext; import org.apache.drill.exec.physical.config.Trace; -import org.apache.drill.exec.proto.ExecProtos.FragmentHandle; -import org.apache.drill.exec.proto.helper.QueryIdHelper; import org.apache.drill.exec.record.AbstractSingleRecordBatch; import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode; import org.apache.drill.exec.record.RecordBatch; diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionAllBatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionAllBatchCreator.java index 395cab425..7f7e11038 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionAllBatchCreator.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionAllBatchCreator.java @@ -35,6 +35,6 @@ public class UnionAllBatchCreator implements BatchCreator<UnionAll>{ Preconditions.checkArgument(children.size() >= 1); return new UnionAllRecordBatch(config, children, context); } - - + + } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionAllRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionAllRecordBatch.java index 1f2f8430d..6b83d04ee 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionAllRecordBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionAllRecordBatch.java @@ -149,7 +149,7 @@ public class UnionAllRecordBatch extends AbstractRecordBatch<UnionAll> { public WritableBatch getWritableBatch() { return WritableBatch.get(this); } - + @Override public void cleanup() { super.cleanup(); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverBatch.java index 16a68b809..8e7d9c623 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverBatch.java @@ -17,10 +17,11 @@ */ package org.apache.drill.exec.physical.impl.unorderedreceiver; +import io.netty.buffer.ByteBuf; + import java.io.IOException; import java.util.Iterator; -import io.netty.buffer.ByteBuf; import org.apache.drill.common.expression.SchemaPath; import org.apache.drill.exec.exception.SchemaChangeException; import org.apache.drill.exec.memory.OutOfMemoryException; @@ -46,7 +47,6 @@ import org.apache.drill.exec.record.selection.SelectionVector2; import org.apache.drill.exec.record.selection.SelectionVector4; import org.apache.drill.exec.rpc.RpcException; import org.apache.drill.exec.rpc.RpcOutcomeListener; -import org.apache.drill.exec.rpc.control.ControlTunnel.ReceiverFinished; public class UnorderedReceiverBatch implements RecordBatch { static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(UnorderedReceiverBatch.class); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverCreator.java index 6be87143a..d9864f976 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverCreator.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverCreator.java @@ -36,12 +36,12 @@ public class UnorderedReceiverCreator implements BatchCreator<UnorderedReceiver> assert children == null || children.isEmpty(); IncomingBuffers bufHolder = context.getBuffers(); assert bufHolder != null : "IncomingBuffers must be defined for any place a receiver is declared."; - + RawBatchBuffer[] buffers = bufHolder.getBuffers(receiver.getOppositeMajorFragmentId()); assert buffers.length == 1; RawBatchBuffer buffer = buffers[0]; return new UnorderedReceiverBatch(context, buffer, receiver); } - - + + } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/IteratorValidatorBatchIterator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/IteratorValidatorBatchIterator.java index 14110e374..0e69bcf71 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/IteratorValidatorBatchIterator.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/IteratorValidatorBatchIterator.java @@ -21,7 +21,6 @@ import java.util.Iterator; import org.apache.drill.common.expression.SchemaPath; import org.apache.drill.exec.ops.FragmentContext; -import org.apache.drill.exec.record.AbstractRecordBatch; import org.apache.drill.exec.record.BatchSchema; import org.apache.drill.exec.record.RecordBatch; import org.apache.drill.exec.record.TypedFieldId; @@ -153,7 +152,7 @@ public class IteratorValidatorBatchIterator implements RecordBatch { public void cleanup() { incoming.cleanup(); } - + @Override public VectorContainer getOutgoingContainer() { throw new UnsupportedOperationException(String.format(" You should not call getOutgoingContainer() for class %s", this.getClass().getCanonicalName())); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatchCreator.java index 0078e18a6..eb5d83bf4 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatchCreator.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatchCreator.java @@ -17,14 +17,15 @@ */ package org.apache.drill.exec.physical.impl.xsort; -import com.google.common.base.Preconditions; +import java.util.List; + import org.apache.drill.common.exceptions.ExecutionSetupException; import org.apache.drill.exec.ops.FragmentContext; import org.apache.drill.exec.physical.config.ExternalSort; import org.apache.drill.exec.physical.impl.BatchCreator; import org.apache.drill.exec.record.RecordBatch; -import java.util.List; +import com.google.common.base.Preconditions; public class ExternalSortBatchCreator implements BatchCreator<ExternalSort>{ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ExternalSortBatchCreator.class); @@ -34,6 +35,6 @@ public class ExternalSortBatchCreator implements BatchCreator<ExternalSort>{ Preconditions.checkArgument(children.size() == 1); return new ExternalSortBatch(config, context, children.iterator().next()); } - - + + } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/MSortTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/MSortTemplate.java index 0c98c6fd3..df79b1acb 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/MSortTemplate.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/MSortTemplate.java @@ -17,9 +17,10 @@ */ package org.apache.drill.exec.physical.impl.xsort; -import com.google.common.base.Preconditions; -import com.google.common.base.Stopwatch; -import com.google.common.collect.Queues; +import java.util.Queue; + +import javax.inject.Named; + import org.apache.drill.exec.exception.SchemaChangeException; import org.apache.drill.exec.memory.BufferAllocator; import org.apache.drill.exec.ops.FragmentContext; @@ -28,8 +29,9 @@ import org.apache.drill.exec.record.VectorContainer; import org.apache.drill.exec.record.selection.SelectionVector4; import org.apache.hadoop.util.IndexedSortable; -import javax.inject.Named; -import java.util.Queue; +import com.google.common.base.Preconditions; +import com.google.common.base.Stopwatch; +import com.google.common.collect.Queues; public abstract class MSortTemplate implements MSorter, IndexedSortable{ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(MSortTemplate.class); @@ -93,7 +95,7 @@ public abstract class MSortTemplate implements MSorter, IndexedSortable{ public SelectionVector4 getSV4() { return vector4; } - + @Override public void sort(VectorContainer container){ Stopwatch watch = new Stopwatch(); @@ -137,7 +139,7 @@ public abstract class MSortTemplate implements MSorter, IndexedSortable{ vector4.set(sv0, vector4.get(sv1)); vector4.set(sv1, tmp); } - + @Override public int compare(int leftIndex, int rightIndex) { int sv1 = vector4.get(leftIndex); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/MSorter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/MSorter.java index 1300830f6..e80d30903 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/MSorter.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/MSorter.java @@ -28,7 +28,7 @@ public interface MSorter { public void setup(FragmentContext context, BufferAllocator allocator, SelectionVector4 vector4, VectorContainer hyperBatch) throws SchemaChangeException; public void sort(VectorContainer container); public SelectionVector4 getSV4(); - + public static TemplateClassDefinition<MSorter> TEMPLATE_DEFINITION = new TemplateClassDefinition<MSorter>(MSorter.class, MSortTemplate.class); } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/PriorityQueueCopier.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/PriorityQueueCopier.java index 0eda0a6d2..4da3c3604 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/PriorityQueueCopier.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/PriorityQueueCopier.java @@ -17,20 +17,15 @@ */ package org.apache.drill.exec.physical.impl.xsort; +import java.util.List; + import org.apache.drill.exec.compile.TemplateClassDefinition; import org.apache.drill.exec.exception.SchemaChangeException; import org.apache.drill.exec.memory.BufferAllocator; import org.apache.drill.exec.ops.FragmentContext; -import org.apache.drill.exec.physical.impl.sort.RecordBatchData; -import org.apache.drill.exec.physical.impl.svremover.Copier; -import org.apache.drill.exec.record.RecordBatch; import org.apache.drill.exec.record.VectorAccessible; -import org.apache.drill.exec.record.VectorContainer; -import org.apache.drill.exec.record.selection.SelectionVector4; import org.apache.drill.exec.vector.allocator.VectorAllocator; -import java.util.List; - public interface PriorityQueueCopier { public void setup(FragmentContext context, BufferAllocator allocator, VectorAccessible hyperBatch, List<BatchGroup> batchGroups, VectorAccessible outgoing, List<VectorAllocator> allocators) throws SchemaChangeException; diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/PriorityQueueCopierTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/PriorityQueueCopierTemplate.java index b1c316ca1..951397c80 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/PriorityQueueCopierTemplate.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/PriorityQueueCopierTemplate.java @@ -31,7 +31,7 @@ import org.apache.drill.exec.vector.allocator.VectorAllocator; public abstract class PriorityQueueCopierTemplate implements PriorityQueueCopier { static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(PriorityQueueCopierTemplate.class); - + private SelectionVector4 vector4; private List<BatchGroup> batchGroups; private VectorAccessible hyperBatch; @@ -165,7 +165,7 @@ public abstract class PriorityQueueCopierTemplate implements PriorityQueueCopier vector4.set(sv0, vector4.get(sv1)); vector4.set(sv1, tmp); } - + public int compare(int leftIndex, int rightIndex) { int sv1 = vector4.get(leftIndex); int sv2 = vector4.get(rightIndex); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/SingleBatchSorter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/SingleBatchSorter.java index 761e4214d..b37229d2e 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/SingleBatchSorter.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/SingleBatchSorter.java @@ -20,16 +20,13 @@ package org.apache.drill.exec.physical.impl.xsort; import org.apache.drill.exec.compile.TemplateClassDefinition; import org.apache.drill.exec.exception.SchemaChangeException; import org.apache.drill.exec.ops.FragmentContext; -import org.apache.drill.exec.physical.impl.sort.SortTemplate; import org.apache.drill.exec.record.RecordBatch; -import org.apache.drill.exec.record.VectorContainer; import org.apache.drill.exec.record.selection.SelectionVector2; -import org.apache.drill.exec.record.selection.SelectionVector4; public interface SingleBatchSorter { public void setup(FragmentContext context, SelectionVector2 vector2, RecordBatch incoming) throws SchemaChangeException; public void sort(SelectionVector2 vector2); - + public static TemplateClassDefinition<SingleBatchSorter> TEMPLATE_DEFINITION = new TemplateClassDefinition<SingleBatchSorter>(SingleBatchSorter.class, SingleBatchSorterTemplate.class); } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/SingleBatchSorterTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/SingleBatchSorterTemplate.java index 3cb764164..75892f9be 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/SingleBatchSorterTemplate.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/SingleBatchSorterTemplate.java @@ -17,20 +17,19 @@ */ package org.apache.drill.exec.physical.impl.xsort; -import com.google.common.base.Preconditions; -import com.google.common.base.Stopwatch; +import java.util.concurrent.TimeUnit; + +import javax.inject.Named; + import org.apache.drill.exec.exception.SchemaChangeException; import org.apache.drill.exec.ops.FragmentContext; -import org.apache.drill.exec.physical.impl.sort.Sorter; import org.apache.drill.exec.record.RecordBatch; -import org.apache.drill.exec.record.VectorContainer; import org.apache.drill.exec.record.selection.SelectionVector2; -import org.apache.drill.exec.record.selection.SelectionVector4; import org.apache.hadoop.util.IndexedSortable; import org.apache.hadoop.util.QuickSort; -import javax.inject.Named; -import java.util.concurrent.TimeUnit; +import com.google.common.base.Preconditions; +import com.google.common.base.Stopwatch; public abstract class SingleBatchSorterTemplate implements SingleBatchSorter, IndexedSortable{ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(SingleBatchSorterTemplate.class); @@ -42,7 +41,7 @@ public abstract class SingleBatchSorterTemplate implements SingleBatchSorter, In this.vector2 = vector2; doSetup(context, incoming, null); } - + @Override public void sort(SelectionVector2 vector2){ QuickSort qs = new QuickSort(); @@ -60,7 +59,7 @@ public abstract class SingleBatchSorterTemplate implements SingleBatchSorter, In vector2.setIndex(sv0, vector2.getIndex(sv1)); vector2.setIndex(sv1, tmp); } - + @Override public int compare(int leftIndex, int rightIndex) { char sv1 = vector2.getIndex(leftIndex); |