aboutsummaryrefslogtreecommitdiff
path: root/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/text/compliant/TestPartitionRace.java
blob: 6e98339ee8b11c63593d6062d8a8c0549c3e53f8 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.drill.exec.store.easy.text.compliant;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;

import java.io.File;
import java.io.IOException;
import java.util.Iterator;

import org.apache.drill.common.types.TypeProtos.MinorType;
import org.apache.drill.exec.ExecConstants;
import org.apache.drill.exec.record.metadata.SchemaBuilder;
import org.apache.drill.exec.record.metadata.TupleMetadata;
import org.apache.drill.test.rowSet.DirectRowSet;
import org.apache.drill.test.rowSet.RowSet;
import org.apache.drill.test.rowSet.RowSetBuilder;
import org.apache.drill.test.rowSet.RowSetReader;
import org.apache.drill.test.rowSet.RowSetUtilities;
import org.junit.BeforeClass;
import org.junit.Test;

/**
 * Demonstrates a race condition inherent in the way that partition
 * columns are currently implemented. Two files: one at the root directory,
 * one down one level. Parallelization is forced to two. (Most tests use
 * small files and both files end up being read in the same scanner, which
 * masks the problem shown here.)
 * <p>
 * Depending on which file is read first, the output row may start with
 * or without the partition column. Once the column occurs, it will
 * persist.
 * <p>
 * The solution is to figure out the max partition depth in the
 * EasySubScan rather than in each scan operator.
 * <p>
 * The tests here test both the "V2" (AKA "new text reader") which has
 * many issues, and the "V3" (row-set-based version) that has fixes.
 * <p>
 * See DRILL-7082 for the multi-scan race (fixed in V3), and
 * DRILL-7083 for the problem with partition columns returning nullable INT
 * (also fixed in V3.)
 */

public class TestPartitionRace extends BaseCsvTest {

  @BeforeClass
  public static void setup() throws Exception {
    BaseCsvTest.setup(false,  true, 2);

    // Two-level partitioned table

    File rootDir = new File(testDir, PART_DIR);
    rootDir.mkdir();
    buildFile(new File(rootDir, "first.csv"), validHeaders);
    File nestedDir = new File(rootDir, NESTED_DIR);
    nestedDir.mkdir();
    buildFile(new File(nestedDir, "second.csv"), secondFile);
  }

  /**
   * Oddly, when run in a single fragment, the files occur in a
   * stable order, the partition always appars, and it appears in
   * the first column position.
   */
  @Test
  public void testSingleScanV2() throws IOException {
    String sql = "SELECT * FROM `dfs.data`.`%s`";

    try {
      enableV3(false);

      // Loop to run the query 10 times, or until we see the race

      boolean sawMissingPartition = false;
      boolean sawPartitionFirst = false;
      boolean sawPartitionLast = false;

      // Read the two batches.

      Iterator<DirectRowSet> iter = client.queryBuilder().sql(sql, PART_DIR).rowSetIterator();
      for (int j = 0; j < 2; j++) {
        assertTrue(iter.hasNext());
        RowSet rowSet = iter.next();

        // Check location of partition column

        int posn = rowSet.schema().index("dir0");
        if (posn == -1) {
          sawMissingPartition = true;
        } else if (posn == 0) {
          sawPartitionFirst = true;
        } else {
          sawPartitionLast = true;
        }
        rowSet.clear();
      }
      assertFalse(iter.hasNext());

      // When run in a single fragment, the partition column appears
      // all the time, and is in the first column position.

      assertFalse(sawMissingPartition);
      assertTrue(sawPartitionFirst);
      assertFalse(sawPartitionLast);
    } finally {
      resetV3();
      client.resetSession(ExecConstants.MIN_READER_WIDTH_KEY);
    }
  }

  /**
   * V3 provides the same schema for the single- and multi-scan
   * cases.
   */
  @Test
  public void testSingleScanV3() throws IOException {
    String sql = "SELECT * FROM `dfs.data`.`%s`";

    TupleMetadata expectedSchema = new SchemaBuilder()
        .add("a", MinorType.VARCHAR)
        .add("b", MinorType.VARCHAR)
        .add("c", MinorType.VARCHAR)
        .addNullable("dir0", MinorType.VARCHAR)
        .buildSchema();

    try {
      enableV3(true);

      // Loop to run the query 10 times to verify no race

      // First batch is empty; just carries the schema.

      Iterator<DirectRowSet> iter = client.queryBuilder().sql(sql, PART_DIR).rowSetIterator();
      assertTrue(iter.hasNext());
      RowSet rowSet = iter.next();
      assertEquals(0, rowSet.rowCount());
      rowSet.clear();

      // Read the two batches.

      for (int j = 0; j < 2; j++) {
        assertTrue(iter.hasNext());
        rowSet = iter.next();

        // Figure out which record this is and test accordingly.

        RowSetReader reader = rowSet.reader();
        assertTrue(reader.next());
        String col1 = reader.scalar("a").getString();
        if (col1.equals("10")) {
          RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
              .addRow("10", "foo", "bar", null)
              .build();
          RowSetUtilities.verify(expected, rowSet);
        } else {
          RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
              .addRow("20", "fred", "wilma", NESTED_DIR)
              .build();
          RowSetUtilities.verify(expected, rowSet);
        }
      }
      assertFalse(iter.hasNext());
    } finally {
      resetV3();
    }
  }

  /**
   * When forced to run in two fragments, the fun really starts. The
   * partition column (usually) appears in the last column position instead
   * of the first. The partition may or may not occur in the first row
   * depending on which file is read first. The result is that the
   * other columns will jump around. If we tried to create an expected
   * result set, we'd be frustrated because the schema randomly changes.
   * <p>
   * Just to be clear: this behavior is a bug, not a feature. But, it is
   * an established baseline for the "V2" reader.
   * <p>
   * This is really a test (demonstration) of the wrong behavior. This test
   * is pretty unreliable. In particular, the position of the partition column
   * seems to randomly shift from first to last position across runs.
   */
  @Test
  public void testRaceV2() throws IOException {
    String sql = "SELECT * FROM `dfs.data`.`%s`";

    try {
      enableV3(false);

      // Special test-only feature to force even small scans
      // to use more than one thread. Requires that the max
      // parallelization option be set when starting the cluster.

      client.alterSession(ExecConstants.MIN_READER_WIDTH_KEY, 2);

      // Loop to run the query 10 times, or until we see the race

      boolean sawRootFirst = false;
      boolean sawNestedFirst = false;
      boolean sawMissingPartition = false;
      boolean sawPartitionFirst = false;
      boolean sawPartitionLast = false;
      for (int i = 0; i < 10; i++) {

        // Read the two batches.

        Iterator<DirectRowSet> iter = client.queryBuilder().sql(sql, PART_DIR).rowSetIterator();
        for (int j = 0; j < 2; j++) {
          assertTrue(iter.hasNext());
          RowSet rowSet = iter.next();

          // Check location of partition column

          int posn = rowSet.schema().index("dir0");
          if (posn == -1) {
            sawMissingPartition = true;
          } else if (posn == 0) {
            sawPartitionFirst = true;
          } else {
            sawPartitionLast = true;
          }

          // Figure out which record this is and test accordingly.

          RowSetReader reader = rowSet.reader();
          assertTrue(reader.next());
          String col1 = reader.scalar("a").getString();
          if (col1.equals("10")) {
            if (i == 0) {
              sawRootFirst = true;
            }
          } else {
            if (i == 0) {
              sawNestedFirst = true;
            }
          }
          rowSet.clear();
        }
        assertFalse(iter.hasNext());
        if (sawMissingPartition &&
            sawPartitionFirst &&
            sawPartitionLast &&
            sawRootFirst &&
            sawNestedFirst) {
          // The following should appear most of the time.
          System.out.println("All variations occurred");
          return;
        }
      }

      // If you see this, maybe something got fixed. Or, maybe the
      // min parallelization hack above stopped working.
      // Or, you were just unlucky and can try the test again.
      // We print messages, rather than using assertTrue, to avoid
      // introducing a flaky test.

      System.out.println("Some variations did not occur");
      System.out.println(String.format("Missing partition: %s", sawMissingPartition));
      System.out.println(String.format("Partition first: %s", sawPartitionFirst));
      System.out.println(String.format("Partition last: %s", sawPartitionLast));
      System.out.println(String.format("Outer first: %s", sawRootFirst));
      System.out.println(String.format("Nested first: %s", sawNestedFirst));
    } finally {
      resetV3();
      client.resetSession(ExecConstants.MIN_READER_WIDTH_KEY);
    }
  }

  /**
   * V3 computes partition depth in the group scan (which sees all files), and
   * so the partition column count does not vary across scans. Also, V3 puts
   * partition columns at the end of the row so that data columns don't
   * "jump around" when files are shifted to a new partition depth.
   */
  @Test
  public void testNoRaceV3() throws IOException {
    String sql = "SELECT * FROM `dfs.data`.`%s`";

    TupleMetadata expectedSchema = new SchemaBuilder()
        .add("a", MinorType.VARCHAR)
        .add("b", MinorType.VARCHAR)
        .add("c", MinorType.VARCHAR)
        .addNullable("dir0", MinorType.VARCHAR)
        .buildSchema();

    try {
      enableV3(true);
      client.alterSession(ExecConstants.MIN_READER_WIDTH_KEY, 2);

      // Loop to run the query 10 times or until we see both files
      // in the first position.

      boolean sawRootFirst = false;
      boolean sawNestedFirst = false;
      for (int i = 0; i < 10; i++) {

        // First batch is empty; just carries the schema.

        Iterator<DirectRowSet> iter = client.queryBuilder().sql(sql, PART_DIR).rowSetIterator();
        assertTrue(iter.hasNext());
        RowSet rowSet = iter.next();
        assertEquals(0, rowSet.rowCount());
        rowSet.clear();

        // Read the two batches.

        for (int j = 0; j < 2; j++) {
          assertTrue(iter.hasNext());
          rowSet = iter.next();

          // Figure out which record this is and test accordingly.

          RowSetReader reader = rowSet.reader();
          assertTrue(reader.next());
          String col1 = reader.scalar("a").getString();
          if (col1.equals("10")) {
            if (i == 0) {
              sawRootFirst = true;
            }
            RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
                .addRow("10", "foo", "bar", null)
                .build();
            RowSetUtilities.verify(expected, rowSet);
          } else {
            if (i == 0) {
              sawNestedFirst = true;
            }
            RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
                .addRow("20", "fred", "wilma", NESTED_DIR)
                .build();
            RowSetUtilities.verify(expected, rowSet);
          }
        }
        assertFalse(iter.hasNext());
        if (sawRootFirst &&
            sawNestedFirst) {
          // The following should appear most of the time.
          System.out.println("Both variations occurred");
          return;
        }
      }

      // If you see this, maybe something got fixed. Or, maybe the
      // min parallelization hack above stopped working.
      // Or, you were just unlucky and can try the test again.
      // We print messages, rather than using assertTrue, to avoid
      // introducing a flaky test.

      System.out.println("Some variations did not occur");
      System.out.println(String.format("Outer first: %s", sawRootFirst));
      System.out.println(String.format("Nested first: %s", sawNestedFirst));
    } finally {
      resetV3();
      client.resetSession(ExecConstants.MIN_READER_WIDTH_KEY);
    }
  }
}