diff options
Diffstat (limited to 'exec/java-exec/src/main/java/org/apache/drill')
16 files changed, 300 insertions, 174 deletions
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java index cda98fc7a..cd24a4ca5 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java @@ -48,6 +48,7 @@ import org.apache.drill.exec.testing.ControlsInjector; import org.apache.drill.exec.testing.ControlsInjectorFactory; import org.apache.drill.exec.util.CallBack; import org.apache.drill.exec.util.record.RecordBatchStats; +import org.apache.drill.exec.util.record.RecordBatchStats.RecordBatchIOType; import org.apache.drill.exec.util.record.RecordBatchStats.RecordBatchStatsContext; import org.apache.drill.exec.vector.AllocationHelper; import org.apache.drill.exec.vector.NullableVarCharVector; @@ -302,7 +303,7 @@ public class ScanBatch implements CloseableRecordBatch { return; // NOOP } - RecordBatchStats.logRecordBatchStats(getFQNForLogging(MAX_FQN_LENGTH), this, batchStatsContext); + RecordBatchStats.logRecordBatchStats(RecordBatchIOType.OUTPUT, getFQNForLogging(MAX_FQN_LENGTH), this, batchStatsContext); } /** Might truncate the FQN if too long */ diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java index 28f8263d1..9de9aae06 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java @@ -60,6 +60,8 @@ import org.apache.drill.exec.record.TypedFieldId; import org.apache.drill.exec.record.VectorWrapper; import org.apache.drill.exec.record.selection.SelectionVector2; import org.apache.drill.exec.record.selection.SelectionVector4; +import org.apache.drill.exec.util.record.RecordBatchStats; +import org.apache.drill.exec.util.record.RecordBatchStats.RecordBatchIOType; import org.apache.drill.exec.vector.AllocationHelper; import org.apache.drill.exec.vector.FixedWidthVector; import org.apache.drill.exec.vector.ValueVector; @@ -159,9 +161,7 @@ public class HashAggBatch extends AbstractRecordBatch<HashAggregate> { } updateIncomingStats(); - if (logger.isDebugEnabled()) { - logger.debug("BATCH_STATS, incoming: {}", getRecordBatchSizer()); - } + RecordBatchStats.logRecordBatchStats(RecordBatchIOType.INPUT, getRecordBatchSizer(), getRecordBatchStatsContext()); } } @@ -204,7 +204,9 @@ public class HashAggBatch extends AbstractRecordBatch<HashAggregate> { } hashAggMemoryManager = new HashAggMemoryManager(configuredBatchSize); - logger.debug("BATCH_STATS, configured output batch size: {}", configuredBatchSize); + + RecordBatchStats.logRecordBatchStats(getRecordBatchStatsContext(), + "configured output batch size: %d", configuredBatchSize); columnMapping = CaseInsensitiveMap.newHashMap(); } @@ -474,15 +476,15 @@ public class HashAggBatch extends AbstractRecordBatch<HashAggregate> { stats.setLongStat(HashAggTemplate.Metric.AVG_OUTPUT_ROW_BYTES, hashAggMemoryManager.getAvgOutputRowWidth()); stats.setLongStat(HashAggTemplate.Metric.OUTPUT_RECORD_COUNT, hashAggMemoryManager.getTotalOutputRecords()); - if (logger.isDebugEnabled()) { - logger.debug("BATCH_STATS, incoming aggregate: count : {}, avg bytes : {}, avg row bytes : {}, record count : {}", - hashAggMemoryManager.getNumIncomingBatches(), hashAggMemoryManager.getAvgInputBatchSize(), - hashAggMemoryManager.getAvgInputRowWidth(), hashAggMemoryManager.getTotalInputRecords()); + RecordBatchStats.logRecordBatchStats(getRecordBatchStatsContext(), + "incoming aggregate: count : %d, avg bytes : %d, avg row bytes : %d, record count : %d", + hashAggMemoryManager.getNumIncomingBatches(), hashAggMemoryManager.getAvgInputBatchSize(), + hashAggMemoryManager.getAvgInputRowWidth(), hashAggMemoryManager.getTotalInputRecords()); - logger.debug("BATCH_STATS, outgoing aggregate: count : {}, avg bytes : {}, avg row bytes : {}, record count : {}", - hashAggMemoryManager.getNumOutgoingBatches(), hashAggMemoryManager.getAvgOutputBatchSize(), - hashAggMemoryManager.getAvgOutputRowWidth(), hashAggMemoryManager.getTotalOutputRecords()); - } + RecordBatchStats.logRecordBatchStats(getRecordBatchStatsContext(), + "outgoing aggregate: count : %d, avg bytes : %d, avg row bytes : %d, record count : %d", + hashAggMemoryManager.getNumOutgoingBatches(), hashAggMemoryManager.getAvgOutputBatchSize(), + hashAggMemoryManager.getAvgOutputRowWidth(), hashAggMemoryManager.getTotalOutputRecords()); } @Override public void close() { diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java index 65ca82972..e8ae30e6f 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java @@ -70,7 +70,8 @@ import org.apache.drill.exec.record.TypedFieldId; import org.apache.drill.exec.record.RecordBatch.IterOutcome; import org.apache.drill.exec.record.VectorWrapper; import org.apache.drill.exec.record.WritableBatch; - +import org.apache.drill.exec.util.record.RecordBatchStats; +import org.apache.drill.exec.util.record.RecordBatchStats.RecordBatchIOType; import org.apache.drill.exec.vector.AllocationHelper; import org.apache.drill.exec.vector.FixedWidthVector; @@ -1223,10 +1224,7 @@ public abstract class HashAggTemplate implements HashAggregator { } outgoing.getRecordBatchMemoryManager().updateOutgoingStats(numOutputRecords); - - if (logger.isDebugEnabled()) { - logger.debug("BATCH_STATS, outgoing: {}", new RecordBatchSizer(outgoing)); - } + RecordBatchStats.logRecordBatchStats(RecordBatchIOType.OUTPUT, outgoing, outgoing.getRecordBatchStatsContext()); this.outcome = IterOutcome.OK; diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenRecordBatch.java index d634fb24a..1623319c3 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenRecordBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenRecordBatch.java @@ -52,6 +52,8 @@ import org.apache.drill.exec.record.TransferPair; import org.apache.drill.exec.record.TypedFieldId; import org.apache.drill.exec.record.VectorContainer; import org.apache.drill.exec.record.VectorWrapper; +import org.apache.drill.exec.util.record.RecordBatchStats; +import org.apache.drill.exec.util.record.RecordBatchStats.RecordBatchIOType; import org.apache.drill.exec.vector.ValueVector; import org.apache.drill.exec.vector.complex.RepeatedMapVector; import org.apache.drill.exec.vector.complex.RepeatedValueVector; @@ -157,7 +159,7 @@ public class FlattenRecordBatch extends AbstractSingleRecordBatch<FlattenPOP> { // i.e. all rows fit within memory budget. setOutputRowCount(Math.min(columnSize.getElementCount(), getOutputRowCount())); - logger.debug("BATCH_STATS, incoming: {}", getRecordBatchSizer()); + RecordBatchStats.logRecordBatchStats(RecordBatchIOType.INPUT, getRecordBatchSizer(), getRecordBatchStatsContext()); updateIncomingStats(); } @@ -170,7 +172,8 @@ public class FlattenRecordBatch extends AbstractSingleRecordBatch<FlattenPOP> { int configuredBatchSize = (int) context.getOptions().getOption(ExecConstants.OUTPUT_BATCH_SIZE_VALIDATOR); flattenMemoryManager = new FlattenMemoryManager(configuredBatchSize); - logger.debug("BATCH_STATS, configured output batch size: {}", configuredBatchSize); + RecordBatchStats.logRecordBatchStats(getRecordBatchStatsContext(), + "configured output batch size: %d", configuredBatchSize); } @Override @@ -261,10 +264,7 @@ public class FlattenRecordBatch extends AbstractSingleRecordBatch<FlattenPOP> { } flattenMemoryManager.updateOutgoingStats(outputRecords); - - if (logger.isDebugEnabled()) { - logger.debug("BATCH_STATS, outgoing: {}", new RecordBatchSizer(this)); - } + RecordBatchStats.logRecordBatchStats(RecordBatchIOType.OUTPUT, this, getRecordBatchStatsContext()); // Get the final outcome based on hasRemainder since that will determine if all the incoming records were // consumed in current output batch or not @@ -516,15 +516,15 @@ public class FlattenRecordBatch extends AbstractSingleRecordBatch<FlattenPOP> { stats.setLongStat(Metric.AVG_OUTPUT_ROW_BYTES, flattenMemoryManager.getAvgOutputRowWidth()); stats.setLongStat(Metric.OUTPUT_RECORD_COUNT, flattenMemoryManager.getTotalOutputRecords()); - if (logger.isDebugEnabled()) { - logger.debug("BATCH_STATS, incoming aggregate: count : {}, avg bytes : {}, avg row bytes : {}, record count : {}", - flattenMemoryManager.getNumIncomingBatches(), flattenMemoryManager.getAvgInputBatchSize(), - flattenMemoryManager.getAvgInputRowWidth(), flattenMemoryManager.getTotalInputRecords()); + RecordBatchStats.logRecordBatchStats(getRecordBatchStatsContext(), + "incoming aggregate: count : %d, avg bytes : %d, avg row bytes : %d, record count : %d", + flattenMemoryManager.getNumIncomingBatches(), flattenMemoryManager.getAvgInputBatchSize(), + flattenMemoryManager.getAvgInputRowWidth(), flattenMemoryManager.getTotalInputRecords()); - logger.debug("BATCH_STATS, outgoing aggregate: count : {}, avg bytes : {}, avg row bytes : {}, record count : {}", - flattenMemoryManager.getNumOutgoingBatches(), flattenMemoryManager.getAvgOutputBatchSize(), - flattenMemoryManager.getAvgOutputRowWidth(), flattenMemoryManager.getTotalOutputRecords()); - } + RecordBatchStats.logRecordBatchStats(getRecordBatchStatsContext(), + "outgoing aggregate: count : %d, avg bytes : %d, avg row bytes : %d, record count : %d", + flattenMemoryManager.getNumOutgoingBatches(), flattenMemoryManager.getAvgOutputBatchSize(), + flattenMemoryManager.getAvgOutputRowWidth(), flattenMemoryManager.getTotalOutputRecords()); } @Override diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java index dc40b24bc..368bb5dc9 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java @@ -68,9 +68,10 @@ import org.apache.drill.exec.record.BatchSchema; import org.apache.drill.exec.record.JoinBatchMemoryManager; import org.apache.drill.exec.record.MaterializedField; import org.apache.drill.exec.record.RecordBatch; -import org.apache.drill.exec.record.RecordBatchSizer; import org.apache.drill.exec.record.TypedFieldId; import org.apache.drill.exec.record.VectorWrapper; +import org.apache.drill.exec.util.record.RecordBatchStats; +import org.apache.drill.exec.util.record.RecordBatchStats.RecordBatchIOType; import org.apache.drill.exec.vector.IntVector; import org.apache.drill.exec.vector.ValueVector; import org.apache.drill.exec.vector.complex.AbstractContainerVector; @@ -291,7 +292,7 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> { buildBatch, () -> { batchMemoryManager.update(RIGHT_INDEX, 0, true); - logger.debug("BATCH_STATS, incoming right: {}", batchMemoryManager.getRecordBatchSizer(RIGHT_INDEX)); + RecordBatchStats.logRecordBatchStats(RecordBatchIOType.INPUT_RIGHT, batchMemoryManager.getRecordBatchSizer(RIGHT_INDEX), getRecordBatchStatsContext()); }); } @@ -306,7 +307,7 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> { probeBatch, () -> { batchMemoryManager.update(LEFT_INDEX, 0); - logger.debug("BATCH_STATS, incoming left: {}", batchMemoryManager.getRecordBatchSizer(LEFT_INDEX)); + RecordBatchStats.logRecordBatchStats(RecordBatchIOType.INPUT_LEFT, batchMemoryManager.getRecordBatchSizer(LEFT_INDEX), getRecordBatchStatsContext()); }); } @@ -488,9 +489,7 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> { container.setRecordCount(outputRecords); batchMemoryManager.updateOutgoingStats(outputRecords); - if (logger.isDebugEnabled()) { - logger.debug("BATCH_STATS, outgoing: {}", new RecordBatchSizer(this)); - } + RecordBatchStats.logRecordBatchStats(RecordBatchIOType.OUTPUT, this, getRecordBatchStatsContext()); /* We are here because of one the following * 1. Completed processing of all the records and we are done @@ -1121,12 +1120,17 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> { final int configuredBatchSize = (int) context.getOptions().getOption(ExecConstants.OUTPUT_BATCH_SIZE_VALIDATOR); final double avail_mem_factor = (double) context.getOptions().getOption(ExecConstants.OUTPUT_BATCH_SIZE_AVAIL_MEM_FACTOR_VALIDATOR); int outputBatchSize = Math.min(configuredBatchSize, Integer.highestOneBit((int)(allocator.getLimit() * avail_mem_factor))); - logger.debug("BATCH_STATS, configured output batch size: {}, allocated memory {}, avail mem factor {}, output batch size: {}", - configuredBatchSize, allocator.getLimit(), avail_mem_factor, outputBatchSize); + RecordBatchStats.logRecordBatchStats(getRecordBatchStatsContext(), + "configured output batch size: %d, allocated memory %d, avail mem factor %f, output batch size: %d", + configuredBatchSize, allocator.getLimit(), avail_mem_factor, outputBatchSize); batchMemoryManager = new JoinBatchMemoryManager(outputBatchSize, left, right, new HashSet<>()); - logger.debug("BATCH_STATS, configured output batch size: {}", configuredBatchSize); + + + RecordBatchStats.logRecordBatchStats(getRecordBatchStatsContext(), + "configured output batch size: %d", configuredBatchSize); + enableRuntimeFilter = context.getOptions().getOption(ExecConstants.HASHJOIN_ENABLE_RUNTIME_FILTER); } @@ -1242,19 +1246,20 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> { updateMetrics(); - if (logger.isDebugEnabled()) { - logger.debug("BATCH_STATS, incoming aggregate left: batch count : {}, avg bytes : {}, avg row bytes : {}, record count : {}", - batchMemoryManager.getNumIncomingBatches(JoinBatchMemoryManager.LEFT_INDEX), batchMemoryManager.getAvgInputBatchSize(JoinBatchMemoryManager.LEFT_INDEX), - batchMemoryManager.getAvgInputRowWidth(JoinBatchMemoryManager.LEFT_INDEX), batchMemoryManager.getTotalInputRecords(JoinBatchMemoryManager.LEFT_INDEX)); + RecordBatchStats.logRecordBatchStats(getRecordBatchStatsContext(), + "incoming aggregate left: batch count : %d, avg bytes : %d, avg row bytes : %d, record count : %d", + batchMemoryManager.getNumIncomingBatches(JoinBatchMemoryManager.LEFT_INDEX), batchMemoryManager.getAvgInputBatchSize(JoinBatchMemoryManager.LEFT_INDEX), + batchMemoryManager.getAvgInputRowWidth(JoinBatchMemoryManager.LEFT_INDEX), batchMemoryManager.getTotalInputRecords(JoinBatchMemoryManager.LEFT_INDEX)); - logger.debug("BATCH_STATS, incoming aggregate right: batch count : {}, avg bytes : {}, avg row bytes : {}, record count : {}", - batchMemoryManager.getNumIncomingBatches(JoinBatchMemoryManager.RIGHT_INDEX), batchMemoryManager.getAvgInputBatchSize(JoinBatchMemoryManager.RIGHT_INDEX), - batchMemoryManager.getAvgInputRowWidth(JoinBatchMemoryManager.RIGHT_INDEX), batchMemoryManager.getTotalInputRecords(JoinBatchMemoryManager.RIGHT_INDEX)); + RecordBatchStats.logRecordBatchStats(getRecordBatchStatsContext(), + "incoming aggregate right: batch count : %d, avg bytes : %d, avg row bytes : %d, record count : %d", + batchMemoryManager.getNumIncomingBatches(JoinBatchMemoryManager.RIGHT_INDEX), batchMemoryManager.getAvgInputBatchSize(JoinBatchMemoryManager.RIGHT_INDEX), + batchMemoryManager.getAvgInputRowWidth(JoinBatchMemoryManager.RIGHT_INDEX), batchMemoryManager.getTotalInputRecords(JoinBatchMemoryManager.RIGHT_INDEX)); - logger.debug("BATCH_STATS, outgoing aggregate: batch count : {}, avg bytes : {}, avg row bytes : {}, record count : {}", - batchMemoryManager.getNumOutgoingBatches(), batchMemoryManager.getAvgOutputBatchSize(), - batchMemoryManager.getAvgOutputRowWidth(), batchMemoryManager.getTotalOutputRecords()); - } + RecordBatchStats.logRecordBatchStats(getRecordBatchStatsContext(), + "outgoing aggregate: batch count : %d, avg bytes : %d, avg row bytes : %d, record count : %d", + batchMemoryManager.getNumOutgoingBatches(), batchMemoryManager.getAvgOutputBatchSize(), + batchMemoryManager.getAvgOutputRowWidth(), batchMemoryManager.getTotalOutputRecords()); this.cleanup(); super.close(); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/LateralJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/LateralJoinBatch.java index 5b7fd9cc6..1aaf5e202 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/LateralJoinBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/LateralJoinBatch.java @@ -39,6 +39,8 @@ import org.apache.drill.exec.record.RecordBatchSizer; import org.apache.drill.exec.record.SchemaBuilder; import org.apache.drill.exec.record.VectorAccessibleUtilities; import org.apache.drill.exec.record.VectorWrapper; +import org.apache.drill.exec.util.record.RecordBatchStats; +import org.apache.drill.exec.util.record.RecordBatchStats.RecordBatchIOType; import org.apache.drill.exec.vector.IntVector; import org.apache.drill.exec.vector.ValueVector; @@ -240,25 +242,26 @@ public class LateralJoinBatch extends AbstractBinaryRecordBatch<LateralJoinPOP> public void close() { updateBatchMemoryManagerStats(); - if (logger.isDebugEnabled()) { - logger.debug("BATCH_STATS, incoming aggregate left: batch count : {}, avg bytes : {}, avg row bytes : {}, " + - "record count : {}", batchMemoryManager.getNumIncomingBatches(JoinBatchMemoryManager.LEFT_INDEX), - batchMemoryManager.getAvgInputBatchSize(JoinBatchMemoryManager.LEFT_INDEX), - batchMemoryManager.getAvgInputRowWidth(JoinBatchMemoryManager.LEFT_INDEX), - batchMemoryManager.getTotalInputRecords(JoinBatchMemoryManager.LEFT_INDEX)); - - logger.debug("BATCH_STATS, incoming aggregate right: batch count : {}, avg bytes : {}, avg row bytes : {}, " + - "record count : {}", batchMemoryManager.getNumIncomingBatches(JoinBatchMemoryManager.RIGHT_INDEX), - batchMemoryManager.getAvgInputBatchSize(JoinBatchMemoryManager.RIGHT_INDEX), - batchMemoryManager.getAvgInputRowWidth(JoinBatchMemoryManager.RIGHT_INDEX), - batchMemoryManager.getTotalInputRecords(JoinBatchMemoryManager.RIGHT_INDEX)); - - logger.debug("BATCH_STATS, outgoing aggregate: batch count : {}, avg bytes : {}, avg row bytes : {}, " + - "record count : {}", batchMemoryManager.getNumOutgoingBatches(), - batchMemoryManager.getAvgOutputBatchSize(), - batchMemoryManager.getAvgOutputRowWidth(), - batchMemoryManager.getTotalOutputRecords()); - } + RecordBatchStats.logRecordBatchStats(getRecordBatchStatsContext(), + "incoming aggregate left: batch count : %d, avg bytes : %d, avg row bytes : %d, " + + "record count : %d", batchMemoryManager.getNumIncomingBatches(JoinBatchMemoryManager.LEFT_INDEX), + batchMemoryManager.getAvgInputBatchSize(JoinBatchMemoryManager.LEFT_INDEX), + batchMemoryManager.getAvgInputRowWidth(JoinBatchMemoryManager.LEFT_INDEX), + batchMemoryManager.getTotalInputRecords(JoinBatchMemoryManager.LEFT_INDEX)); + + RecordBatchStats.logRecordBatchStats(getRecordBatchStatsContext(), + "incoming aggregate right: batch count : %d, avg bytes : %d, avg row bytes : %d, " + + "record count : %d", batchMemoryManager.getNumIncomingBatches(JoinBatchMemoryManager.RIGHT_INDEX), + batchMemoryManager.getAvgInputBatchSize(JoinBatchMemoryManager.RIGHT_INDEX), + batchMemoryManager.getAvgInputRowWidth(JoinBatchMemoryManager.RIGHT_INDEX), + batchMemoryManager.getTotalInputRecords(JoinBatchMemoryManager.RIGHT_INDEX)); + + RecordBatchStats.logRecordBatchStats(getRecordBatchStatsContext(), + "outgoing aggregate: batch count : %d, avg bytes : %d, avg row bytes : %d, " + + "record count : %d", batchMemoryManager.getNumOutgoingBatches(), + batchMemoryManager.getAvgOutputBatchSize(), + batchMemoryManager.getAvgOutputRowWidth(), + batchMemoryManager.getTotalOutputRecords()); super.close(); } @@ -733,11 +736,10 @@ public class LateralJoinBatch extends AbstractBinaryRecordBatch<LateralJoinPOP> batchMemoryManager.updateOutgoingStats(outputIndex); - if (logger.isDebugEnabled()) { - logger.debug("BATCH_STATS, outgoing:\n {}", new RecordBatchSizer(this)); - logger.debug("Number of records emitted: {} and Allocator Stats: [AllocatedMem: {}, PeakMem: {}]", + RecordBatchStats.logRecordBatchStats(RecordBatchIOType.OUTPUT, this, getRecordBatchStatsContext()); + RecordBatchStats.logRecordBatchStats(getRecordBatchStatsContext(), + "Number of records emitted: %d and Allocator Stats: [AllocatedMem: %d, PeakMem: %d]", outputIndex, container.getAllocator().getAllocatedMemory(), container.getAllocator().getPeakMemoryAllocation()); - } // Update the output index for next output batch to zero outputIndex = 0; @@ -1182,10 +1184,11 @@ public class LateralJoinBatch extends AbstractBinaryRecordBatch<LateralJoinPOP> // a new output batch with new incoming then it will not cause any problem since outputIndex will be 0 final int newOutputRowCount = batchMemoryManager.update(inputIndex, outputIndex); - if (logger.isDebugEnabled()) { - logger.debug("BATCH_STATS, incoming {}:\n {}", inputIndex == LEFT_INDEX ? "left" : "right", - batchMemoryManager.getRecordBatchSizer(inputIndex)); - logger.debug("Previous OutputRowCount: {}, New OutputRowCount: {}", maxOutputRowCount, newOutputRowCount); + if (isRecordBatchStatsLoggingEnabled()) { + RecordBatchIOType type = inputIndex == LEFT_INDEX ? RecordBatchIOType.INPUT_LEFT : RecordBatchIOType.INPUT_RIGHT; + RecordBatchStats.logRecordBatchStats(type, batchMemoryManager.getRecordBatchSizer(inputIndex), getRecordBatchStatsContext()); + RecordBatchStats.logRecordBatchStats(getRecordBatchStatsContext(), + "Previous OutputRowCount: %d, New OutputRowCount: %d", maxOutputRowCount, newOutputRowCount); } if (useMemoryManager) { diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java index 7b14e03ec..72f776a55 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java @@ -58,6 +58,8 @@ import org.apache.drill.exec.record.TypedFieldId; import org.apache.drill.exec.record.VectorAccessible; import org.apache.drill.exec.record.VectorContainer; import org.apache.drill.exec.record.VectorWrapper; +import org.apache.drill.exec.util.record.RecordBatchStats; +import org.apache.drill.exec.util.record.RecordBatchStats.RecordBatchIOType; import org.apache.drill.exec.vector.ValueVector; import org.apache.drill.exec.vector.complex.AbstractContainerVector; @@ -123,7 +125,8 @@ public class MergeJoinBatch extends AbstractBinaryRecordBatch<MergeJoinPOP> { @Override public void update(int inputIndex) { status.setTargetOutputRowCount(super.update(inputIndex, status.getOutPosition())); - logger.debug("BATCH_STATS, incoming {}: {}", inputIndex == 0 ? "left" : "right", getRecordBatchSizer(inputIndex)); + RecordBatchIOType type = inputIndex == 0 ? RecordBatchIOType.INPUT_LEFT : RecordBatchIOType.INPUT_RIGHT; + RecordBatchStats.logRecordBatchStats(type, getRecordBatchSizer(inputIndex), getRecordBatchStatsContext()); } } @@ -134,7 +137,8 @@ public class MergeJoinBatch extends AbstractBinaryRecordBatch<MergeJoinPOP> { final int configuredBatchSize = (int) context.getOptions().getOption(ExecConstants.OUTPUT_BATCH_SIZE_VALIDATOR); batchMemoryManager = new MergeJoinMemoryManager(configuredBatchSize, left, right); - logger.debug("BATCH_STATS, configured output batch size: {}", configuredBatchSize); + RecordBatchStats.logRecordBatchStats(getRecordBatchStatsContext(), + "configured output batch size: %d", configuredBatchSize); if (popConfig.getConditions().size() == 0) { throw new UnsupportedOperationException("Merge Join currently does not support cartesian join. This join operator was configured with 0 conditions"); @@ -271,10 +275,7 @@ public class MergeJoinBatch extends AbstractBinaryRecordBatch<MergeJoinPOP> { vw.getValueVector().getMutator().setValueCount(getRecordCount()); } - if (logger.isDebugEnabled()) { - logger.debug("BATCH_STATS, outgoing: {}", new RecordBatchSizer(this)); - } - + RecordBatchStats.logRecordBatchStats(RecordBatchIOType.OUTPUT, this, getRecordBatchStatsContext()); batchMemoryManager.updateOutgoingStats(getRecordCount()); } @@ -282,23 +283,24 @@ public class MergeJoinBatch extends AbstractBinaryRecordBatch<MergeJoinPOP> { public void close() { updateBatchMemoryManagerStats(); - if (logger.isDebugEnabled()) { - logger.debug("BATCH_STATS, incoming aggregate left: batch count : {}, avg bytes : {}, avg row bytes : {}, record count : {}", - batchMemoryManager.getNumIncomingBatches(JoinBatchMemoryManager.LEFT_INDEX), - batchMemoryManager.getAvgInputBatchSize(JoinBatchMemoryManager.LEFT_INDEX), - batchMemoryManager.getAvgInputRowWidth(JoinBatchMemoryManager.LEFT_INDEX), - batchMemoryManager.getTotalInputRecords(JoinBatchMemoryManager.LEFT_INDEX)); - - logger.debug("BATCH_STATS, incoming aggregate right: batch count : {}, avg bytes : {}, avg row bytes : {}, record count : {}", - batchMemoryManager.getNumIncomingBatches(JoinBatchMemoryManager.RIGHT_INDEX), - batchMemoryManager.getAvgInputBatchSize(JoinBatchMemoryManager.RIGHT_INDEX), - batchMemoryManager.getAvgInputRowWidth(JoinBatchMemoryManager.RIGHT_INDEX), - batchMemoryManager.getTotalInputRecords(JoinBatchMemoryManager.RIGHT_INDEX)); - - logger.debug("BATCH_STATS, outgoing aggregate: batch count : {}, avg bytes : {}, avg row bytes : {}, record count : {}", - batchMemoryManager.getNumOutgoingBatches(), batchMemoryManager.getAvgOutputBatchSize(), - batchMemoryManager.getAvgOutputRowWidth(), batchMemoryManager.getTotalOutputRecords()); - } + RecordBatchStats.logRecordBatchStats(getRecordBatchStatsContext(), + "incoming aggregate left: batch count : %d, avg bytes : %d, avg row bytes : %d, record count : %d", + batchMemoryManager.getNumIncomingBatches(JoinBatchMemoryManager.LEFT_INDEX), + batchMemoryManager.getAvgInputBatchSize(JoinBatchMemoryManager.LEFT_INDEX), + batchMemoryManager.getAvgInputRowWidth(JoinBatchMemoryManager.LEFT_INDEX), + batchMemoryManager.getTotalInputRecords(JoinBatchMemoryManager.LEFT_INDEX)); + + RecordBatchStats.logRecordBatchStats(getRecordBatchStatsContext(), + "incoming aggregate right: batch count : %d, avg bytes : %d, avg row bytes : %d, record count : %d", + batchMemoryManager.getNumIncomingBatches(JoinBatchMemoryManager.RIGHT_INDEX), + batchMemoryManager.getAvgInputBatchSize(JoinBatchMemoryManager.RIGHT_INDEX), + batchMemoryManager.getAvgInputRowWidth(JoinBatchMemoryManager.RIGHT_INDEX), + batchMemoryManager.getTotalInputRecords(JoinBatchMemoryManager.RIGHT_INDEX)); + + RecordBatchStats.logRecordBatchStats(getRecordBatchStatsContext(), + "outgoing aggregate: batch count : %d, avg bytes : %d, avg row bytes : %d, record count : %d", + batchMemoryManager.getNumOutgoingBatches(), batchMemoryManager.getAvgOutputBatchSize(), + batchMemoryManager.getAvgOutputRowWidth(), batchMemoryManager.getTotalOutputRecords()); super.close(); leftIterator.close(); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoinBatch.java index 8054d7fef..e2f93ecf2 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoinBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoinBatch.java @@ -52,8 +52,9 @@ import org.apache.drill.exec.record.RecordBatch; import org.apache.drill.exec.record.TypedFieldId; import org.apache.drill.exec.record.VectorAccessible; import org.apache.drill.exec.record.VectorWrapper; +import org.apache.drill.exec.util.record.RecordBatchStats; +import org.apache.drill.exec.util.record.RecordBatchStats.RecordBatchIOType; import org.apache.drill.exec.record.JoinBatchMemoryManager; -import org.apache.drill.exec.record.RecordBatchSizer; import org.apache.drill.shaded.guava.com.google.common.base.Preconditions; import com.sun.codemodel.JExpr; import com.sun.codemodel.JExpression; @@ -133,7 +134,9 @@ public class NestedLoopJoinBatch extends AbstractBinaryRecordBatch<NestedLoopJoi // get the output batch size from config. int configuredBatchSize = (int) context.getOptions().getOption(ExecConstants.OUTPUT_BATCH_SIZE_VALIDATOR); batchMemoryManager = new JoinBatchMemoryManager(configuredBatchSize, left, right, new HashSet<>()); - logger.debug("BATCH_STATS, configured output batch size: {}", configuredBatchSize); + + RecordBatchStats.logRecordBatchStats(getRecordBatchStatsContext(), + "configured output batch size: %d", configuredBatchSize); } /** @@ -168,7 +171,8 @@ public class NestedLoopJoinBatch extends AbstractBinaryRecordBatch<NestedLoopJoi case OK: // For right side, use aggregate i.e. average row width across batches batchMemoryManager.update(RIGHT_INDEX, 0, true); - logger.debug("BATCH_STATS, incoming right: {}", batchMemoryManager.getRecordBatchSizer(RIGHT_INDEX)); + RecordBatchStats.logRecordBatchStats(RecordBatchIOType.INPUT_RIGHT, + batchMemoryManager.getRecordBatchSizer(RIGHT_INDEX), getRecordBatchStatsContext()); addBatchToHyperContainer(right); break; case OUT_OF_MEMORY: @@ -202,10 +206,7 @@ public class NestedLoopJoinBatch extends AbstractBinaryRecordBatch<NestedLoopJoi container.setRecordCount(outputRecords); container.buildSchema(BatchSchema.SelectionVectorMode.NONE); - if (logger.isDebugEnabled()) { - logger.debug("BATCH_STATS, outgoing: {}", new RecordBatchSizer(this)); - } - + RecordBatchStats.logRecordBatchStats(RecordBatchIOType.OUTPUT, this, getRecordBatchStatsContext()); logger.debug("Number of records emitted: " + outputRecords); return (outputRecords > 0) ? IterOutcome.OK : IterOutcome.NONE; @@ -357,7 +358,8 @@ public class NestedLoopJoinBatch extends AbstractBinaryRecordBatch<NestedLoopJoi } batchMemoryManager.update(RIGHT_INDEX, 0, true); - logger.debug("BATCH_STATS, incoming right: {}", batchMemoryManager.getRecordBatchSizer(RIGHT_INDEX)); + RecordBatchStats.logRecordBatchStats(RecordBatchIOType.INPUT_RIGHT, + batchMemoryManager.getRecordBatchSizer(RIGHT_INDEX), getRecordBatchStatsContext()); if (leftUpstream != IterOutcome.NONE) { leftSchema = left.getSchema(); @@ -395,7 +397,8 @@ public class NestedLoopJoinBatch extends AbstractBinaryRecordBatch<NestedLoopJoi } batchMemoryManager.update(LEFT_INDEX, 0); - logger.debug("BATCH_STATS, incoming left: {}", batchMemoryManager.getRecordBatchSizer(LEFT_INDEX)); + RecordBatchStats.logRecordBatchStats(RecordBatchIOType.INPUT_LEFT, + batchMemoryManager.getRecordBatchSizer(LEFT_INDEX), getRecordBatchStatsContext()); container.buildSchema(BatchSchema.SelectionVectorMode.NONE); @@ -422,23 +425,24 @@ public class NestedLoopJoinBatch extends AbstractBinaryRecordBatch<NestedLoopJoi public void close() { updateBatchMemoryManagerStats(); - if (logger.isDebugEnabled()) { - logger.debug("BATCH_STATS, incoming aggregate left: batch count : {}, avg bytes : {}, avg row bytes : {}, record count : {}", + RecordBatchStats.logRecordBatchStats(getRecordBatchStatsContext(), + "incoming aggregate left: batch count : %d, avg bytes : %d, avg row bytes : %d, record count : %d", batchMemoryManager.getNumIncomingBatches(JoinBatchMemoryManager.LEFT_INDEX), batchMemoryManager.getAvgInputBatchSize(JoinBatchMemoryManager.LEFT_INDEX), batchMemoryManager.getAvgInputRowWidth(JoinBatchMemoryManager.LEFT_INDEX), batchMemoryManager.getTotalInputRecords(JoinBatchMemoryManager.LEFT_INDEX)); - logger.debug("BATCH_STATS, incoming aggregate right: batch count : {}, avg bytes : {}, avg row bytes : {}, record count : {}", + RecordBatchStats.logRecordBatchStats(getRecordBatchStatsContext(), + "incoming aggregate right: batch count : %d, avg bytes : %d, avg row bytes : %d, record count : %d", batchMemoryManager.getNumIncomingBatches(JoinBatchMemoryManager.RIGHT_INDEX), batchMemoryManager.getAvgInputBatchSize(JoinBatchMemoryManager.RIGHT_INDEX), batchMemoryManager.getAvgInputRowWidth(JoinBatchMemoryManager.RIGHT_INDEX), batchMemoryManager.getTotalInputRecords(JoinBatchMemoryManager.RIGHT_INDEX)); - logger.debug("BATCH_STATS, outgoing aggregate: batch count : {}, avg bytes : {}, avg row bytes : {}, record count : {}", + RecordBatchStats.logRecordBatchStats(getRecordBatchStatsContext(), + "outgoing aggregate: batch count : %d, avg bytes : %d, avg row bytes : %d, record count : %d", batchMemoryManager.getNumOutgoingBatches(), batchMemoryManager.getAvgOutputBatchSize(), batchMemoryManager.getAvgOutputRowWidth(), batchMemoryManager.getTotalOutputRecords()); - } rightContainer.clear(); rightCounts.clear(); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoinTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoinTemplate.java index adf681b58..3b8ab8d2e 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoinTemplate.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoinTemplate.java @@ -24,7 +24,8 @@ import org.apache.drill.exec.record.ExpandableHyperContainer; import org.apache.drill.exec.record.RecordBatch; import org.apache.drill.exec.record.VectorContainer; import org.apache.drill.exec.record.VectorWrapper; - +import org.apache.drill.exec.util.record.RecordBatchStats; +import org.apache.drill.exec.util.record.RecordBatchStats.RecordBatchIOType; import javax.inject.Named; import java.util.LinkedList; import java.util.List; @@ -192,7 +193,9 @@ public abstract class NestedLoopJoinTemplate implements NestedLoopJoin { break; case OK: setTargetOutputCount(outgoing.getBatchMemoryManager().update(left, LEFT_INDEX,outputIndex)); - logger.debug("BATCH_STATS, incoming left: {}", outgoing.getBatchMemoryManager().getRecordBatchSizer(LEFT_INDEX)); + RecordBatchStats.logRecordBatchStats(RecordBatchIOType.INPUT_LEFT, + outgoing.getBatchMemoryManager().getRecordBatchSizer(LEFT_INDEX), + outgoing.getRecordBatchStatsContext()); leftRecordCount = left.getRecordCount(); break; } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectMemoryManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectMemoryManager.java index 6b273d863..81e54db90 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectMemoryManager.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectMemoryManager.java @@ -28,6 +28,8 @@ import org.apache.drill.exec.record.RecordBatch; import org.apache.drill.exec.record.RecordBatchMemoryManager; import org.apache.drill.exec.record.RecordBatchSizer; import org.apache.drill.exec.record.TypedFieldId; +import org.apache.drill.exec.util.record.RecordBatchStats; +import org.apache.drill.exec.util.record.RecordBatchStats.RecordBatchIOType; import org.apache.drill.exec.vector.FixedWidthVector; import org.apache.drill.exec.vector.NullableVector; import org.apache.drill.exec.vector.ValueVector; @@ -136,7 +138,6 @@ public class ProjectMemoryManager extends RecordBatchMemoryManager { public ProjectMemoryManager(int configuredOutputSize) { super(configuredOutputSize); outputColumnSizes = new HashMap<>(); - logger.debug("BATCH_STATS, configuredOutputSize: {}", configuredOutputSize); } public boolean isComplex(MajorType majorType) { @@ -253,6 +254,9 @@ public class ProjectMemoryManager extends RecordBatchMemoryManager { setIncomingBatch(incomingBatch); setOutgoingBatch(outgoingBatch); reset(); + + RecordBatchStats.logRecordBatchStats(outgoingBatch.getRecordBatchStatsContext(), + "configuredOutputSize: %d", getOutputBatchSize()); } private void reset() { @@ -321,7 +325,7 @@ public class ProjectMemoryManager extends RecordBatchMemoryManager { rowWidth, totalFixedWidthColumnWidth, totalVariableColumnWidth, totalComplexColumnWidth, (batchSizerEndTime - updateStartTime),(updateEndTime - updateStartTime), this, incomingBatch); - logger.debug("BATCH_STATS, incoming: {}", getRecordBatchSizer()); + RecordBatchStats.logRecordBatchStats(RecordBatchIOType.INPUT, getRecordBatchSizer(), outgoingBatch.getRecordBatchStatsContext()); updateIncomingStats(); } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java index 247f36f19..4d55f0034 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java @@ -55,13 +55,14 @@ import org.apache.drill.exec.record.AbstractSingleRecordBatch; import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode; import org.apache.drill.exec.record.MaterializedField; import org.apache.drill.exec.record.RecordBatch; -import org.apache.drill.exec.record.RecordBatchSizer; import org.apache.drill.exec.record.SimpleRecordBatch; import org.apache.drill.exec.record.TransferPair; import org.apache.drill.exec.record.TypedFieldId; import org.apache.drill.exec.record.VectorContainer; import org.apache.drill.exec.record.VectorWrapper; import org.apache.drill.exec.store.ColumnExplorer; +import org.apache.drill.exec.util.record.RecordBatchStats; +import org.apache.drill.exec.util.record.RecordBatchStats.RecordBatchIOType; import org.apache.drill.exec.vector.AllocationHelper; import org.apache.drill.exec.vector.FixedWidthVector; import org.apache.drill.exec.vector.UntypedNullHolder; @@ -252,7 +253,7 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project> { } memoryManager.updateOutgoingStats(outputRecords); - logger.debug("BATCH_STATS, outgoing: {}", new RecordBatchSizer(this)); + RecordBatchStats.logRecordBatchStats(RecordBatchIOType.OUTPUT, this, getRecordBatchStatsContext()); // Get the final outcome based on hasRemainder since that will determine if all the incoming records were // consumed in current output batch or not @@ -298,7 +299,7 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project> { } memoryManager.updateOutgoingStats(projRecords); - logger.debug("BATCH_STATS, outgoing: {}", new RecordBatchSizer(this)); + RecordBatchStats.logRecordBatchStats(RecordBatchIOType.OUTPUT, this, getRecordBatchStatsContext()); } public void addComplexWriter(final ComplexWriter writer) { diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionAllRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionAllRecordBatch.java index 8df83ee79..7e16d6a35 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionAllRecordBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionAllRecordBatch.java @@ -43,13 +43,14 @@ import org.apache.drill.exec.record.JoinBatchMemoryManager; import org.apache.drill.exec.record.MaterializedField; import org.apache.drill.exec.record.RecordBatch; import org.apache.drill.exec.record.RecordBatchMemoryManager; -import org.apache.drill.exec.record.RecordBatchSizer; import org.apache.drill.exec.record.TransferPair; import org.apache.drill.exec.record.TypedFieldId; import org.apache.drill.exec.record.VectorAccessibleUtilities; import org.apache.drill.exec.record.VectorWrapper; import org.apache.drill.exec.resolver.TypeCastRules; import org.apache.drill.exec.util.VectorUtil; +import org.apache.drill.exec.util.record.RecordBatchStats; +import org.apache.drill.exec.util.record.RecordBatchStats.RecordBatchIOType; import org.apache.drill.exec.vector.FixedWidthVector; import org.apache.drill.exec.vector.SchemaChangeCallBack; import org.apache.drill.exec.vector.ValueVector; @@ -76,7 +77,9 @@ public class UnionAllRecordBatch extends AbstractBinaryRecordBatch<UnionAll> { // get the output batch size from config. int configuredBatchSize = (int) context.getOptions().getOption(ExecConstants.OUTPUT_BATCH_SIZE_VALIDATOR); batchMemoryManager = new RecordBatchMemoryManager(numInputs, configuredBatchSize); - logger.debug("BATCH_STATS, configured output batch size: {}", configuredBatchSize); + + RecordBatchStats.logRecordBatchStats(getRecordBatchStatsContext(), + "configured output batch size: %d", configuredBatchSize); } @Override @@ -171,9 +174,7 @@ public class UnionAllRecordBatch extends AbstractBinaryRecordBatch<UnionAll> { batchStatus.recordsProcessed += recordCount; batchMemoryManager.updateOutgoingStats(recordCount); - if (logger.isDebugEnabled()) { - logger.debug("BATCH_STATS, outgoing: {}", new RecordBatchSizer(this)); - } + RecordBatchStats.logRecordBatchStats(RecordBatchIOType.OUTPUT, this, getRecordBatchStatsContext()); if (callBack.getSchemaChangedAndReset()) { return IterOutcome.OK_NEW_SCHEMA; @@ -368,8 +369,9 @@ public class UnionAllRecordBatch extends AbstractBinaryRecordBatch<UnionAll> { if (topStatus.prefetched) { topStatus.prefetched = false; batchMemoryManager.update(topStatus.batch, topStatus.inputIndex); - logger.debug("BATCH_STATS, incoming {}: {}", topStatus.inputIndex == 0 ? "left" : "right", - batchMemoryManager.getRecordBatchSizer(topStatus.inputIndex)); + RecordBatchStats.logRecordBatchStats(topStatus.inputIndex == 0 ? RecordBatchIOType.INPUT_LEFT : RecordBatchIOType.INPUT_RIGHT, + batchMemoryManager.getRecordBatchSizer(topStatus.inputIndex), + getRecordBatchStatsContext()); return Pair.of(topStatus.outcome, topStatus); } else { @@ -387,8 +389,9 @@ public class UnionAllRecordBatch extends AbstractBinaryRecordBatch<UnionAll> { topStatus.recordsProcessed = 0; topStatus.totalRecordsToProcess = topStatus.batch.getRecordCount(); batchMemoryManager.update(topStatus.batch, topStatus.inputIndex); - logger.debug("BATCH_STATS, incoming {}: {}", topStatus.inputIndex == 0 ? "left" : "right", - batchMemoryManager.getRecordBatchSizer(topStatus.inputIndex)); + RecordBatchStats.logRecordBatchStats(topStatus.inputIndex == 0 ? RecordBatchIOType.INPUT_LEFT : RecordBatchIOType.INPUT_RIGHT, + batchMemoryManager.getRecordBatchSizer(topStatus.inputIndex), + getRecordBatchStatsContext()); return Pair.of(outcome, topStatus); case OUT_OF_MEMORY: case STOP: @@ -421,19 +424,20 @@ public class UnionAllRecordBatch extends AbstractBinaryRecordBatch<UnionAll> { super.close(); updateBatchMemoryManagerStats(); - if (logger.isDebugEnabled()) { - logger.debug("BATCH_STATS, incoming aggregate left: batch count : {}, avg bytes : {}, avg row bytes : {}, record count : {}", - batchMemoryManager.getNumIncomingBatches(JoinBatchMemoryManager.LEFT_INDEX), batchMemoryManager.getAvgInputBatchSize(JoinBatchMemoryManager.LEFT_INDEX), - batchMemoryManager.getAvgInputRowWidth(JoinBatchMemoryManager.LEFT_INDEX), batchMemoryManager.getTotalInputRecords(JoinBatchMemoryManager.LEFT_INDEX)); + RecordBatchStats.logRecordBatchStats(getRecordBatchStatsContext(), + "incoming aggregate left: batch count : %d, avg bytes : %d, avg row bytes : %d, record count : %d", + batchMemoryManager.getNumIncomingBatches(JoinBatchMemoryManager.LEFT_INDEX), batchMemoryManager.getAvgInputBatchSize(JoinBatchMemoryManager.LEFT_INDEX), + batchMemoryManager.getAvgInputRowWidth(JoinBatchMemoryManager.LEFT_INDEX), batchMemoryManager.getTotalInputRecords(JoinBatchMemoryManager.LEFT_INDEX)); - logger.debug("BATCH_STATS, incoming aggregate right: batch count : {}, avg bytes : {}, avg row bytes : {}, record count : {}", - batchMemoryManager.getNumIncomingBatches(JoinBatchMemoryManager.RIGHT_INDEX), batchMemoryManager.getAvgInputBatchSize(JoinBatchMemoryManager.RIGHT_INDEX), - batchMemoryManager.getAvgInputRowWidth(JoinBatchMemoryManager.RIGHT_INDEX), batchMemoryManager.getTotalInputRecords(JoinBatchMemoryManager.RIGHT_INDEX)); + RecordBatchStats.logRecordBatchStats(getRecordBatchStatsContext(), + "incoming aggregate right: batch count : %d, avg bytes : %d, avg row bytes : %d, record count : %d", + batchMemoryManager.getNumIncomingBatches(JoinBatchMemoryManager.RIGHT_INDEX), batchMemoryManager.getAvgInputBatchSize(JoinBatchMemoryManager.RIGHT_INDEX), + batchMemoryManager.getAvgInputRowWidth(JoinBatchMemoryManager.RIGHT_INDEX), batchMemoryManager.getTotalInputRecords(JoinBatchMemoryManager.RIGHT_INDEX)); - logger.debug("BATCH_STATS, outgoing aggregate: batch count : {}, avg bytes : {}, avg row bytes : {}, record count : {}", - batchMemoryManager.getNumOutgoingBatches(), batchMemoryManager.getAvgOutputBatchSize(), - batchMemoryManager.getAvgOutputRowWidth(), batchMemoryManager.getTotalOutputRecords()); - } + RecordBatchStats.logRecordBatchStats(getRecordBatchStatsContext(), + "outgoing aggregate: batch count : %d, avg bytes : %d, avg row bytes : %d, record count : %d", + batchMemoryManager.getNumOutgoingBatches(), batchMemoryManager.getAvgOutputBatchSize(), + batchMemoryManager.getAvgOutputRowWidth(), batchMemoryManager.getTotalOutputRecords()); } } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unnest/UnnestRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unnest/UnnestRecordBatch.java index d58d242c7..5f6396731 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unnest/UnnestRecordBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unnest/UnnestRecordBatch.java @@ -38,6 +38,8 @@ import org.apache.drill.exec.record.RecordBatchSizer; import org.apache.drill.exec.record.TransferPair; import org.apache.drill.exec.record.TypedFieldId; import org.apache.drill.exec.record.VectorContainer; +import org.apache.drill.exec.util.record.RecordBatchStats; +import org.apache.drill.exec.util.record.RecordBatchStats.RecordBatchIOType; import org.apache.drill.exec.vector.IntVector; import org.apache.drill.exec.vector.ValueVector; import org.apache.drill.exec.vector.complex.RepeatedMapVector; @@ -121,10 +123,7 @@ public class UnnestRecordBatch extends AbstractTableFunctionRecordBatch<UnnestPO // Limit to lower bound of total number of rows possible for this batch // i.e. all rows fit within memory budget. setOutputRowCount(Math.min(columnSize.getElementCount(), getOutputRowCount())); - - if (logger.isDebugEnabled()) { - logger.debug("BATCH_STATS, incoming:\n {}", getRecordBatchSizer()); - } + RecordBatchStats.logRecordBatchStats(RecordBatchIOType.INPUT, getRecordBatchSizer(), getRecordBatchStatsContext()); updateIncomingStats(); } @@ -306,9 +305,7 @@ public class UnnestRecordBatch extends AbstractTableFunctionRecordBatch<UnnestPO // entire incoming recods has been unnested. If the entire records has been // unnested, we return EMIT and any blocking operators in the pipeline will // unblock. - if (logger.isDebugEnabled()) { - logger.debug("BATCH_STATS, outgoing:\n {}", new RecordBatchSizer(this)); - } + RecordBatchStats.logRecordBatchStats(RecordBatchIOType.OUTPUT, this, getRecordBatchStatsContext()); return hasRemainder ? IterOutcome.OK : IterOutcome.EMIT; } @@ -420,14 +417,15 @@ public class UnnestRecordBatch extends AbstractTableFunctionRecordBatch<UnnestPO stats.setLongStat(Metric.AVG_OUTPUT_ROW_BYTES, memoryManager.getAvgOutputRowWidth()); stats.setLongStat(Metric.OUTPUT_RECORD_COUNT, memoryManager.getTotalOutputRecords()); - logger.debug("BATCH_STATS, incoming aggregate: batch count : {}, avg batch bytes : {}, avg row bytes : {}, record count : {}", - memoryManager.getNumIncomingBatches(), memoryManager.getAvgInputBatchSize(), - memoryManager.getAvgInputRowWidth(), memoryManager.getTotalInputRecords()); - - logger.debug("BATCH_STATS, outgoing aggregate: batch count : {}, avg batch bytes : {}, avg row bytes : {}, record count : {}", - memoryManager.getNumOutgoingBatches(), memoryManager.getAvgOutputBatchSize(), - memoryManager.getAvgOutputRowWidth(), memoryManager.getTotalOutputRecords()); + RecordBatchStats.logRecordBatchStats(getRecordBatchStatsContext(), + "incoming aggregate: batch count : %d, avg batch bytes : %d, avg row bytes : %d, record count : %d", + memoryManager.getNumIncomingBatches(), memoryManager.getAvgInputBatchSize(), + memoryManager.getAvgInputRowWidth(), memoryManager.getTotalInputRecords()); + RecordBatchStats.logRecordBatchStats(getRecordBatchStatsContext(), + "outgoing aggregate: batch count : %d, avg batch bytes : %d, avg row bytes : %d, record count : %d", + memoryManager.getNumOutgoingBatches(), memoryManager.getAvgOutputBatchSize(), + memoryManager.getAvgOutputRowWidth(), memoryManager.getTotalOutputRecords()); } private TypedFieldId checkAndGetUnnestFieldId() throws SchemaChangeException { diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java index dfa1424ca..c38de2d2b 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java @@ -33,6 +33,7 @@ import org.apache.drill.exec.physical.impl.aggregate.SpilledRecordbatch; import org.apache.drill.exec.record.selection.SelectionVector2; import org.apache.drill.exec.record.selection.SelectionVector4; import org.apache.drill.exec.server.options.OptionValue; +import org.apache.drill.exec.util.record.RecordBatchStats.RecordBatchStatsContext; public abstract class AbstractRecordBatch<T extends PhysicalOperator> implements CloseableRecordBatch { private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(new Object() {}.getClass().getEnclosingClass()); @@ -41,6 +42,7 @@ public abstract class AbstractRecordBatch<T extends PhysicalOperator> implements protected final T popConfig; protected final FragmentContext context; protected final OperatorContext oContext; + protected final RecordBatchStatsContext batchStatsContext; protected final OperatorStats stats; protected final boolean unionTypeEnabled; protected BatchState state; @@ -58,6 +60,7 @@ public abstract class AbstractRecordBatch<T extends PhysicalOperator> implements this.context = context; this.popConfig = popConfig; this.oContext = oContext; + this.batchStatsContext = new RecordBatchStatsContext(context, oContext); stats = oContext.getStats(); container = new VectorContainer(this.oContext.getAllocator()); if (buildSchema) { @@ -241,4 +244,12 @@ public abstract class AbstractRecordBatch<T extends PhysicalOperator> implements public VectorContainer getContainer() { return container; } + + public RecordBatchStatsContext getRecordBatchStatsContext() { + return batchStatsContext; + } + + public boolean isRecordBatchStatsLoggingEnabled() { + return batchStatsContext.isEnableBatchSzLogging(); + } } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/batchsizing/OverflowSerDeUtil.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/batchsizing/OverflowSerDeUtil.java index 4a0e1e81e..14b31327c 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/batchsizing/OverflowSerDeUtil.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/batchsizing/OverflowSerDeUtil.java @@ -85,10 +85,8 @@ final class OverflowSerDeUtil { // Allocate the required memory to serialize the overflow fields final DrillBuf buffer = allocator.buffer(bufferLength); - if (batchStatsContext.isEnableBatchSzLogging()) { - final String msg = String.format("Allocated a buffer of length [%d] to handle overflow", bufferLength); - RecordBatchStats.logRecordBatchStats(msg, batchStatsContext); - } + RecordBatchStats.logRecordBatchStats(batchStatsContext, + "Allocated a buffer of length [%d] to handle overflow", bufferLength); // Create the result object final RecordOverflowContainer recordOverflowContainer = new RecordOverflowContainer(); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/util/record/RecordBatchStats.java b/exec/java-exec/src/main/java/org/apache/drill/exec/util/record/RecordBatchStats.java index 0b2424409..c61607ef5 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/util/record/RecordBatchStats.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/util/record/RecordBatchStats.java @@ -17,6 +17,7 @@ */ package org.apache.drill.exec.util.record; +import org.apache.drill.shaded.guava.com.google.common.base.Preconditions; import org.apache.drill.exec.ExecConstants; import org.apache.drill.exec.memory.BufferAllocator; import org.apache.drill.exec.ops.FragmentContext; @@ -134,25 +135,79 @@ public final class RecordBatchStats { } } + /** Indicates whether a record batch is Input or Output */ + public enum RecordBatchIOType { + INPUT ("incoming"), + INPUT_RIGHT ("incoming right"), + INPUT_LEFT ("incoming left"), + OUTPUT ("outgoing"), + PASSTHROUGH ("passthrough"); + + private final String ioTypeString; + + private RecordBatchIOType(String ioTypeString) { + this.ioTypeString = ioTypeString; + } + + /** + * @return IO Type string + */ + public String getIOTypeString() { + return ioTypeString; + } + } + + /** + * @see {@link RecordBatchStats#logRecordBatchStats(IOType, String, RecordBatchSizer, RecordBatchStatsContext)} + */ + public static void logRecordBatchStats(RecordBatchIOType ioType, + String sourceId, + RecordBatch recordBatch, + RecordBatchStatsContext batchStatsContext) { + + if (!batchStatsContext.isEnableBatchSzLogging()) { + return; // NOOP + } + + logRecordBatchStats(ioType, sourceId, new RecordBatchSizer(recordBatch), batchStatsContext); + } + + /** + * @see {@link RecordBatchStats#logRecordBatchStats(IOType, String, RecordBatchSizer, RecordBatchStatsContext)} + */ + public static void logRecordBatchStats(RecordBatchIOType ioType, + RecordBatch recordBatch, + RecordBatchStatsContext batchStatsContext) { + + if (!batchStatsContext.isEnableBatchSzLogging()) { + return; // NOOP + } + + logRecordBatchStats(ioType, null, new RecordBatchSizer(recordBatch), batchStatsContext); + } + /** - * @see {@link RecordBatchStats#logRecordBatchStats(String, RecordBatch, RecordBatchStatsContext)} + * @see {@link RecordBatchStats#logRecordBatchStats(RecordBatchIOType, String, RecordBatchSizer, RecordBatchStatsContext)} */ - public static void logRecordBatchStats(RecordBatch recordBatch, + public static void logRecordBatchStats(RecordBatchIOType ioType, + RecordBatchSizer recordBatchSizer, RecordBatchStatsContext batchStatsContext) { - logRecordBatchStats(null, recordBatch, batchStatsContext); + logRecordBatchStats(ioType, null, recordBatchSizer, batchStatsContext); } /** * Logs record batch statistics for the input record batch (logging happens only * when record statistics logging is enabled). * + * @param ioType whether a record batch is an input or/and output * @param sourceId optional source identifier for scanners - * @param recordBatch a set of records + * @param batchSizer contains batch sizing information * @param batchStatsContext batch stats context object */ - public static void logRecordBatchStats(String sourceId, - RecordBatch recordBatch, + public static void logRecordBatchStats(RecordBatchIOType ioType, + String sourceId, + RecordBatchSizer batchSizer, RecordBatchStatsContext batchStatsContext) { if (!batchStatsContext.isEnableBatchSzLogging()) { @@ -161,7 +216,7 @@ public final class RecordBatchStats { final String statsId = batchStatsContext.getContextOperatorId(); final boolean verbose = batchStatsContext.isEnableFgBatchSzLogging(); - final String msg = printRecordBatchStats(statsId, sourceId, recordBatch, verbose); + final String msg = printRecordBatchStats(statsId, ioType, sourceId, batchSizer, verbose); logBatchStatsMsg(batchStatsContext, msg, false); } @@ -170,7 +225,6 @@ public final class RecordBatchStats { * Logs a generic batch statistics message * * @param message log message - * @param batchStatsLogging * @param batchStatsContext batch stats context object */ public static void logRecordBatchStats(String message, @@ -184,6 +238,25 @@ public final class RecordBatchStats { } /** + * Logs a generic batch statistics message + * + * @param batchStatsContext batch stats context object + * @param format a string format as in {@link String#format} method + * @param args format's arguments + */ + public static void logRecordBatchStats(RecordBatchStatsContext batchStatsContext, + String format, + Object...args) { + + if (!batchStatsContext.isEnableBatchSzLogging()) { + return; // NOOP + } + + final String message = String.format(format, args); + logBatchStatsMsg(batchStatsContext, message, true); + } + + /** * @param allocator dumps allocator statistics * @return string with allocator statistics */ @@ -212,27 +285,30 @@ public final class RecordBatchStats { * Constructs record batch statistics for the input record batch * * @param stats instance identifier + * @param ioType whether a record batch is an input or/and output * @param sourceId optional source identifier for scanners - * @param recordBatch a set of records + * @param batchSizer contains batch sizing information * @param verbose whether to include fine-grained stats * * @return a string containing the record batch statistics */ private static String printRecordBatchStats(String statsId, + RecordBatchIOType ioType, String sourceId, - RecordBatch recordBatch, + RecordBatchSizer batchSizer, boolean verbose) { - final RecordBatchSizer batchSizer = new RecordBatchSizer(recordBatch); final StringBuilder msg = new StringBuilder(); msg.append(BATCH_STATS_PREFIX); - msg.append(" Originator: {"); + msg.append(" Operator: {"); msg.append(statsId); if (sourceId != null) { msg.append(':'); msg.append(sourceId); } + msg.append("}, IO Type: {"); + msg.append(toString(ioType)); msg.append("}, Batch size: {"); msg.append( " Records: " ); msg.append(batchSizer.rowCount()); @@ -269,7 +345,8 @@ public final class RecordBatchStats { boolean includePrefix) { if (includePrefix) { - msg = BATCH_STATS_PREFIX + '\t' + msg; + final String statsId = batchStatsContext.getContextOperatorId(); + msg = BATCH_STATS_PREFIX + " Operator: {" + statsId + "} " + msg; } if (batchStatsContext.useInfoLevelLogging()) { @@ -279,4 +356,19 @@ public final class RecordBatchStats { } } + private static String toString(RecordBatchIOType ioType) { + Preconditions.checkNotNull(ioType, "The record batch IO type cannot be null"); + + switch (ioType) { + case INPUT: return "incoming"; + case INPUT_RIGHT: return "incoming right"; + case INPUT_LEFT: return "incoming left"; + case OUTPUT: return "outgoing"; + case PASSTHROUGH: return "passthrough"; + + default: throw new RuntimeException("Unexpected record batch IO type.."); + } + + } + }
\ No newline at end of file |