aboutsummaryrefslogtreecommitdiff
path: root/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/LateralJoinBatch.java
blob: 04a459987a924fc7402c70a09610487729fd40e8 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
1001
1002
1003
1004
1005
1006
1007
1008
1009
1010
1011
1012
1013
1014
1015
1016
1017
1018
1019
1020
1021
1022
1023
1024
1025
1026
1027
1028
1029
1030
1031
1032
1033
1034
1035
1036
1037
1038
1039
1040
1041
1042
1043
1044
1045
1046
1047
1048
1049
1050
1051
1052
1053
1054
1055
1056
1057
1058
1059
1060
1061
1062
1063
1064
1065
1066
1067
1068
1069
1070
1071
1072
1073
1074
1075
1076
1077
1078
1079
1080
1081
1082
1083
1084
1085
1086
1087
1088
1089
1090
1091
1092
1093
1094
1095
1096
1097
1098
1099
1100
1101
1102
1103
1104
1105
1106
1107
1108
1109
1110
1111
1112
1113
1114
1115
1116
1117
1118
1119
1120
1121
1122
1123
1124
1125
1126
1127
1128
1129
1130
1131
1132
1133
1134
1135
1136
1137
1138
1139
1140
1141
1142
1143
1144
1145
1146
1147
1148
1149
1150
1151
1152
1153
1154
1155
1156
1157
1158
1159
1160
1161
1162
1163
1164
1165
1166
1167
1168
1169
1170
1171
1172
1173
1174
1175
1176
1177
1178
1179
1180
1181
1182
1183
1184
1185
1186
1187
1188
1189
1190
1191
1192
1193
1194
1195
1196
1197
1198
1199
1200
1201
1202
1203
1204
1205
1206
1207
1208
1209
1210
1211
1212
1213
1214
1215
1216
1217
1218
1219
1220
1221
1222
1223
1224
1225
1226
1227
1228
1229
1230
1231
1232
1233
1234
1235
1236
1237
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.drill.exec.physical.impl.join;

import org.apache.drill.shaded.guava.com.google.common.annotations.VisibleForTesting;
import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
import org.apache.calcite.rel.core.JoinRelType;
import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.common.types.TypeProtos;
import org.apache.drill.common.types.Types;
import org.apache.drill.exec.ExecConstants;
import org.apache.drill.exec.exception.OutOfMemoryException;
import org.apache.drill.exec.exception.SchemaChangeException;
import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.physical.base.LateralContract;
import org.apache.drill.exec.physical.base.PhysicalOperator;
import org.apache.drill.exec.physical.config.LateralJoinPOP;
import org.apache.drill.exec.record.AbstractBinaryRecordBatch;
import org.apache.drill.exec.record.BatchSchema;
import org.apache.drill.exec.record.JoinBatchMemoryManager;
import org.apache.drill.exec.record.MaterializedField;
import org.apache.drill.exec.record.RecordBatch;
import org.apache.drill.exec.record.RecordBatchSizer;
import org.apache.drill.exec.record.SchemaBuilder;
import org.apache.drill.exec.record.VectorAccessibleUtilities;
import org.apache.drill.exec.record.VectorWrapper;
import org.apache.drill.exec.util.record.RecordBatchStats;
import org.apache.drill.exec.util.record.RecordBatchStats.RecordBatchIOType;
import org.apache.drill.exec.vector.IntVector;
import org.apache.drill.exec.vector.ValueVector;

import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;

import static org.apache.drill.exec.record.RecordBatch.IterOutcome.EMIT;
import static org.apache.drill.exec.record.RecordBatch.IterOutcome.NONE;
import static org.apache.drill.exec.record.RecordBatch.IterOutcome.OK;
import static org.apache.drill.exec.record.RecordBatch.IterOutcome.OK_NEW_SCHEMA;
import static org.apache.drill.exec.record.RecordBatch.IterOutcome.OUT_OF_MEMORY;
import static org.apache.drill.exec.record.RecordBatch.IterOutcome.STOP;

/**
 * RecordBatch implementation for the lateral join operator. Currently it's expected LATERAL to co-exist with UNNEST
 * operator. Both LATERAL and UNNEST will share a contract with each other defined at {@link LateralContract}
 */
public class LateralJoinBatch extends AbstractBinaryRecordBatch<LateralJoinPOP> implements LateralContract {
  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(LateralJoinBatch.class);

  // Maximum number records in the outgoing batch
  private int maxOutputRowCount;

  // Schema on the left side
  private BatchSchema leftSchema;

  // Schema on the right side
  private BatchSchema rightSchema;

  // Index in output batch to populate next row
  private int outputIndex;

  // Current index of record in left incoming which is being processed
  private int leftJoinIndex = -1;

  // Current index of record in right incoming which is being processed
  private int rightJoinIndex = -1;

  // flag to keep track if current left batch needs to be processed in future next call
  private boolean processLeftBatchInFuture;

  // Keep track if any matching right record was found for current left index record
  private boolean matchedRecordFound;

  // Used only for testing
  private boolean useMemoryManager = true;

  // Flag to keep track of new left batch so that update on memory manager is called only once per left batch
  private boolean isNewLeftBatch = false;

  // Field names excluded from batch-size accounting in the memory manager (populated from popConfig)
  private final HashSet<String> excludedFieldNames = new HashSet<>();

  // Name of the implicit rowId column shared between LATERAL and UNNEST (from popConfig.getImplicitRIDColumn())
  private final String implicitColumn;

  // Set when the previous output batch filled up while unmatched left rows were still pending (LEFT join only)
  private boolean hasRemainderForLeftJoin = false;

  // NOTE(review): presumably holds the implicit rowId vector of the right incoming batch — confirm against
  // crossJoinAndOutputRecords (not visible in this chunk)
  private ValueVector implicitVector;

  // Map to cache reference of input and corresponding output vectors for left and right batches
  private final Map<ValueVector, ValueVector> leftInputOutputVector = new HashMap<>();

  private final Map<ValueVector, ValueVector> rightInputOutputVector = new HashMap<>();
  /* ****************************************************************************************************************
   * Public Methods
   * ****************************************************************************************************************/
  /**
   * Creates a LATERAL join batch over the given left and right incoming batches.
   * Sets up the implicit rowId column name, the set of excluded fields, and the join
   * batch memory manager sized from the configured output batch size option.
   *
   * @param popConfig physical operator configuration for this lateral join
   * @param context fragment context supplying options and allocator
   * @param left left (outer) incoming batch
   * @param right right (UNNEST side) incoming batch
   * @throws OutOfMemoryException if operator memory cannot be allocated
   */
  public LateralJoinBatch(LateralJoinPOP popConfig, FragmentContext context,
                          RecordBatch left, RecordBatch right) throws OutOfMemoryException {
    super(popConfig, context, left, right);
    Preconditions.checkNotNull(left);
    Preconditions.checkNotNull(right);
    final int configOutputBatchSize = (int) context.getOptions().getOption(ExecConstants.OUTPUT_BATCH_SIZE_VALIDATOR);
    implicitColumn = popConfig.getImplicitRIDColumn();

    // Excluded fields must be known before the memory manager is built since it skips them when sizing rows
    populateExcludedField(popConfig);
    batchMemoryManager = new JoinBatchMemoryManager(configOutputBatchSize, left, right, excludedFieldNames);

    // Initially it's set to default value of 64K and later for each new output row it will be set to the computed
    // row count
    maxOutputRowCount = batchMemoryManager.getOutputRowCount();
  }

  /**
   * Emits the left-side rows that were left unprocessed when a previous output batch filled up after the
   * right side was exhausted. Only meaningful for LEFT join, where such rows must still be produced
   * (with nulls on the right).
   *
   * @return true when every row of the current left batch has been copied to the output container,
   *         false when the output batch filled up again with left rows still pending
   */
  private boolean handleRemainingLeftRows() {
    Preconditions.checkState(popConfig.getJoinType() == JoinRelType.LEFT,
      "Unexpected leftover rows from previous left batch when join type is not left join");

    // Copy one left row at a time until either the left batch or the output space runs out
    for (; leftJoinIndex < left.getRecordCount() && !isOutgoingBatchFull(); ++leftJoinIndex, ++outputIndex) {
      emitLeft(leftJoinIndex, outputIndex, 1);
    }

    return leftJoinIndex >= left.getRecordCount();
  }

  /**
   * Method that gets left and right incoming batches and produces the output batch. If the left incoming batch is
   * empty then next() on the right branch is not called and an empty batch with the correct outcome is returned.
   * If a non-empty left incoming batch is received then it calls next() on the right branch to get an incoming
   * batch and finally produces output.
   * @return IterOutcome state of the lateral join batch
   */
  @Override
  public IterOutcome innerNext() {

    if (hasRemainderForLeftJoin) { // if set that means there is spill over from previous left batch and no
      // corresponding right rows and it is left join scenario
      allocateVectors();

      boolean hasMoreRows = !handleRemainingLeftRows();
      if (leftUpstream == EMIT || hasMoreRows) {
        logger.debug("Sending current output batch with EMIT outcome since left is received with EMIT and is fully " +
          "consumed now in output batch");
        hasRemainderForLeftJoin = hasMoreRows;
        finalizeOutputContainer();
        return (leftUpstream == EMIT) ? EMIT : OK;
      } else {
        // release memory for previous left batch
        leftJoinIndex = -1;
        VectorAccessibleUtilities.clear(left);
      }
    }

    // We don't do anything special on FIRST state. Process left batch first and then right batch if need be
    IterOutcome childOutcome = processLeftBatch();
    logger.debug("Received left batch with outcome {}", childOutcome);

    // A new left batch arrived while leftover LEFT-join rows were just flushed above: return the filled
    // output now and deal with the buffered left batch on the next call
    if (processLeftBatchInFuture && hasRemainderForLeftJoin) {
      finalizeOutputContainer();
      hasRemainderForLeftJoin = false;
      return OK;
    }

    // reset this state after calling processLeftBatch above.
    processLeftBatchInFuture = false;
    hasRemainderForLeftJoin = false;

    // If the left batch doesn't have any record in the incoming batch (with OK_NEW_SCHEMA/EMIT) or the state returned
    // from left side is terminal state then just return the IterOutcome and don't call next() on right branch
    if (isTerminalOutcome(childOutcome) || left.getRecordCount() == 0) {
      container.setRecordCount(0);
      return childOutcome;
    }

    // Left side has some records in the batch so let's process right batch
    childOutcome = processRightBatch();
    logger.debug("Received right batch with outcome {}", childOutcome);

    // reset the left & right outcomes to OK here and send the empty batch downstream. Non-Empty right batch with
    // OK_NEW_SCHEMA will be handled in subsequent next call
    if (childOutcome == OK_NEW_SCHEMA) {
      leftUpstream = (leftUpstream != EMIT) ? OK : leftUpstream;
      rightUpstream = OK;
      return childOutcome;
    }

    if (isTerminalOutcome(childOutcome)) {
      return childOutcome;
    }

    // If OK_NEW_SCHEMA is seen only on non empty left batch but not on right batch, then we should setup schema in
    // output container based on new left schema and old right schema. If schema change failed then return STOP
    // downstream
    if (leftUpstream == OK_NEW_SCHEMA && !handleSchemaChange()) {
      return STOP;
    }

    // Setup the references of left, right and outgoing container in generated operator
    state = BatchState.NOT_FIRST;

    // Update the memory manager only if its a brand new incoming i.e. leftJoinIndex and rightJoinIndex is 0
    // Otherwise there will be a case where while filling last output batch, some records from previous left or
    // right batch are still left to be sent in output for which we will count this batch twice. The actual checks
    // are done in updateMemoryManager
    updateMemoryManager(LEFT_INDEX);

    // We have to call update on memory manager for empty batches (rightJoinIndex = -1) as well since other wise while
    // allocating memory for vectors below it can fail. Since in that case colSize will not have any info on right side
    // vectors and throws NPE. The actual checks are done in updateMemoryManager
    updateMemoryManager(RIGHT_INDEX);

    if (outputIndex > 0) {
      // this means batch is already allocated but because of new incoming the width and output row count might have
      // changed. So update the maxOutputRowCount with new value
      if (useMemoryManager) {
        setMaxOutputRowCount(batchMemoryManager.getCurrentOutgoingMaxRowCount());
      }
    }
    // if output is not allocated then maxRowCount will be set correctly below
    // allocate space for the outgoing batch
    allocateVectors();

    return produceOutputBatch();
  }

  /**
   * Logs aggregate batch statistics for both inputs and the output, then delegates to the parent close.
   */
  @Override
  public void close() {
    updateBatchMemoryManagerStats();

    logIncomingAggregateStats("left", JoinBatchMemoryManager.LEFT_INDEX);
    logIncomingAggregateStats("right", JoinBatchMemoryManager.RIGHT_INDEX);

    RecordBatchStats.logRecordBatchStats(getRecordBatchStatsContext(),
      "outgoing aggregate: batch count : %d, avg bytes : %d,  avg row bytes : %d, " +
      "record count : %d", batchMemoryManager.getNumOutgoingBatches(),
      batchMemoryManager.getAvgOutputBatchSize(),
      batchMemoryManager.getAvgOutputRowWidth(),
      batchMemoryManager.getTotalOutputRecords());

    super.close();
  }

  // Logs the aggregate incoming-batch statistics for one input side ("left" or "right")
  private void logIncomingAggregateStats(String side, int inputIndex) {
    RecordBatchStats.logRecordBatchStats(getRecordBatchStatsContext(),
      "incoming aggregate " + side + ": batch count : %d, avg bytes : %d,  avg row bytes : %d, " +
      "record count : %d", batchMemoryManager.getNumIncomingBatches(inputIndex),
      batchMemoryManager.getAvgInputBatchSize(inputIndex),
      batchMemoryManager.getAvgInputRowWidth(inputIndex),
      batchMemoryManager.getTotalInputRecords(inputIndex));
  }

  /**
   * Returns the number of records currently held in the output container.
   *
   * @return record count of the outgoing container
   */
  @Override
  public int getRecordCount() {
    return container.getRecordCount();
  }

  /**
   * Returns the left side incoming for the Lateral Join. Used by right branch leaf operator of Lateral
   * to process the records at leftJoinIndex.
   *
   * @return - RecordBatch received as left side incoming
   */
  @Override
  public RecordBatch getIncoming() {
    // Fixed message typo ("Retuning" -> "Returning") so a failed precondition reads correctly
    Preconditions.checkState(left != null,
      "Returning null left batch. It's unexpected since right side will only be called iff " +
        "there is any valid left batch");
    return left;
  }

  /**
   * Returns the row index in the current left incoming batch that the calling (right branch) operator
   * should process. LATERAL never returns -1 here since -1 indicates an empty left batch, and LATERAL
   * never calls next() on the right side with an empty left batch.
   *
   * @return index of the left row currently being joined
   */
  @Override
  public int getRecordIndex() {
    final int leftRecordCount = left.getRecordCount();
    Preconditions.checkState(leftJoinIndex < leftRecordCount,
      "Left join index: %s is out of bounds: %s", leftJoinIndex, leftRecordCount);
    return leftJoinIndex;
  }

  /**
   * Returns the current {@link org.apache.drill.exec.record.RecordBatch.IterOutcome} for the left incoming batch.
   *
   * @return last outcome received from the left upstream
   */
  @Override
  public IterOutcome getLeftOutcome() {
    return leftUpstream;
  }

  /* ****************************************************************************************************************
   * Protected Methods
   * ****************************************************************************************************************/

  /**
   * Fetches the first batch from the left and right inputs during the build-schema phase for
   * {@link LateralJoinBatch}. When the left batch arrives with a failure outcome, next() is never
   * invoked on the right branch since there is no left incoming to drive it.
   *
   * @return true when both batches arrived without a failure outcome,
   *         false when either side reported a failure
   */
  @Override
  protected boolean prefetchFirstBatchFromBothSides() {
    // The first left batch may carry zero or more records with an OK_NEW_SCHEMA outcome
    leftUpstream = next(0, left);
    boolean bothSidesValid = setBatchState(leftUpstream);

    // Only pull from the right side once the left side produced a usable batch
    if (bothSidesValid) {
      isNewLeftBatch = true;
      rightUpstream = next(1, right);
      bothSidesValid = setBatchState(rightUpstream);
    }

    // Neither input may produce EMIT as its very first outcome
    if (leftUpstream == EMIT || rightUpstream == EMIT) {
      state = BatchState.STOP;
      throw new IllegalStateException("Unexpected IterOutcome.EMIT received either from left or right side in " +
        "buildSchema phase");
    }
    return bothSidesValid;
  }

  /**
   * Prefetch a batch from left and right branch to know about the schema of each side. Then adds value vectors in
   * the output container based on those schemas. For this phase LATERAL always expects an empty batch from the
   * right side, which UNNEST should abide by.
   *
   * @throws SchemaChangeException if batch schema was changed during execution
   */
  @Override
  protected void buildSchema() throws SchemaChangeException {
    // Prefetch a RecordBatch from both left and right branch
    if (!prefetchFirstBatchFromBothSides()) {
      return;
    }
    Preconditions.checkState(right.getRecordCount() == 0,
      "Unexpected non-empty first right batch received");

    // Setup output container schema based on known left and right schema
    setupNewSchema();

    // Release the vectors received from right side
    VectorAccessibleUtilities.clear(right);

    // Set join index as invalid (-1) if the left side is empty, else set it to 0
    leftJoinIndex = (left.getRecordCount() <= 0) ? -1 : 0;
    rightJoinIndex = -1;

    // Reset the left side of the IterOutcome since for this call, OK_NEW_SCHEMA will be returned correctly
    // by buildSchema caller and we should treat the batch as received with OK outcome.
    leftUpstream = OK;
    rightUpstream = OK;
  }

  /**
   * Propagates kill to both incoming branches, left first. {@code leftUpstream} is flipped to STOP in between
   * so the right branch (UNNEST) can tell whether the kill originated downstream of LATERAL.
   *
   * @param sendUpstream whether the kill should be propagated further upstream
   */
  @Override
  protected void killIncoming(boolean sendUpstream) {
    this.left.kill(sendUpstream);
    // Reset the left side outcome as STOP since as part of right kill when UNNEST will ask IterOutcome of left incoming
    // from LATERAL and based on that it can make decision if the kill is coming from downstream to LATERAL or upstream
    // to LATERAL. Like LIMIT operator being present downstream to LATERAL or upstream to LATERAL and downstream to
    // UNNEST.
    leftUpstream = STOP;
    this.right.kill(sendUpstream);
  }

  /* ****************************************************************************************************************
   * Private Methods
   * ****************************************************************************************************************/

  /**
   * Rebuilds the output container schema from the current left and right incoming schemas. On failure the
   * query is failed via the executor state and both inputs are killed.
   *
   * @return true if the new schema was set up successfully, false if the schema change could not be handled
   */
  private boolean handleSchemaChange() {
    try {
      stats.startSetup();
      logger.debug("Setting up new schema based on incoming batch. Old output schema: {}", container.getSchema());
      setupNewSchema();
      return true;
    } catch (SchemaChangeException ex) {
      // Pass the exception to the logger so the cause and stack trace are not lost
      logger.error("Failed to handle schema change hence killing the query", ex);
      context.getExecutorState().fail(ex);
      left.kill(true); // Can have exchange receivers on left so called with true
      right.kill(false); // Won't have exchange receivers on right side
      return false;
    } finally {
      stats.stopSetup();
    }
  }

  /**
   * Reports whether the given outcome terminates processing for this operator.
   *
   * @param outcome outcome received from an upstream next() call
   * @return true for STOP, OUT_OF_MEMORY or NONE; false otherwise
   */
  private boolean isTerminalOutcome(IterOutcome outcome) {
    switch (outcome) {
      case STOP:
      case OUT_OF_MEMORY:
      case NONE:
        return true;
      default:
        return false;
    }
  }

  /**
   * Process left incoming batch with different {@link org.apache.drill.exec.record.RecordBatch.IterOutcome}. It is
   * called from main {@link LateralJoinBatch#innerNext()} block with each next() call from upstream operator. Also
   * when we populate the outgoing container then this method is called to get next left batch if current one is
   * fully processed. It calls next() on left side until we get a non-empty RecordBatch. OR we get either of
   * OK_NEW_SCHEMA/EMIT/NONE/STOP/OOM/NOT_YET outcome.
   * @return IterOutcome after processing current left batch
   */
  private IterOutcome processLeftBatch() {

    boolean needLeftBatch = leftJoinIndex == -1;

    // If left batch is empty
    while (needLeftBatch) {

      if (!processLeftBatchInFuture) {
        leftUpstream = next(LEFT_INDEX, left);
        isNewLeftBatch = true;
      }

      final boolean emptyLeftBatch = left.getRecordCount() <=0;
      logger.trace("Received a left batch and isEmpty: {}", emptyLeftBatch);

      switch (leftUpstream) {
        case OK_NEW_SCHEMA:
          // This OK_NEW_SCHEMA is received post build schema phase and from left side
          if (outputIndex > 0) { // can only reach here from produceOutputBatch
            // This means there is already some records from previous join inside left batch
            // So we need to pass that downstream and then handle the OK_NEW_SCHEMA in subsequent next call
            processLeftBatchInFuture = true;
            return OK_NEW_SCHEMA;
          }

          // If left batch is empty with actual schema change then just rebuild the output container and send empty
          // batch downstream
          if (emptyLeftBatch) {
            if (handleSchemaChange()) {
              leftJoinIndex = -1;
              return OK_NEW_SCHEMA;
            } else {
              return STOP;
            }
          } // else - setup the new schema information after getting it from right side too.
          // Deliberate fall-through: a non-empty OK_NEW_SCHEMA batch is consumed like an OK batch below
        case OK:
          // With OK outcome we will keep calling next until we get a batch with >0 records
          if (emptyLeftBatch) {
            leftJoinIndex = -1;
            continue;
          } else {
            leftJoinIndex = 0;
          }
          break;
        case EMIT:
          // don't call next on right batch
          if (emptyLeftBatch) {
            leftJoinIndex = -1;
            return EMIT;
          } else {
            leftJoinIndex = 0;
          }
          break;
        case OUT_OF_MEMORY:
        case NONE:
        case STOP:
          // Not using =0 since if outgoing container is empty then no point returning anything
          if (outputIndex > 0) { // can only reach here from produceOutputBatch
            processLeftBatchInFuture = true;
          }
          return leftUpstream;
        case NOT_YET:
          try {
            Thread.sleep(5);
          } catch (InterruptedException ex) {
            // Restore the interrupt status so the fragment executor can observe the interruption
            Thread.currentThread().interrupt();
            logger.debug("Thread interrupted while sleeping to call next on left branch of LATERAL since it " +
              "received NOT_YET");
          }
          break;
      }
      needLeftBatch = leftJoinIndex == -1;
    }
    return leftUpstream;
  }

  /**
   * Process right incoming batch with different {@link org.apache.drill.exec.record.RecordBatch.IterOutcome}. It is
   * called from main {@link LateralJoinBatch#innerNext()} block with each next() call from upstream operator and if
   * left batch has some records in it. Also when we populate the outgoing container then this method is called to
   * get next right batch if current one is fully processed.
   * @return IterOutcome after processing current right batch
   */
  private IterOutcome processRightBatch() {
    // Check if we still have records left to process in left incoming from new batch or previously half processed
    // batch based on indexes. We are making sure to update leftJoinIndex and rightJoinIndex correctly. Like for new
    // batch leftJoinIndex will always be set to zero and once leftSide batch is fully processed then it will be set
    // to -1.
    // Whereas rightJoinIndex is to keep track of record in right batch being joined with record in left batch.
    // So when there are cases such that all records in right batch is not consumed by the output, then rightJoinIndex
    // will be a valid index. When all records are consumed it will be set to -1.
    boolean needNewRightBatch = (leftJoinIndex >= 0) && (rightJoinIndex == -1);
    while (needNewRightBatch) {
      rightUpstream = next(RIGHT_INDEX, right);
      switch (rightUpstream) {
        case OK_NEW_SCHEMA:

          // If there is some records in the output batch that means left batch didn't came with OK_NEW_SCHEMA,
          // otherwise it would have been marked for processInFuture and output will be returned. This means for
          // current non processed left or new left non-empty batch there is unexpected right batch schema change
          if (outputIndex > 0) {
            throw new IllegalStateException("SchemaChange on right batch is not expected in between the rows of " +
              "current left batch or a new non-empty left batch with no schema change");
          }
          // We should not get OK_NEW_SCHEMA multiple times for the same left incoming batch. So there won't be a
          // case where we get OK_NEW_SCHEMA --> OK (with batch) ---> OK_NEW_SCHEMA --> OK/EMIT fall through
          //
          // Right batch with OK_NEW_SCHEMA can be non-empty so update the rightJoinIndex correctly and pass the
          // new schema downstream with empty batch and later with subsequent next() call the join output will be
          // produced
          if (handleSchemaChange()) {
            container.setRecordCount(0);
            rightJoinIndex = (right.getRecordCount() > 0) ? 0 : -1;
            return OK_NEW_SCHEMA;
          } else {
            return STOP;
          }
        case OK:
        case EMIT:
          // Even if there are no records we should not call next() again because in case of LEFT join empty batch is
          // of importance too
          rightJoinIndex = (right.getRecordCount() > 0) ? 0 : -1;
          needNewRightBatch = false;
          break;
        case OUT_OF_MEMORY:
        case NONE:
        case STOP:
          needNewRightBatch = false;
          break;
        case NOT_YET:
          try {
            Thread.sleep(10);
          } catch (InterruptedException ex) {
            // Restore the interrupt status so the fragment executor can observe the interruption
            Thread.currentThread().interrupt();
            // Fixed copy-paste in the message: this loop polls the right branch, not the left
            logger.debug("Thread interrupted while sleeping to call next on right branch of LATERAL since it " +
              "received NOT_YET");
          }
          break;
      }
    }
    return rightUpstream;
  }

  /**
   * Gets the current left and right incoming batches and does the cross join to fill the output batch. If all the
   * records in either or both of the batches are consumed then it gets the next batch from that branch, depending on
   * whether the output batch still has some space left. If the output batch is full then the output is finalized to
   * be sent downstream. Subsequent calls know how to consume previously half-consumed (if any) batches and produce
   * the output using them.
   *
   * @return - IterOutcome to be sent along with output batch to downstream operator
   */
  private IterOutcome produceOutputBatch() {

    boolean isLeftProcessed = false;

    // Try to fully pack the outgoing container
    while (!isOutgoingBatchFull()) {
      // perform the cross join between records in left and right batch and populate the output container
      crossJoinAndOutputRecords();

      // rightJoinIndex should move by number of records in output batch for current right batch only. For cases when
      // right batch is fully consumed rightJoinIndex will be equal to record count. For cases when only part of it is
      // consumed in current output batch rightJoinIndex will point to next row to be consumed
      final boolean isRightProcessed = rightJoinIndex == -1 || rightJoinIndex >= right.getRecordCount();

      // Check if above join to produce output resulted in fully consuming right side batch
      if (isRightProcessed) {
        // Release vectors of right batch. This will happen for both rightUpstream = EMIT/OK
        VectorAccessibleUtilities.clear(right);
        rightJoinIndex = -1;
      }

      // Check if all rows in right batch is processed and there was a match for last rowId and this is last
      // right batch for this left batch, then increment the leftJoinIndex. If this is not the last right batch we
      // cannot increase the leftJoinIndex even though a match is found because next right batch can contain more
      // records for the same implicit rowId
      if (isRightProcessed && rightUpstream == EMIT && matchedRecordFound) {
        ++leftJoinIndex;
        matchedRecordFound = false;
      }

      // left is only declared as processed if this is last right batch for current left batch and we have processed
      // all the rows in it.
      isLeftProcessed = (rightUpstream == EMIT) && leftJoinIndex >= left.getRecordCount();

      // Even though if left batch is not fully processed but we have received EMIT outcome from right side.
      // In this case if left batch has some unprocessed rows and it's left join emit left side for these rows.
      // If it's inner join then just set treat left batch as processed.
      if (!isLeftProcessed && rightUpstream == EMIT && isRightProcessed) {
        if (popConfig.getJoinType() == JoinRelType.LEFT) {
          // If outgoing batch got full that means we still have some leftJoinIndex to output but right side is done
          // producing the batches. So mark hasRemainderForLeftJoin=true and we will take care of it in future next call.
          isLeftProcessed = handleRemainingLeftRows();
          hasRemainderForLeftJoin = !isLeftProcessed;
        } else {
          // not left join hence ignore rows pending in left batch since right side is done producing the output
          isLeftProcessed = true;
        }
      }

      if (isLeftProcessed) {
        // Current left batch is fully consumed; release its vectors and reset the per-batch match state
        leftJoinIndex = -1;
        VectorAccessibleUtilities.clear(left);
        matchedRecordFound = false;
      }

      // Check if output batch still has some space
      if (!isOutgoingBatchFull()) {
        // Check if left side still has records or not
        if (isLeftProcessed) {
          // The current left batch was with EMIT/OK_NEW_SCHEMA outcome, then return output to downstream layer before
          // getting next batch
          if (leftUpstream == EMIT || leftUpstream == OK_NEW_SCHEMA) {
            break;
          } else {
            logger.debug("Output batch still has some space left, getting new batches from left and right. OutIndex: {}",
              outputIndex);
            // Get both left batch and the right batch and make sure indexes are properly set
            leftUpstream = processLeftBatch();

            logger.debug("Received left batch with outcome {}", leftUpstream);

            // output batch is not empty and we have new left batch with OK_NEW_SCHEMA or terminal outcome
            if (processLeftBatchInFuture) {
              logger.debug("Received left batch such that we have to return the current outgoing batch and process " +
                "the new batch in subsequent next call");
              // We should return the current output batch with OK outcome and don't reset the leftUpstream
              finalizeOutputContainer();
              return OK;
            }

            // If left batch received a terminal outcome then don't call right batch
            if (isTerminalOutcome(leftUpstream)) {
              finalizeOutputContainer();
              return leftUpstream;
            }

            // If we have received the left batch with EMIT outcome and is empty then we should return previous output
            // batch with EMIT outcome
            if ((leftUpstream == EMIT || leftUpstream == OK_NEW_SCHEMA) && left.getRecordCount() == 0) {
              isLeftProcessed = true;
              break;
            }

            // Update the batch memory manager to use new left incoming batch
            updateMemoryManager(LEFT_INDEX);
          }
        }

        // If we are here it means one of the below:
        // 1) Either previous left batch was not fully processed and it came with OK outcome. There is still some space
        // left in outgoing batch so let's get next right batch.
        // 2) OR previous left & right batch was fully processed and it came with OK outcome. There is space in outgoing
        // batch. Now we have got new left batch with OK outcome. Let's get next right batch
        // 3) OR previous left & right batch was fully processed and left came with OK outcome. Outgoing batch is
        // empty since all right batches were empty for all left rows. Now we got another non-empty left batch with
        // OK_NEW_SCHEMA.
        rightUpstream = processRightBatch();

        logger.debug("Received right batch with outcome {}", rightUpstream);

        if (rightUpstream == OK_NEW_SCHEMA) {
          // Return the current (pre-schema-change) batch with OK_NEW_SCHEMA; downgrade the stored outcomes to OK so
          // the new-schema batch is handled on the subsequent call
          leftUpstream = (leftUpstream != EMIT) ? OK : leftUpstream;
          rightUpstream = OK;
          finalizeOutputContainer();
          return OK_NEW_SCHEMA;
        }

        if (isTerminalOutcome(rightUpstream)) {
          finalizeOutputContainer();
          return rightUpstream;
        }

        // Update the batch memory manager to use new right incoming batch
        updateMemoryManager(RIGHT_INDEX);

        // If previous left batch is fully processed and it didn't produced any output rows and later we got a new
        // non-empty left batch with OK_NEW_SCHEMA with schema change only on left side vectors, then setup schema
        // in output container based on new left schema and old right schema. If schema change failed then return STOP
        // downstream
        if (leftUpstream == OK_NEW_SCHEMA && outputIndex == 0) {
          if (!handleSchemaChange()) {
            return STOP;
          }
          // Since schema has change so we have new empty vectors in output container hence allocateMemory for them
          allocateVectors();
        } else {
          // means we are using already allocated output batch so row count may have changed based on new incoming
          // batch hence update it
          if (useMemoryManager) {
            setMaxOutputRowCount(batchMemoryManager.getCurrentOutgoingMaxRowCount());
          }
        }
      }
    } // output batch is full to its max capacity

    finalizeOutputContainer();

    // Check if output batch was full and left was fully consumed or not. Since if left is not consumed entirely
    // but output batch is full, then if the left batch came with EMIT outcome we should send this output batch along
    // with OK outcome not with EMIT. Whereas if output is full and left is also fully consumed then we should send
    // EMIT outcome.
    if (leftUpstream == EMIT && isLeftProcessed) {
      logger.debug("Sending current output batch with EMIT outcome since left is received with EMIT and is fully " +
        "consumed in output batch");
      return EMIT;
    }

    if (leftUpstream == OK_NEW_SCHEMA) {
      // return output batch with OK_NEW_SCHEMA and reset the state to OK
      logger.debug("Sending current output batch with OK_NEW_SCHEMA and resetting the left outcome to OK for next set" +
        " of batches");
      leftUpstream = OK;
      return OK_NEW_SCHEMA;
    }
    return OK;
  }
  /**
   * Finalizes the current output container before it is sent downstream: stamps the produced record count on the
   * container and each of its vectors, records outgoing statistics in the batch memory manager, logs the batch
   * stats, and resets the output index so the next output batch starts from row zero.
   */
  private void finalizeOutputContainer() {
    final int producedRecords = outputIndex;

    // Stamp the row count on every vector and on the container itself
    VectorAccessibleUtilities.setValueCount(container, producedRecords);
    container.setRecordCount(producedRecords);

    batchMemoryManager.updateOutgoingStats(producedRecords);

    RecordBatchStats.logRecordBatchStats(RecordBatchIOType.OUTPUT, this, getRecordBatchStatsContext());
    RecordBatchStats.logRecordBatchStats(getRecordBatchStatsContext(),
      "Number of records emitted: %d and Allocator Stats: [AllocatedMem: %d, PeakMem: %d]",
        producedRecords, container.getAllocator().getAllocatedMemory(),
        container.getAllocator().getPeakMemoryAllocation());

    // Start the next output batch from row zero
    outputIndex = 0;
  }

  /**
   * Checks whether the schema changed between the provided new and old schema. Relies on
   * {@link BatchSchema#isEquivalent(BatchSchema)}.
   *
   * @param newSchema - new schema information
   * @param oldSchema - old schema information to compare against
   * @return - true if either schema is null or the two schemas are not equivalent;
   *           false if the schemas are the same
   */
  private boolean isSchemaChanged(BatchSchema newSchema, BatchSchema oldSchema) {
    if (newSchema == null || oldSchema == null) {
      // A missing schema on either side is treated as a change
      return true;
    }
    return !newSchema.isEquivalent(oldSchema);
  }

  /**
   * Validates that the input schema is non-null and does not carry any selection vector.
   *
   * @param schema - input schema to verify
   * @return - true: valid input schema
   *           false: invalid input schema
   */
  private boolean verifyInputSchema(BatchSchema schema) {
    if (schema == null) {
      logger.error("Null schema found for the incoming batch");
      return false;
    }

    final BatchSchema.SelectionVectorMode svMode = schema.getSelectionVectorMode();
    if (svMode != BatchSchema.SelectionVectorMode.NONE) {
      // Lateral join operates only on plain (non-SV) batches
      logger.error("Incoming batch schema found with selection vector which is not supported. SVMode: {}",
        svMode.toString());
      return false;
    }
    return true;
  }

  /**
   * Produces a copy of the given schema with all excluded columns removed. The implicit column is preserved when it
   * appears on the left side, to support the multilevel lateral case where the plan places the lower lateral on the
   * right side of the upper lateral.
   *
   * @param originSchema - schema of the incoming batch
   * @param isRightBatch - true when the schema belongs to the right child
   * @return - schema without the excluded columns
   */
  private BatchSchema batchSchemaWithNoExcludedCols(BatchSchema originSchema, boolean isRightBatch) {
    if (excludedFieldNames.size() == 0) {
      // Nothing to filter out
      return originSchema;
    }

    final SchemaBuilder filteredSchema =
      BatchSchema.newBuilder().setSelectionVectorMode(originSchema.getSelectionVectorMode());
    for (MaterializedField field : originSchema) {
      final String fieldName = field.getName();
      // Keep the left-side implicit column in the multilevel case (lower lateral on the right of upper lateral)
      final boolean keepLeftImplicitColumn = fieldName.equals(implicitColumn) && !isRightBatch;
      if (keepLeftImplicitColumn || !excludedFieldNames.contains(fieldName)) {
        filteredSchema.addField(field);
      }
    }
    return filteredSchema.build();
  }

  /**
   * Creates the outgoing container vectors based on the known left and right batch schemas. Excluded columns are
   * dropped (except the implicit column when it appears on the left side, for the multilevel lateral case),
   * right-side REQUIRED fields are rewritten as OPTIONAL for a LEFT join, and the input-to-output vector mappings
   * are rebuilt for both sides.
   *
   * @throws SchemaChangeException if either incoming schema is null or carries a selection vector
   */
  private void setupNewSchema() throws SchemaChangeException {

    logger.debug("Setting up new schema based on incoming batch. New left schema: {} and New right schema: {}",
      left.getSchema(), right.getSchema());

    // Clear up the container and the stale input->output vector mappings from any previous schema
    container.clear();
    leftInputOutputVector.clear();
    rightInputOutputVector.clear();

    leftSchema = batchSchemaWithNoExcludedCols(left.getSchema(), false);
    rightSchema = batchSchemaWithNoExcludedCols(right.getSchema(), true);

    if (!verifyInputSchema(leftSchema)) {
      throw new SchemaChangeException("Invalid Schema found for left incoming batch");
    }

    if (!verifyInputSchema(rightSchema)) {
      throw new SchemaChangeException("Invalid Schema found for right incoming batch");
    }

    // Setup LeftSchema in outgoing container and also include implicit column if present in left side for multilevel
    // case if plan is generated such that lower lateral is right child of upper lateral
    for (final VectorWrapper<?> vectorWrapper : left) {
      final MaterializedField leftField = vectorWrapper.getField();
      if (excludedFieldNames.contains(leftField.getName()) && !(leftField.getName().equals(implicitColumn))) {
        continue;
      }
      container.addOrGet(leftField);
    }

    // Setup RightSchema in the outgoing container
    for (final VectorWrapper<?> vectorWrapper : right) {
      MaterializedField rightField = vectorWrapper.getField();
      if (excludedFieldNames.contains(rightField.getName())) {
        // The implicit (rowId) column is excluded from output but its vector is kept for the join itself
        if (rightField.getName().equals(implicitColumn)) {
          implicitVector = vectorWrapper.getValueVector();
        }
        continue;
      }

      TypeProtos.MajorType rightFieldType = vectorWrapper.getField().getType();

      // make right input schema optional if we have LEFT join
      if (popConfig.getJoinType() == JoinRelType.LEFT &&
        rightFieldType.getMode() == TypeProtos.DataMode.REQUIRED) {
        final TypeProtos.MajorType outputType =
          Types.overrideMode(rightField.getType(), TypeProtos.DataMode.OPTIONAL);

        // Create the right field with optional type. This will also take care of creating
        // children fields in case of ValueVectors of map type
        rightField = rightField.withType(outputType);
      }
      container.addOrGet(rightField);
    }

    // The right batch must always carry the implicit rowId column produced by Unnest
    Preconditions.checkState(implicitVector != null,
      "Implicit column vector %s not found in right incoming batch", implicitColumn);

    // Let's build schema for the container
    outputIndex = 0;
    container.setRecordCount(outputIndex);
    container.buildSchema(BatchSchema.SelectionVectorMode.NONE);

    // Setup left vectors
    setupInputOutputVectors(left, 0, leftSchema.getFieldCount(), 0, false);

    // Setup right vectors; right output vectors start after all left output vectors, hence the base index offset
    setupInputOutputVectors(right, 0, rightSchema.getFieldCount(),
      leftSchema.getFieldCount(), true);

    logger.debug("Output Schema created {} based on input left schema {} and right schema {}", container.getSchema(),
      leftSchema, rightSchema);
  }

  /**
   * Allocates space for all the vectors in the output container, sized to {@code maxOutputRowCount}. Allocation is
   * skipped when the container already holds records, since memory was already allocated for those vectors.
   */
  private void allocateVectors() {
    // This check is here and will be true only in case of left join where the pending rows from previous left batch is
    // copied to the new output batch. Then same output batch is used to fill remaining memory using new left & right
    // batches.
    if (outputIndex > 0) {
      logger.trace("Allocation is already done for output container vectors since it already holds some record");
      return;
    }

    // Set this as max output rows to be filled in output batch since memory for that many rows are allocated
    if (useMemoryManager) {
      setMaxOutputRowCount(batchMemoryManager.getOutputRowCount());
    }

    // Use the wildcard-parameterized wrapper type (matching the rest of this class) instead of the raw type
    for (VectorWrapper<?> w : container) {
      final RecordBatchSizer.ColumnSize colSize = batchMemoryManager.getColumnSize(w.getField().getName());
      colSize.allocateVector(w.getValueVector(), maxOutputRowCount);
    }

    logger.debug("Allocator Stats: [AllocatedMem: {}, PeakMem: {}]", container.getAllocator().getAllocatedMemory(),
      container.getAllocator().getPeakMemoryAllocation());
  }

  /**
   * Maps a terminal-ish incoming {@link IterOutcome} onto the operator's {@link BatchState}.
   *
   * @param outcome - outcome received from an incoming batch
   * @return - false when the outcome forced a state transition (STOP / OUT_OF_MEMORY / DONE), true otherwise
   */
  private boolean setBatchState(IterOutcome outcome) {
    boolean continueProcessing = true;
    switch (outcome) {
      case STOP:
      case EMIT:
        state = BatchState.STOP;
        continueProcessing = false;
        break;
      case OUT_OF_MEMORY:
        state = BatchState.OUT_OF_MEMORY;
        continueProcessing = false;
        break;
      case NONE:
      case NOT_YET:
        state = BatchState.DONE;
        continueProcessing = false;
        break;
      default:
        // All other outcomes leave the state untouched
        break;
    }
    return continueProcessing;
  }

  /**
   * Builds a map from rowId to the number of rows carrying that rowId in the right incoming batch, starting at the
   * current rightJoinIndex. UnnestRecordBatch is expected to add an implicit IntVector column to each output row;
   * all array records belonging to the same left row share the same rowId in the Unnest output batch.
   *
   * @return - map of rowId to rowCount in right batch
   */
  private Map<Integer, Integer> getRowIdToRowCountMapping() {
    final Map<Integer, Integer> rowIdToCount = new HashMap<>();
    final IntVector.Accessor rowIds = ((IntVector) implicitVector).getAccessor();
    final int recordCount = right.getRecordCount();

    // Rows with equal rowId are contiguous, so a single run-length pass suffices
    int runRowId = rowIds.get(rightJoinIndex);
    int runLength = 1;
    for (int idx = rightJoinIndex + 1; idx < recordCount; ++idx) {
      final int rowId = rowIds.get(idx);
      if (rowId == runRowId) {
        ++runLength;
      } else {
        rowIdToCount.put(runRowId, runLength);
        runRowId = rowId;
        runLength = 1;
      }
    }
    // Flush the final run
    rowIdToCount.put(runRowId, runLength);
    return rowIdToCount;
  }

  /**
   * Main entry point for producing the output records. This method populates the output batch after a cross join of
   * the record in a given left batch at left index and all the corresponding rows in right batches produced by Unnest
   * for the current left batch. For each call to this function, the number of records copied into the output batch is
   * limited by the maximum rows the output batch can hold or the number of rows in the right incoming batch.
   */
  private void crossJoinAndOutputRecords() {
    final int rightRecordCount = right.getRecordCount();

    // If there is no record in right batch just return current index in output batch
    if (rightRecordCount <= 0) {
      return;
    }

    // Check if right batch is empty since we have to handle left join case
    Preconditions.checkState(rightJoinIndex != -1, "Right batch record count is >0 but index is -1");

    int currentOutIndex = outputIndex;
    // Number of rows that can be copied in output batch
    int maxAvailableRowSlot = maxOutputRowCount - currentOutIndex;

    if (logger.isDebugEnabled()) {
      logger.debug("Producing output for leftIndex: {}, rightIndex: {}, rightRecordCount: {}, outputIndex: {} and " +
        "availableSlotInOutput: {}", leftJoinIndex, rightJoinIndex, rightRecordCount, outputIndex, maxAvailableRowSlot);
      logger.debug("Output Batch stats before copying new data: {}", new RecordBatchSizer(this));
    }

    // Assuming that first vector in right batch is for implicitColumn.
    // get a mapping of number of rows for each rowId present in current right side batch
    // NOTE(review): the batched-copy path via getRowIdToRowCountMapping() was disabled in favor of row-at-a-time
    // copies below — presumably intentional; confirm before re-enabling.
    //final Map<Integer, Integer> indexToFreq = getRowIdToRowCountMapping();
    final IntVector rowIdVector = (IntVector) implicitVector;
    final int leftRecordCount = left.getRecordCount();

    // we need to have both conditions because in left join case we can exceed the maxAvailableRowSlot before reaching
    // rightBatch end or vice-versa
    while(maxAvailableRowSlot > 0 && rightJoinIndex < rightRecordCount) {
      // Get rowId from current right row
      int currentRowId = rowIdVector.getAccessor().get(rightJoinIndex);
      // rowIds produced by Unnest are 1-based, hence the +1 when comparing against the 0-based leftJoinIndex
      int leftRowId = leftJoinIndex + 1;
      int numRowsCopied = 0;

      if (currentRowId > leftRecordCount || leftJoinIndex > leftRecordCount) {
        // Not using Preconditions.checkState here since along with condition evaluation there will be cost of boxing
        // the arguments.
        throw new IllegalStateException(String.format("Either RowId in right batch is greater than total records in " +
          "left batch or all rows in left batch is processed but there are still rows in right batch. " +
          "Details[RightRowId: %s, LeftRecordCount: %s, LeftJoinIndex: %s, RightJoinIndex: %s]",
          currentRowId, leftRecordCount, leftJoinIndex, rightJoinIndex));
      }

      if (logger.isTraceEnabled()) {
        // Inside the if condition to eliminate parameter boxing cost
        logger.trace("leftRowId and currentRowId are: {}, {}", leftRowId, currentRowId);
      }

      // If leftRowId matches the rowId in right row then emit left and right row. Increment outputIndex, rightJoinIndex
      // and numRowsCopied. Also set leftMatchFound to true to indicate when to increase leftJoinIndex.
      if (leftRowId == currentRowId) {
        // there is a match
        matchedRecordFound = true;
        numRowsCopied = 1;
        //numRowsCopied = Math.min(indexToFreq.get(currentRowId), maxAvailableRowSlot);
        emitRight(rightJoinIndex, outputIndex, numRowsCopied);
        emitLeft(leftJoinIndex, outputIndex, numRowsCopied);
        outputIndex += numRowsCopied;
        rightJoinIndex += numRowsCopied;
      } else if (leftRowId < currentRowId) {
        // If a matching record for leftRowId was found in right batch in previous iteration, increase the leftJoinIndex
        // and reset the matchedRecordFound flag
        if (matchedRecordFound) {
          matchedRecordFound = false;
          ++leftJoinIndex;
          continue;
        } else { // If no matching row was found in right batch then in case of left join produce left row in output
          // and increase the indexes properly to reflect that
          if (JoinRelType.LEFT == popConfig.getJoinType()) {
            numRowsCopied = 1;
            emitLeft(leftJoinIndex, outputIndex, numRowsCopied);
            ++outputIndex;
          }
          ++leftJoinIndex;
        }
      } else {
        // leftRowId > currentRowId should be impossible since rowIds arrive in ascending order
        Preconditions.checkState(leftRowId <= currentRowId, "Unexpected case where rowId " +
          "%s in right batch of lateral is smaller than rowId %s in left batch being processed",
          currentRowId, leftRowId);
      }
      // Update the max available rows slot in output batch
      maxAvailableRowSlot -= numRowsCopied;
    }
  }

  /**
   * Gets references of vectors from the input and output vector containers and creates the mapping between them in
   * the respective maps. Example: for the right incoming batch the references of input vectors to corresponding
   * output vectors are stored in {@link LateralJoinBatch#rightInputOutputVector}. This is done here so that during
   * copy we don't have to figure out this mapping every time for each input and output vector before doing the actual
   * copy. Overhead was seen with the functions {@link MaterializedField#getValueClass()} and
   * {@link RecordBatch#getValueAccessorById(Class, int...)} since they would be called for each row copy.
   *
   * @param batch - Incoming RecordBatch
   * @param startVectorIndex - start index of the field range to map (relative to the filtered input schema)
   * @param endVectorIndex - end index (exclusive) of the field range to map
   * @param baseVectorIndex - offset added to the index to locate the corresponding vector in the output container
   *                          (e.g. the left field count when mapping right-side vectors)
   * @param isRightBatch - true if the passed batch is the right child, false for the left child
   */
  private void setupInputOutputVectors(RecordBatch batch, int startVectorIndex, int endVectorIndex,
                                       int baseVectorIndex, boolean isRightBatch) {
    // Get the vectors using field index rather than Materialized field since input batch field can be different from
    // output container field in case of Left Join. As we rebuild the right Schema field to be optional for output
    // container.
    int inputIndex = 0;
    final Map<ValueVector, ValueVector> mappingToUse = (isRightBatch) ? rightInputOutputVector : leftInputOutputVector;

    for (int i = startVectorIndex; i < endVectorIndex; ++i) {
      // Get output vector
      final int outputVectorIndex = i + baseVectorIndex;
      final Class<?> outputValueClass = this.getSchema().getColumn(outputVectorIndex).getValueClass();
      final ValueVector outputVector = this.getValueAccessorById(outputValueClass, outputVectorIndex).getValueVector();
      final String outputFieldName = outputVector.getField().getName();

      // Walk the input batch forward, skipping excluded fields, until the next field that appears in the output
      ValueVector inputVector;
      Class<?> inputValueClass;
      String inputFieldName;
      do {
        // Get input vector
        inputValueClass = batch.getSchema().getColumn(inputIndex).getValueClass();
        inputVector = batch.getValueAccessorById(inputValueClass, inputIndex).getValueVector();
        inputFieldName = inputVector.getField().getName();

        // If implicit column is in left batch then preserve it (it is excluded otherwise)
        if (inputFieldName.equals(implicitColumn) && !isRightBatch) {
          ++inputIndex;
          break;
        }

        ++inputIndex;
      } while (excludedFieldNames.contains(inputFieldName));

      // Relies on non-excluded fields appearing in the same relative order in input and output containers
      Preconditions.checkState(outputFieldName.equals(inputFieldName),
        "Non-excluded Input and output container fields are not in same order. " +
          "[Output Schema: %s and Input Schema:%s]", this.getSchema(), batch.getSchema());

      mappingToUse.put(inputVector, outputVector);
    }
  }

  /**
   * Given a source-to-destination vector mapping, copies data from each source vector starting at fromRowIndex to the
   * corresponding destination vector in the output batch starting at toRowIndex.
   *
   * @param fromRowIndex - row index of all the vectors in the incoming batch to copy data from
   * @param toRowIndex - row index of all the vectors in the outgoing batch to copy data to
   * @param mapping - map from input (source) vectors to output (destination) vectors
   * @param numRowsToCopy - number of rows to copy into the output batch
   * @param isRightBatch - whether the source index advances along with the output index. When copying from a left
   *                       vector the source row is fixed and replicated; when copying from a right vector the source
   *                       index moves together with the output index.
   */
  private void copyDataToOutputVectors(int fromRowIndex, int toRowIndex, Map<ValueVector, ValueVector> mapping,
                                       int numRowsToCopy, boolean isRightBatch) {
    for (Map.Entry<ValueVector, ValueVector> entry : mapping.entrySet()) {
      final ValueVector source = entry.getKey();
      final ValueVector target = entry.getValue();

      if (logger.isTraceEnabled()) {
        // Inside the if condition to eliminate parameter boxing cost
        logger.trace("Copying data from incoming batch vector to outgoing batch vector. [IncomingBatch: " +
          "(RowIndex: {}, ColumnName: {}), OutputBatch: (RowIndex: {}, ColumnName: {}) and Other: (TimeEachValue: {})]",
          fromRowIndex, source.getField().getName(), toRowIndex, target.getField().getName(), numRowsToCopy);
      }

      // Replicate (left) or advance through (right) the source rows while filling consecutive output rows
      for (int row = 0; row < numRowsToCopy; ++row) {
        target.copyEntry(toRowIndex + row, source, isRightBatch ? fromRowIndex + row : fromRowIndex);
      }
    }
  }

  /**
   * Copies data at leftIndex from each vector in the left incoming batch to outIndex in the corresponding vectors of
   * the outgoing record batch. The same left row is replicated for each copied output row.
   *
   * @param leftIndex - index to copy data from in the left incoming batch vectors
   * @param outIndex - index to copy data to in the outgoing batch vectors
   * @param numRowsToCopy - number of rows to copy from the source vectors to the destination vectors
   */
  private void emitLeft(int leftIndex, int outIndex, int numRowsToCopy) {
    if (logger.isTraceEnabled()) {
      // Guarded to avoid parameter boxing when tracing is disabled
      logger.trace("Copying the left batch data. Details: [leftIndex: {}, outputIndex: {}, numsCopy: {}]",
        leftIndex, outIndex, numRowsToCopy);
    }
    copyDataToOutputVectors(leftIndex, outIndex, leftInputOutputVector, numRowsToCopy, false);
  }

  /**
   * Copies data at rightIndex from each vector in the right incoming batch to outIndex in the corresponding vectors
   * of the outgoing record batch. The source index advances together with the output index.
   *
   * @param rightIndex - index to copy data from in the right incoming batch vectors
   * @param outIndex - index to copy data to in the outgoing batch vectors
   * @param numRowsToCopy - number of rows to copy from the source vectors to the destination vectors
   */
  private void emitRight(int rightIndex, int outIndex, int numRowsToCopy) {
    if (logger.isTraceEnabled()) {
      // Guarded to avoid parameter boxing when tracing is disabled
      logger.trace("Copying the right batch data. Details: [rightIndex: {}, outputIndex: {}, numsCopy: {}]",
        rightIndex, outIndex, numRowsToCopy);
    }
    copyDataToOutputVectors(rightIndex, outIndex, rightInputOutputVector, numRowsToCopy, true);
  }

  /**
   * Used only for testing, for cases when multiple output batches are produced for the same input set.
   *
   * @param outputRowCount - maximum number of rows the output batch can hold
   */
  @VisibleForTesting
  public void setMaxOutputRowCount(int outputRowCount) {
    // Log the previous value before it is overwritten
    if (isRecordBatchStatsLoggingEnabled()) {
      RecordBatchStats.logRecordBatchStats(getRecordBatchStatsContext(),
        "Previous OutputRowCount: %d, New OutputRowCount: %d", maxOutputRowCount, outputRowCount);
    }
    maxOutputRowCount = outputRowCount;
  }

  /**
   * Used only for testing to disable output batch sizing through the memory manager and instead rely on the static
   * max value set by {@link LateralJoinBatch#setMaxOutputRowCount(int)}.
   *
   * @param useMemoryManager - false to disable memory manager updates, true to enable them
   */
  @VisibleForTesting
  public void setUseMemoryManager(boolean useMemoryManager) {
    this.useMemoryManager = useMemoryManager;
  }

  /** Reports whether the outgoing container has reached its configured maximum row capacity. */
  private boolean isOutgoingBatchFull() {
    return !(outputIndex < maxOutputRowCount);
  }

  /**
   * Feeds the batch memory manager with sizing information for a newly received incoming batch. The update runs only
   * when the batch is genuinely new: the first time a left batch is seen, or when the right join index shows the
   * right batch has not been (partially) consumed yet.
   *
   * @param inputIndex - LEFT_INDEX or RIGHT_INDEX identifying which incoming batch to account for
   */
  private void updateMemoryManager(int inputIndex) {
    final boolean freshLeftBatch = inputIndex == LEFT_INDEX && isNewLeftBatch;
    final boolean freshRightBatch =
      inputIndex == RIGHT_INDEX && (rightJoinIndex == 0 || rightJoinIndex == -1);

    if (!freshLeftBatch && !freshRightBatch) {
      // Batch was already accounted for; nothing to update
      return;
    }
    if (freshLeftBatch) {
      // Reset the flag so subsequent calls for the same left batch are skipped
      isNewLeftBatch = false;
    }

    // For cases where all the previous input was consumed and sent with the previous output batch, building a new
    // output batch with the new incoming causes no problem since outputIndex will be 0
    batchMemoryManager.update(inputIndex, outputIndex);

    if (isRecordBatchStatsLoggingEnabled()) {
      final RecordBatchIOType type =
        inputIndex == LEFT_INDEX ? RecordBatchIOType.INPUT_LEFT : RecordBatchIOType.INPUT_RIGHT;
      RecordBatchStats.logRecordBatchStats(type, batchMemoryManager.getRecordBatchSizer(inputIndex),
        getRecordBatchStatsContext());
    }
  }

  /**
   * Populates the set of excluded field names with the implicit column plus any columns the lateral join operator
   * configuration explicitly excludes from the output.
   *
   * @param lateralPop - physical operator definition of this lateral join
   */
  private void populateExcludedField(PhysicalOperator lateralPop) {
    excludedFieldNames.add(implicitColumn);
    final List<SchemaPath> excludedCols = ((LateralJoinPOP) lateralPop).getExcludedColumns();
    if (excludedCols == null) {
      return;
    }
    for (SchemaPath excludedPath : excludedCols) {
      excludedFieldNames.add(excludedPath.rootName());
    }
  }

  // Logs the operator's internal state (container, incoming batches, upstream outcomes, schemas and join indexes)
  // at error level to aid post-mortem debugging of operator failures.
  @Override
  public void dump() {
    logger.error("LateralJoinBatch[container={}, left={}, right={}, leftOutcome={}, rightOutcome={}, leftSchema={}, " +
            "rightSchema={}, outputIndex={}, leftJoinIndex={}, rightJoinIndex={}, hasRemainderForLeftJoin={}]",
        container, left, right, leftUpstream, rightUpstream, leftSchema, rightSchema, outputIndex,
        leftJoinIndex, rightJoinIndex, hasRemainderForLeftJoin);
  }
}