aboutsummaryrefslogtreecommitdiff
path: root/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterTemplate4.java
blob: b394387cf13f2ec96a40731e59164bf95b47280c (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
package org.apache.drill.exec.physical.impl.filter;

import org.apache.drill.exec.exception.SchemaChangeException;
import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
import org.apache.drill.exec.record.RecordBatch;
import org.apache.drill.exec.record.TransferPair;
import org.apache.drill.exec.record.selection.SelectionVector4;
import org.apache.drill.exec.vector.allocator.VectorAllocator;

import javax.inject.Named;

/**
 * Base class for filters that read from and write to a {@link SelectionVector4}.
 *
 * <p>Concrete subclasses supply {@link #doSetup} and {@link #doEval}; this class drives the
 * per-record loop, records the indices of the rows that pass the predicate in the outgoing
 * selection vector, and then runs the configured {@link TransferPair}s.
 */
public abstract class FilterTemplate4 implements Filterer {
  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FilterTemplate4.class);

  private SelectionVector4 outgoingSelectionVector;
  private SelectionVector4 incomingSelectionVector;
  private TransferPair[] transfers;

  /**
   * Captures the selection vectors of both batches and the transfer pairs, then delegates to
   * {@link #doSetup} so the subclass can initialize itself.
   *
   * @param context   fragment context, passed through to {@link #doSetup}
   * @param incoming  batch whose four-byte selection vector is read by {@link #filterBatch}
   * @param outgoing  batch whose four-byte selection vector receives the surviving indices
   * @param transfers transfer pairs executed at the end of every {@link #filterBatch} call
   * @throws SchemaChangeException declared by the {@link Filterer} contract; not thrown directly
   *     by this method
   */
  @Override
  public void setup(FragmentContext context, RecordBatch incoming, RecordBatch outgoing, TransferPair[] transfers)
      throws SchemaChangeException {
    this.transfers = transfers;
    this.outgoingSelectionVector = outgoing.getSelectionVector4();
    this.incomingSelectionVector = incoming.getSelectionVector4();
    doSetup(context, incoming, outgoing);
  }

  /**
   * Evaluates the predicate for every entry of the incoming selection vector and writes the
   * indices that pass, densely packed from position 0, into the outgoing selection vector.
   * Afterwards the outgoing count is set to the number of matches and all transfer pairs run.
   *
   * @param recordCount not consulted by this implementation; the number of entries processed is
   *     taken from the incoming selection vector's own count
   */
  @Override
  public void filterBatch(int recordCount) {
    // Resolve the log level once per batch so the per-record path pays nothing when tracing is
    // off (no string building, no boxing of the int arguments).
    final boolean traceMatches = logger.isTraceEnabled();

    int outPos = 0;
    for (int i = 0; i < incomingSelectionVector.getCount(); i++) {
      int index = incomingSelectionVector.get(i);
      // The out-index argument is always 0: the filter produces no per-row output value, only a
      // keep/drop decision.
      if (doEval(index, 0)) {
        if (traceMatches) {
          logger.trace("(match): {} (i: {})", index, i);
        }
        outgoingSelectionVector.set(outPos++, index);
      }
    }
    outgoingSelectionVector.setCount(outPos);
    doTransfers();
  }

  /** Runs every transfer pair handed to {@link #setup}, in array order. */
  private void doTransfers() {
    for (TransferPair t : transfers) {
      t.transfer();
    }
  }

  /**
   * Subclass hook invoked once from {@link #setup} after the selection vectors and transfer pairs
   * have been captured.
   *
   * @param context  fragment context supplied to {@link #setup}
   * @param incoming incoming record batch
   * @param outgoing outgoing record batch
   */
  public abstract void doSetup(@Named("context") FragmentContext context, @Named("incoming") RecordBatch incoming, @Named("outgoing") RecordBatch outgoing);

  /**
   * Subclass hook that decides whether a single record passes the filter.
   *
   * @param inIndex  value read from the incoming selection vector for the record under test
   * @param outIndex output index; {@link #filterBatch} always passes 0
   * @return {@code true} if the record should be kept
   */
  public abstract boolean doEval(@Named("inIndex") int inIndex, @Named("outIndex") int outIndex);

}