aboutsummaryrefslogtreecommitdiff
path: root/exec/java-exec/src/main/java/org/apache/drill/exec/util/record/RecordBatchStats.java
diff options
context:
space:
mode:
Diffstat (limited to 'exec/java-exec/src/main/java/org/apache/drill/exec/util/record/RecordBatchStats.java')
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/util/record/RecordBatchStats.java118
1 files changed, 105 insertions, 13 deletions
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/util/record/RecordBatchStats.java b/exec/java-exec/src/main/java/org/apache/drill/exec/util/record/RecordBatchStats.java
index 0b2424409..c61607ef5 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/util/record/RecordBatchStats.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/util/record/RecordBatchStats.java
@@ -17,6 +17,7 @@
*/
package org.apache.drill.exec.util.record;
+import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
import org.apache.drill.exec.ExecConstants;
import org.apache.drill.exec.memory.BufferAllocator;
import org.apache.drill.exec.ops.FragmentContext;
@@ -134,25 +135,79 @@ public final class RecordBatchStats {
}
}
+ /** Indicates whether a record batch is Input or Output */
+ public enum RecordBatchIOType {
+ INPUT ("incoming"),
+ INPUT_RIGHT ("incoming right"),
+ INPUT_LEFT ("incoming left"),
+ OUTPUT ("outgoing"),
+ PASSTHROUGH ("passthrough");
+
+ private final String ioTypeString;
+
+ private RecordBatchIOType(String ioTypeString) {
+ this.ioTypeString = ioTypeString;
+ }
+
+ /**
+ * @return IO Type string
+ */
+ public String getIOTypeString() {
+ return ioTypeString;
+ }
+ }
+
+ /**
+ * @see {@link RecordBatchStats#logRecordBatchStats(IOType, String, RecordBatchSizer, RecordBatchStatsContext)}
+ */
+ public static void logRecordBatchStats(RecordBatchIOType ioType,
+ String sourceId,
+ RecordBatch recordBatch,
+ RecordBatchStatsContext batchStatsContext) {
+
+ if (!batchStatsContext.isEnableBatchSzLogging()) {
+ return; // NOOP
+ }
+
+ logRecordBatchStats(ioType, sourceId, new RecordBatchSizer(recordBatch), batchStatsContext);
+ }
+
+ /**
+ * @see {@link RecordBatchStats#logRecordBatchStats(IOType, String, RecordBatchSizer, RecordBatchStatsContext)}
+ */
+ public static void logRecordBatchStats(RecordBatchIOType ioType,
+ RecordBatch recordBatch,
+ RecordBatchStatsContext batchStatsContext) {
+
+ if (!batchStatsContext.isEnableBatchSzLogging()) {
+ return; // NOOP
+ }
+
+ logRecordBatchStats(ioType, null, new RecordBatchSizer(recordBatch), batchStatsContext);
+ }
+
/**
- * @see {@link RecordBatchStats#logRecordBatchStats(String, RecordBatch, RecordBatchStatsContext)}
+ * @see {@link RecordBatchStats#logRecordBatchStats(RecordBatchIOType, String, RecordBatchSizer, RecordBatchStatsContext)}
*/
- public static void logRecordBatchStats(RecordBatch recordBatch,
+ public static void logRecordBatchStats(RecordBatchIOType ioType,
+ RecordBatchSizer recordBatchSizer,
RecordBatchStatsContext batchStatsContext) {
- logRecordBatchStats(null, recordBatch, batchStatsContext);
+ logRecordBatchStats(ioType, null, recordBatchSizer, batchStatsContext);
}
/**
* Logs record batch statistics for the input record batch (logging happens only
* when record statistics logging is enabled).
*
+ * @param ioType whether a record batch is an input or/and output
* @param sourceId optional source identifier for scanners
- * @param recordBatch a set of records
+ * @param batchSizer contains batch sizing information
* @param batchStatsContext batch stats context object
*/
- public static void logRecordBatchStats(String sourceId,
- RecordBatch recordBatch,
+ public static void logRecordBatchStats(RecordBatchIOType ioType,
+ String sourceId,
+ RecordBatchSizer batchSizer,
RecordBatchStatsContext batchStatsContext) {
if (!batchStatsContext.isEnableBatchSzLogging()) {
@@ -161,7 +216,7 @@ public final class RecordBatchStats {
final String statsId = batchStatsContext.getContextOperatorId();
final boolean verbose = batchStatsContext.isEnableFgBatchSzLogging();
- final String msg = printRecordBatchStats(statsId, sourceId, recordBatch, verbose);
+ final String msg = printRecordBatchStats(statsId, ioType, sourceId, batchSizer, verbose);
logBatchStatsMsg(batchStatsContext, msg, false);
}
@@ -170,7 +225,6 @@ public final class RecordBatchStats {
* Logs a generic batch statistics message
*
* @param message log message
- * @param batchStatsLogging
* @param batchStatsContext batch stats context object
*/
public static void logRecordBatchStats(String message,
@@ -184,6 +238,25 @@ public final class RecordBatchStats {
}
/**
+ * Logs a generic batch statistics message
+ *
+ * @param batchStatsContext batch stats context object
+ * @param format a string format as in {@link String#format} method
+ * @param args format's arguments
+ */
+ public static void logRecordBatchStats(RecordBatchStatsContext batchStatsContext,
+ String format,
+ Object...args) {
+
+ if (!batchStatsContext.isEnableBatchSzLogging()) {
+ return; // NOOP
+ }
+
+ final String message = String.format(format, args);
+ logBatchStatsMsg(batchStatsContext, message, true);
+ }
+
+ /**
* @param allocator dumps allocator statistics
* @return string with allocator statistics
*/
@@ -212,27 +285,30 @@ public final class RecordBatchStats {
* Constructs record batch statistics for the input record batch
*
* @param stats instance identifier
+ * @param ioType whether a record batch is an input or/and output
* @param sourceId optional source identifier for scanners
- * @param recordBatch a set of records
+ * @param batchSizer contains batch sizing information
* @param verbose whether to include fine-grained stats
*
* @return a string containing the record batch statistics
*/
private static String printRecordBatchStats(String statsId,
+ RecordBatchIOType ioType,
String sourceId,
- RecordBatch recordBatch,
+ RecordBatchSizer batchSizer,
boolean verbose) {
- final RecordBatchSizer batchSizer = new RecordBatchSizer(recordBatch);
final StringBuilder msg = new StringBuilder();
msg.append(BATCH_STATS_PREFIX);
- msg.append(" Originator: {");
+ msg.append(" Operator: {");
msg.append(statsId);
if (sourceId != null) {
msg.append(':');
msg.append(sourceId);
}
+ msg.append("}, IO Type: {");
+ msg.append(toString(ioType));
msg.append("}, Batch size: {");
msg.append( " Records: " );
msg.append(batchSizer.rowCount());
@@ -269,7 +345,8 @@ public final class RecordBatchStats {
boolean includePrefix) {
if (includePrefix) {
- msg = BATCH_STATS_PREFIX + '\t' + msg;
+ final String statsId = batchStatsContext.getContextOperatorId();
+ msg = BATCH_STATS_PREFIX + " Operator: {" + statsId + "} " + msg;
}
if (batchStatsContext.useInfoLevelLogging()) {
@@ -279,4 +356,19 @@ public final class RecordBatchStats {
}
}
+ private static String toString(RecordBatchIOType ioType) {
+ Preconditions.checkNotNull(ioType, "The record batch IO type cannot be null");
+
+ switch (ioType) {
+ case INPUT: return "incoming";
+ case INPUT_RIGHT: return "incoming right";
+ case INPUT_LEFT: return "incoming left";
+ case OUTPUT: return "outgoing";
+ case PASSTHROUGH: return "passthrough";
+
+ default: throw new RuntimeException("Unexpected record batch IO type..");
+ }
+
+ }
+
} \ No newline at end of file