aboutsummaryrefslogtreecommitdiff
path: root/exec/java-exec/src/main/java/org/apache/drill/exec/record/JoinBatchMemoryManager.java
blob: a06e2c35d952bdd653979db06409955aa38997f4 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.drill.exec.record;

import java.util.Collections;
import java.util.Set;

/**
 * Memory manager for operators with two inputs (left and right).
 *
 * <p>Tracks a row width per input. The outgoing row width is the sum of both input widths,
 * after subtracting the allocated per-entry size of any excluded columns. That width is used
 * to size outgoing batches against the configured output batch size.
 */
public class JoinBatchMemoryManager extends RecordBatchMemoryManager {
  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(JoinBatchMemoryManager.class);

  private static final int NUM_INPUTS = 2;
  public static final int LEFT_INDEX = 0;
  public static final int RIGHT_INDEX = 1;

  /** Most recently computed row width per input, indexed by {@link #LEFT_INDEX} / {@link #RIGHT_INDEX}. */
  private final int[] rowWidth;
  /** Input batches sized by the {@code update(inputIndex, ...)} overloads, indexed like {@link #rowWidth}. */
  private final RecordBatch[] recordBatch;
  /** Columns whose width is subtracted from each input's row width; never {@code null}. */
  private final Set<String> columnsToExclude;

  /**
   * Creates a memory manager for a two-input operator.
   *
   * @param outputBatchSize configured target size of an outgoing batch (forwarded to the superclass)
   * @param leftBatch left input batch
   * @param rightBatch right input batch
   * @param excludedColumns names of columns whose allocated per-entry size is subtracted from the
   *     input row width when computing the outgoing row width; {@code null} is treated as empty
   */
  public JoinBatchMemoryManager(int outputBatchSize, RecordBatch leftBatch,
                                RecordBatch rightBatch, Set<String> excludedColumns) {
    super(NUM_INPUTS, outputBatchSize);
    recordBatch = new RecordBatch[NUM_INPUTS];
    recordBatch[LEFT_INDEX] = leftBatch;
    recordBatch[RIGHT_INDEX] = rightBatch;
    rowWidth = new int[NUM_INPUTS];
    // Normalize so the exclusion loop in updateInternal never dereferences null.
    this.columnsToExclude = excludedColumns == null ? Collections.<String>emptySet() : excludedColumns;
  }

  /**
   * Update the memory manager parameters based on the new incoming batch
   *
   * Notice three (possibly) different "row counts" for the outgoing batches:
   *
   *  1. The rowCount that the current outgoing batch was allocated with (always a power of 2; e.g. 8192)
   *  2. The new rowCount computed based on the newly seen input rows (always a power of 2); may be bigger than (1) if the
   *     new input rows are much smaller than before (e.g. 16384), or smaller (e.g. 4096) if the new rows are much wider.
   *     Subsequent outgoing batches would be allocated based on this (2) new rowCount.
   *  3. The target rowCount for the current outgoing batch. While initially (1), it may be resized down if the new rows
   *     are getting bigger. In any case it won't be resized above (1) (to avoid IOOB) or below the current number of rows
   *     in that batch (i.e., outputPosition). (Need not be a power of two; e.g., 7983).
   *
   *  After every call to update() while the outgoing batch is active, the current target should be updated with (3) by
   *  calling getCurrentOutgoingMaxRowCount() .
   *
   *  Comment: The "power of 2" in the above (1) and (2) is actually "power of 2 minus 1" (e.g. 65535, or 8191) in order
   *  to avoid memory waste in case offset vectors are used (see DRILL-5446)
   *
   * @param inputIndex  Left (0) or Right (1)
   * @param outputPosition  Position (i.e. number of inserted rows) in the current output batch
   * @param useAggregate If true, compute using average row width (else based on allocated sizes)
   */
  private void updateInternal(int inputIndex, int outputPosition, boolean useAggregate) {
    updateIncomingStats(inputIndex);
    rowWidth[inputIndex] = useAggregate ? (int) getAvgInputRowWidth(inputIndex) : getRecordBatchSizer(inputIndex).getRowAllocWidth();

    // Reduce the width of excluded columns from actual rowWidth
    for (String columnName : columnsToExclude) {
      final RecordBatchSizer.ColumnSize currentColSizer = getColumnSize(inputIndex, columnName);
      if (currentColSizer == null) {
        continue;
      }
      rowWidth[inputIndex] -= currentColSizer.getAllocSizePerEntry();
    }

    // Get final net outgoing row width after reducing the excluded columns width
    final int newOutgoingRowWidth = rowWidth[LEFT_INDEX] + rowWidth[RIGHT_INDEX];

    // If outgoing row width is 0 or there is no change in outgoing row width, just return.
    // This is possible for empty batches or
    // when first set of batches come with OK_NEW_SCHEMA and no data.
    if (newOutgoingRowWidth == 0 || newOutgoingRowWidth == getOutgoingRowWidth()) {
      return;
    }

    // Adjust for the current batch.
    // calculate memory used so far based on previous outgoing row width and how many rows we already processed.
    final int previousOutgoingWidth = getOutgoingRowWidth();
    // Widen before multiplying: int * int would overflow for wide rows and large row counts.
    final long memoryUsed = (long) outputPosition * previousOutgoingWidth;

    final int configOutputBatchSize = getOutputBatchSize();
    // This is the remaining memory.
    final long remainingMemory = Math.max(configOutputBatchSize - memoryUsed, 0);

    // These are number of rows we can fit in remaining memory based on new outgoing row width.
    final int numOutputRowsRemaining = RecordBatchSizer.safeDivide(remainingMemory, newOutgoingRowWidth);

    final int currentOutputBatchRowCount = getOutputRowCount();

    // update the value to be used for next batch(es)
    setOutputRowCount(configOutputBatchSize, newOutgoingRowWidth);

    // set the new row width
    setOutgoingRowWidth(newOutgoingRowWidth);

    final int newOutputRowCount = getOutputRowCount();

    if (currentOutputBatchRowCount != newOutputRowCount) {
      logger.debug("Memory manager update changed the output row count from {} to {}",currentOutputBatchRowCount,newOutputRowCount);
    }

    // The current outgoing batch target count (i.e., max number of rows to put there) is modified to be the current number of rows there
    // plus as many of the future new rows that would fit in the remaining memory (e.g., if the new rows are wider, fewer would fit), but
    // in any case no larger than the size the batch was allocated for (to avoid IOOB on the allocated vectors).
    // The sum is computed in long so it cannot overflow; the min is bounded by an int, so the cast is safe.
    final long candidateRowCount = (long) outputPosition + numOutputRowsRemaining;
    setCurrentOutgoingMaxRowCount((int) Math.min(currentOutputBatchRowCount, candidateRowCount));
  }

  /**
   * Update the memory manager parameters based on the new incoming batch
   *
   * @param inputIndex Left (0) or Right (1)
   * @param outputPosition Position (i.e. number of inserted rows) in the output batch
   * @param useAggregate Compute using average row width (else based on allocated sizes)
   */
  @Override
  public void update(int inputIndex, int outputPosition, boolean useAggregate) {
    setRecordBatchSizer(inputIndex, new RecordBatchSizer(recordBatch[inputIndex]));
    updateInternal(inputIndex, outputPosition, useAggregate);
  }

  /**
   * Update the memory manager parameters based on the new incoming batch (based on allocated sizes, not average row size)
   *
   * @param inputIndex Left (0) or Right (1)
   * @param outputPosition Position (i.e. number of inserted rows) in the output batch
   */
  @Override
  public void update(int inputIndex, int outputPosition) {
    update(inputIndex, outputPosition, false);
  }

  /**
   * Update the memory manager parameters based on the given (incoming) batch
   *
   * @param batch Update based on the data in this batch
   * @param inputIndex Left (0) or Right (1)
   * @param outputPosition Position (i.e. number of inserted rows) in the output batch
   * @param useAggregate Compute using average row width (else based on allocated sizes)
   */
  @Override
  public void update(RecordBatch batch, int inputIndex, int outputPosition, boolean useAggregate) {
    setRecordBatchSizer(inputIndex, new RecordBatchSizer(batch));
    updateInternal(inputIndex, outputPosition, useAggregate);
  }

  /**
   * Update the memory manager parameters based on the given (incoming) batch (based on allocated sizes, not average row size)
   *
   * @param batch Update based on the data in this batch
   * @param inputIndex Left (0) or Right (1)
   * @param outputPosition Position (i.e. number of inserted rows) in the output batch
   */
  @Override
  public void update(RecordBatch batch, int inputIndex, int outputPosition) {
    update(batch, inputIndex, outputPosition, false);
  }
}