aboutsummaryrefslogtreecommitdiff
path: root/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/AbstractFixedWidthWriter.java
blob: 921cb002e8983b9b8b9ef3e1e4254ca4cd8e5adf (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.drill.exec.vector.accessor.writer;

import org.apache.drill.exec.memory.BaseAllocator;
import org.apache.drill.exec.vector.ValueVector;
import org.apache.drill.exec.vector.accessor.impl.HierarchicalFormatter;

/**
 * Base class for writers for fixed-width vectors. Handles common
 * tasks, leaving the generated code to handle only type-specific
 * operations.
 */

public abstract class AbstractFixedWidthWriter extends BaseScalarWriter {

  /**
   * Fixed-width writer that back-fills skipped value positions with
   * zeros before each write.
   */

  public abstract static class BaseFixedWidthWriter extends AbstractFixedWidthWriter {

    /**
     * Buffer of zeros used to back-fill vector buffers with
     * zeros.
     */

    private static final byte[] ZERO_BUF = new byte[256];

    /**
     * Determine the write index, growing, overflowing and back-filling
     * the vector as needed.
     * <p>
     * This is a bit tricky. This method has side effects, by design.
     * The current vector buffer, and buffer address, will change in
     * this method when a vector grows or overflows. So, don't use this
     * method in inline calls of the form:
     * <pre>{@code
     * vector.getBuffer().doSomething(prepareWrite());
     * }</pre>
     * The buffer obtained by {@code getBuffer()} can be different than
     * the current buffer after {@code prepareWrite()}.
     *
     * @return the index at which to write the current value
     */

    protected final int prepareWrite() {

      // "Fast path" for the normal case of no fills, no overflow.
      // This is the only bounds check we want to do for the entire
      // set operation.

      // This is performance critical code; every operation counts.
      // Please be thoughtful when changing the code.

      int writeIndex = vectorIndex.vectorIndex();
      if (lastWriteIndex + 1 < writeIndex || writeIndex >= capacity) {
        writeIndex = prepareWrite(writeIndex);
      }

      // Track the last write location for zero-fill use next time around.

      lastWriteIndex = writeIndex;
      return writeIndex;
    }

    /**
     * Slow path of {@link #prepareWrite()}: makes room for the requested
     * position (growing the vector or triggering overflow), then
     * zero-fills any positions skipped since the last write.
     *
     * @param writeIndex the position the caller wants to write to
     * @return the position at which to write; this can differ from the
     * argument because the resize step re-reads the vector index after
     * a possible rollover
     */

    protected final int prepareWrite(int writeIndex) {

      // Either empties must be filled or the vector is full.

      writeIndex = resize(writeIndex);

      // Fill empties to the write position.

      fillEmpties(writeIndex);
      return writeIndex;
    }

    /**
     * Fill empties. This is required because the allocated memory is not
     * zero-filled.
     * <p>
     * Zeros every value position after the last write and before
     * {@code writeIndex}. The fill is done over the byte range rather
     * than in whole-value strides, so it makes progress for any value
     * width, including a width larger than the zero buffer.
     *
     * @param writeIndex the position about to be written; positions
     * before it that were never written are zeroed
     */

    @Override
    protected final void fillEmpties(final int writeIndex) {
      final int firstEmpty = lastWriteIndex + 1;
      if (firstEmpty >= writeIndex) {

        // Nothing was skipped; avoid any offset arithmetic.

        return;
      }
      final int width = width();
      final int end = writeIndex * width;
      int offset = firstEmpty * width;
      while (offset < end) {

        // Copy as many zero bytes as remain, capped at the size of
        // the zero buffer.

        final int length = Math.min(end - offset, ZERO_BUF.length);
        drillBuf.setBytes(offset, ZERO_BUF, 0, length);
        offset += length;
      }
    }
  }

  /**
   * The largest position to which the writer has written data. Used to allow
   * "fill-empties" (AKA "back-fill") of missing values on each value write
   * and at the end of a batch. Note that this is the position of the last
   * write, not the next write position. Starts at -1 (no last write).
   */

  protected int lastWriteIndex;

  /**
   * Resets the writer's state: re-reads the vector's buffer and value
   * capacity, then clears the last-write position.
   */

  @Override
  public void startWrite() {
    setBuffer();

    // -1 is the "nothing written yet" marker.

    this.lastWriteIndex = -1;
  }

  /**
   * Width of a single value, in bytes. This class multiplies value
   * positions by the width to obtain byte offsets into the buffer, and
   * divides the buffer capacity by it to obtain the capacity in values.
   *
   * @return the number of bytes occupied by each value
   */

  public abstract int width();

  /**
   * Caches the vector's current buffer and recomputes the capacity,
   * expressed as the number of values of {@link #width()} bytes that
   * fit in that buffer.
   */

  @Override
  protected final void setBuffer() {
    this.drillBuf = vector().getBuffer();

    // Buffer capacity is in bytes; the writer tracks capacity in values.

    this.capacity = this.drillBuf.capacity() / width();
  }

  /**
   * Grows the buffer, unconditionally, when the given position does not
   * fit in the current capacity. Unlike {@link #resize(int)}, this
   * method never consults the expansion budget and never overflows.
   *
   * @param writeIndex the position that must fit in the buffer
   */

  protected final void mandatoryResize(final int writeIndex) {
    if (writeIndex >= capacity) {

      // Some vectors start off as 0 length, so enforce a minimum
      // size to avoid repeated tiny reallocations on early rows.

      final int requiredBytes = (writeIndex + 1) * width();
      final int newSize = BaseAllocator.nextPowerOfTwo(
          Math.max(requiredBytes, MIN_BUFFER_SIZE));
      realloc(newSize);
    }
  }

  /**
   * Makes room for the given write position. If the position already
   * fits, nothing happens. Otherwise the buffer is either grown in
   * place or, when growth is not permitted, overflow is triggered.
   *
   * @param writeIndex the position the caller wants to write to
   * @return the position at which to write: the argument when no
   * resize was needed, otherwise the vector index re-read after the
   * resize (which may have rolled over)
   */

  protected final int resize(final int writeIndex) {
    if (writeIndex >= capacity) {
      final int valueWidth = width();

      // Some vectors start off as 0 length, so enforce a minimum
      // size to avoid repeated tiny reallocations on early rows.

      final int requiredBytes = (writeIndex + 1) * valueWidth;
      final int newSize = BaseAllocator.nextPowerOfTwo(
          Math.max(requiredBytes, MIN_BUFFER_SIZE));

      // Growth is allowed only when the new size is within the maximum
      // buffer size AND the additional bytes are accepted by
      // canExpand(). The size check must come first: canExpand() is
      // only consulted for sizes that pass it.
      //
      // The idea is that vectors grow while they fit the available
      // memory budget and are then filled until one needs more space,
      // at which point overflow moves writing to a new set of vectors.
      // Internal fragmentation will result, but this approach (along
      // with proper initial vector sizing) minimizes it.

      final boolean growInPlace = newSize <= ValueVector.MAX_BUFFER_SIZE
          && canExpand(newSize - capacity * valueWidth);

      if (growInPlace) {

        // Optimized form of reAlloc() which does not zero memory and
        // does not repeat the bounds checks done above. The write index
        // and offset remain unchanged.

        realloc(newSize);
      } else {

        // Allocate a new vector, or throw an exception if overflow is
        // not supported. If overflow is supported, the callback will
        // call endWrite(), which fills empties and sets the final
        // writer index for the current vector. Then bindVector() will
        // be called to provide the new vector, and the write index
        // changes with it.

        overflowed();
      }

      // The resize may have caused rollover, so re-read the write index.

      return vectorIndex.vectorIndex();
    }
    return writeIndex;
  }

  /**
   * @return the position of the most recent write, or -1 when nothing
   * has been written since the writer was last reset
   */

  @Override
  public int lastWriteIndex() {
    return this.lastWriteIndex;
  }

  /**
   * Overrides the recorded last-write position. Intended solely for
   * internal use on the very rare occasions when the vector has been
   * written to outside of this writer framework. Application code
   * must not call this method.
   *
   * @param index new last write index
   */

  public void setLastWriteIndex(int index) {
    this.lastWriteIndex = index;
  }

  /**
   * Moves the last-write position to just before the current vector
   * index without writing anything.
   */

  @Override
  public void skipNulls() {

    // Act as though everything up to the previous value has been
    // written. The skipped (null, per the caller) positions are left
    // uninitialized rather than zero-filled.

    final int currentIndex = vectorIndex.vectorIndex();
    lastWriteIndex = currentIndex - 1;
  }

  /**
   * Pulls the last-write position back so that it is never at or
   * beyond the current vector index.
   */

  @Override
  public void restartRow() {
    final int previousIndex = vectorIndex.vectorIndex() - 1;

    // Only ever move the marker backwards; an earlier last-write
    // position is left as it is.

    if (lastWriteIndex > previousIndex) {
      lastWriteIndex = previousIndex;
    }
  }

  /**
   * Sets the value count of the current vector to the index at which
   * the current row starts.
   */

  @Override
  public void preRollover() {
    final int rowStart = vectorIndex.rowStartIndex();
    setValueCount(rowStart);
  }

  /**
   * Restarts the writer after rollover and re-bases the last-write
   * position relative to the row start index, with a floor of -1
   * (no last write).
   */

  @Override
  public void postRollover() {

    // Capture the re-based position first: startWrite() resets
    // lastWriteIndex to -1.

    final int rebased = lastWriteIndex - vectorIndex.rowStartIndex();
    startWrite();
    lastWriteIndex = rebased < -1 ? -1 : rebased;
  }

  /**
   * Finishes writing by setting the value count to the current
   * vector index.
   */

  @Override
  public void endWrite() {
    final int valueCount = vectorIndex.vectorIndex();
    setValueCount(valueCount);
  }

  /**
   * Fills positions that were skipped between the last write and the
   * given position. Called from {@link #setValueCount(int)} with the
   * value count; the fill strategy is left to the subclass.
   *
   * @param writeIndex the position up to which (exclusive) unwritten
   * positions are to be filled
   */

  protected abstract void fillEmpties(int writeIndex);

  /**
   * Finalizes the vector at the given number of values: ensures the
   * buffer can hold that many values, fills any unwritten positions
   * before the count, and sets the buffer's writer index to the
   * corresponding byte offset.
   *
   * @param valueCount the number of values the vector should report
   */

  public void setValueCount(int valueCount) {
    final int finalIndex = valueCount - 1;

    // The count may lie beyond the current capacity when trailing
    // values were never written, so grow first. This growth is
    // mandatory: it never triggers overflow.

    mandatoryResize(finalIndex);
    fillEmpties(valueCount);

    // Fetch the buffer from the vector only now, AFTER the resize
    // above, since growing the vector can change its buffer.

    vector().getBuffer().writerIndex(valueCount * width());

    // The last write index is either the last value just filled or,
    // in an overflow situation, the later position of the last actual
    // write; never move it backwards.

    if (lastWriteIndex < finalIndex) {
      lastWriteIndex = finalIndex;
    }
  }

  /**
   * Writes this writer's state to the given formatter: calls
   * {@code extend()} on the formatter, lets the superclass dump its
   * own state, then appends the {@code lastWriteIndex} attribute and
   * ends the object.
   *
   * @param format the formatter that receives the dump
   */

  @Override
  public void dump(HierarchicalFormatter format) {
    // NOTE(review): extend() presumably continues an object opened by a
    // subclass or caller -- confirm against HierarchicalFormatter.
    format.extend();
    super.dump(format);
    format
      .attribute("lastWriteIndex", lastWriteIndex)
      .endObject();
  }
}