path: root/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchMemoryManager.java
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.drill.exec.record;

import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
import org.apache.drill.exec.vector.UInt4Vector;
import org.apache.drill.exec.vector.ValueVector;

import java.util.List;

public class RecordBatchMemoryManager {
  protected static final int MAX_NUM_ROWS = ValueVector.MAX_ROW_COUNT;
  protected static final int MIN_NUM_ROWS = 1;
  protected static final int DEFAULT_INPUT_INDEX = 0;
  private int outputRowCount = MAX_NUM_ROWS;
  private int outgoingRowWidth;
  private int outputBatchSize;
  private RecordBatchSizer[] sizer;
  private BatchStats[] inputBatchStats;
  private BatchStats outputBatchStats;

  // By default, we expect one input batch stream and one output batch stream.
  // Some operators receive multiple input batch streams; for example, joins
  // get two (left and right), and a Merge Receiver can get more than two.
  private int numInputs = 1;

  private class BatchStats {
    /**
     * Operator metric stats for a single batch stream: number of batches,
     * cumulative batch size, and total record count.
     */
    private long numBatches;
    private long sumBatchSizes;
    private long totalRecords;

    public long getNumBatches() {
      return numBatches;
    }

    public long getTotalRecords() {
      return totalRecords;
    }

    public long getAvgBatchSize() {
      return RecordBatchSizer.safeDivide(sumBatchSizes, numBatches);
    }

    public long getAvgRowWidth() {
      return RecordBatchSizer.safeDivide(sumBatchSizes, totalRecords);
    }

    public void incNumBatches() {
      ++numBatches;
    }

    public void incSumBatchSizes(long batchSize) {
      sumBatchSizes += batchSize;
    }

    public void incTotalRecords(long numRecords) {
      totalRecords += numRecords;
    }

  }

  public long getNumOutgoingBatches() {
    return outputBatchStats.getNumBatches();
  }

  public long getTotalOutputRecords() {
    return outputBatchStats.getTotalRecords();
  }

  public long getAvgOutputBatchSize() {
    return outputBatchStats.getAvgBatchSize();
  }

  public long getAvgOutputRowWidth() {
    return outputBatchStats.getAvgRowWidth();
  }

  public long getNumIncomingBatches() {
    return inputBatchStats[DEFAULT_INPUT_INDEX] == null ? 0 : inputBatchStats[DEFAULT_INPUT_INDEX].getNumBatches();
  }

  public long getAvgInputBatchSize() {
    return inputBatchStats[DEFAULT_INPUT_INDEX] == null ? 0 : inputBatchStats[DEFAULT_INPUT_INDEX].getAvgBatchSize();
  }

  public long getAvgInputRowWidth() {
    return inputBatchStats[DEFAULT_INPUT_INDEX] == null ? 0 : inputBatchStats[DEFAULT_INPUT_INDEX].getAvgRowWidth();
  }

  public long getTotalInputRecords() {
    return inputBatchStats[DEFAULT_INPUT_INDEX] == null ? 0 : inputBatchStats[DEFAULT_INPUT_INDEX].getTotalRecords();
  }

  public long getNumIncomingBatches(int index) {
    Preconditions.checkArgument(index >= 0 && index < numInputs);
    return inputBatchStats[index] == null ? 0 : inputBatchStats[index].getNumBatches();
  }

  public long getAvgInputBatchSize(int index) {
    Preconditions.checkArgument(index >= 0 && index < numInputs);
    return inputBatchStats[index] == null ? 0 : inputBatchStats[index].getAvgBatchSize();
  }

  public long getAvgInputRowWidth(int index) {
    Preconditions.checkArgument(index >= 0 && index < numInputs);
    return inputBatchStats[index] == null ? 0 : inputBatchStats[index].getAvgRowWidth();
  }

  public long getTotalInputRecords(int index) {
    Preconditions.checkArgument(index >= 0 && index < numInputs);
    return inputBatchStats[index] == null ? 0 : inputBatchStats[index].getTotalRecords();
  }
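
  // Construction sketch (hypothetical usage): a two-input operator such as a join
  // would create the manager with numInputs = 2, for example
  //   new RecordBatchMemoryManager(2, configuredOutputSize);
  // single-input operators can use the one-argument constructor below.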

  public RecordBatchMemoryManager(int numInputs, int configuredOutputSize) {
    this.numInputs = numInputs;
    this.outputBatchSize = configuredOutputSize;
    sizer = new RecordBatchSizer[numInputs];
    inputBatchStats = new BatchStats[numInputs];
    outputBatchStats = new BatchStats();
  }

  public RecordBatchMemoryManager(int configuredOutputSize) {
    this.outputBatchSize = configuredOutputSize;
    sizer = new RecordBatchSizer[numInputs];
    inputBatchStats = new BatchStats[numInputs];
    outputBatchStats = new BatchStats();
  }
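
  // The update() overloads below are default no-op or pass-through hooks; operator
  // subclasses override the variant that matches their input and output pattern.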

  public int update(int inputIndex, int outputPosition) {
    // by default just return the outputRowCount
    return getOutputRowCount();
  }

  public void update(int inputIndex) {
  }

  public void update() {
  }

  public void update(RecordBatch recordBatch) {
  }

  public void update(RecordBatch recordBatch, int index) {
    // Get sizing information for the batch.
    setRecordBatchSizer(index, new RecordBatchSizer(recordBatch));
    setOutgoingRowWidth(getRecordBatchSizer(index).getNetRowWidth());
    // Set the row count for the outgoing batch based on the configured
    // output batch size and the incoming batch's net row width.
    setOutputRowCount(getOutputBatchSize(), getRecordBatchSizer(index).getNetRowWidth());
    updateIncomingStats(index);
  }

  public int update(int inputIndex, int outputPosition, boolean useAggregate) {
    // by default just return the outputRowCount
    return getOutputRowCount();
  }

  public int update(RecordBatch batch, int inputIndex, int outputPosition) {
    return getOutputRowCount();
  }

  public int update(RecordBatch batch, int inputIndex, int outputPosition, boolean useAggregate) {
    return getOutputRowCount();
  }

  public boolean updateIfNeeded(int newOutgoingRowWidth) {
    // We do not want to keep adjusting the batch holders' target row count
    // for small variations in row width.
    // If the row width changes, compute the adjusted row count (via
    // computeOutputRowCount()) and do nothing if it does not change.
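    // For example (hypothetical values): with a 16 MB output batch size, row widths of
    // 96 and 100 bytes both clamp to the maximum row count, so no adjustment is needed.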
    if (newOutgoingRowWidth == outgoingRowWidth ||
      computeOutputRowCount(outputBatchSize, newOutgoingRowWidth) == computeOutputRowCount(outputBatchSize, outgoingRowWidth)) {
      return false;
    }

    // Set number of rows in outgoing batch. This number will be used for new batch creation.
    setOutputRowCount(outputBatchSize, newOutgoingRowWidth);
    setOutgoingRowWidth(newOutgoingRowWidth);
    return true;
  }

  public int getOutputRowCount() {
    return outputRowCount;
  }

  /**
   * Given the target batch size and row width, this will set the output rowCount,
   * clamped to the allowed minimum and maximum.
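   * For example (hypothetical values): a 4 MB (4,194,304 byte) target batch size and a
   * 256 byte row width give 16384 rows, which adjustOutputRowCount() reduces to 16383.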
   */
  public void setOutputRowCount(int targetBatchSize, int rowWidth) {
    this.outputRowCount = adjustOutputRowCount(RecordBatchSizer.safeDivide(targetBatchSize, rowWidth));
  }

  public void setOutputRowCount(int outputRowCount) {
    this.outputRowCount = outputRowCount;
  }

  /**
   * This will adjust rowCount, clamping it to the allowed minimum and maximum.
   * We round down to the nearest power of two, minus one, for better memory
   * utilization; the -1 accounts for offset vectors, which need rowCount + 1 entries.
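   * For example (hypothetical value): a computed rowCount of 3000 becomes
   * Integer.highestOneBit(3000) - 1 = 2047.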
   */
  public static int adjustOutputRowCount(int rowCount) {
    return (Math.min(MAX_NUM_ROWS, Math.max(Integer.highestOneBit(rowCount) - 1, MIN_NUM_ROWS)));
  }

  public static int computeOutputRowCount(int batchSize, int rowWidth) {
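    // For example (hypothetical values): batchSize = 1,048,576 (1 MB) and rowWidth = 300
    // give 3495 rows, which adjustOutputRowCount() reduces to 2047.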
    return adjustOutputRowCount(RecordBatchSizer.safeDivide(batchSize, rowWidth));
  }

  public void setOutgoingRowWidth(int outgoingRowWidth) {
    this.outgoingRowWidth = outgoingRowWidth;
  }

  public int getOutgoingRowWidth() {
    return outgoingRowWidth;
  }

  public void setRecordBatchSizer(int index, RecordBatchSizer sizer) {
    Preconditions.checkArgument(index >= 0 && index < numInputs);
    this.sizer[index] = sizer;
    if (inputBatchStats[index] == null) {
      inputBatchStats[index] = new BatchStats();
    }
  }

  public void setRecordBatchSizer(RecordBatchSizer sizer) {
    setRecordBatchSizer(DEFAULT_INPUT_INDEX, sizer);
  }

  public RecordBatchSizer getRecordBatchSizer(int index) {
    Preconditions.checkArgument(index >= 0 && index < numInputs);
    return sizer[index];
  }

  public RecordBatchSizer getRecordBatchSizer() {
    return sizer[DEFAULT_INPUT_INDEX];
  }

  public RecordBatchSizer.ColumnSize getColumnSize(int index, String name) {
    Preconditions.checkArgument(index >= 0 && index < numInputs);
    return sizer[index].getColumn(name);
  }

  public RecordBatchSizer.ColumnSize getColumnSize(String name) {
    for (int index = 0; index < numInputs; index++) {
      if (sizer[index] == null || sizer[index].getColumn(name) == null) {
        continue;
      }
      return sizer[index].getColumn(name);
    }
    return null;
  }

  public void updateIncomingStats(int index) {
    Preconditions.checkArgument(index >= 0 && index < numInputs);
    Preconditions.checkArgument(inputBatchStats[index] != null);
    inputBatchStats[index].incNumBatches();
    inputBatchStats[index].incSumBatchSizes(sizer[index].getNetBatchSize());
    inputBatchStats[index].incTotalRecords(sizer[index].rowCount());
  }

  public void updateIncomingStats() {
    inputBatchStats[DEFAULT_INPUT_INDEX].incNumBatches();
    inputBatchStats[DEFAULT_INPUT_INDEX].incSumBatchSizes(sizer[DEFAULT_INPUT_INDEX].getNetBatchSize());
    inputBatchStats[DEFAULT_INPUT_INDEX].incTotalRecords(sizer[DEFAULT_INPUT_INDEX].rowCount());
  }

  public void updateOutgoingStats(int outputRecords) {
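    // The outgoing batch size is estimated as (outgoing row width x record count)
    // rather than measured from the actual outgoing vectors.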
    outputBatchStats.incNumBatches();
    outputBatchStats.incTotalRecords(outputRecords);
    outputBatchStats.incSumBatchSizes((long) outgoingRowWidth * outputRecords);
  }

  public int getOutputBatchSize() {
    return outputBatchSize;
  }

  public int getOffsetVectorWidth() {
    return UInt4Vector.VALUE_WIDTH;
  }

  public void allocateVectors(VectorContainer container, int recordCount) {
    // Allocate memory for the vectors.
    // This will iteratively allocate memory for all nested columns underneath.
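    // Per-column allocation sizes come from the incoming batch sizers (via getColumnSize()),
    // so a sizer must have been set (e.g. through update()) before calling this.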
    for (VectorWrapper<?> w : container) {
      RecordBatchSizer.ColumnSize colSize = getColumnSize(w.getField().getName());
      colSize.allocateVector(w.getValueVector(), recordCount);
    }
    container.setRecordCount(0);
  }

  public void allocateVectors(List<ValueVector> valueVectors, int recordCount) {
    // Allocate memory for the vectors.
    // This will iteratively allocate memory for all nested columns underneath.
    for (ValueVector v : valueVectors) {
      RecordBatchSizer.ColumnSize colSize = getColumnSize(v.getField().getName());
      colSize.allocateVector(v, recordCount);
    }
  }

  public void allocateVectors(VectorContainer container) {
    allocateVectors(container, outputRowCount);
  }

  public void allocateVectors(List<ValueVector> valueVectors) {
    allocateVectors(valueVectors, outputRowCount);
  }
}