aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorBen Becker <benjamin.becker@gmail.com>2013-08-30 09:13:14 -0700
committerJacques Nadeau <jacques@apache.org>2013-10-30 17:21:53 -0700
commit4481dadcec3d96638274469e695053c4d7c95805 (patch)
treee31cffb5c754b67929651118938410e6bcfe6325
parenta73512d3ca7f78baac7368ccd4c40223956fdc53 (diff)
implement SV4 support for filter
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterRecordBatch.java118
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterTemplate2.java (renamed from exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterTemplate.java)27
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterTemplate4.java52
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/Filterer.java3
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/record/selection/SelectionVector4.java11
-rw-r--r--exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/SimpleRootExec.java5
-rw-r--r--exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/filter/TestSimpleFilter.java30
-rw-r--r--exec/java-exec/src/test/resources/filter/test_sv4.json42
8 files changed, 241 insertions, 47 deletions
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterRecordBatch.java
index 5f9a06a32..e67e531c7 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterRecordBatch.java
@@ -20,6 +20,7 @@ package org.apache.drill.exec.physical.impl.filter;
import java.io.IOException;
import java.util.List;
+import org.apache.drill.common.exceptions.ExecutionSetupException;
import org.apache.drill.common.expression.ErrorCollector;
import org.apache.drill.common.expression.ErrorCollectorImpl;
import org.apache.drill.common.expression.LogicalExpression;
@@ -27,27 +28,29 @@ import org.apache.drill.exec.exception.ClassTransformationException;
import org.apache.drill.exec.exception.SchemaChangeException;
import org.apache.drill.exec.expr.CodeGenerator;
import org.apache.drill.exec.expr.ExpressionTreeMaterializer;
+import org.apache.drill.exec.memory.BufferAllocator;
import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.physical.config.Filter;
+import org.apache.drill.exec.record.*;
import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
-import org.apache.drill.exec.record.AbstractSingleRecordBatch;
-import org.apache.drill.exec.record.RecordBatch;
-import org.apache.drill.exec.record.TransferPair;
-import org.apache.drill.exec.record.VectorWrapper;
import org.apache.drill.exec.record.selection.SelectionVector2;
+import org.apache.drill.exec.record.selection.SelectionVector4;
+import org.apache.drill.exec.vector.TypeHelper;
import org.apache.drill.exec.vector.ValueVector;
import com.google.common.collect.Lists;
+import org.apache.drill.exec.vector.allocator.VectorAllocator;
public class FilterRecordBatch extends AbstractSingleRecordBatch<Filter>{
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FilterRecordBatch.class);
- private final SelectionVector2 sv;
+ private SelectionVector2 sv2;
+ private SelectionVector4 sv4;
+ private BufferAllocator.PreAllocator svAllocator;
private Filterer filter;
-
- public FilterRecordBatch(Filter pop, RecordBatch incoming, FragmentContext context){
+
+ public FilterRecordBatch(Filter pop, RecordBatch incoming, FragmentContext context) {
super(pop, context, incoming);
- sv = new SelectionVector2(context.getAllocator());
}
@Override
@@ -57,18 +60,22 @@ public class FilterRecordBatch extends AbstractSingleRecordBatch<Filter>{
@Override
public int getRecordCount() {
- return sv.getCount();
+ return sv2 != null ? sv2.getCount() : sv4.getCount();
}
@Override
public SelectionVector2 getSelectionVector2() {
- return sv;
+ return sv2;
+ }
+
+ @Override
+ public SelectionVector4 getSelectionVector4() {
+ return sv4;
}
@Override
protected void doWork() {
int recordCount = incoming.getRecordCount();
- sv.allocateNew(recordCount);
filter.filterBatch(recordCount);
for(VectorWrapper<?> v : container){
ValueVector.Mutator m = v.getValueVector().getMutator();
@@ -79,33 +86,102 @@ public class FilterRecordBatch extends AbstractSingleRecordBatch<Filter>{
@Override
protected void setupNewSchema() throws SchemaChangeException {
container.clear();
- LogicalExpression filterExpression = popConfig.getExpr();
+
+ switch(incoming.getSchema().getSelectionVectorMode()){
+ case NONE:
+ sv2 = new SelectionVector2(context.getAllocator());
+ this.filter = generateSV2Filterer();
+ break;
+ case TWO_BYTE:
+ sv2 = new SelectionVector2(context.getAllocator());
+ this.filter = generateSV2Filterer();
+ break;
+ case FOUR_BYTE:
+ // set up the multi-batch selection vector
+ this.svAllocator = context.getAllocator().getPreAllocator();
+ if (!svAllocator.preAllocate(incoming.getRecordCount()*4))
+ throw new SchemaChangeException("Attempted to filter an SV4 which exceeds allowed memory (" +
+ incoming.getRecordCount() * 4 + " bytes)");
+ sv4 = new SelectionVector4(svAllocator.getAllocation(), incoming.getRecordCount(), Character.MAX_VALUE);
+ this.filter = generateSV4Filterer();
+ break;
+ default:
+ throw new UnsupportedOperationException();
+ }
+
+ }
+
+ protected Filterer generateSV4Filterer() throws SchemaChangeException {
final ErrorCollector collector = new ErrorCollectorImpl();
final List<TransferPair> transfers = Lists.newArrayList();
- final CodeGenerator<Filterer> cg = new CodeGenerator<Filterer>(Filterer.TEMPLATE_DEFINITION, context.getFunctionRegistry());
-
- final LogicalExpression expr = ExpressionTreeMaterializer.materialize(filterExpression, incoming, collector);
+ final List<VectorAllocator> allocators = Lists.newArrayList();
+ final CodeGenerator<Filterer> cg = new CodeGenerator<Filterer>(Filterer.TEMPLATE_DEFINITION4, context.getFunctionRegistry());
+
+ final LogicalExpression expr = ExpressionTreeMaterializer.materialize(popConfig.getExpr(), incoming, collector);
if(collector.hasErrors()){
throw new SchemaChangeException(String.format("Failure while trying to materialize incoming schema. Errors:\n %s.", collector.toErrorString()));
}
-
+
cg.addExpr(new ReturnValueExpression(expr));
-
+
+// for(VectorWrapper<?> i : incoming){
+// ValueVector v = TypeHelper.getNewVector(i.getField(), context.getAllocator());
+// container.add(v);
+// allocators.add(getAllocator4(v));
+// }
+
+ for (VectorWrapper<?> vw : incoming) {
+ for (ValueVector vv : vw.getValueVectors()) {
+ TransferPair pair = vv.getTransferPair();
+ container.add(pair.getTo());
+ transfers.add(pair);
+ }
+ }
+
+ // allocate outgoing sv4
+ container.buildSchema(SelectionVectorMode.FOUR_BYTE);
+
+ try {
+ TransferPair[] tx = transfers.toArray(new TransferPair[transfers.size()]);
+ Filterer filter = context.getImplementationClass(cg);
+ filter.setup(context, incoming, this, tx);
+ return filter;
+ } catch (ClassTransformationException | IOException e) {
+ throw new SchemaChangeException("Failure while attempting to load generated class", e);
+ }
+
+ }
+
+ protected Filterer generateSV2Filterer() throws SchemaChangeException {
+ final ErrorCollector collector = new ErrorCollectorImpl();
+ final List<TransferPair> transfers = Lists.newArrayList();
+ final CodeGenerator<Filterer> cg = new CodeGenerator<Filterer>(Filterer.TEMPLATE_DEFINITION2, context.getFunctionRegistry());
+
+ final LogicalExpression expr = ExpressionTreeMaterializer.materialize(popConfig.getExpr(), incoming, collector);
+ if(collector.hasErrors()){
+ throw new SchemaChangeException(String.format("Failure while trying to materialize incoming schema. Errors:\n %s.", collector.toErrorString()));
+ }
+
+ cg.addExpr(new ReturnValueExpression(expr));
+
for(VectorWrapper<?> v : incoming){
TransferPair pair = v.getValueVector().getTransferPair();
container.add(pair.getTo());
transfers.add(pair);
}
-
+
container.buildSchema(SelectionVectorMode.TWO_BYTE);
-
+
try {
TransferPair[] tx = transfers.toArray(new TransferPair[transfers.size()]);
- this.filter = context.getImplementationClass(cg);
+ Filterer filter = context.getImplementationClass(cg);
filter.setup(context, incoming, this, tx);
+ return filter;
} catch (ClassTransformationException | IOException e) {
throw new SchemaChangeException("Failure while attempting to load generated class", e);
}
+
}
-
+
+
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterTemplate2.java
index a03d48fc5..587440cfa 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterTemplate2.java
@@ -1,20 +1,3 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
package org.apache.drill.exec.physical.impl.filter;
import javax.inject.Named;
@@ -26,14 +9,14 @@ import org.apache.drill.exec.record.RecordBatch;
import org.apache.drill.exec.record.TransferPair;
import org.apache.drill.exec.record.selection.SelectionVector2;
-public abstract class FilterTemplate implements Filterer{
- static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FilterTemplate.class);
+public abstract class FilterTemplate2 implements Filterer{
+ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FilterTemplate2.class);
private SelectionVector2 outgoingSelectionVector;
private SelectionVector2 incomingSelectionVector;
private SelectionVectorMode svMode;
private TransferPair[] transfers;
-
+
@Override
public void setup(FragmentContext context, RecordBatch incoming, RecordBatch outgoing, TransferPair[] transfers) throws SchemaChangeException{
this.transfers = transfers;
@@ -47,6 +30,7 @@ public abstract class FilterTemplate implements Filterer{
this.incomingSelectionVector = incoming.getSelectionVector2();
break;
default:
+ // SV4 is handled in FilterTemplate4
throw new UnsupportedOperationException();
}
doSetup(context, incoming, outgoing);
@@ -59,6 +43,7 @@ public abstract class FilterTemplate implements Filterer{
}
public void filterBatch(int recordCount){
+ outgoingSelectionVector.allocateNew(recordCount);
switch(svMode){
case NONE:
filterBatchNoSV(recordCount);
@@ -84,7 +69,7 @@ public abstract class FilterTemplate implements Filterer{
}
outgoingSelectionVector.setRecordCount(svIndex);
}
-
+
private void filterBatchNoSV(int recordCount){
int svIndex = 0;
for(char i =0; i < recordCount; i++){
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterTemplate4.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterTemplate4.java
new file mode 100644
index 000000000..b394387cf
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterTemplate4.java
@@ -0,0 +1,52 @@
+package org.apache.drill.exec.physical.impl.filter;
+
+import org.apache.drill.exec.exception.SchemaChangeException;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
+import org.apache.drill.exec.record.RecordBatch;
+import org.apache.drill.exec.record.TransferPair;
+import org.apache.drill.exec.record.selection.SelectionVector4;
+import org.apache.drill.exec.vector.allocator.VectorAllocator;
+
+import javax.inject.Named;
+
+public abstract class FilterTemplate4 implements Filterer {
+ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FilterTemplate4.class);
+
+ private SelectionVector4 outgoingSelectionVector;
+ private SelectionVector4 incomingSelectionVector;
+ private TransferPair[] transfers;
+
+ @Override
+ public void setup(FragmentContext context, RecordBatch incoming, RecordBatch outgoing, TransferPair[] transfers)
+ throws SchemaChangeException {
+ this.transfers = transfers;
+ this.outgoingSelectionVector = outgoing.getSelectionVector4();
+ this.incomingSelectionVector = incoming.getSelectionVector4();
+ doSetup(context, incoming, outgoing);
+ }
+
+ @Override
+ public void filterBatch(int recordCount){
+ int outPos = 0;
+ for (int i = 0; i < incomingSelectionVector.getCount(); i++) {
+ int index = incomingSelectionVector.get(i);
+ if (doEval(index, 0)) {
+ System.out.println(" (match): " + index + " (i: " + i + ") ");
+ outgoingSelectionVector.set(outPos++, index);
+ }
+ }
+ outgoingSelectionVector.setCount(outPos);
+ doTransfers();
+ }
+
+ private void doTransfers(){
+ for(TransferPair t : transfers){
+ t.transfer();
+ }
+ }
+
+ public abstract void doSetup(@Named("context") FragmentContext context, @Named("incoming") RecordBatch incoming, @Named("outgoing") RecordBatch outgoing);
+ public abstract boolean doEval(@Named("inIndex") int inIndex, @Named("outIndex") int outIndex);
+
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/Filterer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/Filterer.java
index b82172080..8e8cb2e72 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/Filterer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/Filterer.java
@@ -29,6 +29,7 @@ public interface Filterer {
public void setup(FragmentContext context, RecordBatch incoming, RecordBatch outgoing, TransferPair[] transfers) throws SchemaChangeException;
public void filterBatch(int recordCount);
- public static TemplateClassDefinition<Filterer> TEMPLATE_DEFINITION = new TemplateClassDefinition<Filterer>(Filterer.class, FilterTemplate.class);
+ public static TemplateClassDefinition<Filterer> TEMPLATE_DEFINITION2 = new TemplateClassDefinition<Filterer>(Filterer.class, FilterTemplate2.class);
+ public static TemplateClassDefinition<Filterer> TEMPLATE_DEFINITION4 = new TemplateClassDefinition<Filterer>(Filterer.class, FilterTemplate4.class);
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/selection/SelectionVector4.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/selection/SelectionVector4.java
index 606103bff..4533cc2f3 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/selection/SelectionVector4.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/selection/SelectionVector4.java
@@ -25,7 +25,7 @@ public class SelectionVector4 {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(SelectionVector4.class);
private final ByteBuf vector;
- private final int recordCount;
+ private int recordCount;
private int start;
private int length;
@@ -44,7 +44,12 @@ public class SelectionVector4 {
public int getCount(){
return length;
}
-
+
+ public void setCount(int length) {
+ this.length = length;
+ this.recordCount = length;
+ }
+
public void set(int index, int compound){
vector.setInt(index*4, compound);
}
@@ -55,7 +60,7 @@ public class SelectionVector4 {
public int get(int index){
return vector.getInt( (start+index)*4);
}
-
+
/**
* Caution: This method shares the underlying buffer between this vector and the newly created one.
* @return Newly created single batch SelectionVector4.
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/SimpleRootExec.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/SimpleRootExec.java
index c4533b19f..0312863b9 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/SimpleRootExec.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/SimpleRootExec.java
@@ -28,6 +28,7 @@ import org.apache.drill.exec.record.RecordBatch.IterOutcome;
import org.apache.drill.exec.record.TypedFieldId;
import org.apache.drill.exec.record.VectorWrapper;
import org.apache.drill.exec.record.selection.SelectionVector2;
+import org.apache.drill.exec.record.selection.SelectionVector4;
import org.apache.drill.exec.vector.ValueVector;
import com.beust.jcommander.internal.Lists;
@@ -54,6 +55,10 @@ public class SimpleRootExec implements RootExec, Iterable<ValueVector>{
return incoming.getSelectionVector2();
}
+ public SelectionVector4 getSelectionVector4(){
+ return incoming.getSelectionVector4();
+ }
+
@SuppressWarnings("unchecked")
public <T extends ValueVector> T getValueVectorById(SchemaPath path, Class<?> vvClass){
TypedFieldId tfid = incoming.getValueVectorId(path);
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/filter/TestSimpleFilter.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/filter/TestSimpleFilter.java
index 6841662c6..e81774ad7 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/filter/TestSimpleFilter.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/filter/TestSimpleFilter.java
@@ -74,7 +74,35 @@ public class TestSimpleFilter {
assertTrue(!context.isFailed());
}
-
+
+ @Test
+ public void testSV4Filter(@Injectable final DrillbitContext bitContext, @Injectable UserClientConnection connection) throws Throwable{
+ new NonStrictExpectations(){{
+ bitContext.getMetrics(); result = new MetricRegistry("test");
+ bitContext.getAllocator(); result = BufferAllocator.getAllocator(c);
+ }};
+
+ PhysicalPlanReader reader = new PhysicalPlanReader(c, c.getMapper(), CoordinationProtos.DrillbitEndpoint.getDefaultInstance());
+ PhysicalPlan plan = reader.readPhysicalPlan(Files.toString(FileUtils.getResourceAsFile("/filter/test_sv4.json"), Charsets.UTF_8));
+ FunctionImplementationRegistry registry = new FunctionImplementationRegistry(c);
+ FragmentContext context = new FragmentContext(bitContext, FragmentHandle.getDefaultInstance(), connection, null, registry);
+ SimpleRootExec exec = new SimpleRootExec(ImplCreator.getExec(context, (FragmentRoot) plan.getSortedOperators(false).iterator().next()));
+ int recordCount = 0;
+ while(exec.next()) {
+ for (int i = 0; i < exec.getSelectionVector4().getCount(); i++) {
+ System.out.println("Got: " + exec.getSelectionVector4().get(i));
+ }
+ recordCount += exec.getSelectionVector4().getCount();
+ }
+ assertEquals(50, recordCount);
+
+ if(context.getFailureCause() != null){
+ throw context.getFailureCause();
+ }
+ assertTrue(!context.isFailed());
+
+ }
+
@AfterClass
public static void tearDown() throws Exception{
// pause to get logger to catch up.
diff --git a/exec/java-exec/src/test/resources/filter/test_sv4.json b/exec/java-exec/src/test/resources/filter/test_sv4.json
new file mode 100644
index 000000000..685e31535
--- /dev/null
+++ b/exec/java-exec/src/test/resources/filter/test_sv4.json
@@ -0,0 +1,42 @@
+{
+ head:{
+ type:"APACHE_DRILL_PHYSICAL",
+ version:"1",
+ generator:{
+ type:"manual"
+ }
+ },
+ graph:[
+ {
+ @id:1,
+ pop:"mock-sub-scan",
+ url: "http://apache.org",
+ entries:[
+ {records: 100, types: [
+ {name: "blue", type: "INT", mode: "REQUIRED"},
+ {name: "red", type: "BIGINT", mode: "REQUIRED"},
+ {name: "green", type: "INT", mode: "REQUIRED"}
+ ]}
+ ]
+ },
+ {
+ @id:2,
+ child: 1,
+ pop:"sort",
+ orderings: [
+ {expr: "blue"}
+ ]
+ },
+ {
+ @id:3,
+ child: 2,
+ pop:"filter",
+ expr: "alternate()"
+ },
+ {
+ @id: 4,
+ child: 3,
+ pop: "screen"
+ }
+ ]
+} \ No newline at end of file