/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.drill.exec.physical.impl;

import org.apache.drill.shaded.guava.com.google.common.collect.ArrayListMultimap;
import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableSet;
import org.apache.drill.shaded.guava.com.google.common.collect.Lists;

import org.apache.commons.lang3.StringUtils;
import org.apache.drill.PlanTestBase;
import org.apache.drill.test.TestBuilder;
import org.apache.drill.exec.physical.base.Exchange;
import org.apache.drill.exec.physical.config.UnorderedDeMuxExchange;
import org.apache.drill.exec.physical.config.HashToRandomExchange;
import org.apache.drill.exec.planner.PhysicalPlanReader;
import org.apache.drill.exec.planner.fragment.Fragment;
import org.apache.drill.exec.planner.fragment.Fragment.ExchangeFragmentPair;
import org.apache.drill.exec.planner.fragment.PlanningSet;
import org.apache.drill.exec.planner.fragment.SimpleParallelizer;
import org.apache.drill.exec.pop.PopUnitTestBase;
import org.apache.drill.exec.proto.BitControl.PlanFragment;
import org.apache.drill.exec.proto.BitControl.QueryContextInformation;
import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
import org.apache.drill.exec.proto.UserBitShared;
import org.apache.drill.exec.proto.UserBitShared.QueryId;
import org.apache.drill.exec.rpc.user.UserSession;
import org.apache.drill.exec.server.DrillbitContext;
import org.apache.drill.exec.server.options.OptionList;
import org.apache.drill.exec.util.Utilities;
import org.apache.drill.exec.work.QueryWorkUnit;
import org.json.simple.JSONArray;
import org.json.simple.JSONObject;
import org.json.simple.parser.JSONParser;
import org.junit.BeforeClass;
import org.junit.Test;

import java.io.File;
import java.io.PrintWriter;
import java.nio.file.Paths;
import java.util.List;

import static org.apache.drill.exec.planner.physical.HashPrelUtil.HASH_EXPR_NAME;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;

/**
 * This test starts a Drill cluster with CLUSTER_SIZE nodes and generates data for test tables.
 *
 * Test queries involve a HashToRandomExchange (group by and join) and verify the following:
 *   1. The plan has mux and demux exchanges inserted where expected.
 *   2. Running the query produces the expected output records.
 *   3. Parallelizing the plan from (1) with SimpleParallelizer yields PlanFragments in which the
 *      number of partition senders in a major fragment is not more than the number of Drillbit
 *      nodes in the cluster, and there exists at most one partition sender per Drillbit.
 */
public class TestLocalExchange extends PlanTestBase {
  private static final int CLUSTER_SIZE = 3;
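  // The quoted exchange names below are counted as literal matches in the raw JSON plan text,
  // while the *_CONST forms are compared against "pop" values parsed from the plan.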
  private static final String MUX_EXCHANGE = "\"unordered-mux-exchange\"";
  private static final String DEMUX_EXCHANGE = "\"unordered-demux-exchange\"";
  private static final String MUX_EXCHANGE_CONST = "unordered-mux-exchange";
  private static final String DEMUX_EXCHANGE_CONST = "unordered-demux-exchange";
  private static final String HASH_EXCHANGE = "hash-to-random-exchange";
  private static final String EMPT_TABLE = "empTable";
  private static final String DEPT_TABLE = "deptTable";
  private static final UserSession USER_SESSION = UserSession.Builder.newBuilder()
      .withCredentials(UserBitShared.UserCredentials.newBuilder().setUserName("foo").build())
      .build();

  private static final SimpleParallelizer PARALLELIZER = new SimpleParallelizer(
      1 /*parallelizationThreshold (slice_count)*/,
      6 /*maxWidthPerNode*/,
      1000 /*maxGlobalWidth*/,
      1.2 /*affinityFactor*/);

  private static final int NUM_DEPTS = 40;
  private static final int NUM_EMPLOYEES = 1000;
  private static final int NUM_MNGRS = 1;
  private static final int NUM_IDS = 1;

  private static String groupByQuery;
  private static String joinQuery;

  private static String[] joinQueryBaselineColumns;
  private static String[] groupByQueryBaselineColumns;

  private static List<Object[]> groupByQueryBaselineValues;
  private static List<Object[]> joinQueryBaselineValues;

  @BeforeClass
  public static void setupClusterSize() {
    updateTestCluster(CLUSTER_SIZE, null);
  }

  /**
   * Generate data for two tables. Each table consists of several JSON files.
   */
  @BeforeClass
  public static void generateTestDataAndQueries() throws Exception {
    // Table 1 consists of five columns: "emp_id", "emp_name", "dept_id", "mng_id" and "some_id"
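    // Each generated record looks like this (e.g. for recordIndex 7, given NUM_DEPTS=40,
    // NUM_MNGRS=1 and NUM_IDS=1):
    //   { "emp_id" : 7, "emp_name" : "Employee 7", "dept_id" : 7, "mng_id" : 0, "some_id" : 0 }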
    final File empTableLocation = dirTestWatcher.makeRootSubDir(Paths.get(EMPT_TABLE));

    // Write 100 records for each new file
    final int empNumRecsPerFile = 100;
    for(int fileIndex = 0; fileIndex < NUM_EMPLOYEES/empNumRecsPerFile; fileIndex++) {
      File file = new File(empTableLocation, fileIndex + ".json");
      // try-with-resources so the writer is closed even if writing fails
      try (PrintWriter printWriter = new PrintWriter(file)) {
        for (int recordIndex = fileIndex*empNumRecsPerFile; recordIndex < (fileIndex+1)*empNumRecsPerFile; recordIndex++) {
          String record = String.format("{ \"emp_id\" : %d, \"emp_name\" : \"Employee %d\", \"dept_id\" : %d, \"mng_id\" : %d, \"some_id\" : %d }",
              recordIndex, recordIndex, recordIndex % NUM_DEPTS, recordIndex % NUM_MNGRS, recordIndex % NUM_IDS);
          printWriter.println(record);
        }
      }
    }

    // Table 2 consists of two columns "dept_id" and "dept_name"
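    // Each generated record looks like: { "dept_id" : 3, "dept_name" : "Department 3" }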
    final File deptTableLocation = dirTestWatcher.makeRootSubDir(Paths.get(DEPT_TABLE));

    // Write 4 records for each new file
    final int deptNumRecsPerFile = 4;
    for(int fileIndex = 0; fileIndex < NUM_DEPTS/deptNumRecsPerFile; fileIndex++) {
      File file = new File(deptTableLocation, fileIndex + ".json");
      // try-with-resources so the writer is closed even if writing fails
      try (PrintWriter printWriter = new PrintWriter(file)) {
        for (int recordIndex = fileIndex*deptNumRecsPerFile; recordIndex < (fileIndex+1)*deptNumRecsPerFile; recordIndex++) {
          String record = String.format("{ \"dept_id\" : %d, \"dept_name\" : \"Department %d\" }",
              recordIndex, recordIndex);
          printWriter.println(record);
        }
      }
    }

    // Initialize test queries
    groupByQuery = String.format("SELECT dept_id, count(*) as numEmployees FROM dfs.`%s` GROUP BY dept_id", EMPT_TABLE);
    joinQuery = String.format("SELECT e.emp_name, d.dept_name FROM dfs.`%s` e JOIN dfs.`%s` d ON e.dept_id = d.dept_id",
        EMPT_TABLE, DEPT_TABLE);

    // Generate and store the expected output for the test queries. Used when verifying the output
    // of queries run using different configurations.

    groupByQueryBaselineColumns = new String[] { "dept_id", "numEmployees" };

    groupByQueryBaselineValues = Lists.newArrayList();
    // dept_id is generated by the expression 'recordIndex % NUM_DEPTS' above. 'recordIndex' runs
    // from 0 to NUM_EMPLOYEES - 1, so we expect each dept_id to occur NUM_EMPLOYEES/NUM_DEPTS
    // times (1000/40 = 25)
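    // For example, dept_id 0 is produced by recordIndex 0, 40, 80, ..., 960: 25 occurrences in total.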
    final int numOccurrences = NUM_EMPLOYEES/NUM_DEPTS;
    for(int i = 0; i < NUM_DEPTS; i++) {
      groupByQueryBaselineValues.add(new Object[] { (long)i, (long)numOccurrences});
    }

    joinQueryBaselineColumns = new String[] { "emp_name", "dept_name" };

    joinQueryBaselineValues = Lists.newArrayList();
    for(int i = 0; i < NUM_EMPLOYEES; i++) {
      final String employee = String.format("Employee %d", i);
      final String dept = String.format("Department %d", i % NUM_DEPTS);
      joinQueryBaselineValues.add(new String[] { employee, dept });
    }
  }

  public static void setupHelper(boolean isMuxOn, boolean isDeMuxOn) throws Exception {
    // set the slice target to 1 so that we get more parallelization for testing
    test("ALTER SESSION SET `planner.slice_target`=1");
    // disable the broadcast join to produce plans with HashToRandomExchanges.
    test("ALTER SESSION SET `planner.enable_broadcast_join`=false");
    test("ALTER SESSION SET `planner.enable_mux_exchange`=" + isMuxOn);
    test("ALTER SESSION SET `planner.enable_demux_exchange`=" + isDeMuxOn);
  }

  @Test
  public void testGroupByMultiFields() throws Exception {
    // Test multifield hash generation

    test("ALTER SESSION SET `planner.slice_target`=1");
    test("ALTER SESSION SET `planner.enable_mux_exchange`=" + true);
    test("ALTER SESSION SET `planner.enable_demux_exchange`=" + false);

    final String groupByMultipleQuery = String.format("SELECT dept_id, mng_id, some_id, count(*) as numEmployees FROM dfs.`%s` e GROUP BY dept_id, mng_id, some_id", EMPT_TABLE);
    final String[] groupByMultipleQueryBaselineColumns = new String[] { "dept_id", "mng_id", "some_id", "numEmployees" };

    final int numOccurrences = NUM_EMPLOYEES/NUM_DEPTS;

    final String plan = getPlanInString("EXPLAIN PLAN FOR " + groupByMultipleQuery, JSON_FORMAT);

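    // With three group-by keys the hash expression should be built as three nested
    // hash32asdouble calls (one per key); the pattern passed below checks that nesting.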
    jsonExchangeOrderChecker(plan, false, 1, "hash32asdouble\\(.*, hash32asdouble\\(.*, hash32asdouble\\(.*\\) \\) \\) ");

    // Run the query and verify the output
    final TestBuilder testBuilder = testBuilder()
        .sqlQuery(groupByMultipleQuery)
        .unOrdered()
        .baselineColumns(groupByMultipleQueryBaselineColumns);

    for(int i = 0; i < NUM_DEPTS; i++) {
      testBuilder.baselineValues(new Object[] { (long)i, (long)0, (long)0, (long)numOccurrences});
    }

    testBuilder.go();
  }

  @Test
  public void testGroupBy_NoMux_NoDeMux() throws Exception {
    testGroupByHelper(false, false);
  }

  @Test
  public void testJoin_NoMux_NoDeMux() throws Exception {
    testJoinHelper(false, false);
  }

  @Test
  public void testGroupBy_Mux_NoDeMux() throws Exception {
    testGroupByHelper(true, false);
  }

  @Test
  public void testJoin_Mux_NoDeMux() throws Exception {
    testJoinHelper(true, false);
  }

  @Test
  public void testGroupBy_NoMux_DeMux() throws Exception {
    testGroupByHelper(false, true);
  }

  @Test
  public void testJoin_NoMux_DeMux() throws Exception {
    testJoinHelper(false, true);
  }

  @Test
  public void testGroupBy_Mux_DeMux() throws Exception {
    testGroupByHelper(true, true);
  }

  @Test
  public void testJoin_Mux_DeMux() throws Exception {
    testJoinHelper(true, true);
  }

  private static void testGroupByHelper(boolean isMuxOn, boolean isDeMuxOn) throws Exception {
    testHelper(isMuxOn, isDeMuxOn, groupByQuery,
        isMuxOn ? 1 : 0, isDeMuxOn ? 1 : 0,
        groupByQueryBaselineColumns, groupByQueryBaselineValues);
  }

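  // A join distributes both of its inputs on the join key, so the join plan is expected to contain
  // two mux/demux exchanges when they are enabled, versus one for the group-by query above.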
  public static void testJoinHelper(boolean isMuxOn, boolean isDeMuxOn) throws Exception {
    testHelper(isMuxOn, isDeMuxOn, joinQuery,
        isMuxOn ? 2 : 0, isDeMuxOn ? 2 : 0,
        joinQueryBaselineColumns, joinQueryBaselineValues);
  }

  private static void testHelper(boolean isMuxOn, boolean isDeMuxOn, String query,
      int expectedNumMuxes, int expectedNumDeMuxes, String[] baselineColumns, List<Object[]> baselineValues)
      throws Exception {
    setupHelper(isMuxOn, isDeMuxOn);

    String plan = getPlanInString("EXPLAIN PLAN FOR " + query, JSON_FORMAT);

    if ( isMuxOn ) {
      // The hash expression name should appear twice per mux exchange (once in the Project that
      // computes it and once in the HashToRandomExchange that references it) and once more per
      // demux exchange.
      assertEquals("Wrong number of HashExpr occurrences in the plan",
          2*expectedNumMuxes + expectedNumDeMuxes, StringUtils.countMatches(plan, HASH_EXPR_NAME));
      jsonExchangeOrderChecker(plan, isDeMuxOn, expectedNumMuxes, "hash32asdouble\\(.*\\) ");
    } else {
      assertEquals("HashExpr should not be present in the plan when mux is disabled",
          0, StringUtils.countMatches(plan, HASH_EXPR_NAME));
    }

    // Make sure the plan has mux and demux exchanges (TODO: testing is currently rudimentary;
    // move to more sophisticated testing once better plan-testing tools are available)
    assertEquals("Wrong number of MuxExchanges present in the plan",
        expectedNumMuxes, StringUtils.countMatches(plan, MUX_EXCHANGE));

    assertEquals("Wrong number of DeMuxExchanges present in the plan",
        expectedNumDeMuxes, StringUtils.countMatches(plan, DEMUX_EXCHANGE));

    // Run the query and verify the output
    TestBuilder testBuilder = testBuilder()
        .sqlQuery(query)
        .unOrdered()
        .baselineColumns(baselineColumns);

    for(Object[] baselineRecord : baselineValues) {
      testBuilder.baselineValues(baselineRecord);
    }

    testBuilder.go();

    testHelperVerifyPartitionSenderParallelization(plan, isMuxOn, isDeMuxOn);
  }

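  /**
   * Walks the "graph" array of the JSON plan and verifies that each mux stanza appears in the
   * expected order: a Project computing the hash expression, followed by an UnorderedMuxExchange,
   * followed by a HashToRandomExchange (plus an UnorderedDeMuxExchange when demux is enabled),
   * closed by a Project that drops the hash expression again. Also verifies that the hash
   * expression matches the given pattern and that expectedNumMuxes stanzas are found.
   */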
  private static void jsonExchangeOrderChecker(String plan, boolean isDemuxEnabled, int expectedNumMuxes, String hashExprPattern) throws Exception {
    final JSONObject planObj = (JSONObject) new JSONParser().parse(plan);
    assertNotNull("Corrupted query plan: null", planObj);
    final JSONArray graphArray = (JSONArray) planObj.get("graph");
    assertNotNull("No graph array present", graphArray);
    int i = 0;
    int k = 0;
    int prevExprsArraySize = 0;
    boolean foundExpr = false;
    int muxesCount = 0;
    for (Object object : graphArray) {
      final JSONObject popObj = (JSONObject) object;
      if ( popObj.containsKey("pop") && popObj.get("pop").equals("project")) {
        if ( popObj.containsKey("exprs")) {
          final JSONArray exprsArray = (JSONArray) popObj.get("exprs");
          for (Object exprObj : exprsArray) {
            final JSONObject expr = (JSONObject) exprObj;
            if ( expr.containsKey("ref") && expr.get("ref").equals("`"+ HASH_EXPR_NAME +"`")) {
              // found a match; verify below that the operators that follow are the expected ones
              final String hashField = (String) expr.get("expr");
              assertNotNull("HashExpr field can not be null", hashField);
              assertTrue("HashExpr field does not match pattern", hashField.matches(hashExprPattern));
              k = i;
              foundExpr = true;
              muxesCount++;
              // remember the size of this starting project's expression list; the closing project
              // of the stanza should have exactly one expression less (the hash expr is dropped)
              prevExprsArraySize = exprsArray.size();
              break;
            }
          }
        }
      }
      if ( !foundExpr ) {
        continue;
      }
      // next after project with hashexpr
      if ( k == i-1) {
        assertTrue("UnorderedMux should follow Project with HashExpr",
            popObj.containsKey("pop") && popObj.get("pop").equals(MUX_EXCHANGE_CONST));
      }
      if ( k == i-2) {
        assertTrue("HashToRandomExchange should follow UnorderedMux which should follow Project with HashExpr",
            popObj.containsKey("pop") && popObj.get("pop").equals(HASH_EXCHANGE));
        // check that HashToRandomExchange uses the hash expr
        assertTrue("HashToRandomExchange should use hashExpr",
            popObj.containsKey("expr") && popObj.get("expr").equals("`"+ HASH_EXPR_NAME +"`"));
      }
      // if demux is enabled, the UnorderedDeMuxExchange should follow and should also use the hash expr
      if ( isDemuxEnabled && k == i-3) {
        assertTrue("UnorderedDemuxExchange should follow HashToRandomExchange",
            popObj.containsKey("pop") && popObj.get("pop").equals(DEMUX_EXCHANGE_CONST));
        assertTrue("UnorderedDemuxExchange should use hashExpr",
            popObj.containsKey("expr") && popObj.get("expr").equals("`"+HASH_EXPR_NAME +"`"));
      }
      if ( (isDemuxEnabled && k == i-4) || (!isDemuxEnabled && k == i-3) ) {
        // it should be a project without the hash expr; check that its number of exprs is one less
        // than in the starting project
        assertTrue("Should be project without hashexpr", popObj.containsKey("pop") && popObj.get("pop").equals("project"));
        final JSONArray exprsArray = (JSONArray) popObj.get("exprs");
        assertNotNull("Project should have some fields", exprsArray);
        assertEquals("Number of fields in closing project should be one less than in starting project",
            prevExprsArraySize - 1, exprsArray.size());

        // Reset the counters and flags in case another stanza of these exchanges follows
        k = 0;
        foundExpr = false;
        prevExprsArraySize = 0;
      }
      i++;
    }
    assertEquals("Number of Project/Mux/HashExchange/... ", expectedNumMuxes, muxesCount);
  }

  /**
   * Verify that the number of partition senders in a major fragment is not more than the cluster
   * size and that each endpoint in the cluster has at most one fragment from a given major
   * fragment that has the partition sender.
   */
  private static void testHelperVerifyPartitionSenderParallelization(
      String plan, boolean isMuxOn, boolean isDeMuxOn) throws Exception {

    final DrillbitContext drillbitContext = getDrillbitContext();
    final PhysicalPlanReader planReader = drillbitContext.getPlanReader();
    final Fragment rootFragment = PopUnitTestBase.getRootFragmentFromPlanString(planReader, plan);

    final List<Integer> deMuxFragments = Lists.newLinkedList();
    final List<Integer> htrFragments = Lists.newLinkedList();
    final PlanningSet planningSet = new PlanningSet();

    // Create a planningSet to get the assignment of major fragment ids to fragments.
    PARALLELIZER.initFragmentWrappers(rootFragment, planningSet);

    findFragmentsWithPartitionSender(rootFragment, planningSet, deMuxFragments, htrFragments);

    final QueryContextInformation queryContextInfo = Utilities.createQueryContextInfo("dummySchemaName",
        "938ea2d9-7cb9-4baf-9414-a5a0b7777e8e");
    QueryWorkUnit qwu = PARALLELIZER.getFragments(new OptionList(), drillbitContext.getEndpoint(),
        QueryId.getDefaultInstance(),
        drillbitContext.getBits(), rootFragment, USER_SESSION, queryContextInfo);
    qwu.applyPlan(planReader);

    // Make sure the number of minor fragments with a HashPartitioner within a major fragment is
    // not more than the number of Drillbits in the cluster
    ArrayListMultimap<Integer, DrillbitEndpoint> partitionSenderMap = ArrayListMultimap.create();
    for(PlanFragment planFragment : qwu.getFragments()) {
      if (planFragment.getFragmentJson().contains("hash-partition-sender")) {
        int majorFragmentId = planFragment.getHandle().getMajorFragmentId();
        DrillbitEndpoint assignedEndpoint = planFragment.getAssignment();
        partitionSenderMap.get(majorFragmentId).add(assignedEndpoint);
      }
    }

    if (isMuxOn) {
      verifyAssignment(htrFragments, partitionSenderMap);
    }

    if (isDeMuxOn) {
      verifyAssignment(deMuxFragments, partitionSenderMap);
    }
  }

  /**
   * Helper method to find the major fragment ids of fragments that have a PartitionSender.
   * A fragment has a PartitionSender if the sending exchange of the fragment is a
   *   1. DeMux Exchange -> goes into deMuxFragments
   *   2. HashToRandomExchange -> goes into htrFragments
   */
  private static void findFragmentsWithPartitionSender(Fragment currentRootFragment, PlanningSet planningSet,
      List<Integer> deMuxFragments, List<Integer> htrFragments) {

    if (currentRootFragment != null) {
      final Exchange sendingExchange = currentRootFragment.getSendingExchange();
      if (sendingExchange != null) {
        final int majorFragmentId = planningSet.get(currentRootFragment).getMajorFragmentId();
        if (sendingExchange instanceof UnorderedDeMuxExchange) {
          deMuxFragments.add(majorFragmentId);
        } else if (sendingExchange instanceof HashToRandomExchange) {
          htrFragments.add(majorFragmentId);
        }
      }

      for(ExchangeFragmentPair e : currentRootFragment.getReceivingExchangePairs()) {
        findFragmentsWithPartitionSender(e.getNode(), planningSet, deMuxFragments, htrFragments);
      }
    }
  }

  /** Helper method to verify the endpoint assignments of the PartitionSenders in the given major fragments */
  private static void verifyAssignment(List<Integer> fragmentList,
      ArrayListMultimap<Integer, DrillbitEndpoint> partitionSenderMap) {

    // We expect at least one entry in the list
    assertTrue(fragmentList.size() > 0);

    for(Integer majorFragmentId : fragmentList) {
      // We expect the fragment that has DeMux/HashToRandom as its sending exchange to have a
      // parallelization of not more than the number of nodes in the cluster, and each node in
      // the cluster to have at most one assignment
      List<DrillbitEndpoint> assignments = partitionSenderMap.get(majorFragmentId);
      assertNotNull(assignments);
      assertTrue(assignments.size() > 0);
      assertTrue(String.format("Number of partition senders in major fragment [%d] is more than expected", majorFragmentId),
          CLUSTER_SIZE >= assignments.size());

      // Make sure there are no duplicates in the assigned endpoints (i.e. at most one partition sender per endpoint)
      assertTrue("Some endpoints have more than one fragment that has a PartitionSender",
          ImmutableSet.copyOf(assignments).size() == assignments.size());
    }
  }
}