aboutsummaryrefslogtreecommitdiff
path: root/exec/java-exec/src/main/java/org/apache/drill
diff options
context:
space:
mode:
authorSalim Achouche <sachouche2@gmail.com>2018-08-24 08:55:52 -0700
committerBen-Zvi <bben-zvi@mapr.com>2018-09-07 18:36:36 -0700
commit85ebae5f1b447d1ec60e062ab9e00da7f3d186fb (patch)
treea7e24c9f7d3204a71a080bb1b357ada506ce79aa /exec/java-exec/src/main/java/org/apache/drill
parentfa0d78d16eaf35d30d95613913a5613b2a82280d (diff)
DRILL-6709: Extended the batch stats utility to other operators
closes #1444
Diffstat (limited to 'exec/java-exec/src/main/java/org/apache/drill')
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java3
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java26
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java8
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenRecordBatch.java28
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java45
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/LateralJoinBatch.java57
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java48
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoinBatch.java32
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoinTemplate.java7
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectMemoryManager.java8
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java7
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionAllRecordBatch.java44
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unnest/UnnestRecordBatch.java26
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java11
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/batchsizing/OverflowSerDeUtil.java6
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/util/record/RecordBatchStats.java118
16 files changed, 300 insertions, 174 deletions
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
index cda98fc7a..cd24a4ca5 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
@@ -48,6 +48,7 @@ import org.apache.drill.exec.testing.ControlsInjector;
import org.apache.drill.exec.testing.ControlsInjectorFactory;
import org.apache.drill.exec.util.CallBack;
import org.apache.drill.exec.util.record.RecordBatchStats;
+import org.apache.drill.exec.util.record.RecordBatchStats.RecordBatchIOType;
import org.apache.drill.exec.util.record.RecordBatchStats.RecordBatchStatsContext;
import org.apache.drill.exec.vector.AllocationHelper;
import org.apache.drill.exec.vector.NullableVarCharVector;
@@ -302,7 +303,7 @@ public class ScanBatch implements CloseableRecordBatch {
return; // NOOP
}
- RecordBatchStats.logRecordBatchStats(getFQNForLogging(MAX_FQN_LENGTH), this, batchStatsContext);
+ RecordBatchStats.logRecordBatchStats(RecordBatchIOType.OUTPUT, getFQNForLogging(MAX_FQN_LENGTH), this, batchStatsContext);
}
/** Might truncate the FQN if too long */
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java
index 28f8263d1..9de9aae06 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java
@@ -60,6 +60,8 @@ import org.apache.drill.exec.record.TypedFieldId;
import org.apache.drill.exec.record.VectorWrapper;
import org.apache.drill.exec.record.selection.SelectionVector2;
import org.apache.drill.exec.record.selection.SelectionVector4;
+import org.apache.drill.exec.util.record.RecordBatchStats;
+import org.apache.drill.exec.util.record.RecordBatchStats.RecordBatchIOType;
import org.apache.drill.exec.vector.AllocationHelper;
import org.apache.drill.exec.vector.FixedWidthVector;
import org.apache.drill.exec.vector.ValueVector;
@@ -159,9 +161,7 @@ public class HashAggBatch extends AbstractRecordBatch<HashAggregate> {
}
updateIncomingStats();
- if (logger.isDebugEnabled()) {
- logger.debug("BATCH_STATS, incoming: {}", getRecordBatchSizer());
- }
+ RecordBatchStats.logRecordBatchStats(RecordBatchIOType.INPUT, getRecordBatchSizer(), getRecordBatchStatsContext());
}
}
@@ -204,7 +204,9 @@ public class HashAggBatch extends AbstractRecordBatch<HashAggregate> {
}
hashAggMemoryManager = new HashAggMemoryManager(configuredBatchSize);
- logger.debug("BATCH_STATS, configured output batch size: {}", configuredBatchSize);
+
+ RecordBatchStats.logRecordBatchStats(getRecordBatchStatsContext(),
+ "configured output batch size: %d", configuredBatchSize);
columnMapping = CaseInsensitiveMap.newHashMap();
}
@@ -474,15 +476,15 @@ public class HashAggBatch extends AbstractRecordBatch<HashAggregate> {
stats.setLongStat(HashAggTemplate.Metric.AVG_OUTPUT_ROW_BYTES, hashAggMemoryManager.getAvgOutputRowWidth());
stats.setLongStat(HashAggTemplate.Metric.OUTPUT_RECORD_COUNT, hashAggMemoryManager.getTotalOutputRecords());
- if (logger.isDebugEnabled()) {
- logger.debug("BATCH_STATS, incoming aggregate: count : {}, avg bytes : {}, avg row bytes : {}, record count : {}",
- hashAggMemoryManager.getNumIncomingBatches(), hashAggMemoryManager.getAvgInputBatchSize(),
- hashAggMemoryManager.getAvgInputRowWidth(), hashAggMemoryManager.getTotalInputRecords());
+ RecordBatchStats.logRecordBatchStats(getRecordBatchStatsContext(),
+ "incoming aggregate: count : %d, avg bytes : %d, avg row bytes : %d, record count : %d",
+ hashAggMemoryManager.getNumIncomingBatches(), hashAggMemoryManager.getAvgInputBatchSize(),
+ hashAggMemoryManager.getAvgInputRowWidth(), hashAggMemoryManager.getTotalInputRecords());
- logger.debug("BATCH_STATS, outgoing aggregate: count : {}, avg bytes : {}, avg row bytes : {}, record count : {}",
- hashAggMemoryManager.getNumOutgoingBatches(), hashAggMemoryManager.getAvgOutputBatchSize(),
- hashAggMemoryManager.getAvgOutputRowWidth(), hashAggMemoryManager.getTotalOutputRecords());
- }
+ RecordBatchStats.logRecordBatchStats(getRecordBatchStatsContext(),
+ "outgoing aggregate: count : %d, avg bytes : %d, avg row bytes : %d, record count : %d",
+ hashAggMemoryManager.getNumOutgoingBatches(), hashAggMemoryManager.getAvgOutputBatchSize(),
+ hashAggMemoryManager.getAvgOutputRowWidth(), hashAggMemoryManager.getTotalOutputRecords());
}
@Override
public void close() {
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java
index 65ca82972..e8ae30e6f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java
@@ -70,7 +70,8 @@ import org.apache.drill.exec.record.TypedFieldId;
import org.apache.drill.exec.record.RecordBatch.IterOutcome;
import org.apache.drill.exec.record.VectorWrapper;
import org.apache.drill.exec.record.WritableBatch;
-
+import org.apache.drill.exec.util.record.RecordBatchStats;
+import org.apache.drill.exec.util.record.RecordBatchStats.RecordBatchIOType;
import org.apache.drill.exec.vector.AllocationHelper;
import org.apache.drill.exec.vector.FixedWidthVector;
@@ -1223,10 +1224,7 @@ public abstract class HashAggTemplate implements HashAggregator {
}
outgoing.getRecordBatchMemoryManager().updateOutgoingStats(numOutputRecords);
-
- if (logger.isDebugEnabled()) {
- logger.debug("BATCH_STATS, outgoing: {}", new RecordBatchSizer(outgoing));
- }
+ RecordBatchStats.logRecordBatchStats(RecordBatchIOType.OUTPUT, outgoing, outgoing.getRecordBatchStatsContext());
this.outcome = IterOutcome.OK;
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenRecordBatch.java
index d634fb24a..1623319c3 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenRecordBatch.java
@@ -52,6 +52,8 @@ import org.apache.drill.exec.record.TransferPair;
import org.apache.drill.exec.record.TypedFieldId;
import org.apache.drill.exec.record.VectorContainer;
import org.apache.drill.exec.record.VectorWrapper;
+import org.apache.drill.exec.util.record.RecordBatchStats;
+import org.apache.drill.exec.util.record.RecordBatchStats.RecordBatchIOType;
import org.apache.drill.exec.vector.ValueVector;
import org.apache.drill.exec.vector.complex.RepeatedMapVector;
import org.apache.drill.exec.vector.complex.RepeatedValueVector;
@@ -157,7 +159,7 @@ public class FlattenRecordBatch extends AbstractSingleRecordBatch<FlattenPOP> {
// i.e. all rows fit within memory budget.
setOutputRowCount(Math.min(columnSize.getElementCount(), getOutputRowCount()));
- logger.debug("BATCH_STATS, incoming: {}", getRecordBatchSizer());
+ RecordBatchStats.logRecordBatchStats(RecordBatchIOType.INPUT, getRecordBatchSizer(), getRecordBatchStatsContext());
updateIncomingStats();
}
@@ -170,7 +172,8 @@ public class FlattenRecordBatch extends AbstractSingleRecordBatch<FlattenPOP> {
int configuredBatchSize = (int) context.getOptions().getOption(ExecConstants.OUTPUT_BATCH_SIZE_VALIDATOR);
flattenMemoryManager = new FlattenMemoryManager(configuredBatchSize);
- logger.debug("BATCH_STATS, configured output batch size: {}", configuredBatchSize);
+ RecordBatchStats.logRecordBatchStats(getRecordBatchStatsContext(),
+ "configured output batch size: %d", configuredBatchSize);
}
@Override
@@ -261,10 +264,7 @@ public class FlattenRecordBatch extends AbstractSingleRecordBatch<FlattenPOP> {
}
flattenMemoryManager.updateOutgoingStats(outputRecords);
-
- if (logger.isDebugEnabled()) {
- logger.debug("BATCH_STATS, outgoing: {}", new RecordBatchSizer(this));
- }
+ RecordBatchStats.logRecordBatchStats(RecordBatchIOType.OUTPUT, this, getRecordBatchStatsContext());
// Get the final outcome based on hasRemainder since that will determine if all the incoming records were
// consumed in current output batch or not
@@ -516,15 +516,15 @@ public class FlattenRecordBatch extends AbstractSingleRecordBatch<FlattenPOP> {
stats.setLongStat(Metric.AVG_OUTPUT_ROW_BYTES, flattenMemoryManager.getAvgOutputRowWidth());
stats.setLongStat(Metric.OUTPUT_RECORD_COUNT, flattenMemoryManager.getTotalOutputRecords());
- if (logger.isDebugEnabled()) {
- logger.debug("BATCH_STATS, incoming aggregate: count : {}, avg bytes : {}, avg row bytes : {}, record count : {}",
- flattenMemoryManager.getNumIncomingBatches(), flattenMemoryManager.getAvgInputBatchSize(),
- flattenMemoryManager.getAvgInputRowWidth(), flattenMemoryManager.getTotalInputRecords());
+ RecordBatchStats.logRecordBatchStats(getRecordBatchStatsContext(),
+ "incoming aggregate: count : %d, avg bytes : %d, avg row bytes : %d, record count : %d",
+ flattenMemoryManager.getNumIncomingBatches(), flattenMemoryManager.getAvgInputBatchSize(),
+ flattenMemoryManager.getAvgInputRowWidth(), flattenMemoryManager.getTotalInputRecords());
- logger.debug("BATCH_STATS, outgoing aggregate: count : {}, avg bytes : {}, avg row bytes : {}, record count : {}",
- flattenMemoryManager.getNumOutgoingBatches(), flattenMemoryManager.getAvgOutputBatchSize(),
- flattenMemoryManager.getAvgOutputRowWidth(), flattenMemoryManager.getTotalOutputRecords());
- }
+ RecordBatchStats.logRecordBatchStats(getRecordBatchStatsContext(),
+ "outgoing aggregate: count : %d, avg bytes : %d, avg row bytes : %d, record count : %d",
+ flattenMemoryManager.getNumOutgoingBatches(), flattenMemoryManager.getAvgOutputBatchSize(),
+ flattenMemoryManager.getAvgOutputRowWidth(), flattenMemoryManager.getTotalOutputRecords());
}
@Override
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
index dc40b24bc..368bb5dc9 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
@@ -68,9 +68,10 @@ import org.apache.drill.exec.record.BatchSchema;
import org.apache.drill.exec.record.JoinBatchMemoryManager;
import org.apache.drill.exec.record.MaterializedField;
import org.apache.drill.exec.record.RecordBatch;
-import org.apache.drill.exec.record.RecordBatchSizer;
import org.apache.drill.exec.record.TypedFieldId;
import org.apache.drill.exec.record.VectorWrapper;
+import org.apache.drill.exec.util.record.RecordBatchStats;
+import org.apache.drill.exec.util.record.RecordBatchStats.RecordBatchIOType;
import org.apache.drill.exec.vector.IntVector;
import org.apache.drill.exec.vector.ValueVector;
import org.apache.drill.exec.vector.complex.AbstractContainerVector;
@@ -291,7 +292,7 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> {
buildBatch,
() -> {
batchMemoryManager.update(RIGHT_INDEX, 0, true);
- logger.debug("BATCH_STATS, incoming right: {}", batchMemoryManager.getRecordBatchSizer(RIGHT_INDEX));
+ RecordBatchStats.logRecordBatchStats(RecordBatchIOType.INPUT_RIGHT, batchMemoryManager.getRecordBatchSizer(RIGHT_INDEX), getRecordBatchStatsContext());
});
}
@@ -306,7 +307,7 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> {
probeBatch,
() -> {
batchMemoryManager.update(LEFT_INDEX, 0);
- logger.debug("BATCH_STATS, incoming left: {}", batchMemoryManager.getRecordBatchSizer(LEFT_INDEX));
+ RecordBatchStats.logRecordBatchStats(RecordBatchIOType.INPUT_LEFT, batchMemoryManager.getRecordBatchSizer(LEFT_INDEX), getRecordBatchStatsContext());
});
}
@@ -488,9 +489,7 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> {
container.setRecordCount(outputRecords);
batchMemoryManager.updateOutgoingStats(outputRecords);
- if (logger.isDebugEnabled()) {
- logger.debug("BATCH_STATS, outgoing: {}", new RecordBatchSizer(this));
- }
+ RecordBatchStats.logRecordBatchStats(RecordBatchIOType.OUTPUT, this, getRecordBatchStatsContext());
/* We are here because of one the following
* 1. Completed processing of all the records and we are done
@@ -1121,12 +1120,17 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> {
final int configuredBatchSize = (int) context.getOptions().getOption(ExecConstants.OUTPUT_BATCH_SIZE_VALIDATOR);
final double avail_mem_factor = (double) context.getOptions().getOption(ExecConstants.OUTPUT_BATCH_SIZE_AVAIL_MEM_FACTOR_VALIDATOR);
int outputBatchSize = Math.min(configuredBatchSize, Integer.highestOneBit((int)(allocator.getLimit() * avail_mem_factor)));
- logger.debug("BATCH_STATS, configured output batch size: {}, allocated memory {}, avail mem factor {}, output batch size: {}",
- configuredBatchSize, allocator.getLimit(), avail_mem_factor, outputBatchSize);
+ RecordBatchStats.logRecordBatchStats(getRecordBatchStatsContext(),
+ "configured output batch size: %d, allocated memory %d, avail mem factor %f, output batch size: %d",
+ configuredBatchSize, allocator.getLimit(), avail_mem_factor, outputBatchSize);
batchMemoryManager = new JoinBatchMemoryManager(outputBatchSize, left, right, new HashSet<>());
- logger.debug("BATCH_STATS, configured output batch size: {}", configuredBatchSize);
+
+
+ RecordBatchStats.logRecordBatchStats(getRecordBatchStatsContext(),
+ "configured output batch size: %d", configuredBatchSize);
+
enableRuntimeFilter = context.getOptions().getOption(ExecConstants.HASHJOIN_ENABLE_RUNTIME_FILTER);
}
@@ -1242,19 +1246,20 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> {
updateMetrics();
- if (logger.isDebugEnabled()) {
- logger.debug("BATCH_STATS, incoming aggregate left: batch count : {}, avg bytes : {}, avg row bytes : {}, record count : {}",
- batchMemoryManager.getNumIncomingBatches(JoinBatchMemoryManager.LEFT_INDEX), batchMemoryManager.getAvgInputBatchSize(JoinBatchMemoryManager.LEFT_INDEX),
- batchMemoryManager.getAvgInputRowWidth(JoinBatchMemoryManager.LEFT_INDEX), batchMemoryManager.getTotalInputRecords(JoinBatchMemoryManager.LEFT_INDEX));
+ RecordBatchStats.logRecordBatchStats(getRecordBatchStatsContext(),
+ "incoming aggregate left: batch count : %d, avg bytes : %d, avg row bytes : %d, record count : %d",
+ batchMemoryManager.getNumIncomingBatches(JoinBatchMemoryManager.LEFT_INDEX), batchMemoryManager.getAvgInputBatchSize(JoinBatchMemoryManager.LEFT_INDEX),
+ batchMemoryManager.getAvgInputRowWidth(JoinBatchMemoryManager.LEFT_INDEX), batchMemoryManager.getTotalInputRecords(JoinBatchMemoryManager.LEFT_INDEX));
- logger.debug("BATCH_STATS, incoming aggregate right: batch count : {}, avg bytes : {}, avg row bytes : {}, record count : {}",
- batchMemoryManager.getNumIncomingBatches(JoinBatchMemoryManager.RIGHT_INDEX), batchMemoryManager.getAvgInputBatchSize(JoinBatchMemoryManager.RIGHT_INDEX),
- batchMemoryManager.getAvgInputRowWidth(JoinBatchMemoryManager.RIGHT_INDEX), batchMemoryManager.getTotalInputRecords(JoinBatchMemoryManager.RIGHT_INDEX));
+ RecordBatchStats.logRecordBatchStats(getRecordBatchStatsContext(),
+ "incoming aggregate right: batch count : %d, avg bytes : %d, avg row bytes : %d, record count : %d",
+ batchMemoryManager.getNumIncomingBatches(JoinBatchMemoryManager.RIGHT_INDEX), batchMemoryManager.getAvgInputBatchSize(JoinBatchMemoryManager.RIGHT_INDEX),
+ batchMemoryManager.getAvgInputRowWidth(JoinBatchMemoryManager.RIGHT_INDEX), batchMemoryManager.getTotalInputRecords(JoinBatchMemoryManager.RIGHT_INDEX));
- logger.debug("BATCH_STATS, outgoing aggregate: batch count : {}, avg bytes : {}, avg row bytes : {}, record count : {}",
- batchMemoryManager.getNumOutgoingBatches(), batchMemoryManager.getAvgOutputBatchSize(),
- batchMemoryManager.getAvgOutputRowWidth(), batchMemoryManager.getTotalOutputRecords());
- }
+ RecordBatchStats.logRecordBatchStats(getRecordBatchStatsContext(),
+ "outgoing aggregate: batch count : %d, avg bytes : %d, avg row bytes : %d, record count : %d",
+ batchMemoryManager.getNumOutgoingBatches(), batchMemoryManager.getAvgOutputBatchSize(),
+ batchMemoryManager.getAvgOutputRowWidth(), batchMemoryManager.getTotalOutputRecords());
this.cleanup();
super.close();
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/LateralJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/LateralJoinBatch.java
index 5b7fd9cc6..1aaf5e202 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/LateralJoinBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/LateralJoinBatch.java
@@ -39,6 +39,8 @@ import org.apache.drill.exec.record.RecordBatchSizer;
import org.apache.drill.exec.record.SchemaBuilder;
import org.apache.drill.exec.record.VectorAccessibleUtilities;
import org.apache.drill.exec.record.VectorWrapper;
+import org.apache.drill.exec.util.record.RecordBatchStats;
+import org.apache.drill.exec.util.record.RecordBatchStats.RecordBatchIOType;
import org.apache.drill.exec.vector.IntVector;
import org.apache.drill.exec.vector.ValueVector;
@@ -240,25 +242,26 @@ public class LateralJoinBatch extends AbstractBinaryRecordBatch<LateralJoinPOP>
public void close() {
updateBatchMemoryManagerStats();
- if (logger.isDebugEnabled()) {
- logger.debug("BATCH_STATS, incoming aggregate left: batch count : {}, avg bytes : {}, avg row bytes : {}, " +
- "record count : {}", batchMemoryManager.getNumIncomingBatches(JoinBatchMemoryManager.LEFT_INDEX),
- batchMemoryManager.getAvgInputBatchSize(JoinBatchMemoryManager.LEFT_INDEX),
- batchMemoryManager.getAvgInputRowWidth(JoinBatchMemoryManager.LEFT_INDEX),
- batchMemoryManager.getTotalInputRecords(JoinBatchMemoryManager.LEFT_INDEX));
-
- logger.debug("BATCH_STATS, incoming aggregate right: batch count : {}, avg bytes : {}, avg row bytes : {}, " +
- "record count : {}", batchMemoryManager.getNumIncomingBatches(JoinBatchMemoryManager.RIGHT_INDEX),
- batchMemoryManager.getAvgInputBatchSize(JoinBatchMemoryManager.RIGHT_INDEX),
- batchMemoryManager.getAvgInputRowWidth(JoinBatchMemoryManager.RIGHT_INDEX),
- batchMemoryManager.getTotalInputRecords(JoinBatchMemoryManager.RIGHT_INDEX));
-
- logger.debug("BATCH_STATS, outgoing aggregate: batch count : {}, avg bytes : {}, avg row bytes : {}, " +
- "record count : {}", batchMemoryManager.getNumOutgoingBatches(),
- batchMemoryManager.getAvgOutputBatchSize(),
- batchMemoryManager.getAvgOutputRowWidth(),
- batchMemoryManager.getTotalOutputRecords());
- }
+ RecordBatchStats.logRecordBatchStats(getRecordBatchStatsContext(),
+ "incoming aggregate left: batch count : %d, avg bytes : %d, avg row bytes : %d, " +
+ "record count : %d", batchMemoryManager.getNumIncomingBatches(JoinBatchMemoryManager.LEFT_INDEX),
+ batchMemoryManager.getAvgInputBatchSize(JoinBatchMemoryManager.LEFT_INDEX),
+ batchMemoryManager.getAvgInputRowWidth(JoinBatchMemoryManager.LEFT_INDEX),
+ batchMemoryManager.getTotalInputRecords(JoinBatchMemoryManager.LEFT_INDEX));
+
+ RecordBatchStats.logRecordBatchStats(getRecordBatchStatsContext(),
+ "incoming aggregate right: batch count : %d, avg bytes : %d, avg row bytes : %d, " +
+ "record count : %d", batchMemoryManager.getNumIncomingBatches(JoinBatchMemoryManager.RIGHT_INDEX),
+ batchMemoryManager.getAvgInputBatchSize(JoinBatchMemoryManager.RIGHT_INDEX),
+ batchMemoryManager.getAvgInputRowWidth(JoinBatchMemoryManager.RIGHT_INDEX),
+ batchMemoryManager.getTotalInputRecords(JoinBatchMemoryManager.RIGHT_INDEX));
+
+ RecordBatchStats.logRecordBatchStats(getRecordBatchStatsContext(),
+ "outgoing aggregate: batch count : %d, avg bytes : %d, avg row bytes : %d, " +
+ "record count : %d", batchMemoryManager.getNumOutgoingBatches(),
+ batchMemoryManager.getAvgOutputBatchSize(),
+ batchMemoryManager.getAvgOutputRowWidth(),
+ batchMemoryManager.getTotalOutputRecords());
super.close();
}
@@ -733,11 +736,10 @@ public class LateralJoinBatch extends AbstractBinaryRecordBatch<LateralJoinPOP>
batchMemoryManager.updateOutgoingStats(outputIndex);
- if (logger.isDebugEnabled()) {
- logger.debug("BATCH_STATS, outgoing:\n {}", new RecordBatchSizer(this));
- logger.debug("Number of records emitted: {} and Allocator Stats: [AllocatedMem: {}, PeakMem: {}]",
+ RecordBatchStats.logRecordBatchStats(RecordBatchIOType.OUTPUT, this, getRecordBatchStatsContext());
+ RecordBatchStats.logRecordBatchStats(getRecordBatchStatsContext(),
+ "Number of records emitted: %d and Allocator Stats: [AllocatedMem: %d, PeakMem: %d]",
outputIndex, container.getAllocator().getAllocatedMemory(), container.getAllocator().getPeakMemoryAllocation());
- }
// Update the output index for next output batch to zero
outputIndex = 0;
@@ -1182,10 +1184,11 @@ public class LateralJoinBatch extends AbstractBinaryRecordBatch<LateralJoinPOP>
// a new output batch with new incoming then it will not cause any problem since outputIndex will be 0
final int newOutputRowCount = batchMemoryManager.update(inputIndex, outputIndex);
- if (logger.isDebugEnabled()) {
- logger.debug("BATCH_STATS, incoming {}:\n {}", inputIndex == LEFT_INDEX ? "left" : "right",
- batchMemoryManager.getRecordBatchSizer(inputIndex));
- logger.debug("Previous OutputRowCount: {}, New OutputRowCount: {}", maxOutputRowCount, newOutputRowCount);
+ if (isRecordBatchStatsLoggingEnabled()) {
+ RecordBatchIOType type = inputIndex == LEFT_INDEX ? RecordBatchIOType.INPUT_LEFT : RecordBatchIOType.INPUT_RIGHT;
+ RecordBatchStats.logRecordBatchStats(type, batchMemoryManager.getRecordBatchSizer(inputIndex), getRecordBatchStatsContext());
+ RecordBatchStats.logRecordBatchStats(getRecordBatchStatsContext(),
+ "Previous OutputRowCount: %d, New OutputRowCount: %d", maxOutputRowCount, newOutputRowCount);
}
if (useMemoryManager) {
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java
index 7b14e03ec..72f776a55 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java
@@ -58,6 +58,8 @@ import org.apache.drill.exec.record.TypedFieldId;
import org.apache.drill.exec.record.VectorAccessible;
import org.apache.drill.exec.record.VectorContainer;
import org.apache.drill.exec.record.VectorWrapper;
+import org.apache.drill.exec.util.record.RecordBatchStats;
+import org.apache.drill.exec.util.record.RecordBatchStats.RecordBatchIOType;
import org.apache.drill.exec.vector.ValueVector;
import org.apache.drill.exec.vector.complex.AbstractContainerVector;
@@ -123,7 +125,8 @@ public class MergeJoinBatch extends AbstractBinaryRecordBatch<MergeJoinPOP> {
@Override
public void update(int inputIndex) {
status.setTargetOutputRowCount(super.update(inputIndex, status.getOutPosition()));
- logger.debug("BATCH_STATS, incoming {}: {}", inputIndex == 0 ? "left" : "right", getRecordBatchSizer(inputIndex));
+ RecordBatchIOType type = inputIndex == 0 ? RecordBatchIOType.INPUT_LEFT : RecordBatchIOType.INPUT_RIGHT;
+ RecordBatchStats.logRecordBatchStats(type, getRecordBatchSizer(inputIndex), getRecordBatchStatsContext());
}
}
@@ -134,7 +137,8 @@ public class MergeJoinBatch extends AbstractBinaryRecordBatch<MergeJoinPOP> {
final int configuredBatchSize = (int) context.getOptions().getOption(ExecConstants.OUTPUT_BATCH_SIZE_VALIDATOR);
batchMemoryManager = new MergeJoinMemoryManager(configuredBatchSize, left, right);
- logger.debug("BATCH_STATS, configured output batch size: {}", configuredBatchSize);
+ RecordBatchStats.logRecordBatchStats(getRecordBatchStatsContext(),
+ "configured output batch size: %d", configuredBatchSize);
if (popConfig.getConditions().size() == 0) {
throw new UnsupportedOperationException("Merge Join currently does not support cartesian join. This join operator was configured with 0 conditions");
@@ -271,10 +275,7 @@ public class MergeJoinBatch extends AbstractBinaryRecordBatch<MergeJoinPOP> {
vw.getValueVector().getMutator().setValueCount(getRecordCount());
}
- if (logger.isDebugEnabled()) {
- logger.debug("BATCH_STATS, outgoing: {}", new RecordBatchSizer(this));
- }
-
+ RecordBatchStats.logRecordBatchStats(RecordBatchIOType.OUTPUT, this, getRecordBatchStatsContext());
batchMemoryManager.updateOutgoingStats(getRecordCount());
}
@@ -282,23 +283,24 @@ public class MergeJoinBatch extends AbstractBinaryRecordBatch<MergeJoinPOP> {
public void close() {
updateBatchMemoryManagerStats();
- if (logger.isDebugEnabled()) {
- logger.debug("BATCH_STATS, incoming aggregate left: batch count : {}, avg bytes : {}, avg row bytes : {}, record count : {}",
- batchMemoryManager.getNumIncomingBatches(JoinBatchMemoryManager.LEFT_INDEX),
- batchMemoryManager.getAvgInputBatchSize(JoinBatchMemoryManager.LEFT_INDEX),
- batchMemoryManager.getAvgInputRowWidth(JoinBatchMemoryManager.LEFT_INDEX),
- batchMemoryManager.getTotalInputRecords(JoinBatchMemoryManager.LEFT_INDEX));
-
- logger.debug("BATCH_STATS, incoming aggregate right: batch count : {}, avg bytes : {}, avg row bytes : {}, record count : {}",
- batchMemoryManager.getNumIncomingBatches(JoinBatchMemoryManager.RIGHT_INDEX),
- batchMemoryManager.getAvgInputBatchSize(JoinBatchMemoryManager.RIGHT_INDEX),
- batchMemoryManager.getAvgInputRowWidth(JoinBatchMemoryManager.RIGHT_INDEX),
- batchMemoryManager.getTotalInputRecords(JoinBatchMemoryManager.RIGHT_INDEX));
-
- logger.debug("BATCH_STATS, outgoing aggregate: batch count : {}, avg bytes : {}, avg row bytes : {}, record count : {}",
- batchMemoryManager.getNumOutgoingBatches(), batchMemoryManager.getAvgOutputBatchSize(),
- batchMemoryManager.getAvgOutputRowWidth(), batchMemoryManager.getTotalOutputRecords());
- }
+ RecordBatchStats.logRecordBatchStats(getRecordBatchStatsContext(),
+ "incoming aggregate left: batch count : %d, avg bytes : %d, avg row bytes : %d, record count : %d",
+ batchMemoryManager.getNumIncomingBatches(JoinBatchMemoryManager.LEFT_INDEX),
+ batchMemoryManager.getAvgInputBatchSize(JoinBatchMemoryManager.LEFT_INDEX),
+ batchMemoryManager.getAvgInputRowWidth(JoinBatchMemoryManager.LEFT_INDEX),
+ batchMemoryManager.getTotalInputRecords(JoinBatchMemoryManager.LEFT_INDEX));
+
+ RecordBatchStats.logRecordBatchStats(getRecordBatchStatsContext(),
+ "incoming aggregate right: batch count : %d, avg bytes : %d, avg row bytes : %d, record count : %d",
+ batchMemoryManager.getNumIncomingBatches(JoinBatchMemoryManager.RIGHT_INDEX),
+ batchMemoryManager.getAvgInputBatchSize(JoinBatchMemoryManager.RIGHT_INDEX),
+ batchMemoryManager.getAvgInputRowWidth(JoinBatchMemoryManager.RIGHT_INDEX),
+ batchMemoryManager.getTotalInputRecords(JoinBatchMemoryManager.RIGHT_INDEX));
+
+ RecordBatchStats.logRecordBatchStats(getRecordBatchStatsContext(),
+ "outgoing aggregate: batch count : %d, avg bytes : %d, avg row bytes : %d, record count : %d",
+ batchMemoryManager.getNumOutgoingBatches(), batchMemoryManager.getAvgOutputBatchSize(),
+ batchMemoryManager.getAvgOutputRowWidth(), batchMemoryManager.getTotalOutputRecords());
super.close();
leftIterator.close();
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoinBatch.java
index 8054d7fef..e2f93ecf2 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoinBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoinBatch.java
@@ -52,8 +52,9 @@ import org.apache.drill.exec.record.RecordBatch;
import org.apache.drill.exec.record.TypedFieldId;
import org.apache.drill.exec.record.VectorAccessible;
import org.apache.drill.exec.record.VectorWrapper;
+import org.apache.drill.exec.util.record.RecordBatchStats;
+import org.apache.drill.exec.util.record.RecordBatchStats.RecordBatchIOType;
import org.apache.drill.exec.record.JoinBatchMemoryManager;
-import org.apache.drill.exec.record.RecordBatchSizer;
import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
import com.sun.codemodel.JExpr;
import com.sun.codemodel.JExpression;
@@ -133,7 +134,9 @@ public class NestedLoopJoinBatch extends AbstractBinaryRecordBatch<NestedLoopJoi
// get the output batch size from config.
int configuredBatchSize = (int) context.getOptions().getOption(ExecConstants.OUTPUT_BATCH_SIZE_VALIDATOR);
batchMemoryManager = new JoinBatchMemoryManager(configuredBatchSize, left, right, new HashSet<>());
- logger.debug("BATCH_STATS, configured output batch size: {}", configuredBatchSize);
+
+ RecordBatchStats.logRecordBatchStats(getRecordBatchStatsContext(),
+ "configured output batch size: %d", configuredBatchSize);
}
/**
@@ -168,7 +171,8 @@ public class NestedLoopJoinBatch extends AbstractBinaryRecordBatch<NestedLoopJoi
case OK:
// For right side, use aggregate i.e. average row width across batches
batchMemoryManager.update(RIGHT_INDEX, 0, true);
- logger.debug("BATCH_STATS, incoming right: {}", batchMemoryManager.getRecordBatchSizer(RIGHT_INDEX));
+ RecordBatchStats.logRecordBatchStats(RecordBatchIOType.INPUT_RIGHT,
+ batchMemoryManager.getRecordBatchSizer(RIGHT_INDEX), getRecordBatchStatsContext());
addBatchToHyperContainer(right);
break;
case OUT_OF_MEMORY:
@@ -202,10 +206,7 @@ public class NestedLoopJoinBatch extends AbstractBinaryRecordBatch<NestedLoopJoi
container.setRecordCount(outputRecords);
container.buildSchema(BatchSchema.SelectionVectorMode.NONE);
- if (logger.isDebugEnabled()) {
- logger.debug("BATCH_STATS, outgoing: {}", new RecordBatchSizer(this));
- }
-
+ RecordBatchStats.logRecordBatchStats(RecordBatchIOType.OUTPUT, this, getRecordBatchStatsContext());
logger.debug("Number of records emitted: " + outputRecords);
return (outputRecords > 0) ? IterOutcome.OK : IterOutcome.NONE;
@@ -357,7 +358,8 @@ public class NestedLoopJoinBatch extends AbstractBinaryRecordBatch<NestedLoopJoi
}
batchMemoryManager.update(RIGHT_INDEX, 0, true);
- logger.debug("BATCH_STATS, incoming right: {}", batchMemoryManager.getRecordBatchSizer(RIGHT_INDEX));
+ RecordBatchStats.logRecordBatchStats(RecordBatchIOType.INPUT_RIGHT,
+ batchMemoryManager.getRecordBatchSizer(RIGHT_INDEX), getRecordBatchStatsContext());
if (leftUpstream != IterOutcome.NONE) {
leftSchema = left.getSchema();
@@ -395,7 +397,8 @@ public class NestedLoopJoinBatch extends AbstractBinaryRecordBatch<NestedLoopJoi
}
batchMemoryManager.update(LEFT_INDEX, 0);
- logger.debug("BATCH_STATS, incoming left: {}", batchMemoryManager.getRecordBatchSizer(LEFT_INDEX));
+ RecordBatchStats.logRecordBatchStats(RecordBatchIOType.INPUT_LEFT,
+ batchMemoryManager.getRecordBatchSizer(LEFT_INDEX), getRecordBatchStatsContext());
container.buildSchema(BatchSchema.SelectionVectorMode.NONE);
@@ -422,23 +425,24 @@ public class NestedLoopJoinBatch extends AbstractBinaryRecordBatch<NestedLoopJoi
public void close() {
updateBatchMemoryManagerStats();
- if (logger.isDebugEnabled()) {
- logger.debug("BATCH_STATS, incoming aggregate left: batch count : {}, avg bytes : {}, avg row bytes : {}, record count : {}",
+ RecordBatchStats.logRecordBatchStats(getRecordBatchStatsContext(),
+ "incoming aggregate left: batch count : %d, avg bytes : %d, avg row bytes : %d, record count : %d",
batchMemoryManager.getNumIncomingBatches(JoinBatchMemoryManager.LEFT_INDEX),
batchMemoryManager.getAvgInputBatchSize(JoinBatchMemoryManager.LEFT_INDEX),
batchMemoryManager.getAvgInputRowWidth(JoinBatchMemoryManager.LEFT_INDEX),
batchMemoryManager.getTotalInputRecords(JoinBatchMemoryManager.LEFT_INDEX));
- logger.debug("BATCH_STATS, incoming aggregate right: batch count : {}, avg bytes : {}, avg row bytes : {}, record count : {}",
+ RecordBatchStats.logRecordBatchStats(getRecordBatchStatsContext(),
+ "incoming aggregate right: batch count : %d, avg bytes : %d, avg row bytes : %d, record count : %d",
batchMemoryManager.getNumIncomingBatches(JoinBatchMemoryManager.RIGHT_INDEX),
batchMemoryManager.getAvgInputBatchSize(JoinBatchMemoryManager.RIGHT_INDEX),
batchMemoryManager.getAvgInputRowWidth(JoinBatchMemoryManager.RIGHT_INDEX),
batchMemoryManager.getTotalInputRecords(JoinBatchMemoryManager.RIGHT_INDEX));
- logger.debug("BATCH_STATS, outgoing aggregate: batch count : {}, avg bytes : {}, avg row bytes : {}, record count : {}",
+ RecordBatchStats.logRecordBatchStats(getRecordBatchStatsContext(),
+ "outgoing aggregate: batch count : %d, avg bytes : %d, avg row bytes : %d, record count : %d",
batchMemoryManager.getNumOutgoingBatches(), batchMemoryManager.getAvgOutputBatchSize(),
batchMemoryManager.getAvgOutputRowWidth(), batchMemoryManager.getTotalOutputRecords());
- }
rightContainer.clear();
rightCounts.clear();
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoinTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoinTemplate.java
index adf681b58..3b8ab8d2e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoinTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoinTemplate.java
@@ -24,7 +24,8 @@ import org.apache.drill.exec.record.ExpandableHyperContainer;
import org.apache.drill.exec.record.RecordBatch;
import org.apache.drill.exec.record.VectorContainer;
import org.apache.drill.exec.record.VectorWrapper;
-
+import org.apache.drill.exec.util.record.RecordBatchStats;
+import org.apache.drill.exec.util.record.RecordBatchStats.RecordBatchIOType;
import javax.inject.Named;
import java.util.LinkedList;
import java.util.List;
@@ -192,7 +193,9 @@ public abstract class NestedLoopJoinTemplate implements NestedLoopJoin {
break;
case OK:
setTargetOutputCount(outgoing.getBatchMemoryManager().update(left, LEFT_INDEX,outputIndex));
- logger.debug("BATCH_STATS, incoming left: {}", outgoing.getBatchMemoryManager().getRecordBatchSizer(LEFT_INDEX));
+ RecordBatchStats.logRecordBatchStats(RecordBatchIOType.INPUT_LEFT,
+ outgoing.getBatchMemoryManager().getRecordBatchSizer(LEFT_INDEX),
+ outgoing.getRecordBatchStatsContext());
leftRecordCount = left.getRecordCount();
break;
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectMemoryManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectMemoryManager.java
index 6b273d863..81e54db90 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectMemoryManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectMemoryManager.java
@@ -28,6 +28,8 @@ import org.apache.drill.exec.record.RecordBatch;
import org.apache.drill.exec.record.RecordBatchMemoryManager;
import org.apache.drill.exec.record.RecordBatchSizer;
import org.apache.drill.exec.record.TypedFieldId;
+import org.apache.drill.exec.util.record.RecordBatchStats;
+import org.apache.drill.exec.util.record.RecordBatchStats.RecordBatchIOType;
import org.apache.drill.exec.vector.FixedWidthVector;
import org.apache.drill.exec.vector.NullableVector;
import org.apache.drill.exec.vector.ValueVector;
@@ -136,7 +138,6 @@ public class ProjectMemoryManager extends RecordBatchMemoryManager {
public ProjectMemoryManager(int configuredOutputSize) {
super(configuredOutputSize);
outputColumnSizes = new HashMap<>();
- logger.debug("BATCH_STATS, configuredOutputSize: {}", configuredOutputSize);
}
public boolean isComplex(MajorType majorType) {
@@ -253,6 +254,9 @@ public class ProjectMemoryManager extends RecordBatchMemoryManager {
setIncomingBatch(incomingBatch);
setOutgoingBatch(outgoingBatch);
reset();
+
+ RecordBatchStats.logRecordBatchStats(outgoingBatch.getRecordBatchStatsContext(),
+ "configuredOutputSize: %d", getOutputBatchSize());
}
private void reset() {
@@ -321,7 +325,7 @@ public class ProjectMemoryManager extends RecordBatchMemoryManager {
rowWidth, totalFixedWidthColumnWidth, totalVariableColumnWidth, totalComplexColumnWidth,
(batchSizerEndTime - updateStartTime),(updateEndTime - updateStartTime), this, incomingBatch);
- logger.debug("BATCH_STATS, incoming: {}", getRecordBatchSizer());
+ RecordBatchStats.logRecordBatchStats(RecordBatchIOType.INPUT, getRecordBatchSizer(), outgoingBatch.getRecordBatchStatsContext());
updateIncomingStats();
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java
index 247f36f19..4d55f0034 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java
@@ -55,13 +55,14 @@ import org.apache.drill.exec.record.AbstractSingleRecordBatch;
import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
import org.apache.drill.exec.record.MaterializedField;
import org.apache.drill.exec.record.RecordBatch;
-import org.apache.drill.exec.record.RecordBatchSizer;
import org.apache.drill.exec.record.SimpleRecordBatch;
import org.apache.drill.exec.record.TransferPair;
import org.apache.drill.exec.record.TypedFieldId;
import org.apache.drill.exec.record.VectorContainer;
import org.apache.drill.exec.record.VectorWrapper;
import org.apache.drill.exec.store.ColumnExplorer;
+import org.apache.drill.exec.util.record.RecordBatchStats;
+import org.apache.drill.exec.util.record.RecordBatchStats.RecordBatchIOType;
import org.apache.drill.exec.vector.AllocationHelper;
import org.apache.drill.exec.vector.FixedWidthVector;
import org.apache.drill.exec.vector.UntypedNullHolder;
@@ -252,7 +253,7 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project> {
}
memoryManager.updateOutgoingStats(outputRecords);
- logger.debug("BATCH_STATS, outgoing: {}", new RecordBatchSizer(this));
+ RecordBatchStats.logRecordBatchStats(RecordBatchIOType.OUTPUT, this, getRecordBatchStatsContext());
// Get the final outcome based on hasRemainder since that will determine if all the incoming records were
// consumed in current output batch or not
@@ -298,7 +299,7 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project> {
}
memoryManager.updateOutgoingStats(projRecords);
- logger.debug("BATCH_STATS, outgoing: {}", new RecordBatchSizer(this));
+ RecordBatchStats.logRecordBatchStats(RecordBatchIOType.OUTPUT, this, getRecordBatchStatsContext());
}
public void addComplexWriter(final ComplexWriter writer) {
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionAllRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionAllRecordBatch.java
index 8df83ee79..7e16d6a35 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionAllRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionAllRecordBatch.java
@@ -43,13 +43,14 @@ import org.apache.drill.exec.record.JoinBatchMemoryManager;
import org.apache.drill.exec.record.MaterializedField;
import org.apache.drill.exec.record.RecordBatch;
import org.apache.drill.exec.record.RecordBatchMemoryManager;
-import org.apache.drill.exec.record.RecordBatchSizer;
import org.apache.drill.exec.record.TransferPair;
import org.apache.drill.exec.record.TypedFieldId;
import org.apache.drill.exec.record.VectorAccessibleUtilities;
import org.apache.drill.exec.record.VectorWrapper;
import org.apache.drill.exec.resolver.TypeCastRules;
import org.apache.drill.exec.util.VectorUtil;
+import org.apache.drill.exec.util.record.RecordBatchStats;
+import org.apache.drill.exec.util.record.RecordBatchStats.RecordBatchIOType;
import org.apache.drill.exec.vector.FixedWidthVector;
import org.apache.drill.exec.vector.SchemaChangeCallBack;
import org.apache.drill.exec.vector.ValueVector;
@@ -76,7 +77,9 @@ public class UnionAllRecordBatch extends AbstractBinaryRecordBatch<UnionAll> {
// get the output batch size from config.
int configuredBatchSize = (int) context.getOptions().getOption(ExecConstants.OUTPUT_BATCH_SIZE_VALIDATOR);
batchMemoryManager = new RecordBatchMemoryManager(numInputs, configuredBatchSize);
- logger.debug("BATCH_STATS, configured output batch size: {}", configuredBatchSize);
+
+ RecordBatchStats.logRecordBatchStats(getRecordBatchStatsContext(),
+ "configured output batch size: %d", configuredBatchSize);
}
@Override
@@ -171,9 +174,7 @@ public class UnionAllRecordBatch extends AbstractBinaryRecordBatch<UnionAll> {
batchStatus.recordsProcessed += recordCount;
batchMemoryManager.updateOutgoingStats(recordCount);
- if (logger.isDebugEnabled()) {
- logger.debug("BATCH_STATS, outgoing: {}", new RecordBatchSizer(this));
- }
+ RecordBatchStats.logRecordBatchStats(RecordBatchIOType.OUTPUT, this, getRecordBatchStatsContext());
if (callBack.getSchemaChangedAndReset()) {
return IterOutcome.OK_NEW_SCHEMA;
@@ -368,8 +369,9 @@ public class UnionAllRecordBatch extends AbstractBinaryRecordBatch<UnionAll> {
if (topStatus.prefetched) {
topStatus.prefetched = false;
batchMemoryManager.update(topStatus.batch, topStatus.inputIndex);
- logger.debug("BATCH_STATS, incoming {}: {}", topStatus.inputIndex == 0 ? "left" : "right",
- batchMemoryManager.getRecordBatchSizer(topStatus.inputIndex));
+ RecordBatchStats.logRecordBatchStats(topStatus.inputIndex == 0 ? RecordBatchIOType.INPUT_LEFT : RecordBatchIOType.INPUT_RIGHT,
+ batchMemoryManager.getRecordBatchSizer(topStatus.inputIndex),
+ getRecordBatchStatsContext());
return Pair.of(topStatus.outcome, topStatus);
} else {
@@ -387,8 +389,9 @@ public class UnionAllRecordBatch extends AbstractBinaryRecordBatch<UnionAll> {
topStatus.recordsProcessed = 0;
topStatus.totalRecordsToProcess = topStatus.batch.getRecordCount();
batchMemoryManager.update(topStatus.batch, topStatus.inputIndex);
- logger.debug("BATCH_STATS, incoming {}: {}", topStatus.inputIndex == 0 ? "left" : "right",
- batchMemoryManager.getRecordBatchSizer(topStatus.inputIndex));
+ RecordBatchStats.logRecordBatchStats(topStatus.inputIndex == 0 ? RecordBatchIOType.INPUT_LEFT : RecordBatchIOType.INPUT_RIGHT,
+ batchMemoryManager.getRecordBatchSizer(topStatus.inputIndex),
+ getRecordBatchStatsContext());
return Pair.of(outcome, topStatus);
case OUT_OF_MEMORY:
case STOP:
@@ -421,19 +424,20 @@ public class UnionAllRecordBatch extends AbstractBinaryRecordBatch<UnionAll> {
super.close();
updateBatchMemoryManagerStats();
- if (logger.isDebugEnabled()) {
- logger.debug("BATCH_STATS, incoming aggregate left: batch count : {}, avg bytes : {}, avg row bytes : {}, record count : {}",
- batchMemoryManager.getNumIncomingBatches(JoinBatchMemoryManager.LEFT_INDEX), batchMemoryManager.getAvgInputBatchSize(JoinBatchMemoryManager.LEFT_INDEX),
- batchMemoryManager.getAvgInputRowWidth(JoinBatchMemoryManager.LEFT_INDEX), batchMemoryManager.getTotalInputRecords(JoinBatchMemoryManager.LEFT_INDEX));
+ RecordBatchStats.logRecordBatchStats(getRecordBatchStatsContext(),
+ "incoming aggregate left: batch count : %d, avg bytes : %d, avg row bytes : %d, record count : %d",
+ batchMemoryManager.getNumIncomingBatches(JoinBatchMemoryManager.LEFT_INDEX), batchMemoryManager.getAvgInputBatchSize(JoinBatchMemoryManager.LEFT_INDEX),
+ batchMemoryManager.getAvgInputRowWidth(JoinBatchMemoryManager.LEFT_INDEX), batchMemoryManager.getTotalInputRecords(JoinBatchMemoryManager.LEFT_INDEX));
- logger.debug("BATCH_STATS, incoming aggregate right: batch count : {}, avg bytes : {}, avg row bytes : {}, record count : {}",
- batchMemoryManager.getNumIncomingBatches(JoinBatchMemoryManager.RIGHT_INDEX), batchMemoryManager.getAvgInputBatchSize(JoinBatchMemoryManager.RIGHT_INDEX),
- batchMemoryManager.getAvgInputRowWidth(JoinBatchMemoryManager.RIGHT_INDEX), batchMemoryManager.getTotalInputRecords(JoinBatchMemoryManager.RIGHT_INDEX));
+ RecordBatchStats.logRecordBatchStats(getRecordBatchStatsContext(),
+ "incoming aggregate right: batch count : %d, avg bytes : %d, avg row bytes : %d, record count : %d",
+ batchMemoryManager.getNumIncomingBatches(JoinBatchMemoryManager.RIGHT_INDEX), batchMemoryManager.getAvgInputBatchSize(JoinBatchMemoryManager.RIGHT_INDEX),
+ batchMemoryManager.getAvgInputRowWidth(JoinBatchMemoryManager.RIGHT_INDEX), batchMemoryManager.getTotalInputRecords(JoinBatchMemoryManager.RIGHT_INDEX));
- logger.debug("BATCH_STATS, outgoing aggregate: batch count : {}, avg bytes : {}, avg row bytes : {}, record count : {}",
- batchMemoryManager.getNumOutgoingBatches(), batchMemoryManager.getAvgOutputBatchSize(),
- batchMemoryManager.getAvgOutputRowWidth(), batchMemoryManager.getTotalOutputRecords());
- }
+ RecordBatchStats.logRecordBatchStats(getRecordBatchStatsContext(),
+ "outgoing aggregate: batch count : %d, avg bytes : %d, avg row bytes : %d, record count : %d",
+ batchMemoryManager.getNumOutgoingBatches(), batchMemoryManager.getAvgOutputBatchSize(),
+ batchMemoryManager.getAvgOutputRowWidth(), batchMemoryManager.getTotalOutputRecords());
}
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unnest/UnnestRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unnest/UnnestRecordBatch.java
index d58d242c7..5f6396731 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unnest/UnnestRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unnest/UnnestRecordBatch.java
@@ -38,6 +38,8 @@ import org.apache.drill.exec.record.RecordBatchSizer;
import org.apache.drill.exec.record.TransferPair;
import org.apache.drill.exec.record.TypedFieldId;
import org.apache.drill.exec.record.VectorContainer;
+import org.apache.drill.exec.util.record.RecordBatchStats;
+import org.apache.drill.exec.util.record.RecordBatchStats.RecordBatchIOType;
import org.apache.drill.exec.vector.IntVector;
import org.apache.drill.exec.vector.ValueVector;
import org.apache.drill.exec.vector.complex.RepeatedMapVector;
@@ -121,10 +123,7 @@ public class UnnestRecordBatch extends AbstractTableFunctionRecordBatch<UnnestPO
// Limit to lower bound of total number of rows possible for this batch
// i.e. all rows fit within memory budget.
setOutputRowCount(Math.min(columnSize.getElementCount(), getOutputRowCount()));
-
- if (logger.isDebugEnabled()) {
- logger.debug("BATCH_STATS, incoming:\n {}", getRecordBatchSizer());
- }
+ RecordBatchStats.logRecordBatchStats(RecordBatchIOType.INPUT, getRecordBatchSizer(), getRecordBatchStatsContext());
updateIncomingStats();
}
@@ -306,9 +305,7 @@ public class UnnestRecordBatch extends AbstractTableFunctionRecordBatch<UnnestPO
// entire incoming recods has been unnested. If the entire records has been
// unnested, we return EMIT and any blocking operators in the pipeline will
// unblock.
- if (logger.isDebugEnabled()) {
- logger.debug("BATCH_STATS, outgoing:\n {}", new RecordBatchSizer(this));
- }
+ RecordBatchStats.logRecordBatchStats(RecordBatchIOType.OUTPUT, this, getRecordBatchStatsContext());
return hasRemainder ? IterOutcome.OK : IterOutcome.EMIT;
}
@@ -420,14 +417,15 @@ public class UnnestRecordBatch extends AbstractTableFunctionRecordBatch<UnnestPO
stats.setLongStat(Metric.AVG_OUTPUT_ROW_BYTES, memoryManager.getAvgOutputRowWidth());
stats.setLongStat(Metric.OUTPUT_RECORD_COUNT, memoryManager.getTotalOutputRecords());
- logger.debug("BATCH_STATS, incoming aggregate: batch count : {}, avg batch bytes : {}, avg row bytes : {}, record count : {}",
- memoryManager.getNumIncomingBatches(), memoryManager.getAvgInputBatchSize(),
- memoryManager.getAvgInputRowWidth(), memoryManager.getTotalInputRecords());
-
- logger.debug("BATCH_STATS, outgoing aggregate: batch count : {}, avg batch bytes : {}, avg row bytes : {}, record count : {}",
- memoryManager.getNumOutgoingBatches(), memoryManager.getAvgOutputBatchSize(),
- memoryManager.getAvgOutputRowWidth(), memoryManager.getTotalOutputRecords());
+ RecordBatchStats.logRecordBatchStats(getRecordBatchStatsContext(),
+ "incoming aggregate: batch count : %d, avg batch bytes : %d, avg row bytes : %d, record count : %d",
+ memoryManager.getNumIncomingBatches(), memoryManager.getAvgInputBatchSize(),
+ memoryManager.getAvgInputRowWidth(), memoryManager.getTotalInputRecords());
+ RecordBatchStats.logRecordBatchStats(getRecordBatchStatsContext(),
+ "outgoing aggregate: batch count : %d, avg batch bytes : %d, avg row bytes : %d, record count : %d",
+ memoryManager.getNumOutgoingBatches(), memoryManager.getAvgOutputBatchSize(),
+ memoryManager.getAvgOutputRowWidth(), memoryManager.getTotalOutputRecords());
}
private TypedFieldId checkAndGetUnnestFieldId() throws SchemaChangeException {
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java
index dfa1424ca..c38de2d2b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java
@@ -33,6 +33,7 @@ import org.apache.drill.exec.physical.impl.aggregate.SpilledRecordbatch;
import org.apache.drill.exec.record.selection.SelectionVector2;
import org.apache.drill.exec.record.selection.SelectionVector4;
import org.apache.drill.exec.server.options.OptionValue;
+import org.apache.drill.exec.util.record.RecordBatchStats.RecordBatchStatsContext;
public abstract class AbstractRecordBatch<T extends PhysicalOperator> implements CloseableRecordBatch {
private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(new Object() {}.getClass().getEnclosingClass());
@@ -41,6 +42,7 @@ public abstract class AbstractRecordBatch<T extends PhysicalOperator> implements
protected final T popConfig;
protected final FragmentContext context;
protected final OperatorContext oContext;
+ protected final RecordBatchStatsContext batchStatsContext;
protected final OperatorStats stats;
protected final boolean unionTypeEnabled;
protected BatchState state;
@@ -58,6 +60,7 @@ public abstract class AbstractRecordBatch<T extends PhysicalOperator> implements
this.context = context;
this.popConfig = popConfig;
this.oContext = oContext;
+ this.batchStatsContext = new RecordBatchStatsContext(context, oContext);
stats = oContext.getStats();
container = new VectorContainer(this.oContext.getAllocator());
if (buildSchema) {
@@ -241,4 +244,12 @@ public abstract class AbstractRecordBatch<T extends PhysicalOperator> implements
public VectorContainer getContainer() {
return container;
}
+
+ public RecordBatchStatsContext getRecordBatchStatsContext() {
+ return batchStatsContext;
+ }
+
+ public boolean isRecordBatchStatsLoggingEnabled() {
+ return batchStatsContext.isEnableBatchSzLogging();
+ }
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/batchsizing/OverflowSerDeUtil.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/batchsizing/OverflowSerDeUtil.java
index 4a0e1e81e..14b31327c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/batchsizing/OverflowSerDeUtil.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/batchsizing/OverflowSerDeUtil.java
@@ -85,10 +85,8 @@ final class OverflowSerDeUtil {
// Allocate the required memory to serialize the overflow fields
final DrillBuf buffer = allocator.buffer(bufferLength);
- if (batchStatsContext.isEnableBatchSzLogging()) {
- final String msg = String.format("Allocated a buffer of length [%d] to handle overflow", bufferLength);
- RecordBatchStats.logRecordBatchStats(msg, batchStatsContext);
- }
+ RecordBatchStats.logRecordBatchStats(batchStatsContext,
+ "Allocated a buffer of length [%d] to handle overflow", bufferLength);
// Create the result object
final RecordOverflowContainer recordOverflowContainer = new RecordOverflowContainer();
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/util/record/RecordBatchStats.java b/exec/java-exec/src/main/java/org/apache/drill/exec/util/record/RecordBatchStats.java
index 0b2424409..c61607ef5 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/util/record/RecordBatchStats.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/util/record/RecordBatchStats.java
@@ -17,6 +17,7 @@
*/
package org.apache.drill.exec.util.record;
+import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
import org.apache.drill.exec.ExecConstants;
import org.apache.drill.exec.memory.BufferAllocator;
import org.apache.drill.exec.ops.FragmentContext;
@@ -134,25 +135,79 @@ public final class RecordBatchStats {
}
}
+ /** Indicates whether a record batch is Input or Output */
+ public enum RecordBatchIOType {
+ INPUT ("incoming"),
+ INPUT_RIGHT ("incoming right"),
+ INPUT_LEFT ("incoming left"),
+ OUTPUT ("outgoing"),
+ PASSTHROUGH ("passthrough");
+
+ private final String ioTypeString;
+
+ private RecordBatchIOType(String ioTypeString) {
+ this.ioTypeString = ioTypeString;
+ }
+
+ /**
+ * @return IO Type string
+ */
+ public String getIOTypeString() {
+ return ioTypeString;
+ }
+ }
+
+ /**
+ * @see {@link RecordBatchStats#logRecordBatchStats(IOType, String, RecordBatchSizer, RecordBatchStatsContext)}
+ */
+ public static void logRecordBatchStats(RecordBatchIOType ioType,
+ String sourceId,
+ RecordBatch recordBatch,
+ RecordBatchStatsContext batchStatsContext) {
+
+ if (!batchStatsContext.isEnableBatchSzLogging()) {
+ return; // NOOP
+ }
+
+ logRecordBatchStats(ioType, sourceId, new RecordBatchSizer(recordBatch), batchStatsContext);
+ }
+
+ /**
+ * @see {@link RecordBatchStats#logRecordBatchStats(IOType, String, RecordBatchSizer, RecordBatchStatsContext)}
+ */
+ public static void logRecordBatchStats(RecordBatchIOType ioType,
+ RecordBatch recordBatch,
+ RecordBatchStatsContext batchStatsContext) {
+
+ if (!batchStatsContext.isEnableBatchSzLogging()) {
+ return; // NOOP
+ }
+
+ logRecordBatchStats(ioType, null, new RecordBatchSizer(recordBatch), batchStatsContext);
+ }
+
/**
- * @see {@link RecordBatchStats#logRecordBatchStats(String, RecordBatch, RecordBatchStatsContext)}
+ * @see {@link RecordBatchStats#logRecordBatchStats(RecordBatchIOType, String, RecordBatchSizer, RecordBatchStatsContext)}
*/
- public static void logRecordBatchStats(RecordBatch recordBatch,
+ public static void logRecordBatchStats(RecordBatchIOType ioType,
+ RecordBatchSizer recordBatchSizer,
RecordBatchStatsContext batchStatsContext) {
- logRecordBatchStats(null, recordBatch, batchStatsContext);
+ logRecordBatchStats(ioType, null, recordBatchSizer, batchStatsContext);
}
/**
* Logs record batch statistics for the input record batch (logging happens only
* when record statistics logging is enabled).
*
+ * @param ioType whether a record batch is an input or/and output
* @param sourceId optional source identifier for scanners
- * @param recordBatch a set of records
+ * @param batchSizer contains batch sizing information
* @param batchStatsContext batch stats context object
*/
- public static void logRecordBatchStats(String sourceId,
- RecordBatch recordBatch,
+ public static void logRecordBatchStats(RecordBatchIOType ioType,
+ String sourceId,
+ RecordBatchSizer batchSizer,
RecordBatchStatsContext batchStatsContext) {
if (!batchStatsContext.isEnableBatchSzLogging()) {
@@ -161,7 +216,7 @@ public final class RecordBatchStats {
final String statsId = batchStatsContext.getContextOperatorId();
final boolean verbose = batchStatsContext.isEnableFgBatchSzLogging();
- final String msg = printRecordBatchStats(statsId, sourceId, recordBatch, verbose);
+ final String msg = printRecordBatchStats(statsId, ioType, sourceId, batchSizer, verbose);
logBatchStatsMsg(batchStatsContext, msg, false);
}
@@ -170,7 +225,6 @@ public final class RecordBatchStats {
* Logs a generic batch statistics message
*
* @param message log message
- * @param batchStatsLogging
* @param batchStatsContext batch stats context object
*/
public static void logRecordBatchStats(String message,
@@ -184,6 +238,25 @@ public final class RecordBatchStats {
}
/**
+ * Logs a generic batch statistics message
+ *
+ * @param batchStatsContext batch stats context object
+ * @param format a string format as in {@link String#format} method
+ * @param args format's arguments
+ */
+ public static void logRecordBatchStats(RecordBatchStatsContext batchStatsContext,
+ String format,
+ Object...args) {
+
+ if (!batchStatsContext.isEnableBatchSzLogging()) {
+ return; // NOOP
+ }
+
+ final String message = String.format(format, args);
+ logBatchStatsMsg(batchStatsContext, message, true);
+ }
+
+ /**
* @param allocator dumps allocator statistics
* @return string with allocator statistics
*/
@@ -212,27 +285,30 @@ public final class RecordBatchStats {
* Constructs record batch statistics for the input record batch
*
* @param stats instance identifier
+ * @param ioType whether a record batch is an input or/and output
* @param sourceId optional source identifier for scanners
- * @param recordBatch a set of records
+ * @param batchSizer contains batch sizing information
* @param verbose whether to include fine-grained stats
*
* @return a string containing the record batch statistics
*/
private static String printRecordBatchStats(String statsId,
+ RecordBatchIOType ioType,
String sourceId,
- RecordBatch recordBatch,
+ RecordBatchSizer batchSizer,
boolean verbose) {
- final RecordBatchSizer batchSizer = new RecordBatchSizer(recordBatch);
final StringBuilder msg = new StringBuilder();
msg.append(BATCH_STATS_PREFIX);
- msg.append(" Originator: {");
+ msg.append(" Operator: {");
msg.append(statsId);
if (sourceId != null) {
msg.append(':');
msg.append(sourceId);
}
+ msg.append("}, IO Type: {");
+ msg.append(toString(ioType));
msg.append("}, Batch size: {");
msg.append( " Records: " );
msg.append(batchSizer.rowCount());
@@ -269,7 +345,8 @@ public final class RecordBatchStats {
boolean includePrefix) {
if (includePrefix) {
- msg = BATCH_STATS_PREFIX + '\t' + msg;
+ final String statsId = batchStatsContext.getContextOperatorId();
+ msg = BATCH_STATS_PREFIX + " Operator: {" + statsId + "} " + msg;
}
if (batchStatsContext.useInfoLevelLogging()) {
@@ -279,4 +356,19 @@ public final class RecordBatchStats {
}
}
+ private static String toString(RecordBatchIOType ioType) {
+ Preconditions.checkNotNull(ioType, "The record batch IO type cannot be null");
+
+ switch (ioType) {
+ case INPUT: return "incoming";
+ case INPUT_RIGHT: return "incoming right";
+ case INPUT_LEFT: return "incoming left";
+ case OUTPUT: return "outgoing";
+ case PASSTHROUGH: return "passthrough";
+
+ default: throw new RuntimeException("Unexpected record batch IO type..");
+ }
+
+ }
+
} \ No newline at end of file