diff options
Diffstat (limited to 'exec/java-exec/src/main/java/org/apache/drill/exec/util/record/RecordBatchStats.java')
-rw-r--r-- | exec/java-exec/src/main/java/org/apache/drill/exec/util/record/RecordBatchStats.java | 118 |
1 files changed, 105 insertions, 13 deletions
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/util/record/RecordBatchStats.java b/exec/java-exec/src/main/java/org/apache/drill/exec/util/record/RecordBatchStats.java index 0b2424409..c61607ef5 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/util/record/RecordBatchStats.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/util/record/RecordBatchStats.java @@ -17,6 +17,7 @@ */ package org.apache.drill.exec.util.record; +import org.apache.drill.shaded.guava.com.google.common.base.Preconditions; import org.apache.drill.exec.ExecConstants; import org.apache.drill.exec.memory.BufferAllocator; import org.apache.drill.exec.ops.FragmentContext; @@ -134,25 +135,79 @@ public final class RecordBatchStats { } } + /** Indicates whether a record batch is Input or Output */ + public enum RecordBatchIOType { + INPUT ("incoming"), + INPUT_RIGHT ("incoming right"), + INPUT_LEFT ("incoming left"), + OUTPUT ("outgoing"), + PASSTHROUGH ("passthrough"); + + private final String ioTypeString; + + private RecordBatchIOType(String ioTypeString) { + this.ioTypeString = ioTypeString; + } + + /** + * @return IO Type string + */ + public String getIOTypeString() { + return ioTypeString; + } + } + + /** + * @see {@link RecordBatchStats#logRecordBatchStats(IOType, String, RecordBatchSizer, RecordBatchStatsContext)} + */ + public static void logRecordBatchStats(RecordBatchIOType ioType, + String sourceId, + RecordBatch recordBatch, + RecordBatchStatsContext batchStatsContext) { + + if (!batchStatsContext.isEnableBatchSzLogging()) { + return; // NOOP + } + + logRecordBatchStats(ioType, sourceId, new RecordBatchSizer(recordBatch), batchStatsContext); + } + + /** + * @see {@link RecordBatchStats#logRecordBatchStats(IOType, String, RecordBatchSizer, RecordBatchStatsContext)} + */ + public static void logRecordBatchStats(RecordBatchIOType ioType, + RecordBatch recordBatch, + RecordBatchStatsContext batchStatsContext) { + + if (!batchStatsContext.isEnableBatchSzLogging()) { + return; // NOOP + } + + logRecordBatchStats(ioType, null, new RecordBatchSizer(recordBatch), batchStatsContext); + } + /** - * @see {@link RecordBatchStats#logRecordBatchStats(String, RecordBatch, RecordBatchStatsContext)} + * @see {@link RecordBatchStats#logRecordBatchStats(RecordBatchIOType, String, RecordBatchSizer, RecordBatchStatsContext)} */ - public static void logRecordBatchStats(RecordBatch recordBatch, + public static void logRecordBatchStats(RecordBatchIOType ioType, + RecordBatchSizer recordBatchSizer, RecordBatchStatsContext batchStatsContext) { - logRecordBatchStats(null, recordBatch, batchStatsContext); + logRecordBatchStats(ioType, null, recordBatchSizer, batchStatsContext); } /** * Logs record batch statistics for the input record batch (logging happens only * when record statistics logging is enabled). * + * @param ioType whether a record batch is an input or/and output * @param sourceId optional source identifier for scanners - * @param recordBatch a set of records + * @param batchSizer contains batch sizing information * @param batchStatsContext batch stats context object */ - public static void logRecordBatchStats(String sourceId, - RecordBatch recordBatch, + public static void logRecordBatchStats(RecordBatchIOType ioType, + String sourceId, + RecordBatchSizer batchSizer, RecordBatchStatsContext batchStatsContext) { if (!batchStatsContext.isEnableBatchSzLogging()) { @@ -161,7 +216,7 @@ public final class RecordBatchStats { final String statsId = batchStatsContext.getContextOperatorId(); final boolean verbose = batchStatsContext.isEnableFgBatchSzLogging(); - final String msg = printRecordBatchStats(statsId, sourceId, recordBatch, verbose); + final String msg = printRecordBatchStats(statsId, ioType, sourceId, batchSizer, verbose); logBatchStatsMsg(batchStatsContext, msg, false); } @@ -170,7 +225,6 @@ public final class RecordBatchStats { * Logs a generic batch statistics message * * @param message log message - * @param batchStatsLogging * @param batchStatsContext batch stats context object */ public static void logRecordBatchStats(String message, @@ -184,6 +238,25 @@ public final class RecordBatchStats { } /** + * Logs a generic batch statistics message + * + * @param batchStatsContext batch stats context object + * @param format a string format as in {@link String#format} method + * @param args format's arguments + */ + public static void logRecordBatchStats(RecordBatchStatsContext batchStatsContext, + String format, + Object...args) { + + if (!batchStatsContext.isEnableBatchSzLogging()) { + return; // NOOP + } + + final String message = String.format(format, args); + logBatchStatsMsg(batchStatsContext, message, true); + } + + /** * @param allocator dumps allocator statistics * @return string with allocator statistics */ @@ -212,27 +285,30 @@ public final class RecordBatchStats { * Constructs record batch statistics for the input record batch * * @param stats instance identifier + * @param ioType whether a record batch is an input or/and output * @param sourceId optional source identifier for scanners - * @param recordBatch a set of records + * @param batchSizer contains batch sizing information * @param verbose whether to include fine-grained stats * * @return a string containing the record batch statistics */ private static String printRecordBatchStats(String statsId, + RecordBatchIOType ioType, String sourceId, - RecordBatch recordBatch, + RecordBatchSizer batchSizer, boolean verbose) { - final RecordBatchSizer batchSizer = new RecordBatchSizer(recordBatch); final StringBuilder msg = new StringBuilder(); msg.append(BATCH_STATS_PREFIX); - msg.append(" Originator: {"); + msg.append(" Operator: {"); msg.append(statsId); if (sourceId != null) { msg.append(':'); msg.append(sourceId); } + msg.append("}, IO Type: {"); + msg.append(toString(ioType)); msg.append("}, Batch size: {"); msg.append( " Records: " ); msg.append(batchSizer.rowCount()); @@ -269,7 +345,8 @@ public final class RecordBatchStats { boolean includePrefix) { if (includePrefix) { - msg = BATCH_STATS_PREFIX + '\t' + msg; + final String statsId = batchStatsContext.getContextOperatorId(); + msg = BATCH_STATS_PREFIX + " Operator: {" + statsId + "} " + msg; } if (batchStatsContext.useInfoLevelLogging()) { @@ -279,4 +356,19 @@ public final class RecordBatchStats { } } + private static String toString(RecordBatchIOType ioType) { + Preconditions.checkNotNull(ioType, "The record batch IO type cannot be null"); + + switch (ioType) { + case INPUT: return "incoming"; + case INPUT_RIGHT: return "incoming right"; + case INPUT_LEFT: return "incoming left"; + case OUTPUT: return "outgoing"; + case PASSTHROUGH: return "passthrough"; + + default: throw new RuntimeException("Unexpected record batch IO type.."); + } + + } + }
\ No newline at end of file |