author    Aditya Kishore <aditya@maprtech.com>  2014-09-11 10:43:08 -0700
committer Aditya Kishore <aditya@maprtech.com>  2014-09-11 19:25:28 -0700
commit    676f5df6b14b10ccc3603360e0efee9c745c5b97 (patch)
tree      592b02f84e8a6da2ace67f8e6c0e46d4237af20b /exec/java-exec/src/main/java/org/apache/drill/exec/physical
parent    7ae257c42b2eb4e1db778dca9ba64e2516078b38 (diff)
DRILL-1402: Add check-style rules for trailing space, TABs and blocks without braces
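The rule definitions themselves live in the project's Checkstyle configuration file and are outside this view, which is limited to the physical package. As a sketch only -- these are the standard Checkstyle modules for the three cases named in the commit message, not necessarily the exact configuration this commit added -- the rules might look like:

    <module name="Checker">
      <!-- Flag TAB characters anywhere in a source file. -->
      <module name="FileTabCharacter"/>
      <!-- Flag trailing whitespace on any line. -->
      <module name="RegexpSingleline">
        <property name="format" value="\s+$"/>
        <property name="message" value="Line has trailing whitespace."/>
      </module>
      <module name="TreeWalker">
        <!-- Require braces around if/else/for/while/do bodies. -->
        <module name="NeedBraces"/>
      </module>
    </module>

The changes below are the mechanical fallout of rules like these: single-statement bodies gain braces, keywords gain a following space, and trailing whitespace disappears (apparently why some paired -/+ lines below show no visible difference).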
Diffstat (limited to 'exec/java-exec/src/main/java/org/apache/drill/exec/physical')
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractBase.java | 8
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractPhysicalVisitor.java | 4
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/Screen.java | 4
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/SingleMergeExchange.java | 3
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/UnionExchange.java | 4
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ImplCreator.java | 10
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/OperatorCreatorRegistry.java | 11
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java | 17
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SingleSenderCreator.java | 9
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java | 31
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TraceInjector.java | 8
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WriterRecordBatch.java | 13
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java | 43
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java | 78
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggregator.java | 2
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/InternalBatch.java | 16
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java | 71
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggTemplate.java | 173
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggregator.java | 4
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/ChainedHashTable.java | 27
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableTemplate.java | 68
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterRecordBatch.java | 21
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java | 11
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbeTemplate.java | 29
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinStatus.java | 61
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinTemplate.java | 27
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java | 29
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatchBuilder.java | 39
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java | 27
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionRecordBatch.java | 23
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/producer/ProducerConsumerBatch.java | 5
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java | 75
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectorTemplate.java | 24
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/RecordBatchData.java | 12
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortBatch.java | 34
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortRecordBatchBuilder.java | 60
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/trace/TraceRecordBatch.java | 11
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/IteratorValidatorBatchIterator.java | 15
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/IteratorValidatorInjector.java | 8
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/BatchGroup.java | 21
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java | 46
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/MSortTemplate.java | 12
42 files changed, 723 insertions, 471 deletions
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractBase.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractBase.java
index e54e67c3d..defb4e4af 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractBase.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractBase.java
@@ -33,8 +33,10 @@ public abstract class AbstractBase implements PhysicalOperator{
@Override
public void accept(GraphVisitor<PhysicalOperator> visitor) {
visitor.enter(this);
- if(this.iterator() == null) throw new IllegalArgumentException("Null iterator for pop." + this);
- for(PhysicalOperator o : this){
+ if (this.iterator() == null) {
+ throw new IllegalArgumentException("Null iterator for pop." + this);
+ }
+ for (PhysicalOperator o : this) {
Preconditions.checkNotNull(o, String.format("Null in iterator for pop %s.", this));
o.accept(visitor);
}
@@ -46,7 +48,7 @@ public abstract class AbstractBase implements PhysicalOperator{
return true;
}
- public final void setOperatorId(int id){
+ public final void setOperatorId(int id) {
this.id = id;
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractPhysicalVisitor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractPhysicalVisitor.java
index 9e7beec47..48b38011f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractPhysicalVisitor.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractPhysicalVisitor.java
@@ -90,7 +90,7 @@ public abstract class AbstractPhysicalVisitor<T, X, E extends Throwable> impleme
@Override
public T visitHashAggregate(HashAggregate agg, X value) throws E {
- return visitOp(agg, value);
+ return visitOp(agg, value);
}
@Override
@@ -120,7 +120,7 @@ public abstract class AbstractPhysicalVisitor<T, X, E extends Throwable> impleme
public T visitChildren(PhysicalOperator op, X value) throws E{
- for(PhysicalOperator child : op){
+ for (PhysicalOperator child : op) {
child.accept(this, value);
}
return null;
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/Screen.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/Screen.java
index 5f0648da4..980b413de 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/Screen.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/Screen.java
@@ -60,7 +60,9 @@ public class Screen extends AbstractStore {
public void applyAssignments(List<DrillbitEndpoint> endpoints) throws PhysicalOperatorSetupException {
// we actually don't have to do anything since nothing should have changed. we'll just check that things
// didn't get screwed up.
- if (endpoints.size() != 1) throw new PhysicalOperatorSetupException("A Screen operator can only be assigned to a single node.");
+ if (endpoints.size() != 1) {
+ throw new PhysicalOperatorSetupException("A Screen operator can only be assigned to a single node.");
+ }
DrillbitEndpoint endpoint = endpoints.iterator().next();
// logger.debug("Endpoint this: {}, assignment: {}", this.endpoint, endpoint);
if (!endpoint.equals(this.endpoint)) {
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/SingleMergeExchange.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/SingleMergeExchange.java
index 26d881dc2..f6e11c479 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/SingleMergeExchange.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/SingleMergeExchange.java
@@ -63,8 +63,9 @@ public class SingleMergeExchange extends AbstractExchange {
protected void setupReceivers(List<CoordinationProtos.DrillbitEndpoint> receiverLocations)
throws PhysicalOperatorSetupException {
- if (receiverLocations.size() != 1)
+ if (receiverLocations.size() != 1) {
throw new PhysicalOperatorSetupException("SingleMergeExchange only supports a single receiver endpoint");
+ }
receiverLocation = receiverLocations.iterator().next();
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/UnionExchange.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/UnionExchange.java
index cafdbdd47..bf2b4a150 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/UnionExchange.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/UnionExchange.java
@@ -48,7 +48,9 @@ public class UnionExchange extends AbstractExchange{
@Override
protected void setupReceivers(List<DrillbitEndpoint> receiverLocations) throws PhysicalOperatorSetupException {
- if(receiverLocations.size() != 1) throw new PhysicalOperatorSetupException("A Union Exchange only supports a single receiver endpoint.");
+ if (receiverLocations.size() != 1) {
+ throw new PhysicalOperatorSetupException("A Union Exchange only supports a single receiver endpoint.");
+ }
this.destinationLocation = receiverLocations.iterator().next();
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ImplCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ImplCreator.java
index 7f9762415..e25f1c08e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ImplCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ImplCreator.java
@@ -41,9 +41,9 @@ public class ImplCreator extends AbstractPhysicalVisitor<RecordBatch, FragmentCo
private RootExec root = null;
- private ImplCreator(){}
+ private ImplCreator() {}
- private RootExec getRoot(){
+ private RootExec getRoot() {
return root;
}
@@ -78,7 +78,7 @@ public class ImplCreator extends AbstractPhysicalVisitor<RecordBatch, FragmentCo
public static RootExec getExec(FragmentContext context, FragmentRoot root) throws ExecutionSetupException {
ImplCreator i = new ImplCreator();
- if(AssertionUtil.isAssertionsEnabled()){
+ if (AssertionUtil.isAssertionsEnabled()) {
root = IteratorValidatorInjector.rewritePlanWithIteratorValidator(context, root);
}
@@ -86,9 +86,11 @@ public class ImplCreator extends AbstractPhysicalVisitor<RecordBatch, FragmentCo
watch.start();
root.accept(i, context);
logger.debug("Took {} ms to accept", watch.elapsed(TimeUnit.MILLISECONDS));
- if (i.root == null)
+ if (i.root == null) {
throw new ExecutionSetupException(
"The provided fragment did not have a root node that correctly created a RootExec value.");
+ }
return i.getRoot();
}
+
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/OperatorCreatorRegistry.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/OperatorCreatorRegistry.java
index 8c768e508..82a9a6364 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/OperatorCreatorRegistry.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/OperatorCreatorRegistry.java
@@ -42,7 +42,9 @@ public class OperatorCreatorRegistry {
public synchronized Object getOperatorCreator(Class<?> operator) throws ExecutionSetupException {
Object opCreator = instanceRegistry.get(operator);
- if (opCreator != null) return opCreator;
+ if (opCreator != null) {
+ return opCreator;
+ }
Constructor<?> c = constructorRegistry.get(operator);
if(c == null) {
@@ -75,9 +77,9 @@ public class OperatorCreatorRegistry {
Type[] args = ((ParameterizedType)iface).getActualTypeArguments();
interfaceFound = true;
boolean constructorFound = false;
- for(Constructor<?> constructor : operatorClass.getConstructors()){
+ for (Constructor<?> constructor : operatorClass.getConstructors()) {
Class<?>[] params = constructor.getParameterTypes();
- if(params.length == 0){
+ if (params.length == 0) {
Constructor<?> old = constructorRegistry.put((Class<?>) args[0], constructor);
if (old != null) {
throw new RuntimeException(
@@ -88,7 +90,7 @@ public class OperatorCreatorRegistry {
constructorFound = true;
}
}
- if(!constructorFound){
+ if (!constructorFound) {
logger.debug("Skipping registration of OperatorCreator {} as it doesn't have a default constructor",
operatorClass.getCanonicalName());
}
@@ -97,4 +99,5 @@ public class OperatorCreatorRegistry {
}
}
}
+
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
index c2a03b9d4..2712e2735 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
@@ -83,8 +83,9 @@ public class ScanBatch implements RecordBatch {
public ScanBatch(PhysicalOperator subScanConfig, FragmentContext context, Iterator<RecordReader> readers, List<String[]> partitionColumns, List<Integer> selectedPartitionColumns) throws ExecutionSetupException {
this.context = context;
this.readers = readers;
- if (!readers.hasNext())
+ if (!readers.hasNext()) {
throw new ExecutionSetupException("A scan batch must contain at least one reader.");
+ }
this.currentReader = readers.next();
this.oContext = new OperatorContext(subScanConfig, context);
this.currentReader.setOperatorContext(this.oContext);
@@ -121,7 +122,7 @@ public class ScanBatch implements RecordBatch {
@Override
public void kill(boolean sendUpstream) {
- if(currentReader != null){
+ if (currentReader != null) {
currentReader.cleanup();
}
@@ -220,8 +221,8 @@ public class ScanBatch implements RecordBatch {
private void addPartitionVectors() throws ExecutionSetupException{
try {
- if(partitionVectors != null){
- for(ValueVector v : partitionVectors){
+ if (partitionVectors != null) {
+ for (ValueVector v : partitionVectors) {
v.clear();
}
}
@@ -290,7 +291,9 @@ public class ScanBatch implements RecordBatch {
if (v == null || v.getClass() != clazz) {
// Field does not exist add it to the map and the output container
v = TypeHelper.getNewVector(field, oContext.getAllocator());
- if(!clazz.isAssignableFrom(v.getClass())) throw new SchemaChangeException(String.format("The class that was provided %s does not correspond to the expected vector type of %s.", clazz.getSimpleName(), v.getClass().getSimpleName()));
+ if (!clazz.isAssignableFrom(v.getClass())) {
+ throw new SchemaChangeException(String.format("The class that was provided %s does not correspond to the expected vector type of %s.", clazz.getSimpleName(), v.getClass().getSimpleName()));
+ }
container.add(v);
fieldVectorMap.put(field.key(), v);
@@ -342,9 +345,9 @@ public class ScanBatch implements RecordBatch {
return WritableBatch.get(this);
}
- public void cleanup(){
+ public void cleanup() {
container.clear();
- for(ValueVector v : partitionVectors){
+ for (ValueVector v : partitionVectors) {
v.clear();
}
fieldVectorMap.clear();
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SingleSenderCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SingleSenderCreator.java
index 2b7fdf3b6..352deaea6 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SingleSenderCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SingleSenderCreator.java
@@ -79,7 +79,7 @@ public class SingleSenderCreator implements RootCreator<SingleSender>{
@Override
public boolean innerNext() {
- if(!ok){
+ if (!ok) {
incoming.kill(false);
return false;
@@ -93,7 +93,7 @@ public class SingleSenderCreator implements RootCreator<SingleSender>{
out = IterOutcome.NONE;
}
// logger.debug("Outcome of sender next {}", out);
- switch(out){
+ switch (out) {
case STOP:
case NONE:
FragmentWritableBatch b2 = FragmentWritableBatch.getEmptyLastWithSchema(handle.getQueryId(), handle.getMajorFragmentId(),
@@ -158,7 +158,9 @@ public class SingleSenderCreator implements RootCreator<SingleSender>{
@Override
public void success(Ack value, ByteBuf buf) {
sendCount.decrement();
- if(value.getOk()) return;
+ if (value.getOk()) {
+ return;
+ }
logger.error("Downstream fragment was not accepted. Stopping future sends.");
// if we didn't get ack ok, we'll need to kill the query.
@@ -170,5 +172,4 @@ public class SingleSenderCreator implements RootCreator<SingleSender>{
}
-
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java
index 6eede30dc..473e3a3f3 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java
@@ -132,10 +132,10 @@ public class TopNBatch extends AbstractRecordBatch<TopN> {
@Override
public IterOutcome innerNext() {
- if(schema != null){
- if(getSelectionVector4().next()){
+ if (schema != null) {
+ if (getSelectionVector4().next()) {
return IterOutcome.OK;
- }else{
+ } else {
return IterOutcome.NONE;
}
}
@@ -156,8 +156,10 @@ public class TopNBatch extends AbstractRecordBatch<TopN> {
return upstream;
case OK_NEW_SCHEMA:
// only change in the case that the schema truly changes. Artificial schema changes are ignored.
- if(!incoming.getSchema().equals(schema)){
- if (schema != null) throw new UnsupportedOperationException("Sort doesn't currently support sorts with changing schemas.");
+ if (!incoming.getSchema().equals(schema)) {
+ if (schema != null) {
+ throw new UnsupportedOperationException("Sort doesn't currently support sorts with changing schemas.");
+ }
this.schema = incoming.getSchema();
}
// fall through.
@@ -181,7 +183,7 @@ public class TopNBatch extends AbstractRecordBatch<TopN> {
}
}
- if (schema == null){
+ if (schema == null) {
// builder may be null at this point if the first incoming batch is empty
return IterOutcome.NONE;
}
@@ -196,7 +198,7 @@ public class TopNBatch extends AbstractRecordBatch<TopN> {
return IterOutcome.OK_NEW_SCHEMA;
- }catch(SchemaChangeException | ClassTransformationException | IOException ex){
+ } catch(SchemaChangeException | ClassTransformationException | IOException ex) {
kill(false);
logger.error("Failure during query", ex);
context.fail(ex);
@@ -215,7 +217,7 @@ public class TopNBatch extends AbstractRecordBatch<TopN> {
if (copier == null) {
copier = RemovingRecordBatch.getGenerated4Copier(batch, context, oContext.getAllocator(), newContainer, newBatch);
} else {
- for(VectorWrapper<?> i : batch){
+ for (VectorWrapper<?> i : batch) {
ValueVector v = TypeHelper.getNewVector(i.getField(), oContext.getAllocator());
newContainer.add(v);
@@ -227,7 +229,7 @@ public class TopNBatch extends AbstractRecordBatch<TopN> {
int count = selectionVector4.getCount();
int copiedRecords = copier.copyRecords(0, count);
assert copiedRecords == count;
- for(VectorWrapper<?> v : newContainer){
+ for (VectorWrapper<?> v : newContainer) {
ValueVector.Mutator m = v.getValueVector().getMutator();
m.setValueCount(count);
}
@@ -253,11 +255,13 @@ public class TopNBatch extends AbstractRecordBatch<TopN> {
ClassGenerator<PriorityQueue> g = cg.getRoot();
g.setMappingSet(mainMapping);
- for(Ordering od : orderings){
+ for (Ordering od : orderings) {
// first, we rewrite the evaluation stack for each side of the comparison.
ErrorCollector collector = new ErrorCollectorImpl();
final LogicalExpression expr = ExpressionTreeMaterializer.materialize(od.getExpr(), batch, collector, context.getFunctionRegistry());
- if(collector.hasErrors()) throw new SchemaChangeException("Failure while materializing expression. " + collector.toErrorString());
+ if (collector.hasErrors()) {
+ throw new SchemaChangeException("Failure while materializing expression. " + collector.toErrorString());
+ }
g.setMappingSet(leftMapping);
HoldingContainer left = g.addExpr(expr, false);
g.setMappingSet(rightMapping);
@@ -269,9 +273,9 @@ public class TopNBatch extends AbstractRecordBatch<TopN> {
HoldingContainer out = g.addExpr(fh, false);
JConditional jc = g.getEvalBlock()._if(out.getValue().ne(JExpr.lit(0)));
- if(od.getDirection() == Direction.ASCENDING){
+ if (od.getDirection() == Direction.ASCENDING) {
jc._then()._return(out.getValue());
- }else{
+ } else {
jc._then()._return(out.getValue().minus());
}
g.rotateBlock();
@@ -377,5 +381,4 @@ public class TopNBatch extends AbstractRecordBatch<TopN> {
}
-
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TraceInjector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TraceInjector.java
index 58dd247e0..92d1882eb 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TraceInjector.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TraceInjector.java
@@ -82,10 +82,12 @@ public class TraceInjector extends AbstractPhysicalVisitor<PhysicalOperator, Fra
}
/* Inject trace operator */
- if (list.size() > 0)
- newOp = op.getNewWithChildren(list);
- newOp.setOperatorId(op.getOperatorId());
+ if (list.size() > 0) {
+ newOp = op.getNewWithChildren(list);
+ }
+ newOp.setOperatorId(op.getOperatorId());
return newOp;
}
+
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WriterRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WriterRecordBatch.java
index 99eeed374..8c1a4c07b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WriterRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WriterRecordBatch.java
@@ -82,8 +82,9 @@ public class WriterRecordBatch extends AbstractRecordBatch<Writer> {
IterOutcome upstream;
do {
upstream = next(incoming);
- if(first && upstream == IterOutcome.OK)
+ if(first && upstream == IterOutcome.OK) {
upstream = IterOutcome.OK_NEW_SCHEMA;
+ }
first = false;
switch(upstream) {
@@ -91,14 +92,15 @@ public class WriterRecordBatch extends AbstractRecordBatch<Writer> {
case NONE:
case STOP:
cleanup();
- if (upstream == IterOutcome.STOP)
+ if (upstream == IterOutcome.STOP) {
return upstream;
+ }
break;
case OK_NEW_SCHEMA:
try{
setupNewSchema();
- }catch(Exception ex){
+ } catch(Exception ex) {
kill(false);
logger.error("Failure during query", ex);
context.fail(ex);
@@ -113,9 +115,9 @@ public class WriterRecordBatch extends AbstractRecordBatch<Writer> {
throw new RuntimeException(ex);
}
- for(VectorWrapper v : incoming)
+ for(VectorWrapper v : incoming) {
v.getValueVector().clear();
-
+ }
break;
default:
@@ -176,4 +178,5 @@ public class WriterRecordBatch extends AbstractRecordBatch<Writer> {
throw new RuntimeException("Failed to close RecordWriter", ex);
}
}
+
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java
index e9be2ac99..c5228709d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java
@@ -82,7 +82,9 @@ public class HashAggBatch extends AbstractRecordBatch<HashAggregate> {
@Override
public int getRecordCount() {
- if(done) return 0;
+ if (done) {
+ return 0;
+ }
return aggregator.getOutputCount();
}
@@ -102,7 +104,7 @@ public class HashAggBatch extends AbstractRecordBatch<HashAggregate> {
case STOP:
return outcome;
case OK_NEW_SCHEMA:
- if (!createAggregator()){
+ if (!createAggregator()) {
done = true;
return IterOutcome.STOP;
}
@@ -131,10 +133,10 @@ public class HashAggBatch extends AbstractRecordBatch<HashAggregate> {
logger.debug("Starting aggregator doWork; incoming record count = {} ", incoming.getRecordCount());
- while(true){
+ while (true) {
AggOutcome out = aggregator.doWork();
logger.debug("Aggregator response {}, records {}", out, aggregator.getOutputCount());
- switch(out){
+ switch (out) {
case CLEANUP_AND_RETURN:
container.zeroVectors();
aggregator.cleanup();
@@ -150,7 +152,7 @@ public class HashAggBatch extends AbstractRecordBatch<HashAggregate> {
return aggregator.getOutcome();
case UPDATE_AGGREGATOR:
aggregator = null;
- if(!createAggregator()){
+ if (!createAggregator()) {
return IterOutcome.STOP;
}
continue;
@@ -168,23 +170,23 @@ public class HashAggBatch extends AbstractRecordBatch<HashAggregate> {
*/
private boolean createAggregator() {
logger.debug("Creating new aggregator.");
- try{
+ try {
stats.startSetup();
this.aggregator = createAggregatorInternal();
return true;
- }catch(SchemaChangeException | ClassTransformationException | IOException ex){
+ } catch (SchemaChangeException | ClassTransformationException | IOException ex) {
context.fail(ex);
container.clear();
incoming.kill(false);
return false;
- }finally{
+ } finally {
stats.stopSetup();
}
}
private HashAggregator createAggregatorInternal() throws SchemaChangeException, ClassTransformationException, IOException{
- CodeGenerator<HashAggregator> top = CodeGenerator.get(HashAggregator.TEMPLATE_DEFINITION, context.getFunctionRegistry());
- ClassGenerator<HashAggregator> cg = top.getRoot();
+ CodeGenerator<HashAggregator> top = CodeGenerator.get(HashAggregator.TEMPLATE_DEFINITION, context.getFunctionRegistry());
+ ClassGenerator<HashAggregator> cg = top.getRoot();
ClassGenerator<HashAggregator> cgInner = cg.getInnerGenerator("BatchHolder");
container.clear();
@@ -199,10 +201,12 @@ public class HashAggBatch extends AbstractRecordBatch<HashAggregate> {
int i;
- for(i = 0; i < numGroupByExprs; i++) {
+ for (i = 0; i < numGroupByExprs; i++) {
NamedExpression ne = popConfig.getGroupByExprs()[i];
final LogicalExpression expr = ExpressionTreeMaterializer.materialize(ne.getExpr(), incoming, collector, context.getFunctionRegistry() );
- if(expr == null) continue;
+ if (expr == null) {
+ continue;
+ }
final MaterializedField outputField = MaterializedField.create(ne.getRef(), expr.getMajorType());
ValueVector vv = TypeHelper.getNewVector(outputField, oContext.getAllocator());
@@ -211,13 +215,17 @@ public class HashAggBatch extends AbstractRecordBatch<HashAggregate> {
groupByOutFieldIds[i] = container.add(vv);
}
- for(i = 0; i < numAggrExprs; i++){
+ for (i = 0; i < numAggrExprs; i++) {
NamedExpression ne = popConfig.getAggrExprs()[i];
final LogicalExpression expr = ExpressionTreeMaterializer.materialize(ne.getExpr(), incoming, collector, context.getFunctionRegistry() );
- if(collector.hasErrors()) throw new SchemaChangeException("Failure while materializing expression. " + collector.toErrorString());
+ if (collector.hasErrors()) {
+ throw new SchemaChangeException("Failure while materializing expression. " + collector.toErrorString());
+ }
- if(expr == null) continue;
+ if (expr == null) {
+ continue;
+ }
final MaterializedField outputField = MaterializedField.create(ne.getRef(), expr.getMajorType());
ValueVector vv = TypeHelper.getNewVector(outputField, oContext.getAllocator());
@@ -248,7 +256,6 @@ public class HashAggBatch extends AbstractRecordBatch<HashAggregate> {
return agg;
}
-
private void setupUpdateAggrValues(ClassGenerator<HashAggregator> cg) {
cg.setMappingSet(UpdateAggrValuesMapping);
@@ -260,8 +267,8 @@ public class HashAggBatch extends AbstractRecordBatch<HashAggregate> {
cg.getBlock(BlockType.EVAL)._return(JExpr.TRUE);
}
- private void setupGetIndex(ClassGenerator<HashAggregator> cg){
- switch(incoming.getSchema().getSelectionVectorMode()){
+ private void setupGetIndex(ClassGenerator<HashAggregator> cg) {
+ switch (incoming.getSchema().getSelectionVectorMode()) {
case FOUR_BYTE: {
JVar var = cg.declareClassField("sv4_", cg.getModel()._ref(SelectionVector4.class));
cg.getBlock("doSetup").assign(var, JExpr.direct("incoming").invoke("getSelectionVector4"));
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java
index b6b887415..d25a95266 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java
@@ -156,7 +156,9 @@ public abstract class HashAggTemplate implements HashAggregator {
boolean status = true;
for (int i = batchOutputCount; i <= maxOccupiedIdx; i++) {
if (outputRecordValues(i, batchOutputCount) ) {
- if (EXTRA_DEBUG_2) logger.debug("Outputting values to output index: {}", batchOutputCount) ;
+ if (EXTRA_DEBUG_2) {
+ logger.debug("Outputting values to output index: {}", batchOutputCount) ;
+ }
batchOutputCount++;
outNumRecordsHolder.value++;
} else {
@@ -270,31 +272,41 @@ public abstract class HashAggTemplate implements HashAggregator {
outside: while(true) {
// loop through existing records, aggregating the values as necessary.
- if (EXTRA_DEBUG_1) logger.debug ("Starting outer loop of doWork()...");
+ if (EXTRA_DEBUG_1) {
+ logger.debug ("Starting outer loop of doWork()...");
+ }
for (; underlyingIndex < incoming.getRecordCount(); incIndex()) {
- if(EXTRA_DEBUG_2) logger.debug("Doing loop with values underlying {}, current {}", underlyingIndex, currentIndex);
+ if(EXTRA_DEBUG_2) {
+ logger.debug("Doing loop with values underlying {}, current {}", underlyingIndex, currentIndex);
+ }
boolean success = checkGroupAndAggrValues(currentIndex);
assert success : "HashAgg couldn't copy values.";
}
- if (EXTRA_DEBUG_1) logger.debug("Processed {} records", underlyingIndex);
+ if (EXTRA_DEBUG_1) {
+ logger.debug("Processed {} records", underlyingIndex);
+ }
- try{
+ try {
- while(true){
+ while (true) {
// Cleanup the previous batch since we are done processing it.
for (VectorWrapper<?> v : incoming) {
v.getValueVector().clear();
}
IterOutcome out = outgoing.next(0, incoming);
- if(EXTRA_DEBUG_1) logger.debug("Received IterOutcome of {}", out);
- switch(out){
+ if (EXTRA_DEBUG_1) {
+ logger.debug("Received IterOutcome of {}", out);
+ }
+ switch (out) {
case NOT_YET:
this.outcome = out;
return AggOutcome.RETURN_OUTCOME;
case OK_NEW_SCHEMA:
- if(EXTRA_DEBUG_1) logger.debug("Received new schema. Batch has {} records.", incoming.getRecordCount());
+ if (EXTRA_DEBUG_1) {
+ logger.debug("Received new schema. Batch has {} records.", incoming.getRecordCount());
+ }
newSchema = true;
this.cleanup();
// TODO: new schema case needs to be handled appropriately
@@ -302,14 +314,16 @@ public abstract class HashAggTemplate implements HashAggregator {
case OK:
resetIndex();
- if(incoming.getRecordCount() == 0){
+ if (incoming.getRecordCount() == 0) {
continue;
} else {
boolean success = checkGroupAndAggrValues(currentIndex);
assert success : "HashAgg couldn't copy values.";
incIndex();
- if(EXTRA_DEBUG_1) logger.debug("Continuing outside loop");
+ if (EXTRA_DEBUG_1) {
+ logger.debug("Continuing outside loop");
+ }
continue outside;
}
@@ -343,8 +357,10 @@ public abstract class HashAggTemplate implements HashAggregator {
// placeholder...
}
}
- } finally{
- if(first) first = !first;
+ } finally {
+ if (first) {
+ first = !first;
+ }
}
}
@@ -373,7 +389,7 @@ public abstract class HashAggTemplate implements HashAggregator {
}
@Override
- public void cleanup(){
+ public void cleanup() {
if (htable != null) {
htable.clear();
htable = null;
@@ -392,28 +408,28 @@ public abstract class HashAggTemplate implements HashAggregator {
}
}
- private final AggOutcome setOkAndReturn(){
- if(first){
+ private final AggOutcome setOkAndReturn() {
+ if (first) {
this.outcome = IterOutcome.OK_NEW_SCHEMA;
- }else{
+ } else {
this.outcome = IterOutcome.OK;
}
- for(VectorWrapper<?> v : outgoing){
+ for (VectorWrapper<?> v : outgoing) {
v.getValueVector().getMutator().setValueCount(outputCount);
}
return AggOutcome.RETURN_OUTCOME;
}
- private final void incIndex(){
+ private final void incIndex() {
underlyingIndex++;
- if(underlyingIndex >= incoming.getRecordCount()){
+ if (underlyingIndex >= incoming.getRecordCount()) {
currentIndex = Integer.MAX_VALUE;
return;
}
currentIndex = getVectorIndex(underlyingIndex);
}
- private final void resetIndex(){
+ private final void resetIndex() {
underlyingIndex = -1;
incIndex();
}
@@ -422,7 +438,9 @@ public abstract class HashAggTemplate implements HashAggregator {
BatchHolder bh = new BatchHolder();
batchHolders.add(bh);
- if (EXTRA_DEBUG_1) logger.debug("HashAggregate: Added new batch; num batches = {}.", batchHolders.size());
+ if (EXTRA_DEBUG_1) {
+ logger.debug("HashAggregate: Added new batch; num batches = {}.", batchHolders.size());
+ }
bh.setup();
}
@@ -465,9 +483,9 @@ public abstract class HashAggTemplate implements HashAggregator {
outputCount += numOutputRecords;
- if(first){
+ if (first) {
this.outcome = IterOutcome.OK_NEW_SCHEMA;
- }else{
+ } else {
this.outcome = IterOutcome.OK;
}
@@ -486,14 +504,14 @@ public abstract class HashAggTemplate implements HashAggregator {
} else {
if (!outputKeysStatus) {
logger.debug("Failed to output keys for current batch index: {} ", outBatchIndex);
- for(VectorWrapper<?> v : outContainer) {
+ for (VectorWrapper<?> v : outContainer) {
logger.debug("At the time of failure, size of valuevector in outContainer = {}.", v.getValueVector().getValueCapacity());
}
context.fail(new Exception("Failed to output keys for current batch !"));
}
if (!outputValuesStatus) {
logger.debug("Failed to output values for current batch index: {} ", outBatchIndex);
- for(VectorWrapper<?> v : outContainer) {
+ for (VectorWrapper<?> v : outContainer) {
logger.debug("At the time of failure, size of valuevector in outContainer = {}.", v.getValueVector().getValueCapacity());
}
context.fail(new Exception("Failed to output values for current batch !"));
@@ -557,7 +575,9 @@ public abstract class HashAggTemplate implements HashAggregator {
if (putStatus == HashTable.PutStatus.KEY_PRESENT) {
- if (EXTRA_DEBUG_2) logger.debug("Group-by key already present in hash table, updating the aggregate values");
+ if (EXTRA_DEBUG_2) {
+ logger.debug("Group-by key already present in hash table, updating the aggregate values");
+ }
// debugging
//if (holder.value == 100018 || holder.value == 100021) {
@@ -566,7 +586,9 @@ public abstract class HashAggTemplate implements HashAggregator {
}
else if (putStatus == HashTable.PutStatus.KEY_ADDED) {
- if (EXTRA_DEBUG_2) logger.debug("Group-by key was added to hash table, inserting new aggregate values") ;
+ if (EXTRA_DEBUG_2) {
+ logger.debug("Group-by key was added to hash table, inserting new aggregate values") ;
+ }
// debugging
// if (holder.value == 100018 || holder.value == 100021) {
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggregator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggregator.java
index 4277f2306..238242bc1 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggregator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggregator.java
@@ -40,7 +40,7 @@ public interface HashAggregator {
public static enum AggOutcome {
RETURN_OUTCOME, CLEANUP_AND_RETURN, UPDATE_AGGREGATOR
- }
+ }
public abstract void setup(HashAggregate hashAggrConfig, HashTableConfig htConfig, FragmentContext context,
OperatorStats stats, BufferAllocator allocator, RecordBatch incoming,
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/InternalBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/InternalBatch.java
index 3e6def128..e6900605f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/InternalBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/InternalBatch.java
@@ -34,8 +34,8 @@ public class InternalBatch implements Iterable<VectorWrapper<?>>{
private final SelectionVector2 sv2;
private final SelectionVector4 sv4;
- public InternalBatch(RecordBatch incoming){
- switch(incoming.getSchema().getSelectionVectorMode()){
+ public InternalBatch(RecordBatch incoming) {
+ switch(incoming.getSchema().getSelectionVectorMode()) {
case FOUR_BYTE:
this.sv4 = incoming.getSelectionVector4().createNewWrapperCurrent();
this.sv2 = null;
@@ -69,13 +69,17 @@ public class InternalBatch implements Iterable<VectorWrapper<?>>{
return container.iterator();
}
- public void clear(){
- if(sv2 != null) sv2.clear();
- if(sv4 != null) sv4.clear();
+ public void clear() {
+ if (sv2 != null) {
+ sv2.clear();
+ }
+ if (sv4 != null) {
+ sv4.clear();
+ }
container.clear();
}
- public VectorWrapper<?> getValueAccessorById(Class<?> clazz, int[] fieldIds){
+ public VectorWrapper<?> getValueAccessorById(Class<?> clazz, int[] fieldIds) {
return container.getValueAccessorById(clazz, fieldIds);
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java
index 820f7229b..ced51798f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java
@@ -67,8 +67,12 @@ public class StreamingAggBatch extends AbstractRecordBatch<StreamingAggregate> {
@Override
public int getRecordCount() {
- if(done) return 0;
- if (aggregator == null) return 0;
+ if (done) {
+ return 0;
+ }
+ if (aggregator == null) {
+ return 0;
+ }
return aggregator.getOutputCount();
}
@@ -88,7 +92,7 @@ public class StreamingAggBatch extends AbstractRecordBatch<StreamingAggregate> {
case STOP:
return outcome;
case OK_NEW_SCHEMA:
- if (!createAggregator()){
+ if (!createAggregator()) {
done = true;
return IterOutcome.STOP;
}
@@ -100,12 +104,14 @@ public class StreamingAggBatch extends AbstractRecordBatch<StreamingAggregate> {
}
}
- while(true){
+ while (true) {
AggOutcome out = aggregator.doWork();
logger.debug("Aggregator response {}, records {}", out, aggregator.getOutputCount());
- switch(out){
+ switch (out) {
case CLEANUP_AND_RETURN:
- if (!first) container.zeroVectors();
+ if (!first) {
+ container.zeroVectors();
+ }
done = true;
// fall through
case RETURN_OUTCOME:
@@ -122,7 +128,7 @@ public class StreamingAggBatch extends AbstractRecordBatch<StreamingAggregate> {
case UPDATE_AGGREGATOR:
first = false;
aggregator = null;
- if(!createAggregator()){
+ if (!createAggregator()) {
return IterOutcome.STOP;
}
continue;
@@ -142,23 +148,20 @@ public class StreamingAggBatch extends AbstractRecordBatch<StreamingAggregate> {
*/
private boolean createAggregator() {
logger.debug("Creating new aggregator.");
- try{
+ try {
stats.startSetup();
this.aggregator = createAggregatorInternal();
return true;
- }catch(SchemaChangeException | ClassTransformationException | IOException ex){
+ } catch (SchemaChangeException | ClassTransformationException | IOException ex) {
context.fail(ex);
container.clear();
incoming.kill(false);
return false;
- }finally{
+ } finally {
stats.stopSetup();
}
}
-
-
-
private StreamingAggregator createAggregatorInternal() throws SchemaChangeException, ClassTransformationException, IOException{
ClassGenerator<StreamingAggregator> cg = CodeGenerator.getRoot(StreamingAggTemplate.TEMPLATE_DEFINITION, context.getFunctionRegistry());
container.clear();
@@ -169,20 +172,24 @@ public class StreamingAggBatch extends AbstractRecordBatch<StreamingAggregate> {
ErrorCollector collector = new ErrorCollectorImpl();
- for(int i =0; i < keyExprs.length; i++){
+ for (int i =0; i < keyExprs.length; i++) {
NamedExpression ne = popConfig.getKeys()[i];
final LogicalExpression expr = ExpressionTreeMaterializer.materialize(ne.getExpr(), incoming, collector,context.getFunctionRegistry() );
- if(expr == null) continue;
+ if (expr == null) {
+ continue;
+ }
keyExprs[i] = expr;
final MaterializedField outputField = MaterializedField.create(ne.getRef(), expr.getMajorType());
ValueVector vector = TypeHelper.getNewVector(outputField, oContext.getAllocator());
keyOutputIds[i] = container.add(vector);
}
- for(int i =0; i < valueExprs.length; i++){
+ for (int i =0; i < valueExprs.length; i++) {
NamedExpression ne = popConfig.getExprs()[i];
final LogicalExpression expr = ExpressionTreeMaterializer.materialize(ne.getExpr(), incoming, collector, context.getFunctionRegistry());
- if(expr == null) continue;
+ if (expr == null) {
+ continue;
+ }
final MaterializedField outputField = MaterializedField.create(ne.getRef(), expr.getMajorType());
ValueVector vector = TypeHelper.getNewVector(outputField, oContext.getAllocator());
@@ -190,7 +197,9 @@ public class StreamingAggBatch extends AbstractRecordBatch<StreamingAggregate> {
valueExprs[i] = new ValueVectorWriteExpression(id, expr, true);
}
- if(collector.hasErrors()) throw new SchemaChangeException("Failure while materializing expression. " + collector.toErrorString());
+ if (collector.hasErrors()) {
+ throw new SchemaChangeException("Failure while materializing expression. " + collector.toErrorString());
+ }
setupIsSame(cg, keyExprs);
setupIsSameApart(cg, keyExprs);
@@ -207,15 +216,13 @@ public class StreamingAggBatch extends AbstractRecordBatch<StreamingAggregate> {
return agg;
}
-
-
private final GeneratorMapping IS_SAME = GeneratorMapping.create("setupInterior", "isSame", null, null);
private final MappingSet IS_SAME_I1 = new MappingSet("index1", null, IS_SAME, IS_SAME);
private final MappingSet IS_SAME_I2 = new MappingSet("index2", null, IS_SAME, IS_SAME);
- private void setupIsSame(ClassGenerator<StreamingAggregator> cg, LogicalExpression[] keyExprs){
+ private void setupIsSame(ClassGenerator<StreamingAggregator> cg, LogicalExpression[] keyExprs) {
cg.setMappingSet(IS_SAME_I1);
- for(LogicalExpression expr : keyExprs){
+ for (LogicalExpression expr : keyExprs) {
// first, we rewrite the evaluation stack for each side of the comparison.
cg.setMappingSet(IS_SAME_I1);
HoldingContainer first = cg.addExpr(expr, false);
@@ -234,9 +241,9 @@ public class StreamingAggBatch extends AbstractRecordBatch<StreamingAggregate> {
private final MappingSet ISA_B1 = new MappingSet("b1Index", null, "b1", null, IS_SAME_PREV_INTERNAL_BATCH_READ, IS_SAME_PREV_INTERNAL_BATCH_READ);
private final MappingSet ISA_B2 = new MappingSet("b2Index", null, "incoming", null, IS_SAME_PREV, IS_SAME_PREV);
- private void setupIsSameApart(ClassGenerator<StreamingAggregator> cg, LogicalExpression[] keyExprs){
+ private void setupIsSameApart(ClassGenerator<StreamingAggregator> cg, LogicalExpression[] keyExprs) {
cg.setMappingSet(ISA_B1);
- for(LogicalExpression expr : keyExprs){
+ for (LogicalExpression expr : keyExprs) {
// first, we rewrite the evaluation stack for each side of the comparison.
cg.setMappingSet(ISA_B1);
HoldingContainer first = cg.addExpr(expr, false);
@@ -254,9 +261,9 @@ public class StreamingAggBatch extends AbstractRecordBatch<StreamingAggregate> {
private final GeneratorMapping EVAL_OUTSIDE = GeneratorMapping.create("setupInterior", "outputRecordValues", "resetValues", "cleanup");
private final MappingSet EVAL = new MappingSet("index", "outIndex", "incoming", "outgoing", EVAL_INSIDE, EVAL_OUTSIDE, EVAL_INSIDE);
- private void addRecordValues(ClassGenerator<StreamingAggregator> cg, LogicalExpression[] valueExprs){
+ private void addRecordValues(ClassGenerator<StreamingAggregator> cg, LogicalExpression[] valueExprs) {
cg.setMappingSet(EVAL);
- for(LogicalExpression ex : valueExprs){
+ for (LogicalExpression ex : valueExprs) {
HoldingContainer hc = cg.addExpr(ex);
cg.getBlock(BlockType.EVAL)._if(hc.getValue().eq(JExpr.lit(0)))._then()._return(JExpr.FALSE);
}
@@ -265,9 +272,9 @@ public class StreamingAggBatch extends AbstractRecordBatch<StreamingAggregate> {
private final MappingSet RECORD_KEYS = new MappingSet(GeneratorMapping.create("setupInterior", "outputRecordKeys", null, null));
- private void outputRecordKeys(ClassGenerator<StreamingAggregator> cg, TypedFieldId[] keyOutputIds, LogicalExpression[] keyExprs){
+ private void outputRecordKeys(ClassGenerator<StreamingAggregator> cg, TypedFieldId[] keyOutputIds, LogicalExpression[] keyExprs) {
cg.setMappingSet(RECORD_KEYS);
- for(int i =0; i < keyExprs.length; i++){
+ for (int i =0; i < keyExprs.length; i++) {
HoldingContainer hc = cg.addExpr(new ValueVectorWriteExpression(keyOutputIds[i], keyExprs[i], true));
cg.getBlock(BlockType.EVAL)._if(hc.getValue().eq(JExpr.lit(0)))._then()._return(JExpr.FALSE);
}
@@ -280,10 +287,10 @@ public class StreamingAggBatch extends AbstractRecordBatch<StreamingAggregate> {
private final GeneratorMapping PREVIOUS_KEYS = GeneratorMapping.create("outputRecordKeysPrev", "outputRecordKeysPrev", null, null);
private final MappingSet RECORD_KEYS_PREV = new MappingSet("previousIndex", "outIndex", "previous", null, PREVIOUS_KEYS, PREVIOUS_KEYS);
- private void outputRecordKeysPrev(ClassGenerator<StreamingAggregator> cg, TypedFieldId[] keyOutputIds, LogicalExpression[] keyExprs){
+ private void outputRecordKeysPrev(ClassGenerator<StreamingAggregator> cg, TypedFieldId[] keyOutputIds, LogicalExpression[] keyExprs) {
cg.setMappingSet(RECORD_KEYS_PREV);
- for(int i =0; i < keyExprs.length; i++){
+ for (int i =0; i < keyExprs.length; i++) {
// IMPORTANT: there is an implicit assertion here that the TypedFieldIds for the previous batch and the current batch are the same. This is possible because InternalBatch guarantees this.
logger.debug("Writing out expr {}", keyExprs[i]);
cg.rotateBlock();
@@ -297,8 +304,8 @@ public class StreamingAggBatch extends AbstractRecordBatch<StreamingAggregate> {
cg.getBlock(BlockType.EVAL)._return(JExpr.TRUE);
}
- private void getIndex(ClassGenerator<StreamingAggregator> g){
- switch(incoming.getSchema().getSelectionVectorMode()){
+ private void getIndex(ClassGenerator<StreamingAggregator> g) {
+ switch (incoming.getSchema().getSelectionVectorMode()) {
case FOUR_BYTE: {
JVar var = g.declareClassField("sv4_", g.getModel()._ref(SelectionVector4.class));
g.getBlock("setupInterior").assign(var, JExpr.direct("incoming").invoke("getSelectionVector4"));
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggTemplate.java
index 53ac1ed4f..c2a5715cf 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggTemplate.java
@@ -60,7 +60,7 @@ public abstract class StreamingAggTemplate implements StreamingAggregator {
private void allocateOutgoing() {
- for(VectorWrapper<?> w : outgoing){
+ for (VectorWrapper<?> w : outgoing) {
w.getValueVector().allocateNew();
}
}
@@ -75,7 +75,7 @@ public abstract class StreamingAggTemplate implements StreamingAggregator {
return outputCount;
}
- private AggOutcome tooBigFailure(){
+ private AggOutcome tooBigFailure() {
context.fail(new Exception(TOO_BIG_ERROR));
this.outcome = IterOutcome.STOP;
return AggOutcome.CLEANUP_AND_RETURN;
@@ -87,11 +87,11 @@ public abstract class StreamingAggTemplate implements StreamingAggregator {
outcome = IterOutcome.NONE;
return AggOutcome.CLEANUP_AND_RETURN;
}
- try{ // outside loop to ensure that first is set to false after the first run.
+ try { // outside loop to ensure that first is set to false after the first run.
outputCount = 0;
// if we're in the first state, allocate outgoing.
- if(first){
+ if (first) {
allocateOutgoing();
}
@@ -119,8 +119,10 @@ public abstract class StreamingAggTemplate implements StreamingAggregator {
}
// pick up a remainder batch if we have one.
- if(remainderBatch != null){
- if (!outputToBatch( previousIndex )) return tooBigFailure();
+ if (remainderBatch != null) {
+ if (!outputToBatch( previousIndex )) {
+ return tooBigFailure();
+ }
remainderBatch.clear();
remainderBatch = null;
return setOkAndReturn();
@@ -131,38 +133,56 @@ public abstract class StreamingAggTemplate implements StreamingAggregator {
if (pendingOutput) {
allocateOutgoing();
pendingOutput = false;
- if(EXTRA_DEBUG) logger.debug("Attempting to output remainder.");
- if (!outputToBatch( previousIndex)) return tooBigFailure();
+ if (EXTRA_DEBUG) {
+ logger.debug("Attempting to output remainder.");
+ }
+ if (!outputToBatch( previousIndex)) {
+ return tooBigFailure();
+ }
}
- if(newSchema){
+ if (newSchema) {
return AggOutcome.UPDATE_AGGREGATOR;
}
- if(lastOutcome != null){
+ if (lastOutcome != null) {
outcome = lastOutcome;
return AggOutcome.CLEANUP_AND_RETURN;
}
- outside: while(true){
+ outside: while(true) {
// loop through existing records, adding as necessary.
for (; underlyingIndex < incoming.getRecordCount(); incIndex()) {
- if(EXTRA_DEBUG) logger.debug("Doing loop with values underlying {}, current {}", underlyingIndex, currentIndex);
+ if (EXTRA_DEBUG) {
+ logger.debug("Doing loop with values underlying {}, current {}", underlyingIndex, currentIndex);
+ }
if (previousIndex == -1) {
- if (EXTRA_DEBUG) logger.debug("Adding the initial row's keys and values.");
+ if (EXTRA_DEBUG) {
+ logger.debug("Adding the initial row's keys and values.");
+ }
addRecordInc(currentIndex);
}
else if (isSame( previousIndex, currentIndex )) {
- if(EXTRA_DEBUG) logger.debug("Values were found the same, adding.");
+ if (EXTRA_DEBUG) {
+ logger.debug("Values were found the same, adding.");
+ }
addRecordInc(currentIndex);
} else {
- if(EXTRA_DEBUG) logger.debug("Values were different, outputting previous batch.");
+ if (EXTRA_DEBUG) {
+ logger.debug("Values were different, outputting previous batch.");
+ }
if (outputToBatch(previousIndex)) {
- if(EXTRA_DEBUG) logger.debug("Output successful.");
+ if (EXTRA_DEBUG) {
+ logger.debug("Output successful.");
+ }
addRecordInc(currentIndex);
} else {
- if(EXTRA_DEBUG) logger.debug("Output failed.");
- if(outputCount == 0) return tooBigFailure();
+ if (EXTRA_DEBUG) {
+ logger.debug("Output failed.");
+ }
+ if (outputCount == 0) {
+ return tooBigFailure();
+ }
// mark the pending output but move forward for the next cycle.
pendingOutput = true;
@@ -178,23 +198,29 @@ public abstract class StreamingAggTemplate implements StreamingAggregator {
InternalBatch previous = null;
- try{
- while(true){
+ try {
+ while (true) {
if (previous != null) {
previous.clear();
}
previous = new InternalBatch(incoming);
IterOutcome out = outgoing.next(0, incoming);
- if(EXTRA_DEBUG) logger.debug("Received IterOutcome of {}", out);
- switch(out){
+ if (EXTRA_DEBUG) {
+ logger.debug("Received IterOutcome of {}", out);
+ }
+ switch (out) {
case NONE:
done = true;
lastOutcome = out;
if (first && addedRecordCount == 0) {
return setOkAndReturn();
- } else if(addedRecordCount > 0){
- if( !outputToBatchPrev( previous, previousIndex, outputCount) ) remainderBatch = previous;
- if(EXTRA_DEBUG) logger.debug("Received no more batches, returning.");
+ } else if(addedRecordCount > 0) {
+ if ( !outputToBatchPrev( previous, previousIndex, outputCount) ) {
+ remainderBatch = previous;
+ }
+ if (EXTRA_DEBUG) {
+ logger.debug("Received no more batches, returning.");
+ }
return setOkAndReturn();
}else{
if (first && out == IterOutcome.OK) {
@@ -204,17 +230,21 @@ public abstract class StreamingAggTemplate implements StreamingAggregator {
return AggOutcome.CLEANUP_AND_RETURN;
}
-
-
case NOT_YET:
this.outcome = out;
return AggOutcome.RETURN_OUTCOME;
case OK_NEW_SCHEMA:
- if(EXTRA_DEBUG) logger.debug("Received new schema. Batch has {} records.", incoming.getRecordCount());
- if(addedRecordCount > 0){
- if( !outputToBatchPrev( previous, previousIndex, outputCount) ) remainderBatch = previous;
- if(EXTRA_DEBUG) logger.debug("Wrote out end of previous batch, returning.");
+ if (EXTRA_DEBUG) {
+ logger.debug("Received new schema. Batch has {} records.", incoming.getRecordCount());
+ }
+ if (addedRecordCount > 0) {
+ if ( !outputToBatchPrev( previous, previousIndex, outputCount) ) {
+ remainderBatch = previous;
+ }
+ if (EXTRA_DEBUG) {
+ logger.debug("Wrote out end of previous batch, returning.");
+ }
newSchema = true;
return setOkAndReturn();
}
@@ -222,21 +252,27 @@ public abstract class StreamingAggTemplate implements StreamingAggregator {
return AggOutcome.UPDATE_AGGREGATOR;
case OK:
resetIndex();
- if(incoming.getRecordCount() == 0){
+ if (incoming.getRecordCount() == 0) {
continue;
- }else{
- if(previousIndex != -1 && isSamePrev(previousIndex , previous, currentIndex)){
- if(EXTRA_DEBUG) logger.debug("New value was same as last value of previous batch, adding.");
+ } else {
+ if (previousIndex != -1 && isSamePrev(previousIndex , previous, currentIndex)) {
+ if (EXTRA_DEBUG) {
+ logger.debug("New value was same as last value of previous batch, adding.");
+ }
addRecordInc(currentIndex);
previousIndex = currentIndex;
incIndex();
- if(EXTRA_DEBUG) logger.debug("Continuing outside");
+ if (EXTRA_DEBUG) {
+ logger.debug("Continuing outside");
+ }
continue outside;
- }else{ // not the same
- if(EXTRA_DEBUG) logger.debug("This is not the same as the previous, add record and continue outside.");
+ } else { // not the same
+ if (EXTRA_DEBUG) {
+ logger.debug("This is not the same as the previous, add record and continue outside.");
+ }
previousIndex = currentIndex;
- if(addedRecordCount > 0){
- if( !outputToBatchPrev( previous, previousIndex, outputCount) ){
+ if (addedRecordCount > 0) {
+ if ( !outputToBatchPrev( previous, previousIndex, outputCount) ) {
remainderBatch = previous;
return setOkAndReturn();
}
@@ -251,72 +287,78 @@ public abstract class StreamingAggTemplate implements StreamingAggregator {
return AggOutcome.CLEANUP_AND_RETURN;
}
-
}
- }finally{
+ } finally {
// make sure to clear previous if we haven't saved it.
- if(remainderBatch == null && previous != null){
+ if (remainderBatch == null && previous != null) {
previous.clear();
}
}
}
- }finally{
- if(first) first = !first;
+ } finally {
+ if (first) {
+ first = !first;
+ }
}
}
-
- private final void incIndex(){
+ private final void incIndex() {
underlyingIndex++;
- if(underlyingIndex >= incoming.getRecordCount()){
+ if (underlyingIndex >= incoming.getRecordCount()) {
currentIndex = Integer.MAX_VALUE;
return;
}
currentIndex = getVectorIndex(underlyingIndex);
}
- private final void resetIndex(){
+ private final void resetIndex() {
underlyingIndex = -1;
incIndex();
}
- private final AggOutcome setOkAndReturn(){
- if(first){
+ private final AggOutcome setOkAndReturn() {
+ if (first) {
this.outcome = IterOutcome.OK_NEW_SCHEMA;
- }else{
+ } else {
this.outcome = IterOutcome.OK;
}
- for(VectorWrapper<?> v : outgoing){
+ for (VectorWrapper<?> v : outgoing) {
v.getValueVector().getMutator().setValueCount(outputCount);
}
return AggOutcome.RETURN_OUTCOME;
}
- private final boolean outputToBatch(int inIndex){
+ private final boolean outputToBatch(int inIndex) {
- if(!outputRecordKeys(inIndex, outputCount)){
- if(EXTRA_DEBUG) logger.debug("Failure while outputting keys {}", outputCount);
+ if (!outputRecordKeys(inIndex, outputCount)) {
+ if(EXTRA_DEBUG) {
+ logger.debug("Failure while outputting keys {}", outputCount);
+ }
return false;
}
- if(!outputRecordValues(outputCount)){
- if(EXTRA_DEBUG) logger.debug("Failure while outputting values {}", outputCount);
+ if (!outputRecordValues(outputCount)) {
+ if (EXTRA_DEBUG) {
+ logger.debug("Failure while outputting values {}", outputCount);
+ }
return false;
}
- if(EXTRA_DEBUG) logger.debug("{} values output successfully", outputCount);
+ if (EXTRA_DEBUG) {
+ logger.debug("{} values output successfully", outputCount);
+ }
resetValues();
outputCount++;
addedRecordCount = 0;
return true;
}
- private final boolean outputToBatchPrev(InternalBatch b1, int inIndex, int outIndex){
+ private final boolean outputToBatchPrev(InternalBatch b1, int inIndex, int outIndex) {
boolean success = outputRecordKeysPrev(b1, inIndex, outIndex) //
&& outputRecordValues(outIndex) //
&& resetValues();
- if(success){
+ if (success) {
resetValues();
outputCount++;
addedRecordCount = 0;
@@ -325,17 +367,18 @@ public abstract class StreamingAggTemplate implements StreamingAggregator {
return success;
}
- private void addRecordInc(int index){
+ private void addRecordInc(int index) {
addRecord(index);
this.addedRecordCount++;
}
@Override
- public void cleanup(){
- if(remainderBatch != null) remainderBatch.clear();
+ public void cleanup() {
+ if (remainderBatch != null) {
+ remainderBatch.clear();
+ }
}
-
public abstract void setupInterior(@Named("incoming") RecordBatch incoming, @Named("outgoing") RecordBatch outgoing) throws SchemaChangeException;
public abstract boolean isSame(@Named("index1") int index1, @Named("index2") int index2);
public abstract boolean isSamePrev(@Named("b1Index") int b1Index, @Named("b1") InternalBatch b1, @Named("b2Index") int b2Index);
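The pattern above recurs throughout this patch: brace-less single-statement guards are expanded into fully braced blocks. As a minimal illustration of the convention the new check-style rules enforce (a composite sketch, not quoted from any one file):

    // Flagged by the new rules: a conditional body without braces.
    if (EXTRA_DEBUG) logger.debug("current state = {}", state);

    // Accepted form: every conditional body is braced, one statement per line.
    if (EXTRA_DEBUG) {
      logger.debug("current state = {}", state);
    }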
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggregator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggregator.java
index 8f5f29be1..96da00b46 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggregator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggregator.java
@@ -28,8 +28,8 @@ public interface StreamingAggregator {
public static TemplateClassDefinition<StreamingAggregator> TEMPLATE_DEFINITION = new TemplateClassDefinition<StreamingAggregator>(StreamingAggregator.class, StreamingAggTemplate.class);
public static enum AggOutcome {
- RETURN_OUTCOME, CLEANUP_AND_RETURN, UPDATE_AGGREGATOR;
- }
+ RETURN_OUTCOME, CLEANUP_AND_RETURN, UPDATE_AGGREGATOR;
+ }
public abstract void setup(FragmentContext context, RecordBatch incoming, StreamingAggBatch outgoing) throws SchemaChangeException;
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/ChainedHashTable.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/ChainedHashTable.java
index 195d24900..f77407eac 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/ChainedHashTable.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/ChainedHashTable.java
@@ -53,23 +53,23 @@ public class ChainedHashTable {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ChainedHashTable.class);
private static final GeneratorMapping KEY_MATCH_BUILD =
- GeneratorMapping.create("setupInterior" /* setup method */, "isKeyMatchInternalBuild" /* eval method */,
+ GeneratorMapping.create("setupInterior" /* setup method */, "isKeyMatchInternalBuild" /* eval method */,
null /* reset */, null /* cleanup */);
private static final GeneratorMapping KEY_MATCH_PROBE =
- GeneratorMapping.create("setupInterior" /* setup method */, "isKeyMatchInternalProbe" /* eval method */,
+ GeneratorMapping.create("setupInterior" /* setup method */, "isKeyMatchInternalProbe" /* eval method */,
null /* reset */, null /* cleanup */);
private static final GeneratorMapping GET_HASH_BUILD =
- GeneratorMapping.create("doSetup" /* setup method */, "getHashBuild" /* eval method */,
+ GeneratorMapping.create("doSetup" /* setup method */, "getHashBuild" /* eval method */,
null /* reset */, null /* cleanup */);
private static final GeneratorMapping GET_HASH_PROBE =
- GeneratorMapping.create("doSetup" /* setup method */, "getHashProbe" /* eval method */,
+ GeneratorMapping.create("doSetup" /* setup method */, "getHashProbe" /* eval method */,
null /* reset */, null /* cleanup */);
private static final GeneratorMapping SET_VALUE =
- GeneratorMapping.create("setupInterior" /* setup method */, "setValue" /* eval method */,
+ GeneratorMapping.create("setupInterior" /* setup method */, "setValue" /* eval method */,
null /* reset */, null /* cleanup */);
private static final GeneratorMapping OUTPUT_KEYS =
@@ -138,8 +138,12 @@ public class ChainedHashTable {
int i = 0;
for (NamedExpression ne : htConfig.getKeyExprsBuild()) {
final LogicalExpression expr = ExpressionTreeMaterializer.materialize(ne.getExpr(), incomingBuild, collector, context.getFunctionRegistry());
- if(collector.hasErrors()) throw new SchemaChangeException("Failure while materializing expression. " + collector.toErrorString());
- if (expr == null) continue;
+ if (collector.hasErrors()) {
+ throw new SchemaChangeException("Failure while materializing expression. " + collector.toErrorString());
+ }
+ if (expr == null) {
+ continue;
+ }
keyExprsBuild[i] = expr;
final MaterializedField outputField = MaterializedField.create(ne.getRef(), expr.getMajorType());
@@ -155,8 +159,12 @@ public class ChainedHashTable {
i = 0;
for (NamedExpression ne : htConfig.getKeyExprsProbe()) {
final LogicalExpression expr = ExpressionTreeMaterializer.materialize(ne.getExpr(), incomingProbe, collector, context.getFunctionRegistry());
- if(collector.hasErrors()) throw new SchemaChangeException("Failure while materializing expression. " + collector.toErrorString());
- if (expr == null) continue;
+ if (collector.hasErrors()) {
+ throw new SchemaChangeException("Failure while materializing expression. " + collector.toErrorString());
+ }
+ if (expr == null) {
+ continue;
+ }
keyExprsProbe[i] = expr;
i++;
}
@@ -293,4 +301,3 @@ public class ChainedHashTable {
}
}
}
-
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableTemplate.java
index b03880cce..6024523e9 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableTemplate.java
@@ -164,10 +164,11 @@ public abstract class HashTableTemplate implements HashTable {
assert (currentIdxWithinBatch < HashTable.BATCH_SIZE);
assert (incomingRowIdx < HashTable.BATCH_SIZE);
- if (isProbe)
+ if (isProbe) {
match = isKeyMatchInternalProbe(incomingRowIdx, currentIdxWithinBatch);
- else
+ } else {
match = isKeyMatchInternalBuild(incomingRowIdx, currentIdxWithinBatch);
+ }
if (! match) {
currentIdxHolder.value = links.getAccessor().get(currentIdxWithinBatch);
@@ -196,7 +197,9 @@ public abstract class HashTableTemplate implements HashTable {
maxOccupiedIdx = Math.max(maxOccupiedIdx, currentIdxWithinBatch);
- if (EXTRA_DEBUG) logger.debug("BatchHolder: inserted key at incomingRowIdx = {}, currentIdx = {}, hash value = {}.", incomingRowIdx, currentIdx, hashValue);
+ if (EXTRA_DEBUG) {
+ logger.debug("BatchHolder: inserted key at incomingRowIdx = {}, currentIdx = {}, hash value = {}.", incomingRowIdx, currentIdx, hashValue);
+ }
return true;
}
@@ -225,7 +228,9 @@ public abstract class HashTableTemplate implements HashTable {
newLinks.getMutator().setSafe(entryIdxWithinBatch, EMPTY_SLOT);
newHashValues.getMutator().setSafe(entryIdxWithinBatch, hash);
- if (EXTRA_DEBUG) logger.debug("New bucket was empty. bucketIdx = {}, newStartIndices[ {} ] = {}, newLinks[ {} ] = {}, hash value = {}.", bucketIdx, bucketIdx, newStartIndices.getAccessor().get(bucketIdx), entryIdxWithinBatch, newLinks.getAccessor().get(entryIdxWithinBatch), newHashValues.getAccessor().get(entryIdxWithinBatch));
+ if (EXTRA_DEBUG) {
+ logger.debug("New bucket was empty. bucketIdx = {}, newStartIndices[ {} ] = {}, newLinks[ {} ] = {}, hash value = {}.", bucketIdx, bucketIdx, newStartIndices.getAccessor().get(bucketIdx), entryIdxWithinBatch, newLinks.getAccessor().get(entryIdxWithinBatch), newHashValues.getAccessor().get(entryIdxWithinBatch));
+ }
} else {
// follow the new table's hash chain until we encounter empty slot. Note that the hash chain could
@@ -245,7 +250,9 @@ public abstract class HashTableTemplate implements HashTable {
newLinks.getMutator().setSafe(entryIdxWithinBatch, EMPTY_SLOT);
newHashValues.getMutator().setSafe(entryIdxWithinBatch, hash);
- if (EXTRA_DEBUG) logger.debug("Followed hash chain in new bucket. bucketIdx = {}, newLinks[ {} ] = {}, newLinks[ {} ] = {}, hash value = {}.", bucketIdx, idxWithinBatch, newLinks.getAccessor().get(idxWithinBatch), entryIdxWithinBatch, newLinks.getAccessor().get(entryIdxWithinBatch), newHashValues.getAccessor().get(entryIdxWithinBatch));
+ if (EXTRA_DEBUG) {
+ logger.debug("Followed hash chain in new bucket. bucketIdx = {}, newLinks[ {} ] = {}, newLinks[ {} ] = {}, hash value = {}.", bucketIdx, idxWithinBatch, newLinks.getAccessor().get(idxWithinBatch), entryIdxWithinBatch, newLinks.getAccessor().get(entryIdxWithinBatch), newHashValues.getAccessor().get(entryIdxWithinBatch));
+ }
break;
} else if (bh != this && bh.links.getAccessor().get(idxWithinBatch) == EMPTY_SLOT) {
@@ -253,7 +260,9 @@ public abstract class HashTableTemplate implements HashTable {
newLinks.getMutator().setSafe(entryIdxWithinBatch, EMPTY_SLOT); // update the newLink entry in this batch to mark end of the hash chain
newHashValues.getMutator().setSafe(entryIdxWithinBatch, hash);
- if (EXTRA_DEBUG) logger.debug("Followed hash chain in new bucket. bucketIdx = {}, newLinks[ {} ] = {}, newLinks[ {} ] = {}, hash value = {}.", bucketIdx, idxWithinBatch, newLinks.getAccessor().get(idxWithinBatch), entryIdxWithinBatch, newLinks.getAccessor().get(entryIdxWithinBatch), newHashValues.getAccessor().get(entryIdxWithinBatch));
+ if (EXTRA_DEBUG) {
+ logger.debug("Followed hash chain in new bucket. bucketIdx = {}, newLinks[ {} ] = {}, newLinks[ {} ] = {}, hash value = {}.", bucketIdx, idxWithinBatch, newLinks.getAccessor().get(idxWithinBatch), entryIdxWithinBatch, newLinks.getAccessor().get(entryIdxWithinBatch), newHashValues.getAccessor().get(entryIdxWithinBatch));
+ }
break;
}
@@ -381,11 +390,19 @@ public abstract class HashTableTemplate implements HashTable {
float loadf = htConfig.getLoadFactor();
int initialCap = htConfig.getInitialCapacity();
- if (loadf <= 0 || Float.isNaN(loadf)) throw new IllegalArgumentException("Load factor must be a valid number greater than 0");
- if (initialCap <= 0) throw new IllegalArgumentException("The initial capacity must be greater than 0");
- if (initialCap > MAXIMUM_CAPACITY) throw new IllegalArgumentException("The initial capacity must be less than maximum capacity allowed");
+ if (loadf <= 0 || Float.isNaN(loadf)) {
+ throw new IllegalArgumentException("Load factor must be a valid number greater than 0");
+ }
+ if (initialCap <= 0) {
+ throw new IllegalArgumentException("The initial capacity must be greater than 0");
+ }
+ if (initialCap > MAXIMUM_CAPACITY) {
+ throw new IllegalArgumentException("The initial capacity must be less than maximum capacity allowed");
+ }
- if (htConfig.getKeyExprsBuild() == null || htConfig.getKeyExprsBuild().length == 0) throw new IllegalArgumentException("Hash table must have at least 1 key expression");
+ if (htConfig.getKeyExprsBuild() == null || htConfig.getKeyExprsBuild().length == 0) {
+ throw new IllegalArgumentException("Hash table must have at least 1 key expression");
+ }
this.htConfig = htConfig;
this.context = context;
@@ -397,8 +414,9 @@ public abstract class HashTableTemplate implements HashTable {
// round up the initial capacity to nearest highest power of 2
tableSize = roundUpToPowerOf2(initialCap);
- if (tableSize > MAXIMUM_CAPACITY)
+ if (tableSize > MAXIMUM_CAPACITY) {
tableSize = MAXIMUM_CAPACITY;
+ }
threshold = (int) Math.ceil(tableSize * loadf);
@@ -500,7 +518,9 @@ public abstract class HashTableTemplate implements HashTable {
currentIdx = freeIndex++;
addBatchIfNeeded(currentIdx);
- if (EXTRA_DEBUG) logger.debug("Empty bucket index = {}. incomingRowIdx = {}; inserting new entry at currentIdx = {}.", i, incomingRowIdx, currentIdx);
+ if (EXTRA_DEBUG) {
+ logger.debug("Empty bucket index = {}. incomingRowIdx = {}; inserting new entry at currentIdx = {}.", i, incomingRowIdx, currentIdx);
+ }
if (insertEntry(incomingRowIdx, currentIdx, hash, lastEntryBatch, lastEntryIdxWithinBatch)) {
// update the start index array
@@ -543,14 +563,16 @@ public abstract class HashTableTemplate implements HashTable {
currentIdx = freeIndex++;
addBatchIfNeeded(currentIdx);
- if (EXTRA_DEBUG) logger.debug("No match was found for incomingRowIdx = {}; inserting new entry at currentIdx = {}.", incomingRowIdx, currentIdx);
+ if (EXTRA_DEBUG) {
+ logger.debug("No match was found for incomingRowIdx = {}; inserting new entry at currentIdx = {}.", incomingRowIdx, currentIdx);
+ }
if (insertEntry(incomingRowIdx, currentIdx, hash, lastEntryBatch, lastEntryIdxWithinBatch)) {
htIdxHolder.value = currentIdx;
return PutStatus.KEY_ADDED;
- }
- else
+ } else {
return PutStatus.PUT_FAILED;
+ }
}
return found ? PutStatus.KEY_PRESENT : PutStatus.KEY_ADDED ;
@@ -618,7 +640,9 @@ public abstract class HashTableTemplate implements HashTable {
if (currentIdx >= totalBatchSize) {
BatchHolder bh = addBatchHolder();
- if (EXTRA_DEBUG) logger.debug("HashTable: Added new batch. Num batches = {}.", batchHolders.size());
+ if (EXTRA_DEBUG) {
+ logger.debug("HashTable: Added new batch. Num batches = {}.", batchHolders.size());
+ }
return bh;
}
else {
@@ -638,12 +662,15 @@ public abstract class HashTableTemplate implements HashTable {
// in the new table.. the metadata consists of the startIndices, links and hashValues.
// Note that the keys stored in the BatchHolders are not moved around.
private void resizeAndRehashIfNeeded() {
- if (numEntries < threshold)
+ if (numEntries < threshold) {
return;
+ }
long t0 = System.currentTimeMillis();
- if (EXTRA_DEBUG) logger.debug("Hash table numEntries = {}, threshold = {}; resizing the table...", numEntries, threshold);
+ if (EXTRA_DEBUG) {
+ logger.debug("Hash table numEntries = {}, threshold = {}; resizing the table...", numEntries, threshold);
+ }
// If the table size is already MAXIMUM_CAPACITY, don't resize
// the table, but set the threshold to Integer.MAX_VALUE such that
@@ -656,8 +683,9 @@ public abstract class HashTableTemplate implements HashTable {
int newSize = 2 * tableSize;
tableSize = roundUpToPowerOf2(newSize);
- if (tableSize > MAXIMUM_CAPACITY)
+ if (tableSize > MAXIMUM_CAPACITY) {
tableSize = MAXIMUM_CAPACITY;
+ }
// set the new threshold based on the new table size and load factor
threshold = (int) Math.ceil(tableSize * htConfig.getLoadFactor());
@@ -717,5 +745,3 @@ public abstract class HashTableTemplate implements HashTable {
protected abstract int getHashProbe(@Named("incomingRowIdx") int incomingRowIdx) ;
}
-
-
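The sizing hunks above brace the capacity checks without changing the arithmetic. A rough sketch of that arithmetic, with the helper body assumed (the patch only shows its call sites):

    // Round the configured capacity up to the nearest power of two, cap it at
    // MAXIMUM_CAPACITY, and derive the resize threshold from the load factor.
    static int roundUpToPowerOf2(int n) {
      int highest = Integer.highestOneBit(n);
      return (highest == n) ? n : highest << 1;
    }

    int tableSize = Math.min(roundUpToPowerOf2(initialCap), MAXIMUM_CAPACITY);
    int threshold = (int) Math.ceil(tableSize * loadf);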
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterRecordBatch.java
index bf00194aa..f1fcce0d6 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterRecordBatch.java
@@ -79,7 +79,7 @@ public class FilterRecordBatch extends AbstractSingleRecordBatch<Filter>{
protected void doWork() {
int recordCount = incoming.getRecordCount();
filter.filterBatch(recordCount);
-// for(VectorWrapper<?> v : container){
+// for (VectorWrapper<?> v : container) {
// ValueVector.Mutator m = v.getValueVector().getMutator();
// m.setValueCount(recordCount);
// }
@@ -88,8 +88,12 @@ public class FilterRecordBatch extends AbstractSingleRecordBatch<Filter>{
@Override
public void cleanup() {
- if(sv2 != null) sv2.clear();
- if(sv4 != null) sv4.clear();
+ if (sv2 != null) {
+ sv2.clear();
+ }
+ if (sv4 != null) {
+ sv4.clear();
+ }
super.cleanup();
}
@@ -100,7 +104,7 @@ public class FilterRecordBatch extends AbstractSingleRecordBatch<Filter>{
sv2.clear();
}
- switch(incoming.getSchema().getSelectionVectorMode()){
+ switch (incoming.getSchema().getSelectionVectorMode()) {
case NONE:
sv2 = new SelectionVector2(oContext.getAllocator());
this.filter = generateSV2Filterer();
@@ -137,13 +141,13 @@ public class FilterRecordBatch extends AbstractSingleRecordBatch<Filter>{
final ClassGenerator<Filterer> cg = CodeGenerator.getRoot(Filterer.TEMPLATE_DEFINITION4, context.getFunctionRegistry());
final LogicalExpression expr = ExpressionTreeMaterializer.materialize(popConfig.getExpr(), incoming, collector, context.getFunctionRegistry());
- if(collector.hasErrors()){
+ if (collector.hasErrors()) {
throw new SchemaChangeException(String.format("Failure while trying to materialize incoming schema. Errors:\n %s.", collector.toErrorString()));
}
cg.addExpr(new ReturnValueExpression(expr));
-// for(VectorWrapper<?> i : incoming){
+// for (VectorWrapper<?> i : incoming) {
// ValueVector v = TypeHelper.getNewVector(i.getField(), context.getAllocator());
// container.add(v);
// allocators.add(getAllocator4(v));
@@ -177,13 +181,13 @@ public class FilterRecordBatch extends AbstractSingleRecordBatch<Filter>{
final ClassGenerator<Filterer> cg = CodeGenerator.getRoot(Filterer.TEMPLATE_DEFINITION2, context.getFunctionRegistry());
final LogicalExpression expr = ExpressionTreeMaterializer.materialize(popConfig.getExpr(), incoming, collector, context.getFunctionRegistry());
- if(collector.hasErrors()){
+ if (collector.hasErrors()) {
throw new SchemaChangeException(String.format("Failure while trying to materialize incoming schema. Errors:\n %s.", collector.toErrorString()));
}
cg.addExpr(new ReturnValueExpression(expr));
- for(VectorWrapper<?> v : incoming){
+ for (VectorWrapper<?> v : incoming) {
TransferPair pair = v.getValueVector().getTransferPair();
container.add(pair.getTo());
transfers.add(pair);
@@ -202,5 +206,4 @@ public class FilterRecordBatch extends AbstractSingleRecordBatch<Filter>{
}
-
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
index 15044b823..2a08c053a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
@@ -457,8 +457,8 @@ public class HashJoinBatch extends AbstractRecordBatch<HashJoinPOP> {
return hj;
}
- private void allocateVectors(){
- for(VectorWrapper<?> v : container){
+ private void allocateVectors() {
+ for (VectorWrapper<?> v : container) {
v.getValueVector().allocateNew();
}
}
@@ -472,7 +472,9 @@ public class HashJoinBatch extends AbstractRecordBatch<HashJoinPOP> {
}
private void updateStats(HashTable htable) {
- if(htable == null) return;
+ if (htable == null) {
+ return;
+ }
htable.getStats(htStats);
this.stats.setLongStat(Metric.NUM_BUCKETS, htStats.numBuckets);
this.stats.setLongStat(Metric.NUM_ENTRIES, htStats.numEntries);
@@ -488,7 +490,7 @@ public class HashJoinBatch extends AbstractRecordBatch<HashJoinPOP> {
@Override
public void cleanup() {
- if(hjHelper != null){
+ if (hjHelper != null) {
hjHelper.clear();
}
@@ -504,4 +506,5 @@ public class HashJoinBatch extends AbstractRecordBatch<HashJoinPOP> {
left.cleanup();
right.cleanup();
}
+
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbeTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbeTemplate.java
index 785deae79..133289e08 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbeTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbeTemplate.java
@@ -94,11 +94,13 @@ public abstract class HashJoinProbeTemplate implements HashJoinProbe {
boolean success = true;
while (outputRecords < TARGET_RECORDS_PER_BATCH && recordsProcessed < recordsToProcess) {
success = projectBuildRecord(unmatchedBuildIndexes.get(recordsProcessed), outputRecords);
- if(success){
+ if (success) {
recordsProcessed++;
outputRecords++;
- }else{
- if(outputRecords == 0) throw new IllegalStateException("Too big to fail.");
+ } else {
+ if (outputRecords == 0) {
+ throw new IllegalStateException("Too big to fail.");
+ }
break;
}
}
@@ -166,11 +168,11 @@ public abstract class HashJoinProbeTemplate implements HashJoinProbe {
boolean success = projectBuildRecord(currentCompositeIdx, outputRecords) //
&& projectProbeRecord(recordsProcessed, outputRecords);
- if(!success){
+ if (!success) {
// we failed to project. redo this record.
getNextRecord = false;
return;
- }else{
+ } else {
outputRecords++;
/* Projected single row from the build side with matching key but there
@@ -182,8 +184,7 @@ public abstract class HashJoinProbeTemplate implements HashJoinProbe {
* from the probe side. Drain the next row in the probe side.
*/
recordsProcessed++;
- }
- else {
+ } else {
/* There is more than one row with the same key on the build side
* don't drain more records from the probe side till we have projected
* all the rows with this key
@@ -197,10 +198,10 @@ public abstract class HashJoinProbeTemplate implements HashJoinProbe {
// If we have a left outer join, project the keys
if (joinType == JoinRelType.LEFT || joinType == JoinRelType.FULL) {
boolean success = projectProbeRecord(recordsProcessed, outputRecords);
- if(!success){
- if(outputRecords == 0){
+ if (!success) {
+ if (outputRecords == 0) {
throw new IllegalStateException("Record larger than single batch.");
- }else{
+ } else {
// we've output some records but failed to output this one. return and wait for next call.
return;
}
@@ -214,10 +215,10 @@ public abstract class HashJoinProbeTemplate implements HashJoinProbe {
hjHelper.setRecordMatched(currentCompositeIdx);
boolean success = projectBuildRecord(currentCompositeIdx, outputRecords) //
&& projectProbeRecord(recordsProcessed, outputRecords);
- if(!success){
- if(outputRecords == 0){
+ if (!success) {
+ if (outputRecords == 0) {
throw new IllegalStateException("Record larger than single batch.");
- }else{
+ } else {
// we've output some records but failed to output this one. return and wait for next call.
return;
}
@@ -264,5 +265,7 @@ public abstract class HashJoinProbeTemplate implements HashJoinProbe {
public abstract void doSetup(@Named("context") FragmentContext context, @Named("buildBatch") VectorContainer buildBatch, @Named("probeBatch") RecordBatch probeBatch,
@Named("outgoing") RecordBatch outgoing);
public abstract boolean projectBuildRecord(@Named("buildIndex") int buildIndex, @Named("outIndex") int outIndex);
+
public abstract boolean projectProbeRecord(@Named("probeIndex") int probeIndex, @Named("outIndex") int outIndex);
+
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinStatus.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinStatus.java
index faca32a97..39bdb9440 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinStatus.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinStatus.java
@@ -66,31 +66,32 @@ public final class JoinStatus {
this.joinType = output.getJoinType();
}
- private final IterOutcome nextLeft(){
+ private final IterOutcome nextLeft() {
return outputBatch.next(LEFT_INPUT, left);
}
- private final IterOutcome nextRight(){
+ private final IterOutcome nextRight() {
return outputBatch.next(RIGHT_INPUT, right);
}
- public final void ensureInitial(){
- if(!initialSet){
+ public final void ensureInitial() {
+ if (!initialSet) {
this.lastLeft = nextLeft();
this.lastRight = nextRight();
initialSet = true;
}
}
- public final void advanceLeft(){
+ public final void advanceLeft() {
leftPosition++;
}
- public final void advanceRight(){
- if (rightSourceMode == RightSourceMode.INCOMING)
+ public final void advanceRight() {
+ if (rightSourceMode == RightSourceMode.INCOMING) {
rightPosition++;
- else
+ } else {
svRightPosition++;
+ }
}
public final int getLeftPosition() {
@@ -101,7 +102,7 @@ public final class JoinStatus {
return (rightSourceMode == RightSourceMode.INCOMING) ? rightPosition : svRightPosition;
}
- public final int getRightCount(){
+ public final int getRightCount() {
return right.getRecordCount();
}
@@ -153,9 +154,10 @@ public final class JoinStatus {
* Check if the left record position can advance by one.
* Side effect: advances to next left batch if current left batch size is exceeded.
*/
- public final boolean isLeftPositionAllowed(){
- if (lastLeft == IterOutcome.NONE)
+ public final boolean isLeftPositionAllowed() {
+ if (lastLeft == IterOutcome.NONE) {
return false;
+ }
if (!isLeftPositionInCurrentBatch()) {
leftPosition = 0;
releaseData(left);
@@ -170,11 +172,13 @@ public final class JoinStatus {
* Check if the right record position can advance by one.
* Side effect: advances to next right batch if current right batch size is exceeded
*/
- public final boolean isRightPositionAllowed(){
- if (rightSourceMode == RightSourceMode.SV4)
+ public final boolean isRightPositionAllowed() {
+ if (rightSourceMode == RightSourceMode.SV4) {
return svRightPosition < sv4.getCount();
- if (lastRight == IterOutcome.NONE)
+ }
+ if (lastRight == IterOutcome.NONE) {
return false;
+ }
if (!isRightPositionInCurrentBatch()) {
rightPosition = 0;
releaseData(right);
@@ -185,11 +189,13 @@ public final class JoinStatus {
return true;
}
- private void releaseData(RecordBatch b){
- for(VectorWrapper<?> v : b){
+ private void releaseData(RecordBatch b) {
+ for (VectorWrapper<?> v : b) {
v.clear();
}
- if(b.getSchema().getSelectionVectorMode() == SelectionVectorMode.TWO_BYTE) b.getSelectionVector2().clear();
+ if (b.getSchema().getSelectionVectorMode() == SelectionVectorMode.TWO_BYTE) {
+ b.getSelectionVector2().clear();
+ }
}
/**
@@ -220,29 +226,34 @@ public final class JoinStatus {
return rightPosition + 1 < right.getRecordCount();
}
- public JoinOutcome getOutcome(){
- if (!ok)
+ public JoinOutcome getOutcome() {
+ if (!ok) {
return JoinOutcome.FAILURE;
+ }
if (bothMatches(IterOutcome.NONE) ||
(joinType == JoinRelType.INNER && eitherMatches(IterOutcome.NONE)) ||
(joinType == JoinRelType.LEFT && lastLeft == IterOutcome.NONE) ||
- (joinType == JoinRelType.RIGHT && lastRight == IterOutcome.NONE))
+ (joinType == JoinRelType.RIGHT && lastRight == IterOutcome.NONE)) {
return JoinOutcome.NO_MORE_DATA;
+ }
if (bothMatches(IterOutcome.OK) ||
- (eitherMatches(IterOutcome.NONE) && eitherMatches(IterOutcome.OK)))
+ (eitherMatches(IterOutcome.NONE) && eitherMatches(IterOutcome.OK))) {
return JoinOutcome.BATCH_RETURNED;
- if (eitherMatches(IterOutcome.OK_NEW_SCHEMA))
+ }
+ if (eitherMatches(IterOutcome.OK_NEW_SCHEMA)) {
return JoinOutcome.SCHEMA_CHANGED;
- if (eitherMatches(IterOutcome.NOT_YET))
+ }
+ if (eitherMatches(IterOutcome.NOT_YET)) {
return JoinOutcome.WAITING;
+ }
return JoinOutcome.FAILURE;
}
- private boolean bothMatches(IterOutcome outcome){
+ private boolean bothMatches(IterOutcome outcome) {
return lastLeft == outcome && lastRight == outcome;
}
- private boolean eitherMatches(IterOutcome outcome){
+ private boolean eitherMatches(IterOutcome outcome) {
return lastLeft == outcome || lastRight == outcome;
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinTemplate.java
index bb3b9ac6d..c1dffc107 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinTemplate.java
@@ -94,8 +94,9 @@ public abstract class JoinTemplate implements JoinWorker {
if (((MergeJoinPOP)status.outputBatch.getPopConfig()).getJoinType() == JoinRelType.LEFT) {
// we've hit the end of the right record batch; copy any remaining values from the left batch
while (status.isLeftPositionAllowed()) {
- if (!doCopyLeft(status.getLeftPosition(), status.getOutPosition()))
+ if (!doCopyLeft(status.getLeftPosition(), status.getOutPosition())) {
return false;
+ }
status.incOutputPos();
status.advanceLeft();
@@ -103,8 +104,9 @@ public abstract class JoinTemplate implements JoinWorker {
}
return true;
}
- if (!status.isLeftPositionAllowed())
+ if (!status.isLeftPositionAllowed()) {
return true;
+ }
int comparison = doCompare(status.getLeftPosition(), status.getRightPosition());
switch (comparison) {
@@ -112,8 +114,9 @@ public abstract class JoinTemplate implements JoinWorker {
case -1:
// left key < right key
if (((MergeJoinPOP)status.outputBatch.getPopConfig()).getJoinType() == JoinRelType.LEFT) {
- if (!doCopyLeft(status.getLeftPosition(), status.getOutPosition()))
+ if (!doCopyLeft(status.getLeftPosition(), status.getOutPosition())) {
return false;
+ }
status.incOutputPos();
}
status.advanceLeft();
@@ -125,25 +128,27 @@ public abstract class JoinTemplate implements JoinWorker {
// check for repeating values on the left side
if (!status.isLeftRepeating() &&
status.isNextLeftPositionInCurrentBatch() &&
- doCompareNextLeftKey(status.getLeftPosition()) == 0)
+ doCompareNextLeftKey(status.getLeftPosition()) == 0) {
// subsequent record(s) in the left batch have the same key
status.notifyLeftRepeating();
-
- else if (status.isLeftRepeating() &&
+ } else if (status.isLeftRepeating() &&
status.isNextLeftPositionInCurrentBatch() &&
- doCompareNextLeftKey(status.getLeftPosition()) != 0)
+ doCompareNextLeftKey(status.getLeftPosition()) != 0) {
// this record marks the end of repeated keys
status.notifyLeftStoppedRepeating();
+ }
boolean crossedBatchBoundaries = false;
int initialRightPosition = status.getRightPosition();
do {
// copy all equal right keys to the output record batch
- if (!doCopyLeft(status.getLeftPosition(), status.getOutPosition()))
+ if (!doCopyLeft(status.getLeftPosition(), status.getOutPosition())) {
return false;
+ }
- if (!doCopyRight(status.getRightPosition(), status.getOutPosition()))
+ if (!doCopyRight(status.getRightPosition(), status.getOutPosition())) {
return false;
+ }
status.incOutputPos();
@@ -159,9 +164,10 @@ public abstract class JoinTemplate implements JoinWorker {
} while ((!status.isLeftRepeating() || status.isRightPositionInCurrentBatch()) && status.isRightPositionAllowed() && doCompare(status.getLeftPosition(), status.getRightPosition()) == 0);
if (status.getRightPosition() > initialRightPosition &&
- (status.isLeftRepeating() || ! status.isNextLeftPositionInCurrentBatch()))
+ (status.isLeftRepeating() || !status.isNextLeftPositionInCurrentBatch())) {
// more than one matching result from right table; reset position in case of subsequent left match
status.setRightPosition(initialRightPosition);
+ }
status.advanceLeft();
if (status.isLeftRepeating() && doCompareNextLeftKey(status.getLeftPosition()) != 0) {
@@ -233,5 +239,4 @@ public abstract class JoinTemplate implements JoinWorker {
*/
protected abstract int doCompareNextLeftKey(@Named("leftIndex") int leftIndex);
-
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java
index b24b5348a..1d4e353c6 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java
@@ -144,19 +144,21 @@ public class MergeJoinBatch extends AbstractRecordBatch<MergeJoinPOP> {
status.ensureInitial();
// loop so we can start over again if we find a new batch was created.
- while(true){
+ while (true) {
JoinOutcome outcome = status.getOutcome();
// if the previous outcome was a change in schema or we sent a batch, we have to set up a new batch.
if (outcome == JoinOutcome.BATCH_RETURNED ||
- outcome == JoinOutcome.SCHEMA_CHANGED)
+ outcome == JoinOutcome.SCHEMA_CHANGED) {
allocateBatch();
+ }
// reset the output position to zero after our parent iterates this RecordBatch
if (outcome == JoinOutcome.BATCH_RETURNED ||
outcome == JoinOutcome.SCHEMA_CHANGED ||
- outcome == JoinOutcome.NO_MORE_DATA)
+ outcome == JoinOutcome.NO_MORE_DATA) {
status.resetOutputPos();
+ }
if (outcome == JoinOutcome.NO_MORE_DATA) {
logger.debug("NO MORE DATA; returning {} NONE");
@@ -164,7 +166,7 @@ public class MergeJoinBatch extends AbstractRecordBatch<MergeJoinPOP> {
}
boolean first = false;
- if(worker == null){
+ if (worker == null) {
try {
logger.debug("Creating New Worker");
stats.startSetup();
@@ -180,11 +182,12 @@ public class MergeJoinBatch extends AbstractRecordBatch<MergeJoinPOP> {
}
// join until we have a complete outgoing batch
- if (!worker.doJoin(status))
+ if (!worker.doJoin(status)) {
worker = null;
+ }
// get the outcome of the join.
- switch(status.getOutcome()){
+ switch (status.getOutcome()) {
case BATCH_RETURNED:
// only return new schema if new worker has been setup.
logger.debug("BATCH RETURNED; returning {}", (first ? "OK_NEW_SCHEMA" : "OK"));
@@ -200,7 +203,7 @@ public class MergeJoinBatch extends AbstractRecordBatch<MergeJoinPOP> {
return status.getOutPosition() > 0 ? (first ? IterOutcome.OK_NEW_SCHEMA : IterOutcome.OK): (first ? IterOutcome.OK_NEW_SCHEMA : IterOutcome.NONE);
case SCHEMA_CHANGED:
worker = null;
- if(status.getOutPosition() > 0){
+ if (status.getOutPosition() > 0) {
// if we have current data, let's return that.
logger.debug("SCHEMA CHANGED; returning {} ", (first ? "OK_NEW_SCHEMA" : "OK"));
setRecordCountInContainer();
@@ -218,7 +221,7 @@ public class MergeJoinBatch extends AbstractRecordBatch<MergeJoinPOP> {
}
private void setRecordCountInContainer() {
- for(VectorWrapper vw : container){
+ for (VectorWrapper vw : container) {
Preconditions.checkArgument(!vw.isHyper());
vw.getValueVector().getMutator().setValueCount(getRecordCount());
}
@@ -257,9 +260,10 @@ public class MergeJoinBatch extends AbstractRecordBatch<MergeJoinPOP> {
// materialize value vector readers from join expression
final LogicalExpression materializedLeftExpr = ExpressionTreeMaterializer.materialize(leftFieldExpr, left, collector, context.getFunctionRegistry());
- if (collector.hasErrors())
+ if (collector.hasErrors()) {
throw new ClassTransformationException(String.format(
"Failure while trying to materialize incoming left field. Errors:\n %s.", collector.toErrorString()));
+ }
// generate compareNextLeftKey()
////////////////////////////////
@@ -475,9 +479,10 @@ public class MergeJoinBatch extends AbstractRecordBatch<MergeJoinPOP> {
} else {
materializedLeftExpr = new TypedNullConstant(Types.optional(MinorType.INT));
}
- if (collector.hasErrors())
+ if (collector.hasErrors()) {
throw new ClassTransformationException(String.format(
"Failure while trying to materialize incoming left field. Errors:\n %s.", collector.toErrorString()));
+ }
LogicalExpression materializedRightExpr;
if (worker == null || status.isRightPositionAllowed()) {
@@ -485,9 +490,10 @@ public class MergeJoinBatch extends AbstractRecordBatch<MergeJoinPOP> {
} else {
materializedRightExpr = new TypedNullConstant(Types.optional(MinorType.INT));
}
- if (collector.hasErrors())
+ if (collector.hasErrors()) {
throw new ClassTransformationException(String.format(
"Failure while trying to materialize incoming right field. Errors:\n %s.", collector.toErrorString()));
+ }
// generate compare()
////////////////////////
@@ -519,4 +525,5 @@ public class MergeJoinBatch extends AbstractRecordBatch<MergeJoinPOP> {
//Pass the equality check for all the join conditions. Finally, return 0.
cg.getEvalBlock()._return(JExpr.lit(0));
}
+
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatchBuilder.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatchBuilder.java
index 904d38cea..1187bd6da 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatchBuilder.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatchBuilder.java
@@ -50,15 +50,24 @@ public class MergeJoinBatchBuilder {
}
public boolean add(RecordBatch batch) {
- if (batch.getSchema().getSelectionVectorMode() == BatchSchema.SelectionVectorMode.FOUR_BYTE)
+ if (batch.getSchema().getSelectionVectorMode() == BatchSchema.SelectionVectorMode.FOUR_BYTE) {
throw new UnsupportedOperationException("A merge join cannot currently work against a sv4 batch.");
- if (batch.getRecordCount() == 0) return true; // skip over empty record batches.
+ }
+ if (batch.getRecordCount() == 0) {
+ return true; // skip over empty record batches.
+ }
// resource checks
long batchBytes = getSize(batch);
- if (batchBytes + runningBytes > Integer.MAX_VALUE) return false; // TODO: 2GB is arbitrary
- if (runningBatches++ >= Character.MAX_VALUE) return false; // allowed in batch.
- if (!svAllocator.preAllocate(batch.getRecordCount()*4)) return false; // sv allocation available.
+ if (batchBytes + runningBytes > Integer.MAX_VALUE) {
+ return false; // TODO: 2GB is arbitrary
+ }
+ if (runningBatches++ >= Character.MAX_VALUE) {
+ return false; // allowed in batch.
+ }
+ if (!svAllocator.preAllocate(batch.getRecordCount() * 4)) {
+ return false; // sv allocation available.
+ }
// transfer VVs to a new RecordBatchData
RecordBatchData bd = new RecordBatchData(batch);
@@ -68,9 +77,9 @@ public class MergeJoinBatchBuilder {
return true;
}
- private long getSize(RecordBatch batch){
+ private long getSize(RecordBatch batch) {
long bytes = 0;
- for(VectorWrapper<?> v : batch){
+ for (VectorWrapper<?> v : batch) {
bytes += v.getValueVector().getBufferSize();
}
return bytes;
@@ -78,18 +87,20 @@ public class MergeJoinBatchBuilder {
public void build() throws SchemaChangeException {
container.clear();
- if (queuedRightBatches.size() > Character.MAX_VALUE) throw new SchemaChangeException("Join cannot work on more than %d batches at a time.", (int) Character.MAX_VALUE);
+ if (queuedRightBatches.size() > Character.MAX_VALUE) {
+ throw new SchemaChangeException("Join cannot work on more than %d batches at a time.", (int) Character.MAX_VALUE);
+ }
status.sv4 = new SelectionVector4(svAllocator.getAllocation(), recordCount, Character.MAX_VALUE);
BatchSchema schema = queuedRightBatches.keySet().iterator().next();
List<RecordBatchData> data = queuedRightBatches.get(schema);
// now we're going to generate the sv4 pointers
- switch(schema.getSelectionVectorMode()){
+ switch (schema.getSelectionVectorMode()) {
case NONE: {
int index = 0;
int recordBatchId = 0;
- for(RecordBatchData d : data){
- for(int i =0; i < d.getRecordCount(); i++, index++){
+ for (RecordBatchData d : data) {
+ for (int i = 0; i < d.getRecordCount(); i++, index++) {
status.sv4.set(index, recordBatchId, i);
}
recordBatchId++;
@@ -99,8 +110,8 @@ public class MergeJoinBatchBuilder {
case TWO_BYTE: {
int index = 0;
int recordBatchId = 0;
- for(RecordBatchData d : data){
- for(int i =0; i < d.getRecordCount(); i++, index++){
+ for (RecordBatchData d : data) {
+ for (int i = 0; i < d.getRecordCount(); i++, index++) {
status.sv4.set(index, recordBatchId, (int) d.getSv2().getIndex(i));
}
// might as well drop the selection vector since we'll stop using it now.
@@ -121,7 +132,7 @@ public class MergeJoinBatchBuilder {
}
}
- for(MaterializedField f : vectors.keySet()){
+ for (MaterializedField f : vectors.keySet()) {
List<ValueVector> v = vectors.get(f);
container.addHyperList(v);
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java
index cf2e36f9d..29fd80f72 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java
@@ -133,7 +133,7 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP>
stats.startWait();
try {
RawFragmentBatch b = provider.getNext();
- if(b != null){
+ if (b != null) {
stats.addLongStat(Metric.BYTES_RECEIVED, b.getByteCount());
stats.batchReceived(0, b.getHeader().getDef().getRecordCount(), false);
}
@@ -191,7 +191,9 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP>
emptyBatch = rawBatch;
}
try {
- while ((rawBatch = getNext(provider)) != null && rawBatch.getHeader().getDef().getRecordCount() == 0);
+ while ((rawBatch = getNext(provider)) != null && rawBatch.getHeader().getDef().getRecordCount() == 0) {
+ ;
+ }
if (rawBatch == null && context.isCancelled()) {
return IterOutcome.STOP;
}
@@ -400,14 +402,17 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP>
batchOffsets[node.batchId] = 0;
// add front value from batch[x] to priority queue
- if (batchLoaders[node.batchId].getRecordCount() != 0)
+ if (batchLoaders[node.batchId].getRecordCount() != 0) {
pqueue.add(new Node(node.batchId, 0));
+ }
} else {
pqueue.add(new Node(node.batchId, node.valueIndex + 1));
}
- if (prevBatchWasFull) break;
+ if (prevBatchWasFull) {
+ break;
+ }
}
// set the value counts in the outgoing vectors
@@ -589,11 +594,13 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP>
private void generateComparisons(ClassGenerator g, VectorAccessible batch) throws SchemaChangeException {
g.setMappingSet(MAIN_MAPPING);
- for(Ordering od : popConfig.getOrderings()){
+ for (Ordering od : popConfig.getOrderings()) {
// first, we rewrite the evaluation stack for each side of the comparison.
ErrorCollector collector = new ErrorCollectorImpl();
final LogicalExpression expr = ExpressionTreeMaterializer.materialize(od.getExpr(), batch, collector,context.getFunctionRegistry());
- if(collector.hasErrors()) throw new SchemaChangeException("Failure while materializing expression. " + collector.toErrorString());
+ if (collector.hasErrors()) {
+ throw new SchemaChangeException("Failure while materializing expression. " + collector.toErrorString());
+ }
g.setMappingSet(LEFT_MAPPING);
HoldingContainer left = g.addExpr(expr, false);
g.setMappingSet(RIGHT_MAPPING);
@@ -605,9 +612,9 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP>
HoldingContainer out = g.addExpr(fh, false);
JConditional jc = g.getEvalBlock()._if(out.getValue().ne(JExpr.lit(0)));
- if(od.getDirection() == Direction.ASCENDING){
+ if (od.getDirection() == Direction.ASCENDING) {
jc._then()._return(out.getValue());
- }else{
+ } else {
jc._then()._return(out.getValue().minus());
}
}
@@ -648,7 +655,7 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP>
public void cleanup() {
outgoingContainer.clear();
if (batchLoaders != null) {
- for(RecordBatchLoader rbl : batchLoaders){
+ for (RecordBatchLoader rbl : batchLoaders) {
if (rbl != null) {
rbl.clear();
}
@@ -662,4 +669,4 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP>
}
}
-} \ No newline at end of file
+}
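The generateComparisons hunk above braces the ascending/descending branch; the convention the generated comparator follows, sketched in plain Java (variable names assumed):

    // Return the raw comparison for ascending keys and its negation for
    // descending keys, so one priority queue handles both directions.
    int cmp = leftKey.compareTo(rightKey);
    return (od.getDirection() == Direction.ASCENDING) ? cmp : -cmp;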
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionRecordBatch.java
index 45f32cff4..aecf3636d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionRecordBatch.java
@@ -189,8 +189,9 @@ public class OrderedPartitionRecordBatch extends AbstractRecordBatch<OrderedPart
}
builder.add(incoming);
recordsSampled += incoming.getRecordCount();
- if (upstream == IterOutcome.NONE)
+ if (upstream == IterOutcome.NONE) {
break;
+ }
}
VectorContainer sortedSamples = new VectorContainer();
builder.build(context, sortedSamples);
@@ -258,7 +259,7 @@ public class OrderedPartitionRecordBatch extends AbstractRecordBatch<OrderedPart
try {
- if (!saveSamples()){
+ if (!saveSamples()) {
return false;
}
@@ -277,16 +278,17 @@ public class OrderedPartitionRecordBatch extends AbstractRecordBatch<OrderedPart
// Wait until sufficient number of fragments have submitted samples, or proceed after xx ms passed
// TODO: this should be polling.
- if (val < fragmentsBeforeProceed)
+ if (val < fragmentsBeforeProceed) {
Thread.sleep(10);
+ }
for (int i = 0; i < 100 && finalTable == null; i++) {
finalTable = tableMap.get(finalTableKey);
- if (finalTable != null){
+ if (finalTable != null) {
break;
}
Thread.sleep(10);
}
- if (finalTable == null){
+ if (finalTable == null) {
buildTable();
}
finalTable = tableMap.get(finalTableKey);
@@ -429,8 +431,9 @@ public class OrderedPartitionRecordBatch extends AbstractRecordBatch<OrderedPart
// if we got IterOutcome.NONE while getting partition vectors, and there are no batches on the queue, then we are
// done
- if (upstreamNone && (batchQueue == null || batchQueue.size() == 0))
+ if (upstreamNone && (batchQueue == null || batchQueue.size() == 0)) {
return IterOutcome.NONE;
+ }
// if there are batches on the queue, process them first, rather than calling incoming.next()
if (batchQueue != null && batchQueue.size() > 0) {
@@ -461,7 +464,7 @@ public class OrderedPartitionRecordBatch extends AbstractRecordBatch<OrderedPart
// If this is the first iteration, we need to generate the partition vectors before we can proceed
if (this.first && upstream == IterOutcome.OK_NEW_SCHEMA) {
- if (!getPartitionVectors()){
+ if (!getPartitionVectors()) {
cleanup();
return IterOutcome.STOP;
}
@@ -490,8 +493,9 @@ public class OrderedPartitionRecordBatch extends AbstractRecordBatch<OrderedPart
// we need to generate a new schema, even if the outcome is IterOutcome.OK After that we can reuse the schema.
if (this.startedUnsampledBatches == false) {
this.startedUnsampledBatches = true;
- if (upstream == IterOutcome.OK)
+ if (upstream == IterOutcome.OK) {
upstream = IterOutcome.OK_NEW_SCHEMA;
+ }
}
switch (upstream) {
case NONE:
@@ -560,8 +564,9 @@ public class OrderedPartitionRecordBatch extends AbstractRecordBatch<OrderedPart
int count = 0;
for (Ordering od : popConfig.getOrderings()) {
final LogicalExpression expr = ExpressionTreeMaterializer.materialize(od.getExpr(), batch, collector, context.getFunctionRegistry());
- if (collector.hasErrors())
+ if (collector.hasErrors()) {
throw new SchemaChangeException("Failure while materializing expression. " + collector.toErrorString());
+ }
cg.setMappingSet(incomingMapping);
ClassGenerator.HoldingContainer left = cg.addExpr(expr, false);
cg.setMappingSet(partitionMapping);
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/producer/ProducerConsumerBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/producer/ProducerConsumerBatch.java
index 051a590f2..7f3a96637 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/producer/ProducerConsumerBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/producer/ProducerConsumerBatch.java
@@ -120,7 +120,9 @@ public class ProducerConsumerBatch extends AbstractRecordBatch {
@Override
public void run() {
try {
- if (stop) return;
+ if (stop) {
+ return;
+ }
outer:
while (true) {
IterOutcome upstream = incoming.next();
@@ -208,4 +210,5 @@ public class ProducerConsumerBatch extends AbstractRecordBatch {
this.failed = failed;
}
}
+
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java
index ec29cac55..a1a834052 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java
@@ -195,55 +195,64 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project>{
private boolean doAlloc() {
//Allocate vv in the allocationVectors.
- for(ValueVector v : this.allocationVectors){
+ for (ValueVector v : this.allocationVectors) {
//AllocationHelper.allocate(v, remainingRecordCount, 250);
- if (!v.allocateNewSafe())
+ if (!v.allocateNewSafe()) {
return false;
+ }
}
//Allocate vv for complexWriters.
- if (complexWriters == null)
+ if (complexWriters == null) {
return true;
+ }
- for (ComplexWriter writer : complexWriters)
+ for (ComplexWriter writer : complexWriters) {
writer.allocate();
+ }
return true;
}
private void setValueCount(int count) {
- for(ValueVector v : allocationVectors){
+ for (ValueVector v : allocationVectors) {
ValueVector.Mutator m = v.getMutator();
m.setValueCount(count);
}
- if (complexWriters == null)
+ if (complexWriters == null) {
return;
+ }
- for (ComplexWriter writer : complexWriters)
+ for (ComplexWriter writer : complexWriters) {
writer.setValueCount(count);
+ }
}
/** hack to make ref and full work together... need to figure out if this is still necessary. **/
- private FieldReference getRef(NamedExpression e){
+ private FieldReference getRef(NamedExpression e) {
FieldReference ref = e.getRef();
PathSegment seg = ref.getRootSegment();
-// if(seg.isNamed() && "output".contentEquals(seg.getNameSegment().getPath())){
+// if (seg.isNamed() && "output".contentEquals(seg.getNameSegment().getPath())) {
// return new FieldReference(ref.getPath().toString().subSequence(7, ref.getPath().length()), ref.getPosition());
// }
return ref;
}
- private boolean isAnyWildcard(List<NamedExpression> exprs){
- for(NamedExpression e : exprs){
- if(isWildcard(e)) return true;
+ private boolean isAnyWildcard(List<NamedExpression> exprs) {
+ for (NamedExpression e : exprs) {
+ if (isWildcard(e)) {
+ return true;
+ }
}
return false;
}
- private boolean isWildcard(NamedExpression ex){
- if( !(ex.getExpr() instanceof SchemaPath)) return false;
+ private boolean isWildcard(NamedExpression ex) {
+ if (!(ex.getExpr() instanceof SchemaPath)) {
+ return false;
+ }
NameSegment expr = ((SchemaPath)ex.getExpr()).getRootSegment();
NameSegment ref = ex.getRef().getRootSegment();
return ref.getPath().equals("*") && expr.getPath().equals("*");
@@ -266,7 +275,7 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project>{
ClassifierResult result = new ClassifierResult();
boolean classify = isClassificationNeeded(exprs);
- for(int i = 0; i < exprs.size(); i++){
+ for (int i = 0; i < exprs.size(); i++) {
final NamedExpression namedExpression = exprs.get(i);
result.clear();
@@ -278,14 +287,16 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project>{
Integer value = result.prefixMap.get(result.prefix);
if (value != null && value.intValue() == 1) {
int k = 0;
- for(VectorWrapper<?> wrapper : incoming) {
+ for (VectorWrapper<?> wrapper : incoming) {
ValueVector vvIn = wrapper.getValueVector();
SchemaPath originalPath = vvIn.getField().getPath();
if (k > result.outputNames.size()-1) {
assert false;
}
String name = result.outputNames.get(k++); // get the renamed column names
- if (name == EMPTY_STRING) continue;
+ if (name == EMPTY_STRING) {
+ continue;
+ }
FieldReference ref = new FieldReference(name);
TransferPair tp = wrapper.getValueVector().getTransferPair(ref);
transfers.add(tp);
@@ -293,17 +304,19 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project>{
}
} else if (value != null && value.intValue() > 1) { // subsequent wildcards should do a copy of incoming valuevectors
int k = 0;
- for(VectorWrapper<?> wrapper : incoming) {
+ for (VectorWrapper<?> wrapper : incoming) {
ValueVector vvIn = wrapper.getValueVector();
SchemaPath originalPath = vvIn.getField().getPath();
if (k > result.outputNames.size()-1) {
assert false;
}
String name = result.outputNames.get(k++); // get the renamed column names
- if (name == EMPTY_STRING) continue;
+ if (name == EMPTY_STRING) {
+ continue;
+ }
final LogicalExpression expr = ExpressionTreeMaterializer.materialize(originalPath, incoming, collector, context.getFunctionRegistry() );
- if(collector.hasErrors()){
+ if (collector.hasErrors()) {
throw new SchemaChangeException(String.format("Failure while trying to materialize incoming schema. Errors:\n %s.", collector.toErrorString()));
}
@@ -333,16 +346,15 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project>{
final LogicalExpression expr = ExpressionTreeMaterializer.materialize(namedExpression.getExpr(), incoming, collector, context.getFunctionRegistry(), true);
final MaterializedField outputField = MaterializedField.create(outputName, expr.getMajorType());
- if(collector.hasErrors()){
+ if (collector.hasErrors()) {
throw new SchemaChangeException(String.format("Failure while trying to materialize incoming schema. Errors:\n %s.", collector.toErrorString()));
}
// add value vector to transfer if direct reference and this is allowed, otherwise, add to evaluation stack.
- if(expr instanceof ValueVectorReadExpression && incoming.getSchema().getSelectionVectorMode() == SelectionVectorMode.NONE
+ if (expr instanceof ValueVectorReadExpression && incoming.getSchema().getSelectionVectorMode() == SelectionVectorMode.NONE
&& !((ValueVectorReadExpression) expr).hasReadPath()
&& !isAnyWildcard
- && !transferFieldIds.contains(((ValueVectorReadExpression) expr).getFieldId().getFieldIds()[0])
- ) {
+ && !transferFieldIds.contains(((ValueVectorReadExpression) expr).getFieldId().getFieldIds()[0])) {
ValueVectorReadExpression vectorRead = (ValueVectorReadExpression) expr;
TypedFieldId id = vectorRead.getFieldId();
@@ -358,8 +370,9 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project>{
((DrillFuncHolderExpr) expr).isComplexWriterFuncHolder()) {
// Need to process ComplexWriter function evaluation.
// Lazy initialization of the list of complex writers, if not done yet.
- if (complexWriters == null)
+ if (complexWriters == null) {
complexWriters = Lists.newArrayList();
+ }
// The reference name will be passed to ComplexWriter, used as the name of the output vector from the writer.
((DrillComplexWriterFuncHolder) ((DrillFuncHolderExpr) expr).getHolder()).setReference(namedExpression.getRef());
@@ -419,9 +432,11 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project>{
private boolean isClassificationNeeded(List<NamedExpression> exprs) {
boolean needed = false;
- for(int i = 0; i < exprs.size(); i++){
+ for (int i = 0; i < exprs.size(); i++) {
final NamedExpression ex = exprs.get(i);
- if (!(ex.getExpr() instanceof SchemaPath)) continue;
+ if (!(ex.getExpr() instanceof SchemaPath)) {
+ continue;
+ }
NameSegment expr = ((SchemaPath) ex.getExpr()).getRootSegment();
NameSegment ref = ex.getRef().getRootSegment();
boolean refHasPrefix = ref.getPath().contains(StarColumnHelper.PREFIX_DELIMITER);
@@ -530,7 +545,7 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project>{
result.outputNames.add(EMPTY_STRING); // initialize
}
- for(VectorWrapper<?> wrapper : incoming) {
+ for (VectorWrapper<?> wrapper : incoming) {
ValueVector vvIn = wrapper.getValueVector();
String incomingName = vvIn.getField().getPath().getRootSegment().getPath();
// get the prefix of the name
@@ -586,7 +601,7 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project>{
result.outputNames.add(EMPTY_STRING); // initialize
}
- for(VectorWrapper<?> wrapper : incoming) {
+ for (VectorWrapper<?> wrapper : incoming) {
ValueVector vvIn = wrapper.getValueVector();
String name = vvIn.getField().getPath().getRootSegment().getPath();
String[] components = name.split(StarColumnHelper.PREFIX_DELIMITER, 2);
@@ -627,7 +642,7 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project>{
}
int k = 0;
- for(VectorWrapper<?> wrapper : incoming) {
+ for (VectorWrapper<?> wrapper : incoming) {
ValueVector vvIn = wrapper.getValueVector();
String incomingName = vvIn.getField().getPath().getRootSegment().getPath();
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectorTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectorTemplate.java
index b36bd92a9..49ad39071 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectorTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectorTemplate.java
@@ -39,27 +39,25 @@ public abstract class ProjectorTemplate implements Projector {
private SelectionVector4 vector4;
private SelectionVectorMode svMode;
- public ProjectorTemplate() throws SchemaChangeException{
+ public ProjectorTemplate() throws SchemaChangeException {
}
@Override
public final int projectRecords(int startIndex, final int recordCount, int firstOutputIndex) {
- switch(svMode){
+ switch (svMode) {
case FOUR_BYTE:
throw new UnsupportedOperationException();
-
case TWO_BYTE:
final int count = recordCount;
- for(int i = 0; i < count; i++, firstOutputIndex++){
- if (!doEval(vector2.getIndex(i), firstOutputIndex))
+ for (int i = 0; i < count; i++, firstOutputIndex++) {
+ if (!doEval(vector2.getIndex(i), firstOutputIndex)) {
return i;
+ }
}
return recordCount;
-
case NONE:
-
final int countN = recordCount;
int i;
for (i = startIndex; i < startIndex + countN; i++, firstOutputIndex++) {
@@ -68,18 +66,16 @@ public abstract class ProjectorTemplate implements Projector {
}
}
if (i < startIndex + recordCount || startIndex > 0) {
- for(TransferPair t : transfers){
+ for (TransferPair t : transfers) {
t.splitAndTransfer(startIndex, i - startIndex);
}
return i - startIndex;
}
- for(TransferPair t : transfers){
+ for (TransferPair t : transfers) {
t.transfer();
}
return recordCount;
-
-
default:
throw new UnsupportedOperationException();
}
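
projectRecords above dispatches on the selection-vector mode: in TWO_BYTE mode each logical record position is resolved through the SelectionVector2 before evaluation, while NONE mode walks the batch directly and falls back to transfers. A self-contained model of that indirection, using plain arrays instead of Drill's vector classes:

    class Sv2Model {
      public static void main(String[] args) {
        int[] data = {10, 20, 30, 40, 50};
        // A two-byte selection vector keeps only the "selected" row indexes.
        char[] sv2 = {1, 3, 4};
        // TWO_BYTE-style loop: resolve each position through the selection vector.
        for (int i = 0; i < sv2.length; i++) {
          System.out.println(data[sv2[i]]); // prints 20, 40, 50
        }
      }
    }
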
@@ -89,7 +85,7 @@ public abstract class ProjectorTemplate implements Projector {
public final void setup(FragmentContext context, RecordBatch incoming, RecordBatch outgoing, List<TransferPair> transfers) throws SchemaChangeException{
this.svMode = incoming.getSchema().getSelectionVectorMode();
- switch(svMode){
+ switch (svMode) {
case FOUR_BYTE:
this.vector4 = incoming.getSelectionVector4();
break;
@@ -104,8 +100,4 @@ public abstract class ProjectorTemplate implements Projector {
public abstract void doSetup(@Named("context") FragmentContext context, @Named("incoming") RecordBatch incoming, @Named("outgoing") RecordBatch outgoing);
public abstract boolean doEval(@Named("inIndex") int inIndex, @Named("outIndex") int outIndex);
-
-
-
-
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/RecordBatchData.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/RecordBatchData.java
index 8116869c4..419dc8587 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/RecordBatchData.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/RecordBatchData.java
@@ -40,7 +40,7 @@ public class RecordBatchData {
private int recordCount;
VectorContainer container = new VectorContainer();
- public RecordBatchData(VectorAccessible batch){
+ public RecordBatchData(VectorAccessible batch) {
List<ValueVector> vectors = Lists.newArrayList();
if (batch instanceof RecordBatch && batch.getSchema().getSelectionVectorMode() == SelectionVectorMode.TWO_BYTE) {
this.sv2 = ((RecordBatch)batch).getSelectionVector2().clone();
@@ -48,8 +48,10 @@ public class RecordBatchData {
this.sv2 = null;
}
- for(VectorWrapper<?> v : batch){
- if(v.isHyper()) throw new UnsupportedOperationException("Record batch data can't be created based on a hyper batch.");
+ for (VectorWrapper<?> v : batch) {
+ if (v.isHyper()) {
+ throw new UnsupportedOperationException("Record batch data can't be created based on a hyper batch.");
+ }
TransferPair tp = v.getValueVector().getTransferPair();
tp.transfer();
vectors.add(tp.getTo());
@@ -67,9 +69,10 @@ public class RecordBatchData {
container.buildSchema(mode);
}
- public int getRecordCount(){
+ public int getRecordCount() {
return recordCount;
}
+
public List<ValueVector> getVectors() {
List<ValueVector> vectors = Lists.newArrayList();
for (VectorWrapper w : container) {
@@ -91,4 +94,5 @@ public class RecordBatchData {
public VectorContainer getContainer() {
return container;
}
+
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortBatch.java
index 3a374910c..19f542302 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortBatch.java
@@ -82,8 +82,6 @@ public class SortBatch extends AbstractRecordBatch<Sort> {
return builder.getSv4();
}
-
-
@Override
public void cleanup() {
builder.clear();
@@ -93,15 +91,14 @@ public class SortBatch extends AbstractRecordBatch<Sort> {
@Override
public IterOutcome innerNext() {
- if(schema != null){
- if(getSelectionVector4().next()){
+ if (schema != null) {
+ if (getSelectionVector4().next()) {
return IterOutcome.OK;
- }else{
+ } else {
return IterOutcome.NONE;
}
}
-
try{
outer: while (true) {
IterOutcome upstream = incoming.next();
@@ -114,13 +111,15 @@ public class SortBatch extends AbstractRecordBatch<Sort> {
return upstream;
case OK_NEW_SCHEMA:
// only change in the case that the schema truly changes. Artificial schema changes are ignored.
- if(!incoming.getSchema().equals(schema)){
- if (schema != null) throw new UnsupportedOperationException("Sort doesn't currently support sorts with changing schemas.");
+ if (!incoming.getSchema().equals(schema)) {
+ if (schema != null) {
+ throw new UnsupportedOperationException("Sort doesn't currently support sorts with changing schemas.");
+ }
this.schema = incoming.getSchema();
}
// fall through.
case OK:
- if(!builder.add(incoming)){
+ if (!builder.add(incoming)) {
throw new UnsupportedOperationException("Sort doesn't currently support doing an external sort.");
};
break;
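
The OK_NEW_SCHEMA arm above distinguishes a real schema change from an "artificial" one: if the incoming schema equals the recorded schema the event is ignored, while a genuinely different schema after the first is rejected. A compact sketch of that policy (Schema here is just a stand-in string, not Drill's BatchSchema):

    class SchemaGate {
      private String schema; // null until the first batch arrives

      void onNewSchema(String incoming) {
        // Artificial schema changes (same schema re-announced) are ignored.
        if (!incoming.equals(schema)) {
          if (schema != null) {
            throw new UnsupportedOperationException("changing schemas not supported");
          }
          schema = incoming;
        }
      }
    }
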
@@ -129,7 +128,7 @@ public class SortBatch extends AbstractRecordBatch<Sort> {
}
}
- if (schema == null || builder.isEmpty()){
+ if (schema == null || builder.isEmpty()) {
// builder may be null at this point if the first incoming batch is empty
return IterOutcome.NONE;
}
@@ -141,7 +140,7 @@ public class SortBatch extends AbstractRecordBatch<Sort> {
return IterOutcome.OK_NEW_SCHEMA;
- }catch(SchemaChangeException | ClassTransformationException | IOException ex){
+ } catch (SchemaChangeException | ClassTransformationException | IOException ex) {
kill(false);
logger.error("Failure during query", ex);
context.fail(ex);
@@ -167,11 +166,13 @@ public class SortBatch extends AbstractRecordBatch<Sort> {
ClassGenerator<Sorter> g = cg.getRoot();
g.setMappingSet(mainMapping);
- for(Ordering od : orderings){
+ for (Ordering od : orderings) {
// first, we rewrite the evaluation stack for each side of the comparison.
ErrorCollector collector = new ErrorCollectorImpl();
final LogicalExpression expr = ExpressionTreeMaterializer.materialize(od.getExpr(), batch, collector,context.getFunctionRegistry());
- if(collector.hasErrors()) throw new SchemaChangeException("Failure while materializing expression. " + collector.toErrorString());
+ if (collector.hasErrors()) {
+ throw new SchemaChangeException("Failure while materializing expression. " + collector.toErrorString());
+ }
g.setMappingSet(leftMapping);
HoldingContainer left = g.addExpr(expr, false);
g.setMappingSet(rightMapping);
@@ -183,7 +184,7 @@ public class SortBatch extends AbstractRecordBatch<Sort> {
HoldingContainer out = g.addExpr(fh, false);
JConditional jc = g.getEvalBlock()._if(out.getValue().ne(JExpr.lit(0)));
- if(od.getDirection() == Direction.ASCENDING){
+ if (od.getDirection() == Direction.ASCENDING) {
jc._then()._return(out.getValue());
}else{
jc._then()._return(out.getValue().minus());
@@ -193,8 +194,6 @@ public class SortBatch extends AbstractRecordBatch<Sort> {
g.getEvalBlock()._return(JExpr.lit(0));
return context.getImplementationClass(cg);
-
-
}
@Override
@@ -207,7 +206,4 @@ public class SortBatch extends AbstractRecordBatch<Sort> {
incoming.kill(sendUpstream);
}
-
-
-
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortRecordBatchBuilder.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortRecordBatchBuilder.java
index 80b4ef664..707c41c0d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortRecordBatchBuilder.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortRecordBatchBuilder.java
@@ -49,14 +49,14 @@ public class SortRecordBatchBuilder {
private SelectionVector4 sv4;
final PreAllocator svAllocator;
- public SortRecordBatchBuilder(BufferAllocator a, long maxBytes){
+ public SortRecordBatchBuilder(BufferAllocator a, long maxBytes) {
this.maxBytes = maxBytes;
this.svAllocator = a.getNewPreAllocator();
}
- private long getSize(VectorAccessible batch){
+ private long getSize(VectorAccessible batch) {
long bytes = 0;
- for(VectorWrapper<?> v : batch){
+ for (VectorWrapper<?> v : batch) {
bytes += v.getValueVector().getBufferSize();
}
return bytes;
@@ -68,8 +68,10 @@ public class SortRecordBatchBuilder {
* @return True if the requested add completed successfully. Returns false in the case that this builder is full and cannot receive additional packages.
* @throws SchemaChangeException
*/
- public boolean add(VectorAccessible batch){
- if(batch.getSchema().getSelectionVectorMode() == SelectionVectorMode.FOUR_BYTE) throw new UnsupportedOperationException("A sort cannot currently work against a sv4 batch.");
+ public boolean add(VectorAccessible batch) {
+ if (batch.getSchema().getSelectionVectorMode() == SelectionVectorMode.FOUR_BYTE) {
+ throw new UnsupportedOperationException("A sort cannot currently work against a sv4 batch.");
+ }
if (batch.getRecordCount() == 0 && batches.size() > 0) {
return true; // skip over empty record batches.
}
@@ -78,9 +80,15 @@ public class SortRecordBatchBuilder {
if (batchBytes == 0 && batches.size() > 0) {
return true;
}
- if(batchBytes + runningBytes > maxBytes) return false; // enough data memory.
- if(runningBatches+1 > Character.MAX_VALUE) return false; // allowed in batch.
- if(!svAllocator.preAllocate(batch.getRecordCount()*4)) return false; // sv allocation available.
+ if (batchBytes + runningBytes > maxBytes) {
+ return false; // not enough memory left to hold this batch's data.
+ }
+ if (runningBatches + 1 > Character.MAX_VALUE) {
+ return false; // would exceed the allowed number of batches.
+ }
+ if (!svAllocator.preAllocate(batch.getRecordCount() * 4)) {
+ return false; // no selection vector allocation available.
+ }
RecordBatchData bd = new RecordBatchData(batch);
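
add() above is a guarded admission test: a batch is accepted only if it fits the byte budget, keeps the batch count under Character.MAX_VALUE (the limit an SV4 batch index can address), and the selection-vector pre-allocation succeeds. A stand-alone sketch of that shape (budgets and names hypothetical):

    class BoundedBuilder {
      private static final int MAX_BATCHES = Character.MAX_VALUE;
      private final long maxBytes;
      private long runningBytes;
      private int runningBatches;

      BoundedBuilder(long maxBytes) {
        this.maxBytes = maxBytes;
      }

      // Returns false when the builder is full, mirroring the contract in the hunk above.
      boolean add(long batchBytes) {
        if (runningBytes + batchBytes > maxBytes) {
          return false; // byte budget exhausted
        }
        if (runningBatches + 1 > MAX_BATCHES) {
          return false; // batch-count limit reached
        }
        runningBytes += batchBytes;
        runningBatches++;
        return true;
      }
    }
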
@@ -126,15 +134,19 @@ public class SortRecordBatchBuilder {
}
}
- public boolean isEmpty(){
+ public boolean isEmpty() {
return batches.isEmpty();
}
public void build(FragmentContext context, VectorContainer outputContainer) throws SchemaChangeException{
outputContainer.clear();
- if(batches.keySet().size() > 1) throw new SchemaChangeException("Sort currently only supports a single schema.");
- if(batches.size() > Character.MAX_VALUE) throw new SchemaChangeException("Sort cannot work on more than %d batches at a time.", (int) Character.MAX_VALUE);
- if(batches.keys().size() < 1){
+ if (batches.keySet().size() > 1) {
+ throw new SchemaChangeException("Sort currently only supports a single schema.");
+ }
+ if (batches.size() > Character.MAX_VALUE) {
+ throw new SchemaChangeException("Sort cannot work on more than %d batches at a time.", (int) Character.MAX_VALUE);
+ }
+ if (batches.keys().size() < 1) {
assert false : "Invalid to have an empty set of batches with no schemas.";
}
sv4 = new SelectionVector4(svAllocator.getAllocation(), recordCount, Character.MAX_VALUE);
@@ -142,12 +154,12 @@ public class SortRecordBatchBuilder {
List<RecordBatchData> data = batches.get(schema);
// now we're going to generate the sv4 pointers
- switch(schema.getSelectionVectorMode()){
+ switch (schema.getSelectionVectorMode()) {
case NONE: {
int index = 0;
int recordBatchId = 0;
- for(RecordBatchData d : data){
- for(int i =0; i < d.getRecordCount(); i++, index++){
+ for (RecordBatchData d : data) {
+ for (int i = 0; i < d.getRecordCount(); i++, index++) {
sv4.set(index, recordBatchId, i);
}
recordBatchId++;
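
The NONE case above numbers every record of every buffered batch into the SelectionVector4: each entry carries a (batch id, record offset) pair, so sorting the SV4 later reorders records across batches without moving data. A plain-Java model of that encoding, packing the pair into one int the way a 4-byte selection entry conceptually does (the packing scheme here is illustrative, not necessarily Drill's exact layout):

    class Sv4Model {
      // Pack a (batchId, recordIndex) pair into one 4-byte entry:
      // high 16 bits = batch id, low 16 bits = record offset.
      static int entry(int batchId, int recordIndex) {
        return (batchId << 16) | (recordIndex & 0xFFFF);
      }

      public static void main(String[] args) {
        int[][] batches = {{7, 3}, {9, 1, 5}};
        int[] sv4 = new int[5];
        int index = 0;
        for (int batchId = 0; batchId < batches.length; batchId++) {
          for (int i = 0; i < batches[batchId].length; i++, index++) {
            sv4[index] = entry(batchId, i);
          }
        }
        // Resolve an entry back to its value.
        int e = sv4[3];
        System.out.println(batches[e >>> 16][e & 0xFFFF]); // prints 1
      }
    }
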
@@ -157,8 +169,8 @@ public class SortRecordBatchBuilder {
case TWO_BYTE: {
int index = 0;
int recordBatchId = 0;
- for(RecordBatchData d : data){
- for(int i =0; i < d.getRecordCount(); i++, index++){
+ for (RecordBatchData d : data) {
+ for (int i = 0; i < d.getRecordCount(); i++, index++) {
sv4.set(index, recordBatchId, (int) d.getSv2().getIndex(i));
}
// might as well drop the selection vector since we'll stop using it now.
@@ -173,13 +185,13 @@ public class SortRecordBatchBuilder {
// next, we'll create lists of each of the vector types.
ArrayListMultimap<MaterializedField, ValueVector> vectors = ArrayListMultimap.create();
- for(RecordBatchData rbd : batches.values()){
- for(ValueVector v : rbd.getVectors()){
+ for (RecordBatchData rbd : batches.values()) {
+ for (ValueVector v : rbd.getVectors()) {
vectors.put(v.getField(), v);
}
}
- for(MaterializedField f : schema){
+ for (MaterializedField f : schema) {
List<ValueVector> v = vectors.get(f);
outputContainer.addHyperList(v, false);
}
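
The last hunk groups every buffered ValueVector by its MaterializedField and then emits one hyper-list per field, so column N of every batch lands in the same list. The same grouping with plain JDK collections (Guava's ArrayListMultimap behaves like the map-of-lists below):

    import java.util.ArrayList;
    import java.util.LinkedHashMap;
    import java.util.List;
    import java.util.Map;

    class GroupByField {
      public static void main(String[] args) {
        // Each inner array is one batch: {vector-for-field-a, vector-for-field-b}.
        String[][] batches = {{"a0", "b0"}, {"a1", "b1"}};
        String[] fields = {"a", "b"};

        Map<String, List<String>> byField = new LinkedHashMap<>();
        for (String[] batch : batches) {
          for (int f = 0; f < fields.length; f++) {
            byField.computeIfAbsent(fields[f], k -> new ArrayList<>()).add(batch[f]);
          }
        }
        System.out.println(byField); // {a=[a0, a1], b=[b0, b1]}
      }
    }
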
@@ -191,11 +203,13 @@ public class SortRecordBatchBuilder {
return sv4;
}
- public void clear(){
- for(RecordBatchData d : batches.values()){
+ public void clear() {
+ for (RecordBatchData d : batches.values()) {
d.container.clear();
}
- if(sv4 != null) sv4.clear();
+ if (sv4 != null) {
+ sv4.clear();
+ }
}
public List<VectorContainer> getHeldRecordBatches() {
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/trace/TraceRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/trace/TraceRecordBatch.java
index 609cb29bb..6d909623a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/trace/TraceRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/trace/TraceRecordBatch.java
@@ -88,10 +88,11 @@ public class TraceRecordBatch extends AbstractSingleRecordBatch<Trace> {
@Override
public int getRecordCount() {
- if (sv == null)
+ if (sv == null) {
return incoming.getRecordCount();
- else
+ } else {
return sv.getCount();
+ }
}
/**
@@ -125,8 +126,9 @@ public class TraceRecordBatch extends AbstractSingleRecordBatch<Trace> {
@Override
protected void setupNewSchema() throws SchemaChangeException {
/* Trace operator does not deal with hyper vectors yet */
- if (incoming.getSchema().getSelectionVectorMode() == SelectionVectorMode.FOUR_BYTE)
+ if (incoming.getSchema().getSelectionVectorMode() == SelectionVectorMode.FOUR_BYTE) {
throw new SchemaChangeException("Trace operator does not work with hyper vectors");
+ }
/*
* we have a new schema, clear our existing container to load the new value vectors
@@ -152,8 +154,9 @@ public class TraceRecordBatch extends AbstractSingleRecordBatch<Trace> {
@Override
public void cleanup() {
/* Release the selection vector */
- if (sv != null)
+ if (sv != null) {
sv.clear();
+ }
/* Close the file descriptors */
try {
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/IteratorValidatorBatchIterator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/IteratorValidatorBatchIterator.java
index 0e69bcf71..171d12c9c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/IteratorValidatorBatchIterator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/IteratorValidatorBatchIterator.java
@@ -111,7 +111,9 @@ public class IteratorValidatorBatchIterator implements RecordBatch {
@Override
public IterOutcome next() {
- if(state == IterOutcome.NONE ) throw new IllegalStateException("The incoming iterator has previously moved to a state of NONE. You should not be attempting to call next() again.");
+ if (state == IterOutcome.NONE) {
+ throw new IllegalStateException("The incoming iterator has previously moved to a state of NONE. You should not be attempting to call next() again.");
+ }
state = incoming.next();
if (first && state == IterOutcome.NONE) {
throw new IllegalStateException("The incoming iterator returned a state of NONE on the first batch. There should always be at least one batch output before returning NONE");
@@ -119,14 +121,16 @@ public class IteratorValidatorBatchIterator implements RecordBatch {
if (first && state == IterOutcome.OK) {
throw new IllegalStateException("The incoming iterator returned a state of OK on the first batch. There should always be a new schema on the first batch. Incoming: " + incoming.getClass().getName());
}
- if (first) first = !first;
+ if (first) {
+ first = !first;
+ }
- if(state == IterOutcome.OK || state == IterOutcome.OK_NEW_SCHEMA) {
+ if (state == IterOutcome.OK || state == IterOutcome.OK_NEW_SCHEMA) {
BatchSchema schema = incoming.getSchema();
- if(schema.getFieldCount() == 0){
+ if (schema.getFieldCount() == 0) {
throw new IllegalStateException ("Incoming batch has an empty schema. This is not allowed.");
}
- if(incoming.getRecordCount() > MAX_BATCH_SIZE){
+ if (incoming.getRecordCount() > MAX_BATCH_SIZE) {
throw new IllegalStateException (String.format("Incoming batch of %s has size %d, which is beyond the limit of %d", incoming.getClass().getName(), incoming.getRecordCount(), MAX_BATCH_SIZE));
}
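
The validator above enforces the RecordBatch iterator contract: next() must not be called again after NONE, the first batch must not be NONE, and the first batch must announce a schema (OK_NEW_SCHEMA rather than OK). A reduced model of that state machine (enum and messages are simplified stand-ins):

    class IterValidator {
      enum Outcome { OK, OK_NEW_SCHEMA, NONE }

      private Outcome state;
      private boolean first = true;

      Outcome validate(Outcome incoming) {
        if (state == Outcome.NONE) {
          throw new IllegalStateException("next() called after NONE");
        }
        if (first && incoming == Outcome.NONE) {
          throw new IllegalStateException("first batch must not be NONE");
        }
        if (first && incoming == Outcome.OK) {
          throw new IllegalStateException("first batch must announce a schema");
        }
        first = false;
        state = incoming;
        return incoming;
      }
    }
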
@@ -157,4 +161,5 @@ public class IteratorValidatorBatchIterator implements RecordBatch {
public VectorContainer getOutgoingContainer() {
throw new UnsupportedOperationException(String.format(" You should not call getOutgoingContainer() for class %s", this.getClass().getCanonicalName()));
}
+
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/IteratorValidatorInjector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/IteratorValidatorInjector.java
index 428f335a5..2f7f531e8 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/IteratorValidatorInjector.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/IteratorValidatorInjector.java
@@ -36,10 +36,11 @@ public class IteratorValidatorInjector extends
IteratorValidatorInjector inject = new IteratorValidatorInjector();
PhysicalOperator newOp = root.accept(inject, context);
- if( !(newOp instanceof FragmentRoot) ) throw new IllegalStateException("This shouldn't happen.");
+ if (!(newOp instanceof FragmentRoot)) {
+ throw new IllegalStateException("This shouldn't happen.");
+ }
return (FragmentRoot) newOp;
-
}
/**
@@ -67,12 +68,11 @@ public class IteratorValidatorInjector extends
}
/* Inject trace operator */
- if (newChildren.size() > 0){
+ if (newChildren.size() > 0) {
newOp = op.getNewWithChildren(newChildren);
newOp.setOperatorId(op.getOperatorId());
}
-
return newOp;
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/BatchGroup.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/BatchGroup.java
index 237007046..9359ea188 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/BatchGroup.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/BatchGroup.java
@@ -143,14 +143,24 @@ public class BatchGroup implements VectorAccessible {
}
public void cleanup() throws IOException {
- if (sv2 != null) sv2.clear();
- if (outputStream != null) outputStream.close();
- if (inputStream != null) inputStream.close();
- if (fs != null && fs.exists(path)) fs.delete(path, false);
+ if (sv2 != null) {
+ sv2.clear();
+ }
+ if (outputStream != null) {
+ outputStream.close();
+ }
+ if (inputStream != null) {
+ inputStream.close();
+ }
+ if (fs != null && fs.exists(path)) {
+ fs.delete(path, false);
+ }
}
public void closeOutputStream() throws IOException {
- if (outputStream != null) outputStream.close();
+ if (outputStream != null) {
+ outputStream.close();
+ }
}
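
cleanup() above closes each resource behind its own null check, but the closes run sequentially, so an IOException from outputStream.close() would skip the later ones. A hedged alternative sketch that still closes everything and rethrows the first failure (plain java.io, not Drill code):

    import java.io.Closeable;
    import java.io.IOException;

    class SafeCleanup {
      static void closeAll(Closeable... resources) throws IOException {
        IOException first = null;
        for (Closeable c : resources) {
          if (c == null) {
            continue; // mirrors the null guards in the hunk above
          }
          try {
            c.close();
          } catch (IOException e) {
            if (first == null) {
              first = e; // remember the first failure, keep closing the rest
            }
          }
        }
        if (first != null) {
          throw first;
        }
      }
    }
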
@Override
@@ -181,4 +191,5 @@ public class BatchGroup implements VectorAccessible {
public Iterator<VectorWrapper<?>> iterator() {
return currentContainer.iterator();
}
+
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java
index 505f56745..52249e9e7 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java
@@ -192,12 +192,12 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
@Override
public IterOutcome innerNext() {
- if(schema != null){
+ if (schema != null) {
if (spillCount == 0) {
- if(schema != null){
- if(getSelectionVector4().next()){
+ if (schema != null) {
+ if (getSelectionVector4().next()) {
return IterOutcome.OK;
- }else{
+ } else {
return IterOutcome.NONE;
}
}
@@ -206,12 +206,12 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
w.start();
// int count = selector.next();
int count = copier.next(targetRecordCount);
- if(count > 0){
+ if (count > 0) {
long t = w.elapsed(TimeUnit.MICROSECONDS);
logger.debug("Took {} us to merge {} records", t, count);
container.setRecordCount(count);
return IterOutcome.OK;
- }else{
+ } else {
logger.debug("copier returned 0 records");
return IterOutcome.NONE;
}
@@ -236,8 +236,10 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
return upstream;
case OK_NEW_SCHEMA:
// only change in the case that the schema truly changes. Artificial schema changes are ignored.
- if(!incoming.getSchema().equals(schema)){
- if (schema != null) throw new UnsupportedOperationException("Sort doesn't currently support sorts with changing schemas.");
+ if (!incoming.getSchema().equals(schema)) {
+ if (schema != null) {
+ throw new UnsupportedOperationException("Sort doesn't currently support sorts with changing schemas.");
+ }
this.schema = incoming.getSchema();
this.sorter = createNewSorter(context, incoming);
}
@@ -249,7 +251,9 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
}
break;
}
- if (first) first = false;
+ if (first) {
+ first = false;
+ }
totalSizeInMemory += getBufferSize(incoming);
SelectionVector2 sv2;
if (incoming.getSchema().getSelectionVectorMode() == BatchSchema.SelectionVectorMode.TWO_BYTE) {
@@ -291,7 +295,9 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
break;
case OUT_OF_MEMORY:
highWaterMark = totalSizeInMemory;
- if (batchesSinceLastSpill > 2) mergeAndSpill();
+ if (batchesSinceLastSpill > 2) {
+ mergeAndSpill();
+ }
batchesSinceLastSpill = 0;
break;
default:
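
The OUT_OF_MEMORY arm above records a high-water mark and spills only if more than two batches arrived since the last spill, so back-to-back memory events do not trigger redundant spills. A small sketch of that throttling rule (threshold and method names hypothetical):

    class SpillThrottle {
      private int batchesSinceLastSpill;
      private long highWaterMark;

      void onBatch() {
        batchesSinceLastSpill++;
      }

      void onOutOfMemory(long totalSizeInMemory) {
        highWaterMark = totalSizeInMemory;
        // Only spill if enough new batches accumulated since the last spill.
        if (batchesSinceLastSpill > 2) {
          spill();
        }
        batchesSinceLastSpill = 0;
      }

      private void spill() {
        System.out.println("merge and spill to disk at high-water mark " + highWaterMark);
      }
    }
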
@@ -348,7 +354,7 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
return IterOutcome.OK_NEW_SCHEMA;
- }catch(SchemaChangeException | ClassTransformationException | IOException ex){
+ } catch (SchemaChangeException | ClassTransformationException | IOException ex) {
kill(false);
logger.error("Failure during query", ex);
context.fail(ex);
@@ -502,11 +508,13 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
ClassGenerator<MSorter> g = cg.getRoot();
g.setMappingSet(mainMapping);
- for(Ordering od : orderings){
+ for (Ordering od : orderings) {
// first, we rewrite the evaluation stack for each side of the comparison.
ErrorCollector collector = new ErrorCollectorImpl();
final LogicalExpression expr = ExpressionTreeMaterializer.materialize(od.getExpr(), batch, collector, context.getFunctionRegistry());
- if(collector.hasErrors()) throw new SchemaChangeException("Failure while materializing expression. " + collector.toErrorString());
+ if (collector.hasErrors()) {
+ throw new SchemaChangeException("Failure while materializing expression. " + collector.toErrorString());
+ }
g.setMappingSet(leftMapping);
HoldingContainer left = g.addExpr(expr, false);
g.setMappingSet(rightMapping);
@@ -518,7 +526,7 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
HoldingContainer out = g.addExpr(fh, false);
JConditional jc = g.getEvalBlock()._if(out.getValue().ne(JExpr.lit(0)));
- if(od.getDirection() == Direction.ASCENDING){
+ if (od.getDirection() == Direction.ASCENDING) {
jc._then()._return(out.getValue());
}else{
jc._then()._return(out.getValue().minus());
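
The code-generation loop above emits, per ordering key, a comparison whose non-zero result is returned as-is for ASCENDING and negated for DESCENDING, falling through to the next key on ties. Written by hand instead of with the code model, the emitted logic is roughly the following (a sketch over int columns, not the generated Drill class):

    import java.util.Comparator;
    import java.util.List;

    class MultiKeyComparator implements Comparator<int[]> {
      private final List<Integer> keys;      // column index per ordering
      private final List<Boolean> ascending; // direction per ordering

      MultiKeyComparator(List<Integer> keys, List<Boolean> ascending) {
        this.keys = keys;
        this.ascending = ascending;
      }

      @Override
      public int compare(int[] left, int[] right) {
        for (int k = 0; k < keys.size(); k++) {
          int out = Integer.compare(left[keys.get(k)], right[keys.get(k)]);
          if (out != 0) {
            // ASCENDING returns the raw comparison; DESCENDING returns its negation.
            return ascending.get(k) ? out : -out;
          }
        }
        return 0; // all keys equal: fall through past every ordering
      }
    }
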
@@ -547,11 +555,13 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
private void generateComparisons(ClassGenerator g, VectorAccessible batch) throws SchemaChangeException {
g.setMappingSet(MAIN_MAPPING);
- for(Ordering od : popConfig.getOrderings()){
+ for (Ordering od : popConfig.getOrderings()) {
// first, we rewrite the evaluation stack for each side of the comparison.
ErrorCollector collector = new ErrorCollectorImpl();
final LogicalExpression expr = ExpressionTreeMaterializer.materialize(od.getExpr(), batch, collector,context.getFunctionRegistry());
- if(collector.hasErrors()) throw new SchemaChangeException("Failure while materializing expression. " + collector.toErrorString());
+ if (collector.hasErrors()) {
+ throw new SchemaChangeException("Failure while materializing expression. " + collector.toErrorString());
+ }
g.setMappingSet(LEFT_MAPPING);
HoldingContainer left = g.addExpr(expr, false);
g.setMappingSet(RIGHT_MAPPING);
@@ -563,7 +573,7 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
HoldingContainer out = g.addExpr(fh, false);
JConditional jc = g.getEvalBlock()._if(out.getValue().ne(JExpr.lit(0)));
- if(od.getDirection() == Direction.ASCENDING){
+ if (od.getDirection() == Direction.ASCENDING) {
jc._then()._return(out.getValue());
}else{
jc._then()._return(out.getValue().minus());
@@ -590,7 +600,7 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
}
List<VectorAllocator> allocators = Lists.newArrayList();
- for(VectorWrapper<?> i : batch){
+ for (VectorWrapper<?> i : batch) {
ValueVector v = TypeHelper.getNewVector(i.getField(), copierAllocator);
outputContainer.add(v);
allocators.add(VectorAllocator.getAllocator(v, 110));
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/MSortTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/MSortTemplate.java
index df79b1acb..3fd744ff8 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/MSortTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/MSortTemplate.java
@@ -84,7 +84,7 @@ public abstract class MSortTemplate implements MSorter, IndexedSortable{
while (l < rightStart) {
aux.set(o++, vector4.get(l++));
}
- while (r < rightEnd){
+ while (r < rightEnd) {
aux.set(o++, vector4.get(r++));
}
assert o == outStart + (rightEnd - leftStart);
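
The loops above are the tail of a classic two-run merge: once the main compare loop exhausts one run, whatever remains of the other run is copied straight into the auxiliary vector. A stand-alone merge showing the same shape over int arrays:

    class TwoRunMerge {
      // Merge a[leftStart..rightStart) and a[rightStart..rightEnd) into aux.
      static void merge(int[] a, int[] aux, int leftStart, int rightStart, int rightEnd) {
        int l = leftStart, r = rightStart, o = leftStart;
        while (l < rightStart && r < rightEnd) {
          aux[o++] = a[l] <= a[r] ? a[l++] : a[r++];
        }
        // Tail copies, as in the hunk above: drain whichever run is left.
        while (l < rightStart) {
          aux[o++] = a[l++];
        }
        while (r < rightEnd) {
          aux[o++] = a[r++];
        }
        assert o == rightEnd;
      }
    }
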
@@ -97,7 +97,7 @@ public abstract class MSortTemplate implements MSorter, IndexedSortable{
}
@Override
- public void sort(VectorContainer container){
+ public void sort(VectorContainer container) {
Stopwatch watch = new Stopwatch();
watch.start();
while (runStarts.size() > 1) {
@@ -109,9 +109,13 @@ public abstract class MSortTemplate implements MSorter, IndexedSortable{
int left = runStarts.poll();
int right = runStarts.poll();
Integer end = runStarts.peek();
- if (end == null) end = vector4.getTotalCount();
+ if (end == null) {
+ end = vector4.getTotalCount();
+ }
outIndex = merge(left, right, end, outIndex);
- if (outIndex < vector4.getTotalCount()) newRunStarts.add(outIndex);
+ if (outIndex < vector4.getTotalCount()) {
+ newRunStarts.add(outIndex);
+ }
}
if (outIndex < vector4.getTotalCount()) {
copyRun(outIndex, vector4.getTotalCount());