diff options
author | Aditya Kishore <aditya@maprtech.com> | 2014-09-11 10:43:08 -0700 |
---|---|---|
committer | Aditya Kishore <aditya@maprtech.com> | 2014-09-11 19:25:28 -0700 |
commit | 676f5df6b14b10ccc3603360e0efee9c745c5b97 (patch) | |
tree | 592b02f84e8a6da2ace67f8e6c0e46d4237af20b /exec/java-exec/src/main/java/org/apache/drill/exec/physical | |
parent | 7ae257c42b2eb4e1db778dca9ba64e2516078b38 (diff) |
DRILL-1402: Add check-style rules for trailing space, TABs and blocks without braces
Diffstat (limited to 'exec/java-exec/src/main/java/org/apache/drill/exec/physical')
42 files changed, 723 insertions, 471 deletions
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractBase.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractBase.java index e54e67c3d..defb4e4af 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractBase.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractBase.java @@ -33,8 +33,10 @@ public abstract class AbstractBase implements PhysicalOperator{ @Override public void accept(GraphVisitor<PhysicalOperator> visitor) { visitor.enter(this); - if(this.iterator() == null) throw new IllegalArgumentException("Null iterator for pop." + this); - for(PhysicalOperator o : this){ + if (this.iterator() == null) { + throw new IllegalArgumentException("Null iterator for pop." + this); + } + for (PhysicalOperator o : this) { Preconditions.checkNotNull(o, String.format("Null in iterator for pop %s.", this)); o.accept(visitor); } @@ -46,7 +48,7 @@ public abstract class AbstractBase implements PhysicalOperator{ return true; } - public final void setOperatorId(int id){ + public final void setOperatorId(int id) { this.id = id; } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractPhysicalVisitor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractPhysicalVisitor.java index 9e7beec47..48b38011f 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractPhysicalVisitor.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractPhysicalVisitor.java @@ -90,7 +90,7 @@ public abstract class AbstractPhysicalVisitor<T, X, E extends Throwable> impleme @Override public T visitHashAggregate(HashAggregate agg, X value) throws E { - return visitOp(agg, value); + return visitOp(agg, value); } @Override @@ -120,7 +120,7 @@ public abstract class AbstractPhysicalVisitor<T, X, E extends Throwable> impleme public T visitChildren(PhysicalOperator op, X value) throws E{ - for(PhysicalOperator child : op){ + for (PhysicalOperator child : op) { child.accept(this, value); } return null; diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/Screen.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/Screen.java index 5f0648da4..980b413de 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/Screen.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/Screen.java @@ -60,7 +60,9 @@ public class Screen extends AbstractStore { public void applyAssignments(List<DrillbitEndpoint> endpoints) throws PhysicalOperatorSetupException { // we actually don't have to do anything since nothing should have changed. we'll check just check that things // didn't get screwed up. - if (endpoints.size() != 1) throw new PhysicalOperatorSetupException("A Screen operator can only be assigned to a single node."); + if (endpoints.size() != 1) { + throw new PhysicalOperatorSetupException("A Screen operator can only be assigned to a single node."); + } DrillbitEndpoint endpoint = endpoints.iterator().next(); // logger.debug("Endpoint this: {}, assignment: {}", this.endpoint, endpoint); if (!endpoint.equals(this.endpoint)) { diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/SingleMergeExchange.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/SingleMergeExchange.java index 26d881dc2..f6e11c479 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/SingleMergeExchange.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/SingleMergeExchange.java @@ -63,8 +63,9 @@ public class SingleMergeExchange extends AbstractExchange { protected void setupReceivers(List<CoordinationProtos.DrillbitEndpoint> receiverLocations) throws PhysicalOperatorSetupException { - if (receiverLocations.size() != 1) + if (receiverLocations.size() != 1) { throw new PhysicalOperatorSetupException("SingleMergeExchange only supports a single receiver endpoint"); + } receiverLocation = receiverLocations.iterator().next(); } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/UnionExchange.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/UnionExchange.java index cafdbdd47..bf2b4a150 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/UnionExchange.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/UnionExchange.java @@ -48,7 +48,9 @@ public class UnionExchange extends AbstractExchange{ @Override protected void setupReceivers(List<DrillbitEndpoint> receiverLocations) throws PhysicalOperatorSetupException { - if(receiverLocations.size() != 1) throw new PhysicalOperatorSetupException("A Union Exchange only supports a single receiver endpoint."); + if (receiverLocations.size() != 1) { + throw new PhysicalOperatorSetupException("A Union Exchange only supports a single receiver endpoint."); + } this.destinationLocation = receiverLocations.iterator().next(); } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ImplCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ImplCreator.java index 7f9762415..e25f1c08e 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ImplCreator.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ImplCreator.java @@ -41,9 +41,9 @@ public class ImplCreator extends AbstractPhysicalVisitor<RecordBatch, FragmentCo private RootExec root = null; - private ImplCreator(){} + private ImplCreator() {} - private RootExec getRoot(){ + private RootExec getRoot() { return root; } @@ -78,7 +78,7 @@ public class ImplCreator extends AbstractPhysicalVisitor<RecordBatch, FragmentCo public static RootExec getExec(FragmentContext context, FragmentRoot root) throws ExecutionSetupException { ImplCreator i = new ImplCreator(); - if(AssertionUtil.isAssertionsEnabled()){ + if (AssertionUtil.isAssertionsEnabled()) { root = IteratorValidatorInjector.rewritePlanWithIteratorValidator(context, root); } @@ -86,9 +86,11 @@ public class ImplCreator extends AbstractPhysicalVisitor<RecordBatch, FragmentCo watch.start(); root.accept(i, context); logger.debug("Took {} ms to accept", watch.elapsed(TimeUnit.MILLISECONDS)); - if (i.root == null) + if (i.root == null) { throw new ExecutionSetupException( "The provided fragment did not have a root node that correctly created a RootExec value."); + } return i.getRoot(); } + } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/OperatorCreatorRegistry.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/OperatorCreatorRegistry.java index 8c768e508..82a9a6364 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/OperatorCreatorRegistry.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/OperatorCreatorRegistry.java @@ -42,7 +42,9 @@ public class OperatorCreatorRegistry { public synchronized Object getOperatorCreator(Class<?> operator) throws ExecutionSetupException { Object opCreator = instanceRegistry.get(operator); - if (opCreator != null) return opCreator; + if (opCreator != null) { + return opCreator; + } Constructor<?> c = constructorRegistry.get(operator); if(c == null) { @@ -75,9 +77,9 @@ public class OperatorCreatorRegistry { Type[] args = ((ParameterizedType)iface).getActualTypeArguments(); interfaceFound = true; boolean constructorFound = false; - for(Constructor<?> constructor : operatorClass.getConstructors()){ + for (Constructor<?> constructor : operatorClass.getConstructors()) { Class<?>[] params = constructor.getParameterTypes(); - if(params.length == 0){ + if (params.length == 0) { Constructor<?> old = constructorRegistry.put((Class<?>) args[0], constructor); if (old != null) { throw new RuntimeException( @@ -88,7 +90,7 @@ public class OperatorCreatorRegistry { constructorFound = true; } } - if(!constructorFound){ + if (!constructorFound) { logger.debug("Skipping registration of OperatorCreator {} as it doesn't have a default constructor", operatorClass.getCanonicalName()); } @@ -97,4 +99,5 @@ public class OperatorCreatorRegistry { } } } + } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java index c2a03b9d4..2712e2735 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java @@ -83,8 +83,9 @@ public class ScanBatch implements RecordBatch { public ScanBatch(PhysicalOperator subScanConfig, FragmentContext context, Iterator<RecordReader> readers, List<String[]> partitionColumns, List<Integer> selectedPartitionColumns) throws ExecutionSetupException { this.context = context; this.readers = readers; - if (!readers.hasNext()) + if (!readers.hasNext()) { throw new ExecutionSetupException("A scan batch must contain at least one reader."); + } this.currentReader = readers.next(); this.oContext = new OperatorContext(subScanConfig, context); this.currentReader.setOperatorContext(this.oContext); @@ -121,7 +122,7 @@ public class ScanBatch implements RecordBatch { @Override public void kill(boolean sendUpstream) { - if(currentReader != null){ + if (currentReader != null) { currentReader.cleanup(); } @@ -220,8 +221,8 @@ public class ScanBatch implements RecordBatch { private void addPartitionVectors() throws ExecutionSetupException{ try { - if(partitionVectors != null){ - for(ValueVector v : partitionVectors){ + if (partitionVectors != null) { + for (ValueVector v : partitionVectors) { v.clear(); } } @@ -290,7 +291,9 @@ public class ScanBatch implements RecordBatch { if (v == null || v.getClass() != clazz) { // Field does not exist add it to the map and the output container v = TypeHelper.getNewVector(field, oContext.getAllocator()); - if(!clazz.isAssignableFrom(v.getClass())) throw new SchemaChangeException(String.format("The class that was provided %s does not correspond to the expected vector type of %s.", clazz.getSimpleName(), v.getClass().getSimpleName())); + if (!clazz.isAssignableFrom(v.getClass())) { + throw new SchemaChangeException(String.format("The class that was provided %s does not correspond to the expected vector type of %s.", clazz.getSimpleName(), v.getClass().getSimpleName())); + } container.add(v); fieldVectorMap.put(field.key(), v); @@ -342,9 +345,9 @@ public class ScanBatch implements RecordBatch { return WritableBatch.get(this); } - public void cleanup(){ + public void cleanup() { container.clear(); - for(ValueVector v : partitionVectors){ + for (ValueVector v : partitionVectors) { v.clear(); } fieldVectorMap.clear(); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SingleSenderCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SingleSenderCreator.java index 2b7fdf3b6..352deaea6 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SingleSenderCreator.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SingleSenderCreator.java @@ -79,7 +79,7 @@ public class SingleSenderCreator implements RootCreator<SingleSender>{ @Override public boolean innerNext() { - if(!ok){ + if (!ok) { incoming.kill(false); return false; @@ -93,7 +93,7 @@ public class SingleSenderCreator implements RootCreator<SingleSender>{ out = IterOutcome.NONE; } // logger.debug("Outcome of sender next {}", out); - switch(out){ + switch (out) { case STOP: case NONE: FragmentWritableBatch b2 = FragmentWritableBatch.getEmptyLastWithSchema(handle.getQueryId(), handle.getMajorFragmentId(), @@ -158,7 +158,9 @@ public class SingleSenderCreator implements RootCreator<SingleSender>{ @Override public void success(Ack value, ByteBuf buf) { sendCount.decrement(); - if(value.getOk()) return; + if (value.getOk()) { + return; + } logger.error("Downstream fragment was not accepted. Stopping future sends."); // if we didn't get ack ok, we'll need to kill the query. @@ -170,5 +172,4 @@ public class SingleSenderCreator implements RootCreator<SingleSender>{ } - } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java index 6eede30dc..473e3a3f3 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java @@ -132,10 +132,10 @@ public class TopNBatch extends AbstractRecordBatch<TopN> { @Override public IterOutcome innerNext() { - if(schema != null){ - if(getSelectionVector4().next()){ + if (schema != null) { + if (getSelectionVector4().next()) { return IterOutcome.OK; - }else{ + } else { return IterOutcome.NONE; } } @@ -156,8 +156,10 @@ public class TopNBatch extends AbstractRecordBatch<TopN> { return upstream; case OK_NEW_SCHEMA: // only change in the case that the schema truly changes. Artificial schema changes are ignored. - if(!incoming.getSchema().equals(schema)){ - if (schema != null) throw new UnsupportedOperationException("Sort doesn't currently support sorts with changing schemas."); + if (!incoming.getSchema().equals(schema)) { + if (schema != null) { + throw new UnsupportedOperationException("Sort doesn't currently support sorts with changing schemas."); + } this.schema = incoming.getSchema(); } // fall through. @@ -181,7 +183,7 @@ public class TopNBatch extends AbstractRecordBatch<TopN> { } } - if (schema == null){ + if (schema == null) { // builder may be null at this point if the first incoming batch is empty return IterOutcome.NONE; } @@ -196,7 +198,7 @@ public class TopNBatch extends AbstractRecordBatch<TopN> { return IterOutcome.OK_NEW_SCHEMA; - }catch(SchemaChangeException | ClassTransformationException | IOException ex){ + } catch(SchemaChangeException | ClassTransformationException | IOException ex) { kill(false); logger.error("Failure during query", ex); context.fail(ex); @@ -215,7 +217,7 @@ public class TopNBatch extends AbstractRecordBatch<TopN> { if (copier == null) { copier = RemovingRecordBatch.getGenerated4Copier(batch, context, oContext.getAllocator(), newContainer, newBatch); } else { - for(VectorWrapper<?> i : batch){ + for (VectorWrapper<?> i : batch) { ValueVector v = TypeHelper.getNewVector(i.getField(), oContext.getAllocator()); newContainer.add(v); @@ -227,7 +229,7 @@ public class TopNBatch extends AbstractRecordBatch<TopN> { int count = selectionVector4.getCount(); int copiedRecords = copier.copyRecords(0, count); assert copiedRecords == count; - for(VectorWrapper<?> v : newContainer){ + for (VectorWrapper<?> v : newContainer) { ValueVector.Mutator m = v.getValueVector().getMutator(); m.setValueCount(count); } @@ -253,11 +255,13 @@ public class TopNBatch extends AbstractRecordBatch<TopN> { ClassGenerator<PriorityQueue> g = cg.getRoot(); g.setMappingSet(mainMapping); - for(Ordering od : orderings){ + for (Ordering od : orderings) { // first, we rewrite the evaluation stack for each side of the comparison. ErrorCollector collector = new ErrorCollectorImpl(); final LogicalExpression expr = ExpressionTreeMaterializer.materialize(od.getExpr(), batch, collector, context.getFunctionRegistry()); - if(collector.hasErrors()) throw new SchemaChangeException("Failure while materializing expression. " + collector.toErrorString()); + if (collector.hasErrors()) { + throw new SchemaChangeException("Failure while materializing expression. " + collector.toErrorString()); + } g.setMappingSet(leftMapping); HoldingContainer left = g.addExpr(expr, false); g.setMappingSet(rightMapping); @@ -269,9 +273,9 @@ public class TopNBatch extends AbstractRecordBatch<TopN> { HoldingContainer out = g.addExpr(fh, false); JConditional jc = g.getEvalBlock()._if(out.getValue().ne(JExpr.lit(0))); - if(od.getDirection() == Direction.ASCENDING){ + if (od.getDirection() == Direction.ASCENDING) { jc._then()._return(out.getValue()); - }else{ + } else { jc._then()._return(out.getValue().minus()); } g.rotateBlock(); @@ -377,5 +381,4 @@ public class TopNBatch extends AbstractRecordBatch<TopN> { } - } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TraceInjector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TraceInjector.java index 58dd247e0..92d1882eb 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TraceInjector.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TraceInjector.java @@ -82,10 +82,12 @@ public class TraceInjector extends AbstractPhysicalVisitor<PhysicalOperator, Fra } /* Inject trace operator */ - if (list.size() > 0) - newOp = op.getNewWithChildren(list); - newOp.setOperatorId(op.getOperatorId()); + if (list.size() > 0) { + newOp = op.getNewWithChildren(list); + } + newOp.setOperatorId(op.getOperatorId()); return newOp; } + } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WriterRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WriterRecordBatch.java index 99eeed374..8c1a4c07b 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WriterRecordBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WriterRecordBatch.java @@ -82,8 +82,9 @@ public class WriterRecordBatch extends AbstractRecordBatch<Writer> { IterOutcome upstream; do { upstream = next(incoming); - if(first && upstream == IterOutcome.OK) + if(first && upstream == IterOutcome.OK) { upstream = IterOutcome.OK_NEW_SCHEMA; + } first = false; switch(upstream) { @@ -91,14 +92,15 @@ public class WriterRecordBatch extends AbstractRecordBatch<Writer> { case NONE: case STOP: cleanup(); - if (upstream == IterOutcome.STOP) + if (upstream == IterOutcome.STOP) { return upstream; + } break; case OK_NEW_SCHEMA: try{ setupNewSchema(); - }catch(Exception ex){ + } catch(Exception ex) { kill(false); logger.error("Failure during query", ex); context.fail(ex); @@ -113,9 +115,9 @@ public class WriterRecordBatch extends AbstractRecordBatch<Writer> { throw new RuntimeException(ex); } - for(VectorWrapper v : incoming) + for(VectorWrapper v : incoming) { v.getValueVector().clear(); - + } break; default: @@ -176,4 +178,5 @@ public class WriterRecordBatch extends AbstractRecordBatch<Writer> { throw new RuntimeException("Failed to close RecordWriter", ex); } } + } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java index e9be2ac99..c5228709d 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java @@ -82,7 +82,9 @@ public class HashAggBatch extends AbstractRecordBatch<HashAggregate> { @Override public int getRecordCount() { - if(done) return 0; + if (done) { + return 0; + } return aggregator.getOutputCount(); } @@ -102,7 +104,7 @@ public class HashAggBatch extends AbstractRecordBatch<HashAggregate> { case STOP: return outcome; case OK_NEW_SCHEMA: - if (!createAggregator()){ + if (!createAggregator()) { done = true; return IterOutcome.STOP; } @@ -131,10 +133,10 @@ public class HashAggBatch extends AbstractRecordBatch<HashAggregate> { logger.debug("Starting aggregator doWork; incoming record count = {} ", incoming.getRecordCount()); - while(true){ + while (true) { AggOutcome out = aggregator.doWork(); logger.debug("Aggregator response {}, records {}", out, aggregator.getOutputCount()); - switch(out){ + switch (out) { case CLEANUP_AND_RETURN: container.zeroVectors(); aggregator.cleanup(); @@ -150,7 +152,7 @@ public class HashAggBatch extends AbstractRecordBatch<HashAggregate> { return aggregator.getOutcome(); case UPDATE_AGGREGATOR: aggregator = null; - if(!createAggregator()){ + if (!createAggregator()) { return IterOutcome.STOP; } continue; @@ -168,23 +170,23 @@ public class HashAggBatch extends AbstractRecordBatch<HashAggregate> { */ private boolean createAggregator() { logger.debug("Creating new aggregator."); - try{ + try { stats.startSetup(); this.aggregator = createAggregatorInternal(); return true; - }catch(SchemaChangeException | ClassTransformationException | IOException ex){ + } catch (SchemaChangeException | ClassTransformationException | IOException ex) { context.fail(ex); container.clear(); incoming.kill(false); return false; - }finally{ + } finally { stats.stopSetup(); } } private HashAggregator createAggregatorInternal() throws SchemaChangeException, ClassTransformationException, IOException{ - CodeGenerator<HashAggregator> top = CodeGenerator.get(HashAggregator.TEMPLATE_DEFINITION, context.getFunctionRegistry()); - ClassGenerator<HashAggregator> cg = top.getRoot(); + CodeGenerator<HashAggregator> top = CodeGenerator.get(HashAggregator.TEMPLATE_DEFINITION, context.getFunctionRegistry()); + ClassGenerator<HashAggregator> cg = top.getRoot(); ClassGenerator<HashAggregator> cgInner = cg.getInnerGenerator("BatchHolder"); container.clear(); @@ -199,10 +201,12 @@ public class HashAggBatch extends AbstractRecordBatch<HashAggregate> { int i; - for(i = 0; i < numGroupByExprs; i++) { + for (i = 0; i < numGroupByExprs; i++) { NamedExpression ne = popConfig.getGroupByExprs()[i]; final LogicalExpression expr = ExpressionTreeMaterializer.materialize(ne.getExpr(), incoming, collector, context.getFunctionRegistry() ); - if(expr == null) continue; + if (expr == null) { + continue; + } final MaterializedField outputField = MaterializedField.create(ne.getRef(), expr.getMajorType()); ValueVector vv = TypeHelper.getNewVector(outputField, oContext.getAllocator()); @@ -211,13 +215,17 @@ public class HashAggBatch extends AbstractRecordBatch<HashAggregate> { groupByOutFieldIds[i] = container.add(vv); } - for(i = 0; i < numAggrExprs; i++){ + for (i = 0; i < numAggrExprs; i++) { NamedExpression ne = popConfig.getAggrExprs()[i]; final LogicalExpression expr = ExpressionTreeMaterializer.materialize(ne.getExpr(), incoming, collector, context.getFunctionRegistry() ); - if(collector.hasErrors()) throw new SchemaChangeException("Failure while materializing expression. " + collector.toErrorString()); + if (collector.hasErrors()) { + throw new SchemaChangeException("Failure while materializing expression. " + collector.toErrorString()); + } - if(expr == null) continue; + if (expr == null) { + continue; + } final MaterializedField outputField = MaterializedField.create(ne.getRef(), expr.getMajorType()); ValueVector vv = TypeHelper.getNewVector(outputField, oContext.getAllocator()); @@ -248,7 +256,6 @@ public class HashAggBatch extends AbstractRecordBatch<HashAggregate> { return agg; } - private void setupUpdateAggrValues(ClassGenerator<HashAggregator> cg) { cg.setMappingSet(UpdateAggrValuesMapping); @@ -260,8 +267,8 @@ public class HashAggBatch extends AbstractRecordBatch<HashAggregate> { cg.getBlock(BlockType.EVAL)._return(JExpr.TRUE); } - private void setupGetIndex(ClassGenerator<HashAggregator> cg){ - switch(incoming.getSchema().getSelectionVectorMode()){ + private void setupGetIndex(ClassGenerator<HashAggregator> cg) { + switch (incoming.getSchema().getSelectionVectorMode()) { case FOUR_BYTE: { JVar var = cg.declareClassField("sv4_", cg.getModel()._ref(SelectionVector4.class)); cg.getBlock("doSetup").assign(var, JExpr.direct("incoming").invoke("getSelectionVector4")); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java index b6b887415..d25a95266 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java @@ -156,7 +156,9 @@ public abstract class HashAggTemplate implements HashAggregator { boolean status = true; for (int i = batchOutputCount; i <= maxOccupiedIdx; i++) { if (outputRecordValues(i, batchOutputCount) ) { - if (EXTRA_DEBUG_2) logger.debug("Outputting values to output index: {}", batchOutputCount) ; + if (EXTRA_DEBUG_2) { + logger.debug("Outputting values to output index: {}", batchOutputCount) ; + } batchOutputCount++; outNumRecordsHolder.value++; } else { @@ -270,31 +272,41 @@ public abstract class HashAggTemplate implements HashAggregator { outside: while(true) { // loop through existing records, aggregating the values as necessary. - if (EXTRA_DEBUG_1) logger.debug ("Starting outer loop of doWork()..."); + if (EXTRA_DEBUG_1) { + logger.debug ("Starting outer loop of doWork()..."); + } for (; underlyingIndex < incoming.getRecordCount(); incIndex()) { - if(EXTRA_DEBUG_2) logger.debug("Doing loop with values underlying {}, current {}", underlyingIndex, currentIndex); + if(EXTRA_DEBUG_2) { + logger.debug("Doing loop with values underlying {}, current {}", underlyingIndex, currentIndex); + } boolean success = checkGroupAndAggrValues(currentIndex); assert success : "HashAgg couldn't copy values."; } - if (EXTRA_DEBUG_1) logger.debug("Processed {} records", underlyingIndex); + if (EXTRA_DEBUG_1) { + logger.debug("Processed {} records", underlyingIndex); + } - try{ + try { - while(true){ + while (true) { // Cleanup the previous batch since we are done processing it. for (VectorWrapper<?> v : incoming) { v.getValueVector().clear(); } IterOutcome out = outgoing.next(0, incoming); - if(EXTRA_DEBUG_1) logger.debug("Received IterOutcome of {}", out); - switch(out){ + if (EXTRA_DEBUG_1) { + logger.debug("Received IterOutcome of {}", out); + } + switch (out) { case NOT_YET: this.outcome = out; return AggOutcome.RETURN_OUTCOME; case OK_NEW_SCHEMA: - if(EXTRA_DEBUG_1) logger.debug("Received new schema. Batch has {} records.", incoming.getRecordCount()); + if (EXTRA_DEBUG_1) { + logger.debug("Received new schema. Batch has {} records.", incoming.getRecordCount()); + } newSchema = true; this.cleanup(); // TODO: new schema case needs to be handled appropriately @@ -302,14 +314,16 @@ public abstract class HashAggTemplate implements HashAggregator { case OK: resetIndex(); - if(incoming.getRecordCount() == 0){ + if (incoming.getRecordCount() == 0) { continue; } else { boolean success = checkGroupAndAggrValues(currentIndex); assert success : "HashAgg couldn't copy values."; incIndex(); - if(EXTRA_DEBUG_1) logger.debug("Continuing outside loop"); + if (EXTRA_DEBUG_1) { + logger.debug("Continuing outside loop"); + } continue outside; } @@ -343,8 +357,10 @@ public abstract class HashAggTemplate implements HashAggregator { // placeholder... } } - } finally{ - if(first) first = !first; + } finally { + if (first) { + first = !first; + } } } @@ -373,7 +389,7 @@ public abstract class HashAggTemplate implements HashAggregator { } @Override - public void cleanup(){ + public void cleanup() { if (htable != null) { htable.clear(); htable = null; @@ -392,28 +408,28 @@ public abstract class HashAggTemplate implements HashAggregator { } } - private final AggOutcome setOkAndReturn(){ - if(first){ + private final AggOutcome setOkAndReturn() { + if (first) { this.outcome = IterOutcome.OK_NEW_SCHEMA; - }else{ + } else { this.outcome = IterOutcome.OK; } - for(VectorWrapper<?> v : outgoing){ + for (VectorWrapper<?> v : outgoing) { v.getValueVector().getMutator().setValueCount(outputCount); } return AggOutcome.RETURN_OUTCOME; } - private final void incIndex(){ + private final void incIndex() { underlyingIndex++; - if(underlyingIndex >= incoming.getRecordCount()){ + if (underlyingIndex >= incoming.getRecordCount()) { currentIndex = Integer.MAX_VALUE; return; } currentIndex = getVectorIndex(underlyingIndex); } - private final void resetIndex(){ + private final void resetIndex() { underlyingIndex = -1; incIndex(); } @@ -422,7 +438,9 @@ public abstract class HashAggTemplate implements HashAggregator { BatchHolder bh = new BatchHolder(); batchHolders.add(bh); - if (EXTRA_DEBUG_1) logger.debug("HashAggregate: Added new batch; num batches = {}.", batchHolders.size()); + if (EXTRA_DEBUG_1) { + logger.debug("HashAggregate: Added new batch; num batches = {}.", batchHolders.size()); + } bh.setup(); } @@ -465,9 +483,9 @@ public abstract class HashAggTemplate implements HashAggregator { outputCount += numOutputRecords; - if(first){ + if (first) { this.outcome = IterOutcome.OK_NEW_SCHEMA; - }else{ + } else { this.outcome = IterOutcome.OK; } @@ -486,14 +504,14 @@ public abstract class HashAggTemplate implements HashAggregator { } else { if (!outputKeysStatus) { logger.debug("Failed to output keys for current batch index: {} ", outBatchIndex); - for(VectorWrapper<?> v : outContainer) { + for (VectorWrapper<?> v : outContainer) { logger.debug("At the time of failure, size of valuevector in outContainer = {}.", v.getValueVector().getValueCapacity()); } context.fail(new Exception("Failed to output keys for current batch !")); } if (!outputValuesStatus) { logger.debug("Failed to output values for current batch index: {} ", outBatchIndex); - for(VectorWrapper<?> v : outContainer) { + for (VectorWrapper<?> v : outContainer) { logger.debug("At the time of failure, size of valuevector in outContainer = {}.", v.getValueVector().getValueCapacity()); } context.fail(new Exception("Failed to output values for current batch !")); @@ -557,7 +575,9 @@ public abstract class HashAggTemplate implements HashAggregator { if (putStatus == HashTable.PutStatus.KEY_PRESENT) { - if (EXTRA_DEBUG_2) logger.debug("Group-by key already present in hash table, updating the aggregate values"); + if (EXTRA_DEBUG_2) { + logger.debug("Group-by key already present in hash table, updating the aggregate values"); + } // debugging //if (holder.value == 100018 || holder.value == 100021) { @@ -566,7 +586,9 @@ public abstract class HashAggTemplate implements HashAggregator { } else if (putStatus == HashTable.PutStatus.KEY_ADDED) { - if (EXTRA_DEBUG_2) logger.debug("Group-by key was added to hash table, inserting new aggregate values") ; + if (EXTRA_DEBUG_2) { + logger.debug("Group-by key was added to hash table, inserting new aggregate values") ; + } // debugging // if (holder.value == 100018 || holder.value == 100021) { diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggregator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggregator.java index 4277f2306..238242bc1 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggregator.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggregator.java @@ -40,7 +40,7 @@ public interface HashAggregator { public static enum AggOutcome { RETURN_OUTCOME, CLEANUP_AND_RETURN, UPDATE_AGGREGATOR - } + } public abstract void setup(HashAggregate hashAggrConfig, HashTableConfig htConfig, FragmentContext context, OperatorStats stats, BufferAllocator allocator, RecordBatch incoming, diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/InternalBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/InternalBatch.java index 3e6def128..e6900605f 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/InternalBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/InternalBatch.java @@ -34,8 +34,8 @@ public class InternalBatch implements Iterable<VectorWrapper<?>>{ private final SelectionVector2 sv2; private final SelectionVector4 sv4; - public InternalBatch(RecordBatch incoming){ - switch(incoming.getSchema().getSelectionVectorMode()){ + public InternalBatch(RecordBatch incoming) { + switch(incoming.getSchema().getSelectionVectorMode()) { case FOUR_BYTE: this.sv4 = incoming.getSelectionVector4().createNewWrapperCurrent(); this.sv2 = null; @@ -69,13 +69,17 @@ public class InternalBatch implements Iterable<VectorWrapper<?>>{ return container.iterator(); } - public void clear(){ - if(sv2 != null) sv2.clear(); - if(sv4 != null) sv4.clear(); + public void clear() { + if (sv2 != null) { + sv2.clear(); + } + if (sv4 != null) { + sv4.clear(); + } container.clear(); } - public VectorWrapper<?> getValueAccessorById(Class<?> clazz, int[] fieldIds){ + public VectorWrapper<?> getValueAccessorById(Class<?> clazz, int[] fieldIds) { return container.getValueAccessorById(clazz, fieldIds); } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java index 820f7229b..ced51798f 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java @@ -67,8 +67,12 @@ public class StreamingAggBatch extends AbstractRecordBatch<StreamingAggregate> { @Override public int getRecordCount() { - if(done) return 0; - if (aggregator == null) return 0; + if (done) { + return 0; + } + if (aggregator == null) { + return 0; + } return aggregator.getOutputCount(); } @@ -88,7 +92,7 @@ public class StreamingAggBatch extends AbstractRecordBatch<StreamingAggregate> { case STOP: return outcome; case OK_NEW_SCHEMA: - if (!createAggregator()){ + if (!createAggregator()) { done = true; return IterOutcome.STOP; } @@ -100,12 +104,14 @@ public class StreamingAggBatch extends AbstractRecordBatch<StreamingAggregate> { } } - while(true){ + while (true) { AggOutcome out = aggregator.doWork(); logger.debug("Aggregator response {}, records {}", out, aggregator.getOutputCount()); - switch(out){ + switch (out) { case CLEANUP_AND_RETURN: - if (!first) container.zeroVectors(); + if (!first) { + container.zeroVectors(); + } done = true; // fall through case RETURN_OUTCOME: @@ -122,7 +128,7 @@ public class StreamingAggBatch extends AbstractRecordBatch<StreamingAggregate> { case UPDATE_AGGREGATOR: first = false; aggregator = null; - if(!createAggregator()){ + if (!createAggregator()) { return IterOutcome.STOP; } continue; @@ -142,23 +148,20 @@ public class StreamingAggBatch extends AbstractRecordBatch<StreamingAggregate> { */ private boolean createAggregator() { logger.debug("Creating new aggregator."); - try{ + try { stats.startSetup(); this.aggregator = createAggregatorInternal(); return true; - }catch(SchemaChangeException | ClassTransformationException | IOException ex){ + } catch (SchemaChangeException | ClassTransformationException | IOException ex) { context.fail(ex); container.clear(); incoming.kill(false); return false; - }finally{ + } finally { stats.stopSetup(); } } - - - private StreamingAggregator createAggregatorInternal() throws SchemaChangeException, ClassTransformationException, IOException{ ClassGenerator<StreamingAggregator> cg = CodeGenerator.getRoot(StreamingAggTemplate.TEMPLATE_DEFINITION, context.getFunctionRegistry()); container.clear(); @@ -169,20 +172,24 @@ public class StreamingAggBatch extends AbstractRecordBatch<StreamingAggregate> { ErrorCollector collector = new ErrorCollectorImpl(); - for(int i =0; i < keyExprs.length; i++){ + for (int i =0; i < keyExprs.length; i++) { NamedExpression ne = popConfig.getKeys()[i]; final LogicalExpression expr = ExpressionTreeMaterializer.materialize(ne.getExpr(), incoming, collector,context.getFunctionRegistry() ); - if(expr == null) continue; + if (expr == null) { + continue; + } keyExprs[i] = expr; final MaterializedField outputField = MaterializedField.create(ne.getRef(), expr.getMajorType()); ValueVector vector = TypeHelper.getNewVector(outputField, oContext.getAllocator()); keyOutputIds[i] = container.add(vector); } - for(int i =0; i < valueExprs.length; i++){ + for (int i =0; i < valueExprs.length; i++) { NamedExpression ne = popConfig.getExprs()[i]; final LogicalExpression expr = ExpressionTreeMaterializer.materialize(ne.getExpr(), incoming, collector, context.getFunctionRegistry()); - if(expr == null) continue; + if (expr == null) { + continue; + } final MaterializedField outputField = MaterializedField.create(ne.getRef(), expr.getMajorType()); ValueVector vector = TypeHelper.getNewVector(outputField, oContext.getAllocator()); @@ -190,7 +197,9 @@ public class StreamingAggBatch extends AbstractRecordBatch<StreamingAggregate> { valueExprs[i] = new ValueVectorWriteExpression(id, expr, true); } - if(collector.hasErrors()) throw new SchemaChangeException("Failure while materializing expression. " + collector.toErrorString()); + if (collector.hasErrors()) { + throw new SchemaChangeException("Failure while materializing expression. " + collector.toErrorString()); + } setupIsSame(cg, keyExprs); setupIsSameApart(cg, keyExprs); @@ -207,15 +216,13 @@ public class StreamingAggBatch extends AbstractRecordBatch<StreamingAggregate> { return agg; } - - private final GeneratorMapping IS_SAME = GeneratorMapping.create("setupInterior", "isSame", null, null); private final MappingSet IS_SAME_I1 = new MappingSet("index1", null, IS_SAME, IS_SAME); private final MappingSet IS_SAME_I2 = new MappingSet("index2", null, IS_SAME, IS_SAME); - private void setupIsSame(ClassGenerator<StreamingAggregator> cg, LogicalExpression[] keyExprs){ + private void setupIsSame(ClassGenerator<StreamingAggregator> cg, LogicalExpression[] keyExprs) { cg.setMappingSet(IS_SAME_I1); - for(LogicalExpression expr : keyExprs){ + for (LogicalExpression expr : keyExprs) { // first, we rewrite the evaluation stack for each side of the comparison. cg.setMappingSet(IS_SAME_I1); HoldingContainer first = cg.addExpr(expr, false); @@ -234,9 +241,9 @@ public class StreamingAggBatch extends AbstractRecordBatch<StreamingAggregate> { private final MappingSet ISA_B1 = new MappingSet("b1Index", null, "b1", null, IS_SAME_PREV_INTERNAL_BATCH_READ, IS_SAME_PREV_INTERNAL_BATCH_READ); private final MappingSet ISA_B2 = new MappingSet("b2Index", null, "incoming", null, IS_SAME_PREV, IS_SAME_PREV); - private void setupIsSameApart(ClassGenerator<StreamingAggregator> cg, LogicalExpression[] keyExprs){ + private void setupIsSameApart(ClassGenerator<StreamingAggregator> cg, LogicalExpression[] keyExprs) { cg.setMappingSet(ISA_B1); - for(LogicalExpression expr : keyExprs){ + for (LogicalExpression expr : keyExprs) { // first, we rewrite the evaluation stack for each side of the comparison. cg.setMappingSet(ISA_B1); HoldingContainer first = cg.addExpr(expr, false); @@ -254,9 +261,9 @@ public class StreamingAggBatch extends AbstractRecordBatch<StreamingAggregate> { private final GeneratorMapping EVAL_OUTSIDE = GeneratorMapping.create("setupInterior", "outputRecordValues", "resetValues", "cleanup"); private final MappingSet EVAL = new MappingSet("index", "outIndex", "incoming", "outgoing", EVAL_INSIDE, EVAL_OUTSIDE, EVAL_INSIDE); - private void addRecordValues(ClassGenerator<StreamingAggregator> cg, LogicalExpression[] valueExprs){ + private void addRecordValues(ClassGenerator<StreamingAggregator> cg, LogicalExpression[] valueExprs) { cg.setMappingSet(EVAL); - for(LogicalExpression ex : valueExprs){ + for (LogicalExpression ex : valueExprs) { HoldingContainer hc = cg.addExpr(ex); cg.getBlock(BlockType.EVAL)._if(hc.getValue().eq(JExpr.lit(0)))._then()._return(JExpr.FALSE); } @@ -265,9 +272,9 @@ public class StreamingAggBatch extends AbstractRecordBatch<StreamingAggregate> { private final MappingSet RECORD_KEYS = new MappingSet(GeneratorMapping.create("setupInterior", "outputRecordKeys", null, null)); - private void outputRecordKeys(ClassGenerator<StreamingAggregator> cg, TypedFieldId[] keyOutputIds, LogicalExpression[] keyExprs){ + private void outputRecordKeys(ClassGenerator<StreamingAggregator> cg, TypedFieldId[] keyOutputIds, LogicalExpression[] keyExprs) { cg.setMappingSet(RECORD_KEYS); - for(int i =0; i < keyExprs.length; i++){ + for (int i =0; i < keyExprs.length; i++) { HoldingContainer hc = cg.addExpr(new ValueVectorWriteExpression(keyOutputIds[i], keyExprs[i], true)); cg.getBlock(BlockType.EVAL)._if(hc.getValue().eq(JExpr.lit(0)))._then()._return(JExpr.FALSE); } @@ -280,10 +287,10 @@ public class StreamingAggBatch extends AbstractRecordBatch<StreamingAggregate> { private final GeneratorMapping PREVIOUS_KEYS = GeneratorMapping.create("outputRecordKeysPrev", "outputRecordKeysPrev", null, null); private final MappingSet RECORD_KEYS_PREV = new MappingSet("previousIndex", "outIndex", "previous", null, PREVIOUS_KEYS, PREVIOUS_KEYS); - private void outputRecordKeysPrev(ClassGenerator<StreamingAggregator> cg, TypedFieldId[] keyOutputIds, LogicalExpression[] keyExprs){ + private void outputRecordKeysPrev(ClassGenerator<StreamingAggregator> cg, TypedFieldId[] keyOutputIds, LogicalExpression[] keyExprs) { cg.setMappingSet(RECORD_KEYS_PREV); - for(int i =0; i < keyExprs.length; i++){ + for (int i =0; i < keyExprs.length; i++) { // IMPORTANT: there is an implicit assertion here that the TypedFieldIds for the previous batch and the current batch are the same. This is possible because InternalBatch guarantees this. logger.debug("Writing out expr {}", keyExprs[i]); cg.rotateBlock(); @@ -297,8 +304,8 @@ public class StreamingAggBatch extends AbstractRecordBatch<StreamingAggregate> { cg.getBlock(BlockType.EVAL)._return(JExpr.TRUE); } - private void getIndex(ClassGenerator<StreamingAggregator> g){ - switch(incoming.getSchema().getSelectionVectorMode()){ + private void getIndex(ClassGenerator<StreamingAggregator> g) { + switch (incoming.getSchema().getSelectionVectorMode()) { case FOUR_BYTE: { JVar var = g.declareClassField("sv4_", g.getModel()._ref(SelectionVector4.class)); g.getBlock("setupInterior").assign(var, JExpr.direct("incoming").invoke("getSelectionVector4")); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggTemplate.java index 53ac1ed4f..c2a5715cf 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggTemplate.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggTemplate.java @@ -60,7 +60,7 @@ public abstract class StreamingAggTemplate implements StreamingAggregator { private void allocateOutgoing() { - for(VectorWrapper<?> w : outgoing){ + for (VectorWrapper<?> w : outgoing) { w.getValueVector().allocateNew(); } } @@ -75,7 +75,7 @@ public abstract class StreamingAggTemplate implements StreamingAggregator { return outputCount; } - private AggOutcome tooBigFailure(){ + private AggOutcome tooBigFailure() { context.fail(new Exception(TOO_BIG_ERROR)); this.outcome = IterOutcome.STOP; return AggOutcome.CLEANUP_AND_RETURN; @@ -87,11 +87,11 @@ public abstract class StreamingAggTemplate implements StreamingAggregator { outcome = IterOutcome.NONE; return AggOutcome.CLEANUP_AND_RETURN; } - try{ // outside loop to ensure that first is set to false after the first run. + try { // outside loop to ensure that first is set to false after the first run. outputCount = 0; // if we're in the first state, allocate outgoing. - if(first){ + if (first) { allocateOutgoing(); } @@ -119,8 +119,10 @@ public abstract class StreamingAggTemplate implements StreamingAggregator { } // pick up a remainder batch if we have one. - if(remainderBatch != null){ - if (!outputToBatch( previousIndex )) return tooBigFailure(); + if (remainderBatch != null) { + if (!outputToBatch( previousIndex )) { + return tooBigFailure(); + } remainderBatch.clear(); remainderBatch = null; return setOkAndReturn(); @@ -131,38 +133,56 @@ public abstract class StreamingAggTemplate implements StreamingAggregator { if (pendingOutput) { allocateOutgoing(); pendingOutput = false; - if(EXTRA_DEBUG) logger.debug("Attempting to output remainder."); - if (!outputToBatch( previousIndex)) return tooBigFailure(); + if (EXTRA_DEBUG) { + logger.debug("Attempting to output remainder."); + } + if (!outputToBatch( previousIndex)) { + return tooBigFailure(); + } } - if(newSchema){ + if (newSchema) { return AggOutcome.UPDATE_AGGREGATOR; } - if(lastOutcome != null){ + if (lastOutcome != null) { outcome = lastOutcome; return AggOutcome.CLEANUP_AND_RETURN; } - outside: while(true){ + outside: while(true) { // loop through existing records, adding as necessary. for (; underlyingIndex < incoming.getRecordCount(); incIndex()) { - if(EXTRA_DEBUG) logger.debug("Doing loop with values underlying {}, current {}", underlyingIndex, currentIndex); + if (EXTRA_DEBUG) { + logger.debug("Doing loop with values underlying {}, current {}", underlyingIndex, currentIndex); + } if (previousIndex == -1) { - if (EXTRA_DEBUG) logger.debug("Adding the initial row's keys and values."); + if (EXTRA_DEBUG) { + logger.debug("Adding the initial row's keys and values."); + } addRecordInc(currentIndex); } else if (isSame( previousIndex, currentIndex )) { - if(EXTRA_DEBUG) logger.debug("Values were found the same, adding."); + if (EXTRA_DEBUG) { + logger.debug("Values were found the same, adding."); + } addRecordInc(currentIndex); } else { - if(EXTRA_DEBUG) logger.debug("Values were different, outputting previous batch."); + if (EXTRA_DEBUG) { + logger.debug("Values were different, outputting previous batch."); + } if (outputToBatch(previousIndex)) { - if(EXTRA_DEBUG) logger.debug("Output successful."); + if (EXTRA_DEBUG) { + logger.debug("Output successful."); + } addRecordInc(currentIndex); } else { - if(EXTRA_DEBUG) logger.debug("Output failed."); - if(outputCount == 0) return tooBigFailure(); + if (EXTRA_DEBUG) { + logger.debug("Output failed."); + } + if (outputCount == 0) { + return tooBigFailure(); + } // mark the pending output but move forward for the next cycle. pendingOutput = true; @@ -178,23 +198,29 @@ public abstract class StreamingAggTemplate implements StreamingAggregator { InternalBatch previous = null; - try{ - while(true){ + try { + while (true) { if (previous != null) { previous.clear(); } previous = new InternalBatch(incoming); IterOutcome out = outgoing.next(0, incoming); - if(EXTRA_DEBUG) logger.debug("Received IterOutcome of {}", out); - switch(out){ + if (EXTRA_DEBUG) { + logger.debug("Received IterOutcome of {}", out); + } + switch (out) { case NONE: done = true; lastOutcome = out; if (first && addedRecordCount == 0) { return setOkAndReturn(); - } else if(addedRecordCount > 0){ - if( !outputToBatchPrev( previous, previousIndex, outputCount) ) remainderBatch = previous; - if(EXTRA_DEBUG) logger.debug("Received no more batches, returning."); + } else if(addedRecordCount > 0) { + if ( !outputToBatchPrev( previous, previousIndex, outputCount) ) { + remainderBatch = previous; + } + if (EXTRA_DEBUG) { + logger.debug("Received no more batches, returning."); + } return setOkAndReturn(); }else{ if (first && out == IterOutcome.OK) { @@ -204,17 +230,21 @@ public abstract class StreamingAggTemplate implements StreamingAggregator { return AggOutcome.CLEANUP_AND_RETURN; } - - case NOT_YET: this.outcome = out; return AggOutcome.RETURN_OUTCOME; case OK_NEW_SCHEMA: - if(EXTRA_DEBUG) logger.debug("Received new schema. Batch has {} records.", incoming.getRecordCount()); - if(addedRecordCount > 0){ - if( !outputToBatchPrev( previous, previousIndex, outputCount) ) remainderBatch = previous; - if(EXTRA_DEBUG) logger.debug("Wrote out end of previous batch, returning."); + if (EXTRA_DEBUG) { + logger.debug("Received new schema. Batch has {} records.", incoming.getRecordCount()); + } + if (addedRecordCount > 0) { + if ( !outputToBatchPrev( previous, previousIndex, outputCount) ) { + remainderBatch = previous; + } + if (EXTRA_DEBUG) { + logger.debug("Wrote out end of previous batch, returning."); + } newSchema = true; return setOkAndReturn(); } @@ -222,21 +252,27 @@ public abstract class StreamingAggTemplate implements StreamingAggregator { return AggOutcome.UPDATE_AGGREGATOR; case OK: resetIndex(); - if(incoming.getRecordCount() == 0){ + if (incoming.getRecordCount() == 0) { continue; - }else{ - if(previousIndex != -1 && isSamePrev(previousIndex , previous, currentIndex)){ - if(EXTRA_DEBUG) logger.debug("New value was same as last value of previous batch, adding."); + } else { + if (previousIndex != -1 && isSamePrev(previousIndex , previous, currentIndex)) { + if (EXTRA_DEBUG) { + logger.debug("New value was same as last value of previous batch, adding."); + } addRecordInc(currentIndex); previousIndex = currentIndex; incIndex(); - if(EXTRA_DEBUG) logger.debug("Continuing outside"); + if (EXTRA_DEBUG) { + logger.debug("Continuing outside"); + } continue outside; - }else{ // not the same - if(EXTRA_DEBUG) logger.debug("This is not the same as the previous, add record and continue outside."); + } else { // not the same + if (EXTRA_DEBUG) { + logger.debug("This is not the same as the previous, add record and continue outside."); + } previousIndex = currentIndex; - if(addedRecordCount > 0){ - if( !outputToBatchPrev( previous, previousIndex, outputCount) ){ + if (addedRecordCount > 0) { + if ( !outputToBatchPrev( previous, previousIndex, outputCount) ) { remainderBatch = previous; return setOkAndReturn(); } @@ -251,72 +287,78 @@ public abstract class StreamingAggTemplate implements StreamingAggregator { return AggOutcome.CLEANUP_AND_RETURN; } - } - }finally{ + } finally { // make sure to clear previous if we haven't saved it. - if(remainderBatch == null && previous != null){ + if (remainderBatch == null && previous != null) { previous.clear(); } } } - }finally{ - if(first) first = !first; + } finally { + if (first) { + first = !first; + } } } - - private final void incIndex(){ + private final void incIndex() { underlyingIndex++; - if(underlyingIndex >= incoming.getRecordCount()){ + if (underlyingIndex >= incoming.getRecordCount()) { currentIndex = Integer.MAX_VALUE; return; } currentIndex = getVectorIndex(underlyingIndex); } - private final void resetIndex(){ + private final void resetIndex() { underlyingIndex = -1; incIndex(); } - private final AggOutcome setOkAndReturn(){ - if(first){ + private final AggOutcome setOkAndReturn() { + if (first) { this.outcome = IterOutcome.OK_NEW_SCHEMA; - }else{ + } else { this.outcome = IterOutcome.OK; } - for(VectorWrapper<?> v : outgoing){ + for (VectorWrapper<?> v : outgoing) { v.getValueVector().getMutator().setValueCount(outputCount); } return AggOutcome.RETURN_OUTCOME; } - private final boolean outputToBatch(int inIndex){ + private final boolean outputToBatch(int inIndex) { - if(!outputRecordKeys(inIndex, outputCount)){ - if(EXTRA_DEBUG) logger.debug("Failure while outputting keys {}", outputCount); + if (!outputRecordKeys(inIndex, outputCount)) { + if(EXTRA_DEBUG) { + logger.debug("Failure while outputting keys {}", outputCount); + } return false; } - if(!outputRecordValues(outputCount)){ - if(EXTRA_DEBUG) logger.debug("Failure while outputting values {}", outputCount); + if (!outputRecordValues(outputCount)) { + if (EXTRA_DEBUG) { + logger.debug("Failure while outputting values {}", outputCount); + } return false; } - if(EXTRA_DEBUG) logger.debug("{} values output successfully", outputCount); + if (EXTRA_DEBUG) { + logger.debug("{} values output successfully", outputCount); + } resetValues(); outputCount++; addedRecordCount = 0; return true; } - private final boolean outputToBatchPrev(InternalBatch b1, int inIndex, int outIndex){ + private final boolean outputToBatchPrev(InternalBatch b1, int inIndex, int outIndex) { boolean success = outputRecordKeysPrev(b1, inIndex, outIndex) // && outputRecordValues(outIndex) // && resetValues(); - if(success){ + if (success) { resetValues(); outputCount++; addedRecordCount = 0; @@ -325,17 +367,18 @@ public abstract class StreamingAggTemplate implements StreamingAggregator { return success; } - private void addRecordInc(int index){ + private void addRecordInc(int index) { addRecord(index); this.addedRecordCount++; } @Override - public void cleanup(){ - if(remainderBatch != null) remainderBatch.clear(); + public void cleanup() { + if (remainderBatch != null) { + remainderBatch.clear(); + } } - public abstract void setupInterior(@Named("incoming") RecordBatch incoming, @Named("outgoing") RecordBatch outgoing) throws SchemaChangeException; public abstract boolean isSame(@Named("index1") int index1, @Named("index2") int index2); public abstract boolean isSamePrev(@Named("b1Index") int b1Index, @Named("b1") InternalBatch b1, @Named("b2Index") int b2Index); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggregator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggregator.java index 8f5f29be1..96da00b46 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggregator.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggregator.java @@ -28,8 +28,8 @@ public interface StreamingAggregator { public static TemplateClassDefinition<StreamingAggregator> TEMPLATE_DEFINITION = new TemplateClassDefinition<StreamingAggregator>(StreamingAggregator.class, StreamingAggTemplate.class); public static enum AggOutcome { - RETURN_OUTCOME, CLEANUP_AND_RETURN, UPDATE_AGGREGATOR; - } + RETURN_OUTCOME, CLEANUP_AND_RETURN, UPDATE_AGGREGATOR; + } public abstract void setup(FragmentContext context, RecordBatch incoming, StreamingAggBatch outgoing) throws SchemaChangeException; diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/ChainedHashTable.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/ChainedHashTable.java index 195d24900..f77407eac 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/ChainedHashTable.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/ChainedHashTable.java @@ -53,23 +53,23 @@ public class ChainedHashTable { static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ChainedHashTable.class); private static final GeneratorMapping KEY_MATCH_BUILD = - GeneratorMapping.create("setupInterior" /* setup method */, "isKeyMatchInternalBuild" /* eval method */, + GeneratorMapping.create("setupInterior" /* setup method */, "isKeyMatchInternalBuild" /* eval method */, null /* reset */, null /* cleanup */); private static final GeneratorMapping KEY_MATCH_PROBE = - GeneratorMapping.create("setupInterior" /* setup method */, "isKeyMatchInternalProbe" /* eval method */, + GeneratorMapping.create("setupInterior" /* setup method */, "isKeyMatchInternalProbe" /* eval method */, null /* reset */, null /* cleanup */); private static final GeneratorMapping GET_HASH_BUILD = - GeneratorMapping.create("doSetup" /* setup method */, "getHashBuild" /* eval method */, + GeneratorMapping.create("doSetup" /* setup method */, "getHashBuild" /* eval method */, null /* reset */, null /* cleanup */); private static final GeneratorMapping GET_HASH_PROBE = - GeneratorMapping.create("doSetup" /* setup method */, "getHashProbe" /* eval method */, + GeneratorMapping.create("doSetup" /* setup method */, "getHashProbe" /* eval method */, null /* reset */, null /* cleanup */); private static final GeneratorMapping SET_VALUE = - GeneratorMapping.create("setupInterior" /* setup method */, "setValue" /* eval method */, + GeneratorMapping.create("setupInterior" /* setup method */, "setValue" /* eval method */, null /* reset */, null /* cleanup */); private static final GeneratorMapping OUTPUT_KEYS = @@ -138,8 +138,12 @@ public class ChainedHashTable { int i = 0; for (NamedExpression ne : htConfig.getKeyExprsBuild()) { final LogicalExpression expr = ExpressionTreeMaterializer.materialize(ne.getExpr(), incomingBuild, collector, context.getFunctionRegistry()); - if(collector.hasErrors()) throw new SchemaChangeException("Failure while materializing expression. " + collector.toErrorString()); - if (expr == null) continue; + if (collector.hasErrors()) { + throw new SchemaChangeException("Failure while materializing expression. " + collector.toErrorString()); + } + if (expr == null) { + continue; + } keyExprsBuild[i] = expr; final MaterializedField outputField = MaterializedField.create(ne.getRef(), expr.getMajorType()); @@ -155,8 +159,12 @@ public class ChainedHashTable { i = 0; for (NamedExpression ne : htConfig.getKeyExprsProbe()) { final LogicalExpression expr = ExpressionTreeMaterializer.materialize(ne.getExpr(), incomingProbe, collector, context.getFunctionRegistry()); - if(collector.hasErrors()) throw new SchemaChangeException("Failure while materializing expression. " + collector.toErrorString()); - if (expr == null) continue; + if (collector.hasErrors()) { + throw new SchemaChangeException("Failure while materializing expression. " + collector.toErrorString()); + } + if (expr == null) { + continue; + } keyExprsProbe[i] = expr; i++; } @@ -293,4 +301,3 @@ public class ChainedHashTable { } } } - diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableTemplate.java index b03880cce..6024523e9 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableTemplate.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableTemplate.java @@ -164,10 +164,11 @@ public abstract class HashTableTemplate implements HashTable { assert (currentIdxWithinBatch < HashTable.BATCH_SIZE); assert (incomingRowIdx < HashTable.BATCH_SIZE); - if (isProbe) + if (isProbe) { match = isKeyMatchInternalProbe(incomingRowIdx, currentIdxWithinBatch); - else + } else { match = isKeyMatchInternalBuild(incomingRowIdx, currentIdxWithinBatch); + } if (! match) { currentIdxHolder.value = links.getAccessor().get(currentIdxWithinBatch); @@ -196,7 +197,9 @@ public abstract class HashTableTemplate implements HashTable { maxOccupiedIdx = Math.max(maxOccupiedIdx, currentIdxWithinBatch); - if (EXTRA_DEBUG) logger.debug("BatchHolder: inserted key at incomingRowIdx = {}, currentIdx = {}, hash value = {}.", incomingRowIdx, currentIdx, hashValue); + if (EXTRA_DEBUG) { + logger.debug("BatchHolder: inserted key at incomingRowIdx = {}, currentIdx = {}, hash value = {}.", incomingRowIdx, currentIdx, hashValue); + } return true; } @@ -225,7 +228,9 @@ public abstract class HashTableTemplate implements HashTable { newLinks.getMutator().setSafe(entryIdxWithinBatch, EMPTY_SLOT); newHashValues.getMutator().setSafe(entryIdxWithinBatch, hash); - if (EXTRA_DEBUG) logger.debug("New bucket was empty. bucketIdx = {}, newStartIndices[ {} ] = {}, newLinks[ {} ] = {}, hash value = {}.", bucketIdx, bucketIdx, newStartIndices.getAccessor().get(bucketIdx), entryIdxWithinBatch, newLinks.getAccessor().get(entryIdxWithinBatch), newHashValues.getAccessor().get(entryIdxWithinBatch)); + if (EXTRA_DEBUG) { + logger.debug("New bucket was empty. bucketIdx = {}, newStartIndices[ {} ] = {}, newLinks[ {} ] = {}, hash value = {}.", bucketIdx, bucketIdx, newStartIndices.getAccessor().get(bucketIdx), entryIdxWithinBatch, newLinks.getAccessor().get(entryIdxWithinBatch), newHashValues.getAccessor().get(entryIdxWithinBatch)); + } } else { // follow the new table's hash chain until we encounter empty slot. Note that the hash chain could @@ -245,7 +250,9 @@ public abstract class HashTableTemplate implements HashTable { newLinks.getMutator().setSafe(entryIdxWithinBatch, EMPTY_SLOT); newHashValues.getMutator().setSafe(entryIdxWithinBatch, hash); - if (EXTRA_DEBUG) logger.debug("Followed hash chain in new bucket. bucketIdx = {}, newLinks[ {} ] = {}, newLinks[ {} ] = {}, hash value = {}.", bucketIdx, idxWithinBatch, newLinks.getAccessor().get(idxWithinBatch), entryIdxWithinBatch, newLinks.getAccessor().get(entryIdxWithinBatch), newHashValues.getAccessor().get(entryIdxWithinBatch)); + if (EXTRA_DEBUG) { + logger.debug("Followed hash chain in new bucket. bucketIdx = {}, newLinks[ {} ] = {}, newLinks[ {} ] = {}, hash value = {}.", bucketIdx, idxWithinBatch, newLinks.getAccessor().get(idxWithinBatch), entryIdxWithinBatch, newLinks.getAccessor().get(entryIdxWithinBatch), newHashValues.getAccessor().get(entryIdxWithinBatch)); + } break; } else if (bh != this && bh.links.getAccessor().get(idxWithinBatch) == EMPTY_SLOT) { @@ -253,7 +260,9 @@ public abstract class HashTableTemplate implements HashTable { newLinks.getMutator().setSafe(entryIdxWithinBatch, EMPTY_SLOT); // update the newLink entry in this batch to mark end of the hash chain newHashValues.getMutator().setSafe(entryIdxWithinBatch, hash); - if (EXTRA_DEBUG) logger.debug("Followed hash chain in new bucket. bucketIdx = {}, newLinks[ {} ] = {}, newLinks[ {} ] = {}, hash value = {}.", bucketIdx, idxWithinBatch, newLinks.getAccessor().get(idxWithinBatch), entryIdxWithinBatch, newLinks.getAccessor().get(entryIdxWithinBatch), newHashValues.getAccessor().get(entryIdxWithinBatch)); + if (EXTRA_DEBUG) { + logger.debug("Followed hash chain in new bucket. bucketIdx = {}, newLinks[ {} ] = {}, newLinks[ {} ] = {}, hash value = {}.", bucketIdx, idxWithinBatch, newLinks.getAccessor().get(idxWithinBatch), entryIdxWithinBatch, newLinks.getAccessor().get(entryIdxWithinBatch), newHashValues.getAccessor().get(entryIdxWithinBatch)); + } break; } @@ -381,11 +390,19 @@ public abstract class HashTableTemplate implements HashTable { float loadf = htConfig.getLoadFactor(); int initialCap = htConfig.getInitialCapacity(); - if (loadf <= 0 || Float.isNaN(loadf)) throw new IllegalArgumentException("Load factor must be a valid number greater than 0"); - if (initialCap <= 0) throw new IllegalArgumentException("The initial capacity must be greater than 0"); - if (initialCap > MAXIMUM_CAPACITY) throw new IllegalArgumentException("The initial capacity must be less than maximum capacity allowed"); + if (loadf <= 0 || Float.isNaN(loadf)) { + throw new IllegalArgumentException("Load factor must be a valid number greater than 0"); + } + if (initialCap <= 0) { + throw new IllegalArgumentException("The initial capacity must be greater than 0"); + } + if (initialCap > MAXIMUM_CAPACITY) { + throw new IllegalArgumentException("The initial capacity must be less than maximum capacity allowed"); + } - if (htConfig.getKeyExprsBuild() == null || htConfig.getKeyExprsBuild().length == 0) throw new IllegalArgumentException("Hash table must have at least 1 key expression"); + if (htConfig.getKeyExprsBuild() == null || htConfig.getKeyExprsBuild().length == 0) { + throw new IllegalArgumentException("Hash table must have at least 1 key expression"); + } this.htConfig = htConfig; this.context = context; @@ -397,8 +414,9 @@ public abstract class HashTableTemplate implements HashTable { // round up the initial capacity to nearest highest power of 2 tableSize = roundUpToPowerOf2(initialCap); - if (tableSize > MAXIMUM_CAPACITY) + if (tableSize > MAXIMUM_CAPACITY) { tableSize = MAXIMUM_CAPACITY; + } threshold = (int) Math.ceil(tableSize * loadf); @@ -500,7 +518,9 @@ public abstract class HashTableTemplate implements HashTable { currentIdx = freeIndex++; addBatchIfNeeded(currentIdx); - if (EXTRA_DEBUG) logger.debug("Empty bucket index = {}. incomingRowIdx = {}; inserting new entry at currentIdx = {}.", i, incomingRowIdx, currentIdx); + if (EXTRA_DEBUG) { + logger.debug("Empty bucket index = {}. incomingRowIdx = {}; inserting new entry at currentIdx = {}.", i, incomingRowIdx, currentIdx); + } if (insertEntry(incomingRowIdx, currentIdx, hash, lastEntryBatch, lastEntryIdxWithinBatch)) { // update the start index array @@ -543,14 +563,16 @@ public abstract class HashTableTemplate implements HashTable { currentIdx = freeIndex++; addBatchIfNeeded(currentIdx); - if (EXTRA_DEBUG) logger.debug("No match was found for incomingRowIdx = {}; inserting new entry at currentIdx = {}.", incomingRowIdx, currentIdx); + if (EXTRA_DEBUG) { + logger.debug("No match was found for incomingRowIdx = {}; inserting new entry at currentIdx = {}.", incomingRowIdx, currentIdx); + } if (insertEntry(incomingRowIdx, currentIdx, hash, lastEntryBatch, lastEntryIdxWithinBatch)) { htIdxHolder.value = currentIdx; return PutStatus.KEY_ADDED; - } - else + } else { return PutStatus.PUT_FAILED; + } } return found ? PutStatus.KEY_PRESENT : PutStatus.KEY_ADDED ; @@ -618,7 +640,9 @@ public abstract class HashTableTemplate implements HashTable { if (currentIdx >= totalBatchSize) { BatchHolder bh = addBatchHolder(); - if (EXTRA_DEBUG) logger.debug("HashTable: Added new batch. Num batches = {}.", batchHolders.size()); + if (EXTRA_DEBUG) { + logger.debug("HashTable: Added new batch. Num batches = {}.", batchHolders.size()); + } return bh; } else { @@ -638,12 +662,15 @@ public abstract class HashTableTemplate implements HashTable { // in the new table.. the metadata consists of the startIndices, links and hashValues. // Note that the keys stored in the BatchHolders are not moved around. private void resizeAndRehashIfNeeded() { - if (numEntries < threshold) + if (numEntries < threshold) { return; + } long t0 = System.currentTimeMillis(); - if (EXTRA_DEBUG) logger.debug("Hash table numEntries = {}, threshold = {}; resizing the table...", numEntries, threshold); + if (EXTRA_DEBUG) { + logger.debug("Hash table numEntries = {}, threshold = {}; resizing the table...", numEntries, threshold); + } // If the table size is already MAXIMUM_CAPACITY, don't resize // the table, but set the threshold to Integer.MAX_VALUE such that @@ -656,8 +683,9 @@ public abstract class HashTableTemplate implements HashTable { int newSize = 2 * tableSize; tableSize = roundUpToPowerOf2(newSize); - if (tableSize > MAXIMUM_CAPACITY) + if (tableSize > MAXIMUM_CAPACITY) { tableSize = MAXIMUM_CAPACITY; + } // set the new threshold based on the new table size and load factor threshold = (int) Math.ceil(tableSize * htConfig.getLoadFactor()); @@ -717,5 +745,3 @@ public abstract class HashTableTemplate implements HashTable { protected abstract int getHashProbe(@Named("incomingRowIdx") int incomingRowIdx) ; } - - diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterRecordBatch.java index bf00194aa..f1fcce0d6 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterRecordBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterRecordBatch.java @@ -79,7 +79,7 @@ public class FilterRecordBatch extends AbstractSingleRecordBatch<Filter>{ protected void doWork() { int recordCount = incoming.getRecordCount(); filter.filterBatch(recordCount); -// for(VectorWrapper<?> v : container){ +// for (VectorWrapper<?> v : container) { // ValueVector.Mutator m = v.getValueVector().getMutator(); // m.setValueCount(recordCount); // } @@ -88,8 +88,12 @@ public class FilterRecordBatch extends AbstractSingleRecordBatch<Filter>{ @Override public void cleanup() { - if(sv2 != null) sv2.clear(); - if(sv4 != null) sv4.clear(); + if (sv2 != null) { + sv2.clear(); + } + if (sv4 != null) { + sv4.clear(); + } super.cleanup(); } @@ -100,7 +104,7 @@ public class FilterRecordBatch extends AbstractSingleRecordBatch<Filter>{ sv2.clear(); } - switch(incoming.getSchema().getSelectionVectorMode()){ + switch (incoming.getSchema().getSelectionVectorMode()) { case NONE: sv2 = new SelectionVector2(oContext.getAllocator()); this.filter = generateSV2Filterer(); @@ -137,13 +141,13 @@ public class FilterRecordBatch extends AbstractSingleRecordBatch<Filter>{ final ClassGenerator<Filterer> cg = CodeGenerator.getRoot(Filterer.TEMPLATE_DEFINITION4, context.getFunctionRegistry()); final LogicalExpression expr = ExpressionTreeMaterializer.materialize(popConfig.getExpr(), incoming, collector, context.getFunctionRegistry()); - if(collector.hasErrors()){ + if (collector.hasErrors()) { throw new SchemaChangeException(String.format("Failure while trying to materialize incoming schema. Errors:\n %s.", collector.toErrorString())); } cg.addExpr(new ReturnValueExpression(expr)); -// for(VectorWrapper<?> i : incoming){ +// for (VectorWrapper<?> i : incoming) { // ValueVector v = TypeHelper.getNewVector(i.getField(), context.getAllocator()); // container.add(v); // allocators.add(getAllocator4(v)); @@ -177,13 +181,13 @@ public class FilterRecordBatch extends AbstractSingleRecordBatch<Filter>{ final ClassGenerator<Filterer> cg = CodeGenerator.getRoot(Filterer.TEMPLATE_DEFINITION2, context.getFunctionRegistry()); final LogicalExpression expr = ExpressionTreeMaterializer.materialize(popConfig.getExpr(), incoming, collector, context.getFunctionRegistry()); - if(collector.hasErrors()){ + if (collector.hasErrors()) { throw new SchemaChangeException(String.format("Failure while trying to materialize incoming schema. Errors:\n %s.", collector.toErrorString())); } cg.addExpr(new ReturnValueExpression(expr)); - for(VectorWrapper<?> v : incoming){ + for (VectorWrapper<?> v : incoming) { TransferPair pair = v.getValueVector().getTransferPair(); container.add(pair.getTo()); transfers.add(pair); @@ -202,5 +206,4 @@ public class FilterRecordBatch extends AbstractSingleRecordBatch<Filter>{ } - } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java index 15044b823..2a08c053a 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java @@ -457,8 +457,8 @@ public class HashJoinBatch extends AbstractRecordBatch<HashJoinPOP> { return hj; } - private void allocateVectors(){ - for(VectorWrapper<?> v : container){ + private void allocateVectors() { + for(VectorWrapper<?> v : container) { v.getValueVector().allocateNew(); } } @@ -472,7 +472,9 @@ public class HashJoinBatch extends AbstractRecordBatch<HashJoinPOP> { } private void updateStats(HashTable htable) { - if(htable == null) return; + if (htable == null) { + return; + } htable.getStats(htStats); this.stats.setLongStat(Metric.NUM_BUCKETS, htStats.numBuckets); this.stats.setLongStat(Metric.NUM_ENTRIES, htStats.numEntries); @@ -488,7 +490,7 @@ public class HashJoinBatch extends AbstractRecordBatch<HashJoinPOP> { @Override public void cleanup() { - if(hjHelper != null){ + if (hjHelper != null) { hjHelper.clear(); } @@ -504,4 +506,5 @@ public class HashJoinBatch extends AbstractRecordBatch<HashJoinPOP> { left.cleanup(); right.cleanup(); } + } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbeTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbeTemplate.java index 785deae79..133289e08 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbeTemplate.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbeTemplate.java @@ -94,11 +94,13 @@ public abstract class HashJoinProbeTemplate implements HashJoinProbe { boolean success = true; while (outputRecords < TARGET_RECORDS_PER_BATCH && recordsProcessed < recordsToProcess) { success = projectBuildRecord(unmatchedBuildIndexes.get(recordsProcessed), outputRecords); - if(success){ + if (success) { recordsProcessed++; outputRecords++; - }else{ - if(outputRecords == 0) throw new IllegalStateException("Too big to fail."); + } else { + if (outputRecords == 0) { + throw new IllegalStateException("Too big to fail."); + } break; } } @@ -166,11 +168,11 @@ public abstract class HashJoinProbeTemplate implements HashJoinProbe { boolean success = projectBuildRecord(currentCompositeIdx, outputRecords) // && projectProbeRecord(recordsProcessed, outputRecords); - if(!success){ + if (!success) { // we failed to project. redo this record. getNextRecord = false; return; - }else{ + } else { outputRecords++; /* Projected single row from the build side with matching key but there @@ -182,8 +184,7 @@ public abstract class HashJoinProbeTemplate implements HashJoinProbe { * from the probe side. Drain the next row in the probe side. */ recordsProcessed++; - } - else { + } else { /* There is more than one row with the same key on the build side * don't drain more records from the probe side till we have projected * all the rows with this key @@ -197,10 +198,10 @@ public abstract class HashJoinProbeTemplate implements HashJoinProbe { // If we have a left outer join, project the keys if (joinType == JoinRelType.LEFT || joinType == JoinRelType.FULL) { boolean success = projectProbeRecord(recordsProcessed, outputRecords); - if(!success){ - if(outputRecords == 0){ + if (!success) { + if (outputRecords == 0) { throw new IllegalStateException("Record larger than single batch."); - }else{ + } else { // we've output some records but failed to output this one. return and wait for next call. return; } @@ -214,10 +215,10 @@ public abstract class HashJoinProbeTemplate implements HashJoinProbe { hjHelper.setRecordMatched(currentCompositeIdx); boolean success = projectBuildRecord(currentCompositeIdx, outputRecords) // && projectProbeRecord(recordsProcessed, outputRecords); - if(!success){ - if(outputRecords == 0){ + if (!success) { + if (outputRecords == 0) { throw new IllegalStateException("Record larger than single batch."); - }else{ + } else { // we've output some records but failed to output this one. return and wait for next call. return; } @@ -264,5 +265,7 @@ public abstract class HashJoinProbeTemplate implements HashJoinProbe { public abstract void doSetup(@Named("context") FragmentContext context, @Named("buildBatch") VectorContainer buildBatch, @Named("probeBatch") RecordBatch probeBatch, @Named("outgoing") RecordBatch outgoing); public abstract boolean projectBuildRecord(@Named("buildIndex") int buildIndex, @Named("outIndex") int outIndex); + public abstract boolean projectProbeRecord(@Named("probeIndex") int probeIndex, @Named("outIndex") int outIndex); + } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinStatus.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinStatus.java index faca32a97..39bdb9440 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinStatus.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinStatus.java @@ -66,31 +66,32 @@ public final class JoinStatus { this.joinType = output.getJoinType(); } - private final IterOutcome nextLeft(){ + private final IterOutcome nextLeft() { return outputBatch.next(LEFT_INPUT, left); } - private final IterOutcome nextRight(){ + private final IterOutcome nextRight() { return outputBatch.next(RIGHT_INPUT, right); } - public final void ensureInitial(){ - if(!initialSet){ + public final void ensureInitial() { + if(!initialSet) { this.lastLeft = nextLeft(); this.lastRight = nextRight(); initialSet = true; } } - public final void advanceLeft(){ + public final void advanceLeft() { leftPosition++; } - public final void advanceRight(){ - if (rightSourceMode == RightSourceMode.INCOMING) + public final void advanceRight() { + if (rightSourceMode == RightSourceMode.INCOMING) { rightPosition++; - else + } else { svRightPosition++; + } } public final int getLeftPosition() { @@ -101,7 +102,7 @@ public final class JoinStatus { return (rightSourceMode == RightSourceMode.INCOMING) ? rightPosition : svRightPosition; } - public final int getRightCount(){ + public final int getRightCount() { return right.getRecordCount(); } @@ -153,9 +154,10 @@ public final class JoinStatus { * Check if the left record position can advance by one. * Side effect: advances to next left batch if current left batch size is exceeded. */ - public final boolean isLeftPositionAllowed(){ - if (lastLeft == IterOutcome.NONE) + public final boolean isLeftPositionAllowed() { + if (lastLeft == IterOutcome.NONE) { return false; + } if (!isLeftPositionInCurrentBatch()) { leftPosition = 0; releaseData(left); @@ -170,11 +172,13 @@ public final class JoinStatus { * Check if the right record position can advance by one. * Side effect: advances to next right batch if current right batch size is exceeded */ - public final boolean isRightPositionAllowed(){ - if (rightSourceMode == RightSourceMode.SV4) + public final boolean isRightPositionAllowed() { + if (rightSourceMode == RightSourceMode.SV4) { return svRightPosition < sv4.getCount(); - if (lastRight == IterOutcome.NONE) + } + if (lastRight == IterOutcome.NONE) { return false; + } if (!isRightPositionInCurrentBatch()) { rightPosition = 0; releaseData(right); @@ -185,11 +189,13 @@ public final class JoinStatus { return true; } - private void releaseData(RecordBatch b){ - for(VectorWrapper<?> v : b){ + private void releaseData(RecordBatch b) { + for (VectorWrapper<?> v : b) { v.clear(); } - if(b.getSchema().getSelectionVectorMode() == SelectionVectorMode.TWO_BYTE) b.getSelectionVector2().clear(); + if (b.getSchema().getSelectionVectorMode() == SelectionVectorMode.TWO_BYTE) { + b.getSelectionVector2().clear(); + } } /** @@ -220,29 +226,34 @@ public final class JoinStatus { return rightPosition + 1 < right.getRecordCount(); } - public JoinOutcome getOutcome(){ - if (!ok) + public JoinOutcome getOutcome() { + if (!ok) { return JoinOutcome.FAILURE; + } if (bothMatches(IterOutcome.NONE) || (joinType == JoinRelType.INNER && eitherMatches(IterOutcome.NONE)) || (joinType == JoinRelType.LEFT && lastLeft == IterOutcome.NONE) || - (joinType == JoinRelType.RIGHT && lastRight == IterOutcome.NONE)) + (joinType == JoinRelType.RIGHT && lastRight == IterOutcome.NONE)) { return JoinOutcome.NO_MORE_DATA; + } if (bothMatches(IterOutcome.OK) || - (eitherMatches(IterOutcome.NONE) && eitherMatches(IterOutcome.OK))) + (eitherMatches(IterOutcome.NONE) && eitherMatches(IterOutcome.OK))) { return JoinOutcome.BATCH_RETURNED; - if (eitherMatches(IterOutcome.OK_NEW_SCHEMA)) + } + if (eitherMatches(IterOutcome.OK_NEW_SCHEMA)) { return JoinOutcome.SCHEMA_CHANGED; - if (eitherMatches(IterOutcome.NOT_YET)) + } + if (eitherMatches(IterOutcome.NOT_YET)) { return JoinOutcome.WAITING; + } return JoinOutcome.FAILURE; } - private boolean bothMatches(IterOutcome outcome){ + private boolean bothMatches(IterOutcome outcome) { return lastLeft == outcome && lastRight == outcome; } - private boolean eitherMatches(IterOutcome outcome){ + private boolean eitherMatches(IterOutcome outcome) { return lastLeft == outcome || lastRight == outcome; } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinTemplate.java index bb3b9ac6d..c1dffc107 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinTemplate.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinTemplate.java @@ -94,8 +94,9 @@ public abstract class JoinTemplate implements JoinWorker { if (((MergeJoinPOP)status.outputBatch.getPopConfig()).getJoinType() == JoinRelType.LEFT) { // we've hit the end of the right record batch; copy any remaining values from the left batch while (status.isLeftPositionAllowed()) { - if (!doCopyLeft(status.getLeftPosition(), status.getOutPosition())) + if (!doCopyLeft(status.getLeftPosition(), status.getOutPosition())) { return false; + } status.incOutputPos(); status.advanceLeft(); @@ -103,8 +104,9 @@ public abstract class JoinTemplate implements JoinWorker { } return true; } - if (!status.isLeftPositionAllowed()) + if (!status.isLeftPositionAllowed()) { return true; + } int comparison = doCompare(status.getLeftPosition(), status.getRightPosition()); switch (comparison) { @@ -112,8 +114,9 @@ public abstract class JoinTemplate implements JoinWorker { case -1: // left key < right key if (((MergeJoinPOP)status.outputBatch.getPopConfig()).getJoinType() == JoinRelType.LEFT) { - if (!doCopyLeft(status.getLeftPosition(), status.getOutPosition())) + if (!doCopyLeft(status.getLeftPosition(), status.getOutPosition())) { return false; + } status.incOutputPos(); } status.advanceLeft(); @@ -125,25 +128,27 @@ public abstract class JoinTemplate implements JoinWorker { // check for repeating values on the left side if (!status.isLeftRepeating() && status.isNextLeftPositionInCurrentBatch() && - doCompareNextLeftKey(status.getLeftPosition()) == 0) + doCompareNextLeftKey(status.getLeftPosition()) == 0) { // subsequent record(s) in the left batch have the same key status.notifyLeftRepeating(); - - else if (status.isLeftRepeating() && + } else if (status.isLeftRepeating() && status.isNextLeftPositionInCurrentBatch() && - doCompareNextLeftKey(status.getLeftPosition()) != 0) + doCompareNextLeftKey(status.getLeftPosition()) != 0) { // this record marks the end of repeated keys status.notifyLeftStoppedRepeating(); + } boolean crossedBatchBoundaries = false; int initialRightPosition = status.getRightPosition(); do { // copy all equal right keys to the output record batch - if (!doCopyLeft(status.getLeftPosition(), status.getOutPosition())) + if (!doCopyLeft(status.getLeftPosition(), status.getOutPosition())) { return false; + } - if (!doCopyRight(status.getRightPosition(), status.getOutPosition())) + if (!doCopyRight(status.getRightPosition(), status.getOutPosition())) { return false; + } status.incOutputPos(); @@ -159,9 +164,10 @@ public abstract class JoinTemplate implements JoinWorker { } while ((!status.isLeftRepeating() || status.isRightPositionInCurrentBatch()) && status.isRightPositionAllowed() && doCompare(status.getLeftPosition(), status.getRightPosition()) == 0); if (status.getRightPosition() > initialRightPosition && - (status.isLeftRepeating() || ! status.isNextLeftPositionInCurrentBatch())) + (status.isLeftRepeating() || ! status.isNextLeftPositionInCurrentBatch())) { // more than one matching result from right table; reset position in case of subsequent left match status.setRightPosition(initialRightPosition); + } status.advanceLeft(); if (status.isLeftRepeating() && doCompareNextLeftKey(status.getLeftPosition()) != 0) { @@ -233,5 +239,4 @@ public abstract class JoinTemplate implements JoinWorker { */ protected abstract int doCompareNextLeftKey(@Named("leftIndex") int leftIndex); - } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java index b24b5348a..1d4e353c6 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java @@ -144,19 +144,21 @@ public class MergeJoinBatch extends AbstractRecordBatch<MergeJoinPOP> { status.ensureInitial(); // loop so we can start over again if we find a new batch was created. - while(true){ + while (true) { JoinOutcome outcome = status.getOutcome(); // if the previous outcome was a change in schema or we sent a batch, we have to set up a new batch. if (outcome == JoinOutcome.BATCH_RETURNED || - outcome == JoinOutcome.SCHEMA_CHANGED) + outcome == JoinOutcome.SCHEMA_CHANGED) { allocateBatch(); + } // reset the output position to zero after our parent iterates this RecordBatch if (outcome == JoinOutcome.BATCH_RETURNED || outcome == JoinOutcome.SCHEMA_CHANGED || - outcome == JoinOutcome.NO_MORE_DATA) + outcome == JoinOutcome.NO_MORE_DATA) { status.resetOutputPos(); + } if (outcome == JoinOutcome.NO_MORE_DATA) { logger.debug("NO MORE DATA; returning {} NONE"); @@ -164,7 +166,7 @@ public class MergeJoinBatch extends AbstractRecordBatch<MergeJoinPOP> { } boolean first = false; - if(worker == null){ + if (worker == null) { try { logger.debug("Creating New Worker"); stats.startSetup(); @@ -180,11 +182,12 @@ public class MergeJoinBatch extends AbstractRecordBatch<MergeJoinPOP> { } // join until we have a complete outgoing batch - if (!worker.doJoin(status)) + if (!worker.doJoin(status)) { worker = null; + } // get the outcome of the join. - switch(status.getOutcome()){ + switch (status.getOutcome()) { case BATCH_RETURNED: // only return new schema if new worker has been setup. logger.debug("BATCH RETURNED; returning {}", (first ? "OK_NEW_SCHEMA" : "OK")); @@ -200,7 +203,7 @@ public class MergeJoinBatch extends AbstractRecordBatch<MergeJoinPOP> { return status.getOutPosition() > 0 ? (first ? IterOutcome.OK_NEW_SCHEMA : IterOutcome.OK): (first ? IterOutcome.OK_NEW_SCHEMA : IterOutcome.NONE); case SCHEMA_CHANGED: worker = null; - if(status.getOutPosition() > 0){ + if (status.getOutPosition() > 0) { // if we have current data, let's return that. logger.debug("SCHEMA CHANGED; returning {} ", (first ? "OK_NEW_SCHEMA" : "OK")); setRecordCountInContainer(); @@ -218,7 +221,7 @@ public class MergeJoinBatch extends AbstractRecordBatch<MergeJoinPOP> { } private void setRecordCountInContainer() { - for(VectorWrapper vw : container){ + for (VectorWrapper vw : container) { Preconditions.checkArgument(!vw.isHyper()); vw.getValueVector().getMutator().setValueCount(getRecordCount()); } @@ -257,9 +260,10 @@ public class MergeJoinBatch extends AbstractRecordBatch<MergeJoinPOP> { // materialize value vector readers from join expression final LogicalExpression materializedLeftExpr = ExpressionTreeMaterializer.materialize(leftFieldExpr, left, collector, context.getFunctionRegistry()); - if (collector.hasErrors()) + if (collector.hasErrors()) { throw new ClassTransformationException(String.format( "Failure while trying to materialize incoming left field. Errors:\n %s.", collector.toErrorString())); + } // generate compareNextLeftKey() //////////////////////////////// @@ -475,9 +479,10 @@ public class MergeJoinBatch extends AbstractRecordBatch<MergeJoinPOP> { } else { materializedLeftExpr = new TypedNullConstant(Types.optional(MinorType.INT)); } - if (collector.hasErrors()) + if (collector.hasErrors()) { throw new ClassTransformationException(String.format( "Failure while trying to materialize incoming left field. Errors:\n %s.", collector.toErrorString())); + } LogicalExpression materializedRightExpr; if (worker == null || status.isRightPositionAllowed()) { @@ -485,9 +490,10 @@ public class MergeJoinBatch extends AbstractRecordBatch<MergeJoinPOP> { } else { materializedRightExpr = new TypedNullConstant(Types.optional(MinorType.INT)); } - if (collector.hasErrors()) + if (collector.hasErrors()) { throw new ClassTransformationException(String.format( "Failure while trying to materialize incoming right field. Errors:\n %s.", collector.toErrorString())); + } // generate compare() //////////////////////// @@ -519,4 +525,5 @@ public class MergeJoinBatch extends AbstractRecordBatch<MergeJoinPOP> { //Pass the equality check for all the join conditions. Finally, return 0. cg.getEvalBlock()._return(JExpr.lit(0)); } + } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatchBuilder.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatchBuilder.java index 904d38cea..1187bd6da 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatchBuilder.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatchBuilder.java @@ -50,15 +50,24 @@ public class MergeJoinBatchBuilder { } public boolean add(RecordBatch batch) { - if (batch.getSchema().getSelectionVectorMode() == BatchSchema.SelectionVectorMode.FOUR_BYTE) + if (batch.getSchema().getSelectionVectorMode() == BatchSchema.SelectionVectorMode.FOUR_BYTE) { throw new UnsupportedOperationException("A merge join cannot currently work against a sv4 batch."); - if (batch.getRecordCount() == 0) return true; // skip over empty record batches. + } + if (batch.getRecordCount() == 0) { + return true; // skip over empty record batches. + } // resource checks long batchBytes = getSize(batch); - if (batchBytes + runningBytes > Integer.MAX_VALUE) return false; // TODO: 2GB is arbitrary - if (runningBatches++ >= Character.MAX_VALUE) return false; // allowed in batch. - if (!svAllocator.preAllocate(batch.getRecordCount()*4)) return false; // sv allocation available. + if (batchBytes + runningBytes > Integer.MAX_VALUE) { + return false; // TODO: 2GB is arbitrary + } + if (runningBatches++ >= Character.MAX_VALUE) { + return false; // allowed in batch. + } + if (!svAllocator.preAllocate(batch.getRecordCount()*4)) { + return false; // sv allocation available. + } // transfer VVs to a new RecordBatchData RecordBatchData bd = new RecordBatchData(batch); @@ -68,9 +77,9 @@ public class MergeJoinBatchBuilder { return true; } - private long getSize(RecordBatch batch){ + private long getSize(RecordBatch batch) { long bytes = 0; - for(VectorWrapper<?> v : batch){ + for (VectorWrapper<?> v : batch) { bytes += v.getValueVector().getBufferSize(); } return bytes; @@ -78,18 +87,20 @@ public class MergeJoinBatchBuilder { public void build() throws SchemaChangeException { container.clear(); - if (queuedRightBatches.size() > Character.MAX_VALUE) throw new SchemaChangeException("Join cannot work on more than %d batches at a time.", (int) Character.MAX_VALUE); + if (queuedRightBatches.size() > Character.MAX_VALUE) { + throw new SchemaChangeException("Join cannot work on more than %d batches at a time.", (int) Character.MAX_VALUE); + } status.sv4 = new SelectionVector4(svAllocator.getAllocation(), recordCount, Character.MAX_VALUE); BatchSchema schema = queuedRightBatches.keySet().iterator().next(); List<RecordBatchData> data = queuedRightBatches.get(schema); // now we're going to generate the sv4 pointers - switch(schema.getSelectionVectorMode()){ + switch (schema.getSelectionVectorMode()) { case NONE: { int index = 0; int recordBatchId = 0; - for(RecordBatchData d : data){ - for(int i =0; i < d.getRecordCount(); i++, index++){ + for (RecordBatchData d : data) { + for (int i =0; i < d.getRecordCount(); i++, index++) { status.sv4.set(index, recordBatchId, i); } recordBatchId++; @@ -99,8 +110,8 @@ public class MergeJoinBatchBuilder { case TWO_BYTE: { int index = 0; int recordBatchId = 0; - for(RecordBatchData d : data){ - for(int i =0; i < d.getRecordCount(); i++, index++){ + for (RecordBatchData d : data) { + for (int i =0; i < d.getRecordCount(); i++, index++) { status.sv4.set(index, recordBatchId, (int) d.getSv2().getIndex(i)); } // might as well drop the selection vector since we'll stop using it now. @@ -121,7 +132,7 @@ public class MergeJoinBatchBuilder { } } - for(MaterializedField f : vectors.keySet()){ + for (MaterializedField f : vectors.keySet()) { List<ValueVector> v = vectors.get(f); container.addHyperList(v); } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java index cf2e36f9d..29fd80f72 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java @@ -133,7 +133,7 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP> stats.startWait(); try { RawFragmentBatch b = provider.getNext(); - if(b != null){ + if (b != null) { stats.addLongStat(Metric.BYTES_RECEIVED, b.getByteCount()); stats.batchReceived(0, b.getHeader().getDef().getRecordCount(), false); } @@ -191,7 +191,9 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP> emptyBatch = rawBatch; } try { - while ((rawBatch = getNext(provider)) != null && rawBatch.getHeader().getDef().getRecordCount() == 0); + while ((rawBatch = getNext(provider)) != null && rawBatch.getHeader().getDef().getRecordCount() == 0) { + ; + } if (rawBatch == null && context.isCancelled()) { return IterOutcome.STOP; } @@ -400,14 +402,17 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP> batchOffsets[node.batchId] = 0; // add front value from batch[x] to priority queue - if (batchLoaders[node.batchId].getRecordCount() != 0) + if (batchLoaders[node.batchId].getRecordCount() != 0) { pqueue.add(new Node(node.batchId, 0)); + } } else { pqueue.add(new Node(node.batchId, node.valueIndex + 1)); } - if (prevBatchWasFull) break; + if (prevBatchWasFull) { + break; + } } // set the value counts in the outgoing vectors @@ -589,11 +594,13 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP> private void generateComparisons(ClassGenerator g, VectorAccessible batch) throws SchemaChangeException { g.setMappingSet(MAIN_MAPPING); - for(Ordering od : popConfig.getOrderings()){ + for (Ordering od : popConfig.getOrderings()) { // first, we rewrite the evaluation stack for each side of the comparison. ErrorCollector collector = new ErrorCollectorImpl(); final LogicalExpression expr = ExpressionTreeMaterializer.materialize(od.getExpr(), batch, collector,context.getFunctionRegistry()); - if(collector.hasErrors()) throw new SchemaChangeException("Failure while materializing expression. " + collector.toErrorString()); + if (collector.hasErrors()) { + throw new SchemaChangeException("Failure while materializing expression. " + collector.toErrorString()); + } g.setMappingSet(LEFT_MAPPING); HoldingContainer left = g.addExpr(expr, false); g.setMappingSet(RIGHT_MAPPING); @@ -605,9 +612,9 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP> HoldingContainer out = g.addExpr(fh, false); JConditional jc = g.getEvalBlock()._if(out.getValue().ne(JExpr.lit(0))); - if(od.getDirection() == Direction.ASCENDING){ + if (od.getDirection() == Direction.ASCENDING) { jc._then()._return(out.getValue()); - }else{ + } else { jc._then()._return(out.getValue().minus()); } } @@ -648,7 +655,7 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP> public void cleanup() { outgoingContainer.clear(); if (batchLoaders != null) { - for(RecordBatchLoader rbl : batchLoaders){ + for (RecordBatchLoader rbl : batchLoaders) { if (rbl != null) { rbl.clear(); } @@ -662,4 +669,4 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP> } } -}
\ No newline at end of file +} diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionRecordBatch.java index 45f32cff4..aecf3636d 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionRecordBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionRecordBatch.java @@ -189,8 +189,9 @@ public class OrderedPartitionRecordBatch extends AbstractRecordBatch<OrderedPart } builder.add(incoming); recordsSampled += incoming.getRecordCount(); - if (upstream == IterOutcome.NONE) + if (upstream == IterOutcome.NONE) { break; + } } VectorContainer sortedSamples = new VectorContainer(); builder.build(context, sortedSamples); @@ -258,7 +259,7 @@ public class OrderedPartitionRecordBatch extends AbstractRecordBatch<OrderedPart try { - if (!saveSamples()){ + if (!saveSamples()) { return false; } @@ -277,16 +278,17 @@ public class OrderedPartitionRecordBatch extends AbstractRecordBatch<OrderedPart // Wait until sufficient number of fragments have submitted samples, or proceed after xx ms passed // TODO: this should be polling. - if (val < fragmentsBeforeProceed) + if (val < fragmentsBeforeProceed) { Thread.sleep(10); + } for (int i = 0; i < 100 && finalTable == null; i++) { finalTable = tableMap.get(finalTableKey); - if (finalTable != null){ + if (finalTable != null) { break; } Thread.sleep(10); } - if (finalTable == null){ + if (finalTable == null) { buildTable(); } finalTable = tableMap.get(finalTableKey); @@ -429,8 +431,9 @@ public class OrderedPartitionRecordBatch extends AbstractRecordBatch<OrderedPart // if we got IterOutcome.NONE while getting partition vectors, and there are no batches on the queue, then we are // done - if (upstreamNone && (batchQueue == null || batchQueue.size() == 0)) + if (upstreamNone && (batchQueue == null || batchQueue.size() == 0)) { return IterOutcome.NONE; + } // if there are batches on the queue, process them first, rather than calling incoming.next() if (batchQueue != null && batchQueue.size() > 0) { @@ -461,7 +464,7 @@ public class OrderedPartitionRecordBatch extends AbstractRecordBatch<OrderedPart // If this is the first iteration, we need to generate the partition vectors before we can proceed if (this.first && upstream == IterOutcome.OK_NEW_SCHEMA) { - if (!getPartitionVectors()){ + if (!getPartitionVectors()) { cleanup(); return IterOutcome.STOP; } @@ -490,8 +493,9 @@ public class OrderedPartitionRecordBatch extends AbstractRecordBatch<OrderedPart // we need to generate a new schema, even if the outcome is IterOutcome.OK After that we can reuse the schema. if (this.startedUnsampledBatches == false) { this.startedUnsampledBatches = true; - if (upstream == IterOutcome.OK) + if (upstream == IterOutcome.OK) { upstream = IterOutcome.OK_NEW_SCHEMA; + } } switch (upstream) { case NONE: @@ -560,8 +564,9 @@ public class OrderedPartitionRecordBatch extends AbstractRecordBatch<OrderedPart int count = 0; for (Ordering od : popConfig.getOrderings()) { final LogicalExpression expr = ExpressionTreeMaterializer.materialize(od.getExpr(), batch, collector, context.getFunctionRegistry()); - if (collector.hasErrors()) + if (collector.hasErrors()) { throw new SchemaChangeException("Failure while materializing expression. " + collector.toErrorString()); + } cg.setMappingSet(incomingMapping); ClassGenerator.HoldingContainer left = cg.addExpr(expr, false); cg.setMappingSet(partitionMapping); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/producer/ProducerConsumerBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/producer/ProducerConsumerBatch.java index 051a590f2..7f3a96637 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/producer/ProducerConsumerBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/producer/ProducerConsumerBatch.java @@ -120,7 +120,9 @@ public class ProducerConsumerBatch extends AbstractRecordBatch { @Override public void run() { try { - if (stop) return; + if (stop) { + return; + } outer: while (true) { IterOutcome upstream = incoming.next(); @@ -208,4 +210,5 @@ public class ProducerConsumerBatch extends AbstractRecordBatch { this.failed = failed; } } + } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java index ec29cac55..a1a834052 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java @@ -195,55 +195,64 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project>{ private boolean doAlloc() { //Allocate vv in the allocationVectors. - for(ValueVector v : this.allocationVectors){ + for (ValueVector v : this.allocationVectors) { //AllocationHelper.allocate(v, remainingRecordCount, 250); - if (!v.allocateNewSafe()) + if (!v.allocateNewSafe()) { return false; + } } //Allocate vv for complexWriters. - if (complexWriters == null) + if (complexWriters == null) { return true; + } - for (ComplexWriter writer : complexWriters) + for (ComplexWriter writer : complexWriters) { writer.allocate(); + } return true; } private void setValueCount(int count) { - for(ValueVector v : allocationVectors){ + for (ValueVector v : allocationVectors) { ValueVector.Mutator m = v.getMutator(); m.setValueCount(count); } - if (complexWriters == null) + if (complexWriters == null) { return; + } - for (ComplexWriter writer : complexWriters) + for (ComplexWriter writer : complexWriters) { writer.setValueCount(count); + } } /** hack to make ref and full work together... need to figure out if this is still necessary. **/ - private FieldReference getRef(NamedExpression e){ + private FieldReference getRef(NamedExpression e) { FieldReference ref = e.getRef(); PathSegment seg = ref.getRootSegment(); -// if(seg.isNamed() && "output".contentEquals(seg.getNameSegment().getPath())){ +// if (seg.isNamed() && "output".contentEquals(seg.getNameSegment().getPath())) { // return new FieldReference(ref.getPath().toString().subSequence(7, ref.getPath().length()), ref.getPosition()); // } return ref; } - private boolean isAnyWildcard(List<NamedExpression> exprs){ - for(NamedExpression e : exprs){ - if(isWildcard(e)) return true; + private boolean isAnyWildcard(List<NamedExpression> exprs) { + for (NamedExpression e : exprs) { + if (isWildcard(e)) { + return true; + } } return false; } - private boolean isWildcard(NamedExpression ex){ - if( !(ex.getExpr() instanceof SchemaPath)) return false; + private boolean isWildcard(NamedExpression ex) { + if ( !(ex.getExpr() instanceof SchemaPath)) { + return false; + } NameSegment expr = ((SchemaPath)ex.getExpr()).getRootSegment(); NameSegment ref = ex.getRef().getRootSegment(); return ref.getPath().equals("*") && expr.getPath().equals("*"); @@ -266,7 +275,7 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project>{ ClassifierResult result = new ClassifierResult(); boolean classify = isClassificationNeeded(exprs); - for(int i = 0; i < exprs.size(); i++){ + for (int i = 0; i < exprs.size(); i++) { final NamedExpression namedExpression = exprs.get(i); result.clear(); @@ -278,14 +287,16 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project>{ Integer value = result.prefixMap.get(result.prefix); if (value != null && value.intValue() == 1) { int k = 0; - for(VectorWrapper<?> wrapper : incoming) { + for (VectorWrapper<?> wrapper : incoming) { ValueVector vvIn = wrapper.getValueVector(); SchemaPath originalPath = vvIn.getField().getPath(); if (k > result.outputNames.size()-1) { assert false; } String name = result.outputNames.get(k++); // get the renamed column names - if (name == EMPTY_STRING) continue; + if (name == EMPTY_STRING) { + continue; + } FieldReference ref = new FieldReference(name); TransferPair tp = wrapper.getValueVector().getTransferPair(ref); transfers.add(tp); @@ -293,17 +304,19 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project>{ } } else if (value != null && value.intValue() > 1) { // subsequent wildcards should do a copy of incoming valuevectors int k = 0; - for(VectorWrapper<?> wrapper : incoming) { + for (VectorWrapper<?> wrapper : incoming) { ValueVector vvIn = wrapper.getValueVector(); SchemaPath originalPath = vvIn.getField().getPath(); if (k > result.outputNames.size()-1) { assert false; } String name = result.outputNames.get(k++); // get the renamed column names - if (name == EMPTY_STRING) continue; + if (name == EMPTY_STRING) { + continue; + } final LogicalExpression expr = ExpressionTreeMaterializer.materialize(originalPath, incoming, collector, context.getFunctionRegistry() ); - if(collector.hasErrors()){ + if (collector.hasErrors()) { throw new SchemaChangeException(String.format("Failure while trying to materialize incoming schema. Errors:\n %s.", collector.toErrorString())); } @@ -333,16 +346,15 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project>{ final LogicalExpression expr = ExpressionTreeMaterializer.materialize(namedExpression.getExpr(), incoming, collector, context.getFunctionRegistry(), true); final MaterializedField outputField = MaterializedField.create(outputName, expr.getMajorType()); - if(collector.hasErrors()){ + if (collector.hasErrors()) { throw new SchemaChangeException(String.format("Failure while trying to materialize incoming schema. Errors:\n %s.", collector.toErrorString())); } // add value vector to transfer if direct reference and this is allowed, otherwise, add to evaluation stack. - if(expr instanceof ValueVectorReadExpression && incoming.getSchema().getSelectionVectorMode() == SelectionVectorMode.NONE + if (expr instanceof ValueVectorReadExpression && incoming.getSchema().getSelectionVectorMode() == SelectionVectorMode.NONE && !((ValueVectorReadExpression) expr).hasReadPath() && !isAnyWildcard - && !transferFieldIds.contains(((ValueVectorReadExpression) expr).getFieldId().getFieldIds()[0]) - ) { + && !transferFieldIds.contains(((ValueVectorReadExpression) expr).getFieldId().getFieldIds()[0])) { ValueVectorReadExpression vectorRead = (ValueVectorReadExpression) expr; TypedFieldId id = vectorRead.getFieldId(); @@ -358,8 +370,9 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project>{ ((DrillFuncHolderExpr) expr).isComplexWriterFuncHolder()) { // Need to process ComplexWriter function evaluation. // Lazy initialization of the list of complex writers, if not done yet. - if (complexWriters == null) + if (complexWriters == null) { complexWriters = Lists.newArrayList(); + } // The reference name will be passed to ComplexWriter, used as the name of the output vector from the writer. ((DrillComplexWriterFuncHolder) ((DrillFuncHolderExpr) expr).getHolder()).setReference(namedExpression.getRef()); @@ -419,9 +432,11 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project>{ private boolean isClassificationNeeded(List<NamedExpression> exprs) { boolean needed = false; - for(int i = 0; i < exprs.size(); i++){ + for (int i = 0; i < exprs.size(); i++) { final NamedExpression ex = exprs.get(i); - if (!(ex.getExpr() instanceof SchemaPath)) continue; + if (!(ex.getExpr() instanceof SchemaPath)) { + continue; + } NameSegment expr = ((SchemaPath) ex.getExpr()).getRootSegment(); NameSegment ref = ex.getRef().getRootSegment(); boolean refHasPrefix = ref.getPath().contains(StarColumnHelper.PREFIX_DELIMITER); @@ -530,7 +545,7 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project>{ result.outputNames.add(EMPTY_STRING); // initialize } - for(VectorWrapper<?> wrapper : incoming) { + for (VectorWrapper<?> wrapper : incoming) { ValueVector vvIn = wrapper.getValueVector(); String incomingName = vvIn.getField().getPath().getRootSegment().getPath(); // get the prefix of the name @@ -586,7 +601,7 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project>{ result.outputNames.add(EMPTY_STRING); // initialize } - for(VectorWrapper<?> wrapper : incoming) { + for (VectorWrapper<?> wrapper : incoming) { ValueVector vvIn = wrapper.getValueVector(); String name = vvIn.getField().getPath().getRootSegment().getPath(); String[] components = name.split(StarColumnHelper.PREFIX_DELIMITER, 2); @@ -627,7 +642,7 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project>{ } int k = 0; - for(VectorWrapper<?> wrapper : incoming) { + for (VectorWrapper<?> wrapper : incoming) { ValueVector vvIn = wrapper.getValueVector(); String incomingName = vvIn.getField().getPath().getRootSegment().getPath(); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectorTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectorTemplate.java index b36bd92a9..49ad39071 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectorTemplate.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectorTemplate.java @@ -39,27 +39,25 @@ public abstract class ProjectorTemplate implements Projector { private SelectionVector4 vector4; private SelectionVectorMode svMode; - public ProjectorTemplate() throws SchemaChangeException{ + public ProjectorTemplate() throws SchemaChangeException { } @Override public final int projectRecords(int startIndex, final int recordCount, int firstOutputIndex) { - switch(svMode){ + switch (svMode) { case FOUR_BYTE: throw new UnsupportedOperationException(); - case TWO_BYTE: final int count = recordCount; - for(int i = 0; i < count; i++, firstOutputIndex++){ - if (!doEval(vector2.getIndex(i), firstOutputIndex)) + for (int i = 0; i < count; i++, firstOutputIndex++) { + if (!doEval(vector2.getIndex(i), firstOutputIndex)) { return i; + } } return recordCount; - case NONE: - final int countN = recordCount; int i; for (i = startIndex; i < startIndex + countN; i++, firstOutputIndex++) { @@ -68,18 +66,16 @@ public abstract class ProjectorTemplate implements Projector { } } if (i < startIndex + recordCount || startIndex > 0) { - for(TransferPair t : transfers){ + for (TransferPair t : transfers) { t.splitAndTransfer(startIndex, i - startIndex); } return i - startIndex; } - for(TransferPair t : transfers){ + for (TransferPair t : transfers) { t.transfer(); } return recordCount; - - default: throw new UnsupportedOperationException(); } @@ -89,7 +85,7 @@ public abstract class ProjectorTemplate implements Projector { public final void setup(FragmentContext context, RecordBatch incoming, RecordBatch outgoing, List<TransferPair> transfers) throws SchemaChangeException{ this.svMode = incoming.getSchema().getSelectionVectorMode(); - switch(svMode){ + switch (svMode) { case FOUR_BYTE: this.vector4 = incoming.getSelectionVector4(); break; @@ -104,8 +100,4 @@ public abstract class ProjectorTemplate implements Projector { public abstract void doSetup(@Named("context") FragmentContext context, @Named("incoming") RecordBatch incoming, @Named("outgoing") RecordBatch outgoing); public abstract boolean doEval(@Named("inIndex") int inIndex, @Named("outIndex") int outIndex); - - - - } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/RecordBatchData.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/RecordBatchData.java index 8116869c4..419dc8587 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/RecordBatchData.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/RecordBatchData.java @@ -40,7 +40,7 @@ public class RecordBatchData { private int recordCount; VectorContainer container = new VectorContainer(); - public RecordBatchData(VectorAccessible batch){ + public RecordBatchData(VectorAccessible batch) { List<ValueVector> vectors = Lists.newArrayList(); if (batch instanceof RecordBatch && batch.getSchema().getSelectionVectorMode() == SelectionVectorMode.TWO_BYTE) { this.sv2 = ((RecordBatch)batch).getSelectionVector2().clone(); @@ -48,8 +48,10 @@ public class RecordBatchData { this.sv2 = null; } - for(VectorWrapper<?> v : batch){ - if(v.isHyper()) throw new UnsupportedOperationException("Record batch data can't be created based on a hyper batch."); + for (VectorWrapper<?> v : batch) { + if (v.isHyper()) { + throw new UnsupportedOperationException("Record batch data can't be created based on a hyper batch."); + } TransferPair tp = v.getValueVector().getTransferPair(); tp.transfer(); vectors.add(tp.getTo()); @@ -67,9 +69,10 @@ public class RecordBatchData { container.buildSchema(mode); } - public int getRecordCount(){ + public int getRecordCount() { return recordCount; } + public List<ValueVector> getVectors() { List<ValueVector> vectors = Lists.newArrayList(); for (VectorWrapper w : container) { @@ -91,4 +94,5 @@ public class RecordBatchData { public VectorContainer getContainer() { return container; } + } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortBatch.java index 3a374910c..19f542302 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortBatch.java @@ -82,8 +82,6 @@ public class SortBatch extends AbstractRecordBatch<Sort> { return builder.getSv4(); } - - @Override public void cleanup() { builder.clear(); @@ -93,15 +91,14 @@ public class SortBatch extends AbstractRecordBatch<Sort> { @Override public IterOutcome innerNext() { - if(schema != null){ - if(getSelectionVector4().next()){ + if (schema != null) { + if (getSelectionVector4().next()) { return IterOutcome.OK; - }else{ + } else { return IterOutcome.NONE; } } - try{ outer: while (true) { IterOutcome upstream = incoming.next(); @@ -114,13 +111,15 @@ public class SortBatch extends AbstractRecordBatch<Sort> { return upstream; case OK_NEW_SCHEMA: // only change in the case that the schema truly changes. Artificial schema changes are ignored. - if(!incoming.getSchema().equals(schema)){ - if (schema != null) throw new UnsupportedOperationException("Sort doesn't currently support sorts with changing schemas."); + if (!incoming.getSchema().equals(schema)) { + if (schema != null) { + throw new UnsupportedOperationException("Sort doesn't currently support sorts with changing schemas."); + } this.schema = incoming.getSchema(); } // fall through. case OK: - if(!builder.add(incoming)){ + if (!builder.add(incoming)) { throw new UnsupportedOperationException("Sort doesn't currently support doing an external sort."); }; break; @@ -129,7 +128,7 @@ public class SortBatch extends AbstractRecordBatch<Sort> { } } - if (schema == null || builder.isEmpty()){ + if (schema == null || builder.isEmpty()) { // builder may be null at this point if the first incoming batch is empty return IterOutcome.NONE; } @@ -141,7 +140,7 @@ public class SortBatch extends AbstractRecordBatch<Sort> { return IterOutcome.OK_NEW_SCHEMA; - }catch(SchemaChangeException | ClassTransformationException | IOException ex){ + } catch(SchemaChangeException | ClassTransformationException | IOException ex) { kill(false); logger.error("Failure during query", ex); context.fail(ex); @@ -167,11 +166,13 @@ public class SortBatch extends AbstractRecordBatch<Sort> { ClassGenerator<Sorter> g = cg.getRoot(); g.setMappingSet(mainMapping); - for(Ordering od : orderings){ + for(Ordering od : orderings) { // first, we rewrite the evaluation stack for each side of the comparison. ErrorCollector collector = new ErrorCollectorImpl(); final LogicalExpression expr = ExpressionTreeMaterializer.materialize(od.getExpr(), batch, collector,context.getFunctionRegistry()); - if(collector.hasErrors()) throw new SchemaChangeException("Failure while materializing expression. " + collector.toErrorString()); + if (collector.hasErrors()) { + throw new SchemaChangeException("Failure while materializing expression. " + collector.toErrorString()); + } g.setMappingSet(leftMapping); HoldingContainer left = g.addExpr(expr, false); g.setMappingSet(rightMapping); @@ -183,7 +184,7 @@ public class SortBatch extends AbstractRecordBatch<Sort> { HoldingContainer out = g.addExpr(fh, false); JConditional jc = g.getEvalBlock()._if(out.getValue().ne(JExpr.lit(0))); - if(od.getDirection() == Direction.ASCENDING){ + if (od.getDirection() == Direction.ASCENDING) { jc._then()._return(out.getValue()); }else{ jc._then()._return(out.getValue().minus()); @@ -193,8 +194,6 @@ public class SortBatch extends AbstractRecordBatch<Sort> { g.getEvalBlock()._return(JExpr.lit(0)); return context.getImplementationClass(cg); - - } @Override @@ -207,7 +206,4 @@ public class SortBatch extends AbstractRecordBatch<Sort> { incoming.kill(sendUpstream); } - - - } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortRecordBatchBuilder.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortRecordBatchBuilder.java index 80b4ef664..707c41c0d 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortRecordBatchBuilder.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortRecordBatchBuilder.java @@ -49,14 +49,14 @@ public class SortRecordBatchBuilder { private SelectionVector4 sv4; final PreAllocator svAllocator; - public SortRecordBatchBuilder(BufferAllocator a, long maxBytes){ + public SortRecordBatchBuilder(BufferAllocator a, long maxBytes) { this.maxBytes = maxBytes; this.svAllocator = a.getNewPreAllocator(); } - private long getSize(VectorAccessible batch){ + private long getSize(VectorAccessible batch) { long bytes = 0; - for(VectorWrapper<?> v : batch){ + for (VectorWrapper<?> v : batch) { bytes += v.getValueVector().getBufferSize(); } return bytes; @@ -68,8 +68,10 @@ public class SortRecordBatchBuilder { * @return True if the requested add completed successfully. Returns false in the case that this builder is full and cannot receive additional packages. * @throws SchemaChangeException */ - public boolean add(VectorAccessible batch){ - if(batch.getSchema().getSelectionVectorMode() == SelectionVectorMode.FOUR_BYTE) throw new UnsupportedOperationException("A sort cannot currently work against a sv4 batch."); + public boolean add(VectorAccessible batch) { + if (batch.getSchema().getSelectionVectorMode() == SelectionVectorMode.FOUR_BYTE) { + throw new UnsupportedOperationException("A sort cannot currently work against a sv4 batch."); + } if (batch.getRecordCount() == 0 && batches.size() > 0) { return true; // skip over empty record batches. } @@ -78,9 +80,15 @@ public class SortRecordBatchBuilder { if (batchBytes == 0 && batches.size() > 0) { return true; } - if(batchBytes + runningBytes > maxBytes) return false; // enough data memory. - if(runningBatches+1 > Character.MAX_VALUE) return false; // allowed in batch. - if(!svAllocator.preAllocate(batch.getRecordCount()*4)) return false; // sv allocation available. + if (batchBytes + runningBytes > maxBytes) { + return false; // enough data memory. + } + if (runningBatches+1 > Character.MAX_VALUE) { + return false; // allowed in batch. + } + if (!svAllocator.preAllocate(batch.getRecordCount()*4)) { + return false; // sv allocation available. + } RecordBatchData bd = new RecordBatchData(batch); @@ -126,15 +134,19 @@ public class SortRecordBatchBuilder { } } - public boolean isEmpty(){ + public boolean isEmpty() { return batches.isEmpty(); } public void build(FragmentContext context, VectorContainer outputContainer) throws SchemaChangeException{ outputContainer.clear(); - if(batches.keySet().size() > 1) throw new SchemaChangeException("Sort currently only supports a single schema."); - if(batches.size() > Character.MAX_VALUE) throw new SchemaChangeException("Sort cannot work on more than %d batches at a time.", (int) Character.MAX_VALUE); - if(batches.keys().size() < 1){ + if (batches.keySet().size() > 1) { + throw new SchemaChangeException("Sort currently only supports a single schema."); + } + if (batches.size() > Character.MAX_VALUE) { + throw new SchemaChangeException("Sort cannot work on more than %d batches at a time.", (int) Character.MAX_VALUE); + } + if (batches.keys().size() < 1) { assert false : "Invalid to have an empty set of batches with no schemas."; } sv4 = new SelectionVector4(svAllocator.getAllocation(), recordCount, Character.MAX_VALUE); @@ -142,12 +154,12 @@ public class SortRecordBatchBuilder { List<RecordBatchData> data = batches.get(schema); // now we're going to generate the sv4 pointers - switch(schema.getSelectionVectorMode()){ + switch (schema.getSelectionVectorMode()) { case NONE: { int index = 0; int recordBatchId = 0; - for(RecordBatchData d : data){ - for(int i =0; i < d.getRecordCount(); i++, index++){ + for (RecordBatchData d : data) { + for (int i =0; i < d.getRecordCount(); i++, index++) { sv4.set(index, recordBatchId, i); } recordBatchId++; @@ -157,8 +169,8 @@ public class SortRecordBatchBuilder { case TWO_BYTE: { int index = 0; int recordBatchId = 0; - for(RecordBatchData d : data){ - for(int i =0; i < d.getRecordCount(); i++, index++){ + for (RecordBatchData d : data) { + for (int i =0; i < d.getRecordCount(); i++, index++) { sv4.set(index, recordBatchId, (int) d.getSv2().getIndex(i)); } // might as well drop the selection vector since we'll stop using it now. @@ -173,13 +185,13 @@ public class SortRecordBatchBuilder { // next, we'll create lists of each of the vector types. ArrayListMultimap<MaterializedField, ValueVector> vectors = ArrayListMultimap.create(); - for(RecordBatchData rbd : batches.values()){ - for(ValueVector v : rbd.getVectors()){ + for (RecordBatchData rbd : batches.values()) { + for (ValueVector v : rbd.getVectors()) { vectors.put(v.getField(), v); } } - for(MaterializedField f : schema){ + for (MaterializedField f : schema) { List<ValueVector> v = vectors.get(f); outputContainer.addHyperList(v, false); } @@ -191,11 +203,13 @@ public class SortRecordBatchBuilder { return sv4; } - public void clear(){ - for(RecordBatchData d : batches.values()){ + public void clear() { + for (RecordBatchData d : batches.values()) { d.container.clear(); } - if(sv4 != null) sv4.clear(); + if (sv4 != null) { + sv4.clear(); + } } public List<VectorContainer> getHeldRecordBatches() { diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/trace/TraceRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/trace/TraceRecordBatch.java index 609cb29bb..6d909623a 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/trace/TraceRecordBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/trace/TraceRecordBatch.java @@ -88,10 +88,11 @@ public class TraceRecordBatch extends AbstractSingleRecordBatch<Trace> { @Override public int getRecordCount() { - if (sv == null) + if (sv == null) { return incoming.getRecordCount(); - else + } else { return sv.getCount(); + } } /** @@ -125,8 +126,9 @@ public class TraceRecordBatch extends AbstractSingleRecordBatch<Trace> { @Override protected void setupNewSchema() throws SchemaChangeException { /* Trace operator does not deal with hyper vectors yet */ - if (incoming.getSchema().getSelectionVectorMode() == SelectionVectorMode.FOUR_BYTE) + if (incoming.getSchema().getSelectionVectorMode() == SelectionVectorMode.FOUR_BYTE) { throw new SchemaChangeException("Trace operator does not work with hyper vectors"); + } /* * we have a new schema, clear our existing container to load the new value vectors @@ -152,8 +154,9 @@ public class TraceRecordBatch extends AbstractSingleRecordBatch<Trace> { @Override public void cleanup() { /* Release the selection vector */ - if (sv != null) + if (sv != null) { sv.clear(); + } /* Close the file descriptors */ try { diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/IteratorValidatorBatchIterator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/IteratorValidatorBatchIterator.java index 0e69bcf71..171d12c9c 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/IteratorValidatorBatchIterator.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/IteratorValidatorBatchIterator.java @@ -111,7 +111,9 @@ public class IteratorValidatorBatchIterator implements RecordBatch { @Override public IterOutcome next() { - if(state == IterOutcome.NONE ) throw new IllegalStateException("The incoming iterator has previously moved to a state of NONE. You should not be attempting to call next() again."); + if (state == IterOutcome.NONE ) { + throw new IllegalStateException("The incoming iterator has previously moved to a state of NONE. You should not be attempting to call next() again."); + } state = incoming.next(); if (first && state == IterOutcome.NONE) { throw new IllegalStateException("The incoming iterator returned a state of NONE on the first batch. There should always be at least one batch output before returning NONE"); @@ -119,14 +121,16 @@ public class IteratorValidatorBatchIterator implements RecordBatch { if (first && state == IterOutcome.OK) { throw new IllegalStateException("The incoming iterator returned a state of OK on the first batch. There should always be a new schema on the first batch. Incoming: " + incoming.getClass().getName()); } - if (first) first = !first; + if (first) { + first = !first; + } - if(state == IterOutcome.OK || state == IterOutcome.OK_NEW_SCHEMA) { + if (state == IterOutcome.OK || state == IterOutcome.OK_NEW_SCHEMA) { BatchSchema schema = incoming.getSchema(); - if(schema.getFieldCount() == 0){ + if (schema.getFieldCount() == 0) { throw new IllegalStateException ("Incoming batch has an empty schema. This is not allowed."); } - if(incoming.getRecordCount() > MAX_BATCH_SIZE){ + if (incoming.getRecordCount() > MAX_BATCH_SIZE) { throw new IllegalStateException (String.format("Incoming batch of %s has size %d, which is beyond the limit of %d", incoming.getClass().getName(), incoming.getRecordCount(), MAX_BATCH_SIZE)); } @@ -157,4 +161,5 @@ public class IteratorValidatorBatchIterator implements RecordBatch { public VectorContainer getOutgoingContainer() { throw new UnsupportedOperationException(String.format(" You should not call getOutgoingContainer() for class %s", this.getClass().getCanonicalName())); } + } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/IteratorValidatorInjector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/IteratorValidatorInjector.java index 428f335a5..2f7f531e8 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/IteratorValidatorInjector.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/IteratorValidatorInjector.java @@ -36,10 +36,11 @@ public class IteratorValidatorInjector extends IteratorValidatorInjector inject = new IteratorValidatorInjector(); PhysicalOperator newOp = root.accept(inject, context); - if( !(newOp instanceof FragmentRoot) ) throw new IllegalStateException("This shouldn't happen."); + if ( !(newOp instanceof FragmentRoot) ) { + throw new IllegalStateException("This shouldn't happen."); + } return (FragmentRoot) newOp; - } /** @@ -67,12 +68,11 @@ public class IteratorValidatorInjector extends } /* Inject trace operator */ - if (newChildren.size() > 0){ + if (newChildren.size() > 0) { newOp = op.getNewWithChildren(newChildren); newOp.setOperatorId(op.getOperatorId()); } - return newOp; } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/BatchGroup.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/BatchGroup.java index 237007046..9359ea188 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/BatchGroup.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/BatchGroup.java @@ -143,14 +143,24 @@ public class BatchGroup implements VectorAccessible { } public void cleanup() throws IOException { - if (sv2 != null) sv2.clear(); - if (outputStream != null) outputStream.close(); - if (inputStream != null) inputStream.close(); - if (fs != null && fs.exists(path)) fs.delete(path, false); + if (sv2 != null) { + sv2.clear(); + } + if (outputStream != null) { + outputStream.close(); + } + if (inputStream != null) { + inputStream.close(); + } + if (fs != null && fs.exists(path)) { + fs.delete(path, false); + } } public void closeOutputStream() throws IOException { - if (outputStream != null) outputStream.close(); + if (outputStream != null) { + outputStream.close(); + } } @Override @@ -181,4 +191,5 @@ public class BatchGroup implements VectorAccessible { public Iterator<VectorWrapper<?>> iterator() { return currentContainer.iterator(); } + } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java index 505f56745..52249e9e7 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java @@ -192,12 +192,12 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> { @Override public IterOutcome innerNext() { - if(schema != null){ + if (schema != null) { if (spillCount == 0) { - if(schema != null){ - if(getSelectionVector4().next()){ + if (schema != null) { + if (getSelectionVector4().next()) { return IterOutcome.OK; - }else{ + } else { return IterOutcome.NONE; } } @@ -206,12 +206,12 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> { w.start(); // int count = selector.next(); int count = copier.next(targetRecordCount); - if(count > 0){ + if (count > 0) { long t = w.elapsed(TimeUnit.MICROSECONDS); logger.debug("Took {} us to merge {} records", t, count); container.setRecordCount(count); return IterOutcome.OK; - }else{ + } else { logger.debug("copier returned 0 records"); return IterOutcome.NONE; } @@ -236,8 +236,10 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> { return upstream; case OK_NEW_SCHEMA: // only change in the case that the schema truly changes. Artificial schema changes are ignored. - if(!incoming.getSchema().equals(schema)){ - if (schema != null) throw new UnsupportedOperationException("Sort doesn't currently support sorts with changing schemas."); + if (!incoming.getSchema().equals(schema)) { + if (schema != null) { + throw new UnsupportedOperationException("Sort doesn't currently support sorts with changing schemas."); + } this.schema = incoming.getSchema(); this.sorter = createNewSorter(context, incoming); } @@ -249,7 +251,9 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> { } break; } - if (first) first = false; + if (first) { + first = false; + } totalSizeInMemory += getBufferSize(incoming); SelectionVector2 sv2; if (incoming.getSchema().getSelectionVectorMode() == BatchSchema.SelectionVectorMode.TWO_BYTE) { @@ -291,7 +295,9 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> { break; case OUT_OF_MEMORY: highWaterMark = totalSizeInMemory; - if (batchesSinceLastSpill > 2) mergeAndSpill(); + if (batchesSinceLastSpill > 2) { + mergeAndSpill(); + } batchesSinceLastSpill = 0; break; default: @@ -348,7 +354,7 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> { return IterOutcome.OK_NEW_SCHEMA; - }catch(SchemaChangeException | ClassTransformationException | IOException ex){ + } catch(SchemaChangeException | ClassTransformationException | IOException ex) { kill(false); logger.error("Failure during query", ex); context.fail(ex); @@ -502,11 +508,13 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> { ClassGenerator<MSorter> g = cg.getRoot(); g.setMappingSet(mainMapping); - for(Ordering od : orderings){ + for (Ordering od : orderings) { // first, we rewrite the evaluation stack for each side of the comparison. ErrorCollector collector = new ErrorCollectorImpl(); final LogicalExpression expr = ExpressionTreeMaterializer.materialize(od.getExpr(), batch, collector, context.getFunctionRegistry()); - if(collector.hasErrors()) throw new SchemaChangeException("Failure while materializing expression. " + collector.toErrorString()); + if (collector.hasErrors()) { + throw new SchemaChangeException("Failure while materializing expression. " + collector.toErrorString()); + } g.setMappingSet(leftMapping); HoldingContainer left = g.addExpr(expr, false); g.setMappingSet(rightMapping); @@ -518,7 +526,7 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> { HoldingContainer out = g.addExpr(fh, false); JConditional jc = g.getEvalBlock()._if(out.getValue().ne(JExpr.lit(0))); - if(od.getDirection() == Direction.ASCENDING){ + if (od.getDirection() == Direction.ASCENDING) { jc._then()._return(out.getValue()); }else{ jc._then()._return(out.getValue().minus()); @@ -547,11 +555,13 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> { private void generateComparisons(ClassGenerator g, VectorAccessible batch) throws SchemaChangeException { g.setMappingSet(MAIN_MAPPING); - for(Ordering od : popConfig.getOrderings()){ + for (Ordering od : popConfig.getOrderings()) { // first, we rewrite the evaluation stack for each side of the comparison. ErrorCollector collector = new ErrorCollectorImpl(); final LogicalExpression expr = ExpressionTreeMaterializer.materialize(od.getExpr(), batch, collector,context.getFunctionRegistry()); - if(collector.hasErrors()) throw new SchemaChangeException("Failure while materializing expression. " + collector.toErrorString()); + if (collector.hasErrors()) { + throw new SchemaChangeException("Failure while materializing expression. " + collector.toErrorString()); + } g.setMappingSet(LEFT_MAPPING); HoldingContainer left = g.addExpr(expr, false); g.setMappingSet(RIGHT_MAPPING); @@ -563,7 +573,7 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> { HoldingContainer out = g.addExpr(fh, false); JConditional jc = g.getEvalBlock()._if(out.getValue().ne(JExpr.lit(0))); - if(od.getDirection() == Direction.ASCENDING){ + if (od.getDirection() == Direction.ASCENDING) { jc._then()._return(out.getValue()); }else{ jc._then()._return(out.getValue().minus()); @@ -590,7 +600,7 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> { } List<VectorAllocator> allocators = Lists.newArrayList(); - for(VectorWrapper<?> i : batch){ + for (VectorWrapper<?> i : batch) { ValueVector v = TypeHelper.getNewVector(i.getField(), copierAllocator); outputContainer.add(v); allocators.add(VectorAllocator.getAllocator(v, 110)); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/MSortTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/MSortTemplate.java index df79b1acb..3fd744ff8 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/MSortTemplate.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/MSortTemplate.java @@ -84,7 +84,7 @@ public abstract class MSortTemplate implements MSorter, IndexedSortable{ while (l < rightStart) { aux.set(o++, vector4.get(l++)); } - while (r < rightEnd){ + while (r < rightEnd) { aux.set(o++, vector4.get(r++)); } assert o == outStart + (rightEnd - leftStart); @@ -97,7 +97,7 @@ public abstract class MSortTemplate implements MSorter, IndexedSortable{ } @Override - public void sort(VectorContainer container){ + public void sort(VectorContainer container) { Stopwatch watch = new Stopwatch(); watch.start(); while (runStarts.size() > 1) { @@ -109,9 +109,13 @@ public abstract class MSortTemplate implements MSorter, IndexedSortable{ int left = runStarts.poll(); int right = runStarts.poll(); Integer end = runStarts.peek(); - if (end == null) end = vector4.getTotalCount(); + if (end == null) { + end = vector4.getTotalCount(); + } outIndex = merge(left, right, end, outIndex); - if (outIndex < vector4.getTotalCount()) newRunStarts.add(outIndex); + if (outIndex < vector4.getTotalCount()) { + newRunStarts.add(outIndex); + } } if (outIndex < vector4.getTotalCount()) { copyRun(outIndex, vector4.getTotalCount()); |