aboutsummaryrefslogtreecommitdiff
path: root/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoinTemplate.java
blob: 95c6acd5e475cfb4321dbd9c0fbc0800ae1831f7 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.drill.exec.physical.impl.join;

import org.apache.calcite.rel.core.JoinRelType;
import org.apache.drill.common.exceptions.DrillRuntimeException;
import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.record.ExpandableHyperContainer;
import org.apache.drill.exec.record.RecordBatch;
import org.apache.drill.exec.record.VectorContainer;
import org.apache.drill.exec.record.VectorWrapper;
import org.apache.drill.exec.util.record.RecordBatchStats;
import org.apache.drill.exec.util.record.RecordBatchStats.RecordBatchIOType;
import javax.inject.Named;
import java.util.LinkedList;
import java.util.List;

import static org.apache.drill.exec.record.JoinBatchMemoryManager.LEFT_INDEX;

/**
 * Template class that, combined with the runtime generated source, implements the NestedLoopJoin
 * interface. This class contains the main nested loop join logic; the generated subclass supplies
 * {@link #doSetup}, {@link #doEval}, {@link #emitLeft} and {@link #emitRight}.
 */
public abstract class NestedLoopJoinTemplate implements NestedLoopJoin {

  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(NestedLoopJoinTemplate.class);

  // Current left input batch being processed
  private RecordBatch left = null;

  // Record count of the left batch currently being processed
  private int leftRecordCount = 0;

  // List of record counts per batch in the right hyper container (supplied as a LinkedList by the caller)
  private List<Integer> rightCounts = null;

  // Output batch
  private NestedLoopJoinBatch outgoing = null;

  // Iteration status tracker; persists the loop position between successive output batches
  private final IterationStatusTracker tracker = new IterationStatusTracker();

  // Maximum number of records to place into the current output batch
  private int targetOutputRecords;

  /**
   * Initializes the necessary state and invokes {@link #doSetup} to set the
   * input and output value vector references.
   *
   * @param context Fragment context
   * @param left Current left input batch being processed
   * @param rightContainer Hyper container holding all right-side batches
   * @param rightCounts Record count for each batch in the right hyper container
   * @param outgoing Output batch
   */
  public void setupNestedLoopJoin(FragmentContext context,
                                  RecordBatch left,
                                  ExpandableHyperContainer rightContainer,
                                  LinkedList<Integer> rightCounts,
                                  NestedLoopJoinBatch outgoing) {
    this.left = left;
    this.leftRecordCount = left.getRecordCount();
    this.rightCounts = rightCounts;
    this.outgoing = outgoing;
    doSetup(context, rightContainer, left, outgoing);
  }

  @Override
  public void setTargetOutputCount(int targetOutputRecords) {
    this.targetOutputRecords = targetOutputRecords;
  }

  /**
   * Main entry point for producing the output records. Thin wrapper around populateOutgoingBatch(), this method
   * controls which left batch we are processing and fetches the next left input batch once we exhaust the current one.
   *
   * @param joinType join type (INNER or LEFT)
   * @return the number of records produced in the output batch
   */
  public int outputRecords(JoinRelType joinType) {
    int outputIndex = 0;
    while (leftRecordCount != 0) {
      outputIndex = populateOutgoingBatch(joinType, outputIndex);
      if (outputIndex >= targetOutputRecords) {
        break;
      }
      // Reset state and get the next left batch. On an OK outcome this also recomputes
      // targetOutputRecords from the batch memory manager.
      // NOTE(review): if the new target is <= outputIndex, populateOutgoingBatch() still emits one
      // record before its capacity check fires; confirm the memory manager never returns such a target.
      resetAndGetNextLeft(outputIndex);
    }
    return outputIndex;
  }

  /**
   * This method is the core of the nested loop join. For each left batch record it looks for matching records
   * in the list of right batches. A match is checked by calling the {@link #doEval(int, int, int)} method.
   * If a matching record is found, both left and right records are written into the output batch;
   * otherwise, if the join type is LEFT, only the left record is written and the right batch record values
   * will be null.
   *
   * <p>The iteration position is saved in the tracker when the output batch fills up, so the next call
   * resumes exactly where this one stopped.
   *
   * @param joinType join type (INNER or LEFT)
   * @param outputIndex index to start emitting records at
   * @return final outputIndex after producing records in the output batch
   */
  private int populateOutgoingBatch(JoinRelType joinType, int outputIndex) {
    // Copy the right-side batch counts into an array once per call. The caller supplies a LinkedList,
    // whose get(i) walks the list; indexing it directly would cost O(i) for every right batch of
    // every left record.
    final int[] rightBatchCounts = rightCounts.stream().mapToInt(Integer::intValue).toArray();

    // copy index and match counters as local variables to speed up processing
    int nextRightBatchToProcess = tracker.getNextRightBatchToProcess();
    int nextRightRecordToProcess = tracker.getNextRightRecordToProcess();
    int nextLeftRecordToProcess = tracker.getNextLeftRecordToProcess();
    boolean rightRecordMatched = tracker.isRightRecordMatched();

    outer:
    // for every record in the left batch
    for (; nextLeftRecordToProcess < leftRecordCount; nextLeftRecordToProcess++) {
      // for every batch on the right
      for (; nextRightBatchToProcess < rightBatchCounts.length; nextRightBatchToProcess++) {
        int rightRecordCount = rightBatchCounts[nextRightBatchToProcess];
        // Since right container is a hyper container, in doEval generated code it expects the
        // batch index in the 2 MSBytes of the index variable. See DRILL-6128 for details
        final int currentRightBatchIndex = nextRightBatchToProcess << 16;
        // for every record in right batch
        for (; nextRightRecordToProcess < rightRecordCount; nextRightRecordToProcess++) {
          if (doEval(nextLeftRecordToProcess, currentRightBatchIndex, nextRightRecordToProcess)) {
            // project records from the left and right batches
            emitLeft(nextLeftRecordToProcess, outputIndex);
            emitRight(nextRightBatchToProcess, nextRightRecordToProcess, outputIndex);
            outputIndex++;
            rightRecordMatched = true;

            if (outputIndex >= targetOutputRecords) {
              // advance past the record just emitted so the next call resumes after it
              nextRightRecordToProcess++;

              // no more space left in the batch, stop processing
              break outer;
            }
          }
        }
        nextRightRecordToProcess = 0;
      }
      nextRightBatchToProcess = 0;
      if (joinType == JoinRelType.LEFT && !rightRecordMatched) {
        // project records from the left side only, records from right will be null
        emitLeft(nextLeftRecordToProcess, outputIndex);
        outputIndex++;
        if (outputIndex >= targetOutputRecords) {
          // the for-loop increment is skipped by break, so advance the left position manually
          nextLeftRecordToProcess++;

          // no more space left in the batch, stop processing
          break;
        }
      } else {
        // reset match indicator if matching record was found
        rightRecordMatched = false;
      }
    }

    // update iteration status tracker with actual index and match counters
    tracker.update(nextRightBatchToProcess, nextRightRecordToProcess, nextLeftRecordToProcess, rightRecordMatched);
    return outputIndex;
  }

  /**
   * Utility method to clear the memory in the left input batch once we have completed processing it.
   * Resets the internal state that indicates the next records to process in the left and right batches,
   * and fetches the next left input batch.
   *
   * <p>On an OK outcome the batch memory manager is updated and the target output count is recalculated.
   * On NONE, NOT_YET or STOP the left side is treated as exhausted.
   *
   * @param outputIndex number of records already written to the current output batch
   * @throws DrillRuntimeException on a schema change or on any other, unexpected outcome
   */
  private void resetAndGetNextLeft(int outputIndex) {
    for (VectorWrapper<?> vw : left) {
      vw.getValueVector().clear();
    }
    tracker.reset();
    RecordBatch.IterOutcome leftOutcome = outgoing.next(NestedLoopJoinBatch.LEFT_INPUT, left);
    switch (leftOutcome) {
      case OK_NEW_SCHEMA:
        throw new DrillRuntimeException("Nested loop join does not handle schema change. Schema change" +
            " found on the left side of NLJ.");
      case NONE:
      case NOT_YET:
      case STOP:
        leftRecordCount = 0;
        break;
      case OK:
        outgoing.getBatchMemoryManager().update(left, LEFT_INDEX, outputIndex);
        setTargetOutputCount(outgoing.getBatchMemoryManager().getCurrentOutgoingMaxRowCount()); // calculated by update()
        RecordBatchStats.logRecordBatchStats(RecordBatchIOType.INPUT_LEFT,
          outgoing.getBatchMemoryManager().getRecordBatchSizer(LEFT_INDEX),
          outgoing.getRecordBatchStatsContext());
        leftRecordCount = left.getRecordCount();
        break;
      default:
        // The left vectors were already cleared above and leftRecordCount would otherwise keep the
        // previous batch's count, so silently continuing would re-process cleared vectors. Fail fast.
        throw new DrillRuntimeException("Nested loop join received unexpected outcome " + leftOutcome +
            " from the left side of NLJ.");
    }
  }

  /**
   * Implemented by generated code; sets the input and output value vector references.
   */
  public abstract void doSetup(@Named("context") FragmentContext context,
                               @Named("rightContainer") VectorContainer rightContainer,
                               @Named("leftBatch") RecordBatch leftBatch,
                               @Named("outgoing") RecordBatch outgoing);

  /**
   * Implemented by generated code; projects a record of the right hyper container into the output batch.
   */
  public abstract void emitRight(@Named("batchIndex") int batchIndex,
                                 @Named("recordIndexWithinBatch") int recordIndexWithinBatch,
                                 @Named("outIndex") int outIndex);

  /**
   * Implemented by generated code; projects a record of the current left batch into the output batch.
   */
  public abstract void emitLeft(@Named("leftIndex") int leftIndex,
                                @Named("outIndex") int outIndex);

  /**
   * Implemented by generated code; evaluates the join condition for one left/right record pair.
   * {@code batchIndex} carries the right batch index already shifted into the upper 16 bits (DRILL-6128).
   *
   * @return true if the pair matches and should be emitted
   */
  protected abstract boolean doEval(@Named("leftIndex") int leftIndex,
                                    @Named("rightBatchIndex") int batchIndex,
                                    @Named("rightRecordIndexWithinBatch") int recordIndexWithinBatch);

  /**
   * Helper class to track the position in the left and right batches during iteration
   * and the match status of the current left record, so processing can resume across output batches.
   */
  private static class IterationStatusTracker {
    // Next right batch to process
    private int nextRightBatchToProcess;
    // Next record in the current right batch to process
    private int nextRightRecordToProcess;
    // Next record in the left batch to process
    private int nextLeftRecordToProcess;
    // Flag to indicate if record from the left found matching record from the right, applicable during left join
    private boolean rightRecordMatched;

    int getNextRightBatchToProcess() {
      return nextRightBatchToProcess;
    }

    boolean isRightRecordMatched() {
      return rightRecordMatched;
    }

    int getNextLeftRecordToProcess() {
      return nextLeftRecordToProcess;
    }

    int getNextRightRecordToProcess() {
      return nextRightRecordToProcess;
    }

    void update(int nextRightBatchToProcess,
                int nextRightRecordToProcess,
                int nextLeftRecordToProcess,
                boolean rightRecordMatchFound) {
      this.nextRightBatchToProcess = nextRightBatchToProcess;
      this.nextRightRecordToProcess = nextRightRecordToProcess;
      this.nextLeftRecordToProcess = nextLeftRecordToProcess;
      this.rightRecordMatched = rightRecordMatchFound;
    }

    // Rewinds all positions to the start; called when a new left batch is fetched.
    void reset() {
      nextRightBatchToProcess = nextRightRecordToProcess = nextLeftRecordToProcess = 0;
      rightRecordMatched = false;
    }

  }
}