/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.drill.exec.record;

import org.apache.drill.exec.exception.OutOfMemoryException;
import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.ops.MetricDef;
import org.apache.drill.exec.physical.base.PhysicalOperator;

public abstract class AbstractBinaryRecordBatch<T extends PhysicalOperator> extends AbstractRecordBatch<T> {
  private static final org.slf4j.Logger logger =
      org.slf4j.LoggerFactory.getLogger(new Object() {}.getClass().getEnclosingClass());

  protected final RecordBatch left;
  protected final RecordBatch right;

  // state (IterOutcome) of the left input
  protected IterOutcome leftUpstream = IterOutcome.NONE;

  // state (IterOutcome) of the right input
  protected IterOutcome rightUpstream = IterOutcome.NONE;

  protected RecordBatchMemoryManager batchMemoryManager;

  public final int numInputs = 2;
  public static final int LEFT_INDEX = 0;
  public static final int RIGHT_INDEX = 1;

  public enum Metric implements MetricDef {
    LEFT_INPUT_BATCH_COUNT,
    LEFT_AVG_INPUT_BATCH_BYTES,
    LEFT_AVG_INPUT_ROW_BYTES,
    LEFT_INPUT_RECORD_COUNT,
    RIGHT_INPUT_BATCH_COUNT,
    RIGHT_AVG_INPUT_BATCH_BYTES,
    RIGHT_AVG_INPUT_ROW_BYTES,
    RIGHT_INPUT_RECORD_COUNT,
    OUTPUT_BATCH_COUNT,
    AVG_OUTPUT_BATCH_BYTES,
    AVG_OUTPUT_ROW_BYTES,
    OUTPUT_RECORD_COUNT;

    @Override
    public int metricId() {
      return ordinal();
    }
  }

  protected AbstractBinaryRecordBatch(final T popConfig, final FragmentContext context,
      RecordBatch left, RecordBatch right) throws OutOfMemoryException {
    super(popConfig, context, true, context.newOperatorContext(popConfig));
    this.left = left;
    this.right = right;
  }

  protected AbstractBinaryRecordBatch(final T popConfig, final FragmentContext context,
      final boolean buildSchema, RecordBatch left, RecordBatch right) throws OutOfMemoryException {
    super(popConfig, context, buildSchema);
    this.left = left;
    this.right = right;
  }

  protected boolean verifyOutcomeToSetBatchState(IterOutcome leftOutcome, IterOutcome rightOutcome) {
    if (leftOutcome == IterOutcome.STOP || rightOutcome == IterOutcome.STOP) {
      state = BatchState.STOP;
      return false;
    }

    if (leftOutcome == IterOutcome.OUT_OF_MEMORY || rightOutcome == IterOutcome.OUT_OF_MEMORY) {
      state = BatchState.OUT_OF_MEMORY;
      return false;
    }

    if (checkForEarlyFinish(leftOutcome, rightOutcome)) {
      state = BatchState.DONE;
      return false;
    }

    // EMIT outcome is not expected as part of the first batch from either side
    if (leftOutcome == IterOutcome.EMIT || rightOutcome == IterOutcome.EMIT) {
      state = BatchState.STOP;
      throw new IllegalStateException("Unexpected IterOutcome.EMIT received either from left or right side in "
          + "buildSchema phase");
    }
    return true;
  }
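
  /*
   * Illustrative sketch (not part of this file): a concrete binary operator
   * would typically call prefetchFirstBatchFromBothSides() from its
   * buildSchema() step and bail out early when it returns false, since the
   * batch state has already been set to STOP, OUT_OF_MEMORY, or DONE by
   * verifyOutcomeToSetBatchState(). The method body below is a hypothetical
   * placeholder, not the schema-setup logic of any particular operator.
   *
   *   @Override
   *   protected void buildSchema() {
   *     if (!prefetchFirstBatchFromBothSides()) {
   *       return; // state was already set; nothing more to do here
   *     }
   *     // build the output container from the prefetched left/right schemas
   *     setupOutputContainer();
   *   }
   */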

  /**
   * Prefetch the first batch from both inputs.
   * @return true if the caller should continue processing,
   *         false if the caller should stop and exit from processing
   */
  protected boolean prefetchFirstBatchFromBothSides() {
    // The left input can deliver a first batch with zero or more records and an OK_NEW_SCHEMA outcome
    leftUpstream = next(LEFT_INDEX, left);
    rightUpstream = next(RIGHT_INDEX, right);
    return verifyOutcomeToSetBatchState(leftUpstream, rightUpstream);
  }

  /**
   * Checks for the operator-specific early termination condition.
   * @return true if further processing can stop,
   *         false if further processing is needed
   */
  protected boolean checkForEarlyFinish(IterOutcome leftOutcome, IterOutcome rightOutcome) {
    return (leftOutcome == IterOutcome.NONE && rightOutcome == IterOutcome.NONE);
  }

  public RecordBatchMemoryManager getBatchMemoryManager() {
    return batchMemoryManager;
  }

  protected void updateBatchMemoryManagerStats() {
    stats.setLongStat(Metric.LEFT_INPUT_BATCH_COUNT, batchMemoryManager.getNumIncomingBatches(LEFT_INDEX));
    stats.setLongStat(Metric.LEFT_AVG_INPUT_BATCH_BYTES, batchMemoryManager.getAvgInputBatchSize(LEFT_INDEX));
    stats.setLongStat(Metric.LEFT_AVG_INPUT_ROW_BYTES, batchMemoryManager.getAvgInputRowWidth(LEFT_INDEX));
    stats.setLongStat(Metric.LEFT_INPUT_RECORD_COUNT, batchMemoryManager.getTotalInputRecords(LEFT_INDEX));

    stats.setLongStat(Metric.RIGHT_INPUT_BATCH_COUNT, batchMemoryManager.getNumIncomingBatches(RIGHT_INDEX));
    stats.setLongStat(Metric.RIGHT_AVG_INPUT_BATCH_BYTES, batchMemoryManager.getAvgInputBatchSize(RIGHT_INDEX));
    stats.setLongStat(Metric.RIGHT_AVG_INPUT_ROW_BYTES, batchMemoryManager.getAvgInputRowWidth(RIGHT_INDEX));
    stats.setLongStat(Metric.RIGHT_INPUT_RECORD_COUNT, batchMemoryManager.getTotalInputRecords(RIGHT_INDEX));

    stats.setLongStat(Metric.OUTPUT_BATCH_COUNT, batchMemoryManager.getNumOutgoingBatches());
    stats.setLongStat(Metric.AVG_OUTPUT_BATCH_BYTES, batchMemoryManager.getAvgOutputBatchSize());
    stats.setLongStat(Metric.AVG_OUTPUT_ROW_BYTES, batchMemoryManager.getAvgOutputRowWidth());
    stats.setLongStat(Metric.OUTPUT_RECORD_COUNT, batchMemoryManager.getTotalOutputRecords());
  }
}