diff options
author | Ben Becker <benjamin.becker@gmail.com> | 2013-08-30 09:13:14 -0700 |
---|---|---|
committer | Jacques Nadeau <jacques@apache.org> | 2013-10-30 17:21:53 -0700 |
commit | 4481dadcec3d96638274469e695053c4d7c95805 (patch) | |
tree | e31cffb5c754b67929651118938410e6bcfe6325 | |
parent | a73512d3ca7f78baac7368ccd4c40223956fdc53 (diff) |
implement SV4 support for filter
-rw-r--r-- | exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterRecordBatch.java | 118 | ||||
-rw-r--r-- | exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterTemplate2.java (renamed from exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterTemplate.java) | 27 | ||||
-rw-r--r-- | exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterTemplate4.java | 52 | ||||
-rw-r--r-- | exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/Filterer.java | 3 | ||||
-rw-r--r-- | exec/java-exec/src/main/java/org/apache/drill/exec/record/selection/SelectionVector4.java | 11 | ||||
-rw-r--r-- | exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/SimpleRootExec.java | 5 | ||||
-rw-r--r-- | exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/filter/TestSimpleFilter.java | 30 | ||||
-rw-r--r-- | exec/java-exec/src/test/resources/filter/test_sv4.json | 42 |
8 files changed, 241 insertions, 47 deletions
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterRecordBatch.java index 5f9a06a32..e67e531c7 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterRecordBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterRecordBatch.java @@ -20,6 +20,7 @@ package org.apache.drill.exec.physical.impl.filter; import java.io.IOException; import java.util.List; +import org.apache.drill.common.exceptions.ExecutionSetupException; import org.apache.drill.common.expression.ErrorCollector; import org.apache.drill.common.expression.ErrorCollectorImpl; import org.apache.drill.common.expression.LogicalExpression; @@ -27,27 +28,29 @@ import org.apache.drill.exec.exception.ClassTransformationException; import org.apache.drill.exec.exception.SchemaChangeException; import org.apache.drill.exec.expr.CodeGenerator; import org.apache.drill.exec.expr.ExpressionTreeMaterializer; +import org.apache.drill.exec.memory.BufferAllocator; import org.apache.drill.exec.ops.FragmentContext; import org.apache.drill.exec.physical.config.Filter; +import org.apache.drill.exec.record.*; import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode; -import org.apache.drill.exec.record.AbstractSingleRecordBatch; -import org.apache.drill.exec.record.RecordBatch; -import org.apache.drill.exec.record.TransferPair; -import org.apache.drill.exec.record.VectorWrapper; import org.apache.drill.exec.record.selection.SelectionVector2; +import org.apache.drill.exec.record.selection.SelectionVector4; +import org.apache.drill.exec.vector.TypeHelper; import org.apache.drill.exec.vector.ValueVector; import com.google.common.collect.Lists; +import org.apache.drill.exec.vector.allocator.VectorAllocator; public class FilterRecordBatch extends AbstractSingleRecordBatch<Filter>{ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FilterRecordBatch.class); - private final SelectionVector2 sv; + private SelectionVector2 sv2; + private SelectionVector4 sv4; + private BufferAllocator.PreAllocator svAllocator; private Filterer filter; - - public FilterRecordBatch(Filter pop, RecordBatch incoming, FragmentContext context){ + + public FilterRecordBatch(Filter pop, RecordBatch incoming, FragmentContext context) { super(pop, context, incoming); - sv = new SelectionVector2(context.getAllocator()); } @Override @@ -57,18 +60,22 @@ public class FilterRecordBatch extends AbstractSingleRecordBatch<Filter>{ @Override public int getRecordCount() { - return sv.getCount(); + return sv2 != null ? sv2.getCount() : sv4.getCount(); } @Override public SelectionVector2 getSelectionVector2() { - return sv; + return sv2; + } + + @Override + public SelectionVector4 getSelectionVector4() { + return sv4; } @Override protected void doWork() { int recordCount = incoming.getRecordCount(); - sv.allocateNew(recordCount); filter.filterBatch(recordCount); for(VectorWrapper<?> v : container){ ValueVector.Mutator m = v.getValueVector().getMutator(); @@ -79,33 +86,102 @@ public class FilterRecordBatch extends AbstractSingleRecordBatch<Filter>{ @Override protected void setupNewSchema() throws SchemaChangeException { container.clear(); - LogicalExpression filterExpression = popConfig.getExpr(); + + switch(incoming.getSchema().getSelectionVectorMode()){ + case NONE: + sv2 = new SelectionVector2(context.getAllocator()); + this.filter = generateSV2Filterer(); + break; + case TWO_BYTE: + sv2 = new SelectionVector2(context.getAllocator()); + this.filter = generateSV2Filterer(); + break; + case FOUR_BYTE: + // set up the multi-batch selection vector + this.svAllocator = context.getAllocator().getPreAllocator(); + if (!svAllocator.preAllocate(incoming.getRecordCount()*4)) + throw new SchemaChangeException("Attempted to filter an SV4 which exceeds allowed memory (" + + incoming.getRecordCount() * 4 + " bytes)"); + sv4 = new SelectionVector4(svAllocator.getAllocation(), incoming.getRecordCount(), Character.MAX_VALUE); + this.filter = generateSV4Filterer(); + break; + default: + throw new UnsupportedOperationException(); + } + + } + + protected Filterer generateSV4Filterer() throws SchemaChangeException { final ErrorCollector collector = new ErrorCollectorImpl(); final List<TransferPair> transfers = Lists.newArrayList(); - final CodeGenerator<Filterer> cg = new CodeGenerator<Filterer>(Filterer.TEMPLATE_DEFINITION, context.getFunctionRegistry()); - - final LogicalExpression expr = ExpressionTreeMaterializer.materialize(filterExpression, incoming, collector); + final List<VectorAllocator> allocators = Lists.newArrayList(); + final CodeGenerator<Filterer> cg = new CodeGenerator<Filterer>(Filterer.TEMPLATE_DEFINITION4, context.getFunctionRegistry()); + + final LogicalExpression expr = ExpressionTreeMaterializer.materialize(popConfig.getExpr(), incoming, collector); if(collector.hasErrors()){ throw new SchemaChangeException(String.format("Failure while trying to materialize incoming schema. Errors:\n %s.", collector.toErrorString())); } - + cg.addExpr(new ReturnValueExpression(expr)); - + +// for(VectorWrapper<?> i : incoming){ +// ValueVector v = TypeHelper.getNewVector(i.getField(), context.getAllocator()); +// container.add(v); +// allocators.add(getAllocator4(v)); +// } + + for (VectorWrapper<?> vw : incoming) { + for (ValueVector vv : vw.getValueVectors()) { + TransferPair pair = vv.getTransferPair(); + container.add(pair.getTo()); + transfers.add(pair); + } + } + + // allocate outgoing sv4 + container.buildSchema(SelectionVectorMode.FOUR_BYTE); + + try { + TransferPair[] tx = transfers.toArray(new TransferPair[transfers.size()]); + Filterer filter = context.getImplementationClass(cg); + filter.setup(context, incoming, this, tx); + return filter; + } catch (ClassTransformationException | IOException e) { + throw new SchemaChangeException("Failure while attempting to load generated class", e); + } + + } + + protected Filterer generateSV2Filterer() throws SchemaChangeException { + final ErrorCollector collector = new ErrorCollectorImpl(); + final List<TransferPair> transfers = Lists.newArrayList(); + final CodeGenerator<Filterer> cg = new CodeGenerator<Filterer>(Filterer.TEMPLATE_DEFINITION2, context.getFunctionRegistry()); + + final LogicalExpression expr = ExpressionTreeMaterializer.materialize(popConfig.getExpr(), incoming, collector); + if(collector.hasErrors()){ + throw new SchemaChangeException(String.format("Failure while trying to materialize incoming schema. Errors:\n %s.", collector.toErrorString())); + } + + cg.addExpr(new ReturnValueExpression(expr)); + for(VectorWrapper<?> v : incoming){ TransferPair pair = v.getValueVector().getTransferPair(); container.add(pair.getTo()); transfers.add(pair); } - + container.buildSchema(SelectionVectorMode.TWO_BYTE); - + try { TransferPair[] tx = transfers.toArray(new TransferPair[transfers.size()]); - this.filter = context.getImplementationClass(cg); + Filterer filter = context.getImplementationClass(cg); filter.setup(context, incoming, this, tx); + return filter; } catch (ClassTransformationException | IOException e) { throw new SchemaChangeException("Failure while attempting to load generated class", e); } + } - + + } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterTemplate2.java index a03d48fc5..587440cfa 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterTemplate.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterTemplate2.java @@ -1,20 +1,3 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ package org.apache.drill.exec.physical.impl.filter; import javax.inject.Named; @@ -26,14 +9,14 @@ import org.apache.drill.exec.record.RecordBatch; import org.apache.drill.exec.record.TransferPair; import org.apache.drill.exec.record.selection.SelectionVector2; -public abstract class FilterTemplate implements Filterer{ - static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FilterTemplate.class); +public abstract class FilterTemplate2 implements Filterer{ + static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FilterTemplate2.class); private SelectionVector2 outgoingSelectionVector; private SelectionVector2 incomingSelectionVector; private SelectionVectorMode svMode; private TransferPair[] transfers; - + @Override public void setup(FragmentContext context, RecordBatch incoming, RecordBatch outgoing, TransferPair[] transfers) throws SchemaChangeException{ this.transfers = transfers; @@ -47,6 +30,7 @@ public abstract class FilterTemplate implements Filterer{ this.incomingSelectionVector = incoming.getSelectionVector2(); break; default: + // SV4 is handled in FilterTemplate4 throw new UnsupportedOperationException(); } doSetup(context, incoming, outgoing); @@ -59,6 +43,7 @@ public abstract class FilterTemplate implements Filterer{ } public void filterBatch(int recordCount){ + outgoingSelectionVector.allocateNew(recordCount); switch(svMode){ case NONE: filterBatchNoSV(recordCount); @@ -84,7 +69,7 @@ public abstract class FilterTemplate implements Filterer{ } outgoingSelectionVector.setRecordCount(svIndex); } - + private void filterBatchNoSV(int recordCount){ int svIndex = 0; for(char i =0; i < recordCount; i++){ diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterTemplate4.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterTemplate4.java new file mode 100644 index 000000000..b394387cf --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterTemplate4.java @@ -0,0 +1,52 @@ +package org.apache.drill.exec.physical.impl.filter; + +import org.apache.drill.exec.exception.SchemaChangeException; +import org.apache.drill.exec.ops.FragmentContext; +import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode; +import org.apache.drill.exec.record.RecordBatch; +import org.apache.drill.exec.record.TransferPair; +import org.apache.drill.exec.record.selection.SelectionVector4; +import org.apache.drill.exec.vector.allocator.VectorAllocator; + +import javax.inject.Named; + +public abstract class FilterTemplate4 implements Filterer { + static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FilterTemplate4.class); + + private SelectionVector4 outgoingSelectionVector; + private SelectionVector4 incomingSelectionVector; + private TransferPair[] transfers; + + @Override + public void setup(FragmentContext context, RecordBatch incoming, RecordBatch outgoing, TransferPair[] transfers) + throws SchemaChangeException { + this.transfers = transfers; + this.outgoingSelectionVector = outgoing.getSelectionVector4(); + this.incomingSelectionVector = incoming.getSelectionVector4(); + doSetup(context, incoming, outgoing); + } + + @Override + public void filterBatch(int recordCount){ + int outPos = 0; + for (int i = 0; i < incomingSelectionVector.getCount(); i++) { + int index = incomingSelectionVector.get(i); + if (doEval(index, 0)) { + System.out.println(" (match): " + index + " (i: " + i + ") "); + outgoingSelectionVector.set(outPos++, index); + } + } + outgoingSelectionVector.setCount(outPos); + doTransfers(); + } + + private void doTransfers(){ + for(TransferPair t : transfers){ + t.transfer(); + } + } + + public abstract void doSetup(@Named("context") FragmentContext context, @Named("incoming") RecordBatch incoming, @Named("outgoing") RecordBatch outgoing); + public abstract boolean doEval(@Named("inIndex") int inIndex, @Named("outIndex") int outIndex); + +} diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/Filterer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/Filterer.java index b82172080..8e8cb2e72 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/Filterer.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/Filterer.java @@ -29,6 +29,7 @@ public interface Filterer { public void setup(FragmentContext context, RecordBatch incoming, RecordBatch outgoing, TransferPair[] transfers) throws SchemaChangeException; public void filterBatch(int recordCount); - public static TemplateClassDefinition<Filterer> TEMPLATE_DEFINITION = new TemplateClassDefinition<Filterer>(Filterer.class, FilterTemplate.class); + public static TemplateClassDefinition<Filterer> TEMPLATE_DEFINITION2 = new TemplateClassDefinition<Filterer>(Filterer.class, FilterTemplate2.class); + public static TemplateClassDefinition<Filterer> TEMPLATE_DEFINITION4 = new TemplateClassDefinition<Filterer>(Filterer.class, FilterTemplate4.class); } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/selection/SelectionVector4.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/selection/SelectionVector4.java index 606103bff..4533cc2f3 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/selection/SelectionVector4.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/selection/SelectionVector4.java @@ -25,7 +25,7 @@ public class SelectionVector4 { static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(SelectionVector4.class); private final ByteBuf vector; - private final int recordCount; + private int recordCount; private int start; private int length; @@ -44,7 +44,12 @@ public class SelectionVector4 { public int getCount(){ return length; } - + + public void setCount(int length) { + this.length = length; + this.recordCount = length; + } + public void set(int index, int compound){ vector.setInt(index*4, compound); } @@ -55,7 +60,7 @@ public class SelectionVector4 { public int get(int index){ return vector.getInt( (start+index)*4); } - + /** * Caution: This method shares the underlying buffer between this vector and the newly created one. * @return Newly created single batch SelectionVector4. diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/SimpleRootExec.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/SimpleRootExec.java index c4533b19f..0312863b9 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/SimpleRootExec.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/SimpleRootExec.java @@ -28,6 +28,7 @@ import org.apache.drill.exec.record.RecordBatch.IterOutcome; import org.apache.drill.exec.record.TypedFieldId; import org.apache.drill.exec.record.VectorWrapper; import org.apache.drill.exec.record.selection.SelectionVector2; +import org.apache.drill.exec.record.selection.SelectionVector4; import org.apache.drill.exec.vector.ValueVector; import com.beust.jcommander.internal.Lists; @@ -54,6 +55,10 @@ public class SimpleRootExec implements RootExec, Iterable<ValueVector>{ return incoming.getSelectionVector2(); } + public SelectionVector4 getSelectionVector4(){ + return incoming.getSelectionVector4(); + } + @SuppressWarnings("unchecked") public <T extends ValueVector> T getValueVectorById(SchemaPath path, Class<?> vvClass){ TypedFieldId tfid = incoming.getValueVectorId(path); diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/filter/TestSimpleFilter.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/filter/TestSimpleFilter.java index 6841662c6..e81774ad7 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/filter/TestSimpleFilter.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/filter/TestSimpleFilter.java @@ -74,7 +74,35 @@ public class TestSimpleFilter { assertTrue(!context.isFailed()); } - + + @Test + public void testSV4Filter(@Injectable final DrillbitContext bitContext, @Injectable UserClientConnection connection) throws Throwable{ + new NonStrictExpectations(){{ + bitContext.getMetrics(); result = new MetricRegistry("test"); + bitContext.getAllocator(); result = BufferAllocator.getAllocator(c); + }}; + + PhysicalPlanReader reader = new PhysicalPlanReader(c, c.getMapper(), CoordinationProtos.DrillbitEndpoint.getDefaultInstance()); + PhysicalPlan plan = reader.readPhysicalPlan(Files.toString(FileUtils.getResourceAsFile("/filter/test_sv4.json"), Charsets.UTF_8)); + FunctionImplementationRegistry registry = new FunctionImplementationRegistry(c); + FragmentContext context = new FragmentContext(bitContext, FragmentHandle.getDefaultInstance(), connection, null, registry); + SimpleRootExec exec = new SimpleRootExec(ImplCreator.getExec(context, (FragmentRoot) plan.getSortedOperators(false).iterator().next())); + int recordCount = 0; + while(exec.next()) { + for (int i = 0; i < exec.getSelectionVector4().getCount(); i++) { + System.out.println("Got: " + exec.getSelectionVector4().get(i)); + } + recordCount += exec.getSelectionVector4().getCount(); + } + assertEquals(50, recordCount); + + if(context.getFailureCause() != null){ + throw context.getFailureCause(); + } + assertTrue(!context.isFailed()); + + } + @AfterClass public static void tearDown() throws Exception{ // pause to get logger to catch up. diff --git a/exec/java-exec/src/test/resources/filter/test_sv4.json b/exec/java-exec/src/test/resources/filter/test_sv4.json new file mode 100644 index 000000000..685e31535 --- /dev/null +++ b/exec/java-exec/src/test/resources/filter/test_sv4.json @@ -0,0 +1,42 @@ +{ + head:{ + type:"APACHE_DRILL_PHYSICAL", + version:"1", + generator:{ + type:"manual" + } + }, + graph:[ + { + @id:1, + pop:"mock-sub-scan", + url: "http://apache.org", + entries:[ + {records: 100, types: [ + {name: "blue", type: "INT", mode: "REQUIRED"}, + {name: "red", type: "BIGINT", mode: "REQUIRED"}, + {name: "green", type: "INT", mode: "REQUIRED"} + ]} + ] + }, + { + @id:2, + child: 1, + pop:"sort", + orderings: [ + {expr: "blue"} + ] + }, + { + @id:3, + child: 2, + pop:"filter", + expr: "alternate()" + }, + { + @id: 4, + child: 3, + pop: "screen" + } + ] +}
\ No newline at end of file |